Merge "Add .gitreview"
[govpp.git] / core / connection.go
1 // Copyright (c) 2017 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 package core
16
17 import (
18         "errors"
19         "fmt"
20         "reflect"
21         "sync"
22         "sync/atomic"
23         "time"
24
25         logger "github.com/sirupsen/logrus"
26
27         "git.fd.io/govpp.git/adapter"
28         "git.fd.io/govpp.git/api"
29         "git.fd.io/govpp.git/codec"
30 )
31
32 var (
33         RequestChanBufSize      = 100 // default size of the request channel buffer
34         ReplyChanBufSize        = 100 // default size of the reply channel buffer
35         NotificationChanBufSize = 100 // default size of the notification channel buffer
36 )
37
38 var (
39         HealthCheckProbeInterval = time.Second * 1        // default health check probe interval
40         HealthCheckReplyTimeout  = time.Millisecond * 100 // timeout for reply to a health check probe
41         HealthCheckThreshold     = 1                      // number of failed health checks until the error is reported
42         DefaultReplyTimeout      = time.Second * 1        // default timeout for replies from VPP
43 )
44
45 // ConnectionState represents the current state of the connection to VPP.
46 type ConnectionState int
47
48 const (
49         // Connected represents state in which the connection has been successfully established.
50         Connected ConnectionState = iota
51
52         // Disconnected represents state in which the connection has been dropped.
53         Disconnected
54 )
55
56 // ConnectionEvent is a notification about change in the VPP connection state.
57 type ConnectionEvent struct {
58         // Timestamp holds the time when the event has been created.
59         Timestamp time.Time
60
61         // State holds the new state of the connection at the time when the event has been created.
62         State ConnectionState
63
64         // Error holds error if any encountered.
65         Error error
66 }
67
68 var (
69         connLock sync.RWMutex // lock for the global connection
70         conn     *Connection  // global handle to the Connection (used in the message receive callback)
71 )
72
73 // Connection represents a shared memory connection to VPP via vppAdapter.
74 type Connection struct {
75         vpp       adapter.VppAdapter // VPP adapter
76         connected uint32             // non-zero if the adapter is connected to VPP
77
78         codec  *codec.MsgCodec        // message codec
79         msgIDs map[string]uint16      // map of message IDs indexed by message name + CRC
80         msgMap map[uint16]api.Message // map of messages indexed by message ID
81
82         maxChannelID uint32              // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations)
83         channelsLock sync.RWMutex        // lock for the channels map
84         channels     map[uint16]*Channel // map of all API channels indexed by the channel ID
85
86         subscriptionsLock sync.RWMutex                  // lock for the subscriptions map
87         subscriptions     map[uint16][]*subscriptionCtx // map od all notification subscriptions indexed by message ID
88
89         pingReqID   uint16 // ID if the ControlPing message
90         pingReplyID uint16 // ID of the ControlPingReply message
91
92         lastReplyLock sync.Mutex // lock for the last reply
93         lastReply     time.Time  // time of the last received reply from VPP
94 }
95
96 func newConnection(vpp adapter.VppAdapter) *Connection {
97         c := &Connection{
98                 vpp:           vpp,
99                 codec:         &codec.MsgCodec{},
100                 msgIDs:        make(map[string]uint16),
101                 msgMap:        make(map[uint16]api.Message),
102                 channels:      make(map[uint16]*Channel),
103                 subscriptions: make(map[uint16][]*subscriptionCtx),
104         }
105         vpp.SetMsgCallback(c.msgCallback)
106         return c
107 }
108
109 // Connect connects to VPP using specified VPP adapter and returns the connection handle.
110 // This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed.
111 func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
112         // create new connection handle
113         c, err := createConnection(vppAdapter)
114         if err != nil {
115                 return nil, err
116         }
117
118         // blocking attempt to connect to VPP
119         if err := c.connectVPP(); err != nil {
120                 return nil, err
121         }
122
123         return c, nil
124 }
125
126 // AsyncConnect asynchronously connects to VPP using specified VPP adapter and returns the connection handle
127 // and ConnectionState channel. This call does not block until connection is established, it
128 // returns immediately. The caller is supposed to watch the returned ConnectionState channel for
129 // Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
130 func AsyncConnect(vppAdapter adapter.VppAdapter) (*Connection, chan ConnectionEvent, error) {
131         // create new connection handle
132         c, err := createConnection(vppAdapter)
133         if err != nil {
134                 return nil, nil, err
135         }
136
137         // asynchronously attempt to connect to VPP
138         connChan := make(chan ConnectionEvent, NotificationChanBufSize)
139         go c.connectLoop(connChan)
140
141         return c, connChan, nil
142 }
143
144 // Disconnect disconnects from VPP and releases all connection-related resources.
145 func (c *Connection) Disconnect() {
146         if c == nil {
147                 return
148         }
149
150         connLock.Lock()
151         defer connLock.Unlock()
152
153         if c.vpp != nil {
154                 c.disconnectVPP()
155         }
156         conn = nil
157 }
158
159 // newConnection returns new connection handle.
160 func createConnection(vppAdapter adapter.VppAdapter) (*Connection, error) {
161         connLock.Lock()
162         defer connLock.Unlock()
163
164         if conn != nil {
165                 return nil, errors.New("only one connection per process is supported")
166         }
167
168         conn = newConnection(vppAdapter)
169
170         return conn, nil
171 }
172
173 // connectVPP performs blocking attempt to connect to VPP.
174 func (c *Connection) connectVPP() error {
175         log.Debug("Connecting to VPP..")
176
177         // blocking connect
178         if err := c.vpp.Connect(); err != nil {
179                 return err
180         }
181
182         log.Debugf("Connected to VPP.")
183
184         if err := c.retrieveMessageIDs(); err != nil {
185                 c.vpp.Disconnect()
186                 return fmt.Errorf("VPP is incompatible: %v", err)
187         }
188
189         // store connected state
190         atomic.StoreUint32(&c.connected, 1)
191
192         return nil
193 }
194
195 func (c *Connection) NewAPIChannel() (api.Channel, error) {
196         return c.newAPIChannel(RequestChanBufSize, ReplyChanBufSize)
197 }
198
199 func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (api.Channel, error) {
200         return c.newAPIChannel(reqChanBufSize, replyChanBufSize)
201 }
202
203 // NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
204 // It allows to specify custom buffer sizes for the request and reply Go channels.
205 func (c *Connection) newAPIChannel(reqChanBufSize, replyChanBufSize int) (*Channel, error) {
206         if c == nil {
207                 return nil, errors.New("nil connection passed in")
208         }
209
210         // create new channel
211         chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
212         channel := newChannel(chID, c, c.codec, c, reqChanBufSize, replyChanBufSize)
213
214         // store API channel within the client
215         c.channelsLock.Lock()
216         c.channels[chID] = channel
217         c.channelsLock.Unlock()
218
219         // start watching on the request channel
220         go c.watchRequests(channel)
221
222         return channel, nil
223 }
224
225 // releaseAPIChannel releases API channel that needs to be closed.
226 func (c *Connection) releaseAPIChannel(ch *Channel) {
227         log.WithFields(logger.Fields{
228                 "channel": ch.id,
229         }).Debug("API channel released")
230
231         // delete the channel from channels map
232         c.channelsLock.Lock()
233         delete(c.channels, ch.id)
234         c.channelsLock.Unlock()
235 }
236
237 // GetMessageID returns message identifier of given API message.
238 func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
239         if c == nil {
240                 return 0, errors.New("nil connection passed in")
241         }
242
243         if msgID, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok {
244                 return msgID, nil
245         }
246
247         return 0, fmt.Errorf("unknown message: %s (%s)", msg.GetMessageName(), msg.GetCrcString())
248 }
249
250 // LookupByID looks up message name and crc by ID.
251 func (c *Connection) LookupByID(msgID uint16) (api.Message, error) {
252         if c == nil {
253                 return nil, errors.New("nil connection passed in")
254         }
255
256         if msg, ok := c.msgMap[msgID]; ok {
257                 return msg, nil
258         }
259
260         return nil, fmt.Errorf("unknown message ID: %d", msgID)
261 }
262
263 // retrieveMessageIDs retrieves IDs for all registered messages and stores them in map
264 func (c *Connection) retrieveMessageIDs() (err error) {
265         t := time.Now()
266
267         var addMsg = func(msgID uint16, msg api.Message) {
268                 c.msgIDs[getMsgNameWithCrc(msg)] = msgID
269                 c.msgMap[msgID] = msg
270         }
271
272         msgs := api.GetAllMessages()
273
274         for name, msg := range msgs {
275                 msgID, err := c.vpp.GetMsgID(msg.GetMessageName(), msg.GetCrcString())
276                 if err != nil {
277                         return err
278                 }
279
280                 addMsg(msgID, msg)
281
282                 if msg.GetMessageName() == msgControlPing.GetMessageName() {
283                         c.pingReqID = msgID
284                         msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
285                 } else if msg.GetMessageName() == msgControlPingReply.GetMessageName() {
286                         c.pingReplyID = msgID
287                         msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
288                 }
289
290                 if debugMsgIDs {
291                         log.Debugf("message %q (%s) has ID: %d", name, getMsgNameWithCrc(msg), msgID)
292                 }
293         }
294
295         log.Debugf("retrieving %d message IDs took %s", len(msgs), time.Since(t))
296
297         // fallback for control ping when vpe package is not imported
298         if c.pingReqID == 0 {
299                 c.pingReqID, err = c.vpp.GetMsgID(msgControlPing.GetMessageName(), msgControlPing.GetCrcString())
300                 if err != nil {
301                         return err
302                 }
303                 addMsg(c.pingReqID, msgControlPing)
304         }
305         if c.pingReplyID == 0 {
306                 c.pingReplyID, err = c.vpp.GetMsgID(msgControlPingReply.GetMessageName(), msgControlPingReply.GetCrcString())
307                 if err != nil {
308                         return err
309                 }
310                 addMsg(c.pingReplyID, msgControlPingReply)
311         }
312
313         return nil
314 }
315
316 // disconnectVPP disconnects from VPP in case it is connected.
317 func (c *Connection) disconnectVPP() {
318         if atomic.CompareAndSwapUint32(&c.connected, 1, 0) {
319                 c.vpp.Disconnect()
320         }
321 }
322
323 // connectLoop attempts to connect to VPP until it succeeds.
324 // Then it continues with healthCheckLoop.
325 func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
326         // loop until connected
327         for {
328                 if err := c.vpp.WaitReady(); err != nil {
329                         log.Warnf("wait ready failed: %v", err)
330                 }
331                 if err := c.connectVPP(); err == nil {
332                         // signal connected event
333                         connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected}
334                         break
335                 } else {
336                         log.Errorf("connecting to VPP failed: %v", err)
337                         time.Sleep(time.Second)
338                 }
339         }
340
341         // we are now connected, continue with health check loop
342         c.healthCheckLoop(connChan)
343 }
344
345 // healthCheckLoop checks whether connection to VPP is alive. In case of disconnect,
346 // it continues with connectLoop and tries to reconnect.
347 func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
348         // create a separate API channel for health check probes
349         ch, err := c.newAPIChannel(1, 1)
350         if err != nil {
351                 log.Error("Failed to create health check API channel, health check will be disabled:", err)
352                 return
353         }
354
355         var (
356                 sinceLastReply time.Duration
357                 failedChecks   int
358         )
359
360         // send health check probes until an error or timeout occurs
361         for {
362                 // sleep until next health check probe period
363                 time.Sleep(HealthCheckProbeInterval)
364
365                 if atomic.LoadUint32(&c.connected) == 0 {
366                         // Disconnect has been called in the meantime, return the healthcheck - reconnect loop
367                         log.Debug("Disconnected on request, exiting health check loop.")
368                         return
369                 }
370
371                 // try draining probe replies from previous request before sending next one
372                 select {
373                 case <-ch.replyChan:
374                         log.Debug("drained old probe reply from reply channel")
375                 default:
376                 }
377
378                 // send the control ping request
379                 ch.reqChan <- &vppRequest{msg: msgControlPing}
380
381                 for {
382                         // expect response within timeout period
383                         select {
384                         case vppReply := <-ch.replyChan:
385                                 err = vppReply.err
386
387                         case <-time.After(HealthCheckReplyTimeout):
388                                 err = ErrProbeTimeout
389
390                                 // check if time since last reply from any other
391                                 // channel is less than health check reply timeout
392                                 c.lastReplyLock.Lock()
393                                 sinceLastReply = time.Since(c.lastReply)
394                                 c.lastReplyLock.Unlock()
395
396                                 if sinceLastReply < HealthCheckReplyTimeout {
397                                         log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply)
398                                         continue
399                                 }
400                         }
401                         break
402                 }
403
404                 if err == ErrProbeTimeout {
405                         failedChecks++
406                         log.Warnf("VPP health check probe timed out after %v (%d. timeout)", HealthCheckReplyTimeout, failedChecks)
407                         if failedChecks > HealthCheckThreshold {
408                                 // in case of exceeded failed check treshold, assume VPP disconnected
409                                 log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", HealthCheckThreshold)
410                                 connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
411                                 break
412                         }
413                 } else if err != nil {
414                         // in case of error, assume VPP disconnected
415                         log.Errorf("VPP health check probe failed: %v", err)
416                         connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: err}
417                         break
418                 } else if failedChecks > 0 {
419                         // in case of success after failed checks, clear failed check counter
420                         failedChecks = 0
421                         log.Infof("VPP health check probe OK")
422                 }
423         }
424
425         // cleanup
426         ch.Close()
427         c.disconnectVPP()
428
429         // we are now disconnected, start connect loop
430         c.connectLoop(connChan)
431 }
432
433 func getMsgNameWithCrc(x api.Message) string {
434         return x.GetMessageName() + "_" + x.GetCrcString()
435 }