Fix codec fallback and generate type imports
[govpp.git] / core / connection.go
index 14b0af4..cfa94ee 100644 (file)
@@ -29,6 +29,11 @@ import (
        "git.fd.io/govpp.git/codec"
 )
 
+const (
+       DefaultReconnectInterval    = time.Second / 2 // default interval between reconnect attempts
+       DefaultMaxReconnectAttempts = 3               // default maximum number of reconnect attempts
+)
+
 var (
        RequestChanBufSize      = 100 // default size of the request channel buffer
        ReplyChanBufSize        = 100 // default size of the reply channel buffer
@@ -36,12 +41,10 @@ var (
 )
 
 var (
-       HealthCheckProbeInterval = time.Second * 1        // default health check probe interval
+       HealthCheckProbeInterval = time.Second            // default health check probe interval
        HealthCheckReplyTimeout  = time.Millisecond * 100 // timeout for reply to a health check probe
        HealthCheckThreshold     = 1                      // number of failed health checks until the error is reported
-       DefaultReplyTimeout      = time.Second * 1        // default timeout for replies from VPP
-       ReconnectInterval        = time.Second * 1        // default interval for reconnect attempts
-       MaxReconnectAttempts     = 10                     // maximum number of reconnect attempts
+       DefaultReplyTimeout      = time.Second            // default timeout for replies from VPP
 )
 
 // ConnectionState represents the current state of the connection to VPP.
@@ -58,6 +61,19 @@ const (
        Failed
 )
 
+func (s ConnectionState) String() string {
+       switch s {
+       case Connected:
+               return "Connected"
+       case Disconnected:
+               return "Disconnected"
+       case Failed:
+               return "Failed"
+       default:
+               return fmt.Sprintf("UnknownState(%d)", s)
+       }
+}
+
 // ConnectionEvent is a notification about change in the VPP connection state.
 type ConnectionEvent struct {
        // Timestamp holds the time when the event has been created.
@@ -72,11 +88,14 @@ type ConnectionEvent struct {
 
 // Connection represents a shared memory connection to VPP via vppAdapter.
 type Connection struct {
-       vppClient adapter.VppAPI // VPP binary API client adapter
+       vppClient adapter.VppAPI // VPP binary API client
+
+       maxAttempts int           // interval for reconnect attempts
+       recInterval time.Duration // maximum number of reconnect attempts
 
        vppConnected uint32 // non-zero if the adapter is connected to VPP
 
-       codec  *codec.MsgCodec        // message codec
+       codec  MessageCodec           // message codec
        msgIDs map[string]uint16      // map of message IDs indexed by message name + CRC
        msgMap map[uint16]api.Message // map of messages indexed by message ID
 
@@ -92,26 +111,41 @@ type Connection struct {
 
        lastReplyLock sync.Mutex // lock for the last reply
        lastReply     time.Time  // time of the last received reply from VPP
+
+       msgControlPing      api.Message
+       msgControlPingReply api.Message
 }
 
-func newConnection(binapi adapter.VppAPI) *Connection {
+func newConnection(binapi adapter.VppAPI, attempts int, interval time.Duration) *Connection {
+       if attempts == 0 {
+               attempts = DefaultMaxReconnectAttempts
+       }
+       if interval == 0 {
+               interval = DefaultReconnectInterval
+       }
+
        c := &Connection{
-               vppClient:     binapi,
-               codec:         &codec.MsgCodec{},
-               msgIDs:        make(map[string]uint16),
-               msgMap:        make(map[uint16]api.Message),
-               channels:      make(map[uint16]*Channel),
-               subscriptions: make(map[uint16][]*subscriptionCtx),
+               vppClient:           binapi,
+               maxAttempts:         attempts,
+               recInterval:         interval,
+               codec:               codec.DefaultCodec,
+               msgIDs:              make(map[string]uint16),
+               msgMap:              make(map[uint16]api.Message),
+               channels:            make(map[uint16]*Channel),
+               subscriptions:       make(map[uint16][]*subscriptionCtx),
+               msgControlPing:      msgControlPing,
+               msgControlPingReply: msgControlPingReply,
        }
        binapi.SetMsgCallback(c.msgCallback)
        return c
 }
 
-// Connect connects to VPP using specified VPP adapter and returns the connection handle.
-// This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed.
+// Connect connects to VPP API using specified adapter and returns a connection handle.
+// This call blocks until it is either connected, or an error occurs.
+// Only one connection attempt will be performed.
 func Connect(binapi adapter.VppAPI) (*Connection, error) {
        // create new connection handle
-       c := newConnection(binapi)
+       c := newConnection(binapi, DefaultMaxReconnectAttempts, DefaultReconnectInterval)
 
        // blocking attempt to connect to VPP
        if err := c.connectVPP(); err != nil {
@@ -125,9 +159,9 @@ func Connect(binapi adapter.VppAPI) (*Connection, error) {
 // and ConnectionState channel. This call does not block until connection is established, it
 // returns immediately. The caller is supposed to watch the returned ConnectionState channel for
 // Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
-func AsyncConnect(binapi adapter.VppAPI) (*Connection, chan ConnectionEvent, error) {
+func AsyncConnect(binapi adapter.VppAPI, attempts int, interval time.Duration) (*Connection, chan ConnectionEvent, error) {
        // create new connection handle
-       c := newConnection(binapi)
+       c := newConnection(binapi, attempts, interval)
 
        // asynchronously attempt to connect to VPP
        connChan := make(chan ConnectionEvent, NotificationChanBufSize)
@@ -144,11 +178,12 @@ func (c *Connection) connectVPP() error {
        if err := c.vppClient.Connect(); err != nil {
                return err
        }
-
-       log.Debugf("Connected to VPP.")
+       log.Debugf("Connected to VPP")
 
        if err := c.retrieveMessageIDs(); err != nil {
-               c.vppClient.Disconnect()
+               if err := c.vppClient.Disconnect(); err != nil {
+                       log.Debugf("disconnecting vpp client failed: %v", err)
+               }
                return fmt.Errorf("VPP is incompatible: %v", err)
        }
 
@@ -158,12 +193,11 @@ func (c *Connection) connectVPP() error {
        return nil
 }
 
-// Disconnect disconnects from VPP and releases all connection-related resources.
+// Disconnect disconnects from VPP API and releases all connection-related resources.
 func (c *Connection) Disconnect() {
        if c == nil {
                return
        }
-
        if c.vppClient != nil {
                c.disconnectVPP()
        }
@@ -172,7 +206,12 @@ func (c *Connection) Disconnect() {
 // disconnectVPP disconnects from VPP in case it is connected.
 func (c *Connection) disconnectVPP() {
        if atomic.CompareAndSwapUint32(&c.vppConnected, 1, 0) {
-               c.vppClient.Disconnect()
+               log.Debug("Disconnecting from VPP..")
+
+               if err := c.vppClient.Disconnect(); err != nil {
+                       log.Debugf("Disconnect from VPP failed: %v", err)
+               }
+               log.Debug("Disconnected from VPP")
        }
 }
 
@@ -221,21 +260,21 @@ func (c *Connection) releaseAPIChannel(ch *Channel) {
 // connectLoop attempts to connect to VPP until it succeeds.
 // Then it continues with healthCheckLoop.
 func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
-       reconnectAttempts := 0
+       var reconnectAttempts int
 
        // loop until connected
        for {
                if err := c.vppClient.WaitReady(); err != nil {
-                       log.Warnf("wait ready failed: %v", err)
+                       log.Debugf("wait ready failed: %v", err)
                }
                if err := c.connectVPP(); err == nil {
                        // signal connected event
                        connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected}
                        break
-               } else if reconnectAttempts < MaxReconnectAttempts {
+               } else if reconnectAttempts < c.maxAttempts {
                        reconnectAttempts++
-                       log.Errorf("connecting failed (attempt %d/%d): %v", reconnectAttempts, MaxReconnectAttempts, err)
-                       time.Sleep(ReconnectInterval)
+                       log.Warnf("connecting failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err)
+                       time.Sleep(c.recInterval)
                } else {
                        connChan <- ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err}
                        return
@@ -280,7 +319,7 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
                }
 
                // send the control ping request
-               ch.reqChan <- &vppRequest{msg: msgControlPing}
+               ch.reqChan <- &vppRequest{msg: c.msgControlPing}
 
                for {
                        // expect response within timeout period
@@ -335,7 +374,11 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
 }
 
 func getMsgNameWithCrc(x api.Message) string {
-       return x.GetMessageName() + "_" + x.GetCrcString()
+       return getMsgID(x.GetMessageName(), x.GetCrcString())
+}
+
+func getMsgID(name, crc string) string {
+       return name + "_" + crc
 }
 
 func getMsgFactory(msg api.Message) func() api.Message {
@@ -386,26 +429,32 @@ func (c *Connection) retrieveMessageIDs() (err error) {
 
        var n int
        for name, msg := range msgs {
+               typ := reflect.TypeOf(msg).Elem()
+               path := fmt.Sprintf("%s.%s", typ.PkgPath(), typ.Name())
+
                msgID, err := c.GetMessageID(msg)
                if err != nil {
-                       log.Debugf("retrieving msgID for %s failed: %v", name, err)
+                       if debugMsgIDs {
+                               log.Debugf("retrieving message ID for %s failed: %v", path, err)
+                       }
                        continue
                }
                n++
 
-               if c.pingReqID == 0 && msg.GetMessageName() == msgControlPing.GetMessageName() {
+               if c.pingReqID == 0 && msg.GetMessageName() == c.msgControlPing.GetMessageName() {
                        c.pingReqID = msgID
-                       msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
-               } else if c.pingReplyID == 0 && msg.GetMessageName() == msgControlPingReply.GetMessageName() {
+                       c.msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
+               } else if c.pingReplyID == 0 && msg.GetMessageName() == c.msgControlPingReply.GetMessageName() {
                        c.pingReplyID = msgID
-                       msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
+                       c.msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
                }
 
                if debugMsgIDs {
                        log.Debugf("message %q (%s) has ID: %d", name, getMsgNameWithCrc(msg), msgID)
                }
        }
-       log.Debugf("retrieved %d/%d msgIDs (took %s)", n, len(msgs), time.Since(t))
+       log.WithField("took", time.Since(t)).
+               Debugf("retrieved IDs for %d messages (registered %d)", n, len(msgs))
 
        return nil
 }