1 // Copyright (c) 2017 Cisco and/or its affiliates.
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
7 // http://www.apache.org/licenses/LICENSE-2.0
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
15 // +build !windows,!darwin
26 logger "github.com/sirupsen/logrus"
36 #include <sys/eventfd.h>
37 #include <libmemif.h> // <-- VPP must be installed!
40 #ifndef MEMIF_HAVE_CANCEL_POLL_EVENT
41 // memif_cancel_poll_event that simply returns ErrUnsupported.
43 memif_cancel_poll_event ()
45 return 102; // ErrUnsupported
49 // govpp_memif_conn_args_t replaces fixed sized arrays with C-strings which
50 // are much easier to work with in cgo.
53 memif_socket_handle_t socket;
55 uint8_t num_s2m_rings;
56 uint8_t num_m2s_rings;
58 uint8_t log2_ring_size;
60 uint32_t interface_id;
62 memif_interface_mode_t mode;
63 } govpp_memif_conn_args_t;
65 // govpp_memif_details_t replaces strings represented as (uint8_t *)
66 // with the standard (char *), which is easier to work with in cgo.
72 char *remote_inst_name;
77 char *socket_filename;
79 memif_region_details_t *regions;
80 uint8_t rx_queues_num;
81 uint8_t tx_queues_num;
82 memif_queue_details_t *rx_queues;
83 memif_queue_details_t *tx_queues;
85 } govpp_memif_details_t;
87 extern int go_on_connect_callback(void *privateCtx);
88 extern int go_on_disconnect_callback(void *privateCtx);
90 // Callbacks strip the connection handle away.
93 govpp_on_connect_callback(memif_conn_handle_t conn, void *private_ctx)
95 return go_on_connect_callback(private_ctx);
99 govpp_on_disconnect_callback(memif_conn_handle_t conn, void *private_ctx)
101 return go_on_disconnect_callback(private_ctx);
104 // govpp_memif_create uses govpp_memif_conn_args_t.
106 govpp_memif_create (memif_conn_handle_t *conn, govpp_memif_conn_args_t *go_args,
109 memif_conn_args_t args;
110 memset (&args, 0, sizeof (args));
111 args.socket = (char *)go_args->socket;
112 if (go_args->secret != NULL)
114 strncpy ((char *)args.secret, go_args->secret,
115 sizeof (args.secret) - 1);
117 args.num_s2m_rings = go_args->num_s2m_rings;
118 args.num_m2s_rings = go_args->num_m2s_rings;
119 args.buffer_size = go_args->buffer_size;
120 args.log2_ring_size = go_args->log2_ring_size;
121 args.is_master = go_args->is_master;
122 args.interface_id = go_args->interface_id;
123 if (go_args->interface_name != NULL)
125 strncpy ((char *)args.interface_name, go_args->interface_name,
126 sizeof(args.interface_name) - 1);
128 args.mode = go_args->mode;
130 return memif_create(conn, &args, govpp_on_connect_callback,
131 govpp_on_disconnect_callback, NULL,
136 govpp_memif_create_socket (memif_socket_handle_t *sock, char *filename)
138 return memif_create_socket(sock, filename, NULL);
141 // govpp_memif_get_details keeps reallocating buffer until it is large enough.
142 // The buffer is returned to be deallocated when it is no longer needed.
144 govpp_memif_get_details (memif_conn_handle_t conn, govpp_memif_details_t *govpp_md,
148 size_t buflen = 1 << 7;
149 char *buffer = NULL, *new_buffer = NULL;
150 memif_details_t md = {0};
153 // initial malloc (256 bytes) or realloc
155 new_buffer = realloc(buffer, buflen);
156 if (new_buffer == NULL)
159 return MEMIF_ERR_NOMEM;
162 // try to get details
163 rv = memif_get_details(conn, &md, buffer, buflen);
164 } while (rv == MEMIF_ERR_NOBUF_DET);
169 govpp_md->if_name = (char *)md.if_name;
170 govpp_md->inst_name = (char *)md.inst_name;
171 govpp_md->remote_if_name = (char *)md.remote_if_name;
172 govpp_md->remote_inst_name = (char *)md.remote_inst_name;
173 govpp_md->id = md.id;
174 govpp_md->secret = (char *)md.secret;
175 govpp_md->role = md.role;
176 govpp_md->mode = md.mode;
177 govpp_md->socket_filename = (char *)md.socket_filename;
178 govpp_md->regions_num = md.regions_num;
179 govpp_md->regions = md.regions;
180 govpp_md->rx_queues_num = md.rx_queues_num;
181 govpp_md->tx_queues_num = md.tx_queues_num;
182 govpp_md->rx_queues = md.rx_queues;
183 govpp_md->tx_queues = md.tx_queues;
184 govpp_md->link_up_down = md.link_up_down;
191 // Used to avoid cumbersome tricks that use unsafe.Pointer() + unsafe.Sizeof()
192 // or even cast C-array directly into Go-slice.
193 static memif_queue_details_t
194 govpp_get_rx_queue_details (govpp_memif_details_t *md, int index)
196 return md->rx_queues[index];
199 // Used to avoid cumbersome tricks that use unsafe.Pointer() + unsafe.Sizeof()
200 // or even cast C-array directly into Go-slice.
201 static memif_queue_details_t
202 govpp_get_tx_queue_details (govpp_memif_details_t *md, int index)
204 return md->tx_queues[index];
207 // Copy packet data into the selected buffer with splitting when necessary
209 govpp_copy_packet_data(memif_buffer_t *buffers, uint16_t allocated, int bufIndex, void *packetData, uint16_t packetSize)
214 buffers[bufIndex].len = (packetSize > buffers[bufIndex].len ? buffers[bufIndex].len : packetSize);
215 void * curData = (packetData + dataOffset);
216 memcpy(buffers[bufIndex].data, curData, (size_t)buffers[bufIndex].len);
217 dataOffset += buffers[bufIndex].len;
219 packetSize -= buffers[bufIndex].len;
220 } while(packetSize > 0 && bufIndex < allocated && buffers[bufIndex].flags & MEMIF_BUFFER_FLAG_NEXT > 0);
223 // Get packet data from the selected buffer.
224 // Used to avoid an ugly unsafe.Pointer() + unsafe.Sizeof().
226 govpp_get_packet_data(memif_buffer_t *buffers, int index, int *size)
228 *size = (int)buffers[index].len;
229 return buffers[index].data;
232 // Checks if memif buffer is chained
234 govpp_is_buffer_chained(memif_buffer_t *buffers, int index)
236 return buffers[index].flags & MEMIF_BUFFER_FLAG_NEXT;
239 // Allocate memif buffers and return pointer to next free buffer
241 govpp_memif_buffer_alloc(memif_conn_handle_t conn, uint16_t qid,
242 memif_buffer_t * bufs, uint16_t offset, memif_buffer_t ** nextFreeBuf,
243 uint16_t count, uint16_t * count_out, uint16_t size)
245 memif_buffer_t * offsetBufs = (bufs + offset);
246 int err = memif_buffer_alloc(conn, qid, offsetBufs, count, count_out, size);
247 *count_out += offset;
248 *nextFreeBuf = offsetBufs;
255 // IfMode represents the mode (layer/behaviour) in which the interface operates.
259 // IfModeEthernet tells memif to operate on the L2 layer.
260 IfModeEthernet IfMode = iota
262 // IfModeIP tells memif to operate on the L3 layer.
265 // IfModePuntInject tells memif to behave as Inject/Punt interface.
269 // RxMode is used to switch between polling and interrupt for RX.
273 // RxModeInterrupt tells libmemif to send interrupt signal when data are available.
274 RxModeInterrupt RxMode = iota
276 // RxModePolling means that the user needs to explicitly poll for data on RX
281 // RawPacketData represents raw packet data. libmemif doesn't care what the
282 // actual content is, it only manipulates raw bytes.
283 type RawPacketData []byte
285 // MemifMeta is used to store the basic memif metadata needed for identification
286 // and connection establishment.
287 type MemifMeta struct {
288 // IfName is the interface name. Has to be unique across all created memifs.
289 // Interface name is truncated if needed to have no more than 32 characters.
292 // InstanceName identifies the endpoint. If omitted, the application
293 // name passed to Init() will be used instead.
294 // Instance name is truncated if needed to have no more than 32 characters.
297 // ConnID is a connection ID used to match opposite sides of the memif
301 // SocketFilename is the filename of the AF_UNIX socket through which
302 // the connection is established.
303 // The string is truncated if needed to fit into sockaddr_un.sun_path
304 // (108 characters on Linux).
305 SocketFilename string
307 // Secret must be the same on both sides for the authentication to succeed.
308 // Empty string is allowed.
309 // The secret is truncated if needed to have no more than 24 characters.
312 // IsMaster is set to true if memif operates in the Master mode.
315 // Mode is the mode (layer/behaviour) in which the memif operates.
319 // MemifShmSpecs is used to store the specification of the shared memory segment
320 // used by memif to send/receive packets.
321 type MemifShmSpecs struct {
322 // NumRxQueues is the number of Rx queues.
323 // Default is 1 (used if the value is 0).
326 // NumTxQueues is the number of Tx queues.
327 // Default is 1 (used if the value is 0).
330 // BufferSize is the size of the buffer to hold one packet, or a single
331 // fragment of a jumbo frame. Default is 2048 (used if the value is 0).
334 // Log2RingSize is the number of items in the ring represented through
335 // the logarithm base 2.
336 // Default is 10 (used if the value is 0).
340 // MemifConfig is the memif configuration.
341 // Used as the input argument to CreateInterface().
342 // It is the slave's config that mostly decides the parameters of the connection,
343 // but master may limit some of the quantities if needed (based on the memif
344 // protocol or master's configuration)
345 type MemifConfig struct {
350 // ConnUpdateCallback is a callback type declaration used with callbacks
351 // related to connection status changes.
352 type ConnUpdateCallback func(memif *Memif) (err error)
354 // MemifCallbacks is a container for all callbacks provided by memif.
355 // Any callback can be nil, in which case it will be simply skipped.
356 // Important: Do not call CreateInterface() or Memif.Close() from within a callback
357 // or a deadlock will occur. Instead send signal through a channel to another
358 // go routine which will be able to create/remove memif interface(s).
359 type MemifCallbacks struct {
360 // OnConnect is triggered when a connection for a given memif was established.
361 OnConnect ConnUpdateCallback
363 // OnDisconnect is triggered when a connection for a given memif was lost.
364 OnDisconnect ConnUpdateCallback
367 // Memif represents a single memif interface. It provides methods to send/receive
368 // packets in bursts in either the polling mode or in the interrupt mode with
369 // the help of golang channels.
373 // Per-library references
374 ifIndex int // index used in the Go-libmemif context (Context.memifs)
375 cHandle C.memif_conn_handle_t // connection handle used in C-libmemif
376 sHandle C.memif_socket_handle_t // socket handle used in C-libmemif
379 callbacks *MemifCallbacks
382 intCh chan uint8 // memif-global interrupt channel (value = queue ID)
383 intErrCh chan error // triggered when interrupt error occurs
384 queueIntCh []chan struct{} // per RX queue interrupt channel
387 ringSize int // number of items in each ring
388 bufferSize int // max buffer size
389 stopQPollFd int // event file descriptor used to stop pollRxQueue-s
390 wg sync.WaitGroup // wait group for all pollRxQueue-s
391 rxQueueBufs []CPacketBuffers // an array of C-libmemif packet buffers for each RX queue
392 txQueueBufs []CPacketBuffers // an array of C-libmemif packet buffers for each TX queue
395 // MemifDetails provides detailed runtime information about a memif interface.
396 type MemifDetails struct {
401 // MemifConnDetails provides detailed runtime information about a memif
403 type MemifConnDetails struct {
404 // RemoteIfName is the name of the memif on the opposite side.
406 // RemoteInstanceName is the name of the endpoint on the opposite side.
407 RemoteInstanceName string
408 // HasLink is true if the connection has link (= is established and functional).
410 // RxQueues contains details for each Rx queue.
411 RxQueues []MemifQueueDetails
412 // TxQueues contains details for each Tx queue.
413 TxQueues []MemifQueueDetails
416 // MemifQueueDetails provides detailed runtime information about a memif queue.
417 // Queue = Ring + the associated buffers (one directional).
418 type MemifQueueDetails struct {
419 // QueueID is the ID of the queue.
421 // RingSize is the number of slots in the ring (not logarithmic).
423 // BufferSize is the size of each buffer pointed to from the ring slots.
425 /* Further ring information TO-BE-ADDED when C-libmemif supports them. */
428 // CPacketBuffers stores an array of memif buffers for use with TxBurst or RxBurst.
429 type CPacketBuffers struct {
430 buffers *C.memif_buffer_t
432 rxChainBuf []RawPacketData
435 // Context is a global Go-libmemif runtime context.
436 type Context struct {
439 memifs map[int] /* ifIndex */ *Memif /* slice of all active memif interfaces */
442 wg sync.WaitGroup /* wait-group for pollEvents() */
445 type txPacketBuffer struct {
446 packets []RawPacketData
451 // logger used by the adapter.
454 // Global Go-libmemif context.
455 context = &Context{initialized: false}
458 // init initializes global logger, which logs debug level messages to stdout.
462 log.Level = logger.DebugLevel
465 // SetLogger changes the logger for Go-libmemif to the provided one.
466 // The logger is not used for logging of C-libmemif.
467 func SetLogger(l *logger.Logger) {
471 // Init initializes the libmemif library. Must be called exactly once and before
472 // any libmemif functions. Do not forget to call Cleanup() before exiting
474 // <appName> should be a human-readable string identifying your application.
475 // For example, VPP returns the version information ("show version" from VPP CLI).
476 func Init(appName string) error {
478 defer context.lock.Unlock()
480 if context.initialized {
481 return ErrAlreadyInit
484 log.Debug("Initializing libmemif library")
486 // Initialize C-libmemif.
489 errCode = int(C.memif_init(nil, nil, nil, nil, nil))
491 appName := C.CString(appName)
492 defer C.free(unsafe.Pointer(appName))
493 errCode = int(C.memif_init(nil, appName, nil, nil, nil))
495 err := getMemifError(errCode)
500 // Initialize the map of memory interfaces.
501 context.memifs = make(map[int]*Memif)
503 // Start event polling.
507 context.initialized = true
508 log.Debug("libmemif library was initialized")
512 // Cleanup cleans up all the resources allocated by libmemif.
513 func Cleanup() error {
515 defer context.lock.Unlock()
517 if !context.initialized {
521 log.Debug("Closing libmemif library")
523 // Delete all active interfaces.
524 for _, memif := range context.memifs {
528 // Stop the event loop (if supported by C-libmemif).
529 errCode := C.memif_cancel_poll_event()
530 err := getMemifError(int(errCode))
532 log.Debug("Waiting for pollEvents() to stop...")
534 log.Debug("pollEvents() has stopped...")
536 log.WithField("err", err).Debug("NOT Waiting for pollEvents to stop...")
539 // Run cleanup for C-libmemif.
540 err = getMemifError(int(C.memif_cleanup()))
542 context.initialized = false
543 log.Debug("libmemif library was closed")
548 // CreateInterface creates a new memif interface with the given configuration.
549 // The same callbacks can be used with multiple memifs. The first callback input
550 // argument (*Memif) can be used to tell which memif the callback was triggered for.
551 // The method is thread-safe.
552 func CreateInterface(config *MemifConfig, callbacks *MemifCallbacks) (memif *Memif, err error) {
554 defer context.lock.Unlock()
556 if !context.initialized {
557 return nil, ErrNotInit
560 log.WithField("ifName", config.IfName).Debug("Creating a new memif interface")
562 log2RingSize := config.Log2RingSize
563 if log2RingSize == 0 {
567 bufferSize := config.BufferSize
572 // Create memif-wrapper for Go-libmemif.
574 MemifMeta: config.MemifMeta,
575 callbacks: &MemifCallbacks{},
576 ifIndex: context.nextMemifIndex,
577 ringSize: 1 << log2RingSize,
578 bufferSize: int(bufferSize),
581 // Initialize memif callbacks.
582 if callbacks != nil {
583 memif.callbacks.OnConnect = callbacks.OnConnect
584 memif.callbacks.OnDisconnect = callbacks.OnDisconnect
587 // Initialize memif-global interrupt channel.
588 memif.intCh = make(chan uint8, 1<<6)
589 memif.intErrCh = make(chan error, 1<<6)
591 // Initialize event file descriptor for stopping Rx/Tx queue polling.
592 memif.stopQPollFd = int(C.eventfd(0, C.EFD_NONBLOCK))
593 if memif.stopQPollFd < 0 {
594 return nil, ErrSyscall
597 // Initialize memif input arguments.
598 args := &C.govpp_memif_conn_args_t{}
599 // - socket file name
600 if config.SocketFilename != "" {
601 log.WithField("name", config.SocketFilename).Debug("A new memif socket was created")
602 errCode := C.govpp_memif_create_socket(&memif.sHandle, C.CString(config.SocketFilename))
603 if getMemifError(int(errCode)) != nil {
607 args.socket = memif.sHandle
609 args.interface_id = C.uint32_t(config.ConnID)
611 if config.IfName != "" {
612 args.interface_name = C.CString(config.IfName)
613 defer C.free(unsafe.Pointer(args.interface_name))
618 args.mode = C.MEMIF_INTERFACE_MODE_ETHERNET
620 args.mode = C.MEMIF_INTERFACE_MODE_IP
621 case IfModePuntInject:
622 args.mode = C.MEMIF_INTERFACE_MODE_PUNT_INJECT
624 args.mode = C.MEMIF_INTERFACE_MODE_ETHERNET
627 if config.Secret != "" {
628 args.secret = C.CString(config.Secret)
629 defer C.free(unsafe.Pointer(args.secret))
631 // - master/slave flag + number of Rx/Tx queues
633 args.num_s2m_rings = C.uint8_t(config.NumRxQueues)
634 args.num_m2s_rings = C.uint8_t(config.NumTxQueues)
635 args.is_master = C.uint8_t(1)
637 args.num_s2m_rings = C.uint8_t(config.NumTxQueues)
638 args.num_m2s_rings = C.uint8_t(config.NumRxQueues)
639 args.is_master = C.uint8_t(0)
642 args.buffer_size = C.uint16_t(config.BufferSize)
643 // - log_2(ring size)
644 args.log2_ring_size = C.uint8_t(config.Log2RingSize)
646 // Create memif in C-libmemif.
647 errCode := C.govpp_memif_create(&memif.cHandle, args, unsafe.Pointer(uintptr(memif.ifIndex)))
648 if getMemifError(int(errCode)) != nil {
652 // Register the new memif.
653 context.memifs[memif.ifIndex] = memif
654 context.nextMemifIndex++
655 log.WithField("ifName", config.IfName).Debug("A new memif interface was created")
660 // GetInterruptChan returns a channel which is continuously being filled with
661 // IDs of queues with data ready to be received.
662 // Since there is only one interrupt signal sent for an entire burst of packets,
663 // an interrupt handling routine should repeatedly call RxBurst() until
664 // the function returns an empty slice of packets. This way it is ensured
665 // that there are no packets left on the queue unread when the interrupt signal
667 // The method is thread-safe.
668 func (memif *Memif) GetInterruptChan() (ch <-chan uint8 /* queue ID */) {
672 // GetInterruptErrorChan returns an Error channel
673 // which fires if an error occurs while reading data.
674 func (memif *Memif) GetInterruptErrorChan() (ch <-chan error /* The error */) {
675 return memif.intErrCh
678 // GetQueueInterruptChan returns an empty-data channel which fires every time
679 // there are data to read on a given queue.
680 // It is only valid to call this function if memif is in the connected state.
681 // Channel is automatically closed when the connection goes down (but after
682 // the user provided callback OnDisconnect has executed).
683 // Since there is only one interrupt signal sent for an entire burst of packets,
684 // an interrupt handling routine should repeatedly call RxBurst() until
685 // the function returns an empty slice of packets. This way it is ensured
686 // that there are no packets left on the queue unread when the interrupt signal
688 // The method is thread-safe.
689 func (memif *Memif) GetQueueInterruptChan(queueID uint8) (ch <-chan struct{}, err error) {
690 if int(queueID) >= len(memif.queueIntCh) {
691 return nil, ErrQueueID
693 return memif.queueIntCh[queueID], nil
696 // SetRxMode allows to switch between the interrupt and the polling mode for Rx.
697 // The method is thread-safe.
698 func (memif *Memif) SetRxMode(queueID uint8, rxMode RxMode) (err error) {
699 var cRxMode C.memif_rx_mode_t
701 case RxModeInterrupt:
702 cRxMode = C.MEMIF_RX_MODE_INTERRUPT
704 cRxMode = C.MEMIF_RX_MODE_POLLING
706 cRxMode = C.MEMIF_RX_MODE_INTERRUPT
708 errCode := C.memif_set_rx_mode(memif.cHandle, cRxMode, C.uint16_t(queueID))
709 return getMemifError(int(errCode))
712 // GetDetails returns detailed runtime information about this memif.
713 // The method is thread-safe.
714 func (memif *Memif) GetDetails() (details *MemifDetails, err error) {
715 cDetails := C.govpp_memif_details_t{}
718 // Get memif details from C-libmemif.
719 errCode := C.govpp_memif_get_details(memif.cHandle, &cDetails, &buf)
720 err = getMemifError(int(errCode))
724 defer C.free(unsafe.Pointer(buf))
726 // Convert details from C to Go.
727 details = &MemifDetails{}
729 details.IfName = C.GoString(cDetails.if_name)
730 details.InstanceName = C.GoString(cDetails.inst_name)
731 details.ConnID = uint32(cDetails.id)
732 details.SocketFilename = C.GoString(cDetails.socket_filename)
733 if cDetails.secret != nil {
734 details.Secret = C.GoString(cDetails.secret)
736 details.IsMaster = cDetails.role == C.uint8_t(0)
737 switch cDetails.mode {
738 case C.MEMIF_INTERFACE_MODE_ETHERNET:
739 details.Mode = IfModeEthernet
740 case C.MEMIF_INTERFACE_MODE_IP:
741 details.Mode = IfModeIP
742 case C.MEMIF_INTERFACE_MODE_PUNT_INJECT:
743 details.Mode = IfModePuntInject
745 details.Mode = IfModeEthernet
747 // - connection details:
748 details.RemoteIfName = C.GoString(cDetails.remote_if_name)
749 details.RemoteInstanceName = C.GoString(cDetails.remote_inst_name)
750 details.HasLink = cDetails.link_up_down == C.uint8_t(1)
753 for i = 0; i < uint8(cDetails.rx_queues_num); i++ {
754 cRxQueue := C.govpp_get_rx_queue_details(&cDetails, C.int(i))
755 queueDetails := MemifQueueDetails{
756 QueueID: uint8(cRxQueue.qid),
757 RingSize: uint32(cRxQueue.ring_size),
758 BufferSize: uint16(cRxQueue.buffer_size),
760 details.RxQueues = append(details.RxQueues, queueDetails)
763 for i = 0; i < uint8(cDetails.tx_queues_num); i++ {
764 cTxQueue := C.govpp_get_tx_queue_details(&cDetails, C.int(i))
765 queueDetails := MemifQueueDetails{
766 QueueID: uint8(cTxQueue.qid),
767 RingSize: uint32(cTxQueue.ring_size),
768 BufferSize: uint16(cTxQueue.buffer_size),
770 details.TxQueues = append(details.TxQueues, queueDetails)
776 // TxBurst is used to send multiple packets in one call into a selected queue.
777 // The actual number of packets sent may be smaller and is returned as <count>.
778 // The method is non-blocking even if the ring is full and no packet can be sent.
779 // It is only valid to call this function if memif is in the connected state.
780 // Multiple TxBurst-s can run concurrently provided that each targets a different
782 func (memif *Memif) TxBurst(queueID uint8, packets []RawPacketData) (count uint16, err error) {
783 if len(packets) == 0 {
787 if int(queueID) >= len(memif.txQueueBufs) {
792 buffers := make([]*txPacketBuffer, 0)
793 cQueueID := C.uint16_t(queueID)
795 for _, packet := range packets {
796 packetLen := len(packet)
797 log.Debugf("%v - preparing packet with len %v", cQueueID, packetLen)
799 if packetLen > memif.bufferSize {
800 // Create jumbo buffer
801 buffer := &txPacketBuffer{
803 packets: []RawPacketData{packet},
806 buffers = append(buffers, buffer)
808 // Increment bufCount by number of splits in this jumbo
809 bufCount += (buffer.size + memif.bufferSize - 1) / memif.bufferSize
811 buffersLen := len(buffers)
813 // This is very first buffer so there is no data to append to, prepare empty one
815 buffers = []*txPacketBuffer{{}}
819 lastBuffer := buffers[buffersLen-1]
821 // Last buffer is jumbo buffer, create new buffer
822 if lastBuffer.size > memif.bufferSize {
823 lastBuffer = &txPacketBuffer{}
824 buffers = append(buffers, lastBuffer)
827 // Determine buffer size by max packet size in buffer
828 if packetLen > lastBuffer.size {
829 lastBuffer.size = packetLen
832 lastBuffer.packets = append(lastBuffer.packets, packet)
837 // Reallocate Tx buffers if needed to fit the input packets.
838 log.Debugf("%v - total buffer to allocate count %v", cQueueID, bufCount)
839 pb := &memif.txQueueBufs[queueID]
840 if pb.count < bufCount {
841 newBuffers := C.realloc(unsafe.Pointer(pb.buffers), C.size_t(bufCount*int(C.sizeof_memif_buffer_t)))
842 if newBuffers == nil {
843 // Realloc failed, <count> will be less than len(packets).
846 pb.buffers = (*C.memif_buffer_t)(newBuffers)
851 // Allocate ring slots.
852 var allocated C.uint16_t
853 var subCount C.uint16_t
854 for _, buffer := range buffers {
855 packetCount := C.uint16_t(len(buffer.packets))
856 isJumbo := buffer.size > memif.bufferSize
858 log.Debugf("%v - trying to send max buff size %v, packets len %v, buffer len %v, jumbo %v",
859 cQueueID, buffer.size, len(buffer.packets), packetCount, isJumbo)
861 var nextFreeBuff *C.memif_buffer_t
862 startOffset := allocated
863 errCode := C.govpp_memif_buffer_alloc(memif.cHandle, cQueueID, pb.buffers, startOffset, &nextFreeBuff,
864 packetCount, &allocated, C.uint16_t(buffer.size))
866 err = getMemifError(int(errCode))
867 endEarly := err == ErrNoBufRing
869 // Not enough ring slots, <count> will be less than packetCount.
876 // Copy packet data into the buffers.
877 nowAllocated := allocated - startOffset
878 toFill := nowAllocated
880 // If this is not jumbo frame, only 1 packet needs to be copied each iteration
884 // Iterate over all packets and try to fill them into allocated buffers
885 // If packet is jumbo frame, continue filling to allocated buffers until no buffer is left
886 for i, packet := range buffer.packets {
887 if i >= int(nowAllocated) {
888 // There was less allocated buffers than actual packet count so exit early
892 packetData := unsafe.Pointer(&packet[0])
893 C.govpp_copy_packet_data(nextFreeBuff, toFill, C.int(i), packetData, C.uint16_t(len(packet)))
896 if isJumbo && nowAllocated > 0 {
897 // If we successfully allocated required amount of buffers for entire jumbo to be sent
898 // simply sub entire amount of jumbo frame packets and leave only 1 so sender will think
899 // it only sent 1 packet so it does not need to know anything about jumbo frames
900 subCount += nowAllocated - 1
903 // If we do not have enough buffers left to allocate, simply end here to avoid packet loss and try
904 // to handle it next burst
910 var sentCount C.uint16_t
911 errCode := C.memif_tx_burst(memif.cHandle, cQueueID, pb.buffers, allocated, &sentCount)
912 err = getMemifError(int(errCode))
917 // Prevent negative values
918 realSent := uint16(sentCount) - uint16(subCount)
919 if subCount > sentCount {
923 log.Debugf("%v - sent %v total allocated buffs %v", cQueueID, sentCount, allocated)
927 // RxBurst is used to receive multiple packets in one call from a selected queue.
928 // <count> is the number of packets to receive. The actual number of packets
929 // received may be smaller. <count> effectively limits the maximum number
930 // of packets to receive in one burst (for a flat, predictable memory usage).
931 // The method is non-blocking even if there are no packets to receive.
932 // It is only valid to call this function if memif is in the connected state.
933 // Multiple RxBurst-s can run concurrently provided that each targets a different
935 func (memif *Memif) RxBurst(queueID uint8, count uint16) (packets []RawPacketData, err error) {
936 var recvCount C.uint16_t
937 packets = make([]RawPacketData, 0)
943 if int(queueID) >= len(memif.rxQueueBufs) {
944 return packets, ErrQueueID
947 // Reallocate Rx buffers if needed to fit the output packets.
948 pb := &memif.rxQueueBufs[queueID]
949 bufCount := int(count)
950 if pb.count < bufCount {
951 newBuffers := C.realloc(unsafe.Pointer(pb.buffers), C.size_t(bufCount*int(C.sizeof_memif_buffer_t)))
952 if newBuffers == nil {
953 // Realloc failed, len(<packets>) will be certainly less than <count>.
956 pb.buffers = (*C.memif_buffer_t)(newBuffers)
961 cQueueID := C.uint16_t(queueID)
962 errCode := C.memif_rx_burst(memif.cHandle, cQueueID, pb.buffers, C.uint16_t(bufCount), &recvCount)
963 err = getMemifError(int(errCode))
965 // More packets to read - the user is expected to run RxBurst() until there
966 // are no more packets to receive.
973 chained := len(pb.rxChainBuf) > 0
975 // We had stored data from previous burst because last buffer in previous burst was chained
976 // so we need to continue appending to this data
977 packets = pb.rxChainBuf
981 // Copy packet data into the instances of RawPacketData.
982 for i := 0; i < int(recvCount); i++ {
984 packetData := C.govpp_get_packet_data(pb.buffers, C.int(i), &packetSize)
985 packetBytes := C.GoBytes(packetData, packetSize)
988 // We have chained buffers, so start merging packet data with last read packet
989 prevPacket := packets[len(packets)-1]
990 packets[len(packets)-1] = append(prevPacket, packetBytes...)
992 packets = append(packets, packetBytes)
995 // Mark last buffer as chained based on property on current buffer so next buffers
996 // will try to append data to this one in case we got jumbo frame
997 chained = C.govpp_is_buffer_chained(pb.buffers, C.int(i)) > 0
1001 errCode = C.memif_refill_queue(memif.cHandle, cQueueID, recvCount, 0)
1003 err = getMemifError(int(errCode))
1005 // Throw away packets to avoid duplicates.
1010 // We did not have enough space to process all chained buffers to the end so simply tell
1011 // reader that it should not process any packets here and save them for next burst
1012 // to finish reading the buffer chain
1013 pb.rxChainBuf = packets
1021 // Close removes the memif interface. If the memif is in the connected state,
1022 // the connection is first properly closed.
1023 // Do not access memif after it is closed, let the garbage collector remove it.
1024 func (memif *Memif) Close() error {
1025 log.WithField("ifName", memif.IfName).Debug("Closing the memif interface")
1027 // Delete memif from C-libmemif.
1028 err := getMemifError(int(C.memif_delete(&memif.cHandle)))
1031 // Close memif-global interrupt channel.
1033 close(memif.intErrCh)
1034 // Close file descriptor stopQPollFd.
1035 C.close(C.int(memif.stopQPollFd))
1039 defer context.lock.Unlock()
1040 // Unregister the interface from the context.
1041 delete(context.memifs, memif.ifIndex)
1042 log.WithField("ifName", memif.IfName).Debug("memif interface was closed")
1047 // initQueues allocates resources associated with Rx/Tx queues.
1048 func (memif *Memif) initQueues() error {
1049 // Get Rx/Tx queues count.
1050 details, err := memif.GetDetails()
1055 log.WithFields(logger.Fields{
1056 "ifName": memif.IfName,
1057 "Rx-count": len(details.RxQueues),
1058 "Tx-count": len(details.TxQueues),
1059 }).Debug("Initializing Rx/Tx queues.")
1061 // Initialize interrupt channels.
1063 for i = 0; i < len(details.RxQueues); i++ {
1064 queueIntCh := make(chan struct{}, 1)
1065 memif.queueIntCh = append(memif.queueIntCh, queueIntCh)
1068 // Initialize Rx/Tx packet buffers.
1069 for i = 0; i < len(details.RxQueues); i++ {
1070 memif.rxQueueBufs = append(memif.rxQueueBufs, CPacketBuffers{})
1071 if !memif.IsMaster {
1072 errCode := C.memif_refill_queue(memif.cHandle, C.uint16_t(i), C.uint16_t(memif.ringSize-1), 0)
1073 err = getMemifError(int(errCode))
1075 log.Warn(err.Error())
1079 for i = 0; i < len(details.TxQueues); i++ {
1080 memif.txQueueBufs = append(memif.txQueueBufs, CPacketBuffers{})
1086 // closeQueues deallocates all resources associated with Rx/Tx queues.
//
// It walks memif.queueIntCh, frees the C memory referenced by every entry of
// memif.rxQueueBufs and memif.txQueueBufs, and resets all three slices to
// nil. It takes no arguments and returns nothing.
1087 func (memif *Memif) closeQueues() {
1088 log.WithFields(logger.Fields{
1089 "ifName": memif.IfName,
1090 "Rx-count": len(memif.rxQueueBufs),
1091 "Tx-count": len(memif.txQueueBufs),
1092 }).Debug("Closing Rx/Tx queues.")
1094 // Close interrupt channels.
// NOTE(review): the statement executed for each channel is not visible in
// this view - presumably close(ch); confirm.
1095 for _, ch := range memif.queueIntCh {
// Drop the slice itself so no reference to the old channels remains here.
1098 memif.queueIntCh = nil
1100 // Deallocate Rx/Tx packet buffers.
// pb.buffers is handed to C.free, i.e. it is treated as C-allocated memory.
1101 for _, pb := range memif.rxQueueBufs {
1102 C.free(unsafe.Pointer(pb.buffers))
1104 memif.rxQueueBufs = nil
1105 for _, pb := range memif.txQueueBufs {
1106 C.free(unsafe.Pointer(pb.buffers))
1108 memif.txQueueBufs = nil
1111 // pollEvents repeatedly polls for a libmemif event.
//
// The deferred call marks this routine as finished on context.wg when it
// returns. Each poll calls memif_poll_event with -1 and converts the returned
// code with getMemifError; ErrPollCanceled is the one result that is singled
// out afterwards.
// NOTE(review): the function header, the enclosing loop and the body of the
// ErrPollCanceled branch are not visible in this view - presumably the branch
// ends the routine; confirm. The -1 is presumably an "no timeout" value -
// confirm against libmemif.h.
1113 defer context.wg.Done()
1115 errCode := C.memif_poll_event(C.int(-1))
1116 err := getMemifError(int(errCode))
1117 if err == ErrPollCanceled {
1123 // pollRxQueue repeatedly polls an Rx queue for interrupts.
//
// Parameters:
//   - memif:   the interface whose queue is polled; its wg, cHandle,
//     stopQPollFd, intCh, intErrCh and queueIntCh fields are used.
//   - queueID: the Rx queue to watch; also used as index into
//     memif.queueIntCh and as the value sent on memif.intCh.
//
// The routine obtains the queue's interrupt file descriptor from libmemif,
// registers it together with memif.stopQPollFd on a private epoll instance
// and then waits on that instance. A ready queue descriptor is drained and
// signalled on the interrupt channels; a ready stop descriptor is logged as
// the end of polling.
// NOTE(review): several guard/return/loop lines are not visible in this view;
// the notes below mark the places where that matters.
1124 func pollRxQueue(memif *Memif, queueID uint8) {
// Tell memif.wg this routine is done whenever the function returns.
1125 defer memif.wg.Done()
1127 log.WithFields(logger.Fields{
1128 "ifName": memif.IfName,
1129 "queue-ID": queueID,
1130 }).Debug("Started queue interrupt polling.")
// Ask libmemif for the file descriptor of this queue; the result is
// written into qfd.
// NOTE(review): the declaration of qfd and the error guard around the log
// call below are not visible here.
1133 errCode := C.memif_get_queue_efd(memif.cHandle, C.uint16_t(queueID), &qfd)
1134 err := getMemifError(int(errCode))
1136 log.WithField("err", err).Error("memif_get_queue_efd() failed")
1140 // Create epoll file descriptor.
// The single-element event array is reused: first as the registration
// argument for EpollCtl, later as the output buffer of EpollWait, so at most
// one ready descriptor is reported per wait.
1141 var event [1]syscall.EpollEvent
1142 epFd, err := syscall.EpollCreate1(0)
1144 log.WithField("err", err).Error("epoll_create1() failed")
// The epoll descriptor is closed when the function returns.
1147 defer syscall.Close(epFd)
1149 // Add Rx queue interrupt file descriptor.
1150 event[0].Events = syscall.EPOLLIN
1151 event[0].Fd = int32(qfd)
1152 if err = syscall.EpollCtl(epFd, syscall.EPOLL_CTL_ADD, int(qfd), &event[0]); err != nil {
1153 log.WithField("err", err).Error("epoll_ctl() failed")
1157 // Add file descriptor used to stop this go routine.
1158 event[0].Events = syscall.EPOLLIN
1159 event[0].Fd = int32(memif.stopQPollFd)
1160 if err = syscall.EpollCtl(epFd, syscall.EPOLL_CTL_ADD, memif.stopQPollFd, &event[0]); err != nil {
1161 log.WithField("err", err).Error("epoll_ctl() failed")
1165 // Poll for interrupts.
// A timeout of -1 is passed to EpollWait; the number of ready events it
// returns is discarded and event[0] is inspected directly.
1167 _, err := syscall.EpollWait(epFd, event[:], -1)
1169 errno, _ := err.(syscall.Errno)
1170 // EINTR and EAGAIN should not be considered as a fatal error, try again.
1171 if errno == syscall.EINTR || errno == syscall.EAGAIN {
// Any other error is logged and forwarded to the owner through
// memif.intErrCh.
// NOTE(review): whether the routine stops after this send is not visible
// here - confirm.
1174 log.WithField("err", err).Error("epoll_wait() failed")
1175 memif.intErrCh <- err
1179 // Handle Rx Interrupt.
1180 if event[0].Fd == int32(qfd) {
1181 // Consume the interrupt event.
// An 8-byte read is issued on the queue descriptor; a read failure is
// only reported as a warning.
1182 buf := make([]byte, 8)
1183 _, err = syscall.Read(int(qfd), buf[:])
1185 log.WithField("err", err).Warn("read() failed")
1188 // Send signal to memif-global interrupt channel.
// NOTE(review): the enclosing select (and whether it has a default branch
// that makes this send non-blocking) is not visible here.
1190 case memif.intCh <- queueID:
1196 // Send signal to queue-specific interrupt channel.
1198 case memif.queueIntCh[queueID] <- struct{}{}:
1205 // Stop the go routine if requested.
// The request is recognised by the stop descriptor being the one reported
// ready by epoll.
1206 if event[0].Fd == int32(memif.stopQPollFd) {
1207 log.WithFields(logger.Fields{
1208 "ifName": memif.IfName,
1209 "queue-ID": queueID,
1210 }).Debug("Stopped queue interrupt polling.")
// go_on_connect_callback is exported to C (see the export directive below).
//
// privateCtx does not point to memory: its numeric value is converted to an
// int and used as the key into context.memifs to find the interface.
// The callback then initializes the Rx/Tx queues, invokes the user-supplied
// OnConnect callback when one is set, and starts one pollRxQueue goroutine
// per entry of memif.queueIntCh.
//
// The return value is a C int carrying an error code: ErrNoConn.Code(), the
// code of the *MemifError returned by initQueues, or ErrUnknown.Code() for
// any other error type.
// NOTE(review): the guards selecting these returns and the success return
// are not visible in this view - confirm.
1216 //export go_on_connect_callback
1217 func go_on_connect_callback(privateCtx unsafe.Pointer) C.int {
1218 log.Debug("go_on_connect_callback BEGIN")
1219 defer log.Debug("go_on_connect_callback END")
// The context read lock is held until the callback returns, which includes
// the call into the user's OnConnect callback below.
1220 context.lock.RLock()
1221 defer context.lock.RUnlock()
1223 // Get memif reference.
1224 ifIndex := int(uintptr(privateCtx))
1225 memif, exists := context.memifs[ifIndex]
// NOTE(review): presumably returned when the index is not registered
// (exists is false); the condition line is not visible here.
1227 return C.int(ErrNoConn.Code())
1230 // Initialize Rx/Tx queues.
1231 err := memif.initQueues()
// Translate the Go error into a numeric code for the C caller.
1233 if memifErr, ok := err.(*MemifError); ok {
1234 return C.int(memifErr.Code())
1236 return C.int(ErrUnknown.Code())
1239 // Call the user callback.
1240 if memif.callbacks.OnConnect != nil {
1241 memif.callbacks.OnConnect(memif)
1244 // Start polling the RX queues for interrupts.
// The loop index doubles as the queue ID handed to each goroutine.
1245 for i := 0; i < len(memif.queueIntCh); i++ {
1247 go pollRxQueue(memif, uint8(i))
1253 //export go_on_disconnect_callback
1254 func go_on_disconnect_callback(privateCtx unsafe.Pointer) C.int {
1255 log.Debug("go_on_disconnect_callback BEGIN")
1256 defer log.Debug("go_on_disconnect_callback END")
1257 context.lock.RLock()
1258 defer context.lock.RUnlock()
1260 // Get memif reference.
1261 ifIndex := int(uintptr(privateCtx))
1262 memif, exists := context.memifs[ifIndex]
1268 // Stop polling the RX queues for interrupts.
1269 buf := make([]byte, 8)
1270 binary.PutUvarint(buf, 1)
1272 _, err := syscall.Write(memif.stopQPollFd, buf[:])
1274 return C.int(ErrSyscall.Code())
1278 // - remove the event
1279 _, err = syscall.Read(memif.stopQPollFd, buf[:])
1281 return C.int(ErrSyscall.Code())
1284 // Call the user callback.
1285 if memif.callbacks.OnDisconnect != nil {
1286 memif.callbacks.OnDisconnect(memif)
1289 // Close Rx/Tx queues.