Skip to content
Snippets Groups Projects
Commit 32092ddd authored by Erwan Rouchet's avatar Erwan Rouchet Committed by Bastien Abadie
Browse files

update_system_workers command

parent a6676ea0
No related branches found
No related tags found
1 merge request!2430update_system_workers command
......@@ -7,3 +7,4 @@ recursive-include arkindex/templates *.html
recursive-include arkindex/templates *.json
recursive-include arkindex/templates *.txt
include arkindex/documents/export/*.sql
include arkindex/system_workers.yml
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction
from django.db.models import Max
from teklia_toolbox.config import ConfigParser
from arkindex.process.models import ArkindexFeature, FeatureUsage, Worker, WorkerType, WorkerVersion, WorkerVersionState
def parse_config():
parser = ConfigParser()
parser.add_option("version", type=str)
feature_parser = parser.add_subparser("features")
for feature in ArkindexFeature:
# Remove this after file imports are migrated to a worker
if feature == ArkindexFeature.FileImport:
continue
feature_parser.add_option(feature.value, type=str)
return parser.parse(settings.BASE_DIR / "system_workers.yml")
class Command(BaseCommand):
help = "Update the workers used to provide Arkindex features to the versions compatible with this release."
def get_system_worker(self, feature: ArkindexFeature) -> Worker:
"""
Update or create a `system` worker for this feature.
Creates the `system` worker type if it does not exist.
"""
try:
conflicting_worker = Worker.objects.exclude(type__slug="system").filter(slug=feature.value).get()
self.stdout.write(f"Worker {conflicting_worker.name} ({conflicting_worker.id}) uses the same slug with a different worker type.")
self.stdout.write("Please update this worker's type to `system` or change its slug.")
raise CommandError(f"Cannot create system worker with slug {feature.value} due to a conflict.")
except Worker.DoesNotExist:
pass
worker_type, created = WorkerType.objects.get_or_create(
slug="system",
defaults={"display_name": "System"},
)
if created:
self.stdout.write(f"Created new System worker type ({worker_type.id})")
worker, created = Worker.objects.get_or_create(
type=worker_type,
slug=feature.value,
defaults={
"name": feature.name,
"public": True,
},
)
if created:
self.stdout.write(f"Created new {worker.name} system worker")
else:
self.stdout.write(f"Using existing system worker {worker.name}")
self.update_existing_worker(worker)
return worker
def update_existing_worker(self, worker: Worker) -> None:
"""
Check that an existing worker has the correct attributes to have one of its versions provide a feature
"""
updated = False
if worker.archived is not None:
self.stdout.write("Unarchiving worker")
worker.archived = None
updated = True
if not worker.public:
self.stdout.write("Marking worker as public")
worker.public = True
updated = True
if updated:
worker.save()
else:
self.stdout.write("Worker is up to date")
def update_or_create_version(self, worker: Worker, feature: ArkindexFeature, docker_image: str) -> None:
"""
On a specified worker, assigns an existing version to a feature or creates a new one.
Expects that no version is already assigned to this feature on any worker.
"""
assert worker.archived is None, "Cannot assign a version on an archived worker to a feature"
# docker_image_iid is not unique, so we sort by state and take the most recent one.
# Sorting by state means we prefer `available` versions first, then `created`, then `error`.
worker_version = worker.versions.filter(
docker_image_iid=docker_image,
# We ignore existing versions with attributes that could interfere with the features,
# like blocking a start or retry, or wasting resources.
configuration__docker__command__isnull=True,
configuration__user_configuration__isnull=True,
gpu_usage=FeatureUsage.Disabled,
model_usage=FeatureUsage.Disabled,
).order_by("state", "-updated").first()
if worker_version:
self.stdout.write(f"Assigning existing worker version {worker_version.id} to the feature")
worker_version.feature = feature
if worker_version.state != WorkerVersionState.Available:
self.stdout.write("Marking the worker version as available")
worker_version.state = WorkerVersionState.Available
worker_version.save()
self.stdout.write(self.style.SUCCESS(f"Using existing worker version {worker_version.id}"))
return
self.stdout.write("Creating new worker version")
max_version = worker.versions.aggregate(max_version=Max("version"))["max_version"] or 0
worker_version = worker.versions.create(
docker_image_iid=docker_image,
feature=feature,
state=WorkerVersionState.Available,
version=max_version + 1,
)
self.stdout.write(self.style.SUCCESS(f"Using new worker version {worker_version.id}"))
def check_existing_version(self, worker_version: WorkerVersion, feature: ArkindexFeature, docker_image: str) -> None:
self.stdout.write(f"Current worker version: {worker_version.id} ({worker_version.docker_image_iid})")
valid = True
if worker_version.docker_image_iid != docker_image:
self.stdout.write(self.style.WARNING("This version has an invalid Docker image."))
valid = False
if worker_version.worker.archived is not None:
self.stdout.write(self.style.WARNING("This version is part of an archived worker."))
valid = False
if worker_version.configuration.get("docker", {}).get("command"):
self.stdout.write(self.style.WARNING("This version uses a custom Docker command which could interfere with the feature."))
valid = False
if worker_version.required_user_configuration_fields:
self.stdout.write(self.style.WARNING("This version requires a custom worker configuration which could interfere with the feature."))
valid = False
if valid:
# Ensure the worker is public, and keep using this worker version
self.update_existing_worker(worker_version.worker)
self.stdout.write(self.style.SUCCESS(f"Worker version for {feature.name} is up to date"))
return
self.stdout.write("Unassigning feature from the current version")
worker_version.feature = None
worker_version.save(update_fields=["feature"])
# Create a new version on the existing worker, even if it isn't a system worker
worker = worker_version.worker
if worker.archived is not None:
# except if it is archived, since the new version would be invalid
worker = self.get_system_worker(feature)
else:
self.update_existing_worker(worker_version.worker)
self.update_or_create_version(worker, feature, docker_image)
@transaction.atomic
def update_feature(self, feature: ArkindexFeature, docker_image: str):
self.stdout.write(f" {feature.name} ".center(80, ""))
self.stdout.write(f"Using {docker_image} to provide {feature.name}")
try:
worker_version = WorkerVersion.objects.get_by_feature(feature)
self.check_existing_version(worker_version, feature, docker_image)
except WorkerVersion.DoesNotExist:
worker = self.get_system_worker(feature)
self.update_or_create_version(worker, feature, docker_image)
def handle(self, *args, **options):
config = parse_config()
for feature_value, docker_image in config["features"].items():
feature = ArkindexFeature(feature_value)
self.update_feature(feature, docker_image)
......@@ -780,6 +780,22 @@ class WorkerVersion(models.Model):
return
return self.configuration["docker"].get("shm_size")
@property
def required_user_configuration_fields(self):
"""
Set of field names defined in this worker version's user configuration
which are required and without a default value.
A worker configuration will need to be set on the worker run with those fields set
before this worker version can be executed.
"""
return set(
name
for name, options in self.configuration.get("user_configuration", {}).items()
# Only pick the fields that are required and without default values
if options.get("required") and "default" not in options
)
class WorkerConfiguration(IndexableModel):
name = models.CharField(max_length=250, validators=[MinLengthValidator(1)])
......
......@@ -415,20 +415,13 @@ class StartProcessSerializer(serializers.Serializer):
# If the worker version has a user configuration, check that we aren't missing any required fields
if isinstance(worker_run.version.configuration.get("user_configuration"), dict):
required_fields = set(
name
for name, options in worker_run.version.configuration["user_configuration"].items()
# Only pick the fields that are required and without default values
if options.get("required") and "default" not in options
)
# List all the fields defined on the WorkerRun's configuration if there is one
worker_run_fields = set()
if worker_run.configuration is not None:
worker_run_fields = set(worker_run.configuration.configuration.keys())
# Check that all the worker version's required fields are set in that configuration
if not required_fields.issubset(worker_run_fields):
if not worker_run.version.required_user_configuration_fields.issubset(worker_run_fields):
missing_required_configurations.append(worker_run.version.worker.name)
if len(missing_model_versions) > 0:
......
This diff is collapsed.
......@@ -332,3 +332,21 @@ def ingest_version_check(*args, **kwargs):
)]
return []
@register()
def update_system_workers_check(*args, **kwargs):
from django.conf import settings
from arkindex.process.management.commands.update_system_workers import parse_config
if settings.VERSION == parse_config()["version"]:
return []
return [Error(
"The Docker images specified for workers providing features in `update_system_workers` "
f"have not been verified as up to date for Arkindex version {settings.VERSION}.",
hint="Update `arkindex/system_workers.yml` to the latest versions of the feature workers, "
f"then update its `version` to {settings.VERSION!r}.",
id="arkindex.E012",
)]
......@@ -110,22 +110,11 @@ class _AssertExactQueriesContext(CaptureQueriesContext):
self.test_case.assertEqual(expected_sql, actual_sql)
class FixtureMixin:
class ArkindexTestMixin:
"""
Add the database fixture to a test case
Adds various custom extensions to test cases
"""
fixtures = ["data.json", ]
@classmethod
def setUpTestData(cls):
super().setUpTestData()
cls.corpus = Corpus.objects.get(name="Unit Tests")
cls.user = User.objects.get(email="user@user.fr")
cls.group = Group.objects.get(name="User group")
cls.superuser = User.objects.get(email="root@root.fr")
cls.imgsrv = ImageServer.objects.get(url="http://server")
def clear_caches(self):
# Clean content type cache for SQL requests checks consistency
ContentType.objects.clear_cache()
......@@ -212,15 +201,44 @@ class FixtureMixin:
func(*args, **kwargs)
class FixtureTestCase(FixtureMixin, TestCase):
class FixtureMixin:
"""
Add the database fixture to a test case
"""
fixtures = ["data.json", ]
@classmethod
def setUpTestData(cls):
super().setUpTestData()
cls.corpus = Corpus.objects.get(name="Unit Tests")
cls.user = User.objects.get(email="user@user.fr")
cls.group = Group.objects.get(name="User group")
cls.superuser = User.objects.get(email="root@root.fr")
cls.imgsrv = ImageServer.objects.get(url="http://server")
class ArkindexTestCase(ArkindexTestMixin, TestCase):
"""
Django test case with custom Arkindex extensions
"""
class ArkindexAPITestCase(ArkindexTestMixin, APITestCase):
"""
Django REST Framework test case with custom Arkindex extensions
"""
class FixtureTestCase(FixtureMixin, ArkindexTestCase):
"""
Django test case with the database fixture
Django test case with the database fixture and custom Arkindex extensions
"""
class FixtureAPITestCase(FixtureMixin, APITestCase):
class FixtureAPITestCase(FixtureMixin, ArkindexAPITestCase):
"""
Django REST Framework test case with the database fixture
Django REST Framework test case with the database fixture and custom Arkindex extensions
"""
......
......@@ -457,3 +457,25 @@ class ChecksTestCase(TestCase):
from arkindex.project.checks import ingest_version_check
self.assertListEqual(ingest_version_check(), [])
def test_update_system_workers_check(self):
"""
The Docker images in the `arkindex update_system_workers` must be manually verified
as matching the current Arkindex release.
When the tests run, running the check should never return anything.
"""
from arkindex.project.checks import update_system_workers_check
self.assertListEqual(update_system_workers_check(), [])
@override_settings(VERSION="0.0.0")
def test_update_system_workers_check_error(self):
from arkindex.project.checks import update_system_workers_check
self.assertListEqual(update_system_workers_check(), [Error(
"The Docker images specified for workers providing features in `update_system_workers` "
"have not been verified as up to date for Arkindex version 0.0.0.",
hint="Update `arkindex/system_workers.yml` to the latest versions of the feature workers, "
"then update its `version` to '0.0.0'.",
id="arkindex.E012",
)])
# When releasing Arkindex, check that the Docker images set here are up to date,
# then update the `version` to the current Arkindex version as set in the `VERSION` file
# to confirm that the images have been manually checked.
version: 1.6.3-beta2
features:
init_elements: registry.gitlab.teklia.com/arkindex/workers/init-elements:0.1.0
s3_ingest: registry.gitlab.teklia.com/arkindex/workers/import/s3:0.1.0
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment