Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • workers/base-worker
1 result
Show changes
Commits on Source (5)
......@@ -136,7 +136,14 @@ class ElementsWorker(
return self.process_information.get("activity_state") == "ready"
def configure(self):
super().configure()
# CLI args are stored on the instance so that implementations can access them
self.args = self.parser.parse_args()
if self.is_read_only:
super().configure_for_developers()
else:
super().configure()
super().configure_cache()
# Add report concerning elements
self.report = Reporter(
**self.worker_details, version=getattr(self, "worker_version_id", None)
......@@ -199,7 +206,9 @@ class ElementsWorker(
if isinstance(e, ErrorResponse):
message = f"An API error occurred while processing element {element_id}: {e.title} - {e.content}"
else:
message = f"Failed running worker on element {element_id}: {e}"
message = (
f"Failed running worker on element {element_id}: {repr(e)}"
)
logger.warning(
message,
......
......@@ -59,6 +59,40 @@ class BaseWorker(object):
"""
self.parser = argparse.ArgumentParser(description=description)
self.parser.add_argument(
"-c",
"--config",
help="Alternative configuration file when running without a Worker Version ID",
type=open,
)
self.parser.add_argument(
"-d",
"--database",
help="Alternative SQLite database to use for worker caching",
type=str,
default=None,
)
self.parser.add_argument(
"-v",
"--verbose",
"--debug",
help="Display more information on events and errors",
action="store_true",
default=False,
)
self.parser.add_argument(
"--dev",
help=(
"Run worker in developer mode. "
"Worker will be in read-only state even if a worker_version is supplied. "
"ARKINDEX_PROCESS_ID environment variable is not required with this flag."
),
action="store_true",
default=False,
)
# Call potential extra arguments
self.add_arguments()
# Setup workdir either in Ponos environment or on host's home
if os.environ.get("PONOS_DATA"):
......@@ -98,88 +132,14 @@ class BaseWorker(object):
"""
return self.args.dev or self.worker_version_id is None
def configure(self):
"""
Configure worker using CLI args and environment variables.
"""
self.parser.add_argument(
"-c",
"--config",
help="Alternative configuration file when running without a Worker Version ID",
type=open,
)
self.parser.add_argument(
"-d",
"--database",
help="Alternative SQLite database to use for worker caching",
type=str,
default=None,
)
self.parser.add_argument(
"-v",
"--verbose",
help="Display more information on events and errors",
action="store_true",
default=False,
)
self.parser.add_argument(
"--dev",
help=(
"Run worker in developer mode. "
"Worker will be in read-only state even if a worker_version is supplied. "
"ARKINDEX_PROCESS_ID environment variable is not required with this flag."
),
action="store_true",
)
# Call potential extra arguments
self.add_arguments()
# CLI args are stored on the instance so that implementations can access them
self.args = self.parser.parse_args()
# Setup logging level
if self.args.verbose:
def configure_for_developers(self):
assert self.is_read_only
# Setup logging level if verbose or if ARKINDEX_DEBUG is set to true
if self.args.verbose or os.environ.get("ARKINDEX_DEBUG"):
logger.setLevel(logging.DEBUG)
logger.debug("Debug output enabled")
# Build Arkindex API client from environment variables
self.api_client = ArkindexClient(**options_from_env())
logger.debug(f"Setup Arkindex API client on {self.api_client.document.url}")
# Load features available on backend, and check authentication
user = self.request("RetrieveUser")
logger.debug(f"Connected as {user['display_name']} - {user['email']}")
self.features = user["features"]
# Load process information except in developer mode
if not self.args.dev:
assert os.environ.get(
"ARKINDEX_PROCESS_ID"
), "ARKINDEX_PROCESS_ID environment variable is not defined"
self.process_information = self.request(
"RetrieveDataImport", id=os.environ["ARKINDEX_PROCESS_ID"]
)
if self.worker_version_id:
# Retrieve initial configuration from API
worker_version = self.request(
"RetrieveWorkerVersion", id=self.worker_version_id
)
logger.info(
f"Loaded worker {worker_version['worker']['name']} revision {worker_version['revision']['hash'][0:7]} from API"
)
self.config = worker_version["configuration"]["configuration"]
if "user_configuration" in worker_version["configuration"]:
# Add default values (if set) to user_configuration
for key, value in worker_version["configuration"][
"user_configuration"
].items():
if "default" in value:
self.user_configuration[key] = value["default"]
self.worker_details = worker_version["worker"]
required_secrets = worker_version["configuration"].get("secrets", [])
elif self.args.config:
if self.args.config:
# Load config from YAML file
self.config = yaml.safe_load(self.args.config)
self.worker_details = {"name": "Local worker"}
......@@ -196,8 +156,51 @@ class BaseWorker(object):
# Load all required secrets
self.secrets = {name: self.load_secret(name) for name in required_secrets}
# Load worker run configuration when available and not in dev mode
if os.environ.get("ARKINDEX_WORKER_RUN_ID") and not self.args.dev:
def configure(self):
"""
Configure worker using CLI args and environment variables.
"""
assert not self.is_read_only
# Setup logging level if verbose or if ARKINDEX_DEBUG is set to true
if self.args.verbose or os.environ.get("ARKINDEX_DEBUG"):
logger.setLevel(logging.DEBUG)
logger.debug("Debug output enabled")
# Build Arkindex API client from environment variables
self.api_client = ArkindexClient(**options_from_env())
logger.debug(f"Setup Arkindex API client on {self.api_client.document.url}")
# Load process information
assert os.environ.get(
"ARKINDEX_PROCESS_ID"
), "ARKINDEX_PROCESS_ID environment variable is not defined"
self.process_information = self.request(
"RetrieveDataImport", id=os.environ["ARKINDEX_PROCESS_ID"]
)
# Retrieve initial configuration from API
worker_version = self.request(
"RetrieveWorkerVersion", id=self.worker_version_id
)
logger.info(
f"Loaded worker {worker_version['worker']['name']} revision {worker_version['revision']['hash'][0:7]} from API"
)
self.config = worker_version["configuration"]["configuration"]
if "user_configuration" in worker_version["configuration"]:
# Add default values (if set) to user_configuration
for key, value in worker_version["configuration"][
"user_configuration"
].items():
if "default" in value:
self.user_configuration[key] = value["default"]
self.worker_details = worker_version["worker"]
required_secrets = worker_version["configuration"].get("secrets", [])
# Load all required secrets
self.secrets = {name: self.load_secret(name) for name in required_secrets}
# Load worker run configuration when available
if os.environ.get("ARKINDEX_WORKER_RUN_ID"):
worker_run = self.request(
"RetrieveWorkerRun", id=os.environ["ARKINDEX_WORKER_RUN_ID"]
)
......@@ -210,6 +213,12 @@ class BaseWorker(object):
if self.user_configuration:
logger.info("Loaded user configuration from WorkerRun")
# if debug mode is set to true activate debug mode in logger
if self.user_configuration and self.user_configuration.get("debug"):
logger.setLevel(logging.DEBUG)
logger.debug("Debug output enabled")
def configure_cache(self):
task_id = os.environ.get("PONOS_TASK")
paths = None
if self.support_cache and self.args.database is not None:
......
......@@ -109,7 +109,7 @@ def give_env_variable(request, monkeypatch):
@pytest.fixture
def mock_config_api(mock_worker_version_api, mock_process_api, mock_user_api):
def mock_config_api(mock_worker_version_api, mock_process_api):
"""Mock all API endpoints required to configure a worker"""
pass
......@@ -223,29 +223,6 @@ def mock_process_api(responses):
)
@pytest.fixture
def mock_user_api(responses):
"""
Provide a mock API response to retrieve user details
Signup is disabled in this mock
"""
payload = {
"id": 1,
"email": "bot@teklia.com",
"display_name": "Bender",
"features": {
"signup": False,
},
}
responses.add(
responses.GET,
"http://testserver/api/v1/user/",
status=200,
body=json.dumps(payload),
content_type="application/json",
)
@pytest.fixture
def mock_activity_calls(responses):
"""
......@@ -309,6 +286,7 @@ def mock_elements_worker_with_cache(monkeypatch, mock_config_api, tmp_path):
worker = ElementsWorker(support_cache=True)
worker.configure()
worker.configure_cache()
return worker
......
......@@ -45,21 +45,18 @@ def test_init_var_ponos_data_given(monkeypatch):
assert worker.worker_version_id == "12341234-1234-1234-1234-123412341234"
def test_init_var_worker_version_id_missing(
monkeypatch, mock_user_api, mock_process_api
):
def test_init_var_worker_version_id_missing(monkeypatch):
monkeypatch.setattr(sys, "argv", ["worker"])
monkeypatch.delenv("WORKER_VERSION_ID")
worker = BaseWorker()
worker.configure()
worker.args = worker.parser.parse_args()
worker.configure_for_developers()
assert worker.worker_version_id is None
assert worker.is_read_only is True
assert worker.config == {} # default empty case
def test_init_var_worker_local_file(
monkeypatch, tmp_path, mock_user_api, mock_process_api
):
def test_init_var_worker_local_file(monkeypatch, tmp_path):
# Build a dummy yaml config file
config = tmp_path / "config.yml"
config.write_text("---\nlocalKey: abcdef123")
......@@ -67,7 +64,8 @@ def test_init_var_worker_local_file(
monkeypatch.setattr(sys, "argv", ["worker", "-c", str(config)])
monkeypatch.delenv("WORKER_VERSION_ID")
worker = BaseWorker()
worker.configure()
worker.args = worker.parser.parse_args()
worker.configure_for_developers()
assert worker.worker_version_id is None
assert worker.is_read_only is True
assert worker.config == {"localKey": "abcdef123"} # Use a local file for devs
......@@ -77,16 +75,12 @@ def test_init_var_worker_local_file(
def test_cli_default(mocker, mock_config_api):
worker = BaseWorker()
spy = mocker.spy(worker, "add_arguments")
assert not spy.called
assert logger.level == logging.NOTSET
assert not hasattr(worker, "api_client")
mocker.patch.object(sys, "argv", ["worker"])
worker.args = worker.parser.parse_args()
worker.configure()
assert spy.called
assert spy.call_count == 1
assert not worker.args.verbose
assert logger.level == logging.NOTSET
assert worker.api_client
......@@ -99,16 +93,12 @@ def test_cli_default(mocker, mock_config_api):
def test_cli_arg_verbose_given(mocker, mock_config_api):
worker = BaseWorker()
spy = mocker.spy(worker, "add_arguments")
assert not spy.called
assert logger.level == logging.NOTSET
assert not hasattr(worker, "api_client")
mocker.patch.object(sys, "argv", ["worker", "-v"])
worker.args = worker.parser.parse_args()
worker.configure()
assert spy.called
assert spy.call_count == 1
assert worker.args.verbose
assert logger.level == logging.DEBUG
assert worker.api_client
......@@ -119,9 +109,25 @@ def test_cli_arg_verbose_given(mocker, mock_config_api):
logger.setLevel(logging.NOTSET)
def test_configure_dev_mode(
mocker, monkeypatch, mock_user_api, mock_worker_version_api
):
def test_cli_envvar_debug_given(mocker, monkeypatch, mock_config_api):
worker = BaseWorker()
assert logger.level == logging.NOTSET
assert not hasattr(worker, "api_client")
mocker.patch.object(sys, "argv", ["worker"])
monkeypatch.setenv("ARKINDEX_DEBUG", True)
worker.args = worker.parser.parse_args()
worker.configure()
assert logger.level == logging.DEBUG
assert worker.api_client
assert worker.worker_version_id == "12341234-1234-1234-1234-123412341234"
assert worker.is_read_only is False
assert worker.config == {"someKey": "someValue"} # from API
logger.setLevel(logging.NOTSET)
def test_configure_dev_mode(mocker, monkeypatch):
"""
Configuring a worker in developer mode avoid retrieving process information
"""
......@@ -130,7 +136,8 @@ def test_configure_dev_mode(
monkeypatch.setenv(
"ARKINDEX_WORKER_RUN_ID", "aaaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
)
worker.configure()
worker.args = worker.parser.parse_args()
worker.configure_for_developers()
assert worker.args.dev is True
assert worker.process_information is None
......@@ -155,6 +162,7 @@ def test_configure_worker_run(mocker, monkeypatch, responses, mock_config_api):
f"http://testserver/api/v1/workers/configurations/{configuration_id}/",
json={"id": configuration_id, "name": "BBB", "configuration": {"a": "b"}},
)
worker.args = worker.parser.parse_args()
worker.configure()
assert worker.user_configuration == {"a": "b"}
......@@ -164,7 +172,6 @@ def test_configure_user_configuration_defaults(
mocker,
monkeypatch,
mock_worker_version_user_configuration_api,
mock_user_api,
mock_process_api,
responses,
):
......@@ -177,6 +184,7 @@ def test_configure_user_configuration_defaults(
f"http://testserver/api/v1/imports/workers/{run_id}/",
json={"id": run_id},
)
worker.args = worker.parser.parse_args()
worker.configure()
assert worker.config == {"param_1": "/some/path/file.pth", "param_2": 12}
......@@ -186,6 +194,35 @@ def test_configure_user_configuration_defaults(
}
@pytest.mark.parametrize("debug_dict", [{"debug": True}, {"debug": False}])
def test_configure_user_config_debug(
mocker, monkeypatch, responses, debug_dict, mock_config_api
):
worker = BaseWorker()
mocker.patch.object(sys, "argv", ["worker"])
run_id = "aaaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"
configuration_id = "bbbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb"
monkeypatch.setenv("ARKINDEX_WORKER_RUN_ID", run_id)
assert logger.level == logging.NOTSET
responses.add(
responses.GET,
f"http://testserver/api/v1/imports/workers/{run_id}/",
json={"id": run_id, "configuration_id": configuration_id},
)
responses.add(
responses.GET,
f"http://testserver/api/v1/workers/configurations/{configuration_id}/",
json={"id": configuration_id, "name": "BBB", "configuration": debug_dict},
)
worker.args = worker.parser.parse_args()
worker.configure()
assert worker.user_configuration == debug_dict
expected_log_level = logging.DEBUG if debug_dict["debug"] else logging.NOTSET
assert logger.level == expected_log_level
logger.setLevel(logging.NOTSET)
def test_configure_worker_run_missing_conf(
mocker, monkeypatch, responses, mock_config_api
):
......@@ -204,6 +241,7 @@ def test_configure_worker_run_missing_conf(
f"http://testserver/api/v1/workers/configurations/{configuration_id}/",
json={"id": configuration_id, "name": "BBB"},
)
worker.args = worker.parser.parse_args()
worker.configure()
assert worker.user_configuration is None
......
# -*- coding: utf-8 -*-
# API calls during worker configuration
BASE_API_CALLS = [
("GET", "http://testserver/api/v1/user/"),
("GET", "http://testserver/api/v1/imports/aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeffff/"),
(
"GET",
......
......@@ -79,9 +79,7 @@ def test_readonly(responses, mock_elements_worker):
] == BASE_API_CALLS
def test_activities_disabled(
responses, monkeypatch, mock_worker_version_api, mock_user_api
):
def test_activities_disabled(responses, monkeypatch, mock_worker_version_api):
"""Test worker process elements without updating activities when they are disabled for the process"""
responses.add(
responses.GET,
......@@ -101,7 +99,7 @@ def test_activities_disabled(
] == BASE_API_CALLS
def test_activities_dev_mode(mocker, mock_user_api, mock_worker_version_api):
def test_activities_dev_mode(mocker):
"""
Worker activities are not stored in dev mode
"""
......
......@@ -181,7 +181,9 @@ def test_merge_from_worker(
monkeypatch.setenv("PONOS_DATA", str(tmpdir))
# Create the task's output dir, so that it can create its own database
(tmpdir / "my_task").mkdir()
mock_base_worker_with_cache.args = mock_base_worker_with_cache.parser.parse_args()
mock_base_worker_with_cache.configure()
mock_base_worker_with_cache.configure_cache()
# Then we have 2 elements and a transcription
assert CachedImage.select().count() == 0
......