diff --git a/arkindex/dataimport/api.py b/arkindex/dataimport/api.py index 0a41fa2b1b493f9cac9a9f114a1b9f5dc9ae68c2..afcdfa6bff049854bba77ee4e0e2420cfd292333 100644 --- a/arkindex/dataimport/api.py +++ b/arkindex/dataimport/api.py @@ -1362,8 +1362,8 @@ class UpdateWorkerActivity(GenericAPIView): ) def put(self, request, *args, **kwarg): """ - Update a worker activity with a single database requests. - If the couple element_id, worker_version matches no existing + Update a worker activity with two database requests. + If the group element_id, worker_version, configuration_id matches no existing activity, the update count is 0. If the new state is disallowed, the update count is 0 too. """ @@ -1375,10 +1375,15 @@ class UpdateWorkerActivity(GenericAPIView): state = serializer.validated_data['state'].value process_id = serializer.validated_data['process_id'] + # Between zero and one worker run can match the filter due to the DB constraint + worker_run = WorkerRun.objects.filter(dataimport_id=process_id, version_id=worker_version_id).first() + configuration_id = worker_run.configuration_id if worker_run else None + # Between zero and one worker activity can match the filter due to the DB constraint activity = WorkerActivity.objects.filter( worker_version_id=worker_version_id, element_id=element_id, + configuration_id=configuration_id, state__in=self.allowed_transitions[state] ) update_count = activity.update(state=state, process_id=process_id) diff --git a/arkindex/dataimport/managers.py b/arkindex/dataimport/managers.py index 9c8d2c0bacaee61653bbd0f2232e44c245a5662e..268f5e4b8e472702ad459b75d75759b5a45302b1 100644 --- a/arkindex/dataimport/managers.py +++ b/arkindex/dataimport/managers.py @@ -8,7 +8,7 @@ logger = logging.getLogger(__name__) class ActivityManager(models.Manager): """Model management for worker activities""" - def bulk_insert(self, worker_version_id, process_id, elements_qs, state=None): + def bulk_insert(self, worker_version_id, process_id, configuration_id, elements_qs, state=None): """ Create initial worker activities from a queryset of elements in a efficient way. Due to the possible large amount of elements, we use a bulk insert from the elements query (best performances). @@ -22,25 +22,19 @@ class ActivityManager(models.Manager): assert isinstance(state, WorkerActivityState), 'State should be an instance of WorkerActivityState' sql, params = elements_qs.values('id').query.sql_with_params() + select_params = (worker_version_id, configuration_id, state.value, process_id) + params with connections['default'].cursor() as cursor: cursor.execute( f""" INSERT INTO dataimport_workeractivity - (element_id, worker_version_id, state, process_id, id, created, updated) + (element_id, worker_version_id, configuration_id, state, process_id, id, created, updated) SELECT - elt.id, - '{worker_version_id}'::uuid, - '{state.value}', - '{process_id}', - uuid_generate_v4(), - current_timestamp, - current_timestamp + elt.id, %s, %s, %s, %s, uuid_generate_v4(), current_timestamp, current_timestamp FROM ({sql}) AS elt - ON CONFLICT (element_id, worker_version_id) DO NOTHING + ON CONFLICT DO NOTHING """, - params - ) + select_params) class CorpusWorkerVersionManager(models.Manager): diff --git a/arkindex/dataimport/migrations/0042_alter_workeractivity_constraints.py b/arkindex/dataimport/migrations/0042_alter_workeractivity_constraints.py new file mode 100644 index 0000000000000000000000000000000000000000..81e073c5b6b82900913b4de06efac457b4c03773 --- /dev/null +++ b/arkindex/dataimport/migrations/0042_alter_workeractivity_constraints.py @@ -0,0 +1,25 @@ +# Generated by Django 3.2.5 on 2021-11-05 13:25 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('dataimport', '0041_remove_old_configuration'), + ] + + operations = [ + migrations.AlterUniqueTogether( + name='workeractivity', + unique_together=set(), + ), + migrations.AddConstraint( + model_name='workeractivity', + constraint=models.UniqueConstraint(condition=models.Q(('configuration__isnull', True)), fields=('worker_version', 'element'), name='worker_activity_unique_no_configuration'), + ), + migrations.AddConstraint( + model_name='workeractivity', + constraint=models.UniqueConstraint(condition=models.Q(('configuration__isnull', False)), fields=('worker_version', 'element', 'configuration'), name='worker_activity_unique_configuration'), + ), + ] diff --git a/arkindex/dataimport/models.py b/arkindex/dataimport/models.py index a62dd05d2e856ea1a3c2fce211370f0937e75236..6660737e8026400adedd3dbe5ac1037a62447e24 100644 --- a/arkindex/dataimport/models.py +++ b/arkindex/dataimport/models.py @@ -727,9 +727,20 @@ class WorkerActivity(IndexableModel): objects = ActivityManager() class Meta: - unique_together = ( - ('worker_version', 'element'), - ) + constraints = [ + # Add configuration unicity + # Two constraints are required as Null values are not compared for unicity + models.UniqueConstraint( + fields=['worker_version', 'element'], + name='worker_activity_unique_no_configuration', + condition=Q(configuration__isnull=True), + ), + models.UniqueConstraint( + fields=['worker_version', 'element', 'configuration'], + name='worker_activity_unique_configuration', + condition=Q(configuration__isnull=False), + ) + ] class CorpusWorkerVersion(models.Model): diff --git a/arkindex/dataimport/tasks.py b/arkindex/dataimport/tasks.py index eb159744074a235e44e2db339f5dc4a2eb7468eb..a0c6601c454279225d9db91dbbdc6d62942340c3 100644 --- a/arkindex/dataimport/tasks.py +++ b/arkindex/dataimport/tasks.py @@ -15,10 +15,11 @@ def initialize_activity(process: DataImport): """ try: with transaction.atomic(): - for version_id in process.versions.values_list('id', flat=True): + for version_id, configuration_id in process.worker_runs.values_list('version_id', 'configuration_id'): WorkerActivity.objects.bulk_insert( worker_version_id=version_id, process_id=process.id, + configuration_id=configuration_id, elements_qs=process.list_elements() ) except Exception as e: diff --git a/arkindex/dataimport/tests/test_workeractivity.py b/arkindex/dataimport/tests/test_workeractivity.py index 2f715f5a35626ed542270ba33f505dee67d139a5..82193db12f6130596afe91c075049664e7aeb633 100644 --- a/arkindex/dataimport/tests/test_workeractivity.py +++ b/arkindex/dataimport/tests/test_workeractivity.py @@ -10,6 +10,7 @@ from arkindex.dataimport.models import ( DataImportMode, WorkerActivity, WorkerActivityState, + WorkerConfiguration, WorkerVersion, ) from arkindex.dataimport.tasks import initialize_activity @@ -29,15 +30,34 @@ class TestWorkerActivity(FixtureTestCase): mode=DataImportMode.Workers, creator=cls.user ) + cls.configuration = WorkerConfiguration.objects.create(worker=cls.worker_version.worker, name='A config', configuration={'a': 'b'}) + cls.process.worker_runs.create(version=cls.worker_version, parents=[], configuration=cls.configuration) def setUp(self): # Create a queued activity for this element self.activity = self.element.activities.create( process=self.process, worker_version=self.worker_version, - state=WorkerActivityState.Queued + state=WorkerActivityState.Queued, + configuration=self.configuration ) + def test_bulk_insert_activity_no_configuration(self): + """ + Bulk insert worker activities for acts + """ + elements_qs = Element.objects.filter(type__slug='act', type__corpus_id=self.corpus.id) + params = { + 'worker_version_id': self.worker_version.id, + 'corpus_id': self.corpus.id, + 'state': WorkerActivityState.Started.value, + 'process_id': self.process.id, + } + with self.assertExactQueries('workeractivity_bulk_insert_no_configuration.sql', params=params): + WorkerActivity.objects.bulk_insert(self.worker_version.id, self.process.id, None, elements_qs, state=WorkerActivityState.Started) + self.assertEqual(elements_qs.count(), 5) + self.assertEqual(WorkerActivity.objects.filter(state=WorkerActivityState.Started, process=self.process).count(), 5) + def test_bulk_insert_activity_children(self): """ Bulk insert worker activities for acts @@ -45,12 +65,13 @@ class TestWorkerActivity(FixtureTestCase): elements_qs = Element.objects.filter(type__slug='act', type__corpus_id=self.corpus.id) params = { 'worker_version_id': self.worker_version.id, + 'configuration_id': self.configuration.id, 'corpus_id': self.corpus.id, 'state': WorkerActivityState.Started.value, 'process_id': self.process.id, } with self.assertExactQueries('workeractivity_bulk_insert.sql', params=params): - WorkerActivity.objects.bulk_insert(self.worker_version.id, self.process.id, elements_qs, state=WorkerActivityState.Started) + WorkerActivity.objects.bulk_insert(self.worker_version.id, self.process.id, self.configuration.id, elements_qs, state=WorkerActivityState.Started) self.assertEqual(elements_qs.count(), 5) self.assertEqual(WorkerActivity.objects.filter(state=WorkerActivityState.Started, process=self.process).count(), 5) @@ -66,12 +87,13 @@ class TestWorkerActivity(FixtureTestCase): element=element, worker_version=self.worker_version, state=WorkerActivityState.Processed.value, - process=old_process + process=old_process, + configuration=self.configuration ) for element in elements_qs[:2] ]) with self.assertNumQueries(1): - WorkerActivity.objects.bulk_insert(self.worker_version.id, self.process.id, elements_qs, state=WorkerActivityState.Started) + WorkerActivity.objects.bulk_insert(self.worker_version.id, self.process.id, self.configuration.id, elements_qs, state=WorkerActivityState.Started) self.assertEqual(WorkerActivity.objects.filter(element_id__in=elements_qs.values('id')).count(), 5) self.assertEqual(elements_qs.count(), 5) # Only 3 acts have been marked as started for this worker @@ -84,7 +106,6 @@ class TestWorkerActivity(FixtureTestCase): Worker activities creation should work with complex elements selection (e.g. with a class filter) """ # Mock workers activity to make it a synchronous job - from arkindex.dataimport.tasks import initialize_activity activities_delay_mock.side_effect = initialize_activity agent_class = MLClass.objects.create(name='James', corpus=self.corpus) @@ -99,7 +120,7 @@ class TestWorkerActivity(FixtureTestCase): corpus=self.corpus, best_class=agent_class.name ) - dataimport.worker_runs.create(version=self.worker_version, parents=[]) + dataimport.worker_runs.create(version=self.worker_version, parents=[], configuration=self.configuration) with self.assertNumQueries(22): dataimport.start() @@ -121,8 +142,8 @@ class TestWorkerActivity(FixtureTestCase): (None, status.HTTP_403_FORBIDDEN, 0), (self.user, status.HTTP_403_FORBIDDEN, 2), (self.superuser, status.HTTP_403_FORBIDDEN, 2), - (self.internal_user, status.HTTP_200_OK, 4), - (internal_admin_user, status.HTTP_200_OK, 4) + (self.internal_user, status.HTTP_200_OK, 5), + (internal_admin_user, status.HTTP_200_OK, 5) ) for user, status_code, requests_count in cases: self.activity.state = WorkerActivityState.Queued @@ -147,7 +168,7 @@ class TestWorkerActivity(FixtureTestCase): The response is a HTTP_409_CONFLICT """ self.client.force_login(self.internal_user) - with self.assertNumQueries(4): + with self.assertNumQueries(5): response = self.client.put( reverse('api:update-worker-activity', kwargs={'pk': str(uuid.uuid4())}), { @@ -171,7 +192,7 @@ class TestWorkerActivity(FixtureTestCase): The response is a HTTP_409_CONFLICT """ self.client.force_login(self.internal_user) - with self.assertNumQueries(4): + with self.assertNumQueries(5): response = self.client.put( reverse('api:update-worker-activity', kwargs={'pk': str(self.worker_version.id)}), { @@ -229,7 +250,7 @@ class TestWorkerActivity(FixtureTestCase): for state, payload_state in allowed_states_update: self.activity.state = state self.activity.save() - with self.assertNumQueries(4): + with self.assertNumQueries(5): response = self.client.put( reverse('api:update-worker-activity', kwargs={'pk': str(self.worker_version.id)}), { @@ -262,7 +283,7 @@ class TestWorkerActivity(FixtureTestCase): for state, payload_state in forbidden_states_update: self.activity.state = state self.activity.save() - with self.assertNumQueries(4): + with self.assertNumQueries(5): response = self.client.put( reverse('api:update-worker-activity', kwargs={'pk': str(self.worker_version.id)}), { @@ -291,10 +312,11 @@ class TestWorkerActivity(FixtureTestCase): mode=DataImportMode.Workers, creator=self.user ) + process2.worker_runs.create(version=self.worker_version, parents=[], configuration=self.configuration) self.activity.process_id = process2.id self.activity.save() - with self.assertNumQueries(4): + with self.assertNumQueries(5): response = self.client.put( reverse('api:update-worker-activity', kwargs={'pk': str(self.worker_version.id)}), { @@ -331,22 +353,22 @@ class TestWorkerActivity(FixtureTestCase): process.list_elements = MagicMock() process.list_elements.return_value = elts_qs - v1_id, v2_id = uuid.uuid4(), uuid.uuid4() + worker_version_2 = WorkerVersion.objects.get(worker__slug='dla') + process.worker_runs.create(version=self.worker_version, parents=[], configuration=None) + process.worker_runs.create(version=worker_version_2, parents=[], configuration=None) + bulk_insert_mock.side_effect = [ bulk_insert_mock, Exception('Something went wrong') ] - process.versions = MagicMock() - process.versions.values_list.return_value = [v1_id, v2_id] - with self.assertRaises(Exception): initialize_activity(process) self.assertEqual(bulk_insert_mock.call_count, 2) self.assertListEqual(bulk_insert_mock.call_args_list, [ - call(worker_version_id=v1_id, process_id=process.id, elements_qs=elts_qs), - call(worker_version_id=v2_id, process_id=process.id, elements_qs=elts_qs) + call(worker_version_id=self.worker_version.id, process_id=process.id, configuration_id=None, elements_qs=elts_qs), + call(worker_version_id=worker_version_2.id, process_id=process.id, configuration_id=None, elements_qs=elts_qs) ]) process.refresh_from_db() diff --git a/arkindex/sql_validation/workeractivity_bulk_insert.sql b/arkindex/sql_validation/workeractivity_bulk_insert.sql index 35f8b07cd98be5af97c815852c25e48b7f06c0d7..aec2f151d56cb6f6b5fab404dec198ca4d7cf421 100644 --- a/arkindex/sql_validation/workeractivity_bulk_insert.sql +++ b/arkindex/sql_validation/workeractivity_bulk_insert.sql @@ -1,8 +1,9 @@ -INSERT INTO dataimport_workeractivity (element_id, worker_version_id, state, process_id, id, created, updated) +INSERT INTO dataimport_workeractivity (element_id, worker_version_id, configuration_id, state, process_id, id, created, updated) SELECT elt.id, '{worker_version_id}'::uuid, + '{configuration_id}'::uuid, '{state}', - '{process_id}', + '{process_id}'::uuid, uuid_generate_v4(), current_timestamp, current_timestamp @@ -11,4 +12,4 @@ FROM FROM "documents_element" INNER JOIN "documents_elementtype" ON ("documents_element"."type_id" = "documents_elementtype"."id") WHERE ("documents_elementtype"."corpus_id" = '{corpus_id}'::uuid - AND "documents_elementtype"."slug" = 'act')) AS elt ON CONFLICT (element_id, worker_version_id) DO NOTHING + AND "documents_elementtype"."slug" = 'act')) AS elt ON CONFLICT DO NOTHING diff --git a/arkindex/sql_validation/workeractivity_bulk_insert_no_configuration.sql b/arkindex/sql_validation/workeractivity_bulk_insert_no_configuration.sql new file mode 100644 index 0000000000000000000000000000000000000000..d43adfa741f03a669193d9641875559dbdfe6f40 --- /dev/null +++ b/arkindex/sql_validation/workeractivity_bulk_insert_no_configuration.sql @@ -0,0 +1,15 @@ +INSERT INTO dataimport_workeractivity (element_id, worker_version_id, configuration_id, state, process_id, id, created, updated) +SELECT elt.id, + '{worker_version_id}'::uuid, + NULL, + '{state}', + '{process_id}'::uuid, + uuid_generate_v4(), + current_timestamp, + current_timestamp +FROM + (SELECT "documents_element"."id" + FROM "documents_element" + INNER JOIN "documents_elementtype" ON ("documents_element"."type_id" = "documents_elementtype"."id") + WHERE ("documents_elementtype"."corpus_id" = '{corpus_id}'::uuid + AND "documents_elementtype"."slug" = 'act')) AS elt ON CONFLICT DO NOTHING