1 // Copyright (c) 2017 Cisco and/or its affiliates.
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
15 //go:generate binapi-generator --input-dir=bin_api --output-dir=bin_api
26 logger "github.com/sirupsen/logrus"
28 "git.fd.io/govpp.git/adapter"
29 "git.fd.io/govpp.git/api"
30 "git.fd.io/govpp.git/core/bin_api/vpe"
34 msgControlPing api.Message = &vpe.ControlPing{}
35 msgControlPingReply api.Message = &vpe.ControlPingReply{}
39 requestChannelBufSize = 100 // default size of the request channel buffers
40 replyChannelBufSize = 100 // default size of the reply channel buffers
41 notificationChannelBufSize = 100 // default size of the notification channel buffers
45 healthCheckProbeInterval = time.Second * 1 // default health check probe interval
46 healthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check probe
47 healthCheckThreshold = 1 // number of failed healthProbe until the error is reported
50 // ConnectionState holds the current state of the connection to VPP.
51 type ConnectionState int
54 // Connected connection state means that the connection to VPP has been successfully established.
55 Connected ConnectionState = iota
57 // Disconnected connection state means that the connection to VPP has been lost.
61 // ConnectionEvent is a notification about change in the VPP connection state.
62 type ConnectionEvent struct {
63 // Timestamp holds the time when the event has been generated.
66 // State holds the new state of the connection to VPP at the time when the event has been generated.
70 // Connection represents a shared memory connection to VPP via vppAdapter.
71 type Connection struct {
72 vpp adapter.VppAdapter // VPP adapter
73 connected uint32 // non-zero if the adapter is connected to VPP
74 codec *MsgCodec // message codec
76 msgIDsLock sync.RWMutex // lock for the message IDs map
77 msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC
79 channelsLock sync.RWMutex // lock for the channels map
80 channels map[uint32]*api.Channel // map of all API channels indexed by the channel ID
82 notifSubscriptionsLock sync.RWMutex // lock for the subscriptions map
83 notifSubscriptions map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID
85 maxChannelID uint32 // maximum used client ID
86 pingReqID uint16 // ID if the ControlPing message
87 pingReplyID uint16 // ID of the ControlPingReply message
90 // channelMetadata contains core-local metadata of an API channel.
91 type channelMetadata struct {
92 id uint32 // channel ID
93 multipart uint32 // 1 if multipart request is being processed, 0 otherwise
97 log *logger.Logger // global logger
98 conn *Connection // global handle to the Connection (used in the message receive callback)
99 connLock sync.RWMutex // lock for the global connection
102 // init initializes global logger, which logs debug level messages to stdout.
106 log.Level = logger.DebugLevel
109 // SetLogger sets global logger to provided one.
110 func SetLogger(l *logger.Logger) {
114 // SetHealthCheckProbeInterval sets health check probe interval.
115 // Beware: Function is not thread-safe. It is recommended to setup this parameter
116 // before connecting to vpp.
117 func SetHealthCheckProbeInterval(interval time.Duration) {
118 healthCheckProbeInterval = interval
121 // SetHealthCheckReplyTimeout sets timeout for reply to a health check probe.
122 // If reply arrives after the timeout, check is considered as failed.
123 // Beware: Function is not thread-safe. It is recommended to setup this parameter
124 // before connecting to vpp.
125 func SetHealthCheckReplyTimeout(timeout time.Duration) {
126 healthCheckReplyTimeout = timeout
129 // SetHealthCheckThreshold sets the number of failed healthProbe checks until the error is reported.
130 // Beware: Function is not thread-safe. It is recommended to setup this parameter
131 // before connecting to vpp.
132 func SetHealthCheckThreshold(threshold int) {
133 healthCheckThreshold = threshold
136 // SetControlPingMessages sets the messages for ControlPing and ControlPingReply
137 func SetControlPingMessages(controPing, controlPingReply api.Message) {
138 msgControlPing = controPing
139 msgControlPingReply = controlPingReply
142 // Connect connects to VPP using specified VPP adapter and returns the connection handle.
143 // This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed.
144 func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
145 // create new connection handle
146 c, err := newConnection(vppAdapter)
151 // blocking attempt to connect to VPP
160 // AsyncConnect asynchronously connects to VPP using specified VPP adapter and returns the connection handle
161 // and ConnectionState channel. This call does not block until connection is established, it
162 // returns immediately. The caller is supposed to watch the returned ConnectionState channel for
163 // Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
164 func AsyncConnect(vppAdapter adapter.VppAdapter) (*Connection, chan ConnectionEvent, error) {
165 // create new connection handle
166 c, err := newConnection(vppAdapter)
171 // asynchronously attempt to connect to VPP
172 connChan := make(chan ConnectionEvent, notificationChannelBufSize)
173 go c.connectLoop(connChan)
175 return conn, connChan, nil
178 // Disconnect disconnects from VPP and releases all connection-related resources.
179 func (c *Connection) Disconnect() {
184 defer connLock.Unlock()
186 if c != nil && c.vpp != nil {
192 // newConnection returns new connection handle.
193 func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) {
195 defer connLock.Unlock()
198 return nil, errors.New("only one connection per process is supported")
204 channels: make(map[uint32]*api.Channel),
205 msgIDs: make(map[string]uint16),
206 notifSubscriptions: make(map[uint16][]*api.NotifSubscription),
209 conn.vpp.SetMsgCallback(msgCallback)
213 // connectVPP performs one blocking attempt to connect to VPP.
214 func (c *Connection) connectVPP() error {
215 log.Debug("Connecting to VPP...")
218 err := c.vpp.Connect()
224 // store control ping IDs
225 if c.pingReqID, err = c.GetMessageID(msgControlPing); err != nil {
229 if c.pingReplyID, err = c.GetMessageID(msgControlPingReply); err != nil {
234 // store connected state
235 atomic.StoreUint32(&c.connected, 1)
237 log.Info("Connected to VPP.")
241 // disconnectVPP disconnects from VPP in case it is connected.
242 func (c *Connection) disconnectVPP() {
243 if atomic.CompareAndSwapUint32(&c.connected, 1, 0) {
248 // connectLoop attempts to connect to VPP until it succeeds.
249 // Then it continues with healthCheckLoop.
250 func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
251 // loop until connected
253 if err := c.vpp.WaitReady(); err != nil {
254 log.Warnf("wait ready failed: %v", err)
256 if err := c.connectVPP(); err == nil {
257 // signal connected event
258 connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected}
261 log.Errorf("connecting to VPP failed: %v", err)
262 time.Sleep(time.Second)
266 // we are now connected, continue with health check loop
267 c.healthCheckLoop(connChan)
270 // healthCheckLoop checks whether connection to VPP is alive. In case of disconnect,
271 // it continues with connectLoop and tries to reconnect.
272 func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
273 // create a separate API channel for health check probes
274 ch, err := conn.NewAPIChannel()
276 log.Error("Error by creating health check API channel, health check will be disabled:", err)
281 // send health check probes until an error occurs
283 // wait for healthCheckProbeInterval
284 <-time.After(healthCheckProbeInterval)
286 if atomic.LoadUint32(&c.connected) == 0 {
287 // Disconnect has been called in the meantime, return the healthcheck - reconnect loop
288 log.Debug("Disconnected on request, exiting health check loop.")
292 // send the control ping
293 ch.ReqChan <- &api.VppRequest{Message: msgControlPing}
295 // expect response within timeout period
297 case vppReply := <-ch.ReplyChan:
299 case <-time.After(healthCheckReplyTimeout):
300 err = errors.New("probe reply not received within the timeout period")
305 log.Warnf("VPP health check failed (%d. time): %v", failedChecks, err)
310 if failedChecks > healthCheckThreshold {
311 // in case of error, break & disconnect
312 log.Errorf("Number of VPP health check fails exceeded treshold (%d)", healthCheckThreshold, err)
313 // signal disconnected event via channel
314 connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
323 // we are now disconnected, start connect loop
324 c.connectLoop(connChan)
327 // NewAPIChannel returns a new API channel for communication with VPP via govpp core.
328 // It uses default buffer sizes for the request and reply Go channels.
329 func (c *Connection) NewAPIChannel() (*api.Channel, error) {
331 return nil, errors.New("nil connection passed in")
333 return c.NewAPIChannelBuffered(requestChannelBufSize, replyChannelBufSize)
336 // NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
337 // It allows to specify custom buffer sizes for the request and reply Go channels.
338 func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (*api.Channel, error) {
340 return nil, errors.New("nil connection passed in")
342 chID := atomic.AddUint32(&c.maxChannelID, 1)
343 chMeta := &channelMetadata{id: chID}
345 ch := api.NewChannelInternal(chMeta)
346 ch.MsgDecoder = c.codec
349 // create the communication channels
350 ch.ReqChan = make(chan *api.VppRequest, reqChanBufSize)
351 ch.ReplyChan = make(chan *api.VppReply, replyChanBufSize)
352 ch.NotifSubsChan = make(chan *api.NotifSubscribeRequest, reqChanBufSize)
353 ch.NotifSubsReplyChan = make(chan error, replyChanBufSize)
355 // store API channel within the client
356 c.channelsLock.Lock()
357 c.channels[chID] = ch
358 c.channelsLock.Unlock()
360 // start watching on the request channel
361 go c.watchRequests(ch, chMeta)
366 // releaseAPIChannel releases API channel that needs to be closed.
367 func (c *Connection) releaseAPIChannel(ch *api.Channel, chMeta *channelMetadata) {
368 log.WithFields(logger.Fields{
369 "context": chMeta.id,
370 }).Debug("API channel closed.")
372 // delete the channel from channels map
373 c.channelsLock.Lock()
374 delete(c.channels, chMeta.id)
375 c.channelsLock.Unlock()