Commit 41f2493c authored by Erwan Rouchet, committed by Bastien Abadie

Add SQLite export task

parent f8539f96
1 merge request: !1349 Add SQLite export task
Showing changed files with 633 additions and 1 deletion
Dockerfile
@@ -74,6 +74,7 @@ COPY VERSION /etc/arkindex.version
# Copy templates in base dir for binary
ENV BASE_DIR=/usr/share/arkindex
COPY arkindex/templates /usr/share/arkindex/templates
COPY arkindex/documents/export/*.sql /usr/share/arkindex/documents/export/
# Touch python files for needed management commands
# Otherwise Django will not load the compiled module
MANIFEST.in
@@ -5,3 +5,4 @@ include tests-requirements.txt
recursive-include arkindex/templates *.html
recursive-include arkindex/templates *.json
recursive-include arkindex/templates *.txt
include arkindex/documents/export/*.sql

arkindex/documents/export/__init__.py
import json
import logging
import sqlite3
import tempfile
import uuid
from pathlib import Path
from django.db import connections
from django_rq import job
from rq import get_current_job
BASE_DIR = Path(__file__).absolute().parent
# Maximum number of rows that will be stored in memory at once before being sent to SQLite.
BATCH_SIZE = 10000
logger = logging.getLogger(__name__)
# Ordered list of SQLite table names; each one has a matching PostgreSQL query file
# named <table>.sql. The order matters: tables referenced by foreign keys
# (image, worker_version, element, etc.) are filled before the tables that point to them.
EXPORT_QUERIES = [
    'image',
    'worker_version',
    'element',
    'element_path',
    'transcription',
    'classification',
    'entity',
    'transcription_entity',
    'entity_role',
    'entity_link',
    'metadata',
]
def run_pg_query(query):
    """
    Run a single PostgreSQL query and split the results into chunks.
    When a name is given to a cursor, psycopg2 uses a server-side cursor;
    we just use a random string as a name.
    """
    with connections['default'].create_cursor(name=str(uuid.uuid4())) as pg_cursor:
        pg_cursor.itersize = BATCH_SIZE
        pg_cursor.execute(query)
        while True:
            out = pg_cursor.fetchmany(BATCH_SIZE)
            if not out:
                return
            yield out
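# Note: run_pg_query only ever yields non-empty chunks (it returns instead of
# yielding an empty batch), which is what lets save_sqlite below read rows[0] safely.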
def save_sqlite(rows, table, cursor):
    """
    Write a chunk of rows into an SQLite table
    """
    def _serialize(value):
        # Serialize UUID as string
        if isinstance(value, uuid.UUID):
            return str(value)
        # Serialize list and dicts as json
        if isinstance(value, (list, dict)):
            return json.dumps(value)
        return value

    rows = [
        list(map(_serialize, row))
        for row in rows
    ]

    # Build the parameterized query by counting the columns of the first row
    # and repeating '?' placeholders
    insert_args = ",".join("?" for _ in range(len(rows[0])))
    query = f"INSERT INTO {table} VALUES ({insert_args})"
    cursor.executemany(query, rows)
@job
def export_corpus(corpus_id: str) -> str:
    rq_job = get_current_job()
    _, db_path = tempfile.mkstemp(suffix='db')
    logger.info(f"Exporting corpus {corpus_id} into {db_path}")
    db = sqlite3.connect(db_path)
    cursor = db.cursor()

    # Initialize all the tables
    cursor.executescript((BASE_DIR / 'structure.sql').read_text())

    for i, name in enumerate(EXPORT_QUERIES):
        query = (BASE_DIR / f'{name}.sql').read_text()
        logger.info(f"Running query {i+1}/{len(EXPORT_QUERIES)} {name}")
        if rq_job:
            rq_job.set_progress(i / len(EXPORT_QUERIES))
        for chunk in run_pg_query(query.format(corpus_id=corpus_id)):
            save_sqlite(chunk, name, cursor)

    db.commit()
    db.close()
    return db_path
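# Rough usage sketch (illustrative only, not prescribed by this module): the task can
# be called directly for a synchronous export or enqueued through django_rq, and the
# returned path is a plain SQLite file readable with the standard library.
#
#   from arkindex.documents.export import export_corpus
#   db_path = export_corpus(str(corpus.id))      # direct, synchronous call
#   export_corpus.delay(str(corpus.id))          # asynchronous RQ job
#
#   import sqlite3
#   db = sqlite3.connect(db_path)
#   print(db.execute('SELECT COUNT(*) FROM element').fetchone())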

arkindex/documents/export/classification.sql
SELECT
classification.id,
classification.element_id,
mlclass.name,
classification.state,
moderator.email,
classification.confidence,
-- SQLite has no boolean type, so high_confidence becomes an integer (0 or 1)
classification.high_confidence::integer,
classification.worker_version_id
FROM documents_classification classification
INNER JOIN documents_element element ON (element.id = classification.element_id)
INNER JOIN documents_mlclass mlclass ON (mlclass.id = classification.ml_class_id)
LEFT JOIN users_user moderator ON (moderator.id = classification.moderator_id)
WHERE element.corpus_id = '{corpus_id}'::uuid

arkindex/documents/export/element.sql
-- Dates are exported as UNIX timestamps because SQLite does not have a datetime type,
-- but has date and time functions that take UNIX timestamps as arguments.
SELECT
element.id,
EXTRACT(EPOCH FROM element.created),
EXTRACT(EPOCH FROM element.updated),
element.name,
type.slug,
ST_AsGeoJSON(zone.polygon)::json->'coordinates',
zone.image_id,
element.worker_version_id
FROM documents_element element
INNER JOIN documents_elementtype type ON (element.type_id = type.id)
LEFT JOIN images_zone zone ON (element.zone_id = zone.id)
WHERE element.corpus_id = '{corpus_id}'::uuid

arkindex/documents/export/element_path.sql
-- The destination table has id, parent_id, child_id, ordering.
-- In an ElementPath, the child ID is the element ID, and its immediate parent
-- is the last item in the array (path[array_length(path, 1)]).
-- We can safely ditch the rest of the array as the parent will have its own paths
-- with its own parents as last items.
-- This uses DISTINCT ON because when the parent has two paths, the child will have
-- two paths with the same direct parent. These two paths should have the same
-- ordering, but this is not actually enforced by any constraint. ORDER BY is used
-- because DISTINCT ON requires it.
SELECT
DISTINCT ON (path.path[array_length(path.path, 1)], path.element_id)
path.id,
path.path[array_length(path.path, 1)],
path.element_id,
path.ordering
FROM documents_elementpath path
INNER JOIN documents_element element ON (path.element_id = element.id)
WHERE element.corpus_id = '{corpus_id}'::uuid
ORDER BY path.path[array_length(path.path, 1)], path.element_id
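-- Example (illustrative): for an element whose path is ARRAY[volume_id, page_id],
-- only one row is produced (parent_id = page_id, child_id = the element's id);
-- the page element has its own row with parent_id = volume_id, so the full tree
-- can be rebuilt by walking parent_id links.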

arkindex/documents/export/entity.sql
SELECT
entity.id,
entity.name,
entity.type,
entity.validated::integer,
moderator.email,
hstore_to_json(entity.metas),
entity.worker_version_id
FROM documents_entity entity
LEFT JOIN users_user moderator ON (moderator.id = entity.moderator_id)
WHERE entity.corpus_id = '{corpus_id}'::uuid
\ No newline at end of file

arkindex/documents/export/entity_link.sql
SELECT link.id, link.parent_id, link.child_id, link.role_id
FROM documents_entitylink link
INNER JOIN documents_entityrole role ON (link.role_id = role.id)
WHERE role.corpus_id = '{corpus_id}'::uuid
\ No newline at end of file

arkindex/documents/export/entity_role.sql
SELECT id, parent_name, child_name, parent_type, child_type
FROM documents_entityrole
WHERE corpus_id = '{corpus_id}'::uuid
\ No newline at end of file

arkindex/documents/export/image.sql
SELECT DISTINCT image.id, CONCAT(TRIM(TRAILING '/' FROM server.url), '/', image.path), image.width, image.height
FROM images_image image
INNER JOIN images_zone zone ON (zone.image_id = image.id)
INNER JOIN documents_element element ON (element.zone_id = zone.id)
INNER JOIN images_imageserver server ON (server.id = image.server_id)
WHERE element.corpus_id = '{corpus_id}'::uuid
\ No newline at end of file

arkindex/documents/export/metadata.sql
SELECT
metadata.id,
metadata.element_id,
metadata.name,
metadata.type,
metadata.value,
metadata.entity_id,
metadata.worker_version_id
FROM documents_metadata metadata
INNER JOIN documents_element element ON (element.id = metadata.element_id)
WHERE element.corpus_id = '{corpus_id}'::uuid
\ No newline at end of file

arkindex/documents/export/structure.sql
PRAGMA foreign_keys = ON;
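-- Conventions used below: UUID primary and foreign keys are stored as strings,
-- dates as REAL UNIX timestamps, and booleans as INTEGER 0/1 columns, since SQLite
-- has no native UUID, datetime or boolean types.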
CREATE TABLE image (
id VARCHAR(37) NOT NULL,
url TEXT NOT NULL,
width INTEGER NOT NULL,
height INTEGER NOT NULL,
PRIMARY KEY (id),
UNIQUE (url),
CHECK (width >= 0 AND height >= 0)
);
CREATE TABLE worker_version (
id VARCHAR(37) NOT NULL,
name VARCHAR(100) NOT NULL,
slug VARCHAR(100) NOT NULL,
type VARCHAR(50) NOT NULL,
revision VARCHAR(50) NOT NULL,
PRIMARY KEY (id)
);
CREATE TABLE element (
id VARCHAR(37) NOT NULL,
created REAL NOT NULL,
updated REAL NOT NULL,
name VARCHAR(250) NOT NULL,
type VARCHAR(50) NOT NULL,
polygon TEXT,
image_id VARCHAR(37),
worker_version_id VARCHAR(37),
PRIMARY KEY (id),
FOREIGN KEY (image_id) REFERENCES image (id) ON DELETE CASCADE,
FOREIGN KEY (worker_version_id) REFERENCES worker_version (id) ON DELETE CASCADE
);
CREATE TABLE element_path (
id VARCHAR(37) NOT NULL,
parent_id VARCHAR(37) NOT NULL,
child_id VARCHAR(37) NOT NULL,
ordering INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY (id),
FOREIGN KEY (parent_id) REFERENCES element (id) ON DELETE CASCADE,
FOREIGN KEY (child_id) REFERENCES element (id) ON DELETE CASCADE,
UNIQUE (parent_id, child_id),
CHECK (ordering >= 0)
);
CREATE TABLE transcription (
id VARCHAR(37) NOT NULL,
element_id VARCHAR(37) NOT NULL,
text TEXT NOT NULL,
confidence REAL,
worker_version_id VARCHAR(37),
PRIMARY KEY (id),
FOREIGN KEY (element_id) REFERENCES element (id) ON DELETE CASCADE,
FOREIGN KEY (worker_version_id) REFERENCES worker_version (id) ON DELETE CASCADE,
CHECK (confidence IS NULL OR (confidence >= 0 AND confidence <= 1))
);
CREATE TABLE classification (
id VARCHAR(37) NOT NULL,
element_id VARCHAR(37) NOT NULL,
class_name VARCHAR(1024) NOT NULL,
state VARCHAR(16) NOT NULL DEFAULT 'pending',
moderator VARCHAR(255),
confidence REAL,
high_confidence INTEGER NOT NULL DEFAULT 0,
worker_version_id VARCHAR(37),
PRIMARY KEY (id),
FOREIGN KEY (element_id) REFERENCES element (id) ON DELETE CASCADE,
FOREIGN KEY (worker_version_id) REFERENCES worker_version (id) ON DELETE CASCADE,
CHECK (confidence IS NULL OR (confidence >= 0 AND confidence <= 1)),
CHECK (high_confidence = 0 OR high_confidence = 1)
);
CREATE TABLE entity (
id VARCHAR(37) NOT NULL,
name TEXT NOT NULL,
type VARCHAR(50) NOT NULL,
validated INTEGER NOT NULL DEFAULT 0,
moderator VARCHAR(255),
metas TEXT,
worker_version_id VARCHAR(37),
PRIMARY KEY (id),
FOREIGN KEY (worker_version_id) REFERENCES worker_version (id) ON DELETE CASCADE,
CHECK (validated = 0 OR validated = 1)
);
CREATE TABLE transcription_entity (
id VARCHAR(37) NOT NULL,
transcription_id VARCHAR(37) NOT NULL,
entity_id VARCHAR(37) NOT NULL,
offset INTEGER NOT NULL,
length INTEGER NOT NULL,
worker_version_id VARCHAR(37),
PRIMARY KEY (id),
FOREIGN KEY (transcription_id) REFERENCES transcription (id) ON DELETE CASCADE,
FOREIGN KEY (entity_id) REFERENCES entity (id) ON DELETE CASCADE,
FOREIGN KEY (worker_version_id) REFERENCES worker_version (id) ON DELETE CASCADE,
UNIQUE (transcription_id, entity_id, offset, length, worker_version_id),
CHECK (offset >= 0 AND length >= 0)
);
CREATE TABLE entity_role (
id VARCHAR(37) NOT NULL,
parent_name VARCHAR(250) NOT NULL,
child_name VARCHAR(250) NOT NULL,
parent_type VARCHAR(50) NOT NULL,
child_type VARCHAR(50) NOT NULL,
PRIMARY KEY (id),
UNIQUE (parent_name, child_name, parent_type, child_type)
);
CREATE TABLE entity_link (
id VARCHAR(37) NOT NULL,
parent_id VARCHAR(37) NOT NULL,
child_id VARCHAR(37) NOT NULL,
role_id VARCHAR(37) NOT NULL,
PRIMARY KEY (id),
FOREIGN KEY (parent_id) REFERENCES entity (id),
FOREIGN KEY (child_id) REFERENCES entity (id),
FOREIGN KEY (role_id) REFERENCES entity_role (id)
);
CREATE TABLE metadata (
id VARCHAR(37) NOT NULL,
element_id VARCHAR(37) NOT NULL,
name VARCHAR(250) NOT NULL,
type VARCHAR(50) NOT NULL,
value TEXT NOT NULL,
entity_id VARCHAR(37),
worker_version_id VARCHAR(37),
PRIMARY KEY (id),
FOREIGN KEY (element_id) REFERENCES element (id) ON DELETE CASCADE,
FOREIGN KEY (entity_id) REFERENCES entity (id) ON DELETE SET NULL,
FOREIGN KEY (worker_version_id) REFERENCES worker_version (id) ON DELETE CASCADE
);

arkindex/documents/export/transcription.sql
SELECT
transcription.id,
transcription.element_id,
transcription.text,
transcription.confidence,
transcription.worker_version_id
FROM documents_transcription transcription
INNER JOIN documents_element element ON (element.id = transcription.element_id)
WHERE element.corpus_id = '{corpus_id}'::uuid
\ No newline at end of file

arkindex/documents/export/transcription_entity.sql
-- This joins with the entity table to filter by corpus, as going via transcriptions
-- requires two joins on larger tables (transcriptions then elements)
SELECT
te.id,
te.transcription_id,
te.entity_id,
te.offset,
te.length,
te.worker_version_id
FROM documents_transcriptionentity te
INNER JOIN documents_entity entity ON (te.entity_id = entity.id)
WHERE entity.corpus_id = '{corpus_id}'::uuid

arkindex/documents/export/worker_version.sql
-- This filters worker versions to only include those used in any of the kinds
-- of ML results we have. Doing it using LEFT JOIN would require 10 joins and
-- fills up the RAM. Adding DISTINCT to all the SELECT queries of the UNION
-- slows this query down by ~20%. Using multiple INs instead of a UNION makes
-- this query twice as slow.
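-- Note: results created without a worker version contribute NULL to the UNION below;
-- NULL can never equal version.id, so manually-created results are naturally left out.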
SELECT version.id, worker.name, worker.slug, worker.type, revision.hash
FROM dataimport_workerversion version
INNER JOIN dataimport_worker worker ON (version.worker_id = worker.id)
INNER JOIN dataimport_revision revision ON (version.revision_id = revision.id)
WHERE version.id IN (
SELECT worker_version_id FROM documents_element WHERE corpus_id = '{corpus_id}'::uuid
UNION
SELECT worker_version_id FROM documents_entity WHERE corpus_id = '{corpus_id}'::uuid
UNION
SELECT classification.worker_version_id
FROM documents_classification classification
INNER JOIN documents_element element ON (element.id = classification.element_id)
WHERE element.corpus_id = '{corpus_id}'::uuid
UNION
SELECT transcription.worker_version_id
FROM documents_transcription transcription
INNER JOIN documents_element element ON (element.id = transcription.element_id)
WHERE element.corpus_id = '{corpus_id}'::uuid
UNION
SELECT te.worker_version_id
FROM documents_transcriptionentity te
INNER JOIN documents_entity entity ON (te.entity_id = entity.id)
WHERE entity.corpus_id = '{corpus_id}'::uuid
)
import json
import os
import sqlite3
from arkindex.dataimport.models import WorkerVersion
from arkindex.documents.export import export_corpus
from arkindex.documents.models import (
    Classification,
    ElementPath,
    EntityLink,
    EntityType,
    MetaData,
    Transcription,
    TranscriptionEntity,
)
from arkindex.images.models import Image
from arkindex.project.tests import FixtureTestCase
class TestExport(FixtureTestCase):

    def test_export(self):
        element = self.corpus.elements.get(name='Volume 1')
        transcription = Transcription.objects.first()
        version = WorkerVersion.objects.get(worker__slug='reco')
        element.classifications.create(
            ml_class=self.corpus.ml_classes.create(name='Blah'),
            confidence=.55555555,
        )
        entity1 = self.corpus.entities.create(
            name='Arrokuda',
            type=EntityType.Location,
            metas={'subtype': 'pokemon'},
        )
        entity2 = self.corpus.entities.create(
            name='Stonjourner',
            type=EntityType.Person,
            validated=True,
            moderator=self.superuser,
        )
        role = self.corpus.roles.create(
            parent_name='parent',
            child_name='child',
            parent_type=EntityType.Location,
            child_type=EntityType.Person,
        )
        role.links.create(parent=entity1, child=entity2)
        transcription.transcription_entities.create(
            entity=entity1,
            offset=1,
            length=1,
            worker_version=version,
        )

        db_path = export_corpus(self.corpus.id)
        db = sqlite3.connect(db_path)

        self.assertCountEqual(
            db.execute('SELECT id, url, width, height FROM image').fetchall(),
            [
                (
                    str(image.id),
                    image.url,
                    image.width,
                    image.height
                )
                for image in Image.objects.all()
            ]
        )

        self.assertCountEqual(
            db.execute('SELECT id, name, slug, type, revision FROM worker_version').fetchall(),
            [
                (
                    str(version.id),
                    version.worker.name,
                    version.worker.slug,
                    version.worker.type,
                    version.revision.hash
                )
            ]
        )

        actual_rows = db.execute("SELECT id, created, updated, name, type, worker_version_id, image_id, polygon FROM element").fetchall()
        for i in range(len(actual_rows)):
            # Convert the row from a tuple to a list because we'll change it
            row = list(actual_rows[i])
            if row[-1] is not None:
                # Parse the polygons as JSON for easier comparison
                row[-1] = json.loads(row[-1])
            actual_rows[i] = row

        expected_rows = []
        for element in self.corpus.elements.all():
            row = [
                str(element.id),
                element.created.timestamp(),
                element.updated.timestamp(),
                element.name,
                element.type.slug,
            ]
            if element.worker_version_id:
                row.append(str(element.worker_version_id))
            else:
                row.append(None)
            if element.zone:
                row.append(str(element.zone.image_id))
                row.append([
                    # coords returns a list of tuples of floats, we turn it into a list of lists of ints
                    [int(x), int(y)] for x, y in element.zone.polygon.coords
                ])
            else:
                row.extend([None, None])
            expected_rows.append(row)

        self.assertCountEqual(actual_rows, expected_rows)

        self.assertCountEqual(
            db.execute("SELECT id, parent_id, child_id, ordering FROM element_path").fetchall(),
            [
                (
                    str(id),
                    str(parent_id),
                    str(child_id),
                    ordering
                )
                for id, parent_id, child_id, ordering
                in ElementPath.objects
                .filter(element__corpus=self.corpus)
                .values_list('id', 'path__last', 'element_id', 'ordering')
            ]
        )

        self.assertCountEqual(
            db.execute("SELECT id, element_id, text, confidence, worker_version_id FROM transcription").fetchall(),
            [
                (
                    str(transcription.id),
                    str(transcription.element_id),
                    transcription.text,
                    transcription.confidence,
                    str(transcription.worker_version_id) if transcription.worker_version_id else None
                )
                for transcription in Transcription.objects.filter(element__corpus=self.corpus)
            ]
        )

        self.assertCountEqual(
            db.execute("SELECT id, element_id, class_name, state, moderator, confidence, high_confidence, worker_version_id FROM classification").fetchall(),
            [
                (
                    str(classification.id),
                    str(classification.element_id),
                    classification.ml_class.name,
                    classification.state.value,
                    classification.moderator.email if classification.moderator else None,
                    classification.confidence,
                    int(classification.high_confidence),
                    str(classification.worker_version_id) if classification.worker_version_id else None
                )
                for classification in Classification.objects.filter(element__corpus=self.corpus)
            ]
        )

        self.assertCountEqual(
            db.execute("SELECT id, element_id, name, type, value, entity_id, worker_version_id FROM metadata").fetchall(),
            [
                (
                    str(metadata.id),
                    str(metadata.element_id),
                    metadata.name,
                    metadata.type.value,
                    metadata.value,
                    str(metadata.entity_id) if metadata.entity_id else None,
                    str(metadata.worker_version_id) if metadata.worker_version_id else None
                )
                for metadata in MetaData.objects.filter(element__corpus=self.corpus)
            ]
        )

        self.assertCountEqual(
            db.execute("SELECT id, name, type, validated, moderator, metas, worker_version_id FROM entity").fetchall(),
            [
                (
                    str(entity.id),
                    entity.name,
                    entity.type.value,
                    int(entity.validated),
                    entity.moderator.email if entity.moderator else None,
                    json.dumps(entity.metas) if entity.metas else None,
                    str(entity.worker_version_id) if entity.worker_version_id else None,
                )
                for entity in self.corpus.entities.all()
            ]
        )

        self.assertCountEqual(
            db.execute("SELECT id, parent_name, child_name, parent_type, child_type FROM entity_role").fetchall(),
            [
                (
                    str(role.id),
                    role.parent_name,
                    role.child_name,
                    role.parent_type.value,
                    role.child_type.value,
                )
                for role in self.corpus.roles.all()
            ]
        )

        self.assertCountEqual(
            db.execute("SELECT id, parent_id, child_id, role_id FROM entity_link").fetchall(),
            [
                (
                    str(link.id),
                    str(link.parent_id),
                    str(link.child_id),
                    str(link.role_id)
                )
                for link in EntityLink.objects.filter(role__corpus=self.corpus)
            ]
        )

        self.assertCountEqual(
            db.execute("SELECT id, transcription_id, entity_id, offset, length, worker_version_id FROM transcription_entity").fetchall(),
            [
                (
                    str(transcription_entity.id),
                    str(transcription_entity.transcription_id),
                    str(transcription_entity.entity_id),
                    transcription_entity.offset,
                    transcription_entity.length,
                    str(transcription_entity.worker_version_id) if transcription_entity.worker_version_id else None
                )
                for transcription_entity in TranscriptionEntity.objects.filter(entity__corpus=self.corpus)
            ]
        )

        os.unlink(db_path)

@@ -7,7 +7,7 @@ from uuid import UUID
from django.conf import settings
from arkindex.dataimport.models import DataImport, WorkerVersion
-from arkindex.documents import tasks
+from arkindex.documents import export, tasks
from arkindex.documents.managers import ElementQuerySet
from arkindex.documents.models import Corpus, Element, Entity
@@ -142,3 +142,14 @@ def initialize_activity(process: DataImport):
    Initialize activity on every element of the process, for worker versions that are part of its workflow
    """
    tasks.initialize_activity.delay(process)

def export_corpus(corpus: Corpus, user_id: Optional[int] = None) -> None:
    """
    Export a corpus to a SQLite database
    """
    export.export_corpus.delay(
        corpus_id=corpus.id,
        user_id=user_id,
        description=f'Export of corpus {corpus.name}'
    )