Improve handling of probes on timeouts
[govpp.git] / core / core.go
index 6714f6f..4782ba1 100644 (file)
@@ -55,7 +55,7 @@ const (
        Connected ConnectionState = iota
 
        // Disconnected connection state means that the connection to VPP has been lost.
-       Disconnected = iota
+       Disconnected
 )
 
 // ConnectionEvent is a notification about change in the VPP connection state.
@@ -85,6 +85,9 @@ type Connection struct {
        maxChannelID uint32 // maximum used client ID
        pingReqID    uint16 // ID if the ControlPing message
        pingReplyID  uint16 // ID of the ControlPingReply message
+
+       lastReplyLock sync.Mutex // lock for the last reply
+       lastReply     time.Time  // time of the last received reply from VPP
 }
 
 // channelMetadata contains core-local metadata of an API channel.
@@ -271,17 +274,19 @@ func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
 // it continues with connectLoop and tries to reconnect.
 func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
        // create a separate API channel for health check probes
-       ch, err := conn.NewAPIChannel()
+       ch, err := conn.NewAPIChannelBuffered(1, 1)
        if err != nil {
-               log.Error("Error by creating health check API channel, health check will be disabled:", err)
+               log.Error("Failed to create health check API channel, health check will be disabled:", err)
                return
        }
 
-       failedChecks := 0
-       // send health check probes until an error occurs
+       var sinceLastReply time.Duration
+       var failedChecks int
+
+       // send health check probes until an error or timeout occurs
        for {
-               // wait for healthCheckProbeInterval
-               <-time.After(healthCheckProbeInterval)
+               // sleep until next health check probe period
+               time.Sleep(healthCheckProbeInterval)
 
                if atomic.LoadUint32(&c.connected) == 0 {
                        // Disconnect has been called in the meantime, return the healthcheck - reconnect loop
@@ -289,30 +294,56 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
                        return
                }
 
-               // send the control ping
-               ch.ReqChan <- &api.VppRequest{Message: msgControlPing}
-
-               // expect response within timeout period
+               // try draining probe replies from previous request before sending next one
                select {
-               case vppReply := <-ch.ReplyChan:
-                       err = vppReply.Error
-               case <-time.After(healthCheckReplyTimeout):
-                       err = errors.New("probe reply not received within the timeout period")
+               case <-ch.ReplyChan:
+                       log.Debug("drained old probe reply from reply channel")
+               default:
                }
 
-               if err != nil {
-                       failedChecks++
-                       log.Warnf("VPP health check failed (%d. time): %v", failedChecks, err)
-               } else {
-                       failedChecks = 0
+               // send the control ping request
+               ch.ReqChan <- &api.VppRequest{Message: msgControlPing}
+
+               for {
+                       // expect response within timeout period
+                       select {
+                       case vppReply := <-ch.ReplyChan:
+                               err = vppReply.Error
+
+                       case <-time.After(healthCheckReplyTimeout):
+                               err = ErrProbeTimeout
+
+                               // check if time since last reply from any other
+                               // channel is less than health check reply timeout
+                               conn.lastReplyLock.Lock()
+                               sinceLastReply = time.Since(c.lastReply)
+                               conn.lastReplyLock.Unlock()
+
+                               if sinceLastReply < healthCheckReplyTimeout {
+                                       log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply)
+                                       continue
+                               }
+                       }
+                       break
                }
 
-               if failedChecks > healthCheckThreshold {
-                       // in case of error, break & disconnect
-                       log.Errorf("Number of VPP health check fails exceeded treshold (%d)", healthCheckThreshold, err)
-                       // signal disconnected event via channel
+               if err == ErrProbeTimeout {
+                       failedChecks++
+                       log.Warnf("VPP health check probe timed out after %v (%d. timeout)", healthCheckReplyTimeout, failedChecks)
+                       if failedChecks > healthCheckThreshold {
+                               // in case of exceeded treshold disconnect
+                               log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", healthCheckThreshold)
+                               connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
+                               break
+                       }
+               } else if err != nil {
+                       // in case of error disconnect
+                       log.Errorf("VPP health check probe failed: %v", err)
                        connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
                        break
+               } else if failedChecks > 0 {
+                       failedChecks = 0
+                       log.Infof("VPP health check probe OK")
                }
        }