In-depth interpretation of the design and implementation of MongoDB CDC

MongoDB CDC Overview

MongoDB is a popular document-based non-relational database. MongoDB CDC [1] is a Flink connector provided by the Flink CDC community [2] for capturing change data (Change Data Capturing). It can be connected to the MongoDB database and Collect and capture the document addition, update, replacement, deletion and other change operations to generate a standard Flink Changelog event stream, which supports data processing through Flink SQL or DataStream API. After processing, it can be easily written to various programs supported by Flink. in the downstream system.

MongoDB CDC Core Features

Full incremental integrated reading

In actual business scenarios, it is often necessary to collect existing data and incremental data in the MongoDB database at the same time. MongoDB CDC can read full data and incremental data in an integrated manner. When the startup option is configured in initial mode, CDC will first scan the target collection and send an Insert record for each existing piece of data; after the snapshot is completed, CDC will automatically switch to incremental mode and start capturing after the connector is started. Incoming change data. It supports fault recovery at any time and is guaranteed to provide exactly-once semantics with no loss or duplication.

Support multiple consumption models

For different scenario requirements, MongoDB CDC can be set to start in the following modes:

  • latest mode

    • In this mode, MongoDB CDC will not process data that already exists before startup, and will only generate change records for changed data that arrives after startup. This means that the connector can only read data changes after the connector was started.

  • initial mode

    • In this mode, MongoDB CDC will first take a snapshot of all existing data, and then start capturing changed data after the snapshot is completed.

  • timestamp mode

    • In this mode, MongoDB CDC captures change data that occurred after a given timestamp. The timestamp value must be within the valid logging range of MongoDB.

Supports generation of complete change event stream

Versions before MongoDB 6.0 do not provide data on pre-change documents and deleted documents by default; only Upsert semantics can be achieved using this information (that is, Update Before data entries are missing). However, many useful operator operations in Flink rely on the complete Insert, Update Before, Update After, and Delete change flow. If you need to supplement missing pre-change events, a natural idea is to cache the current version snapshot of all documents in the Flink state; when you encounter an updated or deleted document, you can know the pre-change state by looking up the table. In the worst case, however, this operation may require saving records equivalent to 100% of the original data volume.

Currently, Flink SQL Planner will automatically generate a ChangelogNormalize node for the Upsert type Source and convert it into a complete change stream according to the above operations; the cost is that the operator node needs to store a huge amount of State data.

037c6fe624bf3157b68260e14a9451ae.png

The new Pre- and Post-Image features of MongoDB 6.0 [6] provide a more efficient solution: as long as the changeStreamPreAndPostImages function is enabled, MongoDB will update the data in a special collection every time a change occurs. Record the complete status of the document before and after changes. MongoDB CDC supports reading these records and producing the complete event stream, eliminating the dependence on the ChangelogNormalize node.

Heartbeat-based tag pushing mechanism

The current version of CDC implementation requires a globally unique Resume Token to locate the location of the change stream. However, MongoDB does not store all logs indefinitely, and older change records may be cleared when the storage time exceeds the limit or the log size exceeds the limit.

For collections that change frequently, clearing records will not cause any problems, because the traceback mark will be refreshed every time the latest change entry is obtained, ensuring the validity of the traceback mark. But for some collections that change very slowly, there may be a situation where “the last change was very old, causing its corresponding traceback mark to have been cleared”, which means that it is no longer possible to recover from the stream and read the next change (due to The traceback marker does not exist and cannot be located).

MongoDB provides a “heartbeat mechanism” option to solve this problem. When there is no changed data in the stream, you can also send a heartbeat packet to refresh the traceback mark. This allows slow-changing collections to keep their traceback tags updated without expiring. The heartbeat mechanism can be enabled through the heartbeat.interval.ms option of MongoDB CDC.

MongoDB CDC design solution

According to the different technical methods used, the technical evolution process of MongoDB CDC can be roughly divided into three stages: the earliest CDC (such as the early Debezium MongoDB, etc.) is based on querying OpLog log collection, mainly for early versions of MongoDB; the second stage is upgraded to Based on the Change Stream API design provided by MongoDB 3.6; in the third phase, which is the latest version, the Flink CDC community implemented a design based on FLIP-27 and incremental snapshot algorithms.

