Skip to content
Snippets Groups Projects
Commit 42fb504b authored by Manon Blanco
Browse files

Configure + Reuse `process_information`

parent c01c20e9
No related branches found
No related tags found
1 merge request: !2 Port init elements code
Pipeline #165159 passed
This commit is part of merge request !2. Comments created here will be created in the context of that merge request.
import os
import sys
import pytest
......@@ -7,8 +8,13 @@ from arkindex_worker.worker.base import BaseWorker
from worker_init_elements.worker import InitElementWorker
@pytest.fixture()
def mock_api_client() -> MockApiClient:
    """Build the ``MockApiClient`` instance exposed as a pytest fixture."""
    client = MockApiClient()
    return client
@pytest.fixture(autouse=True)
def _setup_environment(responses, monkeypatch) -> None:
def _setup_environment(mock_api_client: MockApiClient, responses, monkeypatch) -> None:
"""Setup needed environment variables"""
# Allow accessing remote API schemas
......@@ -28,25 +34,80 @@ def _setup_environment(responses, monkeypatch) -> None:
# Setup a mock api client instead of using a real one
def mock_setup_api_client(self):
self.api_client = MockApiClient()
self.api_client = mock_api_client
monkeypatch.setattr(BaseWorker, "setup_api_client", mock_setup_api_client)
@pytest.fixture()
def mock_worker(tmp_path_factory) -> InitElementWorker:
worker = InitElementWorker()
def _mock_worker_run_api(mock_api_client: MockApiClient) -> None:
    """Register a fake ``RetrieveWorkerRun`` response on the mock API client.

    The worker run and corpus identifiers are read from the
    ``ARKINDEX_WORKER_RUN_ID`` and ``ARKINDEX_CORPUS_ID`` environment
    variables; everything else in the payload is hard-coded.
    """
    worker_run_id = os.getenv("ARKINDEX_WORKER_RUN_ID")

    # Options declared by the worker version, each with its type and default
    user_configuration = {
        "chunks_number": {
            "type": "int",
            "title": "Chunks number",
            "default": 1,
            "required": True,
        },
        "use_cache": {
            "type": "bool",
            "title": "Use cache",
            "default": False,
        },
        "threshold_value": {
            "type": "float",
            "title": "Threshold Value",
            "default": 0.1,
            "subtype": "number",
            "required": False,
        },
        "sleep": {
            "type": "float",
            "title": "Sleep",
            "default": 0.0,
        },
    }

    docker_settings = {
        "build": "Dockerfile",
        "image": "",
        "command": None,
        "context": None,
        "shm_size": None,
        "environment": {},
    }

    worker_version = {
        "id": "12341234-1234-1234-1234-123412341234",
        "revision": {"hash": "deadbeef1234"},
        "worker": {"name": "Fake worker"},
        "configuration": {
            "name": "Init Elements",
            "slug": "init-elements",
            "type": "extractor",
            "docker": docker_settings,
            "secrets": [],
            "description": None,
            "configuration": {},
            "user_configuration": user_configuration,
        },
    }

    worker_run = {
        "id": worker_run_id,
        "worker_version": worker_version,
        "configuration": None,
        "process": {"id": "process_id", "corpus": os.getenv("ARKINDEX_CORPUS_ID")},
        # Plain concatenation on purpose: it fails loudly when the variable is unset
        "summary": worker_run_id + " @ version 1",
    }

    mock_api_client.add_response(
        "RetrieveWorkerRun",
        id=worker_run_id,
        response=worker_run,
    )
