multiplexserver.py 9.6 KB


  1. """
  2. Socket server based on socket multiplexing. Doesn't use threads.
  3. Uses the best available selector (kqueue, poll, select).
  4. Pyro - Python Remote Objects. Copyright by Irmen de Jong (irmen@razorvine.net).
  5. """
  6. from __future__ import print_function
  7. import socket
  8. import time
  9. import sys
  10. import logging
  11. import os
  12. from collections import defaultdict
  13. from Pyro4 import socketutil, errors, util
  14. from Pyro4.configuration import config
  15. if sys.version_info >= (3, 5):
  16. import selectors
  17. else:
  18. try:
  19. # first try selectors2 as it has better semantics when dealing with interrupted system calls
  20. import selectors2 as selectors
  21. except ImportError:
  22. if sys.version_info >= (3, 4):
  23. import selectors
  24. else:
  25. try:
  26. import selectors34 as selectors
  27. except ImportError:
  28. selectors = None
  29. log = logging.getLogger("Pyro4.multiplexserver")
  30. class SocketServer_Multiplex(object):
  31. """Multiplexed transport server for socket connections (uses select, poll, kqueue, ...)"""
  32. def __init__(self):
  33. self.sock = self.daemon = self.locationStr = None
  34. if selectors is None:
  35. raise RuntimeError("This Python installation doesn't have the 'selectors2' or 'selectors34' module installed, " +
  36. "which is required to use Pyro's multiplex server. Install it, or use the threadpool server instead.")
  37. self.selector = selectors.DefaultSelector()
  38. self.shutting_down = False
  39. def init(self, daemon, host, port, unixsocket=None):
  40. log.info("starting multiplexed socketserver")
  41. log.debug("selector implementation: %s.%s", self.selector.__class__.__module__, self.selector.__class__.__name__)
  42. self.sock = None
  43. bind_location = unixsocket if unixsocket else (host, port)
  44. if config.SSL:
  45. sslContext = socketutil.getSSLcontext(servercert=config.SSL_SERVERCERT,
  46. serverkey=config.SSL_SERVERKEY,
  47. keypassword=config.SSL_SERVERKEYPASSWD,
  48. cacerts=config.SSL_CACERTS)
  49. log.info("using SSL, cert=%s key=%s cacerts=%s", config.SSL_SERVERCERT, config.SSL_SERVERKEY, config.SSL_CACERTS)
  50. else:
  51. sslContext = None
  52. log.info("not using SSL")
  53. self.sock = socketutil.createSocket(bind=bind_location,
  54. reuseaddr=config.SOCK_REUSE,
  55. timeout=config.COMMTIMEOUT,
  56. noinherit=True,
  57. nodelay=config.SOCK_NODELAY,
  58. sslContext=sslContext)
  59. self.daemon = daemon
  60. self._socketaddr = sockaddr = self.sock.getsockname()
  61. if not unixsocket and sockaddr[0].startswith("127."):
  62. if host is None or host.lower() != "localhost" and not host.startswith("127."):
  63. log.warning("weird DNS setup: %s resolves to localhost (127.x.x.x)", host)
  64. if unixsocket:
  65. self.locationStr = "./u:" + unixsocket
  66. else:
  67. host = host or sockaddr[0]
  68. port = port or sockaddr[1]
  69. if ":" in host: # ipv6
  70. self.locationStr = "[%s]:%d" % (host, port)
  71. else:
  72. self.locationStr = "%s:%d" % (host, port)
  73. self.selector.register(self.sock, selectors.EVENT_READ, self)
  74. def __repr__(self):
  75. return "<%s on %s; %d connections>" % (self.__class__.__name__, self.locationStr, len(self.selector.get_map()) - 1)
  76. def __del__(self):
  77. if self.sock is not None:
  78. self.selector.close()
  79. self.sock.close()
  80. self.sock = None
  81. def events(self, eventsockets):
  82. """handle events that occur on one of the sockets of this server"""
  83. for s in eventsockets:
  84. if self.shutting_down:
  85. return
  86. if s is self.sock:
  87. # server socket, means new connection
  88. conn = self._handleConnection(self.sock)
  89. if conn:
  90. self.selector.register(conn, selectors.EVENT_READ, self)
  91. else:
  92. # must be client socket, means remote call
  93. active = self.handleRequest(s)
  94. if not active:
  95. try:
  96. self.daemon._clientDisconnect(s)
  97. except Exception as x:
  98. log.warning("Error in clientDisconnect: " + str(x))
  99. self.selector.unregister(s)
  100. s.close()
  101. self.daemon._housekeeping()
  102. def _handleConnection(self, sock):
  103. try:
  104. if sock is None:
  105. return
  106. csock, caddr = sock.accept()
  107. if hasattr(csock, "getpeercert"):
  108. log.debug("connected %s - SSL", caddr)
  109. else:
  110. log.debug("connected %s - unencrypted", caddr)
  111. if config.COMMTIMEOUT:
  112. csock.settimeout(config.COMMTIMEOUT)
  113. except (socket.error, OSError) as x:
  114. err = getattr(x, "errno", x.args[0])
  115. if err in socketutil.ERRNO_BADF or err in socketutil.ERRNO_ENOTSOCK:
  116. # our server socket got destroyed
  117. raise errors.ConnectionClosedError("server socket closed")
  118. # socket errors may not lead to a server abort, so we log it and continue
  119. err = getattr(x, "errno", x.args[0])
  120. log.warning("accept() failed '%s' with errno=%d, shouldn't happen", x, err)
  121. return None
  122. try:
  123. conn = socketutil.SocketConnection(csock)
  124. if self.daemon._handshake(conn):
  125. return conn
  126. conn.close()
  127. except: # catch all errors, otherwise the event loop could terminate
  128. ex_t, ex_v, ex_tb = sys.exc_info()
  129. tb = util.formatTraceback(ex_t, ex_v, ex_tb)
  130. log.warning("error during connect/handshake: %s; %s", ex_v, "\n".join(tb))
  131. try:
  132. csock.shutdown(socket.SHUT_RDWR)
  133. except (OSError, socket.error):
  134. pass
  135. csock.close()
  136. return None
  137. def shutdown(self):
  138. self.shutting_down = True
  139. self.wakeup()
  140. time.sleep(0.05)
  141. self.close()
  142. self.sock = None
  143. def close(self):
  144. self.selector.close()
  145. if self.sock:
  146. sockname = None
  147. try:
  148. sockname = self.sock.getsockname()
  149. except (socket.error, OSError):
  150. pass
  151. self.sock.close()
  152. if type(sockname) is str:
  153. # it was a Unix domain socket, remove it from the filesystem
  154. if os.path.exists(sockname):
  155. os.remove(sockname)
  156. self.sock = None
  157. @property
  158. def sockets(self):
  159. registrations = self.selector.get_map()
  160. if registrations:
  161. return [sk.fileobj for sk in registrations.values()]
  162. else:
  163. return []
  164. def wakeup(self):
  165. """bit of a hack to trigger a blocking server to get out of the loop, useful at clean shutdowns"""
  166. socketutil.interruptSocket(self._socketaddr)
  167. def handleRequest(self, conn):
  168. """Handles a single connection request event and returns if the connection is still active"""
  169. try:
  170. self.daemon.handleRequest(conn)
  171. return True
  172. except (socket.error, errors.ConnectionClosedError, errors.SecurityError):
  173. # client went away or caused a security error.
  174. # close the connection silently.
  175. try:
  176. peername = conn.sock.getpeername()
  177. log.debug("disconnected %s", peername)
  178. except socket.error:
  179. log.debug("disconnected a client")
  180. return False
  181. except errors.TimeoutError as x:
  182. # for timeout errors we're not really interested in detailed traceback info
  183. log.warning("error during handleRequest: %s" % x)
  184. return False
  185. except:
  186. # other error occurred, close the connection, but also log a warning
  187. ex_t, ex_v, ex_tb = sys.exc_info()
  188. tb = util.formatTraceback(ex_t, ex_v, ex_tb)
  189. msg = "error during handleRequest: %s; %s" % (ex_v, "".join(tb))
  190. log.warning(msg)
  191. return False
  192. def loop(self, loopCondition=lambda: True):
  193. log.debug("entering multiplexed requestloop")
  194. while loopCondition():
  195. try:
  196. try:
  197. events = self.selector.select(config.POLLTIMEOUT)
  198. except OSError:
  199. events = []
  200. # get all the socket connection objects that have a READ event
  201. # (the WRITE events are ignored here, they're registered to let timeouts work etc)
  202. events_per_server = defaultdict(list)
  203. for key, mask in events:
  204. if mask & selectors.EVENT_READ:
  205. events_per_server[key.data].append(key.fileobj)
  206. for server, fileobjs in events_per_server.items():
  207. server.events(fileobjs)
  208. if not events_per_server:
  209. self.daemon._housekeeping()
  210. except socket.timeout:
  211. pass # just continue the loop on a timeout
  212. except KeyboardInterrupt:
  213. log.debug("stopping on break signal")
  214. break
  215. def combine_loop(self, server):
  216. for sock in server.sockets:
  217. self.selector.register(sock, selectors.EVENT_READ, server)
  218. server.selector = self.selector