Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • arkindex/backend
1 result
Show changes
Commits on Source (4)
Showing
with 466 additions and 61 deletions
......@@ -54,7 +54,7 @@ release:
clean-docker:
$(eval containers:=$(shell docker ps -a -q))
@if [ -n "$(containers)" ]; then \
echo "Cleaning up past containers\n" \
echo "Cleaning up past containers\n" ; \
docker rm -f $(containers) ; \
fi
......
......@@ -14,10 +14,21 @@ from rq.utils import as_text
from arkindex.documents.models import CorpusExport, CorpusExportState, Element
from arkindex.images.models import Image, ImageServer
from arkindex.ponos.models import Artifact, Task
from arkindex.process.models import DataFile, GitRef, GitRefType, Process, WorkerVersion, WorkerVersionState
from arkindex.process.models import (
CorpusWorkerVersion,
DataFile,
GitRef,
GitRefType,
Process,
Worker,
WorkerActivity,
WorkerRun,
WorkerVersion,
WorkerVersionState,
)
from arkindex.project.aws import s3
from arkindex.project.rq_overrides import Job
from arkindex.training.models import ModelVersion
from arkindex.training.models import Model, ModelVersion
from redis.exceptions import ConnectionError
# Ponos artifacts use the path: <task id>/<path>
......@@ -32,6 +43,9 @@ class Command(BaseCommand):
help = "Clean up old corpus exports, trashed DataFiles, expired processes and S3 buckets"
def handle(self, *args, **options):
# Cleaning up workers could free some artifacts, so clean them before artifacts
self.cleanup_archived_workers()
self.cleanup_artifacts()
self.cleanup_expired_processes()
......@@ -48,6 +62,8 @@ class Command(BaseCommand):
self.cleanup_ponos_logs()
self.cleanup_archived_models()
self.cleanup_unlinked_model_versions()
self.cleanup_rq_user_registries()
......@@ -294,6 +310,71 @@ class Command(BaseCommand):
self.stdout.write(self.style.SUCCESS("Successfully cleaned up orphaned Ponos logs."))
def cleanup_archived_workers(self):
    """
    Remove Worker instances that have been archived for longer than the configured worker cleanup delay
    and that are not being used in any worker result.

    Workers still referenced through their versions are counted and skipped;
    workers referenced only through their configurations are warned about and
    skipped without being counted.
    """
    self.stdout.write("Removing archived workers…")
    # Only consider workers archived at least WORKER_CLEANUP_DELAY days ago.
    workers = Worker.objects.filter(archived__lte=timezone.now() - timedelta(days=settings.WORKER_CLEANUP_DELAY))
    skipped, deleted = 0, 0
    for worker in workers.iterator():
        # There are both foreign keys for worker versions and worker runs on worker results.
        # Some old results might only have a worker version ID, but when a worker run ID is set,
        # the worker version ID is deduced from it, so we only have to check on the version.
        if worker.versions.all().in_use():
            skipped += 1
            continue
        # Skip any workers whose WorkerConfigurations are in use.
        # This should never happen since we already filter on the WorkerVersions,
        # but that could lead to deleting worker results when we didn't want to.
        # NOTE(review): these workers are not added to `skipped`, so the summary
        # message below undercounts them — presumably intentional since a
        # warning is emitted instead; confirm.
        if WorkerRun.objects.filter(configuration__worker=worker).in_use():
            self.stdout.write(self.style.WARNING(
                f"Worker {worker.name} ({worker.id}) does not have any worker versions used by worker results, "
                "but some of its worker configurations are in use."
            ))
            continue
        self.stdout.write(f"Removing worker {worker.name} ({worker.id})")
        worker.delete()
        deleted += 1
    if skipped:
        self.stdout.write(f"Skipping {skipped} archived workers that have worker versions or configurations used in worker results.")
    self.stdout.write(self.style.SUCCESS(f"Successfully cleaned up {deleted} archived workers."))
