diff --git a/arkindex_worker/worker/__init__.py b/arkindex_worker/worker/__init__.py index cfd043dcb2397a0b132b15d7ed4193ca68d92ce8..318e7ab56b88ec706d04b589872095d25a8e69cb 100644 --- a/arkindex_worker/worker/__init__.py +++ b/arkindex_worker/worker/__init__.py @@ -127,16 +127,22 @@ class ElementsWorker( logger.info(f"Processing {element} ({i}/{count})") # Process the element and report its progress if activities are enabled - self.update_activity(element.id, ActivityState.Started) - self.process_element(element) - self.update_activity(element.id, ActivityState.Processed) + if self.update_activity(element.id, ActivityState.Started): + self.process_element(element) + self.update_activity(element.id, ActivityState.Processed) + else: + logger.info( + f"Skipping element {element.id} as it was already processed" + ) + continue except Exception as e: + # Handle errors occurring while retrieving, processing or patching the activity for this element. + # Count the element as failed in case the activity update to "started" failed with no conflict. 
+ # This prevents the element from being processed failed += 1 - element_id = ( - element.id - if isinstance(element, (Element, CachedElement)) - else item - ) + + # Handle the case where we failed retrieving the element + element_id = element.id if element else item if isinstance(e, ErrorResponse): message = f"An API error occurred while processing element {element_id}: {e.title} - {e.content}" @@ -147,7 +153,12 @@ class ElementsWorker( message, exc_info=e if self.args.verbose else None, ) - self.update_activity(element_id, ActivityState.Error) + if element: + # Try to update the activity to error state regardless of the response + try: + self.update_activity(element.id, ActivityState.Error) + except Exception: + pass self.report.error(element_id, e) # Save report as local artifact @@ -168,13 +179,14 @@ class ElementsWorker( def update_activity(self, element_id, state): """ Update worker activity for this element - This method should not raise a runtime exception, but simply warn users + Returns False when there is a conflict initializing the activity + Otherwise returns True; other errors are raised to the caller """ if not self.store_activity: logger.debug( "Activity is not stored as the feature is disabled on this process" ) - return + return True assert element_id and isinstance( element_id, (uuid.UUID, str) @@ -183,10 +195,10 @@ class ElementsWorker( if self.is_read_only: logger.warning("Cannot update activity as this worker is in read-only mode") - return + return True try: - out = self.request( + self.request( "UpdateWorkerActivity", id=self.worker_version_id, body={ @@ -195,13 +207,22 @@ class ElementsWorker( "state": state.value, }, ) - logger.debug(f"Updated activity of element {element_id} to {state}") - return out except ErrorResponse as e: + if state == ActivityState.Started and e.status_code == 409: + # 409 conflict error when updating the state of an activity to "started" means that we + # cannot process this element. 
We assume that the reason is that the state transition + # was forbidden i.e. that the activity was already in a started or processed state. + # This allows concurrent access to an element activity between multiple processes. + # Element should not be counted as failed as it is probably handled somewhere else. + logger.debug( + f"Cannot start processing element {element_id} due to a conflict. " + f"Another process could have processed it with the same version already." + ) + return False logger.warning( f"Failed to update activity of element {element_id} to {state.value} due to an API error: {e.content}" ) - except Exception as e: - logger.warning( - f"Failed to update activity of element {element_id} to {state.value}: {e}" - ) + raise e + + logger.debug(f"Updated activity of element {element_id} to {state}") + return True diff --git a/tests/conftest.py b/tests/conftest.py index 322ef9d6483a0d27344a070fa2f0fb6a11542ec8..5a4596c6f95e32b3442f29895dab3ad994e191da 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -202,6 +202,18 @@ def mock_user_api(responses): ) +@pytest.fixture +def mock_activity_calls(responses): + """ + Mock responses when updating the activity state for multiple elements of the same version + """ + responses.add( + responses.PUT, + "http://testserver/api/v1/workers/versions/12341234-1234-1234-1234-123412341234/activity/", + status=200, + ) + + @pytest.fixture def mock_elements_worker(monkeypatch, mock_config_api): """Build and configure an ElementsWorker with fixed CLI parameters to avoid issues with pytest""" @@ -212,6 +224,27 @@ def mock_elements_worker(monkeypatch, mock_config_api): return worker +@pytest.fixture +def mock_elements_worker_with_list(monkeypatch, responses, mock_elements_worker): + """ + Mock a worker instance to list and retrieve a single element + """ + monkeypatch.setattr( + mock_elements_worker, "list_elements", lambda: ["1234-deadbeef"] + ) + responses.add( + responses.GET, + 
"http://testserver/api/v1/element/1234-deadbeef/", + status=200, + json={ + "id": "1234-deadbeef", + "type": "page", + "name": "Test Page n°1", + }, + ) + return mock_elements_worker + + @pytest.fixture def mock_base_worker_with_cache(mocker, monkeypatch, mock_config_api): """Build a BaseWorker using SQLite cache, also mocking a PONOS_TASK""" diff --git a/tests/test_elements_worker/test_worker.py b/tests/test_elements_worker/test_worker.py index ca52c53a6217403857104e81802b4a418a8623af..5289d678e9fe559aaaaef0fa546eb4d8dbb0ba73 100644 --- a/tests/test_elements_worker/test_worker.py +++ b/tests/test_elements_worker/test_worker.py @@ -71,7 +71,8 @@ def test_readonly(responses, mock_elements_worker): out = mock_elements_worker.update_activity("1234-deadbeef", ActivityState.Processed) - assert out is None + # update_activity returns False in very specific cases + assert out is True assert len(responses.calls) == len(BASE_API_CALLS) assert [ (call.request.method, call.request.url) for call in responses.calls @@ -130,11 +131,7 @@ def test_update_call(responses, mock_elements_worker, mock_process_api): out = mock_elements_worker.update_activity("1234-deadbeef", ActivityState.Processed) # Check the response received by worker - assert out == { - "element_id": "1234-deadbeef", - "process_id": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeffff", - "state": "processed", - } + assert out is True assert len(responses.calls) == len(BASE_API_CALLS) + 1 assert [ @@ -169,54 +166,17 @@ def test_update_call(responses, mock_elements_worker, mock_process_api): ], ) def test_run( - monkeypatch, mock_elements_worker, responses, process_exception, final_state + monkeypatch, + mock_elements_worker_with_list, + responses, + process_exception, + final_state, + mock_activity_calls, ): """Check the normal runtime sends 2 API calls to update activity""" # Disable second configure call from run() - monkeypatch.setattr(mock_elements_worker, "configure", lambda: None) - - # Mock elements - monkeypatch.setattr( 
- mock_elements_worker, - "list_elements", - lambda: [ - "1234-deadbeef", - ], - ) - responses.add( - responses.GET, - "http://testserver/api/v1/element/1234-deadbeef/", - status=200, - json={ - "id": "1234-deadbeef", - "type": "page", - "name": "Test Page n°1", - }, - ) - - # Mock Update activity - responses.add( - responses.PUT, - "http://testserver/api/v1/workers/versions/12341234-1234-1234-1234-123412341234/activity/", - status=200, - json={ - "element_id": "1234-deadbeef", - "process_id": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeffff", - "state": "started", - }, - ) - responses.add( - responses.PUT, - "http://testserver/api/v1/workers/versions/12341234-1234-1234-1234-123412341234/activity/", - status=200, - json={ - "element_id": "1234-deadbeef", - "process_id": "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeffff", - "state": final_state, - }, - ) - - assert mock_elements_worker.is_read_only is False + monkeypatch.setattr(mock_elements_worker_with_list, "configure", lambda: None) + assert mock_elements_worker_with_list.is_read_only is False # Mock exception in process_element if process_exception: @@ -224,14 +184,14 @@ def test_run( def _err(): raise process_exception - monkeypatch.setattr(mock_elements_worker, "process_element", _err) + monkeypatch.setattr(mock_elements_worker_with_list, "process_element", _err) # The worker stops because all elements failed ! 
with pytest.raises(SystemExit): - mock_elements_worker.run() + mock_elements_worker_with_list.run() else: # Simply run the process - mock_elements_worker.run() + mock_elements_worker_with_list.run() assert len(responses.calls) == len(BASE_API_CALLS) + 3 assert [ @@ -262,7 +222,11 @@ def test_run( def test_run_cache( - monkeypatch, mocker, mock_elements_worker_with_cache, mock_cached_elements + monkeypatch, + mocker, + mock_elements_worker_with_cache, + mock_cached_elements, + mock_activity_calls, ): # Disable second configure call from run() monkeypatch.setattr(mock_elements_worker_with_cache, "configure", lambda: None) @@ -278,3 +242,79 @@ def test_run_cache( mocker.call(elt) for elt in CachedElement.select() ] + + +def test_start_activity_conflict( + monkeypatch, responses, mocker, mock_elements_worker_with_list +): + # Disable second configure call from run() + monkeypatch.setattr(mock_elements_worker_with_list, "configure", lambda: None) + + # Mock a "normal" conflict during an activity update, which returns the Exception + responses.add( + responses.PUT, + "http://testserver/api/v1/workers/versions/12341234-1234-1234-1234-123412341234/activity/", + body=ErrorResponse( + title="conflict", + status_code=409, + content="Either this activity does not exists or this state is not allowed.", + ), + ) + from arkindex_worker.worker import logger + + logger.info = mocker.MagicMock() + + mock_elements_worker_with_list.run() + + assert len(responses.calls) == len(BASE_API_CALLS) + 2 + assert [ + (call.request.method, call.request.url) for call in responses.calls + ] == BASE_API_CALLS + [ + ("GET", "http://testserver/api/v1/element/1234-deadbeef/"), + ( + "PUT", + "http://testserver/api/v1/workers/versions/12341234-1234-1234-1234-123412341234/activity/", + ), + ] + assert logger.info.call_args_list[:2] == [ + mocker.call("Processing page Test Page n°1 (1234-deadbeef) (1/1)"), + mocker.call("Skipping element 1234-deadbeef as it was already processed"), + ] + + +def 
test_start_activity_error( + monkeypatch, responses, mocker, mock_elements_worker_with_list +): + # Disable second configure call from run() + monkeypatch.setattr(mock_elements_worker_with_list, "configure", lambda: None) + + # Mock a random error occurring during the activity update + responses.add( + responses.PUT, + "http://testserver/api/v1/workers/versions/12341234-1234-1234-1234-123412341234/activity/", + body=Exception("A wild Petilil appears !"), + ) + from arkindex_worker.worker import logger + + logger.error = mocker.MagicMock() + + with pytest.raises(SystemExit): + mock_elements_worker_with_list.run() + + assert [ + (call.request.method, call.request.url) for call in responses.calls + ] == BASE_API_CALLS + [ + ("GET", "http://testserver/api/v1/element/1234-deadbeef/"), + ( + "PUT", + "http://testserver/api/v1/workers/versions/12341234-1234-1234-1234-123412341234/activity/", + ), + # Activity is updated to the "error" state regardless of the Exception occurring during the call + ( + "PUT", + "http://testserver/api/v1/workers/versions/12341234-1234-1234-1234-123412341234/activity/", + ), + ] + assert logger.error.call_args_list == [ + mocker.call("Ran on 1 elements: 0 completed, 1 failed") + ]