Skip to content
Snippets Groups Projects
Commit 25d4fff2 authored by Valentin Rigal's avatar Valentin Rigal
Browse files

Use a custom conflict Exception

parent 3de131d9
No related branches found
No related tags found
No related merge requests found
......@@ -27,6 +27,11 @@ class ActivityState(Enum):
Error = "error"
class ConflictResponse(ErrorResponse):
# Custom Exception type for HTTP 409 conflict
pass
class ElementsWorker(
BaseWorker,
ClassificationMixin,
......@@ -127,30 +132,30 @@ class ElementsWorker(
logger.info(f"Processing {element} ({i}/{count})")
# Process the element and report its progress if activities are enabled
response = self.update_activity(element.id, ActivityState.Started)
if isinstance(response, ErrorResponse) and response.status_code == 409:
# 409 conflict error when setting an activity to "started" mean that we cannot
# process this element. We assume that the reason is that the state transition
# was forbidden i.e. that the activity was already in a started or processed state.
# This allow concurrent access to an element activity.
# Element is not counted as failed as it is probably handled by another process.
try:
self.update_activity(element.id, ActivityState.Started)
except ConflictResponse as e:
# Skip this element in case of conflict while initializing the activity
logger.warning(
f"Cannot start processing element {element.id} due to a conflict. "
"Another process could have processed it with the same version already."
f"Cannot start processing element {element.id} due to a conflict, "
f"another process could have processed it with the same version already: {e.content}"
)
continue
elif isinstance(response, Exception):
# Count the element as failed in case the activity update to "started" failed with no conflict.
# This prevent from processing the element
self.process_element(element)
try:
self.update_activity(element.id, ActivityState.Processed)
except ConflictResponse as e:
# Do not count this element as failed
logger.warning(
f"Element {element.id} is counted as failed because its activity could not be initialized."
f"Element {element.id} was processed but its activity could not be updated: {e.content}"
)
failed += 1
continue
self.process_element(element)
self.update_activity(element.id, ActivityState.Processed)
except Exception as e:
# Handle errors occurring while retrieving, processing or patching the activity of the element
# Count the element as failed in case the activity update to "started" failed with no conflict.
# This prevent from processing the element
failed += 1
# Handle the case where we failed retrieving the element
element_id = element.id if element else item
......@@ -212,15 +217,18 @@ class ElementsWorker(
"state": state.value,
},
)
logger.debug(f"Updated activity of element {element_id} to {state}")
return out
except ErrorResponse as e:
logger.warning(
f"Failed to update activity of element {element_id} to {state.value} due to an API error: {e.content}"
)
return e
except Exception as e:
logger.warning(
f"Failed to update activity of element {element_id} to {state.value}: {e}"
)
return e
# 409 conflict error when updating the state of an activity mean that we cannot
# process this element. We assume that the reason is that the state transition
# was forbidden i.e. that the activity was already in a started or processed state.
# This allow concurrent access to an element activity between multiple processes.
# Element should not be counted as failed as it is probably handled somewhere else.
if e.status_code == 409:
raise ConflictResponse(e)
raise e
logger.debug(f"Updated activity of element {element_id} to {state}")
return out
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment