Integrating Disruptor with Spring Boot (thread factory)

1. Introduction to the concurrent framework disruptor

1. Concept: a framework for asynchronous communication between threads in the same JVM process

2. Ring array RingBuffer: the core storage container of the disruptor

2.1. The elements in the ring array are overwritten in place instead of being allocated anew for every message, which reduces the garbage the JVM GC has to collect

2.2. The size of the array is a power of two (2^n), so an element's position can be computed efficiently with a bit operation. This is the same trick HashMap uses to compute a bucket index. The difference is that a HashMap grows when it fills up, whereas the RingBuffer never grows: it wraps around and overwrites the original data

3. Sequence Barrier:

It is a class that acts as a barrier. The producer and the consumers access the RingBuffer at different speeds, and without coordination this would cause errors: a fast producer could overwrite data that has not been consumed yet, and a fast consumer could read data that has not been written yet. The SequenceBarrier holds back whichever side is running ahead so that the two stay consistent. The principle is that every time a consumer fetches data it reports the position (sequence) it has reached, and the producer uses that position to decide when it may put the next item into the RingBuffer

4. Workflow:

The producer puts data in the RingBuffer, and the disruptor pushes the data to the consumer

5. Working mode:

Unified consumption, group consumption, sequential consumption, multi-branch sequential consumption

Detailed introduction: https://blog.csdn.net/zhouzhenyong/article/details/81303011

2. Springboot integrates disruptor

1. Message body

package com.huwei.hotel.collector.contacter.interfaces.auth.event;


import com.huwei.hotel.common.enums.MqttAuthTypeEnum;
import lombok.Data;
import lombok. NoArgsConstructor;
import org.springframework.http.HttpStatus;

/**
 * Event object stored in the auth ring buffer. It carries the values that
 * {@code AuthHandler} hands to the user-log service: the auth name, the auth
 * type, the client key and an optional failure reason.
 *
 * <p>Getters, setters and the no-arg constructor are generated by Lombok.
 *
 * @author ljy
 * @date 2023/3/06
 **/
@Data
@NoArgsConstructor
public class AuthEvent {
    private String authName;
    private MqttAuthTypeEnum authType;
    private String clientKey;
    private String failureReason;

    /**
     * Sets every field back to {@code null}.
     *
     * <p>Package-private: it is invoked by the handler in this package once
     * the event has been processed.
     */
    void clear() {
        this.authName = null;
        this.authType = null;
        this.clientKey = null;
        this.failureReason = null;
    }
}

2. Business data factory

package com.huwei.hotel.collector.contacter.interfaces.auth.event;

import com.lmax.disruptor.EventFactory;

/**
 * {@link EventFactory} implementation that produces empty {@link AuthEvent}
 * instances.
 *
 * @author ljy
 * @date 2023/3/06
 */
public class AuthEventFactory implements EventFactory<AuthEvent> {

    /**
     * Creates a new event with no fields populated.
     *
     * @return a freshly constructed {@link AuthEvent}
     */
    @Override
    public AuthEvent newInstance() {
        final AuthEvent blank = new AuthEvent();
        return blank;
    }

}

3. Producer

package com.huwei.hotel.collector.contacter.interfaces.auth.event;

import com.huwei.hotel.common.enums.MqttAuthTypeEnum;
import com.lmax.disruptor.RingBuffer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * Producer side of the auth pipeline: copies the caller's values into an
 * {@link AuthEvent} slot of the {@code authEventRingBuffer} bean and publishes it.
 *
 * @author ljy
 * @date 2023/3/06
 **/
@Slf4j
@Component
public class AuthEventProducer {
    @Resource(name = "authEventRingBuffer")
    RingBuffer<AuthEvent> ringBuffer;

    /**
     * Publishes one auth event to the ring buffer.
     *
     * @param authName      value stored in the event's {@code authName}
     * @param authType      value stored in the event's {@code authType}
     * @param clientKey     value stored in the event's {@code clientKey}
     * @param failureReason value stored in the event's {@code failureReason}
     */
    public void publish(String authName, MqttAuthTypeEnum authType, String clientKey, String failureReason) {
        // Claim the next sequence, fill the slot, and always publish the claimed
        // sequence - even if populating the slot throws.
        final long slot = ringBuffer.next();
        try {
            final AuthEvent pending = ringBuffer.get(slot);
            pending.setAuthName(authName);
            pending.setAuthType(authType);
            pending.setClientKey(clientKey);
            pending.setFailureReason(failureReason);
        } finally {
            ringBuffer.publish(slot);
        }
    }
}

