Big data processing and analysis solution based on Kettle + StarRocks + FineReport


Among them, Kettle is responsible for the ETL processing of the data, StarRocks for the storage and querying of massive data, and FineReport for data visualization and presentation. The overall process is as follows:
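In outline: CSV source data -> Kettle (ETL) -> StarRocks (storage and query) -> FineReport (charts and reports).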


If you are not yet familiar with these three components, you can refer to the following articles first:

Introduction and basic use of Kettle

Introduction and use of StarRocks, an extremely fast full-scenario MPP database

Quickly designing linked reports with FineReport

1. Experimental data and data planning

COVID-19, officially named "coronavirus disease 2019" by the World Health Organization [1-2], refers to pneumonia caused by infection with the 2019 novel coronavirus. The dataset used here contains cumulative case information for the US epidemic by county as of 2021-01-28, including confirmed cases and deaths. The data format is as follows:

date (date), county (county), state (state), fips (county code), cases (cumulative confirmed cases), deaths (cumulative deaths)
2021-01-28,Pike ,Alabama,01109,2704,35
2021-01-28,Randolph,Alabama,01111,1505,37
2021-01-28,Russell,Alabama,01113,3675,16
2021-01-28, Shelby ,Alabama,01117,19878,141
2021-01-28,St. Clair,Alabama,01115,8047,147
2021-01-28, Sumter ,Alabama,01119,925,28
2021-01-28,Talladega,Alabama,01121,6711,114
2021-01-28,Tallapoosa,Alabama,01123,3258,112
2021-01-28, Tuscaloosa,Alabama,01125,22083,283
2021-01-28,Walker,Alabama,01127,6105,185
2021-01-28,walker,Alabama,01129,1454,27

Note that the raw data is dirty: some county values carry stray spaces or inconsistent casing (for example " Shelby " and "walker"), which the ETL in section 2 will clean up.

Data set download:

https://download.csdn.net/download/qq_43692950/86805389

Data planning and table design

The final report should show the totals and maximums of confirmed cases and deaths, aggregated by county and by state, and display them as charts.

To support this, we use StarRocks aggregate-model tables together with a duplicate-key (detail) table:

-- County aggregation table
DROP TABLE IF EXISTS agg_county;
CREATE TABLE IF NOT EXISTS agg_county (
    county VARCHAR(255) COMMENT "county",
    cases_sum BIGINT SUM DEFAULT "0" COMMENT "Total confirmed cases",
    cases_max BIGINT MAX DEFAULT "0" COMMENT "Maximum confirmed cases",
    deaths_sum BIGINT SUM DEFAULT "0" COMMENT "Total deaths",
    deaths_max BIGINT MAX DEFAULT "0" COMMENT "Maximum deaths"
)
AGGREGATE KEY(county)
DISTRIBUTED BY HASH(county) BUCKETS 8;

-- State aggregation table
DROP TABLE IF EXISTS agg_state;
CREATE TABLE IF NOT EXISTS agg_state (
    state VARCHAR(255) COMMENT "state",
    cases_sum BIGINT SUM DEFAULT "0" COMMENT "Total confirmed cases",
    cases_max BIGINT MAX DEFAULT "0" COMMENT "Maximum confirmed cases",
    deaths_sum BIGINT SUM DEFAULT "0" COMMENT "Total deaths",
    deaths_max BIGINT MAX DEFAULT "0" COMMENT "Maximum deaths"
)
AGGREGATE KEY(state)
DISTRIBUTED BY HASH(state) BUCKETS 8;

-- Detail table
DROP TABLE IF EXISTS covid;
CREATE TABLE IF NOT EXISTS covid (
    county VARCHAR(255) COMMENT "county",
    date DATE COMMENT "date",
    state VARCHAR(255) COMMENT "state",
    fips VARCHAR(255) COMMENT "County code",
    cases INT COMMENT "Cumulative confirmed cases",
    deaths INT COMMENT "Cumulative deaths"
)
DUPLICATE KEY(county)
DISTRIBUTED BY HASH(county) BUCKETS 8;
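As a quick illustration of how the aggregate model behaves (a hypothetical example, not part of the original walkthrough): two inserted rows sharing the same key column are merged on read according to each value column's aggregation type.

-- Two rows sharing the key county = 'Walker'
INSERT INTO agg_county VALUES ('Walker', 100, 100, 10, 10);
INSERT INTO agg_county VALUES ('Walker', 50, 50, 5, 5);

-- Returns one row: cases_sum = 150, cases_max = 100, deaths_sum = 15, deaths_max = 10
SELECT * FROM agg_county WHERE county = 'Walker';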

2. ETL processing

2.1 Overall ETL design:

2.2 Detailed processing process

  1. CSV file input

  2. Field selection

  3. Filter: string is not empty (the same applies to state and county):

  4. String operations

  5. Sort records

  6. Remove duplicate records

  7. Table output (roughly equivalent to the INSERT sketch after this list):
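Under the hood, the Table output step writes rows to StarRocks over JDBC, which amounts to row-by-row INSERT statements along the lines of the following sketch (not Kettle's literal output):

INSERT INTO covid (county, date, state, fips, cases, deaths)
VALUES ('Pike', '2021-01-28', 'Alabama', '01109', 2704, 35);

This row-at-a-time pattern is exactly the small-batch INSERT load that section 2.4 addresses.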

2.3 ETL processing time:

It is clear from this that the write speed is very slow!

2.4 What to do if the writing speed is very slow

StarRocks discourages loading data via small-batch INSERT statements. For continuous writes, you can stage the data through Kafka or MySQL instead. The following uses Kafka as an example:

Official example: https://docs.starrocks.io/zh-cn/latest/loading/RoutineLoad

First, clear the existing data:

truncate table covid;
truncate table agg_state;
truncate table agg_county;

Create Kafka Routine Load jobs for continuous import:

-- covid data access
CREATE ROUTINE LOAD covid_load ON covid
COLUMNS TERMINATED BY ",",
COLUMNS (date,fips,cases,deaths,county,state)
PROPERTIES
(
    "desired_concurrent_number" = "5"
)
FROM KAFKA
(
    "kafka_broker_list" = "192.168.40.1:9092,192.168.40.2:9092,192.168.40.3:9092",
    "kafka_topic" = "starrocks_covid",
    "kafka_partitions" = "0,1,2",
    "property.kafka_default_offsets" = "OFFSET_END"
);

-- agg_state data access
CREATE ROUTINE LOAD agg_state_load ON agg_state
COLUMNS TERMINATED BY ",",
COLUMNS (state,deaths_sum,deaths_max,cases_sum,cases_max)
PROPERTIES
(
    "desired_concurrent_number" = "5"
)
FROM KAFKA
(
    "kafka_broker_list" = "192.168.40.1:9092,192.168.40.2:9092,192.168.40.3:9092",
    "kafka_topic" = "starrocks_agg_state",
    "kafka_partitions" = "0,1,2",
    "property.kafka_default_offsets" = "OFFSET_END"
);

-- agg_county data access
CREATE ROUTINE LOAD agg_county_load ON agg_county
COLUMNS TERMINATED BY ",",
COLUMNS (county,deaths_sum,deaths_max,cases_sum,cases_max)
PROPERTIES
(
    "desired_concurrent_number" = "5"
)
FROM KAFKA
(
    "kafka_broker_list" = "192.168.40.1:9092,192.168.40.2:9092,192.168.40.3:9092",
    "kafka_topic" = "starrocks_agg_county",
    "kafka_partitions" = "0,1,2",
    "property.kafka_default_offsets" = "OFFSET_END"
);
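Once the jobs are created, you can inspect their status (e.g. NEED_SCHEDULE, RUNNING, PAUSED) and pause or resume them; assuming the jobs were created in the current database:

SHOW ROUTINE LOAD FOR covid_load;

-- Pause or resume a job if needed
PAUSE ROUTINE LOAD FOR covid_load;
RESUME ROUTINE LOAD FOR covid_load;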

ETL modification:

The main change is to replace the Table output step with a Concat fields step followed by a Kafka producer step:

Concat fields

Kafka producer:
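For reference, each message produced for the starrocks_covid topic is a single comma-joined line whose field order must match the COLUMNS clause of covid_load above; a hypothetical sample message:

2021-01-28,01109,2704,35,Pike,Alabama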

Run the ETL again and check the elapsed time:

Nearly 1000 times faster.

3. FineReport visual design

  1. Create a new decision report:

  2. Drag a chart onto the report

  1. Define the database connection

  2. Define the database queries. First, the top 10 cumulative deaths by state:


    select state,deaths_sum from agg_state ORDER BY deaths_sum DESC limit 10
    

    Likewise add:

    Top 10 cumulative confirmed cases by state:

    select state,cases_sum from agg_state ORDER BY cases_sum DESC limit 10
    

    Top 10 maximum confirmed cases by state:

    select state,cases_max from agg_state ORDER BY cases_max DESC limit 10
    

    Top 10 maximum deaths by state:

    select state,deaths_max from agg_state ORDER BY deaths_max DESC limit 10
    
  3. Bind the data for the top 10 cumulative deaths by state

  4. Bind the data for the top 10 cumulative confirmed cases by state

  5. Set up the other two charts in the same way

  6. Generate a preview link:

  7. The display effect:

4. How to handle changing requirements

Suppose we now need to report the average number of deaths per county in each state. How can we implement this efficiently and at low cost?

Answer: You can use an asynchronous materialized view over the detail table to achieve a pre-aggregation effect.

Official description: https://docs.starrocks.io/zh-cn/latest/using_starrocks/Materialized_view

CREATE MATERIALIZED VIEW agg_state_view
DISTRIBUTED BY HASH(state) BUCKETS 8 AS
SELECT state, sum(deaths) AS deaths_sum, count(county) AS num FROM covid GROUP BY state;
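If the view is created as an asynchronous materialized view, as above, it can be refreshed on demand (a minimal sketch, assuming a manual refresh policy):

REFRESH MATERIALIZED VIEW agg_state_view;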

Note: in StarRocks, neither the aggregate model nor materialized views support avg, hence the sum/count split above.

When querying in FineReport:

select state, deaths_sum / num AS deaths_avg from agg_state_view

Thinking: once the materialized view exists, will the same aggregation query against the detail table still scan the whole table?

Answer: No.

For example:

EXPLAIN
SELECT state, sum(deaths) AS deaths_sum, count(county) AS num FROM covid GROUP BY state;

As you can see below, the query plan automatically switches to the materialized view: