Skip to content
Snippets Groups Projects
Commit c5c76b59 authored by Yoann Schneider's avatar Yoann Schneider :tennis:
Browse files

Merge branch 'bump-arkindex-base-worker' into 'master'

Bump Python requirement arkindex-base-worker to 0.4.0

See merge request !17
parents af210a08 4e31126a
No related branches found
No related tags found
1 merge request!17Bump Python requirement arkindex-base-worker to 0.4.0
Pipeline #205915 passed
......@@ -6,5 +6,4 @@ workers:
name: Init Elements
type: extractor
docker:
build: Dockerfile
command: worker-init-elements
......@@ -14,7 +14,7 @@ variables:
DEBIAN_FRONTEND: non-interactive
test:
image: python:slim
image: python:3.12-slim
stage: test
cache:
......@@ -41,7 +41,7 @@ test:
- tox -- --junitxml=test-report.xml --durations=50
lint:
image: python:slim
image: python:3.12-slim
cache:
paths:
......@@ -105,4 +105,4 @@ bump-python-deps:
- schedules
script:
- devops python-deps requirements.txt
- devops python-deps pyproject.toml
repos:
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.3.1
rev: v0.8.2
hooks:
# Run the linter.
- id: ruff
......@@ -9,7 +9,7 @@ repos:
# Run the formatter.
- id: ruff-format
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.5.0
rev: v5.0.0
hooks:
- id: check-ast
- id: check-docstring-first
......@@ -25,9 +25,8 @@ repos:
args: ['--django']
- id: check-json
- id: check-toml
- id: requirements-txt-fixer
- repo: https://github.com/codespell-project/codespell
rev: v2.2.6
rev: v2.3.0
hooks:
- id: codespell
args: ['--write-changes']
......@@ -37,4 +36,10 @@ repos:
- repo: https://github.com/shellcheck-py/shellcheck-py
rev: v0.10.0.1
hooks:
- id: shellcheck
\ No newline at end of file
- id: shellcheck
- repo: https://gitlab.teklia.com/tools/pre-commit-hooks
rev: 0.1.0
hooks:
- id: long-test-files
args: ['1000']
files: '^tests\/(.*\/)?test_[^\/]*\.py$'
FROM python:3.11-slim
FROM python:3.12-slim
WORKDIR /src
# Install curl
ENV DEBIAN_FRONTEND=non-interactive
RUN apt-get update -q -y && apt-get install -q -y --no-install-recommends curl
# Install worker as a package
COPY worker_init_elements worker_init_elements
COPY requirements.txt setup.py pyproject.toml ./
COPY pyproject.toml ./
RUN pip install . --no-cache-dir
# Add archi local CA
RUN curl https://assets.teklia.com/teklia_dev_ca.pem > /usr/local/share/ca-certificates/arkindex-dev.crt && update-ca-certificates
ENV REQUESTS_CA_BUNDLE /etc/ssl/certs/ca-certificates.crt
CMD ["worker-init-elements"]
include requirements.txt
......@@ -6,7 +6,9 @@ build-backend = "setuptools.build_meta"
name = "worker_init_elements"
version = "0.1.1"
description = "Worker to initialize Arkindex elements to process"
dynamic = ["dependencies"]
dependencies = [
"arkindex-base-worker==0.4.0",
]
authors = [
{ name = "Teklia", email = "contact@teklia.com" },
]
......@@ -21,16 +23,18 @@ classifiers = [
"Programming Language :: Python :: 3 :: Only",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
]
[project.scripts]
worker-init-elements = "worker_init_elements.worker:main"
"worker-init-elements" = "worker_init_elements.worker:main"
[tool.setuptools.dynamic]
dependencies = { file = ["requirements.txt"] }
[tool.setuptools.packages]
find = {}
[tool.ruff]
exclude = [".git", "__pycache__"]
target-version = "py312"
[tool.ruff.lint]
ignore = ["E501"]
......
arkindex-base-worker==0.3.7.post1
#!/usr/bin/env python
from setuptools import find_packages, setup
setup(packages=find_packages())
......@@ -8,7 +8,7 @@ from arkindex_worker.worker.base import BaseWorker
from worker_init_elements.worker import InitElementsWorker
@pytest.fixture()
@pytest.fixture
def mock_api_client() -> MockApiClient:
return MockApiClient()
......@@ -39,7 +39,7 @@ def _setup_environment(mock_api_client: MockApiClient, responses, monkeypatch) -
monkeypatch.setattr(BaseWorker, "setup_api_client", mock_setup_api_client)
@pytest.fixture()
@pytest.fixture
def _mock_worker_run_api(mock_api_client: MockApiClient) -> None:
"""Provide a mock API response to get worker run information"""
mock_api_client.add_response(
......@@ -83,7 +83,7 @@ def _mock_worker_run_api(mock_api_client: MockApiClient) -> None:
)
@pytest.fixture()
@pytest.fixture
def mock_worker(
_mock_worker_run_api, tmp_path_factory, monkeypatch
) -> InitElementsWorker:
......
......@@ -2,8 +2,8 @@ import logging
import pytest
from arkindex_worker.worker.process import PROCESS_ELEMENTS_PAGE_SIZE
from tests import check_json
from worker_init_elements.worker import INIT_PAGE_SIZE
def test_activity_state_awaiting(mock_worker, monkeypatch):
......@@ -34,7 +34,22 @@ def test_activity_state_awaiting(mock_worker, monkeypatch):
id=mock_worker.process_information["corpus"],
response={
"id": mock_worker.process_information["corpus"],
"types": [{"id": "A", "slug": "class"}, {"id": "B", "slug": "student"}],
"types": [
{
"id": "A",
"slug": "class",
"display_name": "Class",
"folder": False,
"color": "#28b62c",
},
{
"id": "B",
"slug": "student",
"display_name": "Student",
"folder": False,
"color": "#28b62c",
},
],
},
)
mock_worker.api_client.add_response(
......@@ -42,7 +57,7 @@ def test_activity_state_awaiting(mock_worker, monkeypatch):
id=mock_worker.process_information["id"],
with_image=False,
allow_missing_data=True,
page_size=INIT_PAGE_SIZE,
page_size=PROCESS_ELEMENTS_PAGE_SIZE,
response=[
{
"id": "11111111-1111-1111-1111-111111111111",
......@@ -92,7 +107,22 @@ def test_activity_state_timeout(mock_worker, caplog, monkeypatch):
id=mock_worker.process_information["corpus"],
response={
"id": mock_worker.process_information["corpus"],
"types": [{"id": "A", "slug": "class"}, {"id": "B", "slug": "student"}],
"types": [
{
"id": "A",
"slug": "class",
"display_name": "Class",
"folder": False,
"color": "#28b62c",
},
{
"id": "B",
"slug": "student",
"display_name": "Student",
"folder": False,
"color": "#28b62c",
},
],
},
)
mock_worker.api_client.add_response(
......@@ -100,7 +130,7 @@ def test_activity_state_timeout(mock_worker, caplog, monkeypatch):
id=mock_worker.process_information["id"],
with_image=False,
allow_missing_data=True,
page_size=INIT_PAGE_SIZE,
page_size=PROCESS_ELEMENTS_PAGE_SIZE,
response=[
{
"id": "11111111-1111-1111-1111-111111111111",
......@@ -117,7 +147,7 @@ def test_activity_state_timeout(mock_worker, caplog, monkeypatch):
assert [(record.levelname, record.message) for record in caplog.records] == [
(
"ERROR",
"Workers activity not initialized 68 minutes after starting the process."
"Worker activities not initialized 68 minutes after starting the process."
" Please report this incident to an instance administrator.",
)
]
......@@ -2,8 +2,8 @@ import logging
import pytest
from arkindex_worker.worker.process import PROCESS_ELEMENTS_PAGE_SIZE
from tests import check_db, check_json
from worker_init_elements.worker import INIT_PAGE_SIZE
@pytest.mark.parametrize("use_cache", [True, False])
......@@ -15,7 +15,22 @@ def test_run_process(use_cache, mock_worker):
id=mock_worker.process_information["corpus"],
response={
"id": mock_worker.process_information["corpus"],
"types": [{"id": "A", "slug": "class"}, {"id": "B", "slug": "student"}],
"types": [
{
"id": "A",
"slug": "class",
"display_name": "Class",
"folder": False,
"color": "#28b62c",
},
{
"id": "B",
"slug": "student",
"display_name": "Student",
"folder": False,
"color": "#28b62c",
},
],
},
)
mock_worker.api_client.add_response(
......@@ -23,7 +38,7 @@ def test_run_process(use_cache, mock_worker):
id=mock_worker.process_information["id"],
with_image=mock_worker.use_cache,
allow_missing_data=True,
page_size=INIT_PAGE_SIZE,
page_size=PROCESS_ELEMENTS_PAGE_SIZE,
response=[
{
"id": "11111111-1111-1111-1111-111111111111",
......@@ -168,7 +183,22 @@ def test_run_distributed(mock_worker):
id=mock_worker.process_information["corpus"],
response={
"id": mock_worker.process_information["corpus"],
"types": [{"id": "A", "slug": "class"}, {"id": "B", "slug": "student"}],
"types": [
{
"id": "A",
"slug": "class",
"display_name": "Class",
"folder": False,
"color": "#28b62c",
},
{
"id": "B",
"slug": "student",
"display_name": "Student",
"folder": False,
"color": "#28b62c",
},
],
},
)
mock_worker.api_client.add_response(
......@@ -176,7 +206,7 @@ def test_run_distributed(mock_worker):
id=mock_worker.process_information["id"],
with_image=True,
allow_missing_data=True,
page_size=INIT_PAGE_SIZE,
page_size=PROCESS_ELEMENTS_PAGE_SIZE,
response=[
{
"id": "22222222-2222-2222-2222-222222222222",
......@@ -456,7 +486,22 @@ def test_not_enough_elements(mock_worker):
id=mock_worker.process_information["corpus"],
response={
"id": mock_worker.process_information["corpus"],
"types": [{"id": "A", "slug": "class"}, {"id": "B", "slug": "student"}],
"types": [
{
"id": "A",
"slug": "class",
"display_name": "Class",
"folder": False,
"color": "#28b62c",
},
{
"id": "B",
"slug": "student",
"display_name": "Student",
"folder": False,
"color": "#28b62c",
},
],
},
)
mock_worker.api_client.add_response(
......@@ -464,7 +509,7 @@ def test_not_enough_elements(mock_worker):
id=mock_worker.process_information["id"],
with_image=False,
allow_missing_data=True,
page_size=INIT_PAGE_SIZE,
page_size=PROCESS_ELEMENTS_PAGE_SIZE,
response=[
{
"id": "22222222-2222-2222-2222-222222222222",
......@@ -488,7 +533,22 @@ def test_run_duplicates(mock_worker, caplog):
id=mock_worker.process_information["corpus"],
response={
"id": mock_worker.process_information["corpus"],
"types": [{"id": "A", "slug": "class"}, {"id": "B", "slug": "student"}],
"types": [
{
"id": "A",
"slug": "class",
"display_name": "Class",
"folder": False,
"color": "#28b62c",
},
{
"id": "B",
"slug": "student",
"display_name": "Student",
"folder": False,
"color": "#28b62c",
},
],
},
)
mock_worker.api_client.add_response(
......@@ -496,7 +556,7 @@ def test_run_duplicates(mock_worker, caplog):
id=mock_worker.process_information["id"],
with_image=True,
allow_missing_data=True,
page_size=INIT_PAGE_SIZE,
page_size=PROCESS_ELEMENTS_PAGE_SIZE,
response=[
{
"id": "11111111-1111-1111-1111-111111111111",
......@@ -618,7 +678,11 @@ def test_run_duplicates(mock_worker, caplog):
)
assert [(record.levelname, record.message) for record in caplog.records] == [
("WARNING", "1 duplicate elements have been ignored.")
(
"WARNING",
"This API helper `list_process_elements` did not update the cache database",
),
("WARNING", "1 duplicate element have been ignored."),
]
......@@ -630,7 +694,22 @@ def test_run_empty(mock_worker, caplog):
id=mock_worker.process_information["corpus"],
response={
"id": mock_worker.process_information["corpus"],
"types": [{"id": "A", "slug": "class"}, {"id": "B", "slug": "student"}],
"types": [
{
"id": "A",
"slug": "class",
"display_name": "Class",
"folder": False,
"color": "#28b62c",
},
{
"id": "B",
"slug": "student",
"display_name": "Student",
"folder": False,
"color": "#28b62c",
},
],
},
)
mock_worker.api_client.add_response(
......@@ -638,7 +717,7 @@ def test_run_empty(mock_worker, caplog):
id=mock_worker.process_information["id"],
with_image=False,
allow_missing_data=True,
page_size=INIT_PAGE_SIZE,
page_size=PROCESS_ELEMENTS_PAGE_SIZE,
response=[],
)
......
......@@ -9,4 +9,3 @@ commands =
deps =
pytest
pytest-responses
-rrequirements.txt
......@@ -15,13 +15,13 @@ from arkindex_worker.cache import (
create_version_table,
init_cache_db,
)
from arkindex_worker.utils import pluralize
from arkindex_worker.worker.base import BaseWorker
from arkindex_worker.worker.element import ElementMixin
from arkindex_worker.worker.process import ProcessMixin
logger: Logger = getLogger(__name__)
# Increases the number of elements returned per page by the API
INIT_PAGE_SIZE = 500
def split_chunks(items: list, n: int) -> Iterator[list]:
"""
......@@ -60,15 +60,9 @@ class ActivityState(Enum):
"""
class InitElementsWorker(BaseWorker):
class InitElementsWorker(BaseWorker, ProcessMixin, ElementMixin):
def configure(self) -> None:
# CLI args are stored on the instance so that implementations can access them
self.args = self.parser.parse_args()
if self.is_read_only:
super().configure_for_developers()
else:
super().configure()
super().configure()
self.chunks_number = self.process_information["chunks"]
self.use_cache = self.process_information["use_cache"]
......@@ -166,30 +160,22 @@ class InitElementsWorker(BaseWorker):
)
logger.info(
f"Added {len(elements)} element{'s'[:len(elements) > 1]} to workflow configuration"
f"Added {len(elements)} {pluralize('element', len(elements))} to workflow configuration"
)
def list_process_elements(self) -> list[dict]:
"""
List all elements linked to this process and remove duplicates
"""
assert self.process_information.get(
"corpus"
), "This worker only supports processes on corpora."
corpus = self.request("RetrieveCorpus", id=self.process_information["corpus"])
# Load corpus element types
self.list_corpus_types()
type_slugs = {
element_type["id"]: element_type["slug"] for element_type in corpus["types"]
element_type["id"]: slug for slug, element_type in self.corpus_types.items()
}
elements = [
{**element, "type": type_slugs[element["type_id"]]}
for element in self.api_client.paginate(
"ListProcessElements",
id=self.process_information["id"],
for element in super().list_process_elements(
with_image=self.use_cache,
allow_missing_data=True,
page_size=INIT_PAGE_SIZE,
)
]
# Use a dict to make elements unique by ID, then turn them back into a elements.json-compatible list
......@@ -198,12 +184,14 @@ class InitElementsWorker(BaseWorker):
)
logger.info(
f"Retrieved {len(unique_elements)} element{'s'[:len(unique_elements) > 1]} from process {self.process_information['id']}"
f"Retrieved {len(unique_elements)} {pluralize('element', len(unique_elements))} from process {self.process_information['id']}"
)
duplicate_count = len(elements) - len(unique_elements)
if duplicate_count:
logger.warning(f"{duplicate_count} duplicate elements have been ignored.")
logger.warning(
f"{duplicate_count} duplicate {pluralize('element', duplicate_count)} have been ignored."
)
if not unique_elements:
logger.error("No elements found, aborting workflow.")
......@@ -213,12 +201,12 @@ class InitElementsWorker(BaseWorker):
def check_worker_activity(self) -> bool:
"""
Check if workers activity associated to this process is in a pending state
Check if worker activities associated to this process is in a pending state
"""
activity_state = ActivityState(
self.request("RetrieveProcess", id=self.process_information["id"])[
"activity_state"
]
self.api_client.request(
"RetrieveProcess", id=self.process_information["id"]
)["activity_state"]
)
if activity_state == ActivityState.Error:
logger.error(
......@@ -247,7 +235,7 @@ class InitElementsWorker(BaseWorker):
timer *= 2
if timer >= 3600:
logger.error(
f"Workers activity not initialized {int(timer/60)} minutes after starting the process."
f"Worker activities not initialized {int(timer/60)} minutes after starting the process."
" Please report this incident to an instance administrator."
)
raise Exception("Worker activity timeout")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment