diff --git a/arkindex/process/api.py b/arkindex/process/api.py
index d5666f879be62f5bc0962dad4d62f66dac8e94ac..8348ff930641d50335a665336eb1f0b95502208f 100644
--- a/arkindex/process/api.py
+++ b/arkindex/process/api.py
@@ -262,14 +262,17 @@ class ProcessList(ProcessACLMixin, ListAPIView):
             except ValueError:
                 raise ValidationError({"state": [f"State '{state_value}' does not exist"]})
 
+            last_run_tasks_filter = (
+                Q(tasks__run=F("last_run"))
+                & ~Q(tasks__id__in=Task.objects.filter(process_id=OuterRef("id")).values("original_task_id"))
+            )
+
             # Filter out processes which have a task with an incompatible state on their last run
             excluding_states = STATES_ORDERING[:STATES_ORDERING.index(state)]
-            excluded_processes = qs.filter(
-                Q(tasks__run=F("last_run")),
-                Q(tasks__state__in=excluding_states)
-            )
+            excluded_processes = qs.filter(last_run_tasks_filter & Q(tasks__state__in=excluding_states))
+
             # Keep non excluded processes matching the state on their last run tasks
-            state_query = Q(tasks__run=F("last_run")) & Q(tasks__state=state)
+            state_query = last_run_tasks_filter & Q(tasks__state=state)
             if state == State.Unscheduled:
                 # Handle the absence of tasks as unscheduled
                 state_query |= Q(tasks__isnull=True)
diff --git a/arkindex/process/models.py b/arkindex/process/models.py
index f6fd0dd8d986cb3546c5cc58418dfea67dae867c..b618b6c530dc3ee5f8435a8a31dcc6fe9b9930f9 100644
--- a/arkindex/process/models.py
+++ b/arkindex/process/models.py
@@ -6,7 +6,7 @@ from django.contrib.contenttypes.fields import GenericRelation
 from django.core.exceptions import ValidationError
 from django.core.validators import MinLengthValidator, MinValueValidator
 from django.db import models, transaction
-from django.db.models import F, Q
+from django.db.models import Exists, F, OuterRef, Q
 from django.urls import reverse
 from django.utils import timezone
 from django.utils.functional import cached_property
@@ -276,10 +276,24 @@ class Process(IndexableModel):
         # This prevents performing another SQL request when tasks have already been prefetched.
         # See https://stackoverflow.com/a/19651840/5990435
         if self.has_prefetched_tasks:
-            task_states = set(t.state for t in self.tasks.all() if t.run == run)
+            restarted_tasks = set(t.original_task_id for t in self.tasks.all())
+            task_states = set(
+                t.state
+                for t in self.tasks.all()
+                if t.run == run
+                and t.id not in restarted_tasks
+            )
         else:
             task_states = set(
-                self.tasks.filter(run=run).values_list("state", flat=True)
+                self.tasks
+                .filter(run=run)
+                # Skip tasks that have been restarted
+                .exclude(Exists(Task.objects.filter(
+                    process_id=OuterRef("process_id"),
+                    run=OuterRef("run"),
+                    original_task_id=OuterRef("pk"),
+                )))
+                .values_list("state", flat=True)
             )
 
         # This run has no tasks
diff --git a/arkindex/process/tests/process/test_list.py b/arkindex/process/tests/process/test_list.py
index a230eccc950578b5f1c74e42fef982f3100ef921..3becb856e9a8e2afaaa4c3c3a2906b794fe7f26c 100644
--- a/arkindex/process/tests/process/test_list.py
+++ b/arkindex/process/tests/process/test_list.py
@@ -227,14 +227,32 @@ class TestProcessList(FixtureAPITestCase):
         self.assertEqual(completed_process.state, State.Completed)
 
         error_process = Process.objects.create(mode=ProcessMode.Workers, creator=self.user, corpus=self.corpus)
-        error_process.tasks.create(depth=0, run=0, slug="error_task", state=State.Error, ttl=0)
+        # `Failed` has a higher priority than `Error`, so it would normally be returned as the process state,
+        # but it should be ignored on a task that has been restarted in the same run
+        restarted_task = error_process.tasks.create(depth=0, run=0, slug="restarted_task", state=State.Failed, ttl=0)
+        error_process.tasks.create(
+            depth=0,
+            run=0,
+            slug="error_task",
+            state=State.Error,
+            ttl=0,
+            original_task=restarted_task,
+        )
         error_process.tasks.create(depth=0, run=0, slug="completed_task", state=State.Completed, ttl=0)
         self.assertEqual(error_process.state, State.Error)
 
         stopped_process = Process.objects.create(mode=ProcessMode.Workers, creator=self.user, corpus=self.corpus)
         stopped_process.tasks.create(depth=0, run=0, slug="completed_task", state=State.Completed, ttl=0)
         stopped_process.tasks.create(depth=0, run=1, slug="completed_task", state=State.Completed, ttl=0)
-        stopped_process.tasks.create(depth=0, run=2, slug="stopped_task", state=State.Stopped, ttl=0)
+        restarted_task = stopped_process.tasks.create(depth=0, run=1, slug="restarted_task", state=State.Error, ttl=0)
+        stopped_process.tasks.create(
+            depth=0,
+            run=2,
+            slug="stopped_task",
+            state=State.Stopped,
+            ttl=0,
+            original_task=restarted_task,
+        )
         self.assertEqual(stopped_process.state, State.Stopped)
 
         self.client.force_login(self.user)