diff --git a/arkindex/images/models.py b/arkindex/images/models.py
index a6825414399ad31b889581815fb32e22ef5fe8d7..aecc25fca690b179e55696ab1f9775179b18d830 100644
--- a/arkindex/images/models.py
+++ b/arkindex/images/models.py
@@ -15,9 +15,10 @@ from django.utils.text import slugify
 from enumfields import EnumField
 
 from arkindex.images.managers import ImageServerManager
-from arkindex.project.aws import S3FileMixin, S3FileStatus, should_verify_cert
+from arkindex.project.aws import S3FileMixin, S3FileStatus
 from arkindex.project.fields import LStripTextField, MD5HashField, StripSlashURLField
 from arkindex.project.models import IndexableModel
+from arkindex.project.tools import should_verify_cert
 
 logger = logging.getLogger(__name__)
 profile_uri_validator = URLValidator(schemes=["http", "https"], message="Invalid IIIF profile URI")
diff --git a/arkindex/ponos/tasks.py b/arkindex/ponos/tasks.py
index 4c53bb7be2f91e026c9ec810c4866c3b9f01fd85..b4744c1a0134e57e14ea402fce88598b88ec57ed 100644
--- a/arkindex/ponos/tasks.py
+++ b/arkindex/ponos/tasks.py
@@ -5,6 +5,7 @@ from pathlib import Path
 from time import sleep
 from urllib.parse import urljoin
 
+import requests
 from django.conf import settings
 from django.core.mail import send_mail
 from django.db.models import Count, F, Q
@@ -15,14 +16,17 @@ from django_rq import job
 
 import docker
 from arkindex.ponos.models import State, Task
-from arkindex.ponos.utils import upload_artifact
+from arkindex.ponos.utils import decompress_zst_archive, extract_tar_archive, upload_artifact
 from arkindex.process.models import Process, WorkerActivityState
+from arkindex.project.tools import should_verify_cert
 from docker.errors import APIError, ImageNotFound
 
 logger = logging.getLogger(__name__)
 
 # Delay for polling docker task's logs in seconds
 TASK_DOCKER_POLLING = 1
+# Timeout for HTTP requests: (connect timeout, read timeout)
+REQUEST_TIMEOUT = (30, 60)
 
 
 @job("default", timeout=settings.RQ_TIMEOUTS["notify_process_completion"])
@@ -87,6 +91,41 @@ def upload_logs(task, text):
         logger.warning(f"Failed uploading logs for task {task}: {e}")
 
 
+def download_extra_files(task, temp_dir) -> None:
+    """
+    Download the task's extra_files and store them in a dedicated `extra_files` folder.
+    This folder is mounted as `PONOS_DATA_DIR/extra_files` in docker containers.
+    If a downloaded file has a content-type of `application/zstd`, it is decompressed using zstandard.
+    """
+    # Download every declared extra file
+    for path_name, file_url in task.extra_files.items():
+        logger.info(f"Downloading file {path_name} using url: {file_url}")
+
+        # Download file using the provided url
+        with requests.get(file_url, stream=True, timeout=REQUEST_TIMEOUT, verify=should_verify_cert(file_url)) as resp:
+            resp.raise_for_status()
+
+            # Write file to a specific data directory
+            extra_files_dir = temp_dir / "extra_files"
+            extra_files_dir.mkdir(exist_ok=True)
+            with open(extra_files_dir / path_name, "wb") as f:
+                for chunk in resp.iter_content(chunk_size=8192):
+                    if chunk:
+                        f.write(chunk)
+
+        if resp.headers.get("Content-Type") == "application/zstd":
+            # If type is `application/zstd`, decompress using zstandard
+            archive_fd, archive_path = decompress_zst_archive(
+                compressed_archive=extra_files_dir / path_name,
+            )
+            # Extract Tar archive
+            extract_tar_archive(
+                archive_path=archive_path,
+                archive_fd=archive_fd,
+                destination=extra_files_dir,
+            )
+
+
 def run_docker_task(client, task, temp_dir):
     # 1. Pull the docker image
     logger.debug(f"Pulling docker image '{task.image}'")
@@ -140,7 +179,22 @@ def run_docker_task(client, task, temp_dir):
     task.state = State.Running
     task.save()
 
-    # 4. Read logs
+    # 4. Download extra_files
+    if task.extra_files:
+        logger.info("Downloading extra_files for task {!s}".format(task))
+        try:
+            download_extra_files(task, temp_dir)
+        except Exception as e:
+            logger.warning(
+                "Failed downloading extra_files for task {!s}: {!s}".format(
+                    task, e
+                )
+            )
+            task.state = State.Error
+            task.save()
+            return
+
+    # 5. Read logs
     logger.debug("Reading logs from the docker container")
     previous_logs = b""
     while container.status == "running":
@@ -160,7 +214,7 @@
     # Upload logs one last time so we do not miss any data
     upload_logs(task, container.logs())
 
-    # 5. Retrieve the state of the container
+    # 6. Retrieve the state of the container
     container.reload()
     exit_code = container.attrs["State"]["ExitCode"]
     if exit_code != 0:
@@ -172,7 +226,7 @@
     task.state = State.Completed
     task.save()
 
-    # 6. Upload artifacts
+    # 7. Upload artifacts
     logger.info(f"Uploading artifacts for task {task}")
 
     for path in Path(artifacts_dir).glob("**/*"):
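The download loop added above leans on two `requests` behaviors worth spelling out: a `(connect, read)` timeout tuple and chunked streaming. Here is a minimal, self-contained sketch of the same pattern; the `fetch` helper, URL, and destination path are illustrative and not part of the patch:

```python
from pathlib import Path

import requests

REQUEST_TIMEOUT = (30, 60)  # (connect timeout, read timeout), in seconds


def fetch(url: str, destination: Path) -> None:
    # stream=True avoids loading the whole body into memory; the tuple
    # timeout bounds connection establishment and each socket read separately.
    with requests.get(url, stream=True, timeout=REQUEST_TIMEOUT) as resp:
        resp.raise_for_status()
        with open(destination, "wb") as f:
            for chunk in resp.iter_content(chunk_size=8192):
                if chunk:  # skip keep-alive chunks
                    f.write(chunk)
```

Because connect and read are bounded independently, a slow-but-alive server cannot hang the task runner indefinitely, while large extra files are still written to disk in constant memory.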
diff --git a/arkindex/ponos/utils.py b/arkindex/ponos/utils.py
index eba004ebb5c48f50dd973df996ede155b62395e6..f9731fc17cafaa9ebff810cda1218c3312c6c5c2 100644
--- a/arkindex/ponos/utils.py
+++ b/arkindex/ponos/utils.py
@@ -1,8 +1,14 @@
+import logging
 import os
+import tarfile
+import tempfile
 
 import magic
+import zstandard
 
 from arkindex.ponos.models import Task
 
+logger = logging.getLogger(__name__)
+
 
 def is_admin_or_ponos_task(request):
     return request.user.is_authenticated and (request.user.is_admin or isinstance(request.auth, Task))
@@ -22,3 +28,67 @@
         size=size,
     )
     artifact.s3_object.upload_file(str(path), ExtraArgs={"ContentType": content_type})
+
+
+def decompress_zst_archive(compressed_archive):
+    """Decompress a zst-compressed tar archive to a temporary file.
+    This returns the file descriptor and the path of the decompressed archive.
+
+    .. warning::
+
+        Make sure to close the file descriptor explicitly, or the main
+        process will keep holding its memory even after the file is deleted.
+
+    :param compressed_archive: Path to the target ZST-compressed archive
+    :type compressed_archive: str
+    :return: File descriptor and path to the uncompressed tar archive
+    :rtype: tuple
+    """
+    dctx = zstandard.ZstdDecompressor()
+    archive_fd, archive_path = tempfile.mkstemp(prefix="ponos-", suffix=".tar")
+
+    logger.debug(f"Uncompressing downloaded file to {archive_path}")
+    try:
+        with open(compressed_archive, "rb") as compressed, open(
+            archive_path, "wb"
+        ) as decompressed:
+            dctx.copy_stream(compressed, decompressed)
+        logger.debug(f"Successfully uncompressed archive {compressed_archive}")
+    except zstandard.ZstdError as e:
+        raise Exception(f"Couldn't uncompress archive: {e}")
+
+    # Delete the zstd archive tempfile as soon as possible
+    try:
+        os.remove(compressed_archive)
+        logger.debug(
+            f"Successfully deleted compressed archive file {compressed_archive}"
+        )
+    except OSError as e:
+        logger.debug(
+            f"Unable to delete compressed archive file {compressed_archive}: {e}"
+        )
+    return archive_fd, archive_path
+
+
+def extract_tar_archive(archive_fd, archive_path, destination):
+    """Extract the tar archive's content to a specific destination
+
+    :param archive_fd: File descriptor of the archive
+    :type archive_fd: int
+    :param archive_path: Path to the archive
+    :type archive_path: str
+    :param destination: Path where the archive's data will be extracted
+    :type destination: str
+    """
+    try:
+        with tarfile.open(archive_path) as tar_archive:
+            tar_archive.extractall(destination)
+    except tarfile.ReadError as e:
+        raise Exception(f"Couldn't handle the decompressed Tar archive: {e}")
+    finally:
+        # Delete the tar archive
+        try:
+            os.close(archive_fd)
+            os.remove(archive_path)
+        except OSError as e:
+            logger.warning(f"Unable to delete archive file {archive_path}: {e}")
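The two helpers are designed to chain, exactly as `download_extra_files` does above: decompression produces a temporary tar file, and extraction consumes it and cleans it up. A usage sketch, with hypothetical archive and destination paths:

```python
from arkindex.ponos.utils import decompress_zst_archive, extract_tar_archive

# Decompress the zstd layer into a named temporary .tar file
archive_fd, archive_path = decompress_zst_archive(
    compressed_archive="/tmp/extra.tar.zst",
)
# Extract the tar layer; extract_tar_archive closes archive_fd and removes
# archive_path in its `finally` block, so no extra cleanup is needed here.
extract_tar_archive(
    archive_fd=archive_fd,
    archive_path=archive_path,
    destination="/tmp/extra_files",
)
```

Note that `decompress_zst_archive` already deletes the compressed input as soon as the decompression succeeds, so on the happy path no temporary file outlives the pair of calls.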
diff --git a/arkindex/project/aws.py b/arkindex/project/aws.py
index ef0edfb7251a1c988cd35ddf823522acb460821c..659f37326a0e132505e1fb4b63a7729e668ba328 100644
--- a/arkindex/project/aws.py
+++ b/arkindex/project/aws.py
@@ -1,7 +1,6 @@
 import logging
 from functools import wraps
 from io import BytesIO
-from urllib.parse import urlparse
 
 import boto3.session
 from botocore.config import Config
@@ -11,19 +10,9 @@ from django.utils.functional import cached_property
 from enumfields import Enum
 from tenacity import retry, retry_if_exception, stop_after_delay
 
-logger = logging.getLogger(__name__)
-
-
-def should_verify_cert(url):
-    """
-    Skip SSL certification validation when hitting a development instance
-    """
-    # Special case when no url is provided
-    if url is None:
-        return True
+from arkindex.project.tools import should_verify_cert
 
-    host = urlparse(url).netloc
-    return not host.endswith("ark.localhost")
+logger = logging.getLogger(__name__)
 
 
 def get_s3_resource(
diff --git a/arkindex/project/tests/test_aws.py b/arkindex/project/tests/test_tools.py
similarity index 80%
rename from arkindex/project/tests/test_aws.py
rename to arkindex/project/tests/test_tools.py
index 7aae4cb45d7bb9c3654230aa3fd8d6d8ceff86c8..89d313e74c45516be2237b7ea7024f4239fb3e15 100644
--- a/arkindex/project/tests/test_aws.py
+++ b/arkindex/project/tests/test_tools.py
@@ -1,9 +1,9 @@
 from django.test import TestCase
 
-from arkindex.project.aws import should_verify_cert  # noqa
+from arkindex.project.tools import should_verify_cert  # noqa
 
 
-class AWSTestCase(TestCase):
+class ToolsTest(TestCase):
 
     def test_should_verify_cert(self):
         self.assertTrue(should_verify_cert("https://google.fr/whatever"))
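The relocated test only exercises the happy path, while the function has three branches: no URL, a development host, and everything else. A sketch of assertions that would cover all of them, derived directly from the implementation being moved:

```python
from arkindex.project.tools import should_verify_cert

# Regular hosts keep certificate verification enabled
assert should_verify_cert("https://google.fr/whatever") is True
# No URL at all: verify by default
assert should_verify_cert(None) is True
# *.ark.localhost development instances skip verification
assert should_verify_cert("https://iiif.ark.localhost/image") is False
```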
diff --git a/arkindex/project/tools.py b/arkindex/project/tools.py
index 74bc0dd81f3629a722797d88824f3a0d7caa5695..ab448463f8b008537077603e579814aea024c477 100644
--- a/arkindex/project/tools.py
+++ b/arkindex/project/tools.py
@@ -1,11 +1,22 @@
 from collections.abc import Iterable, Iterator, Sized
 from datetime import datetime, timezone
+from urllib.parse import urlparse
 
 from django.db.models import Aggregate, CharField, Func
 from django.db.models.expressions import BaseExpression, OrderByList
 from django.urls import reverse
 
-from arkindex.documents.models import Element, ElementPath
+
+def should_verify_cert(url):
+    """
+    Skip SSL certificate validation when hitting a development instance
+    """
+    # Special case when no url is provided
+    if url is None:
+        return True
+
+    host = urlparse(url).netloc
+    return not host.endswith("ark.localhost")
 
 
 def build_absolute_url(element, request, name, id_argument="pk", **kwargs):
@@ -26,6 +37,9 @@
 
     Returns a dict associating element names with created Elements.
     """
+    # Avoid circular import issue
+    from arkindex.documents.models import Element, ElementPath
+
     assert isinstance(tree, dict)
 
     def parse_value(val):
diff --git a/requirements.txt b/requirements.txt
index d8a63d68f45514585b67cbab2a078115c633e608..455a17b66a4e8a27a2e7e74de0c7860ee43c8973 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -23,3 +23,4 @@ SolrClient==0.3.1
 teklia-toolbox==0.1.3
 tenacity==8.2.2
 uritemplate==4.1.1
+zstandard==0.20.0
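With `zstandard` now pinned, a matching `.tar.zst` payload can be produced entirely from the pinned library and the standard library, for instance as a test fixture for `download_extra_files`. A hedged sketch; the file names are illustrative:

```python
import tarfile

import zstandard

# Pack a directory into a plain tar archive...
with tarfile.open("/tmp/fixture.tar", "w") as tar:
    tar.add("/tmp/some_dir", arcname=".")

# ...then compress it with zstandard, mirroring the format the new
# decompress_zst_archive helper expects (Content-Type: application/zstd).
cctx = zstandard.ZstdCompressor()
with open("/tmp/fixture.tar", "rb") as src, open("/tmp/fixture.tar.zst", "wb") as dst:
    cctx.copy_stream(src, dst)
```

`ZstdCompressor.copy_stream` is the write-side mirror of the `ZstdDecompressor.copy_stream` call used in `decompress_zst_archive`, so a fixture built this way round-trips through the helpers above.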