Skip to content
Snippets Groups Projects
Commit f184cf98 authored by Manon Blanco's avatar Manon Blanco Committed by Erwan Rouchet
Browse files

Trigger children tasks update on worker activity errors

parent f4bc5bab
No related branches found
No related tags found
1 merge request!1627Trigger children tasks update on worker activity errors
......@@ -1438,6 +1438,14 @@ class UpdateWorkerActivity(GenericAPIView):
}
)
# Update other activities
if state == WorkerActivityState.Error.value:
WorkerActivity.objects.filter(
process_id=process_id,
element_id=element_id,
state=WorkerActivityState.Queued,
).update(state=WorkerActivityState.Error)
return Response(serializer.data)
......
......@@ -9,10 +9,12 @@ from arkindex.dataimport.models import (
ActivityState,
DataImport,
DataImportMode,
Repository,
WorkerActivity,
WorkerActivityState,
WorkerConfiguration,
WorkerVersion,
WorkerVersionState,
)
from arkindex.dataimport.tasks import initialize_activity
from arkindex.documents.models import Corpus, Element
......@@ -212,17 +214,17 @@ class TestWorkerActivity(FixtureTestCase):
for state in ['Queued', 'Started', 'Error', 'Processed']
)
allowed_states_update = (
(queued, started),
(started, processed),
(started, error),
(error, queued),
(error, started),
(queued, started, 5),
(started, processed, 5),
(started, error, 6),
(error, queued, 5),
(error, started, 5),
)
self.client.force_login(self.internal_user)
for state, payload_state in allowed_states_update:
for state, payload_state, nb_query in allowed_states_update:
self.activity.state = state
self.activity.save()
with self.assertNumQueries(5):
with self.assertNumQueries(nb_query):
response = self.client.put(
reverse('api:update-worker-activity', kwargs={'pk': str(self.worker_version.id)}),
{
......@@ -357,6 +359,80 @@ class TestWorkerActivity(FixtureTestCase):
self.activity.refresh_from_db()
self.assertEqual(self.activity.process_id, process2.id)
def test_put_activity_update_other_activities(self):
"""
The error update should update the status on the following tasks
"""
worker_version_2 = WorkerVersion.objects.get(worker__slug='dla')
worker_version_3 = WorkerVersion.objects.get(worker__slug='worker-gpu')
worker_version_4 = WorkerVersion.objects.create(
worker=Repository.objects.first().workers.create(
name='New version',
slug='new',
type='new',
),
revision=self.worker_version.revision,
configuration={},
state=WorkerVersionState.Available,
docker_image=self.worker_version.docker_image
)
# Create runs run_1 > run_2 > run_3
run_1 = self.process.worker_runs.first()
run_2 = self.process.worker_runs.create(
version=worker_version_2,
parents=[run_1.id],
)
self.process.worker_runs.create(
version=worker_version_3,
parents=[run_2.id],
)
self.process.worker_runs.create(
version=worker_version_4,
parents=[],
)
# Create activities for run_2, run_3 and run_4
activity_2 = self.element.activities.create(
process=self.process,
worker_version=worker_version_2,
state=WorkerActivityState.Processed
)
activity_3 = self.element.activities.create(
process=self.process,
worker_version=worker_version_3,
state=WorkerActivityState.Queued
)
activity_4 = self.element.activities.create(
process=self.process,
worker_version=worker_version_4,
state=WorkerActivityState.Queued
)
self.client.force_login(self.internal_user)
self.activity.state = WorkerActivityState.Started
self.activity.save()
with self.assertNumQueries(6):
response = self.client.put(
reverse('api:update-worker-activity', kwargs={'pk': str(self.worker_version.id)}),
{
'element_id': str(self.element.id),
'process_id': str(self.process.id),
'state': WorkerActivityState.Error.value,
},
content_type='application/json',
)
self.assertEqual(response.status_code, status.HTTP_200_OK)
self.activity.refresh_from_db()
self.assertEqual(self.activity.state, WorkerActivityState.Error)
activity_2.refresh_from_db()
self.assertEqual(activity_2.state, WorkerActivityState.Processed)
activity_3.refresh_from_db()
self.assertEqual(activity_3.state, WorkerActivityState.Error)
activity_4.refresh_from_db()
self.assertEqual(activity_4.state, WorkerActivityState.Error)
@patch('arkindex.dataimport.models.DataImport.versions')
@patch('arkindex.dataimport.models.ActivityManager.bulk_insert')
def test_async_activities_error(self, bulk_insert_mock, versions_mock):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment