Make healthCheck parameters configurable
[govpp.git] / core / core.go
1 // Copyright (c) 2017 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 //go:generate binapi-generator --input-dir=bin_api --output-dir=bin_api
16
17 package core
18
19 import (
20         "errors"
21         "os"
22         "sync"
23         "sync/atomic"
24         "time"
25
26         logger "github.com/Sirupsen/logrus"
27
28         "git.fd.io/govpp.git/adapter"
29         "git.fd.io/govpp.git/api"
30         "git.fd.io/govpp.git/core/bin_api/vpe"
31         "github.com/fsnotify/fsnotify"
32 )
33
34 const (
35         requestChannelBufSize      = 100 // default size of the request channel buffers
36         replyChannelBufSize        = 100 // default size of the reply channel buffers
37         notificationChannelBufSize = 100 // default size of the notification channel buffers
38 )
39
40 var (
41         healthCheckProbeInterval = time.Second * 1        // default health check probe interval
42         healthCheckReplyTimeout  = time.Millisecond * 100 // timeout for reply to a health check probe
43         healthCheckThreshold     = 1                      // number of failed healthProbe until the error is reported
44 )
45
46 // ConnectionState holds the current state of the connection to VPP.
47 type ConnectionState int
48
49 const (
50         // Connected connection state means that the connection to VPP has been successfully established.
51         Connected ConnectionState = iota
52
53         // Disconnected connection state means that the connection to VPP has been lost.
54         Disconnected = iota
55 )
56
57 const (
58         // watchedFolder is a folder where vpp's shared memory is supposed to be created.
59         // File system events are monitored in this folder.
60         watchedFolder = "/dev/shm/"
61         // watchedFile is a name of the file in the watchedFolder. Once the file is present
62         // the vpp is ready to accept a new connection.
63         watchedFile = watchedFolder + "vpe-api"
64 )
65
66 // ConnectionEvent is a notification about change in the VPP connection state.
67 type ConnectionEvent struct {
68         // Timestamp holds the time when the event has been generated.
69         Timestamp time.Time
70
71         // State holds the new state of the connection to VPP at the time when the event has been generated.
72         State ConnectionState
73 }
74
75 // Connection represents a shared memory connection to VPP via vppAdapter.
76 type Connection struct {
77         vpp       adapter.VppAdapter // VPP adapter
78         connected uint32             // non-zero if the adapter is connected to VPP
79         codec     *MsgCodec          // message codec
80
81         msgIDs     map[string]uint16 // map of message IDs indexed by message name + CRC
82         msgIDsLock sync.RWMutex      // lock for the message IDs map
83
84         channels     map[uint32]*api.Channel // map of all API channels indexed by the channel ID
85         channelsLock sync.RWMutex            // lock for the channels map
86
87         notifSubscriptions     map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID
88         notifSubscriptionsLock sync.RWMutex                        // lock for the subscriptions map
89
90         maxChannelID uint32 // maximum used client ID
91         pingReqID    uint16 // ID if the ControlPing message
92         pingReplyID  uint16 // ID of the ControlPingReply message
93 }
94
95 // channelMetadata contains core-local metadata of an API channel.
96 type channelMetadata struct {
97         id        uint32 // channel ID
98         multipart uint32 // 1 if multipart request is being processed, 0 otherwise
99 }
100
101 var (
102         log      *logger.Logger // global logger
103         conn     *Connection    // global handle to the Connection (used in the message receive callback)
104         connLock sync.RWMutex   // lock for the global connection
105 )
106
107 // init initializes global logger, which logs debug level messages to stdout.
108 func init() {
109         log = logger.New()
110         log.Out = os.Stdout
111         log.Level = logger.DebugLevel
112 }
113
114 // SetLogger sets global logger to provided one.
115 func SetLogger(l *logger.Logger) {
116         log = l
117 }
118
119 // SetHealthCheckProbeInterval sets health check probe interval.
120 // Beware: Function is not thread-safe. It is recommended to setup this parameter
121 // before connecting to vpp.
122 func SetHealthCheckProbeInterval(interval time.Duration) {
123         healthCheckProbeInterval = interval
124 }
125
126 // SetHealthCheckReplyTimeout sets timeout for reply to a health check probe.
127 // If reply arrives after the timeout, check is considered as failed.
128 // Beware: Function is not thread-safe. It is recommended to setup this parameter
129 // before connecting to vpp.
130 func SetHealthCheckReplyTimeout(timeout time.Duration) {
131         healthCheckReplyTimeout = timeout
132 }
133
134 // SetHealthCheckThreshold sets the number of failed healthProbe checks until the error is reported.
135 // Beware: Function is not thread-safe. It is recommended to setup this parameter
136 // before connecting to vpp.
137 func SetHealthCheckThreshold(threshold int) {
138         healthCheckThreshold = threshold
139 }
140
141 // Connect connects to VPP using specified VPP adapter and returns the connection handle.
142 // This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed.
143 func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
144         // create new connection handle
145         c, err := newConnection(vppAdapter)
146         if err != nil {
147                 return nil, err
148         }
149
150         // blocking attempt to connect to VPP
151         err = c.connectVPP()
152         if err != nil {
153                 return nil, err
154         }
155
156         return conn, nil
157 }
158
159 // AsyncConnect asynchronously connects to VPP using specified VPP adapter and returns the connection handle
160 // and ConnectionState channel. This call does not block until connection is established, it
161 // returns immediately. The caller is supposed to watch the returned ConnectionState channel for
162 // Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
163 func AsyncConnect(vppAdapter adapter.VppAdapter) (*Connection, chan ConnectionEvent, error) {
164         // create new connection handle
165         c, err := newConnection(vppAdapter)
166         if err != nil {
167                 return nil, nil, err
168         }
169
170         // asynchronously attempt to connect to VPP
171         connChan := make(chan ConnectionEvent, notificationChannelBufSize)
172         go c.connectLoop(connChan)
173
174         return conn, connChan, nil
175 }
176
177 // Disconnect disconnects from VPP and releases all connection-related resources.
178 func (c *Connection) Disconnect() {
179         if c == nil {
180                 return
181         }
182         connLock.Lock()
183         defer connLock.Unlock()
184
185         if c != nil && c.vpp != nil {
186                 c.disconnectVPP()
187         }
188         conn = nil
189 }
190
191 // newConnection returns new connection handle.
192 func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) {
193         connLock.Lock()
194         defer connLock.Unlock()
195
196         if conn != nil {
197                 return nil, errors.New("only one connection per process is supported")
198         }
199
200         conn = &Connection{vpp: vppAdapter, codec: &MsgCodec{}}
201         conn.channels = make(map[uint32]*api.Channel)
202         conn.msgIDs = make(map[string]uint16)
203         conn.notifSubscriptions = make(map[uint16][]*api.NotifSubscription)
204
205         conn.vpp.SetMsgCallback(msgCallback)
206         return conn, nil
207 }
208
209 // connectVPP performs one blocking attempt to connect to VPP.
210 func (c *Connection) connectVPP() error {
211         log.Debug("Connecting to VPP...")
212
213         // blocking connect
214         err := c.vpp.Connect()
215         if err != nil {
216                 log.Warn(err)
217                 return err
218         }
219
220         // store connected state
221         atomic.StoreUint32(&c.connected, 1)
222
223         // store control ping IDs
224         c.pingReqID, _ = c.GetMessageID(&vpe.ControlPing{})
225         c.pingReplyID, _ = c.GetMessageID(&vpe.ControlPingReply{})
226
227         log.Info("Connected to VPP.")
228         return nil
229 }
230
231 // disconnectVPP disconnects from VPP in case it is connected.
232 func (c *Connection) disconnectVPP() {
233         if atomic.CompareAndSwapUint32(&c.connected, 1, 0) {
234                 c.vpp.Disconnect()
235         }
236 }
237
238 func fileExists(name string) bool {
239         if _, err := os.Stat(name); err != nil {
240                 if os.IsNotExist(err) {
241                         return false
242                 }
243         }
244         return true
245 }
246
247 // waitForVpp blocks until shared memory for sending bin api calls
248 // is present on the file system.
249 func waitForVpp() error {
250         watcher, err := fsnotify.NewWatcher()
251         if err != nil {
252                 return err
253         }
254         defer watcher.Close()
255
256         err = watcher.Add(watchedFolder)
257         if err != nil {
258                 return err
259         }
260
261         if fileExists(watchedFile) {
262                 return nil
263         }
264
265         for {
266                 ev := <-watcher.Events
267                 if ev.Name == watchedFile && (ev.Op&fsnotify.Create) == fsnotify.Create {
268                         break
269                 }
270         }
271         return nil
272 }
273
274 // connectLoop attempts to connect to VPP until it succeeds.
275 // Then it continues with healthCheckLoop.
276 func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
277         // loop until connected
278         for {
279                 waitForVpp()
280                 err := c.connectVPP()
281                 if err == nil {
282                         // signal connected event
283                         connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected}
284                         break
285                 }
286         }
287
288         // we are now connected, continue with health check loop
289         c.healthCheckLoop(connChan)
290 }
291
292 // healthCheckLoop checks whether connection to VPP is alive. In case of disconnect,
293 // it continues with connectLoop and tries to reconnect.
294 func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
295         // create a separate API channel for health check probes
296         ch, err := conn.NewAPIChannel()
297         if err != nil {
298                 log.Error("Error by creating health check API channel, health check will be disabled:", err)
299                 return
300         }
301
302         failedChecks := 0
303         // send health check probes until an error occurs
304         for {
305                 // wait for healthCheckProbeInterval
306                 <-time.After(healthCheckProbeInterval)
307
308                 if atomic.LoadUint32(&c.connected) == 0 {
309                         // Disconnect has been called in the meantime, return the healthcheck - reconnect loop
310                         log.Debug("Disconnected on request, exiting health check loop.")
311                         return
312                 }
313
314                 // send the control ping
315                 ch.ReqChan <- &api.VppRequest{Message: &vpe.ControlPing{}}
316
317                 // expect response within timeout period
318                 select {
319                 case vppReply := <-ch.ReplyChan:
320                         err = vppReply.Error
321                 case <-time.After(healthCheckReplyTimeout):
322                         err = errors.New("probe reply not received within the timeout period")
323                 }
324
325                 if err != nil {
326                         failedChecks++
327                 } else {
328                         failedChecks = 0
329                 }
330
331                 if failedChecks >= healthCheckThreshold {
332                         // in case of error, break & disconnect
333                         log.Errorf("VPP health check failed: %v", err)
334                         // signal disconnected event via channel
335                         connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
336                         break
337                 }
338         }
339
340         // cleanup
341         ch.Close()
342         c.disconnectVPP()
343
344         // we are now disconnected, start connect loop
345         c.connectLoop(connChan)
346 }
347
348 // NewAPIChannel returns a new API channel for communication with VPP via govpp core.
349 // It uses default buffer sizes for the request and reply Go channels.
350 func (c *Connection) NewAPIChannel() (*api.Channel, error) {
351         if c == nil {
352                 return nil, errors.New("nil connection passed in")
353         }
354         return c.NewAPIChannelBuffered(requestChannelBufSize, replyChannelBufSize)
355 }
356
357 // NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
358 // It allows to specify custom buffer sizes for the request and reply Go channels.
359 func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (*api.Channel, error) {
360         if c == nil {
361                 return nil, errors.New("nil connection passed in")
362         }
363         chID := atomic.AddUint32(&c.maxChannelID, 1)
364         chMeta := &channelMetadata{id: chID}
365
366         ch := api.NewChannelInternal(chMeta)
367         ch.MsgDecoder = c.codec
368         ch.MsgIdentifier = c
369
370         // create the communication channels
371         ch.ReqChan = make(chan *api.VppRequest, reqChanBufSize)
372         ch.ReplyChan = make(chan *api.VppReply, replyChanBufSize)
373         ch.NotifSubsChan = make(chan *api.NotifSubscribeRequest, reqChanBufSize)
374         ch.NotifSubsReplyChan = make(chan error, replyChanBufSize)
375
376         // store API channel within the client
377         c.channelsLock.Lock()
378         c.channels[chID] = ch
379         c.channelsLock.Unlock()
380
381         // start watching on the request channel
382         go c.watchRequests(ch, chMeta)
383
384         return ch, nil
385 }
386
387 // releaseAPIChannel releases API channel that needs to be closed.
388 func (c *Connection) releaseAPIChannel(ch *api.Channel, chMeta *channelMetadata) {
389         log.WithFields(logger.Fields{
390                 "context": chMeta.id,
391         }).Debug("API channel closed.")
392
393         // delete the channel from channels map
394         c.channelsLock.Lock()
395         delete(c.channels, chMeta.id)
396         c.channelsLock.Unlock()
397 }