Skip to content
Snippets Groups Projects

Handle concurrency while initializing activity

Merged Valentin Rigal requested to merge activities-concurrency into master
3 files
+ 168
74
Compare changes
  • Side-by-side
  • Inline
Files
3
@@ -127,16 +127,22 @@ class ElementsWorker(
logger.info(f"Processing {element} ({i}/{count})")
# Process the element and report its progress if activities are enabled
self.update_activity(element.id, ActivityState.Started)
self.process_element(element)
self.update_activity(element.id, ActivityState.Processed)
if self.update_activity(element.id, ActivityState.Started):
self.process_element(element)
self.update_activity(element.id, ActivityState.Processed)
else:
logger.info(
f"Skipping element {element.id} as it was already processed"
)
continue
except Exception as e:
# Handle errors occurring while retrieving, processing or patching the activity for this element.
# Count the element as failed in case the activity update to "started" failed with no conflict.
# This prevent from processing the element
failed += 1
element_id = (
element.id
if isinstance(element, (Element, CachedElement))
else item
)
# Handle the case where we failed retrieving the element
element_id = element.id if element else item
if isinstance(e, ErrorResponse):
message = f"An API error occurred while processing element {element_id}: {e.title} - {e.content}"
@@ -147,7 +153,12 @@ class ElementsWorker(
message,
exc_info=e if self.args.verbose else None,
)
self.update_activity(element_id, ActivityState.Error)
if element:
# Try to update the activity to error state regardless of the response
try:
self.update_activity(element.id, ActivityState.Error)
except Exception:
pass
self.report.error(element_id, e)
# Save report as local artifact
@@ -168,13 +179,14 @@ class ElementsWorker(
def update_activity(self, element_id, state):
"""
Update worker activity for this element
This method should not raise a runtime exception, but simply warn users
Returns False when there is a conflict initializing the activity
Otherwise return True or the response payload
"""
if not self.store_activity:
logger.debug(
"Activity is not stored as the feature is disabled on this process"
)
return
return True
assert element_id and isinstance(
element_id, (uuid.UUID, str)
@@ -183,10 +195,10 @@ class ElementsWorker(
if self.is_read_only:
logger.warning("Cannot update activity as this worker is in read-only mode")
return
return True
try:
out = self.request(
self.request(
"UpdateWorkerActivity",
id=self.worker_version_id,
body={
@@ -195,13 +207,22 @@ class ElementsWorker(
"state": state.value,
},
)
logger.debug(f"Updated activity of element {element_id} to {state}")
return out
except ErrorResponse as e:
if state == ActivityState.Started and e.status_code == 409:
# 409 conflict error when updating the state of an activity to "started" mean that we
# cannot process this element. We assume that the reason is that the state transition
# was forbidden i.e. that the activity was already in a started or processed state.
# This allow concurrent access to an element activity between multiple processes.
# Element should not be counted as failed as it is probably handled somewhere else.
logger.debug(
f"Cannot start processing element {element_id} due to a conflict. "
f"Another process could have processed it with the same version already."
)
return False
logger.warning(
f"Failed to update activity of element {element_id} to {state.value} due to an API error: {e.content}"
)
except Exception as e:
logger.warning(
f"Failed to update activity of element {element_id} to {state.value}: {e}"
)
raise e
logger.debug(f"Updated activity of element {element_id} to {state}")
return True
Loading