From e705b84592e37964535d2d632bbbe244e47267ff Mon Sep 17 00:00:00 2001 From: Valentin Rigal <rigal@teklia.com> Date: Wed, 10 Jul 2024 14:21:00 +0000 Subject: [PATCH] Create process from failures in an async task --- arkindex/process/api.py | 21 +++++-- arkindex/process/serializers/imports.py | 19 ------- arkindex/process/tasks.py | 56 ++++++++++++++++++- arkindex/process/tests/test_processes.py | 34 ++++++----- arkindex/project/config.py | 1 + .../tests/config_samples/defaults.yaml | 1 + .../project/tests/config_samples/errors.yaml | 1 + .../tests/config_samples/expected_errors.yaml | 1 + .../tests/config_samples/override.yaml | 21 +++---- arkindex/project/triggers.py | 10 ++++ arkindex/project/urls.py | 1 + arkindex/templates/process_failures.html | 11 ++++ 12 files changed, 127 insertions(+), 50 deletions(-) create mode 100644 arkindex/templates/process_failures.html diff --git a/arkindex/process/api.py b/arkindex/process/api.py index 2737bf1781..dd8c33decb 100644 --- a/arkindex/process/api.py +++ b/arkindex/process/api.py @@ -118,7 +118,7 @@ from arkindex.project.openapi import UUID_OR_FALSE, UUID_OR_STR from arkindex.project.pagination import CountCursorPagination from arkindex.project.permissions import IsVerified, IsVerifiedOrReadOnly from arkindex.project.tools import PercentileCont -from arkindex.project.triggers import process_delete +from arkindex.project.triggers import create_process_failures, process_delete from arkindex.training.models import DatasetSet, Model from arkindex.users.models import Role, Scope @@ -2284,14 +2284,17 @@ class CreateDockerWorkerVersion(CreateAPIView): post=extend_schema( operation_id="CreateProcessFailures", tags=["process"], - responses=ProcessSerializer, + responses={202: None}, ), ) class ProcessFailures(CreateAPIView): """ Create a new Workers process, linked to elements with a WorkerActivity in an Error state from a finished process. - Requires an **admin** access to the corpus of this process. + The creation is triggered in an asynchronous task and **HTTP 202 Accepted** is returned. + The user that performed the request will be warned by email once the new process is available. + + Requires an **admin** access to the corpus of the original process. """ permission_classes = (IsVerified, ) serializer_class = ProcessFailuresSerializer @@ -2311,8 +2314,18 @@ class ProcessFailures(CreateAPIView): if not process.corpus.is_processable(self.request.user): raise PermissionDenied(detail="You do not have an admin access to the corpus of this process.") + @cached_property + def process(self): + return super().get_object() + def get_serializer_context(self): context = super().get_serializer_context() if self.request and "pk" in self.kwargs: - context["process"] = self.get_object() + context["process"] = self.process return context + + def create(self, request, *args, **kwargs): + serializer = self.get_serializer(data=request.data) + serializer.is_valid(raise_exception=True) + create_process_failures(self.process, request.user) + return Response(status=status.HTTP_202_ACCEPTED) diff --git a/arkindex/process/serializers/imports.py b/arkindex/process/serializers/imports.py index c26ace36c1..a8c5e04c5d 100644 --- a/arkindex/process/serializers/imports.py +++ b/arkindex/process/serializers/imports.py @@ -2,7 +2,6 @@ from collections import defaultdict from textwrap import dedent from django.conf import settings -from django.db import transaction from django.utils.module_loading import import_string from rest_framework import serializers from rest_framework.exceptions import PermissionDenied, ValidationError @@ -16,7 +15,6 @@ from arkindex.process.models import ( FeatureUsage, Process, ProcessMode, - WorkerActivityState, WorkerRun, WorkerVersionState, ) @@ -702,23 +700,6 @@ class ProcessFailuresSerializer(ProcessSerializer): # Override state field from ProcessLightSerializer to mark it as read-only state = EnumField(State, required=False, read_only=True) - @transaction.atomic - def create(self, validated_data): - base_process = self.context["process"] - process = Process.objects.create( - corpus=base_process.corpus, - mode=ProcessMode.Workers, - name=f"Failures from {base_process.id}", - creator=self.context["request"].user, - ) - process.elements.set( - base_process.activities - .filter(state=WorkerActivityState.Error) - .distinct("element_id") - .values_list("element_id", flat=True) - ) - return process - def validate(self, data): base_process = self.context["process"] errors = defaultdict(list) diff --git a/arkindex/process/tasks.py b/arkindex/process/tasks.py index 6e11a544be..f2491994ae 100644 --- a/arkindex/process/tasks.py +++ b/arkindex/process/tasks.py @@ -1,11 +1,23 @@ +import urllib from typing import Optional from django.conf import settings +from django.core.mail import send_mail from django.db import transaction +from django.template.loader import render_to_string +from django.urls import reverse from django_rq import job -from rq import Retry +from rq import Retry, get_current_job -from arkindex.process.models import ActivityState, Process, WorkerActivity, WorkerActivityState, WorkerVersion +from arkindex.process.models import ( + ActivityState, + Process, + ProcessMode, + WorkerActivity, + WorkerActivityState, + WorkerVersion, +) +from arkindex.users.models import User @job("default", timeout=settings.RQ_TIMEOUTS["initialize_activity"], retry=Retry(max=4)) @@ -43,3 +55,43 @@ def process_delete(process: Process, delete_activity: Optional[WorkerActivitySta if delete_activity: process.activities.filter(state=delete_activity).delete() process.delete() + + +@job("default", timeout=settings.RQ_TIMEOUTS["create_process_failures"]) +def create_process_failures(base_process: Process): + """ + Create a process containing elements from previous WorkerActivity failures. + Once it is done, warn the user by email with a link to process' configuration. + """ + rq_job = get_current_job() + assert rq_job is not None, "This task can only be run in a RQ job context." + assert rq_job.user_id is not None, "This task requires a user ID to be defined on the RQ job." + user = User.objects.get(id=rq_job.user_id) + + with transaction.atomic(): + process = Process.objects.create( + corpus=base_process.corpus, + mode=ProcessMode.Workers, + name=f"Failures from {base_process.id}", + creator=user, + ) + process.elements.set( + base_process.activities + .filter(state=WorkerActivityState.Error) + .distinct("element_id") + .values_list("element_id", flat=True) + ) + process_edit_url = urllib.parse.urljoin( + settings.PUBLIC_HOSTNAME, + reverse("frontend-configure-process", kwargs={"pk": process.id}), + ) + subject="Process created from previous failures" + if base_process.name: + subject += f" ({base_process.name})" + send_mail( + subject=subject, + message=render_to_string("process_failures.html", context={"user": user, "url": process_edit_url}), + from_email=None, + recipient_list=[user.email], + fail_silently=False, + ) diff --git a/arkindex/process/tests/test_processes.py b/arkindex/process/tests/test_processes.py index b158c2da59..27851cf68c 100644 --- a/arkindex/process/tests/test_processes.py +++ b/arkindex/process/tests/test_processes.py @@ -1,12 +1,14 @@ import uuid -from unittest.mock import call, patch +from unittest.mock import MagicMock, call, patch from django.conf import settings +from django.core import mail from django.db import transaction from django.test import override_settings from django.urls import reverse from django.utils import timezone from rest_framework import status +from rq.job import Job from arkindex.documents.models import Corpus, ElementType from arkindex.ponos.models import Farm, State @@ -3138,7 +3140,12 @@ class TestProcesses(FixtureAPITestCase): "The process has no element with a WorkerActivity in an Error state" ]}) - def test_process_failures(self): + @patch("arkindex.project.triggers.process_tasks.create_process_failures.delay") + @patch("arkindex.process.tasks.get_current_job") + def test_process_failures(self, current_job_mock, failures_task_mock): + from arkindex.process.tasks import create_process_failures + failures_task_mock.side_effect = lambda process, user_id, description: create_process_failures(process) + current_job_mock.return_value = MagicMock(spec=Job, user_id = self.user.id) self.client.force_login(self.user) self.workers_process.finished = self.workers_process.started = self.workers_process.updated self.workers_process.activity_state = ActivityState.Ready @@ -3159,19 +3166,16 @@ class TestProcesses(FixtureAPITestCase): with self.assertNumQueries(11): response = self.client.post(reverse("api:process-failures", kwargs={"pk": self.workers_process.id})) - self.assertEqual(response.status_code, status.HTTP_201_CREATED) - data = response.json() - new_id = data.pop("id") - self.assertDictEqual(data, { - "corpus": str(self.corpus.id), - "chunks": 1, - "mode": ProcessMode.Workers.value, - "name": f"Failures from {self.workers_process.id}", - "state": State.Unscheduled.value, - "activity_state": "disabled", - "use_cache": False - }) + self.assertEqual(response.status_code, status.HTTP_202_ACCEPTED) + self.assertEqual(response.content, b"") + new_process = Process.objects.order_by("created").last() + self.assertEqual(new_process.name, f"Failures from {self.workers_process.id}") self.assertQuerySetEqual( - Process.objects.get(id=new_id).elements.all(), + new_process.elements.all(), [volume], ) + # Ensure the user received an email once async creation is done + self.assertEqual(len(mail.outbox), 1) + self.assertEqual(mail.outbox[0].to, [self.user.email]) + self.assertEqual(mail.outbox[0].subject, "Process created from previous failures (Process fixture)") + self.assertIn(reverse("frontend-configure-process", kwargs={"pk": new_process.id}), mail.outbox[0].body) diff --git a/arkindex/project/config.py b/arkindex/project/config.py index fde338a21c..3b080878e6 100644 --- a/arkindex/project/config.py +++ b/arkindex/project/config.py @@ -152,6 +152,7 @@ def get_settings_parser(base_dir): # Task execution in RQ timeouts after 10 hours by default job_timeouts_parser.add_option("task", type=int, default=36000) job_timeouts_parser.add_option("send_verification_email", type=int, default=120) + job_timeouts_parser.add_option("create_process_failures", type=int, default=3600) csrf_parser = parser.add_subparser("csrf", default={}) csrf_parser.add_option("cookie_name", type=str, default="arkindex.csrf") diff --git a/arkindex/project/tests/config_samples/defaults.yaml b/arkindex/project/tests/config_samples/defaults.yaml index 26e0623d42..53561756ef 100644 --- a/arkindex/project/tests/config_samples/defaults.yaml +++ b/arkindex/project/tests/config_samples/defaults.yaml @@ -56,6 +56,7 @@ ingest: secret_access_key: null job_timeouts: corpus_delete: 7200 + create_process_failures: 3600 element_trash: 3600 export_corpus: 7200 initialize_activity: 3600 diff --git a/arkindex/project/tests/config_samples/errors.yaml b/arkindex/project/tests/config_samples/errors.yaml index c4cbfe7333..e0b77e72dc 100644 --- a/arkindex/project/tests/config_samples/errors.yaml +++ b/arkindex/project/tests/config_samples/errors.yaml @@ -37,6 +37,7 @@ ingest: secret_access_key: null job_timeouts: corpus_delete: lol + create_process_failures: lol element_trash: no export_corpus: [] move_element: diff --git a/arkindex/project/tests/config_samples/expected_errors.yaml b/arkindex/project/tests/config_samples/expected_errors.yaml index c070fed5df..f0eaf9603d 100644 --- a/arkindex/project/tests/config_samples/expected_errors.yaml +++ b/arkindex/project/tests/config_samples/expected_errors.yaml @@ -24,6 +24,7 @@ ingest: imageserver_id: cannot convert float infinity to integer job_timeouts: corpus_delete: "invalid literal for int() with base 10: 'lol'" + create_process_failures: "invalid literal for int() with base 10: 'lol'" export_corpus: "int() argument must be a string, a bytes-like object or a real number, not 'list'" move_element: "int() argument must be a string, a bytes-like object or a real number, not 'dict'" reindex_corpus: "int() argument must be a string, a bytes-like object or a real number, not 'dict'" diff --git a/arkindex/project/tests/config_samples/override.yaml b/arkindex/project/tests/config_samples/override.yaml index 189ad21095..195040ab00 100644 --- a/arkindex/project/tests/config_samples/override.yaml +++ b/arkindex/project/tests/config_samples/override.yaml @@ -70,16 +70,17 @@ ingest: secret_access_key: hunter2 job_timeouts: corpus_delete: 1 - element_trash: 2 - export_corpus: 3 - initialize_activity: 4 - move_element: 5 - notify_process_completion: 6 - process_delete: 7 - reindex_corpus: 8 - send_verification_email: 9 - task: 10 - worker_results_delete: 11 + create_process_failures: 2 + element_trash: 3 + export_corpus: 4 + initialize_activity: 5 + move_element: 6 + notify_process_completion: 7 + process_delete: 8 + reindex_corpus: 9 + send_verification_email: 10 + task: 11 + worker_results_delete: 12 jwt_signing_key: deadbeef local_imageserver_id: 45 metrics_port: 4242 diff --git a/arkindex/project/triggers.py b/arkindex/project/triggers.py index 9db62d1b7a..94bbaec25f 100644 --- a/arkindex/project/triggers.py +++ b/arkindex/project/triggers.py @@ -267,3 +267,13 @@ def send_verification_email(user: User): """Send validation email to an user""" assert user.verified_email is False, "Only non verified users can receive a verification email" user_tasks.send_verification_email.delay(user) + + +def create_process_failures(process: Process, user: User): + """Create a process from WorkerActivity failures""" + process_name = process.name or str(process.id) + process_tasks.create_process_failures.delay( + process, + user_id=user.id, + description=f"Creating process from failed elements of process {process_name}" + ) diff --git a/arkindex/project/urls.py b/arkindex/project/urls.py index 72e194d0f6..03789af801 100644 --- a/arkindex/project/urls.py +++ b/arkindex/project/urls.py @@ -23,6 +23,7 @@ urlpatterns = [ path("process/<uuid:pk>/<int:run>", frontend_view.as_view(), name="frontend-process-details"), # Link to the corpus management page, shown in the Django admin path("corpus/<uuid:pk>", frontend_view.as_view(), name="frontend-corpus-details"), + path("process/<uuid:pk>/configure", frontend_view.as_view(), name="frontend-configure-process"), ] if settings.ROBOTS_TXT_DISALLOW: diff --git a/arkindex/templates/process_failures.html b/arkindex/templates/process_failures.html new file mode 100644 index 0000000000..36b1d31a1c --- /dev/null +++ b/arkindex/templates/process_failures.html @@ -0,0 +1,11 @@ +{% autoescape off %} +Hello {{ user.display_name }}, + +The process you created from previous worker activity failures is ready to be configured. + +You can access it directly from this link: +{{ url }} + +-- +Arkindex +{% endautoescape %} -- GitLab