diff --git a/.github/workflows/conda_env/environment_dask_2.30.0.yml b/.github/workflows/conda_env/environment_dask_2.30.0.yml index eb1b4192..92fc69ea 100644 --- a/.github/workflows/conda_env/environment_dask_2.30.0.yml +++ b/.github/workflows/conda_env/environment_dask_2.30.0.yml @@ -1,4 +1,4 @@ -name: test +name: ips_test_dask_2.30 channels: - conda-forge dependencies: diff --git a/.github/workflows/conda_env/environment_dask_2.5.2.yml b/.github/workflows/conda_env/environment_dask_2.5.2.yml index e3171522..932c4cfb 100644 --- a/.github/workflows/conda_env/environment_dask_2.5.2.yml +++ b/.github/workflows/conda_env/environment_dask_2.5.2.yml @@ -1,4 +1,4 @@ -name: test +name: ips_test_dask_2.5 channels: - conda-forge dependencies: diff --git a/.github/workflows/conda_env/environment_linux.yml b/.github/workflows/conda_env/environment_linux.yml index 84f7a8bd..cb5293ff 100644 --- a/.github/workflows/conda_env/environment_linux.yml +++ b/.github/workflows/conda_env/environment_linux.yml @@ -1,4 +1,4 @@ -name: test +name: ips_test channels: - conda-forge dependencies: @@ -6,5 +6,5 @@ dependencies: - pytest-timeout - psutil - mpi4py -- dask=2021.10.0 +- dask=2021.11.1 - dakota diff --git a/.github/workflows/conda_env/environment_macos.yml b/.github/workflows/conda_env/environment_macos.yml index 3ee040f1..c921e60c 100644 --- a/.github/workflows/conda_env/environment_macos.yml +++ b/.github/workflows/conda_env/environment_macos.yml @@ -1,8 +1,8 @@ -name: test +name: ips_test channels: - conda-forge dependencies: - pytest-cov - pytest-timeout - psutil -- dask=2021.10.0 +- dask=2021.11.1 diff --git a/.github/workflows/conda_env/environment_minimal.yml b/.github/workflows/conda_env/environment_minimal.yml index e3548b99..918096d3 100644 --- a/.github/workflows/conda_env/environment_minimal.yml +++ b/.github/workflows/conda_env/environment_minimal.yml @@ -1,4 +1,4 @@ -name: test +name: ips_test_minimal channels: - conda-forge dependencies: diff --git a/.github/workflows/conda_env/environment_static_analysis.yml b/.github/workflows/conda_env/environment_static_analysis.yml index d79ad9f4..93d8893a 100644 --- a/.github/workflows/conda_env/environment_static_analysis.yml +++ b/.github/workflows/conda_env/environment_static_analysis.yml @@ -1,4 +1,4 @@ -name: static_analysis +name: ips_static_analysis channels: - conda-forge dependencies: @@ -7,4 +7,4 @@ dependencies: - pylint=2.11.1 - bandit=1.7.0 - codespell=2.1.0 -- dask=2021.10.0 +- dask=2021.11.1 diff --git a/ipsframework/dakota_bridge.py b/ipsframework/dakota_bridge.py index 61307a93..214e2dd5 100644 --- a/ipsframework/dakota_bridge.py +++ b/ipsframework/dakota_bridge.py @@ -13,7 +13,7 @@ class Driver(Component): def __init__(self, services, config): - Component.__init__(self, services, config) + super().__init__(services, config) self.done = False self.events_received = [] self.socket_address = '' diff --git a/ipsframework/ipsExceptions.py b/ipsframework/ipsExceptions.py index aace0a01..9887762f 100644 --- a/ipsframework/ipsExceptions.py +++ b/ipsframework/ipsExceptions.py @@ -69,11 +69,34 @@ def __init__(self, caller_id, tid, nproc, ppn, max_procs, max_ppn): self.args = (caller_id, tid, nproc, ppn, max_procs, max_ppn) def __str__(self): - s = "component %s requested %d processes with %d processes per node, while the number of processes requested"\ + s = "component %s requested %d processes with %d processes per node, while the number of processes requested "\ "is less than the max (%d), the processes per node value is too low." % (self.caller_id, self.nproc, self.ppn, self.max_procs) return s +class ResourceRequestUnequalPartitioningException(Exception): + """Exception raised by the resource manager when it is possible to + launch the requested number of processes, but the requested number + of processes and processes per node will result in unequal + partitioning of nodes. + """ + + def __init__(self, caller_id, tid, nproc, ppn, max_procs, max_ppn): + super().__init__() + self.caller_id = caller_id + self.task_id = tid + self.nproc = nproc + self.ppn = ppn + self.max_procs = max_procs + self.max_ppn = max_ppn + self.args = (caller_id, tid, nproc, ppn, max_procs, max_ppn) + + def __str__(self): + s = "component %s requested %d processes with %d processes per node, while the number of processes requested is less than the max (%d), "\ + "it will result in unequal partitioning of processes across nodes" % (self.caller_id, self.nproc, self.ppn, self.max_procs) + return s + + class InvalidResourceSettingsException(Exception): """ Exception raised by the resource helper to indicate inconsistent resource settings. diff --git a/ipsframework/ipsLogging.py b/ipsframework/ipsLogging.py index e3bbb56e..b9b2cdc8 100644 --- a/ipsframework/ipsLogging.py +++ b/ipsframework/ipsLogging.py @@ -19,10 +19,9 @@ class myLogRecordStreamHandler(socketserver.StreamRequestHandler): def __init__(self, request, client_address, server, handler): self.handler = handler - socketserver.StreamRequestHandler.__init__(self, - request, - client_address, - server) + super().__init__(request, + client_address, + server) def handle(self): """ @@ -68,7 +67,7 @@ class LogRecordSocketReceiver(socketserver.ThreadingUnixStreamServer): def __init__(self, log_pipe, handler=myLogRecordStreamHandler): - socketserver.UnixStreamServer.__init__(self, log_pipe, handler) + super().__init__(log_pipe, handler) def get_file_no(self): return self.socket.fileno() diff --git a/ipsframework/messages.py b/ipsframework/messages.py index 6b8e9465..3c21ced8 100644 --- a/ipsframework/messages.py +++ b/ipsframework/messages.py @@ -46,7 +46,7 @@ class ServiceRequestMessage(Message): identifier = 'REQUEST' def __init__(self, sender_id, receiver_id, target_comp_id, target_method, *args, **keywords): - Message.__init__(self, sender_id, receiver_id) + super().__init__(sender_id, receiver_id) self.target_comp_id = target_comp_id self.target_method = target_method self.args = args @@ -70,7 +70,7 @@ class ServiceResponseMessage(Message): identifier = 'RESPONSE' def __init__(self, sender_id, receiver_id, request_msg_id, status, *args): - Message.__init__(self, sender_id, receiver_id) + super().__init__(sender_id, receiver_id) self.request_msg_id = request_msg_id self.status = status self.args = args @@ -92,7 +92,7 @@ class MethodInvokeMessage(Message): identifier = 'INVOKE' def __init__(self, sender_id, receiver_id, call_id, target_method, *args, **keywords): - Message.__init__(self, sender_id, receiver_id) + super().__init__(sender_id, receiver_id) self.call_id = call_id self.target_method = target_method self.args = args @@ -115,7 +115,7 @@ class MethodResultMessage(Message): identifier = 'RESULT' def __init__(self, sender_id, receiver_id, call_id, status, *args): - Message.__init__(self, sender_id, receiver_id) + super().__init__(sender_id, receiver_id) self.call_id = call_id self.args = args self.status = status diff --git a/ipsframework/portalBridge.py b/ipsframework/portalBridge.py index dbadb427..de4ba394 100644 --- a/ipsframework/portalBridge.py +++ b/ipsframework/portalBridge.py @@ -80,7 +80,7 @@ def __init__(self, services, config): Declaration of private variables and initialization of :py:class:`component.Component` object. """ - Component.__init__(self, services, config) + super().__init__(services, config) self.host = '' self.curTime = time.localtime() self.startTime = self.curTime diff --git a/ipsframework/resourceManager.py b/ipsframework/resourceManager.py index 535a9e3d..c4deb05a 100644 --- a/ipsframework/resourceManager.py +++ b/ipsframework/resourceManager.py @@ -5,9 +5,10 @@ import os import time from math import ceil -from .ipsExceptions import InsufficientResourcesException, \ - BadResourceRequestException, \ - ResourceRequestMismatchException +from .ipsExceptions import (InsufficientResourcesException, + BadResourceRequestException, + ResourceRequestMismatchException, + ResourceRequestUnequalPartitioningException) from .ips_es_spec import eventManager from .resourceHelper import getResourceList from .node_structure import Node @@ -328,6 +329,11 @@ def get_allocation(self, comp_id, nproc, task_id, c = ceil(float(nproc) / ppn) raise InsufficientResourcesException(comp_id, task_id, c, c - len(self.avail_nodes)) + if nodes == "unequal": + raise ResourceRequestUnequalPartitioningException(comp_id, task_id, + nproc, ppn, + self.total_cores, + self.max_ppn) else: try: self.processes += nproc @@ -441,7 +447,10 @@ def check_whole_node_cap(self, nproc, ppn): whole_cap += ppn nodes.append(n) if whole_cap >= nproc: - return True, nodes + if nproc > ppn and nproc % ppn != 0: + return False, "unequal" + else: + return True, nodes except Exception: self.fwk.exception("problem in RM.check_whole_node_cap") raise diff --git a/ipsframework/runspaceInitComponent.py b/ipsframework/runspaceInitComponent.py index 8e2fc1a3..6c8b910b 100644 --- a/ipsframework/runspaceInitComponent.py +++ b/ipsframework/runspaceInitComponent.py @@ -29,7 +29,7 @@ def __init__(self, services, config): Declaration of private variables and initialization of :py:class:`component.Component` object. """ - Component.__init__(self, services, config) + super().__init__(services, config) # get the simRootDir self.simRootDir = services.get_config_param('SIM_ROOT') self.cwd = self.config['OS_CWD'] diff --git a/ipsframework/taskManager.py b/ipsframework/taskManager.py index 7276efb0..5ee6aa28 100644 --- a/ipsframework/taskManager.py +++ b/ipsframework/taskManager.py @@ -553,14 +553,14 @@ def init_task_pool(self, init_task_msg): except BadResourceRequestException as e: self.fwk.error("There has been a fatal error, %s requested %d too many processors in task %d", caller_id, e.deficit, e.task_id) - for (task_id, cmd) in list(ret_dict.values()): + for task_id, _, _ in ret_dict.values(): self.resource_mgr.release_allocation(task_id, -1) del self.curr_task_table[task_id] raise except ResourceRequestMismatchException as e: self.fwk.error("There has been a fatal error, %s requested too few processors per node to launch task %d (request: procs = %d, ppn = %d)", caller_id, e.task_id, e.nproc, e.ppn) - for (task_id, cmd) in list(ret_dict.values()): + for task_id, _, _ in ret_dict.values(): self.resource_mgr.release_allocation(task_id, -1) del self.curr_task_table[task_id] raise diff --git a/tests/new/test_resourceManager.py b/tests/new/test_resourceManager.py index fd172881..4827290d 100644 --- a/tests/new/test_resourceManager.py +++ b/tests/new/test_resourceManager.py @@ -4,7 +4,8 @@ from ipsframework.resourceManager import ResourceManager from ipsframework.ipsExceptions import (InsufficientResourcesException, BadResourceRequestException, - ResourceRequestMismatchException) + ResourceRequestMismatchException, + ResourceRequestUnequalPartitioningException) def test_allocations(tmpdir): @@ -67,6 +68,17 @@ def test_allocations(tmpdir): assert "component comp0 requested 3 nodes, which is more than possible by 1 nodes, for task 0." == str(excinfo.value) + with pytest.raises(ResourceRequestUnequalPartitioningException) as excinfo: + rm.get_allocation(comp_id='comp0', + nproc=3, + task_id=0, + whole_nodes=True, + whole_socks=False, + task_ppn=2) + + assert "component comp0 requested 3 processes with 2 processes per node, while the number of processes requested is less than the max (8), "\ + "it will result in unequal partitioning of processes across nodes" == str(excinfo.value) + with pytest.raises(BadResourceRequestException) as excinfo: rm.get_allocation(comp_id='comp0', nproc=12, @@ -84,7 +96,7 @@ def test_allocations(tmpdir): whole_socks=False, task_ppn=2) - assert ("component comp0 requested 6 processes with 2 processes per node, while the number of processes requestedis less than the max (8), " + assert ("component comp0 requested 6 processes with 2 processes per node, while the number of processes requested is less than the max (8), " "the processes per node value is too low." == str(excinfo.value)) rm.get_allocation(comp_id='comp0', diff --git a/tests/new/test_taskManager.py b/tests/new/test_taskManager.py index df9670b7..43182c65 100644 --- a/tests/new/test_taskManager.py +++ b/tests/new/test_taskManager.py @@ -1,4 +1,10 @@ -from ipsframework.taskManager import TaskManager +from ipsframework import TaskManager, ResourceManager +from ipsframework.messages import ServiceRequestMessage +from ipsframework.ipsExceptions import (BadResourceRequestException, + ResourceRequestMismatchException, + BlockedMessageException, + InsufficientResourcesException, + ResourceRequestUnequalPartitioningException) import pytest import shutil from unittest import mock @@ -315,7 +321,7 @@ def test_build_launch_cmd_srun(): tm.task_launch_cmd = 'srun' tm.resource_mgr = mock.Mock(nodes=['node1']) - cmd = tm.build_launch_cmd(nproc=1, + cmd = tm.build_launch_cmd(nproc=4, binary='executable', cmd_args=(), working_dir=None, @@ -326,9 +332,9 @@ def test_build_launch_cmd_srun(): partial_nodes=None, task_id=None) - assert cmd == ('srun -N 2 -n 1 executable ', None) + assert cmd == ('srun -N 2 -n 4 executable ', None) - cmd = tm.build_launch_cmd(nproc=1, + cmd = tm.build_launch_cmd(nproc=4, binary='executable', cmd_args=('13', '42'), working_dir=None, @@ -339,4 +345,200 @@ def test_build_launch_cmd_srun(): partial_nodes=None, task_id=None) - assert cmd == ('srun -N 2 -n 1 executable 13 42', None) + assert cmd == ('srun -N 2 -n 4 executable 13 42', None) + + +def test_init_task_srun(tmpdir): + # this will combine calls to ResourceManager.get_allocation and + # TaskManager.build_launch_cmd + + fwk = mock.Mock() + dm = mock.Mock() + cm = mock.Mock() + cm.fwk_sim_name = 'sim_name' + cm.sim_map = {'sim_name': mock.Mock(sim_root=str(tmpdir))} + cm.get_platform_parameter.return_value = 'HOST' + + tm = TaskManager(fwk) + + rm = ResourceManager(fwk) + + tm.initialize(dm, rm, cm) + rm.initialize(dm, tm, cm, + cmd_nodes=2, + cmd_ppn=2) + + tm.task_launch_cmd = 'srun' + rm.accurateNodes = True + + def init_final_task(nproc, tppn): + task_id, cmd, _ = tm.init_task(ServiceRequestMessage('id', 'id', 'c', 'init_task', + nproc, 'exe', '/dir', tppn, True, + True, True)) + tm.finish_task(ServiceRequestMessage('id', 'id', 'c', 'finish_task', + task_id, None)) + return task_id, cmd + + task_id, cmd = init_final_task(1, 0) + assert task_id == 1 + assert cmd == "srun -N 1 -n 1 exe " + + task_id, cmd = init_final_task(2, 0) + assert task_id == 2 + assert cmd == "srun -N 1 -n 2 exe " + + with pytest.raises(ResourceRequestUnequalPartitioningException): + init_final_task(3, 0) + + task_id, cmd = init_final_task(4, 0) + assert task_id == 4 + assert cmd == "srun -N 2 -n 4 exe " + + with pytest.raises(BadResourceRequestException): + init_final_task(5, 0) + + task_id, cmd = init_final_task(1, 1) + assert task_id == 6 + assert cmd == "srun -N 1 -n 1 exe " + + task_id, cmd = init_final_task(2, 1) + assert task_id == 7 + assert cmd == "srun -N 2 -n 2 exe " + + with pytest.raises(ResourceRequestMismatchException): + init_final_task(3, 1) + + # start two task, second should fail with Insufficient Resources depending on block + task_id, cmd, _ = tm.init_task(ServiceRequestMessage('id', 'id', 'c', 'init_task', + 4, 'exe', '/dir', 0, True, + True, True)) + + with pytest.raises(BlockedMessageException): + tm.init_task(ServiceRequestMessage('id', 'id', 'c', 'init_task', + 1, 'exe', '/dir', 0, True, + True, True)) + + with pytest.raises(InsufficientResourcesException): + tm.init_task(ServiceRequestMessage('id', 'id', 'c', 'init_task', + 1, 'exe', '/dir', 0, False, + True, True)) + + +def test_init_task_pool_srun(tmpdir): + # this will combine calls to ResourceManager.get_allocation and + # TaskManager.build_launch_cmd + + fwk = mock.Mock() + dm = mock.Mock() + cm = mock.Mock() + cm.fwk_sim_name = 'sim_name' + cm.sim_map = {'sim_name': mock.Mock(sim_root=str(tmpdir))} + cm.get_platform_parameter.return_value = 'HOST' + + tm = TaskManager(fwk) + + rm = ResourceManager(fwk) + + tm.initialize(dm, rm, cm) + rm.initialize(dm, tm, cm, + cmd_nodes=2, + cmd_ppn=2) + + tm.task_launch_cmd = 'srun' + rm.accurateNodes = True + + def init_final_task_pool(nproc=1, tppn=0, number_of_tasks=1, msg=None): + if msg is None: + msg = {f'task{n}': (nproc, '/dir', f'exe{n}', (f'arg{n}',), tppn, True, False) for n in range(number_of_tasks)} + retval = tm.init_task_pool(ServiceRequestMessage('id', 'id', 'c', 'init_task_pool', msg)) + for task_id, _, _ in retval.values(): + tm.finish_task(ServiceRequestMessage('id', 'id', 'c', 'finish_task', + task_id, None)) + return retval + + retval = init_final_task_pool(1, 0, 1) + assert len(retval) == 1 + task_id, cmd, _ = retval['task0'] + assert task_id == 1 + assert cmd == 'srun -N 1 -n 1 exe0 arg0' + + retval = init_final_task_pool(2, 0, 1) + assert len(retval) == 1 + task_id, cmd, _ = retval['task0'] + assert task_id == 2 + assert cmd == 'srun -N 1 -n 2 exe0 arg0' + + with pytest.raises(ResourceRequestUnequalPartitioningException): + init_final_task_pool(3, 0, 1) + + retval = init_final_task_pool(4, 0, 1) + assert len(retval) == 1 + task_id, cmd, _ = retval['task0'] + assert task_id == 4 + assert cmd == 'srun -N 2 -n 4 exe0 arg0' + + with pytest.raises(BadResourceRequestException): + init_final_task_pool(5, 0, 1) + + retval = init_final_task_pool(1, 1, 1) + assert len(retval) == 1 + task_id, cmd, _ = retval['task0'] + assert task_id == 6 + assert cmd == 'srun -N 1 -n 1 exe0 arg0' + + retval = init_final_task_pool(2, 1, 1) + assert len(retval) == 1 + task_id, cmd, _ = retval['task0'] + assert task_id == 7 + assert cmd == 'srun -N 2 -n 2 exe0 arg0' + + with pytest.raises(ResourceRequestMismatchException): + init_final_task_pool(3, 1, 1) + + retval = init_final_task_pool(1, 0, 2) + assert len(retval) == 2 + task_id, cmd, _ = retval['task0'] + assert task_id == 9 + assert cmd == 'srun -N 1 -n 1 exe0 arg0' + task_id, cmd, _ = retval['task1'] + assert task_id == 10 + assert cmd == 'srun -N 1 -n 1 exe1 arg1' + + retval = init_final_task_pool(2, 0, 2) + assert len(retval) == 2 + task_id, cmd, _ = retval['task0'] + assert task_id == 11 + assert cmd == 'srun -N 1 -n 2 exe0 arg0' + task_id, cmd, _ = retval['task1'] + assert task_id == 12 + assert cmd == 'srun -N 1 -n 2 exe1 arg1' + + retval = init_final_task_pool(4, 0, 2) + assert len(retval) == 1 + task_id, cmd, _ = retval['task0'] + assert task_id == 13 + assert cmd == 'srun -N 2 -n 4 exe0 arg0' + + # different size tasks + msg = {'task0': (1, '/dir', 'exe0', ('arg0',), 0, True, False), + 'task1': (2, '/dir', 'exe1', ('arg1',), 0, True, False)} + retval = init_final_task_pool(msg=msg) + assert len(retval) == 2 + task_id, cmd, _ = retval['task0'] + assert task_id == 15 + assert cmd == 'srun -N 1 -n 1 exe0 arg0' + task_id, cmd, _ = retval['task1'] + assert task_id == 16 + assert cmd == 'srun -N 1 -n 2 exe1 arg1' + + # one good task, one bad task + msg = {'task0': (1, '/dir', 'exe0', ('arg0',), 0, True, False), + 'task1': (5, '/dir', 'exe1', ('arg1',), 0, True, False)} + with pytest.raises(BadResourceRequestException): + init_final_task_pool(msg=msg) + + # one good task, one bad task + msg = {'task0': (1, '/dir', 'exe0', ('arg0',), 0, True, False), + 'task1': (3, '/dir', 'exe1', ('arg1',), 1, True, False)} + with pytest.raises(ResourceRequestMismatchException): + init_final_task_pool(msg=msg)