Implement DatasetWorker
Since Arkindex 1.5.1, the ListDatasetElements endpoint is available and must be used by the pre-processing worker to list all the elements needed to create archives.
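For context, here is a minimal sketch of what the `DatasetMixin` helper wrapping this endpoint could look like. It assumes the endpoint is paginated through the client's `paginate` helper (with `self.api_client` provided by `BaseWorker`) and that each result carries a set name and an element payload; both points should be checked against the actual API schema.

```python
from typing import Iterator, Tuple

from arkindex_worker.models import Dataset, Element


class DatasetMixin:
    def list_dataset_elements(self, dataset: Dataset) -> Iterator[Tuple[str, Element]]:
        """
        List all elements linked to a dataset, as (set_name, element) tuples.
        """
        # Assumption: each paginated result exposes a `set` name and an `element` payload
        for result in self.api_client.paginate("ListDatasetElements", id=dataset.id):
            yield result["set"], Element(**result["element"])
```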
We will need to implement a brand new DatasetWorker that mimics the current ElementsWorker. It can be implemented in arkindex_worker/worker/__init__.py.
Some specs
```python
import sys
from itertools import groupby
from operator import itemgetter
from typing import Iterator, List, Tuple

from apistar.exceptions import ErrorResponse

from arkindex_worker import logger
from arkindex_worker.models import Dataset, Element
from arkindex_worker.worker.base import BaseWorker
from arkindex_worker.worker.dataset import DatasetMixin


class DatasetWorker(BaseWorker, DatasetMixin):
    def __init__(
        self,
        description: str = "Arkindex Dataset Worker",
        support_cache: bool = False,
        # Whether this worker is allowed to update the state of its datasets
        generator: bool = False,
    ):
        super().__init__(description, support_cache)

        # Only support the `--dataset` argument
        # No need for classes, entity types or worker version
        self.generator = generator

    def list_dataset_elements_per_set(
        self, dataset: Dataset
    ) -> Iterator[Tuple[str, List[Element]]]:
        """
        Calls `list_dataset_elements` but returns results grouped by set
        """
        # `groupby` requires its input to be sorted by the grouping key
        for set_name, elements in groupby(
            sorted(self.list_dataset_elements(dataset), key=itemgetter(0)),
            key=itemgetter(0),
        ):
            yield set_name, [element for _, element in elements]

    def process_dataset(self, dataset: Dataset):
        """To override"""

    def list_datasets(self) -> List[Dataset] | List[str]:
        """
        Calls ListProcessDatasets if not is_read_only,
        otherwise simply returns the list of IDs passed on the CLI
        """

    def run(self):
        self.configure()

        # List datasets, either from the JSON file
        # or the direct list of dataset IDs on the CLI
        datasets: List[Dataset] | List[str] = self.list_datasets()
        if not datasets:
            logger.warning("No datasets to process, stopping.")
            sys.exit(1)

        # Process every dataset
        count = len(datasets)
        failed = 0
        for i, item in enumerate(datasets, start=1):
            dataset = None
            try:
                if self.is_read_only:
                    # Just use the result of list_datasets as the Dataset
                    dataset = item
                else:
                    # Load the dataset using the Arkindex API
                    dataset = Dataset(**self.request("RetrieveDataset", id=item))

                # Check the dataset state:
                # if generator, the dataset state must be Open,
                # otherwise it must be Complete.
                # Raise if the state is wrong.

                if self.generator:
                    # Update state to Building
                    logger.info(f"Building {dataset} ({i}/{count})")

                # Process the dataset
                self.process_dataset(dataset)

                if self.generator:
                    # Update state to Complete
                    logger.info(f"Completed {dataset} ({i}/{count})")
            except Exception as e:
                # Handle errors occurring while retrieving or processing this dataset
                failed += 1

                # Handle the case where we failed to retrieve the dataset
                dataset_id = dataset.id if dataset else item

                if self.generator:
                    # Update state to Error
                    ...

                if isinstance(e, ErrorResponse):
                    message = f"An API error occurred while processing dataset {dataset_id}: {e.title} - {e.content}"
                else:
                    message = f"Failed running worker on dataset {dataset_id}: {repr(e)}"

                logger.warning(
                    message,
                    exc_info=e if self.args.verbose else None,
                )

        if failed:
            logger.error(
                "Ran on {} datasets: {} completed, {} failed".format(
                    count, count - failed, failed
                )
            )
            if failed >= count:  # Everything failed!
                sys.exit(1)
```
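As a usage example, a hypothetical pre-processing worker could subclass DatasetWorker like this. The class name, archive logic and the `dataset.name` field are illustrative assumptions, not part of the spec above.

```python
from arkindex_worker import logger
from arkindex_worker.models import Dataset
from arkindex_worker.worker import DatasetWorker


class ArchiveBuilder(DatasetWorker):
    """Hypothetical pre-processing worker building one archive per dataset."""

    def process_dataset(self, dataset: Dataset):
        # Iterate over the dataset elements, grouped by set (e.g. train/dev/test)
        for set_name, elements in self.list_dataset_elements_per_set(dataset):
            # Assumption: the Dataset payload carries a `name` field
            logger.info(f"{dataset.name}: {len(elements)} elements in set {set_name}")
            # Archive creation logic would go here


def main():
    ArchiveBuilder(generator=True).run()


if __name__ == "__main__":
    main()
```

Grouping elements per set in process_dataset keeps the archive layout aligned with the dataset's splits, which is the main reason for exposing list_dataset_elements_per_set alongside the flat list_dataset_elements.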