socketclient: wait for socket to be created
[govpp.git] / core / connection.go
1 // Copyright (c) 2017 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //     http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 package core
16
17 import (
18         "errors"
19         "fmt"
20         "reflect"
21         "sync"
22         "sync/atomic"
23         "time"
24
25         logger "github.com/sirupsen/logrus"
26
27         "git.fd.io/govpp.git/adapter"
28         "git.fd.io/govpp.git/api"
29         "git.fd.io/govpp.git/codec"
30 )
31
32 const (
33         DefaultReconnectInterval    = time.Second // default interval between reconnect attempts
34         DefaultMaxReconnectAttempts = 3           // default maximum number of reconnect attempts
35 )
36
37 var (
38         RequestChanBufSize      = 100 // default size of the request channel buffer
39         ReplyChanBufSize        = 100 // default size of the reply channel buffer
40         NotificationChanBufSize = 100 // default size of the notification channel buffer
41 )
42
43 var (
44         HealthCheckProbeInterval = time.Second            // default health check probe interval
45         HealthCheckReplyTimeout  = time.Millisecond * 100 // timeout for reply to a health check probe
46         HealthCheckThreshold     = 1                      // number of failed health checks until the error is reported
47         DefaultReplyTimeout      = time.Second            // default timeout for replies from VPP
48 )
49
50 // ConnectionState represents the current state of the connection to VPP.
51 type ConnectionState int
52
53 const (
54         // Connected represents state in which the connection has been successfully established.
55         Connected ConnectionState = iota
56
57         // Disconnected represents state in which the connection has been dropped.
58         Disconnected
59
60         // Failed represents state in which the reconnecting failed after exceeding maximum number of attempts.
61         Failed
62 )
63
64 func (s ConnectionState) String() string {
65         switch s {
66         case Connected:
67                 return "Connected"
68         case Disconnected:
69                 return "Disconnected"
70         case Failed:
71                 return "Failed"
72         default:
73                 return fmt.Sprintf("UnknownState(%d)", s)
74         }
75 }
76
77 // ConnectionEvent is a notification about change in the VPP connection state.
78 type ConnectionEvent struct {
79         // Timestamp holds the time when the event has been created.
80         Timestamp time.Time
81
82         // State holds the new state of the connection at the time when the event has been created.
83         State ConnectionState
84
85         // Error holds error if any encountered.
86         Error error
87 }
88
89 // Connection represents a shared memory connection to VPP via vppAdapter.
90 type Connection struct {
91         vppClient adapter.VppAPI // VPP binary API client
92         //statsClient adapter.StatsAPI // VPP stats API client
93
94         maxAttempts int           // interval for reconnect attempts
95         recInterval time.Duration // maximum number of reconnect attempts
96
97         vppConnected uint32 // non-zero if the adapter is connected to VPP
98
99         codec  *codec.MsgCodec        // message codec
100         msgIDs map[string]uint16      // map of message IDs indexed by message name + CRC
101         msgMap map[uint16]api.Message // map of messages indexed by message ID
102
103         maxChannelID uint32              // maximum used channel ID (the real limit is 2^15, 32-bit is used for atomic operations)
104         channelsLock sync.RWMutex        // lock for the channels map
105         channels     map[uint16]*Channel // map of all API channels indexed by the channel ID
106
107         subscriptionsLock sync.RWMutex                  // lock for the subscriptions map
108         subscriptions     map[uint16][]*subscriptionCtx // map od all notification subscriptions indexed by message ID
109
110         pingReqID   uint16 // ID if the ControlPing message
111         pingReplyID uint16 // ID of the ControlPingReply message
112
113         lastReplyLock sync.Mutex // lock for the last reply
114         lastReply     time.Time  // time of the last received reply from VPP
115 }
116
117 func newConnection(binapi adapter.VppAPI, attempts int, interval time.Duration) *Connection {
118         if attempts == 0 {
119                 attempts = DefaultMaxReconnectAttempts
120         }
121         if interval == 0 {
122                 interval = DefaultReconnectInterval
123         }
124
125         c := &Connection{
126                 vppClient:     binapi,
127                 maxAttempts:   attempts,
128                 recInterval:   interval,
129                 codec:         &codec.MsgCodec{},
130                 msgIDs:        make(map[string]uint16),
131                 msgMap:        make(map[uint16]api.Message),
132                 channels:      make(map[uint16]*Channel),
133                 subscriptions: make(map[uint16][]*subscriptionCtx),
134         }
135         binapi.SetMsgCallback(c.msgCallback)
136         return c
137 }
138
139 // Connect connects to VPP API using specified adapter and returns a connection handle.
140 // This call blocks until it is either connected, or an error occurs.
141 // Only one connection attempt will be performed.
142 func Connect(binapi adapter.VppAPI) (*Connection, error) {
143         // create new connection handle
144         c := newConnection(binapi, DefaultMaxReconnectAttempts, DefaultReconnectInterval)
145
146         // blocking attempt to connect to VPP
147         if err := c.connectVPP(); err != nil {
148                 return nil, err
149         }
150
151         return c, nil
152 }
153
154 // AsyncConnect asynchronously connects to VPP using specified VPP adapter and returns the connection handle
155 // and ConnectionState channel. This call does not block until connection is established, it
156 // returns immediately. The caller is supposed to watch the returned ConnectionState channel for
157 // Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
158 func AsyncConnect(binapi adapter.VppAPI, attempts int, interval time.Duration) (*Connection, chan ConnectionEvent, error) {
159         // create new connection handle
160         c := newConnection(binapi, attempts, interval)
161
162         // asynchronously attempt to connect to VPP
163         connChan := make(chan ConnectionEvent, NotificationChanBufSize)
164         go c.connectLoop(connChan)
165
166         return c, connChan, nil
167 }
168
169 // connectVPP performs blocking attempt to connect to VPP.
170 func (c *Connection) connectVPP() error {
171         log.Debug("Connecting to VPP..")
172
173         // blocking connect
174         if err := c.vppClient.Connect(); err != nil {
175                 return err
176         }
177
178         log.Debugf("Connected to VPP.")
179
180         if err := c.retrieveMessageIDs(); err != nil {
181                 c.vppClient.Disconnect()
182                 return fmt.Errorf("VPP is incompatible: %v", err)
183         }
184
185         // store connected state
186         atomic.StoreUint32(&c.vppConnected, 1)
187
188         return nil
189 }
190
191 // Disconnect disconnects from VPP API and releases all connection-related resources.
192 func (c *Connection) Disconnect() {
193         if c == nil {
194                 return
195         }
196
197         if c.vppClient != nil {
198                 c.disconnectVPP()
199         }
200 }
201
202 // disconnectVPP disconnects from VPP in case it is connected.
203 func (c *Connection) disconnectVPP() {
204         if atomic.CompareAndSwapUint32(&c.vppConnected, 1, 0) {
205                 c.vppClient.Disconnect()
206         }
207 }
208
209 func (c *Connection) NewAPIChannel() (api.Channel, error) {
210         return c.newAPIChannel(RequestChanBufSize, ReplyChanBufSize)
211 }
212
213 func (c *Connection) NewAPIChannelBuffered(reqChanBufSize, replyChanBufSize int) (api.Channel, error) {
214         return c.newAPIChannel(reqChanBufSize, replyChanBufSize)
215 }
216
217 // NewAPIChannelBuffered returns a new API channel for communication with VPP via govpp core.
218 // It allows to specify custom buffer sizes for the request and reply Go channels.
219 func (c *Connection) newAPIChannel(reqChanBufSize, replyChanBufSize int) (*Channel, error) {
220         if c == nil {
221                 return nil, errors.New("nil connection passed in")
222         }
223
224         // create new channel
225         chID := uint16(atomic.AddUint32(&c.maxChannelID, 1) & 0x7fff)
226         channel := newChannel(chID, c, c.codec, c, reqChanBufSize, replyChanBufSize)
227
228         // store API channel within the client
229         c.channelsLock.Lock()
230         c.channels[chID] = channel
231         c.channelsLock.Unlock()
232
233         // start watching on the request channel
234         go c.watchRequests(channel)
235
236         return channel, nil
237 }
238
239 // releaseAPIChannel releases API channel that needs to be closed.
240 func (c *Connection) releaseAPIChannel(ch *Channel) {
241         log.WithFields(logger.Fields{
242                 "channel": ch.id,
243         }).Debug("API channel released")
244
245         // delete the channel from channels map
246         c.channelsLock.Lock()
247         delete(c.channels, ch.id)
248         c.channelsLock.Unlock()
249 }
250
251 // connectLoop attempts to connect to VPP until it succeeds.
252 // Then it continues with healthCheckLoop.
253 func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
254         reconnectAttempts := 0
255
256         // loop until connected
257         for {
258                 if err := c.vppClient.WaitReady(); err != nil {
259                         log.Warnf("wait ready failed: %v", err)
260                 }
261                 if err := c.connectVPP(); err == nil {
262                         // signal connected event
263                         connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected}
264                         break
265                 } else if reconnectAttempts < c.maxAttempts {
266                         reconnectAttempts++
267                         log.Errorf("connecting failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err)
268                         time.Sleep(c.recInterval)
269                 } else {
270                         connChan <- ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err}
271                         return
272                 }
273         }
274
275         // we are now connected, continue with health check loop
276         c.healthCheckLoop(connChan)
277 }
278
279 // healthCheckLoop checks whether connection to VPP is alive. In case of disconnect,
280 // it continues with connectLoop and tries to reconnect.
281 func (c *Connection) healthCheckLoop(connChan chan ConnectionEvent) {
282         // create a separate API channel for health check probes
283         ch, err := c.newAPIChannel(1, 1)
284         if err != nil {
285                 log.Error("Failed to create health check API channel, health check will be disabled:", err)
286                 return
287         }
288
289         var (
290                 sinceLastReply time.Duration
291                 failedChecks   int
292         )
293
294         // send health check probes until an error or timeout occurs
295         for {
296                 // sleep until next health check probe period
297                 time.Sleep(HealthCheckProbeInterval)
298
299                 if atomic.LoadUint32(&c.vppConnected) == 0 {
300                         // Disconnect has been called in the meantime, return the healthcheck - reconnect loop
301                         log.Debug("Disconnected on request, exiting health check loop.")
302                         return
303                 }
304
305                 // try draining probe replies from previous request before sending next one
306                 select {
307                 case <-ch.replyChan:
308                         log.Debug("drained old probe reply from reply channel")
309                 default:
310                 }
311
312                 // send the control ping request
313                 ch.reqChan <- &vppRequest{msg: msgControlPing}
314
315                 for {
316                         // expect response within timeout period
317                         select {
318                         case vppReply := <-ch.replyChan:
319                                 err = vppReply.err
320
321                         case <-time.After(HealthCheckReplyTimeout):
322                                 err = ErrProbeTimeout
323
324                                 // check if time since last reply from any other
325                                 // channel is less than health check reply timeout
326                                 c.lastReplyLock.Lock()
327                                 sinceLastReply = time.Since(c.lastReply)
328                                 c.lastReplyLock.Unlock()
329
330                                 if sinceLastReply < HealthCheckReplyTimeout {
331                                         log.Warnf("VPP health check probe timing out, but some request on other channel was received %v ago, continue waiting!", sinceLastReply)
332                                         continue
333                                 }
334                         }
335                         break
336                 }
337
338                 if err == ErrProbeTimeout {
339                         failedChecks++
340                         log.Warnf("VPP health check probe timed out after %v (%d. timeout)", HealthCheckReplyTimeout, failedChecks)
341                         if failedChecks > HealthCheckThreshold {
342                                 // in case of exceeded failed check treshold, assume VPP disconnected
343                                 log.Errorf("VPP health check exceeded treshold for timeouts (>%d), assuming disconnect", HealthCheckThreshold)
344                                 connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected}
345                                 break
346                         }
347                 } else if err != nil {
348                         // in case of error, assume VPP disconnected
349                         log.Errorf("VPP health check probe failed: %v", err)
350                         connChan <- ConnectionEvent{Timestamp: time.Now(), State: Disconnected, Error: err}
351                         break
352                 } else if failedChecks > 0 {
353                         // in case of success after failed checks, clear failed check counter
354                         failedChecks = 0
355                         log.Infof("VPP health check probe OK")
356                 }
357         }
358
359         // cleanup
360         ch.Close()
361         c.disconnectVPP()
362
363         // we are now disconnected, start connect loop
364         c.connectLoop(connChan)
365 }
366
367 func getMsgNameWithCrc(x api.Message) string {
368         return x.GetMessageName() + "_" + x.GetCrcString()
369 }
370
371 func getMsgFactory(msg api.Message) func() api.Message {
372         return func() api.Message {
373                 return reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
374         }
375 }
376
377 // GetMessageID returns message identifier of given API message.
378 func (c *Connection) GetMessageID(msg api.Message) (uint16, error) {
379         if c == nil {
380                 return 0, errors.New("nil connection passed in")
381         }
382
383         if msgID, ok := c.msgIDs[getMsgNameWithCrc(msg)]; ok {
384                 return msgID, nil
385         }
386
387         msgID, err := c.vppClient.GetMsgID(msg.GetMessageName(), msg.GetCrcString())
388         if err != nil {
389                 return 0, err
390         }
391
392         c.msgIDs[getMsgNameWithCrc(msg)] = msgID
393         c.msgMap[msgID] = msg
394
395         return msgID, nil
396 }
397
398 // LookupByID looks up message name and crc by ID.
399 func (c *Connection) LookupByID(msgID uint16) (api.Message, error) {
400         if c == nil {
401                 return nil, errors.New("nil connection passed in")
402         }
403
404         if msg, ok := c.msgMap[msgID]; ok {
405                 return msg, nil
406         }
407
408         return nil, fmt.Errorf("unknown message ID: %d", msgID)
409 }
410
411 // retrieveMessageIDs retrieves IDs for all registered messages and stores them in map
412 func (c *Connection) retrieveMessageIDs() (err error) {
413         t := time.Now()
414
415         msgs := api.GetRegisteredMessages()
416
417         var n int
418         for name, msg := range msgs {
419                 msgID, err := c.GetMessageID(msg)
420                 if err != nil {
421                         log.Debugf("retrieving msgID for %s failed: %v", name, err)
422                         continue
423                 }
424                 n++
425
426                 if c.pingReqID == 0 && msg.GetMessageName() == msgControlPing.GetMessageName() {
427                         c.pingReqID = msgID
428                         msgControlPing = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
429                 } else if c.pingReplyID == 0 && msg.GetMessageName() == msgControlPingReply.GetMessageName() {
430                         c.pingReplyID = msgID
431                         msgControlPingReply = reflect.New(reflect.TypeOf(msg).Elem()).Interface().(api.Message)
432                 }
433
434                 if debugMsgIDs {
435                         log.Debugf("message %q (%s) has ID: %d", name, getMsgNameWithCrc(msg), msgID)
436                 }
437         }
438         log.Debugf("retrieved %d/%d msgIDs (took %s)", n, len(msgs), time.Since(t))
439
440         return nil
441 }