Skip to content
Snippets Groups Projects
Commit 78312e51 authored by Valentin Rigal's avatar Valentin Rigal Committed by Bastien Abadie
Browse files

Limit agents tasks to their CPU cores

parent f0541718
No related branches found
No related tags found
1 merge request!2141Limit agents tasks to their CPU cores
......@@ -14,7 +14,7 @@ from django.contrib.postgres.fields import HStoreField
from django.core.exceptions import ValidationError
from django.core.validators import MinLengthValidator, MinValueValidator, RegexValidator
from django.db import models, transaction
from django.db.models import Exists, OuterRef, Q
from django.db.models import Count, Exists, OuterRef, Q
from django.urls import reverse
from django.utils import timezone
from enumfields import Enum, EnumField
......@@ -154,10 +154,15 @@ class Agent(models.Model):
"""
Metric used to estimate the load on this agent starting new tasks.
Used as the cost function to minimize overall agents load while attributing tasks.
An agent cannot have more tasks that their CPU count, as the system cannot really process tasks faster.
:param tasks: Number of tasks to estimate the cost for.
:returns: A cost expressed as a percentage. If > 1, the agent would be overloaded.
"""
current_tasks_count = getattr(self, 'current_tasks', 0)
if current_tasks_count + AGENT_SLOT["cpu"] >= self.cpu_cores:
return 1
cpu_cost = (self.cpu_load + tasks * AGENT_SLOT["cpu"]) / self.cpu_cores
ram_cost = self.ram_load + tasks * AGENT_SLOT["ram"] / self.ram_total
return max(cpu_cost, ram_cost)
......@@ -165,6 +170,7 @@ class Agent(models.Model):
def next_tasks(self):
"""
Compute the next tasks that should be run by an agent.
Pending tasks are attributed to agents with the lightest load first.
:returns: A list of tasks.
"""
......@@ -184,9 +190,16 @@ class Agent(models.Model):
return []
# Retrieve active agents within the same farm
active_agents = Agent.objects.filter(
last_ping__gte=timezone.now() - AGENT_TIMEOUT
).filter(farm_id=self.farm_id)
active_agents = (
Agent.objects
.filter(
last_ping__gte=timezone.now() - AGENT_TIMEOUT,
farm_id=self.farm_id,
)
.annotate(
current_tasks=Count("tasks", filter=Q(tasks__state=State.Running))
)
)
# List available GPUs (without any active task assigned)
available_gpus = list(
......@@ -215,17 +228,15 @@ class Agent(models.Model):
# Compare the cost of adding a new task on compatible agents
min_cost_agent = min(
active_agents,
key=lambda agent: agent._estimate_new_tasks_cost(
tasks=len(attributed_tasks[agent]) + 1
),
key=lambda agent: agent._estimate_new_tasks_cost(tasks=len(attributed_tasks[agent]) + 1),
)
# Append the task to the queue of the agent with the minimal cost
tasks = attributed_tasks[min_cost_agent]
if min_cost_agent._estimate_new_tasks_cost(len(tasks) + 1) > 1:
current_tasks = attributed_tasks[min_cost_agent]
if min_cost_agent._estimate_new_tasks_cost(tasks=len(current_tasks) + 1) >= 1:
# Attributing the next task would overload the system
break
tasks.append(task)
current_tasks.append(task)
# Return tasks attributed to the agent making the request
return attributed_tasks[self]
......
......@@ -181,6 +181,7 @@ class TasksAttributionTestCase(FixtureTestCase):
hostname=f"agent_{i}",
cpu_cores=11,
ram_total=20e9,
ram_load=0.1,
)
for i in range(1, 4)
]
......@@ -202,11 +203,12 @@ class TasksAttributionTestCase(FixtureTestCase):
# Jump in the future. Agent 3 did never replied
future_now = timezone.now() + timedelta(seconds=2)
Agent.objects.filter(hostname__in=["agent_1", "agent_2"]).update(
ram_load=0.64,
cpu_load=3.99,
last_ping=future_now,
)
agent_1.cpu_load = 3.98
agent_1.last_ping = future_now
agent_1.save()
agent_2.cpu_load = 3.99
agent_2.last_ping = future_now
agent_2.save()
with patch.object(model_tz, "now") as now_mock:
now_mock.return_value = future_now
......@@ -219,7 +221,7 @@ class TasksAttributionTestCase(FixtureTestCase):
["agent_1", "agent_2"],
)
# 3 tasks should be distributed among 2 agents with similar load
# The 3 remaining tasks should be distributed among 2 agents with similar load
tasks_agent_1 = agent_1.next_tasks()
self.assertEqual(len(tasks_agent_1), 2)
......@@ -449,3 +451,31 @@ class TasksAttributionTestCase(FixtureTestCase):
self.assertEqual(tasks[3:6], np_tasks)
# Last three tasks should be the low priority ones
self.assertEqual(tasks[6:], lp_tasks)
def test_no_cpu_overload(self):
"""A ponos agent should never run more tasks than its number of reported cores."""
# Agent 1 has 4 free cores
agent_1 = self._build_agent(hostname="agent_1", cpu_cores=4, ram_total=128e9, cpu_load=0.0)
# Agent 2 only has 2 out of 64 cores available
agent_2 = self._build_agent(hostname="agent_2", cpu_cores=64, ram_total=128e9, cpu_load=61.9)
self.assertEqual(
[a.hostname for a in Agent.objects.all() if self._active_agent(a) is True],
["agent_1", "agent_2"],
)
tasks = self._add_pending_tasks(3)
# Agent 1 has 4 CPU slots
self.assertEqual(len(agent_1.next_tasks()), 3)
# Agent 2 has a huge load
self.assertEqual(len(agent_2.next_tasks()), 0)
# Assign those 3 tasks to agent 1, its real load is 10%
Task.objects.filter(id__in=[t.id for t in tasks]).update(agent=agent_1, state=State.Running)
agent_1.cpu_load = 0.4
agent_1.save()
self._add_pending_tasks(10, slug_ext='new')
# Agent 1 has a low load but 0 CPU slots according to the currently running tasks
self.assertEqual(len(agent_1.next_tasks()), 0)
# Agent 2 has a huge load but still 2 CPU slots
self.assertEqual(len(agent_2.next_tasks()), 2)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment