Skip to content
Snippets Groups Projects
Commit b953b0b3 authored by Erwan Rouchet's avatar Erwan Rouchet Committed by Bastien Abadie
Browse files

Upload exports to S3

parent 469fd66f
No related branches found
No related tags found
1 merge request!1372Upload exports to S3
import json
import logging
import os
import sqlite3
import tempfile
import uuid
......@@ -76,14 +77,14 @@ def save_sqlite(rows, table, cursor):
cursor.executemany(query, rows)
@job
@job('high')
def export_corpus(corpus_export: CorpusExport) -> None:
_, db_path = tempfile.mkstemp(suffix='db')
try:
rq_job = get_current_job()
corpus_export.state = CorpusExportState.Running
corpus_export.save()
_, db_path = tempfile.mkstemp(suffix='db')
logger.info(f"Exporting corpus {corpus_export.corpus_id} into {db_path}")
db = sqlite3.connect(db_path)
cursor = db.cursor()
......@@ -104,10 +105,27 @@ def export_corpus(corpus_export: CorpusExport) -> None:
db.close()
# Give a nice feedback on the upload portion of the export using boto3's upload callback
# This can be useful for large exports, when the database size is measured in gigabytes
size = os.stat(db_path).st_size
uploaded = 0
def progress_callback(bytes_transferred):
nonlocal uploaded
uploaded += bytes_transferred
if rq_job:
rq_job.set_progress(uploaded / size)
if rq_job:
rq_job.set_description(f'Uploading export for corpus {corpus_export.corpus.name}')
rq_job.set_progress(0.0)
corpus_export.s3_object.upload_file(db_path, Callback=progress_callback)
corpus_export.state = CorpusExportState.Done
corpus_export.save()
return db_path
except Exception:
corpus_export.state = CorpusExportState.Failed
corpus_export.save()
raise
finally:
os.unlink(db_path)
import json
import os
import sqlite3
from unittest.mock import call, patch
from arkindex.dataimport.models import WorkerVersion
from arkindex.documents.export import export_corpus
......@@ -20,7 +21,9 @@ from arkindex.project.tests import FixtureTestCase
class TestExport(FixtureTestCase):
def test_export(self):
@patch('arkindex.documents.export.os.unlink')
@patch('arkindex.project.aws.s3.Object')
def test_export(self, s3_object_mock, unlink_mock):
element = self.corpus.elements.get(name='Volume 1')
transcription = Transcription.objects.first()
version = WorkerVersion.objects.get(worker__slug='reco')
......@@ -58,11 +61,25 @@ class TestExport(FixtureTestCase):
export = self.corpus.exports.create(user=self.user)
db_path = export_corpus(export)
export_corpus(export)
export.refresh_from_db()
self.assertEqual(export.state, CorpusExportState.Done)
self.assertEqual(s3_object_mock().upload_file.call_count, 1)
args, kwargs = s3_object_mock().upload_file.call_args
self.assertCountEqual(kwargs.keys(), {'Callback'})
self.assertTrue(callable(kwargs['Callback']))
self.assertEqual(len(args), 1)
# Retrieve the database path from the S3 upload argument
db_path = args[0]
# We patched the file's removal to be able to check the DB after the job completes.
# In some cases, unlink can be called multiple times due to tempfile's implementation of gettempdir,
# which writes to a file then unlinks it.
# We just ignore the call count, and assume the last call is ours.
self.assertEqual(unlink_mock.call_args, call(db_path))
db = sqlite3.connect(db_path)
self.assertCountEqual(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment