X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=adapter%2Fvppapiclient%2Fvppapiclient.go;h=3ab460da9cbc20b4601dcfae0ea7a4a31f91369a;hb=HEAD;hp=34ad19989fea689597a7d555b096f95317f8c4f5;hpb=28df7c855784e784fb0e614c1cca0e565a7ef75f;p=govpp.git diff --git a/adapter/vppapiclient/vppapiclient.go b/adapter/vppapiclient/vppapiclient.go index 34ad199..3ab460d 100644 --- a/adapter/vppapiclient/vppapiclient.go +++ b/adapter/vppapiclient/vppapiclient.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -// +build !windows,!darwin +// +build !windows,!darwin,!novpp package vppapiclient @@ -20,56 +20,7 @@ package vppapiclient #cgo CFLAGS: -DPNG_DEBUG=1 #cgo LDFLAGS: -lvppapiclient -#include -#include -#include -#include -#include - -extern void go_msg_callback(uint16_t msg_id, void* data, size_t size); - -typedef struct __attribute__((__packed__)) _req_header { - uint16_t msg_id; - uint32_t client_index; - uint32_t context; -} req_header_t; - -typedef struct __attribute__((__packed__)) _reply_header { - uint16_t msg_id; -} reply_header_t; - -static void -govpp_msg_callback(unsigned char *data, int size) -{ - reply_header_t *header = ((reply_header_t *)data); - go_msg_callback(ntohs(header->msg_id), data, size); -} - -static int -govpp_send(uint32_t context, void *data, size_t size) -{ - req_header_t *header = ((req_header_t *)data); - header->context = htonl(context); - return vac_write(data, size); -} - -static int -govpp_connect(char *shm) -{ - return vac_connect("govpp", shm, govpp_msg_callback, 32); -} - -static int -govpp_disconnect() -{ - return vac_disconnect(); -} - -static uint32_t -govpp_get_msg_index(char *name_and_crc) -{ - return vac_get_msg_index(name_and_crc); -} +#include "vppapiclient_wrapper.h" */ import "C" @@ -78,10 +29,19 @@ import ( "os" "path/filepath" "reflect" + "sync/atomic" + "time" "unsafe" - "git.fd.io/govpp.git/adapter" "github.com/fsnotify/fsnotify" + + "go.fd.io/govpp/adapter" +) + +var ( + // MaxWaitReady defines maximum duration before waiting for shared memory + // segment times out + MaxWaitReady = time.Second * 10 ) const ( @@ -91,71 +51,79 @@ const ( vppShmFile = "vpe-api" ) -// global VPP binary API client adapter context -var client *VppClient +// global VPP binary API client, library vppapiclient only supports +// single connection at a time +var globalVppClient unsafe.Pointer -// VppClient is the default implementation of the VppAPI. -type VppClient struct { - shmPrefix string - msgCallback adapter.MsgCallback +// stubVppClient is the default implementation of the VppAPI. +type vppClient struct { + shmPrefix string + msgCallback adapter.MsgCallback + inputQueueSize uint16 } // NewVppClient returns a new VPP binary API client. -func NewVppClient(shmPrefix string) *VppClient { - return &VppClient{ - shmPrefix: shmPrefix, +func NewVppClient(shmPrefix string) adapter.VppAPI { + return NewVppClientWithInputQueueSize(shmPrefix, 32) +} + +// NewVppClientWithInputQueueSize returns a new VPP binary API client with a custom input queue size. +func NewVppClientWithInputQueueSize(shmPrefix string, inputQueueSize uint16) adapter.VppAPI { + return &vppClient{ + shmPrefix: shmPrefix, + inputQueueSize: inputQueueSize, } } // Connect connects the process to VPP. -func (a *VppClient) Connect() error { - if client != nil { +func (a *vppClient) Connect() error { + h := (*vppClient)(atomic.LoadPointer(&globalVppClient)) + if h != nil { return fmt.Errorf("already connected to binary API, disconnect first") } - var rc _Ctype_int + rxQlen := C.int(a.inputQueueSize) + var rc C.int if a.shmPrefix == "" { - rc = C.govpp_connect(nil) + rc = C.govpp_connect(nil, rxQlen) } else { shm := C.CString(a.shmPrefix) - rc = C.govpp_connect(shm) + rc = C.govpp_connect(shm, rxQlen) } if rc != 0 { return fmt.Errorf("connecting to VPP binary API failed (rc=%v)", rc) } - client = a + atomic.StorePointer(&globalVppClient, unsafe.Pointer(a)) return nil } // Disconnect disconnects the process from VPP. -func (a *VppClient) Disconnect() error { - client = nil - +func (a *vppClient) Disconnect() error { + atomic.StorePointer(&globalVppClient, nil) rc := C.govpp_disconnect() if rc != 0 { return fmt.Errorf("disconnecting from VPP binary API failed (rc=%v)", rc) } - return nil } // GetMsgID returns a runtime message ID for the given message name and CRC. -func (a *VppClient) GetMsgID(msgName string, msgCrc string) (uint16, error) { +func (a *vppClient) GetMsgID(msgName string, msgCrc string) (uint16, error) { nameAndCrc := C.CString(msgName + "_" + msgCrc) defer C.free(unsafe.Pointer(nameAndCrc)) msgID := uint16(C.govpp_get_msg_index(nameAndCrc)) if msgID == ^uint16(0) { // VPP does not know this message - return msgID, fmt.Errorf("unknown message: %v (crc: %v)", msgName, msgCrc) + return msgID, &adapter.UnknownMsgError{msgName, msgCrc} } return msgID, nil } // SendMsg sends a binary-encoded message to VPP. -func (a *VppClient) SendMsg(context uint32, data []byte) error { +func (a *vppClient) SendMsg(context uint32, data []byte) error { rc := C.govpp_send(C.uint32_t(context), unsafe.Pointer(&data[0]), C.size_t(len(data))) if rc != 0 { return fmt.Errorf("unable to send the message (rc=%v)", rc) @@ -165,28 +133,26 @@ func (a *VppClient) SendMsg(context uint32, data []byte) error { // SetMsgCallback sets a callback function that will be called by the adapter // whenever a message comes from VPP. -func (a *VppClient) SetMsgCallback(cb adapter.MsgCallback) { +func (a *vppClient) SetMsgCallback(cb adapter.MsgCallback) { a.msgCallback = cb } // WaitReady blocks until shared memory for sending // binary api calls is present on the file system. -func (a *VppClient) WaitReady() error { - var path string - +func (a *vppClient) WaitReady() error { // join the path to the shared memory segment + var path string if a.shmPrefix == "" { path = filepath.Join(shmDir, vppShmFile) } else { path = filepath.Join(shmDir, a.shmPrefix+"-"+vppShmFile) } - // check if file at the path exists + // check if file already exists if _, err := os.Stat(path); err == nil { - // file exists, we are ready - return nil + return nil // file exists, we are ready } else if !os.IsNotExist(err) { - return err + return err // some other error occurred } // file does not exist, start watching folder @@ -196,28 +162,37 @@ func (a *VppClient) WaitReady() error { } defer watcher.Close() + // start watching directory if err := watcher.Add(shmDir); err != nil { return err } + timeout := time.NewTimer(MaxWaitReady) for { - ev := <-watcher.Events - if ev.Name == path { - if (ev.Op & fsnotify.Create) == fsnotify.Create { - // file was created, we are ready - break + select { + case <-timeout.C: + return fmt.Errorf("timeout waiting (%s) for shm file: %s", MaxWaitReady, path) + + case e := <-watcher.Errors: + return e + + case ev := <-watcher.Events: + if ev.Name == path && (ev.Op&fsnotify.Create) == fsnotify.Create { + // file created, we are ready + return nil } } } - - return nil } //export go_msg_callback func go_msg_callback(msgID C.uint16_t, data unsafe.Pointer, size C.size_t) { + h := (*vppClient)(atomic.LoadPointer(&globalVppClient)) + if h == nil { + return + } // convert unsafe.Pointer to byte slice sliceHeader := &reflect.SliceHeader{Data: uintptr(data), Len: int(size), Cap: int(size)} byteSlice := *(*[]byte)(unsafe.Pointer(sliceHeader)) - - client.msgCallback(uint16(msgID), byteSlice) + h.msgCallback(uint16(msgID), byteSlice) }