Merge "Modify the generator to generate the code that is ignored by golint https...
[govpp.git] / core / core.go
1 // Copyright (c) 2017 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 //go:generate binapi-generator --input-dir=bin_api --output-dir=bin_api
16
17 package core
18
19 import (
20         "errors"
21         "os"
22         "sync"
23         "sync/atomic"
24         "time"
25
26         logger "github.com/Sirupsen/logrus"
27
28         "git.fd.io/govpp.git/adapter"
29         "git.fd.io/govpp.git/api"
30         "git.fd.io/govpp.git/core/bin_api/vpe"
31 )
32
33 const (
34         requestChannelBufSize      = 100 // default size of the request channel buffers
35         replyChannelBufSize        = 100 // default size of the reply channel buffers
36         notificationChannelBufSize = 100 // default size of the notification channel buffers
37 )
38
39 const (
40         healthCheckProbeInterval = time.Second * 1        // default health check probe interval
41         healthCheckReplyTimeout  = time.Millisecond * 100 // timeout for reply to a health check probe
42 )
43
44 // ConnectionState holds the current state of the connection to VPP.
45 type ConnectionState int
46
47 const (
48         // Connected connection state means that the connection to VPP has been successfully established.
49         Connected ConnectionState = iota
50
51         // Disconnected connection state means that the connection to VPP has been lost.
52         Disconnected = iota
53 )
54
55 // ConnectionEvent is a notification about change in the VPP connection state.
56 type ConnectionEvent struct {
57         // Timestamp holds the time when the event has been generated.
58         Timestamp time.Time
59
60         // State holds the new state of the connection to VPP at the time when the event has been generated.
61         State ConnectionState
62 }
63
64 // Connection represents a shared memory connection to VPP via vppAdapter.
65 type Connection struct {
66         vpp       adapter.VppAdapter // VPP adapter
67         connected uint32             // non-zero if the adapter is connected to VPP
68         codec     *MsgCodec          // message codec
69
70         msgIDs     map[string]uint16 // map of message IDs indexed by message name + CRC
71         msgIDsLock sync.RWMutex      // lock for the message IDs map
72
73         channels     map[uint32]*api.Channel // map of all API channels indexed by the channel ID
74         channelsLock sync.RWMutex            // lock for the channels map
75
76         notifSubscriptions     map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID
77         notifSubscriptionsLock sync.RWMutex                        // lock for the subscriptions map
78
79         maxChannelID uint32 // maximum used client ID
80         pingReqID    uint16 // ID if the ControlPing message
81         pingReplyID  uint16 // ID of the ControlPingReply message
82 }
83
84 // channelMetadata contains core-local metadata of an API channel.
85 type channelMetadata struct {
86         id        uint32 // channel ID
87         multipart uint32 // 1 if multipart request is being processed, 0 otherwise
88 }
89
90 var (
91         log      *logger.Logger // global logger
92         conn     *Connection    // global handle to the Connection (used in the message receive callback)
93         connLock sync.RWMutex   // lock for the global connection
94 )
95
96 // init initializes global logger, which logs debug level messages to stdout.
97 func init() {
98         log = logger.New()
99         log.Out = os.Stdout
100         log.Level = logger.DebugLevel
101 }
102
103 // SetLogger sets global logger to provided one.
104 func SetLogger(l *logger.Logger) {
105         log = l
106 }
107
108 // Connect connects to VPP using specified VPP adapter and returns the connection handle.
109 // This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed.
110 func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
111         // create new connection handle
112         c, err := newConnection(vppAdapter)
113         if err != nil {
114                 return nil, err
115         }
116
117         // blocking attempt to connect to VPP
118         err = c.connectVPP()
119         if err != nil {
120                 return nil, err
121         }
122
123         return conn, nil
124 }
125
126 // AsyncConnect asynchronously connects to VPP using specified VPP adapter and returns the connection handle
127 // and ConnectionState channel. This call does not block until connection is established, it
128 // returns immediately. The caller is supposed to watch the returned ConnectionState channel for
129 // Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
130 func AsyncConnect(vppAdapter adapter.VppAdapter) (*Connection, chan ConnectionEvent, error) {
131         // create new connection handle
132         c, err := newConnection(vppAdapter)
133         if err != nil {
134                 return nil, nil, err
135         }
136
137         // asynchronously attempt to connect to VPP
138         connChan := make(chan ConnectionEvent, notificationChannelBufSize)
139         go c.connectLoop(connChan)
140
141         return conn, connChan, nil
142 }
143
144 // Disconnect disconnects from VPP and releases all connection-related resources.
145 func (c *Connection) Disconnect() {
146         if c == nil {
147                 return
148         }
149         connLock.Lock()
150         defer connLock.Unlock()
151
152         if c != nil && c.vpp != nil {
153                 c.disconnectVPP()
154         }
155         conn = nil
156 }
157
158 // newConnection returns new connection handle.
159 func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) {
160         connLock.Lock()
161         defer connLock.Unlock()
162
163         if conn != nil {
164                 return nil, errors.New("only one connection per process is supported")
165         }
166
167         conn = &Connection{vpp: vppAdapter, codec: &MsgCodec{}}
168         conn.channels = make(map[uint32]*api.Channel)
169         conn.msgIDs = make(map[string]uint16)
170         conn.notifSubscriptions = make(map[uint16][]*api.NotifSubscription)
171
172         conn.vpp.SetMsgCallback(msgCallback)
173         return conn, nil
174 }
175
176 // connectVPP performs one blocking attempt to connect to VPP.
177 func (c *Connection) connectVPP() error {
178         log.Debug("Connecting to VPP...")
179
180         // blocking connect
181         err := c.vpp.Connect()
182         if err != nil {
183                 log.Warn(err)
184                 return err
185         }
186
187         // store connected state
188         atomic.StoreUint32(&c.connected, 1)
189
190         // store control ping IDs
191         c.pingReqID, _ = c.GetMessageID(&vpe.ControlPing{})
192         c.pingReplyID, _ = c.GetMessageID(&vpe.ControlPingReply{})
193
194         log.Info("Connected to VPP.")
195         return nil
196 }
197
198 // disconnectVPP disconnects from VPP in case it is connected.
199 func (c *Connection) disconnectVPP() {
200         if atomic.CompareAndSwapUint32(&c.connected, 1, 0) {
201                 c.vpp.Disconnect()
202         }
203 }
204
205 // connectLoop attempts to connect to VPP until it succeeds.
206 // Then it continues with healthCheckLoop.
207 func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
208         // loop until connected
209         for {
210                 err := c.connectVPP()
211                 if err == nil {
212                         // signal connected event
213                         connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected}
214                         break
215                 }
216         }
217
218         // we are now connected, continue with health check loop
219         c.healthCheckLoop(connChan)
220 }
221
222 // healthCheckLoop checks whether connection to VPP is alive. In case of disconnect,
223 // it continues with connectLoop and tries to reconnect.
224 func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
225         // create a separate API channel for health check probes
226         ch, err := conn.NewAPIChannel()
227         if err != nil {
228                 log.Error("Error by creating health check API channel, health check will be disabled:", err)
229                 return
230         }
231
232         // send health check probes until an error occurs
233         for {
234                 // wait for healthCheckProbeInterval
235                 <-time.After(healthCheckProbeInterval)
236
237                 if atomic.LoadUint32(&c.connected) == 0 {
238                         // Disconnect has been called in the meantime, return the healthcheck - reconnect loop
239                         log.Debug("Disconnected on request, exiting health check loop.")
240                         return
241                 }
242
243                 // send the control ping
244                 ch.ReqChan <- &api.VppRequest{Message: &vpe.ControlPing{}}
245
246                 // expect response within timeout period
247                 select {
248                 case vppReply := <-ch.ReplyChan:
249                         err = vppReply.Error
250                 case <-time.After(healthCheckReplyTimeout):
251                         err = errors.New("probe reply not received within the timeout period")
252                 }
253
254                 // in case of error, break & disconnect
255                 if err != nil {
256                         log.Errorf("VPP health check failed: %v", err)
257                         // signal disconnected event via channel
258                         connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
259                         break
260                 }
261         }
262
263         // cleanup
264         ch.Close()
265         c.disconnectVPP()
266
267         // we are now disconnected, start connect loop
268         c.connectLoop(connChan)
269 }
270
271 // NewAPIChannel returns a new API channel for communication with VPP via govpp core.
272 // It uses default buffer sizes for the request and reply Go channels.
273 func (c *Connection) NewAPIChannel() (*api.Channel, error) {
274         if c == nil {
275                 return nil, errors.New("nil connection passed in")
276         }
277         return c.NewAPIChannelBuffered(requestChannelBufSize, replyChannelBufSize)
278 }
279
280 // NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
281 // It allows to specify custom buffer sizes for the request and reply Go channels.
282 func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (*api.Channel, error) {
283         if c == nil {
284                 return nil, errors.New("nil connection passed in")
285         }
286         chID := atomic.AddUint32(&c.maxChannelID, 1)
287         chMeta := &channelMetadata{id: chID}
288
289         ch := api.NewChannelInternal(chMeta)
290         ch.MsgDecoder = c.codec
291         ch.MsgIdentifier = c
292
293         // create the communication channels
294         ch.ReqChan = make(chan *api.VppRequest, reqChanBufSize)
295         ch.ReplyChan = make(chan *api.VppReply, replyChanBufSize)
296         ch.NotifSubsChan = make(chan *api.NotifSubscribeRequest, reqChanBufSize)
297         ch.NotifSubsReplyChan = make(chan error, replyChanBufSize)
298
299         // store API channel within the client
300         c.channelsLock.Lock()
301         c.channels[chID] = ch
302         c.channelsLock.Unlock()
303
304         // start watching on the request channel
305         go c.watchRequests(ch, chMeta)
306
307         return ch, nil
308 }
309
310 // releaseAPIChannel releases API channel that needs to be closed.
311 func (c *Connection) releaseAPIChannel(ch *api.Channel, chMeta *channelMetadata) {
312         log.WithFields(logger.Fields{
313                 "context": chMeta.id,
314         }).Debug("API channel closed.")
315
316         // delete the channel from channels map
317         c.channelsLock.Lock()
318         delete(c.channels, chMeta.id)
319         c.channelsLock.Unlock()
320 }