Add WaitReady to VppAdapter
[govpp.git] / core / core.go
1 // Copyright (c) 2017 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 //go:generate binapi-generator --input-dir=bin_api --output-dir=bin_api
16
17 package core
18
19 import (
20         "errors"
21         "os"
22         "sync"
23         "sync/atomic"
24         "time"
25
26         logger "github.com/Sirupsen/logrus"
27
28         "git.fd.io/govpp.git/adapter"
29         "git.fd.io/govpp.git/api"
30         "git.fd.io/govpp.git/core/bin_api/vpe"
31 )
32
33 const (
34         requestChannelBufSize      = 100 // default size of the request channel buffers
35         replyChannelBufSize        = 100 // default size of the reply channel buffers
36         notificationChannelBufSize = 100 // default size of the notification channel buffers
37 )
38
39 var (
40         healthCheckProbeInterval = time.Second * 1        // default health check probe interval
41         healthCheckReplyTimeout  = time.Millisecond * 100 // timeout for reply to a health check probe
42         healthCheckThreshold     = 1                      // number of failed healthProbe until the error is reported
43 )
44
45 // ConnectionState holds the current state of the connection to VPP.
46 type ConnectionState int
47
48 const (
49         // Connected connection state means that the connection to VPP has been successfully established.
50         Connected ConnectionState = iota
51
52         // Disconnected connection state means that the connection to VPP has been lost.
53         Disconnected = iota
54 )
55
56 // ConnectionEvent is a notification about change in the VPP connection state.
57 type ConnectionEvent struct {
58         // Timestamp holds the time when the event has been generated.
59         Timestamp time.Time
60
61         // State holds the new state of the connection to VPP at the time when the event has been generated.
62         State ConnectionState
63 }
64
65 // Connection represents a shared memory connection to VPP via vppAdapter.
66 type Connection struct {
67         vpp       adapter.VppAdapter // VPP adapter
68         connected uint32             // non-zero if the adapter is connected to VPP
69         codec     *MsgCodec          // message codec
70
71         msgIDs     map[string]uint16 // map of message IDs indexed by message name + CRC
72         msgIDsLock sync.RWMutex      // lock for the message IDs map
73
74         channels     map[uint32]*api.Channel // map of all API channels indexed by the channel ID
75         channelsLock sync.RWMutex            // lock for the channels map
76
77         notifSubscriptions     map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID
78         notifSubscriptionsLock sync.RWMutex                        // lock for the subscriptions map
79
80         maxChannelID uint32 // maximum used client ID
81         pingReqID    uint16 // ID if the ControlPing message
82         pingReplyID  uint16 // ID of the ControlPingReply message
83 }
84
85 // channelMetadata contains core-local metadata of an API channel.
86 type channelMetadata struct {
87         id        uint32 // channel ID
88         multipart uint32 // 1 if multipart request is being processed, 0 otherwise
89 }
90
91 var (
92         log      *logger.Logger // global logger
93         conn     *Connection    // global handle to the Connection (used in the message receive callback)
94         connLock sync.RWMutex   // lock for the global connection
95 )
96
97 // init initializes global logger, which logs debug level messages to stdout.
98 func init() {
99         log = logger.New()
100         log.Out = os.Stdout
101         log.Level = logger.DebugLevel
102 }
103
104 // SetLogger sets global logger to provided one.
105 func SetLogger(l *logger.Logger) {
106         log = l
107 }
108
109 // SetHealthCheckProbeInterval sets health check probe interval.
110 // Beware: Function is not thread-safe. It is recommended to setup this parameter
111 // before connecting to vpp.
112 func SetHealthCheckProbeInterval(interval time.Duration) {
113         healthCheckProbeInterval = interval
114 }
115
116 // SetHealthCheckReplyTimeout sets timeout for reply to a health check probe.
117 // If reply arrives after the timeout, check is considered as failed.
118 // Beware: Function is not thread-safe. It is recommended to setup this parameter
119 // before connecting to vpp.
120 func SetHealthCheckReplyTimeout(timeout time.Duration) {
121         healthCheckReplyTimeout = timeout
122 }
123
124 // SetHealthCheckThreshold sets the number of failed healthProbe checks until the error is reported.
125 // Beware: Function is not thread-safe. It is recommended to setup this parameter
126 // before connecting to vpp.
127 func SetHealthCheckThreshold(threshold int) {
128         healthCheckThreshold = threshold
129 }
130
131 // Connect connects to VPP using specified VPP adapter and returns the connection handle.
132 // This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed.
133 func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
134         // create new connection handle
135         c, err := newConnection(vppAdapter)
136         if err != nil {
137                 return nil, err
138         }
139
140         // blocking attempt to connect to VPP
141         err = c.connectVPP()
142         if err != nil {
143                 return nil, err
144         }
145
146         return conn, nil
147 }
148
149 // AsyncConnect asynchronously connects to VPP using specified VPP adapter and returns the connection handle
150 // and ConnectionState channel. This call does not block until connection is established, it
151 // returns immediately. The caller is supposed to watch the returned ConnectionState channel for
152 // Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
153 func AsyncConnect(vppAdapter adapter.VppAdapter) (*Connection, chan ConnectionEvent, error) {
154         // create new connection handle
155         c, err := newConnection(vppAdapter)
156         if err != nil {
157                 return nil, nil, err
158         }
159
160         // asynchronously attempt to connect to VPP
161         connChan := make(chan ConnectionEvent, notificationChannelBufSize)
162         go c.connectLoop(connChan)
163
164         return conn, connChan, nil
165 }
166
167 // Disconnect disconnects from VPP and releases all connection-related resources.
168 func (c *Connection) Disconnect() {
169         if c == nil {
170                 return
171         }
172         connLock.Lock()
173         defer connLock.Unlock()
174
175         if c != nil && c.vpp != nil {
176                 c.disconnectVPP()
177         }
178         conn = nil
179 }
180
181 // newConnection returns new connection handle.
182 func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) {
183         connLock.Lock()
184         defer connLock.Unlock()
185
186         if conn != nil {
187                 return nil, errors.New("only one connection per process is supported")
188         }
189
190         conn = &Connection{
191                 vpp:                vppAdapter,
192                 codec:              &MsgCodec{},
193                 channels:           make(map[uint32]*api.Channel),
194                 msgIDs:             make(map[string]uint16),
195                 notifSubscriptions: make(map[uint16][]*api.NotifSubscription),
196         }
197
198         conn.vpp.SetMsgCallback(msgCallback)
199         return conn, nil
200 }
201
202 // connectVPP performs one blocking attempt to connect to VPP.
203 func (c *Connection) connectVPP() error {
204         log.Debug("Connecting to VPP...")
205
206         // blocking connect
207         err := c.vpp.Connect()
208         if err != nil {
209                 log.Warn(err)
210                 return err
211         }
212
213         // store connected state
214         atomic.StoreUint32(&c.connected, 1)
215
216         // store control ping IDs
217         c.pingReqID, _ = c.GetMessageID(&vpe.ControlPing{})
218         c.pingReplyID, _ = c.GetMessageID(&vpe.ControlPingReply{})
219
220         log.Info("Connected to VPP.")
221         return nil
222 }
223
224 // disconnectVPP disconnects from VPP in case it is connected.
225 func (c *Connection) disconnectVPP() {
226         if atomic.CompareAndSwapUint32(&c.connected, 1, 0) {
227                 c.vpp.Disconnect()
228         }
229 }
230
231 // connectLoop attempts to connect to VPP until it succeeds.
232 // Then it continues with healthCheckLoop.
233 func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
234         // loop until connected
235         waitForVpp := c.vpp.WaitReady()
236         for {
237                 waitForVpp()
238                 if err := c.connectVPP(); err == nil {
239                         // signal connected event
240                         connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected}
241                         break
242                 }
243         }
244
245         // we are now connected, continue with health check loop
246         c.healthCheckLoop(connChan)
247 }
248
249 // healthCheckLoop checks whether connection to VPP is alive. In case of disconnect,
250 // it continues with connectLoop and tries to reconnect.
251 func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
252         // create a separate API channel for health check probes
253         ch, err := conn.NewAPIChannel()
254         if err != nil {
255                 log.Error("Error by creating health check API channel, health check will be disabled:", err)
256                 return
257         }
258
259         failedChecks := 0
260         // send health check probes until an error occurs
261         for {
262                 // wait for healthCheckProbeInterval
263                 <-time.After(healthCheckProbeInterval)
264
265                 if atomic.LoadUint32(&c.connected) == 0 {
266                         // Disconnect has been called in the meantime, return the healthcheck - reconnect loop
267                         log.Debug("Disconnected on request, exiting health check loop.")
268                         return
269                 }
270
271                 // send the control ping
272                 ch.ReqChan <- &api.VppRequest{Message: &vpe.ControlPing{}}
273
274                 // expect response within timeout period
275                 select {
276                 case vppReply := <-ch.ReplyChan:
277                         err = vppReply.Error
278                 case <-time.After(healthCheckReplyTimeout):
279                         err = errors.New("probe reply not received within the timeout period")
280                 }
281
282                 if err != nil {
283                         failedChecks++
284                 } else {
285                         failedChecks = 0
286                 }
287
288                 if failedChecks >= healthCheckThreshold {
289                         // in case of error, break & disconnect
290                         log.Errorf("VPP health check failed: %v", err)
291                         // signal disconnected event via channel
292                         connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
293                         break
294                 }
295         }
296
297         // cleanup
298         ch.Close()
299         c.disconnectVPP()
300
301         // we are now disconnected, start connect loop
302         c.connectLoop(connChan)
303 }
304
305 // NewAPIChannel returns a new API channel for communication with VPP via govpp core.
306 // It uses default buffer sizes for the request and reply Go channels.
307 func (c *Connection) NewAPIChannel() (*api.Channel, error) {
308         if c == nil {
309                 return nil, errors.New("nil connection passed in")
310         }
311         return c.NewAPIChannelBuffered(requestChannelBufSize, replyChannelBufSize)
312 }
313
314 // NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
315 // It allows to specify custom buffer sizes for the request and reply Go channels.
316 func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (*api.Channel, error) {
317         if c == nil {
318                 return nil, errors.New("nil connection passed in")
319         }
320         chID := atomic.AddUint32(&c.maxChannelID, 1)
321         chMeta := &channelMetadata{id: chID}
322
323         ch := api.NewChannelInternal(chMeta)
324         ch.MsgDecoder = c.codec
325         ch.MsgIdentifier = c
326
327         // create the communication channels
328         ch.ReqChan = make(chan *api.VppRequest, reqChanBufSize)
329         ch.ReplyChan = make(chan *api.VppReply, replyChanBufSize)
330         ch.NotifSubsChan = make(chan *api.NotifSubscribeRequest, reqChanBufSize)
331         ch.NotifSubsReplyChan = make(chan error, replyChanBufSize)
332
333         // store API channel within the client
334         c.channelsLock.Lock()
335         c.channels[chID] = ch
336         c.channelsLock.Unlock()
337
338         // start watching on the request channel
339         go c.watchRequests(ch, chMeta)
340
341         return ch, nil
342 }
343
344 // releaseAPIChannel releases API channel that needs to be closed.
345 func (c *Connection) releaseAPIChannel(ch *api.Channel, chMeta *channelMetadata) {
346         log.WithFields(logger.Fields{
347                 "context": chMeta.id,
348         }).Debug("API channel closed.")
349
350         // delete the channel from channels map
351         c.channelsLock.Lock()
352         delete(c.channels, chMeta.id)
353         c.channelsLock.Unlock()
354 }