From 81db1f889fdf8cb17fb68c25b49fb7f21bd516a4 Mon Sep 17 00:00:00 2001
From: Filippo Cremonese <filippocremonese@rev.ng>
Date: Mon, 24 Aug 2020 17:09:00 +0200
Subject: [PATCH] Refactored executor: stop scheduling new jobs if one fails

---
 orchestra/executor.py | 42 ++++++++++++++++++++++++++----------------
 1 file changed, 26 insertions(+), 16 deletions(-)

diff --git a/orchestra/executor.py b/orchestra/executor.py
index 8c969a1..70f35fb 100644
--- a/orchestra/executor.py
+++ b/orchestra/executor.py
@@ -1,16 +1,17 @@
 import logging
 from concurrent import futures
+from typing import List
 
 from .model.actions.action import Action
 
 
 class Executor:
     def __init__(self, threads=1, show_output=False):
-        self.pending_actions = set()
-        self.futures = []
         self.threads = 1
         self.show_output = show_output
-        self.pool = futures.ThreadPoolExecutor(max_workers=threads, thread_name_prefix="Builder")
+        self._pending_actions = set()
+        self._running_actions: List[futures.Future] = []
+        self._pool = futures.ThreadPoolExecutor(max_workers=threads, thread_name_prefix="Builder")
 
     def run(self, action, force=False):
         self._collect_actions(action, force=force)
@@ -18,23 +19,32 @@ class Executor:
         for _ in range(self.threads):
             self._schedule_next()
 
-        while self.futures:
-            done, not_done = futures.wait(self.futures, return_when=futures.FIRST_COMPLETED)
+        while self._running_actions:
+            done, not_done = futures.wait(self._running_actions, return_when=futures.FIRST_COMPLETED)
             for d in done:
-                self.futures.remove(d)
-                self._schedule_next()
+                self._running_actions.remove(d)
+                exception = d.exception()
+                if exception:
+                    logging.critical("An action failed!")
+                    logging.critical(exception)
+                    if self._pending_actions:
+                        logging.critical("Waiting for other running actions to terminate")
+                    self._pending_actions = set()
+                else:
+                    self._schedule_next()
 
-        if self.pending_actions:
+        # Todo: remove these 4 lines
+        if self._pending_actions:
             logging.error("Could not schedule any action, something failed")
-            logging.error(f"Remaining: {self.pending_actions}")
+            logging.error(f"Remaining: {self._pending_actions}")
             breakpoint()
 
     def _collect_actions(self, action: Action, force=False):
         if not force and action.is_satisfied():
             return
 
-        if action not in self.pending_actions:
-            self.pending_actions.add(action)
+        if action not in self._pending_actions:
+            self._pending_actions.add(action)
             for dep in action.dependencies:
                 self._collect_actions(dep)
 
@@ -44,14 +54,14 @@ class Executor:
             logging.debug(f"Did not find more runnable actions")
             return
 
-        future = self.pool.submit(self._run_action, next_runnable_action)
-        self.futures.append(future)
+        future = self._pool.submit(self._run_action, next_runnable_action)
+        self._running_actions.append(future)
 
     def _get_next_runnable_action(self):
-        for action in self.pending_actions:
+        for action in self._pending_actions:
             if all([d.is_satisfied() for d in action.dependencies]):
-                self.pending_actions.remove(action)
+                self._pending_actions.remove(action)
                 return action
 
     def _run_action(self, action: Action):
-        action.run(show_output=self.show_output)
+        return action.run(show_output=self.show_output)
-- 
GitLab