Skip to content
Snippets Groups Projects

Execute docker tasks in RQ

Merged Valentin Rigal requested to merge dummy-tasks into community
1 file
+ 22
7
Compare changes
  • Side-by-side
  • Inline
+ 22
7
@@ -77,7 +77,17 @@ def run_docker_task(client, task, temp_dir):
logger.debug(f"Pulling docker image '{task.image}'")
client.images.pull(task.image)
# 2. Do run the container asynchronously
# 2. Fetch artifacts
logger.info("Fetching artifacts from parents")
for parent in task.parents.order_by("depth", "id"):
folder = temp_dir / str(parent.slug)
folder.mkdir()
for artifact in parent.artifacts.all():
path = (folder / artifact.path).resolve()
# TODO: Artifact could leak any data, we should ensure path is a children of folder
artifact.download_to(str(path))
# 3. Do run the container asynchronously
logger.debug("Running container")
kwargs = {
"environment": {
@@ -86,8 +96,13 @@ def run_docker_task(client, task, temp_dir):
},
"detach": True,
"network": "host",
"volumes": {temp_dir: {"bind": "/data/current", "mode": "rw"}},
"volumes": {temp_dir: {"bind": "/data", "mode": "rw"}},
}
artifacts_dir = temp_dir / str(task.id)
artifacts_dir.mkdir()
# The symlink will only work within docker context as binded to /data/<task_uuid>/
(temp_dir / "current").symlink_to(Path(f"/data/{task.id}"))
if task.requires_gpu:
# Assign all GPUs to that container
# TODO: Make sure this works
@@ -99,7 +114,7 @@ def run_docker_task(client, task, temp_dir):
task.state = State.Running
task.save()
# 3. Read logs (see agent.setup_logging)
# 4. Read logs (see agent.setup_logging)
logger.debug("Reading logs from the docker container")
data = b""
for line in container.logs(stream=True):
@@ -112,7 +127,7 @@ def run_docker_task(client, task, temp_dir):
except Exception as e:
logger.warning(f"Failed uploading logs for task {task}: {e}")
# 4. Retrieve the state of the container
# 5. Retrieve the state of the container
container.reload()
exit_code = container.attrs["State"]["ExitCode"]
if exit_code != 0:
@@ -124,14 +139,14 @@ def run_docker_task(client, task, temp_dir):
task.state = State.Completed
task.save()
# 5. Upload artifacts
# 6. Upload artifacts
logger.info(f"Uploading artifacts for task {task}")
for path in Path(temp_dir).glob("**/*"):
for path in Path(artifacts_dir).glob("**/*"):
if path.is_dir():
continue
try:
upload_artifacts(task, path, temp_dir)
upload_artifacts(task, path, artifacts_dir)
except Exception as e:
logger.warning(
f"Failed uploading artifacts for task {task}: {e}"
Loading