Commits on Source (13)
@@ -19,7 +19,7 @@ binary:
	CI_PROJECT_DIR=$(ROOT_DIR) CI_REGISTRY_IMAGE=$(IMAGE_TAG) $(ROOT_DIR)/ci/build.sh Dockerfile.binary -binary
worker:
	arkindex/manage.py rqworker -v 2 default high
	arkindex/manage.py rqworker -v 2 default high tasks
test-fixtures:
	$(eval export PGPASSWORD=devdata)
......
import logging
import tempfile
from io import BytesIO
from pathlib import Path
from time import sleep
from urllib.parse import urljoin
from django.conf import settings
@@ -9,10 +13,16 @@ from django.shortcuts import reverse
from django.template.loader import render_to_string
from django_rq import job
import docker
from arkindex.ponos.models import State, Task
from arkindex.ponos.utils import upload_artifacts
from arkindex.process.models import Process, WorkerActivityState
logger = logging.getLogger(__name__)
# Delay, in seconds, between two polls of a Docker task's logs
TASK_DOCKER_POLLING = 1
@job("default", timeout=settings.RQ_TIMEOUTS["notify_process_completion"])
def notify_process_completion(
@@ -64,3 +74,135 @@ def notify_process_completion(
        recipient_list=[process.creator.email],
        fail_silently=False,
    )

def run_docker_task(client, task, temp_dir):
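    """
    Pull the task's Docker image, run it in a local container and poll it until it ends,
    updating the task's state, logs and artifacts along the way.
    """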
    # 1. Pull the docker image
    logger.debug(f"Pulling docker image '{task.image}'")
    client.images.pull(task.image)

    # 2. Fetch artifacts
    logger.info("Fetching artifacts from parents")
    for parent in task.parents.order_by("depth", "id"):
        folder = temp_dir / str(parent.slug)
        folder.mkdir()
        for artifact in parent.artifacts.all():
            path = (folder / artifact.path).resolve()
            # TODO: An artifact path could leak any data; we should ensure the path is a child of the folder
            artifact.download_to(str(path))

    # 3. Run the container asynchronously
    logger.debug("Running container")
    kwargs = {
        "environment": {
            **task.env,
            "PONOS_DATA": "/data",
        },
        "detach": True,
        "network": "host",
        "volumes": {temp_dir: {"bind": "/data", "mode": "rw"}},
    }
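    # temp_dir is mounted as /data inside the container: parent artifacts are read from
    # /data/<parent_slug>/ and this task's own outputs are collected from /data/<task_id>/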
    artifacts_dir = temp_dir / str(task.id)
    artifacts_dir.mkdir()
    # This symlink only works within the Docker context, as the volume is bound to /data/<task_uuid>/
    (temp_dir / "current").symlink_to(Path(f"/data/{task.id}"))

    if task.requires_gpu:
        # Assign all GPUs to that container
        # TODO: Make sure this works
        kwargs["environment"]["NVIDIA_VISIBLE_DEVICES"] = "all"
        logger.info("Starting container with GPU support")
    if task.command is not None:
        kwargs["command"] = task.command

    container = client.containers.run(task.image, **kwargs)
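    # Wait for Docker to move the container out of the "created" state, then make sure
    # it actually reached the "running" state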
    while container.status == "created":
        container.reload()
    if container.status != "running":
        raise Exception("Container was not updated to state running")
    task.state = State.Running
    task.save()

    # 4. Read logs
    logger.debug("Reading logs from the docker container")
    previous_logs = b""
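    # Only upload the logs when they changed since the last poll, to avoid pointless uploads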
    while container.status == "running":
        # Update container attributes first, so we do not miss publishing logs
        container.reload()
        logs = container.logs()
        if logs != previous_logs:
            previous_logs = logs
            try:
                task.logs.s3_object.upload_fileobj(
                    BytesIO(logs),
                    ExtraArgs={"ContentType": "text/plain; charset=utf-8"},
                )
            except Exception as e:
                logger.warning(f"Failed uploading logs for task {task}: {e}")
        sleep(TASK_DOCKER_POLLING)
        task.refresh_from_db()
        # Handle a task that is being stopped during execution
        if task.state == State.Stopping:
            container.stop()
            task.state = State.Stopped
            task.save()
            return

    # 5. Retrieve the state of the container
    exit_code = container.attrs["State"]["ExitCode"]
    if exit_code != 0:
        logger.info("Task failed")
        task.state = State.Failed
        task.save()
        return
    task.state = State.Completed
    task.save()

    # 6. Upload artifacts
    logger.info(f"Uploading artifacts for task {task}")
    for path in Path(artifacts_dir).glob("**/*"):
        if path.is_dir():
            continue
        try:
            upload_artifacts(task, path, artifacts_dir)
        except Exception as e:
            logger.warning(
                f"Failed uploading artifacts for task {task}: {e}"
            )
@job("tasks", timeout=settings.RQ_TIMEOUTS["task"])
def run_task_rq(task: Task):
"""Run a single task in RQ"""
# Update task and parents from the DB
task.refresh_from_db()
parents = list(task.parents.order_by("depth", "id"))
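    # Parent tasks are loaded so that a parent that ended in error, failed or was stopped
    # can be detected below and its state propagated to this task instead of running it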
    client = docker.from_env()

    if not task.image:
        raise ValueError(f"Task {task} has no docker image.")
    if task.state != State.Pending:
        raise ValueError(f"Task {task} must be in pending state to run in RQ.")

    # If a parent task ended in error, propagate its state to this task instead of running it
    if (parent_state := next(
        (parent.state for parent in parents if parent.state in (State.Stopped, State.Error, State.Failed)),
        None,
    )) is not None:
        task.state = parent_state
        task.save()
        return

    with tempfile.TemporaryDirectory(suffix=f"_{task.id}") as temp_dir:
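        # The temporary directory is bind-mounted as /data in the container and is
        # removed automatically once the task has ended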
        try:
            run_docker_task(client, task, Path(temp_dir))
        except Exception:
            logger.error("An unexpected error occurred, updating state to Error.")
            task.state = State.Error
            task.save()
            raise
import os
import magic
from arkindex.ponos.models import Task
@@ -8,3 +11,14 @@ def is_admin_or_ponos_task(request):
def get_process_from_task_auth(request):
    if isinstance(request.auth, Task):
        return request.auth.process

def upload_artifacts(task, path, artifacts_dir):
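    """Create an artifact for a file found under artifacts_dir and upload its content to S3"""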
    content_type = magic.from_file(path, mime=True)
    size = os.path.getsize(path)
    artifact = task.artifacts.create(
        path=os.path.relpath(path, artifacts_dir),
        content_type=content_type,
        size=size,
    )
    artifact.s3_object.upload_file(str(path), ExtraArgs={"ContentType": content_type})
@@ -393,3 +393,4 @@ class ProcessBuilder(object):
            for child_slug, parent_slugs in self.tasks_parents.items()
            for parent_slug in parent_slugs
        )
        return tasks
@@ -390,11 +390,15 @@ class Process(IndexableModel):
        """
        Build and start a new run for this process.
        """
        from arkindex.project.triggers import schedule_tasks

        process_builder = ProcessBuilder(self)
        process_builder.validate()
        process_builder.build()

        # Save all tasks and their relations
        process_builder.save()

        # Trigger task execution in RQ
        schedule_tasks(self, process_builder.run)

        self.started = timezone.now()
        self.finished = None
......
@@ -154,6 +154,8 @@ def get_settings_parser(base_dir):
    job_timeouts_parser.add_option("process_delete", type=int, default=3600)
    job_timeouts_parser.add_option("reindex_corpus", type=int, default=7200)
    job_timeouts_parser.add_option("notify_process_completion", type=int, default=120)
    # Task execution in RQ times out after 10 hours by default
    job_timeouts_parser.add_option("task", type=int, default=36000)

    csrf_parser = parser.add_subparser("csrf", default={})
    csrf_parser.add_option("cookie_name", type=str, default="arkindex.csrf")
......
@@ -352,6 +352,13 @@ RQ_QUEUES = {
        "DB": conf["redis"]["db"],
        "PASSWORD": conf["redis"]["password"],
        "DEFAULT_TIMEOUT": conf["redis"]["timeout"],
    },
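    # Dedicated queue for Ponos task execution; it is consumed by the rqworker command
    # that now listens on the extra "tasks" queue (see the Makefile change above),
    # and it is hidden from the user-facing jobs API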
"tasks": {
"HOST": conf["redis"]["host"],
"PORT": conf["redis"]["port"],
"DB": conf["redis"]["db"],
"PASSWORD": conf["redis"]["password"],
"DEFAULT_TIMEOUT": conf["redis"]["timeout"],
}
}
......
@@ -61,6 +61,7 @@ job_timeouts:
  notify_process_completion: 120
  process_delete: 3600
  reindex_corpus: 7200
  task: 36000
  worker_results_delete: 3600
jwt_signing_key: null
local_imageserver_id: 1
......
@@ -43,6 +43,7 @@ job_timeouts:
  move_element:
    a: b
  reindex_corpus: {}
  task: ''
  worker_results_delete: null
jwt_signing_key: null
local_imageserver_id: 1
......
@@ -24,6 +24,7 @@ job_timeouts:
  export_corpus: "int() argument must be a string, a bytes-like object or a real number, not 'list'"
  move_element: "int() argument must be a string, a bytes-like object or a real number, not 'dict'"
  reindex_corpus: "int() argument must be a string, a bytes-like object or a real number, not 'dict'"
  task: "invalid literal for int() with base 10: ''"
  worker_results_delete: "int() argument must be a string, a bytes-like object or a real number, not 'NoneType'"
ponos:
  artifact_max_size: cannot convert float NaN to integer
......
@@ -75,7 +75,8 @@ job_timeouts:
  notify_process_completion: 6
  process_delete: 7
  reindex_corpus: 8
  worker_results_delete: 9
  task: 9
  worker_results_delete: 10
jwt_signing_key: deadbeef
local_imageserver_id: 45
metrics_port: 4242
......
@@ -5,6 +5,7 @@ from typing import Literal, Optional, Union
from uuid import UUID
from django.db.models import Prefetch, prefetch_related_objects
from rq.job import Dependency
from arkindex.documents import export
from arkindex.documents import tasks as documents_tasks
@@ -211,3 +212,17 @@ def notify_process_completion(process: Process):
        process=process,
        subject=f"Your process {process_name} finished {state_msg[state]}",
    )

def schedule_tasks(process: Process, run: int):
    """Run tasks of a process in RQ, one by one"""
    tasks = process.tasks.filter(run=run).order_by("depth", "id")

    # Initially mark all tasks as pending
    tasks.update(state=State.Pending)

    # Build a simple dependency scheme between tasks, based on depth
    parent_job = None
    for task in tasks:
        kwargs = {}
        if parent_job:
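            # allow_failure=True lets this job start even when the previous one failed,
            # so run_task_rq can propagate the failed parent's state instead of running the task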
kwargs["depends_on"] = Dependency(jobs=[parent_job], allow_failure=True)
parent_job = ponos_tasks.run_task_rq.delay(task, **kwargs)
@@ -53,6 +53,12 @@ from arkindex.users.utils import RightContent, get_max_level
logger = logging.getLogger(__name__)

# Process tasks running in RQ are hidden from user jobs
VISIBLE_QUEUES = [
    q for q in QUEUES.keys()
    if q != "tasks"
]
@extend_schema(tags=["users"])
@extend_schema_view(
@@ -340,7 +346,7 @@ class JobRetrieve(RetrieveDestroyAPIView):
    serializer_class = JobSerializer

    def get_object(self):
        for queue_name in QUEUES.keys():
        for queue_name in VISIBLE_QUEUES:
            job = get_queue(queue_name).fetch_job(str(self.kwargs["pk"]))
            if not job:
                continue
......
@@ -9,7 +9,9 @@ django-pgtrigger==4.7.0
django-rq==2.8.1
djangorestframework==3.12.4
djangorestframework-simplejwt==5.2.2
docker==7.0.0
drf-spectacular==0.18.2
python-magic==0.4.27
python-memcached==1.59
pytz==2023.3
PyYAML==6.0
......