1 // Copyright 2012 Google, Inc. All rights reserved.
3 // Use of this source code is governed by a BSD-style license
4 // that can be found in the LICENSE file in the root of the source
15 "github.com/google/gopacket/layers"
18 var memLog = flag.Bool("assembly_memuse_log", defaultDebug, "If true, the github.com/google/gopacket/reassembly library will log information regarding its memory use every once in a while.")
23 // pageCache is a concurrency-unsafe store of page objects we use to avoid
24 // memory allocation as much as we can.
25 type pageCache struct {
34 const initialAllocSize = 1024
36 func newPageCache() *pageCache {
38 free: make([]*page, 0, initialAllocSize),
39 pcSize: initialAllocSize,
45 // grow exponentially increases the size of our page cache as much as necessary.
46 func (c *pageCache) grow() {
47 pages := make([]page, c.pcSize)
49 for i := range pages {
50 c.free = append(c.free, &pages[i])
53 log.Println("PageCache: created", c.pcSize, "new pages, size:", c.size, "cap:", cap(c.free), "len:", len(c.free))
55 // control next shrink attempt
56 c.nextShrink = c.pcSize
58 // prepare for next alloc
62 // Remove references to unused pages to let GC collect them
63 // Note: memory used by c.free itself it not collected.
64 func (c *pageCache) tryShrink() {
65 var min = c.pcSize / 2
66 if min < initialAllocSize {
67 min = initialAllocSize
69 if len(c.free) <= min {
72 for i := range c.free[min:] {
75 c.size -= len(c.free) - min
80 // next returns a clean, ready-to-use page object.
81 func (c *pageCache) next(ts time.Time) (p *page) {
84 if c.pageRequests&0xFFFF == 0 {
85 log.Println("PageCache:", c.pageRequests, "requested,", c.used, "used,", len(c.free), "free")
92 p, c.free = c.free[i], c.free[:i]
97 log.Printf("allocator returns %s\n", p)
100 if c.ops > c.nextShrink {
108 // replace replaces a page into the pageCache.
109 func (c *pageCache) replace(p *page) {
112 log.Printf("replacing %s\n", p)
116 c.free = append(c.free, p)
123 // StreamPool stores all streams created by Assemblers, allowing multiple
124 // assemblers to work together on stream processing while enforcing the fact
125 // that a single stream receives its data serially. It is safe
126 // for concurrency, usable by multiple Assemblers at once.
128 // StreamPool handles the creation and storage of Stream objects used by one or
129 // more Assembler objects. When a new TCP stream is found by an Assembler, it
130 // creates an associated Stream by calling its StreamFactory's New method.
131 // Thereafter (until the stream is closed), that Stream object will receive
132 // assembled TCP data via Assembler's calls to the stream's Reassembled
135 // Like the Assembler, StreamPool attempts to minimize allocation. Unlike the
136 // Assembler, though, it does have to do some locking to make sure that the
137 // connection objects it stores are accessible to multiple Assemblers.
138 type StreamPool struct {
139 conns map[key]*connection
142 factory StreamFactory
146 newConnectionCount int64
149 func (p *StreamPool) grow() {
150 conns := make([]connection, p.nextAlloc)
151 p.all = append(p.all, conns)
152 for i := range conns {
153 p.free = append(p.free, &conns[i])
156 log.Println("StreamPool: created", p.nextAlloc, "new connections")
161 // Dump logs all connections
162 func (p *StreamPool) Dump() {
165 log.Printf("Remaining %d connections: ", len(p.conns))
166 for _, conn := range p.conns {
167 log.Printf("%v %s", conn.key, conn)
171 func (p *StreamPool) remove(conn *connection) {
173 if _, ok := p.conns[conn.key]; ok {
174 delete(p.conns, conn.key)
175 p.free = append(p.free, conn)
180 // NewStreamPool creates a new connection pool. Streams will
181 // be created as necessary using the passed-in StreamFactory.
182 func NewStreamPool(factory StreamFactory) *StreamPool {
184 conns: make(map[key]*connection, initialAllocSize),
185 free: make([]*connection, 0, initialAllocSize),
187 nextAlloc: initialAllocSize,
191 func (p *StreamPool) connections() []*connection {
193 conns := make([]*connection, 0, len(p.conns))
194 for _, conn := range p.conns {
195 conns = append(conns, conn)
201 func (p *StreamPool) newConnection(k key, s Stream, ts time.Time) (c *connection, h *halfconnection, r *halfconnection) {
203 p.newConnectionCount++
204 if p.newConnectionCount&0x7FFF == 0 {
205 log.Println("StreamPool:", p.newConnectionCount, "requests,", len(p.conns), "used,", len(p.free), "free")
208 if len(p.free) == 0 {
211 index := len(p.free) - 1
212 c, p.free = p.free[index], p.free[:index]
214 return c, &c.c2s, &c.s2c
217 func (p *StreamPool) getHalf(k key) (*connection, *halfconnection, *halfconnection) {
220 return conn, &conn.c2s, &conn.s2c
225 return conn, &conn.s2c, &conn.c2s
230 // getConnection returns a connection. If end is true and a connection
231 // does not already exist, returns nil. This allows us to check for a
232 // connection without actually creating one if it doesn't already exist.
233 func (p *StreamPool) getConnection(k key, end bool, ts time.Time, tcp *layers.TCP, ac AssemblerContext) (*connection, *halfconnection, *halfconnection) {
235 conn, half, rev := p.getHalf(k)
237 if end || conn != nil {
238 return conn, half, rev
240 s := p.factory.New(k[0], k[1], tcp, ac)
243 conn, half, rev = p.newConnection(k, s, ts)
244 conn2, half2, rev2 := p.getHalf(k)
247 panic("FIXME: other dir added in the meantime...")
250 return conn2, half2, rev2
253 return conn, half, rev