From 25d4fff22fddc43e0f645eb686edbb5bded322fe Mon Sep 17 00:00:00 2001
From: Valentin Rigal <rigal@teklia.com>
Date: Mon, 21 Jun 2021 11:01:23 +0200
Subject: [PATCH] Use a custom conflict Exception

---
 arkindex_worker/worker/__init__.py | 56 +++++++++++++++++-------------
 1 file changed, 32 insertions(+), 24 deletions(-)

diff --git a/arkindex_worker/worker/__init__.py b/arkindex_worker/worker/__init__.py
index d83b92a1..96135192 100644
--- a/arkindex_worker/worker/__init__.py
+++ b/arkindex_worker/worker/__init__.py
@@ -27,6 +27,11 @@ class ActivityState(Enum):
     Error = "error"
 
 
+class ConflictResponse(ErrorResponse):
+    # Custom Exception type for HTTP 409 conflict
+    pass
+
+
 class ElementsWorker(
     BaseWorker,
     ClassificationMixin,
@@ -127,30 +132,30 @@ class ElementsWorker(
                 logger.info(f"Processing {element} ({i}/{count})")
 
                 # Process the element and report its progress if activities are enabled
-                response = self.update_activity(element.id, ActivityState.Started)
-                if isinstance(response, ErrorResponse) and response.status_code == 409:
-                    # 409 conflict error when setting an activity to "started" mean that we cannot
-                    # process this element. We assume that the reason is that the state transition
-                    # was forbidden i.e. that the activity was already in a started or processed state.
-                    # This allow concurrent access to an element activity.
-                    # Element is not counted as failed as it is probably handled by another process.
+                try:
+                    self.update_activity(element.id, ActivityState.Started)
+                except ConflictResponse as e:
+                    # Skip this element in case of conflict while initializing the activity
                     logger.warning(
-                        f"Cannot start processing element {element.id} due to a conflict. "
-                        "Another process could have processed it with the same version already."
+                        f"Cannot start processing element {element.id} due to a conflict, "
+                        f"another process could have processed it with the same version already: {e.content}"
                     )
                     continue
-                elif isinstance(response, Exception):
-                    # Count the element as failed in case the activity update to "started" failed with no conflict.
-                    # This prevent from processing the element
+                self.process_element(element)
+                try:
+                    self.update_activity(element.id, ActivityState.Processed)
+                except ConflictResponse as e:
+                    # Do not count this element as failed
                     logger.warning(
-                        f"Element {element.id} is counted as failed because its activity could not be initialized."
+                        f"Element {element.id} was processed but its activity could not be updated: {e.content}"
                     )
-                    failed += 1
                     continue
-                self.process_element(element)
-                self.update_activity(element.id, ActivityState.Processed)
             except Exception as e:
+                # Handle errors occurring while retrieving, processing or patching the activity of the element
+                # Count the element as failed in case the activity update to "started" failed with no conflict.
+                # This prevent from processing the element
                 failed += 1
+
                 # Handle the case where we failed retrieving the element
                 element_id = element.id if element else item
 
@@ -212,15 +217,18 @@ class ElementsWorker(
                     "state": state.value,
                 },
             )
-            logger.debug(f"Updated activity of element {element_id} to {state}")
-            return out
         except ErrorResponse as e:
             logger.warning(
                 f"Failed to update activity of element {element_id} to {state.value} due to an API error: {e.content}"
             )
-            return e
-        except Exception as e:
-            logger.warning(
-                f"Failed to update activity of element {element_id} to {state.value}: {e}"
-            )
-            return e
+            # 409 conflict error when updating the state of an activity mean that we cannot
+            # process this element. We assume that the reason is that the state transition
+            # was forbidden i.e. that the activity was already in a started or processed state.
+            # This allow concurrent access to an element activity between multiple processes.
+            # Element should not be counted as failed as it is probably handled somewhere else.
+            if e.status_code == 409:
+                raise ConflictResponse(e)
+            raise e
+
+        logger.debug(f"Updated activity of element {element_id} to {state}")
+        return out
-- 
GitLab