1 // Copyright (c) 2017 Cisco and/or its affiliates.
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
15 //go:generate binapi-generator --input-dir=bin_api --output-dir=bin_api
package core

import (
	"errors"
	"os"
	"sync"
	"sync/atomic"
	"time"

	logger "github.com/sirupsen/logrus"

	"git.fd.io/govpp.git/adapter"
	"git.fd.io/govpp.git/api"
	"git.fd.io/govpp.git/codec"
	"git.fd.io/govpp.git/core/bin_api/vpe"
)
35 msgControlPing api.Message = &vpe.ControlPing{}
36 msgControlPingReply api.Message = &vpe.ControlPingReply{}
// Default buffer sizes of the Go channels created by this package.
const (
	requestChannelBufSize      = 100 // default size of the request channel buffers
	replyChannelBufSize        = 100 // default size of the reply channel buffers
	notificationChannelBufSize = 100 // default size of the notification channel buffers
)
// Health check tunables. They are plain package-level variables (not
// constants) so that the SetHealthCheck* functions can override them.
var (
	healthCheckProbeInterval = time.Second * 1        // default health check probe interval
	healthCheckReplyTimeout  = time.Millisecond * 100 // timeout for reply to a health check probe
	healthCheckThreshold     = 1                      // number of failed healthProbe until the error is reported
)
// ConnectionState holds the current state of the connection to VPP.
type ConnectionState int

