Synchronizing a multi-level cache

There are three common ways to synchronize cache data:

Set a validity period (TTL): give each cache entry a validity period; the entry is automatically removed after expiration and reloaded on the next query

  • Advantages: simple and convenient
  • Disadvantages: poor timeliness; the cache may be inconsistent until the entry expires
  • Scenario: businesses with a low update frequency and low timeliness requirements
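This first strategy can be sketched as a read-through cache whose entries carry an expiry timestamp. The following is a minimal, hypothetical `TtlCache` (not from the original project); the `loader` function stands in for the database query that refreshes an expired entry:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public class TtlCache<K, V> {
    // Each cached value carries the timestamp at which it expires
    private static class Entry<V> {
        final V value;
        final long expiresAt;
        Entry(V value, long expiresAt) { this.value = value; this.expiresAt = expiresAt; }
    }

    private final Map<K, Entry<V>> store = new ConcurrentHashMap<>();
    private final long ttlMillis;
    private final Function<K, V> loader; // stands in for the database query

    public TtlCache(long ttlMillis, Function<K, V> loader) {
        this.ttlMillis = ttlMillis;
        this.loader = loader;
    }

    public V get(K key) {
        Entry<V> e = store.get(key);
        if (e == null || System.currentTimeMillis() >= e.expiresAt) {
            // Expired or missing: reload from the "database" and reset the TTL
            V fresh = loader.apply(key);
            store.put(key, new Entry<>(fresh, System.currentTimeMillis() + ttlMillis));
            return fresh;
        }
        return e.value; // may be stale until the TTL elapses
    }
}
```

Note that between a database update and the entry's expiry, `get` keeps returning the stale value; that window is exactly the "poor timeliness" listed above.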

Synchronous dual write: modify the cache directly in the same operation that modifies the database

  • Advantages: strong timeliness; cache and database stay strongly consistent
  • Disadvantages: code intrusion and high coupling
  • Scenario: cached data with high consistency and timeliness requirements
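A minimal sketch of the dual-write pattern (hypothetical names; the two maps stand in for the real database and for Redis). The key point is that the cache write happens in the same call as the database write, ideally inside the same transaction:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class DualWriteService {
    private final Map<Long, String> database = new ConcurrentHashMap<>(); // stands in for MySQL
    private final Map<Long, String> cache = new ConcurrentHashMap<>();    // stands in for Redis

    // In a real Spring service this method would typically be @Transactional
    // so the two writes succeed or fail together
    public void updateItem(Long id, String value) {
        database.put(id, value); // 1. write the database
        cache.put(id, value);    // 2. update the cache in the same operation
    }

    public String readItem(Long id) {
        // Reads are served from the cache first
        return cache.getOrDefault(id, database.get(id));
    }
}
```

The intrusion is visible: every write path in the business code must know about the cache, which is where the high coupling comes from.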

Asynchronous notification: emit an event notification when the database is modified; the relevant services listen for the notification and then update their cached data.

  • Advantages: low coupling; multiple cache services can be notified at the same time
  • Disadvantages: moderate timeliness; there may be transient inconsistencies
  • Scenario: moderate timeliness requirements, with multiple services that need to stay in sync

The asynchronous approach can be implemented with MQ or with Canal:

1) Asynchronous notification based on MQ:

How it works:

  • After the product service finishes modifying the data, it only needs to send a message to MQ.
  • The cache service listens for the MQ message and then updates the cache accordingly.

There is still a small amount of code intrusion.
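The MQ flow can be simulated in-process (all names hypothetical; a `BlockingQueue` stands in for the message broker): the product service publishes an update event after writing the database, and the cache service consumes the event and refreshes its cache.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

public class MqNotifyDemo {
    static class ItemEvent {
        final Long id;
        final String data;
        ItemEvent(Long id, String data) { this.id = id; this.data = data; }
    }

    // Stands in for an MQ topic such as "item.update"
    static final BlockingQueue<ItemEvent> broker = new ArrayBlockingQueue<>(16);
    // The cache service's local cache
    static final Map<Long, String> cache = new ConcurrentHashMap<>();

    // Product service: after updating the database, it only publishes an event.
    // This one line of publishing code is the "small amount of code intrusion".
    static void updateItem(Long id, String data) {
        // ... database write would happen here ...
        broker.offer(new ItemEvent(id, data));
    }

    // Cache service: consumes pending events and updates the cache
    static void drainEvents() {
        ItemEvent e;
        while ((e = broker.poll()) != null) {
            cache.put(e.id, e.data);
        }
    }
}
```

Between `updateItem` and `drainEvents` the cache is briefly stale, which is the "moderate timeliness" trade-off of the asynchronous approach.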

2) Notification based on Canal

How it works:

  • After the product service completes the product modification, the business logic ends there, with no extra code at all.
  • Canal monitors MySQL for changes and notifies the cache service as soon as a change is detected.
  • The cache service receives the Canal notification and updates the cache.

Zero code intrusion.

1. Getting to know Canal

Canal [kəˈnæl], meaning waterway/pipeline/ditch, is an open-source Alibaba project developed in Java. It provides incremental data subscription and consumption based on parsing the database's incremental logs. GitHub address: https://github.com/alibaba/canal

Canal is built on MySQL master-slave replication, which works as follows:

  • 1) The MySQL master writes data changes to its binary log (binlog); the recorded entries are called binary log events
  • 2) The MySQL slave copies the master's binary log events to its relay log
  • 3) The MySQL slave replays the events in the relay log, applying the data changes to its own data

Canal disguises itself as a MySQL slave node so that it can monitor the master's binlog changes. It then passes the change information on to Canal clients, which complete the synchronization of other data stores.
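The mechanism can be illustrated with a toy in-memory binlog (all names hypothetical): a "master" appends change events to an append-only log, and a fake "slave" playing Canal's role reads from its last offset and hands each event to a listener instead of replaying it into its own tables.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

public class BinlogDemo {
    static class Event {
        final Long id;
        final String value;
        Event(Long id, String value) { this.id = id; this.value = value; }
    }

    // The master's binary log: an append-only list of change events
    static final List<Event> binlog = new ArrayList<>();

    // Master side: every data change is appended to the binlog
    static void masterWrite(Long id, String value) {
        binlog.add(new Event(id, value));
    }

    // "Canal" side: behaves like a slave, reading events from its last
    // offset, but notifies a listener rather than replaying the changes
    static int offset = 0;

    static void pollAsFakeSlave(BiConsumer<Long, String> listener) {
        while (offset < binlog.size()) {
            Event e = binlog.get(offset++);
            listener.accept(e.id, e.value);
        }
    }
}
```

The real protocol is MySQL's binlog dump stream rather than a shared list, but the shape is the same: the business code never calls the listener; it only writes the database.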

2. Listening to Canal

Canal provides clients in many languages. When Canal detects a binlog change, it notifies its clients.

We can use the Java client provided by Canal to listen for these notifications and update the cache whenever a change message arrives.

Here, however, we will use the third-party open-source canal-starter client from GitHub. Address: https://github.com/NormanGyllenhaal/canal-client

It integrates with Spring Boot through auto-configuration, making it much simpler and easier to use than the official client.

2.1. Add the dependency:

<dependency>
    <groupId>top.javatool</groupId>
    <artifactId>canal-spring-boot-starter</artifactId>
    <version>1.2.1-RELEASE</version>
</dependency>

2.2. Write the configuration:

canal:
  destination: dcxuexi # canal destination (instance) name; must match the name set when installing Canal
  server: 192.168.150.101:11111 # canal server address

2.3. Modify the Item entity class

Map the Item class to the database table fields with the @Id, @Column, and other annotations:

package com.dcxuexi.item.pojo;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import org.springframework.data.annotation.Id;
import org.springframework.data.annotation.Transient;

import javax.persistence.Column;
import java.util.Date;

@Data
@TableName("tb_item")
public class Item {
    @TableId(type = IdType.AUTO)
    @Id
    private Long id;          // product id
    @Column(name = "name")
    private String name;      // product name
    private String title;     // product title
    private Long price;       // price (in cents)
    private String image;     // product image
    private String category;  // category name
    private String brand;     // brand name
    private String spec;      // specifications
    private Integer status;   // product status: 1 = normal, 2 = off the shelf
    private Date createTime;  // creation time
    private Date updateTime;  // update time
    @TableField(exist = false)
    @Transient
    private Integer stock;
    @TableField(exist = false)
    @Transient
    private Integer sold;
}

2.4. Writing a listener

Listen for Canal messages by writing a listener that implements the EntryHandler interface. Note two points:

  • The implementation class specifies the monitored table information through @CanalTable("tb_item")
  • The generic type of EntryHandler is the entity class corresponding to the table

package com.dcxuexi.item.canal;

import com.github.benmanes.caffeine.cache.Cache;
import com.dcxuexi.item.config.RedisHandler;
import com.dcxuexi.item.pojo.Item;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;

@CanalTable("tb_item")
@Component
public class ItemHandler implements EntryHandler<Item> {

    @Autowired
    private RedisHandler redisHandler;
    @Autowired
    private Cache<Long, Item> itemCache;

    @Override
    public void insert(Item item) {
        // Write the data to the JVM process cache
        itemCache.put(item.getId(), item);
        // Write the data to Redis
        redisHandler.saveItem(item);
    }

    @Override
    public void update(Item before, Item after) {
        // Write the data to the JVM process cache
        itemCache.put(after.getId(), after);
        // Write the data to Redis
        redisHandler.saveItem(after);
    }

    @Override
    public void delete(Item item) {
        // Remove the data from the JVM process cache
        itemCache.invalidate(item.getId());
        // Remove the data from Redis
        redisHandler.deleteItemById(item.getId());
    }
}

Here, the Redis operations are encapsulated in the RedisHandler object, a class we wrote earlier for cache preheating. Its content is as follows:

package com.dcxuexi.item.config;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.dcxuexi.item.pojo.Item;
import com.dcxuexi.item.pojo.ItemStock;
import com.dcxuexi.item.service.IItemService;
import com.dcxuexi.item.service.IItemStockService;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class RedisHandler implements InitializingBean {

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private IItemService itemService;
    @Autowired
    private IItemStockService stockService;

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public void afterPropertiesSet() throws Exception {
        // Initialize the cache
        // 1. Query product information
        List<Item> itemList = itemService.list();
        // 2. Put it into the cache
        for (Item item : itemList) {
            // 2.1. Serialize the item to JSON
            String json = MAPPER.writeValueAsString(item);
            // 2.2. Store it in Redis
            redisTemplate.opsForValue().set("item:id:" + item.getId(), json);
        }

        // 3. Query product inventory information
        List<ItemStock> stockList = stockService.list();
        // 4. Put it into the cache
        for (ItemStock stock : stockList) {
            // 4.1. Serialize the stock record to JSON
            String json = MAPPER.writeValueAsString(stock);
            // 4.2. Store it in Redis
            redisTemplate.opsForValue().set("item:stock:id:" + stock.getId(), json);
        }
    }

    public void saveItem(Item item) {
        try {
            String json = MAPPER.writeValueAsString(item);
            redisTemplate.opsForValue().set("item:id:" + item.getId(), json);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    public void deleteItemById(Long id) {
        redisTemplate.delete("item:id:" + id);
    }
}