Add support for jumbo frames to libmemif
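
Packets that exceed the negotiated buffer size are now transmitted as chains
of buffers flagged with MEMIF_BUFFER_FLAG_NEXT and reassembled into a single
RawPacketData on receive. A chain cut off by the end of an RxBurst is carried
over in rxChainBuf and completed by the following burst(s), with ErrNoBuf
returned in the meantime.

The buffer size is taken from MemifConfig and defaults to 2048 B. A minimal
configuration sketch (field values are illustrative only):

    config := &libmemif.MemifConfig{
        MemifMeta: libmemif.MemifMeta{
            IfName: "memif1", // illustrative name
        },
        BufferSize: 2048, // packets larger than this are split into chained buffers
    }
    memif, err := libmemif.CreateInterface(config, &libmemif.MemifCallbacks{})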
diff --git a/extras/libmemif/adapter.go b/extras/libmemif/adapter.go
index a74c5cf..d5a1563 100644
@@ -194,12 +194,20 @@ govpp_get_tx_queue_details (govpp_memif_details_t *md, int index)
        return md->tx_queues[index];
 }
 
-// Copy packet data into the selected buffer.
+// Copy packet data into the selected buffer, splitting it across chained buffers when necessary
 static void
-govpp_copy_packet_data(memif_buffer_t *buffers, int index, void *data, uint16_t size)
+govpp_copy_packet_data(memif_buffer_t *buffers, uint16_t allocated, int bufIndex, void *packetData, uint16_t packetSize)
 {
-       buffers[index].len = (size > buffers[index].len ? buffers[index].len : size);
-       memcpy(buffers[index].data, data, (size_t)buffers[index].len);
+       int dataOffset = 0;
+
+       do {
+               buffers[bufIndex].len = (packetSize > buffers[bufIndex].len ? buffers[bufIndex].len : packetSize);
+               memcpy(buffers[bufIndex].data, (char *)packetData + dataOffset, (size_t)buffers[bufIndex].len);
+               dataOffset += buffers[bufIndex].len;
+               packetSize -= buffers[bufIndex].len;
+               bufIndex += 1;
+       } while (packetSize > 0 && bufIndex < allocated
+               && (buffers[bufIndex - 1].flags & MEMIF_BUFFER_FLAG_NEXT) != 0);
 }
 
 // Get packet data from the selected buffer.
@@ -211,6 +219,26 @@ govpp_get_packet_data(memif_buffer_t *buffers, int index, int *size)
        return buffers[index].data;
 }
 
+// Check whether the memif buffer at the given index is chained (has MEMIF_BUFFER_FLAG_NEXT set)
+static int
+govpp_is_buffer_chained(memif_buffer_t *buffers, int index)
+{
+       return buffers[index].flags & MEMIF_BUFFER_FLAG_NEXT;
+}
+
+// Allocate memif buffers at <offset> into <bufs>; <nextFreeBuf> points at the first new buffer, <count_out> is cumulative (includes <offset>)
+static int
+govpp_memif_buffer_alloc(memif_conn_handle_t conn, uint16_t qid,
+                       memif_buffer_t * bufs, uint16_t offset, memif_buffer_t ** nextFreeBuf,
+                       uint16_t count, uint16_t * count_out, uint32_t size)
+{
+       memif_buffer_t * offsetBufs = (bufs + offset);
+       int err = memif_buffer_alloc(conn, qid, offsetBufs, count, count_out, size);
+       *count_out += offset;
+       *nextFreeBuf = offsetBufs;
+       return err;
+}
+
 */
 import "C"
 
@@ -345,6 +373,7 @@ type Memif struct {
 
        // Rx/Tx queues
        ringSize    int              // number of items in each ring
+       bufferSize  int              // maximum size of a single packet buffer
        stopQPollFd int              // event file descriptor used to stop pollRxQueue-s
        wg          sync.WaitGroup   // wait group for all pollRxQueue-s
        rxQueueBufs []CPacketBuffers // an array of C-libmemif packet buffers for each RX queue
@@ -388,6 +417,7 @@ type MemifQueueDetails struct {
 type CPacketBuffers struct {
        buffers *C.memif_buffer_t
        count   int
+       rxChainBuf []RawPacketData // packets carried over from a burst that ended mid-chain
 }
 
 // Context is a global Go-libmemif runtime context.
@@ -400,6 +430,11 @@ type Context struct {
        wg sync.WaitGroup /* wait-group for pollEvents() */
 }
 
+type txPacketBuffer struct {
+       packets []RawPacketData // normal-sized packets grouped together, or a single jumbo frame
+       size    int             // size of the largest packet in the group
+}
+
 var (
        // logger used by the adapter.
        log *logger.Logger
@@ -517,12 +552,18 @@ func CreateInterface(config *MemifConfig, callbacks *MemifCallbacks) (memif *Mem
                log2RingSize = 10
        }
 
+       bufferSize := config.BufferSize
+       if bufferSize <= 0 {
+               bufferSize = 2048
+       }
+
        // Create memif-wrapper for Go-libmemif.
        memif = &Memif{
-               MemifMeta: config.MemifMeta,
-               callbacks: &MemifCallbacks{},
-               ifIndex:   context.nextMemifIndex,
-               ringSize:  1 << log2RingSize,
+               MemifMeta:  config.MemifMeta,
+               callbacks:  &MemifCallbacks{},
+               ifIndex:    context.nextMemifIndex,
+               ringSize:   1 << log2RingSize,
+               bufferSize: int(bufferSize),
        }
 
        // Initialize memif callbacks.
@@ -717,10 +758,6 @@ func (memif *Memif) GetDetails() (details *MemifDetails, err error) {
 // Multiple TxBurst-s can run concurrently provided that each targets a different
 // TX queue.
 func (memif *Memif) TxBurst(queueID uint8, packets []RawPacketData) (count uint16, err error) {
-       var sentCount C.uint16_t
-       var allocated C.uint16_t
-       var bufSize int
-
        if len(packets) == 0 {
                return 0, nil
        }
@@ -729,16 +766,55 @@ func (memif *Memif) TxBurst(queueID uint8, packets []RawPacketData) (count uint1
                return 0, ErrQueueID
        }
 
-       // The largest packet in the set determines the packet buffer size.
+       var bufCount int
+       var buffers []*txPacketBuffer
+       cQueueID := C.uint16_t(queueID)
+
        for _, packet := range packets {
-               if len(packet) > int(bufSize) {
-                       bufSize = len(packet)
+               packetLen := len(packet)
+               log.Debugf("%v - preparing packet with len %v", cQueueID, packetLen)
+
+               if packetLen > memif.bufferSize {
+                       // Create jumbo buffer
+                       buffer := &txPacketBuffer{
+                               size:    packetLen,
+                               packets: []RawPacketData{packet},
+                       }
+
+                       buffers = append(buffers, buffer)
+
+                       // Increment bufCount by the number of chained buffers this jumbo frame needs
+                       bufCount += (buffer.size + memif.bufferSize - 1) / memif.bufferSize
+               } else {
+                       buffersLen := len(buffers)
+
+                       // This is the very first buffer, so there is nothing to append to yet; prepare an empty one
+                       if buffersLen == 0 {
+                               buffers = []*txPacketBuffer{{}}
+                               buffersLen = 1
+                       }
+
+                       lastBuffer := buffers[buffersLen-1]
+
+                       // The last buffer holds a jumbo frame, so start a new buffer
+                       if lastBuffer.size > memif.bufferSize {
+                               lastBuffer = &txPacketBuffer{}
+                               buffers = append(buffers, lastBuffer)
+                       }
+
+                       // The largest packet in the buffer determines the buffer size
+                       if packetLen > lastBuffer.size {
+                               lastBuffer.size = packetLen
+                       }
+
+                       lastBuffer.packets = append(lastBuffer.packets, packet)
+                       bufCount += 1
                }
        }
 
        // Reallocate Tx buffers if needed to fit the input packets.
-       pb := memif.txQueueBufs[queueID]
-       bufCount := len(packets)
+       log.Debugf("%v - total buffers to allocate: %v", cQueueID, bufCount)
+       pb := &memif.txQueueBufs[queueID]
        if pb.count < bufCount {
                newBuffers := C.realloc(unsafe.Pointer(pb.buffers), C.size_t(bufCount*int(C.sizeof_memif_buffer_t)))
                if newBuffers == nil {
@@ -751,32 +827,79 @@ func (memif *Memif) TxBurst(queueID uint8, packets []RawPacketData) (count uint1
        }
 
        // Allocate ring slots.
-       cQueueID := C.uint16_t(queueID)
-       errCode := C.memif_buffer_alloc(memif.cHandle, cQueueID, pb.buffers, C.uint16_t(bufCount),
-               &allocated, C.uint32_t(bufSize))
-       err = getMemifError(int(errCode))
-       if err == ErrNoBufRing {
-               // Not enough ring slots, <count> will be less than bufCount.
-               err = nil
-       }
-       if err != nil {
-               return 0, err
-       }
+       var allocated C.uint16_t
+       var subCount C.uint16_t
+       for _, buffer := range buffers {
+               packetCount := C.uint16_t(len(buffer.packets))
+               isJumbo := buffer.size > memif.bufferSize
+
+               log.Debugf("%v - allocating buffers: max packet size %v, packet count %v, jumbo %v",
+                       cQueueID, buffer.size, packetCount, isJumbo)
+
+               var nextFreeBuff *C.memif_buffer_t
+               startOffset := allocated
+               errCode := C.govpp_memif_buffer_alloc(memif.cHandle, cQueueID, pb.buffers, startOffset, &nextFreeBuff,
+                       packetCount, &allocated, C.uint32_t(buffer.size))
+
+               err = getMemifError(int(errCode))
+               endEarly := err == ErrNoBufRing
+               if endEarly {
+                       // Not enough ring slots, <count> will be less than packetCount.
+                       err = nil
+               }
+               if err != nil {
+                       return 0, err
+               }
+
+               // Copy packet data into the buffers.
+               nowAllocated := allocated - startOffset
+               toFill := nowAllocated
+               if !isJumbo {
+                       // Each non-jumbo packet occupies exactly one buffer
+                       toFill = 1
+               }
+
+               // Iterate over the packets and copy each one into its allocated buffer(s).
+               // A jumbo frame keeps filling consecutive chained buffers until its data runs out
+               for i, packet := range buffer.packets {
+                       if i >= int(nowAllocated) {
+                               // Fewer buffers were allocated than there are packets, so exit early
+                               break
+                       }
+
+                       packetData := unsafe.Pointer(&packet[0])
+                       C.govpp_copy_packet_data(nextFreeBuff, toFill, C.int(i), packetData, C.uint16_t(len(packet)))
+               }
+
+               if isJumbo && nowAllocated > 0 {
+                       // A jumbo frame occupies several chained buffers but counts as a single
+                       // packet towards the reported send count, so subtract the extra buffers
+                       // here; the caller never needs to know the packet was split
+                       subCount += nowAllocated - 1
+               }
 
-       // Copy packet data into the buffers.
-       for i := 0; i < int(allocated); i++ {
-               packetData := unsafe.Pointer(&packets[i][0])
-               C.govpp_copy_packet_data(pb.buffers, C.int(i), packetData, C.uint16_t(len(packets[i])))
+               // Not enough ring slots are left, so stop here to avoid packet loss and let
+               // the remaining packets be retried in the next burst
+               if endEarly {
+                       break
+               }
        }
 
-       errCode = C.memif_tx_burst(memif.cHandle, cQueueID, pb.buffers, allocated, &sentCount)
+       var sentCount C.uint16_t
+       errCode := C.memif_tx_burst(memif.cHandle, cQueueID, pb.buffers, allocated, &sentCount)
        err = getMemifError(int(errCode))
        if err != nil {
                return 0, err
        }
-       count = uint16(sentCount)
 
-       return count, nil
+       // A jumbo frame spans several buffers but counts as one packet, so subtract the
+       // extra buffers, guarding against a negative result if the burst was cut short
+       realSent := uint16(0)
+       if sentCount > subCount {
+               realSent = uint16(sentCount - subCount)
+       }
+       log.Debugf("%v - sent %v packet(s), total allocated buffers %v", cQueueID, realSent, allocated)
+       return realSent, nil
 }
 
 // RxBurst is used to receive multiple packets in one call from a selected queue.
@@ -799,7 +922,7 @@ func (memif *Memif) RxBurst(queueID uint8, count uint16) (packets []RawPacketDat
        }
 
        // Reallocate Rx buffers if needed to fit the output packets.
-       pb := memif.rxQueueBufs[queueID]
+       pb := &memif.rxQueueBufs[queueID]
        bufCount := int(count)
        if pb.count < bufCount {
                newBuffers := C.realloc(unsafe.Pointer(pb.buffers), C.size_t(bufCount*int(C.sizeof_memif_buffer_t)))
@@ -824,11 +947,31 @@ func (memif *Memif) RxBurst(queueID uint8, count uint16) (packets []RawPacketDat
                return packets, err
        }
 
+       chained := len(pb.rxChainBuf) > 0
+       if chained {
+               // The last buffer of the previous burst was chained, so restore the packets
+               // stored from that burst and continue appending data to them
+               packets = pb.rxChainBuf
+               pb.rxChainBuf = nil
+       }
+
        // Copy packet data into the instances of RawPacketData.
        for i := 0; i < int(recvCount); i++ {
                var packetSize C.int
                packetData := C.govpp_get_packet_data(pb.buffers, C.int(i), &packetSize)
-               packets = append(packets, C.GoBytes(packetData, packetSize))
+               packetBytes := C.GoBytes(packetData, packetSize)
+
+               if chained {
+                       // The previous buffer was chained, so merge this buffer's data into the last packet
+                       prevPacket := packets[len(packets)-1]
+                       packets[len(packets)-1] = append(prevPacket, packetBytes...)
+               } else {
+                       packets = append(packets, packetBytes)
+               }
+
+               // Remember whether this buffer is chained; if so, the next iteration appends
+               // its data to the current packet (jumbo frame reassembly)
+               chained = C.govpp_is_buffer_chained(pb.buffers, C.int(i)) > 0
        }
 
        if recvCount > 0 {
@@ -840,6 +983,15 @@ func (memif *Memif) RxBurst(queueID uint8, count uint16) (packets []RawPacketDat
                packets = nil
        }
 
+       if chained {
+               // The burst ended in the middle of a buffer chain, so withhold the partially
+               // reassembled packets, keep them for the next burst to finish, and report
+               // ErrNoBuf so the caller does not consume an incomplete packet
+               pb.rxChainBuf = packets
+               packets = nil
+               err = ErrNoBuf
+       }
+
        return packets, err
 }
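
Usage sketch (not part of the patch): sending a jumbo frame and draining an RX
queue with the new chained-buffer semantics. The import path, queue ID, and
burst size below are illustrative assumptions, not values mandated by the API;
error handling is abbreviated.

    package main

    import (
        "log"

        libmemif "git.fd.io/govpp.git/extras/libmemif"
    )

    // sendJumbo transmits one payload. TxBurst splits it across chained buffers
    // internally when len(payload) exceeds the configured BufferSize; the returned
    // count is in packets, so a jumbo frame counts as one.
    func sendJumbo(memif *libmemif.Memif, payload []byte) error {
        sent, err := memif.TxBurst(0, []libmemif.RawPacketData{payload})
        if err != nil {
            return err
        }
        log.Printf("sent %d packet(s)", sent)
        return nil
    }

    // drainRx reads packets until the queue is empty. When a burst ends in the
    // middle of a buffer chain, RxBurst withholds the partial packets and returns
    // ErrNoBuf; polling again lets the chain finish in a later burst.
    func drainRx(memif *libmemif.Memif) (packets []libmemif.RawPacketData) {
        for {
            burst, err := memif.RxBurst(0, 8)
            if err == libmemif.ErrNoBuf {
                continue // the buffer chain continues in the next burst
            }
            if err != nil || len(burst) == 0 {
                return packets
            }
            packets = append(packets, burst...)
        }
    }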