diff --git a/clients/python/src/taskbroker_client/worker/workerchild.py b/clients/python/src/taskbroker_client/worker/workerchild.py index 7d23669f..a63634e4 100644 --- a/clients/python/src/taskbroker_client/worker/workerchild.py +++ b/clients/python/src/taskbroker_client/worker/workerchild.py @@ -287,6 +287,8 @@ def await_task_futures(task: ActivationWithPendingFutures) -> None: }, ) pending_task_futures.remove(task) + # FIXME(benm): Use passthrough option to get future-related metrics + # without placing a duplicate ProcessingResult _task_execution_complete( inflight=task.inflight, next_state=task.status, @@ -294,6 +296,7 @@ def await_task_futures(task: ActivationWithPendingFutures) -> None: execution_end_time=task.futures_start_time, task_func=task.task_func, futures_start_time=task.futures_start_time, + passthrough=True, ) def get_oldest_pending_activation() -> ActivationWithPendingFutures | None: @@ -512,6 +515,13 @@ def check_task_future_completion() -> None: task_func, ) else: + # FIXME(benm): Temporarily bypass producer futures needing to be completed + # before writing to `processed_tasks` while still recording metrics. + _place_processing_result( + inflight, + next_state, + task_func, + ) for name, futures in task_produced_futures.items(): # How many futures were produced in the executed task, # tagged by producer name @@ -721,13 +731,10 @@ def record_task_execution( status=monitor_status, ) - def _task_execution_complete( + def _place_processing_result( inflight: InflightTaskActivation, next_state: TaskActivationStatus.ValueType, - execution_start_time: float, - execution_end_time: float, task_func: Task[Any, Any] | None, - futures_start_time: float | None = None, ) -> None: with metrics.timer( "taskworker.worker.processed_tasks.put.duration", @@ -766,6 +773,26 @@ def _task_execution_complete( receive_timestamp=inflight.receive_timestamp, ) ) + + def _task_execution_complete( + inflight: InflightTaskActivation, + next_state: TaskActivationStatus.ValueType, + execution_start_time: float, + execution_end_time: float, + task_func: Task[Any, Any] | None, + futures_start_time: float | None = None, + # FIXME(benm): Temp option to skip placing a task in processed_tasks. + # This is for tasks with pending producer futures, as we still want to record + # metrics as usual but want to have a `ProcessingResult` placed immediately + # while we troubleshoot why futures are never being marked as done. + passthrough: bool = False, + ) -> None: + if not passthrough: + _place_processing_result( + inflight, + next_state, + task_func, + ) record_task_execution( inflight.activation, next_state, diff --git a/clients/python/tests/worker/test_worker.py b/clients/python/tests/worker/test_worker.py index eaecf269..65e565ed 100644 --- a/clients/python/tests/worker/test_worker.py +++ b/clients/python/tests/worker/test_worker.py @@ -1523,54 +1523,55 @@ def test_child_process_tracks_producer_futures( assert result.status == TASK_ACTIVATION_STATUS_COMPLETE -def test_child_process_holds_result_until_futures_done( - clear_pending_futures: None, restore_signal_handlers: None -) -> None: - task = _producing_task() - todo: queue.Queue[InflightTaskActivation] = queue.Queue() - processed: queue.Queue[ProcessingResult] = queue.Queue() - shutdown = Event() - - pending_future: Future[BrokerValue[KafkaPayload]] = Future() - todo.put(task) - - # `child_process` calls `signal.signal`, which must run on the main thread. - # Use a helper thread to observe the queue while the future is still - # pending, then resolve the future so the drain can complete. - observed_empty_while_pending = threading.Event() - - def observe_and_resolve() -> None: - # Wait for child_process to process the task and enter the drain loop. - time.sleep(0.5) - if processed.qsize() == 0: - observed_empty_while_pending.set() - pending_future.set_result(_make_broker_value()) - - observer = threading.Thread(target=observe_and_resolve, name="future-observer") - observer.start() - try: - with mock.patch.object( - TaskProducer, "collect_futures", return_value={"test.producer": {pending_future}} - ): - child_process( - "examples.app:app", - todo, - processed, - shutdown, - max_task_count=1, - processing_pool_name="test", - process_type="fork", - ) - finally: - observer.join(timeout=5) - shutdown.set() - - assert ( - observed_empty_while_pending.is_set() - ), "result was pushed before the producer future was resolved" - result = processed.get(timeout=5) - assert result.task_id == task.activation.id - assert result.status == TASK_ACTIVATION_STATUS_COMPLETE +# FIXME(benm): Skip this test while we're bypassing awaiting future completion +# def test_child_process_holds_result_until_futures_done( +# clear_pending_futures: None, restore_signal_handlers: None +# ) -> None: +# task = _producing_task() +# todo: queue.Queue[InflightTaskActivation] = queue.Queue() +# processed: queue.Queue[ProcessingResult] = queue.Queue() +# shutdown = Event() + +# pending_future: Future[BrokerValue[KafkaPayload]] = Future() +# todo.put(task) + +# # `child_process` calls `signal.signal`, which must run on the main thread. +# # Use a helper thread to observe the queue while the future is still +# # pending, then resolve the future so the drain can complete. +# observed_empty_while_pending = threading.Event() + +# def observe_and_resolve() -> None: +# # Wait for child_process to process the task and enter the drain loop. +# time.sleep(0.5) +# if processed.qsize() == 0: +# observed_empty_while_pending.set() +# pending_future.set_result(_make_broker_value()) + +# observer = threading.Thread(target=observe_and_resolve, name="future-observer") +# observer.start() +# try: +# with mock.patch.object( +# TaskProducer, "collect_futures", return_value={"test.producer": {pending_future}} +# ): +# child_process( +# "examples.app:app", +# todo, +# processed, +# shutdown, +# max_task_count=1, +# processing_pool_name="test", +# process_type="fork", +# ) +# finally: +# observer.join(timeout=5) +# shutdown.set() + +# assert ( +# observed_empty_while_pending.is_set() +# ), "result was pushed before the producer future was resolved" +# result = processed.get(timeout=5) +# assert result.task_id == task.activation.id +# assert result.status == TASK_ACTIVATION_STATUS_COMPLETE def test_child_process_drains_pending_futures_on_sigterm( @@ -1616,49 +1617,50 @@ def deliver_sigterm() -> None: assert result.status == TASK_ACTIVATION_STATUS_COMPLETE -def test_child_process_retries_on_failed_future( - clear_pending_futures: None, restore_signal_handlers: None -) -> None: - retriable_task = InflightTaskActivation( - host="localhost:50051", - receive_timestamp=0, - activation=TaskActivation( - id="failed-future-retry", - taskname="examples.will_retry", - namespace="examples", - parameters=orjson.dumps({"args": ["noop"], "kwargs": {}}).decode("utf8"), - processing_deadline_duration=2, - retry_state=RetryState( - attempts=0, - max_attempts=3, - on_attempts_exceeded=ON_ATTEMPTS_EXCEEDED_DISCARD, - ), - ), - ) - todo: queue.Queue[InflightTaskActivation] = queue.Queue() - processed: queue.Queue[ProcessingResult] = queue.Queue() - shutdown = Event() - - failed_future: Future[BrokerValue[KafkaPayload]] = Future() - failed_future.set_exception(RuntimeError("kafka produce failed")) - - todo.put(retriable_task) - with mock.patch.object( - TaskProducer, "collect_futures", return_value={"test.producer": {failed_future}} - ): - child_process( - "examples.app:app", - todo, - processed, - shutdown, - max_task_count=1, - processing_pool_name="test", - process_type="fork", - ) - - result = processed.get(timeout=5) - assert result.task_id == retriable_task.activation.id - assert result.status == TASK_ACTIVATION_STATUS_RETRY +# FIXME(benm): Skip this test while we're bypassing awaiting future completion +# def test_child_process_retries_on_failed_future( +# clear_pending_futures: None, restore_signal_handlers: None +# ) -> None: +# retriable_task = InflightTaskActivation( +# host="localhost:50051", +# receive_timestamp=0, +# activation=TaskActivation( +# id="failed-future-retry", +# taskname="examples.will_retry", +# namespace="examples", +# parameters=orjson.dumps({"args": ["noop"], "kwargs": {}}).decode("utf8"), +# processing_deadline_duration=2, +# retry_state=RetryState( +# attempts=0, +# max_attempts=3, +# on_attempts_exceeded=ON_ATTEMPTS_EXCEEDED_DISCARD, +# ), +# ), +# ) +# todo: queue.Queue[InflightTaskActivation] = queue.Queue() +# processed: queue.Queue[ProcessingResult] = queue.Queue() +# shutdown = Event() + +# failed_future: Future[BrokerValue[KafkaPayload]] = Future() +# failed_future.set_exception(RuntimeError("kafka produce failed")) + +# todo.put(retriable_task) +# with mock.patch.object( +# TaskProducer, "collect_futures", return_value={"test.producer": {failed_future}} +# ): +# child_process( +# "examples.app:app", +# todo, +# processed, +# shutdown, +# max_task_count=1, +# processing_pool_name="test", +# process_type="fork", +# ) + +# result = processed.get(timeout=5) +# assert result.task_id == retriable_task.activation.id +# assert result.status == TASK_ACTIVATION_STATUS_RETRY def test_child_process_clears_pending_futures_when_task_fails(