Inspired by: adnans
Output:
chromium-browse 19664 kB
chromium-browse 6128 kB
avahi-daemon 320 kB
avahi-daemon 220 kB
colord 644 kB
smbd 840 kB
This blog is for technical and personal posts. Across various jobs over the years I've written hundreds of articles, and now it's time to share them with everyone!
$ strace -o z -e trace=open lftp -c 'open jta'
Password:
cd: Login failed: 530 Login incorrect.
$ egrep ook z
open("/home/johnm/.local/share/lftp/bookmarks", O_RDONLY) = 4
#!/usr/bin/env python
'''
pversions.py -- search for package version from PyPi
'''
# adapted from pip.commands.SearchCommand
import sys, xmlrpclib
pnames = sys.argv[1:]
if not pnames:
sys.exit('Usage: pversions (packagename)...')
pypi = xmlrpclib.ServerProxy('https://pypi.python.org/pypi')
for packagename in (pname.lower() for pname in pnames):
print packagename,':'
exact_hits = (
hit for hit in pypi.search({'name': packagename})
if hit['name'].lower() == packagename
)
print ', '.join( (hit['version'] for hit in exact_hits) )
$$
variable), then use pgrep to find all sub-processes of the script. That is, pgrep
tells us all the workflow process IDs. If there are any, then we use the corresponding pkill command to squash them before a restart.#!/usr/bin/env python ''' mptest_proxy.py -- producer adds to fixed-sized list; scanner uses them OPTIONS: -v verbose multiprocessing output ''' import logging, multiprocessing, sys, time def producer(objlist): ''' add an item to list every 2 sec; ensure fixed size list ''' logger = multiprocessing.get_logger() logger.info('start') while True: try: time.sleep(1) except KeyboardInterrupt: return msg = 'ding: {:04d}'.format(int(time.time()) % 10000) logger.info('put: %s', msg) del objlist[0] objlist.append( msg ) def scanner(objlist): ''' every now and then, run calculation on objlist ''' logger = multiprocessing.get_logger() logger.info('start') while True: try: time.sleep(5) except KeyboardInterrupt: return logger.info('items: %s', list(objlist)) def main(): opt_verbose = '-v' in sys.argv[1:] logger = multiprocessing.log_to_stderr( level=logging.DEBUG if opt_verbose else logging.INFO, ) logger.info('setup') # create fixed-length list, shared between producer & consumer manager = multiprocessing.Manager() my_objlist = manager.list( # pylint: disable=E1101 [None] * 10 ) multiprocessing.Process( target=producer, args=(my_objlist,), name='producer', ).start() multiprocessing.Process( target=scanner, args=(my_objlist,), name='scanner', ).start() logger.info('running forever') try: manager.join() # wait until both workers die except KeyboardInterrupt: pass logger.info('done') if __name__=='__main__': main()