New upstream version 16.11.5
[deb_dpdk.git] / app / test / test_ring_perf.c
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33
34
35 #include <stdio.h>
36 #include <inttypes.h>
37 #include <rte_ring.h>
38 #include <rte_cycles.h>
39 #include <rte_launch.h>
40
41 #include "test.h"
42
43 /*
44  * Ring
45  * ====
46  *
47  * Measures performance of various operations using rdtsc
48  *  * Empty ring dequeue
49  *  * Enqueue/dequeue of bursts in 1 threads
50  *  * Enqueue/dequeue of bursts in 2 threads
51  */
52
/* Name and size of the ring under test; MAX_BURST bounds the scratch array. */
#define RING_NAME "RING_PERF"
#define RING_SIZE 4096
#define MAX_BURST 32

/*
 * the sizes to enqueue and dequeue in testing
 * (marked volatile so they won't be seen as compile-time constants)
 */
static const volatile unsigned bulk_sizes[] = { 8, 32 };

/* A pair of lcore ids on which the two-thread tests are run. */
struct lcore_pair {
	unsigned c1, c2;
};

/*
 * Start barrier for the paired enqueue/dequeue workers: each worker
 * atomically increments this and spins until it reaches 2, so both
 * timing loops start together. Reset to 0 before each pairing.
 */
static volatile unsigned lcore_count = 0;
68
69 /**** Functions to analyse our core mask to get cores for different tests ***/
70
71 static int
72 get_two_hyperthreads(struct lcore_pair *lcp)
73 {
74         unsigned id1, id2;
75         unsigned c1, c2, s1, s2;
76         RTE_LCORE_FOREACH(id1) {
77                 /* inner loop just re-reads all id's. We could skip the first few
78                  * elements, but since number of cores is small there is little point
79                  */
80                 RTE_LCORE_FOREACH(id2) {
81                         if (id1 == id2)
82                                 continue;
83                         c1 = lcore_config[id1].core_id;
84                         c2 = lcore_config[id2].core_id;
85                         s1 = lcore_config[id1].socket_id;
86                         s2 = lcore_config[id2].socket_id;
87                         if ((c1 == c2) && (s1 == s2)){
88                                 lcp->c1 = id1;
89                                 lcp->c2 = id2;
90                                 return 0;
91                         }
92                 }
93         }
94         return 1;
95 }
96
97 static int
98 get_two_cores(struct lcore_pair *lcp)
99 {
100         unsigned id1, id2;
101         unsigned c1, c2, s1, s2;
102         RTE_LCORE_FOREACH(id1) {
103                 RTE_LCORE_FOREACH(id2) {
104                         if (id1 == id2)
105                                 continue;
106                         c1 = lcore_config[id1].core_id;
107                         c2 = lcore_config[id2].core_id;
108                         s1 = lcore_config[id1].socket_id;
109                         s2 = lcore_config[id2].socket_id;
110                         if ((c1 != c2) && (s1 == s2)){
111                                 lcp->c1 = id1;
112                                 lcp->c2 = id2;
113                                 return 0;
114                         }
115                 }
116         }
117         return 1;
118 }
119
120 static int
121 get_two_sockets(struct lcore_pair *lcp)
122 {
123         unsigned id1, id2;
124         unsigned s1, s2;
125         RTE_LCORE_FOREACH(id1) {
126                 RTE_LCORE_FOREACH(id2) {
127                         if (id1 == id2)
128                                 continue;
129                         s1 = lcore_config[id1].socket_id;
130                         s2 = lcore_config[id2].socket_id;
131                         if (s1 != s2){
132                                 lcp->c1 = id1;
133                                 lcp->c2 = id2;
134                                 return 0;
135                         }
136                 }
137         }
138         return 1;
139 }
140
141 /* Get cycle counts for dequeuing from an empty ring. Should be 2 or 3 cycles */
142 static void
143 test_empty_dequeue(struct rte_ring *r)
144 {
145         const unsigned iter_shift = 26;
146         const unsigned iterations = 1<<iter_shift;
147         unsigned i = 0;
148         void *burst[MAX_BURST];
149
150         const uint64_t sc_start = rte_rdtsc();
151         for (i = 0; i < iterations; i++)
152                 rte_ring_sc_dequeue_bulk(r, burst, bulk_sizes[0]);
153         const uint64_t sc_end = rte_rdtsc();
154
155         const uint64_t mc_start = rte_rdtsc();
156         for (i = 0; i < iterations; i++)
157                 rte_ring_mc_dequeue_bulk(r, burst, bulk_sizes[0]);
158         const uint64_t mc_end = rte_rdtsc();
159
160         printf("SC empty dequeue: %.2F\n",
161                         (double)(sc_end-sc_start) / iterations);
162         printf("MC empty dequeue: %.2F\n",
163                         (double)(mc_end-mc_start) / iterations);
164 }
165
/*
 * for the separate enqueue and dequeue threads they take in one param
 * and return two. Input = burst size, output = cycle average for sp/sc & mp/mc
 */
