X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=core%2Fconnection.go;h=53a9acfced7d435116bfa31ca5019046f9a119f6;hb=a155cd438c6558da266c1c5931361ea088b35653;hp=605e1efa7170d458c8f3deac3027c18008d25af8;hpb=4dca07c803308611275f78b490ac0352c1052fe2;p=govpp.git diff --git a/core/connection.go b/core/connection.go index 605e1ef..53a9acf 100644 --- a/core/connection.go +++ b/core/connection.go @@ -54,7 +54,11 @@ const ( // Connected represents state in which the connection has been successfully established. Connected ConnectionState = iota - // Disconnected represents state in which the connection has been dropped. + // NotResponding represents a state where the VPP socket accepts messages but replies are received with delay, + // or not at all. GoVPP treats this state internally the same as disconnected. + NotResponding + + // Disconnected represents state in which the VPP socket is closed and the connection is considered dropped. Disconnected // Failed represents state in which the reconnecting failed after exceeding maximum number of attempts. @@ -65,6 +69,8 @@ func (s ConnectionState) String() string { switch s { case Connected: return "Connected" + case NotResponding: + return "NotResponding" case Disconnected: return "Disconnected" case Failed: @@ -89,14 +95,15 @@ type ConnectionEvent struct { // Connection represents a shared memory connection to VPP via vppAdapter. type Connection struct { vppClient adapter.VppAPI // VPP binary API client - //statsClient adapter.StatsAPI // VPP stats API client maxAttempts int // interval for reconnect attempts recInterval time.Duration // maximum number of reconnect attempts vppConnected uint32 // non-zero if the adapter is connected to VPP - codec *codec.MsgCodec // message codec + connChan chan ConnectionEvent // connection status events are sent to this channel + + codec MessageCodec // message codec msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC msgMap map[uint16]api.Message // map of messages indexed by message ID @@ -112,6 +119,9 @@ type Connection struct { lastReplyLock sync.Mutex // lock for the last reply lastReply time.Time // time of the last received reply from VPP + + msgControlPing api.Message + msgControlPingReply api.Message } func newConnection(binapi adapter.VppAPI, attempts int, interval time.Duration) *Connection { @@ -123,14 +133,17 @@ func newConnection(binapi adapter.VppAPI, attempts int, interval time.Duration) } c := &Connection{ - vppClient: binapi, - maxAttempts: attempts, - recInterval: interval, - codec: &codec.MsgCodec{}, - msgIDs: make(map[string]uint16), - msgMap: make(map[uint16]api.Message), - channels: make(map[uint16]*Channel), - subscriptions: make(map[uint16][]*subscriptionCtx), + vppClient: binapi, + maxAttempts: attempts, + recInterval: interval, + connChan: make(chan ConnectionEvent, NotificationChanBufSize), + codec: codec.DefaultCodec, + msgIDs: make(map[string]uint16), + msgMap: make(map[uint16]api.Message), + channels: make(map[uint16]*Channel), + subscriptions: make(map[uint16][]*subscriptionCtx), + msgControlPing: msgControlPing, + msgControlPingReply: msgControlPingReply, } binapi.SetMsgCallback(c.msgCallback) return c @@ -160,10 +173,9 @@ func AsyncConnect(binapi adapter.VppAPI, attempts int, interval time.Duration) ( c := newConnection(binapi, attempts, interval) // asynchronously attempt to connect to VPP - connChan := make(chan ConnectionEvent, NotificationChanBufSize) - go c.connectLoop(connChan) + go c.connectLoop() - return c, connChan, nil + return c, c.connChan, nil } // connectVPP performs blocking attempt to connect to VPP. @@ -177,7 +189,9 @@ func (c *Connection) connectVPP() error { log.Debugf("Connected to VPP") if err := c.retrieveMessageIDs(); err != nil { - c.vppClient.Disconnect() + if err := c.vppClient.Disconnect(); err != nil { + log.Debugf("disconnecting vpp client failed: %v", err) + } return fmt.Errorf("VPP is incompatible: %v", err) } @@ -192,7 +206,6 @@ func (c *Connection) Disconnect() { if c == nil { return } - if c.vppClient != nil { c.disconnectVPP() } @@ -254,35 +267,35 @@ func (c *Connection) releaseAPIChannel(ch *Channel) { // connectLoop attempts to connect to VPP until it succeeds. // Then it continues with healthCheckLoop. -func (c *Connection) connectLoop(connChan chan ConnectionEvent) { +func (c *Connection) connectLoop() { var reconnectAttempts int // loop until connected for { if err := c.vppClient.WaitReady(); err != nil { - log.Warnf("wait ready failed: %v", err) + log.Debugf("wait ready failed: %v", err) } if err := c.connectVPP(); err == nil { // signal connected event - connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected} + c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Connected}) break } else if reconnectAttempts < c.maxAttempts { reconnectAttempts++ log.Warnf("connecting failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err) time.Sleep(c.recInterval) } else { - connChan <- ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err} + c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err}) return } } // we are now connected, continue with health check loop - c.healthCheckLoop(connChan) + c.healthCheckLoop() } // healthCheckLoop checks whether connection to VPP is alive. In case of disconnect, // it continues with connectLoop and tries to reconnect. -func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { +func (c *Connection) healthCheckLoop() { // create a separate API channel for health check probes ch, err := c.newAPIChannel(1, 1) if err != nil { @@ -314,7 +327,7 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { } // send the control ping request - ch.reqChan <- &vppRequest{msg: msgControlPing} + ch.reqChan <- &vppRequest{msg: c.msgControlPing} for { // expect response within timeout period @@ -343,15 +356,15 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { failedChecks++ log.Warnf("VPP health check probe timed out after %v (%d. timeout)", HealthCheckReplyTimeout, failedChecks) if failedChecks > HealthCheckThreshold { - // in case of exceeded failed check treshold, assume VPP disconnected - log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", HealthCheckThreshold) - connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected} + // in case of exceeded failed check threshold, assume VPP unresponsive + log.Errorf("VPP does not responding, the health check exceeded threshold for timeouts (>%d)", HealthCheckThreshold) + c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: NotResponding}) break } } else if err != nil { // in case of error, assume VPP disconnected log.Errorf("VPP health check probe failed: %v", err) - connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: err} + c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: err}) break } else if failedChecks > 0 { // in case of success after failed checks, clear failed check counter @@ -365,11 +378,15 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { c.disconnectVPP() // we are now disconnected, start connect loop - c.connectLoop(connChan) + c.connectLoop() } func getMsgNameWithCrc(x api.Message) string { - return x.GetMessageName() + "_" + x.GetCrcString() + return getMsgID(x.GetMessageName(), x.GetCrcString()) +} + +func getMsgID(name, crc string) string { + return name + "_" + crc } func getMsgFactory(msg api.Message) func() api.Message { @@ -420,26 +437,40 @@ func (c *Connection) retrieveMessageIDs() (err error) { var n int for name, msg := range msgs { + typ := reflect.TypeOf(msg).Elem() + path := fmt.Sprintf("%s.%s", typ.PkgPath(), typ.Name()) + msgID, err := c.GetMessageID(msg) if err != nil { - log.Debugf("retrieving msgID for %s failed: %v", name, err) + if debugMsgIDs { + log.Debugf("retrieving message ID for %s failed: %v", path, err) + } continue } n++ - if c.pingReqID == 0 && msg.GetMessageName() == msgControlPing.GetMessageName() { + if c.pingReqID == 0 && msg.GetMessageName() == c.msgControlPing.GetMessageName() { c.pingReqID = msgID - msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message) - } else if c.pingReplyID == 0 && msg.GetMessageName() == msgControlPingReply.GetMessageName() { + c.msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message) + } else if c.pingReplyID == 0 && msg.GetMessageName() == c.msgControlPingReply.GetMessageName() { c.pingReplyID = msgID - msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message) + c.msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message) } if debugMsgIDs { log.Debugf("message %q (%s) has ID: %d", name, getMsgNameWithCrc(msg), msgID) } } - log.Debugf("retrieved %d/%d msgIDs (took %s)", n, len(msgs), time.Since(t)) + log.WithField("took", time.Since(t)). + Debugf("retrieved IDs for %d messages (registered %d)", n, len(msgs)) return nil } + +func (c *Connection) sendConnEvent(event ConnectionEvent) { + select { + case c.connChan <- event: + default: + log.Warn("Connection state channel is full, discarding value.") + } +}