What you need to know about TDengine 3.0 data subscription

Little T Introduction: In order to help applications obtain data written to the Time Series Database (Time Series Database) TDengine in real time, or process data in the order of event arrival, TDengine provides a data subscription and consumption interface similar to message queue products. In this way, in many scenarios, the time series data processing system using TDengine does not need to integrate message queue products such as Kafka, thus simplifying the complexity of system design and reducing operation and maintenance costs. TDengine 3.0 has optimized and upgraded the data subscription function. This article will introduce its syntax rules in detail to facilitate the use of developers and enterprises.

Like Kafka, you also need to define a topic when applying TDengine, but TDengine’s topic is based on the query condition of an existing super table, subtable or ordinary table, that is, a SELECT statement. You can use SQL to filter conditions such as labels, table names, columns, expressions, and perform scalar functions and UDF calculations on data (excluding data aggregation). Compared with other message queue software, this is the biggest advantage of TDengine’s data subscription function. It provides greater flexibility. The granularity of data can be adjusted by the application at any time, and the filtering and preprocessing of data is left to TDengine. , effectively reducing the amount of data transmitted and the complexity of the application.

After a consumer subscribes to a topic (a consumer can subscribe to multiple topics), the latest data can be obtained in real time. Multiple consumers can form a consumer group (consumer group). Multiple consumers in a consumer group share the consumption progress, which facilitates multi-threaded and distributed consumption of data and improves consumption speed; however, consumers in different consumer groups Even if consumers consume the same topic, they do not share the consumption progress. If you subscribe to a super table, the data may be distributed on multiple different vnodes, that is, multiple shards. In this way, having multiple consumers in a consumer group can improve consumption efficiency. TDengine’s message queue provides an ACK mechanism for messages, ensuring at least once consumption even in complex environments such as downtime and restart.

In order to achieve the above functions, TDengine automatically creates indexes for WAL (Write-Ahead-Log) files to support fast random access, and provides a flexible and configurable file switching and retention mechanism. Users can specify the retention time and duration of WAL files as needed. size:

  • WAL_RETENTION_PERIOD: The maximum length of time policy for additional retention of WAL log files for data subscription consumption. WAL log cleaning is not affected by the consumption status of subscribing clients. The unit is s, and the default is 3600, which means the last 3600 seconds of data are retained in WAL. Users can modify this parameter to an appropriate value according to the needs of data subscription.
  • WAL_RETENTION_SIZE: Maximum cumulative size policy that requires additional retention of WAL log files for data subscription consumption. The unit is KB, and the default is 0, which means there is no upper limit on the cumulative size.

Through the above method, we transformed WAL into a durable storage engine that retains the order of event arrival (but because TSDB has a much higher compression rate than WAL, it is not recommended to retain it for too long. Generally speaking, it is not recommended to more than a few days). For queries created in the form of topics, TDengine will connect to WAL instead of TSDB as its storage engine. When consuming, TDengine directly reads data from WAL according to the current consumption progress, uses a unified query engine to implement filtering, transformation and other operations, and pushes the data to consumers.
In order to facilitate everyone’s hands-on operation, the syntax related to TDengine data subscription will be explained in detail below.

Write data

First complete the database creation, a super table and multiple sub-table operations, and then you can write data, for example:

DROP DATABASE IF EXISTS tmqdb;
CREATE DATABASE tmqdb;
CREATE TABLE tmqdb.stb (ts TIMESTAMP, c1 INT, c2 FLOAT, c3 VARCHAR(16) TAGS(t1 INT, t3 VARCHAR(16));
CREATE TABLE tmqdb.ctb0 USING tmqdb.stb TAGS(0, "subtable0");
CREATE TABLE tmqdb.ctb1 USING tmqdb.stb TAGS(1, "subtable1");
INSERT INTO tmqdb.ctb0 VALUES(now, 0, 0, 'a0')(now + 1s, 0, 0, 'a00');
INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now + 1s, 11, 11, 'a11');

Create topic

TDengine uses SQL to create the topic as shown below (the number of topics created has an upper limit, controlled by the parameter tmqMaxTopicNum, and the default is 20):

CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1;

TMQ supports the following subscription types:

Column subscription

CREATE TOPIC topic_name as subquery

Subscription through SELECT statement (including SELECT *, or subscription to specified columns such as SELECT ts, c1, etc., which can carry conditional filtering and scalar function calculation, but does not support aggregate functions and time window aggregation). But what needs to be noted is:

  • Once the type TOPIC is created, the structure of the subscription data is determined;
  • Columns or labels that are subscribed or used for calculation cannot be deleted (ALTER table DROP) or modified (ALTER table MODIFY);
  • If the table structure changes, the newly added columns will not appear in the results.

Super table subscription

CREATE TOPIC topic_name AS STABLE stb_name

The difference from SELECT * from stbName subscription is:

  • Users’ table structure changes will not be restricted.
  • What is returned is unstructured data: the structure of the returned data will change as the table structure of the super table changes.
  • The with meta parameter is optional. When selected, statements such as creating super tables and sub-tables will be returned. It is mainly used for super table migration in taosx.
  • The where_condition parameter is optional. When selected, it will be used to filter sub-tables that meet the conditions and subscribe to these sub-tables. There cannot be ordinary columns in the where condition, it can only be tag or tbname. Functions can be used in the where condition to filter tags, but they cannot be aggregate functions because the tag values of subtables cannot be aggregated. It can also be a constant expression, such as 2 > 1 (subscribe to all subtables), or false (subscribe to 0 subtables).
  • The returned data does not contain labels.

Database subscription

CREATE TOPIC topic_name [WITH META] AS DATABASE db_name;

This statement can be used to create a subscription that contains data from all tables in the database. The with meta parameter is optional, as above.

Create a consumer

Subscribe to topics

A consumer supports subscribing to multiple topics at the same time. Take Java as an example:

List<String> topics = new ArrayList<>();
topics.add("tmq_topic");
consumer.subscribe(topics);

Consumption

How to consume TMQ messages in Java language, the code is as follows:

while(running){
  ConsumerRecords<Meters> meters = consumer.poll(Duration.ofMillis(100));
    for (Meters meter : meters) {
      processMsg(meter);
    }
}

End consumption

After consumption is completed, the subscription should be canceled.

/* Cancel subscription */
tmq_unsubscribe(tmq);

/* Close the consumer object */
tmq_consumer_close(tmq);

Delete topic

If you no longer need the subscription data, you can delete the topic. Note that only topics that are not currently in the subscription can be deleted.

/* Delete topic */
DROP TOPIC topic_name;

Status view

1. Topics: Query the created topics

SHOW TOPICS;

2. Consumers: Query the status of the consumer and the topics it subscribes to

SHOW CONSUMERS;

3. Subscriptions: Query the allocation relationship between consumer and vgroup

SHOW SUBSCRIPTIONS;

Write at the end

Due to the length limit of the article, this article only shares the specific implementation of part of the syntax. If you need to know the relevant settings and code examples in more languages, you can go to the TDengine official website to query the relevant documents for data subscription. For more complex application problems, you are also welcome to join the TDengine developer exchange group (add little T vx: tdengine) and seek help directly from the community technical support personnel.