Skip to content
Snippets Groups Projects
Commit 6019805a authored by Bastien Abadie's avatar Bastien Abadie
Browse files

Merge branch 'upload-export' into 'master'

Upload exports to S3

Closes #765

See merge request !1372
parents 469fd66f b953b0b3
No related branches found
No related tags found
1 merge request: !1372 — Upload exports to S3
import json
import logging
import os
import sqlite3
import tempfile
import uuid
......@@ -76,14 +77,14 @@ def save_sqlite(rows, table, cursor):
cursor.executemany(query, rows)
@job
@job('high')
def export_corpus(corpus_export: CorpusExport) -> None:
_, db_path = tempfile.mkstemp(suffix='db')
try:
rq_job = get_current_job()
corpus_export.state = CorpusExportState.Running
corpus_export.save()
_, db_path = tempfile.mkstemp(suffix='db')
logger.info(f"Exporting corpus {corpus_export.corpus_id} into {db_path}")
db = sqlite3.connect(db_path)
cursor = db.cursor()
......@@ -104,10 +105,27 @@ def export_corpus(corpus_export: CorpusExport) -> None:
db.close()
# Give a nice feedback on the upload portion of the export using boto3's upload callback
# This can be useful for large exports, when the database size is measured in gigabytes
size = os.stat(db_path).st_size
uploaded = 0
def progress_callback(bytes_transferred):
nonlocal uploaded
uploaded += bytes_transferred
if rq_job:
rq_job.set_progress(uploaded / size)
if rq_job:
rq_job.set_description(f'Uploading export for corpus {corpus_export.corpus.name}')
rq_job.set_progress(0.0)
corpus_export.s3_object.upload_file(db_path, Callback=progress_callback)
corpus_export.state = CorpusExportState.Done
corpus_export.save()
return db_path
except Exception:
corpus_export.state = CorpusExportState.Failed
corpus_export.save()
raise
finally:
os.unlink(db_path)
import json
import os
import sqlite3
from unittest.mock import call, patch
from arkindex.dataimport.models import WorkerVersion
from arkindex.documents.export import export_corpus
......@@ -20,7 +21,9 @@ from arkindex.project.tests import FixtureTestCase
class TestExport(FixtureTestCase):
def test_export(self):
@patch('arkindex.documents.export.os.unlink')
@patch('arkindex.project.aws.s3.Object')
def test_export(self, s3_object_mock, unlink_mock):
element = self.corpus.elements.get(name='Volume 1')
transcription = Transcription.objects.first()
version = WorkerVersion.objects.get(worker__slug='reco')
......@@ -58,11 +61,25 @@ class TestExport(FixtureTestCase):
export = self.corpus.exports.create(user=self.user)
db_path = export_corpus(export)
export_corpus(export)
export.refresh_from_db()
self.assertEqual(export.state, CorpusExportState.Done)
self.assertEqual(s3_object_mock().upload_file.call_count, 1)
args, kwargs = s3_object_mock().upload_file.call_args
self.assertCountEqual(kwargs.keys(), {'Callback'})
self.assertTrue(callable(kwargs['Callback']))
self.assertEqual(len(args), 1)
# Retrieve the database path from the S3 upload argument
db_path = args[0]
# We patched the file's removal to be able to check the DB after the job completes.
# In some cases, unlink can be called multiple times due to tempfile's implementation of gettempdir,
# which writes to a file then unlinks it.
# We just ignore the call count, and assume the last call is ours.
self.assertEqual(unlink_mock.call_args, call(db_path))
db = sqlite3.connect(db_path)
self.assertCountEqual(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment