本站源代码
Nie możesz wybrać więcej, niż 25 tematów Tematy muszą się zaczynać od litery lub cyfry, mogą zawierać myślniki ('-') i mogą mieć do 35 znaków.

514 lines
14KB

  1. package unsnap
  2. import (
  3. "bytes"
  4. "encoding/binary"
  5. "fmt"
  6. "io"
  7. "io/ioutil"
  8. "os"
  9. "hash/crc32"
  10. snappy "github.com/golang/snappy"
  11. // The C library can be used, but this makes the binary dependent
  12. // lots of extraneous c-libraries; it is no longer stand-alone. Yuck.
  13. //
  14. // Therefore we comment out the "dgryski/go-csnappy" path and use the
  15. // "github.com/golang/snappy/snappy" above instead. If you are
  16. // performance limited and can deal with distributing more libraries,
  17. // then this is easy to swap.
  18. //
  19. // If you swap, note that some of the tests won't pass
  20. // because snappy-go produces slightly different (but still
  21. // conformant) encodings on some data. Here are bindings
  22. // to the C-snappy:
  23. // snappy "github.com/dgryski/go-csnappy"
  24. )
  25. // SnappyFile: create a drop-in-replacement/wrapper for an *os.File that handles doing the unsnappification online as more is read from it
  26. type SnappyFile struct {
  27. Fname string
  28. Reader io.Reader
  29. Writer io.Writer
  30. // allow clients to substitute us for an os.File and just switch
  31. // off compression if they don't want it.
  32. SnappyEncodeDecodeOff bool // if true, we bypass straight to Filep
  33. EncBuf FixedSizeRingBuf // holds any extra that isn't yet returned, encoded
  34. DecBuf FixedSizeRingBuf // holds any extra that isn't yet returned, decoded
  35. // for writing to stream-framed snappy
  36. HeaderChunkWritten bool
  37. // Sanity check: we can only read, or only write, to one SnappyFile.
  38. // EncBuf and DecBuf are used differently in each mode. Verify
  39. // that we are consistent with this flag.
  40. Writing bool
  41. }
  42. var total int
  43. // for debugging, show state of buffers
  44. func (f *SnappyFile) Dump() {
  45. fmt.Printf("EncBuf has length %d and contents:\n%s\n", len(f.EncBuf.Bytes()), string(f.EncBuf.Bytes()))
  46. fmt.Printf("DecBuf has length %d and contents:\n%s\n", len(f.DecBuf.Bytes()), string(f.DecBuf.Bytes()))
  47. }
  48. func (f *SnappyFile) Read(p []byte) (n int, err error) {
  49. if f.SnappyEncodeDecodeOff {
  50. return f.Reader.Read(p)
  51. }
  52. if f.Writing {
  53. panic("Reading on a write-only SnappyFile")
  54. }
  55. // before we unencrypt more, try to drain the DecBuf first
  56. n, _ = f.DecBuf.Read(p)
  57. if n > 0 {
  58. total += n
  59. return n, nil
  60. }
  61. //nEncRead, nDecAdded, err := UnsnapOneFrame(f.Filep, &f.EncBuf, &f.DecBuf, f.Fname)
  62. _, _, err = UnsnapOneFrame(f.Reader, &f.EncBuf, &f.DecBuf, f.Fname)
  63. if err != nil && err != io.EOF {
  64. panic(err)
  65. }
  66. n, _ = f.DecBuf.Read(p)
  67. if n > 0 {
  68. total += n
  69. return n, nil
  70. }
  71. if f.DecBuf.Readable == 0 {
  72. if f.DecBuf.Readable == 0 && f.EncBuf.Readable == 0 {
  73. // only now (when EncBuf is empty) can we give io.EOF.
  74. // Any earlier, and we leave stuff un-decoded!
  75. return 0, io.EOF
  76. }
  77. }
  78. return 0, nil
  79. }
  80. func Open(name string) (file *SnappyFile, err error) {
  81. fp, err := os.Open(name)
  82. if err != nil {
  83. return nil, err
  84. }
  85. // encoding in snappy can apparently go beyond the original size, so
  86. // we make our buffers big enough, 2*max snappy chunk => 2 * CHUNK_MAX(65536)
  87. snap := NewReader(fp)
  88. snap.Fname = name
  89. return snap, nil
  90. }
  91. func NewReader(r io.Reader) *SnappyFile {
  92. return &SnappyFile{
  93. Reader: r,
  94. EncBuf: *NewFixedSizeRingBuf(CHUNK_MAX * 2), // buffer of snappy encoded bytes
  95. DecBuf: *NewFixedSizeRingBuf(CHUNK_MAX * 2), // buffer of snapppy decoded bytes
  96. Writing: false,
  97. }
  98. }
  99. func NewWriter(w io.Writer) *SnappyFile {
  100. return &SnappyFile{
  101. Writer: w,
  102. EncBuf: *NewFixedSizeRingBuf(65536), // on writing: temp for testing compression
  103. DecBuf: *NewFixedSizeRingBuf(65536 * 2), // on writing: final buffer of snappy framed and encoded bytes
  104. Writing: true,
  105. }
  106. }
  107. func Create(name string) (file *SnappyFile, err error) {
  108. fp, err := os.Create(name)
  109. if err != nil {
  110. return nil, err
  111. }
  112. snap := NewWriter(fp)
  113. snap.Fname = name
  114. return snap, nil
  115. }
  116. func (f *SnappyFile) Close() error {
  117. if f.Writing {
  118. wc, ok := f.Writer.(io.WriteCloser)
  119. if ok {
  120. return wc.Close()
  121. }
  122. return nil
  123. }
  124. rc, ok := f.Reader.(io.ReadCloser)
  125. if ok {
  126. return rc.Close()
  127. }
  128. return nil
  129. }
  130. func (f *SnappyFile) Sync() error {
  131. file, ok := f.Writer.(*os.File)
  132. if ok {
  133. return file.Sync()
  134. }
  135. return nil
  136. }
  137. // for an increment of a frame at a time:
  138. // read from r into encBuf (encBuf is still encoded, thus the name), and write unsnappified frames into outDecodedBuf
  139. // the returned n: number of bytes read from the encrypted encBuf
  140. func UnsnapOneFrame(r io.Reader, encBuf *FixedSizeRingBuf, outDecodedBuf *FixedSizeRingBuf, fname string) (nEnc int64, nDec int64, err error) {
  141. // b, err := ioutil.ReadAll(r)
  142. // if err != nil {
  143. // panic(err)
  144. // }
  145. nEnc = 0
  146. nDec = 0
  147. // read up to 65536 bytes from r into encBuf, at least a snappy frame
  148. nread, err := io.CopyN(encBuf, r, 65536) // returns nwrotebytes, err
  149. nEnc += nread
  150. if err != nil {
  151. if err == io.EOF {
  152. if nread == 0 {
  153. if encBuf.Readable == 0 {
  154. return nEnc, nDec, io.EOF
  155. }
  156. // else we have bytes in encBuf, so decode them!
  157. err = nil
  158. } else {
  159. // continue below, processing the nread bytes
  160. err = nil
  161. }
  162. } else {
  163. panic(err)
  164. }
  165. }
  166. // flag for printing chunk size alignment messages
  167. verbose := false
  168. const snappyStreamHeaderSz = 10
  169. const headerSz = 4
  170. const crc32Sz = 4
  171. // the magic 18 bytes accounts for the snappy streaming header and the first chunks size and checksum
  172. // http://code.google.com/p/snappy/source/browse/trunk/framing_format.txt
  173. chunk := (*encBuf).Bytes()
  174. // however we exit, advance as
  175. // defer func() { (*encBuf).Next(N) }()
  176. // 65536 is the max size of a snappy framed chunk. See
  177. // http://code.google.com/p/snappy/source/browse/trunk/framing_format.txt:91
  178. // buf := make([]byte, 65536)
  179. // fmt.Printf("read from file, b is len:%d with value: %#v\n", len(b), b)
  180. // fmt.Printf("read from file, bcut is len:%d with value: %#v\n", len(bcut), bcut)
  181. //fmt.Printf("raw bytes of chunksz are: %v\n", b[11:14])
  182. fourbytes := make([]byte, 4)
  183. chunkCount := 0
  184. for nDec < 65536 {
  185. if len(chunk) == 0 {
  186. break
  187. }
  188. chunkCount++
  189. fourbytes[3] = 0
  190. copy(fourbytes, chunk[1:4])
  191. chunksz := binary.LittleEndian.Uint32(fourbytes)
  192. chunk_type := chunk[0]
  193. switch true {
  194. case chunk_type == 0xff:
  195. { // stream identifier
  196. streamHeader := chunk[:snappyStreamHeaderSz]
  197. if 0 != bytes.Compare(streamHeader, []byte{0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59}) {
  198. panic("file had chunk starting with 0xff but then no magic snappy streaming protocol bytes, aborting.")
  199. } else {
  200. //fmt.Printf("got streaming snappy magic header just fine.\n")
  201. }
  202. chunk = chunk[snappyStreamHeaderSz:]
  203. (*encBuf).Advance(snappyStreamHeaderSz)
  204. nEnc += snappyStreamHeaderSz
  205. continue
  206. }
  207. case chunk_type == 0x00:
  208. { // compressed data
  209. if verbose {
  210. fmt.Fprintf(os.Stderr, "chunksz is %d while total bytes avail are: %d\n", int(chunksz), len(chunk)-4)
  211. }
  212. crc := binary.LittleEndian.Uint32(chunk[headerSz:(headerSz + crc32Sz)])
  213. section := chunk[(headerSz + crc32Sz):(headerSz + chunksz)]
  214. dec, ok := snappy.Decode(nil, section)
  215. if ok != nil {
  216. // we've probably truncated a snappy frame at this point
  217. // ok=snappy: corrupt input
  218. // len(dec) == 0
  219. //
  220. panic(fmt.Sprintf("could not decode snappy stream: '%s' and len dec=%d and ok=%v\n", fname, len(dec), ok))
  221. // get back to caller with what we've got so far
  222. return nEnc, nDec, nil
  223. }
  224. // fmt.Printf("ok, b is %#v , %#v\n", ok, dec)
  225. // spit out decoded text
  226. // n, err := w.Write(dec)
  227. //fmt.Printf("len(dec) = %d, outDecodedBuf.Readable=%d\n", len(dec), outDecodedBuf.Readable)
  228. bnb := bytes.NewBuffer(dec)
  229. n, err := io.Copy(outDecodedBuf, bnb)
  230. if err != nil {
  231. //fmt.Printf("got n=%d, err= %s ; when trying to io.Copy(outDecodedBuf: N=%d, Readable=%d)\n", n, err, outDecodedBuf.N, outDecodedBuf.Readable)
  232. panic(err)
  233. }
  234. if n != int64(len(dec)) {
  235. panic("could not write all bytes to outDecodedBuf")
  236. }
  237. nDec += n
  238. // verify the crc32 rotated checksum
  239. m32 := masked_crc32c(dec)
  240. if m32 != crc {
  241. panic(fmt.Sprintf("crc32 masked failiure. expected: %v but got: %v", crc, m32))
  242. } else {
  243. //fmt.Printf("\nchecksums match: %v == %v\n", crc, m32)
  244. }
  245. // move to next header
  246. inc := (headerSz + int(chunksz))
  247. chunk = chunk[inc:]
  248. (*encBuf).Advance(inc)
  249. nEnc += int64(inc)
  250. continue
  251. }
  252. case chunk_type == 0x01:
  253. { // uncompressed data
  254. //n, err := w.Write(chunk[(headerSz+crc32Sz):(headerSz + int(chunksz))])
  255. n, err := io.Copy(outDecodedBuf, bytes.NewBuffer(chunk[(headerSz+crc32Sz):(headerSz+int(chunksz))]))
  256. if verbose {
  257. //fmt.Printf("debug: n=%d err=%v chunksz=%d outDecodedBuf='%v'\n", n, err, chunksz, outDecodedBuf)
  258. }
  259. if err != nil {
  260. panic(err)
  261. }
  262. if n != int64(chunksz-crc32Sz) {
  263. panic("could not write all bytes to stdout")
  264. }
  265. nDec += n
  266. inc := (headerSz + int(chunksz))
  267. chunk = chunk[inc:]
  268. (*encBuf).Advance(inc)
  269. nEnc += int64(inc)
  270. continue
  271. }
  272. case chunk_type == 0xfe:
  273. fallthrough // padding, just skip it
  274. case chunk_type >= 0x80 && chunk_type <= 0xfd:
  275. { // Reserved skippable chunks
  276. //fmt.Printf("\nin reserved skippable chunks, at nEnc=%v\n", nEnc)
  277. inc := (headerSz + int(chunksz))
  278. chunk = chunk[inc:]
  279. nEnc += int64(inc)
  280. (*encBuf).Advance(inc)
  281. continue
  282. }
  283. default:
  284. panic(fmt.Sprintf("unrecognized/unsupported chunk type %#v", chunk_type))
  285. }
  286. } // end for{}
  287. return nEnc, nDec, err
  288. //return int64(N), nil
  289. }
  290. // for whole file at once:
  291. //
  292. // receive on stdin a stream of bytes in the snappy-streaming framed
  293. // format, defined here: http://code.google.com/p/snappy/source/browse/trunk/framing_format.txt
  294. // Grab each frame, run it through the snappy decoder, and spit out
  295. // each frame all joined back-to-back on stdout.
  296. //
  297. func Unsnappy(r io.Reader, w io.Writer) (err error) {
  298. b, err := ioutil.ReadAll(r)
  299. if err != nil {
  300. panic(err)
  301. }
  302. // flag for printing chunk size alignment messages
  303. verbose := false
  304. const snappyStreamHeaderSz = 10
  305. const headerSz = 4
  306. const crc32Sz = 4
  307. // the magic 18 bytes accounts for the snappy streaming header and the first chunks size and checksum
  308. // http://code.google.com/p/snappy/source/browse/trunk/framing_format.txt
  309. chunk := b[:]
  310. // 65536 is the max size of a snappy framed chunk. See
  311. // http://code.google.com/p/snappy/source/browse/trunk/framing_format.txt:91
  312. //buf := make([]byte, 65536)
  313. // fmt.Printf("read from file, b is len:%d with value: %#v\n", len(b), b)
  314. // fmt.Printf("read from file, bcut is len:%d with value: %#v\n", len(bcut), bcut)
  315. //fmt.Printf("raw bytes of chunksz are: %v\n", b[11:14])
  316. fourbytes := make([]byte, 4)
  317. chunkCount := 0
  318. for {
  319. if len(chunk) == 0 {
  320. break
  321. }
  322. chunkCount++
  323. fourbytes[3] = 0
  324. copy(fourbytes, chunk[1:4])
  325. chunksz := binary.LittleEndian.Uint32(fourbytes)
  326. chunk_type := chunk[0]
  327. switch true {
  328. case chunk_type == 0xff:
  329. { // stream identifier
  330. streamHeader := chunk[:snappyStreamHeaderSz]
  331. if 0 != bytes.Compare(streamHeader, []byte{0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59}) {
  332. panic("file had chunk starting with 0xff but then no magic snappy streaming protocol bytes, aborting.")
  333. } else {
  334. //fmt.Printf("got streaming snappy magic header just fine.\n")
  335. }
  336. chunk = chunk[snappyStreamHeaderSz:]
  337. continue
  338. }
  339. case chunk_type == 0x00:
  340. { // compressed data
  341. if verbose {
  342. fmt.Fprintf(os.Stderr, "chunksz is %d while total bytes avail are: %d\n", int(chunksz), len(chunk)-4)
  343. }
  344. //crc := binary.LittleEndian.Uint32(chunk[headerSz:(headerSz + crc32Sz)])
  345. section := chunk[(headerSz + crc32Sz):(headerSz + chunksz)]
  346. dec, ok := snappy.Decode(nil, section)
  347. if ok != nil {
  348. panic("could not decode snappy stream")
  349. }
  350. // fmt.Printf("ok, b is %#v , %#v\n", ok, dec)
  351. // spit out decoded text
  352. n, err := w.Write(dec)
  353. if err != nil {
  354. panic(err)
  355. }
  356. if n != len(dec) {
  357. panic("could not write all bytes to stdout")
  358. }
  359. // TODO: verify the crc32 rotated checksum?
  360. // move to next header
  361. chunk = chunk[(headerSz + int(chunksz)):]
  362. continue
  363. }
  364. case chunk_type == 0x01:
  365. { // uncompressed data
  366. //crc := binary.LittleEndian.Uint32(chunk[headerSz:(headerSz + crc32Sz)])
  367. section := chunk[(headerSz + crc32Sz):(headerSz + chunksz)]
  368. n, err := w.Write(section)
  369. if err != nil {
  370. panic(err)
  371. }
  372. if n != int(chunksz-crc32Sz) {
  373. panic("could not write all bytes to stdout")
  374. }
  375. chunk = chunk[(headerSz + int(chunksz)):]
  376. continue
  377. }
  378. case chunk_type == 0xfe:
  379. fallthrough // padding, just skip it
  380. case chunk_type >= 0x80 && chunk_type <= 0xfd:
  381. { // Reserved skippable chunks
  382. chunk = chunk[(headerSz + int(chunksz)):]
  383. continue
  384. }
  385. default:
  386. panic(fmt.Sprintf("unrecognized/unsupported chunk type %#v", chunk_type))
  387. }
  388. } // end for{}
  389. return nil
  390. }
  391. // 0xff 0x06 0x00 0x00 sNaPpY
  392. var SnappyStreamHeaderMagic = []byte{0xff, 0x06, 0x00, 0x00, 0x73, 0x4e, 0x61, 0x50, 0x70, 0x59}
  393. const CHUNK_MAX = 65536
  394. const _STREAM_TO_STREAM_BLOCK_SIZE = CHUNK_MAX
  395. const _STREAM_IDENTIFIER = `sNaPpY`
  396. const _COMPRESSED_CHUNK = 0x00
  397. const _UNCOMPRESSED_CHUNK = 0x01
  398. const _IDENTIFIER_CHUNK = 0xff
  399. const _RESERVED_UNSKIPPABLE0 = 0x02 // chunk ranges are [inclusive, exclusive)
  400. const _RESERVED_UNSKIPPABLE1 = 0x80
  401. const _RESERVED_SKIPPABLE0 = 0x80
  402. const _RESERVED_SKIPPABLE1 = 0xff
  403. // the minimum percent of bytes compression must save to be enabled in automatic
  404. // mode
  405. const _COMPRESSION_THRESHOLD = .125
  406. var crctab *crc32.Table
  407. func init() {
  408. crctab = crc32.MakeTable(crc32.Castagnoli) // this is correct table, matches the crc32c.c code used by python
  409. }
  410. func masked_crc32c(data []byte) uint32 {
  411. // see the framing format specification, http://code.google.com/p/snappy/source/browse/trunk/framing_format.txt
  412. var crc uint32 = crc32.Checksum(data, crctab)
  413. return (uint32((crc>>15)|(crc<<17)) + 0xa282ead8)
  414. }
  415. func ReadSnappyStreamCompressedFile(filename string) ([]byte, error) {
  416. snappyFile, err := Open(filename)
  417. if err != nil {
  418. return []byte{}, err
  419. }
  420. var bb bytes.Buffer
  421. _, err = bb.ReadFrom(snappyFile)
  422. if err == io.EOF {
  423. err = nil
  424. }
  425. if err != nil {
  426. panic(err)
  427. }
  428. return bb.Bytes(), err
  429. }
上海开阖软件有限公司 沪ICP备12045867号-1