Enterprise Integration Zone is brought to you in partnership with:

Rick Copeland is the principal consultant at Arborian Consulting, LLC, where he helps clients build custom web applications using Python and MongoDB. He previously worked as a lead software engineer at SourceForge, where he helped lead the transformation from a PHP/Postgres/MySQL codebase to a Python/MongoDB codebase. Rick is the primary author of Ming, a Python object mapper for MongoDB, and Zarkov, a realtime analytics platform based on MongoDB. Prior to GeekNet, Rick worked in fields from retail analytics to hardware chip design. Rick's personal blog is hosted at Just a Little Python. Rick has posted 25 posts at DZone. You can read more from them at their website. View Full User Profile

Using ZeroMQ Devices to Support Complex Network Topologies

09.04.2012
| 7496 views |
  • submit to reddit
Continuing in my ZeroMQ series, today I'd like to look at ZeroMQ "devices" and how you can integrate ZeroMQ with Gevent so you can combine the easy networking topologies of ZeroMQ with the cooperative multitasking of gevent. If you're just getting started with ZeroMQ, you might want to check out the following articles:
And if you want some background on Gevent, you might want to check out that series at the following links:
Once you're caught up, let's get started... 

ZeroMQ "devices"

One of the nice aspects of ZeroMQ is that it decouples the communication pattern with the connection pattern of your endpoints. Without ZeroMQ, you'd commonly have a "server" socket which binds to a port and receives requests, while "clients" connect to that socket and send requests. With ZeroMQ, it's perfectly acceptable to have the "server" connect to the "client" to receive requests (and in fact you can have multiple "servers" connected to the same "client" socket).
While this is handy, in some cases you may want to have both client and server relatively dynamic, with both connecting and neitherbinding to a particular port. For this use case, ZeroMQ provides so-called "devices" that bind a couple of sockets and perform forwarding operations between them to support common communication patterns. The "client" and "server" then both connect to the device. In this section, I'll cover the devices provided by the ZeroMQ library.

Queue device

The Queue device is responsible for mediating the REQ/REPcommunication pattern. Suppose we have the following request code:

import sys
import zmq

context = zmq.Context()

for x in xrange(10):
    sock = context.socket(zmq.REQ)
    sock.connect(sys.argv[1])
    print 'REQ is', x,
    sock.send(str(x)) 
and the following response code:
import sys
import zmq

context = zmq.Context()

while True:
    sock = context.socket(zmq.REP)
    sock.connect(sys.argv[1])
    x = sock.recv()
    print 'REQ is', x,
    reply = 'x-%s' % x
    sock.send(reply) 

To set up a broker that forwards between the two, you need a tiny script: 

import sys
import zmq

context = zmq.Context()

s1 = context.socket(zmq.ROUTER)
s2 = context.socket(zmq.DEALER)
s1.bind(sys.argv[1])
s2.bind(sys.argv[2]) 
Note that we've used a couple of new socket types above:zmq.ROUTER and zmq.DEALER. These are similar to zmq.REP and zmq.REQ, respectively, but they allow us to break the strict "request-response" lockstep of REQ/REP. The way they work is the following:
  1. The ROUTER socket receives a message on a particular connection. It adds a message ID to the beginning of the message that identifies the sending REQ socket.
  2. The FORWARDER device sends the message received on theROUTER to the DEALER socket.
  3. The DEALER picks a connection to send the message to, stripping the prefix but noting the message ID and linking it to the REP socket handling the message.
  4. The DEALER receives the response message, pairs it up with the message ID it's responding to, and adds the message ID to the beginning of the mssage.
  5. The FORWARDER device sends the message received on theDEALER to the ROUTER socket.
  6. The ROUTER socket strips the message ID and sends the message to the REQ socket that sent the initial request.

By using message IDs as above, we can have multiple messages 'in flight', being handled by various servers. If you'd rather not use the built-in ZeroMQ device, you can build your own fairly simply. The code below shows a device that that uses a pair of threads to relay messages from one socket to another:

import sys
import zmq
import time

context = zmq.Context()

s1 = context.socket(zmq.ROUTER)
s2 = context.socket(zmq.DEALER)
s1.bind(sys.argv[1])
s2.bind(sys.argv[2])

