Incremental-update loading algorithm for data warehouse tables (supports running batches for dates in any order)

1. Database construction and test data insertion script

--Create incremental update target table
--Create table
create table EDW_T100_BAL_IU
(ID VARCHAR2(8) not null,
  BAL NUMBER(22,2),
  UPDATE_DT VARCHAR2(8)
);
--Add comments to the table
comment on table EDW_T100_BAL_IU
  is 'Balance (incremental update mode loading) table';
-- Add comments to the columns
comment on column EDW_T100_BAL_IU.ID
  is 'primary key ID';
comment on column EDW_T100_BAL_IU.BAL
  is 'balance';
comment on column EDW_T100_BAL_IU.UPDATE_DT
  is 'update date';
-- Create/Recreate primary, unique and foreign key constraints
alter table EDW_T100_BAL_IU
  add constraint EDW_T100_BAL_IU_ID primary key (ID);
--Create a temporary table for incremental updates
--Create table
create global temporary table TMP_T100_BAL_IU
(ID VARCHAR2(8) not null,
  BAL NUMBER(22,2),
  UPDATE_DT VARCHAR2(8) not null
)
on commit delete rows;
-- Create/Recreate primary, unique and foreign key constraints
alter table TMP_T100_BAL_IU
  add constraint TMP_T100_BAL_IU_ID_UPDT primary key (ID, UPDATE_DT);
--Create log table
create table EDW_ETL_LOG_DETAIL
(PROC_NAME VARCHAR2(50),
  P_ETLDATE VARCHAR2(20),
  ETL_MEMO VARCHAR2(10),
  ETL_RECORD_NUM INTEGER,
  ERR_MSG VARCHAR2(1000),
  ERR_SQL VARCHAR2(1000),
  TABLE_NAME VARCHAR2(50),
  START_TIMESTAMP TIMESTAMP(6),
  END_TIMESTAMP TIMESTAMP(6)
);
--Create source tables and insert source test data
--Create table
create table ODS_CMIS_YE
(ID VARCHAR2(8) not null,
  BAL NUMBER(22,2),
  ODS_DATA_DATE VARCHAR2(8) not null
);
--Add comments to the table
comment on table ODS_CMIS_YE
  is 'ODS balance table (incremental update data provided)';
-- Add comments to the columns
comment on column ODS_CMIS_YE.ID
  is 'primary key ID';
comment on column ODS_CMIS_YE.BAL
  is 'balance';
comment on column ODS_CMIS_YE.ODS_DATA_DATE
  is 'ODS data date';
-- Create/Recreate primary, unique and foreign key constraints
alter table ODS_CMIS_YE
  add constraint ODS_CMIS_YE primary key (ID, ODS_DATA_DATE);
--insert test data
insert into ods_cmis_ye (ID, BAL, ODS_DATA_DATE) values ('4', 34545.09, '20101202');
insert into ods_cmis_ye (ID, BAL, ODS_DATA_DATE) values ('1', 10.10, '20101201');
insert into ods_cmis_ye (ID, BAL, ODS_DATA_DATE) values ('2', 20.30, '20101201');
insert into ods_cmis_ye (ID, BAL, ODS_DATA_DATE) values ('3', 22.10, '20101204');
insert into ods_cmis_ye (ID, BAL, ODS_DATA_DATE) values ('4', 33.80, '20101205');
insert into ods_cmis_ye (ID, BAL, ODS_DATA_DATE) values ('5', 66.09, '20101203');
insert into ods_cmis_ye (ID, BAL, ODS_DATA_DATE) values ('6', 8889.08, '20101204');
insert into ods_cmis_ye (ID, BAL, ODS_DATA_DATE) values ('7', 2324.07, '20101205');
insert into ods_cmis_ye (ID, BAL, ODS_DATA_DATE) values ('8', 232449.98, '20101201');
insert into ods_cmis_ye (ID, BAL, ODS_DATA_DATE) values ('9', 3434.99, '20101201');
insert into ods_cmis_ye (ID, BAL, ODS_DATA_DATE) values ('1', 23.99, '20101202');
insert into ods_cmis_ye (ID, BAL, ODS_DATA_DATE) values ('2', 20.30, '20101202');
insert into ods_cmis_ye (ID, BAL, ODS_DATA_DATE) values ('1', 3333.98, '20101203');
insert into ods_cmis_ye (ID, BAL, ODS_DATA_DATE) values ('10', 3453252.09, '20101203');
insert into ods_cmis_ye (ID, BAL, ODS_DATA_DATE) values ('11', 2343432.78, '20101202');
commit;

