1 // Copyright 2012 Google, Inc. All rights reserved.
3 // Use of this source code is governed by a BSD-style license
4 // that can be found in the LICENSE file in the root of the source
9 // Package afpacket provides Go bindings for MMap'd AF_PACKET socket reading.
12 // Couldn't have done this without:
13 // http://lxr.free-electrons.com/source/Documentation/networking/packet_mmap.txt
14 // http://codemonkeytips.blogspot.co.uk/2011/07/asynchronous-packet-socket-reading-with.html
26 "golang.org/x/net/bpf"
27 "golang.org/x/sys/unix"
29 "github.com/google/gopacket"
33 #include <linux/if_packet.h> // AF_PACKET, sockaddr_ll
34 #include <linux/if_ether.h> // ETH_P_ALL
35 #include <sys/socket.h> // socket()
36 #include <unistd.h> // close()
37 #include <arpa/inet.h> // htons()
38 #include <sys/mman.h> // mmap(), munmap()
39 #include <poll.h> // poll()
43 var pageSize = unix.Getpagesize()
44 var tpacketAlignment = uint(C.TPACKET_ALIGNMENT)
46 // ErrPoll returned by poll
47 var ErrPoll = errors.New("packet poll failed")
49 // ErrTimeout returned on poll timeout
50 var ErrTimeout = errors.New("packet poll timeout expired")
52 func tpacketAlign(v int) int {
53 return int((uint(v) + tpacketAlignment - 1) & ((^tpacketAlignment) - 1))
56 // Stats is a set of counters detailing the work TPacket has done so far.
58 // Packets is the total number of packets returned to the caller.
60 // Polls is the number of blocking syscalls made waiting for packets.
61 // This is usually <= Packets, since with TPacket one syscall can (and often
62 // does) return many results, but polls that yield an empty block can push it higher.
66 // SocketStats is a struct where socket stats are stored
67 type SocketStats C.struct_tpacket_stats
69 // SocketStatsV3 is a struct where socket stats for TPacketV3 are stored
70 type SocketStatsV3 C.struct_tpacket_stats_v3
72 // TPacket implements packet receiving for Linux AF_PACKET versions 1, 2, and 3.
74 // fd is the C file descriptor.
76 // ring points to the memory space of the ring buffer shared by tpacket and the kernel.
78 // rawring is an unsafe pointer to the first byte of ring; header addresses are computed as offsets from it.
79 rawring unsafe.Pointer
80 // opts contains read-only options for the TPacket object.
82 mu sync.Mutex // guards below
83 // offset is the index of the current header in the ring: a frame index for TPacket v1/v2, a block index for v3.
85 // current is the current header.
87 // pollset is used by TPacket for its poll() call.
89 // shouldReleasePacket is set to true whenever we return packet data, to make sure we remember to release that data back to the kernel.
90 shouldReleasePacket bool
91 // headerNextNeeded is set to true when the header needs to move to the next packet. It is cleared after a poll error, because the header must not advance in that case.
93 // tpVersion is the version of TPacket actually in use, set by setRequestedTPacketVersion.
94 tpVersion OptTPacketVersion
95 // v3 is a reusable TPacket v3 header wrapper. getTPacketHeader must return a
96 // pointer to a header; to avoid allocating a v3wrapper on every call, the wrapper
97 // lives in the TPacket object and getTPacketHeader returns a pointer to it.
100 statsMu sync.Mutex // guards stats below
101 // stats is simple statistics on TPacket's run.
103 // socketStats accumulates the v1/v2 socket counters read by SocketStats (each read clears the kernel-side counters).
104 socketStats SocketStats
105 // socketStatsV3 is the TPacket v3 equivalent of socketStats, with the extra field tp_freeze_q_cnt.
106 socketStatsV3 SocketStatsV3
109 var _ gopacket.ZeroCopyPacketDataSource = &TPacket{}
111 // bindToInterface binds the TPacket socket to a particular named interface.
112 func (h *TPacket) bindToInterface(ifaceName string) error {
114 // An empty string here means to listen to all interfaces
116 iface, err := net.InterfaceByName(ifaceName)
118 return fmt.Errorf("InterfaceByName: %v", err)
120 ifIndex = iface.Index
122 s := &unix.SockaddrLinklayer{
123 Protocol: htons(uint16(unix.ETH_P_ALL)),
126 return unix.Bind(h.fd, s)
129 // setTPacketVersion asks the kernel to set TPacket to a particular version, and returns an error on failure.
130 func (h *TPacket) setTPacketVersion(version OptTPacketVersion) error {
131 if err := unix.SetsockoptInt(h.fd, unix.SOL_PACKET, unix.PACKET_VERSION, int(version)); err != nil {
132 return fmt.Errorf("setsockopt packet_version: %v", err)
137 // setRequestedTPacketVersion tries to set TPacket to the requested version or versions.
138 func (h *TPacket) setRequestedTPacketVersion() error {
140 case (h.opts.version == TPacketVersionHighestAvailable || h.opts.version == TPacketVersion3) && h.setTPacketVersion(TPacketVersion3) == nil:
141 h.tpVersion = TPacketVersion3
142 case (h.opts.version == TPacketVersionHighestAvailable || h.opts.version == TPacketVersion2) && h.setTPacketVersion(TPacketVersion2) == nil:
143 h.tpVersion = TPacketVersion2
144 case (h.opts.version == TPacketVersionHighestAvailable || h.opts.version == TPacketVersion1) && h.setTPacketVersion(TPacketVersion1) == nil:
145 h.tpVersion = TPacketVersion1
147 return errors.New("no known tpacket versions work on this machine")
152 // setUpRing sets up the shared-memory ring buffer between the user process and the kernel.
153 func (h *TPacket) setUpRing() (err error) {
154 totalSize := int(h.opts.framesPerBlock * h.opts.numBlocks * h.opts.frameSize)
156 case TPacketVersion1, TPacketVersion2:
157 var tp C.struct_tpacket_req
158 tp.tp_block_size = C.uint(h.opts.blockSize)
159 tp.tp_block_nr = C.uint(h.opts.numBlocks)
160 tp.tp_frame_size = C.uint(h.opts.frameSize)
161 tp.tp_frame_nr = C.uint(h.opts.framesPerBlock * h.opts.numBlocks)
162 if err := setsockopt(h.fd, unix.SOL_PACKET, unix.PACKET_RX_RING, unsafe.Pointer(&tp), unsafe.Sizeof(tp)); err != nil {
163 return fmt.Errorf("setsockopt packet_rx_ring: %v", err)
165 case TPacketVersion3:
166 var tp C.struct_tpacket_req3
167 tp.tp_block_size = C.uint(h.opts.blockSize)
168 tp.tp_block_nr = C.uint(h.opts.numBlocks)
169 tp.tp_frame_size = C.uint(h.opts.frameSize)
170 tp.tp_frame_nr = C.uint(h.opts.framesPerBlock * h.opts.numBlocks)
171 tp.tp_retire_blk_tov = C.uint(h.opts.blockTimeout / time.Millisecond)
172 if err := setsockopt(h.fd, unix.SOL_PACKET, unix.PACKET_RX_RING, unsafe.Pointer(&tp), unsafe.Sizeof(tp)); err != nil {
173 return fmt.Errorf("setsockopt packet_rx_ring v3: %v", err)
176 return errors.New("invalid tpVersion")
178 h.ring, err = unix.Mmap(h.fd, 0, totalSize, unix.PROT_READ|unix.PROT_WRITE, unix.MAP_SHARED)
183 return errors.New("no ring")
185 h.rawring = unsafe.Pointer(&h.ring[0])
189 // Close cleans up the TPacket. It should not be used after the Close call.
// Calling Close on a TPacket that is already closed returns immediately.
190 func (h *TPacket) Close() {
192 return // already closed.
// Close has now run explicitly, so drop the finalizer that NewTPacket
// registered to call Close.
200 runtime.SetFinalizer(h, nil)
203 // NewTPacket returns a new TPacket object for reading packets off the wire.
204 // Its behavior may be modified by passing in any/all of afpacket.Opt* to this function.
// Setup proceeds in order: parse the options, open an AF_PACKET socket, bind it
// to the configured interface, choose a TPacket version, set up and map the
// receive ring, and reset the socket statistics.
206 // If this function succeeds, the user should be sure to Close the returned
207 // TPacket when finished with it.
208 func NewTPacket(opts ...interface{}) (h *TPacket, err error) {
210 if h.opts, err = parseOptions(opts...); err != nil {
// The protocol argument is ETH_P_ALL converted to network byte order (htons).
213 fd, err := unix.Socket(unix.AF_PACKET, int(h.opts.socktype), int(htons(unix.ETH_P_ALL)))
218 if err = h.bindToInterface(h.opts.iface); err != nil {
// The version must be chosen before setUpRing, which configures the ring per TPacket version.
221 if err = h.setRequestedTPacketVersion(); err != nil {
224 if err = h.setUpRing(); err != nil {
227 // Clear stat counter from socket: reading the statistics resets the kernel counters.
228 if err = h.InitSocketStats(); err != nil {
// Register Close as a finalizer so the TPacket is cleaned up even if the caller never calls Close.
231 runtime.SetFinalizer(h, (*TPacket).Close)
238 // SetBPF attaches a BPF filter to the underlying socket
239 func (h *TPacket) SetBPF(filter []bpf.RawInstruction) error {
241 if len(filter) > int(^uint16(0)) {
242 return errors.New("filter too large")
244 p.Len = uint16(len(filter))
245 p.Filter = (*unix.SockFilter)(unsafe.Pointer(&filter[0]))
247 return setsockopt(h.fd, unix.SOL_SOCKET, unix.SO_ATTACH_FILTER, unsafe.Pointer(&p), unix.SizeofSockFprog)
// releaseCurrentPacket hands the current header back to the kernel by clearing
// its status, and records that no packet is awaiting release any more.
250 func (h *TPacket) releaseCurrentPacket() error {
251 h.current.clearStatus()
253 h.shouldReleasePacket = false
257 // ZeroCopyReadPacketData reads the next packet off the wire, and returns its data.
258 // The slice returned by ZeroCopyReadPacketData points to bytes owned by the
259 // TPacket. Each call to ZeroCopyReadPacketData invalidates any data previously
260 // returned by ZeroCopyReadPacketData. Care must be taken not to keep pointers
261 // to old bytes when using ZeroCopyReadPacketData... if you need to keep data past
262 // the next time you call ZeroCopyReadPacketData, use ReadPacketData, which copies
263 // the bytes into a new buffer for you.
// Example:
264 // tp, _ := NewTPacket(...)
265 // data1, _, _ := tp.ZeroCopyReadPacketData()
266 // // do everything you want with data1 here, copying bytes out of it if you'd like to keep them around.
267 // data2, _, _ := tp.ZeroCopyReadPacketData() // invalidates bytes in data1
268 func (h *TPacket) ZeroCopyReadPacketData() (data []byte, ci gopacket.CaptureInfo, err error) {
// Fetch a new ring header if there is none yet, if the previous call ended in a
// poll error, or if next() reports no further packet in the current header.
// Otherwise next() is expected to have moved to the following packet in place.
271 if h.current == nil || !h.headerNextNeeded || !h.current.next() {
// Data from the previous header is invalidated by this call (see above), so
// it can be handed back to the kernel now.
272 if h.shouldReleasePacket {
273 h.releaseCurrentPacket()
275 h.current = h.getTPacketHeader()
276 if err = h.pollForFirstPacket(h.current); err != nil {
// The header has not been read yet; do not advance past it on the next call.
277 h.headerNextNeeded = false
281 // We received an empty block
282 if h.current.getLength() == 0 {
286 data = h.current.getData()
287 ci.Timestamp = h.current.getTime()
288 ci.CaptureLength = len(data)
289 ci.Length = h.current.getLength()
290 ci.InterfaceIndex = h.current.getIfaceIndex()
291 atomic.AddInt64(&h.stats.Packets, 1)
// The next call should move past the packet returned now.
292 h.headerNextNeeded = true
298 // Stats returns statistics on the packets the TPacket has seen so far.
299 func (h *TPacket) Stats() (Stats, error) {
301 Polls: atomic.LoadInt64(&h.stats.Polls),
302 Packets: atomic.LoadInt64(&h.stats.Packets),
306 // InitSocketStats clears socket counters and return empty stats.
307 func (h *TPacket) InitSocketStats() error {
308 if h.tpVersion == TPacketVersion3 {
309 socklen := unsafe.Sizeof(h.socketStatsV3)
310 slt := C.socklen_t(socklen)
311 var ssv3 SocketStatsV3
313 err := getsockopt(h.fd, unix.SOL_PACKET, unix.PACKET_STATISTICS, unsafe.Pointer(&ssv3), uintptr(unsafe.Pointer(&slt)))
317 h.socketStatsV3 = SocketStatsV3{}
319 socklen := unsafe.Sizeof(h.socketStats)
320 slt := C.socklen_t(socklen)
323 err := getsockopt(h.fd, unix.SOL_PACKET, unix.PACKET_STATISTICS, unsafe.Pointer(&ss), uintptr(unsafe.Pointer(&slt)))
327 h.socketStats = SocketStats{}
332 // SocketStats saves stats from the socket to the TPacket instance.
333 func (h *TPacket) SocketStats() (SocketStats, SocketStatsV3, error) {
335 defer h.statsMu.Unlock()
336 // We need to save the counters since asking for the stats will clear them
337 if h.tpVersion == TPacketVersion3 {
338 socklen := unsafe.Sizeof(h.socketStatsV3)
339 slt := C.socklen_t(socklen)
340 var ssv3 SocketStatsV3
342 err := getsockopt(h.fd, unix.SOL_PACKET, unix.PACKET_STATISTICS, unsafe.Pointer(&ssv3), uintptr(unsafe.Pointer(&slt)))
344 return SocketStats{}, SocketStatsV3{}, err
347 h.socketStatsV3.tp_packets += ssv3.tp_packets
348 h.socketStatsV3.tp_drops += ssv3.tp_drops
349 h.socketStatsV3.tp_freeze_q_cnt += ssv3.tp_freeze_q_cnt
350 return h.socketStats, h.socketStatsV3, nil
352 socklen := unsafe.Sizeof(h.socketStats)
353 slt := C.socklen_t(socklen)
356 err := getsockopt(h.fd, unix.SOL_PACKET, unix.PACKET_STATISTICS, unsafe.Pointer(&ss), uintptr(unsafe.Pointer(&slt)))
358 return SocketStats{}, SocketStatsV3{}, err
361 h.socketStats.tp_packets += ss.tp_packets
362 h.socketStats.tp_drops += ss.tp_drops
363 return h.socketStats, h.socketStatsV3, nil
366 // ReadPacketDataTo reads packet data into a user-supplied buffer.
367 // This function reads up to the length of the passed-in slice.
368 // The number of bytes read into data will be returned in ci.CaptureLength,
369 // which is the minimum of the size of the passed-in buffer and the size of
370 // the captured packet.
371 func (h *TPacket) ReadPacketDataTo(data []byte) (ci gopacket.CaptureInfo, err error) {
373 d, ci, err = h.ZeroCopyReadPacketData()
377 ci.CaptureLength = copy(data, d)
381 // ReadPacketData reads the next packet, copies it into a new buffer, and returns
382 // that buffer. Since the buffer is allocated by ReadPacketData, it is safe for long-term
383 // use. This implements gopacket.PacketDataSource.
384 func (h *TPacket) ReadPacketData() (data []byte, ci gopacket.CaptureInfo, err error) {
386 d, ci, err = h.ZeroCopyReadPacketData()
390 data = make([]byte, len(d))
395 func (h *TPacket) getTPacketHeader() header {
397 case TPacketVersion1:
398 if h.offset >= h.opts.framesPerBlock*h.opts.numBlocks {
401 position := uintptr(h.rawring) + uintptr(h.opts.frameSize*h.offset)
402 return (*v1header)(unsafe.Pointer(position))
403 case TPacketVersion2:
404 if h.offset >= h.opts.framesPerBlock*h.opts.numBlocks {
407 position := uintptr(h.rawring) + uintptr(h.opts.frameSize*h.offset)
408 return (*v2header)(unsafe.Pointer(position))
409 case TPacketVersion3:
410 // TPacket3 uses each block to return values, instead of each frame. Hence we need to rotate when we hit #blocks, not #frames.
411 if h.offset >= h.opts.numBlocks {
414 position := uintptr(h.rawring) + uintptr(h.opts.frameSize*h.offset*h.opts.framesPerBlock)
415 h.v3 = initV3Wrapper(unsafe.Pointer(position))
418 panic("handle tpacket version is invalid")
// pollForFirstPacket calls poll() on the socket until the kernel has set
// TP_STATUS_USER in hdr's status, counting polls in h.stats.Polls. Once the
// status is set, it records that the packet must later be released back to
// the kernel.
421 func (h *TPacket) pollForFirstPacket(hdr header) error {
// poll() takes its timeout in whole milliseconds; finer precision is truncated.
422 tm := int(h.opts.pollTimeout / time.Millisecond)
423 for hdr.getStatus()&C.TP_STATUS_USER == 0 {
424 h.pollset.Fd = int32(h.fd)
425 h.pollset.Events = unix.POLLIN
426 h.pollset.Revents = 0
// NOTE(review): the slice literal holds a *copy* of h.pollset, so the revents
// reported by the kernel are written into that copy, not into h.pollset. The
// h.pollset.Revents POLLERR check below therefore appears to always see 0.
// Consider polling a slice kept on h and reading Revents from it (confirm).
427 n, err := unix.Poll([]unix.PollFd{h.pollset}, tm)
432 atomic.AddInt64(&h.stats.Polls, 1)
433 if h.pollset.Revents&unix.POLLERR > 0 {
441 h.shouldReleasePacket = true
445 // FanoutType determines the type of fanout to use with a TPacket SetFanout call.
448 // FanoutType values.
// FanoutHash presumably corresponds to PACKET_FANOUT_HASH in <linux/if_packet.h> (confirm).
450 FanoutHash FanoutType = 0
// FanoutHashWithDefrag is FanoutHash (value 0) with the 0x8000 flag bit set,
// presumably PACKET_FANOUT_FLAG_DEFRAG (confirm).
451 // It appears that defrag only works with FanoutHash, see:
452 // http://lxr.free-electrons.com/source/net/packet/af_packet.c#L1204
453 FanoutHashWithDefrag FanoutType = 0x8000
// FanoutLoadBalance presumably corresponds to PACKET_FANOUT_LB (confirm).
454 FanoutLoadBalance FanoutType = 1
// FanoutCPU presumably corresponds to PACKET_FANOUT_CPU (confirm).
455 FanoutCPU FanoutType = 2
458 // SetFanout activates TPacket's fanout ability.
459 // Use of Fanout requires creating multiple TPacket objects and passing the same id/type to
460 // a SetFanout call on each. Note that this can be done cross-process, so if two
461 // different processes both call SetFanout with the same type/id, they'll share
462 // packets between them. The same should work for multiple TPacket objects within a single process.
464 func (h *TPacket) SetFanout(t FanoutType, id uint16) error {
// The fanout type, including any flag bits, goes in the upper 16 bits of the
// PACKET_FANOUT argument.
// NOTE(review): id is expected in the lower 16 bits; it is not used in the
// lines visible here. Confirm it is OR'd into arg before the setsockopt call.
467 arg := C.int(t) << 16
469 return setsockopt(h.fd, unix.SOL_PACKET, unix.PACKET_FANOUT, unsafe.Pointer(&arg), unsafe.Sizeof(arg))
472 // WritePacketData transmits a raw packet.
// pkt is passed to a single unix.Write on the socket; the count of bytes
// written is discarded and only the error is kept.
473 func (h *TPacket) WritePacketData(pkt []byte) error {
474 _, err := unix.Write(h.fd, pkt)