Thursday, October 25, 2012

Python Event Listener - multiprocessing deamon

I wanted to have a file change listener such that if new data is added to a file I could run a code.
Following this post, I've written a modification so now the listener is working in multiprocessing.



#!/usr/bin/python
# A code to stream file as it grows
# by Ran Novitsky Nof Oct 25, 2012

import os,time
from multiprocessing import Process,Pipe

# change this file name
FileName = 'stream.txt'

# adapted from http://www.valuedlessons.com/2008/04/events-in-python.html


# the Event class take care of adding, removing and executing the functions
class Event:
    def __init__(self):
        self.handlers = set()

    def handle(self, handler):
        self.handlers.add(handler)
        return self

    def unhandle(self, handler):
        try:
            self.handlers.remove(handler)
        except:
            raise ValueError("Handler is not handling this event, so cannot unhandle it.")
        return self

    def fire(self, *args, **kargs):
        for handler in self.handlers:
            handler(*args, **kargs)

    def getHandlerCount(self):
        return len(self.handlers)

    __iadd__ = handle
    __isub__ = unhandle
    __call__ = fire
    __len__  = getHandlerCount
 
# the stream class takes care of the
# function to fire in case of an event
class Stream:
  def __init__(self,fname):
    self.fname = fname
    self.fd = open(fname,'r')
    self.get_stream()
  def get_stream(self):
    # this is the actual function to fire
    print self.fd.read(),
  def restart(self):
    self.fd.seek(0)
 
# the MockFileWatcher creats the multiprocessing deamon to alert on file change
class MockFileWatcher:
    def __init__(self,source_path,sleeptime=0.1):
        self.fileChanged = Event()
        self.source_path = source_path
        self.running=False
        self.deamon=False
        self.fd = None
        self.sleep = sleeptime
        # the next two lines can be set outside the class
        self.stream = Stream(source_path)
        self.fileChanged += self.stream.get_stream # add event handler
    def watchFiles(self):
        # open the file in non-blocking read mode
        self.fd = os.open(self.source_path,os.O_RDONLY | os.O_NONBLOCK)
        # go to the end
        os.lseek(self.fd,0,2)
        self.running=True
        self.deamon=False
        while self.running:
          # try to read
          if os.read(self.fd,1)!='':
            # New data was added
            os.lseek(self.fd,0,2) # go to the end of file
            self.fileChanged() # fire the event
          # see if the parent process has a message
          if self.chiled_conn.poll():
            # execute the message
            eval(self.chiled_conn.recv())
          # sleep for a while
          time.sleep(self.sleep)
    def watchFilesDeamon(self):
        # start the deamon
        self.parent_conn,self.chiled_conn = Pipe()
        p = Process(target=self.watchFiles)
        p.start()
        self.p = p  
        self.deamon=True
        return p
    def stop(self):
        # stop deamon/process
        if self.deamon:
          self.parent_conn.send('self.stop()')
          print "Hold while stopping child process..."
          # hold depends on the sleeping time
          if self.parent_conn.recv():
            pass
            #print "chiled is stopped"
          self.p.terminate() # terminate multiprocessing
          self.deamon=False
        # stop process
        self.running=False
        self.fd=None
        # if this is the process make sure parent know we are done
        if not self.deamon:  self.chiled_conn.send(True)      
        return True

if __name__=='__main__':
  watcher = MockFileWatcher(FileName)
  p = watcher.watchFilesDeamon()
  print 'from this point the listner is running at the background'
  print 'add more code here so you can see how it works.'

Try this by changing the file name at the top and run it. then add new lines to the file.

No comments:

Post a Comment

Please Comment this Post or send me an Email