diff --git a/mitterlib/ui/ui_pygtk.py b/mitterlib/ui/ui_pygtk.py index 1624e2f..d44a34f 100644 --- a/mitterlib/ui/ui_pygtk.py +++ b/mitterlib/ui/ui_pygtk.py @@ -23,6 +23,8 @@ import gobject gobject.threads_init() import logging +import threading +import Queue from mitterlib.ui.helpers.image_helpers import find_image @@ -35,6 +37,107 @@ from mitterlib.ui.helpers.image_helpers import find_image _log = logging.getLogger('ui.pygtk') +# ---------------------------------------------------------------------- +# Threading related objects. +# These classes are based on the code available at http://gist.github.com/51686 +# (c) 2008, John Stowers +# ---------------------------------------------------------------------- + +class _IdleObject(gobject.GObject): + """ + Override gobject.GObject to always emit signals in the main thread + by emmitting on an idle handler + """ + def __init__(self): + gobject.GObject.__init__(self) + + def emit(self, *args): + gobject.idle_add(gobject.GObject.emit,self,*args) + + +class _WorkerThread(threading.Thread, _IdleObject): + """ + A single working thread. + """ + __gsignals__ = { + "completed": ( + gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, [ + gobject.TYPE_PYOBJECT]), # The list/NetworkData object + "exception": ( + gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, [ + gobject.TYPE_PYOBJECT]) # The exception + } + + def __init__(self, queue): + threading.Thread.__init__(self) + _IdleObject.__init__(self) + self._queue = queue + + def run(self): + while True: + _log.debug('Thread %d waiting for work', self.get_ident()) + work = self._queue.get() + if work is None: + # End the thread + break + + # call the function + _log.debug('Thread %s got work', self.get_ident()) + (complete, exception, function, args, kwargs) = work + + _log.debug('Calling %s', str(function)) + + try: + result = function(*args, **kwargs) + except Exception, exc: # Catch ALL exceptions + # XXX: Check if this catch all warnins too! + _log.debug('Exception %s', str(exc)) + self.emit("exception", exc) + + self.emit("completed", result) + + # work complete + self._queue.task_done() + + _log.debug('Thread %d ending', self.get_ident()) + return + + +class _ThreadManager(object): + """Manages the threads.""" + + def __init__(self, pool_size=2): + """Start the thread pool. The number of threads in the pool is defined + by `pool_size`, defaults to 2.""" + self._queue = Queue.Queue() + self._thread_pool = [] + + while pool_size > 0: + _log.debug('Starting thread %d', pool_size) + new_thread = _WorkerThread(self._queue) + new_thread.start() + self._thread_pool.append(new_thread) + pool_size -= 1 + + _log.debug('All threads started') + return + + def add_work(self, complete_cb, exception_cb, func, *args, **kwargs): + """Add a work to the thread list. `complete_cb` is the function to be + called with the result of the work. `exception_cb` is the function to + be called if there are any exceptions raised. Note that, once the + work is complete, one of those will be called, not both. `func` is the + function to be called in the secondary threads. `args` and `kwargs` + are parameters passed to the function.""" + + # wrap all the params in a nice tuple to be added to the queue. + work = (complete_cb, exception_cb, func, args, kwargs) + self._queue.put(work) + return + +# ---------------------------------------------------------------------- +# Mitter interface object +# ---------------------------------------------------------------------- class Interface(object): """Linux/GTK interface for Mitter."""