diff --git a/arkindex/documents/management/commands/cleanup.py b/arkindex/documents/management/commands/cleanup.py index 46c0dce60f4efa6ee6ec9013c7c4f872c3d9f8b2..ff521f16bd832c5fe4e2e06961a474d30e87e4f3 100644 --- a/arkindex/documents/management/commands/cleanup.py +++ b/arkindex/documents/management/commands/cleanup.py @@ -14,10 +14,21 @@ from rq.utils import as_text from arkindex.documents.models import CorpusExport, CorpusExportState, Element from arkindex.images.models import Image, ImageServer from arkindex.ponos.models import Artifact, Task -from arkindex.process.models import DataFile, GitRef, GitRefType, Process, WorkerVersion, WorkerVersionState +from arkindex.process.models import ( + CorpusWorkerVersion, + DataFile, + GitRef, + GitRefType, + Process, + Worker, + WorkerActivity, + WorkerRun, + WorkerVersion, + WorkerVersionState, +) from arkindex.project.aws import s3 from arkindex.project.rq_overrides import Job -from arkindex.training.models import ModelVersion +from arkindex.training.models import Model, ModelVersion from redis.exceptions import ConnectionError # Ponos artifacts use the path: <task id>/<path> @@ -32,6 +43,9 @@ class Command(BaseCommand): help = "Clean up old corpus exports, trashed DataFiles, expired processes and S3 buckets" def handle(self, *args, **options): + # Cleaning up workers could free some artifacts, so clean them before artifacts + self.cleanup_archived_workers() + self.cleanup_artifacts() self.cleanup_expired_processes() @@ -48,6 +62,8 @@ class Command(BaseCommand): self.cleanup_ponos_logs() + self.cleanup_archived_models() + self.cleanup_unlinked_model_versions() self.cleanup_rq_user_registries() @@ -294,6 +310,71 @@ class Command(BaseCommand): self.stdout.write(self.style.SUCCESS("Successfully cleaned up orphaned Ponos logs.")) + def cleanup_archived_workers(self): + """ + Remove Worker instances that have been archived for longer than the configured worker cleanup delay + and that are not being used in any worker result. + """ + self.stdout.write("Removing archived workers…") + + workers = Worker.objects.filter(archived__lte=timezone.now() - timedelta(days=settings.WORKER_CLEANUP_DELAY)) + skipped, deleted = 0, 0 + for worker in workers.iterator(): + # There are both foreign keys for worker versions and worker runs on worker results. + # Some old results might only have a worker version ID, but when a worker run ID is set, + # the worker version ID is deduced from it, so we only have to check on the version. + if worker.versions.all().in_use(): + skipped += 1 + continue + + # Skip any workers whose WorkerConfigurations are in use. + # This should never happen since we already filter on the WorkerVersions, + # but that could lead to deleting worker results when we didn't want to. + if WorkerRun.objects.filter(configuration__worker=worker).in_use(): + self.stdout.write(self.style.WARNING( + f"Worker {worker.name} ({worker.id}) does not have any worker versions used by worker results, " + "but some of its worker configurations are in use." + )) + continue + + self.stdout.write(f"Removing worker {worker.name} ({worker.id})") + worker.delete() + deleted += 1 + + if skipped: + self.stdout.write(f"Skipping {skipped} archived workers that have worker versions or configurations used in worker results.") + self.stdout.write(self.style.SUCCESS(f"Successfully cleaned up {deleted} archived workers.")) + + def cleanup_archived_models(self): + """ + Remove Model instances that have been archived for longer than the configured model cleanup delay + and that are not being used in any worker result. + """ + self.stdout.write("Removing archived models…") + models = Model.objects.filter(archived__lte=timezone.now() - timedelta(days=settings.MODEL_CLEANUP_DELAY)) + + skipped, deleted = 0, 0 + for model in models.iterator(): + if WorkerRun.objects.filter(model_version__model=model).in_use(): + skipped += 1 + continue + + self.stdout.write(f"Removing model {model.name} ({model.id})") + + # Remove CorpusWorkerVersions and WorkerActivities first + # Those normally use SET_NULL, but this can cause the unique constraints to complain + # if there already are rows with a model version set to None. + WorkerActivity.objects.filter(model_version__model=model).delete() + CorpusWorkerVersion.objects.filter(model_version__model=model).delete() + + model.delete() + + deleted += 1 + + if skipped: + self.stdout.write(f"Skipping {skipped} archived models that have model versions used in worker results.") + self.stdout.write(self.style.SUCCESS(f"Successfully cleaned up {deleted} archived models.")) + def cleanup_unlinked_model_versions(self): self.stdout.write("Removing orphaned model versions archives…") bucket = s3.Bucket(settings.AWS_TRAINING_BUCKET) diff --git a/arkindex/documents/tests/commands/test_cleanup.py b/arkindex/documents/tests/commands/test_cleanup.py index 50270d5f014e53cce1694b50426bc55a4230f03e..40a215c8c8ccb441be6eaf8796e9aac72179217a 100644 --- a/arkindex/documents/tests/commands/test_cleanup.py +++ b/arkindex/documents/tests/commands/test_cleanup.py @@ -11,7 +11,17 @@ from django.test import override_settings from arkindex.documents.models import CorpusExport, CorpusExportState, Element from arkindex.images.models import Image, ImageServer from arkindex.ponos.models import Artifact, Farm, Task -from arkindex.process.models import DataFile, GitRefType, Process, ProcessMode, Repository, WorkerVersionState +from arkindex.process.models import ( + DataFile, + GitRefType, + Process, + ProcessMode, + Repository, + Worker, + WorkerRun, + WorkerVersion, + WorkerVersionState, +) from arkindex.project.tests import FixtureTestCase from arkindex.training.models import Dataset, Model, ModelVersion @@ -41,6 +51,8 @@ class TestCleanupCommand(FixtureTestCase): self.cleanup(), dedent( f""" + Removing archived workers… + Successfully cleaned up 0 archived workers. Removing orphaned Ponos artifacts… Successfully cleaned up orphaned Ponos artifacts. Removing 0 artifacts of expired processes from S3… @@ -63,6 +75,8 @@ class TestCleanupCommand(FixtureTestCase): Successfully cleaned up orphaned local images. Removing orphaned Ponos logs… Successfully cleaned up orphaned Ponos logs. + Removing archived models… + Successfully cleaned up 0 archived models. Removing orphaned model versions archives… Successfully cleaned up orphaned model versions archives. Cleaning up deleted jobs from RQ user registries… @@ -83,6 +97,8 @@ class TestCleanupCommand(FixtureTestCase): self.cleanup(), dedent( """ + Removing archived workers… + Successfully cleaned up 0 archived workers. Removing orphaned Ponos artifacts… Successfully cleaned up orphaned Ponos artifacts. Removing 0 artifacts of expired processes from S3… @@ -104,6 +120,8 @@ class TestCleanupCommand(FixtureTestCase): Successfully cleaned up orphaned local images. Removing orphaned Ponos logs… Successfully cleaned up orphaned Ponos logs. + Removing archived models… + Successfully cleaned up 0 archived models. Removing orphaned model versions archives… Successfully cleaned up orphaned model versions archives. Cleaning up deleted jobs from RQ user registries… @@ -124,6 +142,8 @@ class TestCleanupCommand(FixtureTestCase): self.cleanup(), dedent( f""" + Removing archived workers… + Successfully cleaned up 0 archived workers. Removing orphaned Ponos artifacts… Successfully cleaned up orphaned Ponos artifacts. Removing 0 artifacts of expired processes from S3… @@ -147,6 +167,8 @@ class TestCleanupCommand(FixtureTestCase): Successfully cleaned up orphaned local images. Removing orphaned Ponos logs… Successfully cleaned up orphaned Ponos logs. + Removing archived models… + Successfully cleaned up 0 archived models. Removing orphaned model versions archives… Successfully cleaned up orphaned model versions archives. Cleaning up deleted jobs from RQ user registries… @@ -175,6 +197,8 @@ class TestCleanupCommand(FixtureTestCase): self.cleanup(), dedent( f""" + Removing archived workers… + Successfully cleaned up 0 archived workers. Removing orphaned Ponos artifacts… Successfully cleaned up orphaned Ponos artifacts. Removing 0 artifacts of expired processes from S3… @@ -197,6 +221,8 @@ class TestCleanupCommand(FixtureTestCase): Successfully cleaned up orphaned local images. Removing orphaned Ponos logs… Successfully cleaned up orphaned Ponos logs. + Removing archived models… + Successfully cleaned up 0 archived models. Removing orphaned model versions archives… Successfully cleaned up orphaned model versions archives. Cleaning up deleted jobs from RQ user registries… @@ -227,6 +253,8 @@ class TestCleanupCommand(FixtureTestCase): self.cleanup(), dedent( f""" + Removing archived workers… + Successfully cleaned up 0 archived workers. Removing orphaned Ponos artifacts… Successfully cleaned up orphaned Ponos artifacts. Removing 0 artifacts of expired processes from S3… @@ -249,6 +277,8 @@ class TestCleanupCommand(FixtureTestCase): Successfully cleaned up orphaned local images. Removing orphaned Ponos logs… Successfully cleaned up orphaned Ponos logs. + Removing archived models… + Successfully cleaned up 0 archived models. Removing orphaned model versions archives… Successfully cleaned up orphaned model versions archives. Cleaning up deleted jobs from RQ user registries… @@ -287,6 +317,8 @@ class TestCleanupCommand(FixtureTestCase): self.cleanup(), dedent( f""" + Removing archived workers… + Successfully cleaned up 0 archived workers. Removing orphaned Ponos artifacts… Successfully cleaned up orphaned Ponos artifacts. Removing 0 artifacts of expired processes from S3… @@ -310,6 +342,8 @@ class TestCleanupCommand(FixtureTestCase): Successfully cleaned up orphaned local images. Removing orphaned Ponos logs… Successfully cleaned up orphaned Ponos logs. + Removing archived models… + Successfully cleaned up 0 archived models. Removing orphaned model versions archives… Successfully cleaned up orphaned model versions archives. Cleaning up deleted jobs from RQ user registries… @@ -358,6 +392,8 @@ class TestCleanupCommand(FixtureTestCase): self.cleanup(), dedent( """ + Removing archived workers… + Successfully cleaned up 0 archived workers. Removing orphaned Ponos artifacts… Removing artifact aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa/sad/artifact.zip… Unsupported artifact cant_touch_this.txt.vbs @@ -383,6 +419,8 @@ class TestCleanupCommand(FixtureTestCase): Successfully cleaned up orphaned local images. Removing orphaned Ponos logs… Successfully cleaned up orphaned Ponos logs. + Removing archived models… + Successfully cleaned up 0 archived models. Removing orphaned model versions archives… Successfully cleaned up orphaned model versions archives. Cleaning up deleted jobs from RQ user registries… @@ -465,6 +503,8 @@ class TestCleanupCommand(FixtureTestCase): self.cleanup(), dedent( """ + Removing archived workers… + Successfully cleaned up 0 archived workers. Removing orphaned Ponos artifacts… Successfully cleaned up orphaned Ponos artifacts. Removing 1 artifacts of expired processes from S3… @@ -488,6 +528,8 @@ class TestCleanupCommand(FixtureTestCase): Successfully cleaned up orphaned local images. Removing orphaned Ponos logs… Successfully cleaned up orphaned Ponos logs. + Removing archived models… + Successfully cleaned up 0 archived models. Removing orphaned model versions archives… Successfully cleaned up orphaned model versions archives. Cleaning up deleted jobs from RQ user registries… @@ -571,6 +613,8 @@ class TestCleanupCommand(FixtureTestCase): self.cleanup(), dedent( """ + Removing archived workers… + Successfully cleaned up 0 archived workers. Removing orphaned Ponos artifacts… Successfully cleaned up orphaned Ponos artifacts. Removing 2 artifacts of expired processes from S3… @@ -596,6 +640,8 @@ class TestCleanupCommand(FixtureTestCase): Successfully cleaned up orphaned local images. Removing orphaned Ponos logs… Successfully cleaned up orphaned Ponos logs. + Removing archived models… + Successfully cleaned up 0 archived models. Removing orphaned model versions archives… Successfully cleaned up orphaned model versions archives. Cleaning up deleted jobs from RQ user registries… @@ -667,6 +713,8 @@ class TestCleanupCommand(FixtureTestCase): self.cleanup(), dedent( """ + Removing archived workers… + Successfully cleaned up 0 archived workers. Removing orphaned Ponos artifacts… Successfully cleaned up orphaned Ponos artifacts. Removing 2 artifacts of expired processes from S3… @@ -692,6 +740,8 @@ class TestCleanupCommand(FixtureTestCase): Successfully cleaned up orphaned local images. Removing orphaned Ponos logs… Successfully cleaned up orphaned Ponos logs. + Removing archived models… + Successfully cleaned up 0 archived models. Removing orphaned model versions archives… Successfully cleaned up orphaned model versions archives. Cleaning up deleted jobs from RQ user registries… @@ -747,6 +797,8 @@ class TestCleanupCommand(FixtureTestCase): self.cleanup(), dedent( """ + Removing archived workers… + Successfully cleaned up 0 archived workers. Removing orphaned Ponos artifacts… Successfully cleaned up orphaned Ponos artifacts. Removing 0 artifacts of expired processes from S3… @@ -771,6 +823,8 @@ class TestCleanupCommand(FixtureTestCase): Successfully cleaned up orphaned local images. Removing orphaned Ponos logs… Successfully cleaned up orphaned Ponos logs. + Removing archived models… + Successfully cleaned up 0 archived models. Removing orphaned model versions archives… Successfully cleaned up orphaned model versions archives. Cleaning up deleted jobs from RQ user registries… @@ -802,11 +856,13 @@ class TestCleanupCommand(FixtureTestCase): image_no_element_old_2 = Image.objects.create(path="path/pathpathpath/img", width=12, height=12, server=img_server) image_no_element_new = Image.objects.create(path="path/pathpath/img", width=12, height=12, server=img_server) - with self.assertNumQueries(19): + with self.assertNumQueries(21): self.assertEqual( self.cleanup(), dedent( """ + Removing archived workers… + Successfully cleaned up 0 archived workers. Removing orphaned Ponos artifacts… Successfully cleaned up orphaned Ponos artifacts. Removing 0 artifacts of expired processes from S3… @@ -829,6 +885,8 @@ class TestCleanupCommand(FixtureTestCase): Successfully cleaned up orphaned local images. Removing orphaned Ponos logs… Successfully cleaned up orphaned Ponos logs. + Removing archived models… + Successfully cleaned up 0 archived models. Removing orphaned model versions archives… Successfully cleaned up orphaned model versions archives. Cleaning up deleted jobs from RQ user registries… @@ -881,6 +939,8 @@ class TestCleanupCommand(FixtureTestCase): self.cleanup(), dedent( """ + Removing archived workers… + Successfully cleaned up 0 archived workers. Removing orphaned Ponos artifacts… Successfully cleaned up orphaned Ponos artifacts. Removing 0 artifacts of expired processes from S3… @@ -906,6 +966,8 @@ class TestCleanupCommand(FixtureTestCase): Removing log bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb.log… An error occurred (500) when calling the delete_object operation: Unknown Successfully cleaned up orphaned Ponos logs. + Removing archived models… + Successfully cleaned up 0 archived models. Removing orphaned model versions archives… Successfully cleaned up orphaned model versions archives. Cleaning up deleted jobs from RQ user registries… @@ -960,6 +1022,8 @@ class TestCleanupCommand(FixtureTestCase): self.cleanup(), dedent( """ + Removing archived workers… + Successfully cleaned up 0 archived workers. Removing orphaned Ponos artifacts… Successfully cleaned up orphaned Ponos artifacts. Removing 0 artifacts of expired processes from S3… @@ -981,6 +1045,8 @@ class TestCleanupCommand(FixtureTestCase): Successfully cleaned up orphaned local images. Removing orphaned Ponos logs… Successfully cleaned up orphaned Ponos logs. + Removing archived models… + Successfully cleaned up 0 archived models. Removing orphaned model versions archives… Removing model version archive bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb.zst… Unsupported model version archive cant_touch_this.txt.vbs @@ -1038,6 +1104,8 @@ class TestCleanupCommand(FixtureTestCase): self.cleanup(), dedent( """ + Removing archived workers… + Successfully cleaned up 0 archived workers. Removing orphaned Ponos artifacts… Successfully cleaned up orphaned Ponos artifacts. Removing 0 artifacts of expired processes from S3… @@ -1063,6 +1131,8 @@ class TestCleanupCommand(FixtureTestCase): Successfully cleaned up orphaned local images. Removing orphaned Ponos logs… Successfully cleaned up orphaned Ponos logs. + Removing archived models… + Successfully cleaned up 0 archived models. Removing orphaned model versions archives… Successfully cleaned up orphaned model versions archives. Cleaning up deleted jobs from RQ user registries… @@ -1115,6 +1185,8 @@ class TestCleanupCommand(FixtureTestCase): self.cleanup(), dedent( """ + Removing archived workers… + Successfully cleaned up 0 archived workers. Removing orphaned Ponos artifacts… Successfully cleaned up orphaned Ponos artifacts. Removing 0 artifacts of expired processes from S3… @@ -1136,6 +1208,8 @@ class TestCleanupCommand(FixtureTestCase): Successfully cleaned up orphaned local images. Removing orphaned Ponos logs… Successfully cleaned up orphaned Ponos logs. + Removing archived models… + Successfully cleaned up 0 archived models. Removing orphaned model versions archives… Successfully cleaned up orphaned model versions archives. Cleaning up deleted jobs from RQ user registries… @@ -1172,3 +1246,146 @@ class TestCleanupCommand(FixtureTestCase): call("rq:registry:user:2", "job4", "job2"), )) self.assertEqual(remaining_call, call("rq:registry:user:2", "job5")) + + @override_settings(WORKER_CLEANUP_DELAY=10) + def test_cleanup_archived_workers(self, s3_mock, rq_mock): + # This worker can be cleaned up + removable_worker = Worker.objects.get(slug="dla") + self.assertFalse(removable_worker.versions.all().in_use()) + removable_worker.archived = datetime.now(timezone.utc) - timedelta(days=11) + removable_worker.save() + + # This worker cannot be cleaned up because it is used in ML results + used_worker = Worker.objects.get(slug="reco") + self.assertTrue(used_worker.versions.all().in_use()) + used_worker.archived = datetime.now(timezone.utc) - timedelta(days=11) + used_worker.save() + + non_archived = Worker.objects.create( + name="Unarchived worker", + type=used_worker.type, + slug="unarchived", + ) + recently_archived = Worker.objects.create( + name="Recently archived worker", + type=used_worker.type, + slug="pending", + archived=datetime.now(timezone.utc) - timedelta(days=9), + ) + + self.assertEqual( + self.cleanup(), + dedent( + f""" + Removing archived workers… + Removing worker Document layout analyser ({removable_worker.id}) + Skipping 1 archived workers that have worker versions or configurations used in worker results. + Successfully cleaned up 1 archived workers. + Removing orphaned Ponos artifacts… + Successfully cleaned up orphaned Ponos artifacts. + Removing 0 artifacts of expired processes from S3… + Removing logs for 0 tasks of expired processes from S3… + Updating 0 available worker versions to the Error state… + Removing 0 artifacts of expired processes… + Removing 0 tasks of expired processes… + Successfully cleaned up expired processes. + Removing 0 old corpus exports from S3… + Removing 0 old corpus exports… + Successfully cleaned up old corpus exports. + Removing orphaned corpus exports… + Successfully cleaned up orphaned corpus exports. + Deleting 0 DataFiles marked as trashed from S3 and the database… + Successfully cleaned up DataFiles marked as trashed. + Removing orphan images… + Successfully cleaned up orphan images. + Removing orphaned local images… + Successfully cleaned up orphaned local images. + Removing orphaned Ponos logs… + Successfully cleaned up orphaned Ponos logs. + Removing archived models… + Successfully cleaned up 0 archived models. + Removing orphaned model versions archives… + Successfully cleaned up orphaned model versions archives. + Cleaning up deleted jobs from RQ user registries… + Successfully cleaned up 0 deleted jobs from 0 RQ user registries. + """ + ).strip() + ) + + with self.assertRaises(Worker.DoesNotExist): + removable_worker.refresh_from_db() + used_worker.refresh_from_db() + non_archived.refresh_from_db() + recently_archived.refresh_from_db() + + @override_settings(MODEL_CLEANUP_DELAY=10) + def test_cleanup_archived_models(self, s3_mock, rq_mock): + # This model can be cleaned up + removable_model = Model.objects.create(name="Removable", archived=datetime.now(timezone.utc) - timedelta(days=11)) + + # This model cannot be cleaned up because it is used in ML results + used_model = Model.objects.create(name="Used", archived=datetime.now(timezone.utc) - timedelta(days=11)) + process = self.corpus.processes.create(mode=ProcessMode.Workers, creator=self.superuser) + worker_version = WorkerVersion.objects.first() + worker_run = process.worker_runs.create( + version=worker_version, + model_version=used_model.versions.create(), + ) + self.corpus.elements.create( + type=self.corpus.types.first(), + name="Some element", + worker_run=worker_run, + worker_version=worker_version, + ) + self.assertTrue(WorkerRun.objects.filter(model_version__model=used_model).in_use()) + + non_archived = Model.objects.create(name="Unarchived") + recently_archived = Model.objects.create( + name="Recently archived", + archived=datetime.now(timezone.utc) - timedelta(days=9), + ) + + self.assertEqual( + self.cleanup(), + dedent( + f""" + Removing archived workers… + Successfully cleaned up 0 archived workers. + Removing orphaned Ponos artifacts… + Successfully cleaned up orphaned Ponos artifacts. + Removing 0 artifacts of expired processes from S3… + Removing logs for 0 tasks of expired processes from S3… + Updating 0 available worker versions to the Error state… + Removing 0 artifacts of expired processes… + Removing 0 tasks of expired processes… + Successfully cleaned up expired processes. + Removing 0 old corpus exports from S3… + Removing 0 old corpus exports… + Successfully cleaned up old corpus exports. + Removing orphaned corpus exports… + Successfully cleaned up orphaned corpus exports. + Deleting 0 DataFiles marked as trashed from S3 and the database… + Successfully cleaned up DataFiles marked as trashed. + Removing orphan images… + Successfully cleaned up orphan images. + Removing orphaned local images… + Successfully cleaned up orphaned local images. + Removing orphaned Ponos logs… + Successfully cleaned up orphaned Ponos logs. + Removing archived models… + Removing model Removable ({removable_model.id}) + Skipping 1 archived models that have model versions used in worker results. + Successfully cleaned up 1 archived models. + Removing orphaned model versions archives… + Successfully cleaned up orphaned model versions archives. + Cleaning up deleted jobs from RQ user registries… + Successfully cleaned up 0 deleted jobs from 0 RQ user registries. + """ + ).strip() + ) + + with self.assertRaises(Model.DoesNotExist): + removable_model.refresh_from_db() + used_model.refresh_from_db() + non_archived.refresh_from_db() + recently_archived.refresh_from_db() diff --git a/arkindex/process/managers.py b/arkindex/process/managers.py index 328593060a041f8af3bd99845cdc164388c94383..a094d7b63cdca0b1d1a3d6783ed81d63a8dc6054 100644 --- a/arkindex/process/managers.py +++ b/arkindex/process/managers.py @@ -130,6 +130,32 @@ class CorpusWorkerVersionManager(Manager): ], ignore_conflicts=True) +class WorkerResultSourceQuerySet(QuerySet): + + @property + def worker_result_relations(self): + from arkindex.process.models import WorkerRun + return ( + field + for field in self.model._meta.get_fields() + if isinstance(field, ManyToOneRel) + # Ignore reverse links to processes tasks + and field.related_model is not Task + # Ignore the WorkerRun→WorkerVersion link + and field.related_model is not WorkerRun + ) + + def in_use(self): + """ + Check if any data is linked to this worker run or worker version + """ + ids = self.values_list("id", flat=True) + return any( + relation.related_model.objects.filter(**{f"{relation.field.name}__in": ids}).exists() + for relation in self.worker_result_relations + ) + + class WorkerVersionManager(Manager): @cached_property @@ -145,6 +171,9 @@ class WorkerVersionManager(Manager): .get(id=settings.IMPORTS_WORKER_VERSION) ) + def get_queryset(self): + return WorkerResultSourceQuerySet(self.model, using=self._db) + class WorkerManager(BaseACLManager): @@ -172,25 +201,7 @@ class WorkerManager(BaseACLManager): ).distinct() -class WorkerRunQuerySet(QuerySet): - def worker_results_models(self): - return [ - field.related_model - for field in self.model._meta.get_fields() - if isinstance(field, ManyToOneRel) - # Ignore reverse links to processes tasks - and field.related_model is not Task - ] - - def in_use(self): - """ - Check if any data is linked to worker runs - """ - ids = self.values_list("id", flat=True) - for field in self.worker_results_models(): - if field.objects.filter(worker_run__in=ids).exists(): - return True - return False +class WorkerRunQuerySet(WorkerResultSourceQuerySet): def set_has_results(self): """ @@ -204,9 +215,9 @@ class WorkerRunQuerySet(QuerySet): def has_results_expression(self): return reduce(operator.or_, [ Exists( - model.objects.filter(worker_run_id=OuterRef("pk")) + relation.related_model.objects.filter(worker_run_id=OuterRef("pk")) ) - for model in self.worker_results_models() + for relation in self.worker_result_relations ]) diff --git a/arkindex/project/config.py b/arkindex/project/config.py index 905a70bcdd4b8551855074aa7a7efd7d1a482c8c..a6ee86f39411821a8f4b26b4669d032bb3e20c2e 100644 --- a/arkindex/project/config.py +++ b/arkindex/project/config.py @@ -221,4 +221,8 @@ def get_settings_parser(base_dir): ingest_parser.add_option("extra_buckets", type=str, many=True, default=[]) ingest_parser.add_option("prefix_by_bucket_name", type=bool, default=True) + cleanup_parser = parser.add_subparser("cleanup", default={}) + cleanup_parser.add_option("worker_delay", type=int, default=30) + cleanup_parser.add_option("model_delay", type=int, default=30) + return parser diff --git a/arkindex/project/settings.py b/arkindex/project/settings.py index b3c25f74170d7ee09e6c2920950135926f4d544b..ee1cd960077868fdf019eff6cd76fefba3fe7ff4 100644 --- a/arkindex/project/settings.py +++ b/arkindex/project/settings.py @@ -561,6 +561,8 @@ FRONTEND_DOORBELL = conf["doorbell"] # Banner settings FRONTEND_BANNER = conf["banner"] +WORKER_CLEANUP_DELAY = conf["cleanup"]["worker_delay"] +MODEL_CLEANUP_DELAY = conf["cleanup"]["model_delay"] # Optional default group to add new users to when they complete email verification SIGNUP_DEFAULT_GROUP = conf["signup_default_group"] diff --git a/arkindex/project/tests/config_samples/defaults.yaml b/arkindex/project/tests/config_samples/defaults.yaml index d0beb4cbebd209cb16fa665acbfd33b1bdd416f1..4818fad81bd21d83baec6febe3473f75f5e0e401 100644 --- a/arkindex/project/tests/config_samples/defaults.yaml +++ b/arkindex/project/tests/config_samples/defaults.yaml @@ -7,6 +7,9 @@ cache: path: null type: null url: null +cleanup: + model_delay: 30 + worker_delay: 30 cors: origin_whitelist: - http://localhost:8080 diff --git a/arkindex/project/tests/config_samples/errors.yaml b/arkindex/project/tests/config_samples/errors.yaml index c22b5729fe5b7b4175ee462228662c2d9be1f3fe..2d7135b74342cecfbfc9a0d3ed1c203a90efed90 100644 --- a/arkindex/project/tests/config_samples/errors.yaml +++ b/arkindex/project/tests/config_samples/errors.yaml @@ -4,6 +4,9 @@ banner: style: big cache: type: redis +cleanup: + model_delay: forever + worker_delay: immediately cors: origin_whitelist: france suffixes: 1 diff --git a/arkindex/project/tests/config_samples/expected_errors.yaml b/arkindex/project/tests/config_samples/expected_errors.yaml index 0c4f96b0b279134569fb428ccec6dc2ca906880f..58738d1f2fbe16ea0351e95e73c18cda8727ee80 100644 --- a/arkindex/project/tests/config_samples/expected_errors.yaml +++ b/arkindex/project/tests/config_samples/expected_errors.yaml @@ -2,6 +2,9 @@ banner: style: "'big' is not a valid BannerStyle" cache: url: cache.url is required for a Redis cache +cleanup: + model_delay: "invalid literal for int() with base 10: 'forever'" + worker_delay: "invalid literal for int() with base 10: 'immediately'" cors: suffixes: "'int' object is not iterable" csrf: diff --git a/arkindex/project/tests/config_samples/override.yaml b/arkindex/project/tests/config_samples/override.yaml index 47d6f745d611626a34dd68f45897cd6ab899ee32..5bba5fe9ebf8e2899e2ce0867114adca02bddddc 100644 --- a/arkindex/project/tests/config_samples/override.yaml +++ b/arkindex/project/tests/config_samples/override.yaml @@ -8,6 +8,9 @@ cache: path: / type: filesystem url: http://aaa +cleanup: + model_delay: 9998 + worker_delay: 9999 cors: origin_whitelist: - localtoast:1337