Support ListDatasetElements
Since Arkindex 1.5.1, the `ListDatasetElements` endpoint is available and must be used by the pre-processing worker to list all the elements needed to create the archives.
We will need to implement a brand-new `DatasetWorker` that mimics the current `ElementsWorker`. The worker will use this class as its base.
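For context, here is a minimal sketch of what the `DatasetMixin` used in the specs below could provide; it assumes `ListDatasetElements` returns one `{"set": ..., "element": ...}` result per dataset element and that the Arkindex client's `paginate` helper handles pagination (both assumptions, not confirmed by this issue):

```python
from typing import Iterator, Tuple

from arkindex_worker.models import Dataset, Element


class DatasetMixin:
    def list_dataset_elements(self, dataset: Dataset) -> Iterator[Tuple[str, Element]]:
        """
        Yield (set_name, element) tuples for every element of the dataset.
        """
        # self.api_client is provided by BaseWorker once configure() has run.
        # The {"set": ..., "element": ...} result shape is an assumption.
        for result in self.api_client.paginate("ListDatasetElements", id=dataset.id):
            yield result["set"], Element(**result["element"])
```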
Some specs:
```python
import sys
from itertools import groupby
from operator import itemgetter
from typing import Iterator, List, Tuple
from uuid import UUID

from apistar.exceptions import ErrorResponse

from arkindex_worker import logger
from arkindex_worker.models import Dataset, Element
from arkindex_worker.worker.base import BaseWorker
from arkindex_worker.worker.dataset import DatasetMixin


class DatasetWorker(BaseWorker, DatasetMixin):
    def __init__(
        self,
        description: str = "Arkindex Dataset Worker",
        support_cache: bool = False,
        creator: bool = False,
    ):
        super().__init__(description, support_cache)

        # Only support the `--dataset` argument:
        # no need for classes, entity types or worker version
        self.creator = creator

    def list_dataset_elements_per_set(
        self, dataset: Dataset
    ) -> Iterator[Tuple[str, Iterator[Tuple[str, Element]]]]:
        """
        Calls `list_dataset_elements` but returns the results grouped by set name.
        `groupby` needs its input sorted on the grouping key, hence the `sorted` call.
        """
        return groupby(
            sorted(self.list_dataset_elements(dataset), key=itemgetter(0)),
            key=itemgetter(0),
        )

    def process_dataset(self, dataset: Dataset):
        """To override"""

    def list_datasets(self) -> List[Dataset] | List[str]:
        """
        Calls ListProcessDatasets if not is_read_only,
        otherwise simply returns the list of dataset IDs passed on the CLI.
        """

    def run(self):
        self.configure()

        # List all datasets, either from the API
        # or from the dataset IDs given on the CLI
        datasets: List[Dataset] | List[str] = self.list_datasets()
        if not datasets:
            logger.warning("No datasets to process, stopping.")
            sys.exit(1)

        # Process every dataset
        count = len(datasets)
        failed = 0
        for i, item in enumerate(datasets, start=1):
            dataset = None
            try:
                if self.is_read_only:
                    # Just use the result of list_datasets as the Dataset
                    dataset = item
                else:
                    # Load the dataset through the Arkindex API
                    dataset = Dataset(**self.request("RetrieveDataset", id=item))

                # Check the dataset state:
                # as creator, the dataset state must be Open,
                # otherwise the dataset state must be Complete.
                # Raise if the state is wrong.
                if self.creator:
                    # Update the state to Building
                    logger.info(f"Building {dataset} ({i}/{count})")

                # Process the dataset
                self.process_dataset(dataset)

                if self.creator:
                    # Update the state to Complete
                    logger.info(f"Completed {dataset} ({i}/{count})")
            except Exception as e:
                # Handle errors occurring while retrieving or processing this dataset
                failed += 1

                # Handle the case where we failed retrieving the dataset
                dataset_id = dataset.id if dataset else item

                if self.creator:
                    # Update the state to Error
                    ...

                if isinstance(e, ErrorResponse):
                    message = f"An API error occurred while processing dataset {dataset_id}: {e.title} - {e.content}"
                else:
                    message = f"Failed running worker on dataset {dataset_id}: {repr(e)}"

                logger.warning(
                    message,
                    exc_info=e if self.args.verbose else None,
                )

        if failed:
            logger.error(
                "Ran on {} datasets: {} completed, {} failed".format(
                    count, count - failed, failed
                )
            )
            if failed >= count:  # Everything failed!
                sys.exit(1)


class DatasetExtractor(DatasetWorker):
    def process_dataset(self, dataset: Dataset):
        # Call list_dataset_elements_per_set to do the processing
        ...


def main():
    DatasetExtractor(
        description="Fill base-worker cache with information about dataset and extract images",
        support_cache=True,
        creator=True,
    ).run()
```
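As an illustration, building on the specs above, a `process_dataset` override could consume the grouped iterator like this; the logging and the final download step are placeholders, not part of the spec:

```python
class DatasetExtractor(DatasetWorker):
    def process_dataset(self, dataset: Dataset):
        # groupby yields (set_name, group) pairs, where each group still
        # holds the full (set_name, element) tuples
        for set_name, items in self.list_dataset_elements_per_set(dataset):
            elements = [element for _, element in items]
            logger.info(f"{dataset}: set {set_name} contains {len(elements)} elements")
            # Download the images and fill the base-worker cache here
```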
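The state transitions referenced in `run()` (Open, Building, Complete, Error) also suggest a small enum on the worker side. A possible shape, assuming those four states are the full set and that the API serializes them in lowercase (both assumptions):

```python
from enum import Enum


class DatasetState(Enum):
    Open = "open"
    Building = "building"
    Complete = "complete"
    Error = "error"
```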