4. Define the configuration class RingBufferConfiguration

package com.huwei.hotel.collector.contacter.infrastructure.config;

import com.huwei.hotel.collector.contacter.application.event.MessageEvent;
import com.huwei.hotel.collector.contacter.infrastructure.disruptor.DisruptorFactory;
import com.huwei.hotel.collector.contacter.interfaces.auth.event.AuthEvent;
import com.huwei.hotel.collector.contacter.interfaces.auth.event.AuthHandler;
import com.huwei.hotel.collector.contacter.interfaces.debug.event.DebugEvent;
import com.huwei.hotel.collector.contacter.interfaces.debug.event.DebugHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PreDestroy;

/**
 * Spring configuration that builds the message, debug and auth disruptors,
 * starts them on construction, shuts them down on bean destruction, and
 * exposes each disruptor's {@link RingBuffer} as a named bean.
 *
 * @author ljy
 * @date 2023/3/06
 **/
@Configuration
public class RingBufferConfiguration {
    final DisruptorFactory disruptorFactory = new DisruptorFactory();
    final Disruptor<MessageEvent> messageEventDisruptor;
    final Disruptor<DebugEvent> debugEventDisruptor;
    final Disruptor<AuthEvent> authEventDisruptor;
    final DebugHandler debugHandler;
    final AuthHandler authHandler;

    /**
     * Creates the three disruptors and starts them immediately.
     *
     * @param debugHandler consumer attached to the debug disruptor
     * @param authHandler  consumer attached to the auth disruptor
     */
    RingBufferConfiguration(DebugHandler debugHandler, AuthHandler authHandler) {
        this.debugHandler = debugHandler;
        this.authHandler = authHandler;
        this.messageEventDisruptor = disruptorFactory.createDisruptor();
        this.debugEventDisruptor = disruptorFactory.createDisruptor(debugHandler);
        this.authEventDisruptor = disruptorFactory.createDisruptor(authHandler);
        start();
    }

    /** Starts every non-null disruptor, in message / debug / auth order. */
    private void start() {
        for (Disruptor<?> each : allDisruptors()) {
            if (each == null) {
                continue;
            }
            each.start();
        }
    }

    /** Shuts down every non-null disruptor, in message / debug / auth order. */
    @PreDestroy
    public void doDestory() {
        for (Disruptor<?> each : allDisruptors()) {
            if (each == null) {
                continue;
            }
            each.shutdown();
        }
    }

    /** Returns the managed disruptors in the order they are started and stopped. */
    private Disruptor<?>[] allDisruptors() {
        return new Disruptor<?>[] {messageEventDisruptor, debugEventDisruptor, authEventDisruptor};
    }

    /** @return the ring buffer backing the message disruptor */
    @Bean(name = "messageEventRingBuffer")
    public RingBuffer<MessageEvent> messageEventRingBuffer() {
        final RingBuffer<MessageEvent> buffer = messageEventDisruptor.getRingBuffer();
        return buffer;
    }

    /** @return the ring buffer backing the debug disruptor */
    @Bean(name = "debugEventRingBuffer")
    public RingBuffer<DebugEvent> debugEventRingBuffer() {
        final RingBuffer<DebugEvent> buffer = debugEventDisruptor.getRingBuffer();
        return buffer;
    }

    /** @return the ring buffer backing the auth disruptor */
    @Bean(name = "authEventRingBuffer")
    public RingBuffer<AuthEvent> authEventRingBuffer() {
        final RingBuffer<AuthEvent> buffer = authEventDisruptor.getRingBuffer();
        return buffer;
    }
}

5. Define the DisruptorFactory class (builds the Disruptor instances, using the thread factory from step 6)

package com.huwei.hotel.collector.contacter.infrastructure.disruptor;

