New upstream version 17.11-rc3
[deb_dpdk.git] / examples / quota_watermark / qw / main.c
1 /*-
2  *   BSD LICENSE
3  *
4  *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
5  *   All rights reserved.
6  *
7  *   Redistribution and use in source and binary forms, with or without
8  *   modification, are permitted provided that the following conditions
9  *   are met:
10  *
11  *     * Redistributions of source code must retain the above copyright
12  *       notice, this list of conditions and the following disclaimer.
13  *     * Redistributions in binary form must reproduce the above copyright
14  *       notice, this list of conditions and the following disclaimer in
15  *       the documentation and/or other materials provided with the
16  *       distribution.
17  *     * Neither the name of Intel Corporation nor the names of its
18  *       contributors may be used to endorse or promote products derived
19  *       from this software without specific prior written permission.
20  *
21  *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33
34 #include <rte_eal.h>
35
36 #include <rte_common.h>
37 #include <rte_debug.h>
38 #include <rte_errno.h>
39 #include <rte_ethdev.h>
40 #include <rte_launch.h>
41 #include <rte_lcore.h>
42 #include <rte_log.h>
43 #include <rte_mbuf.h>
44 #include <rte_ring.h>
45
46 #include <rte_byteorder.h>
47
48 #include "args.h"
49 #include "main.h"
50 #include "init.h"
51 #include "../include/conf.h"
52
53
54 #ifdef QW_SOFTWARE_FC
55 #define SEND_PAUSE_FRAME(port_id, duration) send_pause_frame(port_id, duration)
56 #else
57 #define SEND_PAUSE_FRAME(port_id, duration) do { } while(0)
58 #endif
59
60 #define ETHER_TYPE_FLOW_CONTROL 0x8808
61
62 struct ether_fc_frame {
63         uint16_t opcode;
64         uint16_t param;
65 } __attribute__((__packed__));
66
67
68 int *quota;
69 unsigned int *low_watermark;
70 unsigned int *high_watermark;
71
72 uint16_t port_pairs[RTE_MAX_ETHPORTS];
73
74 struct rte_ring *rings[RTE_MAX_LCORE][RTE_MAX_ETHPORTS];
75 struct rte_mempool *mbuf_pool;
76
77
78 static void send_pause_frame(uint16_t port_id, uint16_t duration)
79 {
80         struct rte_mbuf *mbuf;
81         struct ether_fc_frame *pause_frame;
82         struct ether_hdr *hdr;
83         struct ether_addr mac_addr;
84
85         RTE_LOG_DP(DEBUG, USER1,
86                         "Sending PAUSE frame (duration=%d) on port %d\n",
87                         duration, port_id);
88
89         /* Get a mbuf from the pool */
90         mbuf = rte_pktmbuf_alloc(mbuf_pool);
91         if (unlikely(mbuf == NULL))
92                 return;
93
94         /* Prepare a PAUSE frame */
95         hdr = rte_pktmbuf_mtod(mbuf, struct ether_hdr *);
96         pause_frame = (struct ether_fc_frame *) &hdr[1];
97
98         rte_eth_macaddr_get(port_id, &mac_addr);
99         ether_addr_copy(&mac_addr, &hdr->s_addr);
100
101         void *tmp = &hdr->d_addr.addr_bytes[0];
102         *((uint64_t *)tmp) = 0x010000C28001ULL;
103
104         hdr->ether_type = rte_cpu_to_be_16(ETHER_TYPE_FLOW_CONTROL);
105
106         pause_frame->opcode = rte_cpu_to_be_16(0x0001);
107         pause_frame->param  = rte_cpu_to_be_16(duration);
108
109         mbuf->pkt_len  = 60;
110         mbuf->data_len = 60;
111
112         rte_eth_tx_burst(port_id, 0, &mbuf, 1);
113 }
114
115 /**
116  * Get the previous enabled lcore ID
117  *
118  * @param lcore_id
119  *   The current lcore ID.
120  * @return
121  *   The previous enabled lcore_id or -1 if not found.
122  */
123 static unsigned int
124 get_previous_lcore_id(unsigned int lcore_id)
125 {
126         int i;
127
128         for (i = lcore_id - 1; i >= 0; i--)
129                 if (rte_lcore_is_enabled(i))
130                         return i;
131
132         return -1;
133 }
134
135 /**
136  * Get the last enabled lcore ID
137  *
138  * @return
139  *   The last enabled lcore_id.
140  */
141 static unsigned int
142 get_last_lcore_id(void)
143 {
144         int i;
145
146         for (i = RTE_MAX_LCORE; i >= 0; i--)
147                 if (rte_lcore_is_enabled(i))
148                         return i;
149
150         return 0;
151 }
152
153 static void
154 receive_stage(__attribute__((unused)) void *args)
155 {
156         int i, ret;
157
158         uint16_t port_id;
159         uint16_t nb_rx_pkts;
160
161         unsigned int lcore_id;
162         unsigned int free;
163
164         struct rte_mbuf *pkts[MAX_PKT_QUOTA];
165         struct rte_ring *ring;
166         enum ring_state ring_state[RTE_MAX_ETHPORTS] = { RING_READY };
167
168         lcore_id = rte_lcore_id();
169
170         RTE_LOG(INFO, USER1,
171                         "%s() started on core %u\n", __func__, lcore_id);
172
173         while (1) {
174
175                 /* Process each port round robin style */
176                 for (port_id = 0; port_id < RTE_MAX_ETHPORTS; port_id++) {
177
178                         if (!is_bit_set(port_id, portmask))
179                                 continue;
180
181                         ring = rings[lcore_id][port_id];
182
183                         if (ring_state[port_id] != RING_READY) {
184                                 if (rte_ring_count(ring) > *low_watermark)
185                                         continue;
186                                 else
187                                         ring_state[port_id] = RING_READY;
188                         }
189
190                         /* Enqueue received packets on the RX ring */
191                         nb_rx_pkts = rte_eth_rx_burst(port_id, 0, pkts,
192                                         (uint16_t) *quota);
193                         ret = rte_ring_enqueue_bulk(ring, (void *) pkts,
194                                         nb_rx_pkts, &free);
195                         if (RING_SIZE - free > *high_watermark) {
196                                 ring_state[port_id] = RING_OVERLOADED;
197                                 send_pause_frame(port_id, 1337);
198                         }
199
200                         if (ret == 0) {
201
202                                 /*
203                                  * Return  mbufs to the pool,
204                                  * effectively dropping packets
205                                  */
206                                 for (i = 0; i < nb_rx_pkts; i++)
207                                         rte_pktmbuf_free(pkts[i]);
208                         }
209                 }
210         }
211 }
212
213 static void
214 pipeline_stage(__attribute__((unused)) void *args)
215 {
216         int i, ret;
217         int nb_dq_pkts;
218
219         uint16_t port_id;
220
221         unsigned int lcore_id, previous_lcore_id;
222         unsigned int free;
223
224         void *pkts[MAX_PKT_QUOTA];
225         struct rte_ring *rx, *tx;
226         enum ring_state ring_state[RTE_MAX_ETHPORTS] = { RING_READY };
227
228         lcore_id = rte_lcore_id();
229         previous_lcore_id = get_previous_lcore_id(lcore_id);
230
231         RTE_LOG(INFO, USER1,
232                         "%s() started on core %u - processing packets from core %u\n",
233                         __func__, lcore_id, previous_lcore_id);
234
235         while (1) {
236
237                 for (port_id = 0; port_id < RTE_MAX_ETHPORTS; port_id++) {
238
239                         if (!is_bit_set(port_id, portmask))
240                                 continue;
241
242                         tx = rings[lcore_id][port_id];
243                         rx = rings[previous_lcore_id][port_id];
244
245                         if (ring_state[port_id] != RING_READY) {
246                                 if (rte_ring_count(tx) > *low_watermark)
247                                         continue;
248                                 else
249                                         ring_state[port_id] = RING_READY;
250                         }
251
252                         /* Dequeue up to quota mbuf from rx */
253                         nb_dq_pkts = rte_ring_dequeue_burst(rx, pkts,
254                                         *quota, NULL);
255                         if (unlikely(nb_dq_pkts < 0))
256                                 continue;
257
258                         /* Enqueue them on tx */
259                         ret = rte_ring_enqueue_bulk(tx, pkts,
260                                         nb_dq_pkts, &free);
261                         if (RING_SIZE - free > *high_watermark)
262                                 ring_state[port_id] = RING_OVERLOADED;
263
264                         if (ret == 0) {
265
266                                 /*
267                                  * Return  mbufs to the pool,
268                                  * effectively dropping packets
269                                  */
270                                 for (i = 0; i < nb_dq_pkts; i++)
271                                         rte_pktmbuf_free(pkts[i]);
272                         }
273                 }
274         }
275 }
276
277 static void
278 send_stage(__attribute__((unused)) void *args)
279 {
280         uint16_t nb_dq_pkts;
281
282         uint16_t port_id;
283         uint16_t dest_port_id;
284
285         unsigned int lcore_id, previous_lcore_id;
286
287         struct rte_ring *tx;
288         struct rte_mbuf *tx_pkts[MAX_PKT_QUOTA];
289
290         lcore_id = rte_lcore_id();
291         previous_lcore_id = get_previous_lcore_id(lcore_id);
292
293         RTE_LOG(INFO, USER1,
294                         "%s() started on core %u - processing packets from core %u\n",
295                         __func__, lcore_id, previous_lcore_id);
296
297         while (1) {
298
299                 /* Process each ring round robin style */
300                 for (port_id = 0; port_id < RTE_MAX_ETHPORTS; port_id++) {
301
302                         if (!is_bit_set(port_id, portmask))
303                                 continue;
304
305                         dest_port_id = port_pairs[port_id];
306                         tx = rings[previous_lcore_id][port_id];
307
308                         if (rte_ring_empty(tx))
309                                 continue;
310
311                         /* Dequeue packets from tx and send them */
312                         nb_dq_pkts = (uint16_t) rte_ring_dequeue_burst(tx,
313                                         (void *) tx_pkts, *quota, NULL);
314                         rte_eth_tx_burst(dest_port_id, 0, tx_pkts, nb_dq_pkts);
315
316                         /* TODO: Check if nb_dq_pkts == nb_tx_pkts? */
317                 }
318         }
319 }
320
321 int
322 main(int argc, char **argv)
323 {
324         int ret;
325         unsigned int lcore_id, master_lcore_id, last_lcore_id;
326
327         uint16_t port_id;
328
329         rte_log_set_global_level(RTE_LOG_INFO);
330
331         ret = rte_eal_init(argc, argv);
332         if (ret < 0)
333                 rte_exit(EXIT_FAILURE, "Cannot initialize EAL\n");
334
335         argc -= ret;
336         argv += ret;
337
338         init_dpdk();
339         setup_shared_variables();
340
341         *quota = 32;
342         *low_watermark = 60 * RING_SIZE / 100;
343
344         last_lcore_id   = get_last_lcore_id();
345         master_lcore_id = rte_get_master_lcore();
346
347         /* Parse the application's arguments */
348         ret = parse_qw_args(argc, argv);
349         if (ret < 0)
350                 rte_exit(EXIT_FAILURE, "Invalid quota/watermark argument(s)\n");
351
352         /* Create a pool of mbuf to store packets */
353         mbuf_pool = rte_pktmbuf_pool_create("mbuf_pool", MBUF_PER_POOL, 32, 0,
354                         MBUF_DATA_SIZE, rte_socket_id());
355         if (mbuf_pool == NULL)
356                 rte_panic("%s\n", rte_strerror(rte_errno));
357
358         for (port_id = 0; port_id < RTE_MAX_ETHPORTS; port_id++)
359                 if (is_bit_set(port_id, portmask)) {
360                         configure_eth_port(port_id);
361                         init_ring(master_lcore_id, port_id);
362                 }
363
364         pair_ports();
365
366         /*
367          * Start pipeline_connect() on all the available slave lcores
368          * but the last
369          */
370         for (lcore_id = 0 ; lcore_id < last_lcore_id; lcore_id++) {
371                 if (rte_lcore_is_enabled(lcore_id) &&
372                                 lcore_id != master_lcore_id) {
373
374                         for (port_id = 0; port_id < RTE_MAX_ETHPORTS; port_id++)
375                                 if (is_bit_set(port_id, portmask))
376                                         init_ring(lcore_id, port_id);
377
378                         /* typecast is a workaround for GCC 4.3 bug */
379                         rte_eal_remote_launch((int (*)(void *))pipeline_stage,
380                                         NULL, lcore_id);
381                 }
382         }
383
384         /* Start send_stage() on the last slave core */
385         /* typecast is a workaround for GCC 4.3 bug */
386         rte_eal_remote_launch((int (*)(void *))send_stage, NULL, last_lcore_id);
387
388         /* Start receive_stage() on the master core */
389         receive_stage(NULL);
390
391         return 0;
392 }