struct thread_params {
	struct rte_ring *r;   /* the ring both threads operate on */
	unsigned size;        /* input value, the burst size */
	double spsc, mpmc;    /* output value, the single or multi timings */
};
175
176 /*
177  * Function that uses rdtsc to measure timing for ring enqueue. Needs pair
178  * thread running dequeue_bulk function
179  */
180 static int
181 enqueue_bulk(void *p)
182 {
183         const unsigned iter_shift = 23;
184         const unsigned iterations = 1<<iter_shift;
185         struct thread_params *params = p;
186         struct rte_ring *r = params->r;
187         const unsigned size = params->size;
188         unsigned i;
189         void *burst[MAX_BURST] = {0};
190
191         if ( __sync_add_and_fetch(&lcore_count, 1) != 2 )
192                 while(lcore_count != 2)
193                         rte_pause();
194
195         const uint64_t sp_start = rte_rdtsc();
196         for (i = 0; i < iterations; i++)
197                 while (rte_ring_sp_enqueue_bulk(r, burst, size) != 0)
198                         rte_pause();
199         const uint64_t sp_end = rte_rdtsc();
200
201         const uint64_t mp_start = rte_rdtsc();
202         for (i = 0; i < iterations; i++)
203                 while (rte_ring_mp_enqueue_bulk(r, burst, size) != 0)
204                         rte_pause();
205         const uint64_t mp_end = rte_rdtsc();
206
207         params->spsc = ((double)(sp_end - sp_start))/(iterations*size);
208         params->mpmc = ((double)(mp_end - mp_start))/(iterations*size);
209         return 0;
210 }
211
212 /*
213  * Function that uses rdtsc to measure timing for ring dequeue. Needs pair
214  * thread running enqueue_bulk function
215  */
216 static int
217 dequeue_bulk(void *p)
218 {
219         const unsigned iter_shift = 23;
220         const unsigned iterations = 1<<iter_shift;
221         struct thread_params *params = p;
222         struct rte_ring *r = params->r;
223         const unsigned size = params->size;
224         unsigned i;
225         void *burst[MAX_BURST] = {0};
226
227         if ( __sync_add_and_fetch(&lcore_count, 1) != 2 )
228                 while(lcore_count != 2)
229                         rte_pause();
230
231         const uint64_t sc_start = rte_rdtsc();
232         for (i = 0; i < iterations; i++)
233                 while (rte_ring_sc_dequeue_bulk(r, burst, size) != 0)
234                         rte_pause();
235         const uint64_t sc_end = rte_rdtsc();
236
237         const uint64_t mc_start = rte_rdtsc();
238         for (i = 0; i < iterations; i++)
239                 while (rte_ring_mc_dequeue_bulk(r, burst, size) != 0)
240                         rte_pause();
241         const uint64_t mc_end = rte_rdtsc();
242
243         params->spsc = ((double)(sc_end - sc_start))/(iterations*size);
244         params->mpmc = ((double)(mc_end - mc_start))/(iterations*size);
245         return 0;
246 }
247
248 /*
249  * Function that calls the enqueue and dequeue bulk functions on pairs of cores.
250  * used to measure ring perf between hyperthreads, cores and sockets.
251  */
252 static void
253 run_on_core_pair(struct lcore_pair *cores, struct rte_ring *r,
254                 lcore_function_t f1, lcore_function_t f2)
255 {
256         struct thread_params param1 = {0}, param2 = {0};
257         unsigned i;
258         for (i = 0; i < sizeof(bulk_sizes)/sizeof(bulk_sizes[0]); i++) {
259                 lcore_count = 0;
260                 param1.size = param2.size = bulk_sizes[i];
261                 param1.r = param2.r = r;
262                 if (cores->c1 == rte_get_master_lcore()) {
263                         rte_eal_remote_launch(f2, &param2, cores->c2);
264                         f1(&param1);
265                         rte_eal_wait_lcore(cores->c2);
266                 } else {
267                         rte_eal_remote_launch(f1, &param1, cores->c1);
268                         rte_eal_remote_launch(f2, &param2, cores->c2);
269                         rte_eal_wait_lcore(cores->c1);
270                         rte_eal_wait_lcore(cores->c2);
271                 }
272                 printf("SP/SC bulk enq/dequeue (size: %u): %.2F\n", bulk_sizes[i],
273                                 param1.spsc + param2.spsc);
274                 printf("MP/MC bulk enq/dequeue (size: %u): %.2F\n", bulk_sizes[i],
275                                 param1.mpmc + param2.mpmc);
276         }
277 }
278
/*
 * Test function that determines how long an enqueue + dequeue of a single item
 * takes on a single lcore. Result is for comparison with the bulk enq+deq.
 */
