This post will show you the basic concepts of programming asynchronous applications with WebSphere MQ (‘MQ’ from now on) and Python. MQ, in case you didn’t know it, is a proprietary piece of software by IBM which lets you easily design and create distributed applications using the popular messaging patterns, queues and topics. If you’ve ever heard of RabbitMQ, ZeroMQ or similar projects, you’ll feel comfortable with MQ right away. MQ is much older, though, and proprietary, although there’s a 90-day trial version you can download from IBM’s site. I don’t know about you but I stick to the opinion that one should usually know the main open-source as well as closed applications in their domain, so if you’re into asynchronous programming in general, you’ll most likely find MQ an interesting subject to learn. Python isn’t (yet?) an officially IBM-blessed language for MQ programming but PyMQI, the Python bindings, has been around for about a decade and is extremely well field-tested, so that’s what we’ll be using.
Let’s say we have a CRM and a pair of surrounding applications, an IVR system and a self-service portal. When a customer account’s data changes we need to notify the other systems, and we’re to use MQ for the task. Note that in reality I’d likely put a middleware construct, such as an ESB, in the middle of it all to avoid tight coupling between the apps, but the post’s about MQ so let’s turn a blind eye to it for now. There are apps and queues: each client application, that is, one to be notified, has its dedicated queue on which it listens for messages from the CRM.
The figure below depicts the communication paths. Note that even though we can say that MQ notifies the IVR and the portal about the changes, it’s actually both apps that need to initiate a connection to MQ and listen for incoming messages, hence the arrows point towards MQ, not the other way around.
Each particular arrow represents a connection through a channel. A channel is sort of a link and a gate through which messages are sent. There’s one channel for each application, but a single channel may be reused for sending messages to more than one queue, as is the case with the CRM, which puts messages onto both client applications’ queues. It goes even further: a single channel may be used by more than one application, and that’s what the code below does. Not that it’s particularly safe or sound to do it, but it’s there in case that’s how you’d like things to be set up. How channels can be secured, with or without Python around, is a topic for one of the next posts.
For the sake of the post let’s say our environment is comprised of the following MQ objects:
- an MQ queue manager (that is, an MQ server) named QM01, running on host 192.168.1.139, listening for connections on port 1434,
- a channel for the CRM, called CRM.SVRCONN.1,
- a channel for the IVR, called IVR.SVRCONN.1,
- a channel for the portal, called PORTAL.SVRCONN.1,
- a queue for the IVR, called IVR.1,
- a queue for the portal, called PORTAL.1.
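If you like having such names in one place, they can be gathered into a small settings dictionary. This is only a sketch: the `MQ_ENV` name, its layout and the `build_conn_info` helper are made up for illustration and aren’t part of PyMQI. The one thing taken from the code in this post is the `host(port)` connection string format.

```python
# Hypothetical settings holder; the values mirror the list above.
MQ_ENV = {
    'queue_manager': 'QM01',
    'host': '192.168.1.139',
    'port': '1434',
    'channels': {
        'crm': 'CRM.SVRCONN.1',
        'ivr': 'IVR.SVRCONN.1',
        'portal': 'PORTAL.SVRCONN.1',
    },
    'queues': {
        'ivr': 'IVR.1',
        'portal': 'PORTAL.1',
    },
}


def build_conn_info(env):
    """Return the 'host(port)' string used when connecting as a client."""
    return '%s(%s)' % (env['host'], env['port'])


conn_info = build_conn_info(MQ_ENV)
print(conn_info)
```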
The first thing we’ll do is connect the CRM to the queue manager over a channel (a single shared one here, as mentioned above) and put some messages onto the client queues:
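```python
# stdlib
import time

# PyMQI
import pymqi

queue_manager = 'QM01'
# One channel shared by all the apps in this post; a dedicated
# channel such as CRM.SVRCONN.1 would be the safer choice.
channel = 'SVRCONN.1'
host = '192.168.1.139'
port = '1434'
message = 'New data!'
# One queue per client application to be notified.
queue_names = ['IVR.1', 'PORTAL.1']
conn_info = '%s(%s)' % (host, port)

qmgr = pymqi.QueueManager(None)
qmgr.connectTCPClient(queue_manager, pymqi.cd(), channel, conn_info)

queues = [pymqi.Queue(qmgr, name) for name in queue_names]

try:
    while True:
        # Put the same notification onto every client queue, once a second.
        for queue in queues:
            queue.put(message)
        time.sleep(1)
finally:
    # Close the queues first, then drop the connection.
    for queue in queues:
        queue.close()
    qmgr.disconnect()
```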
What we’ve just achieved is obtaining a connection to MQ through a TCP channel and adding a loop which makes sure messages are trickling into each of the client queues. Note the finally block which closes any objects used in the example. Always clean up after yourself.
OK, let’s add the receiving part now. The code is the same for both the IVR and the portal, so the name of the queue to get messages off is read from the command line.
```python
# stdlib
import sys, optparse  # (I know there's argparse but I'm stuck with 2.6)

# PyMQI
# (on newer PyMQI releases CMQC lives inside the package:
# from pymqi import CMQC)
import pymqi, CMQC

queue_manager = 'QM01'
channel = 'SVRCONN.1'
host = '192.168.1.139'
port = '1434'
conn_info = '%s(%s)' % (host, port)

# Read the queue name from the command line
parser = optparse.OptionParser()
parser.add_option("-q", dest="queue", help="Queue to get the messages off")

(options, args) = parser.parse_args()

if not options.queue:
    print("Forgot to specify the queue?")
    parser.print_help()
    sys.exit(1)

qmgr = pymqi.QueueManager(None)
qmgr.connectTCPClient(queue_manager, pymqi.cd(), channel, conn_info)

queue = pymqi.Queue(qmgr, options.queue)

# Message Descriptor
md = pymqi.md()

# Get Message Options
gmo = pymqi.gmo()
gmo.Options = CMQC.MQGMO_WAIT | CMQC.MQGMO_FAIL_IF_QUIESCING
gmo.WaitInterval = 5000  # 5 seconds

try:
    while True:
        try:
            # Wait up to gmo.WaitInterval for a new message.
            message = queue.get(None, md, gmo)
            print(message)

            # Reset the MsgId, CorrelId & GroupId so that we can reuse
            # the same 'md' object again.
            md.MsgId = CMQC.MQMI_NONE
            md.CorrelId = CMQC.MQCI_NONE
            md.GroupId = CMQC.MQGI_NONE

        except pymqi.MQMIError as e:
            if e.comp == CMQC.MQCC_FAILED and e.reason == CMQC.MQRC_NO_MSG_AVAILABLE:
                # No messages, that's OK, we can ignore it.
                pass
            else:
                # Some other error condition.
                raise
finally:
    queue.close()
    qmgr.disconnect()
```
I now feel the urge to mention that the client code above could’ve been written in a shorter, and much sloppier, way but I instead opted for showing you the correct MQ programming patterns, the ones that let the code run at a speed close to that of a C MQ program while needing a minimal amount of RAM. That’s why the code may seem a little bit verbose but then again, even though I’m about as lazy as most Python programmers are, I don’t really have any problems with writing 3 lines of code instead of a single one as long as the API doesn’t feel too contrived and baroque.
So what’s new in the client code anyway?
A Message Descriptor (MD) and Get Message Options (GMO) are two of the structures PyMQI uses to pass the relevant options to the MQ layers below. That’s how you program with PyMQI, and with MQ generally speaking: there are few concepts and verbs, like a queue, putting a message onto a queue or getting one off it, but there are literally hundreds of customization options it can all be wrapped with. This particular GMO says that we don’t want to bail out immediately if there aren’t any messages on the queue (MQGMO_WAIT) and that we’re good MQ citizens who, when the queue manager is shutting down, will disconnect from it instead of leaving dangling half-open connections that prevent it from shutting down (MQGMO_FAIL_IF_QUIESCING); believe me, MQ admins will start loving you for using that option. Then there’s gmo.WaitInterval which says how many milliseconds we intend to wait for a message before giving up and raising an exception as an indication that there has been none (for the curious ones, PyMQI releases the GIL while waiting for messages or indeed when calling any MQ API function).
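To see how the two options end up in a single gmo.Options value, here’s a tiny sketch that runs without MQ installed. The two constants are stand-ins carrying the values I believe IBM’s headers define for them; in real code take them from CMQC rather than hard-coding anything. The `describe` helper is made up for illustration.

```python
# Stand-in values, assumed to match IBM's headers; real code should use CMQC.
MQGMO_WAIT = 0x00000001
MQGMO_FAIL_IF_QUIESCING = 0x00002000

# Options are bit flags, so they are combined with a bitwise OR.
options = MQGMO_WAIT | MQGMO_FAIL_IF_QUIESCING


def describe(options):
    """Hypothetical helper: list which of the two flags are set."""
    names = []
    if options & MQGMO_WAIT:
        names.append('MQGMO_WAIT')
    if options & MQGMO_FAIL_IF_QUIESCING:
        names.append('MQGMO_FAIL_IF_QUIESCING')
    return names


print(describe(options))
```

Because each option occupies its own bit, adding or dropping one never disturbs the others, which is why the API can offer so many of them through a single integer field.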
Note there’s an inner try/except block now. What it does is catch any MQ exceptions and check whether the exception raised is merely telling us that there have been no messages on the queue, because that’s how it’s signalled to the layers above, by raising an exception. The exception object, ‘e’, has two interesting properties: one is a completion code, ‘.comp’, and the other is a reason code, ‘.reason’. Basically, the completion code says whether there was an error, a warning condition or whether everything went fine. If there indeed was an error or a warning, the reason code is set to some meaningful value, such as MQRC_NO_MSG_AVAILABLE above, which tells us that no messages were found on the queue during that interval; if so, we may safely ignore the exception and try getting a message again. In this example any other error condition is re-raised so that we know something went wrong. All the constants, and their meaning, can be looked up in IBM’s documentation; PyMQI simply passes them on from MQ to our code.
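The same decision can be played out without a queue manager at hand. In the sketch below `FakeMQMIError` and `FakeQueue` are hypothetical stand-ins for pymqi.MQMIError and pymqi.Queue, the `drain` helper is made up, and the numeric constants are values I believe IBM’s headers define; real code should read them from CMQC.

```python
# Stand-in values, assumed to match IBM's headers; real code should use CMQC.
MQCC_FAILED = 2
MQRC_NO_MSG_AVAILABLE = 2033
MQRC_CONNECTION_BROKEN = 2009


class FakeMQMIError(Exception):
    """Hypothetical stand-in for pymqi.MQMIError with .comp and .reason."""
    def __init__(self, comp, reason):
        Exception.__init__(self, 'comp=%s, reason=%s' % (comp, reason))
        self.comp = comp
        self.reason = reason


class FakeQueue(object):
    """Hypothetical queue that replays scripted results, one per get()."""
    def __init__(self, script):
        self.script = list(script)

    def get(self):
        item = self.script.pop(0)
        if isinstance(item, Exception):
            raise item
        return item


def drain(queue, attempts):
    """Call get() 'attempts' times, skipping 'no message' timeouts."""
    received = []
    for _ in range(attempts):
        try:
            received.append(queue.get())
        except FakeMQMIError as e:
            if e.comp == MQCC_FAILED and e.reason == MQRC_NO_MSG_AVAILABLE:
                continue  # Timed out with nothing to read; try again.
            raise  # Anything else is a genuine problem.
    return received


no_msg = FakeMQMIError(MQCC_FAILED, MQRC_NO_MSG_AVAILABLE)
queue = FakeQueue(['first', no_msg, 'second'])
print(drain(queue, 3))
```

A timeout in the middle of the script is swallowed, while any other reason code, a broken connection for instance, would still propagate to the caller.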
As for resetting the IDs (MsgId, CorrelId and GroupId), the thing is that most options are passed to PyMQI and MQ by reference. In this context it means that some of the MD object’s attributes will be overwritten by MQ, and it just so happens that:
- on a successful call to a .get, MQ sets .MsgId, .CorrelId and .GroupId values to those found in the message read,
- we’re using the same MD object over and over in the loop,
- if a call to a .get specifies an MD whose .MsgId, .CorrelId and .GroupId attributes are set to anything but their equivalent of None (MQMI_NONE, MQCI_NONE and MQGI_NONE respectively), MQ will be trying to get a message from a queue but only if that message’s attributes have values matching those set in the MD.
See where this takes us? Had we not been resetting the IDs, the first successful call to .get would’ve set the MD’s IDs for good and any subsequent call would’ve been trying to get new messages by those IDs, which (unless you’re trying hard, but what for?) is guaranteed never to succeed, because IDs are unique by default.
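Here’s that trap reproduced in a toy model that needs no MQ at all. `FakeMD` and `FakeQueue` are hypothetical stand-ins which only imitate the match-by-MsgId behaviour described above, `read_all` is a made-up helper, and `MQMI_NONE` is assumed to be 24 null bytes like its CMQC counterpart.

```python
# Assumed equivalent of CMQC.MQMI_NONE: 24 null bytes.
MQMI_NONE = b'\x00' * 24


class FakeMD(object):
    """Hypothetical message descriptor holding only a MsgId."""
    def __init__(self):
        self.MsgId = MQMI_NONE


class FakeQueue(object):
    """Hypothetical queue that mimics MQ's match-by-MsgId behaviour."""
    def __init__(self, messages):
        # Each message is a (msg_id, body) pair.
        self.messages = list(messages)

    def get(self, md):
        for index, (msg_id, body) in enumerate(self.messages):
            if md.MsgId in (MQMI_NONE, msg_id):
                del self.messages[index]
                md.MsgId = msg_id  # MQ writes the ID back into the MD.
                return body
        return None  # Real MQ would report MQRC_NO_MSG_AVAILABLE instead.


def read_all(queue, reset):
    """Try to read two messages with one MD, optionally resetting its ID."""
    md = FakeMD()
    bodies = []
    for _ in range(2):
        bodies.append(queue.get(md))
        if reset:
            md.MsgId = MQMI_NONE
    return bodies


messages = [(b'A' * 24, 'one'), (b'B' * 24, 'two')]
print(read_all(FakeQueue(messages), reset=True))
print(read_all(FakeQueue(messages), reset=False))
```

With the reset in place both messages come back; without it the second read finds nothing, because the MD still carries the first message’s ID.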
So that would be it as far as an introduction goes. I’m not even pretending to have covered everything about PyMQI; I’ve only mentioned a tiny little bit of what’s available to Python programmers. And there’s more: there’s Jython, Spring, Spring Python, JMS programming, MQ administration (PCF and MQAI), publish/subscribe and even more. Be sure to leave a comment or drop me an e-mail if there’s anything in particular you’d like to see covered!