Skip to content
Snippets Groups Projects

Support feature worker declaration from gitlab.teklia.com

Merged Valentin Rigal requested to merge teklia-workers into release-1.7.1
All threads resolved!
1 file
+ 44
9
Compare changes
  • Side-by-side
  • Inline
from collections import defaultdict
import requests
import yaml
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from django.db import transaction
@@ -8,6 +10,9 @@ from teklia_toolbox.config import ConfigParser, ConfigurationError
from arkindex.process.models import ArkindexFeature, FeatureUsage, Worker, WorkerType, WorkerVersion, WorkerVersionState
REQUEST_TIMEOUT = (30, 60)
WORKER_YAML_VERSION = 2
def parse_config():
parser = ConfigParser()
@@ -31,10 +36,10 @@ def parse_config():
if config["image"] and config["teklia_worker"]:
errors[feature].append("Exactly one of image/command or teklia_parser must be set")
continue
if (subparser := config["teklia_worker"]) and (None in subparser.values()):
errors[feature].append("teklia_parser configuration must define a name, a version and a slug")
if (config["command"] and config["image"] is None):
errors[feature].append("command argument must be set with the image argument")
if (subparser := config["teklia_worker"]) and (None in subparser.values()):
errors[feature].append("teklia_parser configuration must define a name, a version and a slug")
if errors:
raise ConfigurationError(errors)
return parsed
@@ -77,7 +82,7 @@ class Command(BaseCommand):
self.stdout.write(f"Created new {worker.name} system worker")
else:
self.stdout.write(f"Using existing system worker {worker.name}")
self.update_existing_worker(worker)
self. update_existing_worker(worker)
return worker
@@ -102,7 +107,7 @@ class Command(BaseCommand):
else:
self.stdout.write("Worker is up to date")
def update_or_create_version(self, worker: Worker, feature: ArkindexFeature, docker_image: str, docker_command: str = None) -> None:
def update_or_create_version(self, worker: Worker, feature: ArkindexFeature, docker_image: str, docker_command: str = None, configuration: dict = None) -> None:
"""
On a specified worker, assigns an existing version to a feature or creates a new one.
Expects that no version is already assigned to this feature on any worker.
@@ -153,7 +158,7 @@ class Command(BaseCommand):
)
self.stdout.write(self.style.SUCCESS(f"Using new worker version {worker_version.id}"))
def check_existing_version(self, worker_version: WorkerVersion, feature: ArkindexFeature, docker_image: str, docker_command: str = None) -> None:
def check_existing_version(self, worker_version: WorkerVersion, feature: ArkindexFeature, docker_image: str, docker_command: str = None, configuration: dict = None) -> None:
self.stdout.write(f"Current worker version: {worker_version.id} ({worker_version.docker_image_iid})")
valid = True
@@ -203,18 +208,48 @@ class Command(BaseCommand):
@transaction.atomic
def update_feature(self, feature: ArkindexFeature, config: dict):
self.stdout.write(f" {feature.name} ".center(80, ""))
self.stdout.write(f"Using {config['image']} to provide {feature.name}")
try:
worker_version = WorkerVersion.objects.get_by_feature(feature)
self.check_existing_version(worker_version, feature, config["image"], config["command"])
self.check_existing_version(worker_version, feature, config["image"], config["command"], config["configuration"])
except WorkerVersion.DoesNotExist:
worker = self.get_system_worker(feature)
self.update_or_create_version(worker, feature, config["image"], config["command"])
self.update_or_create_version(worker, feature, config["image"], config["command"], config["config uration"])
def update_feature_from_worker(self, feature, *, name, version, slug):
"""
Update a feature from a worker repository hosted on https://gitlab.teklia.com/
"""
repo = f"https://gitlab.teklia.com/{name}"
self.stdout.write(f"Configuring feature {feature} from {repo}")
# Retrieve the .arkindex.yml file with no auth
url = f"{repo}/-/raw/{version}/.arkindex.yml"
with requests.get(url, timeout=REQUEST_TIMEOUT) as resp:
try:
resp.raise_for_status()
except requests.exceptions.HTTPError as e:
raise CommandError(f"Error retrieving configuration at {url}: {e.response.status_code}.")
data = yaml.safe_load(resp.content)
if not isinstance(data, dict) or data.get("version", 0) < WORKER_YAML_VERSION or not data.get("workers"):
raise CommandError(f"Error retrieving configuration at {url}: invalid YAML configuration.")
# Look for the worker matching feature's slug
worker_conf = next((worker for worker in data["workers"] if worker["slug"] == slug), None)
if worker_conf is None:
raise CommandError(f"No worker with slug {slug} in .arkindex.yml at {url}.")
image = f"registry.gitlab.teklia.com/{name}/{version}"
command = worker_conf.get("command", None)
configuration = worker_conf.get("user_configuration", {})
self.update_feature(feature, {"image": image, "command": command, configuration: configuration})
def handle(self, *args, **options):
config = parse_config()
for feature_value, feature_config in config["features"].items():
feature = ArkindexFeature(feature_value)
self.update_feature(feature, feature_config)
self.stdout.write(f" {feature.name} ".center(80, ""))
worker = feature_config.pop("teklia_worker", None)
if worker is not None:
self.update_feature_from_worker(feature, **worker)
else:
self.update_feature(feature, feature_config)
Loading