
Introduction to the JOODAMP Framework for Bulk Data Processing

02.20.2009

This article gives a brief introduction to the JOODAMP (Java Pooled Data Multi Processor, https://joodamp.dev.java.net ) framework, which was written from scratch. JOODAMP is used for processing large amounts of data. It is simple to integrate with either a standalone Java application or an enterprise Java application. Only the base framework is complete so far, and we need comments and new ideas to extend it to fit users' needs.

JOODAMP processes data using multiple Java threads, manages those threads, and writes the output to a file or to another system such as a database. The class diagram of the framework is given below.

JOODAMP Framework

MultiProcessor is the main class that the application uses to configure JOODAMP. Pool is a shared memory area that contains the input queues and output queues. The data processing threads get their input data from the pool and write their output data to an output queue in the pool.

Each input queue and each output queue has its own thread: input threads unmarshal data from a file into the queue, and output threads marshal data from the queue to a file.
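The arrangement described above (one thread feeding an input queue, several task threads, one thread draining an output queue) can be sketched with the standard java.util.concurrent classes. This is not JOODAMP code: the class name QueuePipelineSketch, the run method, the END marker and the queue capacity of 100 are all assumptions made for illustration, and JOODAMP's real internals may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

// Illustrative only: these names are hypothetical, not JOODAMP classes.
class QueuePipelineSketch {
    // Unique marker object meaning "no more data"; compared by identity.
    private static final String END = new String("<end>");

    static List<String> run(List<String> lines, int workers, Function<String, String> task) {
        BlockingQueue<String> inputQueue = new LinkedBlockingQueue<>(100);
        BlockingQueue<String> outputQueue = new LinkedBlockingQueue<>(100);
        List<String> written = new ArrayList<>(); // stands in for the output file
        List<Thread> threads = new ArrayList<>();

        // Input thread: loads the data, then queues one END marker per task thread.
        threads.add(new Thread(() -> {
            try {
                for (String line : lines) inputQueue.put(line);
                for (int i = 0; i < workers; i++) inputQueue.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));

        // Task threads: take from the input queue, process, put on the output queue.
        for (int i = 0; i < workers; i++) {
            threads.add(new Thread(() -> {
                try {
                    for (String line = inputQueue.take(); line != END; line = inputQueue.take()) {
                        outputQueue.put(task.apply(line));
                    }
                    outputQueue.put(END); // tell the output thread this task is done
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }

        // Output thread: drains the output queue until every task thread has finished.
        threads.add(new Thread(() -> {
            try {
                int finished = 0;
                while (finished < workers) {
                    String item = outputQueue.take();
                    if (item == END) finished++; else written.add(item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));

        for (Thread t : threads) t.start();
        try {
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the pipeline", e);
        }
        return written;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("a", "b", "c"), 2, String::toUpperCase));
    }
}
```

With more than one task thread the order of the results is not guaranteed, which is worth keeping in mind for any design of this kind.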

As an example, we will sum the two numbers on each line of an input file and write the result to another file.

Below is the input file content:
10, 20
30, 40
...

First we need to configure JOODAMP with the queues we are using and then initialize it. Below is the code to do this.

MultiProcessor processor = new MultiProcessor(org.joodamp.multiprocessor.testing.TestTask.class, 3, true, null);
processor.addInputQueue("input", 10000, new FileInputStream("c:\\input.txt"), new InputDO(), MultiProcessor.DATASTORE_FILE);
processor.addOutputQueue("output", 10000, new FileOutputStream("c:\\output.txt"), null, MultiProcessor.DATASTORE_FILE);
processor.start();

The first line creates a MultiProcessor object. Its first parameter is the class of the task (the data processing thread) that processes the data. The remaining arguments in the call above appear to be the thread count (3), a flag saying whether processing is synchronous or asynchronous (true), and the listener used when processing is asynchronous (null).

Then we add the input queues and output queues to the initialized pool using the addInputQueue() and addOutputQueue() methods. The first parameter of addInputQueue() is a logical name for that particular queue, the second is the queue size (10000 here), the third is the InputStream that specifies the input resource, the fourth is an input DO that the framework uses as a factory, and the last parameter says that the input resource is a file. Similarly, we add the output queue to the pool with an OutputStream as the output resource.

Internally, a thread is created for each input queue and each output queue, and data is added to the input queues so that the Task threads (data processing threads) can take it for processing.

Input and output data are stored in DOs (data objects). An input DO implements the DataInput interface and its unMarshal method; an output DO implements the DataOutput interface and its marshal method.

The unMarshal method splits the input string in two and stores each value in its own member variable.

The marshal method serializes the data from the member variables and returns it as an object.

Below is the code for both DOs.

InputDO

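import java.util.StringTokenizer;

public class InputDO implements DataInput, DataFactory {
    public int a;
    public int b;

    /** Creates a new instance of InputDO */
    public InputDO() {
    }

    // Parses one input line such as "10, 20" into a and b.
    public void unMarshal(Object obj) {
        String str = (String) obj;
        StringTokenizer st = new StringTokenizer(str, ",");
        // trim() drops the space after the comma; Integer.parseInt rejects " 20".
        a = Integer.parseInt(st.nextToken().trim());
        b = Integer.parseInt(st.nextToken().trim());
    }

    // Called by the framework, through DataFactory, to create a new input object.
    public Data createInstance() {
        return new InputDO();
    }
}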

OutputDO

public class OutputDO implements DataOutput {
    public int a;
    public int b;
    public int sum;

    /** Creates a new instance of OutputDO */
    public OutputDO() {
    }

    // Serializes the record as one output line, for example "10, 20, 30".
    public Object marshal() {
        return a + ", " + b + ", " + sum;
    }
}

The DataFactory interface is used by the framework to create input data objects.
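To try the unmarshal, process, marshal round trip without the framework on the classpath, a standalone sketch along the following lines may help. The interfaces InputRecord and OutputRecord and the classes PairRecord, SumRecord and MarshalSketch are simplified, hypothetical stand-ins, not JOODAMP's DataInput, DataOutput or DataFactory; a java.util.function.Supplier plays the role of the factory here.

```java
import java.util.function.Supplier;

// Hypothetical driver: creates an input record through a factory,
// fills it from one line, and serializes the computed result.
class MarshalSketch {
    static String process(String line, Supplier<PairRecord> factory) {
        PairRecord in = factory.get();   // the factory creates a fresh input object
        in.unMarshal(line);              // string -> fields
        SumRecord out = new SumRecord();
        out.a = in.a;
        out.b = in.b;
        out.sum = in.a + in.b;
        return (String) out.marshal();   // fields -> string
    }

    public static void main(String[] args) {
        System.out.println(process("10, 20", PairRecord::new)); // prints 10, 20, 30
    }
}

// Simplified stand-ins for the input and output interfaces (assumed shapes).
interface InputRecord { void unMarshal(Object raw); }
interface OutputRecord { Object marshal(); }

class PairRecord implements InputRecord {
    int a;
    int b;

    public void unMarshal(Object raw) {
        String[] parts = ((String) raw).split(",");
        if (parts.length != 2) {
            throw new IllegalArgumentException("expected two values: " + raw);
        }
        // trim() tolerates the space after the comma in lines like "10, 20".
        a = Integer.parseInt(parts[0].trim());
        b = Integer.parseInt(parts[1].trim());
    }
}

class SumRecord implements OutputRecord {
    int a;
    int b;
    int sum;

    public Object marshal() {
        return a + ", " + b + ", " + sum;
    }
}
```

Validating the number of fields before parsing gives a clearer error than the NoSuchElementException a tokenizer would raise on a short line.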

Now we have to create the Task (the data processing thread) that processes all the data. This class should extend the Task class and override the execute method.

Below is the example code:

public class TestTask extends Task {
    PoolContext cxt;
    Pool pool;

    /** Creates a new instance of TestTask */
    public TestTask() {
    }

    // Keeps the context and looks up the shared pool.
    public void init(PoolContext cxt) {
        this.cxt = cxt;
        pool = cxt.getPool();
    }

    // Reads records from the "input" queue, sums them, writes to "output".
    public void execute() {
        System.out.println("Thread started...");
        int data = 0; // number of records processed by this thread
        InputDO input = null;
        while (pool.isInputDataAvailable("input")) {
            input = (InputDO) pool.getInputData("input");
            // Another thread may have taken the record in the meantime.
            if (input != null) {
                OutputDO output = new OutputDO();
                output.sum = input.a + input.b;
                output.a = input.a;
                output.b = input.b;
                pool.addOutputData("output", output);
                data++;
            }
        }
        System.out.println(data);
    }

    public void destroyTask() {
    }
}

In the above program, the init method receives a pool context, and from that we get a reference to the Pool. The data processing logic goes inside the execute method. First we check whether the input queue has any data. If it does, we fetch a record with the getInputData() method, which returns a DataInput object. From that object we read the two numbers, compute their sum, create an OutputDO, set all its values, and add it to the output queue.
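The null check after fetching a record matters when several task threads share one queue: between the availability check and the fetch, another thread may take the last record. The sketch below shows the same check-then-fetch loop using only java.util.concurrent. The class DrainLoopSketch and its drain method are hypothetical, and the sketch assumes the input queue is fully loaded before the threads start, which is simpler than a queue that is still being filled.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative only: a hypothetical stand-in for several task threads
// sharing one input queue and one output queue.
class DrainLoopSketch {
    static int drain(ConcurrentLinkedQueue<int[]> input,
                     ConcurrentLinkedQueue<Integer> output,
                     int threadCount) {
        AtomicInteger processed = new AtomicInteger();
        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            threads[i] = new Thread(() -> {
                while (!input.isEmpty()) {
                    // poll() returns null if another thread emptied the queue
                    // after the isEmpty() check above.
                    int[] pair = input.poll();
                    if (pair != null) {
                        output.add(pair[0] + pair[1]);
                        processed.incrementAndGet();
                    }
                }
            });
            threads[i].start();
        }
        try {
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for task threads", e);
        }
        return processed.get();
    }

    public static void main(String[] args) {
        ConcurrentLinkedQueue<int[]> input = new ConcurrentLinkedQueue<>();
        input.add(new int[] {10, 20});
        input.add(new int[] {30, 40});
        ConcurrentLinkedQueue<Integer> output = new ConcurrentLinkedQueue<>();
        System.out.println(drain(input, output, 3) + " records processed");
    }
}
```

Every record is processed exactly once because poll() removes it atomically; the availability check only decides when a thread stops looping.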

The output file is shown below:

10, 20, 30
30, 40, 70
...

The framework takes care of loading all the data into the input queue, writing all the data from the output queue, and managing all the task threads.

Published at DZone with permission of its author, pradeep Duraisamy.


Comments

Andy Leung replied on Wed, 2009/02/25 - 8:22am

Well, the BPM in GlassFish v2.2 and v3 gives you what you've mentioned through a GUI tool. Besides, my 2 cents: NIO is also worth mentioning here, otherwise the system is not very responsive if developers use traditional I/O. Remember, just my 2 cents :)

pradeep Duraisamy replied on Thu, 2009/02/26 - 5:56am in response to: Andy Leung

Thanks for your reply. But what is NIO? Can you explain it in a bit more detail?

Ralf Wisser replied on Thu, 2009/05/28 - 1:12pm

NIO is a collection of Java programming language APIs that offer features for intensive I/O operations; see http://en.wikipedia.org/wiki/New_I/O

exinco ed replied on Thu, 2009/07/02 - 7:12am

nice platform. i'm just an end user even i try to be a real programmer

Ian Lim replied on Sun, 2009/11/15 - 11:50am

Any difference between this and GUI ETL tools like Pentaho Data Analysis or Talend?

If the difference is not much, how do you convince people to move from GUI tools to writing code?

pradeep Duraisamy replied on Tue, 2010/06/29 - 3:09am

This framework is lightweight and can be used by any Java application. Just include the jar in your project and you are ready to use it. It is meant for processing large datasets and is not intended for integration.
