1 // Copyright (c) 2017 Cisco and/or its affiliates.
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
25 logger "github.com/sirupsen/logrus"
27 "git.fd.io/govpp.git/adapter"
28 "git.fd.io/govpp.git/api"
29 "git.fd.io/govpp.git/codec"
33 RequestChanBufSize = 100 // default size of the request channel buffer (used by NewAPIChannel)
34 ReplyChanBufSize = 100 // default size of the reply channel buffer (used by NewAPIChannel)
35 NotificationChanBufSize = 100 // default size of the notification channel buffer (also sizes the ConnectionEvent channel created by AsyncConnect)
39 HealthCheckProbeInterval = time.Second * 1 // default health check probe interval (sleep between probes in healthCheckLoop)
40 HealthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check probe
41 HealthCheckThreshold = 1 // number of failed health checks tolerated; a disconnect is reported once the failed check count exceeds this value
42 DefaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP
45 // ConnectionState represents the current state of the connection to VPP. It is carried in ConnectionEvent.State.
46 type ConnectionState int
49 // Connected represents the state in which the connection has been successfully established.
50 Connected ConnectionState = iota
52 // Disconnected represents the state in which the connection has been dropped.
56 // ConnectionEvent is a notification about a change in the VPP connection state, sent on the channel returned by AsyncConnect.
57 type ConnectionEvent struct {
58 // Timestamp holds the time when the event has been created.
61 // State holds the new state of the connection at the time when the event has been created.
64 // Error holds the error, if any was encountered.
69 connLock sync.RWMutex // lock for the global connection (conn)
70 conn *Connection // global handle to the Connection (used in the message receive callback); assigned by createConnection
73 // Connection represents a shared memory connection to VPP via vppAdapter.
74 type Connection struct {
75 vppClient adapter.VppAPI // VPP binary API client adapter
77 vppConnected uint32 // non-zero if the adapter is connected to VPP (read and written via sync/atomic)
79 codec *codec.MsgCodec // message codec
80 msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC
81 msgMap map[uint16]api.Message // map of messages indexed by message ID
83 maxChannelID uint32 // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations)
84 channelsLock sync.RWMutex // lock for the channels map
85 channels map[uint16]*Channel // map of all API channels indexed by the channel ID
87 subscriptionsLock sync.RWMutex // lock for the subscriptions map
88 subscriptions map[uint16][]*subscriptionCtx // map of all notification subscriptions indexed by message ID
90 pingReqID uint16 // ID of the ControlPing message
91 pingReplyID uint16 // ID of the ControlPingReply message
93 lastReplyLock sync.Mutex // lock for the last reply
94 lastReply time.Time // time of the last received reply from VPP
// newConnection builds a Connection handle with its message codec and lookup maps
// initialized, and registers c.msgCallback as the message callback of binapi.
97 func newConnection(binapi adapter.VppAPI) *Connection {
100 codec: &codec.MsgCodec{},
101 msgIDs: make(map[string]uint16),
102 msgMap: make(map[uint16]api.Message),
103 channels: make(map[uint16]*Channel),
104 subscriptions: make(map[uint16][]*subscriptionCtx),
106 binapi.SetMsgCallback(c.msgCallback)
110 // Connect connects to VPP using the specified VPP adapter and returns the connection handle.
111 // This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed;
// AsyncConnect is the variant that keeps retrying in the background.
112 func Connect(binapi adapter.VppAPI) (*Connection, error) {
113 // create new connection handle
114 c, err := createConnection(binapi)
119 // blocking attempt to connect to VPP
120 if err := c.connectVPP(); err != nil {
127 // AsyncConnect asynchronously connects to VPP using the specified VPP adapter and returns the connection handle
128 // and a ConnectionEvent channel. This call does not block until the connection is established, it
129 // returns immediately. The caller is supposed to watch the returned channel for
130 // Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
131 func AsyncConnect(binapi adapter.VppAPI) (*Connection, chan ConnectionEvent, error) {
132 // create new connection handle
133 c, err := createConnection(binapi)
138 // asynchronously attempt to connect to VPP; the event channel is buffered with NotificationChanBufSize entries
139 connChan := make(chan ConnectionEvent, NotificationChanBufSize)
140 go c.connectLoop(connChan) // connectLoop hands over to healthCheckLoop once connected
142 return c, connChan, nil
145 // Disconnect disconnects from VPP and releases all connection-related resources.
146 func (c *Connection) Disconnect() {
152 defer connLock.Unlock() // the global connection lock is released when Disconnect returns
154 if c.vppClient != nil {
160 // createConnection returns a new connection handle and records it in the global conn (only one connection per process is supported).
161 func createConnection(binapi adapter.VppAPI) (*Connection, error) {
163 defer connLock.Unlock()
166 return nil, errors.New("only one connection per process is supported")
169 conn = newConnection(binapi)
174 // connectVPP performs a blocking attempt to connect to VPP: it connects the adapter, retrieves the message IDs and then marks the connection as connected.
175 func (c *Connection) connectVPP() error {
176 log.Debug("Connecting to VPP..")
179 if err := c.vppClient.Connect(); err != nil {
183 log.Debugf("Connected to VPP.")
185 if err := c.retrieveMessageIDs(); err != nil {
186 c.vppClient.Disconnect() // message IDs could not be retrieved, drop the adapter connection again
187 return fmt.Errorf("VPP is incompatible: %v", err)
190 // store connected state
191 atomic.StoreUint32(&c.vppConnected, 1)
// NewAPIChannel returns a new API channel for communication with VPP via govpp core.
// It uses the default request and reply buffer sizes (RequestChanBufSize, ReplyChanBufSize).
// NOTE(review): the *Channel result of newAPIChannel is returned as api.Channel, so on
// error the returned value presumably wraps a nil *Channel rather than being a nil
// interface - callers should check the error, not the channel. Confirm.
196 func (c *Connection) NewAPIChannel() (api.Channel, error) {
197 return c.newAPIChannel(RequestChanBufSize, ReplyChanBufSize)
// NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
// It allows to specify custom buffer sizes for the request and reply Go channels.
// NOTE(review): the *Channel result of newAPIChannel is returned as api.Channel, so on
// error the returned value presumably wraps a nil *Channel rather than being a nil
// interface - callers should check the error, not the channel. Confirm.
200 func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (api.Channel, error) {
201 return c.newAPIChannel(reqChanBufSize, replyChanBufSize)
204 // newAPIChannel creates a new API channel with the given buffer sizes for the request and reply Go channels.
205 // It is the shared implementation behind NewAPIChannel and NewAPIChannelBuffered.
206 func (c *Connection) newAPIChannel(reqChanBufSize, replyChanBufSize int) (*Channel, error) {
208 return nil, errors.New("nil connection passed in")
211 // create new channel; the ID is taken atomically from maxChannelID and masked to 15 bits
212 chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
213 channel := newChannel(chID, c, c.codec, c, reqChanBufSize, replyChanBufSize)
215 // store API channel within the client
216 c.channelsLock.Lock()
217 c.channels[chID] = channel
218 c.channelsLock.Unlock()
220 // start watching on the request channel in a separate goroutine
221 go c.watchRequests(channel)
226 // releaseAPIChannel releases an API channel that needs to be closed by removing it from the channels map.
227 func (c *Connection) releaseAPIChannel(ch *Channel) {
228 log.WithFields(logger.Fields{
230 }).Debug("API channel released")
232 // delete the channel from channels map (under channelsLock)
233 c.channelsLock.Lock()
234 delete(c.channels, ch.id)
235 c.channelsLock.Unlock()
238 // GetMessageID returns the message identifier of the given API message, looked up in msgIDs by message name and CRC.
// An error naming the message and its CRC is returned when the message is not in the map.
239 func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
241 return 0, errors.New("nil connection passed in")
244 if msgID, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok {
248 return 0, fmt.Errorf("unknown message: %s (%s)", msg.GetMessageName(), msg.GetCrcString())
251 // LookupByID looks up the message registered in msgMap under the given message ID; an error is returned for an unknown ID.
252 func (c *Connection) LookupByID(msgID uint16) (api.Message, error) {
254 return nil, errors.New("nil connection passed in")
257 if msg, ok := c.msgMap[msgID]; ok {
261 return nil, fmt.Errorf("unknown message ID: %d", msgID)
264 // retrieveMessageIDs retrieves IDs for all registered messages from the VPP adapter and stores them in the msgIDs and msgMap lookup maps.
265 func (c *Connection) retrieveMessageIDs() (err error) {
// addMsg records msg in both lookup maps: name+CRC -> ID and ID -> message
268 var addMsg = func(msgID uint16, msg api.Message) {
269 c.msgIDs[getMsgNameWithCrc(msg)] = msgID
270 c.msgMap[msgID] = msg
273 msgs := api.GetAllMessages()
275 for name, msg := range msgs {
// note: this err is local to the loop body and shadows the named result
276 msgID, err := c.vppClient.GetMsgID(msg.GetMessageName(), msg.GetCrcString())
283 if msg.GetMessageName() == msgControlPing.GetMessageName() {
// replace msgControlPing with a new zero value of the type msg points to
285 msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
286 } else if msg.GetMessageName() == msgControlPingReply.GetMessageName() {
287 c.pingReplyID = msgID
// replace msgControlPingReply with a new zero value of the type msg points to
288 msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
292 log.Debugf("message %q (%s) has ID: %d", name, getMsgNameWithCrc(msg), msgID)
296 log.Debugf("retrieving %d message IDs took %s", len(msgs), time.Since(t))
298 // fallback for control ping when vpe package is not imported:
// an ID of 0 means the loop above did not set it, so ask the adapter directly
299 if c.pingReqID == 0 {
300 c.pingReqID, err = c.vppClient.GetMsgID(msgControlPing.GetMessageName(), msgControlPing.GetCrcString())
304 addMsg(c.pingReqID, msgControlPing)
306 if c.pingReplyID == 0 {
307 c.pingReplyID, err = c.vppClient.GetMsgID(msgControlPingReply.GetMessageName(), msgControlPingReply.GetCrcString())
311 addMsg(c.pingReplyID, msgControlPingReply)
317 // disconnectVPP disconnects from VPP in case it is connected.
318 func (c *Connection) disconnectVPP() {
// the compare-and-swap clears the connected flag, so the adapter is
// disconnected only by the caller that flips it from 1 to 0
319 if atomic.CompareAndSwapUint32(&c.vppConnected, 1, 0) {
320 c.vppClient.Disconnect()
324 // connectLoop attempts to connect to VPP until it succeeds, sending a Connected event on connChan on success.
325 // Then it continues with healthCheckLoop, which calls back into connectLoop after a disconnect.
326 func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
327 // loop until connected
329 if err := c.vppClient.WaitReady(); err != nil {
330 log.Warnf("wait ready failed: %v", err)
332 if err := c.connectVPP(); err == nil {
333 // signal connected event
334 connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected}
337 log.Errorf("connecting to VPP failed: %v", err)
// pause for a second after a failed attempt
338 time.Sleep(time.Second)
342 // we are now connected, continue with health check loop
343 c.healthCheckLoop(connChan)
346 // healthCheckLoop checks whether the connection to VPP is alive by periodically sending control ping
347 // probes. In case of disconnect, it sends a Disconnected event on connChan, continues with connectLoop and tries to reconnect.
348 func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
349 // create a separate API channel for health check probes (request and reply buffers of size 1)
350 ch, err := c.newAPIChannel(1, 1)
352 log.Error("Failed to create health check API channel, health check will be disabled:", err)
357 sinceLastReply time.Duration
361 // send health check probes until an error or timeout occurs
363 // sleep until next health check probe period
364 time.Sleep(HealthCheckProbeInterval)
366 if atomic.LoadUint32(&c.vppConnected) == 0 {
367 // Disconnect has been called in the meantime, leave the health check / reconnect loop
368 log.Debug("Disconnected on request, exiting health check loop.")
372 // try draining probe replies from previous request before sending next one
375 log.Debug("drained old probe reply from reply channel")
379 // send the control ping request
380 ch.reqChan <- &vppRequest{msg: msgControlPing}
383 // expect response within timeout period
385 case vppReply := <-ch.replyChan:
388 case <-time.After(HealthCheckReplyTimeout):
389 err = ErrProbeTimeout
391 // check if time since last reply from any other
392 // channel is less than health check reply timeout
393 c.lastReplyLock.Lock()
394 sinceLastReply = time.Since(c.lastReply)
395 c.lastReplyLock.Unlock()
397 if sinceLastReply < HealthCheckReplyTimeout {
398 log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply)
405 if err == ErrProbeTimeout {
407 log.Warnf("VPP health check probe timed out after %v (%d. timeout)", HealthCheckReplyTimeout, failedChecks)
408 if failedChecks > HealthCheckThreshold {
409 // in case of exceeded failed check threshold, assume VPP disconnected
410 log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", HealthCheckThreshold)
411 connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
414 } else if err != nil {
415 // in case of error, assume VPP disconnected
416 log.Errorf("VPP health check probe failed: %v", err)
417 connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: err}
419 } else if failedChecks > 0 {
420 // in case of success after failed checks, clear failed check counter
422 log.Infof("VPP health check probe OK")
430 // we are now disconnected, start connect loop
431 c.connectLoop(connChan)
// getMsgNameWithCrc returns the message name and CRC string of x joined by an
// underscore; this is the key format used for the msgIDs map.
434 func getMsgNameWithCrc(x api.Message) string {
435 return x.GetMessageName() + "_" + x.GetCrcString()