proto_buffer_writer.h 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. /*
  2. *
  3. * Copyright 2018 gRPC authors.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. *
  17. */
  18. #ifndef GRPCPP_IMPL_CODEGEN_PROTO_BUFFER_WRITER_H
  19. #define GRPCPP_IMPL_CODEGEN_PROTO_BUFFER_WRITER_H
  20. // IWYU pragma: private, include <grpcpp/support/proto_buffer_writer.h>
  21. #include <type_traits>
  22. #include <grpc/impl/codegen/grpc_types.h>
  23. #include <grpc/impl/codegen/slice.h>
  24. #include <grpcpp/impl/codegen/byte_buffer.h>
  25. #include <grpcpp/impl/codegen/config_protobuf.h>
  26. #include <grpcpp/impl/codegen/core_codegen_interface.h>
  27. #include <grpcpp/impl/codegen/serialization_traits.h>
  28. #include <grpcpp/impl/codegen/status.h>
  29. /// This header provides an object that writes bytes directly into a
  30. /// grpc::ByteBuffer, via the ZeroCopyOutputStream interface
  31. namespace grpc {
  32. extern CoreCodegenInterface* g_core_codegen_interface;
  33. // Forward declaration for testing use only
  34. namespace internal {
  35. class ProtoBufferWriterPeer;
  36. } // namespace internal
  37. const int kProtoBufferWriterMaxBufferLength = 1024 * 1024;
  38. /// This is a specialization of the protobuf class ZeroCopyOutputStream.
  39. /// The principle is to give the proto layer one buffer of bytes at a time
  40. /// that it can use to serialize the next portion of the message, with the
  41. /// option to "backup" if more buffer is given than required at the last buffer.
  42. ///
  43. /// Read more about ZeroCopyOutputStream interface here:
  44. /// https://developers.google.com/protocol-buffers/docs/reference/cpp/google.protobuf.io.zero_copy_stream#ZeroCopyOutputStream
  45. class ProtoBufferWriter : public grpc::protobuf::io::ZeroCopyOutputStream {
  46. public:
  47. /// Constructor for this derived class
  48. ///
  49. /// \param[out] byte_buffer A pointer to the grpc::ByteBuffer created
  50. /// \param block_size How big are the chunks to allocate at a time
  51. /// \param total_size How many total bytes are required for this proto
  52. ProtoBufferWriter(ByteBuffer* byte_buffer, int block_size, int total_size)
  53. : block_size_(block_size),
  54. total_size_(total_size),
  55. byte_count_(0),
  56. have_backup_(false) {
  57. GPR_CODEGEN_ASSERT(!byte_buffer->Valid());
  58. /// Create an empty raw byte buffer and look at its underlying slice buffer
  59. grpc_byte_buffer* bp =
  60. g_core_codegen_interface->grpc_raw_byte_buffer_create(nullptr, 0);
  61. byte_buffer->set_buffer(bp);
  62. slice_buffer_ = &bp->data.raw.slice_buffer;
  63. }
  64. ~ProtoBufferWriter() override {
  65. if (have_backup_) {
  66. g_core_codegen_interface->grpc_slice_unref(backup_slice_);
  67. }
  68. }
  69. /// Give the proto library the next buffer of bytes and its size. It is
  70. /// safe for the caller to write from data[0, size - 1].
  71. bool Next(void** data, int* size) override {
  72. // Protobuf should not ask for more memory than total_size_.
  73. GPR_CODEGEN_ASSERT(byte_count_ < total_size_);
  74. // 1. Use the remaining backup slice if we have one
  75. // 2. Otherwise allocate a slice, up to the remaining length needed
  76. // or our maximum allocation size
  77. // 3. Provide the slice start and size available
  78. // 4. Add the slice being returned to the slice buffer
  79. size_t remain = static_cast<size_t>(total_size_ - byte_count_);
  80. if (have_backup_) {
  81. /// If we have a backup slice, we should use it first
  82. slice_ = backup_slice_;
  83. have_backup_ = false;
  84. if (GRPC_SLICE_LENGTH(slice_) > remain) {
  85. GRPC_SLICE_SET_LENGTH(slice_, remain);
  86. }
  87. } else {
  88. // When less than a whole block is needed, only allocate that much.
  89. // But make sure the allocated slice is not inlined.
  90. size_t allocate_length =
  91. remain > static_cast<size_t>(block_size_) ? block_size_ : remain;
  92. slice_ = g_core_codegen_interface->grpc_slice_malloc(
  93. allocate_length > GRPC_SLICE_INLINED_SIZE
  94. ? allocate_length
  95. : GRPC_SLICE_INLINED_SIZE + 1);
  96. }
  97. *data = GRPC_SLICE_START_PTR(slice_);
  98. // On win x64, int is only 32bit
  99. GPR_CODEGEN_ASSERT(GRPC_SLICE_LENGTH(slice_) <= INT_MAX);
  100. byte_count_ += * size = static_cast<int>(GRPC_SLICE_LENGTH(slice_));
  101. g_core_codegen_interface->grpc_slice_buffer_add(slice_buffer_, slice_);
  102. return true;
  103. }
  104. /// Backup by \a count bytes because Next returned more bytes than needed
  105. /// (only used in the last buffer). \a count must be less than or equal too
  106. /// the last buffer returned from next.
  107. void BackUp(int count) override {
  108. // count == 0 is invoked by ZeroCopyOutputStream users indicating that any
  109. // potential buffer obtained through a previous call to Next() is final.
  110. // ZeroCopyOutputStream implementations such as streaming output can use
  111. // these calls to flush any temporary buffer and flush the output. The logic
  112. // below is not robust against count == 0 invocations, so directly return.
  113. if (count == 0) return;
  114. /// 1. Remove the partially-used last slice from the slice buffer
  115. /// 2. Split it into the needed (if any) and unneeded part
  116. /// 3. Add the needed part back to the slice buffer
  117. /// 4. Mark that we still have the remaining part (for later use/unref)
  118. GPR_CODEGEN_ASSERT(count <= static_cast<int>(GRPC_SLICE_LENGTH(slice_)));
  119. g_core_codegen_interface->grpc_slice_buffer_pop(slice_buffer_);
  120. if (static_cast<size_t>(count) == GRPC_SLICE_LENGTH(slice_)) {
  121. backup_slice_ = slice_;
  122. } else {
  123. backup_slice_ = g_core_codegen_interface->grpc_slice_split_tail(
  124. &slice_, GRPC_SLICE_LENGTH(slice_) - count);
  125. g_core_codegen_interface->grpc_slice_buffer_add(slice_buffer_, slice_);
  126. }
  127. // It's dangerous to keep an inlined grpc_slice as the backup slice, since
  128. // on a following Next() call, a reference will be returned to this slice
  129. // via GRPC_SLICE_START_PTR, which will not be an address held by
  130. // slice_buffer_.
  131. have_backup_ = backup_slice_.refcount != nullptr;
  132. byte_count_ -= count;
  133. }
  134. /// Returns the total number of bytes written since this object was created.
  135. int64_t ByteCount() const override { return byte_count_; }
  136. // These protected members are needed to support internal optimizations.
  137. // they expose internal bits of grpc core that are NOT stable. If you have
  138. // a use case needs to use one of these functions, please send an email to
  139. // https://groups.google.com/forum/#!forum/grpc-io.
  140. protected:
  141. grpc_slice_buffer* slice_buffer() { return slice_buffer_; }
  142. void set_byte_count(int64_t byte_count) { byte_count_ = byte_count; }
  143. private:
  144. // friend for testing purposes only
  145. friend class internal::ProtoBufferWriterPeer;
  146. const int block_size_; ///< size to alloc for each new \a grpc_slice needed
  147. const int total_size_; ///< byte size of proto being serialized
  148. int64_t byte_count_; ///< bytes written since this object was created
  149. grpc_slice_buffer*
  150. slice_buffer_; ///< internal buffer of slices holding the serialized data
  151. bool have_backup_; ///< if we are holding a backup slice or not
  152. grpc_slice backup_slice_; ///< holds space we can still write to, if the
  153. ///< caller has called BackUp
  154. grpc_slice slice_; ///< current slice passed back to the caller
  155. };
  156. } // namespace grpc
  157. #endif // GRPCPP_IMPL_CODEGEN_PROTO_BUFFER_WRITER_H