Using event listeners to implement publish and subscribe in Spring Boot

The previous article showed how to use EventBus in Guava to implement publish and subscribe, which already decouples code well in ordinary projects. Spring provides a similar feature, so a Spring project does not need a third-party dependency for this. In Spring, messages are published mainly through the publishEvent() method of ApplicationContext (inherited from the ApplicationEventPublisher interface) and received through implementations of ApplicationListener. Here is how to use it:

First, add the Spring Boot dependencies:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.6.11</version>
    <relativePath/>
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.12</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.7</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>

Enable asynchronous execution on the main startup class with @EnableAsync. With it, a consumer annotated with @Async runs on a different thread from the producer and consumes messages asynchronously. Without it, @Async has no effect and the producer and consumer run on the same thread:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;

/**
 * @Author xingo
 * @Date 2023/11/1
 */
@SpringBootApplication
@EnableAsync
public class ProviderApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProviderApplication.class, args);
    }
}

The message produced by the producer can be encapsulated in an entity class. There are two kinds. The first is an ordinary POJO; this kind of message must be consumed through the @EventListener annotation. The second is a class that extends ApplicationEvent; this kind of message can be consumed either through the @EventListener annotation or by implementing the ApplicationListener interface. Create the two message carrier classes below:

import org.springframework.context.ApplicationEvent;

/**
 * Event entity class 1
 *
 * @Author xingo
 * @Date 2023/11/1
 */
public class MyEvent1 extends ApplicationEvent {

    private String message;

    public MyEvent1(Object source) {
        super(source);
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }
}

/**
 * Event entity class 2
 *
 * @Author xingo
 * @Date 2023/11/1
 */
public class MyEvent2 {

    private String message;

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }
}

There are two ways to consume messages. The first is to implement the ApplicationListener interface, specify the event type as the generic parameter, and receive and consume the message in the onApplicationEvent method:

import com.alibaba.fastjson.JSONObject;
import org.example.pojo.MyEvent1;
import org.springframework.context.ApplicationListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

/**
 * Event listener, approach 1
 *
 * @Author xingo
 * @Date 2023/11/1
 */
@Async
@Component
public class MyApplicationListener implements ApplicationListener<MyEvent1> {

    @Override
    public void onApplicationEvent(MyEvent1 event) {
        System.out.println("onApplicationEvent -> thread : " + Thread.currentThread().getName() + " | receive : " + JSONObject.toJSONString(event));
    }
}

The second way is to consume messages through the @EventListener annotation. Messages consumed this way do not need to extend ApplicationEvent:

import com.alibaba.fastjson.JSONObject;
import org.example.pojo.MyEvent1;
import org.example.pojo.MyEvent2;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

/**
 * Event listener, approach 2
 *
 * @Author xingo
 * @Date 2023/11/1
 */
@Component
public class MyEventListener {

    @Async
    @EventListener(MyEvent1.class)
    public void listener01(MyEvent1 event) {
        System.out.println("listener01 -> thread : " + Thread.currentThread().getName() + " | receive : " + JSONObject.toJSONString(event));
    }

    @Async
    @EventListener(MyEvent2.class)
    public void listener02(MyEvent2 event) {
        System.out.println("listener02 -> thread : " + Thread.currentThread().getName() + " | receive : " + JSONObject.toJSONString(event));
    }

    @EventListener(MyEvent1.class)
    public void listener03(MyEvent1 event) {
        System.out.println("listener03 -> thread : " + Thread.currentThread().getName() + " | receive : " + JSONObject.toJSONString(event));
    }

    @EventListener(MyEvent2.class)
    public void listener04(MyEvent2 event) {
        System.out.println("listener04 -> thread : " + Thread.currentThread().getName() + " | receive : " + JSONObject.toJSONString(event));
    }
}

The methods above consume asynchronously when @Async is added. Without this annotation, the consumer executes on the same thread as the producer.

The producer is relatively simple. It only needs to inject ApplicationContext into the class and send event messages through the publishEvent() method:

import org.example.pojo.ApiResult;
import org.example.pojo.MyEvent1;
import org.example.pojo.MyEvent2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Author xingo
 * @Date 2023/11/1
 */
@RestController
public class MessageController {

    @Autowired
    private ApplicationContext applicationContext;

    @GetMapping("/send/message")
    public ApiResult sendMessage(String message) {
        System.out.println("sendMessage -> thread : " + Thread.currentThread().getName() + " | message : " + message);

        MyEvent1 event1 = new MyEvent1(this);
        event1.setMessage(message);
        MyEvent2 event2 = new MyEvent2();
        event2.setMessage(message);

        applicationContext.publishEvent(event1);
        applicationContext.publishEvent(event2);
        return ApiResult.success(null);
    }
}

These simple steps implement event publishing and subscription within a single process. Start the application and call the /send/message endpoint.

The console prints the following:

sendMessage -> thread : http-nio-9523-exec-1 | message : hello,world
listener03 -> thread : http-nio-9523-exec-1 | receive : {"message":"hello,world","timestamp":1698824483483}
onApplicationEvent -> thread : task-1 | receive : {"message":"hello,world","timestamp":1698824483483}
listener04 -> thread : http-nio-9523-exec-1 | receive : {"message":"hello,world"}
listener01 -> thread : task-2 | receive : {"message":"hello,world","timestamp":1698824483483}
listener02 -> thread : task-3 | receive : {"message":"hello,world"}

The output shows that a method annotated with @Async handles the message on a new thread, while a method without the annotation handles it on the same thread that published it. To truly decouple the consumer from the producer, @Async on the listener method is necessary.
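
The same thread behaviour can be reproduced without Spring. The following is a minimal sketch in plain Java, not Spring's implementation: the class MiniEventBus and its methods subscribe, subscribeAsync, publish and shutdown are hypothetical names introduced only for illustration. A synchronous listener runs on the publisher's thread, while an asynchronous one is handed to an executor, which is roughly the effect @Async has on a listener method.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * Hypothetical mini event bus (illustration only, not Spring's implementation).
 */
class MiniEventBus {

    private final List<Consumer<String>> syncListeners = new CopyOnWriteArrayList<>();
    private final List<Consumer<String>> asyncListeners = new CopyOnWriteArrayList<>();
    private final ExecutorService executor = Executors.newFixedThreadPool(2);

    /** Registers a listener that runs on the publisher's thread. */
    void subscribe(Consumer<String> listener) {
        syncListeners.add(listener);
    }

    /** Registers a listener that runs on a pool thread. */
    void subscribeAsync(Consumer<String> listener) {
        asyncListeners.add(listener);
    }

    void publish(String event) {
        // Asynchronous listeners are handed to the executor and run on its threads
        for (Consumer<String> listener : asyncListeners) {
            executor.submit(() -> listener.accept(event));
        }
        // Synchronous listeners run right here, on the calling thread
        for (Consumer<String> listener : syncListeners) {
            listener.accept(event);
        }
    }

    /** Stops accepting work and waits up to five seconds for queued listeners to finish. */
    boolean shutdown() {
        executor.shutdown();
        try {
            return executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        MiniEventBus bus = new MiniEventBus();
        bus.subscribe(e -> System.out.println(
                "sync  -> thread : " + Thread.currentThread().getName() + " | receive : " + e));
        bus.subscribeAsync(e -> System.out.println(
                "async -> thread : " + Thread.currentThread().getName() + " | receive : " + e));
        bus.publish("hello,world");
        bus.shutdown();
    }
}
```

Running main prints one line from the calling thread and one from a pool thread; the order of the two lines is not guaranteed.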

Publish and subscribe implemented this way cannot replace a real message queue. First, the producers and consumers are in the same process and cannot be scaled out. The number of threads can be increased by configuring the thread pool, but that also adds load to the system. Second, because this mechanism is memory-based, it provides no persistence by default to ensure that messages are not lost. If the system crashes or restarts, unconsumed data may be lost. In that case, data consistency has to be ensured in the business logic.
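
The thread pool trade-off can be made concrete. The sketch below uses the JDK's ThreadPoolExecutor directly rather than Spring's configuration; the class name BoundedListenerPool and the pool sizes are assumptions chosen for illustration. A bounded queue combined with CallerRunsPolicy makes the publishing thread run the task itself once the pool is saturated, so overload slows the producer instead of growing memory without limit. Tasks still waiting in the queue exist only in memory and disappear if the process stops.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical bounded pool for listener tasks (illustrative values only).
 */
class BoundedListenerPool {

    /** Builds a pool with 2 core threads, at most 4 threads and room for 8 queued tasks. */
    static ThreadPoolExecutor newPool() {
        return new ThreadPoolExecutor(
                2, 4, 30, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(8),
                // When threads and queue are full, the submitting thread runs the task
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /** Submits the given number of listener tasks and returns how many completed. */
    static int dispatch(int count) {
        ThreadPoolExecutor pool = newPool();
        AtomicInteger handled = new AtomicInteger();
        for (int i = 0; i < count; i++) {
            pool.execute(handled::incrementAndGet);
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled.get();
    }

    public static void main(String[] args) {
        System.out.println("handled : " + dispatch(100));
    }
}
```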

The publish and subscribe model provided by the framework encapsulates the related functionality and suits a wider range of scenarios. Without the framework we can still implement the same functionality, although there is little point in doing so. For example, the same effect can be achieved with a blocking queue:

import com.alibaba.fastjson.JSONObject;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Implement publish and subscribe function based on blocking queue
 *
 * @Author wangxixin
 * @Date 2023/11/1
 */
public class MyPubSub {

    private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();

    /**
     * Publish event
     * @param event
     */
    public void publish(String event) {
        queue.add(event);
    }

    /**
     * Listen for events
     */
    public void listener() {
        new Thread(() -> {
            while (true) {
                try {
                    // take() blocks until an event is available
                    String take = queue.take();
                    System.out.println("rec : " + JSONObject.toJSONString(take));
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop listening
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }).start();
    }

    public static void main(String[] args) {
        MyPubSub myPubSub = new MyPubSub();
        myPubSub.listener();

        myPubSub.publish("Hello,world!");
    }
}
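
With one queue and one consumer thread, as in the class above, each message is taken by exactly one consumer. A minimal sketch of fan-out, where every subscriber receives every message, could give each subscriber its own queue. The class FanOutPubSub and its methods are hypothetical and only extend the idea above; they are not part of Spring or of the original example.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical fan-out publish and subscribe: one queue per subscriber.
 */
class FanOutPubSub {

    private final List<BlockingQueue<String>> queues = new CopyOnWriteArrayList<>();

    /** Registers a subscriber and returns the queue its messages arrive on. */
    BlockingQueue<String> subscribe() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queues.add(queue);
        return queue;
    }

    /** Copies the event into every subscriber's queue. */
    void publish(String event) {
        for (BlockingQueue<String> queue : queues) {
            queue.add(event);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        FanOutPubSub pubSub = new FanOutPubSub();
        BlockingQueue<String> first = pubSub.subscribe();
        BlockingQueue<String> second = pubSub.subscribe();

        pubSub.publish("Hello,world!");

        // Both subscribers see the same message
        System.out.println("first  rec : " + first.poll(1, TimeUnit.SECONDS));
        System.out.println("second rec : " + second.poll(1, TimeUnit.SECONDS));
    }
}
```

Subscribers that register after a message was published do not receive it, which mirrors the lack of persistence discussed earlier.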