1 // Copyright 2012 Google, Inc. All rights reserved.
3 // Use of this source code is governed by a BSD-style license
4 // that can be found in the LICENSE file in the root of the source
7 // Package tcpassembly provides TCP stream re-assembly.
9 // The tcpassembly package implements uni-directional TCP reassembly, for use in
10 // packet-sniffing applications. The caller reads packets off the wire, then
11 // presents them to an Assembler in the form of gopacket layers.TCP packets
12 // (github.com/google/gopacket, github.com/google/gopacket/layers).
14 // The Assembler uses a user-supplied
15 // StreamFactory to create a user-defined Stream interface, then passes packet
16 // data in stream order to that object. A concurrency-safe StreamPool keeps
17 // track of all current Streams being reassembled, so multiple Assemblers may
18 // run at once to assemble packets while taking advantage of multiple cores.
24 "github.com/google/gopacket"
25 "github.com/google/gopacket/layers"
31 var memLog = flag.Bool("assembly_memuse_log", false, "If true, the github.com/google/gopacket/tcpassembly library will log information regarding its memory use every once in a while.")
32 var debugLog = flag.Bool("assembly_debug_log", false, "If true, the github.com/google/gopacket/tcpassembly library will log verbose debugging information (at least one line per packet)")
34 const invalidSequence = -1
35 const uint32Max = 0xFFFFFFFF
37 // Sequence is a TCP sequence number. It provides a few convenience functions
38 // for handling TCP wrap-around. The sequence should always be in the range
39 // [0,0xFFFFFFFF]... its other bits are simply used in wrap-around calculations
40 // and should never be set.
43 // Difference defines an ordering for comparing TCP sequences that's safe for
44 // roll-overs. It returns:
45 // > 0 : if t comes after s
46 // < 0 : if t comes before s
48 // The number returned is the sequence difference, so 4.Difference(8) will
51 // It handles rollovers by considering any sequence in the first quarter of the
52 // uint32 space to be after any sequence in the last quarter of that space, thus
53 // wrapping the uint32 space.
54 func (s Sequence) Difference(t Sequence) int {
55 if s > uint32Max-uint32Max/4 && t < uint32Max/4 {
57 } else if t > uint32Max-uint32Max/4 && s < uint32Max/4 {
63 // Add adds an integer to a sequence and returns the resulting sequence.
64 func (s Sequence) Add(t int) Sequence {
65 return (s + Sequence(t)) & uint32Max
68 // Reassembly objects are passed by an Assembler into Streams using the
69 // Reassembled call. Callers should not need to create these structs themselves
70 // except for testing.
71 type Reassembly struct {
72 // Bytes is the next set of bytes in the stream. May be empty.
74 // Skip is set to non-zero if bytes were skipped between this and the
75 // last Reassembly. If this is the first packet in a connection and we
76 // didn't see the start, we have no idea how many bytes we skipped, so
77 // we set it to -1. Otherwise, it's set to the number of bytes skipped.
79 // Start is set if this set of bytes has a TCP SYN accompanying it.
81 // End is set if this set of bytes has a TCP FIN or RST accompanying it.
83 // Seen is the timestamp this set of bytes was pulled off the wire.
87 const pageBytes = 1900
89 // page is used to store TCP data we're not ready for yet (out-of-order
90 // packets). Unused pages are stored in and returned from a pageCache, which
91 // avoids memory allocation. Used pages are stored in a doubly-linked list in
101 // pageCache is a concurrency-unsafe store of page objects we use to avoid
102 // memory allocation as much as we can. It grows but never shrinks.
103 type pageCache struct {
111 const initialAllocSize = 1024
113 func newPageCache() *pageCache {
115 free: make([]*page, 0, initialAllocSize),
116 pcSize: initialAllocSize,
122 // grow exponentially increases the size of our page cache as much as necessary.
123 func (c *pageCache) grow() {
124 pages := make([]page, c.pcSize)
125 c.pages = append(c.pages, pages)
127 for i := range pages {
128 c.free = append(c.free, &pages[i])
131 log.Println("PageCache: created", c.pcSize, "new pages")
136 // next returns a clean, ready-to-use page object.
137 func (c *pageCache) next(ts time.Time) (p *page) {
140 if c.pageRequests&0xFFFF == 0 {
141 log.Println("PageCache:", c.pageRequests, "requested,", c.used, "used,", len(c.free), "free")
144 if len(c.free) == 0 {
148 p, c.free = c.free[i], c.free[:i]
157 // replace replaces a page into the pageCache.
158 func (c *pageCache) replace(p *page) {
160 c.free = append(c.free, p)
163 // Stream is implemented by the caller to handle incoming reassembled
164 // TCP data. Callers create a StreamFactory, then StreamPool uses
165 // it to create a new Stream for every TCP stream.
167 // assembly will, in order:
168 // 1) Create the stream via StreamFactory.New
169 // 2) Call Reassembled 0 or more times, passing in reassembled TCP data in order
170 // 3) Call ReassemblyComplete one time, after which the stream is dereferenced by assembly.
171 type Stream interface {
172 // Reassembled is called zero or more times. assembly guarantees
173 // that the set of all Reassembly objects passed in during all
174 // calls are presented in the order they appear in the TCP stream.
175 // Reassembly objects are reused after each Reassembled call,
176 // so it's important to copy anything you need out of them
177 // (specifically out of Reassembly.Bytes) that you need to stay
178 // around after you return from the Reassembled call.
179 Reassembled([]Reassembly)
180 // ReassemblyComplete is called when assembly decides there is
181 // no more data for this Stream, either because a FIN or RST packet
182 // was seen, or because the stream has timed out without any new
183 // packet data (due to a call to FlushOlderThan).
187 // StreamFactory is used by assembly to create a new stream for each
189 type StreamFactory interface {
190 // New should return a new stream for the given TCP key.
191 New(netFlow, tcpFlow gopacket.Flow) Stream
194 func (p *StreamPool) connections() []*connection {
196 conns := make([]*connection, 0, len(p.conns))
197 for _, conn := range p.conns {
198 conns = append(conns, conn)
204 // FlushOptions provide options for flushing connections.
205 type FlushOptions struct {
206 T time.Time // If nonzero, only connections with data older than T are flushed
207 CloseAll bool // If true, ALL connections are closed post flush, not just those that correctly see FIN/RST.
210 // FlushWithOptions finds any streams waiting for packets older than
211 // the given time, and pushes through the data they have (IE: tells
212 // them to stop waiting and skip the data they're waiting for).
214 // Each Stream maintains a list of zero or more sets of bytes it has received
215 // out-of-order. For example, if it has processed up through sequence number
216 // 10, it might have bytes [15-20), [20-25), [30,50) in its list. Each set of
217 // bytes also has the timestamp it was originally viewed. A flush call will
218 // look at the smallest subsequent set of bytes, in this case [15-20), and if
219 // its timestamp is older than the passed-in time, it will push it and all
220 // contiguous byte-sets out to the Stream's Reassembled function. In this case,
221 // it will push [15-20), but also [20-25), since that's contiguous. It will
222 // only push [30-50) if its timestamp is also older than the passed-in time,
223 // otherwise it will wait until the next FlushOlderThan to see if bytes [25-30)
226 // If it pushes all bytes (or there were no sets of bytes to begin with)
227 // AND the connection has not received any bytes since the passed-in time,
228 // the connection will be closed.
230 // If CloseAll is set, it will close out connections that have been drained.
231 // Regardless of the CloseAll setting, connections stale for the specified
232 // time will be closed.
234 // Returns the number of connections flushed, and of those, the number closed
235 // because of the flush.
236 func (a *Assembler) FlushWithOptions(opt FlushOptions) (flushed, closed int) {
237 conns := a.connPool.connections()
240 for _, conn := range conns {
244 // Already closed connection, nothing to do here.
248 for conn.first != nil && conn.first.Seen.Before(opt.T) {
256 if opt.CloseAll && !conn.closed && conn.first == nil && conn.lastSeen.Before(opt.T) {
258 a.closeConnection(conn)
266 return flushes, closes
269 // FlushOlderThan calls FlushWithOptions with the CloseAll option set to true.
270 func (a *Assembler) FlushOlderThan(t time.Time) (flushed, closed int) {
271 return a.FlushWithOptions(FlushOptions{CloseAll: true, T: t})
274 // FlushAll flushes all remaining data into all remaining connections, closing
275 // those connections. It returns the total number of connections flushed/closed
277 func (a *Assembler) FlushAll() (closed int) {
278 conns := a.connPool.connections()
280 for _, conn := range conns {
290 type key [2]gopacket.Flow
292 func (k *key) String() string {
293 return fmt.Sprintf("%s:%s", k[0], k[1])
296 // StreamPool stores all streams created by Assemblers, allowing multiple
297 // assemblers to work together on stream processing while enforcing the fact
298 // that a single stream receives its data serially. It is safe
299 // for concurrency, usable by multiple Assemblers at once.
301 // StreamPool handles the creation and storage of Stream objects used by one or
302 // more Assembler objects. When a new TCP stream is found by an Assembler, it
303 // creates an associated Stream by calling its StreamFactory's New method.
304 // Thereafter (until the stream is closed), that Stream object will receive
305 // assembled TCP data via Assembler's calls to the stream's Reassembled
308 // Like the Assembler, StreamPool attempts to minimize allocation. Unlike the
309 // Assembler, though, it does have to do some locking to make sure that the
310 // connection objects it stores are accessible to multiple Assemblers.
311 type StreamPool struct {
312 conns map[key]*connection
315 factory StreamFactory
319 newConnectionCount int64
322 func (p *StreamPool) grow() {
323 conns := make([]connection, p.nextAlloc)
324 p.all = append(p.all, conns)
325 for i := range conns {
326 p.free = append(p.free, &conns[i])
329 log.Println("StreamPool: created", p.nextAlloc, "new connections")
334 // NewStreamPool creates a new connection pool. Streams will
335 // be created as necessary using the passed-in StreamFactory.
336 func NewStreamPool(factory StreamFactory) *StreamPool {
338 conns: make(map[key]*connection, initialAllocSize),
339 free: make([]*connection, 0, initialAllocSize),
341 nextAlloc: initialAllocSize,
345 const assemblerReturnValueInitialSize = 16
347 // NewAssembler creates a new assembler. Pass in the StreamPool
348 // to use, may be shared across assemblers.
350 // This sets some sane defaults for the assembler options,
351 // see DefaultAssemblerOptions for details.
352 func NewAssembler(pool *StreamPool) *Assembler {
357 ret: make([]Reassembly, assemblerReturnValueInitialSize),
360 AssemblerOptions: DefaultAssemblerOptions,
364 // DefaultAssemblerOptions provides default options for an assembler.
365 // These options are used by default when calling NewAssembler, so if
366 // modified before a NewAssembler call they'll affect the resulting Assembler.
368 // Note that the default options can result in ever-increasing memory usage
369 // unless one of the Flush* methods is called on a regular basis.
370 var DefaultAssemblerOptions = AssemblerOptions{
371 MaxBufferedPagesPerConnection: 0, // unlimited
372 MaxBufferedPagesTotal: 0, // unlimited
375 type connection struct {
380 created, lastSeen time.Time
386 func (c *connection) reset(k key, s Stream, ts time.Time) {
389 c.first, c.last = nil, nil
390 c.nextSeq = invalidSequence
396 // AssemblerOptions controls the behavior of each assembler. Modify the
397 // options of each assembler you create to change their behavior.
398 type AssemblerOptions struct {
399 // MaxBufferedPagesTotal is an upper limit on the total number of pages to
400 // buffer while waiting for out-of-order packets. Once this limit is
401 // reached, the assembler will degrade to flushing every connection it
402 // gets a packet for. If <= 0, this is ignored.
403 MaxBufferedPagesTotal int
404 // MaxBufferedPagesPerConnection is an upper limit on the number of pages
405 // buffered for a single connection. Should this limit be reached for a
406 // particular connection, the smallest sequence number will be flushed, along
407 // with any contiguous data. If <= 0, this is ignored.
408 MaxBufferedPagesPerConnection int
411 // Assembler handles reassembling TCP streams. It is not safe for
412 // concurrency... after passing a packet in via the Assemble call, the caller
413 // must wait for that call to return before calling Assemble again. Callers can
414 // get around this by creating multiple assemblers that share a StreamPool. In
415 // that case, each individual stream will still be handled serially (each stream
416 // has an individual mutex associated with it), however multiple assemblers can
417 // assemble different connections concurrently.
419 // The Assembler provides (hopefully) fast TCP stream re-assembly for sniffing
420 // applications written in Go. The Assembler uses the following methods to be
421 // as fast as possible, to keep packet processing speedy:
423 // Avoids Lock Contention
425 // Assemblers lock connections, but each connection has an individual lock, and
426 // rarely will two Assemblers be looking at the same connection. Assemblers
427 // lock the StreamPool when looking up connections, but they use Reader
428 // locks initially, and only force a write lock if they need to create a new
429 // connection or close one down. These happen much less frequently than
430 // individual packet handling.
432 // Each assembler runs in its own goroutine, and the only state shared between
433 // goroutines is through the StreamPool. Thus all internal Assembler state
434 // can be handled without any locking.
436 // NOTE: If you can guarantee that packets going to a set of Assemblers will
437 // contain information on different connections per Assembler (for example,
438 // they're already hashed by PF_RING hashing or some other hashing mechanism),
439 // then we recommend you use a separate StreamPool per Assembler, thus
440 // avoiding all lock contention. Only when different Assemblers could receive
441 // packets for the same Stream should a StreamPool be shared between them.
443 // Avoids Memory Copying
445 // In the common case, handling of a single TCP packet should result in zero
446 // memory allocations. The Assembler will look up the connection, figure out
447 // that the packet has arrived in order, and immediately pass that packet on to
448 // the appropriate connection's handling code. Only if a packet arrives out of
449 // order is its contents copied and stored in memory for later.
451 // Avoids Memory Allocation
453 // Assemblers try very hard to not use memory allocation unless absolutely
454 // necessary. Packet data for sequential packets is passed directly to streams
455 // with no copying or allocation. Packet data for out-of-order packets is
456 // copied into reusable pages, and new pages are only allocated rarely when the
457 // page cache runs out. Page caches are Assembler-specific, thus not used
458 // concurrently and requiring no locking.
460 // Internal representations for connection objects are also reused over time.
461 // Because of this, the most common memory allocation done by the Assembler is
462 // generally what's done by the caller in StreamFactory.New. If no allocation
463 // is done there, then very little allocation is done ever, mostly to handle
464 // large increases in bandwidth or numbers of connections.
466 // TODO: The page caches used by an Assembler will grow to the size necessary
467 // to handle a workload, and currently will never shrink. This means that
468 // traffic spikes can result in large memory usage which isn't garbage
469 // collected when typical traffic levels return.
470 type Assembler struct {
477 func (p *StreamPool) newConnection(k key, s Stream, ts time.Time) (c *connection) {
479 p.newConnectionCount++
480 if p.newConnectionCount&0x7FFF == 0 {
481 log.Println("StreamPool:", p.newConnectionCount, "requests,", len(p.conns), "used,", len(p.free), "free")
484 if len(p.free) == 0 {
487 index := len(p.free) - 1
488 c, p.free = p.free[index], p.free[:index]
493 // getConnection returns a connection. If end is true and a connection
494 // does not already exist, returns nil. This allows us to check for a
495 // connection without actually creating one if it doesn't already exist.
496 func (p *StreamPool) getConnection(k key, end bool, ts time.Time) *connection {
500 if end || conn != nil {
503 s := p.factory.New(k[0], k[1])
505 conn = p.newConnection(k, s, ts)
506 if conn2 := p.conns[k]; conn2 != nil {
515 // Assemble calls AssembleWithTimestamp with the current timestamp, useful for
516 // packets being read directly off the wire.
517 func (a *Assembler) Assemble(netFlow gopacket.Flow, t *layers.TCP) {
518 a.AssembleWithTimestamp(netFlow, t, time.Now())
521 // AssembleWithTimestamp reassembles the given TCP packet into its appropriate
524 // The timestamp passed in must be the timestamp the packet was seen.
525 // For packets read off the wire, time.Now() should be fine. For packets read
526 // from PCAP files, CaptureInfo.Timestamp should be passed in. This timestamp
527 // will affect which streams are flushed by a call to FlushOlderThan.
529 // Each Assemble call results in, in order:
531 // zero or one calls to StreamFactory.New, creating a stream
532 // zero or one calls to Reassembled on a single stream
533 // zero or one calls to ReassemblyComplete on the same stream
534 func (a *Assembler) AssembleWithTimestamp(netFlow gopacket.Flow, t *layers.TCP, timestamp time.Time) {
535 // Ignore empty TCP packets
536 if !t.SYN && !t.FIN && !t.RST && len(t.LayerPayload()) == 0 {
538 log.Println("ignoring useless packet")
544 key := key{netFlow, t.TransportFlow()}
546 // This for loop handles a race condition where a connection will close, lock
547 // the connection pool, and remove itself, but before it locked the connection
548 // pool it's returned to another Assemble statement. This should loop 0-1
549 // times for the VAST majority of cases.
551 conn = a.connPool.getConnection(
552 key, !t.SYN && len(t.LayerPayload()) == 0, timestamp)
555 log.Printf("%v got empty packet on otherwise empty connection", key)
565 if conn.lastSeen.Before(timestamp) {
566 conn.lastSeen = timestamp
568 seq, bytes := Sequence(t.Seq), t.Payload
569 if conn.nextSeq == invalidSequence {
572 log.Printf("%v saw first SYN packet, returning immediately, seq=%v", key, seq)
574 a.ret = append(a.ret, Reassembly{
580 conn.nextSeq = seq.Add(len(bytes) + 1)
583 log.Printf("%v waiting for start, storing into connection", key)
585 a.insertIntoConn(t, conn, timestamp)
587 } else if diff := conn.nextSeq.Difference(seq); diff > 0 {
589 log.Printf("%v gap in sequence numbers (%v, %v) diff %v, storing into connection", key, conn.nextSeq, seq, diff)
591 a.insertIntoConn(t, conn, timestamp)
593 bytes, conn.nextSeq = byteSpan(conn.nextSeq, seq, bytes)
595 log.Printf("%v found contiguous data (%v, %v), returning immediately", key, seq, conn.nextSeq)
597 a.ret = append(a.ret, Reassembly{
605 a.sendToConnection(conn)
610 func byteSpan(expected, received Sequence, bytes []byte) (toSend []byte, next Sequence) {
611 if expected == invalidSequence {
612 return bytes, received.Add(len(bytes))
614 span := int(received.Difference(expected))
616 return bytes, received.Add(len(bytes))
617 } else if len(bytes) < span {
620 return bytes[span:], expected.Add(len(bytes) - span)
623 // sendToConnection sends the current values in a.ret to the connection, closing
624 // the connection if the last thing sent had End set.
625 func (a *Assembler) sendToConnection(conn *connection) {
626 a.addContiguous(conn)
627 if conn.stream == nil {
630 conn.stream.Reassembled(a.ret)
631 if a.ret[len(a.ret)-1].End {
632 a.closeConnection(conn)
636 // addContiguous adds contiguous byte-sets to a connection.
637 func (a *Assembler) addContiguous(conn *connection) {
638 for conn.first != nil && conn.nextSeq.Difference(conn.first.seq) <= 0 {
639 a.addNextFromConn(conn)
643 // skipFlush skips the first set of bytes we're waiting for and returns the
644 // first set of bytes we have. If we have no bytes pending, it closes the
646 func (a *Assembler) skipFlush(conn *connection) {
648 log.Printf("%v skipFlush %v", conn.key, conn.nextSeq)
650 if conn.first == nil {
651 a.closeConnection(conn)
655 a.addNextFromConn(conn)
656 a.addContiguous(conn)
657 a.sendToConnection(conn)
660 func (p *StreamPool) remove(conn *connection) {
662 delete(p.conns, conn.key)
663 p.free = append(p.free, conn)
667 func (a *Assembler) closeConnection(conn *connection) {
669 log.Printf("%v closing", conn.key)
671 conn.stream.ReassemblyComplete()
673 a.connPool.remove(conn)
674 for p := conn.first; p != nil; p = p.next {
679 // traverseConn traverses our doubly-linked list of pages for the correct
680 // position to put the given sequence number. Note that it traverses backwards,
681 // starting at the highest sequence number and going down, since we assume the
682 // common case is that TCP packets for a stream will appear in-order, with
683 // minimal loss or packet reordering.
684 func (c *connection) traverseConn(seq Sequence) (prev, current *page) {
686 for prev != nil && prev.seq.Difference(seq) < 0 {
693 // pushBetween inserts the doubly-linked list first-...-last in between the
694 // nodes prev-next in another doubly-linked list. If prev is nil, makes first
695 // the new first page in the connection's list. If next is nil, makes last the
696 // new last page in the list. first/last may point to the same page.
697 func (c *connection) pushBetween(prev, next, first, last *page) {
698 // Maintain our doubly linked list
699 if next == nil || c.last == nil {
705 if prev == nil || c.first == nil {
713 func (a *Assembler) insertIntoConn(t *layers.TCP, conn *connection, ts time.Time) {
714 if conn.first != nil && conn.first.seq == conn.nextSeq {
717 p, p2, numPages := a.pagesFromTCP(t, ts)
718 prev, current := conn.traverseConn(Sequence(t.Seq))
719 conn.pushBetween(prev, current, p, p2)
720 conn.pages += numPages
721 if (a.MaxBufferedPagesPerConnection > 0 && conn.pages >= a.MaxBufferedPagesPerConnection) ||
722 (a.MaxBufferedPagesTotal > 0 && a.pc.used >= a.MaxBufferedPagesTotal) {
724 log.Printf("%v hit max buffer size: %+v, %v, %v", conn.key, a.AssemblerOptions, conn.pages, a.pc.used)
726 a.addNextFromConn(conn)
730 // pagesFromTCP creates a page (or set of pages) from a TCP packet. Note that
731 // it should NEVER receive a SYN packet, as it doesn't handle sequences
734 // It returns the first and last page in its doubly-linked list of new pages.
735 func (a *Assembler) pagesFromTCP(t *layers.TCP, ts time.Time) (p, p2 *page, numPages int) {
736 first := a.pc.next(ts)
739 seq, bytes := Sequence(t.Seq), t.Payload
741 length := min(len(bytes), pageBytes)
742 current.Bytes = current.buf[:length]
743 copy(current.Bytes, bytes)
745 bytes = bytes[length:]
749 seq = seq.Add(length)
750 current.next = a.pc.next(ts)
751 current.next.prev = current
752 current = current.next
755 current.End = t.RST || t.FIN
756 return first, current, numPages
759 // addNextFromConn pops the first page from a connection off and adds it to the
761 func (a *Assembler) addNextFromConn(conn *connection) {
762 if conn.nextSeq == invalidSequence {
764 } else if diff := conn.nextSeq.Difference(conn.first.seq); diff > 0 {
765 conn.first.Skip = int(diff)
767 conn.first.Bytes, conn.nextSeq = byteSpan(conn.nextSeq, conn.first.seq, conn.first.Bytes)
769 log.Printf("%v adding from conn (%v, %v)", conn.key, conn.first.seq, conn.nextSeq)
771 a.ret = append(a.ret, conn.first.Reassembly)
772 a.pc.replace(conn.first)
773 if conn.first == conn.last {
777 conn.first = conn.first.next
778 conn.first.prev = nil
783 func min(a, b int) int {