https://jira.fd.io/browse/GOVPP-21
[govpp.git] / extras / libmemif / adapter.go
index a74c5cf..ac8a827 100644 (file)
@@ -34,7 +34,7 @@ import (
 #include <stdint.h>
 #include <string.h>
 #include <sys/eventfd.h>
-#include <libmemif.h>
+#include <libmemif.h>  // <-- VPP must be installed!
 
 // Feature tests.
 #ifndef MEMIF_HAVE_CANCEL_POLL_EVENT
@@ -50,7 +50,7 @@ memif_cancel_poll_event ()
 // are much easier to work with in cgo.
 typedef struct
 {
-       char *socket_filename;
+       memif_socket_handle_t socket;
        char *secret;
        uint8_t num_s2m_rings;
        uint8_t num_m2s_rings;
@@ -75,6 +75,8 @@ typedef struct
        uint8_t role;
        uint8_t mode;
        char *socket_filename;
+       uint8_t regions_num;
+       memif_region_details_t *regions;
        uint8_t rx_queues_num;
        uint8_t tx_queues_num;
        memif_queue_details_t *rx_queues;
@@ -106,7 +108,7 @@ govpp_memif_create (memif_conn_handle_t *conn, govpp_memif_conn_args_t *go_args,
 {
        memif_conn_args_t args;
        memset (&args, 0, sizeof (args));
-       args.socket_filename = (char *)go_args->socket_filename;
+       args.socket = (char *)go_args->socket;
        if (go_args->secret != NULL)
        {
                strncpy ((char *)args.secret, go_args->secret,
@@ -130,6 +132,12 @@ govpp_memif_create (memif_conn_handle_t *conn, govpp_memif_conn_args_t *go_args,
                                                private_ctx);
 }
 
+static int
+govpp_memif_create_socket (memif_socket_handle_t *sock, char *filename)
+{
+       return memif_create_socket(sock, filename, NULL);
+}
+
 // govpp_memif_get_details keeps reallocating buffer until it is large enough.
 // The buffer is returned to be deallocated when it is no longer needed.
 static int
@@ -167,6 +175,8 @@ govpp_memif_get_details (memif_conn_handle_t conn, govpp_memif_details_t *govpp_
                govpp_md->role = md.role;
                govpp_md->mode = md.mode;
                govpp_md->socket_filename = (char *)md.socket_filename;
+               govpp_md->regions_num = md.regions_num;
+               govpp_md->regions = md.regions;
                govpp_md->rx_queues_num = md.rx_queues_num;
                govpp_md->tx_queues_num = md.tx_queues_num;
                govpp_md->rx_queues = md.rx_queues;
@@ -194,12 +204,20 @@ govpp_get_tx_queue_details (govpp_memif_details_t *md, int index)
        return md->tx_queues[index];
 }
 
-// Copy packet data into the selected buffer.
+// Copy packet data into the selected buffer with splitting when necessary
 static void
-govpp_copy_packet_data(memif_buffer_t *buffers, int index, void *data, uint16_t size)
+govpp_copy_packet_data(memif_buffer_t *buffers, uint16_t allocated, int bufIndex, void *packetData, uint16_t packetSize)
 {
-       buffers[index].len = (size > buffers[index].len ? buffers[index].len : size);
-       memcpy(buffers[index].data, data, (size_t)buffers[index].len);
+       int dataOffset = 0;
+
+       do {
+               buffers[bufIndex].len = (packetSize > buffers[bufIndex].len ? buffers[bufIndex].len : packetSize);
+               void * curData = (packetData + dataOffset);
+               memcpy(buffers[bufIndex].data, curData, (size_t)buffers[bufIndex].len);
+               dataOffset += buffers[bufIndex].len;
+               bufIndex += 1;
+               packetSize -= buffers[bufIndex].len;
+       } while(packetSize > 0 && bufIndex < allocated && buffers[bufIndex].flags & MEMIF_BUFFER_FLAG_NEXT > 0);
 }
 
 // Get packet data from the selected buffer.
@@ -211,6 +229,26 @@ govpp_get_packet_data(memif_buffer_t *buffers, int index, int *size)
        return buffers[index].data;
 }
 
+// Checks if memif buffer is chained
+static int
+govpp_is_buffer_chained(memif_buffer_t *buffers, int index)
+{
+    return buffers[index].flags & MEMIF_BUFFER_FLAG_NEXT;
+}
+
+// Allocate memif buffers and return pointer to next free buffer
+static int
+govpp_memif_buffer_alloc(memif_conn_handle_t conn, uint16_t qid,
+                       memif_buffer_t * bufs, uint16_t offset, memif_buffer_t ** nextFreeBuf,
+                       uint16_t count, uint16_t * count_out, uint16_t size)
+{
+    memif_buffer_t * offsetBufs = (bufs + offset);
+    int err = memif_buffer_alloc(conn, qid, offsetBufs, count, count_out, size);
+    *count_out += offset;
+    *nextFreeBuf = offsetBufs;
+    return err;
+}
+
 */
 import "C"
 
@@ -333,18 +371,21 @@ type Memif struct {
        MemifMeta
 
        // Per-library references
-       ifIndex int                   // index used in the Go-libmemif context (Context.memifs)
-       cHandle C.memif_conn_handle_t // handle used in C-libmemif
+       ifIndex int                     // index used in the Go-libmemif context (Context.memifs)
+       cHandle C.memif_conn_handle_t   // connection handle used in C-libmemif
+       sHandle C.memif_socket_handle_t // socket handle used in C-libmemif
 
        // Callbacks
        callbacks *MemifCallbacks
 
        // Interrupt
        intCh      chan uint8      // memif-global interrupt channel (value = queue ID)
+       intErrCh   chan error      // triggered when interrupt error occurs
        queueIntCh []chan struct{} // per RX queue interrupt channel
 
        // Rx/Tx queues
        ringSize    int              // number of items in each ring
+       bufferSize  int              // max buffer size
        stopQPollFd int              // event file descriptor used to stop pollRxQueue-s
        wg          sync.WaitGroup   // wait group for all pollRxQueue-s
        rxQueueBufs []CPacketBuffers // an array of C-libmemif packet buffers for each RX queue
@@ -386,8 +427,9 @@ type MemifQueueDetails struct {
 
 // CPacketBuffers stores an array of memif buffers for use with TxBurst or RxBurst.
 type CPacketBuffers struct {
-       buffers *C.memif_buffer_t
-       count   int
+       buffers    *C.memif_buffer_t
+       count      int
+       rxChainBuf []RawPacketData
 }
 
 // Context is a global Go-libmemif runtime context.
@@ -400,6 +442,11 @@ type Context struct {
        wg sync.WaitGroup /* wait-group for pollEvents() */
 }
 
+type txPacketBuffer struct {
+       packets []RawPacketData
+       size    int
+}
+
 var (
        // logger used by the adapter.
        log *logger.Logger
@@ -439,11 +486,11 @@ func Init(appName string) error {
        // Initialize C-libmemif.
        var errCode int
        if appName == "" {
-               errCode = int(C.memif_init(nil, nil, nil, nil))
+               errCode = int(C.memif_init(nil, nil, nil, nil, nil))
        } else {
                appName := C.CString(appName)
                defer C.free(unsafe.Pointer(appName))
-               errCode = int(C.memif_init(nil, appName, nil, nil))
+               errCode = int(C.memif_init(nil, appName, nil, nil, nil))
        }
        err := getMemifError(errCode)
        if err != nil {
@@ -517,12 +564,18 @@ func CreateInterface(config *MemifConfig, callbacks *MemifCallbacks) (memif *Mem
                log2RingSize = 10
        }
 
+       bufferSize := config.BufferSize
+       if bufferSize <= 0 {
+               bufferSize = 2048
+       }
+
        // Create memif-wrapper for Go-libmemif.
        memif = &Memif{
-               MemifMeta: config.MemifMeta,
-               callbacks: &MemifCallbacks{},
-               ifIndex:   context.nextMemifIndex,
-               ringSize:  1 << log2RingSize,
+               MemifMeta:  config.MemifMeta,
+               callbacks:  &MemifCallbacks{},
+               ifIndex:    context.nextMemifIndex,
+               ringSize:   1 << log2RingSize,
+               bufferSize: int(bufferSize),
        }
 
        // Initialize memif callbacks.
@@ -533,6 +586,7 @@ func CreateInterface(config *MemifConfig, callbacks *MemifCallbacks) (memif *Mem
 
        // Initialize memif-global interrupt channel.
        memif.intCh = make(chan uint8, 1<<6)
+       memif.intErrCh = make(chan error, 1<<6)
 
        // Initialize event file descriptor for stopping Rx/Tx queue polling.
        memif.stopQPollFd = int(C.eventfd(0, C.EFD_NONBLOCK))
@@ -544,9 +598,13 @@ func CreateInterface(config *MemifConfig, callbacks *MemifCallbacks) (memif *Mem
        args := &C.govpp_memif_conn_args_t{}
        // - socket file name
        if config.SocketFilename != "" {
-               args.socket_filename = C.CString(config.SocketFilename)
-               defer C.free(unsafe.Pointer(args.socket_filename))
+               log.WithField("name", config.SocketFilename).Debug("A new memif socket was created")
+               errCode := C.govpp_memif_create_socket(&memif.sHandle, C.CString(config.SocketFilename))
+               if getMemifError(int(errCode)) != nil {
+                       return nil, err
+               }
        }
+       args.socket = memif.sHandle
        // - interface ID
        args.interface_id = C.uint32_t(config.ConnID)
        // - interface name
@@ -587,8 +645,7 @@ func CreateInterface(config *MemifConfig, callbacks *MemifCallbacks) (memif *Mem
 
        // Create memif in C-libmemif.
        errCode := C.govpp_memif_create(&memif.cHandle, args, unsafe.Pointer(uintptr(memif.ifIndex)))
-       err = getMemifError(int(errCode))
-       if err != nil {
+       if getMemifError(int(errCode)) != nil {
                return nil, err
        }
 
@@ -612,6 +669,12 @@ func (memif *Memif) GetInterruptChan() (ch <-chan uint8 /* queue ID */) {
        return memif.intCh
 }
 
+// GetInterruptErrorChan returns an Error channel
+// which fires if there are errors occurred while read data.
+func (memif *Memif) GetInterruptErrorChan() (ch <-chan error /* The error */) {
+       return memif.intErrCh
+}
+
 // GetQueueInterruptChan returns an empty-data channel which fires every time
 // there are data to read on a given queue.
 // It is only valid to call this function if memif is in the connected state.
@@ -717,10 +780,6 @@ func (memif *Memif) GetDetails() (details *MemifDetails, err error) {
 // Multiple TxBurst-s can run concurrently provided that each targets a different
 // TX queue.
 func (memif *Memif) TxBurst(queueID uint8, packets []RawPacketData) (count uint16, err error) {
-       var sentCount C.uint16_t
-       var allocated C.uint16_t
-       var bufSize int
-
        if len(packets) == 0 {
                return 0, nil
        }
@@ -729,16 +788,55 @@ func (memif *Memif) TxBurst(queueID uint8, packets []RawPacketData) (count uint1
                return 0, ErrQueueID
        }
 
-       // The largest packet in the set determines the packet buffer size.
+       var bufCount int
+       buffers := make([]*txPacketBuffer, 0)
+       cQueueID := C.uint16_t(queueID)
+
        for _, packet := range packets {
-               if len(packet) > int(bufSize) {
-                       bufSize = len(packet)
+               packetLen := len(packet)
+               log.Debugf("%v - preparing packet with len %v", cQueueID, packetLen)
+
+               if packetLen > memif.bufferSize {
+                       // Create jumbo buffer
+                       buffer := &txPacketBuffer{
+                               size:    packetLen,
+                               packets: []RawPacketData{packet},
+                       }
+
+                       buffers = append(buffers, buffer)
+
+                       // Increment bufCount by number of splits in this jumbo
+                       bufCount += (buffer.size + memif.bufferSize - 1) / memif.bufferSize
+               } else {
+                       buffersLen := len(buffers)
+
+                       // This is very first buffer so there is no data to append to, prepare empty one
+                       if buffersLen == 0 {
+                               buffers = []*txPacketBuffer{{}}
+                               buffersLen = 1
+                       }
+
+                       lastBuffer := buffers[buffersLen-1]
+
+                       // Last buffer is jumbo buffer, create new buffer
+                       if lastBuffer.size > memif.bufferSize {
+                               lastBuffer = &txPacketBuffer{}
+                               buffers = append(buffers, lastBuffer)
+                       }
+
+                       // Determine buffer size by max packet size in buffer
+                       if packetLen > lastBuffer.size {
+                               lastBuffer.size = packetLen
+                       }
+
+                       lastBuffer.packets = append(lastBuffer.packets, packet)
+                       bufCount += 1
                }
        }
 
        // Reallocate Tx buffers if needed to fit the input packets.
-       pb := memif.txQueueBufs[queueID]
-       bufCount := len(packets)
+       log.Debugf("%v - total buffer to allocate count %v", cQueueID, bufCount)
+       pb := &memif.txQueueBufs[queueID]
        if pb.count < bufCount {
                newBuffers := C.realloc(unsafe.Pointer(pb.buffers), C.size_t(bufCount*int(C.sizeof_memif_buffer_t)))
                if newBuffers == nil {
@@ -751,32 +849,79 @@ func (memif *Memif) TxBurst(queueID uint8, packets []RawPacketData) (count uint1
        }
 
        // Allocate ring slots.
-       cQueueID := C.uint16_t(queueID)
-       errCode := C.memif_buffer_alloc(memif.cHandle, cQueueID, pb.buffers, C.uint16_t(bufCount),
-               &allocated, C.uint32_t(bufSize))
-       err = getMemifError(int(errCode))
-       if err == ErrNoBufRing {
-               // Not enough ring slots, <count> will be less than bufCount.
-               err = nil
-       }
-       if err != nil {
-               return 0, err
-       }
+       var allocated C.uint16_t
+       var subCount C.uint16_t
+       for _, buffer := range buffers {
+               packetCount := C.uint16_t(len(buffer.packets))
+               isJumbo := buffer.size > memif.bufferSize
+
+               log.Debugf("%v - trying to send max buff size %v, packets len %v, buffer len %v, jumbo %v",
+                       cQueueID, buffer.size, len(buffer.packets), packetCount, isJumbo)
+
+               var nextFreeBuff *C.memif_buffer_t
+               startOffset := allocated
+               errCode := C.govpp_memif_buffer_alloc(memif.cHandle, cQueueID, pb.buffers, startOffset, &nextFreeBuff,
+                       packetCount, &allocated, C.uint16_t(buffer.size))
+
+               err = getMemifError(int(errCode))
+               endEarly := err == ErrNoBufRing
+               if endEarly {
+                       // Not enough ring slots, <count> will be less than packetCount.
+                       err = nil
+               }
+               if err != nil {
+                       return 0, err
+               }
+
+               // Copy packet data into the buffers.
+               nowAllocated := allocated - startOffset
+               toFill := nowAllocated
+               if !isJumbo {
+                       // If this is not jumbo frame, only 1 packet needs to be copied each iteration
+                       toFill = 1
+               }
+
+               // Iterate over all packets and try to fill them into allocated buffers
+               // If packet is jumbo frame, continue filling to allocated buffers until no buffer is left
+               for i, packet := range buffer.packets {
+                       if i >= int(nowAllocated) {
+                               // There was less allocated buffers than actual packet count so exit early
+                               break
+                       }
 
-       // Copy packet data into the buffers.
-       for i := 0; i < int(allocated); i++ {
-               packetData := unsafe.Pointer(&packets[i][0])
-               C.govpp_copy_packet_data(pb.buffers, C.int(i), packetData, C.uint16_t(len(packets[i])))
+                       packetData := unsafe.Pointer(&packet[0])
+                       C.govpp_copy_packet_data(nextFreeBuff, toFill, C.int(i), packetData, C.uint16_t(len(packet)))
+               }
+
+               if isJumbo && nowAllocated > 0 {
+                       // If we successfully allocated required amount of buffers for entire jumbo to be sent
+                       // simply sub entire amount of jumbo frame packets and leave only 1 so sender will think
+                       // it only sent 1 packet so it does not need to know anything about jumbo frames
+                       subCount += nowAllocated - 1
+               }
+
+               // If we do not have enough buffers left to allocate, simply end here to avoid packet loss and try
+               // to handle it next burst
+               if endEarly {
+                       break
+               }
        }
 
-       errCode = C.memif_tx_burst(memif.cHandle, cQueueID, pb.buffers, allocated, &sentCount)
+       var sentCount C.uint16_t
+       errCode := C.memif_tx_burst(memif.cHandle, cQueueID, pb.buffers, allocated, &sentCount)
        err = getMemifError(int(errCode))
        if err != nil {
                return 0, err
        }
-       count = uint16(sentCount)
 
-       return count, nil
+       // Prevent negative values
+       realSent := uint16(sentCount) - uint16(subCount)
+       if subCount > sentCount {
+               sentCount = 0
+       }
+
+       log.Debugf("%v - sent %v total allocated buffs %v", cQueueID, sentCount, allocated)
+       return realSent, nil
 }
 
 // RxBurst is used to receive multiple packets in one call from a selected queue.
@@ -789,6 +934,7 @@ func (memif *Memif) TxBurst(queueID uint8, packets []RawPacketData) (count uint1
 // Rx queue.
 func (memif *Memif) RxBurst(queueID uint8, count uint16) (packets []RawPacketData, err error) {
        var recvCount C.uint16_t
+       packets = make([]RawPacketData, 0)
 
        if count == 0 {
                return packets, nil
@@ -799,7 +945,7 @@ func (memif *Memif) RxBurst(queueID uint8, count uint16) (packets []RawPacketDat
        }
 
        // Reallocate Rx buffers if needed to fit the output packets.
-       pb := memif.rxQueueBufs[queueID]
+       pb := &memif.rxQueueBufs[queueID]
        bufCount := int(count)
        if pb.count < bufCount {
                newBuffers := C.realloc(unsafe.Pointer(pb.buffers), C.size_t(bufCount*int(C.sizeof_memif_buffer_t)))
@@ -824,11 +970,31 @@ func (memif *Memif) RxBurst(queueID uint8, count uint16) (packets []RawPacketDat
                return packets, err
        }
 
+       chained := len(pb.rxChainBuf) > 0
+       if chained {
+               // We had stored data from previous burst because last buffer in previous burst was chained
+               // so we need to continue appending to this data
+               packets = pb.rxChainBuf
+               pb.rxChainBuf = nil
+       }
+
        // Copy packet data into the instances of RawPacketData.
        for i := 0; i < int(recvCount); i++ {
                var packetSize C.int
                packetData := C.govpp_get_packet_data(pb.buffers, C.int(i), &packetSize)
-               packets = append(packets, C.GoBytes(packetData, packetSize))
+               packetBytes := C.GoBytes(packetData, packetSize)
+
+               if chained {
+                       // We have chained buffers, so start merging packet data with last read packet
+                       prevPacket := packets[len(packets)-1]
+                       packets[len(packets)-1] = append(prevPacket, packetBytes...)
+               } else {
+                       packets = append(packets, packetBytes)
+               }
+
+               // Mark last buffer as chained based on property on current buffer so next buffers
+               // will try to append data to this one in case we got jumbo frame
+               chained = C.govpp_is_buffer_chained(pb.buffers, C.int(i)) > 0
        }
 
        if recvCount > 0 {
@@ -840,6 +1006,15 @@ func (memif *Memif) RxBurst(queueID uint8, count uint16) (packets []RawPacketDat
                packets = nil
        }
 
+       if chained {
+               // We did not had enough space to process all chained buffers to the end so simply tell
+               // reader that it should not process any packets here and save them for next burst
+               // to finish reading the buffer chain
+               pb.rxChainBuf = packets
+               packets = nil
+               err = ErrNoBuf
+       }
+
        return packets, err
 }
 
@@ -855,6 +1030,7 @@ func (memif *Memif) Close() error {
        if err != nil {
                // Close memif-global interrupt channel.
                close(memif.intCh)
+               close(memif.intErrCh)
                // Close file descriptor stopQPollFd.
                C.close(C.int(memif.stopQPollFd))
        }
@@ -990,7 +1166,13 @@ func pollRxQueue(memif *Memif, queueID uint8) {
        for {
                _, err := syscall.EpollWait(epFd, event[:], -1)
                if err != nil {
+                       errno, _ := err.(syscall.Errno)
+                       //EINTR and EAGAIN should not be considered as a fatal error, try again
+                       if errno == syscall.EINTR || errno == syscall.EAGAIN {
+                               continue
+                       }
                        log.WithField("err", err).Error("epoll_wait() failed")
+                       memif.intErrCh <- err
                        return
                }