Added VPP state 'NotResponding'
[govpp.git] / core / connection.go
1 // Copyright (c) 2017 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 package core
16
17 import (
18         "errors"
19         "fmt"
20         "reflect"
21         "sync"
22         "sync/atomic"
23         "time"
24
25         logger "github.com/sirupsen/logrus"
26
27         "git.fd.io/govpp.git/adapter"
28         "git.fd.io/govpp.git/api"
29         "git.fd.io/govpp.git/codec"
30 )
31
32 const (
33         DefaultReconnectInterval    = time.Second / 2 // default interval between reconnect attempts
34         DefaultMaxReconnectAttempts = 3               // default maximum number of reconnect attempts
35 )
36
37 var (
38         RequestChanBufSize      = 100 // default size of the request channel buffer
39         ReplyChanBufSize        = 100 // default size of the reply channel buffer
40         NotificationChanBufSize = 100 // default size of the notification channel buffer
41 )
42
43 var (
44         HealthCheckProbeInterval = time.Second            // default health check probe interval
45         HealthCheckReplyTimeout  = time.Millisecond * 100 // timeout for reply to a health check probe
46         HealthCheckThreshold     = 1                      // number of failed health checks until the error is reported
47         DefaultReplyTimeout      = time.Second            // default timeout for replies from VPP
48 )
49
50 // ConnectionState represents the current state of the connection to VPP.
51 type ConnectionState int
52
53 const (
54         // Connected represents state in which the connection has been successfully established.
55         Connected ConnectionState = iota
56
57         // NotResponding represents a state where the VPP socket accepts messages but replies are received with delay,
58         // or not at all. GoVPP treats this state internally the same as disconnected.
59         NotResponding
60
61         // Disconnected represents state in which the VPP socket is closed and the connection is considered dropped.
62         Disconnected
63
64         // Failed represents state in which the reconnecting failed after exceeding maximum number of attempts.
65         Failed
66 )
67
68 func (s ConnectionState) String() string {
69         switch s {
70         case Connected:
71                 return "Connected"
72         case NotResponding:
73                 return "NotResponding"
74         case Disconnected:
75                 return "Disconnected"
76         case Failed:
77                 return "Failed"
78         default:
79                 return fmt.Sprintf("UnknownState(%d)", s)
80         }
81 }
82
83 // ConnectionEvent is a notification about change in the VPP connection state.
84 type ConnectionEvent struct {
85         // Timestamp holds the time when the event has been created.
86         Timestamp time.Time
87
88         // State holds the new state of the connection at the time when the event has been created.
89         State ConnectionState
90
91         // Error holds error if any encountered.
92         Error error
93 }
94
95 // Connection represents a shared memory connection to VPP via vppAdapter.
96 type Connection struct {
97         vppClient adapter.VppAPI // VPP binary API client
98
99         maxAttempts int           // interval for reconnect attempts
100         recInterval time.Duration // maximum number of reconnect attempts
101
102         vppConnected uint32 // non-zero if the adapter is connected to VPP
103
104         connChan chan ConnectionEvent // connection status events are sent to this channel
105
106         codec  MessageCodec           // message codec
107         msgIDs map[string]uint16      // map of message IDs indexed by message name + CRC
108         msgMap map[uint16]api.Message // map of messages indexed by message ID
109
110         maxChannelID uint32              // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations)
111         channelsLock sync.RWMutex        // lock for the channels map
112         channels     map[uint16]*Channel // map of all API channels indexed by the channel ID
113
114         subscriptionsLock sync.RWMutex                  // lock for the subscriptions map
115         subscriptions     map[uint16][]*subscriptionCtx // map od all notification subscriptions indexed by message ID
116
117         pingReqID   uint16 // ID if the ControlPing message
118         pingReplyID uint16 // ID of the ControlPingReply message
119
120         lastReplyLock sync.Mutex // lock for the last reply
121         lastReply     time.Time  // time of the last received reply from VPP
122
123         msgControlPing      api.Message
124         msgControlPingReply api.Message
125 }
126
127 func newConnection(binapi adapter.VppAPI, attempts int, interval time.Duration) *Connection {
128         if attempts == 0 {
129                 attempts = DefaultMaxReconnectAttempts
130         }
131         if interval == 0 {
132                 interval = DefaultReconnectInterval
133         }
134
135         c := &Connection{
136                 vppClient:           binapi,
137                 maxAttempts:         attempts,
138                 recInterval:         interval,
139                 connChan:            make(chan ConnectionEvent, NotificationChanBufSize),
140                 codec:               codec.DefaultCodec,
141                 msgIDs:              make(map[string]uint16),
142                 msgMap:              make(map[uint16]api.Message),
143                 channels:            make(map[uint16]*Channel),
144                 subscriptions:       make(map[uint16][]*subscriptionCtx),
145                 msgControlPing:      msgControlPing,
146                 msgControlPingReply: msgControlPingReply,
147         }
148         binapi.SetMsgCallback(c.msgCallback)
149         return c
150 }
151
152 // Connect connects to VPP API using specified adapter and returns a connection handle.
153 // This call blocks until it is either connected, or an error occurs.
154 // Only one connection attempt will be performed.
155 func Connect(binapi adapter.VppAPI) (*Connection, error) {
156         // create new connection handle
157         c := newConnection(binapi, DefaultMaxReconnectAttempts, DefaultReconnectInterval)
158
159         // blocking attempt to connect to VPP
160         if err := c.connectVPP(); err != nil {
161                 return nil, err
162         }
163
164         return c, nil
165 }
166
167 // AsyncConnect asynchronously connects to VPP using specified VPP adapter and returns the connection handle
168 // and ConnectionState channel. This call does not block until connection is established, it
169 // returns immediately. The caller is supposed to watch the returned ConnectionState channel for
170 // Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
171 func AsyncConnect(binapi adapter.VppAPI, attempts int, interval time.Duration) (*Connection, chan ConnectionEvent, error) {
172         // create new connection handle
173         c := newConnection(binapi, attempts, interval)
174
175         // asynchronously attempt to connect to VPP
176         go c.connectLoop()
177
178         return c, c.connChan, nil
179 }
180
181 // connectVPP performs blocking attempt to connect to VPP.
182 func (c *Connection) connectVPP() error {
183         log.Debug("Connecting to VPP..")
184
185         // blocking connect
186         if err := c.vppClient.Connect(); err != nil {
187                 return err
188         }
189         log.Debugf("Connected to VPP")
190
191         if err := c.retrieveMessageIDs(); err != nil {
192                 if err := c.vppClient.Disconnect(); err != nil {
193                         log.Debugf("disconnecting vpp client failed: %v", err)
194                 }
195                 return fmt.Errorf("VPP is incompatible: %v", err)
196         }
197
198         // store connected state
199         atomic.StoreUint32(&c.vppConnected, 1)
200
201         return nil
202 }
203
204 // Disconnect disconnects from VPP API and releases all connection-related resources.
205 func (c *Connection) Disconnect() {
206         if c == nil {
207                 return
208         }
209         if c.vppClient != nil {
210                 c.disconnectVPP()
211         }
212 }
213
214 // disconnectVPP disconnects from VPP in case it is connected.
215 func (c *Connection) disconnectVPP() {
216         if atomic.CompareAndSwapUint32(&c.vppConnected, 1, 0) {
217                 log.Debug("Disconnecting from VPP..")
218
219                 if err := c.vppClient.Disconnect(); err != nil {
220                         log.Debugf("Disconnect from VPP failed: %v", err)
221                 }
222                 log.Debug("Disconnected from VPP")
223         }
224 }
225
226 func (c *Connection) NewAPIChannel() (api.Channel, error) {
227         return c.newAPIChannel(RequestChanBufSize, ReplyChanBufSize)
228 }
229
230 func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (api.Channel, error) {
231         return c.newAPIChannel(reqChanBufSize, replyChanBufSize)
232 }
233
234 // NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
235 // It allows to specify custom buffer sizes for the request and reply Go channels.
236 func (c *Connection) newAPIChannel(reqChanBufSize, replyChanBufSize int) (*Channel, error) {
237         if c == nil {
238                 return nil, errors.New("nil connection passed in")
239         }
240
241         // create new channel
242         chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
243         channel := newChannel(chID, c, c.codec, c, reqChanBufSize, replyChanBufSize)
244
245         // store API channel within the client
246         c.channelsLock.Lock()
247         c.channels[chID] = channel
248         c.channelsLock.Unlock()
249
250         // start watching on the request channel
251         go c.watchRequests(channel)
252
253         return channel, nil
254 }
255
256 // releaseAPIChannel releases API channel that needs to be closed.
257 func (c *Connection) releaseAPIChannel(ch *Channel) {
258         log.WithFields(logger.Fields{
259                 "channel": ch.id,
260         }).Debug("API channel released")
261
262         // delete the channel from channels map
263         c.channelsLock.Lock()
264         delete(c.channels, ch.id)
265         c.channelsLock.Unlock()
266 }
267
268 // connectLoop attempts to connect to VPP until it succeeds.
269 // Then it continues with healthCheckLoop.
270 func (c *Connection) connectLoop() {
271         var reconnectAttempts int
272
273         // loop until connected
274         for {
275                 if err := c.vppClient.WaitReady(); err != nil {
276                         log.Debugf("wait ready failed: %v", err)
277                 }
278                 if err := c.connectVPP(); err == nil {
279                         // signal connected event
280                         c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Connected})
281                         break
282                 } else if reconnectAttempts < c.maxAttempts {
283                         reconnectAttempts++
284                         log.Warnf("connecting failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err)
285                         time.Sleep(c.recInterval)
286                 } else {
287                         c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err})
288                         return
289                 }
290         }
291
292         // we are now connected, continue with health check loop
293         c.healthCheckLoop()
294 }
295
296 // healthCheckLoop checks whether connection to VPP is alive. In case of disconnect,
297 // it continues with connectLoop and tries to reconnect.
298 func (c *Connection) healthCheckLoop() {
299         // create a separate API channel for health check probes
300         ch, err := c.newAPIChannel(1, 1)
301         if err != nil {
302                 log.Error("Failed to create health check API channel, health check will be disabled:", err)
303                 return
304         }
305
306         var (
307                 sinceLastReply time.Duration
308                 failedChecks   int
309         )
310
311         // send health check probes until an error or timeout occurs
312         for {
313                 // sleep until next health check probe period
314                 time.Sleep(HealthCheckProbeInterval)
315
316                 if atomic.LoadUint32(&c.vppConnected) == 0 {
317                         // Disconnect has been called in the meantime, return the healthcheck - reconnect loop
318                         log.Debug("Disconnected on request, exiting health check loop.")
319                         return
320                 }
321
322                 // try draining probe replies from previous request before sending next one
323                 select {
324                 case <-ch.replyChan:
325                         log.Debug("drained old probe reply from reply channel")
326                 default:
327                 }
328
329                 // send the control ping request
330                 ch.reqChan <- &vppRequest{msg: c.msgControlPing}
331
332                 for {
333                         // expect response within timeout period
334                         select {
335                         case vppReply := <-ch.replyChan:
336                                 err = vppReply.err
337
338                         case <-time.After(HealthCheckReplyTimeout):
339                                 err = ErrProbeTimeout
340
341                                 // check if time since last reply from any other
342                                 // channel is less than health check reply timeout
343                                 c.lastReplyLock.Lock()
344                                 sinceLastReply = time.Since(c.lastReply)
345                                 c.lastReplyLock.Unlock()
346
347                                 if sinceLastReply < HealthCheckReplyTimeout {
348                                         log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply)
349                                         continue
350                                 }
351                         }
352                         break
353                 }
354
355                 if err == ErrProbeTimeout {
356                         failedChecks++
357                         log.Warnf("VPP health check probe timed out after %v (%d. timeout)", HealthCheckReplyTimeout, failedChecks)
358                         if failedChecks > HealthCheckThreshold {
359                                 // in case of exceeded failed check threshold, assume VPP unresponsive
360                                 log.Errorf("VPP does not responding, the health check exceeded threshold for timeouts (>%d)", HealthCheckThreshold)
361                                 c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: NotResponding})
362                                 break
363                         }
364                 } else if err != nil {
365                         // in case of error, assume VPP disconnected
366                         log.Errorf("VPP health check probe failed: %v", err)
367                         c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: err})
368                         break
369                 } else if failedChecks > 0 {
370                         // in case of success after failed checks, clear failed check counter
371                         failedChecks = 0
372                         log.Infof("VPP health check probe OK")
373                 }
374         }
375
376         // cleanup
377         ch.Close()
378         c.disconnectVPP()
379
380         // we are now disconnected, start connect loop
381         c.connectLoop()
382 }
383
384 func getMsgNameWithCrc(x api.Message) string {
385         return getMsgID(x.GetMessageName(), x.GetCrcString())
386 }
387
388 func getMsgID(name, crc string) string {
389         return name + "_" + crc
390 }
391
392 func getMsgFactory(msg api.Message) func() api.Message {
393         return func() api.Message {
394                 return reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
395         }
396 }
397
398 // GetMessageID returns message identifier of given API message.
399 func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
400         if c == nil {
401                 return 0, errors.New("nil connection passed in")
402         }
403
404         if msgID, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok {
405                 return msgID, nil
406         }
407
408         msgID, err := c.vppClient.GetMsgID(msg.GetMessageName(), msg.GetCrcString())
409         if err != nil {
410                 return 0, err
411         }
412
413         c.msgIDs[getMsgNameWithCrc(msg)] = msgID
414         c.msgMap[msgID] = msg
415
416         return msgID, nil
417 }
418
419 // LookupByID looks up message name and crc by ID.
420 func (c *Connection) LookupByID(msgID uint16) (api.Message, error) {
421         if c == nil {
422                 return nil, errors.New("nil connection passed in")
423         }
424
425         if msg, ok := c.msgMap[msgID]; ok {
426                 return msg, nil
427         }
428
429         return nil, fmt.Errorf("unknown message ID: %d", msgID)
430 }
431
432 // retrieveMessageIDs retrieves IDs for all registered messages and stores them in map
433 func (c *Connection) retrieveMessageIDs() (err error) {
434         t := time.Now()
435
436         msgs := api.GetRegisteredMessages()
437
438         var n int
439         for name, msg := range msgs {
440                 typ := reflect.TypeOf(msg).Elem()
441                 path := fmt.Sprintf("%s.%s", typ.PkgPath(), typ.Name())
442
443                 msgID, err := c.GetMessageID(msg)
444                 if err != nil {
445                         if debugMsgIDs {
446                                 log.Debugf("retrieving message ID for %s failed: %v", path, err)
447                         }
448                         continue
449                 }
450                 n++
451
452                 if c.pingReqID == 0 && msg.GetMessageName() == c.msgControlPing.GetMessageName() {
453                         c.pingReqID = msgID
454                         c.msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
455                 } else if c.pingReplyID == 0 && msg.GetMessageName() == c.msgControlPingReply.GetMessageName() {
456                         c.pingReplyID = msgID
457                         c.msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
458                 }
459
460                 if debugMsgIDs {
461                         log.Debugf("message %q (%s) has ID: %d", name, getMsgNameWithCrc(msg), msgID)
462                 }
463         }
464         log.WithField("took", time.Since(t)).
465                 Debugf("retrieved IDs for %d messages (registered %d)", n, len(msgs))
466
467         return nil
468 }
469
470 func (c *Connection) sendConnEvent(event ConnectionEvent) {
471         select {
472         case c.connChan <- event:
473         default:
474                 log.Warn("Connection state channel is full, discarding value.")
475         }
476 }