// Copyright (c) 2017 Cisco and/or its affiliates.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import (
	"errors"
	"fmt"
	"reflect"
	"sync"
	"sync/atomic"
	"time"

	logger "github.com/sirupsen/logrus"

	"git.fd.io/govpp.git/adapter"
	"git.fd.io/govpp.git/api"
	"git.fd.io/govpp.git/codec"
)
// Default sizes of internal Go channel buffers and the default reply timeout.
const (
	// default size of the request channel buffer
	requestChannelBufSize = 100
	// default size of the reply channel buffer
	replyChannelBufSize = 100
	// default size of the notification channel buffer
	notificationChannelBufSize = 100

	// defaultReplyTimeout is the default timeout for replies from VPP,
	// can be changed with SetReplyTimeout.
	defaultReplyTimeout = 1 * time.Second
)
// Health-check tunables; adjust them with the SetHealthCheck* functions.
var (
	// healthCheckInterval is the default interval between two health check probes.
	healthCheckInterval = 1 * time.Second
	// healthCheckReplyTimeout is the timeout for the reply to a health check probe.
	healthCheckReplyTimeout = 100 * time.Millisecond
	// healthCheckThreshold is the number of failed health checks until the error is reported.
	healthCheckThreshold = 1
)
46 // SetHealthCheckProbeInterval sets health check probe interval.
47 // Beware: Function is not thread-safe. It is recommended to setup this parameter
48 // before connecting to vpp.
49 func SetHealthCheckProbeInterval(interval time.Duration) {
50 healthCheckInterval = interval
53 // SetHealthCheckReplyTimeout sets timeout for reply to a health check probe.
54 // If reply arrives after the timeout, check is considered as failed.
55 // Beware: Function is not thread-safe. It is recommended to setup this parameter
56 // before connecting to vpp.
57 func SetHealthCheckReplyTimeout(timeout time.Duration) {
58 healthCheckReplyTimeout = timeout
61 // SetHealthCheckThreshold sets the number of failed healthProbe checks until the error is reported.
62 // Beware: Function is not thread-safe. It is recommended to setup this parameter
63 // before connecting to vpp.
64 func SetHealthCheckThreshold(threshold int) {
65 healthCheckThreshold = threshold
// ConnectionState describes whether the connection to VPP is currently up.
type ConnectionState int

// Possible values of ConnectionState.
const (
	// Connected: the connection has been successfully established.
	Connected ConnectionState = 0
	// Disconnected: the connection has been dropped.
	Disconnected ConnectionState = 1
)
79 // ConnectionEvent is a notification about change in the VPP connection state.
80 type ConnectionEvent struct {
81 // Timestamp holds the time when the event has been created.
84 // State holds the new state of the connection at the time when the event has been created.
87 // Error holds error if any encountered.
92 connLock sync.RWMutex // lock for the global connection
93 conn *Connection // global handle to the Connection (used in the message receive callback)
96 // Connection represents a shared memory connection to VPP via vppAdapter.
97 type Connection struct {
98 vpp adapter.VppAdapter // VPP adapter
99 connected uint32 // non-zero if the adapter is connected to VPP
101 codec *codec.MsgCodec // message codec
102 msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC
103 msgMap map[uint16]api.Message // map of messages indexed by message ID
105 maxChannelID uint32 // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations)
106 channelsLock sync.RWMutex // lock for the channels map
107 channels map[uint16]*channel // map of all API channels indexed by the channel ID
109 notifSubscriptionsLock sync.RWMutex // lock for the subscriptions map
110 notifSubscriptions map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID
112 pingReqID uint16 // ID if the ControlPing message
113 pingReplyID uint16 // ID of the ControlPingReply message
115 lastReplyLock sync.Mutex // lock for the last reply
116 lastReply time.Time // time of the last received reply from VPP
119 // Connect connects to VPP using specified VPP adapter and returns the connection handle.
120 // This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed.
121 func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
122 // create new connection handle
123 c, err := newConnection(vppAdapter)
128 // blocking attempt to connect to VPP
137 // AsyncConnect asynchronously connects to VPP using specified VPP adapter and returns the connection handle
138 // and ConnectionState channel. This call does not block until connection is established, it
139 // returns immediately. The caller is supposed to watch the returned ConnectionState channel for
140 // Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
141 func AsyncConnect(vppAdapter adapter.VppAdapter) (*Connection, chan ConnectionEvent, error) {
142 // create new connection handle
143 c, err := newConnection(vppAdapter)
148 // asynchronously attempt to connect to VPP
149 connChan := make(chan ConnectionEvent, notificationChannelBufSize)
150 go c.connectLoop(connChan)
152 return c, connChan, nil
155 // Disconnect disconnects from VPP and releases all connection-related resources.
156 func (c *Connection) Disconnect() {
162 defer connLock.Unlock()
170 // newConnection returns new connection handle.
171 func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) {
173 defer connLock.Unlock()
176 return nil, errors.New("only one connection per process is supported")
181 codec: &codec.MsgCodec{},
182 channels: make(map[uint16]*channel),
183 msgIDs: make(map[string]uint16),
184 msgMap: make(map[uint16]api.Message),
185 notifSubscriptions: make(map[uint16][]*api.NotifSubscription),
187 conn.vpp.SetMsgCallback(conn.msgCallback)
192 // connectVPP performs blocking attempt to connect to VPP.
193 func (c *Connection) connectVPP() error {
194 log.Debug("Connecting to VPP..")
197 if err := c.vpp.Connect(); err != nil {
201 log.Debugf("Connected to VPP.")
203 if err := c.retrieveMessageIDs(); err != nil {
205 return fmt.Errorf("VPP is incompatible: %v", err)
208 // store connected state
209 atomic.StoreUint32(&c.connected, 1)
214 func getMsgNameWithCrc(x api.Message) string {
215 return x.GetMessageName() + "_" + x.GetCrcString()
218 // retrieveMessageIDs retrieves IDs for all registered messages and stores them in map
219 func (c *Connection) retrieveMessageIDs() (err error) {
222 var addMsg = func(msgID uint16, msg api.Message) {
223 c.msgIDs[getMsgNameWithCrc(msg)] = msgID
224 c.msgMap[msgID] = msg
227 msgs := api.GetAllMessages()
229 for name, msg := range msgs {
230 msgID, err := c.vpp.GetMsgID(msg.GetMessageName(), msg.GetCrcString())
237 if msg.GetMessageName() == msgControlPing.GetMessageName() {
239 msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
240 } else if msg.GetMessageName() == msgControlPingReply.GetMessageName() {
241 c.pingReplyID = msgID
242 msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
246 log.Debugf("message %q (%s) has ID: %d", name, getMsgNameWithCrc(msg), msgID)
250 log.Debugf("retrieving %d message IDs took %s", len(msgs), time.Since(t))
252 // fallback for control ping when vpe package is not imported
253 if c.pingReqID == 0 {
254 c.pingReqID, err = c.vpp.GetMsgID(msgControlPing.GetMessageName(), msgControlPing.GetCrcString())
258 addMsg(c.pingReqID, msgControlPing)
260 if c.pingReplyID == 0 {
261 c.pingReplyID, err = c.vpp.GetMsgID(msgControlPingReply.GetMessageName(), msgControlPingReply.GetCrcString())
265 addMsg(c.pingReplyID, msgControlPingReply)
271 // GetMessageID returns message identifier of given API message.
272 func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
274 return 0, errors.New("nil connection passed in")
277 if msgID, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok {
281 return 0, fmt.Errorf("unknown message: %s (%s)", msg.GetMessageName(), msg.GetCrcString())
284 // LookupByID looks up message name and crc by ID.
285 func (c *Connection) LookupByID(msgID uint16) (api.Message, error) {
287 return nil, errors.New("nil connection passed in")
290 if msg, ok := c.msgMap[msgID]; ok {
294 return nil, fmt.Errorf("unknown message ID: %d", msgID)
297 // disconnectVPP disconnects from VPP in case it is connected.
298 func (c *Connection) disconnectVPP() {
299 if atomic.CompareAndSwapUint32(&c.connected, 1, 0) {
304 // connectLoop attempts to connect to VPP until it succeeds.
305 // Then it continues with healthCheckLoop.
306 func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
307 // loop until connected
309 if err := c.vpp.WaitReady(); err != nil {
310 log.Warnf("wait ready failed: %v", err)
312 if err := c.connectVPP(); err == nil {
313 // signal connected event
314 connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected}
317 log.Errorf("connecting to VPP failed: %v", err)
318 time.Sleep(time.Second)
322 // we are now connected, continue with health check loop
323 c.healthCheckLoop(connChan)
326 // healthCheckLoop checks whether connection to VPP is alive. In case of disconnect,
327 // it continues with connectLoop and tries to reconnect.
328 func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
329 // create a separate API channel for health check probes
330 ch, err := c.newAPIChannel(1, 1)
332 log.Error("Failed to create health check API channel, health check will be disabled:", err)
337 sinceLastReply time.Duration
341 // send health check probes until an error or timeout occurs
343 // sleep until next health check probe period
344 time.Sleep(healthCheckInterval)
346 if atomic.LoadUint32(&c.connected) == 0 {
347 // Disconnect has been called in the meantime, return the healthcheck - reconnect loop
348 log.Debug("Disconnected on request, exiting health check loop.")
352 // try draining probe replies from previous request before sending next one
355 log.Debug("drained old probe reply from reply channel")
359 // send the control ping request
360 ch.reqChan <- &vppRequest{msg: msgControlPing}
363 // expect response within timeout period
365 case vppReply := <-ch.replyChan:
368 case <-time.After(healthCheckReplyTimeout):
369 err = ErrProbeTimeout
371 // check if time since last reply from any other
372 // channel is less than health check reply timeout
373 c.lastReplyLock.Lock()
374 sinceLastReply = time.Since(c.lastReply)
375 c.lastReplyLock.Unlock()
377 if sinceLastReply < healthCheckReplyTimeout {
378 log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply)
385 if err == ErrProbeTimeout {
387 log.Warnf("VPP health check probe timed out after %v (%d. timeout)", healthCheckReplyTimeout, failedChecks)
388 if failedChecks > healthCheckThreshold {
389 // in case of exceeded failed check treshold, assume VPP disconnected
390 log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", healthCheckThreshold)
391 connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
394 } else if err != nil {
395 // in case of error, assume VPP disconnected
396 log.Errorf("VPP health check probe failed: %v", err)
397 connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: err}
399 } else if failedChecks > 0 {
400 // in case of success after failed checks, clear failed check counter
402 log.Infof("VPP health check probe OK")
410 // we are now disconnected, start connect loop
411 c.connectLoop(connChan)
414 func (c *Connection) NewAPIChannel() (api.Channel, error) {
415 return c.newAPIChannel(requestChannelBufSize, replyChannelBufSize)
418 func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (api.Channel, error) {
419 return c.newAPIChannel(reqChanBufSize, replyChanBufSize)
422 // NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
423 // It allows to specify custom buffer sizes for the request and reply Go channels.
424 func (c *Connection) newAPIChannel(reqChanBufSize, replyChanBufSize int) (*channel, error) {
426 return nil, errors.New("nil connection passed in")
429 chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
432 replyTimeout: defaultReplyTimeout,
435 reqChan: make(chan *vppRequest, reqChanBufSize),
436 replyChan: make(chan *vppReply, replyChanBufSize),
437 notifSubsChan: make(chan *subscriptionRequest, reqChanBufSize),
438 notifSubsReplyChan: make(chan error, replyChanBufSize),
441 // store API channel within the client
442 c.channelsLock.Lock()
443 c.channels[chID] = ch
444 c.channelsLock.Unlock()
446 // start watching on the request channel
447 go c.watchRequests(ch)
452 // releaseAPIChannel releases API channel that needs to be closed.
453 func (c *Connection) releaseAPIChannel(ch *channel) {
454 log.WithFields(logger.Fields{
456 }).Debug("API channel released")
458 // delete the channel from channels map
459 c.channelsLock.Lock()
460 delete(c.channels, ch.id)
461 c.channelsLock.Unlock()