Support extra_files in RQ tasks

Merged ml bonhomme requested to merge rq-task-extra-files into master
All threads resolved!
3 files changed: +128 −4 (the file shown below accounts for +57 −4)
@@ -5,6 +5,7 @@ from pathlib import Path
 from time import sleep
 from urllib.parse import urljoin
 
+import requests
 from django.conf import settings
 from django.core.mail import send_mail
 from django.db.models import Count, F, Q
@@ -15,7 +16,7 @@ from django_rq import job
 import docker
 from arkindex.ponos.models import State, Task
-from arkindex.ponos.utils import upload_artifact
+from arkindex.ponos.utils import decompress_zst_archive, extract_tar_archive, upload_artifact
 from arkindex.process.models import Process, WorkerActivityState
 from docker.errors import APIError, ImageNotFound
@@ -23,6 +24,8 @@ logger = logging.getLogger(__name__)
 # Delay for polling docker task's logs in seconds
 TASK_DOCKER_POLLING = 1
+# Timeout for HTTP requests: (connect timeout, read timeout)
+REQUEST_TIMEOUT = (30, 60)
 
 
 @job("default", timeout=settings.RQ_TIMEOUTS["notify_process_completion"])
@@ -87,6 +90,41 @@ def upload_logs(task, text):
         logger.warning(f"Failed uploading logs for task {task}: {e}")
 
 
+def download_extra_files(task) -> None:
+    """
+    Download the task's extra_files and store them in a dedicated `extra_files` folder.
+    This folder is mounted as `PONOS_DATA_DIR/extra_files` in docker containers.
+    If a downloaded file has a content-type of `application/zstd`, it is decompressed using zstandard.
+    """
+    # Download every declared extra file
+    for path_name, file_url in task.extra_files.items():
+        logger.info(f"Downloading file {path_name} using url: {file_url}")
+
+        # Download file using the provided url
+        with requests.get(file_url, stream=True, timeout=REQUEST_TIMEOUT) as resp:
+            resp.raise_for_status()
+
+            # Write file to a specific data directory
+            extra_files_dir = settings.PONOS_DATA_DIR / "extra_files"
+            extra_files_dir.mkdir(exist_ok=True)
+            with open(extra_files_dir / path_name, "wb") as f:
+                for chunk in resp.iter_content(chunk_size=8192):
+                    if chunk:
+                        f.write(chunk)
+
+            if resp.headers["Content-Type"] == "application/zstd":
+                # If type is `application/zstd`, decompress using zstandard
+                archive_fd, archive_path = decompress_zst_archive(
+                    compressed_archive=extra_files_dir / path_name,
+                )
+                # Extract Tar archive
+                extract_tar_archive(
+                    archive_path=archive_path,
+                    archive_fd=archive_fd,
+                    destination=extra_files_dir,
+                )
+
+
 def run_docker_task(client, task, temp_dir):
     # 1. Pull the docker image
     logger.debug(f"Pulling docker image '{task.image}'")
@@ -140,7 +178,22 @@ def run_docker_task(client, task, temp_dir):
     task.state = State.Running
     task.save()
 
-    # 4. Read logs
+    # 4. Download extra_files
+    if task.extra_files:
+        logger.info("Downloading extra_files for task {!s}".format(task))
+        try:
+            download_extra_files(task)
+        except Exception as e:
+            logger.warning(
+                "Failed downloading extra_files for task {!s}: {!s}".format(
+                    task, e
+                )
+            )
+            task.state = State.Error
+            task.save()
+            return
+
+    # 5. Read logs
     logger.debug("Reading logs from the docker container")
     previous_logs = b""
     while container.status == "running":
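
Note (not part of the diff): extra_files is a mapping of destination file names to download URLs, so a task could hypothetically carry something like:

task.extra_files = {
    # File name under PONOS_DATA_DIR/extra_files -> URL to fetch it from
    "model.tar.zst": "https://assets.example.com/models/1234/download",
}

Any exception raised while downloading (HTTP error, unreadable archive) marks the whole task as errored and returns before its logs are read.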
@@ -160,7 +213,7 @@ def run_docker_task(client, task, temp_dir):
     # Upload logs one last time so we do not miss any data
     upload_logs(task, container.logs())
 
-    # 5. Retrieve the state of the container
+    # 6. Retrieve the state of the container
     container.reload()
     exit_code = container.attrs["State"]["ExitCode"]
     if exit_code != 0:
@@ -172,7 +225,7 @@ def run_docker_task(client, task, temp_dir):
     task.state = State.Completed
     task.save()
 
-    # 6. Upload artifacts
+    # 7. Upload artifacts
     logger.info(f"Uploading artifacts for task {task}")
 
     for path in Path(artifacts_dir).glob("**/*"):
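
Note (not part of the diff): the two other changed files are not shown on this page. A minimal sketch of how download_extra_files could be exercised, assuming pytest with pytest-django's settings fixture, the requests_mock library, and a hypothetical task fixture (the module path is also an assumption):

import requests_mock

from arkindex.ponos.tasks import download_extra_files  # assumed module path


def test_download_extra_files(tmp_path, settings, task):
    settings.PONOS_DATA_DIR = tmp_path
    task.extra_files = {"labels.json": "http://example.com/labels.json"}

    with requests_mock.Mocker() as mock:
        mock.get(
            "http://example.com/labels.json",
            content=b'{"lines": []}',
            headers={"Content-Type": "application/json"},
        )
        download_extra_files(task)

    assert (tmp_path / "extra_files" / "labels.json").read_bytes() == b'{"lines": []}'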