X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=core%2Fconnection.go;h=f3ff964e28b55f26efc9f37452be169ce479ce48;hb=5de7f6b85458615fa592a335d45c546397f32c9a;hp=917f1cbd96d9939eb599699285541a82876facf1;hpb=280b1c6c83b676ef4e592f4ecf60cb5b54b6a753;p=govpp.git diff --git a/core/connection.go b/core/connection.go index 917f1cb..f3ff964 100644 --- a/core/connection.go +++ b/core/connection.go @@ -17,6 +17,7 @@ package core import ( "errors" "fmt" + "path" "reflect" "sync" "sync/atomic" @@ -42,8 +43,8 @@ var ( var ( HealthCheckProbeInterval = time.Second // default health check probe interval - HealthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check probe - HealthCheckThreshold = 1 // number of failed health checks until the error is reported + HealthCheckReplyTimeout = time.Millisecond * 250 // timeout for reply to a health check probe + HealthCheckThreshold = 2 // number of failed health checks until the error is reported DefaultReplyTimeout = time.Second // default timeout for replies from VPP ) @@ -54,7 +55,11 @@ const ( // Connected represents state in which the connection has been successfully established. Connected ConnectionState = iota - // Disconnected represents state in which the connection has been dropped. + // NotResponding represents a state where the VPP socket accepts messages but replies are received with delay, + // or not at all. GoVPP treats this state internally the same as disconnected. + NotResponding + + // Disconnected represents state in which the VPP socket is closed and the connection is considered dropped. Disconnected // Failed represents state in which the reconnecting failed after exceeding maximum number of attempts. @@ -65,6 +70,8 @@ func (s ConnectionState) String() string { switch s { case Connected: return "Connected" + case NotResponding: + return "NotResponding" case Disconnected: return "Disconnected" case Failed: @@ -95,9 +102,11 @@ type Connection struct { vppConnected uint32 // non-zero if the adapter is connected to VPP - codec MessageCodec // message codec - msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC - msgMap map[uint16]api.Message // map of messages indexed by message ID + connChan chan ConnectionEvent // connection status events are sent to this channel + + codec MessageCodec // message codec + msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC + msgMapByPath map[string]map[uint16]api.Message // map of messages indexed by message ID which are indexed by path maxChannelID uint32 // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations) channelsLock sync.RWMutex // lock for the channels map @@ -128,9 +137,10 @@ func newConnection(binapi adapter.VppAPI, attempts int, interval time.Duration) vppClient: binapi, maxAttempts: attempts, recInterval: interval, - codec: &codec.MsgCodec{}, + connChan: make(chan ConnectionEvent, NotificationChanBufSize), + codec: codec.DefaultCodec, msgIDs: make(map[string]uint16), - msgMap: make(map[uint16]api.Message), + msgMapByPath: make(map[string]map[uint16]api.Message), channels: make(map[uint16]*Channel), subscriptions: make(map[uint16][]*subscriptionCtx), msgControlPing: msgControlPing, @@ -164,10 +174,9 @@ func AsyncConnect(binapi adapter.VppAPI, attempts int, interval time.Duration) ( c := newConnection(binapi, attempts, interval) // asynchronously attempt to connect to VPP - connChan := make(chan ConnectionEvent, NotificationChanBufSize) - go c.connectLoop(connChan) + go c.connectLoop() - return c, connChan, nil + return c, c.connChan, nil } // connectVPP performs blocking attempt to connect to VPP. @@ -259,7 +268,7 @@ func (c *Connection) releaseAPIChannel(ch *Channel) { // connectLoop attempts to connect to VPP until it succeeds. // Then it continues with healthCheckLoop. -func (c *Connection) connectLoop(connChan chan ConnectionEvent) { +func (c *Connection) connectLoop() { var reconnectAttempts int // loop until connected @@ -269,25 +278,25 @@ func (c *Connection) connectLoop(connChan chan ConnectionEvent) { } if err := c.connectVPP(); err == nil { // signal connected event - connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected} + c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Connected}) break } else if reconnectAttempts < c.maxAttempts { reconnectAttempts++ log.Warnf("connecting failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err) time.Sleep(c.recInterval) } else { - connChan <- ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err} + c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err}) return } } // we are now connected, continue with health check loop - c.healthCheckLoop(connChan) + c.healthCheckLoop() } // healthCheckLoop checks whether connection to VPP is alive. In case of disconnect, // it continues with connectLoop and tries to reconnect. -func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { +func (c *Connection) healthCheckLoop() { // create a separate API channel for health check probes ch, err := c.newAPIChannel(1, 1) if err != nil { @@ -348,15 +357,15 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { failedChecks++ log.Warnf("VPP health check probe timed out after %v (%d. timeout)", HealthCheckReplyTimeout, failedChecks) if failedChecks > HealthCheckThreshold { - // in case of exceeded failed check treshold, assume VPP disconnected - log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", HealthCheckThreshold) - connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected} + // in case of exceeded failed check threshold, assume VPP unresponsive + log.Errorf("VPP does not responding, the health check exceeded threshold for timeouts (>%d)", HealthCheckThreshold) + c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: NotResponding}) break } } else if err != nil { // in case of error, assume VPP disconnected log.Errorf("VPP health check probe failed: %v", err) - connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: err} + c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: err}) break } else if failedChecks > 0 { // in case of success after failed checks, clear failed check counter @@ -370,7 +379,7 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { c.disconnectVPP() // we are now disconnected, start connect loop - c.connectLoop(connChan) + c.connectLoop() } func getMsgNameWithCrc(x api.Message) string { @@ -392,69 +401,82 @@ func (c *Connection) GetMessageID(msg api.Message) (uint16, error) { if c == nil { return 0, errors.New("nil connection passed in") } - - if msgID, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok { - return msgID, nil - } - + pkgPath := c.GetMessagePath(msg) msgID, err := c.vppClient.GetMsgID(msg.GetMessageName(), msg.GetCrcString()) if err != nil { return 0, err } - + if pathMsgs, pathOk := c.msgMapByPath[pkgPath]; !pathOk { + c.msgMapByPath[pkgPath] = make(map[uint16]api.Message) + c.msgMapByPath[pkgPath][msgID] = msg + } else if _, msgOk := pathMsgs[msgID]; !msgOk { + c.msgMapByPath[pkgPath][msgID] = msg + } + if _, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok { + return msgID, nil + } c.msgIDs[getMsgNameWithCrc(msg)] = msgID - c.msgMap[msgID] = msg - return msgID, nil } // LookupByID looks up message name and crc by ID. -func (c *Connection) LookupByID(msgID uint16) (api.Message, error) { +func (c *Connection) LookupByID(path string, msgID uint16) (api.Message, error) { if c == nil { return nil, errors.New("nil connection passed in") } - - if msg, ok := c.msgMap[msgID]; ok { + if msg, ok := c.msgMapByPath[path][msgID]; ok { return msg, nil } + return nil, fmt.Errorf("unknown message ID %d for path '%s'", msgID, path) +} - return nil, fmt.Errorf("unknown message ID: %d", msgID) +// GetMessagePath returns path for the given message +func (c *Connection) GetMessagePath(msg api.Message) string { + return path.Dir(reflect.TypeOf(msg).Elem().PkgPath()) } // retrieveMessageIDs retrieves IDs for all registered messages and stores them in map func (c *Connection) retrieveMessageIDs() (err error) { t := time.Now() - msgs := api.GetRegisteredMessages() + msgsByPath := api.GetRegisteredMessages() var n int - for name, msg := range msgs { - typ := reflect.TypeOf(msg).Elem() - path := fmt.Sprintf("%s.%s", typ.PkgPath(), typ.Name()) + for pkgPath, msgs := range msgsByPath { + for _, msg := range msgs { + msgID, err := c.GetMessageID(msg) + if err != nil { + if debugMsgIDs { + log.Debugf("retrieving message ID for %s.%s failed: %v", + pkgPath, msg.GetMessageName(), err) + } + continue + } + n++ + + if c.pingReqID == 0 && msg.GetMessageName() == c.msgControlPing.GetMessageName() { + c.pingReqID = msgID + c.msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message) + } else if c.pingReplyID == 0 && msg.GetMessageName() == c.msgControlPingReply.GetMessageName() { + c.pingReplyID = msgID + c.msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message) + } - msgID, err := c.GetMessageID(msg) - if err != nil { if debugMsgIDs { - log.Debugf("retrieving message ID for %s failed: %v", path, err) + log.Debugf("message %q (%s) has ID: %d", msg.GetMessageName(), getMsgNameWithCrc(msg), msgID) } - continue - } - n++ - - if c.pingReqID == 0 && msg.GetMessageName() == c.msgControlPing.GetMessageName() { - c.pingReqID = msgID - c.msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message) - } else if c.pingReplyID == 0 && msg.GetMessageName() == c.msgControlPingReply.GetMessageName() { - c.pingReplyID = msgID - c.msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message) - } - - if debugMsgIDs { - log.Debugf("message %q (%s) has ID: %d", name, getMsgNameWithCrc(msg), msgID) } + log.WithField("took", time.Since(t)). + Debugf("retrieved IDs for %d messages (registered %d) from path %s", n, len(msgs), pkgPath) } - log.WithField("took", time.Since(t)). - Debugf("retrieved IDs for %d messages (registered %d)", n, len(msgs)) return nil } + +func (c *Connection) sendConnEvent(event ConnectionEvent) { + select { + case c.connChan <- event: + default: + log.Warn("Connection state channel is full, discarding value.") + } +}