Lookup message name by ID when receiving unexpected message
[govpp.git] / core / core.go
index 550b6a7..ebe7f68 100644 (file)
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package core
+//go:generate binapi-generator --input-dir=bin_api --output-dir=bin_api
 
-//go:generate binapi_generator --input-file=/usr/share/vpp/api/vpe.api.json --output-dir=./bin_api
+package core
 
 import (
        "errors"
-       "fmt"
        "os"
        "sync"
        "sync/atomic"
+       "time"
 
-       logger "github.com/Sirupsen/logrus"
+       logger "github.com/sirupsen/logrus"
 
        "git.fd.io/govpp.git/adapter"
        "git.fd.io/govpp.git/api"
        "git.fd.io/govpp.git/core/bin_api/vpe"
 )
 
+var (
+       msgControlPing      api.Message = &vpe.ControlPing{}
+       msgControlPingReply api.Message = &vpe.ControlPingReply{}
+)
+
+const (
+       requestChannelBufSize      = 100 // default size of the request channel buffers
+       replyChannelBufSize        = 100 // default size of the reply channel buffers
+       notificationChannelBufSize = 100 // default size of the notification channel buffers
+)
+
+var (
+       healthCheckProbeInterval = time.Second * 1        // default health check probe interval
+       healthCheckReplyTimeout  = time.Millisecond * 100 // timeout for reply to a health check probe
+       healthCheckThreshold     = 1                      // number of failed healthProbe until the error is reported
+)
+
+// ConnectionState holds the current state of the connection to VPP.
+type ConnectionState int
+
 const (
-       requestChannelBufSize = 100 // default size of the request channel buffers
-       replyChannelBufSize   = 100 // default size of the reply channel buffers
+       // Connected connection state means that the connection to VPP has been successfully established.
+       Connected ConnectionState = iota
+
+       // Disconnected connection state means that the connection to VPP has been lost.
+       Disconnected = iota
 )
 
+// ConnectionEvent is a notification about change in the VPP connection state.
+type ConnectionEvent struct {
+       // Timestamp holds the time when the event has been generated.
+       Timestamp time.Time
+
+       // State holds the new state of the connection to VPP at the time when the event has been generated.
+       State ConnectionState
+}
+
 // Connection represents a shared memory connection to VPP via vppAdapter.
 type Connection struct {
-       vpp   adapter.VppAdapter // VPP adapter
-       codec *MsgCodec          // message codec
+       vpp       adapter.VppAdapter // VPP adapter
+       connected uint32             // non-zero if the adapter is connected to VPP
+       codec     *MsgCodec          // message codec
 
-       msgIDs     map[string]uint16 // map os message IDs indexed by message name + CRC
        msgIDsLock sync.RWMutex      // lock for the message IDs map
+       msgIDs     map[string]uint16 // map of message IDs indexed by message name + CRC
 
-       channels     map[uint32]*api.Channel // map of all API channels indexed by the channel ID
        channelsLock sync.RWMutex            // lock for the channels map
+       channels     map[uint32]*api.Channel // map of all API channels indexed by the channel ID
 
-       notifSubscriptions     map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID
        notifSubscriptionsLock sync.RWMutex                        // lock for the subscriptions map
+       notifSubscriptions     map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID
 
        maxChannelID uint32 // maximum used client ID
        pingReqID    uint16 // ID if the ControlPing message
@@ -78,8 +111,86 @@ func SetLogger(l *logger.Logger) {
        log = l
 }
 
+// SetHealthCheckProbeInterval sets health check probe interval.
+// Beware: Function is not thread-safe. It is recommended to setup this parameter
+// before connecting to vpp.
+func SetHealthCheckProbeInterval(interval time.Duration) {
+       healthCheckProbeInterval = interval
+}
+
+// SetHealthCheckReplyTimeout sets timeout for reply to a health check probe.
+// If reply arrives after the timeout, check is considered as failed.
+// Beware: Function is not thread-safe. It is recommended to setup this parameter
+// before connecting to vpp.
+func SetHealthCheckReplyTimeout(timeout time.Duration) {
+       healthCheckReplyTimeout = timeout
+}
+
+// SetHealthCheckThreshold sets the number of failed healthProbe checks until the error is reported.
+// Beware: Function is not thread-safe. It is recommended to setup this parameter
+// before connecting to vpp.
+func SetHealthCheckThreshold(threshold int) {
+       healthCheckThreshold = threshold
+}
+
+// SetControlPingMessages sets the messages for ControlPing and ControlPingReply
+func SetControlPingMessages(controPing, controlPingReply api.Message) {
+       msgControlPing = controPing
+       msgControlPingReply = controlPingReply
+}
+
 // Connect connects to VPP using specified VPP adapter and returns the connection handle.
+// This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed.
 func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
+       // create new connection handle
+       c, err := newConnection(vppAdapter)
+       if err != nil {
+               return nil, err
+       }
+
+       // blocking attempt to connect to VPP
+       err = c.connectVPP()
+       if err != nil {
+               return nil, err
+       }
+
+       return conn, nil
+}
+
+// AsyncConnect asynchronously connects to VPP using specified VPP adapter and returns the connection handle
+// and ConnectionState channel. This call does not block until connection is established, it
+// returns immediately. The caller is supposed to watch the returned ConnectionState channel for
+// Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
+func AsyncConnect(vppAdapter adapter.VppAdapter) (*Connection, chan ConnectionEvent, error) {
+       // create new connection handle
+       c, err := newConnection(vppAdapter)
+       if err != nil {
+               return nil, nil, err
+       }
+
+       // asynchronously attempt to connect to VPP
+       connChan := make(chan ConnectionEvent, notificationChannelBufSize)
+       go c.connectLoop(connChan)
+
+       return conn, connChan, nil
+}
+
+// Disconnect disconnects from VPP and releases all connection-related resources.
+func (c *Connection) Disconnect() {
+       if c == nil {
+               return
+       }
+       connLock.Lock()
+       defer connLock.Unlock()
+
+       if c != nil && c.vpp != nil {
+               c.disconnectVPP()
+       }
+       conn = nil
+}
+
+// newConnection returns new connection handle.
+func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) {
        connLock.Lock()
        defer connLock.Unlock()
 
@@ -87,49 +198,146 @@ func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
                return nil, errors.New("only one connection per process is supported")
        }
 