Phase 1: OpLog-based design solution

Early MongoDB did not have a special API for change monitoring needs. However, in order to support data synchronization and failure recovery in the case of distributed deployment of primary and secondary nodes, MongoDB will write all document operation records in the database into a special system collection sys.oplog [3]. The format of each record is as follows:

{
  "ts": Timestamp(1625660877, 2),
  "t": NumberLong(2),
  "h": NumberLong("5521980394145765083"),
  "v": 2,
  "op": "i",
  "ns": "test.users",
  "ui": UUID("edabbd93-76eb-42be-b54a-cdc29eb1f267"),
  "wall": ISODate("2021-07-07T12:27:57.689Z"),
  "o": {
      "_id": ObjectId("60e59dcd46db1fb4605f8b18"),
      "name": "1"
  }
}

The ts segment is used to record the unique timestamp when the operation occurred (the first digit is the Unix epoch timestamp, and the second digit is the version number within this second); ns records the database and collection of the operation, and op is the operation performed ( For example, i represents insertion) and o represents the inserted document. The connector only needs to continuously query the OpLog collection to obtain the latest data in chronological order and generate the corresponding log stream.

It should be noted that due to logging overhead, MongoDB only records the fields changed by the update operation; the deletion operation only includes the _id field of the deleted document. Therefore, this type of OpLog-based CDC implementation requires additional operations (for example, querying the complete document information after Update) to generate the Flink Upsert (including Insert, Update After, Delete) event stream. Since OpLog does not record document data before update or deletion, this type of CDC usually cannot generate Update Before events.

In addition, each shard of the MongoDB database has its own OpLog collection, so the connector needs to establish a connection with each shard at the same time and handle synchronization issues, which is cumbersome to implement.

The second stage: Design plan based on Change Stream API

MongoDB 3.6 introduces the new Change Stream API [4], which supports change information subscription at the database and collection levels, and provides a resumption mechanism based on Resume Token. For example, using db..watch(), you can subscribe to the operation changes of the corresponding collection. The format of each change record returned is as follows:

{
  // Resume Token
  _id: { _data: '...' },


  // operation type
  operationType: 'insert',


  //The timestamp recorded in oplogs
  clusterTime: Timestamp({ t: 1686638232, i: 1 }),


  // Timestamp with higher precision, available after MongoDB 6.0
  wallTime: ISODate("2023-06-13T06:37:12.290Z"),


  //Insert complete document information
  fullDocument: {
    _id: ObjectId("64880e8a9f46de45aa2630a4"),
    fieldKey: 'fieldValue'
  },


  // updated database and collection
  ns: { db: 'testdb', coll: 'testtable' },


  //Insert the unique ID of the document
  // The sharded collection also includes the shard key
  documentKey: { _id: ObjectId("64880e8a9f46de45aa2630a4") }
}

Compared with reading Oplog, CDC based on the change stream API has the following advantages:

  • Better support for sharded clusters. To subscribe to all change operations on a sharded collection, you only need to create a change stream cursor;

  • Convenient for recovery. You only need to record the Resume Token of each record, and you can go back any time within the validity period;

  • Supports automatic acquisition of complete documents after changes. Change records containing the updated complete document can be obtained through parameter configuration.

Early versions of MongoDB CDC implemented streaming update subscriptions through the change stream API.

The third stage: Design plan based on incremental snapshot algorithm

CDC’s change monitoring operation is usually divided into two steps: the first step is to take a complete snapshot of the state in the current database at startup, and the second step is to monitor real-time streaming data changes. The snapshot phase of earlier versions is single concurrent reading and does not support checkpoint and failure recovery. This means that when the amount of data is large, the snapshot phase will take a long time to execute and must be restarted from the beginning if it fails. The FLIP-27 proposal [5] provides the next generation Flink Source architecture, which abstracts the responsibility of reading data from the source into two modules, as shown in the following figure:

05cf42e299b7891b65080fcc4366cc54.jpeg

1. SplitEnumerator, responsible for managing and splitting data sources into multiple abstract shards;

2. Reader, responsible for reading actual data from abstract shards.

