Improve handling of probes on timeouts
[govpp.git] / core / core.go
index 2484c81..4782ba1 100644 (file)
@@ -12,6 +12,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+//go:generate binapi-generator --input-dir=bin_api --output-dir=bin_api
+
 package core
 
 import (
@@ -21,22 +23,28 @@ import (
        "sync/atomic"
        "time"
 
-       logger "github.com/Sirupsen/logrus"
+       logger "github.com/sirupsen/logrus"
 
        "git.fd.io/govpp.git/adapter"
        "git.fd.io/govpp.git/api"
        "git.fd.io/govpp.git/core/bin_api/vpe"
 )
 
+var (
+       msgControlPing      api.Message = &vpe.ControlPing{} // message sent as the health check probe; its message ID is stored in pingReqID on connect
+       msgControlPingReply api.Message = &vpe.ControlPingReply{} // reply message to the control ping; its message ID is stored in pingReplyID on connect
+)
+
 const (
        requestChannelBufSize      = 100 // default size of the request channel buffers
        replyChannelBufSize        = 100 // default size of the reply channel buffers
        notificationChannelBufSize = 100 // default size of the notification channel buffers
 )
 
-const (
+var (
        healthCheckProbeInterval = time.Second * 1        // default health check probe interval
        healthCheckReplyTimeout  = time.Millisecond * 100 // timeout for reply to a health check probe
+       healthCheckThreshold     = 1                      // number of timed-out health check probes tolerated before a disconnect is reported
 )
 
 // ConnectionState holds the current state of the connection to VPP.
@@ -47,7 +55,7 @@ const (
        Connected ConnectionState = iota
 
        // Disconnected connection state means that the connection to VPP has been lost.
-       Disconnected = iota
+       Disconnected
 )
 
 // ConnectionEvent is a notification about change in the VPP connection state.
@@ -65,18 +73,21 @@ type Connection struct {
        connected uint32             // non-zero if the adapter is connected to VPP
        codec     *MsgCodec          // message codec
 
-       msgIDs     map[string]uint16 // map of message IDs indexed by message name + CRC
        msgIDsLock sync.RWMutex      // lock for the message IDs map
+       msgIDs     map[string]uint16 // map of message IDs indexed by message name + CRC
 
-       channels     map[uint32]*api.Channel // map of all API channels indexed by the channel ID
        channelsLock sync.RWMutex            // lock for the channels map
+       channels     map[uint32]*api.Channel // map of all API channels indexed by the channel ID
 
-       notifSubscriptions     map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID
        notifSubscriptionsLock sync.RWMutex                        // lock for the subscriptions map
+       notifSubscriptions     map[uint16][]*api.NotifSubscription // map of all notification subscriptions indexed by message ID
 
        maxChannelID uint32 // maximum used client ID
        pingReqID    uint16 // ID if the ControlPing message
        pingReplyID  uint16 // ID of the ControlPingReply message
+
+       lastReplyLock sync.Mutex // lock for the last reply
+       lastReply     time.Time  // time of the last received reply from VPP
 }
 
 // channelMetadata contains core-local metadata of an API channel.
@@ -103,6 +114,34 @@ func SetLogger(l *logger.Logger) {
        log = l
 }
 
+// SetHealthCheckProbeInterval sets how long the health check loop sleeps before sending each probe.
+// Beware: this function is not thread-safe, it writes a package-level variable without locking.
+// It is recommended to set this parameter before connecting to VPP.
+func SetHealthCheckProbeInterval(interval time.Duration) {
+       healthCheckProbeInterval = interval
+}
+
+// SetHealthCheckReplyTimeout sets the timeout for the reply to a health check probe.
+// If no reply arrives within the timeout and no other reply from VPP was recorded during that period, the check counts as failed.
+// Beware: this function is not thread-safe, it writes a package-level variable without locking.
+// It is recommended to set this parameter before connecting to VPP.
+func SetHealthCheckReplyTimeout(timeout time.Duration) {
+       healthCheckReplyTimeout = timeout
+}
+
+// SetHealthCheckThreshold sets how many timed-out health check probes are tolerated: a disconnect is reported once the number of timeouts since the last successful probe exceeds the threshold.
+// Beware: this function is not thread-safe, it writes a package-level variable without locking.
+// It is recommended to set this parameter before connecting to VPP.
+func SetHealthCheckThreshold(threshold int) {
+       healthCheckThreshold = threshold
+}
+
+// SetControlPingMessages sets the messages for ControlPing and ControlPingReply
+func SetControlPingMessages(controPing, controlPingReply api.Message) {
+       msgControlPing = controPing
+       msgControlPingReply = controlPingReply
+}
+
 // Connect connects to VPP using specified VPP adapter and returns the connection handle.
 // This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed.
 func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
@@ -162,10 +201,13 @@ func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) {
                return nil, errors.New("only one connection per process is supported")
        }
 
-       conn = &Connection{vpp: vppAdapter, codec: &MsgCodec{}}
-       conn.channels = make(map[uint32]*api.Channel)
-       conn.msgIDs = make(map[string]uint16)
-       conn.notifSubscriptions = make(map[uint16][]*api.NotifSubscription)
+       conn = &Connection{
+               vpp:                vppAdapter,
+               codec:              &MsgCodec{},
+               channels:           make(map[uint32]*api.Channel),
+               msgIDs:             make(map[string]uint16),
+               notifSubscriptions: make(map[uint16][]*api.NotifSubscription),
+       }
 
        conn.vpp.SetMsgCallback(msgCallback)
        return conn, nil
@@ -182,13 +224,19 @@ func (c *Connection) connectVPP() error {
                return err
        }
 
+       // store control ping IDs
+       if c.pingReqID, err = c.GetMessageID(msgControlPing); err != nil {
+               c.vpp.Disconnect()
+               return err
+       }
+       if c.pingReplyID, err = c.GetMessageID(msgControlPingReply); err != nil {
+               c.vpp.Disconnect()
+               return err
+       }
+
        // store connected state
        atomic.StoreUint32(&c.connected, 1)
 
-       // store control ping IDs
-       c.pingReqID, _ = c.GetMessageID(&vpe.ControlPing{})
-       c.pingReplyID, _ = c.GetMessageID(&vpe.ControlPingReply{})
-
        log.Info("Connected to VPP.")
        return nil
 }
