Add SQLite export task

Merged: Erwan Rouchet requested to merge sqlite-export into master
17 files changed: +633, −1 (this file: +102, −0)
import json
import logging
import sqlite3
import tempfile
import uuid
from pathlib import Path

from django.db import connections
from django_rq import job
from rq import get_current_job

BASE_DIR = Path(__file__).absolute().parent

# Maximum number of rows that will be held in memory at once before being written to SQLite.
BATCH_SIZE = 10000

logger = logging.getLogger(__name__)

# Each name is both an SQLite table name and the stem of a PostgreSQL query file (<name>.sql).
EXPORT_QUERIES = [
    'image',
    'worker_version',
    'element',
    'element_path',
    'transcription',
    'classification',
    'entity',
    'transcription_entity',
    'entity_role',
    'entity_link',
    'metadata',
]


def run_pg_query(query):
    """
    Run a single PostgreSQL query and split the results into chunks.
    When a name is given to a cursor, psycopg2 uses a server-side cursor;
    we just use a random string as a name.
    """
    with connections['default'].create_cursor(name=str(uuid.uuid4())) as pg_cursor:
        pg_cursor.itersize = BATCH_SIZE
        pg_cursor.execute(query)
        while True:
            out = pg_cursor.fetchmany(BATCH_SIZE)
            if not out:
                return
            yield out
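
# A minimal usage sketch (hypothetical query, for illustration only): each
# yielded chunk is a list of at most BATCH_SIZE row tuples, so callers can
# stream arbitrarily large result sets without holding them all in memory:
#
#   for chunk in run_pg_query("SELECT id FROM some_table"):
#       handle(chunk)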

def save_sqlite(rows, table, cursor):
    """
    Write a chunk of rows into an SQLite table.
    """
    def _serialize(value):
        # Serialize UUIDs as strings
        if isinstance(value, uuid.UUID):
            return str(value)
        # Serialize lists and dicts as JSON
        if isinstance(value, (list, dict)):
            return json.dumps(value)
        return value

    rows = [
        list(map(_serialize, row))
        for row in rows
    ]

    # Build the parameterized query by counting the columns in the first row
    # and repeating '?' placeholders
    insert_args = ",".join("?" for _ in range(len(rows[0])))
    query = f"INSERT INTO {table} VALUES ({insert_args})"
    cursor.executemany(query, rows)
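
# A hypothetical standalone example (illustrative table and values only),
# assuming a matching table has already been created; the UUID is stored as
# text and the dict as a JSON string, since SQLite has no native types for them:
#
#   conn = sqlite3.connect(':memory:')
#   cur = conn.cursor()
#   cur.execute("CREATE TABLE metadata (id TEXT, value TEXT)")
#   save_sqlite([(uuid.uuid4(), {'lang': 'fr'})], 'metadata', cur)
#   conn.commit()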

@job
def export_corpus(corpus_id: str) -> str:
    rq_job = get_current_job()
    _, db_path = tempfile.mkstemp(suffix='db')
    logger.info(f"Exporting corpus {corpus_id} into {db_path}")
    db = sqlite3.connect(db_path)
    cursor = db.cursor()

    # Initialize all the tables
    cursor.executescript((BASE_DIR / 'structure.sql').read_text())

    for i, name in enumerate(EXPORT_QUERIES):
        query = (BASE_DIR / f'{name}.sql').read_text()
        logger.info(f"Running query {i+1}/{len(EXPORT_QUERIES)} {name}")
        # Report progress on the RQ job when running inside a worker
        if rq_job:
            rq_job.set_progress(i / len(EXPORT_QUERIES))
        # Each query file contains a {corpus_id} placeholder, filled in here
        for chunk in run_pg_query(query.format(corpus_id=corpus_id)):
            save_sqlite(chunk, name, cursor)

    db.commit()
    db.close()
    return db_path
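
For reference, a minimal usage sketch, assuming django_rq's default queue configuration; the module path and corpus UUID below are hypothetical, and delay is the enqueue helper added by the @job decorator:

    from arkindex.documents.export import export_corpus  # hypothetical module path

    # Enqueue the export on an RQ worker; returns an rq Job handle
    export_corpus.delay('aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee')

    # Or call it synchronously for debugging; get_current_job() returns None
    # outside a worker, so progress reporting is simply skipped
    db_path = export_corpus('aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee')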