Pierre-Yves Saumont has managed his own company for 15 years, specializing in natural language processing. At the same time, he has been the publishing manager for the French subsidiary of a leading American computer books publisher. He wrote about 30 books about computers and software development in Java. Since 2008, he works for Alcatel-Lucent Submarine Networks as an R&D Software engineer, and he is responsible for the architecture of several projects such as distributed application framework, functional framework and functional DSML (Domain Specific Modeling Language). He is also a double bass Jazz player.

What's Wrong in Java 8, Part III: Streams and Parallel Streams

05.20.2014

When the first early access versions of Java 8 were made available, what seemed the most important (r)evolution were lambdas. This is now changing and many developers seem to think now that streams are the most valuable Java 8 feature. And this is because they believe that by changing a single word in their programs (replacing stream with parallelStream) they will make these programs work in parallel. Many Java 8 evangelists have demonstrated amazing examples of this. Is there something wrong with this? No. Not something. Many things:

  • Running in parallel may or may not be a benefit. It depends what you are using this feature for.
  • Java 8 parallel streams may make your programs run faster. Or not. Or even slower.
  • Thinking about streams as a way to achieve parallel processing at low cost will prevent developers from understanding what is really happening. Streams are not directly linked to parallel processing.
  • Most of the above problems are based upon a misunderstanding: parallel processing is not the same thing as concurrent processing. And most examples shown about “automatic parallelization” with Java 8 are in fact examples of concurrent processing.
  • Thinking about map, filter and other operations as “internal iteration” is complete nonsense (although this is not a problem with Java 8, but with the way we use it).

So, what are streams?

According to Wikipedia:

“a stream is a potentially infinite analog of a list, given by the inductive definition:

data Stream a = Cons a (Stream a)

Generating and computing with streams requires lazy evaluation, either implicitly in a lazily evaluated language or by creating and forcing thunks in an eager language.”

One important thing to notice is that Java is what Wikipedia calls an “eager” language, which means Java is mostly strict (as opposed to lazy) in evaluating things. For example, if you create a List in Java, all elements are evaluated when the list is created. This may surprise you, since you may create an empty list and add elements afterwards. This is only because either the list is mutable (and you are replacing a null reference with a reference to something) or you are creating a new list from the old one with the new element appended.
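As a minimal sketch of the difference, the following compares a method taking plain arguments (strictly evaluated before the call) with one taking Supplier arguments (evaluated only on demand). The class StrictVsLazy and its methods are hypothetical names made up for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class StrictVsLazy {
    // Records the names of the values that were actually computed.
    static final List<String> log = new ArrayList<>();

    static int compute(String name, int value) {
        log.add(name);
        return value;
    }

    // Strict: both arguments are evaluated before the method body runs,
    // even though the second one is never used.
    static int firstStrict(int a, int b) {
        return a;
    }

    // Lazy: a Supplier is evaluated only when get() is called on it.
    static int firstLazy(Supplier<Integer> a, Supplier<Integer> b) {
        return a.get();
    }

    public static void main(String[] args) {
        firstStrict(compute("a", 1), compute("b", 2));
        System.out.println("strict computed: " + log);
        log.clear();
        firstLazy(() -> compute("a", 1), () -> compute("b", 2));
        System.out.println("lazy computed: " + log);
    }
}
```

In the strict call both values are computed; in the lazy call the second supplier is never asked for its value.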

Lists are created from something producing its elements. For example:

 
 List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);

Here the producer is an array, and all elements of the array are strictly evaluated.

It is also possible to create a list in a recursive way, for example the list starting with 1 and where each element is equal to 1 plus the previous element and smaller than 6. In Java < 8, this translates into:

List<Integer> list = new ArrayList<Integer>();
for(int i = 1; i < 6; i++) {
  list.add(i);
}

One may argue that the for loop is one of the rare examples of lazy evaluation in Java, but the result is a list in which all elements are evaluated.

What happens if we want to apply a function to all elements of this list? We may do this in a loop. For example, if we want to multiply all elements by 2, we may do this:

for(int i = 0; i < list.size(); i++) {
  list.set(i, list.get(i) * 2);
}

However, this does not allow using an operation that changes the type of the elements, for example increasing all elements by 20%. The following solution solves this problem:

List<Double> list2 = new ArrayList<Double>();
for(int i = 0; i < list.size(); i++) {
  list2.add(list.get(i) * 1.2);
}

This form allows the use of the Java 5 for-each syntax:

List<Double> list2 = new ArrayList<>();
for(Integer i : list) {
  list2.add(i * 1.2);
}

or the Java 8 syntax:

List<Double> list2 = new ArrayList<>();
list.forEach(x -> list2.add(x * 1.2));

So far, so good. But what if we want to increase the value by 20% and then divide it by 3? The trivial answer would be to do:

List<Double> list2 = new ArrayList<>();
list.forEach(x -> list2.add(x * 1.2));
List<Double> list3 = new ArrayList<>();
list2.forEach(x -> list3.add(x / 3));

This is far from optimal because we are iterating twice on the list. A much better solution is:

List<Double> list2 = new ArrayList<>();
for(Integer i : list) {
  list2.add(i * 1.2 / 3);
}

Let us set aside the autoboxing/unboxing problem for now. In Java 8, this can be written as:

List<Double> list2 = new ArrayList<>();
list.forEach(x -> list2.add(x * 1.2 / 3));

But wait... This is only possible because we see the internals of the Consumer bound to the list, so we are able to manually compose the operations. If we had:

List<Double> list2 = new ArrayList<>();
list.forEach(consumer1);
List<Double> list3 = new ArrayList<>();
list2.forEach(consumer2);

How could we know how to compose them? No way. In Java 8, the Consumer interface has a default method andThen. We could be tempted to compose the consumers this way:

list.forEach(consumer1.andThen(consumer2));

but this will result in an error, because andThen is defined as:

default Consumer<T> andThen(Consumer<? super T> after) {
  Objects.requireNonNull(after);
  return (T t) -> { accept(t); after.accept(t); };
}

This means that we can't use andThen to compose consumers of different types.
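To make the limitation concrete, here is a small sketch. Consumer.andThen passes the same element to both consumers, so nothing flows from one to the other, whereas Function.andThen (also in java.util.function) feeds the result of the first function into the second. The class ComposeDemo and the consumer names are hypothetical, chosen only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

class ComposeDemo {
    static List<Double> run(List<Integer> list) {
        List<Double> result = new ArrayList<>();

        // Consumer.andThen: both consumers receive the same Integer element.
        Consumer<Integer> print = x -> System.out.println("seen " + x);
        Consumer<Integer> printAgain = x -> System.out.println("again " + x);
        list.forEach(print.andThen(printAgain));

        // Function.andThen: the Double produced by function1 is the input of function2,
        // so the intermediate type may differ from the element type of the list.
        Function<Integer, Double> function1 = x -> x * 1.2;
        Function<Double, Double> function2 = x -> x / 3;
        Function<Integer, Double> composed = function1.andThen(function2);

        // A single iteration applies the composed function.
        list.forEach(x -> result.add(composed.apply(x)));
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList(1, 2, 3, 4, 5)));
    }
}
```

Composing the functions first is what allows a single pass over the list without having to see the internals of each operation.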

In fact, we have it all wrong since the beginning. What we need is to bind the list to a function in order to get a new list, such as:

Function<Integer, Double> function1 = x -> x * 1.2;
Function<Double, Double> function2 = x -> x / 3;
list.bind(function1).bind(function2);

where the bind method would be defined in a special FList class like:

public class FList<T> {
  final List<T> list;

  public FList(List<T> list) {
    this.list = list;
  }

  public <U> FList<U> bind(Function<T, U> f) {
    List<U> newList = new ArrayList<U>();
    for (T t : list) {
      newList.add(f.apply(t));
    }
    return new FList<U>(newList);
  }
}

and we would use it as in the following example:

new FList<>(list).bind(function1).bind(function2);

The only trouble we have then is that binding twice would require iterating twice on the list. This is because bind is evaluated strictly. What we would need is a lazy evaluation, so that we could iterate only once.

The problem here is that the bind method is not a real binding. It is in reality a composition of a real binding and a reduce. "Reducing" is applying an operation to each element of the list in turn, combining this element with the result of the same operation applied to the previous element. As there is no previous result when we start from the first element, we start with an initial value. For example, applying (x) -> r + x, where r is the result of the operation on the previous element, or 0 for the first element, gives the sum of all elements of the list. Applying (x) -> r + 1 (ignoring x) to each element, starting with r = 0, gives the length of the list. (This may not be the most efficient way to get the length of the list, but it is totally functional!)

Here, the operation is add(element) and the initial value is an empty list. And this occurs only because the function application is strictly evaluated.
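The reduce described above can be sketched as follows. The helper reduce and the class ReduceDemo are hypothetical names for this illustration, not part of the JDK; the sum, the length and the strict bind are all expressed with the same fold.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Function;

class ReduceDemo {
    // Folds the list from left to right: r is the result for the previous element,
    // or 'initial' for the first element.
    static <T, R> R reduce(List<T> list, R initial, BiFunction<R, T, R> op) {
        R r = initial;
        for (T t : list) {
            r = op.apply(r, t);
        }
        return r;
    }

    static Integer sum(List<Integer> list) {
        return reduce(list, 0, (r, x) -> r + x);
    }

    static <T> Integer length(List<T> list) {
        return reduce(list, 0, (r, x) -> r + 1);
    }

    // The strict bind: the operation is "add f(element)", the initial value is an empty list.
    static <T, U> List<U> bind(List<T> list, Function<T, U> f) {
        List<U> empty = new ArrayList<>();
        return reduce(list, empty, (r, x) -> { r.add(f.apply(x)); return r; });
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(sum(list));
        System.out.println(length(list));
        System.out.println(bind(list, x -> x * 1.2));
    }
}
```

Because bind folds immediately into a new list, every call iterates over all elements, which is exactly the strictness discussed above.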

What Java 8 streams give us is the same, but lazily evaluated, which means that when binding a function to a stream, no iteration is involved!

Binding a Function<T, U> to a Stream<T> gives us a Stream<U> with no iteration occurring. The resulting Stream is not evaluated, and this does not depend upon the fact that the initial stream was built with evaluated or non evaluated data.
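This laziness can be observed with a counter. In the sketch below (the class LazyMapDemo and its counter are hypothetical names for this illustration), two functions are bound with map and the counter only moves once a terminal operation is applied.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyMapDemo {
    // Counts how many times a bound function has actually been applied.
    static final AtomicInteger calls = new AtomicInteger();

    static Stream<Double> bindOnly(List<Integer> list) {
        // Two functions are bound here; neither is applied yet.
        return list.stream()
                .map(x -> { calls.incrementAndGet(); return x * 1.2; })
                .map(x -> { calls.incrementAndGet(); return x / 3; });
    }

    public static void main(String[] args) {
        Stream<Double> stream = bindOnly(Arrays.asList(1, 2, 3, 4, 5));
        System.out.println("calls after binding: " + calls.get());
        List<Double> result = stream.collect(Collectors.toList());
        System.out.println("calls after collecting: " + calls.get());
        System.out.println(result);
    }
}
```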

In functional languages, binding a Function<T, U> to a Stream<T> is itself a function. In Java 8, it is a method, which means its arguments are strictly evaluated, but this has nothing to do with the evaluation of the resulting stream. To understand what is happening, we can imagine that the functions to bind are stored somewhere and they become part of the data producer for the new (non evaluated) resulting stream.
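The idea of "storing the functions somewhere" can be sketched with a lazy variant of the FList class shown earlier. This is only an illustration under simplifying assumptions (the class LazyFList is hypothetical and is not how java.util.stream is implemented): bind merely composes the new function with the pending one, and the source is iterated once, on evaluation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// S is the element type of the source list, T the element type seen by the caller.
class LazyFList<S, T> {
    private final List<S> source;
    private final Function<S, T> pending; // all functions bound so far, composed

    private LazyFList(List<S> source, Function<S, T> pending) {
        this.source = source;
        this.pending = pending;
    }

    static <S> LazyFList<S, S> of(List<S> source) {
        return new LazyFList<>(source, Function.identity());
    }

    // No iteration here: the function is only composed with the pending ones.
    <U> LazyFList<S, U> bind(Function<T, U> f) {
        return new LazyFList<>(source, pending.andThen(f));
    }

    // Evaluation: a single pass over the source, whatever the number of binds.
    List<T> toList() {
        List<T> result = new ArrayList<>();
        for (S s : source) {
            result.add(pending.apply(s));
        }
        return result;
    }

    public static void main(String[] args) {
        Function<Integer, Double> function1 = x -> x * 1.2;
        Function<Double, Double> function2 = x -> x / 3;
        System.out.println(
                LazyFList.of(Arrays.asList(1, 2, 3, 4, 5)).bind(function1).bind(function2).toList());
    }
}
```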

In Java 8, the method binding a function T -> U to a Stream<T>, resulting in a Stream<U>, is called map. The method binding a function T -> Stream<U> to a Stream<T>, resulting in a Stream<U>, is called flatMap.

Where is flatten?

Most functional languages also offer a flatten function converting a Stream<Stream<U>> into a Stream<U>, but this is missing in Java 8 streams. It may not look like a big problem, since it is so easy to define a method for doing this. For example, consider the following function, bound to a stream with flatMap:

Function<Integer, Stream<Integer>> f = x -> Stream.iterate(1, y -> y + 1).limit(x);
Stream<Integer> stream = Stream.iterate(1, x -> x + 1);
Stream<Integer> stream2 = stream.limit(5).flatMap(f);

System.out.println(stream2.collect(toList()));

This produces:

[1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5]

Using map instead of flatMap:

Stream<Integer> stream = Stream.iterate(1, x -> x + 1);
Stream<Stream<Integer>> stream2 = stream.limit(5).map(f);

System.out.println(stream2.collect(toList()));

will produce a stream of streams:

[java.util.stream.SliceOps$1@12133b1, java.util.stream.SliceOps$1@ea2f77,
java.util.stream.SliceOps$1@1c7353a, java.util.stream.SliceOps$1@1a9515, 
java.util.stream.SliceOps$1@f49f1c]

Converting this stream of streams of integers to a stream of integers is very straightforward using the functional paradigm: one just needs to flatMap the identity function to it:

System.out.println(stream2.flatMap(x -> x).collect(toList()));

It is however strange that a flatten method has not been added to the stream, knowing the strong relation that ties map, flatMap, unit and flatten, where unit is the function from T to Stream<T>, represented by the method:

Stream<T> Stream.of(T... t)
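A flatten helper and its relation to map and flatMap might be sketched like this. The class FlattenDemo and both static methods are hypothetical; only map, flatMap, iterate, limit and collect are actual Stream methods.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class FlattenDemo {
    // flatten is flatMap applied to the identity function.
    static <T> Stream<T> flatten(Stream<Stream<T>> streams) {
        return streams.flatMap(s -> s);
    }

    // Conversely, flatMap can be expressed as map followed by flatten.
    static <T, U> Stream<U> flatMapViaFlatten(Stream<T> stream, Function<T, Stream<U>> f) {
        return flatten(stream.map(f));
    }

    // Builds the streams [1], [1, 2], ..., [1..n] with map, then flattens them.
    static List<Integer> example(int n) {
        Function<Integer, Stream<Integer>> f = x -> Stream.iterate(1, y -> y + 1).limit(x);
        Stream<Stream<Integer>> nested = Stream.iterate(1, x -> x + 1).limit(n).map(f);
        return flatten(nested).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(example(5));
    }
}
```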

When are streams evaluated?

Streams are evaluated when we apply to them some specific operations called terminal operations. This may be done only once. Once a terminal operation is applied to a stream, it is no longer usable. Terminal operations are:

  • forEach
  • forEachOrdered
  • toArray
  • reduce
  • collect
  • min
  • max
  • count
  • anyMatch
  • allMatch
  • noneMatch
  • findFirst
  • findAny
  • iterator
  • spliterator
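The rule that a stream accepts a single terminal operation can be checked directly: a second terminal operation throws an IllegalStateException. A common workaround, sketched below, is to pass around a Supplier that builds a fresh stream each time. The class SingleUseDemo and its method names are hypothetical.

```java
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class SingleUseDemo {
    static boolean secondTerminalOperationFails() {
        Stream<Integer> stream = Stream.of(1, 2, 3).map(x -> x * 2);
        stream.collect(Collectors.toList()); // first terminal operation
        try {
            stream.count(); // second terminal operation on the same stream
            return false;
        } catch (IllegalStateException e) {
            return true; // the stream has already been consumed
        }
    }

    // A Supplier builds a fresh stream for each evaluation.
    static long sumOfMinAndMax(Supplier<Stream<Integer>> streams) {
        int min = streams.get().min(Integer::compare).get();
        int max = streams.get().max(Integer::compare).get();
        return min + max;
    }

    public static void main(String[] args) {
        System.out.println(secondTerminalOperationFails());
        System.out.println(sumOfMinAndMax(() -> Stream.of(3, 1, 2)));
    }
}
```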

Some of these methods are short circuiting. For example, findFirst will return as soon as the first element is found.
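Short circuiting is what makes evaluation of an infinite stream terminate. The following sketch (the class ShortCircuitDemo and its counter are hypothetical) binds functions to an infinite stream of integers and counts how many elements are pulled before findFirst returns.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

class ShortCircuitDemo {
    // Counts the elements actually pulled from the infinite source.
    static final AtomicInteger evaluated = new AtomicInteger();

    static int firstSquareAbove(int limit) {
        return Stream.iterate(1, x -> x + 1)             // infinite stream 1, 2, 3, ...
                .peek(x -> evaluated.incrementAndGet())  // observes each pulled element
                .map(x -> x * x)
                .filter(x -> x > limit)
                .findFirst()                             // stops at the first match
                .get();
    }

    public static void main(String[] args) {
        System.out.println(firstSquareAbove(50));
        System.out.println("elements evaluated: " + evaluated.get());
    }
}
```

With a sequential stream, only the elements up to the first match are produced; the rest of the infinite stream is never evaluated.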

Non terminal operations are called intermediate and can be stateful (if evaluation of an element depends upon the evaluation of the previous) or stateless. Intermediate operations are:

  • filter
  • map
  • mapTo... (Int, Long or Double)
  • flatMap
  • flatMapTo... (Int, Long or Double)
  • distinct
  • sorted
  • peek
  • limit
  • skip
  • sequential
  • parallel
  • unordered
  • onClose

Several intermediate operations may be applied to a stream, but only one terminal operation may be used.

So what about parallel processing?

One of the most advertised features of streams is that they allow automatic parallelization of processing. One can find amazing demonstrations on the web, mostly based on the same example: a program contacting a server to get the values corresponding to a list of stocks and finding the highest one not exceeding a given limit value. Such an example may show a speed increase of 400% or more.

But this example has little to do with parallel processing. It is an example of concurrent processing, which means that the speed increase will also be observed on a single-processor computer. This is because the main part of each “parallel” task is waiting. Parallel processing is about running, at the same time, tasks that do not wait, such as intensive calculations.
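The concurrent nature of such examples can be sketched with plain threads and no stream at all. In the code below, fetchPrice is a hypothetical stand-in for a remote call (it just sleeps), and the class name WaitingTasksDemo is made up for the illustration. Since each task mostly waits, one thread per task overlaps the waits regardless of the number of cores.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class WaitingTasksDemo {
    // Hypothetical stand-in for a remote call: the thread mostly waits.
    static int fetchPrice(String symbol) throws InterruptedException {
        Thread.sleep(100);
        return symbol.length();
    }

    // Concurrent processing: one thread per waiting task.
    static List<Integer> fetchAll(List<String> symbols) {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (String s : symbols) {
                Callable<Integer> task = () -> fetchPrice(s);
                futures.add(executor.submit(task));
            }
            List<Integer> prices = new ArrayList<>();
            for (Future<Integer> f : futures) {
                prices.add(f.get()); // results are collected in submission order
            }
            return prices;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        System.out.println(fetchAll(Arrays.asList("AAPL", "IBM", "ORCL", "MSFT")));
        System.out.println("elapsed ms: " + (System.nanoTime() - start) / 1_000_000);
    }
}
```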

Automatic parallelization will generally not give the expected result for at least two reasons:

  1. The increase of speed is highly dependent upon the kind of task and the parallelization strategy. Above all, the best strategy depends upon the type of task.
  2. The increase of speed is highly dependent upon the environment. In some environments, it is easy to obtain a decrease of speed by parallelizing.

Whatever the kind of tasks to parallelize, the strategy applied by parallel streams will be the same, unless you devise this strategy yourself, which will remove much of the interest of parallel streams. Parallelization requires:

  • A pool of threads to execute the subtasks,
  • Dividing the initial task into subtasks,
  • Distributing subtasks to threads,
  • Collating the results.

Without entering the details, all this implies some overhead. It will show amazing results when:

  • Some tasks imply blocking for a long time, such as accessing a remote service, or
  • There are not many threads running at the same time, and in particular no other parallel stream.

If all subtasks imply intense calculation, the potential gain is limited by the number of available processors. Java 8 will by default use as many threads as there are processors on the computer, so, for intensive tasks, the result is highly dependent upon what other threads may be doing at the same time. Of course, if each subtask is essentially waiting, the gain may appear to be huge.

The worst case is if the application runs in a server or a container alongside other applications, and subtasks do not imply waiting. In such a case (for example running in a J2EE server), parallel streams will often be slower than serial ones. Imagine a server serving hundreds of requests each second. Chances are high that several streams will be evaluated at the same time, so the work is already parallelized. A new layer of parallelization at the business level will most probably make things slower.

Worse: chances are high that business applications will see a speed increase in the development environment and a decrease in production. And that is the worst possible situation.

Edit: for a better understanding of why parallel streams in Java 8 (and the Fork/Join pool in Java 7) are broken, refer to these excellent articles by Edward Harned:

What streams are good for

Streams are a useful tool because they allow lazy evaluation. This is very important in several respects:

  • They allow functional programming style using bindings.
  • They allow for better performance by removing iteration. Iteration occurs with evaluation. With streams, we can bind dozens of functions without iterating.
  • They allow easy parallelization for tasks including long waits.
  • Streams may be infinite (since they are lazy). Functions may be bound to infinite streams without problem. Upon evaluation, there must be some way to make them finite. This is often done through a short circuiting operation.

What streams are not good for

Streams should be used with high caution when processing intensive computation tasks. In particular, by default, all streams will use the same ForkJoinPool, configured to use as many threads as there are cores in the computer on which the program is running.
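One way to see the shared pool at work is to collect the names of the threads that evaluate a parallel stream and to print the parallelism of the common pool. The class CommonPoolDemo is a hypothetical name for this sketch. Note, as a detail of the JDK rather than a claim of this article, that the default parallelism of the common pool is typically one less than the number of available processors, the calling thread taking part in the work too.

```java
import java.util.Set;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class CommonPoolDemo {
    // Collects the names of the threads that evaluate a parallel stream.
    static Set<String> workerNames() {
        return IntStream.range(0, 1000).parallel()
                .mapToObj(i -> Thread.currentThread().getName())
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        System.out.println("available processors: " + Runtime.getRuntime().availableProcessors());
        System.out.println("common pool parallelism: " + ForkJoinPool.commonPool().getParallelism());
        // Every parallel stream started without a dedicated pool shares these threads.
        System.out.println(workerNames());
    }
}
```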

If evaluating one parallel stream results in a very long-running task, that task may be split into as many long-running subtasks as there are threads in the pool, one being distributed to each thread. From there, no other parallel stream can be processed because all threads will be occupied. So, for computation-intensive stream evaluation, one should always use a specific ForkJoinPool in order not to block other streams.

To do this, one may create a Callable from the stream and submit it to the pool:

List<SomeClass> list = // A list of objects
Stream<SomeClass> stream = list.parallelStream().map(this::veryLongProcessing);
Callable<List<SomeClass>> task = () -> stream.collect(toList());
ForkJoinPool forkJoinPool = new ForkJoinPool(4);
List<SomeClass> newList = forkJoinPool.submit(task).get();

This way, other parallel streams (using their own ForkJoinPool) will not be blocked by this one. In other words, we would need a pool of ForkJoinPool in order to avoid this problem.
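A self-contained version of this technique might look like the sketch below. The class DedicatedPoolDemo and the method veryLongProcessing (a stand-in for a CPU-intensive computation) are hypothetical. Be aware that running the terminal operation inside a task of a given ForkJoinPool, so that the stream uses that pool, relies on how the current implementation behaves; to my knowledge it is not a documented guarantee of the Stream API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

class DedicatedPoolDemo {
    // Hypothetical stand-in for a CPU-intensive computation.
    static long veryLongProcessing(int n) {
        long sum = 0;
        for (int i = 0; i < 100_000; i++) {
            sum += (i ^ n) % 7;
        }
        return sum;
    }

    static List<Long> processInDedicatedPool(List<Integer> input, int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            // The terminal operation runs inside a task submitted to the dedicated pool.
            Callable<List<Long>> task = () -> input.parallelStream()
                    .map(DedicatedPoolDemo::veryLongProcessing)
                    .collect(Collectors.toList());
            return pool.submit(task).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown(); // release the threads once the evaluation is done
        }
    }

    public static void main(String[] args) {
        System.out.println(processInDedicatedPool(Arrays.asList(1, 2, 3, 4, 5), 4));
    }
}
```

Creating and shutting down a pool for each evaluation has a cost of its own, so a real application would more likely keep a small number of such pools around.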

If a program is to be run inside a container, one must be very careful when using parallel streams. Never use the default pool in such a situation unless you know for sure that the container can handle it. In a Java EE container, do not use parallel streams.

Previous articles

What's Wrong with Java 8, Part I: Currying vs Closures

What's Wrong in Java 8, Part II: Functions & Primitives

Published at DZone with permission of its author, Pierre-yves Saumont.


Comments

Hendy Irawan replied on Tue, 2014/05/20 - 12:28pm

Pierre-yves, I second many of your points. My immediate question is: Why now?

JDK 8 milestones have been available since 2 years ago. Had you published your series (voiced your observations) throughout the development period, many of these issues could have been improved (or at least considered).

Edward Harned replied on Tue, 2014/05/20 - 2:50pm

 Great article, you’re batting 1000. 


The real problem with parallel streams is that the underlying structure (fork/join) is defective. I’ve been writing a critique about this faulty framework for four years now. http://coopsoft.com/ar/Calamity2Article.html  Parallel streams expose the awful decision by Oracle not to build a parallel engine themselves but to rely on an academic experiment underpinning a research paper as the basis of the parallel option.

Pierre-yves Saumont replied on Wed, 2014/05/21 - 1:54am in response to: Hendy Irawan

Hendy, there are several answers to your question. First, my experience with submitting remarks about the evolution of Java in the past has given absolutely no result. But this was not a surprise. One more important reason is that I was only assigned Java 8 evaluation at work in September 2013. I could have learned about Java 8 before in my spare time, but I had many other things to do, among which developing in other (more functional) languages.

But the main reason is perhaps that I do not think that Java 8 is wrong in itself. What made me decide to write this series are the many examples I read about Java 8. I was using Java with the functional paradigm before Java 8. It has a cost. We have our own functional framework written in Java 6, with functions, immutable collections (missing in Java 8), monads (the most useful missing in Java 8), streams, actors and more. Most of the examples I have read about try to show how simple it is to write programs using the functional paradigm with Java 8 by just throwing in some magic words such as Optional or parallelStream. This is not true. There is still a big cost. It is worth it, but we must know the limitations and how to work around them. In the last article in the series, I will summarize what I think are good practices when using the functional paradigm with Java 8.

Pierre-yves Saumont replied on Wed, 2014/05/21 - 2:40am in response to: Edward Harned

Edward, thanks for your comment. By the way, your article is really a must read for all programmers considering using parallel streams and/or the Fork/Join framework. Having developed our own parallel engine, I feel quite frustrated each time I hear people considering Java 8 parallel streams as the right drop-in solution for their problem.

By the way, I edited my article to include links to your two articles.

Peter Huber replied on Fri, 2014/06/06 - 3:57am

Blogs have to be filled, bytes have to fill the network cables, electrons have to be pushed around - so many useful blog entries these days...

I don't get why we cannot do more than one Type in Transforming... Use "map" maybe?


        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5);

        List<Double> intermediate = new ArrayList<>();
        List<Long> end = new ArrayList<>();

        list.stream()
                .map(i -> (double)i * 1.2)
                .forEach(d -> intermediate.add(d));

        System.out.println(intermediate);

        list.stream()
                .map(i -> (double)i * 5.2)
                .map(d -> (long)(d/3))
                .forEach(d -> end.add(d));

        System.out.println(end);

And about Fork/Join
  1. Please, you cannot accuse the Java designers of making us, the developers, write stupid programs with a given API. Hilarious! It's not their fault if developers are not willing to look inside the black box to understand what the API does and how it works, what the limits are. Please go on and accuse hibernate, spring, etc. then as well! And please don't stop, accuse Oracle of their DB and SQL and and and...If you try to see a hammer in any tool you use, you might end up trying to put a nail into a wall with a soldering iron.
  2. Fork/Join a calamity? Academic? I trust Doug Lea, if you read his code and try to understand it you know that he knows what he's doing. You can still go on and try to be better with your own lib, I'd suggest. Try to match the optimization level the Java guys reached in for instance LongAdder (http://blog.palominolabs.com/2014/02/10/java-8-performance-improvements-longadder-vs-atomiclong/ ), then let's talk again

Pierre-yves Saumont replied on Fri, 2014/06/06 - 8:12am in response to: Peter Huber

>I don't get why we cannot do more than one Type in Transforming... Use "map" maybe?

Using map twice as I did in the following example (I used bind as a generic name for map or flatMap, depending on the type of the function):

list.bind(function1).bind(function2);

is not composing functions. This is applying successively two functions. What is much more interesting is to be able to compose functions without applying them, in order to build new functions that will be reused later. This is what programming is all about. Otherwise, it is only scripting.

What I was saying is that a map (and flatMap) method should have been added to List. Instead, we have to use Stream, which can't be reused.

But the main interest of Stream is that it is lazily evaluated. This allows deferred evaluation, and more specifically parallel evaluation.

>And about Fork/Join

You do not give any valid reason. You trust Doug Lea. This kind of religious reason is irrelevant. But you are right that every programmer should look at the code before using it. Did you do so? If you did, you can then have good technical reasons and you should expose them. But instead, you are just saying things like:

>Please go on and accuse hibernate, spring, etc. then as well! And please don't stop, accuse Oracle of their DB and SQL and...

Did I write that Hibernate or Spring or Oracle DB was wrong? Why would you invent such lies instead of explaining to us good use cases for parallel streams?

Use parallel streams on a single core machine and see what happens. Use parallel streams on a server and see what happens. Use parallel streams more than once at the same time in any application and see what happens.

>Try to match the optimization level the Java guys reached in for instance LongAdder then let's talk again

Are you able to match this optimization level yourself? Because if you are not, and if you are right that no one should argue before being able to do so, guess what you should have done!

Soylent Green replied on Fri, 2014/06/06 - 1:59pm

I understand that you're angry, Pierre-yves, but actually Peter has a point in claiming it's not the API designers' fault if APIs are used the "wrong" way. And I bet he just wanted to give an example with Spring and Oracle which are both tremendously capable instruments, if you know how to use them. If not, then it's the same as with parallel streams - you'll just get a bleeding nose. But still it's not Spring's/Oracle's fault if devs don't rtfm...

Peter Huber replied on Fri, 2014/06/06 - 2:34pm in response to: Soylent Green

 Thanks Soylent...people different than me might come to the conclusion that there's a reason people at Oracle do not listen ;-)

Anyway, talking about function composition...I go for the straightforward approach, it seems easy...but I bet - yes I see it's not Lisp, I know, I did Lisp myself - it's not completely correct in terms of functional theoretical constructs...

Function<Integer,Long> reuseMe = compose(i -> (double)i * 5.2, d -> (long)(d/3));
list.stream()
  .map(compose(reuseMe, lo -> lo*17))
  .forEach(d -> end.add(d));

/** compose two functions: alpha-intermediate, intermediate-omega */
public static <A,I,O> Function<A,O> compose(final Function<A,I> a, final Function<I,O> b) {
  return new Function<A,O>() {
    @Override
    public O apply(A t) {
      return b.apply(a.apply(t));
    }
  };
}

Pierre-yves Saumont replied on Sun, 2014/06/08 - 4:27am in response to: Soylent Green

You're mostly right except that you're slightly wrong when you say that parallel streams may be useful IF you know how to use them. Oracle tells us how to use them. Oracle demonstrated uses of parallel streams. So everybody knows how parallel streams are intended to be used. I would say that parallel streams are useful if you know WHEN to use them. And most importantly when NOT to use them.

My answer to this question is quite simple: I cannot find any business use case for parallel streams. I do not say that it is not an interesting piece of software. It surely is. However, I could not find any useful use case for it.

I found two cases when parallel streams are efficient: parallelizing waiting tasks, and parallelizing intensive computing tasks on an otherwise idling machine with more than one processor.

The first use case was demonstrated several times with an example of tasks accessing a remote service. This kind of use case is fine, except that it forces the user to use parallel processing although concurrent processing would be much more efficient. If you have hundred tasks accessing a remote service (resulting mostly in tasks waiting rather than computing), it's inefficient to limit the number of threads to the number of available hardware threads. Using parallel streams will give increased performance compared to mono threading, but using a cached thread pool with as many threads as needed will perform much better.

The second use case (parallelizing intensive computing tasks on an otherwise idling machine with more than one processor) is purely theoretical. No one will ever have to implement this use case. Probably at least ninety percent of Java developers are business programmers creating tasks running on a J2EE server. Using parallel streams on a J2EE server, although “legal”, is nonsense because to profit from parallel streams, you've got to master the context (in terms of thread use). And you can't do this with J2EE. A J2EE server is already highly parallelizing tasks, so there is absolutely no benefit to have any new layer of parallelization for computing intensive tasks.

So my question is: can one show us some business use cases where parallel streams are efficient? Of course, the fact that I could not find any does not mean that parallel streams are bad. I would really love to see parallel streams shining. But all examples I could see were biased. Most often in these examples, parallel streams are used to solve software problems, i.e. mostly academic problems which have nothing in common with real business problems developers have to solve.

In all use cases I have (and I am not working under J2EE), I have seen that parallel streams were far less efficient than the solutions that had been developed before. In some situations, they even were a disaster.

So you can argue with good counter examples and valid arguments, or you can rant about blasphemy. Your choice. But believe it or not, it will not make me angry!
