From 6fe52d72255456e7d73df9d2f6b4a8f724ed447d Mon Sep 17 00:00:00 2001 From: Vladimir Lavor Date: Thu, 4 Apr 2019 13:37:53 +0200 Subject: [PATCH] socketclient: wait for socket to be created domain sockets are checked/watched for existence in same manner as shm prefixes also the connection attempts and timeout in between can be configured for async-connect Change-Id: I084a3efaefea10d106866968deab90d3fda77cfe Signed-off-by: Vladimir Lavor --- adapter/socketclient/socketclient.go | 38 +++++++++++++++++++++++++++++++++--- core/connection.go | 37 ++++++++++++++++++++++++----------- core/connection_test.go | 2 +- 3 files changed, 62 insertions(+), 15 deletions(-) diff --git a/adapter/socketclient/socketclient.go b/adapter/socketclient/socketclient.go index eec4fd0..f281955 100644 --- a/adapter/socketclient/socketclient.go +++ b/adapter/socketclient/socketclient.go @@ -4,9 +4,11 @@ import ( "bufio" "bytes" "fmt" + "github.com/fsnotify/fsnotify" "io" "net" "os" + "path/filepath" "strings" "sync" "time" @@ -72,9 +74,39 @@ func nilCallback(msgID uint16, data []byte) { Log.Warnf("no callback set, dropping message: ID=%v len=%d", msgID, len(data)) } -func (*vppClient) WaitReady() error { - // TODO: add watcher for socket file? - return nil +// WaitReady checks socket file existence and waits for it if necessary +func (c *vppClient) WaitReady() error { + // verify file existence + if _, err := os.Stat(c.sockAddr); err == nil { + return nil + } else if os.IsExist(err) { + return err + } + + // if not, watch for it + watcher, err := fsnotify.NewWatcher() + if err != nil { + return err + } + defer func() { + if err := watcher.Close(); err != nil { + Log.Errorf("failed to close file watcher: %v", err) + } + }() + path := filepath.Dir(c.sockAddr) + if err := watcher.Add(path); err != nil { + return err + } + + for { + ev := <-watcher.Events + if ev.Name == path { + if (ev.Op & fsnotify.Create) == fsnotify.Create { + // socket ready + return nil + } + } + } } func (c *vppClient) SetMsgCallback(cb adapter.MsgCallback) { diff --git a/core/connection.go b/core/connection.go index a21cc28..67c7e1d 100644 --- a/core/connection.go +++ b/core/connection.go @@ -29,6 +29,11 @@ import ( "git.fd.io/govpp.git/codec" ) +const ( + DefaultReconnectInterval = time.Second // default interval between reconnect attempts + DefaultMaxReconnectAttempts = 3 // default maximum number of reconnect attempts +) + var ( RequestChanBufSize = 100 // default size of the request channel buffer ReplyChanBufSize = 100 // default size of the reply channel buffer @@ -36,12 +41,10 @@ var ( ) var ( - HealthCheckProbeInterval = time.Second * 1 // default health check probe interval + HealthCheckProbeInterval = time.Second // default health check probe interval HealthCheckReplyTimeout = time.Millisecond * 100 // timeout for reply to a health check probe HealthCheckThreshold = 1 // number of failed health checks until the error is reported - DefaultReplyTimeout = time.Second * 1 // default timeout for replies from VPP - ReconnectInterval = time.Second * 1 // default interval for reconnect attempts - MaxReconnectAttempts = 3 // maximum number of reconnect attempts + DefaultReplyTimeout = time.Second // default timeout for replies from VPP ) // ConnectionState represents the current state of the connection to VPP. @@ -88,6 +91,9 @@ type Connection struct { vppClient adapter.VppAPI // VPP binary API client //statsClient adapter.StatsAPI // VPP stats API client + maxAttempts int // interval for reconnect attempts + recInterval time.Duration // maximum number of reconnect attempts + vppConnected uint32 // non-zero if the adapter is connected to VPP codec *codec.MsgCodec // message codec @@ -108,9 +114,18 @@ type Connection struct { lastReply time.Time // time of the last received reply from VPP } -func newConnection(binapi adapter.VppAPI) *Connection { +func newConnection(binapi adapter.VppAPI, attempts int, interval time.Duration) *Connection { + if attempts == 0 { + attempts = DefaultMaxReconnectAttempts + } + if interval == 0 { + interval = DefaultReconnectInterval + } + c := &Connection{ vppClient: binapi, + maxAttempts: attempts, + recInterval: interval, codec: &codec.MsgCodec{}, msgIDs: make(map[string]uint16), msgMap: make(map[uint16]api.Message), @@ -126,7 +141,7 @@ func newConnection(binapi adapter.VppAPI) *Connection { // Only one connection attempt will be performed. func Connect(binapi adapter.VppAPI) (*Connection, error) { // create new connection handle - c := newConnection(binapi) + c := newConnection(binapi, DefaultMaxReconnectAttempts, DefaultReconnectInterval) // blocking attempt to connect to VPP if err := c.connectVPP(); err != nil { @@ -140,9 +155,9 @@ func Connect(binapi adapter.VppAPI) (*Connection, error) { // and ConnectionState channel. This call does not block until connection is established, it // returns immediately. The caller is supposed to watch the returned ConnectionState channel for // Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect. -func AsyncConnect(binapi adapter.VppAPI) (*Connection, chan ConnectionEvent, error) { +func AsyncConnect(binapi adapter.VppAPI, attempts int, interval time.Duration) (*Connection, chan ConnectionEvent, error) { // create new connection handle - c := newConnection(binapi) + c := newConnection(binapi, attempts, interval) // asynchronously attempt to connect to VPP connChan := make(chan ConnectionEvent, NotificationChanBufSize) @@ -247,10 +262,10 @@ func (c *Connection) connectLoop(connChan chan ConnectionEvent) { // signal connected event connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected} break - } else if reconnectAttempts < MaxReconnectAttempts { + } else if reconnectAttempts < c.maxAttempts { reconnectAttempts++ - log.Errorf("connecting failed (attempt %d/%d): %v", reconnectAttempts, MaxReconnectAttempts, err) - time.Sleep(ReconnectInterval) + log.Errorf("connecting failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err) + time.Sleep(c.recInterval) } else { connChan <- ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err} return diff --git a/core/connection_test.go b/core/connection_test.go index 929f468..843c5ea 100644 --- a/core/connection_test.go +++ b/core/connection_test.go @@ -78,7 +78,7 @@ func TestAsyncConnection(t *testing.T) { defer ctx.teardownTest() ctx.conn.Disconnect() - conn, statusChan, err := core.AsyncConnect(ctx.mockVpp) + conn, statusChan, err := core.AsyncConnect(ctx.mockVpp, core.DefaultMaxReconnectAttempts, core.DefaultReconnectInterval) ctx.conn = conn Expect(err).ShouldNot(HaveOccurred()) -- 2.16.6