1bb922996ea648ca84844725182fcce39a40b39a
[govpp.git] / extras / libmemif / adapter.go
1 // Copyright (c) 2017 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //       http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 // +build !windows,!darwin
16
17 package libmemif
18
19 import (
20         "encoding/binary"
21         "os"
22         "sync"
23         "syscall"
24         "unsafe"
25
26         logger "github.com/sirupsen/logrus"
27 )
28
29 /*
30 #cgo LDFLAGS: -lmemif
31
32 #include <unistd.h>
33 #include <stdlib.h>
34 #include <stdint.h>
35 #include <string.h>
36 #include <sys/eventfd.h>
37 #include <libmemif.h>   // <-- VPP must be installed!
38
39 // Feature tests.
40 #ifndef MEMIF_HAVE_CANCEL_POLL_EVENT
41 // memif_cancel_poll_event that simply returns ErrUnsupported.
42 static int
43 memif_cancel_poll_event ()
44 {
45         return 102; // ErrUnsupported
46 }
47 #endif
48
49 // govpp_memif_conn_args_t replaces fixed sized arrays with C-strings which
50 // are much easier to work with in cgo.
51 typedef struct
52 {
53         char *socket_filename;
54         char *secret;
55         uint8_t num_s2m_rings;
56         uint8_t num_m2s_rings;
57         uint16_t buffer_size;
58         uint8_t log2_ring_size;
59         uint8_t is_master;
60         uint32_t interface_id;
61         char *interface_name;
62         memif_interface_mode_t mode;
63 } govpp_memif_conn_args_t;
64
65 // govpp_memif_details_t replaces strings represented with (uint8_t *)
66 // to the standard and easy to work with in cgo: (char *)
67 typedef struct
68 {
69         char *if_name;
70         char *inst_name;
71         char *remote_if_name;
72         char *remote_inst_name;
73         uint32_t id;
74         char *secret;
75         uint8_t role;
76         uint8_t mode;
77         char *socket_filename;
78         uint8_t regions_num;
79         memif_region_details_t *regions;
80         uint8_t rx_queues_num;
81         uint8_t tx_queues_num;
82         memif_queue_details_t *rx_queues;
83         memif_queue_details_t *tx_queues;
84         uint8_t link_up_down;
85 } govpp_memif_details_t;
86
87 extern int go_on_connect_callback(void *privateCtx);
88 extern int go_on_disconnect_callback(void *privateCtx);
89
90 // Callbacks strip the connection handle away.
91
92 static int
93 govpp_on_connect_callback(memif_conn_handle_t conn, void *private_ctx)
94 {
95         return go_on_connect_callback(private_ctx);
96 }
97
98 static int
99 govpp_on_disconnect_callback(memif_conn_handle_t conn, void *private_ctx)
100 {
101         return go_on_disconnect_callback(private_ctx);
102 }
103
104 // govpp_memif_create uses govpp_memif_conn_args_t.
105 static int
106 govpp_memif_create (memif_conn_handle_t *conn, govpp_memif_conn_args_t *go_args,
107                     void *private_ctx)
108 {
109         memif_conn_args_t args;
110         memset (&args, 0, sizeof (args));
111         args.socket_filename = (char *)go_args->socket_filename;
112         if (go_args->secret != NULL)
113         {
114                 strncpy ((char *)args.secret, go_args->secret,
115                                  sizeof (args.secret) - 1);
116         }
117         args.num_s2m_rings = go_args->num_s2m_rings;
118         args.num_m2s_rings = go_args->num_m2s_rings;
119         args.buffer_size = go_args->buffer_size;
120         args.log2_ring_size = go_args->log2_ring_size;
121         args.is_master = go_args->is_master;
122         args.interface_id = go_args->interface_id;
123         if (go_args->interface_name != NULL)
124         {
125                 strncpy ((char *)args.interface_name, go_args->interface_name,
126                                  sizeof(args.interface_name) - 1);
127         }
128         args.mode = go_args->mode;
129
130         return memif_create(conn, &args, govpp_on_connect_callback,
131                                                 govpp_on_disconnect_callback, NULL,
132                                                 private_ctx);
133 }
134
135 // govpp_memif_get_details keeps reallocating buffer until it is large enough.
136 // The buffer is returned to be deallocated when it is no longer needed.
137 static int
138 govpp_memif_get_details (memif_conn_handle_t conn, govpp_memif_details_t *govpp_md,
139                          char **buf)
140 {
141         int rv = 0;
142         size_t buflen = 1 << 7;
143         char *buffer = NULL, *new_buffer = NULL;
144         memif_details_t md = {0};
145
146         do {
147                 // initial malloc (256 bytes) or realloc
148                 buflen <<= 1;
149                 new_buffer = realloc(buffer, buflen);
150                 if (new_buffer == NULL)
151                 {
152                         free(buffer);
153                         return MEMIF_ERR_NOMEM;
154                 }
155                 buffer = new_buffer;
156                 // try to get details
157                 rv = memif_get_details(conn, &md, buffer, buflen);
158         } while (rv == MEMIF_ERR_NOBUF_DET);
159
160         if (rv == 0)
161         {
162                 *buf = buffer;
163                 govpp_md->if_name = (char *)md.if_name;
164                 govpp_md->inst_name = (char *)md.inst_name;
165                 govpp_md->remote_if_name = (char *)md.remote_if_name;
166                 govpp_md->remote_inst_name = (char *)md.remote_inst_name;
167                 govpp_md->id = md.id;
168                 govpp_md->secret = (char *)md.secret;
169                 govpp_md->role = md.role;
170                 govpp_md->mode = md.mode;
171                 govpp_md->socket_filename = (char *)md.socket_filename;
172                 govpp_md->regions_num = md.regions_num;
173                 govpp_md->regions = md.regions;
174                 govpp_md->rx_queues_num = md.rx_queues_num;
175                 govpp_md->tx_queues_num = md.tx_queues_num;
176                 govpp_md->rx_queues = md.rx_queues;
177                 govpp_md->tx_queues = md.tx_queues;
178                 govpp_md->link_up_down = md.link_up_down;
179         }
180         else
181                 free(buffer);
182         return rv;
183 }
184
185 // Used to avoid cumbersome tricks that use unsafe.Pointer() + unsafe.Sizeof()
186 // or even cast C-array directly into Go-slice.
187 static memif_queue_details_t
188 govpp_get_rx_queue_details (govpp_memif_details_t *md, int index)
189 {
190         return md->rx_queues[index];
191 }
192
193 // Used to avoid cumbersome tricks that use unsafe.Pointer() + unsafe.Sizeof()
194 // or even cast C-array directly into Go-slice.
195 static memif_queue_details_t
196 govpp_get_tx_queue_details (govpp_memif_details_t *md, int index)
197 {
198         return md->tx_queues[index];
199 }
200
201 // Copy packet data into the selected buffer with splitting when necessary
202 static void
203 govpp_copy_packet_data(memif_buffer_t *buffers, uint16_t allocated, int bufIndex, void *packetData, uint16_t packetSize)
204 {
205         int dataOffset = 0;
206
207         do {
208                 buffers[bufIndex].len = (packetSize > buffers[bufIndex].len ? buffers[bufIndex].len : packetSize);
209                 void * curData = (packetData + dataOffset);
210                 memcpy(buffers[bufIndex].data, curData, (size_t)buffers[bufIndex].len);
211                 dataOffset += buffers[bufIndex].len;
212                 bufIndex += 1;
213                 packetSize -= buffers[bufIndex].len;
214         } while(packetSize > 0 && bufIndex < allocated && buffers[bufIndex].flags & MEMIF_BUFFER_FLAG_NEXT > 0);
215 }
216
217 // Get packet data from the selected buffer.
218 // Used to avoid an ugly unsafe.Pointer() + unsafe.Sizeof().
219 static void *
220 govpp_get_packet_data(memif_buffer_t *buffers, int index, int *size)
221 {
222         *size = (int)buffers[index].len;
223         return buffers[index].data;
224 }
225
226 // Checks if memif buffer is chained
227 static int
228 govpp_is_buffer_chained(memif_buffer_t *buffers, int index)
229 {
230     return buffers[index].flags & MEMIF_BUFFER_FLAG_NEXT;
231 }
232
233 // Allocate memif buffers and return pointer to next free buffer
234 static int
235 govpp_memif_buffer_alloc(memif_conn_handle_t conn, uint16_t qid,
236                         memif_buffer_t * bufs, uint16_t offset, memif_buffer_t ** nextFreeBuf,
237                         uint16_t count, uint16_t * count_out, uint16_t size)
238 {
239     memif_buffer_t * offsetBufs = (bufs + offset);
240     int err = memif_buffer_alloc(conn, qid, offsetBufs, count, count_out, size);
241     *count_out += offset;
242     *nextFreeBuf = offsetBufs;
243     return err;
244 }
245
246 */
247 import "C"
248
249 // IfMode represents the mode (layer/behaviour) in which the interface operates.
250 type IfMode int
251
252 const (
253         // IfModeEthernet tells memif to operate on the L2 layer.
254         IfModeEthernet IfMode = iota
255
256         // IfModeIP tells memif to operate on the L3 layer.
257         IfModeIP
258
259         // IfModePuntInject tells memif to behave as Inject/Punt interface.
260         IfModePuntInject
261 )
262
263 // RxMode is used to switch between polling and interrupt for RX.
264 type RxMode int
265
266 const (
267         // RxModeInterrupt tells libmemif to send interrupt signal when data are available.
268         RxModeInterrupt RxMode = iota
269
270         // RxModePolling means that the user needs to explicitly poll for data on RX
271         // queues.
272         RxModePolling
273 )
274
275 // RawPacketData represents raw packet data. libmemif doesn't care what the
276 // actual content is, it only manipulates with raw bytes.
277 type RawPacketData []byte
278
279 // MemifMeta is used to store a basic memif metadata needed for identification
280 // and connection establishment.
281 type MemifMeta struct {
282         // IfName is the interface name. Has to be unique across all created memifs.
283         // Interface name is truncated if needed to have no more than 32 characters.
284         IfName string
285
286         // InstanceName identifies the endpoint. If omitted, the application
287         // name passed to Init() will be used instead.
288         // Instance name is truncated if needed to have no more than 32 characters.
289         InstanceName string
290
291         // ConnID is a connection ID used to match opposite sides of the memif
292         // connection.
293         ConnID uint32
294
295         // SocketFilename is the filename of the AF_UNIX socket through which
296         // the connection is established.
297         // The string is truncated if neede to fit into sockaddr_un.sun_path
298         // (108 characters on Linux).
299         SocketFilename string
300
301         // Secret must be the same on both sides for the authentication to succeed.
302         // Empty string is allowed.
303         // The secret is truncated if needed to have no more than 24 characters.
304         Secret string
305
306         // IsMaster is set to true if memif operates in the Master mode.
307         IsMaster bool
308
309         // Mode is the mode (layer/behaviour) in which the memif operates.
310         Mode IfMode
311 }
312
313 // MemifShmSpecs is used to store the specification of the shared memory segment
314 // used by memif to send/receive packets.
315 type MemifShmSpecs struct {
316         // NumRxQueues is the number of Rx queues.
317         // Default is 1 (used if the value is 0).
318         NumRxQueues uint8
319
320         // NumTxQueues is the number of Tx queues.
321         // Default is 1 (used if the value is 0).
322         NumTxQueues uint8
323
324         // BufferSize is the size of the buffer to hold one packet, or a single
325         // fragment of a jumbo frame. Default is 2048 (used if the value is 0).
326         BufferSize uint16
327
328         // Log2RingSize is the number of items in the ring represented through
329         // the logarithm base 2.
330         // Default is 10 (used if the value is 0).
331         Log2RingSize uint8
332 }
333
334 // MemifConfig is the memif configuration.
335 // Used as the input argument to CreateInterface().
336 // It is the slave's config that mostly decides the parameters of the connection,
337 // but master may limit some of the quantities if needed (based on the memif
338 // protocol or master's configuration)
339 type MemifConfig struct {
340         MemifMeta
341         MemifShmSpecs
342 }
343
344 // ConnUpdateCallback is a callback type declaration used with callbacks
345 // related to connection status changes.
346 type ConnUpdateCallback func(memif *Memif) (err error)
347
348 // MemifCallbacks is a container for all callbacks provided by memif.
349 // Any callback can be nil, in which case it will be simply skipped.
350 // Important: Do not call CreateInterface() or Memif.Close() from within a callback
351 // or a deadlock will occur. Instead send signal through a channel to another
352 // go routine which will be able to create/remove memif interface(s).
353 type MemifCallbacks struct {
354         // OnConnect is triggered when a connection for a given memif was established.
355         OnConnect ConnUpdateCallback
356
357         // OnDisconnect is triggered when a connection for a given memif was lost.
358         OnDisconnect ConnUpdateCallback
359 }
360
361 // Memif represents a single memif interface. It provides methods to send/receive
362 // packets in bursts in either the polling mode or in the interrupt mode with
363 // the help of golang channels.
364 type Memif struct {
365         MemifMeta
366
367         // Per-library references
368         ifIndex int                   // index used in the Go-libmemif context (Context.memifs)
369         cHandle C.memif_conn_handle_t // handle used in C-libmemif
370
371         // Callbacks
372         callbacks *MemifCallbacks
373
374         // Interrupt
375         intCh      chan uint8      // memif-global interrupt channel (value = queue ID)
376         queueIntCh []chan struct{} // per RX queue interrupt channel
377
378         // Rx/Tx queues
379         ringSize    int              // number of items in each ring
380         bufferSize  int              // max buffer size
381         stopQPollFd int              // event file descriptor used to stop pollRxQueue-s
382         wg          sync.WaitGroup   // wait group for all pollRxQueue-s
383         rxQueueBufs []CPacketBuffers // an array of C-libmemif packet buffers for each RX queue
384         txQueueBufs []CPacketBuffers // an array of C-libmemif packet buffers for each TX queue
385 }
386
387 // MemifDetails provides a detailed runtime information about a memif interface.
388 type MemifDetails struct {
389         MemifMeta
390         MemifConnDetails
391 }
392
393 // MemifConnDetails provides a detailed runtime information about a memif
394 // connection.
395 type MemifConnDetails struct {
396         // RemoteIfName is the name of the memif on the opposite side.
397         RemoteIfName string
398         // RemoteInstanceName is the name of the endpoint on the opposite side.
399         RemoteInstanceName string
400         // HasLink is true if the connection has link (= is established and functional).
401         HasLink bool
402         // RxQueues contains details for each Rx queue.
403         RxQueues []MemifQueueDetails
404         // TxQueues contains details for each Tx queue.
405         TxQueues []MemifQueueDetails
406 }
407
408 // MemifQueueDetails provides a detailed runtime information about a memif queue.
409 // Queue = Ring + the associated buffers (one directional).
410 type MemifQueueDetails struct {
411         // QueueID is the ID of the queue.
412         QueueID uint8
413         // RingSize is the number of slots in the ring (not logarithmic).
414         RingSize uint32
415         // BufferSize is the size of each buffer pointed to from the ring slots.
416         BufferSize uint16
417         /* Further ring information TO-BE-ADDED when C-libmemif supports them. */
418 }
419
420 // CPacketBuffers stores an array of memif buffers for use with TxBurst or RxBurst.
421 type CPacketBuffers struct {
422         buffers    *C.memif_buffer_t
423         count      int
424         rxChainBuf []RawPacketData
425 }
426
427 // Context is a global Go-libmemif runtime context.
428 type Context struct {
429         lock           sync.RWMutex
430         initialized    bool
431         memifs         map[int] /* ifIndex */ *Memif /* slice of all active memif interfaces */
432         nextMemifIndex int
433
434         wg sync.WaitGroup /* wait-group for pollEvents() */
435 }
436
437 type txPacketBuffer struct {
438         packets []RawPacketData
439         size    int
440 }
441
442 var (
443         // logger used by the adapter.
444         log *logger.Logger
445
446         // Global Go-libmemif context.
447         context = &Context{initialized: false}
448 )
449
450 // init initializes global logger, which logs debug level messages to stdout.
451 func init() {
452         log = logger.New()
453         log.Out = os.Stdout
454         log.Level = logger.DebugLevel
455 }
456
457 // SetLogger changes the logger for Go-libmemif to the provided one.
458 // The logger is not used for logging of C-libmemif.
459 func SetLogger(l *logger.Logger) {
460         log = l
461 }
462
463 // Init initializes the libmemif library. Must by called exactly once and before
464 // any libmemif functions. Do not forget to call Cleanup() before exiting
465 // your application.
466 // <appName> should be a human-readable string identifying your application.
467 // For example, VPP returns the version information ("show version" from VPP CLI).
468 func Init(appName string) error {
469         context.lock.Lock()
470         defer context.lock.Unlock()
471
472         if context.initialized {
473                 return ErrAlreadyInit
474         }
475
476         log.Debug("Initializing libmemif library")
477
478         // Initialize C-libmemif.
479         var errCode int
480         if appName == "" {
481                 errCode = int(C.memif_init(nil, nil, nil, nil, nil))
482         } else {
483                 appName := C.CString(appName)
484                 defer C.free(unsafe.Pointer(appName))
485                 errCode = int(C.memif_init(nil, appName, nil, nil, nil))
486         }
487         err := getMemifError(errCode)
488         if err != nil {
489                 return err
490         }
491
492         // Initialize the map of memory interfaces.
493         context.memifs = make(map[int]*Memif)
494
495         // Start event polling.
496         context.wg.Add(1)
497         go pollEvents()
498
499         context.initialized = true
500         log.Debug("libmemif library was initialized")
501         return err
502 }
503
504 // Cleanup cleans up all the resources allocated by libmemif.
505 func Cleanup() error {
506         context.lock.Lock()
507         defer context.lock.Unlock()
508
509         if !context.initialized {
510                 return ErrNotInit
511         }
512
513         log.Debug("Closing libmemif library")
514
515         // Delete all active interfaces.
516         for _, memif := range context.memifs {
517                 memif.Close()
518         }
519
520         // Stop the event loop (if supported by C-libmemif).
521         errCode := C.memif_cancel_poll_event()
522         err := getMemifError(int(errCode))
523         if err == nil {
524                 log.Debug("Waiting for pollEvents() to stop...")
525                 context.wg.Wait()
526                 log.Debug("pollEvents() has stopped...")
527         } else {
528                 log.WithField("err", err).Debug("NOT Waiting for pollEvents to stop...")
529         }
530
531         // Run cleanup for C-libmemif.
532         err = getMemifError(int(C.memif_cleanup()))
533         if err == nil {
534                 context.initialized = false
535                 log.Debug("libmemif library was closed")
536         }
537         return err
538 }
539
540 // CreateInterface creates a new memif interface with the given configuration.
541 // The same callbacks can be used with multiple memifs. The first callback input
542 // argument (*Memif) can be used to tell which memif the callback was triggered for.
543 // The method is thread-safe.
544 func CreateInterface(config *MemifConfig, callbacks *MemifCallbacks) (memif *Memif, err error) {
545         context.lock.Lock()
546         defer context.lock.Unlock()
547
548         if !context.initialized {
549                 return nil, ErrNotInit
550         }
551
552         log.WithField("ifName", config.IfName).Debug("Creating a new memif interface")
553
554         log2RingSize := config.Log2RingSize
555         if log2RingSize == 0 {
556                 log2RingSize = 10
557         }
558
559         bufferSize := config.BufferSize
560         if bufferSize <= 0 {
561                 bufferSize = 2048
562         }
563
564         // Create memif-wrapper for Go-libmemif.
565         memif = &Memif{
566                 MemifMeta:  config.MemifMeta,
567                 callbacks:  &MemifCallbacks{},
568                 ifIndex:    context.nextMemifIndex,
569                 ringSize:   1 << log2RingSize,
570                 bufferSize: int(bufferSize),
571         }
572
573         // Initialize memif callbacks.
574         if callbacks != nil {
575                 memif.callbacks.OnConnect = callbacks.OnConnect
576                 memif.callbacks.OnDisconnect = callbacks.OnDisconnect
577         }
578
579         // Initialize memif-global interrupt channel.
580         memif.intCh = make(chan uint8, 1<<6)
581
582         // Initialize event file descriptor for stopping Rx/Tx queue polling.
583         memif.stopQPollFd = int(C.eventfd(0, C.EFD_NONBLOCK))
584         if memif.stopQPollFd < 0 {
585                 return nil, ErrSyscall
586         }
587
588         // Initialize memif input arguments.
589         args := &C.govpp_memif_conn_args_t{}
590         // - socket file name
591         if config.SocketFilename != "" {
592                 args.socket_filename = C.CString(config.SocketFilename)
593                 defer C.free(unsafe.Pointer(args.socket_filename))
594         }
595         // - interface ID
596         args.interface_id = C.uint32_t(config.ConnID)
597         // - interface name
598         if config.IfName != "" {
599                 args.interface_name = C.CString(config.IfName)
600                 defer C.free(unsafe.Pointer(args.interface_name))
601         }
602         // - mode
603         switch config.Mode {
604         case IfModeEthernet:
605                 args.mode = C.MEMIF_INTERFACE_MODE_ETHERNET
606         case IfModeIP:
607                 args.mode = C.MEMIF_INTERFACE_MODE_IP
608         case IfModePuntInject:
609                 args.mode = C.MEMIF_INTERFACE_MODE_PUNT_INJECT
610         default:
611                 args.mode = C.MEMIF_INTERFACE_MODE_ETHERNET
612         }
613         // - secret
614         if config.Secret != "" {
615                 args.secret = C.CString(config.Secret)
616                 defer C.free(unsafe.Pointer(args.secret))
617         }
618         // - master/slave flag + number of Rx/Tx queues
619         if config.IsMaster {
620                 args.num_s2m_rings = C.uint8_t(config.NumRxQueues)
621                 args.num_m2s_rings = C.uint8_t(config.NumTxQueues)
622                 args.is_master = C.uint8_t(1)
623         } else {
624                 args.num_s2m_rings = C.uint8_t(config.NumTxQueues)
625                 args.num_m2s_rings = C.uint8_t(config.NumRxQueues)
626                 args.is_master = C.uint8_t(0)
627         }
628         // - buffer size
629         args.buffer_size = C.uint16_t(config.BufferSize)
630         // - log_2(ring size)
631         args.log2_ring_size = C.uint8_t(config.Log2RingSize)
632
633         // Create memif in C-libmemif.
634         errCode := C.govpp_memif_create(&memif.cHandle, args, unsafe.Pointer(uintptr(memif.ifIndex)))
635         err = getMemifError(int(errCode))
636         if err != nil {
637                 return nil, err
638         }
639
640         // Register the new memif.
641         context.memifs[memif.ifIndex] = memif
642         context.nextMemifIndex++
643         log.WithField("ifName", config.IfName).Debug("A new memif interface was created")
644
645         return memif, nil
646 }
647
648 // GetInterruptChan returns a channel which is continuously being filled with
649 // IDs of queues with data ready to be received.
650 // Since there is only one interrupt signal sent for an entire burst of packets,
651 // an interrupt handling routine should repeatedly call RxBurst() until
652 // the function returns an empty slice of packets. This way it is ensured
653 // that there are no packets left on the queue unread when the interrupt signal
654 // is cleared.
655 // The method is thread-safe.
656 func (memif *Memif) GetInterruptChan() (ch <-chan uint8 /* queue ID */) {
657         return memif.intCh
658 }
659
660 // GetQueueInterruptChan returns an empty-data channel which fires every time
661 // there are data to read on a given queue.
662 // It is only valid to call this function if memif is in the connected state.
663 // Channel is automatically closed when the connection goes down (but after
664 // the user provided callback OnDisconnect has executed).
665 // Since there is only one interrupt signal sent for an entire burst of packets,
666 // an interrupt handling routine should repeatedly call RxBurst() until
667 // the function returns an empty slice of packets. This way it is ensured
668 // that there are no packets left on the queue unread when the interrupt signal
669 // is cleared.
670 // The method is thread-safe.
671 func (memif *Memif) GetQueueInterruptChan(queueID uint8) (ch <-chan struct{}, err error) {
672         if int(queueID) >= len(memif.queueIntCh) {
673                 return nil, ErrQueueID
674         }
675         return memif.queueIntCh[queueID], nil
676 }
677
678 // SetRxMode allows to switch between the interrupt and the polling mode for Rx.
679 // The method is thread-safe.
680 func (memif *Memif) SetRxMode(queueID uint8, rxMode RxMode) (err error) {
681         var cRxMode C.memif_rx_mode_t
682         switch rxMode {
683         case RxModeInterrupt:
684                 cRxMode = C.MEMIF_RX_MODE_INTERRUPT
685         case RxModePolling:
686                 cRxMode = C.MEMIF_RX_MODE_POLLING
687         default:
688                 cRxMode = C.MEMIF_RX_MODE_INTERRUPT
689         }
690         errCode := C.memif_set_rx_mode(memif.cHandle, cRxMode, C.uint16_t(queueID))
691         return getMemifError(int(errCode))
692 }
693
694 // GetDetails returns a detailed runtime information about this memif.
695 // The method is thread-safe.
696 func (memif *Memif) GetDetails() (details *MemifDetails, err error) {
697         cDetails := C.govpp_memif_details_t{}
698         var buf *C.char
699
700         // Get memif details from C-libmemif.
701         errCode := C.govpp_memif_get_details(memif.cHandle, &cDetails, &buf)
702         err = getMemifError(int(errCode))
703         if err != nil {
704                 return nil, err
705         }
706         defer C.free(unsafe.Pointer(buf))
707
708         // Convert details from C to Go.
709         details = &MemifDetails{}
710         // - metadata:
711         details.IfName = C.GoString(cDetails.if_name)
712         details.InstanceName = C.GoString(cDetails.inst_name)
713         details.ConnID = uint32(cDetails.id)
714         details.SocketFilename = C.GoString(cDetails.socket_filename)
715         if cDetails.secret != nil {
716                 details.Secret = C.GoString(cDetails.secret)
717         }
718         details.IsMaster = cDetails.role == C.uint8_t(0)
719         switch cDetails.mode {
720         case C.MEMIF_INTERFACE_MODE_ETHERNET:
721                 details.Mode = IfModeEthernet
722         case C.MEMIF_INTERFACE_MODE_IP:
723                 details.Mode = IfModeIP
724         case C.MEMIF_INTERFACE_MODE_PUNT_INJECT:
725                 details.Mode = IfModePuntInject
726         default:
727                 details.Mode = IfModeEthernet
728         }
729         // - connection details:
730         details.RemoteIfName = C.GoString(cDetails.remote_if_name)
731         details.RemoteInstanceName = C.GoString(cDetails.remote_inst_name)
732         details.HasLink = cDetails.link_up_down == C.uint8_t(1)
733         // - RX queues:
734         var i uint8
735         for i = 0; i < uint8(cDetails.rx_queues_num); i++ {
736                 cRxQueue := C.govpp_get_rx_queue_details(&cDetails, C.int(i))
737                 queueDetails := MemifQueueDetails{
738                         QueueID:    uint8(cRxQueue.qid),
739                         RingSize:   uint32(cRxQueue.ring_size),
740                         BufferSize: uint16(cRxQueue.buffer_size),
741                 }
742                 details.RxQueues = append(details.RxQueues, queueDetails)
743         }
744         // - TX queues:
745         for i = 0; i < uint8(cDetails.tx_queues_num); i++ {
746                 cTxQueue := C.govpp_get_tx_queue_details(&cDetails, C.int(i))
747                 queueDetails := MemifQueueDetails{
748                         QueueID:    uint8(cTxQueue.qid),
749                         RingSize:   uint32(cTxQueue.ring_size),
750                         BufferSize: uint16(cTxQueue.buffer_size),
751                 }
752                 details.TxQueues = append(details.TxQueues, queueDetails)
753         }
754
755         return details, nil
756 }
757
758 // TxBurst is used to send multiple packets in one call into a selected queue.
759 // The actual number of packets sent may be smaller and is returned as <count>.
760 // The method is non-blocking even if the ring is full and no packet can be sent.
761 // It is only valid to call this function if memif is in the connected state.
762 // Multiple TxBurst-s can run concurrently provided that each targets a different
763 // TX queue.
764 func (memif *Memif) TxBurst(queueID uint8, packets []RawPacketData) (count uint16, err error) {
765         if len(packets) == 0 {
766                 return 0, nil
767         }
768
769         if int(queueID) >= len(memif.txQueueBufs) {
770                 return 0, ErrQueueID
771         }
772
773         var bufCount int
774         var buffers []*txPacketBuffer
775         cQueueID := C.uint16_t(queueID)
776
777         for _, packet := range packets {
778                 packetLen := len(packet)
779                 log.Debugf("%v - preparing packet with len %v", cQueueID, packetLen)
780
781                 if packetLen > memif.bufferSize {
782                         // Create jumbo buffer
783                         buffer := &txPacketBuffer{
784                                 size:    packetLen,
785                                 packets: []RawPacketData{packet},
786                         }
787
788                         buffers = append(buffers, buffer)
789
790                         // Increment bufCount by number of splits in this jumbo
791                         bufCount += (buffer.size + memif.bufferSize - 1) / memif.bufferSize
792                 } else {
793                         buffersLen := len(buffers)
794
795                         // This is very first buffer so there is no data to append to, prepare empty one
796                         if buffersLen == 0 {
797                                 buffers = []*txPacketBuffer{{}}
798                                 buffersLen = 1
799                         }
800
801                         lastBuffer := buffers[buffersLen-1]
802
803                         // Last buffer is jumbo buffer, create new buffer
804                         if lastBuffer.size > memif.bufferSize {
805                                 lastBuffer = &txPacketBuffer{}
806                                 buffers = append(buffers, lastBuffer)
807                         }
808
809                         // Determine buffer size by max packet size in buffer
810                         if packetLen > lastBuffer.size {
811                                 lastBuffer.size = packetLen
812                         }
813
814                         lastBuffer.packets = append(lastBuffer.packets, packet)
815                         bufCount += 1
816                 }
817         }
818
819         // Reallocate Tx buffers if needed to fit the input packets.
820         log.Debugf("%v - total buffer to allocate count %v", cQueueID, bufCount)
821         pb := &memif.txQueueBufs[queueID]
822         if pb.count < bufCount {
823                 newBuffers := C.realloc(unsafe.Pointer(pb.buffers), C.size_t(bufCount*int(C.sizeof_memif_buffer_t)))
824                 if newBuffers == nil {
825                         // Realloc failed, <count> will be less than len(packets).
826                         bufCount = pb.count
827                 } else {
828                         pb.buffers = (*C.memif_buffer_t)(newBuffers)
829                         pb.count = bufCount
830                 }
831         }
832
833         // Allocate ring slots.
834         var allocated C.uint16_t
835         var subCount C.uint16_t
836         for _, buffer := range buffers {
837                 packetCount := C.uint16_t(len(buffer.packets))
838                 isJumbo := buffer.size > memif.bufferSize
839
840                 log.Debugf("%v - trying to send max buff size %v, packets len %v, buffer len %v, jumbo %v",
841                         cQueueID, buffer.size, len(buffer.packets), packetCount, isJumbo)
842
843                 var nextFreeBuff *C.memif_buffer_t
844                 startOffset := allocated
845                 errCode := C.govpp_memif_buffer_alloc(memif.cHandle, cQueueID, pb.buffers, startOffset, &nextFreeBuff,
846                         packetCount, &allocated, C.uint16_t(buffer.size))
847
848                 err = getMemifError(int(errCode))
849                 endEarly := err == ErrNoBufRing
850                 if endEarly {
851                         // Not enough ring slots, <count> will be less than packetCount.
852                         err = nil
853                 }
854                 if err != nil {
855                         return 0, err
856                 }
857
858                 // Copy packet data into the buffers.
859                 nowAllocated := allocated - startOffset
860                 toFill := nowAllocated
861                 if !isJumbo {
862                         // If this is not jumbo frame, only 1 packet needs to be copied each iteration
863                         toFill = 1
864                 }
865
866                 // Iterate over all packets and try to fill them into allocated buffers
867                 // If packet is jumbo frame, continue filling to allocated buffers until no buffer is left
868                 for i, packet := range buffer.packets {
869                         if i >= int(nowAllocated) {
870                                 // There was less allocated buffers than actual packet count so exit early
871                                 break
872                         }
873
874                         packetData := unsafe.Pointer(&packet[0])
875                         C.govpp_copy_packet_data(nextFreeBuff, toFill, C.int(i), packetData, C.uint16_t(len(packet)))
876                 }
877
878                 if isJumbo && nowAllocated > 0 {
879                         // If we successfully allocated required amount of buffers for entire jumbo to be sent
880                         // simply sub entire amount of jumbo frame packets and leave only 1 so sender will think
881                         // it only sent 1 packet so it does not need to know anything about jumbo frames
882                         subCount += nowAllocated - 1
883                 }
884
885                 // If we do not have enough buffers left to allocate, simply end here to avoid packet loss and try
886                 // to handle it next burst
887                 if endEarly {
888                         break
889                 }
890         }
891
892         var sentCount C.uint16_t
893         errCode := C.memif_tx_burst(memif.cHandle, cQueueID, pb.buffers, allocated, &sentCount)
894         err = getMemifError(int(errCode))
895         if err != nil {
896                 return 0, err
897         }
898
899         // Prevent negative values
900         realSent := uint16(sentCount) - uint16(subCount)
901         if subCount > sentCount {
902                 sentCount = 0
903         }
904
905         log.Debugf("%v - sent %v total allocated buffs %v", cQueueID, sentCount, allocated)
906         return realSent, nil
907 }
908
909 // RxBurst is used to receive multiple packets in one call from a selected queue.
910 // <count> is the number of packets to receive. The actual number of packets
911 // received may be smaller. <count> effectively limits the maximum number
912 // of packets to receive in one burst (for a flat, predictable memory usage).
913 // The method is non-blocking even if there are no packets to receive.
914 // It is only valid to call this function if memif is in the connected state.
915 // Multiple RxBurst-s can run concurrently provided that each targets a different
916 // Rx queue.
917 func (memif *Memif) RxBurst(queueID uint8, count uint16) (packets []RawPacketData, err error) {
918         var recvCount C.uint16_t
919
920         if count == 0 {
921                 return packets, nil
922         }
923
924         if int(queueID) >= len(memif.rxQueueBufs) {
925                 return packets, ErrQueueID
926         }
927
928         // Reallocate Rx buffers if needed to fit the output packets.
929         pb := &memif.rxQueueBufs[queueID]
930         bufCount := int(count)
931         if pb.count < bufCount {
932                 newBuffers := C.realloc(unsafe.Pointer(pb.buffers), C.size_t(bufCount*int(C.sizeof_memif_buffer_t)))
933                 if newBuffers == nil {
934                         // Realloc failed, len(<packets>) will be certainly less than <count>.
935                         bufCount = pb.count
936                 } else {
937                         pb.buffers = (*C.memif_buffer_t)(newBuffers)
938                         pb.count = bufCount
939                 }
940         }
941
942         cQueueID := C.uint16_t(queueID)
943         errCode := C.memif_rx_burst(memif.cHandle, cQueueID, pb.buffers, C.uint16_t(bufCount), &recvCount)
944         err = getMemifError(int(errCode))
945         if err == ErrNoBuf {
946                 // More packets to read - the user is expected to run RxBurst() until there
947                 // are no more packets to receive.
948                 err = nil
949         }
950         if err != nil {
951                 return packets, err
952         }
953
954         chained := len(pb.rxChainBuf) > 0
955         if chained {
956                 // We had stored data from previous burst because last buffer in previous burst was chained
957                 // so we need to continue appending to this data
958                 packets = pb.rxChainBuf
959                 pb.rxChainBuf = nil
960         }
961
962         // Copy packet data into the instances of RawPacketData.
963         for i := 0; i < int(recvCount); i++ {
964                 var packetSize C.int
965                 packetData := C.govpp_get_packet_data(pb.buffers, C.int(i), &packetSize)
966                 packetBytes := C.GoBytes(packetData, packetSize)
967
968                 if chained {
969                         // We have chained buffers, so start merging packet data with last read packet
970                         prevPacket := packets[len(packets)-1]
971                         packets[len(packets)-1] = append(prevPacket, packetBytes...)
972                 } else {
973                         packets = append(packets, packetBytes)
974                 }
975
976                 // Mark last buffer as chained based on property on current buffer so next buffers
977                 // will try to append data to this one in case we got jumbo frame
978                 chained = C.govpp_is_buffer_chained(pb.buffers, C.int(i)) > 0
979         }
980
981         if recvCount > 0 {
982                 errCode = C.memif_refill_queue(memif.cHandle, cQueueID, recvCount, 0)
983         }
984         err = getMemifError(int(errCode))
985         if err != nil {
986                 // Throw away packets to avoid duplicities.
987                 packets = nil
988         }
989
990         if chained {
991                 // We did not had enough space to process all chained buffers to the end so simply tell
992                 // reader that it should not process any packets here and save them for next burst
993                 // to finish reading the buffer chain
994                 pb.rxChainBuf = packets
995                 packets = nil
996                 err = ErrNoBuf
997         }
998
999         return packets, err
1000 }
1001
1002 // Close removes the memif interface. If the memif is in the connected state,
1003 // the connection is first properly closed.
1004 // Do not access memif after it is closed, let garbage collector to remove it.
1005 func (memif *Memif) Close() error {
1006         log.WithField("ifName", memif.IfName).Debug("Closing the memif interface")
1007
1008         // Delete memif from C-libmemif.
1009         err := getMemifError(int(C.memif_delete(&memif.cHandle)))
1010
1011         if err != nil {
1012                 // Close memif-global interrupt channel.
1013                 close(memif.intCh)
1014                 // Close file descriptor stopQPollFd.
1015                 C.close(C.int(memif.stopQPollFd))
1016         }
1017
1018         context.lock.Lock()
1019         defer context.lock.Unlock()
1020         // Unregister the interface from the context.
1021         delete(context.memifs, memif.ifIndex)
1022         log.WithField("ifName", memif.IfName).Debug("memif interface was closed")
1023
1024         return err
1025 }
1026
1027 // initQueues allocates resources associated with Rx/Tx queues.
1028 func (memif *Memif) initQueues() error {
1029         // Get Rx/Tx queues count.
1030         details, err := memif.GetDetails()
1031         if err != nil {
1032                 return err
1033         }
1034
1035         log.WithFields(logger.Fields{
1036                 "ifName":   memif.IfName,
1037                 "Rx-count": len(details.RxQueues),
1038                 "Tx-count": len(details.TxQueues),
1039         }).Debug("Initializing Rx/Tx queues.")
1040
1041         // Initialize interrupt channels.
1042         var i int
1043         for i = 0; i < len(details.RxQueues); i++ {
1044                 queueIntCh := make(chan struct{}, 1)
1045                 memif.queueIntCh = append(memif.queueIntCh, queueIntCh)
1046         }
1047
1048         // Initialize Rx/Tx packet buffers.
1049         for i = 0; i < len(details.RxQueues); i++ {
1050                 memif.rxQueueBufs = append(memif.rxQueueBufs, CPacketBuffers{})
1051                 if !memif.IsMaster {
1052                         errCode := C.memif_refill_queue(memif.cHandle, C.uint16_t(i), C.uint16_t(memif.ringSize-1), 0)
1053                         err = getMemifError(int(errCode))
1054                         if err != nil {
1055                                 log.Warn(err.Error())
1056                         }
1057                 }
1058         }
1059         for i = 0; i < len(details.TxQueues); i++ {
1060                 memif.txQueueBufs = append(memif.txQueueBufs, CPacketBuffers{})
1061         }
1062
1063         return nil
1064 }
1065
1066 // closeQueues deallocates all resources associated with Rx/Tx queues.
1067 func (memif *Memif) closeQueues() {
1068         log.WithFields(logger.Fields{
1069                 "ifName":   memif.IfName,
1070                 "Rx-count": len(memif.rxQueueBufs),
1071                 "Tx-count": len(memif.txQueueBufs),
1072         }).Debug("Closing Rx/Tx queues.")
1073
1074         // Close interrupt channels.
1075         for _, ch := range memif.queueIntCh {
1076                 close(ch)
1077         }
1078         memif.queueIntCh = nil
1079
1080         // Deallocate Rx/Tx packet buffers.
1081         for _, pb := range memif.rxQueueBufs {
1082                 C.free(unsafe.Pointer(pb.buffers))
1083         }
1084         memif.rxQueueBufs = nil
1085         for _, pb := range memif.txQueueBufs {
1086                 C.free(unsafe.Pointer(pb.buffers))
1087         }
1088         memif.txQueueBufs = nil
1089 }
1090
1091 // pollEvents repeatedly polls for a libmemif event.
1092 func pollEvents() {
1093         defer context.wg.Done()
1094         for {
1095                 errCode := C.memif_poll_event(C.int(-1))
1096                 err := getMemifError(int(errCode))
1097                 if err == ErrPollCanceled {
1098                         return
1099                 }
1100         }
1101 }
1102
1103 // pollRxQueue repeatedly polls an Rx queue for interrupts.
1104 func pollRxQueue(memif *Memif, queueID uint8) {
1105         defer memif.wg.Done()
1106
1107         log.WithFields(logger.Fields{
1108                 "ifName":   memif.IfName,
1109                 "queue-ID": queueID,
1110         }).Debug("Started queue interrupt polling.")
1111
1112         var qfd C.int
1113         errCode := C.memif_get_queue_efd(memif.cHandle, C.uint16_t(queueID), &qfd)
1114         err := getMemifError(int(errCode))
1115         if err != nil {
1116                 log.WithField("err", err).Error("memif_get_queue_efd() failed")
1117                 return
1118         }
1119
1120         // Create epoll file descriptor.
1121         var event [1]syscall.EpollEvent
1122         epFd, err := syscall.EpollCreate1(0)
1123         if err != nil {
1124                 log.WithField("err", err).Error("epoll_create1() failed")
1125                 return
1126         }
1127         defer syscall.Close(epFd)
1128
1129         // Add Rx queue interrupt file descriptor.
1130         event[0].Events = syscall.EPOLLIN
1131         event[0].Fd = int32(qfd)
1132         if err = syscall.EpollCtl(epFd, syscall.EPOLL_CTL_ADD, int(qfd), &event[0]); err != nil {
1133                 log.WithField("err", err).Error("epoll_ctl() failed")
1134                 return
1135         }
1136
1137         // Add file descriptor used to stop this go routine.
1138         event[0].Events = syscall.EPOLLIN
1139         event[0].Fd = int32(memif.stopQPollFd)
1140         if err = syscall.EpollCtl(epFd, syscall.EPOLL_CTL_ADD, memif.stopQPollFd, &event[0]); err != nil {
1141                 log.WithField("err", err).Error("epoll_ctl() failed")
1142                 return
1143         }
1144
1145         // Poll for interrupts.
1146         for {
1147                 _, err := syscall.EpollWait(epFd, event[:], -1)
1148                 if err != nil {
1149                         log.WithField("err", err).Error("epoll_wait() failed")
1150                         return
1151                 }
1152
1153                 // Handle Rx Interrupt.
1154                 if event[0].Fd == int32(qfd) {
1155                         // Consume the interrupt event.
1156                         buf := make([]byte, 8)
1157                         _, err = syscall.Read(int(qfd), buf[:])
1158                         if err != nil {
1159                                 log.WithField("err", err).Warn("read() failed")
1160                         }
1161
1162                         // Send signal to memif-global interrupt channel.
1163                         select {
1164                         case memif.intCh <- queueID:
1165                                 break
1166                         default:
1167                                 break
1168                         }
1169
1170                         // Send signal to queue-specific interrupt channel.
1171                         select {
1172                         case memif.queueIntCh[queueID] <- struct{}{}:
1173                                 break
1174                         default:
1175                                 break
1176                         }
1177                 }
1178
1179                 // Stop the go routine if requested.
1180                 if event[0].Fd == int32(memif.stopQPollFd) {
1181                         log.WithFields(logger.Fields{
1182                                 "ifName":   memif.IfName,
1183                                 "queue-ID": queueID,
1184                         }).Debug("Stopped queue interrupt polling.")
1185                         return
1186                 }
1187         }
1188 }
1189
1190 //export go_on_connect_callback
1191 func go_on_connect_callback(privateCtx unsafe.Pointer) C.int {
1192         log.Debug("go_on_connect_callback BEGIN")
1193         defer log.Debug("go_on_connect_callback END")
1194         context.lock.RLock()
1195         defer context.lock.RUnlock()
1196
1197         // Get memif reference.
1198         ifIndex := int(uintptr(privateCtx))
1199         memif, exists := context.memifs[ifIndex]
1200         if !exists {
1201                 return C.int(ErrNoConn.Code())
1202         }
1203
1204         // Initialize Rx/Tx queues.
1205         err := memif.initQueues()
1206         if err != nil {
1207                 if memifErr, ok := err.(*MemifError); ok {
1208                         return C.int(memifErr.Code())
1209                 }
1210                 return C.int(ErrUnknown.Code())
1211         }
1212
1213         // Call the user callback.
1214         if memif.callbacks.OnConnect != nil {
1215                 memif.callbacks.OnConnect(memif)
1216         }
1217
1218         // Start polling the RX queues for interrupts.
1219         for i := 0; i < len(memif.queueIntCh); i++ {
1220                 memif.wg.Add(1)
1221                 go pollRxQueue(memif, uint8(i))
1222         }
1223
1224         return C.int(0)
1225 }
1226
1227 //export go_on_disconnect_callback
1228 func go_on_disconnect_callback(privateCtx unsafe.Pointer) C.int {
1229         log.Debug("go_on_disconnect_callback BEGIN")
1230         defer log.Debug("go_on_disconnect_callback END")
1231         context.lock.RLock()
1232         defer context.lock.RUnlock()
1233
1234         // Get memif reference.
1235         ifIndex := int(uintptr(privateCtx))
1236         memif, exists := context.memifs[ifIndex]
1237         if !exists {
1238                 // Already closed.
1239                 return C.int(0)
1240         }
1241
1242         // Stop polling the RX queues for interrupts.
1243         buf := make([]byte, 8)
1244         binary.PutUvarint(buf, 1)
1245         // - add an event
1246         _, err := syscall.Write(memif.stopQPollFd, buf[:])
1247         if err != nil {
1248                 return C.int(ErrSyscall.Code())
1249         }
1250         // - wait
1251         memif.wg.Wait()
1252         // - remove the event
1253         _, err = syscall.Read(memif.stopQPollFd, buf[:])
1254         if err != nil {
1255                 return C.int(ErrSyscall.Code())
1256         }
1257
1258         // Call the user callback.
1259         if memif.callbacks.OnDisconnect != nil {
1260                 memif.callbacks.OnDisconnect(memif)
1261         }
1262
1263         // Close Rx/Tx queues.
1264         memif.closeQueues()
1265
1266         return C.int(0)
1267 }