def cleanup_archived_models(self):
    """
    Remove Model instances that have been archived for longer than the configured model cleanup delay
    and that are not being used in any worker result.
    """
    self.stdout.write("Removing archived models…")
    # Only models archived at least MODEL_CLEANUP_DELAY days ago are eligible.
    cutoff = timezone.now() - timedelta(days=settings.MODEL_CLEANUP_DELAY)
    skipped_count = 0
    deleted_count = 0
    for archived_model in Model.objects.filter(archived__lte=cutoff).iterator():
        # Keep the model while any worker result still references one of its versions.
        if WorkerRun.objects.filter(model_version__model=archived_model).in_use():
            skipped_count += 1
            continue
        self.stdout.write(f"Removing model {archived_model.name} ({archived_model.id})")
        # Remove CorpusWorkerVersions and WorkerActivities first
        # Those normally use SET_NULL, but this can cause the unique constraints to complain
        # if there already are rows with a model version set to None.
        WorkerActivity.objects.filter(model_version__model=archived_model).delete()
        CorpusWorkerVersion.objects.filter(model_version__model=archived_model).delete()
        archived_model.delete()
        deleted_count += 1
    if skipped_count:
        self.stdout.write(f"Skipping {skipped_count} archived models that have model versions used in worker results.")
    self.stdout.write(self.style.SUCCESS(f"Successfully cleaned up {deleted_count} archived models."))
