socketclient: wait for socket to be created 81/18681/1
authorVladimir Lavor <vlavor@cisco.com>
Thu, 4 Apr 2019 11:37:53 +0000 (13:37 +0200)
committerVladimir Lavor <vlavor@cisco.com>
Thu, 4 Apr 2019 11:44:42 +0000 (13:44 +0200)
domain sockets are checked/watched for existence in same
manner as shm prefixes
also the connection attempts and timeout in between can
be configured for async-connect

Change-Id: I084a3efaefea10d106866968deab90d3fda77cfe
Signed-off-by: Vladimir Lavor <vlavor@cisco.com>
adapter/socketclient/socketclient.go
core/connection.go
core/connection_test.go

index eec4fd0..f281955 100644 (file)
@@ -4,9 +4,11 @@ import (
        "bufio"
        "bytes"
        "fmt"
+       "github.com/fsnotify/fsnotify"
        "io"
        "net"
        "os"
+       "path/filepath"
        "strings"
        "sync"
        "time"
@@ -72,9 +74,39 @@ func nilCallback(msgID uint16, data []byte) {
        Log.Warnf("no callback set, dropping message: ID=%v len=%d", msgID, len(data))
 }
 
-func (*vppClient) WaitReady() error {
-       // TODO: add watcher for socket file?
-       return nil
+// WaitReady checks socket file existence and waits for it if necessary
+func (c *vppClient) WaitReady() error {
+       // verify file existence
+       if _, err := os.Stat(c.sockAddr); err == nil {
+               return nil
+       } else if os.IsExist(err) {
+               return err
+       }
+
+       // if not, watch for it
+       watcher, err := fsnotify.NewWatcher()
+       if err != nil {
+               return err
+       }
+       defer func() {
+               if err := watcher.Close(); err != nil {
+                       Log.Errorf("failed to close file watcher: %v", err)
+               }
+       }()
+       path := filepath.Dir(c.sockAddr)
+       if err := watcher.Add(path); err != nil {
+               return err
+       }
+
+       for {
+               ev := <-watcher.Events
+               if ev.Name == path {
+                       if (ev.Op & fsnotify.Create) == fsnotify.Create {
+                               // socket ready
+                               return nil
+                       }
+               }
+       }
 }
 
 func (c *vppClient) SetMsgCallback(cb adapter.MsgCallback) {
index a21cc28..67c7e1d 100644 (file)
@@ -29,6 +29,11 @@ import (
        "git.fd.io/govpp.git/codec"
 )
 
+const (
+       DefaultReconnectInterval    = time.Second // default interval between reconnect attempts
+       DefaultMaxReconnectAttempts = 3           // default maximum number of reconnect attempts
+)
+
 var (
        RequestChanBufSize      = 100 // default size of the request channel buffer
        ReplyChanBufSize        = 100 // default size of the reply channel buffer
@@ -36,12 +41,10 @@ var (
 )
 
 var (
-       HealthCheckProbeInterval = time.Second * 1        // default health check probe interval
+       HealthCheckProbeInterval = time.Second            // default health check probe interval
        HealthCheckReplyTimeout  = time.Millisecond * 100 // timeout for reply to a health check probe
        HealthCheckThreshold     = 1                      // number of failed health checks until the error is reported
-       DefaultReplyTimeout      = time.Second * 1        // default timeout for replies from VPP
-       ReconnectInterval        = time.Second * 1        // default interval for reconnect attempts
-       MaxReconnectAttempts     = 3                      // maximum number of reconnect attempts
+       DefaultReplyTimeout      = time.Second            // default timeout for replies from VPP
 )
 
 // ConnectionState represents the current state of the connection to VPP.
@@ -88,6 +91,9 @@ type Connection struct {
        vppClient adapter.VppAPI // VPP binary API client
        //statsClient adapter.StatsAPI // VPP stats API client
 
+       maxAttempts int           // interval for reconnect attempts
+       recInterval time.Duration // maximum number of reconnect attempts
+
        vppConnected uint32 // non-zero if the adapter is connected to VPP
 
        codec  *codec.MsgCodec        // message codec
@@ -108,9 +114,18 @@ type Connection struct {
        lastReply     time.Time  // time of the last received reply from VPP
 }
 
-func newConnection(binapi adapter.VppAPI) *Connection {
+func newConnection(binapi adapter.VppAPI, attempts int, interval time.Duration) *Connection {
+       if attempts == 0 {
+               attempts = DefaultMaxReconnectAttempts
+       }
+       if interval == 0 {
+               interval = DefaultReconnectInterval
+       }
+
        c := &Connection{
                vppClient:     binapi,
+               maxAttempts:   attempts,
+               recInterval:   interval,
                codec:         &codec.MsgCodec{},
                msgIDs:        make(map[string]uint16),
                msgMap:        make(map[uint16]api.Message),
@@ -126,7 +141,7 @@ func newConnection(binapi adapter.VppAPI) *Connection {
 // Only one connection attempt will be performed.
 func Connect(binapi adapter.VppAPI) (*Connection, error) {
        // create new connection handle
-       c := newConnection(binapi)
+       c := newConnection(binapi, DefaultMaxReconnectAttempts, DefaultReconnectInterval)
 
        // blocking attempt to connect to VPP
        if err := c.connectVPP(); err != nil {
@@ -140,9 +155,9 @@ func Connect(binapi adapter.VppAPI) (*Connection, error) {
 // and ConnectionState channel. This call does not block until connection is established, it
 // returns immediately. The caller is supposed to watch the returned ConnectionState channel for
 // Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
-func AsyncConnect(binapi adapter.VppAPI) (*Connection, chan ConnectionEvent, error) {
+func AsyncConnect(binapi adapter.VppAPI, attempts int, interval time.Duration) (*Connection, chan ConnectionEvent, error) {
        // create new connection handle
-       c := newConnection(binapi)
+       c := newConnection(binapi, attempts, interval)
 
        // asynchronously attempt to connect to VPP
        connChan := make(chan ConnectionEvent, NotificationChanBufSize)
@@ -247,10 +262,10 @@ func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
                        // signal connected event
                        connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected}
                        break
-               } else if reconnectAttempts < MaxReconnectAttempts {
+               } else if reconnectAttempts < c.maxAttempts {
                        reconnectAttempts++
-                       log.Errorf("connecting failed (attempt %d/%d): %v", reconnectAttempts, MaxReconnectAttempts, err)
-                       time.Sleep(ReconnectInterval)
+                       log.Errorf("connecting failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err)
+                       time.Sleep(c.recInterval)
                } else {
                        connChan <- ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err}
                        return
index 929f468..843c5ea 100644 (file)
@@ -78,7 +78,7 @@ func TestAsyncConnection(t *testing.T) {
        defer ctx.teardownTest()
 
        ctx.conn.Disconnect()
-       conn, statusChan, err := core.AsyncConnect(ctx.mockVpp)
+       conn, statusChan, err := core.AsyncConnect(ctx.mockVpp, core.DefaultMaxReconnectAttempts, core.DefaultReconnectInterval)
        ctx.conn = conn
 
        Expect(err).ShouldNot(HaveOccurred())