X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=core%2Fconnection.go;h=917f1cbd96d9939eb599699285541a82876facf1;hb=280b1c6c83b676ef4e592f4ecf60cb5b54b6a753;hp=14b0af446f3668366f127667d9d4d4a314ae59c5;hpb=df05a70f90a1486a86a4156b1b0d68c94f2098b4;p=govpp.git diff --git a/core/connection.go b/core/connection.go index 14b0af4..917f1cb 100644 --- a/core/connection.go +++ b/core/connection.go @@ -29,6 +29,11 @@ import ( "git.fd.io/govpp.git/codec" ) +const ( + DefaultReconnectInterval = time.Second / 2 // default interval between reconnect attempts + DefaultMaxReconnectAttempts = 3 // default maximum number of reconnect attempts +) + var ( RequestChanBufSize = 100 // default size of the request channel buffer ReplyChanBufSize = 100 // default size of the reply channel buffer @@ -36,12 +41,10 @@ var ( ) var ( - HealthCheckProbeInterval = time.Second * 1 // default health check probe interval + HealthCheckProbeInterval = time.Second // default health check probe interval HealthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check probe HealthCheckThreshold = 1 // number of failed health checks until the error is reported - DefaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP - ReconnectInterval = time.Second * 1 // default interval for reconnect attempts - MaxReconnectAttempts = 10 // maximum number of reconnect attempts + DefaultReplyTimeout = time.Second // default timeout for replies from VPP ) // ConnectionState represents the current state of the connection to VPP. @@ -58,6 +61,19 @@ const ( Failed ) +func (s ConnectionState) String() string { + switch s { + case Connected: + return "Connected" + case Disconnected: + return "Disconnected" + case Failed: + return "Failed" + default: + return fmt.Sprintf("UnknownState(%d)", s) + } +} + // ConnectionEvent is a notification about change in the VPP connection state. type ConnectionEvent struct { // Timestamp holds the time when the event has been created. @@ -72,11 +88,14 @@ type ConnectionEvent struct { // Connection represents a shared memory connection to VPP via vppAdapter. type Connection struct { - vppClient adapter.VppAPI // VPP binary API client adapter + vppClient adapter.VppAPI // VPP binary API client + + maxAttempts int // interval for reconnect attempts + recInterval time.Duration // maximum number of reconnect attempts vppConnected uint32 // non-zero if the adapter is connected to VPP - codec *codec.MsgCodec // message codec + codec MessageCodec // message codec msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC msgMap map[uint16]api.Message // map of messages indexed by message ID @@ -92,26 +111,41 @@ type Connection struct { lastReplyLock sync.Mutex // lock for the last reply lastReply time.Time // time of the last received reply from VPP + + msgControlPing api.Message + msgControlPingReply api.Message } -func newConnection(binapi adapter.VppAPI) *Connection { +func newConnection(binapi adapter.VppAPI, attempts int, interval time.Duration) *Connection { + if attempts == 0 { + attempts = DefaultMaxReconnectAttempts + } + if interval == 0 { + interval = DefaultReconnectInterval + } + c := &Connection{ - vppClient: binapi, - codec: &codec.MsgCodec{}, - msgIDs: make(map[string]uint16), - msgMap: make(map[uint16]api.Message), - channels: make(map[uint16]*Channel), - subscriptions: make(map[uint16][]*subscriptionCtx), + vppClient: binapi, + maxAttempts: attempts, + recInterval: interval, + codec: &codec.MsgCodec{}, + msgIDs: make(map[string]uint16), + msgMap: make(map[uint16]api.Message), + channels: make(map[uint16]*Channel), + subscriptions: make(map[uint16][]*subscriptionCtx), + msgControlPing: msgControlPing, + msgControlPingReply: msgControlPingReply, } binapi.SetMsgCallback(c.msgCallback) return c } -// Connect connects to VPP using specified VPP adapter and returns the connection handle. -// This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed. +// Connect connects to VPP API using specified adapter and returns a connection handle. +// This call blocks until it is either connected, or an error occurs. +// Only one connection attempt will be performed. func Connect(binapi adapter.VppAPI) (*Connection, error) { // create new connection handle - c := newConnection(binapi) + c := newConnection(binapi, DefaultMaxReconnectAttempts, DefaultReconnectInterval) // blocking attempt to connect to VPP if err := c.connectVPP(); err != nil { @@ -125,9 +159,9 @@ func Connect(binapi adapter.VppAPI) (*Connection, error) { // and ConnectionState channel. This call does not block until connection is established, it // returns immediately. The caller is supposed to watch the returned ConnectionState channel for // Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect. -func AsyncConnect(binapi adapter.VppAPI) (*Connection, chan ConnectionEvent, error) { +func AsyncConnect(binapi adapter.VppAPI, attempts int, interval time.Duration) (*Connection, chan ConnectionEvent, error) { // create new connection handle - c := newConnection(binapi) + c := newConnection(binapi, attempts, interval) // asynchronously attempt to connect to VPP connChan := make(chan ConnectionEvent, NotificationChanBufSize) @@ -144,11 +178,12 @@ func (c *Connection) connectVPP() error { if err := c.vppClient.Connect(); err != nil { return err } - - log.Debugf("Connected to VPP.") + log.Debugf("Connected to VPP") if err := c.retrieveMessageIDs(); err != nil { - c.vppClient.Disconnect() + if err := c.vppClient.Disconnect(); err != nil { + log.Debugf("disconnecting vpp client failed: %v", err) + } return fmt.Errorf("VPP is incompatible: %v", err) } @@ -158,12 +193,11 @@ func (c *Connection) connectVPP() error { return nil } -// Disconnect disconnects from VPP and releases all connection-related resources. +// Disconnect disconnects from VPP API and releases all connection-related resources. func (c *Connection) Disconnect() { if c == nil { return } - if c.vppClient != nil { c.disconnectVPP() } @@ -172,7 +206,12 @@ func (c *Connection) Disconnect() { // disconnectVPP disconnects from VPP in case it is connected. func (c *Connection) disconnectVPP() { if atomic.CompareAndSwapUint32(&c.vppConnected, 1, 0) { - c.vppClient.Disconnect() + log.Debug("Disconnecting from VPP..") + + if err := c.vppClient.Disconnect(); err != nil { + log.Debugf("Disconnect from VPP failed: %v", err) + } + log.Debug("Disconnected from VPP") } } @@ -221,21 +260,21 @@ func (c *Connection) releaseAPIChannel(ch *Channel) { // connectLoop attempts to connect to VPP until it succeeds. // Then it continues with healthCheckLoop. func (c *Connection) connectLoop(connChan chan ConnectionEvent) { - reconnectAttempts := 0 + var reconnectAttempts int // loop until connected for { if err := c.vppClient.WaitReady(); err != nil { - log.Warnf("wait ready failed: %v", err) + log.Debugf("wait ready failed: %v", err) } if err := c.connectVPP(); err == nil { // signal connected event connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected} break - } else if reconnectAttempts < MaxReconnectAttempts { + } else if reconnectAttempts < c.maxAttempts { reconnectAttempts++ - log.Errorf("connecting failed (attempt %d/%d): %v", reconnectAttempts, MaxReconnectAttempts, err) - time.Sleep(ReconnectInterval) + log.Warnf("connecting failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err) + time.Sleep(c.recInterval) } else { connChan <- ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err} return @@ -280,7 +319,7 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { } // send the control ping request - ch.reqChan <- &vppRequest{msg: msgControlPing} + ch.reqChan <- &vppRequest{msg: c.msgControlPing} for { // expect response within timeout period @@ -335,7 +374,11 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { } func getMsgNameWithCrc(x api.Message) string { - return x.GetMessageName() + "_" + x.GetCrcString() + return getMsgID(x.GetMessageName(), x.GetCrcString()) +} + +func getMsgID(name, crc string) string { + return name + "_" + crc } func getMsgFactory(msg api.Message) func() api.Message { @@ -386,26 +429,32 @@ func (c *Connection) retrieveMessageIDs() (err error) { var n int for name, msg := range msgs { + typ := reflect.TypeOf(msg).Elem() + path := fmt.Sprintf("%s.%s", typ.PkgPath(), typ.Name()) + msgID, err := c.GetMessageID(msg) if err != nil { - log.Debugf("retrieving msgID for %s failed: %v", name, err) + if debugMsgIDs { + log.Debugf("retrieving message ID for %s failed: %v", path, err) + } continue } n++ - if c.pingReqID == 0 && msg.GetMessageName() == msgControlPing.GetMessageName() { + if c.pingReqID == 0 && msg.GetMessageName() == c.msgControlPing.GetMessageName() { c.pingReqID = msgID - msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message) - } else if c.pingReplyID == 0 && msg.GetMessageName() == msgControlPingReply.GetMessageName() { + c.msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message) + } else if c.pingReplyID == 0 && msg.GetMessageName() == c.msgControlPingReply.GetMessageName() { c.pingReplyID = msgID - msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message) + c.msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message) } if debugMsgIDs { log.Debugf("message %q (%s) has ID: %d", name, getMsgNameWithCrc(msg), msgID) } } - log.Debugf("retrieved %d/%d msgIDs (took %s)", n, len(msgs), time.Since(t)) + log.WithField("took", time.Since(t)). + Debugf("retrieved IDs for %d messages (registered %d)", n, len(msgs)) return nil }