Execute docker tasks in RQ

Merged Valentin Rigal requested to merge dummy-tasks into community
All threads resolved!
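For context on the scheduling pattern visible in schedule_tasks below: RQ lets one job wait on another through rq.job.Dependency, and allow_failure=True keeps the chain going even when the parent job failed. A minimal standalone sketch, assuming a plain RQ queue named "tasks" and a placeholder noop function (the Redis connection and the function are illustrative, not part of this change):

from redis import Redis
from rq import Queue
from rq.job import Dependency


def noop(i):
    # Placeholder job; in practice the function must be importable by the worker
    return i


queue = Queue("tasks", connection=Redis())

parent_job = None
for i in range(3):
    kwargs = {}
    if parent_job:
        # Each job waits for the previous one, even if that one failed
        kwargs["depends_on"] = Dependency(jobs=[parent_job], allow_failure=True)
    parent_job = queue.enqueue(noop, i, **kwargs)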
import logging
import tempfile
from io import BytesIO
from pathlib import Path
from urllib.parse import urljoin
import requests
from django.conf import settings
from django.core.mail import send_mail
from django.db.models import Count, F, Q
@@ -11,7 +11,6 @@ from django.db.models.functions import Round
from django.shortcuts import reverse
from django.template.loader import render_to_string
from django_rq import job
from rq.job import Dependency
import docker
from arkindex.ponos.models import State, Task
@@ -73,21 +72,7 @@ def notify_process_completion(
)
@job("tasks", timeout=None)
def schedule_tasks(process: Process, run: int):
    # Build a simple dependency scheme between tasks, based on depth
    tasks = process.tasks.filter(run=run).order_by("depth", "id")
    tasks.update(state=State.Pending)
    # Run tasks in RQ, one by one
    parent_job = None
    for task in tasks:
        kwargs = {}
        if parent_job:
            kwargs["depends_on"] = Dependency(jobs=[parent_job], allow_failure=True)
        parent_job = run_task_rq.delay(task, **kwargs)

def run_docker_task(client, task, artifacts_dir):
def run_docker_task(client, task, temp_dir):
    # 1. Pull the docker image
    logger.debug(f"Pulling docker image '{task.image}'")
    client.images.pull(task.image)
@@ -95,10 +80,13 @@ def run_docker_task(client, task, artifacts_dir):
    # 2. Do run the container asynchronously
    logger.debug("Running container")
    kwargs = {
        "environment": task.env,
        "environment": {
            **task.env,
            "PONOS_DATA": "/data",
        },
        "detach": True,
        "network": "host",
        "volumes": {artifacts_dir: {"bind": "/data", "mode": "rw"}},
        "volumes": {temp_dir: {"bind": "/data/current", "mode": "rw"}},
    }
    if task.requires_gpu:
        # Assign all GPUs to that container
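The GPU assignment itself sits outside this hunk. As a purely hypothetical sketch of how all GPUs can be exposed with the Docker SDK for Python (the image and command are placeholders, and this is not necessarily what the elided code does), docker-py uses device_requests:

import docker

client = docker.from_env()
# count=-1 requests every available GPU; requires the NVIDIA container runtime
device_requests = [docker.types.DeviceRequest(count=-1, capabilities=[["gpu"]])]
client.containers.run(
    "nvidia/cuda:12.2.0-base-ubuntu22.04",  # placeholder image
    "nvidia-smi",
    device_requests=device_requests,
    detach=True,
)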
@@ -117,10 +105,9 @@ def run_docker_task(client, task, artifacts_dir):
    for line in container.logs(stream=True):
        data += line
    try:
        requests.put(
            task.logs.s3_put_url,
            data,
            headers={"Content-Type": "text/plain; charset=utf-8"},
        task.logs.s3_object.upload_fileobj(
            BytesIO(data),
            ExtraArgs={"ContentType": "text/plain; charset=utf-8"},
        )
    except Exception as e:
        logger.warning(f"Failed uploading logs for task {task}: {e}")
@@ -139,11 +126,12 @@ def run_docker_task(client, task, artifacts_dir):
    # 5. Upload artifacts
    logger.info(f"Uploading artifacts for task {task}")
    for path in Path(artifacts_dir).glob("**"):
        if path.is_dir:
    for path in Path(temp_dir).glob("**/*"):
        if path.is_dir():
            continue
        try:
            upload_artifacts(task, path, artifacts_dir)
            upload_artifacts(task, path, temp_dir)
        except Exception as e:
            logger.warning(
                f"Failed uploading artifacts for task {task}: {e}"
@@ -160,10 +148,10 @@ def run_task_rq(task: Task):
    client = docker.from_env()
    if not task.image:
        raise ValueError("The task must have a docker image.")
        raise ValueError(f"Task {task} has no docker image.")
    if task.state != State.Pending:
        raise ValueError("The task must be in pending state run in RQ.")
        raise ValueError(f"Task {task} must be in pending state to run in RQ.")
    # Automatically update children in case an error occurred
    if (parent_state := next(
@@ -174,9 +162,9 @@ def run_task_rq(task: Task):
        task.save()
        return
    with tempfile.TemporaryDirectory() as artifacts_dir:
    with tempfile.TemporaryDirectory(suffix=f"_{task.id}") as temp_dir:
        try:
            run_docker_task(client, task, artifacts_dir)
            run_docker_task(client, task, Path(temp_dir))
        except Exception as e:
            logger.error("An unexpected error occurred, updating state to Error.")
            task.state = State.Error
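For reference, the container lifecycle driven by run_docker_task maps onto these Docker SDK calls. This is a minimal sketch under assumptions: the alpine image, the echo command, and the wait/remove calls are illustrative rather than copied from the code above.

import tempfile

import docker

client = docker.from_env()
client.images.pull("alpine:3.19")  # placeholder image

with tempfile.TemporaryDirectory() as temp_dir:
    container = client.containers.run(
        "alpine:3.19",
        ["sh", "-c", "echo hello > /data/current/out.txt"],
        environment={"PONOS_DATA": "/data"},
        network="host",
        volumes={temp_dir: {"bind": "/data/current", "mode": "rw"}},
        detach=True,
    )
    logs = b"".join(container.logs(stream=True))  # stream logs until the container exits
    exit_code = container.wait()["StatusCode"]    # block until it stops
    container.remove()
    print(exit_code, logs)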