
Rick Copeland is the principal consultant at Arborian Consulting, LLC, where he helps clients build custom web applications using Python and MongoDB. He previously worked as a lead software engineer at SourceForge, where he helped lead the transformation from a PHP/Postgres/MySQL codebase to a Python/MongoDB codebase. Rick is the primary author of Ming, a Python object mapper for MongoDB, and Zarkov, a realtime analytics platform based on MongoDB. Prior to GeekNet, Rick worked in fields from retail analytics to hardware chip design. Rick's personal blog is hosted at Just a Little Python.

Distributed Systems with ZeroMQ

08.15.2012

Departing a bit from my current series on gevent and Python, today I want to take a look at a different networking technology that's been gaining traction: ZeroMQ. So without further ado, let's jump right in...

ZeroMQ design principles

First, ZeroMQ is not a message broker. People sometimes mistake it for one because of its name. Actually, ZeroMQ is a library that supports certain network communication patterns using sockets. The "MQ" part comes in because ZeroMQ uses queues internally to buffer messages so that you don't block your application when sending data. When you say socket.send(...), ZeroMQ actually enqueues a message to be sent later by a dedicated communication thread. (This communication thread and its state are encapsulated in the ZeroMQ Context object used below; most programs will have a single Context.)
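To see the queuing behaviour in isolation, here is a minimal sketch. It assumes Python 3 and a recent pyzmq, and uses a pair of in-process zmq.PAIR sockets created from one Context. The sender calls send() three times before anyone calls recv(). Each call returns once the message is queued, and the receiver picks the messages up later, in order. The endpoint name inproc://queued-demo is an arbitrary choice for illustration.

```python
import zmq


def queued_sends():
    # Most programs create a single Context and make all their sockets from it.
    ctx = zmq.Context()

    receiver = ctx.socket(zmq.PAIR)
    receiver.bind('inproc://queued-demo')  # endpoint name chosen for illustration
    sender = ctx.socket(zmq.PAIR)
    sender.connect('inproc://queued-demo')

    # Each send() hands the message to ZeroMQ's queue and returns right away,
    # even though nobody has called recv() yet.
    for word in (b'one', b'two', b'three'):
        sender.send(word)

    # Later, the receiver drains the queued messages in order.
    received = [receiver.recv() for _ in range(3)]

    sender.close()
    receiver.close()
    ctx.term()
    return received


if __name__ == '__main__':
    print(queued_sends())
```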

ZeroMQ binding/connecting versus "normal" sockets

Next, keep in mind that ZeroMQ separates the notion of clients and servers from the underlying communication pattern. For instance, you may be used to creating a socket for receiving requests with a pattern similar to the following:

from socket import socket

sock = socket()
sock.bind(('', 8080))
sock.listen(256)
while True:
    cli, addr = sock.accept()  # accept() returns (connection, address)
    # The following code would probably be handled in a 'worker' thread or
    # greenlet. It's included here only for example purposes.
    message = cli.recv(4096)
    response = handle_message(message)  # handle_message() is your application's request handler
    cli.sendall(response)
    cli.close()

The client would then connect() to the server and send a request:

from socket import socket

sock = socket()
sock.connect(('localhost', 8080))
sock.sendall(message)  # message is the request payload (bytes)
response = sock.recv(4096)


In ZeroMQ, either end of the request/response pattern can bind, and either end can connect. For instance, using the pyzmq library, you can have your "server" (the one who handles requests) connect to the "client" (the one who sends requests). The "server" code then looks like this:

import zmq
context = zmq.Context.instance()

sock = context.socket(zmq.REP)
sock.connect('tcp://localhost:8080')

while True:
    message = sock.recv()
    response = handle_message(message)
    sock.send(response)


The "client" code would look like this:

import zmq
context = zmq.Context.instance()

sock = context.socket(zmq.REQ)
sock.bind('tcp://*:8080')

sock.send(message)
response = sock.recv()


A couple of things deserve attention here. First, as noted above, the "server" is doing the connecting, and the "client" is doing the binding. Another thing to note is the address being used. Rather than passing a hostname/port, we pass a URI.
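Here is a single-process sketch of the role reversal, assuming Python 3 and pyzmq. The REQ "client" binds to a random free localhost port chosen by pyzmq's bind_to_random_port(). A REP "server" running in a thread then connects to it and answers one request. The function name and the b'handled: ' reply prefix are illustrative only.

```python
import threading

import zmq


def reversed_roles():
    ctx = zmq.Context()

    # The "client" (REQ) binds; pyzmq picks a random free port on localhost.
    client = ctx.socket(zmq.REQ)
    port = client.bind_to_random_port('tcp://127.0.0.1')
    client.setsockopt(zmq.RCVTIMEO, 5000)  # fail instead of hanging forever

    def server():
        # The "server" (REP) connects to the client and answers one request.
        rep = ctx.socket(zmq.REP)
        rep.connect('tcp://127.0.0.1:%d' % port)
        request = rep.recv()
        rep.send(b'handled: ' + request)
        rep.close()

    worker = threading.Thread(target=server)
    worker.start()

    client.send(b'hello')  # blocks until the REP side has connected
    reply = client.recv()

    worker.join()
    client.close()
    ctx.term()
    return reply
```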

ZeroMQ transport types

ZeroMQ supports several styles of URI for its transport layer. Apart from the restrictions noted below, the same socket code works with each of them:

  • tcp://hostname:port sockets let us do "regular" TCP networking
  • inproc://name sockets let us do in-process networking (inter-thread/greenlet) with the same code we'd use for TCP networking
  • ipc:///tmp/filename sockets use UNIX domain sockets for inter-process communication
  • pgm://interface;address:port and epgm://interface;address:port use the OpenPGM library to support reliable multicast directly over IP (pgm) and encapsulated in UDP (epgm). Due to the nature of multicast, the pgm and epgm transports can only be used with PUB/SUB socket types (more on this below).
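To illustrate "the same code we'd use for TCP networking", here is a sketch assuming Python 3 and pyzmq. A single function sends one message across whatever endpoint string it is given, and only the URI changes between the inproc and tcp runs. The helper free_tcp_port() is hypothetical: it asks the OS for an unused port, so there is a small race before ZeroMQ binds it.

```python
import socket

import zmq


def free_tcp_port():
    # Hypothetical helper: ask the OS for an unused localhost port.
    s = socket.socket()
    s.bind(('127.0.0.1', 0))
    port = s.getsockname()[1]
    s.close()
    return port


def ping(ctx, endpoint):
    """Send one message across `endpoint`; the code is identical for every transport."""
    a = ctx.socket(zmq.PAIR)
    a.bind(endpoint)
    b = ctx.socket(zmq.PAIR)
    b.connect(endpoint)
    a.setsockopt(zmq.RCVTIMEO, 5000)  # fail instead of hanging forever
    b.send(b'ping')
    msg = a.recv()
    a.close()
    b.close()
    return msg


if __name__ == '__main__':
    ctx = zmq.Context()
    print(ping(ctx, 'inproc://ping-demo'))
    print(ping(ctx, 'tcp://127.0.0.1:%d' % free_tcp_port()))
    ctx.term()
```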

ZeroMQ disconnected operation

One feature that sometimes catches programmers new to ZeroMQ off guard is that it supports disconnected operation. In the code above, for instance, we could have started the server first and the client later. With plain TCP sockets, this wouldn't work: the server's connect() would fail immediately because nothing is listening on the client's port yet. In ZeroMQ, the connect() goes through "optimistically," assuming that someone's going to bind to that port later, and ZeroMQ keeps retrying the connection in the background.

What's more, you can have a client start up, bind to port 8080, perform a transaction with the server, and then shut down. Another client can then start up, bind to port 8080, and perform another transaction. The server just keeps handling requests, happily "connected" to whatever happens to bind to port 8080.
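The sketch below demonstrates connect-before-bind, assuming Python 3, pyzmq, and default socket options. A PUSH socket connects to a TCP port where nothing is listening and sends a message. Only afterwards does a PULL socket bind that port, and the queued message still arrives once ZeroMQ's background reconnect succeeds. As before, free_tcp_port() is a hypothetical helper with a small port race.

```python
import socket

import zmq


def free_tcp_port():
    # Hypothetical helper: ask the OS for an unused localhost port.
    s = socket.socket()
    s.bind(('127.0.0.1', 0))
    port = s.getsockname()[1]
    s.close()
    return port


def connect_before_bind():
    ctx = zmq.Context()
    endpoint = 'tcp://127.0.0.1:%d' % free_tcp_port()

    # 1. Connect first: nothing is listening, but connect() still succeeds.
    pusher = ctx.socket(zmq.PUSH)
    pusher.connect(endpoint)
    # The message waits in ZeroMQ's queue while the connection is retried
    # in the background (every 100 ms by default).
    pusher.send(b'sent before anyone was listening')

    # 2. Bind afterwards; the queued message is delivered once connected.
    puller = ctx.socket(zmq.PULL)
    puller.setsockopt(zmq.RCVTIMEO, 5000)  # fail instead of hanging forever
    puller.bind(endpoint)
    msg = puller.recv()

    pusher.close(linger=0)
    puller.close()
    ctx.term()
    return msg
```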

ZeroMQ message encapsulation

One final aspect of ZeroMQ is that it encapsulates communication into messages that may be composed of multiple parts. Rather than asking ZeroMQ to receive a certain number of bytes from the socket, you ask ZeroMQ to receive a single message. You can also send and receive multipart messages using the zmq.SNDMORE flag and the zmq.RCVMORE socket option. To send a multipart message, pass zmq.SNDMORE as the second argument to send() for every part except the last:

sock.send(part1, zmq.SNDMORE)
sock.send(part2, zmq.SNDMORE)
sock.send(part3, zmq.SNDMORE)
sock.send(final)

 
The receiving side can then ask if there's more to receive:

more = True
parts = []
while more:
    parts.append(sock.recv())
    more = sock.getsockopt(zmq.RCVMORE) 
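Here is a self-contained version of the two fragments above, assuming Python 3 and pyzmq and using in-process PAIR sockets. It sends a three-part message with zmq.SNDMORE and reads it back with a zmq.RCVMORE loop. It then does the same with pyzmq's send_multipart()/recv_multipart() convenience methods, which wrap those two loops.

```python
import zmq


def multipart_roundtrip(parts):
    ctx = zmq.Context()
    a = ctx.socket(zmq.PAIR)
    a.bind('inproc://multipart-demo')  # endpoint name chosen for illustration
    b = ctx.socket(zmq.PAIR)
    b.connect('inproc://multipart-demo')

    # Manual form: zmq.SNDMORE on every part except the last.
    for part in parts[:-1]:
        b.send(part, zmq.SNDMORE)
    b.send(parts[-1])

    # Manual receive loop: keep reading while RCVMORE says more parts follow.
    received = [a.recv()]
    while a.getsockopt(zmq.RCVMORE):
        received.append(a.recv())

    # pyzmq's convenience wrappers do the same thing in one call each.
    b.send_multipart(parts)
    received_again = a.recv_multipart()

    a.close()
    b.close()
    ctx.term()
    return received, received_again
```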


ZeroMQ communication patterns

A core concept of ZeroMQ that I've alluded to above but not made explicit is its set of communication patterns. Because of whiz-bang features such as asynchronous communication and disconnected operation, it's necessary to apply higher-level patterns than just shoving bytes from one endpoint to another. ZeroMQ implements this by making you specify a socket_type when you call zmq.Context.socket(). Each socket type has a set of "compatible" socket types with which it can communicate, and ZeroMQ won't let messages flow between incompatible sockets (a zmq.REQ socket can't talk to a zmq.PUB socket, for example). Here, I'll describe some of the basic patterns:

ZeroMQ request/reply pattern

This pattern is fairly classic; one end (with socket_type=zmq.REQ) sends a request and receives a response. The other end (with socket_type=zmq.REP) receives a request and sends a response. A simple echo server might use this pattern. The server would be the following:

import sys
import zmq

context = zmq.Context()
sock = context.socket(zmq.REP)
sock.bind(sys.argv[1])

while True:
    message = sock.recv()
    sock.send(b'Echoing: ' + message)


Your client then looks like this:

import sys
import zmq
context = zmq.Context()

sock = context.socket(zmq.REQ)
sock.connect(sys.argv[1])
sock.send(' '.join(sys.argv[2:]).encode('utf-8'))
print(sock.recv())


Note that in this pattern the zmq.REQ socket must communicate with a series of send(), recv() pairs, and the zmq.REP socket must communicate with a series of recv(), send() pairs. If you try to send or recv two messages in a row, ZeroMQ will raise an exception. This can cause problems if you have a server that crashes, for instance, because you'd leave your client in a "dangling send" state. To recover, you need some other mechanism for timing out requests, closing the socket, and retrying with a new, fresh zmq.REQ socket.
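The sketch below covers both points, assuming Python 3 and pyzmq. lockstep_violation() shows that a second send() on a REQ socket fails with errno zmq.EFSM. request_with_retry() implements the recovery just described: poll for the reply with a timeout, and on timeout close the stuck socket (linger=0 discards the pending request) and retry with a fresh one. This is similar in spirit to the "Lazy Pirate" pattern from the ZeroMQ guide. The function names, defaults, and the free_tcp_port() helper are hypothetical.

```python
import socket

import zmq


def free_tcp_port():
    # Hypothetical helper: ask the OS for an unused localhost port.
    s = socket.socket()
    s.bind(('127.0.0.1', 0))
    port = s.getsockname()[1]
    s.close()
    return port


def lockstep_violation(ctx, endpoint):
    """Return the errno ZeroMQ reports for two send() calls in a row on REQ."""
    req = ctx.socket(zmq.REQ)
    req.connect(endpoint)  # no server needed; the first request is just queued
    req.send(b'first')
    try:
        req.send(b'second')  # REQ must alternate send() and recv()
    except zmq.ZMQError as exc:
        return exc.errno  # expected to be zmq.EFSM
    finally:
        req.close(linger=0)
    return None


def request_with_retry(ctx, endpoint, payload, timeout_ms=1000, retries=3):
    """Send a request, waiting at most timeout_ms for each reply.

    On a timeout the REQ socket is stuck in its "dangling send" state, so it
    is closed (discarding the pending request) and a fresh socket is tried.
    Returns the reply, or None if every attempt timed out.
    """
    for _ in range(retries):
        req = ctx.socket(zmq.REQ)
        req.connect(endpoint)
        req.send(payload)
        if req.poll(timeout_ms, zmq.POLLIN):
            reply = req.recv()
            req.close()
            return reply
        req.close(linger=0)
    return None
```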

ZeroMQ publish/subscribe pattern

In the publish/subscribe pattern, you have a single socket of type zmq.PUB and zero or more connected zmq.SUB sockets. The zmq.PUB socket broadcasts messages using send() that the zmq.SUB sockets recv(). Each subscriber must explicitly say what messages it's interested in using the setsockopt() method; until it subscribes to something, a zmq.SUB socket receives nothing. A subscription is a byte string specifying a prefix of messages the subscriber is interested in. Thus to subscribe to all messages, the subscriber would use the call sub_sock.setsockopt(zmq.SUBSCRIBE, b''). Subscribers can also explicitly unsubscribe from a topic using setsockopt(zmq.UNSUBSCRIBE, ...).
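Here is a sketch of prefix filtering, assuming Python 3 and pyzmq. A subscriber that subscribes to b'weather' should only ever see messages whose bytes start with that prefix, even though the publisher also sends b'sports...' messages. A newly connected subscriber can miss the first few messages while its subscription propagates (the guide calls this the "slow joiner" problem), so the sketch republishes until something arrives. The topic strings are made up for illustration.

```python
import zmq


def filtered_messages():
    ctx = zmq.Context()
    pub = ctx.socket(zmq.PUB)
    pub.bind('inproc://topics-demo')  # endpoint name chosen for illustration
    sub = ctx.socket(zmq.SUB)
    sub.connect('inproc://topics-demo')
    sub.setsockopt(zmq.SUBSCRIBE, b'weather')  # prefix match on message bytes

    received = []
    for _ in range(50):
        # Publish both topics; only the matching one should reach the subscriber.
        pub.send(b'sports: home team wins')
        pub.send(b'weather: sunny')
        while sub.poll(50):  # drain whatever arrived within 50 ms
            received.append(sub.recv())
        if received:
            break

    pub.close()
    sub.close()
    ctx.term()
    return received
```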

One interesting aspect of the zmq.SUB sockets is that they can connect to multiple endpoints, so that they receive messages from all the publishers. For example, suppose you have a server periodically sending messages:

import sys
import time
import zmq

context = zmq.Context()
sock = context.socket(zmq.PUB)
sock.bind(sys.argv[1])

while True:
    time.sleep(1)
    sock.send((sys.argv[1] + ':' + time.ctime()).encode('utf-8'))


You could have a client connect to multiple servers with the following code:

import sys
import zmq

context = zmq.Context()
sock = context.socket(zmq.SUB)
sock.setsockopt(zmq.SUBSCRIBE, b'')

for arg in sys.argv[1:]:
    sock.connect(arg)

while True:
    message = sock.recv()
    print(message)


To see the multi-subscribe in action, you can start these programs as follows:

$ python publisher.py 'tcp://*:8080' & python publisher.py 'tcp://*:8081' &
$ python subscriber.py tcp://localhost:8080 tcp://localhost:8081 
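For a single-process version of the multi-subscribe demo, here is a sketch assuming Python 3 and pyzmq. Two publishers bind to separate in-process endpoints, and one subscriber connects to both and records which endpoint each message came from. As in the filtering sketch, it republishes until both sources have been heard, because early messages can be dropped while subscriptions propagate. The endpoint names are illustrative.

```python
import zmq


def collect_from_two_publishers():
    ctx = zmq.Context()
    endpoints = ['inproc://pub-a', 'inproc://pub-b']  # illustrative names

    # Two independent publishers, each bound to its own endpoint.
    pubs = []
    for endpoint in endpoints:
        pub = ctx.socket(zmq.PUB)
        pub.bind(endpoint)
        pubs.append(pub)

    # One subscriber connected to both of them.
    sub = ctx.socket(zmq.SUB)
    sub.setsockopt(zmq.SUBSCRIBE, b'')
    for endpoint in endpoints:
        sub.connect(endpoint)

    seen = set()
    for _ in range(50):
        for endpoint, pub in zip(endpoints, pubs):
            pub.send(endpoint.encode() + b':tick')
        while sub.poll(50):  # drain whatever arrived within 50 ms
            seen.add(sub.recv().rsplit(b':', 1)[0])
        if len(seen) == len(endpoints):
            break

    for pub in pubs:
        pub.close()
    sub.close()
    ctx.term()
    return seen
```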


ZeroMQ push/pull pattern

Similar to the pub/sub pattern, in the push/pull pattern you have one side (the zmq.PUSH socket) doing all the sending, and the other side (zmq.PULL) doing all the receiving. The difference between push/pull and pub/sub is that in push/pull each message is routed to a single zmq.PULL socket, whereas in pub/sub each message is broadcast to all the zmq.SUB sockets. The push/pull pattern is useful for pipelined workloads where a worker process performs some operations and then sends results along for further processing. It's also useful for implementing traditional message queues.

We can see the routing of messages by connecting multiple clients to a single server. For this example, we can just change our socket type in the publisher code to be of type zmq.PUSH:

import sys
import time
import zmq

context = zmq.Context()
sock = context.socket(zmq.PUSH)
sock.bind(sys.argv[1])

while True:
    time.sleep(1)
    sock.send((sys.argv[1] + ':' + time.ctime()).encode('utf-8'))


Our client is likewise similar to the subscriber code:

import sys
import zmq

context = zmq.Context()
sock = context.socket(zmq.PULL)

for arg in sys.argv[1:]:
    sock.connect(arg)

while True:
    message = sock.recv()
    print(message)


(Note that we can do the same multi-connect trick we did with the pub/sub, as well.) Now to see the multi-push, multi-pull, we can start two "pushers" and two "pullers":

$ # Start the pushers in one window
$ python pusher.py 'tcp://*:8080' & python pusher.py 'tcp://*:8081' &
$ # Start a puller in another window
$ python puller.py tcp://localhost:8080 tcp://localhost:8081
$ # Start another puller in a third window
$ python puller.py tcp://localhost:8080 tcp://localhost:8081
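To check the "each message goes to exactly one puller" property without juggling terminal windows, here is a sketch assuming Python 3 and pyzmq. One PUSH socket and two PULL sockets share an in-process endpoint, and the sketch records which puller received each of ten messages. A send timeout (zmq.SNDTIMEO) makes it fail rather than hang if no puller is connected. The function name and parameters are hypothetical.

```python
import zmq


def distribute_work(n=10, workers=2):
    ctx = zmq.Context()
    push = ctx.socket(zmq.PUSH)
    push.bind('inproc://work-demo')  # illustrative endpoint name
    push.setsockopt(zmq.SNDTIMEO, 1000)  # raise zmq.Again rather than block forever

    pulls = [ctx.socket(zmq.PULL) for _ in range(workers)]
    for pull in pulls:
        pull.connect('inproc://work-demo')

    # Each message goes to exactly one connected PULL socket.
    for i in range(n):
        push.send(str(i).encode())

    poller = zmq.Poller()
    for pull in pulls:
        poller.register(pull, zmq.POLLIN)

    results = [[] for _ in pulls]
    remaining = n
    while remaining:
        ready = dict(poller.poll(1000))
        if not ready:
            break  # nothing more arrived within a second
        for idx, pull in enumerate(pulls):
            if pull in ready:
                results[idx].append(pull.recv())
                remaining -= 1

    push.close(linger=0)
    for pull in pulls:
        pull.close()
    ctx.term()
    return results


if __name__ == '__main__':
    for idx, got in enumerate(distribute_work()):
        print('puller %d got %r' % (idx, got))
```

With both pullers connected before the first send you would typically see the messages split between them round-robin. The exact split can depend on timing, so it is safer to rely only on every message being delivered to exactly one puller.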


Conclusion

ZeroMQ provides a handy abstraction for several network communication patterns that we can use quite easily from Python. If you're thinking of building a high-performance distributed system, it's certainly worth checking out ZeroMQ as a possible transport layer. Here, I've barely scratched the surface of what's possible with ZeroMQ in Python. In future posts, I'll go a bit deeper, covering topics including:

  • flow control with ZeroMQ
  • advanced communication patterns and devices
  • using ZeroMQ with gevent

I'd love to hear how you're using (or are thinking of using) ZeroMQ for building Python applications. In particular, are there any questions you have about ZeroMQ that I might be able to answer in successive posts? Are you using ZeroMQ already, and if so, have you run into any issues? Tell me about it in the comments below!

Published at DZone with permission of its author, Rick Copeland.
