Skip to content
Snippets Groups Projects

Support feature worker declaration from gitlab.teklia.com

Merged Valentin Rigal requested to merge teklia-workers into release-1.7.1
All threads resolved!
Files
3
from collections import defaultdict
import requests
import yaml
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction
from django.db.models import Max
from teklia_toolbox.config import ConfigParser
from teklia_toolbox.config import ConfigParser, ConfigurationError
from arkindex.process.models import ArkindexFeature, FeatureUsage, Worker, WorkerType, WorkerVersion, WorkerVersionState
REQUEST_TIMEOUT = (30, 60)
WORKER_YAML_VERSION = 2
def parse_config():
parser = ConfigParser()
@@ -15,16 +22,33 @@ def parse_config():
features_parser = parser.add_subparser("features")
for feature in ArkindexFeature:
feature_parser = features_parser.add_subparser(feature.value, allow_extra_keys=False, default={})
feature_parser.add_option("image", type=str)
feature_parser.add_option("image", type=str, default=None)
feature_parser.add_option("command", type=str, default=None)
return parser.parse(settings.BASE_DIR / "system_workers.yml")
teklia_worker_parser = feature_parser.add_subparser("teklia_worker", allow_extra_keys=False, default=None)
teklia_worker_parser.add_option("name", type=str, default=None)
teklia_worker_parser.add_option("version", type=str, default=None)
teklia_worker_parser.add_option("slug", type=str, default=None)
parsed = parser.parse(settings.BASE_DIR / "system_workers.yml")
errors = defaultdict(list)
for feature, config in parsed["features"].items():
if config["image"] and config["teklia_worker"]:
errors[feature].append("Exactly one of image/command or teklia_parser must be set")
continue
if (config["command"] and config["image"] is None):
errors[feature].append("command argument must be set with the image argument")
if (subparser := config["teklia_worker"]) and (None in subparser.values()):
errors[feature].append("teklia_parser configuration must define a name, a version and a slug")
if errors:
raise ConfigurationError(errors)
return parsed
class Command(BaseCommand):
help = "Update the workers used to provide Arkindex features to the versions compatible with this release."
def get_system_worker(self, feature: ArkindexFeature) -> Worker:
def get_system_worker(self, feature: ArkindexFeature, repo = None) -> Worker:
"""
Update or create a `system` worker for this feature.
Creates the `system` worker type if it does not exist.
@@ -48,6 +72,7 @@ class Command(BaseCommand):
worker, created = Worker.objects.get_or_create(
type=worker_type,
slug=feature.value,
repository_url=repo,
defaults={
"name": feature.name,
"public": True,
@@ -58,7 +83,7 @@ class Command(BaseCommand):
self.stdout.write(f"Created new {worker.name} system worker")
else:
self.stdout.write(f"Using existing system worker {worker.name}")
self.update_existing_worker(worker)
self. update_existing_worker(worker)
return worker
@@ -83,7 +108,7 @@ class Command(BaseCommand):
else:
self.stdout.write("Worker is up to date")
def update_or_create_version(self, worker: Worker, feature: ArkindexFeature, docker_image: str, docker_command: str = None) -> None:
def update_or_create_version(self, worker: Worker, feature: ArkindexFeature, docker_image: str, docker_command: str = None, configuration: dict = {}) -> None:
"""
On a specified worker, assigns an existing version to a feature or creates a new one.
Expects that no version is already assigned to this feature on any worker.
@@ -97,11 +122,9 @@ class Command(BaseCommand):
# Sorting by state means we prefer `available` versions first, then `created`, then `error`.
worker_version = worker.versions.filter(
docker_image_iid=docker_image,
# We ignore existing versions with attributes that could interfere with the features,
# like blocking a start or retry, or wasting resources.
configuration__user_configuration__isnull=True,
gpu_usage=FeatureUsage.Disabled,
model_usage=FeatureUsage.Disabled,
configuration=configuration,
**docker_command_filter
).order_by("state", "-updated").first()
@@ -130,11 +153,11 @@ class Command(BaseCommand):
feature=feature,
state=WorkerVersionState.Available,
version=max_version + 1,
configuration={"docker": {"command": docker_command}} if docker_command else {}
configuration=configuration,
)
self.stdout.write(self.style.SUCCESS(f"Using new worker version {worker_version.id}"))
def check_existing_version(self, worker_version: WorkerVersion, feature: ArkindexFeature, docker_image: str, docker_command: str = None) -> None:
def check_existing_version(self, worker_version: WorkerVersion, feature: ArkindexFeature, *, docker_image: str, docker_command: str = None, configuration: dict = {}, repo: str = None) -> None:
self.stdout.write(f"Current worker version: {worker_version.id} ({worker_version.docker_image_iid})")
valid = True
@@ -158,8 +181,10 @@ class Command(BaseCommand):
self.stdout.write(self.style.WARNING("This version uses a custom Docker command which could interfere with the feature."))
valid = False
if worker_version.required_user_configuration_fields:
self.stdout.write(self.style.WARNING("This version requires a custom worker configuration which could interfere with the feature."))
if worker_version.configuration != configuration:
self.stdout.write(self.style.WARNING(
"This version uses a custom configuration which could interfere with the feature."
))
valid = False
if valid:
@@ -176,26 +201,64 @@ class Command(BaseCommand):
worker = worker_version.worker
if worker.archived is not None:
# except if it is archived, since the new version would be invalid
worker = self.get_system_worker(feature)
worker = self.get_system_worker(feature, repo)
else:
self.update_existing_worker(worker_version.worker)
self.update_or_create_version(worker, feature, docker_image, docker_command)
self.update_or_create_version(worker, feature, docker_image, docker_command, configuration)
@transaction.atomic
def update_feature(self, feature: ArkindexFeature, *, image, command, configuration=None, repo=None):
    """
    Make sure a worker version built from `image` provides the given feature.

    If a worker version is already assigned to the feature, it is checked
    through `check_existing_version`. Otherwise a system worker is fetched
    through `get_system_worker` and a version is assigned to the feature
    through `update_or_create_version`.

    :param feature: Feature to configure.
    :param image: Docker image reference of the worker version.
    :param command: Optional Docker command of the worker version.
    :param configuration: Worker version configuration; defaults to an empty dict.
    :param repo: Optional repository URL forwarded when a system worker is looked up.
    """
    # Build a fresh dict per call instead of sharing one mutable default across calls
    if configuration is None:
        configuration = {}

    self.stdout.write(f"Using {image} to provide {feature.name}")
    try:
        worker_version = WorkerVersion.objects.get_by_feature(feature)
        self.check_existing_version(
            worker_version,
            feature,
            docker_image=image,
            docker_command=command,
            configuration=configuration,
            # Forward the repository so that both branches look up the system worker
            # with the same repository URL
            repo=repo,
        )
    except WorkerVersion.DoesNotExist:
        worker = self.get_system_worker(feature, repo)
        self.update_or_create_version(worker, feature, image, command, configuration)
