Skip to content
Snippets Groups Projects

Handle concurrency while initializing activity

Merged Valentin Rigal requested to merge activities-concurrency into master
1 file
+ 21
18
Compare changes
  • Side-by-side
  • Inline
@@ -127,23 +127,14 @@ class ElementsWorker(
logger.info(f"Processing {element} ({i}/{count})")
# Process the element and report its progress if activities are enabled
try:
self.update_activity(element.id, ActivityState.Started)
except ErrorResponse as e:
if e.status_code != 409:
raise e
# 409 conflict error when updating the state of an activity to "started" mean that we
# cannot process this element. We assume that the reason is that the state transition
# was forbidden i.e. that the activity was already in a started or processed state.
# This allow concurrent access to an element activity between multiple processes.
# Element should not be counted as failed as it is probably handled somewhere else.
logger.warning(
f"Cannot start processing element {element.id} due to a conflict. "
f"Another process could have processed it with the same version already."
if self.update_activity(element.id, ActivityState.Started):
self.process_element(element)
self.update_activity(element.id, ActivityState.Processed)
else:
logger.info(
f"Skipping element {element.id} as it was already processed"
)
continue
self.process_element(element)
self.update_activity(element.id, ActivityState.Processed)
except Exception as e:
# Handle errors occurring while retrieving, processing or patching the activity for this element.
# Count the element as failed in case the activity update to "started" failed with no conflict.
@@ -188,13 +179,14 @@ class ElementsWorker(
def update_activity(self, element_id, state):
"""
Update worker activity for this element
This method should not raise a runtime exception, but simply warn users
Returns False when there is a conflict initializing the activity
Otherwise return True or the response payload
"""
if not self.store_activity:
logger.debug(
"Activity is not stored as the feature is disabled on this process"
)
return
return True
assert element_id and isinstance(
element_id, (uuid.UUID, str)
@@ -203,7 +195,7 @@ class ElementsWorker(
if self.is_read_only:
logger.warning("Cannot update activity as this worker is in read-only mode")
return
return True
try:
out = self.request(
@@ -216,6 +208,17 @@ class ElementsWorker(
},
)
except ErrorResponse as e:
if state == ActivityState.Started and e.status_code == 409:
# 409 conflict error when updating the state of an activity to "started" mean that we
# cannot process this element. We assume that the reason is that the state transition
# was forbidden i.e. that the activity was already in a started or processed state.
# This allow concurrent access to an element activity between multiple processes.
# Element should not be counted as failed as it is probably handled somewhere else.
logger.debug(
f"Cannot start processing element {element_id} due to a conflict. "
f"Another process could have processed it with the same version already."
)
return False
logger.warning(
f"Failed to update activity of element {element_id} to {state.value} due to an API error: {e.content}"
)
Loading