Wait until vpp is ready + Update vendor
[govpp.git] / core / core.go
1 // Copyright (c) 2017 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 //go:generate binapi-generator --input-dir=bin_api --output-dir=bin_api
16
17 package core
18
19 import (
20         "errors"
21         "os"
22         "sync"
23         "sync/atomic"
24         "time"
25
26         logger "github.com/Sirupsen/logrus"
27
28         "git.fd.io/govpp.git/adapter"
29         "git.fd.io/govpp.git/api"
30         "git.fd.io/govpp.git/core/bin_api/vpe"
31         "github.com/fsnotify/fsnotify"
32 )
33
34 const (
35         requestChannelBufSize      = 100 // default size of the request channel buffers
36         replyChannelBufSize        = 100 // default size of the reply channel buffers
37         notificationChannelBufSize = 100 // default size of the notification channel buffers
38 )
39
40 const (
41         healthCheckProbeInterval = time.Second * 1        // default health check probe interval
42         healthCheckReplyTimeout  = time.Millisecond * 100 // timeout for reply to a health check probe
43 )
44
45 // ConnectionState holds the current state of the connection to VPP.
46 type ConnectionState int
47
48 const (
49         // Connected connection state means that the connection to VPP has been successfully established.
50         Connected ConnectionState = iota
51
52         // Disconnected connection state means that the connection to VPP has been lost.
53         Disconnected = iota
54 )
55
56 const (
57         // watchedFolder is a folder where vpp's shared memory is supposed to be created.
58         // File system events are monitored in this folder.
59         watchedFolder = "/dev/shm/"
60         // watchedFile is a name of the file in the watchedFolder. Once the file is present
61         // the vpp is ready to accept a new connection.
62         watchedFile = watchedFolder + "vpe-api"
63 )
64
65 // ConnectionEvent is a notification about change in the VPP connection state.
66 type ConnectionEvent struct {
67         // Timestamp holds the time when the event has been generated.
68         Timestamp time.Time
69
70         // State holds the new state of the connection to VPP at the time when the event has been generated.
71         State ConnectionState
72 }
73
74 // Connection represents a shared memory connection to VPP via vppAdapter.
75 type Connection struct {
76         vpp       adapter.VppAdapter // VPP adapter
77         connected uint32             // non-zero if the adapter is connected to VPP
78         codec     *MsgCodec          // message codec
79
80         msgIDs     map[string]uint16 // map of message IDs indexed by message name + CRC
81         msgIDsLock sync.RWMutex      // lock for the message IDs map
82
83         channels     map[uint32]*api.Channel // map of all API channels indexed by the channel ID
84         channelsLock sync.RWMutex            // lock for the channels map
85
86         notifSubscriptions     map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID
87         notifSubscriptionsLock sync.RWMutex                        // lock for the subscriptions map
88
89         maxChannelID uint32 // maximum used client ID
90         pingReqID    uint16 // ID if the ControlPing message
91         pingReplyID  uint16 // ID of the ControlPingReply message
92 }
93
94 // channelMetadata contains core-local metadata of an API channel.
95 type channelMetadata struct {
96         id        uint32 // channel ID
97         multipart uint32 // 1 if multipart request is being processed, 0 otherwise
98 }
99
100 var (
101         log      *logger.Logger // global logger
102         conn     *Connection    // global handle to the Connection (used in the message receive callback)
103         connLock sync.RWMutex   // lock for the global connection
104 )
105
106 // init initializes global logger, which logs debug level messages to stdout.
107 func init() {
108         log = logger.New()
109         log.Out = os.Stdout
110         log.Level = logger.DebugLevel
111 }
112
113 // SetLogger sets global logger to provided one.
114 func SetLogger(l *logger.Logger) {
115         log = l
116 }
117
118 // Connect connects to VPP using specified VPP adapter and returns the connection handle.
119 // This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed.
120 func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
121         // create new connection handle
122         c, err := newConnection(vppAdapter)
123         if err != nil {
124                 return nil, err
125         }
126
127         // blocking attempt to connect to VPP
128         err = c.connectVPP()
129         if err != nil {
130                 return nil, err
131         }
132
133         return conn, nil
134 }
135
136 // AsyncConnect asynchronously connects to VPP using specified VPP adapter and returns the connection handle
137 // and ConnectionState channel. This call does not block until connection is established, it
138 // returns immediately. The caller is supposed to watch the returned ConnectionState channel for
139 // Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
140 func AsyncConnect(vppAdapter adapter.VppAdapter) (*Connection, chan ConnectionEvent, error) {
141         // create new connection handle
142         c, err := newConnection(vppAdapter)
143         if err != nil {
144                 return nil, nil, err
145         }
146
147         // asynchronously attempt to connect to VPP
148         connChan := make(chan ConnectionEvent, notificationChannelBufSize)
149         go c.connectLoop(connChan)
150
151         return conn, connChan, nil
152 }
153
154 // Disconnect disconnects from VPP and releases all connection-related resources.
155 func (c *Connection) Disconnect() {
156         if c == nil {
157                 return
158         }
159         connLock.Lock()
160         defer connLock.Unlock()
161
162         if c != nil && c.vpp != nil {
163                 c.disconnectVPP()
164         }
165         conn = nil
166 }
167
168 // newConnection returns new connection handle.
169 func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) {
170         connLock.Lock()
171         defer connLock.Unlock()
172
173         if conn != nil {
174                 return nil, errors.New("only one connection per process is supported")
175         }
176
177         conn = &Connection{vpp: vppAdapter, codec: &MsgCodec{}}
178         conn.channels = make(map[uint32]*api.Channel)
179         conn.msgIDs = make(map[string]uint16)
180         conn.notifSubscriptions = make(map[uint16][]*api.NotifSubscription)
181
182         conn.vpp.SetMsgCallback(msgCallback)
183         return conn, nil
184 }
185
186 // connectVPP performs one blocking attempt to connect to VPP.
187 func (c *Connection) connectVPP() error {
188         log.Debug("Connecting to VPP...")
189
190         // blocking connect
191         err := c.vpp.Connect()
192         if err != nil {
193                 log.Warn(err)
194                 return err
195         }
196
197         // store connected state
198         atomic.StoreUint32(&c.connected, 1)
199
200         // store control ping IDs
201         c.pingReqID, _ = c.GetMessageID(&vpe.ControlPing{})
202         c.pingReplyID, _ = c.GetMessageID(&vpe.ControlPingReply{})
203
204         log.Info("Connected to VPP.")
205         return nil
206 }
207
208 // disconnectVPP disconnects from VPP in case it is connected.
209 func (c *Connection) disconnectVPP() {
210         if atomic.CompareAndSwapUint32(&c.connected, 1, 0) {
211                 c.vpp.Disconnect()
212         }
213 }
214
215 func fileExists(name string) bool {
216         if _, err := os.Stat(name); err != nil {
217                 if os.IsNotExist(err) {
218                         return false
219                 }
220         }
221         return true
222 }
223
224 // waitForVpp blocks until shared memory for sending bin api calls
225 // is present on the file system.
226 func waitForVpp() error {
227         watcher, err := fsnotify.NewWatcher()
228         if err != nil {
229                 return err
230         }
231         defer watcher.Close()
232
233         err = watcher.Add(watchedFolder)
234         if err != nil {
235                 return err
236         }
237
238         if fileExists(watchedFile) {
239                 return nil
240         }
241
242         for {
243                 ev := <-watcher.Events
244                 if ev.Name == watchedFile && (ev.Op&fsnotify.Create) == fsnotify.Create {
245                         break
246                 }
247         }
248         return nil
249 }
250
251 // connectLoop attempts to connect to VPP until it succeeds.
252 // Then it continues with healthCheckLoop.
253 func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
254         // loop until connected
255         for {
256                 waitForVpp()
257                 err := c.connectVPP()
258                 if err == nil {
259                         // signal connected event
260                         connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected}
261                         break
262                 }
263         }
264
265         // we are now connected, continue with health check loop
266         c.healthCheckLoop(connChan)
267 }
268
269 // healthCheckLoop checks whether connection to VPP is alive. In case of disconnect,
270 // it continues with connectLoop and tries to reconnect.
271 func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
272         // create a separate API channel for health check probes
273         ch, err := conn.NewAPIChannel()
274         if err != nil {
275                 log.Error("Error by creating health check API channel, health check will be disabled:", err)
276                 return
277         }
278
279         // send health check probes until an error occurs
280         for {
281                 // wait for healthCheckProbeInterval
282                 <-time.After(healthCheckProbeInterval)
283
284                 if atomic.LoadUint32(&c.connected) == 0 {
285                         // Disconnect has been called in the meantime, return the healthcheck - reconnect loop
286                         log.Debug("Disconnected on request, exiting health check loop.")
287                         return
288                 }
289
290                 // send the control ping
291                 ch.ReqChan <- &api.VppRequest{Message: &vpe.ControlPing{}}
292
293                 // expect response within timeout period
294                 select {
295                 case vppReply := <-ch.ReplyChan:
296                         err = vppReply.Error
297                 case <-time.After(healthCheckReplyTimeout):
298                         err = errors.New("probe reply not received within the timeout period")
299                 }
300
301                 // in case of error, break & disconnect
302                 if err != nil {
303                         log.Errorf("VPP health check failed: %v", err)
304                         // signal disconnected event via channel
305                         connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
306                         break
307                 }
308         }
309
310         // cleanup
311         ch.Close()
312         c.disconnectVPP()
313
314         // we are now disconnected, start connect loop
315         c.connectLoop(connChan)
316 }
317
318 // NewAPIChannel returns a new API channel for communication with VPP via govpp core.
319 // It uses default buffer sizes for the request and reply Go channels.
320 func (c *Connection) NewAPIChannel() (*api.Channel, error) {
321         if c == nil {
322                 return nil, errors.New("nil connection passed in")
323         }
324         return c.NewAPIChannelBuffered(requestChannelBufSize, replyChannelBufSize)
325 }
326
327 // NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
328 // It allows to specify custom buffer sizes for the request and reply Go channels.
329 func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (*api.Channel, error) {
330         if c == nil {
331                 return nil, errors.New("nil connection passed in")
332         }
333         chID := atomic.AddUint32(&c.maxChannelID, 1)
334         chMeta := &channelMetadata{id: chID}
335
336         ch := api.NewChannelInternal(chMeta)
337         ch.MsgDecoder = c.codec
338         ch.MsgIdentifier = c
339
340         // create the communication channels
341         ch.ReqChan = make(chan *api.VppRequest, reqChanBufSize)
342         ch.ReplyChan = make(chan *api.VppReply, replyChanBufSize)
343         ch.NotifSubsChan = make(chan *api.NotifSubscribeRequest, reqChanBufSize)
344         ch.NotifSubsReplyChan = make(chan error, replyChanBufSize)
345
346         // store API channel within the client
347         c.channelsLock.Lock()
348         c.channels[chID] = ch
349         c.channelsLock.Unlock()
350
351         // start watching on the request channel
352         go c.watchRequests(ch, chMeta)
353
354         return ch, nil
355 }
356
357 // releaseAPIChannel releases API channel that needs to be closed.
358 func (c *Connection) releaseAPIChannel(ch *api.Channel, chMeta *channelMetadata) {
359         log.WithFields(logger.Fields{
360                 "context": chMeta.id,
361         }).Debug("API channel closed.")
362
363         // delete the channel from channels map
364         c.channelsLock.Lock()
365         delete(c.channels, chMeta.id)
366         c.channelsLock.Unlock()
367 }