futures.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. """
  2. Support for Futures (asynchronously executed callables).
  3. If you're using Python 3.2 or newer, also see
  4. http://docs.python.org/3/library/concurrent.futures.html#future-objects
  5. Pyro - Python Remote Objects. Copyright by Irmen de Jong (irmen@razorvine.net).
  6. """
  7. import sys
  8. import functools
  9. import logging
  10. import threading
  11. import time
# Names exported by a star-import of this module.
__all__ = ["Future", "FutureResult", "_ExceptionWrapper"]
# Module-level logger, registered under the "Pyro4.futures" name.
log = logging.getLogger("Pyro4.futures")
  14. class Future(object):
  15. """
  16. Holds a callable that will be executed asynchronously and provide its
  17. result value some time in the future.
  18. This is a more general implementation than the AsyncRemoteMethod, which
  19. only works with Pyro proxies (and provides a bit different syntax).
  20. This class has a few extra features as well (delay, canceling).
  21. """
  22. def __init__(self, somecallable):
  23. self.callable = somecallable
  24. self.chain = []
  25. self.exceptionhandler = None
  26. self.call_delay = 0
  27. self.cancelled = False
  28. self.completed = False
  29. def __call__(self, *args, **kwargs):
  30. """
  31. Start the future call with the provided arguments.
  32. Control flow returns immediately, with a FutureResult object.
  33. """
  34. if self.completed or not hasattr(self, "chain"):
  35. raise RuntimeError("the future has already been evaluated")
  36. if self.cancelled:
  37. raise RuntimeError("the future has been cancelled")
  38. chain = self.chain
  39. del self.chain # make it impossible to add new calls to the chain once we started executing it
  40. result = FutureResult() # notice that the call chain doesn't sit on the result object
  41. thread = threading.Thread(target=self.__asynccall, args=(result, chain, args, kwargs))
  42. thread.setDaemon(True)
  43. thread.start()
  44. return result
  45. def __asynccall(self, asyncresult, chain, args, kwargs):
  46. while self.call_delay > 0 and not self.cancelled:
  47. delay = min(self.call_delay, 2)
  48. time.sleep(delay)
  49. self.call_delay -= delay
  50. if self.cancelled:
  51. self.completed = True
  52. asyncresult.set_cancelled()
  53. return
  54. try:
  55. self.completed = True
  56. self.cancelled = False
  57. value = self.callable(*args, **kwargs)
  58. # now walk the callchain, passing on the previous value as first argument
  59. for call, args, kwargs in chain:
  60. call = functools.partial(call, value)
  61. value = call(*args, **kwargs)
  62. asyncresult.value = value
  63. except Exception as x:
  64. if self.exceptionhandler:
  65. self.exceptionhandler(x)
  66. asyncresult.value = _ExceptionWrapper(x)
  67. def delay(self, seconds):
  68. """
  69. Delay the evaluation of the future for the given number of seconds.
  70. Return True if successful otherwise False if the future has already been evaluated.
  71. """
  72. if self.completed:
  73. return False
  74. self.call_delay = seconds
  75. return True
  76. def cancel(self):
  77. """
  78. Cancels the execution of the future altogether.
  79. If the execution hasn't been started yet, the cancellation is successful and returns True.
  80. Otherwise, it failed and returns False.
  81. """
  82. if self.completed:
  83. return False
  84. self.cancelled = True
  85. return True
  86. def then(self, call, *args, **kwargs):
  87. """
  88. Add a callable to the call chain, to be invoked when the results become available.
  89. The result of the current call will be used as the first argument for the next call.
  90. Optional extra arguments can be provided in args and kwargs.
  91. Returns self so you can easily chain then() calls.
  92. """
  93. self.chain.append((call, args, kwargs))
  94. return self
  95. def iferror(self, exceptionhandler):
  96. """
  97. Specify the exception handler to be invoked (with the exception object as only
  98. argument) when calculating the result raises an exception.
  99. If no exception handler is set, any exception raised in the asynchronous call will be silently ignored.
  100. Returns self so you can easily chain other calls.
  101. """
  102. self.exceptionhandler = exceptionhandler
  103. return self
  104. class FutureResult(object):
  105. """
  106. The result object for asynchronous Pyro calls.
  107. Unfortunatley it should be similar to the more general Future class but
  108. it is still somewhat limited (no delay, no canceling).
  109. """
  110. def __init__(self):
  111. self.__ready = threading.Event()
  112. self.callchain = []
  113. self.valueLock = threading.Lock()
  114. self.exceptionhandler = None
  115. def wait(self, timeout=None):
  116. """
  117. Wait for the result to become available, with optional timeout (in seconds).
  118. Returns True if the result is ready, or False if it still isn't ready.
  119. """
  120. result = self.__ready.wait(timeout)
  121. if result is None:
  122. # older pythons return None from wait()
  123. return self.__ready.isSet()
  124. return result
  125. @property
  126. def ready(self):
  127. """Boolean that contains the readiness of the asynchronous result"""
  128. return self.__ready.isSet()
  129. def get_value(self):
  130. self.__ready.wait()
  131. if isinstance(self.__value, _ExceptionWrapper):
  132. self.__value.raiseIt()
  133. else:
  134. return self.__value
  135. def set_value(self, value):
  136. with self.valueLock:
  137. self.__value = value
  138. # walk the call chain if the result is not an exception, otherwise invoke the errorhandler (if any)
  139. if isinstance(value, _ExceptionWrapper):
  140. if self.exceptionhandler:
  141. self.exceptionhandler(value.exception)
  142. else:
  143. for call, args, kwargs in self.callchain:
  144. call = functools.partial(call, self.__value)
  145. self.__value = call(*args, **kwargs)
  146. if isinstance(self.__value, _ExceptionWrapper):
  147. break
  148. self.callchain = []
  149. self.__ready.set()
  150. value = property(get_value, set_value, None, "The result value of the call. Reading it will block if not available yet.")
  151. def set_cancelled(self):
  152. self.set_value(_ExceptionWrapper(RuntimeError("future has been cancelled")))
  153. def then(self, call, *args, **kwargs):
  154. """
  155. Add a callable to the call chain, to be invoked when the results become available.
  156. The result of the current call will be used as the first argument for the next call.
  157. Optional extra arguments can be provided in args and kwargs.
  158. Returns self so you can easily chain then() calls.
  159. """
  160. with self.valueLock:
  161. if self.__ready.isSet():
  162. # value is already known, we need to process it immediately (can't use the call chain anymore)
  163. call = functools.partial(call, self.__value)
  164. self.__value = call(*args, **kwargs)
  165. else:
  166. # add the call to the call chain, it will be processed later when the result arrives
  167. self.callchain.append((call, args, kwargs))
  168. return self
  169. def iferror(self, exceptionhandler):
  170. """
  171. Specify the exception handler to be invoked (with the exception object as only
  172. argument) when asking for the result raises an exception.
  173. If no exception handler is set, any exception result will be silently ignored (unless
  174. you explicitly ask for the value). Returns self so you can easily chain other calls.
  175. """
  176. self.exceptionhandler = exceptionhandler
  177. return self
  178. class _ExceptionWrapper(object):
  179. """Class that wraps a remote exception. If this is returned, Pyro will
  180. re-throw the exception on the receiving side. Usually this is taken care of
  181. by a special response message flag, but in the case of batched calls this
  182. flag is useless and another mechanism was needed."""
  183. def __init__(self, exception):
  184. self.exception = exception
  185. def raiseIt(self):
  186. from Pyro4.util import fixIronPythonExceptionForPickle # XXX circular
  187. if sys.platform == "cli":
  188. fixIronPythonExceptionForPickle(self.exception, False)
  189. raise self.exception
  190. def __serialized_dict__(self):
  191. """serialized form as a dictionary"""
  192. from Pyro4.util import SerializerBase # XXX circular
  193. return {
  194. "__class__": "Pyro4.futures._ExceptionWrapper",
  195. "exception": SerializerBase.class_to_dict(self.exception)
  196. }