Fix memory leak in health check 59/34159/2
authorVladimir Lavor <vlavor@cisco.com>
Tue, 19 Oct 2021 10:17:15 +0000 (12:17 +0200)
committerVladimir Lavor <vlavor@cisco.com>
Tue, 19 Oct 2021 10:51:40 +0000 (12:51 +0200)
1. On connection disconnect, health check loop ends immediately
   and independently on probe interval setup
2. Proper resource cleanup

On disconnect, health check terminates itself with a delay based
on the probe interval. It causes memory usage going up if
AsyncConnect() & Disconnect() are called in a loop.

Signed-off-by: Vladimir Lavor <vlavor@cisco.com>
Change-Id: Ibcdfec9d2d7736f9390e21039d9a917228114273

core/connection.go

index 935693e..442eb51 100644 (file)
@@ -102,7 +102,8 @@ type Connection struct {
 
        vppConnected uint32 // non-zero if the adapter is connected to VPP
 
-       connChan chan ConnectionEvent // connection status events are sent to this channel
+       connChan        chan ConnectionEvent // connection status events are sent to this channel
+       healthCheckDone chan struct{}        // used to terminate health check loop
 
        codec        MessageCodec                      // message codec
        msgIDs       map[string]uint16                 // map of message IDs indexed by message name + CRC
@@ -140,6 +141,7 @@ func newConnection(binapi adapter.VppAPI, attempts int, interval time.Duration)
                maxAttempts:         attempts,
                recInterval:         interval,
                connChan:            make(chan ConnectionEvent, NotificationChanBufSize),
+               healthCheckDone:     make(chan struct{}),
                codec:               codec.DefaultCodec,
                msgIDs:              make(map[string]uint16),
                msgMapByPath:        make(map[string]map[uint16]api.Message),
@@ -221,6 +223,7 @@ func (c *Connection) Disconnect() {
 // disconnectVPP disconnects from VPP in case it is connected.
 func (c *Connection) disconnectVPP() {
        if atomic.CompareAndSwapUint32(&c.vppConnected, 1, 0) {
+               close(c.healthCheckDone)
                log.Debug("Disconnecting from VPP..")
 
                if err := c.vppClient.Disconnect(); err != nil {
@@ -302,6 +305,7 @@ func (c *Connection) healthCheckLoop() {
                log.Error("Failed to create health check API channel, health check will be disabled:", err)
                return
        }
+       defer ch.Close()
 
        var (
                sinceLastReply time.Duration
@@ -309,72 +313,73 @@ func (c *Connection) healthCheckLoop() {
        )
 
        // send health check probes until an error or timeout occurs
-       for {
-               // sleep until next health check probe period
-               time.Sleep(HealthCheckProbeInterval)
+       probeInterval := time.NewTicker(HealthCheckProbeInterval)
+       defer probeInterval.Stop()
 
-               if atomic.LoadUint32(&c.vppConnected) == 0 {
-                       // Disconnect has been called in the meantime, return the healthcheck - reconnect loop
+HealthCheck:
+       for {
+               select {
+               case <-c.healthCheckDone:
+                       // Terminate the health check loop on connection disconnect
                        log.Debug("Disconnected on request, exiting health check loop.")
                        return
-               }
-
-               // try draining probe replies from previous request before sending next one
-               select {
-               case <-ch.replyChan:
-                       log.Debug("drained old probe reply from reply channel")
-               default:
-               }
+               case <-probeInterval.C:
+                       // try draining probe replies from previous request before sending next one
+                       select {
+                       case <-ch.replyChan:
+                               log.Debug("drained old probe reply from reply channel")
+                       default:
+                       }
 
-               // send the control ping request
-               ch.reqChan <- &vppRequest{msg: c.msgControlPing}
+                       // send the control ping request
+                       ch.reqChan <- &vppRequest{msg: c.msgControlPing}
 
-               for {
-                       // expect response within timeout period
-                       select {
-                       case vppReply := <-ch.replyChan:
-                               err = vppReply.err
+                       for {
+                               // expect response within timeout period
+                               select {
+                               case vppReply := <-ch.replyChan:
+                                       err = vppReply.err
 
-                       case <-time.After(HealthCheckReplyTimeout):
-                               err = ErrProbeTimeout
+                               case <-time.After(HealthCheckReplyTimeout):
+                                       err = ErrProbeTimeout
 
-                               // check if time since last reply from any other
-                               // channel is less than health check reply timeout
-                               c.lastReplyLock.Lock()
-                               sinceLastReply = time.Since(c.lastReply)
-                               c.lastReplyLock.Unlock()
+                                       // check if time since last reply from any other
+                                       // channel is less than health check reply timeout
+                                       c.lastReplyLock.Lock()
+                                       sinceLastReply = time.Since(c.lastReply)
+                                       c.lastReplyLock.Unlock()
 
-                               if sinceLastReply < HealthCheckReplyTimeout {
-                                       log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply)
-                                       continue
+                                       if sinceLastReply < HealthCheckReplyTimeout {
+                                               log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply)
+                                               continue
+                                       }
                                }
+                               break
                        }
-                       break
-               }
 
-               if err == ErrProbeTimeout {
-                       failedChecks++
-                       log.Warnf("VPP health check probe timed out after %v (%d. timeout)", HealthCheckReplyTimeout, failedChecks)
-                       if failedChecks > HealthCheckThreshold {
-                               // in case of exceeded failed check threshold, assume VPP unresponsive
-                               log.Errorf("VPP does not responding, the health check exceeded threshold for timeouts (>%d)", HealthCheckThreshold)
-                               c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: NotResponding})
-                               break
+                       if err == ErrProbeTimeout {
+                               failedChecks++
+                               log.Warnf("VPP health check probe timed out after %v (%d. timeout)", HealthCheckReplyTimeout, failedChecks)
+                               if failedChecks > HealthCheckThreshold {
+                                       // in case of exceeded failed check threshold, assume VPP unresponsive
+                                       log.Errorf("VPP does not responding, the health check exceeded threshold for timeouts (>%d)", HealthCheckThreshold)
+                                       c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: NotResponding})
+                                       break HealthCheck
+                               }
+                       } else if err != nil {
+                               // in case of error, assume VPP disconnected
+                               log.Errorf("VPP health check probe failed: %v", err)
+                               c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: err})
+                               break HealthCheck
+                       } else if failedChecks > 0 {
+                               // in case of success after failed checks, clear failed check counter
+                               failedChecks = 0
+                               log.Infof("VPP health check probe OK")
                        }
-               } else if err != nil {
-                       // in case of error, assume VPP disconnected
-                       log.Errorf("VPP health check probe failed: %v", err)
-                       c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: err})
-                       break
-               } else if failedChecks > 0 {
-                       // in case of success after failed checks, clear failed check counter
-                       failedChecks = 0
-                       log.Infof("VPP health check probe OK")
                }
        }
 
        // cleanup
-       ch.Close()
        c.disconnectVPP()
 
        // we are now disconnected, start connect loop