def zeromq_relay(a, b):
    '''Copy data from zeromq socket a to zeromq socket b'''
    while True:
        msg = a.recv()
        more = a.getsockopt(zmq.RCVMORE)
        if more:
            b.send(msg, zmq.SNDMORE)
        else:
            b.send(msg)

def zmq_queue_device(s1, s2):
    import threading
    t1 = threading.Thread(target=zeromq_relay, args=(s1,s2))
    t2 = threading.Thread(target=zeromq_relay, args=(s2,s1))
    t1.daemon = t2.daemon = True
    t1.start()
    t2.start()
    while True:
        time.sleep(10)
 
You can also build a device using nonblocking IO, but that's a bit beyond the scope of what I want to cover here.

Forwarding device

In the same way the QUEUE device mediates the REQ/REP pattern, theFORWARDER device mediates the PUB/SUB pattern. Since PUB/SUB doesn't require the lockstep operation of REQ/REP, we can use the regularPUB/SUB sockets in our device: 

import sys
import zmq
context = zmq.Context()

s1 = context.socket(zmq.SUB)
s2 = context.socket(zmq.PUB)

s1.bind(sys.argv[1])
s2.bind(sys.argv[2])

s1.setsockopt(zmq.SUBSCRIBE, '')
 

Now we can connect one or more publishers to our device's "upstream" port (sys.argv[1]), and one or more subscribers to the device's "downstream" port (sys.argv[2]) to provide a PUB/SUBbroker. Note in particular that we had to subscribe to all mesages in the device code since we don't know which messages our downstream sockets are interested in. 
If you'd rather filter messages that get fowarded, you can subscribe to some subset of messages. Unfortunately, I'm not aware of any way to forward only messages that the downstream clients have subscribed to using built-in ZeroMQ functionality.
If we want to write our own FORWARDER device, it's even simpler than the QUEUE device since it only handles unidirectional communication. Assuming we have the zeromq_relay function as defined above, ourFORWARDER device is just the following:

def zmq_forwarder_device(upstream, downstream): 

Streaming device

Similar to the FORWARDER device, the STREAMER device just sends upstream packets downstream, but in this case in support of thePUSH/PULL pattern rather than PUB/SUB. To make a STREAMER broker, then, we just need the following code:  

import sys
import zmq
context = zmq.Context()

s1 = context.socket(zmq.PULL)
s2 = context.socket(zmq.PUSH)

s1.bind(sys.argv[1])
s2.bind(sys.argv[2])
 
And once again, if we wanted to create the device manually, it's just a relay:
def zmq_streaming_device(upstream, downstream): 

Integration with gevent

You may have noticed that the gevent device function doesn't return. If you want to create multiple devices within your Python program, then, you'll need to wrap the devices in threads. 

Another approach that you might use if you, like me, prefer the lightweight threading and asynchronous I/O of gevent, is to use thegevent-zeromq package available from PyPI: 

$ pip install gevent-zeromq  

 Now we can use a 'green' version of ZeroMQ by just importing from the gevent-zeromq wrapper in our scripts. A "pusher" script would then look like this:

import sys
import time

from gevent_zeromq import zmq

context = zmq.Context.instance()
sock = context.socket(zmq.PUSH)
sock.connect(sys.argv[1])

while True:
    time.sleep(1) 
Simple, right? The reason I throw this in this seemingly-unrelated post is that if you want to use gevent, you can't use the built-in devices. This is because the built-in devices block, and they block in the ZeroMQ C library, not in Python where they could be "greened". So if you want a device with gevent, you'll have to write your own (which as, after all, pretty straightforward).

Conclusion

ZeroMQ devices provide a handy way to stitch together complex routing topologies and allow you to decouple the various components of your architecture. Though the built-in devices are quite simple, they provide insight into how you could build more complex devices yourself to fulfill the role of "broker" in a distributed architecture. 

I'd be interested in hearing from you how you are using devices (or whether you find them useful at all) in ZeroMQ programming. Do you use the built-in devices? Any devices you write yourself? Do you prefer the multithreaded device approach I've used here, or using the zmq.Poller object? Let me know in the comments below! 

Published at DZone with permission of its author, Rick Copeland. (source)

(Note: Opinions expressed in this article and its replies are the opinions of their respective authors and not those of DZone, Inc.)