From 26d00cc991a397d826fcb21afd1a397b1290eb3f Mon Sep 17 00:00:00 2001 From: Valentin Rigal <rigal@teklia.com> Date: Tue, 14 Jan 2025 14:25:22 +0000 Subject: [PATCH] Support feature worker declaration from gitlab.teklia.com --- .../commands/update_system_workers.py | 109 ++++++-- .../commands/test_update_system_workers.py | 236 +++++++++++++++++- arkindex/system_workers.yml | 18 +- 3 files changed, 322 insertions(+), 41 deletions(-) diff --git a/arkindex/process/management/commands/update_system_workers.py b/arkindex/process/management/commands/update_system_workers.py index 49949726e7..50d4fca06f 100644 --- a/arkindex/process/management/commands/update_system_workers.py +++ b/arkindex/process/management/commands/update_system_workers.py @@ -1,11 +1,18 @@ +from collections import defaultdict + +import requests +import yaml from django.conf import settings from django.core.management.base import BaseCommand, CommandError from django.db import transaction from django.db.models import Max -from teklia_toolbox.config import ConfigParser +from teklia_toolbox.config import ConfigParser, ConfigurationError from arkindex.process.models import ArkindexFeature, FeatureUsage, Worker, WorkerType, WorkerVersion, WorkerVersionState +REQUEST_TIMEOUT = (30, 60) +WORKER_YAML_VERSION = 2 + def parse_config(): parser = ConfigParser() @@ -15,16 +22,33 @@ def parse_config(): features_parser = parser.add_subparser("features") for feature in ArkindexFeature: feature_parser = features_parser.add_subparser(feature.value, allow_extra_keys=False, default={}) - feature_parser.add_option("image", type=str) + feature_parser.add_option("image", type=str, default=None) feature_parser.add_option("command", type=str, default=None) - - return parser.parse(settings.BASE_DIR / "system_workers.yml") + teklia_worker_parser = feature_parser.add_subparser("teklia_worker", allow_extra_keys=False, default=None) + teklia_worker_parser.add_option("name", type=str, default=None) + teklia_worker_parser.add_option("version", type=str, default=None) + teklia_worker_parser.add_option("slug", type=str, default=None) + + parsed = parser.parse(settings.BASE_DIR / "system_workers.yml") + + errors = defaultdict(list) + for feature, config in parsed["features"].items(): + if config["image"] and config["teklia_worker"]: + errors[feature].append("Exactly one of image/command or teklia_parser must be set") + continue + if (config["command"] and config["image"] is None): + errors[feature].append("command argument must be set with the image argument") + if (subparser := config["teklia_worker"]) and (None in subparser.values()): + errors[feature].append("teklia_parser configuration must define a name, a version and a slug") + if errors: + raise ConfigurationError(errors) + return parsed class Command(BaseCommand): help = "Update the workers used to provide Arkindex features to the versions compatible with this release." - def get_system_worker(self, feature: ArkindexFeature) -> Worker: + def get_system_worker(self, feature: ArkindexFeature, repo = None) -> Worker: """ Update or create a `system` worker for this feature. Creates the `system` worker type if it does not exist. @@ -48,6 +72,7 @@ class Command(BaseCommand): worker, created = Worker.objects.get_or_create( type=worker_type, slug=feature.value, + repository_url=repo, defaults={ "name": feature.name, "public": True, @@ -58,7 +83,7 @@ class Command(BaseCommand): self.stdout.write(f"Created new {worker.name} system worker") else: self.stdout.write(f"Using existing system worker {worker.name}") - self.update_existing_worker(worker) + self. update_existing_worker(worker) return worker @@ -83,7 +108,7 @@ class Command(BaseCommand): else: self.stdout.write("Worker is up to date") - def update_or_create_version(self, worker: Worker, feature: ArkindexFeature, docker_image: str, docker_command: str = None) -> None: + def update_or_create_version(self, worker: Worker, feature: ArkindexFeature, docker_image: str, docker_command: str = None, configuration: dict = {}) -> None: """ On a specified worker, assigns an existing version to a feature or creates a new one. Expects that no version is already assigned to this feature on any worker. @@ -97,11 +122,9 @@ class Command(BaseCommand): # Sorting by state means we prefer `available` versions first, then `created`, then `error`. worker_version = worker.versions.filter( docker_image_iid=docker_image, - # We ignore existing versions with attributes that could interfere with the features, - # like blocking a start or retry, or wasting resources. - configuration__user_configuration__isnull=True, gpu_usage=FeatureUsage.Disabled, model_usage=FeatureUsage.Disabled, + configuration=configuration, **docker_command_filter ).order_by("state", "-updated").first() @@ -130,11 +153,11 @@ class Command(BaseCommand): feature=feature, state=WorkerVersionState.Available, version=max_version + 1, - configuration={"docker": {"command": docker_command}} if docker_command else {} + configuration=configuration, ) self.stdout.write(self.style.SUCCESS(f"Using new worker version {worker_version.id}")) - def check_existing_version(self, worker_version: WorkerVersion, feature: ArkindexFeature, docker_image: str, docker_command: str = None) -> None: + def check_existing_version(self, worker_version: WorkerVersion, feature: ArkindexFeature, *, docker_image: str, docker_command: str = None, configuration: dict = {}, repo: str = None) -> None: self.stdout.write(f"Current worker version: {worker_version.id} ({worker_version.docker_image_iid})") valid = True @@ -158,8 +181,10 @@ class Command(BaseCommand): self.stdout.write(self.style.WARNING("This version uses a custom Docker command which could interfere with the feature.")) valid = False - if worker_version.required_user_configuration_fields: - self.stdout.write(self.style.WARNING("This version requires a custom worker configuration which could interfere with the feature.")) + if worker_version.configuration != configuration: + self.stdout.write(self.style.WARNING( + "This version uses a custom configuration which could interfere with the feature." + )) valid = False if valid: @@ -176,26 +201,64 @@ class Command(BaseCommand): worker = worker_version.worker if worker.archived is not None: # except if it is archived, since the new version would be invalid - worker = self.get_system_worker(feature) + worker = self.get_system_worker(feature, repo) else: self.update_existing_worker(worker_version.worker) - self.update_or_create_version(worker, feature, docker_image, docker_command) + self.update_or_create_version(worker, feature, docker_image, docker_command, configuration) @transaction.atomic - def update_feature(self, feature: ArkindexFeature, config: dict): - self.stdout.write(f" {feature.name} ".center(80, "─")) - self.stdout.write(f"Using {config['image']} to provide {feature.name}") + def update_feature(self, feature: ArkindexFeature, *, image, command, configuration = {}, repo = None): + self.stdout.write(f"Using {image} to provide {feature.name}") try: worker_version = WorkerVersion.objects.get_by_feature(feature) - self.check_existing_version(worker_version, feature, config["image"], config["command"]) + self.check_existing_version( + worker_version, + feature, + docker_image=image, + docker_command=command, + configuration=configuration + ) except WorkerVersion.DoesNotExist: - worker = self.get_system_worker(feature) - self.update_or_create_version(worker, feature, config["image"], config["command"]) + worker = self.get_system_worker(feature, repo) + self.update_or_create_version(worker, feature, image, command, configuration) + + def update_feature_from_worker(self, feature, *, name, version, slug): + """ + Update a feature from a worker repository hosted on https://gitlab.teklia.com/ + """ + repo = f"https://gitlab.teklia.com/{name}" + self.stdout.write(f"Configuring feature {feature} from {repo}") + + # Retrieve the .arkindex.yml file with no auth + url = f"{repo}/-/raw/{version}/.arkindex.yml" + with requests.get(url, timeout=REQUEST_TIMEOUT) as resp: + try: + resp.raise_for_status() + except requests.exceptions.HTTPError as e: + raise CommandError(f"Error retrieving configuration at {url}: {e.response.status_code}.") + data = yaml.safe_load(resp.content) + if not isinstance(data, dict) or data.get("version", 0) < WORKER_YAML_VERSION or not data.get("workers"): + raise CommandError(f"Error retrieving configuration at {url}: invalid YAML configuration.") + # Look for the worker matching feature's slug + worker_conf = next((worker for worker in data["workers"] if worker["slug"] == slug), None) + if worker_conf is None: + raise CommandError(f"No worker with slug {slug} in .arkindex.yml at {url}.") + image = f"registry.gitlab.teklia.com/{name}/{version}" + command = worker_conf.get("docker", {}).get("command", None) + self.update_feature(feature, image=image, command=command, configuration=worker_conf, repo=repo) def handle(self, *args, **options): config = parse_config() for feature_value, feature_config in config["features"].items(): feature = ArkindexFeature(feature_value) - self.update_feature(feature, feature_config) + self.stdout.write(f" {feature.name} ".center(80, "─")) + worker = feature_config.pop("teklia_worker", None) + if worker is not None: + self.update_feature_from_worker(feature, **worker) + else: + configuration = {} + if (command := feature_config["command"]): + configuration["docker"] = {"command": command} + self.update_feature(feature, **feature_config, configuration=configuration) diff --git a/arkindex/process/tests/commands/test_update_system_workers.py b/arkindex/process/tests/commands/test_update_system_workers.py index 30736c3954..a626942629 100644 --- a/arkindex/process/tests/commands/test_update_system_workers.py +++ b/arkindex/process/tests/commands/test_update_system_workers.py @@ -2,13 +2,16 @@ from io import StringIO from textwrap import dedent from unittest.mock import patch +import responses from django.core.management import CommandError, call_command from django.utils import timezone from arkindex.process.models import ArkindexFeature, FeatureUsage, Worker, WorkerType, WorkerVersion, WorkerVersionState from arkindex.project.tests import ArkindexTestCase -MOCK_CONFIG = { + +def mock_config(): + return { "features": { "file_import": {"image": "registry.example.com/file-import:latest", "command": None}, "init_elements": @@ -19,6 +22,19 @@ MOCK_CONFIG = { } } +def mock_teklia_worker_config(): + return { + "features": { + "file_import": { + "teklia_worker": { + "name": "repository/import/file", + "version": "0.1.0", + "slug": "file-import", + } + } + } + } + class TestUpdateSystemWorkers(ArkindexTestCase): @@ -41,8 +57,8 @@ class TestUpdateSystemWorkers(ArkindexTestCase): Assert that a WorkerVersion has the expected attributes to provide a given feature """ self.assertEqual(worker_version.feature, feature) - self.assertEqual(worker_version.docker_image_iid, MOCK_CONFIG["features"][feature.value]["image"]) - self.assertEqual(worker_version.docker_command, MOCK_CONFIG["features"][feature.value]["command"]) + self.assertEqual(worker_version.docker_image_iid, mock_config()["features"][feature.value]["image"]) + self.assertEqual(worker_version.docker_command, mock_config()["features"][feature.value]["command"]) self.assertEqual(worker_version.state, WorkerVersionState.Available) self.assertEqual(worker_version.required_user_configuration_fields, set()) self.assertEqual(worker_version.gpu_usage, FeatureUsage.Disabled) @@ -51,7 +67,7 @@ class TestUpdateSystemWorkers(ArkindexTestCase): self.assertIsNone(worker_version.worker.archived) self.assertTrue(worker_version.worker.public) - @patch("arkindex.process.management.commands.update_system_workers.parse_config", lambda: MOCK_CONFIG) + @patch("arkindex.process.management.commands.update_system_workers.parse_config", mock_config) def test_from_scratch(self): output = self.update_system_workers() @@ -122,7 +138,7 @@ class TestUpdateSystemWorkers(ArkindexTestCase): """ ).strip()) - @patch("arkindex.process.management.commands.update_system_workers.parse_config", lambda: MOCK_CONFIG) + @patch("arkindex.process.management.commands.update_system_workers.parse_config", mock_config) def test_existing_worker_type(self): worker_type = WorkerType.objects.create(slug="system", display_name="Système") @@ -189,7 +205,7 @@ class TestUpdateSystemWorkers(ArkindexTestCase): """ ).strip()) - @patch("arkindex.process.management.commands.update_system_workers.parse_config", lambda: MOCK_CONFIG) + @patch("arkindex.process.management.commands.update_system_workers.parse_config", mock_config) def test_existing_system_workers(self): worker_type = WorkerType.objects.create(slug="system", display_name="Système") file_import_worker = Worker.objects.create( @@ -283,7 +299,7 @@ class TestUpdateSystemWorkers(ArkindexTestCase): """ ).strip()) - @patch("arkindex.process.management.commands.update_system_workers.parse_config", lambda: MOCK_CONFIG) + @patch("arkindex.process.management.commands.update_system_workers.parse_config", mock_config) def test_worker_slug_conflict(self): worker_type = WorkerType.objects.create(slug="systemnt", display_name="Systemn't") @@ -311,7 +327,7 @@ class TestUpdateSystemWorkers(ArkindexTestCase): """ ).strip()) - @patch("arkindex.process.management.commands.update_system_workers.parse_config", lambda: MOCK_CONFIG) + @patch("arkindex.process.management.commands.update_system_workers.parse_config", mock_config) def test_update_existing_system_workers(self): worker_type = WorkerType.objects.create(slug="system", display_name="Système") file_import_worker = Worker.objects.create( @@ -413,7 +429,7 @@ class TestUpdateSystemWorkers(ArkindexTestCase): """ ).strip()) - @patch("arkindex.process.management.commands.update_system_workers.parse_config", lambda: MOCK_CONFIG) + @patch("arkindex.process.management.commands.update_system_workers.parse_config", mock_config) def test_assigns_existing_compatible_versions(self): """ The command should assign the feature to an existing WorkerVersion compatible with it, @@ -576,7 +592,7 @@ class TestUpdateSystemWorkers(ArkindexTestCase): pagexml_export_version.refresh_from_db() self.check_feature_version(pagexml_export_version, ArkindexFeature.ExportPageXML) - @patch("arkindex.process.management.commands.update_system_workers.parse_config", lambda: MOCK_CONFIG) + @patch("arkindex.process.management.commands.update_system_workers.parse_config", mock_config) def test_unassigns_incompatible_versions(self): worker_type = WorkerType.objects.create(slug="systemnt", display_name="Systemn't") file_import_worker = Worker.objects.create( @@ -671,7 +687,7 @@ class TestUpdateSystemWorkers(ArkindexTestCase): Current worker version: {init_elements_version.id} (registry.gitlab.teklia.com/callico/callico:latest) This version has an invalid Docker image. This version uses a custom Docker command which could interfere with the feature. - This version requires a custom worker configuration which could interfere with the feature. + This version uses a custom configuration which could interfere with the feature. Unassigning feature from the current version Worker is up to date Creating new worker version @@ -698,7 +714,7 @@ class TestUpdateSystemWorkers(ArkindexTestCase): """ ).strip()) - @patch("arkindex.process.management.commands.update_system_workers.parse_config", lambda: MOCK_CONFIG) + @patch("arkindex.process.management.commands.update_system_workers.parse_config", mock_config) def test_noop(self): worker_type = WorkerType.objects.create(slug="system", display_name="Système") worker = Worker.objects.create(type=worker_type, slug="worker", name="Worker", public=True) @@ -790,3 +806,199 @@ class TestUpdateSystemWorkers(ArkindexTestCase): self.check_feature_version(pdf_export_version, ArkindexFeature.ExportPDF) pagexml_export_version.refresh_from_db() self.check_feature_version(pagexml_export_version, ArkindexFeature.ExportPageXML) + + @responses.activate + @patch("arkindex.process.management.commands.update_system_workers.parse_config", mock_teklia_worker_config) + def test_teklia_worker_invalid_slug(self): + responses.add( + responses.GET, + "https://gitlab.teklia.com/repository/import/file/-/raw/0.1.0/.arkindex.yml", + json={"version": 2, "workers": [{"slug": "no"}]}, + ) + with self.assertRaisesMessage(CommandError, ( + "No worker with slug file-import in .arkindex.yml at " + "https://gitlab.teklia.com/repository/import/file/-/raw/0.1.0/.arkindex.yml." + )): + self.update_system_workers() + + @responses.activate + @patch("arkindex.process.management.commands.update_system_workers.parse_config", mock_teklia_worker_config) + def test_teklia_worker_invalid_yaml(self): + responses.add( + responses.GET, + "https://gitlab.teklia.com/repository/import/file/-/raw/0.1.0/.arkindex.yml", + json={"version": 42} + ) + with self.assertRaisesMessage(CommandError, ( + "Error retrieving configuration at " + "https://gitlab.teklia.com/repository/import/file/-/raw/0.1.0/.arkindex.yml: " + "invalid YAML configuration" + )): + self.update_system_workers() + + @responses.activate + @patch("arkindex.process.management.commands.update_system_workers.parse_config", mock_teklia_worker_config) + def test_teklia_worker_from_scratch(self): + responses.add( + responses.GET, + "https://gitlab.teklia.com/repository/import/file/-/raw/0.1.0/.arkindex.yml", + json={ + "version": 2, + "workers": [{ + "slug": "file-import", + "name": "File Import", + "type": "import_type", + "description": "test", + }], + }, + ) + + with self.assertRaises(WorkerVersion.DoesNotExist): + WorkerVersion.objects.get_by_feature(ArkindexFeature.FileImport) + + output = self.update_system_workers() + worker_version = WorkerVersion.objects.get_by_feature(ArkindexFeature.FileImport) + self.assertEqual(output, dedent( + f""" + ────────────────────────────────── FileImport ────────────────────────────────── + Configuring feature Fileimport from https://gitlab.teklia.com/repository/import/file + Using registry.gitlab.teklia.com/repository/import/file/0.1.0 to provide FileImport + Created new System worker type ({worker_version.worker.type_id}) + Created new FileImport system worker + Creating new worker version + Using new worker version {worker_version.id} + """ + ).strip()) + self.assertEqual(worker_version.configuration, { + "description": "test", + "name": "File Import", + "slug": "file-import", + "type": "import_type", + }) + self.assertEqual(worker_version.docker_image_iid, "registry.gitlab.teklia.com/repository/import/file/0.1.0") + self.assertEqual(worker_version.feature, ArkindexFeature.FileImport) + self.assertEqual(worker_version.state, WorkerVersionState.Available) + self.assertEqual(worker_version.version, 1) + self.assertEqual(worker_version.docker_image_iid, "registry.gitlab.teklia.com/repository/import/file/0.1.0") + self.assertEqual(worker_version.worker.name, "FileImport") + self.assertEqual(worker_version.worker.repository_url, "https://gitlab.teklia.com/repository/import/file") + + # Retrying does not averride the version + self.assertEqual(self.update_system_workers(), dedent( + f""" + ────────────────────────────────── FileImport ────────────────────────────────── + Configuring feature Fileimport from https://gitlab.teklia.com/repository/import/file + Using registry.gitlab.teklia.com/repository/import/file/0.1.0 to provide FileImport + Current worker version: {worker_version.id} (registry.gitlab.teklia.com/repository/import/file/0.1.0) + Worker is up to date + Worker version for FileImport is up to date + """ + ).strip()) + + @responses.activate + @patch("arkindex.process.management.commands.update_system_workers.parse_config", mock_teklia_worker_config) + def test_teklia_worker_unassign(self): + worker_type = WorkerType.objects.create(slug="system", display_name="Système") + file_import_worker = Worker.objects.create( + type=worker_type, + slug="file_import", + name="Sir File-a-Lot", + public=True, + ) + version = file_import_worker.versions.create( + version=1, + docker_image_iid="test", + state=WorkerVersionState.Available, + feature=ArkindexFeature.FileImport, + ) + responses.add( + responses.GET, + "https://gitlab.teklia.com/repository/import/file/-/raw/0.1.0/.arkindex.yml", + json={ + "version": 2, + "workers": [{ + "slug": "file-import", + "name": "File Import", + "type": "import_type", + "description": "test", + }], + }, + ) + + output = self.update_system_workers() + worker_version = WorkerVersion.objects.get_by_feature(ArkindexFeature.FileImport) + self.assertEqual(output, dedent( + f""" + ────────────────────────────────── FileImport ────────────────────────────────── + Configuring feature Fileimport from https://gitlab.teklia.com/repository/import/file + Using registry.gitlab.teklia.com/repository/import/file/0.1.0 to provide FileImport + Current worker version: {version.id} (test) + This version has an invalid Docker image. + This version uses a custom configuration which could interfere with the feature. + Unassigning feature from the current version + Worker is up to date + Creating new worker version + Using new worker version {worker_version.id} + """ + ).strip()) + self.assertEqual(worker_version.configuration, { + "description": "test", + "name": "File Import", + "slug": "file-import", + "type": "import_type", + }) + self.assertEqual(worker_version.docker_image_iid, "registry.gitlab.teklia.com/repository/import/file/0.1.0") + self.assertEqual(worker_version.feature, ArkindexFeature.FileImport) + self.assertEqual(worker_version.state, WorkerVersionState.Available) + self.assertEqual(worker_version.version, 2) + self.assertEqual(worker_version.worker.name, "Sir File-a-Lot") + self.assertEqual(worker_version.worker.repository_url, None) + + @responses.activate + @patch("arkindex.process.management.commands.update_system_workers.parse_config", mock_teklia_worker_config) + def test_teklia_worker_existing(self): + worker_type = WorkerType.objects.create(slug="system", display_name="Système") + worker_conf = { + "slug": "file-import", + "name": "File Import", + "type": "import_type", + "description": "test", + } + file_import_worker = Worker.objects.create( + type=worker_type, + slug="file_import", + name="Sir File-a-Lot", + public=True, + ) + version = file_import_worker.versions.create( + version=1, + docker_image_iid="registry.gitlab.teklia.com/repository/import/file/0.1.0", + state=WorkerVersionState.Available, + feature=ArkindexFeature.FileImport, + configuration=worker_conf, + ) + responses.add( + responses.GET, + "https://gitlab.teklia.com/repository/import/file/-/raw/0.1.0/.arkindex.yml", + json={"version": 2, "workers": [worker_conf]}, + ) + + output = self.update_system_workers() + self.assertEqual(WorkerVersion.objects.count(), 1) + worker_version = WorkerVersion.objects.get_by_feature(ArkindexFeature.FileImport) + self.assertEqual(output, dedent( + f""" + ────────────────────────────────── FileImport ────────────────────────────────── + Configuring feature Fileimport from https://gitlab.teklia.com/repository/import/file + Using registry.gitlab.teklia.com/repository/import/file/0.1.0 to provide FileImport + Current worker version: {version.id} (registry.gitlab.teklia.com/repository/import/file/0.1.0) + Worker is up to date + Worker version for FileImport is up to date + """ + ).strip()) + self.assertEqual(worker_version.docker_image_iid, "registry.gitlab.teklia.com/repository/import/file/0.1.0") + self.assertEqual(worker_version.feature, ArkindexFeature.FileImport) + self.assertEqual(worker_version.state, WorkerVersionState.Available) + self.assertEqual(worker_version.version, 1) + self.assertEqual(worker_version.worker.name, "Sir File-a-Lot") + self.assertEqual(worker_version.worker.repository_url, None) diff --git a/arkindex/system_workers.yml b/arkindex/system_workers.yml index 4b50def7e9..ccd29e3d69 100644 --- a/arkindex/system_workers.yml +++ b/arkindex/system_workers.yml @@ -12,11 +12,17 @@ features: s3_ingest: image: registry.gitlab.teklia.com/arkindex/workers/import/s3:0.2.0-rc2 pdf_export: - image: registry.gitlab.teklia.com/arkindex/workers/export:0.2.1-rc1 - command: worker-export-pdf + teklia_worker: + name: arkindex/workers/export + version: 0.2.1-rc1 + slug: pdf-export pagexml_export: - image: registry.gitlab.teklia.com/arkindex/workers/export:0.2.1-rc1 - command: worker-export-pagexml + teklia_worker: + name: arkindex/workers/export + version: 0.2.1-rc1 + slug: pagexml-export docx_export: - image: registry.gitlab.teklia.com/arkindex/workers/export:0.2.1-rc1 - command: worker-export-docx + teklia_worker: + name: arkindex/workers/export + version: 0.2.1-rc1 + slug: docx-export -- GitLab