ODPM 266: Go-libmemif + 2 examples.
[govpp.git] / vendor / github.com / google / gopacket / tcpassembly / assembly.go
1 // Copyright 2012 Google, Inc. All rights reserved.
2 //
3 // Use of this source code is governed by a BSD-style license
4 // that can be found in the LICENSE file in the root of the source
5 // tree.
6
7 // Package tcpassembly provides TCP stream re-assembly.
8 //
9 // The tcpassembly package implements uni-directional TCP reassembly, for use in
10 // packet-sniffing applications.  The caller reads packets off the wire, then
11 // presents them to an Assembler in the form of gopacket layers.TCP packets
12 // (github.com/google/gopacket, github.com/google/gopacket/layers).
13 //
14 // The Assembler uses a user-supplied
15 // StreamFactory to create a user-defined Stream interface, then passes packet
16 // data in stream order to that object.  A concurrency-safe StreamPool keeps
17 // track of all current Streams being reassembled, so multiple Assemblers may
18 // run at once to assemble packets while taking advantage of multiple cores.
19 package tcpassembly
20
21 import (
22         "flag"
23         "fmt"
24         "github.com/google/gopacket"
25         "github.com/google/gopacket/layers"
26         "log"
27         "sync"
28         "time"
29 )
30
31 var memLog = flag.Bool("assembly_memuse_log", false, "If true, the github.com/google/gopacket/tcpassembly library will log information regarding its memory use every once in a while.")
32 var debugLog = flag.Bool("assembly_debug_log", false, "If true, the github.com/google/gopacket/tcpassembly library will log verbose debugging information (at least one line per packet)")
33
34 const invalidSequence = -1
35 const uint32Max = 0xFFFFFFFF
36
37 // Sequence is a TCP sequence number.  It provides a few convenience functions
38 // for handling TCP wrap-around.  The sequence should always be in the range
39 // [0,0xFFFFFFFF]... its other bits are simply used in wrap-around calculations
40 // and should never be set.
41 type Sequence int64
42
43 // Difference defines an ordering for comparing TCP sequences that's safe for
44 // roll-overs.  It returns:
45 //    > 0 : if t comes after s
46 //    < 0 : if t comes before s
47 //      0 : if t == s
48 // The number returned is the sequence difference, so 4.Difference(8) will
49 // return 4.
50 //
51 // It handles rollovers by considering any sequence in the first quarter of the
52 // uint32 space to be after any sequence in the last quarter of that space, thus
53 // wrapping the uint32 space.
54 func (s Sequence) Difference(t Sequence) int {
55         if s > uint32Max-uint32Max/4 && t < uint32Max/4 {
56                 t += uint32Max
57         } else if t > uint32Max-uint32Max/4 && s < uint32Max/4 {
58                 s += uint32Max
59         }
60         return int(t - s)
61 }
62
63 // Add adds an integer to a sequence and returns the resulting sequence.
64 func (s Sequence) Add(t int) Sequence {
65         return (s + Sequence(t)) & uint32Max
66 }
67
68 // Reassembly objects are passed by an Assembler into Streams using the
69 // Reassembled call.  Callers should not need to create these structs themselves
70 // except for testing.
71 type Reassembly struct {
72         // Bytes is the next set of bytes in the stream.  May be empty.
73         Bytes []byte
74         // Skip is set to non-zero if bytes were skipped between this and the
75         // last Reassembly.  If this is the first packet in a connection and we
76         // didn't see the start, we have no idea how many bytes we skipped, so
77         // we set it to -1.  Otherwise, it's set to the number of bytes skipped.
78         Skip int
79         // Start is set if this set of bytes has a TCP SYN accompanying it.
80         Start bool
81         // End is set if this set of bytes has a TCP FIN or RST accompanying it.
82         End bool
83         // Seen is the timestamp this set of bytes was pulled off the wire.
84         Seen time.Time
85 }
86
87 const pageBytes = 1900
88
89 // page is used to store TCP data we're not ready for yet (out-of-order
90 // packets).  Unused pages are stored in and returned from a pageCache, which
91 // avoids memory allocation.  Used pages are stored in a doubly-linked list in
92 // a connection.
93 type page struct {
94         Reassembly
95         seq        Sequence
96         index      int
97         prev, next *page
98         buf        [pageBytes]byte
99 }
100
101 // pageCache is a concurrency-unsafe store of page objects we use to avoid
102 // memory allocation as much as we can.  It grows but never shrinks.
103 type pageCache struct {
104         free         []*page
105         pcSize       int
106         size, used   int
107         pages        [][]page
108         pageRequests int64
109 }
110
111 const initialAllocSize = 1024
112
113 func newPageCache() *pageCache {
114         pc := &pageCache{
115                 free:   make([]*page, 0, initialAllocSize),
116                 pcSize: initialAllocSize,
117         }
118         pc.grow()
119         return pc
120 }
121
122 // grow exponentially increases the size of our page cache as much as necessary.
123 func (c *pageCache) grow() {
124         pages := make([]page, c.pcSize)
125         c.pages = append(c.pages, pages)
126         c.size += c.pcSize
127         for i := range pages {
128                 c.free = append(c.free, &pages[i])
129         }
130         if *memLog {
131                 log.Println("PageCache: created", c.pcSize, "new pages")
132         }
133         c.pcSize *= 2
134 }
135
136 // next returns a clean, ready-to-use page object.
137 func (c *pageCache) next(ts time.Time) (p *page) {
138         if *memLog {
139                 c.pageRequests++
140                 if c.pageRequests&0xFFFF == 0 {
141                         log.Println("PageCache:", c.pageRequests, "requested,", c.used, "used,", len(c.free), "free")
142                 }
143         }
144         if len(c.free) == 0 {
145                 c.grow()
146         }
147         i := len(c.free) - 1
148         p, c.free = c.free[i], c.free[:i]
149         p.prev = nil
150         p.next = nil
151         p.Seen = ts
152         p.Bytes = p.buf[:0]
153         c.used++
154         return p
155 }
156
157 // replace replaces a page into the pageCache.
158 func (c *pageCache) replace(p *page) {
159         c.used--
160         c.free = append(c.free, p)
161 }
162
163 // Stream is implemented by the caller to handle incoming reassembled
164 // TCP data.  Callers create a StreamFactory, then StreamPool uses
165 // it to create a new Stream for every TCP stream.
166 //
167 // assembly will, in order:
168 //    1) Create the stream via StreamFactory.New
169 //    2) Call Reassembled 0 or more times, passing in reassembled TCP data in order
170 //    3) Call ReassemblyComplete one time, after which the stream is dereferenced by assembly.
171 type Stream interface {
172         // Reassembled is called zero or more times.  assembly guarantees
173         // that the set of all Reassembly objects passed in during all
174         // calls are presented in the order they appear in the TCP stream.
175         // Reassembly objects are reused after each Reassembled call,
176         // so it's important to copy anything you need out of them
177         // (specifically out of Reassembly.Bytes) that you need to stay
178         // around after you return from the Reassembled call.
179         Reassembled([]Reassembly)
180         // ReassemblyComplete is called when assembly decides there is
181         // no more data for this Stream, either because a FIN or RST packet
182         // was seen, or because the stream has timed out without any new
183         // packet data (due to a call to FlushOlderThan).
184         ReassemblyComplete()
185 }
186
187 // StreamFactory is used by assembly to create a new stream for each
188 // new TCP session.
189 type StreamFactory interface {
190         // New should return a new stream for the given TCP key.
191         New(netFlow, tcpFlow gopacket.Flow) Stream
192 }
193
194 func (p *StreamPool) connections() []*connection {
195         p.mu.RLock()
196         conns := make([]*connection, 0, len(p.conns))
197         for _, conn := range p.conns {
198                 conns = append(conns, conn)
199         }
200         p.mu.RUnlock()
201         return conns
202 }
203
204 // FlushOptions provide options for flushing connections.
205 type FlushOptions struct {
206         T        time.Time // If nonzero, only connections with data older than T are flushed
207         CloseAll bool      // If true, ALL connections are closed post flush, not just those that correctly see FIN/RST.
208 }
209
210 // FlushWithOptions finds any streams waiting for packets older than
211 // the given time, and pushes through the data they have (IE: tells
212 // them to stop waiting and skip the data they're waiting for).
213 //
214 // Each Stream maintains a list of zero or more sets of bytes it has received
215 // out-of-order.  For example, if it has processed up through sequence number
216 // 10, it might have bytes [15-20), [20-25), [30,50) in its list.  Each set of
217 // bytes also has the timestamp it was originally viewed.  A flush call will
218 // look at the smallest subsequent set of bytes, in this case [15-20), and if
219 // its timestamp is older than the passed-in time, it will push it and all
220 // contiguous byte-sets out to the Stream's Reassembled function.  In this case,
221 // it will push [15-20), but also [20-25), since that's contiguous.  It will
222 // only push [30-50) if its timestamp is also older than the passed-in time,
223 // otherwise it will wait until the next FlushOlderThan to see if bytes [25-30)
224 // come in.
225 //
226 // If it pushes all bytes (or there were no sets of bytes to begin with)
227 // AND the connection has not received any bytes since the passed-in time,
228 // the connection will be closed.
229 //
230 // If CloseAll is set, it will close out connections that have been drained.
231 // Regardless of the CloseAll setting, connections stale for the specified
232 // time will be closed.
233 //
234 // Returns the number of connections flushed, and of those, the number closed
235 // because of the flush.
236 func (a *Assembler) FlushWithOptions(opt FlushOptions) (flushed, closed int) {
237         conns := a.connPool.connections()
238         closes := 0
239         flushes := 0
240         for _, conn := range conns {
241                 flushed := false
242                 conn.mu.Lock()
243                 if conn.closed {
244                         // Already closed connection, nothing to do here.
245                         conn.mu.Unlock()
246                         continue
247                 }
248                 for conn.first != nil && conn.first.Seen.Before(opt.T) {
249                         a.skipFlush(conn)
250                         flushed = true
251                         if conn.closed {
252                                 closes++
253                                 break
254                         }
255                 }
256                 if opt.CloseAll && !conn.closed && conn.first == nil && conn.lastSeen.Before(opt.T) {
257                         flushed = true
258                         a.closeConnection(conn)
259                         closes++
260                 }
261                 if flushed {
262                         flushes++
263                 }
264                 conn.mu.Unlock()
265         }
266         return flushes, closes
267 }
268
269 // FlushOlderThan calls FlushWithOptions with the CloseAll option set to true.
270 func (a *Assembler) FlushOlderThan(t time.Time) (flushed, closed int) {
271         return a.FlushWithOptions(FlushOptions{CloseAll: true, T: t})
272 }
273
274 // FlushAll flushes all remaining data into all remaining connections, closing
275 // those connections.  It returns the total number of connections flushed/closed
276 // by the call.
277 func (a *Assembler) FlushAll() (closed int) {
278         conns := a.connPool.connections()
279         closed = len(conns)
280         for _, conn := range conns {
281                 conn.mu.Lock()
282                 for !conn.closed {
283                         a.skipFlush(conn)
284                 }
285                 conn.mu.Unlock()
286         }
287         return
288 }
289
290 type key [2]gopacket.Flow
291
292 func (k *key) String() string {
293         return fmt.Sprintf("%s:%s", k[0], k[1])
294 }
295
296 // StreamPool stores all streams created by Assemblers, allowing multiple
297 // assemblers to work together on stream processing while enforcing the fact
298 // that a single stream receives its data serially.  It is safe
299 // for concurrency, usable by multiple Assemblers at once.
300 //
301 // StreamPool handles the creation and storage of Stream objects used by one or
302 // more Assembler objects.  When a new TCP stream is found by an Assembler, it
303 // creates an associated Stream by calling its StreamFactory's New method.
304 // Thereafter (until the stream is closed), that Stream object will receive
305 // assembled TCP data via Assembler's calls to the stream's Reassembled
306 // function.
307 //
308 // Like the Assembler, StreamPool attempts to minimize allocation.  Unlike the
309 // Assembler, though, it does have to do some locking to make sure that the
310 // connection objects it stores are accessible to multiple Assemblers.
311 type StreamPool struct {
312         conns              map[key]*connection
313         users              int
314         mu                 sync.RWMutex
315         factory            StreamFactory
316         free               []*connection
317         all                [][]connection
318         nextAlloc          int
319         newConnectionCount int64
320 }
321
322 func (p *StreamPool) grow() {
323         conns := make([]connection, p.nextAlloc)
324         p.all = append(p.all, conns)
325         for i := range conns {
326                 p.free = append(p.free, &conns[i])
327         }
328         if *memLog {
329                 log.Println("StreamPool: created", p.nextAlloc, "new connections")
330         }
331         p.nextAlloc *= 2
332 }
333
334 // NewStreamPool creates a new connection pool.  Streams will
335 // be created as necessary using the passed-in StreamFactory.
336 func NewStreamPool(factory StreamFactory) *StreamPool {
337         return &StreamPool{
338                 conns:     make(map[key]*connection, initialAllocSize),
339                 free:      make([]*connection, 0, initialAllocSize),
340                 factory:   factory,
341                 nextAlloc: initialAllocSize,
342         }
343 }
344
345 const assemblerReturnValueInitialSize = 16
346
347 // NewAssembler creates a new assembler.  Pass in the StreamPool
348 // to use, may be shared across assemblers.
349 //
350 // This sets some sane defaults for the assembler options,
351 // see DefaultAssemblerOptions for details.
352 func NewAssembler(pool *StreamPool) *Assembler {
353         pool.mu.Lock()
354         pool.users++
355         pool.mu.Unlock()
356         return &Assembler{
357                 ret:              make([]Reassembly, assemblerReturnValueInitialSize),
358                 pc:               newPageCache(),
359                 connPool:         pool,
360                 AssemblerOptions: DefaultAssemblerOptions,
361         }
362 }
363
364 // DefaultAssemblerOptions provides default options for an assembler.
365 // These options are used by default when calling NewAssembler, so if
366 // modified before a NewAssembler call they'll affect the resulting Assembler.
367 //
368 // Note that the default options can result in ever-increasing memory usage
369 // unless one of the Flush* methods is called on a regular basis.
370 var DefaultAssemblerOptions = AssemblerOptions{
371         MaxBufferedPagesPerConnection: 0, // unlimited
372         MaxBufferedPagesTotal:         0, // unlimited
373 }
374
375 type connection struct {
376         key               key
377         pages             int
378         first, last       *page
379         nextSeq           Sequence
380         created, lastSeen time.Time
381         stream            Stream
382         closed            bool
383         mu                sync.Mutex
384 }
385
386 func (c *connection) reset(k key, s Stream, ts time.Time) {
387         c.key = k
388         c.pages = 0
389         c.first, c.last = nil, nil
390         c.nextSeq = invalidSequence
391         c.created = ts
392         c.stream = s
393         c.closed = false
394 }
395
396 // AssemblerOptions controls the behavior of each assembler.  Modify the
397 // options of each assembler you create to change their behavior.
398 type AssemblerOptions struct {
399         // MaxBufferedPagesTotal is an upper limit on the total number of pages to
400         // buffer while waiting for out-of-order packets.  Once this limit is
401         // reached, the assembler will degrade to flushing every connection it
402         // gets a packet for.  If <= 0, this is ignored.
403         MaxBufferedPagesTotal int
404         // MaxBufferedPagesPerConnection is an upper limit on the number of pages
405         // buffered for a single connection.  Should this limit be reached for a
406         // particular connection, the smallest sequence number will be flushed, along
407         // with any contiguous data.  If <= 0, this is ignored.
408         MaxBufferedPagesPerConnection int
409 }
410
411 // Assembler handles reassembling TCP streams.  It is not safe for
412 // concurrency... after passing a packet in via the Assemble call, the caller
413 // must wait for that call to return before calling Assemble again.  Callers can
414 // get around this by creating multiple assemblers that share a StreamPool.  In
415 // that case, each individual stream will still be handled serially (each stream
416 // has an individual mutex associated with it), however multiple assemblers can
417 // assemble different connections concurrently.
418 //
419 // The Assembler provides (hopefully) fast TCP stream re-assembly for sniffing
420 // applications written in Go.  The Assembler uses the following methods to be
421 // as fast as possible, to keep packet processing speedy:
422 //
423 // Avoids Lock Contention
424 //
425 // Assemblers locks connections, but each connection has an individual lock, and
426 // rarely will two Assemblers be looking at the same connection.  Assemblers
427 // lock the StreamPool when looking up connections, but they use Reader
428 // locks initially, and only force a write lock if they need to create a new
429 // connection or close one down.  These happen much less frequently than
430 // individual packet handling.
431 //
432 // Each assembler runs in its own goroutine, and the only state shared between
433 // goroutines is through the StreamPool.  Thus all internal Assembler state
434 // can be handled without any locking.
435 //
436 // NOTE:  If you can guarantee that packets going to a set of Assemblers will
437 // contain information on different connections per Assembler (for example,
438 // they're already hashed by PF_RING hashing or some other hashing mechanism),
439 // then we recommend you use a seperate StreamPool per Assembler, thus
440 // avoiding all lock contention.  Only when different Assemblers could receive
441 // packets for the same Stream should a StreamPool be shared between them.
442 //
443 // Avoids Memory Copying
444 //
445 // In the common case, handling of a single TCP packet should result in zero
446 // memory allocations.  The Assembler will look up the connection, figure out
447 // that the packet has arrived in order, and immediately pass that packet on to
448 // the appropriate connection's handling code.  Only if a packet arrives out of
449 // order is its contents copied and stored in memory for later.
450 //
451 // Avoids Memory Allocation
452 //
453 // Assemblers try very hard to not use memory allocation unless absolutely
454 // necessary.  Packet data for sequential packets is passed directly to streams
455 // with no copying or allocation.  Packet data for out-of-order packets is
456 // copied into reusable pages, and new pages are only allocated rarely when the
457 // page cache runs out.  Page caches are Assembler-specific, thus not used
458 // concurrently and requiring no locking.
459 //
460 // Internal representations for connection objects are also reused over time.
461 // Because of this, the most common memory allocation done by the Assembler is
462 // generally what's done by the caller in StreamFactory.New.  If no allocation
463 // is done there, then very little allocation is done ever, mostly to handle
464 // large increases in bandwidth or numbers of connections.
465 //
466 // TODO:  The page caches used by an Assembler will grow to the size necessary
467 // to handle a workload, and currently will never shrink.  This means that
468 // traffic spikes can result in large memory usage which isn't garbage
469 // collected when typical traffic levels return.
470 type Assembler struct {
471         AssemblerOptions
472         ret      []Reassembly
473         pc       *pageCache
474         connPool *StreamPool
475 }
476
477 func (p *StreamPool) newConnection(k key, s Stream, ts time.Time) (c *connection) {
478         if *memLog {
479                 p.newConnectionCount++
480                 if p.newConnectionCount&0x7FFF == 0 {
481                         log.Println("StreamPool:", p.newConnectionCount, "requests,", len(p.conns), "used,", len(p.free), "free")
482                 }
483         }
484         if len(p.free) == 0 {
485                 p.grow()
486         }
487         index := len(p.free) - 1
488         c, p.free = p.free[index], p.free[:index]
489         c.reset(k, s, ts)
490         return c
491 }
492
493 // getConnection returns a connection.  If end is true and a connection
494 // does not already exist, returns nil.  This allows us to check for a
495 // connection without actually creating one if it doesn't already exist.
496 func (p *StreamPool) getConnection(k key, end bool, ts time.Time) *connection {
497         p.mu.RLock()
498         conn := p.conns[k]
499         p.mu.RUnlock()
500         if end || conn != nil {
501                 return conn
502         }
503         s := p.factory.New(k[0], k[1])
504         p.mu.Lock()
505         conn = p.newConnection(k, s, ts)
506         if conn2 := p.conns[k]; conn2 != nil {
507                 p.mu.Unlock()
508                 return conn2
509         }
510         p.conns[k] = conn
511         p.mu.Unlock()
512         return conn
513 }
514
515 // Assemble calls AssembleWithTimestamp with the current timestamp, useful for
516 // packets being read directly off the wire.
517 func (a *Assembler) Assemble(netFlow gopacket.Flow, t *layers.TCP) {
518         a.AssembleWithTimestamp(netFlow, t, time.Now())
519 }
520
521 // AssembleWithTimestamp reassembles the given TCP packet into its appropriate
522 // stream.
523 //
524 // The timestamp passed in must be the timestamp the packet was seen.
525 // For packets read off the wire, time.Now() should be fine.  For packets read
526 // from PCAP files, CaptureInfo.Timestamp should be passed in.  This timestamp
527 // will affect which streams are flushed by a call to FlushOlderThan.
528 //
529 // Each Assemble call results in, in order:
530 //
531 //    zero or one calls to StreamFactory.New, creating a stream
532 //    zero or one calls to Reassembled on a single stream
533 //    zero or one calls to ReassemblyComplete on the same stream
534 func (a *Assembler) AssembleWithTimestamp(netFlow gopacket.Flow, t *layers.TCP, timestamp time.Time) {
535         // Ignore empty TCP packets
536         if !t.SYN && !t.FIN && !t.RST && len(t.LayerPayload()) == 0 {
537                 if *debugLog {
538                         log.Println("ignoring useless packet")
539                 }
540                 return
541         }
542
543         a.ret = a.ret[:0]
544         key := key{netFlow, t.TransportFlow()}
545         var conn *connection
546         // This for loop handles a race condition where a connection will close, lock
547         // the connection pool, and remove itself, but before it locked the connection
548         // pool it's returned to another Assemble statement.  This should loop 0-1
549         // times for the VAST majority of cases.
550         for {
551                 conn = a.connPool.getConnection(
552                         key, !t.SYN && len(t.LayerPayload()) == 0, timestamp)
553                 if conn == nil {
554                         if *debugLog {
555                                 log.Printf("%v got empty packet on otherwise empty connection", key)
556                         }
557                         return
558                 }
559                 conn.mu.Lock()
560                 if !conn.closed {
561                         break
562                 }
563                 conn.mu.Unlock()
564         }
565         if conn.lastSeen.Before(timestamp) {
566                 conn.lastSeen = timestamp
567         }
568         seq, bytes := Sequence(t.Seq), t.Payload
569         if conn.nextSeq == invalidSequence {
570                 if t.SYN {
571                         if *debugLog {
572                                 log.Printf("%v saw first SYN packet, returning immediately, seq=%v", key, seq)
573                         }
574                         a.ret = append(a.ret, Reassembly{
575                                 Bytes: bytes,
576                                 Skip:  0,
577                                 Start: true,
578                                 Seen:  timestamp,
579                         })
580                         conn.nextSeq = seq.Add(len(bytes) + 1)
581                 } else {
582                         if *debugLog {
583                                 log.Printf("%v waiting for start, storing into connection", key)
584                         }
585                         a.insertIntoConn(t, conn, timestamp)
586                 }
587         } else if diff := conn.nextSeq.Difference(seq); diff > 0 {
588                 if *debugLog {
589                         log.Printf("%v gap in sequence numbers (%v, %v) diff %v, storing into connection", key, conn.nextSeq, seq, diff)
590                 }
591                 a.insertIntoConn(t, conn, timestamp)
592         } else {
593                 bytes, conn.nextSeq = byteSpan(conn.nextSeq, seq, bytes)
594                 if *debugLog {
595                         log.Printf("%v found contiguous data (%v, %v), returning immediately", key, seq, conn.nextSeq)
596                 }
597                 a.ret = append(a.ret, Reassembly{
598                         Bytes: bytes,
599                         Skip:  0,
600                         End:   t.RST || t.FIN,
601                         Seen:  timestamp,
602                 })
603         }
604         if len(a.ret) > 0 {
605                 a.sendToConnection(conn)
606         }
607         conn.mu.Unlock()
608 }
609
610 func byteSpan(expected, received Sequence, bytes []byte) (toSend []byte, next Sequence) {
611         if expected == invalidSequence {
612                 return bytes, received.Add(len(bytes))
613         }
614         span := int(received.Difference(expected))
615         if span <= 0 {
616                 return bytes, received.Add(len(bytes))
617         } else if len(bytes) < span {
618                 return nil, expected
619         }
620         return bytes[span:], expected.Add(len(bytes) - span)
621 }
622
623 // sendToConnection sends the current values in a.ret to the connection, closing
624 // the connection if the last thing sent had End set.
625 func (a *Assembler) sendToConnection(conn *connection) {
626         a.addContiguous(conn)
627         if conn.stream == nil {
628                 panic("why?")
629         }
630         conn.stream.Reassembled(a.ret)
631         if a.ret[len(a.ret)-1].End {
632                 a.closeConnection(conn)
633         }
634 }
635
636 // addContiguous adds contiguous byte-sets to a connection.
637 func (a *Assembler) addContiguous(conn *connection) {
638         for conn.first != nil && conn.nextSeq.Difference(conn.first.seq) <= 0 {
639                 a.addNextFromConn(conn)
640         }
641 }
642
643 // skipFlush skips the first set of bytes we're waiting for and returns the
644 // first set of bytes we have.  If we have no bytes pending, it closes the
645 // connection.
646 func (a *Assembler) skipFlush(conn *connection) {
647         if *debugLog {
648                 log.Printf("%v skipFlush %v", conn.key, conn.nextSeq)
649         }
650         if conn.first == nil {
651                 a.closeConnection(conn)
652                 return
653         }
654         a.ret = a.ret[:0]
655         a.addNextFromConn(conn)
656         a.addContiguous(conn)
657         a.sendToConnection(conn)
658 }
659
660 func (p *StreamPool) remove(conn *connection) {
661         p.mu.Lock()
662         delete(p.conns, conn.key)
663         p.free = append(p.free, conn)
664         p.mu.Unlock()
665 }
666
667 func (a *Assembler) closeConnection(conn *connection) {
668         if *debugLog {
669                 log.Printf("%v closing", conn.key)
670         }
671         conn.stream.ReassemblyComplete()
672         conn.closed = true
673         a.connPool.remove(conn)
674         for p := conn.first; p != nil; p = p.next {
675                 a.pc.replace(p)
676         }
677 }
678
679 // traverseConn traverses our doubly-linked list of pages for the correct
680 // position to put the given sequence number.  Note that it traverses backwards,
681 // starting at the highest sequence number and going down, since we assume the
682 // common case is that TCP packets for a stream will appear in-order, with
683 // minimal loss or packet reordering.
684 func (c *connection) traverseConn(seq Sequence) (prev, current *page) {
685         prev = c.last
686         for prev != nil && prev.seq.Difference(seq) < 0 {
687                 current = prev
688                 prev = current.prev
689         }
690         return
691 }
692
693 // pushBetween inserts the doubly-linked list first-...-last in between the
694 // nodes prev-next in another doubly-linked list.  If prev is nil, makes first
695 // the new first page in the connection's list.  If next is nil, makes last the
696 // new last page in the list.  first/last may point to the same page.
697 func (c *connection) pushBetween(prev, next, first, last *page) {
698         // Maintain our doubly linked list
699         if next == nil || c.last == nil {
700                 c.last = last
701         } else {
702                 last.next = next
703                 next.prev = last
704         }
705         if prev == nil || c.first == nil {
706                 c.first = first
707         } else {
708                 first.prev = prev
709                 prev.next = first
710         }
711 }
712
713 func (a *Assembler) insertIntoConn(t *layers.TCP, conn *connection, ts time.Time) {
714         if conn.first != nil && conn.first.seq == conn.nextSeq {
715                 panic("wtf")
716         }
717         p, p2, numPages := a.pagesFromTCP(t, ts)
718         prev, current := conn.traverseConn(Sequence(t.Seq))
719         conn.pushBetween(prev, current, p, p2)
720         conn.pages += numPages
721         if (a.MaxBufferedPagesPerConnection > 0 && conn.pages >= a.MaxBufferedPagesPerConnection) ||
722                 (a.MaxBufferedPagesTotal > 0 && a.pc.used >= a.MaxBufferedPagesTotal) {
723                 if *debugLog {
724                         log.Printf("%v hit max buffer size: %+v, %v, %v", conn.key, a.AssemblerOptions, conn.pages, a.pc.used)
725                 }
726                 a.addNextFromConn(conn)
727         }
728 }
729
730 // pagesFromTCP creates a page (or set of pages) from a TCP packet.  Note that
731 // it should NEVER receive a SYN packet, as it doesn't handle sequences
732 // correctly.
733 //
734 // It returns the first and last page in its doubly-linked list of new pages.
735 func (a *Assembler) pagesFromTCP(t *layers.TCP, ts time.Time) (p, p2 *page, numPages int) {
736         first := a.pc.next(ts)
737         current := first
738         numPages++
739         seq, bytes := Sequence(t.Seq), t.Payload
740         for {
741                 length := min(len(bytes), pageBytes)
742                 current.Bytes = current.buf[:length]
743                 copy(current.Bytes, bytes)
744                 current.seq = seq
745                 bytes = bytes[length:]
746                 if len(bytes) == 0 {
747                         break
748                 }
749                 seq = seq.Add(length)
750                 current.next = a.pc.next(ts)
751                 current.next.prev = current
752                 current = current.next
753                 numPages++
754         }
755         current.End = t.RST || t.FIN
756         return first, current, numPages
757 }
758
759 // addNextFromConn pops the first page from a connection off and adds it to the
760 // return array.
761 func (a *Assembler) addNextFromConn(conn *connection) {
762         if conn.nextSeq == invalidSequence {
763                 conn.first.Skip = -1
764         } else if diff := conn.nextSeq.Difference(conn.first.seq); diff > 0 {
765                 conn.first.Skip = int(diff)
766         }
767         conn.first.Bytes, conn.nextSeq = byteSpan(conn.nextSeq, conn.first.seq, conn.first.Bytes)
768         if *debugLog {
769                 log.Printf("%v   adding from conn (%v, %v)", conn.key, conn.first.seq, conn.nextSeq)
770         }
771         a.ret = append(a.ret, conn.first.Reassembly)
772         a.pc.replace(conn.first)
773         if conn.first == conn.last {
774                 conn.first = nil
775                 conn.last = nil
776         } else {
777                 conn.first = conn.first.next
778                 conn.first.prev = nil
779         }
780         conn.pages--
781 }
782
783 func min(a, b int) int {
784         if a < b {
785                 return a
786         }
787         return b
788 }