diff --git a/orchestra/executor.py b/orchestra/executor.py index 8c969a16d81c0901b98c7028abcf9abb8114be0d..70f35fb4f42092df82ea7422edfe96a71fe0df7c 100644 --- a/orchestra/executor.py +++ b/orchestra/executor.py @@ -1,16 +1,17 @@ import logging from concurrent import futures +from typing import List from .model.actions.action import Action class Executor: def __init__(self, threads=1, show_output=False): - self.pending_actions = set() - self.futures = [] self.threads = 1 self.show_output = show_output - self.pool = futures.ThreadPoolExecutor(max_workers=threads, thread_name_prefix="Builder") + self._pending_actions = set() + self._running_actions: List[futures.Future] = [] + self._pool = futures.ThreadPoolExecutor(max_workers=threads, thread_name_prefix="Builder") def run(self, action, force=False): self._collect_actions(action, force=force) @@ -18,23 +19,32 @@ class Executor: for _ in range(self.threads): self._schedule_next() - while self.futures: - done, not_done = futures.wait(self.futures, return_when=futures.FIRST_COMPLETED) + while self._running_actions: + done, not_done = futures.wait(self._running_actions, return_when=futures.FIRST_COMPLETED) for d in done: - self.futures.remove(d) - self._schedule_next() + self._running_actions.remove(d) + exception = d.exception() + if exception: + logging.critical("An action failed!") + logging.critical(exception) + if self._pending_actions: + logging.critical("Waiting for other running actions to terminate") + self._pending_actions = set() + else: + self._schedule_next() - if self.pending_actions: + # Todo: remove these 4 lines + if self._pending_actions: logging.error("Could not schedule any action, something failed") - logging.error(f"Remaining: {self.pending_actions}") + logging.error(f"Remaining: {self._pending_actions}") breakpoint() def _collect_actions(self, action: Action, force=False): if not force and action.is_satisfied(): return - if action not in self.pending_actions: - self.pending_actions.add(action) + if action not in self._pending_actions: + self._pending_actions.add(action) for dep in action.dependencies: self._collect_actions(dep) @@ -44,14 +54,14 @@ class Executor: logging.debug(f"Did not find more runnable actions") return - future = self.pool.submit(self._run_action, next_runnable_action) - self.futures.append(future) + future = self._pool.submit(self._run_action, next_runnable_action) + self._running_actions.append(future) def _get_next_runnable_action(self): - for action in self.pending_actions: + for action in self._pending_actions: if all([d.is_satisfied() for d in action.dependencies]): - self.pending_actions.remove(action) + self._pending_actions.remove(action) return action def _run_action(self, action: Action): - action.run(show_output=self.show_output) + return action.run(show_output=self.show_output)