
Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Target project: arkindex/backend
Commits on Source (3)
Showing 1223 additions and 797 deletions
......@@ -51,7 +51,13 @@ def run_pg_query(query, source_db):
Run a single Postgresql query and split the results into chunks.
When a name is given to a cursor, psycopg2 uses a server-side cursor; we just use a random string as a name.
"""
with connections[source_db].create_cursor(name=str(uuid.uuid4())) as pg_cursor:
db = connections[source_db]
# Make sure a connection is open and available for export databases
if source_db != "default" and db.connection is None:
db.connect()
with db.create_cursor(name=str(uuid.uuid4())) as pg_cursor:
pg_cursor.itersize = BATCH_SIZE
pg_cursor.execute(query)
......
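The added `db.connect()` call matters because Django only opens connections lazily, so an export database alias can still have `db.connection is None` when the named (server-side) cursor is requested. Below is a minimal sketch of the same psycopg2 pattern outside Django, assuming a placeholder DSN and query rather than actual Arkindex objects:

import uuid

import psycopg2

BATCH_SIZE = 10000

# Placeholder DSN; any reachable PostgreSQL database works for the sketch
conn = psycopg2.connect("dbname=export_db")

# Passing a name makes psycopg2 declare a server-side cursor, so results are
# streamed from PostgreSQL in batches instead of being loaded all at once
with conn.cursor(name=str(uuid.uuid4())) as cursor:
    cursor.itersize = BATCH_SIZE
    cursor.execute("SELECT id FROM export_element")  # placeholder query
    for row in cursor:
        pass  # rows arrive from the server at most BATCH_SIZE at a time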
This diff is collapsed.
......@@ -23,7 +23,7 @@ from arkindex.documents.models import (
TranscriptionEntity,
)
from arkindex.ponos.models import Task
from arkindex.process.models import Process, ProcessDataset, ProcessElement, WorkerActivity, WorkerRun
from arkindex.process.models import Process, ProcessDatasetSet, ProcessElement, WorkerActivity, WorkerRun
from arkindex.training.models import DatasetElement, DatasetSet
from arkindex.users.models import User
......@@ -70,9 +70,9 @@ def corpus_delete(corpus_id: str) -> None:
Selection.objects.filter(element__corpus_id=corpus_id),
corpus.memberships.all(),
corpus.exports.all(),
# ProcessDataset M2M
ProcessDataset.objects.filter(dataset__corpus_id=corpus_id),
ProcessDataset.objects.filter(process__corpus_id=corpus_id),
# ProcessDatasetSet M2M
ProcessDatasetSet.objects.filter(set__dataset__corpus_id=corpus_id),
ProcessDatasetSet.objects.filter(process__corpus_id=corpus_id),
DatasetElement.objects.filter(set__dataset__corpus_id=corpus_id),
DatasetSet.objects.filter(dataset__corpus_id=corpus_id),
corpus.datasets.all(),
......
......@@ -3,7 +3,14 @@ from django.db.models.signals import pre_delete
from arkindex.documents.models import Corpus, Element, EntityType, MetaType, Transcription
from arkindex.documents.tasks import corpus_delete
from arkindex.ponos.models import Farm, State, Task
from arkindex.process.models import CorpusWorkerVersion, Process, ProcessDataset, ProcessMode, Repository, WorkerVersion
from arkindex.process.models import (
CorpusWorkerVersion,
Process,
ProcessDatasetSet,
ProcessMode,
Repository,
WorkerVersion,
)
from arkindex.project.tests import FixtureTestCase, force_constraints_immediate
from arkindex.training.models import Dataset, DatasetSet
......@@ -123,16 +130,16 @@ class TestDeleteCorpus(FixtureTestCase):
name=set_name
) for set_name in ["test", "training", "validation"]
)
# Process on cls.corpus and with a dataset from cls.corpus
# Process on cls.corpus and with a set from cls.corpus
dataset_process1 = cls.corpus.processes.create(creator=cls.user, mode=ProcessMode.Dataset)
ProcessDataset.objects.create(process=dataset_process1, dataset=dataset1, sets=list(dataset1.sets.values_list("name", flat=True)))
# Process on cls.corpus with a dataset from another corpus
ProcessDatasetSet.objects.create(process=dataset_process1, set=test_set_1)
# Process on cls.corpus with a set from another corpus
dataset_process2 = cls.corpus.processes.create(creator=cls.user, mode=ProcessMode.Dataset)
ProcessDataset.objects.create(process=dataset_process2, dataset=dataset1, sets=list(dataset1.sets.values_list("name", flat=True)))
ProcessDataset.objects.create(process=dataset_process2, dataset=cls.dataset2, sets=list(cls.dataset2.sets.values_list("name", flat=True)))
# Process on another corpus with a dataset from another corpus and none from cls.corpus
ProcessDatasetSet.objects.create(process=dataset_process2, set=test_set_1)
ProcessDatasetSet.objects.create(process=dataset_process2, set=cls.dataset2.sets.get(name="training"))
# Process on another corpus with a set from another corpus and none from cls.corpus
cls.dataset_process3 = cls.corpus2.processes.create(creator=cls.user, mode=ProcessMode.Dataset)
ProcessDataset.objects.create(process=cls.dataset_process3, dataset=cls.dataset2, sets=list(cls.dataset2.sets.values_list("name", flat=True)))
ProcessDatasetSet.objects.create(process=cls.dataset_process3, set=cls.dataset2.sets.get(name="validation"))
cls.rev = cls.repo.revisions.create(
hash="42",
......
......@@ -155,7 +155,6 @@ class TestRetrieveElements(FixtureAPITestCase):
process = Process.objects.create(
mode=ProcessMode.Workers,
creator=self.user,
generate_thumbnails=True,
farm=Farm.objects.first(),
corpus=self.corpus
)
......
......@@ -46,7 +46,6 @@ from rest_framework.generics import (
RetrieveDestroyAPIView,
RetrieveUpdateAPIView,
RetrieveUpdateDestroyAPIView,
UpdateAPIView,
)
from rest_framework.response import Response
from rest_framework.serializers import Serializer
......@@ -61,7 +60,7 @@ from arkindex.process.models import (
GitRef,
GitRefType,
Process,
ProcessDataset,
ProcessDatasetSet,
ProcessMode,
Revision,
Worker,
......@@ -87,7 +86,7 @@ from arkindex.process.serializers.imports import (
StartProcessSerializer,
)
from arkindex.process.serializers.ingest import BucketSerializer, S3ImportSerializer
from arkindex.process.serializers.training import ProcessDatasetSerializer
from arkindex.process.serializers.training import ProcessDatasetSetSerializer
from arkindex.process.serializers.worker_runs import (
CorpusWorkerRunSerializer,
UserWorkerRunSerializer,
......@@ -126,7 +125,7 @@ from arkindex.project.pagination import CountCursorPagination
from arkindex.project.permissions import IsVerified, IsVerifiedOrReadOnly
from arkindex.project.tools import PercentileCont
from arkindex.project.triggers import process_delete
from arkindex.training.models import Model
from arkindex.training.models import DatasetSet, Model
from arkindex.users.models import Role, Scope
logger = logging.getLogger(__name__)
......@@ -565,7 +564,7 @@ class StartProcess(CorpusACLMixin, CreateAPIView):
"model_version__model",
"configuration",
)))
.prefetch_related("datasets")
.prefetch_related(Prefetch("sets", queryset=DatasetSet.objects.select_related("dataset")))
# Uses Exists() for has_tasks and not a __isnull because we are not joining on tasks and do not need to fetch them
.annotate(has_tasks=Exists(Task.objects.filter(process=OuterRef("pk"))))
)
......@@ -677,20 +676,20 @@ class DataFileCreate(CreateAPIView):
@extend_schema(tags=["process"])
@extend_schema_view(
get=extend_schema(
operation_id="ListProcessDatasets",
operation_id="ListProcessSets",
description=dedent(
"""
List all datasets on a process.
List all dataset sets on a process.
Requires a **guest** access to the process.
"""
),
),
)
class ProcessDatasets(ProcessACLMixin, ListAPIView):
class ProcessDatasetSets(ProcessACLMixin, ListAPIView):
permission_classes = (IsVerified, )
serializer_class = ProcessDatasetSerializer
queryset = ProcessDataset.objects.none()
serializer_class = ProcessDatasetSetSerializer
queryset = ProcessDatasetSet.objects.none()
@cached_property
def process(self):
......@@ -704,10 +703,10 @@ class ProcessDatasets(ProcessACLMixin, ListAPIView):
def get_queryset(self):
return (
ProcessDataset.objects.filter(process_id=self.process.id)
.select_related("process__creator", "dataset__creator")
.prefetch_related("dataset__sets")
.order_by("dataset__name")
ProcessDatasetSet.objects.filter(process_id=self.process.id)
.select_related("process__creator", "set__dataset__creator")
.prefetch_related(Prefetch("set__dataset__sets", queryset=DatasetSet.objects.order_by("name")))
.order_by("set__dataset__name", "set__name")
)
def get_serializer_context(self):
......@@ -722,51 +721,52 @@ class ProcessDatasets(ProcessACLMixin, ListAPIView):
@extend_schema(tags=["process"])
@extend_schema_view(
post=extend_schema(
operation_id="CreateProcessDataset",
operation_id="CreateProcessSet",
description=dedent(
"""
Add a dataset to a process.
Add a dataset set to a process.
Requires an **admin** access to the process and a **guest** access to the dataset's corpus.
"""
),
),
delete=extend_schema(
operation_id="DestroyProcessDataset",
operation_id="DestroyProcessSet",
description=dedent(
"""
Remove a dataset from a process.
Remove a dataset set from a process.
Requires an **admin** access to the process.
"""
),
),
)
class ProcessDatasetManage(CreateAPIView, UpdateAPIView, DestroyAPIView):
class ProcessDatasetSetManage(CreateAPIView, DestroyAPIView):
permission_classes = (IsVerified, )
serializer_class = ProcessDatasetSerializer
serializer_class = ProcessDatasetSetSerializer
def get_object(self):
process_dataset = get_object_or_404(
ProcessDataset.objects
.select_related("dataset__creator", "process__corpus")
.prefetch_related("dataset__sets")
qs = (
ProcessDatasetSet.objects
.select_related("set__dataset__creator", "process__corpus")
# Required to check whether the process has already started
.annotate(process_has_tasks=Exists(Task.objects.filter(process_id=self.kwargs["process"]))),
dataset_id=self.kwargs["dataset"], process_id=self.kwargs["process"]
.annotate(process_has_tasks=Exists(Task.objects.filter(process_id=self.kwargs["process"])))
)
# Only prefetch the dataset sets when creating
if self.request.method != "DELETE":
qs = qs.prefetch_related(Prefetch("set__dataset__sets", queryset=DatasetSet.objects.order_by("name")))
process_set = get_object_or_404(
qs,
set_id=self.kwargs["set"], process_id=self.kwargs["process"]
)
# Copy the has_tasks annotation onto the process
process_dataset.process.has_tasks = process_dataset.process_has_tasks
return process_dataset
process_set.process.has_tasks = process_set.process_has_tasks
return process_set
def destroy(self, request, *args, **kwargs):
serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True)
# Ignore the sets when retrieving the ProcessDataset instance, as there cannot be
# two ProcessDatasets with the same dataset and process, whatever the sets
validated_data = serializer.validated_data
del validated_data["sets"]
get_object_or_404(ProcessDataset, **validated_data).delete()
get_object_or_404(ProcessDatasetSet, **serializer.validated_data).delete()
return Response(status=status.HTTP_204_NO_CONTENT)
......
......@@ -62,24 +62,6 @@ class ProcessBuilder(object):
for parent_slug in parent_slugs
) + 1
def _add_thumbnails(self, import_task_slug: str) -> None:
"""
Automatically adds a thumbnail generation task at the end of each distributed chunk
If a specific parent slug is provided, attach the thumbnails generation task to it.
"""
if not self.process.generate_thumbnails:
return
chunks = self._get_elements_json_chunks()
for index, chunk in enumerate(chunks, start=1):
slug = "thumbnails"
if len(chunks) > 1:
slug += f"_{index}"
elements_path = shlex.quote(path.join("/data", import_task_slug, chunk))
command = f"python3 -m arkindex_tasks.generate_thumbnails {elements_path}"
self._build_task(command=command, slug=slug, env=self.base_env)
self.tasks_parents[slug].append(import_task_slug)
def _build_task(
self,
command,
......@@ -204,8 +186,6 @@ class ProcessBuilder(object):
def validate_dataset(self) -> None:
self.validate_gpu_requirement()
self.validate_archived()
if self.process.generate_thumbnails:
raise ValidationError("Thumbnails generation is incompatible with dataset mode processes.")
def validate(self) -> None:
"""
......@@ -275,8 +255,6 @@ class ProcessBuilder(object):
slug=import_task_slug,
env=self.base_env,
)
# Add thumbnails
self._add_thumbnails(import_task_slug=import_task_slug)
# Distribute worker run tasks
worker_runs = list(self.process.worker_runs.all())
......
# Generated by Django 4.1.7 on 2024-03-21 12:00
import uuid
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("training", "0007_datasetset_model"),
("process", "0031_process_corpus_check_and_remove_revision_field"),
]
operations = [
migrations.CreateModel(
name="ProcessDatasetSet",
fields=[
("id", models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
("process", models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name="process_sets", to="process.process")),
("set", models.ForeignKey(on_delete=django.db.models.deletion.DO_NOTHING, related_name="process_sets", to="training.datasetset")),
],
),
migrations.AddConstraint(
model_name="processdatasetset",
constraint=models.UniqueConstraint(fields=("process", "set"), name="unique_process_set"),
),
migrations.RunSQL(
"""
INSERT INTO process_processdatasetset (id, process_id, set_id)
SELECT gen_random_uuid(), p.process_id, dss.id
FROM (
SELECT DISTINCT process_id, dataset_id, unnest(sets) AS set
FROM process_processdataset
) p
INNER JOIN training_datasetset AS dss ON (p.dataset_id = dss.dataset_id AND p.set = dss.name)
""",
),
migrations.RemoveField(
model_name="process",
name="datasets",
),
migrations.AddField(
model_name="process",
name="sets",
field=models.ManyToManyField(related_name="processes", through="process.ProcessDatasetSet", to="training.datasetset"),
),
migrations.DeleteModel(
name="ProcessDataset",
),
]
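The RunSQL backfill expands each old ProcessDataset row (one dataset plus an array of set names) into one ProcessDatasetSet row per set. A rough ORM equivalent, for illustration only since the migration keeps the raw SQL, would be a data-migration function like the one below (wired in with migrations.RunPython):

def backfill_process_dataset_sets(apps, schema_editor):
    # Historical models, as in any data migration
    ProcessDataset = apps.get_model("process", "ProcessDataset")
    DatasetSet = apps.get_model("training", "DatasetSet")
    ProcessDatasetSet = apps.get_model("process", "ProcessDatasetSet")

    for process_dataset in ProcessDataset.objects.all().iterator():
        # Each name in the old `sets` array becomes a link to the matching
        # DatasetSet of the same dataset
        for set_name in set(process_dataset.sets):
            dataset_set = DatasetSet.objects.filter(
                dataset_id=process_dataset.dataset_id, name=set_name
            ).first()
            if dataset_set is None:
                # The SQL INNER JOIN silently skips unknown set names too
                continue
            ProcessDatasetSet.objects.get_or_create(
                process_id=process_dataset.process_id, set=dataset_set
            )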
# Generated by Django 4.1.7 on 2024-03-27 16:38
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
("process", "0032_processdatasetset_model"),
]
operations = [
migrations.RemoveField(
model_name="process",
name="generate_thumbnails",
),
]
......@@ -76,7 +76,7 @@ class Process(IndexableModel):
corpus = models.ForeignKey("documents.Corpus", on_delete=models.CASCADE, related_name="processes", null=True)
mode = EnumField(ProcessMode, max_length=30)
files = models.ManyToManyField("process.DataFile", related_name="processes")
datasets = models.ManyToManyField("training.Dataset", related_name="processes", through="process.ProcessDataset")
sets = models.ManyToManyField("training.DatasetSet", related_name="processes", through="process.ProcessDatasetSet")
versions = models.ManyToManyField("process.WorkerVersion", through="process.WorkerRun", related_name="processes")
activity_state = EnumField(ActivityState, max_length=32, default=ActivityState.Disabled)
......@@ -135,9 +135,6 @@ class Process(IndexableModel):
# Used to include sub-elements of the actual query in Workers processes
load_children = models.BooleanField(default=False)
# Whether or not to add a thumbnail generation task
generate_thumbnails = models.BooleanField(default=False)
chunks = models.PositiveIntegerField(
default=1,
validators=[
......@@ -465,26 +462,19 @@ class ProcessElement(models.Model):
)
class ProcessDataset(models.Model):
class ProcessDatasetSet(models.Model):
"""
Link between Processes and Datasets.
Link between Processes and Dataset Sets.
"""
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
process = models.ForeignKey(Process, on_delete=models.CASCADE, related_name="process_datasets")
dataset = models.ForeignKey("training.Dataset", on_delete=models.CASCADE, related_name="process_datasets")
sets = ArrayField(
models.CharField(max_length=50, validators=[MinLengthValidator(1)]),
validators=[
MinLengthValidator(1),
validate_unique_set_names,
]
)
process = models.ForeignKey(Process, on_delete=models.CASCADE, related_name="process_sets")
set = models.ForeignKey("training.DatasetSet", on_delete=models.DO_NOTHING, related_name="process_sets")
class Meta:
constraints = [
models.UniqueConstraint(
fields=["process", "dataset"],
name="unique_process_dataset",
fields=["process", "set"],
name="unique_process_set",
)
]
......
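With the many-to-many relation now going through ProcessDatasetSet, links are created on the intermediate model and traversal reaches the dataset via the set. A short usage sketch, assuming existing process and dataset_set instances:

from arkindex.process.models import ProcessDatasetSet

# One row per (process, set); the unique_process_set constraint rejects duplicates
ProcessDatasetSet.objects.create(process=process, set=dataset_set)

# process.sets now yields DatasetSet instances, each carrying its dataset
for dataset_set in process.sets.select_related("dataset"):
    print(dataset_set.dataset.name, dataset_set.name)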
......@@ -258,9 +258,6 @@ class FilesProcessSerializer(serializers.ModelSerializer):
# Automatically set the creator on the process
creator = serializers.HiddenField(default=serializers.CurrentUserDefault())
# Always start the process with thumbnail generation enabled
generate_thumbnails = serializers.HiddenField(default=True)
default_error_messages = {
"mode_not_allowed": "This mode is not allowed when importing from files",
"files_required": "At least one file is required to start an import process",
......@@ -287,7 +284,6 @@ class FilesProcessSerializer(serializers.ModelSerializer):
"element_type",
"farm_id",
"creator",
"generate_thumbnails",
)
def validate_mode(self, mode):
......@@ -389,7 +385,6 @@ class StartProcessSerializer(serializers.Serializer):
validators=[MaxValueValidator(lambda: settings.MAX_CHUNKS)],
default=1,
)
thumbnails = serializers.BooleanField(default=False, source="generate_thumbnails")
farm = serializers.PrimaryKeyRelatedField(queryset=Farm.objects.all(), default=None, allow_null=True)
use_cache = serializers.BooleanField(default=False)
use_gpu = serializers.BooleanField(default=False, allow_null=True)
......@@ -415,14 +410,12 @@ class StartProcessSerializer(serializers.Serializer):
errors = defaultdict(list)
if self.instance.mode == ProcessMode.Dataset:
# Only call .count() and .all() as they will access the prefetched datasets and not cause any extra query
if not self.instance.datasets.count():
errors["non_field_errors"].append("A dataset process cannot be started if it does not have any associated datasets.")
elif not any(dataset.corpus_id == self.instance.corpus.id for dataset in self.instance.datasets.all()):
errors["non_field_errors"].append("At least one of the process datasets must be from the same corpus as the process.")
if validated_data.get("generate_thumbnails"):
errors["thumbnails"].append("Thumbnails generation is not supported on Dataset processes.")
# Only call .count() and .all() as they will access the prefetched dataset sets and not cause any extra query
if not self.instance.sets.count():
errors["non_field_errors"].append("A dataset process cannot be started if it does not have any associated dataset sets.")
elif not any(ds.dataset.corpus_id == self.instance.corpus.id for ds in self.instance.sets.all()):
errors["non_field_errors"].append("At least one of the process sets must be from the same corpus as the process.")
if validated_data.get("worker_activity"):
errors["worker_activity"].append("Worker activities are not supported on Dataset processes.")
if validated_data.get("use_cache"):
......@@ -481,14 +474,7 @@ class StartProcessSerializer(serializers.Serializer):
)
else:
if validated_data.get("worker_activity"):
errors["worker_activity"].append("The process must have workers attached to handle their activity.")
if validated_data.get("use_cache"):
errors["use_cache"].append("The process must have workers attached to use cached results.")
if validated_data.get("use_gpu"):
errors["use_gpu"].append("The process must have workers attached to use GPUs.")
if not validated_data.get("generate_thumbnails"):
errors["__all__"].append("The process must either use thumbnail generation or have worker runs.")
errors["__all__"].append("The process must have worker runs to be started.")
if errors:
raise ValidationError(errors)
......
......@@ -5,47 +5,41 @@ from rest_framework import serializers
from rest_framework.exceptions import PermissionDenied, ValidationError
from arkindex.documents.models import Corpus
from arkindex.process.models import Process, ProcessDataset, ProcessMode, Task
from arkindex.process.models import Process, ProcessDatasetSet, ProcessMode, Task
from arkindex.project.mixins import ProcessACLMixin
from arkindex.training.models import Dataset
from arkindex.training.models import DatasetSet
from arkindex.training.serializers import DatasetSerializer
from arkindex.users.models import Role
def _dataset_id_from_context(serializer_field):
return serializer_field.context.get("view").kwargs["dataset"]
def _set_id_from_context(serializer_field):
return serializer_field.context.get("view").kwargs["set"]
def _process_id_from_context(serializer_field):
return serializer_field.context.get("view").kwargs["process"]
_dataset_id_from_context.requires_context = True
_set_id_from_context.requires_context = True
_process_id_from_context.requires_context = True
class ProcessDatasetSerializer(ProcessACLMixin, serializers.ModelSerializer):
class ProcessDatasetSetSerializer(ProcessACLMixin, serializers.ModelSerializer):
process_id = serializers.HiddenField(
write_only=True,
default=_process_id_from_context
)
dataset_id = serializers.HiddenField(
set_id = serializers.HiddenField(
write_only=True,
default=_dataset_id_from_context
)
dataset = DatasetSerializer(read_only=True)
sets = serializers.ListField(
child=serializers.CharField(max_length=50),
required=False,
allow_null=False,
allow_empty=False,
min_length=1
default=_set_id_from_context
)
dataset = DatasetSerializer(read_only=True, source="set.dataset")
set_name = serializers.CharField(read_only=True, source="set.name")
class Meta:
model = ProcessDataset
fields = ("dataset_id", "dataset", "process_id", "id", "sets", )
read_only_fields = ("process_id", "id", )
model = ProcessDatasetSet
fields = ("process_id", "set_id", "id", "dataset", "set_name", )
read_only_fields = ("process_id", "id", "dataset", "set_name", )
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
......@@ -81,44 +75,34 @@ class ProcessDatasetSerializer(ProcessACLMixin, serializers.ModelSerializer):
if not access or not (access >= Role.Admin.value):
raise PermissionDenied(detail="You do not have admin access to this process.")
# Validate dataset
# Validate dataset set
if not self.instance:
if request_method == "DELETE":
# Allow deleting ProcessDatasets even if the user looses access to the corpus
dataset_qs = Dataset.objects.all()
# Allow deleting ProcessDatasetSets even if the user loses access to the corpus
set_qs = DatasetSet.objects.all()
else:
dataset_qs = Dataset.objects.filter(corpus__in=Corpus.objects.readable(self._user))
set_qs = (
DatasetSet.objects.filter(dataset__corpus__in=Corpus.objects.readable(self._user))
.select_related("dataset__creator").prefetch_related("dataset__sets")
)
try:
dataset = dataset_qs.select_related("creator").prefetch_related("sets").get(pk=data["dataset_id"])
except Dataset.DoesNotExist:
raise ValidationError({"dataset": [f'Invalid pk "{str(data["dataset_id"])}" - object does not exist.']})
dataset_set = set_qs.get(pk=data["set_id"])
except DatasetSet.DoesNotExist:
raise ValidationError({"set": [f'Invalid pk "{str(data["set_id"])}" - object does not exist.']})
else:
dataset = self.instance.dataset
data["dataset"] = dataset
dataset_set = self.instance.set
if process.mode != ProcessMode.Dataset:
errors["process"].append('Datasets can only be added to or removed from processes of mode "dataset".')
errors["process"].append('Dataset sets can only be added to or removed from processes of mode "dataset".')
if process.has_tasks:
errors["process"].append("Datasets cannot be updated on processes that have already started.")
if self.context["request"].method == "POST" and process.datasets.filter(id=dataset.id).exists():
errors["dataset"].append("This dataset is already selected in this process.")
errors["process"].append("Dataset sets cannot be updated on processes that have already started.")
# Validate sets
sets = data.get("sets")
if not sets or len(sets) == 0:
if not self.instance:
data["sets"] = [item.name for item in list(dataset.sets.all())]
else:
errors["sets"].append("This field cannot be empty.")
else:
if any(s not in [item.name for item in list(dataset.sets.all())] for s in sets):
errors["sets"].append("The specified sets must all exist in the specified dataset.")
if len(set(sets)) != len(sets):
errors["sets"].append("Sets must be unique.")
if self.context["request"].method == "POST" and process.sets.filter(id=dataset_set.id).exists():
errors["set"].append("This dataset set is already selected in this process.")
if errors:
raise ValidationError(errors)
data["set"] = dataset_set
return data
......@@ -13,7 +13,7 @@ from arkindex.process.models import (
ActivityState,
FeatureUsage,
Process,
ProcessDataset,
ProcessDatasetSet,
ProcessMode,
Repository,
WorkerActivity,
......@@ -117,10 +117,14 @@ class TestCreateProcess(FixtureAPITestCase):
})
# The process needs to be started to produce a workflow
process.worker_runs.create(
version=self.version_1
)
response = self.client.post(
reverse("api:process-start", kwargs={"pk": str(process.id)}),
# The process needs a worker run or thumbnails generation to start
{"thumbnails": "true"}
{}
)
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
process.refresh_from_db()
......@@ -487,10 +491,13 @@ class TestCreateProcess(FixtureAPITestCase):
self.assertEqual(process.elements.get(), page)
# The process needs to be started to produce a workflow
process.worker_runs.create(
version=self.version_1
)
response = self.client.post(
reverse("api:process-start", kwargs={"pk": str(process.id)}),
# The process needs a worker run or thumbnails generation to start
{"thumbnails": "true"}
{}
)
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
process.refresh_from_db()
......@@ -533,46 +540,6 @@ class TestCreateProcess(FixtureAPITestCase):
process = Process.objects.get(id=data["id"])
self.assertEqual(process.name, "blah")
def test_thumbnails_generation_only(self):
"""
Generating thumbnails without any worker must generate a process task
and a thumbnails task, when the process is started
"""
self.client.force_login(self.user)
corpus_type = self.corpus.types.first()
response = self.client.post(
reverse("api:corpus-process"),
{
"corpus": str(self.corpus.id),
"element_type": corpus_type.slug,
},
format="json"
)
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
data = response.json()
process = Process.objects.get(id=data["id"])
self.assertFalse(process.tasks.exists())
response = self.client.post(
reverse("api:process-start", kwargs={"pk": str(process.id)}),
{
"chunks": 3,
"thumbnails": True,
}
)
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
process.refresh_from_db()
self.assertEqual(process.tasks.count(), 4)
init_task = process.tasks.get(slug="initialisation")
self.assertEqual(init_task.command, f"python -m arkindex_tasks.init_elements {process.id} --chunks-number 3")
for i in range(1, 4):
self.assertEqual(
process.tasks.get(slug=f"thumbnails_{i}").command,
f"python3 -m arkindex_tasks.generate_thumbnails /data/initialisation/elements_chunk_{i}.json"
)
@override_settings(
ARKINDEX_TASKS_IMAGE="registry.teklia.com/tasks",
PONOS_DEFAULT_ENV={}
......@@ -899,8 +866,8 @@ class TestCreateProcess(FixtureAPITestCase):
self.client.force_login(self.user)
process = self.corpus.processes.create(creator=self.user, mode=ProcessMode.Dataset)
dataset = self.corpus.datasets.first()
test_sets = list(dataset.sets.values_list("name", flat=True))
ProcessDataset.objects.create(process=process, dataset=dataset, sets=test_sets)
test_set = dataset.sets.get(name="test")
ProcessDatasetSet.objects.create(process=process, set=test_set)
process.versions.set([self.version_2, self.version_3])
with self.assertNumQueries(9):
......@@ -930,8 +897,8 @@ class TestCreateProcess(FixtureAPITestCase):
self.worker_1.save()
process = self.corpus.processes.create(creator=self.user, mode=ProcessMode.Dataset)
dataset = self.corpus.datasets.first()
test_sets = list(dataset.sets.values_list("name", flat=True))
ProcessDataset.objects.create(process=process, dataset=dataset, sets=test_sets)
test_set = dataset.sets.get(name="test")
ProcessDatasetSet.objects.create(process=process, set=test_set)
process.versions.add(self.version_1)
with self.assertNumQueries(9):
......
......@@ -16,7 +16,7 @@ from arkindex.process.models import (
ActivityState,
DataFile,
Process,
ProcessDataset,
ProcessDatasetSet,
ProcessMode,
WorkerActivity,
WorkerActivityState,
......@@ -43,6 +43,7 @@ class TestProcesses(FixtureAPITestCase):
description="Human instrumentality manual",
creator=cls.user
)
cls.private_dataset.sets.create(name="test")
cls.img_df = cls.corpus.files.create(
name="test.jpg",
size=42,
......@@ -2167,7 +2168,7 @@ class TestProcesses(FixtureAPITestCase):
self.assertEqual(
response.json(),
{"__all__": ["The process must either use thumbnail generation or have worker runs."]},
{"__all__": ["The process must have worker runs to be started."]},
)
def test_start_process_unavailable_worker_version(self):
......@@ -2319,12 +2320,13 @@ class TestProcesses(FixtureAPITestCase):
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertDictEqual(response.json(), {
"non_field_errors": ["A dataset process cannot be started if it does not have any associated datasets."]
"non_field_errors": ["A dataset process cannot be started if it does not have any associated dataset sets."]
})
def test_start_process_dataset_requires_dataset_in_same_corpus(self):
process2 = self.corpus.processes.create(creator=self.user, mode=ProcessMode.Dataset)
ProcessDataset.objects.create(process=process2, dataset=self.private_dataset, sets=list(self.private_dataset.sets.values_list("name", flat=True)))
test_set = self.private_dataset.sets.get(name="test")
ProcessDatasetSet.objects.create(process=process2, set=test_set)
process2.worker_runs.create(version=self.recognizer, parents=[], configuration=None)
self.assertFalse(process2.tasks.exists())
......@@ -2336,13 +2338,15 @@ class TestProcesses(FixtureAPITestCase):
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertDictEqual(response.json(), {
"non_field_errors": ["At least one of the process datasets must be from the same corpus as the process."]
"non_field_errors": ["At least one of the process sets must be from the same corpus as the process."]
})
def test_start_process_dataset_unsupported_parameters(self):
process2 = self.corpus.processes.create(creator=self.user, mode=ProcessMode.Dataset)
ProcessDataset.objects.create(process=process2, dataset=self.dataset1, sets=list(self.dataset1.sets.values_list("name", flat=True)))
ProcessDataset.objects.create(process=process2, dataset=self.private_dataset, sets=list(self.dataset2.sets.values_list("name", flat=True)))
test_set_1 = self.dataset1.sets.get(name="test")
test_set_2 = self.dataset2.sets.get(name="test")
ProcessDatasetSet.objects.create(process=process2, set=test_set_1)
ProcessDatasetSet.objects.create(process=process2, set=test_set_2)
process2.worker_runs.create(version=self.recognizer, parents=[], configuration=None)
self.client.force_login(self.user)
......@@ -2351,7 +2355,6 @@ class TestProcesses(FixtureAPITestCase):
response = self.client.post(
reverse("api:process-start", kwargs={"pk": str(process2.id)}),
{
"thumbnails": True,
"use_cache": True,
"worker_activity": True,
}
......@@ -2359,15 +2362,16 @@ class TestProcesses(FixtureAPITestCase):
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertDictEqual(response.json(), {
"thumbnails": ["Thumbnails generation is not supported on Dataset processes."],
"use_cache": ["Caching is not supported on Dataset processes."],
"worker_activity": ["Worker activities are not supported on Dataset processes."],
})
def test_start_process_dataset(self):
process2 = self.corpus.processes.create(creator=self.user, mode=ProcessMode.Dataset)
ProcessDataset.objects.create(process=process2, dataset=self.dataset1, sets=list(self.dataset1.sets.values_list("name", flat=True)))
ProcessDataset.objects.create(process=process2, dataset=self.private_dataset, sets=list(self.private_dataset.sets.values_list("name", flat=True)))
test_set_1 = self.dataset1.sets.get(name="test")
test_set_2 = self.private_dataset.sets.get(name="test")
ProcessDatasetSet.objects.create(process=process2, set=test_set_1)
ProcessDatasetSet.objects.create(process=process2, set=test_set_2)
run = process2.worker_runs.create(version=self.recognizer, parents=[], configuration=None)
self.assertFalse(process2.tasks.exists())
......@@ -2387,8 +2391,8 @@ class TestProcesses(FixtureAPITestCase):
self.assertEqual(process2.tasks.count(), 1)
task = process2.tasks.get()
self.assertEqual(task.slug, run.task_slug)
self.assertQuerysetEqual(process2.datasets.order_by("name"), [
self.private_dataset, self.dataset1
self.assertQuerysetEqual(process2.sets.order_by("dataset__name"), [
test_set_2, test_set_1
])
def test_start_process_from_docker_image(self):
......@@ -2503,7 +2507,6 @@ class TestProcesses(FixtureAPITestCase):
({"chunks": 0}, {"chunks": ["Ensure this value is greater than or equal to 1."]}),
({"chunks": 20}, {"chunks": ["Ensure this value is less than or equal to 10."]}),
({"chunks": "max"}, {"chunks": ["A valid integer is required."]}),
({"thumbnails": "gloubiboulga"}, {"thumbnails": ["Must be a valid boolean."]})
]
for (params, check) in wrong_params_checks:
with self.subTest(**params), self.assertNumQueries(5):
......@@ -2533,7 +2536,7 @@ class TestProcesses(FixtureAPITestCase):
def test_start_process_workers_parameters(self):
"""
It should be possible to pass chunks and thumbnails parameters when starting a workers process
It should be possible to pass the chunks parameter when starting a workers process
"""
process = self.corpus.processes.create(creator=self.user, mode=ProcessMode.Workers)
# Add a worker run to this process
......@@ -2542,7 +2545,7 @@ class TestProcesses(FixtureAPITestCase):
self.client.force_login(self.user)
response = self.client.post(
reverse("api:process-start", kwargs={"pk": str(process.id)}),
{"chunks": 3, "thumbnails": "true"}
{"chunks": 3}
)
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
process.refresh_from_db()
......@@ -2552,9 +2555,6 @@ class TestProcesses(FixtureAPITestCase):
f"{run.task_slug}_1",
f"{run.task_slug}_2",
f"{run.task_slug}_3",
"thumbnails_1",
"thumbnails_2",
"thumbnails_3"
])
def test_start_process_dataset_chunks(self):
......@@ -2562,8 +2562,10 @@ class TestProcesses(FixtureAPITestCase):
It should be possible to pass chunks when starting a dataset process
"""
process = self.corpus.processes.create(creator=self.user, mode=ProcessMode.Dataset)
ProcessDataset.objects.create(process=process, dataset=self.dataset1, sets=list(self.dataset1.sets.values_list("name", flat=True)))
ProcessDataset.objects.create(process=process, dataset=self.dataset2, sets=list(self.dataset2.sets.values_list("name", flat=True)))
test_set_1 = self.dataset1.sets.get(name="test")
test_set_2 = self.dataset2.sets.get(name="test")
ProcessDatasetSet.objects.create(process=process, set=test_set_1)
ProcessDatasetSet.objects.create(process=process, set=test_set_2)
# Add a worker run to this process
run = process.worker_runs.create(version=self.recognizer, parents=[], configuration=None)
......@@ -2600,10 +2602,7 @@ class TestProcesses(FixtureAPITestCase):
)
self.assertEqual(response.status_code, status.HTTP_400_BAD_REQUEST)
self.assertDictEqual(response.json(), {
"__all__": ["The process must either use thumbnail generation or have worker runs."],
"use_cache": ["The process must have workers attached to use cached results."],
"worker_activity": ["The process must have workers attached to handle their activity."],
"use_gpu": ["The process must have workers attached to use GPUs."],
"__all__": ["The process must have worker runs to be started."]
})
process.refresh_from_db()
self.assertFalse(process.use_cache)
......
......@@ -75,8 +75,8 @@ from arkindex.process.api import (
DataFileRetrieve,
FilesProcess,
ListProcessElements,
ProcessDatasetManage,
ProcessDatasets,
ProcessDatasetSetManage,
ProcessDatasetSets,
ProcessDetails,
ProcessList,
ProcessRetry,
......@@ -272,8 +272,8 @@ api = [
path("process/<uuid:pk>/apply/", ApplyProcessTemplate.as_view(), name="apply-process-template"),
path("process/<uuid:pk>/clear/", ClearProcess.as_view(), name="clear-process"),
path("process/<uuid:pk>/select-failures/", SelectProcessFailures.as_view(), name="process-select-failures"),
path("process/<uuid:pk>/datasets/", ProcessDatasets.as_view(), name="process-datasets"),
path("process/<uuid:process>/dataset/<uuid:dataset>/", ProcessDatasetManage.as_view(), name="process-dataset"),
path("process/<uuid:pk>/sets/", ProcessDatasetSets.as_view(), name="process-sets"),
path("process/<uuid:process>/set/<uuid:set>/", ProcessDatasetSetManage.as_view(), name="process-set"),
# ML models training
path("modelversion/<uuid:pk>/", ModelVersionsRetrieve.as_view(), name="model-version-retrieve"),
......
......@@ -165,18 +165,19 @@ FROM "documents_corpusexport"
WHERE "documents_corpusexport"."corpus_id" = '{corpus_id}'::uuid;
DELETE
FROM "process_processdataset"
WHERE "process_processdataset"."id" IN
FROM "process_processdatasetset"
WHERE "process_processdatasetset"."id" IN
(SELECT U0."id"
FROM "process_processdataset" U0
INNER JOIN "training_dataset" U1 ON (U0."dataset_id" = U1."id")
WHERE U1."corpus_id" = '{corpus_id}'::uuid);
FROM "process_processdatasetset" U0
INNER JOIN "training_datasetset" U1 ON (U0."set_id" = U1."id")
INNER JOIN "training_dataset" U2 ON (U1."dataset_id" = U2."id")
WHERE U2."corpus_id" = '{corpus_id}'::uuid);
DELETE
FROM "process_processdataset"
WHERE "process_processdataset"."id" IN
FROM "process_processdatasetset"
WHERE "process_processdatasetset"."id" IN
(SELECT U0."id"
FROM "process_processdataset" U0
FROM "process_processdatasetset" U0
INNER JOIN "process_process" U1 ON (U0."process_id" = U1."id")
WHERE U1."corpus_id" = '{corpus_id}'::uuid);
......
......@@ -169,18 +169,19 @@ FROM "documents_corpusexport"
WHERE "documents_corpusexport"."corpus_id" = '{corpus_id}'::uuid;
DELETE
FROM "process_processdataset"
WHERE "process_processdataset"."id" IN
FROM "process_processdatasetset"
WHERE "process_processdatasetset"."id" IN
(SELECT U0."id"
FROM "process_processdataset" U0
INNER JOIN "training_dataset" U1 ON (U0."dataset_id" = U1."id")
WHERE U1."corpus_id" = '{corpus_id}'::uuid);
FROM "process_processdatasetset" U0
INNER JOIN "training_datasetset" U1 ON (U0."set_id" = U1."id")
INNER JOIN "training_dataset" U2 ON (U1."dataset_id" = U2."id")
WHERE U2."corpus_id" = '{corpus_id}'::uuid);
DELETE
FROM "process_processdataset"
WHERE "process_processdataset"."id" IN
FROM "process_processdatasetset"
WHERE "process_processdatasetset"."id" IN
(SELECT U0."id"
FROM "process_processdataset" U0
FROM "process_processdatasetset" U0
INNER JOIN "process_process" U1 ON (U0."process_id" = U1."id")
WHERE U1."corpus_id" = '{corpus_id}'::uuid);
......
......@@ -29,7 +29,6 @@ SELECT "process_process"."id",
"process_process"."name_contains",
"process_process"."ml_class_id",
"process_process"."load_children",
"process_process"."generate_thumbnails",
"process_process"."chunks",
"process_process"."use_cache",
"process_process"."use_gpu",
......
......@@ -29,7 +29,6 @@ SELECT "process_process"."id",
"process_process"."name_contains",
"process_process"."ml_class_id",
"process_process"."load_children",
"process_process"."generate_thumbnails",
"process_process"."chunks",
"process_process"."use_cache",
"process_process"."use_gpu",
......