123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226 |
- """
- Support for Futures (asynchronously executed callables).
- If you're using Python 3.2 or newer, also see
- http://docs.python.org/3/library/concurrent.futures.html#future-objects
- Pyro - Python Remote Objects. Copyright by Irmen de Jong (irmen@razorvine.net).
- """
- import sys
- import functools
- import logging
- import threading
- import time
- __all__ = ["Future", "FutureResult", "_ExceptionWrapper"]
- log = logging.getLogger("Pyro4.futures")
- class Future(object):
- """
- Holds a callable that will be executed asynchronously and provide its
- result value some time in the future.
- This is a more general implementation than the AsyncRemoteMethod, which
- only works with Pyro proxies (and provides a bit different syntax).
- This class has a few extra features as well (delay, canceling).
- """
- def __init__(self, somecallable):
- self.callable = somecallable
- self.chain = []
- self.exceptionhandler = None
- self.call_delay = 0
- self.cancelled = False
- self.completed = False
- def __call__(self, *args, **kwargs):
- """
- Start the future call with the provided arguments.
- Control flow returns immediately, with a FutureResult object.
- """
- if self.completed or not hasattr(self, "chain"):
- raise RuntimeError("the future has already been evaluated")
- if self.cancelled:
- raise RuntimeError("the future has been cancelled")
- chain = self.chain
- del self.chain # make it impossible to add new calls to the chain once we started executing it
- result = FutureResult() # notice that the call chain doesn't sit on the result object
- thread = threading.Thread(target=self.__asynccall, args=(result, chain, args, kwargs))
- thread.setDaemon(True)
- thread.start()
- return result
- def __asynccall(self, asyncresult, chain, args, kwargs):
- while self.call_delay > 0 and not self.cancelled:
- delay = min(self.call_delay, 2)
- time.sleep(delay)
- self.call_delay -= delay
- if self.cancelled:
- self.completed = True
- asyncresult.set_cancelled()
- return
- try:
- self.completed = True
- self.cancelled = False
- value = self.callable(*args, **kwargs)
- # now walk the callchain, passing on the previous value as first argument
- for call, args, kwargs in chain:
- call = functools.partial(call, value)
- value = call(*args, **kwargs)
- asyncresult.value = value
- except Exception as x:
- if self.exceptionhandler:
- self.exceptionhandler(x)
- asyncresult.value = _ExceptionWrapper(x)
- def delay(self, seconds):
- """
- Delay the evaluation of the future for the given number of seconds.
- Return True if successful otherwise False if the future has already been evaluated.
- """
- if self.completed:
- return False
- self.call_delay = seconds
- return True
- def cancel(self):
- """
- Cancels the execution of the future altogether.
- If the execution hasn't been started yet, the cancellation is successful and returns True.
- Otherwise, it failed and returns False.
- """
- if self.completed:
- return False
- self.cancelled = True
- return True
- def then(self, call, *args, **kwargs):
- """
- Add a callable to the call chain, to be invoked when the results become available.
- The result of the current call will be used as the first argument for the next call.
- Optional extra arguments can be provided in args and kwargs.
- Returns self so you can easily chain then() calls.
- """
- self.chain.append((call, args, kwargs))
- return self
- def iferror(self, exceptionhandler):
- """
- Specify the exception handler to be invoked (with the exception object as only
- argument) when calculating the result raises an exception.
- If no exception handler is set, any exception raised in the asynchronous call will be silently ignored.
- Returns self so you can easily chain other calls.
- """
- self.exceptionhandler = exceptionhandler
- return self
- class FutureResult(object):
- """
- The result object for asynchronous Pyro calls.
- Unfortunatley it should be similar to the more general Future class but
- it is still somewhat limited (no delay, no canceling).
- """
- def __init__(self):
- self.__ready = threading.Event()
- self.callchain = []
- self.valueLock = threading.Lock()
- self.exceptionhandler = None
- def wait(self, timeout=None):
- """
- Wait for the result to become available, with optional timeout (in seconds).
- Returns True if the result is ready, or False if it still isn't ready.
- """
- result = self.__ready.wait(timeout)
- if result is None:
- # older pythons return None from wait()
- return self.__ready.isSet()
- return result
- @property
- def ready(self):
- """Boolean that contains the readiness of the asynchronous result"""
- return self.__ready.isSet()
- def get_value(self):
- self.__ready.wait()
- if isinstance(self.__value, _ExceptionWrapper):
- self.__value.raiseIt()
- else:
- return self.__value
- def set_value(self, value):
- with self.valueLock:
- self.__value = value
- # walk the call chain if the result is not an exception, otherwise invoke the errorhandler (if any)
- if isinstance(value, _ExceptionWrapper):
- if self.exceptionhandler:
- self.exceptionhandler(value.exception)
- else:
- for call, args, kwargs in self.callchain:
- call = functools.partial(call, self.__value)
- self.__value = call(*args, **kwargs)
- if isinstance(self.__value, _ExceptionWrapper):
- break
- self.callchain = []
- self.__ready.set()
- value = property(get_value, set_value, None, "The result value of the call. Reading it will block if not available yet.")
- def set_cancelled(self):
- self.set_value(_ExceptionWrapper(RuntimeError("future has been cancelled")))
- def then(self, call, *args, **kwargs):
- """
- Add a callable to the call chain, to be invoked when the results become available.
- The result of the current call will be used as the first argument for the next call.
- Optional extra arguments can be provided in args and kwargs.
- Returns self so you can easily chain then() calls.
- """
- with self.valueLock:
- if self.__ready.isSet():
- # value is already known, we need to process it immediately (can't use the call chain anymore)
- call = functools.partial(call, self.__value)
- self.__value = call(*args, **kwargs)
- else:
- # add the call to the call chain, it will be processed later when the result arrives
- self.callchain.append((call, args, kwargs))
- return self
- def iferror(self, exceptionhandler):
- """
- Specify the exception handler to be invoked (with the exception object as only
- argument) when asking for the result raises an exception.
- If no exception handler is set, any exception result will be silently ignored (unless
- you explicitly ask for the value). Returns self so you can easily chain other calls.
- """
- self.exceptionhandler = exceptionhandler
- return self
- class _ExceptionWrapper(object):
- """Class that wraps a remote exception. If this is returned, Pyro will
- re-throw the exception on the receiving side. Usually this is taken care of
- by a special response message flag, but in the case of batched calls this
- flag is useless and another mechanism was needed."""
- def __init__(self, exception):
- self.exception = exception
- def raiseIt(self):
- from Pyro4.util import fixIronPythonExceptionForPickle # XXX circular
- if sys.platform == "cli":
- fixIronPythonExceptionForPickle(self.exception, False)
- raise self.exception
- def __serialized_dict__(self):
- """serialized form as a dictionary"""
- from Pyro4.util import SerializerBase # XXX circular
- return {
- "__class__": "Pyro4.futures._ExceptionWrapper",
- "exception": SerializerBase.class_to_dict(self.exception)
- }
|