Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 17 additions & 12 deletions google/cloud/storage/internal/async/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ future<StatusOr<google::storage::v2::Object>> AsyncConnectionImpl::InsertObject(

auto hash_function = CreateHashFunction(*options);
ApplyRoutingHeaders(*context, request.write_object_spec());
context->AddMetadata("x-goog-gcs-idempotency-token", id);
AddIdempotencyToken(*context, id);
auto rpc = stub->AsyncWriteObject(cq, std::move(context), options);
rpc = std::make_unique<StreamingRpcTimeout>(cq, timeout, timeout,
std::move(rpc));
Expand Down Expand Up @@ -336,13 +336,14 @@ AsyncConnectionImpl::AppendableObjectUploadImpl(AppendableUploadParams p) {
// NOLINTNEXTLINE(bugprone-lambda-function-name)
backoff = std::move(backoff), current, function_name = __func__,
// Use shared_ptr to propagate RoutingHeaderOptions across retries.
current_routing_options = std::make_shared<RoutingHeaderOptions>()](
current_routing_options = std::make_shared<RoutingHeaderOptions>(),
id = invocation_id_generator_.MakeInvocationId()](
google::storage::v2::BidiWriteObjectRequest req) {
auto call = [stub, request = std::move(req), current_routing_options](
CompletionQueue& cq,
std::shared_ptr<grpc::ClientContext> context,
google::cloud::internal::ImmutableOptions options,
RequestPlaceholder const&) mutable
auto call = [stub, request = std::move(req), current_routing_options,
id](CompletionQueue& cq,
std::shared_ptr<grpc::ClientContext> context,
google::cloud::internal::ImmutableOptions options,
RequestPlaceholder const&) mutable
-> future<StatusOr<WriteObject::WriteResult>> {
auto start_timeout = ScaleStallTimeout(
options->get<storage::TransferStallTimeoutOption>(),
Expand All @@ -359,6 +360,8 @@ AsyncConnectionImpl::AppendableObjectUploadImpl(AppendableUploadParams p) {
ApplyRoutingHeaders(*context, request.append_object_spec(),
*current_routing_options);

AddIdempotencyToken(*context, id);

auto rpc = stub->AsyncBidiWriteObject(cq, std::move(context),
std::move(options));
rpc = std::make_unique<StreamingRpcTimeout>(
Expand Down Expand Up @@ -507,11 +510,12 @@ AsyncConnectionImpl::ComposeObject(ComposeObjectParams p) {
auto current = internal::MakeImmutableOptions(std::move(p.options));
auto const idempotency =
idempotency_policy(*current)->ComposeObject(p.request);
auto call = [stub = stub_](
auto call = [stub = stub_, id = invocation_id_generator_.MakeInvocationId()](
CompletionQueue& cq,
std::shared_ptr<grpc::ClientContext> context,
google::cloud::internal::ImmutableOptions options,
google::storage::v2::ComposeObjectRequest const& request) {
AddIdempotencyToken(*context, id);
return stub->AsyncComposeObject(cq, std::move(context), std::move(options),
request);
};
Expand All @@ -530,10 +534,11 @@ future<Status> AsyncConnectionImpl::DeleteObject(DeleteObjectParams p) {
auto backoff = backoff_policy(*current);
return google::cloud::internal::AsyncRetryLoop(
std::move(retry), std::move(backoff), idempotency, cq_,
[stub = stub_](CompletionQueue& cq,
std::shared_ptr<grpc::ClientContext> context,
google::cloud::internal::ImmutableOptions options,
google::storage::v2::DeleteObjectRequest const& proto) {
[stub = stub_, id = invocation_id_generator_.MakeInvocationId()](
CompletionQueue& cq, std::shared_ptr<grpc::ClientContext> context,
google::cloud::internal::ImmutableOptions options,
google::storage::v2::DeleteObjectRequest const& proto) {
AddIdempotencyToken(*context, id);
return stub->AsyncDeleteObject(cq, std::move(context),
std::move(options), proto);
},
Expand Down
72 changes: 70 additions & 2 deletions google/cloud/storage/internal/async/connection_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "google/cloud/storage/internal/async/connection_impl.h"
#include "google/cloud/storage/async/idempotency_policy.h"
#include "google/cloud/storage/async/retry_policy.h"
#include "google/cloud/storage/async/writer_connection.h"
#include "google/cloud/storage/internal/async/default_options.h"
#include "google/cloud/storage/internal/async/write_payload_impl.h"
#include "google/cloud/storage/options.h"
Expand Down Expand Up @@ -105,9 +106,14 @@ TEST_F(AsyncConnectionImplTest, ComposeObject) {
return StatusOr<google::storage::v2::Object>(TransientError());
});
})
.WillOnce([&](CompletionQueue&, auto,
.WillOnce([&](CompletionQueue&,
std::shared_ptr<grpc::ClientContext> context,
google::cloud::internal::ImmutableOptions const& options,
google::storage::v2::ComposeObjectRequest const& request) {
EXPECT_THAT(
GetMetadata(*context),
testing::Contains(testing::Pair("x-goog-gcs-idempotency-token",
testing::Not(testing::IsEmpty()))));
// Verify at least one option is initialized with the correct value.
EXPECT_EQ(options->get<AuthorityOption>(), kAuthority);
auto expected = google::storage::v2::ComposeObjectRequest{};
Expand Down Expand Up @@ -205,9 +211,14 @@ TEST_F(AsyncConnectionImplTest, DeleteObject) {
return TransientError();
});
})
.WillOnce([&](CompletionQueue&, auto,
.WillOnce([&](CompletionQueue&,
std::shared_ptr<grpc::ClientContext> context,
google::cloud::internal::ImmutableOptions const& options,
google::storage::v2::DeleteObjectRequest const& request) {
EXPECT_THAT(
GetMetadata(*context),
testing::Contains(testing::Pair("x-goog-gcs-idempotency-token",
testing::Not(testing::IsEmpty()))));
// Verify at least one option is initialized with the correct values.
EXPECT_EQ(options->get<AuthorityOption>(), kAuthority);
google::storage::v2::DeleteObjectRequest expected;
Expand Down Expand Up @@ -344,6 +355,63 @@ TEST_F(AsyncConnectionImplTest, RewriteObject) {
EXPECT_THAT(r1.get(), IsOkAndHolds(match_progress(1000, 3000)));
}

TEST_F(AsyncConnectionImplTest, AppendableObjectUploadToken) {
auto constexpr kRequestText = R"pb(
write_object_spec {
resource {
bucket: "projects/_/buckets/test-bucket"
name: "test-object"
content_type: "text/plain"
}
}
)pb";
AsyncSequencer<bool> sequencer;
auto mock = std::make_shared<storage::testing::MockStorageStub>();

EXPECT_CALL(*mock, AsyncBidiWriteObject)
.WillOnce([&](CompletionQueue const&,
std::shared_ptr<grpc::ClientContext> context,
internal::ImmutableOptions const&) {
EXPECT_THAT(
GetMetadata(*context),
testing::Contains(testing::Pair("x-goog-gcs-idempotency-token",
testing::Not(testing::IsEmpty()))));

auto stream = std::make_unique<::google::cloud::storage::testing::
MockAsyncBidiWriteObjectStream>();
EXPECT_CALL(*stream, Start).WillOnce([&] {
return sequencer.PushBack("Start");
});
EXPECT_CALL(*stream, Finish).WillOnce([&] {
return sequencer.PushBack("Finish").then(
[](auto) { return Status(StatusCode::kCancelled, "cancelled"); });
});
using AsyncBidiWriteObjectStream =
::google::cloud::AsyncStreamingReadWriteRpc<
google::storage::v2::BidiWriteObjectRequest,
google::storage::v2::BidiWriteObjectResponse>;
return std::unique_ptr<AsyncBidiWriteObjectStream>(std::move(stream));
});

internal::AutomaticallyCreatedBackgroundThreads pool(1);
auto connection = MakeTestConnection(pool.cq(), mock);

auto request = google::storage::v2::BidiWriteObjectRequest{};
ASSERT_TRUE(TextFormat::ParseFromString(kRequestText, &request));
auto pending = connection->StartAppendableObjectUpload(
{std::move(request), connection->options()});

auto next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Start");
next.first.set_value(false); // Fail to start

next = sequencer.PopFrontWithName();
EXPECT_EQ(next.second, "Finish");
next.first.set_value(true);

EXPECT_THAT(pending.get(), StatusIs(StatusCode::kCancelled));
}

} // namespace
GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
} // namespace storage_internal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ void AddIdempotencyToken(grpc::ClientContext& ctx,
}
}

void AddIdempotencyToken(grpc::ClientContext& ctx, std::string const& token) {
ctx.AddMetadata(kIdempotencyTokenHeader, token);
}

void ApplyRoutingHeaders(
grpc::ClientContext& context,
storage::internal::InsertObjectMediaRequest const& request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ struct RoutingHeaderOptions {
void AddIdempotencyToken(grpc::ClientContext& ctx,
rest_internal::RestContext const& context);

/// Configures @p ctx using @p token.
void AddIdempotencyToken(grpc::ClientContext& ctx, std::string const& token);

/**
* Inject request query parameters into grpc::ClientContext.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ TEST_F(GrpcConfigureClientContext, AddIdempotencyToken) {
Contains(Pair("x-goog-gcs-idempotency-token", "token-123")));
}

TEST_F(GrpcConfigureClientContext, AddIdempotencyTokenString) {
grpc::ClientContext ctx;
AddIdempotencyToken(ctx, "token-123");
auto metadata = GetMetadata(ctx);
EXPECT_THAT(metadata,
Contains(Pair("x-goog-gcs-idempotency-token", "token-123")));
}

TEST_F(GrpcConfigureClientContext, ApplyQueryParametersEmpty) {
grpc::ClientContext ctx;
ApplyQueryParameters(
Expand Down
Loading