Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • arkindex/backend
1 result
Show changes
Commits on Source (5)
1.1.3-beta1
1.1.3-rc1
......@@ -1362,8 +1362,8 @@ class UpdateWorkerActivity(GenericAPIView):
)
def put(self, request, *args, **kwarg):
"""
Update a worker activity with a single database requests.
If the couple element_id, worker_version matches no existing
Update a worker activity with two database requests.
If the triple element_id, worker_version, configuration_id matches no existing
activity, the update count is 0.
If the new state is disallowed, the update count is 0 too.
"""
......@@ -1375,10 +1375,15 @@ class UpdateWorkerActivity(GenericAPIView):
state = serializer.validated_data['state'].value
process_id = serializer.validated_data['process_id']
# Between zero and one worker run can match the filter due to the DB constraint
worker_run = WorkerRun.objects.filter(dataimport_id=process_id, version_id=worker_version_id).first()
configuration_id = worker_run.configuration_id if worker_run else None
# Between zero and one worker activity can match the filter due to the DB constraint
activity = WorkerActivity.objects.filter(
worker_version_id=worker_version_id,
element_id=element_id,
configuration_id=configuration_id,
state__in=self.allowed_transitions[state]
)
update_count = activity.update(state=state, process_id=process_id)
......
......@@ -8,7 +8,7 @@ logger = logging.getLogger(__name__)
class ActivityManager(models.Manager):
"""Model management for worker activities"""
def bulk_insert(self, worker_version_id, process_id, elements_qs, state=None):
def bulk_insert(self, worker_version_id, process_id, configuration_id, elements_qs, state=None):
"""
Create initial worker activities from a queryset of elements in an efficient way.
Due to the possibly large amount of elements, we use a bulk insert from the elements query (best performance).
......@@ -22,25 +22,19 @@ class ActivityManager(models.Manager):
assert isinstance(state, WorkerActivityState), 'State should be an instance of WorkerActivityState'
sql, params = elements_qs.values('id').query.sql_with_params()
select_params = (worker_version_id, configuration_id, state.value, process_id) + params
with connections['default'].cursor() as cursor:
cursor.execute(
f"""
INSERT INTO dataimport_workeractivity
(element_id, worker_version_id, state, process_id, id, created, updated)
(element_id, worker_version_id, configuration_id, state, process_id, id, created, updated)
SELECT
elt.id,
'{worker_version_id}'::uuid,
'{state.value}',
'{process_id}',
uuid_generate_v4(),
current_timestamp,
current_timestamp
elt.id, %s, %s, %s, %s, uuid_generate_v4(), current_timestamp, current_timestamp
FROM ({sql}) AS elt
ON CONFLICT (element_id, worker_version_id) DO NOTHING
ON CONFLICT DO NOTHING
""",
params
)
select_params)
class CorpusWorkerVersionManager(models.Manager):
......
# Generated by Django 3.2.5 on 2021-11-05 13:25
from django.db import migrations, models


class Migration(migrations.Migration):
    """Replace WorkerActivity's (worker_version, element) unique_together with
    two partial unique constraints that take the worker configuration into
    account."""

    dependencies = [
        ('dataimport', '0041_remove_old_configuration'),
    ]

    operations = [
        # Drop the old implicit unique index first so the new constraints can
        # take over uniqueness enforcement.
        migrations.AlterUniqueTogether(
            name='workeractivity',
            unique_together=set(),
        ),
        # NULL values never compare equal in SQL, so a single constraint over
        # (worker_version, element, configuration) would not deduplicate rows
        # with configuration IS NULL; two partial constraints are required.
        migrations.AddConstraint(
            model_name='workeractivity',
            # Applies only when no configuration is set.
            constraint=models.UniqueConstraint(condition=models.Q(('configuration__isnull', True)), fields=('worker_version', 'element'), name='worker_activity_unique_no_configuration'),
        ),
        migrations.AddConstraint(
            model_name='workeractivity',
            # Applies only when a configuration is set.
            constraint=models.UniqueConstraint(condition=models.Q(('configuration__isnull', False)), fields=('worker_version', 'element', 'configuration'), name='worker_activity_unique_configuration'),
        ),
    ]
......@@ -727,9 +727,20 @@ class WorkerActivity(IndexableModel):
objects = ActivityManager()
class Meta:
unique_together = (
('worker_version', 'element'),
)
constraints = [
# Enforce uniqueness including the configuration.
# Two constraints are required because NULL values are never considered equal for uniqueness.
models.UniqueConstraint(
fields=['worker_version', 'element'],
name='worker_activity_unique_no_configuration',
condition=Q(configuration__isnull=True),
),
models.UniqueConstraint(
fields=['worker_version', 'element', 'configuration'],
name='worker_activity_unique_configuration',
condition=Q(configuration__isnull=False),
)
]
class CorpusWorkerVersion(models.Model):
......
......@@ -15,10 +15,11 @@ def initialize_activity(process: DataImport):
"""
try:
with transaction.atomic():
for version_id in process.versions.values_list('id', flat=True):
for version_id, configuration_id in process.worker_runs.values_list('version_id', 'configuration_id'):
WorkerActivity.objects.bulk_insert(
worker_version_id=version_id,
process_id=process.id,
configuration_id=configuration_id,
elements_qs=process.list_elements()
)
except Exception as e:
......
......@@ -10,6 +10,7 @@ from arkindex.dataimport.models import (
DataImportMode,
WorkerActivity,
WorkerActivityState,
WorkerConfiguration,
WorkerVersion,
)
from arkindex.dataimport.tasks import initialize_activity
......@@ -29,15 +30,34 @@ class TestWorkerActivity(FixtureTestCase):
mode=DataImportMode.Workers,
creator=cls.user
)
cls.configuration = WorkerConfiguration.objects.create(worker=cls.worker_version.worker, name='A config', configuration={'a': 'b'})
cls.process.worker_runs.create(version=cls.worker_version, parents=[], configuration=cls.configuration)
def setUp(self):
# Create a queued activity for this element
self.activity = self.element.activities.create(
process=self.process,
worker_version=self.worker_version,
state=WorkerActivityState.Queued
state=WorkerActivityState.Queued,
configuration=self.configuration
)
def test_bulk_insert_activity_no_configuration(self):
"""
Bulk insert worker activities for acts
"""
elements_qs = Element.objects.filter(type__slug='act', type__corpus_id=self.corpus.id)
params = {
'worker_version_id': self.worker_version.id,
'corpus_id': self.corpus.id,
'state': WorkerActivityState.Started.value,
'process_id': self.process.id,
}
with self.assertExactQueries('workeractivity_bulk_insert_no_configuration.sql', params=params):
WorkerActivity.objects.bulk_insert(self.worker_version.id, self.process.id, None, elements_qs, state=WorkerActivityState.Started)
self.assertEqual(elements_qs.count(), 5)
self.assertEqual(WorkerActivity.objects.filter(state=WorkerActivityState.Started, process=self.process).count(), 5)
def test_bulk_insert_activity_children(self):
"""
Bulk insert worker activities for acts
......@@ -45,12 +65,13 @@ class TestWorkerActivity(FixtureTestCase):
elements_qs = Element.objects.filter(type__slug='act', type__corpus_id=self.corpus.id)
params = {
'worker_version_id': self.worker_version.id,
'configuration_id': self.configuration.id,
'corpus_id': self.corpus.id,
'state': WorkerActivityState.Started.value,
'process_id': self.process.id,
}
with self.assertExactQueries('workeractivity_bulk_insert.sql', params=params):
WorkerActivity.objects.bulk_insert(self.worker_version.id, self.process.id, elements_qs, state=WorkerActivityState.Started)
WorkerActivity.objects.bulk_insert(self.worker_version.id, self.process.id, self.configuration.id, elements_qs, state=WorkerActivityState.Started)
self.assertEqual(elements_qs.count(), 5)
self.assertEqual(WorkerActivity.objects.filter(state=WorkerActivityState.Started, process=self.process).count(), 5)
......@@ -66,12 +87,13 @@ class TestWorkerActivity(FixtureTestCase):
element=element,
worker_version=self.worker_version,
state=WorkerActivityState.Processed.value,
process=old_process
process=old_process,
configuration=self.configuration
)
for element in elements_qs[:2]
])
with self.assertNumQueries(1):
WorkerActivity.objects.bulk_insert(self.worker_version.id, self.process.id, elements_qs, state=WorkerActivityState.Started)
WorkerActivity.objects.bulk_insert(self.worker_version.id, self.process.id, self.configuration.id, elements_qs, state=WorkerActivityState.Started)
self.assertEqual(WorkerActivity.objects.filter(element_id__in=elements_qs.values('id')).count(), 5)
self.assertEqual(elements_qs.count(), 5)
# Only 3 acts have been marked as started for this worker
......@@ -84,7 +106,6 @@ class TestWorkerActivity(FixtureTestCase):
Worker activities creation should work with complex elements selection (e.g. with a class filter)
"""
# Mock workers activity to make it a synchronous job
from arkindex.dataimport.tasks import initialize_activity
activities_delay_mock.side_effect = initialize_activity
agent_class = MLClass.objects.create(name='James', corpus=self.corpus)
......@@ -99,7 +120,7 @@ class TestWorkerActivity(FixtureTestCase):
corpus=self.corpus,
best_class=agent_class.name
)
dataimport.worker_runs.create(version=self.worker_version, parents=[])
dataimport.worker_runs.create(version=self.worker_version, parents=[], configuration=self.configuration)
with self.assertNumQueries(22):
dataimport.start()
......@@ -121,8 +142,8 @@ class TestWorkerActivity(FixtureTestCase):
(None, status.HTTP_403_FORBIDDEN, 0),
(self.user, status.HTTP_403_FORBIDDEN, 2),
(self.superuser, status.HTTP_403_FORBIDDEN, 2),
(self.internal_user, status.HTTP_200_OK, 4),
(internal_admin_user, status.HTTP_200_OK, 4)
(self.internal_user, status.HTTP_200_OK, 5),
(internal_admin_user, status.HTTP_200_OK, 5)
)
for user, status_code, requests_count in cases:
self.activity.state = WorkerActivityState.Queued
......@@ -147,7 +168,7 @@ class TestWorkerActivity(FixtureTestCase):
The response is a HTTP_409_CONFLICT
"""
self.client.force_login(self.internal_user)
with self.assertNumQueries(4):
with self.assertNumQueries(5):
response = self.client.put(
reverse('api:update-worker-activity', kwargs={'pk': str(uuid.uuid4())}),
{
......@@ -171,7 +192,7 @@ class TestWorkerActivity(FixtureTestCase):
The response is a HTTP_409_CONFLICT
"""
self.client.force_login(self.internal_user)
with self.assertNumQueries(4):
with self.assertNumQueries(5):
response = self.client.put(
reverse('api:update-worker-activity', kwargs={'pk': str(self.worker_version.id)}),
{
......@@ -229,7 +250,7 @@ class TestWorkerActivity(FixtureTestCase):
for state, payload_state in allowed_states_update:
self.activity.state = state
self.activity.save()
with self.assertNumQueries(4):
with self.assertNumQueries(5):
response = self.client.put(
reverse('api:update-worker-activity', kwargs={'pk': str(self.worker_version.id)}),
{
......@@ -262,7 +283,7 @@ class TestWorkerActivity(FixtureTestCase):
for state, payload_state in forbidden_states_update:
self.activity.state = state
self.activity.save()
with self.assertNumQueries(4):
with self.assertNumQueries(5):
response = self.client.put(
reverse('api:update-worker-activity', kwargs={'pk': str(self.worker_version.id)}),
{
......@@ -291,10 +312,11 @@ class TestWorkerActivity(FixtureTestCase):
mode=DataImportMode.Workers,
creator=self.user
)
process2.worker_runs.create(version=self.worker_version, parents=[], configuration=self.configuration)
self.activity.process_id = process2.id
self.activity.save()
with self.assertNumQueries(4):
with self.assertNumQueries(5):
response = self.client.put(
reverse('api:update-worker-activity', kwargs={'pk': str(self.worker_version.id)}),
{
......@@ -331,22 +353,22 @@ class TestWorkerActivity(FixtureTestCase):
process.list_elements = MagicMock()
process.list_elements.return_value = elts_qs
v1_id, v2_id = uuid.uuid4(), uuid.uuid4()
worker_version_2 = WorkerVersion.objects.get(worker__slug='dla')
process.worker_runs.create(version=self.worker_version, parents=[], configuration=None)
process.worker_runs.create(version=worker_version_2, parents=[], configuration=None)
bulk_insert_mock.side_effect = [
bulk_insert_mock,
Exception('Something went wrong')
]
process.versions = MagicMock()
process.versions.values_list.return_value = [v1_id, v2_id]
with self.assertRaises(Exception):
initialize_activity(process)
self.assertEqual(bulk_insert_mock.call_count, 2)
self.assertListEqual(bulk_insert_mock.call_args_list, [
call(worker_version_id=v1_id, process_id=process.id, elements_qs=elts_qs),
call(worker_version_id=v2_id, process_id=process.id, elements_qs=elts_qs)
call(worker_version_id=self.worker_version.id, process_id=process.id, configuration_id=None, elements_qs=elts_qs),
call(worker_version_id=worker_version_2.id, process_id=process.id, configuration_id=None, elements_qs=elts_qs)
])
process.refresh_from_db()
......
import os.path
import subprocess
import yaml
from django.core.checks import Error, Warning, register
......@@ -44,56 +43,18 @@ def local_imageserver_check(*args, **kwargs):
"""
from django.conf import settings
from arkindex.images.models import ImageServer
errors = []
local_id = settings.LOCAL_IMAGESERVER_ID
try:
ImageServer.objects.get(id=local_id)
except ImageServer.DoesNotExist:
errors.append(Error(
'Local ImageServer with ID {} does not exist'.format(local_id),
hint='settings.LOCAL_IMAGESERVER_ID = {}'.format(local_id),
id='arkindex.E004',
))
return errors
@register()
def docker_images_check(*args, **kwargs):
"""
Check that the Arkindex backend and ML images exist
"""
from django.conf import settings
errors = []
if settings.PONOS_RECIPE is None:
# In a Ponos task
return []
images = (
(settings.ARKINDEX_TASKS_IMAGE, 'ARKINDEX_TASKS_IMAGE'),
)
for image_tag, setting_name in images:
try:
subprocess.run(
['docker', 'image', 'inspect', image_tag],
check=True,
# Silent output
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
except subprocess.CalledProcessError:
errors.append(Error(
'Docker image with tag "{}" was not found.'.format(image_tag),
hint='settings.{} = "{}"'.format(setting_name, image_tag),
id='arkindex.E006',
))
except FileNotFoundError:
# Docker is not available, ignore check
pass
return [Warning(
f'Local ImageServer with ID {local_id} does not exist',
hint=f'settings.LOCAL_IMAGESERVER_ID = {local_id}',
id='arkindex.W009',
)]
return errors
return []
@register()
......@@ -176,14 +137,15 @@ def s3_check(*args, **kwargs):
'AWS_SECRET_KEY': 'AWS secret key',
'AWS_THUMBNAIL_BUCKET': 'S3 thumbnails bucket name',
'AWS_STAGING_BUCKET': 'S3 staging bucket name',
'AWS_EXPORT_BUCKET': 'S3 export bucket name',
}
errors = []
for name, display_name in aws_settings.items():
value = getattr(settings, name, None)
if not value:
errors.append(Error(
'{} is missing; all S3-related features will fail.'.format(display_name),
hint='settings.{} = {}'.format(name, repr(value)),
f'{display_name} is missing; all S3-related features will fail.',
hint=f'settings.{name} = {value!r}',
id='arkindex.E011',
))
......@@ -198,6 +160,6 @@ def public_hostname_check(*args, **kwargs):
return [Warning(
'The public_hostname setting is missing; absolute URLs may not be correctly generated.',
hint='settings.PUBLIC_HOSTNAME',
id='arkindex.W007',
id='arkindex.W008',
)]
return []
import subprocess
from pathlib import Path
from subprocess import CalledProcessError
from unittest.mock import call, patch
from unittest.mock import patch
from django.conf import settings
from django.core.checks import Error, Warning
......@@ -47,10 +45,10 @@ class ChecksTestCase(TestCase):
with self.settings(LOCAL_IMAGESERVER_ID=42):
self.assertListEqual(local_imageserver_check(), [
Error(
Warning(
'Local ImageServer with ID 42 does not exist',
hint='settings.LOCAL_IMAGESERVER_ID = 42',
id='arkindex.E004',
id='arkindex.W009',
),
])
......@@ -61,55 +59,6 @@ class ChecksTestCase(TestCase):
srv.delete()
@patch('arkindex.project.checks.subprocess.run')
@override_settings(
ARKINDEX_TASKS_IMAGE='nuh',
)
def test_docker_images_check(self, run_mock):
from arkindex.project.checks import docker_images_check
expected_calls = [
call(
['docker', 'image', 'inspect', 'nuh'],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
check=True,
),
]
run_mock.side_effect = CalledProcessError(1, '')
self.assertListEqual(docker_images_check(), [
Error(
'Docker image with tag "nuh" was not found.',
hint='settings.ARKINDEX_TASKS_IMAGE = "nuh"',
id='arkindex.E006',
)
])
self.assertEqual(run_mock.call_count, 1)
self.assertEqual(run_mock.call_args_list, expected_calls)
@patch('arkindex.project.checks.subprocess.run')
def test_docker_images_check_missing_client(self, run_mock):
"""
Test the Docker images check does not show errors if the Docker client is missing
"""
from arkindex.project.checks import docker_images_check
run_mock.side_effect = FileNotFoundError
with self.settings(ARKINDEX_APP_IMAGE='nope', ARKINDEX_TASKS_IMAGE='nuh'):
self.assertListEqual(docker_images_check(), [])
self.assertEqual(run_mock.call_count, 1)
self.assertEqual(run_mock.call_args_list, [
call(
['docker', 'image', 'inspect', 'nuh'],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
check=True,
),
])
@override_settings()
@patch('arkindex.project.checks.parse_recipe')
def test_ponos_recipe_check(self, parse_mock):
......@@ -180,6 +129,7 @@ class ChecksTestCase(TestCase):
del settings.AWS_SECRET_KEY
del settings.AWS_THUMBNAIL_BUCKET
del settings.AWS_STAGING_BUCKET
del settings.AWS_EXPORT_BUCKET
self.assertCountEqual(s3_check(), [
Error(
'AWS access key ID is missing; all S3-related features will fail.',
......@@ -201,12 +151,18 @@ class ChecksTestCase(TestCase):
hint='settings.AWS_STAGING_BUCKET = None',
id='arkindex.E011',
),
Error(
'S3 export bucket name is missing; all S3-related features will fail.',
hint='settings.AWS_EXPORT_BUCKET = None',
id='arkindex.E011',
),
])
settings.AWS_ACCESS_KEY = 'key'
settings.AWS_SECRET_KEY = 's3kr3t'
settings.AWS_THUMBNAIL_BUCKET = 'Thumbs.db'
settings.AWS_STAGING_BUCKET = 'buckette'
settings.AWS_EXPORT_BUCKET = 'devnull'
self.assertListEqual(s3_check(), [])
@override_settings()
......@@ -218,7 +174,7 @@ class ChecksTestCase(TestCase):
Warning(
'The public_hostname setting is missing; absolute URLs may not be correctly generated.',
hint='settings.PUBLIC_HOSTNAME',
id='arkindex.W007',
id='arkindex.W008',
)
])
......
INSERT INTO dataimport_workeractivity (element_id, worker_version_id, state, process_id, id, created, updated)
INSERT INTO dataimport_workeractivity (element_id, worker_version_id, configuration_id, state, process_id, id, created, updated)
SELECT elt.id,
'{worker_version_id}'::uuid,
'{configuration_id}'::uuid,
'{state}',
'{process_id}',
'{process_id}'::uuid,
uuid_generate_v4(),
current_timestamp,
current_timestamp
......@@ -11,4 +12,4 @@ FROM
FROM "documents_element"
INNER JOIN "documents_elementtype" ON ("documents_element"."type_id" = "documents_elementtype"."id")
WHERE ("documents_elementtype"."corpus_id" = '{corpus_id}'::uuid
AND "documents_elementtype"."slug" = 'act')) AS elt ON CONFLICT (element_id, worker_version_id) DO NOTHING
AND "documents_elementtype"."slug" = 'act')) AS elt ON CONFLICT DO NOTHING
INSERT INTO dataimport_workeractivity (element_id, worker_version_id, configuration_id, state, process_id, id, created, updated)
SELECT elt.id,
'{worker_version_id}'::uuid,
NULL,
'{state}',
'{process_id}'::uuid,
uuid_generate_v4(),
current_timestamp,
current_timestamp
FROM
(SELECT "documents_element"."id"
FROM "documents_element"
INNER JOIN "documents_elementtype" ON ("documents_element"."type_id" = "documents_elementtype"."id")
WHERE ("documents_elementtype"."corpus_id" = '{corpus_id}'::uuid
AND "documents_elementtype"."slug" = 'act')) AS elt ON CONFLICT DO NOTHING