Lookup message name by ID when receiving unexpected message
[govpp.git] / core / core.go
index a045e60..ebe7f68 100644 (file)
@@ -23,12 +23,16 @@ import (
        "sync/atomic"
        "time"
 
-       logger "github.com/Sirupsen/logrus"
+       logger "github.com/sirupsen/logrus"
 
        "git.fd.io/govpp.git/adapter"
        "git.fd.io/govpp.git/api"
        "git.fd.io/govpp.git/core/bin_api/vpe"
-       "github.com/fsnotify/fsnotify"
+)
+
+var (
+       msgControlPing      api.Message = &vpe.ControlPing{}
+       msgControlPingReply api.Message = &vpe.ControlPingReply{}
 )
 
 const (
@@ -54,15 +58,6 @@ const (
        Disconnected = iota
 )
 
-const (
-       // watchedFolder is a folder where vpp's shared memory is supposed to be created.
-       // File system events are monitored in this folder.
-       watchedFolder = "/dev/shm/"
-       // watchedFile is a name of the file in the watchedFolder. Once the file is present
-       // the vpp is ready to accept a new connection.
-       watchedFile = watchedFolder + "vpe-api"
-)
-
 // ConnectionEvent is a notification about change in the VPP connection state.
 type ConnectionEvent struct {
        // Timestamp holds the time when the event has been generated.
@@ -78,14 +73,14 @@ type Connection struct {
        connected uint32             // non-zero if the adapter is connected to VPP
        codec     *MsgCodec          // message codec
 
-       msgIDs     map[string]uint16 // map of message IDs indexed by message name + CRC
        msgIDsLock sync.RWMutex      // lock for the message IDs map
+       msgIDs     map[string]uint16 // map of message IDs indexed by message name + CRC
 
-       channels     map[uint32]*api.Channel // map of all API channels indexed by the channel ID
        channelsLock sync.RWMutex            // lock for the channels map
+       channels     map[uint32]*api.Channel // map of all API channels indexed by the channel ID
 
-       notifSubscriptions     map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID
        notifSubscriptionsLock sync.RWMutex                        // lock for the subscriptions map
+       notifSubscriptions     map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID
 
        maxChannelID uint32 // maximum used client ID
        pingReqID    uint16 // ID if the ControlPing message
@@ -138,6 +133,12 @@ func SetHealthCheckThreshold(threshold int) {
        healthCheckThreshold = threshold
 }
 
+// SetControlPingMessages sets the messages for ControlPing and ControlPingReply
+func SetControlPingMessages(controPing, controlPingReply api.Message) {
+       msgControlPing = controPing
+       msgControlPingReply = controlPingReply
+}
+
 // Connect connects to VPP using specified VPP adapter and returns the connection handle.
 // This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed.
 func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
@@ -197,10 +198,13 @@ func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) {
                return nil, errors.New("only one connection per process is supported")
        }
 
-       conn = &Connection{vpp: vppAdapter, codec: &MsgCodec{}}
-       conn.channels = make(map[uint32]*api.Channel)
-       conn.msgIDs = make(map[string]uint16)
-       conn.notifSubscriptions = make(map[uint16][]*api.NotifSubscription)
+       conn = &Connection{
+               vpp:                vppAdapter,
+               codec:              &MsgCodec{},
+               channels:           make(map[uint32]*api.Channel),
+               msgIDs:             make(map[string]uint16),
+               notifSubscriptions: make(map[uint16][]*api.NotifSubscription),
+       }
 
        conn.vpp.SetMsgCallback(msgCallback)
        return conn, nil
@@ -217,13 +221,19 @@ func (c *Connection) connectVPP() error {
                return err
        }
 
+       // store control ping IDs
+       if c.pingReqID, err = c.GetMessageID(msgControlPing); err != nil {
+               c.vpp.Disconnect()
+               return err
+       }
+       if c.pingReplyID, err = c.GetMessageID(msgControlPingReply); err != nil {
+               c.vpp.Disconnect()
+               return err
+       }
+
        // store connected state
        atomic.StoreUint32(&c.connected, 1)
 
-       // store control ping IDs
-       c.pingReqID, _ = c.GetMessageID(&vpe.ControlPing{})
-       c.pingReplyID, _ = c.GetMessageID(&vpe.ControlPingReply{})
-
        log.Info("Connected to VPP.")
        return nil
 }
@@ -235,53 +245,21 @@ func (c *Connection) disconnectVPP() {
        }
 }
 
-func fileExists(name string) bool {
-       if _, err := os.Stat(name); err != nil {
-               if os.IsNotExist(err) {
-                       return false
-               }
-       }
-       return true
-}
-
-// waitForVpp blocks until shared memory for sending bin api calls
-// is present on the file system.
-func waitForVpp() error {
-       watcher, err := fsnotify.NewWatcher()
-       if err != nil {
-               return err
-       }
-       defer watcher.Close()
-
-       err = watcher.Add(watchedFolder)
-       if err != nil {
-               return err
-       }
-
-       if fileExists(watchedFile) {
-               return nil
-       }
-
-       for {
-               ev := <-watcher.Events
-               if ev.Name == watchedFile && (ev.Op&fsnotify.Create) == fsnotify.Create {
-                       break
-               }
-       }
-       return nil
-}
-
 // connectLoop attempts to connect to VPP until it succeeds.
 // Then it continues with healthCheckLoop.
 func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
        // loop until connected
        for {
-               waitForVpp()
-               err := c.connectVPP()
-               if err == nil {
+               if err := c.vpp.WaitReady(); err != nil {
+                       log.Warnf("wait ready failed: %v", err)
+               }
+               if err := c.connectVPP(); err == nil {
                        // signal connected event
                        connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected}
                        break
+               } else {
+                       log.Errorf("connecting to VPP failed: %v", err)
+                       time.Sleep(time.Second)
                }
        }
 
@@ -312,7 +290,7 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
                }
 
                // send the control ping
-               ch.ReqChan <- &api.VppRequest{Message: &vpe.ControlPing{}}
+               ch.ReqChan <- &api.VppRequest{Message: msgControlPing}
 
                // expect response within timeout period
                select {
@@ -328,7 +306,7 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
                        failedChecks = 0
                }
 
-               if failedChecks >= healthCheckThreshold {
+               if failedChecks > healthCheckThreshold {
                        // in case of error, break & disconnect
                        log.Errorf("VPP health check failed: %v", err)
                        // signal disconnected event via channel