Saturday, May 3, 2014

code: Multiprocessing producer/consumer with logging

Wordclouds (and multiple processes) are fun

Let's say you're writing a browser toy that displays what people say in their important #beer tweets. On the server you want to scan Twitter for #beer and store the tweets. On occasion, a browser will fetch the list of words, then display the most recent related words from those tweets as a word cloud.

To reduce complexity you don't want to add any extra packages, which might be untested and/or sketchy. What do you do with a standard "batteries included" Python? You use multiprocessing!

The multiprocessing module lets you write programs as a system of connected processes. In this case, one is a producer: it does work, then pushes information to a list of tweets shared across the system. Another process is a consumer: it waits for data from the producer, then processes it for display in a browser as a pretty word cloud.
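Here is a minimal sketch of that producer/consumer split. It is not the code from this post: it swaps in a multiprocessing.Queue instead of the shared list used further down, and the SENTINEL marker, the sample tweets, and the function signatures are all made up for illustration.

```python
import multiprocessing
from collections import Counter

SENTINEL = None  # hypothetical end-of-stream marker, not part of any API


def producer(queue, tweets):
    """Push each tweet onto the queue, then signal that no more are coming."""
    for tweet in tweets:
        queue.put(tweet)
    queue.put(SENTINEL)


def consumer(queue, results):
    """Count words from queued tweets until the sentinel arrives."""
    counts = Counter()
    while True:
        tweet = queue.get()  # blocks until the producer sends something
        if tweet is SENTINEL:
            break
        counts.update(tweet.lower().split())
    results.put(dict(counts))


def main():
    tweets = ['cold #beer tonight', 'warm #beer never']  # made-up sample data
    queue = multiprocessing.Queue()
    results = multiprocessing.Queue()
    workers = [
        multiprocessing.Process(target=producer, args=(queue, tweets)),
        multiprocessing.Process(target=consumer, args=(queue, results)),
    ]
    for worker in workers:
        worker.start()
    print(results.get())  # read the result before joining the workers
    for worker in workers:
        worker.join()


if __name__ == '__main__':
    main()
```

A queue hands each item to exactly one consumer, which suits a pipeline. The shared list in the full listing below instead lets the scanner look at the latest ten items whenever it likes.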

Server programming: log early and often

Without writing asynchronous code it's hard to do a lot of I/O in a single Python process. By splitting up your project into multiple tasks, each with its own process, each task can run on a separate CPU in parallel. The multiprocessing module helps us start and stop processes, and communicate data back and forth.

In app programming, debugging is interactive; "print" statements are one way of testing the code.  On a server, this doesn't work as well.  It's best to have logging statements and lots of them to make sure the system works, and to diagnose errors.  Each logging entry has a timestamp, a severity, and a message.  By reviewing ERROR and WARNING entries one can verify the system works, and can diagnose it if it's behaving strangely, even if issues are rare.

It's better to have too much logging than not enough. Your Operations people don't understand your code.  If they see overall system problems it's easier for them to sift out the irrelevant logging messages, rather than add more logging into a complex system.

Log early, log often -- your operations people, and future you, will thank you for it.
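To make the "timestamp, severity, message" idea concrete, here is a small sketch using the standard logging module. The helper names make_logger and problems, and the logger name 'wordcloud', are invented for this example; the substring filter in problems is a rough stand-in for whatever log search tool your operations people really use.

```python
import io
import logging


def make_logger(stream, name='wordcloud'):
    """Return a logger that writes timestamp, severity, and message to stream."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    logger.propagate = False  # keep entries out of the root logger
    handler = logging.StreamHandler(stream)
    handler.setFormatter(
        logging.Formatter('%(asctime)s %(levelname)s %(message)s'))
    logger.handlers = [handler]  # replace any handlers from earlier calls
    return logger


def problems(log_text):
    """Return only the WARNING and ERROR lines from captured log text."""
    return [line for line in log_text.splitlines()
            if ' WARNING ' in line or ' ERROR ' in line]


if __name__ == '__main__':
    buf = io.StringIO()
    log = make_logger(buf)
    log.info('fetched %d tweets', 42)
    log.warning('rate limit is close')
    log.error('fetch failed: %s', 'timeout')
    print('\n'.join(problems(buf.getvalue())))
```

The INFO entry is still recorded; it is just easy to sift out later, which is the point of logging generously.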


This post was inspired by Playing with REALTIME data, Python and D3 by Brett Dangerfield. His code actually scans Twitter and does the word cloud display.


If you're even curious about Python, run, don't walk, to get Python Cookbook by David Beazley and Brian K. Jones. I've been programming in Python for 15 years and learn new tools and techniques from every chapter!


In modern Python 3, take a look at concurrent.futures, which offers a more graceful way to run work in a pool of processes or threads.
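As a rough idea of what that can look like, here is a sketch that counts words across tweets with a process pool. It is one possible shape, not a port of the listing below: count_words, word_cloud_counts, and the sample tweets are made up for illustration.

```python
from collections import Counter
from concurrent.futures import ProcessPoolExecutor


def count_words(tweet):
    """Count the words in a single tweet."""
    return Counter(tweet.lower().split())


def word_cloud_counts(tweets, executor):
    """Fan the tweets out to the executor and merge the per-tweet counts."""
    total = Counter()
    for counts in executor.map(count_words, tweets):
        total.update(counts)
    return total


if __name__ == '__main__':
    sample = ['cold #beer tonight', 'warm #beer never']  # made-up sample data
    with ProcessPoolExecutor(max_workers=2) as pool:
        print(word_cloud_counts(sample, pool).most_common(3))
```

Because word_cloud_counts takes the executor as an argument, you can pass a ThreadPoolExecutor instead when the work is mostly I/O.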


Code: mptest_proxy



#!/usr/bin/env python

'''
mptest_proxy.py -- producer adds to fixed-sized list; scanner uses them

OPTIONS:
-v  verbose multiprocessing output
'''

import logging, multiprocessing, sys, time


def producer(objlist):
    '''
    add an item to list every second; ensure fixed size list
    '''
    logger = multiprocessing.get_logger()
    logger.info('start')
    while True:
        try:
            time.sleep(1)
        except KeyboardInterrupt:
            return
        msg = 'ding: {:04d}'.format(int(time.time()) % 10000)
        logger.info('put: %s', msg)
        del objlist[0]
        objlist.append( msg )


def scanner(objlist):
    '''
    every now and then, run calculation on objlist
    '''
    logger = multiprocessing.get_logger()
    logger.info('start')
    while True:
        try:
            time.sleep(5)
        except KeyboardInterrupt:
            return
        logger.info('items: %s', list(objlist))
            

def main():
    opt_verbose = '-v' in sys.argv[1:] 
    logger = multiprocessing.log_to_stderr(
            level=logging.DEBUG if opt_verbose else logging.INFO,
    )
    logger.info('setup')

    # create fixed-length list, shared between producer & consumer
    manager = multiprocessing.Manager()
    my_objlist = manager.list( # pylint: disable=E1101
        [None] * 10
    )

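    # keep a reference to each worker so main() can wait on it
    workers = [
        multiprocessing.Process(
            target=producer,
            args=(my_objlist,),
            name='producer',
        ),
        multiprocessing.Process(
            target=scanner,
            args=(my_objlist,),
            name='scanner',
        ),
    ]
    for worker in workers:
        worker.start()

    logger.info('running forever')
    try:
        # manager.join() would wait on the manager's server process,
        # not on the workers, so join each worker explicitly
        for worker in workers:
            worker.join() # wait until both workers die
    except KeyboardInterrupt:
        pass
    logger.info('done')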
    

if __name__=='__main__':
    main()
