From 058d9a8f8136983005ff1a31ea1b8edfbd97e099 Mon Sep 17 00:00:00 2001
From: Bastien Abadie <bastien@nextcairn.com>
Date: Mon, 29 Mar 2021 22:51:50 +0200
Subject: [PATCH] Working low level tests

---
 arkindex_worker/cache.py  |  33 +--
 arkindex_worker/worker.py |   7 +-
 tests/conftest.py         |  63 ++---
 tests/test_merge.py       | 508 ++++++++++++++++++--------------
 4 files changed, 288 insertions(+), 323 deletions(-)

diff --git a/arkindex_worker/cache.py b/arkindex_worker/cache.py
index ea7adf1a..984d697b 100644
--- a/arkindex_worker/cache.py
+++ b/arkindex_worker/cache.py
@@ -80,7 +80,7 @@ def create_tables():
     db.create_tables([CachedElement, CachedTranscription])


-def merge_parents_caches(parent_ids, current_database, data_dir="/data"):
+def merge_parents_cache(parent_ids, current_database, data_dir="/data"):
     """
     Merge all the potential parent task's databases into the existing local one
     """
@@ -102,20 +102,21 @@ def merge_parents_caches(parent_ids, current_database, data_dir="/data"):

     # Open a connection on current database
     connection = sqlite3.connect(current_database)
-
-    with connection.cursor() as cursor:
-        for idx, path in enumerate(paths):
-            # Merge each table into the local database
-            statements = [
-                "PRAGMA page_size=80000;",
-                "PRAGMA synchronous=OFF;",
-                f"ATTACH DATABASE '{path}' AS source_{idx};",
-                f"REPLACE INTO elements SELECT * FROM source_{idx}.elements;",
-                f"REPLACE INTO transcriptions SELECT * FROM source_{idx}.transcriptions;",
-            ]
-
-            for statement in statements:
-                cursor.execute(statement)
-                connection.commit()
+    cursor = connection.cursor()
+
+    # Merge each table into the local database
+    for idx, path in enumerate(paths):
+        logger.info(f"Merging parent db {path} into {current_database}")
+        statements = [
+            "PRAGMA page_size=80000;",
+            "PRAGMA synchronous=OFF;",
+            f"ATTACH DATABASE '{path}' AS source_{idx};",
+            f"REPLACE INTO elements SELECT * FROM source_{idx}.elements;",
+            f"REPLACE INTO transcriptions SELECT * FROM source_{idx}.transcriptions;",
+        ]
+
+        for statement in statements:
+            cursor.execute(statement)
+        connection.commit()

     # TODO: maybe reopen peewee connection ?
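The merge step above is plain SQLite rather than peewee: each parent database file is attached to the current connection under a source_{idx} alias, then every table is copied over with REPLACE INTO, so a row whose primary key already exists in the target is overwritten rather than duplicated. A minimal, self-contained sketch of the same idea (the elements table layout and file paths are illustrative assumptions, not the worker's real schema):

    import sqlite3

    def merge_sqlite_databases(target_path, parent_paths):
        connection = sqlite3.connect(target_path)
        cursor = connection.cursor()
        for idx, path in enumerate(parent_paths):
            # Expose the parent file under a temporary schema name
            cursor.execute(f"ATTACH DATABASE '{path}' AS source_{idx}")
            # REPLACE INTO resolves primary key conflicts by overwriting the target row
            cursor.execute(f"REPLACE INTO elements SELECT * FROM source_{idx}.elements")
            connection.commit()
        connection.close()

Committing once per parent, as the new code does, keeps a failure on a later parent from throwing away the rows already merged from the earlier ones.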
diff --git a/arkindex_worker/worker.py b/arkindex_worker/worker.py
index 97c3a72d..19a04641 100644
--- a/arkindex_worker/worker.py
+++ b/arkindex_worker/worker.py
@@ -154,10 +154,9 @@ class BaseWorker(object):
         self.secrets = {name: self.load_secret(name) for name in required_secrets}

         # Merging parents caches (if there are any) in the current task local cache
-        if self.cache and os.environ.get("TASK_ID"):
-            task = self.api_client.request(
-                "RetrieveTaskFromAgent", id=os.environ.get("TASK_ID")
-            )
+        task_id = os.environ.get("TASK_ID")
+        if self.use_cache and task_id is not None:
+            task = self.api_client.request("RetrieveTaskFromAgent", id=task_id)
             merge_parents_cache(task["parents"], self.cache_path)

     def load_secret(self, name):
diff --git a/tests/conftest.py b/tests/conftest.py
index 64aba5d6..7f4c904c 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -9,6 +9,7 @@ from uuid import UUID

 import pytest
 import yaml
+from peewee import SqliteDatabase

 from arkindex.mock import MockApiClient
 from arkindex_worker.cache import CachedElement, CachedTranscription
@@ -98,42 +99,6 @@ def temp_working_directory(monkeypatch, tmp_path):
     monkeypatch.setattr(os, "getcwd", _getcwd)


-@pytest.fixture
-def first_parent_folder():
-    cache_dir = f"{CACHE_DIR}/first_parent_id"
-    os.mkdir(cache_dir)
-    yield
-    if os.path.isdir(cache_dir):
-        os.rmdir(cache_dir)
-
-
-@pytest.fixture
-def second_parent_folder():
-    cache_dir = f"{CACHE_DIR}/second_parent_id"
-    os.mkdir(cache_dir)
-    yield
-    if os.path.isdir(cache_dir):
-        os.rmdir(cache_dir)
-
-
-@pytest.fixture
-def first_parent_cache(first_parent_folder):
-    parent_cache = LocalDB(f"{CACHE_DIR}/first_parent_id/db.sqlite")
-    parent_cache.create_tables()
-    yield
-    if os.path.isfile(parent_cache.path):
-        os.remove(parent_cache.path)
-
-
-@pytest.fixture
-def second_parent_cache(second_parent_folder):
-    parent_cache = LocalDB(f"{CACHE_DIR}/second_parent_id/db.sqlite")
-    parent_cache.create_tables()
-    yield
-    if os.path.isfile(parent_cache.path):
-        os.remove(parent_cache.path)
-
-
 @pytest.fixture(autouse=True)
 def give_worker_version_id_env_variable(monkeypatch):
     monkeypatch.setenv("WORKER_VERSION_ID", "12341234-1234-1234-1234-123412341234")
@@ -208,12 +173,12 @@ def mock_elements_worker(monkeypatch, mock_worker_version_api):

 @pytest.fixture
 def mock_base_worker_with_cache(mocker, monkeypatch, mock_worker_version_api):
-    """Build a BaseWorker using SQLite cache"""
+    """Build a BaseWorker using SQLite cache, also mocking a TASK_ID"""
     monkeypatch.setattr(sys, "argv", ["worker"])

     worker = BaseWorker(use_cache=True)
     monkeypatch.setenv("TASK_ID", "my_task")
-    mocker.patch("arkindex_worker.worker.DATA_DIR", CACHE_DIR)
+    # mocker.patch("arkindex_worker.worker.DATA_DIR", CACHE_DIR)
     return worker


@@ -322,3 +287,25 @@ def mock_cached_transcriptions():
         confidence=0.42,
         worker_version_id=UUID("90129012-9012-9012-9012-901290129012"),
     )
+
+
+@pytest.fixture
+def mock_databases(tmpdir):
+    """
+    Initialize several temporary databases
+    to help testing the merge algorithm
+    """
+    out = {}
+    for name in ("target", "first", "second"):
+        # Build a local database in sub directory
+        # for each name required
+        path = tmpdir / name / "db.sqlite"
+        (tmpdir / name).mkdir()
+        local_db = SqliteDatabase(None)
+        local_db.init(path, pragmas={})
+        local_db.connect()
+        local_db.bind([CachedElement, CachedTranscription])
+        local_db.create_tables([CachedElement, CachedTranscription])
+        out[name] = {"path": path, "db": local_db}
+
+    return out
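The mock_databases fixture relies on peewee's runtime binding: each temporary SqliteDatabase starts empty, the model classes are bound to it just long enough to create the tables, and the tests below use bind_ctx() to point the same model classes at whichever file they want to populate or inspect. A condensed sketch of that pattern, assuming a throwaway path and only the CachedElement model:

    from peewee import SqliteDatabase

    from arkindex_worker.cache import CachedElement

    db = SqliteDatabase("/tmp/first/db.sqlite")  # hypothetical location
    db.connect()
    db.bind([CachedElement])
    db.create_tables([CachedElement])

    # bind_ctx() rebinds the model to this database only inside the block,
    # then restores the previous binding on exit
    with db.bind_ctx([CachedElement]):
        assert CachedElement.select().count() == 0

This is what lets one set of model classes serve the target database and both parent databases in the tests that follow.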
diff --git a/tests/test_merge.py b/tests/test_merge.py
index 9e01234b..3f2be59d 100644
--- a/tests/test_merge.py
+++ b/tests/test_merge.py
@@ -1,298 +1,276 @@
 # -*- coding: utf-8 -*-
-import json
 from pathlib import Path
+from uuid import UUID

-from arkindex_worker.cache import CachedElement, CachedTranscription, LocalDB
-from arkindex_worker.utils import convert_str_uuid_to_hex
+from arkindex_worker.cache import CachedElement, merge_parents_cache

 CACHE_DIR = str(Path(__file__).resolve().parent / "data/cache")
 FIRST_PARENT_CACHE = f"{CACHE_DIR}/first_parent_id/db.sqlite"
 SECOND_PARENT_CACHE = f"{CACHE_DIR}/second_parent_id/db.sqlite"
-FIRST_ELEM_TO_INSERT = CachedElement(
-    id=convert_str_uuid_to_hex("11111111-1111-1111-1111-111111111111"),
-    parent_id=convert_str_uuid_to_hex("12341234-1234-1234-1234-123412341234"),
-    type="something",
-    polygon=json.dumps([[1, 1], [2, 2], [2, 1], [1, 2]]),
-    worker_version_id=convert_str_uuid_to_hex("56785678-5678-5678-5678-567856785678"),
-)
-SECOND_ELEM_TO_INSERT = CachedElement(
-    id=convert_str_uuid_to_hex("22222222-2222-2222-2222-222222222222"),
-    parent_id=convert_str_uuid_to_hex("12341234-1234-1234-1234-123412341234"),
-    type="something",
-    polygon=json.dumps([[1, 1], [2, 2], [2, 1], [1, 2]]),
-    worker_version_id=convert_str_uuid_to_hex("56785678-5678-5678-5678-567856785678"),
-)
-FIRST_TR_TO_INSERT = CachedTranscription(
-    id=convert_str_uuid_to_hex("11111111-1111-1111-1111-111111111111"),
-    element_id=convert_str_uuid_to_hex("11111111-1111-1111-1111-111111111111"),
-    text="Hello!",
-    confidence=0.42,
-    worker_version_id=convert_str_uuid_to_hex("56785678-5678-5678-5678-567856785678"),
-)
-SECOND_TR_TO_INSERT = CachedTranscription(
-    id=convert_str_uuid_to_hex("22222222-2222-2222-2222-222222222222"),
-    element_id=convert_str_uuid_to_hex("22222222-2222-2222-2222-222222222222"),
-    text="How are you?",
-    confidence=0.42,
-    worker_version_id=convert_str_uuid_to_hex("56785678-5678-5678-5678-567856785678"),
-)
-
-
-def test_configure_cache_merging_no_parent(responses, mock_base_worker_with_cache):
-    responses.add(
-        responses.GET,
-        "http://testserver/ponos/v1/task/my_task/from-agent/",
-        status=200,
-        json={"parents": []},
-    )
-    cache_path = mock_base_worker_with_cache.cache.path
-    with open(cache_path, "rb") as before_file:
+
+def test_merge_no_parent(mock_databases, tmpdir):
+    """
+    Check the db merge algorithm does nothing when no parents are
+    available
+    """
+    with open(mock_databases["target"]["path"], "rb") as before_file:
         before = before_file.read()

-    mock_base_worker_with_cache.configure()
+    merge_parents_cache(
+        [
+            "first",
+        ],
+        mock_databases["target"]["path"],
+        data_dir=tmpdir,
+    )

-    with open(cache_path, "rb") as after_file:
-        after = after_file.read()
+    with open(mock_databases["target"]["path"], "rb") as before_file:
+        after = before_file.read()

     assert before == after, "Cache was modified"
-    assert len(responses.calls) == 3
-    assert [call.request.url for call in responses.calls] == [
-        "http://testserver/api/v1/user/",
-        "http://testserver/api/v1/workers/versions/12341234-1234-1234-1234-123412341234/",
-        "http://testserver/ponos/v1/task/my_task/from-agent/",
-    ]
-
-
-def test_configure_cache_merging_one_parent_without_file(
-    responses, mock_base_worker_with_cache, first_parent_folder
-):
-    responses.add(
-        responses.GET,
-        "http://testserver/ponos/v1/task/my_task/from-agent/",
-        status=200,
-        json={"parents": ["first_parent_id"]},
-    )
+
+    with mock_databases["target"]["db"].bind_ctx(
+        [
+            CachedElement,
+        ]
+    ):
+        assert CachedElement.select().count() == 0
+
-    cache_path = mock_base_worker_with_cache.cache.path
-    with open(cache_path, "rb") as before_file:
+def test_merge_no_file(mock_databases, tmpdir):
+    """
+    Check the db merge algorithm does nothing when the parent file
+    does not exist
+    """
+    with open(mock_databases["target"]["path"], "rb") as before_file:
         before = before_file.read()

-    mock_base_worker_with_cache.configure()
+    merge_parents_cache(
+        [
+            "first",
+        ],
+        mock_databases["target"]["path"],
+        data_dir=tmpdir,
+    )

-    with open(cache_path, "rb") as after_file:
-        after = after_file.read()
+    with open(mock_databases["target"]["path"], "rb") as before_file:
+        after = before_file.read()

     assert before == after, "Cache was modified"
-    assert len(responses.calls) == 3
-    assert [call.request.url for call in responses.calls] == [
-        "http://testserver/api/v1/user/",
-        "http://testserver/api/v1/workers/versions/12341234-1234-1234-1234-123412341234/",
-        "http://testserver/ponos/v1/task/my_task/from-agent/",
-    ]
-
-
-def test_configure_cache_merging_one_parent(
-    responses, mock_base_worker_with_cache, first_parent_cache
-):
-    parent_cache = LocalDB(FIRST_PARENT_CACHE)
-    parent_cache.insert("elements", [FIRST_ELEM_TO_INSERT])
-    parent_cache.insert("transcriptions", [FIRST_TR_TO_INSERT])
-
-    responses.add(
-        responses.GET,
-        "http://testserver/ponos/v1/task/my_task/from-agent/",
-        status=200,
-        json={"parents": ["first_parent_id"]},
+    with mock_databases["target"]["db"].bind_ctx(
+        [
+            CachedElement,
+        ]
+    ):
+        assert CachedElement.select().count() == 0
+
+
+def test_merge_one_file(mock_databases, tmpdir):
+    """
+    Check the db merge algorithm support one parent
+    and add an element from first into the target
+    """
+    # At first we have nothing in target
+    with mock_databases["target"]["db"].bind_ctx(
+        [
+            CachedElement,
+        ]
+    ):
+        assert CachedElement.select().count() == 0
+
+    # Add an element in first parent database
+    with mock_databases["first"]["db"].bind_ctx(
+        [
+            CachedElement,
+        ]
+    ):
+        CachedElement.create(
+            id=UUID("12341234-1234-1234-1234-123412341234"),
+            type="page",
+            polygon="[[1, 1], [2, 2], [2, 1], [1, 2]]",
+            worker_version_id=UUID("56785678-5678-5678-5678-567856785678"),
+        )
+
+    with open(mock_databases["target"]["path"], "rb") as before_file:
+        before = before_file.read()
+
+    merge_parents_cache(
+        [
+            "first",
+        ],
+        mock_databases["target"]["path"],
+        data_dir=tmpdir,
     )

-    mock_base_worker_with_cache.configure()
+    with open(mock_databases["target"]["path"], "rb") as before_file:
+        after = before_file.read()
+
+    assert before != after, "Cache was not modified"
+
+    with mock_databases["target"]["db"].bind_ctx(
+        [
+            CachedElement,
+        ]
+    ):
+        assert CachedElement.select().count() == 1
+
+
+def test_merge_two_files(mock_databases, tmpdir):
+    """
+    Check the db merge algorithm support two parents
+    and add an element from first into the target
+    but also another one from second into the target
+    """
+    # At first we have nothing in target
+    with mock_databases["target"]["db"].bind_ctx(
+        [
+            CachedElement,
+        ]
+    ):
+        assert CachedElement.select().count() == 0
+
+    # Add an element in first parent database
+    with mock_databases["first"]["db"].bind_ctx(
+        [
+            CachedElement,
+        ]
+    ):
+        CachedElement.create(
+            id=UUID("12341234-1234-1234-1234-123412341234"),
+            type="page",
+            polygon="[[1, 1], [2, 2], [2, 1], [1, 2]]",
+            worker_version_id=UUID("56785678-5678-5678-5678-567856785678"),
+        )
+
+    # Add another element in second parent database
+    with mock_databases["second"]["db"].bind_ctx(
+        [
+            CachedElement,
+        ]
+    ):
+        CachedElement.create(
id=UUID("56785678-5678-5678-5678-567856785678"), + type="page", + polygon="[[1, 1], [2, 2], [2, 1], [1, 2]]", + worker_version_id=UUID("56785678-5678-5678-5678-567856785678"), + ) + + with open(mock_databases["target"]["path"], "rb") as before_file: + before = before_file.read() - stored_rows = mock_base_worker_with_cache.cache.cursor.execute( - "SELECT * FROM elements" - ).fetchall() - assert ( - stored_rows == parent_cache.cursor.execute("SELECT * FROM elements").fetchall() - ) - assert [CachedElement(**dict(row)) for row in stored_rows] == [FIRST_ELEM_TO_INSERT] - - stored_rows = mock_base_worker_with_cache.cache.cursor.execute( - "SELECT * FROM transcriptions" - ).fetchall() - assert ( - stored_rows - == parent_cache.cursor.execute("SELECT * FROM transcriptions").fetchall() - ) - assert [CachedTranscription(**dict(row)) for row in stored_rows] == [ - FIRST_TR_TO_INSERT - ] - - assert len(responses.calls) == 3 - assert [call.request.url for call in responses.calls] == [ - "http://testserver/api/v1/user/", - "http://testserver/api/v1/workers/versions/12341234-1234-1234-1234-123412341234/", - "http://testserver/ponos/v1/task/my_task/from-agent/", - ] - - -def test_configure_cache_merging_multiple_parents_one_file( - responses, mock_base_worker_with_cache, first_parent_cache, second_parent_folder -): - parent_cache = LocalDB(FIRST_PARENT_CACHE) - parent_cache.insert("elements", [FIRST_ELEM_TO_INSERT]) - parent_cache.insert("transcriptions", [FIRST_TR_TO_INSERT]) - - responses.add( - responses.GET, - "http://testserver/ponos/v1/task/my_task/from-agent/", - status=200, - json={"parents": ["first_parent_id", "second_parent_id"]}, + merge_parents_cache( + [ + "first", + "second", + ], + mock_databases["target"]["path"], + data_dir=tmpdir, ) - mock_base_worker_with_cache.configure() + with open(mock_databases["target"]["path"], "rb") as before_file: + after = before_file.read() + + assert before != after, "Cache was not modified" + + with mock_databases["target"]["db"].bind_ctx( + [ + CachedElement, + ] + ): + assert CachedElement.select().count() == 2 + assert [e.id for e in CachedElement.select().order_by("id")] == [ + UUID("12341234-1234-1234-1234-123412341234"), + UUID("56785678-5678-5678-5678-567856785678"), + ] + + +def test_merge_conflict(mock_databases, tmpdir): + """ + Check the db merge algorithm support two parents + with conflicting elements + """ + # At first we have nothing in target + with mock_databases["target"]["db"].bind_ctx( + [ + CachedElement, + ] + ): + assert CachedElement.select().count() == 0 + + # Add an element in first parent database + with mock_databases["first"]["db"].bind_ctx( + [ + CachedElement, + ] + ): + CachedElement.create( + id=UUID("12341234-1234-1234-1234-123412341234"), + type="page", + polygon="[[1, 1], [2, 2], [2, 1], [1, 2]]", + worker_version_id=UUID("56785678-5678-5678-5678-567856785678"), + ) + CachedElement.create( + id=UUID("56785678-5678-5678-5678-567856785678"), + type="page", + polygon="[[1, 1], [2, 2], [2, 1], [1, 2]]", + worker_version_id=UUID("56785678-5678-5678-5678-567856785678"), + ) + + # Add another element in second parent database + with mock_databases["second"]["db"].bind_ctx( + [ + CachedElement, + ] + ): + + # This element will be skipped + CachedElement.create( + id=UUID("56785678-5678-5678-5678-567856785678"), + type="page", + polygon="[[1, 1], [2, 2], [2, 1], [1, 2]]", + worker_version_id=UUID("56785678-5678-5678-5678-567856785678"), + ) + + # This one will be inserted + CachedElement.create( + 
id=UUID("43214321-4321-4321-4321-432143214321"), + type="page", + polygon="[[1, 1], [2, 2], [2, 1], [1, 2]]", + worker_version_id=UUID("56785678-5678-5678-5678-567856785678"), + ) + + with open(mock_databases["target"]["path"], "rb") as before_file: + before = before_file.read() - stored_rows = mock_base_worker_with_cache.cache.cursor.execute( - "SELECT * FROM elements" - ).fetchall() - assert ( - stored_rows == parent_cache.cursor.execute("SELECT * FROM elements").fetchall() - ) - assert [CachedElement(**dict(row)) for row in stored_rows] == [FIRST_ELEM_TO_INSERT] - - stored_rows = mock_base_worker_with_cache.cache.cursor.execute( - "SELECT * FROM transcriptions" - ).fetchall() - assert ( - stored_rows - == parent_cache.cursor.execute("SELECT * FROM transcriptions").fetchall() - ) - assert [CachedTranscription(**dict(row)) for row in stored_rows] == [ - FIRST_TR_TO_INSERT - ] - - assert len(responses.calls) == 3 - assert [call.request.url for call in responses.calls] == [ - "http://testserver/api/v1/user/", - "http://testserver/api/v1/workers/versions/12341234-1234-1234-1234-123412341234/", - "http://testserver/ponos/v1/task/my_task/from-agent/", - ] - - -def test_configure_cache_merging_multiple_parents_differing_lines( - responses, mock_base_worker_with_cache, first_parent_cache, second_parent_cache -): - # Inserting differing lines in both parents caches - parent_cache = LocalDB(FIRST_PARENT_CACHE) - parent_cache = LocalDB(FIRST_PARENT_CACHE) - parent_cache.insert("elements", [FIRST_ELEM_TO_INSERT]) - parent_cache.insert("transcriptions", [FIRST_TR_TO_INSERT]) - second_parent_cache = LocalDB(SECOND_PARENT_CACHE) - second_parent_cache.insert("elements", [SECOND_ELEM_TO_INSERT]) - second_parent_cache.insert("transcriptions", [SECOND_TR_TO_INSERT]) - - responses.add( - responses.GET, - "http://testserver/ponos/v1/task/my_task/from-agent/", - status=200, - json={"parents": ["first_parent_id", "second_parent_id"]}, + merge_parents_cache( + [ + "first", + "second", + ], + mock_databases["target"]["path"], + data_dir=tmpdir, ) - mock_base_worker_with_cache.configure() + with open(mock_databases["target"]["path"], "rb") as before_file: + after = before_file.read() - stored_rows = mock_base_worker_with_cache.cache.cursor.execute( - "SELECT * FROM elements" - ).fetchall() - assert ( - stored_rows - == parent_cache.cursor.execute("SELECT * FROM elements").fetchall() - + second_parent_cache.cursor.execute("SELECT * FROM elements").fetchall() - ) - assert [CachedElement(**dict(row)) for row in stored_rows] == [ - FIRST_ELEM_TO_INSERT, - SECOND_ELEM_TO_INSERT, - ] - - stored_rows = mock_base_worker_with_cache.cache.cursor.execute( - "SELECT * FROM transcriptions" - ).fetchall() - assert ( - stored_rows - == parent_cache.cursor.execute("SELECT * FROM transcriptions").fetchall() - + second_parent_cache.cursor.execute("SELECT * FROM transcriptions").fetchall() - ) - assert [CachedTranscription(**dict(row)) for row in stored_rows] == [ - FIRST_TR_TO_INSERT, - SECOND_TR_TO_INSERT, - ] - - assert len(responses.calls) == 3 - assert [call.request.url for call in responses.calls] == [ - "http://testserver/api/v1/user/", - "http://testserver/api/v1/workers/versions/12341234-1234-1234-1234-123412341234/", - "http://testserver/ponos/v1/task/my_task/from-agent/", - ] - - -def test_configure_cache_merging_multiple_parents_identical_lines( - responses, mock_base_worker_with_cache, first_parent_cache, second_parent_cache -): - # Inserting identical lines in both parents caches - parent_cache = 
-    parent_cache = LocalDB(FIRST_PARENT_CACHE)
-    parent_cache.insert("elements", [FIRST_ELEM_TO_INSERT, SECOND_ELEM_TO_INSERT])
-    parent_cache.insert("transcriptions", [FIRST_TR_TO_INSERT, SECOND_TR_TO_INSERT])
-    second_parent_cache = LocalDB(SECOND_PARENT_CACHE)
-    second_parent_cache.insert(
-        "elements", [FIRST_ELEM_TO_INSERT, SECOND_ELEM_TO_INSERT]
-    )
-    second_parent_cache.insert(
-        "transcriptions", [FIRST_TR_TO_INSERT, SECOND_TR_TO_INSERT]
-    )
+    assert before != after, "Cache was not modified"

-    responses.add(
-        responses.GET,
-        "http://testserver/ponos/v1/task/my_task/from-agent/",
-        status=200,
-        json={"parents": ["first_parent_id", "second_parent_id"]},
-    )
+    with mock_databases["target"]["db"].bind_ctx(
+        [
+            CachedElement,
+        ]
+    ):
+        assert CachedElement.select().count() == 3
+        assert [e.id for e in CachedElement.select().order_by("id")] == [
+            UUID("12341234-1234-1234-1234-123412341234"),
+            UUID("56785678-5678-5678-5678-567856785678"),
+            UUID("43214321-4321-4321-4321-432143214321"),
+        ]

-    mock_base_worker_with_cache.configure()
-
-    stored_rows = mock_base_worker_with_cache.cache.cursor.execute(
-        "SELECT * FROM elements"
-    ).fetchall()
-    assert (
-        stored_rows == parent_cache.cursor.execute("SELECT * FROM elements").fetchall()
-    )
-    assert (
-        stored_rows
-        == second_parent_cache.cursor.execute("SELECT * FROM elements").fetchall()
-    )
-    assert [CachedElement(**dict(row)) for row in stored_rows] == [
-        FIRST_ELEM_TO_INSERT,
-        SECOND_ELEM_TO_INSERT,
-    ]
-
-    stored_rows = mock_base_worker_with_cache.cache.cursor.execute(
-        "SELECT * FROM transcriptions"
-    ).fetchall()
-    assert (
-        stored_rows
-        == parent_cache.cursor.execute("SELECT * FROM transcriptions").fetchall()
-    )
-    assert (
-        stored_rows
-        == second_parent_cache.cursor.execute("SELECT * FROM transcriptions").fetchall()
-    )
-    assert [CachedTranscription(**dict(row)) for row in stored_rows] == [
-        FIRST_TR_TO_INSERT,
-        SECOND_TR_TO_INSERT,
-    ]
-
-    assert len(responses.calls) == 3
-    assert [call.request.url for call in responses.calls] == [
-        "http://testserver/api/v1/user/",
-        "http://testserver/api/v1/workers/versions/12341234-1234-1234-1234-123412341234/",
-        "http://testserver/ponos/v1/task/my_task/from-agent/",
-    ]
+
+
+# TODO: add a unit test using base worker
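One behaviour worth keeping in mind when reading test_merge_conflict above: SQLite's REPLACE INTO does not skip a conflicting row, it deletes the existing target row and inserts the incoming one, so the last parent merged wins. The test's assertions only check ids and row count, so they hold either way, but the inline comment saying the element "will be skipped" is closer to "will be overwritten". A self-contained snippet showing the behaviour with plain sqlite3 (table name and values are illustrative only):

    import sqlite3

    db = sqlite3.connect(":memory:")
    db.execute("CREATE TABLE elements (id TEXT PRIMARY KEY, type TEXT)")
    db.execute("INSERT INTO elements VALUES ('5678', 'page')")
    # Same primary key as the existing row: REPLACE overwrites instead of skipping
    db.execute("REPLACE INTO elements VALUES ('5678', 'paragraph')")
    assert db.execute("SELECT type FROM elements WHERE id = '5678'").fetchone()[0] == "paragraph"
    assert db.execute("SELECT COUNT(*) FROM elements").fetchone()[0] == 1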