diff --git a/MANIFEST.in b/MANIFEST.in index cdd0d8a105feab5edc128c1e1db0263e4b140716..82d7694773df8ed75a694c4691c3e7b9d042378f 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -7,3 +7,4 @@ recursive-include arkindex/templates *.html recursive-include arkindex/templates *.json recursive-include arkindex/templates *.txt include arkindex/documents/export/*.sql +include arkindex/system_workers.yml diff --git a/arkindex/process/management/commands/update_system_workers.py b/arkindex/process/management/commands/update_system_workers.py new file mode 100644 index 0000000000000000000000000000000000000000..37d5467b5c975709149d8c1f2977156030c23d94 --- /dev/null +++ b/arkindex/process/management/commands/update_system_workers.py @@ -0,0 +1,184 @@ +from django.conf import settings +from django.core.management.base import BaseCommand, CommandError +from django.db import transaction +from django.db.models import Max +from teklia_toolbox.config import ConfigParser + +from arkindex.process.models import ArkindexFeature, FeatureUsage, Worker, WorkerType, WorkerVersion, WorkerVersionState + + +def parse_config(): + parser = ConfigParser() + parser.add_option("version", type=str) + + feature_parser = parser.add_subparser("features") + for feature in ArkindexFeature: + # Remove this after file imports are migrated to a worker + if feature == ArkindexFeature.FileImport: + continue + + feature_parser.add_option(feature.value, type=str) + + return parser.parse(settings.BASE_DIR / "system_workers.yml") + + +class Command(BaseCommand): + help = "Update the workers used to provide Arkindex features to the versions compatible with this release." + + def get_system_worker(self, feature: ArkindexFeature) -> Worker: + """ + Update or create a `system` worker for this feature. + Creates the `system` worker type if it does not exist. + """ + try: + conflicting_worker = Worker.objects.exclude(type__slug="system").filter(slug=feature.value).get() + self.stdout.write(f"Worker {conflicting_worker.name} ({conflicting_worker.id}) uses the same slug with a different worker type.") + self.stdout.write("Please update this worker's type to `system` or change its slug.") + raise CommandError(f"Cannot create system worker with slug {feature.value} due to a conflict.") + except Worker.DoesNotExist: + pass + + worker_type, created = WorkerType.objects.get_or_create( + slug="system", + defaults={"display_name": "System"}, + ) + + if created: + self.stdout.write(f"Created new System worker type ({worker_type.id})") + + worker, created = Worker.objects.get_or_create( + type=worker_type, + slug=feature.value, + defaults={ + "name": feature.name, + "public": True, + }, + ) + + if created: + self.stdout.write(f"Created new {worker.name} system worker") + else: + self.stdout.write(f"Using existing system worker {worker.name}") + self.update_existing_worker(worker) + + return worker + + def update_existing_worker(self, worker: Worker) -> None: + """ + Check that an existing worker has the correct attributes to have one of its versions provide a feature + """ + updated = False + + if worker.archived is not None: + self.stdout.write("Unarchiving worker") + worker.archived = None + updated = True + + if not worker.public: + self.stdout.write("Marking worker as public") + worker.public = True + updated = True + + if updated: + worker.save() + else: + self.stdout.write("Worker is up to date") + + def update_or_create_version(self, worker: Worker, feature: ArkindexFeature, docker_image: str) -> None: + """ + On a specified worker, assigns an existing version to a feature or creates a new one. + Expects that no version is already assigned to this feature on any worker. + """ + assert worker.archived is None, "Cannot assign a version on an archived worker to a feature" + + # docker_image_iid is not unique, so we sort by state and take the most recent one. + # Sorting by state means we prefer `available` versions first, then `created`, then `error`. + worker_version = worker.versions.filter( + docker_image_iid=docker_image, + # We ignore existing versions with attributes that could interfere with the features, + # like blocking a start or retry, or wasting resources. + configuration__docker__command__isnull=True, + configuration__user_configuration__isnull=True, + gpu_usage=FeatureUsage.Disabled, + model_usage=FeatureUsage.Disabled, + ).order_by("state", "-updated").first() + + if worker_version: + self.stdout.write(f"Assigning existing worker version {worker_version.id} to the feature") + worker_version.feature = feature + + if worker_version.state != WorkerVersionState.Available: + self.stdout.write("Marking the worker version as available") + worker_version.state = WorkerVersionState.Available + + worker_version.save() + self.stdout.write(self.style.SUCCESS(f"Using existing worker version {worker_version.id}")) + return + + self.stdout.write("Creating new worker version") + max_version = worker.versions.aggregate(max_version=Max("version"))["max_version"] or 0 + worker_version = worker.versions.create( + docker_image_iid=docker_image, + feature=feature, + state=WorkerVersionState.Available, + version=max_version + 1, + ) + self.stdout.write(self.style.SUCCESS(f"Using new worker version {worker_version.id}")) + + def check_existing_version(self, worker_version: WorkerVersion, feature: ArkindexFeature, docker_image: str) -> None: + self.stdout.write(f"Current worker version: {worker_version.id} ({worker_version.docker_image_iid})") + valid = True + + if worker_version.docker_image_iid != docker_image: + self.stdout.write(self.style.WARNING("This version has an invalid Docker image.")) + valid = False + + if worker_version.worker.archived is not None: + self.stdout.write(self.style.WARNING("This version is part of an archived worker.")) + valid = False + + if worker_version.configuration.get("docker", {}).get("command"): + self.stdout.write(self.style.WARNING("This version uses a custom Docker command which could interfere with the feature.")) + valid = False + + if worker_version.required_user_configuration_fields: + self.stdout.write(self.style.WARNING("This version requires a custom worker configuration which could interfere with the feature.")) + valid = False + + if valid: + # Ensure the worker is public, and keep using this worker version + self.update_existing_worker(worker_version.worker) + self.stdout.write(self.style.SUCCESS(f"Worker version for {feature.name} is up to date")) + return + + self.stdout.write("Unassigning feature from the current version") + worker_version.feature = None + worker_version.save(update_fields=["feature"]) + + # Create a new version on the existing worker, even if it isn't a system worker + worker = worker_version.worker + if worker.archived is not None: + # except if it is archived, since the new version would be invalid + worker = self.get_system_worker(feature) + else: + self.update_existing_worker(worker_version.worker) + + self.update_or_create_version(worker, feature, docker_image) + + @transaction.atomic + def update_feature(self, feature: ArkindexFeature, docker_image: str): + self.stdout.write(f" {feature.name} ".center(80, "─")) + self.stdout.write(f"Using {docker_image} to provide {feature.name}") + + try: + worker_version = WorkerVersion.objects.get_by_feature(feature) + self.check_existing_version(worker_version, feature, docker_image) + except WorkerVersion.DoesNotExist: + worker = self.get_system_worker(feature) + self.update_or_create_version(worker, feature, docker_image) + + def handle(self, *args, **options): + config = parse_config() + for feature_value, docker_image in config["features"].items(): + feature = ArkindexFeature(feature_value) + self.update_feature(feature, docker_image) diff --git a/arkindex/process/models.py b/arkindex/process/models.py index 8c618e980f0e230efb708d57111fafe7471efd91..2f18d47bbdbe74c307dfa2ed66504866ac2f0d53 100644 --- a/arkindex/process/models.py +++ b/arkindex/process/models.py @@ -780,6 +780,22 @@ class WorkerVersion(models.Model): return return self.configuration["docker"].get("shm_size") + @property + def required_user_configuration_fields(self): + """ + Set of field names defined in this worker version's user configuration + which are required and without a default value. + + A worker configuration will need to be set on the worker run with those fields set + before this worker version can be executed. + """ + return set( + name + for name, options in self.configuration.get("user_configuration", {}).items() + # Only pick the fields that are required and without default values + if options.get("required") and "default" not in options + ) + class WorkerConfiguration(IndexableModel): name = models.CharField(max_length=250, validators=[MinLengthValidator(1)]) diff --git a/arkindex/process/serializers/imports.py b/arkindex/process/serializers/imports.py index 2b3dd034d01ec6b8f26e9d929bae4ef89b4828df..6acda06468f8b55af009f22c2348cc4f680901c8 100644 --- a/arkindex/process/serializers/imports.py +++ b/arkindex/process/serializers/imports.py @@ -415,20 +415,13 @@ class StartProcessSerializer(serializers.Serializer): # If the worker version has a user configuration, check that we aren't missing any required fields if isinstance(worker_run.version.configuration.get("user_configuration"), dict): - required_fields = set( - name - for name, options in worker_run.version.configuration["user_configuration"].items() - # Only pick the fields that are required and without default values - if options.get("required") and "default" not in options - ) - # List all the fields defined on the WorkerRun's configuration if there is one worker_run_fields = set() if worker_run.configuration is not None: worker_run_fields = set(worker_run.configuration.configuration.keys()) # Check that all the worker version's required fields are set in that configuration - if not required_fields.issubset(worker_run_fields): + if not worker_run.version.required_user_configuration_fields.issubset(worker_run_fields): missing_required_configurations.append(worker_run.version.worker.name) if len(missing_model_versions) > 0: diff --git a/arkindex/process/tests/commands/test_update_system_workers.py b/arkindex/process/tests/commands/test_update_system_workers.py new file mode 100644 index 0000000000000000000000000000000000000000..e3b4849c60a447470b2f23ace9df70e1cb933e2c --- /dev/null +++ b/arkindex/process/tests/commands/test_update_system_workers.py @@ -0,0 +1,479 @@ +from io import StringIO +from textwrap import dedent +from unittest.mock import patch + +from django.core.management import CommandError, call_command +from django.utils import timezone + +from arkindex.process.models import ArkindexFeature, FeatureUsage, Worker, WorkerType, WorkerVersion, WorkerVersionState +from arkindex.project.tests import ArkindexTestCase + +MOCK_CONFIG = { + "features": { + "init_elements": "registry.example.com/init-elements:latest", + "s3_ingest": "registry.example.com/s3-ingest:latest", + } +} + + +class TestUpdateSystemWorkers(ArkindexTestCase): + + def update_system_workers(self): + output = StringIO() + call_command( + "update_system_workers", + ["--no-color"], + stdout=output, + stderr=output, + ) + return output.getvalue().strip() + + def check_feature_version(self, worker_version, feature): + """ + Assert that a WorkerVersion has the expected attributes to provide a given feature + """ + self.assertEqual(worker_version.feature, feature) + self.assertEqual(worker_version.docker_image_iid, MOCK_CONFIG["features"][feature.value]) + self.assertEqual(worker_version.state, WorkerVersionState.Available) + self.assertNotIn("command", worker_version.configuration.get("docker", {})) + self.assertEqual(worker_version.required_user_configuration_fields, set()) + self.assertEqual(worker_version.gpu_usage, FeatureUsage.Disabled) + self.assertEqual(worker_version.model_usage, FeatureUsage.Disabled) + + self.assertIsNone(worker_version.worker.archived) + self.assertTrue(worker_version.worker.public) + + @patch("arkindex.process.management.commands.update_system_workers.parse_config", lambda: MOCK_CONFIG) + def test_from_scratch(self): + output = self.update_system_workers() + + worker_type = WorkerType.objects.get() + self.assertEqual(worker_type.slug, "system") + self.assertEqual(worker_type.display_name, "System") + + init_elements_version = WorkerVersion.objects.get_by_feature(ArkindexFeature.InitElements) + self.check_feature_version(init_elements_version, ArkindexFeature.InitElements) + + s3_ingest_version = WorkerVersion.objects.get_by_feature(ArkindexFeature.S3Ingest) + self.check_feature_version(s3_ingest_version, ArkindexFeature.S3Ingest) + + # Check the attributes on the two new workers + self.assertEqual(init_elements_version.worker.name, "InitElements") + self.assertEqual(init_elements_version.worker.slug, "init_elements") + self.assertEqual(init_elements_version.worker.type, worker_type) + self.assertEqual(s3_ingest_version.worker.name, "S3Ingest") + self.assertEqual(s3_ingest_version.worker.slug, "s3_ingest") + self.assertEqual(s3_ingest_version.worker.type, worker_type) + + self.assertEqual(output, dedent( + f""" + ───────────────────────────────── InitElements ───────────────────────────────── + Using registry.example.com/init-elements:latest to provide InitElements + Created new System worker type ({worker_type.id}) + Created new InitElements system worker + Creating new worker version + Using new worker version {init_elements_version.id} + ─────────────────────────────────── S3Ingest ─────────────────────────────────── + Using registry.example.com/s3-ingest:latest to provide S3Ingest + Created new S3Ingest system worker + Creating new worker version + Using new worker version {s3_ingest_version.id} + """ + ).strip()) + + @patch("arkindex.process.management.commands.update_system_workers.parse_config", lambda: MOCK_CONFIG) + def test_existing_worker_type(self): + worker_type = WorkerType.objects.create(slug="system", display_name="Système") + + output = self.update_system_workers() + + self.assertEqual(WorkerType.objects.count(), 1) + init_elements_version = WorkerVersion.objects.get_by_feature(ArkindexFeature.InitElements) + self.check_feature_version(init_elements_version, ArkindexFeature.InitElements) + + s3_ingest_version = WorkerVersion.objects.get_by_feature(ArkindexFeature.S3Ingest) + self.check_feature_version(s3_ingest_version, ArkindexFeature.S3Ingest) + + # Check the attributes on the two new workers + self.assertEqual(init_elements_version.worker.name, "InitElements") + self.assertEqual(init_elements_version.worker.slug, "init_elements") + self.assertEqual(init_elements_version.worker.type, worker_type) + self.assertEqual(s3_ingest_version.worker.name, "S3Ingest") + self.assertEqual(s3_ingest_version.worker.slug, "s3_ingest") + self.assertEqual(s3_ingest_version.worker.type, worker_type) + + self.assertEqual(output, dedent( + f""" + ───────────────────────────────── InitElements ───────────────────────────────── + Using registry.example.com/init-elements:latest to provide InitElements + Created new InitElements system worker + Creating new worker version + Using new worker version {init_elements_version.id} + ─────────────────────────────────── S3Ingest ─────────────────────────────────── + Using registry.example.com/s3-ingest:latest to provide S3Ingest + Created new S3Ingest system worker + Creating new worker version + Using new worker version {s3_ingest_version.id} + """ + ).strip()) + + @patch("arkindex.process.management.commands.update_system_workers.parse_config", lambda: MOCK_CONFIG) + def test_existing_system_workers(self): + worker_type = WorkerType.objects.create(slug="system", display_name="Système") + init_elements_worker = Worker.objects.create( + type=worker_type, + slug="init_elements", + name="Init of the Elements", + public=True, + ) + s3_ingest_worker = Worker.objects.create( + type=worker_type, + slug="s3_ingest", + name="S3, Ingest of the", + public=True, + ) + + output = self.update_system_workers() + + self.assertEqual(WorkerType.objects.count(), 1) + self.assertEqual(Worker.objects.count(), 2) + + init_elements_version = WorkerVersion.objects.get_by_feature(ArkindexFeature.InitElements) + self.assertEqual(init_elements_version.worker, init_elements_worker) + self.check_feature_version(init_elements_version, ArkindexFeature.InitElements) + + s3_ingest_version = WorkerVersion.objects.get_by_feature(ArkindexFeature.S3Ingest) + self.assertEqual(s3_ingest_version.worker, s3_ingest_worker) + self.check_feature_version(s3_ingest_version, ArkindexFeature.S3Ingest) + + self.assertEqual(output, dedent( + f""" + ───────────────────────────────── InitElements ───────────────────────────────── + Using registry.example.com/init-elements:latest to provide InitElements + Using existing system worker Init of the Elements + Worker is up to date + Creating new worker version + Using new worker version {init_elements_version.id} + ─────────────────────────────────── S3Ingest ─────────────────────────────────── + Using registry.example.com/s3-ingest:latest to provide S3Ingest + Using existing system worker S3, Ingest of the + Worker is up to date + Creating new worker version + Using new worker version {s3_ingest_version.id} + """ + ).strip()) + + @patch("arkindex.process.management.commands.update_system_workers.parse_config", lambda: MOCK_CONFIG) + def test_worker_slug_conflict(self): + worker_type = WorkerType.objects.create(slug="systemnt", display_name="Systemn't") + + worker = Worker.objects.create( + type=worker_type, + slug="init_elements", + name="Troublemaker", + ) + + output = StringIO() + with self.assertRaisesMessage(CommandError, "Cannot create system worker with slug init_elements due to a conflict."): + call_command( + "update_system_workers", + ["--no-color"], + stdout=output, + stderr=output, + ) + + self.assertEqual(output.getvalue().strip(), dedent( + f""" + ───────────────────────────────── InitElements ───────────────────────────────── + Using registry.example.com/init-elements:latest to provide InitElements + Worker Troublemaker ({worker.id}) uses the same slug with a different worker type. + Please update this worker's type to `system` or change its slug. + """ + ).strip()) + + @patch("arkindex.process.management.commands.update_system_workers.parse_config", lambda: MOCK_CONFIG) + def test_update_existing_system_workers(self): + worker_type = WorkerType.objects.create(slug="system", display_name="Système") + init_elements_worker = Worker.objects.create( + type=worker_type, + slug="init_elements", + name="Init of the Elements", + # This worker will be made public + public=False, + ) + s3_ingest_worker = Worker.objects.create( + type=worker_type, + slug="s3_ingest", + name="S3, Ingest of the", + public=True, + # This worker will be unarchived + archived=timezone.now(), + ) + + output = self.update_system_workers() + + self.assertEqual(WorkerType.objects.count(), 1) + self.assertEqual(Worker.objects.count(), 2) + + init_elements_worker.refresh_from_db() + self.assertEqual(init_elements_worker.name, "Init of the Elements") + self.assertEqual(init_elements_worker.slug, "init_elements") + self.assertEqual(init_elements_worker.type, worker_type) + self.assertTrue(init_elements_worker.public) + self.assertIsNone(init_elements_worker.archived) + + s3_ingest_worker.refresh_from_db() + self.assertEqual(s3_ingest_worker.name, "S3, Ingest of the") + self.assertEqual(s3_ingest_worker.slug, "s3_ingest") + self.assertEqual(s3_ingest_worker.type, worker_type) + self.assertTrue(s3_ingest_worker.public) + self.assertIsNone(s3_ingest_worker.archived) + + init_elements_version = WorkerVersion.objects.get_by_feature(ArkindexFeature.InitElements) + self.assertEqual(init_elements_version.worker, init_elements_worker) + self.check_feature_version(init_elements_version, ArkindexFeature.InitElements) + + s3_ingest_version = WorkerVersion.objects.get_by_feature(ArkindexFeature.S3Ingest) + self.assertEqual(s3_ingest_version.worker, s3_ingest_worker) + self.check_feature_version(s3_ingest_version, ArkindexFeature.S3Ingest) + + self.assertEqual(output, dedent( + f""" + ───────────────────────────────── InitElements ───────────────────────────────── + Using registry.example.com/init-elements:latest to provide InitElements + Using existing system worker Init of the Elements + Marking worker as public + Creating new worker version + Using new worker version {init_elements_version.id} + ─────────────────────────────────── S3Ingest ─────────────────────────────────── + Using registry.example.com/s3-ingest:latest to provide S3Ingest + Using existing system worker S3, Ingest of the + Unarchiving worker + Creating new worker version + Using new worker version {s3_ingest_version.id} + """ + ).strip()) + + @patch("arkindex.process.management.commands.update_system_workers.parse_config", lambda: MOCK_CONFIG) + def test_assigns_existing_compatible_versions(self): + """ + The command should assign the feature to an existing WorkerVersion compatible with it, + marking the version as available if necessary + """ + worker_type = WorkerType.objects.create(slug="system", display_name="Système") + init_elements_worker = Worker.objects.create( + type=worker_type, + slug="init_elements", + name="Init of the Elements", + public=True, + ) + s3_ingest_worker = Worker.objects.create( + type=worker_type, + slug="s3_ingest", + name="S3, Ingest of the", + public=True, + ) + + # Those versions use the correct image, but do not have acceptable attributes + init_elements_worker.versions.create( + docker_image_iid="registry.example.com/init-elements:latest", + version=1, + configuration={"docker": {"command": "exit"}}, + ) + init_elements_worker.versions.create( + docker_image_iid="registry.example.com/init-elements:latest", + version=2, + configuration={ + "user_configuration": { + "nobody_expects_the": { + "title": "required configuration", + "type": "int", + "required": True, + } + }, + }, + ) + init_elements_worker.versions.create( + docker_image_iid="registry.example.com/init-elements:latest", + version=3, + gpu_usage=FeatureUsage.Required, + ) + init_elements_worker.versions.create( + docker_image_iid="registry.example.com/init-elements:latest", + version=4, + model_usage=FeatureUsage.Required, + ) + + # These versions should be assigned to the features + init_elements_version = init_elements_worker.versions.create( + docker_image_iid="registry.example.com/init-elements:latest", + version=5, + state=WorkerVersionState.Available, + ) + s3_ingest_version = s3_ingest_worker.versions.create( + docker_image_iid="registry.example.com/s3-ingest:latest", + version=1, + state=WorkerVersionState.Created, + ) + + self.assertEqual(self.update_system_workers(), dedent( + f""" + ───────────────────────────────── InitElements ───────────────────────────────── + Using registry.example.com/init-elements:latest to provide InitElements + Using existing system worker Init of the Elements + Worker is up to date + Assigning existing worker version {init_elements_version.id} to the feature + Using existing worker version {init_elements_version.id} + ─────────────────────────────────── S3Ingest ─────────────────────────────────── + Using registry.example.com/s3-ingest:latest to provide S3Ingest + Using existing system worker S3, Ingest of the + Worker is up to date + Assigning existing worker version {s3_ingest_version.id} to the feature + Marking the worker version as available + Using existing worker version {s3_ingest_version.id} + """ + ).strip()) + + self.assertEqual(WorkerType.objects.count(), 1) + self.assertEqual(Worker.objects.count(), 2) + self.assertEqual(WorkerVersion.objects.count(), 6) + + self.assertEqual(WorkerVersion.objects.get_by_feature(ArkindexFeature.InitElements), init_elements_version) + self.assertEqual(WorkerVersion.objects.get_by_feature(ArkindexFeature.S3Ingest), s3_ingest_version) + + init_elements_version.refresh_from_db() + self.check_feature_version(init_elements_version, ArkindexFeature.InitElements) + s3_ingest_version.refresh_from_db() + self.check_feature_version(s3_ingest_version, ArkindexFeature.S3Ingest) + + @patch("arkindex.process.management.commands.update_system_workers.parse_config", lambda: MOCK_CONFIG) + def test_unassigns_incompatible_versions(self): + worker_type = WorkerType.objects.create(slug="systemnt", display_name="Systemn't") + init_elements_worker = Worker.objects.create( + type=worker_type, + slug="init_elements", + name="Init of the Elements", + public=True, + ) + s3_ingest_worker = Worker.objects.create( + type=worker_type, + slug="s3_digest", + name="S3Digest", + public=True, + # This isn't a system worker, so this worker being archived means the command + # should create a new system worker instead of reusing this one + archived=timezone.now(), + ) + + init_elements_version = init_elements_worker.versions.create( + version=1, + feature=ArkindexFeature.InitElements, + docker_image_iid="registry.gitlab.teklia.com/callico/callico:latest", + state=WorkerVersionState.Available, + configuration={ + "docker": { + "command": "false", + }, + "user_configuration": { + "nobody_expects_the": { + "title": "required configuration", + "type": "int", + "required": True, + }, + }, + }, + ) + # Everything is valid for this version, but it is on an archived worker + s3_ingest_version = s3_ingest_worker.versions.create( + feature=ArkindexFeature.S3Ingest, + docker_image_iid="registry.example.com/s3-ingest:latest", + version=1, + state=WorkerVersionState.Available, + ) + + output = self.update_system_workers() + + system_worker_type = WorkerType.objects.exclude(slug=worker_type.slug).get() + self.assertEqual(system_worker_type.slug, "system") + self.assertEqual(system_worker_type.display_name, "System") + + self.assertEqual(Worker.objects.count(), 3) + new_s3_ingest_worker = Worker.objects.exclude(id__in=[init_elements_worker.id, s3_ingest_worker.id]).get() + self.assertEqual(new_s3_ingest_worker.type, system_worker_type) + self.assertEqual(new_s3_ingest_worker.slug, "s3_ingest") + self.assertEqual(new_s3_ingest_worker.name, "S3Ingest") + + self.assertEqual(WorkerVersion.objects.count(), 4) + new_init_elements_version = init_elements_worker.versions.get(version=2) + self.check_feature_version(new_init_elements_version, ArkindexFeature.InitElements) + new_s3_ingest_version = new_s3_ingest_worker.versions.get() + self.check_feature_version(new_s3_ingest_version, ArkindexFeature.S3Ingest) + + self.assertEqual(output, dedent( + f""" + ───────────────────────────────── InitElements ───────────────────────────────── + Using registry.example.com/init-elements:latest to provide InitElements + Current worker version: {init_elements_version.id} ({init_elements_version.docker_image_iid}) + This version has an invalid Docker image. + This version uses a custom Docker command which could interfere with the feature. + This version requires a custom worker configuration which could interfere with the feature. + Unassigning feature from the current version + Worker is up to date + Creating new worker version + Using new worker version {new_init_elements_version.id} + ─────────────────────────────────── S3Ingest ─────────────────────────────────── + Using registry.example.com/s3-ingest:latest to provide S3Ingest + Current worker version: {s3_ingest_version.id} (registry.example.com/s3-ingest:latest) + This version is part of an archived worker. + Unassigning feature from the current version + Created new System worker type ({system_worker_type.id}) + Created new S3Ingest system worker + Creating new worker version + Using new worker version {new_s3_ingest_version.id} + """ + ).strip()) + + @patch("arkindex.process.management.commands.update_system_workers.parse_config", lambda: MOCK_CONFIG) + def test_noop(self): + worker_type = WorkerType.objects.create(slug="system", display_name="Système") + worker = Worker.objects.create(type=worker_type, slug="worker", name="Worker", public=True) + + init_elements_version = worker.versions.create( + feature=ArkindexFeature.InitElements, + docker_image_iid="registry.example.com/init-elements:latest", + version=1, + state=WorkerVersionState.Available, + ) + s3_ingest_version = worker.versions.create( + feature=ArkindexFeature.S3Ingest, + docker_image_iid="registry.example.com/s3-ingest:latest", + version=2, + state=WorkerVersionState.Available, + ) + + self.assertEqual(self.update_system_workers(), dedent( + f""" + ───────────────────────────────── InitElements ───────────────────────────────── + Using registry.example.com/init-elements:latest to provide InitElements + Current worker version: {init_elements_version.id} (registry.example.com/init-elements:latest) + Worker is up to date + Worker version for InitElements is up to date + ─────────────────────────────────── S3Ingest ─────────────────────────────────── + Using registry.example.com/s3-ingest:latest to provide S3Ingest + Current worker version: {s3_ingest_version.id} (registry.example.com/s3-ingest:latest) + Worker is up to date + Worker version for S3Ingest is up to date + """ + ).strip()) + + self.assertEqual(WorkerType.objects.count(), 1) + self.assertEqual(Worker.objects.count(), 1) + self.assertEqual(WorkerVersion.objects.count(), 2) + + self.assertEqual(WorkerVersion.objects.get_by_feature(ArkindexFeature.InitElements), init_elements_version) + self.assertEqual(WorkerVersion.objects.get_by_feature(ArkindexFeature.S3Ingest), s3_ingest_version) + + init_elements_version.refresh_from_db() + self.check_feature_version(init_elements_version, ArkindexFeature.InitElements) + s3_ingest_version.refresh_from_db() + self.check_feature_version(s3_ingest_version, ArkindexFeature.S3Ingest) diff --git a/arkindex/project/checks.py b/arkindex/project/checks.py index 993b21996bdc1c876a3d70368a270f84dc4e6fd2..c9eebd2ba075697e84614f6171bbf42bf1263101 100644 --- a/arkindex/project/checks.py +++ b/arkindex/project/checks.py @@ -332,3 +332,21 @@ def ingest_version_check(*args, **kwargs): )] return [] + + +@register() +def update_system_workers_check(*args, **kwargs): + from django.conf import settings + + from arkindex.process.management.commands.update_system_workers import parse_config + + if settings.VERSION == parse_config()["version"]: + return [] + + return [Error( + "The Docker images specified for workers providing features in `update_system_workers` " + f"have not been verified as up to date for Arkindex version {settings.VERSION}.", + hint="Update `arkindex/system_workers.yml` to the latest versions of the feature workers, " + f"then update its `version` to {settings.VERSION!r}.", + id="arkindex.E012", + )] diff --git a/arkindex/project/tests/__init__.py b/arkindex/project/tests/__init__.py index 968d84802a6707807c5a8e8959b56dfaa987da4e..e7e502d818eb3bce01948aa839f96366feea492f 100644 --- a/arkindex/project/tests/__init__.py +++ b/arkindex/project/tests/__init__.py @@ -110,22 +110,11 @@ class _AssertExactQueriesContext(CaptureQueriesContext): self.test_case.assertEqual(expected_sql, actual_sql) -class FixtureMixin: +class ArkindexTestMixin: """ - Add the database fixture to a test case + Adds various custom extensions to test cases """ - fixtures = ["data.json", ] - - @classmethod - def setUpTestData(cls): - super().setUpTestData() - cls.corpus = Corpus.objects.get(name="Unit Tests") - cls.user = User.objects.get(email="user@user.fr") - cls.group = Group.objects.get(name="User group") - cls.superuser = User.objects.get(email="root@root.fr") - cls.imgsrv = ImageServer.objects.get(url="http://server") - def clear_caches(self): # Clean content type cache for SQL requests checks consistency ContentType.objects.clear_cache() @@ -212,15 +201,44 @@ class FixtureMixin: func(*args, **kwargs) -class FixtureTestCase(FixtureMixin, TestCase): +class FixtureMixin: + """ + Add the database fixture to a test case + """ + + fixtures = ["data.json", ] + + @classmethod + def setUpTestData(cls): + super().setUpTestData() + cls.corpus = Corpus.objects.get(name="Unit Tests") + cls.user = User.objects.get(email="user@user.fr") + cls.group = Group.objects.get(name="User group") + cls.superuser = User.objects.get(email="root@root.fr") + cls.imgsrv = ImageServer.objects.get(url="http://server") + + +class ArkindexTestCase(ArkindexTestMixin, TestCase): + """ + Django test case with custom Arkindex extensions + """ + + +class ArkindexAPITestCase(ArkindexTestMixin, APITestCase): + """ + Django REST Framework test case with custom Arkindex extensions + """ + + +class FixtureTestCase(FixtureMixin, ArkindexTestCase): """ - Django test case with the database fixture + Django test case with the database fixture and custom Arkindex extensions """ -class FixtureAPITestCase(FixtureMixin, APITestCase): +class FixtureAPITestCase(FixtureMixin, ArkindexAPITestCase): """ - Django REST Framework test case with the database fixture + Django REST Framework test case with the database fixture and custom Arkindex extensions """ diff --git a/arkindex/project/tests/test_checks.py b/arkindex/project/tests/test_checks.py index ea9723e359558853fe6b87bba272f4d47b6f9371..c392fbaef75034531834b5c94c7625665ec95a58 100644 --- a/arkindex/project/tests/test_checks.py +++ b/arkindex/project/tests/test_checks.py @@ -457,3 +457,25 @@ class ChecksTestCase(TestCase): from arkindex.project.checks import ingest_version_check self.assertListEqual(ingest_version_check(), []) + + def test_update_system_workers_check(self): + """ + The Docker images in the `arkindex update_system_workers` must be manually verified + as matching the current Arkindex release. + When the tests run, running the check should never return anything. + """ + from arkindex.project.checks import update_system_workers_check + + self.assertListEqual(update_system_workers_check(), []) + + @override_settings(VERSION="0.0.0") + def test_update_system_workers_check_error(self): + from arkindex.project.checks import update_system_workers_check + + self.assertListEqual(update_system_workers_check(), [Error( + "The Docker images specified for workers providing features in `update_system_workers` " + "have not been verified as up to date for Arkindex version 0.0.0.", + hint="Update `arkindex/system_workers.yml` to the latest versions of the feature workers, " + "then update its `version` to '0.0.0'.", + id="arkindex.E012", + )]) diff --git a/arkindex/system_workers.yml b/arkindex/system_workers.yml new file mode 100644 index 0000000000000000000000000000000000000000..7064a649d6a8bc03f9376f2a62eb3843c3d01e55 --- /dev/null +++ b/arkindex/system_workers.yml @@ -0,0 +1,8 @@ +# When releasing Arkindex, check that the Docker images set here are up to date, +# then update the `version` to the current Arkindex version as set in the `VERSION` file +# to confirm that the images have been manually checked. +version: 1.6.3-beta2 + +features: + init_elements: registry.gitlab.teklia.com/arkindex/workers/init-elements:0.1.0 + s3_ingest: registry.gitlab.teklia.com/arkindex/workers/import/s3:0.1.0