From c6f52d0846cce786e4a3ac6419a4304dd19a09f4 Mon Sep 17 00:00:00 2001 From: Ben McKerry <110857332+bmckerry@users.noreply.github.com> Date: Wed, 17 Jun 2026 15:20:44 -0400 Subject: [PATCH 1/3] ref(worker): add more observability for producer futures --- .../src/taskbroker_client/worker/producer.py | 23 ++--- .../taskbroker_client/worker/workerchild.py | 85 ++++++++++++++++--- clients/python/tests/worker/test_producer.py | 8 +- clients/python/tests/worker/test_worker.py | 16 ++-- 4 files changed, 102 insertions(+), 30 deletions(-) diff --git a/clients/python/src/taskbroker_client/worker/producer.py b/clients/python/src/taskbroker_client/worker/producer.py index 607263d9..09347a5c 100644 --- a/clients/python/src/taskbroker_client/worker/producer.py +++ b/clients/python/src/taskbroker_client/worker/producer.py @@ -1,5 +1,5 @@ import atexit -from collections import deque +from collections import defaultdict, deque from collections.abc import Callable from concurrent.futures import Future from typing import Any, Sequence @@ -14,10 +14,10 @@ # This is global as TaskWorker needs to be able to call TaskProducer.collect_futures() # without a reference to a task's specific instance of TaskProducer. -# Has a max_len to prevent unbounded future growth if TaskProducer.collect_futures() -# is never called. -_pending_futures: deque[ProducerFuture[BrokerValue[KafkaPayload]]] = deque( - maxlen=TASK_PRODUCER_MAX_PENDING_FUTURES +# Keys are the names of each `TaskProducer` instance in the current process, values are +# deques with a maxlen to prevent unbounded queue size if `collect_futures()` is never called. +_pending_futures: defaultdict[str, deque[ProducerFuture[BrokerValue[KafkaPayload]]]] = defaultdict( + lambda: deque(maxlen=TASK_PRODUCER_MAX_PENDING_FUTURES) ) @@ -55,18 +55,21 @@ def _get(self) -> CloseableProducerProtocol: return self._inner_producer def track_future(self, future: ProducerFuture[BrokerValue[KafkaPayload]]) -> None: - _pending_futures.append(future) + _pending_futures[self.name].append(future) self.metrics.gauge( "task.producer.pending.futures", - len(_pending_futures), + len(_pending_futures[self.name]), tags={"producer_name": self.name}, ) @staticmethod - def collect_futures() -> set[ProducerFuture[BrokerValue[KafkaPayload]]]: - futures = _pending_futures.copy() + def collect_futures() -> dict[str, set[ProducerFuture[BrokerValue[KafkaPayload]]]]: + """ + Clears the `_pending_futures` dict, and returns a copy with all values converted to sets. + """ + pending_copy = _pending_futures.copy() _pending_futures.clear() - return set(futures) + return {name: set(val) for name, val in pending_copy.items()} def produce( self, diff --git a/clients/python/src/taskbroker_client/worker/workerchild.py b/clients/python/src/taskbroker_client/worker/workerchild.py index 9dd27c70..7db4521a 100644 --- a/clients/python/src/taskbroker_client/worker/workerchild.py +++ b/clients/python/src/taskbroker_client/worker/workerchild.py @@ -66,7 +66,7 @@ class ActivationWithPendingFutures: status: TaskActivationStatus.ValueType execution_start_time: float futures_start_time: float - pending_futures: set[ProducerFuture[BrokerValue[KafkaPayload]]] + pending_futures: dict[str, set[ProducerFuture[BrokerValue[KafkaPayload]]]] task_func: Task[Any, Any] @@ -263,24 +263,27 @@ def await_task_futures(task: ActivationWithPendingFutures) -> None: and submitted for retry (if the policy allows). """ RESULT_TIMEOUT_SEC = 1 + error_name: str = "N/A" try: # We don't care about the actual result value, # we just care if result() raises or not - [f.result(RESULT_TIMEOUT_SEC) for f in task.pending_futures] + [f.result(RESULT_TIMEOUT_SEC) for fut in task.pending_futures.values() for f in fut] produce_status = "success" # If any pending producer futures failed, retry the task - except Exception: + except Exception as e: task.status = TASK_ACTIVATION_STATUS_FAILURE if task.task_func.retry: retry_state = task.inflight.activation.retry_state if not task.task_func.retry.max_attempts_reached(retry_state): task.status = TASK_ACTIVATION_STATUS_RETRY produce_status = "failure" + error_name = type(e).__name__ metrics.incr( "taskworker.worker.produce.result", tags={ "status": produce_status, "processing_pool": processing_pool_name, + "error_name": error_name, }, ) pending_task_futures.remove(task) @@ -288,10 +291,22 @@ def await_task_futures(task: ActivationWithPendingFutures) -> None: inflight=task.inflight, next_state=task.status, execution_start_time=task.execution_start_time, + execution_end_time=task.futures_start_time, task_func=task.task_func, futures_start_time=task.futures_start_time, ) + def get_oldest_pending_activation() -> ActivationWithPendingFutures | None: + oldest: ActivationWithPendingFutures | None = None + max_age = 0.0 + now = time.time() + for task in pending_task_futures: + age = now - task.futures_start_time + if age > max_age: + max_age = age + oldest = task + return oldest + def check_task_future_completion() -> None: if len(pending_task_futures) > 0: # Records how many activations with pending producer futures @@ -304,8 +319,31 @@ def check_task_future_completion() -> None: }, ) for task in pending_task_futures.copy(): - if all([f.done() for f in task.pending_futures]): + future_status = [f.done() for fut in task.pending_futures.values() for f in fut] + if all(future_status): await_task_futures(task) + else: + # How many futures are still pending in this task + metrics.gauge( + "taskworker.task.incomplete_futures", + len([f for f in future_status if not f]), + tags={ + "processing_pool": processing_pool_name, + "namespace": task.inflight.activation.namespace, + "taskname": task.inflight.activation.taskname, + }, + ) + # How long has the oldest pending task been sitting in the queue + if oldest := get_oldest_pending_activation(): + metrics.gauge( + "taskworker.worker.oldest_pending_activation_age", + time.time() - oldest.futures_start_time, + tags={ + "processing_pool": processing_pool_name, + "namespace": oldest.inflight.activation.namespace, + "taskname": oldest.inflight.activation.taskname, + }, + ) while not shutdown_event.is_set() and not local_shutdown.is_set(): if max_task_count and processed_task_count >= max_task_count: @@ -463,16 +501,30 @@ def check_task_future_completion() -> None: # If the task function itself failed, we don't need to await any # producer futures since it'll be retried anyways if next_state != TASK_ACTIVATION_STATUS_COMPLETE: - task_produced_futures = set() + task_produced_futures = {} if len(task_produced_futures) == 0: _task_execution_complete( inflight, next_state, execution_start_time, + time.time(), task_func, ) else: + for name, futures in task_produced_futures.items(): + # How many futures were produced in the executed task, + # tagged by producer name + metrics.gauge( + "taskworker.task.futures_produced", + len(futures), + tags={ + "producer_name": name, + "processing_pool": processing_pool_name, + "namespace": inflight.activation.namespace, + "taskname": inflight.activation.taskname, + }, + ) pending_task = ActivationWithPendingFutures( inflight=inflight, status=next_state, @@ -582,9 +634,7 @@ def record_task_execution( task_added_time = activation.received_at.ToDatetime().timestamp() execution_duration = completion_time - start_time execution_latency = completion_time - task_added_time - futures_duration = ( - completion_time - futures_enqueued_time if futures_enqueued_time else None - ) + futures_duration = time.time() - futures_enqueued_time if futures_enqueued_time else 0 logger.debug( "taskworker.task_execution", @@ -625,7 +675,7 @@ def record_task_execution( "taskbroker_host": taskbroker_host, }, ) - if futures_duration: + if futures_duration != 0: metrics.distribution( "taskworker.worker.future_completion_duration", futures_duration, @@ -636,6 +686,18 @@ def record_task_execution( "taskbroker_host": taskbroker_host, }, ) + # Latency between task execution start and all producer futures completing + # (i.e. how long it took to put the task in the processed_tasks queue) + metrics.distribution( + "taskworker.worker.e2e_latency", + (execution_duration + futures_duration), + tags={ + "namespace": activation.namespace, + "taskname": activation.taskname, + "processing_pool": processing_pool_name, + "taskbroker_host": taskbroker_host, + }, + ) namespace = app.get_namespace(activation.namespace) metrics.incr( @@ -663,11 +725,10 @@ def _task_execution_complete( inflight: InflightTaskActivation, next_state: TaskActivationStatus.ValueType, execution_start_time: float, + execution_end_time: float, task_func: Task[Any, Any] | None, futures_start_time: float | None = None, ) -> None: - # Get completion time before pushing to queue, so we can measure queue append time - execution_complete_time = time.time() with metrics.timer( "taskworker.worker.processed_tasks.put.duration", tags={ @@ -709,7 +770,7 @@ def _task_execution_complete( inflight.activation, next_state, execution_start_time, - execution_complete_time, + execution_end_time, processing_pool_name, inflight.host, futures_start_time, diff --git a/clients/python/tests/worker/test_producer.py b/clients/python/tests/worker/test_producer.py index e8eafd4b..afb6e52c 100644 --- a/clients/python/tests/worker/test_producer.py +++ b/clients/python/tests/worker/test_producer.py @@ -57,7 +57,8 @@ def test_producer_tracks_futures() -> None: producer = TaskProducer("test.producer", partial(get_dummy_producer, use_simple_futures=True)) producer.produce(Topic("test"), make_kafka_payload()) assert len(_pending_futures) == 1 - future = next(iter(TaskProducer.collect_futures())) + collected = TaskProducer.collect_futures() + future = next(iter(collected["test.producer"])) assert future.result() == make_broker_value() assert len(_pending_futures) == 0 @@ -70,7 +71,8 @@ def callback(future: Future[BrokerValue[KafkaPayload]]) -> None: received.append(future) producer.produce(Topic("test"), make_kafka_payload(), callbacks=[callback]) - tracked_future = next(iter(TaskProducer.collect_futures())) + collected = TaskProducer.collect_futures() + tracked_future = next(iter(collected["test.producer"])) assert len(received) == 1 assert received[0] is tracked_future @@ -91,4 +93,4 @@ def test_pending_futures_max_len() -> None: producer = TaskProducer("test.producer", partial(get_dummy_producer, use_simple_futures=True)) for _ in range(10001): producer.produce(Topic("test"), make_kafka_payload()) - assert len(_pending_futures) == 10000 + assert len(_pending_futures["test.producer"]) == 10000 diff --git a/clients/python/tests/worker/test_worker.py b/clients/python/tests/worker/test_worker.py index 2ce3130a..eaecf269 100644 --- a/clients/python/tests/worker/test_worker.py +++ b/clients/python/tests/worker/test_worker.py @@ -1503,7 +1503,7 @@ def test_child_process_tracks_producer_futures( todo.put(task) with mock.patch.object( - TaskProducer, "collect_futures", return_value={done_future} + TaskProducer, "collect_futures", return_value={"test.producer": {done_future}} ) as collect_mock: child_process( "examples.app:app", @@ -1549,7 +1549,9 @@ def observe_and_resolve() -> None: observer = threading.Thread(target=observe_and_resolve, name="future-observer") observer.start() try: - with mock.patch.object(TaskProducer, "collect_futures", return_value={pending_future}): + with mock.patch.object( + TaskProducer, "collect_futures", return_value={"test.producer": {pending_future}} + ): child_process( "examples.app:app", todo, @@ -1593,7 +1595,9 @@ def deliver_sigterm() -> None: sigterm_thread = threading.Thread(target=deliver_sigterm, name="sigterm-sender") sigterm_thread.start() try: - with mock.patch.object(TaskProducer, "collect_futures", return_value={pending_future}): + with mock.patch.object( + TaskProducer, "collect_futures", return_value={"test.producer": {pending_future}} + ): child_process( "examples.app:app", todo, @@ -1639,7 +1643,9 @@ def test_child_process_retries_on_failed_future( failed_future.set_exception(RuntimeError("kafka produce failed")) todo.put(retriable_task) - with mock.patch.object(TaskProducer, "collect_futures", return_value={failed_future}): + with mock.patch.object( + TaskProducer, "collect_futures", return_value={"test.producer": {failed_future}} + ): child_process( "examples.app:app", todo, @@ -1660,7 +1666,7 @@ def test_child_process_clears_pending_futures_when_task_fails( ) -> None: leftover_future: Future[BrokerValue[KafkaPayload]] = Future() leftover_future.set_result(_make_broker_value()) - _pending_futures.append(leftover_future) + _pending_futures["test.producer"].append(leftover_future) assert len(_pending_futures) == 1 todo: queue.Queue[InflightTaskActivation] = queue.Queue() From 3e6aa192ec9e9f4efd40c74c6d69921eda6a7a8f Mon Sep 17 00:00:00 2001 From: Ben McKerry <110857332+bmckerry@users.noreply.github.com> Date: Wed, 17 Jun 2026 16:35:12 -0400 Subject: [PATCH 2/3] temp: bypass awaiting futures while still recording metrics --- .../taskbroker_client/worker/workerchild.py | 87 ++++++--- clients/python/tests/worker/test_worker.py | 184 +++++++++--------- 2 files changed, 150 insertions(+), 121 deletions(-) diff --git a/clients/python/src/taskbroker_client/worker/workerchild.py b/clients/python/src/taskbroker_client/worker/workerchild.py index 7db4521a..44c8d791 100644 --- a/clients/python/src/taskbroker_client/worker/workerchild.py +++ b/clients/python/src/taskbroker_client/worker/workerchild.py @@ -287,6 +287,8 @@ def await_task_futures(task: ActivationWithPendingFutures) -> None: }, ) pending_task_futures.remove(task) + # FIXME(benm): Use passthrough option to get future-related metrics + # without placing a duplicate ProcessingResult _task_execution_complete( inflight=task.inflight, next_state=task.status, @@ -294,6 +296,7 @@ def await_task_futures(task: ActivationWithPendingFutures) -> None: execution_end_time=task.futures_start_time, task_func=task.task_func, futures_start_time=task.futures_start_time, + passthrough=True, ) def get_oldest_pending_activation() -> ActivationWithPendingFutures | None: @@ -512,6 +515,13 @@ def check_task_future_completion() -> None: task_func, ) else: + # FIXME(benm): Temporarily bypass producer futures needing to be completed + # before writing to `processed_tasks` while still recording metrics. + _place_processing_result( + inflight, + next_state, + task_func, + ) for name, futures in task_produced_futures.items(): # How many futures were produced in the executed task, # tagged by producer name @@ -721,6 +731,43 @@ def record_task_execution( status=monitor_status, ) + def _place_processing_result( + inflight: InflightTaskActivation, + next_state: TaskActivationStatus.ValueType, + task_func: Task[Any, Any] | None, + ) -> None: + if task_func and task_func.retry and next_state == TASK_ACTIVATION_STATUS_RETRY: + processed_tasks.put( + ProcessingResult( + task_id=inflight.activation.id, + status=next_state, + host=inflight.host, + receive_timestamp=inflight.receive_timestamp, + # Send max_attempts and delay_on_retry if this is a retry. + # Don't send it on every task as this codepath is relatively + # unoptimized on the broker side. + max_attempts=( + task_func.retry._times + 1 + if task_func.retry and next_state == TASK_ACTIVATION_STATUS_RETRY + else None + ), + delay_on_retry=( + task_func.retry._delay + if task_func.retry and next_state == TASK_ACTIVATION_STATUS_RETRY + else None + ), + ) + ) + else: + processed_tasks.put( + ProcessingResult( + task_id=inflight.activation.id, + status=next_state, + host=inflight.host, + receive_timestamp=inflight.receive_timestamp, + ) + ) + def _task_execution_complete( inflight: InflightTaskActivation, next_state: TaskActivationStatus.ValueType, @@ -728,6 +775,11 @@ def _task_execution_complete( execution_end_time: float, task_func: Task[Any, Any] | None, futures_start_time: float | None = None, + # FIXME(benm): Temp option to skip placing a task in processed_tasks. + # This is for tasks with pending producer futures, as we still want to record + # metrics as usual but want to have a `ProcessingResult` placed immediately + # while we troubleshoot why futures are never being marked as done. + passthrough: bool = False, ) -> None: with metrics.timer( "taskworker.worker.processed_tasks.put.duration", @@ -735,36 +787,11 @@ def _task_execution_complete( "processing_pool": processing_pool_name, }, ): - if task_func and task_func.retry and next_state == TASK_ACTIVATION_STATUS_RETRY: - processed_tasks.put( - ProcessingResult( - task_id=inflight.activation.id, - status=next_state, - host=inflight.host, - receive_timestamp=inflight.receive_timestamp, - # Send max_attempts and delay_on_retry if this is a retry. - # Don't send it on every task as this codepath is relatively - # unoptimized on the broker side. - max_attempts=( - task_func.retry._times + 1 - if task_func.retry and next_state == TASK_ACTIVATION_STATUS_RETRY - else None - ), - delay_on_retry=( - task_func.retry._delay - if task_func.retry and next_state == TASK_ACTIVATION_STATUS_RETRY - else None - ), - ) - ) - else: - processed_tasks.put( - ProcessingResult( - task_id=inflight.activation.id, - status=next_state, - host=inflight.host, - receive_timestamp=inflight.receive_timestamp, - ) + if not passthrough: + _place_processing_result( + inflight, + next_state, + task_func, ) record_task_execution( inflight.activation, diff --git a/clients/python/tests/worker/test_worker.py b/clients/python/tests/worker/test_worker.py index eaecf269..65e565ed 100644 --- a/clients/python/tests/worker/test_worker.py +++ b/clients/python/tests/worker/test_worker.py @@ -1523,54 +1523,55 @@ def test_child_process_tracks_producer_futures( assert result.status == TASK_ACTIVATION_STATUS_COMPLETE -def test_child_process_holds_result_until_futures_done( - clear_pending_futures: None, restore_signal_handlers: None -) -> None: - task = _producing_task() - todo: queue.Queue[InflightTaskActivation] = queue.Queue() - processed: queue.Queue[ProcessingResult] = queue.Queue() - shutdown = Event() - - pending_future: Future[BrokerValue[KafkaPayload]] = Future() - todo.put(task) - - # `child_process` calls `signal.signal`, which must run on the main thread. - # Use a helper thread to observe the queue while the future is still - # pending, then resolve the future so the drain can complete. - observed_empty_while_pending = threading.Event() - - def observe_and_resolve() -> None: - # Wait for child_process to process the task and enter the drain loop. - time.sleep(0.5) - if processed.qsize() == 0: - observed_empty_while_pending.set() - pending_future.set_result(_make_broker_value()) - - observer = threading.Thread(target=observe_and_resolve, name="future-observer") - observer.start() - try: - with mock.patch.object( - TaskProducer, "collect_futures", return_value={"test.producer": {pending_future}} - ): - child_process( - "examples.app:app", - todo, - processed, - shutdown, - max_task_count=1, - processing_pool_name="test", - process_type="fork", - ) - finally: - observer.join(timeout=5) - shutdown.set() - - assert ( - observed_empty_while_pending.is_set() - ), "result was pushed before the producer future was resolved" - result = processed.get(timeout=5) - assert result.task_id == task.activation.id - assert result.status == TASK_ACTIVATION_STATUS_COMPLETE +# FIXME(benm): Skip this test while we're bypassing awaiting future completion +# def test_child_process_holds_result_until_futures_done( +# clear_pending_futures: None, restore_signal_handlers: None +# ) -> None: +# task = _producing_task() +# todo: queue.Queue[InflightTaskActivation] = queue.Queue() +# processed: queue.Queue[ProcessingResult] = queue.Queue() +# shutdown = Event() + +# pending_future: Future[BrokerValue[KafkaPayload]] = Future() +# todo.put(task) + +# # `child_process` calls `signal.signal`, which must run on the main thread. +# # Use a helper thread to observe the queue while the future is still +# # pending, then resolve the future so the drain can complete. +# observed_empty_while_pending = threading.Event() + +# def observe_and_resolve() -> None: +# # Wait for child_process to process the task and enter the drain loop. +# time.sleep(0.5) +# if processed.qsize() == 0: +# observed_empty_while_pending.set() +# pending_future.set_result(_make_broker_value()) + +# observer = threading.Thread(target=observe_and_resolve, name="future-observer") +# observer.start() +# try: +# with mock.patch.object( +# TaskProducer, "collect_futures", return_value={"test.producer": {pending_future}} +# ): +# child_process( +# "examples.app:app", +# todo, +# processed, +# shutdown, +# max_task_count=1, +# processing_pool_name="test", +# process_type="fork", +# ) +# finally: +# observer.join(timeout=5) +# shutdown.set() + +# assert ( +# observed_empty_while_pending.is_set() +# ), "result was pushed before the producer future was resolved" +# result = processed.get(timeout=5) +# assert result.task_id == task.activation.id +# assert result.status == TASK_ACTIVATION_STATUS_COMPLETE def test_child_process_drains_pending_futures_on_sigterm( @@ -1616,49 +1617,50 @@ def deliver_sigterm() -> None: assert result.status == TASK_ACTIVATION_STATUS_COMPLETE -def test_child_process_retries_on_failed_future( - clear_pending_futures: None, restore_signal_handlers: None -) -> None: - retriable_task = InflightTaskActivation( - host="localhost:50051", - receive_timestamp=0, - activation=TaskActivation( - id="failed-future-retry", - taskname="examples.will_retry", - namespace="examples", - parameters=orjson.dumps({"args": ["noop"], "kwargs": {}}).decode("utf8"), - processing_deadline_duration=2, - retry_state=RetryState( - attempts=0, - max_attempts=3, - on_attempts_exceeded=ON_ATTEMPTS_EXCEEDED_DISCARD, - ), - ), - ) - todo: queue.Queue[InflightTaskActivation] = queue.Queue() - processed: queue.Queue[ProcessingResult] = queue.Queue() - shutdown = Event() - - failed_future: Future[BrokerValue[KafkaPayload]] = Future() - failed_future.set_exception(RuntimeError("kafka produce failed")) - - todo.put(retriable_task) - with mock.patch.object( - TaskProducer, "collect_futures", return_value={"test.producer": {failed_future}} - ): - child_process( - "examples.app:app", - todo, - processed, - shutdown, - max_task_count=1, - processing_pool_name="test", - process_type="fork", - ) - - result = processed.get(timeout=5) - assert result.task_id == retriable_task.activation.id - assert result.status == TASK_ACTIVATION_STATUS_RETRY +# FIXME(benm): Skip this test while we're bypassing awaiting future completion +# def test_child_process_retries_on_failed_future( +# clear_pending_futures: None, restore_signal_handlers: None +# ) -> None: +# retriable_task = InflightTaskActivation( +# host="localhost:50051", +# receive_timestamp=0, +# activation=TaskActivation( +# id="failed-future-retry", +# taskname="examples.will_retry", +# namespace="examples", +# parameters=orjson.dumps({"args": ["noop"], "kwargs": {}}).decode("utf8"), +# processing_deadline_duration=2, +# retry_state=RetryState( +# attempts=0, +# max_attempts=3, +# on_attempts_exceeded=ON_ATTEMPTS_EXCEEDED_DISCARD, +# ), +# ), +# ) +# todo: queue.Queue[InflightTaskActivation] = queue.Queue() +# processed: queue.Queue[ProcessingResult] = queue.Queue() +# shutdown = Event() + +# failed_future: Future[BrokerValue[KafkaPayload]] = Future() +# failed_future.set_exception(RuntimeError("kafka produce failed")) + +# todo.put(retriable_task) +# with mock.patch.object( +# TaskProducer, "collect_futures", return_value={"test.producer": {failed_future}} +# ): +# child_process( +# "examples.app:app", +# todo, +# processed, +# shutdown, +# max_task_count=1, +# processing_pool_name="test", +# process_type="fork", +# ) + +# result = processed.get(timeout=5) +# assert result.task_id == retriable_task.activation.id +# assert result.status == TASK_ACTIVATION_STATUS_RETRY def test_child_process_clears_pending_futures_when_task_fails( From 758f08c77d1e35692c7cf72ed35da6a130035add Mon Sep 17 00:00:00 2001 From: Ben McKerry <110857332+bmckerry@users.noreply.github.com> Date: Thu, 18 Jun 2026 12:19:09 -0400 Subject: [PATCH 3/3] fix timer metric --- .../taskbroker_client/worker/workerchild.py | 82 +++++++++---------- 1 file changed, 41 insertions(+), 41 deletions(-) diff --git a/clients/python/src/taskbroker_client/worker/workerchild.py b/clients/python/src/taskbroker_client/worker/workerchild.py index d3cc06e2..a63634e4 100644 --- a/clients/python/src/taskbroker_client/worker/workerchild.py +++ b/clients/python/src/taskbroker_client/worker/workerchild.py @@ -736,37 +736,43 @@ def _place_processing_result( next_state: TaskActivationStatus.ValueType, task_func: Task[Any, Any] | None, ) -> None: - if task_func and task_func.retry and next_state == TASK_ACTIVATION_STATUS_RETRY: - processed_tasks.put( - ProcessingResult( - task_id=inflight.activation.id, - status=next_state, - host=inflight.host, - receive_timestamp=inflight.receive_timestamp, - # Send max_attempts and delay_on_retry if this is a retry. - # Don't send it on every task as this codepath is relatively - # unoptimized on the broker side. - max_attempts=( - task_func.retry._times + 1 - if task_func.retry and next_state == TASK_ACTIVATION_STATUS_RETRY - else None - ), - delay_on_retry=( - task_func.retry._delay - if task_func.retry and next_state == TASK_ACTIVATION_STATUS_RETRY - else None - ), + with metrics.timer( + "taskworker.worker.processed_tasks.put.duration", + tags={ + "processing_pool": processing_pool_name, + }, + ): + if task_func and task_func.retry and next_state == TASK_ACTIVATION_STATUS_RETRY: + processed_tasks.put( + ProcessingResult( + task_id=inflight.activation.id, + status=next_state, + host=inflight.host, + receive_timestamp=inflight.receive_timestamp, + # Send max_attempts and delay_on_retry if this is a retry. + # Don't send it on every task as this codepath is relatively + # unoptimized on the broker side. + max_attempts=( + task_func.retry._times + 1 + if task_func.retry and next_state == TASK_ACTIVATION_STATUS_RETRY + else None + ), + delay_on_retry=( + task_func.retry._delay + if task_func.retry and next_state == TASK_ACTIVATION_STATUS_RETRY + else None + ), + ) ) - ) - else: - processed_tasks.put( - ProcessingResult( - task_id=inflight.activation.id, - status=next_state, - host=inflight.host, - receive_timestamp=inflight.receive_timestamp, + else: + processed_tasks.put( + ProcessingResult( + task_id=inflight.activation.id, + status=next_state, + host=inflight.host, + receive_timestamp=inflight.receive_timestamp, + ) ) - ) def _task_execution_complete( inflight: InflightTaskActivation, @@ -781,18 +787,12 @@ def _task_execution_complete( # while we troubleshoot why futures are never being marked as done. passthrough: bool = False, ) -> None: - with metrics.timer( - "taskworker.worker.processed_tasks.put.duration", - tags={ - "processing_pool": processing_pool_name, - }, - ): - if not passthrough: - _place_processing_result( - inflight, - next_state, - task_func, - ) + if not passthrough: + _place_processing_result( + inflight, + next_state, + task_func, + ) record_task_execution( inflight.activation, next_state,