@@ -205,11 +253,16 @@ func (c *Connection) disconnectVPP() {
 func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
        // loop until connected
        for {
-               err := c.connectVPP()
-               if err == nil {
+               if err := c.vpp.WaitReady(); err != nil {
+                       log.Warnf("wait ready failed: %v", err)
+               }
+               if err := c.connectVPP(); err == nil {
                        // signal connected event
                        connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected}
                        break
+               } else {
+                       log.Errorf("connecting to VPP failed: %v", err)
+                       time.Sleep(time.Second)
                }
        }
 
@@ -221,16 +274,19 @@ func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
 // it continues with connectLoop and tries to reconnect.
 func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
        // create a separate API channel for health check probes
-       ch, err := conn.NewAPIChannel()
+       ch, err := conn.NewAPIChannelBuffered(1, 1)
        if err != nil {
-               log.Error("Error by creating health check API channel, health check will be disabled:", err)
+               log.Error("Failed to create health check API channel, health check will be disabled:", err)
                return
        }
 
-       // send health check probes until an error occurs
+       var sinceLastReply time.Duration
+       var failedChecks int
+
+       // send health check probes until an error or timeout occurs
        for {
-               // wait for healthCheckProbeInterval
-               <-time.After(healthCheckProbeInterval)
+               // sleep until next health check probe period
+               time.Sleep(healthCheckProbeInterval)
 
                if atomic.LoadUint32(&c.connected) == 0 {
                        // Disconnect has been called in the meantime, return the healthcheck - reconnect loop
@@ -238,23 +294,56 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
                        return
                }
 
-               // send the control ping
-               ch.ReqChan <- &api.VppRequest{Message: &vpe.ControlPing{}}
-
-               // expect response within timeout period
+               // try draining probe replies from previous request before sending next one
                select {
-               case vppReply := <-ch.ReplyChan:
-                       err = vppReply.Error
-               case <-time.After(healthCheckReplyTimeout):
-                       err = errors.New("probe reply not received within the timeout period")
+               case <-ch.ReplyChan:
+                       log.Debug("drained old probe reply from reply channel")
+               default:
+               }
+
+               // send the control ping request
+               ch.ReqChan <- &api.VppRequest{Message: msgControlPing}
+
+               for {
+                       // expect response within timeout period
+                       select {
+                       case vppReply := <-ch.ReplyChan:
+                               err = vppReply.Error
+
+                       case <-time.After(healthCheckReplyTimeout):
+                               err = ErrProbeTimeout
+
+                               // check if time since last reply from any other
+                               // channel is less than health check reply timeout
+                               conn.lastReplyLock.Lock()
+                               sinceLastReply = time.Since(c.lastReply)
+                               conn.lastReplyLock.Unlock()
+
+                               if sinceLastReply < healthCheckReplyTimeout {
+                                       log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply)
+                                       continue
+                               }
+                       }
+                       break
                }
 
-               // in case of error, break & disconnect
-               if err != nil {
-                       log.Errorf("VPP health check failed: %v", err)
-                       // signal disconnected event via channel
+               if err == ErrProbeTimeout {
+                       failedChecks++
+                       log.Warnf("VPP health check probe timed out after %v (%d. timeout)", healthCheckReplyTimeout, failedChecks)
+                       if failedChecks > healthCheckThreshold {
+                               // disconnect once the timeout threshold is exceeded
+                               log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", healthCheckThreshold)
+                               connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
+                               break
+                       }
+               } else if err != nil {
+                       // in case of error disconnect
+                       log.Errorf("VPP health check probe failed: %v", err)
                        connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
                        break
+               } else if failedChecks > 0 {
+                       failedChecks = 0
+                       log.Infof("VPP health check probe OK")
                }
        }