Refactor GoVPP
[govpp.git] / core / connection.go
1 // Copyright (c) 2017 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 package core
16
17 import (
18         "errors"
19         "fmt"
20         "reflect"
21         "sync"
22         "sync/atomic"
23         "time"
24
25         logger "github.com/sirupsen/logrus"
26
27         "git.fd.io/govpp.git/adapter"
28         "git.fd.io/govpp.git/api"
29         "git.fd.io/govpp.git/codec"
30 )
31
32 const (
33         requestChannelBufSize      = 100 // default size of the request channel buffer
34         replyChannelBufSize        = 100 // default size of the reply channel buffer
35         notificationChannelBufSize = 100 // default size of the notification channel buffer
36
37         defaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP, can be changed with SetReplyTimeout
38 )
39
40 var (
41         healthCheckInterval     = time.Second * 1        // default health check interval
42         healthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check
43         healthCheckThreshold    = 1                      // number of failed health checks until the error is reported
44 )
45
46 // SetHealthCheckProbeInterval sets health check probe interval.
47 // Beware: Function is not thread-safe. It is recommended to setup this parameter
48 // before connecting to vpp.
49 func SetHealthCheckProbeInterval(interval time.Duration) {
50         healthCheckInterval = interval
51 }
52
53 // SetHealthCheckReplyTimeout sets timeout for reply to a health check probe.
54 // If reply arrives after the timeout, check is considered as failed.
55 // Beware: Function is not thread-safe. It is recommended to setup this parameter
56 // before connecting to vpp.
57 func SetHealthCheckReplyTimeout(timeout time.Duration) {
58         healthCheckReplyTimeout = timeout
59 }
60
61 // SetHealthCheckThreshold sets the number of failed healthProbe checks until the error is reported.
62 // Beware: Function is not thread-safe. It is recommended to setup this parameter
63 // before connecting to vpp.
64 func SetHealthCheckThreshold(threshold int) {
65         healthCheckThreshold = threshold
66 }
67
68 // ConnectionState represents the current state of the connection to VPP.
69 type ConnectionState int
70
71 const (
72         // Connected represents state in which the connection has been successfully established.
73         Connected ConnectionState = iota
74
75         // Disconnected represents state in which the connection has been dropped.
76         Disconnected
77 )
78
79 // ConnectionEvent is a notification about change in the VPP connection state.
80 type ConnectionEvent struct {
81         // Timestamp holds the time when the event has been created.
82         Timestamp time.Time
83
84         // State holds the new state of the connection at the time when the event has been created.
85         State ConnectionState
86
87         // Error holds error if any encountered.
88         Error error
89 }
90
91 var (
92         connLock sync.RWMutex // lock for the global connection
93         conn     *Connection  // global handle to the Connection (used in the message receive callback)
94 )
95
96 // Connection represents a shared memory connection to VPP via vppAdapter.
97 type Connection struct {
98         vpp       adapter.VppAdapter // VPP adapter
99         connected uint32             // non-zero if the adapter is connected to VPP
100
101         codec  *codec.MsgCodec        // message codec
102         msgIDs map[string]uint16      // map of message IDs indexed by message name + CRC
103         msgMap map[uint16]api.Message // map of messages indexed by message ID
104
105         maxChannelID uint32              // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations)
106         channelsLock sync.RWMutex        // lock for the channels map
107         channels     map[uint16]*channel // map of all API channels indexed by the channel ID
108
109         notifSubscriptionsLock sync.RWMutex                        // lock for the subscriptions map
110         notifSubscriptions     map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID
111
112         pingReqID   uint16 // ID if the ControlPing message
113         pingReplyID uint16 // ID of the ControlPingReply message
114
115         lastReplyLock sync.Mutex // lock for the last reply
116         lastReply     time.Time  // time of the last received reply from VPP
117 }
118
119 // Connect connects to VPP using specified VPP adapter and returns the connection handle.
120 // This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed.
121 func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
122         // create new connection handle
123         c, err := newConnection(vppAdapter)
124         if err != nil {
125                 return nil, err
126         }
127
128         // blocking attempt to connect to VPP
129         err = c.connectVPP()
130         if err != nil {
131                 return nil, err
132         }
133
134         return c, nil
135 }
136
137 // AsyncConnect asynchronously connects to VPP using specified VPP adapter and returns the connection handle
138 // and ConnectionState channel. This call does not block until connection is established, it
139 // returns immediately. The caller is supposed to watch the returned ConnectionState channel for
140 // Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
141 func AsyncConnect(vppAdapter adapter.VppAdapter) (*Connection, chan ConnectionEvent, error) {
142         // create new connection handle
143         c, err := newConnection(vppAdapter)
144         if err != nil {
145                 return nil, nil, err
146         }
147
148         // asynchronously attempt to connect to VPP
149         connChan := make(chan ConnectionEvent, notificationChannelBufSize)
150         go c.connectLoop(connChan)
151
152         return c, connChan, nil
153 }
154
155 // Disconnect disconnects from VPP and releases all connection-related resources.
156 func (c *Connection) Disconnect() {
157         if c == nil {
158                 return
159         }
160
161         connLock.Lock()
162         defer connLock.Unlock()
163
164         if c.vpp != nil {
165                 c.disconnectVPP()
166         }
167         conn = nil
168 }
169
170 // newConnection returns new connection handle.
171 func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) {
172         connLock.Lock()
173         defer connLock.Unlock()
174
175         if conn != nil {
176                 return nil, errors.New("only one connection per process is supported")
177         }
178
179         conn = &Connection{
180                 vpp:                vppAdapter,
181                 codec:              &codec.MsgCodec{},
182                 channels:           make(map[uint16]*channel),
183                 msgIDs:             make(map[string]uint16),
184                 msgMap:             make(map[uint16]api.Message),
185                 notifSubscriptions: make(map[uint16][]*api.NotifSubscription),
186         }
187         conn.vpp.SetMsgCallback(conn.msgCallback)
188
189         return conn, nil
190 }
191
192 // connectVPP performs blocking attempt to connect to VPP.
193 func (c *Connection) connectVPP() error {
194         log.Debug("Connecting to VPP..")
195
196         // blocking connect
197         if err := c.vpp.Connect(); err != nil {
198                 return err
199         }
200
201         log.Debugf("Connected to VPP.")
202
203         if err := c.retrieveMessageIDs(); err != nil {
204                 c.vpp.Disconnect()
205                 return fmt.Errorf("VPP is incompatible: %v", err)
206         }
207
208         // store connected state
209         atomic.StoreUint32(&c.connected, 1)
210
211         return nil
212 }
213
214 func getMsgNameWithCrc(x api.Message) string {
215         return x.GetMessageName() + "_" + x.GetCrcString()
216 }
217
218 // retrieveMessageIDs retrieves IDs for all registered messages and stores them in map
219 func (c *Connection) retrieveMessageIDs() (err error) {
220         t := time.Now()
221
222         var addMsg = func(msgID uint16, msg api.Message) {
223                 c.msgIDs[getMsgNameWithCrc(msg)] = msgID
224                 c.msgMap[msgID] = msg
225         }
226
227         msgs := api.GetAllMessages()
228
229         for name, msg := range msgs {
230                 msgID, err := c.vpp.GetMsgID(msg.GetMessageName(), msg.GetCrcString())
231                 if err != nil {
232                         return err
233                 }
234
235                 addMsg(msgID, msg)
236
237                 if msg.GetMessageName() == msgControlPing.GetMessageName() {
238                         c.pingReqID = msgID
239                         msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
240                 } else if msg.GetMessageName() == msgControlPingReply.GetMessageName() {
241                         c.pingReplyID = msgID
242                         msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
243                 }
244
245                 if debugMsgIDs {
246                         log.Debugf("message %q (%s) has ID: %d", name, getMsgNameWithCrc(msg), msgID)
247                 }
248         }
249
250         log.Debugf("retrieving %d message IDs took %s", len(msgs), time.Since(t))
251
252         // fallback for control ping when vpe package is not imported
253         if c.pingReqID == 0 {
254                 c.pingReqID, err = c.vpp.GetMsgID(msgControlPing.GetMessageName(), msgControlPing.GetCrcString())
255                 if err != nil {
256                         return err
257                 }
258                 addMsg(c.pingReqID, msgControlPing)
259         }
260         if c.pingReplyID == 0 {
261                 c.pingReplyID, err = c.vpp.GetMsgID(msgControlPingReply.GetMessageName(), msgControlPingReply.GetCrcString())
262                 if err != nil {
263                         return err
264                 }
265                 addMsg(c.pingReplyID, msgControlPingReply)
266         }
267
268         return nil
269 }
270
271 // GetMessageID returns message identifier of given API message.
272 func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
273         if c == nil {
274                 return 0, errors.New("nil connection passed in")
275         }
276
277         if msgID, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok {
278                 return msgID, nil
279         }
280
281         return 0, fmt.Errorf("unknown message: %s (%s)", msg.GetMessageName(), msg.GetCrcString())
282 }
283
284 // LookupByID looks up message name and crc by ID.
285 func (c *Connection) LookupByID(msgID uint16) (api.Message, error) {
286         if c == nil {
287                 return nil, errors.New("nil connection passed in")
288         }
289
290         if msg, ok := c.msgMap[msgID]; ok {
291                 return msg, nil
292         }
293
294         return nil, fmt.Errorf("unknown message ID: %d", msgID)
295 }
296
297 // disconnectVPP disconnects from VPP in case it is connected.
298 func (c *Connection) disconnectVPP() {
299         if atomic.CompareAndSwapUint32(&c.connected, 1, 0) {
300                 c.vpp.Disconnect()
301         }
302 }
303
304 // connectLoop attempts to connect to VPP until it succeeds.
305 // Then it continues with healthCheckLoop.
306 func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
307         // loop until connected
308         for {
309                 if err := c.vpp.WaitReady(); err != nil {
310                         log.Warnf("wait ready failed: %v", err)
311                 }
312                 if err := c.connectVPP(); err == nil {
313                         // signal connected event
314                         connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected}
315                         break
316                 } else {
317                         log.Errorf("connecting to VPP failed: %v", err)
318                         time.Sleep(time.Second)
319                 }
320         }
321
322         // we are now connected, continue with health check loop
323         c.healthCheckLoop(connChan)
324 }
325
326 // healthCheckLoop checks whether connection to VPP is alive. In case of disconnect,
327 // it continues with connectLoop and tries to reconnect.
328 func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
329         // create a separate API channel for health check probes
330         ch, err := c.newAPIChannel(1, 1)
331         if err != nil {
332                 log.Error("Failed to create health check API channel, health check will be disabled:", err)
333                 return
334         }
335
336         var (
337                 sinceLastReply time.Duration
338                 failedChecks   int
339         )
340
341         // send health check probes until an error or timeout occurs
342         for {
343                 // sleep until next health check probe period
344                 time.Sleep(healthCheckInterval)
345
346                 if atomic.LoadUint32(&c.connected) == 0 {
347                         // Disconnect has been called in the meantime, return the healthcheck - reconnect loop
348                         log.Debug("Disconnected on request, exiting health check loop.")
349                         return
350                 }
351
352                 // try draining probe replies from previous request before sending next one
353                 select {
354                 case <-ch.replyChan:
355                         log.Debug("drained old probe reply from reply channel")
356                 default:
357                 }
358
359                 // send the control ping request
360                 ch.reqChan <- &vppRequest{msg: msgControlPing}
361
362                 for {
363                         // expect response within timeout period
364                         select {
365                         case vppReply := <-ch.replyChan:
366                                 err = vppReply.err
367
368                         case <-time.After(healthCheckReplyTimeout):
369                                 err = ErrProbeTimeout
370
371                                 // check if time since last reply from any other
372                                 // channel is less than health check reply timeout
373                                 c.lastReplyLock.Lock()
374                                 sinceLastReply = time.Since(c.lastReply)
375                                 c.lastReplyLock.Unlock()
376
377                                 if sinceLastReply < healthCheckReplyTimeout {
378                                         log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply)
379                                         continue
380                                 }
381                         }
382                         break
383                 }
384
385                 if err == ErrProbeTimeout {
386                         failedChecks++
387                         log.Warnf("VPP health check probe timed out after %v (%d. timeout)", healthCheckReplyTimeout, failedChecks)
388                         if failedChecks > healthCheckThreshold {
389                                 // in case of exceeded failed check treshold, assume VPP disconnected
390                                 log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", healthCheckThreshold)
391                                 connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
392                                 break
393                         }
394                 } else if err != nil {
395                         // in case of error, assume VPP disconnected
396                         log.Errorf("VPP health check probe failed: %v", err)
397                         connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: err}
398                         break
399                 } else if failedChecks > 0 {
400                         // in case of success after failed checks, clear failed check counter
401                         failedChecks = 0
402                         log.Infof("VPP health check probe OK")
403                 }
404         }
405
406         // cleanup
407         ch.Close()
408         c.disconnectVPP()
409
410         // we are now disconnected, start connect loop
411         c.connectLoop(connChan)
412 }
413
414 func (c *Connection) NewAPIChannel() (api.Channel, error) {
415         return c.newAPIChannel(requestChannelBufSize, replyChannelBufSize)
416 }
417
418 func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (api.Channel, error) {
419         return c.newAPIChannel(reqChanBufSize, replyChanBufSize)
420 }
421
422 // NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
423 // It allows to specify custom buffer sizes for the request and reply Go channels.
424 func (c *Connection) newAPIChannel(reqChanBufSize, replyChanBufSize int) (*channel, error) {
425         if c == nil {
426                 return nil, errors.New("nil connection passed in")
427         }
428
429         chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
430         ch := &channel{
431                 id:                 chID,
432                 replyTimeout:       defaultReplyTimeout,
433                 msgDecoder:         c.codec,
434                 msgIdentifier:      c,
435                 reqChan:            make(chan *vppRequest, reqChanBufSize),
436                 replyChan:          make(chan *vppReply, replyChanBufSize),
437                 notifSubsChan:      make(chan *subscriptionRequest, reqChanBufSize),
438                 notifSubsReplyChan: make(chan error, replyChanBufSize),
439         }
440
441         // store API channel within the client
442         c.channelsLock.Lock()
443         c.channels[chID] = ch
444         c.channelsLock.Unlock()
445
446         // start watching on the request channel
447         go c.watchRequests(ch)
448
449         return ch, nil
450 }
451
452 // releaseAPIChannel releases API channel that needs to be closed.
453 func (c *Connection) releaseAPIChannel(ch *channel) {
454         log.WithFields(logger.Fields{
455                 "channel": ch.id,
456         }).Debug("API channel released")
457
458         // delete the channel from channels map
459         c.channelsLock.Lock()
460         delete(c.channels, ch.id)
461         c.channelsLock.Unlock()
462 }