1 // Copyright (c) 2017 Cisco and/or its affiliates.
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
26 logger "github.com/sirupsen/logrus"
28 "git.fd.io/govpp.git/adapter"
29 "git.fd.io/govpp.git/api"
30 "git.fd.io/govpp.git/codec"
// Reconnect defaults; newConnection falls back to these for the attempts/interval settings.
34 DefaultReconnectInterval = time.Second / 2 // default interval between reconnect attempts
35 DefaultMaxReconnectAttempts = 3 // default maximum number of reconnect attempts
// Default Go channel buffer sizes: request/reply sizes are used by NewAPIChannel,
// NotificationChanBufSize sizes the connection event channel (connChan).
39 RequestChanBufSize = 100 // default size of the request channel buffer
40 ReplyChanBufSize = 100 // default size of the reply channel buffer
41 NotificationChanBufSize = 100 // default size of the notification channel buffer
// Health-check tuning used by healthCheckLoop.
45 HealthCheckProbeInterval = time.Second // default health check probe interval
46 HealthCheckReplyTimeout = time.Millisecond * 250 // timeout for reply to a health check probe
47 HealthCheckThreshold = 2 // number of failed health checks tolerated; NotResponding is reported once it is exceeded
48 DefaultReplyTimeout = time.Second // default timeout for replies from VPP
51 // ConnectionState represents the current state of the connection to VPP.
52 type ConnectionState int
// Connection states; Connected is the first (iota) value.
55 // Connected represents state in which the connection has been successfully established.
56 Connected ConnectionState = iota
58 // NotResponding represents a state where the VPP socket accepts messages but replies are received with delay,
59 // or not at all. GoVPP treats this state internally the same as disconnected.
62 // Disconnected represents state in which the VPP socket is closed and the connection is considered dropped.
65 // Failed represents the state in which reconnecting failed after exceeding the maximum number of attempts.
// String returns the name of the connection state (e.g. "NotResponding"),
// or "UnknownState(<n>)" for a value without a known name.
69 func (s ConnectionState) String() string {
74 return "NotResponding"
80 return fmt.Sprintf("UnknownState(%d)", s)
84 // ConnectionEvent is a notification about change in the VPP connection state.
// Events are delivered through the channel returned by AsyncConnect.
85 type ConnectionEvent struct {
86 // Timestamp holds the time when the event has been created.
89 // State holds the new state of the connection at the time when the event has been created.
92 // Error holds the error that caused the state change, if any (e.g. for Failed or Disconnected).
96 // Connection represents a connection to VPP made through an adapter.VppAPI client (vppClient).
97 type Connection struct {
98 vppClient adapter.VppAPI // VPP binary API client
100 maxAttempts int // maximum number of reconnect attempts
101 recInterval time.Duration // interval between reconnect attempts
103 vppConnected uint32 // non-zero if the adapter is connected to VPP
105 connChan chan ConnectionEvent // connection status events are sent to this channel
107 codec MessageCodec // message codec
108 msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC
109 msgMapByPath map[string]map[uint16]api.Message // map of messages indexed by message ID which are indexed by path
111 maxChannelID uint32 // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations)
112 channelsLock sync.RWMutex // lock for the channels map
113 channels map[uint16]*Channel // map of all API channels indexed by the channel ID
115 subscriptionsLock sync.RWMutex // lock for the subscriptions map
116 subscriptions map[uint16][]*subscriptionCtx // map of all notification subscriptions indexed by message ID
118 pingReqID uint16 // ID of the ControlPing message
119 pingReplyID uint16 // ID of the ControlPingReply message
121 lastReplyLock sync.Mutex // lock for the last reply
122 lastReply time.Time // time of the last received reply from VPP
// message sent as the health-check probe; replaced by the registered type in retrieveMessageIDs
124 msgControlPing api.Message
// reply counterpart of msgControlPing; replaced by the registered type in retrieveMessageIDs
125 msgControlPingReply api.Message
127 apiTrace *trace // API tracer (disabled by default)
// newConnection creates a Connection for the given adapter. The reconnect settings fall
// back to DefaultMaxReconnectAttempts / DefaultReconnectInterval (presumably when the
// passed values are unset). It allocates all internal maps and the event channel, and
// registers c.msgCallback as the adapter's message callback.
130 func newConnection(binapi adapter.VppAPI, attempts int, interval time.Duration) *Connection {
132 attempts = DefaultMaxReconnectAttempts
135 interval = DefaultReconnectInterval
140 maxAttempts: attempts,
141 recInterval: interval,
// buffered so sendConnEvent does not block while the buffer has room
142 connChan: make(chan ConnectionEvent, NotificationChanBufSize),
143 codec: codec.DefaultCodec,
144 msgIDs: make(map[string]uint16),
145 msgMapByPath: make(map[string]map[uint16]api.Message),
146 channels: make(map[uint16]*Channel),
147 subscriptions: make(map[uint16][]*subscriptionCtx),
148 msgControlPing: msgControlPing,
149 msgControlPingReply: msgControlPingReply,
151 list: make([]*api.Record, 0),
155 binapi.SetMsgCallback(c.msgCallback)
159 // Connect connects to VPP API using specified adapter and returns a connection handle.
160 // This call blocks until it is either connected, or an error occurs.
161 // Only one connection attempt will be performed.
// The handle is created with the package default reconnect settings.
162 func Connect(binapi adapter.VppAPI) (*Connection, error) {
163 // create new connection handle
164 c := newConnection(binapi, DefaultMaxReconnectAttempts, DefaultReconnectInterval)
166 // blocking attempt to connect to VPP
167 if err := c.connectVPP(); err != nil {
174 // AsyncConnect asynchronously connects to VPP using specified VPP adapter and returns the connection handle
175 // and ConnectionEvent channel. This call does not block until connection is established, it
176 // returns immediately. The caller is supposed to watch the returned ConnectionEvent channel for
177 // Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
// attempts and interval set the reconnect limit and the delay between attempts; once the
// attempts are exhausted, a Failed event is sent on the channel.
178 func AsyncConnect(binapi adapter.VppAPI, attempts int, interval time.Duration) (*Connection, chan ConnectionEvent, error) {
179 // create new connection handle
180 c := newConnection(binapi, attempts, interval)
182 // asynchronously attempt to connect to VPP
185 return c, c.connChan, nil
188 // connectVPP performs blocking attempt to connect to VPP.
// After the adapter connects, message IDs are resolved. If that fails, the adapter is
// disconnected again and a "VPP is incompatible" error is returned. vppConnected is set
// only after both steps succeed.
189 func (c *Connection) connectVPP() error {
190 log.Debug("Connecting to VPP..")
193 if err := c.vppClient.Connect(); err != nil {
196 log.Debugf("Connected to VPP")
198 if err := c.retrieveMessageIDs(); err != nil {
// best-effort cleanup: a disconnect error is only logged; the ID error is what gets returned
199 if err := c.vppClient.Disconnect(); err != nil {
200 log.Debugf("disconnecting vpp client failed: %v", err)
202 return fmt.Errorf("VPP is incompatible: %v", err)
205 // store connected state
206 atomic.StoreUint32(&c.vppConnected, 1)
211 // Disconnect disconnects from VPP API and releases all connection-related resources.
// The adapter is only touched when vppClient is non-nil.
212 func (c *Connection) Disconnect() {
216 if c.vppClient != nil {
221 // disconnectVPP disconnects from VPP in case it is connected.
// Adapter disconnect errors are only logged, not returned.
222 func (c *Connection) disconnectVPP() {
// CAS 1->0: a no-op when not connected, and only one caller performs the disconnect
223 if atomic.CompareAndSwapUint32(&c.vppConnected, 1, 0) {
224 log.Debug("Disconnecting from VPP..")
226 if err := c.vppClient.Disconnect(); err != nil {
227 log.Debugf("Disconnect from VPP failed: %v", err)
229 log.Debug("Disconnected from VPP")
// NewAPIChannel returns a new API channel for communication with VPP via govpp core,
// using the default buffer sizes RequestChanBufSize and ReplyChanBufSize.
233 func (c *Connection) NewAPIChannel() (api.Channel, error) {
234 return c.newAPIChannel(RequestChanBufSize, ReplyChanBufSize)
// NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
// It allows to specify custom buffer sizes for the request and reply Go channels.
237 func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (api.Channel, error) {
238 return c.newAPIChannel(reqChanBufSize, replyChanBufSize)
241 // newAPIChannel creates a new API channel with the given request/reply buffer sizes,
242 // registers it in c.channels and starts a goroutine watching its request channel.
243 func (c *Connection) newAPIChannel(reqChanBufSize, replyChanBufSize int) (*Channel, error) {
245 return nil, errors.New("nil connection passed in")
248 // create new channel
// IDs are masked to 15 bits, so after 0x7fff channels the counter wraps (through 0).
// NOTE(review): a wrapped ID that is still in use overwrites that channel's map entry below.
249 chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
250 channel := newChannel(chID, c, c.codec, c, reqChanBufSize, replyChanBufSize)
252 // store API channel within the client
253 c.channelsLock.Lock()
254 c.channels[chID] = channel
255 c.channelsLock.Unlock()
257 // start watching on the request channel
258 go c.watchRequests(channel)
263 // releaseAPIChannel releases API channel that needs to be closed.
// It removes ch from c.channels under channelsLock.
264 func (c *Connection) releaseAPIChannel(ch *Channel) {
265 log.WithFields(logger.Fields{
267 }).Debug("API channel released")
269 // delete the channel from channels map
270 c.channelsLock.Lock()
271 delete(c.channels, ch.id)
272 c.channelsLock.Unlock()
275 // connectLoop attempts to connect to VPP until it succeeds or c.maxAttempts is exceeded.
276 // Then it continues with healthCheckLoop.
277 func (c *Connection) connectLoop() {
278 var reconnectAttempts int
280 // loop until connected
// a WaitReady error is only logged; the connect attempt is still made
282 if err := c.vppClient.WaitReady(); err != nil {
283 log.Debugf("wait ready failed: %v", err)
285 if err := c.connectVPP(); err == nil {
286 // signal connected event
287 c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Connected})
// on failure, retry after recInterval while attempts remain...
289 } else if reconnectAttempts < c.maxAttempts {
291 log.Warnf("connecting failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err)
292 time.Sleep(c.recInterval)
// ...otherwise report Failed together with the last connect error
294 c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err})
299 // we are now connected, continue with health check loop
303 // healthCheckLoop checks whether connection to VPP is alive. In case of disconnect,
304 // it continues with connectLoop and tries to reconnect.
// Every HealthCheckProbeInterval it sends a ControlPing and waits up to HealthCheckReplyTimeout
// for the reply. NotResponding is reported only after more than HealthCheckThreshold
// consecutive timeouts. Any other probe error is reported as Disconnected.
305 func (c *Connection) healthCheckLoop() {
306 // create a separate API channel for health check probes
// buffer size 1: at most one outstanding probe and one pending reply
307 ch, err := c.newAPIChannel(1, 1)
309 log.Error("Failed to create health check API channel, health check will be disabled:", err)
314 sinceLastReply time.Duration
318 // send health check probes until an error or timeout occurs
320 // sleep until next health check probe period
321 time.Sleep(HealthCheckProbeInterval)
323 if atomic.LoadUint32(&c.vppConnected) == 0 {
324 // Disconnect has been called in the meantime, exit the health check / reconnect loop
325 log.Debug("Disconnected on request, exiting health check loop.")
329 // try draining probe replies from previous request before sending next one
332 log.Debug("drained old probe reply from reply channel")
336 // send the control ping request
337 ch.reqChan <- &vppRequest{msg: c.msgControlPing}
340 // expect response within timeout period
342 case vppReply := <-ch.replyChan:
345 case <-time.After(HealthCheckReplyTimeout):
346 err = ErrProbeTimeout
348 // check if time since last reply from any other
349 // channel is less than health check reply timeout
350 c.lastReplyLock.Lock()
351 sinceLastReply = time.Since(c.lastReply)
352 c.lastReplyLock.Unlock()
// recent traffic on another channel means VPP is alive but busy, so keep waiting
354 if sinceLastReply < HealthCheckReplyTimeout {
355 log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply)
362 if err == ErrProbeTimeout {
364 log.Warnf("VPP health check probe timed out after %v (%d. timeout)", HealthCheckReplyTimeout, failedChecks)
365 if failedChecks > HealthCheckThreshold {
366 // in case of exceeded failed check threshold, assume VPP unresponsive
// NOTE(review): log text reads "does not responding"; "is not responding" was likely intended
367 log.Errorf("VPP does not responding, the health check exceeded threshold for timeouts (>%d)", HealthCheckThreshold)
368 c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: NotResponding})
371 } else if err != nil {
372 // in case of error, assume VPP disconnected
373 log.Errorf("VPP health check probe failed: %v", err)
374 c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: err})
376 } else if failedChecks > 0 {
377 // in case of success after failed checks, clear failed check counter
379 log.Infof("VPP health check probe OK")
387 // we are now disconnected, start connect loop
// getMsgNameWithCrc returns the "<name>_<crc>" key of x, as used for c.msgIDs.
391 func getMsgNameWithCrc(x api.Message) string {
392 return getMsgID(x.GetMessageName(), x.GetCrcString())
// getMsgID joins a message name and CRC as "<name>_<crc>".
// Despite its name, it returns a string key, not the numeric VPP message ID.
395 func getMsgID(name, crc string) string {
396 return name + "_" + crc
// getMsgFactory returns a constructor that allocates a new zero-valued instance of
// msg's concrete type. msg must be a pointer, because the type is obtained via Elem().
399 func getMsgFactory(msg api.Message) func() api.Message {
400 return func() api.Message {
401 return reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
405 // GetMessageID returns message identifier of given API message.
// The ID is queried from the adapter by message name and CRC. As a side effect, msg is
// cached in c.msgMapByPath under its package path and ID (an existing entry is kept),
// and the ID is stored in c.msgIDs.
// NOTE(review): no lock is taken around these map writes here; confirm callers serialize access.
406 func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
408 return 0, errors.New("nil connection passed in")
410 pkgPath := c.GetMessagePath(msg)
411 msgID, err := c.vppClient.GetMsgID(msg.GetMessageName(), msg.GetCrcString())
415 if pathMsgs, pathOk := c.msgMapByPath[pkgPath]; !pathOk {
416 c.msgMapByPath[pkgPath] = make(map[uint16]api.Message)
417 c.msgMapByPath[pkgPath][msgID] = msg
418 } else if _, msgOk := pathMsgs[msgID]; !msgOk {
419 c.msgMapByPath[pkgPath][msgID] = msg
421 if _, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok {
424 c.msgIDs[getMsgNameWithCrc(msg)] = msgID
428 // LookupByID returns the message registered for the given package path and message ID,
// or an error if no such message is known.
429 func (c *Connection) LookupByID(path string, msgID uint16) (api.Message, error) {
431 return nil, errors.New("nil connection passed in")
433 if msg, ok := c.msgMapByPath[path][msgID]; ok {
436 return nil, fmt.Errorf("unknown message ID %d for path '%s'", msgID, path)
439 // GetMessagePath returns path for the given message
// It is the parent directory (path.Dir) of the package import path of msg's type; msg
// must be a pointer. This value is the key used for c.msgMapByPath.
440 func (c *Connection) GetMessagePath(msg api.Message) string {
441 return path.Dir(reflect.TypeOf(msg).Elem().PkgPath())
444 // retrieveMessageIDs retrieves IDs for all registered messages and stores them in map
// It walks api.GetRegisteredMessages() per package path. The first registered messages named
// like msgControlPing / msgControlPingReply replace them, as fresh instances of the registered
// type, for health-check probes.
445 func (c *Connection) retrieveMessageIDs() (err error) {
448 msgsByPath := api.GetRegisteredMessages()
451 for pkgPath, msgs := range msgsByPath {
452 for _, msg := range msgs {
// NOTE(review): ":=" declares a new err in this scope, shadowing the named result,
// so a per-message failure here does not set the function's returned error.
453 msgID, err := c.GetMessageID(msg)
456 log.Debugf("retrieving message ID for %s.%s failed: %v",
457 pkgPath, msg.GetMessageName(), err)
463 if c.pingReqID == 0 && msg.GetMessageName() == c.msgControlPing.GetMessageName() {
465 c.msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
466 } else if c.pingReplyID == 0 && msg.GetMessageName() == c.msgControlPingReply.GetMessageName() {
467 c.pingReplyID = msgID
468 c.msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
472 log.Debugf("message %q (%s) has ID: %d", msg.GetMessageName(), getMsgNameWithCrc(msg), msgID)
475 log.WithField("took", time.Since(t)).
476 Debugf("retrieved IDs for %d messages (registered %d) from path %s", n, len(msgs), pkgPath)
// sendConnEvent publishes event on connChan without blocking. If the channel buffer is
// full, the event is discarded and a warning is logged.
482 func (c *Connection) sendConnEvent(event ConnectionEvent) {
484 case c.connChan <- event:
486 log.Warn("Connection state channel is full, discarding value.")
490 // Trace gives access to the API trace interface
// (the API tracer is disabled by default).
491 func (c *Connection) Trace() api.Trace {
495 // trace records api message
// It appends an api.Record for msg (channel chId, time t) to the API trace, but only when
// tracing is enabled. isReceived presumably distinguishes messages received from VPP
// from sent ones; confirm against callers.
496 func (c *Connection) trace(msg api.Message, chId uint16, t time.Time, isReceived bool) {
// cheap atomic check first, so the disabled path takes no lock
497 if atomic.LoadInt32(&c.apiTrace.isEnabled) == 0 {
500 entry := &api.Record{
503 IsReceived: isReceived,
506 c.apiTrace.mux.Lock()
507 c.apiTrace.list = append(c.apiTrace.list, entry)
508 c.apiTrace.mux.Unlock()