Add support for jumbo frames to libmemif 95/13295/16
authorTomas Slusny <slusnucky@gmail.com>
Tue, 26 Jun 2018 14:53:48 +0000 (16:53 +0200)
committerTomas Slusny <slusnucky@gmail.com>
Fri, 6 Jul 2018 14:17:37 +0000 (16:17 +0200)
Add support for frames larger than maximum buffer size to libmemif
adapter.

Change-Id: I8b2d9fe7e05328194cd0aafd6e3ab40392ebbfed
Signed-off-by: Tomas Slusny <slusnucky@gmail.com>
Makefile
extras/libmemif/README.md
extras/libmemif/adapter.go
extras/libmemif/examples/jumbo-frames/jumbo-frames.go [new file with mode: 0644]

index 214f632..ad9aa5e 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -16,6 +16,7 @@ extras:
        @cd extras/libmemif/examples/raw-data && go build -v
        @cd extras/libmemif/examples/icmp-responder && go build -v
        @cd extras/libmemif/examples/gopacket && go build -v
+       @cd extras/libmemif/examples/jumbo-frames && go build -v
 
 clean:
        @rm -f cmd/binapi-generator/binapi-generator
index 854aa96..d663fd2 100644 (file)
@@ -146,6 +146,12 @@ through each of the 3 queues. The received packets are printed to stdout.
 
 Stop an instance of *raw-data* with an interrupt signal (^C).
 
+#### Jumbo Frames Raw data (libmemif <-> libmemif)
+
+*jumbo-frames* is simple example how to send larger and larger jumbo
+packets with libmemif adapter. This is simple copy of *raw-data* but with
+sending larger packets, so for more information read its code and documentation.
+
 #### ICMP Responder
 
 *icmp-responder* is a simple example showing how to answer APR and ICMP
index a74c5cf..d5a1563 100644 (file)
@@ -194,12 +194,20 @@ govpp_get_tx_queue_details (govpp_memif_details_t *md, int index)
        return md->tx_queues[index];
 }
 
-// Copy packet data into the selected buffer.
+// Copy packet data into the selected buffer with splitting when necessary
 static void
-govpp_copy_packet_data(memif_buffer_t *buffers, int index, void *data, uint16_t size)
+govpp_copy_packet_data(memif_buffer_t *buffers, uint16_t allocated, int bufIndex, void *packetData, uint16_t packetSize)
 {
-       buffers[index].len = (size > buffers[index].len ? buffers[index].len : size);
-       memcpy(buffers[index].data, data, (size_t)buffers[index].len);
+       int dataOffset = 0;
+
+       do {
+               buffers[bufIndex].len = (packetSize > buffers[bufIndex].len ? buffers[bufIndex].len : packetSize);
+               void * curData = (packetData + dataOffset);
+               memcpy(buffers[bufIndex].data, curData, (size_t)buffers[bufIndex].len);
+               dataOffset += buffers[bufIndex].len;
+               bufIndex += 1;
+               packetSize -= buffers[bufIndex].len;
+       } while(packetSize > 0 && bufIndex < allocated && buffers[bufIndex].flags & MEMIF_BUFFER_FLAG_NEXT > 0);
 }
 
 // Get packet data from the selected buffer.
@@ -211,6 +219,26 @@ govpp_get_packet_data(memif_buffer_t *buffers, int index, int *size)
        return buffers[index].data;
 }
 
+// Checks if memif buffer is chained
+static int
+govpp_is_buffer_chained(memif_buffer_t *buffers, int index)
+{
+    return buffers[index].flags & MEMIF_BUFFER_FLAG_NEXT;
+}
+
+// Allocate memif buffers and return pointer to next free buffer
+static int
+govpp_memif_buffer_alloc(memif_conn_handle_t conn, uint16_t qid,
+                       memif_buffer_t * bufs, uint16_t offset, memif_buffer_t ** nextFreeBuf,
+                       uint16_t count, uint16_t * count_out, uint16_t size)
+{
+    memif_buffer_t * offsetBufs = (bufs + offset);
+    int err = memif_buffer_alloc(conn, qid, offsetBufs, count, count_out, size);
+    *count_out += offset;
+    *nextFreeBuf = offsetBufs;
+    return err;
+}
+
 */
 import "C"
 
