Commit b024e614 authored by Valentin Rigal

Publish artifacts

parent 8c3f348e
This commit is part of merge request !2227.
import logging
import tempfile
from pathlib import Path
from urllib.parse import urljoin
import requests
@@ -13,6 +15,7 @@ from rq.job import Dependency
import docker
from arkindex.ponos.models import State, Task
from arkindex.ponos.utils import upload_artifacts
from arkindex.process.models import Process, WorkerActivityState
logger = logging.getLogger(__name__)
@@ -84,6 +87,64 @@ def schedule_tasks(process: Process, run: int):
parent_job = run_task_rq.delay(task, **kwargs)
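Only the tail of schedule_tasks appears in this hunk, but given the rq.job.Dependency import and the run_task_rq.delay(task, **kwargs) call above, each task is presumably enqueued with an RQ dependency on its parents' jobs, so a child only starts once its parents have run. A minimal sketch of that chaining, assuming a parents relation on Task and using hypothetical variable names (not part of this change):

from rq.job import Dependency

# Sketch only: tasks are assumed to be iterated parents-first, and `jobs`
# maps every already-enqueued Task to its RQ job.
jobs = {}
for task in tasks:
    kwargs = {}
    parent_jobs = [jobs[parent] for parent in task.parents.all()]
    if parent_jobs:
        # .delay() forwards depends_on to the underlying RQ enqueue call
        kwargs["depends_on"] = Dependency(jobs=parent_jobs)
    jobs[task] = run_task_rq.delay(task, **kwargs)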
def run_docker_task(client, task, artifacts_dir):
# 1. Pull the docker image
logger.debug(f"Pulling docker image '{task.image}'")
client.images.pull(task.image)
# 2. Run the container asynchronously
logger.debug("Running container")
kwargs = {
"environment": task.env,
"detach": True,
"network": "host",
"volumes": {artifacts_dir: {"bind": "/data", "mode": "rw"}},
}
if task.command is not None:
kwargs["command"] = task.command
container = client.containers.run(task.image, **kwargs)
task.state = State.Running
task.save()
# 3. Read logs (see agent.setup_logging)
logger.debug("Reading logs from the docker container")
data = b""
for line in container.logs(stream=True):
data += line
try:
requests.put(
task.logs.s3_put_url,
data,
headers={"Content-Type": "text/plain; charset=utf-8"},
)
except Exception as e:
logger.warning(f"Failed uploading logs for task {task}: {e}")
# 4. Retrieve the state of the container
container.reload()
exit_code = container.attrs["State"]["ExitCode"]
if exit_code != 0:
logger.info("Task failed")
task.state = State.Failed
task.save()
return
task.state = State.Completed
task.save()
# 5. Upload artifacts
logger.info(f"Uploading artifacts for task {task}")
for path in Path(artifacts_dir).glob("**/*"):
if path.is_dir():
continue
try:
upload_artifacts(task, path, artifacts_dir)
except Exception as e:
logger.warning(
f"Failed uploading artifacts for task {task}: {e}"
)
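The volumes mount above binds artifacts_dir to /data inside the container, so anything the task's process writes under /data ends up on the host and is published as an artifact once the container exits. A minimal, hypothetical worker-side example (file names are illustrative only):

import json
from pathlib import Path

# Inside the task container: /data is the bind-mounted artifacts directory.
output = Path("/data/results/predictions.json")
output.parent.mkdir(parents=True, exist_ok=True)
output.write_text(json.dumps({"status": "ok"}))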
@job("tasks", timeout=settings.RQ_TIMEOUTS["task"])
def run_task_rq(task: Task):
"""Run a single task in RQ"""
@@ -108,49 +169,11 @@ def run_task_rq(task: Task):
task.save()
return
try:
# 1. Pull the docker image
logger.debug(f"Pulling docker image '{task.image}'")
client.images.pull(task.image)
# 2. Run the container asynchronously
logger.debug("Running container")
kwargs = {
"environment": task.env,
"detach": True,
"network": "host",
}
if task.command is not None:
kwargs["command"] = task.command
container = client.containers.run(task.image, **kwargs)
task.state = State.Running
task.save()
# 3. Read logs (see agent.setup_logging)
logger.debug("Reading logs from the docker container.")
data = b""
for line in container.logs(stream=True):
data += line
try:
requests.put(
task.logs.s3_put_url,
data,
headers={"Content-Type": "text/plain; charset=utf-8"},
)
except Exception as e:
logger.error(f"An error occurred uploading logs: {e}")
# 4. Retrieve the state of the container
container.reload()
exit_code = container.attrs["State"]["ExitCode"]
if exit_code == 0:
task.state = State.Completed
else:
logger.info("Task failed.")
task.state = State.Failed
task.save()
except Exception as e:
logger.error("An unexpected error occurred, updating state to Error.")
task.state = State.Error
task.save()
raise e
with tempfile.TemporaryDirectory() as artifacts_dir:
try:
run_docker_task(client, task, artifacts_dir)
except Exception as e:
logger.error("An unexpected error occurred, updating state to Error.")
task.state = State.Error
task.save()
raise e
import os
import requests
import magic
from arkindex.ponos.models import Task
@@ -8,3 +13,19 @@ def is_admin_or_ponos_task(request):
def get_process_from_task_auth(request):
if isinstance(request.auth, Task):
return request.auth.process
def upload_artifacts(task, path, artifacts_dir):
content_type = magic.from_file(path, mime=True)
size = os.path.getsize(path)
artifact = task.artifacts.create(
path=os.path.relpath(path, artifacts_dir),
content_type=content_type,
size=size,
)
with open(path, "rb") as f:
requests.put(
artifact.s3_put_url,
data=f,
headers={"Content-Type": content_type},
)
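A short usage sketch, assuming an existing ponos Task and an illustrative directory layout: the artifact path is stored relative to artifacts_dir, the MIME type is detected with python-magic, and passing the open file object to requests.put streams the upload instead of reading the whole file into memory.

# Sketch only: `task` is an existing Task; paths are illustrative.
artifacts_dir = "/tmp/ponos-artifacts"
log_path = os.path.join(artifacts_dir, "logs", "train.log")
upload_artifacts(task, log_path, artifacts_dir)
# Creates an artifact with path "logs/train.log" and a detected text/plain content type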
@@ -11,6 +11,7 @@ djangorestframework==3.12.4
djangorestframework-simplejwt==5.2.2
docker==7.0.0
drf-spectacular==0.18.2
python-magic==0.4.27
python-memcached==1.59
pytz==2023.3
PyYAML==6.0