Skip to content
GitLab
Explore
Sign in
Register
Primary navigation
Search or go to…
Project
B
Base Worker
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package registry
Container Registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
Workers
Base Worker
Commits
2433af0d
Commit
2433af0d
authored
2 years ago
by
Chaza Abdelwahab
Committed by
Yoann Schneider
2 years ago
Browse files
Options
Downloads
Patches
Plain Diff
push for debug
parent
f7fe1424
No related branches found
No related tags found
1 merge request
!179
Move developer setup in a dedicated method
Changes
3
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
arkindex_worker/worker/__init__.py
+10
-2
10 additions, 2 deletions
arkindex_worker/worker/__init__.py
arkindex_worker/worker/base.py
+80
-73
80 additions, 73 deletions
arkindex_worker/worker/base.py
tests/test_base_worker.py
+9
-14
9 additions, 14 deletions
tests/test_base_worker.py
with
99 additions
and
89 deletions
arkindex_worker/worker/__init__.py
+
10
−
2
View file @
2433af0d
...
...
@@ -67,7 +67,9 @@ class ElementsWorker(
"""
def
__init__
(
self
,
description
=
"
Arkindex Elements Worker
"
,
support_cache
=
False
):
print
(
"
in __init__ in __init__.py before super
"
)
super
().
__init__
(
description
,
support_cache
)
print
(
"
in __init__ in __init__.py after super
"
)
# Add mandatory argument to process elements
self
.
parser
.
add_argument
(
...
...
@@ -87,6 +89,9 @@ class ElementsWorker(
self
.
_worker_version_cache
=
{}
# CLI args are stored on the instance so that implementations can access them
self
.
args
=
self
.
parser
.
parse_args
()
def
list_elements
(
self
):
"""
List the elements to be processed, either from the CLI arguments or
...
...
@@ -136,8 +141,11 @@ class ElementsWorker(
return
self
.
process_information
.
get
(
"
activity_state
"
)
==
"
ready
"
def
configure
(
self
):
super
().
configure
()
super
().
configure_cache
()
if
self
.
is_read_only
:
super
().
configure_for_developers
()
else
:
super
().
configure
()
super
().
configure_cache
()
# Add report concerning elements
self
.
report
=
Reporter
(
...
...
This diff is collapsed.
Click to expand it.
arkindex_worker/worker/base.py
+
80
−
73
View file @
2433af0d
...
...
@@ -59,6 +59,39 @@ class BaseWorker(object):
"""
self
.
parser
=
argparse
.
ArgumentParser
(
description
=
description
)
self
.
parser
.
add_argument
(
"
-c
"
,
"
--config
"
,
help
=
"
Alternative configuration file when running without a Worker Version ID
"
,
type
=
open
,
)
self
.
parser
.
add_argument
(
"
-d
"
,
"
--database
"
,
help
=
"
Alternative SQLite database to use for worker caching
"
,
type
=
str
,
default
=
None
,
)
self
.
parser
.
add_argument
(
"
-v
"
,
"
--verbose
"
,
"
--debug
"
,
help
=
"
Display more information on events and errors
"
,
action
=
"
store_true
"
,
default
=
False
,
)
self
.
parser
.
add_argument
(
"
--dev
"
,
help
=
(
"
Run worker in developer mode.
"
"
Worker will be in read-only state even if a worker_version is supplied.
"
"
ARKINDEX_PROCESS_ID environment variable is not required with this flag.
"
),
action
=
"
store_true
"
,
)
# Call potential extra arguments
self
.
add_arguments
()
# Setup workdir either in Ponos environment or on host's home
if
os
.
environ
.
get
(
"
PONOS_DATA
"
):
...
...
@@ -98,84 +131,14 @@ class BaseWorker(object):
"""
return
self
.
args
.
dev
or
self
.
worker_version_id
is
None
def
configure
(
self
):
"""
Configure worker using CLI args and environment variables.
"""
self
.
parser
.
add_argument
(
"
-c
"
,
"
--config
"
,
help
=
"
Alternative configuration file when running without a Worker Version ID
"
,
type
=
open
,
)
self
.
parser
.
add_argument
(
"
-d
"
,
"
--database
"
,
help
=
"
Alternative SQLite database to use for worker caching
"
,
type
=
str
,
default
=
None
,
)
self
.
parser
.
add_argument
(
"
-v
"
,
"
--verbose
"
,
"
--debug
"
,
help
=
"
Display more information on events and errors
"
,
action
=
"
store_true
"
,
default
=
False
,
)
self
.
parser
.
add_argument
(
"
--dev
"
,
help
=
(
"
Run worker in developer mode.
"
"
Worker will be in read-only state even if a worker_version is supplied.
"
"
ARKINDEX_PROCESS_ID environment variable is not required with this flag.
"
),
action
=
"
store_true
"
,
)
# Call potential extra arguments
self
.
add_arguments
()
# CLI args are stored on the instance so that implementations can access them
self
.
args
=
self
.
parser
.
parse_args
()
def
configure_for_developers
(
self
):
print
(
"
in configure_for_developers in base.py
"
)
# Setup logging level if verbose or if ARKINDEX_DEBUG is set to true
if
self
.
args
.
verbose
or
os
.
environ
.
get
(
"
ARKINDEX_DEBUG
"
):
logger
.
setLevel
(
logging
.
DEBUG
)
logger
.
debug
(
"
Debug output enabled
"
)
# Build Arkindex API client from environment variables
self
.
api_client
=
ArkindexClient
(
**
options_from_env
())
logger
.
debug
(
f
"
Setup Arkindex API client on
{
self
.
api_client
.
document
.
url
}
"
)
# Load process information except in developer mode
if
not
self
.
args
.
dev
:
assert
os
.
environ
.
get
(
"
ARKINDEX_PROCESS_ID
"
),
"
ARKINDEX_PROCESS_ID environment variable is not defined
"
self
.
process_information
=
self
.
request
(
"
RetrieveDataImport
"
,
id
=
os
.
environ
[
"
ARKINDEX_PROCESS_ID
"
]
)
if
self
.
worker_version_id
:
# Retrieve initial configuration from API
worker_version
=
self
.
request
(
"
RetrieveWorkerVersion
"
,
id
=
self
.
worker_version_id
)
logger
.
info
(
f
"
Loaded worker
{
worker_version
[
'
worker
'
][
'
name
'
]
}
revision
{
worker_version
[
'
revision
'
][
'
hash
'
][
0
:
7
]
}
from API
"
)
self
.
config
=
worker_version
[
"
configuration
"
][
"
configuration
"
]
if
"
user_configuration
"
in
worker_version
[
"
configuration
"
]:
# Add default values (if set) to user_configuration
for
key
,
value
in
worker_version
[
"
configuration
"
][
"
user_configuration
"
].
items
():
if
"
default
"
in
value
:
self
.
user_configuration
[
key
]
=
value
[
"
default
"
]
self
.
worker_details
=
worker_version
[
"
worker
"
]
required_secrets
=
worker_version
[
"
configuration
"
].
get
(
"
secrets
"
,
[])
elif
self
.
args
.
config
:
if
self
.
args
.
config
:
# Load config from YAML file
self
.
config
=
yaml
.
safe_load
(
self
.
args
.
config
)
self
.
worker_details
=
{
"
name
"
:
"
Local worker
"
}
...
...
@@ -192,6 +155,50 @@ class BaseWorker(object):
# Load all required secrets
self
.
secrets
=
{
name
:
self
.
load_secret
(
name
)
for
name
in
required_secrets
}
def
configure
(
self
):
"""
Configure worker using CLI args and environment variables.
"""
print
(
"
in configure in base.py
"
)
assert
not
self
.
is_read_only
# Setup logging level if verbose or if ARKINDEX_DEBUG is set to true
if
self
.
args
.
verbose
or
os
.
environ
.
get
(
"
ARKINDEX_DEBUG
"
):
logger
.
setLevel
(
logging
.
DEBUG
)
logger
.
debug
(
"
Debug output enabled
"
)
# Build Arkindex API client from environment variables
self
.
api_client
=
ArkindexClient
(
**
options_from_env
())
logger
.
debug
(
f
"
Setup Arkindex API client on
{
self
.
api_client
.
document
.
url
}
"
)
# Load process information except in developer mode
assert
os
.
environ
.
get
(
"
ARKINDEX_PROCESS_ID
"
),
"
ARKINDEX_PROCESS_ID environment variable is not defined
"
self
.
process_information
=
self
.
request
(
"
RetrieveDataImport
"
,
id
=
os
.
environ
[
"
ARKINDEX_PROCESS_ID
"
]
)
# Retrieve initial configuration from API
worker_version
=
self
.
request
(
"
RetrieveWorkerVersion
"
,
id
=
self
.
worker_version_id
)
logger
.
info
(
f
"
Loaded worker
{
worker_version
[
'
worker
'
][
'
name
'
]
}
revision
{
worker_version
[
'
revision
'
][
'
hash
'
][
0
:
7
]
}
from API
"
)
self
.
config
=
worker_version
[
"
configuration
"
][
"
configuration
"
]
if
"
user_configuration
"
in
worker_version
[
"
configuration
"
]:
# Add default values (if set) to user_configuration
for
key
,
value
in
worker_version
[
"
configuration
"
][
"
user_configuration
"
].
items
():
if
"
default
"
in
value
:
self
.
user_configuration
[
key
]
=
value
[
"
default
"
]
self
.
worker_details
=
worker_version
[
"
worker
"
]
required_secrets
=
worker_version
[
"
configuration
"
].
get
(
"
secrets
"
,
[])
# Load all required secrets
self
.
secrets
=
{
name
:
self
.
load_secret
(
name
)
for
name
in
required_secrets
}
# Load worker run configuration when available and not in dev mode
if
os
.
environ
.
get
(
"
ARKINDEX_WORKER_RUN_ID
"
)
and
not
self
.
args
.
dev
:
worker_run
=
self
.
request
(
...
...
This diff is collapsed.
Click to expand it.
tests/test_base_worker.py
+
9
−
14
View file @
2433af0d
...
...
@@ -45,17 +45,17 @@ def test_init_var_ponos_data_given(monkeypatch):
assert
worker
.
worker_version_id
==
"
12341234-1234-1234-1234-123412341234
"
def
test_init_var_worker_version_id_missing
(
monkeypatch
,
mock_process_api
):
def
test_init_var_worker_version_id_missing
(
monkeypatch
):
monkeypatch
.
setattr
(
sys
,
"
argv
"
,
[
"
worker
"
])
monkeypatch
.
delenv
(
"
WORKER_VERSION_ID
"
)
worker
=
BaseWorker
()
worker
.
configure
()
worker
.
configure
_for_developers
()
assert
worker
.
worker_version_id
is
None
assert
worker
.
is_read_only
is
True
assert
worker
.
config
==
{}
# default empty case
def
test_init_var_worker_local_file
(
monkeypatch
,
tmp_path
,
mock_process_api
):
def
test_init_var_worker_local_file
(
monkeypatch
,
tmp_path
):
# Build a dummy yaml config file
config
=
tmp_path
/
"
config.yml
"
config
.
write_text
(
"
---
\n
localKey: abcdef123
"
)
...
...
@@ -63,7 +63,7 @@ def test_init_var_worker_local_file(monkeypatch, tmp_path, mock_process_api):
monkeypatch
.
setattr
(
sys
,
"
argv
"
,
[
"
worker
"
,
"
-c
"
,
str
(
config
)])
monkeypatch
.
delenv
(
"
WORKER_VERSION_ID
"
)
worker
=
BaseWorker
()
worker
.
configure
()
worker
.
configure
_for_developers
()
assert
worker
.
worker_version_id
is
None
assert
worker
.
is_read_only
is
True
assert
worker
.
config
==
{
"
localKey
"
:
"
abcdef123
"
}
# Use a local file for devs
...
...
@@ -71,18 +71,13 @@ def test_init_var_worker_local_file(monkeypatch, tmp_path, mock_process_api):
config
.
unlink
()
def
test_cli_default
(
mocker
,
mock_config_api
):
def
test_cli_default
(
mocker
):
worker
=
BaseWorker
()
spy
=
mocker
.
spy
(
worker
,
"
add_arguments
"
)
assert
not
spy
.
called
assert
logger
.
level
==
logging
.
NOTSET
assert
not
hasattr
(
worker
,
"
api_client
"
)
mocker
.
patch
.
object
(
sys
,
"
argv
"
,
[
"
worker
"
])
worker
.
configure
()
assert
spy
.
called
assert
spy
.
call_count
==
1
worker
.
configure_for_developers
()
assert
not
worker
.
args
.
verbose
assert
logger
.
level
==
logging
.
NOTSET
assert
worker
.
api_client
...
...
@@ -93,7 +88,7 @@ def test_cli_default(mocker, mock_config_api):
logger
.
setLevel
(
logging
.
NOTSET
)
def
test_cli_arg_verbose_given
(
mocker
,
mock_config_api
):
def
test_cli_arg_verbose_given
(
mocker
):
worker
=
BaseWorker
()
spy
=
mocker
.
spy
(
worker
,
"
add_arguments
"
)
assert
not
spy
.
called
...
...
@@ -115,7 +110,7 @@ def test_cli_arg_verbose_given(mocker, mock_config_api):
logger
.
setLevel
(
logging
.
NOTSET
)
def
test_cli_envvar_debug_given
(
mocker
,
monkeypatch
,
mock_config_api
):
def
test_cli_envvar_debug_given
(
mocker
,
monkeypatch
):
worker
=
BaseWorker
()
assert
logger
.
level
==
logging
.
NOTSET
...
...
@@ -133,7 +128,7 @@ def test_cli_envvar_debug_given(mocker, monkeypatch, mock_config_api):
logger
.
setLevel
(
logging
.
NOTSET
)
def
test_configure_dev_mode
(
mocker
,
monkeypatch
,
mock_worker_version_api
):
def
test_configure_dev_mode
(
mocker
,
monkeypatch
):
"""
Configuring a worker in developer mode avoid retrieving process information
"""
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment