Lookup message name by ID when receiving unexpected message
[govpp.git] / core / core.go
1 // Copyright (c) 2017 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 //go:generate binapi-generator --input-dir=bin_api --output-dir=bin_api
16
17 package core
18
19 import (
20         "errors"
21         "os"
22         "sync"
23         "sync/atomic"
24         "time"
25
26         logger "github.com/sirupsen/logrus"
27
28         "git.fd.io/govpp.git/adapter"
29         "git.fd.io/govpp.git/api"
30         "git.fd.io/govpp.git/core/bin_api/vpe"
31 )
32
33 var (
34         msgControlPing      api.Message = &vpe.ControlPing{}
35         msgControlPingReply api.Message = &vpe.ControlPingReply{}
36 )
37
38 const (
39         requestChannelBufSize      = 100 // default size of the request channel buffers
40         replyChannelBufSize        = 100 // default size of the reply channel buffers
41         notificationChannelBufSize = 100 // default size of the notification channel buffers
42 )
43
44 var (
45         healthCheckProbeInterval = time.Second * 1        // default health check probe interval
46         healthCheckReplyTimeout  = time.Millisecond * 100 // timeout for reply to a health check probe
47         healthCheckThreshold     = 1                      // number of failed healthProbe until the error is reported
48 )
49
50 // ConnectionState holds the current state of the connection to VPP.
51 type ConnectionState int
52
53 const (
54         // Connected connection state means that the connection to VPP has been successfully established.
55         Connected ConnectionState = iota
56
57         // Disconnected connection state means that the connection to VPP has been lost.
58         Disconnected = iota
59 )
60
61 // ConnectionEvent is a notification about change in the VPP connection state.
62 type ConnectionEvent struct {
63         // Timestamp holds the time when the event has been generated.
64         Timestamp time.Time
65
66         // State holds the new state of the connection to VPP at the time when the event has been generated.
67         State ConnectionState
68 }
69
70 // Connection represents a shared memory connection to VPP via vppAdapter.
71 type Connection struct {
72         vpp       adapter.VppAdapter // VPP adapter
73         connected uint32             // non-zero if the adapter is connected to VPP
74         codec     *MsgCodec          // message codec
75
76         msgIDsLock sync.RWMutex      // lock for the message IDs map
77         msgIDs     map[string]uint16 // map of message IDs indexed by message name + CRC
78
79         channelsLock sync.RWMutex            // lock for the channels map
80         channels     map[uint32]*api.Channel // map of all API channels indexed by the channel ID
81
82         notifSubscriptionsLock sync.RWMutex                        // lock for the subscriptions map
83         notifSubscriptions     map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID
84
85         maxChannelID uint32 // maximum used client ID
86         pingReqID    uint16 // ID if the ControlPing message
87         pingReplyID  uint16 // ID of the ControlPingReply message
88 }
89
90 // channelMetadata contains core-local metadata of an API channel.
91 type channelMetadata struct {
92         id        uint32 // channel ID
93         multipart uint32 // 1 if multipart request is being processed, 0 otherwise
94 }
95
96 var (
97         log      *logger.Logger // global logger
98         conn     *Connection    // global handle to the Connection (used in the message receive callback)
99         connLock sync.RWMutex   // lock for the global connection
100 )
101
102 // init initializes global logger, which logs debug level messages to stdout.
103 func init() {
104         log = logger.New()
105         log.Out = os.Stdout
106         log.Level = logger.DebugLevel
107 }
108
109 // SetLogger sets global logger to provided one.
110 func SetLogger(l *logger.Logger) {
111         log = l
112 }
113
114 // SetHealthCheckProbeInterval sets health check probe interval.
115 // Beware: Function is not thread-safe. It is recommended to setup this parameter
116 // before connecting to vpp.
117 func SetHealthCheckProbeInterval(interval time.Duration) {
118         healthCheckProbeInterval = interval
119 }
120
121 // SetHealthCheckReplyTimeout sets timeout for reply to a health check probe.
122 // If reply arrives after the timeout, check is considered as failed.
123 // Beware: Function is not thread-safe. It is recommended to setup this parameter
124 // before connecting to vpp.
125 func SetHealthCheckReplyTimeout(timeout time.Duration) {
126         healthCheckReplyTimeout = timeout
127 }
128
129 // SetHealthCheckThreshold sets the number of failed healthProbe checks until the error is reported.
130 // Beware: Function is not thread-safe. It is recommended to setup this parameter
131 // before connecting to vpp.
132 func SetHealthCheckThreshold(threshold int) {
133         healthCheckThreshold = threshold
134 }
135
136 // SetControlPingMessages sets the messages for ControlPing and ControlPingReply
137 func SetControlPingMessages(controPing, controlPingReply api.Message) {
138         msgControlPing = controPing
139         msgControlPingReply = controlPingReply
140 }
141
142 // Connect connects to VPP using specified VPP adapter and returns the connection handle.
143 // This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed.
144 func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
145         // create new connection handle
146         c, err := newConnection(vppAdapter)
147         if err != nil {
148                 return nil, err
149         }
150
151         // blocking attempt to connect to VPP
152         err = c.connectVPP()
153         if err != nil {
154                 return nil, err
155         }
156
157         return conn, nil
158 }
159
160 // AsyncConnect asynchronously connects to VPP using specified VPP adapter and returns the connection handle
161 // and ConnectionState channel. This call does not block until connection is established, it
162 // returns immediately. The caller is supposed to watch the returned ConnectionState channel for
163 // Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
164 func AsyncConnect(vppAdapter adapter.VppAdapter) (*Connection, chan ConnectionEvent, error) {
165         // create new connection handle
166         c, err := newConnection(vppAdapter)
167         if err != nil {
168                 return nil, nil, err
169         }
170
171         // asynchronously attempt to connect to VPP
172         connChan := make(chan ConnectionEvent, notificationChannelBufSize)
173         go c.connectLoop(connChan)
174
175         return conn, connChan, nil
176 }
177
178 // Disconnect disconnects from VPP and releases all connection-related resources.
179 func (c *Connection) Disconnect() {
180         if c == nil {
181                 return
182         }
183         connLock.Lock()
184         defer connLock.Unlock()
185
186         if c != nil && c.vpp != nil {
187                 c.disconnectVPP()
188         }
189         conn = nil
190 }
191
192 // newConnection returns new connection handle.
193 func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) {
194         connLock.Lock()
195         defer connLock.Unlock()
196
197         if conn != nil {
198                 return nil, errors.New("only one connection per process is supported")
199         }
200
201         conn = &Connection{
202                 vpp:                vppAdapter,
203                 codec:              &MsgCodec{},
204                 channels:           make(map[uint32]*api.Channel),
205                 msgIDs:             make(map[string]uint16),
206                 notifSubscriptions: make(map[uint16][]*api.NotifSubscription),
207         }
208
209         conn.vpp.SetMsgCallback(msgCallback)
210         return conn, nil
211 }
212
213 // connectVPP performs one blocking attempt to connect to VPP.
214 func (c *Connection) connectVPP() error {
215         log.Debug("Connecting to VPP...")
216
217         // blocking connect
218         err := c.vpp.Connect()
219         if err != nil {
220                 log.Warn(err)
221                 return err
222         }
223
224         // store control ping IDs
225         if c.pingReqID, err = c.GetMessageID(msgControlPing); err != nil {
226                 c.vpp.Disconnect()
227                 return err
228         }
229         if c.pingReplyID, err = c.GetMessageID(msgControlPingReply); err != nil {
230                 c.vpp.Disconnect()
231                 return err
232         }
233
234         // store connected state
235         atomic.StoreUint32(&c.connected, 1)
236
237         log.Info("Connected to VPP.")
238         return nil
239 }
240
241 // disconnectVPP disconnects from VPP in case it is connected.
242 func (c *Connection) disconnectVPP() {
243         if atomic.CompareAndSwapUint32(&c.connected, 1, 0) {
244                 c.vpp.Disconnect()
245         }
246 }
247
248 // connectLoop attempts to connect to VPP until it succeeds.
249 // Then it continues with healthCheckLoop.
250 func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
251         // loop until connected
252         for {
253                 if err := c.vpp.WaitReady(); err != nil {
254                         log.Warnf("wait ready failed: %v", err)
255                 }
256                 if err := c.connectVPP(); err == nil {
257                         // signal connected event
258                         connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected}
259                         break
260                 } else {
261                         log.Errorf("connecting to VPP failed: %v", err)
262                         time.Sleep(time.Second)
263                 }
264         }
265
266         // we are now connected, continue with health check loop
267         c.healthCheckLoop(connChan)
268 }
269
270 // healthCheckLoop checks whether connection to VPP is alive. In case of disconnect,
271 // it continues with connectLoop and tries to reconnect.
272 func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
273         // create a separate API channel for health check probes
274         ch, err := conn.NewAPIChannel()
275         if err != nil {
276                 log.Error("Error by creating health check API channel, health check will be disabled:", err)
277                 return
278         }
279
280         failedChecks := 0
281         // send health check probes until an error occurs
282         for {
283                 // wait for healthCheckProbeInterval
284                 <-time.After(healthCheckProbeInterval)
285
286                 if atomic.LoadUint32(&c.connected) == 0 {
287                         // Disconnect has been called in the meantime, return the healthcheck - reconnect loop
288                         log.Debug("Disconnected on request, exiting health check loop.")
289                         return
290                 }
291
292                 // send the control ping
293                 ch.ReqChan <- &api.VppRequest{Message: msgControlPing}
294
295                 // expect response within timeout period
296                 select {
297                 case vppReply := <-ch.ReplyChan:
298                         err = vppReply.Error
299                 case <-time.After(healthCheckReplyTimeout):
300                         err = errors.New("probe reply not received within the timeout period")
301                 }
302
303                 if err != nil {
304                         failedChecks++
305                 } else {
306                         failedChecks = 0
307                 }
308
309                 if failedChecks > healthCheckThreshold {
310                         // in case of error, break & disconnect
311                         log.Errorf("VPP health check failed: %v", err)
312                         // signal disconnected event via channel
313                         connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
314                         break
315                 }
316         }
317
318         // cleanup
319         ch.Close()
320         c.disconnectVPP()
321
322         // we are now disconnected, start connect loop
323         c.connectLoop(connChan)
324 }
325
326 // NewAPIChannel returns a new API channel for communication with VPP via govpp core.
327 // It uses default buffer sizes for the request and reply Go channels.
328 func (c *Connection) NewAPIChannel() (*api.Channel, error) {
329         if c == nil {
330                 return nil, errors.New("nil connection passed in")
331         }
332         return c.NewAPIChannelBuffered(requestChannelBufSize, replyChannelBufSize)
333 }
334
335 // NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
336 // It allows to specify custom buffer sizes for the request and reply Go channels.
337 func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (*api.Channel, error) {
338         if c == nil {
339                 return nil, errors.New("nil connection passed in")
340         }
341         chID := atomic.AddUint32(&c.maxChannelID, 1)
342         chMeta := &channelMetadata{id: chID}
343
344         ch := api.NewChannelInternal(chMeta)
345         ch.MsgDecoder = c.codec
346         ch.MsgIdentifier = c
347
348         // create the communication channels
349         ch.ReqChan = make(chan *api.VppRequest, reqChanBufSize)
350         ch.ReplyChan = make(chan *api.VppReply, replyChanBufSize)
351         ch.NotifSubsChan = make(chan *api.NotifSubscribeRequest, reqChanBufSize)
352         ch.NotifSubsReplyChan = make(chan error, replyChanBufSize)
353
354         // store API channel within the client
355         c.channelsLock.Lock()
356         c.channels[chID] = ch
357         c.channelsLock.Unlock()
358
359         // start watching on the request channel
360         go c.watchRequests(ch, chMeta)
361
362         return ch, nil
363 }
364
365 // releaseAPIChannel releases API channel that needs to be closed.
366 func (c *Connection) releaseAPIChannel(ch *api.Channel, chMeta *channelMetadata) {
367         log.WithFields(logger.Fields{
368                 "context": chMeta.id,
369         }).Debug("API channel closed.")
370
371         // delete the channel from channels map
372         c.channelsLock.Lock()
373         delete(c.channels, chMeta.id)
374         c.channelsLock.Unlock()
375 }