// See the License for the specific language governing permissions and
// limitations under the License.
-package core
+//go:generate binapi-generator --input-dir=bin_api --output-dir=bin_api
-//go:generate binapi_generator --input-file=/usr/share/vpp/api/vpe.api.json --output-dir=./bin_api
+package core
import (
"errors"
- "fmt"
"os"
"sync"
"sync/atomic"
+ "time"
- logger "github.com/Sirupsen/logrus"
+ logger "github.com/sirupsen/logrus"
"git.fd.io/govpp.git/adapter"
"git.fd.io/govpp.git/api"
"git.fd.io/govpp.git/core/bin_api/vpe"
)
+var (
+ msgControlPing api.Message = &vpe.ControlPing{} // message sent as the health check probe; replaceable via SetControlPingMessages
+ msgControlPingReply api.Message = &vpe.ControlPingReply{} // reply message whose ID connectVPP resolves; replaceable via SetControlPingMessages
+)
+
const (
- requestChannelBufSize = 100 // default size of the request channel buffers
- replyChannelBufSize = 100 // default size of the reply channel buffers
+ requestChannelBufSize = 100 // default size of the request channel buffers
+ replyChannelBufSize = 100 // default size of the reply channel buffers
+ notificationChannelBufSize = 100 // default size of the notification channel buffers
)
+var (
+ healthCheckProbeInterval = time.Second * 1 // default health check probe interval
+ healthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check probe
+ healthCheckThreshold = 1 // number of consecutive probe timeouts tolerated; a disconnect is reported once the count exceeds this value
+)
+
+// ConnectionState holds the current state of the connection to VPP.
+type ConnectionState int
+
+const (
+ // Connected connection state means that the connection to VPP has been successfully established (it is the zero value of ConnectionState).
+ Connected ConnectionState = iota
+
+ // Disconnected connection state means that the connection to VPP has been lost.
+ Disconnected
+)
+
+// ConnectionEvent is a notification about a change in the VPP connection state, delivered on the channel returned by AsyncConnect.
+type ConnectionEvent struct {
+ // Timestamp holds the time when the event has been generated.
+ Timestamp time.Time
+
+ // State holds the new state of the connection to VPP at the time when the event has been generated.
+ State ConnectionState
+}
+
// Connection represents a shared memory connection to VPP via vppAdapter.
type Connection struct {
- vpp adapter.VppAdapter // VPP adapter
- codec *MsgCodec // message codec
+ vpp adapter.VppAdapter // VPP adapter
+ connected uint32 // non-zero if the adapter is connected to VPP; read and written with sync/atomic
+ codec *MsgCodec // message codec
- msgIDs map[string]uint16 // map os message IDs indexed by message name + CRC
msgIDsLock sync.RWMutex // lock for the message IDs map
+ msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC
- channels map[uint32]*api.Channel // map of all API channels indexed by the channel ID
channelsLock sync.RWMutex // lock for the channels map
+ channels map[uint32]*api.Channel // map of all API channels indexed by the channel ID
- notifSubscriptions map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID
notifSubscriptionsLock sync.RWMutex // lock for the subscriptions map
+ notifSubscriptions map[uint16][]*api.NotifSubscription // map of all notification subscriptions indexed by message ID
maxChannelID uint32 // maximum used client ID
pingReqID uint16 // ID if the ControlPing message
pingReplyID uint16 // ID of the ControlPingReply message
+
+ lastReplyLock sync.Mutex // lock for the last reply
+ lastReply time.Time // time of the last received reply from VPP
}
// channelMetadata contains core-local metadata of an API channel.
log = l
}
+// SetHealthCheckProbeInterval overrides the pause between two consecutive health check probes.
+// Beware: the value is kept in a package-level variable that is written without any
+// synchronization, so this function is not thread-safe. Set it up before connecting to vpp.
+func SetHealthCheckProbeInterval(interval time.Duration) {
+	healthCheckProbeInterval = interval
+}
+
+// SetHealthCheckReplyTimeout overrides how long a health check probe may wait for its reply.
+// A reply that shows up only after this timeout makes the check count as failed.
+// Beware: the value is kept in a package-level variable that is written without any
+// synchronization, so this function is not thread-safe. Set it up before connecting to vpp.
+func SetHealthCheckReplyTimeout(timeout time.Duration) {
+	healthCheckReplyTimeout = timeout
+}
+
+// SetHealthCheckThreshold overrides the number of failed health check probes that are
+// tolerated before the error is reported.
+// Beware: the value is kept in a package-level variable that is written without any
+// synchronization, so this function is not thread-safe. Set it up before connecting to vpp.
+func SetHealthCheckThreshold(threshold int) {
+	healthCheckThreshold = threshold
+}
+
+// SetControlPingMessages sets the messages used as the ControlPing request and the
+// ControlPingReply reply. The first argument replaces the request message, the second
+// one the reply message.
+// Beware: both values are stored in package-level variables without any synchronization,
+// so the function is not thread-safe.
+func SetControlPingMessages(controlPing, controlPingReply api.Message) {
+	msgControlPing = controlPing
+	msgControlPingReply = controlPingReply
+}
+
// Connect connects to VPP using specified VPP adapter and returns the connection handle.
+// This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed.
+// If the attempt fails, the handle created for it is released again, so that a later
+// Connect or AsyncConnect call is not rejected because of the stale handle.
func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
+	// create new connection handle
+	c, err := newConnection(vppAdapter)
+	if err != nil {
+		return nil, err
+	}
+
+	// blocking attempt to connect to VPP
+	if err := c.connectVPP(); err != nil {
+		// The caller gets a nil handle on failure and therefore cannot call
+		// Disconnect itself; reset the package-level handle here instead.
+		c.Disconnect()
+		return nil, err
+	}
+
+	// Return the handle created above instead of re-reading the package-level
+	// variable, which is guarded by connLock and reset by Disconnect.
+	return c, nil
+}
+
+// AsyncConnect asynchronously connects to VPP using specified VPP adapter and returns the connection handle
+// and ConnectionState channel. This call does not block until connection is established, it
+// returns immediately. The caller is supposed to watch the returned ConnectionState channel for
+// Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
+//
+// An error is returned only when the connection handle cannot be created; failures of the
+// connection attempts themselves are logged by the background goroutine.
+func AsyncConnect(vppAdapter adapter.VppAdapter) (*Connection, chan ConnectionEvent, error) {
+	// create new connection handle
+	c, err := newConnection(vppAdapter)
+	if err != nil {
+		return nil, nil, err
+	}
+
+	// asynchronously attempt to connect to VPP; the channel is buffered so that the
+	// background goroutine does not block on a slow event consumer right away
+	connChan := make(chan ConnectionEvent, notificationChannelBufSize)
+	go c.connectLoop(connChan)
+
+	// Return the handle created above instead of re-reading the package-level
+	// variable, which is guarded by connLock and reset by Disconnect.
+	return c, connChan, nil
+}
+
+// Disconnect tears down the connection to VPP and resets the package-level connection handle.
+// Calling it on a nil Connection does nothing.
+func (c *Connection) Disconnect() {
+	if c == nil {
+		return
+	}
+
+	connLock.Lock()
+	defer connLock.Unlock()
+
+	// c is known to be non-nil at this point, so only the adapter needs to be checked
+	if c.vpp != nil {
+		c.disconnectVPP()
+	}
+	conn = nil
+}
+
+// newConnection returns new connection handle.
+func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) {
connLock.Lock()
defer connLock.Unlock()
return nil, errors.New("only one connection per process is supported")
}
- conn = &Connection{vpp: vppAdapter, codec: &MsgCodec{}}
- conn.channels = make(map[uint32]*api.Channel)
- conn.msgIDs = make(map[string]uint16)
- conn.notifSubscriptions = make(map[uint16][]*api.NotifSubscription)
+ conn = &Connection{
+ vpp: vppAdapter,
+ codec: &MsgCodec{},
+ channels: make(map[uint32]*api.Channel),
+ msgIDs: make(map[string]uint16),
+ notifSubscriptions: make(map[uint16][]*api.NotifSubscription),
+ }
conn.vpp.SetMsgCallback(msgCallback)
+ return conn, nil
+}
- logger.Debug("Connecting to VPP...")
+// connectVPP performs one blocking attempt to connect to VPP, resolves the control ping message IDs and marks the connection as connected.
+func (c *Connection) connectVPP() error {
+ log.Debug("Connecting to VPP...")
- err := conn.vpp.Connect()
+ // blocking connect through the VPP adapter
+ err := c.vpp.Connect()
if err != nil {
- return nil, err
+ log.Warn(err)
+ return err
}
// store control ping IDs
- conn.pingReqID, _ = conn.GetMessageID(&vpe.ControlPing{})
- conn.pingReplyID, _ = conn.GetMessageID(&vpe.ControlPingReply{})
+ if c.pingReqID, err = c.GetMessageID(msgControlPing); err != nil {
+ c.vpp.Disconnect() // do not leave the adapter connected when the message ID lookup fails
+ return err
+ }
+ if c.pingReplyID, err = c.GetMessageID(msgControlPingReply); err != nil {
+ c.vpp.Disconnect() // do not leave the adapter connected when the message ID lookup fails
+ return err
+ }
- logger.Debug("VPP connected.")
+ // store connected state; the flag is read atomically by healthCheckLoop and disconnectVPP
+ atomic.StoreUint32(&c.connected, 1)
- return conn, nil
+ log.Info("Connected to VPP.")
+ return nil
}
-// Disconnect disconnects from VPP.
-func (c *Connection) Disconnect() {
- connLock.Lock()
- defer connLock.Unlock()
-
- if c != nil && c.vpp != nil {
+// disconnectVPP disconnects from VPP in case it is connected; it does nothing when the connected flag is not set.
+func (c *Connection) disconnectVPP() {
+ if atomic.CompareAndSwapUint32(&c.connected, 1, 0) { // only the caller that flips the flag from 1 to 0 disconnects the adapter
c.vpp.Disconnect()
}
- conn = nil
+}
+
+// connectLoop keeps trying to connect to VPP, pausing for one second after every failed
+// attempt. As soon as an attempt succeeds, a Connected event is written to connChan and
+// control is handed over to healthCheckLoop.
+func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
+	for {
+		// a failed readiness wait is only logged, the connect attempt is made regardless
+		if waitErr := c.vpp.WaitReady(); waitErr != nil {
+			log.Warnf("wait ready failed: %v", waitErr)
+		}
+
+		connErr := c.connectVPP()
+		if connErr == nil {
+			break
+		}
+		log.Errorf("connecting to VPP failed: %v", connErr)
+		time.Sleep(time.Second)
+	}
+
+	// signal connected event
+	connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected}
+
+	// we are now connected, continue with health check loop
+	c.healthCheckLoop(connChan)
+}
+
+// healthCheckLoop checks whether connection to VPP is alive. In case of disconnect,
+// it continues with connectLoop and tries to reconnect.
+//
+// A control ping is sent on a dedicated API channel every healthCheckProbeInterval. A probe
+// that is not answered within healthCheckReplyTimeout counts as a timeout, unless a reply was
+// received from VPP less than healthCheckReplyTimeout ago, in which case the loop keeps waiting.
+// A Disconnected event is written to connChan when a probe returns an error or when the number
+// of consecutive timeouts exceeds healthCheckThreshold. The loop exits without reconnecting
+// when the connection has been disconnected on request in the meantime.
+func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
+	// Create a separate API channel for health check probes. The receiver is used instead of
+	// the package-level conn, because Disconnect resets that variable to nil concurrently.
+	ch, err := c.NewAPIChannelBuffered(1, 1)
+	if err != nil {
+		log.Error("Failed to create health check API channel, health check will be disabled:", err)
+		return
+	}
+
+	var sinceLastReply time.Duration
+	var failedChecks int
+
+	// send health check probes until an error or timeout occurs
+	for {
+		// sleep until next health check probe period
+		time.Sleep(healthCheckProbeInterval)
+
+		if atomic.LoadUint32(&c.connected) == 0 {
+			// Disconnect has been called in the meantime, return the healthcheck - reconnect loop
+			log.Debug("Disconnected on request, exiting health check loop.")
+			// release the probe channel, it is not used by anybody else
+			ch.Close()
+			return
+		}
+
+		// try draining probe replies from previous request before sending next one
+		select {
+		case <-ch.ReplyChan:
+			log.Debug("drained old probe reply from reply channel")
+		default:
+		}
+
+		// send the control ping request
+		ch.ReqChan <- &api.VppRequest{Message: msgControlPing}
+
+		for {
+			// expect response within timeout period
+			select {
+			case vppReply := <-ch.ReplyChan:
+				err = vppReply.Error
+
+			case <-time.After(healthCheckReplyTimeout):
+				err = ErrProbeTimeout
+
+				// check if time since last reply from any other
+				// channel is less than health check reply timeout
+				c.lastReplyLock.Lock()
+				sinceLastReply = time.Since(c.lastReply)
+				c.lastReplyLock.Unlock()
+
+				if sinceLastReply < healthCheckReplyTimeout {
+					log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply)
+					continue
+				}
+			}
+			break
+		}
+
+		if err == ErrProbeTimeout {
+			failedChecks++
+			log.Warnf("VPP health check probe timed out after %v (%d. timeout)", healthCheckReplyTimeout, failedChecks)
+			if failedChecks > healthCheckThreshold {
+				// in case of exceeded threshold disconnect
+				log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", healthCheckThreshold)
+				connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
+				break
+			}
+		} else if err != nil {
+			// in case of error disconnect
+			log.Errorf("VPP health check probe failed: %v", err)
+			connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
+			break
+		} else if failedChecks > 0 {
+			// a successful probe resets the count of consecutive timeouts
+			failedChecks = 0
+			log.Infof("VPP health check probe OK")
+		}
+	}
+
+	// cleanup
+	ch.Close()
+	c.disconnectVPP()
+
+	// we are now disconnected, start connect loop
+	c.connectLoop(connChan)
}
// NewAPIChannel returns a new API channel for communication with VPP via govpp core.
// It uses default buffer sizes for the request and reply Go channels.
func (c *Connection) NewAPIChannel() (*api.Channel, error) {
+ if c == nil {
+ return nil, errors.New("nil connection passed in")
+ }
return c.NewAPIChannelBuffered(requestChannelBufSize, replyChannelBufSize)
}
// NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
// It allows to specify custom buffer sizes for the request and reply Go channels.
func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (*api.Channel, error) {
+ if c == nil {
+ return nil, errors.New("nil connection passed in")
+ }
chID := atomic.AddUint32(&c.maxChannelID, 1)
chMeta := &channelMetadata{id: chID}
return ch, nil
}
-// watchRequests watches for requests on the request API channel and forwards them as messages to VPP.
-func (c *Connection) watchRequests(ch *api.Channel, chMeta *channelMetadata) {
- for {
- select {
- case req, ok := <-ch.ReqChan:
- // new request on the request channel
- if !ok {
- // after closing the request channel, release API channel and return
- c.releaseAPIChannel(ch, chMeta)
- return
- }
- c.processRequest(ch, chMeta, req)
-
- case req := <-ch.NotifSubsChan:
- // new request on the notification subscribe channel
- c.processNotifSubscribeRequest(ch, req)
- }
- }
-}
-
-// processRequest processes a single request received on the request channel.
-func (c *Connection) processRequest(ch *api.Channel, chMeta *channelMetadata, req *api.VppRequest) error {
- // retrieve message ID
- msgID, err := c.GetMessageID(req.Message)
- if err != nil {
- error := fmt.Errorf("unable to retrieve message ID: %v", err)
- log.WithFields(logger.Fields{
- "msg_name": req.Message.GetMessageName(),
- "msg_crc": req.Message.GetCrcString(),
- }).Errorf("unable to retrieve message ID: %v", err)
- sendReply(ch, &api.VppReply{Error: error})
- return error
- }
-
- // encode the message into binary
- data, err := c.codec.EncodeMsg(req.Message, msgID)
- if err != nil {
- error := fmt.Errorf("unable to encode the messge: %v", err)
- log.WithFields(logger.Fields{
- "context": chMeta.id,
- "msg_id": msgID,
- }).Errorf("%v", error)
- sendReply(ch, &api.VppReply{Error: error})
- return error
- }
-
- // send the message
- log.WithFields(logger.Fields{
- "context": chMeta.id,
- "msg_id": msgID,
- "msg_size": len(data),
- }).Debug("Sending a message to VPP.")
-
- c.vpp.SendMsg(chMeta.id, data)
-
- if req.Multipart {
- // multipart request
- atomic.StoreUint32(&chMeta.multipart, 1)
-
- // send a control ping
- ping := &vpe.ControlPing{}
- pingData, _ := c.codec.EncodeMsg(ping, c.pingReqID)
-
- log.WithFields(logger.Fields{
- "context": chMeta.id,
- "msg_id": c.pingReqID,
- "msg_size": len(pingData),
- }).Debug("Sending a control ping to VPP.")
-
- c.vpp.SendMsg(chMeta.id, pingData)
- }
-
- return nil
-}
-
// releaseAPIChannel releases API channel that needs to be closed.
func (c *Connection) releaseAPIChannel(ch *api.Channel, chMeta *channelMetadata) {
log.WithFields(logger.Fields{
delete(c.channels, chMeta.id)
c.channelsLock.Unlock()
}
-
-// msgCallback is called whenever any binary API message comes from VPP.
-func msgCallback(context uint32, msgID uint16, data []byte) {
- connLock.RLock()
- defer connLock.RUnlock()
-
- if conn == nil {
- log.Warn("Already disconnected, ignoring the message.")
- return
- }
-
- log.WithFields(logger.Fields{
- "context": context,
- "msg_id": msgID,
- "msg_size": len(data),
- }).Debug("Received a message from VPP.")
-
- if context == 0 || conn.isNotificationMessage(msgID) {
- // process the message as a notification
- conn.sendNotifications(msgID, data)
- return
- }
-
- // match ch according to the context
- conn.channelsLock.RLock()
- ch, ok := conn.channels[context]
- conn.channelsLock.RUnlock()
-
- if !ok {
- log.WithFields(logger.Fields{
- "context": context,
- "msg_id": msgID,
- }).Error("Context ID not known, ignoring the message.")
- return
- }
-
- chMeta := ch.Metadata().(*channelMetadata)
- lastReplyReceived := false
- // if this is a control ping reply and multipart request is being processed, treat this as a last part of the reply
- if msgID == conn.pingReplyID && atomic.CompareAndSwapUint32(&chMeta.multipart, 1, 0) {
- lastReplyReceived = true
- }
-
- // send the data to the channel
- sendReply(ch, &api.VppReply{
- MessageID: msgID,
- Data: data,
- LastReplyReceived: lastReplyReceived,
- })
-}
-
-// sendReply sends the reply into the go channel, if it cannot be completed without blocking, otherwise
-// it logs the error and do not send the message.
-func sendReply(ch *api.Channel, reply *api.VppReply) {
- select {
- case ch.ReplyChan <- reply:
- // reply sent successfully
- default:
- // unable to write into the channel without blocking
- log.WithFields(logger.Fields{
- "channel": ch,
- "msg_id": reply.MessageID,
- }).Warn("Unable to send the reply, reciever end not ready.")
- }
-}
-
-// GetMessageID returns message identifier of given API message.
-func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
- return c.messageNameToID(msg.GetMessageName(), msg.GetCrcString())
-}
-
-// messageNameToID returns message ID of a message identified by its name and CRC.
-func (c *Connection) messageNameToID(msgName string, msgCrc string) (uint16, error) {
- // try to get the ID from the map
- c.msgIDsLock.RLock()
- id, ok := c.msgIDs[msgName+msgCrc]
- c.msgIDsLock.RUnlock()
- if ok {
- return id, nil
- }
-
- // get the ID using VPP API
- id, err := c.vpp.GetMsgID(msgName, msgCrc)
- if err != nil {
- error := fmt.Errorf("unable to retrieve message ID: %v", err)
- log.WithFields(logger.Fields{
- "msg_name": msgName,
- "msg_crc": msgCrc,
- }).Errorf("unable to retrieve message ID: %v", err)
- return id, error
- }
-
- c.msgIDsLock.Lock()
- c.msgIDs[msgName+msgCrc] = id
- c.msgIDsLock.Unlock()
-
- return id, nil
-}