-       conn = &Connection{vpp: vppAdapter, codec: &MsgCodec{}}
-       conn.channels = make(map[uint32]*api.Channel)
-       conn.msgIDs = make(map[string]uint16)
-       conn.notifSubscriptions = make(map[uint16][]*api.NotifSubscription)
+       conn = &Connection{
+               vpp:                vppAdapter,
+               codec:              &MsgCodec{},
+               channels:           make(map[uint32]*api.Channel),
+               msgIDs:             make(map[string]uint16),
+               notifSubscriptions: make(map[uint16][]*api.NotifSubscription),
+       }
 
        conn.vpp.SetMsgCallback(msgCallback)
+       return conn, nil
+}
 
-       logger.Debug("Connecting to VPP...")
+// connectVPP performs one blocking attempt to connect to VPP.
+func (c *Connection) connectVPP() error {
+       log.Debug("Connecting to VPP...")
 
-       err := conn.vpp.Connect()
+       // blocking connect
+       err := c.vpp.Connect()
        if err != nil {
-               return nil, err
+               log.Warn(err)
+               return err
        }
 
        // store control ping IDs
-       conn.pingReqID, _ = conn.GetMessageID(&vpe.ControlPing{})
-       conn.pingReplyID, _ = conn.GetMessageID(&vpe.ControlPingReply{})
+       if c.pingReqID, err = c.GetMessageID(msgControlPing); err != nil {
+               c.vpp.Disconnect()
+               return err
+       }
+       if c.pingReplyID, err = c.GetMessageID(msgControlPingReply); err != nil {
+               c.vpp.Disconnect()
+               return err
+       }
 
-       logger.Debug("VPP connected.")
+       // store connected state
+       atomic.StoreUint32(&c.connected, 1)
 
-       return conn, nil
+       log.Info("Connected to VPP.")
+       return nil
 }
 
