From d94722e934955f7ebea5943a56086db1eff49d8b Mon Sep 17 00:00:00 2001
From: Bastien Abadie <bastien@nextcairn.com>
Date: Fri, 20 Oct 2017 12:38:57 +0200
Subject: [PATCH] Use bulk insert for indexes

---
 requirements.txt                              |  6 +++
 .../management/commands/import_indexes.py     | 20 ++++++--
 src/documents/models.py                       | 46 +++++++++++++------
 3 files changed, 56 insertions(+), 16 deletions(-)

diff --git a/requirements.txt b/requirements.txt
index ee06c330d0..0307fbc37f 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,6 +1,12 @@
+certifi==2017.7.27.1
+chardet==3.0.4
 Django==1.11.6
 djangorestframework==3.7.1
 elasticsearch==5.4.0
+idna==2.6
+olefile==0.44
+Pillow==4.3.0
 pkg-resources==0.0.0
 pytz==2017.2
+requests==2.18.4
 urllib3==1.22
diff --git a/src/documents/management/commands/import_indexes.py b/src/documents/management/commands/import_indexes.py
index 1c4645a14c..ac68f2c04c 100644
--- a/src/documents/management/commands/import_indexes.py
+++ b/src/documents/management/commands/import_indexes.py
@@ -30,8 +30,17 @@ class Command(BaseCommand):
         )
 
     def handle(self, *args, **options):
+        imported = 0
         for path in options['files']:
-            self.import_indexes(path, options['iiif_base_url'])
+            try:
+                self.import_indexes(path, options['iiif_base_url'])
+                imported += 1
+            except Exception:  # one bad file must not abort the batch
+                logger.exception('Failed to import %s', path)
+        logger.info('Imported {}/{} index files'.format(
+            imported,
+            len(options['files']),
+        ))
 
     def import_indexes(self, index_path, iiif_base_url, extension='jpg'):
         """
@@ -50,10 +59,13 @@ class Command(BaseCommand):
             extension,
         )
         image, _ = Image.objects.get_or_create(iiif_url=url)
+        assert image.check_source(), \
+            'Image not found {}'.format(url)
         logger.info('Image {}'.format(image))
 
         # Import each index
         with gzip.open(index_path) as f:
+            indexes = []
             for line in f.readlines():
                 if line.decode('utf-8').startswith('#'):
                     continue
@@ -63,7 +75,7 @@ class Command(BaseCommand):
                 else:
                     # Store index
                     data = index.groups()
-                    image.add_index(
+                    indexes.append(image.build_index(
                         line=int(data[0]),
                         word=data[1].decode('utf-8'),
                         score=float(data[2]),
@@ -73,4 +85,6 @@ class Command(BaseCommand):
                             int(data[5]),
                             int(data[6]),
                         ],
-                    )
+                    ))
+
+            image.store_indexes(indexes)
diff --git a/src/documents/models.py b/src/documents/models.py
index e0bbe94fa0..9468849530 100644
--- a/src/documents/models.py
+++ b/src/documents/models.py
@@ -1,6 +1,8 @@
 from django.db import models
 from django.conf import settings
 from elasticsearch import Elasticsearch
+from elasticsearch.helpers import bulk as es_bulk, scan as es_scan
+import requests
 import hashlib
 import json
 import base64
@@ -19,9 +21,17 @@ class Image(models.Model):
     def __str__(self):
         return '{} - {}'.format(self.id, self.iiif_url)
 
-    def add_index(self, line, word, score, box):
+    def check_source(self):
         """
-        Store an index in ElasticSearch
+        Check the image info.json URL answers without an HTTP error (30s timeout)
+        """
+        info_url = '{}/info.json'.format(self.iiif_url)
+        resp = requests.get(info_url, allow_redirects=True, timeout=30)
+        return resp.ok
+
+    def build_index(self, line, word, score, box):
+        """
+        Build an index to store in ElasticSearch
         """
         assert isinstance(line, int)
         assert isinstance(word, str)
@@ -41,16 +51,26 @@ class Image(models.Model):
         h.update(json.dumps(payload, sort_keys=True).encode('utf-8'))
         index_id = base64.urlsafe_b64encode(h.digest()).decode('utf-8')
 
-        # Insert in ElasticSearch database
+        # To insert in ElasticSearch database
         # using the calculated index to avoid duplicates
+        return (index_id, payload)
+
+    def store_indexes(self, indexes):
+        """
+        Store a list of (index_id, payload) pairs in ElasticSearch in bulk mode
+        """
+        assert isinstance(indexes, list)
+
+        # One bulk action per (index_id, payload) pair, keyed by the given id
+        actions = [{
+            '_index': 'volume_xxx', # TODO: use volume ?
+            '_type': 'word',
+            '_source': index_data,
+            '_id': index_id,
+        } for index_id, index_data in indexes]
+
+        # Send all actions via the bulk helper; its second result is discarded
         elastic = Elasticsearch(settings.ELASTIC_SEARCH_HOSTS)
-        res = elastic.index(
-            index='volume_xxx', # TODO: use volume ?
-            doc_type='word',
-            id=index_id,
-            body=payload,
-        )
-        assert res['result'] in ('created', 'updated'), \
-            'Failed to insert word index {} : {}'.format(index_id, res)
-
-        return res['created']
+        nb_insert, _ = es_bulk(elastic, actions)
+
+        return nb_insert
-- 
GitLab