Improve handling of probes on timeouts
[govpp.git] / core / core.go
1 // Copyright (c) 2017 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 //go:generate binapi-generator --input-dir=bin_api --output-dir=bin_api
16
17 package core
18
19 import (
20         "errors"
21         "os"
22         "sync"
23         "sync/atomic"
24         "time"
25
26         logger "github.com/sirupsen/logrus"
27
28         "git.fd.io/govpp.git/adapter"
29         "git.fd.io/govpp.git/api"
30         "git.fd.io/govpp.git/core/bin_api/vpe"
31 )
32
33 var (
34         msgControlPing      api.Message = &vpe.ControlPing{}
35         msgControlPingReply api.Message = &vpe.ControlPingReply{}
36 )
37
38 const (
39         requestChannelBufSize      = 100 // default size of the request channel buffers
40         replyChannelBufSize        = 100 // default size of the reply channel buffers
41         notificationChannelBufSize = 100 // default size of the notification channel buffers
42 )
43
44 var (
45         healthCheckProbeInterval = time.Second * 1        // default health check probe interval
46         healthCheckReplyTimeout  = time.Millisecond * 100 // timeout for reply to a health check probe
47         healthCheckThreshold     = 1                      // number of failed healthProbe until the error is reported
48 )
49
50 // ConnectionState holds the current state of the connection to VPP.
51 type ConnectionState int
52
53 const (
54         // Connected connection state means that the connection to VPP has been successfully established.
55         Connected ConnectionState = iota
56
57         // Disconnected connection state means that the connection to VPP has been lost.
58         Disconnected
59 )
60
61 // ConnectionEvent is a notification about change in the VPP connection state.
62 type ConnectionEvent struct {
63         // Timestamp holds the time when the event has been generated.
64         Timestamp time.Time
65
66         // State holds the new state of the connection to VPP at the time when the event has been generated.
67         State ConnectionState
68 }
69
70 // Connection represents a shared memory connection to VPP via vppAdapter.
71 type Connection struct {
72         vpp       adapter.VppAdapter // VPP adapter
73         connected uint32             // non-zero if the adapter is connected to VPP
74         codec     *MsgCodec          // message codec
75
76         msgIDsLock sync.RWMutex      // lock for the message IDs map
77         msgIDs     map[string]uint16 // map of message IDs indexed by message name + CRC
78
79         channelsLock sync.RWMutex            // lock for the channels map
80         channels     map[uint32]*api.Channel // map of all API channels indexed by the channel ID
81
82         notifSubscriptionsLock sync.RWMutex                        // lock for the subscriptions map
83         notifSubscriptions     map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID
84
85         maxChannelID uint32 // maximum used client ID
86         pingReqID    uint16 // ID if the ControlPing message
87         pingReplyID  uint16 // ID of the ControlPingReply message
88
89         lastReplyLock sync.Mutex // lock for the last reply
90         lastReply     time.Time  // time of the last received reply from VPP
91 }
92
93 // channelMetadata contains core-local metadata of an API channel.
94 type channelMetadata struct {
95         id        uint32 // channel ID
96         multipart uint32 // 1 if multipart request is being processed, 0 otherwise
97 }
98
99 var (
100         log      *logger.Logger // global logger
101         conn     *Connection    // global handle to the Connection (used in the message receive callback)
102         connLock sync.RWMutex   // lock for the global connection
103 )
104
105 // init initializes global logger, which logs debug level messages to stdout.
106 func init() {
107         log = logger.New()
108         log.Out = os.Stdout
109         log.Level = logger.DebugLevel
110 }
111
112 // SetLogger sets global logger to provided one.
113 func SetLogger(l *logger.Logger) {
114         log = l
115 }
116
117 // SetHealthCheckProbeInterval sets health check probe interval.
118 // Beware: Function is not thread-safe. It is recommended to setup this parameter
119 // before connecting to vpp.
120 func SetHealthCheckProbeInterval(interval time.Duration) {
121         healthCheckProbeInterval = interval
122 }
123
124 // SetHealthCheckReplyTimeout sets timeout for reply to a health check probe.
125 // If reply arrives after the timeout, check is considered as failed.
126 // Beware: Function is not thread-safe. It is recommended to setup this parameter
127 // before connecting to vpp.
128 func SetHealthCheckReplyTimeout(timeout time.Duration) {
129         healthCheckReplyTimeout = timeout
130 }
131
132 // SetHealthCheckThreshold sets the number of failed healthProbe checks until the error is reported.
133 // Beware: Function is not thread-safe. It is recommended to setup this parameter
134 // before connecting to vpp.
135 func SetHealthCheckThreshold(threshold int) {
136         healthCheckThreshold = threshold
137 }
138
139 // SetControlPingMessages sets the messages for ControlPing and ControlPingReply
140 func SetControlPingMessages(controPing, controlPingReply api.Message) {
141         msgControlPing = controPing
142         msgControlPingReply = controlPingReply
143 }
144
145 // Connect connects to VPP using specified VPP adapter and returns the connection handle.
146 // This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed.
147 func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
148         // create new connection handle
149         c, err := newConnection(vppAdapter)
150         if err != nil {
151                 return nil, err
152         }
153
154         // blocking attempt to connect to VPP
155         err = c.connectVPP()
156         if err != nil {
157                 return nil, err
158         }
159
160         return conn, nil
161 }
162
163 // AsyncConnect asynchronously connects to VPP using specified VPP adapter and returns the connection handle
164 // and ConnectionState channel. This call does not block until connection is established, it
165 // returns immediately. The caller is supposed to watch the returned ConnectionState channel for
166 // Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
167 func AsyncConnect(vppAdapter adapter.VppAdapter) (*Connection, chan ConnectionEvent, error) {
168         // create new connection handle
169         c, err := newConnection(vppAdapter)
170         if err != nil {
171                 return nil, nil, err
172         }
173
174         // asynchronously attempt to connect to VPP
175         connChan := make(chan ConnectionEvent, notificationChannelBufSize)
176         go c.connectLoop(connChan)
177
178         return conn, connChan, nil
179 }
180
181 // Disconnect disconnects from VPP and releases all connection-related resources.
182 func (c *Connection) Disconnect() {
183         if c == nil {
184                 return
185         }
186         connLock.Lock()
187         defer connLock.Unlock()
188
189         if c != nil && c.vpp != nil {
190                 c.disconnectVPP()
191         }
192         conn = nil
193 }
194
195 // newConnection returns new connection handle.
196 func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) {
197         connLock.Lock()
198         defer connLock.Unlock()
199
200         if conn != nil {
201                 return nil, errors.New("only one connection per process is supported")
202         }
203
204         conn = &Connection{
205                 vpp:                vppAdapter,
206                 codec:              &MsgCodec{},
207                 channels:           make(map[uint32]*api.Channel),
208                 msgIDs:             make(map[string]uint16),
209                 notifSubscriptions: make(map[uint16][]*api.NotifSubscription),
210         }
211
212         conn.vpp.SetMsgCallback(msgCallback)
213         return conn, nil
214 }
215
216 // connectVPP performs one blocking attempt to connect to VPP.
217 func (c *Connection) connectVPP() error {
218         log.Debug("Connecting to VPP...")
219
220         // blocking connect
221         err := c.vpp.Connect()
222         if err != nil {
223                 log.Warn(err)
224                 return err
225         }
226
227         // store control ping IDs
228         if c.pingReqID, err = c.GetMessageID(msgControlPing); err != nil {
229                 c.vpp.Disconnect()
230                 return err
231         }
232         if c.pingReplyID, err = c.GetMessageID(msgControlPingReply); err != nil {
233                 c.vpp.Disconnect()
234                 return err
235         }
236
237         // store connected state
238         atomic.StoreUint32(&c.connected, 1)
239
240         log.Info("Connected to VPP.")
241         return nil
242 }
243
244 // disconnectVPP disconnects from VPP in case it is connected.
245 func (c *Connection) disconnectVPP() {
246         if atomic.CompareAndSwapUint32(&c.connected, 1, 0) {
247                 c.vpp.Disconnect()
248         }
249 }
250
251 // connectLoop attempts to connect to VPP until it succeeds.
252 // Then it continues with healthCheckLoop.
253 func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
254         // loop until connected
255         for {
256                 if err := c.vpp.WaitReady(); err != nil {
257                         log.Warnf("wait ready failed: %v", err)
258                 }
259                 if err := c.connectVPP(); err == nil {
260                         // signal connected event
261                         connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected}
262                         break
263                 } else {
264                         log.Errorf("connecting to VPP failed: %v", err)
265                         time.Sleep(time.Second)
266                 }
267         }
268
269         // we are now connected, continue with health check loop
270         c.healthCheckLoop(connChan)
271 }
272
273 // healthCheckLoop checks whether connection to VPP is alive. In case of disconnect,
274 // it continues with connectLoop and tries to reconnect.
275 func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
276         // create a separate API channel for health check probes
277         ch, err := conn.NewAPIChannelBuffered(1, 1)
278         if err != nil {
279                 log.Error("Failed to create health check API channel, health check will be disabled:", err)
280                 return
281         }
282
283         var sinceLastReply time.Duration
284         var failedChecks int
285
286         // send health check probes until an error or timeout occurs
287         for {
288                 // sleep until next health check probe period
289                 time.Sleep(healthCheckProbeInterval)
290
291                 if atomic.LoadUint32(&c.connected) == 0 {
292                         // Disconnect has been called in the meantime, return the healthcheck - reconnect loop
293                         log.Debug("Disconnected on request, exiting health check loop.")
294                         return
295                 }
296
297                 // try draining probe replies from previous request before sending next one
298                 select {
299                 case <-ch.ReplyChan:
300                         log.Debug("drained old probe reply from reply channel")
301                 default:
302                 }
303
304                 // send the control ping request
305                 ch.ReqChan <- &api.VppRequest{Message: msgControlPing}
306
307                 for {
308                         // expect response within timeout period
309                         select {
310                         case vppReply := <-ch.ReplyChan:
311                                 err = vppReply.Error
312
313                         case <-time.After(healthCheckReplyTimeout):
314                                 err = ErrProbeTimeout
315
316                                 // check if time since last reply from any other
317                                 // channel is less than health check reply timeout
318                                 conn.lastReplyLock.Lock()
319                                 sinceLastReply = time.Since(c.lastReply)
320                                 conn.lastReplyLock.Unlock()
321
322                                 if sinceLastReply < healthCheckReplyTimeout {
323                                         log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply)
324                                         continue
325                                 }
326                         }
327                         break
328                 }
329
330                 if err == ErrProbeTimeout {
331                         failedChecks++
332                         log.Warnf("VPP health check probe timed out after %v (%d. timeout)", healthCheckReplyTimeout, failedChecks)
333                         if failedChecks > healthCheckThreshold {
334                                 // in case of exceeded treshold disconnect
335                                 log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", healthCheckThreshold)
336                                 connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
337                                 break
338                         }
339                 } else if err != nil {
340                         // in case of error disconnect
341                         log.Errorf("VPP health check probe failed: %v", err)
342                         connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
343                         break
344                 } else if failedChecks > 0 {
345                         failedChecks = 0
346                         log.Infof("VPP health check probe OK")
347                 }
348         }
349
350         // cleanup
351         ch.Close()
352         c.disconnectVPP()
353
354         // we are now disconnected, start connect loop
355         c.connectLoop(connChan)
356 }
357
358 // NewAPIChannel returns a new API channel for communication with VPP via govpp core.
359 // It uses default buffer sizes for the request and reply Go channels.
360 func (c *Connection) NewAPIChannel() (*api.Channel, error) {
361         if c == nil {
362                 return nil, errors.New("nil connection passed in")
363         }
364         return c.NewAPIChannelBuffered(requestChannelBufSize, replyChannelBufSize)
365 }
366
367 // NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
368 // It allows to specify custom buffer sizes for the request and reply Go channels.
369 func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (*api.Channel, error) {
370         if c == nil {
371                 return nil, errors.New("nil connection passed in")
372         }
373         chID := atomic.AddUint32(&c.maxChannelID, 1)
374         chMeta := &channelMetadata{id: chID}
375
376         ch := api.NewChannelInternal(chMeta)
377         ch.MsgDecoder = c.codec
378         ch.MsgIdentifier = c
379
380         // create the communication channels
381         ch.ReqChan = make(chan *api.VppRequest, reqChanBufSize)
382         ch.ReplyChan = make(chan *api.VppReply, replyChanBufSize)
383         ch.NotifSubsChan = make(chan *api.NotifSubscribeRequest, reqChanBufSize)
384         ch.NotifSubsReplyChan = make(chan error, replyChanBufSize)
385
386         // store API channel within the client
387         c.channelsLock.Lock()
388         c.channels[chID] = ch
389         c.channelsLock.Unlock()
390
391         // start watching on the request channel
392         go c.watchRequests(ch, chMeta)
393
394         return ch, nil
395 }
396
397 // releaseAPIChannel releases API channel that needs to be closed.
398 func (c *Connection) releaseAPIChannel(ch *api.Channel, chMeta *channelMetadata) {
399         log.WithFields(logger.Fields{
400                 "context": chMeta.id,
401         }).Debug("API channel closed.")
402
403         // delete the channel from channels map
404         c.channelsLock.Lock()
405         delete(c.channels, chMeta.id)
406         c.channelsLock.Unlock()
407 }