Skip to content

Implement DatasetWorker

Since Arkindex 1.5.1, the `ListDatasetElements` endpoint has been available; the pre-processing worker must use it to list all the elements needed to create archives.

We will need to implement a brand new DatasetWorker that will mimic the current ElementsWorker here. You can implement it in arkindex_worker/worker/__init__.py

Some specs

Click to expand

import sys
from itertools import groupby
from operator import itemgetter
from typing import Iterator, List, Tuple
from uuid import UUID

from arkindex_worker.models import Dataset, Element
from arkindex_worker.worker.base import BaseWorker
from arkindex_worker.worker.dataset import DatasetMixin


class DatasetWorker(BaseWorker, DatasetMixin):
    """
    Worker iterating over Arkindex datasets rather than individual elements.

    Mirrors the structure of ElementsWorker: configure, list the datasets to
    process, then run `process_dataset` on each one while tracking failures.
    """

    def __init__(
        self,
        description: str = "Arkindex Dataset Worker",
        support_cache: bool = False,
        # Whether this worker is allowed to update the state of the datasets
        # it processes (Open -> Building -> Complete/Error)
        generator: bool = False,
    ):
        """
        :param description: CLI description for this worker.
        :param support_cache: Whether this worker supports the local cache.
        :param generator: Whether this worker may update dataset states.
        """
        super().__init__(description, support_cache)

        # Only support the `--dataset` argument:
        # no need for classes, entity types or worker version handling here.

        self.generator = generator

    def list_dataset_elements_per_set(
        self, dataset: Dataset
    ) -> Iterator[Tuple[str, Iterator[Element]]]:
        """
        Calls `list_dataset_elements` but returns results grouped by set.

        `groupby` only groups consecutive items, so the (set, element) pairs
        are sorted by set name (item 0) before grouping.  Each yielded pair is
        (set_name, iterator over the elements of that set).
        """
        by_set = itemgetter(0)
        return groupby(
            sorted(self.list_dataset_elements(dataset), key=by_set),
            key=by_set,
        )

    def process_dataset(self, dataset: Dataset):
        """Override this method to implement the worker's processing logic."""

    def list_datasets(self) -> List[Dataset] | List[str]:
        """
        Calls ListProcessDatasets if not is_read_only,
        else simply returns the list of dataset IDs passed via CLI.
        """

    def run(self):
        """
        Main entry point: configure the worker, list the datasets to process,
        process each of them, then report how many failed.
        Exits with status 1 when there is nothing to do or everything failed.
        """
        self.configure()

        # List all datasets, either from the Arkindex API
        # or from a direct list of dataset IDs on the CLI
        datasets: List[Dataset] = self.list_datasets()
        if not datasets:
            logger.warning("No datasets to process, stopping.")
            sys.exit(1)

        # Process every dataset
        count = len(datasets)
        failed = 0
        for i, item in enumerate(datasets, start=1):
            dataset = None
            try:
                if self.is_read_only:
                    # Just use the result of list_datasets as the Dataset
                    dataset = item
                else:
                    # Load the dataset using the Arkindex API
                    dataset = Dataset(**self.request("RetrieveDataset", id=item))

                # Check dataset state:
                # if generator, the dataset state must be Open,
                # otherwise it must be Complete.
                # Raise if the state is wrong.

                if self.generator:
                    # Update state to Building
                    logger.info(f"Building {dataset} ({i}/{count})")

                # Process the dataset
                self.process_dataset(dataset)

                if self.generator:
                    # Update state to Complete
                    logger.info(f"Completed {dataset} ({i}/{count})")
            except Exception as e:
                # Handle errors occurring while retrieving or processing
                # this dataset: count it as failed and keep going with the
                # remaining datasets instead of aborting the whole run.
                failed += 1

                # Handle the case where we failed retrieving the dataset:
                # fall back to the raw CLI item for the error message.
                dataset_id = dataset.id if dataset else item
                if self.generator:
                    # Update state to Error
                    ...

                if isinstance(e, ErrorResponse):
                    message = f"An API error occurred while processing dataset {dataset_id}: {e.title} - {e.content}"
                else:
                    message = (
                        f"Failed running worker on dataset {dataset_id}: {repr(e)}"
                    )

                logger.warning(
                    message,
                    # Full traceback only in verbose mode
                    exc_info=e if self.args.verbose else None,
                )

        if failed:
            logger.error(
                f"Ran on {count} datasets: {count - failed} completed, {failed} failed"
            )
            if failed >= count:  # Everything failed!
                sys.exit(1)
Edited by Yoann Schneider