a21cc2881da205f59fea2facb24dd1f9cfa74599
[govpp.git] / core / connection.go
1 // Copyright (c) 2017 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 package core
16
17 import (
18         "errors"
19         "fmt"
20         "reflect"
21         "sync"
22         "sync/atomic"
23         "time"
24
25         logger "github.com/sirupsen/logrus"
26
27         "git.fd.io/govpp.git/adapter"
28         "git.fd.io/govpp.git/api"
29         "git.fd.io/govpp.git/codec"
30 )
31
32 var (
33         RequestChanBufSize      = 100 // default size of the request channel buffer
34         ReplyChanBufSize        = 100 // default size of the reply channel buffer
35         NotificationChanBufSize = 100 // default size of the notification channel buffer
36 )
37
38 var (
39         HealthCheckProbeInterval = time.Second * 1        // default health check probe interval
40         HealthCheckReplyTimeout  = time.Millisecond * 100 // timeout for reply to a health check probe
41         HealthCheckThreshold     = 1                      // number of failed health checks until the error is reported
42         DefaultReplyTimeout      = time.Second * 1        // default timeout for replies from VPP
43         ReconnectInterval        = time.Second * 1        // default interval for reconnect attempts
44         MaxReconnectAttempts     = 3                      // maximum number of reconnect attempts
45 )
46
47 // ConnectionState represents the current state of the connection to VPP.
48 type ConnectionState int
49
50 const (
51         // Connected represents state in which the connection has been successfully established.
52         Connected ConnectionState = iota
53
54         // Disconnected represents state in which the connection has been dropped.
55         Disconnected
56
57         // Failed represents state in which the reconnecting failed after exceeding maximum number of attempts.
58         Failed
59 )
60
61 func (s ConnectionState) String() string {
62         switch s {
63         case Connected:
64                 return "Connected"
65         case Disconnected:
66                 return "Disconnected"
67         case Failed:
68                 return "Failed"
69         default:
70                 return fmt.Sprintf("UnknownState(%d)", s)
71         }
72 }
73
74 // ConnectionEvent is a notification about change in the VPP connection state.
75 type ConnectionEvent struct {
76         // Timestamp holds the time when the event has been created.
77         Timestamp time.Time
78
79         // State holds the new state of the connection at the time when the event has been created.
80         State ConnectionState
81
82         // Error holds error if any encountered.
83         Error error
84 }
85
86 // Connection represents a shared memory connection to VPP via vppAdapter.
87 type Connection struct {
88         vppClient adapter.VppAPI // VPP binary API client
89         //statsClient adapter.StatsAPI // VPP stats API client
90
91         vppConnected uint32 // non-zero if the adapter is connected to VPP
92
93         codec  *codec.MsgCodec        // message codec
94         msgIDs map[string]uint16      // map of message IDs indexed by message name + CRC
95         msgMap map[uint16]api.Message // map of messages indexed by message ID
96
97         maxChannelID uint32              // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations)
98         channelsLock sync.RWMutex        // lock for the channels map
99         channels     map[uint16]*Channel // map of all API channels indexed by the channel ID
100
101         subscriptionsLock sync.RWMutex                  // lock for the subscriptions map
102         subscriptions     map[uint16][]*subscriptionCtx // map od all notification subscriptions indexed by message ID
103
104         pingReqID   uint16 // ID if the ControlPing message
105         pingReplyID uint16 // ID of the ControlPingReply message
106
107         lastReplyLock sync.Mutex // lock for the last reply
108         lastReply     time.Time  // time of the last received reply from VPP
109 }
110
111 func newConnection(binapi adapter.VppAPI) *Connection {
112         c := &Connection{
113                 vppClient:     binapi,
114                 codec:         &codec.MsgCodec{},
115                 msgIDs:        make(map[string]uint16),
116                 msgMap:        make(map[uint16]api.Message),
117                 channels:      make(map[uint16]*Channel),
118                 subscriptions: make(map[uint16][]*subscriptionCtx),
119         }
120         binapi.SetMsgCallback(c.msgCallback)
121         return c
122 }
123
124 // Connect connects to VPP API using specified adapter and returns a connection handle.
125 // This call blocks until it is either connected, or an error occurs.
126 // Only one connection attempt will be performed.
127 func Connect(binapi adapter.VppAPI) (*Connection, error) {
128         // create new connection handle
129         c := newConnection(binapi)
130
131         // blocking attempt to connect to VPP
132         if err := c.connectVPP(); err != nil {
133                 return nil, err
134         }
135
136         return c, nil
137 }
138
139 // AsyncConnect asynchronously connects to VPP using specified VPP adapter and returns the connection handle
140 // and ConnectionState channel. This call does not block until connection is established, it
141 // returns immediately. The caller is supposed to watch the returned ConnectionState channel for
142 // Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
143 func AsyncConnect(binapi adapter.VppAPI) (*Connection, chan ConnectionEvent, error) {
144         // create new connection handle
145         c := newConnection(binapi)
146
147         // asynchronously attempt to connect to VPP
148         connChan := make(chan ConnectionEvent, NotificationChanBufSize)
149         go c.connectLoop(connChan)
150
151         return c, connChan, nil
152 }
153
154 // connectVPP performs blocking attempt to connect to VPP.
155 func (c *Connection) connectVPP() error {
156         log.Debug("Connecting to VPP..")
157
158         // blocking connect
159         if err := c.vppClient.Connect(); err != nil {
160                 return err
161         }
162
163         log.Debugf("Connected to VPP.")
164
165         if err := c.retrieveMessageIDs(); err != nil {
166                 c.vppClient.Disconnect()
167                 return fmt.Errorf("VPP is incompatible: %v", err)
168         }
169
170         // store connected state
171         atomic.StoreUint32(&c.vppConnected, 1)
172
173         return nil
174 }
175
176 // Disconnect disconnects from VPP API and releases all connection-related resources.
177 func (c *Connection) Disconnect() {
178         if c == nil {
179                 return
180         }
181
182         if c.vppClient != nil {
183                 c.disconnectVPP()
184         }
185 }
186
187 // disconnectVPP disconnects from VPP in case it is connected.
188 func (c *Connection) disconnectVPP() {
189         if atomic.CompareAndSwapUint32(&c.vppConnected, 1, 0) {
190                 c.vppClient.Disconnect()
191         }
192 }
193
194 func (c *Connection) NewAPIChannel() (api.Channel, error) {
195         return c.newAPIChannel(RequestChanBufSize, ReplyChanBufSize)
196 }
197
198 func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (api.Channel, error) {
199         return c.newAPIChannel(reqChanBufSize, replyChanBufSize)
200 }
201
202 // NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
203 // It allows to specify custom buffer sizes for the request and reply Go channels.
204 func (c *Connection) newAPIChannel(reqChanBufSize, replyChanBufSize int) (*Channel, error) {
205         if c == nil {
206                 return nil, errors.New("nil connection passed in")
207         }
208
209         // create new channel
210         chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
211         channel := newChannel(chID, c, c.codec, c, reqChanBufSize, replyChanBufSize)
212
213         // store API channel within the client
214         c.channelsLock.Lock()
215         c.channels[chID] = channel
216         c.channelsLock.Unlock()
217
218         // start watching on the request channel
219         go c.watchRequests(channel)
220
221         return channel, nil
222 }
223
224 // releaseAPIChannel releases API channel that needs to be closed.
225 func (c *Connection) releaseAPIChannel(ch *Channel) {
226         log.WithFields(logger.Fields{
227                 "channel": ch.id,
228         }).Debug("API channel released")
229
230         // delete the channel from channels map
231         c.channelsLock.Lock()
232         delete(c.channels, ch.id)
233         c.channelsLock.Unlock()
234 }
235
236 // connectLoop attempts to connect to VPP until it succeeds.
237 // Then it continues with healthCheckLoop.
238 func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
239         reconnectAttempts := 0
240
241         // loop until connected
242         for {
243                 if err := c.vppClient.WaitReady(); err != nil {
244                         log.Warnf("wait ready failed: %v", err)
245                 }
246                 if err := c.connectVPP(); err == nil {
247                         // signal connected event
248                         connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected}
249                         break
250                 } else if reconnectAttempts < MaxReconnectAttempts {
251                         reconnectAttempts++
252                         log.Errorf("connecting failed (attempt %d/%d): %v", reconnectAttempts, MaxReconnectAttempts, err)
253                         time.Sleep(ReconnectInterval)
254                 } else {
255                         connChan <- ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err}
256                         return
257                 }
258         }
259
260         // we are now connected, continue with health check loop
261         c.healthCheckLoop(connChan)
262 }
263
264 // healthCheckLoop checks whether connection to VPP is alive. In case of disconnect,
265 // it continues with connectLoop and tries to reconnect.
266 func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
267         // create a separate API channel for health check probes
268         ch, err := c.newAPIChannel(1, 1)
269         if err != nil {
270                 log.Error("Failed to create health check API channel, health check will be disabled:", err)
271                 return
272         }
273
274         var (
275                 sinceLastReply time.Duration
276                 failedChecks   int
277         )
278
279         // send health check probes until an error or timeout occurs
280         for {
281                 // sleep until next health check probe period
282                 time.Sleep(HealthCheckProbeInterval)
283
284                 if atomic.LoadUint32(&c.vppConnected) == 0 {
285                         // Disconnect has been called in the meantime, return the healthcheck - reconnect loop
286                         log.Debug("Disconnected on request, exiting health check loop.")
287                         return
288                 }
289
290                 // try draining probe replies from previous request before sending next one
291                 select {
292                 case <-ch.replyChan:
293                         log.Debug("drained old probe reply from reply channel")
294                 default:
295                 }
296
297                 // send the control ping request
298                 ch.reqChan <- &vppRequest{msg: msgControlPing}
299
300                 for {
301                         // expect response within timeout period
302                         select {
303                         case vppReply := <-ch.replyChan:
304                                 err = vppReply.err
305
306                         case <-time.After(HealthCheckReplyTimeout):
307                                 err = ErrProbeTimeout
308
309                                 // check if time since last reply from any other
310                                 // channel is less than health check reply timeout
311                                 c.lastReplyLock.Lock()
312                                 sinceLastReply = time.Since(c.lastReply)
313                                 c.lastReplyLock.Unlock()
314
315                                 if sinceLastReply < HealthCheckReplyTimeout {
316                                         log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply)
317                                         continue
318                                 }
319                         }
320                         break
321                 }
322
323                 if err == ErrProbeTimeout {
324                         failedChecks++
325                         log.Warnf("VPP health check probe timed out after %v (%d. timeout)", HealthCheckReplyTimeout, failedChecks)
326                         if failedChecks > HealthCheckThreshold {
327                                 // in case of exceeded failed check treshold, assume VPP disconnected
328                                 log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", HealthCheckThreshold)
329                                 connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
330                                 break
331                         }
332                 } else if err != nil {
333                         // in case of error, assume VPP disconnected
334                         log.Errorf("VPP health check probe failed: %v", err)
335                         connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: err}
336                         break
337                 } else if failedChecks > 0 {
338                         // in case of success after failed checks, clear failed check counter
339                         failedChecks = 0
340                         log.Infof("VPP health check probe OK")
341                 }
342         }
343
344         // cleanup
345         ch.Close()
346         c.disconnectVPP()
347
348         // we are now disconnected, start connect loop
349         c.connectLoop(connChan)
350 }
351
352 func getMsgNameWithCrc(x api.Message) string {
353         return x.GetMessageName() + "_" + x.GetCrcString()
354 }
355
356 func getMsgFactory(msg api.Message) func() api.Message {
357         return func() api.Message {
358                 return reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
359         }
360 }
361
362 // GetMessageID returns message identifier of given API message.
363 func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
364         if c == nil {
365                 return 0, errors.New("nil connection passed in")
366         }
367
368         if msgID, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok {
369                 return msgID, nil
370         }
371
372         msgID, err := c.vppClient.GetMsgID(msg.GetMessageName(), msg.GetCrcString())
373         if err != nil {
374                 return 0, err
375         }
376
377         c.msgIDs[getMsgNameWithCrc(msg)] = msgID
378         c.msgMap[msgID] = msg
379
380         return msgID, nil
381 }
382
383 // LookupByID looks up message name and crc by ID.
384 func (c *Connection) LookupByID(msgID uint16) (api.Message, error) {
385         if c == nil {
386                 return nil, errors.New("nil connection passed in")
387         }
388
389         if msg, ok := c.msgMap[msgID]; ok {
390                 return msg, nil
391         }
392
393         return nil, fmt.Errorf("unknown message ID: %d", msgID)
394 }
395
396 // retrieveMessageIDs retrieves IDs for all registered messages and stores them in map
397 func (c *Connection) retrieveMessageIDs() (err error) {
398         t := time.Now()
399
400         msgs := api.GetRegisteredMessages()
401
402         var n int
403         for name, msg := range msgs {
404                 msgID, err := c.GetMessageID(msg)
405                 if err != nil {
406                         log.Debugf("retrieving msgID for %s failed: %v", name, err)
407                         continue
408                 }
409                 n++
410
411                 if c.pingReqID == 0 && msg.GetMessageName() == msgControlPing.GetMessageName() {
412                         c.pingReqID = msgID
413                         msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
414                 } else if c.pingReplyID == 0 && msg.GetMessageName() == msgControlPingReply.GetMessageName() {
415                         c.pingReplyID = msgID
416                         msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
417                 }
418
419                 if debugMsgIDs {
420                         log.Debugf("message %q (%s) has ID: %d", name, getMsgNameWithCrc(msg), msgID)
421                 }
422         }
423         log.Debugf("retrieved %d/%d msgIDs (took %s)", n, len(msgs), time.Since(t))
424
425         return nil
426 }