Skip to content
Snippets Groups Projects

Execute docker tasks in RQ

Merged Valentin Rigal requested to merge dummy-tasks into community
3 files
+ 91
46
Compare changes
  • Side-by-side
  • Inline
Files
3
+ 69
46
import logging
import tempfile
from pathlib import Path
from urllib.parse import urljoin
import requests
@@ -13,6 +15,7 @@ from rq.job import Dependency
import docker
from arkindex.ponos.models import State, Task
from arkindex.ponos.utils import upload_artifacts
from arkindex.process.models import Process, WorkerActivityState
logger = logging.getLogger(__name__)
@@ -84,6 +87,64 @@ def schedule_tasks(process: Process, run: int):
parent_job = run_task_rq.delay(task, **kwargs)
def run_docker_task(client, task, artifacts_dir):
    """Run a single Ponos task inside a local docker container.

    Pulls the task's image, runs it with ``artifacts_dir`` mounted at
    ``/data``, streams the container logs to the task's S3 log endpoint,
    updates ``task.state`` according to the container exit code, and
    finally uploads any produced artifacts.

    :param client: A ``docker`` client instance used to pull the image
        and run the container.
    :param task: The Ponos ``Task`` to execute; its ``state`` is saved to
        Running, then Completed/Failed.
    :param artifacts_dir: Local directory bind-mounted read-write into
        the container at ``/data``; scanned for artifacts afterwards.
    """
    # 1. Pull the docker image
    logger.debug(f"Pulling docker image '{task.image}'")
    client.images.pull(task.image)

    # 2. Do run the container asynchronously
    logger.debug("Running container")
    kwargs = {
        "environment": task.env,
        "detach": True,
        "network": "host",
        "volumes": {artifacts_dir: {"bind": "/data", "mode": "rw"}},
    }
    if task.command is not None:
        kwargs["command"] = task.command
    container = client.containers.run(task.image, **kwargs)
    task.state = State.Running
    task.save()

    # 3. Read logs (see agent.setup_logging)
    logger.debug("Reading logs from the docker container")
    data = b""
    for line in container.logs(stream=True):
        data += line
    # Log upload is best-effort: a failure must not mark the task as errored
    try:
        requests.put(
            task.logs.s3_put_url,
            data,
            headers={"Content-Type": "text/plain; charset=utf-8"},
        )
    except Exception as e:
        logger.warning(f"Failed uploading logs for task {task}: {e}")

    # 4. Retrieve the state of the container
    container.reload()
    exit_code = container.attrs["State"]["ExitCode"]
    if exit_code != 0:
        logger.info("Task failed")
        task.state = State.Failed
        task.save()
        return
    task.state = State.Completed
    task.save()

    # 5. Upload artifacts
    logger.info(f"Uploading artifacts for task {task}")
    # "**/*" matches files and directories recursively; plain "**" would
    # only yield directories, so nothing would ever be uploaded.
    for path in Path(artifacts_dir).glob("**/*"):
        # is_dir() must be *called*: the bound method object is always
        # truthy, which previously skipped every path.
        if path.is_dir():
            continue
        # Artifact upload is also best-effort, per file
        try:
            upload_artifacts(task, path, artifacts_dir)
        except Exception as e:
            logger.warning(
                f"Failed uploading artifacts for task {task}: {e}"
            )
@job("tasks", timeout=settings.RQ_TIMEOUTS["task"])
def run_task_rq(task: Task):
"""Run a single task in RQ"""
@@ -108,49 +169,11 @@ def run_task_rq(task: Task):
task.save()
return
try:
# 1. Pull the docker image
logger.debug(f"Pulling docker image '{task.image}'")
client.images.pull(task.image)
# 2. Do run the container asynchronously
logger.debug("Running container")
kwargs = {
"environment": task.env,
"detach": True,
"network": "host",
}
if task.command is not None:
kwargs["command"] = task.command
container = client.containers.run(task.image, **kwargs)
task.state = State.Running
task.save()
# 3. Read logs (see agent.setup_logging)
logger.debug("Reading logs from the docker container.")
data = b""
for line in container.logs(stream=True):
data += line
try:
requests.put(
task.logs.s3_put_url,
data,
headers={"Content-Type": "text/plain; charset=utf-8"},
)
except Exception as e:
logger.error(f"An error occurred uploading logs: {e}")
# 4. Retrieve the state of the container
container.reload()
exit_code = container.attrs["State"]["ExitCode"]
if exit_code == 0:
task.state = State.Completed
else:
logger.info("Task failed.")
task.state = State.Failed
task.save()
except Exception as e:
logger.error("An unexpected error occurred, updating state to Error.")
task.state = State.Error
task.save()
raise e
with tempfile.TemporaryDirectory() as artifacts_dir:
try:
run_docker_task(client, task, artifacts_dir)
except Exception as e:
logger.error("An unexpected error occurred, updating state to Error.")
task.state = State.Error
task.save()
raise e
Loading