Java 8’s Stream is powerful, but do you know how it works under the hood?

The Java 8 API adds a new abstraction called Stream that lets you manipulate data in a declarative way.

Stream provides a high-level abstraction over Java collection operations, expressed in an intuitive way much like querying data from a database with SQL statements.

The Stream API can greatly improve the productivity of Java programmers, letting them write efficient, clean, and concise code.

This article analyzes the principles behind Stream’s implementation.

The composition and characteristics of Stream

A Stream is a queue of elements from a data source that supports aggregate operations:

  • Elements are objects of a specific type that form a queue. A Java Stream does not store and manage elements the way a collection does; it computes them on demand

  • The data source of a stream can be a Collection, an array, an I/O channel, a generator function, etc.

  • Aggregate operations resemble SQL clauses, such as filter, map, reduce, find, match, sorted, etc.

Unlike earlier Collection operations, Stream operations have two fundamental characteristics:

  • Pipelining: intermediate operations return a stream, so multiple operations can be chained into a pipeline in a fluent style. This enables optimizations such as lazy evaluation and short-circuiting (see the sketch after this list)

  • Internal iteration: in the past, collections were traversed explicitly from the outside via an Iterator or a for-each loop, which is called external iteration. Stream provides internal iteration, implemented through the Visitor pattern
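
Here is a minimal sketch of both properties (the class name and element values are made up for illustration): peek() reveals which elements are actually visited, and findFirst() short-circuits the traversal.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class LazyDemo {
    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);

        // Nothing is printed here: intermediate operations only describe the pipeline
        Stream<Integer> pipeline = numbers.stream()
                .peek(n -> System.out.println("visited " + n))
                .filter(n -> n % 2 == 0);

        // The terminal operation pulls elements on demand; findFirst() short-circuits,
        // so only 1 and 2 are visited and 3, 4, 5 are never touched
        pipeline.findFirst().ifPresent(n -> System.out.println("found " + n));
    }
}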

Unlike iterators, a Stream can be processed in parallel, whereas an iterator can only be used imperatively and serially. As the names imply, serial traversal reads each item before moving on to the next; parallel traversal splits the data into multiple segments, processes each segment in a different thread, and combines the results at the end.

The parallel operations of Stream rely on the Fork/Join framework (JSR 166y) introduced in Java 7 to split tasks and speed up processing. The evolution of Java’s parallel APIs is roughly as follows:

  • java.lang.Thread in 1.0-1.4

  • java.util.concurrent in 5.0

  • Phasers et al. in 6.0

  • Fork/Join framework in 7.0

  • Lambdas in 8.0

Stream supports parallel processing, which works by divide and conquer: a large task is split into many small tasks, each of which can be processed independently:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream()
       .forEach(System.out::println);

As you can see, one simple line of code prints the elements of the collection in parallel; but because the order of parallel execution is not deterministic, the results may differ from run to run.

If the output must preserve the encounter order, you can use the forEachOrdered method as the terminal operation:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream()
       .forEachOrdered(System.out::println);

This raises a question: if the results must be ordered, doesn’t that defeat the purpose of parallel execution? Yes. In this scenario there is no point in using a parallel stream; use a serial stream instead, otherwise performance may actually be worse, because all the parallel results must be forced back into order at the end.

OK, let’s first look at the relevant Stream interfaces.

BaseStream interface

The parent interface of Stream is BaseStream, the top-level interface that all streams implement. It is defined as follows:

public interface BaseStream<T, S extends BaseStream<T, S>>
        extends AutoCloseable {
    Iterator<T> iterator();

    Spliterator<T> spliterator();

    boolean isParallel();

    S sequential();

    S parallel();

    S unordered();

    S onClose(Runnable closeHandler);

    void close();
}

Here, T is the type of the elements in the stream, and S is an implementation class of BaseStream whose elements are also of type T; S also refers to itself:

S extends BaseStream<T, S>

Are you a little dizzy?

In fact, it is easy to understand. Look at how S is used in the interface: methods such as sequential() and parallel() return an S instance. In other words, they switch the current stream to serial or parallel mode, respectively, and return the “modified” stream object.

If the stream is parallel, it must be possible to split it, that is, to divide one stream into multiple sub-streams, and the type of each sub-stream must match the parent stream. A sub-stream can in turn be split into further sub-streams, and so on…

That is to say, S is an implementation class of BaseStream that is itself a stream, such as Stream, IntStream, or LongStream.
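
If it helps, here is the same trick in a stripped-down, hypothetical form: a “self-typed” generic interface in the style of BaseStream, where every method returns the concrete subtype so chained calls never lose it.

// A hypothetical self-typed interface, illustrating S extends Base<T, S>
interface Base<T, S extends Base<T, S>> {
    S sequential();
    S parallel();
}

// A concrete "stream of T" keeps returning MyStream<T>, not just Base<T, ?>
class MyStream<T> implements Base<T, MyStream<T>> {
    private boolean parallel;

    public MyStream<T> sequential() { this.parallel = false; return this; }
    public MyStream<T> parallel()   { this.parallel = true;  return this; }
    public boolean isParallel()     { return parallel; }
}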

Stream interface

Let’s take a look at the interface declaration of Stream:

public interface Stream<T> extends BaseStream<T, Stream<T>>

With the explanation above this is not hard to read: a Stream can keep producing further Streams. We can confirm this through some of its methods:

Stream<T> filter(Predicate<? super T> predicate);
<R> Stream<R> map(Function<? super T, ? extends R> mapper);
<R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);
Stream<T> sorted();
Stream<T> peek(Consumer<? super T> action);
Stream<T> limit(long maxSize);
Stream<T> skip(long n);
...

These are all intermediate operations on the stream, and each of them returns a Stream again, so the chain can continue.
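
Strictly speaking, each intermediate operation returns a new pipeline stage rather than literally the same object, which is easy to check (the printed class name is a HotSpot implementation detail):

import java.util.stream.Stream;

public class StageDemo {
    public static void main(String[] args) {
        Stream<String> source = Stream.of("a", "bb", "ccc");
        Stream<String> filtered = source.filter(s -> s.length() > 1);

        System.out.println(source == filtered);  // false: filter() built a new stage
        System.out.println(filtered.getClass()); // e.g. java.util.stream.ReferencePipeline$2
    }
}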

Closing a stream

BaseStream extends the AutoCloseable interface, so the close() method is called when the stream is closed. BaseStream also provides us with an onClose() method:

S onClose(Runnable closeHandler);

When AutoCloseable’s close() is called, it triggers the stream’s onClose handlers, but there are a few points to note:

  • onClose() returns the stream object itself, which means it can be called multiple times on the same object

  • If onClose() is called multiple times, the handlers are triggered in the order they were registered, but if one of them throws, only the first exception is propagated upward

  • An exception thrown by an earlier onClose handler does not prevent the later handlers from running

  • If multiple onClose handlers throw, only the first exception’s stack trace is propagated; the others are attached to it as suppressed exceptions
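
A small demonstration of the ordering and exception rules above (the handler messages are made up):

import java.util.stream.Stream;

public class CloseDemo {
    public static void main(String[] args) {
        try (Stream<String> s = Stream.of("a", "b")
                .onClose(() -> System.out.println("first handler"))
                .onClose(() -> { throw new IllegalStateException("boom"); })
                .onClose(() -> System.out.println("third handler still runs"))) {
            s.forEach(System.out::println);
        } catch (IllegalStateException e) {
            // Only the first exception propagates; later ones would be attached
            // via Throwable.addSuppressed() and visible in getSuppressed()
            System.out.println("caught: " + e.getMessage());
        }
    }
}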

Parallel stream and serial stream

The BaseStream interface provides the parallel() and sequential() methods for switching a stream between parallel and serial execution. The two methods can be called any number of times, even mixed; in the end, only the last call takes effect.

Refer to the description of the parallel() method:

“Returns an equivalent stream that is parallel. May return itself, either because the stream was already parallel, or because the underlying stream state was modified to be parallel.”

So calling the same method multiple times does not generate a new stream, but directly reuses the current stream object.

In the following example, the last call to parallel() prevails, so the sum is finally computed in parallel:

int sum = IntStream.rangeClosed(1, 100)
        .parallel()
        .filter(n -> n % 2 == 0)
        .sequential()
        .map(n -> n * n)
        .parallel() // the last call wins: the whole pipeline executes in parallel
        .sum();

The engine behind parallelStream: ForkJoinPool

The Fork/Join framework is a feature introduced in JDK 7. Like ThreadPoolExecutor, ForkJoinPool implements the Executor and ExecutorService interfaces. It uses an “unbounded queue” to hold the tasks to be executed, and the number of threads is passed in through the constructor; if no thread count is supplied, the number of CPUs available on the current machine is used as the default.

ForkJoinPool is mainly used to solve problems with divide-and-conquer algorithms, a typical application being quick sort. The key point is that ForkJoinPool can process a very large number of tasks with relatively few threads.

For example, to sort 10 million elements, the task would be split into two 5-million-element sort tasks plus a merge task for the two sorted halves.

By analogy, each 5-million-element task is split the same way, and a threshold specifies when the splitting stops; for example, when a segment holds fewer than 10 elements, splitting stops and insertion sort is used instead. In the end there will be roughly 2,000,000+ tasks in total.

The crux of the problem is that a task can only complete once all of its subtasks have completed; think of the merge step in merge sort.

So divide-and-conquer is a problem for ThreadPoolExecutor: a thread in a ThreadPoolExecutor cannot add another task to the queue and wait for it to finish before continuing. With ForkJoinPool, a thread can create a new subtask and suspend its current task; the thread is then free to pick a subtask from the queue and execute it.

So what is the performance difference between ThreadPoolExecutor and ForkJoinPool?

First of all, ForkJoinPool can complete a huge number of tasks with “parent-child relationships” using a limited number of threads, for example more than two million tasks with just four threads. With ThreadPoolExecutor this is impossible, because its threads cannot choose to execute subtasks first: completing two million parent-child tasks would require two million threads, which is clearly not feasible.
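
A minimal RecursiveTask sketch of this divide-and-conquer pattern, summing an array with a split threshold (the threshold and sizes here are arbitrary):

import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 10_000; // below this size, compute directly
    private final long[] data;
    private final int lo, hi;

    SumTask(long[] data, int lo, int hi) {
        this.data = data; this.lo = lo; this.hi = hi;
    }

    @Override
    protected Long compute() {
        if (hi - lo <= THRESHOLD) {          // small enough: solve sequentially
            long sum = 0;
            for (int i = lo; i < hi; i++) sum += data[i];
            return sum;
        }
        int mid = (lo + hi) >>> 1;
        SumTask left = new SumTask(data, lo, mid);
        SumTask right = new SumTask(data, mid, hi);
        left.fork();                         // submit the left half as a subtask
        long rightSum = right.compute();     // work on the right half in this thread
        return left.join() + rightSum;       // wait for the left half, then combine
    }

    public static void main(String[] args) {
        long[] data = new long[10_000_000];
        Arrays.fill(data, 1L);
        long sum = ForkJoinPool.commonPool().invoke(new SumTask(data, 0, data.length));
        System.out.println(sum); // 10000000
    }
}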

How work stealing works:

  1. Each worker thread has its own work queue (WorkQueue);

  2. the WorkQueue is a double-ended queue (deque) private to that thread;

  3. subtasks created by fork() are pushed onto the head of the deque of the worker thread running the parent task, and the worker processes its own queue in LIFO order, i.e. like a stack;

  4. to maximize CPU utilization, idle threads “steal” tasks from other threads’ queues to execute;

  5. but they steal from the tail of the queue, to reduce contention with the thread that owns the queue;

  6. deque operations: push()/pop() are called only by the owner worker thread, while poll() is called by other threads stealing tasks;

  7. when only the last task remains there is still contention, which is resolved via CAS.

Looking at ParallelStream from the perspective of ForkJoinPool

Java 8 adds a common thread pool to ForkJoinPool, used to execute tasks that have not been explicitly submitted to any thread pool. It is a static member of the ForkJoinPool type, and its default number of threads is tied to the number of CPUs on the running machine.

Automatic parallelization kicks in when the methods newly added to the Arrays class are called, for example the parallel sort used to order an array (Arrays.parallelSort), or the methods that operate on the elements of an array in parallel. Automatic parallelization is also used in Java 8’s new Stream API.

For example, the following code traverses the elements of a list and performs the required operation on each:

List<UserInfo> userInfoList =
        DaoContainers.getUserInfoDAO().queryAllByList(new UserInfoModel());
userInfoList.parallelStream().forEach(RedisUserApi::setUserIdUserInfo);

The operations on the list’s elements are executed in parallel: the forEach method creates a task for the computation on each element, and those tasks are processed by the commonPool of ForkJoinPool mentioned above.

Of course, the parallel computation above could also be implemented with ThreadPoolExecutor, but in terms of readability and amount of code, ForkJoinPool is clearly better.

For the number of threads in the ForkJoinPool common pool, the default value, the number of processors available at runtime, is usually sufficient. You can also adjust it by setting the system property -Djava.util.concurrent.ForkJoinPool.common.parallelism=N (where N is the desired number of threads).

It is worth noting that the thread that invokes the computation also participates in executing tasks, so the effective number of threads is N + 1, the 1 being the calling (e.g. main) thread.
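
You can inspect the common pool’s parallelism at runtime; a quick check (the printed numbers depend on your machine and on the property above):

import java.util.concurrent.ForkJoinPool;

public class PoolInfo {
    public static void main(String[] args) {
        // Reflects -Djava.util.concurrent.ForkJoinPool.common.parallelism if set;
        // otherwise it defaults to availableProcessors() - 1, so that together
        // with the calling thread the total matches the core count
        System.out.println(ForkJoinPool.commonPool().getParallelism());
        System.out.println(Runtime.getRuntime().availableProcessors());
    }
}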

There is a catch here: if you perform blocking operations, such as I/O, inside a parallel stream’s computation, it is likely to cause problems:

public static String query(String question) {
  List<String> engines = new ArrayList<String>();
  engines.add("http://www.google.com/?q=");
  engines.add("http://duckduckgo.com/?q=");
  engines.add("http://www.bing.com/search?q=");

  // get element as soon as it is available
  Optional<String> result = engines.stream().parallel().map((base) -> {
    String url = base + question;
    // open connection and fetch the result (blocking HTTP call)
    return WS.url(url).get();
  }).findAny();
  return result.get();
}

This example is typical; let’s analyze it:

  • This parallel stream computation is executed jointly by the calling thread and the default JVM-wide ForkJoinPool.commonPool().

  • The mapper blocks: it calls an HTTP endpoint and waits for the response, so any worker thread executing it will block until the result arrives.

  • Therefore, any other computation running on parallel streams elsewhere at the same time will be affected by the threads blocked here.

  • The current ForkJoinPool implementation does not compensate for worker threads stuck in such waits by spawning new ones, so eventually the threads in ForkJoinPool.commonPool() will all be used up, blocked waiting.

As the analysis of this example shows, the execution of a lambda is not instantaneous: any program using parallel streams can become the source of blocking for others, because other parts of the program cannot use the occupied workers in the meantime. This means any program that relies on parallel streams becomes unpredictable, and potentially dangerous, whenever something else ties up the common ForkJoinPool.
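
One common workaround (a sketch, not a documented Stream API contract; it relies on implementation behavior, and fetch() here is a hypothetical stand-in for the blocking HTTP call above) is to run the blocking parallel stream inside a dedicated ForkJoinPool so the common pool stays free:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

public class IsolatedPool {
    // Hypothetical blocking call standing in for the HTTP fetch in the example above
    static String fetch(String url) {
        try {
            Thread.sleep(100); // simulate I/O latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "response from " + url;
    }

    public static void main(String[] args) throws Exception {
        List<String> urls = Arrays.asList("http://a.example/?q=x", "http://b.example/?q=x");

        // A parallel stream started from inside a ForkJoinPool task runs on that
        // pool (implementation behavior, not a documented guarantee), so blocking
        // here does not starve ForkJoinPool.commonPool()
        ForkJoinPool ioPool = new ForkJoinPool(8);
        try {
            List<String> results = ioPool.submit(() ->
                    urls.parallelStream()
                        .map(IsolatedPool::fetch)
                        .collect(Collectors.toList())
            ).get();
            results.forEach(System.out::println);
        } finally {
            ioPool.shutdown();
        }
    }
}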

Summary:

  1. Consider using ForkJoinPool when dealing with recursive divide-and-conquer algorithms.

  2. Carefully tune the threshold below which tasks are no longer split; it has a real impact on performance.

  3. Some Java 8 features use the common pool in ForkJoinPool; in some cases you need to adjust its default number of threads.

  4. Lambdas should avoid side effects, that is, avoid mutating heap-based state and performing any I/O.

  5. Lambdas should not interfere with each other, that is, avoid modifying the data source (this can cause thread-safety issues).

  6. Avoid accessing any state that may change during the lifetime of the stream operation.

Performance of parallel streams

The performance of the parallel stream framework is affected by the following factors:

  • Data size: the data must be large enough, and the per-element pipeline processing long enough, for parallelism to be worthwhile;

  • Source data structure: each pipeline operation starts from the initial data source, usually a collection, and splitting differs in cost across data-source types;

  • Boxing: primitive types are processed faster than boxed types;

  • Number of cores: by default, the more cores available, the more threads the underlying fork/join pool starts;

  • Per-element cost: the longer each element takes to process, the more obvious the performance benefit of going parallel;

The source data structures fall into the following 3 groups:

  • Good performance: ArrayList, arrays, or IntStream.range (these support random access and split cheaply and evenly)

  • Moderate performance: HashSet, TreeSet (not trivially split into equal halves, but usually acceptable)

  • Poor performance: LinkedList (splitting requires walking the list, halving it is hard), Stream.iterate and BufferedReader.lines (unknown length, hard to split)
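
A rough way to observe the source-structure effect yourself (the class and sizes are illustrative, and the timings are only indicative; a proper comparison would use a benchmark harness such as JMH):

import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

public class SplitCost {
    static long timeParallelSum(List<Integer> list) {
        long start = System.nanoTime();
        long sum = list.parallelStream().mapToLong(Integer::longValue).sum();
        return System.nanoTime() - start;
    }

    public static void main(String[] args) {
        List<Integer> array = new ArrayList<>();
        List<Integer> linked = new LinkedList<>();
        for (int i = 0; i < 5_000_000; i++) { array.add(i); linked.add(i); }

        // ArrayList splits by index ranges; LinkedList must walk nodes to split,
        // so its parallel pipeline usually benefits far less
        System.out.println("ArrayList:  " + timeParallelSum(array) / 1_000_000 + " ms");
        System.out.println("LinkedList: " + timeParallelSum(linked) / 1_000_000 + " ms");
    }
}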

Note: the following part is excerpted from “The behind-the-scenes principle of Streams”; thanks to the author Brian Goetz, whose writing is remarkably lucid.

NQ model

To determine whether parallelism will provide a speedup, the final two factors to consider are: the amount of data available and the amount of computation performed on each data element.

In our original description of parallel decomposition, we employed the concept of splitting sources until a segment is small enough that a sequential approach to solving the problem on that segment is more efficient. The segment size must depend on the problem being solved and, more precisely, on the amount of work done by each element.

For example, calculating the length of a string involves much less work than calculating the SHA-1 hash of the string. The more work done for each element, the lower the “large enough to take advantage of parallelism” threshold. Similarly, the more data you have, the more segments you can split without violating the “too small” threshold.

A simple but useful model of parallel performance is the NQ model, where N is the number of data elements and Q is the amount of work performed per element. The larger the product N*Q, the more likely you are to get a parallel speedup. For problems with a trivially small Q, such as summing numbers, you generally want to see N > 10,000 before a speedup appears; as Q increases, the data size needed for a speedup shrinks.

Many barriers to parallelization, such as splitting cost, combining cost, or sensitivity to encounter order, are mitigated by operations with a higher Q. Although the splitting characteristics of a LinkedList are bad, a sufficiently large Q can still yield a parallel speedup.
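
To make Q concrete: hashing each string (high Q) benefits from parallelism at a much smaller N than merely measuring its length (low Q). A sketch, assuming SHA-1 via the standard MessageDigest API:

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class NqDemo {
    // High-Q work: SHA-1 of a string costs far more than reading its length
    static byte[] sha1(String s) {
        try {
            return MessageDigest.getInstance("SHA-1")
                    .digest(s.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        List<String> data = IntStream.range(0, 100_000)
                .mapToObj(i -> "element-" + i)
                .collect(Collectors.toList());

        // Low Q: with cheap per-element work, splitting/combining overhead can
        // swallow the gain unless N is very large
        long totalLength = data.parallelStream().mapToLong(String::length).sum();

        // High Q: expensive per-element work lowers the N needed for a speedup
        long hashBytes = data.parallelStream().map(NqDemo::sha1)
                .mapToLong(h -> h.length).sum();

        System.out.println(totalLength + " chars, " + hashBytes + " hash bytes");
    }
}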

Encounter order

Encounter order refers to whether the order in which the source dispatches elements is critical to the computation. Some sources (such as hash-based sets and maps) do not have a meaningful encounter order. The stream flag ORDERED describes whether the stream has a meaningful encounter order.

The spliterators of the JDK collections set this flag according to each collection’s specification;

Some intermediate operations may inject ORDERED (sorted()) or clear it (unordered()).

If a stream has an encounter order, most stream operations must respect it. In sequential execution, encounter order is “automatically preserved,” since elements are naturally processed in the order they are encountered.

Even in parallel execution, many operations (stateless intermediate operations, and some terminal operations such as reduce()) can respect encounter order without any real cost.

But for other operations (stateful intermediate operations, and terminal operations whose semantics are tied to encounter order, such as findFirst() or forEachOrdered()), the obligation to respect encounter order in a parallel execution can be costly.

If a stream has a defined encounter order but that order is not meaningful to the result, you can speed up the parallel execution of pipelines containing order-sensitive operations by removing the ORDERED flag with the unordered() operation.

As an example of operations that are sensitive to encounter order, consider limit(), which truncates a stream at a specified size. Implementing limit() in sequential execution is simple: keep a counter of how many elements have been seen, and discard any elements after that.

But in parallel execution, implementing limit() is much more complicated: you are required to keep the first N elements. This requirement greatly limits the ability to exploit parallelism: if the input is divided into sections, you don’t know whether a section’s results will be included in the final result until all sections before it have completed.

Therefore, the implementation is generally forced either to leave some available cores unused, or to buffer the entire tentative result until the target length is reached.

If the stream has no encounter order, the limit() operation is free to choose any N elements, which allows a much more efficient execution: elements can be sent downstream as soon as they are known, without any buffering, and the only coordination between threads is a signal to ensure that the target stream length is not exceeded.
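
A sketch of the difference (which elements the unordered variant returns may vary from run to run):

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class LimitDemo {
    public static void main(String[] args) {
        // Ordered parallel limit: must be the first 5 elements, which forces
        // coordination/buffering across threads
        List<Integer> firstFive = IntStream.range(0, 1_000_000)
                .parallel()
                .boxed()
                .limit(5)
                .collect(Collectors.toList());

        // Unordered parallel limit: any 5 elements will do, so threads can
        // emit elements as soon as they are available
        List<Integer> anyFive = IntStream.range(0, 1_000_000)
                .parallel()
                .boxed()
                .unordered()
                .limit(5)
                .collect(Collectors.toList());

        System.out.println(firstFive); // [0, 1, 2, 3, 4]
        System.out.println(anyFive);   // some 5 elements; identity/order may vary
    }
}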

Another, less common example of encounter-order cost is sorting. If the encounter order is meaningful, the sorted() operation implements a stable sort (equal elements appear in the output in the same order they appeared in the input), whereas for an unordered stream stability, which has a cost, is not required.

distinct() has a similar situation: if the stream has an encounter order, then for multiple equal input elements distinct() must emit the first one, whereas for an unordered stream it can emit any of them, which permits a much more efficient parallel implementation.

A similar situation occurs with collect() aggregation. If a collect(groupingBy()) operation is performed on a stream with a meaningful encounter order, the elements corresponding to any key must be provided to the downstream collector in the order they appear in the input.

This order often means little to the application, and no particular order would be meaningful anyway. In these cases, it may be best to choose a concurrent collector (such as groupingByConcurrent()) that ignores encounter order and lets all threads collect directly into a shared concurrent data structure (such as a ConcurrentHashMap), rather than having each thread collect into its own intermediate map and then merge the intermediate maps (which can be very costly).
This order is generally of little significance to the application, and no order is meaningful. In these cases, it may be best to choose a concurrent collector (such as groupingByConcurrent()) that ignores encounter order and lets all threads collect directly into a shared concurrent data structure (such as ConcurrentHashMap), rather than having each thread collect into its own intermediate map and then merge the intermediate maps (which can be very costly).