Execute docker tasks in RQ

Merged Valentin Rigal requested to merge dummy-tasks into community
@@ -17,6 +17,7 @@ import docker
 from arkindex.ponos.models import State, Task
 from arkindex.ponos.utils import upload_artifacts
 from arkindex.process.models import Process, WorkerActivityState
+from docker.errors import APIError, ImageNotFound

 logger = logging.getLogger(__name__)
@@ -76,10 +77,26 @@ def notify_process_completion(
     )


+def upload_logs(task, text):
+    try:
+        task.logs.s3_object.upload_fileobj(
+            BytesIO(text),
+            ExtraArgs={"ContentType": "text/plain; charset=utf-8"},
+        )
+    except Exception as e:
+        logger.warning(f"Failed uploading logs for task {task}: {e}")


 def run_docker_task(client, task, temp_dir):
     # 1. Pull the docker image
     logger.debug(f"Pulling docker image '{task.image}'")
-    client.images.pull(task.image)
+    try:
+        client.images.pull(task.image)
+    except (ImageNotFound, APIError) as e:
+        # Pulling is allowed to fail when the image is already present locally (local builds)
+        if not client.images.list(task.image):
+            raise Exception(f"Image not found locally or remotely: {e}")
+        logger.info("Remote image could not be fetched, using the local image.")

     # 2. Fetch artifacts
     logger.info("Fetching artifacts from parents")
@@ -88,7 +105,8 @@ def run_docker_task(client, task, temp_dir):
         folder.mkdir()
         for artifact in parent.artifacts.all():
             path = (folder / artifact.path).resolve()
-            # TODO: Artifact could leak any data, we should ensure path is a children of folder
+            # Ensure path is a child of folder
+            assert str(folder.resolve()) in str(path.resolve()), f"Invalid artifact path: {artifact.path}."
             artifact.download_to(str(path))

     # 3. Do run the container asynchronously
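
The substring check above accepts sibling paths that merely share a prefix (for example, /data/task matches inside /data/task-other/x). A stricter containment test, sketched here with plain pathlib rather than what the MR ships:

from pathlib import Path

def is_within(child: Path, parent: Path) -> bool:
    # relative_to() raises ValueError when child does not resolve under parent
    try:
        child.resolve().relative_to(parent.resolve())
        return True
    except ValueError:
        return False

assert is_within(Path("/data/task/out.txt"), Path("/data/task"))
assert not is_within(Path("/data/task-other/x"), Path("/data/task"))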
@@ -96,20 +114,20 @@ def run_docker_task(client, task, temp_dir):
     kwargs = {
         "environment": {
             **task.env,
-            "PONOS_DATA": "/data",
+            "PONOS_DATA": settings.PONOS_DATA_DIR,
         },
         "detach": True,
         "network": "host",
-        "volumes": {temp_dir: {"bind": "/data", "mode": "rw"}},
+        "volumes": {temp_dir: {"bind": settings.PONOS_DATA_DIR, "mode": "rw"}},
     }
     artifacts_dir = temp_dir / str(task.id)
     artifacts_dir.mkdir()
-    # The symlink will only work within docker context as binded to /data/<task_uuid>/
-    (temp_dir / "current").symlink_to(Path(f"/data/{task.id}"))
+    # The symlink will only work within the docker context, as it is bound to PONOS_DATA_DIR/<task_uuid>/
+    (temp_dir / "current").symlink_to(Path(settings.PONOS_DATA_DIR) / str(task.id))

     if task.requires_gpu:
         # Assign all GPUs to that container
-        # TODO: Make sure this works
+        # TODO: Make sure this works with a GPU
         kwargs["environment"] = {"NVIDIA_VISIBLE_DEVICES": "all"}
         logger.info("Starting container with GPU support")

     if task.command is not None:
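
On the GPU TODO above: NVIDIA_VISIBLE_DEVICES only takes effect when the NVIDIA runtime handles the container, and note that assigning kwargs["environment"] here replaces the environment built earlier rather than extending it. With docker-py >= 4.3 the GPU request can instead be made explicit through device_requests, the API equivalent of docker run --gpus all. A sketch under those assumptions, not what this MR does:

import docker
import docker.types

client = docker.from_env()
container = client.containers.run(
    "nvidia/cuda:12.2.0-base-ubuntu22.04",
    "nvidia-smi",
    detach=True,
    # count=-1 requests every available GPU
    device_requests=[docker.types.DeviceRequest(count=-1, capabilities=[["gpu"]])],
)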
@@ -117,8 +135,6 @@ def run_docker_task(client, task, temp_dir):
     container = client.containers.run(task.image, **kwargs)
     while container.status == "created":
         container.reload()
-    if container.status != "running":
-        raise Exception("Container was not updated to state running")
     task.state = State.Running
     task.save()
@@ -126,30 +142,24 @@ def run_docker_task(client, task, temp_dir):
     logger.debug("Reading logs from the docker container")
     previous_logs = b""
     while container.status == "running":
+        # Update container attributes first, so we do not miss publishing logs
+        container.reload()
         logs = container.logs()
         if logs != previous_logs:
+            upload_logs(task, logs)
             previous_logs = logs
-            try:
-                task.logs.s3_object.upload_fileobj(
-                    BytesIO(logs),
-                    ExtraArgs={"ContentType": "text/plain; charset=utf-8"},
-                )
-            except Exception as e:
-                logger.warning(f"Failed uploading logs for task {task}: {e}")
-        sleep(TASK_DOCKER_POLLING)
-        task.refresh_from_db()
         # Handle a task that is being stopped during execution
+        task.refresh_from_db()
         if task.state == State.Stopping:
             container.stop()
             task.state = State.Stopped
             task.save()
-            return
+            break
+        sleep(TASK_DOCKER_POLLING)
-        container.reload()

+    # Upload logs one last time so we do not miss any data
+    upload_logs(task, container.logs())

     # 5. Retrieve the state of the container
+    container.reload()
     exit_code = container.attrs["State"]["ExitCode"]
     if exit_code != 0:
         logger.info("Task failed")
@@ -202,7 +212,14 @@ def run_task_rq(task: Task):
     try:
         run_docker_task(client, task, Path(temp_dir))
     except Exception as e:
-        logger.error("An unexpected error occurred, updating state to Error.")
+        logger.error(f"An unexpected error occurred, updating state to Error: {e}")
         task.state = State.Error
         task.save()
+        # Add unexpected error details to task logs
+        text = BytesIO()
+        if task.logs.exists():
+            task.logs.s3_object.download_fileobj(text)
+        text = text.getvalue()
+        text += f"\nPonos exception: {e}".encode()
+        upload_logs(task, text)
         raise e
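
For reviewers trying this locally: run_task_rq is a plain callable, so it can be enqueued like any other RQ job. A sketch with a hypothetical module path, queue name and Redis connection:

import redis
from rq import Queue

from arkindex.ponos.tasks import run_task_rq  # hypothetical module path

queue = Queue("tasks", connection=redis.Redis())
job = queue.enqueue(run_task_rq, task)  # task: a ponos Task instance fetched beforehand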