Support extra_files in RQ tasks

Merged ml bonhomme requested to merge rq-task-extra-files into master
3 files changed: +58 −4
@@ -5,6 +5,7 @@ from pathlib import Path
 from time import sleep
 from urllib.parse import urljoin
 
+import requests
 from django.conf import settings
 from django.core.mail import send_mail
 from django.db.models import Count, F, Q
@@ -15,14 +16,17 @@ from django_rq import job
 import docker
 from arkindex.ponos.models import State, Task
-from arkindex.ponos.utils import upload_artifact
+from arkindex.ponos.utils import decompress_zst_archive, extract_tar_archive, upload_artifact
 from arkindex.process.models import Process, WorkerActivityState
+from arkindex.project.tools import should_verify_cert
 from docker.errors import APIError, ImageNotFound
 
 logger = logging.getLogger(__name__)
 
 # Delay for polling docker task's logs in seconds
 TASK_DOCKER_POLLING = 1
+# Timeout for HTTP requests: (connect timeout, read timeout)
+REQUEST_TIMEOUT = (30, 60)
 
 
 @job("default", timeout=settings.RQ_TIMEOUTS["notify_process_completion"])
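
Note on the new constant: the two-element REQUEST_TIMEOUT is the standard requests (connect, read) tuple, so each HTTP call fails fast on connection setup but tolerates slower transfers. A minimal illustration (the URL is hypothetical):

    import requests

    # Give up if the TCP connection takes more than 30 seconds to establish,
    # or if the server stays silent for more than 60 seconds between bytes.
    requests.get("https://example.com/file", timeout=(30, 60))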
@@ -87,6 +91,41 @@ def upload_logs(task, text):
         logger.warning(f"Failed uploading logs for task {task}: {e}")
 
 
+def download_extra_files(task, temp_dir) -> None:
+    """
+    Download the task's extra_files and store them in a dedicated `extra_files` folder.
+    This folder is mounted as `PONOS_DATA_DIR/extra_files` in docker containers.
+    If a downloaded file has a content-type of `application/zstd`, it is decompressed using zstandard.
+    """
+    # Download every declared extra file
+    for path_name, file_url in task.extra_files.items():
+        logger.info(f"Downloading file {path_name} using url: {file_url}")
+
+        # Download file using the provided url
+        with requests.get(file_url, stream=True, timeout=REQUEST_TIMEOUT, verify=should_verify_cert(file_url)) as resp:
+            resp.raise_for_status()
+
+            # Write file to a specific data directory
+            extra_files_dir = temp_dir / "extra_files"
+            extra_files_dir.mkdir(exist_ok=True)
+            with open(extra_files_dir / path_name, "wb") as f:
+                for chunk in resp.iter_content(chunk_size=8192):
+                    if chunk:
+                        f.write(chunk)
+
+        if resp.headers["Content-Type"] == "application/zstd":
+            # If type is `application/zstd`, decompress using zstandard
+            archive_fd, archive_path = decompress_zst_archive(
+                compressed_archive=extra_files_dir / path_name,
+            )
+            # Extract Tar archive
+            extract_tar_archive(
+                archive_path=archive_path,
+                archive_fd=archive_fd,
+                destination=extra_files_dir,
+            )
+
+
 def run_docker_task(client, task, temp_dir):
     # 1. Pull the docker image
     logger.debug(f"Pulling docker image '{task.image}'")
@@ -140,7 +179,22 @@ def run_docker_task(client, task, temp_dir):
     task.state = State.Running
     task.save()
 
-    # 4. Read logs
+    # 4. Download extra_files
+    if task.extra_files:
+        logger.info("Downloading extra_files for task {!s}".format(task))
+        try:
+            download_extra_files(task, temp_dir)
+        except Exception as e:
+            logger.warning(
+                "Failed downloading extra_files for task {!s}: {!s}".format(
+                    task, e
+                )
+            )
+            task.state = State.Error
+            task.save()
+            return
+
+    # 5. Read logs
     logger.debug("Reading logs from the docker container")
     previous_logs = b""
     while container.status == "running":
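
For context, download_extra_files iterates task.extra_files as a mapping of target file name to source URL, so a task needing a packaged model would carry something like this (values are hypothetical):

    task.extra_files = {
        # Saved as PONOS_DATA_DIR/extra_files/model.tar.zst inside the
        # container, and unpacked there when served as application/zstd.
        "model.tar.zst": "https://example.com/models/model.tar.zst",
    }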
@@ -160,7 +214,7 @@ def run_docker_task(client, task, temp_dir):
     # Upload logs one last time so we do not miss any data
     upload_logs(task, container.logs())
 
-    # 5. Retrieve the state of the container
+    # 6. Retrieve the state of the container
     container.reload()
     exit_code = container.attrs["State"]["ExitCode"]
     if exit_code != 0:
@@ -172,7 +226,7 @@ def run_docker_task(client, task, temp_dir):
     task.state = State.Completed
     task.save()
 
-    # 6. Upload artifacts
+    # 7. Upload artifacts
     logger.info(f"Uploading artifacts for task {task}")
     for path in Path(artifacts_dir).glob("**/*"):