""" Socket server based on a worker thread pool. Doesn't use select. Uses a single worker thread per client connection. Pyro - Python Remote Objects. Copyright by Irmen de Jong (irmen@razorvine.net). """ from __future__ import print_function import socket import logging import sys import time import threading import os from Pyro4 import socketutil, errors, util from Pyro4.configuration import config from .threadpool import Pool, NoFreeWorkersError from .multiplexserver import selectors log = logging.getLogger("Pyro4.threadpoolserver") _client_disconnect_lock = threading.Lock() class ClientConnectionJob(object): """ Takes care of a single client connection and all requests that may arrive during its life span. """ def __init__(self, clientSocket, clientAddr, daemon): self.csock = socketutil.SocketConnection(clientSocket) self.caddr = clientAddr self.daemon = daemon def __call__(self): if self.handleConnection(): try: while True: try: self.daemon.handleRequest(self.csock) except (socket.error, errors.ConnectionClosedError): # client went away. log.debug("disconnected %s", self.caddr) break except errors.SecurityError: log.debug("security error on client %s", self.caddr) break except errors.TimeoutError as x: # for timeout errors we're not really interested in detailed traceback info log.warning("error during handleRequest: %s" % x) break except: # other errors log a warning, break this loop and close the client connection ex_t, ex_v, ex_tb = sys.exc_info() tb = util.formatTraceback(ex_t, ex_v, ex_tb) msg = "error during handleRequest: %s; %s" % (ex_v, "".join(tb)) log.warning(msg) break finally: with _client_disconnect_lock: try: self.daemon._clientDisconnect(self.csock) except Exception as x: log.warning("Error in clientDisconnect: " + str(x)) self.csock.close() def handleConnection(self): # connection handshake try: if self.daemon._handshake(self.csock): return True self.csock.close() except: ex_t, ex_v, ex_tb = sys.exc_info() tb = util.formatTraceback(ex_t, ex_v, ex_tb) log.warning("error during connect/handshake: %s; %s", ex_v, "\n".join(tb)) self.csock.close() return False def denyConnection(self, reason): log.warning("client connection was denied: " + reason) # return failed handshake self.daemon._handshake(self.csock, denied_reason=reason) self.csock.close() class Housekeeper(threading.Thread): def __init__(self, daemon): super(Housekeeper, self).__init__(name="housekeeper") self.pyroDaemon = daemon self.stop = threading.Event() self.daemon = True self.waittime = min(config.POLLTIMEOUT or 0, max(config.COMMTIMEOUT or 0, 5)) def run(self): while True: if self.stop.wait(self.waittime): break self.pyroDaemon._housekeeping() class SocketServer_Threadpool(object): """transport server for socket connections, worker thread pool version.""" def __init__(self): self.daemon = self.sock = self._socketaddr = self.locationStr = self.pool = None self.shutting_down = False self.housekeeper = None self._selector = selectors.DefaultSelector() if selectors else None def init(self, daemon, host, port, unixsocket=None): log.info("starting thread pool socketserver") self.daemon = daemon self.sock = None bind_location = unixsocket if unixsocket else (host, port) if config.SSL: sslContext = socketutil.getSSLcontext(servercert=config.SSL_SERVERCERT, serverkey=config.SSL_SERVERKEY, keypassword=config.SSL_SERVERKEYPASSWD, cacerts=config.SSL_CACERTS) log.info("using SSL, cert=%s key=%s cacerts=%s", config.SSL_SERVERCERT, config.SSL_SERVERKEY, config.SSL_CACERTS) else: sslContext = None log.info("not using SSL") self.sock = socketutil.createSocket(bind=bind_location, reuseaddr=config.SOCK_REUSE, timeout=config.COMMTIMEOUT, noinherit=True, nodelay=config.SOCK_NODELAY, sslContext=sslContext) self._socketaddr = self.sock.getsockname() if not unixsocket and self._socketaddr[0].startswith("127."): if host is None or host.lower() != "localhost" and not host.startswith("127."): log.warning("weird DNS setup: %s resolves to localhost (127.x.x.x)", host) if unixsocket: self.locationStr = "./u:" + unixsocket else: host = host or self._socketaddr[0] port = port or self._socketaddr[1] if ":" in host: # ipv6 self.locationStr = "[%s]:%d" % (host, port) else: self.locationStr = "%s:%d" % (host, port) self.pool = Pool() self.housekeeper = Housekeeper(daemon) self.housekeeper.start() if self._selector: self._selector.register(self.sock, selectors.EVENT_READ, self) def __del__(self): if self.sock is not None: self.sock.close() self.sock = None if self.pool is not None: self.pool.close() self.pool = None if self.housekeeper: self.housekeeper.stop.set() self.housekeeper.join() self.housekeeper = None def __repr__(self): return "<%s on %s; %d workers>" % (self.__class__.__name__, self.locationStr, self.pool.num_workers()) def loop(self, loopCondition=lambda: True): log.debug("threadpool server requestloop") while (self.sock is not None) and not self.shutting_down and loopCondition(): try: self.events([self.sock]) except (socket.error, OSError) as x: if not loopCondition(): # swallow the socket error if loop terminates anyway # this can occur if we are asked to shutdown, socket can be invalid then break # socket errors may not lead to a server abort, so we log it and continue err = getattr(x, "errno", x.args[0]) log.warning("socket error '%s' with errno=%d, shouldn't happen", x, err) continue except KeyboardInterrupt: log.debug("stopping on break signal") break def combine_loop(self, server): raise TypeError("You can't use the loop combiner on the threadpool server type") def events(self, eventsockets): """used for external event loops: handle events that occur on one of the sockets of this server""" # we only react on events on our own server socket. # all other (client) sockets are owned by their individual threads. assert self.sock in eventsockets try: if self._selector: events = self._selector.select(config.POLLTIMEOUT) if not events: return csock, caddr = self.sock.accept() if self.shutting_down: csock.close() return if hasattr(csock, "getpeercert"): log.debug("connected %s - SSL", caddr) else: log.debug("connected %s - unencrypted", caddr) if config.COMMTIMEOUT: csock.settimeout(config.COMMTIMEOUT) job = ClientConnectionJob(csock, caddr, self.daemon) try: self.pool.process(job) except NoFreeWorkersError: job.denyConnection("no free workers, increase server threadpool size") except socket.timeout: pass # just continue the loop on a timeout on accept def shutdown(self): self.shutting_down = True self.wakeup() time.sleep(0.05) self.close() self.sock = None def close(self): if self.housekeeper: self.housekeeper.stop.set() self.housekeeper.join() self.housekeeper = None if self.sock: sockname = None try: sockname = self.sock.getsockname() except (socket.error, OSError): pass try: self.sock.close() if type(sockname) is str: # it was a Unix domain socket, remove it from the filesystem if os.path.exists(sockname): os.remove(sockname) except Exception: pass self.sock = None self.pool.close() @property def sockets(self): # the server socket is all we care about, all client sockets are running in their own threads return [self.sock] @property def selector(self): raise TypeError("threadpool server doesn't have multiplexing selector") def wakeup(self): socketutil.interruptSocket(self._socketaddr)