diff --git a/arkindex/process/managers.py b/arkindex/process/managers.py
index 83c628646ac7a4c4954359da880c3e793bd2893a..81991e0f06cfd22cabc3c701e382982f60110cc5 100644
--- a/arkindex/process/managers.py
+++ b/arkindex/process/managers.py
@@ -22,7 +22,7 @@ class ActivityManager(models.Manager):
 
         assert isinstance(state, WorkerActivityState), 'State should be an instance of WorkerActivityState'
 
-        sql, params = elements_qs.values('id').query.sql_with_params()
+        sql, params = elements_qs.distinct().values('id').query.sql_with_params()
         select_params = (worker_version_id, configuration_id, state.value, process_id) + params
 
         # With ON CONFLICT, the target constraint is only optional when the action is DO NOTHING.
diff --git a/arkindex/process/tests/test_workeractivity.py b/arkindex/process/tests/test_workeractivity.py
index 9733c45f58d7f4fd4011d4c0d1e8777ba9d3e130..41243ae4baa760caf599e177e8532c9b2b466eb6 100644
--- a/arkindex/process/tests/test_workeractivity.py
+++ b/arkindex/process/tests/test_workeractivity.py
@@ -64,6 +64,32 @@ class TestWorkerActivity(FixtureTestCase):
         self.assertEqual(elements_qs.count(), 5)
         self.assertEqual(WorkerActivity.objects.filter(state=WorkerActivityState.Started, process=self.process).count(), 5)
 
+    def test_bulk_insert_activity_duplicate_elements(self):
+        """
+        bulk_insert should create a single activity for an element that the queryset returns more than once
+        """
+        element_type = self.corpus.types.first()
+        parent1 = self.corpus.elements.create(type=element_type, name='Parent 1')
+        parent2 = self.corpus.elements.create(type=element_type, name='Parent 2')
+        element = self.corpus.elements.create(type=element_type, name='Element')
+        child = self.corpus.elements.create(type=element_type, name='Child')
+        element.add_parent(parent1)
+        element.add_parent(parent2)
+        child.add_parent(element)
+        elements_qs = Element.objects.filter(paths__path__contains=[element.id], name='Child')
+        # `element` has two parents, so `child` has two paths that both contain the ID of `element`;
+        # filtering on paths__path therefore matches `child` once per path and returns it twice
+        self.assertEqual(elements_qs.count(), 2)
+
+        WorkerActivity.objects.bulk_insert(
+            self.worker_version.id,
+            self.process.id,
+            self.configuration.id,
+            elements_qs,
+            state=WorkerActivityState.Started,
+        )
+        self.assertEqual(self.process.activities.filter(state=WorkerActivityState.Started).get().element_id, child.id)
+
     def test_bulk_insert_activity_children(self):
         """
         Bulk insert worker activities for acts
diff --git a/arkindex/sql_validation/workeractivity_bulk_insert.sql b/arkindex/sql_validation/workeractivity_bulk_insert.sql
index 911bb32b58cb927b5bc69c75aa3a1dcd619b60e8..18bc1c4a2f0447fea9d14d96fe94d437f2559d74 100644
--- a/arkindex/sql_validation/workeractivity_bulk_insert.sql
+++ b/arkindex/sql_validation/workeractivity_bulk_insert.sql
@@ -8,7 +8,7 @@ SELECT elt.id,
        current_timestamp,
        current_timestamp
 FROM
-  (SELECT "documents_element"."id"
+  (SELECT DISTINCT "documents_element"."id"
    FROM "documents_element"
    INNER JOIN "documents_elementtype" ON ("documents_element"."type_id" = "documents_elementtype"."id")
    WHERE ("documents_elementtype"."corpus_id" = '{corpus_id}'::uuid
diff --git a/arkindex/sql_validation/workeractivity_bulk_insert_no_configuration.sql b/arkindex/sql_validation/workeractivity_bulk_insert_no_configuration.sql
index a3af27ec293feb4840cd94aa81a245749354756a..a10ac8e18baa8e381617ea88512e9ce85901a321 100644
--- a/arkindex/sql_validation/workeractivity_bulk_insert_no_configuration.sql
+++ b/arkindex/sql_validation/workeractivity_bulk_insert_no_configuration.sql
@@ -8,7 +8,7 @@ SELECT elt.id,
        current_timestamp,
        current_timestamp
 FROM
-  (SELECT "documents_element"."id"
+  (SELECT DISTINCT "documents_element"."id"
    FROM "documents_element"
    INNER JOIN "documents_elementtype" ON ("documents_element"."type_id" = "documents_elementtype"."id")
    WHERE ("documents_elementtype"."corpus_id" = '{corpus_id}'::uuid