Skip to content

Support ListDatasetElements

Since Arkindex 1.5.1, the `ListDatasetElements` endpoint is available and must be used by the pre-processing worker to list all the elements needed to create archives.

We will need to implement a brand new `DatasetWorker` that mimics the current `ElementsWorker` here. The worker will use this class as its base class.

Some specs

Click to expand

import sys
from itertools import groupby
from operator import itemgetter
from typing import Iterator, List, Tuple
from uuid import UUID

from arkindex_worker import logger
from arkindex_worker.models import Dataset, Element
from arkindex_worker.worker.base import BaseWorker
from arkindex_worker.worker.dataset import DatasetMixin

# NOTE(review): `ErrorResponse` is used in `DatasetWorker.run` but is not imported
# here — presumably `apistar.exceptions.ErrorResponse`; confirm and add the import.


class DatasetWorker(BaseWorker, DatasetMixin):
    """
    Base class for workers that process whole datasets, one dataset at a time.

    Subclasses override `process_dataset`. `run` configures the worker, lists
    the datasets through `list_datasets`, processes each of them and reports
    how many failed.
    """

    def __init__(
        self,
        # NOTE(review): the default still reads "Elements", presumably a leftover
        # from ElementsWorker. Left unchanged so the default seen by callers is stable.
        description: str = "Arkindex Elements Worker",
        support_cache: bool = False,
        creator=False,
    ):
        """
        :param description: Description forwarded to `BaseWorker.__init__`.
        :param support_cache: Forwarded to `BaseWorker.__init__`.
        :param creator: Whether this worker builds the datasets it processes.
            When set, `run` is expected to move each dataset through the
            Building / Complete / Error states (state updates are not implemented yet).
        """
        super().__init__(description, support_cache)

        # Only support `--dataset` argument
        # No need for classes, entity types, worker version

        self.creator = creator

    def list_dataset_elements_per_set(
        self, dataset: Dataset
    ) -> Iterator[Tuple[str, Iterator[Tuple[str, Element]]]]:
        """
        Calls `list_dataset_elements` but returns results grouped by Set.

        The items are sorted on their first field before grouping, because
        `itertools.groupby` only merges consecutive items sharing a key.

        :param dataset: Dataset whose elements are listed.
        :returns: An iterator of `(key, group)` pairs, where `key` is the first
            field of each listed item (assumed to be the set name — TODO confirm
            against `DatasetMixin.list_dataset_elements`) and `group` lazily
            iterates over the full items sharing that key. As with any
            `groupby` result, a group must be consumed before advancing to
            the next one.
        """
        by_set = itemgetter(0)
        return groupby(
            sorted(self.list_dataset_elements(dataset), key=by_set),
            key=by_set,
        )

    def process_dataset(self, dataset: Dataset):
        """
        Process a single dataset. To override in subclasses.

        :param dataset: The dataset to process.
        """

    # The annotation is quoted so that the `|` union is not evaluated at
    # definition time, which would fail on Python versions older than 3.10.
    def list_datasets(self) -> "List[Dataset] | List[str]":
        """
        Calls ListProcessDatasets if not is_read_only,
        else simply give the list of IDs passed via CLI

        Not implemented yet: this stub returns None, which `run` treats as
        "nothing to process".
        """

    def run(self):
        """
        Process every dataset returned by `list_datasets`.

        Exits with status 1 when there is nothing to process or when every
        dataset failed. A failure on a single dataset is logged and counted,
        and does not stop the loop.
        """
        self.configure()

        # List all datasets, either from the process (API)
        # or from the IDs given on the CLI (read-only mode)
        datasets: List[Dataset] | List[str] = self.list_datasets()
        if not datasets:
            logger.warning("No datasets to process, stopping.")
            sys.exit(1)

        # Process every dataset
        count = len(datasets)
        failed = 0
        for i, item in enumerate(datasets, start=1):
            dataset = None
            try:
                if not self.is_read_only:
                    # `list_datasets` already returned Dataset objects:
                    # just use the item as the Dataset
                    dataset = item
                else:
                    # Read-only mode only provides IDs:
                    # load the dataset using the Arkindex API
                    dataset = Dataset(**self.request("RetrieveDataset", id=item))

                # Check dataset state
                # If creator, dataset state must be Open
                # Else, dataset state must be Complete
                # Raise if wrong state

                if self.creator:
                    # Update State to Building
                    logger.info(f"Building {dataset} ({i}/{count})")

                # Process the dataset
                self.process_dataset(dataset)

                if self.creator:
                    # Update State to Complete
                    logger.info(f"Completed {dataset} ({i}/{count})")
            except Exception as e:
                # Handle errors occurring while retrieving or processing this dataset.
                # The dataset is counted as failed and the loop moves on to the next one.
                failed += 1

                # Handle the case where we failed retrieving the dataset:
                # `dataset` is still None and `item` is the raw ID
                dataset_id = dataset.id if dataset else item
                if self.creator:
                    # Update State to Error
                    ...

                if isinstance(e, ErrorResponse):
                    message = f"An API error occurred while processing dataset {dataset_id}: {e.title} - {e.content}"
                else:
                    message = (
                        f"Failed running worker on dataset {dataset_id}: {repr(e)}"
                    )

                # Only attach the traceback when running in verbose mode
                logger.warning(
                    message,
                    exc_info=e if self.args.verbose else None,
                )

        if failed:
            logger.error(
                f"Ran on {count} dataset: {count - failed} completed, {failed} failed"
            )
            if failed >= count:  # Everything failed!
                sys.exit(1)


class DatasetExtractor(DatasetWorker):
    """Dataset worker whose processing step is still a placeholder."""

    def process_dataset(self, dataset: Dataset):
        """
        Process one dataset.

        Not implemented yet: the intent is to call
        `list_dataset_elements_per_set` to do the process.
        """
        return None


def main():
    """Build a `DatasetExtractor` with cache support and `creator=True`, then run it."""
    extractor = DatasetExtractor(
        creator=True,
        support_cache=True,
        description="Fill base-worker cache with information about dataset and extract images",
    )
    extractor.run()
Edited by Yoann Schneider