From 5f7cf4ee061f1bc085c91756c5bfcb5727f691f5 Mon Sep 17 00:00:00 2001
From: mlbonhomme <bonhomme@teklia.com>
Date: Tue, 27 Feb 2024 12:32:18 +0100
Subject: [PATCH] extra_files support for rq tasks

---
 arkindex/ponos/tasks.py | 61 ++++++++++++++++++++++++++++++++---
 arkindex/ponos/utils.py | 70 +++++++++++++++++++++++++++++++++++++++++
 requirements.txt        |  1 +
 3 files changed, 128 insertions(+), 4 deletions(-)

diff --git a/arkindex/ponos/tasks.py b/arkindex/ponos/tasks.py
index 4c53bb7be2..6278da544d 100644
--- a/arkindex/ponos/tasks.py
+++ b/arkindex/ponos/tasks.py
@@ -5,6 +5,7 @@ from pathlib import Path
 from time import sleep
 from urllib.parse import urljoin
 
+import requests
 from django.conf import settings
 from django.core.mail import send_mail
 from django.db.models import Count, F, Q
@@ -15,7 +16,7 @@ from django_rq import job
 
 import docker
 from arkindex.ponos.models import State, Task
-from arkindex.ponos.utils import upload_artifact
+from arkindex.ponos.utils import decompress_zst_archive, extract_tar_archive, upload_artifact
 from arkindex.process.models import Process, WorkerActivityState
 from docker.errors import APIError, ImageNotFound
 
@@ -23,6 +24,8 @@ logger = logging.getLogger(__name__)
 
 # Delay for polling docker task's logs in seconds
 TASK_DOCKER_POLLING = 1
+# Timeout for HTTP requests: (connect timeout, read timeout)
+REQUEST_TIMEOUT = (30, 60)
 
 
 @job("default", timeout=settings.RQ_TIMEOUTS["notify_process_completion"])
@@ -87,6 +90,41 @@ def upload_logs(task, text):
         logger.warning(f"Failed uploading logs for task {task}: {e}")
 
 
+def download_extra_files(task) -> None:
+    """
+    Download the task's extra_files and store them in a dedicated `extra_files` folder.
+    This folder is mounted as `PONOS_DATA_DIR/extra_files` in docker containers.
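+    Each entry in `extra_files` maps a destination file name to the URL to download it from.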
+    If a downloaded file has a content-type of `application/zstd`, it is decompressed using zstandard.
+    """
+    # Download every declared extra file
+    for path_name, file_url in task.extra_files.items():
+        logger.info(f"Downloading file {path_name} using url: {file_url}")
+
+        # Download file using the provided url
+        with requests.get(file_url, stream=True, timeout=REQUEST_TIMEOUT) as resp:
+            resp.raise_for_status()
+
+            # Write file to a specific data directory
+            extra_files_dir = settings.PONOS_DATA_DIR / "extra_files"
+            extra_files_dir.mkdir(exist_ok=True)
+            with open(extra_files_dir / path_name, "wb") as f:
+                for chunk in resp.iter_content(chunk_size=8192):
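+                    # Skip keep-alive chunks that iter_content may yield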
+                    if chunk:
+                        f.write(chunk)
+
+        if resp.headers["Content-Type"] == "application/zstd":
+            # If type is `application/zstd`, decompress using zstandard
+            archive_fd, archive_path = decompress_zst_archive(
+                compressed_archive=extra_files_dir / path_name,
+            )
+            # Extract Tar archive
+            extract_tar_archive(
+                archive_path=archive_path,
+                archive_fd=archive_fd,
+                destination=extra_files_dir,
+            )
+
+
 def run_docker_task(client, task, temp_dir):
     # 1. Pull the docker image
     logger.debug(f"Pulling docker image '{task.image}'")
@@ -140,7 +178,22 @@ def run_docker_task(client, task, temp_dir):
     task.state = State.Running
     task.save()
 
-    # 4. Read logs
+    # 4. Download extra_files
+    if task.extra_files:
+        logger.info("Downloading extra_files for task {!s}".format(task))
+        try:
+            download_extra_files(task)
+        except Exception as e:
+            logger.warning(f"Failed downloading extra_files for task {task}: {e}")
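+            # Mark the task as errored and stop processing it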
+            task.state = State.Error
+            task.save()
+            return
+
+    # 5. Read logs
     logger.debug("Reading logs from the docker container")
     previous_logs = b""
     while container.status == "running":
@@ -160,7 +213,7 @@ def run_docker_task(client, task, temp_dir):
     # Upload logs one last time so we do not miss any data
     upload_logs(task, container.logs())
 
-    # 5. Retrieve the state of the container
+    # 6. Retrieve the state of the container
     container.reload()
     exit_code = container.attrs["State"]["ExitCode"]
     if exit_code != 0:
@@ -172,7 +225,7 @@ def run_docker_task(client, task, temp_dir):
     task.state = State.Completed
     task.save()
 
-    # 6. Upload artifacts
+    # 7. Upload artifacts
     logger.info(f"Uploading artifacts for task {task}")
 
     for path in Path(artifacts_dir).glob("**/*"):
diff --git a/arkindex/ponos/utils.py b/arkindex/ponos/utils.py
index eba004ebb5..f9731fc17c 100644
--- a/arkindex/ponos/utils.py
+++ b/arkindex/ponos/utils.py
@@ -1,8 +1,14 @@
+import logging
 import os
+import tarfile
+import tempfile
 
 import magic
+import zstandard
 from arkindex.ponos.models import Task
 
+logger = logging.getLogger(__name__)
+
 
 def is_admin_or_ponos_task(request):
     return request.user.is_authenticated and (request.user.is_admin or isinstance(request.auth, Task))
@@ -22,3 +28,67 @@ def upload_artifact(task, path, artifacts_dir):
         size=size,
     )
     artifact.s3_object.upload_file(str(path), ExtraArgs={"ContentType": content_type})
+
+
+def decompress_zst_archive(compressed_archive):
+    """Decompress a zst-compressed tar archive in data dir.
+    This returns the path to the archive and the file descriptor.
+
+    .. warning::
+
+        Beware of closing the file descriptor explicitly or the main
+        process will keep the memory held even if the file is deleted.
+
+    :param compressed_archive: Path to the target ZST-compressed archive
+    :type compressed_archive: str
+    :return: File descriptor and path to the uncompressed tar archive
+    :rtype: list
+    """
+    dctx = zstandard.ZstdDecompressor()
+    archive_fd, archive_path = tempfile.mkstemp(prefix="ponos-", suffix=".tar")
+
+    logger.debug(f"Uncompressing downloaded File to {archive_path}")
+    try:
+        with open(compressed_archive, "rb") as compressed, open(
+            archive_path, "wb"
+        ) as decompressed:
+            dctx.copy_stream(compressed, decompressed)
+        logger.debug(f"Successfully uncompressed archive {compressed_archive}")
+    except zstandard.ZstdError as e:
+        raise Exception(f"Couldn't uncompressed archive: {e}")
+
+    # Deleting ZSTD archive tempfile as soon as possible
+    try:
+        os.remove(compressed_archive)
+        logger.debug(
+            f"Successfully deleted compressed archive file {compressed_archive}"
+        )
+    except OSError as e:
+        logger.debug(
+            f"Unable to delete compressed archive file {compressed_archive}: {e}"
+        )
+    return archive_fd, archive_path
+
+
+def extract_tar_archive(archive_fd, archive_path, destination):
+    """Extract the tar archive's content to a specific destination
+
+    :param archive_fd: File descriptor of the archive
+    :type archive_fd: int
+    :param archive_path: Path to the archive
+    :type archive_path: str
+    :param destination: Path where the archive's data will be extracted
+    :type destination: str
+    """
+    try:
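+        # Note: extractall does not sanitize member paths, so the archive must come from a trusted source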
+        with tarfile.open(archive_path) as tar_archive:
+            tar_archive.extractall(destination)
+    except tarfile.ReadError as e:
+        raise Exception(f"Couldn't handle the decompressed Tar archive: {e}")
+    finally:
+        # Close the file descriptor and delete the temporary tar archive
+        try:
+            os.close(archive_fd)
+            os.remove(archive_path)
+        except OSError as e:
+            logger.warning(f"Unable to delete archive file {archive_path}: {e}")
diff --git a/requirements.txt b/requirements.txt
index d8a63d68f4..455a17b66a 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -23,3 +23,4 @@ SolrClient==0.3.1
 teklia-toolbox==0.1.3
 tenacity==8.2.2
 uritemplate==4.1.1
+zstandard==0.20.0
-- 
GitLab