Just show me the code!
Sure, here you go: PyZMQ Paranoid Pirate
Considering ZeroMQ for your application
I sometimes get the opportunity to work with ZeroMQ professionally. ZeroMQ is a very powerful and versatile tool, which is great, but comes with the tradeoff of needing additional development work and testing to ensure your application code is using it correctly. If you're considering using ZeroMQ, you should first ask yourself if one of it's competing technologies will work. If you have sufficient hardware and can use an external piece of hardware or a cloud service to run something like RabbitMQ, ActiveMQ, or MQTT, I generally recommend you do it. These technologies are going to just be easier to develop against, and you'll save a good chunk of development time using them instead. That being said, if your use case involves distributed networking without a dedicated central hub, or specifically requires a very quick messaging rate, pull in ZeroMQ.
If you're reading this, you should already have some knowledge on ZeroMQ works and it's multiple forms of communication. For this article, we're going to focus on the DEALER/ROUTER pattern. The official documentation for ZeroMQ is better than what I can write, so reference these articles for more information on what the DEALER/ROUTER pattern is and how it works:
The key thing you'll need to take away for this article is to know that DEALER/ROUTER ZeroMQ sockets allow for reliable bi-directional communication. If your use case doesn't need bidirectional communication, you can use a simpler ZeroMQ pattern-Publish/Subscribe or Request/Reply. These work as advertised, but I often find that typical use cases demand an initial handshake before sending information, and to be alerted if their target fails for some reason. The ZeroMQ developers recognized this need long ago, and describe a solution with DEALER/ROUTER called the Paranoid Pirate
That link is it will describe the primary benefit of the paranoid pirate-keeping a heartbeat in addition to client/worker communications gives you a ZMQ-style way of letting clients and workers introduce themselves to each other, and an established way for these partners to halt/resume communications. You can even get a core Python code sample.
The bad news is that while the ZeroMQ documentation is readable and does a great job describing the ZeroMQ protocol, but doesn't feature many application level examples-there are some snippets that show you how to correctly set up communications, but larger examples are left to higher-level libraries that wrap around ZeroMQ. These higher level libraries like PyZMQ or zmqpp in turn explain their own usage well, but don't really provide an example implementation of any of the higher level patterns like the paranoid pirate. So, let's implement the paranoid pirate in PyZMQ.
A PyZMQ implementation of the ZeroMQ Paranoid Pirate pattern
We'll start with an intentionally simplified version of the Paranoid Pirate-the "full" implementation would include a single client and multiple workers, but to keeps things simple here we'll stick to just a single worker. Additionally, we'll make the example a bit easier to visualize by implementing an RPC-style interface-the client will issue a request to the worker, and the worker will check that request against a set of known procedures it has, and will either return a result or throw an exception. For the most part, the sample code is documented well, so to understand what's going on I'm mostly going to direct you to look at the code. Still, let's look at a few key parts and see what outcome we get when we run the sample.
Let's run it!
I've included a few scripts that should make running this code easier. First, take a look at run_demo_xero_in_docker_env.sh
#!/bin/bash set -eu set -o pipefail docker-compose build && docker-compose up --abort-on-container-exit \ codedrunk_xero_uniclient \ codedrunk_xero_uniworker
I've included some dockerfiles that will build the example and run some integration tests to make sure things are working correctly. If everything builds correctly, you should see an output like this:
Building codedrunk_xero_base Step 1/16 : FROM ubuntu:18.04 ---> ccc6e87d482b Step 2/16 : ENV DEBIAN_FRONTEND noninteractive ---> Using cache ---> 05583fe42b89 ... Step 16/16 : RUN (cd demo_xero && pip3 install .) ---> Using cache ---> f63b97161944 Successfully built f63b97161944 Successfully tagged stackhead/codedrunk_xero_base:latest Building codedrunk_xero_uniclient Step 1/2 : FROM stackhead/codedrunk_xero_base ---> f63b97161944 Step 2/2 : CMD ["demo_xerouniclient", "tcp://*:5550", "ping", "--count", "20"] ---> Running in 7d5aa7ad7cf5 Removing intermediate container 7d5aa7ad7cf5 ---> ba084294f665 Successfully built ba084294f665 Successfully tagged stackhead/codedrunk_xero_uniclient:latest Building codedrunk_xero_uniworker Step 1/2 : FROM stackhead/codedrunk_xero_base ---> f63b97161944 Step 2/2 : CMD ["demo_xerouniworker", "tcp://codedrunk_xero_uniclient:5550"] ---> Using cache ---> 3b83047e3d4b Successfully built 3b83047e3d4b Successfully tagged stackhead/codedrunk_xero_uniworker:latest Starting codedrunk_xero_uniworker ... done Recreating codedrunk_xero_uniclient ... done Attaching to codedrunk_xero_uniworker, codedrunk_xero_uniclient codedrunk_xero_uniworker | 03:49:43.094 asyncio DEBUG [MainThread ] Using selector: EpollSelector codedrunk_xero_uniworker | 03:49:43.095 asyncio DEBUG [MainThread ] Using selector: EpollSelector codedrunk_xero_uniclient | 03:49:43.162 asyncio DEBUG [MainThread ] Using selector: EpollSelector codedrunk_xero_uniclient | 03:49:43.162 asyncio DEBUG [MainThread ] Using selector: EpollSelector codedrunk_xero_uniclient | 03:49:43.280 xero.uni.uniclient INFO [uniclientthread] _register_worker codedrunk_xero_uniclient | 03:49:43.280 xero.uni.uniclient DEBUG [uniclientthread] worker.registerWorker for 'b'\x00k\x8bEg'' is connected. codedrunk_xero_uniclient | 03:49:43.281 demo_xero.demo_xerouniclient DEBUG [uniclientthread] partial msg: 'started' codedrunk_xero_uniclient | 03:49:43.282 demo_xero.demo_xerouniclient DEBUG [MainThread ] rpc reply: 'pong' ... codedrunk_xero_uniclient | 03:49:43.292 demo_xero.demo_xerouniclient DEBUG [uniclientthread] partial msg: 'started' codedrunk_xero_uniclient | 03:49:43.292 demo_xero.demo_xerouniclient DEBUG [MainThread ] rpc reply: 'pong' codedrunk_xero_uniclient | 03:49:43.293 demo_xero.demo_xerouniclient DEBUG [uniclientthread] partial msg: 'started' codedrunk_xero_uniclient | 03:49:43.293 demo_xero.demo_xerouniclient DEBUG [MainThread ] rpc reply: 'pong' codedrunk_xero_uniclient | Starting client codedrunk_xero_uniclient | waiting for worker codedrunk_xero_uniclient | Shutting down codedrunk_xero_uniclient exited with code 0 Aborting on container exit... Stopping codedrunk_xero_uniworker ... done
You should find the dockerfiles have installed the example python packages and are running the basic "ping" command as quickly as possible.
If you want to run the samples directly on your dev machine, install both xero and demo_xero via:
pip3 install -e ./xero pip3 install -e ./demo_xero
In one terminal, run the worker:
In a second terminal, run any of the commands in demo_xero/run_examples.sh:
#!/bin/bash set -eu set -o pipefail # Simplest example-call RPC with no arguments and get single string as a reply ./bin/demo_xerouniclient tcp://127.0.0.1:5550 ping # Similar, call RPC with no arguments and ensure the "reply" comes in, but no data returned ./bin/demo_xerouniclient tcp://127.0.0.1:5550 return_none # Call an RPC that just does an equality comparison. Simple example to demonstrate how to pass arguments to worker RPC ./bin/demo_xerouniclient tcp://127.0.0.1:5550 compare --args '"apple", "orange"' # Effectively the same thing, but showing how to use named arguments. ./bin/demo_xerouniclient tcp://127.0.0.1:5550 compare --kwargs '"str1":"apple", "str2":"orange"' # Call an RPC that doesn't actually exist, and make sure the worker is able to handle the problem cleanly while returning # the actual exception that is raised. ./bin/demo_xerouniclient tcp://127.0.0.1:5550 invalid_request # The slow_ functions are serviced by an "actor" model. slow_succeed will succeed after the provided timeout, # slow_fail will fail after the provided timeout. ./bin/demo_xerouniclient tcp://127.0.0.1:5550 slow_succeed --args 5 --timeout 20 ./bin/demo_xerouniclient tcp://127.0.0.1:5550 slow_fail --args 5 --timeout 20
A warning about PyZMQ
The PyZMQ project seems to be under pretty active development, which is great. However, it looks like they're working with a fairly challenging dependency environment as they try to incorporate/maintain compatibility with Facebook's Tornado module. I'd say they're doing rather well, but it's still easy to inadvertently create a system that seems to work reliable for tens of seconds, or even minutes, but will later stop dropping messages for seemingly no reason. This can be mitigated by making a particular effort to version match your environment, PyZMQ, and tornado once you have something that works reliably. The PyZMQ team also keeps a good listing of errata/upgrade headaches and their solutions listed in their documentation, so if you have reliability issues with PyZMQ I'd carefully read their explanation of PyZMQ and the event loop. Whenever I've had problems with this sample breaking, it is because the PyZMQ team had to make updates under the hood to manage the different async libraries they're compatible with, and the problem could be fixed by reading the changelog and updating accordingly.
Thanks for reading, this should give you some ideas on how to use PyZMQ with some other modern python libraries to do robust messaging. This sample hasn't been battle-hardened on a production system, but it does includes some tests that should keep it running on your dev machine. Here's a link to the full implementation again: PyZMQ Paranoid Pirate.