A task sharding scheme based on ZooKeeper/Curator

Suppose there is a table whose records are delayed messages, that is, messages that need to be sent out at some point in the future. The number of records is not fixed and may be more than a single machine can handle in time, so the work has to be spread across two or more instances. That calls for leader election and task sharding. Below is one solution, implemented with Curator.
The code:

import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;

import java.util.List;
import java.util.UUID;

public class JobDistribute {
    private static final String ZK_ADDRESS = "127.0.0.1:2181";
    private static final String ZK_LEADER_PATH = "/task/exec/leader";
    private static final String ZK_TASK_SPLIT_PATH = "/task/split/worker";
    private static final String WORKERS_PARENT = "/task/split/";

    private static List<String> allJobs = Lists.newArrayList("a","b","c","d","e","f","g","h");

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory
                .newClient(ZK_ADDRESS, 10000, 5000,
                        new RetryNTimes(Integer.MAX_VALUE, 1000));
        client.start();
        String getJobPath = client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(ZK_TASK_SPLIT_PATH);
        //Monitor the value change of the task assignment node, and execute the task according to the value
        final NodeCache nodeCache = new NodeCache(client, getJobPath, false);
        nodeCache.getListenable().addListener(new NodeCacheListener() {
            @Override
            public void nodeChanged() throws Exception {
                System.out.println("nodeChanged: next time do those jobs ================" + nodeCache.getCurrentData());
                //You can write to the local cache and execute the corresponding task. . . . .
            }
        });
        nodeCache.start();
        System.out.println("getJobPath:" + getJobPath);
        final String id = UUID.randomUUID().toString();
        System.out.println("id:" + id);
        LeaderLatch leaderLatch = new LeaderLatch(client, ZK_LEADER_PATH, id);
        leaderLatch.addListener(new LeaderLatchListener() {
            @Override
            public void isLeader() {
                System.out.println("Currently run as leader");
                //Manage task sharding after becoming a leader
                try {
                    splitJobs(client);
                } catch (Exception e){
                    e.printStackTrace();
                }
                try {
                    PathChildrenCache cache = new PathChildrenCache(client,
                            WORKERS_PARENT.substring(0,WORKERS_PARENT.length()-1), true);
                    cache.start();
                    cache.getListenable().addListener(new PathChildrenCacheListener(){
                        @Override
                        public void childEvent(CuratorFramework curatorFramework,
                                               PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
                            System.err.println("Event TYpe: " + pathChildrenCacheEvent.getType());
                            if(pathChildrenCacheEvent.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED ||
                                    pathChildrenCacheEvent.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED){
                                // Newly added nodes or deleted nodes, reassign tasks
                                splitJobs(client);
                            }
                        }
                    });
                } catch (Exception e){
                    e.printStackTrace();
                }
            }
            @Override
            public void notLeader() {
                System.out.println("Currently run as slave");
            }
        });
        leaderLatch.start();
        Thread.sleep(Integer.MAX_VALUE);
    }
    
    //A simple and crude split that only handles one to four workers. There are many ways to divide
    //the tasks evenly, for example a double loop with the larger collection (tasks) outside and the
    //smaller one (workers) inside; see the generic sketch after the class.
    //If you want tasks to be moved between workers as little as possible, other algorithms can be used.
    //Here there are 8 tasks, so at most four machines execute them at the same time.
    private static void splitJobs(CuratorFramework client) throws Exception {
        List<String> workers = client.getChildren().forPath(WORKERS_PARENT.substring(0,WORKERS_PARENT.length()-1));
        System.out.println("workers:" + workers);
        if(workers.size() == 1){
            String path = WORKERS_PARENT + workers.get(0);
            client.setData().forPath(path, StringUtils.join(allJobs, ",").getBytes());
        }else if(workers.size() == 2){
            String path0 = WORKERS_PARENT + workers.get(0);
            String path1 = WORKERS_PARENT + workers.get(1);
            client.setData().forPath(path0, StringUtils.join(allJobs.subList(0,4), ",").getBytes());
            client.setData().forPath(path1, StringUtils.join(allJobs.subList(4,8), ",").getBytes());
        }else if(workers.size() == 3){
            String path0 = WORKERS_PARENT + workers.get(0);
            String path1 = WORKERS_PARENT + workers.get(1);
            String path2 = WORKERS_PARENT + workers.get(2);
            client.setData().forPath(path0, StringUtils.join(allJobs.subList(0,3), ",").getBytes());
            client.setData().forPath(path1, StringUtils.join(allJobs.subList(3,6), ",").getBytes());
            client.setData().forPath(path2, StringUtils.join(allJobs.subList(6,8), ",").getBytes());
        }else if(workers.size() == 4){
            String path0 = WORKERS_PARENT + workers.get(0);
            String path1 = WORKERS_PARENT + workers.get(1);
            String path2 = WORKERS_PARENT + workers.get(2);
            String path3 = WORKERS_PARENT + workers.get(3);
            client.setData().forPath(path0, StringUtils.join(allJobs.subList(0,2), ",").getBytes());
            client.setData().forPath(path1, StringUtils.join(allJobs.subList(2,4), ",").getBytes());
            client.setData().forPath(path2, StringUtils.join(allJobs.subList(4,6), ",").getBytes());
            client.setData().forPath(path3, StringUtils.join(allJobs.subList(6,8), ",").getBytes());
        }
        System.out.println("splitJobs ok");
    }
}
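
The hard-coded if/else in splitJobs stops at four workers. As the comment above it notes, a generic even split can be written with a double loop, or simply round-robin by task index. Below is a minimal sketch of such a generic method (my own illustration, not part of the original solution; splitJobsEvenly is a hypothetical name, and java.util.ArrayList would need to be added to the imports):

    //Hypothetical generic replacement for splitJobs: distributes allJobs evenly
    //over however many worker nodes currently exist, round-robin by index.
    private static void splitJobsEvenly(CuratorFramework client) throws Exception {
        List<String> workers = client.getChildren().forPath(WORKERS_PARENT.substring(0, WORKERS_PARENT.length() - 1));
        if (workers.isEmpty()) {
            return; //no worker registered yet, nothing to assign
        }
        //One bucket of jobs per worker.
        List<List<String>> buckets = new ArrayList<>();
        for (int i = 0; i < workers.size(); i++) {
            buckets.add(new ArrayList<>());
        }
        //Job i goes to worker (i mod workerCount), spreading the jobs as evenly as possible.
        for (int i = 0; i < allJobs.size(); i++) {
            buckets.get(i % workers.size()).add(allJobs.get(i));
        }
        //Write each worker's job list into its znode, exactly as splitJobs does.
        for (int i = 0; i < workers.size(); i++) {
            client.setData().forPath(WORKERS_PARENT + workers.get(i),
                    StringUtils.join(buckets.get(i), ",").getBytes());
        }
    }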

You can use zkCli to inspect the values of the worker nodes. If you force the current leader to quit, a new leader is elected within a few seconds and the tasks are reassigned.
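
If you would rather check from code than with zkCli, here is a minimal sketch that reads one worker node's data with Curator (the node name worker0000000000 is hypothetical; use the getJobPath value printed at startup):

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;

public class InspectWorker {
    public static void main(String[] args) throws Exception {
        //Read the job list assigned to one worker node, the same value
        //that "get /task/split/worker..." would show in zkCli.
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "127.0.0.1:2181", new RetryNTimes(3, 1000));
        client.start();
        //Hypothetical node name; copy the real one from the getJobPath output.
        byte[] data = client.getData().forPath("/task/split/worker0000000000");
        System.out.println(new String(data)); //e.g. "a,b,c,d"
        client.close();
    }
}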
This is just a rough prototype; a lot of work is still needed to make it highly available. It is what I came up with recently while working on delayed tasks. Add a scheduled task that polls the task table, plus a JDK DelayQueue, and you have the skeleton of a delayed-message or delayed-task system.
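
As a rough sketch of that last idea (my own illustration, with a hypothetical DelayedMsg type): a scheduled task polls the task table for rows in this worker's shard and puts them into a DelayQueue, and a consumer thread takes elements as they expire and sends them.

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

//Hypothetical element type: one delayed message loaded from the task table.
class DelayedMsg implements Delayed {
    final String payload;
    final long fireAtMillis; //absolute time at which the message should be sent

    DelayedMsg(String payload, long fireAtMillis) {
        this.payload = payload;
        this.fireAtMillis = fireAtMillis;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(fireAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
    }
}

public class DelayedSender {
    public static void main(String[] args) throws Exception {
        DelayQueue<DelayedMsg> queue = new DelayQueue<>();
        //In the real system a scheduled task would poll the task table for
        //rows belonging to this worker's shard and offer them here.
        queue.put(new DelayedMsg("hello in 3 seconds", System.currentTimeMillis() + 3000));

        //take() blocks until the head of the queue has expired, i.e. its send time has arrived.
        DelayedMsg msg = queue.take();
        System.out.println("send: " + msg.payload);
    }
}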