Workflow Engine Design and Implementation · Task Blocking and Execution

The previous article mentioned blocking tasks while discussing the task model. How do blocking tasks arise in a workflow, and how do they relate to task models? Let's analyze this using the simplest process as an example:

Process definition

Process Description

node name    display name                model class
start        Start                       StartModel
apply        Leave Application           TaskModel
approveDept  Department leader approval  TaskModel
end          End                         EndModel

Process execution analysis

  • Start a process: get the StartModel node and call its execution method, startModel.execute(...)
  1. Because the target node of the output edge is a TaskModel, a blocking task (Leave Application) is generated. Note that the task is only generated here, not executed.
  2. Suppose the generated task is stored in the following task table:
    Task ID  Task Name  Display Name       Task Status
    1        apply      Leave Application  In progress
  • A blocking task generally has two states: in progress and completed
  • Completing a task (Leave Application) involves the following steps:
  1. Find the record corresponding to the task ID
  2. Change the task status from in progress to completed
  3. Convert the process definition file associated with the task into a process model
  4. Find the task model whose name matches the task name in the process model, that is, the apply (Leave Application) node
  5. Call the execution method of the task model, applyTaskModel.execute(...)
  6. Because the target node of the output edge is again a TaskModel, another blocking task is generated (Department leader approval)

At this point the task table is as follows:

Task ID  Task Name    Display Name                Task Status
1        apply        Leave Application           Completed
2        approveDept  Department leader approval  In progress

  • Repeat the previous steps to complete the next task (Department leader approval)
  1. Find the record corresponding to the task ID
  2. Change the task status from in progress to completed
  3. Convert the process definition file associated with the task into a process model
  4. Find the task model whose name matches the task name in the process model, that is, the approveDept (Department leader approval) node
  5. Call the execution method of the task model, approveDeptTaskModel.execute(...)
  6. Because the target node of the output edge is now the end node, the process ends

When the process ends, the task table is as follows:

Task ID  Task Name    Display Name                Task Status
1        apply        Leave Application           Completed
2        approveDept  Department leader approval  Completed

From the analysis above, we can conclude the following:

  • A blocking task is something a user has to do, and it is recorded in the task table
  • A blocking task is associated with a task model node of a process model. When the task is completed, the engine finds that task model node in the associated process model and calls the node's execution method
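
The lifecycle described above can be sketched without any engine classes. The following is a minimal illustration, not the article's implementation: TaskTableSketch, Row, enter, start and complete are hypothetical names, the node order is hard-coded, and the task table is just an in-memory list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-in for the task table and process walk described above.
class TaskTableSketch {
    static final int DOING = 10;     // in progress
    static final int FINISHED = 20;  // completed

    // One row of the task table.
    static class Row {
        final long id;
        final String taskName;
        int state = DOING;
        Row(long id, String taskName) { this.id = id; this.taskName = taskName; }
    }

    // Node names of the leave process in order; "apply" and "approveDept" are task nodes.
    static final List<String> NODES = Arrays.asList("start", "apply", "approveDept", "end");

    final List<Row> rows = new ArrayList<>();
    boolean ended = false;

    // Moving onto a node either ends the process or creates a blocking task.
    void enter(String nodeName) {
        if ("end".equals(nodeName)) {
            ended = true;
        } else {
            rows.add(new Row(rows.size() + 1, nodeName));
        }
    }

    // Starting the process generates the first blocking task but does not execute it.
    void start() {
        enter(NODES.get(NODES.indexOf("start") + 1));
    }

    // Completing a task: find the row, flip its state, then advance from its node.
    void complete(long taskId) {
        Row row = rows.stream()
                .filter(r -> r.id == taskId && r.state == DOING)
                .findFirst()
                .orElseThrow(() -> new IllegalArgumentException("No task in progress with id " + taskId));
        row.state = FINISHED;
        enter(NODES.get(NODES.indexOf(row.taskName) + 1));
    }

    public static void main(String[] args) {
        TaskTableSketch table = new TaskTableSketch();
        table.start();
        table.complete(1);
        table.complete(2);
        for (Row r : table.rows) {
            System.out.println(r.id + " " + r.taskName + " " + r.state);
        }
        System.out.println("ended=" + table.ended);
    }
}
```

Each call to complete mirrors the numbered steps: look up the record, change its state, then let the matching node decide whether another blocking task is needed.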

Class Diagram

For generality, we can organize the code as follows:

Code Implementation

Add the handler interface handlers/IHandler.java, which defines the operation that each model in the process can delegate to. It is mainly used to extend the execution method of a node.

package com.mldong.flow.engine.handlers;


import com.mldong.flow.engine.core.Execution;

/**
 * Operation and processing interfaces of each model in the process
 * @author mldong
 * @date 2023/5/17
 */
public interface IHandler {
    /**
     * Subclasses need to implement methods to handle specific operations
     * @param execution execution object
     */
    void handle(Execution execution);
}

A fire method is added to the base model class model/BaseModel.java so that subclasses can hand the execution object to a handler.

package com.mldong.flow.engine.model;

import com.mldong.flow.engine.core.Execution;
import com.mldong.flow.engine.handlers.IHandler;
import lombok.Data;
/**
 *
 * Model base class
 * @author mldong
 * @date 2023/4/25
 */
@Data
public class BaseModel {
    private String name; // unique code
    private String displayName; // display name
    /**
     * Hand over the execution object execution to a specific processor for processing
     * @param handler
     * @param execution
     */
    protected void fire(IHandler handler, Execution execution) {
        handler.handle(execution);
    }
}
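
To make the delegation concrete, here is a small self-contained sketch of how a subclass might use fire. The nested Execution, IHandler and BaseModel types are cut-down stand-ins for the article's classes, and DemoNode and AuditHandler are hypothetical names introduced only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, cut-down stand-ins for the article's Execution, IHandler and BaseModel.
class FireSketch {
    static class Execution {
        final List<String> log = new ArrayList<>();
    }

    interface IHandler {
        void handle(Execution execution);
    }

    static class BaseModel {
        // Same shape as fire(...) above: pass the execution object to the handler.
        protected void fire(IHandler handler, Execution execution) {
            handler.handle(execution);
        }
    }

    // A handler written as a regular class.
    static class AuditHandler implements IHandler {
        @Override
        public void handle(Execution execution) {
            execution.log.add("audited");
        }
    }

    // A node that delegates its side effects to whichever handlers it is given.
    static class DemoNode extends BaseModel {
        void execute(Execution execution) {
            fire(e -> e.log.add("created task"), execution); // handler as a lambda
            fire(new AuditHandler(), execution);             // handler as a class
        }
    }

    public static void main(String[] args) {
        Execution execution = new Execution();
        new DemoNode().execute(execution);
        System.out.println(execution.log); // prints [created task, audited]
    }
}
```

Because IHandler has a single method, handlers can be lambdas or classes; the node itself stays unaware of what each handler does.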

Add the task entity class entity/ProcessTask.java. For now it defines only a few necessary fields; more will be added when we cover database design.

package com.mldong.flow.engine.entity;

import lombok.Data;

import java.io.Serializable;

/**
 *
 * Process tasks
 * @author mldong
 * @date 2023/5/17
 */
@Data
public class ProcessTask implements Serializable {
    private Long id; // primary key
    private String taskName; // task name
    private String displayName; // task display name
    private Integer taskState; // task state
}

Corresponding task state enumeration class enums/TaskStateEnum.java

package com.mldong.flow.engine.enums;
/**
 *
 * Task status enumeration
 * @author mldong
 * @date 2023/5/17
 */
public enum TaskStateEnum {
    DOING(10, "in progress"),
    FINISHED(20, "Finished"),
    ;
    private final Integer code;

    private final String message;

    TaskStateEnum(Integer code, String message) {
        this.code = code;
        this.message = message;
    }

    public Integer getCode() {
        return code;
    }

    public String getMessage() {
        return message;
    }
}
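
Since ProcessTask stores the state as an Integer code, it can be handy to map a stored code back to its enum constant. The sketch below shows one way to do that; fromCode is a hypothetical helper that is not part of the article's TaskStateEnum, and TaskState is a local copy of the enum so the example compiles on its own.

```java
import java.util.Arrays;
import java.util.Optional;

class TaskStateLookupSketch {
    // Local copy of the article's enum, plus a hypothetical reverse lookup.
    enum TaskState {
        DOING(10, "in progress"),
        FINISHED(20, "Finished");

        private final Integer code;
        private final String message;

        TaskState(Integer code, String message) {
            this.code = code;
            this.message = message;
        }

        Integer getCode() {
            return code;
        }

        String getMessage() {
            return message;
        }

        // Hypothetical helper: resolve a stored code to its constant, empty if unknown.
        static Optional<TaskState> fromCode(Integer code) {
            return Arrays.stream(values())
                    .filter(state -> state.code.equals(code))
                    .findFirst();
        }
    }

    public static void main(String[] args) {
        // A known code resolves to its message; an unknown one falls back to a default.
        System.out.println(TaskState.fromCode(10).map(TaskState::getMessage).orElse("unknown"));
        System.out.println(TaskState.fromCode(99).map(TaskState::getMessage).orElse("unknown"));
    }
}
```

Returning an Optional keeps callers from comparing raw integers such as 10 or 20 directly.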

The execution parameter class core/Execution.java gains the following:

  • A property for the current process model
  • A property for the current process task
  • A property for the list of process tasks
  • A method for adding a task to the task list
  • A method for getting the list of tasks in progress

package com.mldong.flow.engine.core;

import cn.hutool.core.lang.Dict;
import com.mldong.flow.engine.entity.ProcessTask;
import com.mldong.flow.engine.enums.TaskStateEnum;
import com.mldong.flow.engine.model.ProcessModel;
import lombok.Data;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/**
 *
 * Execution object parameters
 * @author mldong
 * @date 2023/4/25
 */
@Data
public class Execution {
    // Process instance ID
    private String processInstanceId;
    // Current process task ID
    private String processTaskId;
    // Execution object extension parameters
    private Dict args;
    // current process model
    private ProcessModel processModel;
    // current task
    private ProcessTask processTask;
    // collection of all tasks
    private List<ProcessTask> processTaskList = new ArrayList<>();

    /**
     * Add task to task collection
     * @param processTask
     */
    public void addTask(ProcessTask processTask) {
        this.processTaskList.add(processTask);
    }

    /**
     * Get a list of ongoing tasks
     * @return
     */
    public List<ProcessTask> getDoingTaskList() {
        return this.processTaskList.stream()
                .filter(item -> TaskStateEnum.DOING.getCode().equals(item.getTaskState()))
                .collect(Collectors.toList());
    }
}

The process model class model/ProcessModel.java adds a method that returns the node model with a given node name from the process definition.

package com.mldong.flow.engine.model;

import lombok.Data;

import java.util.ArrayList;
import java.util.List;
/**
 *
 * Process model
 * @author mldong
 * @date 2023/4/25
 */
@Data
public class ProcessModel extends BaseModel {
    private String type; // Process definition classification
    private String instanceUrl; // The form key to be filled in to start the instance
    private String expireTime; // expected completion time variable key
    private String instanceNoClass; // instance number generator implementation class
    // All nodes of the process definition
    private List<NodeModel> nodes = new ArrayList<NodeModel>();
    // All task nodes defined by the process
    private List<TaskModel> tasks = new ArrayList<TaskModel>();

    /**
     * get start node
     * @return
     */
    public StartModel getStart() {
        StartModel startModel = null;
        for (int i = 0; i < nodes.size(); i++) {
            NodeModel nodeModel = nodes.get(i);
            if (nodeModel instanceof StartModel) {
                startModel = (StartModel) nodeModel;
                break;
            }
        }
        return startModel;
    }
    /**
     * Get the node model of the specified node name defined by the process
     * @param nodeName node name
     * @return
     */
    public NodeModel getNode(String nodeName) {
        for (NodeModel node : nodes) {
            if (node.getName().equals(nodeName)) {
                return node;
            }
        }
        return null;
    }
}
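
Note that getNode returns null when no node matches, so callers need to check the result before calling execute on it. As an alternative, the lookup could return an Optional. The following is only a sketch of that idea: findNode is a hypothetical method, and NodeModel and ProcessModel here are minimal stand-ins for the article's classes.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

class NodeLookupSketch {
    // Minimal stand-in for the article's NodeModel.
    static class NodeModel {
        private final String name;
        NodeModel(String name) { this.name = name; }
        String getName() { return name; }
    }

    // Minimal stand-in for the article's ProcessModel.
    static class ProcessModel {
        private final List<NodeModel> nodes;
        ProcessModel(NodeModel... nodes) { this.nodes = Arrays.asList(nodes); }

        // Hypothetical variant of getNode that makes the "not found" case explicit.
        Optional<NodeModel> findNode(String nodeName) {
            return nodes.stream()
                    .filter(node -> node.getName().equals(nodeName))
                    .findFirst();
        }
    }

    public static void main(String[] args) {
        ProcessModel model = new ProcessModel(
                new NodeModel("start"), new NodeModel("apply"),
                new NodeModel("approveDept"), new NodeModel("end"));
        System.out.println(model.findNode("apply").map(NodeModel::getName).orElse("not found"));
        System.out.println(model.findNode("missing").map(NodeModel::getName).orElse("not found"));
    }
}
```

With this shape, a task whose name no longer exists in the process definition produces an explicit "not found" branch instead of a NullPointerException.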

Create the task handler class handlers/impl/CreateTaskHandler.java, which implements the IHandler interface and is mainly used to create process tasks. Each created task is added to the task list of the execution object and is also set as the current task on the execution object.

package com.mldong.flow.engine.handlers.impl;


import cn.hutool.core.util.RandomUtil;
import com.mldong.flow.engine.core.Execution;
import com.mldong.flow.engine.entity.ProcessTask;
import com.mldong.flow.engine.enums.TaskStateEnum;
import com.mldong.flow.engine.handlers.IHandler;
import com.mldong.flow.engine.model.TaskModel;

/**
 * Create a task handler
 * @author mldong
 * @date 2023/5/16
 */
public class CreateTaskHandler implements IHandler {
    private TaskModel taskModel;
    public CreateTaskHandler(TaskModel taskModel) {
        this.taskModel = taskModel;
    }
    @Override
    public void handle(Execution execution) {
        ProcessTask processTask = new ProcessTask();
        processTask.setId(RandomUtil.randomLong());
        processTask.setTaskName(this.taskModel.getName());
        processTask.setDisplayName(this.taskModel.getDisplayName());
        processTask.setTaskState(TaskStateEnum.DOING.getCode());
        execution.setProcessTask(processTask);
        System.out.println("Create task:" + processTask.getTaskName() + "," + processTask.getDisplayName());
        execution.addTask(processTask);
    }
}

Adjust the execute method of model/TransitionModel.java: when the target node is a task model, call the create-task handler to create the corresponding task.
For a task model node, the execution cycle is essentially: generate the corresponding blocking task -> complete the blocking task -> call the execution method of the task model node.

package com.mldong.flow.engine.model;

import com.mldong.flow.engine.Action;
import com.mldong.flow.engine.core.Execution;
import com.mldong.flow.engine.handlers.impl.CreateTaskHandler;
import lombok.Data;
/**
 *
 * Edge model
 * @author mldong
 * @date 2023/4/25
 */
@Data
public class TransitionModel extends BaseModel implements Action {
    private NodeModel source; // edge source node reference
    private NodeModel target; // edge target node reference
    private String to; // target node name
    private String expr; // edge expression
    private String g; // set of edge point coordinates (x1, y1; x2, y2, x3, y3...) start, corner, end
    private boolean enabled; // Whether it can be executed
    @Override
    public void execute(Execution execution) {
        if (!enabled) return;
        if (target instanceof TaskModel) {
            // Create a blocking task; the target node is not executed yet
            fire(new CreateTaskHandler((TaskModel) target), execution);
        } else {
            target.execute(execution);
        }
    }
}
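
The effect of this branch is easiest to see in isolation. The sketch below reproduces only the dispatch rule with hypothetical, stripped-down types (Node, TaskNode, Transition and a small Execution); it assumes each node has at most one output edge and omits the enabled flag and the handler.

```java
import java.util.ArrayList;
import java.util.List;

class TransitionSketch {
    // Collects created tasks and a trace of executed nodes.
    static class Execution {
        final List<String> createdTasks = new ArrayList<>();
        final List<String> trace = new ArrayList<>();
    }

    static class Node {
        final String name;
        Transition output; // single output edge; null for the end node
        Node(String name) { this.name = name; }
        void execute(Execution execution) {
            execution.trace.add(name);
            if (output != null) {
                output.execute(execution);
            }
        }
    }

    // Marker subclass playing the role of TaskModel.
    static class TaskNode extends Node {
        TaskNode(String name) { super(name); }
    }

    static class Transition {
        final Node target;
        Transition(Node target) { this.target = target; }
        void execute(Execution execution) {
            if (target instanceof TaskNode) {
                // Blocking: record a task and stop; the target is not executed yet.
                execution.createdTasks.add(target.name);
            } else {
                target.execute(execution);
            }
        }
    }

    // Wires start -> apply -> approveDept -> end and returns the nodes in order.
    static Node[] leaveProcess() {
        Node start = new Node("start");
        Node apply = new TaskNode("apply");
        Node approveDept = new TaskNode("approveDept");
        Node end = new Node("end");
        start.output = new Transition(apply);
        apply.output = new Transition(approveDept);
        approveDept.output = new Transition(end);
        return new Node[] {start, apply, approveDept, end};
    }

    public static void main(String[] args) {
        Node[] nodes = leaveProcess();
        Execution execution = new Execution();
        nodes[0].execute(execution);
        System.out.println(execution.trace + " " + execution.createdTasks); // prints [start] [apply]
        nodes[1].execute(execution); // what happens once the "apply" task is completed
        System.out.println(execution.trace + " " + execution.createdTasks); // prints [start, apply] [apply, approveDept]
    }
}
```

Executing the start node stops at the first task node, which is exactly the blocking behavior: the process only moves on when something later calls execute on that task node.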

Add the class MockExecuteTask.java, which simulates the whole process of executing tasks:

  • Loop while there are tasks in progress
  • Take the first task in progress
  • Change its status from in progress to completed
  • Find the task model with the same name as the task in the process model
  • Call the execution method of that node model

To simulate the blocking of a task, the loop sleeps for 1s with ThreadUtil.safeSleep(1000) before executing each task.

package com.mldong.flow.engine;

import cn.hutool.core.thread.ThreadUtil;
import com.mldong.flow.engine.core.Execution;
import com.mldong.flow.engine.entity.ProcessTask;
import com.mldong.flow.engine.enums.TaskStateEnum;
import com.mldong.flow.engine.model.NodeModel;

import java.util.List;

/**
 *
 * Simulate the whole process of executing tasks
 * @author mldong
 * @date 2023/5/17
 */
public class MockExecuteTask {
    private Execution execution;
    public MockExecuteTask(Execution execution) {
        this.execution = execution;
    }
    public void run() {
        while (!execution.getDoingTaskList().isEmpty()) {
            // Sleep for 1s to simulate blocking
            ThreadUtil.safeSleep(1000);
            // Find the tasks in progress
            List<ProcessTask> doingTaskList = execution.getDoingTaskList();
            // Get the first task in progress
            ProcessTask processTask = doingTaskList.get(0);
            // Change the status of the task from in progress to completed
            processTask.setTaskState(TaskStateEnum.FINISHED.getCode());
            System.out.println("Set task status as completed:" + processTask.getTaskName() + "," + processTask.getDisplayName());
            execution.setProcessTask(processTask);
            // Find the task model with the same task name from the process model
            NodeModel nodeModel = execution.getProcessModel().getNode(processTask.getTaskName());
            // Call the node model execution method
            nodeModel.execute(execution);
        }
    }
}

Adjust the unit test class ExecuteTest.java

package com.mldong.flow;

import cn.hutool.core.io.IoUtil;
import cn.hutool.core.lang.Dict;
import com.mldong.flow.engine.MockExecuteTask;
import com.mldong.flow.engine.cfg.Configuration;
import com.mldong.flow.engine.core.Execution;
import com.mldong.flow.engine.model.ProcessModel;
import com.mldong.flow.engine.parser.ModelParser;
import org.junit.Test;
/**
 *
 * Execute the test
 * @author mldong
 * @date 2023/5/1
 */
public class ExecuteTest {
    @Test
    public void executeLeave_01() {
        new Configuration();
        // Parse the process definition file into a process model
        ProcessModel processModel = ModelParser.parse(IoUtil.readBytes(this.getClass().getResourceAsStream("/leave.json")));
        // Construct execution parameters
        Execution execution = new Execution();
        // Set the current process model
        execution.setProcessModel(processModel);
        // set extended properties
        execution.setArgs(Dict.create());
        // Get the start node and call the execution method
        processModel.getStart().execute(execution);
        // Execute the simulation execution task method
        new MockExecuteTask(execution).run();
    }
}

Final execution result:

Call the model node execution method: model:StartModel,name:start,displayName:Start
Create task:apply,Leave Application
Set task status as completed:apply,Leave Application
Call the model node execution method: model:TaskModel,name:apply,displayName:Leave Application
Create task:approveDept,Department leader approval
Set task status as completed:approveDept,Department leader approval
Call the model node execution method: model:TaskModel,name:approveDept,displayName:Department leader approval
Call the model node execution method: model:EndModel,name:end,displayName:End

Summary

This chapter explained how tasks block and execute, which comes down to the relationship between blocking tasks and task model nodes. In a workflow, blocking tasks correspond to the to-do and done items in a personal work queue: tasks in progress are to-do items, and completed tasks are done items. To process a task in progress, we change its status to completed and call the execution method of the task model node bound to it, which drives the process to the next node. For convenience, MockExecuteTask simulates this process for now; we will refine it when we design the database.

Related source code

mldong-flow-demo-05
