Change module name to go.fd.io/govpp
[govpp.git] / core / connection.go
1 // Copyright (c) 2017 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 package core
16
17 import (
18         "errors"
19         "fmt"
20         "path"
21         "reflect"
22         "sync"
23         "sync/atomic"
24         "time"
25
26         logger "github.com/sirupsen/logrus"
27
28         "go.fd.io/govpp/adapter"
29         "go.fd.io/govpp/api"
30         "go.fd.io/govpp/codec"
31 )
32
// Reconnect defaults; newConnection falls back to these when it is given
// zero values.
const (
	// DefaultReconnectInterval is the default interval between reconnect attempts.
	DefaultReconnectInterval = 500 * time.Millisecond

	// DefaultMaxReconnectAttempts is the default maximum number of reconnect attempts.
	DefaultMaxReconnectAttempts = 3
)

// Default buffer sizes of the Go channels used by the connection.
var (
	// RequestChanBufSize is the default size of the request channel buffer.
	RequestChanBufSize = 100

	// ReplyChanBufSize is the default size of the reply channel buffer.
	ReplyChanBufSize = 100

	// NotificationChanBufSize is the default size of the notification channel buffer.
	NotificationChanBufSize = 100
)

// Health check and reply timing defaults.
var (
	// HealthCheckProbeInterval is the default health check probe interval.
	HealthCheckProbeInterval = time.Second

	// HealthCheckReplyTimeout is the timeout for a reply to a health check probe.
	HealthCheckReplyTimeout = 250 * time.Millisecond

	// HealthCheckThreshold is the number of failed health checks until the error is reported.
	HealthCheckThreshold = 2

	// DefaultReplyTimeout is the default timeout for replies from VPP.
	DefaultReplyTimeout = time.Second
)
50
// ConnectionState represents the current state of the connection to VPP.
type ConnectionState int

const (
	// Connected represents state in which the connection has been successfully established.
	Connected ConnectionState = iota

	// NotResponding represents a state where the VPP socket accepts messages but replies are received with delay,
	// or not at all. GoVPP treats this state internally the same as disconnected.
	NotResponding

	// Disconnected represents state in which the VPP socket is closed and the connection is considered dropped.
	Disconnected

	// Failed represents state in which the reconnecting failed after exceeding maximum number of attempts.
	Failed
)

