[Time Zone] Flink JDBC and CDC time field time zone testing and time benchmark

Related articles:
A brief analysis of the relationship between various time types and timezone

1. Test purpose and value

1. Test how the time zone is handled for the common databases' timestamp types that carry no time zone.

  • mysql timestamp(3) type
  • postgres timestamp(3) type
  • sqlserver datetime2(3) type
  • oracle TIMESTAMP(3) type
    In the following tests, each of these columns is named ts.

2. Test the time zone of the op_ts metadata column in CDC

op_ts, TIMESTAMP_LTZ(3) NOT NULL: the time at which the current record was updated in the database. This value is always 0 if the record is read from a snapshot of the table instead of the binlog.

In the following tests, the CDC table DDL exposes this metadata as the column ts_ms TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL.
CDC has two stages when reading the table:

  1. Full reading stage: data is read over JDBC, and the records carry op=r.
  2. Incremental reading stage: data is read from the log, and the records carry op=c, u, or d.
    In the screenshots, op appears as an entry such as 3="r", where 3 is the index of the op field.
    The ts_ms of records read in the full stage is called Read data below.
    The ts_ms of records read in the incremental stage is called Create data below.

3. Flink data time representation and time zone

In the Flink Table API, timestamp values are represented by org.apache.flink.table.data.TimestampData objects.

@PublicEvolving
public final class TimestampData implements Comparable<TimestampData> {
    private final long millisecond;
    private final int nanoOfMillisecond;
}

This type uses the two fields above to jointly represent a point in time. No time zone information is stored.
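
As a rough illustration of how two such fields can describe a wall-clock time without any zone, the sketch below rebuilds a `LocalDateTime` from a millisecond count and a nano-of-millisecond remainder using only `java.time`. The class `MillisNanos` and its method are hypothetical stand-ins written for this article, not Flink's actual implementation.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;

// Hypothetical stand-in for illustration; this is not Flink's TimestampData.
class MillisNanos {
    static final long MILLIS_PER_DAY = 86_400_000L;

    // Rebuilds a zone-less date-time from (millisecond, nanoOfMillisecond).
    // No time zone is consulted: the millisecond count is simply read as
    // "milliseconds since 1970-01-01T00:00" on whatever clock produced it.
    static LocalDateTime toLocalDateTime(long millisecond, int nanoOfMillisecond) {
        long epochDay = Math.floorDiv(millisecond, MILLIS_PER_DAY);
        long millisOfDay = Math.floorMod(millisecond, MILLIS_PER_DAY);
        long nanoOfDay = millisOfDay * 1_000_000L + nanoOfMillisecond;
        return LocalDateTime.of(LocalDate.ofEpochDay(epochDay), LocalTime.ofNanoOfDay(nanoOfDay));
    }

    public static void main(String[] args) {
        System.out.println(toLocalDateTime(0L, 0));             // 1970-01-01T00:00
        System.out.println(toLocalDateTime(1695894200320L, 0)); // 2023-09-28T09:43:20.320
    }
}
```

Whether the result should be read as UTC or as local wall-clock time is decided entirely by the code that produced the millisecond value, which is the question the tests in this article try to answer.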

Practical test:

import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Date;
import org.apache.flink.table.data.TimestampData;
// @Test comes from whichever JUnit version the project uses.

@Test
public void testTimeZone() {

    // Background: the epoch is time 0 in UTC, a globally absolute point in time. It is time 0 under `ZoneOffset.of("+0")` and is treated as equivalent to `January 1, 1970, 00:00:00 GMT`.
    // GMT is the older universal time standard and UTC is the current one. UTC is based on atomic time and is more precise than GMT.
    // 28800000 = 8 * 3600 * 1000, the number of milliseconds in 8 hours.
  
    // The value below was read from a without-time-zone column of a database running in the +8 time zone. The stored value is 2023-09-28T09:43:20.320.
    long ts=1695894200320L;
  
    // If ts is interpreted as epoch (UTC-based) milliseconds and rendered in the +8 zone, the result is 8 hours later: 2023-09-28 17:43:20. This is what common online timestamp converters show, because they assume epoch time and then add 8h for the local zone.
    // So the millisecond value read for a without-time-zone column is not measured from the UTC epoch, but from time 0 (1970-01-01T00:00) in the database's own time zone.
  
    // A LocalDateTime is essentially composed of two objects, LocalDate and LocalTime. LocalDate holds integer `year`, `month`, and `day` fields. LocalTime holds integer hour, minute, second, and so on. This differs from the java.util.Date type.
    // LocalDateTime.ofEpochSecond with a ZoneOffset is easy to misread, so to spell it out:
    // epochSecond is the number of seconds since the epoch, an absolute time that corresponds to `java.util.Date.getTime()/1000`. offset is the amount by which this epoch second is shifted to obtain local time.
    // The internal code is `long localSecond = epochSecond + offset.getTotalSeconds();`.
  
    // The following call is correct, because `java.util.Date` and `java.sql.Timestamp` hold absolute time, and `Date.getTime()` returns milliseconds relative to the epoch (the number of milliseconds since January 1, 1970, 00:00:00 GMT).
    LocalDateTime ldtFromDate = LocalDateTime.ofEpochSecond(new Date().getTime() / 1000, 0, ZoneOffset.of("+8"));
    System.out.println(ldtFromDate); // 2023-09-28T16:16:45. The wall clock read about the same (16:17:44) at the time.
    Date date0 = new Date(0); // number of milliseconds since the standard base time known as "the epoch"
    System.out.println(date0.getTime()); // 0. getTime() returns absolute time: the number of milliseconds since January 1, 1970, 00:00:00 GMT.
  
    // Passing `ZoneOffset.UTC` as below can be read as telling LocalDateTime that the supplied epochSecond is already a local second count (local time minus local 1970-01-01T00:00), so no further shift is needed.
    LocalDateTime ldt0 = LocalDateTime.ofEpochSecond(0L, 0, ZoneOffset.UTC);
    System.out.println(ldt0); // 1970-01-01T00:00
    LocalDateTime ldt8 = LocalDateTime.ofEpochSecond(0L, 0, ZoneOffset.of("+8"));
    System.out.println(ldt8); // 1970-01-01T08:00
  
    // TimestampData performs no time zone conversion and stores no time zone information. Internally it keeps only `long millisecond` and `int nanoOfMillisecond`, which makes it easy to serialize.
    // millisecond can generally be treated as local time: toString performs no time zone conversion. It only calls `toLocalDateTime()`, which does simple arithmetic, and then `LocalDateTime.toString`.
    TimestampData td0 = TimestampData.fromEpochMillis(0); // Equivalent to LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC).
    System.out.println(td0); // 1970-01-01T00:00. The string form of a TimestampData is computed as if its millisecond value were UTC-based, the same base that java.util.Date uses.
  
    LocalDateTime ldt = LocalDateTime.ofEpochSecond(
            ts/1000
            , (int) (ts % 1000 * 1_000_000)
            , ZoneOffset.UTC);
    System.out.println(ldt); // 2023-09-28T09:43:20.320
    TimestampData td = TimestampData.fromEpochMillis(ts);
    System.out.println(td); // 2023-09-28T09:43:20.320
  
    Date date = new Date(ts); // Note: the constructor expects the number of milliseconds since the standard base time known as "the epoch", but here ts is based on local time 0 rather than the epoch.
    System.out.println(date); // Thu Sep 28 17:43:20 CST 2023. CST is Beijing time here. toString performs the time zone conversion (`BaseCalendar.Date date = normalize();`), so the output is already shifted by +8.
}
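
The offset rule quoted in the comments above (`localSecond = epochSecond + offset.getTotalSeconds()`) can be checked with a small standalone sketch. It uses only `java.time`; the class name `OffsetArithmetic` and the helper `viaManualShift` are made up for this illustration.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Illustrative only: compares LocalDateTime.ofEpochSecond with a manual shift.
class OffsetArithmetic {
    // Adds the offset's total seconds by hand, then asks for a UTC (zero-shift) conversion.
    static LocalDateTime viaManualShift(long epochSecond, ZoneOffset offset) {
        return LocalDateTime.ofEpochSecond(epochSecond + offset.getTotalSeconds(), 0, ZoneOffset.UTC);
    }

    public static void main(String[] args) {
        ZoneOffset plus8 = ZoneOffset.ofHours(8);
        long epochSecond = 1695894200L; // ts / 1000 from the test above

        LocalDateTime direct = LocalDateTime.ofEpochSecond(epochSecond, 0, plus8);
        LocalDateTime manual = viaManualShift(epochSecond, plus8);

        System.out.println(direct);                // 2023-09-28T17:43:20
        System.out.println(direct.equals(manual)); // true
    }
}
```

This is why passing `ZoneOffset.UTC` leaves a local-based second count untouched, while passing `+8` to an already local value would shift it a second time.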

4. Component versions under test

  • flink 1.13
  • flink-cdc 2.2.1
  • flink-connector-jdbc: a customized build modified from version 3.1.1-1.17.

2. This test covers four major databases:

  • mysql
  • postgres
  • sqlserver
  • oracle

Eight tests are run for each database:

  • database-SQL
    Read the data directly from the database. This is the benchmark value for the test.
  • cdc-RowData
    Read values through CDC's SQL API and inspect the data with a debugger in the com.ververica.cdc.debezium.table.AppendMetadataCollector#collect method.
  • cdc-SQL (test fields except ts_ms)
    Read values through CDC's SQL API and query them with the Flink sql-client. This covers the fields other than ts_ms, because the accuracy of ts_ms has to be examined separately for the two reading stages.
  • cdc-SQL-RealTime (test ts_ms)
    Read values through CDC's SQL API. The upper left corner shows the system time and the lower part shows the data read in real time.
  • cdc-Read data (test snapshot reads ts_ms field)
    Tests the ts_ms value of the full reading stage. According to the official flink-cdc documentation, the full-stage value for all four databases is 0 (1970-01-01 00:00:00). A non-zero value is incorrect.
  • cdc-Create data (test incremental reading ts_ms field)
    Tests the ts_ms value of the incremental reading stage. According to the official flink-cdc documentation, the incremental-stage value for all four databases is the time at which the change was recorded in the log.
  • jdbc-RowData
    Read a table whose connector is jdbc through the Flink SQL API and inspect the data with a debugger in the org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat#nextRecord method. Does not include ts_ms.
  • jdbc-SQL
    Read a table whose connector is jdbc through the Flink SQL API and query it with the Flink sql-client. Does not include ts_ms.

3. Test process data

3.1 mysql

3.1.1 database-SQL

3.1.2 cdc-RowData

3.1.3 cdc-SQL (test fields except ts_ms)

![[image-20230927163847043.png|201]]

3.1.4 cdc-SQL-RealTime (test ts_ms)

In the screenshot below, the upper part is the Windows system clock and the lower part is the ts_ms returned by the CDC query.
If the two are roughly equal (not 8 hours apart), the ts_ms returned by the CDC query is correct.
![[image-20230928132434484.png|325]]
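
The "roughly equal, not 8 hours apart" check was done by eye in these tests. A minimal sketch of the same comparison in code might look like the following. The class `TsMsCheck`, the `classify` helper, the tolerance, and the sample times are all assumptions made for illustration and are not values taken from the screenshots.

```java
import java.time.Duration;
import java.time.LocalDateTime;

// Hypothetical helper: compares a wall-clock reading with a ts_ms reading.
class TsMsCheck {
    static String classify(LocalDateTime wallClock, LocalDateTime tsMs, Duration tolerance) {
        Duration gap = Duration.between(tsMs, wallClock).abs();
        if (gap.compareTo(tolerance) <= 0) {
            return "consistent";
        }
        // A gap close to exactly 8 hours points at a +8 time zone mix-up.
        if (gap.minus(Duration.ofHours(8)).abs().compareTo(tolerance) <= 0) {
            return "off by 8h";
        }
        return "unrelated";
    }

    public static void main(String[] args) {
        LocalDateTime wall = LocalDateTime.of(2023, 9, 28, 13, 24, 34);   // made-up sample
        Duration tolerance = Duration.ofMinutes(1);                       // assumed tolerance

        System.out.println(classify(wall, LocalDateTime.of(2023, 9, 28, 13, 24, 31), tolerance)); // consistent
        System.out.println(classify(wall, LocalDateTime.of(2023, 9, 28, 5, 24, 31), tolerance));  // off by 8h
    }
}
```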

3.1.5 cdc-Read data (test snapshot reads ts_ms field)

![[image-20230928100333641.png]]

3.1.6 cdc-Create data (test incremental reading of ts_ms field)

![[image-20230928101529479.png]]

3.1.7 jdbc-RowData

![[image-20230927172538194.png]]

3.1.8 jdbc-SQL

![[image-20230927171613530.png|206]]

3.2 postgres

3.2.1 database-SQL

![[image-20230927145744323.png]]

3.2.2 cdc-RowData

![[image-20230927145825569.png]]

3.2.3 cdc-SQL (test fields except ts_ms)

![[image-20230927151801248.png|200]]

3.2.4 cdc-SQL-RealTime (test ts_ms)

![[image-20230928132850256.png|325]]

3.2.5 cdc-Read data (test snapshot reads ts_ms field)

![[image-20230928095911025.png]]

3.2.6 cdc-Create data (test incremental reading of ts_ms field)

![[image-20230928101453266.png]]

3.2.7 jdbc-RowData

![[image-20230927173637049.png]]

3.2.8 jdbc-SQL

![[image-20230927173456643.png|212]]

3.3 sqlserver

3.3.1 database-SQL

![[image-20230927163637993.png]]

3.3.2 cdc-RowData

![[image-20230927163611807.png]]

3.3.3 cdc-SQL (test fields except ts_ms)

![[image-20230927163808365.png|192]]

3.3.4 cdc-SQL-RealTime (test ts_ms)

![[image-20230928133349412.png|350]]

3.3.5 cdc-Read data (test snapshot reads ts_ms field)

![[image-20230928094006306.png]]

3.3.6 cdc-Create data (test incremental reading of ts_ms field)

![[image-20230928101415704.png]]

3.3.7 jdbc-RowData

![[image-20230927174904854.png]]

3.3.8 jdbc-SQL

![[image-20230927182456589.png|194]]

3.4 oracle

3.4.1 database-SQL

![[image-20230927160526864.png]]

3.4.2 cdc-RowData

![[image-20230927160425443.png]]

3.4.3 cdc-SQL (test fields except ts_ms)

![[image-20230927160753056.png|191]]

3.4.4 cdc-SQL-RealTime (test ts_ms)

![[image-20230928133736851.png|400]]

3.4.5 cdc-Read data (test snapshot reads ts_ms field)

![[image-20230928101223538.png]]

3.4.6 cdc-Create data (test incremental reading of ts_ms field)

![[image-20230928101030948.png]]

3.4.7 jdbc-RowData

![[image-20230927183056565.png]]

3.4.8 jdbc-SQL

![[image-20230927182935788.png|203]]

4. Conclusion

(1) A without-time-zone value read from the database is stored in Flink as local time. The original wall-clock value can be recovered directly with LocalDateTime.ofEpochSecond(long epochSecond, int nanoOfSecond, ZoneOffset.UTC).
(2) The value stored in a Flink TimestampData can generally be treated as local time. Note, however, that the Instant-related methods of TimestampData must not be mixed with the LocalDateTime- and Timestamp-related methods: an Instant represents the time difference from the epoch, whereas the latter two represent the time difference from local time 0.
(3) The reference representation of time in Flink programs is local time, because the results printed through the SQL API (sql-client) match those printed by the original database.
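
To make point (2) concrete, the sketch below computes the millisecond count for the same wall-clock value in two ways: once on the local base (reading the value as if it were UTC) and once as a real epoch value for a database clock at +8. It uses only `java.time`; the class `MixedInterpretation` and its methods are illustrative names, and the +8 offset is the one assumed throughout this article.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

// Illustrative only: shows the 8h gap between the two interpretations.
class MixedInterpretation {
    // Milliseconds on the "local base": the wall-clock value read as if it were UTC.
    static long localBasedMillis(LocalDateTime wallClock) {
        return wallClock.toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Milliseconds of the real instant, given the offset the database clock runs at.
    static long epochMillis(LocalDateTime wallClock, ZoneOffset dbOffset) {
        return wallClock.toInstant(dbOffset).toEpochMilli();
    }

    public static void main(String[] args) {
        LocalDateTime ts = LocalDateTime.of(2023, 9, 28, 9, 43, 20, 320_000_000);
        ZoneOffset plus8 = ZoneOffset.ofHours(8);

        long local = localBasedMillis(ts);      // 1695894200320
        long absolute = epochMillis(ts, plus8); // 1695865400320

        System.out.println(local - absolute);               // 28800000
        System.out.println(Instant.ofEpochMilli(local));    // 2023-09-28T09:43:20.320Z (8h later than the real instant)
        System.out.println(Instant.ofEpochMilli(absolute)); // 2023-09-28T01:43:20.320Z
    }
}
```

Feeding a local-based value into an Instant-oriented API therefore yields a point in time that is off by the zone offset, which is the mix-up point (2) warns about.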

In the figure below, values in red are incorrect. When using CDC, these need extra attention and conversion.
![[image-20230928164847790.png]]

5. Appendix

5.1 Query database time zone SQL

-- mysql: time_zone is the effective setting; when it is SYSTEM, system_time_zone gives the server time zone
show variables like '%time_zone%';

-- postgres
show time zone;

-- sqlserver
DECLARE @TimeZone NVARCHAR(255);
EXEC master.dbo.xp_instance_regread
    N'HKEY_LOCAL_MACHINE',
    N'SYSTEM\CurrentControlSet\Control\TimeZoneInformation',
    N'TimeZoneKeyName',
    @TimeZone OUTPUT;
SELECT @TimeZone;

-- oracle
select dbtimezone from dual;