static void
test_single_enqueue_dequeue(struct rte_ring *r)
{
	const unsigned iter_shift = 24;
	const unsigned iterations = 1 << iter_shift;
	void *obj = NULL;
	uint64_t start, sc_cycles, mc_cycles;
	unsigned i;

	start = rte_rdtsc();
	for (i = 0; i < iterations; i++) {
		rte_ring_sp_enqueue(r, obj);
		rte_ring_sc_dequeue(r, &obj);
	}
	sc_cycles = rte_rdtsc() - start;

	start = rte_rdtsc();
	for (i = 0; i < iterations; i++) {
		rte_ring_mp_enqueue(r, obj);
		rte_ring_mc_dequeue(r, &obj);
	}
	mc_cycles = rte_rdtsc() - start;

	/* the right-shift divides the totals by the iteration count */
	printf("SP/SC single enq/dequeue: %"PRIu64"\n",
			sc_cycles >> iter_shift);
	printf("MP/MC single enq/dequeue: %"PRIu64"\n",
			mc_cycles >> iter_shift);
}
310
311 /*
312  * Test that does both enqueue and dequeue on a core using the burst() API calls
313  * instead of the bulk() calls used in other tests. Results should be the same
314  * as for the bulk function called on a single lcore.
315  */
316 static void
317 test_burst_enqueue_dequeue(struct rte_ring *r)
318 {
319         const unsigned iter_shift = 23;
320         const unsigned iterations = 1<<iter_shift;
321         unsigned sz, i = 0;
322         void *burst[MAX_BURST] = {0};
323
324         for (sz = 0; sz < sizeof(bulk_sizes)/sizeof(bulk_sizes[0]); sz++) {
325                 const uint64_t sc_start = rte_rdtsc();
326                 for (i = 0; i < iterations; i++) {
327                         rte_ring_sp_enqueue_burst(r, burst, bulk_sizes[sz]);
328                         rte_ring_sc_dequeue_burst(r, burst, bulk_sizes[sz]);
329                 }
330                 const uint64_t sc_end = rte_rdtsc();
331
332                 const uint64_t mc_start = rte_rdtsc();
333                 for (i = 0; i < iterations; i++) {
334                         rte_ring_mp_enqueue_burst(r, burst, bulk_sizes[sz]);
335                         rte_ring_mc_dequeue_burst(r, burst, bulk_sizes[sz]);
336                 }
337                 const uint64_t mc_end = rte_rdtsc();
338
339                 uint64_t mc_avg = ((mc_end-mc_start) >> iter_shift) / bulk_sizes[sz];
340                 uint64_t sc_avg = ((sc_end-sc_start) >> iter_shift) / bulk_sizes[sz];
341
342                 printf("SP/SC burst enq/dequeue (size: %u): %"PRIu64"\n", bulk_sizes[sz],
343                                 sc_avg);
344                 printf("MP/MC burst enq/dequeue (size: %u): %"PRIu64"\n", bulk_sizes[sz],
345                                 mc_avg);
346         }
347 }
348
349 /* Times enqueue and dequeue on a single lcore */
350 static void
351 test_bulk_enqueue_dequeue(struct rte_ring *r)
352 {
353         const unsigned iter_shift = 23;
354         const unsigned iterations = 1<<iter_shift;
355         unsigned sz, i = 0;
356         void *burst[MAX_BURST] = {0};
357
358         for (sz = 0; sz < sizeof(bulk_sizes)/sizeof(bulk_sizes[0]); sz++) {
359                 const uint64_t sc_start = rte_rdtsc();
360                 for (i = 0; i < iterations; i++) {
361                         rte_ring_sp_enqueue_bulk(r, burst, bulk_sizes[sz]);
362                         rte_ring_sc_dequeue_bulk(r, burst, bulk_sizes[sz]);
363                 }
364                 const uint64_t sc_end = rte_rdtsc();
365
366                 const uint64_t mc_start = rte_rdtsc();
367                 for (i = 0; i < iterations; i++) {
368                         rte_ring_mp_enqueue_bulk(r, burst, bulk_sizes[sz]);
369                         rte_ring_mc_dequeue_bulk(r, burst, bulk_sizes[sz]);
370                 }
371                 const uint64_t mc_end = rte_rdtsc();
372
373                 double sc_avg = ((double)(sc_end-sc_start) /
374                                 (iterations * bulk_sizes[sz]));
375                 double mc_avg = ((double)(mc_end-mc_start) /
376                                 (iterations * bulk_sizes[sz]));
377
378                 printf("SP/SC bulk enq/dequeue (size: %u): %.2F\n", bulk_sizes[sz],
379                                 sc_avg);
380                 printf("MP/MC bulk enq/dequeue (size: %u): %.2F\n", bulk_sizes[sz],
381                                 mc_avg);
382         }
383 }
384
385 static int
386 test_ring_perf(void)
387 {
388         struct lcore_pair cores;
389         struct rte_ring *r = NULL;
390
391         r = rte_ring_create(RING_NAME, RING_SIZE, rte_socket_id(), 0);
392         if (r == NULL)
393                 return -1;
394
395         printf("### Testing single element and burst enq/deq ###\n");
396         test_single_enqueue_dequeue(r);
397         test_burst_enqueue_dequeue(r);
398
399         printf("\n### Testing empty dequeue ###\n");
400         test_empty_dequeue(r);
401
402         printf("\n### Testing using a single lcore ###\n");
403         test_bulk_enqueue_dequeue(r);
404
405         if (get_two_hyperthreads(&cores) == 0) {
406                 printf("\n### Testing using two hyperthreads ###\n");
407                 run_on_core_pair(&cores, r, enqueue_bulk, dequeue_bulk);
408         }
409         if (get_two_cores(&cores) == 0) {
410                 printf("\n### Testing using two physical cores ###\n");
411                 run_on_core_pair(&cores, r, enqueue_bulk, dequeue_bulk);
412         }
413         if (get_two_sockets(&cores) == 0) {
414                 printf("\n### Testing using two NUMA nodes ###\n");
415                 run_on_core_pair(&cores, r, enqueue_bulk, dequeue_bulk);
416         }
417         rte_ring_free(r);
418         return 0;
419 }
420
421 REGISTER_TEST_COMMAND(ring_perf_autotest, test_ring_perf);