https://jira.fd.io/browse/GOVPP-21
[govpp.git] / extras / libmemif / adapter.go
1 // Copyright (c) 2017 Cisco and/or its affiliates.
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at:
6 //
7 //       http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14
15 // +build !windows,!darwin
16
17 package libmemif
18
19 import (
20         "encoding/binary"
21         "os"
22         "sync"
23         "syscall"
24         "unsafe"
25
26         logger "github.com/sirupsen/logrus"
27 )
28
29 /*
30 #cgo LDFLAGS: -lmemif
31
32 #include <unistd.h>
33 #include <stdlib.h>
34 #include <stdint.h>
35 #include <string.h>
36 #include <sys/eventfd.h>
37 #include <libmemif.h>   // <-- VPP must be installed!
38
39 // Feature tests.
40 #ifndef MEMIF_HAVE_CANCEL_POLL_EVENT
41 // memif_cancel_poll_event that simply returns ErrUnsupported.
42 static int
43 memif_cancel_poll_event ()
44 {
45         return 102; // ErrUnsupported
46 }
47 #endif
48
49 // govpp_memif_conn_args_t replaces fixed sized arrays with C-strings which
50 // are much easier to work with in cgo.
51 typedef struct
52 {
53         memif_socket_handle_t socket;
54         char *secret;
55         uint8_t num_s2m_rings;
56         uint8_t num_m2s_rings;
57         uint16_t buffer_size;
58         uint8_t log2_ring_size;
59         uint8_t is_master;
60         uint32_t interface_id;
61         char *interface_name;
62         memif_interface_mode_t mode;
63 } govpp_memif_conn_args_t;
64
65 // govpp_memif_details_t replaces strings represented with (uint8_t *)
66 // to the standard and easy to work with in cgo: (char *)
67 typedef struct
68 {
69         char *if_name;
70         char *inst_name;
71         char *remote_if_name;
72         char *remote_inst_name;
73         uint32_t id;
74         char *secret;
75         uint8_t role;
76         uint8_t mode;
77         char *socket_filename;
78         uint8_t regions_num;
79         memif_region_details_t *regions;
80         uint8_t rx_queues_num;
81         uint8_t tx_queues_num;
82         memif_queue_details_t *rx_queues;
83         memif_queue_details_t *tx_queues;
84         uint8_t link_up_down;
85 } govpp_memif_details_t;
86
87 extern int go_on_connect_callback(void *privateCtx);
88 extern int go_on_disconnect_callback(void *privateCtx);
89
90 // Callbacks strip the connection handle away.
91
92 static int
93 govpp_on_connect_callback(memif_conn_handle_t conn, void *private_ctx)
94 {
95         return go_on_connect_callback(private_ctx);
96 }
97
98 static int
99 govpp_on_disconnect_callback(memif_conn_handle_t conn, void *private_ctx)
100 {
101         return go_on_disconnect_callback(private_ctx);
102 }
103
104 // govpp_memif_create uses govpp_memif_conn_args_t.
105 static int
106 govpp_memif_create (memif_conn_handle_t *conn, govpp_memif_conn_args_t *go_args,
107                     void *private_ctx)
108 {
109         memif_conn_args_t args;
110         memset (&args, 0, sizeof (args));
111         args.socket = (char *)go_args->socket;
112         if (go_args->secret != NULL)
113         {
114                 strncpy ((char *)args.secret, go_args->secret,
115                                  sizeof (args.secret) - 1);
116         }
117         args.num_s2m_rings = go_args->num_s2m_rings;
118         args.num_m2s_rings = go_args->num_m2s_rings;
119         args.buffer_size = go_args->buffer_size;
120         args.log2_ring_size = go_args->log2_ring_size;
121         args.is_master = go_args->is_master;
122         args.interface_id = go_args->interface_id;
123         if (go_args->interface_name != NULL)
124         {
125                 strncpy ((char *)args.interface_name, go_args->interface_name,
126                                  sizeof(args.interface_name) - 1);
127         }
128         args.mode = go_args->mode;
129
130         return memif_create(conn, &args, govpp_on_connect_callback,
131                                                 govpp_on_disconnect_callback, NULL,
132                                                 private_ctx);
133 }
134
135 static int
136 govpp_memif_create_socket (memif_socket_handle_t *sock, char *filename)
137 {
138         return memif_create_socket(sock, filename, NULL);
139 }
140
141 // govpp_memif_get_details keeps reallocating buffer until it is large enough.
142 // The buffer is returned to be deallocated when it is no longer needed.
143 static int
144 govpp_memif_get_details (memif_conn_handle_t conn, govpp_memif_details_t *govpp_md,
145                          char **buf)
146 {
147         int rv = 0;
148         size_t buflen = 1 << 7;
149         char *buffer = NULL, *new_buffer = NULL;
150         memif_details_t md = {0};
151
152         do {
153                 // initial malloc (256 bytes) or realloc
154                 buflen <<= 1;
155                 new_buffer = realloc(buffer, buflen);
156                 if (new_buffer == NULL)
157                 {
158                         free(buffer);
159                         return MEMIF_ERR_NOMEM;
160                 }
161                 buffer = new_buffer;
162                 // try to get details
163                 rv = memif_get_details(conn, &md, buffer, buflen);
164         } while (rv == MEMIF_ERR_NOBUF_DET);
165
166         if (rv == 0)
167         {
168                 *buf = buffer;
169                 govpp_md->if_name = (char *)md.if_name;
170                 govpp_md->inst_name = (char *)md.inst_name;
171                 govpp_md->remote_if_name = (char *)md.remote_if_name;
172                 govpp_md->remote_inst_name = (char *)md.remote_inst_name;
173                 govpp_md->id = md.id;
174                 govpp_md->secret = (char *)md.secret;
175                 govpp_md->role = md.role;
176                 govpp_md->mode = md.mode;
177                 govpp_md->socket_filename = (char *)md.socket_filename;
178                 govpp_md->regions_num = md.regions_num;
179                 govpp_md->regions = md.regions;
180                 govpp_md->rx_queues_num = md.rx_queues_num;
181                 govpp_md->tx_queues_num = md.tx_queues_num;
182                 govpp_md->rx_queues = md.rx_queues;
183                 govpp_md->tx_queues = md.tx_queues;
184                 govpp_md->link_up_down = md.link_up_down;
185         }
186         else
187                 free(buffer);
188         return rv;
189 }
190
191 // Used to avoid cumbersome tricks that use unsafe.Pointer() + unsafe.Sizeof()
192 // or even cast C-array directly into Go-slice.
193 static memif_queue_details_t
194 govpp_get_rx_queue_details (govpp_memif_details_t *md, int index)
195 {
196         return md->rx_queues[index];
197 }
198
199 // Used to avoid cumbersome tricks that use unsafe.Pointer() + unsafe.Sizeof()
200 // or even cast C-array directly into Go-slice.
201 static memif_queue_details_t
202 govpp_get_tx_queue_details (govpp_memif_details_t *md, int index)
203 {
204         return md->tx_queues[index];
205 }
206
207 // Copy packet data into the selected buffer with splitting when necessary
208 static void
209 govpp_copy_packet_data(memif_buffer_t *buffers, uint16_t allocated, int bufIndex, void *packetData, uint16_t packetSize)
210 {
211         int dataOffset = 0;
212
213         do {
214                 buffers[bufIndex].len = (packetSize > buffers[bufIndex].len ? buffers[bufIndex].len : packetSize);
215                 void * curData = (packetData + dataOffset);
216                 memcpy(buffers[bufIndex].data, curData, (size_t)buffers[bufIndex].len);
217                 dataOffset += buffers[bufIndex].len;
218                 bufIndex += 1;
219                 packetSize -= buffers[bufIndex].len;
220         } while(packetSize > 0 && bufIndex < allocated && buffers[bufIndex].flags & MEMIF_BUFFER_FLAG_NEXT > 0);
221 }
222
223 // Get packet data from the selected buffer.
224 // Used to avoid an ugly unsafe.Pointer() + unsafe.Sizeof().
225 static void *
226 govpp_get_packet_data(memif_buffer_t *buffers, int index, int *size)
227 {
228         *size = (int)buffers[index].len;
229         return buffers[index].data;
230 }
231
232 // Checks if memif buffer is chained
233 static int
234 govpp_is_buffer_chained(memif_buffer_t *buffers, int index)
235 {
236     return buffers[index].flags & MEMIF_BUFFER_FLAG_NEXT;
237 }
238
239 // Allocate memif buffers and return pointer to next free buffer
240 static int
241 govpp_memif_buffer_alloc(memif_conn_handle_t conn, uint16_t qid,
242                         memif_buffer_t * bufs, uint16_t offset, memif_buffer_t ** nextFreeBuf,
243                         uint16_t count, uint16_t * count_out, uint16_t size)
244 {
245     memif_buffer_t * offsetBufs = (bufs + offset);
246     int err = memif_buffer_alloc(conn, qid, offsetBufs, count, count_out, size);
247     *count_out += offset;
248     *nextFreeBuf = offsetBufs;
249     return err;
250 }
251
252 */
253 import "C"
254
255 // IfMode represents the mode (layer/behaviour) in which the interface operates.
256 type IfMode int
257
258 const (
259         // IfModeEthernet tells memif to operate on the L2 layer.
260         IfModeEthernet IfMode = iota
261
262         // IfModeIP tells memif to operate on the L3 layer.
263         IfModeIP
264
265         // IfModePuntInject tells memif to behave as Inject/Punt interface.
266         IfModePuntInject
267 )
268
269 // RxMode is used to switch between polling and interrupt for RX.
270 type RxMode int
271
272 const (
273         // RxModeInterrupt tells libmemif to send interrupt signal when data are available.
274         RxModeInterrupt RxMode = iota
275
276         // RxModePolling means that the user needs to explicitly poll for data on RX
277         // queues.
278         RxModePolling
279 )
280
281 // RawPacketData represents raw packet data. libmemif doesn't care what the
282 // actual content is, it only manipulates with raw bytes.
283 type RawPacketData []byte
284
285 // MemifMeta is used to store a basic memif metadata needed for identification
286 // and connection establishment.
287 type MemifMeta struct {
288         // IfName is the interface name. Has to be unique across all created memifs.
289         // Interface name is truncated if needed to have no more than 32 characters.
290         IfName string
291
292         // InstanceName identifies the endpoint. If omitted, the application
293         // name passed to Init() will be used instead.
294         // Instance name is truncated if needed to have no more than 32 characters.
295         InstanceName string
296
297         // ConnID is a connection ID used to match opposite sides of the memif
298         // connection.
299         ConnID uint32
300
301         // SocketFilename is the filename of the AF_UNIX socket through which
302         // the connection is established.
303         // The string is truncated if neede to fit into sockaddr_un.sun_path
304         // (108 characters on Linux).
305         SocketFilename string
306
307         // Secret must be the same on both sides for the authentication to succeed.
308         // Empty string is allowed.
309         // The secret is truncated if needed to have no more than 24 characters.
310         Secret string
311
312         // IsMaster is set to true if memif operates in the Master mode.
313         IsMaster bool
314
315         // Mode is the mode (layer/behaviour) in which the memif operates.
316         Mode IfMode
317 }
318
319 // MemifShmSpecs is used to store the specification of the shared memory segment
320 // used by memif to send/receive packets.
321 type MemifShmSpecs struct {
322         // NumRxQueues is the number of Rx queues.
323         // Default is 1 (used if the value is 0).
324         NumRxQueues uint8
325
326         // NumTxQueues is the number of Tx queues.
327         // Default is 1 (used if the value is 0).
328         NumTxQueues uint8
329
330         // BufferSize is the size of the buffer to hold one packet, or a single
331         // fragment of a jumbo frame. Default is 2048 (used if the value is 0).
332         BufferSize uint16
333
334         // Log2RingSize is the number of items in the ring represented through
335         // the logarithm base 2.
336         // Default is 10 (used if the value is 0).
337         Log2RingSize uint8
338 }
339
340 // MemifConfig is the memif configuration.
341 // Used as the input argument to CreateInterface().
342 // It is the slave's config that mostly decides the parameters of the connection,
343 // but master may limit some of the quantities if needed (based on the memif
344 // protocol or master's configuration)
345 type MemifConfig struct {
346         MemifMeta
347         MemifShmSpecs
348 }
349
350 // ConnUpdateCallback is a callback type declaration used with callbacks
351 // related to connection status changes.
352 type ConnUpdateCallback func(memif *Memif) (err error)
353
354 // MemifCallbacks is a container for all callbacks provided by memif.
355 // Any callback can be nil, in which case it will be simply skipped.
356 // Important: Do not call CreateInterface() or Memif.Close() from within a callback
357 // or a deadlock will occur. Instead send signal through a channel to another
358 // go routine which will be able to create/remove memif interface(s).
359 type MemifCallbacks struct {
360         // OnConnect is triggered when a connection for a given memif was established.
361         OnConnect ConnUpdateCallback
362
363         // OnDisconnect is triggered when a connection for a given memif was lost.
364         OnDisconnect ConnUpdateCallback
365 }
366
367 // Memif represents a single memif interface. It provides methods to send/receive
368 // packets in bursts in either the polling mode or in the interrupt mode with
369 // the help of golang channels.
370 type Memif struct {
371         MemifMeta
372
373         // Per-library references
374         ifIndex int                     // index used in the Go-libmemif context (Context.memifs)
375         cHandle C.memif_conn_handle_t   // connection handle used in C-libmemif
376         sHandle C.memif_socket_handle_t // socket handle used in C-libmemif
377
378         // Callbacks
379         callbacks *MemifCallbacks
380
381         // Interrupt
382         intCh      chan uint8      // memif-global interrupt channel (value = queue ID)
383         intErrCh   chan error      // triggered when interrupt error occurs
384         queueIntCh []chan struct{} // per RX queue interrupt channel
385
386         // Rx/Tx queues
387         ringSize    int              // number of items in each ring
388         bufferSize  int              // max buffer size
389         stopQPollFd int              // event file descriptor used to stop pollRxQueue-s
390         wg          sync.WaitGroup   // wait group for all pollRxQueue-s
391         rxQueueBufs []CPacketBuffers // an array of C-libmemif packet buffers for each RX queue
392         txQueueBufs []CPacketBuffers // an array of C-libmemif packet buffers for each TX queue
393 }
394
395 // MemifDetails provides a detailed runtime information about a memif interface.
396 type MemifDetails struct {
397         MemifMeta
398         MemifConnDetails
399 }
400
401 // MemifConnDetails provides a detailed runtime information about a memif
402 // connection.
403 type MemifConnDetails struct {
404         // RemoteIfName is the name of the memif on the opposite side.
405         RemoteIfName string
406         // RemoteInstanceName is the name of the endpoint on the opposite side.
407         RemoteInstanceName string
408         // HasLink is true if the connection has link (= is established and functional).
409         HasLink bool
410         // RxQueues contains details for each Rx queue.
411         RxQueues []MemifQueueDetails
412         // TxQueues contains details for each Tx queue.
413         TxQueues []MemifQueueDetails
414 }
415
416 // MemifQueueDetails provides a detailed runtime information about a memif queue.
417 // Queue = Ring + the associated buffers (one directional).
418 type MemifQueueDetails struct {
419         // QueueID is the ID of the queue.
420         QueueID uint8
421         // RingSize is the number of slots in the ring (not logarithmic).
422         RingSize uint32
423         // BufferSize is the size of each buffer pointed to from the ring slots.
424         BufferSize uint16
425         /* Further ring information TO-BE-ADDED when C-libmemif supports them. */
426 }
427
428 // CPacketBuffers stores an array of memif buffers for use with TxBurst or RxBurst.
429 type CPacketBuffers struct {
430         buffers    *C.memif_buffer_t
431         count      int
432         rxChainBuf []RawPacketData
433 }
434
435 // Context is a global Go-libmemif runtime context.
436 type Context struct {
437         lock           sync.RWMutex
438         initialized    bool
439         memifs         map[int] /* ifIndex */ *Memif /* slice of all active memif interfaces */
440         nextMemifIndex int
441
442         wg sync.WaitGroup /* wait-group for pollEvents() */
443 }
444
445 type txPacketBuffer struct {
446         packets []RawPacketData
447         size    int
448 }
449
450 var (
451         // logger used by the adapter.
452         log *logger.Logger
453
454         // Global Go-libmemif context.
455         context = &Context{initialized: false}
456 )
457
458 // init initializes global logger, which logs debug level messages to stdout.
459 func init() {
460         log = logger.New()
461         log.Out = os.Stdout
462         log.Level = logger.DebugLevel
463 }
464
465 // SetLogger changes the logger for Go-libmemif to the provided one.
466 // The logger is not used for logging of C-libmemif.
467 func SetLogger(l *logger.Logger) {
468         log = l
469 }
470
471 // Init initializes the libmemif library. Must by called exactly once and before
472 // any libmemif functions. Do not forget to call Cleanup() before exiting
473 // your application.
474 // <appName> should be a human-readable string identifying your application.
475 // For example, VPP returns the version information ("show version" from VPP CLI).
476 func Init(appName string) error {
477         context.lock.Lock()
478         defer context.lock.Unlock()
479
480         if context.initialized {
481                 return ErrAlreadyInit
482         }
483
484         log.Debug("Initializing libmemif library")
485
486         // Initialize C-libmemif.
487         var errCode int
488         if appName == "" {
489                 errCode = int(C.memif_init(nil, nil, nil, nil, nil))
490         } else {
491                 appName := C.CString(appName)
492                 defer C.free(unsafe.Pointer(appName))
493                 errCode = int(C.memif_init(nil, appName, nil, nil, nil))
494         }
495         err := getMemifError(errCode)
496         if err != nil {
497                 return err
498         }
499
500         // Initialize the map of memory interfaces.
501         context.memifs = make(map[int]*Memif)
502
503         // Start event polling.
504         context.wg.Add(1)
505         go pollEvents()
506
507         context.initialized = true
508         log.Debug("libmemif library was initialized")
509         return err
510 }
511
512 // Cleanup cleans up all the resources allocated by libmemif.
513 func Cleanup() error {
514         context.lock.Lock()
515         defer context.lock.Unlock()
516
517         if !context.initialized {
518                 return ErrNotInit
519         }
520
521         log.Debug("Closing libmemif library")
522
523         // Delete all active interfaces.
524         for _, memif := range context.memifs {
525                 memif.Close()
526         }
527
528         // Stop the event loop (if supported by C-libmemif).
529         errCode := C.memif_cancel_poll_event()
530         err := getMemifError(int(errCode))
531         if err == nil {
532                 log.Debug("Waiting for pollEvents() to stop...")
533                 context.wg.Wait()
534                 log.Debug("pollEvents() has stopped...")
535         } else {
536                 log.WithField("err", err).Debug("NOT Waiting for pollEvents to stop...")
537         }
538
539         // Run cleanup for C-libmemif.
540         err = getMemifError(int(C.memif_cleanup()))
541         if err == nil {
542                 context.initialized = false
543                 log.Debug("libmemif library was closed")
544         }
545         return err
546 }
547
548 // CreateInterface creates a new memif interface with the given configuration.
549 // The same callbacks can be used with multiple memifs. The first callback input
550 // argument (*Memif) can be used to tell which memif the callback was triggered for.
551 // The method is thread-safe.
552 func CreateInterface(config *MemifConfig, callbacks *MemifCallbacks) (memif *Memif, err error) {
553         context.lock.Lock()
554         defer context.lock.Unlock()
555
556         if !context.initialized {
557                 return nil, ErrNotInit
558         }
559
560         log.WithField("ifName", config.IfName).Debug("Creating a new memif interface")
561
562         log2RingSize := config.Log2RingSize
563         if log2RingSize == 0 {
564                 log2RingSize = 10
565         }
566
567         bufferSize := config.BufferSize
568         if bufferSize <= 0 {
569                 bufferSize = 2048
570         }
571
572         // Create memif-wrapper for Go-libmemif.
573         memif = &Memif{
574                 MemifMeta:  config.MemifMeta,
575                 callbacks:  &MemifCallbacks{},
576                 ifIndex:    context.nextMemifIndex,
577                 ringSize:   1 << log2RingSize,
578                 bufferSize: int(bufferSize),
579         }
580
581         // Initialize memif callbacks.
582         if callbacks != nil {
583                 memif.callbacks.OnConnect = callbacks.OnConnect
584                 memif.callbacks.OnDisconnect = callbacks.OnDisconnect
585         }
586
587         // Initialize memif-global interrupt channel.
588         memif.intCh = make(chan uint8, 1<<6)
589         memif.intErrCh = make(chan error, 1<<6)
590
591         // Initialize event file descriptor for stopping Rx/Tx queue polling.
592         memif.stopQPollFd = int(C.eventfd(0, C.EFD_NONBLOCK))
593         if memif.stopQPollFd < 0 {
594                 return nil, ErrSyscall
595         }
596
597         // Initialize memif input arguments.
598         args := &C.govpp_memif_conn_args_t{}
599         // - socket file name
600         if config.SocketFilename != "" {
601                 log.WithField("name", config.SocketFilename).Debug("A new memif socket was created")
602                 errCode := C.govpp_memif_create_socket(&memif.sHandle, C.CString(config.SocketFilename))
603                 if getMemifError(int(errCode)) != nil {
604                         return nil, err
605                 }
606         }
607         args.socket = memif.sHandle
608         // - interface ID
609         args.interface_id = C.uint32_t(config.ConnID)
610         // - interface name
611         if config.IfName != "" {
612                 args.interface_name = C.CString(config.IfName)
613                 defer C.free(unsafe.Pointer(args.interface_name))
614         }
615         // - mode
616         switch config.Mode {
617         case IfModeEthernet:
618                 args.mode = C.MEMIF_INTERFACE_MODE_ETHERNET
619         case IfModeIP:
620                 args.mode = C.MEMIF_INTERFACE_MODE_IP
621         case IfModePuntInject:
622                 args.mode = C.MEMIF_INTERFACE_MODE_PUNT_INJECT
623         default:
624                 args.mode = C.MEMIF_INTERFACE_MODE_ETHERNET
625         }
626         // - secret
627         if config.Secret != "" {
628                 args.secret = C.CString(config.Secret)
629                 defer C.free(unsafe.Pointer(args.secret))
630         }
631         // - master/slave flag + number of Rx/Tx queues
632         if config.IsMaster {
633                 args.num_s2m_rings = C.uint8_t(config.NumRxQueues)
634                 args.num_m2s_rings = C.uint8_t(config.NumTxQueues)
635                 args.is_master = C.uint8_t(1)
636         } else {
637                 args.num_s2m_rings = C.uint8_t(config.NumTxQueues)
638                 args.num_m2s_rings = C.uint8_t(config.NumRxQueues)
639                 args.is_master = C.uint8_t(0)
640         }
641         // - buffer size
642         args.buffer_size = C.uint16_t(config.BufferSize)
643         // - log_2(ring size)
644         args.log2_ring_size = C.uint8_t(config.Log2RingSize)
645
646         // Create memif in C-libmemif.
647         errCode := C.govpp_memif_create(&memif.cHandle, args, unsafe.Pointer(uintptr(memif.ifIndex)))
648         if getMemifError(int(errCode)) != nil {
649                 return nil, err
650         }
651
652         // Register the new memif.
653         context.memifs[memif.ifIndex] = memif
654         context.nextMemifIndex++
655         log.WithField("ifName", config.IfName).Debug("A new memif interface was created")
656
657         return memif, nil
658 }
659
660 // GetInterruptChan returns a channel which is continuously being filled with
661 // IDs of queues with data ready to be received.
662 // Since there is only one interrupt signal sent for an entire burst of packets,
663 // an interrupt handling routine should repeatedly call RxBurst() until
664 // the function returns an empty slice of packets. This way it is ensured
665 // that there are no packets left on the queue unread when the interrupt signal
666 // is cleared.
667 // The method is thread-safe.
668 func (memif *Memif) GetInterruptChan() (ch <-chan uint8 /* queue ID */) {
669         return memif.intCh
670 }
671
672 // GetInterruptErrorChan returns an Error channel
673 // which fires if there are errors occurred while read data.
674 func (memif *Memif) GetInterruptErrorChan() (ch <-chan error /* The error */) {
675         return memif.intErrCh
676 }
677
678 // GetQueueInterruptChan returns an empty-data channel which fires every time
679 // there are data to read on a given queue.
680 // It is only valid to call this function if memif is in the connected state.
681 // Channel is automatically closed when the connection goes down (but after
682 // the user provided callback OnDisconnect has executed).
683 // Since there is only one interrupt signal sent for an entire burst of packets,
684 // an interrupt handling routine should repeatedly call RxBurst() until
685 // the function returns an empty slice of packets. This way it is ensured
686 // that there are no packets left on the queue unread when the interrupt signal
687 // is cleared.
688 // The method is thread-safe.
689 func (memif *Memif) GetQueueInterruptChan(queueID uint8) (ch <-chan struct{}, err error) {
690         if int(queueID) >= len(memif.queueIntCh) {
691                 return nil, ErrQueueID
692         }
693         return memif.queueIntCh[queueID], nil
694 }
695
696 // SetRxMode allows to switch between the interrupt and the polling mode for Rx.
697 // The method is thread-safe.
698 func (memif *Memif) SetRxMode(queueID uint8, rxMode RxMode) (err error) {
699         var cRxMode C.memif_rx_mode_t
700         switch rxMode {
701         case RxModeInterrupt:
702                 cRxMode = C.MEMIF_RX_MODE_INTERRUPT
703         case RxModePolling:
704                 cRxMode = C.MEMIF_RX_MODE_POLLING
705         default:
706                 cRxMode = C.MEMIF_RX_MODE_INTERRUPT
707         }
708         errCode := C.memif_set_rx_mode(memif.cHandle, cRxMode, C.uint16_t(queueID))
709         return getMemifError(int(errCode))
710 }
711
712 // GetDetails returns a detailed runtime information about this memif.
713 // The method is thread-safe.
714 func (memif *Memif) GetDetails() (details *MemifDetails, err error) {
715         cDetails := C.govpp_memif_details_t{}
716         var buf *C.char
717
718         // Get memif details from C-libmemif.
719         errCode := C.govpp_memif_get_details(memif.cHandle, &cDetails, &buf)
720         err = getMemifError(int(errCode))
721         if err != nil {
722                 return nil, err
723         }
724         defer C.free(unsafe.Pointer(buf))
725
726         // Convert details from C to Go.
727         details = &MemifDetails{}
728         // - metadata:
729         details.IfName = C.GoString(cDetails.if_name)
730         details.InstanceName = C.GoString(cDetails.inst_name)
731         details.ConnID = uint32(cDetails.id)
732         details.SocketFilename = C.GoString(cDetails.socket_filename)
733         if cDetails.secret != nil {
734                 details.Secret = C.GoString(cDetails.secret)
735         }
736         details.IsMaster = cDetails.role == C.uint8_t(0)
737         switch cDetails.mode {
738         case C.MEMIF_INTERFACE_MODE_ETHERNET:
739                 details.Mode = IfModeEthernet
740         case C.MEMIF_INTERFACE_MODE_IP:
741                 details.Mode = IfModeIP
742         case C.MEMIF_INTERFACE_MODE_PUNT_INJECT:
743                 details.Mode = IfModePuntInject
744         default:
745                 details.Mode = IfModeEthernet
746         }
747         // - connection details:
748         details.RemoteIfName = C.GoString(cDetails.remote_if_name)
749         details.RemoteInstanceName = C.GoString(cDetails.remote_inst_name)
750         details.HasLink = cDetails.link_up_down == C.uint8_t(1)
751         // - RX queues:
752         var i uint8
753         for i = 0; i < uint8(cDetails.rx_queues_num); i++ {
754                 cRxQueue := C.govpp_get_rx_queue_details(&cDetails, C.int(i))
755                 queueDetails := MemifQueueDetails{
756                         QueueID:    uint8(cRxQueue.qid),
757                         RingSize:   uint32(cRxQueue.ring_size),
758                         BufferSize: uint16(cRxQueue.buffer_size),
759                 }
760                 details.RxQueues = append(details.RxQueues, queueDetails)
761         }
762         // - TX queues:
763         for i = 0; i < uint8(cDetails.tx_queues_num); i++ {
764                 cTxQueue := C.govpp_get_tx_queue_details(&cDetails, C.int(i))
765                 queueDetails := MemifQueueDetails{
766                         QueueID:    uint8(cTxQueue.qid),
767                         RingSize:   uint32(cTxQueue.ring_size),
768                         BufferSize: uint16(cTxQueue.buffer_size),
769                 }
770                 details.TxQueues = append(details.TxQueues, queueDetails)
771         }
772
773         return details, nil
774 }
775
776 // TxBurst is used to send multiple packets in one call into a selected queue.
777 // The actual number of packets sent may be smaller and is returned as <count>.
778 // The method is non-blocking even if the ring is full and no packet can be sent.
779 // It is only valid to call this function if memif is in the connected state.
780 // Multiple TxBurst-s can run concurrently provided that each targets a different
781 // TX queue.
782 func (memif *Memif) TxBurst(queueID uint8, packets []RawPacketData) (count uint16, err error) {
783         if len(packets) == 0 {
784                 return 0, nil
785         }
786
787         if int(queueID) >= len(memif.txQueueBufs) {
788                 return 0, ErrQueueID
789         }
790
791         var bufCount int
792         buffers := make([]*txPacketBuffer, 0)
793         cQueueID := C.uint16_t(queueID)
794
795         for _, packet := range packets {
796                 packetLen := len(packet)
797                 log.Debugf("%v - preparing packet with len %v", cQueueID, packetLen)
798
799                 if packetLen > memif.bufferSize {
800                         // Create jumbo buffer
801                         buffer := &txPacketBuffer{
802                                 size:    packetLen,
803                                 packets: []RawPacketData{packet},
804                         }
805
806                         buffers = append(buffers, buffer)
807
808                         // Increment bufCount by number of splits in this jumbo
809                         bufCount += (buffer.size + memif.bufferSize - 1) / memif.bufferSize
810                 } else {
811                         buffersLen := len(buffers)
812
813                         // This is very first buffer so there is no data to append to, prepare empty one
814                         if buffersLen == 0 {
815                                 buffers = []*txPacketBuffer{{}}
816                                 buffersLen = 1
817                         }
818
819                         lastBuffer := buffers[buffersLen-1]
820
821                         // Last buffer is jumbo buffer, create new buffer
822                         if lastBuffer.size > memif.bufferSize {
823                                 lastBuffer = &txPacketBuffer{}
824                                 buffers = append(buffers, lastBuffer)
825                         }
826
827                         // Determine buffer size by max packet size in buffer
828                         if packetLen > lastBuffer.size {
829                                 lastBuffer.size = packetLen
830                         }
831
832                         lastBuffer.packets = append(lastBuffer.packets, packet)
833                         bufCount += 1
834                 }
835         }
836
837         // Reallocate Tx buffers if needed to fit the input packets.
838         log.Debugf("%v - total buffer to allocate count %v", cQueueID, bufCount)
839         pb := &memif.txQueueBufs[queueID]
840         if pb.count < bufCount {
841                 newBuffers := C.realloc(unsafe.Pointer(pb.buffers), C.size_t(bufCount*int(C.sizeof_memif_buffer_t)))
842                 if newBuffers == nil {
843                         // Realloc failed, <count> will be less than len(packets).
844                         bufCount = pb.count
845                 } else {
846                         pb.buffers = (*C.memif_buffer_t)(newBuffers)
847                         pb.count = bufCount
848                 }
849         }
850
851         // Allocate ring slots.
852         var allocated C.uint16_t
853         var subCount C.uint16_t
854         for _, buffer := range buffers {
855                 packetCount := C.uint16_t(len(buffer.packets))
856                 isJumbo := buffer.size > memif.bufferSize
857
858                 log.Debugf("%v - trying to send max buff size %v, packets len %v, buffer len %v, jumbo %v",
859                         cQueueID, buffer.size, len(buffer.packets), packetCount, isJumbo)
860
861                 var nextFreeBuff *C.memif_buffer_t
862                 startOffset := allocated
863                 errCode := C.govpp_memif_buffer_alloc(memif.cHandle, cQueueID, pb.buffers, startOffset, &nextFreeBuff,
864                         packetCount, &allocated, C.uint16_t(buffer.size))
865
866                 err = getMemifError(int(errCode))
867                 endEarly := err == ErrNoBufRing
868                 if endEarly {
869                         // Not enough ring slots, <count> will be less than packetCount.
870                         err = nil
871                 }
872                 if err != nil {
873                         return 0, err
874                 }
875
876                 // Copy packet data into the buffers.
877                 nowAllocated := allocated - startOffset
878                 toFill := nowAllocated
879                 if !isJumbo {
880                         // If this is not jumbo frame, only 1 packet needs to be copied each iteration
881                         toFill = 1
882                 }
883
884                 // Iterate over all packets and try to fill them into allocated buffers
885                 // If packet is jumbo frame, continue filling to allocated buffers until no buffer is left
886                 for i, packet := range buffer.packets {
887                         if i >= int(nowAllocated) {
888                                 // There was less allocated buffers than actual packet count so exit early
889                                 break
890                         }
891
892                         packetData := unsafe.Pointer(&packet[0])
893                         C.govpp_copy_packet_data(nextFreeBuff, toFill, C.int(i), packetData, C.uint16_t(len(packet)))
894                 }
895
896                 if isJumbo && nowAllocated > 0 {
897                         // If we successfully allocated required amount of buffers for entire jumbo to be sent
898                         // simply sub entire amount of jumbo frame packets and leave only 1 so sender will think
899                         // it only sent 1 packet so it does not need to know anything about jumbo frames
900                         subCount += nowAllocated - 1
901                 }
902
903                 // If we do not have enough buffers left to allocate, simply end here to avoid packet loss and try
904                 // to handle it next burst
905                 if endEarly {
906                         break
907                 }
908         }
909
910         var sentCount C.uint16_t
911         errCode := C.memif_tx_burst(memif.cHandle, cQueueID, pb.buffers, allocated, &sentCount)
912         err = getMemifError(int(errCode))
913         if err != nil {
914                 return 0, err
915         }
916
917         // Prevent negative values
918         realSent := uint16(sentCount) - uint16(subCount)
919         if subCount > sentCount {
920                 sentCount = 0
921         }
922
923         log.Debugf("%v - sent %v total allocated buffs %v", cQueueID, sentCount, allocated)
924         return realSent, nil
925 }
926
927 // RxBurst is used to receive multiple packets in one call from a selected queue.
928 // <count> is the number of packets to receive. The actual number of packets
929 // received may be smaller. <count> effectively limits the maximum number
930 // of packets to receive in one burst (for a flat, predictable memory usage).
931 // The method is non-blocking even if there are no packets to receive.
932 // It is only valid to call this function if memif is in the connected state.
933 // Multiple RxBurst-s can run concurrently provided that each targets a different
934 // Rx queue.
935 func (memif *Memif) RxBurst(queueID uint8, count uint16) (packets []RawPacketData, err error) {
936         var recvCount C.uint16_t
937         packets = make([]RawPacketData, 0)
938
939         if count == 0 {
940                 return packets, nil
941         }
942
943         if int(queueID) >= len(memif.rxQueueBufs) {
944                 return packets, ErrQueueID
945         }
946
947         // Reallocate Rx buffers if needed to fit the output packets.
948         pb := &memif.rxQueueBufs[queueID]
949         bufCount := int(count)
950         if pb.count < bufCount {
951                 newBuffers := C.realloc(unsafe.Pointer(pb.buffers), C.size_t(bufCount*int(C.sizeof_memif_buffer_t)))
952                 if newBuffers == nil {
953                         // Realloc failed, len(<packets>) will be certainly less than <count>.
954                         bufCount = pb.count
955                 } else {
956                         pb.buffers = (*C.memif_buffer_t)(newBuffers)
957                         pb.count = bufCount
958                 }
959         }
960
961         cQueueID := C.uint16_t(queueID)
962         errCode := C.memif_rx_burst(memif.cHandle, cQueueID, pb.buffers, C.uint16_t(bufCount), &recvCount)
963         err = getMemifError(int(errCode))
964         if err == ErrNoBuf {
965                 // More packets to read - the user is expected to run RxBurst() until there
966                 // are no more packets to receive.
967                 err = nil
968         }
969         if err != nil {
970                 return packets, err
971         }
972
973         chained := len(pb.rxChainBuf) > 0
974         if chained {
975                 // We had stored data from previous burst because last buffer in previous burst was chained
976                 // so we need to continue appending to this data
977                 packets = pb.rxChainBuf
978                 pb.rxChainBuf = nil
979         }
980
981         // Copy packet data into the instances of RawPacketData.
982         for i := 0; i < int(recvCount); i++ {
983                 var packetSize C.int
984                 packetData := C.govpp_get_packet_data(pb.buffers, C.int(i), &packetSize)
985                 packetBytes := C.GoBytes(packetData, packetSize)
986
987                 if chained {
988                         // We have chained buffers, so start merging packet data with last read packet
989                         prevPacket := packets[len(packets)-1]
990                         packets[len(packets)-1] = append(prevPacket, packetBytes...)
991                 } else {
992                         packets = append(packets, packetBytes)
993                 }
994
995                 // Mark last buffer as chained based on property on current buffer so next buffers
996                 // will try to append data to this one in case we got jumbo frame
997                 chained = C.govpp_is_buffer_chained(pb.buffers, C.int(i)) > 0
998         }
999
1000         if recvCount > 0 {
1001                 errCode = C.memif_refill_queue(memif.cHandle, cQueueID, recvCount, 0)
1002         }
1003         err = getMemifError(int(errCode))
1004         if err != nil {
1005                 // Throw away packets to avoid duplicities.
1006                 packets = nil
1007         }
1008
1009         if chained {
1010                 // We did not had enough space to process all chained buffers to the end so simply tell
1011                 // reader that it should not process any packets here and save them for next burst
1012                 // to finish reading the buffer chain
1013                 pb.rxChainBuf = packets
1014                 packets = nil
1015                 err = ErrNoBuf
1016         }
1017
1018         return packets, err
1019 }
1020
1021 // Close removes the memif interface. If the memif is in the connected state,
1022 // the connection is first properly closed.
1023 // Do not access memif after it is closed, let garbage collector to remove it.
1024 func (memif *Memif) Close() error {
1025         log.WithField("ifName", memif.IfName).Debug("Closing the memif interface")
1026
1027         // Delete memif from C-libmemif.
1028         err := getMemifError(int(C.memif_delete(&memif.cHandle)))
1029
1030         if err != nil {
1031                 // Close memif-global interrupt channel.
1032                 close(memif.intCh)
1033                 close(memif.intErrCh)
1034                 // Close file descriptor stopQPollFd.
1035                 C.close(C.int(memif.stopQPollFd))
1036         }
1037
1038         context.lock.Lock()
1039         defer context.lock.Unlock()
1040         // Unregister the interface from the context.
1041         delete(context.memifs, memif.ifIndex)
1042         log.WithField("ifName", memif.IfName).Debug("memif interface was closed")
1043
1044         return err
1045 }
1046
1047 // initQueues allocates resources associated with Rx/Tx queues.
1048 func (memif *Memif) initQueues() error {
1049         // Get Rx/Tx queues count.
1050         details, err := memif.GetDetails()
1051         if err != nil {
1052                 return err
1053         }
1054
1055         log.WithFields(logger.Fields{
1056                 "ifName":   memif.IfName,
1057                 "Rx-count": len(details.RxQueues),
1058                 "Tx-count": len(details.TxQueues),
1059         }).Debug("Initializing Rx/Tx queues.")
1060
1061         // Initialize interrupt channels.
1062         var i int
1063         for i = 0; i < len(details.RxQueues); i++ {
1064                 queueIntCh := make(chan struct{}, 1)
1065                 memif.queueIntCh = append(memif.queueIntCh, queueIntCh)
1066         }
1067
1068         // Initialize Rx/Tx packet buffers.
1069         for i = 0; i < len(details.RxQueues); i++ {
1070                 memif.rxQueueBufs = append(memif.rxQueueBufs, CPacketBuffers{})
1071                 if !memif.IsMaster {
1072                         errCode := C.memif_refill_queue(memif.cHandle, C.uint16_t(i), C.uint16_t(memif.ringSize-1), 0)
1073                         err = getMemifError(int(errCode))
1074                         if err != nil {
1075                                 log.Warn(err.Error())
1076                         }
1077                 }
1078         }
1079         for i = 0; i < len(details.TxQueues); i++ {
1080                 memif.txQueueBufs = append(memif.txQueueBufs, CPacketBuffers{})
1081         }
1082
1083         return nil
1084 }
1085
1086 // closeQueues deallocates all resources associated with Rx/Tx queues.
1087 func (memif *Memif) closeQueues() {
1088         log.WithFields(logger.Fields{
1089                 "ifName":   memif.IfName,
1090                 "Rx-count": len(memif.rxQueueBufs),
1091                 "Tx-count": len(memif.txQueueBufs),
1092         }).Debug("Closing Rx/Tx queues.")
1093
1094         // Close interrupt channels.
1095         for _, ch := range memif.queueIntCh {
1096                 close(ch)
1097         }
1098         memif.queueIntCh = nil
1099
1100         // Deallocate Rx/Tx packet buffers.
1101         for _, pb := range memif.rxQueueBufs {
1102                 C.free(unsafe.Pointer(pb.buffers))
1103         }
1104         memif.rxQueueBufs = nil
1105         for _, pb := range memif.txQueueBufs {
1106                 C.free(unsafe.Pointer(pb.buffers))
1107         }
1108         memif.txQueueBufs = nil
1109 }
1110
1111 // pollEvents repeatedly polls for a libmemif event.
1112 func pollEvents() {
1113         defer context.wg.Done()
1114         for {
1115                 errCode := C.memif_poll_event(C.int(-1))
1116                 err := getMemifError(int(errCode))
1117                 if err == ErrPollCanceled {
1118                         return
1119                 }
1120         }
1121 }
1122
1123 // pollRxQueue repeatedly polls an Rx queue for interrupts.
1124 func pollRxQueue(memif *Memif, queueID uint8) {
1125         defer memif.wg.Done()
1126
1127         log.WithFields(logger.Fields{
1128                 "ifName":   memif.IfName,
1129                 "queue-ID": queueID,
1130         }).Debug("Started queue interrupt polling.")
1131
1132         var qfd C.int
1133         errCode := C.memif_get_queue_efd(memif.cHandle, C.uint16_t(queueID), &qfd)
1134         err := getMemifError(int(errCode))
1135         if err != nil {
1136                 log.WithField("err", err).Error("memif_get_queue_efd() failed")
1137                 return
1138         }
1139
1140         // Create epoll file descriptor.
1141         var event [1]syscall.EpollEvent
1142         epFd, err := syscall.EpollCreate1(0)
1143         if err != nil {
1144                 log.WithField("err", err).Error("epoll_create1() failed")
1145                 return
1146         }
1147         defer syscall.Close(epFd)
1148
1149         // Add Rx queue interrupt file descriptor.
1150         event[0].Events = syscall.EPOLLIN
1151         event[0].Fd = int32(qfd)
1152         if err = syscall.EpollCtl(epFd, syscall.EPOLL_CTL_ADD, int(qfd), &event[0]); err != nil {
1153                 log.WithField("err", err).Error("epoll_ctl() failed")
1154                 return
1155         }
1156
1157         // Add file descriptor used to stop this go routine.
1158         event[0].Events = syscall.EPOLLIN
1159         event[0].Fd = int32(memif.stopQPollFd)
1160         if err = syscall.EpollCtl(epFd, syscall.EPOLL_CTL_ADD, memif.stopQPollFd, &event[0]); err != nil {
1161                 log.WithField("err", err).Error("epoll_ctl() failed")
1162                 return
1163         }
1164
1165         // Poll for interrupts.
1166         for {
1167                 _, err := syscall.EpollWait(epFd, event[:], -1)
1168                 if err != nil {
1169                         errno, _ := err.(syscall.Errno)
1170                         //EINTR and EAGAIN should not be considered as a fatal error, try again
1171                         if errno == syscall.EINTR || errno == syscall.EAGAIN {
1172                                 continue
1173                         }
1174                         log.WithField("err", err).Error("epoll_wait() failed")
1175                         memif.intErrCh <- err
1176                         return
1177                 }
1178
1179                 // Handle Rx Interrupt.
1180                 if event[0].Fd == int32(qfd) {
1181                         // Consume the interrupt event.
1182                         buf := make([]byte, 8)
1183                         _, err = syscall.Read(int(qfd), buf[:])
1184                         if err != nil {
1185                                 log.WithField("err", err).Warn("read() failed")
1186                         }
1187
1188                         // Send signal to memif-global interrupt channel.
1189                         select {
1190                         case memif.intCh <- queueID:
1191                                 break
1192                         default:
1193                                 break
1194                         }
1195
1196                         // Send signal to queue-specific interrupt channel.
1197                         select {
1198                         case memif.queueIntCh[queueID] <- struct{}{}:
1199                                 break
1200                         default:
1201                                 break
1202                         }
1203                 }
1204
1205                 // Stop the go routine if requested.
1206                 if event[0].Fd == int32(memif.stopQPollFd) {
1207                         log.WithFields(logger.Fields{
1208                                 "ifName":   memif.IfName,
1209                                 "queue-ID": queueID,
1210                         }).Debug("Stopped queue interrupt polling.")
1211                         return
1212                 }
1213         }
1214 }
1215
1216 //export go_on_connect_callback
1217 func go_on_connect_callback(privateCtx unsafe.Pointer) C.int {
1218         log.Debug("go_on_connect_callback BEGIN")
1219         defer log.Debug("go_on_connect_callback END")
1220         context.lock.RLock()
1221         defer context.lock.RUnlock()
1222
1223         // Get memif reference.
1224         ifIndex := int(uintptr(privateCtx))
1225         memif, exists := context.memifs[ifIndex]
1226         if !exists {
1227                 return C.int(ErrNoConn.Code())
1228         }
1229
1230         // Initialize Rx/Tx queues.
1231         err := memif.initQueues()
1232         if err != nil {
1233                 if memifErr, ok := err.(*MemifError); ok {
1234                         return C.int(memifErr.Code())
1235                 }
1236                 return C.int(ErrUnknown.Code())
1237         }
1238
1239         // Call the user callback.
1240         if memif.callbacks.OnConnect != nil {
1241                 memif.callbacks.OnConnect(memif)
1242         }
1243
1244         // Start polling the RX queues for interrupts.
1245         for i := 0; i < len(memif.queueIntCh); i++ {
1246                 memif.wg.Add(1)
1247                 go pollRxQueue(memif, uint8(i))
1248         }
1249
1250         return C.int(0)
1251 }
1252
1253 //export go_on_disconnect_callback
1254 func go_on_disconnect_callback(privateCtx unsafe.Pointer) C.int {
1255         log.Debug("go_on_disconnect_callback BEGIN")
1256         defer log.Debug("go_on_disconnect_callback END")
1257         context.lock.RLock()
1258         defer context.lock.RUnlock()
1259
1260         // Get memif reference.
1261         ifIndex := int(uintptr(privateCtx))
1262         memif, exists := context.memifs[ifIndex]
1263         if !exists {
1264                 // Already closed.
1265                 return C.int(0)
1266         }
1267
1268         // Stop polling the RX queues for interrupts.
1269         buf := make([]byte, 8)
1270         binary.PutUvarint(buf, 1)
1271         // - add an event
1272         _, err := syscall.Write(memif.stopQPollFd, buf[:])
1273         if err != nil {
1274                 return C.int(ErrSyscall.Code())
1275         }
1276         // - wait
1277         memif.wg.Wait()
1278         // - remove the event
1279         _, err = syscall.Read(memif.stopQPollFd, buf[:])
1280         if err != nil {
1281                 return C.int(ErrSyscall.Code())
1282         }
1283
1284         // Call the user callback.
1285         if memif.callbacks.OnDisconnect != nil {
1286                 memif.callbacks.OnDisconnect(memif)
1287         }
1288
1289         // Close Rx/Tx queues.
1290         memif.closeQueues()
1291
1292         return C.int(0)
1293 }