spark-4 Spark Streaming

Spark Streaming

  • 4. Spark Streaming
    • 4.1 Spark Streaming and Storm
    • 4.2 Principle of Streaming
    • 4.3 Create StreamingContext
    • 4.4 Input DStream and Receiver
    • 4.5 Operation of DStream
      • 4.5.1 DStream Conversion
      • 4.5.2 DStream output
    • 4.6 DataFrame and SQL operations
    • 4.7 Stream Processing
      • 4.7.1 Spark-shell stream processing
      • 4.7.2 File stream
      • 4.7.3 Socket stream
      • 4.7.4 Kafka stream (window)
    • 4.8 updateStateByKey operation

4. Spark Streaming

Spark Streaming is an extension of Spark Core that supports scalable, high-throughput, fault-tolerant processing of real-time data streams. Data can be ingested from many sources such as Kafka, Flume, Kinesis, or TCP sockets, and processed using complex algorithms expressed with high-level functions such as map, reduce, join, and window. Processed data can then be pushed to file systems, databases, and live dashboards. Spark’s MLlib machine learning and GraphX graph processing algorithms can also be applied to data streams. A typical processing flow is shown in Figure 4.1.


Figure 4.1 Spark Streaming processing flow

4.1 Spark Streaming and Storm

Both Spark Streaming and Storm can be used for real-time stream computing, but they differ significantly. First, their computing models are completely different. Spark Streaming is built on RDDs, so it collects the data that arrives within a short interval (for example, 1 second) into an RDD and then processes that batch. Storm, by contrast, processes each record as soon as it arrives. Strictly speaking, therefore, Spark Streaming is a quasi-real-time stream computing framework, while Storm is a real-time computing framework in the true sense.

In addition, Storm supports an advanced feature that Spark Streaming currently lacks: the degree of parallelism of a distributed stream computing program (Topology) can be adjusted dynamically while it is running, which dynamically improves its concurrent processing capability. Spark Streaming cannot adjust its degree of parallelism dynamically.

Spark Streaming has its own advantages, however. First, because it processes data in batches, its throughput is several times or even dozens of times that of Storm, which processes one record at a time. Second, because it is part of the Spark ecosystem, Spark Streaming integrates seamlessly with Spark Core, Spark SQL, and even Spark MLlib and Spark GraphX. Stream-processed data can immediately go through map and reduce transformations, be queried with SQL, or be fed to machine learning or graph computing algorithms. Storm cannot match this one-stop approach to big data processing.

Therefore, choose Storm when the real-time requirements are particularly strict and the data volume fluctuates, for example with peak periods during the day. If the real-time requirements are moderate, a quasi-real-time latency of about 1 second is acceptable, and dynamic adjustment of parallelism is not required, Spark Streaming is the better choice.

4.2 Principle of Streaming

Spark Streaming provides a high-level abstraction called a discretized stream, or DStream, which represents a continuous data stream. Internally, a DStream is represented as a sequence of RDDs, each containing the data from one time interval. Every operation on a DStream is translated into operations on the underlying RDDs.
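The idea that a stream is a sequence of per-interval batches, and that a stream operation is the same operation applied to each batch, can be modeled without Spark. The following is a minimal plain-Scala sketch, not Spark's implementation: the names Record, toBatches, and wordCountPerBatch are hypothetical, and unlike Spark this model simply skips intervals in which no data arrived.

```scala
// Plain-Scala model of a DStream as a sequence of batches; no Spark dependency.
object DStreamModelSketch {
  // Hypothetical record: arrival time in milliseconds plus one line of text.
  final case class Record(timestampMs: Long, line: String)

  // Cut the records into consecutive batches, one per batch interval.
  // Each batch plays the role of one RDD; empty intervals are skipped in this model.
  def toBatches(records: Seq[Record], batchIntervalMs: Long): Seq[Seq[String]] =
    records
      .groupBy(_.timestampMs / batchIntervalMs)
      .toSeq
      .sortBy(_._1)
      .map { case (_, rs) => rs.sortBy(_.timestampMs).map(_.line) }

  // Word count for a single batch.
  def wordCount(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))
      .filter(_.nonEmpty)
      .groupBy(identity)
      .map { case (word, occurrences) => word -> occurrences.size }

  // An operation on the "stream" is the same operation applied to every batch.
  def wordCountPerBatch(records: Seq[Record], batchIntervalMs: Long): Seq[Map[String, Int]] =
    toBatches(records, batchIntervalMs).map(wordCount)

  def main(args: Array[String]): Unit = {
    val records = Seq(Record(100, "a b"), Record(900, "a"), Record(1200, "b b"), Record(2500, "c"))
    wordCountPerBatch(records, batchIntervalMs = 1000).foreach(println)
  }
}
```

Each batch is counted independently of the others, which is also what makes this kind of word count a stateless transformation.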

The basic steps to write a Spark Streaming program are:
(1) Define the input source by creating an input DStream;
(2) Define the stream computation by applying transformation and output operations to the DStream;
(3) Call streamingContext.start() to start receiving and processing data;
(4) Call streamingContext.awaitTermination() to wait for processing to end (either stopped manually or because of an error);
(5) You can manually end the stream computing process through streamingContext.stop().

4.3 Create StreamingContext

A DStream is created from a StreamingContext object. There are two ways to create a StreamingContext: from a SparkConf and from an existing SparkContext.
(1) Created through SparkConf:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))

appName is the application name shown on the Spark UI. master is the URL of a Spark, Mesos, or YARN cluster, or local[*] to run in local mode. The batch interval (1 second here) should be set according to your application’s latency requirements and the available cluster resources.
(2) Created through SparkContext:

val sc = new SparkContext(conf)
val ssc = new StreamingContext(sc, Seconds(1))

4.4 Input DStream and Receiver

Each input DStream (except a file stream) is associated with a Receiver object, which receives data from the source and stores it in Spark’s memory for processing. Spark Streaming provides two categories of streaming sources:
(1) Basic sources: sources directly available through the StreamingContext API, such as files and sockets;
(2) Advanced sources: sources such as Kafka and Flume, which are available through additional utility classes.
Multiple input DStreams can be created in one application to read multiple data streams simultaneously.

A receiver runs inside a worker/executor as a long-running task, so it occupies one of the cores allocated to the application. Spark Streaming therefore needs enough cores both to run the receivers and to process the received data. When running a Spark Streaming program locally, do not use “local” or “local[1]” as the master URL. Either of these means that only one thread is used to run tasks locally. With a receiver-based input DStream (such as Kafka or Flume), that single thread is used to run the receiver, leaving no thread to process the received data. When running locally, always use “local[n]” as the master URL, where n is greater than the number of receivers; otherwise the system will receive data but will not be able to process it.
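The thread-count rule for local mode can be written down as a small check. The sketch below is a hypothetical helper, not part of Spark: the names localThreads and leavesThreadsForProcessing are made up, and only the plain forms “local”, “local[n]”, and “local[*]” are recognized; any other master URL is treated as a cluster URL and passed through unchecked.

```scala
// Hypothetical helper illustrating the local[n] rule; not a Spark API.
object LocalMasterCheck {
  private val LocalN = """local\[(\d+)\]""".r

  // Number of worker threads implied by a plain local master URL, if it is one.
  def localThreads(master: String): Option[Int] = master match {
    case "local"    => Some(1)
    case "local[*]" => Some(Runtime.getRuntime.availableProcessors())
    case LocalN(n)  => Some(n.toInt)
    case _          => None // cluster URL or another form: not checked here
  }

  // True when at least one thread remains for processing after each receiver takes one.
  def leavesThreadsForProcessing(master: String, receivers: Int): Boolean =
    localThreads(master).forall(_ > receivers)

  def main(args: Array[String]): Unit =
    Seq("local", "local[1]", "local[2]").foreach { m =>
      println(s"$m with 1 receiver: ${leavesThreadsForProcessing(m, receivers = 1)}")
    }
}
```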

Sources such as Kafka and Flume require external dependencies, and some of these libraries have complex dependencies of their own. These advanced sources are not available in the Spark shell by default, so applications based on them cannot be tested in spark-shell unless you manually download the required packages and add them to the classpath.

In terms of reliability, receivers fall into two categories. A reliable receiver sends an acknowledgment to the source (such as Kafka or Flume) once the data has been received and stored in Spark. An unreliable receiver does not send an acknowledgment to the source.

4.5 Operation of DStream

4.5.1 DStream conversion

Similar to RDDs, transformations allow the data from the input DStream to be modified. They fall into stateless transformations and stateful transformations.
Example of a stateless transformation: the word frequency count in the “socket stream” example is stateless. Each run counts only the words that arrive in the current batch; it is independent of previous batches and nothing is accumulated.
Examples of stateful transformations: sliding window operations and the updateStateByKey operation.
Some common transformations (similar to RDD transformations) are as follows:
(1) Window operation
Every time the window slides over the source DStream, the source RDDs that fall within the window are combined and operated on to produce the RDDs of the windowed DStream. Any window operation needs two parameters: the window length, which is the duration of the window (30 seconds in the code below), and the sliding interval, which is the interval at which the window operation is performed (10 seconds in the code below). Both parameters must be multiples of the batch interval of the source DStream.
Example of a window operation: suppose we want to extend the word count example by generating, every 10 seconds, word counts over the last 30 seconds of data. To do this, we apply the reduceByKey operation to the DStream of (word, 1) pairs over the last 30 seconds of data, using the reduceByKeyAndWindow operation:

val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))

All sliding window operations take the parameters windowLength (window length) and slideInterval (sliding interval). Common window operations are summarized as follows:
window: returns a new DStream computed from windowed batches of the source DStream.
countByWindow: returns a sliding window count of the elements in the DStream.
reduceByWindow: returns a new single-element stream, created by aggregating the elements of the stream over a sliding interval using the function func. func must be associative and commutative so that it can be computed in parallel.
reduceByKeyAndWindow (three parameters): when applied to a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs in which the values for each key are aggregated over the window by the given reduce function func. Note: by default, this operator uses Spark’s default number of parallel tasks for the grouping. You can specify a different number of tasks with the numTasks parameter.
reduceByKeyAndWindow (four parameters): a more efficient version of the three-parameter reduceByKeyAndWindow above, in which the reduce value of each window is computed incrementally from the reduce value of the previous window: new data entering the sliding window is reduced, and old data leaving the window is “inverse reduced”. It can only be used with invertible reduce functions, that is, reduce functions that have a corresponding inverse reduce function (passed in as the invFunc parameter).
countByValueAndWindow: when applied to a DStream of (K, V) pairs, returns a new DStream of (K, Long) pairs in which the value of each key is its frequency within the sliding window.
updateStateByKey: used when state needs to be maintained across batches.
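The difference between the two forms of reduceByKeyAndWindow can be illustrated on plain collections. The sketch below is a model under stated assumptions, not Spark's implementation: each batch is already reduced to a Map of per-key counts, the window slides by exactly one batch, and the names combine, recompute, and incremental are hypothetical. It checks that "previous window plus entering batch minus leaving batch" gives the same result as reducing every window from scratch.

```scala
// Plain-Scala model of incremental window reduction; no Spark dependency.
object IncrementalWindowSketch {
  type Counts = Map[String, Int]

  // Merge two count maps with a binary function, treating missing keys as 0.
  def combine(a: Counts, b: Counts)(f: (Int, Int) => Int): Counts =
    (a.keySet ++ b.keySet).map(k => k -> f(a.getOrElse(k, 0), b.getOrElse(k, 0))).toMap

  // Three-parameter style: reduce all batches of each window from scratch.
  def recompute(batches: Seq[Counts], windowBatches: Int): Seq[Counts] =
    batches.indices.map { i =>
      batches
        .slice(math.max(0, i - windowBatches + 1), i + 1)
        .foldLeft(Map.empty[String, Int])((acc, b) => combine(acc, b)(_ + _))
    }

  // Four-parameter style: reuse the previous window, add the batch that enters
  // (reduce with +) and remove the batch that leaves (inverse reduce with -).
  def incremental(batches: Seq[Counts], windowBatches: Int): Seq[Counts] =
    batches.indices.scanLeft(Map.empty[String, Int]) { (previous, i) =>
      val withEntering = combine(previous, batches(i))(_ + _)
      val leaving = if (i >= windowBatches) batches(i - windowBatches) else Map.empty[String, Int]
      // Keys whose count drops to zero are removed so both results compare equal.
      combine(withEntering, leaving)(_ - _).filter(_._2 != 0)
    }.tail

  def main(args: Array[String]): Unit = {
    val batches = Seq(Map("a" -> 1), Map("a" -> 2, "b" -> 1), Map("b" -> 3), Map("c" -> 1))
    incremental(batches, windowBatches = 3).foreach(println)
  }
}
```

With a 10-second batch interval, windowBatches = 3 corresponds to the 30-second window sliding every 10 seconds used in the example above. The incremental form touches only two batches per slide, which is why it pays off for long windows.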

(2) Multi-stream association
Join operations on streams, including windowed streams, are very useful. Different types of joins are easy to implement in Spark Streaming, including leftOuterJoin, rightOuterJoin, and fullOuterJoin. In each batch interval, the RDD generated by stream1 is joined with the RDD generated by stream2, as follows:

val stream1: DStream[(String, String)] = ...
val stream2: DStream[(String, String)] = ...
val joinedStream = stream1.join(stream2)
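What "joined per batch interval" means can be shown on plain collections. This is a minimal sketch rather than Spark code: each stream is modeled as a sequence of batches of key-value pairs, and the names join, leftOuterJoin, and joinStreams are hypothetical stand-ins that only mirror the semantics of the corresponding Spark operations. Result order is deterministic here, which Spark does not guarantee.

```scala
// Plain-Scala model of per-batch stream joins; no Spark dependency.
object BatchJoinSketch {
  // Inner join of two batches: one output pair per matching key combination.
  def join[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, W))] =
    left.flatMap { case (k, v) =>
      right.collect { case (k2, w) if k2 == k => (k, (v, w)) }
    }

  // Left outer join: every left pair is kept; a missing right side becomes None.
  def leftOuterJoin[K, V, W](left: Seq[(K, V)], right: Seq[(K, W)]): Seq[(K, (V, Option[W]))] =
    left.flatMap { case (k, v) =>
      val matched: Seq[Option[W]] = right.collect { case (k2, w) if k2 == k => Some(w) }
      val padded: Seq[Option[W]] = if (matched.isEmpty) Seq(None) else matched
      padded.map(m => (k, (v, m)))
    }

  // Joining two streams joins the batches that belong to the same interval.
  def joinStreams[K, V, W](s1: Seq[Seq[(K, V)]], s2: Seq[Seq[(K, W)]]): Seq[Seq[(K, (V, W))]] =
    s1.zip(s2).map { case (b1, b2) => join(b1, b2) }

  def main(args: Array[String]): Unit = {
    val batch1 = Seq(("a", 1), ("b", 2))
    val batch2 = Seq(("a", "x"), ("c", "y"))
    println(join(batch1, batch2))
    println(leftOuterJoin(batch1, batch2))
  }
}
```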

4.5.2 DStream output

Output operations allow the data of a DStream to be pushed to an external system, such as a database or a file system. Because output operations are what allow external systems to consume the transformed data, they trigger the actual execution of all DStream transformations (similar to actions on RDDs). The types of output operations are shown in Figure 4.2:

Figure 4.2 DStream output operations

4.6 DataFrame and SQL operations

DataFrame and SQL operations can easily be used on streaming data, but the SparkSession must be created from the SparkContext that the StreamingContext is using. The following example generates word counts using DataFrames and SQL. Each RDD is converted to a DataFrame, registered as a temporary view, and then queried using SQL:

val words: DStream[String] = ...
words.foreachRDD { rdd =>
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._
  // Convert the RDD[String] to a DataFrame with a single column named "word"
  val wordsDataFrame = rdd.toDF("word")
  wordsDataFrame.createOrReplaceTempView("words")
  val wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word")
  wordCountsDataFrame.show()
}

4.7 Stream processing

4.7.1 Spark-shell stream processing

After entering spark-shell, a SparkContext is available by default as sc, so the StreamingContext is created from it rather than from a SparkConf object. In spark-shell, the StreamingContext object is created as follows:

import org.apache.spark.streaming._
val ssc = new StreamingContext(sc, Seconds(1))

If you are writing an independent Spark Streaming program instead of running it in spark-shell, you need to create a StreamingContext object as follows:

import org.apache.spark._
import org.apache.spark.streaming._
val conf = new SparkConf().setAppName("TestDStream").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))

4.7.2 File Stream

The file stream can read local files or files on HDFS. If Spark is deployed in YARN mode, spark-shell reads the corresponding files on HDFS by default, under hdfs:xxxx/user/xx/:

import org.apache.spark.streaming._
val ssc = new StreamingContext(sc, Seconds(5))
// textFileStream treats its argument as a directory and reads files newly created in it
val lines = ssc.textFileStream("hdfs://xxx/yzg_test.txt")
val Counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
Counts.saveAsTextFiles("hdfs://xxx/bendi")
ssc.start()
ssc.awaitTermination()
ssc.stop()

After the above code runs in spark-shell, every 5 seconds it reads the new files on HDFS, computes word frequencies, and writes them to a “bendi-timestamp” folder on HDFS, until ssc.stop() is called. Counts.saveAsTextFiles(“file://xxx/bendi”) and Counts.print() write to the local file system and to standard output respectively.

4.7.3 Socket stream

Spark Streaming can listen on a socket port and receive and process data in real time. The steps are as follows:
(1) The driver creates a StreamingContext object. When the context starts, it creates a JobScheduler and a ReceiverTracker in turn and calls their start methods;
(2) In its start method, the ReceiverTracker sends a start-receiver message to the remote executor; the message contains the address of the ServerSocket;
(3) On the executor side, the ReceiverTrackerEndpoint receives the message, extracts its content, uses the sparkContext together with that content to create a ReceiverRDD object, and finally submits the RDD to the Spark cluster.
In terms of code, run nc -lk 9999 on the host with address 172.22.241.184 to open listening port 9999, and keep writing data into it. The spark-shell code that listens on the port is shown below: the input source is the socket, and after a simple word frequency count the results are written to the HDFS file system.

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel
val ssc = new StreamingContext(sc, Seconds(5))
// Read lines of text from the socket opened with nc
val lines = ssc.socketTextStream("172.22.241.184", 9999, StorageLevel.MEMORY_AND_DISK_SER)
val wordCounts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
wordCounts.saveAsTextFiles("hdfs://xxx/bendi-socket")
ssc.start()
ssc.awaitTermination()
ssc.stop()

4.7.4 Kafka stream (window)

Advanced input sources such as Kafka and Flume depend on separate libraries (JAR files). To read from an advanced input source such as Kafka in spark-shell, the corresponding JAR packages must be placed in Spark’s dependency folder lib. Choose the spark-streaming-kafka version that matches the Kafka version in use and download it from the Maven repository. After placing the JAR packages in the lib folder of CDH’s Spark installation, verify the setup by importing the package:

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
val ssc = new StreamingContext(sc, Seconds(5))
// Checkpointing is required by the inverse-function form of reduceByKeyAndWindow
ssc.checkpoint("hdfs://usr/spark/kafka/checkpoint")
val zkQuorum = "172.22.241.186:2181"
val group = "test-consumer-group"
val topics = "yzg_spark"
val numThreads = 1
// Map each topic name to the number of consumer threads
val topicMap = topics.split(",").map((_, numThreads)).toMap
// Receiver-based Kafka stream of (key, message) pairs
val lineMap = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)
val pair = lineMap.map(_._2).flatMap(_.split(" ")).map((_, 1))
// Window length 2 minutes, sliding interval 10 seconds, 2 partitions
val wordCounts = pair.reduceByKeyAndWindow(_ + _, _ - _, Minutes(2), Seconds(10), 2)
wordCounts.print()
ssc.start()

4.8 updateStateByKey operation

When Spark Streaming needs to maintain state across batches, the updateStateByKey operation is used. Taking word frequency counting as an example of a stateful transformation, the word counts of the current batch are added to the accumulated counts from previous batches, so the final result is the total frequency of each word across all batches.

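val updateFunc = (values: Seq[Int], state: Option[Int]) => {
  // Sum of the new values that arrived for this key in the current batch
  val currentCount = values.foldLeft(0)(_ + _)
  // Accumulated count from previous batches, or 0 for a key seen for the first time
  val previousCount = state.getOrElse(0)
  Some(currentCount + previousCount)
}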
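How such an update function accumulates state across batches can be tried out on plain collections. The sketch below is a simplified model, not Spark's implementation: step and run are hypothetical names, and the state is an ordinary Map. In each batch it calls the update function for every key that either has new values or already has state, which mirrors how keys without new data still keep their previous count.

```scala
// Plain-Scala model of stateful word counting across batches; no Spark dependency.
object UpdateStateSketch {
  // Same shape as the update function above: new values for a key plus its previous state.
  val updateFunc: (Seq[Int], Option[Int]) => Option[Int] =
    (values, state) => Some(values.sum + state.getOrElse(0))

  // Process one batch: update every key seen in the batch or already present in the state.
  def step(state: Map[String, Int], batch: Seq[(String, Int)]): Map[String, Int] = {
    val grouped: Map[String, Seq[Int]] =
      batch.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    (state.keySet ++ grouped.keySet).flatMap { k =>
      updateFunc(grouped.getOrElse(k, Seq.empty), state.get(k)).map(count => k -> count)
    }.toMap
  }

  // Feed batches of words through step, returning the state after each batch.
  def run(batches: Seq[Seq[String]]): Seq[Map[String, Int]] =
    batches.scanLeft(Map.empty[String, Int])((s, words) => step(s, words.map((_, 1)))).tail

  def main(args: Array[String]): Unit =
    run(Seq(Seq("a", "b", "a"), Seq("b"), Seq("c", "a"))).foreach(println)
}
```

Returning None from the update function would drop a key from the state in this model, since flatMap discards empty results.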

Implementation:

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel
val ssc = new StreamingContext(sc, Seconds(5))
// updateStateByKey requires a checkpoint directory
ssc.checkpoint("hdfs://172.22.241.184:8020/usr/spark/checkpoint")
val lines = ssc.socketTextStream("172.22.241.184", 9999, StorageLevel.MEMORY_AND_DISK_SER)
val wordCounts = lines.flatMap(_.split(" ")).map((_, 1)).updateStateByKey[Int](updateFunc)
wordCounts.saveAsTextFiles("hdfs://172.22.241.184:8020/user/spark/bendi-socket")
ssc.start()
ssc.awaitTermination()
ssc.stop()