Meetup Review | Data Infra Research Society, Issue 16 (slides and materials included)

This article is compiled from the 16th Data Infra session held last Saturday (October 21). In this session, Databend R&D engineer Wang Xudong gave a talk titled “Databend hash join spill design and implementation”. Let's review it together.

The following are the relevant videos, materials and texts of this event:

Through this sharing, we can better understand the design and implementation of Databend’s hash join spill, and learn how to use the spill function.

The replay of this event is also available on Bilibili:

Databend hash join spill design and implementation | Data Infra Research Institute Issue 16

“Databend hash join spill design and implementation”

The lecture notes and related materials for this event can be found in the PDF file of Data Infra Issue 16: https://github.com/databendcn/data-infra/tree/main/Issue 16-20231021

Hash join design under pipeline architecture

The left side is a typical two-table join plan. The pipeline builder will generate the pipeline on the right, including the main pipeline and a sub-pipeline (build pipeline).

The probe pipeline and the build pipeline are connected through a bridge structure, which stores the hash table and some state shared by build and probe. After the build side of the hash join generates the hash table, the probe side uses it through the bridge.

Hash join is multi-threaded. Assume the build side has N threads and the probe side has M threads. The probe has to wait for the build to complete before it can start. Because the two pipelines start at the same time, we cannot tell whether build or probe threads arrive first. All probe threads may arrive before the build threads, or the two may interleave, so a probe thread that arrives early needs an asynchronous wait state.

The most intuitive idea is to use Notify to control the waiting between build and probe. Because execution is multi-threaded, notify_waiters() is the natural candidate. However, Notify does not know in advance how many waiters there will be; it only wakes up waiters that have already registered, and in the build/probe model there is no suitable place to register them ahead of time. So instead of Notify, we use tokio’s watch channel to handle the multi-threaded model of hash join.

The initial value in the channel is 0. When the build side is done, the last build thread sends 1 into the channel to wake up all probe threads. When a probe thread starts waiting for the build, it subscribes to the watch channel and gets a receiver. If the value is already 1 at that point, it can probe right away. Otherwise, it waits for the value in the channel to change, which happens when the last build thread writes 1.

pub async fn wait_first_round_build_done(&self) -> Result<()> {
    let mut rx = self.build_done_watcher.subscribe();
    if *rx.borrow() == 1_u8 {
        return Ok(());
    }
    rx.changed()
        .await
        .map_err(|_| ErrorCode::TokioError("build_done_watcher's sender is dropped"))?;
    debug_assert!(*rx.borrow() == 1_u8);
    Ok(())
}
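
The function above only shows the probe side of the protocol. As a rough illustration of the whole handshake, including "the last build thread writes 1", here is a minimal blocking analogue built on the standard library's Mutex and Condvar instead of tokio's watch channel. The names BuildDone, build_thread_finished, wait_build_done and run_demo are hypothetical and are not taken from Databend; the real code waits asynchronously rather than blocking a thread.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical stand-in for the state shared through the bridge.
pub struct BuildDone {
    remaining_builders: AtomicUsize,
    done: Mutex<u8>, // 0 = still building, 1 = hash table is ready
    changed: Condvar,
}

impl BuildDone {
    /// Assumes `build_threads >= 1`; with 0 nobody would ever flip the flag.
    pub fn new(build_threads: usize) -> Self {
        BuildDone {
            remaining_builders: AtomicUsize::new(build_threads),
            done: Mutex::new(0),
            changed: Condvar::new(),
        }
    }

    /// Called by every build thread when it finishes.
    /// Only the last one writes 1 and wakes all waiting probe threads.
    pub fn build_thread_finished(&self) {
        if self.remaining_builders.fetch_sub(1, Ordering::AcqRel) == 1 {
            *self.done.lock().unwrap() = 1;
            self.changed.notify_all();
        }
    }

    /// Called by every probe thread. Returns at once if the value is already 1,
    /// so a probe thread that arrives late does not miss the wake-up.
    pub fn wait_build_done(&self) {
        let mut done = self.done.lock().unwrap();
        while *done == 0 {
            done = self.changed.wait(done).unwrap();
        }
    }
}

/// Starts the probe threads first, then the build threads, and returns
/// how many probe threads got past the wait.
pub fn run_demo(build_threads: usize, probe_threads: usize) -> usize {
    let state = Arc::new(BuildDone::new(build_threads));
    let passed = Arc::new(AtomicUsize::new(0));
    let mut handles = Vec::new();
    for _ in 0..probe_threads {
        let (state, passed) = (Arc::clone(&state), Arc::clone(&passed));
        handles.push(thread::spawn(move || {
            state.wait_build_done();
            passed.fetch_add(1, Ordering::SeqCst);
        }));
    }
    for _ in 0..build_threads {
        let state = Arc::clone(&state);
        handles.push(thread::spawn(move || state.build_thread_finished()));
    }
    for handle in handles {
        handle.join().unwrap();
    }
    passed.load(Ordering::SeqCst)
}
```

The key property is the same as with the watch channel: the readiness value is stored, not just signalled, so the order in which build and probe threads arrive does not matter.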

With the interaction between build and probe sorted out, take a look at the states of the build side. When spill is not considered, the state machine is relatively simple, with only three steps: Running, Finalize, and FastReturn (the enum below is the full definition, which also contains the spill-related steps covered later). Different steps correspond to different events and trigger different behaviors, some asynchronous and some synchronous. Heavier I/O is asynchronous, and so is waiting between threads. For example, a thread has to wait for all threads to complete the Running step (that is, until all data has been collected) before it can finalize.

enum HashJoinBuildStep {
    // The running step of the build phase.
    Running,
    // The finalize step is waiting all build threads to finish and build the hash table.
    Finalize,
    // The fast return step indicates there is no data in build side,
    // so we can directly finish the following steps for hash join and return empty result.
    FastReturn,
    // Wait to spill
    WaitSpill,
    // Start the first spill
    FirstSpill,
    // Following spill after the first spill
    FollowSpill,
    // Wait probe
    WaitProbe,
    // The whole build phase is finished.
    Finished,
}

First, all threads start running and enter the first step, Running. This step mainly collects input data into chunks. After a thread completes its current task, it has to wait for the others to finish; a Barrier synchronization structure can be used here. The last thread is responsible for dividing the finalize tasks and initializing the hash table. After that, all threads enter the Finalize step and write to the hash table in parallel.
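
The Running and Finalize steps can be sketched with std::sync::Barrier. This is only a simplified illustration under several assumptions: the function build_hash_table is hypothetical, rows are plain u64 keys, the "hash table" is a mutex-protected HashMap that counts keys (so writes are serialized, unlike a real parallel build), and the thread that divides the tasks is whichever one the barrier reports as leader.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Barrier, Mutex};
use std::thread;

/// One input vector per build thread; returns key -> number of occurrences.
pub fn build_hash_table(inputs: Vec<Vec<u64>>) -> HashMap<u64, usize> {
    let n = inputs.len();
    if n == 0 {
        return HashMap::new();
    }
    let barrier = Arc::new(Barrier::new(n));
    let rows = Arc::new(Mutex::new(Vec::<u64>::new()));
    let tasks = Arc::new(Mutex::new(Vec::<(usize, usize)>::new()));
    let table = Arc::new(Mutex::new(HashMap::<u64, usize>::new()));

    let handles: Vec<_> = inputs
        .into_iter()
        .enumerate()
        .map(|(id, input)| {
            let barrier = Arc::clone(&barrier);
            let rows = Arc::clone(&rows);
            let tasks = Arc::clone(&tasks);
            let table = Arc::clone(&table);
            thread::spawn(move || {
                // Running: collect this thread's input into the shared buffer.
                rows.lock().unwrap().extend(input);

                // Wait for every thread to finish collecting. Exactly one
                // thread is the leader; it divides the finalize tasks and
                // initializes the hash table.
                if barrier.wait().is_leader() {
                    let total = rows.lock().unwrap().len();
                    let per_task = (total + n - 1) / n;
                    let mut task_list = tasks.lock().unwrap();
                    for i in 0..n {
                        let start = (i * per_task).min(total);
                        let end = ((i + 1) * per_task).min(total);
                        task_list.push((start, end));
                    }
                    table.lock().unwrap().reserve(total);
                }
                // Second rendezvous: nobody finalizes before the tasks exist.
                barrier.wait();

                // Finalize: each thread inserts its own range of rows.
                let (start, end) = tasks.lock().unwrap()[id];
                let slice: Vec<u64> = rows.lock().unwrap()[start..end].to_vec();
                let mut guard = table.lock().unwrap();
                for key in slice {
                    *guard.entry(key).or_insert(0) += 1;
                }
            })
        })
        .collect();

    for handle in handles {
        handle.join().unwrap();
    }
    let result = table.lock().unwrap().clone();
    result
}
```

Calling build_hash_table(vec![vec![1, 2, 3], vec![2, 3], vec![3]]) runs three build threads and yields a table in which key 3 has been inserted three times.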

FastReturn is a fast path. If the build side data is empty, then for some specific join types, the probe can return directly without the need to probe an empty hash table.

Next, look at the states of the probe side:

enum HashJoinProbeStep {
    // The step is to wait build phase finished.
    WaitBuild,
    // The running step of the probe phase.
    Running,
    // The final scan step is used to fill missing rows for non-inner join.
    FinalScan,
    // The fast return step indicates we can directly finish the probe phase.
    FastReturn,
    // Spill step is used to spill the probe side data.
    Spill,
    // Async running will read the spilled data, then go to probe
    AsyncRunning,
}

The first step is the one mentioned earlier: waiting for the build phase. Once it completes, the probe enters the Running step. After all threads have finished probing, non-inner joins need a FinalScan to fill in NULLs for the missing rows.

Spiller module design

Spiller is a relatively independent module, which means it is not limited to a certain operator. All operators with spill requirements can use the Spiller module to complete spill operations.

Specifically, the spiller is responsible for the following tasks:

  1. Collect data that needs to be spilled
  2. Partition the data that needs to be spilled
  3. Serialize and deserialize data
  4. Read and write interaction with storage

Each partition has a file list, and the corresponding files are written to storage through OpenDAL.
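
To make the four responsibilities and the per-partition file list concrete, here is a minimal sketch. It is not Databend's Spiller: the struct layout, the file naming scheme, the use of DefaultHasher, and the in-memory HashMap that stands in for object storage (instead of OpenDAL) are all assumptions made for illustration, and rows are reduced to u64 keys.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::{BTreeMap, HashMap};
use std::hash::{Hash, Hasher};

/// Hypothetical spiller for rows that are just u64 keys.
pub struct Spiller {
    num_partitions: u8,
    /// partition id -> names of the files written for that partition
    pub partition_files: BTreeMap<u8, Vec<String>>,
    /// in-memory stand-in for object storage: file name -> bytes
    storage: HashMap<String, Vec<u8>>,
    next_file_id: usize,
}

impl Spiller {
    pub fn new(num_partitions: u8) -> Self {
        assert!(num_partitions > 0, "at least one partition is required");
        Spiller {
            num_partitions,
            partition_files: BTreeMap::new(),
            storage: HashMap::new(),
            next_file_id: 0,
        }
    }

    /// Partition: hash the key and map it to a partition id.
    pub fn partition_of(&self, key: u64) -> u8 {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        (hasher.finish() % self.num_partitions as u64) as u8
    }

    /// Collect, partition, serialize and write one batch of rows.
    /// Every non-empty partition in the batch gets one new file.
    pub fn spill(&mut self, keys: &[u64]) {
        let mut buckets: BTreeMap<u8, Vec<u64>> = BTreeMap::new();
        for &key in keys {
            buckets.entry(self.partition_of(key)).or_default().push(key);
        }
        for (partition, rows) in buckets {
            // Serialize: 8 little-endian bytes per key.
            let bytes: Vec<u8> = rows.iter().flat_map(|k| k.to_le_bytes()).collect();
            let name = format!("spill/p{}/{}.bin", partition, self.next_file_id);
            self.next_file_id += 1;
            self.storage.insert(name.clone(), bytes);
            self.partition_files.entry(partition).or_default().push(name);
        }
    }

    /// Read back and deserialize every file of one partition.
    pub fn read_partition(&self, partition: u8) -> Vec<u64> {
        let mut rows = Vec::new();
        if let Some(files) = self.partition_files.get(&partition) {
            for file in files {
                for chunk in self.storage[file].chunks_exact(8) {
                    let mut buf = [0u8; 8];
                    buf.copy_from_slice(chunk);
                    rows.push(u64::from_le_bytes(buf));
                }
            }
        }
        rows
    }
}
```

Because the spiller only deals with rows, partitions and files, nothing in it is specific to hash join, which is the point of keeping it a separate module.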

Hash join spill design and implementation

First, take a look at the build side. 80% of the workload is on the build side; the probe side only needs to spill according to the build side's spill information.

enum HashJoinBuildStep {
    // The running step of the build phase.
    Running,
    // The finalize step is waiting all build threads to finish and build the hash table.
    Finalize,
    // The fast return step indicates there is no data in build side,
    // so we can directly finish the following steps for hash join and return empty result.
    FastReturn,
    // Wait to spill
    WaitSpill,
    // Start the first spill
    FirstSpill,
    // Following spill after the first spill
    FollowSpill,
    // Wait probe
    WaitProbe,
    // The whole build phase is finished.
    Finished,
}

After the introduction of spill, the build step has four more main steps, WaitSpill, FirstSpill, FollowSpill and WaitProbe.

Each thread has its own Spiller, which is responsible for that thread's spill work. Spills across different threads are coordinated through BuildSpillCoordinator.

If a thread checks the size of the data currently in memory and finds that a spill is needed, it enters the WaitSpill state. BuildSpillCoordinator records the number of threads currently waiting to spill. The last thread does not enter the wait state; instead it acts as the coordinator of the first spill: it collects all the buffered data waiting to be spilled, partitions it, generates tasks evenly, and distributes them to the threads. Every thread has the same partition set. After the first spill is completed, subsequent spills no longer need to buffer data first: if a row's partition already exists, the row can be spilled directly. Otherwise, the row is buffered until it is clear whether a later spill is needed; if memory is sufficient, the hash table is built from it directly.
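
The "last thread becomes the coordinator" rule and the even task split can be sketched as follows. This is a simplified guess at the mechanics, not the actual BuildSpillCoordinator: the fields, the arrive method and the split_evenly helper are hypothetical, and the sketch only decides each thread's role instead of really suspending the waiting threads.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical coordinator: counts how many build threads want to spill.
pub struct BuildSpillCoordinator {
    total_threads: usize,
    waiting: AtomicUsize,
}

impl BuildSpillCoordinator {
    pub fn new(total_threads: usize) -> Self {
        BuildSpillCoordinator {
            total_threads,
            waiting: AtomicUsize::new(0),
        }
    }

    /// Called by a thread that decided it needs to spill.
    /// Returns true only for the last thread to arrive, which should not
    /// wait but coordinate the first spill. The counter is reset for reuse.
    pub fn arrive(&self) -> bool {
        let arrived = self.waiting.fetch_add(1, Ordering::AcqRel) + 1;
        if arrived == self.total_threads {
            self.waiting.store(0, Ordering::Release);
            true
        } else {
            false
        }
    }
}

/// Splits the buffered rows into `threads` tasks whose sizes differ by at
/// most one row, preserving the original order.
pub fn split_evenly(rows: Vec<u64>, threads: usize) -> Vec<Vec<u64>> {
    assert!(threads > 0, "at least one thread is required");
    let base = rows.len() / threads;
    let extra = rows.len() % threads;
    let mut iter = rows.into_iter();
    (0..threads)
        .map(|i| {
            let take = base + if i < extra { 1 } else { 0 };
            iter.by_ref().take(take).collect()
        })
        .collect()
}
```

With three build threads, the first two calls to arrive return false and the third returns true; that thread would then call something like split_evenly on the buffered rows and hand one task to each thread.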

After all the spill work is completed, the build side runs the normal hash join build process on the data that remains in memory to generate a hash table, sends it to the probe side through the bridge, and then enters the WaitProbe state.

Next, let’s take a look at the work of spill on the hash join probe side, and then return to build.

As on the build side, each probe thread has its own Spiller.

enum HashJoinProbeStep {
    // The step is to wait build phase finished.
    WaitBuild,
    // The running step of the probe phase.
    Running,
    // The final scan step is used to fill missing rows for non-inner join.
    FinalScan,
    // The fast return step indicates we can directly finish the probe phase.
    FastReturn,
    // Spill step is used to spill the probe side data.
    Spill,
    // Async running will read the spilled data, then go to probe
    AsyncRunning,
}

With spill, when the WaitBuild phase ends, it will enter the Spill phase.

The build side sends its partition set through the bridge, for example {0, 1, 2, 3}. The probe side also uses its Spiller to compute the partition of each row. If the partition id is in the build side's partition set, the row is spilled to storage. A row whose partition id is not in the set is, in the first round, probed against the hash table sent by the build side.
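
The routing rule for the first round can be illustrated with a small sketch. Everything here is an assumption made for the example: rows are u64 keys, the partition function is a plain modulo rather than the Spiller's real hash, the in-memory hash table is a HashSet of build keys, and probe_first_round is a hypothetical name.

```rust
use std::collections::{BTreeMap, BTreeSet, HashSet};

pub struct FirstRoundOutput {
    /// partition id -> probe rows that must be spilled for later rounds
    pub spilled: BTreeMap<u64, Vec<u64>>,
    /// probe rows that matched the in-memory hash table right away
    pub matched: Vec<u64>,
}

/// Routes each probe row: spill it if the build side spilled its partition,
/// otherwise probe the hash table built from the in-memory build data.
pub fn probe_first_round(
    probe_keys: &[u64],
    build_partitions: &BTreeSet<u64>,
    num_partitions: u64,
    in_memory_build_keys: &HashSet<u64>,
) -> FirstRoundOutput {
    let mut out = FirstRoundOutput {
        spilled: BTreeMap::new(),
        matched: Vec::new(),
    };
    for &key in probe_keys {
        // Assumed partition function; both sides must use the same one.
        let partition = key % num_partitions;
        if build_partitions.contains(&partition) {
            out.spilled.entry(partition).or_default().push(key);
        } else if in_memory_build_keys.contains(&key) {
            out.matched.push(key);
        }
    }
    out
}
```

The sketch models an inner join, so an unmatched row that is not spilled is simply dropped. The important constraint is that build and probe agree on the partition function, otherwise matching rows could end up in different partitions.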

After the spill is completed, the probe side picks a partition id and sends it to the build side. The build side reads the data of that partition, runs the normal hash join build process, and hands the resulting hash table to the probe side. The probe side reads its own spilled data for the same partition id and probes it, which is the normal hash join process. Each time a round completes, another partition id is taken, until there are no partitions left to read.
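
The round-by-round processing of spilled partitions can be sketched as a simple loop. This is a single-threaded illustration with hypothetical names; the spilled data of both sides is modeled as in-memory maps from partition id to u64 keys, and the join is an inner join on the key.

```rust
use std::collections::{BTreeMap, HashSet, VecDeque};

/// Joins the spilled partitions one round at a time and returns the probe
/// keys that found a match.
pub fn join_spilled_partitions(
    build_spilled: &BTreeMap<u64, Vec<u64>>,
    probe_spilled: &BTreeMap<u64, Vec<u64>>,
) -> Vec<u64> {
    // Partition ids that still have to be processed.
    let mut pending: VecDeque<u64> = build_spilled.keys().copied().collect();
    let mut joined = Vec::new();

    // Each iteration is one round: take a partition id, rebuild the hash
    // table from the build side's data, then probe with the matching data.
    while let Some(partition) = pending.pop_front() {
        let hash_table: HashSet<u64> =
            build_spilled[&partition].iter().copied().collect();
        if let Some(rows) = probe_spilled.get(&partition) {
            joined.extend(rows.iter().copied().filter(|k| hash_table.contains(k)));
        }
    }
    joined
}
```

Only one partition's hash table is alive at a time, which is what keeps memory usage bounded after a spill.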

Future Planning

  1. Support recursive spill
  2. Apply spill to specific scenarios
  3. Advanced optimizations

Connect With Us

Databend is an open source, flexible, low-cost, new data warehouse based on object storage that can also perform real-time analysis. We look forward to your attention and exploring cloud native data warehouse solutions together to create a new generation of open source Data Cloud.

  • Databend Website
  • GitHub Discussions
  • Twitter
  • Slack Channel
