Merge "Add buffer pool stats"
[govpp.git] / adapter / vppapiclient / vppapiclient.go
index 34ad199..f637060 100644 (file)
@@ -54,9 +54,9 @@ govpp_send(uint32_t context, void *data, size_t size)
 }
 
 static int
-govpp_connect(char *shm)
+govpp_connect(char *shm, int rx_qlen)
 {
-    return vac_connect("govpp", shm, govpp_msg_callback, 32);
+    return vac_connect("govpp", shm, govpp_msg_callback, rx_qlen);
 }
 
 static int
@@ -78,12 +78,19 @@ import (
        "os"
        "path/filepath"
        "reflect"
+       "time"
        "unsafe"
 
        "git.fd.io/govpp.git/adapter"
        "github.com/fsnotify/fsnotify"
 )
 
+var (
+       // MaxWaitReady defines maximum duration before waiting for shared memory
+       // segment times out
+       MaxWaitReady = time.Second * 15
+)
+
 const (
        // shmDir is a directory where shared memory is supposed to be created.
        shmDir = "/dev/shm/"
@@ -91,46 +98,55 @@ const (
        vppShmFile = "vpe-api"
 )
 
-// global VPP binary API client adapter context
-var client *VppClient
+// global VPP binary API client, library vppapiclient only supports
+// single connection at a time
+var globalVppClient *vppClient
 
-// VppClient is the default implementation of the VppAPI.
-type VppClient struct {
-       shmPrefix   string
-       msgCallback adapter.MsgCallback
+// stubVppClient is the default implementation of the VppAPI.
+type vppClient struct {
+       shmPrefix      string
+       msgCallback    adapter.MsgCallback
+       inputQueueSize uint16
 }
 
 // NewVppClient returns a new VPP binary API client.
-func NewVppClient(shmPrefix string) *VppClient {
-       return &VppClient{
-               shmPrefix: shmPrefix,
+func NewVppClient(shmPrefix string) adapter.VppAPI {
+       return NewVppClientWithInputQueueSize(shmPrefix, 32)
+}
+
+// NewVppClientWithInputQueueSize returns a new VPP binary API client with a custom input queue size.
+func NewVppClientWithInputQueueSize(shmPrefix string, inputQueueSize uint16) adapter.VppAPI {
+       return &vppClient{
+               shmPrefix:      shmPrefix,
+               inputQueueSize: inputQueueSize,
        }
 }
 
 // Connect connects the process to VPP.
-func (a *VppClient) Connect() error {
-       if client != nil {
+func (a *vppClient) Connect() error {
+       if globalVppClient != nil {
                return fmt.Errorf("already connected to binary API, disconnect first")
        }
 
-       var rc _Ctype_int
+       rxQlen := C.int(a.inputQueueSize)
+       var rc C.int
        if a.shmPrefix == "" {
-               rc = C.govpp_connect(nil)
+               rc = C.govpp_connect(nil, rxQlen)
        } else {
                shm := C.CString(a.shmPrefix)
-               rc = C.govpp_connect(shm)
+               rc = C.govpp_connect(shm, rxQlen)
        }
        if rc != 0 {
                return fmt.Errorf("connecting to VPP binary API failed (rc=%v)", rc)
        }
 
-       client = a
+       globalVppClient = a
        return nil
 }
 
 // Disconnect disconnects the process from VPP.
-func (a *VppClient) Disconnect() error {
-       client = nil
+func (a *vppClient) Disconnect() error {
+       globalVppClient = nil
 
        rc := C.govpp_disconnect()
        if rc != 0 {
@@ -141,7 +157,7 @@ func (a *VppClient) Disconnect() error {
 }
 
 // GetMsgID returns a runtime message ID for the given message name and CRC.
-func (a *VppClient) GetMsgID(msgName string, msgCrc string) (uint16, error) {
+func (a *vppClient) GetMsgID(msgName string, msgCrc string) (uint16, error) {
        nameAndCrc := C.CString(msgName + "_" + msgCrc)
        defer C.free(unsafe.Pointer(nameAndCrc))
 
@@ -155,7 +171,7 @@ func (a *VppClient) GetMsgID(msgName string, msgCrc string) (uint16, error) {
 }
 
 // SendMsg sends a binary-encoded message to VPP.
-func (a *VppClient) SendMsg(context uint32, data []byte) error {
+func (a *vppClient) SendMsg(context uint32, data []byte) error {
        rc := C.govpp_send(C.uint32_t(context), unsafe.Pointer(&data[0]), C.size_t(len(data)))
        if rc != 0 {
                return fmt.Errorf("unable to send the message (rc=%v)", rc)
@@ -165,23 +181,22 @@ func (a *VppClient) SendMsg(context uint32, data []byte) error {
 
 // SetMsgCallback sets a callback function that will be called by the adapter
 // whenever a message comes from VPP.
-func (a *VppClient) SetMsgCallback(cb adapter.MsgCallback) {
+func (a *vppClient) SetMsgCallback(cb adapter.MsgCallback) {
        a.msgCallback = cb
 }
 
 // WaitReady blocks until shared memory for sending
 // binary api calls is present on the file system.
-func (a *VppClient) WaitReady() error {
-       var path string
-
+func (a *vppClient) WaitReady() error {
        // join the path to the shared memory segment
+       var path string
        if a.shmPrefix == "" {
                path = filepath.Join(shmDir, vppShmFile)
        } else {
                path = filepath.Join(shmDir, a.shmPrefix+"-"+vppShmFile)
        }
 
-       // check if file at the path exists
+       // check if file at the path already exists
        if _, err := os.Stat(path); err == nil {
                // file exists, we are ready
                return nil
@@ -196,21 +211,26 @@ func (a *VppClient) WaitReady() error {
        }
        defer watcher.Close()
 
+       // start watching directory
        if err := watcher.Add(shmDir); err != nil {
                return err
        }
 
        for {
-               ev := <-watcher.Events
-               if ev.Name == path {
-                       if (ev.Op & fsnotify.Create) == fsnotify.Create {
-                               // file was created, we are ready
-                               break
+               select {
+               case <-time.After(MaxWaitReady):
+                       return fmt.Errorf("waiting for shared memory segment timed out (%s)", MaxWaitReady)
+               case e := <-watcher.Errors:
+                       return e
+               case ev := <-watcher.Events:
+                       if ev.Name == path {
+                               if (ev.Op & fsnotify.Create) == fsnotify.Create {
+                                       // file was created, we are ready
+                                       return nil
+                               }
                        }
                }
        }
-
-       return nil
 }
 
 //export go_msg_callback
@@ -219,5 +239,5 @@ func go_msg_callback(msgID C.uint16_t, data unsafe.Pointer, size C.size_t) {
        sliceHeader := &reflect.SliceHeader{Data: uintptr(data), Len: int(size), Cap: int(size)}
        byteSlice := *(*[]byte)(unsafe.Pointer(sliceHeader))
 
-       client.msgCallback(uint16(msgID), byteSlice)
+       globalVppClient.msgCallback(uint16(msgID), byteSlice)
 }