def update_feature_from_worker(self, feature, *, name, version, slug):
    """
    Update a feature from a worker repository hosted on https://gitlab.teklia.com/

    Downloads the `.arkindex.yml` file of the repository at the given version,
    looks for the worker declared with the given slug, then delegates to
    `update_feature` with the image, command and configuration found.

    :param feature: Feature to configure.
    :param name: Path of the repository, appended to https://gitlab.teklia.com/.
    :param version: Git reference used to fetch `.arkindex.yml` and to build the image reference.
    :param slug: Slug of the worker to pick in the `workers` list of `.arkindex.yml`.
    :raises CommandError: When the file cannot be retrieved or parsed, uses an
        unsupported version, or declares no worker with this slug.
    """
    repo = f"https://gitlab.teklia.com/{name}"
    self.stdout.write(f"Configuring feature {feature} from {repo}")

    # Retrieve the .arkindex.yml file with no auth
    url = f"{repo}/-/raw/{version}/.arkindex.yml"
    try:
        with requests.get(url, timeout=REQUEST_TIMEOUT) as resp:
            resp.raise_for_status()
            content = resp.content
    except requests.exceptions.HTTPError as e:
        raise CommandError(f"Error retrieving configuration at {url}: {e.response.status_code}.") from e
    except requests.exceptions.RequestException as e:
        # Connection errors and timeouts carry no HTTP response: report the error itself
        raise CommandError(f"Error retrieving configuration at {url}: {e}.") from e

    try:
        data = yaml.safe_load(content)
    except yaml.YAMLError as e:
        raise CommandError(f"Error retrieving configuration at {url}: invalid YAML configuration.") from e

    yaml_version = data.get("version", 0) if isinstance(data, dict) else None
    if (
        # bool is an int subclass, so exclude it explicitly
        isinstance(yaml_version, bool)
        or not isinstance(yaml_version, (int, float))
        or yaml_version < WORKER_YAML_VERSION
        or not isinstance(data.get("workers"), list)
        or not data["workers"]
    ):
        raise CommandError(f"Error retrieving configuration at {url}: invalid YAML configuration.")

    # Look for the worker matching feature's slug, ignoring malformed entries
    worker_conf = next(
        (
            worker
            for worker in data["workers"]
            if isinstance(worker, dict) and worker.get("slug") == slug
        ),
        None,
    )
    if worker_conf is None:
        raise CommandError(f"No worker with slug {slug} in .arkindex.yml at {url}.")

    # NOTE(review): the version is appended as a path segment, not as a `:tag` — confirm against the registry layout
    image = f"registry.gitlab.teklia.com/{name}/{version}"
    # An empty `docker:` key is parsed as None, hence the `or {}` fallback
    docker_conf = worker_conf.get("docker") or {}
    command = docker_conf.get("command") if isinstance(docker_conf, dict) else None
    self.update_feature(feature, image=image, command=command, configuration=worker_conf, repo=repo)
def handle(self, *args, **options):
    """
    Configure every feature declared in the parsed configuration.

    A feature declaring a `teklia_worker` entry is configured through
    `update_feature_from_worker`; any other feature is configured through
    `update_feature` from its `image` and `command` settings.
    """
    config = parse_config()
    for feature_value, feature_config in config["features"].items():
        feature = ArkindexFeature(feature_value)
        # str.center requires a fill of exactly one character; an empty fill raises TypeError
        self.stdout.write(f" {feature.name} ".center(80, "="))

        # Removing the key leaves only the settings that update_feature accepts as keyword arguments
        worker = feature_config.pop("teklia_worker", None)
        if worker is not None:
            self.update_feature_from_worker(feature, **worker)
        else:
            configuration = {}
            if (command := feature_config["command"]):
                configuration["docker"] = {"command": command}
            self.update_feature(feature, **feature_config, configuration=configuration)
Loading