diff --git a/arkindex/documents/fixtures/data.json b/arkindex/documents/fixtures/data.json index 313c927cb7beed72a1e5722aed811dd5dc11107c..5d7bba2fdb6313aa34af9c179a3d373e393f84ae 100644 --- a/arkindex/documents/fixtures/data.json +++ b/arkindex/documents/fixtures/data.json @@ -249,6 +249,7 @@ "docker_image_iid": "registry.somewhere.com/something:latest", "revision_url": null, "tag": null, + "feature": null, "created": "2020-02-02T01:23:45.678Z", "updated": "2020-02-02T01:23:45.678Z" } @@ -270,6 +271,7 @@ "docker_image_iid": "registry.gitlab.teklia.com/arkindex/workers/init-elements:latest", "revision_url": null, "tag": null, + "feature": "init_elements", "created": "2020-02-02T01:23:45.678Z", "updated": "2020-02-02T01:23:45.678Z" } @@ -289,6 +291,7 @@ "docker_image_iid": null, "revision_url": null, "tag": null, + "feature": null, "created": "2020-02-02T01:23:45.678Z", "updated": "2020-02-02T01:23:45.678Z" } @@ -306,6 +309,7 @@ "docker_image_iid": "registry.somewhere.com/something:latest", "revision_url": null, "tag": null, + "feature": null, "created": "2020-02-02T01:23:45.678Z", "updated": "2020-02-02T01:23:45.678Z" } @@ -325,6 +329,7 @@ "docker_image_iid": "registry.somewhere.com/something:latest", "revision_url": null, "tag": null, + "feature": null, "created": "2020-02-02T01:23:45.678Z", "updated": "2020-02-02T01:23:45.678Z" } @@ -344,6 +349,7 @@ "docker_image_iid": "registry.somewhere.com/something:latest", "revision_url": null, "tag": null, + "feature": null, "created": "2020-02-02T01:23:45.678Z", "updated": "2020-02-02T01:23:45.678Z" } @@ -363,6 +369,7 @@ "docker_image_iid": "registry.somewhere.com/something:latest", "revision_url": null, "tag": null, + "feature": null, "created": "2020-02-02T01:23:45.678Z", "updated": "2020-02-02T01:23:45.678Z" } diff --git a/arkindex/documents/management/commands/build_fixtures.py b/arkindex/documents/management/commands/build_fixtures.py index ff5049a195591c2396833f3601f7f33cc92e0096..2029047e68263eae1a792c2fe4d041d2110bbd5a 100644 --- a/arkindex/documents/management/commands/build_fixtures.py +++ b/arkindex/documents/management/commands/build_fixtures.py @@ -9,6 +9,7 @@ from arkindex.documents.models import Corpus, Element, MetaData, MetaType from arkindex.images.models import Image, ImageServer from arkindex.ponos.models import Farm from arkindex.process.models import ( + ArkindexFeature, FeatureUsage, Process, ProcessMode, @@ -93,7 +94,8 @@ class Command(BaseCommand): } }, state=WorkerVersionState.Available, - docker_image_iid="registry.gitlab.teklia.com/arkindex/workers/init-elements:latest" + docker_image_iid="registry.gitlab.teklia.com/arkindex/workers/init-elements:latest", + feature=ArkindexFeature.InitElements, ) # Create some workers with available versions diff --git a/arkindex/process/admin.py b/arkindex/process/admin.py index 14becfab776196c820cd17757b69daf9688f3d34..7d427806eb78fe2ee3cbf6311c71111a7188c800 100644 --- a/arkindex/process/admin.py +++ b/arkindex/process/admin.py @@ -101,11 +101,25 @@ class WorkerTypeAdmin(admin.ModelAdmin): class WorkerVersionAdmin(admin.ModelAdmin): - list_display = ("id", "worker", "version") - list_filter = ("worker", ) - fields = ("id", "worker", "version", "configuration", "model_usage", "gpu_usage", "docker_image_iid") + list_display = ("id", "worker", "version", "feature") + list_filter = ("worker", "feature") + fields = ("id", "worker", "version", "configuration", "model_usage", "gpu_usage", "docker_image_iid", "state", "feature") readonly_fields = ("id", ) + def save_model(self, request, obj, form, change): + # When a WorkerVersion is created with a feature, or an existing one has its feature updated, clear the cached versions providing features + if form["feature"]._has_changed(): + # `cache_clear` is a function defined by the `functools.lru_cache` decorator + # on the function itself, not on its return value + WorkerVersion.objects.get_by_feature.cache_clear() + super().save_model(request, obj, form, change) + + def delete_model(self, request, obj): + # When this WorkerVersion provides an Arkindex feature, clear the cached versions providing features + if obj.feature is not None: + WorkerVersion.objects.get_by_feature.cache_clear() + super().delete_model(request, obj) + class WorkerConfigurationAdmin(admin.ModelAdmin): list_display = ("id", "name", "worker") diff --git a/arkindex/process/builder.py b/arkindex/process/builder.py index c5ba7eb84e5f145c2b9085e4ba7fe746fbeabc09..e17f55d06825065a3e98e5c6b78d3b4af38eee81 100644 --- a/arkindex/process/builder.py +++ b/arkindex/process/builder.py @@ -236,8 +236,8 @@ class ProcessBuilder: self._create_worker_versions_cache([(settings.IMPORTS_WORKER_VERSION, None, None)]) def build_s3(self): - from arkindex.process.models import WorkerVersion - ingest_version = WorkerVersion.objects.ingest_version + from arkindex.process.models import ArkindexFeature, WorkerVersion + ingest_version = WorkerVersion.objects.get_by_feature(ArkindexFeature.S3Ingest) worker_configuration, _ = ingest_version.worker.configurations.get_or_create( configuration={ @@ -277,14 +277,15 @@ class ProcessBuilder: @prefetch_worker_runs def build_workers(self): - from arkindex.process.models import WorkerVersion + from arkindex.process.models import ArkindexFeature, WorkerVersion # Retrieve worker runs worker_runs = list(self.process.worker_runs.all()) # Find the WorkerRun to use for the initialisation task, or create it + init_elements_version = WorkerVersion.objects.get_by_feature(ArkindexFeature.InitElements) initialisation_runs = [ - run for run in worker_runs if run.version == WorkerVersion.objects.init_elements_version and not len(run.parents) + run for run in worker_runs if run.version == init_elements_version and not len(run.parents) ] if len(initialisation_runs): # In case there is more than one run using the element initialisation worker, use the first one and ignore the others. @@ -294,9 +295,8 @@ class ProcessBuilder: worker_runs.remove(initialisation_worker_run) # If there is no elements initialisation worker run in the process, create one else: - initialisation_worker = WorkerVersion.objects.init_elements_version initialisation_worker_run = self.process.worker_runs.create( - version=initialisation_worker + version=init_elements_version ) # Link all parentless worker runs to the initialisation worker run no_parents = [run for run in worker_runs if not len(run.parents)] diff --git a/arkindex/process/managers.py b/arkindex/process/managers.py index 44e89ce0b2d4fc3e8298f7f205367127452a89d6..ede67482f78d2d1730ae15574f42b2ffc5a02d79 100644 --- a/arkindex/process/managers.py +++ b/arkindex/process/managers.py @@ -1,6 +1,6 @@ import logging import operator -from functools import reduce +from functools import lru_cache, reduce from django.conf import settings from django.db import connections @@ -158,35 +158,17 @@ class WorkerResultSourceQuerySet(QuerySet): class WorkerVersionManager(Manager): - def _get_image_version(self, docker_image_iid, title): - from arkindex.process.models import WorkerVersionState - version = ( - self - .select_related("worker") - .get(docker_image_iid=docker_image_iid) - ) - if version.state != WorkerVersionState.Available: - raise ValueError(f"The {title} worker version must be 'available'.") - return version - - @cached_property - def init_elements_version(self): - """ - WorkerVersion for elements initialization. - """ - return self._get_image_version(settings.INIT_ELEMENTS_DOCKER_IMAGE, "elements initialization") - - @cached_property - def ingest_version(self): - """ - WorkerVersion for S3 ingest processes. - """ - return self._get_image_version(settings.INGEST_DOCKER_IMAGE, "S3 ingest") + @lru_cache + def get_by_feature(self, feature): + try: + return self.select_related("worker").get(feature=feature) + except self.model.DoesNotExist: + raise self.model.DoesNotExist(f"There are no worker versions supporting the {feature.name} feature.") @cached_property def imports_version(self): """ - WorkerVersion used for all import tasks. + WorkerVersion used for file import tasks. """ return ( self diff --git a/arkindex/process/migrations/0042_workerversion_feature.py b/arkindex/process/migrations/0042_workerversion_feature.py new file mode 100644 index 0000000000000000000000000000000000000000..dc2972555544c84c91d2e2fce30c3ff7317d3aaa --- /dev/null +++ b/arkindex/process/migrations/0042_workerversion_feature.py @@ -0,0 +1,45 @@ +# Generated by Django 5.0.8 on 2024-08-20 18:02 + +from django.db import migrations, models +from enumfields import EnumField + +from arkindex.process.models import ArkindexFeature, WorkerVersionState + + +class Migration(migrations.Migration): + + dependencies = [ + ("documents", "0012_alter_transcriptionentity_id"), + ("process", "0041_drop_old_git_references"), + ] + + operations = [ + migrations.AddField( + model_name="workerversion", + name="feature", + field=EnumField( + blank=True, + enum=ArkindexFeature, + help_text="An Arkindex feature that this worker version provides. There can only be one worker version per feature across an Arkindex instance.", + max_length=20, + null=True, + ), + ), + migrations.AddConstraint( + model_name="workerversion", + constraint=models.UniqueConstraint( + condition=models.Q(feature__isnull=False), + fields=("feature",), + name="workerversion_unique_feature", + violation_error_message="There can only be one worker version per feature across an Arkindex instance.", + ), + ), + migrations.AddConstraint( + model_name="workerversion", + constraint=models.CheckConstraint( + check=models.Q(state=WorkerVersionState.Available) | models.Q(feature=None), + name="workerversion_feature_requires_available", + violation_error_message="A worker version can only provide an Arkindex feature if it is marked as available.", + ), + ), + ] diff --git a/arkindex/process/models.py b/arkindex/process/models.py index 905fbc5289484c63b560649df33e42c36a2c999b..b0e0d243f4da58da64c532d9c979423e7bb49c35 100644 --- a/arkindex/process/models.py +++ b/arkindex/process/models.py @@ -316,7 +316,7 @@ class Process(IndexableModel): # First we copy each worker runs, excluding elements initialisation worker runs for run in self.worker_runs.all(): - if run.version_id != WorkerVersion.objects.init_elements_version.id: + if run.version_id != WorkerVersion.objects.get_by_feature(ArkindexFeature.InitElements).id: # Create a new WorkerRun with same version, configuration and parents. new_run = WorkerRun( process=new_process, @@ -660,6 +660,12 @@ class FeatureUsage(Enum): Required = "required" +class ArkindexFeature(Enum): + InitElements = "init_elements" + FileImport = "file_import" + S3Ingest = "s3_ingest" + + class WorkerVersion(models.Model): id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) worker = models.ForeignKey("process.Worker", on_delete=models.CASCADE, related_name="versions") @@ -681,6 +687,15 @@ class WorkerVersion(models.Model): revision_url = models.URLField(null=True, blank=True, max_length=250, default=None) tag = models.CharField(blank=True, null=True, max_length=50, default=None) + feature = EnumField( + ArkindexFeature, + max_length=20, + blank=True, + null=True, + help_text="An Arkindex feature that this worker version provides. " + "There can only be one worker version per feature across an Arkindex instance.", + ) + corpora = models.ManyToManyField( "documents.Corpus", through="process.CorpusWorkerVersion", @@ -721,6 +736,17 @@ class WorkerVersion(models.Model): name="workerversion_unique_tag", condition=Q(tag__isnull=False), ), + models.UniqueConstraint( + fields=["feature"], + name="workerversion_unique_feature", + condition=Q(feature__isnull=False), + violation_error_message="There can only be one worker version per feature across an Arkindex instance.", + ), + models.CheckConstraint( + check=Q(state=WorkerVersionState.Available) | Q(feature=None), + name="workerversion_feature_requires_available", + violation_error_message="A worker version can only provide an Arkindex feature if it is marked as available.", + ), ] def __str__(self): @@ -870,7 +896,7 @@ class WorkerRun(models.Model): worker_run.task_slug + slug_suffix # If the parent is the initialisation task, no suffix is added as it is not being split in # chunks and the import_task_name is used, not worker_run.task_slug - if worker_run.version_id != WorkerVersion.objects.init_elements_version.id + if worker_run.version_id != WorkerVersion.objects.get_by_feature(ArkindexFeature.InitElements).id else import_task_name for worker_run in parent_runs ] diff --git a/arkindex/process/tasks.py b/arkindex/process/tasks.py index f2491994aeaf1c82089eab058e287dc505d910cf..de36e4a46e311e724829ca7be9a80040a56ee3b5 100644 --- a/arkindex/process/tasks.py +++ b/arkindex/process/tasks.py @@ -11,6 +11,7 @@ from rq import Retry, get_current_job from arkindex.process.models import ( ActivityState, + ArkindexFeature, Process, ProcessMode, WorkerActivity, @@ -30,7 +31,7 @@ def initialize_activity(process: Process): with transaction.atomic(): for version_id, configuration_id, model_version_id in process.worker_runs.values_list("version_id", "configuration_id", "model_version_id"): # Do not generate worker activities for the elements initialisation task - if version_id != WorkerVersion.objects.init_elements_version.id: + if version_id != WorkerVersion.objects.get_by_feature(ArkindexFeature.InitElements).id: WorkerActivity.objects.bulk_insert( worker_version_id=version_id, process_id=process.id, diff --git a/arkindex/process/tests/test_create_s3_import.py b/arkindex/process/tests/test_create_s3_import.py index 013ea9c592b804fb4997f333574aaddf5912690f..5de4d3c8ff03b4f72615c5924cb49da666fa83c9 100644 --- a/arkindex/process/tests/test_create_s3_import.py +++ b/arkindex/process/tests/test_create_s3_import.py @@ -7,7 +7,15 @@ from rest_framework import status from arkindex.documents.models import Corpus from arkindex.images.models import ImageServer from arkindex.ponos.models import Farm -from arkindex.process.models import Process, ProcessMode, Worker, WorkerType, WorkerVersion, WorkerVersionState +from arkindex.process.models import ( + ArkindexFeature, + Process, + ProcessMode, + Worker, + WorkerType, + WorkerVersion, + WorkerVersionState, +) from arkindex.project.tests import FixtureTestCase from arkindex.users.models import Role @@ -25,6 +33,7 @@ class TestCreateS3Import(FixtureTestCase): ), version=1, state=WorkerVersionState.Available, + feature=ArkindexFeature.S3Ingest, docker_image_iid="registry.gitlab.teklia.com/arkindex/s4:latest", configuration={ "docker": { diff --git a/arkindex/process/tests/test_elements_initialisation.py b/arkindex/process/tests/test_elements_initialisation.py index 1f2a8259e96fbc93b8bb03240c061cf9cb96a829..add8b6015c40044b49febe74f2e9205847cf023b 100644 --- a/arkindex/process/tests/test_elements_initialisation.py +++ b/arkindex/process/tests/test_elements_initialisation.py @@ -2,7 +2,7 @@ from rest_framework import status from rest_framework.reverse import reverse from arkindex.ponos.models import Farm -from arkindex.process.models import Process, ProcessMode, WorkerVersion +from arkindex.process.models import ArkindexFeature, Process, ProcessMode, WorkerVersion from arkindex.project.tests import FixtureAPITestCase @@ -13,7 +13,7 @@ class TestElementsInit(FixtureAPITestCase): @classmethod def setUpTestData(cls): super().setUpTestData() - cls.init_elements_version = WorkerVersion.objects.init_elements_version + cls.init_elements_version = WorkerVersion.objects.get_by_feature(ArkindexFeature.InitElements) cls.reco_version = WorkerVersion.objects.get(worker__slug="reco") cls.dla_version = WorkerVersion.objects.get(worker__slug="dla") cls.process = Process.objects.create( diff --git a/arkindex/process/tests/test_processes.py b/arkindex/process/tests/test_processes.py index eceab04504b310ad6cb858837d79ef7cc53bf27e..fa0537f20cef227c65ea7582355e2901dea96534 100644 --- a/arkindex/process/tests/test_processes.py +++ b/arkindex/process/tests/test_processes.py @@ -14,6 +14,7 @@ from arkindex.documents.models import Corpus, ElementType from arkindex.ponos.models import Farm, State from arkindex.process.models import ( ActivityState, + ArkindexFeature, DataFile, FeatureUsage, Process, @@ -1745,6 +1746,8 @@ class TestProcesses(FixtureAPITestCase): element_type=self.page_type, creator=self.user, ) + self.elements_init_worker.feature = ArkindexFeature.S3Ingest + self.elements_init_worker.save() process.worker_runs.create( version=self.elements_init_worker, configuration=self.elements_init_worker.worker.configurations.create( diff --git a/arkindex/process/tests/test_templates.py b/arkindex/process/tests/test_templates.py index 98cf6a78a31706176375899b952edd489ba44d45..2d0c47c3c75eb56b3e87d8b1058010553ef843c0 100644 --- a/arkindex/process/tests/test_templates.py +++ b/arkindex/process/tests/test_templates.py @@ -6,6 +6,7 @@ from rest_framework.reverse import reverse from arkindex.documents.models import Corpus from arkindex.process.models import ( + ArkindexFeature, FeatureUsage, Process, ProcessMode, @@ -155,7 +156,8 @@ class TestTemplates(FixtureAPITestCase): ]) def test_create_excludes_init_elements(self): - init_run = self.process_template.worker_runs.create(version=WorkerVersion.objects.init_elements_version) + init_version = WorkerVersion.objects.get_by_feature(ArkindexFeature.InitElements) + init_run = self.process_template.worker_runs.create(version=init_version) self.run_1.parents = [init_run.id] self.run_1.save() @@ -178,7 +180,7 @@ class TestTemplates(FixtureAPITestCase): self.assertEqual(Process.objects.get(id=new_process_id).worker_runs.count(), 2) # No elements initialisation run in the created template - self.assertFalse(WorkerRun.objects.filter(process_id=new_process_id, version_id=WorkerVersion.objects.init_elements_version).exists()) + self.assertFalse(WorkerRun.objects.filter(process_id=new_process_id, version=init_version).exists()) child_run, parent_run = WorkerRun.objects.select_related("version__worker").filter(process__id=new_process_id).order_by("version__worker__slug").all() # Check dependencies @@ -416,7 +418,8 @@ class TestTemplates(FixtureAPITestCase): self.assertListEqual(child_run.parents, [parent_run.id]) def test_apply_excludes_init_elements(self): - init_run = self.template.worker_runs.create(version=WorkerVersion.objects.init_elements_version) + init_version = WorkerVersion.objects.get_by_feature(ArkindexFeature.InitElements) + init_run = self.template.worker_runs.create(version=init_version) self.template_run_1.parents = [init_run.id] self.template_run_1.save() @@ -454,7 +457,7 @@ class TestTemplates(FixtureAPITestCase): self.assertListEqual(child_run.parents, [parent_run.id]) # No elements initialisation run in the created process - self.assertFalse(self.process.worker_runs.filter(version_id=WorkerVersion.objects.init_elements_version).exists()) + self.assertFalse(self.process.worker_runs.filter(version=init_version).exists()) def test_apply_delete_previous_worker_runs(self): self.client.force_login(self.user) diff --git a/arkindex/project/checks.py b/arkindex/project/checks.py index 793ac8313a32b8a58f7447d4fe7f1cba096ebba1..993b21996bdc1c876a3d70368a270f84dc4e6fd2 100644 --- a/arkindex/project/checks.py +++ b/arkindex/project/checks.py @@ -11,7 +11,7 @@ import sys from django.core.checks import Critical, Error, Warning, register -from arkindex.process.models import WorkerVersion +from arkindex.process.models import ArkindexFeature, WorkerVersion def only_runserver(func): @@ -299,26 +299,14 @@ def signup_default_group_check(*args, **kwargs): @register() @only_runserver def init_elements_check(*args, **kwargs): - from django.conf import settings - docker_image_iid = settings.INIT_ELEMENTS_DOCKER_IMAGE - - warning = None - try: - WorkerVersion.objects.init_elements_version + WorkerVersion.objects.get_by_feature(ArkindexFeature.InitElements) except WorkerVersion.DoesNotExist: - warning = f"Worker version with Docker image {docker_image_iid} does not exist." - except WorkerVersion.MultipleObjectsReturned: - warning = f"Multiple worker versions returned for Docker image {docker_image_iid}." - except ValueError: - warning = f"Worker version with Docker image {docker_image_iid} is not in the Available state." - - if warning: return [Warning( - f"{warning}\n" + "There is no worker version available to initialize elements.\n" "This worker version is required to initialize elements when running a process on Arkindex.\n" "Starting or retrying processes will fail.", - hint=f"settings.INIT_ELEMENTS_DOCKER_IMAGE = {docker_image_iid}", + hint="Run `arkindex update_system_workers` or set a worker version's `feature` to `init_elements`.", id="arkindex.W014", )] @@ -328,29 +316,18 @@ def init_elements_check(*args, **kwargs): @only_runserver def ingest_version_check(*args, **kwargs): from django.conf import settings - docker_image_iid = settings.INGEST_DOCKER_IMAGE # Do not run any check when the S3 ingest feature flag is disabled if not settings.ARKINDEX_FEATURES["ingest"]: return [] - warning = None - try: - WorkerVersion.objects.ingest_version + WorkerVersion.objects.get_by_feature(ArkindexFeature.S3Ingest) except WorkerVersion.DoesNotExist: - warning = f"Worker version with Docker image {docker_image_iid} does not exist." - except WorkerVersion.MultipleObjectsReturned: - warning = f"Multiple worker versions returned for Docker image {docker_image_iid}." - except ValueError: - warning = f"Worker version with Docker image {docker_image_iid} is not in the Available state." - - if warning: return [Warning( - f"{warning}\n" - "This worker version is required to import from S3 on Arkindex.\n" + "There is no worker version available to import elements from S3.\n" "Starting or retrying S3 import processes will fail.", - hint=f"settings.INGEST_DOCKER_IMAGE = {docker_image_iid}", + hint="Run `arkindex update_system_workers` or set a worker version's `feature` to `s3_ingest`.", id="arkindex.W015", )] diff --git a/arkindex/project/config.py b/arkindex/project/config.py index d4e61a5bb8ce763c95b7cc89b5e4bd0331b291b0..e62922511b3d92a07f88647c423322fab2612206 100644 --- a/arkindex/project/config.py +++ b/arkindex/project/config.py @@ -177,11 +177,6 @@ def get_settings_parser(base_dir): # Default task expiry delay in days ponos_parser.add_option("task_expiry", type=int, default=30) - docker_parser = parser.add_subparser("docker", default={}) - docker_parser.add_option("tasks_image", type=str, default="registry.gitlab.teklia.com/arkindex/tasks") - docker_parser.add_option("init_elements_image", type=str, default="registry.gitlab.teklia.com/arkindex/workers/init-elements:latest") - docker_parser.add_option("ingest_image", type=str, default="registry.gitlab.teklia.com/arkindex/workers/import/s3:latest") - sentry_parser = parser.add_subparser("sentry", default={}) sentry_parser.add_option("dsn", type=str, default=None) sentry_parser.add_option("frontend_dsn", type=str, default=None) diff --git a/arkindex/project/settings.py b/arkindex/project/settings.py index a358754fe948ea07b3d9b81382be4d5e5d0d1b96..54a153a7059e5bf723e438a8c702f94582f33668 100644 --- a/arkindex/project/settings.py +++ b/arkindex/project/settings.py @@ -562,11 +562,7 @@ PONOS_DATA_DIR = "/data" PONOS_TASK_EXPIRY = conf["ponos"]["task_expiry"] # Docker images used by our ponos workflow -ARKINDEX_TASKS_IMAGE = conf["docker"]["tasks_image"] -# Docker image tag of the init elements worker -INIT_ELEMENTS_DOCKER_IMAGE = conf["docker"]["init_elements_image"] -# Docker image tag of the S3 ingest worker -INGEST_DOCKER_IMAGE = conf["docker"]["ingest_image"] +ARKINDEX_TASKS_IMAGE = "registry.gitlab.teklia.com/arkindex/tasks:0.6.2" # Robots.txt options ROBOTS_TXT_DISALLOW = conf["robots_txt_disallow"] diff --git a/arkindex/project/tests/__init__.py b/arkindex/project/tests/__init__.py index 84204281c02abb13a13a184c61dc3cc2e8cd16ff..968d84802a6707807c5a8e8959b56dfaa987da4e 100644 --- a/arkindex/project/tests/__init__.py +++ b/arkindex/project/tests/__init__.py @@ -130,6 +130,10 @@ class FixtureMixin: # Clean content type cache for SQL requests checks consistency ContentType.objects.clear_cache() + # `cache_clear` is a function defined by the `functools.lru_cache` decorator + # on the function itself, not on its return value + WorkerVersion.objects.get_by_feature.cache_clear() + # Clear the local cached properties so that it is re-fetched on each test # to avoid intermittently changing query counts. # Using `del` on a cached property that has not been accessed yet can cause an AttributeError. @@ -142,14 +146,6 @@ class FixtureMixin: del WorkerVersion.objects.imports_version except AttributeError: pass - try: - del WorkerVersion.objects.init_elements_version - except AttributeError: - pass - try: - del WorkerVersion.objects.ingest_version - except AttributeError: - pass try: del ImageServer.objects.ingest except AttributeError: diff --git a/arkindex/project/tests/config_samples/defaults.yaml b/arkindex/project/tests/config_samples/defaults.yaml index a678d3fcd46b9be13a363a629a750082194d6bff..b4984a88abaf813b2651b42aade5a23490e92ae7 100644 --- a/arkindex/project/tests/config_samples/defaults.yaml +++ b/arkindex/project/tests/config_samples/defaults.yaml @@ -30,10 +30,6 @@ database: port: 5432 replica: null user: devuser -docker: - ingest_image: registry.gitlab.teklia.com/arkindex/workers/import/s3:latest - init_elements_image: registry.gitlab.teklia.com/arkindex/workers/init-elements:latest - tasks_image: registry.gitlab.teklia.com/arkindex/tasks email: null export: ttl: 21600 diff --git a/arkindex/project/tests/config_samples/errors.yaml b/arkindex/project/tests/config_samples/errors.yaml index e0b77e72dc3b7e9d3e6276a1c98c56f5caa2aef0..d30364d646ecd2a8e88b66a29f29bd44071e1232 100644 --- a/arkindex/project/tests/config_samples/errors.yaml +++ b/arkindex/project/tests/config_samples/errors.yaml @@ -20,9 +20,6 @@ database: password: hunter2 port: rotterdam user: bob -docker: - tasks_image: - here: have a dict email: host: 123 export: diff --git a/arkindex/project/tests/config_samples/override.yaml b/arkindex/project/tests/config_samples/override.yaml index 238cd7171bd73bb473e10a0de64b5204e96aa329..21216d362727710162192af3df6428c4668ff70d 100644 --- a/arkindex/project/tests/config_samples/override.yaml +++ b/arkindex/project/tests/config_samples/override.yaml @@ -36,10 +36,6 @@ database: port: 1 user: postgres user: littlebobbytables -docker: - ingest_image: yrtsiger.baltig.ailket.moc/xednikra/tropmi:tsetal - init_elements_image: registry.gitlab.teklia.com/arkindex/entry_plug:latest - tasks_image: registry.gitlab.teklia.com/arkindex/stonks email: error_report_recipients: - noreply@nasa.gov diff --git a/arkindex/project/tests/test_checks.py b/arkindex/project/tests/test_checks.py index cb8d407e22ab0c89c82d54174287794133c37127..ea9723e359558853fe6b87bba272f4d47b6f9371 100644 --- a/arkindex/project/tests/test_checks.py +++ b/arkindex/project/tests/test_checks.py @@ -376,300 +376,84 @@ class ChecksTestCase(TestCase): self.assertListEqual(signup_default_group_check(), []) def test_init_elements_worker_version_check_ok(self): - from arkindex.process.models import Worker, WorkerType, WorkerVersion, WorkerVersionState + from arkindex.process.models import ArkindexFeature, Worker, WorkerType, WorkerVersion, WorkerVersionState from arkindex.project.checks import init_elements_check - with self.settings(INIT_ELEMENTS_DOCKER_IMAGE="registry.teklia.gitlab.com/submarine-reflection:latest"): - try: - del WorkerVersion.objects.init_elements_version - except AttributeError: - pass - - init_type = WorkerType.objects.create(slug="init", display_name="Elements Initialisation") - WorkerVersion.objects.create( - worker=Worker.objects.create( + init_type = WorkerType.objects.create(slug="init", display_name="Elements Initialisation") + WorkerVersion.objects.create( + worker=Worker.objects.create( name="Elements Initialisation Worker", slug="initialisation", type=init_type, - ), - version=1, - configuration={ - "docker": { - "command": "worker-init-elements" - } - }, - state=WorkerVersionState.Available, - docker_image_iid=settings.INIT_ELEMENTS_DOCKER_IMAGE - ) - self.assertListEqual(init_elements_check(), []) + ), + version=1, + configuration={ + "docker": { + "command": "worker-init-elements" + } + }, + state=WorkerVersionState.Available, + docker_image_iid="registry.teklia.gitlab.com/submarine-reflection:latest", + feature=ArkindexFeature.InitElements, + ) + self.assertListEqual(init_elements_check(), []) def test_init_elements_worker_version_check_missing(self): """ There is a warning when no worker version with the docker_image_iid settings.INIT_ELEMENTS_DOCKER_IMAGE exists """ - from arkindex.process.models import WorkerVersion - from arkindex.project.checks import init_elements_check - - with self.settings(INIT_ELEMENTS_DOCKER_IMAGE="registry.teklia.gitlab.com/submarine-reflection:latest"): - try: - del WorkerVersion.objects.init_elements_version - except AttributeError: - pass - - self.assertListEqual(init_elements_check(), [ - Warning( - "Worker version with Docker image registry.teklia.gitlab.com/submarine-reflection:latest does not exist.\n" - "This worker version is required to initialize elements when running a process on Arkindex.\n" - "Starting or retrying processes will fail.", - hint="settings.INIT_ELEMENTS_DOCKER_IMAGE = registry.teklia.gitlab.com/submarine-reflection:latest", - id="arkindex.W014", - ), - ]) - - def test_init_elements_worker_version_check_unavailable(self): - """ - There is a warning when the worker version exists but is not available - """ - from arkindex.process.models import Worker, WorkerType, WorkerVersion, WorkerVersionState - from arkindex.project.checks import init_elements_check - - with self.settings(INIT_ELEMENTS_DOCKER_IMAGE="registry.teklia.gitlab.com/submarine-reflection:latest"): - try: - del WorkerVersion.objects.init_elements_version - except AttributeError: - pass - - init_type = WorkerType.objects.create(slug="init", display_name="Elements Initialisation") - WorkerVersion.objects.create( - worker=Worker.objects.create( - name="Elements Initialisation Worker", - slug="initialisation", - type=init_type, - ), - version=1, - configuration={ - "docker": { - "command": "worker-init-elements" - } - }, - state=WorkerVersionState.Created, - docker_image_iid=settings.INIT_ELEMENTS_DOCKER_IMAGE - ) - self.assertListEqual(init_elements_check(), [ - Warning( - "Worker version with Docker image registry.teklia.gitlab.com/submarine-reflection:latest is not in the Available state.\n" - "This worker version is required to initialize elements when running a process on Arkindex.\n" - "Starting or retrying processes will fail.", - hint="settings.INIT_ELEMENTS_DOCKER_IMAGE = registry.teklia.gitlab.com/submarine-reflection:latest", - id="arkindex.W014", - ), - ]) - - def test_init_elements_worker_version_check_multiple(self): - """ - There is a warning if more than one worker version exists with the docker_image_iid setting.INIT_ELEMENTS_DOCKER_IMAGE - """ - from arkindex.process.models import Worker, WorkerType, WorkerVersion, WorkerVersionState from arkindex.project.checks import init_elements_check - with self.settings(INIT_ELEMENTS_DOCKER_IMAGE="registry.teklia.gitlab.com/submarine-reflection:latest"): - try: - del WorkerVersion.objects.init_elements_version - except AttributeError: - pass - - init_type = WorkerType.objects.create(slug="init", display_name="Elements Initialisation") - WorkerVersion.objects.create( - worker=Worker.objects.create( - name="Elements Initialisation Worker", - slug="initialisation", - type=init_type, - ), - version=1, - configuration={ - "docker": { - "command": "worker-init-elements" - } - }, - state=WorkerVersionState.Available, - docker_image_iid=settings.INIT_ELEMENTS_DOCKER_IMAGE - ) - WorkerVersion.objects.create( - worker=Worker.objects.create( - name="Shine Aqua Illusion", - slug="shineaqua", - type=init_type, - ), - version=1, - configuration={ - "docker": { - "command": "worker-init-elements" - } - }, - state=WorkerVersionState.Created, - docker_image_iid=settings.INIT_ELEMENTS_DOCKER_IMAGE - ) - - self.assertEqual(WorkerVersion.objects.filter(docker_image_iid="registry.teklia.gitlab.com/submarine-reflection:latest").count(), 2) - self.assertListEqual(init_elements_check(), [ - Warning( - "Multiple worker versions returned for Docker image registry.teklia.gitlab.com/submarine-reflection:latest.\n" - "This worker version is required to initialize elements when running a process on Arkindex.\n" - "Starting or retrying processes will fail.", - hint="settings.INIT_ELEMENTS_DOCKER_IMAGE = registry.teklia.gitlab.com/submarine-reflection:latest", - id="arkindex.W014", - ), - ]) + self.assertListEqual(init_elements_check(), [ + Warning( + "There is no worker version available to initialize elements.\n" + "This worker version is required to initialize elements when running a process on Arkindex.\n" + "Starting or retrying processes will fail.", + hint="Run `arkindex update_system_workers` or set a worker version's `feature` to `init_elements`.", + id="arkindex.W014", + ), + ]) + @override_settings(ARKINDEX_FEATURES={"ingest": True}) def test_ingest_worker_version_check_ok(self): - from arkindex.process.models import Worker, WorkerType, WorkerVersion, WorkerVersionState + from arkindex.process.models import ArkindexFeature, Worker, WorkerType, WorkerVersion, WorkerVersionState from arkindex.project.checks import ingest_version_check - with self.settings(INGEST_DOCKER_IMAGE="registry.teklia.gitlab.com/submarine-reflection:latest"): - try: - del WorkerVersion.objects.ingest_version - except AttributeError: - pass - - import_type = WorkerType.objects.create(slug="import", display_name="Import") - WorkerVersion.objects.create( - worker=Worker.objects.create( - name="S3 Import Worker", - slug="s3-import", - type=import_type, - ), - version=1, - state=WorkerVersionState.Available, - docker_image_iid=settings.INGEST_DOCKER_IMAGE - ) - self.assertListEqual(ingest_version_check(), []) + import_type = WorkerType.objects.create(slug="import", display_name="Import") + WorkerVersion.objects.create( + worker=Worker.objects.create( + name="S3 Import Worker", + slug="s3-import", + type=import_type, + ), + version=1, + state=WorkerVersionState.Available, + docker_image_iid="registry.teklia.gitlab.com/submarine-reflection:latest", + feature=ArkindexFeature.S3Ingest, + ) + self.assertListEqual(ingest_version_check(), []) + @override_settings(ARKINDEX_FEATURES={"ingest": True}) def test_ingest_worker_version_check_missing(self): """ There is a warning when no worker version with the docker_image_iid settings.INGEST_DOCKER_IMAGE exists """ - from arkindex.process.models import WorkerVersion from arkindex.project.checks import ingest_version_check - with self.settings( - ARKINDEX_FEATURES={"ingest": True}, - INGEST_DOCKER_IMAGE="registry.teklia.gitlab.com/submarine-reflection:latest", - ): - try: - del WorkerVersion.objects.ingest_version - except AttributeError: - pass - - self.assertListEqual(ingest_version_check(), [ - Warning( - "Worker version with Docker image registry.teklia.gitlab.com/submarine-reflection:latest does not exist.\n" - "This worker version is required to import from S3 on Arkindex.\n" - "Starting or retrying S3 import processes will fail.", - hint="settings.INGEST_DOCKER_IMAGE = registry.teklia.gitlab.com/submarine-reflection:latest", - id="arkindex.W015", - ), - ]) + self.assertListEqual(ingest_version_check(), [ + Warning( + "There is no worker version available to import elements from S3.\n" + "Starting or retrying S3 import processes will fail.", + hint="Run `arkindex update_system_workers` or set a worker version's `feature` to `s3_ingest`.", + id="arkindex.W015", + ), + ]) + @override_settings(ARKINDEX_FEATURES={"ingest": False}) def test_ingest_worker_version_check_feature_disabled(self): """ The ingest worker version check does not run when the ingest feature flag is disabled """ - from arkindex.process.models import WorkerVersion from arkindex.project.checks import ingest_version_check - with self.settings( - ARKINDEX_FEATURES={"ingest": False}, - INGEST_DOCKER_IMAGE="registry.teklia.gitlab.com/submarine-reflection:latest", - ): - try: - del WorkerVersion.objects.ingest_version - except AttributeError: - pass - - self.assertListEqual(ingest_version_check(), []) - - def test_ingest_worker_version_check_unavailable(self): - """ - There is a warning when the worker version exists but is not available - """ - from arkindex.process.models import Worker, WorkerType, WorkerVersion, WorkerVersionState - from arkindex.project.checks import ingest_version_check - - with self.settings( - ARKINDEX_FEATURES={"ingest": True}, - INGEST_DOCKER_IMAGE="registry.teklia.gitlab.com/submarine-reflection:latest", - ): - try: - del WorkerVersion.objects.ingest_version - except AttributeError: - pass - - import_type = WorkerType.objects.create(slug="import", display_name="Import") - WorkerVersion.objects.create( - worker=Worker.objects.create( - name="S3 Import Worker", - slug="s3-import", - type=import_type, - ), - version=1, - state=WorkerVersionState.Created, - docker_image_iid=settings.INGEST_DOCKER_IMAGE - ) - self.assertListEqual(ingest_version_check(), [ - Warning( - "Worker version with Docker image registry.teklia.gitlab.com/submarine-reflection:latest is not in the Available state.\n" - "This worker version is required to import from S3 on Arkindex.\n" - "Starting or retrying S3 import processes will fail.", - hint="settings.INGEST_DOCKER_IMAGE = registry.teklia.gitlab.com/submarine-reflection:latest", - id="arkindex.W015", - ), - ]) - - def test_ingest_worker_version_check_multiple(self): - """ - There is a warning if more than one worker version exists with the docker_image_iid setting.INGEST_DOCKER_IMAGE - """ - from arkindex.process.models import Worker, WorkerType, WorkerVersion, WorkerVersionState - from arkindex.project.checks import ingest_version_check - - with self.settings( - ARKINDEX_FEATURES={"ingest": True}, - INGEST_DOCKER_IMAGE="registry.teklia.gitlab.com/submarine-reflection:latest", - ): - try: - del WorkerVersion.objects.ingest_version - except AttributeError: - pass - - import_type = WorkerType.objects.create(slug="import", display_name="Import") - WorkerVersion.objects.create( - worker=Worker.objects.create( - name="S3 Import Worker", - slug="s3-import", - type=import_type, - ), - version=1, - state=WorkerVersionState.Available, - docker_image_iid=settings.INGEST_DOCKER_IMAGE - ) - WorkerVersion.objects.create( - worker=Worker.objects.create( - name="Shine Aqua Illusion", - slug="shineaqua", - type=import_type, - ), - version=1, - state=WorkerVersionState.Created, - docker_image_iid=settings.INGEST_DOCKER_IMAGE - ) - - self.assertEqual(WorkerVersion.objects.filter(docker_image_iid="registry.teklia.gitlab.com/submarine-reflection:latest").count(), 2) - self.assertListEqual(ingest_version_check(), [ - Warning( - "Multiple worker versions returned for Docker image registry.teklia.gitlab.com/submarine-reflection:latest.\n" - "This worker version is required to import from S3 on Arkindex.\n" - "Starting or retrying S3 import processes will fail.", - hint="settings.INGEST_DOCKER_IMAGE = registry.teklia.gitlab.com/submarine-reflection:latest", - id="arkindex.W015", - ), - ]) + self.assertListEqual(ingest_version_check(), []) diff --git a/arkindex/sql_validation/indexer_prefetch.sql b/arkindex/sql_validation/indexer_prefetch.sql index 1ee14c2bd6ed009db543fb780180c543f10d6ee2..2befb06882808636cd380dc4982931ca16d7d4ab 100644 --- a/arkindex/sql_validation/indexer_prefetch.sql +++ b/arkindex/sql_validation/indexer_prefetch.sql @@ -70,6 +70,7 @@ SELECT "process_workerversion"."id", "process_workerversion"."docker_image_iid", "process_workerversion"."revision_url", "process_workerversion"."tag", + "process_workerversion"."feature", "process_workerversion"."created", "process_workerversion"."updated" FROM "process_workerversion" @@ -160,6 +161,7 @@ SELECT "process_workerversion"."id", "process_workerversion"."docker_image_iid", "process_workerversion"."revision_url", "process_workerversion"."tag", + "process_workerversion"."feature", "process_workerversion"."created", "process_workerversion"."updated" FROM "process_workerversion"