Merge "Exposed input_queue_size argument to user"
authorOndrej Fabry <ofabry@cisco.com>
Fri, 5 Apr 2019 08:34:37 +0000 (08:34 +0000)
committerGerrit Code Review <gerrit@fd.io>
Fri, 5 Apr 2019 08:34:37 +0000 (08:34 +0000)
Makefile
adapter/socketclient/socketclient.go
adapter/vppapiclient/vppapiclient.go
core/connection.go
core/connection_test.go
examples/simple-client/simple_client.go
govpp.go

index e5ae92e..c4b742e 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -23,7 +23,7 @@ examples:
 test:
        @echo "=> running tests"
        go test -cover ./cmd/...
-       go test -cover ./core ./api ./codec
+       go test -cover ./ ./adapter ./core ./api ./codec
 
 extras:
        @echo "=> building extras"
index eec4fd0..4c576c3 100644 (file)
@@ -7,10 +7,13 @@ import (
        "io"
        "net"
        "os"
+       "path/filepath"
        "strings"
        "sync"
        "time"
 
+       "github.com/fsnotify/fsnotify"
+
        "github.com/lunixbochs/struc"
        logger "github.com/sirupsen/logrus"
 
@@ -31,6 +34,10 @@ var (
        ConnectTimeout    = time.Second * 3
        DisconnectTimeout = time.Second
 
+       // MaxWaitReady defines maximum duration before waiting for socket file
+       // times out
+       MaxWaitReady = time.Second * 15
+
        Debug       = os.Getenv("DEBUG_GOVPP_SOCK") != ""
        DebugMsgIds = os.Getenv("DEBUG_GOVPP_SOCKMSG") != ""
 
@@ -72,9 +79,47 @@ func nilCallback(msgID uint16, data []byte) {
        Log.Warnf("no callback set, dropping message: ID=%v len=%d", msgID, len(data))
 }
 
-func (*vppClient) WaitReady() error {
-       // TODO: add watcher for socket file?
-       return nil
+// WaitReady checks socket file existence and waits for it if necessary
+func (c *vppClient) WaitReady() error {
+       // check if file at the path already exists
+       if _, err := os.Stat(c.sockAddr); err == nil {
+               return nil
+       } else if os.IsExist(err) {
+               return err
+       }
+
+       // if not, watch for it
+       watcher, err := fsnotify.NewWatcher()
+       if err != nil {
+               return err
+       }
+       defer func() {
+               if err := watcher.Close(); err != nil {
+                       Log.Errorf("failed to close file watcher: %v", err)
+               }
+       }()
+
+       // start watching directory
+       if err := watcher.Add(filepath.Dir(c.sockAddr)); err != nil {
+               return err
+       }
+
+       for {
+               select {
+               case <-time.After(MaxWaitReady):
+                       return fmt.Errorf("waiting for socket file timed out (%s)", MaxWaitReady)
+               case e := <-watcher.Errors:
+                       return e
+               case ev := <-watcher.Events:
+                       Log.Debugf("watcher event: %+v", ev)
+                       if ev.Name == c.sockAddr {
+                               if (ev.Op & fsnotify.Create) == fsnotify.Create {
+                                       // socket was created, we are ready
+                                       return nil
+                               }
+                       }
+               }
+       }
 }
 
 func (c *vppClient) SetMsgCallback(cb adapter.MsgCallback) {
index cac8c71..f637060 100644 (file)
@@ -78,12 +78,19 @@ import (
        "os"
        "path/filepath"
        "reflect"
+       "time"
        "unsafe"
 
        "git.fd.io/govpp.git/adapter"
        "github.com/fsnotify/fsnotify"
 )
 
+var (
+       // MaxWaitReady defines maximum duration before waiting for shared memory
+       // segment times out
+       MaxWaitReady = time.Second * 15
+)
+
 const (
        // shmDir is a directory where shared memory is supposed to be created.
        shmDir = "/dev/shm/"
@@ -181,16 +188,15 @@ func (a *vppClient) SetMsgCallback(cb adapter.MsgCallback) {
 // WaitReady blocks until shared memory for sending
 // binary api calls is present on the file system.
 func (a *vppClient) WaitReady() error {
-       var path string
-
        // join the path to the shared memory segment
+       var path string
        if a.shmPrefix == "" {
                path = filepath.Join(shmDir, vppShmFile)
        } else {
                path = filepath.Join(shmDir, a.shmPrefix+"-"+vppShmFile)
        }
 
-       // check if file at the path exists
+       // check if file at the path already exists
        if _, err := os.Stat(path); err == nil {
                // file exists, we are ready
                return nil
@@ -205,21 +211,26 @@ func (a *vppClient) WaitReady() error {
        }
        defer watcher.Close()
 
+       // start watching directory
        if err := watcher.Add(shmDir); err != nil {
                return err
        }
 
        for {
-               ev := <-watcher.Events
-               if ev.Name == path {
-                       if (ev.Op & fsnotify.Create) == fsnotify.Create {
-                               // file was created, we are ready
-                               break
+               select {
+               case <-time.After(MaxWaitReady):
+                       return fmt.Errorf("waiting for shared memory segment timed out (%s)", MaxWaitReady)
+               case e := <-watcher.Errors:
+                       return e
+               case ev := <-watcher.Events:
+                       if ev.Name == path {
+                               if (ev.Op & fsnotify.Create) == fsnotify.Create {
+                                       // file was created, we are ready
+                                       return nil
+                               }
                        }
                }
        }
-
-       return nil
 }
 
 //export go_msg_callback
index a21cc28..67c7e1d 100644 (file)
@@ -29,6 +29,11 @@ import (
        "git.fd.io/govpp.git/codec"
 )
 
+const (
+       DefaultReconnectInterval    = time.Second // default interval between reconnect attempts
+       DefaultMaxReconnectAttempts = 3           // default maximum number of reconnect attempts
+)
+
 var (
        RequestChanBufSize      = 100 // default size of the request channel buffer
        ReplyChanBufSize        = 100 // default size of the reply channel buffer
@@ -36,12 +41,10 @@ var (
 )
 
 var (
-       HealthCheckProbeInterval = time.Second * 1        // default health check probe interval
+       HealthCheckProbeInterval = time.Second            // default health check probe interval
        HealthCheckReplyTimeout  = time.Millisecond * 100 // timeout for reply to a health check probe
        HealthCheckThreshold     = 1                      // number of failed health checks until the error is reported
-       DefaultReplyTimeout      = time.Second * 1        // default timeout for replies from VPP
-       ReconnectInterval        = time.Second * 1        // default interval for reconnect attempts
-       MaxReconnectAttempts     = 3                      // maximum number of reconnect attempts
+       DefaultReplyTimeout      = time.Second            // default timeout for replies from VPP
 )
 
 // ConnectionState represents the current state of the connection to VPP.
@@ -88,6 +91,9 @@ type Connection struct {
        vppClient adapter.VppAPI // VPP binary API client
        //statsClient adapter.StatsAPI // VPP stats API client
 
+       maxAttempts int           // interval for reconnect attempts
+       recInterval time.Duration // maximum number of reconnect attempts
+
        vppConnected uint32 // non-zero if the adapter is connected to VPP
 
        codec  *codec.MsgCodec        // message codec
@@ -108,9 +114,18 @@ type Connection struct {
        lastReply     time.Time  // time of the last received reply from VPP
 }
 
-func newConnection(binapi adapter.VppAPI) *Connection {
+func newConnection(binapi adapter.VppAPI, attempts int, interval time.Duration) *Connection {
+       if attempts == 0 {
+               attempts = DefaultMaxReconnectAttempts
+       }
+       if interval == 0 {
+               interval = DefaultReconnectInterval
+       }
+
        c := &Connection{
                vppClient:     binapi,
+               maxAttempts:   attempts,
+               recInterval:   interval,
                codec:         &codec.MsgCodec{},
                msgIDs:        make(map[string]uint16),
                msgMap:        make(map[uint16]api.Message),
@@ -126,7 +141,7 @@ func newConnection(binapi adapter.VppAPI) *Connection {
 // Only one connection attempt will be performed.
 func Connect(binapi adapter.VppAPI) (*Connection, error) {
        // create new connection handle
-       c := newConnection(binapi)
+       c := newConnection(binapi, DefaultMaxReconnectAttempts, DefaultReconnectInterval)
 
        // blocking attempt to connect to VPP
        if err := c.connectVPP(); err != nil {
@@ -140,9 +155,9 @@ func Connect(binapi adapter.VppAPI) (*Connection, error) {
 // and ConnectionState channel. This call does not block until connection is established, it
 // returns immediately. The caller is supposed to watch the returned ConnectionState channel for
 // Connected/Disconnected events. In case of disconnect, the library will asynchronously try to reconnect.
-func AsyncConnect(binapi adapter.VppAPI) (*Connection, chan ConnectionEvent, error) {
+func AsyncConnect(binapi adapter.VppAPI, attempts int, interval time.Duration) (*Connection, chan ConnectionEvent, error) {
        // create new connection handle
-       c := newConnection(binapi)
+       c := newConnection(binapi, attempts, interval)
 
        // asynchronously attempt to connect to VPP
        connChan := make(chan ConnectionEvent, NotificationChanBufSize)
@@ -247,10 +262,10 @@ func (c *Connection) connectLoop(connChan chan ConnectionEvent) {
                        // signal connected event
                        connChan <- ConnectionEvent{Timestamp: time.Now(), State: Connected}
                        break
-               } else if reconnectAttempts < MaxReconnectAttempts {
+               } else if reconnectAttempts < c.maxAttempts {
                        reconnectAttempts++
-                       log.Errorf("connecting failed (attempt %d/%d): %v", reconnectAttempts, MaxReconnectAttempts, err)
-                       time.Sleep(ReconnectInterval)
+                       log.Errorf("connecting failed (attempt %d/%d): %v", reconnectAttempts, c.maxAttempts, err)
+                       time.Sleep(c.recInterval)
                } else {
                        connChan <- ConnectionEvent{Timestamp: time.Now(), State: Failed, Error: err}
                        return
index 929f468..843c5ea 100644 (file)
@@ -78,7 +78,7 @@ func TestAsyncConnection(t *testing.T) {
        defer ctx.teardownTest()
 
        ctx.conn.Disconnect()
-       conn, statusChan, err := core.AsyncConnect(ctx.mockVpp)
+       conn, statusChan, err := core.AsyncConnect(ctx.mockVpp, core.DefaultMaxReconnectAttempts, core.DefaultReconnectInterval)
        ctx.conn = conn
 
        Expect(err).ShouldNot(HaveOccurred())
index 6429c35..a494e81 100644 (file)
@@ -35,7 +35,7 @@ func main() {
        fmt.Println("Starting simple VPP client...")
 
        // connect to VPP
-       conn, conev, err := govpp.AsyncConnect("")
+       conn, conev, err := govpp.AsyncConnect("", core.DefaultMaxReconnectAttempts, core.DefaultReconnectInterval)
        if err != nil {
                log.Fatalln("ERROR:", err)
        }
index f679242..d66d5dc 100644 (file)
--- a/govpp.go
+++ b/govpp.go
@@ -18,6 +18,7 @@ import (
        "git.fd.io/govpp.git/adapter"
        "git.fd.io/govpp.git/adapter/vppapiclient"
        "git.fd.io/govpp.git/core"
+       "time"
 )
 
 var (
@@ -49,6 +50,6 @@ func Connect(shm string) (*core.Connection, error) {
 // This call does not block until connection is established, it returns immediately. The caller is
 // supposed to watch the returned ConnectionState channel for Connected/Disconnected events.
 // In case of disconnect, the library will asynchronously try to reconnect.
-func AsyncConnect(shm string) (*core.Connection, chan core.ConnectionEvent, error) {
-       return core.AsyncConnect(getVppAdapter(shm))
+func AsyncConnect(shm string, attempts int, interval time.Duration) (*core.Connection, chan core.ConnectionEvent, error) {
+       return core.AsyncConnect(getVppAdapter(shm), attempts, interval)
 }