Skip to content
Snippets Groups Projects
Verified Commit ef3f6c8b authored by Erwan Rouchet's avatar Erwan Rouchet
Browse files

Remove the task_failure signal

parent 19124cf2
No related branches found
No related tags found
1 merge request!2435Remove the task_failure signal
......@@ -11,7 +11,6 @@ from rest_framework import serializers
from rest_framework.exceptions import ValidationError
from arkindex.ponos.models import FINAL_STATES, Agent, AgentMode, Artifact, State, Task
from arkindex.ponos.signals import task_failure
from arkindex.process.models import ActivityState, WorkerActivityState
from arkindex.project.serializer_fields import EnumField
from arkindex.project.triggers import notify_process_completion
......@@ -193,11 +192,6 @@ class TaskSerializer(TaskLightSerializer):
notify_process_completion(instance.process)
# We already checked earlier that the task was in a final state.
# If this state is both final and not completed, then we should trigger the task failure signal.
if instance.state != State.Completed:
task_failure.send_robust(self.__class__, task=instance)
return instance
......
from django.dispatch import Signal
# Sphinx does not detect the docstring for signals when using `task_failure.__doc__`,
# because this is an instance, not a class, and this does not look like a constant.
# We would then normally have to add a string right below the signal.
# This string gets picked up successfully by Sphinx, but pre-commit fails due to a
# false positive: https://github.com/pre-commit/pre-commit-hooks/issues/159
# So we use an alternative, lesser known syntax that Sphinx supports but most other
# tools (such as IDE autocompletions) don't, with `#:` comments.
#: A task has reached a final state that is not :attr:`~State.Completed`.
#:
#: This signal will be called with the related :class:`Task` instance as the `task` keyword argument.
task_failure = Signal()
......@@ -4,12 +4,8 @@ from django.db.models.signals import pre_save
from django.dispatch import receiver
from rest_framework.exceptions import ValidationError
from arkindex.ponos.signals import task_failure
from arkindex.process.models import (
ActivityState,
FeatureUsage,
Process,
WorkerActivityState,
WorkerConfiguration,
WorkerRun,
)
......@@ -80,30 +76,3 @@ def set_gpu_usage(sender, instance, **kwargs):
# If the WorkerRun's WorkerVersion requires a GPU, set use_gpu to True; otherwise it's set to False by default
if instance.version.gpu_usage == FeatureUsage.Required:
instance.use_gpu = True
@receiver(task_failure)
def stop_started_activities(sender, task, **kwargs):
"""
When a Ponos task fails, update WorkerActivity from the same WorkerRun from `started` to `error`.
This allows to retry a process and get it to re-run on activities that were not finished without skipping.
"""
process = Process.objects.filter(id=task.process_id).only("id", "activity_state").first()
if process is None or process.activity_state == ActivityState.Disabled:
return
extra_filters = {}
if task.worker_run:
# Look for process activities matching this specific worker version, model version and configuration
extra_filters.update({
"worker_version_id": task.worker_run.version_id,
"model_version_id": task.worker_run.model_version_id,
"configuration_id": task.worker_run.configuration_id,
})
count = (
process.activities
.filter(state=WorkerActivityState.Started, **extra_filters)
.update(state=WorkerActivityState.Error)
)
logger.info(f"Updated {count} worker activities from Started to Error as a task failed on {process.id}")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment