X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=core%2Fconnection.go;h=ee5a06b82e04fe76c96ec2a7a132b2c45fb325a4;hb=57de49f7583b8174c7f3d8e21956d4eaac64ac28;hp=08f08f52a6276d997bca3017d272d149f202faa0;hpb=eb571ee5c0cf5a5194d6ae9aaa0be6e24ea2a943;p=govpp.git diff --git a/core/connection.go b/core/connection.go index 08f08f5..ee5a06b 100644 --- a/core/connection.go +++ b/core/connection.go @@ -17,6 +17,7 @@ package core import ( "errors" "fmt" + "path" "reflect" "sync" "sync/atomic" @@ -29,6 +30,11 @@ import ( "git.fd.io/govpp.git/codec" ) +const ( + DefaultReconnectInterval = time.Second / 2 // default interval between reconnect attempts + DefaultMaxReconnectAttempts = 3 // default maximum number of reconnect attempts +) + var ( RequestChanBufSize = 100 // default size of the request channel buffer ReplyChanBufSize = 100 // default size of the reply channel buffer @@ -36,10 +42,10 @@ var ( ) var ( - HealthCheckProbeInterval = time.Second * 1 // default health check probe interval - HealthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check probe - HealthCheckThreshold = 1 // number of failed health checks until the error is reported - DefaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP + HealthCheckProbeInterval = time.Second // default health check probe interval + HealthCheckReplyTimeout = time.Millisecond * 250 // timeout for reply to a health check probe + HealthCheckThreshold = 2 // number of failed health checks until the error is reported + DefaultReplyTimeout = time.Second // default timeout for replies from VPP ) // ConnectionState represents the current state of the connection to VPP. @@ -49,10 +55,32 @@ const ( // Connected represents state in which the connection has been successfully established. Connected ConnectionState = iota - // Disconnected represents state in which the connection has been dropped. + // NotResponding represents a state where the VPP socket accepts messages but replies are received with delay, + // or not at all. GoVPP treats this state internally the same as disconnected. + NotResponding + + // Disconnected represents state in which the VPP socket is closed and the connection is considered dropped. Disconnected + + // Failed represents state in which the reconnecting failed after exceeding maximum number of attempts. + Failed ) +func (s ConnectionState) String() string { + switch s { + case Connected: + return "Connected" + case NotResponding: + return "NotResponding" + case Disconnected: + return "Disconnected" + case Failed: + return "Failed" + default: + return fmt.Sprintf("UnknownState(%d)", s) + } +} + // ConnectionEvent is a notification about change in the VPP connection state. type ConnectionEvent struct { // Timestamp holds the time when the event has been created. @@ -67,13 +95,18 @@ type ConnectionEvent struct { // Connection represents a shared memory connection to VPP via vppAdapter. type Connection struct { - vppClient adapter.VppAPI // VPP binary API client adapter + vppClient adapter.VppAPI // VPP binary API client + + maxAttempts int // interval for reconnect attempts + recInterval time.Duration // maximum number of reconnect attempts vppConnected uint32 // non-zero if the adapter is connected to VPP - codec *codec.MsgCodec // message codec - msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC - msgMap map[uint16]api.Message // map of messages indexed by message ID + connChan chan ConnectionEvent // connection status events are sent to this channel + + codec MessageCodec // message codec + msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC + msgMapByPath map[string]map[uint16]api.Message // map of messages indexed by message ID which are indexed by path maxChannelID uint32 // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations) channelsLock sync.RWMutex // lock for the channels map @@ -87,26 +120,48 @@ type Connection struct { lastReplyLock sync.Mutex // lock for the last reply lastReply time.Time // time of the last received reply from VPP + + msgControlPing api.Message + msgControlPingReply api.Message + + apiTrace *trace // API tracer (disabled by default) } -func newConnection(binapi adapter.VppAPI) *Connection { +func newConnection(binapi adapter.VppAPI, attempts int, interval time.Duration) *Connection { + if attempts == 0 { + attempts = DefaultMaxReconnectAttempts + } + if interval == 0 { + interval = DefaultReconnectInterval + } + c := &Connection{ - vppClient: binapi, - codec: &codec.MsgCodec{}, - msgIDs: make(map[string]uint16), - msgMap: make(map[uint16]api.Message), - channels: make(map[uint16]*Channel), - subscriptions: make(map[uint16][]*subscriptionCtx), + vppClient: binapi, + maxAttempts: attempts, + recInterval: interval, + connChan: make(chan ConnectionEvent, NotificationChanBufSize), + codec: codec.DefaultCodec, + msgIDs: make(map[string]uint16), + msgMapByPath: make(map[string]map[uint16]api.Message), + channels: make(map[uint16]*Channel), + subscriptions: make(map[uint16][]*subscriptionCtx), + msgControlPing: msgControlPing, + msgControlPingReply: msgControlPingReply, + apiTrace: &trace{ + list: make([]*api.Record, 0), + mux: &sync.Mutex{}, + }, } binapi.SetMsgCallback(c.msgCallback) return c } -// Connect connects to VPP using specified VPP adapter and returns the connection handle. -// This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed. +// Connect connects to VPP API using specified adapter and returns a connection handle. +// This call blocks until it is either connected, or an error occurs. +// Only one connection attempt will be performed. func Connect(binapi adapter.VppAPI) (*Connection, error) { // create new connection handle - c := newConnection(binapi) + c := newConnection(binapi, DefaultMaxReconnectAttempts, DefaultReconnectInterval) // blocking attempt to connect to VPP if err := c.connectVPP(); err != nil { @@ -120,15 +175,14 @@ func Connect(binapi adapter.VppAPI) (*Connection, error) { // and ConnectionState channel. This call does not block until connection is established, it // returns immediately. The caller is supposed to watch the returned ConnectionState channel for // Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect. -func AsyncConnect(binapi adapter.VppAPI) (*Connection, chan ConnectionEvent, error) { +func AsyncConnect(binapi adapter.VppAPI, attempts int, interval time.Duration) (*Connection, chan ConnectionEvent, error) { // create new connection handle - c := newConnection(binapi) + c := newConnection(binapi, attempts, interval) // asynchronously attempt to connect to VPP - connChan := make(chan ConnectionEvent, NotificationChanBufSize) - go c.connectLoop(connChan) + go c.connectLoop() - return c, connChan, nil + return c, c.connChan, nil } // connectVPP performs blocking attempt to connect to VPP. @@ -139,11 +193,12 @@ func (c *Connection) connectVPP() error { if err := c.vppClient.Connect(); err != nil { return err } - - log.Debugf("Connected to VPP.") + log.Debugf("Connected to VPP") if err := c.retrieveMessageIDs(); err != nil { - c.vppClient.Disconnect() + if err := c.vppClient.Disconnect(); err != nil { + log.Debugf("disconnecting vpp client failed: %v", err) + } return fmt.Errorf("VPP is incompatible: %v", err) } @@ -153,12 +208,11 @@ func (c *Connection) connectVPP() error { return nil } -// Disconnect disconnects from VPP and releases all connection-related resources. +// Disconnect disconnects from VPP API and releases all connection-related resources. func (c *Connection) Disconnect() { if c == nil { return } - if c.vppClient != nil { c.disconnectVPP() } @@ -167,7 +221,12 @@ func (c *Connection) Disconnect() { // disconnectVPP disconnects from VPP in case it is connected. func (c *Connection) disconnectVPP() { if atomic.CompareAndSwapUint32(&c.vppConnected, 1, 0) { - c.vppClient.Disconnect() + log.Debug("Disconnecting from VPP..") + + if err := c.vppClient.Disconnect(); err != nil { + log.Debugf("Disconnect from VPP failed: %v", err) + } + log.Debug("Disconnected from VPP") } } @@ -213,110 +272,37 @@ func (c *Connection) releaseAPIChannel(ch *Channel) { c.channelsLock.Unlock() } -// GetMessageID returns message identifier of given API message. -func (c *Connection) GetMessageID(msg api.Message) (uint16, error) { - if c == nil { - return 0, errors.New("nil connection passed in") - } - - if msgID, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok { - return msgID, nil - } - - return 0, fmt.Errorf("unknown message: %s (%s)", msg.GetMessageName(), msg.GetCrcString()) -} - -// LookupByID looks up message name and crc by ID. -func (c *Connection) LookupByID(msgID uint16) (api.Message, error) { - if c == nil { - return nil, errors.New("nil connection passed in") - } - - if msg, ok := c.msgMap[msgID]; ok { - return msg, nil - } - - return nil, fmt.Errorf("unknown message ID: %d", msgID) -} - -// retrieveMessageIDs retrieves IDs for all registered messages and stores them in map -func (c *Connection) retrieveMessageIDs() (err error) { - t := time.Now() - - var addMsg = func(msgID uint16, msg api.Message) { - c.msgIDs[getMsgNameWithCrc(msg)] = msgID - c.msgMap[msgID] = msg - } - - msgs := api.GetRegisteredMessages() - - for name, msg := range msgs { - msgID, err := c.vppClient.GetMsgID(msg.GetMessageName(), msg.GetCrcString()) - if err != nil { - return err - } - - addMsg(msgID, msg) - - if msg.GetMessageName() == msgControlPing.GetMessageName() { - c.pingReqID = msgID - msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message) - } else if msg.GetMessageName() == msgControlPingReply.GetMessageName() { - c.pingReplyID = msgID - msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message) - } - - if debugMsgIDs { - log.Debugf("message %q (%s) has ID: %d", name, getMsgNameWithCrc(msg), msgID) - } - } - - log.Debugf("retrieving %d message IDs took %s", len(msgs), time.Since(t)) - - // fallback for control ping when vpe package is not imported - if c.pingReqID == 0 { - c.pingReqID, err = c.vppClient.GetMsgID(msgControlPing.GetMessageName(), msgControlPing.GetCrcString()) - if err != nil { - return err - } - addMsg(c.pingReqID, msgControlPing) - } - if c.pingReplyID == 0 { - c.pingReplyID, err = c.vppClient.GetMsgID(msgControlPingReply.GetMessageName(), msgControlPingReply.GetCrcString()) - if err != nil { - return err - } - addMsg(c.pingReplyID, msgControlPingReply) - } - - return nil -} - // connectLoop attempts to connect to VPP until it succeeds. // Then it continues with healthCheckLoop. -func (c *Connection) connectLoop(connChan chan ConnectionEvent) { +func (c *Connection) connectLoop() { + var reconnectAttempts int + // loop until connected for { if err := c.vppClient.WaitReady(); err != nil { - log.Warnf("wait ready failed: %v", err) + log.Debugf("wait ready failed: %v", err) } if err := c.connectVPP(); err == nil { // signal connected event - connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected} + c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Connected}) break + } else if reconnectAttempts < c.maxAttempts { + reconnectAttempts++ + log.Warnf("connecting failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err) + time.Sleep(c.recInterval) } else { - log.Errorf("connecting to VPP failed: %v", err) - time.Sleep(time.Second) + c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err}) + return } } // we are now connected, continue with health check loop - c.healthCheckLoop(connChan) + c.healthCheckLoop() } // healthCheckLoop checks whether connection to VPP is alive. In case of disconnect, // it continues with connectLoop and tries to reconnect. -func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { +func (c *Connection) healthCheckLoop() { // create a separate API channel for health check probes ch, err := c.newAPIChannel(1, 1) if err != nil { @@ -348,7 +334,7 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { } // send the control ping request - ch.reqChan <- &vppRequest{msg: msgControlPing} + ch.reqChan <- &vppRequest{msg: c.msgControlPing} for { // expect response within timeout period @@ -377,15 +363,15 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { failedChecks++ log.Warnf("VPP health check probe timed out after %v (%d. timeout)", HealthCheckReplyTimeout, failedChecks) if failedChecks > HealthCheckThreshold { - // in case of exceeded failed check treshold, assume VPP disconnected - log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", HealthCheckThreshold) - connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected} + // in case of exceeded failed check threshold, assume VPP unresponsive + log.Errorf("VPP does not responding, the health check exceeded threshold for timeouts (>%d)", HealthCheckThreshold) + c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: NotResponding}) break } } else if err != nil { // in case of error, assume VPP disconnected log.Errorf("VPP health check probe failed: %v", err) - connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: err} + c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: err}) break } else if failedChecks > 0 { // in case of success after failed checks, clear failed check counter @@ -399,9 +385,125 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { c.disconnectVPP() // we are now disconnected, start connect loop - c.connectLoop(connChan) + c.connectLoop() } func getMsgNameWithCrc(x api.Message) string { - return x.GetMessageName() + "_" + x.GetCrcString() + return getMsgID(x.GetMessageName(), x.GetCrcString()) +} + +func getMsgID(name, crc string) string { + return name + "_" + crc +} + +func getMsgFactory(msg api.Message) func() api.Message { + return func() api.Message { + return reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message) + } +} + +// GetMessageID returns message identifier of given API message. +func (c *Connection) GetMessageID(msg api.Message) (uint16, error) { + if c == nil { + return 0, errors.New("nil connection passed in") + } + pkgPath := c.GetMessagePath(msg) + msgID, err := c.vppClient.GetMsgID(msg.GetMessageName(), msg.GetCrcString()) + if err != nil { + return 0, err + } + if pathMsgs, pathOk := c.msgMapByPath[pkgPath]; !pathOk { + c.msgMapByPath[pkgPath] = make(map[uint16]api.Message) + c.msgMapByPath[pkgPath][msgID] = msg + } else if _, msgOk := pathMsgs[msgID]; !msgOk { + c.msgMapByPath[pkgPath][msgID] = msg + } + if _, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok { + return msgID, nil + } + c.msgIDs[getMsgNameWithCrc(msg)] = msgID + return msgID, nil +} + +// LookupByID looks up message name and crc by ID. +func (c *Connection) LookupByID(path string, msgID uint16) (api.Message, error) { + if c == nil { + return nil, errors.New("nil connection passed in") + } + if msg, ok := c.msgMapByPath[path][msgID]; ok { + return msg, nil + } + return nil, fmt.Errorf("unknown message ID %d for path '%s'", msgID, path) +} + +// GetMessagePath returns path for the given message +func (c *Connection) GetMessagePath(msg api.Message) string { + return path.Dir(reflect.TypeOf(msg).Elem().PkgPath()) +} + +// retrieveMessageIDs retrieves IDs for all registered messages and stores them in map +func (c *Connection) retrieveMessageIDs() (err error) { + t := time.Now() + + msgsByPath := api.GetRegisteredMessages() + + var n int + for pkgPath, msgs := range msgsByPath { + for _, msg := range msgs { + msgID, err := c.GetMessageID(msg) + if err != nil { + if debugMsgIDs { + log.Debugf("retrieving message ID for %s.%s failed: %v", + pkgPath, msg.GetMessageName(), err) + } + continue + } + n++ + + if c.pingReqID == 0 && msg.GetMessageName() == c.msgControlPing.GetMessageName() { + c.pingReqID = msgID + c.msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message) + } else if c.pingReplyID == 0 && msg.GetMessageName() == c.msgControlPingReply.GetMessageName() { + c.pingReplyID = msgID + c.msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message) + } + + if debugMsgIDs { + log.Debugf("message %q (%s) has ID: %d", msg.GetMessageName(), getMsgNameWithCrc(msg), msgID) + } + } + log.WithField("took", time.Since(t)). + Debugf("retrieved IDs for %d messages (registered %d) from path %s", n, len(msgs), pkgPath) + } + + return nil +} + +func (c *Connection) sendConnEvent(event ConnectionEvent) { + select { + case c.connChan <- event: + default: + log.Warn("Connection state channel is full, discarding value.") + } +} + +// Trace gives access to the API trace interface +func (c *Connection) Trace() api.Trace { + return c.apiTrace +} + +// trace records api message +func (c *Connection) trace(msg api.Message, chId uint16, t time.Time, isReceived bool) { + if atomic.LoadInt32(&c.apiTrace.isEnabled) == 0 { + return + } + entry := &api.Record{ + Message: msg, + Timestamp: t, + IsReceived: isReceived, + ChannelID: chId, + } + c.apiTrace.mux.Lock() + c.apiTrace.list = append(c.apiTrace.list, entry) + c.apiTrace.mux.Unlock() }