Commit c16efbfd authored by ml bonhomme

extra_files support for rq tasks

parent 2fb8ac7b
This commit is part of merge request !2249.
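For context, the new `download_extra_files` helper added below iterates over `task.extra_files` as a mapping of destination file names to download URLs. A minimal sketch of the expected shape, using a stand-in object instead of a real Ponos Task, illustrative URLs, and an assumed module path:

from types import SimpleNamespace

from arkindex.ponos.tasks import download_extra_files  # module path assumed

# Hypothetical task: keys are file names created under PONOS_DATA_DIR/extra_files,
# values are the URLs they are downloaded from.
task = SimpleNamespace(extra_files={
    "model.tar.zst": "https://example.com/models/1234/download/",  # zstd tar, gets extracted
    "tokens.yml": "https://example.com/files/5678/download/",      # stored as-is
})
download_extra_files(task)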
@@ -5,6 +5,7 @@ from pathlib import Path
from time import sleep
from urllib.parse import urljoin
import requests
from django.conf import settings
from django.core.mail import send_mail
from django.db.models import Count, F, Q
@@ -15,7 +16,7 @@ from django_rq import job
import docker
from arkindex.ponos.models import State, Task
from arkindex.ponos.utils import upload_artifact
from arkindex.ponos.utils import decompress_zst_archive, extract_tar_archive, upload_artifact
from arkindex.process.models import Process, WorkerActivityState
from docker.errors import APIError, ImageNotFound
@@ -23,6 +24,8 @@ logger = logging.getLogger(__name__)
# Delay for polling docker task's logs in seconds
TASK_DOCKER_POLLING = 1
# Timeout for HTTP requests: (connect timeout, read timeout)
REQUEST_TIMEOUT = (30, 60)
@job("default", timeout=settings.RQ_TIMEOUTS["notify_process_completion"])
@@ -87,6 +90,41 @@ def upload_logs(task, text):
logger.warning(f"Failed uploading logs for task {task}: {e}")
def download_extra_files(task) -> None:
"""
Download the task's extra_files and store them in a dedicated `extra_files` folder.
This folder is mounted as `PONOS_DATA_DIR/extra_files` in docker containers.
If a downloaded file has a Content-Type of `application/zstd`, it is decompressed with zstandard and the resulting tar archive is extracted.
"""
# Download every declared extra file
for path_name, file_url in task.extra_files.items():
logger.info(f"Downloading file {path_name} using url: {file_url}")
# Download file using the provided url
with requests.get(file_url, stream=True, timeout=REQUEST_TIMEOUT) as resp:
resp.raise_for_status()
# Write file to a specific data directory
extra_files_dir = settings.PONOS_DATA_DIR / "extra_files"
extra_files_dir.mkdir(exist_ok=True)
with open(extra_files_dir / path_name, "wb") as f:
for chunk in resp.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
if resp.headers["Content-Type"] == "application/zstd":
# If type is `application/zstd`, decompress using zstandard
archive_fd, archive_path = decompress_zst_archive(
compressed_archive=extra_files_dir / path_name,
)
# Extract Tar archive
extract_tar_archive(
archive_path=archive_path,
archive_fd=archive_fd,
destination=extra_files_dir,
)
def run_docker_task(client, task, temp_dir):
# 1. Pull the docker image
logger.debug(f"Pulling docker image '{task.image}'")
@@ -140,7 +178,22 @@ def run_docker_task(client, task, temp_dir):
task.state = State.Running
task.save()
# 4. Read logs
# 4. Download extra_files
if task.extra_files:
logger.info("Downloading extra_files for task {!s}".format(task))
try:
download_extra_files(task)
except Exception as e:
logger.warning(
"Failed downloading extra_files for task {!s}: {!s}".format(
task, e
)
)
task.state = State.Error
task.save()
return
# 5. Read logs
logger.debug("Reading logs from the docker container")
previous_logs = b""
while container.status == "running":
@@ -160,7 +213,7 @@ def run_docker_task(client, task, temp_dir):
# Upload logs one last time so we do not miss any data
upload_logs(task, container.logs())
# 5. Retrieve the state of the container
# 6. Retrieve the state of the container
container.reload()
exit_code = container.attrs["State"]["ExitCode"]
if exit_code != 0:
@@ -172,7 +225,7 @@ def run_docker_task(client, task, temp_dir):
task.state = State.Completed
task.save()
# 6. Upload artifacts
# 7. Upload artifacts
logger.info(f"Uploading artifacts for task {task}")
for path in Path(artifacts_dir).glob("**/*"):
......
import logging
import os
import tarfile
import tempfile
import magic
import zstandard
from arkindex.ponos.models import Task
logger = logging.getLogger(__name__)
def is_admin_or_ponos_task(request):
return request.user.is_authenticated and (request.user.is_admin or isinstance(request.auth, Task))
@@ -22,3 +28,67 @@ def upload_artifact(task, path, artifacts_dir):
size=size,
)
artifact.s3_object.upload_file(str(path), ExtraArgs={"ContentType": content_type})
def decompress_zst_archive(compressed_archive):
"""Decompress a zst-compressed tar archive in data dir.
This returns the path to the archive and the file descriptor.
.. warning::
Beware of closing the file descriptor explicitly or the main
process will keep the memory held even if the file is deleted.
:param compressed_archive: Path to the target ZST-compressed archive
:type compressed_archive: str
:return: File descriptor and path to the uncompressed tar archive
:rtype: list
"""
dctx = zstandard.ZstdDecompressor()
archive_fd, archive_path = tempfile.mkstemp(prefix="ponos-", suffix=".tar")
logger.debug(f"Uncompressing downloaded File to {archive_path}")
try:
with open(compressed_archive, "rb") as compressed, open(
archive_path, "wb"
) as decompressed:
dctx.copy_stream(compressed, decompressed)
logger.debug(f"Successfully uncompressed archive {compressed_archive}")
except zstandard.ZstdError as e:
raise Exception(f"Couldn't uncompressed archive: {e}")
# Deleting ZSTD archive tempfile as soon as possible
try:
os.remove(compressed_archive)
logger.debug(
f"Successfully deleted compressed archive file {compressed_archive}"
)
except OSError as e:
logger.debug(
f"Unable to delete compressed archive file {compressed_archive}: {e}"
)
return archive_fd, archive_path
def extract_tar_archive(archive_fd, archive_path, destination):
"""Extract the tar archive's content to a specific destination
:param archive_fd: File descriptor of the archive
:type archive_fd: int
:param archive_path: Path to the archive
:type archive_path: str
:param destination: Path where the archive's data will be extracted
:type destination: str
"""
try:
with tarfile.open(archive_path) as tar_archive:
tar_archive.extractall(destination)
except tarfile.ReadError as e:
raise Exception(f"Couldn't handle the decompressed Tar archive: {e}")
finally:
# Deleting Tar archive
try:
os.close(archive_fd)
os.remove(archive_path)
except OSError as e:
logger.warning(f"Unable to delete archive file {archive_path}: {e}")
@@ -22,3 +22,4 @@ SolrClient==0.3.1
teklia-toolbox==0.1.3
tenacity==8.2.2
uritemplate==4.1.1
zstandard==0.20.0