Skip to content
Snippets Groups Projects

Support feature worker declaration from gitlab.teklia.com

Merged Valentin Rigal requested to merge teklia-workers into release-1.7.1
All threads resolved!
1 file
+ 20
13
Compare changes
  • Side-by-side
  • Inline
@@ -107,7 +107,7 @@ class Command(BaseCommand):
else:
self.stdout.write("Worker is up to date")
def update_or_create_version(self, worker: Worker, feature: ArkindexFeature, docker_image: str, docker_command: str = None, configuration: dict = None) -> None:
def update_or_create_version(self, worker: Worker, feature: ArkindexFeature, docker_image: str, docker_command: str = None, configuration: dict = {}) -> None:
"""
On a specified worker, assigns an existing version to a feature or creates a new one.
Expects that no version is already assigned to this feature on any worker.
@@ -121,11 +121,9 @@ class Command(BaseCommand):
# Sorting by state means we prefer `available` versions first, then `created`, then `error`.
worker_version = worker.versions.filter(
docker_image_iid=docker_image,
# We ignore existing versions with attributes that could interfere with the features,
# like blocking a start or retry, or wasting resources.
configuration__user_configuration__isnull=True,
gpu_usage=FeatureUsage.Disabled,
model_usage=FeatureUsage.Disabled,
configuration=configuration,
**docker_command_filter
).order_by("state", "-updated").first()
@@ -149,16 +147,18 @@ class Command(BaseCommand):
self.stdout.write("Creating new worker version")
max_version = worker.versions.aggregate(max_version=Max("version"))["max_version"] or 0
if configuration.get("docker") is None and docker_command:
configuration["docker"] = {"command": docker_command}
worker_version = worker.versions.create(
docker_image_iid=docker_image,
feature=feature,
state=WorkerVersionState.Available,
version=max_version + 1,
configuration={"docker": {"command": docker_command}} if docker_command else {}
configuration=configuration,
)
self.stdout.write(self.style.SUCCESS(f"Using new worker version {worker_version.id}"))
def check_existing_version(self, worker_version: WorkerVersion, feature: ArkindexFeature, docker_image: str, docker_command: str = None, configuration: dict = None) -> None:
def check_existing_version(self, worker_version: WorkerVersion, feature: ArkindexFeature, docker_image: str, docker_command: str = None, configuration: dict = {}) -> None:
self.stdout.write(f"Current worker version: {worker_version.id} ({worker_version.docker_image_iid})")
valid = True
@@ -182,8 +182,10 @@ class Command(BaseCommand):
self.stdout.write(self.style.WARNING("This version uses a custom Docker command which could interfere with the feature."))
valid = False
if worker_version.required_user_configuration_fields:
self.stdout.write(self.style.WARNING("This version requires a custom worker configuration which could interfere with the feature."))
if worker_version.configuration != configuration:
self.stdout.write(self.style.WARNING(
"This version uses a custom configuration which could interfere with the feature."
))
valid = False
if valid:
@@ -204,7 +206,7 @@ class Command(BaseCommand):
else:
self.update_existing_worker(worker_version.worker)
self.update_or_create_version(worker, feature, docker_image, docker_command)
self.update_or_create_version(worker, feature, docker_image, docker_command, configuration)
@transaction.atomic
def update_feature(self, feature: ArkindexFeature, config: dict):
@@ -212,10 +214,16 @@ class Command(BaseCommand):
try:
worker_version = WorkerVersion.objects.get_by_feature(feature)
self.check_existing_version(worker_version, feature, config["image"], config["command"], config["configuration"])
self.check_existing_version(
worker_version,
feature,
config["image"],
config["command"],
config.get("configuration", {}),
)
except WorkerVersion.DoesNotExist:
worker = self.get_system_worker(feature)
self.update_or_create_version(worker, feature, config["image"], config["command"], config["config uration"])
self.update_or_create_version(worker, feature, config["image"], config["command"], config["configuration"])
def update_feature_from_worker(self, feature, *, name, version, slug):
"""
@@ -240,8 +248,7 @@ class Command(BaseCommand):
raise CommandError(f"No worker with slug {slug} in .arkindex.yml at {url}.")
image = f"registry.gitlab.teklia.com/{name}/{version}"
command = worker_conf.get("command", None)
configuration = worker_conf.get("user_configuration", {})
self.update_feature(feature, {"image": image, "command": command, configuration: configuration})
self.update_feature(feature, {"image": image, "command": command, "configuration": worker_conf})
def handle(self, *args, **options):
config = parse_config()
Loading