Skip to content
Snippets Groups Projects
Commit ba950dfb authored by Erwan Rouchet's avatar Erwan Rouchet Committed by Bastien Abadie
Browse files

First version of SQLite export

parent c84c2e42
No related branches found
No related tags found
No related merge requests found
......@@ -5,3 +5,4 @@ include tests-requirements.txt
recursive-include arkindex/templates *.html
recursive-include arkindex/templates *.json
recursive-include arkindex/templates *.txt
include arkindex/documents/export/*.sql
import csv
import os
import sqlite3
import tempfile
from itertools import islice
from pathlib import Path

from django.db import connections
from django_rq import job
from rq import get_current_job
BASE_DIR = Path(__file__).absolute().parent

# Maximum number of rows that will be stored in memory at once before being sent to SQLite.
CSV_BATCH_SIZE = 10000

# Map SQLite table names to PostgreSQL queries.
# Each query is .format()ed with a corpus_id before running, and its column
# order must match the destination SQLite table's columns (see tables.sql).
EXPORT_QUERIES = [(
    'image',
    # TODO: Build URLs
    """
    SELECT image.id, '', image.width, image.height
    FROM images_image image
    INNER JOIN images_zone zone ON (zone.image_id = image.id)
    INNER JOIN documents_element element ON (element.zone_id = zone.id)
    WHERE element.corpus_id = '{corpus_id}'::uuid
    """
), (
    'worker_version',
    # TODO: Filter worker versions properly
    """
    SELECT version.id, worker.name, worker.slug, worker.type, revision.hash
    FROM dataimport_workerversion version
    INNER JOIN dataimport_worker worker ON (version.worker_id = worker.id)
    INNER JOIN dataimport_revision revision ON (version.revision_id = revision.id)
    """
), (
    'element',
    # Dates are exported as UNIX timestamps because SQLite does not have a datetime type,
    # but has date and time functions that take UNIX timestamps as arguments.
    """
    SELECT
    element.id,
    EXTRACT(EPOCH FROM element.created),
    EXTRACT(EPOCH FROM element.updated),
    element.name,
    type.slug,
    ST_AsGeoJSON(zone.polygon)::json->'coordinates',
    zone.image_id,
    element.worker_version_id
    FROM documents_element element
    INNER JOIN documents_elementtype type ON (element.type_id = type.id)
    LEFT JOIN images_zone zone ON (element.zone_id = zone.id)
    WHERE element.corpus_id = '{corpus_id}'::uuid
    """
), (
    'element_path',
    # The destination table has id, parent_id, child_id, ordering.
    # In an ElementPath, the child ID is the element ID, and its immediate parent
    # is the last item in the array (path[array_length(path, 1)]).
    # We can safely ditch the rest of the array as the parent will have its own paths
    # with its own parents as last items.
    # This uses DISTINCT ON because when the parent has two paths, the child will have
    # two paths with the same direct parent. These two paths should have the same
    # ordering, but this is not actually enforced by any constraint. ORDER BY is used
    # because DISTINCT ON requires it.
    """
    SELECT
    DISTINCT ON (path.path[array_length(path.path, 1)], path.element_id)
    path.id,
    path.path[array_length(path.path, 1)],
    path.element_id,
    path.ordering
    FROM documents_elementpath path
    INNER JOIN documents_element element ON (path.element_id = element.id)
    WHERE element.corpus_id = '{corpus_id}'::uuid
    ORDER BY path.path[array_length(path.path, 1)], path.element_id
    """
), (
    'transcription',
    """
    SELECT
    transcription.id,
    transcription.element_id,
    transcription.text,
    transcription.confidence,
    transcription.worker_version_id
    FROM documents_transcription transcription
    INNER JOIN documents_element element ON (element.id = transcription.element_id)
    WHERE element.corpus_id = '{corpus_id}'::uuid
    """
), (
    'classification',
    # SQLite has no boolean type, so high_confidence becomes an integer (0 or 1).
    # The moderator join alias cannot be `user`: USER is a reserved keyword in
    # PostgreSQL and would cause a syntax error.
    """
    SELECT
    classification.id,
    classification.element_id,
    mlclass.name,
    classification.state,
    moderator.email,
    classification.confidence,
    classification.high_confidence::integer,
    classification.worker_version_id
    FROM documents_classification classification
    INNER JOIN documents_element element ON (element.id = classification.element_id)
    INNER JOIN documents_mlclass mlclass ON (mlclass.id = classification.ml_class_id)
    LEFT JOIN users_user moderator ON (moderator.id = classification.moderator_id)
    WHERE element.corpus_id = '{corpus_id}'::uuid
    """
), (
    'entity',
    # LEFT JOIN on the moderator: the destination column is nullable, and an
    # INNER JOIN would silently drop every entity without a moderator.
    # The alias cannot be `user` (reserved keyword in PostgreSQL).
    """
    SELECT
    entity.id,
    entity.name,
    entity.type,
    entity.validated::integer,
    moderator.email,
    hstore_to_json(entity.metas),
    entity.worker_version_id
    FROM documents_entity entity
    LEFT JOIN users_user moderator ON (moderator.id = entity.moderator_id)
    WHERE entity.corpus_id = '{corpus_id}'::uuid
    """
), (
    'transcription_entity',
    # This joins with the entity table to filter by corpus, as going via transcriptions
    # requires two joins on larger tables (transcriptions then elements)
    """
    SELECT
    te.id,
    te.transcription_id,
    te.entity_id,
    te.offset,
    te.length,
    te.worker_version_id
    FROM documents_transcriptionentity te
    INNER JOIN documents_entity entity ON (te.entity_id = entity.id)
    WHERE entity.corpus_id = '{corpus_id}'::uuid
    """
), (
    'entity_role',
    """
    SELECT id, parent_name, child_name, parent_type, child_type
    FROM documents_entityrole
    WHERE corpus_id = '{corpus_id}'::uuid
    """
), (
    'entity_link',
    """
    SELECT link.id, link.parent_id, link.child_id, link.role_id
    FROM documents_entitylink link
    INNER JOIN documents_entityrole role ON (link.role_id = role.id)
    WHERE role.corpus_id = '{corpus_id}'::uuid
    """
), (
    'metadata',
    """
    SELECT
    metadata.id,
    metadata.element_id,
    metadata.name,
    metadata.type,
    metadata.value,
    metadata.entity_id,
    metadata.worker_version_id
    FROM documents_metadata metadata
    INNER JOIN documents_element element ON (element.id = metadata.element_id)
    WHERE element.corpus_id = '{corpus_id}'::uuid
    """
)]
def pg_to_csv(csv_file, query):
    """Run a PostgreSQL query and dump its results as CSV into a file object.

    :param csv_file: A seekable, writable file object. It is rewound and
       truncated before writing, so data from a previous (possibly larger)
       export cannot leak past the end of the new rows.
    :param query: SQL query string; its result columns become the CSV columns.
    """
    csv_file.seek(0)
    # seek(0) alone does not shrink the file: without truncate(), stale rows
    # from a previous table would survive after the new data's end.
    csv_file.truncate()
    with connections['default'].cursor() as pg_cursor:
        # The COPY option list must be parenthesized: `WITH (FORMAT csv)`.
        # `WITH FORMAT CSV, HEADER OFF` is a PostgreSQL syntax error, and
        # HEADER already defaults to off.
        pg_cursor.copy_expert(f"COPY ({query}) TO STDOUT WITH (FORMAT csv)", csv_file)
def csv_to_sqlite(csv_file, table, cursor, batch_size=None):
    """Load CSV rows from a file object into an SQLite table, in batches.

    :param csv_file: A seekable text-mode file object holding CSV rows.
       It is rewound before reading.
    :param table: Name of the destination SQLite table.
    :param cursor: SQLite cursor used to run the INSERTs.
    :param batch_size: Maximum number of rows held in memory per
       executemany() call. Defaults to CSV_BATCH_SIZE.
    """
    if batch_size is None:
        batch_size = CSV_BATCH_SIZE
    csv_file.seek(0)
    reader = csv.reader(csv_file)
    while True:
        rows = list(islice(reader, batch_size))
        if not rows:
            return
        # Build the parameterized query by counting the columns in a CSV row
        # and repeating '?' placeholders.
        placeholders = ",".join("?" for _ in rows[0])
        cursor.executemany(f"INSERT INTO {table} VALUES ({placeholders})", rows)
@job
def export_corpus(corpus_id: str) -> str:
    """Export a corpus into a new SQLite database file.

    :param corpus_id: UUID of the corpus to export, as a string.
    :returns: Path to the generated SQLite database file.
    """
    rq_job = get_current_job()

    # mkstemp returns an open file descriptor; close it right away since
    # sqlite3 opens the path itself — keeping it would leak the descriptor.
    db_fd, db_path = tempfile.mkstemp(suffix='.db')
    os.close(db_fd)

    db = sqlite3.connect(db_path)
    try:
        cursor = db.cursor()
        # Initialize all the tables
        cursor.executescript((BASE_DIR / 'tables.sql').read_text())
        # Text mode with newline='' is what the csv module expects; the same
        # temporary file is reused for every table's CSV dump.
        with tempfile.TemporaryFile(mode='w+', newline='') as csv_file:
            for i, (table_name, query) in enumerate(EXPORT_QUERIES):
                # Report progress twice per table: once before the PostgreSQL
                # dump, once before the SQLite load.
                if rq_job:
                    rq_job.set_progress(i / len(EXPORT_QUERIES))
                pg_to_csv(csv_file, query.format(corpus_id=corpus_id))
                if rq_job:
                    rq_job.set_progress((i + 0.5) / len(EXPORT_QUERIES))
                csv_to_sqlite(csv_file, table_name, cursor)
        db.commit()
    finally:
        # Ensure the connection is released even if a query fails.
        db.close()
    return db_path
-- Images referenced by at least one element of the exported corpus.
-- NOTE(review): ids look like UUID strings (36 chars) stored in VARCHAR(37);
-- SQLite treats VARCHAR lengths as advisory only — confirm the intended size.
-- The url column is currently filled with an empty string by the exporter
-- (see the TODO in the image query).
CREATE TABLE image (
id VARCHAR(37) NOT NULL,
url TEXT NOT NULL,
width INTEGER NOT NULL,
height INTEGER NOT NULL,
PRIMARY KEY (id),
UNIQUE (url),
CHECK (width >= 0 AND height >= 0)
);
-- ML worker versions; rows are exported unfiltered (see the TODO in the
-- worker_version query). `revision` holds the source revision hash.
CREATE TABLE worker_version (
id VARCHAR(37) NOT NULL,
name VARCHAR(100) NOT NULL,
slug VARCHAR(100) NOT NULL,
type VARCHAR(50) NOT NULL,
revision VARCHAR(50) NOT NULL,
PRIMARY KEY (id)
);
-- Corpus elements. created/updated are REAL because the exporter emits
-- UNIX timestamps (SQLite has no datetime type). polygon holds the GeoJSON
-- coordinates array produced by ST_AsGeoJSON; it is NULL, together with
-- image_id, for elements without a zone (the exporter uses a LEFT JOIN).
CREATE TABLE element (
id VARCHAR(37) NOT NULL,
created REAL NOT NULL,
updated REAL NOT NULL,
name VARCHAR(250) NOT NULL,
type VARCHAR(50) NOT NULL,
polygon TEXT,
image_id VARCHAR(37),
worker_version_id VARCHAR(37),
PRIMARY KEY (id),
FOREIGN KEY (image_id) REFERENCES image (id) ON DELETE CASCADE,
FOREIGN KEY (worker_version_id) REFERENCES worker_version (id) ON DELETE CASCADE
);
-- Direct parent/child links between elements, flattened from the
-- PostgreSQL path arrays: only the immediate parent is kept per row.
CREATE TABLE element_path (
id VARCHAR(37) NOT NULL,
parent_id VARCHAR(37) NOT NULL,
child_id VARCHAR(37) NOT NULL,
ordering INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (id),
FOREIGN KEY (parent_id) REFERENCES element (id) ON DELETE CASCADE,
FOREIGN KEY (child_id) REFERENCES element (id) ON DELETE CASCADE,
UNIQUE (parent_id, child_id),
CHECK (ordering >= 0)
);
-- Transcriptions attached to an element; confidence, when set, is a
-- score clamped to [0, 1] by the CHECK constraint.
CREATE TABLE transcription (
id VARCHAR(37) NOT NULL,
element_id VARCHAR(37) NOT NULL,
text TEXT NOT NULL,
confidence REAL,
worker_version_id VARCHAR(37),
PRIMARY KEY (id),
FOREIGN KEY (element_id) REFERENCES element (id) ON DELETE CASCADE,
FOREIGN KEY (worker_version_id) REFERENCES worker_version (id) ON DELETE CASCADE,
CHECK (confidence IS NULL OR (confidence >= 0 AND confidence <= 1))
);
-- ML classifications on elements. high_confidence is an INTEGER standing in
-- for a boolean (SQLite has no boolean type), constrained to 0 or 1.
-- moderator holds the moderating user's email, NULL when unmoderated.
CREATE TABLE classification (
id VARCHAR(37) NOT NULL,
element_id VARCHAR(37) NOT NULL,
class_name VARCHAR(1024) NOT NULL,
state VARCHAR(16) NOT NULL DEFAULT 'pending',
moderator VARCHAR(255),
confidence REAL,
high_confidence INTEGER NOT NULL DEFAULT 0,
worker_version_id VARCHAR(37),
PRIMARY KEY (id),
FOREIGN KEY (element_id) REFERENCES element (id) ON DELETE CASCADE,
FOREIGN KEY (worker_version_id) REFERENCES worker_version (id) ON DELETE CASCADE,
CHECK (confidence IS NULL OR (confidence >= 0 AND confidence <= 1)),
CHECK (high_confidence = 0 OR high_confidence = 1)
);
-- Named entities. validated is an integer-encoded boolean; metas holds the
-- JSON produced by hstore_to_json in the exporter, when present.
CREATE TABLE entity (
id VARCHAR(37) NOT NULL,
name TEXT NOT NULL,
type VARCHAR(50) NOT NULL,
validated INTEGER NOT NULL DEFAULT 0,
moderator VARCHAR(255),
metas TEXT,
worker_version_id VARCHAR(37),
PRIMARY KEY (id),
FOREIGN KEY (worker_version_id) REFERENCES worker_version (id) ON DELETE CASCADE,
CHECK (validated = 0 OR validated = 1)
);
-- Links an entity to a character span (offset + length) of a transcription.
CREATE TABLE transcription_entity (
id VARCHAR(37) NOT NULL,
transcription_id VARCHAR(37) NOT NULL,
entity_id VARCHAR(37) NOT NULL,
offset INTEGER NOT NULL,
length INTEGER NOT NULL,
worker_version_id VARCHAR(37),
PRIMARY KEY (id),
FOREIGN KEY (transcription_id) REFERENCES transcription (id) ON DELETE CASCADE,
FOREIGN KEY (entity_id) REFERENCES entity (id) ON DELETE CASCADE,
FOREIGN KEY (worker_version_id) REFERENCES worker_version (id) ON DELETE CASCADE,
UNIQUE (transcription_id, entity_id, offset, length, worker_version_id),
CHECK (offset >= 0 AND length >= 0)
);
-- Role definitions for entity links: a named parent/child relationship
-- between two entity types, unique on the full (names, types) tuple.
CREATE TABLE entity_role (
id VARCHAR(37) NOT NULL,
parent_name VARCHAR(250) NOT NULL,
child_name VARCHAR(250) NOT NULL,
parent_type VARCHAR(50) NOT NULL,
child_type VARCHAR(50) NOT NULL,
PRIMARY KEY (id),
UNIQUE (parent_name, child_name, parent_type, child_type)
);
-- Parent/child link between two entities, qualified by an entity_role.
CREATE TABLE entity_link (
    id VARCHAR(37) NOT NULL,
    parent_id VARCHAR(37) NOT NULL,
    child_id VARCHAR(37) NOT NULL,
    role_id VARCHAR(37) NOT NULL,
    PRIMARY KEY (id),
    FOREIGN KEY (parent_id) REFERENCES entity (id),
    FOREIGN KEY (child_id) REFERENCES entity (id),
    -- The role table is named entity_role in this schema; the previous
    -- `REFERENCES role (id)` pointed at a table that does not exist.
    FOREIGN KEY (role_id) REFERENCES entity_role (id)
);
-- Key/value metadata attached to an element, optionally pointing at an
-- entity. Deleting the entity nulls the reference (ON DELETE SET NULL)
-- instead of deleting the metadata row.
CREATE TABLE metadata (
id VARCHAR(37) NOT NULL,
element_id VARCHAR(37) NOT NULL,
name VARCHAR(250) NOT NULL,
type VARCHAR(50) NOT NULL,
value TEXT NOT NULL,
entity_id VARCHAR(37),
worker_version_id VARCHAR(37),
PRIMARY KEY (id),
FOREIGN KEY (element_id) REFERENCES element (id) ON DELETE CASCADE,
FOREIGN KEY (entity_id) REFERENCES entity (id) ON DELETE SET NULL,
FOREIGN KEY (worker_version_id) REFERENCES worker_version (id) ON DELETE CASCADE
);
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment