Commit 64a568e1 authored by Erwan Rouchet, committed by Bastien Abadie

Cleanup archived workers and models

parent e575e16f
Merge request !2209: Cleanup archived workers and models
@@ -14,10 +14,21 @@ from rq.utils import as_text
 from arkindex.documents.models import CorpusExport, CorpusExportState, Element
 from arkindex.images.models import Image, ImageServer
 from arkindex.ponos.models import Artifact, Task
-from arkindex.process.models import DataFile, GitRef, GitRefType, Process, WorkerVersion, WorkerVersionState
+from arkindex.process.models import (
+    CorpusWorkerVersion,
+    DataFile,
+    GitRef,
+    GitRefType,
+    Process,
+    Worker,
+    WorkerActivity,
+    WorkerRun,
+    WorkerVersion,
+    WorkerVersionState,
+)
 from arkindex.project.aws import s3
 from arkindex.project.rq_overrides import Job
-from arkindex.training.models import ModelVersion
+from arkindex.training.models import Model, ModelVersion
 from redis.exceptions import ConnectionError
 # Ponos artifacts use the path: <task id>/<path>
@@ -32,6 +43,9 @@ class Command(BaseCommand):
     help = "Clean up old corpus exports, trashed DataFiles, expired processes and S3 buckets"
     def handle(self, *args, **options):
+        # Cleaning up workers could free some artifacts, so clean them before artifacts
+        self.cleanup_archived_workers()
         self.cleanup_artifacts()
         self.cleanup_expired_processes()
@@ -48,6 +62,8 @@ class Command(BaseCommand):
         self.cleanup_ponos_logs()
+        self.cleanup_archived_models()
         self.cleanup_unlinked_model_versions()
         self.cleanup_rq_user_registries()
@@ -294,6 +310,71 @@ class Command(BaseCommand):
         self.stdout.write(self.style.SUCCESS("Successfully cleaned up orphaned Ponos logs."))
+
+    def cleanup_archived_workers(self):
+        """
+        Remove Worker instances that have been archived for longer than the configured worker cleanup delay
+        and that are not being used in any worker result.
+        """
+        self.stdout.write("Removing archived workers…")
+        workers = Worker.objects.filter(archived__lte=timezone.now() - timedelta(days=settings.WORKER_CLEANUP_DELAY))
+        skipped, deleted = 0, 0
+        for worker in workers.iterator():
+            # There are foreign keys for both worker versions and worker runs on worker results.
+            # Some old results might only have a worker version ID, but when a worker run ID is set,
+            # the worker version ID is deduced from it, so we only have to check on the version.
+            if worker.versions.all().in_use():
+                skipped += 1
+                continue
+            # Skip any workers whose WorkerConfigurations are in use.
+            # This should never happen since we already filter on the WorkerVersions,
+            # but skipping this check could lead to deleting worker results we didn't want to delete.
+            if WorkerRun.objects.filter(configuration__worker=worker).in_use():
+                self.stdout.write(self.style.WARNING(
+                    f"Worker {worker.name} ({worker.id}) does not have any worker versions used by worker results, "
+                    "but some of its worker configurations are in use."
+                ))
+                continue
+            self.stdout.write(f"Removing worker {worker.name} ({worker.id})")
+            worker.delete()
+            deleted += 1
+        if skipped:
+            self.stdout.write(f"Skipping {skipped} archived workers that have worker versions or configurations used in worker results.")
+        self.stdout.write(self.style.SUCCESS(f"Successfully cleaned up {deleted} archived workers."))
+
+    def cleanup_archived_models(self):
+        """
+        Remove Model instances that have been archived for longer than the configured model cleanup delay
+        and that are not being used in any worker result.
+        """
+        self.stdout.write("Removing archived models…")
+        models = Model.objects.filter(archived__lte=timezone.now() - timedelta(days=settings.MODEL_CLEANUP_DELAY))
+        skipped, deleted = 0, 0
+        for model in models.iterator():
+            if WorkerRun.objects.filter(model_version__model=model).in_use():
+                skipped += 1
+                continue
+            self.stdout.write(f"Removing model {model.name} ({model.id})")
+            # Remove CorpusWorkerVersions and WorkerActivities first.
+            # Those normally use SET_NULL, but this can cause the unique constraints to complain
+            # if rows already exist with a model version set to None.
+            WorkerActivity.objects.filter(model_version__model=model).delete()
+            CorpusWorkerVersion.objects.filter(model_version__model=model).delete()
+            model.delete()
+            deleted += 1
+        if skipped:
+            self.stdout.write(f"Skipping {skipped} archived models that have model versions used in worker results.")
+        self.stdout.write(self.style.SUCCESS(f"Successfully cleaned up {deleted} archived models."))
+
     def cleanup_unlinked_model_versions(self):
         self.stdout.write("Removing orphaned model versions archives…")
         bucket = s3.Bucket(settings.AWS_TRAINING_BUCKET)
...
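Both new cleanups select their candidates with the same cutoff arithmetic. Here is a minimal sketch of that filter, under the assumption (matching the timedelta(days=...) calls above) that the delay settings are day counts:

# Sketch of the archival cutoff used by cleanup_archived_workers and
# cleanup_archived_models; WORKER_CLEANUP_DELAY is assumed to be in days.
from datetime import timedelta

from django.conf import settings
from django.utils import timezone

from arkindex.process.models import Worker

cutoff = timezone.now() - timedelta(days=settings.WORKER_CLEANUP_DELAY)
# A worker becomes a deletion candidate once archived on or before the
# cutoff; it is still skipped if any of its versions back worker results.
candidates = Worker.objects.filter(archived__lte=cutoff)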
(One file's diff is collapsed and not shown.)
@@ -130,6 +130,32 @@ class CorpusWorkerVersionManager(Manager):
         ], ignore_conflicts=True)
+class WorkerResultSourceQuerySet(QuerySet):
+
+    @property
+    def worker_result_relations(self):
+        from arkindex.process.models import WorkerRun
+        return (
+            field
+            for field in self.model._meta.get_fields()
+            if isinstance(field, ManyToOneRel)
+            # Ignore reverse links to process tasks
+            and field.related_model is not Task
+            # Ignore the WorkerRun→WorkerVersion link
+            and field.related_model is not WorkerRun
+        )
+
+    def in_use(self):
+        """
+        Check if any data is linked to these worker runs or worker versions
+        """
+        ids = self.values_list("id", flat=True)
+        return any(
+            relation.related_model.objects.filter(**{f"{relation.field.name}__in": ids}).exists()
+            for relation in self.worker_result_relations
+        )
 class WorkerVersionManager(Manager):
     @cached_property
@@ -145,6 +171,9 @@ class WorkerVersionManager(Manager):
             .get(id=settings.IMPORTS_WORKER_VERSION)
         )
+
+    def get_queryset(self):
+        return WorkerResultSourceQuerySet(self.model, using=self._db)
 class WorkerManager(BaseACLManager):
@@ -172,25 +201,7 @@ class WorkerManager(BaseACLManager):
         ).distinct()
-class WorkerRunQuerySet(QuerySet):
+class WorkerRunQuerySet(WorkerResultSourceQuerySet):
-    def worker_results_models(self):
-        return [
-            field.related_model
-            for field in self.model._meta.get_fields()
-            if isinstance(field, ManyToOneRel)
-            # Ignore reverse links to processes tasks
-            and field.related_model is not Task
-        ]
-
-    def in_use(self):
-        """
-        Check if any data is linked to worker runs
-        """
-        ids = self.values_list("id", flat=True)
-        for field in self.worker_results_models():
-            if field.objects.filter(worker_run__in=ids).exists():
-                return True
-        return False
     def set_has_results(self):
         """
@@ -204,9 +215,9 @@ class WorkerRunQuerySet(QuerySet):
     def has_results_expression(self):
         return reduce(operator.or_, [
             Exists(
-                model.objects.filter(worker_run_id=OuterRef("pk"))
+                relation.related_model.objects.filter(worker_run_id=OuterRef("pk"))
             )
-            for model in self.worker_results_models()
+            for relation in self.worker_result_relations
         ])
...
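Both guards in cleanup_archived_workers() above go through the shared WorkerResultSourceQuerySet.in_use(), which issues one EXISTS query per reverse foreign key from a worker-result model and short-circuits on the first hit. A hypothetical helper (worker_in_use is not part of this commit) showing how the two checks compose:

from arkindex.process.models import Worker, WorkerRun

def worker_in_use(worker: Worker) -> bool:
    # True when any worker result still references one of this worker's
    # versions, or one of its configurations through a worker run.
    return (
        worker.versions.all().in_use()
        or WorkerRun.objects.filter(configuration__worker=worker).in_use()
    )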
@@ -221,4 +221,8 @@ def get_settings_parser(base_dir):
     ingest_parser.add_option("extra_buckets", type=str, many=True, default=[])
     ingest_parser.add_option("prefix_by_bucket_name", type=bool, default=True)
+
+    cleanup_parser = parser.add_subparser("cleanup", default={})
+    cleanup_parser.add_option("worker_delay", type=int, default=30)
+    cleanup_parser.add_option("model_delay", type=int, default=30)
     return parser
@@ -561,6 +561,8 @@ FRONTEND_DOORBELL = conf["doorbell"]
 # Banner settings
 FRONTEND_BANNER = conf["banner"]
+WORKER_CLEANUP_DELAY = conf["cleanup"]["worker_delay"]
+MODEL_CLEANUP_DELAY = conf["cleanup"]["model_delay"]
 # Optional default group to add new users to when they complete email verification
 SIGNUP_DEFAULT_GROUP = conf["signup_default_group"]
...
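Since the cleanup subparser defaults both delays to 30, a deployment that omits the cleanup section entirely should still get working settings. A small sanity sketch, assuming conf is the parsed configuration mapping used in the settings module above:

# With no `cleanup` section in the YAML, the parser defaults apply
# (conf is assumed to be the mapping built by get_settings_parser).
assert conf["cleanup"]["worker_delay"] == 30  # days
assert conf["cleanup"]["model_delay"] == 30  # days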
@@ -7,6 +7,9 @@ cache:
   path: null
   type: null
   url: null
+cleanup:
+  model_delay: 30
+  worker_delay: 30
 cors:
   origin_whitelist:
   - http://localhost:8080
...
@@ -4,6 +4,9 @@ banner:
   style: big
 cache:
   type: redis
+cleanup:
+  model_delay: forever
+  worker_delay: immediately
 cors:
   origin_whitelist: france
   suffixes: 1
...
@@ -2,6 +2,9 @@ banner:
   style: "'big' is not a valid BannerStyle"
 cache:
   url: cache.url is required for a Redis cache
+cleanup:
+  model_delay: "invalid literal for int() with base 10: 'forever'"
+  worker_delay: "invalid literal for int() with base 10: 'immediately'"
 cors:
   suffixes: "'int' object is not iterable"
 csrf:
...
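The expected messages in the error sample above are the stock ValueError text raised by Python's int() conversion, which is what the type=int coercion on the new options surfaces. A quick check:

try:
    int("forever")
except ValueError as err:
    print(err)  # invalid literal for int() with base 10: 'forever'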
@@ -8,6 +8,9 @@ cache:
   path: /
   type: filesystem
   url: http://aaa
+cleanup:
+  model_delay: 9998
+  worker_delay: 9999
 cors:
   origin_whitelist:
   - localtoast:1337
...