"git.fd.io/govpp.git/codec"
)
+const (
+ DefaultReconnectInterval = time.Second / 2 // default interval between reconnect attempts
+ DefaultMaxReconnectAttempts = 3 // default maximum number of reconnect attempts
+)
+
var (
RequestChanBufSize = 100 // default size of the request channel buffer
ReplyChanBufSize = 100 // default size of the reply channel buffer
)
var (
- HealthCheckProbeInterval = time.Second * 1 // default health check probe interval
+ HealthCheckProbeInterval = time.Second // default health check probe interval
HealthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check probe
HealthCheckThreshold = 1 // number of failed health checks until the error is reported
- DefaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP
- ReconnectInterval = time.Second * 1 // default interval for reconnect attempts
- MaxReconnectAttempts = 10 // maximum number of reconnect attempts
+ DefaultReplyTimeout = time.Second // default timeout for replies from VPP
)
// ConnectionState represents the current state of the connection to VPP.
Failed
)
+func (s ConnectionState) String() string {
+ switch s {
+ case Connected:
+ return "Connected"
+ case Disconnected:
+ return "Disconnected"
+ case Failed:
+ return "Failed"
+ default:
+ return fmt.Sprintf("UnknownState(%d)", s)
+ }
+}
+
// ConnectionEvent is a notification about change in the VPP connection state.
type ConnectionEvent struct {
// Timestamp holds the time when the event has been created.
// Connection represents a shared memory connection to VPP via vppAdapter.
type Connection struct {
- vppClient adapter.VppAPI // VPP binary API client adapter
+ vppClient adapter.VppAPI // VPP binary API client
+
+ maxAttempts int // interval for reconnect attempts
+ recInterval time.Duration // maximum number of reconnect attempts
vppConnected uint32 // non-zero if the adapter is connected to VPP
lastReplyLock sync.Mutex // lock for the last reply
lastReply time.Time // time of the last received reply from VPP
+
+ msgControlPing api.Message
+ msgControlPingReply api.Message
}
-func newConnection(binapi adapter.VppAPI) *Connection {
+func newConnection(binapi adapter.VppAPI, attempts int, interval time.Duration) *Connection {
+ if attempts == 0 {
+ attempts = DefaultMaxReconnectAttempts
+ }
+ if interval == 0 {
+ interval = DefaultReconnectInterval
+ }
+
c := &Connection{
- vppClient: binapi,
- codec: &codec.MsgCodec{},
- msgIDs: make(map[string]uint16),
- msgMap: make(map[uint16]api.Message),
- channels: make(map[uint16]*Channel),
- subscriptions: make(map[uint16][]*subscriptionCtx),
+ vppClient: binapi,
+ maxAttempts: attempts,
+ recInterval: interval,
+ codec: &codec.MsgCodec{},
+ msgIDs: make(map[string]uint16),
+ msgMap: make(map[uint16]api.Message),
+ channels: make(map[uint16]*Channel),
+ subscriptions: make(map[uint16][]*subscriptionCtx),
+ msgControlPing: msgControlPing,
+ msgControlPingReply: msgControlPingReply,
}
binapi.SetMsgCallback(c.msgCallback)
return c
}
-// Connect connects to VPP using specified VPP adapter and returns the connection handle.
-// This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed.
+// Connect connects to VPP API using specified adapter and returns a connection handle.
+// This call blocks until it is either connected, or an error occurs.
+// Only one connection attempt will be performed.
func Connect(binapi adapter.VppAPI) (*Connection, error) {
// create new connection handle
- c := newConnection(binapi)
+ c := newConnection(binapi, DefaultMaxReconnectAttempts, DefaultReconnectInterval)
// blocking attempt to connect to VPP
if err := c.connectVPP(); err != nil {
// and ConnectionState channel. This call does not block until connection is established, it
// returns immediately. The caller is supposed to watch the returned ConnectionState channel for
// Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
-func AsyncConnect(binapi adapter.VppAPI) (*Connection, chan ConnectionEvent, error) {
+func AsyncConnect(binapi adapter.VppAPI, attempts int, interval time.Duration) (*Connection, chan ConnectionEvent, error) {
// create new connection handle
- c := newConnection(binapi)
+ c := newConnection(binapi, attempts, interval)
// asynchronously attempt to connect to VPP
connChan := make(chan ConnectionEvent, NotificationChanBufSize)
if err := c.vppClient.Connect(); err != nil {
return err
}
-
- log.Debugf("Connected to VPP.")
+ log.Debugf("Connected to VPP")
if err := c.retrieveMessageIDs(); err != nil {
- c.vppClient.Disconnect()
+ if err := c.vppClient.Disconnect(); err != nil {
+ log.Debugf("disconnecting vpp client failed: %v", err)
+ }
return fmt.Errorf("VPP is incompatible: %v", err)
}
return nil
}
-// Disconnect disconnects from VPP and releases all connection-related resources.
+// Disconnect disconnects from VPP API and releases all connection-related resources.
func (c *Connection) Disconnect() {
if c == nil {
return
}
-
if c.vppClient != nil {
c.disconnectVPP()
}
// disconnectVPP disconnects from VPP in case it is connected.
func (c *Connection) disconnectVPP() {
if atomic.CompareAndSwapUint32(&c.vppConnected, 1, 0) {
- c.vppClient.Disconnect()
+ log.Debug("Disconnecting from VPP..")
+
+ if err := c.vppClient.Disconnect(); err != nil {
+ log.Debugf("Disconnect from VPP failed: %v", err)
+ }
+ log.Debug("Disconnected from VPP")
}
}
// connectLoop attempts to connect to VPP until it succeeds.
// Then it continues with healthCheckLoop.
func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
- reconnectAttempts := 0
+ var reconnectAttempts int
// loop until connected
for {
if err := c.vppClient.WaitReady(); err != nil {
- log.Warnf("wait ready failed: %v", err)
+ log.Debugf("wait ready failed: %v", err)
}
if err := c.connectVPP(); err == nil {
// signal connected event
connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected}
break
- } else if reconnectAttempts < MaxReconnectAttempts {
+ } else if reconnectAttempts < c.maxAttempts {
reconnectAttempts++
- log.Errorf("connecting failed (attempt %d/%d): %v", reconnectAttempts, MaxReconnectAttempts, err)
- time.Sleep(ReconnectInterval)
+ log.Warnf("connecting failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err)
+ time.Sleep(c.recInterval)
} else {
connChan <- ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err}
return
}
// send the control ping request
- ch.reqChan <- &vppRequest{msg: msgControlPing}
+ ch.reqChan <- &vppRequest{msg: c.msgControlPing}
for {
// expect response within timeout period
}
n++
- if c.pingReqID == 0 && msg.GetMessageName() == msgControlPing.GetMessageName() {
+ if c.pingReqID == 0 && msg.GetMessageName() == c.msgControlPing.GetMessageName() {
c.pingReqID = msgID
- msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
- } else if c.pingReplyID == 0 && msg.GetMessageName() == msgControlPingReply.GetMessageName() {
+ c.msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
+ } else if c.pingReplyID == 0 && msg.GetMessageName() == c.msgControlPingReply.GetMessageName() {
c.pingReplyID = msgID
- msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
+ c.msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
}
if debugMsgIDs {