Spark Streaming DStream Operations

1. Definition of DStream

A DStream (discretized stream) is the high-level abstraction provided by Spark Streaming. It represents a continuous stream of data and can be created either from an input data source such as Kafka or Flume, or by applying high-level operations such as map, reduce, join, and window to other DStreams.
Internally, a DStream is a series of continuously generated RDDs. The RDD is the core abstraction of Spark Core: an immutable, distributed dataset.
Any operator applied to a DStream is, at the lowest level, translated into operations on each RDD inside that DStream. For example, applying a map operation to a DStream produces a new DStream: the map function is applied to the RDD of each time interval in the input DStream, and each resulting RDD becomes the RDD of the corresponding time interval in the new DStream.
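As a minimal sketch of this translation, the example below assumes a local StreamingContext with a 5-second batch interval reading text from a socket on port 9999 (the master, port, and batch interval are all hypothetical choices); the map applied to the DStream is executed on every RDD that the stream generates.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical setup: a local StreamingContext with a 5-second batch interval
val conf = new SparkConf().setMaster("local[2]").setAppName("DStreamExample")
val ssc = new StreamingContext(conf, Seconds(5))

// Each 5-second batch interval produces one RDD of lines read from the socket
val lines = ssc.socketTextStream("localhost", 9999)

// map on the DStream is translated into a map over every underlying RDD
val lineLengths = lines.map(line => line.length)
lineLengths.print()

ssc.start()            // start the computation
ssc.awaitTermination() // wait for it to terminate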

2. DStream operation

1, Common transformation operations

Transformation: Description
map(func): Returns a new DStream by passing each element of the source DStream through the function func.
flatMap(func): Similar to map, except that each input element can be mapped to 0 or more output elements.
filter(func): Returns a new DStream by selecting only those elements of the source DStream for which func returns true.
repartition(numPartitions): Changes the partitioning of the DStream to numPartitions partitions.
union(otherStream): Returns a new DStream containing the union of the elements in the source DStream and otherStream.
count(): Counts the number of elements in each RDD of the source DStream and returns a new DStream whose RDDs each contain a single element.
reduce(func): Aggregates the elements of each RDD in the source DStream using the function func (which takes two arguments and returns one result) and returns a new DStream whose RDDs each contain a single element.
countByValue(): Computes the frequency of each element in each RDD of the source DStream and returns a new DStream of (K, Long) pairs, where K is the element type and Long is the element's frequency.
reduceByKey(func, [numTasks]): When called on a DStream of (K, V) key-value pairs, returns a new DStream of (K, V) pairs in which the values for each key are aggregated using the function func. Note: by default this uses Spark's default number of parallel tasks (2 in local mode; in cluster mode the number is determined by the config property spark.default.parallelism). A different number of tasks can be set with the optional numTasks argument.
join(otherStream, [numTasks]): When called on two DStreams of (K, V) and (K, W) key-value pairs, returns a new DStream of (K, (V, W)) pairs.
cogroup(otherStream, [numTasks]): When called on two DStreams of (K, V) and (K, W) key-value pairs, returns a new DStream of (K, Seq[V], Seq[W]) tuples.
transform(func): Returns a new DStream by applying an RDD-to-RDD function to each RDD of the source DStream; this can be used to perform arbitrary RDD operations on the DStream.
updateStateByKey(func): Returns a new "state" DStream in which the state of each key is updated by applying the given function func to the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key.
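Several of these transformations are usually chained together. Below is a minimal word-count sketch that reuses the hypothetical lines DStream from the earlier example:

// Split each line into words, map each word to a (word, 1) pair,
// then sum the counts for each word within each batch
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()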

2, Window transformation operations

Transformation: Description
window(windowLength, slideInterval): Returns a new DStream computed from windowed batches of the source DStream.
countByWindow(windowLength, slideInterval): Returns a sliding-window count of the elements in the DStream.
reduceByWindow(func, windowLength, slideInterval): Returns a new DStream created by aggregating the elements of the source DStream over a sliding window using the function func.
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]): When called on a DStream of (K, V) key-value pairs, returns a new DStream in which the values for each key K are aggregated over a sliding window using the function func.
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): A more efficient version of reduceByKeyAndWindow() in which each window is computed incrementally: the data entering the window is added to the previous window's result, and the data leaving the window is removed using the inverse function invFunc. For example, to compute a word count over the past 5-second window at time t+4, we can take the counts of the 5-second window ending at t+3, add the counts for [t+3, t+4], and subtract the counts for [t-2, t-1]; the counts for the three seconds in the middle are reused, which makes the computation more efficient.
countByValueAndWindow(windowLength, slideInterval, [numTasks]): Computes, over a sliding window, the frequency of each element in the RDDs of the source DStream and returns a DStream of (K, Long) pairs, where K is the element type and Long is the element's frequency. As with countByValue, the number of reduce tasks can be configured via an optional parameter.
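As an illustration, the sketch below reuses the hypothetical pairs DStream and 5-second batch interval from the earlier examples and counts words over a 30-second window that slides every 10 seconds, using the incremental (invFunc) variant:

// The incremental variant of reduceByKeyAndWindow requires checkpointing
ssc.checkpoint("checkpoint-dir") // hypothetical checkpoint directory

// Word counts over the last 30 seconds, recomputed every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b, // add counts of batches entering the window
  (a: Int, b: Int) => a - b, // subtract counts of batches leaving the window
  Seconds(30),
  Seconds(10)
)
windowedWordCounts.print()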

In Spark Streaming, data is processed in batches, while data is collected item by item. A batch duration is therefore set first; once the batch duration has elapsed, the data collected during that interval is assembled into a batch and handed to the system for processing.

A window operation covers N batches of data. How much data falls inside the window is determined by the window duration, i.e. the length of the window, and processing of the windowed data is triggered only once the window duration has been reached. Besides the window duration, the other important parameter of a window operation is the slide duration (slide interval), which specifies how often the window slides forward to form a new window. By default the slide duration equals the batch interval, and the window duration is generally set larger than both; both the window duration and the slide duration must be multiples of the batch interval.
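For example, with the 5-second batch interval assumed earlier, a 30-second window sliding every 10 seconds covers 6 batches and is evaluated every 2 batches; a minimal sketch, reusing the hypothetical words DStream from above:

// Collect the last 30 seconds of data (6 batches) and emit a new
// windowed RDD every 10 seconds (every 2 batches)
val windowedWords = words.window(Seconds(30), Seconds(10))
windowedWords.count().print() // number of words in each 30-second window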

3, Output operations

Output operation: Description
print(): Prints the first ten elements of every batch of data in the DStream on the driver node.
saveAsTextFiles(prefix, [suffix]): Saves the contents of the DStream as text files. The file generated in each batch interval is named prefix-TIME_IN_MS[.suffix].
saveAsObjectFiles(prefix, [suffix]): Saves the contents of the DStream as SequenceFiles of serialized objects. The file generated in each batch interval is named prefix-TIME_IN_MS[.suffix].
saveAsHadoopFiles(prefix, [suffix]): Saves the contents of the DStream as Hadoop files. The file generated in each batch interval is named prefix-TIME_IN_MS[.suffix].
foreachRDD(func): The most generic output operation: applies the function func to each RDD generated from the DStream. It is used to push data to external systems, such as saving RDDs to files or writing them over the network to a database. Note that func is executed in the driver process running the streaming application.
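A common foreachRDD pattern is to open one connection per partition on the executors and push each record through it; in the sketch below, createConnection and send are hypothetical placeholders for a real client library:

wordCounts.foreachRDD { rdd =>
  // this function runs on the driver for the RDD of each batch
  rdd.foreachPartition { partitionOfRecords =>
    // this block runs on the executors
    val connection = createConnection() // hypothetical connection factory
    partitionOfRecords.foreach(record => connection.send(record.toString))
    connection.close()
  }
}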

3. Detailed explanation of common operations

1, transform(func)

The transform operation (and its variant transformWith) allows an arbitrary RDD-to-RDD function to be applied to a DStream. It can be used to implement operations that are not exposed in the DStream API, such as joining every batch of a data stream with another dataset.
Sample code:

val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information

val cleanedDStream = wordCounts.transform { rdd =>
  rdd.join(spamInfoRDD).filter(...) // join each batch's RDD with the spam information RDD to clean the data
  ...
}

2, updateStateByKey operation

The transformations discussed so far do not keep any history: they only operate on the data of the current batch interval, regardless of earlier or later intervals. What if we want to maintain a total over all historical data and update it in real time? The updateStateByKey operation lets you maintain arbitrary state and keep updating it as new information arrives.

To use the updateStateByKey operation, the following two steps must be performed:
(1) Define the state: The state can be any data type.
(2) Define the state update function: use a function to specify how to update the state with the previous state and the new value obtained from the input stream.
The real-time update is then achieved by calling updateStateByKey(updateFunction) on the DStream.

The update function has two parameters:
(1) newValues: the new values that arrived for the key in the current batch.
(2) runningCount: the historical state for the key, wrapped in an Option.

Example:
(1) Determine the type of the data being counted.
(2) Write the update function.
(3) Wrap the result in an Option and return it.
Code:

//Define the update function
//The state is of type Int here because we are counting occurrences
def updateFunc(newValues: Seq[Int], state: Option[Int]): Option[Int] = {
  //newValues holds all the values received for this key in the current batch interval
  //foldLeft(0)(_ + _) sums them, starting from 0
  val currentCount = newValues.foldLeft(0)(_ + _)
  //Get the historical value: None when there is no history, Some(value) when there is
  //getOrElse(x) returns x when the Option is None
  val previousCount = state.getOrElse(0)
  //Compute the result, wrap it in Some and return it
  Some(currentCount + previousCount)
}
//Use: updateStateByKey is called on a DStream of (K, Int) key-value pairs,
//such as the pairs DStream from the word-count sketch above
val stateDStream = pairs.updateStateByKey[Int](updateFunc)
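Note that updateStateByKey requires checkpointing to be enabled on the StreamingContext (for example with the ssc.checkpoint call shown in the window example above), because the per-key state must be saved across batches.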