Thursday, March 17, 2011

Python multiprocessing utility

This article talks about a simple python command line utility that lets you run your existing programs in parallel, feeding them input and collecting their output to utilize the power of a multi-core processor.


Problem:
We have an Windows executable that reads some input (from stdin), performs a calculation based on this input and prints some output (to stdout). Since the computation for different inputs is independent, it makes sense to run this executable in multiple processes, each one processing their own input file and then in the end, we append all their results (note that you should include the input data in the output, to be able to reconstruct the "Question to the answer" ;-).

Solution:
Here you can find a python script that does the process management in a bit more clever way for you.
It takes 2 command line parameters, the input file and the output file. It then reads the input file line by line, assuming that each line holds single instance of the input parameters you want to feed to your executable (you can easily tweak this input behavior, just check the code and/or ask in comments).
The script is keeping track of the running processes (number of which you can set in the class constructor) and as soon as some process finishes, it takes the next input instance and spawns a new process, that gets the input instance to its stdin. The stdout of the process is mapped to a temporary file. The scripts creates the same number of temp. files as there are processes and it circulates the file handles in a way that prevents from 2 processing writing in a single file at a single moment.
Eventually the content of temp. files is collected in the desired output file and the temp. files are deleted.

What you need to know:
  • The process spawning is rather slow (couple of milliseconds), so you should make sure the individual computations take at least a couple of seconds to make the spawning time negligible.
  • Each instance is calculated in a separate process, so memory leaks should not cause a problem (process releases memory to OS once it finishes).
  • Include the input description in the output of the executable, so that you can map outputs to inputs (in other words, the output file doesn't contain the solutions in the same order as the inputs in the input file).
Let me know if you find this useful ;-)

import ctypes, subprocess, sys, time, os

class Manu:
  def __init__(self,app,numwork):
    self.SYNCHRONIZE=0x00100000
    self.INFINITE = -1
    self.numwork = numwork
    self.workers={}
    self.app=app

#######################################    
############# Timer stuff #############

  def start_timer(self):
    self.t=time.time()
  def get_time(self):
    return time.time()-self.t

#######################################    
############# File stuff  #############
  
  def init_files(self,out_file):
    self.out_file=out_file    
    self.free_files=set()
    self.taken_files={}
    for i in range(self.numwork):
      self.free_files.add(open("_proces_file"+str(i)+".tmp","w+"))    

  def free_file(self,proc_handle):
    self.free_files.add(self.taken_files[proc_handle])
    del self.taken_files[proc_handle]

  def close_files(self):
    for f in self.free_files:
      f.seek(0)
      s=f.read(1048576)
      while s!="":
        self.out_file.write(s)
        s=f.read(1048576)                
      f.close()

    self.out_file.close()
    for i in range(self.numwork):
      os.remove("_proces_file"+str(i)+".tmp")    
      

#######################################    
############# Process stuff ###########
  def print_status(self,solved_items):
    avg_yet=(self.get_time()/solved_items)
    print >> sys.stderr, "Solved item: %3d Time: %5ds End in: %5ds Exp. total: %5ds "%(solved_items,self.get_time(),(len(data)-solved_items)*avg_yet,len(data)*avg_yet)


  def work(self,data,out_file):
    print >> sys.stderr, "Work started for",len(data),"items"
    self.init_files(out_file)
    self.start_timer()
    solved_items=0

    for i,dat in enumerate(data):
      if len(self.workers) == self.numwork:
        self.get_finished()
        solved_items+=1
        self.print_status(solved_items)

      self.my_spawn(dat)
      
    while len(self.workers)>0:
      self.get_finished()
      solved_items+=1
      self.print_status(solved_items)
      
    
    print >> sys.stderr, "Finished. Total time:",self.get_time(),"Avg. time:",self.get_time()/len(data)
    self.close_files()


  def my_spawn(self,data):
    f=self.free_files.pop()
    p = subprocess.Popen([self.app], stdout=f, stdin=subprocess.PIPE, shell=False)    
    h = ctypes.windll.kernel32.OpenProcess(self.SYNCHRONIZE, False, p.pid)      
    p.stdin.write(data+"\n")

    self.taken_files[h]=f    
    self.workers[h]=p


  def get_finished(self):
    arrtype = ctypes.c_long * len(self.workers)
    handle_array = arrtype(*self.workers.keys())
    ret = ctypes.windll.kernel32.WaitForMultipleObjects(len(self.workers), handle_array, False, self.INFINITE)
    h = handle_array[ret]

    ctypes.windll.kernel32.CloseHandle(h)
    self.free_file(h)
    del self.workers[h]


#Example usage:
in_file=sys.argv[1]
data=[ x.strip() for x in open(in_file).readlines()]
out_file=sys.argv[2]
a=Manu(r"wrapper.exe",8)
a.work(data,open(out_file,"w"))

No comments:

Post a Comment