At runtime, the process of reading data is also divided into two steps:

1. Initially, SplitEnumerator is executed to split all data into abstract shards;

2. Assign each abstract shard to a Reader and perform actual reading logic.

Enumerator and each Reader each have their own Checkpoints and support failure recovery. Each source does not need to maintain its own sharding and concurrency model issues.

MongoDB CDC began to migrate to this new Source architecture in Flink CDC 2.3. When taking a snapshot, MongoDB CDC needs to split the collection to be snapshotted by Key. The strategy is as follows:

  • If the target collection is a shard collection, it will be split according to the actual physical shards;

  • Otherwise, use the splitVector function provided by MongoDB to split evenly;

  • If splitVector cannot be called, a heuristic is used to estimate the average size of the document and split by the number of rows of data.

Each obtained fragment corresponds to a document range specified by MinKey and MaxKey, which is called a SnapshotSplit.

In the streaming reading stage, we only need to specify the start and end time points of the data stream and determine the range of stream data records to be monitored. Such sharding is called streaming sharding (StreamSplit). If the stop time point is set to MAX_TIMESTAMP (the maximum representable timestamp), it means that this is an unbounded stream fragment that does not limit the stop time point.

Currently, MongoDB CDC with the incremental snapshot function enabled uses the Source interface definition method recommended by FLIP-27. SplitEnumerator will first split the existing data and generate snapshot shards; it will continue to generate snapshot shards only after monitoring that all snapshot shards are completed. Streaming sharding is converted to streaming reading.

After TaskFetcher receives the pending fragment, it will pass it to the corresponding SplitFetchTask according to its type (snapshot or streaming) for actual reading work. ScanSplitFetchTask will read existing data from the MongoDB database based on the Key range defined by the incoming snapshot shards; StreamSplitFetchTask will subscribe to the Change Stream API to obtain change data.

c9bc73e1bbbe9fd403e44853f3259276.png

All records generated by SplitFetchTask will be put into the event queue and forwarded by RecordEmitter to the specified deserializer; the deserializer will convert them into the final RowData and hand them over to downstream consumption.

For compatibility reasons, MongoDB Sources defined using traditional SourceFunction still exist in the current version of MongoDB CDC for non-incremental snapshot mode. However, this way of definition has been marked as deprecated by Flink and may be deleted in the future.

The underlying implementation of MongoDB CDC

The current version of Mongo CDC implementation relies heavily on the support provided by the MongoDB underlying layer for capturing changed data, such as change stream API, shard collection support, and snapshots before and after changes. The underlying mechanisms of these functions are introduced in detail below. Readers who are not interested in the underlying principles can quickly skip this chapter.

CDC technology based on Change Stream API

As mentioned above, capturing change data based on the change stream API is simpler and more effective than reading OpLog. However, in fact, the bottom layer of MongoDB’s change stream API is also implemented based on OpLog, which is a layer of encapsulation provided on OpLog. They have the following correspondence:

  • Each record in the change stream has a unique _id (i.e. Resume Token), which corresponds to an operation log in the OpLog collection;

  • The namespace, updateDescription, operationType and other fields in the change stream correspond to the content recorded in the OpLog.

On top of this, the Change Stream API provides the following conveniences:

  • Supports backtracking change flow through any valid Resume Token

Because the corresponding operation record in the OpLog can be found through the Resume Token (that is, the _id field of the OpLog record), the next change data can be consumed from the corresponding location.

  • Supports backtracking change flow starting from a specific Timestamp

MongoDB provides the startAtOperationTime option to start the change stream, which supports reading changes starting from a given timestamp. Since the OpLog collection contains the timestamps of all changes and is arranged in order by time, you only need to perform a binary search to locate the position in the change stream corresponding to a given timestamp.

The current MongoDB implementation has a special limitation: if the specified Timestamp occurs in the past (that is, the starting point of the change needs to be found in OpLog), you need to ensure that this Timestamp is within the log range recorded by OpLog. This limitation is easy to understand: if the given timestamp is in the future, there is no need to perform a binary search, just wait for the change data after the timestamp to arrive before starting to capture; but if the given timestamp is in the past, then Reliable binary searches are only possible within the range of the OpLog record; if the timestamp is older than the oldest data in the current OpLog, MongoDB cannot determine whether there have been other changes between them; and even if there has been, it cannot retrieve it from there. Recovered because the OpLog corresponding to these changes has been cleared. MongoDB refuses to backtrack when encountering this situation.

