Fixed incorrect message error in the stream API
[govpp.git] / core / connection.go
1 // Copyright (c) 2017 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 package core
16
17 import (
18         "errors"
19         "fmt"
20         "path"
21         "reflect"
22         "sync"
23         "sync/atomic"
24         "time"
25
26         logger "github.com/sirupsen/logrus"
27
28         "git.fd.io/govpp.git/adapter"
29         "git.fd.io/govpp.git/api"
30         "git.fd.io/govpp.git/codec"
31 )
32
33 const (
34         DefaultReconnectInterval    = time.Second / 2 // default interval between reconnect attempts
35         DefaultMaxReconnectAttempts = 3               // default maximum number of reconnect attempts
36 )
37
38 var (
39         RequestChanBufSize      = 100 // default size of the request channel buffer
40         ReplyChanBufSize        = 100 // default size of the reply channel buffer
41         NotificationChanBufSize = 100 // default size of the notification channel buffer
42 )
43
44 var (
45         HealthCheckProbeInterval = time.Second            // default health check probe interval
46         HealthCheckReplyTimeout  = time.Millisecond * 250 // timeout for reply to a health check probe
47         HealthCheckThreshold     = 2                      // number of failed health checks until the error is reported
48         DefaultReplyTimeout      = time.Second            // default timeout for replies from VPP
49 )
50
51 // ConnectionState represents the current state of the connection to VPP.
52 type ConnectionState int
53
54 const (
55         // Connected represents state in which the connection has been successfully established.
56         Connected ConnectionState = iota
57
58         // NotResponding represents a state where the VPP socket accepts messages but replies are received with delay,
59         // or not at all. GoVPP treats this state internally the same as disconnected.
60         NotResponding
61
62         // Disconnected represents state in which the VPP socket is closed and the connection is considered dropped.
63         Disconnected
64
65         // Failed represents state in which the reconnecting failed after exceeding maximum number of attempts.
66         Failed
67 )
68
69 func (s ConnectionState) String() string {
70         switch s {
71         case Connected:
72                 return "Connected"
73         case NotResponding:
74                 return "NotResponding"
75         case Disconnected:
76                 return "Disconnected"
77         case Failed:
78                 return "Failed"
79         default:
80                 return fmt.Sprintf("UnknownState(%d)", s)
81         }
82 }
83
84 // ConnectionEvent is a notification about change in the VPP connection state.
85 type ConnectionEvent struct {
86         // Timestamp holds the time when the event has been created.
87         Timestamp time.Time
88
89         // State holds the new state of the connection at the time when the event has been created.
90         State ConnectionState
91
92         // Error holds error if any encountered.
93         Error error
94 }
95
96 // Connection represents a shared memory connection to VPP via vppAdapter.
97 type Connection struct {
98         vppClient adapter.VppAPI // VPP binary API client
99
100         maxAttempts int           // interval for reconnect attempts
101         recInterval time.Duration // maximum number of reconnect attempts
102
103         vppConnected uint32 // non-zero if the adapter is connected to VPP
104
105         connChan chan ConnectionEvent // connection status events are sent to this channel
106
107         codec        MessageCodec                      // message codec
108         msgIDs       map[string]uint16                 // map of message IDs indexed by message name + CRC
109         msgMapByPath map[string]map[uint16]api.Message // map of messages indexed by message ID which are indexed by path
110
111         maxChannelID uint32              // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations)
112         channelsLock sync.RWMutex        // lock for the channels map
113         channels     map[uint16]*Channel // map of all API channels indexed by the channel ID
114
115         subscriptionsLock sync.RWMutex                  // lock for the subscriptions map
116         subscriptions     map[uint16][]*subscriptionCtx // map od all notification subscriptions indexed by message ID
117
118         pingReqID   uint16 // ID if the ControlPing message
119         pingReplyID uint16 // ID of the ControlPingReply message
120
121         lastReplyLock sync.Mutex // lock for the last reply
122         lastReply     time.Time  // time of the last received reply from VPP
123
124         msgControlPing      api.Message
125         msgControlPingReply api.Message
126 }
127
128 func newConnection(binapi adapter.VppAPI, attempts int, interval time.Duration) *Connection {
129         if attempts == 0 {
130                 attempts = DefaultMaxReconnectAttempts
131         }
132         if interval == 0 {
133                 interval = DefaultReconnectInterval
134         }
135
136         c := &Connection{
137                 vppClient:           binapi,
138                 maxAttempts:         attempts,
139                 recInterval:         interval,
140                 connChan:            make(chan ConnectionEvent, NotificationChanBufSize),
141                 codec:               codec.DefaultCodec,
142                 msgIDs:              make(map[string]uint16),
143                 msgMapByPath:        make(map[string]map[uint16]api.Message),
144                 channels:            make(map[uint16]*Channel),
145                 subscriptions:       make(map[uint16][]*subscriptionCtx),
146                 msgControlPing:      msgControlPing,
147                 msgControlPingReply: msgControlPingReply,
148         }
149         binapi.SetMsgCallback(c.msgCallback)
150         return c
151 }
152
153 // Connect connects to VPP API using specified adapter and returns a connection handle.
154 // This call blocks until it is either connected, or an error occurs.
155 // Only one connection attempt will be performed.
156 func Connect(binapi adapter.VppAPI) (*Connection, error) {
157         // create new connection handle
158         c := newConnection(binapi, DefaultMaxReconnectAttempts, DefaultReconnectInterval)
159
160         // blocking attempt to connect to VPP
161         if err := c.connectVPP(); err != nil {
162                 return nil, err
163         }
164
165         return c, nil
166 }
167
168 // AsyncConnect asynchronously connects to VPP using specified VPP adapter and returns the connection handle
169 // and ConnectionState channel. This call does not block until connection is established, it
170 // returns immediately. The caller is supposed to watch the returned ConnectionState channel for
171 // Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
172 func AsyncConnect(binapi adapter.VppAPI, attempts int, interval time.Duration) (*Connection, chan ConnectionEvent, error) {
173         // create new connection handle
174         c := newConnection(binapi, attempts, interval)
175
176         // asynchronously attempt to connect to VPP
177         go c.connectLoop()
178
179         return c, c.connChan, nil
180 }
181
182 // connectVPP performs blocking attempt to connect to VPP.
183 func (c *Connection) connectVPP() error {
184         log.Debug("Connecting to VPP..")
185
186         // blocking connect
187         if err := c.vppClient.Connect(); err != nil {
188                 return err
189         }
190         log.Debugf("Connected to VPP")
191
192         if err := c.retrieveMessageIDs(); err != nil {
193                 if err := c.vppClient.Disconnect(); err != nil {
194                         log.Debugf("disconnecting vpp client failed: %v", err)
195                 }
196                 return fmt.Errorf("VPP is incompatible: %v", err)
197         }
198
199         // store connected state
200         atomic.StoreUint32(&c.vppConnected, 1)
201
202         return nil
203 }
204
205 // Disconnect disconnects from VPP API and releases all connection-related resources.
206 func (c *Connection) Disconnect() {
207         if c == nil {
208                 return
209         }
210         if c.vppClient != nil {
211                 c.disconnectVPP()
212         }
213 }
214
215 // disconnectVPP disconnects from VPP in case it is connected.
216 func (c *Connection) disconnectVPP() {
217         if atomic.CompareAndSwapUint32(&c.vppConnected, 1, 0) {
218                 log.Debug("Disconnecting from VPP..")
219
220                 if err := c.vppClient.Disconnect(); err != nil {
221                         log.Debugf("Disconnect from VPP failed: %v", err)
222                 }
223                 log.Debug("Disconnected from VPP")
224         }
225 }
226
227 func (c *Connection) NewAPIChannel() (api.Channel, error) {
228         return c.newAPIChannel(RequestChanBufSize, ReplyChanBufSize)
229 }
230
231 func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (api.Channel, error) {
232         return c.newAPIChannel(reqChanBufSize, replyChanBufSize)
233 }
234
235 // NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
236 // It allows to specify custom buffer sizes for the request and reply Go channels.
237 func (c *Connection) newAPIChannel(reqChanBufSize, replyChanBufSize int) (*Channel, error) {
238         if c == nil {
239                 return nil, errors.New("nil connection passed in")
240         }
241
242         // create new channel
243         chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
244         channel := newChannel(chID, c, c.codec, c, reqChanBufSize, replyChanBufSize)
245
246         // store API channel within the client
247         c.channelsLock.Lock()
248         c.channels[chID] = channel
249         c.channelsLock.Unlock()
250
251         // start watching on the request channel
252         go c.watchRequests(channel)
253
254         return channel, nil
255 }
256
257 // releaseAPIChannel releases API channel that needs to be closed.
258 func (c *Connection) releaseAPIChannel(ch *Channel) {
259         log.WithFields(logger.Fields{
260                 "channel": ch.id,
261         }).Debug("API channel released")
262
263         // delete the channel from channels map
264         c.channelsLock.Lock()
265         delete(c.channels, ch.id)
266         c.channelsLock.Unlock()
267 }
268
269 // connectLoop attempts to connect to VPP until it succeeds.
270 // Then it continues with healthCheckLoop.
271 func (c *Connection) connectLoop() {
272         var reconnectAttempts int
273
274         // loop until connected
275         for {
276                 if err := c.vppClient.WaitReady(); err != nil {
277                         log.Debugf("wait ready failed: %v", err)
278                 }
279                 if err := c.connectVPP(); err == nil {
280                         // signal connected event
281                         c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Connected})
282                         break
283                 } else if reconnectAttempts < c.maxAttempts {
284                         reconnectAttempts++
285                         log.Warnf("connecting failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err)
286                         time.Sleep(c.recInterval)
287                 } else {
288                         c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err})
289                         return
290                 }
291         }
292
293         // we are now connected, continue with health check loop
294         c.healthCheckLoop()
295 }
296
297 // healthCheckLoop checks whether connection to VPP is alive. In case of disconnect,
298 // it continues with connectLoop and tries to reconnect.
299 func (c *Connection) healthCheckLoop() {
300         // create a separate API channel for health check probes
301         ch, err := c.newAPIChannel(1, 1)
302         if err != nil {
303                 log.Error("Failed to create health check API channel, health check will be disabled:", err)
304                 return
305         }
306
307         var (
308                 sinceLastReply time.Duration
309                 failedChecks   int
310         )
311
312         // send health check probes until an error or timeout occurs
313         for {
314                 // sleep until next health check probe period
315                 time.Sleep(HealthCheckProbeInterval)
316
317                 if atomic.LoadUint32(&c.vppConnected) == 0 {
318                         // Disconnect has been called in the meantime, return the healthcheck - reconnect loop
319                         log.Debug("Disconnected on request, exiting health check loop.")
320                         return
321                 }
322
323                 // try draining probe replies from previous request before sending next one
324                 select {
325                 case <-ch.replyChan:
326                         log.Debug("drained old probe reply from reply channel")
327                 default:
328                 }
329
330                 // send the control ping request
331                 ch.reqChan <- &vppRequest{msg: c.msgControlPing}
332
333                 for {
334                         // expect response within timeout period
335                         select {
336                         case vppReply := <-ch.replyChan:
337                                 err = vppReply.err
338
339                         case <-time.After(HealthCheckReplyTimeout):
340                                 err = ErrProbeTimeout
341
342                                 // check if time since last reply from any other
343                                 // channel is less than health check reply timeout
344                                 c.lastReplyLock.Lock()
345                                 sinceLastReply = time.Since(c.lastReply)
346                                 c.lastReplyLock.Unlock()
347
348                                 if sinceLastReply < HealthCheckReplyTimeout {
349                                         log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply)
350                                         continue
351                                 }
352                         }
353                         break
354                 }
355
356                 if err == ErrProbeTimeout {
357                         failedChecks++
358                         log.Warnf("VPP health check probe timed out after %v (%d. timeout)", HealthCheckReplyTimeout, failedChecks)
359                         if failedChecks > HealthCheckThreshold {
360                                 // in case of exceeded failed check threshold, assume VPP unresponsive
361                                 log.Errorf("VPP does not responding, the health check exceeded threshold for timeouts (>%d)", HealthCheckThreshold)
362                                 c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: NotResponding})
363                                 break
364                         }
365                 } else if err != nil {
366                         // in case of error, assume VPP disconnected
367                         log.Errorf("VPP health check probe failed: %v", err)
368                         c.sendConnEvent(ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: err})
369                         break
370                 } else if failedChecks > 0 {
371                         // in case of success after failed checks, clear failed check counter
372                         failedChecks = 0
373                         log.Infof("VPP health check probe OK")
374                 }
375         }
376
377         // cleanup
378         ch.Close()
379         c.disconnectVPP()
380
381         // we are now disconnected, start connect loop
382         c.connectLoop()
383 }
384
385 func getMsgNameWithCrc(x api.Message) string {
386         return getMsgID(x.GetMessageName(), x.GetCrcString())
387 }
388
389 func getMsgID(name, crc string) string {
390         return name + "_" + crc
391 }
392
393 func getMsgFactory(msg api.Message) func() api.Message {
394         return func() api.Message {
395                 return reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
396         }
397 }
398
399 // GetMessageID returns message identifier of given API message.
400 func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
401         if c == nil {
402                 return 0, errors.New("nil connection passed in")
403         }
404         pkgPath := c.GetMessagePath(msg)
405         msgID, err := c.vppClient.GetMsgID(msg.GetMessageName(), msg.GetCrcString())
406         if err != nil {
407                 return 0, err
408         }
409         if pathMsgs, pathOk := c.msgMapByPath[pkgPath]; !pathOk {
410                 c.msgMapByPath[pkgPath] = make(map[uint16]api.Message)
411                 c.msgMapByPath[pkgPath][msgID] = msg
412         } else if _, msgOk := pathMsgs[msgID]; !msgOk {
413                 c.msgMapByPath[pkgPath][msgID] = msg
414         }
415         if _, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok {
416                 return msgID, nil
417         }
418         c.msgIDs[getMsgNameWithCrc(msg)] = msgID
419         return msgID, nil
420 }
421
422 // LookupByID looks up message name and crc by ID.
423 func (c *Connection) LookupByID(path string, msgID uint16) (api.Message, error) {
424         if c == nil {
425                 return nil, errors.New("nil connection passed in")
426         }
427         if msg, ok := c.msgMapByPath[path][msgID]; ok {
428                 return msg, nil
429         }
430         return nil, fmt.Errorf("unknown message ID %d for path '%s'", msgID, path)
431 }
432
433 // GetMessagePath returns path for the given message
434 func (c *Connection) GetMessagePath(msg api.Message) string {
435         return path.Dir(reflect.TypeOf(msg).Elem().PkgPath())
436 }
437
438 // retrieveMessageIDs retrieves IDs for all registered messages and stores them in map
439 func (c *Connection) retrieveMessageIDs() (err error) {
440         t := time.Now()
441
442         msgsByPath := api.GetRegisteredMessages()
443
444         var n int
445         for pkgPath, msgs := range msgsByPath {
446                 for _, msg := range msgs {
447                         msgID, err := c.GetMessageID(msg)
448                         if err != nil {
449                                 if debugMsgIDs {
450                                         log.Debugf("retrieving message ID for %s.%s failed: %v",
451                                                 pkgPath, msg.GetMessageName(), err)
452                                 }
453                                 continue
454                         }
455                         n++
456
457                         if c.pingReqID == 0 && msg.GetMessageName() == c.msgControlPing.GetMessageName() {
458                                 c.pingReqID = msgID
459                                 c.msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
460                         } else if c.pingReplyID == 0 && msg.GetMessageName() == c.msgControlPingReply.GetMessageName() {
461                                 c.pingReplyID = msgID
462                                 c.msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
463                         }
464
465                         if debugMsgIDs {
466                                 log.Debugf("message %q (%s) has ID: %d", msg.GetMessageName(), getMsgNameWithCrc(msg), msgID)
467                         }
468                 }
469                 log.WithField("took", time.Since(t)).
470                         Debugf("retrieved IDs for %d messages (registered %d) from path %s", n, len(msgs), pkgPath)
471         }
472
473         return nil
474 }
475
476 func (c *Connection) sendConnEvent(event ConnectionEvent) {
477         select {
478         case c.connChan <- event:
479         default:
480                 log.Warn("Connection state channel is full, discarding value.")
481         }
482 }