# Default parameters
worker.chunks_number = 1
worker.use_cache = False
worker.work_dir = tmp_path_factory.mktemp("data")
@pytest.fixture()
def mock_worker(
_mock_worker_run_api, tmp_path_factory, monkeypatch
) -> InitElementWorker:
monkeypatch.setattr(sys, "argv", ["worker-init-elements"])
worker.worker_run_id = os.environ["ARKINDEX_WORKER_RUN_ID"]
worker.process = {
"id": "process_id",
"corpus": os.environ["ARKINDEX_CORPUS_ID"],
}
worker = InitElementWorker()
worker.work_dir = tmp_path_factory.mktemp("data")
worker.configure()
return worker
......@@ -40,7 +40,7 @@ def test_run_process(use_cache, mock_worker):
mock_worker.api_client.add_response(
"RetrieveProcess",
id=mock_worker.process["id"],
id=mock_worker.process_information["id"],
response={
"activity_state": "ready",
"corpus": "corpusid",
......@@ -48,15 +48,15 @@ def test_run_process(use_cache, mock_worker):
)
mock_worker.api_client.add_response(
"RetrieveCorpus",
id=mock_worker.process["corpus"],
id=mock_worker.process_information["corpus"],
response={
"id": mock_worker.process["corpus"],
"id": mock_worker.process_information["corpus"],
"types": [{"id": "A", "slug": "class"}, {"id": "B", "slug": "student"}],
},
)
mock_worker.api_client.add_response(
"ListProcessElements",
id=mock_worker.process["id"],
id=mock_worker.process_information["id"],
with_image=mock_worker.use_cache,
allow_missing_data=True,
page_size=INIT_PAGE_SIZE,
......@@ -96,7 +96,7 @@ def test_run_process(use_cache, mock_worker):
],
)
mock_worker.run()
mock_worker.process()
check_json(
json_path=mock_worker.work_dir / "elements.json",
......@@ -176,7 +176,7 @@ def test_run_distributed(mock_worker):
mock_worker.api_client.add_response(
"RetrieveProcess",
id=mock_worker.process["id"],
id=mock_worker.process_information["id"],
response={
"activity_state": "ready",
"corpus": "corpusid",
......@@ -184,15 +184,15 @@ def test_run_distributed(mock_worker):
)
mock_worker.api_client.add_response(
"RetrieveCorpus",
id=mock_worker.process["corpus"],
id=mock_worker.process_information["corpus"],
response={
"id": mock_worker.process["corpus"],
"id": mock_worker.process_information["corpus"],
"types": [{"id": "A", "slug": "class"}, {"id": "B", "slug": "student"}],
},
)
mock_worker.api_client.add_response(
"ListProcessElements",
id=mock_worker.process["id"],
id=mock_worker.process_information["id"],
with_image=True,
allow_missing_data=True,
page_size=INIT_PAGE_SIZE,
......@@ -255,7 +255,7 @@ def test_run_distributed(mock_worker):
],
)
mock_worker.run()
mock_worker.process()
check_json(
json_path=mock_worker.work_dir / "elements_chunk_1.json",
......@@ -436,15 +436,15 @@ def test_not_enough_elements(mock_worker):
mock_worker.api_client.add_response(
"RetrieveCorpus",
id=mock_worker.process["corpus"],
id=mock_worker.process_information["corpus"],
response={
"id": mock_worker.process["corpus"],
"id": mock_worker.process_information["corpus"],
"types": [{"id": "A", "slug": "class"}, {"id": "B", "slug": "student"}],
},
)
mock_worker.api_client.add_response(
"ListProcessElements",
id=mock_worker.process["id"],
id=mock_worker.process_information["id"],
with_image=False,
allow_missing_data=True,
page_size=INIT_PAGE_SIZE,
......@@ -457,7 +457,7 @@ def test_not_enough_elements(mock_worker):
],
)
with pytest.raises(AssertionError, match="Too few elements have been retrieved"):
mock_worker.run()
mock_worker.process()
def test_run_duplicates(mock_worker):
......@@ -465,7 +465,7 @@ def test_run_duplicates(mock_worker):
mock_worker.api_client.add_response(
"RetrieveProcess",
id=mock_worker.process["id"],
id=mock_worker.process_information["id"],
response={
"activity_state": "ready",
"corpus": "corpusid",
......@@ -473,15 +473,15 @@ def test_run_duplicates(mock_worker):
)
mock_worker.api_client.add_response(
"RetrieveCorpus",
id=mock_worker.process["corpus"],
id=mock_worker.process_information["corpus"],
response={
"id": mock_worker.process["corpus"],
"id": mock_worker.process_information["corpus"],
"types": [{"id": "A", "slug": "class"}, {"id": "B", "slug": "student"}],
},
)
mock_worker.api_client.add_response(
"ListProcessElements",
id=mock_worker.process["id"],
id=mock_worker.process_information["id"],
with_image=True,
allow_missing_data=True,
page_size=INIT_PAGE_SIZE,
......@@ -524,7 +524,7 @@ def test_run_duplicates(mock_worker):
],
)
mock_worker.run()
mock_worker.process()
check_json(
json_path=mock_worker.work_dir / "elements.json",
......@@ -604,7 +604,7 @@ def test_activity_state_awaiting(mock_worker, monkeypatch):
for state in ["pending", "pending", "pending", "ready"]:
mock_worker.api_client.add_response(
"RetrieveProcess",
id=mock_worker.process["id"],
id=mock_worker.process_information["id"],
response={
"activity_state": state,
"corpus": "corpusid",
......@@ -612,15 +612,15 @@ def test_activity_state_awaiting(mock_worker, monkeypatch):
)
mock_worker.api_client.add_response(
"RetrieveCorpus",
id=mock_worker.process["corpus"],
id=mock_worker.process_information["corpus"],
response={
"id": mock_worker.process["corpus"],
"id": mock_worker.process_information["corpus"],
"types": [{"id": "A", "slug": "class"}, {"id": "B", "slug": "student"}],
},
)
mock_worker.api_client.add_response(
"ListProcessElements",
id=mock_worker.process["id"],
id=mock_worker.process_information["id"],
with_image=False,
allow_missing_data=True,
page_size=INIT_PAGE_SIZE,
......@@ -633,7 +633,7 @@ def test_activity_state_awaiting(mock_worker, monkeypatch):
],
)
mock_worker.run()
mock_worker.process()
check_json(
json_path=mock_worker.work_dir / "elements.json",
......@@ -658,7 +658,7 @@ def test_activity_state_timeout(mock_worker, caplog, monkeypatch):
for _ in range(12):
mock_worker.api_client.add_response(
"RetrieveProcess",
id=mock_worker.process["id"],
id=mock_worker.process_information["id"],
response={
"activity_state": "pending",
"corpus": "corpusid",
......@@ -666,15 +666,15 @@ def test_activity_state_timeout(mock_worker, caplog, monkeypatch):
)
mock_worker.api_client.add_response(
"RetrieveCorpus",
id=mock_worker.process["corpus"],
id=mock_worker.process_information["corpus"],
response={
"id": mock_worker.process["corpus"],
"id": mock_worker.process_information["corpus"],
"types": [{"id": "A", "slug": "class"}, {"id": "B", "slug": "student"}],
},
)
mock_worker.api_client.add_response(
"ListProcessElements",
id=mock_worker.process["id"],
id=mock_worker.process_information["id"],
with_image=False,
allow_missing_data=True,
page_size=INIT_PAGE_SIZE,
......@@ -688,7 +688,7 @@ def test_activity_state_timeout(mock_worker, caplog, monkeypatch):
)
with pytest.raises(Exception, match="Worker activity timeout"):
mock_worker.run()
mock_worker.process()
assert sum(sleep_args) == 4094
assert [(record.levelname, record.message) for record in caplog.records] == [
......@@ -705,15 +705,15 @@ def test_run_empty(mock_worker, caplog):
mock_worker.api_client.add_response(
"RetrieveCorpus",
id=mock_worker.process["corpus"],
id=mock_worker.process_information["corpus"],
response={
"id": mock_worker.process["corpus"],
"id": mock_worker.process_information["corpus"],
"types": [{"id": "A", "slug": "class"}, {"id": "B", "slug": "student"}],
},
)
mock_worker.api_client.add_response(
"ListProcessElements",
id=mock_worker.process["id"],
id=mock_worker.process_information["id"],
with_image=False,
allow_missing_data=True,
page_size=INIT_PAGE_SIZE,
......@@ -721,7 +721,7 @@ def test_run_empty(mock_worker, caplog):
)
with pytest.raises(SystemExit) as ctx:
mock_worker.run()
mock_worker.process()
assert ctx.value.code == 1
......
......@@ -59,7 +59,6 @@ class InitElementWorker(BaseWorker):
super().configure_for_developers()
else:
super().configure()
super().configure_cache()
# Retrieve the user configuration
if self.user_configuration:
......@@ -70,11 +69,6 @@ class InitElementWorker(BaseWorker):
self.use_cache = self.config["use_cache"]
self.api_client.sleep_duration = self.config["sleep"]
assert self.worker_run_id, "Missing ARKINDEX_WORKER_RUN_ID environment variable, cannot retrieve process information"
self.process = self.request("RetrieveWorkerRun", id=self.worker_run_id)[
"process"
]
def dump_json(
self, elements: list[Element], filename: str = "elements.json"
) -> None:
......@@ -166,11 +160,11 @@ class InitElementWorker(BaseWorker):
)
def list_process_elements(self) -> list[Element]:
assert self.process.get(
assert self.process_information.get(
"corpus"
), "init_elements only supports processes on corpora."
corpus = self.request("RetrieveCorpus", id=self.process["corpus"])
corpus = self.request("RetrieveCorpus", id=self.process_information["corpus"])
type_slugs = {
element_type["id"]: element_type["slug"] for element_type in corpus["types"]
}
......@@ -179,7 +173,7 @@ class InitElementWorker(BaseWorker):
Element(**element, type=type_slugs[element["type_id"]])
for element in self.api_client.paginate(
"ListProcessElements",
id=self.process["id"],
id=self.process_information["id"],
with_image=self.use_cache,
allow_missing_data=True,
page_size=INIT_PAGE_SIZE,
......@@ -191,7 +185,7 @@ class InitElementWorker(BaseWorker):
)
logger.info(
f"Retrieved {len(unique_elements)} element{'s'[:len(unique_elements) > 1]} from process {self.process['id']}"
f"Retrieved {len(unique_elements)} element{'s'[:len(unique_elements) > 1]} from process {self.process_information['id']}"
)
duplicate_count = len(elements) - len(unique_elements)
......@@ -206,7 +200,7 @@ class InitElementWorker(BaseWorker):
def check_worker_activity(self) -> bool:
# Check if workers activity associated to this process is in a pending state
process = self.request("RetrieveProcess", id=self.process["id"])
process = self.request("RetrieveProcess", id=self.process_information["id"])
if process.get("activity_state") == ERROR_STATE:
logger.error(
"Workers activity could not be initialized. Please report this incident to an instance administrator."
......@@ -230,12 +224,16 @@ class InitElementWorker(BaseWorker):
raise Exception("Worker activity timeout")
sleep(timer)
def run(self) -> None:
def process(self) -> None:
    """List the process elements, dump them as chunks, then await worker activity."""
    # The chunks are written before blocking on the activity state
    self.dump_chunks(self.list_process_elements())
    self.await_worker_activity()
def run(self) -> None:
    """Entry point: configure the worker, then execute ``process()``."""
    # NOTE(review): process() reads attributes such as `process_information`
    # and `use_cache`, presumably populated by configure() — confirm.
    self.configure()
    self.process()
def main() -> None:
InitElementWorker(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment