Introduce StatsAPI and it's initial implementation
[govpp.git] / core / connection.go
1 // Copyright (c) 2017 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 package core
16
17 import (
18         "errors"
19         "fmt"
20         "reflect"
21         "sync"
22         "sync/atomic"
23         "time"
24
25         logger "github.com/sirupsen/logrus"
26
27         "git.fd.io/govpp.git/adapter"
28         "git.fd.io/govpp.git/api"
29         "git.fd.io/govpp.git/codec"
30 )
31
32 var (
33         RequestChanBufSize      = 100 // default size of the request channel buffer
34         ReplyChanBufSize        = 100 // default size of the reply channel buffer
35         NotificationChanBufSize = 100 // default size of the notification channel buffer
36 )
37
38 var (
39         HealthCheckProbeInterval = time.Second * 1        // default health check probe interval
40         HealthCheckReplyTimeout  = time.Millisecond * 100 // timeout for reply to a health check probe
41         HealthCheckThreshold     = 1                      // number of failed health checks until the error is reported
42         DefaultReplyTimeout      = time.Second * 1        // default timeout for replies from VPP
43 )
44
45 // ConnectionState represents the current state of the connection to VPP.
46 type ConnectionState int
47
48 const (
49         // Connected represents state in which the connection has been successfully established.
50         Connected ConnectionState = iota
51
52         // Disconnected represents state in which the connection has been dropped.
53         Disconnected
54 )
55
56 // ConnectionEvent is a notification about change in the VPP connection state.
57 type ConnectionEvent struct {
58         // Timestamp holds the time when the event has been created.
59         Timestamp time.Time
60
61         // State holds the new state of the connection at the time when the event has been created.
62         State ConnectionState
63
64         // Error holds error if any encountered.
65         Error error
66 }
67
68 var (
69         connLock sync.RWMutex // lock for the global connection
70         conn     *Connection  // global handle to the Connection (used in the message receive callback)
71 )
72
73 // Connection represents a shared memory connection to VPP via vppAdapter.
74 type Connection struct {
75         vppClient adapter.VppAPI // VPP binary API client adapter
76
77         vppConnected uint32 // non-zero if the adapter is connected to VPP
78
79         codec  *codec.MsgCodec        // message codec
80         msgIDs map[string]uint16      // map of message IDs indexed by message name + CRC
81         msgMap map[uint16]api.Message // map of messages indexed by message ID
82
83         maxChannelID uint32              // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations)
84         channelsLock sync.RWMutex        // lock for the channels map
85         channels     map[uint16]*Channel // map of all API channels indexed by the channel ID
86
87         subscriptionsLock sync.RWMutex                  // lock for the subscriptions map
88         subscriptions     map[uint16][]*subscriptionCtx // map od all notification subscriptions indexed by message ID
89
90         pingReqID   uint16 // ID if the ControlPing message
91         pingReplyID uint16 // ID of the ControlPingReply message
92
93         lastReplyLock sync.Mutex // lock for the last reply
94         lastReply     time.Time  // time of the last received reply from VPP
95 }
96
97 func newConnection(binapi adapter.VppAPI) *Connection {
98         c := &Connection{
99                 vppClient:     binapi,
100                 codec:         &codec.MsgCodec{},
101                 msgIDs:        make(map[string]uint16),
102                 msgMap:        make(map[uint16]api.Message),
103                 channels:      make(map[uint16]*Channel),
104                 subscriptions: make(map[uint16][]*subscriptionCtx),
105         }
106         binapi.SetMsgCallback(c.msgCallback)
107         return c
108 }
109
110 // Connect connects to VPP using specified VPP adapter and returns the connection handle.
111 // This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed.
112 func Connect(binapi adapter.VppAPI) (*Connection, error) {
113         // create new connection handle
114         c, err := createConnection(binapi)
115         if err != nil {
116                 return nil, err
117         }
118
119         // blocking attempt to connect to VPP
120         if err := c.connectVPP(); err != nil {
121                 return nil, err
122         }
123
124         return c, nil
125 }
126
127 // AsyncConnect asynchronously connects to VPP using specified VPP adapter and returns the connection handle
128 // and ConnectionState channel. This call does not block until connection is established, it
129 // returns immediately. The caller is supposed to watch the returned ConnectionState channel for
130 // Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
131 func AsyncConnect(binapi adapter.VppAPI) (*Connection, chan ConnectionEvent, error) {
132         // create new connection handle
133         c, err := createConnection(binapi)
134         if err != nil {
135                 return nil, nil, err
136         }
137
138         // asynchronously attempt to connect to VPP
139         connChan := make(chan ConnectionEvent, NotificationChanBufSize)
140         go c.connectLoop(connChan)
141
142         return c, connChan, nil
143 }
144
145 // Disconnect disconnects from VPP and releases all connection-related resources.
146 func (c *Connection) Disconnect() {
147         if c == nil {
148                 return
149         }
150
151         connLock.Lock()
152         defer connLock.Unlock()
153
154         if c.vppClient != nil {
155                 c.disconnectVPP()
156         }
157         conn = nil
158 }
159
160 // newConnection returns new connection handle.
161 func createConnection(binapi adapter.VppAPI) (*Connection, error) {
162         connLock.Lock()
163         defer connLock.Unlock()
164
165         if conn != nil {
166                 return nil, errors.New("only one connection per process is supported")
167         }
168
169         conn = newConnection(binapi)
170
171         return conn, nil
172 }
173
174 // connectVPP performs blocking attempt to connect to VPP.
175 func (c *Connection) connectVPP() error {
176         log.Debug("Connecting to VPP..")
177
178         // blocking connect
179         if err := c.vppClient.Connect(); err != nil {
180                 return err
181         }
182
183         log.Debugf("Connected to VPP.")
184
185         if err := c.retrieveMessageIDs(); err != nil {
186                 c.vppClient.Disconnect()
187                 return fmt.Errorf("VPP is incompatible: %v", err)
188         }
189
190         // store connected state
191         atomic.StoreUint32(&c.vppConnected, 1)
192
193         return nil
194 }
195
196 func (c *Connection) NewAPIChannel() (api.Channel, error) {
197         return c.newAPIChannel(RequestChanBufSize, ReplyChanBufSize)
198 }
199
200 func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (api.Channel, error) {
201         return c.newAPIChannel(reqChanBufSize, replyChanBufSize)
202 }
203
204 // NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
205 // It allows to specify custom buffer sizes for the request and reply Go channels.
206 func (c *Connection) newAPIChannel(reqChanBufSize, replyChanBufSize int) (*Channel, error) {
207         if c == nil {
208                 return nil, errors.New("nil connection passed in")
209         }
210
211         // create new channel
212         chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
213         channel := newChannel(chID, c, c.codec, c, reqChanBufSize, replyChanBufSize)
214
215         // store API channel within the client
216         c.channelsLock.Lock()
217         c.channels[chID] = channel
218         c.channelsLock.Unlock()
219
220         // start watching on the request channel
221         go c.watchRequests(channel)
222
223         return channel, nil
224 }
225
226 // releaseAPIChannel releases API channel that needs to be closed.
227 func (c *Connection) releaseAPIChannel(ch *Channel) {
228         log.WithFields(logger.Fields{
229                 "channel": ch.id,
230         }).Debug("API channel released")
231
232         // delete the channel from channels map
233         c.channelsLock.Lock()
234         delete(c.channels, ch.id)
235         c.channelsLock.Unlock()
236 }
237
238 // GetMessageID returns message identifier of given API message.
239 func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
240         if c == nil {
241                 return 0, errors.New("nil connection passed in")
242         }
243
244         if msgID, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok {
245                 return msgID, nil
246         }
247
248         return 0, fmt.Errorf("unknown message: %s (%s)", msg.GetMessageName(), msg.GetCrcString())
249 }
250
251 // LookupByID looks up message name and crc by ID.
252 func (c *Connection) LookupByID(msgID uint16) (api.Message, error) {
253         if c == nil {
254                 return nil, errors.New("nil connection passed in")
255         }
256
257         if msg, ok := c.msgMap[msgID]; ok {
258                 return msg, nil
259         }
260
261         return nil, fmt.Errorf("unknown message ID: %d", msgID)
262 }
263
264 // retrieveMessageIDs retrieves IDs for all registered messages and stores them in map
265 func (c *Connection) retrieveMessageIDs() (err error) {
266         t := time.Now()
267
268         var addMsg = func(msgID uint16, msg api.Message) {
269                 c.msgIDs[getMsgNameWithCrc(msg)] = msgID
270                 c.msgMap[msgID] = msg
271         }
272
273         msgs := api.GetAllMessages()
274
275         for name, msg := range msgs {
276                 msgID, err := c.vppClient.GetMsgID(msg.GetMessageName(), msg.GetCrcString())
277                 if err != nil {
278                         return err
279                 }
280
281                 addMsg(msgID, msg)
282
283                 if msg.GetMessageName() == msgControlPing.GetMessageName() {
284                         c.pingReqID = msgID
285                         msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
286                 } else if msg.GetMessageName() == msgControlPingReply.GetMessageName() {
287                         c.pingReplyID = msgID
288                         msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
289                 }
290
291                 if debugMsgIDs {
292                         log.Debugf("message %q (%s) has ID: %d", name, getMsgNameWithCrc(msg), msgID)
293                 }
294         }
295
296         log.Debugf("retrieving %d message IDs took %s", len(msgs), time.Since(t))
297
298         // fallback for control ping when vpe package is not imported
299         if c.pingReqID == 0 {
300                 c.pingReqID, err = c.vppClient.GetMsgID(msgControlPing.GetMessageName(), msgControlPing.GetCrcString())
301                 if err != nil {
302                         return err
303                 }
304                 addMsg(c.pingReqID, msgControlPing)
305         }
306         if c.pingReplyID == 0 {
307                 c.pingReplyID, err = c.vppClient.GetMsgID(msgControlPingReply.GetMessageName(), msgControlPingReply.GetCrcString())
308                 if err != nil {
309                         return err
310                 }
311                 addMsg(c.pingReplyID, msgControlPingReply)
312         }
313
314         return nil
315 }
316
317 // disconnectVPP disconnects from VPP in case it is connected.
318 func (c *Connection) disconnectVPP() {
319         if atomic.CompareAndSwapUint32(&c.vppConnected, 1, 0) {
320                 c.vppClient.Disconnect()
321         }
322 }
323
324 // connectLoop attempts to connect to VPP until it succeeds.
325 // Then it continues with healthCheckLoop.
326 func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
327         // loop until connected
328         for {
329                 if err := c.vppClient.WaitReady(); err != nil {
330                         log.Warnf("wait ready failed: %v", err)
331                 }
332                 if err := c.connectVPP(); err == nil {
333                         // signal connected event
334                         connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected}
335                         break
336                 } else {
337                         log.Errorf("connecting to VPP failed: %v", err)
338                         time.Sleep(time.Second)
339                 }
340         }
341
342         // we are now connected, continue with health check loop
343         c.healthCheckLoop(connChan)
344 }
345
346 // healthCheckLoop checks whether connection to VPP is alive. In case of disconnect,
347 // it continues with connectLoop and tries to reconnect.
348 func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
349         // create a separate API channel for health check probes
350         ch, err := c.newAPIChannel(1, 1)
351         if err != nil {
352                 log.Error("Failed to create health check API channel, health check will be disabled:", err)
353                 return
354         }
355
356         var (
357                 sinceLastReply time.Duration
358                 failedChecks   int
359         )
360
361         // send health check probes until an error or timeout occurs
362         for {
363                 // sleep until next health check probe period
364                 time.Sleep(HealthCheckProbeInterval)
365
366                 if atomic.LoadUint32(&c.vppConnected) == 0 {
367                         // Disconnect has been called in the meantime, return the healthcheck - reconnect loop
368                         log.Debug("Disconnected on request, exiting health check loop.")
369                         return
370                 }
371
372                 // try draining probe replies from previous request before sending next one
373                 select {
374                 case <-ch.replyChan:
375                         log.Debug("drained old probe reply from reply channel")
376                 default:
377                 }
378
379                 // send the control ping request
380                 ch.reqChan <- &vppRequest{msg: msgControlPing}
381
382                 for {
383                         // expect response within timeout period
384                         select {
385                         case vppReply := <-ch.replyChan:
386                                 err = vppReply.err
387
388                         case <-time.After(HealthCheckReplyTimeout):
389                                 err = ErrProbeTimeout
390
391                                 // check if time since last reply from any other
392                                 // channel is less than health check reply timeout
393                                 c.lastReplyLock.Lock()
394                                 sinceLastReply = time.Since(c.lastReply)
395                                 c.lastReplyLock.Unlock()
396
397                                 if sinceLastReply < HealthCheckReplyTimeout {
398                                         log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply)
399                                         continue
400                                 }
401                         }
402                         break
403                 }
404
405                 if err == ErrProbeTimeout {
406                         failedChecks++
407                         log.Warnf("VPP health check probe timed out after %v (%d. timeout)", HealthCheckReplyTimeout, failedChecks)
408                         if failedChecks > HealthCheckThreshold {
409                                 // in case of exceeded failed check treshold, assume VPP disconnected
410                                 log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", HealthCheckThreshold)
411                                 connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
412                                 break
413                         }
414                 } else if err != nil {
415                         // in case of error, assume VPP disconnected
416                         log.Errorf("VPP health check probe failed: %v", err)
417                         connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: err}
418                         break
419                 } else if failedChecks > 0 {
420                         // in case of success after failed checks, clear failed check counter
421                         failedChecks = 0
422                         log.Infof("VPP health check probe OK")
423                 }
424         }
425
426         // cleanup
427         ch.Close()
428         c.disconnectVPP()
429
430         // we are now disconnected, start connect loop
431         c.connectLoop(connChan)
432 }
433
434 func getMsgNameWithCrc(x api.Message) string {
435         return x.GetMessageName() + "_" + x.GetCrcString()
436 }