Commit 47a31812, authored by ml bonhomme and committed by Erwan Rouchet

Support extra_files in RQ tasks

parent 336ba731
1 merge request: !2249 Support extra_files in RQ tasks
@@ -15,9 +15,10 @@ from django.utils.text import slugify
 from enumfields import EnumField
 from arkindex.images.managers import ImageServerManager
-from arkindex.project.aws import S3FileMixin, S3FileStatus, should_verify_cert
+from arkindex.project.aws import S3FileMixin, S3FileStatus
 from arkindex.project.fields import LStripTextField, MD5HashField, StripSlashURLField
 from arkindex.project.models import IndexableModel
+from arkindex.project.tools import should_verify_cert

 logger = logging.getLogger(__name__)
 profile_uri_validator = URLValidator(schemes=["http", "https"], message="Invalid IIIF profile URI")
...
@@ -5,6 +5,7 @@ from pathlib import Path
 from time import sleep
 from urllib.parse import urljoin

+import requests
 from django.conf import settings
 from django.core.mail import send_mail
 from django.db.models import Count, F, Q
@@ -15,14 +16,17 @@ from django_rq import job
 import docker
 from arkindex.ponos.models import State, Task
-from arkindex.ponos.utils import upload_artifact
+from arkindex.ponos.utils import decompress_zst_archive, extract_tar_archive, upload_artifact
 from arkindex.process.models import Process, WorkerActivityState
+from arkindex.project.tools import should_verify_cert
 from docker.errors import APIError, ImageNotFound

 logger = logging.getLogger(__name__)

 # Delay for polling docker task's logs in seconds
 TASK_DOCKER_POLLING = 1
+# Timeout for HTTP requests: (connect timeout, read timeout)
+REQUEST_TIMEOUT = (30, 60)


 @job("default", timeout=settings.RQ_TIMEOUTS["notify_process_completion"])
@@ -87,6 +91,41 @@ def upload_logs(task, text):
         logger.warning(f"Failed uploading logs for task {task}: {e}")
+def download_extra_files(task, temp_dir) -> None:
+    """
+    Download the task's extra_files and store them in a dedicated `extra_files` folder.
+    This folder is mounted as `PONOS_DATA_DIR/extra_files` in docker containers.
+    If a downloaded file has a content-type of `application/zstd`, it is decompressed using zstandard.
+    """
+    # Download every declared extra file
+    for path_name, file_url in task.extra_files.items():
+        logger.info(f"Downloading file {path_name} using url: {file_url}")
+
+        # Download file using the provided url
+        with requests.get(file_url, stream=True, timeout=REQUEST_TIMEOUT, verify=should_verify_cert(file_url)) as resp:
+            resp.raise_for_status()
+
+            # Write file to a specific data directory
+            extra_files_dir = temp_dir / "extra_files"
+            extra_files_dir.mkdir(exist_ok=True)
+            with open(extra_files_dir / path_name, "wb") as f:
+                for chunk in resp.iter_content(chunk_size=8192):
+                    if chunk:
+                        f.write(chunk)
+
+        if resp.headers["Content-Type"] == "application/zstd":
+            # If type is `application/zstd`, decompress using zstandard
+            archive_fd, archive_path = decompress_zst_archive(
+                compressed_archive=extra_files_dir / path_name,
+            )
+            # Extract Tar archive
+            extract_tar_archive(
+                archive_path=archive_path,
+                archive_fd=archive_fd,
+                destination=extra_files_dir,
+            )
+
+
 def run_docker_task(client, task, temp_dir):
     # 1. Pull the docker image
     logger.debug(f"Pulling docker image '{task.image}'")
@@ -140,7 +179,22 @@ def run_docker_task(client, task, temp_dir):
     task.state = State.Running
     task.save()

-    # 4. Read logs
+    # 4. Download extra_files
+    if task.extra_files:
+        logger.info("Downloading extra_files for task {!s}".format(task))
+        try:
+            download_extra_files(task, temp_dir)
+        except Exception as e:
+            logger.warning(
+                "Failed downloading extra_files for task {!s}: {!s}".format(
+                    task, e
+                )
+            )
+            task.state = State.Error
+            task.save()
+            return
+
+    # 5. Read logs
     logger.debug("Reading logs from the docker container")
     previous_logs = b""
     while container.status == "running":
@@ -160,7 +214,7 @@
     # Upload logs one last time so we do not miss any data
     upload_logs(task, container.logs())

-    # 5. Retrieve the state of the container
+    # 6. Retrieve the state of the container
     container.reload()
     exit_code = container.attrs["State"]["ExitCode"]
     if exit_code != 0:
@@ -172,7 +226,7 @@
     task.state = State.Completed
     task.save()

-    # 6. Upload artifacts
+    # 7. Upload artifacts
     logger.info(f"Uploading artifacts for task {task}")
     for path in Path(artifacts_dir).glob("**/*"):
...
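For reference, the `.tar.zst` payloads this download flow consumes can be produced with the same zstandard API the new helpers use. A minimal sketch, with illustrative file names that are not part of this commit:

import tarfile
import tempfile
from pathlib import Path

import zstandard

# Build a small tar archive, then zstd-compress it; a server would serve
# the result with a Content-Type of application/zstd.
work_dir = Path(tempfile.mkdtemp())
(work_dir / "model.txt").write_text("some model data")

tar_path = work_dir / "model.tar"
with tarfile.open(tar_path, "w") as tar:
    tar.add(work_dir / "model.txt", arcname="model.txt")

cctx = zstandard.ZstdCompressor()
with open(tar_path, "rb") as src, open(work_dir / "model.tar.zst", "wb") as dst:
    cctx.copy_stream(src, dst)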
+import logging
 import os
+import tarfile
+import tempfile

 import magic
+import zstandard

 from arkindex.ponos.models import Task

+logger = logging.getLogger(__name__)


 def is_admin_or_ponos_task(request):
     return request.user.is_authenticated and (request.user.is_admin or isinstance(request.auth, Task))
@@ -22,3 +28,67 @@ def upload_artifact(task, path, artifacts_dir):
         size=size,
     )
     artifact.s3_object.upload_file(str(path), ExtraArgs={"ContentType": content_type})
+def decompress_zst_archive(compressed_archive):
+    """Decompress a zst-compressed tar archive in data dir.
+    This returns the path to the archive and the file descriptor.
+
+    .. warning::
+        Make sure to close the file descriptor explicitly, or the main
+        process will keep holding the memory even after the file is deleted.
+
+    :param compressed_archive: Path to the target ZST-compressed archive
+    :type compressed_archive: str
+    :return: File descriptor and path to the uncompressed tar archive
+    :rtype: tuple
+    """
+    dctx = zstandard.ZstdDecompressor()
+    archive_fd, archive_path = tempfile.mkstemp(prefix="ponos-", suffix=".tar")
+    logger.debug(f"Uncompressing downloaded file to {archive_path}")
+    try:
+        with open(compressed_archive, "rb") as compressed, open(
+            archive_path, "wb"
+        ) as decompressed:
+            dctx.copy_stream(compressed, decompressed)
+        logger.debug(f"Successfully uncompressed archive {compressed_archive}")
+    except zstandard.ZstdError as e:
+        raise Exception(f"Couldn't uncompress archive: {e}")
+
+    # Deleting ZSTD archive tempfile as soon as possible
+    try:
+        os.remove(compressed_archive)
+        logger.debug(
+            f"Successfully deleted compressed archive file {compressed_archive}"
+        )
+    except OSError as e:
+        logger.debug(
+            f"Unable to delete compressed archive file {compressed_archive}: {e}"
+        )
+
+    return archive_fd, archive_path
+def extract_tar_archive(archive_fd, archive_path, destination):
+    """Extract the tar archive's content to a specific destination.
+
+    :param archive_fd: File descriptor of the archive
+    :type archive_fd: int
+    :param archive_path: Path to the archive
+    :type archive_path: str
+    :param destination: Path where the archive's data will be extracted
+    :type destination: str
+    """
+    try:
+        with tarfile.open(archive_path) as tar_archive:
+            tar_archive.extractall(destination)
+    except tarfile.ReadError as e:
+        raise Exception(f"Couldn't handle the decompressed Tar archive: {e}")
+    finally:
+        # Deleting Tar archive
+        try:
+            os.close(archive_fd)
+            os.remove(archive_path)
+        except OSError as e:
+            logger.warning(f"Unable to delete archive file {archive_path}: {e}")
 import logging
 from functools import wraps
 from io import BytesIO
-from urllib.parse import urlparse

 import boto3.session
 from botocore.config import Config
@@ -11,19 +10,9 @@ from django.utils.functional import cached_property
 from enumfields import Enum
 from tenacity import retry, retry_if_exception, stop_after_delay

-logger = logging.getLogger(__name__)
-
-
-def should_verify_cert(url):
-    """
-    Skip SSL certification validation when hitting a development instance
-    """
-    # Special case when no url is provided
-    if url is None:
-        return True
+from arkindex.project.tools import should_verify_cert

-    host = urlparse(url).netloc
-    return not host.endswith("ark.localhost")
+logger = logging.getLogger(__name__)


 def get_s3_resource(
...
 from django.test import TestCase

-from arkindex.project.aws import should_verify_cert  # noqa
+from arkindex.project.tools import should_verify_cert  # noqa


-class AWSTestCase(TestCase):
+class ToolsTest(TestCase):
     def test_should_verify_cert(self):
         self.assertTrue(should_verify_cert("https://google.fr/whatever"))
...
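The relocated test only covers the default branch; a sketch of extra cases for the development-instance and missing-url branches of `should_verify_cert`, not part of this commit, could look like:

from django.test import TestCase

from arkindex.project.tools import should_verify_cert


class ToolsTest(TestCase):
    def test_should_verify_cert(self):
        # Regular hosts keep SSL verification enabled
        self.assertTrue(should_verify_cert("https://google.fr/whatever"))
        # Development instances (*.ark.localhost) skip verification
        self.assertFalse(should_verify_cert("https://demo.ark.localhost/api/v1/"))
        # No url provided: verify by default
        self.assertTrue(should_verify_cert(None))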
 from collections.abc import Iterable, Iterator, Sized
 from datetime import datetime, timezone
+from urllib.parse import urlparse

 from django.db.models import Aggregate, CharField, Func
 from django.db.models.expressions import BaseExpression, OrderByList
 from django.urls import reverse

-from arkindex.documents.models import Element, ElementPath

+
+def should_verify_cert(url):
+    """
+    Skip SSL certificate validation when hitting a development instance
+    """
+    # Special case when no url is provided
+    if url is None:
+        return True
+
+    host = urlparse(url).netloc
+    return not host.endswith("ark.localhost")
+
+
 def build_absolute_url(element, request, name, id_argument="pk", **kwargs):
@@ -26,6 +37,9 @@ def build_tree(tree, *, corpus, type):
     Returns a dict associating element names with created Elements.
     """
+    # Avoid circular import issue
+    from arkindex.documents.models import Element, ElementPath
+
     assert isinstance(tree, dict)

     def parse_value(val):
...
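For illustration, this is how the relocated helper plugs into an HTTP call, mirroring its use in `download_extra_files` above; the URL is a placeholder:

import requests

from arkindex.project.tools import should_verify_cert

url = "https://demo.ark.localhost/files/some-file"
# verify=False for this url, since the host is a development instance
response = requests.get(url, timeout=(30, 60), verify=should_verify_cert(url))
response.raise_for_status()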
@@ -23,3 +23,4 @@ SolrClient==0.3.1
 teklia-toolbox==0.1.3
 tenacity==8.2.2
 uritemplate==4.1.1
+zstandard==0.20.0