本站源代码
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

271 lines
5.4KB

  1. // Package buffer implements a buffer for serialization, consisting of a chain of []byte-s to
  2. // reduce copying and to allow reuse of individual chunks.
  3. package buffer
  4. import (
  5. "io"
  6. "sync"
  7. )
  // PoolConfig describes how chunks are sized and which of them are recycled.
  type PoolConfig struct {
  	// StartSize is the capacity of the first chunk allocated for an empty buffer.
  	StartSize int
  	// PooledSize is the smallest chunk capacity that is recycled; reusing
  	// chunks that are too small only adds overhead.
  	PooledSize int
  	// MaxSize is the upper bound on the capacity of a newly allocated chunk.
  	MaxSize int
  }
  14. var config = PoolConfig{
  15. StartSize: 128,
  16. PooledSize: 512,
  17. MaxSize: 32768,
  18. }
  19. // Reuse pool: chunk size -> pool.
  20. var buffers = map[int]*sync.Pool{}
  21. func initBuffers() {
  22. for l := config.PooledSize; l <= config.MaxSize; l *= 2 {
  23. buffers[l] = new(sync.Pool)
  24. }
  25. }
  26. func init() {
  27. initBuffers()
  28. }
  29. // Init sets up a non-default pooling and allocation strategy. Should be run before serialization is done.
  30. func Init(cfg PoolConfig) {
  31. config = cfg
  32. initBuffers()
  33. }
  34. // putBuf puts a chunk to reuse pool if it can be reused.
  35. func putBuf(buf []byte) {
  36. size := cap(buf)
  37. if size < config.PooledSize {
  38. return
  39. }
  40. if c := buffers[size]; c != nil {
  41. c.Put(buf[:0])
  42. }
  43. }
  44. // getBuf gets a chunk from reuse pool or creates a new one if reuse failed.
  45. func getBuf(size int) []byte {
  46. if size < config.PooledSize {
  47. return make([]byte, 0, size)
  48. }
  49. if c := buffers[size]; c != nil {
  50. v := c.Get()
  51. if v != nil {
  52. return v.([]byte)
  53. }
  54. }
  55. return make([]byte, 0, size)
  56. }
  // Buffer collects serialized output in a chain of chunks, so that growing it
  // does not require copying what has already been written.
  type Buffer struct {
  	// Buf is the current chunk; serialization code may append to it directly.
  	Buf []byte

  	// toPool is the chunk that was obtained for Buf. It is tracked separately
  	// so that it can still be recycled when Buf itself has been reallocated.
  	toPool []byte
  	// bufs holds the chunks that were filled before the current one, in order.
  	bufs [][]byte
  }
  64. // EnsureSpace makes sure that the current chunk contains at least s free bytes,
  65. // possibly creating a new chunk.
  66. func (b *Buffer) EnsureSpace(s int) {
  67. if cap(b.Buf)-len(b.Buf) >= s {
  68. return
  69. }
  70. l := len(b.Buf)
  71. if l > 0 {
  72. if cap(b.toPool) != cap(b.Buf) {
  73. // Chunk was reallocated, toPool can be pooled.
  74. putBuf(b.toPool)
  75. }
  76. if cap(b.bufs) == 0 {
  77. b.bufs = make([][]byte, 0, 8)
  78. }
  79. b.bufs = append(b.bufs, b.Buf)
  80. l = cap(b.toPool) * 2
  81. } else {
  82. l = config.StartSize
  83. }
  84. if l > config.MaxSize {
  85. l = config.MaxSize
  86. }
  87. b.Buf = getBuf(l)
  88. b.toPool = b.Buf
  89. }
  90. // AppendByte appends a single byte to buffer.
  91. func (b *Buffer) AppendByte(data byte) {
  92. if cap(b.Buf) == len(b.Buf) { // EnsureSpace won't be inlined.
  93. b.EnsureSpace(1)
  94. }
  95. b.Buf = append(b.Buf, data)
  96. }
  97. // AppendBytes appends a byte slice to buffer.
  98. func (b *Buffer) AppendBytes(data []byte) {
  99. for len(data) > 0 {
  100. if cap(b.Buf) == len(b.Buf) { // EnsureSpace won't be inlined.
  101. b.EnsureSpace(1)
  102. }
  103. sz := cap(b.Buf) - len(b.Buf)
  104. if sz > len(data) {
  105. sz = len(data)
  106. }
  107. b.Buf = append(b.Buf, data[:sz]...)
  108. data = data[sz:]
  109. }
  110. }
  111. // AppendBytes appends a string to buffer.
  112. func (b *Buffer) AppendString(data string) {
  113. for len(data) > 0 {
  114. if cap(b.Buf) == len(b.Buf) { // EnsureSpace won't be inlined.
  115. b.EnsureSpace(1)
  116. }
  117. sz := cap(b.Buf) - len(b.Buf)
  118. if sz > len(data) {
  119. sz = len(data)
  120. }
  121. b.Buf = append(b.Buf, data[:sz]...)
  122. data = data[sz:]
  123. }
  124. }
  125. // Size computes the size of a buffer by adding sizes of every chunk.
  126. func (b *Buffer) Size() int {
  127. size := len(b.Buf)
  128. for _, buf := range b.bufs {
  129. size += len(buf)
  130. }
  131. return size
  132. }
  133. // DumpTo outputs the contents of a buffer to a writer and resets the buffer.
  134. func (b *Buffer) DumpTo(w io.Writer) (written int, err error) {
  135. var n int
  136. for _, buf := range b.bufs {
  137. if err == nil {
  138. n, err = w.Write(buf)
  139. written += n
  140. }
  141. putBuf(buf)
  142. }
  143. if err == nil {
  144. n, err = w.Write(b.Buf)
  145. written += n
  146. }
  147. putBuf(b.toPool)
  148. b.bufs = nil
  149. b.Buf = nil
  150. b.toPool = nil
  151. return
  152. }
  153. // BuildBytes creates a single byte slice with all the contents of the buffer. Data is
  154. // copied if it does not fit in a single chunk. You can optionally provide one byte
  155. // slice as argument that it will try to reuse.
  156. func (b *Buffer) BuildBytes(reuse ...[]byte) []byte {
  157. if len(b.bufs) == 0 {
  158. ret := b.Buf
  159. b.toPool = nil
  160. b.Buf = nil
  161. return ret
  162. }
  163. var ret []byte
  164. size := b.Size()
  165. // If we got a buffer as argument and it is big enought, reuse it.
  166. if len(reuse) == 1 && cap(reuse[0]) >= size {
  167. ret = reuse[0][:0]
  168. } else {
  169. ret = make([]byte, 0, size)
  170. }
  171. for _, buf := range b.bufs {
  172. ret = append(ret, buf...)
  173. putBuf(buf)
  174. }
  175. ret = append(ret, b.Buf...)
  176. putBuf(b.toPool)
  177. b.bufs = nil
  178. b.toPool = nil
  179. b.Buf = nil
  180. return ret
  181. }
  // readCloser reads through a chain of chunks, releasing each chunk once it has
  // been read completely.
  type readCloser struct {
  	// offset is the read position inside the first chunk of bufs.
  	offset int
  	// bufs holds the chunks that have not been read completely yet.
  	bufs [][]byte
  }
  186. func (r *readCloser) Read(p []byte) (n int, err error) {
  187. for _, buf := range r.bufs {
  188. // Copy as much as we can.
  189. x := copy(p[n:], buf[r.offset:])
  190. n += x // Increment how much we filled.
  191. // Did we empty the whole buffer?
  192. if r.offset+x == len(buf) {
  193. // On to the next buffer.
  194. r.offset = 0
  195. r.bufs = r.bufs[1:]
  196. // We can release this buffer.
  197. putBuf(buf)
  198. } else {
  199. r.offset += x
  200. }
  201. if n == len(p) {
  202. break
  203. }
  204. }
  205. // No buffers left or nothing read?
  206. if len(r.bufs) == 0 {
  207. err = io.EOF
  208. }
  209. return
  210. }
  211. func (r *readCloser) Close() error {
  212. // Release all remaining buffers.
  213. for _, buf := range r.bufs {
  214. putBuf(buf)
  215. }
  216. // In case Close gets called multiple times.
  217. r.bufs = nil
  218. return nil
  219. }
  220. // ReadCloser creates an io.ReadCloser with all the contents of the buffer.
  221. func (b *Buffer) ReadCloser() io.ReadCloser {
  222. ret := &readCloser{0, append(b.bufs, b.Buf)}
  223. b.bufs = nil
  224. b.toPool = nil
  225. b.Buf = nil
  226. return ret
  227. }
上海开阖软件有限公司 沪ICP备12045867号-1