2. Incremental update sample code

CREATE OR REPLACE PROCEDURE P_T100_BAL_IU
 (P_ETLDATE IN VARCHAR2, --Date parameter
 O_RUNSTATUS OUT NUMBER, --execution result
 O_MSG OUT VARCHAR2 -- error return
 )AS

--Procedure Name:P_T100_BAL_IU
--Author :nisj
--Script File :P_T100_BAL_IU.SQL
/*############################################## ###*/
--Souce Databse.table :
--ODS_CMIS_YE ODS balance table (incremental update data is provided)
/*############################################## ###*/
--Target Databse.table :
--EDW_T100_BAL_IU balance (loaded in incremental update mode) table
/*############################################## ###*/
--Loading method: incremental update

 --Define stored procedure information
 V_PROC_NAME VARCHAR2(50) := 'P_T100_BAL_IU';
 V_TABLE_NAME VARCHAR2(50) := 'EDW_T100_BAL_IU';
 V_START_TIMESTAMP TIMESTAMP; --Loading start time
 V_END_TIMESTAMP TIMESTAMP; --loading end time
 V_RECORD_NUMBER INTEGER; --Number of records
 --Define error code, error status
 V_SQLERRM VARCHAR2(1000); --Exception information
 V_ERR_SQL VARCHAR2(1000); --Error location

  BEGIN
      --Capture process start time
      SELECT SYSDATE INTO V_START_TIMESTAMP FROM DUAL;
      --Data processing after incremental update of P_ETLDATE
      --TMP_T100_BAL_IU stores all temporary data in the EDW table that is greater than or equal to the update date. The greater than or equal to the update date is obtained from the backup, and the equal to is obtained when loading the data of the current day.
      --The data backup condition greater than the batch running date is Update_Dt > P_ETLDATE, and when deleting the target table, it is Update_Dt >= P_ETLDATE
      V_ERR_SQL := 'INSERT INTO TMP_T100_BAL_IU FROM EDW_T100_BAL_IU WHERE Update_Dt > P_ETLDATE';
      INSERT INTO TMP_T100_BAL_IU
      SELECT *
        FROM EDW_T100_BAL_IU WHERE Update_Dt > P_ETLDATE;
      V_ERR_SQL := 'DELETE FROM EDW_T100_BAL_IU WHERE Update_Dt >= P_ETLDATE';
      DELETE FROM EDW_T100_BAL_IU WHERE Update_Dt >= P_ETLDATE;

      --Load the full data of the day
      V_ERR_SQL := 'INSERT INTO TMP_T100_BAL_IU FROM ODS_CMIS_YE';
      INSERT INTO TMP_T100_BAL_IU--This table has no primary key, but the logical primary key is the composite primary key of (ID, UPDATE_DT)
              (ID --primary key ID
              ,BAL --Balance
              ,UPDATE_DT --Update date
              )
      SELECT a1.ID --primary key ID
              ,a1.BAL --Balance
              ,a1.ODS_DATA_DATE --ODS data date
      FROM ODS_CMIS_YE a1
      WHERE Ods_Data_Date = P_ETLDATE
      ;


      --incremental update operation
      --Delete and update some records by primary key
      V_ERR_SQL := 'DELETE FROM EDW_T100_BAL_IU WHERE (ID) IN TMP_T100_BAL_IU';
      DELETE FROM EDW_T100_BAL_IU
      WHERE (ID)
      IN (SELECT ID
                 FROM TMP_T100_BAL_IU
              )
      ;

      --Insert new and updated data
      V_ERR_SQL := 'INSERT INTO EDW_T100_BAL_IU WHERE TMP_T100_BAL_IU [ID,MAX(UPDATE_DT)]';
      INSERT INTO EDW_T100_BAL_IU
              (ID --primary key ID
              ,BAL --Balance
              ,UPDATE_DT --Update date
              )
      SELECT ID --primary key ID
              ,BAL --Balance
              ,UPDATE_DT --Update date
      FROM TMP_T100_BAL_IU
      WHERE (ID,UPDATE_DT)
      IN (SELECT ID,MAX(UPDATE_DT)
                 FROM TMP_T100_BAL_IU
                GROUP BY ID
              )
      ;

      --Normal processing
      V_RECORD_NUMBER := SQL%ROWCOUNT;
      SELECT SYSDATE INTO V_END_TIMESTAMP FROM dual;
      INSERT INTO EDW_ETL_LOG_DETAIL(START_TIMESTAMP,END_TIMESTAMP,PROC_NAME,TABLE_NAME,ETL_RECORD_NUM,ETL_MEMO,P_ETLDATE)
      VALUES (V_START_TIMESTAMP,V_END_TIMESTAMP,V_PROC_NAME,V_TABLE_NAME,V_RECORD_NUMBER,'Success',P_ETLDATE);
      COMMIT;

      --Exception handling
      EXCEPTION WHEN OTHERS THEN
      BEGIN
             ROLLBACK;
             V_SQLERRM := SQLERRM;
             INSERT INTO EDW_ETL_LOG_DETAIL(START_TIMESTAMP,END_TIMESTAMP,PROC_NAME,TABLE_NAME,ETL_RECORD_NUM,ETL_MEMO,ERR_MSG,ERR_SQL,P_ETLDATE)
             VALUES (V_START_TIMESTAMP,V_END_TIMESTAMP,V_PROC_NAME,V_TABLE_NAME,0,'Failed',V_SQLERRM,V_ERR_SQL,P_ETLDATE);
             O_RUNSTATUS := 1;
             O_MSG := 'PROGRAMMING ERROR HAPPENED';
             COMMIT;
      END;
  END;
