make api.Channel as interface
[govpp.git] / core / connection.go
1 // Copyright (c) 2017 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 //go:generate binapi-generator --input-dir=bin_api --output-dir=bin_api
16
17 package core
18
19 import (
20         "errors"
21         "os"
22         "sync"
23         "sync/atomic"
24         "time"
25
26         logger "github.com/sirupsen/logrus"
27
28         "git.fd.io/govpp.git/adapter"
29         "git.fd.io/govpp.git/api"
30         "git.fd.io/govpp.git/codec"
31         "git.fd.io/govpp.git/core/bin_api/vpe"
32 )
33
34 var (
35         msgControlPing      api.Message = &vpe.ControlPing{}
36         msgControlPingReply api.Message = &vpe.ControlPingReply{}
37 )
38
39 const (
40         requestChannelBufSize      = 100 // default size of the request channel buffers
41         replyChannelBufSize        = 100 // default size of the reply channel buffers
42         notificationChannelBufSize = 100 // default size of the notification channel buffers
43 )
44
45 var (
46         healthCheckProbeInterval = time.Second * 1        // default health check probe interval
47         healthCheckReplyTimeout  = time.Millisecond * 100 // timeout for reply to a health check probe
48         healthCheckThreshold     = 1                      // number of failed healthProbe until the error is reported
49 )
50
51 // ConnectionState holds the current state of the connection to VPP.
52 type ConnectionState int
53
54 const (
55         // Connected connection state means that the connection to VPP has been successfully established.
56         Connected ConnectionState = iota
57
58         // Disconnected connection state means that the connection to VPP has been lost.
59         Disconnected
60 )
61
62 // ConnectionEvent is a notification about change in the VPP connection state.
63 type ConnectionEvent struct {
64         // Timestamp holds the time when the event has been generated.
65         Timestamp time.Time
66
67         // State holds the new state of the connection to VPP at the time when the event has been generated.
68         State ConnectionState
69 }
70
71 // Connection represents a shared memory connection to VPP via vppAdapter.
72 type Connection struct {
73         vpp       adapter.VppAdapter // VPP adapter
74         connected uint32             // non-zero if the adapter is connected to VPP
75         codec     *codec.MsgCodec    // message codec
76
77         msgIDsLock sync.RWMutex      // lock for the message IDs map
78         msgIDs     map[string]uint16 // map of message IDs indexed by message name + CRC
79
80         channelsLock sync.RWMutex        // lock for the channels map
81         channels     map[uint16]*channel // map of all API channels indexed by the channel ID
82
83         notifSubscriptionsLock sync.RWMutex                        // lock for the subscriptions map
84         notifSubscriptions     map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID
85
86         maxChannelID uint32 // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations)
87         pingReqID    uint16 // ID if the ControlPing message
88         pingReplyID  uint16 // ID of the ControlPingReply message
89
90         lastReplyLock sync.Mutex // lock for the last reply
91         lastReply     time.Time  // time of the last received reply from VPP
92 }
93
94 var (
95         log      *logger.Logger // global logger
96         conn     *Connection    // global handle to the Connection (used in the message receive callback)
97         connLock sync.RWMutex   // lock for the global connection
98 )
99
100 // init initializes global logger, which logs debug level messages to stdout.
101 func init() {
102         log = logger.New()
103         log.Out = os.Stdout
104         log.Level = logger.DebugLevel
105 }
106
107 // SetLogger sets global logger to provided one.
108 func SetLogger(l *logger.Logger) {
109         log = l
110 }
111
112 // SetHealthCheckProbeInterval sets health check probe interval.
113 // Beware: Function is not thread-safe. It is recommended to setup this parameter
114 // before connecting to vpp.
115 func SetHealthCheckProbeInterval(interval time.Duration) {
116         healthCheckProbeInterval = interval
117 }
118
119 // SetHealthCheckReplyTimeout sets timeout for reply to a health check probe.
120 // If reply arrives after the timeout, check is considered as failed.
121 // Beware: Function is not thread-safe. It is recommended to setup this parameter
122 // before connecting to vpp.
123 func SetHealthCheckReplyTimeout(timeout time.Duration) {
124         healthCheckReplyTimeout = timeout
125 }
126
127 // SetHealthCheckThreshold sets the number of failed healthProbe checks until the error is reported.
128 // Beware: Function is not thread-safe. It is recommended to setup this parameter
129 // before connecting to vpp.
130 func SetHealthCheckThreshold(threshold int) {
131         healthCheckThreshold = threshold
132 }
133
134 // SetControlPingMessages sets the messages for ControlPing and ControlPingReply
135 func SetControlPingMessages(controPing, controlPingReply api.Message) {
136         msgControlPing = controPing
137         msgControlPingReply = controlPingReply
138 }
139
140 // Connect connects to VPP using specified VPP adapter and returns the connection handle.
141 // This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed.
142 func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
143         // create new connection handle
144         c, err := newConnection(vppAdapter)
145         if err != nil {
146                 return nil, err
147         }
148
149         // blocking attempt to connect to VPP
150         err = c.connectVPP()
151         if err != nil {
152                 return nil, err
153         }
154
155         return conn, nil
156 }
157
158 // AsyncConnect asynchronously connects to VPP using specified VPP adapter and returns the connection handle
159 // and ConnectionState channel. This call does not block until connection is established, it
160 // returns immediately. The caller is supposed to watch the returned ConnectionState channel for
161 // Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
162 func AsyncConnect(vppAdapter adapter.VppAdapter) (*Connection, chan ConnectionEvent, error) {
163         // create new connection handle
164         c, err := newConnection(vppAdapter)
165         if err != nil {
166                 return nil, nil, err
167         }
168
169         // asynchronously attempt to connect to VPP
170         connChan := make(chan ConnectionEvent, notificationChannelBufSize)
171         go c.connectLoop(connChan)
172
173         return conn, connChan, nil
174 }
175
176 // Disconnect disconnects from VPP and releases all connection-related resources.
177 func (c *Connection) Disconnect() {
178         if c == nil {
179                 return
180         }
181         connLock.Lock()
182         defer connLock.Unlock()
183
184         if c != nil && c.vpp != nil {
185                 c.disconnectVPP()
186         }
187         conn = nil
188 }
189
190 // newConnection returns new connection handle.
191 func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) {
192         connLock.Lock()
193         defer connLock.Unlock()
194
195         if conn != nil {
196                 return nil, errors.New("only one connection per process is supported")
197         }
198
199         conn = &Connection{
200                 vpp:                vppAdapter,
201                 codec:              &codec.MsgCodec{},
202                 channels:           make(map[uint16]*channel),
203                 msgIDs:             make(map[string]uint16),
204                 notifSubscriptions: make(map[uint16][]*api.NotifSubscription),
205         }
206
207         conn.vpp.SetMsgCallback(msgCallback)
208         return conn, nil
209 }
210
211 // connectVPP performs one blocking attempt to connect to VPP.
212 func (c *Connection) connectVPP() error {
213         log.Debug("Connecting to VPP...")
214
215         // blocking connect
216         err := c.vpp.Connect()
217         if err != nil {
218                 log.Warn(err)
219                 return err
220         }
221
222         // store control ping IDs
223         if c.pingReqID, err = c.GetMessageID(msgControlPing); err != nil {
224                 c.vpp.Disconnect()
225                 return err
226         }
227         if c.pingReplyID, err = c.GetMessageID(msgControlPingReply); err != nil {
228                 c.vpp.Disconnect()
229                 return err
230         }
231
232         // store connected state
233         atomic.StoreUint32(&c.connected, 1)
234
235         log.Info("Connected to VPP.")
236         return nil
237 }
238
239 // disconnectVPP disconnects from VPP in case it is connected.
240 func (c *Connection) disconnectVPP() {
241         if atomic.CompareAndSwapUint32(&c.connected, 1, 0) {
242                 c.vpp.Disconnect()
243         }
244 }
245
246 // connectLoop attempts to connect to VPP until it succeeds.
247 // Then it continues with healthCheckLoop.
248 func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
249         // loop until connected
250         for {
251                 if err := c.vpp.WaitReady(); err != nil {
252                         log.Warnf("wait ready failed: %v", err)
253                 }
254                 if err := c.connectVPP(); err == nil {
255                         // signal connected event
256                         connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected}
257                         break
258                 } else {
259                         log.Errorf("connecting to VPP failed: %v", err)
260                         time.Sleep(time.Second)
261                 }
262         }
263
264         // we are now connected, continue with health check loop
265         c.healthCheckLoop(connChan)
266 }
267
268 // healthCheckLoop checks whether connection to VPP is alive. In case of disconnect,
269 // it continues with connectLoop and tries to reconnect.
270 func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
271         // create a separate API channel for health check probes
272         ch, err := conn.newAPIChannelBuffered(1, 1)
273         if err != nil {
274                 log.Error("Failed to create health check API channel, health check will be disabled:", err)
275                 return
276         }
277
278         var sinceLastReply time.Duration
279         var failedChecks int
280
281         // send health check probes until an error or timeout occurs
282         for {
283                 // sleep until next health check probe period
284                 time.Sleep(healthCheckProbeInterval)
285
286                 if atomic.LoadUint32(&c.connected) == 0 {
287                         // Disconnect has been called in the meantime, return the healthcheck - reconnect loop
288                         log.Debug("Disconnected on request, exiting health check loop.")
289                         return
290                 }
291
292                 // try draining probe replies from previous request before sending next one
293                 select {
294                 case <-ch.replyChan:
295                         log.Debug("drained old probe reply from reply channel")
296                 default:
297                 }
298
299                 // send the control ping request
300                 ch.reqChan <- &api.VppRequest{Message: msgControlPing}
301
302                 for {
303                         // expect response within timeout period
304                         select {
305                         case vppReply := <-ch.replyChan:
306                                 err = vppReply.Error
307
308                         case <-time.After(healthCheckReplyTimeout):
309                                 err = ErrProbeTimeout
310
311                                 // check if time since last reply from any other
312                                 // channel is less than health check reply timeout
313                                 conn.lastReplyLock.Lock()
314                                 sinceLastReply = time.Since(c.lastReply)
315                                 conn.lastReplyLock.Unlock()
316
317                                 if sinceLastReply < healthCheckReplyTimeout {
318                                         log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply)
319                                         continue
320                                 }
321                         }
322                         break
323                 }
324
325                 if err == ErrProbeTimeout {
326                         failedChecks++
327                         log.Warnf("VPP health check probe timed out after %v (%d. timeout)", healthCheckReplyTimeout, failedChecks)
328                         if failedChecks > healthCheckThreshold {
329                                 // in case of exceeded treshold disconnect
330                                 log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", healthCheckThreshold)
331                                 connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
332                                 break
333                         }
334                 } else if err != nil {
335                         // in case of error disconnect
336                         log.Errorf("VPP health check probe failed: %v", err)
337                         connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
338                         break
339                 } else if failedChecks > 0 {
340                         failedChecks = 0
341                         log.Infof("VPP health check probe OK")
342                 }
343         }
344
345         // cleanup
346         ch.Close()
347         c.disconnectVPP()
348
349         // we are now disconnected, start connect loop
350         c.connectLoop(connChan)
351 }
352
353 func (c *Connection) NewAPIChannel() (api.Channel, error) {
354         return c.newAPIChannelBuffered(requestChannelBufSize, replyChannelBufSize)
355 }
356
357 func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (api.Channel, error) {
358         return c.newAPIChannelBuffered(reqChanBufSize, replyChanBufSize)
359 }
360
361 // NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
362 // It allows to specify custom buffer sizes for the request and reply Go channels.
363 func (c *Connection) newAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (*channel, error) {
364         if c == nil {
365                 return nil, errors.New("nil connection passed in")
366         }
367
368         chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
369         ch := &channel{
370                 id:           chID,
371                 replyTimeout: defaultReplyTimeout,
372         }
373         ch.msgDecoder = c.codec
374         ch.msgIdentifier = c
375
376         // create the communication channels
377         ch.reqChan = make(chan *api.VppRequest, reqChanBufSize)
378         ch.replyChan = make(chan *api.VppReply, replyChanBufSize)
379         ch.notifSubsChan = make(chan *api.NotifSubscribeRequest, reqChanBufSize)
380         ch.notifSubsReplyChan = make(chan error, replyChanBufSize)
381
382         // store API channel within the client
383         c.channelsLock.Lock()
384         c.channels[chID] = ch
385         c.channelsLock.Unlock()
386
387         // start watching on the request channel
388         go c.watchRequests(ch)
389
390         return ch, nil
391 }
392
393 // releaseAPIChannel releases API channel that needs to be closed.
394 func (c *Connection) releaseAPIChannel(ch *channel) {
395         log.WithFields(logger.Fields{
396                 "ID": ch.id,
397         }).Debug("API channel closed.")
398
399         // delete the channel from channels map
400         c.channelsLock.Lock()
401         delete(c.channels, ch.id)
402         c.channelsLock.Unlock()
403 }