Skip to content
Snippets Groups Projects

Execute docker tasks in RQ

Merged Valentin Rigal requested to merge dummy-tasks into community
All threads resolved!
8 files
+ 101
1
Compare changes
  • Side-by-side
  • Inline
Files
8
+ 81
0
@@ -8,7 +8,10 @@ from django.db.models.functions import Round
from django.shortcuts import reverse
from django.template.loader import render_to_string
from django_rq import job
from rq.job import Dependency
import docker
from arkindex.ponos.models import State, Task
from arkindex.process.models import Process, WorkerActivityState
logger = logging.getLogger(__name__)
@@ -64,3 +67,81 @@ def notify_process_completion(
recipient_list=[process.creator.email],
fail_silently=False,
)
@job("tasks", timeout=None)
def schedule_tasks(process: Process, run: int):
# Build a simple dependency scheme between tasks, based on depth
tasks = process.tasks.filter(run=run).order_by("depth", "id")
tasks.update(state=State.Pending)
# Run tasks in RQ, one by one
parent_job = None
for task in tasks:
kwargs = {}
if parent_job:
kwargs["depends_on"] = Dependency(jobs=[parent_job], allow_failure=True)
parent_job = run_task_rq.delay(task, **kwargs)
@job("tasks", timeout=settings.RQ_TIMEOUTS["task"])
def run_task_rq(task: Task):
"""Run a single task in RQ"""
# Update task and parents from the DB
task.refresh_from_db()
parents = list(task.parents.order_by("depth", "id"))
client = docker.from_env()
if not task.image:
raise ValueError("The task must have a docker image.")
if task.state != State.Pending:
raise ValueError("The task must be in pending state run in RQ.")
# Automatically update children in case an error occurred
if (parent_state := next(
(parent.state for parent in parents if parent.state in (State.Stopped, State.Error, State.Failed)),
None
)) is not None:
task.state = parent_state
task.save()
return
try:
# 1. Pull the docker image
logger.debug(f"Pulling docker image '{task.image}'")
client.images.pull(task.image)
# 2. Do run the container asynchronously
logger.debug("Running container")
kwargs = {
"environment": task.env,
"detach": True,
"network": "host",
}
if task.command is not None:
kwargs["command"] = task.command
container = client.containers.run(task.image, **kwargs)
task.state = State.Running
task.save()
# 3. Read logs (see agent.setup_logging)
logger.debug("Reading logs from the docker container.")
for line in container.logs(stream=True):
# TODO: continuously publish the logs as artifact
logger.info(line)
# 4. Retrieve the state of the container
container.reload()
exit_code = container.attrs["State"]["ExitCode"]
if exit_code == 0:
task.state = State.Completed
else:
logger.error("Task failed.")
task.state = State.Failed
task.save()
except Exception as e:
logger.error("An unexpected error occurred, updating state to Error.")
task.state = State.Error
task.save()
raise e
Loading