/

3. Result test

Incremental_Update
truncate table edw_t100_bal_iu;
select * from ods_cmis_ye;
select * from edw_t100_bal_iu;
--No matter how the batch is adjusted, as long as it is adjusted every day, edw_t100_bal_iu will be consistent with the result obtained by the following SQL!
SELECT *
  FROM ODS_CMIS_YE
 WHERE ODS_DATA_DATE <= '20101205'
   AND (ID, ODS_DATA_DATE) IN
       (SELECT ID, MAX(ODS_DATA_DATE) FROM ODS_CMIS_YE GROUP BY ID);

p_t100_bal_iu;
p_t100_bal_iu_new;
select * from edw_etl_log_detail order by 8 desc;

4. Summary

With the procedure above, if the data for December 5 has already been loaded and a problem is then found in the source data for December 1, we only need to re-run the batch for December 1 instead of re-running every day from December 1 up to the current day, and the integrity of the data is still preserved. If, on the other hand, a problem is found in the data of a certain day and the batches are to be re-run from the day of the error up to the current day, the code is as follows; either variant can be selected according to the actual situation.

CREATE OR REPLACE PROCEDURE P_T100_BAL_IU
 (P_ETLDATE IN VARCHAR2, --Date parameter
 O_RUNSTATUS OUT NUMBER, --execution result
 O_MSG OUT VARCHAR2 --Error return
 )AS

