Jan has posted 1 posts at DZone. View Full User Profile

Java I/O streams and RMI

02.07.2010
| 12748 views |
  • submit to reddit

Perhaps every programmer working with Java RMI (Remote Method Invocation) comes to the point when he/she recognizes that java.io.Input/OutputStream classes do not implement java.io.Serializable interface - so that they cannot be used as remote methods' arguments or return values. I/O streams are usually tightly bound to a file descriptor or a network connection so its reasonable that they should not be used outside the local JVM.

In the project I have worked on there was a requirement to expose CMS system interface via RMI: however some methods took InputStream instance as document content. I have decided to use Spring and its AOP support to create an interceptor that pre-process service call arguments on the caller's side and another to post-process return value on the callee's side.

The idea is straightforward: we will wrap InputStream-s in a wrapper class implementing remote interface and instead of the InputStream we will pass an RMI stub performing remote calls back to the wrapper.

I/O streams' contracts as remote interfaces

 

Both input and output streams can be closed:

public interface Closeable extends Remote {

public void close() throws IOException, RemoteException;
}

Contract of a readable stream:

public interface Readable extends Closeable {

/**
* Reads at most count bytes from the stream.
* @param count
* @return data read as a byte array
* @throws IOException
* @throws RemoteException
*/
public byte[] read(int count) throws IOException, RemoteException;
}

Contract of an output stream:

public interface Writeable extends Closeable {

/**
* Writes given byte block to the stream.
* @param data
* @throws IOException
* @throws RemoteException
*/
public void write(byte data[]) throws IOException, RemoteException;
}

 

Implementing client-side proxies

This is an implementation of a java.io.InputStream subclass to be sent via RMI - the only need to implement an InputStream is to know how to read next byte of data. We will fetch data from the remote side in larger blocks, naturally. The buffer size grows up silently upto the specified size.

public class RemoteInputStream extends InputStream implements Serializable {

private static final long serialVersionUID = 1L;

private final Readable source;
private byte buffer[];
private int pos;
private int exp;
private final static int MAX_EXP = 6; // max fetch size = 2^MAX_EXP (64 KB)

RemoteInputStream(Readable source) {
this.source = source;
}

public int read() throws IOException {

if (pos == -2) return -1;
if (buffer == null || pos > buffer.length - 1) {
buffer = source.read(1024 *(exp > MAX_EXP ? 1 << MAX_EXP : 1 << exp++)); // max 64 KB fetch
pos = 0;
if (buffer.length == 0) {
pos = -2;
return -1;

}
}
return buffer[pos++] & 0xff;
}

public void close() throws IOException {
source.close();
}
}

The RemoteOutputStream's implementation is analogous - it can be found in attached project bundle.

 

Implementing server-side wrapper

public class RemoteInputStreamServer implements Readable {

private final InputStream in;

private static final byte EMPTY_BUFFER[] = new byte[0];

RemoteInputStreamServer(InputStream in) {
this.in = in;
}

public byte[] read(int count) throws IOException, RemoteException {

final byte buffer[] = new byte[count];
final int actualCount = in.read(buffer);
if (actualCount == count)
return buffer;
else if (actualCount == -1)
return EMPTY_BUFFER;
else {
final byte data[] = new byte[actualCount];
System.arraycopy(buffer, 0, data, 0, data.length);
return data;
}
}

public void close() throws IOException, RemoteException {

try {
in.close();
} catch (IOException ioex) {
throw ioex;
} finally {
UnicastRemoteObject.unexportObject(this, true);
}
}

public static RemoteInputStream wrap(InputStream in) throws RemoteException {

return new RemoteInputStream((Readable)
UnicastRemoteObject.exportObject(new RemoteInputStreamServer(in)));
}
}

Usage

The use of the introduced classes is simple - for given InputStream instance we will create remote proxy using the wrap(InputStream) static factory method of the RemoteInputStreamServer class. Here is the interceptor that pre-processes RMI methods' arguments on the client side:

public class RmiArgumentsPreprocessor implements MethodInterceptor {

public Object invoke(MethodInvocation invocation) throws Throwable {

final Object args[] = invocation.getArguments();
for (int i = 0; i < args.length; ++i) {
if (args[i] != null && args[i] instanceof InputStream) {
final InputStream in = (InputStream)args[i];
args[i] = RemoteInputStreamServer.wrap(in);
}
}
return invocation.proceed();
}
}

Usually, you know that you are using RMI, so you will perform wrapping manually when using stream arguments or return values. Note that we do not use rmiregistry at all - the stream proxy extending Input/OutputStream contains remote stub obtained on the server side - it is the object created by the UnicastRemoteObject.exportObject(Remote) call. Stream wrapper is un-exported when client performs close() call on the stream.

In the attachement you can find all source codes inside a zipped Eclipse project.

I hope this article helps you to overcome some of the limitations implied by RMI ;)

 

AttachmentSize
remote-io.zip16.09 KB
Published at DZone with permission of its author, Jan Michalica.

(Note: Opinions expressed in this article and its replies are the opinions of their respective authors and not those of DZone, Inc.)

Comments

Gregor Kovač replied on Sun, 2010/02/07 - 2:35pm

If you want a more complete solution (with retries, ...) you can try RMIIO (http://openhms.sourceforge.net/rmiio/) and if you want a simpler solution you just pass an object that implements Externalizable, where in writeExternal method write bytes that go out and in readExternal method read those bytes. In using this last solution in one of my projects and it just works.

Aidos Morg replied on Wed, 2010/02/10 - 4:35am

I second the suggestion of RMIIO. I wrote my own code once which was very similar to yours but pulled it out in favour of RMIIO. It's quite simple to use and it's one less thing you need to worry about maintaining. Also I don't think your stream implementation is complete, if you check the javadocs there are some methods that /must/ be implemented (such as available()) which you don't seem to have provided (at least not in the code in the article).

Comment viewing options

Select your preferred way to display the comments and click "Save settings" to activate your changes.