Commit 0ba75081 authored by ml bonhomme

extra_files support for rq tasks

parent c6405687
@@ -5,6 +5,7 @@ from pathlib import Path
from time import sleep
from urllib.parse import urljoin
import requests
from django.conf import settings
from django.core.mail import send_mail
from django.db.models import Count, F, Q
@@ -15,7 +16,7 @@ from django_rq import job
import docker
from arkindex.ponos.models import State, Task
from arkindex.ponos.utils import upload_artifact
from arkindex.ponos.utils import decompress_zst_archive, extract_tar_archive, upload_artifact
from arkindex.process.models import Process, WorkerActivityState
from docker.errors import APIError, ImageNotFound
@@ -23,6 +24,8 @@ logger = logging.getLogger(__name__)
# Delay for polling docker task's logs in seconds
TASK_DOCKER_POLLING = 1
# Timeout for HTTP requests: (connect timeout, read timeout)
REQUEST_TIMEOUT = (30, 60)
@job("default", timeout=settings.RQ_TIMEOUTS["notify_process_completion"])
@@ -87,6 +90,41 @@ def upload_logs(task, text):
        logger.warning(f"Failed uploading logs for task {task}: {e}")
def download_extra_files(task) -> None:
    """
    Download the task's extra_files and store them in a dedicated `extra_files` folder.
    This folder is mounted as `PONOS_DATA_DIR/extra_files` in docker containers.
    If a downloaded file has a content-type of `application/zstd`, it is decompressed
    using zstandard and the resulting tar archive is extracted into the same folder.
    """
    # Download every declared extra file
    for path_name, file_url in task.extra_files.items():
        logger.info(f"Downloading file {path_name} using url: {file_url}")

        # Download file using the provided url
        with requests.get(file_url, stream=True, timeout=REQUEST_TIMEOUT) as resp:
            resp.raise_for_status()

            # Write file to a specific data directory
            extra_files_dir = settings.PONOS_DATA_DIR / "extra_files"
            extra_files_dir.mkdir(exist_ok=True)
            with open(extra_files_dir / path_name, "wb") as f:
                for chunk in resp.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)

        if resp.headers["Content-Type"] == "application/zstd":
            # If type is `application/zstd`, decompress using zstandard
            archive_fd, archive_path = decompress_zst_archive(
                compressed_archive=extra_files_dir / path_name,
            )
            # Extract Tar archive
            extract_tar_archive(
                archive_path=archive_path,
                archive_fd=archive_fd,
                destination=extra_files_dir,
            )
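A minimal usage sketch (editorial, not part of the commit), assuming a task whose extra_files maps destination file names to download URLs and a configured PONOS_DATA_DIR:

    # Hypothetical example: one ZST-compressed tar archive declared as an extra file
    task.extra_files = {"model.tar.zst": "https://example.com/model.tar.zst"}
    download_extra_files(task)
    # The raw download lands in PONOS_DATA_DIR/extra_files/model.tar.zst; since the
    # response advertises Content-Type: application/zstd, its tar content is also
    # extracted into PONOS_DATA_DIR/extra_files/.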
def run_docker_task(client, task, temp_dir):
    # 1. Pull the docker image
    logger.debug(f"Pulling docker image '{task.image}'")
@@ -140,7 +178,22 @@ def run_docker_task(client, task, temp_dir):
    task.state = State.Running
    task.save()

    # 4. Read logs
    # 4. Download extra_files
    if task.extra_files:
        logger.info("Downloading extra_files for task {!s}".format(task))
        try:
            download_extra_files(task)
        except Exception as e:
            logger.warning(
                "Failed downloading extra_files for task {!s}: {!s}".format(
                    task, e
                )
            )
            task.state = State.Error
            task.save()
            return

    # 5. Read logs
    logger.debug("Reading logs from the docker container")
    previous_logs = b""
    while container.status == "running":
@@ -160,7 +213,7 @@ def run_docker_task(client, task, temp_dir):
    # Upload logs one last time so we do not miss any data
    upload_logs(task, container.logs())

    # 5. Retrieve the state of the container
    # 6. Retrieve the state of the container
    container.reload()
    exit_code = container.attrs["State"]["ExitCode"]
    if exit_code != 0:
@@ -172,7 +225,7 @@ def run_docker_task(client, task, temp_dir):
    task.state = State.Completed
    task.save()

    # 6. Upload artifacts
    # 7. Upload artifacts
    logger.info(f"Uploading artifacts for task {task}")
    for path in Path(artifacts_dir).glob("**/*"):
......
import logging
import os
import tarfile
import tempfile
import magic
import zstandard
from arkindex.ponos.models import Task
logger = logging.getLogger(__name__)
def is_admin_or_ponos_task(request):
    return request.user.is_authenticated and (request.user.is_admin or isinstance(request.auth, Task))
@@ -22,3 +28,67 @@ def upload_artifact(task, path, artifacts_dir):
        size=size,
    )
    artifact.s3_object.upload_file(str(path), ExtraArgs={"ContentType": content_type})
def decompress_zst_archive(compressed_archive):
    """Decompress a ZST-compressed tar archive into a temporary file.

    This returns the file descriptor and the path to the uncompressed tar archive.

    .. warning::
        Make sure to close the file descriptor explicitly, or the main
        process will keep the resources held even after the file is deleted.

    :param compressed_archive: Path to the target ZST-compressed archive
    :type compressed_archive: str
    :return: File descriptor and path to the uncompressed tar archive
    :rtype: tuple
    """
    dctx = zstandard.ZstdDecompressor()
    archive_fd, archive_path = tempfile.mkstemp(prefix="ponos-", suffix=".tar")
    logger.debug(f"Uncompressing downloaded file to {archive_path}")

    try:
        with open(compressed_archive, "rb") as compressed, open(
            archive_path, "wb"
        ) as decompressed:
            dctx.copy_stream(compressed, decompressed)
        logger.debug(f"Successfully uncompressed archive {compressed_archive}")
    except zstandard.ZstdError as e:
        raise Exception(f"Couldn't uncompress archive: {e}")

    # Deleting ZSTD archive tempfile as soon as possible
    try:
        os.remove(compressed_archive)
        logger.debug(
            f"Successfully deleted compressed archive file {compressed_archive}"
        )
    except OSError as e:
        logger.debug(
            f"Unable to delete compressed archive file {compressed_archive}: {e}"
        )

    return archive_fd, archive_path
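A rough usage sketch (editorial, hypothetical paths) showing a caller honouring the warning above by closing the descriptor and removing the temporary archive itself:

    fd, tar_path = decompress_zst_archive("/data/extra_files/model.tar.zst")
    try:
        with tarfile.open(tar_path) as tar:
            tar.extractall("/data/extra_files")
    finally:
        os.close(fd)         # release the descriptor created by tempfile.mkstemp
        os.remove(tar_path)  # drop the temporary uncompressed tar archive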
def extract_tar_archive(archive_fd, archive_path, destination):
    """Extract the tar archive's content to a specific destination

    :param archive_fd: File descriptor of the archive
    :type archive_fd: int
    :param archive_path: Path to the archive
    :type archive_path: str
    :param destination: Path where the archive's data will be extracted
    :type destination: str
    """
    try:
        with tarfile.open(archive_path) as tar_archive:
            tar_archive.extractall(destination)
    except tarfile.ReadError as e:
        raise Exception(f"Couldn't handle the decompressed Tar archive: {e}")
    finally:
        # Deleting Tar archive
        try:
            os.close(archive_fd)
            os.remove(archive_path)
        except OSError as e:
            logger.warning(f"Unable to delete archive file {archive_path}: {e}")
@@ -23,3 +23,4 @@ SolrClient==0.3.1
teklia-toolbox==0.1.3
tenacity==8.2.2
uritemplate==4.1.1
zstandard==0.20.0