
Split Process.build_workflow into a function per mode

After #1536 (closed), both starting and retrying a process work the same way via Process.run:


PlantUML diagram
@startuml
start
if (Process already running?) then (yes)
    #pink:Error;
    kill
endif

if (Process has invalid attributes?) then (yes)
    #pink:Error;
    kill
endif

:Process.build_workflow|

:Update started/finished;
:Trigger activity initialization;
:Update WorkerVersion cache;
stop
@enduml

Process.build_workflow generates the actual tasks and saves them, and is a huge mess:


PlantUML diagram
@startuml
start
if (Farm set?) then (yes)
else (no)
    :Set default farm;
endif

:Prepare common environment variables;

switch (Process mode?)
case (Repository)
    :Check process attributes again;
    :Create a Git import task;
    stop
case (IIIF)
    :Create a fake worker run;
    :Create an IIIF import task;
    if (Thumbnail generation?) then (no)
        stop
    endif
case (Transkribus)
    :Create a fake worker run;
    :Set Transkribus-specific variables;
    :Create a Transkribus export task;
    :Create a Transkribus import task;
    if (Thumbnail generation?) then (no)
        stop
    endif
case (Files)
    :Create a fake worker run;
    :Create a file import task;
    if (Thumbnail generation?) then (no)
        stop
    endif
case (S3)
    :Create a fake worker run;
    :Set S3-specific variables;
    :Create an S3 import task;
    stop
case (Training)
    :Create a worker run from
    the training-specific attributes;
    :Create a task like for a worker run
    but with the ""training"" slug;
    stop
case (Workers)
    :Create an initialisation task;
case (Dataset)
endswitch

if (mode is IIIF/Transkribus/Files and has thumbnail generation, or mode is Workers/Dataset) then (no)
    stop
endif

:List all expected elements.json files
depending on chunks;

if (mode is Workers/Dataset) then (yes)
    :List all worker runs;
endif

repeat
    if (Dataset mode?) then (no)
        :Use elements.json in all tasks;
    endif
    if (Thumbnail generation?) then (yes)
        if (Dataset mode?) then (yes)
            #pink:Error;
            kill
        endif
        :Create thumbnail generation task;
    endif

    :Create tasks for all worker runs;

repeat while (Other chunks?) is (yes)

:Bulk create all worker run and thumbnail tasks;

stop
@enduml

To simplify this, and make it easier to review changes like !2065 (merged) that introduce new logic for a new process mode, we could have a separate method to build tasks for each process mode. Process.run would just have a big if .. elif .. elif calling those methods.

Each method returns a Tuple[Sequence[Task], Mapping[str, Sequence[str]]]: a two-tuple with a list of tasks to bulk create (not yet saved), and a dict of { slug: [parent slug, ...] } used to bulk create the parents of each task. These data structures are already used by the worker run creation step for Workers and Dataset processes.
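
As a rough sketch of the dispatch, assuming a ProcessMode enum and run_* method names that are purely illustrative, Process.run could end up with something like:

if self.mode == ProcessMode.Files:
    tasks, task_parents = self.run_files()
elif self.mode == ProcessMode.IIIF:
    tasks, task_parents = self.run_iiif()
elif self.mode == ProcessMode.Workers:
    tasks, task_parents = self.run_workers()
# ... one elif per remaining mode ...
else:
    raise NotImplementedError(f"Unsupported process mode {self.mode}")

# Bulk create the unsaved tasks, then create the parent links in a second
# bulk query from the { slug: [parent slug, ...] } mapping
Task.objects.bulk_create(tasks)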

We can also have separate functions to create a thumbnail generation task, or to create all tasks for all worker runs, with an elements_json_path argument to support chunks and the different import task names.
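
For example, the helper that builds tasks for all worker runs might look roughly like this; create_worker_run_task, get_parent_slugs and the worker_runs related manager are assumptions for illustration, not existing API:

def create_worker_run_tasks(self, elements_json_path) -> Tuple[Sequence[Task], Mapping[str, Sequence[str]]]:
    tasks, task_parents = [], {}
    for worker_run in self.worker_runs.all():
        # Hypothetical helper building an unsaved Task for this worker run,
        # pointing it at the elements.json of the current chunk
        task = self.create_worker_run_task(worker_run, elements_json_path)
        tasks.append(task)
        # Hypothetical helper mapping the worker run's dependencies to task slugs
        task_parents[task.slug] = self.get_parent_slugs(worker_run)
    return tasks, task_parents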

This could allow for something like this:

def run_files(self) -> Tuple[Sequence[Task], Mapping[str, Sequence[str]]]:
    worker_run = self.create_fake_worker_run(worker_version_id=settings.IMPORTS_WORKER_VERSION)
    import_task = self.create_file_import_task(worker_run)
    tasks, task_parents = [import_task], {}
    if self.generate_thumbnails:
        thumbnail_task = self.create_thumbnail_task()
        tasks.append(thumbnail_task)
        task_parents[thumbnail_task.slug] = [import_task.slug]
    return tasks, task_parents

This makes it clearer what this process does: it has a file import task, and if you enable thumbnail generation, it has a thumbnail generation task. It does not support chunks and does not care about GPUs, worker activities, etc.

We might be able to simplify this even further by using a separate class, something like a ProcessBuilder to sound Java-esque, which would store its state in self.tasks and self.task_parents attributes and could also hold a self.worker_versions_cache of new CorpusWorkerVersion instances to create. This would make it easier to let the builder bulk create everything at the end, reducing the number of SQL queries and thus the risk of stale reads or other bugs.
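
A very rough sketch of such a builder, with all names hypothetical:

class ProcessBuilder:
    def __init__(self, process: Process) -> None:
        self.process = process
        # Everything is accumulated in memory and only saved once at the end
        self.tasks = []
        self.task_parents = {}
        self.worker_versions_cache = []

    def build(self) -> None:
        # One build_* method per process mode, e.g. build_files() or build_workers(),
        # each of which only appends to the attributes above
        getattr(self, f"build_{self.process.mode.value}")()

    def save(self) -> None:
        # A single round of bulk creations instead of scattered .save() calls
        Task.objects.bulk_create(self.tasks)
        CorpusWorkerVersion.objects.bulk_create(self.worker_versions_cache, ignore_conflicts=True)
        # Parent links would also be bulk created here from self.task_parents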