def cleanup_unlinked_model_versions(self):
self.stdout.write("Removing orphaned model versions archives…")
bucket = s3.Bucket(settings.AWS_TRAINING_BUCKET)
......
......@@ -74,7 +74,6 @@ class TestCorpus(FixtureAPITestCase):
mock_now.return_value = FAKE_NOW
cls.corpus_hidden = Corpus.objects.create(name="C Hidden")
@expectedFailure
def test_anon(self):
# An anonymous user has only access to public
with self.assertNumQueries(4):
......@@ -225,7 +224,6 @@ class TestCorpus(FixtureAPITestCase):
self.assertEqual(len(data), 13)
self.assertSetEqual({corpus["top_level_type"] for corpus in data}, {None, "top_level"})
@expectedFailure
def test_mixin(self):
vol1 = Element.objects.get(name="Volume 1")
vol2 = Element.objects.get(name="Volume 2")
......@@ -345,7 +343,7 @@ class TestCorpus(FixtureAPITestCase):
"description": self.corpus_public.description,
"public": True,
"indexable": False,
"rights": ["read", "write", "admin"],
"rights": ["read"],
"created": DB_CREATED,
"authorized_users": 1,
"top_level_type": None,
......
......@@ -43,7 +43,7 @@ class TestRetrieveElements(FixtureAPITestCase):
"public": True,
},
"thumbnail_url": self.vol.thumbnail.s3_url,
"thumbnail_put_url": self.vol.thumbnail.s3_put_url,
"thumbnail_put_url": None,
"worker_version": None,
"confidence": None,
"zone": None,
......@@ -51,7 +51,7 @@ class TestRetrieveElements(FixtureAPITestCase):
"mirrored": False,
"created": "2020-02-02T01:23:45.678000Z",
"creator": None,
"rights": ["read", "write", "admin"],
"rights": ["read"],
"metadata_count": 0,
"classifications": [
{
......@@ -102,6 +102,8 @@ class TestRetrieveElements(FixtureAPITestCase):
"""
Check getting an element only gives a thumbnail URL with folders
"""
self.client.force_login(self.user)
self.assertTrue(self.vol.type.folder)
response = self.client.get(reverse("api:element-retrieve", kwargs={"pk": str(self.vol.id)}))
self.assertEqual(response.status_code, status.HTTP_200_OK)
......@@ -230,7 +232,7 @@ class TestRetrieveElements(FixtureAPITestCase):
"public": True,
},
"thumbnail_url": self.vol.thumbnail.s3_url,
"thumbnail_put_url": self.vol.thumbnail.s3_put_url,
"thumbnail_put_url": None,
"worker_version": str(self.worker_version.id),
"confidence": None,
"zone": None,
......@@ -238,7 +240,7 @@ class TestRetrieveElements(FixtureAPITestCase):
"mirrored": False,
"created": "2020-02-02T01:23:45.678000Z",
"creator": None,
"rights": ["read", "write", "admin"],
"rights": ["read"],
"metadata_count": 0,
"classifications": [],
"worker_run": {
......@@ -265,7 +267,7 @@ class TestRetrieveElements(FixtureAPITestCase):
"public": True,
},
"thumbnail_url": self.vol.thumbnail.s3_url,
"thumbnail_put_url": self.vol.thumbnail.s3_put_url,
"thumbnail_put_url": None,
"worker_version": None,
"confidence": None,
"zone": None,
......@@ -273,7 +275,7 @@ class TestRetrieveElements(FixtureAPITestCase):
"mirrored": False,
"created": "2020-02-02T01:23:45.678000Z",
"creator": None,
"rights": ["read", "write", "admin"],
"rights": ["read"],
"metadata_count": 0,
"classifications": [
{
......
......@@ -130,6 +130,32 @@ class CorpusWorkerVersionManager(Manager):
], ignore_conflicts=True)
class WorkerResultSourceQuerySet(QuerySet):
    """
    QuerySet for models that worker results point to (worker runs and worker
    versions), providing a check for whether any result still references them.
    """

    @property
    def worker_result_relations(self):
        """
        Iterate over the reverse many-to-one relations on this model that lead
        to worker result models, excluding Ponos tasks and worker runs.
        """
        from arkindex.process.models import WorkerRun
        # Reverse links to process tasks and the WorkerRun→WorkerVersion link
        # are not worker results, so they are excluded.
        excluded = (Task, WorkerRun)
        return (
            field
            for field in self.model._meta.get_fields()
            if isinstance(field, ManyToOneRel) and field.related_model not in excluded
        )

    def in_use(self):
        """
        Check if any data is linked to this worker run or worker version
        """
        ids = self.values_list("id", flat=True)
        for relation in self.worker_result_relations:
            lookup = {f"{relation.field.name}__in": ids}
            if relation.related_model.objects.filter(**lookup).exists():
                return True
        return False
class WorkerVersionManager(Manager):
@cached_property
......@@ -145,6 +171,9 @@ class WorkerVersionManager(Manager):
.get(id=settings.IMPORTS_WORKER_VERSION)
)
def get_queryset(self):
return WorkerResultSourceQuerySet(self.model, using=self._db)
class WorkerManager(BaseACLManager):
......@@ -172,25 +201,7 @@ class WorkerManager(BaseACLManager):
).distinct()
class WorkerRunQuerySet(QuerySet):
def worker_results_models(self):
return [
field.related_model
for field in self.model._meta.get_fields()
if isinstance(field, ManyToOneRel)
# Ignore reverse links to processes tasks
and field.related_model is not Task
]
def in_use(self):
"""
Check if any data is linked to worker runs
"""
ids = self.values_list("id", flat=True)
for field in self.worker_results_models():
if field.objects.filter(worker_run__in=ids).exists():
return True
return False
class WorkerRunQuerySet(WorkerResultSourceQuerySet):
def set_has_results(self):
"""
......@@ -204,9 +215,9 @@ class WorkerRunQuerySet(QuerySet):
def has_results_expression(self):
return reduce(operator.or_, [
Exists(
model.objects.filter(worker_run_id=OuterRef("pk"))
relation.related_model.objects.filter(worker_run_id=OuterRef("pk"))
)
for model in self.worker_results_models()
for relation in self.worker_result_relations
])
......
......@@ -18,6 +18,7 @@ from arkindex.process.models import (
WorkerRun,
WorkerVersionState,
)
from arkindex.process.utils import get_default_farm
from arkindex.project.mixins import ProcessACLMixin
from arkindex.project.serializer_fields import EnumField, LinearRingField
from arkindex.project.validators import MaxValueValidator
......@@ -26,7 +27,6 @@ from arkindex.users.models import Role
from arkindex.users.utils import get_max_level
ProcessFarmField = import_string(getattr(settings, "PROCESS_FARM_FIELD", None) or "arkindex.project.serializer_fields.NullField")
get_default_farm = import_string(getattr(settings, "GET_DEFAULT_FARM", None) or "arkindex.process.utils.get_default_farm")
class ProcessLightSerializer(serializers.ModelSerializer):
......
from unittest import expectedFailure
from unittest.mock import call, patch
from django.test import override_settings
......@@ -19,8 +18,6 @@ class TestCreateS3Import(FixtureTestCase):
def setUpTestData(cls):
super().setUpTestData()
cls.import_worker_version = WorkerVersion.objects.get(worker__slug="file_import")
cls.default_farm = Farm.objects.create(name="Crypto farm")
cls.default_farm.memberships.create(user=cls.user, level=Role.Guest.value)
def test_requires_login(self):
with self.assertNumQueries(0):
......@@ -138,8 +135,9 @@ class TestCreateS3Import(FixtureTestCase):
self.client.force_login(self.user)
ImageServer.objects.create(id=999, display_name="Ingest image server", url="https://dev.null.teklia.com")
element = self.corpus.elements.get(name="Volume 1")
farm = Farm.objects.create(name="Crypto farm")
with self.assertNumQueries(22), self.settings(IMPORTS_WORKER_VERSION=str(self.import_worker_version.id)):
with self.assertNumQueries(23), self.settings(IMPORTS_WORKER_VERSION=str(self.import_worker_version.id)):
response = self.client.post(reverse("api:s3-import-create"), {
"corpus_id": str(self.corpus.id),
"element_id": str(element.id),
......@@ -147,6 +145,7 @@ class TestCreateS3Import(FixtureTestCase):
"element_type": "page",
"bucket_name": "blah",
"prefix": "a/b/c",
"farm_id": str(farm.id),
})
self.assertEqual(response.status_code, status.HTTP_201_CREATED, response.json())
data = response.json()
......@@ -161,6 +160,7 @@ class TestCreateS3Import(FixtureTestCase):
self.assertEqual(process.element_type, self.corpus.types.get(slug="page"))
self.assertEqual(process.bucket_name, "blah")
self.assertEqual(process.prefix, "a/b/c")
self.assertEqual(process.farm, farm)
worker_run = process.worker_runs.get()
self.assertEqual(worker_run.version, self.import_worker_version)
......@@ -245,19 +245,21 @@ class TestCreateS3Import(FixtureTestCase):
"INGEST_S3_SECRET_KEY": "its-secret-i-wont-tell-you",
})
@expectedFailure
@override_settings(INGEST_IMAGESERVER_ID=999)
@patch("arkindex.users.utils.get_max_level", return_value=None)
def test_farm_guest(self, get_max_level_mock):
@patch("arkindex.ponos.models.Farm.is_available", return_value=False)
def test_farm_guest(self, is_available_mock):
self.user.user_scopes.create(scope=Scope.S3Ingest)
self.client.force_login(self.user)
self.corpus.types.create(slug="folder", display_name="Folder", folder=True)
ImageServer.objects.create(id=999, display_name="Ingest image server", url="https://dev.null.teklia.com")
farm = Farm.objects.create(name="Crypto farm")
with self.assertNumQueries(5), self.settings(IMPORTS_WORKER_VERSION=str(self.import_worker_version.id)):
response = self.client.post(reverse("api:s3-import-create"), {
"corpus_id": str(self.corpus.id),
"bucket_name": "blah",
"farm_id": str(farm.id),
})
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
......@@ -266,20 +268,43 @@ class TestCreateS3Import(FixtureTestCase):
})
self.assertFalse(Process.objects.filter(mode=ProcessMode.S3).exists())
self.assertEqual(get_max_level_mock.call_count, 1)
self.assertEqual(is_available_mock.call_count, 1)
self.assertEqual(is_available_mock.call_args, call(self.user))
@expectedFailure
@override_settings(INGEST_IMAGESERVER_ID=999)
@patch("arkindex.users.utils.get_max_level", return_value=None)
def test_default_farm_guest(self, get_max_level_mock):
@patch("arkindex.process.serializers.ingest.get_default_farm")
def test_default_farm(self, get_default_farm_mock):
self.user.user_scopes.create(scope=Scope.S3Ingest)
self.client.force_login(self.user)
self.corpus.types.create(slug="folder", display_name="Folder", folder=True)
ImageServer.objects.create(id=999, display_name="Ingest image server", url="https://dev.null.teklia.com")
self.default_farm.memberships.filter(user=self.user).delete()
default_farm = Farm.objects.create(name="Crypto farm")
get_default_farm_mock.return_value = default_farm
with self.assertNumQueries(5), self.settings(IMPORTS_WORKER_VERSION=str(self.import_worker_version.id)):
with self.assertNumQueries(21), self.settings(IMPORTS_WORKER_VERSION=str(self.import_worker_version.id)):
response = self.client.post(reverse("api:s3-import-create"), {
"corpus_id": str(self.corpus.id),
"bucket_name": "blah",
})
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
process = Process.objects.get(id=response.json()["id"])
self.assertEqual(process.farm, default_farm)
@override_settings(INGEST_IMAGESERVER_ID=999)
@patch("arkindex.ponos.models.Farm.is_available", return_value=False)
@patch("arkindex.process.serializers.ingest.get_default_farm")
def test_default_farm_guest(self, get_default_farm_mock, is_available_mock):
self.user.user_scopes.create(scope=Scope.S3Ingest)
self.client.force_login(self.user)
self.corpus.types.create(slug="folder", display_name="Folder", folder=True)
ImageServer.objects.create(id=999, display_name="Ingest image server", url="https://dev.null.teklia.com")
default_farm = Farm.objects.create(name="Crypto farm")
get_default_farm_mock.return_value = default_farm
with self.assertNumQueries(4), self.settings(IMPORTS_WORKER_VERSION=str(self.import_worker_version.id)):
response = self.client.post(reverse("api:s3-import-create"), {
"corpus_id": str(self.corpus.id),
"bucket_name": "blah",
......@@ -291,5 +316,5 @@ class TestCreateS3Import(FixtureTestCase):
})
self.assertFalse(Process.objects.filter(mode=ProcessMode.S3).exists())
self.assertEqual(get_max_level_mock.call_count, 1)
self.assertEqual(get_max_level_mock.call_args, call(self.user, self.default_farm))
self.assertEqual(is_available_mock.call_count, 1)
self.assertEqual(is_available_mock.call_args, call(self.user))
......@@ -2076,6 +2076,28 @@ class TestProcesses(FixtureAPITestCase):
process = Process.objects.get(id=data["id"])
self.assertEqual(process.farm, farm)
@patch("arkindex.process.serializers.imports.get_default_farm")
def test_from_files_default_farm(self, get_default_farm_mock):
    """
    A files-import process created without an explicit farm is assigned the
    farm returned by ``get_default_farm`` (mocked here to a fixture farm).
    """
    farm = Farm.objects.get(name="Wheat farm")
    get_default_farm_mock.return_value = farm
    self.client.force_login(self.user)
    # Sanity check: the import worker version has no runs yet.
    self.assertEqual(self.version_with_model.worker_runs.count(), 0)
    with (
        self.settings(IMPORTS_WORKER_VERSION=str(self.version_with_model.id)),
        self.assertNumQueries(25),
    ):
        response = self.client.post(reverse("api:files-process"), {
            "files": [str(self.img_df.id)],
            "folder_type": "volume",
            "element_type": "page",
        }, format="json")
    self.assertEqual(response.status_code, status.HTTP_201_CREATED)
    data = response.json()
    process = Process.objects.get(id=data["id"])
    # The process must have been assigned the mocked default farm.
    self.assertEqual(process.farm, farm)
@patch("arkindex.ponos.models.Farm.is_available", return_value=False)
def test_from_files_farm_guest(self, is_available_mock):
self.client.force_login(self.user)
......@@ -2472,6 +2494,28 @@ class TestProcesses(FixtureAPITestCase):
self.assertEqual(workers_process.state, State.Unscheduled)
self.assertEqual(workers_process.farm_id, farm.id)
@patch("arkindex.process.serializers.imports.get_default_farm")
def test_start_process_default_farm(self, get_default_farm_mock):
    """
    Starting a workers process without selecting a farm falls back to the
    farm returned by ``get_default_farm`` (mocked here to a fixture farm).
    """
    farm = Farm.objects.get(name="Wheat farm")
    get_default_farm_mock.return_value = farm
    process2 = self.corpus.processes.create(creator=self.user, mode=ProcessMode.Workers)
    process2.worker_runs.create(version=self.recognizer, parents=[], configuration=None)
    # Sanity check: the process has no Ponos tasks before being started.
    self.assertFalse(process2.tasks.exists())
    self.client.force_login(self.user)
    with self.assertNumQueries(15):
        response = self.client.post(
            reverse("api:process-start", kwargs={"pk": str(process2.id)})
        )
    self.assertEqual(response.status_code, status.HTTP_201_CREATED)
    self.assertEqual(response.json()["id"], str(process2.id))
    process2.refresh_from_db()
    # Starting the process schedules it on the mocked default farm.
    self.assertEqual(process2.state, State.Unscheduled)
    self.assertEqual(process2.farm, farm)
@patch("arkindex.ponos.models.Farm.is_available", return_value=False)
@patch("arkindex.process.serializers.imports.get_default_farm")
def test_start_process_default_farm_guest(self, get_default_farm_mock, is_available_mock):
......
import json
from hashlib import md5
from django.conf import settings
from django.db.models import CharField, Value
from django.db.models.functions import Cast, Concat, NullIf
from django.utils.module_loading import import_string
from arkindex.project.tools import RTrimChr
__default_farm = None
def get_default_farm():
return None
get_default_farm = (
import_string(settings.GET_DEFAULT_FARM)
if getattr(settings, "GET_DEFAULT_FARM", None)
else lambda: None
)
def hash_object(object):
......
......@@ -221,4 +221,8 @@ def get_settings_parser(base_dir):
ingest_parser.add_option("extra_buckets", type=str, many=True, default=[])
ingest_parser.add_option("prefix_by_bucket_name", type=bool, default=True)
cleanup_parser = parser.add_subparser("cleanup", default={})
cleanup_parser.add_option("worker_delay", type=int, default=30)
cleanup_parser.add_option("model_delay", type=int, default=30)
return parser
......@@ -561,6 +561,8 @@ FRONTEND_DOORBELL = conf["doorbell"]
# Banner settings
FRONTEND_BANNER = conf["banner"]
WORKER_CLEANUP_DELAY = conf["cleanup"]["worker_delay"]
MODEL_CLEANUP_DELAY = conf["cleanup"]["model_delay"]
# Optional default group to add new users to when they complete email verification
SIGNUP_DEFAULT_GROUP = conf["signup_default_group"]
......
......@@ -116,11 +116,6 @@ class FixtureMixin(object):
# Clean content type cache for SQL requests checks consistency
ContentType.objects.clear_cache()
# Force clean the default farm global variable in `arkindex.process.utils` module
# This is required not to alter query counts and avoid caching a farm that does not exist in the fixture
from arkindex.process import utils
setattr(utils, "__default_farm", None)
# Clear the local cached properties so that it is re-fetched on each test
# to avoid intermittently changing query counts.
# Using `del` on a cached property that has not been accessed yet can cause an AttributeError.
......
......@@ -7,6 +7,9 @@ cache:
path: null
type: null
url: null
cleanup:
model_delay: 30
worker_delay: 30
cors:
origin_whitelist:
- http://localhost:8080
......
......@@ -4,6 +4,9 @@ banner:
style: big
cache:
type: redis
cleanup:
model_delay: forever
worker_delay: immediately
cors:
origin_whitelist: france
suffixes: 1
......
......@@ -2,6 +2,9 @@ banner:
style: "'big' is not a valid BannerStyle"
cache:
url: cache.url is required for a Redis cache
cleanup:
model_delay: "invalid literal for int() with base 10: 'forever'"
worker_delay: "invalid literal for int() with base 10: 'immediately'"
cors:
suffixes: "'int' object is not iterable"
csrf:
......
......@@ -8,6 +8,9 @@ cache:
path: /
type: filesystem
url: http://aaa
cleanup:
model_delay: 9998
worker_delay: 9999
cors:
origin_whitelist:
- localtoast:1337
......
......@@ -10,6 +10,8 @@ def has_access(user: User, instance, level: int, skip_public: bool = False) -> b
Check if the user has access to a generic instance with a minimum level
If skip_public parameter is set to true, exclude rights on public instances
"""
if user.is_anonymous:
return level <= Role.Guest.value and not skip_public and getattr(instance, "public", False)
return True
......@@ -18,6 +20,11 @@ def filter_rights(user: User, model, level: int):
Return a generic queryset of objects with access rights for this user.
Level filtering parameter should be an integer between 1 and 100.
"""
if user.is_anonymous:
if hasattr(model, "public"):
return model.objects.filter(public=True).annotate(max_level=Value(Role.Guest.value, IntegerField()))
return model.objects.none()
return model.objects.annotate(max_level=Value(Role.Admin.value, IntegerField()))
......@@ -25,4 +32,9 @@ def get_max_level(user: User, instance) -> Optional[int]:
"""
Returns the maximum access level on a given model instance
"""
if user.is_anonymous:
if getattr(instance, "public", False):
return Role.Guest.value
return None
return Role.Admin.value