7e505789a1b5788149772c385a7af4ca.png

  • Support Full Document Lookup function

In order to save storage overhead, OpLog will only store the minimum necessary change data. For example, for an update operation, MongoDB does not record the complete document after the change, but only stores the changed fields; that is, the update operation log in OpLog does not record the complete document, which is not very practical. If you need complete document information, you also need to manually search for each update record in the OpLog.

The change stream API wraps this query requirement: just use the fullDocument: updateLookup parameter, and MongoDB will add the complete document to the returned record when it reads the Update event, and record it in the fullDocument field. Note that this only applies to Update type operations, because the Insert operation always contains the complete document information (because the document does not exist before the insertion operation); the Delete operation always only contains the _id information (because the deleted document can no longer be found ).

It should be noted that MongoDB does not guarantee that the document given by updateLookup must correspond to the result of the update operation. In other words, in the change stream records generated by continuous Update operations on the same document, the FullDocument detected by earlier changes may be overwritten by later changes.

b862be9b5fbfb453db8f5b0c26c45c00.png

This is the same problem as manually reading OpLog and then manually reading Full Document. However, since each document in MongoDB has a unique _id field, this issue will not have a big impact on the results in the data flow of Upsert mode.

MongoDB sharded collection support

Each shard node of MongoDB has its own OpLog collection, which records the change data belonging to its own shard respectively; this means that if you need to monitor changes in the shard collection based on OpLog, you need to monitor the OpLog collection of each shard in parallel. , and manually handle synchronization issues, sort and output change logs from different shards according to timestamps, which is difficult and risky.

The change stream API encapsulates the change capture of sharded collections, making it easier to use. Even if you subscribe to a sharded storage collection through the change stream API, only one change stream cursor will be generated, which contains change data from all shards; and provides a strong ordering guarantee, that is, the changes generated Data is always time-ordered.

The implementation of MongoDB is to design a centralized management node for the sharded collection, which is responsible for obtaining records from each node and sorting them to produce ordered output. OpLog records within each node are always ordered. Then after all nodes have given the earliest record in the node, the earliest one can be considered as the global earliest value and can be sent to the client that subscribes to the Change Stream.

But this actually has a problem: if the data of a node has never been updated, then it will not give a change record; in this way, the central node cannot confirm that this node was updated a long time ago, but it has not been sent out. Record, there is still no data; so the global timestamp cannot be advanced.

MongoDB’s solution to this problem is very similar to Flink’s Watermark mechanism: MongoDB requires each node to periodically send a blank instruction to the central node even if it does not change data. This allows the central node to confirm the synchronization status of each node, thereby advancing the global timestamp.

Efficiently support pre- and post-change snapshots

The underlying layer of Change Stream is implemented based on OpLog, and OpLog does not record document information before updates and deletions. In order to support the need to “get the pre-change/deletion document”, MongoDB has to store this information in an additional location. For reasons of compatibility and storage overhead, MongoDB did not choose to modify the OpLog format to store additional fields. Instead, it stored the document information before and after the change in a special collection (called Pre- and Post-Images), and provided a collection Granular switch control.

Since the overhead of recording before and after snapshots for each change and delete operation is not small, MongoDB provides many switches to prevent state expansion, such as:

  • Supports the expiration time of snapshots before and after configuration changes at the system level:

db.runCommand({
  setClusterParameter: {
    changeStreamOptions: {
      preAndPostImages: {
        expireAfterSeconds: '600' //Set the expiration time to 600 seconds
      }
    }
  }
})
  • Supports individual configuration of each collection to enable or disable snapshots before and after changes:

db.runCommand({
  collMod: "testtable",
  changeStreamPreAndPostImages: {
    enabled: true // Configure the change record snapshot in the testtable collection
  }
})
  • Supports selecting whether to read snapshots before and after the change for each change stream cursor:

