Commit 0d4e7e50 authored by ml bonhomme, committed by Erwan Rouchet

Do not save PDF files from ZIPs, only upload JPEGs

parent 3a5124b6
1 merge request: !365 Do not save PDF files from ZIPs, only upload JPEGs
Pipeline #147847 passed
@@ -92,15 +92,20 @@ def build_transcription(pdf_element, pdf_page, ark_page):
}
def extract_pdf_text(path, ark_pages):
def extract_pdf_text(path, ark_pages, existing_pages=None):
# Load all pages and children
pdf_pages = list(extract_pages(path))
assert len(pdf_pages) == len(
ark_pages
), f"Invalid nb of pages: pdf has {len(pdf_pages)}, ark has {len(ark_pages)}"
# Do not upload transcriptions for pages that already existed on Arkindex (retried imports)
if not existing_pages:
existing_pages = []
out = {}
for ark_page, pdf_page in zip(ark_pages, pdf_pages):
if ark_page["id"] in existing_pages:
continue
logger.debug(
f"PDF text extraction on arkindex element {ark_page['id']} and pdf page {pdf_page}"
)
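For context, here is a minimal standalone sketch of the pdfminer.six traversal that extract_pages provides (the file name is a placeholder; the sketch only prints each text container with its bounding box):

from pdfminer.high_level import extract_pages
from pdfminer.layout import LTTextContainer

def dump_pdf_text(path="example.pdf"):
    # extract_pages yields one LTPage per PDF page, in document order
    for page_number, page_layout in enumerate(extract_pages(path), start=1):
        for element in page_layout:
            if isinstance(element, LTTextContainer):
                # bbox is (x0, y0, x1, y1) in PDF points, origin at the bottom-left corner
                print(page_number, element.bbox, element.get_text().strip())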
@@ -150,7 +155,7 @@ def save_pdf_transcriptions(parent_id, transcriptions) -> None:
raise
def upload_pdf_text(pdf_path, ark_pages) -> None:
def upload_pdf_text(pdf_path, ark_pages, existing_pages=None) -> None:
"""
Upload transcriptions from the text found in a PDF file to existing Arkindex elements.
@@ -158,6 +163,11 @@ def upload_pdf_text(pdf_path, ark_pages) -> None:
:type pdf_path: str or pathlib.Path
:param list ark_pages: List of existing Arkindex elements matching each page of the PDF,
as they would be returned by the `ListElements` or `RetrieveElement` API endpoints.
:param existing_pages: List of Arkindex elements that should be skipped,
as they already have transcriptions.
:type existing_pages: list or None
"""
for page_id, transcriptions in extract_pdf_text(pdf_path, ark_pages).items():
for page_id, transcriptions in extract_pdf_text(
pdf_path, ark_pages, existing_pages=existing_pages
).items():
save_pdf_transcriptions(page_id, transcriptions)
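A hypothetical call, for illustration only (the element IDs and image sizes are made up): the zone.image width and height are what extract_pdf_text uses to scale the PDF coordinates to the page images, and existing_pages prevents transcriptions from being uploaded twice on retried imports.

ark_pages = [
    {"id": "page-1", "zone": {"image": {"width": 2480, "height": 3508}}},
    {"id": "page-2", "zone": {"image": {"width": 2480, "height": 3508}}},
]
# "page-1" already has transcriptions from a previous run, so it is skipped
upload_pdf_text("document.pdf", ark_pages, existing_pages=["page-1"])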
@@ -8,13 +8,14 @@ from pathlib import Path
from urllib.parse import quote_plus, urljoin
from apistar.exceptions import ErrorResponse
from pdf2image import convert_from_path
from arkindex_tasks import default_client
from arkindex_tasks.base import WORKER_RUN_ID
from arkindex_tasks.import_files.pdf import count_pdf_pages, upload_pdf_text
from arkindex_tasks.import_files.pdf import upload_pdf_text
from arkindex_tasks.import_s3.boto import get_client_from_env
from arkindex_tasks.import_s3.graph import PATH_DELIMITER, Node
from arkindex_tasks.utils import download_file, retried_request
from arkindex_tasks.utils import retried_request
from botocore.exceptions import ClientError
logging.basicConfig(format="[%(levelname)s] %(message)s", level=logging.INFO)
@@ -50,6 +51,9 @@ class S3Import(object):
logger.setLevel(logging.DEBUG)
# Store progress statistics
self.progress = {"completed": 0, "existing": 0, "errors": 0, "total": 0}
# Maps S3 key prefixes to paths of temporary files for extracted PDF files,
# so that transcriptions can be added once page elements are created for each image.
self.pdf_paths = {}
# Ensure all the parameters are valid before starting a full import
try:
@@ -116,7 +120,7 @@ class S3Import(object):
"""Retrieves elements hierarchy in the Arkindex corpus
Stores a global mapping between element's path and its ID
"""
# Do never serialize corpus nor zone on listed elements
# Never serialize corpus nor zone on listed elements
api_params = {"with_corpus": False, "with_zone": False}
paths_prefix = (self.prefix,) if self.prefix else ()
@@ -230,74 +234,46 @@ class S3Import(object):
return retried_request("RetrieveImage", id=image_id)
raise e
def build_pdf_pages(self, node):
"""
For PDF files, we download the file to extract the transcriptions for each page.
We use Cantaloupe's meta-identifiers to create one child element per page,
then upload all the transcriptions.
https://cantaloupe-project.github.io/manual/5.0/images.html#MetaIdentifiers
"""
def extract_pdf(self, node):
assert node.is_pdf, "Only PDF nodes are supported"
assert node.arkindex_id, "Missing parent folder ID"
assert WORKER_RUN_ID, "A WorkerRun ID is required to upload PDF transcriptions"
pdf_url = self.boto_resource.meta.client.generate_presigned_url(
"get_object", Params={"Bucket": self.bucket, "Key": node.key}
)
_, pdf_path = tempfile.mkstemp(prefix="tasks-", suffix=".pdf")
pdf_path = Path(pdf_path)
try:
download_file(pdf_url, pdf_path)
# Extracting PDF transcriptions requires that we first create all pages, as it needs access
# to the resulting JPEG image width/height to scale the polygon coordinates to them.
# Since we rely on Cantaloupe's PDF processing, only Cantaloupe can tell us the image's size.
# To create all pages, we need a page count, which Cantaloupe does not provide and pdfminer
# does not explicitly provide or document an API for, so we use undocumented methods that
# pdfminer.high_level.extract_pages uses to list all pages.
page_count = count_pdf_pages(pdf_path)
# extract_pdf_text will require the pages' zone.image.width/height attributes,
# which would force us to create each element one by one with slim_output=False.
# We build fake elements instead that we will assign the element IDs to afterwards.
# We still won't be able to use any bulk endpoint since none support elements on multiple images.
pages = []
for i in range(1, page_count + 1):
# Add the ;<number> suffix to build the meta-identifier for this page
image = self.create_image(node, f";{i}")
page = retried_request(
"CreateElement",
body={
"corpus": self.corpus_id,
"parent": node.arkindex_id,
"name": str(i),
"type": self.page_type,
"image": image["id"],
"worker_run_id": WORKER_RUN_ID,
},
slim_output=True,
)
# If the PDF suffix is not removed, the "parent" PDF node that is created
# has the same path as the existing PDF object, which breaks the bucket
key_stripped = node.key.replace(".pdf", "")
self.pdf_paths[key_stripped] = pdf_path
pages.append(
{
"id": page["id"],
"zone": {"image": image},
}
)
pdf_path = Path(pdf_path)
self.boto_resource.meta.client.download_file(self.bucket, node.key, pdf_path)
self.upload_pdf_pages(pdf_path, key_stripped)
def upload_pdf_pages(self, pdf_path, key):
with tempfile.TemporaryDirectory() as base_path:
images = convert_from_path(
pdf_path,
output_folder=base_path,
output_file="pdf-", # prefix image names
dpi=300,
fmt="jpg",
)
upload_pdf_text(pdf_path, pages)
finally:
pdf_path.unlink(missing_ok=True)
for image in images:
local_path = image.filename
bucket_path = f"{key}/{Path(local_path).name}"
try:
self.boto_resource.meta.client.upload_file(
local_path, self.bucket, bucket_path
)
self.root_node.add_descendant(bucket_path)
except ClientError as e:
logging.error(e)
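Taken on its own, this convert-and-upload flow amounts to the sketch below (the function name, bucket and key prefix are placeholders; pdf2image needs poppler installed and boto3 reads its credentials from the environment):

import tempfile
from pathlib import Path

import boto3
from pdf2image import convert_from_path

def pdf_to_jpeg_objects(pdf_path, bucket="example-bucket", prefix="documents/report"):
    # prefix should be the PDF's key with the ".pdf" suffix stripped, so the page
    # objects do not collide with the original PDF object in the bucket
    s3 = boto3.client("s3")
    with tempfile.TemporaryDirectory() as base_path:
        # One JPEG per page, written into the temporary directory
        images = convert_from_path(
            pdf_path, output_folder=base_path, output_file="pdf-", dpi=300, fmt="jpg"
        )
        for image in images:
            # image.filename points at the JPEG written into base_path
            s3.upload_file(image.filename, bucket, f"{prefix}/{Path(image.filename).name}")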
def build_elements(self, node: Node) -> None:
"""Creates elements on the Arkindex corpus from a hierarchical node on the S3 bucket"""
if node.is_zip:
# Skip ZIP nodes, as those have been extracted separately.
if node.is_zip or node.is_pdf:
# Skip ZIP and PDF nodes, as those have been extracted separately.
return
# Continuously log progress
@@ -315,14 +291,16 @@ class S3Import(object):
elt_id = self.arkindex_elements.get(node.lineage)
if elt_id:
logger.debug(f"Using existing element {node.name} ({elt_id})")
self.progress["existing"] += 1
node.arkindex_id = elt_id
if node.is_pdf:
self.progress["existing"] += 1
# Handle PDF files separately: they only have "final" children, the pages, and we need
# to store these pages created on Arkindex in order to upload the corresponding text
if node.key in self.pdf_paths:
self.build_pdf_pages(node)
else:
# Recursively handle node's children
for child_node in node:
self.build_elements(child_node)
return
# Recursively handle node's children
for child_node in node:
self.build_elements(child_node)
return
try:
@@ -332,7 +310,7 @@ class S3Import(object):
"corpus": str(self.corpus_id),
"worker_run_id": WORKER_RUN_ID,
}
if node.is_final and not node.is_pdf:
if node.is_final:
# This element should be created with its image
image = self.create_image(node)
body.update({"type": self.page_type, "image": image["id"]})
@@ -343,7 +321,7 @@ class S3Import(object):
body.update({"parent": self.top_folder_id})
# Create the element and save its ID to the current node
element = retried_request("CreateElement", slim_output=True, body=body)
element = retried_request("CreateElement", body=body)
node.arkindex_id = element["id"]
except Exception as e:
@@ -358,12 +336,99 @@ class S3Import(object):
else:
self.progress["completed"] += 1
if node.is_pdf:
# Handle PDF files separately: they only have "final" children, the pages, and we need
# to store these pages created on Arkindex in order to upload the corresponding text.
if node.key in self.pdf_paths:
self.build_pdf_pages(node)
return
# Recursively handle node's children
for child_node in node:
self.build_elements(child_node)
def build_pdf_pages(self, node):
assert node.arkindex_id, "Missing parent folder ID"
arkindex_pages = []
# IDs of pages that already exist on Arkindex, so their transcriptions are also skipped at import
existing_pages = []
for child_node in node:
child_id = self.arkindex_elements.get(child_node.lineage)
if not child_id:
try:
body = {
"type": self.page_type,
"name": child_node.name,
"corpus": str(self.corpus_id),
"worker_run_id": WORKER_RUN_ID,
}
if child_node.is_final:
# This element should be created with its image
image = self.create_image(child_node)
body.update({"image": image["id"]})
else:
# Child nodes of a PDF file should always be final
logger.error(
f"An error occurred processing PDF node '{node.key}': non-final child node found."
)
skip_count = len(node)
self.progress["errors"] += skip_count
logger.warning(
f"Skipping object {node.key} and its descendants"
)
break
if child_node.parent and child_node.parent.arkindex_id:
body.update({"parent": child_node.parent.arkindex_id})
# Create the element and save its ID to the current node
element = retried_request("CreateElement", body=body)
child_node.arkindex_id = element["id"]
self.progress["completed"] += 1
arkindex_pages.append(element)
except Exception as e:
skip_count = len(child_node)
self.progress["errors"] += skip_count
# Log information about the error
error = getattr(e, "content", e)
logger.error(
f"An error occurred processing object '{child_node.key}': {error}"
)
if skip_count > 1:
logger.warning(
f"{skip_count} descendant objects will be skipped"
)
return
else:
# Recursively handle node's children
for child_node in node:
self.build_elements(child_node)
# Skip creating page element if it already exists
logger.debug(f"Using existing element {child_node.name} ({child_id})")
child_node.arkindex_id = child_id
self.progress["existing"] += 1
arkindex_pages.append({"id": child_id})
existing_pages.append(child_id)
# Create transcriptions
upload_pdf_text(
self.pdf_paths[node.key], arkindex_pages, existing_pages=existing_pages
)
# Remove temporary PDF file
Path(self.pdf_paths[node.key]).unlink(missing_ok=True)
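Note that pages which already exist on Arkindex are still appended to arkindex_pages: extract_pdf_text asserts that the number of Arkindex pages matches the number of PDF pages and zips the two lists together, so the minimal {"id": child_id} entries keep that pairing aligned, while existing_pages ensures their transcriptions are not uploaded a second time.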
def handle_pdf_nodes(self) -> int:
"""
Extract and save images from all PDF files on the bucket, returning the PDF count.
The S3 objects graph is updated as the extracted page images are uploaded to the bucket.
"""
extracted = 0
# Snapshot the node list first, as the graph will change while pages are extracted from the PDF files
nodes = list(self.root_node.recurse())
for node in nodes:
if node.is_pdf:
self.extract_pdf(node)
extracted += 1
return extracted
def handle_zip_nodes(self) -> int:
"""
@@ -371,15 +436,13 @@ class S3Import(object):
If any archive is extracted, the S3 objects graph is rebuilt.
"""
extracted = 0
for node in self.root_node.recurse():
# Snapshot the node list first, as the graph will change while files are extracted from the archives
nodes = list(self.root_node.recurse())
for node in nodes:
if node.is_zip:
self.extract_zip_node(node)
extracted += 1
if extracted:
logger.info("Rebuilding graph after archive extraction")
self.build_graph()
return extracted
def extract_zip_node(self, node: Node) -> None:
@@ -409,12 +472,19 @@ class S3Import(object):
key = PATH_DELIMITER.join((node.parent.name, key))
with zip_file.open(info) as f:
try:
self.boto_resource.meta.client.upload_fileobj(
f, self.bucket, key
)
except ClientError as e:
logging.error(e)
if key.lower().endswith(".pdf"):
temp_dir = tempfile.mkdtemp()
pdf_path = zip_file.extract(info, path=temp_dir)
self.upload_pdf_pages(pdf_path, key.replace(".pdf", ""))
self.pdf_paths[key.replace(".pdf", "")] = pdf_path
else:
try:
self.boto_resource.meta.client.upload_fileobj(
f, self.bucket, key
)
self.root_node.add_descendant(key)
except ClientError as e:
logging.error(e)
finally:
file_path.unlink(missing_ok=True)
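In isolation, the routing applied to archive members can be sketched as follows (the archive name and bucket are placeholders): PDF members are kept on disk so their pages can be converted later, while every other file is streamed to S3 unchanged.

import tempfile
import zipfile

import boto3

def route_zip_members(zip_path="archive.zip", bucket="example-bucket"):
    s3 = boto3.client("s3")
    pdf_paths = {}
    with zipfile.ZipFile(zip_path) as zip_file:
        for info in zip_file.infolist():
            if info.is_dir():
                continue
            key = info.filename
            if key.lower().endswith(".pdf"):
                # Keep the PDF on disk for page extraction; strip the suffix so the page
                # prefix does not collide with an object named after the PDF itself
                pdf_paths[key[: -len(".pdf")]] = zip_file.extract(info, path=tempfile.mkdtemp())
            else:
                with zip_file.open(info) as f:
                    s3.upload_fileobj(f, bucket, key)
    return pdf_paths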
@@ -430,11 +500,12 @@ class S3Import(object):
)
self.build_graph()
pdf_count = self.handle_pdf_nodes()
zip_count = self.handle_zip_nodes()
# Build Arkindex elements from the first level (i.e. skip the root node)
# Subtract the ZIP archive and PDF counts since we know we won't be importing those directly
self.progress["total"] = len(self.root_node) - 1 - zip_count
self.progress["total"] = len(self.root_node) - 1 - zip_count - pdf_count
logger.info(
f"Creating {self.progress['total']} elements in corpus '{self.corpus['name']}'"
)
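As a hypothetical example: if len(self.root_node) is 12 with the root included, and the bucket holds 2 ZIP archives and 1 PDF file, the import will report 12 - 1 - 2 - 1 = 8 elements to create.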
@@ -494,6 +494,7 @@ class TestFileImport(TestCase):
)
def test_run_pdf(self, mock):
# Process info
self.maxDiff = None
mock.get(
"/api/v1/process/processid/",
json={