threadpoolserver.py 9.8 KB


  1. """
  2. Socket server based on a worker thread pool. Doesn't use select.
  3. Uses a single worker thread per client connection.
  4. Pyro - Python Remote Objects. Copyright by Irmen de Jong (irmen@razorvine.net).
  5. """
  6. from __future__ import print_function
  7. import socket
  8. import logging
  9. import sys
  10. import time
  11. import threading
  12. import os
  13. from Pyro4 import socketutil, errors, util
  14. from Pyro4.configuration import config
  15. from .threadpool import Pool, NoFreeWorkersError
  16. from .multiplexserver import selectors
  17. log = logging.getLogger("Pyro4.threadpoolserver")
  18. _client_disconnect_lock = threading.Lock()
  19. class ClientConnectionJob(object):
  20. """
  21. Takes care of a single client connection and all requests
  22. that may arrive during its life span.
  23. """
  24. def __init__(self, clientSocket, clientAddr, daemon):
  25. self.csock = socketutil.SocketConnection(clientSocket)
  26. self.caddr = clientAddr
  27. self.daemon = daemon
  28. def __call__(self):
  29. if self.handleConnection():
  30. try:
  31. while True:
  32. try:
  33. self.daemon.handleRequest(self.csock)
  34. except (socket.error, errors.ConnectionClosedError):
  35. # client went away.
  36. log.debug("disconnected %s", self.caddr)
  37. break
  38. except errors.SecurityError:
  39. log.debug("security error on client %s", self.caddr)
  40. break
  41. except errors.TimeoutError as x:
  42. # for timeout errors we're not really interested in detailed traceback info
  43. log.warning("error during handleRequest: %s" % x)
  44. break
  45. except:
  46. # other errors log a warning, break this loop and close the client connection
  47. ex_t, ex_v, ex_tb = sys.exc_info()
  48. tb = util.formatTraceback(ex_t, ex_v, ex_tb)
  49. msg = "error during handleRequest: %s; %s" % (ex_v, "".join(tb))
  50. log.warning(msg)
  51. break
  52. finally:
  53. with _client_disconnect_lock:
  54. try:
  55. self.daemon._clientDisconnect(self.csock)
  56. except Exception as x:
  57. log.warning("Error in clientDisconnect: " + str(x))
  58. self.csock.close()
  59. def handleConnection(self):
  60. # connection handshake
  61. try:
  62. if self.daemon._handshake(self.csock):
  63. return True
  64. self.csock.close()
  65. except:
  66. ex_t, ex_v, ex_tb = sys.exc_info()
  67. tb = util.formatTraceback(ex_t, ex_v, ex_tb)
  68. log.warning("error during connect/handshake: %s; %s", ex_v, "\n".join(tb))
  69. self.csock.close()
  70. return False
  71. def denyConnection(self, reason):
  72. log.warning("client connection was denied: " + reason)
  73. # return failed handshake
  74. self.daemon._handshake(self.csock, denied_reason=reason)
  75. self.csock.close()
  76. class Housekeeper(threading.Thread):
  77. def __init__(self, daemon):
  78. super(Housekeeper, self).__init__(name="housekeeper")
  79. self.pyroDaemon = daemon
  80. self.stop = threading.Event()
  81. self.daemon = True
  82. self.waittime = min(config.POLLTIMEOUT or 0, max(config.COMMTIMEOUT or 0, 5))
  83. def run(self):
  84. while True:
  85. if self.stop.wait(self.waittime):
  86. break
  87. self.pyroDaemon._housekeeping()
  88. class SocketServer_Threadpool(object):
  89. """transport server for socket connections, worker thread pool version."""
  90. def __init__(self):
  91. self.daemon = self.sock = self._socketaddr = self.locationStr = self.pool = None
  92. self.shutting_down = False
  93. self.housekeeper = None
  94. self._selector = selectors.DefaultSelector() if selectors else None
  95. def init(self, daemon, host, port, unixsocket=None):
  96. log.info("starting thread pool socketserver")
  97. self.daemon = daemon
  98. self.sock = None
  99. bind_location = unixsocket if unixsocket else (host, port)
  100. if config.SSL:
  101. sslContext = socketutil.getSSLcontext(servercert=config.SSL_SERVERCERT,
  102. serverkey=config.SSL_SERVERKEY,
  103. keypassword=config.SSL_SERVERKEYPASSWD,
  104. cacerts=config.SSL_CACERTS)
  105. log.info("using SSL, cert=%s key=%s cacerts=%s", config.SSL_SERVERCERT, config.SSL_SERVERKEY, config.SSL_CACERTS)
  106. else:
  107. sslContext = None
  108. log.info("not using SSL")
  109. self.sock = socketutil.createSocket(bind=bind_location,
  110. reuseaddr=config.SOCK_REUSE,
  111. timeout=config.COMMTIMEOUT,
  112. noinherit=True,
  113. nodelay=config.SOCK_NODELAY,
  114. sslContext=sslContext)
  115. self._socketaddr = self.sock.getsockname()
  116. if not unixsocket and self._socketaddr[0].startswith("127."):
  117. if host is None or host.lower() != "localhost" and not host.startswith("127."):
  118. log.warning("weird DNS setup: %s resolves to localhost (127.x.x.x)", host)
  119. if unixsocket:
  120. self.locationStr = "./u:" + unixsocket
  121. else:
  122. host = host or self._socketaddr[0]
  123. port = port or self._socketaddr[1]
  124. if ":" in host: # ipv6
  125. self.locationStr = "[%s]:%d" % (host, port)
  126. else:
  127. self.locationStr = "%s:%d" % (host, port)
  128. self.pool = Pool()
  129. self.housekeeper = Housekeeper(daemon)
  130. self.housekeeper.start()
  131. if self._selector:
  132. self._selector.register(self.sock, selectors.EVENT_READ, self)
  133. def __del__(self):
  134. if self.sock is not None:
  135. self.sock.close()
  136. self.sock = None
  137. if self.pool is not None:
  138. self.pool.close()
  139. self.pool = None
  140. if self.housekeeper:
  141. self.housekeeper.stop.set()
  142. self.housekeeper.join()
  143. self.housekeeper = None
  144. def __repr__(self):
  145. return "<%s on %s; %d workers>" % (self.__class__.__name__, self.locationStr, self.pool.num_workers())
  146. def loop(self, loopCondition=lambda: True):
  147. log.debug("threadpool server requestloop")
  148. while (self.sock is not None) and not self.shutting_down and loopCondition():
  149. try:
  150. self.events([self.sock])
  151. except (socket.error, OSError) as x:
  152. if not loopCondition():
  153. # swallow the socket error if loop terminates anyway
  154. # this can occur if we are asked to shutdown, socket can be invalid then
  155. break
  156. # socket errors may not lead to a server abort, so we log it and continue
  157. err = getattr(x, "errno", x.args[0])
  158. log.warning("socket error '%s' with errno=%d, shouldn't happen", x, err)
  159. continue
  160. except KeyboardInterrupt:
  161. log.debug("stopping on break signal")
  162. break
  163. def combine_loop(self, server):
  164. raise TypeError("You can't use the loop combiner on the threadpool server type")
  165. def events(self, eventsockets):
  166. """used for external event loops: handle events that occur on one of the sockets of this server"""
  167. # we only react on events on our own server socket.
  168. # all other (client) sockets are owned by their individual threads.
  169. assert self.sock in eventsockets
  170. try:
  171. if self._selector:
  172. events = self._selector.select(config.POLLTIMEOUT)
  173. if not events:
  174. return
  175. csock, caddr = self.sock.accept()
  176. if self.shutting_down:
  177. csock.close()
  178. return
  179. if hasattr(csock, "getpeercert"):
  180. log.debug("connected %s - SSL", caddr)
  181. else:
  182. log.debug("connected %s - unencrypted", caddr)
  183. if config.COMMTIMEOUT:
  184. csock.settimeout(config.COMMTIMEOUT)
  185. job = ClientConnectionJob(csock, caddr, self.daemon)
  186. try:
  187. self.pool.process(job)
  188. except NoFreeWorkersError:
  189. job.denyConnection("no free workers, increase server threadpool size")
  190. except socket.timeout:
  191. pass # just continue the loop on a timeout on accept
  192. def shutdown(self):
  193. self.shutting_down = True
  194. self.wakeup()
  195. time.sleep(0.05)
  196. self.close()
  197. self.sock = None
  198. def close(self):
  199. if self.housekeeper:
  200. self.housekeeper.stop.set()
  201. self.housekeeper.join()
  202. self.housekeeper = None
  203. if self.sock:
  204. sockname = None
  205. try:
  206. sockname = self.sock.getsockname()
  207. except (socket.error, OSError):
  208. pass
  209. try:
  210. self.sock.close()
  211. if type(sockname) is str:
  212. # it was a Unix domain socket, remove it from the filesystem
  213. if os.path.exists(sockname):
  214. os.remove(sockname)
  215. except Exception:
  216. pass
  217. self.sock = None
  218. self.pool.close()
  219. @property
  220. def sockets(self):
  221. # the server socket is all we care about, all client sockets are running in their own threads
  222. return [self.sock]
  223. @property
  224. def selector(self):
  225. raise TypeError("threadpool server doesn't have multiplexing selector")
  226. def wakeup(self):
  227. socketutil.interruptSocket(self._socketaddr)