123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132 |
- """
- Thread pool job processor with variable number of worker threads (between max/min amount).
- Pyro - Python Remote Objects. Copyright by Irmen de Jong (irmen@razorvine.net).
- """
- from __future__ import with_statement
- import time
- import logging
- import threading
- from Pyro4.configuration import config
- log = logging.getLogger("Pyro4.threadpool")
- class PoolError(Exception):
- pass
- class NoFreeWorkersError(PoolError):
- pass
- class Worker(threading.Thread):
- def __init__(self, pool):
- super(Worker, self).__init__()
- self.daemon = True
- self.name = "Pyro-Worker-%d" % id(self)
- self.job_available = threading.Event()
- self.job = None
- self.pool = pool
- def process(self, job):
- self.job = job
- self.job_available.set()
- def run(self):
- while True:
- self.job_available.wait()
- self.job_available.clear()
- if self.job is None:
- break
- try:
- self.job()
- except Exception as x:
- log.exception("unhandled exception from job in worker thread %s: %s", self.name, x)
- self.job = None
- self.pool.notify_done(self)
- self.pool = None
- class Pool(object):
- """
- A job processing pool that is using a pool of worker threads.
- The amount of worker threads in the pool is configurable and scales between min/max size.
- """
- def __init__(self):
- if config.THREADPOOL_SIZE < 1 or config.THREADPOOL_SIZE_MIN < 1:
- raise ValueError("threadpool sizes must be greater than zero")
- if config.THREADPOOL_SIZE_MIN > config.THREADPOOL_SIZE:
- raise ValueError("minimum threadpool size must be less than or equal to max size")
- self.idle = set()
- self.busy = set()
- self.closed = False
- for _ in range(config.THREADPOOL_SIZE_MIN):
- worker = Worker(self)
- self.idle.add(worker)
- worker.start()
- log.debug("worker pool created with initial size %d", self.num_workers())
- self.count_lock = threading.Lock()
- def __enter__(self):
- return self
- def __exit__(self, exc_type, exc_val, exc_tb):
- self.close()
- def close(self):
- if not self.closed:
- log.debug("closing down")
- for w in list(self.busy):
- w.process(None)
- for w in list(self.idle):
- w.process(None)
- self.closed = True
- time.sleep(0.1)
- idle, self.idle = self.idle, set()
- busy, self.busy = self.busy, set()
- # check if the threads that are joined are not the current thread,
- # otherwise Python 2.x crashes with "cannot join current thread".
- current_thread = threading.current_thread()
- while idle:
- p = idle.pop()
- if p is not current_thread:
- p.join(timeout=0.1)
- while busy:
- p = busy.pop()
- if p is not current_thread:
- p.join(timeout=0.1)
- def __repr__(self):
- return "<%s.%s at 0x%x; %d busy workers; %d idle workers>" % \
- (self.__class__.__module__, self.__class__.__name__, id(self), len(self.busy), len(self.idle))
- def num_workers(self):
- return len(self.busy) + len(self.idle)
- def process(self, job):
- if self.closed:
- raise PoolError("job queue is closed")
- if self.idle:
- worker = self.idle.pop()
- elif self.num_workers() < config.THREADPOOL_SIZE:
- worker = Worker(self)
- worker.start()
- else:
- raise NoFreeWorkersError("no free workers available, increase thread pool size")
- self.busy.add(worker)
- worker.process(job)
- log.debug("worker counts: %d busy, %d idle", len(self.busy), len(self.idle))
- def notify_done(self, worker):
- if worker in self.busy:
- self.busy.remove(worker)
- if self.closed:
- worker.process(None)
- return
- if len(self.idle) >= config.THREADPOOL_SIZE_MIN:
- worker.process(None)
- else:
- self.idle.add(worker)
- log.debug("worker counts: %d busy, %d idle", len(self.busy), len(self.idle))
|