Improve handling of probes on timeouts 31/12831/1
authorOndrej Fabry <ofabry@cisco.com>
Thu, 31 May 2018 14:06:41 +0000 (16:06 +0200)
committerOndrej Fabry <ofabry@cisco.com>
Thu, 31 May 2018 14:06:41 +0000 (16:06 +0200)
Change-Id: If94059586d4be739d6c8ae7843cfaf3bc90a5323
Signed-off-by: Ondrej Fabry <ofabry@cisco.com>
core/core.go
core/request_handler.go

index 6714f6f..4782ba1 100644 (file)
@@ -55,7 +55,7 @@ const (
        Connected ConnectionState = iota
 
        // Disconnected connection state means that the connection to VPP has been lost.
-       Disconnected = iota
+       Disconnected
 )
 
 // ConnectionEvent is a notification about change in the VPP connection state.
@@ -85,6 +85,9 @@ type Connection struct {
        maxChannelID uint32 // maximum used client ID
        pingReqID    uint16 // ID if the ControlPing message
        pingReplyID  uint16 // ID of the ControlPingReply message
+
+       lastReplyLock sync.Mutex // lock for the last reply
+       lastReply     time.Time  // time of the last received reply from VPP
 }
 
 // channelMetadata contains core-local metadata of an API channel.
@@ -271,17 +274,19 @@ func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
 // it continues with connectLoop and tries to reconnect.
 func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
        // create a separate API channel for health check probes
-       ch, err := conn.NewAPIChannel()
+       ch, err := conn.NewAPIChannelBuffered(1, 1)
        if err != nil {
-               log.Error("Error by creating health check API channel, health check will be disabled:", err)
+               log.Error("Failed to create health check API channel, health check will be disabled:", err)
                return
        }
 
-       failedChecks := 0
-       // send health check probes until an error occurs
+       var sinceLastReply time.Duration
+       var failedChecks int
+
+       // send health check probes until an error or timeout occurs
        for {
-               // wait for healthCheckProbeInterval
-               <-time.After(healthCheckProbeInterval)
+               // sleep until next health check probe period
+               time.Sleep(healthCheckProbeInterval)
 
                if atomic.LoadUint32(&c.connected) == 0 {
                        // Disconnect has been called in the meantime, return the healthcheck - reconnect loop
@@ -289,30 +294,56 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
                        return
                }
 
-               // send the control ping
-               ch.ReqChan <- &api.VppRequest{Message: msgControlPing}
-
-               // expect response within timeout period
+               // try draining probe replies from previous request before sending next one
                select {
-               case vppReply := <-ch.ReplyChan:
-                       err = vppReply.Error
-               case <-time.After(healthCheckReplyTimeout):
-                       err = errors.New("probe reply not received within the timeout period")
+               case <-ch.ReplyChan:
+                       log.Debug("drained old probe reply from reply channel")
+               default:
                }
 
-               if err != nil {
-                       failedChecks++
-                       log.Warnf("VPP health check failed (%d. time): %v", failedChecks, err)
-               } else {
-                       failedChecks = 0
+               // send the control ping request
+               ch.ReqChan <- &api.VppRequest{Message: msgControlPing}
+
+               for {
+                       // expect response within timeout period
+                       select {
+                       case vppReply := <-ch.ReplyChan:
+                               err = vppReply.Error
+
+                       case <-time.After(healthCheckReplyTimeout):
+                               err = ErrProbeTimeout
+
+                               // check if time since last reply from any other
+                               // channel is less than health check reply timeout
+                               conn.lastReplyLock.Lock()
+                               sinceLastReply = time.Since(c.lastReply)
+                               conn.lastReplyLock.Unlock()
+
+                               if sinceLastReply < healthCheckReplyTimeout {
+                                       log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply)
+                                       continue
+                               }
+                       }
+                       break
                }
 
-               if failedChecks > healthCheckThreshold {
-                       // in case of error, break & disconnect
-                       log.Errorf("Number of VPP health check fails exceeded treshold (%d)", healthCheckThreshold, err)
-                       // signal disconnected event via channel
+               if err == ErrProbeTimeout {
+                       failedChecks++
+                       log.Warnf("VPP health check probe timed out after %v (%d. timeout)", healthCheckReplyTimeout, failedChecks)
+                       if failedChecks > healthCheckThreshold {
+                               // in case of exceeded treshold disconnect
+                               log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", healthCheckThreshold)
+                               connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
+                               break
+                       }
+               } else if err != nil {
+                       // in case of error disconnect
+                       log.Errorf("VPP health check probe failed: %v", err)
                        connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
                        break
+               } else if failedChecks > 0 {
+                       failedChecks = 0
+                       log.Infof("VPP health check probe OK")
                }
        }
 
index 27ff4fc..8f793f5 100644 (file)
@@ -18,6 +18,7 @@ import (
        "errors"
        "fmt"
        "sync/atomic"
+       "time"
 
        logger "github.com/sirupsen/logrus"
 
@@ -26,6 +27,7 @@ import (
 
 var (
        ErrNotConnected = errors.New("not connected to VPP, ignoring the request")
+       ErrProbeTimeout = errors.New("probe reply not received within timeout period")
 )
 
 // watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
@@ -98,7 +100,16 @@ func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, re
        }
 
        // send the request to VPP
-       c.vpp.SendMsg(chMeta.id, data)
+       err = c.vpp.SendMsg(chMeta.id, data)
+       if err != nil {
+               err = fmt.Errorf("unable to send the messge: %v", err)
+               log.WithFields(logger.Fields{
+                       "context": chMeta.id,
+                       "msg_id":  msgID,
+               }).Error(err)
+               sendReply(ch, &api.VppReply{Error: err})
+               return err
+       }
 
        if req.Multipart {
                // send a control ping to determine end of the multipart response
@@ -166,6 +177,11 @@ func msgCallback(context uint32, msgID uint16, data []byte) {
                Data:              data,
                LastReplyReceived: lastReplyReceived,
        })
+
+       // store actual time of this reply
+       conn.lastReplyLock.Lock()
+       conn.lastReply = time.Now()
+       conn.lastReplyLock.Unlock()
 }
 
 // sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise
@@ -174,8 +190,8 @@ func sendReply(ch *api.Channel, reply *api.VppReply) {
        select {
        case ch.ReplyChan <- reply:
                // reply sent successfully
-       default:
-               // unable to write into the channel without blocking
+       case <-time.After(time.Millisecond * 100):
+               // receiver still not ready
                log.WithFields(logger.Fields{
                        "channel": ch,
                        "msg_id":  reply.MessageID,