added async connect API, new structure of examples
[govpp.git] / core / core.go
1 // Copyright (c) 2017 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 package core
16
17 import (
18         "errors"
19         "os"
20         "sync"
21         "sync/atomic"
22         "time"
23
24         logger "github.com/Sirupsen/logrus"
25
26         "git.fd.io/govpp.git/adapter"
27         "git.fd.io/govpp.git/api"
28         "git.fd.io/govpp.git/core/bin_api/vpe"
29 )
30
31 const (
32         requestChannelBufSize      = 100 // default size of the request channel buffers
33         replyChannelBufSize        = 100 // default size of the reply channel buffers
34         notificationChannelBufSize = 100 // default size of the notification channel buffers
35 )
36
37 const (
38         healthCheckProbeInterval = time.Second * 1        // default health check probe interval
39         healthCheckReplyTimeout  = time.Millisecond * 100 // timeout for reply to a health check probe
40 )
41
42 // ConnectionState holds the current state of the connection to VPP.
43 type ConnectionState int
44
45 const (
46         // Connected connection state means that the connection to VPP has been successfully established.
47         Connected ConnectionState = iota
48
49         // Disconnected connection state means that the connection to VPP has been lost.
50         Disconnected = iota
51 )
52
53 // ConnectionEvent is a notification about change in the VPP connection state.
54 type ConnectionEvent struct {
55         // Timestamp holds the time when the event has been generated.
56         Timestamp time.Time
57
58         // State holds the new state of the connection to VPP at the time when the event has been generated.
59         State ConnectionState
60 }
61
62 // Connection represents a shared memory connection to VPP via vppAdapter.
63 type Connection struct {
64         vpp       adapter.VppAdapter // VPP adapter
65         connected uint32             // non-zero if the adapter is connected to VPP
66         codec     *MsgCodec          // message codec
67
68         msgIDs     map[string]uint16 // map of message IDs indexed by message name + CRC
69         msgIDsLock sync.RWMutex      // lock for the message IDs map
70
71         channels     map[uint32]*api.Channel // map of all API channels indexed by the channel ID
72         channelsLock sync.RWMutex            // lock for the channels map
73
74         notifSubscriptions     map[uint16][]*api.NotifSubscription // map od all notification subscriptions indexed by message ID
75         notifSubscriptionsLock sync.RWMutex                        // lock for the subscriptions map
76
77         maxChannelID uint32 // maximum used client ID
78         pingReqID    uint16 // ID if the ControlPing message
79         pingReplyID  uint16 // ID of the ControlPingReply message
80 }
81
82 // channelMetadata contains core-local metadata of an API channel.
83 type channelMetadata struct {
84         id        uint32 // channel ID
85         multipart uint32 // 1 if multipart request is being processed, 0 otherwise
86 }
87
88 var (
89         log      *logger.Logger // global logger
90         conn     *Connection    // global handle to the Connection (used in the message receive callback)
91         connLock sync.RWMutex   // lock for the global connection
92 )
93
94 // init initializes global logger, which logs debug level messages to stdout.
95 func init() {
96         log = logger.New()
97         log.Out = os.Stdout
98         log.Level = logger.DebugLevel
99 }
100
101 // SetLogger sets global logger to provided one.
102 func SetLogger(l *logger.Logger) {
103         log = l
104 }
105
106 // Connect connects to VPP using specified VPP adapter and returns the connection handle.
107 // This call blocks until VPP is connected, or an error occurs. Only one connection attempt will be performed.
108 func Connect(vppAdapter adapter.VppAdapter) (*Connection, error) {
109         // create new connection handle
110         c, err := newConnection(vppAdapter)
111         if err != nil {
112                 return nil, err
113         }
114
115         // blocking attempt to connect to VPP
116         err = c.connectVPP()
117         if err != nil {
118                 return nil, err
119         }
120
121         return conn, nil
122 }
123
124 // AsyncConnect asynchronously connects to VPP using specified VPP adapter and returns the connection handle
125 // and ConnectionState channel. This call does not block until connection is established, it
126 // returns immediately. The caller is supposed to watch the returned ConnectionState channel for
127 // Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
128 func AsyncConnect(vppAdapter adapter.VppAdapter) (*Connection, chan ConnectionEvent, error) {
129         // create new connection handle
130         c, err := newConnection(vppAdapter)
131         if err != nil {
132                 return nil, nil, err
133         }
134
135         // asynchronously attempt to connect to VPP
136         connChan := make(chan ConnectionEvent, notificationChannelBufSize)
137         go c.connectLoop(connChan)
138
139         return conn, connChan, nil
140 }
141
142 // Disconnect disconnects from VPP and releases all connection-related resources.
143 func (c *Connection) Disconnect() {
144         if c == nil {
145                 return
146         }
147         connLock.Lock()
148         defer connLock.Unlock()
149
150         if c != nil && c.vpp != nil {
151                 c.disconnectVPP()
152         }
153         conn = nil
154 }
155
156 // newConnection returns new connection handle.
157 func newConnection(vppAdapter adapter.VppAdapter) (*Connection, error) {
158         connLock.Lock()
159         defer connLock.Unlock()
160
161         if conn != nil {
162                 return nil, errors.New("only one connection per process is supported")
163         }
164
165         conn = &Connection{vpp: vppAdapter, codec: &MsgCodec{}}
166         conn.channels = make(map[uint32]*api.Channel)
167         conn.msgIDs = make(map[string]uint16)
168         conn.notifSubscriptions = make(map[uint16][]*api.NotifSubscription)
169
170         conn.vpp.SetMsgCallback(msgCallback)
171         return conn, nil
172 }
173
174 // connectVPP performs one blocking attempt to connect to VPP.
175 func (c *Connection) connectVPP() error {
176         log.Debug("Connecting to VPP...")
177
178         // blocking connect
179         err := c.vpp.Connect()
180         if err != nil {
181                 log.Warn(err)
182                 return err
183         }
184
185         // store connected state
186         atomic.StoreUint32(&c.connected, 1)
187
188         // store control ping IDs
189         c.pingReqID, _ = c.GetMessageID(&vpe.ControlPing{})
190         c.pingReplyID, _ = c.GetMessageID(&vpe.ControlPingReply{})
191
192         log.Info("Connected to VPP.")
193         return nil
194 }
195
196 // disconnectVPP disconnects from VPP in case it is connected.
197 func (c *Connection) disconnectVPP() {
198         if atomic.CompareAndSwapUint32(&c.connected, 1, 0) {
199                 c.vpp.Disconnect()
200         }
201 }
202
203 // connectLoop attempts to connect to VPP until it succeeds.
204 // Then it continues with healthCheckLoop.
205 func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
206         // loop until connected
207         for {
208                 err := c.connectVPP()
209                 if err == nil {
210                         // signal connected event
211                         connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected}
212                         break
213                 }
214         }
215
216         // we are now connected, continue with health check loop
217         c.healthCheckLoop(connChan)
218 }
219
220 // healthCheckLoop checks whether connection to VPP is alive. In case of disconnect,
221 // it continues with connectLoop and tries to reconnect.
222 func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
223         // create a separate API channel for health check probes
224         ch, err := conn.NewAPIChannel()
225         if err != nil {
226                 log.Error("Error by creating health check API channel, health check will be disabled:", err)
227                 return
228         }
229
230         // send health check probes until an error occurs
231         for {
232                 // wait for healthCheckProbeInterval
233                 <-time.After(healthCheckProbeInterval)
234
235                 if atomic.LoadUint32(&c.connected) == 0 {
236                         // Disconnect has been called in the meantime, return the healthcheck - reconnect loop
237                         log.Debug("Disconnected on request, exiting health check loop.")
238                         return
239                 }
240
241                 // send the control ping
242                 ch.ReqChan <- &api.VppRequest{Message: &vpe.ControlPing{}}
243
244                 // expect response within timeout period
245                 select {
246                 case vppReply := <-ch.ReplyChan:
247                         err = vppReply.Error
248                 case <-time.After(healthCheckReplyTimeout):
249                         err = errors.New("probe reply not received within the timeout period")
250                 }
251
252                 // in case of error, break & disconnect
253                 if err != nil {
254                         log.Errorf("VPP health check failed: %v", err)
255                         // signal disconnected event via channel
256                         connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
257                         break
258                 }
259         }
260
261         // cleanup
262         ch.Close()
263         c.disconnectVPP()
264
265         // we are now disconnected, start connect loop
266         c.connectLoop(connChan)
267 }
268
269 // NewAPIChannel returns a new API channel for communication with VPP via govpp core.
270 // It uses default buffer sizes for the request and reply Go channels.
271 func (c *Connection) NewAPIChannel() (*api.Channel, error) {
272         if c == nil {
273                 return nil, errors.New("nil connection passed in")
274         }
275         return c.NewAPIChannelBuffered(requestChannelBufSize, replyChannelBufSize)
276 }
277
278 // NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
279 // It allows to specify custom buffer sizes for the request and reply Go channels.
280 func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (*api.Channel, error) {
281         if c == nil {
282                 return nil, errors.New("nil connection passed in")
283         }
284         chID := atomic.AddUint32(&c.maxChannelID, 1)
285         chMeta := &channelMetadata{id: chID}
286
287         ch := api.NewChannelInternal(chMeta)
288         ch.MsgDecoder = c.codec
289         ch.MsgIdentifier = c
290
291         // create the communication channels
292         ch.ReqChan = make(chan *api.VppRequest, reqChanBufSize)
293         ch.ReplyChan = make(chan *api.VppReply, replyChanBufSize)
294         ch.NotifSubsChan = make(chan *api.NotifSubscribeRequest, reqChanBufSize)
295         ch.NotifSubsReplyChan = make(chan error, replyChanBufSize)
296
297         // store API channel within the client
298         c.channelsLock.Lock()
299         c.channels[chID] = ch
300         c.channelsLock.Unlock()
301
302         // start watching on the request channel
303         go c.watchRequests(ch, chMeta)
304
305         return ch, nil
306 }
307
308 // releaseAPIChannel releases API channel that needs to be closed.
309 func (c *Connection) releaseAPIChannel(ch *api.Channel, chMeta *channelMetadata) {
310         log.WithFields(logger.Fields{
311                 "context": chMeta.id,
312         }).Debug("API channel closed.")
313
314         // delete the channel from channels map
315         c.channelsLock.Lock()
316         delete(c.channels, chMeta.id)
317         c.channelsLock.Unlock()
318 }