Implement worker

Merged Yoann Schneider requested to merge implem into main
2 files changed: +6 −25
# -*- coding: utf-8 -*-
import ast
import logging
import os
import tarfile
import tempfile
import time
from pathlib import Path
from urllib.parse import urljoin
import cv2
import imageio.v2 as iio
import zstandard as zstd
from arkindex_worker.utils import close_delete_file, create_tar_archive, zstd_compress
from worker_generic_training_dataset.exceptions import ImageDownloadError
logger = logging.getLogger(__name__)
@@ -58,25 +55,9 @@ def download_image(element, folder: Path):
         raise ImageDownloadError(element.id, e)


-def create_tar_zstd_archive(folder_path, destination: Path, chunk_size=1024):
-    compressor = zstd.ZstdCompressor(level=3)
-
-    # Remove extension from the model filename
-    _, path_to_tar_archive = tempfile.mkstemp(prefix="teklia-", suffix=".tar")
-
-    # Create an uncompressed tar archive with all the needed files
-    # Files hierarchy is kept in the archive.
-    with tarfile.open(path_to_tar_archive, "w") as tar:
-        for p in folder_path.glob("**/*"):
-            x = p.relative_to(folder_path)
-            tar.add(p, arcname=x, recursive=False)
-
-    # Compress the archive
-    with destination.open("wb") as archive_file:
-        with open(path_to_tar_archive, "rb") as model_data:
-            for model_chunk in iter(lambda: model_data.read(chunk_size), b""):
-                compressed_chunk = compressor.compress(model_chunk)
-                archive_file.write(compressed_chunk)
-
-    # Remove the tar archive
-    os.remove(path_to_tar_archive)
+def create_tar_zstd_archive(folder_path, destination: Path):
+    tar_fd, tar_archive, _ = create_tar_archive(folder_path)
+
+    _, _, _ = zstd_compress(tar_archive, destination)
+
+    close_delete_file(tar_fd, tar_archive)
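
For reference, here is a minimal sketch of how the refactored helper could be exercised end to end. It is an illustration only: the module path in the import and the two filesystem paths are assumptions, not part of this MR, and the sanity check simply relies on the zstandard and tarfile libraries already imported by the file.

    from pathlib import Path
    import tarfile
    import zstandard as zstd

    # Assumed module path; adjust to wherever create_tar_zstd_archive actually lives
    from worker_generic_training_dataset.worker import create_tar_zstd_archive

    # Hypothetical paths, for illustration only
    dataset_folder = Path("/tmp/dataset")
    archive_destination = Path("/tmp/dataset.tar.zst")

    # Build a tar archive of the folder and compress it with zstandard,
    # delegating temporary-file handling to the arkindex_worker.utils helpers
    create_tar_zstd_archive(dataset_folder, archive_destination)

    # Sanity check: stream-decompress the archive and list its members
    with archive_destination.open("rb") as compressed:
        with zstd.ZstdDecompressor().stream_reader(compressed) as reader:
            with tarfile.open(fileobj=reader, mode="r|") as tar:
                print([member.name for member in tar])

Delegating the archive creation, compression and cleanup to create_tar_archive, zstd_compress and close_delete_file removes the hand-rolled chunked compression loop and the manual temporary-file management from this worker.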