-// Disconnect disconnects from VPP.
-func (c *Connection) Disconnect() {
-       connLock.Lock()
-       defer connLock.Unlock()
-
-       if c != nil && c.vpp != nil {
+// disconnectVPP disconnects from VPP in case it is connected.
+func (c *Connection) disconnectVPP() {
+       if atomic.CompareAndSwapUint32(&c.connected, 1, 0) {
                c.vpp.Disconnect()
        }
-       conn = nil
+}
+
+// connectLoop attempts to connect to VPP until it succeeds.
+// Then it continues with healthCheckLoop.
+func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
+       // loop until connected
+       for {
+               if err := c.vpp.WaitReady(); err != nil {
+                       log.Warnf("wait ready failed: %v", err)
+               }
+               if err := c.connectVPP(); err == nil {
+                       // signal connected event
+                       connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected}
+                       break
+               } else {
+                       log.Errorf("connecting to VPP failed: %v", err)
+                       time.Sleep(time.Second)
+               }
+       }
+
+       // we are now connected, continue with health check loop
+       c.healthCheckLoop(connChan)
+}
+
+// healthCheckLoop checks whether connection to VPP is alive. In case of disconnect,
+// it continues with connectLoop and tries to reconnect.
+func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
+       // create a separate API channel for health check probes
+       ch, err := conn.NewAPIChannel()
+       if err != nil {
+               log.Error("Error by creating health check API channel, health check will be disabled:", err)
+               return
+       }
+
+       failedChecks := 0
+       // send health check probes until an error occurs
+       for {
+               // wait for healthCheckProbeInterval
+               <-time.After(healthCheckProbeInterval)
+
+               if atomic.LoadUint32(&c.connected) == 0 {
+                       // Disconnect has been called in the meantime, return the healthcheck - reconnect loop
+                       log.Debug("Disconnected on request, exiting health check loop.")
+                       return
+               }
+
+               // send the control ping
+               ch.ReqChan <- &api.VppRequest{Message: msgControlPing}
+
+               // expect response within timeout period
+               select {
+               case vppReply := <-ch.ReplyChan:
+                       err = vppReply.Error
+               case <-time.After(healthCheckReplyTimeout):
+                       err = errors.New("probe reply not received within the timeout period")
+               }
+
+               if err != nil {
+                       failedChecks++
+               } else {
+                       failedChecks = 0
+               }
+
+               if failedChecks > healthCheckThreshold {
+                       // in case of error, break & disconnect
+                       log.Errorf("VPP health check failed: %v", err)
+                       // signal disconnected event via channel
+                       connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
+                       break
+               }
+       }
+
+       // cleanup
+       ch.Close()
+       c.disconnectVPP()
+
+       // we are now disconnected, start connect loop
+       c.connectLoop(connChan)
 }
 
 // NewAPIChannel returns a new API channel for communication with VPP via govpp core.
 // It uses default buffer sizes for the request and reply Go channels.
 func (c *Connection) NewAPIChannel() (*api.Channel, error) {
+       if c == nil {
+               return nil, errors.New("nil connection passed in")
+       }
        return c.NewAPIChannelBuffered(requestChannelBufSize, replyChannelBufSize)
 }
 
 // NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
 // It allows to specify custom buffer sizes for the request and reply Go channels.
 func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (*api.Channel, error) {
+       if c == nil {
+               return nil, errors.New("nil connection passed in")
+       }
        chID := atomic.AddUint32(&c.maxChannelID, 1)
        chMeta := &channelMetadata{id: chID}
 
@@ -154,84 +362,6 @@ func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int)
        return ch, nil
 }
 
-// watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
-func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) {
-       for {
-               select {
-               case req, ok := <-ch.ReqChan:
-                       // new request on the request channel
-                       if !ok {
-                               // after closing the request channel, release API channel and return
-                               c.releaseAPIChannel(ch, chMeta)
-                               return
-                       }
-                       c.processRequest(ch, chMeta, req)
-
-               case req := <-ch.NotifSubsChan:
-                       // new request on the notification subscribe channel
-                       c.processNotifSubscribeRequest(ch, req)
-               }
-       }
-}
-
-// processRequest processes a single request received on the request channel.
-func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, req *api.VppRequest) error {
-       // retrieve message ID
-       msgID, err := c.GetMessageID(req.Message)
-       if err != nil {
-               error := fmt.Errorf("unable to retrieve message ID: %v", err)
-               log.WithFields(logger.Fields{
-                       "msg_name": req.Message.GetMessageName(),
-                       "msg_crc":  req.Message.GetCrcString(),
-               }).Errorf("unable to retrieve message ID: %v", err)
-               sendReply(ch, &api.VppReply{Error: error})
-               return error
-       }
-
-       // encode the message into binary
-       data, err := c.codec.EncodeMsg(req.Message, msgID)
-       if err != nil {
-               error := fmt.Errorf("unable to encode the messge: %v", err)
-               log.WithFields(logger.Fields{
-                       "context": chMeta.id,
-                       "msg_id":  msgID,
-               }).Errorf("%v", error)
-               sendReply(ch, &api.VppReply{Error: error})
-               return error
-       }
-
-       // send the message
-       log.WithFields(logger.Fields{
-               "context":  chMeta.id,
-               "msg_id":   msgID,
-               "msg_size": len(data),
-       }).Debug("Sending a message to VPP.")
-
-       if req.Multipart {
-               // expect multipart response
-               atomic.StoreUint32(&chMeta.multipart, 1)
-       }
-
-       // send the request to VPP
-       c.vpp.SendMsg(chMeta.id, data)
-
-       if req.Multipart {
-               // send a control ping to determine end of the multipart response
-               ping := &vpe.ControlPing{}
-               pingData, _ := c.codec.EncodeMsg(ping, c.pingReqID)
-
-               log.WithFields(logger.Fields{
-                       "context":  chMeta.id,
-                       "msg_id":   c.pingReqID,
-                       "msg_size": len(pingData),
-               }).Debug("Sending a control ping to VPP.")
-
-               c.vpp.SendMsg(chMeta.id, pingData)
-       }
-
-       return nil
-}
-
 // releaseAPIChannel releases API channel that needs to be closed.
 func (c *Connection) releaseAPIChannel(ch *api.Channel, chMeta *channelMetadata) {
        log.WithFields(logger.Fields{
@@ -243,101 +373,3 @@ func (c *Connection) releaseAPIChannel(ch *api.Channel, chMeta *channelMetadata)
        delete(c.channels, chMeta.id)
        c.channelsLock.Unlock()
 }
-
-// msgCallback is called whenever any binary API message comes from VPP.
-func msgCallback(context uint32, msgID uint16, data []byte) {
-       connLock.RLock()
-       defer connLock.RUnlock()
-
-       if conn == nil {
-               log.Warn("Already disconnected, ignoring the message.")
-               return
-       }
-
-       log.WithFields(logger.Fields{
-               "context":  context,
-               "msg_id":   msgID,
-               "msg_size": len(data),
-       }).Debug("Received a message from VPP.")
-
-       if context == 0 || conn.isNotificationMessage(msgID) {
-               // process the message as a notification
-               conn.sendNotifications(msgID, data)
-               return
-       }
-
-       // match ch according to the context
-       conn.channelsLock.RLock()
-       ch, ok := conn.channels[context]
-       conn.channelsLock.RUnlock()
-
-       if !ok {
-               log.WithFields(logger.Fields{
-                       "context": context,
-                       "msg_id":  msgID,
-               }).Error("Context ID not known, ignoring the message.")
-               return
-       }
-
-       chMeta := ch.Metadata().(*channelMetadata)
-       lastReplyReceived := false
-       // if this is a control ping reply and multipart request is being processed, treat this as a last part of the reply
-       if msgID == conn.pingReplyID && atomic.CompareAndSwapUint32(&chMeta.multipart, 1, 0) {
-               lastReplyReceived = true
-       }
-
-       // send the data to the channel
-       sendReply(ch, &api.VppReply{
-               MessageID:         msgID,
-               Data:              data,
-               LastReplyReceived: lastReplyReceived,
-       })
-}
-
-// sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise
-// it logs the error and do not send the message.
-func sendReply(ch *api.Channel, reply *api.VppReply) {
-       select {
-       case ch.ReplyChan <- reply:
-               // reply sent successfully
-       default:
-               // unable to write into the channel without blocking
-               log.WithFields(logger.Fields{
-                       "channel": ch,
-                       "msg_id":  reply.MessageID,
-               }).Warn("Unable to send the reply, reciever end not ready.")
-       }
-}
-
-// GetMessageID returns message identifier of given API message.
-func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
-       return c.messageNameToID(msg.GetMessageName(), msg.GetCrcString())
-}
-
-// messageNameToID returns message ID of a message identified by its name and CRC.
-func (c *Connection) messageNameToID(msgName string, msgCrc string) (uint16, error) {
-       // try to get the ID from the map
-       c.msgIDsLock.RLock()
-       id, ok := c.msgIDs[msgName+msgCrc]
-       c.msgIDsLock.RUnlock()
-       if ok {
-               return id, nil
-       }
-
-       // get the ID using VPP API
-       id, err := c.vpp.GetMsgID(msgName, msgCrc)
-       if err != nil {
-               error := fmt.Errorf("unable to retrieve message ID: %v", err)
-               log.WithFields(logger.Fields{
-                       "msg_name": msgName,
-                       "msg_crc":  msgCrc,
-               }).Errorf("unable to retrieve message ID: %v", err)
-               return id, error
-       }
-
-       c.msgIDsLock.Lock()
-       c.msgIDs[msgName+msgCrc] = id
-       c.msgIDsLock.Unlock()
-
-       return id, nil
-}