Java multithreading: pipeline

1. Brief description of the pipeline

Simply put, a pipeline splits a large, complex task into several small, simple stages that run in sequence. The output of each stage is the input of the next, and each stage is executed by its own worker thread. Because different stages can work on different tasks at the same time, a pipeline built with Java threads can greatly increase throughput and shorten the total time needed to process a batch of tasks.

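For example, suppose a task can be split into three stages A, B, and C, each taking 1t. Without splitting, one task must pass through all three stages before the next one starts, so five tasks take 5 × 3t = 15t. After splitting, the stages of different tasks overlap, as shown in the figure: the first task finishes at 3t and each of the remaining four finishes 1t after the previous one, so in the ideal case all five are done after (3 + 5 - 1)t = 7t. The more tasks there are, the larger the gap becomes.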
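The timing argument can be checked with a few lines of arithmetic. The sketch below assumes an ideal pipeline in which every stage takes the same time t and hand-offs between stages cost nothing; the class and method names are illustrative only.

```java
// Compares the total time for n tasks run sequentially versus through an
// ideal k-stage pipeline in which every stage takes the same time t.
public class PipelineTiming {

    // Without a pipeline, each task passes through all k stages before the next starts.
    static int sequentialTime(int stages, int tasks, int stageTime) {
        return stages * tasks * stageTime;
    }

    // In an ideal pipeline the first task needs k steps to fill the pipeline;
    // after that, one more task completes at every step.
    static int pipelinedTime(int stages, int tasks, int stageTime) {
        return (stages + tasks - 1) * stageTime;
    }

    public static void main(String[] args) {
        int k = 3, t = 1;
        for (int n : new int[] {5, 50, 500}) {
            System.out.printf("n=%d: sequential=%dt, pipelined=%dt%n",
                    n, sequentialTime(k, n, t), pipelinedTime(k, n, t));
        }
    }
}
```

For five tasks it prints sequential=15t and pipelined=7t; for 500 tasks the figures are 1500t and 502t, so under these assumptions the speed-up approaches the number of stages (3) as the task count grows.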

Advantages of handling a problem with a multithreaded pipeline:
  • It makes full use of the CPU and improves computing efficiency.
  • It achieves parallel computing even though the subtasks depend on one another.
  • Each subtask is handled by a single thread, so the code for each stage can be written in a simple single-threaded style.

2. Implementing the pipeline

1. Create the pipeline task

Create a task class that simulates the work to be done: its field num goes through three operations, +10, *20, and ^2 (squaring). Starting from 0, the final value of num should be ((0 + 10) * 20)^2 = 40000.

public class Task {
    int num = 0;

    // Stage A: add 10
    public void taskA() {
        num += 10;
    }

    // Stage B: multiply by 20
    public void taskB() {
        num *= 20;
    }

    // Stage C: square the value
    public void taskC() {
        num *= num;
    }
}

2. Create the worker threads

import java.util.ArrayList;

// Thread A runs the taskA() stage of every Task
public class TaskThreadA extends Thread {

    ArrayList<Task> tasks;

    public TaskThreadA(ArrayList<Task> tasks) {
        this.tasks = tasks;
    }

    @Override
    public void run() {
        for (int i = 0; i < tasks.size(); i++) {
            Task task = tasks.get(i);
            task.taskA();
        }
    }
}

// Thread B runs the taskB() stage of every Task
class TaskThreadB extends Thread {

    ArrayList<Task> tasks;

    public TaskThreadB(ArrayList<Task> tasks) {
        this.tasks = tasks;
    }

    @Override
    public void run() {
        for (int i = 0; i < tasks.size(); i++) {
            Task task = tasks.get(i);
            task.taskB();
        }
    }
}

// Thread C runs the taskC() stage of every Task
class TaskThreadC extends Thread {

    ArrayList<Task> tasks;

    public TaskThreadC(ArrayList<Task> tasks) {
        this.tasks = tasks;
    }

    @Override
    public void run() {
        for (int i = 0; i < tasks.size(); i++) {
            Task task = tasks.get(i);
            task.taskC();
        }
    }
}

3. Start the pipeline

import java.util.ArrayList;

public class Main {

    public static void main(String[] args) {
        // Task list
        ArrayList<Task> tasks = new ArrayList<>();
        // Create 500 tasks
        for (int i = 0; i < 500; i++) {
            tasks.add(new Task());
        }
        // Create the three worker threads ta, tb, tc
        TaskThreadA ta = new TaskThreadA(tasks);
        TaskThreadB tb = new TaskThreadB(tasks);
        TaskThreadC tc = new TaskThreadC(tasks);

        // Start the threads
        ta.start();
        tb.start();
        tc.start();

        // The main thread waits for the worker threads to finish before continuing
        try {
            ta.join();
            tb.join();
            tc.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        // Print the result of each task after processing
        for (int i = 0; i < tasks.size(); i++) {
            System.out.println(tasks.get(i).num);
        }
    }
}

Output (excerpt from one run; the values vary from run to run):

2000
2000
2000
2000
2000
2000
2000
10
10
10
10
10

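Some tasks do not end with the expected 40000. The threads do not necessarily process a given task in the order A-B-C, and they run at different speeds, so the stages of one task can execute out of order. For example, the order A-C-B gives (0 + 10)^2 * 20 = 2000, and if B and C both run before A the result is 10. In addition, num is read and written by several threads without any synchronization, so an update can also be lost.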
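To see where values such as 2000 and 10 come from, the single-threaded sketch below applies the three operations of Task to num = 0 in every possible order. It models reordering only, not lost updates, and the class name StageOrderDemo is illustrative.

```java
// Applies the three stage operations of Task to a fresh num = 0 in each of the
// six possible orders, to show which final values out-of-order execution yields.
public class StageOrderDemo {

    // The operation performed by taskA(), taskB() or taskC()
    static int apply(char stage, int num) {
        switch (stage) {
            case 'A': return num + 10;
            case 'B': return num * 20;
            case 'C': return num * num;
            default: throw new IllegalArgumentException("unknown stage: " + stage);
        }
    }

    // Runs the stages in the given order, e.g. "ACB"
    static int run(String order) {
        int num = 0;
        for (char stage : order.toCharArray()) {
            num = apply(stage, num);
        }
        return num;
    }

    public static void main(String[] args) {
        for (String order : new String[] {"ABC", "ACB", "BAC", "BCA", "CAB", "CBA"}) {
            System.out.println(order + " -> " + run(order));
        }
    }
}
```

Only ABC yields 40000; ACB gives 2000, BCA and CBA give 10, BAC gives 100, and CAB gives 200.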

Modification 1

Modify the task class and add three flags fA, fB, and fC to ensure that the task runs in the order A-B-C.

public class Task {
    int num = 0;
    // Flags that record which stages are done, so the stages run in the order A-B-C.
    // volatile makes a flag set by one thread visible to the other threads.
    volatile boolean fA = false;
    volatile boolean fB = false;
    volatile boolean fC = false;

    public void taskA() {
        if (!fA) {
            num += 10;
            fA = true;
        }
    }

    public void taskB() {
        // Only run once stage A is done, and only once
        if (fA && !fB) {
            num *= 20;
            fB = true;
        }
    }

    public void taskC() {
        // Only run once stage B is done, and only once
        if (fB && !fC) {
            num *= num;
            fC = true;
        }
    }

    public boolean isEnd() {
        return fC;
    }
}

Modify the worker threads and add an execution counter: each thread keeps rescanning the task list until its stage has been completed for all 500 tasks.

import java.util.ArrayList;

public class TaskThreadA extends Thread {

    ArrayList<Task> tasks;

    public TaskThreadA(ArrayList<Task> tasks) {
        this.tasks = tasks;
    }

    @Override
    public void run() {
        while (true) {
            System.out.println("ThreadA run----");
            // Number of tasks whose stage A is done
            int count = 0;
            for (int i = 0; i < tasks.size(); i++) {
                Task task = tasks.get(i);
                task.taskA();
                // Stage A of this task is done: count++
                if (task.fA) {
                    count++;
                }
            }
            // Stop once stage A is done for every task; otherwise rescan the list
            if (count == tasks.size()) {
                System.out.println("ThreadA end-----");
                break;
            }
        }
    }
}

class TaskThreadB extends Thread {

    ArrayList<Task> tasks;

    public TaskThreadB(ArrayList<Task> tasks) {
        this.tasks = tasks;
    }

    @Override
    public void run() {
        while (true) {
            System.out.println("ThreadB run----");
            // Number of tasks whose stage B is done
            int count = 0;
            for (int i = 0; i < tasks.size(); i++) {
                Task task = tasks.get(i);
                task.taskB();
                // Stage B of this task is done: count++
                if (task.fB) {
                    count++;
                }
            }
            // Stop once stage B is done for every task; otherwise rescan the list
            if (count == tasks.size()) {
                System.out.println("ThreadB end-----");
                break;
            }
        }
    }
}

class TaskThreadC extends Thread {

    ArrayList<Task> tasks;

    public TaskThreadC(ArrayList<Task> tasks) {
        this.tasks = tasks;
    }

    @Override
    public void run() {
        while (true) {
            System.out.println("ThreadC run----");
            // Number of tasks whose stage C is done
            int count = 0;
            for (int i = 0; i < tasks.size(); i++) {
                Task task = tasks.get(i);
                task.taskC();
                // Stage C of this task is done: count++
                if (task.fC) {
                    count++;
                }
            }
            // Stop once stage C is done for every task; otherwise rescan the list
            if (count == tasks.size()) {
                System.out.println("ThreadC end-----");
                break;
            }
        }
    }
}

Output:

ThreadB run----
ThreadC run----
ThreadA run----
ThreadB run----
ThreadC run----
ThreadA end-----
ThreadB end-----
ThreadC end-----
40000
40000
40000
40000
40000

Although the results are now correct, the for loops of ThreadB and ThreadC each ran twice. Because the threads run at different speeds, during the first pass some tasks had not yet finished the previous stage, so ThreadB and ThreadC skipped them. Since count < tasks.size(), each of these threads has to traverse the list again from the beginning to complete the skipped tasks, which adds extra time.
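The need for a second pass can be reproduced without threads. The sketch below copies the flag-guarded Task from Modification 1 into a nested class (FlagGuardDemo is an illustrative name) and calls the stages for one task in the "wrong" order, as a fast downstream thread might.

```java
// A single-threaded walk-through of the flag-guarded Task from Modification 1,
// showing why a stage that arrives too early has to come back for a second pass.
public class FlagGuardDemo {

    static class Task {
        int num = 0;
        boolean fA, fB, fC;

        void taskA() { if (!fA) { num += 10; fA = true; } }
        void taskB() { if (fA && !fB) { num *= 20; fB = true; } }
        void taskC() { if (fB && !fC) { num *= num; fC = true; } }
        boolean isEnd() { return fC; }
    }

    public static void main(String[] args) {
        Task task = new Task();
        // First pass in the wrong order: C and B find their prerequisite flag
        // unset and do nothing, so only stage A does any work.
        task.taskC();
        task.taskB();
        task.taskA();
        System.out.println("after first pass: num=" + task.num + ", done=" + task.isEnd());
        // Second pass: B and C now see their prerequisites and complete.
        task.taskB();
        task.taskC();
        System.out.println("after second pass: num=" + task.num + ", done=" + task.isEnd());
    }
}
```

It prints num=10, done=false after the first pass and num=40000, done=true after the second, which is exactly the extra traversal ThreadB and ThreadC had to make.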

Modification 2

There are two ways to address this:

1. Make each thread sleep at the start of every pass (200 ms for A, 500 ms for B, 1000 ms for C), so that the stages start one after another, as on a real pipeline.
import java.util.ArrayList;

public class TaskThreadA extends Thread {

    ArrayList<Task> tasks;

    public TaskThreadA(ArrayList<Task> tasks) {
        this.tasks = tasks;
    }

    @Override
    public void run() {
        while (true) {
            System.out.println("ThreadA run----");
            // Sleep before each pass (shortest delay: stage A goes first)
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            // Number of tasks whose stage A is done
            int count = 0;
            for (int i = 0; i < tasks.size(); i++) {
                Task task = tasks.get(i);
                task.taskA();
                // Stage A of this task is done: count++
                if (task.fA) {
                    count++;
                }
            }
            if (count == tasks.size()) {
                System.out.println("ThreadA end-----");
                break;
            }
        }
    }
}

class TaskThreadB extends Thread {

    ArrayList<Task> tasks;

    public TaskThreadB(ArrayList<Task> tasks) {
        this.tasks = tasks;
    }

    @Override
    public void run() {
        while (true) {
            System.out.println("ThreadB run----");
            // Sleep longer than thread A before each pass
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            // Number of tasks whose stage B is done
            int count = 0;
            for (int i = 0; i < tasks.size(); i++) {
                Task task = tasks.get(i);
                task.taskB();
                // Stage B of this task is done: count++
                if (task.fB) {
                    count++;
                }
            }
            if (count == tasks.size()) {
                System.out.println("ThreadB end-----");
                break;
            }
        }
    }
}

class TaskThreadC extends Thread {

    ArrayList<Task> tasks;

    public TaskThreadC(ArrayList<Task> tasks) {
        this.tasks = tasks;
    }

    @Override
    public void run() {
        while (true) {
            System.out.println("ThreadC run----");
            // Sleep longer than thread B before each pass
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            // Number of tasks whose stage C is done
            int count = 0;
            for (int i = 0; i < tasks.size(); i++) {
                Task task = tasks.get(i);
                task.taskC();
                // Stage C of this task is done: count++
                if (task.fC) {
                    count++;
                }
            }
            if (count == tasks.size()) {
                System.out.println("ThreadC end-----");
                break;
            }
        }
    }
}

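With these staggered sleep times, thread B will very likely start working only after thread A has finished its pass, and thread C only after thread B. Note that sleep() only makes this ordering likely; it does not guarantee it.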

Output:

ThreadA run----
ThreadC run----
ThreadB run----
ThreadA end-----
ThreadB end-----
ThreadC end-----
40000
40000
40000
40000
40000

2. Make each stage wait inside the task until the previous stage of that task has completed:
public class Task {
    int num = 0;
    // volatile: without it, a spinning thread may never observe the update
    volatile boolean fA = false;
    volatile boolean fB = false;
    volatile boolean fC = false;

    public void taskA() {
        if (!fA) {
            num += 10;
            fA = true;
        }
    }

    public void taskB() {
        // Stage A of this task has not finished yet, so keep waiting (busy-wait)
        while (!fA) {
        }
        if (!fB) {
            num *= 20;
            fB = true;
        }
    }

    public void taskC() {
        // Stage B of this task has not finished yet, so keep waiting (busy-wait)
        while (!fB) {
        }
        if (!fC) {
            num *= num;
            fC = true;
        }
    }
}

The running results are the same as above.

These two approaches avoid running the for loops multiple times in this example, but both still have flaws:

  • Approach 1 (sleeping) avoids the repeated passes here, but it adds idle waiting time, and with a much larger number of tasks the differences in speed between the threads will again cause the for loops to run more than once;
  • Approach 2 (waiting inside the task) seems to solve the problem completely, but if thread A skips a task while processing, threads B and C wait on that task forever and the whole pipeline stalls. The following version of TaskThreadA simulates this by skipping the task at index 100:
import java.util.ArrayList;

public class TaskThreadA extends Thread {

    ArrayList<Task> tasks;

    public TaskThreadA(ArrayList<Task> tasks) {
        this.tasks = tasks;
    }

    @Override
    public void run() {
        for (int i = 0; i < tasks.size(); i++) {
            // Simulate a failure: task 100 never goes through stage A,
            // so taskB() on that task spins forever
            if (i == 100) {
                continue;
            }
            Task task = tasks.get(i);
            task.taskA();
        }
    }
}
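A hedged alternative to the spin loop is sketched below. It swaps in java.util.concurrent.CountDownLatch, a standard JDK class, though using it here is this sketch's choice rather than the original design: each stage counts down its own latch when it finishes, and the next stage waits with await(timeout, unit), so a skipped task is reported instead of freezing the pipeline. The class name LatchTask and the boolean return convention are assumptions for illustration, and unlike the flag version each stage must be called at most once per task.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Sketch: each stage signals completion through a CountDownLatch, and the next
// stage waits for that signal with a timeout instead of spinning on a flag.
public class LatchTask {
    int num = 0;
    private final CountDownLatch aDone = new CountDownLatch(1);
    private final CountDownLatch bDone = new CountDownLatch(1);
    private final CountDownLatch cDone = new CountDownLatch(1);

    // Stage A has no predecessor. Call each stage at most once per task.
    public void taskA() {
        num += 10;
        aDone.countDown();   // wakes a thread waiting in taskB()
    }

    // Returns false if stage A has not finished within the timeout.
    public boolean taskB(long timeout, TimeUnit unit) throws InterruptedException {
        if (!aDone.await(timeout, unit)) {
            return false;    // report the stalled task instead of waiting forever
        }
        num *= 20;
        bDone.countDown();
        return true;
    }

    // Returns false if stage B has not finished within the timeout.
    public boolean taskC(long timeout, TimeUnit unit) throws InterruptedException {
        if (!bDone.await(timeout, unit)) {
            return false;
        }
        num *= num;
        cDone.countDown();
        return true;
    }

    public boolean isEnd() {
        return cDone.getCount() == 0;
    }

    public static void main(String[] args) throws InterruptedException {
        LatchTask processed = new LatchTask();
        LatchTask skipped = new LatchTask();       // stage A is never run for this one
        new Thread(processed::taskA).start();

        boolean ok = processed.taskB(5, TimeUnit.SECONDS) && processed.taskC(5, TimeUnit.SECONDS);
        System.out.println("processed: " + ok + ", num=" + processed.num);
        System.out.println("skipped:   " + skipped.taskB(100, TimeUnit.MILLISECONDS));
    }
}
```

Under normal scheduling the first line reports true with num=40000, and the second reports false after about 100 ms; at that point a real pipeline could hand the task to an exception-handling step. A successful await() also guarantees that the previous stage's write to num is visible, so num itself needs no volatile.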

The approaches above can only handle a fixed number of tasks, whereas a real pipeline receives a dynamic number of tasks. To model a real pipeline more closely, let's analyze it again.

4. Analyze the pipeline again

A task-producer thread generates tasks and puts them into task warehouse 0. Thread TA only needs to take tasks from warehouse 0, process them, and put the finished tasks into warehouse 1. If an exception occurs and a task cannot be completed, it is handed to a separate exception-handling step instead. TB and TC work the same way: each thread takes tasks from its own warehouse and, once they are done, puts them into the warehouse of the next stage.

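Previously, we attached flags to each task to record which stage it had reached.
Now we can instead classify tasks by stage, putting the tasks of each stage into a separate collection. In the code below these collections are BlockingQueues: take() blocks until a task is available, so a waiting thread does not spin.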

import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.BlockingQueue;

public class TaskThreadA extends Thread {
    private volatile boolean flag = true; // Thread termination flag (volatile so Stop() from another thread is seen)

    private BlockingQueue<Task> tasks;
    private BlockingQueue<Task> taskADone; // New collection that stores the tasks completed by thread A

    public TaskThreadA(BlockingQueue<Task> tasks, BlockingQueue<Task> taskADone) {
        this.tasks = tasks;
        this.taskADone = taskADone;
    }

    @Override
    public void run() {
        System.out.println("ThreadA run----");
        while (this.flag) {
            try {
                Task task = tasks.take(); // Blocks until a task is available
                task.taskA();
                taskADone.add(task); // Add the completed task to the taskADone collection
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("ThreadA end----");
    }

    public void Stop() {
        this.flag = false;
    }
}

class TaskThreadB extends Thread {
    private volatile boolean flag = true; // Thread termination flag

    private BlockingQueue<Task> taskADone;
    private BlockingQueue<Task> taskBDone; // New collection that stores the tasks completed by thread B

    public TaskThreadB(BlockingQueue<Task> taskADone, BlockingQueue<Task> taskBDone) {
        this.taskADone = taskADone;
        this.taskBDone = taskBDone;
    }

    @Override
    public void run() {
        System.out.println("ThreadB run----");
        while (this.flag) {
            try {
                Task task = taskADone.take();
                task.taskB();
                taskBDone.add(task); // Add the completed task to the taskBDone collection
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("ThreadB end----");
    }

    public void Stop() {
        this.flag = false;
    }
}

class TaskThreadC extends Thread {
    private volatile boolean flag = true; // Thread termination flag

    private BlockingQueue<Task> taskBDone;
    private ArrayList<Task> taskCDone; // New collection that stores the tasks completed by thread C

    public TaskThreadC(BlockingQueue<Task> taskBDone, ArrayList<Task> taskCDone) {
        this.taskBDone = taskBDone;
        this.taskCDone = taskCDone;
    }

    @Override
    public void run() {
        System.out.println("ThreadC run----");
        while (this.flag) {
            try {
                Task task = taskBDone.take();
                task.taskC();
                taskCDone.add(task); // Add the completed task to the taskCDone collection
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        System.out.println("ThreadC end----");
    }

    public void Stop() {
        this.flag = false;
    }
}

// Task-producer thread: adds a new task at random intervals
class TaskCreator extends Thread {
    private volatile boolean flag = true; // Thread termination flag

    private BlockingQueue<Task> tasks;

    public TaskCreator(BlockingQueue<Task> tasks) {
        this.tasks = tasks;
    }

    @Override
    public void run() {
        System.out.println("ThreadCreator run----");
        Random r = new Random();
        while (this.flag) {
            try {
                Thread.sleep(r.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            tasks.add(new Task());
        }
        System.out.println("ThreadCreator end----");
    }

    public void Stop() {
        this.flag = false;
    }
}

Each thread has a termination flag, so the threads can be stopped on demand and the pipeline can process a dynamic number of tasks.

import java.util.ArrayList;
import java.util.Scanner;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

public class Main {

    public static void main(String[] args) {
        Scanner scanner = new Scanner(System.in);
        boolean flag = true; // Thread termination flag
        BlockingQueue<Task> tasks = new LinkedBlockingDeque<>(); // Generated tasks
        BlockingQueue<Task> taskADone = new LinkedBlockingDeque<>(); // Tasks completed by thread A
        BlockingQueue<Task> taskBDone = new LinkedBlockingDeque<>(); // Tasks completed by thread B
        ArrayList<Task> taskCDone = new ArrayList<>(); // Tasks completed by thread C

        TaskCreator creator = new TaskCreator(tasks);
        TaskThreadA ta = new TaskThreadA(tasks, taskADone);
        TaskThreadB tb = new TaskThreadB(taskADone, taskBDone);
        TaskThreadC tc = new TaskThreadC(taskBDone, taskCDone);

        creator.start();
        ta.start();
        tb.start();
        tc.start();

        // Entering a positive integer stops all threads
        int i = scanner.nextInt();
        if (i > 0) {
            flag = false;
        }
        if (!flag) {
            creator.Stop();
            ta.Stop();
            tb.Stop();
            tc.Stop();
        }
        try {
            creator.join();
            ta.join();
            tb.join();
            tc.join();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        // Print the results of the completed tasks
        for (Task task : taskCDone) {
            System.out.println(task.num);
        }
    }
}

Entering a positive number on the console stops the pipeline.

Output:

ThreadCreator run----
ThreadB run----
ThreadA run----
ThreadC run----
1
ThreadCreator end----
ThreadA end----
ThreadB end----
ThreadC end----
40000
40000
40000
40000

This completes the pipeline implementation.
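One weakness of the flag-based shutdown: a worker blocked in take() only re-checks its flag after one more task arrives, so stopping cleanly relies on TaskCreator adding a final task after Stop() is called. A widely used alternative is the "poison pill" pattern, sketched below under the assumption that a dedicated sentinel Task instance (POISON, a hypothetical name) is never processed as real work. The producer enqueues the pill instead of calling Stop(); each stage forwards it downstream and exits, so every task queued before the pill is still completed. The generic stage() helper, which replaces the three near-identical thread classes, is also this sketch's own design.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Sketch: shutting down the queue-based pipeline with a sentinel ("poison pill")
// instead of per-thread termination flags.
public class PoisonPillPipeline {

    static class Task {
        int num = 0;
        void taskA() { num += 10; }
        void taskB() { num *= 20; }
        void taskC() { num *= num; }
    }

    // Sentinel compared by identity; it is never passed to a stage's work function.
    static final Task POISON = new Task();

    // One stage: take from its own warehouse, process, put into the next one.
    static Thread stage(BlockingQueue<Task> in, BlockingQueue<Task> out, Consumer<Task> work) {
        return new Thread(() -> {
            try {
                while (true) {
                    Task task = in.take();
                    if (task == POISON) {
                        out.put(POISON);      // pass the pill on so the next stage stops too
                        return;
                    }
                    work.accept(task);
                    out.put(task);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    // Feeds n tasks through stages A, B and C, then shuts the pipeline down.
    static List<Task> run(int n) throws InterruptedException {
        BlockingQueue<Task> tasks = new LinkedBlockingQueue<>();
        BlockingQueue<Task> taskADone = new LinkedBlockingQueue<>();
        BlockingQueue<Task> taskBDone = new LinkedBlockingQueue<>();
        BlockingQueue<Task> taskCDone = new LinkedBlockingQueue<>();
        Thread[] stages = {
            stage(tasks, taskADone, Task::taskA),
            stage(taskADone, taskBDone, Task::taskB),
            stage(taskBDone, taskCDone, Task::taskC)
        };
        for (Thread t : stages) {
            t.start();
        }
        for (int i = 0; i < n; i++) {
            tasks.put(new Task());
        }
        tasks.put(POISON);                    // the stop request, queued behind every real task
        for (Thread t : stages) {
            t.join();
        }
        List<Task> done = new ArrayList<>(taskCDone);
        done.remove(POISON);                  // the pill arrives last in the final warehouse
        return done;
    }

    public static void main(String[] args) throws InterruptedException {
        for (Task task : run(5)) {
            System.out.println(task.num);
        }
    }
}
```

Because the pill travels through the same FIFO queues as the real tasks, no stage can exit while earlier tasks are still waiting upstream, and no thread is left blocked in take() when main() calls join().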