diff --git a/arkindex/dataimport/models.py b/arkindex/dataimport/models.py index c3e47fea78d01254cd127221d88fda44fc1b50de..0a53eef9ac9d8a5b2504324f449b1df96c7094b4 100644 --- a/arkindex/dataimport/models.py +++ b/arkindex/dataimport/models.py @@ -77,7 +77,7 @@ class DataImport(IndexableModel): 'DB_PASSWORD': settings.DATABASES['default']['PASSWORD'], 'DB_NAME': settings.DATABASES['default']['NAME'], 'LOCAL_IMAGESERVER_ID': settings.LOCAL_IMAGESERVER_ID, - 'ES_HOST': settings.ELASTIC_SEARCH_HOSTS[0], + 'REDIS_HOST': settings.REDIS_HOST, # Some empty folder to bypass the system check 'ML_CLASSIFIERS_DIR': '/data/current', }, diff --git a/arkindex/documents/acts.py b/arkindex/documents/acts.py index 363b0382b23cfc8bda6d34601409561e1e3ef305..144d9a1667e5bae0845a37736a850958837dd53e 100644 --- a/arkindex/documents/acts.py +++ b/arkindex/documents/acts.py @@ -4,8 +4,8 @@ from django.db.models import CharField, Value, Prefetch from django.db.models.functions import Concat from arkindex_common.enums import MetaType from arkindex.project.polygon import Polygon +from arkindex.project.triggers import reindex_start from arkindex.images.models import Image -from arkindex.documents.indexer import Indexer from arkindex.documents.models import Element, ElementType, Corpus, MetaData import csv import logging @@ -154,4 +154,4 @@ class ActsImporter(object): assert failed < count, 'No acts were imported' logger.info('Updating search index') - Indexer().run_index(Element.objects.get_descending(self.folder.id).filter(type=self.act_type)) + reindex_start(element=self.folder, elements=True) diff --git a/arkindex/documents/api/admin.py b/arkindex/documents/api/admin.py new file mode 100644 index 0000000000000000000000000000000000000000..eb48ce5050d6e8d8672ff560e11937c4d7f15021 --- /dev/null +++ b/arkindex/documents/api/admin.py @@ -0,0 +1,16 @@ +from rest_framework.generics import CreateAPIView +from arkindex.project.permissions import IsAdminUser +from arkindex.documents.serializers.admin import ReindexConfigSerializer + + +class ReindexStart(CreateAPIView): + """ + Run an ElasticSearch indexation from the API. + """ + permission_classes = (IsAdminUser, ) + serializer_class = ReindexConfigSerializer + openapi_overrides = { + 'operationId': 'Reindex', + 'description': 'Manually reindex elements, transcriptions and entities for search APIs', + 'tags': ['management'] + } diff --git a/arkindex/documents/api/ml.py b/arkindex/documents/api/ml.py index 7ec27dfc6b2b0b3f54c2d2fd09bd281418b69c8d..eae17521bf5834dff9306ea53ac0b03395c5f844 100644 --- a/arkindex/documents/api/ml.py +++ b/arkindex/documents/api/ml.py @@ -17,14 +17,14 @@ from arkindex.documents.serializers.ml import ( TranscriptionsSerializer, TranscriptionCreateSerializer, DataSourceStatsSerializer, ClassificationsSelectionSerializer ) -from arkindex.documents.indexer import Indexer from arkindex.documents.pagexml import PageXmlParser from arkindex.images.models import Zone -from arkindex.images.importer import build_transcriptions, save_transcriptions, index_transcriptions +from arkindex.images.importer import build_transcriptions, save_transcriptions +from arkindex.project.mixins import SelectionMixin from arkindex.project.parsers import XMLParser from arkindex.project.permissions import IsVerified, IsAdminUser from arkindex.project.polygon import Polygon -from arkindex.project.mixins import SelectionMixin +from arkindex.project.triggers import reindex_start import os.path import logging @@ -63,9 +63,7 @@ class TranscriptionCreate(CreateAPIView): ts.save() # Index in ES - Indexer().run_index( - Transcription.objects.filter(id=ts.id) - ) + reindex_start(element=element, transcriptions=True, elements=True) return ts def create(self, request, *args, **kwargs): @@ -150,7 +148,7 @@ class TranscriptionBulk(CreateAPIView, UpdateAPIView): source=source, )) - index_transcriptions(transcriptions) + reindex_start(element=parent, transcriptions=True, elements=True) return Response(None, status=status.HTTP_201_CREATED, headers=headers) @@ -375,11 +373,7 @@ class PageXmlTranscriptionsImport(CreateModelMixin, APIView): len(entities_id) )) - if entities_id: - # Reindex entities to add new entities in ElasticSearch - Indexer().run_index( - Entity.objects.filter(id__in=entities_id) - ) + reindex_start(element=element, transcriptions=True, elements=True, entities=True) return Response( status=status.HTTP_201_CREATED, diff --git a/arkindex/documents/consumers.py b/arkindex/documents/consumers.py new file mode 100644 index 0000000000000000000000000000000000000000..e2d77a02b8de91184ea06398457da14c91420442 --- /dev/null +++ b/arkindex/documents/consumers.py @@ -0,0 +1,50 @@ +from channels.consumer import SyncConsumer +from django.db.models import Q +from arkindex.documents.indexer import Indexer +from arkindex.documents.models import Element, Transcription, Entity + + +class ReindexConsumer(SyncConsumer): + + def reindex_start(self, message): + corpus_id, element_id, transcriptions, elements, entities, drop = map( + message.get, + ('corpus', 'element', 'transcriptions', 'elements', 'entities', 'drop'), + (None, None, True, True, True, False), # Default values + ) + indexer = Indexer() + + if drop: + if transcriptions: + indexer.drop_index(Transcription.es_document) + if elements: + indexer.drop_index(Element.es_document) + if entities: + indexer.drop_index(Entity.es_document) + indexer.setup() + + if element_id or corpus_id: + if element_id: + # Pick this element, and all its children + elements_queryset = list(Element.objects.get_descending(element_id)) + elements_queryset.append(Element.objects.get(id=element_id)) + else: + # Pick all elements in the corpus + elements_queryset = Element.objects.filter(corpus_id=corpus_id) + + transcriptions_queryset = Transcription.objects.filter(element__in=elements_queryset) + entities_queryset = Entity.objects.filter( + Q(metadatas__element__in=elements_queryset) + | Q(transcriptions__element__in=elements_queryset) + ) + else: + transcriptions_queryset = Transcription.objects.all() + elements_queryset = Element.objects.all() + entities_queryset = Entity.objects.all() + + if transcriptions: + indexer.run_index(transcriptions_queryset, bulk_size=400) + if elements: + indexer.run_index(elements_queryset, bulk_size=100) + if entities: + indexer.run_index(entities_queryset, bulk_size=400) diff --git a/arkindex/documents/managers.py b/arkindex/documents/managers.py index 25eb0fbaf8e5a22e00d86d72f49dff9c5d3439ac..efd832dc71e40f0163320d72788b408af648c105 100644 --- a/arkindex/documents/managers.py +++ b/arkindex/documents/managers.py @@ -170,3 +170,17 @@ class CorpusManager(models.Manager): # Authenticated users can write only on corpora with ACL return qs.filter(corpus_right__user=user, corpus_right__can_write=True).distinct() + + def admin(self, user): + # An anonymous user cannot manage anything + if user.is_anonymous: + return super().none() + + qs = super().get_queryset().order_by('name') + + # Admins and internal users have access to every corpus + if user.is_admin or user.is_internal: + return qs.all() + + # Authenticated users can manage only corpora with ACL + return qs.filter(corpus_right__user=user, corpus_right__can_admin=True).distinct() diff --git a/arkindex/documents/pagexml.py b/arkindex/documents/pagexml.py index e95663bc986ec9466e2b9d8dab61de691631d06c..e8e8fc76e8fa95addd4a58ae5d62e158b159f69b 100644 --- a/arkindex/documents/pagexml.py +++ b/arkindex/documents/pagexml.py @@ -5,7 +5,6 @@ from arkindex.project.polygon import Polygon from arkindex_common.enums import TranscriptionType from arkindex.documents.models import \ DataSource, Element, Entity, EntityRole, EntityLink, TranscriptionEntity -from arkindex.documents.indexer import Indexer import functools import string import Levenshtein @@ -68,19 +67,6 @@ class PageXmlParser(object): score=1 if region.confidence is None else region.confidence, ) - def index(self, element): - """ - Create or Update ElasticSearch index for transcriptions and related elements - """ - assert isinstance(element, Element), 'Element should be an Arkindex element' - es_indexer = Indexer() - es_indexer.run_index( - Element.objects.filter(id=element.id), - ) - es_indexer.run_index( - element.transcriptions.all(), - ) - def save(self, element): assert isinstance(element, Element), 'Element should be an Arkindex element' if self.pagexml_page.page.text_regions is None or not len(self.pagexml_page.page.text_regions): @@ -115,7 +101,6 @@ class PageXmlParser(object): region_ts_count, line_ts_count, )) - self.index(element) return transcriptions def merge(self, blocks): diff --git a/arkindex/documents/serializers/admin.py b/arkindex/documents/serializers/admin.py new file mode 100644 index 0000000000000000000000000000000000000000..0377d233a19d6c2c61b650f416d79839aca77e55 --- /dev/null +++ b/arkindex/documents/serializers/admin.py @@ -0,0 +1,42 @@ +from rest_framework import serializers +from arkindex.project.triggers import reindex_start +from arkindex.documents.models import Corpus, Element + + +class ReindexConfigSerializer(serializers.Serializer): + corpus = serializers.PrimaryKeyRelatedField(queryset=Corpus.objects.all(), required=False) + element = serializers.PrimaryKeyRelatedField(queryset=Element.objects.all(), required=False) + transcriptions = serializers.BooleanField(default=True) + elements = serializers.BooleanField(default=True) + entities = serializers.BooleanField(default=True) + drop = serializers.BooleanField(default=False) + + default_error_messages = { + 'reindex_nothing': 'At least one index type is required.', + 'filtered_drop': '`drop` can only be used when reindexing everything.', + 'element_in_corpus': 'The selected element is not in the selected corpus.', + } + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + if not self.context.get('request'): + return + user = self.context.get('request').user + self.fields['corpus'].queryset = Corpus.objects.admin(user) + self.fields['element'].queryset = Element.objects.filter(corpus__in=Corpus.objects.admin(user)) + + def validate(self, data): + corpus, element, transcriptions, elements, entities, drop = map( + data.get, + ('corpus', 'element', 'transcriptions', 'elements', 'entities', 'drop') + ) + if not (transcriptions or elements or entities): + self.fail('reindex_nothing') + if drop and (corpus or element): + self.fail('filtered_drop') + if element and corpus and element.corpus_id != corpus.id: + self.fail('element_in_corpus') + return data + + def save(self): + reindex_start(**self.validated_data) diff --git a/arkindex/documents/tei.py b/arkindex/documents/tei.py index 078892f754ea1c746e2dd57f799bf8989e3970cb..0da96c338e1f218848a614489c0cb3f86945f670 100644 --- a/arkindex/documents/tei.py +++ b/arkindex/documents/tei.py @@ -2,8 +2,8 @@ from itertools import groupby from arkindex_common.tei import TeiElement, TeiParser as BaseTeiParser from arkindex_common.enums import MetaType from arkindex.project.tools import find_closest -from arkindex.documents.indexer import Indexer -from arkindex.documents.models import Element, Entity, MetaData, DataSource, MLToolType +from arkindex.project.triggers import reindex_start +from arkindex.documents.models import Element, Entity, DataSource, MLToolType import logging @@ -147,11 +147,7 @@ class TeiParser(BaseTeiParser): """ Create or Update ElasticSearch index for indexed elements """ - md_ids = (md.id for md in db_metadatas) - date_metadatas = MetaData.objects.filter(id__in=md_ids, type=MetaType.Date) - - elements = Element.objects \ - .filter(metadatas__in=date_metadatas, type__folder=False, type__hidden=False) \ - .prefetch_related('metadatas', 'transcriptions') + md_ids = (md.id for md in db_metadatas if md.type == MetaType.Date) + elements = Element.objects.filter(metadatas__id__in=md_ids, type__folder=False, type__hidden=False) if elements.exists(): - Indexer().run_index(elements) + reindex_start(corpus=self.db_corpus, elements=True) diff --git a/arkindex/documents/tests/test_acts_importer.py b/arkindex/documents/tests/test_acts_importer.py index 2216131d0328767809bc3c91ba2b69f9ac94acfd..36bd4cb29082a278a672fa0ab8ee709d8ce93487 100644 --- a/arkindex/documents/tests/test_acts_importer.py +++ b/arkindex/documents/tests/test_acts_importer.py @@ -1,5 +1,6 @@ from unittest.mock import patch from pathlib import Path +from asyncmock import AsyncMock from arkindex_common.enums import MetaType from arkindex.project.tests import FixtureTestCase from arkindex.project.polygon import Polygon @@ -22,7 +23,9 @@ class TestActsImporter(FixtureTestCase): cls.surface_type = cls.corpus.types.get(slug='surface') cls.volume = cls.corpus.elements.get(name='Volume 2', type=cls.folder_type) - def test_import(self): + @patch('arkindex.project.triggers.get_channel_layer') + def test_import(self, get_layer_mock): + get_layer_mock().send = AsyncMock() self.assertFalse( Element.objects .get_descending(self.volume.id) @@ -83,6 +86,16 @@ class TestActsImporter(FixtureTestCase): self.assertEqual(surface.zone.image, self.img5) self.assertEqual(surface.zone.polygon, Polygon.from_coords(0, 500, 1000, 500)) # Second half + get_layer_mock().send.assert_called_once_with('reindex', { + 'type': 'reindex.start', + 'element': str(self.volume.id), + 'corpus': None, + 'transcriptions': False, + 'elements': True, + 'entities': False, + 'drop': False, + }) + def test_epic_fail(self): importer = ActsImporter( ACT_SAMPLES / 'volume 2.csv', diff --git a/arkindex/documents/tests/test_admin_api.py b/arkindex/documents/tests/test_admin_api.py new file mode 100644 index 0000000000000000000000000000000000000000..d1cf40a44b6c579e29c249d661c5dffc9873f592 --- /dev/null +++ b/arkindex/documents/tests/test_admin_api.py @@ -0,0 +1,74 @@ +from unittest.mock import patch +from asyncmock import AsyncMock +from django.urls import reverse +from rest_framework import status +from arkindex.project.tests import FixtureTestCase +from arkindex.documents.models import Corpus + + +class TestAdminAPI(FixtureTestCase): + + def test_reindex_requires_login(self): + response = self.client.post(reverse('api:reindex-start'), {}) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + + def test_reindex_requires_admin(self): + self.client.force_login(self.user) + response = self.client.post(reverse('api:reindex-start'), {}) + self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) + + def test_reindex_requires_index_types(self): + self.client.force_login(self.superuser) + response = self.client.post(reverse('api:reindex-start'), { + 'transcriptions': False, + 'elements': False, + 'entities': False, + }) + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertDictEqual(response.json(), { + 'non_field_errors': ['At least one index type is required.'], + }) + + def test_reindex_drop_only_all(self): + self.client.force_login(self.superuser) + response = self.client.post(reverse('api:reindex-start'), { + 'corpus': str(self.corpus.id), + 'drop': True, + }) + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertDictEqual(response.json(), { + 'non_field_errors': ['`drop` can only be used when reindexing everything.'], + }) + + def test_reindex_element_in_corpus(self): + corpus2 = Corpus.objects.create(name='some corpus') + self.client.force_login(self.superuser) + response = self.client.post(reverse('api:reindex-start'), { + 'corpus': str(corpus2.id), + 'element': str(self.corpus.elements.first().id), + }) + self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) + self.assertDictEqual(response.json(), { + 'non_field_errors': ['The selected element is not in the selected corpus.'], + }) + + @patch('arkindex.project.triggers.get_channel_layer') + def test_reindex(self, get_layer_mock): + get_layer_mock.return_value.send = AsyncMock() + + self.client.force_login(self.superuser) + response = self.client.post(reverse('api:reindex-start'), {}) + self.assertEqual(response.status_code, status.HTTP_201_CREATED) + + get_layer_mock.return_value.send.assert_called_once_with( + 'reindex', + { + 'element': None, + 'corpus': None, + 'transcriptions': True, + 'elements': True, + 'entities': True, + 'drop': False, + 'type': 'reindex.start', + }, + ) diff --git a/arkindex/documents/tests/test_consumers.py b/arkindex/documents/tests/test_consumers.py new file mode 100644 index 0000000000000000000000000000000000000000..13e82f0293f0ceb32306f32babb591c871834c89 --- /dev/null +++ b/arkindex/documents/tests/test_consumers.py @@ -0,0 +1,171 @@ +from unittest.mock import patch +from django.db.models import Q +from arkindex_common.enums import TranscriptionType, MetaType, EntityType +from arkindex.project.tests import FixtureTestCase +from arkindex.documents.consumers import ReindexConsumer +from arkindex.documents.models import Corpus, Element, Transcription, Entity, DataSource + + +@patch('arkindex.documents.consumers.Indexer') +class TestConsumers(FixtureTestCase): + + @classmethod + def setUpTestData(cls): + super().setUpTestData() + source = DataSource.objects.get(slug='test') + + cls.folder = cls.corpus.elements.get(name='Volume 1') + cls.folder.metadatas.create( + type=MetaType.Entity, + name='something', + value='Some entity', + entity=cls.corpus.entities.create( + type=EntityType.Person, + name='Some entity', + source=source, + ) + ) + + corpus2 = Corpus.objects.create(name='Another corpus') + element2 = corpus2.elements.create( + type=corpus2.types.create(display_name='Element'), + name='An element', + ) + ts = element2.transcriptions.create( + score=0.8, + text='something', + type=TranscriptionType.Word, + source=source, + ) + ts.transcription_entities.create( + entity=corpus2.entities.create( + type=EntityType.Misc, + name='Some other entity', + source=source, + ), + offset=0, + length=1, + ) + + def assertQuerysetEqual(self, queryset1, queryset2, **options): + """ + Make Django's assertQuerysetEqual slightly nicer to use + """ + options.setdefault('ordered', False) + return super().assertQuerysetEqual(queryset1, map(repr, queryset2), **options) + + def _assert_all_elements(self, call_args): + (queryset, ), kwargs = call_args + self.assertQuerysetEqual(queryset, Element.objects.all()) + self.assertDictEqual(kwargs, {'bulk_size': 100}) + + def _assert_all_entities(self, call_args): + (queryset, ), kwargs = call_args + self.assertQuerysetEqual(queryset, Entity.objects.all()) + self.assertDictEqual(kwargs, {'bulk_size': 400}) + + def _assert_all_transcriptions(self, call_args): + (queryset, ), kwargs = call_args + self.assertQuerysetEqual(queryset, Transcription.objects.all()) + self.assertDictEqual(kwargs, {'bulk_size': 400}) + + def _assert_all(self, mock): + self.assertEqual(mock().run_index.call_count, 3) + elements_call, entities_call, ts_call = sorted(mock().run_index.call_args_list, key=repr) + self._assert_all_elements(elements_call) + self._assert_all_entities(entities_call) + self._assert_all_transcriptions(ts_call) + + def test_reindex_all(self, mock): + ReindexConsumer({}).reindex_start({}) + self.assertEqual(mock().drop_index.call_count, 0) + self._assert_all(mock) + + def test_reindex_drop(self, mock): + ReindexConsumer({}).reindex_start({ + 'drop': True, + }) + self.assertEqual(mock().drop_index.call_count, 3) + mock().drop_index.assert_any_call(Element.es_document) + mock().drop_index.assert_any_call(Entity.es_document) + mock().drop_index.assert_any_call(Transcription.es_document) + self._assert_all(mock) + + def test_reindex_only_transcriptions(self, mock): + ReindexConsumer({}).reindex_start({ + 'transcriptions': True, + 'entities': False, + 'elements': False, + }) + self.assertEqual(mock().drop_index.call_count, 0) + self.assertEqual(mock().run_index.call_count, 1) + self._assert_all_transcriptions(mock().run_index.call_args) + + def test_reindex_only_elements(self, mock): + ReindexConsumer({}).reindex_start({ + 'transcriptions': False, + 'entities': False, + 'elements': True, + }) + self.assertEqual(mock().drop_index.call_count, 0) + self.assertEqual(mock().run_index.call_count, 1) + self._assert_all_elements(mock().run_index.call_args) + + def test_reindex_only_entities(self, mock): + ReindexConsumer({}).reindex_start({ + 'transcriptions': False, + 'entities': True, + 'elements': False, + }) + self.assertEqual(mock().drop_index.call_count, 0) + self.assertEqual(mock().run_index.call_count, 1) + self._assert_all_entities(mock().run_index.call_args) + + def test_reindex_corpus(self, mock): + ReindexConsumer({}).reindex_start({ + 'corpus': str(self.corpus.id), + }) + self.assertEqual(mock().drop_index.call_count, 0) + self.assertEqual(mock().run_index.call_count, 3) + elements_call, entities_call, ts_call = sorted(mock().run_index.call_args_list, key=repr) + + (queryset, ), kwargs = elements_call + self.assertQuerysetEqual(queryset, self.corpus.elements.all()) + self.assertDictEqual(kwargs, {'bulk_size': 100}) + + (queryset, ), kwargs = entities_call + self.assertQuerysetEqual(queryset, self.corpus.entities.all()) + self.assertDictEqual(kwargs, {'bulk_size': 400}) + + (queryset, ), kwargs = ts_call + self.assertQuerysetEqual(queryset, Transcription.objects.filter(element__corpus_id=self.corpus.id)) + self.assertDictEqual(kwargs, {'bulk_size': 400}) + + def test_reindex_element(self, mock): + ReindexConsumer({}).reindex_start({ + 'element': str(self.folder.id), + }) + self.assertEqual(mock().drop_index.call_count, 0) + self.assertEqual(mock().run_index.call_count, 3) + entities_call, ts_call, elements_call = sorted(mock().run_index.call_args_list, key=repr) + + elements_list = list(Element.objects.get_descending(self.folder.id)) + elements_list.append(self.folder) + + (queryset, ), kwargs = elements_call + self.assertQuerysetEqual(queryset, elements_list) + self.assertDictEqual(kwargs, {'bulk_size': 100}) + + (queryset, ), kwargs = entities_call + self.assertQuerysetEqual( + queryset, + Entity.objects.filter( + Q(metadatas__element__in=elements_list) + | Q(transcriptions__element__in=elements_list) + ), + ) + self.assertDictEqual(kwargs, {'bulk_size': 400}) + + (queryset, ), kwargs = ts_call + self.assertQuerysetEqual(queryset, Transcription.objects.filter(element__in=elements_list)) + self.assertDictEqual(kwargs, {'bulk_size': 400}) diff --git a/arkindex/documents/tests/test_pagexml.py b/arkindex/documents/tests/test_pagexml.py index 4213a017bc43a69bd6040161d7880ca99a2eb65e..902a60cdfaa7b679525e36c309fd490c2c9477c7 100644 --- a/arkindex/documents/tests/test_pagexml.py +++ b/arkindex/documents/tests/test_pagexml.py @@ -1,4 +1,6 @@ from pathlib import Path +from unittest.mock import patch +from asyncmock import AsyncMock from django.urls import reverse from rest_framework import status from arkindex.project.tests import FixtureAPITestCase @@ -37,7 +39,9 @@ class TestPageXml(FixtureAPITestCase): ) self.assertEqual(resp.status_code, status.HTTP_403_FORBIDDEN) - def test_pagexml_import(self): + @patch('arkindex.project.triggers.get_channel_layer') + def test_pagexml_import(self, get_layer_mock): + get_layer_mock.return_value.send = AsyncMock() self.assertFalse(self.page.transcriptions.exists()) self.client.force_login(self.user) with (FIXTURES / 'transcript.xml').open() as f: @@ -60,6 +64,15 @@ class TestPageXml(FixtureAPITestCase): ]) # All transcriptions have a score of 100% self.assertFalse(self.page.transcriptions.exclude(score=1.0).exists()) + get_layer_mock().send.assert_called_once_with('reindex', { + 'type': 'reindex.start', + 'element': str(self.page.id), + 'corpus': None, + 'transcriptions': True, + 'entities': True, + 'elements': True, + 'drop': False, + }) def test_pagexml_import_requires_zone(self): volume = self.corpus.elements.get(name='Volume 1') @@ -74,7 +87,9 @@ class TestPageXml(FixtureAPITestCase): ) self.assertEqual(resp.status_code, status.HTTP_404_NOT_FOUND) - def test_pagexml_import_any_element(self): + @patch('arkindex.project.triggers.get_channel_layer') + def test_pagexml_import_any_element(self, get_layer_mock): + get_layer_mock.return_value.send = AsyncMock() surface = self.corpus.elements.get(name='Surface A') self.client.force_login(self.user) with (FIXTURES / 'transcript.xml').open() as f: @@ -97,6 +112,15 @@ class TestPageXml(FixtureAPITestCase): ]) # All transcriptions have a score of 100% self.assertFalse(surface.transcriptions.exclude(score=1.0).exists()) + get_layer_mock().send.assert_called_once_with('reindex', { + 'type': 'reindex.start', + 'element': str(surface.id), + 'corpus': None, + 'transcriptions': True, + 'elements': True, + 'entities': True, + 'drop': False, + }) def test_pagexml_create_blocks(self): parser = PageXmlParser(FIXTURES / 'create_blocks.xml') diff --git a/arkindex/documents/tests/test_transcription_create.py b/arkindex/documents/tests/test_transcription_create.py index 999351ce98dbcacefa340f095ba39a038b427f3d..050acf8170d03c84ca74ab02c1f12a16e98835b0 100644 --- a/arkindex/documents/tests/test_transcription_create.py +++ b/arkindex/documents/tests/test_transcription_create.py @@ -1,5 +1,6 @@ from django.urls import reverse from unittest.mock import patch +from asyncmock import AsyncMock from rest_framework import status from arkindex.project.tests import FixtureAPITestCase from arkindex.project.polygon import Polygon @@ -24,11 +25,13 @@ class TestTranscriptionCreate(FixtureAPITestCase): response = self.client.post(reverse('api:transcription-create'), format='json') self.assertEqual(response.status_code, status.HTTP_403_FORBIDDEN) - @patch('arkindex.documents.api.ml.Indexer') - def test_create_transcription(self, indexer): + @patch('arkindex.project.triggers.get_channel_layer') + def test_create_transcription(self, get_layer_mock): """ Checks the view creates transcriptions, zones, links, paths and runs ES indexing """ + get_layer_mock.return_value.send = AsyncMock() + self.client.force_login(self.user) response = self.client.post(reverse('api:transcription-create'), format='json', data={ "type": "word", @@ -39,18 +42,29 @@ class TestTranscriptionCreate(FixtureAPITestCase): "score": 0.83, }) self.assertEqual(response.status_code, status.HTTP_201_CREATED) + new_ts = Transcription.objects.get(text="NEKUDOTAYIM", type=TranscriptionType.Word) self.assertEqual(new_ts.zone.polygon, Polygon.from_coords(0, 0, 100, 100)) self.assertEqual(new_ts.score, 0.83) self.assertEqual(new_ts.source, self.src) self.assertTrue(self.page.transcriptions.filter(pk=new_ts.id).exists()) - self.assertTrue(indexer.return_value.run_index.called) - @patch('arkindex.documents.api.ml.Indexer') - def test_unique_zone(self, indexer): + get_layer_mock().send.assert_called_once_with('reindex', { + 'type': 'reindex.start', + 'element': str(self.page.id), + 'corpus': None, + 'transcriptions': True, + 'elements': True, + 'entities': False, + 'drop': False, + }) + + @patch('arkindex.project.triggers.get_channel_layer') + def test_unique_zone(self, get_layer_mock): """ Checks the view reuses zones when available """ + get_layer_mock.return_value.send = AsyncMock() self.client.force_login(self.user) ts = Transcription.objects.get(zone__image__path='img1', text="PARIS") response = self.client.post(reverse('api:transcription-create'), format='json', data={ @@ -63,12 +77,22 @@ class TestTranscriptionCreate(FixtureAPITestCase): }) self.assertEqual(response.status_code, status.HTTP_201_CREATED) self.assertEqual(Transcription.objects.get(text="GLOUBIBOULGA").zone.id, ts.zone.id) + get_layer_mock().send.assert_called_once_with('reindex', { + 'type': 'reindex.start', + 'element': str(self.page.id), + 'corpus': None, + 'transcriptions': True, + 'elements': True, + 'entities': False, + 'drop': False, + }) - @patch('arkindex.documents.api.ml.Indexer') - def test_update_transcription(self, indexer): + @patch('arkindex.project.triggers.get_channel_layer') + def test_update_transcription(self, get_layer_mock): """ Checks the view updates transcriptions when they already exist """ + get_layer_mock.return_value.send = AsyncMock() self.client.force_login(self.user) ts = Transcription.objects.get(zone__image__path='img1', text="PARIS") self.assertNotEqual(ts.score, 0.99) @@ -85,6 +109,16 @@ class TestTranscriptionCreate(FixtureAPITestCase): ts.refresh_from_db() self.assertEqual(ts.score, 0.99) + get_layer_mock().send.assert_called_once_with('reindex', { + 'type': 'reindex.start', + 'element': str(self.page.id), + 'corpus': None, + 'transcriptions': True, + 'elements': True, + 'entities': False, + 'drop': False, + }) + def test_invalid_data(self): """ Checks the view validates data properly @@ -123,12 +157,13 @@ class TestTranscriptionCreate(FixtureAPITestCase): self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) @patch('arkindex.project.serializer_fields.MLTool.get') - @patch('arkindex.images.importer.Indexer') - def test_create_bulk_transcription(self, indexer, ml_get_mock): + @patch('arkindex.project.triggers.get_channel_layer') + def test_create_bulk_transcription(self, get_layer_mock, ml_get_mock): """ Checks the view creates transcriptions, zones, links, paths and runs ES indexing Using bulk_transcriptions """ + get_layer_mock.return_value.send = AsyncMock() ml_get_mock.return_value.type = self.src.type ml_get_mock.return_value.slug = self.src.slug ml_get_mock.return_value.name = self.src.name @@ -176,18 +211,15 @@ class TestTranscriptionCreate(FixtureAPITestCase): self.assertEqual(page_ts.source, self.src) self.assertEqual(page_ts.zone, self.page.zone) - # Indexer called - self.assertEqual(indexer.return_value.run_index.call_count, 2) - indexer.reset_mock() - - # Call once more, ensuring creating the exact same transcriptions does not make duplicates - response = self.client.post(reverse('api:transcription-bulk'), format='json', data=request_data) - self.assertEqual(self.page.transcriptions.count(), 3) - self.assertCountEqual( - self.page.transcriptions.values_list('id', flat=True), - [word_ts.id, line_ts.id, page_ts.id], - ) - self.assertEqual(indexer.return_value.run_index.call_count, 0) + get_layer_mock().send.assert_called_once_with('reindex', { + 'type': 'reindex.start', + 'corpus': None, + 'element': str(self.page.id), + 'transcriptions': True, + 'elements': True, + 'entities': False, + 'drop': False, + }) @patch('arkindex.project.serializer_fields.MLTool.get') def test_bulk_transcription_no_zone(self, ml_get_mock): @@ -216,8 +248,9 @@ class TestTranscriptionCreate(FixtureAPITestCase): self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST) @patch('arkindex.project.serializer_fields.MLTool.get') - @patch('arkindex.images.importer.Indexer') - def test_update_bulk_transcription(self, indexer, ml_get_mock): + @patch('arkindex.project.triggers.get_channel_layer') + def test_update_bulk_transcription(self, get_layer_mock, ml_get_mock): + get_layer_mock.return_value.send = AsyncMock() ml_get_mock.return_value.type = self.src.type ml_get_mock.return_value.slug = self.src.slug ml_get_mock.return_value.name = self.src.name @@ -258,11 +291,18 @@ class TestTranscriptionCreate(FixtureAPITestCase): self.assertEqual(page_ts.source, self.src) self.assertEqual(page_ts.zone, self.page.zone) - # Indexer called - self.assertEqual(indexer.return_value.run_index.call_count, 2) + get_layer_mock().send.assert_called_once_with('reindex', { + 'type': 'reindex.start', + 'corpus': None, + 'element': str(self.page.id), + 'transcriptions': True, + 'elements': True, + 'entities': False, + 'drop': False, + }) # Update again - indexer.reset_mock() + get_layer_mock().send.reset_mock() data['transcriptions'] = [ { "type": "word", @@ -280,7 +320,15 @@ class TestTranscriptionCreate(FixtureAPITestCase): response = self.client.put(reverse('api:transcription-bulk'), format='json', data=data) self.assertEqual(response.status_code, status.HTTP_201_CREATED) - self.assertEqual(indexer.return_value.run_index.call_count, 2) + get_layer_mock().send.assert_called_once_with('reindex', { + 'type': 'reindex.start', + 'corpus': None, + 'element': str(self.page.id), + 'transcriptions': True, + 'elements': True, + 'entities': False, + 'drop': False, + }) # Previous transcriptions should be replaced by two Word and one Page transcription self.assertEqual(self.page.transcriptions.count(), 3) diff --git a/arkindex/images/importer.py b/arkindex/images/importer.py index 46313dc619b8ec4b467438451fabbebb3c8c7c49..3b7332f19eb398c2534254675b359966c651ef53 100644 --- a/arkindex/images/importer.py +++ b/arkindex/images/importer.py @@ -2,8 +2,7 @@ from collections import namedtuple from django.db import connection from arkindex_common.enums import TranscriptionType from arkindex.project.polygon import Polygon -from arkindex.documents.indexer import Indexer -from arkindex.documents.models import Element, Transcription +from arkindex.documents.models import Element from arkindex.images.models import Image, Zone import csv import io @@ -162,32 +161,3 @@ def save_transcriptions(*tr_polygons, delimiter='\t', quotechar='"'): ) return transcriptions - - -def index_transcriptions(items): - ''' - Index in ElasticSearch new transcriptions built above - TODO: Delete if both the PonosCommand deletion and Channels reindex are merged! - ''' - transcriptions = [] - for item in items: - if isinstance(item, tuple): - transcriptions.append(Transcription( - **dict(zip( - ('id', 'element_id', 'source_id', 'zone_id', 'type', 'text', 'score'), - item, - )) - )) - elif isinstance(item, Transcription): - transcriptions.append(item) - else: - raise ValueError('Items must be a list of tuples or Transcription instances') - - # Index transcriptions directly (IIIF search) - indexer = Indexer() - indexer.run_index(transcriptions) - - # Index transcriptions in elements - elements = Element.objects.filter(id__in=[t.element_id for t in transcriptions]) - if elements.exists(): - indexer.run_index(elements) diff --git a/arkindex/project/api_v1.py b/arkindex/project/api_v1.py index 4af5f90b57455d8cb0d02b804104d25df056e9d3..ff0c4714016725181e474357d9d2c572b9213b90 100644 --- a/arkindex/project/api_v1.py +++ b/arkindex/project/api_v1.py @@ -18,6 +18,7 @@ from arkindex.documents.api.entities import ( ElementLinks ) from arkindex.documents.api.iiif import FolderManifest, ElementAnnotationList, TranscriptionSearchAnnotationList +from arkindex.documents.api.admin import ReindexStart from arkindex.dataimport.api import ( DataImportsList, DataImportDetails, DataImportRetry, DataFileList, DataFileRetrieve, DataFileUpload, DataImportFromFiles, @@ -149,4 +150,7 @@ api = [ path('user/token/', UserEmailVerification.as_view(), name='user-token'), path('user/password-reset/', PasswordReset.as_view(), name='password-reset'), path('user/password-reset/confirm/', PasswordResetConfirm.as_view(), name='password-reset-confirm'), + + # Management tools + path('reindex/', ReindexStart.as_view(), name='reindex-start'), ] diff --git a/arkindex/project/routing.py b/arkindex/project/routing.py index 60f19878e014f3d94172b9f37f8f39f234636245..8fb6189ba68deef4e66e4eb5f648932de29453d7 100644 --- a/arkindex/project/routing.py +++ b/arkindex/project/routing.py @@ -1,6 +1,8 @@ from channels.auth import AuthMiddlewareStack -from channels.routing import ProtocolTypeRouter, URLRouter +from channels.routing import ProtocolTypeRouter, URLRouter, ChannelNameRouter from channels.security.websocket import AllowedHostsOriginValidator +from arkindex.project.triggers import REINDEX_CHANNEL +from arkindex.documents.consumers import ReindexConsumer application = ProtocolTypeRouter({ 'websocket': AllowedHostsOriginValidator( @@ -9,4 +11,7 @@ application = ProtocolTypeRouter({ ]), ), ), + 'channel': ChannelNameRouter({ + REINDEX_CHANNEL: ReindexConsumer, + }) }) diff --git a/arkindex/project/settings.py b/arkindex/project/settings.py index d860f56880919a39b2ea7de681c18a88e785caec..eab4794635ccd8a310ceea7e30db2e230e7bfc0e 100644 --- a/arkindex/project/settings.py +++ b/arkindex/project/settings.py @@ -280,12 +280,13 @@ CACHES = { } # Django Channels layer using Redis +REDIS_HOST = os.environ.get('REDIS_HOST', 'localhost') CHANNEL_LAYERS = { "default": { "BACKEND": "channels_redis.core.RedisChannelLayer", "CONFIG": { "hosts": [ - (os.environ.get('REDIS_HOST', 'localhost'), 6379) + (REDIS_HOST, 6379) ], }, }, diff --git a/arkindex/project/triggers.py b/arkindex/project/triggers.py new file mode 100644 index 0000000000000000000000000000000000000000..472c915da0269aafbe5e1c286e6e775a784bbbe9 --- /dev/null +++ b/arkindex/project/triggers.py @@ -0,0 +1,51 @@ +""" +Helper methods to trigger tasks in asynchronous workers +""" +from typing import Union +from uuid import UUID +from asgiref.sync import async_to_sync +from channels.layers import get_channel_layer +from arkindex.documents.models import Element, Corpus + +REINDEX_CHANNEL = 'reindex' + +ACTION_REINDEX_START = 'reindex.start' + + +def reindex_start(*, + element: Union[Element, UUID, str] = None, + corpus: Union[Corpus, UUID, str] = None, + transcriptions: bool = False, + elements: bool = False, + entities: bool = False, + drop: bool = False) -> None: + """ + Reindex elements into ElasticSearch. + + If `element` and `corpus` are left unspecified, all elements will be picked. + If `element` is specified, its children will be included. + + `drop` will REMOVE all indexes entirely and recreate them before indexing. + This is only allowed when reindexing all elements. + """ + element_id = None + corpus_id = None + if isinstance(element, Element): + element_id = str(element.id) + elif element: + element_id = str(element) + + if isinstance(corpus, Corpus): + corpus_id = str(corpus.id) + elif corpus: + corpus_id = str(corpus) + + async_to_sync(get_channel_layer().send)(REINDEX_CHANNEL, { + 'type': ACTION_REINDEX_START, + 'element': element_id, + 'corpus': corpus_id, + 'transcriptions': transcriptions, + 'elements': elements, + 'entities': entities, + 'drop': drop, + }) diff --git a/openapi/patch.yml b/openapi/patch.yml index 9d6b6ae748ffb9128d777e9f40cd7acde3a0e82f..3dad786b97eb128dc7315464e1bbc553d56b97f6 100644 --- a/openapi/patch.yml +++ b/openapi/patch.yml @@ -39,6 +39,8 @@ tags: description: Machine Learning tools and results - name: entities - name: users + - name: management + description: Admin-only tools paths: /api/v1/classification/bulk/: post: diff --git a/tests-requirements.txt b/tests-requirements.txt index 6c1358c164ff9bac9813f2de73bb6a8f2e9d9b3d..8381422153315868f5fa9f0085c686e5a0b3a6ae 100644 --- a/tests-requirements.txt +++ b/tests-requirements.txt @@ -4,3 +4,4 @@ django-nose coverage uritemplate==3 responses +asyncmock==0.4.1