#!/usr/bin/env python # -*- coding: utf-8 -*- # ---------------------------------------------------------------------- # Threading related objects. # These classes are based on the code available at http://gist.github.com/51686 # (c) 2008, John Stowers # ---------------------------------------------------------------------- import gobject import threading import logging _log = logging.getLogger('helper.gtk_threads') # ---------------------------------------------------------------------- # Helper class for GObject # ---------------------------------------------------------------------- class _IdleObject(gobject.GObject): """ Override gobject.GObject to always emit signals in the main thread by emmitting on an idle handler """ def __init__(self): gobject.GObject.__init__(self) def emit(self, *args): gobject.idle_add(gobject.GObject.emit, self, *args) # ---------------------------------------------------------------------- # A (worker) thread; does all the hard work. # ---------------------------------------------------------------------- class _WorkerThread(threading.Thread, _IdleObject): """ A single working thread. """ __gsignals__ = { "completed": ( gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_PYOBJECT, )), # list/networkdata "exception": ( gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_PYOBJECT, ))} # The exception def __init__(self, function, *args, **kwargs): threading.Thread.__init__(self) _IdleObject.__init__(self) self._function = function self._args = args self._kwargs = kwargs def run(self): # call the function _log.debug('Thread id %s calling %s()', self.getName(), str(self._function.__name__)) args = self._args kwargs = self._kwargs try: result = self._function(*args, **kwargs) except Exception, exc: # Catch ALL exceptions # TODO: Check if this catch all warnins too! _log.debug('Exception inside thread %s:\n%s', self.getName(), str(exc)) self.emit("exception", exc) return _log.debug('Thread id %s completed', self.getName()) self.emit("completed", result) return # ---------------------------------------------------------------------- # The Thread manager class # ---------------------------------------------------------------------- class ThreadManager(object): """Manages the threads.""" def __init__(self, max_threads=2): """Start the thread pool. The number of threads in the pool is defined by `pool_size`, defaults to 2.""" self._max_threads = max_threads self._thread_pool = [] self._running = [] self._thread_id = 0 return def _remove_thread(self, widget, arg=None): """Called when the thread completes. We remove it from the thread list (dictionary, actually) and start the next thread (if there is one).""" # not actually a widget. It's the object that emitted the signal, in # this case, the _WorkerThread object. thread_id = widget.getName() self._running.remove(thread_id) _log.debug('Thread id %s completed, %d threads in the queue, ' \ '%d still running', thread_id, len(self._thread_pool), len(self._running)) if self._thread_pool: if len(self._running) < self._max_threads: next = self._thread_pool.pop() _log.debug('Dequeuing thread %s', next.getName()) self._running.append(next.getName()) next.start() return def add_work(self, complete_cb, exception_cb, func, *args, **kwargs): """Add a work to the thread list. `complete_cb` is the function to be called with the result of the work. `exception_cb` is the function to be called if there are any exceptions raised. Note that, once the work is complete, one of those will be called, not both. `func` is the function to be called in the secondary threads. `args` and `kwargs` are parameters passed to the function.""" thread = _WorkerThread(func, *args, **kwargs) thread_id = '%s-%s' % (self._thread_id, func.__name__) thread.connect('completed', complete_cb) thread.connect('completed', self._remove_thread) thread.connect('exception', exception_cb) thread.connect('exception', self._remove_thread) thread.setName(thread_id) if len(self._running) < self._max_threads: # immediatelly start the thread self._running.append(thread_id) thread.start() else: # add the thread to the queue running_names = ', '.join(self._running) _log.debug('Threads %s running, adding %s to the queue', running_names, thread_id) self._thread_pool.append(thread) self._thread_id += 1 return def clear(self): """Clear the thread pool list. This will cause the manager to stop working after the threads finish.""" self._thread_pool = [] self._running = [] return