From 92cff6748ed9396b7aaff31a51c8f0b6d5580343 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Wed, 3 Jun 2026 08:09:34 +0200 Subject: [PATCH 01/14] [Bug] Ignore duplicate entries in cache --- src/executorlib/standalone/hdf.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/executorlib/standalone/hdf.py b/src/executorlib/standalone/hdf.py index dffc5d4b2..5f1b5d69c 100644 --- a/src/executorlib/standalone/hdf.py +++ b/src/executorlib/standalone/hdf.py @@ -33,9 +33,10 @@ def dump(file_name: Optional[str], data_dict: dict) -> None: os.makedirs(os.path.dirname(file_name_abs), exist_ok=True) with h5py.File(file_name_abs, "a") as fname: for data_key, data_value in data_dict.items(): - if data_key in group_dict: + path = "/" + group_dict[data_key] + if data_key in group_dict and path not in fname: fname.create_dataset( - name="/" + group_dict[data_key], + name=path, data=np.void(cloudpickle.dumps(data_value)), ) From 4e1688737345e7f785ad8abde09e8050428c8b60 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Jan=C3=9Fen?= Date: Wed, 3 Jun 2026 08:33:55 +0200 Subject: [PATCH 02/14] fix tests --- src/executorlib/standalone/hdf.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/executorlib/standalone/hdf.py b/src/executorlib/standalone/hdf.py index 5f1b5d69c..f6a52d3a0 100644 --- a/src/executorlib/standalone/hdf.py +++ b/src/executorlib/standalone/hdf.py @@ -33,10 +33,9 @@ def dump(file_name: Optional[str], data_dict: dict) -> None: os.makedirs(os.path.dirname(file_name_abs), exist_ok=True) with h5py.File(file_name_abs, "a") as fname: for data_key, data_value in data_dict.items(): - path = "/" + group_dict[data_key] - if data_key in group_dict and path not in fname: + if data_key in group_dict and "/" + group_dict[data_key] not in fname: fname.create_dataset( - name=path, + name="/" + group_dict[data_key], data=np.void(cloudpickle.dumps(data_value)), ) From 9d4c0adaa31fbc285159bde2baa264fa8c591a81 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 3 Jun 2026 09:28:30 +0200 Subject: [PATCH 03/14] Add test --- tests/unit/executor/test_single_cache.py | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/tests/unit/executor/test_single_cache.py b/tests/unit/executor/test_single_cache.py index f04bd4dc1..b80619b6e 100644 --- a/tests/unit/executor/test_single_cache.py +++ b/tests/unit/executor/test_single_cache.py @@ -1,6 +1,7 @@ import os import shutil import unittest +from time import sleep from executorlib import SingleNodeExecutor, get_cache_data from executorlib.standalone.serialize import cloudpickle_register @@ -17,6 +18,11 @@ def get_error(a): raise ValueError(a) +def sum_with_wait(a, b): + sleep((a + b) / 10) + return a + b + + class AddClass: def __call__(self, a, b): return a+b @@ -69,6 +75,13 @@ def test_cache_key(self): sum([sum(c["input_args"][0]) for c in cache_lst]), sum(result_lst) ) + def test_cache_duplicate_function(self): + cache_directory = os.path.abspath("cache_duplicate") + with SingleNodeExecutor(hostname_localhost=True, cache_directory=cache_directory) as exe: + f1 = exe.submit(sum_with_wait, 1, 1) + f2 = exe.submit(sum_with_wait, 1, 1) + self.assertEqual(f1.result(), f2.result()) + def test_cache_error(self): cache_directory = os.path.abspath("cache_error") with SingleNodeExecutor(cache_directory=cache_directory) as exe: @@ -93,3 +106,4 @@ def test_cache_error_file(self): def tearDown(self): shutil.rmtree("executorlib_cache", ignore_errors=True) shutil.rmtree("cache_error", ignore_errors=True) + shutil.rmtree("cache_duplicate", ignore_errors=True) From 8aa864b1523f77555479f85ac90405b83d3247c3 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 3 Jun 2026 09:39:12 +0200 Subject: [PATCH 04/14] fix error --- src/executorlib/standalone/hdf.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/executorlib/standalone/hdf.py b/src/executorlib/standalone/hdf.py index f6a52d3a0..5fa406a93 100644 --- a/src/executorlib/standalone/hdf.py +++ b/src/executorlib/standalone/hdf.py @@ -1,4 +1,5 @@ import os +import contextlib from concurrent.futures import Future from time import sleep from typing import Any, Optional @@ -33,11 +34,12 @@ def dump(file_name: Optional[str], data_dict: dict) -> None: os.makedirs(os.path.dirname(file_name_abs), exist_ok=True) with h5py.File(file_name_abs, "a") as fname: for data_key, data_value in data_dict.items(): - if data_key in group_dict and "/" + group_dict[data_key] not in fname: - fname.create_dataset( - name="/" + group_dict[data_key], - data=np.void(cloudpickle.dumps(data_value)), - ) + if data_key in group_dict: + with contextlib.suppress(ValueError): + fname.create_dataset( + name="/" + group_dict[data_key], + data=np.void(cloudpickle.dumps(data_value)), + ) def load(file_name: str) -> dict: From 501f3404a7fd66ca5624dc70f842233661ccad04 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 3 Jun 2026 07:39:49 +0000 Subject: [PATCH 05/14] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/executorlib/standalone/hdf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/executorlib/standalone/hdf.py b/src/executorlib/standalone/hdf.py index 5fa406a93..daac1e467 100644 --- a/src/executorlib/standalone/hdf.py +++ b/src/executorlib/standalone/hdf.py @@ -1,5 +1,5 @@ -import os import contextlib +import os from concurrent.futures import Future from time import sleep from typing import Any, Optional From 7fdbe1c48f342e7c4bdd3dd3ee5d06415346356d Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 3 Jun 2026 10:04:13 +0200 Subject: [PATCH 06/14] Add duplicate to file based executors --- src/executorlib/task_scheduler/file/shared.py | 26 +++++++++++++++++-- tests/unit/executor/test_api.py | 11 ++++++++ 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/src/executorlib/task_scheduler/file/shared.py b/src/executorlib/task_scheduler/file/shared.py index e18a1bcca..af1191813 100644 --- a/src/executorlib/task_scheduler/file/shared.py +++ b/src/executorlib/task_scheduler/file/shared.py @@ -91,6 +91,7 @@ def execute_tasks_h5( process_dict: dict = {} cache_dir_dict: dict = {} file_name_dict: dict = {} + duplicate_dict: dict = {} while True: task_dict = None with contextlib.suppress(queue.Empty): @@ -101,6 +102,7 @@ def execute_tasks_h5( cancel_futures=task_dict.get("cancel_futures", False), memory_dict=memory_dict, process_dict=process_dict, + duplicate_dict=duplicate_dict, cache_dir_dict=cache_dir_dict, terminate_function=terminate_function, pysqa_config_directory=pysqa_config_directory, @@ -175,12 +177,17 @@ def execute_tasks_h5( process_dict[task_key] = queue_id memory_dict[task_key] = task_dict["future"] cache_dir_dict[task_key] = cache_directory + elif memory_dict[task_key] != task_dict["future"]: + if task_key not in duplicate_dict: + duplicate_dict[task_key] = [] + duplicate_dict[task_key].append(task_dict["future"]) future_queue.task_done() else: memory_dict = _refresh_memory_dict( memory_dict=memory_dict, cache_dir_dict=cache_dir_dict, process_dict=process_dict, + duplicate_dict=duplicate_dict, terminate_function=terminate_function, pysqa_config_directory=pysqa_config_directory, backend=backend, @@ -189,7 +196,7 @@ def execute_tasks_h5( def _check_task_output( - task_key: str, future_obj: Future, cache_directory: str + task_key: str, future_obj: Future, cache_directory: str, duplicate_dict: dict ) -> Future: """ Check the output of a task and set the result of the future object if available. @@ -198,7 +205,7 @@ def _check_task_output( task_key (str): The key of the task. future_obj (Future): The future object associated with the task. cache_directory (str): The directory where the HDF5 files are stored. - + duplicate_dict (dict): The dictionary mapping task keys to their associated duplicate future objects. Returns: Future: The updated future object. @@ -211,6 +218,13 @@ def _check_task_output( future_obj.set_result(result) elif exec_flag: future_obj.set_exception(result) + if task_key in duplicate_dict: + for duplicate_future in duplicate_dict[task_key]: + if exec_flag and no_error_flag: + duplicate_future.set_result(result) + elif exec_flag: + duplicate_future.set_exception(result) + del duplicate_dict[task_key] return future_obj @@ -281,6 +295,7 @@ def _refresh_memory_dict( memory_dict: dict, cache_dir_dict: dict, process_dict: dict, + duplicate_dict: dict, terminate_function: Optional[Callable] = None, pysqa_config_directory: Optional[str] = None, backend: Optional[str] = None, @@ -293,6 +308,7 @@ def _refresh_memory_dict( memory_dict (dict): dictionary with task keys and future objects cache_dir_dict (dict): dictionary with task keys and cache directories process_dict (dict): dictionary with task keys and process reference. + duplicate_dict (dict): dictionary with task keys and duplicate future objects. terminate_function (callable): The function to terminate the tasks. pysqa_config_directory (str): path to the pysqa config directory (only for pysqa based backend). backend (str): name of the backend used to spawn tasks. @@ -315,6 +331,7 @@ def _refresh_memory_dict( task_key=key, future_obj=value, cache_directory=cache_dir_dict[key], + duplicate_dict=duplicate_dict, ) for key, value in memory_dict.items() if not value.done() @@ -399,6 +416,7 @@ def _shutdown_executor( cancel_futures: bool, memory_dict: dict, process_dict: dict, + duplicate_dict: dict, cache_dir_dict: dict, terminate_function: Optional[Callable] = None, pysqa_config_directory: Optional[str] = None, @@ -421,6 +439,7 @@ def _shutdown_executor( cancel_futures (bool): Whether to cancel futures that have not yet started. memory_dict (dict): Mapping of task keys to their Future objects. process_dict (dict): Mapping of task keys to process handles or queue IDs. + duplicate_dict (dict): Mapping of task keys to lists of duplicate Future objects. cache_dir_dict (dict): Mapping of task keys to the cache directory for each task. terminate_function (Callable, optional): Function used to terminate running processes. pysqa_config_directory (str, optional): Path to the pysqa config directory. @@ -433,6 +452,7 @@ def _shutdown_executor( memory_dict=memory_dict, cache_dir_dict=cache_dir_dict, process_dict=process_dict, + duplicate_dict=duplicate_dict, terminate_function=terminate_function, pysqa_config_directory=pysqa_config_directory, backend=backend, @@ -447,6 +467,7 @@ def _shutdown_executor( memory_dict=memory_dict, cache_dir_dict=cache_dir_dict, process_dict=process_dict, + duplicate_dict=duplicate_dict, terminate_function=terminate_function, pysqa_config_directory=pysqa_config_directory, backend=backend, @@ -465,6 +486,7 @@ def _shutdown_executor( memory_dict=memory_dict, cache_dir_dict=cache_dir_dict, process_dict=process_dict, + duplicate_dict=duplicate_dict, terminate_function=terminate_function, pysqa_config_directory=pysqa_config_directory, backend=backend, diff --git a/tests/unit/executor/test_api.py b/tests/unit/executor/test_api.py index 160c79f69..1f202c2f2 100644 --- a/tests/unit/executor/test_api.py +++ b/tests/unit/executor/test_api.py @@ -156,6 +156,17 @@ def test_executor_dependency_plot(self): self.assertEqual(len(nodes), 4) self.assertEqual(len(edges), 4) + def test_duplicate_futures(self): + with TestClusterExecutor(cache_directory="cache_dir") as exe: + cloudpickle_register(ind=1) + future_1 = exe.submit(add_with_sleep, 1, parameter_2=2) + future_2 = exe.submit(add_with_sleep, 1, parameter_2=2) + self.assertFalse(future_1.done()) + self.assertFalse(future_2.done()) + self.assertEqual(future_1.result(), 3) + self.assertEqual(future_2.result(), 3) + self.assertEqual(len(os.listdir("cache_dir")), 1) + def test_shutdown_wait_false_cancel_futures_false(self): exe = TestClusterExecutor(cache_directory="shutdown_1_dir") cloudpickle_register(ind=1) From 5d057e8f6f5c52311302bfde8f74197ecc804a06 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 3 Jun 2026 10:08:29 +0200 Subject: [PATCH 07/14] fix test --- tests/unit/task_scheduler/file/test_backend.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/unit/task_scheduler/file/test_backend.py b/tests/unit/task_scheduler/file/test_backend.py index b000a159c..4e1a1ecf3 100644 --- a/tests/unit/task_scheduler/file/test_backend.py +++ b/tests/unit/task_scheduler/file/test_backend.py @@ -50,7 +50,7 @@ def test_execute_function_mixed(self): backend_execute_task_in_file(file_name=file_name) future_obj = Future() _check_task_output( - task_key=task_key, future_obj=future_obj, cache_directory=cache_directory + task_key=task_key, future_obj=future_obj, cache_directory=cache_directory, duplicate_dict={}, ) self.assertTrue(future_obj.done()) self.assertEqual(future_obj.result(), 3) @@ -101,7 +101,7 @@ def test_execute_function_mixed_selector_convert(self): backend_execute_task_in_file(file_name=file_name_1) f1 = Future() _check_task_output( - task_key=task_key_1, future_obj=f1, cache_directory=cache_directory + task_key=task_key_1, future_obj=f1, cache_directory=cache_directory, duplicate_dict={}, ) task_key_2, data_dict = serialize_funct( fn=return_list, @@ -113,7 +113,7 @@ def test_execute_function_mixed_selector_convert(self): backend_execute_task_in_file(file_name=file_name_2) f2 = Future() _check_task_output( - task_key=task_key_2, future_obj=f2, cache_directory=cache_directory + task_key=task_key_2, future_obj=f2, cache_directory=cache_directory, duplicate_dict={}, ) fs1 = FutureSelector(future=f1, selector="a") fs2 = FutureSelector(future=f2, selector=1) @@ -143,7 +143,7 @@ def test_execute_function_args(self): backend_execute_task_in_file(file_name=file_name) future_obj = Future() _check_task_output( - task_key=task_key, future_obj=future_obj, cache_directory=cache_directory + task_key=task_key, future_obj=future_obj, cache_directory=cache_directory, duplicate_dict={}, ) self.assertTrue(future_obj.done()) self.assertEqual(future_obj.result(), 3) @@ -170,7 +170,7 @@ def test_execute_function_kwargs(self): backend_execute_task_in_file(file_name=file_name) future_obj = Future() _check_task_output( - task_key=task_key, future_obj=future_obj, cache_directory=cache_directory + task_key=task_key, future_obj=future_obj, cache_directory=cache_directory, duplicate_dict={}, ) self.assertTrue(future_obj.done()) self.assertEqual(future_obj.result(), 3) @@ -198,7 +198,7 @@ def test_execute_function_error(self): backend_execute_task_in_file(file_name=file_name) future_obj = Future() _check_task_output( - task_key=task_key, future_obj=future_obj, cache_directory=cache_directory + task_key=task_key, future_obj=future_obj, cache_directory=cache_directory, duplicate_dict={}, ) self.assertTrue(future_obj.done()) with self.assertRaises(ValueError): From 188d48080089423759cf180e2f7c9e92eb469c14 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 3 Jun 2026 10:11:28 +0200 Subject: [PATCH 08/14] another fix --- tests/unit/executor/test_api.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/unit/executor/test_api.py b/tests/unit/executor/test_api.py index 1f202c2f2..1a3bc167d 100644 --- a/tests/unit/executor/test_api.py +++ b/tests/unit/executor/test_api.py @@ -213,6 +213,7 @@ def test_shutdown_executor_function(self): cancel_futures=True, memory_dict=memory_dict, process_dict={}, + duplicate_dict={}, cache_dir_dict={"a": "cache_dir"}, terminate_function=None, pysqa_config_directory=None, From 243ffc8e1d9cb11bd09923016fff7ac82bd548f4 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 3 Jun 2026 10:18:22 +0200 Subject: [PATCH 09/14] Define duplicate_dict as optional --- src/executorlib/task_scheduler/file/shared.py | 18 +++++++++--------- tests/unit/executor/test_api.py | 2 +- tests/unit/task_scheduler/file/test_backend.py | 12 ++++++------ 3 files changed, 16 insertions(+), 16 deletions(-) diff --git a/src/executorlib/task_scheduler/file/shared.py b/src/executorlib/task_scheduler/file/shared.py index af1191813..af4a56b1f 100644 --- a/src/executorlib/task_scheduler/file/shared.py +++ b/src/executorlib/task_scheduler/file/shared.py @@ -196,7 +196,7 @@ def execute_tasks_h5( def _check_task_output( - task_key: str, future_obj: Future, cache_directory: str, duplicate_dict: dict + task_key: str, future_obj: Future, cache_directory: str, duplicate_dict: Optional[dict] = None ) -> Future: """ Check the output of a task and set the result of the future object if available. @@ -218,7 +218,7 @@ def _check_task_output( future_obj.set_result(result) elif exec_flag: future_obj.set_exception(result) - if task_key in duplicate_dict: + if duplicate_dict is not None and task_key in duplicate_dict: for duplicate_future in duplicate_dict[task_key]: if exec_flag and no_error_flag: duplicate_future.set_result(result) @@ -295,7 +295,7 @@ def _refresh_memory_dict( memory_dict: dict, cache_dir_dict: dict, process_dict: dict, - duplicate_dict: dict, + duplicate_dict: Optional[dict] = None, terminate_function: Optional[Callable] = None, pysqa_config_directory: Optional[str] = None, backend: Optional[str] = None, @@ -412,12 +412,12 @@ def _cancel_futures(future_dict: dict): def _shutdown_executor( - wait: bool, - cancel_futures: bool, - memory_dict: dict, - process_dict: dict, - duplicate_dict: dict, - cache_dir_dict: dict, + wait: bool = True, + cancel_futures: bool = False, + memory_dict: Optional[dict] = None, + process_dict: Optional[dict] = None, + cache_dir_dict: Optional[dict] = None, + duplicate_dict: Optional[dict] = None, terminate_function: Optional[Callable] = None, pysqa_config_directory: Optional[str] = None, backend: Optional[str] = None, diff --git a/tests/unit/executor/test_api.py b/tests/unit/executor/test_api.py index 1a3bc167d..5fadc6ebd 100644 --- a/tests/unit/executor/test_api.py +++ b/tests/unit/executor/test_api.py @@ -213,7 +213,7 @@ def test_shutdown_executor_function(self): cancel_futures=True, memory_dict=memory_dict, process_dict={}, - duplicate_dict={}, + duplicate_dict=None, cache_dir_dict={"a": "cache_dir"}, terminate_function=None, pysqa_config_directory=None, diff --git a/tests/unit/task_scheduler/file/test_backend.py b/tests/unit/task_scheduler/file/test_backend.py index 4e1a1ecf3..bf189f7e3 100644 --- a/tests/unit/task_scheduler/file/test_backend.py +++ b/tests/unit/task_scheduler/file/test_backend.py @@ -50,7 +50,7 @@ def test_execute_function_mixed(self): backend_execute_task_in_file(file_name=file_name) future_obj = Future() _check_task_output( - task_key=task_key, future_obj=future_obj, cache_directory=cache_directory, duplicate_dict={}, + task_key=task_key, future_obj=future_obj, cache_directory=cache_directory, ) self.assertTrue(future_obj.done()) self.assertEqual(future_obj.result(), 3) @@ -101,7 +101,7 @@ def test_execute_function_mixed_selector_convert(self): backend_execute_task_in_file(file_name=file_name_1) f1 = Future() _check_task_output( - task_key=task_key_1, future_obj=f1, cache_directory=cache_directory, duplicate_dict={}, + task_key=task_key_1, future_obj=f1, cache_directory=cache_directory, ) task_key_2, data_dict = serialize_funct( fn=return_list, @@ -113,7 +113,7 @@ def test_execute_function_mixed_selector_convert(self): backend_execute_task_in_file(file_name=file_name_2) f2 = Future() _check_task_output( - task_key=task_key_2, future_obj=f2, cache_directory=cache_directory, duplicate_dict={}, + task_key=task_key_2, future_obj=f2, cache_directory=cache_directory, ) fs1 = FutureSelector(future=f1, selector="a") fs2 = FutureSelector(future=f2, selector=1) @@ -143,7 +143,7 @@ def test_execute_function_args(self): backend_execute_task_in_file(file_name=file_name) future_obj = Future() _check_task_output( - task_key=task_key, future_obj=future_obj, cache_directory=cache_directory, duplicate_dict={}, + task_key=task_key, future_obj=future_obj, cache_directory=cache_directory, ) self.assertTrue(future_obj.done()) self.assertEqual(future_obj.result(), 3) @@ -170,7 +170,7 @@ def test_execute_function_kwargs(self): backend_execute_task_in_file(file_name=file_name) future_obj = Future() _check_task_output( - task_key=task_key, future_obj=future_obj, cache_directory=cache_directory, duplicate_dict={}, + task_key=task_key, future_obj=future_obj, cache_directory=cache_directory, ) self.assertTrue(future_obj.done()) self.assertEqual(future_obj.result(), 3) @@ -198,7 +198,7 @@ def test_execute_function_error(self): backend_execute_task_in_file(file_name=file_name) future_obj = Future() _check_task_output( - task_key=task_key, future_obj=future_obj, cache_directory=cache_directory, duplicate_dict={}, + task_key=task_key, future_obj=future_obj, cache_directory=cache_directory, ) self.assertTrue(future_obj.done()) with self.assertRaises(ValueError): From ae6e978b5109be0a9717d502c45d022dfe383ea4 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 3 Jun 2026 08:18:34 +0000 Subject: [PATCH 10/14] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/executorlib/task_scheduler/file/shared.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/executorlib/task_scheduler/file/shared.py b/src/executorlib/task_scheduler/file/shared.py index af4a56b1f..990d80cf5 100644 --- a/src/executorlib/task_scheduler/file/shared.py +++ b/src/executorlib/task_scheduler/file/shared.py @@ -196,7 +196,10 @@ def execute_tasks_h5( def _check_task_output( - task_key: str, future_obj: Future, cache_directory: str, duplicate_dict: Optional[dict] = None + task_key: str, + future_obj: Future, + cache_directory: str, + duplicate_dict: Optional[dict] = None, ) -> Future: """ Check the output of a task and set the result of the future object if available. From eb1b5e8f893fd83abf01badb8cd59cdbeb0706a9 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 3 Jun 2026 10:22:04 +0200 Subject: [PATCH 11/14] fixes --- src/executorlib/task_scheduler/file/shared.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/executorlib/task_scheduler/file/shared.py b/src/executorlib/task_scheduler/file/shared.py index 990d80cf5..5f27daa8d 100644 --- a/src/executorlib/task_scheduler/file/shared.py +++ b/src/executorlib/task_scheduler/file/shared.py @@ -415,11 +415,11 @@ def _cancel_futures(future_dict: dict): def _shutdown_executor( - wait: bool = True, - cancel_futures: bool = False, - memory_dict: Optional[dict] = None, - process_dict: Optional[dict] = None, - cache_dir_dict: Optional[dict] = None, + wait: bool, + cancel_futures: bool, + memory_dict: dict, + process_dict: dict, + cache_dir_dict: dict, duplicate_dict: Optional[dict] = None, terminate_function: Optional[Callable] = None, pysqa_config_directory: Optional[str] = None, From 72f690c17a9977917addb4c8b07f4499bca9ac90 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 3 Jun 2026 10:28:59 +0200 Subject: [PATCH 12/14] fix coverage --- src/executorlib/task_scheduler/file/shared.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/src/executorlib/task_scheduler/file/shared.py b/src/executorlib/task_scheduler/file/shared.py index 5f27daa8d..606e8fc28 100644 --- a/src/executorlib/task_scheduler/file/shared.py +++ b/src/executorlib/task_scheduler/file/shared.py @@ -217,19 +217,18 @@ def _check_task_output( if not os.path.exists(file_name): return future_obj exec_flag, no_error_flag, result = get_output(file_name=file_name) - if exec_flag and no_error_flag: - future_obj.set_result(result) - elif exec_flag: - future_obj.set_exception(result) + _update_future(future_obj=future_obj, exec_flag=exec_flag, no_error_flag=no_error_flag, result=result) if duplicate_dict is not None and task_key in duplicate_dict: for duplicate_future in duplicate_dict[task_key]: - if exec_flag and no_error_flag: - duplicate_future.set_result(result) - elif exec_flag: - duplicate_future.set_exception(result) + _update_future(future_obj=duplicate_future, exec_flag=exec_flag, no_error_flag=no_error_flag, result=result) del duplicate_dict[task_key] return future_obj +def _update_future(future_obj: Future, exec_flag: bool, no_error_flag: bool, result: Any) -> None: + if exec_flag and no_error_flag: + future_obj.set_result(result) + elif exec_flag: + future_obj.set_exception(result) def _convert_args_and_kwargs( task_dict: dict, memory_dict: dict, file_name_dict: dict From fe9c8bffb20e70284e586c2ac95e478dc5160b83 Mon Sep 17 00:00:00 2001 From: pyiron-runner Date: Wed, 3 Jun 2026 08:29:56 +0000 Subject: [PATCH 13/14] Format black --- src/executorlib/task_scheduler/file/shared.py | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/src/executorlib/task_scheduler/file/shared.py b/src/executorlib/task_scheduler/file/shared.py index 606e8fc28..38c8e5b03 100644 --- a/src/executorlib/task_scheduler/file/shared.py +++ b/src/executorlib/task_scheduler/file/shared.py @@ -217,19 +217,33 @@ def _check_task_output( if not os.path.exists(file_name): return future_obj exec_flag, no_error_flag, result = get_output(file_name=file_name) - _update_future(future_obj=future_obj, exec_flag=exec_flag, no_error_flag=no_error_flag, result=result) + _update_future( + future_obj=future_obj, + exec_flag=exec_flag, + no_error_flag=no_error_flag, + result=result, + ) if duplicate_dict is not None and task_key in duplicate_dict: for duplicate_future in duplicate_dict[task_key]: - _update_future(future_obj=duplicate_future, exec_flag=exec_flag, no_error_flag=no_error_flag, result=result) + _update_future( + future_obj=duplicate_future, + exec_flag=exec_flag, + no_error_flag=no_error_flag, + result=result, + ) del duplicate_dict[task_key] return future_obj -def _update_future(future_obj: Future, exec_flag: bool, no_error_flag: bool, result: Any) -> None: + +def _update_future( + future_obj: Future, exec_flag: bool, no_error_flag: bool, result: Any +) -> None: if exec_flag and no_error_flag: future_obj.set_result(result) elif exec_flag: future_obj.set_exception(result) + def _convert_args_and_kwargs( task_dict: dict, memory_dict: dict, file_name_dict: dict ) -> tuple[list, dict, list]: From d1701ef31aa5de20daca5b1ef9548faf735c9211 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Wed, 3 Jun 2026 10:40:40 +0200 Subject: [PATCH 14/14] Add docstring --- src/executorlib/task_scheduler/file/shared.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/executorlib/task_scheduler/file/shared.py b/src/executorlib/task_scheduler/file/shared.py index 38c8e5b03..ae7b43b71 100644 --- a/src/executorlib/task_scheduler/file/shared.py +++ b/src/executorlib/task_scheduler/file/shared.py @@ -238,6 +238,15 @@ def _check_task_output( def _update_future( future_obj: Future, exec_flag: bool, no_error_flag: bool, result: Any ) -> None: + """ + Update the future object with the result of the task execution. + + Args: + future_obj (Future): The future object to be updated. + exec_flag (bool): Flag indicating whether the task has been executed. + no_error_flag (bool): Flag indicating whether the task execution resulted in an error. + result (Any): The result of the task execution. + """ if exec_flag and no_error_flag: future_obj.set_result(result) elif exec_flag: