Generator improvements
[govpp.git] / core / connection.go
1 // Copyright (c) 2017 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 package core
16
17 import (
18         "errors"
19         "fmt"
20         "reflect"
21         "sync"
22         "sync/atomic"
23         "time"
24
25         logger "github.com/sirupsen/logrus"
26
27         "git.fd.io/govpp.git/adapter"
28         "git.fd.io/govpp.git/api"
29         "git.fd.io/govpp.git/codec"
30 )
31
32 var (
33         RequestChanBufSize      = 100 // default size of the request channel buffer
34         ReplyChanBufSize        = 100 // default size of the reply channel buffer
35         NotificationChanBufSize = 100 // default size of the notification channel buffer
36 )
37
38 var (
39         HealthCheckProbeInterval = time.Second * 1        // default health check probe interval
40         HealthCheckReplyTimeout  = time.Millisecond * 100 // timeout for reply to a health check probe
41         HealthCheckThreshold     = 1                      // number of failed health checks until the error is reported
42         DefaultReplyTimeout      = time.Second * 1        // default timeout for replies from VPP
43 )
44
45 // ConnectionState represents the current state of the connection to VPP.
46 type ConnectionState int
47
48 const (
49         // Connected represents state in which the connection has been successfully established.
50         Connected ConnectionState = iota
51
52         // Disconnected represents state in which the connection has been dropped.
53         Disconnected
54 )
55
56 // ConnectionEvent is a notification about change in the VPP connection state.
57 type ConnectionEvent struct {
58         // Timestamp holds the time when the event has been created.
59         Timestamp time.Time
60
61         // State holds the new state of the connection at the time when the event has been created.
62         State ConnectionState
63
64         // Error holds error if any encountered.
65         Error error
66 }
67
68 // Connection represents a shared memory connection to VPP via vppAdapter.
69 type Connection struct {
70         vppClient adapter.VppAPI // VPP binary API client adapter
71
72         vppConnected uint32 // non-zero if the adapter is connected to VPP
73
74         codec  *codec.MsgCodec        // message codec
75         msgIDs map[string]uint16      // map of message IDs indexed by message name + CRC
76         msgMap map[uint16]api.Message // map of messages indexed by message ID
77
78         maxChannelID uint32              // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations)
79         channelsLock sync.RWMutex        // lock for the channels map
80         channels     map[uint16]*Channel // map of all API channels indexed by the channel ID
81
82         subscriptionsLock sync.RWMutex                  // lock for the subscriptions map
83         subscriptions     map[uint16][]*subscriptionCtx // map od all notification subscriptions indexed by message ID
84
85         pingReqID   uint16 // ID if the ControlPing message
86         pingReplyID uint16 // ID of the ControlPingReply message
87
88         lastReplyLock sync.Mutex // lock for the last reply
89         lastReply     time.Time  // time of the last received reply from VPP
90 }
91
92 func newConnection(binapi adapter.VppAPI) *Connection {
93         c := &Connection{
94                 vppClient:     binapi,
95                 codec:         &codec.MsgCodec{},
96                 msgIDs:        make(map[string]uint16),
97                 msgMap:        make(map[uint16]api.Message),
98                 channels:      make(map[uint16]*Channel),
99                 subscriptions: make(map[uint16][]*subscriptionCtx),
100         }
101         binapi.SetMsgCallback(c.msgCallback)
102         return c
103 }
104
105 // Connect connects to VPP using specified VPP adapter and returns the connection handle.
106 // This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed.
107 func Connect(binapi adapter.VppAPI) (*Connection, error) {
108         // create new connection handle
109         c := newConnection(binapi)
110
111         // blocking attempt to connect to VPP
112         if err := c.connectVPP(); err != nil {
113                 return nil, err
114         }
115
116         return c, nil
117 }
118
119 // AsyncConnect asynchronously connects to VPP using specified VPP adapter and returns the connection handle
120 // and ConnectionState channel. This call does not block until connection is established, it
121 // returns immediately. The caller is supposed to watch the returned ConnectionState channel for
122 // Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
123 func AsyncConnect(binapi adapter.VppAPI) (*Connection, chan ConnectionEvent, error) {
124         // create new connection handle
125         c := newConnection(binapi)
126
127         // asynchronously attempt to connect to VPP
128         connChan := make(chan ConnectionEvent, NotificationChanBufSize)
129         go c.connectLoop(connChan)
130
131         return c, connChan, nil
132 }
133
134 // connectVPP performs blocking attempt to connect to VPP.
135 func (c *Connection) connectVPP() error {
136         log.Debug("Connecting to VPP..")
137
138         // blocking connect
139         if err := c.vppClient.Connect(); err != nil {
140                 return err
141         }
142
143         log.Debugf("Connected to VPP.")
144
145         if err := c.retrieveMessageIDs(); err != nil {
146                 c.vppClient.Disconnect()
147                 return fmt.Errorf("VPP is incompatible: %v", err)
148         }
149
150         // store connected state
151         atomic.StoreUint32(&c.vppConnected, 1)
152
153         return nil
154 }
155
156 // Disconnect disconnects from VPP and releases all connection-related resources.
157 func (c *Connection) Disconnect() {
158         if c == nil {
159                 return
160         }
161
162         if c.vppClient != nil {
163                 c.disconnectVPP()
164         }
165 }
166
167 // disconnectVPP disconnects from VPP in case it is connected.
168 func (c *Connection) disconnectVPP() {
169         if atomic.CompareAndSwapUint32(&c.vppConnected, 1, 0) {
170                 c.vppClient.Disconnect()
171         }
172 }
173
174 func (c *Connection) NewAPIChannel() (api.Channel, error) {
175         return c.newAPIChannel(RequestChanBufSize, ReplyChanBufSize)
176 }
177
178 func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (api.Channel, error) {
179         return c.newAPIChannel(reqChanBufSize, replyChanBufSize)
180 }
181
182 // NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
183 // It allows to specify custom buffer sizes for the request and reply Go channels.
184 func (c *Connection) newAPIChannel(reqChanBufSize, replyChanBufSize int) (*Channel, error) {
185         if c == nil {
186                 return nil, errors.New("nil connection passed in")
187         }
188
189         // create new channel
190         chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
191         channel := newChannel(chID, c, c.codec, c, reqChanBufSize, replyChanBufSize)
192
193         // store API channel within the client
194         c.channelsLock.Lock()
195         c.channels[chID] = channel
196         c.channelsLock.Unlock()
197
198         // start watching on the request channel
199         go c.watchRequests(channel)
200
201         return channel, nil
202 }
203
204 // releaseAPIChannel releases API channel that needs to be closed.
205 func (c *Connection) releaseAPIChannel(ch *Channel) {
206         log.WithFields(logger.Fields{
207                 "channel": ch.id,
208         }).Debug("API channel released")
209
210         // delete the channel from channels map
211         c.channelsLock.Lock()
212         delete(c.channels, ch.id)
213         c.channelsLock.Unlock()
214 }
215
216 // GetMessageID returns message identifier of given API message.
217 func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
218         if c == nil {
219                 return 0, errors.New("nil connection passed in")
220         }
221
222         if msgID, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok {
223                 return msgID, nil
224         }
225
226         return 0, fmt.Errorf("unknown message: %s (%s)", msg.GetMessageName(), msg.GetCrcString())
227 }
228
229 // LookupByID looks up message name and crc by ID.
230 func (c *Connection) LookupByID(msgID uint16) (api.Message, error) {
231         if c == nil {
232                 return nil, errors.New("nil connection passed in")
233         }
234
235         if msg, ok := c.msgMap[msgID]; ok {
236                 return msg, nil
237         }
238
239         return nil, fmt.Errorf("unknown message ID: %d", msgID)
240 }
241
242 // retrieveMessageIDs retrieves IDs for all registered messages and stores them in map
243 func (c *Connection) retrieveMessageIDs() (err error) {
244         t := time.Now()
245
246         var addMsg = func(msgID uint16, msg api.Message) {
247                 c.msgIDs[getMsgNameWithCrc(msg)] = msgID
248                 c.msgMap[msgID] = msg
249         }
250
251         msgs := api.GetRegisteredMessages()
252
253         for name, msg := range msgs {
254                 msgID, err := c.vppClient.GetMsgID(msg.GetMessageName(), msg.GetCrcString())
255                 if err != nil {
256                         return err
257                 }
258
259                 addMsg(msgID, msg)
260
261                 if msg.GetMessageName() == msgControlPing.GetMessageName() {
262                         c.pingReqID = msgID
263                         msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
264                 } else if msg.GetMessageName() == msgControlPingReply.GetMessageName() {
265                         c.pingReplyID = msgID
266                         msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
267                 }
268
269                 if debugMsgIDs {
270                         log.Debugf("message %q (%s) has ID: %d", name, getMsgNameWithCrc(msg), msgID)
271                 }
272         }
273
274         log.Debugf("retrieving %d message IDs took %s", len(msgs), time.Since(t))
275
276         // fallback for control ping when vpe package is not imported
277         if c.pingReqID == 0 {
278                 c.pingReqID, err = c.vppClient.GetMsgID(msgControlPing.GetMessageName(), msgControlPing.GetCrcString())
279                 if err != nil {
280                         return err
281                 }
282                 addMsg(c.pingReqID, msgControlPing)
283         }
284         if c.pingReplyID == 0 {
285                 c.pingReplyID, err = c.vppClient.GetMsgID(msgControlPingReply.GetMessageName(), msgControlPingReply.GetCrcString())
286                 if err != nil {
287                         return err
288                 }
289                 addMsg(c.pingReplyID, msgControlPingReply)
290         }
291
292         return nil
293 }
294
295 // connectLoop attempts to connect to VPP until it succeeds.
296 // Then it continues with healthCheckLoop.
297 func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
298         // loop until connected
299         for {
300                 if err := c.vppClient.WaitReady(); err != nil {
301                         log.Warnf("wait ready failed: %v", err)
302                 }
303                 if err := c.connectVPP(); err == nil {
304                         // signal connected event
305                         connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected}
306                         break
307                 } else {
308                         log.Errorf("connecting to VPP failed: %v", err)
309                         time.Sleep(time.Second)
310                 }
311         }
312
313         // we are now connected, continue with health check loop
314         c.healthCheckLoop(connChan)
315 }
316
317 // healthCheckLoop checks whether connection to VPP is alive. In case of disconnect,
318 // it continues with connectLoop and tries to reconnect.
319 func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
320         // create a separate API channel for health check probes
321         ch, err := c.newAPIChannel(1, 1)
322         if err != nil {
323                 log.Error("Failed to create health check API channel, health check will be disabled:", err)
324                 return
325         }
326
327         var (
328                 sinceLastReply time.Duration
329                 failedChecks   int
330         )
331
332         // send health check probes until an error or timeout occurs
333         for {
334                 // sleep until next health check probe period
335                 time.Sleep(HealthCheckProbeInterval)
336
337                 if atomic.LoadUint32(&c.vppConnected) == 0 {
338                         // Disconnect has been called in the meantime, return the healthcheck - reconnect loop
339                         log.Debug("Disconnected on request, exiting health check loop.")
340                         return
341                 }
342
343                 // try draining probe replies from previous request before sending next one
344                 select {
345                 case <-ch.replyChan:
346                         log.Debug("drained old probe reply from reply channel")
347                 default:
348                 }
349
350                 // send the control ping request
351                 ch.reqChan <- &vppRequest{msg: msgControlPing}
352
353                 for {
354                         // expect response within timeout period
355                         select {
356                         case vppReply := <-ch.replyChan:
357                                 err = vppReply.err
358
359                         case <-time.After(HealthCheckReplyTimeout):
360                                 err = ErrProbeTimeout
361
362                                 // check if time since last reply from any other
363                                 // channel is less than health check reply timeout
364                                 c.lastReplyLock.Lock()
365                                 sinceLastReply = time.Since(c.lastReply)
366                                 c.lastReplyLock.Unlock()
367
368                                 if sinceLastReply < HealthCheckReplyTimeout {
369                                         log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply)
370                                         continue
371                                 }
372                         }
373                         break
374                 }
375
376                 if err == ErrProbeTimeout {
377                         failedChecks++
378                         log.Warnf("VPP health check probe timed out after %v (%d. timeout)", HealthCheckReplyTimeout, failedChecks)
379                         if failedChecks > HealthCheckThreshold {
380                                 // in case of exceeded failed check treshold, assume VPP disconnected
381                                 log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", HealthCheckThreshold)
382                                 connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
383                                 break
384                         }
385                 } else if err != nil {
386                         // in case of error, assume VPP disconnected
387                         log.Errorf("VPP health check probe failed: %v", err)
388                         connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: err}
389                         break
390                 } else if failedChecks > 0 {
391                         // in case of success after failed checks, clear failed check counter
392                         failedChecks = 0
393                         log.Infof("VPP health check probe OK")
394                 }
395         }
396
397         // cleanup
398         ch.Close()
399         c.disconnectVPP()
400
401         // we are now disconnected, start connect loop
402         c.connectLoop(connChan)
403 }
404
405 func getMsgNameWithCrc(x api.Message) string {
406         return x.GetMessageName() + "_" + x.GetCrcString()
407 }