Module light_pipe.parallelizer
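
The generated listing below omits the module-level imports. Reconstructed from usage in the source (a sketch; the actual module may differ), the classes appear to assume roughly:

import asyncio
import concurrent.futures
import functools
import logging
import queue
import threading
from concurrent.futures import Future
from typing import (
    Any, AsyncGenerator, Callable, Coroutine, Dict, Generator, Iterable,
    List, Optional, Tuple, Union
)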

Classes

class AsyncGatherer (loop: Optional[asyncio.events.AbstractEventLoop] = None, *args, **kwargs)
class AsyncGatherer(Parallelizer):
    def __init__(
        self, loop: Optional[asyncio.AbstractEventLoop] = None, *args, **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.loop = loop
        self._terminate_flag = False


    def _make_async_error_handler_decorator(
        self, num_tries: Optional[int] = 1, 
        raise_after_retries: Optional[bool] = True,
        failed_tasks: Optional[List[Tuple[Callable, Tuple, Dict]]] = None
    ) -> Callable:
        if not raise_after_retries:
            assert failed_tasks is not None, \
                "`failed_tasks` must be passed when `raise_after_retries` is `False`."
        def handle_errors(fn: Callable) -> Callable:
            @functools.wraps(fn)
            async def handle_errors_wrapper(*args, **kwargs) -> Any:
                error: Optional[Exception] = None
                for _ in range(num_tries):
                    try:
                        result: Any = fn(*args, **kwargs)
                        # Await coroutines; return plain results as-is.
                        if isinstance(result, Coroutine):
                            return await result
                        return result
                    except Exception as e:
                        error = e
                if raise_after_retries:
                    raise error
                # Otherwise, log the failure and record the task for later retry.
                logging.warning(
                    f"An exception occurred while processing an item: "
                    f"{type(error).__name__}: {error}"
                )
                failed_tasks.append((fn, args, kwargs))
            return handle_errors_wrapper
        return handle_errors


    def _make_async_decorator(self, f: Callable):
        @functools.wraps(f)
        async def async_wrapper(*args, **kwargs):
            result = f(*args, **kwargs)
            # Allow `f` to be either sync or async: await coroutine results.
            if isinstance(result, Coroutine):
                return await result
            return result
        return async_wrapper


    def _get_tasks(
        self, iterable: Iterable, tuple_to_args: Optional[bool] = True, 
        dict_to_kwargs: Optional[bool] = True, num_tries: Optional[int] = 1, 
        raise_after_retries: Optional[bool] = True, 
        failed_tasks: Optional[List[Tuple[Callable, Tuple, Dict]]] = None,
        **kwargs
    ) -> List[Coroutine]:
        error_handler: Callable = self._make_async_error_handler_decorator(
            num_tries=num_tries, raise_after_retries=raise_after_retries,
            failed_tasks=failed_tasks
        )
        tasks = list()
        for f, item, args, wkwargs in iterable:
            f = self._make_async_decorator(f)
            f: Callable = error_handler(f)
            if tuple_to_args and isinstance(item, Tuple):
                tasks.append(f(*item, *args, **kwargs, **wkwargs))
            elif dict_to_kwargs and isinstance(item, Dict):
                tasks.append(f(*args, **item, **kwargs, **wkwargs))
            else:
                tasks.append(f(item, *args, **kwargs, **wkwargs))
        return tasks
            

    async def _async_gen(
        self, iterable: Iterable, **kwargs
    ) -> AsyncGenerator:
        tasks = self._get_tasks(iterable, **kwargs)
        # Yield results in completion order, not submission order.
        for result in asyncio.as_completed(tasks):
            yield await result


    def _iter(
        self, loop: asyncio.AbstractEventLoop, async_generator: AsyncGenerator, 
        q: queue.Queue
    ) -> None:
        ait = async_generator.__aiter__()
        async def get_next() -> Tuple[bool, Any]:
            try:
                obj = await ait.__anext__()
                done = False
            except StopAsyncIteration:
                obj = None
                done = True
            return done, obj


        # Drain the async generator on the dedicated loop, forwarding each
        # result to the consumer queue; QueueEmptySignal marks exhaustion.
        while not self._terminate_flag:
            done, obj = loop.run_until_complete(get_next())
            if done:
                q.put(QueueEmptySignal())
                break
            q.put(obj)


    def _queue_generator(self, q: queue.Queue) -> Generator:
        # Consume results from the worker thread until the sentinel arrives.
        while not self._terminate_flag:
            obj: Any = q.get()
            if isinstance(obj, QueueEmptySignal):
                break
            yield obj


    def __call__(
        self, iterable: Iterable,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        tuple_to_args: Optional[bool] = True, 
        dict_to_kwargs: Optional[bool] = True, num_tries: Optional[int] = 1, 
        raise_after_retries: Optional[bool] = True, 
        failed_tasks: Optional[List[Tuple[Callable, Tuple, Dict]]] = None
    ) -> Generator:
        # Prefer an explicit loop, then the instance loop, then a fresh one.
        if loop is None:
            loop = self.loop if self.loop is not None else asyncio.new_event_loop()
        async_generator = self._async_gen(
            iterable, tuple_to_args=tuple_to_args, dict_to_kwargs=dict_to_kwargs,
            num_tries=num_tries, raise_after_retries=raise_after_retries,
            failed_tasks=failed_tasks
        )
        q: queue.Queue = queue.Queue()
        t = threading.Thread(
            target=self._iter, 
            kwargs={
                "loop": loop,
                "async_generator": async_generator,
                "q": q
            }
        )
        t.start()
        yield from self._queue_generator(q=q)
        t.join()

Ancestors

Parallelizer
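
A minimal usage sketch, assuming a hypothetical `fetch` coroutine; the iterable must yield `(f, item, args, kwargs)` tuples, and results are yielded as they complete:

import asyncio

async def fetch(url: str) -> str:  # hypothetical stand-in for real async I/O
    await asyncio.sleep(0.1)
    return url.upper()

gatherer = AsyncGatherer()
work = ((fetch, url, (), {}) for url in ["a", "b", "c"])
for result in gatherer(work):
    print(result)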

class BlockingPooler (max_workers: Optional[int] = None, queue_size: Optional[int] = None, DefaultBlockingExecutor: Optional[type] = None, executor: Optional[Union[concurrent.futures.ThreadPoolExecutor, concurrent.futures.ProcessPoolExecutor]] = None, *args, **kwargs)
class BlockingPooler(Parallelizer):
    def __init__(
        self, max_workers: Optional[int] = None, queue_size: Optional[int] = None,
        DefaultBlockingExecutor: Optional[type] = None,
        executor: Optional[Union[
            concurrent.futures.ThreadPoolExecutor, 
            concurrent.futures.ProcessPoolExecutor
        ]] = None, *args, **kwargs
    ):
        super().__init__(*args, **kwargs)
        if executor is None:
            assert max_workers is not None and queue_size is not None, \
                "Both `max_workers` and `queue_size` must be set if `executor` is not passed."
        self.max_workers = max_workers
        self.DefaultBlockingExecutor = DefaultBlockingExecutor
        self.queue_size = queue_size
        self.executor = executor


    def _submit_task(
        self, f: Callable, item: Any,
        executor: Union[
            concurrent.futures.ThreadPoolExecutor, 
            concurrent.futures.ProcessPoolExecutor
        ],
        args: Tuple = (), kwargs: Optional[Dict] = None,
        tuple_to_args: Optional[bool] = True, 
        dict_to_kwargs: Optional[bool] = True
    ) -> Future:
        # Per-item `args`/`kwargs` are explicit parameters: splatting them
        # after keyword arguments at the call site would rebind the leading
        # parameters and raise a "multiple values" TypeError.
        kwargs = kwargs if kwargs is not None else dict()
        if tuple_to_args and isinstance(item, Tuple):
            return executor.submit(f, *item, *args, **kwargs)
        elif dict_to_kwargs and isinstance(item, Dict):
            return executor.submit(f, *args, **item, **kwargs)
        else:
            return executor.submit(f, item, *args, **kwargs)


    def _blocking_submitter(
        self, iterable: Iterable, queue_size: int,
        executor: Optional[Union[
            concurrent.futures.ThreadPoolExecutor, 
            concurrent.futures.ProcessPoolExecutor
        ]] = None,
        tuple_to_args: Optional[bool] = True, 
        dict_to_kwargs: Optional[bool] = True, num_tries: Optional[int] = 1, 
        raise_after_retries: Optional[bool] = True, 
        failed_tasks: Optional[List[Tuple[Callable, Tuple, Dict]]] = None
    ) -> Generator:
        if isinstance(executor, concurrent.futures.ProcessPoolExecutor):
            # Retries and error capture are skipped for process pools; the
            # decorating closures would not be picklable.
            if num_tries != 1 or not raise_after_retries:
                logging.warning(
                    "Error handling is not implemented for `BlockingProcessPooler` instances."
                )
            error_handler = None
        else:
            error_handler = self._make_error_handler_decorator(
                num_tries=num_tries, raise_after_retries=raise_after_retries,
                failed_tasks=failed_tasks
            )
        futures = dict()
        iterator = iter(iterable)  # `next()` requires an iterator
        exhausted = False
        num_submitted = 0
        while True:
            # Keep at most `queue_size` tasks in flight at any one time.
            while not exhausted and num_submitted < queue_size:
                try:
                    f, item, args, kwargs = next(iterator)
                except StopIteration:
                    exhausted = True
                    break
                if error_handler is not None:
                    f = error_handler(f)
                futures[
                    self._submit_task(
                        f=f, item=item, executor=executor, args=args,
                        kwargs=kwargs, tuple_to_args=tuple_to_args, 
                        dict_to_kwargs=dict_to_kwargs
                    )
                ] = "Done"
                num_submitted += 1
            if futures: # There's at least one task left to await
                done, _ = concurrent.futures.wait(
                    futures, return_when=concurrent.futures.FIRST_COMPLETED
                ) # Will block until at least one future finishes or cancels
                future = done.pop()
                yield future.result()
                num_submitted -= 1
                del futures[future]
            else:
                assert num_submitted == 0, \
                    f"There are still {num_submitted} tasks left to await."
                break


    def __call__(
        self, iterable: Iterable, max_workers: Optional[int] = None,
        queue_size: Optional[int] = None,
        executor: Optional[Union[
            concurrent.futures.ThreadPoolExecutor, 
            concurrent.futures.ProcessPoolExecutor
        ]] = None,
        tuple_to_args: Optional[bool] = True, 
        dict_to_kwargs: Optional[bool] = True, num_tries: Optional[int] = 1, 
        raise_after_retries: Optional[bool] = True, 
        failed_tasks: Optional[List[Tuple[Callable, Tuple, Dict]]] = None
    ) -> Generator:
        if max_workers is None:
            max_workers = self.max_workers
        if queue_size is None:
            queue_size = self.queue_size
        if executor is None:
            executor = self.executor     

        if executor is not None:
            yield from self._blocking_submitter(
                iterable=iterable, queue_size=queue_size, executor=executor,
                tuple_to_args=tuple_to_args, dict_to_kwargs=dict_to_kwargs,
                num_tries=num_tries, raise_after_retries=raise_after_retries,
                failed_tasks=failed_tasks
            )
        else:
            with self.DefaultBlockingExecutor(
                max_workers=max_workers,
            ) as executor:
                yield from self._blocking_submitter(
                    iterable=iterable, queue_size=queue_size, executor=executor,
                    tuple_to_args=tuple_to_args, dict_to_kwargs=dict_to_kwargs,
                    num_tries=num_tries, raise_after_retries=raise_after_retries,
                    failed_tasks=failed_tasks
                )

Ancestors

Parallelizer

Subclasses

BlockingProcessPooler
BlockingThreadPooler
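
A minimal usage sketch with an explicit executor, assuming a hypothetical blocking `load` function; `queue_size` bounds how many tasks are in flight at once:

import concurrent.futures

def load(path: str) -> str:  # hypothetical stand-in for blocking I/O
    return path.upper()

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
    pooler = BlockingPooler(executor=pool, queue_size=8)
    work = ((load, p, (), {}) for p in ["a", "b", "c"])
    for result in pooler(work):
        print(result)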

class BlockingProcessPooler (*args, DefaultBlockingExecutor: Optional[type] = concurrent.futures.process.ProcessPoolExecutor, **kwargs)
class BlockingProcessPooler(BlockingPooler):
    def __init__(
        self, *args, 
        DefaultBlockingExecutor: Optional[type] = concurrent.futures.ProcessPoolExecutor,
        **kwargs
    ):
        super().__init__(
            *args, DefaultBlockingExecutor=DefaultBlockingExecutor, **kwargs
        )           

Ancestors

BlockingPooler
Parallelizer
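
A sketch, assuming a hypothetical CPU-bound `square`; functions submitted to a process pool must be picklable (defined at module top level), and the call belongs under a `__main__` guard:

def square(x: int) -> int:  # hypothetical
    return x * x

if __name__ == "__main__":
    pooler = BlockingProcessPooler(max_workers=2, queue_size=4)
    for result in pooler((square, n, (), {}) for n in range(10)):
        print(result)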

class BlockingThreadPooler (*args, DefaultBlockingExecutor: Optional[type] = concurrent.futures.thread.ThreadPoolExecutor, **kwargs)
class BlockingThreadPooler(BlockingPooler):
    def __init__(
        self, *args, 
        DefaultBlockingExecutor: Optional[type] = concurrent.futures.ThreadPoolExecutor,
        **kwargs
    ):
        super().__init__(
            *args, DefaultBlockingExecutor=DefaultBlockingExecutor, **kwargs
        )

Ancestors

BlockingPooler
Parallelizer
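
A sketch of the item-unpacking conventions shared by all the parallelizers here: with the defaults, a tuple item is splatted into positional arguments and a dict item into keyword arguments (hypothetical `combine`):

def combine(a, b, scale=1):  # hypothetical
    return (a + b) * scale

pooler = BlockingThreadPooler(max_workers=2, queue_size=4)
work = iter([
    (combine, (1, 2), (), {}),                       # -> combine(1, 2)
    (combine, {"a": 3, "b": 4}, (), {"scale": 10}),  # -> combine(a=3, b=4, scale=10)
])
for result in pooler(work):
    print(result)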

class Parallelizer
class Parallelizer:

    def _make_error_handler_decorator(
        self, num_tries: Optional[int] = 1, 
        raise_after_retries: Optional[bool] = True,
        failed_tasks: Optional[List[Tuple[Callable, Tuple, Dict]]] = None
    ) -> Callable:
        if not raise_after_retries:
            assert failed_tasks is not None, \
                "`failed_tasks` must be passed when `raise_after_retries` is `False`."
        def handle_errors(fn: Callable) -> Callable:
            @functools.wraps(fn)
            def handle_errors_wrapper(*args, **kwargs) -> Any:
                error: Optional[Exception] = None
                for _ in range(num_tries):
                    try:
                        return fn(*args, **kwargs)
                    except Exception as e:
                        error = e
                if raise_after_retries:
                    raise error
                # Otherwise, log the failure and record the task for later retry.
                logging.warning(
                    f"An exception occurred while processing an item: "
                    f"{type(error).__name__}: {error}"
                )
                failed_tasks.append((fn, args, kwargs))
            return handle_errors_wrapper
        return handle_errors


    def __call__(
        self, iterable: Iterable, tuple_to_args: Optional[bool] = True,
        dict_to_kwargs: Optional[bool] = True, num_tries: Optional[int] = 1, 
        raise_after_retries: Optional[bool] = True, 
        failed_tasks: Optional[List[Tuple[Callable, Tuple, Dict]]] = None
    ) -> Generator:
        error_handler: Callable = self._make_error_handler_decorator(
            num_tries=num_tries, raise_after_retries=raise_after_retries,
            failed_tasks=failed_tasks
        )
        for f, item, args, kwargs in iterable:
            f: Callable = error_handler(f)
            if tuple_to_args and isinstance(item, Tuple):
                yield f(*item, *args, **kwargs)
            elif dict_to_kwargs and isinstance(item, Dict):
                yield f(*args, **item, **kwargs)
            else:
                yield f(item, *args, **kwargs)

Subclasses

AsyncGatherer
BlockingPooler
Pooler
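
A sketch of the base protocol: the iterable yields `(f, item, args, kwargs)` tuples, and with `raise_after_retries=False` failures are logged and collected into `failed_tasks` rather than raised (hypothetical `parse`):

def parse(text: str) -> int:  # hypothetical
    return int(text)

failed: list = []
runner = Parallelizer()
work = iter([(parse, "1", (), {}), (parse, "oops", (), {})])
for result in runner(work, num_tries=2, raise_after_retries=False, failed_tasks=failed):
    print(result)  # 1, then None for the failed item
print(failed)      # [(parse, ("oops",), {})]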

class Pooler (max_workers: Optional[int] = None, DefaultExecutor: Optional[type] = None, executor: Optional[Union[concurrent.futures.ThreadPoolExecutor, concurrent.futures.ProcessPoolExecutor]] = None, *args, **kwargs)
class Pooler(Parallelizer):
    def __init__(
        self, max_workers: Optional[int] = None,
        DefaultExecutor: Optional[type] = None,
        executor: Optional[Union[
            concurrent.futures.ThreadPoolExecutor, 
            concurrent.futures.ProcessPoolExecutor
        ]] = None, *args, **kwargs
    ):
        super().__init__(*args, **kwargs)
        if executor is None:
            assert max_workers is not None, \
                "`max_workers` must be set if `executor` is not passed."
        self.max_workers = max_workers
        self.DefaultExecutor = DefaultExecutor
        self.executor = executor


    def __call__(
        self, iterable: Iterable, max_workers: Optional[int] = None,
        executor: Optional[Union[
            concurrent.futures.ThreadPoolExecutor, 
            concurrent.futures.ProcessPoolExecutor
        ]] = None, 
        tuple_to_args: Optional[bool] = True,
        dict_to_kwargs: Optional[bool] = True, num_tries: Optional[int] = 1, 
        raise_after_retries: Optional[bool] = True, 
        failed_tasks: Optional[List[Tuple[Callable, Tuple, Dict]]] = None
    ) -> Generator:
        if executor is None:
            executor = self.executor
        if max_workers is None:
            max_workers = self.max_workers

        if executor is not None:
            yield from self._submit_tasks(
                iterable=iterable, executor=executor, tuple_to_args=tuple_to_args, 
                dict_to_kwargs=dict_to_kwargs, num_tries=num_tries, 
                raise_after_retries=raise_after_retries, 
                failed_tasks=failed_tasks
            )
        else:
            with self.DefaultExecutor(max_workers=max_workers) as executor:
                yield from self._submit_tasks(
                    iterable=iterable, executor=executor, 
                    tuple_to_args=tuple_to_args, dict_to_kwargs=dict_to_kwargs,
                    num_tries=num_tries, raise_after_retries=raise_after_retries, 
                    failed_tasks=failed_tasks
                )


    def _submit_tasks(
        self, iterable: Iterable, 
        executor: Union[
            concurrent.futures.ThreadPoolExecutor, 
            concurrent.futures.ProcessPoolExecutor
        ],
        tuple_to_args: Optional[bool] = True, 
        dict_to_kwargs: Optional[bool] = True, num_tries: Optional[int] = 1, 
        raise_after_retries: Optional[bool] = True, 
        failed_tasks: Optional[List[Tuple[Callable, Tuple, Dict]]] = None
    ) -> Generator:
        if isinstance(executor, concurrent.futures.ProcessPoolExecutor):
            # Retries and error capture are skipped for process pools; the
            # decorating closures would not be picklable.
            if num_tries != 1 or not raise_after_retries:
                logging.warning(
                    "Error handling is not implemented for `ProcessPooler` instances."
                )
            error_handler = None
        else:
            error_handler = self._make_error_handler_decorator(
                num_tries=num_tries, raise_after_retries=raise_after_retries,
                failed_tasks=failed_tasks
            )
        futures: list = list()
        for f, item, args, kwargs in iterable:
            if error_handler is not None:
                f: Callable = error_handler(f)
            if tuple_to_args and isinstance(item, Tuple):
                futures.append(executor.submit(f, *item, *args, **kwargs))
            elif dict_to_kwargs and isinstance(item, Dict):
                futures.append(executor.submit(f, *args, **item, **kwargs))
            else:
                futures.append(executor.submit(f, item, *args, **kwargs))
        # Yield results as tasks complete, regardless of submission order.
        for future in concurrent.futures.as_completed(futures):
            yield future.result()

Ancestors

Parallelizer

Subclasses

ProcessPooler
ThreadPooler
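
A sketch with a caller-managed executor, reused across calls (hypothetical `double`); note that, unlike `BlockingPooler`, every task is submitted up front before results are yielded:

import concurrent.futures

def double(x: int) -> int:  # hypothetical
    return x * 2

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
    pooler = Pooler(executor=pool)
    for result in pooler((double, n, (), {}) for n in range(5)):
        print(result)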

class ProcessPooler (*args, DefaultExecutor: Optional[type] = concurrent.futures.process.ProcessPoolExecutor, **kwargs)
class ProcessPooler(Pooler):
    def __init__(
        self, *args, 
        DefaultExecutor: Optional[type] = concurrent.futures.ProcessPoolExecutor,
        **kwargs
    ):
        super().__init__(*args, DefaultExecutor=DefaultExecutor, **kwargs)              

Ancestors

Pooler
Parallelizer
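
A sketch for CPU-bound work, assuming a hypothetical `fib`; as with the other process poolers, submitted functions must be picklable and the call should sit under a `__main__` guard:

def fib(n: int) -> int:  # hypothetical
    return n if n < 2 else fib(n - 1) + fib(n - 2)

if __name__ == "__main__":
    pooler = ProcessPooler(max_workers=4)
    print(sorted(pooler((fib, n, (), {}) for n in range(20))))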

class QueueEmptySignal
class QueueEmptySignal:
    """Sentinel placed on the result queue to mark the end of iteration."""

class ThreadPooler (*args, DefaultExecutor: Optional[type] = concurrent.futures.thread.ThreadPoolExecutor, **kwargs)
class ThreadPooler(Pooler):
    def __init__(
        self, *args, 
        DefaultExecutor: Optional[type] = concurrent.futures.ThreadPoolExecutor,
        **kwargs
    ):
        super().__init__(*args, DefaultExecutor=DefaultExecutor, **kwargs)

Ancestors

Pooler
Parallelizer
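
A sketch of the default-executor path: each call creates a `ThreadPoolExecutor` with `max_workers` threads (hypothetical `fetch_page`):

def fetch_page(url: str) -> str:  # hypothetical stand-in for blocking I/O
    return f"<html>{url}</html>"

pooler = ThreadPooler(max_workers=8)
urls = ["x", "y", "z"]
results = list(pooler((fetch_page, u, (), {}) for u in urls))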