diff --git a/google/cloud/storage/internal/async/writer_connection_impl.cc b/google/cloud/storage/internal/async/writer_connection_impl.cc index 8e579cfe64cd7..a97a62960e3f6 100644 --- a/google/cloud/storage/internal/async/writer_connection_impl.cc +++ b/google/cloud/storage/internal/async/writer_connection_impl.cc @@ -94,9 +94,16 @@ AsyncWriterConnectionImpl::~AsyncWriterConnectionImpl() { // (2) calls to `Write()`, `Finalize()`, and `Query()` must have completed // by the time the destructor is called Finish(); + + // We use a local copy of `impl` moved under lock to avoid + // data races with concurrent calls to `Finish()` from callbacks. + std::unique_lock lk(mu_); + auto impl = std::move(impl_); + lk.unlock(); + // When `impl_->Finish()` is satisfied then `finished_` is satisfied too. // This extends the lifetime of `impl_` until it is safe to delete. - finished_.then([impl = std::move(impl_)](auto) mutable { + finished_.then([impl = std::move(impl)](auto) mutable { // Break the ownership cycle between the completion queue and this callback. impl.reset(); }); @@ -213,7 +220,11 @@ AsyncWriterConnectionImpl::OnFinalUpload(std::size_t upload_size, .then(transform); } offset_ += upload_size; - return impl_->Read() + + std::unique_lock lk(mu_); + auto impl = impl_; + lk.unlock(); + return impl->Read() .then([this](auto f) { return OnQuery(f.get()); }) .then([this](auto g) -> StatusOr { auto status = g.get(); @@ -256,11 +267,15 @@ future> AsyncWriterConnectionImpl::OnQuery( } future AsyncWriterConnectionImpl::Finish() { + std::unique_lock lk(mu_); if (std::exchange(finish_called_, true)) { return make_ready_future( internal::CancelledError("already finished", GCP_ERROR_INFO())); } - return impl_->Finish().then([p = std::move(on_finish_)](auto f) mutable { + auto impl = impl_; + lk.unlock(); + + return impl->Finish().then([p = std::move(on_finish_)](auto f) mutable { p.set_value(); return f.get(); }); diff --git a/google/cloud/storage/internal/async/writer_connection_impl.h b/google/cloud/storage/internal/async/writer_connection_impl.h index 46e505c20b827..4cc1e979f1fe0 100644 --- a/google/cloud/storage/internal/async/writer_connection_impl.h +++ b/google/cloud/storage/internal/async/writer_connection_impl.h @@ -24,6 +24,7 @@ #include #include #include +#include namespace google { namespace cloud { @@ -111,6 +112,8 @@ class AsyncWriterConnectionImpl : public storage::AsyncWriterConnection { // Track the latest write handle seen in responses. absl::optional latest_write_handle_; + + std::mutex mu_; }; GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END