Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • arkindex/backend
1 result
Show changes
Commits on Source (2)
......@@ -85,6 +85,18 @@ class Corpus(IndexableModel):
level = get_max_level(user, self)
return level is not None and level >= Role.Contributor.value
def is_processable(self, user) -> bool:
"""
Whether a user can create and execute processes on this corpus
"""
if user.is_anonymous or getattr(user, "is_agent", False):
return False
if user.is_admin:
return True
from arkindex.users.utils import get_max_level
level = get_max_level(user, self)
return level is not None and level >= Role.Admin.value
class ElementType(models.Model):
id = models.UUIDField(default=uuid.uuid4, primary_key=True, editable=False)
......
......@@ -98,7 +98,7 @@ class CorpusSearchQuerySerializer(serializers.Serializer):
"""))
sources = serializers.MultipleChoiceField(
[
choices=[
("element", "element"),
("transcription", "transcription"),
("metadata", "metadata"),
......
......@@ -77,6 +77,7 @@ from arkindex.process.serializers.imports import (
ProcessDetailsSerializer,
ProcessElementLightSerializer,
ProcessElementSerializer,
ProcessFailuresSerializer,
ProcessListSerializer,
ProcessSerializer,
StartProcessSerializer,
......@@ -2287,3 +2288,41 @@ class CreateDockerWorkerVersion(CreateAPIView):
status=status.HTTP_201_CREATED,
data=WorkerVersionSerializer(serializer.instance, context={"request": self.request}).data,
)
@extend_schema_view(
post=extend_schema(
operation_id="CreateProcessFailures",
tags=["process"],
responses=ProcessSerializer,
),
)
class ProcessFailures(CreateAPIView):
"""
Create a new Workers process, linked to elements with a WorkerActivity in an Error state from a finished process.
Requires an **admin** access to the corpus of this process.
"""
permission_classes = (IsVerified, )
serializer_class = ProcessFailuresSerializer
queryset = (
Process.objects
.select_related("corpus")
.annotate(
has_failures=Exists(
WorkerActivity.objects.filter(process=OuterRef("pk"), state=WorkerActivityState.Error)
)
)
)
def check_object_permissions(self, request, process):
super().check_object_permissions(request, process)
if not process.corpus.is_processable(self.request.user):
raise PermissionDenied(detail="You do not have an admin access to the corpus of this process.")
def get_serializer_context(self):
context = super().get_serializer_context()
if self.request and "pk" in self.kwargs:
context["process"] = self.get_object()
return context
......@@ -2,6 +2,7 @@ from collections import defaultdict
from textwrap import dedent
from django.conf import settings
from django.db import transaction
from django.utils.module_loading import import_string
from rest_framework import serializers
from rest_framework.exceptions import PermissionDenied, ValidationError
......@@ -15,6 +16,7 @@ from arkindex.process.models import (
FeatureUsage,
Process,
ProcessMode,
WorkerActivityState,
WorkerRun,
WorkerVersionState,
)
......@@ -725,3 +727,49 @@ class ProcessElementSerializer(ProcessElementLightSerializer):
"mirrored"
)
read_only_fields = fields
class ProcessFailuresSerializer(ProcessSerializer):
# Override state field from ProcessLightSerializer to mark it as read-only
state = EnumField(State, required=False, read_only=True)
@transaction.atomic
def create(self, validated_data):
base_process = self.context["process"]
process = Process.objects.create(
corpus=base_process.corpus,
mode=ProcessMode.Workers,
name=f"Failures from {base_process.id}",
creator=self.context["request"].user,
)
process.elements.set(
base_process.activities
.filter(state=WorkerActivityState.Error)
.distinct("element_id")
.values_list("element_id", flat=True)
)
return process
def validate(self, data):
base_process = self.context["process"]
errors = defaultdict(list)
if base_process.mode != ProcessMode.Workers:
errors["__all__"].append("The process must be in mode Workers to perform this action")
if not base_process.finished:
errors["__all__"].append("The process must be finished to perform this action")
if not base_process.has_failures:
errors["__all__"].append("The process has no element with a WorkerActivity in an Error state")
if errors:
raise ValidationError(errors)
return data
class Meta(ProcessLightSerializer.Meta):
read_only_fields = ProcessSerializer.Meta.read_only_fields + (
"name",
"state",
"element_type",
"element_name_contains",
"ml_class_id",
"load_children",
"use_gpu",
)
......@@ -3104,3 +3104,97 @@ class TestProcesses(FixtureAPITestCase):
list(self.user.selections.values_list("element__name", flat=True)),
["Volume 1, page 1r", "Volume 1, page 2r"],
)
def test_process_failures_requires_login(self):
response = self.client.post(reverse("api:process-failures", kwargs={"pk": self.workers_process.id}))
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
def test_process_failures_requires_verified(self):
self.user.verified_email = False
self.user.save()
self.client.force_login(self.user)
response = self.client.post(reverse("api:process-failures", kwargs={"pk": self.workers_process.id}))
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
@patch("arkindex.users.utils.get_max_level", return_value=Role.Contributor.value)
def test_process_failures_requires_corpus_admin(self, has_access_mock):
self.client.force_login(self.user)
response = self.client.post(reverse("api:process-failures", kwargs={"pk": self.workers_process.id}))
self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN)
def test_process_failures_wrong_id(self):
self.client.force_login(self.user)
response = self.client.post(reverse("api:process-failures", kwargs={"pk": str(uuid.uuid4())}))
self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND)
def test_process_failures_invalid_mode(self):
self.client.force_login(self.user)
self.user_img_process.finished = self.user_img_process.started = self.user_img_process.updated
self.user_img_process.save()
with self.assertNumQueries(3):
response = self.client.post(reverse("api:process-failures", kwargs={"pk": self.user_img_process.id}))
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertEqual(response.json(), {"__all__": [
"The process must be in mode Workers to perform this action",
"The process has no element with a WorkerActivity in an Error state",
]})
def test_process_failures_invalid_state(self):
self.client.force_login(self.user)
self.workers_process.activities.create(
element=self.corpus.elements.get(name="Volume 1, page 1r"),
worker_version=self.version_gpu,
state=WorkerActivityState.Error,
)
with self.assertNumQueries(3):
response = self.client.post(reverse("api:process-failures", kwargs={"pk": self.workers_process.id}))
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertEqual(response.json(), {"__all__": ["The process must be finished to perform this action"]})
def test_process_failures_no_failures(self):
self.client.force_login(self.user)
self.workers_process.finished = self.workers_process.started = self.workers_process.updated
self.workers_process.save()
with self.assertNumQueries(3):
response = self.client.post(reverse("api:process-failures", kwargs={"pk": self.workers_process.id}))
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertEqual(response.json(), {"__all__": [
"The process has no element with a WorkerActivity in an Error state"
]})
def test_process_failures(self):
self.client.force_login(self.user)
self.workers_process.finished = self.workers_process.started = self.workers_process.updated
self.workers_process.activity_state = ActivityState.Ready
self.workers_process.save()
# Create worker activities, with 1 error on the volume element
self.workers_process.activities.create(
element=self.corpus.elements.get(name="Volume 1, page 1r"),
worker_version=self.version_gpu,
state=WorkerActivityState.Processed,
)
volume = self.corpus.elements.get(name="Volume 1")
self.workers_process.activities.create(
element=volume, worker_version=self.version_gpu, state=WorkerActivityState.Processed
)
self.workers_process.activities.create(
element=volume, worker_version=self.version_with_model, state=WorkerActivityState.Error
)
with self.assertNumQueries(11):
response = self.client.post(reverse("api:process-failures", kwargs={"pk": self.workers_process.id}))
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
data = response.json()
new_id = data.pop("id")
self.assertDictEqual(data, {
"corpus": str(self.corpus.id),
"mode": ProcessMode.Workers.value,
"name": f"Failures from {self.workers_process.id}",
"state": State.Unscheduled.value,
"activity_state": "disabled",
"use_cache": False
})
self.assertQuerysetEqual(
Process.objects.get(id=new_id).elements.all(),
[volume],
)
......@@ -75,6 +75,7 @@ from arkindex.process.api import (
ProcessDatasetSetManage,
ProcessDatasetSets,
ProcessDetails,
ProcessFailures,
ProcessList,
ProcessRetry,
ProcessWorkersActivity,
......@@ -248,6 +249,7 @@ api = [
path("process/<uuid:pk>/", ProcessDetails.as_view(), name="process-details"),
path("process/<uuid:pk>/retry/", ProcessRetry.as_view(), name="process-retry"),
path("process/<uuid:pk>/start/", StartProcess.as_view(), name="process-start"),
path("process/<uuid:pk>/process-failures/", ProcessFailures.as_view(), name="process-failures"),
path("process/files/<uuid:pk>/", DataFileList.as_view(), name="file-list"),
path("process/files/create/", DataFileCreate.as_view(), name="file-create"),
path("process/file/<uuid:pk>/", DataFileRetrieve.as_view(), name="file-retrieve"),
......
......@@ -7,7 +7,7 @@ django-cors-headers==3.14.0
django-enumfields==2.1.1
django-pgtrigger==4.7.0
django-rq==2.10.1
djangorestframework==3.12.4
djangorestframework==3.13.1
djangorestframework-simplejwt==5.2.2
docker==7.0.0
drf-spectacular==0.18.2
......