1 // Copyright (c) 2017 Cisco and/or its affiliates.
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
15 //go:generate binapi-generator --input-dir=bin_api --output-dir=bin_api
26 logger "github.com/Sirupsen/logrus"
28 "git.fd.io/govpp.git/adapter"
29 "git.fd.io/govpp.git/api"
30 "git.fd.io/govpp.git/core/bin_api/vpe"
31 "github.com/fsnotify/fsnotify"
35 requestChannelBufSize = 100 // default size of the request channel buffers
36 replyChannelBufSize = 100 // default size of the reply channel buffers
37 notificationChannelBufSize = 100 // default size of the notification channel buffers
41 healthCheckProbeInterval = time.Second * 1 // default health check probe interval
42 healthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check probe
45 // ConnectionState holds the current state of the connection to VPP.
46 type ConnectionState int
49 // Connected connection state means that the connection to VPP has been successfully established.
50 Connected ConnectionState = iota
52 // Disconnected connection state means that the connection to VPP has been lost.
57 // watchedFolder is a folder where vpp's shared memory is supposed to be created.
58 // File system events are monitored in this folder.
59 watchedFolder = "/dev/shm/"
60 // watchedFile is a name of the file in the watchedFolder. Once the file is present
61 // the vpp is ready to accept a new connection.
62 watchedFile = watchedFolder + "vpe-api"
65 // ConnectionEvent is a notification about change in the VPP connection state.
66 type ConnectionEvent struct {
67 // Timestamp holds the time when the event has been generated.
70 // State holds the new state of the connection to VPP at the time when the event has been generated.
74 // Connection represents a shared memory connection to VPP via vppAdapter.
75 type Connection struct {
76 vpp adapter.VppAdapter // VPP adapter
77 connected uint32 // non-zero if the adapter is connected to VPP
78 codec *MsgCodec // message codec
80 msgIDs map[string]uint16 // map of message IDs indexed by message name + CRC
81 msgIDsLock sync.RWMutex // lock for the message IDs map
83 channels map[uint32]*api.Channel // map of all API channels indexed by the channel ID
84 channelsLock sync.RWMutex // lock for the channels map
86 notifSubscriptions map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID
87 notifSubscriptionsLock sync.RWMutex // lock for the subscriptions map
89 maxChannelID uint32 // maximum used client ID
90 pingReqID uint16 // ID if the ControlPing message
91 pingReplyID uint16 // ID of the ControlPingReply message
94 // channelMetadata contains core-local metadata of an API channel.
95 type channelMetadata struct {
96 id uint32 // channel ID
97 multipart uint32 // 1 if multipart request is being processed, 0 otherwise
101 log *logger.Logger // global logger
102 conn *Connection // global handle to the Connection (used in the message receive callback)
103 connLock sync.RWMutex // lock for the global connection
106 // init initializes global logger, which logs debug level messages to stdout.
110 log.Level = logger.DebugLevel
113 // SetLogger sets global logger to provided one.
114 func SetLogger(l *logger.Logger) {
118 // Connect connects to VPP using specified VPP adapter and returns the connection handle.
119 // This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed.
120 func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
121 // create new connection handle
122 c, err := newConnection(vppAdapter)
127 // blocking attempt to connect to VPP
136 // AsyncConnect asynchronously connects to VPP using specified VPP adapter and returns the connection handle
137 // and ConnectionState channel. This call does not block until connection is established, it
138 // returns immediately. The caller is supposed to watch the returned ConnectionState channel for
139 // Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
140 func AsyncConnect(vppAdapter adapter.VppAdapter) (*Connection, chan ConnectionEvent, error) {
141 // create new connection handle
142 c, err := newConnection(vppAdapter)
147 // asynchronously attempt to connect to VPP
148 connChan := make(chan ConnectionEvent, notificationChannelBufSize)
149 go c.connectLoop(connChan)
151 return conn, connChan, nil
154 // Disconnect disconnects from VPP and releases all connection-related resources.
155 func (c *Connection) Disconnect() {
160 defer connLock.Unlock()
162 if c != nil && c.vpp != nil {
168 // newConnection returns new connection handle.
169 func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) {
171 defer connLock.Unlock()
174 return nil, errors.New("only one connection per process is supported")
177 conn = &Connection{vpp: vppAdapter, codec: &MsgCodec{}}
178 conn.channels = make(map[uint32]*api.Channel)
179 conn.msgIDs = make(map[string]uint16)
180 conn.notifSubscriptions = make(map[uint16][]*api.NotifSubscription)
182 conn.vpp.SetMsgCallback(msgCallback)
186 // connectVPP performs one blocking attempt to connect to VPP.
187 func (c *Connection) connectVPP() error {
188 log.Debug("Connecting to VPP...")
191 err := c.vpp.Connect()
197 // store connected state
198 atomic.StoreUint32(&c.connected, 1)
200 // store control ping IDs
201 c.pingReqID, _ = c.GetMessageID(&vpe.ControlPing{})
202 c.pingReplyID, _ = c.GetMessageID(&vpe.ControlPingReply{})
204 log.Info("Connected to VPP.")
208 // disconnectVPP disconnects from VPP in case it is connected.
209 func (c *Connection) disconnectVPP() {
210 if atomic.CompareAndSwapUint32(&c.connected, 1, 0) {
215 func fileExists(name string) bool {
216 if _, err := os.Stat(name); err != nil {
217 if os.IsNotExist(err) {
224 // waitForVpp blocks until shared memory for sending bin api calls
225 // is present on the file system.
226 func waitForVpp() error {
227 watcher, err := fsnotify.NewWatcher()
231 defer watcher.Close()
233 err = watcher.Add(watchedFolder)
238 if fileExists(watchedFile) {
243 ev := <-watcher.Events
244 if ev.Name == watchedFile && (ev.Op&fsnotify.Create) == fsnotify.Create {
251 // connectLoop attempts to connect to VPP until it succeeds.
252 // Then it continues with healthCheckLoop.
253 func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
254 // loop until connected
257 err := c.connectVPP()
259 // signal connected event
260 connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected}
265 // we are now connected, continue with health check loop
266 c.healthCheckLoop(connChan)
269 // healthCheckLoop checks whether connection to VPP is alive. In case of disconnect,
270 // it continues with connectLoop and tries to reconnect.
271 func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
272 // create a separate API channel for health check probes
273 ch, err := conn.NewAPIChannel()
275 log.Error("Error by creating health check API channel, health check will be disabled:", err)
279 // send health check probes until an error occurs
281 // wait for healthCheckProbeInterval
282 <-time.After(healthCheckProbeInterval)
284 if atomic.LoadUint32(&c.connected) == 0 {
285 // Disconnect has been called in the meantime, return the healthcheck - reconnect loop
286 log.Debug("Disconnected on request, exiting health check loop.")
290 // send the control ping
291 ch.ReqChan <- &api.VppRequest{Message: &vpe.ControlPing{}}
293 // expect response within timeout period
295 case vppReply := <-ch.ReplyChan:
297 case <-time.After(healthCheckReplyTimeout):
298 err = errors.New("probe reply not received within the timeout period")
301 // in case of error, break & disconnect
303 log.Errorf("VPP health check failed: %v", err)
304 // signal disconnected event via channel
305 connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
314 // we are now disconnected, start connect loop
315 c.connectLoop(connChan)
318 // NewAPIChannel returns a new API channel for communication with VPP via govpp core.
319 // It uses default buffer sizes for the request and reply Go channels.
320 func (c *Connection) NewAPIChannel() (*api.Channel, error) {
322 return nil, errors.New("nil connection passed in")
324 return c.NewAPIChannelBuffered(requestChannelBufSize, replyChannelBufSize)
327 // NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
328 // It allows to specify custom buffer sizes for the request and reply Go channels.
329 func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (*api.Channel, error) {
331 return nil, errors.New("nil connection passed in")
333 chID := atomic.AddUint32(&c.maxChannelID, 1)
334 chMeta := &channelMetadata{id: chID}
336 ch := api.NewChannelInternal(chMeta)
337 ch.MsgDecoder = c.codec
340 // create the communication channels
341 ch.ReqChan = make(chan *api.VppRequest, reqChanBufSize)
342 ch.ReplyChan = make(chan *api.VppReply, replyChanBufSize)
343 ch.NotifSubsChan = make(chan *api.NotifSubscribeRequest, reqChanBufSize)
344 ch.NotifSubsReplyChan = make(chan error, replyChanBufSize)
346 // store API channel within the client
347 c.channelsLock.Lock()
348 c.channels[chID] = ch
349 c.channelsLock.Unlock()
351 // start watching on the request channel
352 go c.watchRequests(ch, chMeta)
357 // releaseAPIChannel releases API channel that needs to be closed.
358 func (c *Connection) releaseAPIChannel(ch *api.Channel, chMeta *channelMetadata) {
359 log.WithFields(logger.Fields{
360 "context": chMeta.id,
361 }).Debug("API channel closed.")
363 // delete the channel from channels map
364 c.channelsLock.Lock()
365 delete(c.channels, chMeta.id)
366 c.channelsLock.Unlock()