db.testtable.watch({
  // Configure whether to read the post-change snapshot
  fullDocument: 'required',


  // Configure whether to read the pre-change snapshot
  fullDocumentBeforeChange: 'required'
})

Why do we need to store the changed document at the same time? The previously mentioned method of setting FullDocument to updateLookup can obtain the complete document information after the change, but there is no guarantee that the document information obtained after this update will be obtained. For example, when two update operations are performed in succession, the FullDocument of the previous update operation may be overwritten by the latter one. The reason is that this information is independently queried (Lookup) afterwards, rather than associated with the change event itself.

The post-change document record is stored in each change trigger and is associated with a specific OpLog change entry, so it can be guaranteed that the recorded document reflects the results of the change; when the fullDocument option is set to required or whenAvailable, that is Requires that records be read from a post-change snapshot rather than a “post-hoc query”.

When the scan.full-changelog option is enabled, MongoDB CDC will require an Update After event to be generated from the changed document record, thereby ensuring that each Update After event corresponds to the actual change operation. This is not guaranteed by the implementation of normalizing the Upsert stream via the ChangelogNormalize operator.

Announce: Flink CDC Community

Add Maintainer member

The rapid development of the Flink CDC community is inseparable from the selfless dedication of contributors. As of now, there are more than 100 contributors in the community, and the contributor group continues to expand. After being proposed by the Maintainer members of the Flink CDC community, the Flink CDC community has officially invited whhe (Sichuan fan) to join the Maintainer list of the Flink CDC community.

487c0c3d24043dad990dc7cfb8809bdd.jpeg

Teacher Chuanfen is a R&D engineer of the OceanBase open source team. He is mainly responsible for OceanBase’s open source ecological docking and community governance related work. He has been active in the Flink CDC community for a long time and participated in the development of multiple versions. As a core contributor, he has contributed to the community including Multiple PRs, including OceanBase CDC Connector, actively help users solve problems in the user group and issue list, making continuous contributions to community development. I look forward to Teacher Chuanfen as the Maintainer of Flink CDC to bring more database-side perspectives to the development of the community and help more community contributors and users. I also hope that more contributors can join the Maintainer list in the future and continue to promote it. community development.

Alibaba Cloud’s real-time computing Flink enterprise version of Flink based on Flink 1.17 has been officially released [7]. In this version, the MongoDB CDC connector has begun public testing and supports functions such as arbitrary timestamp consumption and complete event streams< sup>[8]. welcome!

Reference

[1] MongoDB CDC Community Documentation:

https://ververica.github.io/flink-cdc-connectors/release-2.4/content/connectors/mongodb-cdc(ZH).html

[2] Flink CDC community official website:

https://ververica.github.io/flink-cdc-connectors/

[3] MongoDB Oplog documentation:

https://www.mongodb.com/docs/manual/core/replica-set-oplog/

[4] MongoDB change flow documentation:

https://www.mongodb.com/docs/manual/changeStreams/

[5] Apache Flink FLIP-27 proposal:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-27: + Refactor + Source + Interface

[6] MongoDB PreImages documentation:

https://www.mongodb.com/docs/atlas/app-services/mongodb/preimages/

[7] Alibaba Cloud MongoDB CDC documentation:

https://help.aliyun.com/zh/flink/developer-reference/mongodb-cdc-connector

[8] Alibaba Cloud Flink real-time computing Release Notes:

https://help.aliyun.com/zh/flink/product-overview/august-21-2023

▼ “Activity Recommendations” First purchase of 99 yuan, monthly trial ▼

4e1967eae62c2bc8e71643d530221c6d.png

Past selections

0ca6c62613fa631830cd1b6cf28e10f7.png

b74e74e6901b432fd7caea2b39034175.png

f6ce5ad8bc237dc4c8214e1c4804ad35.png

0a009b1dab356be231231c45940cd240.png

00f1dbac34cac9cad2d3ac1a5c2688c0.png

▼ Follow “Apache Flink” to get more technical information ▼

11567f2078ecb1174cf081d2408ae1db.png

d20c8d2f5a5dab6c178be05600a2313d.gif Click “ReadRead the original text “, Receive 5000CU* hours of Flink cloud resources for free