Added VPP state 'NotResponding'
[govpp.git] / core / connection.go
index 6f82616..53a9acf 100644 (file)
@@ -54,7 +54,11 @@ const (
        // Connected represents state in which the connection has been successfully established.
        Connected ConnectionState = iota
 
-       // Disconnected represents state in which the connection has been dropped.
+       // NotResponding represents a state where the VPP socket accepts messages but replies are received with delay,
+       // or not at all. GoVPP treats this state internally the same as disconnected.
+       NotResponding
+
+       // Disconnected represents state in which the VPP socket is closed and the connection is considered dropped.
        Disconnected
 
        // Failed represents state in which the reconnecting failed after exceeding maximum number of attempts.
@@ -65,6 +69,8 @@ func (s ConnectionState) String() string {
        switch s {
        case Connected:
                return "Connected"
+       case NotResponding:
+               return "NotResponding"
        case Disconnected:
                return "Disconnected"
        case Failed:
@@ -95,7 +101,9 @@ type Connection struct {
 
        vppConnected uint32 // non-zero if the adapter is connected to VPP
 
-       codec  *codec.MsgCodec        // message codec
+       connChan chan ConnectionEvent // connection status events are sent to this channel
+
+       codec  MessageCodec           // message codec
        msgIDs map[string]uint16      // map of message IDs indexed by message name + CRC
        msgMap map[uint16]api.Message // map of messages indexed by message ID
 
@@ -111,6 +119,9 @@ type Connection struct {
 
        lastReplyLock sync.Mutex // lock for the last reply
        lastReply     time.Time  // time of the last received reply from VPP
+
+       msgControlPing      api.Message
+       msgControlPingReply api.Message
 }
 
 func newConnection(binapi adapter.VppAPI, attempts int, interval time.Duration) *Connection {
@@ -122,14 +133,17 @@ func newConnection(binapi adapter.VppAPI, attempts int, interval time.Duration)
        }
 
        c := &Connection{
-               vppClient:     binapi,
-               maxAttempts:   attempts,
-               recInterval:   interval,
-               codec:         &codec.MsgCodec{},
-               msgIDs:        make(map[string]uint16),
-               msgMap:        make(map[uint16]api.Message),
-               channels:      make(map[uint16]*Channel),
-               subscriptions: make(map[uint16][]*subscriptionCtx),
+               vppClient:           binapi,
+               maxAttempts:         attempts,
+               recInterval:         interval,
+               connChan:            make(chan ConnectionEvent, NotificationChanBufSize),
+               codec:               codec.DefaultCodec,
+               msgIDs:              make(map[string]uint16),
+               msgMap:              make(map[uint16]api.Message),
+               channels:            make(map[uint16]*Channel),
+               subscriptions:       make(map[uint16][]*subscriptionCtx),
+               msgControlPing:      msgControlPing,
+               msgControlPingReply: msgControlPingReply,
        }
        binapi.SetMsgCallback(c.msgCallback)
        return c
@@ -159,10 +173,9 @@ func AsyncConnect(binapi adapter.VppAPI, attempts int, interval time.Duration) (
        c := newConnection(binapi, attempts, interval)
 
        // asynchronously attempt to connect to VPP
-       connChan := make(chan ConnectionEvent, NotificationChanBufSize)
-       go c.connectLoop(connChan)
+       go c.connectLoop()
 
-       return c, connChan, nil
+       return c, c.connChan, nil
 }
 
 // connectVPP performs blocking attempt to connect to VPP.
@@ -254,7 +267,7 @@ func (c *Connection) releaseAPIChannel(ch *Channel) {
 
 // connectLoop attempts to connect to VPP until it succeeds.
 // Then it continues with healthCheckLoop.
-func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
+func (c *Connection) connectLoop() {
        var reconnectAttempts int
 
        // loop until connected
@@ -264,25 +277,25 @@ func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
                }
                if err := c.connectVPP(); err == nil {
                        // signal connected event
-                       connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected}
+                       c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Connected})
                        break
                } else if reconnectAttempts < c.maxAttempts {
                        reconnectAttempts++
                        log.Warnf("connecting failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err)
                        time.Sleep(c.recInterval)
                } else {
-                       connChan <- ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err}
+                       c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err})
                        return
                }
        }
 
        // we are now connected, continue with health check loop
