# -*- coding: utf-8 -*-
import argparse
import json
import logging
import os
import sys
import uuid

from arkindex import ArkindexClient, options_from_env

from arkindex_worker import logger
from arkindex_worker.models import Element
from arkindex_worker.reporting import Reporter


class BaseWorker(object):
    def __init__(self, description="Arkindex Base Worker"):
        self.parser = argparse.ArgumentParser(description=description)

        # Setup workdir either in Ponos environment or on host's home
        if os.environ.get("PONOS_DATA"):
            self.work_dir = os.path.join(os.environ["PONOS_DATA"], "current")
        else:
            # We use the official XDG convention to store files for developers
            # https://specifications.freedesktop.org/basedir-spec/basedir-spec-latest.html
            xdg_data_home = os.environ.get(
                "XDG_DATA_HOME", os.path.expanduser("~/.local/share")
            )
            self.work_dir = os.path.join(xdg_data_home, "arkindex")
            os.makedirs(self.work_dir, exist_ok=True)

        logger.info(f"Worker will use {self.work_dir} as working directory")

    def configure(self):
        """
        Configure the worker using CLI arguments and environment variables
        """
        self.parser.add_argument(
            "-v",
            "--verbose",
            help="Display more information on events and errors",
            action="store_true",
            default=False,
        )

        # Add potential extra arguments from subclasses
        self.add_arguments()

        # CLI args are stored on the instance so that implementations can access them
        self.args = self.parser.parse_args()

        # Setup logging level
        if self.args.verbose:
            logger.setLevel(logging.DEBUG)
            logger.debug("Debug output enabled")

        # Build Arkindex API client from environment variables
        self.api_client = ArkindexClient(**options_from_env())
        logger.debug(f"Setup Arkindex API client on {self.api_client.document.url}")

    def add_arguments(self):
        """Override this method to add argparse arguments to this worker"""

    def run(self):
        """Override this method to implement your own process"""


class ElementsWorker(BaseWorker):
    def __init__(self, description="Arkindex Elements Worker"):
        super().__init__(description)

        # Add report concerning elements
        self.report = Reporter("unknown worker")

        # Add mandatory arguments to process elements
        self.parser.add_argument(
            "--elements-list",
            help="JSON elements list to use",
            type=open,
            default=os.environ.get("TASK_ELEMENTS"),
        )
        self.parser.add_argument(
            "--element",
            type=uuid.UUID,
            nargs="+",
            help="One or more Arkindex element ID",
        )

    def list_elements(self):
        out = []

        # Process elements from the JSON file
        if self.args.elements_list:
            data = json.load(self.args.elements_list)
            assert isinstance(data, list), "Elements list must be a list"
            assert len(data), "No elements in elements list"
            out += list(filter(None, [element.get("id") for element in data]))

        # Add any extra elements from the CLI
        if self.args.element:
            out += self.args.element

        return out

    def run(self):
        """
        Process every element from the provided list
        """
        self.configure()

        # List all elements, either from the JSON file
        # or the direct list of elements on the CLI
        elements = self.list_elements()
        if not elements:
            logger.warning("No elements to process, stopping.")
            sys.exit(1)

        # Process every element
        count = len(elements)
        failed = 0
        for i, element_id in enumerate(elements, start=1):
            try:
                # Load element using the Arkindex API
                element = Element(
                    **self.api_client.request("RetrieveElement", id=element_id)
                )
                logger.info(f"Processing {element} ({i}/{count})")
                self.process_element(element)
            except Exception as e:
                failed += 1
                logger.warning(
                    "Failed running worker on {}: {!r}".format(element_id, e),
                    exc_info=e if self.args.verbose else None,
                )
                self.report.error(element_id, e)

        # Save report as a local artifact
        self.report.save(os.path.join(self.work_dir, "ml_report.json"))

        if failed:
            logger.error(
                "Ran on {} elements: {} completed, {} failed".format(
                    count, count - failed, failed
                )
            )

        if failed >= count:  # Everything failed!
            sys.exit(1)

    def process_element(self, element):
        """Override this method to analyze an Arkindex element from the provided list"""
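# --- Illustrative usage sketch (not part of the original module) ---
# A minimal, hypothetical subclass showing the intended extension points:
# implement process_element() (and optionally add_arguments()), then call run()
# from a script entrypoint. The DemoWorker name, the --dry-run flag and the log
# message are assumptions for illustration only; the Arkindex API credentials
# are still expected in the environment, as read by options_from_env().
class DemoWorker(ElementsWorker):
    def add_arguments(self):
        # Extra CLI arguments are added on the shared argparse parser
        # before configure() calls parse_args()
        self.parser.add_argument("--dry-run", action="store_true", default=False)

    def process_element(self, element):
        # `element` wraps the RetrieveElement API payload; any exception raised
        # here is caught by run(), logged and added to the report
        logger.info(f"Would analyze {element} ({element.id})")


if __name__ == "__main__":
    DemoWorker(description="Demo Arkindex worker").run()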