X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=core%2Fcore.go;h=ebe7f680d317150ef1d9c9217199026a06c24e67;hb=392a56b700fa6715d091e56c49f79bfe32613fc6;hp=897790268c3f493eec974b8ff27f7967543e835b;hpb=0b6ef19a7de5b073b7818013930cc7f07526c929;p=govpp.git diff --git a/core/core.go b/core/core.go index 8977902..ebe7f68 100644 --- a/core/core.go +++ b/core/core.go @@ -23,22 +23,28 @@ import ( "sync/atomic" "time" - logger "github.com/Sirupsen/logrus" + logger "github.com/sirupsen/logrus" "git.fd.io/govpp.git/adapter" "git.fd.io/govpp.git/api" "git.fd.io/govpp.git/core/bin_api/vpe" ) +var ( + msgControlPing api.Message = &vpe.ControlPing{} + msgControlPingReply api.Message = &vpe.ControlPingReply{} +) + const ( requestChannelBufSize = 100 // default size of the request channel buffers replyChannelBufSize = 100 // default size of the reply channel buffers notificationChannelBufSize = 100 // default size of the notification channel buffers ) -const ( +var ( healthCheckProbeInterval = time.Second * 1 // default health check probe interval healthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check probe + healthCheckThreshold = 1 // number of failed healthProbe until the error is reported ) // ConnectionState holds the current state of the connection to VPP. @@ -67,14 +73,14 @@ type Connection struct { connected uint32 // non-zero if the adapter is connected to VPP codec *MsgCodec // message codec - msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC msgIDsLock sync.RWMutex // lock for the message IDs map + msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC - channels map[uint32]*api.Channel // map of all API channels indexed by the channel ID channelsLock sync.RWMutex // lock for the channels map + channels map[uint32]*api.Channel // map of all API channels indexed by the channel ID - notifSubscriptions map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID notifSubscriptionsLock sync.RWMutex // lock for the subscriptions map + notifSubscriptions map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID maxChannelID uint32 // maximum used client ID pingReqID uint16 // ID if the ControlPing message @@ -105,6 +111,34 @@ func SetLogger(l *logger.Logger) { log = l } +// SetHealthCheckProbeInterval sets health check probe interval. +// Beware: Function is not thread-safe. It is recommended to setup this parameter +// before connecting to vpp. +func SetHealthCheckProbeInterval(interval time.Duration) { + healthCheckProbeInterval = interval +} + +// SetHealthCheckReplyTimeout sets timeout for reply to a health check probe. +// If reply arrives after the timeout, check is considered as failed. +// Beware: Function is not thread-safe. It is recommended to setup this parameter +// before connecting to vpp. +func SetHealthCheckReplyTimeout(timeout time.Duration) { + healthCheckReplyTimeout = timeout +} + +// SetHealthCheckThreshold sets the number of failed healthProbe checks until the error is reported. +// Beware: Function is not thread-safe. It is recommended to setup this parameter +// before connecting to vpp. +func SetHealthCheckThreshold(threshold int) { + healthCheckThreshold = threshold +} + +// SetControlPingMessages sets the messages for ControlPing and ControlPingReply +func SetControlPingMessages(controPing, controlPingReply api.Message) { + msgControlPing = controPing + msgControlPingReply = controlPingReply +} + // Connect connects to VPP using specified VPP adapter and returns the connection handle. // This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed. func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) { @@ -164,10 +198,13 @@ func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) { return nil, errors.New("only one connection per process is supported") } - conn = &Connection{vpp: vppAdapter, codec: &MsgCodec{}} - conn.channels = make(map[uint32]*api.Channel) - conn.msgIDs = make(map[string]uint16) - conn.notifSubscriptions = make(map[uint16][]*api.NotifSubscription) + conn = &Connection{ + vpp: vppAdapter, + codec: &MsgCodec{}, + channels: make(map[uint32]*api.Channel), + msgIDs: make(map[string]uint16), + notifSubscriptions: make(map[uint16][]*api.NotifSubscription), + } conn.vpp.SetMsgCallback(msgCallback) return conn, nil @@ -184,13 +221,19 @@ func (c *Connection) connectVPP() error { return err } + // store control ping IDs + if c.pingReqID, err = c.GetMessageID(msgControlPing); err != nil { + c.vpp.Disconnect() + return err + } + if c.pingReplyID, err = c.GetMessageID(msgControlPingReply); err != nil { + c.vpp.Disconnect() + return err + } + // store connected state atomic.StoreUint32(&c.connected, 1) - // store control ping IDs - c.pingReqID, _ = c.GetMessageID(&vpe.ControlPing{}) - c.pingReplyID, _ = c.GetMessageID(&vpe.ControlPingReply{}) - log.Info("Connected to VPP.") return nil } @@ -207,11 +250,16 @@ func (c *Connection) disconnectVPP() { func (c *Connection) connectLoop(connChan chan ConnectionEvent) { // loop until connected for { - err := c.connectVPP() - if err == nil { + if err := c.vpp.WaitReady(); err != nil { + log.Warnf("wait ready failed: %v", err) + } + if err := c.connectVPP(); err == nil { // signal connected event connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected} break + } else { + log.Errorf("connecting to VPP failed: %v", err) + time.Sleep(time.Second) } } @@ -229,6 +277,7 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { return } + failedChecks := 0 // send health check probes until an error occurs for { // wait for healthCheckProbeInterval @@ -241,7 +290,7 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { } // send the control ping - ch.ReqChan <- &api.VppRequest{Message: &vpe.ControlPing{}} + ch.ReqChan <- &api.VppRequest{Message: msgControlPing} // expect response within timeout period select { @@ -251,8 +300,14 @@ func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) { err = errors.New("probe reply not received within the timeout period") } - // in case of error, break & disconnect if err != nil { + failedChecks++ + } else { + failedChecks = 0 + } + + if failedChecks > healthCheckThreshold { + // in case of error, break & disconnect log.Errorf("VPP health check failed: %v", err) // signal disconnected event via channel connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}