From 47a31812c4ec575e88f9b9ffad90b3dad98f81b2 Mon Sep 17 00:00:00 2001
From: ml bonhomme <bonhomme@teklia.com>
Date: Wed, 10 Apr 2024 13:13:10 +0000
Subject: [PATCH] Support extra_files in RQ tasks

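Tasks may now declare extra_files, a mapping of file names to URLs. Before
running the container, the RQ task runner downloads each declared file into
a dedicated `extra_files` folder, mounted as `PONOS_DATA_DIR/extra_files`
in the Docker container. Files served with an `application/zstd`
content-type are decompressed with zstandard and their tar content is
extracted in place, using new helpers in arkindex/ponos/utils.py. A failed
download marks the task as errored. `should_verify_cert` moves from
arkindex.project.aws to arkindex.project.tools so that it can be reused by
the task runner without importing the AWS module.
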
---
 arkindex/images/models.py                     |  3 +-
 arkindex/ponos/tasks.py                       | 62 ++++++++++++++--
 arkindex/ponos/utils.py                       | 70 +++++++++++++++++++
 arkindex/project/aws.py                       | 15 +---
 .../tests/{test_aws.py => test_tools.py}      |  4 +-
 arkindex/project/tools.py                     | 16 ++++-
 requirements.txt                              |  1 +
 7 files changed, 150 insertions(+), 21 deletions(-)
 rename arkindex/project/tests/{test_aws.py => test_tools.py} (80%)

diff --git a/arkindex/images/models.py b/arkindex/images/models.py
index a682541439..aecc25fca6 100644
--- a/arkindex/images/models.py
+++ b/arkindex/images/models.py
@@ -15,9 +15,10 @@ from django.utils.text import slugify
 from enumfields import EnumField
 
 from arkindex.images.managers import ImageServerManager
-from arkindex.project.aws import S3FileMixin, S3FileStatus, should_verify_cert
+from arkindex.project.aws import S3FileMixin, S3FileStatus
 from arkindex.project.fields import LStripTextField, MD5HashField, StripSlashURLField
 from arkindex.project.models import IndexableModel
+from arkindex.project.tools import should_verify_cert
 
 logger = logging.getLogger(__name__)
 profile_uri_validator = URLValidator(schemes=["http", "https"], message="Invalid IIIF profile URI")
diff --git a/arkindex/ponos/tasks.py b/arkindex/ponos/tasks.py
index 4c53bb7be2..b4744c1a01 100644
--- a/arkindex/ponos/tasks.py
+++ b/arkindex/ponos/tasks.py
@@ -5,6 +5,7 @@ from pathlib import Path
 from time import sleep
 from urllib.parse import urljoin
 
+import requests
 from django.conf import settings
 from django.core.mail import send_mail
 from django.db.models import Count, F, Q
@@ -15,14 +16,17 @@ from django_rq import job
 
 import docker
 from arkindex.ponos.models import State, Task
-from arkindex.ponos.utils import upload_artifact
+from arkindex.ponos.utils import decompress_zst_archive, extract_tar_archive, upload_artifact
 from arkindex.process.models import Process, WorkerActivityState
+from arkindex.project.tools import should_verify_cert
 from docker.errors import APIError, ImageNotFound
 
 logger = logging.getLogger(__name__)
 
 # Delay for polling docker task's logs in seconds
 TASK_DOCKER_POLLING = 1
+# Timeout for HTTP requests: (connect timeout, read timeout)
+REQUEST_TIMEOUT = (30, 60)
 
 
 @job("default", timeout=settings.RQ_TIMEOUTS["notify_process_completion"])
@@ -87,6 +91,41 @@ def upload_logs(task, text):
         logger.warning(f"Failed uploading logs for task {task}: {e}")
 
 
+def download_extra_files(task, temp_dir) -> None:
+    """
+    Download the task's extra_files and store them in a dedicated `extra_files` folder.
+    This folder is mounted as `PONOS_DATA_DIR/extra_files` in docker containers.
+    If a downloaded file has a content-type of `application/zstd`, it is decompressed with zstandard and the resulting tar archive is extracted.
+    """
+    # Download every declared extra file
+    for path_name, file_url in task.extra_files.items():
+        logger.info(f"Downloading file {path_name} using url: {file_url}")
+
+        # Download file using the provided url
+        with requests.get(file_url, stream=True, timeout=REQUEST_TIMEOUT, verify=should_verify_cert(file_url)) as resp:
+            resp.raise_for_status()
+
+            # Write the file to the dedicated extra_files directory
+            extra_files_dir = temp_dir / "extra_files"
+            extra_files_dir.mkdir(exist_ok=True)
+            with open(extra_files_dir / path_name, "wb") as f:
+                for chunk in resp.iter_content(chunk_size=8192):
+                    if chunk:
+                        f.write(chunk)
+
+        if resp.headers.get("Content-Type") == "application/zstd":
+            # If the content type is `application/zstd`, decompress using zstandard
+            archive_fd, archive_path = decompress_zst_archive(
+                compressed_archive=extra_files_dir / path_name,
+            )
+            # Extract Tar archive
+            extract_tar_archive(
+                archive_path=archive_path,
+                archive_fd=archive_fd,
+                destination=extra_files_dir,
+            )
+
+
 def run_docker_task(client, task, temp_dir):
     # 1. Pull the docker image
     logger.debug(f"Pulling docker image '{task.image}'")
@@ -140,7 +179,22 @@ def run_docker_task(client, task, temp_dir):
     task.state = State.Running
     task.save()
 
-    # 4. Read logs
+    # 4. Download extra_files
+    if task.extra_files:
+        logger.info("Downloading extra_files for task {!s}".format(task))
+        try:
+            download_extra_files(task, temp_dir)
+        except Exception as e:
+            logger.warning(
+                f"Failed downloading extra_files for task {task}: {e}"
+            )
+            # Mark the task as errored and stop here: the container
+            # cannot run without its declared extra files
+            task.state = State.Error
+            task.save()
+            return
+
+    # 5. Read logs
     logger.debug("Reading logs from the docker container")
     previous_logs = b""
     while container.status == "running":
@@ -160,7 +214,7 @@ def run_docker_task(client, task, temp_dir):
     # Upload logs one last time so we do not miss any data
     upload_logs(task, container.logs())
 
-    # 5. Retrieve the state of the container
+    # 6. Retrieve the state of the container
     container.reload()
     exit_code = container.attrs["State"]["ExitCode"]
     if exit_code != 0:
@@ -172,7 +226,7 @@ def run_docker_task(client, task, temp_dir):
     task.state = State.Completed
     task.save()
 
-    # 6. Upload artifacts
+    # 7. Upload artifacts
     logger.info(f"Uploading artifacts for task {task}")
 
     for path in Path(artifacts_dir).glob("**/*"):
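
Note — a minimal sketch of the new download step, assuming a task whose
extra_files maps a file name to a URL (the task, file name and URL below
are all illustrative). REQUEST_TIMEOUT = (30, 60) gives requests 30 seconds
to connect and 60 seconds between reads:

    from pathlib import Path
    from arkindex.ponos.tasks import download_extra_files

    # Hypothetical task declaring one zstd-compressed extra file
    task.extra_files = {"model.tar.zst": "https://example.com/model.tar.zst"}
    temp_dir = Path("/tmp/ponos-task")
    temp_dir.mkdir(parents=True, exist_ok=True)

    # Files land in temp_dir / "extra_files"; since the server answers with
    # Content-Type: application/zstd, the download is then decompressed and
    # its tar content extracted into that same directory
    download_extra_files(task, temp_dir)
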
diff --git a/arkindex/ponos/utils.py b/arkindex/ponos/utils.py
index eba004ebb5..f9731fc17c 100644
--- a/arkindex/ponos/utils.py
+++ b/arkindex/ponos/utils.py
@@ -1,8 +1,14 @@
+import logging
 import os
+import tarfile
+import tempfile
 
 import magic
+import zstandard
 from arkindex.ponos.models import Task
 
+logger = logging.getLogger(__name__)
+
 
 def is_admin_or_ponos_task(request):
     return request.user.is_authenticated and (request.user.is_admin or isinstance(request.auth, Task))
@@ -22,3 +28,67 @@ def upload_artifact(task, path, artifacts_dir):
         size=size,
     )
     artifact.s3_object.upload_file(str(path), ExtraArgs={"ContentType": content_type})
+
+
+def decompress_zst_archive(compressed_archive):
+    """Decompress a zst-compressed tar archive in data dir.
+    This returns the path to the archive and the file descriptor.
+
+    .. warning::
+
+        Beware of closing the file descriptor explicitly or the main
+        process will keep the memory held even if the file is deleted.
+
+    :param compressed_archive: Path to the target ZST-compressed archive
+    :type compressed_archive: str
+    :return: File descriptor and path to the uncompressed tar archive
+    :rtype: list
+    """
+    dctx = zstandard.ZstdDecompressor()
+    archive_fd, archive_path = tempfile.mkstemp(prefix="ponos-", suffix=".tar")
+
+    logger.debug(f"Uncompressing downloaded File to {archive_path}")
+    try:
+        with open(compressed_archive, "rb") as compressed, open(
+            archive_path, "wb"
+        ) as decompressed:
+            dctx.copy_stream(compressed, decompressed)
+        logger.debug(f"Successfully uncompressed archive {compressed_archive}")
+    except zstandard.ZstdError as e:
+        raise Exception(f"Couldn't uncompressed archive: {e}")
+
+    # Delete the zstd-compressed archive as soon as possible
+    try:
+        os.remove(compressed_archive)
+        logger.debug(
+            f"Successfully deleted compressed archive file {compressed_archive}"
+        )
+    except OSError as e:
+        logger.debug(
+            f"Unable to delete compressed archive file {compressed_archive}: {e}"
+        )
+    return archive_fd, archive_path
+
+
+def extract_tar_archive(archive_fd, archive_path, destination):
+    """Extract the tar archive's content to a specific destination
+
+    :param archive_fd: File descriptor of the archive
+    :type archive_fd: int
+    :param archive_path: Path to the archive
+    :type archive_path: str
+    :param destination: Path where the archive's data will be extracted
+    :type destination: str
+    """
+    try:
+        with tarfile.open(archive_path) as tar_archive:
+            tar_archive.extractall(destination)
+    except tarfile.ReadError as e:
+        raise Exception(f"Couldn't extract the decompressed tar archive: {e}") from e
+    finally:
+        # Deleting Tar archive
+        try:
+            os.close(archive_fd)
+            os.remove(archive_path)
+        except OSError as e:
+            logger.warning(f"Unable to delete archive file {archive_path}: {e}")
diff --git a/arkindex/project/aws.py b/arkindex/project/aws.py
index ef0edfb725..659f37326a 100644
--- a/arkindex/project/aws.py
+++ b/arkindex/project/aws.py
@@ -1,7 +1,6 @@
 import logging
 from functools import wraps
 from io import BytesIO
-from urllib.parse import urlparse
 
 import boto3.session
 from botocore.config import Config
@@ -11,19 +10,9 @@ from django.utils.functional import cached_property
 from enumfields import Enum
 from tenacity import retry, retry_if_exception, stop_after_delay
 
-logger = logging.getLogger(__name__)
-
-
-def should_verify_cert(url):
-    """
-    Skip SSL certification validation when hitting a development instance
-    """
-    # Special case when no url is provided
-    if url is None:
-        return True
+from arkindex.project.tools import should_verify_cert
 
-    host = urlparse(url).netloc
-    return not host.endswith("ark.localhost")
+logger = logging.getLogger(__name__)
 
 
 def get_s3_resource(
diff --git a/arkindex/project/tests/test_aws.py b/arkindex/project/tests/test_tools.py
similarity index 80%
rename from arkindex/project/tests/test_aws.py
rename to arkindex/project/tests/test_tools.py
index 7aae4cb45d..89d313e74c 100644
--- a/arkindex/project/tests/test_aws.py
+++ b/arkindex/project/tests/test_tools.py
@@ -1,9 +1,9 @@
 from django.test import TestCase
 
-from arkindex.project.aws import should_verify_cert  # noqa
+from arkindex.project.tools import should_verify_cert
 
 
-class AWSTestCase(TestCase):
+class ToolsTest(TestCase):
 
     def test_should_verify_cert(self):
         self.assertTrue(should_verify_cert("https://google.fr/whatever"))
diff --git a/arkindex/project/tools.py b/arkindex/project/tools.py
index 74bc0dd81f..ab448463f8 100644
--- a/arkindex/project/tools.py
+++ b/arkindex/project/tools.py
@@ -1,11 +1,22 @@
 from collections.abc import Iterable, Iterator, Sized
 from datetime import datetime, timezone
+from urllib.parse import urlparse
 
 from django.db.models import Aggregate, CharField, Func
 from django.db.models.expressions import BaseExpression, OrderByList
 from django.urls import reverse
 
-from arkindex.documents.models import Element, ElementPath
+
+def should_verify_cert(url):
+    """
+    Skip SSL certificate validation when hitting a development instance
+    """
+    # Special case when no url is provided
+    if url is None:
+        return True
+
+    host = urlparse(url).netloc
+    return not host.endswith("ark.localhost")
 
 
 def build_absolute_url(element, request, name, id_argument="pk", **kwargs):
@@ -26,6 +37,9 @@ def build_tree(tree, *, corpus, type):
 
     Returns a dict associating element names with created Elements.
     """
+    # Avoid circular import issue
+    from arkindex.documents.models import Element, ElementPath
+
     assert isinstance(tree, dict)
 
     def parse_value(val):
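
Note — the moved should_verify_cert only disables TLS verification for
development hosts; expected behaviour:

    from arkindex.project.tools import should_verify_cert

    assert should_verify_cert("https://google.fr/whatever") is True
    assert should_verify_cert("https://demo.ark.localhost/api/v1/") is False
    assert should_verify_cert(None) is True  # no URL: verify by default
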
diff --git a/requirements.txt b/requirements.txt
index d8a63d68f4..455a17b66a 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -23,3 +23,4 @@ SolrClient==0.3.1
 teklia-toolbox==0.1.3
 tenacity==8.2.2
 uritemplate==4.1.1
+zstandard==0.20.0
-- 
GitLab