@@ -345,6 +373,7 @@ type Memif struct {
 
        // Rx/Tx queues
        ringSize    int              // number of items in each ring
+       bufferSize  int              // max buffer size
        stopQPollFd int              // event file descriptor used to stop pollRxQueue-s
        wg          sync.WaitGroup   // wait group for all pollRxQueue-s
        rxQueueBufs []CPacketBuffers // an array of C-libmemif packet buffers for each RX queue
@@ -388,6 +417,7 @@ type MemifQueueDetails struct {
 type CPacketBuffers struct {
        buffers *C.memif_buffer_t
        count   int
+       rxChainBuf []RawPacketData
 }
 
 // Context is a global Go-libmemif runtime context.
@@ -400,6 +430,11 @@ type Context struct {
        wg sync.WaitGroup /* wait-group for pollEvents() */
 }
 
+type txPacketBuffer struct {
+       packets []RawPacketData
+       size    int
+}
+
 var (
        // logger used by the adapter.
        log *logger.Logger
@@ -517,12 +552,18 @@ func CreateInterface(config *MemifConfig, callbacks *MemifCallbacks) (memif *Mem
                log2RingSize = 10
        }
 
+       bufferSize := config.BufferSize
+       if bufferSize <= 0 {
+               bufferSize = 2048
+       }
+
        // Create memif-wrapper for Go-libmemif.
        memif = &Memif{
-               MemifMeta: config.MemifMeta,
-               callbacks: &MemifCallbacks{},
-               ifIndex:   context.nextMemifIndex,
-               ringSize:  1 << log2RingSize,
+               MemifMeta:  config.MemifMeta,
+               callbacks:  &MemifCallbacks{},
+               ifIndex:    context.nextMemifIndex,
+               ringSize:   1 << log2RingSize,
+               bufferSize: int(bufferSize),
        }
 
        // Initialize memif callbacks.
@@ -717,10 +758,6 @@ func (memif *Memif) GetDetails() (details *MemifDetails, err error) {
 // Multiple TxBurst-s can run concurrently provided that each targets a different
 // TX queue.
 func (memif *Memif) TxBurst(queueID uint8, packets []RawPacketData) (count uint16, err error) {
-       var sentCount C.uint16_t
-       var allocated C.uint16_t
-       var bufSize int
-
        if len(packets) == 0 {
                return 0, nil
        }
@@ -729,16 +766,55 @@ func (memif *Memif) TxBurst(queueID uint8, packets []RawPacketData) (count uint1
                return 0, ErrQueueID
        }
 
-       // The largest packet in the set determines the packet buffer size.
+       var bufCount int
+       var buffers []*txPacketBuffer
+       cQueueID := C.uint16_t(queueID)
+
        for _, packet := range packets {
-               if len(packet) > int(bufSize) {
-                       bufSize = len(packet)
+               packetLen := len(packet)
+               log.Debugf("%v - preparing packet with len %v", cQueueID, packetLen)
+
+               if packetLen > memif.bufferSize {
+                       // Create jumbo buffer
+                       buffer := &txPacketBuffer{
+                               size:    packetLen,
+                               packets: []RawPacketData{packet},
+                       }
+
+                       buffers = append(buffers, buffer)
+
+                       // Increment bufCount by number of splits in this jumbo
+                       bufCount += (buffer.size + memif.bufferSize - 1) / memif.bufferSize
+               } else {
+                       buffersLen := len(buffers)
+
+                       // This is very first buffer so there is no data to append to, prepare empty one
+                       if buffersLen == 0 {
+                               buffers = []*txPacketBuffer{{}}
+                               buffersLen = 1
+                       }
+
+                       lastBuffer := buffers[buffersLen-1]
+
+                       // Last buffer is jumbo buffer, create new buffer
+                       if lastBuffer.size > memif.bufferSize {
+                               lastBuffer = &txPacketBuffer{}
+                               buffers = append(buffers, lastBuffer)
+                       }
+
+                       // Determine buffer size by max packet size in buffer
+                       if packetLen > lastBuffer.size {
+                               lastBuffer.size = packetLen
+                       }
+
+                       lastBuffer.packets = append(lastBuffer.packets, packet)
+                       bufCount += 1
                }
        }
 
        // Reallocate Tx buffers if needed to fit the input packets.
-       pb := memif.txQueueBufs[queueID]
-       bufCount := len(packets)
+       log.Debugf("%v - total buffer to allocate count %v", cQueueID, bufCount)
+       pb := &memif.txQueueBufs[queueID]
        if pb.count < bufCount {
                newBuffers := C.realloc(unsafe.Pointer(pb.buffers), C.size_t(bufCount*int(C.sizeof_memif_buffer_t)))
                if newBuffers == nil {
@@ -751,32 +827,79 @@ func (memif *Memif) TxBurst(queueID uint8, packets []RawPacketData) (count uint1
        }
 
        // Allocate ring slots.
-       cQueueID := C.uint16_t(queueID)
-       errCode := C.memif_buffer_alloc(memif.cHandle, cQueueID, pb.buffers, C.uint16_t(bufCount),
-               &allocated, C.uint32_t(bufSize))
-       err = getMemifError(int(errCode))
-       if err == ErrNoBufRing {
-               // Not enough ring slots, <count> will be less than bufCount.
-               err = nil
-       }
-       if err != nil {
-               return 0, err
-       }
+       var allocated C.uint16_t
+       var subCount C.uint16_t
+       for _, buffer := range buffers {
+               packetCount := C.uint16_t(len(buffer.packets))
+               isJumbo := buffer.size > memif.bufferSize
+
+               log.Debugf("%v - trying to send max buff size %v, packets len %v, buffer len %v, jumbo %v",
+                       cQueueID, buffer.size, len(buffer.packets), packetCount, isJumbo)
+
+               var nextFreeBuff *C.memif_buffer_t
+               startOffset := allocated
+               errCode := C.govpp_memif_buffer_alloc(memif.cHandle, cQueueID, pb.buffers, startOffset, &nextFreeBuff,
+                       packetCount, &allocated, C.uint16_t(buffer.size))
+
+               err = getMemifError(int(errCode))
+               endEarly := err == ErrNoBufRing
+               if endEarly {
+                       // Not enough ring slots, <count> will be less than packetCount.
+                       err = nil
+               }
+               if err != nil {
+                       return 0, err
+               }
+
+               // Copy packet data into the buffers.
+               nowAllocated := allocated - startOffset
+               toFill := nowAllocated
+               if !isJumbo {
+                       // If this is not jumbo frame, only 1 packet needs to be copied each iteration
+                       toFill = 1
+               }
+
+               // Iterate over all packets and try to fill them into allocated buffers
+               // If packet is jumbo frame, continue filling to allocated buffers until no buffer is left
+               for i, packet := range buffer.packets {
+                       if i >= int(nowAllocated) {
+                               // There was less allocated buffers than actual packet count so exit early
+                               break
+                       }
+
+                       packetData := unsafe.Pointer(&packet[0])
+                       C.govpp_copy_packet_data(nextFreeBuff, toFill, C.int(i), packetData, C.uint16_t(len(packet)))
+               }
+
+               if isJumbo && nowAllocated > 0 {
+                       // If we successfully allocated required amount of buffers for entire jumbo to be sent
+                       // simply sub entire amount of jumbo frame packets and leave only 1 so sender will think
+                       // it only sent 1 packet so it does not need to know anything about jumbo frames
+                       subCount += nowAllocated - 1
+               }
 
-       // Copy packet data into the buffers.
-       for i := 0; i < int(allocated); i++ {
-               packetData := unsafe.Pointer(&packets[i][0])
-               C.govpp_copy_packet_data(pb.buffers, C.int(i), packetData, C.uint16_t(len(packets[i])))
+               // If we do not have enough buffers left to allocate, simply end here to avoid packet loss and try
+               // to handle it next burst
+               if endEarly {
+                       break
+               }
        }
 
-       errCode = C.memif_tx_burst(memif.cHandle, cQueueID, pb.buffers, allocated, &sentCount)
+       var sentCount C.uint16_t
+       errCode := C.memif_tx_burst(memif.cHandle, cQueueID, pb.buffers, allocated, &sentCount)
        err = getMemifError(int(errCode))
        if err != nil {
                return 0, err
        }
-       count = uint16(sentCount)
 
-       return count, nil
+       // Prevent negative values
+       realSent := uint16(sentCount) - uint16(subCount)
+       if subCount > sentCount {
+               sentCount = 0
+       }
+
+       log.Debugf("%v - sent %v total allocated buffs %v", cQueueID, sentCount, allocated)
+       return realSent, nil
 }
 
 // RxBurst is used to receive multiple packets in one call from a selected queue.
@@ -799,7 +922,7 @@ func (memif *Memif) RxBurst(queueID uint8, count uint16) (packets []RawPacketDat
        }
 
        // Reallocate Rx buffers if needed to fit the output packets.
-       pb := memif.rxQueueBufs[queueID]
+       pb := &memif.rxQueueBufs[queueID]
        bufCount := int(count)
        if pb.count < bufCount {
                newBuffers := C.realloc(unsafe.Pointer(pb.buffers), C.size_t(bufCount*int(C.sizeof_memif_buffer_t)))
@@ -824,11 +947,31 @@ func (memif *Memif) RxBurst(queueID uint8, count uint16) (packets []RawPacketDat
                return packets, err
        }
 
+       chained := len(pb.rxChainBuf) > 0
+       if chained {
+               // We had stored data from previous burst because last buffer in previous burst was chained
+               // so we need to continue appending to this data
+               packets = pb.rxChainBuf
+               pb.rxChainBuf = nil
+       }
+
        // Copy packet data into the instances of RawPacketData.
        for i := 0; i < int(recvCount); i++ {
                var packetSize C.int
                packetData := C.govpp_get_packet_data(pb.buffers, C.int(i), &packetSize)
-               packets = append(packets, C.GoBytes(packetData, packetSize))
+               packetBytes := C.GoBytes(packetData, packetSize)
+
+               if chained {
+                       // We have chained buffers, so start merging packet data with last read packet
+                       prevPacket := packets[len(packets)-1]
+                       packets[len(packets)-1] = append(prevPacket, packetBytes...)
+               } else {
+                       packets = append(packets, packetBytes)
+               }
+
+               // Mark last buffer as chained based on property on current buffer so next buffers
+               // will try to append data to this one in case we got jumbo frame
+               chained = C.govpp_is_buffer_chained(pb.buffers, C.int(i)) > 0
        }
 
        if recvCount > 0 {
@@ -840,6 +983,15 @@ func (memif *Memif) RxBurst(queueID uint8, count uint16) (packets []RawPacketDat
                packets = nil
        }
 
+       if chained {
+               // We did not had enough space to process all chained buffers to the end so simply tell
+               // reader that it should not process any packets here and save them for next burst
+               // to finish reading the buffer chain
+               pb.rxChainBuf = packets
+               packets = nil
+               err = ErrNoBuf
+       }
+
        return packets, err
 }
 
diff --git a/extras/libmemif/examples/jumbo-frames/jumbo-frames.go b/extras/libmemif/examples/jumbo-frames/jumbo-frames.go
new file mode 100644 (file)
index 0000000..1bc943f
--- /dev/null
@@ -0,0 +1,176 @@
+// jumbo-frames is simple example how to send larger and larger jumbo packets with libmemif adapter. This is simple copy
+// of raw-data but with sending larger packets, so for more information read its code and docs.
+package main
+
+import (
+       "fmt"
+       "os"
+       "os/signal"
+       "sync"
+       "time"
+
+       "git.fd.io/govpp.git/extras/libmemif"
+)
+
+const (
+       Socket             = "/tmp/jumbo-frames-example"
+       Secret             = "secret"
+       ConnectionID       = 1
+       NumQueues    uint8 = 3
+)
+
+var wg sync.WaitGroup
+var stopCh chan struct{}
+
+func OnConnect(memif *libmemif.Memif) (err error) {
+       details, err := memif.GetDetails()
+       if err != nil {
+               fmt.Printf("libmemif.GetDetails() error: %v\n", err)
+       }
+       fmt.Printf("memif %s has been connected: %+v\n", memif.IfName, details)
+
+       stopCh = make(chan struct{})
+       var i uint8
+       for i = 0; i < uint8(len(details.RxQueues)); i++ {
+               wg.Add(1)
+               go ReadAndPrintPackets(memif, i)
+       }
+       for i = 0; i < uint8(len(details.TxQueues)); i++ {
+               wg.Add(1)
+               go SendPackets(memif, i)
+       }
+       return nil
+}
+
+func OnDisconnect(memif *libmemif.Memif) (err error) {
+       fmt.Printf("memif %s has been disconnected\n", memif.IfName)
+       close(stopCh)
+       wg.Wait()
+       return nil
+}
+
+func ReadAndPrintPackets(memif *libmemif.Memif, queueID uint8) {
+       defer wg.Done()
+
+       interruptCh, err := memif.GetQueueInterruptChan(queueID)
+       if err != nil {
+               switch err {
+               case libmemif.ErrQueueID:
+                       fmt.Printf("libmemif.Memif.GetQueueInterruptChan() complains about invalid queue id!?")
+               default:
+                       fmt.Printf("libmemif.Memif.GetQueueInterruptChan() error: %v\n", err)
+               }
+               return
+       }
+
+       counter := 0
+       for {
+               select {
+               case <-interruptCh:
+                       counter++
+                       for {
+                               packets, err := memif.RxBurst(queueID, 10)
+                               if err != nil {
+                                       fmt.Printf("libmemif.Memif.RxBurst() error: %v\n", err)
+                               } else {
+                                       if len(packets) == 0 {
+                                               break
+                                       }
+                                       for _, packet := range packets {
+                                               fmt.Printf("Received packet queue=%d: %v in burst %d\n", queueID, len(packet), counter)
+                                       }
+                               }
+                       }
+               case <-stopCh:
+                       return
+               }
+       }
+}
+
+func SendPackets(memif *libmemif.Memif, queueID uint8) {
+       defer wg.Done()
+
+       counter := 0
+       for {
+               select {
+               case <-time.After(3 * time.Second):
+                       counter++
+                       packetMul :=  counter % 100 + 1 // Limit max iterations to 100 to not go out of bounds
+                       packets := []libmemif.RawPacketData{
+                               make([]byte, 128*packetMul),
+                               make([]byte, 256*packetMul),
+                               make([]byte, 512*packetMul),
+                       }
+                       sent := 0
+                       for {
+                               count, err := memif.TxBurst(queueID, packets[sent:])
+                               if err != nil {
+                                       fmt.Printf("libmemif.Memif.TxBurst() error: %v\n", err)
+                                       break
+                               } else {
+                                       fmt.Printf("libmemif.Memif.TxBurst() has sent %d packets in burst %v.\n", count, counter)
+                                       sent += int(count)
+                                       if sent == len(packets) {
+                                               break
+                                       }
+                               }
+                       }
+               case <-stopCh:
+                       return
+               }
+       }
+}
+
+func main() {
+       var isMaster = true
+       var appSuffix string
+       if len(os.Args) > 1 && (os.Args[1] == "--slave" || os.Args[1] == "-slave") {
+               isMaster = false
+               appSuffix = "-slave"
+       }
+
+       appName := "jumbo-frames" + appSuffix
+       fmt.Println("Initializing libmemif as ", appName)
+       err := libmemif.Init(appName)
+       if err != nil {
+               fmt.Printf("libmemif.Init() error: %v\n", err)
+               return
+       }
+       defer libmemif.Cleanup()
+
+       memifCallbacks := &libmemif.MemifCallbacks{
+               OnConnect:    OnConnect,
+               OnDisconnect: OnDisconnect,
+       }
+
+       memifConfig := &libmemif.MemifConfig{
+               MemifMeta: libmemif.MemifMeta{
+                       IfName:         "memif1",
+                       ConnID:         ConnectionID,
+                       SocketFilename: Socket,
+                       Secret:         Secret,
+                       IsMaster:       isMaster,
+                       Mode:           libmemif.IfModeEthernet,
+               },
+               MemifShmSpecs: libmemif.MemifShmSpecs{
+                       NumRxQueues:  NumQueues,
+                       NumTxQueues:  NumQueues,
+                       BufferSize:   2048,
+                       Log2RingSize: 10,
+               },
+       }
+
+       fmt.Printf("Callbacks: %+v\n", memifCallbacks)
+       fmt.Printf("Config: %+v\n", memifConfig)
+
+       memif, err := libmemif.CreateInterface(memifConfig, memifCallbacks)
+       if err != nil {
+               fmt.Printf("libmemif.CreateInterface() error: %v\n", err)
+               return
+       }
+       defer memif.Close()
+
+       sigChan := make(chan os.Signal, 1)
+       signal.Notify(sigChan, os.Interrupt)
+       <-sigChan
+}