
S3 import state CSV artifact

Merged: Valentin Rigal requested to merge s3-import-state into master
All threads resolved!
1 file changed: +27 −0
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import csv
import logging
import os
import tempfile
@@ -53,6 +54,8 @@ class S3Import(object):
        # Maps S3 key prefixes to paths of temporary files for extracted PDF files,
        # so that transcriptions can be added once page elements are created for each image.
        self.pdf_paths = {}
        # Store state of the import {<file_path>: <arkindex_id>}
        self.import_state = {}
        # Ensure all the parameters are valid before starting a full import
        try:
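
Note: for context, the mapping holds one entry per S3 key, with None marking files whose element creation failed. An illustrative sketch (the paths and ID below are hypothetical, not from a real import):

    # Hypothetical contents of self.import_state after a partial import:
    # keys are S3 file paths, values are Arkindex element IDs, or None on failure.
    {
        "fonds/register_1/page_0001.jpg": "0b7f1c2e-hypothetical-uuid",
        "fonds/register_1/page_0002.jpg": None,  # CreateElement failed for this file
    }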
@@ -107,6 +110,13 @@ class S3Import(object):
            logger.error(e)
            raise e

    @property
    def working_dir(self):
        if "PONOS_DATA" in os.environ:
            return Path(os.environ["PONOS_DATA"]) / "current"
        else:
            return Path(tempfile.mkdtemp())

    def list_bucket_keys(self):
        bucket = self.boto_resource.Bucket(self.bucket)
        for obj in bucket.objects.filter(Prefix=self.prefix):
@@ -291,6 +301,7 @@ class S3Import(object):
        if elt_id:
            logger.debug(f"Using existing element {node.name} ({elt_id})")
            node.arkindex_id = elt_id
            self.import_state[node.key] = elt_id
            self.progress["existing"] += 1
        # Handle PDF files separately: they only have "final" children, the pages, and we need
        # to store these pages created on Arkindex in order to upload the corresponding text
@@ -324,6 +335,7 @@ class S3Import(object):
            node.arkindex_id = element["id"]
        except Exception as e:
            self.import_state[node.key] = None
            skip_count = len(node)
            self.progress["errors"] += skip_count
            # Log information about the error
@@ -334,6 +346,7 @@ class S3Import(object):
            return
        else:
            self.import_state[node.key] = element["id"]
            self.progress["completed"] += 1
        # Handle PDF files separately: they only have "final" children, the pages, and we need
        # to store these pages created on Arkindex in order to upload the corresponding text.
@@ -382,10 +395,12 @@ class S3Import(object):
                    # Create the element and save its ID to the current node
                    element = retried_request("CreateElement", body=body)
                    child_node.arkindex_id = element["id"]
                    self.import_state[child_node.key] = element["id"]
                    self.progress["completed"] += 1
                    arkindex_pages.append(element)
                except Exception as e:
                    self.import_state[child_node.key] = None
                    skip_count = len(child_node)
                    self.progress["errors"] += skip_count
                    # Log information about the error
@@ -401,6 +416,7 @@ class S3Import(object):
            else:
                # Skip creating page element if it already exists
                logger.debug(f"Using existing element {child_node.name} ({child_id})")
                self.import_state[child_node.key] = child_id
                child_node.arkindex_id = child_id
                self.progress["existing"] += 1
                arkindex_pages.append({"id": child_id})
@@ -488,6 +504,14 @@ class S3Import(object):
        finally:
            file_path.unlink(missing_ok=True)

    def publish(self):
        logger.info("Building import_state.csv artifact")
        with open(self.working_dir / "import_state.csv", "w", newline="") as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=["file_path", "arkindex_id"])
            writer.writeheader()
            for path, arkindex_id in self.import_state.items():
                writer.writerow({"file_path": path, "arkindex_id": arkindex_id})

    def run(self):
        assert WORKER_RUN_ID, "A WorkerRun ID is required"
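
Note: the resulting artifact is a two-column CSV; csv.DictWriter serializes None values as empty cells, so failed imports show up as rows with an empty arkindex_id. A hypothetical example of the file contents:

    file_path,arkindex_id
    fonds/register_1/page_0001.jpg,0b7f1c2e-hypothetical-uuid
    fonds/register_1/page_0002.jpg,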
@@ -511,6 +535,9 @@ class S3Import(object):
        for child in self.root_node:
            self.build_elements(child)

        # Publish a CSV with import state
        self.publish()

        stats_msg = ", ".join(
            f"{count} {key}" for key, count in self.progress.items() if key != "total"
        )
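
Note: a downstream task could consume the artifact along these lines (illustrative sketch, not part of this MR; the local file path and the choice to drop failed rows are assumptions):

    import csv
    from pathlib import Path

    # Read the artifact back and keep only successfully imported files.
    # An empty arkindex_id cell means element creation failed (None was written).
    state_path = Path("import_state.csv")  # e.g. downloaded from the task's artifacts
    with state_path.open(newline="") as csvfile:
        imported = {
            row["file_path"]: row["arkindex_id"]
            for row in csv.DictReader(csvfile)
            if row["arkindex_id"]
        }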