Skip to content
Snippets Groups Projects
Commit 97f85431 authored by Bastien Abadie's avatar Bastien Abadie
Browse files

Do not use CSV

parent e90e97b4
No related branches found
No related tags found
No related merge requests found
import csv
import logging
import sqlite3
import tempfile
from io import StringIO
from itertools import islice
import uuid
from pathlib import Path
from django.db import connections
......@@ -12,7 +11,9 @@ from rq import get_current_job
BASE_DIR = Path(__file__).absolute().parent
# Maximum number of rows that will be stored in memory at once before being sent to SQLite.
CSV_BATCH_SIZE = 10000
BATCH_SIZE = 10000
logger = logging.getLogger(__name__)
# Map SQLite table names to PostgreSQL query files
EXPORT_QUERIES = [
......@@ -30,32 +31,39 @@ EXPORT_QUERIES = [
]
def pg_to_csv(query):
output = StringIO()
def run_pg_query(query):
"""
Run a single Postgresql query and split the results
into chunks
"""
with connections['default'].cursor() as pg_cursor:
pg_cursor.copy_expert(f"COPY ({query}) TO STDOUT WITH (FORMAT CSV, HEADER OFF, NULL '__null__')", output)
return output
pg_cursor.execute(query)
while True:
out = pg_cursor.fetchmany(BATCH_SIZE)
if not out:
return
yield out
def csv_to_sqlite(csv_file, table, cursor):
csv_file.seek(0)
reader = csv.reader(csv_file)
while True:
rows = list(islice(reader, CSV_BATCH_SIZE))
if not len(rows):
return
def save_sqlite(rows, table, cursor):
"""
Write a chunk of rows into an SQLite table
"""
# Replace null strings with None
for row in rows:
for i in range(len(row)):
if row[i] == "__null__":
row[i] = None
# Serialize UUID and lists as strings
rows = [
[
str(value) if isinstance(value, (uuid.UUID, list)) else value
for value in row
]
for row in rows
]
# Build the parameterized query by counting the columns in a CSV row and repeating '?' parameters
insert_args = ",".join("?" for _ in range(len(rows[0])))
query = f"INSERT INTO {table} VALUES ({insert_args})"
cursor.executemany(query, rows)
# Build the parameterized query by counting the columns in a CSV row and repeating '?' parameters
insert_args = ",".join("?" for _ in range(len(rows[0])))
query = f"INSERT INTO {table} VALUES ({insert_args})"
cursor.executemany(query, rows)
@job
......@@ -63,21 +71,22 @@ def export_corpus(corpus_id: str) -> None:
rq_job = get_current_job()
_, db_path = tempfile.mkstemp(suffix='db')
logger.info(f"Exporting corpus {corpus_id} into {db_path}")
db = sqlite3.connect(db_path)
cursor = db.cursor()
# Initialize all the tables
cursor.executescript((BASE_DIR / 'structure.sql').read_text())
for i, name in enumerate(EXPORT_QUERIES):
query = (BASE_DIR / f'{name}.sql').read_text()
logger.info(f"Running query {i+1}/{len(EXPORT_QUERIES)} {name}")
if rq_job:
rq_job.set_progress(i / len(EXPORT_QUERIES))
csv_file = pg_to_csv(query.format(corpus_id=corpus_id))
for chunk in run_pg_query(query.format(corpus_id=corpus_id)):
save_sqlite(chunk, name, cursor)
if rq_job:
rq_job.set_progress((i + 0.5) / len(EXPORT_QUERIES))
csv_to_sqlite(csv_file, name, cursor)
db.commit()
db.close()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment