initial tle_dring implementation 09/1909/2
authorKonstantin Ananyev <konstantin.ananyev@intel.com>
Thu, 7 Jul 2016 18:22:38 +0000 (19:22 +0100)
committerKonstantin Ananyev <konstantin.ananyev@intel.com>
Thu, 7 Jul 2016 22:34:40 +0000 (23:34 +0100)
The Dynamic Ring (dring) is a implementation of unbounded FIFO queue,
that supports lockless bulk enqueue/dequeue for multiple producers/consumers.
Internally it contains producer/consumer head/tail indexes
(same as DPDK rte_ring), plus linked list of Dynamic Ring Blocks (drb)s.
Each drb contains some metadata plus array of pointers to queued objects.
It is a caller responsibility to provide sufficient number of drbs for
enqueue operation, and manage unused drbs returned by dequeue operation.
dring features:
- FIFO (First In First Out)
- Lockless implementation.
- Multi- or single-consumer dequeue.
- Multi- or single-producer enqueue.
- Bulk dequeue.
- Bulk enqueue.

Change-Id: I3621c99c6b114a387036a397e79baa8d1588bdb5
Signed-off-by: Konstantin Ananyev <konstantin.ananyev@intel.com>
Makefile
lib/Makefile
lib/libtle_dring/Makefile [new file with mode: 0644]
lib/libtle_dring/dring.c [new file with mode: 0644]
lib/libtle_dring/tle_dring.h [new file with mode: 0644]
test/Makefile [new file with mode: 0644]
test/dring/Makefile [new file with mode: 0644]
test/dring/test_dring.c [new file with mode: 0644]

index c22527c..072b466 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -26,6 +26,7 @@ RTE_TARGET ?= x86_64-native-linuxapp-gcc
 
 DIRS-y += lib
 DIRS-y += examples
+DIRS-y += test
 
 MAKEFLAGS += --no-print-directory
 
index 56e8efb..8ce9bac 100644 (file)
@@ -21,6 +21,7 @@ endif
 
 include $(RTE_SDK)/mk/rte.vars.mk
 
+DIRS-y += libtle_dring
 DIRS-y += libtle_udp
 
 include $(TLDK_ROOT)/mk/tle.subdir.mk
