Skip to content
GitLab
Explore
Sign in
Register
Primary navigation
Search or go to…
Project
I
Init Elements
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package registry
Container Registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
Arkindex
Workers
Init Elements
Commits
0d2722c7
Commit
0d2722c7
authored
1 year ago
by
Manon Blanco
Browse files
Options
Downloads
Patches
Plain Diff
Apply suggestions
parent
13d93830
No related branches found
No related tags found
1 merge request
!2
Port init elements code
Pipeline
#165775
passed
1 year ago
Stage: test
Stage: build
Stage: release
Changes
3
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
Showing
3 changed files
tests/conftest.py
+8
-4
8 additions, 4 deletions
tests/conftest.py
tests/test_worker.py
+78
-27
78 additions, 27 deletions
tests/test_worker.py
worker_init_elements/worker.py
+89
-58
89 additions, 58 deletions
worker_init_elements/worker.py
with
175 additions
and
89 deletions
tests/conftest.py
+
8
−
4
View file @
0d2722c7
...
@@ -5,7 +5,7 @@ import pytest
...
@@ -5,7 +5,7 @@ import pytest
from
arkindex.mock
import
MockApiClient
from
arkindex.mock
import
MockApiClient
from
arkindex_worker.worker.base
import
BaseWorker
from
arkindex_worker.worker.base
import
BaseWorker
from
worker_init_elements.worker
import
InitElementWorker
from
worker_init_elements.worker
import
InitElement
s
Worker
@pytest.fixture
()
@pytest.fixture
()
...
@@ -94,7 +94,11 @@ def _mock_worker_run_api(mock_api_client: MockApiClient) -> None:
...
@@ -94,7 +94,11 @@ def _mock_worker_run_api(mock_api_client: MockApiClient) -> None:
},
},
},
},
"
configuration
"
:
None
,
"
configuration
"
:
None
,
"
process
"
:
{
"
id
"
:
"
process_id
"
,
"
corpus
"
:
os
.
getenv
(
"
ARKINDEX_CORPUS_ID
"
)},
"
process
"
:
{
"
id
"
:
"
process_id
"
,
"
corpus
"
:
os
.
getenv
(
"
ARKINDEX_CORPUS_ID
"
),
"
activity_state
"
:
"
disabled
"
,
},
"
summary
"
:
os
.
getenv
(
"
ARKINDEX_WORKER_RUN_ID
"
)
+
"
@ version 1
"
,
"
summary
"
:
os
.
getenv
(
"
ARKINDEX_WORKER_RUN_ID
"
)
+
"
@ version 1
"
,
},
},
)
)
...
@@ -103,10 +107,10 @@ def _mock_worker_run_api(mock_api_client: MockApiClient) -> None:
...
@@ -103,10 +107,10 @@ def _mock_worker_run_api(mock_api_client: MockApiClient) -> None:
@pytest.fixture
()
@pytest.fixture
()
def
mock_worker
(
def
mock_worker
(
_mock_worker_run_api
,
tmp_path_factory
,
monkeypatch
_mock_worker_run_api
,
tmp_path_factory
,
monkeypatch
)
->
InitElementWorker
:
)
->
InitElement
s
Worker
:
monkeypatch
.
setattr
(
sys
,
"
argv
"
,
[
"
worker-init-elements
"
])
monkeypatch
.
setattr
(
sys
,
"
argv
"
,
[
"
worker-init-elements
"
])
worker
=
InitElementWorker
()
worker
=
InitElement
s
Worker
()
worker
.
work_dir
=
tmp_path_factory
.
mktemp
(
"
data
"
)
worker
.
work_dir
=
tmp_path_factory
.
mktemp
(
"
data
"
)
worker
.
configure
()
worker
.
configure
()
...
...
This diff is collapsed.
Click to expand it.
tests/test_worker.py
+
78
−
27
View file @
0d2722c7
...
@@ -5,6 +5,7 @@ from pathlib import Path
...
@@ -5,6 +5,7 @@ from pathlib import Path
import
pytest
import
pytest
from
arkindex_worker.cache
import
SQL_VERSION
from
worker_init_elements.worker
import
INIT_PAGE_SIZE
from
worker_init_elements.worker
import
INIT_PAGE_SIZE
...
@@ -20,7 +21,7 @@ def check_db(db_path: Path, elements: list, images: list) -> None:
...
@@ -20,7 +21,7 @@ def check_db(db_path: Path, elements: list, images: list) -> None:
db
.
row_factory
=
sqlite3
.
Row
db
.
row_factory
=
sqlite3
.
Row
assert
list
(
map
(
dict
,
db
.
execute
(
"
select * from version
"
).
fetchall
()))
==
[
assert
list
(
map
(
dict
,
db
.
execute
(
"
select * from version
"
).
fetchall
()))
==
[
{
"
version
"
:
3
}
{
"
version
"
:
SQL_VERSION
}
]
]
assert
(
assert
(
list
(
map
(
dict
,
db
.
execute
(
"
select * from elements order by id
"
).
fetchall
()))
list
(
map
(
dict
,
db
.
execute
(
"
select * from elements order by id
"
).
fetchall
()))
...
@@ -38,14 +39,6 @@ def check_db(db_path: Path, elements: list, images: list) -> None:
...
@@ -38,14 +39,6 @@ def check_db(db_path: Path, elements: list, images: list) -> None:
def
test_run_process
(
use_cache
,
mock_worker
):
def
test_run_process
(
use_cache
,
mock_worker
):
mock_worker
.
use_cache
=
use_cache
mock_worker
.
use_cache
=
use_cache
mock_worker
.
api_client
.
add_response
(
"
RetrieveProcess
"
,
id
=
mock_worker
.
process_information
[
"
id
"
],
response
=
{
"
activity_state
"
:
"
ready
"
,
"
corpus
"
:
"
corpusid
"
,
},
)
mock_worker
.
api_client
.
add_response
(
mock_worker
.
api_client
.
add_response
(
"
RetrieveCorpus
"
,
"
RetrieveCorpus
"
,
id
=
mock_worker
.
process_information
[
"
corpus
"
],
id
=
mock_worker
.
process_information
[
"
corpus
"
],
...
@@ -72,6 +65,7 @@ def test_run_process(use_cache, mock_worker):
...
@@ -72,6 +65,7 @@ def test_run_process(use_cache, mock_worker):
"
polygon
"
:
[[
0
,
0
],
[
0
,
1
],
[
1
,
1
],
[
1
,
0
],
[
0
,
0
]],
"
polygon
"
:
[[
0
,
0
],
[
0
,
1
],
[
1
,
1
],
[
1
,
0
],
[
0
,
0
]],
"
rotation_angle
"
:
42
,
"
rotation_angle
"
:
42
,
"
mirrored
"
:
False
,
"
mirrored
"
:
False
,
"
confidence
"
:
None
,
},
},
{
{
"
id
"
:
"
22222222-2222-2222-2222-222222222222
"
,
"
id
"
:
"
22222222-2222-2222-2222-222222222222
"
,
...
@@ -84,6 +78,7 @@ def test_run_process(use_cache, mock_worker):
...
@@ -84,6 +78,7 @@ def test_run_process(use_cache, mock_worker):
"
polygon
"
:
[[
0
,
0
],
[
0
,
2
],
[
2
,
2
],
[
2
,
0
],
[
0
,
0
]],
"
polygon
"
:
[[
0
,
0
],
[
0
,
2
],
[
2
,
2
],
[
2
,
0
],
[
0
,
0
]],
"
rotation_angle
"
:
0
,
"
rotation_angle
"
:
0
,
"
mirrored
"
:
True
,
"
mirrored
"
:
True
,
"
confidence
"
:
None
,
},
},
{
{
"
id
"
:
"
33333333-3333-3333-3333-333333333333
"
,
"
id
"
:
"
33333333-3333-3333-3333-333333333333
"
,
...
@@ -92,6 +87,17 @@ def test_run_process(use_cache, mock_worker):
...
@@ -92,6 +87,17 @@ def test_run_process(use_cache, mock_worker):
"
rotation_angle
"
:
17
,
"
rotation_angle
"
:
17
,
"
mirrored
"
:
True
,
"
mirrored
"
:
True
,
"
confidence
"
:
0.42
,
"
confidence
"
:
0.42
,
**
(
{
"
image_id
"
:
None
,
"
image_width
"
:
None
,
"
image_height
"
:
None
,
"
image_url
"
:
None
,
"
polygon
"
:
None
,
}
if
mock_worker
.
use_cache
else
{}
),
},
},
],
],
)
)
...
@@ -108,7 +114,7 @@ def test_run_process(use_cache, mock_worker):
...
@@ -108,7 +114,7 @@ def test_run_process(use_cache, mock_worker):
)
)
db_path
=
mock_worker
.
work_dir
/
"
db.sqlite
"
db_path
=
mock_worker
.
work_dir
/
"
db.sqlite
"
assert
db_path
.
is_file
()
==
use_cache
assert
db_path
.
is_file
()
is
use_cache
if
use_cache
:
if
use_cache
:
check_db
(
check_db
(
db_path
=
db_path
,
db_path
=
db_path
,
...
@@ -174,14 +180,6 @@ def test_run_distributed(mock_worker):
...
@@ -174,14 +180,6 @@ def test_run_distributed(mock_worker):
mock_worker
.
use_cache
=
True
mock_worker
.
use_cache
=
True
mock_worker
.
chunks_number
=
4
mock_worker
.
chunks_number
=
4
mock_worker
.
api_client
.
add_response
(
"
RetrieveProcess
"
,
id
=
mock_worker
.
process_information
[
"
id
"
],
response
=
{
"
activity_state
"
:
"
ready
"
,
"
corpus
"
:
"
corpusid
"
,
},
)
mock_worker
.
api_client
.
add_response
(
mock_worker
.
api_client
.
add_response
(
"
RetrieveCorpus
"
,
"
RetrieveCorpus
"
,
id
=
mock_worker
.
process_information
[
"
corpus
"
],
id
=
mock_worker
.
process_information
[
"
corpus
"
],
...
@@ -201,6 +199,14 @@ def test_run_distributed(mock_worker):
...
@@ -201,6 +199,14 @@ def test_run_distributed(mock_worker):
"
id
"
:
"
22222222-2222-2222-2222-222222222222
"
,
"
id
"
:
"
22222222-2222-2222-2222-222222222222
"
,
"
type_id
"
:
"
A
"
,
"
type_id
"
:
"
A
"
,
"
name
"
:
"
Class 2
"
,
"
name
"
:
"
Class 2
"
,
"
image_id
"
:
None
,
"
image_width
"
:
None
,
"
image_height
"
:
None
,
"
image_url
"
:
None
,
"
polygon
"
:
None
,
"
rotation_angle
"
:
0
,
"
mirrored
"
:
False
,
"
confidence
"
:
None
,
},
},
{
{
"
id
"
:
"
aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa
"
,
"
id
"
:
"
aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa
"
,
...
@@ -211,6 +217,9 @@ def test_run_distributed(mock_worker):
...
@@ -211,6 +217,9 @@ def test_run_distributed(mock_worker):
"
image_height
"
:
1337
,
"
image_height
"
:
1337
,
"
image_url
"
:
"
http://cafe
"
,
"
image_url
"
:
"
http://cafe
"
,
"
polygon
"
:
[[
0
,
0
],
[
0
,
1
],
[
1
,
1
],
[
1
,
0
],
[
0
,
0
]],
"
polygon
"
:
[[
0
,
0
],
[
0
,
1
],
[
1
,
1
],
[
1
,
0
],
[
0
,
0
]],
"
rotation_angle
"
:
0
,
"
mirrored
"
:
False
,
"
confidence
"
:
None
,
},
},
{
{
"
id
"
:
"
bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb
"
,
"
id
"
:
"
bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb
"
,
...
@@ -221,6 +230,9 @@ def test_run_distributed(mock_worker):
...
@@ -221,6 +230,9 @@ def test_run_distributed(mock_worker):
"
image_height
"
:
1337
,
"
image_height
"
:
1337
,
"
image_url
"
:
"
http://beef
"
,
"
image_url
"
:
"
http://beef
"
,
"
polygon
"
:
[[
0
,
0
],
[
0
,
2
],
[
2
,
2
],
[
2
,
0
],
[
0
,
0
]],
"
polygon
"
:
[[
0
,
0
],
[
0
,
2
],
[
2
,
2
],
[
2
,
0
],
[
0
,
0
]],
"
rotation_angle
"
:
0
,
"
mirrored
"
:
False
,
"
confidence
"
:
None
,
},
},
{
{
"
id
"
:
"
cccccccc-cccc-cccc-cccc-cccccccccccc
"
,
"
id
"
:
"
cccccccc-cccc-cccc-cccc-cccccccccccc
"
,
...
@@ -231,6 +243,9 @@ def test_run_distributed(mock_worker):
...
@@ -231,6 +243,9 @@ def test_run_distributed(mock_worker):
"
image_height
"
:
1337
,
"
image_height
"
:
1337
,
"
image_url
"
:
"
http://beef
"
,
"
image_url
"
:
"
http://beef
"
,
"
polygon
"
:
[[
0
,
0
],
[
0
,
2
],
[
2
,
2
],
[
2
,
0
],
[
0
,
0
]],
"
polygon
"
:
[[
0
,
0
],
[
0
,
2
],
[
2
,
2
],
[
2
,
0
],
[
0
,
0
]],
"
rotation_angle
"
:
0
,
"
mirrored
"
:
False
,
"
confidence
"
:
None
,
},
},
{
{
"
id
"
:
"
dddddddd-dddd-dddd-dddd-dddddddddddd
"
,
"
id
"
:
"
dddddddd-dddd-dddd-dddd-dddddddddddd
"
,
...
@@ -241,16 +256,35 @@ def test_run_distributed(mock_worker):
...
@@ -241,16 +256,35 @@ def test_run_distributed(mock_worker):
"
image_height
"
:
1337
,
"
image_height
"
:
1337
,
"
image_url
"
:
"
http://cafe
"
,
"
image_url
"
:
"
http://cafe
"
,
"
polygon
"
:
[[
0
,
0
],
[
0
,
1
],
[
1
,
1
],
[
1
,
0
],
[
0
,
0
]],
"
polygon
"
:
[[
0
,
0
],
[
0
,
1
],
[
1
,
1
],
[
1
,
0
],
[
0
,
0
]],
"
rotation_angle
"
:
0
,
"
mirrored
"
:
False
,
"
confidence
"
:
None
,
},
},
{
{
"
id
"
:
"
eeeeeeee-eeee-eeee-eeee-eeeeeeeeeeee
"
,
"
id
"
:
"
eeeeeeee-eeee-eeee-eeee-eeeeeeeeeeee
"
,
"
type_id
"
:
"
B
"
,
"
type_id
"
:
"
B
"
,
"
name
"
:
"
Student 5
"
,
"
name
"
:
"
Student 5
"
,
"
image_id
"
:
None
,
"
image_width
"
:
None
,
"
image_height
"
:
None
,
"
image_url
"
:
None
,
"
polygon
"
:
None
,
"
rotation_angle
"
:
0
,
"
mirrored
"
:
False
,
"
confidence
"
:
None
,
},
},
{
{
"
id
"
:
"
ffffffff-ffff-ffff-ffff-ffffffffffff
"
,
"
id
"
:
"
ffffffff-ffff-ffff-ffff-ffffffffffff
"
,
"
type_id
"
:
"
B
"
,
"
type_id
"
:
"
B
"
,
"
name
"
:
"
Student 6
"
,
"
name
"
:
"
Student 6
"
,
"
image_id
"
:
None
,
"
image_width
"
:
None
,
"
image_height
"
:
None
,
"
image_url
"
:
None
,
"
polygon
"
:
None
,
"
rotation_angle
"
:
0
,
"
mirrored
"
:
False
,
"
confidence
"
:
None
,
},
},
],
],
)
)
...
@@ -460,17 +494,11 @@ def test_not_enough_elements(mock_worker):
...
@@ -460,17 +494,11 @@ def test_not_enough_elements(mock_worker):
mock_worker
.
process
()
mock_worker
.
process
()
def
test_run_duplicates
(
mock_worker
):
def
test_run_duplicates
(
mock_worker
,
caplog
):
caplog
.
set_level
(
logging
.
WARNING
)
mock_worker
.
use_cache
=
True
mock_worker
.
use_cache
=
True
mock_worker
.
api_client
.
add_response
(
"
RetrieveProcess
"
,
id
=
mock_worker
.
process_information
[
"
id
"
],
response
=
{
"
activity_state
"
:
"
ready
"
,
"
corpus
"
:
"
corpusid
"
,
},
)
mock_worker
.
api_client
.
add_response
(
mock_worker
.
api_client
.
add_response
(
"
RetrieveCorpus
"
,
"
RetrieveCorpus
"
,
id
=
mock_worker
.
process_information
[
"
corpus
"
],
id
=
mock_worker
.
process_information
[
"
corpus
"
],
...
@@ -495,6 +523,9 @@ def test_run_duplicates(mock_worker):
...
@@ -495,6 +523,9 @@ def test_run_duplicates(mock_worker):
"
image_height
"
:
1337
,
"
image_height
"
:
1337
,
"
image_url
"
:
"
http://cafe
"
,
"
image_url
"
:
"
http://cafe
"
,
"
polygon
"
:
[[
0
,
0
],
[
0
,
1
],
[
1
,
1
],
[
1
,
0
],
[
0
,
0
]],
"
polygon
"
:
[[
0
,
0
],
[
0
,
1
],
[
1
,
1
],
[
1
,
0
],
[
0
,
0
]],
"
rotation_angle
"
:
0
,
"
mirrored
"
:
False
,
"
confidence
"
:
None
,
},
},
{
{
"
id
"
:
"
22222222-2222-2222-2222-222222222222
"
,
"
id
"
:
"
22222222-2222-2222-2222-222222222222
"
,
...
@@ -505,6 +536,9 @@ def test_run_duplicates(mock_worker):
...
@@ -505,6 +536,9 @@ def test_run_duplicates(mock_worker):
"
image_height
"
:
1337
,
"
image_height
"
:
1337
,
"
image_url
"
:
"
http://cafe
"
,
"
image_url
"
:
"
http://cafe
"
,
"
polygon
"
:
[[
0
,
0
],
[
0
,
1
],
[
1
,
1
],
[
1
,
0
],
[
0
,
0
]],
"
polygon
"
:
[[
0
,
0
],
[
0
,
1
],
[
1
,
1
],
[
1
,
0
],
[
0
,
0
]],
"
rotation_angle
"
:
0
,
"
mirrored
"
:
False
,
"
confidence
"
:
None
,
},
},
{
{
"
id
"
:
"
22222222-2222-2222-2222-222222222222
"
,
"
id
"
:
"
22222222-2222-2222-2222-222222222222
"
,
...
@@ -515,11 +549,22 @@ def test_run_duplicates(mock_worker):
...
@@ -515,11 +549,22 @@ def test_run_duplicates(mock_worker):
"
image_height
"
:
1337
,
"
image_height
"
:
1337
,
"
image_url
"
:
"
http://cafe
"
,
"
image_url
"
:
"
http://cafe
"
,
"
polygon
"
:
[[
0
,
0
],
[
0
,
1
],
[
1
,
1
],
[
1
,
0
],
[
0
,
0
]],
"
polygon
"
:
[[
0
,
0
],
[
0
,
1
],
[
1
,
1
],
[
1
,
0
],
[
0
,
0
]],
"
rotation_angle
"
:
0
,
"
mirrored
"
:
False
,
"
confidence
"
:
None
,
},
},
{
{
"
id
"
:
"
33333333-3333-3333-3333-333333333333
"
,
"
id
"
:
"
33333333-3333-3333-3333-333333333333
"
,
"
type_id
"
:
"
A
"
,
"
type_id
"
:
"
A
"
,
"
name
"
:
"
Class 3
"
,
"
name
"
:
"
Class 3
"
,
"
image_id
"
:
None
,
"
image_width
"
:
None
,
"
image_height
"
:
None
,
"
image_url
"
:
None
,
"
polygon
"
:
None
,
"
rotation_angle
"
:
0
,
"
mirrored
"
:
False
,
"
confidence
"
:
None
,
},
},
],
],
)
)
...
@@ -588,6 +633,10 @@ def test_run_duplicates(mock_worker):
...
@@ -588,6 +633,10 @@ def test_run_duplicates(mock_worker):
],
],
)
)
assert
[(
record
.
levelname
,
record
.
message
)
for
record
in
caplog
.
records
]
==
[
(
"
WARNING
"
,
"
1 duplicate elements have been ignored.
"
)
]
def
test_activity_state_awaiting
(
mock_worker
,
monkeypatch
):
def
test_activity_state_awaiting
(
mock_worker
,
monkeypatch
):
"""
"""
...
@@ -649,6 +698,8 @@ def test_activity_state_timeout(mock_worker, caplog, monkeypatch):
...
@@ -649,6 +698,8 @@ def test_activity_state_timeout(mock_worker, caplog, monkeypatch):
"""
"""
caplog
.
set_level
(
logging
.
WARNING
)
caplog
.
set_level
(
logging
.
WARNING
)
mock_worker
.
process_information
[
"
activity_state
"
]
=
"
pending
"
sleep_args
=
[]
sleep_args
=
[]
monkeypatch
.
setattr
(
monkeypatch
.
setattr
(
"
worker_init_elements.worker.sleep
"
,
lambda
seconds
:
sleep_args
.
append
(
seconds
)
"
worker_init_elements.worker.sleep
"
,
lambda
seconds
:
sleep_args
.
append
(
seconds
)
...
...
This diff is collapsed.
Click to expand it.
worker_init_elements/worker.py
+
89
−
58
View file @
0d2722c7
...
@@ -4,6 +4,7 @@ import sys
...
@@ -4,6 +4,7 @@ import sys
import
uuid
import
uuid
from
collections
import
OrderedDict
from
collections
import
OrderedDict
from
collections.abc
import
Iterator
from
collections.abc
import
Iterator
from
enum
import
Enum
from
logging
import
Logger
,
getLogger
from
logging
import
Logger
,
getLogger
from
time
import
sleep
from
time
import
sleep
...
@@ -14,17 +15,13 @@ from arkindex_worker.cache import (
...
@@ -14,17 +15,13 @@ from arkindex_worker.cache import (
create_version_table
,
create_version_table
,
init_cache_db
,
init_cache_db
,
)
)
from
arkindex_worker.models
import
Element
as
BaseElement
from
arkindex_worker.worker.base
import
BaseWorker
from
arkindex_worker.worker.base
import
BaseWorker
logger
:
Logger
=
getLogger
(
__name__
)
logger
:
Logger
=
getLogger
(
__name__
)
# Increases the number of elements returned per page by the API
INIT_PAGE_SIZE
=
500
INIT_PAGE_SIZE
=
500
# Await worker activities initialization when the process activity state is tagged as pending
PENDING_STATE
=
"
pending
"
ERROR_STATE
=
"
error
"
def
split_chunks
(
items
:
list
,
n
:
int
)
->
Iterator
[
list
]:
def
split_chunks
(
items
:
list
,
n
:
int
)
->
Iterator
[
list
]:
"""
"""
...
@@ -35,22 +32,35 @@ def split_chunks(items: list, n: int) -> Iterator[list]:
...
@@ -35,22 +32,35 @@ def split_chunks(items: list, n: int) -> Iterator[list]:
yield
items
[
i
::
n
]
yield
items
[
i
::
n
]
class
Element
(
BaseElement
):
class
ActivityState
(
Enum
):
"""
Store the state of the workers activity tracking for a process.
To support large elements set, the state is asynchronously set to `ready` after a process
has been started and worker activities have been initialized on its elements.
"""
Disabled
=
"
disabled
"
"""
Worker activities are disabled and will not be used
"""
Pending
=
"
pending
"
"""
"""
Some attributes in the `ListProcessElements` response conflict with the `BaseElement` property/function.
Worker activities are not yet initialized
Override them to access the API response directly.
"""
"""
@property
Ready
=
"
ready
"
def
polygon
(
self
)
->
list
[
float
]:
"""
return
self
[
"
polygon
"
]
Worker activities are initialized and ready for use
"""
@property
Error
=
"
error
"
def
image_url
(
self
)
->
list
[
float
]:
"""
return
self
[
"
image_url
"
]
An error occurred when initializing worker activities
"""
class
InitElementWorker
(
BaseWorker
):
class
InitElement
s
Worker
(
BaseWorker
):
def
configure
(
self
)
->
None
:
def
configure
(
self
)
->
None
:
# CLI args are stored on the instance so that implementations can access them
# CLI args are stored on the instance so that implementations can access them
self
.
args
=
self
.
parser
.
parse_args
()
self
.
args
=
self
.
parser
.
parse_args
()
...
@@ -69,15 +79,21 @@ class InitElementWorker(BaseWorker):
...
@@ -69,15 +79,21 @@ class InitElementWorker(BaseWorker):
self
.
use_cache
=
self
.
config
[
"
use_cache
"
]
self
.
use_cache
=
self
.
config
[
"
use_cache
"
]
self
.
api_client
.
sleep_duration
=
self
.
config
[
"
sleep
"
]
self
.
api_client
.
sleep_duration
=
self
.
config
[
"
sleep
"
]
def
dump_json
(
def
dump_json
(
self
,
elements
:
list
[
dict
],
filename
:
str
=
"
elements.json
"
)
->
None
:
self
,
elements
:
list
[
Element
],
filename
:
str
=
"
elements.json
"
"""
)
->
None
:
Store elements in a JSON file.
This file will become an artefact.
"""
path
=
self
.
work_dir
/
filename
path
=
self
.
work_dir
/
filename
assert
not
path
.
exists
(),
f
"
JSON at
{
path
}
already exists
"
assert
not
path
.
exists
(),
f
"
JSON at
{
path
}
already exists
"
path
.
write_text
(
json
.
dumps
(
elements
,
indent
=
4
))
path
.
write_text
(
json
.
dumps
(
elements
,
indent
=
4
))
def
dump_sqlite
(
self
,
elements
:
list
[
Element
],
filename
:
str
=
"
db.sqlite
"
)
->
None
:
def
dump_sqlite
(
self
,
elements
:
list
[
dict
],
filename
:
str
=
"
db.sqlite
"
)
->
None
:
"""
Store elements in a SQLite database. Only images and elements will be added.
This file will become an artefact.
"""
if
not
self
.
use_cache
:
if
not
self
.
use_cache
:
return
return
...
@@ -92,40 +108,40 @@ class InitElementWorker(BaseWorker):
...
@@ -92,40 +108,40 @@ class InitElementWorker(BaseWorker):
# Set of unique images found in the elements
# Set of unique images found in the elements
CachedImage
.
insert_many
(
CachedImage
.
insert_many
(
[
{
{
"
id
"
:
uuid
.
UUID
(
element
[
"
image_id
"
]).
hex
,
"
id
"
:
uuid
.
UUID
(
element
.
image_id
).
hex
,
"
width
"
:
element
[
"
image_width
"
],
"
width
"
:
element
.
image_width
,
"
height
"
:
element
[
"
image_height
"
],
"
height
"
:
element
.
image_height
,
"
url
"
:
element
[
"
image_url
"
],
"
url
"
:
element
.
image_url
,
}
}
for
element
in
elements
for
element
in
elements
if
element
[
"
image_id
"
]
if
element
.
get
(
"
image_id
"
)
is
not
None
]
).
on_conflict_ignore
(
ignore
=
True
).
execute
()
).
on_conflict_ignore
(
ignore
=
True
).
execute
()
# Fastest way to INSERT multiple rows.
# Fastest way to INSERT multiple rows.
CachedElement
.
insert_many
(
CachedElement
.
insert_many
(
[
{
{
"
id
"
:
uuid
.
UUID
(
element
[
"
id
"
]).
hex
,
"
id
"
:
uuid
.
UUID
(
element
.
id
).
hex
,
"
type
"
:
element
[
"
type
"
],
"
type
"
:
element
.
type
,
"
image_id
"
:
(
"
image_id
"
:
uuid
.
UUID
(
element
.
image_id
).
hex
uuid
.
UUID
(
element
[
"
image_id
"
]).
hex
if
element
[
"
image_id
"
]
else
None
if
element
.
get
(
"
image_id
"
)
),
else
None
,
"
polygon
"
:
element
[
"
polygon
"
],
"
polygon
"
:
(
element
.
polygon
if
element
.
get
(
"
polygon
"
)
else
None
),
"
rotation_angle
"
:
element
[
"
rotation_angle
"
],
"
rotation_angle
"
:
element
.
get
(
"
rotation_angle
"
)
or
0
,
"
mirrored
"
:
element
[
"
mirrored
"
],
"
mirrored
"
:
element
.
get
(
"
mirrored
"
)
or
False
,
"
confidence
"
:
element
[
"
confidence
"
],
"
confidence
"
:
element
.
get
(
"
confidence
"
),
"
initial
"
:
True
,
"
initial
"
:
True
,
}
}
for
element
in
elements
for
element
in
elements
]
).
execute
()
).
execute
()
db
.
close
()
db
.
close
()
def
dump_chunks
(
self
,
elements
:
list
[
Element
])
->
None
:
def
dump_chunks
(
self
,
elements
:
list
[
dict
])
->
None
:
"""
Store elements in a JSON file(s) and SQLite database(s).
If several chunks are requested, the files will be suffixed with the chunk index.
"""
assert
(
assert
(
len
(
elements
)
>=
self
.
chunks_number
len
(
elements
)
>=
self
.
chunks_number
),
f
"
Too few elements have been retrieved to distribute workflow among
{
self
.
chunks_number
}
branches
"
),
f
"
Too few elements have been retrieved to distribute workflow among
{
self
.
chunks_number
}
branches
"
...
@@ -137,8 +153,8 @@ class InitElementWorker(BaseWorker):
...
@@ -137,8 +153,8 @@ class InitElementWorker(BaseWorker):
self
.
dump_json
(
self
.
dump_json
(
elements
=
[
elements
=
[
{
{
"
id
"
:
element
.
id
,
"
id
"
:
element
[
"
id
"
]
,
"
type
"
:
element
.
type
,
"
type
"
:
element
[
"
type
"
]
,
}
}
for
element
in
chunk_elts
for
element
in
chunk_elts
],
],
...
@@ -159,18 +175,21 @@ class InitElementWorker(BaseWorker):
...
@@ -159,18 +175,21 @@ class InitElementWorker(BaseWorker):
f
"
Added
{
len
(
elements
)
}
element
{
'
s
'
[
:
len
(
elements
)
>
1
]
}
to workflow configuration
"
f
"
Added
{
len
(
elements
)
}
element
{
'
s
'
[
:
len
(
elements
)
>
1
]
}
to workflow configuration
"
)
)
def
list_process_elements
(
self
)
->
list
[
Element
]:
def
list_process_elements
(
self
)
->
list
[
dict
]:
"""
List all elements linked to this process and remove duplicates
"""
assert
self
.
process_information
.
get
(
assert
self
.
process_information
.
get
(
"
corpus
"
"
corpus
"
),
"
init_elements
only supports processes on corpora.
"
),
"
This worker
only supports processes on corpora.
"
corpus
=
self
.
request
(
"
RetrieveCorpus
"
,
id
=
self
.
process_information
[
"
corpus
"
])
corpus
=
self
.
request
(
"
RetrieveCorpus
"
,
id
=
self
.
process_information
[
"
corpus
"
])
type_slugs
=
{
type_slugs
=
{
element_type
[
"
id
"
]:
element_type
[
"
slug
"
]
for
element_type
in
corpus
[
"
types
"
]
element_type
[
"
id
"
]:
element_type
[
"
slug
"
]
for
element_type
in
corpus
[
"
types
"
]
}
}
elements
=
list
(
elements
=
[
Element
(
**
element
,
type
=
type_slugs
[
element
[
"
type_id
"
]]
)
{
**
element
,
"
type
"
:
type_slugs
[
element
[
"
type_id
"
]]
}
for
element
in
self
.
api_client
.
paginate
(
for
element
in
self
.
api_client
.
paginate
(
"
ListProcessElements
"
,
"
ListProcessElements
"
,
id
=
self
.
process_information
[
"
id
"
],
id
=
self
.
process_information
[
"
id
"
],
...
@@ -178,7 +197,7 @@ class InitElementWorker(BaseWorker):
...
@@ -178,7 +197,7 @@ class InitElementWorker(BaseWorker):
allow_missing_data
=
True
,
allow_missing_data
=
True
,
page_size
=
INIT_PAGE_SIZE
,
page_size
=
INIT_PAGE_SIZE
,
)
)
)
]
# Use a dict to make elements unique by ID, then turn them back into a elements.json-compatible list
# Use a dict to make elements unique by ID, then turn them back into a elements.json-compatible list
unique_elements
=
OrderedDict
(
unique_elements
=
OrderedDict
(
[(
element
[
"
id
"
],
element
)
for
element
in
elements
]
[(
element
[
"
id
"
],
element
)
for
element
in
elements
]
...
@@ -199,17 +218,29 @@ class InitElementWorker(BaseWorker):
...
@@ -199,17 +218,29 @@ class InitElementWorker(BaseWorker):
return
list
(
unique_elements
.
values
())
return
list
(
unique_elements
.
values
())
def
check_worker_activity
(
self
)
->
bool
:
def
check_worker_activity
(
self
)
->
bool
:
# Check if workers activity associated to this process is in a pending state
"""
Check if workers activity associated to this process is in a pending state
"""
process
=
self
.
request
(
"
RetrieveProcess
"
,
id
=
self
.
process_information
[
"
id
"
])
process
=
self
.
request
(
"
RetrieveProcess
"
,
id
=
self
.
process_information
[
"
id
"
])
if
process
.
get
(
"
activity_state
"
)
==
ERROR_STATE
:
if
process
.
get
(
"
activity_state
"
)
==
ActivityState
.
Error
:
logger
.
error
(
logger
.
error
(
"
Worker
s
activit
y
could not be initialized. Please report this incident to an instance administrator.
"
"
Worker activit
ies
could not be initialized. Please report this incident to an instance administrator.
"
)
)
sys
.
exit
(
1
)
sys
.
exit
(
1
)
return
process
.
get
(
"
activity_state
"
)
!
=
PENDING_STATE
return
process
.
get
(
"
activity_state
"
)
=
=
ActivityState
.
Ready
def
await_worker_activity
(
self
)
->
None
:
def
await_worker_activity
(
self
)
->
None
:
logger
.
info
(
"
Awaiting workers activity initialization
"
)
"""
Worker activities are initialized asynchronously after a process has been started.
This worker should be running until all activities have moved to `Ready`.
"""
if
(
ActivityState
(
self
.
process_information
[
"
activity_state
"
])
==
ActivityState
.
Disabled
):
return
logger
.
info
(
"
Awaiting worker activities initialization
"
)
# Await worker activities to be initialized for 0, 2, 4, 8 seconds up to an hour
# Await worker activities to be initialized for 0, 2, 4, 8 seconds up to an hour
timer
=
1
timer
=
1
while
True
:
while
True
:
...
@@ -236,7 +267,7 @@ class InitElementWorker(BaseWorker):
...
@@ -236,7 +267,7 @@ class InitElementWorker(BaseWorker):
def
main
()
->
None
:
def
main
()
->
None
:
InitElementWorker
(
InitElement
s
Worker
(
description
=
"
Worker to initialize Arkindex elements to process
"
description
=
"
Worker to initialize Arkindex elements to process
"
).
run
()
).
run
()
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment