diff --git a/arkindex_worker/worker/__init__.py b/arkindex_worker/worker/__init__.py index cc32636d0c0e3e6bb056c4728ba996bda4f967be..d4f709a8a09676ec7443b0a7c2497730ef25c558 100644 --- a/arkindex_worker/worker/__init__.py +++ b/arkindex_worker/worker/__init__.py @@ -127,23 +127,14 @@ class ElementsWorker( logger.info(f"Processing {element} ({i}/{count})") # Process the element and report its progress if activities are enabled - try: - self.update_activity(element.id, ActivityState.Started) - except ErrorResponse as e: - if e.status_code != 409: - raise e - # 409 conflict error when updating the state of an activity to "started" mean that we - # cannot process this element. We assume that the reason is that the state transition - # was forbidden i.e. that the activity was already in a started or processed state. - # This allow concurrent access to an element activity between multiple processes. - # Element should not be counted as failed as it is probably handled somewhere else. - logger.warning( - f"Cannot start processing element {element.id} due to a conflict. " - f"Another process could have processed it with the same version already." + if self.update_activity(element.id, ActivityState.Started): + self.process_element(element) + self.update_activity(element.id, ActivityState.Processed) + else: + logger.info( + f"Skipping element {element.id} as it was already processed" ) continue - self.process_element(element) - self.update_activity(element.id, ActivityState.Processed) except Exception as e: # Handle errors occurring while retrieving, processing or patching the activity for this element. # Count the element as failed in case the activity update to "started" failed with no conflict. @@ -188,13 +179,14 @@ class ElementsWorker( def update_activity(self, element_id, state): """ Update worker activity for this element - This method should not raise a runtime exception, but simply warn users + Returns False when there is a conflict initializing the activity + Otherwise return True or the response payload """ if not self.store_activity: logger.debug( "Activity is not stored as the feature is disabled on this process" ) - return + return True assert element_id and isinstance( element_id, (uuid.UUID, str) @@ -203,7 +195,7 @@ class ElementsWorker( if self.is_read_only: logger.warning("Cannot update activity as this worker is in read-only mode") - return + return True try: out = self.request( @@ -216,6 +208,17 @@ class ElementsWorker( }, ) except ErrorResponse as e: + if state == ActivityState.Started and e.status_code == 409: + # 409 conflict error when updating the state of an activity to "started" mean that we + # cannot process this element. We assume that the reason is that the state transition + # was forbidden i.e. that the activity was already in a started or processed state. + # This allow concurrent access to an element activity between multiple processes. + # Element should not be counted as failed as it is probably handled somewhere else. + logger.debug( + f"Cannot start processing element {element_id} due to a conflict. " + f"Another process could have processed it with the same version already." + ) + return False logger.warning( f"Failed to update activity of element {element_id} to {state.value} due to an API error: {e.content}" )