diff --git a/lib/libtle_dring/Makefile b/lib/libtle_dring/Makefile
new file mode 100644 (file)
index 0000000..1f2c940
--- /dev/null
@@ -0,0 +1,38 @@
+# Copyright (c) 2016 Intel Corporation.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+ifeq ($(RTE_SDK),)
+$(error "Please define RTE_SDK environment variable")
+endif
+
+# Default target, can be overwritten by command line or environment
+RTE_TARGET ?= x86_64-native-linuxapp-gcc
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+# library name
+LIB = libtle_dring.a
+
+CFLAGS += -O3
+CFLAGS += $(WERROR_FLAGS) -I$(SRCDIR)
+
+EXPORT_MAP := tle_dring_version.map
+
+LIBABIVER := 1
+
+#source files
+SRCS-y += dring.c
+
+SYMLINK-y-include += tle_dring.h
+
+include $(RTE_SDK)/mk/rte.extlib.mk
diff --git a/lib/libtle_dring/dring.c b/lib/libtle_dring/dring.c
new file mode 100644 (file)
index 0000000..e0fae19
--- /dev/null
@@ -0,0 +1,101 @@
+/*
+ * Copyright (c) 2016  Intel Corporation.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <tle_dring.h>
+
+static const char *
+str_drb_dummy(const struct tle_dring *dr, const struct tle_drb *db)
+{
+       return (db == &dr->dummy) ? "<dummy>" : "";
+}
+
+static const char *
+str_obj_state(const struct tle_dring *dr, const struct tle_drb *db,
+       uint32_t idx)
+{
+       if (db->start + idx < dr->cons.tail)
+               return "<stale>";
+       else if (db->start + idx >= dr->prod.tail)
+               return "<free>";
+       else
+               return NULL;
+}
+
+static void
+drb_obj_dump(FILE *f, int32_t verb, const struct tle_dring *dr,
+       const struct tle_drb *db, uint32_t idx)
+{
+       const char *st;
+
+       st = str_obj_state(dr, db, idx);
+
+       /* pointer to object is valid, dump it. */
+       if (st == NULL)
+               fprintf(f, "\t\t\t\t%u:%p\n", db->start + idx, db->objs[idx]);
+
+       /* dump in verbose mode only. */
+       else if (verb > 0)
+               fprintf(f, "\t\t\t\t%u:%p%s\n",
+                               db->start + idx, db->objs[idx], st);
+}
+
+static void
+drb_dump(FILE *f, int32_t verb, const struct tle_dring *dr,
+       const struct tle_drb *db)
+{
+       uint32_t i;
+
+       fprintf(f, "\t\t@%p%s={\n", db, str_drb_dummy(dr, db));
+       fprintf(f, "\t\t\tnext=%p,\n", db->next);
+       fprintf(f, "\t\t\tsize=%u,\n", db->size);
+       fprintf(f, "\t\t\tstart=%u,\n", db->start);
+
+       fprintf(f, "\t\t\tobjs[]={\n");
+       for (i = 0; i != db->size; i++)
+               drb_obj_dump(f, verb, dr, db, i);
+       fprintf(f, "\t\t\t},\n");
+
+       fprintf(f, "\t\t},\n");
+}
+
+void
+tle_dring_dump(FILE *f, int32_t verb, const struct tle_dring *dr)
+{
+       struct tle_drb *db;
+
+       fprintf(f, "tle_dring@%p={\n", dr);
+       fprintf(f, "\tflags=%#x,\n", dr->flags);
+
+       fprintf(f, "\tprod={,\n");
+       fprintf(f, "\t\thead=%u,\n", dr->prod.head);
+       fprintf(f, "\t\ttail=%u,\n", dr->prod.tail);
+       fprintf(f, "\t\tcrb=%p%s,\n", dr->prod.crb,
+               str_drb_dummy(dr, dr->prod.crb));
+       fprintf(f, "\t},\n");
+
+       fprintf(f, "\tcons={,\n");
+       fprintf(f, "\t\thead=%u,\n", dr->cons.head);
+       fprintf(f, "\t\ttail=%u,\n", dr->cons.tail);
+       fprintf(f, "\t\tcrb=%p%s,\n", dr->cons.crb,
+               str_drb_dummy(dr, dr->cons.crb));
+       fprintf(f, "\t},\n");
+
+       fprintf(f, "\tdrbs[] = {\n");
+       for (db = dr->prod.crb; db != NULL; db = db->next)
+               drb_dump(f, verb, dr, db);
+       fprintf(f, "\t},\n");
+
+       fprintf(f, "};\n");
+}
diff --git a/lib/libtle_dring/tle_dring.h b/lib/libtle_dring/tle_dring.h
new file mode 100644 (file)
index 0000000..e89679d
--- /dev/null
@@ -0,0 +1,473 @@
+/*
+ * Copyright (c) 2016  Intel Corporation.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _TLE_DRING_H_
+#define _TLE_DRING_H_
+
+#include <string.h>
+
+#include <rte_common.h>
+#include <rte_atomic.h>
+#include <rte_memory.h>
+#include <rte_debug.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+/**
+ * @file
+ * TLE dring
+ *
+ * The Dynamic Ring (dring) is a implementation of unbounded FIFO queue,
+ * that supports lockless bulk enqueue/dequeue for multiple producers/consumers.
+ * Internally it is represented by linked list of Dynamic Ring Blocks (drb).
+ * Each drb contains some metadata plus array of pointers to queued objects.
+ * It is a caller responsibility to provide sufficient number of drbs for
+ * enqueue operation, and manage unused drbs returned by dequeue operation.
+ * dring features:
+ *
+ * - FIFO (First In First Out)
+ * - Lockless implementation.
+ * - Multi- or single-consumer dequeue.
+ * - Multi- or single-producer enqueue.
+ * - Bulk dequeue.
+ * - Bulk enqueue.
+ */
+
+/*
+ * RTE_ASSERT was introduced in DPDK 16.07.
+ * For older versions, use RTE_VERIFY.
+ */
+#ifdef RTE_ASSERT
+#define TLE_DRING_ASSERT(exp)  RTE_ASSERT(exp)
+#else
+#define TLE_DRING_ASSERT(exp)  RTE_VERIFY(exp)
+#endif
+
+struct tle_drb {
+       struct tle_drb *next;
+       void *udata;          /**< user data. */
+       uint32_t size;        /**< number of objects in that buffer. */
+       uint32_t start;       /**< start index for that block. */
+       const void *objs[0];
+} __rte_cache_aligned;
+
+struct tle_dring {
+       uint32_t flags;
+       struct  {
+               volatile uint32_t head;         /**< producer head */
+               volatile uint32_t tail;         /**< producer tail */
+               struct tle_drb * volatile crb;  /**< block to enqueue to */
+       } prod __rte_cache_aligned;
+       struct  {
+               volatile uint32_t head;         /**< consumer head */
+               volatile uint32_t tail;         /**< consumer tail */
+               struct tle_drb * volatile crb;  /**< block to dequeue from */
+       } cons __rte_cache_aligned;
+
+       struct tle_drb dummy;  /**< dummy block */
+};
+
+/*
+ * helper routine, to copy objects to/from the ring.
+ */
+static inline void __attribute__((always_inline))
+__tle_dring_copy_objs(const void *dst[], const void * const src[], uint32_t num)
+{
+       uint32_t i;
+
+       for (i = 0; i != RTE_ALIGN_FLOOR(num, 4); i += 4) {
+               dst[i] = src[i];
+               dst[i + 1] = src[i + 1];
+               dst[i + 2] = src[i + 2];
+               dst[i + 3] = src[i + 3];
+       }
+       switch (num % 4) {
+       case 3:
+               dst[i + 2] = src[i + 2];
+       case 2:
+               dst[i + 1] = src[i + 1];
+       case 1:
+               dst[i] = src[i];
+       }
+}
+
+/*
+ * helper routine, to enqueue objects into the ring.
+ */
+static inline uint32_t __attribute__((always_inline))
+__tle_dring_enqueue(struct tle_dring *dr, uint32_t head,
+       const void * const objs[], uint32_t nb_obj,
+       struct tle_drb *drbs[], uint32_t nb_drb)
+{
+       uint32_t i, j, k, n;
+       struct tle_drb *pb;
+
+       pb = dr->prod.crb;
+       i = 0;
+
+       /* fill the current producer block */
+       if (pb->size != 0) {
+               n = head - pb->start;
+               k = RTE_MIN(pb->size - n, nb_obj);
+               __tle_dring_copy_objs(pb->objs + n, objs, k);
+               i += k;
+       }
+
+       /* fill new blocks, if any */
+       j = 0;
+       if (i != nb_obj && nb_drb != 0) {
+
+               do {
+                       pb->next = drbs[j];
+                       pb = drbs[j];
+                       pb->start = head + i;
+                       k = RTE_MIN(pb->size, nb_obj - i);
+                       __tle_dring_copy_objs(pb->objs, objs + i, k);
+                       i += k;
+               } while (++j != nb_drb && i != nb_obj);
+
+               pb->next = NULL;
+
+               /* new procucer current block. */
+               dr->prod.crb = pb;
+       }
+
+       /* we have to enqueue all requested objects. */
+       TLE_DRING_ASSERT(nb_obj == i);
+
+       /* return number of unused blocks. */
+       return nb_drb - j;
+}
+
+/**
+ * Enqueue several objects on the dring (multi-producers safe).
+ * Note that it is a caller responsibility to provide enough drbs
+ * to enqueue all requested objects.
+ *
+ * @param dr
+ *   A pointer to the ring structure.
+ * @param objs
+ *   An array of pointers to objects to enqueue.
+ * @param nb_obj
+ *   The number of objects to add to the dring from the objs[].
+ * @param drbs
+ *   An array of pointers to the drbs that can be used by the dring
+ *   to perform enqueue operation.
+ * @param nb_drb
+ *   at input: number of elements in the drbs[] array.
+ *   at output: number of unused by the dring elements in the drbs[] array.
+ * @return
+ *   - number of enqueued objects.
+ */
+static inline uint32_t
+tle_dring_mp_enqueue(struct tle_dring *dr, const void * const objs[],
+       uint32_t nb_obj, struct tle_drb *drbs[], uint32_t *nb_drb)
+{
+       uint32_t head, next;
+
+       if (nb_obj == 0)
+               return 0;
+
+       /* reserve position inside the ring. */
+       do {
+               head = dr->prod.head;
+               next = head + nb_obj;
+       } while (rte_atomic32_cmpset(&dr->prod.head, head, next) == 0);
+
+       /*
+        * If there are other enqueues in progress that preceded that one,
+        * then wait for them to complete
+        */
+       while (dr->prod.tail != head)
+               rte_pause();
+
+       /* make sure that changes from previous updates are visible. */
+       rte_smp_rmb();
+
+       /* now it is safe to enqueue into the ring. */
+       *nb_drb = __tle_dring_enqueue(dr, head, objs, nb_obj, drbs, *nb_drb);
+
+       /* make new objects visible to the consumer. */
+       rte_smp_wmb();
+       dr->prod.tail = next;
+
+       return nb_obj;
+}
+
+/**
+ * Enqueue several objects on the dring (NOT multi-producers safe).
+ * Note that it is a caller responsibility to provide enough drbs
+ * to enqueue all requested objects.
+ *
+ * @param dr
+ *   A pointer to the ring structure.
+ * @param objs
+ *   An array of pointers to objects to enqueue.
+ * @param nb_obj
+ *   The number of objects to add to the dring from the objs[].
+ * @param drbs
+ *   An array of pointers to the drbs that can be used by the dring
+ *   to perform enqueue operation.
+ * @param nb_drb
+ *   at input: number of elements in the drbs[] array.
+ *   at output: number of unused by the dring elements in the drbs[] array.
+ * @return
+ *   - number of enqueued objects.
+ */
+static inline uint32_t
+tle_dring_sp_enqueue(struct tle_dring *dr, const void * const objs[],
+       uint32_t nb_obj, struct tle_drb *drbs[], uint32_t *nb_drb)
+{
+       uint32_t head, next;
+
+       if (nb_obj == 0)
+               return 0;
+
+       head = dr->prod.head;
+       next = head + nb_obj;
+
+       /* update producer head value. */
+       dr->prod.head = next;
+
+       /* enqueue into the ring. */
+       *nb_drb = __tle_dring_enqueue(dr, head, objs, nb_obj, drbs, *nb_drb);
+
+       /* make new objects visible to the consumer. */
+       rte_smp_wmb();
+       dr->prod.tail = next;
+
+       return nb_obj;
+}
+
+/*
+ * helper routine, to dequeue objects from the ring.
+ */
+static inline uint32_t __attribute__((always_inline))
+__tle_dring_dequeue(struct tle_dring *dr, uint32_t head,
+       const void *objs[], uint32_t nb_obj,
+       struct tle_drb *drbs[], uint32_t nb_drb)
+{
+       uint32_t i, j, k, n;
+       struct tle_drb *pb;
+
+       pb = dr->cons.crb;
+       i = 0;
+
+       /* copy from the current consumer block */
+       if (pb->size != 0) {
+               n = head - pb->start;
+               k = RTE_MIN(pb->size - n, nb_obj);
+               __tle_dring_copy_objs(objs, pb->objs + n, k);
+               i += k;
+       }
+
+       /* copy from other blocks */
+       j = 0;
+       if (i != nb_obj && nb_drb != 0) {
+
+               do {
+                       /* current block is empty, put it into the free list. */
+                       if (pb != &dr->dummy)
+                               drbs[j++] = pb;
+
+                       /* proceed to the next block. */
+                       pb = pb->next;
+                       k = RTE_MIN(pb->size, nb_obj - i);
+                       __tle_dring_copy_objs(objs + i, pb->objs, k);
+                       i += k;
+               } while (j != nb_drb && i != nb_obj);
+
+               /* new consumer currect block. */
+               dr->cons.crb = pb;
+       }
+
+       /* we have to dequeue all requested objects. */
+       TLE_DRING_ASSERT(nb_obj == i);
+
+       /* return number of blocks to free. */
+       return j;
+}
+
+/**
+ * Dequeue several objects from the dring (multi-consumers safe).
+ * Note, that it is a caller responsibility to provide drbs[] large
+ * enough to store pointers to all drbs that might become unused
+ * after that dequeue operation. It is a caller responsibility to manage
+ * unused drbs after the dequeue operation is completed
+ * (i.e mark them as free/reusable again, etc.).
+ *
+ * @param dr
+ *   A pointer to the ring structure.
+ * @param objs
+ *   An array of pointers to objects that will be dequeued.
+ * @param nb_obj
+ *   The number of objects to dequeue from the dring.
+ * @param drbs
+ *   An array of pointers to the drbs that will become unused after that
+ *   dequeue operation.
+ * @param nb_drb
+ *   at input: number of elements in the drbs[] array.
+ *   at output: number of filled entries in the drbs[] array.
+ * @return
+ *   - number of dequeued objects.
+ */
+static inline uint32_t
+tle_dring_mc_dequeue(struct tle_dring *dr, const void *objs[], uint32_t nb_obj,
+       struct tle_drb *drbs[], uint32_t *nb_drb)
+{
+       uint32_t head, next, num, tail;
+
+       /* move cons.head atomically */
+       do {
+               head = dr->cons.head;
+               tail = dr->prod.tail;
+
+               num = RTE_MIN(tail - head, nb_obj);
+
+               /* no objects to dequeue */
+               if (num == 0) {
+                       *nb_drb = 0;
+                       return 0;
+               }
+
+               next = head + num;
+       } while (rte_atomic32_cmpset(&dr->cons.head, head, next) == 0);
+
+       /*
+        * If there are other dequeues in progress that preceded that one,
+        * then wait for them to complete
+        */
+        while (dr->cons.tail != head)
+               rte_pause();
+
+       /* make sure that changes from previous updates are visible. */
+       rte_smp_rmb();
+
+       /* now it is safe to dequeue from the ring. */
+       *nb_drb = __tle_dring_dequeue(dr, head, objs, num, drbs, *nb_drb);
+
+       /* update consumer tail value. */
+       rte_smp_wmb();
+       dr->cons.tail = next;
+
+       return num;
+}
+
+/**
+ * Dequeue several objects from the dring (NOT multi-consumers safe).
+ * Note, that it is a caller responsibility to provide drbs[] large
+ * enough to store pointers to all drbs that might become unused
+ * after that dequeue operation. It is a caller responsibility to manage
+ * unused drbs after the dequeue operation is completed
+ * (i.e mark them as free/reusable again, etc.).
+ *
+ * @param dr
+ *   A pointer to the ring structure.
+ * @param objs
+ *   An array of pointers to objects that will be dequeued.
+ * @param nb_obj
+ *   The number of objects to dequeue from the dring.
+ * @param drbs
+ *   An array of pointers to the drbs that will become unused after that
+ *   dequeue operation.
+ * @param nb_drb
+ *   at input: number of elements in the drbs[] array.
+ *   at output: number of filled entries in the drbs[] array.
+ * @return
+ *   - number of dequeued objects.
+ */
+static inline uint32_t
+tle_dring_sc_dequeue(struct tle_dring *dr, const void *objs[], uint32_t nb_obj,
+       struct tle_drb *drbs[], uint32_t *nb_drb)
+{
+       uint32_t head, next, num, tail;
+
+       head = dr->cons.head;
+       tail = dr->prod.tail;
+
+       num = RTE_MIN(tail - head, nb_obj);
+
+       /* no objects to dequeue */
+       if (num == 0) {
+               *nb_drb = 0;
+               return 0;
+       }
+
+       next = head + num;
+
+       /* update consumer head value. */
+       dr->cons.head = next;
+
+       /* dequeue from the ring. */
+       *nb_drb = __tle_dring_dequeue(dr, head, objs, num, drbs, *nb_drb);
+
+       /* update consumer tail value. */
+       rte_smp_wmb();
+       dr->cons.tail = next;
+
+       return num;
+}
+
+/**
+ * Reset given dring to the initial state.
+ * Note, that information about all queued objects will be lost.
+ *
+ * @param dr
+ *   A pointer to the dring structure.
+ */
+static inline void
+tle_dring_reset(struct tle_dring *dr)
+{
+       memset(dr, 0, sizeof(*dr));
+       dr->prod.crb = &dr->dummy;
+       dr->cons.crb = &dr->dummy;
+}
+
+/**
+ * Calculate required size for drb to store up to *num* objects.
+ *
+ * @param num
+ *   Number of objects drb should be able to store.
+ * @return
+ *   - required size of the drb.
+ */
+static inline size_t
+tle_drb_calc_size(uint32_t num)
+{
+       size_t sz;
+
+       sz = offsetof(struct tle_drb, objs[num]);
+       return RTE_ALIGN_CEIL(sz, RTE_CACHE_LINE_SIZE);
+}
+
+/**
+ * Dump information about the dring to the file.
+ *
+ * @param f
+ *   A pointer to the file.
+ * @param verb
+ *   Verbosity level (currently only 0 or 1).
+ * @param dr
+ *   A pointer to the dring structure.
+ */
+extern void tle_dring_dump(FILE *f, int32_t verb, const struct tle_dring *dr);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* _TLE_DRING_H_ */
diff --git a/test/Makefile b/test/Makefile
new file mode 100644 (file)
index 0000000..fb446b4
--- /dev/null
@@ -0,0 +1,26 @@
+# Copyright (c) 2016 Intel Corporation.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+ifeq ($(RTE_SDK),)
+$(error "Please define RTE_SDK environment variable")
+endif
+
+ifeq ($(TLDK_ROOT),)
+$(error "Please define TLDK_SDK environment variable")
+endif
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+DIRS-y += dring
+
+include $(TLDK_ROOT)/mk/tle.subdir.mk
diff --git a/test/dring/Makefile b/test/dring/Makefile
new file mode 100644 (file)
index 0000000..6822a1f
--- /dev/null
@@ -0,0 +1,42 @@
+# Copyright (c) 2016 Intel Corporation.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+ifeq ($(RTE_SDK),)
+$(error "Please define RTE_SDK environment variable")
+endif
+
+ifeq ($(RTE_TARGET),)
+$(error "Please define RTE_TARGET environment variable")
+endif
+
+ifeq ($(TLDK_ROOT),)
+$(error "Please define TLDK_ROOT environment variable")
+endif
+
+include $(RTE_SDK)/mk/rte.vars.mk
+
+# binary name
+APP = test_dring
+
+# all source are stored in SRCS-y
+SRCS-y += test_dring.c
+
+CFLAGS += $(WERROR_FLAGS)
+CFLAGS += -I$(TLDK_ROOT)/$(RTE_TARGET)/include
+
+LDLIBS += -L$(TLDK_ROOT)/$(RTE_TARGET)/lib
+LDLIBS += -ltle_dring
+
+EXTRA_CFLAGS += -O3
+
+include $(RTE_SDK)/mk/rte.extapp.mk
diff --git a/test/dring/test_dring.c b/test/dring/test_dring.c
new file mode 100644 (file)
index 0000000..22d0db4
--- /dev/null
@@ -0,0 +1,525 @@
+/*
+ * Copyright (c) 2016  Intel Corporation.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <string.h>
+#include <stdarg.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <inttypes.h>
+#include <errno.h>
+
+#include <rte_common.h>
+#include <rte_log.h>
+#include <rte_errno.h>
+#include <rte_launch.h>
+#include <rte_cycles.h>
+#include <rte_eal.h>
+#include <rte_per_lcore.h>
+#include <rte_lcore.h>
+#include <rte_ring.h>
+#include <tle_dring.h>
+#include <rte_random.h>
+
+#define OBJ_NUM                UINT16_MAX
+#define ITER_NUM       (4 * OBJ_NUM)
+
+enum {
+       NONE,
+       SINGLE,
+       MULTI,
+};
+
+struct dring_arg {
+       struct tle_dring *dr;
+       struct rte_ring *r;
+       uint32_t iter;
+       int32_t enq_type;
+       int32_t deq_type;
+       uint32_t enq;
+       uint32_t deq;
+};
+
+/*
+ * free memory allocated for drbs and for the ring itself.
+ */
+static void
+fini_drb_ring(struct rte_ring *r)
+{
+       struct tle_drb *drb;
+
+       /* free drbs. */
+       while (rte_ring_dequeue(r, (void **)&drb) == 0)
+               free(drb);
+
+       /* free ring. */
+       free(r);
+}
+
+/*
+ * allocate drbs for specified number of objects, put them into the ring.
+ */
+static struct rte_ring *
+init_drb_ring(uint32_t num)
+{
+       uint32_t i, k, n;
+       size_t sz, tsz;
+       struct rte_ring *r;
+       struct tle_drb *drb;
+
+       /* allocate and initialise rte_ring. */
+
+       n = rte_align32pow2(num);
+       sz =  sizeof(*r) + n * sizeof(r->ring[0]);
+
+       r = calloc(1, sz);
+       if (r == NULL) {
+               printf("%s:%d(%u) failed to allocate %zu bytes;\n",
+                       __func__, __LINE__, num, sz);
+               return NULL;
+       }
+
+       rte_ring_init(r, __func__, n, 0);
+
+       /* allocate drbs and put them into the ring. */
+
+       tsz = sz;
+       for (i = 0; i != num; i += k) {
+               k =  rte_rand() % (UINT8_MAX + 1) + 1;
+               k = RTE_MIN(k, num - i);
+               sz = tle_drb_calc_size(k);
+               drb = calloc(1, sz);
+               if (drb == NULL) {
+                       printf("%s:%d(%u) %u-th iteration: "
+                               "failed to allocate %zu bytes;\n",
+                               __func__, __LINE__, num, i, sz);
+                       fini_drb_ring(r);
+                       return NULL;
+               }
+               drb->size = k;
+               rte_ring_enqueue(r, drb);
+               tsz += sz;
+       }
+
+       printf("%s(%u) total %zu bytes allocated, number of drbs: %u;\n",
+               __func__, num, tsz, rte_ring_count(r));
+       return r;
+}
+
+/*
+ * Each enqueued object will contain:
+ * [2-3]B: it's own sequence number.
+ * [0-1]B: next object sequence number, or UINT16_MAX.
+ */
+static void
+test_fill_obj(uintptr_t obj[], uint32_t num)
+{
+       uint32_t i;
+
+       for (i = 0; i != num - 1; i++)
+               obj[i] = i << 16 | (i + 1);
+
+       obj[i] = i << 16 | UINT16_MAX;
+}
+
+static uint32_t
+test_check_obj(uintptr_t obj[], uint32_t num)
+{
+       uint32_t i, h, l, oh, ol;
+
+       h = obj[0] >> 16;
+       l = obj[0] & UINT16_MAX;
+
+       if (h + 1 != l && l != UINT16_MAX)
+               return 0;
+
+       if (l == UINT16_MAX)
+               l = 0;
+
+       for (i = 1; i != num; i++) {
+
+               oh = obj[i] >> 16;
+               ol = obj[i] & UINT16_MAX;
+
+               if (l != oh || (oh + 1 != ol && ol != UINT16_MAX))
+                       return i;
+
+               l = ol;
+               if (l == UINT16_MAX)
+                       l = 0;
+       }
+
+       return num;
+}
+
+static int
+test_dring_dequeue(struct tle_dring *dr, struct rte_ring *r, uint32_t num,
+       int32_t type)
+{
+       uint32_t i, k, lc, n, t;
+       struct tle_drb *drb[num];
+       uintptr_t obj[num];
+
+       lc = rte_lcore_id();
+       k = num;
+
+       /* dequeue objects. */
+       if (type == SINGLE)
+               n = tle_dring_sc_dequeue(dr, (const void **)obj, num, drb, &k);
+       else if (type == MULTI)
+               n = tle_dring_mc_dequeue(dr, (const void **)obj, num, drb, &k);
+       else
+               return -EINVAL;
+
+       if (n == 0)
+               return 0;
+
+       /* check the data returned. */
+       t = test_check_obj(obj, n);
+       if (t != n) {
+               printf("%s:%d(%p, %u) at lcore %u: invalid dequeued object, "
+                       "n=%u, idx=%u, obj=%#x, prev obj=%#x;\n",
+                       __func__, __LINE__, dr, num, lc, n, t,
+                       (uint32_t)obj[t], (t == 0) ? 0 : (uint32_t)obj[t - 1]);
+               return -EFAULT;
+       }
+
+       /* check and free drbs. */
+       for (i = 0; i != k; i++) {
+               /* udata value for drb in use shouldn't be zero. */
+               if (drb[i]->udata == NULL) {
+                       printf("error @ %s:%d(%p, %u) at lcore %u: "
+                               "erroneous drb@%p={udata=%p, size=%u,};\n",
+                               __func__, __LINE__, dr, num, lc, drb[i],
+                               drb[i]->udata, drb[i]->size);
+                       return -EFAULT;
+               }
+               drb[i]->udata = NULL;
+               rte_ring_enqueue(r, drb[i]);
+       }
+
+       return n;
+}
+
+static int
+test_dring_enqueue(struct tle_dring *dr, struct rte_ring *r, uint32_t num,
+       int32_t type)
+{
+       uint32_t i, j, k, lc, nb;
+       struct tle_drb *drb[num];
+       uintptr_t obj[num];
+
+       lc = rte_lcore_id();
+
+       /* prepare drbs to enqueue up to *num* objects. */
+       for (i = 0, j = 0; i != num; i += k, j++) {
+
+               if (rte_ring_dequeue(r, (void **)&drb[j]) != 0)
+                       break;
+
+               /* udata value for unused drb should be zero. */
+               if (drb[j]->udata != NULL) {
+                       printf("error @ %s:%d(%p, %u) at lcore %u: "
+                               "erroneous drb@%p={udata=%p, size=%u,};\n",
+                               __func__, __LINE__, dr, num, lc, drb[j],
+                               drb[j]->udata, drb[j]->size);
+                       return -EFAULT;
+               }
+
+               /* update udata value with current lcore id. */
+               drb[j]->udata = (void *)(uintptr_t)(lc + 1);
+               k = drb[j]->size;
+               k = RTE_MIN(k, num - i);
+       }
+
+       /* no free drbs left. */
+       if (i == 0)
+               return 0;
+
+       /* fill objects to enqueue. */
+       test_fill_obj(obj, i);
+
+       /* enqueue into the dring. */
+       nb = j;
+       if (type == SINGLE)
+               k = tle_dring_sp_enqueue(dr, (const void **)obj, i, drb, &nb);
+       else if (type == MULTI)
+               k = tle_dring_mp_enqueue(dr, (const void **)obj, i, drb, &nb);
+       else
+               return -EINVAL;
+
+       if (k != i) {
+               printf("%s:%d(%p, %p, %u): failed to enqueue %u objects;\n",
+                       __func__, __LINE__, dr, r, num, i);
+       }
+
+       /* free unused drbs */
+       for (i = j - nb; i != j; i++) {
+               if ((uintptr_t)drb[i]->udata != lc + 1) {
+                       printf("error @ %s:%d(%p, %u) at lcore %u: "
+                               "erroneous drb@%p={udata=%p, size=%u,};\n",
+                               __func__, __LINE__, dr, num, lc, drb[i],
+                               drb[i]->udata, drb[i]->size);
+                       return -EFAULT;
+               }
+               drb[i]->udata = NULL;
+               rte_ring_enqueue(r, drb[i]);
+       }
+
+       return k;
+}
+
+static int
+test_dring_enq_deq(struct dring_arg *arg)
+{
+       int32_t rc;
+       uint32_t i, lc, n;
+
+       rc = 0;
+       arg->enq = 0;
+       arg->deq = 0;
+       lc = rte_lcore_id();
+
+       for (i = 0; i != arg->iter; i++) {
+
+               /* try to enqueue random number of objects. */
+               if (arg->enq_type != NONE) {
+                       n = rte_rand() % (UINT8_MAX + 1);
+                       rc = test_dring_enqueue(arg->dr, arg->r, n,
+                               arg->enq_type);
+                       if (rc < 0)
+                               break;
+                       arg->enq += rc;
+               }
+
+               /* try to dequeue random number of objects. */
+               if (arg->deq_type != NONE) {
+                       n = rte_rand() % (UINT8_MAX + 1);
+                       rc = test_dring_dequeue(arg->dr, arg->r, n,
+                               arg->deq_type);
+                       if (rc < 0)
+                               break;
+                       arg->deq += rc;
+               }
+       }
+
+       if (rc < 0)
+               return rc;
+
+       /* dequeue remaining objects. */
+       while (arg->deq_type != NONE && arg->enq != arg->deq) {
+
+               /* try to dequeue random number of objects. */
+               n = rte_rand() % (UINT8_MAX + 1) + 1;
+               rc = test_dring_dequeue(arg->dr, arg->r, n, arg->deq_type);
+               if (rc <= 0)
+                       break;
+               arg->deq += rc;
+       }
+
+       printf("%s:%d(lcore=%u, enq_type=%d, deq_type=%d): "
+               "%u objects enqueued, %u objects dequeued\n",
+               __func__, __LINE__, lc, arg->enq_type, arg->deq_type,
+               arg->enq, arg->deq);
+       return 0;
+}
+
+/*
+ * enqueue/dequeue by single thread.
+ */
+static int
+test_dring_st(void)
+{
+       int32_t rc;
+       struct rte_ring *r;
+       struct tle_dring dr;
+       struct dring_arg arg;
+
+       printf("%s started;\n", __func__);
+
+       tle_dring_reset(&dr);
+       r = init_drb_ring(OBJ_NUM);
+       if (r == NULL)
+               return -ENOMEM;
+
+       tle_dring_dump(stdout, 1, &dr);
+
+       memset(&arg, 0, sizeof(arg));
+       arg.dr = &dr;
+       arg.r = r;
+       arg.iter = ITER_NUM;
+       arg.enq_type = SINGLE;
+       arg.deq_type = SINGLE;
+       rc = test_dring_enq_deq(&arg);
+
+       rc = (rc != 0) ? rc : (arg.enq != arg.deq);
+       printf("%s finished with status: %s(%d);\n",
+               __func__, strerror(-rc), rc);
+
+       tle_dring_dump(stdout, rc != 0, &dr);
+       fini_drb_ring(r);
+
+       return rc;
+}
+
+static int
+test_dring_worker(void *arg)
+{
+       struct dring_arg *p;
+
+       p = (struct dring_arg *)arg;
+       return test_dring_enq_deq(p);
+}
+
+/*
+ * enqueue/dequeue by multiple threads.
+ */
+static int
+test_dring_mt(int32_t master_enq_type, int32_t master_deq_type,
+       int32_t slave_enq_type, int32_t slave_deq_type)
+{
+       int32_t rc;
+       uint32_t lc;
+       uint64_t deq, enq;
+       struct rte_ring *r;
+       struct tle_dring dr;
+       struct dring_arg arg[RTE_MAX_LCORE];
+
+       tle_dring_reset(&dr);
+       r = init_drb_ring(OBJ_NUM);
+       if (r == NULL)
+               return -ENOMEM;
+
+       memset(arg, 0, sizeof(arg));
+
+       /* launch on all slaves */
+       RTE_LCORE_FOREACH_SLAVE(lc) {
+               arg[lc].dr = &dr;
+               arg[lc].r = r;
+               arg[lc].iter = ITER_NUM;
+               arg[lc].enq_type = slave_enq_type;
+               arg[lc].deq_type = slave_deq_type;
+               rte_eal_remote_launch(test_dring_worker, &arg[lc], lc);
+       }
+
+       /* launch on master */
+       lc = rte_lcore_id();
+       arg[lc].dr = &dr;
+       arg[lc].r = r;
+       arg[lc].iter = ITER_NUM;
+       arg[lc].enq_type = master_enq_type;
+       arg[lc].deq_type = master_deq_type;
+       rc = test_dring_worker(&arg[lc]);
+       enq = arg[lc].enq;
+       deq = arg[lc].deq;
+
+       /* wait for slaves. */
+       RTE_LCORE_FOREACH_SLAVE(lc) {
+               rc |= rte_eal_wait_lcore(lc);
+               enq += arg[lc].enq;
+               deq += arg[lc].deq;
+       }
+
+       printf("%s:%d: total %" PRIu64 " objects enqueued, %"
+               PRIu64 " objects dequeued\n",
+               __func__, __LINE__, enq, deq);
+
+       rc = (rc != 0) ? rc : (enq != deq);
+       if (rc != 0)
+               tle_dring_dump(stdout, 1, &dr);
+
+       fini_drb_ring(r);
+       return rc;
+}
+
+static int
+test_dring_mp_mc(void)
+{
+       int32_t rc;
+
+       printf("%s started;\n", __func__);
+       rc = test_dring_mt(MULTI, MULTI, MULTI, MULTI);
+       printf("%s finished with status: %s(%d);\n",
+               __func__, strerror(-rc), rc);
+       return rc;
+}
+
+static int
+test_dring_mp_sc(void)
+{
+       int32_t rc;
+
+       printf("%s started;\n", __func__);
+       rc = test_dring_mt(MULTI, SINGLE, MULTI, NONE);
+       printf("%s finished with status: %s(%d);\n",
+               __func__, strerror(-rc), rc);
+       return rc;
+}
+
+static int
+test_dring_sp_mc(void)
+{
+       int32_t rc;
+
+       printf("%s started;\n", __func__);
+       rc = test_dring_mt(SINGLE, MULTI, NONE, MULTI);
+       printf("%s finished with status: %s(%d);\n",
+               __func__, strerror(-rc), rc);
+       return rc;
+}
+
+static int
+test_dring(void)
+{
+       int32_t rc;
+
+       rc = test_dring_st();
+       if (rc != 0)
+               return rc;
+
+       rc = test_dring_mp_mc();
+       if (rc != 0)
+               return rc;
+
+       rc = test_dring_mp_sc();
+       if (rc != 0)
+               return rc;
+
+       rc = test_dring_sp_mc();
+       if (rc != 0)
+               return rc;
+
+       return 0;
+}
+
+int
+main(int argc, char *argv[])
+{
+       int32_t rc;
+
+       rc = rte_eal_init(argc, argv);
+       if (rc < 0)
+               rte_exit(EXIT_FAILURE,
+                       "%s: rte_eal_init failed with error code: %d\n",
+                       __func__, rc);
+
+       rc = test_dring();
+       if (rc != 0)
+               printf("TEST FAILED\n");
+       else
+               printf("TEST OK\n");
+
+       return rc;
+}