X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=extras%2Flibmemif%2Fadapter.go;h=eb7e209a7468c6c7eea91a653c6febebdfffcf4d;hb=91800ed117b781ede18cd45b84b80408ec31daf5;hp=a74c5cf6da77684b05e846ca2005ffb59fafad41;hpb=7c044e96ae66f2403e4d82005943a0f6d99bb6a7;p=govpp.git

diff --git a/extras/libmemif/adapter.go b/extras/libmemif/adapter.go
index a74c5cf..eb7e209 100644
--- a/extras/libmemif/adapter.go
+++ b/extras/libmemif/adapter.go
@@ -34,7 +34,7 @@ import (
 #include
 #include
 #include
-#include
+#include // <-- VPP must be installed!
 
 // Feature tests.
 #ifndef MEMIF_HAVE_CANCEL_POLL_EVENT
@@ -50,7 +50,7 @@ memif_cancel_poll_event ()
 // are much easier to work with in cgo.
 typedef struct
 {
-	char *socket_filename;
+	memif_socket_handle_t socket;
 	char *secret;
 	uint8_t num_s2m_rings;
 	uint8_t num_m2s_rings;
@@ -75,6 +75,8 @@ typedef struct
 	uint8_t role;
 	uint8_t mode;
 	char *socket_filename;
+	uint8_t regions_num;
+	memif_region_details_t *regions;
 	uint8_t rx_queues_num;
 	uint8_t tx_queues_num;
 	memif_queue_details_t *rx_queues;
@@ -106,7 +108,7 @@ govpp_memif_create (memif_conn_handle_t *conn, govpp_memif_conn_args_t *go_args,
 {
 	memif_conn_args_t args;
 	memset (&args, 0, sizeof (args));
-	args.socket_filename = (char *)go_args->socket_filename;
+	args.socket = (char *)go_args->socket;
 	if (go_args->secret != NULL)
 	{
 		strncpy ((char *)args.secret, go_args->secret,
@@ -130,6 +132,12 @@ govpp_memif_create (memif_conn_handle_t *conn, govpp_memif_conn_args_t *go_args,
 		private_ctx);
 }
 
+static int
+govpp_memif_create_socket (memif_socket_handle_t *sock, char *filename)
+{
+	return memif_create_socket(sock, filename, NULL);
+}
+
 // govpp_memif_get_details keeps reallocating buffer until it is large enough.
 // The buffer is returned to be deallocated when it is no longer needed.
 static int
@@ -167,6 +175,8 @@ govpp_memif_get_details (memif_conn_handle_t conn, govpp_memif_details_t *govpp_
 	govpp_md->role = md.role;
 	govpp_md->mode = md.mode;
 	govpp_md->socket_filename = (char *)md.socket_filename;
+	govpp_md->regions_num = md.regions_num;
+	govpp_md->regions = md.regions;
 	govpp_md->rx_queues_num = md.rx_queues_num;
 	govpp_md->tx_queues_num = md.tx_queues_num;
 	govpp_md->rx_queues = md.rx_queues;
@@ -194,12 +204,20 @@ govpp_get_tx_queue_details (govpp_memif_details_t *md, int index)
 	return md->tx_queues[index];
 }
 
-// Copy packet data into the selected buffer.
+// Copy packet data into the selected buffers, splitting it across chained buffers when necessary.
 static void
-govpp_copy_packet_data(memif_buffer_t *buffers, int index, void *data, uint16_t size)
+govpp_copy_packet_data(memif_buffer_t *buffers, uint16_t allocated, int bufIndex, void *packetData, uint16_t packetSize)
 {
-	buffers[index].len = (size > buffers[index].len ? buffers[index].len : size);
-	memcpy(buffers[index].data, data, (size_t)buffers[index].len);
+	int dataOffset = 0;
+
+	do {
+		buffers[bufIndex].len = (packetSize > buffers[bufIndex].len ? buffers[bufIndex].len : packetSize);
+		void * curData = (packetData + dataOffset);
+		memcpy(buffers[bufIndex].data, curData, (size_t)buffers[bufIndex].len);
+		dataOffset += buffers[bufIndex].len;
+		packetSize -= buffers[bufIndex].len;
+		bufIndex += 1;
+	} while(packetSize > 0 && bufIndex < allocated && (buffers[bufIndex - 1].flags & MEMIF_BUFFER_FLAG_NEXT) > 0);
 }
 
 // Get packet data from the selected buffer.
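The reworked govpp_copy_packet_data above no longer assumes that a packet fits into a single memif buffer: it walks the chain of allocated buffers and copies at most one buffer's worth of data per step, advancing an offset into the packet. Below is a minimal standalone Go sketch of the same chunking rule; splitPacket and the sizes used are illustrative only, not part of the adapter.

package main

import "fmt"

// splitPacket cuts one packet into pieces no larger than bufSize, one piece per
// chained memif buffer, mirroring the per-buffer copy loop in the C helper above.
func splitPacket(packet []byte, bufSize int) [][]byte {
	var chunks [][]byte
	for offset := 0; offset < len(packet); offset += bufSize {
		end := offset + bufSize
		if end > len(packet) {
			end = len(packet)
		}
		chunks = append(chunks, packet[offset:end])
	}
	return chunks
}

func main() {
	jumbo := make([]byte, 5000) // larger than the default 2048-byte buffer
	for i, c := range splitPacket(jumbo, 2048) {
		fmt.Printf("chunk %d: %d bytes\n", i, len(c))
	}
	// chunk 0: 2048 bytes, chunk 1: 2048 bytes, chunk 2: 904 bytes
}

TxBurst, further down in this diff, relies on exactly this arithmetic when it reserves ceil(packetLen/bufferSize) ring slots for a jumbo packet.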
@@ -211,6 +229,26 @@ govpp_get_packet_data(memif_buffer_t *buffers, int index, int *size)
 	return buffers[index].data;
 }
 
+// Checks if memif buffer is chained
+static int
+govpp_is_buffer_chained(memif_buffer_t *buffers, int index)
+{
+	return buffers[index].flags & MEMIF_BUFFER_FLAG_NEXT;
+}
+
+// Allocate memif buffers and return pointer to next free buffer
+static int
+govpp_memif_buffer_alloc(memif_conn_handle_t conn, uint16_t qid,
+		memif_buffer_t * bufs, uint16_t offset, memif_buffer_t ** nextFreeBuf,
+		uint16_t count, uint16_t * count_out, uint16_t size)
+{
+	memif_buffer_t * offsetBufs = (bufs + offset);
+	int err = memif_buffer_alloc(conn, qid, offsetBufs, count, count_out, size);
+	*count_out += offset;
+	*nextFreeBuf = offsetBufs;
+	return err;
+}
+
 */
 import "C"
 
@@ -333,8 +371,9 @@ type Memif struct {
 	MemifMeta
 
 	// Per-library references
-	ifIndex int                   // index used in the Go-libmemif context (Context.memifs)
-	cHandle C.memif_conn_handle_t // handle used in C-libmemif
+	ifIndex int                     // index used in the Go-libmemif context (Context.memifs)
+	cHandle C.memif_conn_handle_t   // connection handle used in C-libmemif
+	sHandle C.memif_socket_handle_t // socket handle used in C-libmemif
 
 	// Callbacks
 	callbacks *MemifCallbacks
@@ -345,6 +384,7 @@ type Memif struct {
 
 	// Rx/Tx queues
 	ringSize    int              // number of items in each ring
+	bufferSize  int              // max buffer size
 	stopQPollFd int              // event file descriptor used to stop pollRxQueue-s
 	wg          sync.WaitGroup   // wait group for all pollRxQueue-s
 	rxQueueBufs []CPacketBuffers // an array of C-libmemif packet buffers for each RX queue
@@ -386,8 +426,9 @@ type MemifQueueDetails struct {
 
 // CPacketBuffers stores an array of memif buffers for use with TxBurst or RxBurst.
 type CPacketBuffers struct {
-	buffers *C.memif_buffer_t
-	count   int
+	buffers    *C.memif_buffer_t
+	count      int
+	rxChainBuf []RawPacketData
 }
 
 // Context is a global Go-libmemif runtime context.
@@ -400,6 +441,11 @@ type Context struct {
 	wg          sync.WaitGroup /* wait-group for pollEvents() */
 }
 
+type txPacketBuffer struct {
+	packets []RawPacketData
+	size    int
+}
+
 var (
 	// logger used by the adapter.
 	log *logger.Logger
@@ -439,11 +485,11 @@ func Init(appName string) error {
 	// Initialize C-libmemif.
 	var errCode int
 	if appName == "" {
-		errCode = int(C.memif_init(nil, nil, nil, nil))
+		errCode = int(C.memif_init(nil, nil, nil, nil, nil))
 	} else {
 		appName := C.CString(appName)
 		defer C.free(unsafe.Pointer(appName))
-		errCode = int(C.memif_init(nil, appName, nil, nil))
+		errCode = int(C.memif_init(nil, appName, nil, nil, nil))
 	}
 	err := getMemifError(errCode)
 	if err != nil {
@@ -517,12 +563,18 @@ func CreateInterface(config *MemifConfig, callbacks *MemifCallbacks) (memif *Mem
 		log2RingSize = 10
 	}
 
+	bufferSize := config.BufferSize
+	if bufferSize <= 0 {
+		bufferSize = 2048
+	}
+
	// Create memif-wrapper for Go-libmemif.
 	memif = &Memif{
-		MemifMeta: config.MemifMeta,
-		callbacks: &MemifCallbacks{},
-		ifIndex:   context.nextMemifIndex,
-		ringSize:  1 << log2RingSize,
+		MemifMeta:  config.MemifMeta,
+		callbacks:  &MemifCallbacks{},
+		ifIndex:    context.nextMemifIndex,
+		ringSize:   1 << log2RingSize,
+		bufferSize: int(bufferSize),
 	}
 
 	// Initialize memif callbacks.
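The new bufferSize field defaults to 2048 bytes when the configuration leaves it unset, and it is the per-slot size against which TxBurst and RxBurst decide whether a packet must be split into chained buffers. A rough usage sketch follows; the import path and every identifier that does not appear in this diff (IfName, IsMaster, MemifShmSpecs with its queue fields, Cleanup) are assumptions about the surrounding package, not something introduced by this change.

package main

import (
	"log"

	"git.fd.io/govpp.git/extras/libmemif" // assumed import path
)

func main() {
	// Initialize C-libmemif with an application name (Init is shown in this diff).
	if err := libmemif.Init("jumbo-example"); err != nil {
		log.Fatal(err)
	}
	defer libmemif.Cleanup() // assumed package-level cleanup helper

	config := &libmemif.MemifConfig{
		MemifMeta: libmemif.MemifMeta{
			IfName:         "memif1", // assumed field
			ConnID:         0,
			SocketFilename: "/tmp/memif.sock",
			IsMaster:       true, // assumed field
		},
		// BufferSize is the per-slot size used by TxBurst/RxBurst; packets
		// larger than this are split into chained buffers (jumbo frames).
		MemifShmSpecs: libmemif.MemifShmSpecs{ // assumed struct name
			NumRxQueues: 1, // assumed field
			NumTxQueues: 1, // assumed field
			BufferSize:  2048,
		},
	}

	memif, err := libmemif.CreateInterface(config, &libmemif.MemifCallbacks{})
	if err != nil {
		log.Fatal(err)
	}
	_ = memif // TxBurst/RxBurst can now be called per queue once connected
}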
@@ -544,9 +596,13 @@ func CreateInterface(config *MemifConfig, callbacks *MemifCallbacks) (memif *Mem
 	args := &C.govpp_memif_conn_args_t{}
 	// - socket file name
 	if config.SocketFilename != "" {
-		args.socket_filename = C.CString(config.SocketFilename)
-		defer C.free(unsafe.Pointer(args.socket_filename))
+		log.WithField("name", config.SocketFilename).Debug("Creating a new memif socket")
+		errCode := C.govpp_memif_create_socket(&memif.sHandle, C.CString(config.SocketFilename))
+		if err = getMemifError(int(errCode)); err != nil {
+			return nil, err
+		}
 	}
+	args.socket = memif.sHandle
 	// - interface ID
 	args.interface_id = C.uint32_t(config.ConnID)
 	// - interface name
@@ -587,8 +643,7 @@ func CreateInterface(config *MemifConfig, callbacks *MemifCallbacks) (memif *Mem
 
 	// Create memif in C-libmemif.
 	errCode := C.govpp_memif_create(&memif.cHandle, args, unsafe.Pointer(uintptr(memif.ifIndex)))
-	err = getMemifError(int(errCode))
-	if err != nil {
+	if err = getMemifError(int(errCode)); err != nil {
 		return nil, err
 	}
 
@@ -717,10 +772,6 @@ func (memif *Memif) GetDetails() (details *MemifDetails, err error) {
 // Multiple TxBurst-s can run concurrently provided that each targets a different
 // TX queue.
 func (memif *Memif) TxBurst(queueID uint8, packets []RawPacketData) (count uint16, err error) {
-	var sentCount C.uint16_t
-	var allocated C.uint16_t
-	var bufSize int
-
 	if len(packets) == 0 {
 		return 0, nil
 	}
@@ -729,16 +780,55 @@ func (memif *Memif) TxBurst(queueID uint8, packets []RawPacketData) (count uint1
 		return 0, ErrQueueID
 	}
 
-	// The largest packet in the set determines the packet buffer size.
+	var bufCount int
+	buffers := make([]*txPacketBuffer, 0)
+	cQueueID := C.uint16_t(queueID)
+
 	for _, packet := range packets {
-		if len(packet) > int(bufSize) {
-			bufSize = len(packet)
+		packetLen := len(packet)
+		log.Debugf("%v - preparing packet with len %v", cQueueID, packetLen)
+
+		if packetLen > memif.bufferSize {
+			// Create jumbo buffer
+			buffer := &txPacketBuffer{
+				size:    packetLen,
+				packets: []RawPacketData{packet},
+			}
+
+			buffers = append(buffers, buffer)
+
+			// Increment bufCount by number of splits in this jumbo
+			bufCount += (buffer.size + memif.bufferSize - 1) / memif.bufferSize
+		} else {
+			buffersLen := len(buffers)
+
+			// This is the very first buffer, so there is nothing to append to yet; prepare an empty one
+			if buffersLen == 0 {
+				buffers = []*txPacketBuffer{{}}
+				buffersLen = 1
+			}
+
+			lastBuffer := buffers[buffersLen-1]
+
+			// The last buffer is a jumbo buffer, create a new one
+			if lastBuffer.size > memif.bufferSize {
+				lastBuffer = &txPacketBuffer{}
+				buffers = append(buffers, lastBuffer)
+			}
+
+			// Determine buffer size by max packet size in buffer
+			if packetLen > lastBuffer.size {
+				lastBuffer.size = packetLen
+			}
+
+			lastBuffer.packets = append(lastBuffer.packets, packet)
+			bufCount += 1
 		}
 	}
 
 	// Reallocate Tx buffers if needed to fit the input packets.
-	pb := memif.txQueueBufs[queueID]
-	bufCount := len(packets)
+	log.Debugf("%v - total buffer to allocate count %v", cQueueID, bufCount)
+	pb := &memif.txQueueBufs[queueID]
 	if pb.count < bufCount {
 		newBuffers := C.realloc(unsafe.Pointer(pb.buffers), C.size_t(bufCount*int(C.sizeof_memif_buffer_t)))
 		if newBuffers == nil {
@@ -751,32 +841,79 @@ func (memif *Memif) TxBurst(queueID uint8, packets []RawPacketData) (count uint1
 	}
 
 	// Allocate ring slots.
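At this point TxBurst has grouped the input packets into txPacketBuffer batches: a packet larger than memif.bufferSize becomes a jumbo batch of its own that needs ceil(size/bufferSize) ring slots, while consecutive smaller packets share one batch whose allocation size is the largest packet in it, and bufCount (used for the realloc above and the allocation below) is the total slot count. A standalone sketch of that accounting, with illustrative types that are not the adapter's own:

package main

import "fmt"

type batch struct {
	size    int   // allocation size requested for every packet in the batch
	packets []int // packet lengths only; payloads omitted for brevity
}

// planBatches mirrors the grouping loop above and returns the batches together
// with the total number of ring slots they will occupy.
func planBatches(packetLens []int, bufferSize int) (batches []*batch, slots int) {
	for _, l := range packetLens {
		if l > bufferSize {
			// Jumbo packet: a batch of its own, split across several slots.
			batches = append(batches, &batch{size: l, packets: []int{l}})
			slots += (l + bufferSize - 1) / bufferSize
			continue
		}
		// Regular packet: append to the last non-jumbo batch, or start one.
		if len(batches) == 0 || batches[len(batches)-1].size > bufferSize {
			batches = append(batches, &batch{})
		}
		last := batches[len(batches)-1]
		if l > last.size {
			last.size = l // the batch is allocated with the largest packet's size
		}
		last.packets = append(last.packets, l)
		slots++
	}
	return batches, slots
}

func main() {
	batches, slots := planBatches([]int{64, 128, 5000, 256}, 2048)
	fmt.Println(len(batches), "batches,", slots, "ring slots") // 3 batches, 6 ring slots
}

Keeping the small packets in shared, max-sized batches lets one govpp_memif_buffer_alloc call cover many packets, while a jumbo batch is allocated on its own so its extra chained slots can later be subtracted from the reported sent count.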
-	cQueueID := C.uint16_t(queueID)
-	errCode := C.memif_buffer_alloc(memif.cHandle, cQueueID, pb.buffers, C.uint16_t(bufCount),
-		&allocated, C.uint32_t(bufSize))
-	err = getMemifError(int(errCode))
-	if err == ErrNoBufRing {
-		// Not enough ring slots, will be less than bufCount.
-		err = nil
-	}
-	if err != nil {
-		return 0, err
-	}
+	var allocated C.uint16_t
+	var subCount C.uint16_t
+	for _, buffer := range buffers {
+		packetCount := C.uint16_t(len(buffer.packets))
+		isJumbo := buffer.size > memif.bufferSize
+
+		log.Debugf("%v - trying to send max buff size %v, packets len %v, buffer len %v, jumbo %v",
+			cQueueID, buffer.size, len(buffer.packets), packetCount, isJumbo)
+
+		var nextFreeBuff *C.memif_buffer_t
+		startOffset := allocated
+		errCode := C.govpp_memif_buffer_alloc(memif.cHandle, cQueueID, pb.buffers, startOffset, &nextFreeBuff,
+			packetCount, &allocated, C.uint16_t(buffer.size))
+
+		err = getMemifError(int(errCode))
+		endEarly := err == ErrNoBufRing
+		if endEarly {
+			// Not enough ring slots, will be less than packetCount.
+			err = nil
+		}
+		if err != nil {
+			return 0, err
+		}
+
+		// Copy packet data into the buffers.
+		nowAllocated := allocated - startOffset
+		toFill := nowAllocated
+		if !isJumbo {
+			// If this is not a jumbo frame, only 1 packet needs to be copied per iteration.
+			toFill = 1
+		}
+
+		// Iterate over all packets and try to fill them into the allocated buffers.
+		// If a packet is a jumbo frame, keep filling the allocated buffers until none is left.
+		for i, packet := range buffer.packets {
+			if i >= int(nowAllocated) {
+				// Fewer buffers were allocated than there are packets, so exit early.
+				break
+			}
 
-	// Copy packet data into the buffers.
-	for i := 0; i < int(allocated); i++ {
-		packetData := unsafe.Pointer(&packets[i][0])
-		C.govpp_copy_packet_data(pb.buffers, C.int(i), packetData, C.uint16_t(len(packets[i])))
+			packetData := unsafe.Pointer(&packet[0])
+			C.govpp_copy_packet_data(nextFreeBuff, toFill, C.int(i), packetData, C.uint16_t(len(packet)))
+		}
+
+		if isJumbo && nowAllocated > 0 {
+			// If the whole jumbo frame was allocated, subtract its extra chained buffers
+			// from the sent count so the caller sees the jumbo reported as a single
+			// packet and never needs to know anything about jumbo frames.
+			subCount += nowAllocated - 1
+		}
+
+		// If there were not enough ring slots left to allocate, stop here to avoid
+		// packet loss and handle the remaining packets in the next burst.
+		if endEarly {
+			break
+		}
 	}
 
-	errCode = C.memif_tx_burst(memif.cHandle, cQueueID, pb.buffers, allocated, &sentCount)
+	var sentCount C.uint16_t
+	errCode := C.memif_tx_burst(memif.cHandle, cQueueID, pb.buffers, allocated, &sentCount)
 	err = getMemifError(int(errCode))
 	if err != nil {
 		return 0, err
 	}
 
-	count = uint16(sentCount)
-	return count, nil
+	// Prevent underflow: chained jumbo slots are not reported as separate packets.
+	realSent := uint16(sentCount) - uint16(subCount)
+	if subCount > sentCount {
+		realSent = 0
+	}
+
+	log.Debugf("%v - sent %v total allocated buffs %v", cQueueID, sentCount, allocated)
+	return realSent, nil
 }
 
 // RxBurst is used to receive multiple packets in one call from a selected queue.
@@ -789,6 +926,7 @@ func (memif *Memif) TxBurst(queueID uint8, packets []RawPacketData) (count uint1
 // Rx queue.
 func (memif *Memif) RxBurst(queueID uint8, count uint16) (packets []RawPacketData, err error) {
 	var recvCount C.uint16_t
+	packets = make([]RawPacketData, 0)
 
 	if count == 0 {
 		return packets, nil
@@ -799,7 +937,7 @@ func (memif *Memif) RxBurst(queueID uint8, count uint16) (packets []RawPacketDat
 	}
 
 	// Reallocate Rx buffers if needed to fit the output packets.
-	pb := memif.rxQueueBufs[queueID]
+	pb := &memif.rxQueueBufs[queueID]
 	bufCount := int(count)
 	if pb.count < bufCount {
 		newBuffers := C.realloc(unsafe.Pointer(pb.buffers), C.size_t(bufCount*int(C.sizeof_memif_buffer_t)))
@@ -824,11 +962,31 @@ func (memif *Memif) RxBurst(queueID uint8, count uint16) (packets []RawPacketDat
 		return packets, err
 	}
 
+	chained := len(pb.rxChainBuf) > 0
+	if chained {
+		// Data from the previous burst was stored because its last buffer was chained,
+		// so continue appending to it.
+		packets = pb.rxChainBuf
+		pb.rxChainBuf = nil
+	}
+
 	// Copy packet data into the instances of RawPacketData.
 	for i := 0; i < int(recvCount); i++ {
 		var packetSize C.int
 		packetData := C.govpp_get_packet_data(pb.buffers, C.int(i), &packetSize)
-		packets = append(packets, C.GoBytes(packetData, packetSize))
+		packetBytes := C.GoBytes(packetData, packetSize)
+
+		if chained {
+			// The previous buffer was chained, so merge this data into the last read packet.
+			prevPacket := packets[len(packets)-1]
+			packets[len(packets)-1] = append(prevPacket, packetBytes...)
+		} else {
+			packets = append(packets, packetBytes)
+		}
+
+		// Remember whether this buffer is chained, so that the next buffer's data is
+		// appended to this packet when a jumbo frame spans multiple buffers.
+		chained = C.govpp_is_buffer_chained(pb.buffers, C.int(i)) > 0
 	}
 
 	if recvCount > 0 {
@@ -840,6 +998,15 @@ func (memif *Memif) RxBurst(queueID uint8, count uint16) (packets []RawPacketDat
 		packets = nil
 	}
 
+	if chained {
+		// There was not enough space to read the whole buffer chain, so hold the
+		// packets back, report none to the caller, and finish reading the chain
+		// in the next burst.
+		pb.rxChainBuf = packets
+		packets = nil
+		err = ErrNoBuf
+	}
+
 	return packets, err
 }
 
@@ -990,6 +1157,11 @@ func pollRxQueue(memif *Memif, queueID uint8) {
 	for {
 		_, err := syscall.EpollWait(epFd, event[:], -1)
 		if err != nil {
+			errno, _ := err.(syscall.Errno)
+			// EINTR and EAGAIN are not fatal errors, try again.
+			if errno == syscall.EINTR || errno == syscall.EAGAIN {
+				continue
+			}
 			log.WithField("err", err).Error("epoll_wait() failed")
 			return
 		}
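On the receive side a jumbo frame arrives as a chain of buffers flagged with MEMIF_BUFFER_FLAG_NEXT, and the chain may be cut off by the end of a burst, so RxBurst merges chained buffers into one RawPacketData and parks an unfinished chain in rxChainBuf until the next call (returning ErrNoBuf for that round). The standalone sketch below shows the same merging rule with hypothetical names; the real adapter signals the held-back state through the error instead of a second return value.

package main

import "fmt"

type chunk struct {
	data    []byte
	chained bool // true when the memif buffer carries MEMIF_BUFFER_FLAG_NEXT
}

// reassemble merges chained chunks into whole packets. leftover carries an
// unfinished chain over to the next burst, like Memif.rxChainBuf above.
func reassemble(pending [][]byte, chunks []chunk) (packets, leftover [][]byte) {
	packets = pending
	chained := len(pending) > 0
	for _, c := range chunks {
		if chained {
			// The previous chunk was chained: continue the same packet.
			packets[len(packets)-1] = append(packets[len(packets)-1], c.data...)
		} else {
			packets = append(packets, c.data)
		}
		chained = c.chained
	}
	if chained {
		// The chain did not finish in this burst; hold everything back.
		return nil, packets
	}
	return packets, nil
}

func main() {
	// First burst ends in the middle of a chain.
	packets, leftover := reassemble(nil, []chunk{{[]byte("AAAA"), true}, {[]byte("BBBB"), true}})
	fmt.Println(len(packets), len(leftover)) // 0 1

	// Second burst finishes the chain and carries one more small packet.
	packets, leftover = reassemble(leftover, []chunk{{[]byte("CC"), false}, {[]byte("DD"), false}})
	fmt.Println(len(packets), string(packets[0]), len(leftover)) // 2 AAAABBBBCC 0
}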