-       c.healthCheckLoop(connChan)
+       c.healthCheckLoop()
 }
 
 // healthCheckLoop checks whether connection to VPP is alive. In case of disconnect,
 // it continues with connectLoop and tries to reconnect.
-func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
+func (c *Connection) healthCheckLoop() {
        // create a separate API channel for health check probes
        ch, err := c.newAPIChannel(1, 1)
        if err != nil {
@@ -314,7 +327,7 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
                }
 
                // send the control ping request
-               ch.reqChan <- &vppRequest{msg: msgControlPing}
+               ch.reqChan <- &vppRequest{msg: c.msgControlPing}
 
                for {
                        // expect response within timeout period
@@ -343,15 +356,15 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
                        failedChecks++
                        log.Warnf("VPP health check probe timed out after %v (%d. timeout)", HealthCheckReplyTimeout, failedChecks)
                        if failedChecks > HealthCheckThreshold {
-                               // in case of exceeded failed check treshold, assume VPP disconnected
-                               log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", HealthCheckThreshold)
-                               connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
+                               // in case of exceeded failed check threshold, assume VPP unresponsive
+                               log.Errorf("VPP does not responding, the health check exceeded threshold for timeouts (>%d)", HealthCheckThreshold)
+                               c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: NotResponding})
                                break
                        }
                } else if err != nil {
                        // in case of error, assume VPP disconnected
                        log.Errorf("VPP health check probe failed: %v", err)
-                       connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: err}
+                       c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: err})
                        break
                } else if failedChecks > 0 {
                        // in case of success after failed checks, clear failed check counter
@@ -365,11 +378,15 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
        c.disconnectVPP()
 
        // we are now disconnected, start connect loop
-       c.connectLoop(connChan)
+       c.connectLoop()
 }
 
 func getMsgNameWithCrc(x api.Message) string {
-       return x.GetMessageName() + "_" + x.GetCrcString()
+       return getMsgID(x.GetMessageName(), x.GetCrcString())
+}
+
+func getMsgID(name, crc string) string {
+       return name + "_" + crc
 }
 
 func getMsgFactory(msg api.Message) func() api.Message {
@@ -420,26 +437,40 @@ func (c *Connection) retrieveMessageIDs() (err error) {
 
        var n int
        for name, msg := range msgs {
+               typ := reflect.TypeOf(msg).Elem()
+               path := fmt.Sprintf("%s.%s", typ.PkgPath(), typ.Name())
+
                msgID, err := c.GetMessageID(msg)
                if err != nil {
-                       log.Debugf("retrieving msgID for %s failed: %v", name, err)
+                       if debugMsgIDs {
+                               log.Debugf("retrieving message ID for %s failed: %v", path, err)
+                       }
                        continue
                }
                n++
 
-               if c.pingReqID == 0 && msg.GetMessageName() == msgControlPing.GetMessageName() {
+               if c.pingReqID == 0 && msg.GetMessageName() == c.msgControlPing.GetMessageName() {
                        c.pingReqID = msgID
-                       msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
-               } else if c.pingReplyID == 0 && msg.GetMessageName() == msgControlPingReply.GetMessageName() {
+                       c.msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
+               } else if c.pingReplyID == 0 && msg.GetMessageName() == c.msgControlPingReply.GetMessageName() {
                        c.pingReplyID = msgID
-                       msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
+                       c.msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
                }
 
                if debugMsgIDs {
                        log.Debugf("message %q (%s) has ID: %d", name, getMsgNameWithCrc(msg), msgID)
                }
        }
-       log.Debugf("retrieved %d/%d msgIDs (took %s)", n, len(msgs), time.Since(t))
+       log.WithField("took", time.Since(t)).
+               Debugf("retrieved IDs for %d messages (registered %d)", n, len(msgs))
 
        return nil
 }
+
+func (c *Connection) sendConnEvent(event ConnectionEvent) {
+       select {
+       case c.connChan <- event:
+       default:
+               log.Warn("Connection state channel is full, discarding value.")
+       }
+}