Skip to content
Snippets Groups Projects

Execute docker tasks in RQ

Merged Valentin Rigal requested to merge dummy-tasks into community
All threads resolved!
1 file
+ 28
14
Compare changes
  • Side-by-side
  • Inline
+ 28
14
@@ -2,6 +2,7 @@ import logging
import tempfile
from io import BytesIO
from pathlib import Path
from time import sleep
from urllib.parse import urljoin
from django.conf import settings
@@ -19,6 +20,9 @@ from arkindex.process.models import Process, WorkerActivityState
logger = logging.getLogger(__name__)
# Delay for polling docker task's logs in seconds
TASK_DOCKER_POLLING = 1
@job("default", timeout=settings.RQ_TIMEOUTS["notify_process_completion"])
def notify_process_completion(
@@ -111,33 +115,43 @@ def run_docker_task(client, task, temp_dir):
if task.command is not None:
kwargs["command"] = task.command
container = client.containers.run(task.image, **kwargs)
# Wait for
while container.status == "created":
sleep(TASK_DOCKER_POLLING)
container.reload()
if container.status != "running":
raise Exception("Container was not updated to state running")
task.state = State.Running
task.save()
# 4. Read logs
logger.debug("Reading logs from the docker container")
data = b""
for line in container.logs(stream=True):
# Stop a task scheduled to be stopped
previous_logs = b""
while container.status == "running":
# Update container attributes first, so we do not miss publishing logs
container.reload()
logs = container.logs()
if logs != previous_logs:
previous_logs = logs
try:
task.logs.s3_object.upload_fileobj(
BytesIO(logs),
ExtraArgs={"ContentType": "text/plain; charset=utf-8"},
)
except Exception as e:
logger.warning(f"Failed uploading logs for task {task}: {e}")
sleep(TASK_DOCKER_POLLING)
task.refresh_from_db()
# Handle a task that is being stopped during execution
if task.state == State.Stopping:
container.stop()
task.state = State.Stopped
task.save()
return
data += line
try:
task.logs.s3_object.upload_fileobj(
BytesIO(data),
ExtraArgs={"ContentType": "text/plain; charset=utf-8"},
)
except Exception as e:
logger.warning(f"Failed uploading logs for task {task}: {e}")
# 5. Retrieve the state of the container
# Reload container to update `attrs` attribute
container.reload()
exit_code = container.attrs["State"]["ExitCode"]
if exit_code != 0:
logger.info("Task failed")
Loading