Add support for using multiple generated versions
[govpp.git] / core / connection.go
1 // Copyright (c) 2017 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 package core
16
17 import (
18         "errors"
19         "fmt"
20         "reflect"
21         "sync"
22         "sync/atomic"
23         "time"
24
25         logger "github.com/sirupsen/logrus"
26
27         "git.fd.io/govpp.git/adapter"
28         "git.fd.io/govpp.git/api"
29         "git.fd.io/govpp.git/codec"
30 )
31
32 var (
33         RequestChanBufSize      = 100 // default size of the request channel buffer
34         ReplyChanBufSize        = 100 // default size of the reply channel buffer
35         NotificationChanBufSize = 100 // default size of the notification channel buffer
36 )
37
38 var (
39         HealthCheckProbeInterval = time.Second * 1        // default health check probe interval
40         HealthCheckReplyTimeout  = time.Millisecond * 100 // timeout for reply to a health check probe
41         HealthCheckThreshold     = 1                      // number of failed health checks until the error is reported
42         DefaultReplyTimeout      = time.Second * 1        // default timeout for replies from VPP
43         ReconnectInterval        = time.Second * 1        // default interval for reconnect attempts
44         MaxReconnectAttempts     = 10                     // maximum number of reconnect attempts
45 )
46
47 // ConnectionState represents the current state of the connection to VPP.
48 type ConnectionState int
49
50 const (
51         // Connected represents state in which the connection has been successfully established.
52         Connected ConnectionState = iota
53
54         // Disconnected represents state in which the connection has been dropped.
55         Disconnected
56
57         // Failed represents state in which the reconnecting failed after exceeding maximum number of attempts.
58         Failed
59 )
60
61 // ConnectionEvent is a notification about change in the VPP connection state.
62 type ConnectionEvent struct {
63         // Timestamp holds the time when the event has been created.
64         Timestamp time.Time
65
66         // State holds the new state of the connection at the time when the event has been created.
67         State ConnectionState
68
69         // Error holds error if any encountered.
70         Error error
71 }
72
73 // Connection represents a shared memory connection to VPP via vppAdapter.
74 type Connection struct {
75         vppClient adapter.VppAPI // VPP binary API client adapter
76
77         vppConnected uint32 // non-zero if the adapter is connected to VPP
78
79         codec  *codec.MsgCodec        // message codec
80         msgIDs map[string]uint16      // map of message IDs indexed by message name + CRC
81         msgMap map[uint16]api.Message // map of messages indexed by message ID
82
83         maxChannelID uint32              // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations)
84         channelsLock sync.RWMutex        // lock for the channels map
85         channels     map[uint16]*Channel // map of all API channels indexed by the channel ID
86
87         subscriptionsLock sync.RWMutex                  // lock for the subscriptions map
88         subscriptions     map[uint16][]*subscriptionCtx // map od all notification subscriptions indexed by message ID
89
90         pingReqID   uint16 // ID if the ControlPing message
91         pingReplyID uint16 // ID of the ControlPingReply message
92
93         lastReplyLock sync.Mutex // lock for the last reply
94         lastReply     time.Time  // time of the last received reply from VPP
95 }
96
97 func newConnection(binapi adapter.VppAPI) *Connection {
98         c := &Connection{
99                 vppClient:     binapi,
100                 codec:         &codec.MsgCodec{},
101                 msgIDs:        make(map[string]uint16),
102                 msgMap:        make(map[uint16]api.Message),
103                 channels:      make(map[uint16]*Channel),
104                 subscriptions: make(map[uint16][]*subscriptionCtx),
105         }
106         binapi.SetMsgCallback(c.msgCallback)
107         return c
108 }
109
110 // Connect connects to VPP using specified VPP adapter and returns the connection handle.
111 // This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed.
112 func Connect(binapi adapter.VppAPI) (*Connection, error) {
113         // create new connection handle
114         c := newConnection(binapi)
115
116         // blocking attempt to connect to VPP
117         if err := c.connectVPP(); err != nil {
118                 return nil, err
119         }
120
121         return c, nil
122 }
123
124 // AsyncConnect asynchronously connects to VPP using specified VPP adapter and returns the connection handle
125 // and ConnectionState channel. This call does not block until connection is established, it
126 // returns immediately. The caller is supposed to watch the returned ConnectionState channel for
127 // Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
128 func AsyncConnect(binapi adapter.VppAPI) (*Connection, chan ConnectionEvent, error) {
129         // create new connection handle
130         c := newConnection(binapi)
131
132         // asynchronously attempt to connect to VPP
133         connChan := make(chan ConnectionEvent, NotificationChanBufSize)
134         go c.connectLoop(connChan)
135
136         return c, connChan, nil
137 }
138
139 // connectVPP performs blocking attempt to connect to VPP.
140 func (c *Connection) connectVPP() error {
141         log.Debug("Connecting to VPP..")
142
143         // blocking connect
144         if err := c.vppClient.Connect(); err != nil {
145                 return err
146         }
147
148         log.Debugf("Connected to VPP.")
149
150         if err := c.retrieveMessageIDs(); err != nil {
151                 c.vppClient.Disconnect()
152                 return fmt.Errorf("VPP is incompatible: %v", err)
153         }
154
155         // store connected state
156         atomic.StoreUint32(&c.vppConnected, 1)
157
158         return nil
159 }
160
161 // Disconnect disconnects from VPP and releases all connection-related resources.
162 func (c *Connection) Disconnect() {
163         if c == nil {
164                 return
165         }
166
167         if c.vppClient != nil {
168                 c.disconnectVPP()
169         }
170 }
171
172 // disconnectVPP disconnects from VPP in case it is connected.
173 func (c *Connection) disconnectVPP() {
174         if atomic.CompareAndSwapUint32(&c.vppConnected, 1, 0) {
175                 c.vppClient.Disconnect()
176         }
177 }
178
179 func (c *Connection) NewAPIChannel() (api.Channel, error) {
180         return c.newAPIChannel(RequestChanBufSize, ReplyChanBufSize)
181 }
182
183 func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (api.Channel, error) {
184         return c.newAPIChannel(reqChanBufSize, replyChanBufSize)
185 }
186
187 // NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
188 // It allows to specify custom buffer sizes for the request and reply Go channels.
189 func (c *Connection) newAPIChannel(reqChanBufSize, replyChanBufSize int) (*Channel, error) {
190         if c == nil {
191                 return nil, errors.New("nil connection passed in")
192         }
193
194         // create new channel
195         chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
196         channel := newChannel(chID, c, c.codec, c, reqChanBufSize, replyChanBufSize)
197
198         // store API channel within the client
199         c.channelsLock.Lock()
200         c.channels[chID] = channel
201         c.channelsLock.Unlock()
202
203         // start watching on the request channel
204         go c.watchRequests(channel)
205
206         return channel, nil
207 }
208
209 // releaseAPIChannel releases API channel that needs to be closed.
210 func (c *Connection) releaseAPIChannel(ch *Channel) {
211         log.WithFields(logger.Fields{
212                 "channel": ch.id,
213         }).Debug("API channel released")
214
215         // delete the channel from channels map
216         c.channelsLock.Lock()
217         delete(c.channels, ch.id)
218         c.channelsLock.Unlock()
219 }
220
221 // connectLoop attempts to connect to VPP until it succeeds.
222 // Then it continues with healthCheckLoop.
223 func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
224         reconnectAttempts := 0
225
226         // loop until connected
227         for {
228                 if err := c.vppClient.WaitReady(); err != nil {
229                         log.Warnf("wait ready failed: %v", err)
230                 }
231                 if err := c.connectVPP(); err == nil {
232                         // signal connected event
233                         connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected}
234                         break
235                 } else if reconnectAttempts < MaxReconnectAttempts {
236                         reconnectAttempts++
237                         log.Errorf("connecting failed (attempt %d/%d): %v", reconnectAttempts, MaxReconnectAttempts, err)
238                         time.Sleep(ReconnectInterval)
239                 } else {
240                         connChan <- ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err}
241                         return
242                 }
243         }
244
245         // we are now connected, continue with health check loop
246         c.healthCheckLoop(connChan)
247 }
248
249 // healthCheckLoop checks whether connection to VPP is alive. In case of disconnect,
250 // it continues with connectLoop and tries to reconnect.
251 func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
252         // create a separate API channel for health check probes
253         ch, err := c.newAPIChannel(1, 1)
254         if err != nil {
255                 log.Error("Failed to create health check API channel, health check will be disabled:", err)
256                 return
257         }
258
259         var (
260                 sinceLastReply time.Duration
261                 failedChecks   int
262         )
263
264         // send health check probes until an error or timeout occurs
265         for {
266                 // sleep until next health check probe period
267                 time.Sleep(HealthCheckProbeInterval)
268
269                 if atomic.LoadUint32(&c.vppConnected) == 0 {
270                         // Disconnect has been called in the meantime, return the healthcheck - reconnect loop
271                         log.Debug("Disconnected on request, exiting health check loop.")
272                         return
273                 }
274
275                 // try draining probe replies from previous request before sending next one
276                 select {
277                 case <-ch.replyChan:
278                         log.Debug("drained old probe reply from reply channel")
279                 default:
280                 }
281
282                 // send the control ping request
283                 ch.reqChan <- &vppRequest{msg: msgControlPing}
284
285                 for {
286                         // expect response within timeout period
287                         select {
288                         case vppReply := <-ch.replyChan:
289                                 err = vppReply.err
290
291                         case <-time.After(HealthCheckReplyTimeout):
292                                 err = ErrProbeTimeout
293
294                                 // check if time since last reply from any other
295                                 // channel is less than health check reply timeout
296                                 c.lastReplyLock.Lock()
297                                 sinceLastReply = time.Since(c.lastReply)
298                                 c.lastReplyLock.Unlock()
299
300                                 if sinceLastReply < HealthCheckReplyTimeout {
301                                         log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply)
302                                         continue
303                                 }
304                         }
305                         break
306                 }
307
308                 if err == ErrProbeTimeout {
309                         failedChecks++
310                         log.Warnf("VPP health check probe timed out after %v (%d. timeout)", HealthCheckReplyTimeout, failedChecks)
311                         if failedChecks > HealthCheckThreshold {
312                                 // in case of exceeded failed check treshold, assume VPP disconnected
313                                 log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", HealthCheckThreshold)
314                                 connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
315                                 break
316                         }
317                 } else if err != nil {
318                         // in case of error, assume VPP disconnected
319                         log.Errorf("VPP health check probe failed: %v", err)
320                         connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: err}
321                         break
322                 } else if failedChecks > 0 {
323                         // in case of success after failed checks, clear failed check counter
324                         failedChecks = 0
325                         log.Infof("VPP health check probe OK")
326                 }
327         }
328
329         // cleanup
330         ch.Close()
331         c.disconnectVPP()
332
333         // we are now disconnected, start connect loop
334         c.connectLoop(connChan)
335 }
336
337 func getMsgNameWithCrc(x api.Message) string {
338         return x.GetMessageName() + "_" + x.GetCrcString()
339 }
340
341 func getMsgFactory(msg api.Message) func() api.Message {
342         return func() api.Message {
343                 return reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
344         }
345 }
346
347 // GetMessageID returns message identifier of given API message.
348 func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
349         if c == nil {
350                 return 0, errors.New("nil connection passed in")
351         }
352
353         if msgID, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok {
354                 return msgID, nil
355         }
356
357         msgID, err := c.vppClient.GetMsgID(msg.GetMessageName(), msg.GetCrcString())
358         if err != nil {
359                 return 0, err
360         }
361
362         c.msgIDs[getMsgNameWithCrc(msg)] = msgID
363         c.msgMap[msgID] = msg
364
365         return msgID, nil
366 }
367
368 // LookupByID looks up message name and crc by ID.
369 func (c *Connection) LookupByID(msgID uint16) (api.Message, error) {
370         if c == nil {
371                 return nil, errors.New("nil connection passed in")
372         }
373
374         if msg, ok := c.msgMap[msgID]; ok {
375                 return msg, nil
376         }
377
378         return nil, fmt.Errorf("unknown message ID: %d", msgID)
379 }
380
381 // retrieveMessageIDs retrieves IDs for all registered messages and stores them in map
382 func (c *Connection) retrieveMessageIDs() (err error) {
383         t := time.Now()
384
385         msgs := api.GetRegisteredMessages()
386
387         var n int
388         for name, msg := range msgs {
389                 msgID, err := c.GetMessageID(msg)
390                 if err != nil {
391                         log.Debugf("retrieving msgID for %s failed: %v", name, err)
392                         continue
393                 }
394                 n++
395
396                 if c.pingReqID == 0 && msg.GetMessageName() == msgControlPing.GetMessageName() {
397                         c.pingReqID = msgID
398                         msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
399                 } else if c.pingReplyID == 0 && msg.GetMessageName() == msgControlPingReply.GetMessageName() {
400                         c.pingReplyID = msgID
401                         msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
402                 }
403
404                 if debugMsgIDs {
405                         log.Debugf("message %q (%s) has ID: %d", name, getMsgNameWithCrc(msg), msgID)
406                 }
407         }
408         log.Debugf("retrieved %d/%d msgIDs (took %s)", n, len(msgs), time.Since(t))
409
410         return nil
411 }