From 58b0cdcc1fb2487b38cb9b9b82917e13562f354a Mon Sep 17 00:00:00 2001
From: Erwan Rouchet <rouchet@teklia.com>
Date: Thu, 4 Jul 2024 07:37:30 +0000
Subject: [PATCH] Use a worker for S3 imports

---
 arkindex/process/builder.py                   |  99 +++++++-------
 arkindex/process/managers.py                  |  30 +++--
 .../process/tests/test_create_s3_import.py    |  77 ++++++-----
 arkindex/process/tests/test_processes.py      |  20 ++-
 arkindex/project/checks.py                    |  28 ++++
 arkindex/project/config.py                    |   1 +
 arkindex/project/settings.py                  |   2 +
 arkindex/project/tests/__init__.py            |   4 +
 .../tests/config_samples/defaults.yaml        |   1 +
 .../tests/config_samples/override.yaml        |   1 +
 arkindex/project/tests/test_checks.py         | 126 ++++++++++++++++++
 11 files changed, 288 insertions(+), 101 deletions(-)

diff --git a/arkindex/process/builder.py b/arkindex/process/builder.py
index bdb49182e7..e38efd0699 100644
--- a/arkindex/process/builder.py
+++ b/arkindex/process/builder.py
@@ -81,14 +81,24 @@ class ProcessBuilder:
         """
         if image is None:
             image = settings.ARKINDEX_TASKS_IMAGE
+
         token = task_token_default()
+
+        env = {
+            **self.base_env,
+            **env,
+            "ARKINDEX_TASK_TOKEN": token,
+        }
+        if worker_run is not None:
+            env["ARKINDEX_WORKER_RUN_ID"] = str(worker_run.id)
+
         self.tasks.append(
             Task(
                 process=self.process,
                 command=command,
                 slug=slug,
                 token=token,
-                env={**env, "ARKINDEX_TASK_TOKEN": token},
+                env=env,
                 run=self.run,
                 image=image,
                 requires_gpu=requires_gpu,
@@ -220,28 +230,49 @@ class ProcessBuilder:
         self._build_task(
             command=f"python -m arkindex_tasks.import_files {self.process.id}",
             slug="import_files",
-            env={
-                **self.base_env,
-                "ARKINDEX_WORKER_RUN_ID": str(worker_run.id),
-            },
             worker_run=worker_run,
         )
         self._create_worker_versions_cache([(settings.IMPORTS_WORKER_VERSION, None, None)])
 
-    def build_iiif(self):
+    def build_s3(self):
         from arkindex.process.models import WorkerVersion
+        ingest_version = WorkerVersion.objects.ingest_version
 
-        worker_run = self._create_fake_worker_run(worker_version=WorkerVersion.objects.imports_version)
-        self._build_task(
-            command=f"python -m arkindex_tasks.import_iiif.process {self.process.id}",
-            slug="import_iiif",
-            env={
-                **self.base_env,
-                "ARKINDEX_WORKER_RUN_ID": str(worker_run.id),
+        worker_configuration, _ = ingest_version.worker.configurations.get_or_create(
+            configuration={
+                "bucket": self.process.bucket_name,
+                "bucket_prefix": settings.INGEST_PREFIX_BY_BUCKET_NAME,
+                "iiif_base_url": ImageServer.objects.ingest.url,
             },
+            defaults={
+                "name": f"Configuration for process {self.process.id}",
+            },
+        )
+
+        worker_run, _ = self.process.worker_runs.get_or_create(
+            version=ingest_version,
+            model_version=None,
+            configuration=worker_configuration,
+        )
+
+        env = {
+            **self.base_env,
+            "INGEST_S3_ACCESS_KEY": settings.INGEST_S3_ACCESS_KEY,
+            "INGEST_S3_SECRET_KEY": settings.INGEST_S3_SECRET_KEY,
+        }
+        if settings.INGEST_S3_ENDPOINT:
+            env["INGEST_S3_ENDPOINT"] = settings.INGEST_S3_ENDPOINT
+        if settings.INGEST_S3_REGION:
+            env["INGEST_S3_REGION"] = settings.INGEST_S3_REGION
+
+        self._build_task(
+            slug="import_s3",
+            image=ingest_version.docker_image_iid,
+            command=ingest_version.docker_command,
+            env=env,
             worker_run=worker_run,
         )
-        self._create_worker_versions_cache([(settings.IMPORTS_WORKER_VERSION, None, None)])
+        self._create_worker_versions_cache([(worker_run.version_id, None, worker_run.configuration_id)])
 
     @prefetch_worker_runs
     def build_workers(self):
@@ -275,16 +306,11 @@ class ProcessBuilder:
 
         # Create the initialisation task
         import_task_slug = "initialisation"
-        env = {
-            "ARKINDEX_WORKER_RUN_ID": str(initialisation_worker_run.id),
-            **self.base_env
-        }
         self._build_task(
             slug=import_task_slug,
             command=initialisation_worker_run.version.docker_command,
             image=initialisation_worker_run.version.docker_image_iid,
             worker_run=initialisation_worker_run,
-            env=env,
         )
 
         # Distribute worker run tasks
@@ -316,41 +342,6 @@ class ProcessBuilder:
             ) for run in worker_runs
         ])
 
-    def build_s3(self):
-        from arkindex.process.models import WorkerVersion
-
-        env = {**self.base_env}
-        env["INGEST_S3_ACCESS_KEY"] = settings.INGEST_S3_ACCESS_KEY
-        env["INGEST_S3_SECRET_KEY"] = settings.INGEST_S3_SECRET_KEY
-        if settings.INGEST_S3_ENDPOINT:
-            env["INGEST_S3_ENDPOINT"] = settings.INGEST_S3_ENDPOINT
-        if settings.INGEST_S3_REGION:
-            env["INGEST_S3_REGION"] = settings.INGEST_S3_REGION
-
-        worker_run = self._create_fake_worker_run(worker_version=WorkerVersion.objects.imports_version)
-        env["ARKINDEX_WORKER_RUN_ID"] = str(worker_run.id)
-
-        command = (
-            "python -m arkindex_tasks.import_s3"
-            f" --corpus={self.process.corpus_id}"
-            f" --bucket={shlex.quote(self.process.bucket_name)}"
-            f" --folder-type={shlex.quote(self.process.folder_type.slug)}"
-            f" --page-type={shlex.quote(self.process.element_type.slug)}"
-            f" --iiif-base-url={shlex.quote(ImageServer.objects.ingest.url)}"
-        )
-        if settings.INGEST_PREFIX_BY_BUCKET_NAME:
-            command += " --bucket-prefix"
-        if self.process.prefix:
-            command += f" --prefix={shlex.quote(self.process.prefix)}"
-        if self.process.element:
-            command += f" --element={self.process.element_id}"
-        self._build_task(
-            command=command,
-            slug="import_s3",
-            env=env,
-        )
-        self._create_worker_versions_cache([(settings.IMPORTS_WORKER_VERSION, None, None)])
-
     @prefetch_worker_runs
     def build_dataset(self):
         worker_runs = list(self.process.worker_runs.all())
diff --git a/arkindex/process/managers.py b/arkindex/process/managers.py
index e817e2a2dd..ce7893b376 100644
--- a/arkindex/process/managers.py
+++ b/arkindex/process/managers.py
@@ -158,21 +158,31 @@ class WorkerResultSourceQuerySet(QuerySet):
 
 class WorkerVersionManager(Manager):
 
+    def _get_image_version(self, docker_image_iid, title):
+        from arkindex.process.models import WorkerVersionState
+        version = (
+            self
+            .select_related("worker", "revision")
+            .prefetch_related("revision__refs")
+            .get(docker_image_iid=docker_image_iid)
+        )
+        if version.state != WorkerVersionState.Available:
+            raise ValueError(f"The {title} worker version must be 'available'.")
+        return version
+
     @cached_property
     def init_elements_version(self):
         """
         WorkerVersion for elements initialization.
         """
-        from arkindex.process.models import WorkerVersionState
-        init_version = (
-            self
-            .select_related("worker", "revision")
-            .prefetch_related("revision__refs")
-            .get(docker_image_iid=settings.INIT_ELEMENTS_DOCKER_IMAGE)
-            )
-        if init_version.state != WorkerVersionState.Available:
-            raise ValueError("The elements initialization worker version must be 'available'.")
-        return init_version
+        return self._get_image_version(settings.INIT_ELEMENTS_DOCKER_IMAGE, "elements initialization")
+
+    @cached_property
+    def ingest_version(self):
+        """
+        WorkerVersion for S3 ingest processes.
+        """
+        return self._get_image_version(settings.INGEST_DOCKER_IMAGE, "S3 ingest")
 
     @cached_property
     def imports_version(self):
diff --git a/arkindex/process/tests/test_create_s3_import.py b/arkindex/process/tests/test_create_s3_import.py
index fcd875b64d..9c59dc94ba 100644
--- a/arkindex/process/tests/test_create_s3_import.py
+++ b/arkindex/process/tests/test_create_s3_import.py
@@ -7,7 +7,7 @@ from rest_framework import status
 from arkindex.documents.models import Corpus
 from arkindex.images.models import ImageServer
 from arkindex.ponos.models import Farm
-from arkindex.process.models import Process, ProcessMode, WorkerVersion
+from arkindex.process.models import Process, ProcessMode, Worker, WorkerType, WorkerVersion, WorkerVersionState
 from arkindex.project.tests import FixtureTestCase
 from arkindex.users.models import Role, Scope
 
@@ -17,7 +17,23 @@ class TestCreateS3Import(FixtureTestCase):
     @classmethod
     def setUpTestData(cls):
         super().setUpTestData()
-        cls.import_worker_version = WorkerVersion.objects.get(worker__slug="file_import")
+        cls.import_worker_version = WorkerVersion.objects.create(
+            worker=Worker.objects.create(
+                name="S3 Import",
+                slug="s3_import",
+                type=WorkerType.objects.get(slug="import"),
+            ),
+            version=1,
+            state=WorkerVersionState.Available,
+            docker_image_iid="registry.gitlab.teklia.com/arkindex/s4:latest",
+            configuration={
+                "docker": {
+                    "command": "rm -rf /",
+                },
+            },
+        )
+        ImageServer.objects.create(id=999, display_name="Ingest image server", url="https://dev.null.teklia.com")
+        cls.corpus.types.create(slug="folder", display_name="Folder", folder=True)
 
     def test_requires_login(self):
         with self.assertNumQueries(0):
@@ -122,7 +138,7 @@ class TestCreateS3Import(FixtureTestCase):
 
     @override_settings(
         PONOS_DEFAULT_ENV={},
-        ARKINDEX_TASKS_IMAGE="arkindex-tasks-image",
+        INGEST_DOCKER_IMAGE="registry.gitlab.teklia.com/arkindex/s4:latest",
         INGEST_IMAGESERVER_ID=999,
         INGEST_S3_ENDPOINT="http://s3.null.teklia.com",
         INGEST_S3_REGION=None,
@@ -133,11 +149,10 @@ class TestCreateS3Import(FixtureTestCase):
     def test_create(self):
         self.user.user_scopes.create(scope=Scope.S3Ingest)
         self.client.force_login(self.user)
-        ImageServer.objects.create(id=999, display_name="Ingest image server", url="https://dev.null.teklia.com")
         element = self.corpus.elements.get(name="Volume 1")
         farm = Farm.objects.create(name="Crypto farm")
 
-        with self.assertNumQueries(23), self.settings(IMPORTS_WORKER_VERSION=str(self.import_worker_version.id)):
+        with self.assertNumQueries(25):
             response = self.client.post(reverse("api:s3-import-create"), {
                 "corpus_id": str(self.corpus.id),
                 "element_id": str(element.id),
@@ -165,16 +180,18 @@ class TestCreateS3Import(FixtureTestCase):
         worker_run = process.worker_runs.get()
         self.assertEqual(worker_run.version, self.import_worker_version)
         self.assertListEqual(worker_run.parents, [])
-        self.assertIsNone(worker_run.configuration_id)
+        self.assertEqual(worker_run.configuration.name, f"Configuration for process {process.id}")
+        self.assertDictEqual(worker_run.configuration.configuration, {
+            "bucket": "blah",
+            "bucket_prefix": True,
+            "iiif_base_url": "https://dev.null.teklia.com",
+        })
         self.assertIsNone(worker_run.model_version_id)
 
         task = process.tasks.get()
         self.assertEqual(task.slug, "import_s3")
-        self.assertEqual(task.image, "arkindex-tasks-image")
-        self.assertEqual(task.command, f"python -m arkindex_tasks.import_s3 --corpus={self.corpus.id} "
-                                       "--bucket=blah --folder-type=volume --page-type=page "
-                                       "--iiif-base-url=https://dev.null.teklia.com --bucket-prefix "
-                                       f"--prefix=a/b/c --element={element.id}")
+        self.assertEqual(task.image, "registry.gitlab.teklia.com/arkindex/s4:latest")
+        self.assertEqual(task.command, "rm -rf /")
         self.assertDictEqual(task.env, {
             "ARKINDEX_CORPUS_ID": str(self.corpus.id),
             "ARKINDEX_PROCESS_ID": str(process.id),
@@ -187,22 +204,19 @@ class TestCreateS3Import(FixtureTestCase):
 
     @override_settings(
         PONOS_DEFAULT_ENV={},
-        ARKINDEX_TASKS_IMAGE="arkindex-tasks-image",
+        INGEST_DOCKER_IMAGE="registry.gitlab.teklia.com/arkindex/s4:latest",
         INGEST_IMAGESERVER_ID=999,
         INGEST_S3_ENDPOINT="http://s3.null.teklia.com",
         INGEST_S3_REGION=None,
         INGEST_S3_ACCESS_KEY="🔑",
         INGEST_S3_SECRET_KEY="its-secret-i-wont-tell-you",
-        IMPORTS_WORKER_VERSION="aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa",
         INGEST_PREFIX_BY_BUCKET_NAME=True,
     )
     def test_defaults(self):
         self.user.user_scopes.create(scope=Scope.S3Ingest)
         self.client.force_login(self.user)
-        self.corpus.types.create(slug="folder", display_name="Folder", folder=True)
-        ImageServer.objects.create(id=999, display_name="Ingest image server", url="https://dev.null.teklia.com")
 
-        with self.assertNumQueries(21), self.settings(IMPORTS_WORKER_VERSION=str(self.import_worker_version.id)):
+        with self.assertNumQueries(23):
             response = self.client.post(reverse("api:s3-import-create"), {
                 "corpus_id": str(self.corpus.id),
                 "bucket_name": "blah",
@@ -225,16 +239,18 @@ class TestCreateS3Import(FixtureTestCase):
         worker_run = process.worker_runs.get()
         self.assertEqual(worker_run.version, self.import_worker_version)
         self.assertListEqual(worker_run.parents, [])
-        self.assertIsNone(worker_run.configuration_id)
+        self.assertEqual(worker_run.configuration.name, f"Configuration for process {process.id}")
+        self.assertDictEqual(worker_run.configuration.configuration, {
+            "bucket": "blah",
+            "bucket_prefix": True,
+            "iiif_base_url": "https://dev.null.teklia.com",
+        })
         self.assertIsNone(worker_run.model_version_id)
 
         task = process.tasks.get()
         self.assertEqual(task.slug, "import_s3")
-        self.assertEqual(task.image, "arkindex-tasks-image")
-        self.assertEqual(task.command, f"python -m arkindex_tasks.import_s3 --corpus={self.corpus.id} "
-                                       "--bucket=blah --folder-type=folder --page-type=page "
-                                       "--iiif-base-url=https://dev.null.teklia.com "
-                                       "--bucket-prefix")
+        self.assertEqual(task.image, "registry.gitlab.teklia.com/arkindex/s4:latest")
+        self.assertEqual(task.command, "rm -rf /")
         self.assertDictEqual(task.env, {
             "ARKINDEX_CORPUS_ID": str(self.corpus.id),
             "ARKINDEX_PROCESS_ID": str(process.id),
@@ -250,12 +266,10 @@ class TestCreateS3Import(FixtureTestCase):
     def test_farm_guest(self, is_available_mock):
         self.user.user_scopes.create(scope=Scope.S3Ingest)
         self.client.force_login(self.user)
-        self.corpus.types.create(slug="folder", display_name="Folder", folder=True)
-        ImageServer.objects.create(id=999, display_name="Ingest image server", url="https://dev.null.teklia.com")
 
         farm = Farm.objects.create(name="Crypto farm")
 
-        with self.assertNumQueries(5), self.settings(IMPORTS_WORKER_VERSION=str(self.import_worker_version.id)):
+        with self.assertNumQueries(5):
             response = self.client.post(reverse("api:s3-import-create"), {
                 "corpus_id": str(self.corpus.id),
                 "bucket_name": "blah",
@@ -271,18 +285,19 @@ class TestCreateS3Import(FixtureTestCase):
         self.assertEqual(is_available_mock.call_count, 1)
         self.assertEqual(is_available_mock.call_args, call(self.user))
 
-    @override_settings(INGEST_IMAGESERVER_ID=999)
+    @override_settings(
+        INGEST_IMAGESERVER_ID=999,
+        INGEST_DOCKER_IMAGE="registry.gitlab.teklia.com/arkindex/s4:latest",
+    )
     @patch("arkindex.process.serializers.ingest.get_default_farm")
     def test_default_farm(self, get_default_farm_mock):
         self.user.user_scopes.create(scope=Scope.S3Ingest)
         self.client.force_login(self.user)
-        self.corpus.types.create(slug="folder", display_name="Folder", folder=True)
-        ImageServer.objects.create(id=999, display_name="Ingest image server", url="https://dev.null.teklia.com")
 
         default_farm = Farm.objects.create(name="Crypto farm")
         get_default_farm_mock.return_value = default_farm
 
-        with self.assertNumQueries(21), self.settings(IMPORTS_WORKER_VERSION=str(self.import_worker_version.id)):
+        with self.assertNumQueries(23):
             response = self.client.post(reverse("api:s3-import-create"), {
                 "corpus_id": str(self.corpus.id),
                 "bucket_name": "blah",
@@ -298,13 +313,11 @@ class TestCreateS3Import(FixtureTestCase):
     def test_default_farm_guest(self, get_default_farm_mock, is_available_mock):
         self.user.user_scopes.create(scope=Scope.S3Ingest)
         self.client.force_login(self.user)
-        self.corpus.types.create(slug="folder", display_name="Folder", folder=True)
-        ImageServer.objects.create(id=999, display_name="Ingest image server", url="https://dev.null.teklia.com")
 
         default_farm = Farm.objects.create(name="Crypto farm")
         get_default_farm_mock.return_value = default_farm
 
-        with self.assertNumQueries(4), self.settings(IMPORTS_WORKER_VERSION=str(self.import_worker_version.id)):
+        with self.assertNumQueries(4):
             response = self.client.post(reverse("api:s3-import-create"), {
                 "corpus_id": str(self.corpus.id),
                 "bucket_name": "blah",
diff --git a/arkindex/process/tests/test_processes.py b/arkindex/process/tests/test_processes.py
index ded528a565..39307558df 100644
--- a/arkindex/process/tests/test_processes.py
+++ b/arkindex/process/tests/test_processes.py
@@ -9,7 +9,6 @@ from django.utils import timezone
 from rest_framework import status
 
 from arkindex.documents.models import Corpus, ElementType
-from arkindex.images.models import ImageServer
 from arkindex.ponos.models import Farm, State
 from arkindex.process.models import (
     ActivityState,
@@ -1765,19 +1764,30 @@ class TestProcesses(FixtureAPITestCase):
             element_type=self.page_type,
             creator=self.user,
         )
-        process.worker_runs.create(version=self.version_with_model)
+        process.worker_runs.create(
+            version=self.elements_init_worker,
+            configuration=self.elements_init_worker.worker.configurations.create(
+                name="S3 configuration",
+                configuration={
+                    "bucket": "test",
+                    "bucket_prefix": False,
+                    "iiif_base_url": self.imgsrv.url,
+                },
+            ),
+        )
         process.tasks.create(state=State.Error, run=0, depth=0)
         self.assertEqual(process.state, State.Error)
         process.finished = timezone.now()
 
-        # Make sure the import version and the image server ar not cached to count all queries.
+        # Make sure the ingest version and the image server are not cached to count all queries.
         # This can occur when running tests in a different order.
         self.clear_caches()
 
         with (
             self.settings(
-                IMPORTS_WORKER_VERSION=str(self.version_with_model.id),
-                INGEST_IMAGESERVER_ID=str(ImageServer.objects.get(url="http://server").id),
+                INGEST_DOCKER_IMAGE=self.elements_init_worker.docker_image_iid,
+                INGEST_PREFIX_BY_BUCKET_NAME=False,
+                INGEST_IMAGESERVER_ID=self.imgsrv.id,
             ),
             self.assertNumQueries(15),
         ):
diff --git a/arkindex/project/checks.py b/arkindex/project/checks.py
index 2bf951a840..728e257925 100644
--- a/arkindex/project/checks.py
+++ b/arkindex/project/checks.py
@@ -319,3 +319,32 @@ def init_elements_check(*args, **kwargs):
         )]
 
     return []
+
+
+@register()
+@only_runserver
+def ingest_version_check(*args, **kwargs):
+    from django.conf import settings
+    docker_image_iid = settings.INGEST_DOCKER_IMAGE
+
+    warning = None
+
+    try:
+        WorkerVersion.objects.ingest_version
+    except WorkerVersion.DoesNotExist:
+        warning = f"Worker version with Docker image {docker_image_iid} does not exist."
+    except WorkerVersion.MultipleObjectsReturned:
+        warning = f"Multiple worker versions returned for Docker image {docker_image_iid}."
+    except ValueError:
+        warning = f"Worker version with Docker image {docker_image_iid} is not in the Available state."
+
+    if warning:
+        return [Warning(
+            f"{warning}\n"
+            "This worker version is required to import from S3 on Arkindex.\n"
+            "Starting or retrying S3 import processes will fail.",
+            hint=f"settings.INGEST_DOCKER_IMAGE = {docker_image_iid}",
+            id="arkindex.W015",
+        )]
+
+    return []
diff --git a/arkindex/project/config.py b/arkindex/project/config.py
index 1438c5960f..fde338a21c 100644
--- a/arkindex/project/config.py
+++ b/arkindex/project/config.py
@@ -183,6 +183,7 @@ def get_settings_parser(base_dir):
     docker_parser = parser.add_subparser("docker", default={})
     docker_parser.add_option("tasks_image", type=str, default="registry.gitlab.teklia.com/arkindex/tasks")
     docker_parser.add_option("init_elements_image", type=str, default="registry.gitlab.teklia.com/arkindex/workers/init-elements:latest")
+    docker_parser.add_option("ingest_image", type=str, default="registry.gitlab.teklia.com/arkindex/workers/import:latest")
 
     sentry_parser = parser.add_subparser("sentry", default={})
     sentry_parser.add_option("dsn", type=str, default=None)
diff --git a/arkindex/project/settings.py b/arkindex/project/settings.py
index 07c3fed517..636341adb0 100644
--- a/arkindex/project/settings.py
+++ b/arkindex/project/settings.py
@@ -512,6 +512,8 @@ PONOS_TASK_EXPIRY = conf["ponos"]["task_expiry"]
 ARKINDEX_TASKS_IMAGE = conf["docker"]["tasks_image"]
 # Docker image tag of the init elements worker
 INIT_ELEMENTS_DOCKER_IMAGE = conf["docker"]["init_elements_image"]
+# Docker image tag of the S3 ingest worker
+INGEST_DOCKER_IMAGE = conf["docker"]["ingest_image"]
 
 # Robots.txt options
 ROBOTS_TXT_DISALLOW = conf["robots_txt_disallow"]
diff --git a/arkindex/project/tests/__init__.py b/arkindex/project/tests/__init__.py
index f73d0734c4..b0e5f516f4 100644
--- a/arkindex/project/tests/__init__.py
+++ b/arkindex/project/tests/__init__.py
@@ -132,6 +132,10 @@ class FixtureMixin:
             del WorkerVersion.objects.init_elements_version
         except AttributeError:
             pass
+        try:
+            del WorkerVersion.objects.ingest_version
+        except AttributeError:
+            pass
         try:
             del ImageServer.objects.ingest
         except AttributeError:
diff --git a/arkindex/project/tests/config_samples/defaults.yaml b/arkindex/project/tests/config_samples/defaults.yaml
index bde7f69e89..26e0623d42 100644
--- a/arkindex/project/tests/config_samples/defaults.yaml
+++ b/arkindex/project/tests/config_samples/defaults.yaml
@@ -31,6 +31,7 @@ database:
   replica: null
   user: devuser
 docker:
+  ingest_image: registry.gitlab.teklia.com/arkindex/workers/import:latest
   init_elements_image: registry.gitlab.teklia.com/arkindex/workers/init-elements:latest
   tasks_image: registry.gitlab.teklia.com/arkindex/tasks
 doorbell:
diff --git a/arkindex/project/tests/config_samples/override.yaml b/arkindex/project/tests/config_samples/override.yaml
index ecee59be8d..189ad21095 100644
--- a/arkindex/project/tests/config_samples/override.yaml
+++ b/arkindex/project/tests/config_samples/override.yaml
@@ -37,6 +37,7 @@ database:
     user: postgres
   user: littlebobbytables
 docker:
+  ingest_image: yrtsiger.baltig.ailket.moc/xednikra/tropmi:tsetal
   init_elements_image: registry.gitlab.teklia.com/arkindex/entry_plug:latest
   tasks_image: registry.gitlab.teklia.com/arkindex/stonks
 doorbell:
diff --git a/arkindex/project/tests/test_checks.py b/arkindex/project/tests/test_checks.py
index d0527725e4..98e42f4196 100644
--- a/arkindex/project/tests/test_checks.py
+++ b/arkindex/project/tests/test_checks.py
@@ -520,3 +520,129 @@ class ChecksTestCase(TestCase):
                     id="arkindex.W014",
                 ),
             ])
+
+    def test_ingest_worker_version_check_ok(self):
+        from arkindex.process.models import Worker, WorkerType, WorkerVersion, WorkerVersionState
+        from arkindex.project.checks import ingest_version_check
+
+        with self.settings(INGEST_DOCKER_IMAGE="registry.teklia.gitlab.com/submarine-reflection:latest"):
+            try:
+                del WorkerVersion.objects.ingest_version
+            except AttributeError:
+                pass
+
+            import_type = WorkerType.objects.create(slug="import", display_name="Import")
+            WorkerVersion.objects.create(
+                worker=Worker.objects.create(
+                    name="S3 Import Worker",
+                    slug="s3-import",
+                    type=import_type,
+                ),
+                version=1,
+                state=WorkerVersionState.Available,
+                docker_image_iid=settings.INGEST_DOCKER_IMAGE
+            )
+            self.assertListEqual(ingest_version_check(), [])
+
+    def test_ingest_worker_version_check_missing(self):
+        """
+        There is a warning when no worker version with the docker_image_iid settings.INGEST_DOCKER_IMAGE exists
+        """
+        from arkindex.process.models import WorkerVersion
+        from arkindex.project.checks import ingest_version_check
+
+        with self.settings(INGEST_DOCKER_IMAGE="registry.teklia.gitlab.com/submarine-reflection:latest"):
+            try:
+                del WorkerVersion.objects.ingest_version
+            except AttributeError:
+                pass
+
+            self.assertListEqual(ingest_version_check(), [
+                Warning(
+                    "Worker version with Docker image registry.teklia.gitlab.com/submarine-reflection:latest does not exist.\n"
+                    "This worker version is required to import from S3 on Arkindex.\n"
+                    "Starting or retrying S3 import processes will fail.",
+                    hint="settings.INGEST_DOCKER_IMAGE = registry.teklia.gitlab.com/submarine-reflection:latest",
+                    id="arkindex.W015",
+                ),
+            ])
+
+    def test_ingest_worker_version_check_unavailable(self):
+        """
+        There is a warning when the worker version exists but is not available
+        """
+        from arkindex.process.models import Worker, WorkerType, WorkerVersion, WorkerVersionState
+        from arkindex.project.checks import ingest_version_check
+
+        with self.settings(INGEST_DOCKER_IMAGE="registry.teklia.gitlab.com/submarine-reflection:latest"):
+            try:
+                del WorkerVersion.objects.ingest_version
+            except AttributeError:
+                pass
+
+            import_type = WorkerType.objects.create(slug="import", display_name="Import")
+            WorkerVersion.objects.create(
+                worker=Worker.objects.create(
+                    name="S3 Import Worker",
+                    slug="s3-import",
+                    type=import_type,
+                ),
+                version=1,
+                state=WorkerVersionState.Created,
+                docker_image_iid=settings.INGEST_DOCKER_IMAGE
+            )
+            self.assertListEqual(ingest_version_check(), [
+                Warning(
+                    "Worker version with Docker image registry.teklia.gitlab.com/submarine-reflection:latest is not in the Available state.\n"
+                    "This worker version is required to import from S3 on Arkindex.\n"
+                    "Starting or retrying S3 import processes will fail.",
+                    hint="settings.INGEST_DOCKER_IMAGE = registry.teklia.gitlab.com/submarine-reflection:latest",
+                    id="arkindex.W015",
+                ),
+            ])
+
+    def test_ingest_worker_version_check_multiple(self):
+        """
+        There is a warning if more than one worker version exists with the docker_image_iid settings.INGEST_DOCKER_IMAGE
+        """
+        from arkindex.process.models import Worker, WorkerType, WorkerVersion, WorkerVersionState
+        from arkindex.project.checks import ingest_version_check
+
+        with self.settings(INGEST_DOCKER_IMAGE="registry.teklia.gitlab.com/submarine-reflection:latest"):
+            try:
+                del WorkerVersion.objects.ingest_version
+            except AttributeError:
+                pass
+
+            import_type = WorkerType.objects.create(slug="import", display_name="Import")
+            WorkerVersion.objects.create(
+                worker=Worker.objects.create(
+                    name="S3 Import Worker",
+                    slug="s3-import",
+                    type=import_type,
+                ),
+                version=1,
+                state=WorkerVersionState.Available,
+                docker_image_iid=settings.INGEST_DOCKER_IMAGE
+            )
+            WorkerVersion.objects.create(
+                worker=Worker.objects.create(
+                    name="Shine Aqua Illusion",
+                    slug="shineaqua",
+                    type=import_type,
+                ),
+                version=1,
+                state=WorkerVersionState.Created,
+                docker_image_iid=settings.INGEST_DOCKER_IMAGE
+            )
+
+            self.assertEqual(WorkerVersion.objects.filter(docker_image_iid="registry.teklia.gitlab.com/submarine-reflection:latest").count(), 2)
+            self.assertListEqual(ingest_version_check(), [
+                Warning(
+                    "Multiple worker versions returned for Docker image registry.teklia.gitlab.com/submarine-reflection:latest.\n"
+                    "This worker version is required to import from S3 on Arkindex.\n"
+                    "Starting or retrying S3 import processes will fail.",
+                    hint="settings.INGEST_DOCKER_IMAGE = registry.teklia.gitlab.com/submarine-reflection:latest",
+                    id="arkindex.W015",
+                ),
+            ])
-- 
GitLab