diff --git a/arkindex/ponos/models.py b/arkindex/ponos/models.py index f468e90ae81d592c4ce853af89c0858c026c73f5..907aee6680f07c582a7665e188ed31b16c6f879a 100644 --- a/arkindex/ponos/models.py +++ b/arkindex/ponos/models.py @@ -14,7 +14,7 @@ from django.contrib.postgres.fields import HStoreField from django.core.exceptions import ValidationError from django.core.validators import MinLengthValidator, MinValueValidator, RegexValidator from django.db import models, transaction -from django.db.models import Exists, OuterRef, Q +from django.db.models import Count, Exists, OuterRef, Q from django.urls import reverse from django.utils import timezone from enumfields import Enum, EnumField @@ -154,10 +154,15 @@ class Agent(models.Model): """ Metric used to estimate the load on this agent starting new tasks. Used as the cost function to minimize overall agents load while attributing tasks. + An agent cannot have more tasks than their CPU count, as the system cannot really process tasks faster. :param tasks: Number of tasks to estimate the cost for. :returns: A cost expressed as a percentage. If > 1, the agent would be overloaded. """ + current_tasks_count = getattr(self, 'current_tasks', 0) + if current_tasks_count + AGENT_SLOT["cpu"] >= self.cpu_cores: + return 1 + cpu_cost = (self.cpu_load + tasks * AGENT_SLOT["cpu"]) / self.cpu_cores ram_cost = self.ram_load + tasks * AGENT_SLOT["ram"] / self.ram_total return max(cpu_cost, ram_cost) @@ -165,6 +170,7 @@ class Agent(models.Model): def next_tasks(self): """ Compute the next tasks that should be run by an agent. + Pending tasks are attributed to agents with the lightest load first. :returns: A list of tasks. 
""" @@ -184,9 +190,16 @@ class Agent(models.Model): return [] # Retrieve active agents within the same farm - active_agents = Agent.objects.filter( - last_ping__gte=timezone.now() - AGENT_TIMEOUT - ).filter(farm_id=self.farm_id) + active_agents = ( + Agent.objects + .filter( + last_ping__gte=timezone.now() - AGENT_TIMEOUT, + farm_id=self.farm_id, + ) + .annotate( + current_tasks=Count("tasks", filter=Q(tasks__state=State.Running)) + ) + ) # List available GPUs (without any active task assigned) available_gpus = list( @@ -215,17 +228,15 @@ class Agent(models.Model): # Compare the cost of adding a new task on compatible agents min_cost_agent = min( active_agents, - key=lambda agent: agent._estimate_new_tasks_cost( - tasks=len(attributed_tasks[agent]) + 1 - ), + key=lambda agent: agent._estimate_new_tasks_cost(tasks=len(attributed_tasks[agent]) + 1), ) # Append the task to the queue of the agent with the minimal cost - tasks = attributed_tasks[min_cost_agent] - if min_cost_agent._estimate_new_tasks_cost(len(tasks) + 1) > 1: + current_tasks = attributed_tasks[min_cost_agent] + if min_cost_agent._estimate_new_tasks_cost(tasks=len(current_tasks) + 1) >= 1: # Attributing the next task would overload the system break - tasks.append(task) + current_tasks.append(task) # Return tasks attributed to the agent making the request return attributed_tasks[self] diff --git a/arkindex/ponos/tests/test_tasks_attribution.py b/arkindex/ponos/tests/test_tasks_attribution.py index 0b9139b0f30bb1dc1b4fbfe10fd3aa7e214c3dd0..a4a7c41411328c6e4e2830a882616dcb0816b1e5 100644 --- a/arkindex/ponos/tests/test_tasks_attribution.py +++ b/arkindex/ponos/tests/test_tasks_attribution.py @@ -181,6 +181,7 @@ class TasksAttributionTestCase(FixtureTestCase): hostname=f"agent_{i}", cpu_cores=11, ram_total=20e9, + ram_load=0.1, ) for i in range(1, 4) ] @@ -202,11 +203,12 @@ class TasksAttributionTestCase(FixtureTestCase): # Jump in the future. 
Agent 3 did never replied future_now = timezone.now() + timedelta(seconds=2) - Agent.objects.filter(hostname__in=["agent_1", "agent_2"]).update( - ram_load=0.64, - cpu_load=3.99, - last_ping=future_now, - ) + agent_1.cpu_load = 3.98 + agent_1.last_ping = future_now + agent_1.save() + agent_2.cpu_load = 3.99 + agent_2.last_ping = future_now + agent_2.save() with patch.object(model_tz, "now") as now_mock: now_mock.return_value = future_now @@ -219,7 +221,7 @@ class TasksAttributionTestCase(FixtureTestCase): ["agent_1", "agent_2"], ) - # 3 tasks should be distributed among 2 agents with similar load + # The 3 remaining tasks should be distributed among 2 agents with similar load tasks_agent_1 = agent_1.next_tasks() self.assertEqual(len(tasks_agent_1), 2) @@ -449,3 +451,31 @@ class TasksAttributionTestCase(FixtureTestCase): self.assertEqual(tasks[3:6], np_tasks) # Last three tasks should be the low priority ones self.assertEqual(tasks[6:], lp_tasks) + + def test_no_cpu_overload(self): + """A ponos agent should never run more tasks than its number of reported cores.""" + # Agent 1 has 4 free cores + agent_1 = self._build_agent(hostname="agent_1", cpu_cores=4, ram_total=128e9, cpu_load=0.0) + # Agent 2 only has 2 out of 64 cores available + agent_2 = self._build_agent(hostname="agent_2", cpu_cores=64, ram_total=128e9, cpu_load=61.9) + self.assertEqual( + [a.hostname for a in Agent.objects.all() if self._active_agent(a) is True], + ["agent_1", "agent_2"], + ) + + tasks = self._add_pending_tasks(3) + # Agent 1 has 4 CPU slots + self.assertEqual(len(agent_1.next_tasks()), 3) + # Agent 2 has a huge load + self.assertEqual(len(agent_2.next_tasks()), 0) + + # Assign those 3 tasks to agent 1, its real load is 10% + Task.objects.filter(id__in=[t.id for t in tasks]).update(agent=agent_1, state=State.Running) + agent_1.cpu_load = 0.4 + agent_1.save() + + self._add_pending_tasks(10, slug_ext='new') + # Agent 1 has a low load but 0 CPU slots according to the currently running tasks 
+ self.assertEqual(len(agent_1.next_tasks()), 0) + # Agent 2 has a huge load but still 2 CPU slots + self.assertEqual(len(agent_2.next_tasks()), 2)