Initial commit of vpp code.
[vpp.git] / vnet / vnet / devices / dpdk / threads.c
1 /*
2  * Copyright (c) 2015 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15 #include <vnet/vnet.h>
16 #include <vppinfra/vec.h>
17 #include <vppinfra/error.h>
18 #include <vppinfra/format.h>
19 #include <signal.h>
20
21 #include <vnet/ethernet/ethernet.h>
22 #include <vnet/devices/dpdk/dpdk.h>
23
24 #include <vlibmemory/api.h>
25 #include <vlibmemory/vl_memory_msg_enum.h> /* enumerate all vlib messages */
26
27 #define vl_typedefs             /* define message structures */
28 #include <vlibmemory/vl_memory_api_h.h> 
29 #undef vl_typedefs
30
31 /* instantiate all the print functions we know about */
32 #define vl_print(handle, ...) vlib_cli_output (handle, __VA_ARGS__)
33 #define vl_printfun
34 #include <vlibmemory/vl_memory_api_h.h> 
35 #undef vl_printfun
36
37 vlib_thread_main_t vlib_thread_main;
38
39 frame_queue_trace_t *frame_queue_traces;
40
41 /*
42  * Check the frame queue to see if any frames are available.
43  * If so, pull the packets off the frames and put them to 
44  * the handoff node.
45  */
46 static inline int vlib_frame_queue_dequeue_internal (vlib_main_t *vm)
47 {
48   u32 thread_id = vm->cpu_index;
49   vlib_frame_queue_t *fq = vlib_frame_queues[thread_id];
50   vlib_frame_queue_elt_t *elt;
51   u32 * from, * to;
52   vlib_frame_t * f;
53   int msg_type;
54   int processed = 0;
55   u32 n_left_to_node;
56   u32 vectors = 0;
57
58   ASSERT (fq);
59   ASSERT(vm == vlib_mains[thread_id]);
60
61   /*
62    * Gather trace data for frame queues
63    */
64   if (PREDICT_FALSE(fq->trace))
65     {
66       frame_queue_trace_t *fqt;
67       u32 elix;
68    
69       fqt = &frame_queue_traces[thread_id];
70       fqt->nelts = fq->nelts;
71       fqt->head = fq->head;
72       fqt->head_hint = fq->head_hint;
73       fqt->tail = fq->tail;
74       fqt->threshold = fq->vector_threshold;
75       fqt->n_in_use = fqt->tail - fqt->head;
76       if (fqt->n_in_use > fqt->nelts){
77         fqt->n_in_use = 0;
78       }
79
80       for (elix=0; elix<fqt->nelts; elix++) {
81         elt = fq->elts + ((fq->head+1 + elix) & (fq->nelts-1));
82         if (1 || elt->valid) 
83           {
84             fqt->n_vectors[elix] = elt->n_vectors;
85           }
86       }
87       fqt->written = 1;
88     }
89
90   while (1)
91     {
92       if (fq->head == fq->tail)
93         {
94           fq->head_hint = fq->head;
95           return processed;
96         }
97       
98       elt = fq->elts + ((fq->head+1) & (fq->nelts-1));
99       
100       if (!elt->valid)
101         {
102           fq->head_hint = fq->head;
103           return processed;
104         }
105
106       from = elt->buffer_index;
107       msg_type = elt->msg_type;
108
109       ASSERT (msg_type == VLIB_FRAME_QUEUE_ELT_DISPATCH_FRAME);
110       ASSERT (elt->n_vectors <= VLIB_FRAME_SIZE);
111
112       f = vlib_get_frame_to_node 
113           (vm, 1 ? handoff_dispatch_node.index : ethernet_input_node.index);
114
115       to = vlib_frame_vector_args (f);
116
117       n_left_to_node = elt->n_vectors;
118
119       while (n_left_to_node >= 4)
120         {
121           to[0] = from[0];
122           to[1] = from[1];
123           to[2] = from[2];
124           to[3] = from[3];
125           to += 4;
126           from += 4;
127           n_left_to_node -= 4;
128         }
129
130       while (n_left_to_node > 0)
131         {
132           to[0] = from[0];
133           to++;
134           from++;
135           n_left_to_node--;
136         }
137
138       vectors += elt->n_vectors;
139       f->n_vectors = elt->n_vectors;
140       vlib_put_frame_to_node 
141           (vm, 1 ? handoff_dispatch_node.index : ethernet_input_node.index, f);
142
143       elt->valid = 0;
144       elt->n_vectors = 0;
145       elt->msg_type = 0xfefefefe;
146       CLIB_MEMORY_BARRIER();
147       fq->head++;
148       processed++;
149
150       /* 
151        * Limit the number of packets pushed into the graph
152        */
153       if (vectors >= fq->vector_threshold)
154         {
155           fq->head_hint = fq->head;
156           return processed;
157         }
158     }
159   ASSERT(0);
160   return processed;
161 }
162
163 int dpdk_frame_queue_dequeue (vlib_main_t *vm) 
164 {
165   return vlib_frame_queue_dequeue_internal (vm);
166 }
167
168 /*
169  * dpdk_worker_thread - Contains the main loop of a worker thread.
170  *
171  * w
172  *     Information for the current thread
173  * io_name
174  *     The name of thread performing dpdk device IO (if any). If there are no
175  *     instances of that thread, then the current thread will do dpdk device
176  *     polling. Ports will be divided among instances of the current thread.
177  * callback
178  *     If not null, this function will be called once during each main loop.
179  */
180 static_always_inline void
181 dpdk_worker_thread_internal (vlib_main_t *vm,
182                              dpdk_worker_thread_callback_t callback,
183                              int have_io_threads)
184 {
185   vlib_node_main_t * nm = &vm->node_main;
186   u64 cpu_time_now = clib_cpu_time_now ();
187
188   while (1)
189     {
190       vlib_worker_thread_barrier_check ();
191
192       vlib_frame_queue_dequeue_internal (vm);
193
194       /* Invoke callback if supplied */
195       if (PREDICT_FALSE(callback != NULL))
196           callback(vm);
197
198       if (!have_io_threads)
199         {
200           vlib_node_runtime_t * n;
201           vec_foreach (n, nm->nodes_by_type[VLIB_NODE_TYPE_INPUT])
202             {
203               cpu_time_now = dispatch_node (vm, n, VLIB_NODE_TYPE_INPUT,
204                                             VLIB_NODE_STATE_POLLING, /* frame */ 0,
205                                             cpu_time_now);
206             }
207
208         }
209
210       if (_vec_len (nm->pending_frames))
211         {
212           int i;
213           cpu_time_now = clib_cpu_time_now ();
214           for (i = 0; i < _vec_len (nm->pending_frames); i++) {
215             vlib_pending_frame_t *p;
216
217             p = nm->pending_frames + i;
218
219             cpu_time_now = dispatch_pending_node (vm, p, cpu_time_now);
220           }
221           _vec_len (nm->pending_frames) = 0;
222         }
223       vlib_increment_main_loop_counter (vm);
224
225       /* Record time stamp in case there are no enabled nodes and above
226          calls do not update time stamp. */
227       cpu_time_now = clib_cpu_time_now ();
228     }
229 }
230
231 void dpdk_worker_thread (vlib_worker_thread_t * w,
232                          char *io_name,
233                          dpdk_worker_thread_callback_t callback)
234 {
235   vlib_main_t *vm;
236   uword * p;
237   vlib_thread_main_t * tm = vlib_get_thread_main();
238   vlib_thread_registration_t * tr;
239   dpdk_main_t * dm = &dpdk_main;
240
241   vm = vlib_get_main();
242
243   ASSERT(vm->cpu_index == os_get_cpu_number());
244
245   clib_time_init (&vm->clib_time);
246   clib_mem_set_heap (w->thread_mheap);
247
248   /* Wait until the dpdk init sequence is complete */
249   while (dm->io_thread_release == 0)
250     vlib_worker_thread_barrier_check ();
251
252   /* any I/O threads? */
253   p = hash_get_mem (tm->thread_registrations_by_name, io_name);
254   tr = (vlib_thread_registration_t *)p[0];
255
256   if (tr && tr->count > 0)
257     dpdk_worker_thread_internal(vm, callback, /* have_io_threads */ 1);
258   else
259     dpdk_worker_thread_internal(vm, callback, /* have_io_threads */ 0);
260 }
261
262 void dpdk_worker_thread_fn (void * arg)
263 {
264   vlib_worker_thread_t *w = (vlib_worker_thread_t *) arg;
265   vlib_worker_thread_init (w);
266   dpdk_worker_thread (w, "io", 0);
267 }
268
269 #if VIRL == 0
270 VLIB_REGISTER_THREAD (worker_thread_reg, static) = {
271   .name = "workers",
272   .short_name = "wk",
273   .function = dpdk_worker_thread_fn,
274   .mheap_size = 256<<20,
275 };
276 #endif
277
278 void dpdk_io_thread_fn (void * arg)
279 {
280   vlib_worker_thread_t *w = (vlib_worker_thread_t *) arg;
281   vlib_worker_thread_init (w);
282   dpdk_io_thread (w, 0, 0, "workers", 0);
283 }
284
285 #if VIRL == 0
286 VLIB_REGISTER_THREAD (io_thread_reg, static) = {
287   .name = "io",
288   .short_name = "io",
289   .function = dpdk_io_thread_fn,
290   .mheap_size = 256<<20,
291 };
292 #endif
293
294 static void vl_api_rpc_call_t_handler (vl_api_rpc_call_t * mp)
295 {
296   vl_api_rpc_reply_t * rmp;
297   int (*fp)(void *);
298   i32 rv = 0;
299   vlib_main_t * vm = vlib_get_main();
300
301   if (mp->function == 0)
302     {
303       rv = -1;
304       clib_warning ("rpc NULL function pointer");
305     }
306   
307   else
308     {
309       if (mp->need_barrier_sync)
310         vlib_worker_thread_barrier_sync (vm);
311
312       fp = (void *)(mp->function);
313       rv = (*fp)(mp->data);
314
315       if (mp->need_barrier_sync)
316         vlib_worker_thread_barrier_release (vm);
317     }
318
319   if (mp->send_reply)
320     {
321       unix_shared_memory_queue_t * q =
322         vl_api_client_index_to_input_queue (mp->client_index);
323       if (q)
324         {
325           rmp = vl_msg_api_alloc_as_if_client (sizeof (*rmp));
326           rmp->_vl_msg_id = ntohs (VL_API_RPC_REPLY);
327           rmp->context = mp->context;
328           rmp->retval = rv;
329           vl_msg_api_send_shmem (q, (u8 *)&rmp);
330         }
331     }
332   if (mp->multicast)
333     {
334       clib_warning ("multicast not yet implemented...");
335     }
336 }
337
338 static void vl_api_rpc_reply_t_handler (vl_api_rpc_reply_t * mp)
339 { clib_warning ("unimplemented"); }
340
341 void vl_api_rpc_call_main_thread (void *fp, u8 * data, u32 data_length)
342 {
343   vl_api_rpc_call_t * mp;
344   api_main_t *am = &api_main;
345   vl_shmem_hdr_t *shmem_hdr = am->shmem_hdr;
346
347   mp = vl_msg_api_alloc_as_if_client (sizeof (*mp) + data_length);
348   memset (mp, 0, sizeof (*mp));
349   memcpy (mp->data, data, data_length);
350   mp->_vl_msg_id = ntohs (VL_API_RPC_CALL);
351   mp->function = (u64)fp;
352   mp->need_barrier_sync = 1;
353   
354   /* Use the "normal" control-plane mechanism for the main thread */
355   vl_msg_api_send_shmem (shmem_hdr->vl_input_queue, (u8 *)&mp);
356 }
357
358
359 #define foreach_rpc_api_msg                     \
360 _(RPC_CALL,rpc_call)                            \
361 _(RPC_REPLY,rpc_reply)
362
363 static clib_error_t *
364 rpc_api_hookup (vlib_main_t *vm)
365 {
366 #define _(N,n)                                                  \
367     vl_msg_api_set_handlers(VL_API_##N, #n,                     \
368                            vl_api_##n##_t_handler,              \
369                            vl_noop_handler,                     \
370                            vl_noop_handler,                     \
371                            vl_api_##n##_t_print,                \
372                            sizeof(vl_api_##n##_t), 0 /* do not trace */); 
373     foreach_rpc_api_msg;
374 #undef _
375     return 0;
376 }
377
378 VLIB_API_INIT_FUNCTION(rpc_api_hookup);