From 9eba7b3a37cdea46edd03b7662f88c5bdd0e1499 Mon Sep 17 00:00:00 2001 From: Erwan Rouchet <rouchet@teklia.com> Date: Thu, 5 Dec 2024 16:09:45 +0100 Subject: [PATCH] Ignore restarted tasks when computing process states --- arkindex/process/api.py | 13 +++++++----- arkindex/process/models.py | 20 ++++++++++++++++--- arkindex/process/tests/process/test_list.py | 22 +++++++++++++++++++-- 3 files changed, 45 insertions(+), 10 deletions(-) diff --git a/arkindex/process/api.py b/arkindex/process/api.py index d5666f879b..8348ff9306 100644 --- a/arkindex/process/api.py +++ b/arkindex/process/api.py @@ -262,14 +262,17 @@ class ProcessList(ProcessACLMixin, ListAPIView): except ValueError: raise ValidationError({"state": [f"State '{state_value}' does not exist"]}) + last_run_tasks_filter = ( + Q(tasks__run=F("last_run")) + & ~Q(tasks__id__in=Task.objects.filter(process_id=OuterRef("id")).values("original_task_id")) + ) + # Filter out processes which have a task with an incompatible state on their last run excluding_states = STATES_ORDERING[:STATES_ORDERING.index(state)] - excluded_processes = qs.filter( - Q(tasks__run=F("last_run")), - Q(tasks__state__in=excluding_states) - ) + excluded_processes = qs.filter(last_run_tasks_filter & Q(tasks__state__in=excluding_states)) + # Keep non excluded processes matching the state on their last run tasks - state_query = Q(tasks__run=F("last_run")) & Q(tasks__state=state) + state_query = last_run_tasks_filter & Q(tasks__state=state) if state == State.Unscheduled: # Handle the absence of tasks as unscheduled state_query |= Q(tasks__isnull=True) diff --git a/arkindex/process/models.py b/arkindex/process/models.py index f6fd0dd8d9..b618b6c530 100644 --- a/arkindex/process/models.py +++ b/arkindex/process/models.py @@ -6,7 +6,7 @@ from django.contrib.contenttypes.fields import GenericRelation from django.core.exceptions import ValidationError from django.core.validators import MinLengthValidator, MinValueValidator from django.db import models, transaction -from django.db.models import F, Q +from django.db.models import Exists, F, OuterRef, Q from django.urls import reverse from django.utils import timezone from django.utils.functional import cached_property @@ -276,10 +276,24 @@ class Process(IndexableModel): # This prevents performing another SQL request when tasks have already been prefetched. # See https://stackoverflow.com/a/19651840/5990435 if self.has_prefetched_tasks: - task_states = set(t.state for t in self.tasks.all() if t.run == run) + restarted_tasks = set(t.original_task_id for t in self.tasks.all()) + task_states = set( + t.state + for t in self.tasks.all() + if t.run == run + and t.id not in restarted_tasks + ) else: task_states = set( - self.tasks.filter(run=run).values_list("state", flat=True) + self.tasks + .filter(run=run) + # Skip tasks that have been restarted + .exclude(Exists(Task.objects.filter( + process_id=OuterRef("process_id"), + run=OuterRef("run"), + original_task_id=OuterRef("pk"), + ))) + .values_list("state", flat=True) ) # This run has no tasks diff --git a/arkindex/process/tests/process/test_list.py b/arkindex/process/tests/process/test_list.py index a230eccc95..3becb856e9 100644 --- a/arkindex/process/tests/process/test_list.py +++ b/arkindex/process/tests/process/test_list.py @@ -227,14 +227,32 @@ class TestProcessList(FixtureAPITestCase): self.assertEqual(completed_process.state, State.Completed) error_process = Process.objects.create(mode=ProcessMode.Workers, creator=self.user, corpus=self.corpus) - error_process.tasks.create(depth=0, run=0, slug="error_task", state=State.Error, ttl=0) + # `Failed` has a higher priority than `Error`, so it would normally be returned as the process state, + # but it should be ignored on a task that has been restarted in the same run + restarted_task = error_process.tasks.create(depth=0, run=0, slug="restarted_task", state=State.Failed, ttl=0) + error_process.tasks.create( + depth=0, + run=0, + slug="error_task", + state=State.Error, + ttl=0, + original_task=restarted_task, + ) error_process.tasks.create(depth=0, run=0, slug="completed_task", state=State.Completed, ttl=0) self.assertEqual(error_process.state, State.Error) stopped_process = Process.objects.create(mode=ProcessMode.Workers, creator=self.user, corpus=self.corpus) stopped_process.tasks.create(depth=0, run=0, slug="completed_task", state=State.Completed, ttl=0) stopped_process.tasks.create(depth=0, run=1, slug="completed_task", state=State.Completed, ttl=0) - stopped_process.tasks.create(depth=0, run=2, slug="stopped_task", state=State.Stopped, ttl=0) + restarted_task = stopped_process.tasks.create(depth=0, run=1, slug="restarted_task", state=State.Error, ttl=0) + stopped_process.tasks.create( + depth=0, + run=2, + slug="stopped_task", + state=State.Stopped, + ttl=0, + original_task=restarted_task, + ) self.assertEqual(stopped_process.state, State.Stopped) self.client.force_login(self.user) -- GitLab