Spring Boot + Disruptor: fast, high-concurrency processing that can support 6 million orders per second

1. Introduction to Disruptor

Disruptor is a high-performance queue developed by LMAX, a British foreign exchange trading company. It was originally built to address the latency of in-memory queues, which performance tests showed to be of the same order of magnitude as I/O operations.

A system built on Disruptor can process 6 million orders per second on a single thread. Disruptor gained industry attention after the team presented it at QCon in 2010.

Disruptor is an open source Java framework designed to achieve the highest possible throughput (TPS) and the lowest possible latency on the producer-consumer problem (PCP).

Functionally, Disruptor implements a queue, specifically a bounded one, so its natural application scenario is the producer-consumer model.

Disruptor is a key component of the LMAX online trading platform. The LMAX platform uses this framework to process orders at a speed of 6 million TPS. In addition to the financial field, Disruptor can be used in other general-purpose applications, where it can bring significant performance improvements.

In fact, Disruptor is not so much a framework as a design idea. For programs that involve concurrency, buffers, the producer-consumer model, or transaction processing, Disruptor offers a design approach that can greatly improve performance (TPS).

Disruptor’s GitHub homepage:

https://github.com/LMAX-Exchange/disruptor

2. Core concepts of Disruptor

Let’s start with the core concepts of Disruptor to understand how it works. Each concept introduced below is both a domain object and a core object in the code.

1. Ring Buffer

As the name suggests, this is a circular buffer. RingBuffer used to be the main object in Disruptor, but since version 3.0 its responsibilities have been reduced to storing and updating the data (events) exchanged through Disruptor. In some advanced scenarios, the Ring Buffer can be replaced entirely by a user-defined implementation.
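
To make the data structure concrete, here is a minimal sketch of a pre-allocated ring buffer in plain Java. It is not Disruptor's RingBuffer class: the name SimpleRing and its methods are hypothetical, and the sketch only shows how an ever-increasing sequence number can be mapped onto a fixed set of reusable slots, assuming the size is a power of 2.

```java
import java.util.function.Supplier;

/** Hypothetical minimal ring buffer; an illustration, not Disruptor's RingBuffer. */
final class SimpleRing<E> {
    private final Object[] slots;
    private final int mask;

    SimpleRing(int size, Supplier<E> factory) {
        // A power of 2 has exactly one bit set.
        if (size < 1 || Integer.bitCount(size) != 1) {
            throw new IllegalArgumentException("size must be a power of 2");
        }
        slots = new Object[size];
        mask = size - 1;
        // Pre-allocate every slot once; slots are reused, never replaced.
        for (int i = 0; i < size; i++) {
            slots[i] = factory.get();
        }
    }

    /** Maps a sequence number onto a slot index with a bit mask instead of a modulo. */
    int indexOf(long sequence) {
        return (int) (sequence & mask);
    }

    /** Returns the pre-allocated event object that belongs to the given sequence. */
    @SuppressWarnings("unchecked")
    E get(long sequence) {
        return (E) slots[indexOf(sequence)];
    }
}
```

In an 8-slot ring, sequences 3 and 11 land on the same slot, which is why a producer must never get a full lap ahead of the slowest consumer.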

2. Sequence

Disruptor numbers the data (events) exchanged through it with sequentially increasing sequence numbers, and events are always processed in increasing sequence order. A Sequence tracks the progress of a specific component (the RingBuffer or a Consumer).

Although an AtomicLong could also track progress, a dedicated Sequence class serves another purpose: it prevents CPU cache false sharing between different Sequences.

Note: This is one of the keys to Disruptor's high performance. False sharing is already covered extensively elsewhere, so I won’t go into detail here.
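
For a rough picture of the idea, the sketch below pads a counter with unused long fields so that two counters are unlikely to end up on the same cache line. It is an illustration under assumptions, not Disruptor's Sequence class: the class names are hypothetical, a 64-byte cache line is assumed, and the actual field layout is decided by the JVM.

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// Seven unused longs (56 bytes) intended to sit before the value.
class LeftPadding {
    long p1, p2, p3, p4, p5, p6, p7;
}

// The only field that is actually read and written.
class SequenceValue extends LeftPadding {
    volatile long value;
}

// Seven unused longs (56 bytes) intended to sit after the value.
class RightPadding extends SequenceValue {
    long p9, p10, p11, p12, p13, p14, p15;
}

/** Hypothetical padded counter; a sketch of the idea only. */
final class PaddedSequence extends RightPadding {
    private static final AtomicLongFieldUpdater<SequenceValue> UPDATER =
            AtomicLongFieldUpdater.newUpdater(SequenceValue.class, "value");

    PaddedSequence(long initial) {
        value = initial;
    }

    long get() {
        return value;
    }

    void set(long newValue) {
        value = newValue;
    }

    /** Atomically replaces the value if it still equals the expected one. */
    boolean compareAndSet(long expected, long newValue) {
        return UPDATER.compareAndSet(this, expected, newValue);
    }

    long incrementAndGet() {
        return UPDATER.incrementAndGet(this);
    }
}
```

With the padding in place, a consumer thread updating its own counter should not invalidate the cache line that holds another thread's counter, which a plain array of AtomicLong objects cannot promise.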

3. Sequencer

Sequencer is the true heart of Disruptor. This interface has two implementation classes, SingleProducerSequencer and MultiProducerSequencer, which define concurrency algorithms for fast and correct data transfer between producers and consumers.

4. Sequence Barrier

Holds a reference to the RingBuffer's main published Sequence and to the Sequences of the other Consumers that this Consumer depends on. The Sequence Barrier also defines the logic that determines whether there are more events for the Consumer to handle.
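
The decision a barrier makes can be sketched in a few lines of plain Java. This is a simplified illustration, not Disruptor's SequenceBarrier: SimpleBarrier is a hypothetical name, and AtomicLong stands in for Sequence. A consumer may only go as far as the slowest of the producer cursor and the consumers it depends on.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical barrier sketch; AtomicLong stands in for Disruptor's Sequence. */
final class SimpleBarrier {
    private final AtomicLong cursor;          // last sequence published by the producer
    private final AtomicLong[] dependencies;  // progress of the upstream consumers

    SimpleBarrier(AtomicLong cursor, AtomicLong... dependencies) {
        this.cursor = cursor;
        this.dependencies = dependencies.clone();
    }

    /** Highest sequence this consumer may safely process right now. */
    long highestAvailable() {
        long available = cursor.get();
        for (AtomicLong dependency : dependencies) {
            available = Math.min(available, dependency.get());
        }
        return available;
    }

    /** True if the consumer's next sequence is already safe to process. */
    boolean hasWork(long nextSequence) {
        return nextSequence <= highestAvailable();
    }
}
```

If the producer has published up to sequence 10 but an upstream consumer has only finished sequence 4, a dependent consumer sees 4 as its limit.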

5. Wait Strategy

Defines how the Consumer waits for the next event. (Note: Disruptor provides several strategies, each with different performance characteristics for different scenarios.)
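
The trade-off between strategies can be illustrated with two simplified variants in plain Java. These are sketches only, not Disruptor's implementations: the interface and class names are hypothetical, AtomicLong stands in for Sequence, and Thread.onSpinWait() requires Java 9 or later.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical interface; AtomicLong stands in for Disruptor's Sequence. */
interface SimpleWaitStrategy {
    /** Returns once cursor >= sequence, with the cursor value that was observed. */
    long waitFor(long sequence, AtomicLong cursor);
}

/** Lowest latency, but keeps one CPU core fully busy while waiting. */
final class BusySpinWait implements SimpleWaitStrategy {
    @Override
    public long waitFor(long sequence, AtomicLong cursor) {
        long available;
        while ((available = cursor.get()) < sequence) {
            Thread.onSpinWait(); // spin hint to the CPU, available since Java 9
        }
        return available;
    }
}

/** Slightly higher latency, but lets other threads run between checks. */
final class YieldingWait implements SimpleWaitStrategy {
    @Override
    public long waitFor(long sequence, AtomicLong cursor) {
        long available;
        while ((available = cursor.get()) < sequence) {
            Thread.yield();
        }
        return available;
    }
}
```

Disruptor itself ships strategies such as BlockingWaitStrategy (the one used in the demo later in this article), SleepingWaitStrategy, YieldingWaitStrategy, and BusySpinWaitStrategy, which trade CPU usage against latency in a similar way.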

6. Event

In the semantics of Disruptor, the data exchanged between producers and consumers is called events. It is not a specific type defined by Disruptor, but is defined and specified by the user of Disruptor.

7. EventProcessor

EventProcessor holds the Sequence of a specific Consumer and provides the event loop that calls the event-handling implementation.
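
One pass of such an event loop might look like the sketch below. It is a simplified illustration rather than Disruptor's own processor: SimpleProcessor and SimpleHandler are hypothetical names, AtomicLong stands in for Sequence, the slot array length is assumed to be a power of 2, and a real processor would wait on a barrier and repeat instead of returning.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical callback with the same shape as the handler used in the demo. */
interface SimpleHandler<E> {
    void onEvent(E event, long sequence, boolean endOfBatch);
}

/** Hypothetical processor sketch showing a single pass of the event loop. */
final class SimpleProcessor<E> {
    private final E[] slots;                                 // length must be a power of 2
    private final AtomicLong cursor;                         // highest published sequence
    private final AtomicLong sequence = new AtomicLong(-1);  // highest processed sequence
    private final SimpleHandler<E> handler;

    SimpleProcessor(E[] slots, AtomicLong cursor, SimpleHandler<E> handler) {
        this.slots = slots;
        this.cursor = cursor;
        this.handler = handler;
    }

    /** Handles everything published so far as one batch and returns the count. */
    int processAvailable() {
        long next = sequence.get() + 1;
        long available = cursor.get();
        int handled = 0;
        while (next <= available) {
            E event = slots[(int) (next & (slots.length - 1))];
            // endOfBatch is true only for the last event of this pass.
            handler.onEvent(event, next, next == available);
            next++;
            handled++;
        }
        if (handled > 0) {
            // Publish progress once per batch so producers can reuse the slots.
            sequence.set(available);
        }
        return handled;
    }

    long getSequence() {
        return sequence.get();
    }
}
```

Updating the consumer's own sequence once per batch, rather than once per event, keeps the number of cross-thread writes small.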

8. EventHandler

The event-handling interface defined by Disruptor. Users implement it to process events; it is the actual Consumer implementation.

9. Producer

The producer is the user code that calls Disruptor to publish events. Disruptor does not define a specific interface or type for it.

3. Case-demo

With the following 8 steps, you can get Disruptor up and running:

  1. Add pom.xml dependency

    <dependency>
        <groupId>com.lmax</groupId>
        <artifactId>disruptor</artifactId>
        <version>3.4.4</version>
    </dependency>
    
  2. Message body Model

    /**
     * Message body
     */
    @Data
    public class MessageModel {
        private String message;
    }
    
  3. Construct EventFactory

    public class HelloEventFactory implements EventFactory<MessageModel> {
        @Override
        public MessageModel newInstance() {
            return new MessageModel();
        }
    }
    
  4. Construct EventHandler (consumer)

    @Slf4j
    public class HelloEventHandler implements EventHandler<MessageModel> {
        @Override
        public void onEvent(MessageModel event, long sequence, boolean endOfBatch) {
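            try {
                //Sleep for 1000 ms so the log shows that the message is consumed asynchronously
                Thread.sleep(1000);
                log.info("Consumer processing message begins");
                if (event != null) {
                    log.info("The information consumed by the consumer is: {}", event);
                }
            } catch (InterruptedException e) {
                //Restore the interrupt flag so the consumer thread can shut down cleanly
                Thread.currentThread().interrupt();
                log.error("Consumer was interrupted while processing message", e);
            } catch (Exception e) {
                log.error("Consumer failed to process message", e);
            }
            log.info("Consumer processing message ends");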
        }
    }
    
  5. Construct BeanManager

    /**
     * Get the instantiated object
     */
    @Component
    public class BeanManager implements ApplicationContextAware {
    
        private static ApplicationContext applicationContext = null;
    
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
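            //applicationContext is a static field, so assign it through the class rather than through this
            BeanManager.applicationContext = applicationContext;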
        }
    
        public static ApplicationContext getApplicationContext() { return applicationContext; }
    
        public static Object getBean(String name) {
            return applicationContext.getBean(name);
        }
    
        public static <T> T getBean(Class<T> clazz) {
            return applicationContext.getBean(clazz);
        }
    }
    
  6. Construct MQManager

    @Configuration
    public class MQManager {
    
        @Bean("messageModel")
        public RingBuffer<MessageModel> messageModelRingBuffer() {
            //Define the thread pool for event processing. Disruptor runs consumer event handling on threads provided by a java.util.concurrent.ExecutorService.
            ExecutorService executor = Executors.newFixedThreadPool(2);
    
            //Specify event factory
            HelloEventFactory factory = new HelloEventFactory();
    
            //Specify the ring buffer size (number of event slots, not bytes). It must be a power of 2 so that the modulo operation can be replaced by a bit mask; Disruptor rejects other sizes with an IllegalArgumentException
            int bufferSize = 1024 * 256;
    
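            //Single-producer mode (ProducerType.SINGLE) for additional performance; use it only if exactly one thread publishes
            //Note: in Disruptor 3.4.x the Executor-based constructor is deprecated in favor of the ThreadFactory-based one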
            Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executor,
                    ProducerType.SINGLE, new BlockingWaitStrategy());
    
            //Set event business processor---consumer
            disruptor.handleEventsWith(new HelloEventHandler());
    
            //Start the disruptor thread
            disruptor.start();
    
            //Get the ringbuffer ring, used to receive events produced by the producer
            RingBuffer<MessageModel> ringBuffer = disruptor.getRingBuffer();
    
            return ringBuffer;
        }
    }
    
  7. Construct MqService and its implementation class (producer)

    public interface DisruptorMqService {
    
        /**
         * information
         * @param message
         */
        void sayHelloMq(String message);
    }
    
    @Slf4j
    @Service
    public class DisruptorMqServiceImpl implements DisruptorMqService {
    
        @Autowired
        private RingBuffer<MessageModel> messageModelRingBuffer;
    
    
        @Override
        public void sayHelloMq(String message) {
            log.info("record the message: {}",message);
            //Claim the sequence number of the next Event slot
            long sequence = messageModelRingBuffer.next();
            try {
                //Fill data into Event
                MessageModel event = messageModelRingBuffer.get(sequence);
                event.setMessage(message);
                log.info("Add message to message queue: {}", event);
            } catch (Exception e) {
                log.error("Failed to add event to messageModelRingBuffer: {}", e.getMessage(), e);
            } finally {
                //Publish the Event, activate the observer to consume, and pass the sequence to the consumer
                //Note that the last publish method must be placed in finally to ensure that it must be called; if the sequence of a request is not submitted, it will block subsequent publishing operations or other producers.
                messageModelRingBuffer.publish(sequence);
            }
        }
    }
    
  8. Construct test classes and methods

    @Slf4j
    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = DemoApplication.class)
    public class DemoApplicationTests {
    
        @Autowired
        private DisruptorMqService disruptorMqService;
        /**
         * The project uses Disruptor as a message queue internally
         * @throws Exception
         */
        @Test
        public void sayHelloMqTest() throws Exception{
            disruptorMqService.sayHelloMq("The message has arrived, Hello world!");
            log.info("Message queue has been sent");
            //Sleep for 2000 ms so the asynchronous consumer has time to finish before the test exits
            Thread.sleep(2000);
        }
    }
    

Test run results

2023-04-05 14:31:18.543 INFO 7274 --- [main] c.e.u.d.d.s.Impl.DisruptorMqServiceImpl: record the message: The message has arrived, Hello world!
2023-04-05 14:31:18.545 INFO 7274 --- [main] c.e.u.d.d.s.Impl.DisruptorMqServiceImpl: Add a message to the message queue: MessageModel(message=Message has arrived, Hello world!)
2023-04-05 14:31:18.545 INFO 7274 --- [main] c.e.utils.demo.DemoApplicationTests: The message queue has been sent
2023-04-05 14:31:19.547 INFO 7274 --- [pool-1-thread-1] c.e.u.d.disrupMq.mq.HelloEventHandler: Consumer processing message starts
2023-04-05 14:31:19.547 INFO 7274 --- [pool-1-thread-1] c.e.u.d.disrupMq.mq.HelloEventHandler: The information consumed by the consumer is: MessageModel(message=Message has arrived, Hello world!)
2023-04-05 14:31:19.547 INFO 7274 --- [pool-1-thread-1] c.e.u.d.disrupMq.mq.HelloEventHandler: Consumer processing message ends

4. Summary

In fact, the producer-consumer pattern is very common, and the effect above can easily be achieved with a conventional message queue. The difference is that Disruptor is an in-memory queue with a lock-free core design, which is why it is so efficient.
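
As a small illustration of what lock-free means here, the sketch below lets several producer threads claim unique sequence numbers with a compare-and-set loop instead of a lock. It is a simplified illustration, not Disruptor's sequencer: CasClaimer is a hypothetical name, and the capacity check that stops producers from overwriting slots that slow consumers have not yet read is left out.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical lock-free sequence claimer; omits the ring capacity check. */
final class CasClaimer {
    private final AtomicLong cursor = new AtomicLong(-1);

    /** Claims the next sequence; retries if another thread won the race. */
    long next() {
        long current;
        long next;
        do {
            current = cursor.get();
            next = current + 1;
        } while (!cursor.compareAndSet(current, next));
        return next;
    }
}
```

A thread that loses the race simply retries with the fresh value, so no thread is ever blocked while holding a lock.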
