Skip to content
Snippets Groups Projects
Commit 81db1f88 authored by Filippo Cremonese's avatar Filippo Cremonese
Browse files

Refactored executor: stop scheduling new jobs if one fails

parent ae8268e2
No related branches found
No related tags found
No related merge requests found
import logging import logging
from concurrent import futures from concurrent import futures
from typing import List
from .model.actions.action import Action from .model.actions.action import Action
class Executor: class Executor:
def __init__(self, threads=1, show_output=False): def __init__(self, threads=1, show_output=False):
self.pending_actions = set()
self.futures = []
self.threads = 1 self.threads = 1
self.show_output = show_output self.show_output = show_output
self.pool = futures.ThreadPoolExecutor(max_workers=threads, thread_name_prefix="Builder") self._pending_actions = set()
self._running_actions: List[futures.Future] = []
self._pool = futures.ThreadPoolExecutor(max_workers=threads, thread_name_prefix="Builder")
def run(self, action, force=False): def run(self, action, force=False):
self._collect_actions(action, force=force) self._collect_actions(action, force=force)
...@@ -18,23 +19,32 @@ class Executor: ...@@ -18,23 +19,32 @@ class Executor:
for _ in range(self.threads): for _ in range(self.threads):
self._schedule_next() self._schedule_next()
while self.futures: while self._running_actions:
done, not_done = futures.wait(self.futures, return_when=futures.FIRST_COMPLETED) done, not_done = futures.wait(self._running_actions, return_when=futures.FIRST_COMPLETED)
for d in done: for d in done:
self.futures.remove(d) self._running_actions.remove(d)
self._schedule_next() exception = d.exception()
if exception:
logging.critical("An action failed!")
logging.critical(exception)
if self._pending_actions:
logging.critical("Waiting for other running actions to terminate")
self._pending_actions = set()
else:
self._schedule_next()
if self.pending_actions: # Todo: remove these 4 lines
if self._pending_actions:
logging.error("Could not schedule any action, something failed") logging.error("Could not schedule any action, something failed")
logging.error(f"Remaining: {self.pending_actions}") logging.error(f"Remaining: {self._pending_actions}")
breakpoint() breakpoint()
def _collect_actions(self, action: Action, force=False): def _collect_actions(self, action: Action, force=False):
if not force and action.is_satisfied(): if not force and action.is_satisfied():
return return
if action not in self.pending_actions: if action not in self._pending_actions:
self.pending_actions.add(action) self._pending_actions.add(action)
for dep in action.dependencies: for dep in action.dependencies:
self._collect_actions(dep) self._collect_actions(dep)
...@@ -44,14 +54,14 @@ class Executor: ...@@ -44,14 +54,14 @@ class Executor:
logging.debug(f"Did not find more runnable actions") logging.debug(f"Did not find more runnable actions")
return return
future = self.pool.submit(self._run_action, next_runnable_action) future = self._pool.submit(self._run_action, next_runnable_action)
self.futures.append(future) self._running_actions.append(future)
def _get_next_runnable_action(self): def _get_next_runnable_action(self):
for action in self.pending_actions: for action in self._pending_actions:
if all([d.is_satisfied() for d in action.dependencies]): if all([d.is_satisfied() for d in action.dependencies]):
self.pending_actions.remove(action) self._pending_actions.remove(action)
return action return action
def _run_action(self, action: Action): def _run_action(self, action: Action):
action.run(show_output=self.show_output) return action.run(show_output=self.show_output)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment