  1. """Convenient parallelization of higher order functions.
  2. This module provides two helper functions, with appropriate fallbacks on
  3. Python 2 and on systems lacking support for synchronization mechanisms:
  4. - map_multiprocess
  5. - map_multithread
  6. These helpers work like Python 3's map, with two differences:
  7. - They don't guarantee the order of processing of
  8. the elements of the iterable.
  9. - The underlying process/thread pools chop the iterable into
  10. a number of chunks, so that for very long iterables using
  11. a large value for chunksize can make the job complete much faster
  12. than using the default value of 1.
  13. """
__all__ = ["map_multiprocess", "map_multithread"]

from contextlib import contextmanager
from multiprocessing import Pool as ProcessPool
from multiprocessing import pool
from multiprocessing.dummy import Pool as ThreadPool
from typing import Callable, Iterable, Iterator, TypeVar, Union

from pip._vendor.requests.adapters import DEFAULT_POOLSIZE

Pool = Union[pool.Pool, pool.ThreadPool]
S = TypeVar("S")
T = TypeVar("T")

# On platforms without sem_open, multiprocessing[.dummy] Pool
# cannot be created.
try:
    import multiprocessing.synchronize  # noqa
except ImportError:
    LACK_SEM_OPEN = True
else:
    LACK_SEM_OPEN = False

# Incredibly large timeout to work around bpo-8296 on Python 2.
TIMEOUT = 2000000


@contextmanager
def closing(pool):
    # type: (Pool) -> Iterator[Pool]
    """Return a context manager making sure the pool closes properly."""
    try:
        yield pool
    finally:
        # For Pool.imap*, close and join are needed
        # for the returned iterator to begin yielding.
        pool.close()
        pool.join()
        pool.terminate()


def _map_fallback(func, iterable, chunksize=1):
    # type: (Callable[[S], T], Iterable[S], int) -> Iterator[T]
    """Make an iterator applying func to each element in iterable.

    This function is the sequential fallback either on Python 2
    where Pool.imap* doesn't react to KeyboardInterrupt
    or when sem_open is unavailable.
    """
    return map(func, iterable)


def _map_multiprocess(func, iterable, chunksize=1):
    # type: (Callable[[S], T], Iterable[S], int) -> Iterator[T]
    """Chop iterable into chunks and submit them to a process pool.

    For very long iterables using a large value for chunksize can make
    the job complete much faster than using the default value of 1.

    Return an unordered iterator of the results.
    """
    with closing(ProcessPool()) as pool:
        return pool.imap_unordered(func, iterable, chunksize)


def _map_multithread(func, iterable, chunksize=1):
    # type: (Callable[[S], T], Iterable[S], int) -> Iterator[T]
    """Chop iterable into chunks and submit them to a thread pool.

    For very long iterables using a large value for chunksize can make
    the job complete much faster than using the default value of 1.

    Return an unordered iterator of the results.
    """
    with closing(ThreadPool(DEFAULT_POOLSIZE)) as pool:
        return pool.imap_unordered(func, iterable, chunksize)


if LACK_SEM_OPEN:
    map_multiprocess = map_multithread = _map_fallback
else:
    map_multiprocess = _map_multiprocess
    map_multithread = _map_multithread
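

# Illustrative usage sketch: it only shows how the public helpers might be
# called, and ``_square`` is a hypothetical helper used purely for
# demonstration.  The block runs only when this file is executed directly,
# so importing the module is unaffected.
def _square(x):
    # type: (int) -> int
    return x * x


if __name__ == "__main__":
    # Results arrive in completion order, not input order; a larger chunksize
    # reduces dispatch overhead for long iterables.  map_multiprocess is
    # called the same way.
    for result in map_multithread(_square, range(20), chunksize=5):
        print(result)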