--Procedure Name:P_T100_BAL_IU
--Author :nisj
--Script File :P_T100_BAL_IU.SQL
/*############################################## ###*/
--Souce Databse.table :
--ODS_CMIS_YE ODS balance table (incremental update data is provided)
/*############################################## ###*/
--Target Databse.table :
--EDW_T100_BAL_IU balance (loaded in incremental update mode) table
/*############################################## ###*/
--Loading method: incremental update

 --Define stored procedure information
 V_PROC_NAME VARCHAR2(50) := 'P_T100_BAL_IU';
 V_TABLE_NAME VARCHAR2(50) := 'EDW_T100_BAL_IU';
 V_START_TIMESTAMP TIMESTAMP; --Loading start time
 V_END_TIMESTAMP TIMESTAMP; --loading end time
 V_RECORD_NUMBER INTEGER; --Number of records
 --Define error code, error status
 V_SQLERRM VARCHAR2(1000); --Exception information
 V_ERR_SQL VARCHAR2(1000); --Error location

  BEGIN
      --Capture process start time
      SELECT SYSDATE INTO V_START_TIMESTAMP FROM DUAL;
      --Incremental update history rollback
      DELETE FROM EDW_T100_BAL_IU WHERE Update_Dt >= P_ETLDATE;

      --Load the full data of the day
      V_ERR_SQL := 'INSERT INTO TMP_T100_BAL_IU FROM ODS_CMIS_YE';
      INSERT INTO TMP_T100_BAL_IU
              (ID --primary key ID
              ,BAL --Balance
              ,UPDATE_DT --Update date
              )
      SELECT a1.ID --primary key ID
              ,a1.BAL --Balance
              ,a1.ODS_DATA_DATE --ODS data date
      FROM ODS_CMIS_YE a1
      WHERE Ods_Data_Date = P_ETLDATE
      ;


      --incremental update operation
      --Delete and update some records by primary key
      DELETE FROM EDW_T100_BAL_IU
      WHERE (ID)
      IN (SELECT ID
              FROM TMP_T100_BAL_IU
              )
      ;

      --Add and update data in and out
      INSERT INTO EDW_T100_BAL_IU
              (ID --primary key ID
              ,BAL --Balance
              ,UPDATE_DT --Update date
              )
      SELECT ID --primary key ID
              ,BAL --Balance
              ,UPDATE_DT --Update date
      FROM TMP_T100_BAL_IU
      ;

      --Normal processing
      V_RECORD_NUMBER := SQL%ROWCOUNT;
      SELECT SYSDATE INTO V_END_TIMESTAMP FROM dual;
      INSERT INTO EDW_ETL_LOG_DETAIL(START_TIMESTAMP,END_TIMESTAMP,PROC_NAME,TABLE_NAME,ETL_RECORD_NUM,ETL_MEMO,P_ETLDATE)
      VALUES (V_START_TIMESTAMP,V_END_TIMESTAMP,V_PROC_NAME,V_TABLE_NAME,V_RECORD_NUMBER,'Success',P_ETLDATE);
      COMMIT;

      --Exception handling
      EXCEPTION WHEN OTHERS THEN
      BEGIN
             ROLLBACK;
             V_SQLERRM := SQLERRM;
             INSERT INTO EDW_ETL_LOG_DETAIL(START_TIMESTAMP,END_TIMESTAMP,PROC_NAME,TABLE_NAME,ETL_RECORD_NUM,ETL_MEMO,ERR_MSG,ERR_SQL,P_ETLDATE)
             VALUES (V_START_TIMESTAMP,V_END_TIMESTAMP,V_PROC_NAME,V_TABLE_NAME,0,'Failed',V_SQLERRM,V_ERR_SQL,P_ETLDATE);
             O_RUNSTATUS := 1;
             O_MSG := 'PROGRAMMING ERROR HAPPENED';
             COMMIT;
      END;
  END;
/