1 // Copyright 2012 Google, Inc. All rights reserved.
3 // Use of this source code is governed by a BSD-style license
4 // that can be found in the LICENSE file in the root of the source
7 // Package tcpreader provides an implementation for tcpassembly.Stream which presents
8 // the caller with an io.Reader for easy processing.
10 // The assembly package handles packet data reordering, but its output is
11 // library-specific, thus not usable by the majority of external Go libraries.
12 // The io.Reader interface, on the other hand, is used throughout much of Go
13 // code as an easy mechanism for reading in data streams and decoding them. For
14 // example, the net/http package provides the ReadRequest function, which can
15 // parse an HTTP request from a live data stream, just what we'd want when
16 // sniffing HTTP traffic. Using ReaderStream, this is relatively easy to set
19 // // Create our StreamFactory
20 // type httpStreamFactory struct {}
21 // func (f *httpStreamFactory) New(a, b gopacket.Flow) tcpassembly.Stream {
22 // r := tcpreader.NewReaderStream()
23 // go printRequests(&r)
26 // func printRequests(r io.Reader) {
27 // // Convert to bufio, since that's what ReadRequest wants.
28 // buf := bufio.NewReader(r)
30 // if req, err := http.ReadRequest(buf); err == io.EOF {
32 // } else if err != nil {
33 // log.Println("Error parsing HTTP requests:", err)
35 // fmt.Println("HTTP REQUEST:", req)
36 // fmt.Println("Body contains", tcpreader.DiscardBytesToEOF(req.Body), "bytes")
41 // Using just this code, we're able to reference a powerful, built-in library
42 // for HTTP request parsing to do all the dirty-work of parsing requests from
43 // the wire in real-time. Pass this stream factory to a tcpassembly.StreamPool,
44 // start up a tcpassembly.Assembler, and you're good to go!
49 "github.com/google/gopacket/tcpassembly"
// discardBuffer is scratch space that DiscardBytesToFirstError passes to
// r.Read; the bytes read into it are discarded, only the counts matter.
// NOTE(review): this is one package-level buffer shared by every caller, so
// concurrent DiscardBytesToFirstError/DiscardBytesToEOF calls (e.g. one per
// stream goroutine, as in the package example) write into it simultaneously.
// Presumably harmless since the contents are thrown away, but the race
// detector is likely to flag it; consider a per-call buffer or
// io.Copy(io.Discard, r) — confirm before changing.
53 var discardBuffer = make([]byte, 4096)
55 // DiscardBytesToFirstError will read in all bytes up to the first error
56 // reported by the given reader, then return the number of bytes discarded
57 // and the error encountered.
58 func DiscardBytesToFirstError(r io.Reader) (discarded int, err error) {
60 n, e := r.Read(discardBuffer)
68 // DiscardBytesToEOF will read in all bytes from a Reader until it
69 // encounters an io.EOF, then return the number of bytes. Be careful
70 // of this... if used on a Reader that returns a non-io.EOF error
71 // consistently, this will loop forever discarding that error while
72 // it waits for an EOF.
73 func DiscardBytesToEOF(r io.Reader) (discarded int) {
75 n, e := DiscardBytesToFirstError(r)
83 // ReaderStream implements both tcpassembly.Stream and io.Reader. You can use it
84 // as a building block to make simple, easy stream handlers.
86 // IMPORTANT: If you use a ReaderStream, you MUST read ALL BYTES from it,
87 // quickly. Not reading available bytes will block TCP stream reassembly. It's
88 // a common pattern to do this by starting a goroutine in the factory's New
91 // type myStreamHandler struct {
94 // func (m *myStreamHandler) run() {
95 // // Do something here that reads all of the ReaderStream, or your assembly
97 // fmt.Println(tcpreader.DiscardBytesToEOF(&m.r))
99 // func (f *myStreamFactory) New(a, b gopacket.Flow) tcpassembly.Stream {
100 // s := &myStreamHandler{}
102 // // Return the ReaderStream as the stream that assembly should populate.
105 type ReaderStream struct {
107 reassembled chan []tcpassembly.Reassembly
109 current []tcpassembly.Reassembly
116 // ReaderStreamOptions provides user-resettable options for a ReaderStream.
117 type ReaderStreamOptions struct {
118 // LossErrors determines whether this stream will return
119 // DataLost errors from its Read function whenever it
120 // determines data has been lost.
124 // NewReaderStream returns a new ReaderStream object.
125 func NewReaderStream() ReaderStream {
127 reassembled: make(chan []tcpassembly.Reassembly),
128 done: make(chan bool),
135 // Reassembled implements tcpassembly.Stream's Reassembled function.
136 func (r *ReaderStream) Reassembled(reassembly []tcpassembly.Reassembly) {
138 panic("ReaderStream not created via NewReaderStream")
140 r.reassembled <- reassembly
144 // ReassemblyComplete implements tcpassembly.Stream's ReassemblyComplete function.
145 func (r *ReaderStream) ReassemblyComplete() {
150 // stripEmpty strips empty reassembly slices off the front of its current set of
152 func (r *ReaderStream) stripEmpty() {
153 for len(r.current) > 0 && len(r.current[0].Bytes) == 0 {
154 r.current = r.current[1:]
155 r.lossReported = false
159 // DataLost is returned by the ReaderStream's Read function when it encounters
160 // a Reassembly with Skip != 0 and the stream's LossErrors option is enabled.
161 var DataLost = errors.New("lost data")
163 // Read implements io.Reader's Read function.
164 // Given a byte slice, it will either copy a non-zero number of bytes into
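165 // that slice and return the number of bytes and a nil error, or it will
166 // leave slice p as is and return 0, io.EOF. When LossErrors is set, it may
// instead return DataLost upon reaching a Reassembly with Skip != 0.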
167 func (r *ReaderStream) Read(p []byte) (int, error) {
169 panic("ReaderStream not created via NewReaderStream")
173 for !r.closed && len(r.current) == 0 {
179 if r.current, ok = <-r.reassembled; ok {
185 if len(r.current) > 0 {
186 current := &r.current[0]
187 if r.LossErrors && !r.lossReported && current.Skip != 0 {
188 r.lossReported = true
191 length := copy(p, current.Bytes)
192 current.Bytes = current.Bytes[length:]
198 // Close implements io.Closer's Close function, making ReaderStream an
199 // io.ReadCloser. It discards all remaining bytes in the reassembly in a
200 // manner that's safe for the assembler (i.e., it doesn't block).
201 func (r *ReaderStream) Close() error {
205 if _, ok := <-r.reassembled; !ok {