import com.huwei.hotel.collector.contacter.application.event.*;
import com.huwei.hotel.collector.contacter.infrastructure.helper.ThreadPoolHelper;
import com.huwei.hotel.collector.contacter.interfaces.auth.event.AuthEvent;
import com.huwei.hotel.collector.contacter.interfaces.auth.event.AuthEventExceptionHandler;
import com.huwei.hotel.collector.contacter.interfaces.auth.event.AuthHandler;
import com.huwei.hotel.collector.contacter.interfaces.debug.event.DebugEvent;
import com.huwei.hotel.collector.contacter.interfaces.debug.event.DebugEventExceptionHandler;
import com.huwei.hotel.collector.contacter.interfaces.debug.event.DebugHandler;
import com.lmax.disruptor.dsl.Disruptor;

/**
 * Builds the pre-wired {@link Disruptor} instances used by the application.
 * Each factory method attaches the consumers and the exception handler but
 * does not start the disruptor; that is left to the caller.
 *
 * @author ljy
 * @date 2023/3/06
 **/
public class DisruptorFactory {
    /** Ring buffer capacity used by every disruptor built here (65536 slots). */
    private static final int RING_BUFFER_SIZE = 1024 * 64;

    /** Number of handlers in each worker pool of the message pipeline. */
    private static final int POOL_HANDLER_COUNT = 5;

    /**
     * Multi-producer mode, the default waiting strategy is blocking strategy.
     *
     * <p>Pipeline: a worker pool of {@code ResponseHandler}s, then a worker
     * pool of {@code DataHandler}s, then a single {@code ClearHandler}.
     *
     * @return the configured, not yet started, message disruptor
     */
    public Disruptor<MessageEvent> createDisruptor() {
        Disruptor<MessageEvent> messageDisruptor = new Disruptor<>(
                MessageEvent::new, RING_BUFFER_SIZE, ThreadPoolHelper.threadFactory("MessageEvent"));

        ResponseHandler[] responseStage = newResponseHandlers();
        DataHandler[] dataStage = newDataHandlers();
        ClearHandler cleanupStage = new ClearHandler();

        // Consumer execution modes (one consumer is one thread, so the consumer
        // execution mode is also the thread execution mode):
        //
        //   Unified consumption - every message is consumed by all consumers:
        //     disruptor.handleEventsWith(msg1, msg2, msg3, msg4);
        //
        //   Group consumption - a message is consumed by exactly one consumer of
        //   the pool; the consumers take turns:
        //     disruptor.handleEventsWithWorkerPool(msg1, msg2);
        //
        //   Sequential consumption - 1 and 3 run in parallel first, then 2:
        //     disruptor.handleEventsWith(msg1, msg3).then(msg2);
        //
        //   Multi-branch sequential consumption - consumers 1 and 3 form one
        //   branch, consumers 2 and 4 another; once 3 and 4 are done, 5 runs:
        //     disruptor.handleEventsWith(msg1, msg3);
        //     disruptor.handleEventsWith(msg2, msg4);
        //     disruptor.after(msg3, msg4).handleEventsWith(msg5);
        messageDisruptor.handleEventsWithWorkerPool(responseStage)
                .thenHandleEventsWithWorkerPool(dataStage)
                .then(cleanupStage);
        messageDisruptor.setDefaultExceptionHandler(MessageEventExceptionHandler.INSTANCE);
        return messageDisruptor;
    }

    /**
     * Multi-producer mode, the default waiting strategy is blocking strategy.
     *
     * @param debugHandler the single consumer attached to the disruptor
     * @return the configured, not yet started, debug disruptor
     */
    public Disruptor<DebugEvent> createDisruptor(DebugHandler debugHandler) {
        Disruptor<DebugEvent> debugDisruptor = new Disruptor<>(
                DebugEvent::new, RING_BUFFER_SIZE, ThreadPoolHelper.threadFactory("DebugEvent"));

        debugDisruptor.handleEventsWith(debugHandler);
        debugDisruptor.setDefaultExceptionHandler(DebugEventExceptionHandler.INSTANCE);
        return debugDisruptor;
    }

