threadpool.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. """
  2. Thread pool job processor with variable number of worker threads (between max/min amount).
  3. Pyro - Python Remote Objects. Copyright by Irmen de Jong (irmen@razorvine.net).
  4. """
  5. from __future__ import with_statement
  6. import time
  7. import logging
  8. import threading
  9. from Pyro4.configuration import config
  10. log = logging.getLogger("Pyro4.threadpool")
  11. class PoolError(Exception):
  12. pass
  13. class NoFreeWorkersError(PoolError):
  14. pass
  15. class Worker(threading.Thread):
  16. def __init__(self, pool):
  17. super(Worker, self).__init__()
  18. self.daemon = True
  19. self.name = "Pyro-Worker-%d" % id(self)
  20. self.job_available = threading.Event()
  21. self.job = None
  22. self.pool = pool
  23. def process(self, job):
  24. self.job = job
  25. self.job_available.set()
  26. def run(self):
  27. while True:
  28. self.job_available.wait()
  29. self.job_available.clear()
  30. if self.job is None:
  31. break
  32. try:
  33. self.job()
  34. except Exception as x:
  35. log.exception("unhandled exception from job in worker thread %s: %s", self.name, x)
  36. self.job = None
  37. self.pool.notify_done(self)
  38. self.pool = None
  39. class Pool(object):
  40. """
  41. A job processing pool that is using a pool of worker threads.
  42. The amount of worker threads in the pool is configurable and scales between min/max size.
  43. """
  44. def __init__(self):
  45. if config.THREADPOOL_SIZE < 1 or config.THREADPOOL_SIZE_MIN < 1:
  46. raise ValueError("threadpool sizes must be greater than zero")
  47. if config.THREADPOOL_SIZE_MIN > config.THREADPOOL_SIZE:
  48. raise ValueError("minimum threadpool size must be less than or equal to max size")
  49. self.idle = set()
  50. self.busy = set()
  51. self.closed = False
  52. for _ in range(config.THREADPOOL_SIZE_MIN):
  53. worker = Worker(self)
  54. self.idle.add(worker)
  55. worker.start()
  56. log.debug("worker pool created with initial size %d", self.num_workers())
  57. self.count_lock = threading.Lock()
  58. def __enter__(self):
  59. return self
  60. def __exit__(self, exc_type, exc_val, exc_tb):
  61. self.close()
  62. def close(self):
  63. if not self.closed:
  64. log.debug("closing down")
  65. for w in list(self.busy):
  66. w.process(None)
  67. for w in list(self.idle):
  68. w.process(None)
  69. self.closed = True
  70. time.sleep(0.1)
  71. idle, self.idle = self.idle, set()
  72. busy, self.busy = self.busy, set()
  73. # check if the threads that are joined are not the current thread,
  74. # otherwise Python 2.x crashes with "cannot join current thread".
  75. current_thread = threading.current_thread()
  76. while idle:
  77. p = idle.pop()
  78. if p is not current_thread:
  79. p.join(timeout=0.1)
  80. while busy:
  81. p = busy.pop()
  82. if p is not current_thread:
  83. p.join(timeout=0.1)
  84. def __repr__(self):
  85. return "<%s.%s at 0x%x; %d busy workers; %d idle workers>" % \
  86. (self.__class__.__module__, self.__class__.__name__, id(self), len(self.busy), len(self.idle))
  87. def num_workers(self):
  88. return len(self.busy) + len(self.idle)
  89. def process(self, job):
  90. if self.closed:
  91. raise PoolError("job queue is closed")
  92. if self.idle:
  93. worker = self.idle.pop()
  94. elif self.num_workers() < config.THREADPOOL_SIZE:
  95. worker = Worker(self)
  96. worker.start()
  97. else:
  98. raise NoFreeWorkersError("no free workers available, increase thread pool size")
  99. self.busy.add(worker)
  100. worker.process(job)
  101. log.debug("worker counts: %d busy, %d idle", len(self.busy), len(self.idle))
  102. def notify_done(self, worker):
  103. if worker in self.busy:
  104. self.busy.remove(worker)
  105. if self.closed:
  106. worker.process(None)
  107. return
  108. if len(self.idle) >= config.THREADPOOL_SIZE_MIN:
  109. worker.process(None)
  110. else:
  111. self.idle.add(worker)
  112. log.debug("worker counts: %d busy, %d idle", len(self.busy), len(self.idle))