Change module name to go.fd.io/govpp
[govpp.git] / core / connection.go
index 14b0af4..2c05333 100644 (file)
@@ -17,6 +17,7 @@ package core
 import (
        "errors"
        "fmt"
+       "path"
        "reflect"
        "sync"
        "sync/atomic"
@@ -24,9 +25,14 @@ import (
 
        logger "github.com/sirupsen/logrus"
 
-       "git.fd.io/govpp.git/adapter"
-       "git.fd.io/govpp.git/api"
-       "git.fd.io/govpp.git/codec"
+       "go.fd.io/govpp/adapter"
+       "go.fd.io/govpp/api"
+       "go.fd.io/govpp/codec"
+)
+
+const (
+       DefaultReconnectInterval    = time.Second / 2 // default interval between reconnect attempts
+       DefaultMaxReconnectAttempts = 3               // default maximum number of reconnect attempts
 )
 
 var (
@@ -36,12 +42,10 @@ var (
 )
 
 var (
-       HealthCheckProbeInterval = time.Second * 1        // default health check probe interval
-       HealthCheckReplyTimeout  = time.Millisecond * 100 // timeout for reply to a health check probe
-       HealthCheckThreshold     = 1                      // number of failed health checks until the error is reported
-       DefaultReplyTimeout      = time.Second * 1        // default timeout for replies from VPP
-       ReconnectInterval        = time.Second * 1        // default interval for reconnect attempts
-       MaxReconnectAttempts     = 10                     // maximum number of reconnect attempts
+       HealthCheckProbeInterval = time.Second            // default health check probe interval
+       HealthCheckReplyTimeout  = time.Millisecond * 250 // timeout for reply to a health check probe
+       HealthCheckThreshold     = 2                      // number of failed health checks until the error is reported
+       DefaultReplyTimeout      = time.Second            // default timeout for replies from VPP
 )
 
 // ConnectionState represents the current state of the connection to VPP.
@@ -51,13 +55,32 @@ const (
        // Connected represents state in which the connection has been successfully established.
        Connected ConnectionState = iota
 
-       // Disconnected represents state in which the connection has been dropped.
+       // NotResponding represents a state where the VPP socket accepts messages but replies are received with delay,
+       // or not at all. GoVPP treats this state internally the same as disconnected.
+       NotResponding
+
+       // Disconnected represents state in which the VPP socket is closed and the connection is considered dropped.
        Disconnected
 
        // Failed represents state in which the reconnecting failed after exceeding maximum number of attempts.
        Failed
 )
 
+func (s ConnectionState) String() string {
+       switch s {
+       case Connected:
+               return "Connected"
+       case NotResponding:
+               return "NotResponding"
+       case Disconnected:
+               return "Disconnected"
+       case Failed:
+               return "Failed"
+       default:
+               return fmt.Sprintf("UnknownState(%d)", s)
+       }
+}
+
 // ConnectionEvent is a notification about change in the VPP connection state.
 type ConnectionEvent struct {
        // Timestamp holds the time when the event has been created.
@@ -72,17 +95,23 @@ type ConnectionEvent struct {
 
 // Connection represents a shared memory connection to VPP via vppAdapter.
 type Connection struct {
-       vppClient adapter.VppAPI // VPP binary API client adapter
+       vppClient adapter.VppAPI // VPP binary API client
+
+       maxAttempts int           // interval for reconnect attempts
+       recInterval time.Duration // maximum number of reconnect attempts
 
        vppConnected uint32 // non-zero if the adapter is connected to VPP
 
-       codec  *codec.MsgCodec        // message codec
-       msgIDs map[string]uint16      // map of message IDs indexed by message name + CRC
-       msgMap map[uint16]api.Message // map of messages indexed by message ID
+       connChan        chan ConnectionEvent // connection status events are sent to this channel
+       healthCheckDone chan struct{}        // used to terminate health check loop
+
+       codec        MessageCodec                      // message codec
+       msgIDs       map[string]uint16                 // map of message IDs indexed by message name + CRC
+       msgMapByPath map[string]map[uint16]api.Message // map of messages indexed by message ID which are indexed by path
 
-       maxChannelID uint32              // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations)
-       channelsLock sync.RWMutex        // lock for the channels map
-       channels     map[uint16]*Channel // map of all API channels indexed by the channel ID
+       channelsLock  sync.RWMutex        // lock for the channels map and the channel ID
+       nextChannelID uint16              // next potential channel ID (the real limit is 2^15)
+       channels      map[uint16]*Channel // map of all API channels indexed by the channel ID
 
        subscriptionsLock sync.RWMutex                  // lock for the subscriptions map
        subscriptions     map[uint16][]*subscriptionCtx // map od all notification subscriptions indexed by message ID
@@ -92,26 +121,49 @@ type Connection struct {
 
        lastReplyLock sync.Mutex // lock for the last reply
        lastReply     time.Time  // time of the last received reply from VPP
+
+       msgControlPing      api.Message
+       msgControlPingReply api.Message
+
+       apiTrace *trace // API tracer (disabled by default)
 }
 
-func newConnection(binapi adapter.VppAPI) *Connection {
+func newConnection(binapi adapter.VppAPI, attempts int, interval time.Duration) *Connection {
+       if attempts == 0 {
+               attempts = DefaultMaxReconnectAttempts
+       }
+       if interval == 0 {
+               interval = DefaultReconnectInterval
+       }
+
        c := &Connection{
-               vppClient:     binapi,
-               codec:         &codec.MsgCodec{},
-               msgIDs:        make(map[string]uint16),
-               msgMap:        make(map[uint16]api.Message),
-               channels:      make(map[uint16]*Channel),
-               subscriptions: make(map[uint16][]*subscriptionCtx),
+               vppClient:           binapi,
+               maxAttempts:         attempts,
+               recInterval:         interval,
+               connChan:            make(chan ConnectionEvent, NotificationChanBufSize),
+               healthCheckDone:     make(chan struct{}),
+               codec:               codec.DefaultCodec,
+               msgIDs:              make(map[string]uint16),
+               msgMapByPath:        make(map[string]map[uint16]api.Message),
+               channels:            make(map[uint16]*Channel),
+               subscriptions:       make(map[uint16][]*subscriptionCtx),
+               msgControlPing:      msgControlPing,
+               msgControlPingReply: msgControlPingReply,
+               apiTrace: &trace{
+                       list: make([]*api.Record, 0),
+                       mux:  &sync.Mutex{},
+               },
        }
        binapi.SetMsgCallback(c.msgCallback)
        return c
 }
 
-// Connect connects to VPP using specified VPP adapter and returns the connection handle.
-// This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed.
+// Connect connects to VPP API using specified adapter and returns a connection handle.
+// This call blocks until it is either connected, or an error occurs.
+// Only one connection attempt will be performed.
 func Connect(binapi adapter.VppAPI) (*Connection, error) {
        // create new connection handle
-       c := newConnection(binapi)
+       c := newConnection(binapi, DefaultMaxReconnectAttempts, DefaultReconnectInterval)
 
        // blocking attempt to connect to VPP
        if err := c.connectVPP(); err != nil {
@@ -125,15 +177,14 @@ func Connect(binapi adapter.VppAPI) (*Connection, error) {
 // and ConnectionState channel. This call does not block until connection is established, it
 // returns immediately. The caller is supposed to watch the returned ConnectionState channel for
 // Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
-func AsyncConnect(binapi adapter.VppAPI) (*Connection, chan ConnectionEvent, error) {
+func AsyncConnect(binapi adapter.VppAPI, attempts int, interval time.Duration) (*Connection, chan ConnectionEvent, error) {
        // create new connection handle
-       c := newConnection(binapi)
+       c := newConnection(binapi, attempts, interval)
 
        // asynchronously attempt to connect to VPP
-       connChan := make(chan ConnectionEvent, NotificationChanBufSize)
-       go c.connectLoop(connChan)
+       go c.connectLoop()
 
-       return c, connChan, nil
+       return c, c.connChan, nil
 }
 
 // connectVPP performs blocking attempt to connect to VPP.
@@ -144,11 +195,12 @@ func (c *Connection) connectVPP() error {
        if err := c.vppClient.Connect(); err != nil {
                return err
        }
-
-       log.Debugf("Connected to VPP.")
+       log.Debugf("Connected to VPP")
 
        if err := c.retrieveMessageIDs(); err != nil {
-               c.vppClient.Disconnect()
+               if err := c.vppClient.Disconnect(); err != nil {
+                       log.Debugf("disconnecting vpp client failed: %v", err)
+               }
                return fmt.Errorf("VPP is incompatible: %v", err)
        }
 
@@ -158,21 +210,30 @@ func (c *Connection) connectVPP() error {
        return nil
 }
 
-// Disconnect disconnects from VPP and releases all connection-related resources.
+// Disconnect disconnects from VPP API and releases all connection-related resources.
 func (c *Connection) Disconnect() {
        if c == nil {
                return
        }
-
        if c.vppClient != nil {
-               c.disconnectVPP()
+               c.disconnectVPP(true)
        }
 }
 
-// disconnectVPP disconnects from VPP in case it is connected.
-func (c *Connection) disconnectVPP() {
+// disconnectVPP disconnects from VPP in case it is connected. terminate tells
+// that disconnectVPP() was called from Close(), so healthCheckLoop() can be
+// terminated.
+func (c *Connection) disconnectVPP(terminate bool) {
        if atomic.CompareAndSwapUint32(&c.vppConnected, 1, 0) {
-               c.vppClient.Disconnect()
+               if terminate {
+                       close(c.healthCheckDone)
+               }
+               log.Debug("Disconnecting from VPP..")
+
+               if err := c.vppClient.Disconnect(); err != nil {
+                       log.Debugf("Disconnect from VPP failed: %v", err)
+               }
+               log.Debug("Disconnected from VPP")
        }
 }
 
@@ -191,14 +252,10 @@ func (c *Connection) newAPIChannel(reqChanBufSize, replyChanBufSize int) (*Chann
                return nil, errors.New("nil connection passed in")
        }
 
-       // create new channel
-       chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
-       channel := newChannel(chID, c, c.codec, c, reqChanBufSize, replyChanBufSize)
-
-       // store API channel within the client
-       c.channelsLock.Lock()
-       c.channels[chID] = channel
-       c.channelsLock.Unlock()
+       channel, err := c.newChannel(reqChanBufSize, replyChanBufSize)
+       if err != nil {
+               return nil, err
+       }
 
        // start watching on the request channel
        go c.watchRequests(channel)
@@ -220,41 +277,42 @@ func (c *Connection) releaseAPIChannel(ch *Channel) {
 
 // connectLoop attempts to connect to VPP until it succeeds.
 // Then it continues with healthCheckLoop.
-func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
-       reconnectAttempts := 0
+func (c *Connection) connectLoop() {
+       var reconnectAttempts int
 
        // loop until connected
        for {
                if err := c.vppClient.WaitReady(); err != nil {
-                       log.Warnf("wait ready failed: %v", err)
+                       log.Debugf("wait ready failed: %v", err)
                }
                if err := c.connectVPP(); err == nil {
                        // signal connected event
-                       connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected}
+                       c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Connected})
                        break
-               } else if reconnectAttempts < MaxReconnectAttempts {
+               } else if reconnectAttempts < c.maxAttempts {
                        reconnectAttempts++
-                       log.Errorf("connecting failed (attempt %d/%d): %v", reconnectAttempts, MaxReconnectAttempts, err)
-                       time.Sleep(ReconnectInterval)
+                       log.Warnf("connecting failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err)
+                       time.Sleep(c.recInterval)
                } else {
-                       connChan <- ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err}
+                       c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err})
                        return
                }
        }
 
        // we are now connected, continue with health check loop
-       c.healthCheckLoop(connChan)
+       c.healthCheckLoop()
 }
 
 // healthCheckLoop checks whether connection to VPP is alive. In case of disconnect,
 // it continues with connectLoop and tries to reconnect.
-func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
+func (c *Connection) healthCheckLoop() {
        // create a separate API channel for health check probes
        ch, err := c.newAPIChannel(1, 1)
        if err != nil {
                log.Error("Failed to create health check API channel, health check will be disabled:", err)
                return
        }
+       defer ch.Close()
 
        var (
                sinceLastReply time.Duration
@@ -262,80 +320,85 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
        )
 
        // send health check probes until an error or timeout occurs
-       for {
-               // sleep until next health check probe period
-               time.Sleep(HealthCheckProbeInterval)
+       probeInterval := time.NewTicker(HealthCheckProbeInterval)
+       defer probeInterval.Stop()
 
-               if atomic.LoadUint32(&c.vppConnected) == 0 {
-                       // Disconnect has been called in the meantime, return the healthcheck - reconnect loop
+HealthCheck:
+       for {
+               select {
+               case <-c.healthCheckDone:
+                       // Terminate the health check loop on connection disconnect
                        log.Debug("Disconnected on request, exiting health check loop.")
                        return
-               }
-
-               // try draining probe replies from previous request before sending next one
-               select {
-               case <-ch.replyChan:
-                       log.Debug("drained old probe reply from reply channel")
-               default:
-               }
+               case <-probeInterval.C:
+                       // try draining probe replies from previous request before sending next one
+                       select {
+                       case <-ch.replyChan:
+                               log.Debug("drained old probe reply from reply channel")
+                       default:
+                       }
 
-               // send the control ping request
-               ch.reqChan <- &vppRequest{msg: msgControlPing}
+                       // send the control ping request
+                       ch.reqChan <- &vppRequest{msg: c.msgControlPing}
 
-               for {
-                       // expect response within timeout period
-                       select {
-                       case vppReply := <-ch.replyChan:
-                               err = vppReply.err
+                       for {
+                               // expect response within timeout period
+                               select {
+                               case vppReply := <-ch.replyChan:
+                                       err = vppReply.err
 
-                       case <-time.After(HealthCheckReplyTimeout):
-                               err = ErrProbeTimeout
+                               case <-time.After(HealthCheckReplyTimeout):
+                                       err = ErrProbeTimeout
 
-                               // check if time since last reply from any other
-                               // channel is less than health check reply timeout
-                               c.lastReplyLock.Lock()
-                               sinceLastReply = time.Since(c.lastReply)
-                               c.lastReplyLock.Unlock()
+                                       // check if time since last reply from any other
+                                       // channel is less than health check reply timeout
+                                       c.lastReplyLock.Lock()
+                                       sinceLastReply = time.Since(c.lastReply)
+                                       c.lastReplyLock.Unlock()
 
-                               if sinceLastReply < HealthCheckReplyTimeout {
-                                       log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply)
-                                       continue
+                                       if sinceLastReply < HealthCheckReplyTimeout {
+                                               log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply)
+                                               continue
+                                       }
                                }
+                               break
                        }
-                       break
-               }
 
-               if err == ErrProbeTimeout {
-                       failedChecks++
-                       log.Warnf("VPP health check probe timed out after %v (%d. timeout)", HealthCheckReplyTimeout, failedChecks)
-                       if failedChecks > HealthCheckThreshold {
-                               // in case of exceeded failed check treshold, assume VPP disconnected
-                               log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", HealthCheckThreshold)
-                               connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
-                               break
+                       if err == ErrProbeTimeout {
+                               failedChecks++
+                               log.Warnf("VPP health check probe timed out after %v (%d. timeout)", HealthCheckReplyTimeout, failedChecks)
+                               if failedChecks > HealthCheckThreshold {
+                                       // in case of exceeded failed check threshold, assume VPP unresponsive
+                                       log.Errorf("VPP does not responding, the health check exceeded threshold for timeouts (>%d)", HealthCheckThreshold)
+                                       c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: NotResponding})
+                                       break HealthCheck
+                               }
+                       } else if err != nil {
+                               // in case of error, assume VPP disconnected
+                               log.Errorf("VPP health check probe failed: %v", err)
+                               c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: err})
+                               break HealthCheck
+                       } else if failedChecks > 0 {
+                               // in case of success after failed checks, clear failed check counter
+                               failedChecks = 0
+                               log.Infof("VPP health check probe OK")
                        }
-               } else if err != nil {
-                       // in case of error, assume VPP disconnected
-                       log.Errorf("VPP health check probe failed: %v", err)
-                       connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: err}
-                       break
-               } else if failedChecks > 0 {
-                       // in case of success after failed checks, clear failed check counter
-                       failedChecks = 0
-                       log.Infof("VPP health check probe OK")
                }
        }
 
        // cleanup
-       ch.Close()
-       c.disconnectVPP()
+       c.disconnectVPP(false)
 
        // we are now disconnected, start connect loop
-       c.connectLoop(connChan)
+       c.connectLoop()
 }
 
 func getMsgNameWithCrc(x api.Message) string {
-       return x.GetMessageName() + "_" + x.GetCrcString()
+       return getMsgID(x.GetMessageName(), x.GetCrcString())
+}
+
+func getMsgID(name, crc string) string {
+       return name + "_" + crc
 }
 
 func getMsgFactory(msg api.Message) func() api.Message {
@@ -349,63 +412,103 @@ func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
        if c == nil {
                return 0, errors.New("nil connection passed in")
        }
-
-       if msgID, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok {
-               return msgID, nil
-       }
-
+       pkgPath := c.GetMessagePath(msg)
        msgID, err := c.vppClient.GetMsgID(msg.GetMessageName(), msg.GetCrcString())
        if err != nil {
                return 0, err
        }
-
+       if pathMsgs, pathOk := c.msgMapByPath[pkgPath]; !pathOk {
+               c.msgMapByPath[pkgPath] = make(map[uint16]api.Message)
+               c.msgMapByPath[pkgPath][msgID] = msg
+       } else if _, msgOk := pathMsgs[msgID]; !msgOk {
+               c.msgMapByPath[pkgPath][msgID] = msg
+       }
+       if _, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok {
+               return msgID, nil
+       }
        c.msgIDs[getMsgNameWithCrc(msg)] = msgID
-       c.msgMap[msgID] = msg
-
        return msgID, nil
 }
 
 // LookupByID looks up message name and crc by ID.
-func (c *Connection) LookupByID(msgID uint16) (api.Message, error) {
+func (c *Connection) LookupByID(path string, msgID uint16) (api.Message, error) {
        if c == nil {
                return nil, errors.New("nil connection passed in")
        }
-
-       if msg, ok := c.msgMap[msgID]; ok {
+       if msg, ok := c.msgMapByPath[path][msgID]; ok {
                return msg, nil
        }
+       return nil, fmt.Errorf("unknown message ID %d for path '%s'", msgID, path)
+}
 
-       return nil, fmt.Errorf("unknown message ID: %d", msgID)
+// GetMessagePath returns path for the given message
+func (c *Connection) GetMessagePath(msg api.Message) string {
+       return path.Dir(reflect.TypeOf(msg).Elem().PkgPath())
 }
 
 // retrieveMessageIDs retrieves IDs for all registered messages and stores them in map
 func (c *Connection) retrieveMessageIDs() (err error) {
        t := time.Now()
 
-       msgs := api.GetRegisteredMessages()
+       msgsByPath := api.GetRegisteredMessages()
 
        var n int
-       for name, msg := range msgs {
-               msgID, err := c.GetMessageID(msg)
-               if err != nil {
-                       log.Debugf("retrieving msgID for %s failed: %v", name, err)
-                       continue
-               }
-               n++
-
-               if c.pingReqID == 0 && msg.GetMessageName() == msgControlPing.GetMessageName() {
-                       c.pingReqID = msgID
-                       msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
-               } else if c.pingReplyID == 0 && msg.GetMessageName() == msgControlPingReply.GetMessageName() {
-                       c.pingReplyID = msgID
-                       msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
-               }
+       for pkgPath, msgs := range msgsByPath {
+               for _, msg := range msgs {
+                       msgID, err := c.GetMessageID(msg)
+                       if err != nil {
+                               if debugMsgIDs {
+                                       log.Debugf("retrieving message ID for %s.%s failed: %v",
+                                               pkgPath, msg.GetMessageName(), err)
+                               }
+                               continue
+                       }
+                       n++
+
+                       if c.pingReqID == 0 && msg.GetMessageName() == c.msgControlPing.GetMessageName() {
+                               c.pingReqID = msgID
+                               c.msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
+                       } else if c.pingReplyID == 0 && msg.GetMessageName() == c.msgControlPingReply.GetMessageName() {
+                               c.pingReplyID = msgID
+                               c.msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
+                       }
 
-               if debugMsgIDs {
-                       log.Debugf("message %q (%s) has ID: %d", name, getMsgNameWithCrc(msg), msgID)
+                       if debugMsgIDs {
+                               log.Debugf("message %q (%s) has ID: %d", msg.GetMessageName(), getMsgNameWithCrc(msg), msgID)
+                       }
                }
+               log.WithField("took", time.Since(t)).
+                       Debugf("retrieved IDs for %d messages (registered %d) from path %s", n, len(msgs), pkgPath)
        }
-       log.Debugf("retrieved %d/%d msgIDs (took %s)", n, len(msgs), time.Since(t))
 
        return nil
 }
+
+func (c *Connection) sendConnEvent(event ConnectionEvent) {
+       select {
+       case c.connChan <- event:
+       default:
+               log.Warn("Connection state channel is full, discarding value.")
+       }
+}
+
+// Trace gives access to the API trace interface
+func (c *Connection) Trace() api.Trace {
+       return c.apiTrace
+}
+
+// trace records api message
+func (c *Connection) trace(msg api.Message, chId uint16, t time.Time, isReceived bool) {
+       if atomic.LoadInt32(&c.apiTrace.isEnabled) == 0 {
+               return
+       }
+       entry := &api.Record{
+               Message:    msg,
+               Timestamp:  t,
+               IsReceived: isReceived,
+               ChannelID:  chId,
+       }
+       c.apiTrace.mux.Lock()
+       c.apiTrace.list = append(c.apiTrace.list, entry)
+       c.apiTrace.mux.Unlock()
+}