Skip to content
Snippets Groups Projects

Update Worker Activity per element

Merged Bastien Abadie requested to merge update-activity into master
5 files
+ 325
34
Compare changes
  • Side-by-side
  • Inline
Files
5
+ 56
0
@@ -84,6 +84,11 @@ class BaseWorker(object):
self.api_client = ArkindexClient(**options_from_env())
logger.debug(f"Setup Arkindex API client on {self.api_client.document.url}")
# Load features available on backend, and check authentication
user = self.api_client.request("RetrieveUser")
logger.debug(f"Connected as {user['display_name']} - {user['email']}")
self.features = user["features"]
if self.worker_version_id:
# Retrieve initial configuration from API
worker_version = self.api_client.request(
@@ -189,6 +194,13 @@ class MetaType(Enum):
Reference = "reference"
class ActivityState(Enum):
Queued = "queued"
Started = "started"
Processed = "processed"
Error = "error"
class ElementsWorker(BaseWorker):
def __init__(self, description="Arkindex Elements Worker"):
super().__init__(description)
@@ -254,13 +266,18 @@ class ElementsWorker(BaseWorker):
**self.api_client.request("RetrieveElement", id=element_id)
)
logger.info(f"Processing {element} ({i}/{count})")
# Report start of process, run process, then report end of process
self.update_activity(element, ActivityState.Started)
self.process_element(element)
self.update_activity(element, ActivityState.Processed)
except ErrorResponse as e:
failed += 1
logger.warning(
f"An API error occurred while processing element {element_id}: {e.title} - {e.content}",
exc_info=e if self.args.verbose else None,
)
self.update_activity(element, ActivityState.Error)
self.report.error(element_id, e)
except Exception as e:
failed += 1
@@ -268,6 +285,7 @@ class ElementsWorker(BaseWorker):
f"Failed running worker on element {element_id}: {e}",
exc_info=e if self.args.verbose else None,
)
self.update_activity(element, ActivityState.Error)
self.report.error(element_id, e)
# Save report as local artifact
@@ -782,3 +800,41 @@ class ElementsWorker(BaseWorker):
)
return children
def update_activity(self, element, state):
"""
Update worker activity for this element
This method should not raise a runtime exception, but simply warn users
"""
assert element and isinstance(
element, Element
), "element shouldn't be null and should be of type Element"
assert isinstance(state, ActivityState), "state should be an ActivityState"
if not self.features.get("workers_activity"):
logger.debug("Skipping Worker activity update as it's disabled on backend")
return
if self.is_read_only:
logger.warning("Cannot update activity as this worker is in read-only mode")
return
try:
out = self.api_client.request(
"UpdateWorkerActivity",
id=self.worker_version_id,
body={
"element_id": element.id,
"state": state.value,
},
)
logger.debug(f"Updated activity of element {element.id} to {state}")
return out
except ErrorResponse as e:
logger.warning(
f"Failed to update activity of element {element.id} to {state.value} due to an API error: {e.content}"
)
except Exception as e:
logger.warning(
f"Failed to update activity of element {element.id} to {state.value}: {e}"
)
Loading