Václav is a programming enthusiast who's constantly seeking ways to make development more effective and enjoyable. He's particularly interested in server-side Java technologies, distributed systems, concurrency, agile methodologies, modern programming languages and DSLs. He works for JetBrains as a senior software developer and a technology evangelist. He is also a board member of the JetBrains Academy. On the side, he leads the GPars project, an open-source concurrency library, and investigates the domains of neural networks, evolutionary programming and data mining. You can check out his blog or follow him on Twitter.

Flowing With the Data

06.02.2010

I'd like to elaborate a bit on dataflow concurrency, one of the concurrency abstractions GPars provides to the Groovy enthusiasts out there. We'll focus only on the easier part of it - dataflow tasks and variables - leaving the domain of dataflow operators for a later post.
I fell for dataflow quite early on, but only now am I beginning to realize the power and flexibility of the approach in its entirety. This article should summarize my current feelings about dataflow and lay the groundwork for some more investigation in the future. The fact that two highly respected and innovative concurrency-focused projects I watch, Clojure and Scala (through Akka), have added support for dataflow could indicate we're on an exciting track here.

Surprisingly enough, most of the power comes from the simplicity of the dataflow concept. It all boils down to allowing safe communication between concurrent tasks through variables with well-defined and constrained semantics. If I wanted to be brief and explain the concept to my followers on Twitter, I'd say: dataflow abstraction consists of concurrently run tasks communicating through single-assignment variables with blocking reads. Let's look at an example:

import groovyx.gpars.dataflow.DataFlows             // GPars 0.x-era class names,
import static groovyx.gpars.dataflow.DataFlow.task  // matching the snippets in this article

public Map estimateProject(int numOfEntities) {
    def estimates = new DataFlows()  // each property of 'estimates' is a single-assignment dataflow variable
    
    task {
        log 'Calculating total duration estimate'  // 'log' is a simple logging helper, not shown here
        estimates.duration = [estimates.dba, estimates.ui].max() + 1  // blocks until dba and ui are bound
    }

    task {
        log 'Calculating db admin time'
        estimates.dba = (numOfEntities * 3 / 20)
    }

    task {
        log 'Calculating UI designer time'
        estimates.ui = ((5 + numOfEntities) * 3 / 5)
    }

    task {
        log 'Calculating the cost'
        estimates.cost = 500 + numOfEntities * 9
    }

    return [cost : estimates.cost, duration : estimates.duration]
}

We've created a method that will calculate a project duration and cost estimate based on the number of business entities identified during requirement analysis (if only estimates were that easy in reality). The important piece is that many of the steps can be calculated concurrently, and the dependencies among the steps are expressed as read/write operations on particular dataflow variables.
Our method, once invoked, will start four concurrent tasks, which may or may not map to four physical threads under the covers. The thread scheduler may schedule the tasks to run in any order; however, the semantics of dataflow variables will only allow legal schedules to occur in reality.
For example, the total duration estimate (calculated by the first task) requires the dba and ui estimates to be calculated first, so its reads of those variables block until the other two tasks have written them. It then stores the result in the estimates.duration variable, which in turn allows the main thread of execution to insert the value of estimates.duration into the result map.
You can see how the values flow seamlessly among tasks and dataflow variables enforce proper ordering of the activities.
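Stripped down to the bare primitive, the semantics all four tasks rely on can be shown with a single DataFlowVariable. A minimal sketch, assuming the same GPars 0.x-era API as the example above:

```groovy
import groovyx.gpars.dataflow.DataFlowVariable
import static groovyx.gpars.dataflow.DataFlow.task

final df = new DataFlowVariable()

task {
    df << 42        // the first write binds the variable for good
}

assert df.val == 42 // blocking read: waits until bound, then always sees 42
println df.val      // any later read returns the same value immediately
```

Any number of readers, on any threads, may call df.val; those that arrive before the bind simply block, and all of them end up seeing the same value.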

Benefits of dataflow

As you can see from the semantics of dataflow variables, dataflow concurrency naturally avoids race conditions as well as livelocks. You can't get multiple threads fighting over a dataflow variable - the first write wins, and all reads get the same value once it's available.
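The "first write wins" rule is enforced at runtime - an already-bound variable refuses any further writes. A sketch of what that looks like (the exact exception type is an assumption on my side, based on the GPars versions I've used):

```groovy
import groovyx.gpars.dataflow.DataFlowVariable

final df = new DataFlowVariable()
df << 1                   // first write binds the variable
try {
    df << 2               // any second write is illegal...
    assert false, 'expected the second bind to fail'
} catch (IllegalStateException expected) {
    // ...so a racing writer can never silently overwrite the value
}
assert df.val == 1        // every reader sees the first value
```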
As for deadlocks, they also become much less harmful than usual. Dataflow makes deadlocks deterministic: with the same data input you can reproduce a deadlock any time on any system, irrespective of the underlying thread scheduler and the actual threading scheme chosen. To put it simply, if you don't get a deadlock on your workstation during testing, you're safe to roll your baby out into production, at least as far as deadlocks are concerned.
Can you imagine how big a win that is? Think of any plain Java threading-based application you wrote and how confident you were that the code was not vulnerable to deadlocks.

Obviously, there are limitations to what it is reasonable to aim for with this concept. The need to manually split the overall goal into smaller parallelizable tasks makes parallel collections or fork/join the more natural choice for collections or hierarchical data, respectively. And for large algorithms, abstractions that are more explicit about their communication channels, such as actors or CSP, might offer a more convenient and robust programming model. Yet a great deal of work remains well suited to dataflow concurrency. I personally find dataflow concurrency very useful whenever I see natural rules for manual parallelization of the work.

A dose of theory

Let's introduce a bit of theory into our discussion here. Dataflow variables introduce a partial ordering on our concurrent tasks. Look at this example:

//task 1
task {
    do_A
    read_dataflow    // blocks until the variable has been written
    do_B
}

//task 2
task {
    do_C
    write_dataflow   // binds the variable, waking up any blocked readers
    do_D
}

Now we have two tasks, where task 2 at some point writes a value into a dataflow variable, which is read by task 1. The read/write operations on the dataflow variable split the two tasks into parts A, B, C and D, and the possible executions of these parts are partially ordered. Let's use the "<" symbol to indicate that if x < y then x must run before y.
Obviously A < B and C < D, since each task by definition processes its own body sequentially. With our dataflow variable in place we also gain C < B. This means that, thanks to the dataflow variable, B will only be allowed to run after C, and so we don't need to do anything extra to ensure B has all the data it needs from C available.
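A quick way to convince yourself of the C < B guarantee is to record the order of the events. A sketch (the sleeps only make interleavings visible and crudely wait for the tasks to finish - they play no role in the guarantee itself):

```groovy
import groovyx.gpars.dataflow.DataFlowVariable
import static groovyx.gpars.dataflow.DataFlow.task

final df = new DataFlowVariable()
final events = [].asSynchronized()   // thread-safe event log

task {                    // task 1
    events << 'A'
    df.val                // blocks here until task 2 writes
    events << 'B'
}

task {                    // task 2
    events << 'C'
    sleep 100             // give task 1 a chance to reach its read first
    df << true
    events << 'D'
}

sleep 500                 // crude wait for both tasks (sketch only)
assert events.indexOf('C') < events.indexOf('B')   // C < B holds on every run
```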

Deadlocks, we know about you all

If you created a graph with nodes representing the parts of the tasks and directed edges indicating the partial ordering, you would get a better insight into what's happening. An edge in the graph shows that the target node (a part of a task) depends on data stored into a dataflow variable by the source node.

[Figure: the task parts as nodes, with directed edges showing the partial ordering]

As you may have guessed by now, you can quickly detect deadlocks just by searching for ordered cycles in the graph. Here's an example:

//task 1
task {
    read_dataflow_2    // blocks until task 2 writes it... which never happens
    do_A
    do_B
    write_dataflow_1
}

//task 2
task {
    do_C
    read_dataflow_1    // blocks until task 1 writes it... which never happens
    do_D
    write_dataflow_2
}

A, B and D form a cycle and none of them can ever proceed.
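The cycle search is just a depth-first scan for back edges over the ordering graph. In plain Groovy, with a hypothetical adjacency-map encoding of the example above (this is an illustration, not a GPars API):

```groovy
// An edge x -> y means x must run before y:
// A -> B from task 1's sequential order, B -> D via dataflow_1,
// D -> A via dataflow_2, and C -> D from task 2's sequential order.
def edges = [A: ['B'], B: ['D'], D: ['A'], C: ['D']]

boolean hasCycle(Map<String, List<String>> graph) {
    def visiting = [] as Set   // nodes on the current DFS path
    def done = [] as Set       // nodes fully explored, known cycle-free
    def dfs
    dfs = { node ->
        if (node in done) return false
        if (node in visiting) return true          // back edge: a cycle
        visiting << node
        boolean cycle = (graph[node] ?: []).any { dfs(it) }
        visiting.remove(node)
        done << node
        return cycle
    }
    graph.keySet().any { dfs(it) }
}

assert hasCycle(edges)   // A -> B -> D -> A: this schedule deadlocks
```

Dropping the D -> A edge (i.e. removing the read of dataflow_2) breaks the cycle, and hasCycle reports the graph as deadlock-free.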

Summary

This is all for today. Now, go ahead and let me know your thoughts. I'll be very happy to receive comments, suggestions and ideas that would help me explore the field further. In the near future my plan is to elaborate a bit more on dataflow tasks and their relationship to continuations.

From http://www.jroller.com/vaclav/entry/flowing_with_the_data

Published at DZone with permission of its author, Václav Pech.