Importing and exporting millions of records: here's how I do it

Blog: xiaohugg.top

Background

In real business systems there is a lot of data import and export, often at the scale of millions of rows. Without a performance-conscious design it is easy to trigger an OOM and bring the server down, which is why I wrote this article. Below is the core pseudo-code, offered as a starting point.

Functional design

Technology selection: springboot + mysql + thread pool + mybatisplus + aop + juc tools + easyExcel

Data export involves heavy IO, so doing it serially without a thread pool takes a long time. And because the data volume is large, querying it without pagination can overflow memory.

Thread pool thread configuration

This workload mixes IO-bound and CPU-bound phases, so the thread count is set to 2 * n + 1. This configuration is a reasonable ideal; for a real production service you have to tune it against the actual workload. If you are interested, the JD.com engineering blog has a detailed article on the topic: Jingdong explains the calculation of CPU core and thread number

static {
    CPU = Runtime.getRuntime().availableProcessors();
    N_THREAD = 2 * CPU + 1;
    EXECUTOR_SERVICE = Executors.newFixedThreadPool(N_THREAD);
}
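One caveat worth knowing: Executors.newFixedThreadPool backs its workers with an unbounded LinkedBlockingQueue, so under a flood of submitted batches the task queue itself can cause the very OOM we are trying to avoid. A bounded ThreadPoolExecutor with CallerRunsPolicy applies backpressure instead. A sketch, with an illustrative queue capacity of 1024 (the capacity and keep-alive values here are assumptions, not tuned numbers):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedPool {
    public static ThreadPoolExecutor create() {
        int cpu = Runtime.getRuntime().availableProcessors();
        int nThreads = 2 * cpu + 1;
        return new ThreadPoolExecutor(
                nThreads, nThreads,
                60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1024),           // bounded queue: backpressure instead of OOM
                new ThreadPoolExecutor.CallerRunsPolicy() // overflow runs on the submitter, throttling it
        );
    }

    public static int runTasks(int n) throws InterruptedException {
        ExecutorService pool = create();
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < n; i++) {
            pool.execute(done::incrementAndGet);
        }
        pool.shutdown();
        pool.awaitTermination(30, TimeUnit.SECONDS);
        return done.get();
    }
}
```

With CallerRunsPolicy, when the queue is full the submitting thread executes the task itself, which naturally slows down submission instead of dropping work or growing memory without bound.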

Data generation

  • Create table statement
CREATE TABLE `employees1` (
  `id` int NOT NULL AUTO_INCREMENT,
  `name` varchar(24) NOT NULL DEFAULT '' COMMENT 'name',
  `age` int NOT NULL DEFAULT '0' COMMENT 'age',
  `position` varchar(20) NOT NULL DEFAULT '' COMMENT 'position',
  `hire_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'hire time',
  PRIMARY KEY (`id`),
  KEY `idx_name_age_position` (`name`,`age`,`position`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=1800001 DEFAULT CHARSET=utf8mb3 COMMENT='Employee Record Table';

You can generate 1 million rows of random data in batches directly from the mysql client.

  • Generating the data in code (finishes in seconds)
@LogExecutionTime
    public void generateData(Integer number) throws InterruptedException {
        //Less than 100K? Default to 100K
        number = number < 100000 ? 100000 : number;
        //Each thread handles 10K rows by default
        int group = number / 10000;
        CountDownLatch countDownLatch = new CountDownLatch(group);
        otherThreadBuildData(group, countDownLatch);
        countDownLatch.await();
    }

    private void otherThreadBuildData(int group, CountDownLatch countDownLatch) {
        for (int i = 0; i < group; i++) {
            EXECUTOR_SERVICE.execute(() -> {
                try {
                    List<Employees> read = new ArrayList<>();
                    for (int j = 0; j < 10000; j++) {
                        Employees employees = Employees.create("wang" + j, j, j + "");
                        read.add(employees);
                        if (read.size() >= 2000) {
                            //Read/write separation: the reading thread keeps building rows
                            //while a writing thread flushes the batch to the database
                            final List[] writers = new List[]{new ArrayList<>(read)};
                            EXECUTOR_SERVICE.execute(() -> {
                                //todo the transaction cannot take effect here; call through a proxy class
                                saveBatch(writers[0], writers[0].size());
                                //help gc
                                writers[0] = null;
                            });
                            read.clear();
                        }
                    }
                } finally {
                    //count down exactly once per worker, after its whole loop
                    countDownLatch.countDown();
                }
            });
        }
    }
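The fork/await structure can be reduced to plain JDK. In this sketch a hypothetical counter stands in for saveBatch, and the latch is counted down exactly once per worker, in a finally after that worker's whole loop (counting down inside the inner loop would release the main thread too early):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class GenerateSketch {
    // Hypothetical stand-in for saveBatch: just counts the rows it "persists".
    static final AtomicInteger SAVED = new AtomicInteger();

    public static int generate(int total, int perThread, int batchSize) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        int groups = total / perThread;
        CountDownLatch latch = new CountDownLatch(groups);
        for (int g = 0; g < groups; g++) {
            pool.execute(() -> {
                try {
                    List<String> buffer = new ArrayList<>();
                    for (int j = 0; j < perThread; j++) {
                        buffer.add("row" + j);
                        if (buffer.size() >= batchSize) {
                            SAVED.addAndGet(buffer.size()); // "flush" the batch
                            buffer.clear();
                        }
                    }
                    SAVED.addAndGet(buffer.size()); // flush any trailing partial batch
                } finally {
                    latch.countDown(); // exactly once per worker
                }
            });
        }
        latch.await(); // main thread blocks until every worker is done
        pool.shutdown();
        return SAVED.get();
    }
}
```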

The main idea is read/write separation to improve concurrent throughput: reading threads keep building rows while writing threads flush batches to the database. After a writing thread flushes its batch, it explicitly sets the reference to null (imitating a trick from the HashMap source) to help the gc.

Attention here!!! saveBatch must be called through a proxy class, not through this, otherwise the transaction will not take effect. I am only simulating here.
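The reason a `this.saveBatch(...)` call loses the transaction is that Spring's @Transactional advice lives on a proxy object, and an internal call through `this` never passes through that proxy. A toy JDK dynamic proxy (not Spring itself, just an illustration of the mechanism) makes the effect visible:

```java
import java.lang.reflect.Proxy;
import java.util.concurrent.atomic.AtomicInteger;

public class ProxyDemo {
    interface Repo { void saveBatch(); void saveAllViaThis(); }

    static final AtomicInteger TX_OPENED = new AtomicInteger();

    static class RepoImpl implements Repo {
        public void saveBatch() { /* pretend to write rows */ }
        // Internal call: goes through `this`, NOT through the proxy,
        // so the "transaction" interceptor below never sees it.
        public void saveAllViaThis() { this.saveBatch(); }
    }

    static Repo proxied(Repo target) {
        return (Repo) Proxy.newProxyInstance(
                Repo.class.getClassLoader(), new Class<?>[]{Repo.class},
                (p, method, args) -> {
                    TX_OPENED.incrementAndGet(); // stand-in for "open a transaction"
                    return method.invoke(target, args);
                });
    }
}
```

In real Spring code the usual fixes are injecting the bean into itself, or enabling exposeProxy and calling through AopContext.currentProxy().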

The countDownLatch coordinates the asynchronous threads: the main thread blocks until every worker has finished, and is then woken up.

With adequate hardware, generation can basically be kept within seconds.

Export

Method 1

  • Use a ConcurrentHashMap so threads can safely deposit their results
  • Asynchronous threads query the data page by page and build it

    – 20 sheets are created by default

    – Each asynchronous thread handles one sheet and queries its data by page

    – At deep pages the OFFSET is large, so pagination itself becomes a performance problem. Use a MyBatis streaming query, or custom SQL with an inner join over a covering index, for example:

    <select> SELECT id, name FROM user WHERE id > #{param1} LIMIT 0, #{param2} </select>

  • CompletableFuture blocks the main thread until all pages are loaded
  • The main thread then writes the collected map to excel, single-threaded
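For the keyset-style `WHERE id > #{param1} LIMIT ...` query, each worker needs to know its starting id. Under the loud assumption of contiguous auto-increment ids starting at 1 (illustration only; real code should track the last id each query actually returned), the per-sheet bounds can be computed like this:

```java
import java.util.ArrayList;
import java.util.List;

public class KeysetPages {
    /** Parameters for one "WHERE id > afterId LIMIT limit" query. */
    public static class PageBound {
        public final long afterId;
        public final long limit;
        PageBound(long afterId, long limit) { this.afterId = afterId; this.limit = limit; }
    }

    // Assumes contiguous ids 1..totalRows -- an illustration-only simplification.
    public static List<PageBound> split(long totalRows, int pages) {
        long size = totalRows / pages;
        List<PageBound> bounds = new ArrayList<>(pages);
        for (int i = 0; i < pages; i++) {
            // the last page absorbs the remainder of the integer division
            long limit = (i == pages - 1) ? totalRows - (long) i * size : size;
            bounds.add(new PageBound((long) i * size, limit));
        }
        return bounds;
    }
}
```

Unlike OFFSET, the `id > afterId` predicate lets MySQL seek straight into the index, so page 20 costs the same as page 1.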

easyExcel does not support multi-threaded writes to sheets; for details see the official issue: easyExcel issue

@Override
    @LogExecutionTime
    @SneakyThrows
    public void exportData(HttpServletRequest request, HttpServletResponse response) {
        //Build the response object
        generateExcelHead(response);

        Map<Integer, List<Employees>> map = new ConcurrentHashMap<>();

        //Asynchronous threads query the data page by page
        List<CompletableFuture<Void>> futures = otherThreadBuildData(map);

        CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();

        //The main thread writes to excel in one pass
        masterThreadExportExcel(response, map);
    }

    private List<CompletableFuture<Void>> otherThreadBuildData(Map<Integer, List<Employees>> map) {
        //Count the rows in the database
        Long count = employeesMapper.selectCount(null);
        //Split into 20 sheets by default
        int page = 20;
        long size = count / page;
        List<CompletableFuture<Void>> futures = new ArrayList<>(page);
        for (int i = 0; i < page; i++) {
            int finalI = i;
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                Page<Employees> employeesPage = new Page<>(finalI + 1L, size);
                IPage<Employees> selectedPage = employeesMapper.selectPage(employeesPage, null);
                map.put(finalI, selectedPage.getRecords());
            }, EXECUTOR_SERVICE);
            futures.add(future);
        }
        return futures;
    }

    @SneakyThrows
    private static void masterThreadExportExcel(HttpServletResponse response, Map<Integer, List<Employees>> map) {
        try (ExcelWriter excelWriter = EasyExcelFactory.write(response.getOutputStream(), Employees.class).build()) {
            //Write to excel. Excel writing cannot be multi-threaded; see the easyexcel issue for details.
            for (Map.Entry<Integer, List<Employees>> entry : map.entrySet()) {
                Integer sheetNo = entry.getKey();
                List<Employees> employees = entry.getValue();
                WriteSheet writeSheet = EasyExcelFactory
                        .writerSheet(sheetNo, "template" + sheetNo)
                        .build();
                excelWriter.write(employees, writeSheet);
                logger.info("Writing sheet {} to excel", sheetNo);
            }
        }
    }
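Stripped of MyBatis and EasyExcel, the fan-out/fan-in skeleton of method 1 looks like this (fetchPage is a hypothetical stand-in for the selectPage query):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class FanOutFanIn {
    // Hypothetical stand-in for employeesMapper.selectPage: fabricates one page of rows.
    static List<String> fetchPage(int pageNo, int size) {
        return IntStream.range(0, size).mapToObj(i -> "p" + pageNo + "r" + i).collect(Collectors.toList());
    }

    public static Map<Integer, List<String>> loadAll(int pages, int size) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        Map<Integer, List<String>> map = new ConcurrentHashMap<>();
        List<CompletableFuture<Void>> futures = new ArrayList<>(pages);
        for (int i = 0; i < pages; i++) {
            int finalI = i;
            // fan-out: each future loads one page into the shared concurrent map
            futures.add(CompletableFuture.runAsync(() -> map.put(finalI, fetchPage(finalI, size)), pool));
        }
        // fan-in: block until every page is collected, then the caller writes sequentially
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        pool.shutdown();
        return map;
    }
}
```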

Exporting 2 million rows averages roughly 10-15s, and the data exports to excel correctly.

Method 2

  • Use a blocking queue to form a publish-subscribe model: producers produce pages and the consumer consumes them immediately.
  • Decoupled, and the consumer does not have to wait for all futures to complete first.
@LogExecutionTime
    @SneakyThrows
    public void exportData(HttpServletRequest request, HttpServletResponse response) {
        //Build the response object
        generateExcelHead(response);

        BlockingQueue<List<Employees>> queue = new ArrayBlockingQueue<>(20);
        //Asynchronous threads query the data page by page
        otherThreadBuildData(queue);

        //The main thread writes to excel as pages arrive
        masterThreadExportExcel(response, queue);
    }

    private void otherThreadBuildData(BlockingQueue<List<Employees>> queue) {
        //Count the rows in the database
        Long count = employeesMapper.selectCount(null);
        //Split into 20 sheets by default
        int page = 20;
        long size = count / page;
        for (int i = 0; i < page; i++) {
            int finalI = i;
            CompletableFuture.runAsync(() -> {
                Page<Employees> employeesPage = new Page<>(finalI + 1L, size);
                IPage<Employees> selectedPage = employeesMapper.selectPage(employeesPage, null);
                try {
                    queue.put(selectedPage.getRecords());
                } catch (InterruptedException e) {
                    logger.error("Failed to enqueue page: {}", e.getMessage());
                    //restore the interrupt flag
                    Thread.currentThread().interrupt();
                }
            }, EXECUTOR_SERVICE);
        }
    }

    @SneakyThrows
    private void masterThreadExportExcel(HttpServletResponse response, BlockingQueue<List<Employees>> queue) {
        try (ExcelWriter excelWriter = EasyExcelFactory.write(response.getOutputStream(), Employees.class).build()) {
            //Take exactly as many pages as the producers enqueue (20 sheets).
            //Looping on !queue.isEmpty() would end early whenever the consumer
            //outruns the producers and finds the queue momentarily empty.
            for (int sheetNo = 0; sheetNo < 20; sheetNo++) {
                List<Employees> employeesList = queue.take();
                if (!CollectionUtils.isEmpty(employeesList)) {
                    //Write to excel. Excel writing cannot be multi-threaded; see the easyexcel issue for details.
                    WriteSheet writeSheet = EasyExcelFactory
                            .writerSheet(sheetNo, "template" + sheetNo)
                            .build();
                    excelWriter.write(employeesList, writeSheet);
                    logger.info("Writing sheet {} to excel", sheetNo);
                }
            }
        }
    }
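When the number of pages is not known to the consumer in advance, a common way to end the consumer loop without guessing from `queue.isEmpty()` is a "poison pill" sentinel: each producer enqueues a marker when it finishes, and the consumer stops after seeing one marker per producer. A minimal pure-JDK sketch (the fabricated one-element pages are placeholders):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PoisonPillDemo {
    // Sentinel compared by identity: this exact empty list means "a producer is done".
    static final List<Integer> POISON = new ArrayList<>();

    public static int consumeAll(int producers, int pagesPerProducer) throws InterruptedException {
        BlockingQueue<List<Integer>> queue = new ArrayBlockingQueue<>(8);
        ExecutorService pool = Executors.newFixedThreadPool(producers);
        for (int p = 0; p < producers; p++) {
            pool.execute(() -> {
                try {
                    for (int i = 0; i < pagesPerProducer; i++) {
                        queue.put(List.of(i)); // a fabricated "page" of rows
                    }
                    queue.put(POISON); // tell the consumer this producer is finished
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int pagesWritten = 0, finished = 0;
        while (finished < producers) {       // terminate on sentinels, not on queue.isEmpty()
            List<Integer> page = queue.take();
            if (page == POISON) { finished++; } else { pagesWritten++; }
        }
        pool.shutdown();
        return pagesWritten;
    }
}
```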

Import

Multi-threaded import across multiple sheets; the prerequisite is that the excel file actually contains multiple sheets.

  • Get the number of sheets in the excel file
  • Each thread processes one sheet's data
  • Rows are received through easyExcel's listener mechanism
  • Each sheet read is wrapped in a Callable
  • The thread pool schedules them all together
  • The listener accumulates rows into a list; since that list must not be shared across threads, each thread gets its own via a ThreadLocal
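The ThreadLocal idea in the last bullet can be shown without EasyExcel: each worker thread accumulates rows into its own private list, so no locking is needed while collecting. A sketch under that assumption:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ThreadLocalBuffer {
    // Each worker thread gets its own row buffer; no synchronization needed to append.
    static final ThreadLocal<List<String>> BUFFER = ThreadLocal.withInitial(ArrayList::new);

    public static int totalRows(int threads, int rowsEach) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        List<Future<Integer>> results = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            results.add(pool.submit(() -> {
                List<String> mine = BUFFER.get();   // this thread's private list
                for (int i = 0; i < rowsEach; i++) mine.add("row" + i);
                int n = mine.size();
                BUFFER.remove();                    // avoid leaks on pooled threads
                return n;
            }));
        }
        int sum = 0;
        for (Future<Integer> f : results) sum += f.get();
        pool.shutdown();
        return sum;
    }
}
```

Calling BUFFER.remove() at the end of each task matters on pooled threads: the thread is reused, so a stale buffer would otherwise survive into the next task.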

@LogExecutionTime
    @SneakyThrows
    public void importExcel(MultipartFile file) {
        //Get the total number of sheets in the excel file
        int size = listSheet(file.getInputStream());
        importExcelAsync(file, size);
    }

    public void importExcelAsync(MultipartFile file, int number) {
        List<Callable<Object>> tasks = new ArrayList<>();
        for (int i = 0; i < number; i++) {
            int num = i;
            tasks.add(() -> {
                EasyExcelFactory.read(file.getInputStream(), Employees.class, indexListener)
                        .sheet(num).doRead();
                return null;
            });
        }
        try {
            EXECUTOR_SERVICE.invokeAll(tasks);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void generateExcelHead(HttpServletResponse response) throws UnsupportedEncodingException {
        response.setContentType("application/vnd.ms-excel");
        response.setCharacterEncoding("utf-8");
        String fileName = URLEncoder.encode("Test multi-threaded sheet export", "UTF-8") + "_" + System.currentTimeMillis();
        response.setHeader("Content-disposition", "attachment;filename=" + fileName + ".xlsx");
    }

    public int listSheet(InputStream inputStream) {
        if (inputStream == null) {
            throw new IllegalArgumentException("inputStream is null");
        }
        try (ExcelReader build = EasyExcelFactory.read(inputStream).build()) {
            return build.excelExecutor().sheetList().size();
        }
    }

Importing 1 million rows takes about 50 seconds.

End

The code above is deliberately simple. Real scenarios are never this clean: they need all kinds of cleaning, conversion and ETL. Technology always serves the business; this is just meant to offer an approach. If you have a better one, suggestions and discussion are welcome~~

Detailed source code address: https://gitee.com/xiaohu88/export