diff --git a/src/brpc/channel.cpp b/src/brpc/channel.cpp index a8caeaf953..66c9ae3b5c 100644 --- a/src/brpc/channel.cpp +++ b/src/brpc/channel.cpp @@ -443,11 +443,18 @@ static void HandleBackupRequest(void* arg) { bthread_id_error(correlation_id, EBACKUPREQUEST); } -void Channel::CallMethod(const google::protobuf::MethodDescriptor* method, - google::protobuf::RpcController* controller_base, - const google::protobuf::Message* request, - google::protobuf::Message* response, - google::protobuf::Closure* done) { +template +void Channel::CallMethodInternal(const typename std::conditional::type* method, + google::protobuf::RpcController* controller_base, + const typename std::conditional::type* request, + typename std::conditional::type* response, + google::protobuf::Closure* done) { const int64_t start_send_real_us = butil::gettimeofday_us(); Controller* cntl = static_cast(controller_base); cntl->OnRPCBegin(start_send_real_us); @@ -507,22 +514,38 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method, const int64_t start_send_us = butil::cpuwide_time_us(); std::string method_name; if (_get_method_name) { - method_name = butil::EnsureString(_get_method_name(method, cntl)); + if (is_pb) { + auto pb_method = reinterpret_cast(method); + method_name = butil::EnsureString(_get_method_name(pb_method, cntl)); + } else { + // FlatBuffers doesn't support _get_method_name yet + method_name = ""; + } + } else if (method) { - method_name = butil::EnsureString(method->full_name()); + if (is_pb) { + auto pb_method = reinterpret_cast(method); + method_name = butil::EnsureString(pb_method->full_name()); + } else { + auto fb_method = reinterpret_cast(method); + method_name = butil::EnsureString(fb_method->full_name()); + } + } else { const static std::string NULL_METHOD_STR = "null-method"; method_name = NULL_METHOD_STR; } - std::shared_ptr span = Span::CreateClientSpan( + if (!method_name.empty()) { + std::shared_ptr span = Span::CreateClientSpan( method_name, start_send_real_us - start_send_us); - if (span) { - ControllerPrivateAccessor accessor(cntl); - span->set_log_id(cntl->log_id()); - span->set_base_cid(correlation_id); - span->set_protocol(_options.protocol); - span->set_start_send_us(start_send_us); - accessor.set_span(span); + if (span) { + ControllerPrivateAccessor accessor(cntl); + span->set_log_id(cntl->log_id()); + span->set_base_cid(correlation_id); + span->set_protocol(_options.protocol); + span->set_start_send_us(start_send_us); + accessor.set_span(span); + } } } // Override some options if they haven't been set by Controller @@ -541,11 +564,20 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method, if (cntl->connection_type() == CONNECTION_TYPE_UNKNOWN) { cntl->set_connection_type(_options.connection_type); } - cntl->_response = response; + cntl->_done = done; cntl->_pack_request = _pack_request; - cntl->_method = method; cntl->_auth = _options.auth; + // Use reinterpret_cast to avoid template instantiation errors + // The actual type is guaranteed by the is_pb parameter + if (is_pb) { + cntl->_method = reinterpret_cast(method); + cntl->_response = reinterpret_cast(response); + } else { + cntl->_fb_method = reinterpret_cast(method); + cntl->_fb_response = reinterpret_cast(response); + cntl->set_use_flatbuffer(); + } if (SingleServer()) { cntl->_single_server_id = _server_id; @@ -629,6 +661,22 @@ void Channel::CallMethod(const google::protobuf::MethodDescriptor* method, } } +void Channel::CallMethod(const google::protobuf::MethodDescriptor* method, + google::protobuf::RpcController* controller_base, + const google::protobuf::Message* request, + google::protobuf::Message* response, + google::protobuf::Closure* done) { + CallMethodInternal(method, controller_base, request, response, done); +} + +void Channel::FBCallMethod(const brpc::flatbuffers::MethodDescriptor* method, + google::protobuf::RpcController* controller_base, + const brpc::flatbuffers::Message* request, + brpc::flatbuffers::Message* response, + google::protobuf::Closure* done) { + CallMethodInternal(method, controller_base, request, response, done); +} + void Channel::Describe(std::ostream& os, const DescribeOptions& opt) const { os << "Channel["; if (SingleServer()) { @@ -658,4 +706,24 @@ int Channel::CheckHealth() { } } +// CallMethodInternal instance for pb and fb +template +void Channel::CallMethodInternal( + const google::protobuf::MethodDescriptor* method, + google::protobuf::RpcController* controller_base, + const google::protobuf::Message* request, + google::protobuf::Message* response, + google::protobuf::Closure* done +); + +// CallMethodInternal instance for pb and fb +template +void Channel::CallMethodInternal( + const brpc::flatbuffers::MethodDescriptor* method, + google::protobuf::RpcController* controller_base, + const brpc::flatbuffers::Message* request, + brpc::flatbuffers::Message* response, + google::protobuf::Closure* done +); + } // namespace brpc diff --git a/src/brpc/channel.h b/src/brpc/channel.h index 28a17ac8ea..368759cd98 100644 --- a/src/brpc/channel.h +++ b/src/brpc/channel.h @@ -38,6 +38,7 @@ #include "brpc/naming_service_filter.h" #include "brpc/health_check_option.h" #include "brpc/socket_mode.h" +#include "brpc/details/flatbuffers_impl.h" // Flatbuffers Protocol namespace brpc { @@ -175,7 +176,8 @@ struct ChannelOptions { // channel.Init("bns://rdev.matrix.all", "rr", NULL/*default options*/); // MyService_Stub stub(&channel); // stub.MyMethod(&controller, &request, &response, NULL); -class Channel : public ChannelBase { +class Channel : public ChannelBase, + public brpc::flatbuffers::RpcChannel { friend class Controller; friend class SelectiveChannel; public: @@ -225,6 +227,12 @@ friend class SelectiveChannel; google::protobuf::Closure* done); // Get current options. + void FBCallMethod(const brpc::flatbuffers::MethodDescriptor* method, + google::protobuf::RpcController* controller_base, + const brpc::flatbuffers::Message* request, + brpc::flatbuffers::Message* response, + google::protobuf::Closure* done); + const ChannelOptions& options() const { return _options; } void Describe(std::ostream&, const DescribeOptions&) const; @@ -250,6 +258,19 @@ friend class SelectiveChannel; const char* raw_server_address, const ChannelOptions* options, int raw_port = -1); + + template + inline void CallMethodInternal(const typename std::conditional::type* method, + google::protobuf::RpcController* controller_base, + const typename std::conditional::type* request, + typename std::conditional::type* response, + google::protobuf::Closure* done); std::string _service_name; std::string _scheme; diff --git a/src/brpc/channel_base.h b/src/brpc/channel_base.h index ed6ff24e40..3b5a13f8aa 100644 --- a/src/brpc/channel_base.h +++ b/src/brpc/channel_base.h @@ -24,6 +24,7 @@ #include "butil/logging.h" #include // google::protobuf::RpcChannel #include "brpc/describable.h" +#include "brpc/details/flatbuffers_common.h" // To brpc developers: This is a header included by user, don't depend // on internal structures, use opaque pointers instead. diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index 15c8c91887..386b1e4b9a 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -276,6 +276,7 @@ void Controller::ResetPods() { _inheritable.Reset(); _pchan_sub_count = 0; _response = NULL; + _fb_response = NULL; _done = NULL; _sender = NULL; _request_code = 0; @@ -285,6 +286,7 @@ void Controller::ResetPods() { _accessed = NULL; _pack_request = NULL; _method = NULL; + _fb_method = NULL; _auth = NULL; _idl_names = idl_single_req_single_res; _idl_result = IDL_VOID_RESULT; @@ -1211,7 +1213,9 @@ void Controller::IssueRPC(int64_t start_realtime_us) { // Make request butil::IOBuf packet; SocketMessage* user_packet = NULL; - _pack_request(&packet, &user_packet, cid.value, _method, this, + const void *method_desc = is_use_flatbuffer()? (const void*)_fb_method : + (const void*)_method; + _pack_request(&packet, &user_packet, cid.value, method_desc, this, _request_buf, using_auth); // TODO: PackRequest may accept SocketMessagePtr<>? SocketMessagePtr<> user_packet_guard(user_packet); diff --git a/src/brpc/controller.h b/src/brpc/controller.h index 45f71b72f6..abffcb671a 100644 --- a/src/brpc/controller.h +++ b/src/brpc/controller.h @@ -48,6 +48,7 @@ #include "brpc/grpc.h" #include "brpc/kvmap.h" #include "brpc/rpc_dump.h" +#include "brpc/details/flatbuffers_common.h" // EAUTH is defined in MAC #ifndef EAUTH @@ -152,6 +153,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); static const uint32_t FLAGS_PB_SINGLE_REPEATED_TO_ARRAY = (1 << 20); static const uint32_t FLAGS_MANAGE_HTTP_BODY_ON_ERROR = (1 << 21); static const uint32_t FLAGS_WRITE_TO_SOCKET_IN_BACKGROUND = (1 << 22); + static const uint32_t FLAGS_USE_FLATBUFFER = (1 << 23); public: struct Inheritable { @@ -220,6 +222,8 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); // Response of the RPC call (passed to CallMethod) google::protobuf::Message* response() const { return _response; } + + brpc::flatbuffers::Message* fb_response() const { return _fb_response; } // An identifier to send to server along with request. This is widely used // throughout baidu's servers to tag a searching session (a series of @@ -293,6 +297,8 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); // Get the called method. May-be NULL for non-pb services. const google::protobuf::MethodDescriptor* method() const { return _method; } + const brpc::flatbuffers::MethodDescriptor* fb_method() const { return _fb_method; } + // Get the controllers for accessing sub channels in combo channels. // Ordinary channel: // sub_count() is 0 and sub() is always NULL. @@ -650,6 +656,9 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); // the received time of RPC is not recorded in the controller. int64_t get_rpc_received_us() const { return _rpc_received_us; } + void set_use_flatbuffer() { add_flag(FLAGS_USE_FLATBUFFER); } + bool is_use_flatbuffer() const { return has_flag(FLAGS_USE_FLATBUFFER); } + private: struct CompletionInfo { CallId id; // call_id of the corresponding request @@ -861,6 +870,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); Inheritable _inheritable; int _pchan_sub_count; google::protobuf::Message* _response; + brpc::flatbuffers::Message* _fb_response; google::protobuf::Closure* _done; RPCSender* _sender; uint64_t _request_code; @@ -879,6 +889,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); // Fields will be used when making requests Protocol::PackRequest _pack_request; const google::protobuf::MethodDescriptor* _method; + const brpc::flatbuffers::MethodDescriptor* _fb_method; const Authenticator* _auth; butil::IOBuf _request_buf; IdlNames _idl_names; diff --git a/src/brpc/details/controller_private_accessor.h b/src/brpc/details/controller_private_accessor.h index 0ad1aba640..db45d07710 100644 --- a/src/brpc/details/controller_private_accessor.h +++ b/src/brpc/details/controller_private_accessor.h @@ -125,6 +125,9 @@ class ControllerPrivateAccessor { void set_method(const google::protobuf::MethodDescriptor* method) { _cntl->_method = method; } + void set_fb_method(const brpc::flatbuffers::MethodDescriptor* method) + { _cntl->_fb_method = method; } + void set_readable_progressive_attachment(ReadableProgressiveAttachment* s) { _cntl->_rpa.reset(s); } diff --git a/src/brpc/details/server_private_accessor.h b/src/brpc/details/server_private_accessor.h index aacf283564..8d39f9c35f 100644 --- a/src/brpc/details/server_private_accessor.h +++ b/src/brpc/details/server_private_accessor.h @@ -86,6 +86,11 @@ class ServerPrivateAccessor { return _server->FindServicePropertyByName(name); } + const Server::FlatBuffersMethodProperty* FindFlatBuffersMethodPropertyByIndex( + uint32_t server_index, int method_index) const { + return _server->FindFlatBuffersMethodPropertyByIndex(server_index, method_index); + } + const Server::ServiceProperty* FindServicePropertyAdaptively(const butil::StringPiece& service_name) const { if (service_name.find('.') == butil::StringPiece::npos) { diff --git a/src/brpc/global.cpp b/src/brpc/global.cpp index 1f67aee20b..4a9d0e3204 100644 --- a/src/brpc/global.cpp +++ b/src/brpc/global.cpp @@ -83,6 +83,7 @@ #include "brpc/policy/nshead_mcpack_protocol.h" #include "brpc/policy/rtmp_protocol.h" #include "brpc/policy/esp_protocol.h" +#include "brpc/policy/flatbuffers_protocol.h" #ifdef ENABLE_THRIFT_FRAMED_PROTOCOL # include "brpc/policy/thrift_protocol.h" #endif @@ -427,6 +428,15 @@ static void GlobalInitializeOrDieImpl() { exit(1); } + Protocol fb_protocol = { ParseFlatBuffersMessage, + SerializeFlatBuffersRequest, PackFlatBuffersRequest, + ProcessFlatBuffersRequest, ProcessFlatBuffersResponse, + NULL, NULL, NULL, + CONNECTION_TYPE_SINGLE, "fb_rpc" }; + if (RegisterProtocol(PROTOCOL_FLATBUFFERS_RPC, fb_protocol) != 0) { + exit(1); + } + Protocol streaming_protocol = { ParseStreamingMessage, NULL, NULL, ProcessStreamingMessage, ProcessStreamingMessage, diff --git a/src/brpc/options.proto b/src/brpc/options.proto index 4ad97aa828..2cd9dcd349 100644 --- a/src/brpc/options.proto +++ b/src/brpc/options.proto @@ -65,6 +65,7 @@ enum ProtocolType { PROTOCOL_ESP = 25; // Client side only PROTOCOL_H2 = 26; PROTOCOL_COUCHBASE = 27; + PROTOCOL_FLATBUFFERS_RPC = 28; } enum CompressType { diff --git a/src/brpc/policy/flatbuffers_protocol.cpp b/src/brpc/policy/flatbuffers_protocol.cpp new file mode 100644 index 0000000000..a3ced453a3 --- /dev/null +++ b/src/brpc/policy/flatbuffers_protocol.cpp @@ -0,0 +1,480 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "butil/logging.h" // LOG() +#include "butil/iobuf.h" // butil::IOBuf +#include "butil/single_iobuf.h" // butil::SingleIOBuf +#include "butil/time.h" + +#include "butil/raw_pack.h" // RawPacker RawUnpacker + +#include "brpc/controller.h" // Controller +#include "brpc/socket.h" // Socket +#include "brpc/server.h" // Server +#include "brpc/stream_impl.h" +#include "brpc/rpc_dump.h" // SampledRequest +#include "brpc/policy/most_common_message.h" +#include "brpc/details/controller_private_accessor.h" +#include "brpc/details/server_private_accessor.h" +#include "brpc/policy/flatbuffers_protocol.h" + +namespace brpc { +namespace policy { + +struct FBRpcRequestMeta { + struct { + uint32_t service_index; + int32_t method_index; + } request; + int32_t message_size; + int32_t attachment_size; + int64_t correlation_id; +}__attribute__((packed)); + +struct FBRpcResponseMeta { + struct { + int32_t error_code; + } response; + int32_t message_size; + int32_t attachment_size; + int64_t correlation_id; +}__attribute__((packed)); + +struct FBRpcRequestHeader { + char header[12]; + struct FBRpcRequestMeta meta; +}__attribute__((packed)); + +struct FBRpcResponseHeader { + char header[12]; + struct FBRpcResponseMeta meta; +}__attribute__((packed)); + +bool inline ParseFbFromIOBuf(brpc::flatbuffers::Message* msg, size_t msg_size, const butil::IOBuf& buf) { + return brpc::flatbuffers::ParseFbFromIOBUF(msg, msg_size, buf); +} + +// Notes: +// 1. 12-byte header [FRPC][body_size][meta_size] +// 2. body_size and meta_size are in network byte order +// 3. Use service->service_index + method_index to specify the method to call +// 4. `attachment_size' is set iff request/response has attachment +// 5. Not supported: chunk_info + +// Pack header into `buf' + +static inline void PackFlatbuffersRpcHeader(char* rpc_header, int meta_size, int payload_size) { + // supress strict-aliasing warning. + uint32_t* dummy = (uint32_t*)rpc_header; + *dummy = *(uint32_t*)"FRPC"; + butil::RawPacker(rpc_header + 4) + .pack32(meta_size + payload_size) + .pack32(meta_size); +} + +static inline bool ParseMetaBufferFromIOBUF(butil::SingleIOBuf* dest, + const butil::IOBuf& source, uint32_t msg_size) { + return dest->assign(source, msg_size); +} + +ParseResult ParseFlatBuffersMessage(butil::IOBuf* source, Socket* socket, + bool /*read_eof*/, const void*) { + char header_buf[12]; + const size_t n = source->copy_to(header_buf, sizeof(header_buf)); + if (n >= 4) { + void* dummy = header_buf; + if (*(const uint32_t*)dummy != *(const uint32_t*)"FRPC") { + return MakeParseError(PARSE_ERROR_TRY_OTHERS); + } + + } else { + if (memcmp(header_buf, "FRPC", n) != 0) { + return MakeParseError(PARSE_ERROR_TRY_OTHERS); + } + } + + if (n < sizeof(header_buf)) { + return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA); + } + + uint32_t body_size; + uint32_t meta_size; + butil::RawUnpacker(header_buf + 4).unpack32(body_size).unpack32(meta_size); + if (body_size > FLAGS_max_body_size) { + // We need this log to report the body_size to give users some clues + // which is not printed in InputMessenger. + LOG(ERROR) << "body_size=" << body_size << " from " + << socket->remote_side() << " is too large"; + return MakeParseError(PARSE_ERROR_TOO_BIG_DATA); + } else if (source->length() < sizeof(header_buf) + body_size) { + return MakeParseError(PARSE_ERROR_NOT_ENOUGH_DATA); + } + if (meta_size > body_size) { + LOG(ERROR) << "meta_size=" << meta_size << " is bigger than body_size=" + << body_size; + // Pop the message + source->pop_front(sizeof(header_buf) + body_size); + return MakeParseError(PARSE_ERROR_TRY_OTHERS); + } + source->pop_front(sizeof(header_buf)); + MostCommonMessage* msg = MostCommonMessage::Get(); + source->cutn(&msg->meta, meta_size); + source->cutn(&msg->payload, body_size - meta_size); + return MakeMessage(msg); +} + +static void SendFlatBuffersRpcResponse(int64_t correlation_id, + Controller* cntl, + brpc::flatbuffers::Message* req, + brpc::flatbuffers::Message* res, + const Server* server, + MethodStatus* method_status_raw, + int64_t received_us) { + ControllerPrivateAccessor accessor(cntl); + Socket* sock = accessor.get_sending_socket(); + ConcurrencyRemover concurrency_remover(method_status_raw, cntl, received_us); + std::unique_ptr recycle_cntl(cntl); + std::unique_ptr recycle_req(req); + std::unique_ptr recycle_res(res); + if (cntl->IsCloseConnection()) { + sock->SetFailed(); + return; + } + bool append_body = false; + butil::IOBuf res_body; + // `res' can be NULL here, in which case we don't serialize it + // If user calls `SetFailed' on Controller, we don't serialize + // response either + struct FBRpcResponseHeader *rpc_header = NULL; + uint32_t reserve_size = sizeof(struct FBRpcResponseHeader); + + if (res != NULL && !cntl->Failed()) { + rpc_header = static_cast(res->reduce_meta_size_and_get_buf(sizeof(struct FBRpcResponseHeader))); + if (BAIDU_UNLIKELY(rpc_header == NULL)) { + cntl->SetFailed(ERESPONSE, "Fail to reduce meta size and get buf"); + } else { + if (!brpc::flatbuffers::SerializeFbToIOBUF(res, res_body)) { + cntl->SetFailed(ERESPONSE, "Fail to serialize response"); + } else { + append_body = true; + } + } + } + + // Don't use res->ByteSize() since it may be compressed + size_t res_size = 0; + size_t attached_size = 0; + size_t meta_size = sizeof(struct FBRpcResponseMeta); + if (append_body && rpc_header != NULL) { + res_size = res_body.length() - reserve_size; + attached_size = cntl->response_attachment().length(); + PackFlatbuffersRpcHeader(rpc_header->header, + meta_size, res_size + attached_size); + rpc_header->meta.message_size = res_size; + rpc_header->meta.attachment_size = attached_size; + rpc_header->meta.response.error_code = cntl->ErrorCode(); + rpc_header->meta.correlation_id = correlation_id; + if (attached_size > 0) { + res_body.append(cntl->response_attachment().movable()); + } + } else { // error response + struct FBRpcResponseHeader tmp_header; + tmp_header.meta.message_size = 0; + tmp_header.meta.attachment_size = 0; + tmp_header.meta.response.error_code = cntl->ErrorCode(); + tmp_header.meta.correlation_id = correlation_id; + PackFlatbuffersRpcHeader(tmp_header.header, + meta_size, 0); + res_body.clear(); + res_body.append((void const*) &tmp_header, + sizeof(struct FBRpcResponseHeader)); + } + Socket::WriteOptions wopt; + wopt.ignore_eovercrowded = true; + if (sock->Write(&res_body, &wopt) != 0) { + const int errcode = errno; + PLOG_IF(WARNING, errcode != EPIPE) << "Fail to write into " << *sock; + cntl->SetFailed(errcode, "Fail to write into %s", + sock->description().c_str()); + return; + } +} + +void ProcessFlatBuffersRequest(InputMessageBase* msg_base) { + DestroyingPtr msg(static_cast(msg_base)); + SocketUniquePtr socket_guard(msg->ReleaseSocket()); + Socket* socket = socket_guard.get(); + const Server* server = static_cast(msg_base->arg()); + ScopedNonServiceError non_service_error(server); + butil::SingleIOBuf meta_buf; + if (!ParseMetaBufferFromIOBUF(&meta_buf, + msg->meta, sizeof(struct FBRpcRequestMeta))) { + LOG(WARNING) << "Fail to parse RpcMeta from " << *socket; + socket->SetFailed(EREQUEST, "Fail to parse RpcMeta from %s", + socket->description().c_str()); + return; + } + const struct FBRpcRequestMeta* meta = + static_cast(meta_buf.get_begin()); + if (!meta) { + LOG(WARNING) << "RpcMeta from " << *socket << " is NULL"; + socket->SetFailed(EREQUEST, "Fail to parse RpcMeta from %s", + socket->description().c_str()); + return; + } + + std::unique_ptr cntl; + cntl.reset(new (std::nothrow) Controller); + if (NULL == cntl.get()) { + LOG(WARNING) << "Fail to new Controller"; + return; + } + + std::unique_ptr req; + std::unique_ptr res; + + ServerPrivateAccessor server_accessor(server); + + ControllerPrivateAccessor accessor(cntl.get()); + accessor.set_server(server) + .set_peer_id(socket->id()) + .set_remote_side(socket->remote_side()) + .set_local_side(socket->local_side()) + .set_request_protocol(PROTOCOL_FLATBUFFERS_RPC) + .move_in_server_receiving_sock(socket_guard); + MethodStatus* method_status = NULL; + do { + if (!server->IsRunning()) { + cntl->SetFailed(ELOGOFF, "Server is stopping"); + break; + } + + if (socket->is_overcrowded()) { + cntl->SetFailed(EOVERCROWDED, "Connection to %s is overcrowded", + butil::endpoint2str(socket->remote_side()).c_str()); + break; + } + + if (!server_accessor.AddConcurrency(cntl.get())) { + cntl->SetFailed(ELIMIT, "Reached server's max_concurrency=%d", + server->options().max_concurrency); + break; + } + + const Server::FlatBuffersMethodProperty* mp = + server_accessor.FindFlatBuffersMethodPropertyByIndex(meta->request.service_index, + meta->request.method_index); + if (NULL == mp) { + cntl->SetFailed(ENOMETHOD, "Fail to find method_index=%d service_index=%u ", + meta->request.method_index, + meta->request.service_index); + break; + } + // Switch to service-specific error. + non_service_error.release(); + if (mp->status) { + method_status = mp->status; + if (!method_status->OnRequested()) { + cntl->SetFailed(ELIMIT, "Reached %s's MaxConcurrency=%d", + mp->method->full_name().c_str(), + method_status->MaxConcurrency()); + break; + } + } + brpc::flatbuffers::Service* svc = mp->service; + const brpc::flatbuffers::MethodDescriptor* method = mp->method; + accessor.set_fb_method(method); + const int reqsize = static_cast(msg->payload.size()); + butil::IOBuf req_buf; + butil::IOBuf* req_buf_ptr = &msg->payload; + if (meta->attachment_size > 0) { + if (reqsize < meta->attachment_size) { + cntl->SetFailed(EREQUEST, + "attachment_size=%d is larger than request_size=%d", + meta->attachment_size, reqsize); + break; + } + int body_without_attachment_size = reqsize - meta->attachment_size; + msg->payload.cutn(&req_buf, body_without_attachment_size); + req_buf_ptr = &req_buf; + cntl->request_attachment().swap(msg->payload); + } + + req.reset(new brpc::flatbuffers::Message()); + if (!brpc::flatbuffers::ParseFbFromIOBUF(req.get(), meta->message_size, *req_buf_ptr)) { + cntl->SetFailed(EREQUEST, "Fail to parse request message, " + "request_size=%d", reqsize); + break; + } + res.reset(new brpc::flatbuffers::Message()); + // `socket' will be held until response has been sent + google::protobuf::Closure* done = ::brpc::NewCallback< + int64_t, Controller*, brpc::flatbuffers::Message*, + brpc::flatbuffers::Message*, const Server*, + MethodStatus*, int64_t>( + &SendFlatBuffersRpcResponse, meta->correlation_id, cntl.get(), + req.get(), res.get(), server, + method_status, msg->received_us()); + + msg.reset(); + req_buf.clear(); + svc->FBCallMethod(method, cntl.release(), + req.release(), res.release(), done); + return; + } while (false); + // `cntl', `req' and `res' will be deleted inside `SendFlatBuffersRpcResponse' + // `socket' will be held until response has been sent + SendFlatBuffersRpcResponse(meta->correlation_id, cntl.release(), + req.release(), res.release(), server, + method_status, -1); +} + +void ProcessFlatBuffersResponse(InputMessageBase* msg_base) { + DestroyingPtr msg(static_cast(msg_base)); + butil::SingleIOBuf meta_buf; + if (!ParseMetaBufferFromIOBUF(&meta_buf, + msg->meta, sizeof(struct FBRpcResponseMeta))) { + LOG(WARNING) << "Fail to parse from response meta"; + return; + } + const struct FBRpcResponseMeta* meta = + static_cast(meta_buf.get_begin()); + if (!meta) { + LOG(WARNING) << "Fail to parse from response meta: meta is NULL"; + return; + } + + const bthread_id_t cid = { static_cast(meta->correlation_id) }; + Controller* cntl = NULL; + const int rc = bthread_id_lock(cid, (void**)&cntl); + if (rc != 0) { + LOG_IF(ERROR, rc != EINVAL && rc != EPERM) + << "Fail to lock correlation_id=" << cid << ": " << berror(rc); + return; + } + + ControllerPrivateAccessor accessor(cntl); + const int saved_error = cntl->ErrorCode(); + do { + if (meta->response.error_code != 0) { + // If error_code is unset, default is 0 = success. + cntl->SetFailed(meta->response.error_code, + "server response error"); + break; + } + // Parse response message if error code from meta is 0 + butil::IOBuf res_buf; + const int res_size = msg->payload.length(); + butil::IOBuf* res_buf_ptr = &msg->payload; + if (meta->attachment_size > 0) { + if (meta->attachment_size > res_size) { + cntl->SetFailed( + ERESPONSE, + "attachment_size=%d is larger than response_size=%d", + meta->attachment_size, res_size); + break; + } + int body_without_attachment_size = res_size - meta->attachment_size; + msg->payload.cutn(&res_buf, body_without_attachment_size); + res_buf_ptr = &res_buf; + cntl->response_attachment().swap(msg->payload); + } + + if (cntl->fb_response()) { + if (!brpc::flatbuffers::ParseFbFromIOBUF(cntl->fb_response(), + meta->message_size, *res_buf_ptr)) { + cntl->SetFailed( + ERESPONSE, "Fail to parse response message, " + " response_size=%d", res_size); + } + } // else silently ignore the response. + } while (0); + // Unlocks correlation_id inside. Revert controller's + // error code if it version check of `cid' fails + msg.reset(); // optional, just release resourse ASAP + accessor.OnResponse(cid, saved_error); +} + +void PackFlatBuffersRequest(butil::IOBuf* req_buf, + SocketMessage**, + uint64_t correlation_id, + const google::protobuf::MethodDescriptor* method, + Controller* cntl, + const butil::IOBuf& request_body, + const Authenticator* /*auth*/) { + // FlatBuffers does not use protobuf service definitions. The caller passes + // a brpc::details::flatbuffers::MethodDescriptor* disguised as a + // google::protobuf::MethodDescriptor*, so we reinterpret_cast it back. + const brpc::details::flatbuffers::MethodDescriptor* fb_method = + reinterpret_cast(method); + struct FBRpcRequestHeader *rpc_header = NULL; + size_t req_size = request_body.length(); + rpc_header = (struct FBRpcRequestHeader*)const_cast(request_body.fetch1()); + if (BAIDU_UNLIKELY(rpc_header == NULL)) { + return cntl->SetFailed(ERESPONSE, "fail to get fb request rpc header"); + } + req_size -= sizeof(struct FBRpcRequestHeader); + + //ControllerPrivateAccessor accessor(cntl); + if (fb_method) { + rpc_header->meta.request.service_index = fb_method->service()->index(); + rpc_header->meta.request.method_index = fb_method->index(); + } else { + return cntl->SetFailed(ENOMETHOD, "%s.method is NULL", __FUNCTION__); + } + + rpc_header->meta.correlation_id = correlation_id; + + size_t meta_size = sizeof(struct FBRpcRequestMeta); + rpc_header->meta.message_size = req_size; + const size_t attached_size = cntl->request_attachment().length(); + if (attached_size > 0) { + rpc_header->meta.attachment_size = attached_size; + } else { + rpc_header->meta.attachment_size = 0; + } + PackFlatbuffersRpcHeader(rpc_header->header, meta_size, req_size + attached_size); + + req_buf->append(request_body); + + if (attached_size > 0) { + req_buf->append(cntl->request_attachment()); + } +} + +void SerializeFlatBuffersRequest(butil::IOBuf* buf, + Controller* cntl, + const google::protobuf::Message* request) { + if (!request) { + return cntl->SetFailed(EREQUEST, "`request' is NULL"); + } + if (request->GetDescriptor() != + brpc::details::flatbuffers::Message::descriptor()) { + return cntl->SetFailed(EREQUEST, "request is not a flatbuffers::Message"); + } + brpc::details::flatbuffers::Message* fb_request = + const_cast( + static_cast(request)); + uint32_t reserve_size = sizeof(struct FBRpcRequestHeader); + fb_request->reduce_meta_size_and_get_buf(reserve_size); + if (!brpc::details::flatbuffers::SerializeFbToIOBUF(fb_request, *buf)) { + return cntl->SetFailed(EREQUEST, "Fail to serialize request"); + } +} + +} // namespace policy +} // namespace brpc diff --git a/src/brpc/policy/flatbuffers_protocol.h b/src/brpc/policy/flatbuffers_protocol.h new file mode 100644 index 0000000000..2795c59f20 --- /dev/null +++ b/src/brpc/policy/flatbuffers_protocol.h @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef BRPC_POLICY_FLATBUFFERS_PROTOCOL_H +#define BRPC_POLICY_FLATBUFFERS_PROTOCOL_H + +#include "brpc/protocol.h" +#include "brpc/details/flatbuffers_impl.h" + +namespace brpc { +namespace policy { + +// Parse binary format of flatbuffers-pbrpc. +ParseResult ParseFlatBuffersMessage(butil::IOBuf* source, Socket *socket, bool read_eof, const void *arg); + +// Actions to a (client) request in flatbuffers-pbrpc format. +void ProcessFlatBuffersRequest(InputMessageBase* msg_base); + +// Actions to a (server) response in flatbuffers-pbrpc format. +void ProcessFlatBuffersResponse(InputMessageBase* msg); + +// The serialize_request implementation used by flatbuffers protocol. +void SerializeFlatBuffersRequest(butil::IOBuf* buf, + Controller* cntl, + const google::protobuf::Message* request); + +// Pack `request' to `method' into `buf'. +void PackFlatBuffersRequest(butil::IOBuf* buf, + SocketMessage**, + uint64_t correlation_id, + const google::protobuf::MethodDescriptor* method, + Controller* controller, + const butil::IOBuf& request, + const Authenticator* auth); + +} // namespace policy +} // namespace brpc + +#endif // BRPC_POLICY_FLATBUFFERS_PROTOCOL_H \ No newline at end of file diff --git a/src/brpc/server.cpp b/src/brpc/server.cpp index 9470220d09..1de21d9de3 100644 --- a/src/brpc/server.cpp +++ b/src/brpc/server.cpp @@ -168,6 +168,55 @@ ServerSSLOptions* ServerOptions::mutable_ssl_options() { return _ssl_options.get(); } +Server::FlatBuffersMethodProperty::FlatBuffersMethodProperty() + : service(NULL) + , method(NULL) + , status(NULL) { +} + +Server::FlatBuffersServiceProperty::FlatBuffersServiceProperty() + :service(NULL) + ,method_count(0) + ,methods_list(NULL){ +} + +Server::FlatBuffersServiceProperty::~FlatBuffersServiceProperty() { + if (methods_list) { + for (int i = 0; i < method_count; ++i) { + if (methods_list[i]) { + delete methods_list[i]->status; + delete methods_list[i]; + } + } + delete[] methods_list; + methods_list = NULL; + } +} + +Server::FlatBuffersServiceProperty::FlatBuffersServiceProperty( + FlatBuffersServiceProperty&& other) + : service(other.service) + , method_count(other.method_count) + , methods_list(other.methods_list) { + other.service = NULL; + other.method_count = 0; + other.methods_list = NULL; +} + +Server::FlatBuffersServiceProperty& +Server::FlatBuffersServiceProperty::operator=(FlatBuffersServiceProperty&& other) { + if (this != &other) { + this->~FlatBuffersServiceProperty(); + service = other.service; + method_count = other.method_count; + methods_list = other.methods_list; + other.service = NULL; + other.method_count = 0; + other.methods_list = NULL; + } + return *this; +} + Server::MethodProperty::OpaqueParams::OpaqueParams() : is_tabbed(false) , allow_default_url(false) @@ -423,6 +472,14 @@ const std::string Server::ServiceProperty::service_name() const { return s_unknown_name; } +const std::string& Server::FlatBuffersServiceProperty::service_name() const { + if (service) { + return service->GetDescriptor()->full_name(); + } + const static std::string s_unknown_name = ""; + return s_unknown_name; +} + Server::Server(ProfilerLinker) : _session_local_data_pool(NULL) , _status(UNINITIALIZED) @@ -1584,6 +1641,72 @@ int Server::AddServiceInternal(google::protobuf::Service* service, return 0; } +int Server::AddServiceInternal(brpc::flatbuffers::Service* service, + bool is_builtin_service, + const ServiceOptions& options) { + if (is_builtin_service) { + LOG(ERROR) << "builtin_service of flatbuffers rpc is not supported"; + return -1; + } + if (NULL == service) { + LOG(ERROR) << "Parameter[service] is NULL!"; + return -1; + } + const brpc::flatbuffers::ServiceDescriptor* sd = service->GetDescriptor(); + int method_count = sd->method_count(); + if (method_count <= 0) { + LOG(ERROR) << "service=" << sd->full_name() + << " does not have any method."; + return -1; + } + if (InitializeOnce() != 0) { + LOG(ERROR) << "Fail to initialize Server[" << version() << ']'; + return -1; + } + if (status() != READY) { + LOG(ERROR) << "Can't add service=" << sd->full_name() << " to Server[" + << version() << "] which is " << status_str(status()); + return -1; + } + // Check service conflict using service's index + FlatBuffersServiceProperty* c_ss = _fb_server_index_map.seek(sd->index()); + if (c_ss != NULL) { + LOG(ERROR) << "service:" << sd->full_name() + << " with index:"<< sd->index() + << " conflicts with registed service:" << c_ss->service->GetDescriptor()->full_name() + << " Try to change your service name."; + return -1; + } + + // Register ServiceProperty + FlatBuffersServiceProperty ss; + ss.service = service; + ss.method_count = method_count; + ss.methods_list = new FlatBuffersMethodProperty*[method_count]; + if (!ss.methods_list) { + LOG(ERROR) << "Fail to alloc methods_list"; + return -1; + } + memset(ss.methods_list, 0, method_count * sizeof(FlatBuffersMethodProperty*)); + + // Register MethodProperty + for (int i = 0; i < method_count; ++i) { + const brpc::flatbuffers::MethodDescriptor* md = sd->method(i); + FlatBuffersMethodProperty* mp = new FlatBuffersMethodProperty(); + if (!mp) { + LOG(ERROR) << "Fail to alloc FlatBuffersMethodProperty"; + return -1; + } + mp->service = service; + mp->method = md; + mp->status = new MethodStatus; + ss.methods_list[i] = mp; + } + _fb_server_index_map[sd->index()] = std::move(ss); + + return 0; +} + ServiceOptions::ServiceOptions() : ownership(SERVER_DOESNT_OWN_SERVICE) , allow_default_url(false) @@ -1621,6 +1744,18 @@ int Server::AddService(google::protobuf::Service* service, return AddServiceInternal(service, false, options); } +int Server::AddService(brpc::flatbuffers::Service* service, + ServiceOwnership ownership) { + ServiceOptions options; + options.ownership = ownership; + return AddServiceInternal(service, false, options); +} + +int Server::AddService(brpc::flatbuffers::Service* service, + const ServiceOptions& options) { + return AddServiceInternal(service, false, options); +} + int Server::AddBuiltinService(google::protobuf::Service* service) { ServiceOptions options; options.ownership = SERVER_OWNS_SERVICE; @@ -1746,6 +1881,7 @@ void Server::ClearServices() { } delete it->second.http_url; } + _fb_server_index_map.clear(); _fullname_service_map.clear(); _service_map.clear(); _method_map.clear(); @@ -2012,6 +2148,24 @@ Server::FindServicePropertyByName(const butil::StringPiece& name) const { return _service_map.seek(name); } +const Server::FlatBuffersServiceProperty* +Server::FindFlatBuffersServicePropertyByIndex(uint32_t service_index) const { + return _fb_server_index_map.seek(service_index); +} + +const Server::FlatBuffersMethodProperty* +Server::FindFlatBuffersMethodPropertyByIndex(uint32_t service_index, int method_index) const { + const Server::FlatBuffersServiceProperty* sp = + FindFlatBuffersServicePropertyByIndex(service_index); + if (NULL == sp || NULL == sp->methods_list) { + return NULL; + } + if (method_index < 0 || method_index >= sp->method_count) { + return NULL; + } + return sp->methods_list[method_index]; +} + int Server::AddCertificate(const CertInfo& cert) { if (!_options.has_ssl_options()) { LOG(ERROR) << "ServerOptions.ssl_options is not configured yet"; diff --git a/src/brpc/server.h b/src/brpc/server.h index 9f69a83458..e817687173 100644 --- a/src/brpc/server.h +++ b/src/brpc/server.h @@ -46,6 +46,7 @@ #include "brpc/baidu_master_service.h" #include "brpc/rpc_pb_message_factory.h" #include "brpc/socket_mode.h" +#include "brpc/details/flatbuffers_impl.h" namespace brpc { @@ -429,6 +430,28 @@ class Server { }; typedef butil::FlatMap MethodMap; + struct FlatBuffersMethodProperty { + brpc::flatbuffers::Service* service; + const brpc::flatbuffers::MethodDescriptor* method; + MethodStatus* status; + FlatBuffersMethodProperty(); + }; + + struct FlatBuffersServiceProperty { + brpc::flatbuffers::Service* service; + int method_count; + FlatBuffersMethodProperty** methods_list; + bool is_user_service() const {return false;} + + const std::string& service_name() const; + FlatBuffersServiceProperty(); + ~FlatBuffersServiceProperty(); + FlatBuffersServiceProperty(const FlatBuffersServiceProperty&) = delete; + FlatBuffersServiceProperty& operator=(const FlatBuffersServiceProperty&) = delete; + FlatBuffersServiceProperty(FlatBuffersServiceProperty&& other); + FlatBuffersServiceProperty& operator=(FlatBuffersServiceProperty&& other); + }; + struct ThreadLocalOptions { bthread_key_t tls_key; const DataFactory* thread_local_data_factory; @@ -494,7 +517,10 @@ class Server { bool allow_default_url = false); int AddService(google::protobuf::Service* service, const ServiceOptions& options); - + int AddService(brpc::flatbuffers::Service* service, + ServiceOwnership ownership); + int AddService(brpc::flatbuffers::Service* service, + const ServiceOptions& options); // Remove a service from this server. // NOTE: removing a service while server is running is forbidden. // Returns 0 on success, -1 otherwise. @@ -629,6 +655,10 @@ friend class Controller; bool is_builtin_service, const ServiceOptions& options); + int AddServiceInternal(brpc::flatbuffers::Service* service, + bool is_builtin_service, + const ServiceOptions& options); + int AddBuiltinService(google::protobuf::Service* service); // Remove all methods of `service' from internal structures. @@ -681,6 +711,12 @@ friend class Controller; const ServiceProperty* FindServicePropertyByName(const butil::StringPiece& name) const; + const FlatBuffersServiceProperty* + FindFlatBuffersServicePropertyByIndex(uint32_t service_index) const; + + const FlatBuffersMethodProperty* + FindFlatBuffersMethodPropertyByIndex(uint32_t service_index, int method_index) const; + std::string ServerPrefix() const; // Mapping from hostname to corresponding SSL_CTX @@ -755,6 +791,10 @@ friend class Controller; // uses service->name() to designate an RPC service ServiceMap _service_map; + //used by flatbuffers + typedef butil::FlatMap FlatBuffersServiceIDMap; + FlatBuffersServiceIDMap _fb_server_index_map; + // The only non-builtin service in _service_map, otherwise NULL. google::protobuf::Service* _first_service;