ODPM 266: Go-libmemif + 2 examples.
[govpp.git] / vendor / github.com / google / gopacket / tcpassembly / tcpreader / reader.go
1 // Copyright 2012 Google, Inc. All rights reserved.
2 //
3 // Use of this source code is governed by a BSD-style license
4 // that can be found in the LICENSE file in the root of the source
5 // tree.
6
7 // Package tcpreader provides an implementation for tcpassembly.Stream which presents
8 // the caller with an io.Reader for easy processing.
9 //
10 // The assembly package handles packet data reordering, but its output is
11 // library-specific, thus not usable by the majority of external Go libraries.
12 // The io.Reader interface, on the other hand, is used throughout much of Go
13 // code as an easy mechanism for reading in data streams and decoding them.  For
14 // example, the net/http package provides the ReadRequest function, which can
15 // parase an HTTP request from a live data stream, just what we'd want when
16 // sniffing HTTP traffic.  Using ReaderStream, this is relatively easy to set
17 // up:
18 //
19 //  // Create our StreamFactory
20 //  type httpStreamFactory struct {}
21 //  func (f *httpStreamFactory) New(a, b gopacket.Flow) {
22 //      r := tcpreader.NewReaderStream(false)
23 //      go printRequests(r)
24 //      return &r
25 //  }
26 //  func printRequests(r io.Reader) {
27 //    // Convert to bufio, since that's what ReadRequest wants.
28 //      buf := bufio.NewReader(r)
29 //      for {
30 //              if req, err := http.ReadRequest(buf); err == io.EOF {
31 //                      return
32 //              } else if err != nil {
33 //                      log.Println("Error parsing HTTP requests:", err)
34 //              } else {
35 //                      fmt.Println("HTTP REQUEST:", req)
36 //                      fmt.Println("Body contains", tcpreader.DiscardBytesToEOF(req.Body), "bytes")
37 //              }
38 //      }
39 //  }
40 //
41 // Using just this code, we're able to reference a powerful, built-in library
42 // for HTTP request parsing to do all the dirty-work of parsing requests from
43 // the wire in real-time.  Pass this stream factory to an tcpassembly.StreamPool,
44 // start up an tcpassembly.Assembler, and you're good to go!
45 package tcpreader
46
47 import (
48         "errors"
49         "github.com/google/gopacket/tcpassembly"
50         "io"
51 )
52
53 var discardBuffer = make([]byte, 4096)
54
55 // DiscardBytesToFirstError will read in all bytes up to the first error
56 // reported by the given reader, then return the number of bytes discarded
57 // and the error encountered.
58 func DiscardBytesToFirstError(r io.Reader) (discarded int, err error) {
59         for {
60                 n, e := r.Read(discardBuffer)
61                 discarded += n
62                 if e != nil {
63                         return discarded, e
64                 }
65         }
66 }
67
68 // DiscardBytesToEOF will read in all bytes from a Reader until it
69 // encounters an io.EOF, then return the number of bytes.  Be careful
70 // of this... if used on a Reader that returns a non-io.EOF error
71 // consistently, this will loop forever discarding that error while
72 // it waits for an EOF.
73 func DiscardBytesToEOF(r io.Reader) (discarded int) {
74         for {
75                 n, e := DiscardBytesToFirstError(r)
76                 discarded += n
77                 if e == io.EOF {
78                         return
79                 }
80         }
81 }
82
83 // ReaderStream implements both tcpassembly.Stream and io.Reader.  You can use it
84 // as a building block to make simple, easy stream handlers.
85 //
86 // IMPORTANT:  If you use a ReaderStream, you MUST read ALL BYTES from it,
87 // quickly.  Not reading available bytes will block TCP stream reassembly.  It's
88 // a common pattern to do this by starting a goroutine in the factory's New
89 // method:
90 //
91 //  type myStreamHandler struct {
92 //      r ReaderStream
93 //  }
94 //  func (m *myStreamHandler) run() {
95 //      // Do something here that reads all of the ReaderStream, or your assembly
96 //      // will block.
97 //      fmt.Println(tcpreader.DiscardBytesToEOF(&m.r))
98 //  }
99 //  func (f *myStreamFactory) New(a, b gopacket.Flow) tcpassembly.Stream {
100 //      s := &myStreamHandler{}
101 //      go s.run()
102 //      // Return the ReaderStream as the stream that assembly should populate.
103 //      return &s.r
104 //  }
105 type ReaderStream struct {
106         ReaderStreamOptions
107         reassembled  chan []tcpassembly.Reassembly
108         done         chan bool
109         current      []tcpassembly.Reassembly
110         closed       bool
111         lossReported bool
112         first        bool
113         initiated    bool
114 }
115
116 // ReaderStreamOptions provides user-resettable options for a ReaderStream.
117 type ReaderStreamOptions struct {
118         // LossErrors determines whether this stream will return
119         // ReaderStreamDataLoss errors from its Read function whenever it
120         // determines data has been lost.
121         LossErrors bool
122 }
123
124 // NewReaderStream returns a new ReaderStream object.
125 func NewReaderStream() ReaderStream {
126         r := ReaderStream{
127                 reassembled: make(chan []tcpassembly.Reassembly),
128                 done:        make(chan bool),
129                 first:       true,
130                 initiated:   true,
131         }
132         return r
133 }
134
135 // Reassembled implements tcpassembly.Stream's Reassembled function.
136 func (r *ReaderStream) Reassembled(reassembly []tcpassembly.Reassembly) {
137         if !r.initiated {
138                 panic("ReaderStream not created via NewReaderStream")
139         }
140         r.reassembled <- reassembly
141         <-r.done
142 }
143
144 // ReassemblyComplete implements tcpassembly.Stream's ReassemblyComplete function.
145 func (r *ReaderStream) ReassemblyComplete() {
146         close(r.reassembled)
147         close(r.done)
148 }
149
150 // stripEmpty strips empty reassembly slices off the front of its current set of
151 // slices.
152 func (r *ReaderStream) stripEmpty() {
153         for len(r.current) > 0 && len(r.current[0].Bytes) == 0 {
154                 r.current = r.current[1:]
155                 r.lossReported = false
156         }
157 }
158
159 // DataLost is returned by the ReaderStream's Read function when it encounters
160 // a Reassembly with Skip != 0.
161 var DataLost = errors.New("lost data")
162
163 // Read implements io.Reader's Read function.
164 // Given a byte slice, it will either copy a non-zero number of bytes into
165 // that slice and return the number of bytes and a nil error, or it will
166 // leave slice p as is and return 0, io.EOF.
167 func (r *ReaderStream) Read(p []byte) (int, error) {
168         if !r.initiated {
169                 panic("ReaderStream not created via NewReaderStream")
170         }
171         var ok bool
172         r.stripEmpty()
173         for !r.closed && len(r.current) == 0 {
174                 if r.first {
175                         r.first = false
176                 } else {
177                         r.done <- true
178                 }
179                 if r.current, ok = <-r.reassembled; ok {
180                         r.stripEmpty()
181                 } else {
182                         r.closed = true
183                 }
184         }
185         if len(r.current) > 0 {
186                 current := &r.current[0]
187                 if r.LossErrors && !r.lossReported && current.Skip != 0 {
188                         r.lossReported = true
189                         return 0, DataLost
190                 }
191                 length := copy(p, current.Bytes)
192                 current.Bytes = current.Bytes[length:]
193                 return length, nil
194         }
195         return 0, io.EOF
196 }
197
198 // Close implements io.Closer's Close function, making ReaderStream a
199 // io.ReadCloser.  It discards all remaining bytes in the reassembly in a
200 // manner that's safe for the assembler (IE: it doesn't block).
201 func (r *ReaderStream) Close() error {
202         r.current = nil
203         r.closed = true
204         for {
205                 if _, ok := <-r.reassembled; !ok {
206                         return nil
207                 }
208                 r.done <- true
209         }
210 }