Optimizations for statsclient
[govpp.git] / core / connection.go
1 // Copyright (c) 2017 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 package core
16
17 import (
18         "errors"
19         "fmt"
20         "reflect"
21         "sync"
22         "sync/atomic"
23         "time"
24
25         logger "github.com/sirupsen/logrus"
26
27         "git.fd.io/govpp.git/adapter"
28         "git.fd.io/govpp.git/api"
29         "git.fd.io/govpp.git/codec"
30 )
31
32 const (
33         DefaultReconnectInterval    = time.Second / 2 // default interval between reconnect attempts
34         DefaultMaxReconnectAttempts = 3               // default maximum number of reconnect attempts
35 )
36
37 var (
38         RequestChanBufSize      = 100 // default size of the request channel buffer
39         ReplyChanBufSize        = 100 // default size of the reply channel buffer
40         NotificationChanBufSize = 100 // default size of the notification channel buffer
41 )
42
43 var (
44         HealthCheckProbeInterval = time.Second            // default health check probe interval
45         HealthCheckReplyTimeout  = time.Millisecond * 100 // timeout for reply to a health check probe
46         HealthCheckThreshold     = 1                      // number of failed health checks until the error is reported
47         DefaultReplyTimeout      = time.Second            // default timeout for replies from VPP
48 )
49
50 // ConnectionState represents the current state of the connection to VPP.
51 type ConnectionState int
52
53 const (
54         // Connected represents state in which the connection has been successfully established.
55         Connected ConnectionState = iota
56
57         // Disconnected represents state in which the connection has been dropped.
58         Disconnected
59
60         // Failed represents state in which the reconnecting failed after exceeding maximum number of attempts.
61         Failed
62 )
63
64 func (s ConnectionState) String() string {
65         switch s {
66         case Connected:
67                 return "Connected"
68         case Disconnected:
69                 return "Disconnected"
70         case Failed:
71                 return "Failed"
72         default:
73                 return fmt.Sprintf("UnknownState(%d)", s)
74         }
75 }
76
77 // ConnectionEvent is a notification about change in the VPP connection state.
78 type ConnectionEvent struct {
79         // Timestamp holds the time when the event has been created.
80         Timestamp time.Time
81
82         // State holds the new state of the connection at the time when the event has been created.
83         State ConnectionState
84
85         // Error holds error if any encountered.
86         Error error
87 }
88
89 // Connection represents a shared memory connection to VPP via vppAdapter.
90 type Connection struct {
91         vppClient adapter.VppAPI // VPP binary API client
92
93         maxAttempts int           // interval for reconnect attempts
94         recInterval time.Duration // maximum number of reconnect attempts
95
96         vppConnected uint32 // non-zero if the adapter is connected to VPP
97
98         codec  *codec.MsgCodec        // message codec
99         msgIDs map[string]uint16      // map of message IDs indexed by message name + CRC
100         msgMap map[uint16]api.Message // map of messages indexed by message ID
101
102         maxChannelID uint32              // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations)
103         channelsLock sync.RWMutex        // lock for the channels map
104         channels     map[uint16]*Channel // map of all API channels indexed by the channel ID
105
106         subscriptionsLock sync.RWMutex                  // lock for the subscriptions map
107         subscriptions     map[uint16][]*subscriptionCtx // map od all notification subscriptions indexed by message ID
108
109         pingReqID   uint16 // ID if the ControlPing message
110         pingReplyID uint16 // ID of the ControlPingReply message
111
112         lastReplyLock sync.Mutex // lock for the last reply
113         lastReply     time.Time  // time of the last received reply from VPP
114 }
115
116 func newConnection(binapi adapter.VppAPI, attempts int, interval time.Duration) *Connection {
117         if attempts == 0 {
118                 attempts = DefaultMaxReconnectAttempts
119         }
120         if interval == 0 {
121                 interval = DefaultReconnectInterval
122         }
123
124         c := &Connection{
125                 vppClient:     binapi,
126                 maxAttempts:   attempts,
127                 recInterval:   interval,
128                 codec:         &codec.MsgCodec{},
129                 msgIDs:        make(map[string]uint16),
130                 msgMap:        make(map[uint16]api.Message),
131                 channels:      make(map[uint16]*Channel),
132                 subscriptions: make(map[uint16][]*subscriptionCtx),
133         }
134         binapi.SetMsgCallback(c.msgCallback)
135         return c
136 }
137
138 // Connect connects to VPP API using specified adapter and returns a connection handle.
139 // This call blocks until it is either connected, or an error occurs.
140 // Only one connection attempt will be performed.
141 func Connect(binapi adapter.VppAPI) (*Connection, error) {
142         // create new connection handle
143         c := newConnection(binapi, DefaultMaxReconnectAttempts, DefaultReconnectInterval)
144
145         // blocking attempt to connect to VPP
146         if err := c.connectVPP(); err != nil {
147                 return nil, err
148         }
149
150         return c, nil
151 }
152
153 // AsyncConnect asynchronously connects to VPP using specified VPP adapter and returns the connection handle
154 // and ConnectionState channel. This call does not block until connection is established, it
155 // returns immediately. The caller is supposed to watch the returned ConnectionState channel for
156 // Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
157 func AsyncConnect(binapi adapter.VppAPI, attempts int, interval time.Duration) (*Connection, chan ConnectionEvent, error) {
158         // create new connection handle
159         c := newConnection(binapi, attempts, interval)
160
161         // asynchronously attempt to connect to VPP
162         connChan := make(chan ConnectionEvent, NotificationChanBufSize)
163         go c.connectLoop(connChan)
164
165         return c, connChan, nil
166 }
167
168 // connectVPP performs blocking attempt to connect to VPP.
169 func (c *Connection) connectVPP() error {
170         log.Debug("Connecting to VPP..")
171
172         // blocking connect
173         if err := c.vppClient.Connect(); err != nil {
174                 return err
175         }
176         log.Debugf("Connected to VPP")
177
178         if err := c.retrieveMessageIDs(); err != nil {
179                 if err := c.vppClient.Disconnect(); err != nil {
180                         log.Debugf("disconnecting vpp client failed: %v", err)
181                 }
182                 return fmt.Errorf("VPP is incompatible: %v", err)
183         }
184
185         // store connected state
186         atomic.StoreUint32(&c.vppConnected, 1)
187
188         return nil
189 }
190
191 // Disconnect disconnects from VPP API and releases all connection-related resources.
192 func (c *Connection) Disconnect() {
193         if c == nil {
194                 return
195         }
196         if c.vppClient != nil {
197                 c.disconnectVPP()
198         }
199 }
200
201 // disconnectVPP disconnects from VPP in case it is connected.
202 func (c *Connection) disconnectVPP() {
203         if atomic.CompareAndSwapUint32(&c.vppConnected, 1, 0) {
204                 log.Debug("Disconnecting from VPP..")
205
206                 if err := c.vppClient.Disconnect(); err != nil {
207                         log.Debugf("Disconnect from VPP failed: %v", err)
208                 }
209                 log.Debug("Disconnected from VPP")
210         }
211 }
212
213 func (c *Connection) NewAPIChannel() (api.Channel, error) {
214         return c.newAPIChannel(RequestChanBufSize, ReplyChanBufSize)
215 }
216
217 func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (api.Channel, error) {
218         return c.newAPIChannel(reqChanBufSize, replyChanBufSize)
219 }
220
221 // NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
222 // It allows to specify custom buffer sizes for the request and reply Go channels.
223 func (c *Connection) newAPIChannel(reqChanBufSize, replyChanBufSize int) (*Channel, error) {
224         if c == nil {
225                 return nil, errors.New("nil connection passed in")
226         }
227
228         // create new channel
229         chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
230         channel := newChannel(chID, c, c.codec, c, reqChanBufSize, replyChanBufSize)
231
232         // store API channel within the client
233         c.channelsLock.Lock()
234         c.channels[chID] = channel
235         c.channelsLock.Unlock()
236
237         // start watching on the request channel
238         go c.watchRequests(channel)
239
240         return channel, nil
241 }
242
243 // releaseAPIChannel releases API channel that needs to be closed.
244 func (c *Connection) releaseAPIChannel(ch *Channel) {
245         log.WithFields(logger.Fields{
246                 "channel": ch.id,
247         }).Debug("API channel released")
248
249         // delete the channel from channels map
250         c.channelsLock.Lock()
251         delete(c.channels, ch.id)
252         c.channelsLock.Unlock()
253 }
254
255 // connectLoop attempts to connect to VPP until it succeeds.
256 // Then it continues with healthCheckLoop.
257 func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
258         var reconnectAttempts int
259
260         // loop until connected
261         for {
262                 if err := c.vppClient.WaitReady(); err != nil {
263                         log.Debugf("wait ready failed: %v", err)
264                 }
265                 if err := c.connectVPP(); err == nil {
266                         // signal connected event
267                         connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected}
268                         break
269                 } else if reconnectAttempts < c.maxAttempts {
270                         reconnectAttempts++
271                         log.Warnf("connecting failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err)
272                         time.Sleep(c.recInterval)
273                 } else {
274                         connChan <- ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err}
275                         return
276                 }
277         }
278
279         // we are now connected, continue with health check loop
280         c.healthCheckLoop(connChan)
281 }
282
283 // healthCheckLoop checks whether connection to VPP is alive. In case of disconnect,
284 // it continues with connectLoop and tries to reconnect.
285 func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
286         // create a separate API channel for health check probes
287         ch, err := c.newAPIChannel(1, 1)
288         if err != nil {
289                 log.Error("Failed to create health check API channel, health check will be disabled:", err)
290                 return
291         }
292
293         var (
294                 sinceLastReply time.Duration
295                 failedChecks   int
296         )
297
298         // send health check probes until an error or timeout occurs
299         for {
300                 // sleep until next health check probe period
301                 time.Sleep(HealthCheckProbeInterval)
302
303                 if atomic.LoadUint32(&c.vppConnected) == 0 {
304                         // Disconnect has been called in the meantime, return the healthcheck - reconnect loop
305                         log.Debug("Disconnected on request, exiting health check loop.")
306                         return
307                 }
308
309                 // try draining probe replies from previous request before sending next one
310                 select {
311                 case <-ch.replyChan:
312                         log.Debug("drained old probe reply from reply channel")
313                 default:
314                 }
315
316                 // send the control ping request
317                 ch.reqChan <- &vppRequest{msg: msgControlPing}
318
319                 for {
320                         // expect response within timeout period
321                         select {
322                         case vppReply := <-ch.replyChan:
323                                 err = vppReply.err
324
325                         case <-time.After(HealthCheckReplyTimeout):
326                                 err = ErrProbeTimeout
327
328                                 // check if time since last reply from any other
329                                 // channel is less than health check reply timeout
330                                 c.lastReplyLock.Lock()
331                                 sinceLastReply = time.Since(c.lastReply)
332                                 c.lastReplyLock.Unlock()
333
334                                 if sinceLastReply < HealthCheckReplyTimeout {
335                                         log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply)
336                                         continue
337                                 }
338                         }
339                         break
340                 }
341
342                 if err == ErrProbeTimeout {
343                         failedChecks++
344                         log.Warnf("VPP health check probe timed out after %v (%d. timeout)", HealthCheckReplyTimeout, failedChecks)
345                         if failedChecks > HealthCheckThreshold {
346                                 // in case of exceeded failed check treshold, assume VPP disconnected
347                                 log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", HealthCheckThreshold)
348                                 connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
349                                 break
350                         }
351                 } else if err != nil {
352                         // in case of error, assume VPP disconnected
353                         log.Errorf("VPP health check probe failed: %v", err)
354                         connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: err}
355                         break
356                 } else if failedChecks > 0 {
357                         // in case of success after failed checks, clear failed check counter
358                         failedChecks = 0
359                         log.Infof("VPP health check probe OK")
360                 }
361         }
362
363         // cleanup
364         ch.Close()
365         c.disconnectVPP()
366
367         // we are now disconnected, start connect loop
368         c.connectLoop(connChan)
369 }
370
371 func getMsgNameWithCrc(x api.Message) string {
372         return x.GetMessageName() + "_" + x.GetCrcString()
373 }
374
375 func getMsgFactory(msg api.Message) func() api.Message {
376         return func() api.Message {
377                 return reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
378         }
379 }
380
381 // GetMessageID returns message identifier of given API message.
382 func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
383         if c == nil {
384                 return 0, errors.New("nil connection passed in")
385         }
386
387         if msgID, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok {
388                 return msgID, nil
389         }
390
391         msgID, err := c.vppClient.GetMsgID(msg.GetMessageName(), msg.GetCrcString())
392         if err != nil {
393                 return 0, err
394         }
395
396         c.msgIDs[getMsgNameWithCrc(msg)] = msgID
397         c.msgMap[msgID] = msg
398
399         return msgID, nil
400 }
401
402 // LookupByID looks up message name and crc by ID.
403 func (c *Connection) LookupByID(msgID uint16) (api.Message, error) {
404         if c == nil {
405                 return nil, errors.New("nil connection passed in")
406         }
407
408         if msg, ok := c.msgMap[msgID]; ok {
409                 return msg, nil
410         }
411
412         return nil, fmt.Errorf("unknown message ID: %d", msgID)
413 }
414
415 // retrieveMessageIDs retrieves IDs for all registered messages and stores them in map
416 func (c *Connection) retrieveMessageIDs() (err error) {
417         t := time.Now()
418
419         msgs := api.GetRegisteredMessages()
420
421         var n int
422         for name, msg := range msgs {
423                 msgID, err := c.GetMessageID(msg)
424                 if err != nil {
425                         log.Debugf("retrieving msgID for %s failed: %v", name, err)
426                         continue
427                 }
428                 n++
429
430                 if c.pingReqID == 0 && msg.GetMessageName() == msgControlPing.GetMessageName() {
431                         c.pingReqID = msgID
432                         msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
433                 } else if c.pingReplyID == 0 && msg.GetMessageName() == msgControlPingReply.GetMessageName() {
434                         c.pingReplyID = msgID
435                         msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
436                 }
437
438                 if debugMsgIDs {
439                         log.Debugf("message %q (%s) has ID: %d", name, getMsgNameWithCrc(msg), msgID)
440                 }
441         }
442         log.Debugf("retrieved %d/%d msgIDs (took %s)", n, len(msgs), time.Since(t))
443
444         return nil
445 }