// See the License for the specific language governing permissions and
// limitations under the License.
+//go:generate binapi-generator --input-dir=bin_api --output-dir=bin_api
+
package core
import (
"sync/atomic"
"time"
- logger "github.com/Sirupsen/logrus"
+ logger "github.com/sirupsen/logrus"
"git.fd.io/govpp.git/adapter"
"git.fd.io/govpp.git/api"
"git.fd.io/govpp.git/core/bin_api/vpe"
)
+var (
+ msgControlPing api.Message = &vpe.ControlPing{}
+ msgControlPingReply api.Message = &vpe.ControlPingReply{}
+)
+
const (
requestChannelBufSize = 100 // default size of the request channel buffers
replyChannelBufSize = 100 // default size of the reply channel buffers
notificationChannelBufSize = 100 // default size of the notification channel buffers
)
-const (
+var (
healthCheckProbeInterval = time.Second * 1 // default health check probe interval
healthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check probe
+ healthCheckThreshold = 1 // number of failed healthProbe until the error is reported
)
// ConnectionState holds the current state of the connection to VPP.
Connected ConnectionState = iota
// Disconnected connection state means that the connection to VPP has been lost.
- Disconnected = iota
+ Disconnected
)
// ConnectionEvent is a notification about change in the VPP connection state.
connected uint32 // non-zero if the adapter is connected to VPP
codec *MsgCodec // message codec
- msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC
msgIDsLock sync.RWMutex // lock for the message IDs map
+ msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC
- channels map[uint32]*api.Channel // map of all API channels indexed by the channel ID
channelsLock sync.RWMutex // lock for the channels map
+ channels map[uint32]*api.Channel // map of all API channels indexed by the channel ID
- notifSubscriptions map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID
notifSubscriptionsLock sync.RWMutex // lock for the subscriptions map
+ notifSubscriptions map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID
maxChannelID uint32 // maximum used client ID
pingReqID uint16 // ID if the ControlPing message
pingReplyID uint16 // ID of the ControlPingReply message
+
+ lastReplyLock sync.Mutex // lock for the last reply
+ lastReply time.Time // time of the last received reply from VPP
}
// channelMetadata contains core-local metadata of an API channel.
log = l
}
+// SetHealthCheckProbeInterval sets health check probe interval.
+// Beware: Function is not thread-safe. It is recommended to setup this parameter
+// before connecting to vpp.
+func SetHealthCheckProbeInterval(interval time.Duration) {
+ healthCheckProbeInterval = interval
+}
+
+// SetHealthCheckReplyTimeout sets timeout for reply to a health check probe.
+// If reply arrives after the timeout, check is considered as failed.
+// Beware: Function is not thread-safe. It is recommended to setup this parameter
+// before connecting to vpp.
+func SetHealthCheckReplyTimeout(timeout time.Duration) {
+ healthCheckReplyTimeout = timeout
+}
+
+// SetHealthCheckThreshold sets the number of failed healthProbe checks until the error is reported.
+// Beware: Function is not thread-safe. It is recommended to setup this parameter
+// before connecting to vpp.
+func SetHealthCheckThreshold(threshold int) {
+ healthCheckThreshold = threshold
+}
+
+// SetControlPingMessages sets the messages for ControlPing and ControlPingReply
+func SetControlPingMessages(controPing, controlPingReply api.Message) {
+ msgControlPing = controPing
+ msgControlPingReply = controlPingReply
+}
+
// Connect connects to VPP using specified VPP adapter and returns the connection handle.
// This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed.
func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
return nil, errors.New("only one connection per process is supported")
}
- conn = &Connection{vpp: vppAdapter, codec: &MsgCodec{}}
- conn.channels = make(map[uint32]*api.Channel)
- conn.msgIDs = make(map[string]uint16)
- conn.notifSubscriptions = make(map[uint16][]*api.NotifSubscription)
+ conn = &Connection{
+ vpp: vppAdapter,
+ codec: &MsgCodec{},
+ channels: make(map[uint32]*api.Channel),
+ msgIDs: make(map[string]uint16),
+ notifSubscriptions: make(map[uint16][]*api.NotifSubscription),
+ }
conn.vpp.SetMsgCallback(msgCallback)
return conn, nil
return err
}
+ // store control ping IDs
+ if c.pingReqID, err = c.GetMessageID(msgControlPing); err != nil {
+ c.vpp.Disconnect()
+ return err
+ }
+ if c.pingReplyID, err = c.GetMessageID(msgControlPingReply); err != nil {
+ c.vpp.Disconnect()
+ return err
+ }
+
// store connected state
atomic.StoreUint32(&c.connected, 1)
- // store control ping IDs
- c.pingReqID, _ = c.GetMessageID(&vpe.ControlPing{})
- c.pingReplyID, _ = c.GetMessageID(&vpe.ControlPingReply{})
-
log.Info("Connected to VPP.")
return nil
}
func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
// loop until connected
for {
- err := c.connectVPP()
- if err == nil {
+ if err := c.vpp.WaitReady(); err != nil {
+ log.Warnf("wait ready failed: %v", err)
+ }
+ if err := c.connectVPP(); err == nil {
// signal connected event
connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected}
break
+ } else {
+ log.Errorf("connecting to VPP failed: %v", err)
+ time.Sleep(time.Second)
}
}
// it continues with connectLoop and tries to reconnect.
func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
// create a separate API channel for health check probes
- ch, err := conn.NewAPIChannel()
+ ch, err := conn.NewAPIChannelBuffered(1, 1)
if err != nil {
- log.Error("Error by creating health check API channel, health check will be disabled:", err)
+ log.Error("Failed to create health check API channel, health check will be disabled:", err)
return
}
- // send health check probes until an error occurs
+ var sinceLastReply time.Duration
+ var failedChecks int
+
+ // send health check probes until an error or timeout occurs
for {
- // wait for healthCheckProbeInterval
- <-time.After(healthCheckProbeInterval)
+ // sleep until next health check probe period
+ time.Sleep(healthCheckProbeInterval)
if atomic.LoadUint32(&c.connected) == 0 {
// Disconnect has been called in the meantime, return the healthcheck - reconnect loop
return
}
- // send the control ping
- ch.ReqChan <- &api.VppRequest{Message: &vpe.ControlPing{}}
-
- // expect response within timeout period
+ // try draining probe replies from previous request before sending next one
select {
- case vppReply := <-ch.ReplyChan:
- err = vppReply.Error
- case <-time.After(healthCheckReplyTimeout):
- err = errors.New("probe reply not received within the timeout period")
+ case <-ch.ReplyChan:
+ log.Debug("drained old probe reply from reply channel")
+ default:
+ }
+
+ // send the control ping request
+ ch.ReqChan <- &api.VppRequest{Message: msgControlPing}
+
+ for {
+ // expect response within timeout period
+ select {
+ case vppReply := <-ch.ReplyChan:
+ err = vppReply.Error
+
+ case <-time.After(healthCheckReplyTimeout):
+ err = ErrProbeTimeout
+
+ // check if time since last reply from any other
+ // channel is less than health check reply timeout
+ conn.lastReplyLock.Lock()
+ sinceLastReply = time.Since(c.lastReply)
+ conn.lastReplyLock.Unlock()
+
+ if sinceLastReply < healthCheckReplyTimeout {
+ log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply)
+ continue
+ }
+ }
+ break
}
- // in case of error, break & disconnect
- if err != nil {
- log.Errorf("VPP health check failed: %v", err)
- // signal disconnected event via channel
+ if err == ErrProbeTimeout {
+ failedChecks++
+ log.Warnf("VPP health check probe timed out after %v (%d. timeout)", healthCheckReplyTimeout, failedChecks)
+ if failedChecks > healthCheckThreshold {
+ // in case of exceeded treshold disconnect
+ log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", healthCheckThreshold)
+ connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
+ break
+ }
+ } else if err != nil {
+ // in case of error disconnect
+ log.Errorf("VPP health check probe failed: %v", err)
connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
break
+ } else if failedChecks > 0 {
+ failedChecks = 0
+ log.Infof("VPP health check probe OK")
}
}