Skip to content
Snippets Groups Projects
Commit 63b0f4e3 authored by Yoann Schneider's avatar Yoann Schneider :tennis:
Browse files

Specify destination in archive utils

parent 40768b62
No related branches found
No related tags found
1 merge request: !349 Specify destination in archive utils
Pipeline #80336 passed
......@@ -5,7 +5,7 @@ import os
import tarfile
import tempfile
from pathlib import Path
from typing import Tuple
from typing import Optional, Tuple, Union
import zstandard
import zstandard as zstd
......@@ -88,21 +88,29 @@ def close_delete_file(file_descriptor: int, file_path: Path):
logger.warning(f"Unable to delete file {file_path}: {e}")
def zstd_compress(source: Path) -> Tuple[int, Path, str]:
def zstd_compress(
source: Path, destination: Optional[Path] = None
) -> Tuple[Union[int, None], Path, str]:
"""Compress a file using the Zstandard compression algorithm.
:param source: Path to the file to compress.
:return: The file descriptor and path to the compressed file, hash of its content.
:param destination: Optional path for the created ZSTD archive. A tempfile will be created if this is omitted.
:return: The file descriptor (if one was created) and path to the compressed file, hash of its content.
"""
compressor = zstd.ZstdCompressor(level=3)
archive_hasher = hashlib.md5()
file_d, path_to_zst_archive = tempfile.mkstemp(prefix="teklia-", suffix=".tar.zst")
logger.debug(f"Compressing file to {path_to_zst_archive}")
path_to_zst_archive = Path(path_to_zst_archive)
# Parse destination and create a tmpfile if none was specified
file_d, destination = (
tempfile.mkstemp(prefix="teklia-", suffix=".tar.zst")
if destination is None
else (None, destination)
)
destination = Path(destination)
logger.debug(f"Compressing file to {destination}")
try:
with path_to_zst_archive.open("wb") as archive_file, source.open(
"rb"
) as model_data:
with destination.open("wb") as archive_file, source.open("rb") as model_data:
for model_chunk in iter(lambda: model_data.read(CHUNK_SIZE), b""):
compressed_chunk = compressor.compress(model_chunk)
archive_hasher.update(compressed_chunk)
......@@ -110,26 +118,33 @@ def zstd_compress(source: Path) -> Tuple[int, Path, str]:
logger.debug(f"Successfully compressed {source}")
except zstandard.ZstdError as e:
raise Exception(f"Couldn't compress archive: {e}")
return file_d, path_to_zst_archive, archive_hasher.hexdigest()
return file_d, destination, archive_hasher.hexdigest()
def create_tar_archive(path: Path) -> Tuple[Path, str]:
def create_tar_archive(
path: Path, destination: Optional[Path] = None
) -> Tuple[Union[int, None], Path, str]:
"""Create a tar archive using the content at specified location.
:param path: Path to the file to archive
:return: The file descriptor and path to the TAR archive, hash of its content.
:param destination: Optional path for the created TAR archive. A tempfile will be created if this is omitted.
:return: The file descriptor (if one was created) and path to the TAR archive, hash of its content.
"""
# Remove extension from the model filename
tar_descriptor, path_to_tar_archive = tempfile.mkstemp(
prefix="teklia-", suffix=".tar"
# Parse destination and create a tmpfile if none was specified
file_d, destination = (
tempfile.mkstemp(prefix="teklia-", suffix=".tar")
if destination is None
else (None, destination)
)
destination = Path(destination)
logger.debug(f"Compressing file to {destination}")
# Create an uncompressed tar archive with all the needed files
# Files hierarchy is kept in the archive.
files = []
try:
logger.debug(f"Compressing files to {path_to_tar_archive}")
with tarfile.open(path_to_tar_archive, "w") as tar:
logger.debug(f"Compressing files to {destination}")
with tarfile.open(destination, "w") as tar:
for p in path.rglob("*"):
x = p.relative_to(path)
tar.add(p, arcname=x, recursive=False)
......@@ -149,4 +164,23 @@ def create_tar_archive(path: Path) -> Tuple[Path, str]:
with file_path.open("rb") as file_data:
for chunk in iter(lambda: file_data.read(CHUNK_SIZE), b""):
content_hasher.update(chunk)
return tar_descriptor, Path(path_to_tar_archive), content_hasher.hexdigest()
return file_d, destination, content_hasher.hexdigest()
def create_tar_zst_archive(
    source: Path, destination: Optional[Path] = None
) -> Tuple[Union[int, None], Path, str, str]:
    """Helper to create a TAR+ZST archive from a source folder.

    The content of ``source`` is first packed into an intermediate TAR archive
    stored in a temporary file, which is then compressed with Zstandard. The
    intermediate TAR archive is always closed and deleted before returning,
    even when the compression step fails.

    :param source: Path to the folder whose content should be archived.
    :param destination: Path to the created archive, defaults to None. If unspecified, a temporary file will be created.
    :return: The file descriptor of the created tempfile (if one was created), path to the archive, its hash and the hash of the tar archive's content.
    """
    # No destination is passed here, so the TAR archive lands in a tempfile
    # that this function is responsible for cleaning up.
    tar_fd, tar_archive, tar_hash = create_tar_archive(source)

    try:
        zstd_fd, zstd_archive, zstd_hash = zstd_compress(tar_archive, destination)
    finally:
        # Runs on failure too, so a compression error cannot leak the
        # intermediate TAR file or its open descriptor.
        close_delete_file(tar_fd, tar_archive)

    return zstd_fd, zstd_archive, zstd_hash, tar_hash
......@@ -13,7 +13,7 @@ import requests
from apistar.exceptions import ErrorResponse
from arkindex_worker import logger
from arkindex_worker.utils import close_delete_file, create_tar_archive, zstd_compress
from arkindex_worker.utils import close_delete_file, create_tar_zst_archive
DirPath = NewType("DirPath", Path)
"""Path to a directory"""
......@@ -37,13 +37,9 @@ def create_archive(path: DirPath) -> Tuple[Path, Hash, FileSize, Hash]:
"""
assert path.is_dir(), "create_archive needs a directory"
tar_descriptor, tar_archive, content_hash = create_tar_archive(path)
# Compress the archive
zstd_descriptor, zstd_archive, archive_hash = zstd_compress(tar_archive)
# Remove the tar archive
close_delete_file(tar_descriptor, tar_archive)
zstd_descriptor, zstd_archive, archive_hash, content_hash = create_tar_zst_archive(
path
)
# Get content hash, archive size and hash
yield zstd_archive, content_hash, zstd_archive.stat().st_size, archive_hash
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment