
Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Target project: arkindex/backend
Commits on Source (13)
Showing with 484 additions and 438 deletions
1.2.0-rc1
1.2.0-rc2
......@@ -1125,13 +1125,18 @@ class WorkerRunList(WorkerACLMixin, ListCreateAPIView):
# User must have admin access to the process project and a contributor access to the worker run
if not self.has_access(process.corpus, Role.Admin.value):
raise PermissionDenied(detail='You do not have an admin access to the corpus of this process.')
try:
worker = Worker.objects.get(versions__id=serializer.validated_data['version_id'])
except Worker.DoesNotExist:
raise ValidationError({'worker_version_id': ['This version does not exist.']})
if not self.has_execution_access(worker):
raise ValidationError({'worker_version_id': ['You do not have an execution access to this version.']})
if process.worker_runs.filter(version_id=serializer.validated_data['version_id']).exists():
raise ValidationError({'worker_version_id': ['A WorkerRun with this version already exists on this process.']})
configuration = serializer.validated_data.pop('configuration_id', None)
if configuration and configuration.worker_id != worker.id:
raise ValidationError({'configuration_id': ['The configuration must be part of the same worker.']})
......
#!/usr/bin/env python3
import logging
import uuid
from django.core.management.base import BaseCommand
from django.core.management.base import BaseCommand, CommandError
from arkindex.dataimport.models import Repository, RepositoryType, Revision, Worker, WorkerVersion
logging.basicConfig(
level=logging.INFO,
format='[%(levelname)s] %(message)s',
)
logger = logging.getLogger(__name__)
class Command(BaseCommand):
help = 'Create a fake worker version'
......@@ -37,7 +30,7 @@ class Command(BaseCommand):
versions = WorkerVersion.objects.filter(worker__slug=slug)
if versions.exists():
versions_id = list(versions.values_list('id', flat=True).distinct())
logger.info(f"This worker already has versions: {versions_id}")
raise CommandError(f"This worker already has versions: {versions_id}")
return
repo, _ = Repository.objects.get_or_create(
......@@ -68,4 +61,4 @@ class Command(BaseCommand):
configuration={"configuration": {}},
)
logger.info(f"Created a worker version: {version.id}")
self.stdout.write(self.style.SUCCESS(f"Created a worker version: {version.id}"))
#!/usr/bin/env python3
import logging
import yaml
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
......@@ -11,12 +9,6 @@ from arkindex.project.argparse import CorpusArgument
from arkindex.users.models import User
from ponos.models import Workflow
logging.basicConfig(
level=logging.INFO,
format='[%(levelname)s] %(message)s',
)
logger = logging.getLogger(__name__)
IMPORT_NAME = 'import-s3'
MAX_DEPTH_WARN = 3
......@@ -90,10 +82,9 @@ class Command(BaseCommand):
# Warn for high recursion level
depth = options['max_folder_depth']
if depth > MAX_DEPTH_WARN:
logger.warning(
'Maximum folder depth set to {}. A high value can considerably slow tasks import distribution'
.format(depth)
)
self.stderr.write(self.style.WARNING(
f'Maximum folder depth set to {depth}. A high value can considerably slow tasks import distribution'
))
iiif_cache_bucket = options.get('iiif_cache_bucket')
......@@ -146,7 +137,7 @@ class Command(BaseCommand):
recipe.setdefault('env', {}).update(env_vars)
workflow = Workflow.objects.create(farm_id=get_default_farm_id(), recipe=yaml.dump(recipe))
logger.info('Created Workflow with id {}'.format(workflow.id))
self.stdout.write('Created Workflow with id {}'.format(workflow.id))
admin = User.objects.filter(is_admin=True).first()
assert admin is not None, 'No admin user has been found to create a Dataimport'
......@@ -157,7 +148,7 @@ class Command(BaseCommand):
corpus_id=options['corpus'].id,
creator_id=admin.id
)
logger.info(
self.stdout.write(self.style.SUCCESS(
'Linked Workflow to DataImport {0} using user {1.email} ({1.id})'
.format(dataimport.id, admin)
)
))
import logging
from django.core.management.base import BaseCommand
from arkindex.dataimport.models import Repository
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger()
class Command(BaseCommand):
help = "Update all Git repositories hooks"
......@@ -23,7 +15,7 @@ class Command(BaseCommand):
def handle(self, base_url, *args, **kwargs):
for repository in Repository.objects.filter(credentials__isnull=False):
logger.info(f"Updating {repository}")
self.stdout.write(f"Updating {repository}")
repository.provider.create_hook(repository, base_url=base_url)
logger.info("All done")
self.stdout.write(self.style.SUCCESS("All done"))
import logging
from django.core.management.base import BaseCommand
from arkindex.dataimport.models import Repository
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger()
class Command(BaseCommand):
help = "Update all Git repositories references and trigger build processes"
......
......@@ -3,15 +3,16 @@ from typing import Optional
from django.conf import settings
from django.db import transaction
from django_rq import job
from rq import Retry
from arkindex.dataimport.models import ActivityState, DataImport, WorkerActivity, WorkerActivityState
@job('default', timeout=settings.RQ_TIMEOUTS['initialize_activity'])
@job('default', timeout=settings.RQ_TIMEOUTS['initialize_activity'], retry=Retry(max=4))
def initialize_activity(process: DataImport):
"""
List all worker versions used in a process and initialize their activity on processed elements.
Timeout is set to 1 hour
4 retries allowed, for a total of 5 attempts, to try to mitigate some database errors from the large query.
"""
try:
with transaction.atomic():
......
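For reference, a minimal sketch of the rq retry mechanism used above; the queue name, timeout and interval values here are assumptions rather than the project's settings:

from django_rq import job
from rq import Retry


# Up to 4 retries after the first failed attempt, waiting longer between each one
@job('default', timeout=3600, retry=Retry(max=4, interval=[10, 60, 300, 900]))
def flaky_database_task(object_id: str):
    # Any unhandled exception marks the attempt as failed and schedules the next retry,
    # for a total of 5 attempts
    ...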
from django.core.management import call_command
from django.core.management.base import CommandError
from arkindex.dataimport.models import Repository, RepositoryType, Revision, Worker, WorkerVersion, WorkerVersionState
from arkindex.project.tests import FixtureTestCase
......@@ -55,12 +56,13 @@ class TestFakeWorker(FixtureTestCase):
old_versions_id = list(old_versions.values_list('id', flat=True).distinct())
self.assertTrue(old_versions.exists())
call_command(
'fake_worker_version',
slug=slug,
name=name,
url=url,
)
with self.assertRaises(CommandError):
call_command(
'fake_worker_version',
slug=slug,
name=name,
url=url,
)
new_versions = WorkerVersion.objects.filter(worker__slug=slug)
new_versions_id = list(new_versions.values_list('id', flat=True).distinct())
......
......@@ -122,6 +122,15 @@ class TestWorkerRuns(FixtureAPITestCase):
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertEqual(response.json(), {'worker_version_id': ['This version does not exist.']})
def test_create_run_unique(self):
self.client.force_login(self.user)
response = self.client.post(
reverse('api:worker-run-list', kwargs={'pk': str(self.dataimport_1.id)}),
data={'worker_version_id': str(self.version_1.id), 'parents': []}, format='json'
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertEqual(response.json(), {'worker_version_id': ['A WorkerRun with this version already exists on this process.']})
def test_runs_post_invalid_dataimport_id(self):
self.client.force_login(self.user)
response = self.client.post(
......@@ -161,7 +170,7 @@ class TestWorkerRuns(FixtureAPITestCase):
def test_runs_post_create_worker_run(self):
self.client.force_login(self.user)
with self.assertNumQueries(14):
with self.assertNumQueries(15):
response = self.client.post(
reverse('api:worker-run-list', kwargs={'pk': str(self.dataimport_2.id)}),
data={'worker_version_id': str(self.version_1.id), 'parents': []}, format='json'
......@@ -187,7 +196,7 @@ class TestWorkerRuns(FixtureAPITestCase):
self.worker_1.memberships.filter(user=self.user).update(level=Role.Guest.value)
self.worker_1.repository.memberships.create(user=self.user, level=Role.Contributor.value)
self.client.force_login(self.user)
with self.assertNumQueries(15):
with self.assertNumQueries(16):
response = self.client.post(
reverse('api:worker-run-list', kwargs={'pk': str(self.dataimport_2.id)}),
data={'worker_version_id': str(self.version_1.id), 'parents': []}, format='json'
......@@ -210,7 +219,7 @@ class TestWorkerRuns(FixtureAPITestCase):
def test_create_run_configuration(self):
self.client.force_login(self.user)
with self.assertNumQueries(15):
with self.assertNumQueries(16):
response = self.client.post(
reverse('api:worker-run-list', kwargs={'pk': str(self.dataimport_2.id)}),
data={
......
......@@ -1767,7 +1767,7 @@ class WorkerResultsDestroy(CorpusACLMixin, DestroyAPIView):
worker_results_delete(
corpus_id=corpus.id,
version=worker_version,
parent_id=element_id,
element_id=element_id,
user_id=self.request.user.id,
)
......
import logging
from uuid import UUID
from django.core.exceptions import ValidationError
......@@ -41,8 +40,6 @@ from arkindex.project.mixins import ACLMixin, CorpusACLMixin
from arkindex.project.permissions import IsVerified, IsVerifiedOrReadOnly
from arkindex.users.models import Role
logger = logging.getLogger(__name__)
@extend_schema(tags=['entities'])
@extend_schema_view(
......
import logging
import uuid
from django.db import transaction
......@@ -35,8 +34,6 @@ from arkindex.project.mixins import ACLMixin, CorpusACLMixin, SelectionMixin
from arkindex.project.permissions import IsVerified, IsVerifiedOrReadOnly
from arkindex.users.models import Role
logger = logging.getLogger(__name__)
@extend_schema_view(
post=extend_schema(
......
This diff is collapsed.
......@@ -7,12 +7,6 @@ from teklia_toolbox.time import Timer
from arkindex.documents.models import Element
# Enable deletion signal logs
logging.basicConfig(
level=logging.INFO,
format='[%(levelname)s] %(message)s',
)
def valid_uuid(value):
try:
......@@ -34,6 +28,9 @@ class Command(BaseCommand):
)
def handle(self, element_id, verbosity=0, **options):
# Enable deletion signal logs
logging.getLogger('arkindex.documents.delete').setLevel(logging.INFO)
for element in Element.objects.filter(id__in=element_id):
self.stdout.write(f"Deleting {element.id} : {element}")
with Timer() as t:
......
import logging
import multiprocessing
import os
import sys
......@@ -6,12 +5,6 @@ import sys
from django.core.management.base import BaseCommand, CommandError
from django.core.wsgi import get_wsgi_application
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger()
class Command(BaseCommand):
help = "Run Arkindex API server through Gunicorn"
......@@ -49,7 +42,7 @@ class Command(BaseCommand):
# Build bind string
bind = f"{host}:{port}"
logger.info(f"Running server on {bind} with {workers} workers")
self.stdout.write(f"Running server on {bind} with {workers} workers")
# Do not send out CLI args to gunicorn as they are not compatible
# and we override directly the configuration to pass the WSGI application
......
#!/usr/bin/env python3
import json
import logging
import os
import sqlite3
import uuid
......@@ -29,8 +28,6 @@ from arkindex.documents.models import (
from arkindex.images.models import Image, ImageServer
from arkindex.users.models import Role, User
logger = logging.getLogger(__name__)
TABLE_NAMES = {
'export_version',
'image_server',
......@@ -146,19 +143,26 @@ class Command(BaseCommand):
)]
def convert_worker_versions(self, row):
repository = Repository.objects.get(url=row['repository_url'])
worker, _ = Worker.objects.get_or_create(
slug=row["slug"],
repository__url=row["repository_url"],
repository=repository,
defaults={
"type": row["type"],
"name": row["name"],
}
)
revision, _ = Revision.objects.get_or_create(
repo__url=row["repository_url"],
repo=repository,
hash=row["revision"],
defaults={"message": "Fake revision", "author": self.user.display_name}
defaults={
"message": "Fake revision",
"author": self.user.display_name,
}
)
return [WorkerVersion(
id=row["id"],
worker=worker,
......@@ -284,7 +288,7 @@ class Command(BaseCommand):
verbose_name_plural = ModelClass._meta.verbose_name_plural.lower()
verbose_name = ModelClass._meta.verbose_name.capitalize()
logger.info(f"Creating {verbose_name_plural}")
self.stdout.write(f"Creating {verbose_name_plural}")
with Timer() as t:
count, failed = 0, 0
......@@ -295,7 +299,7 @@ class Command(BaseCommand):
try:
objects.extend(convert_method(row))
except Exception as e:
logger.warning(f"{verbose_name} creation failed: {e}")
self.stderr.write(self.style.WARNING(f"{verbose_name} conversion failed: {e}"))
failed += 1
# Create objects in bulk
......@@ -303,10 +307,10 @@ class Command(BaseCommand):
ModelClass.objects.bulk_create(objects, ignore_conflicts=True)
count += len(objects)
except Exception as e:
logger.warning(f"{verbose_name_plural.title()} creation failed: {e}")
self.stderr.write(self.style.WARNING(f"{verbose_name_plural.title()} insertion failed: {e}"))
failed += len(objects)
logger.info(f"Ran on {count+failed} rows: {count} completed, {failed} failed in {t.delta}")
self.stdout.write(f"Ran on {count+failed} rows: {count} completed, {failed} failed in {t.delta}")
def handle(self, db_path, email, **options):
# Check the database file
......@@ -347,7 +351,7 @@ class Command(BaseCommand):
if not corpus_name:
corpus_name = f"Corpus import {date}"
logger.info(f"Creating corpus {corpus_name}")
self.stdout.write(f"Creating corpus {corpus_name}")
with Timer() as t:
# Create corpus
self.corpus = Corpus.objects.create(name=corpus_name, description=f"Corpus import {date}")
......@@ -393,6 +397,6 @@ class Command(BaseCommand):
# Create classifications
self.bulk_create_objects(Classification, self.convert_classifications, SQL_CLASSIFICATION_QUERY)
logger.info(f"Created corpus {corpus_name} in {t.delta}")
self.stdout.write(self.style.SUCCESS(f"Created corpus {corpus_name} in {t.delta}"))
self.db.close()
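The convert_worker_versions fix above works around a Django constraint: get_or_create() can filter on a related-field lookup such as repository__url, but it cannot use that lookup to build the missing row, so the Repository is resolved first and passed as a concrete foreign key. A minimal sketch of the corrected pattern, with assumed slug, URL and type values:

from arkindex.dataimport.models import Repository, Worker

# Resolve the related object first...
repository = Repository.objects.get(url="https://gitlab.example.com/fake/worker.git")  # assumed URL
# ...then pass it as the foreign key, so the create() branch of get_or_create() can succeed
worker, created = Worker.objects.get_or_create(
    slug="fake-worker",  # assumed slug
    repository=repository,
    defaults={
        "type": "recognizer",  # assumed worker type
        "name": "Fake worker",
    },
)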
import logging
import uuid
from urllib.parse import urljoin
......@@ -24,8 +23,6 @@ from arkindex.project.default_corpus import DEFAULT_CORPUS_TYPES, DEFAULT_TRANSK
from arkindex.project.fields import ArrayField, LinearRingField
from arkindex.project.models import IndexableModel
logger = logging.getLogger(__name__)
class Corpus(IndexableModel):
'''
......
......@@ -85,10 +85,14 @@ def element_trash(queryset: ElementQuerySet, delete_children: bool) -> None:
@job('high', timeout=settings.RQ_TIMEOUTS['worker_results_delete'])
def worker_results_delete(corpus_id: str, version_id: Optional[str], parent_id: Optional[str]) -> None:
def worker_results_delete(
corpus_id: str,
version_id: Optional[str],
element_id: Optional[str],
include_children: bool = True) -> None:
"""
Recursively delete all Worker Results produced by any WorkerVersion or a specific one
on a whole corpus or under a specified parent element (parent element included).
on a whole corpus, under a specified parent element (parent element included), or on a single element.
"""
elements = Element.objects.filter(corpus_id=corpus_id)
classifications = Classification.objects.filter(element__corpus_id=corpus_id)
......@@ -124,17 +128,31 @@ def worker_results_delete(corpus_id: str, version_id: Optional[str], parent_id:
metadata = metadata.exclude(worker_version_id=None)
worker_activities = worker_activities.exclude(worker_version_id=None)
# When a parent ID is defined, filter using element paths to match the parent and all its descendants
if parent_id:
elements = elements.filter(Q(id=parent_id) | Q(paths__path__overlap=[parent_id]))
classifications = classifications.filter(Q(element_id=parent_id) | Q(element__paths__path__overlap=[parent_id]))
transcriptions = transcriptions.filter(Q(element_id=parent_id) | Q(element__paths__path__overlap=[parent_id]))
# The same filter is applied to both TranscriptionEntity querysets
transcription_entities_filter = Q(transcription__element_id=parent_id) | Q(transcription__element__paths__path__overlap=[parent_id])
transcription_entities = transcription_entities.filter(transcription_entities_filter)
worker_transcription_entities = worker_transcription_entities.filter(transcription_entities_filter)
metadata = metadata.filter(Q(element_id=parent_id) | Q(element__paths__path__overlap=[parent_id]))
worker_activities = worker_activities.filter(Q(element_id=parent_id) | Q(element__paths__path__overlap=[parent_id]))
if element_id:
# include_children causes a deletion *only* on the element's descendants.
# To also delete on the element itself, we will call this task synchronously with the same arguments and include_children=False.
# This is used to avoid filtering by Q(id=element_id) | Q(paths__path__overlap=[element_id]) all at once,
# which was found to be up to 30× slower than running two separate deletions.
if include_children:
elements = elements.filter(paths__path__overlap=[element_id])
classifications = classifications.filter(element__paths__path__overlap=[element_id])
transcriptions = transcriptions.filter(element__paths__path__overlap=[element_id])
# The same filter is applied to both TranscriptionEntity querysets
transcription_entities_filter = Q(transcription__element__paths__path__overlap=[element_id])
transcription_entities = transcription_entities.filter(transcription_entities_filter)
worker_transcription_entities = worker_transcription_entities.filter(transcription_entities_filter)
metadata = metadata.filter(element__paths__path__overlap=[element_id])
worker_activities = worker_activities.filter(element__paths__path__overlap=[element_id])
else:
elements = elements.filter(id=element_id)
classifications = classifications.filter(element_id=element_id)
transcriptions = transcriptions.filter(element_id=element_id)
# The same filter is applied to both TranscriptionEntity querysets
transcription_entities_filter = Q(transcription__element_id=element_id)
transcription_entities = transcription_entities.filter(transcription_entities_filter)
worker_transcription_entities = worker_transcription_entities.filter(transcription_entities_filter)
metadata = metadata.filter(element_id=element_id)
worker_activities = worker_activities.filter(element_id=element_id)
elements.trash()
classifications.delete()
......@@ -145,6 +163,18 @@ def worker_results_delete(corpus_id: str, version_id: Optional[str], parent_id:
metadata.delete()
worker_activities.delete()
if element_id and include_children:
# Delete worker results on the parent element itself only after the deletion on its descendants has been done.
# This deletion could cause the parent element to be deleted if it is considered to be a worker result itself,
# which would remove the related ElementPaths and cause us to lose track of which child elements
# we were supposed to delete worker results on.
worker_results_delete(
corpus_id=corpus_id,
version_id=version_id,
element_id=element_id,
include_children=False,
)
@job('high', timeout=settings.RQ_TIMEOUTS['move_element'])
def move_element(source: Element, destination: Element) -> None:
......
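For context, a usage sketch of the new signature (the import path and the corpus/page variables are assumptions): deleting the results of every worker version on a single page and its descendants now runs in two passes, the asynchronous call below for the descendants and an internal synchronous call with include_children=False for the page itself.

from arkindex.documents.tasks import worker_results_delete  # assumed module path

worker_results_delete.delay(
    corpus_id=str(corpus.id),
    version_id=None,          # None targets results from every worker version
    element_id=str(page.id),  # None would target the whole corpus instead
)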
......@@ -157,8 +157,9 @@ class TestLoadExport(FixtureTestCase):
db_path = args[0]
# Call dumpdata command before the deletion
# Ignore django_rq as it uses a fake database table to insert itself into Django's permissions system
_, dump_path_before = tempfile.mkstemp(suffix='.json')
call_command('dumpdata', output=dump_path_before)
call_command('dumpdata', output=dump_path_before, exclude=['django_rq'])
# Delete the existing corpus
corpus_delete(self.corpus.id)
......@@ -170,7 +171,7 @@ class TestLoadExport(FixtureTestCase):
# Call dumpdata command after the import
_, dump_path_after = tempfile.mkstemp(suffix='.json')
call_command('dumpdata', output=dump_path_after)
call_command('dumpdata', output=dump_path_after, exclude=['django_rq'])
corpus = Corpus.objects.get(name='My corpus')
......
......@@ -89,7 +89,7 @@ class TestDeleteWorkerResults(FixtureTestCase):
with self.assertExactQueries('worker_results_delete_under_parent.sql', params={
'corpus_id': str(self.corpus.id),
'version_id': str(self.version_1.id),
'parent_id': str(self.page1.id),
'element_id': str(self.page1.id),
}):
worker_results_delete(self.corpus.id, self.version_1.id, self.page1.id)
......@@ -116,7 +116,7 @@ class TestDeleteWorkerResults(FixtureTestCase):
with self.assertExactQueries('worker_results_delete_under_parent_included.sql', params={
'corpus_id': str(self.corpus.id),
'version_id': str(self.version_1.id),
'parent_id': str(self.page2.id),
'element_id': str(self.page2.id),
}):
worker_results_delete(self.corpus.id, self.version_1.id, self.page2.id)
......