本站源代码
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

268 lines
5.6KB

  1. package gomemcached
  2. import (
  3. "encoding/binary"
  4. "fmt"
  5. "io"
  6. "sync"
  7. )
  8. // MCResponse is memcached response
  9. type MCResponse struct {
  10. // The command opcode of the command that sent the request
  11. Opcode CommandCode
  12. // The status of the response
  13. Status Status
  14. // The opaque sent in the request
  15. Opaque uint32
  16. // The CAS identifier (if applicable)
  17. Cas uint64
  18. // Extras, key, and body for this response
  19. Extras, Key, Body []byte
  20. // If true, this represents a fatal condition and we should hang up
  21. Fatal bool
  22. // Datatype identifier
  23. DataType uint8
  24. }
  25. // A debugging string representation of this response
  26. func (res MCResponse) String() string {
  27. return fmt.Sprintf("{MCResponse status=%v keylen=%d, extralen=%d, bodylen=%d}",
  28. res.Status, len(res.Key), len(res.Extras), len(res.Body))
  29. }
  30. // Response as an error.
  31. func (res *MCResponse) Error() string {
  32. return fmt.Sprintf("MCResponse status=%v, opcode=%v, opaque=%v, msg: %s",
  33. res.Status, res.Opcode, res.Opaque, string(res.Body))
  34. }
  35. func errStatus(e error) Status {
  36. status := Status(0xffff)
  37. if res, ok := e.(*MCResponse); ok {
  38. status = res.Status
  39. }
  40. return status
  41. }
  42. // IsNotFound is true if this error represents a "not found" response.
  43. func IsNotFound(e error) bool {
  44. return errStatus(e) == KEY_ENOENT
  45. }
  46. // IsFatal is false if this error isn't believed to be fatal to a connection.
  47. func IsFatal(e error) bool {
  48. if e == nil {
  49. return false
  50. }
  51. _, ok := isFatal[errStatus(e)]
  52. if ok {
  53. return true
  54. }
  55. return false
  56. }
  57. // Size is number of bytes this response consumes on the wire.
  58. func (res *MCResponse) Size() int {
  59. return HDR_LEN + len(res.Extras) + len(res.Key) + len(res.Body)
  60. }
  61. func (res *MCResponse) fillHeaderBytes(data []byte) int {
  62. pos := 0
  63. data[pos] = RES_MAGIC
  64. pos++
  65. data[pos] = byte(res.Opcode)
  66. pos++
  67. binary.BigEndian.PutUint16(data[pos:pos+2],
  68. uint16(len(res.Key)))
  69. pos += 2
  70. // 4
  71. data[pos] = byte(len(res.Extras))
  72. pos++
  73. // Data type
  74. if res.DataType != 0 {
  75. data[pos] = byte(res.DataType)
  76. } else {
  77. data[pos] = 0
  78. }
  79. pos++
  80. binary.BigEndian.PutUint16(data[pos:pos+2], uint16(res.Status))
  81. pos += 2
  82. // 8
  83. binary.BigEndian.PutUint32(data[pos:pos+4],
  84. uint32(len(res.Body)+len(res.Key)+len(res.Extras)))
  85. pos += 4
  86. // 12
  87. binary.BigEndian.PutUint32(data[pos:pos+4], res.Opaque)
  88. pos += 4
  89. // 16
  90. binary.BigEndian.PutUint64(data[pos:pos+8], res.Cas)
  91. pos += 8
  92. if len(res.Extras) > 0 {
  93. copy(data[pos:pos+len(res.Extras)], res.Extras)
  94. pos += len(res.Extras)
  95. }
  96. if len(res.Key) > 0 {
  97. copy(data[pos:pos+len(res.Key)], res.Key)
  98. pos += len(res.Key)
  99. }
  100. return pos
  101. }
  102. // HeaderBytes will get just the header bytes for this response.
  103. func (res *MCResponse) HeaderBytes() []byte {
  104. data := make([]byte, HDR_LEN+len(res.Extras)+len(res.Key))
  105. res.fillHeaderBytes(data)
  106. return data
  107. }
  108. // Bytes will return the actual bytes transmitted for this response.
  109. func (res *MCResponse) Bytes() []byte {
  110. data := make([]byte, res.Size())
  111. pos := res.fillHeaderBytes(data)
  112. copy(data[pos:pos+len(res.Body)], res.Body)
  113. return data
  114. }
  115. // Transmit will send this response message across a writer.
  116. func (res *MCResponse) Transmit(w io.Writer) (n int, err error) {
  117. if len(res.Body) < 128 {
  118. n, err = w.Write(res.Bytes())
  119. } else {
  120. n, err = w.Write(res.HeaderBytes())
  121. if err == nil {
  122. m := 0
  123. m, err = w.Write(res.Body)
  124. m += n
  125. }
  126. }
  127. return
  128. }
  129. // Receive will fill this MCResponse with the data from this reader.
  130. func (res *MCResponse) Receive(r io.Reader, hdrBytes []byte) (n int, err error) {
  131. if len(hdrBytes) < HDR_LEN {
  132. hdrBytes = []byte{
  133. 0, 0, 0, 0, 0, 0, 0, 0,
  134. 0, 0, 0, 0, 0, 0, 0, 0,
  135. 0, 0, 0, 0, 0, 0, 0, 0}
  136. }
  137. n, err = io.ReadFull(r, hdrBytes)
  138. if err != nil {
  139. return n, err
  140. }
  141. if hdrBytes[0] != RES_MAGIC && hdrBytes[0] != REQ_MAGIC {
  142. return n, fmt.Errorf("bad magic: 0x%02x", hdrBytes[0])
  143. }
  144. klen := int(binary.BigEndian.Uint16(hdrBytes[2:4]))
  145. elen := int(hdrBytes[4])
  146. res.Opcode = CommandCode(hdrBytes[1])
  147. res.DataType = uint8(hdrBytes[5])
  148. res.Status = Status(binary.BigEndian.Uint16(hdrBytes[6:8]))
  149. res.Opaque = binary.BigEndian.Uint32(hdrBytes[12:16])
  150. res.Cas = binary.BigEndian.Uint64(hdrBytes[16:24])
  151. bodyLen := int(binary.BigEndian.Uint32(hdrBytes[8:12])) - (klen + elen)
  152. //defer function to debug the panic seen with MB-15557
  153. defer func() {
  154. if e := recover(); e != nil {
  155. err = fmt.Errorf(`Panic in Receive. Response %v \n
  156. key len %v extra len %v bodylen %v`, res, klen, elen, bodyLen)
  157. }
  158. }()
  159. buf := make([]byte, klen+elen+bodyLen)
  160. m, err := io.ReadFull(r, buf)
  161. if err == nil {
  162. res.Extras = buf[0:elen]
  163. res.Key = buf[elen : klen+elen]
  164. res.Body = buf[klen+elen:]
  165. }
  166. return n + m, err
  167. }
  168. type MCResponsePool struct {
  169. pool *sync.Pool
  170. }
  171. func NewMCResponsePool() *MCResponsePool {
  172. rv := &MCResponsePool{
  173. pool: &sync.Pool{
  174. New: func() interface{} {
  175. return &MCResponse{}
  176. },
  177. },
  178. }
  179. return rv
  180. }
  181. func (this *MCResponsePool) Get() *MCResponse {
  182. return this.pool.Get().(*MCResponse)
  183. }
  184. func (this *MCResponsePool) Put(r *MCResponse) {
  185. if r == nil {
  186. return
  187. }
  188. r.Extras = nil
  189. r.Key = nil
  190. r.Body = nil
  191. r.Fatal = false
  192. this.pool.Put(r)
  193. }
  194. type StringMCResponsePool struct {
  195. pool *sync.Pool
  196. size int
  197. }
  198. func NewStringMCResponsePool(size int) *StringMCResponsePool {
  199. rv := &StringMCResponsePool{
  200. pool: &sync.Pool{
  201. New: func() interface{} {
  202. return make(map[string]*MCResponse, size)
  203. },
  204. },
  205. size: size,
  206. }
  207. return rv
  208. }
  209. func (this *StringMCResponsePool) Get() map[string]*MCResponse {
  210. return this.pool.Get().(map[string]*MCResponse)
  211. }
  212. func (this *StringMCResponsePool) Put(m map[string]*MCResponse) {
  213. if m == nil || len(m) > 2*this.size {
  214. return
  215. }
  216. for k := range m {
  217. m[k] = nil
  218. delete(m, k)
  219. }
  220. this.pool.Put(m)
  221. }
上海开阖软件有限公司 沪ICP备12045867号-1