From 3787ba3ca494b06c3575d8bebc7b0bfc6fe8e8c4 Mon Sep 17 00:00:00 2001 From: Gauri Kalra Date: Fri, 15 May 2026 10:44:09 +0000 Subject: [PATCH] feat(storage): Add idempotency tokens on async path --- .../storage/internal/async/connection_impl.cc | 29 ++++---- .../internal/async/connection_impl_test.cc | 72 ++++++++++++++++++- .../internal/grpc/configure_client_context.cc | 4 ++ .../internal/grpc/configure_client_context.h | 3 + .../grpc/configure_client_context_test.cc | 8 +++ 5 files changed, 102 insertions(+), 14 deletions(-) diff --git a/google/cloud/storage/internal/async/connection_impl.cc b/google/cloud/storage/internal/async/connection_impl.cc index 409a45fbf1f35..7e123e61ce06c 100644 --- a/google/cloud/storage/internal/async/connection_impl.cc +++ b/google/cloud/storage/internal/async/connection_impl.cc @@ -169,7 +169,7 @@ future> AsyncConnectionImpl::InsertObject( auto hash_function = CreateHashFunction(*options); ApplyRoutingHeaders(*context, request.write_object_spec()); - context->AddMetadata("x-goog-gcs-idempotency-token", id); + AddIdempotencyToken(*context, id); auto rpc = stub->AsyncWriteObject(cq, std::move(context), options); rpc = std::make_unique(cq, timeout, timeout, std::move(rpc)); @@ -336,13 +336,14 @@ AsyncConnectionImpl::AppendableObjectUploadImpl(AppendableUploadParams p) { // NOLINTNEXTLINE(bugprone-lambda-function-name) backoff = std::move(backoff), current, function_name = __func__, // Use shared_ptr to propagate RoutingHeaderOptions across retries. - current_routing_options = std::make_shared()]( + current_routing_options = std::make_shared(), + id = invocation_id_generator_.MakeInvocationId()]( google::storage::v2::BidiWriteObjectRequest req) { - auto call = [stub, request = std::move(req), current_routing_options]( - CompletionQueue& cq, - std::shared_ptr context, - google::cloud::internal::ImmutableOptions options, - RequestPlaceholder const&) mutable + auto call = [stub, request = std::move(req), current_routing_options, + id](CompletionQueue& cq, + std::shared_ptr context, + google::cloud::internal::ImmutableOptions options, + RequestPlaceholder const&) mutable -> future> { auto start_timeout = ScaleStallTimeout( options->get(), @@ -359,6 +360,8 @@ AsyncConnectionImpl::AppendableObjectUploadImpl(AppendableUploadParams p) { ApplyRoutingHeaders(*context, request.append_object_spec(), *current_routing_options); + AddIdempotencyToken(*context, id); + auto rpc = stub->AsyncBidiWriteObject(cq, std::move(context), std::move(options)); rpc = std::make_unique( @@ -507,11 +510,12 @@ AsyncConnectionImpl::ComposeObject(ComposeObjectParams p) { auto current = internal::MakeImmutableOptions(std::move(p.options)); auto const idempotency = idempotency_policy(*current)->ComposeObject(p.request); - auto call = [stub = stub_]( + auto call = [stub = stub_, id = invocation_id_generator_.MakeInvocationId()]( CompletionQueue& cq, std::shared_ptr context, google::cloud::internal::ImmutableOptions options, google::storage::v2::ComposeObjectRequest const& request) { + AddIdempotencyToken(*context, id); return stub->AsyncComposeObject(cq, std::move(context), std::move(options), request); }; @@ -530,10 +534,11 @@ future AsyncConnectionImpl::DeleteObject(DeleteObjectParams p) { auto backoff = backoff_policy(*current); return google::cloud::internal::AsyncRetryLoop( std::move(retry), std::move(backoff), idempotency, cq_, - [stub = stub_](CompletionQueue& cq, - std::shared_ptr context, - google::cloud::internal::ImmutableOptions options, - google::storage::v2::DeleteObjectRequest const& proto) { + [stub = stub_, id = invocation_id_generator_.MakeInvocationId()]( + CompletionQueue& cq, std::shared_ptr context, + google::cloud::internal::ImmutableOptions options, + google::storage::v2::DeleteObjectRequest const& proto) { + AddIdempotencyToken(*context, id); return stub->AsyncDeleteObject(cq, std::move(context), std::move(options), proto); }, diff --git a/google/cloud/storage/internal/async/connection_impl_test.cc b/google/cloud/storage/internal/async/connection_impl_test.cc index 03a3738d3a3b9..741e1ec92905e 100644 --- a/google/cloud/storage/internal/async/connection_impl_test.cc +++ b/google/cloud/storage/internal/async/connection_impl_test.cc @@ -15,6 +15,7 @@ #include "google/cloud/storage/internal/async/connection_impl.h" #include "google/cloud/storage/async/idempotency_policy.h" #include "google/cloud/storage/async/retry_policy.h" +#include "google/cloud/storage/async/writer_connection.h" #include "google/cloud/storage/internal/async/default_options.h" #include "google/cloud/storage/internal/async/write_payload_impl.h" #include "google/cloud/storage/options.h" @@ -105,9 +106,14 @@ TEST_F(AsyncConnectionImplTest, ComposeObject) { return StatusOr(TransientError()); }); }) - .WillOnce([&](CompletionQueue&, auto, + .WillOnce([&](CompletionQueue&, + std::shared_ptr const& context, google::cloud::internal::ImmutableOptions const& options, google::storage::v2::ComposeObjectRequest const& request) { + EXPECT_THAT( + GetMetadata(*context), + testing::Contains(testing::Pair("x-goog-gcs-idempotency-token", + testing::Not(testing::IsEmpty())))); // Verify at least one option is initialized with the correct value. EXPECT_EQ(options->get(), kAuthority); auto expected = google::storage::v2::ComposeObjectRequest{}; @@ -205,9 +211,14 @@ TEST_F(AsyncConnectionImplTest, DeleteObject) { return TransientError(); }); }) - .WillOnce([&](CompletionQueue&, auto, + .WillOnce([&](CompletionQueue&, + std::shared_ptr const& context, google::cloud::internal::ImmutableOptions const& options, google::storage::v2::DeleteObjectRequest const& request) { + EXPECT_THAT( + GetMetadata(*context), + testing::Contains(testing::Pair("x-goog-gcs-idempotency-token", + testing::Not(testing::IsEmpty())))); // Verify at least one option is initialized with the correct values. EXPECT_EQ(options->get(), kAuthority); google::storage::v2::DeleteObjectRequest expected; @@ -344,6 +355,63 @@ TEST_F(AsyncConnectionImplTest, RewriteObject) { EXPECT_THAT(r1.get(), IsOkAndHolds(match_progress(1000, 3000))); } +TEST_F(AsyncConnectionImplTest, AppendableObjectUploadToken) { + auto constexpr kRequestText = R"pb( + write_object_spec { + resource { + bucket: "projects/_/buckets/test-bucket" + name: "test-object" + content_type: "text/plain" + } + } + )pb"; + AsyncSequencer sequencer; + auto mock = std::make_shared(); + + EXPECT_CALL(*mock, AsyncBidiWriteObject) + .WillOnce([&](CompletionQueue const&, + std::shared_ptr const& context, + internal::ImmutableOptions const&) { + EXPECT_THAT( + GetMetadata(*context), + testing::Contains(testing::Pair("x-goog-gcs-idempotency-token", + testing::Not(testing::IsEmpty())))); + + auto stream = std::make_unique<::google::cloud::storage::testing:: + MockAsyncBidiWriteObjectStream>(); + EXPECT_CALL(*stream, Start).WillOnce([&] { + return sequencer.PushBack("Start"); + }); + EXPECT_CALL(*stream, Finish).WillOnce([&] { + return sequencer.PushBack("Finish").then( + [](auto) { return Status(StatusCode::kCancelled, "cancelled"); }); + }); + using AsyncBidiWriteObjectStream = + ::google::cloud::AsyncStreamingReadWriteRpc< + google::storage::v2::BidiWriteObjectRequest, + google::storage::v2::BidiWriteObjectResponse>; + return std::unique_ptr(std::move(stream)); + }); + + internal::AutomaticallyCreatedBackgroundThreads pool(1); + auto connection = MakeTestConnection(pool.cq(), mock); + + auto request = google::storage::v2::BidiWriteObjectRequest{}; + ASSERT_TRUE(TextFormat::ParseFromString(kRequestText, &request)); + auto pending = connection->StartAppendableObjectUpload( + {std::move(request), connection->options()}); + + auto next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Start"); + next.first.set_value(false); // Fail to start + + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Finish"); + next.first.set_value(true); + + EXPECT_THAT(pending.get(), StatusIs(StatusCode::kCancelled)); +} + } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_internal diff --git a/google/cloud/storage/internal/grpc/configure_client_context.cc b/google/cloud/storage/internal/grpc/configure_client_context.cc index 2d77f99dced76..20ebc3a88f3ed 100644 --- a/google/cloud/storage/internal/grpc/configure_client_context.cc +++ b/google/cloud/storage/internal/grpc/configure_client_context.cc @@ -47,6 +47,10 @@ void AddIdempotencyToken(grpc::ClientContext& ctx, } } +void AddIdempotencyToken(grpc::ClientContext& ctx, std::string const& token) { + ctx.AddMetadata(kIdempotencyTokenHeader, token); +} + void ApplyRoutingHeaders( grpc::ClientContext& context, storage::internal::InsertObjectMediaRequest const& request) { diff --git a/google/cloud/storage/internal/grpc/configure_client_context.h b/google/cloud/storage/internal/grpc/configure_client_context.h index bbc4a4c80d226..b948330b811da 100644 --- a/google/cloud/storage/internal/grpc/configure_client_context.h +++ b/google/cloud/storage/internal/grpc/configure_client_context.h @@ -37,6 +37,9 @@ struct RoutingHeaderOptions { void AddIdempotencyToken(grpc::ClientContext& ctx, rest_internal::RestContext const& context); +/// Configures @p ctx using @p token. +void AddIdempotencyToken(grpc::ClientContext& ctx, std::string const& token); + /** * Inject request query parameters into grpc::ClientContext. * diff --git a/google/cloud/storage/internal/grpc/configure_client_context_test.cc b/google/cloud/storage/internal/grpc/configure_client_context_test.cc index 7e18790de74d3..e96a3d85fd24a 100644 --- a/google/cloud/storage/internal/grpc/configure_client_context_test.cc +++ b/google/cloud/storage/internal/grpc/configure_client_context_test.cc @@ -55,6 +55,14 @@ TEST_F(GrpcConfigureClientContext, AddIdempotencyToken) { Contains(Pair("x-goog-gcs-idempotency-token", "token-123"))); } +TEST_F(GrpcConfigureClientContext, AddIdempotencyTokenString) { + grpc::ClientContext ctx; + AddIdempotencyToken(ctx, "token-123"); + auto metadata = GetMetadata(ctx); + EXPECT_THAT(metadata, + Contains(Pair("x-goog-gcs-idempotency-token", "token-123"))); +} + TEST_F(GrpcConfigureClientContext, ApplyQueryParametersEmpty) { grpc::ClientContext ctx; ApplyQueryParameters(