Skip to content
Snippets Groups Projects

Depend only on RetrieveWorkerRun to get all the informations needed

Merged Yoann Schneider requested to merge depend-on-retrieve-worker-run into master
All threads resolved!
7 files
+ 287
204
Compare changes
  • Side-by-side
  • Inline
Files
7
@@ -85,7 +85,6 @@ class BaseWorker(object):
help=(
"Run worker in developer mode. "
"Worker will be in read-only state even if a worker_version is supplied. "
"ARKINDEX_PROCESS_ID environment variable is not required with this flag."
),
action="store_true",
default=False,
@@ -111,6 +110,11 @@ class BaseWorker(object):
logger.warning(
"Missing WORKER_VERSION_ID environment variable, worker is in read-only mode"
)
self.worker_run_id = os.environ.get("ARKINDEX_WORKER_RUN_ID")
if not self.worker_run_id:
logger.warning(
"Missing ARKINDEX_WORKER_RUN_ID environment variable, worker is in read-only mode"
)
logger.info(f"Worker will use {self.work_dir} as working directory")
@@ -130,7 +134,11 @@ class BaseWorker(object):
or when no worker version ID is provided.
:rtype: bool
"""
return self.args.dev or self.worker_version_id is None
return (
self.args.dev
or self.worker_version_id is None
or self.worker_run_id is None
)
def configure_for_developers(self):
assert self.is_read_only
@@ -170,21 +178,20 @@ class BaseWorker(object):
self.api_client = ArkindexClient(**options_from_env())
logger.debug(f"Setup Arkindex API client on {self.api_client.document.url}")
# Load worker run information
worker_run = self.request("RetrieveWorkerRun", id=self.worker_run_id)
# Load process information
assert os.environ.get(
"ARKINDEX_PROCESS_ID"
), "ARKINDEX_PROCESS_ID environment variable is not defined"
self.process_information = self.request(
"RetrieveDataImport", id=os.environ["ARKINDEX_PROCESS_ID"]
)
self.process_information = worker_run["process"]
# Retrieve initial configuration from API
worker_version = self.request(
"RetrieveWorkerVersion", id=self.worker_version_id
)
# Load worker version information
worker_version = worker_run["worker_version"]
self.worker_details = worker_version["worker"]
logger.info(
f"Loaded worker {worker_version['worker']['name']} revision {worker_version['revision']['hash'][0:7]} from API"
f"Loaded worker {self.worker_details['name']} revision {worker_version['revision']['hash'][0:7]} from API"
)
# Retrieve initial configuration from API
self.config = worker_version["configuration"]["configuration"]
if "user_configuration" in worker_version["configuration"]:
# Add default values (if set) to user_configuration
@@ -193,30 +200,20 @@ class BaseWorker(object):
].items():
if "default" in value:
self.user_configuration[key] = value["default"]
self.worker_details = worker_version["worker"]
required_secrets = worker_version["configuration"].get("secrets", [])
# Load all required secrets
required_secrets = worker_version["configuration"].get("secrets", [])
self.secrets = {name: self.load_secret(name) for name in required_secrets}
# Load worker run configuration when available
if os.environ.get("ARKINDEX_WORKER_RUN_ID"):
worker_run = self.request(
"RetrieveWorkerRun", id=os.environ["ARKINDEX_WORKER_RUN_ID"]
)
configuration_id = worker_run.get("configuration_id")
if configuration_id:
worker_configuration = self.request(
"RetrieveWorkerConfiguration", id=configuration_id
)
self.user_configuration = worker_configuration.get("configuration")
if self.user_configuration:
logger.info("Loaded user configuration from WorkerRun")
# if debug mode is set to true activate debug mode in logger
if self.user_configuration and self.user_configuration.get("debug"):
logger.setLevel(logging.DEBUG)
logger.debug("Debug output enabled")
worker_configuration = worker_run.get("configuration")
self.user_configuration = worker_configuration.get("configuration")
if self.user_configuration:
logger.info("Loaded user configuration from WorkerRun")
# if debug mode is set to true activate debug mode in logger
if self.user_configuration.get("debug"):
logger.setLevel(logging.DEBUG)
logger.debug("Debug output enabled")
def configure_cache(self):
task_id = os.environ.get("PONOS_TASK")
Loading