Skip to content
Snippets Groups Projects
Commit 32611a0a authored by ml bonhomme's avatar ml bonhomme :bee: Committed by Erwan Rouchet
Browse files

Add CorpusExport source support

parent bc0c2a69
No related branches found
No related tags found
1 merge request!2252Add CorpusExport source support
......@@ -2,7 +2,9 @@ from datetime import timedelta
from textwrap import dedent
from django.conf import settings
from django.shortcuts import get_object_or_404
from django.utils import timezone
from django.utils.functional import cached_property
from drf_spectacular.utils import extend_schema, extend_schema_view
from rest_framework import permissions, serializers, status
from rest_framework.exceptions import PermissionDenied, ValidationError
......@@ -11,9 +13,7 @@ from rest_framework.response import Response
from arkindex.documents.models import Corpus, CorpusExport, CorpusExportState
from arkindex.documents.serializers.export import CorpusExportSerializer
from arkindex.project.mixins import CorpusACLMixin
from arkindex.project.permissions import IsVerified
from arkindex.users.models import Role
@extend_schema(tags=["exports"])
......@@ -27,47 +27,42 @@ from arkindex.users.models import Role
),
post=extend_schema(
operation_id="StartExport",
request=None,
description=dedent(
f"""
Start a corpus export job.
A user must wait for {settings.EXPORT_TTL_SECONDS} seconds after the last successful import
before being able to generate a new export of the same corpus.
before being able to generate a new export of the same corpus from the same source.
Contributor access is required.
"""
),
)
)
class CorpusExportAPIView(CorpusACLMixin, ListCreateAPIView):
class CorpusExportAPIView(ListCreateAPIView):
permission_classes = (IsVerified, )
serializer_class = CorpusExportSerializer
queryset = CorpusExport.objects.none()
@cached_property
def corpus(self):
qs = Corpus.objects.readable(self.request.user)
corpus = get_object_or_404(qs, pk=self.kwargs["pk"])
if self.request.method not in permissions.SAFE_METHODS and not corpus.is_writable(self.request.user):
raise PermissionDenied(detail="You do not have write access to this corpus.")
return corpus
def get_queryset(self):
return CorpusExport \
.objects \
.filter(corpus=self.get_corpus(self.kwargs["pk"])) \
.filter(corpus=self.corpus) \
.select_related("user") \
.order_by("-created")
def post(self, *args, **kwargs):
corpus = self.get_corpus(self.kwargs["pk"], role=Role.Contributor)
if corpus.exports.filter(state__in=(CorpusExportState.Created, CorpusExportState.Running)).exists():
raise ValidationError("An export is already running for this corpus.")
available_exports = corpus.exports.filter(
state=CorpusExportState.Done,
created__gte=timezone.now() - timedelta(seconds=settings.EXPORT_TTL_SECONDS)
)
if available_exports.exists():
raise ValidationError(f"An export has already been made for this corpus in the last {settings.EXPORT_TTL_SECONDS} seconds.")
export = corpus.exports.create(user=self.request.user)
export.start()
return Response(CorpusExportSerializer(export).data, status=status.HTTP_201_CREATED)
def get_serializer_context(self):
context = super().get_serializer_context()
context["corpus"] = self.corpus
return context
@extend_schema(
......
......@@ -46,12 +46,12 @@ EXPORT_QUERIES = [
]
def run_pg_query(query):
def run_pg_query(query, source_db):
"""
Run a single Postgresql query and split the results into chunks.
When a name is given to a cursor, psycopg2 uses a server-side cursor; we just use a random string as a name.
"""
with connections["default"].create_cursor(name=str(uuid.uuid4())) as pg_cursor:
with connections[source_db].create_cursor(name=str(uuid.uuid4())) as pg_cursor:
pg_cursor.itersize = BATCH_SIZE
pg_cursor.execute(query)
......@@ -122,7 +122,11 @@ def export_corpus(corpus_export: CorpusExport) -> None:
corpus_export.state = CorpusExportState.Running
corpus_export.save()
logger.info(f"Exporting corpus {corpus_export.corpus_id} into {db_path}")
export_source = f"{corpus_export.corpus_id}"
if corpus_export.source != "default":
export_source += f" from source {corpus_export.source}"
logger.info(f"Exporting corpus {export_source} into {db_path}")
db = sqlite3.connect(db_path)
cursor = db.cursor()
......@@ -135,7 +139,7 @@ def export_corpus(corpus_export: CorpusExport) -> None:
if rq_job:
rq_job.set_progress(i / (len(EXPORT_QUERIES) + 1))
for chunk in run_pg_query(query.format(corpus_id=corpus_export.corpus_id)):
for chunk in run_pg_query(query.format(corpus_id=corpus_export.corpus_id), corpus_export.source):
save_sqlite(chunk, name, cursor)
db.commit()
......
# Generated by Django 4.1.7 on 2024-02-28 15:56
from django.db import migrations, models
from arkindex.project import settings
class Migration(migrations.Migration):
dependencies = [
("documents", "0008_alter_elementtype_color_alter_entitytype_color"),
]
operations = [
migrations.AddField(
model_name="corpusexport",
name="source",
field=models.CharField(choices=[(source, source) for source in settings.EXPORT_SOURCES], default="default", max_length=50),
),
]
......@@ -73,6 +73,18 @@ class Corpus(IndexableModel):
for values in DEFAULT_CORPUS_TYPES
)
def is_writable(self, user) -> bool:
"""
Whether a user has write access to this corpus
"""
if user.is_anonymous or getattr(user, "is_agent", False):
return False
if user.is_admin:
return True
from arkindex.users.utils import get_max_level
level = get_max_level(user, self)
return level is not None and level >= Role.Contributor.value
class ElementType(models.Model):
id = models.UUIDField(default=uuid.uuid4, primary_key=True, editable=False)
......@@ -1185,6 +1197,7 @@ class CorpusExport(S3FileMixin, IndexableModel):
corpus = models.ForeignKey(Corpus, related_name="exports", on_delete=models.CASCADE)
user = models.ForeignKey(settings.AUTH_USER_MODEL, related_name="exports", on_delete=models.CASCADE)
state = EnumField(CorpusExportState, max_length=10, default=CorpusExportState.Created)
source = models.CharField(max_length=50, default="default", choices=[(source, source) for source in settings.EXPORT_SOURCES])
s3_bucket = settings.AWS_EXPORT_BUCKET
......
from datetime import timedelta
from django.conf import settings
from django.utils import timezone
from rest_framework import serializers
from rest_framework.exceptions import ValidationError
from arkindex.documents.models import CorpusExport, CorpusExportState
from arkindex.project.serializer_fields import EnumField
......@@ -6,9 +11,38 @@ from arkindex.users.serializers import SimpleUserSerializer
class CorpusExportSerializer(serializers.ModelSerializer):
user = SimpleUserSerializer()
state = EnumField(CorpusExportState)
user = SimpleUserSerializer(read_only=True)
state = EnumField(CorpusExportState, read_only=True)
class Meta:
model = CorpusExport
fields = ("id", "created", "updated", "corpus_id", "user", "state")
fields = ("id", "created", "updated", "corpus_id", "user", "state", "source",)
def validate(self, data):
corpus = self.context["corpus"]
source = data.get("source", "default")
# Check that there is no export already running for this corpus
if corpus.exports.filter(state__in=(CorpusExportState.Created, CorpusExportState.Running)).exists():
raise ValidationError("An export is already running for this corpus.")
# Check that there is no available completed export from the same source created less than {EXPORT_TTL_SECONDS}
# ago for this corpus
available_exports = corpus.exports.filter(
state=CorpusExportState.Done,
source=source,
created__gte=timezone.now() - timedelta(seconds=settings.EXPORT_TTL_SECONDS)
)
if available_exports.exists():
raise ValidationError(f"An export has already been made for this corpus in the last {settings.EXPORT_TTL_SECONDS} seconds.")
data["corpus"] = corpus
data["source"] = source
return data
def create(self, validated_data):
export = CorpusExport.objects.create(
user=self.context["request"].user,
corpus=validated_data["corpus"],
source=validated_data["source"]
)
export.start()
return export
......@@ -31,6 +31,7 @@ class TestExport(FixtureAPITestCase):
},
"corpus_id": str(self.corpus.id),
"state": CorpusExportState.Created.value,
"source": "default"
})
self.assertEqual(delay_mock.call_count, 1)
......@@ -61,19 +62,62 @@ class TestExport(FixtureAPITestCase):
self.assertFalse(delay_mock.called)
@patch("arkindex.project.triggers.export.export_corpus.delay")
@patch("arkindex.project.mixins.has_access", return_value=False)
def test_start_requires_contributor(self, has_access_mock, delay_mock):
@patch("arkindex.users.utils.get_max_level", return_value=Role.Guest.value)
def test_start_requires_contributor(self, max_level_mock, delay_mock):
self.user.rights.update(level=Role.Guest.value)
self.client.force_login(self.user)
response = self.client.post(reverse("api:corpus-export", kwargs={"pk": self.corpus.id}))
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
self.assertEqual(has_access_mock.call_count, 1)
self.assertEqual(has_access_mock.call_args, call(self.user, self.corpus, Role.Contributor.value, skip_public=False))
self.assertEqual(max_level_mock.call_count, 1)
self.assertEqual(max_level_mock.call_args, call(self.user, self.corpus))
self.assertFalse(self.corpus.exports.exists())
self.assertFalse(delay_mock.called)
@patch("arkindex.project.triggers.export.export_corpus.delay")
def test_start_bad_source(self, delay_mock):
self.client.force_login(self.superuser)
response = self.client.post(reverse("api:corpus-export", kwargs={"pk": self.corpus.id}), {"source": "jouvence"})
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertDictEqual(response.json(), {"source": ['"jouvence" is not a valid choice.']})
self.assertEqual(self.corpus.exports.count(), 0)
self.assertFalse(delay_mock.called)
@patch("arkindex.documents.models.CorpusExport.source")
@patch("arkindex.project.triggers.export.export_corpus.delay")
@override_settings(EXPORT_TTL_SECONDS=420)
def test_start_with_source(self, delay_mock, source_field_mock):
source_field_mock.field.choices.return_value = [("default", "default"), ("jouvence", "jouvence")]
self.client.force_login(self.superuser)
response = self.client.post(reverse("api:corpus-export", kwargs={"pk": self.corpus.id}), {"source": "jouvence"})
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
export = self.corpus.exports.get()
self.assertDictEqual(response.json(), {
"id": str(export.id),
"created": export.created.isoformat().replace("+00:00", "Z"),
"updated": export.updated.isoformat().replace("+00:00", "Z"),
"user": {
"id": self.superuser.id,
"display_name": self.superuser.display_name,
"email": self.superuser.email,
},
"corpus_id": str(self.corpus.id),
"state": CorpusExportState.Created.value,
"source": "jouvence"
})
self.assertEqual(delay_mock.call_count, 1)
self.assertEqual(delay_mock.call_args, call(
corpus_export=export,
user_id=self.superuser.id,
description="Export of corpus Unit Tests from source jouvence"
))
@patch("arkindex.project.triggers.export.export_corpus.delay")
def test_start_running(self, delay_mock):
self.client.force_login(self.superuser)
......@@ -81,7 +125,9 @@ class TestExport(FixtureAPITestCase):
response = self.client.post(reverse("api:corpus-export", kwargs={"pk": self.corpus.id}))
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertListEqual(response.json(), ["An export is already running for this corpus."])
self.assertDictEqual(response.json(), {
"non_field_errors": ["An export is already running for this corpus."]
})
self.assertEqual(self.corpus.exports.count(), 1)
self.assertFalse(delay_mock.called)
......@@ -99,11 +145,53 @@ class TestExport(FixtureAPITestCase):
response = self.client.post(reverse("api:corpus-export", kwargs={"pk": self.corpus.id}))
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertListEqual(response.json(), ["An export has already been made for this corpus in the last 420 seconds."])
self.assertDictEqual(response.json(), {
"non_field_errors": ["An export has already been made for this corpus in the last 420 seconds."]
})
self.assertEqual(self.corpus.exports.count(), 1)
self.assertFalse(delay_mock.called)
@override_settings(EXPORT_TTL_SECONDS=420)
@patch("arkindex.project.triggers.export.export_corpus.delay")
def test_start_recent_export_different_source(self, delay_mock):
from arkindex.documents.models import CorpusExport
CorpusExport.source.field.choices = [("default", "default"), ("jouvence", "jouvence")]
self.client.force_login(self.superuser)
with patch("django.utils.timezone.now") as mock_now:
mock_now.return_value = datetime.now(timezone.utc) - timedelta(minutes=2)
self.corpus.exports.create(
user=self.user,
state=CorpusExportState.Done,
source="jouvence"
)
response = self.client.post(reverse("api:corpus-export", kwargs={"pk": self.corpus.id}))
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
export = self.corpus.exports.get(source="default")
self.assertDictEqual(response.json(), {
"id": str(export.id),
"created": export.created.isoformat().replace("+00:00", "Z"),
"updated": export.updated.isoformat().replace("+00:00", "Z"),
"user": {
"id": self.superuser.id,
"display_name": self.superuser.display_name,
"email": self.superuser.email,
},
"corpus_id": str(self.corpus.id),
"state": CorpusExportState.Created.value,
"source": "default"
})
self.assertEqual(delay_mock.call_count, 1)
self.assertEqual(delay_mock.call_args, call(
corpus_export=export,
user_id=self.superuser.id,
description="Export of corpus Unit Tests"
))
def test_list(self):
export1 = self.corpus.exports.create(user=self.user, state=CorpusExportState.Done)
export2 = self.corpus.exports.create(user=self.superuser)
......@@ -123,6 +211,7 @@ class TestExport(FixtureAPITestCase):
"email": self.superuser.email,
},
"corpus_id": str(self.corpus.id),
"source": "default"
},
{
"id": str(export1.id),
......@@ -135,6 +224,7 @@ class TestExport(FixtureAPITestCase):
"email": self.user.email,
},
"corpus_id": str(self.corpus.id),
"source": "default"
},
])
......@@ -149,18 +239,19 @@ class TestExport(FixtureAPITestCase):
response = self.client.get(reverse("api:corpus-export", kwargs={"pk": self.corpus.id}))
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
@patch("arkindex.project.mixins.has_access", return_value=False)
def test_list_requires_guest(self, has_access_mock):
@patch("arkindex.users.managers.BaseACLManager.filter_rights")
def test_list_requires_guest(self, filter_rights_mock):
self.user.rights.all().delete()
self.corpus.public = False
self.corpus.save()
filter_rights_mock.return_value = Corpus.objects.none()
self.client.force_login(self.user)
response = self.client.get(reverse("api:corpus-export", kwargs={"pk": self.corpus.id}))
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
self.assertEqual(has_access_mock.call_count, 1)
self.assertEqual(has_access_mock.call_args, call(self.user, self.corpus, Role.Guest.value, skip_public=False))
self.assertEqual(filter_rights_mock.call_count, 1)
self.assertEqual(filter_rights_mock.call_args, call(self.user, Corpus, Role.Guest.value))
@patch("arkindex.project.aws.s3.meta.client.generate_presigned_url")
def test_download_export(self, presigned_url_mock):
......
......@@ -362,6 +362,8 @@ REDIS_ZREM_CHUNK_SIZE = 10000
# How long before a corpus export can be run again after a successful one
EXPORT_TTL_SECONDS = conf["export"]["ttl"]
# Available database sources for corpus exports
EXPORT_SOURCES = ["default"]
LOGGING = {
"version": 1,
......
......@@ -189,10 +189,13 @@ def export_corpus(corpus_export: CorpusExport) -> None:
"""
Export a corpus to a SQLite database
"""
description = f"Export of corpus {corpus_export.corpus.name}"
if corpus_export.source != "default":
description += f" from source {corpus_export.source}"
export.export_corpus.delay(
corpus_export=corpus_export,
user_id=corpus_export.user_id,
description=f"Export of corpus {corpus_export.corpus.name}"
description=description
)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment