/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2018 Ericsson AB
 */

#include <stdbool.h>

#include <rte_cycles.h>
#include <rte_eventdev_pmd.h>
#include <rte_eventdev_pmd_vdev.h>
#include <rte_random.h>

#include "dsw_evdev.h"

#define EVENTDEV_NAME_DSW_PMD event_dsw

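/* Set up an event port: create the port's input event ring and its
 * control message ring, and initialize the load-measurement and
 * migration interval state.
 */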
static int
dsw_port_setup(struct rte_eventdev *dev, uint8_t port_id,
               const struct rte_event_port_conf *conf)
{
        struct dsw_evdev *dsw = dsw_pmd_priv(dev);
        struct dsw_port *port;
        struct rte_event_ring *in_ring;
        struct rte_ring *ctl_in_ring;
        char ring_name[RTE_RING_NAMESIZE];

        port = &dsw->ports[port_id];

        *port = (struct dsw_port) {
                .id = port_id,
                .dsw = dsw,
                .dequeue_depth = conf->dequeue_depth,
                .enqueue_depth = conf->enqueue_depth,
                .new_event_threshold = conf->new_event_threshold
        };

        snprintf(ring_name, sizeof(ring_name), "dsw%d_p%u", dev->data->dev_id,
                 port_id);

        in_ring = rte_event_ring_create(ring_name, DSW_IN_RING_SIZE,
                                        dev->data->socket_id,
                                        RING_F_SC_DEQ|RING_F_EXACT_SZ);

        if (in_ring == NULL)
                return -ENOMEM;

        snprintf(ring_name, sizeof(ring_name), "dswctl%d_p%u",
                 dev->data->dev_id, port_id);

        ctl_in_ring = rte_ring_create(ring_name, DSW_CTL_IN_RING_SIZE,
                                      dev->data->socket_id,
                                      RING_F_SC_DEQ|RING_F_EXACT_SZ);

        if (ctl_in_ring == NULL) {
                rte_event_ring_free(in_ring);
                return -ENOMEM;
        }

        port->in_ring = in_ring;
        port->ctl_in_ring = ctl_in_ring;

        rte_atomic16_init(&port->load);

        port->load_update_interval =
                (DSW_LOAD_UPDATE_INTERVAL * rte_get_timer_hz()) / US_PER_S;

        port->migration_interval =
                (DSW_MIGRATION_INTERVAL * rte_get_timer_hz()) / US_PER_S;

        dev->data->ports[port_id] = port;

        return 0;
}

static void
dsw_port_def_conf(struct rte_eventdev *dev __rte_unused,
                  uint8_t port_id __rte_unused,
                  struct rte_event_port_conf *port_conf)
{
        *port_conf = (struct rte_event_port_conf) {
                .new_event_threshold = 1024,
                .dequeue_depth = DSW_MAX_PORT_DEQUEUE_DEPTH / 4,
                .enqueue_depth = DSW_MAX_PORT_ENQUEUE_DEPTH / 4
        };
}

static void
dsw_port_release(void *p)
{
        struct dsw_port *port = p;

        rte_event_ring_free(port->in_ring);
        rte_ring_free(port->ctl_in_ring);
}

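/* Configure an event queue. Ordered queues and the ALL_TYPES
 * capability are not supported; single-link queues are handled as
 * atomic, since only a single port will ever serve them.
 */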
static int
dsw_queue_setup(struct rte_eventdev *dev, uint8_t queue_id,
                const struct rte_event_queue_conf *conf)
{
        struct dsw_evdev *dsw = dsw_pmd_priv(dev);
        struct dsw_queue *queue = &dsw->queues[queue_id];

        if (RTE_EVENT_QUEUE_CFG_ALL_TYPES & conf->event_queue_cfg)
                return -ENOTSUP;

        if (conf->schedule_type == RTE_SCHED_TYPE_ORDERED)
                return -ENOTSUP;

        /* SINGLE_LINK is better off treated as TYPE_ATOMIC, since it
         * avoids the "fake" TYPE_PARALLEL flow_id assignment. Since
         * the queue will only have a single serving port, no
         * migration will ever happen, so the extra TYPE_ATOMIC
         * migration overhead is never incurred.
         */
        if (RTE_EVENT_QUEUE_CFG_SINGLE_LINK & conf->event_queue_cfg)
                queue->schedule_type = RTE_SCHED_TYPE_ATOMIC;
        else /* atomic or parallel */
                queue->schedule_type = conf->schedule_type;

        queue->num_serving_ports = 0;

        return 0;
}

static void
dsw_queue_def_conf(struct rte_eventdev *dev __rte_unused,
                   uint8_t queue_id __rte_unused,
                   struct rte_event_queue_conf *queue_conf)
{
        *queue_conf = (struct rte_event_queue_conf) {
                .nb_atomic_flows = 4096,
                .schedule_type = RTE_SCHED_TYPE_ATOMIC,
                .priority = RTE_EVENT_DEV_PRIORITY_NORMAL
        };
}

static void
dsw_queue_release(struct rte_eventdev *dev __rte_unused,
                  uint8_t queue_id __rte_unused)
{
}

static void
queue_add_port(struct dsw_queue *queue, uint16_t port_id)
{
        queue->serving_ports[queue->num_serving_ports] = port_id;
        queue->num_serving_ports++;
}

static bool
queue_remove_port(struct dsw_queue *queue, uint16_t port_id)
{
        uint16_t i;

        for (i = 0; i < queue->num_serving_ports; i++)
                if (queue->serving_ports[i] == port_id) {
                        uint16_t last_idx = queue->num_serving_ports - 1;
                        if (i != last_idx)
                                queue->serving_ports[i] =
                                        queue->serving_ports[last_idx];
                        queue->num_serving_ports--;
                        return true;
                }
        return false;
}

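/* Add the port to, or remove it from, the serving-port lists of the
 * given queues. Returns the number of queues for which the link or
 * unlink took effect.
 */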
static int
dsw_port_link_unlink(struct rte_eventdev *dev, void *port,
                     const uint8_t queues[], uint16_t num, bool link)
{
        struct dsw_evdev *dsw = dsw_pmd_priv(dev);
        struct dsw_port *p = port;
        uint16_t i;
        uint16_t count = 0;

        for (i = 0; i < num; i++) {
                uint8_t qid = queues[i];
                struct dsw_queue *q = &dsw->queues[qid];
                if (link) {
                        queue_add_port(q, p->id);
                        count++;
                } else {
                        bool removed = queue_remove_port(q, p->id);
                        if (removed)
                                count++;
                }
        }

        return count;
}

static int
dsw_port_link(struct rte_eventdev *dev, void *port, const uint8_t queues[],
              const uint8_t priorities[] __rte_unused, uint16_t num)
{
        return dsw_port_link_unlink(dev, port, queues, num, true);
}

static int
dsw_port_unlink(struct rte_eventdev *dev, void *port, uint8_t queues[],
                uint16_t num)
{
        return dsw_port_link_unlink(dev, port, queues, num, false);
}

static void
dsw_info_get(struct rte_eventdev *dev __rte_unused,
             struct rte_event_dev_info *info)
{
        *info = (struct rte_event_dev_info) {
                .driver_name = DSW_PMD_NAME,
                .max_event_queues = DSW_MAX_QUEUES,
                .max_event_queue_flows = DSW_MAX_FLOWS,
                .max_event_queue_priority_levels = 1,
                .max_event_priority_levels = 1,
                .max_event_ports = DSW_MAX_PORTS,
                .max_event_port_dequeue_depth = DSW_MAX_PORT_DEQUEUE_DEPTH,
                .max_event_port_enqueue_depth = DSW_MAX_PORT_ENQUEUE_DEPTH,
                .max_num_events = DSW_MAX_EVENTS,
                .event_dev_cap = RTE_EVENT_DEV_CAP_BURST_MODE|
                RTE_EVENT_DEV_CAP_DISTRIBUTED_SCHED
        };
}

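/* Record the configured port and queue counts and derive the total
 * in-flight event (credit) limit from nb_events_limit.
 */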
static int
dsw_configure(const struct rte_eventdev *dev)
{
        struct dsw_evdev *dsw = dsw_pmd_priv(dev);
        const struct rte_event_dev_config *conf = &dev->data->dev_conf;
        int32_t min_max_in_flight;

        dsw->num_ports = conf->nb_event_ports;
        dsw->num_queues = conf->nb_event_queues;

        /* Avoid a situation where consumer ports are holding all the
         * credits, without making use of them.
         */
        min_max_in_flight = conf->nb_event_ports * DSW_PORT_MAX_CREDITS;

        dsw->max_inflight = RTE_MAX(conf->nb_events_limit, min_max_in_flight);

        return 0;
}

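/* Randomly assign each flow hash bucket of every queue to one of the
 * queue's serving ports.
 */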
static void
initial_flow_to_port_assignment(struct dsw_evdev *dsw)
{
        uint8_t queue_id;
        for (queue_id = 0; queue_id < dsw->num_queues; queue_id++) {
                struct dsw_queue *queue = &dsw->queues[queue_id];
                uint16_t flow_hash;
                for (flow_hash = 0; flow_hash < DSW_MAX_FLOWS; flow_hash++) {
                        uint8_t port_idx =
                                rte_rand() % queue->num_serving_ports;
                        uint8_t port_id =
                                queue->serving_ports[port_idx];
                        dsw->queues[queue_id].flow_to_port_map[flow_hash] =
                                port_id;
                }
        }
}

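/* Start the device: reset the credit counter, seed the flow-to-port
 * map and start the per-port load measurement clocks.
 */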
static int
dsw_start(struct rte_eventdev *dev)
{
        struct dsw_evdev *dsw = dsw_pmd_priv(dev);
        uint16_t i;
        uint64_t now;

        rte_atomic32_init(&dsw->credits_on_loan);

        initial_flow_to_port_assignment(dsw);

        now = rte_get_timer_cycles();
        for (i = 0; i < dsw->num_ports; i++) {
                dsw->ports[i].measurement_start = now;
                dsw->ports[i].busy_start = now;
        }

        return 0;
}

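/* The functions below drain events still buffered in the device at
 * stop time (per-port output buffers, paused events and the input
 * rings), handing each event to the application's stop-flush callback.
 */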
static void
dsw_port_drain_buf(uint8_t dev_id, struct rte_event *buf, uint16_t buf_len,
                   eventdev_stop_flush_t flush, void *flush_arg)
{
        uint16_t i;

        for (i = 0; i < buf_len; i++)
                flush(dev_id, buf[i], flush_arg);
}

static void
dsw_port_drain_paused(uint8_t dev_id, struct dsw_port *port,
                      eventdev_stop_flush_t flush, void *flush_arg)
{
        dsw_port_drain_buf(dev_id, port->paused_events, port->paused_events_len,
                           flush, flush_arg);
}

static void
dsw_port_drain_out(uint8_t dev_id, struct dsw_evdev *dsw, struct dsw_port *port,
                   eventdev_stop_flush_t flush, void *flush_arg)
{
        uint16_t dport_id;

        for (dport_id = 0; dport_id < dsw->num_ports; dport_id++)
                if (dport_id != port->id)
                        dsw_port_drain_buf(dev_id, port->out_buffer[dport_id],
                                           port->out_buffer_len[dport_id],
                                           flush, flush_arg);
}

static void
dsw_port_drain_in_ring(uint8_t dev_id, struct dsw_port *port,
                       eventdev_stop_flush_t flush, void *flush_arg)
{
        struct rte_event ev;

        while (rte_event_ring_dequeue_burst(port->in_ring, &ev, 1, NULL))
                flush(dev_id, ev, flush_arg);
}

static void
dsw_drain(uint8_t dev_id, struct dsw_evdev *dsw,
          eventdev_stop_flush_t flush, void *flush_arg)
{
        uint16_t port_id;

        if (flush == NULL)
                return;

        for (port_id = 0; port_id < dsw->num_ports; port_id++) {
                struct dsw_port *port = &dsw->ports[port_id];

                dsw_port_drain_out(dev_id, dsw, port, flush, flush_arg);
                dsw_port_drain_paused(dev_id, port, flush, flush_arg);
                dsw_port_drain_in_ring(dev_id, port, flush, flush_arg);
        }
}

static void
dsw_stop(struct rte_eventdev *dev)
{
        struct dsw_evdev *dsw = dsw_pmd_priv(dev);
        uint8_t dev_id;
        eventdev_stop_flush_t flush;
        void *flush_arg;

        dev_id = dev->data->dev_id;
        flush = dev->dev_ops->dev_stop_flush;
        flush_arg = dev->data->dev_stop_flush_arg;

        dsw_drain(dev_id, dsw, flush, flush_arg);
}

static int
dsw_close(struct rte_eventdev *dev)
{
        struct dsw_evdev *dsw = dsw_pmd_priv(dev);

        dsw->num_ports = 0;
        dsw->num_queues = 0;

        return 0;
}

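/* Eventdev control-plane operations implemented by this driver. */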
static struct rte_eventdev_ops dsw_evdev_ops = {
        .port_setup = dsw_port_setup,
        .port_def_conf = dsw_port_def_conf,
        .port_release = dsw_port_release,
        .queue_setup = dsw_queue_setup,
        .queue_def_conf = dsw_queue_def_conf,
        .queue_release = dsw_queue_release,
        .port_link = dsw_port_link,
        .port_unlink = dsw_port_unlink,
        .dev_infos_get = dsw_info_get,
        .dev_configure = dsw_configure,
        .dev_start = dsw_start,
        .dev_stop = dsw_stop,
        .dev_close = dsw_close,
        .xstats_get = dsw_xstats_get,
        .xstats_get_names = dsw_xstats_get_names,
        .xstats_get_by_name = dsw_xstats_get_by_name
};

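/* Create the virtual event device and hook up the ops table and the
 * enqueue/dequeue fast-path functions. Secondary processes return
 * after setting the function pointers, without touching device data.
 */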
static int
dsw_probe(struct rte_vdev_device *vdev)
{
        const char *name;
        struct rte_eventdev *dev;
        struct dsw_evdev *dsw;

        name = rte_vdev_device_name(vdev);

        dev = rte_event_pmd_vdev_init(name, sizeof(struct dsw_evdev),
                                      rte_socket_id());
        if (dev == NULL)
                return -EFAULT;

        dev->dev_ops = &dsw_evdev_ops;
        dev->enqueue = dsw_event_enqueue;
        dev->enqueue_burst = dsw_event_enqueue_burst;
        dev->enqueue_new_burst = dsw_event_enqueue_new_burst;
        dev->enqueue_forward_burst = dsw_event_enqueue_forward_burst;
        dev->dequeue = dsw_event_dequeue;
        dev->dequeue_burst = dsw_event_dequeue_burst;

        if (rte_eal_process_type() != RTE_PROC_PRIMARY)
                return 0;

        dsw = dev->data->dev_private;
        dsw->data = dev->data;

        return 0;
}

static int
dsw_remove(struct rte_vdev_device *vdev)
{
        const char *name;

        name = rte_vdev_device_name(vdev);
        if (name == NULL)
                return -EINVAL;

        return rte_event_pmd_vdev_uninit(name);
}

static struct rte_vdev_driver evdev_dsw_pmd_drv = {
        .probe = dsw_probe,
        .remove = dsw_remove
};

RTE_PMD_REGISTER_VDEV(EVENTDEV_NAME_DSW_PMD, evdev_dsw_pmd_drv);