const (
	// Connected connection state means that the connection to VPP has been successfully established.
	Connected ConnectionState = iota

	// Disconnected connection state means that the connection to VPP has been lost.
	Disconnected
)
62 // ConnectionEvent is a notification about change in the VPP connection state.
63 type ConnectionEvent struct {
64 // Timestamp holds the time when the event has been generated.
67 // State holds the new state of the connection to VPP at the time when the event has been generated.
71 // Connection represents a shared memory connection to VPP via vppAdapter.
72 type Connection struct {
73 vpp adapter.VppAdapter // VPP adapter
74 connected uint32 // non-zero if the adapter is connected to VPP
75 codec *codec.MsgCodec // message codec
77 msgIDsLock sync.RWMutex // lock for the message IDs map
78 msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC
80 channelsLock sync.RWMutex // lock for the channels map
81 channels map[uint16]*channel // map of all API channels indexed by the channel ID
83 notifSubscriptionsLock sync.RWMutex // lock for the subscriptions map
84 notifSubscriptions map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID
86 maxChannelID uint32 // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations)
87 pingReqID uint16 // ID if the ControlPing message
88 pingReplyID uint16 // ID of the ControlPingReply message
90 lastReplyLock sync.Mutex // lock for the last reply
91 lastReply time.Time // time of the last received reply from VPP
95 log *logger.Logger // global logger
96 conn *Connection // global handle to the Connection (used in the message receive callback)
97 connLock sync.RWMutex // lock for the global connection
100 // init initializes global logger, which logs debug level messages to stdout.
104 log.Level = logger.DebugLevel
107 // SetLogger sets global logger to provided one.
108 func SetLogger(l *logger.Logger) {
112 // SetHealthCheckProbeInterval sets health check probe interval.
113 // Beware: Function is not thread-safe. It is recommended to setup this parameter
114 // before connecting to vpp.
115 func SetHealthCheckProbeInterval(interval time.Duration) {
116 healthCheckProbeInterval = interval
119 // SetHealthCheckReplyTimeout sets timeout for reply to a health check probe.
120 // If reply arrives after the timeout, check is considered as failed.
121 // Beware: Function is not thread-safe. It is recommended to setup this parameter
122 // before connecting to vpp.
123 func SetHealthCheckReplyTimeout(timeout time.Duration) {
124 healthCheckReplyTimeout = timeout
127 // SetHealthCheckThreshold sets the number of failed healthProbe checks until the error is reported.
128 // Beware: Function is not thread-safe. It is recommended to setup this parameter
129 // before connecting to vpp.
130 func SetHealthCheckThreshold(threshold int) {
131 healthCheckThreshold = threshold
134 // SetControlPingMessages sets the messages for ControlPing and ControlPingReply
135 func SetControlPingMessages(controPing, controlPingReply api.Message) {
136 msgControlPing = controPing
137 msgControlPingReply = controlPingReply
140 // Connect connects to VPP using specified VPP adapter and returns the connection handle.
141 // This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed.
142 func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
143 // create new connection handle
144 c, err := newConnection(vppAdapter)
149 // blocking attempt to connect to VPP
158 // AsyncConnect asynchronously connects to VPP using specified VPP adapter and returns the connection handle
159 // and ConnectionState channel. This call does not block until connection is established, it
160 // returns immediately. The caller is supposed to watch the returned ConnectionState channel for
161 // Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
162 func AsyncConnect(vppAdapter adapter.VppAdapter) (*Connection, chan ConnectionEvent, error) {
163 // create new connection handle
164 c, err := newConnection(vppAdapter)
169 // asynchronously attempt to connect to VPP
170 connChan := make(chan ConnectionEvent, notificationChannelBufSize)
171 go c.connectLoop(connChan)
173 return conn, connChan, nil
176 // Disconnect disconnects from VPP and releases all connection-related resources.
177 func (c *Connection) Disconnect() {
182 defer connLock.Unlock()
184 if c != nil && c.vpp != nil {
190 // newConnection returns new connection handle.
191 func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) {
193 defer connLock.Unlock()
196 return nil, errors.New("only one connection per process is supported")
201 codec: &codec.MsgCodec{},
202 channels: make(map[uint16]*channel),
203 msgIDs: make(map[string]uint16),
204 notifSubscriptions: make(map[uint16][]*api.NotifSubscription),
207 conn.vpp.SetMsgCallback(msgCallback)
211 // connectVPP performs one blocking attempt to connect to VPP.
212 func (c *Connection) connectVPP() error {
213 log.Debug("Connecting to VPP...")
216 err := c.vpp.Connect()
222 // store control ping IDs
223 if c.pingReqID, err = c.GetMessageID(msgControlPing); err != nil {
227 if c.pingReplyID, err = c.GetMessageID(msgControlPingReply); err != nil {
232 // store connected state
233 atomic.StoreUint32(&c.connected, 1)
235 log.Info("Connected to VPP.")
239 // disconnectVPP disconnects from VPP in case it is connected.
240 func (c *Connection) disconnectVPP() {
241 if atomic.CompareAndSwapUint32(&c.connected, 1, 0) {
246 // connectLoop attempts to connect to VPP until it succeeds.
247 // Then it continues with healthCheckLoop.
248 func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
249 // loop until connected
251 if err := c.vpp.WaitReady(); err != nil {
252 log.Warnf("wait ready failed: %v", err)
254 if err := c.connectVPP(); err == nil {
255 // signal connected event
256 connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected}
259 log.Errorf("connecting to VPP failed: %v", err)
260 time.Sleep(time.Second)
264 // we are now connected, continue with health check loop
265 c.healthCheckLoop(connChan)
268 // healthCheckLoop checks whether connection to VPP is alive. In case of disconnect,
269 // it continues with connectLoop and tries to reconnect.
270 func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
271 // create a separate API channel for health check probes
272 ch, err := conn.newAPIChannelBuffered(1, 1)
274 log.Error("Failed to create health check API channel, health check will be disabled:", err)
278 var sinceLastReply time.Duration
281 // send health check probes until an error or timeout occurs
283 // sleep until next health check probe period
284 time.Sleep(healthCheckProbeInterval)
286 if atomic.LoadUint32(&c.connected) == 0 {
287 // Disconnect has been called in the meantime, return the healthcheck - reconnect loop
288 log.Debug("Disconnected on request, exiting health check loop.")
292 // try draining probe replies from previous request before sending next one
295 log.Debug("drained old probe reply from reply channel")
299 // send the control ping request
300 ch.reqChan <- &api.VppRequest{Message: msgControlPing}
303 // expect response within timeout period
305 case vppReply := <-ch.replyChan:
308 case <-time.After(healthCheckReplyTimeout):
309 err = ErrProbeTimeout
311 // check if time since last reply from any other
312 // channel is less than health check reply timeout
313 conn.lastReplyLock.Lock()
314 sinceLastReply = time.Since(c.lastReply)
315 conn.lastReplyLock.Unlock()
317 if sinceLastReply < healthCheckReplyTimeout {
318 log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply)
325 if err == ErrProbeTimeout {
327 log.Warnf("VPP health check probe timed out after %v (%d. timeout)", healthCheckReplyTimeout, failedChecks)
328 if failedChecks > healthCheckThreshold {
329 // in case of exceeded treshold disconnect
330 log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", healthCheckThreshold)
331 connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
334 } else if err != nil {
335 // in case of error disconnect
336 log.Errorf("VPP health check probe failed: %v", err)
337 connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
339 } else if failedChecks > 0 {
341 log.Infof("VPP health check probe OK")
349 // we are now disconnected, start connect loop
350 c.connectLoop(connChan)
353 func (c *Connection) NewAPIChannel() (api.Channel, error) {
354 return c.newAPIChannelBuffered(requestChannelBufSize, replyChannelBufSize)
357 func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (api.Channel, error) {
358 return c.newAPIChannelBuffered(reqChanBufSize, replyChanBufSize)
361 // NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
362 // It allows to specify custom buffer sizes for the request and reply Go channels.
363 func (c *Connection) newAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (*channel, error) {
365 return nil, errors.New("nil connection passed in")
368 chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
371 replyTimeout: defaultReplyTimeout,
373 ch.msgDecoder = c.codec
376 // create the communication channels
377 ch.reqChan = make(chan *api.VppRequest, reqChanBufSize)
378 ch.replyChan = make(chan *api.VppReply, replyChanBufSize)
379 ch.notifSubsChan = make(chan *api.NotifSubscribeRequest, reqChanBufSize)
380 ch.notifSubsReplyChan = make(chan error, replyChanBufSize)
382 // store API channel within the client
383 c.channelsLock.Lock()
384 c.channels[chID] = ch
385 c.channelsLock.Unlock()
387 // start watching on the request channel
388 go c.watchRequests(ch)
393 // releaseAPIChannel releases API channel that needs to be closed.
394 func (c *Connection) releaseAPIChannel(ch *channel) {
395 log.WithFields(logger.Fields{
397 }).Debug("API channel closed.")
399 // delete the channel from channels map
400 c.channelsLock.Lock()
401 delete(c.channels, ch.id)
402 c.channelsLock.Unlock()