// String returns the name of the state, or "UnknownState(<n>)" for a value
// that is not one of the declared constants.
func (s ConnectionState) String() string {
	// table of names indexed by the state value
	names := [...]string{
		Connected:     "Connected",
		NotResponding: "NotResponding",
		Disconnected:  "Disconnected",
		Failed:        "Failed",
	}
	if s < 0 || int(s) >= len(names) {
		// %d formats the underlying integer and does not call String again
		return fmt.Sprintf("UnknownState(%d)", s)
	}
	return names[s]
}
83
84 // ConnectionEvent is a notification about change in the VPP connection state.
85 type ConnectionEvent struct {
86         // Timestamp holds the time when the event has been created.
87         Timestamp time.Time
88
89         // State holds the new state of the connection at the time when the event has been created.
90         State ConnectionState
91
92         // Error holds error if any encountered.
93         Error error
94 }
95
96 // Connection represents a shared memory connection to VPP via vppAdapter.
97 type Connection struct {
98         vppClient adapter.VppAPI // VPP binary API client
99
100         maxAttempts int           // interval for reconnect attempts
101         recInterval time.Duration // maximum number of reconnect attempts
102
103         vppConnected uint32 // non-zero if the adapter is connected to VPP
104
105         connChan        chan ConnectionEvent // connection status events are sent to this channel
106         healthCheckDone chan struct{}        // used to terminate health check loop
107
108         codec        MessageCodec                      // message codec
109         msgIDs       map[string]uint16                 // map of message IDs indexed by message name + CRC
110         msgMapByPath map[string]map[uint16]api.Message // map of messages indexed by message ID which are indexed by path
111
112         channelsLock  sync.RWMutex        // lock for the channels map and the channel ID
113         nextChannelID uint16              // next potential channel ID (the real limit is 2^15)
114         channels      map[uint16]*Channel // map of all API channels indexed by the channel ID
115
116         subscriptionsLock sync.RWMutex                  // lock for the subscriptions map
117         subscriptions     map[uint16][]*subscriptionCtx // map od all notification subscriptions indexed by message ID
118
119         pingReqID   uint16 // ID if the ControlPing message
120         pingReplyID uint16 // ID of the ControlPingReply message
121
122         lastReplyLock sync.Mutex // lock for the last reply
123         lastReply     time.Time  // time of the last received reply from VPP
124
125         msgControlPing      api.Message
126         msgControlPingReply api.Message
127
128         apiTrace *trace // API tracer (disabled by default)
129 }
130
131 func newConnection(binapi adapter.VppAPI, attempts int, interval time.Duration) *Connection {
132         if attempts == 0 {
133                 attempts = DefaultMaxReconnectAttempts
134         }
135         if interval == 0 {
136                 interval = DefaultReconnectInterval
137         }
138
139         c := &Connection{
140                 vppClient:           binapi,
141                 maxAttempts:         attempts,
142                 recInterval:         interval,
143                 connChan:            make(chan ConnectionEvent, NotificationChanBufSize),
144                 healthCheckDone:     make(chan struct{}),
145                 codec:               codec.DefaultCodec,
146                 msgIDs:              make(map[string]uint16),
147                 msgMapByPath:        make(map[string]map[uint16]api.Message),
148                 channels:            make(map[uint16]*Channel),
149                 subscriptions:       make(map[uint16][]*subscriptionCtx),
150                 msgControlPing:      msgControlPing,
151                 msgControlPingReply: msgControlPingReply,
152                 apiTrace: &trace{
153                         list: make([]*api.Record, 0),
154                         mux:  &sync.Mutex{},
155                 },
156         }
157         binapi.SetMsgCallback(c.msgCallback)
158         return c
159 }
160
161 // Connect connects to VPP API using specified adapter and returns a connection handle.
162 // This call blocks until it is either connected, or an error occurs.
163 // Only one connection attempt will be performed.
164 func Connect(binapi adapter.VppAPI) (*Connection, error) {
165         // create new connection handle
166         c := newConnection(binapi, DefaultMaxReconnectAttempts, DefaultReconnectInterval)
167
168         // blocking attempt to connect to VPP
169         if err := c.connectVPP(); err != nil {
170                 return nil, err
171         }
172
173         return c, nil
174 }
175
176 // AsyncConnect asynchronously connects to VPP using specified VPP adapter and returns the connection handle
177 // and ConnectionState channel. This call does not block until connection is established, it
178 // returns immediately. The caller is supposed to watch the returned ConnectionState channel for
179 // Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
180 func AsyncConnect(binapi adapter.VppAPI, attempts int, interval time.Duration) (*Connection, chan ConnectionEvent, error) {
181         // create new connection handle
182         c := newConnection(binapi, attempts, interval)
183
184         // asynchronously attempt to connect to VPP
185         go c.connectLoop()
186
187         return c, c.connChan, nil
188 }
189
190 // connectVPP performs blocking attempt to connect to VPP.
191 func (c *Connection) connectVPP() error {
192         log.Debug("Connecting to VPP..")
193
194         // blocking connect
195         if err := c.vppClient.Connect(); err != nil {
196                 return err
197         }
198         log.Debugf("Connected to VPP")
199
200         if err := c.retrieveMessageIDs(); err != nil {
201                 if err := c.vppClient.Disconnect(); err != nil {
202                         log.Debugf("disconnecting vpp client failed: %v", err)
203                 }
204                 return fmt.Errorf("VPP is incompatible: %v", err)
205         }
206
207         // store connected state
208         atomic.StoreUint32(&c.vppConnected, 1)
209
210         return nil
211 }
212
213 // Disconnect disconnects from VPP API and releases all connection-related resources.
214 func (c *Connection) Disconnect() {
215         if c == nil {
216                 return
217         }
218         if c.vppClient != nil {
219                 c.disconnectVPP(true)
220         }
221 }
222
223 // disconnectVPP disconnects from VPP in case it is connected. terminate tells
224 // that disconnectVPP() was called from Close(), so healthCheckLoop() can be
225 // terminated.
226 func (c *Connection) disconnectVPP(terminate bool) {
227         if atomic.CompareAndSwapUint32(&c.vppConnected, 1, 0) {
228                 if terminate {
229                         close(c.healthCheckDone)
230                 }
231                 log.Debug("Disconnecting from VPP..")
232
233                 if err := c.vppClient.Disconnect(); err != nil {
234                         log.Debugf("Disconnect from VPP failed: %v", err)
235                 }
236                 log.Debug("Disconnected from VPP")
237         }
238 }
239
240 func (c *Connection) NewAPIChannel() (api.Channel, error) {
241         return c.newAPIChannel(RequestChanBufSize, ReplyChanBufSize)
242 }
243
244 func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (api.Channel, error) {
245         return c.newAPIChannel(reqChanBufSize, replyChanBufSize)
246 }
247
248 // NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
249 // It allows to specify custom buffer sizes for the request and reply Go channels.
250 func (c *Connection) newAPIChannel(reqChanBufSize, replyChanBufSize int) (*Channel, error) {
251         if c == nil {
252                 return nil, errors.New("nil connection passed in")
253         }
254
255         channel, err := c.newChannel(reqChanBufSize, replyChanBufSize)
256         if err != nil {
257                 return nil, err
258         }
259
260         // start watching on the request channel
261         go c.watchRequests(channel)
262
263         return channel, nil
264 }
265
266 // releaseAPIChannel releases API channel that needs to be closed.
267 func (c *Connection) releaseAPIChannel(ch *Channel) {
268         log.WithFields(logger.Fields{
269                 "channel": ch.id,
270         }).Debug("API channel released")
271
272         // delete the channel from channels map
273         c.channelsLock.Lock()
274         delete(c.channels, ch.id)
275         c.channelsLock.Unlock()
276 }
277
278 // connectLoop attempts to connect to VPP until it succeeds.
279 // Then it continues with healthCheckLoop.
280 func (c *Connection) connectLoop() {
281         var reconnectAttempts int
282
283         // loop until connected
284         for {
285                 if err := c.vppClient.WaitReady(); err != nil {
286                         log.Debugf("wait ready failed: %v", err)
287                 }
288                 if err := c.connectVPP(); err == nil {
289                         // signal connected event
290                         c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Connected})
291                         break
292                 } else if reconnectAttempts < c.maxAttempts {
293                         reconnectAttempts++
294                         log.Warnf("connecting failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err)
295                         time.Sleep(c.recInterval)
296                 } else {
297                         c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err})
298                         return
299                 }
300         }
301
302         // we are now connected, continue with health check loop
303         c.healthCheckLoop()
304 }
305
306 // healthCheckLoop checks whether connection to VPP is alive. In case of disconnect,
307 // it continues with connectLoop and tries to reconnect.
308 func (c *Connection) healthCheckLoop() {
309         // create a separate API channel for health check probes
310         ch, err := c.newAPIChannel(1, 1)
311         if err != nil {
312                 log.Error("Failed to create health check API channel, health check will be disabled:", err)
313                 return
314         }
315         defer ch.Close()
316
317         var (
318                 sinceLastReply time.Duration
319                 failedChecks   int
320         )
321
322         // send health check probes until an error or timeout occurs
323         probeInterval := time.NewTicker(HealthCheckProbeInterval)
324         defer probeInterval.Stop()
325
326 HealthCheck:
327         for {
328                 select {
329                 case <-c.healthCheckDone:
330                         // Terminate the health check loop on connection disconnect
331                         log.Debug("Disconnected on request, exiting health check loop.")
332                         return
333                 case <-probeInterval.C:
334                         // try draining probe replies from previous request before sending next one
335                         select {
336                         case <-ch.replyChan:
337                                 log.Debug("drained old probe reply from reply channel")
338                         default:
339                         }
340
341                         // send the control ping request
342                         ch.reqChan <- &vppRequest{msg: c.msgControlPing}
343
344                         for {
345                                 // expect response within timeout period
346                                 select {
347                                 case vppReply := <-ch.replyChan:
348                                         err = vppReply.err
349
350                                 case <-time.After(HealthCheckReplyTimeout):
351                                         err = ErrProbeTimeout
352
353                                         // check if time since last reply from any other
354                                         // channel is less than health check reply timeout
355                                         c.lastReplyLock.Lock()
356                                         sinceLastReply = time.Since(c.lastReply)
357                                         c.lastReplyLock.Unlock()
358
359                                         if sinceLastReply < HealthCheckReplyTimeout {
360                                                 log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply)
361                                                 continue
362                                         }
363                                 }
364                                 break
365                         }
366
367                         if err == ErrProbeTimeout {
368                                 failedChecks++
369                                 log.Warnf("VPP health check probe timed out after %v (%d. timeout)", HealthCheckReplyTimeout, failedChecks)
370                                 if failedChecks > HealthCheckThreshold {
371                                         // in case of exceeded failed check threshold, assume VPP unresponsive
372                                         log.Errorf("VPP does not responding, the health check exceeded threshold for timeouts (>%d)", HealthCheckThreshold)
373                                         c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: NotResponding})
374                                         break HealthCheck
375                                 }
376                         } else if err != nil {
377                                 // in case of error, assume VPP disconnected
378                                 log.Errorf("VPP health check probe failed: %v", err)
379                                 c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: err})
380                                 break HealthCheck
381                         } else if failedChecks > 0 {
382                                 // in case of success after failed checks, clear failed check counter
383                                 failedChecks = 0
384                                 log.Infof("VPP health check probe OK")
385                         }
386                 }
387         }
388
389         // cleanup
390         c.disconnectVPP(false)
391
392         // we are now disconnected, start connect loop
393         c.connectLoop()
394 }
395
396 func getMsgNameWithCrc(x api.Message) string {
397         return getMsgID(x.GetMessageName(), x.GetCrcString())
398 }
399
// getMsgID joins a message name and its CRC with an underscore,
// e.g. ("name", "crc") yields "name_crc".
func getMsgID(name, crc string) string {
	const sep = "_"
	return name + sep + crc
}
403
404 func getMsgFactory(msg api.Message) func() api.Message {
405         return func() api.Message {
406                 return reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
407         }
408 }
409
410 // GetMessageID returns message identifier of given API message.
411 func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
412         if c == nil {
413                 return 0, errors.New("nil connection passed in")
414         }
415         pkgPath := c.GetMessagePath(msg)
416         msgID, err := c.vppClient.GetMsgID(msg.GetMessageName(), msg.GetCrcString())
417         if err != nil {
418                 return 0, err
419         }
420         if pathMsgs, pathOk := c.msgMapByPath[pkgPath]; !pathOk {
421                 c.msgMapByPath[pkgPath] = make(map[uint16]api.Message)
422                 c.msgMapByPath[pkgPath][msgID] = msg
423         } else if _, msgOk := pathMsgs[msgID]; !msgOk {
424                 c.msgMapByPath[pkgPath][msgID] = msg
425         }
426         if _, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok {
427                 return msgID, nil
428         }
429         c.msgIDs[getMsgNameWithCrc(msg)] = msgID
430         return msgID, nil
431 }
432
433 // LookupByID looks up message name and crc by ID.
434 func (c *Connection) LookupByID(path string, msgID uint16) (api.Message, error) {
435         if c == nil {
436                 return nil, errors.New("nil connection passed in")
437         }
438         if msg, ok := c.msgMapByPath[path][msgID]; ok {
439                 return msg, nil
440         }
441         return nil, fmt.Errorf("unknown message ID %d for path '%s'", msgID, path)
442 }
443
444 // GetMessagePath returns path for the given message
445 func (c *Connection) GetMessagePath(msg api.Message) string {
446         return path.Dir(reflect.TypeOf(msg).Elem().PkgPath())
447 }
448
449 // retrieveMessageIDs retrieves IDs for all registered messages and stores them in map
450 func (c *Connection) retrieveMessageIDs() (err error) {
451         t := time.Now()
452
453         msgsByPath := api.GetRegisteredMessages()
454
455         var n int
456         for pkgPath, msgs := range msgsByPath {
457                 for _, msg := range msgs {
458                         msgID, err := c.GetMessageID(msg)
459                         if err != nil {
460                                 if debugMsgIDs {
461                                         log.Debugf("retrieving message ID for %s.%s failed: %v",
462                                                 pkgPath, msg.GetMessageName(), err)
463                                 }
464                                 continue
465                         }
466                         n++
467
468                         if c.pingReqID == 0 && msg.GetMessageName() == c.msgControlPing.GetMessageName() {
469                                 c.pingReqID = msgID
470                                 c.msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
471                         } else if c.pingReplyID == 0 && msg.GetMessageName() == c.msgControlPingReply.GetMessageName() {
472                                 c.pingReplyID = msgID
473                                 c.msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
474                         }
475
476                         if debugMsgIDs {
477                                 log.Debugf("message %q (%s) has ID: %d", msg.GetMessageName(), getMsgNameWithCrc(msg), msgID)
478                         }
479                 }
480                 log.WithField("took", time.Since(t)).
481                         Debugf("retrieved IDs for %d messages (registered %d) from path %s", n, len(msgs), pkgPath)
482         }
483
484         return nil
485 }
486
487 func (c *Connection) sendConnEvent(event ConnectionEvent) {
488         select {
489         case c.connChan <- event:
490         default:
491                 log.Warn("Connection state channel is full, discarding value.")
492         }
493 }
494
495 // Trace gives access to the API trace interface
496 func (c *Connection) Trace() api.Trace {
497         return c.apiTrace
498 }
499
500 // trace records api message
501 func (c *Connection) trace(msg api.Message, chId uint16, t time.Time, isReceived bool) {
502         if atomic.LoadInt32(&c.apiTrace.isEnabled) == 0 {
503                 return
504         }
505         entry := &api.Record{
506                 Message:    msg,
507                 Timestamp:  t,
508                 IsReceived: isReceived,
509                 ChannelID:  chId,
510         }
511         c.apiTrace.mux.Lock()
512         c.apiTrace.list = append(c.apiTrace.list, entry)
513         c.apiTrace.mux.Unlock()
514 }