X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=extras%2Flibmemif%2Fadapter.go;h=eb7e209a7468c6c7eea91a653c6febebdfffcf4d;hb=91800ed117b781ede18cd45b84b80408ec31daf5;hp=6058c946012945d5c210a7dff900afa3fbc043e3;hpb=ac6019137d5b10d02c7acac4f63fa22091b9933d;p=govpp.git diff --git a/extras/libmemif/adapter.go b/extras/libmemif/adapter.go index 6058c94..eb7e209 100644 --- a/extras/libmemif/adapter.go +++ b/extras/libmemif/adapter.go @@ -34,7 +34,7 @@ import ( #include #include #include -#include +#include // <-- VPP must be installed! // Feature tests. #ifndef MEMIF_HAVE_CANCEL_POLL_EVENT @@ -50,16 +50,15 @@ memif_cancel_poll_event () // are much easier to work with in cgo. typedef struct { - char *socket_filename; + memif_socket_handle_t socket; char *secret; uint8_t num_s2m_rings; uint8_t num_m2s_rings; uint16_t buffer_size; - memif_log2_ring_size_t log2_ring_size; + uint8_t log2_ring_size; uint8_t is_master; - memif_interface_id_t interface_id; + uint32_t interface_id; char *interface_name; - char *instance_name; memif_interface_mode_t mode; } govpp_memif_conn_args_t; @@ -76,6 +75,8 @@ typedef struct uint8_t role; uint8_t mode; char *socket_filename; + uint8_t regions_num; + memif_region_details_t *regions; uint8_t rx_queues_num; uint8_t tx_queues_num; memif_queue_details_t *rx_queues; @@ -107,7 +108,7 @@ govpp_memif_create (memif_conn_handle_t *conn, govpp_memif_conn_args_t *go_args, { memif_conn_args_t args; memset (&args, 0, sizeof (args)); - args.socket_filename = (char *)go_args->socket_filename; + args.socket = (char *)go_args->socket; if (go_args->secret != NULL) { strncpy ((char *)args.secret, go_args->secret, @@ -124,11 +125,6 @@ govpp_memif_create (memif_conn_handle_t *conn, govpp_memif_conn_args_t *go_args, strncpy ((char *)args.interface_name, go_args->interface_name, sizeof(args.interface_name) - 1); } - if (go_args->instance_name != NULL) - { - strncpy ((char *)args.instance_name, go_args->instance_name, - sizeof (args.instance_name) - 1); - } args.mode = go_args->mode; return memif_create(conn, &args, govpp_on_connect_callback, @@ -136,6 +132,12 @@ govpp_memif_create (memif_conn_handle_t *conn, govpp_memif_conn_args_t *go_args, private_ctx); } +static int +govpp_memif_create_socket (memif_socket_handle_t *sock, char *filename) +{ + return memif_create_socket(sock, filename, NULL); +} + // govpp_memif_get_details keeps reallocating buffer until it is large enough. // The buffer is returned to be deallocated when it is no longer needed. static int @@ -173,6 +175,8 @@ govpp_memif_get_details (memif_conn_handle_t conn, govpp_memif_details_t *govpp_ govpp_md->role = md.role; govpp_md->mode = md.mode; govpp_md->socket_filename = (char *)md.socket_filename; + govpp_md->regions_num = md.regions_num; + govpp_md->regions = md.regions; govpp_md->rx_queues_num = md.rx_queues_num; govpp_md->tx_queues_num = md.tx_queues_num; govpp_md->rx_queues = md.rx_queues; @@ -200,12 +204,20 @@ govpp_get_tx_queue_details (govpp_memif_details_t *md, int index) return md->tx_queues[index]; } -// Copy packet data into the selected buffer. +// Copy packet data into the selected buffer with splitting when necessary static void -govpp_copy_packet_data(memif_buffer_t *buffers, int index, void *data, uint32_t size) +govpp_copy_packet_data(memif_buffer_t *buffers, uint16_t allocated, int bufIndex, void *packetData, uint16_t packetSize) { - buffers[index].data_len = (size > buffers[index].buffer_len ? buffers[index].buffer_len : size); - memcpy(buffers[index].data, data, (size_t)buffers[index].data_len); + int dataOffset = 0; + + do { + buffers[bufIndex].len = (packetSize > buffers[bufIndex].len ? buffers[bufIndex].len : packetSize); + void * curData = (packetData + dataOffset); + memcpy(buffers[bufIndex].data, curData, (size_t)buffers[bufIndex].len); + dataOffset += buffers[bufIndex].len; + bufIndex += 1; + packetSize -= buffers[bufIndex].len; + } while(packetSize > 0 && bufIndex < allocated && buffers[bufIndex].flags & MEMIF_BUFFER_FLAG_NEXT > 0); } // Get packet data from the selected buffer. @@ -213,10 +225,30 @@ govpp_copy_packet_data(memif_buffer_t *buffers, int index, void *data, uint32_t static void * govpp_get_packet_data(memif_buffer_t *buffers, int index, int *size) { - *size = (int)buffers[index].data_len; + *size = (int)buffers[index].len; return buffers[index].data; } +// Checks if memif buffer is chained +static int +govpp_is_buffer_chained(memif_buffer_t *buffers, int index) +{ + return buffers[index].flags & MEMIF_BUFFER_FLAG_NEXT; +} + +// Allocate memif buffers and return pointer to next free buffer +static int +govpp_memif_buffer_alloc(memif_conn_handle_t conn, uint16_t qid, + memif_buffer_t * bufs, uint16_t offset, memif_buffer_t ** nextFreeBuf, + uint16_t count, uint16_t * count_out, uint16_t size) +{ + memif_buffer_t * offsetBufs = (bufs + offset); + int err = memif_buffer_alloc(conn, qid, offsetBufs, count, count_out, size); + *count_out += offset; + *nextFreeBuf = offsetBufs; + return err; +} + */ import "C" @@ -339,8 +371,9 @@ type Memif struct { MemifMeta // Per-library references - ifIndex int // index used in the Go-libmemif context (Context.memifs) - cHandle C.memif_conn_handle_t // handle used in C-libmemif + ifIndex int // index used in the Go-libmemif context (Context.memifs) + cHandle C.memif_conn_handle_t // connection handle used in C-libmemif + sHandle C.memif_socket_handle_t // socket handle used in C-libmemif // Callbacks callbacks *MemifCallbacks @@ -350,6 +383,8 @@ type Memif struct { queueIntCh []chan struct{} // per RX queue interrupt channel // Rx/Tx queues + ringSize int // number of items in each ring + bufferSize int // max buffer size stopQPollFd int // event file descriptor used to stop pollRxQueue-s wg sync.WaitGroup // wait group for all pollRxQueue-s rxQueueBufs []CPacketBuffers // an array of C-libmemif packet buffers for each RX queue @@ -391,8 +426,9 @@ type MemifQueueDetails struct { // CPacketBuffers stores an array of memif buffers for use with TxBurst or RxBurst. type CPacketBuffers struct { - buffers *C.memif_buffer_t - count int + buffers *C.memif_buffer_t + count int + rxChainBuf []RawPacketData } // Context is a global Go-libmemif runtime context. @@ -405,6 +441,11 @@ type Context struct { wg sync.WaitGroup /* wait-group for pollEvents() */ } +type txPacketBuffer struct { + packets []RawPacketData + size int +} + var ( // logger used by the adapter. log *logger.Logger @@ -444,11 +485,11 @@ func Init(appName string) error { // Initialize C-libmemif. var errCode int if appName == "" { - errCode = int(C.memif_init(nil, nil)) + errCode = int(C.memif_init(nil, nil, nil, nil, nil)) } else { appName := C.CString(appName) defer C.free(unsafe.Pointer(appName)) - errCode = int(C.memif_init(nil, appName)) + errCode = int(C.memif_init(nil, appName, nil, nil, nil)) } err := getMemifError(errCode) if err != nil { @@ -517,11 +558,23 @@ func CreateInterface(config *MemifConfig, callbacks *MemifCallbacks) (memif *Mem log.WithField("ifName", config.IfName).Debug("Creating a new memif interface") + log2RingSize := config.Log2RingSize + if log2RingSize == 0 { + log2RingSize = 10 + } + + bufferSize := config.BufferSize + if bufferSize <= 0 { + bufferSize = 2048 + } + // Create memif-wrapper for Go-libmemif. memif = &Memif{ - MemifMeta: config.MemifMeta, - callbacks: &MemifCallbacks{}, - ifIndex: context.nextMemifIndex, + MemifMeta: config.MemifMeta, + callbacks: &MemifCallbacks{}, + ifIndex: context.nextMemifIndex, + ringSize: 1 << log2RingSize, + bufferSize: int(bufferSize), } // Initialize memif callbacks. @@ -543,21 +596,20 @@ func CreateInterface(config *MemifConfig, callbacks *MemifCallbacks) (memif *Mem args := &C.govpp_memif_conn_args_t{} // - socket file name if config.SocketFilename != "" { - args.socket_filename = C.CString(config.SocketFilename) - defer C.free(unsafe.Pointer(args.socket_filename)) + log.WithField("name", config.SocketFilename).Debug("A new memif socket was created") + errCode := C.govpp_memif_create_socket(&memif.sHandle, C.CString(config.SocketFilename)) + if getMemifError(int(errCode)) != nil { + return nil, err + } } + args.socket = memif.sHandle // - interface ID - args.interface_id = C.memif_interface_id_t(config.ConnID) + args.interface_id = C.uint32_t(config.ConnID) // - interface name if config.IfName != "" { args.interface_name = C.CString(config.IfName) defer C.free(unsafe.Pointer(args.interface_name)) } - // - instance name - if config.InstanceName != "" { - args.instance_name = C.CString(config.InstanceName) - defer C.free(unsafe.Pointer(args.instance_name)) - } // - mode switch config.Mode { case IfModeEthernet: @@ -587,12 +639,11 @@ func CreateInterface(config *MemifConfig, callbacks *MemifCallbacks) (memif *Mem // - buffer size args.buffer_size = C.uint16_t(config.BufferSize) // - log_2(ring size) - args.log2_ring_size = C.memif_log2_ring_size_t(config.Log2RingSize) + args.log2_ring_size = C.uint8_t(config.Log2RingSize) // Create memif in C-libmemif. errCode := C.govpp_memif_create(&memif.cHandle, args, unsafe.Pointer(uintptr(memif.ifIndex))) - err = getMemifError(int(errCode)) - if err != nil { + if getMemifError(int(errCode)) != nil { return nil, err } @@ -721,10 +772,6 @@ func (memif *Memif) GetDetails() (details *MemifDetails, err error) { // Multiple TxBurst-s can run concurrently provided that each targets a different // TX queue. func (memif *Memif) TxBurst(queueID uint8, packets []RawPacketData) (count uint16, err error) { - var sentCount C.uint16_t - var allocated C.uint16_t - var bufSize int - if len(packets) == 0 { return 0, nil } @@ -733,16 +780,55 @@ func (memif *Memif) TxBurst(queueID uint8, packets []RawPacketData) (count uint1 return 0, ErrQueueID } - // The largest packet in the set determines the packet buffer size. + var bufCount int + buffers := make([]*txPacketBuffer, 0) + cQueueID := C.uint16_t(queueID) + for _, packet := range packets { - if len(packet) > int(bufSize) { - bufSize = len(packet) + packetLen := len(packet) + log.Debugf("%v - preparing packet with len %v", cQueueID, packetLen) + + if packetLen > memif.bufferSize { + // Create jumbo buffer + buffer := &txPacketBuffer{ + size: packetLen, + packets: []RawPacketData{packet}, + } + + buffers = append(buffers, buffer) + + // Increment bufCount by number of splits in this jumbo + bufCount += (buffer.size + memif.bufferSize - 1) / memif.bufferSize + } else { + buffersLen := len(buffers) + + // This is very first buffer so there is no data to append to, prepare empty one + if buffersLen == 0 { + buffers = []*txPacketBuffer{{}} + buffersLen = 1 + } + + lastBuffer := buffers[buffersLen-1] + + // Last buffer is jumbo buffer, create new buffer + if lastBuffer.size > memif.bufferSize { + lastBuffer = &txPacketBuffer{} + buffers = append(buffers, lastBuffer) + } + + // Determine buffer size by max packet size in buffer + if packetLen > lastBuffer.size { + lastBuffer.size = packetLen + } + + lastBuffer.packets = append(lastBuffer.packets, packet) + bufCount += 1 } } // Reallocate Tx buffers if needed to fit the input packets. - pb := memif.txQueueBufs[queueID] - bufCount := len(packets) + log.Debugf("%v - total buffer to allocate count %v", cQueueID, bufCount) + pb := &memif.txQueueBufs[queueID] if pb.count < bufCount { newBuffers := C.realloc(unsafe.Pointer(pb.buffers), C.size_t(bufCount*int(C.sizeof_memif_buffer_t))) if newBuffers == nil { @@ -755,32 +841,79 @@ func (memif *Memif) TxBurst(queueID uint8, packets []RawPacketData) (count uint1 } // Allocate ring slots. - cQueueID := C.uint16_t(queueID) - errCode := C.memif_buffer_alloc(memif.cHandle, cQueueID, pb.buffers, C.uint16_t(bufCount), - &allocated, C.uint16_t(bufSize)) - err = getMemifError(int(errCode)) - if err == ErrNoBufRing { - // Not enough ring slots, will be less than bufCount. - err = nil - } - if err != nil { - return 0, err - } + var allocated C.uint16_t + var subCount C.uint16_t + for _, buffer := range buffers { + packetCount := C.uint16_t(len(buffer.packets)) + isJumbo := buffer.size > memif.bufferSize + + log.Debugf("%v - trying to send max buff size %v, packets len %v, buffer len %v, jumbo %v", + cQueueID, buffer.size, len(buffer.packets), packetCount, isJumbo) + + var nextFreeBuff *C.memif_buffer_t + startOffset := allocated + errCode := C.govpp_memif_buffer_alloc(memif.cHandle, cQueueID, pb.buffers, startOffset, &nextFreeBuff, + packetCount, &allocated, C.uint16_t(buffer.size)) + + err = getMemifError(int(errCode)) + endEarly := err == ErrNoBufRing + if endEarly { + // Not enough ring slots, will be less than packetCount. + err = nil + } + if err != nil { + return 0, err + } + + // Copy packet data into the buffers. + nowAllocated := allocated - startOffset + toFill := nowAllocated + if !isJumbo { + // If this is not jumbo frame, only 1 packet needs to be copied each iteration + toFill = 1 + } + + // Iterate over all packets and try to fill them into allocated buffers + // If packet is jumbo frame, continue filling to allocated buffers until no buffer is left + for i, packet := range buffer.packets { + if i >= int(nowAllocated) { + // There was less allocated buffers than actual packet count so exit early + break + } + + packetData := unsafe.Pointer(&packet[0]) + C.govpp_copy_packet_data(nextFreeBuff, toFill, C.int(i), packetData, C.uint16_t(len(packet))) + } - // Copy packet data into the buffers. - for i := 0; i < int(allocated); i++ { - packetData := unsafe.Pointer(&packets[i][0]) - C.govpp_copy_packet_data(pb.buffers, C.int(i), packetData, C.uint32_t(len(packets[i]))) + if isJumbo && nowAllocated > 0 { + // If we successfully allocated required amount of buffers for entire jumbo to be sent + // simply sub entire amount of jumbo frame packets and leave only 1 so sender will think + // it only sent 1 packet so it does not need to know anything about jumbo frames + subCount += nowAllocated - 1 + } + + // If we do not have enough buffers left to allocate, simply end here to avoid packet loss and try + // to handle it next burst + if endEarly { + break + } } - errCode = C.memif_tx_burst(memif.cHandle, cQueueID, pb.buffers, allocated, &sentCount) + var sentCount C.uint16_t + errCode := C.memif_tx_burst(memif.cHandle, cQueueID, pb.buffers, allocated, &sentCount) err = getMemifError(int(errCode)) if err != nil { return 0, err } - count = uint16(sentCount) - return count, nil + // Prevent negative values + realSent := uint16(sentCount) - uint16(subCount) + if subCount > sentCount { + sentCount = 0 + } + + log.Debugf("%v - sent %v total allocated buffs %v", cQueueID, sentCount, allocated) + return realSent, nil } // RxBurst is used to receive multiple packets in one call from a selected queue. @@ -793,7 +926,7 @@ func (memif *Memif) TxBurst(queueID uint8, packets []RawPacketData) (count uint1 // Rx queue. func (memif *Memif) RxBurst(queueID uint8, count uint16) (packets []RawPacketData, err error) { var recvCount C.uint16_t - var freed C.uint16_t + packets = make([]RawPacketData, 0) if count == 0 { return packets, nil @@ -804,7 +937,7 @@ func (memif *Memif) RxBurst(queueID uint8, count uint16) (packets []RawPacketDat } // Reallocate Rx buffers if needed to fit the output packets. - pb := memif.rxQueueBufs[queueID] + pb := &memif.rxQueueBufs[queueID] bufCount := int(count) if pb.count < bufCount { newBuffers := C.realloc(unsafe.Pointer(pb.buffers), C.size_t(bufCount*int(C.sizeof_memif_buffer_t))) @@ -829,20 +962,51 @@ func (memif *Memif) RxBurst(queueID uint8, count uint16) (packets []RawPacketDat return packets, err } + chained := len(pb.rxChainBuf) > 0 + if chained { + // We had stored data from previous burst because last buffer in previous burst was chained + // so we need to continue appending to this data + packets = pb.rxChainBuf + pb.rxChainBuf = nil + } + // Copy packet data into the instances of RawPacketData. for i := 0; i < int(recvCount); i++ { var packetSize C.int packetData := C.govpp_get_packet_data(pb.buffers, C.int(i), &packetSize) - packets = append(packets, C.GoBytes(packetData, packetSize)) + packetBytes := C.GoBytes(packetData, packetSize) + + if chained { + // We have chained buffers, so start merging packet data with last read packet + prevPacket := packets[len(packets)-1] + packets[len(packets)-1] = append(prevPacket, packetBytes...) + } else { + packets = append(packets, packetBytes) + } + + // Mark last buffer as chained based on property on current buffer so next buffers + // will try to append data to this one in case we got jumbo frame + chained = C.govpp_is_buffer_chained(pb.buffers, C.int(i)) > 0 } - errCode = C.memif_buffer_free(memif.cHandle, cQueueID, pb.buffers, recvCount, &freed) + if recvCount > 0 { + errCode = C.memif_refill_queue(memif.cHandle, cQueueID, recvCount, 0) + } err = getMemifError(int(errCode)) if err != nil { // Throw away packets to avoid duplicities. packets = nil } + if chained { + // We did not had enough space to process all chained buffers to the end so simply tell + // reader that it should not process any packets here and save them for next burst + // to finish reading the buffer chain + pb.rxChainBuf = packets + packets = nil + err = ErrNoBuf + } + return packets, err } @@ -895,6 +1059,13 @@ func (memif *Memif) initQueues() error { // Initialize Rx/Tx packet buffers. for i = 0; i < len(details.RxQueues); i++ { memif.rxQueueBufs = append(memif.rxQueueBufs, CPacketBuffers{}) + if !memif.IsMaster { + errCode := C.memif_refill_queue(memif.cHandle, C.uint16_t(i), C.uint16_t(memif.ringSize-1), 0) + err = getMemifError(int(errCode)) + if err != nil { + log.Warn(err.Error()) + } + } } for i = 0; i < len(details.TxQueues); i++ { memif.txQueueBufs = append(memif.txQueueBufs, CPacketBuffers{}) @@ -986,6 +1157,11 @@ func pollRxQueue(memif *Memif, queueID uint8) { for { _, err := syscall.EpollWait(epFd, event[:], -1) if err != nil { + errno, _ := err.(syscall.Errno) + //EINTR and EAGAIN should not be considered as a fatal error, try again + if errno == syscall.EINTR || errno == syscall.EAGAIN { + continue + } log.WithField("err", err).Error("epoll_wait() failed") return }