
Cleanup archived workers and models

Merged: Erwan Rouchet requested to merge archived-cleanup into master
All threads resolved!
Files changed: 9
@@ -14,10 +14,21 @@ from rq.utils import as_text
 from arkindex.documents.models import CorpusExport, CorpusExportState, Element
 from arkindex.images.models import Image, ImageServer
 from arkindex.ponos.models import Artifact, Task
-from arkindex.process.models import DataFile, GitRef, GitRefType, Process, WorkerVersion, WorkerVersionState
+from arkindex.process.models import (
+    CorpusWorkerVersion,
+    DataFile,
+    GitRef,
+    GitRefType,
+    Process,
+    Worker,
+    WorkerActivity,
+    WorkerRun,
+    WorkerVersion,
+    WorkerVersionState,
+)
 from arkindex.project.aws import s3
 from arkindex.project.rq_overrides import Job
-from arkindex.training.models import ModelVersion
+from arkindex.training.models import Model, ModelVersion
 from redis.exceptions import ConnectionError

 # Ponos artifacts use the path: <task id>/<path>
@@ -32,6 +43,9 @@ class Command(BaseCommand):
     help = "Clean up old corpus exports, trashed DataFiles, expired processes and S3 buckets"

     def handle(self, *args, **options):
+        # Cleaning up workers could free some artifacts, so clean them before artifacts
+        self.cleanup_archived_workers()
+
         self.cleanup_artifacts()
         self.cleanup_expired_processes()
@@ -48,6 +62,8 @@ class Command(BaseCommand):
         self.cleanup_ponos_logs()

+        self.cleanup_archived_models()
+
         self.cleanup_unlinked_model_versions()
         self.cleanup_rq_user_registries()
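Pieced together from the two handle() hunks above, the method now runs its cleanups in the following order. This is a reconstruction for readability only; the commented ellipsis stands for existing steps that this excerpt does not show.

    def handle(self, *args, **options):
        # Cleaning up workers can free artifacts, so it runs before cleanup_artifacts()
        self.cleanup_archived_workers()

        self.cleanup_artifacts()
        self.cleanup_expired_processes()
        # … other existing cleanup steps, not visible in this diff …
        self.cleanup_ponos_logs()

        # Archived models are removed before orphaned model version archives are swept
        self.cleanup_archived_models()

        self.cleanup_unlinked_model_versions()
        self.cleanup_rq_user_registries()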
@@ -294,6 +310,71 @@ class Command(BaseCommand):
         self.stdout.write(self.style.SUCCESS("Successfully cleaned up orphaned Ponos logs."))

+    def cleanup_archived_workers(self):
+        """
+        Remove Worker instances that have been archived for longer than the configured
+        worker cleanup delay and that are not used in any worker result.
+        """
+        self.stdout.write("Removing archived workers…")
+
+        workers = Worker.objects.filter(archived__lte=timezone.now() - timedelta(days=settings.WORKER_CLEANUP_DELAY))
+        skipped, deleted = 0, 0
+        for worker in workers.iterator():
+            # Worker results have foreign keys to both worker versions and worker runs.
+            # Some old results might only have a worker version ID, but when a worker run ID
+            # is set, the worker version ID is deduced from it, so checking versions is enough.
+            if worker.versions.all().in_use():
+                skipped += 1
+                continue
+
+            # Skip any worker whose WorkerConfigurations are in use.
+            # This should never happen since we already filter on the WorkerVersions,
+            # but if it did, deleting the worker could delete worker results we meant to keep.
+            if WorkerRun.objects.filter(configuration__worker=worker).in_use():
+                self.stdout.write(self.style.WARNING(
+                    f"Worker {worker.name} ({worker.id}) does not have any worker versions used by worker results, "
+                    "but some of its worker configurations are in use."
+                ))
+                continue
+
+            self.stdout.write(f"Removing worker {worker.name} ({worker.id})")
+            worker.delete()
+            deleted += 1
+
+        if skipped:
+            self.stdout.write(f"Skipped {skipped} archived workers whose worker versions or configurations are used in worker results.")
+        self.stdout.write(self.style.SUCCESS(f"Successfully cleaned up {deleted} archived workers."))
+
+    def cleanup_archived_models(self):
+        """
+        Remove Model instances that have been archived for longer than the configured
+        model cleanup delay and that are not used in any worker result.
+        """
+        self.stdout.write("Removing archived models…")
+
+        models = Model.objects.filter(archived__lte=timezone.now() - timedelta(days=settings.MODEL_CLEANUP_DELAY))
+        skipped, deleted = 0, 0
+        for model in models.iterator():
+            if WorkerRun.objects.filter(model_version__model=model).in_use():
+                skipped += 1
+                continue
+
+            self.stdout.write(f"Removing model {model.name} ({model.id})")
+
+            # Remove CorpusWorkerVersions and WorkerActivities first.
+            # Those normally use SET_NULL, but that can make the unique constraints complain
+            # if there already are rows with a model version set to None.
+            WorkerActivity.objects.filter(model_version__model=model).delete()
+            CorpusWorkerVersion.objects.filter(model_version__model=model).delete()
+
+            model.delete()
+            deleted += 1
+
+        if skipped:
+            self.stdout.write(f"Skipped {skipped} archived models whose model versions are used in worker results.")
+        self.stdout.write(self.style.SUCCESS(f"Successfully cleaned up {deleted} archived models."))
+
     def cleanup_unlinked_model_versions(self):
         self.stdout.write("Removing orphaned model versions archives…")
         bucket = s3.Bucket(settings.AWS_TRAINING_BUCKET)
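Both guard clauses rely on an in_use() queryset method that this diff calls but never defines. As a rough mental model only, here is a minimal sketch assuming a custom QuerySet on WorkerVersion and an Element.worker_version foreign key; the real Arkindex helper may check several result tables:

    from django.db import models

    class WorkerVersionQuerySet(models.QuerySet):
        def in_use(self):
            # A version counts as "in use" when at least one worker result still
            # references it; Element stands in here for whichever result models
            # carry a worker_version foreign key (an assumption of this sketch).
            from arkindex.documents.models import Element
            return Element.objects.filter(worker_version__in=self).exists()

The WorkerRun.objects.filter(...).in_use() calls imply an equivalent method on WorkerRun querysets that checks worker_run foreign keys instead.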
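The deletion order in cleanup_archived_models() deserves a closer look. Here is a minimal illustration of the collision its comment warns about, assuming a uniqueness rule over (corpus, worker_version, model_version) in which two NULL model versions count as equal; the actual constraint definition is not part of this diff:

    # Two hypothetical rows sharing a corpus and worker version:
    #   (corpus=A, worker_version=V, model_version=M1)
    #   (corpus=A, worker_version=V, model_version=None)  # nulled by an earlier cleanup
    #
    # Deleting M1's model with on_delete=SET_NULL would rewrite the first row into
    # an exact duplicate of the second, so the UPDATE would trip the unique
    # constraint. Deleting the referencing rows beforehand avoids the collision:
    WorkerActivity.objects.filter(model_version__model=model).delete()
    CorpusWorkerVersion.objects.filter(model_version__model=model).delete()
    model.delete()  # no SET_NULL fan-out left to conflict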