    /**
     * Multi-producer mode, the default waiting strategy is blocking strategy.
     *
     * @param authHandler the single consumer attached to the disruptor
     * @return the configured, not yet started, auth disruptor
     */
    public Disruptor<AuthEvent> createDisruptor(AuthHandler authHandler) {
        Disruptor<AuthEvent> authDisruptor = new Disruptor<>(
                AuthEvent::new, RING_BUFFER_SIZE, ThreadPoolHelper.threadFactory("AuthEvent"));

        authDisruptor.handleEventsWith(authHandler);
        authDisruptor.setDefaultExceptionHandler(AuthEventExceptionHandler.INSTANCE);
        return authDisruptor;
    }

    /** Creates the handlers for the first worker pool of the message pipeline. */
    private static ResponseHandler[] newResponseHandlers() {
        ResponseHandler[] pool = new ResponseHandler[POOL_HANDLER_COUNT];
        for (int slot = 0; slot < pool.length; slot++) {
            pool[slot] = new ResponseHandler();
        }
        return pool;
    }

    /** Creates the handlers for the second worker pool of the message pipeline. */
    private static DataHandler[] newDataHandlers() {
        DataHandler[] pool = new DataHandler[POOL_HANDLER_COUNT];
        for (int slot = 0; slot < pool.length; slot++) {
            pool[slot] = new DataHandler();
        }
        return pool;
    }
}

6. Thread factory utility class (ThreadPoolHelper) used by DisruptorFactory

package com.huwei.hotel.collector.contacter.infrastructure.helper;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 *  Thread Pool
 *
  * @author ljy
 * @date 2023/3/06
 **/
@Slf4j
public class ThreadPoolHelper {
    /**
     * thread factory
     *
     * @param preName
     * @return
     */
    public static ThreadFactory threadFactory(String preName) {
        BasicThreadFactory threadFactory = new BasicThreadFactory. Builder()
                .namingPattern(preName + "-Disruptor-%d")
                .daemon(true)
                .priority(Thread.NORM_PRIORITY)
                .uncaughtExceptionHandler(new Thread. UncaughtExceptionHandler() {
                    @Override
                    public void uncaughtException(Thread t, Throwable e) {
                        log.error(String.format("Create thread (%s) exception", t.getName()), e);
                    }
                })
                .build();
        return threadFactory;
    }

    /**
     * Thread Pool
     *
     * @param nThreads
     * @param preName
     * @return
     */
    public static ExecutorService executorService(int nThreads, String preName) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(nThreads,
                nThreads,
                60L,
                TimeUnit. SECONDS,
                new ArrayBlockingQueue<>(nThreads),
                threadFactory(preName));
        return executor;
    }
}

7. Consumers

package com.huwei.hotel.collector.contacter.interfaces.auth.event;

import com.huwei.hotel.collector.contacter.interfaces.userlog.service.UserLogService;
import com.huwei.hotel.common.enums.MqttAuthTypeEnum;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * Consumer for {@link AuthEvent}s. It implements both {@link WorkHandler} and
 * {@link EventHandler}, so it can be registered either in a worker pool or as
 * a plain event handler; both entry points run the same logic.
 *
 * <p>To be improved: it could process events in batches, following the
 * Disruptor's batch reading.
 *
 * @author ljy
 * @date 2023/3/06 14:24
 **/
@Slf4j
@Component
public class AuthHandler implements WorkHandler<AuthEvent>, EventHandler<AuthEvent> {
    @Autowired
    UserLogService userLogService;

    /**
     * {@link EventHandler} entry point; delegates to {@link #onEvent(AuthEvent)}.
     * {@code sequence} and {@code endOfBatch} are currently ignored.
     */
    @Override
    public void onEvent(AuthEvent event, long sequence, boolean endOfBatch) throws Exception {
        onEvent(event); // TODO: Batch processing is possible
    }

    /**
     * {@link WorkHandler} entry point: saves the authentication result through
     * {@code userLogService}, then clears the event whether or not the save
     * succeeded.
     *
     * @param event the event to record
     * @throws Exception whatever the save call throws
     */
    @Override
    public void onEvent(AuthEvent event) throws Exception {
        try {
            // Record authentication result
            userLogService.save(
                    event.getAuthName(),
                    event.getAuthType(),
                    event.getClientKey(),
                    event.getFailureReason());
        } finally {
            event.clear();
        }
    }
}

8. Call