diff --git a/arkindex/process/api.py b/arkindex/process/api.py index 2737bf1781db742f48fdd1cc9c224544a4f8ce0b..dd8c33decbdd05670902c8535f2d859dfde84ed8 100644 --- a/arkindex/process/api.py +++ b/arkindex/process/api.py @@ -118,7 +118,7 @@ from arkindex.project.openapi import UUID_OR_FALSE, UUID_OR_STR from arkindex.project.pagination import CountCursorPagination from arkindex.project.permissions import IsVerified, IsVerifiedOrReadOnly from arkindex.project.tools import PercentileCont -from arkindex.project.triggers import process_delete +from arkindex.project.triggers import create_process_failures, process_delete from arkindex.training.models import DatasetSet, Model from arkindex.users.models import Role, Scope @@ -2284,14 +2284,17 @@ class CreateDockerWorkerVersion(CreateAPIView): post=extend_schema( operation_id="CreateProcessFailures", tags=["process"], - responses=ProcessSerializer, + responses={202: None}, ), ) class ProcessFailures(CreateAPIView): """ Create a new Workers process, linked to elements with a WorkerActivity in an Error state from a finished process. - Requires an **admin** access to the corpus of this process. + The creation is triggered in an asynchronous task and **HTTP 202 Accepted** is returned. + The user that performed the request will be warned by email once the new process is available. + + Requires an **admin** access to the corpus of the original process. """ permission_classes = (IsVerified, ) serializer_class = ProcessFailuresSerializer @@ -2311,8 +2314,18 @@ class ProcessFailures(CreateAPIView): if not process.corpus.is_processable(self.request.user): raise PermissionDenied(detail="You do not have an admin access to the corpus of this process.") + @cached_property + def process(self): + return super().get_object() + def get_serializer_context(self): context = super().get_serializer_context() if self.request and "pk" in self.kwargs: - context["process"] = self.get_object() + context["process"] = self.process return context + + def create(self, request, *args, **kwargs): + serializer = self.get_serializer(data=request.data) + serializer.is_valid(raise_exception=True) + create_process_failures(self.process, request.user) + return Response(status=status.HTTP_202_ACCEPTED) diff --git a/arkindex/process/serializers/imports.py b/arkindex/process/serializers/imports.py index c26ace36c13b937ab911e394933414a25b8ba20a..a8c5e04c5d39a116c68486a35da4a1bae4818c47 100644 --- a/arkindex/process/serializers/imports.py +++ b/arkindex/process/serializers/imports.py @@ -2,7 +2,6 @@ from collections import defaultdict from textwrap import dedent from django.conf import settings -from django.db import transaction from django.utils.module_loading import import_string from rest_framework import serializers from rest_framework.exceptions import PermissionDenied, ValidationError @@ -16,7 +15,6 @@ from arkindex.process.models import ( FeatureUsage, Process, ProcessMode, - WorkerActivityState, WorkerRun, WorkerVersionState, ) @@ -702,23 +700,6 @@ class ProcessFailuresSerializer(ProcessSerializer): # Override state field from ProcessLightSerializer to mark it as read-only state = EnumField(State, required=False, read_only=True) - @transaction.atomic - def create(self, validated_data): - base_process = self.context["process"] - process = Process.objects.create( - corpus=base_process.corpus, - mode=ProcessMode.Workers, - name=f"Failures from {base_process.id}", - creator=self.context["request"].user, - ) - process.elements.set( - base_process.activities - .filter(state=WorkerActivityState.Error) - .distinct("element_id") - .values_list("element_id", flat=True) - ) - return process - def validate(self, data): base_process = self.context["process"] errors = defaultdict(list) diff --git a/arkindex/process/tasks.py b/arkindex/process/tasks.py index 6e11a544beffb881b7e344cbd38520d223ed034b..f2491994aeaf1c82089eab058e287dc505d910cf 100644 --- a/arkindex/process/tasks.py +++ b/arkindex/process/tasks.py @@ -1,11 +1,23 @@ +import urllib from typing import Optional from django.conf import settings +from django.core.mail import send_mail from django.db import transaction +from django.template.loader import render_to_string +from django.urls import reverse from django_rq import job -from rq import Retry +from rq import Retry, get_current_job -from arkindex.process.models import ActivityState, Process, WorkerActivity, WorkerActivityState, WorkerVersion +from arkindex.process.models import ( + ActivityState, + Process, + ProcessMode, + WorkerActivity, + WorkerActivityState, + WorkerVersion, +) +from arkindex.users.models import User @job("default", timeout=settings.RQ_TIMEOUTS["initialize_activity"], retry=Retry(max=4)) @@ -43,3 +55,43 @@ def process_delete(process: Process, delete_activity: Optional[WorkerActivitySta if delete_activity: process.activities.filter(state=delete_activity).delete() process.delete() + + +@job("default", timeout=settings.RQ_TIMEOUTS["create_process_failures"]) +def create_process_failures(base_process: Process): + """ + Create a process containing elements from previous WorkerActivity failures. + Once it is done, warn the user by email with a link to process' configuration. + """ + rq_job = get_current_job() + assert rq_job is not None, "This task can only be run in a RQ job context." + assert rq_job.user_id is not None, "This task requires a user ID to be defined on the RQ job." + user = User.objects.get(id=rq_job.user_id) + + with transaction.atomic(): + process = Process.objects.create( + corpus=base_process.corpus, + mode=ProcessMode.Workers, + name=f"Failures from {base_process.id}", + creator=user, + ) + process.elements.set( + base_process.activities + .filter(state=WorkerActivityState.Error) + .distinct("element_id") + .values_list("element_id", flat=True) + ) + process_edit_url = urllib.parse.urljoin( + settings.PUBLIC_HOSTNAME, + reverse("frontend-configure-process", kwargs={"pk": process.id}), + ) + subject="Process created from previous failures" + if base_process.name: + subject += f" ({base_process.name})" + send_mail( + subject=subject, + message=render_to_string("process_failures.html", context={"user": user, "url": process_edit_url}), + from_email=None, + recipient_list=[user.email], + fail_silently=False, + ) diff --git a/arkindex/process/tests/test_processes.py b/arkindex/process/tests/test_processes.py index b158c2da5995037e340d2919ef7c0369cc42a287..27851cf68cc03e8bd9a29184f15e8ef1d3c1ab94 100644 --- a/arkindex/process/tests/test_processes.py +++ b/arkindex/process/tests/test_processes.py @@ -1,12 +1,14 @@ import uuid -from unittest.mock import call, patch +from unittest.mock import MagicMock, call, patch from django.conf import settings +from django.core import mail from django.db import transaction from django.test import override_settings from django.urls import reverse from django.utils import timezone from rest_framework import status +from rq.job import Job from arkindex.documents.models import Corpus, ElementType from arkindex.ponos.models import Farm, State @@ -3138,7 +3140,12 @@ class TestProcesses(FixtureAPITestCase): "The process has no element with a WorkerActivity in an Error state" ]}) - def test_process_failures(self): + @patch("arkindex.project.triggers.process_tasks.create_process_failures.delay") + @patch("arkindex.process.tasks.get_current_job") + def test_process_failures(self, current_job_mock, failures_task_mock): + from arkindex.process.tasks import create_process_failures + failures_task_mock.side_effect = lambda process, user_id, description: create_process_failures(process) + current_job_mock.return_value = MagicMock(spec=Job, user_id = self.user.id) self.client.force_login(self.user) self.workers_process.finished = self.workers_process.started = self.workers_process.updated self.workers_process.activity_state = ActivityState.Ready @@ -3159,19 +3166,16 @@ class TestProcesses(FixtureAPITestCase): with self.assertNumQueries(11): response = self.client.post(reverse("api:process-failures", kwargs={"pk": self.workers_process.id})) - self.assertEqual(response.status_code, status.HTTP_201_CREATED) - data = response.json() - new_id = data.pop("id") - self.assertDictEqual(data, { - "corpus": str(self.corpus.id), - "chunks": 1, - "mode": ProcessMode.Workers.value, - "name": f"Failures from {self.workers_process.id}", - "state": State.Unscheduled.value, - "activity_state": "disabled", - "use_cache": False - }) + self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED) + self.assertEqual(response.content, b"") + new_process = Process.objects.order_by("created").last() + self.assertEqual(new_process.name, f"Failures from {self.workers_process.id}") self.assertQuerySetEqual( - Process.objects.get(id=new_id).elements.all(), + new_process.elements.all(), [volume], ) + # Ensure the user received an email once async creation is done + self.assertEqual(len(mail.outbox), 1) + self.assertEqual(mail.outbox[0].to, [self.user.email]) + self.assertEqual(mail.outbox[0].subject, "Process created from previous failures (Process fixture)") + self.assertIn(reverse("frontend-configure-process", kwargs={"pk": new_process.id}), mail.outbox[0].body) diff --git a/arkindex/project/config.py b/arkindex/project/config.py index fde338a21c77708bbe7767fbde6bdc203baf7bc3..3b080878e65518998e01e19cb91f06f8a45dea52 100644 --- a/arkindex/project/config.py +++ b/arkindex/project/config.py @@ -152,6 +152,7 @@ def get_settings_parser(base_dir): # Task execution in RQ timeouts after 10 hours by default job_timeouts_parser.add_option("task", type=int, default=36000) job_timeouts_parser.add_option("send_verification_email", type=int, default=120) + job_timeouts_parser.add_option("create_process_failures", type=int, default=3600) csrf_parser = parser.add_subparser("csrf", default={}) csrf_parser.add_option("cookie_name", type=str, default="arkindex.csrf") diff --git a/arkindex/project/tests/config_samples/defaults.yaml b/arkindex/project/tests/config_samples/defaults.yaml index 26e0623d42862b540e8ff1d7ae36e75c84eea6c2..53561756ef60eb888d018f4293a8557bb5d0df1b 100644 --- a/arkindex/project/tests/config_samples/defaults.yaml +++ b/arkindex/project/tests/config_samples/defaults.yaml @@ -56,6 +56,7 @@ ingest: secret_access_key: null job_timeouts: corpus_delete: 7200 + create_process_failures: 3600 element_trash: 3600 export_corpus: 7200 initialize_activity: 3600 diff --git a/arkindex/project/tests/config_samples/errors.yaml b/arkindex/project/tests/config_samples/errors.yaml index c4cbfe733368892702fddd52549442ead166b43f..e0b77e72dc3b7e9d3e6276a1c98c56f5caa2aef0 100644 --- a/arkindex/project/tests/config_samples/errors.yaml +++ b/arkindex/project/tests/config_samples/errors.yaml @@ -37,6 +37,7 @@ ingest: secret_access_key: null job_timeouts: corpus_delete: lol + create_process_failures: lol element_trash: no export_corpus: [] move_element: diff --git a/arkindex/project/tests/config_samples/expected_errors.yaml b/arkindex/project/tests/config_samples/expected_errors.yaml index c070fed5dfeb2e41882e71ef717e4a05cc3f758d..f0eaf9603de0d85f02c7774f60202ad63b80a3ce 100644 --- a/arkindex/project/tests/config_samples/expected_errors.yaml +++ b/arkindex/project/tests/config_samples/expected_errors.yaml @@ -24,6 +24,7 @@ ingest: imageserver_id: cannot convert float infinity to integer job_timeouts: corpus_delete: "invalid literal for int() with base 10: 'lol'" + create_process_failures: "invalid literal for int() with base 10: 'lol'" export_corpus: "int() argument must be a string, a bytes-like object or a real number, not 'list'" move_element: "int() argument must be a string, a bytes-like object or a real number, not 'dict'" reindex_corpus: "int() argument must be a string, a bytes-like object or a real number, not 'dict'" diff --git a/arkindex/project/tests/config_samples/override.yaml b/arkindex/project/tests/config_samples/override.yaml index 189ad21095ba8c7fbc0bb4b1acedf4b8142eb973..195040ab00df2ff890316b5f7b00fc393d9d3868 100644 --- a/arkindex/project/tests/config_samples/override.yaml +++ b/arkindex/project/tests/config_samples/override.yaml @@ -70,16 +70,17 @@ ingest: secret_access_key: hunter2 job_timeouts: corpus_delete: 1 - element_trash: 2 - export_corpus: 3 - initialize_activity: 4 - move_element: 5 - notify_process_completion: 6 - process_delete: 7 - reindex_corpus: 8 - send_verification_email: 9 - task: 10 - worker_results_delete: 11 + create_process_failures: 2 + element_trash: 3 + export_corpus: 4 + initialize_activity: 5 + move_element: 6 + notify_process_completion: 7 + process_delete: 8 + reindex_corpus: 9 + send_verification_email: 10 + task: 11 + worker_results_delete: 12 jwt_signing_key: deadbeef local_imageserver_id: 45 metrics_port: 4242 diff --git a/arkindex/project/triggers.py b/arkindex/project/triggers.py index 9db62d1b7ac2bdbeb8413cf6588ad2c99e3d938a..94bbaec25f7cb6510b170a81b63f5e577244804b 100644 --- a/arkindex/project/triggers.py +++ b/arkindex/project/triggers.py @@ -267,3 +267,13 @@ def send_verification_email(user: User): """Send validation email to an user""" assert user.verified_email is False, "Only non verified users can receive a verification email" user_tasks.send_verification_email.delay(user) + + +def create_process_failures(process: Process, user: User): + """Create a process from WorkerActivity failures""" + process_name = process.name or str(process.id) + process_tasks.create_process_failures.delay( + process, + user_id=user.id, + description=f"Creating process from failed elements of process {process_name}" + ) diff --git a/arkindex/project/urls.py b/arkindex/project/urls.py index 72e194d0f6ef199c24dcccf6ecdd447b2ed20e96..03789af80141f86d8514d549c553d27ab7a928b3 100644 --- a/arkindex/project/urls.py +++ b/arkindex/project/urls.py @@ -23,6 +23,7 @@ urlpatterns = [ path("process/<uuid:pk>/<int:run>", frontend_view.as_view(), name="frontend-process-details"), # Link to the corpus management page, shown in the Django admin path("corpus/<uuid:pk>", frontend_view.as_view(), name="frontend-corpus-details"), + path("process/<uuid:pk>/configure", frontend_view.as_view(), name="frontend-configure-process"), ] if settings.ROBOTS_TXT_DISALLOW: diff --git a/arkindex/templates/process_failures.html b/arkindex/templates/process_failures.html new file mode 100644 index 0000000000000000000000000000000000000000..36b1d31a1c1aeb45f2ae2aee1ba7006d9e9a0e5b --- /dev/null +++ b/arkindex/templates/process_failures.html @@ -0,0 +1,11 @@ +{% autoescape off %} +Hello {{ user.display_name }}, + +The process you created from previous worker activity failures is ready to be configured. + +You can access it directly from this link: +{{ url }} + +-- +Arkindex +{% endautoescape %}