Article directory
-
- 16: Real-time computing requirements and technical solutions
- 17: Basic introduction to Flink
- 18: Code module construction
- 19: Province parsing tool test
- 20: Flink code interpretation
- 21: Flink real-time computing test
16: Real-time computing requirements and technical solutions
-
Goal: Understand real-time computing requirements and technical solutions
-
Path
- step1: Real-time computing requirements
- step2: Technical solution
-
Implementation
-
Real-time computing requirements
-
Real-time statistics of total message volume
select count(*) from tbname;
-
Real-time statistics of the total number of messages sent in each region
select sender_area,count(*) from tbname group by sender_area;
-
Real-time statistics of the total number of messages received in each region
select receiver_area,count(*) from tbname group by receiver_area;
-
Real-time statistics of the total number of messages sent by each user
select sender_account,count(*) from tbname group by sender_account;
-
Real-time statistics of the total number of messages received by each user
select receiver_account,count(*) from tbname group by receiver_account;
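In the streaming implementation (section 20), each of these GROUP BY queries becomes a keyed aggregation whose continuously updated result is upserted into MySQL.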
-
-
Build real-time statistical reports
-
-
Technical solutions
- Real-time collection: Flume
- Real-time storage: Kafka
- Real-time computing: Flink
- Real-time results: MySQL/Redis
- Real-time reporting: FineBI/JavaWeb visualization
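Putting the pieces together, the data flows: Flume (collection) → Kafka (topic MOMO_MSG) → Flink (aggregation) → MySQL/Redis (results) → FineBI/JavaWeb (report).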
-
-
Summary
- Understand real-time computing requirements and technology selection
17: Basic introduction to Flink
-
Goal: Understand the functions, features and application scenarios of Flink
-
Path
- step1: Functions
- step2: Features
- step3: Applications
-
Implementation
Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.
-
Function: high-performance, stateful, distributed real-time computation over bounded or unbounded data streams, on any common cluster platform.
- Flink DataSet: batch operations on bounded data
- Flink DataStream: real-time processing of unbounded data
- Flink Table: structured data processing based on a DSL
- Flink SQL: structured data processing based on SQL
- Flink Gelly: Flink's graph computing library
- Flink ML: Flink's machine learning library
-
Features
- Supports high-throughput, low-latency, high-performance stream processing
- Supports window operations on event time (see the sketch after this list)
- Supports exactly-once semantics for stateful computation
- Supports highly flexible window (Window) operations: time-, count-, session-, and data-driven windows
- Supports a continuous streaming model with backpressure
- Supports fault tolerance based on lightweight distributed snapshots (Snapshot)
- One runtime supports both batch-on-streaming and pure streaming processing
- Flink implements its own memory management inside the JVM
- Supports iterative computation
- Supports automatic program optimization: avoids expensive operations such as shuffle and sort where possible, and can cache intermediate results
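A minimal sketch (not from the course code; topic data, durations, and names here are illustrative) of how two of these features are switched on in the DataStream API: exactly-once checkpointing and an event-time tumbling window:
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class FlinkFeaturesSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Exactly-once fault tolerance via lightweight distributed snapshots, every 5s
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);

        env.fromElements("a", "b", "a")
                .map(word -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                // Event-time watermarks tolerating 5s of out-of-order data;
                // arrival time stands in for a real event timestamp here
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((t, ts) -> System.currentTimeMillis()))
                .keyBy(t -> t.f0)
                // One-minute tumbling window on event time
                .window(TumblingEventTimeWindows.of(Time.minutes(1)))
                .sum(1)
                .print();

        env.execute("FlinkFeaturesSketch");
    }
}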
-
Application: All real-time and offline data computing scenarios
- Data analysis applications: real-time data warehouse
- Batch analytics
- Streaming analytics
- Event-driven applications
- Fraud detection
- Anomaly detection
- Rule-based alerting
- Business process monitoring
- Web Applications (Social Networks)
- Data pipeline applications: ETL
- Periodic ETL and continuous data pipelines
- Companies using Flink in production
- Alibaba: https://developer.aliyun.com/article/72242
- Tencent: https://mp.weixin.qq.com/s/tyq6ZdwsgiuXYGi-VR_6KA
- Meituan: https://tech.meituan.com/2021/08/26/data-warehouse-in-meituan-waimai.html
- Youzan: https://tech.youzan.com/flink-practice/
- OPPO, iQiyi, Vipshop…
-
-
Summary
- Understand the functions, features and application scenarios of Flink
18: Code module construction
-
Goal: Set up the code modules in the development environment
-
Implementation
-
Import the code in the package into IDEA
-
flink package: Application class package, used to store actual application classes
-
MoMoFlinkCount: used to implement statistical calculations for each requirement
-
MySQLSink: used to write calculated results into MySQL
-
-
pojo package: Entity class package, used to store all entity classes
-
MoMoCountBean: used to encapsulate the results of statistical analysis
private Integer id;               // result id
private Long moMoTotalCount;      // total number of messages
private String moMoProvince;      // province
private String moMoUsername;      // user
private Long moMo_MsgCount;       // number of messages
// Result type: 1 - total number of messages; 2 - messages sent by each province;
// 3 - messages received by each province; 4 - messages sent by each user;
// 5 - messages received by each user
private String groupType;
-
-
utils package: tool package, used to store all tool classes
-
HttpClientUtils: tool class used to resolve latitude/longitude coordinates into a province
public static String findByLatAndLng(String lat, String lng)
- Parameters: latitude, longitude
- Return value: province
-
-
-
Summary
- Set up the code modules in the development environment
19: Province parsing tool test
-
Goal: Understand the implementation of province parsing
-
Path
- step1: basic design
- step2: Register as a Baidu developer
- step3: Test province parsing
-
Implementation
-
Basic design
- Business scenario: Obtain the user’s country, province, and city information based on IP or latitude and longitude analysis
- Option 1: an offline parsing library [local parsing; fast but less accurate]
- Option 2: an online parsing API [remote parsing; concurrency-limited but accurate]
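This project takes Option 2, using Baidu Map's online reverse geocoding service, registered for below.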
-
-
Register Baidu Developer
-
Baidu map open platform: https://lbsyun.baidu.com/
-
Reverse geocoding: https://lbsyun.baidu.com/index.php?title=webapi/guide/webservice-geocoding-abroad
https://api.map.baidu.com/reverse_geocoding/v3/?ak=yourak&output=json&coordtype=wgs84ll&location=31.225696563611,121.49884033194   // GET request
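In the JSON response, a status of 0 means success, and the province sits under result.addressComponent.province; that is the field the tool class below extracts.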
-
Register on the open platform and obtain the AK code: refer to “Appendix Three”
-
-
Test province parsing
- Note: Change the AK code in the code to your own
package bigdata.itcast.cn.momo.online.utils;

import com.alibaba.fastjson.JSONObject;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

import java.io.IOException;
import java.util.Map;

public class HttpClientUtils {

    // Pass in the latitude and longitude, return the resolved province
    public static String findByLatAndLng(String lat, String lng) {
        try {
            CloseableHttpClient httpClient = HttpClients.createDefault();
            String url = "http://api.map.baidu.com/reverse_geocoding/v3/?ak=l8hKKRCuX2zrRa93jneDrPmc2UspGatO&output=json&coordtype=wgs84ll&location=" + lat + "," + lng;
            System.out.println(url);
            // Build the GET request
            HttpGet httpGet = new HttpGet(url);
            // Execute the request
            CloseableHttpResponse response = httpClient.execute(httpGet);
            // Retrieve the response body
            HttpEntity httpEntity = response.getEntity();
            // Convert it to a JSON string
            String json = EntityUtils.toString(httpEntity);
            // Extract the province from the JSON
            Map<String, Object> result = JSONObject.parseObject(json, Map.class);
            if (result.get("status").equals(0)) {
                Map<String, Object> resultMap = (Map<String, Object>) result.get("result");
                resultMap = (Map<String, Object>) resultMap.get("addressComponent");
                String province = (String) resultMap.get("province");
                return province;
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    public static void main(String[] args) {
        // Test
        String sf = findByLatAndLng("43.921297", "124.655376");
        System.out.println(sf);
    }
}
-
Summary
- Understand the implementation of province parsing
20: Flink code interpretation
-
Goal: Understand the basic implementation of Flink code
-
Path
- step1: Consume Kafka
- step2: real-time statistical analysis
- step3: Update the results to MySQL in real time
-
Implementation
-
Consume Kafka
// Build Kafka configuration
Properties props = new Properties();
props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
props.setProperty("group.id", "momo2");
// Build the consumer
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>("MOMO_MSG", new SimpleStringSchema(), props);
// Flink loads the consumer as a source
DataStreamSource<String> streamSource = env.addSource(kafkaConsumer);
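The last line assumes a StreamExecutionEnvironment named env was created earlier in MoMoFlinkCount; a minimal sketch of that setup (the variable name comes from the code above, the rest is standard boilerplate):
// Standard DataStream boilerplate assumed to precede the Kafka source above
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();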
-
Real-time statistical analysis
// todo: 3. Perform the statistical transformations
// 3.1: count the total message volume
countTotalMsg(streamSource);
// 3.2: based on latitude and longitude, count the messages sent by each province
countProvinceSenderMsg(streamSource);
// 3.3: based on latitude and longitude, count the messages received by each province
countProvinceReceiverMsg(streamSource);
// 3.4: count the messages sent by each user
countUserNameSenderMsg(streamSource);
// 3.5: count the messages received by each user
countUserNameReceiverMsg(streamSource);
// 5. Launch the Flink job
env.execute("momoFlinkCount");
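The five count methods are not reproduced in these notes. Purely as an illustration, a minimal sketch of what countTotalMsg could look like, assuming one Kafka record per message; the MoMoCountBean fields come from the pojo above, but the setter names, the reduce-based running total, and the sink wiring are assumptions:
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;

// Hypothetical sketch, not the course's actual implementation
private static void countTotalMsg(DataStream<String> streamSource) {
    streamSource
            .map(msg -> 1L)                 // every message counts as 1
            .returns(Types.LONG)
            .keyBy(cnt -> "total")          // one global group for the overall total
            .reduce(Long::sum)              // continuously updated running total
            .map(total -> {
                MoMoCountBean bean = new MoMoCountBean();
                bean.setMoMoTotalCount(total);   // assumed setter names
                bean.setGroupType("1");          // 1 = total number of messages
                return bean;
            })
            .addSink(new MysqlSink("1"));   // upsert the result row in MySQL
}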
-
Update results to MySQL in real time
streamOperator.addSink(new MysqlSink("2"));
if (status.equals("2")) {
    // Check whether this province already has a result row for groupType 2
    String sql = "select * from momo_count where momo_groupType = '2' and momo_province = '" + value.getMoMoProvince() + "'";
    ResultSet resultSet = stat.executeQuery(sql);
    boolean flag = resultSet.next();
    if (flag) {
        // Row exists: update the message count
        sql = "update momo_count set momo_msgcount = '" + value.getMoMo_MsgCount() + "' where momo_groupType = '2' and momo_province = '" + value.getMoMoProvince() + "'";
    } else {
        // First result for this province: insert a new row
        sql = "insert into momo_count(momo_province, momo_msgcount, momo_groupType) values ('" + value.getMoMoProvince() + "', " + value.getMoMo_MsgCount() + ", '2')";
    }
    stat.executeUpdate(sql);
}
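For context, the branch above lives inside the sink's invoke method. A minimal skeleton of how such a JDBC sink is typically structured (the class name and constructor argument match the code above; the connection URL, credentials, and lifecycle details are assumptions):
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Hypothetical skeleton; the course's MySQLSink may differ in details
public class MysqlSink extends RichSinkFunction<MoMoCountBean> {
    private final String status;   // which requirement this sink serves (groupType)
    private Connection conn;
    private Statement stat;

    public MysqlSink(String status) {
        this.status = status;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        // One connection per parallel sink instance (URL and credentials assumed)
        conn = DriverManager.getConnection("jdbc:mysql://node1:3306/momo", "root", "123456");
        stat = conn.createStatement();
    }

    @Override
    public void invoke(MoMoCountBean value, Context context) throws Exception {
        // ... the select / update / insert branch shown above goes here ...
    }

    @Override
    public void close() throws Exception {
        if (stat != null) stat.close();
        if (conn != null) conn.close();
    }
}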
-
-
Summary
- Understand the basic implementation of Flink code
21: Flink real-time computing test
-
Goal: Implement Flink real-time analysis test
-
Path
- step1: MySQL preparation
- step2: run the test
-
Implementation
-
MySQL preparation
-
Find the SQL file
-
Run the SQL file to create the result database and table
-
-
Run the test
-
Start the Flink program: run MoMoFlinkCount
-
Start the Flume program
cd /export/server/flume-1.9.0-bin
bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_kafka.properties -Dflume.root.logger=INFO,console
-
Start the data simulator
java -jar /export/data/momo_init/MoMo_DataGen.jar \
/export/data/momo_init/MoMo_Data.xlsx \
/export/data/momo_data/ \
100
-
Observe MySQL results
-
-
-
Summary
- Implement Flink real-time analysis and testing