1 // Copyright 2012 Google, Inc. All rights reserved.
3 // Use of this source code is governed by a BSD-style license
4 // that can be found in the LICENSE file in the root of the source
7 // This binary provides sample code for using the gopacket TCP assembler raw,
8 // without the help of the tcpreader library. It watches TCP streams and
9 // reports statistics on completed streams.
11 // It also uses gopacket.DecodingLayerParser instead of the normal
12 // gopacket.PacketSource, to highlight the methods, pros, and cons of this
18 "github.com/google/gopacket"
19 "github.com/google/gopacket/examples/util"
20 "github.com/google/gopacket/layers"
21 "github.com/google/gopacket/pcap"
22 "github.com/google/gopacket/tcpassembly"
27 var iface = flag.String("i", "eth0", "Interface to get packets from")
28 var snaplen = flag.Int("s", 65536, "SnapLen for pcap packet capture")
29 var filter = flag.String("f", "tcp", "BPF filter for pcap")
30 var logAllPackets = flag.Bool("v", false, "Log whenever we see a packet")
31 var bufferedPerConnection = flag.Int("connection_max_buffer", 0, `
32 Max packets to buffer for a single connection before skipping over a gap in data
33 and continuing to stream the connection after the buffer. If zero or less, this
35 var bufferedTotal = flag.Int("total_max_buffer", 0, `
36 Max packets to buffer total before skipping over gaps in connections and
37 continuing to stream connection data. If zero or less, this is infinite`)
38 var flushAfter = flag.String("flush_after", "2m", `
39 Connections which have buffered packets (they've gotten packets out of order and
40 are waiting for old packets to fill the gaps) are flushed after they're this old
41 (their oldest gap is skipped). Any string parsed by time.ParseDuration is
43 var packetCount = flag.Int("c", -1, `
44 Quit after processing this many packets, flushing all currently buffered
45 connections. If negative, this is infinite`)
47 // simpleStreamFactory implements tcpassembly.StreamFactory
48 type statsStreamFactory struct{}
50 // statsStream will handle the actual decoding of stats requests.
51 type statsStream struct {
52 net, transport gopacket.Flow
53 bytes, packets, outOfOrder, skipped int64
58 // New creates a new stream. It's called whenever the assembler sees a stream
59 // it isn't currently following.
60 func (factory *statsStreamFactory) New(net, transport gopacket.Flow) tcpassembly.Stream {
61 log.Printf("new stream %v:%v started", net, transport)
68 // ReaderStream implements tcpassembly.Stream, so we can return a pointer to it.
72 // Reassembled is called whenever new packet data is available for reading.
73 // Reassembly objects contain stream data IN ORDER.
74 func (s *statsStream) Reassembled(reassemblies []tcpassembly.Reassembly) {
75 for _, reassembly := range reassemblies {
76 if reassembly.Seen.Before(s.end) {
79 s.end = reassembly.Seen
81 s.bytes += int64(len(reassembly.Bytes))
83 if reassembly.Skip > 0 {
84 s.skipped += int64(reassembly.Skip)
86 s.sawStart = s.sawStart || reassembly.Start
87 s.sawEnd = s.sawEnd || reassembly.End
91 // ReassemblyComplete is called when the TCP assembler believes a stream has
93 func (s *statsStream) ReassemblyComplete() {
94 diffSecs := float64(s.end.Sub(s.start)) / float64(time.Second)
95 log.Printf("Reassembly of stream %v:%v complete - start:%v end:%v bytes:%v packets:%v ooo:%v bps:%v pps:%v skipped:%v",
96 s.net, s.transport, s.start, s.end, s.bytes, s.packets, s.outOfOrder,
97 float64(s.bytes)/diffSecs, float64(s.packets)/diffSecs, s.skipped)
103 flushDuration, err := time.ParseDuration(*flushAfter)
105 log.Fatal("invalid flush duration: ", *flushAfter)
108 log.Printf("starting capture on interface %q", *iface)
109 // Set up pcap packet capture
110 handle, err := pcap.OpenLive(*iface, int32(*snaplen), true, flushDuration/2)
112 log.Fatal("error opening pcap handle: ", err)
114 if err := handle.SetBPFFilter(*filter); err != nil {
115 log.Fatal("error setting BPF filter: ", err)
119 streamFactory := &statsStreamFactory{}
120 streamPool := tcpassembly.NewStreamPool(streamFactory)
121 assembler := tcpassembly.NewAssembler(streamPool)
122 assembler.MaxBufferedPagesPerConnection = *bufferedPerConnection
123 assembler.MaxBufferedPagesTotal = *bufferedTotal
125 log.Println("reading in packets")
127 // We use a DecodingLayerParser here instead of a simpler PacketSource.
128 // This approach should be measurably faster, but is also more rigid.
129 // PacketSource will handle any known type of packet safely and easily,
130 // but DecodingLayerParser will only handle those packet types we
131 // specifically pass in. This trade-off can be quite useful, though, in
132 // high-throughput situations.
133 var eth layers.Ethernet
134 var dot1q layers.Dot1Q
137 var ip6extensions layers.IPv6ExtensionSkipper
139 var payload gopacket.Payload
140 parser := gopacket.NewDecodingLayerParser(layers.LayerTypeEthernet,
141 ð, &dot1q, &ip4, &ip6, &ip6extensions, &tcp, &payload)
142 decoded := make([]gopacket.LayerType, 0, 4)
144 nextFlush := time.Now().Add(flushDuration / 2)
150 for ; *packetCount != 0; *packetCount-- {
151 // Check to see if we should flush the streams we have
152 // that haven't seen any new data in a while. Note we set a
153 // timeout on our PCAP handle, so this should happen even if we
154 // never see packet data.
155 if time.Now().After(nextFlush) {
156 stats, _ := handle.Stats()
157 log.Printf("flushing all streams that haven't seen packets in the last 2 minutes, pcap stats: %+v", stats)
158 assembler.FlushOlderThan(time.Now().Add(flushDuration))
159 nextFlush = time.Now().Add(flushDuration / 2)
162 // To speed things up, we're also using the ZeroCopy method for
163 // reading packet data. This method is faster than the normal
164 // ReadPacketData, but the returned bytes in 'data' are
165 // invalidated by any subsequent ZeroCopyReadPacketData call.
166 // Note that tcpassembly is entirely compatible with this packet
167 // reading method. This is another trade-off which might be
168 // appropriate for high-throughput sniffing: it avoids a packet
169 // copy, but its cost is much more careful handling of the
170 // resulting byte slice.
171 data, ci, err := handle.ZeroCopyReadPacketData()
174 log.Printf("error getting packet: %v", err)
177 err = parser.DecodeLayers(data, &decoded)
179 log.Printf("error decoding packet: %v", err)
183 log.Printf("decoded the following layers: %v", decoded)
185 byteCount += int64(len(data))
186 // Find either the IPv4 or IPv6 address to use as our network
188 foundNetLayer := false
189 var netFlow gopacket.Flow
190 for _, typ := range decoded {
192 case layers.LayerTypeIPv4:
193 netFlow = ip4.NetworkFlow()
195 case layers.LayerTypeIPv6:
196 netFlow = ip6.NetworkFlow()
198 case layers.LayerTypeTCP:
200 assembler.AssembleWithTimestamp(netFlow, &tcp, ci.Timestamp)
202 log.Println("could not find IPv4 or IPv6 layer, inoring")
207 log.Println("could not find TCP layer")
210 log.Printf("processed %d bytes in %v", byteCount, time.Since(start))