cfa94eee96f858a20a7d2624c72ebdab5aec7706
[govpp.git] / core / connection.go
1 // Copyright (c) 2017 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 package core
16
17 import (
18         "errors"
19         "fmt"
20         "reflect"
21         "sync"
22         "sync/atomic"
23         "time"
24
25         logger "github.com/sirupsen/logrus"
26
27         "git.fd.io/govpp.git/adapter"
28         "git.fd.io/govpp.git/api"
29         "git.fd.io/govpp.git/codec"
30 )
31
32 const (
33         DefaultReconnectInterval    = time.Second / 2 // default interval between reconnect attempts
34         DefaultMaxReconnectAttempts = 3               // default maximum number of reconnect attempts
35 )
36
37 var (
38         RequestChanBufSize      = 100 // default size of the request channel buffer
39         ReplyChanBufSize        = 100 // default size of the reply channel buffer
40         NotificationChanBufSize = 100 // default size of the notification channel buffer
41 )
42
43 var (
44         HealthCheckProbeInterval = time.Second            // default health check probe interval
45         HealthCheckReplyTimeout  = time.Millisecond * 100 // timeout for reply to a health check probe
46         HealthCheckThreshold     = 1                      // number of failed health checks until the error is reported
47         DefaultReplyTimeout      = time.Second            // default timeout for replies from VPP
48 )
49
50 // ConnectionState represents the current state of the connection to VPP.
51 type ConnectionState int
52
53 const (
54         // Connected represents state in which the connection has been successfully established.
55         Connected ConnectionState = iota
56
57         // Disconnected represents state in which the connection has been dropped.
58         Disconnected
59
60         // Failed represents state in which the reconnecting failed after exceeding maximum number of attempts.
61         Failed
62 )
63
64 func (s ConnectionState) String() string {
65         switch s {
66         case Connected:
67                 return "Connected"
68         case Disconnected:
69                 return "Disconnected"
70         case Failed:
71                 return "Failed"
72         default:
73                 return fmt.Sprintf("UnknownState(%d)", s)
74         }
75 }
76
77 // ConnectionEvent is a notification about change in the VPP connection state.
78 type ConnectionEvent struct {
79         // Timestamp holds the time when the event has been created.
80         Timestamp time.Time
81
82         // State holds the new state of the connection at the time when the event has been created.
83         State ConnectionState
84
85         // Error holds error if any encountered.
86         Error error
87 }
88
89 // Connection represents a shared memory connection to VPP via vppAdapter.
90 type Connection struct {
91         vppClient adapter.VppAPI // VPP binary API client
92
93         maxAttempts int           // interval for reconnect attempts
94         recInterval time.Duration // maximum number of reconnect attempts
95
96         vppConnected uint32 // non-zero if the adapter is connected to VPP
97
98         codec  MessageCodec           // message codec
99         msgIDs map[string]uint16      // map of message IDs indexed by message name + CRC
100         msgMap map[uint16]api.Message // map of messages indexed by message ID
101
102         maxChannelID uint32              // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations)
103         channelsLock sync.RWMutex        // lock for the channels map
104         channels     map[uint16]*Channel // map of all API channels indexed by the channel ID
105
106         subscriptionsLock sync.RWMutex                  // lock for the subscriptions map
107         subscriptions     map[uint16][]*subscriptionCtx // map od all notification subscriptions indexed by message ID
108
109         pingReqID   uint16 // ID if the ControlPing message
110         pingReplyID uint16 // ID of the ControlPingReply message
111
112         lastReplyLock sync.Mutex // lock for the last reply
113         lastReply     time.Time  // time of the last received reply from VPP
114
115         msgControlPing      api.Message
116         msgControlPingReply api.Message
117 }
118
119 func newConnection(binapi adapter.VppAPI, attempts int, interval time.Duration) *Connection {
120         if attempts == 0 {
121                 attempts = DefaultMaxReconnectAttempts
122         }
123         if interval == 0 {
124                 interval = DefaultReconnectInterval
125         }
126
127         c := &Connection{
128                 vppClient:           binapi,
129                 maxAttempts:         attempts,
130                 recInterval:         interval,
131                 codec:               codec.DefaultCodec,
132                 msgIDs:              make(map[string]uint16),
133                 msgMap:              make(map[uint16]api.Message),
134                 channels:            make(map[uint16]*Channel),
135                 subscriptions:       make(map[uint16][]*subscriptionCtx),
136                 msgControlPing:      msgControlPing,
137                 msgControlPingReply: msgControlPingReply,
138         }
139         binapi.SetMsgCallback(c.msgCallback)
140         return c
141 }
142
143 // Connect connects to VPP API using specified adapter and returns a connection handle.
144 // This call blocks until it is either connected, or an error occurs.
145 // Only one connection attempt will be performed.
146 func Connect(binapi adapter.VppAPI) (*Connection, error) {
147         // create new connection handle
148         c := newConnection(binapi, DefaultMaxReconnectAttempts, DefaultReconnectInterval)
149
150         // blocking attempt to connect to VPP
151         if err := c.connectVPP(); err != nil {
152                 return nil, err
153         }
154
155         return c, nil
156 }
157
158 // AsyncConnect asynchronously connects to VPP using specified VPP adapter and returns the connection handle
159 // and ConnectionState channel. This call does not block until connection is established, it
160 // returns immediately. The caller is supposed to watch the returned ConnectionState channel for
161 // Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
162 func AsyncConnect(binapi adapter.VppAPI, attempts int, interval time.Duration) (*Connection, chan ConnectionEvent, error) {
163         // create new connection handle
164         c := newConnection(binapi, attempts, interval)
165
166         // asynchronously attempt to connect to VPP
167         connChan := make(chan ConnectionEvent, NotificationChanBufSize)
168         go c.connectLoop(connChan)
169
170         return c, connChan, nil
171 }
172
173 // connectVPP performs blocking attempt to connect to VPP.
174 func (c *Connection) connectVPP() error {
175         log.Debug("Connecting to VPP..")
176
177         // blocking connect
178         if err := c.vppClient.Connect(); err != nil {
179                 return err
180         }
181         log.Debugf("Connected to VPP")
182
183         if err := c.retrieveMessageIDs(); err != nil {
184                 if err := c.vppClient.Disconnect(); err != nil {
185                         log.Debugf("disconnecting vpp client failed: %v", err)
186                 }
187                 return fmt.Errorf("VPP is incompatible: %v", err)
188         }
189
190         // store connected state
191         atomic.StoreUint32(&c.vppConnected, 1)
192
193         return nil
194 }
195
196 // Disconnect disconnects from VPP API and releases all connection-related resources.
197 func (c *Connection) Disconnect() {
198         if c == nil {
199                 return
200         }
201         if c.vppClient != nil {
202                 c.disconnectVPP()
203         }
204 }
205
206 // disconnectVPP disconnects from VPP in case it is connected.
207 func (c *Connection) disconnectVPP() {
208         if atomic.CompareAndSwapUint32(&c.vppConnected, 1, 0) {
209                 log.Debug("Disconnecting from VPP..")
210
211                 if err := c.vppClient.Disconnect(); err != nil {
212                         log.Debugf("Disconnect from VPP failed: %v", err)
213                 }
214                 log.Debug("Disconnected from VPP")
215         }
216 }
217
218 func (c *Connection) NewAPIChannel() (api.Channel, error) {
219         return c.newAPIChannel(RequestChanBufSize, ReplyChanBufSize)
220 }
221
222 func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (api.Channel, error) {
223         return c.newAPIChannel(reqChanBufSize, replyChanBufSize)
224 }
225
226 // NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
227 // It allows to specify custom buffer sizes for the request and reply Go channels.
228 func (c *Connection) newAPIChannel(reqChanBufSize, replyChanBufSize int) (*Channel, error) {
229         if c == nil {
230                 return nil, errors.New("nil connection passed in")
231         }
232
233         // create new channel
234         chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
235         channel := newChannel(chID, c, c.codec, c, reqChanBufSize, replyChanBufSize)
236
237         // store API channel within the client
238         c.channelsLock.Lock()
239         c.channels[chID] = channel
240         c.channelsLock.Unlock()
241
242         // start watching on the request channel
243         go c.watchRequests(channel)
244
245         return channel, nil
246 }
247
248 // releaseAPIChannel releases API channel that needs to be closed.
249 func (c *Connection) releaseAPIChannel(ch *Channel) {
250         log.WithFields(logger.Fields{
251                 "channel": ch.id,
252         }).Debug("API channel released")
253
254         // delete the channel from channels map
255         c.channelsLock.Lock()
256         delete(c.channels, ch.id)
257         c.channelsLock.Unlock()
258 }
259
260 // connectLoop attempts to connect to VPP until it succeeds.
261 // Then it continues with healthCheckLoop.
262 func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
263         var reconnectAttempts int
264
265         // loop until connected
266         for {
267                 if err := c.vppClient.WaitReady(); err != nil {
268                         log.Debugf("wait ready failed: %v", err)
269                 }
270                 if err := c.connectVPP(); err == nil {
271                         // signal connected event
272                         connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected}
273                         break
274                 } else if reconnectAttempts < c.maxAttempts {
275                         reconnectAttempts++
276                         log.Warnf("connecting failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err)
277                         time.Sleep(c.recInterval)
278                 } else {
279                         connChan <- ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err}
280                         return
281                 }
282         }
283
284         // we are now connected, continue with health check loop
285         c.healthCheckLoop(connChan)
286 }
287
288 // healthCheckLoop checks whether connection to VPP is alive. In case of disconnect,
289 // it continues with connectLoop and tries to reconnect.
290 func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
291         // create a separate API channel for health check probes
292         ch, err := c.newAPIChannel(1, 1)
293         if err != nil {
294                 log.Error("Failed to create health check API channel, health check will be disabled:", err)
295                 return
296         }
297
298         var (
299                 sinceLastReply time.Duration
300                 failedChecks   int
301         )
302
303         // send health check probes until an error or timeout occurs
304         for {
305                 // sleep until next health check probe period
306                 time.Sleep(HealthCheckProbeInterval)
307
308                 if atomic.LoadUint32(&c.vppConnected) == 0 {
309                         // Disconnect has been called in the meantime, return the healthcheck - reconnect loop
310                         log.Debug("Disconnected on request, exiting health check loop.")
311                         return
312                 }
313
314                 // try draining probe replies from previous request before sending next one
315                 select {
316                 case <-ch.replyChan:
317                         log.Debug("drained old probe reply from reply channel")
318                 default:
319                 }
320
321                 // send the control ping request
322                 ch.reqChan <- &vppRequest{msg: c.msgControlPing}
323
324                 for {
325                         // expect response within timeout period
326                         select {
327                         case vppReply := <-ch.replyChan:
328                                 err = vppReply.err
329
330                         case <-time.After(HealthCheckReplyTimeout):
331                                 err = ErrProbeTimeout
332
333                                 // check if time since last reply from any other
334                                 // channel is less than health check reply timeout
335                                 c.lastReplyLock.Lock()
336                                 sinceLastReply = time.Since(c.lastReply)
337                                 c.lastReplyLock.Unlock()
338
339                                 if sinceLastReply < HealthCheckReplyTimeout {
340                                         log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply)
341                                         continue
342                                 }
343                         }
344                         break
345                 }
346
347                 if err == ErrProbeTimeout {
348                         failedChecks++
349                         log.Warnf("VPP health check probe timed out after %v (%d. timeout)", HealthCheckReplyTimeout, failedChecks)
350                         if failedChecks > HealthCheckThreshold {
351                                 // in case of exceeded failed check treshold, assume VPP disconnected
352                                 log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", HealthCheckThreshold)
353                                 connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
354                                 break
355                         }
356                 } else if err != nil {
357                         // in case of error, assume VPP disconnected
358                         log.Errorf("VPP health check probe failed: %v", err)
359                         connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: err}
360                         break
361                 } else if failedChecks > 0 {
362                         // in case of success after failed checks, clear failed check counter
363                         failedChecks = 0
364                         log.Infof("VPP health check probe OK")
365                 }
366         }
367
368         // cleanup
369         ch.Close()
370         c.disconnectVPP()
371
372         // we are now disconnected, start connect loop
373         c.connectLoop(connChan)
374 }
375
376 func getMsgNameWithCrc(x api.Message) string {
377         return getMsgID(x.GetMessageName(), x.GetCrcString())
378 }
379
380 func getMsgID(name, crc string) string {
381         return name + "_" + crc
382 }
383
384 func getMsgFactory(msg api.Message) func() api.Message {
385         return func() api.Message {
386                 return reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
387         }
388 }
389
390 // GetMessageID returns message identifier of given API message.
391 func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
392         if c == nil {
393                 return 0, errors.New("nil connection passed in")
394         }
395
396         if msgID, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok {
397                 return msgID, nil
398         }
399
400         msgID, err := c.vppClient.GetMsgID(msg.GetMessageName(), msg.GetCrcString())
401         if err != nil {
402                 return 0, err
403         }
404
405         c.msgIDs[getMsgNameWithCrc(msg)] = msgID
406         c.msgMap[msgID] = msg
407
408         return msgID, nil
409 }
410
411 // LookupByID looks up message name and crc by ID.
412 func (c *Connection) LookupByID(msgID uint16) (api.Message, error) {
413         if c == nil {
414                 return nil, errors.New("nil connection passed in")
415         }
416
417         if msg, ok := c.msgMap[msgID]; ok {
418                 return msg, nil
419         }
420
421         return nil, fmt.Errorf("unknown message ID: %d", msgID)
422 }
423
424 // retrieveMessageIDs retrieves IDs for all registered messages and stores them in map
425 func (c *Connection) retrieveMessageIDs() (err error) {
426         t := time.Now()
427
428         msgs := api.GetRegisteredMessages()
429
430         var n int
431         for name, msg := range msgs {
432                 typ := reflect.TypeOf(msg).Elem()
433                 path := fmt.Sprintf("%s.%s", typ.PkgPath(), typ.Name())
434
435                 msgID, err := c.GetMessageID(msg)
436                 if err != nil {
437                         if debugMsgIDs {
438                                 log.Debugf("retrieving message ID for %s failed: %v", path, err)
439                         }
440                         continue
441                 }
442                 n++
443
444                 if c.pingReqID == 0 && msg.GetMessageName() == c.msgControlPing.GetMessageName() {
445                         c.pingReqID = msgID
446                         c.msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
447                 } else if c.pingReplyID == 0 && msg.GetMessageName() == c.msgControlPingReply.GetMessageName() {
448                         c.pingReplyID = msgID
449                         c.msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
450                 }
451
452                 if debugMsgIDs {
453                         log.Debugf("message %q (%s) has ID: %d", name, getMsgNameWithCrc(msg), msgID)
454                 }
455         }
456         log.WithField("took", time.Since(t)).
457                 Debugf("retrieved IDs for %d messages (registered %d)", n, len(msgs))
458
459         return nil
460 }