session/tcp/vcl: fixes and optimizations
[vpp.git] / src / vcl / vppcom.c
1 /*
2  * Copyright (c) 2017 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <stdio.h>
17 #include <stdlib.h>
18 #include <svm/svm_fifo_segment.h>
19 #include <vcl/vppcom.h>
20 #include <vcl/vcl_debug.h>
21 #include <vcl/vcl_private.h>
22
23 __thread uword __vcl_worker_index = ~0;
24
25 static int
26 vcl_wait_for_segment (u64 segment_handle)
27 {
28   vcl_worker_t *wrk = vcl_worker_get_current ();
29   u32 wait_for_seconds = 10, segment_index;
30   f64 timeout;
31
32   if (segment_handle == VCL_INVALID_SEGMENT_HANDLE)
33     return 0;
34
35   timeout = clib_time_now (&wrk->clib_time) + wait_for_seconds;
36   while (clib_time_now (&wrk->clib_time) < timeout)
37     {
38       segment_index = vcl_segment_table_lookup (segment_handle);
39       if (segment_index != VCL_INVALID_SEGMENT_INDEX)
40         return 0;
41       usleep (10);
42     }
43   return 1;
44 }
45
46 static inline int
47 vcl_mq_dequeue_batch (vcl_worker_t * wrk, svm_msg_q_t * mq)
48 {
49   svm_msg_q_msg_t *msg;
50   u32 n_msgs;
51   int i;
52
53   n_msgs = svm_msg_q_size (mq);
54   for (i = 0; i < n_msgs; i++)
55     {
56       vec_add2 (wrk->mq_msg_vector, msg, 1);
57       svm_msg_q_sub_w_lock (mq, msg);
58     }
59   return n_msgs;
60 }
61
62 const char *
63 vppcom_session_state_str (vcl_session_state_t state)
64 {
65   char *st;
66
67   switch (state)
68     {
69     case STATE_START:
70       st = "STATE_START";
71       break;
72
73     case STATE_CONNECT:
74       st = "STATE_CONNECT";
75       break;
76
77     case STATE_LISTEN:
78       st = "STATE_LISTEN";
79       break;
80
81     case STATE_ACCEPT:
82       st = "STATE_ACCEPT";
83       break;
84
85     case STATE_VPP_CLOSING:
86       st = "STATE_VPP_CLOSING";
87       break;
88
89     case STATE_DISCONNECT:
90       st = "STATE_DISCONNECT";
91       break;
92
93     case STATE_FAILED:
94       st = "STATE_FAILED";
95       break;
96
97     case STATE_UPDATED:
98       st = "STATE_UPDATED";
99       break;
100
101     case STATE_LISTEN_NO_MQ:
102       st = "STATE_LISTEN_NO_MQ";
103       break;
104
105     default:
106       st = "UNKNOWN_STATE";
107       break;
108     }
109
110   return st;
111 }
112
113 u8 *
114 format_ip4_address (u8 * s, va_list * args)
115 {
116   u8 *a = va_arg (*args, u8 *);
117   return format (s, "%d.%d.%d.%d", a[0], a[1], a[2], a[3]);
118 }
119
120 u8 *
121 format_ip6_address (u8 * s, va_list * args)
122 {
123   ip6_address_t *a = va_arg (*args, ip6_address_t *);
124   u32 i, i_max_n_zero, max_n_zeros, i_first_zero, n_zeros, last_double_colon;
125
126   i_max_n_zero = ARRAY_LEN (a->as_u16);
127   max_n_zeros = 0;
128   i_first_zero = i_max_n_zero;
129   n_zeros = 0;
130   for (i = 0; i < ARRAY_LEN (a->as_u16); i++)
131     {
132       u32 is_zero = a->as_u16[i] == 0;
133       if (is_zero && i_first_zero >= ARRAY_LEN (a->as_u16))
134         {
135           i_first_zero = i;
136           n_zeros = 0;
137         }
138       n_zeros += is_zero;
139       if ((!is_zero && n_zeros > max_n_zeros)
140           || (i + 1 >= ARRAY_LEN (a->as_u16) && n_zeros > max_n_zeros))
141         {
142           i_max_n_zero = i_first_zero;
143           max_n_zeros = n_zeros;
144           i_first_zero = ARRAY_LEN (a->as_u16);
145           n_zeros = 0;
146         }
147     }
148
149   last_double_colon = 0;
150   for (i = 0; i < ARRAY_LEN (a->as_u16); i++)
151     {
152       if (i == i_max_n_zero && max_n_zeros > 1)
153         {
154           s = format (s, "::");
155           i += max_n_zeros - 1;
156           last_double_colon = 1;
157         }
158       else
159         {
160           s = format (s, "%s%x",
161                       (last_double_colon || i == 0) ? "" : ":",
162                       clib_net_to_host_u16 (a->as_u16[i]));
163           last_double_colon = 0;
164         }
165     }
166
167   return s;
168 }
169
170 /* Format an IP46 address. */
171 u8 *
172 format_ip46_address (u8 * s, va_list * args)
173 {
174   ip46_address_t *ip46 = va_arg (*args, ip46_address_t *);
175   ip46_type_t type = va_arg (*args, ip46_type_t);
176   int is_ip4 = 1;
177
178   switch (type)
179     {
180     case IP46_TYPE_ANY:
181       is_ip4 = ip46_address_is_ip4 (ip46);
182       break;
183     case IP46_TYPE_IP4:
184       is_ip4 = 1;
185       break;
186     case IP46_TYPE_IP6:
187       is_ip4 = 0;
188       break;
189     }
190
191   return is_ip4 ?
192     format (s, "%U", format_ip4_address, &ip46->ip4) :
193     format (s, "%U", format_ip6_address, &ip46->ip6);
194 }
195
196 /*
197  * VPPCOM Utility Functions
198  */
199
200
201 static void
202 vcl_send_session_accepted_reply (svm_msg_q_t * mq, u32 context,
203                                  session_handle_t handle, int retval)
204 {
205   app_session_evt_t _app_evt, *app_evt = &_app_evt;
206   session_accepted_reply_msg_t *rmp;
207   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_ACCEPTED_REPLY);
208   rmp = (session_accepted_reply_msg_t *) app_evt->evt->data;
209   rmp->handle = handle;
210   rmp->context = context;
211   rmp->retval = retval;
212   app_send_ctrl_evt_to_vpp (mq, app_evt);
213 }
214
215 static void
216 vcl_send_session_disconnected_reply (svm_msg_q_t * mq, u32 context,
217                                      session_handle_t handle, int retval)
218 {
219   app_session_evt_t _app_evt, *app_evt = &_app_evt;
220   session_disconnected_reply_msg_t *rmp;
221   app_alloc_ctrl_evt_to_vpp (mq, app_evt,
222                              SESSION_CTRL_EVT_DISCONNECTED_REPLY);
223   rmp = (session_disconnected_reply_msg_t *) app_evt->evt->data;
224   rmp->handle = handle;
225   rmp->context = context;
226   rmp->retval = retval;
227   app_send_ctrl_evt_to_vpp (mq, app_evt);
228 }
229
230 static void
231 vcl_send_session_reset_reply (svm_msg_q_t * mq, u32 context,
232                               session_handle_t handle, int retval)
233 {
234   app_session_evt_t _app_evt, *app_evt = &_app_evt;
235   session_reset_reply_msg_t *rmp;
236   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_RESET_REPLY);
237   rmp = (session_reset_reply_msg_t *) app_evt->evt->data;
238   rmp->handle = handle;
239   rmp->context = context;
240   rmp->retval = retval;
241   app_send_ctrl_evt_to_vpp (mq, app_evt);
242 }
243
244 void
245 vcl_send_session_worker_update (vcl_worker_t * wrk, vcl_session_t * s,
246                                 u32 wrk_index)
247 {
248   app_session_evt_t _app_evt, *app_evt = &_app_evt;
249   session_worker_update_msg_t *mp;
250   svm_msg_q_t *mq;
251
252   mq = vcl_session_vpp_evt_q (wrk, s);
253   app_alloc_ctrl_evt_to_vpp (mq, app_evt, SESSION_CTRL_EVT_WORKER_UPDATE);
254   mp = (session_worker_update_msg_t *) app_evt->evt->data;
255   mp->client_index = wrk->my_client_index;
256   mp->handle = s->vpp_handle;
257   mp->req_wrk_index = wrk->vpp_wrk_index;
258   mp->wrk_index = wrk_index;
259   app_send_ctrl_evt_to_vpp (mq, app_evt);
260 }
261
262 static u32
263 vcl_session_accepted_handler (vcl_worker_t * wrk, session_accepted_msg_t * mp)
264 {
265   vcl_session_t *session, *listen_session;
266   svm_fifo_t *rx_fifo, *tx_fifo;
267   u32 vpp_wrk_index;
268   svm_msg_q_t *evt_q;
269
270   session = vcl_session_alloc (wrk);
271
272   listen_session = vcl_session_table_lookup_listener (wrk,
273                                                       mp->listener_handle);
274   if (!listen_session)
275     {
276       evt_q = uword_to_pointer (mp->vpp_event_queue_address, svm_msg_q_t *);
277       clib_warning ("VCL<%d>: ERROR: couldn't find listen session: "
278                     "unknown vpp listener handle %llx",
279                     getpid (), mp->listener_handle);
280       vcl_send_session_accepted_reply (evt_q, mp->context, mp->handle,
281                                        VNET_API_ERROR_INVALID_ARGUMENT);
282       vcl_session_free (wrk, session);
283       return VCL_INVALID_SESSION_INDEX;
284     }
285
286   rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
287   tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
288
289   if (vcl_wait_for_segment (mp->segment_handle))
290     {
291       clib_warning ("segment for session %u couldn't be mounted!",
292                     session->session_index);
293       return VCL_INVALID_SESSION_INDEX;
294     }
295
296   session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
297                                          svm_msg_q_t *);
298   rx_fifo->client_session_index = session->session_index;
299   tx_fifo->client_session_index = session->session_index;
300   rx_fifo->client_thread_index = vcl_get_worker_index ();
301   tx_fifo->client_thread_index = vcl_get_worker_index ();
302   vpp_wrk_index = tx_fifo->master_thread_index;
303   vec_validate (wrk->vpp_event_queues, vpp_wrk_index);
304   wrk->vpp_event_queues[vpp_wrk_index] = session->vpp_evt_q;
305
306   session->vpp_handle = mp->handle;
307   session->vpp_thread_index = rx_fifo->master_thread_index;
308   session->client_context = mp->context;
309   session->rx_fifo = rx_fifo;
310   session->tx_fifo = tx_fifo;
311
312   session->session_state = STATE_ACCEPT;
313   session->transport.rmt_port = mp->port;
314   session->transport.is_ip4 = mp->is_ip4;
315   clib_memcpy_fast (&session->transport.rmt_ip, mp->ip,
316                     sizeof (ip46_address_t));
317
318   vcl_session_table_add_vpp_handle (wrk, mp->handle, session->session_index);
319   session->transport.lcl_port = listen_session->transport.lcl_port;
320   session->transport.lcl_ip = listen_session->transport.lcl_ip;
321   session->session_type = listen_session->session_type;
322   session->is_dgram = session->session_type == VPPCOM_PROTO_UDP;
323
324   VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: client accept request from %s"
325         " address %U port %d queue %p!", getpid (), mp->handle,
326         session->session_index,
327         mp->is_ip4 ? "IPv4" : "IPv6", format_ip46_address, &mp->ip,
328         mp->is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
329         clib_net_to_host_u16 (mp->port), session->vpp_evt_q);
330   vcl_evt (VCL_EVT_ACCEPT, session, listen_session, session_index);
331
332   return session->session_index;
333 }
334
335 static u32
336 vcl_session_connected_handler (vcl_worker_t * wrk,
337                                session_connected_msg_t * mp)
338 {
339   u32 session_index, vpp_wrk_index;
340   svm_fifo_t *rx_fifo, *tx_fifo;
341   vcl_session_t *session = 0;
342
343   session_index = mp->context;
344   session = vcl_session_get (wrk, session_index);
345   if (!session)
346     {
347       clib_warning ("[%s] ERROR: vpp handle 0x%llx, sid %u: "
348                     "Invalid session index (%u)!",
349                     getpid (), mp->handle, session_index);
350       return VCL_INVALID_SESSION_INDEX;
351     }
352   if (mp->retval)
353     {
354       clib_warning ("VCL<%d>: ERROR: sid %u: connect failed! %U", getpid (),
355                     session_index, format_api_error, ntohl (mp->retval));
356       session->session_state = STATE_FAILED;
357       session->vpp_handle = mp->handle;
358       return session_index;
359     }
360
361   rx_fifo = uword_to_pointer (mp->server_rx_fifo, svm_fifo_t *);
362   tx_fifo = uword_to_pointer (mp->server_tx_fifo, svm_fifo_t *);
363   if (vcl_wait_for_segment (mp->segment_handle))
364     {
365       VDBG (0, "segment for session %u couldn't be mounted!",
366             session->session_index);
367       return VCL_INVALID_SESSION_INDEX;
368     }
369
370   rx_fifo->client_session_index = session_index;
371   tx_fifo->client_session_index = session_index;
372   rx_fifo->client_thread_index = vcl_get_worker_index ();
373   tx_fifo->client_thread_index = vcl_get_worker_index ();
374
375   session->vpp_evt_q = uword_to_pointer (mp->vpp_event_queue_address,
376                                          svm_msg_q_t *);
377   vpp_wrk_index = tx_fifo->master_thread_index;
378   vec_validate (wrk->vpp_event_queues, vpp_wrk_index);
379   wrk->vpp_event_queues[vpp_wrk_index] = session->vpp_evt_q;
380
381   if (mp->ct_rx_fifo)
382     {
383       session->ct_rx_fifo = uword_to_pointer (mp->ct_rx_fifo, svm_fifo_t *);
384       session->ct_tx_fifo = uword_to_pointer (mp->ct_tx_fifo, svm_fifo_t *);
385       if (vcl_wait_for_segment (mp->ct_segment_handle))
386         {
387           VDBG (0, "ct segment for session %u couldn't be mounted!",
388                 session->session_index);
389           return VCL_INVALID_SESSION_INDEX;
390         }
391     }
392
393   session->rx_fifo = rx_fifo;
394   session->tx_fifo = tx_fifo;
395   session->vpp_handle = mp->handle;
396   session->vpp_thread_index = rx_fifo->master_thread_index;
397   session->transport.is_ip4 = mp->is_ip4;
398   clib_memcpy_fast (&session->transport.lcl_ip, mp->lcl_ip,
399                     sizeof (session->transport.lcl_ip));
400   session->transport.lcl_port = mp->lcl_port;
401   session->session_state = STATE_CONNECT;
402
403   /* Add it to lookup table */
404   vcl_session_table_add_vpp_handle (wrk, mp->handle, session_index);
405
406   VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: connect succeeded! "
407         "session_rx_fifo %p, refcnt %d, session_tx_fifo %p, refcnt %d",
408         getpid (), mp->handle, session_index, session->rx_fifo,
409         session->rx_fifo->refcnt, session->tx_fifo, session->tx_fifo->refcnt);
410
411   return session_index;
412 }
413
414 static int
415 vcl_flag_accepted_session (vcl_session_t * session, u64 handle, u32 flags)
416 {
417   vcl_session_msg_t *accepted_msg;
418   int i;
419
420   for (i = 0; i < vec_len (session->accept_evts_fifo); i++)
421     {
422       accepted_msg = &session->accept_evts_fifo[i];
423       if (accepted_msg->accepted_msg.handle == handle)
424         {
425           accepted_msg->flags |= flags;
426           return 1;
427         }
428     }
429   return 0;
430 }
431
432 static u32
433 vcl_session_reset_handler (vcl_worker_t * wrk,
434                            session_reset_msg_t * reset_msg)
435 {
436   vcl_session_t *session;
437   u32 sid;
438
439   sid = vcl_session_index_from_vpp_handle (wrk, reset_msg->handle);
440   session = vcl_session_get (wrk, sid);
441   if (!session)
442     {
443       VDBG (0, "request to reset unknown handle 0x%llx", reset_msg->handle);
444       return VCL_INVALID_SESSION_INDEX;
445     }
446
447   /* Caught a reset before actually accepting the session */
448   if (session->session_state == STATE_LISTEN)
449     {
450
451       if (!vcl_flag_accepted_session (session, reset_msg->handle,
452                                       VCL_ACCEPTED_F_RESET))
453         VDBG (0, "session was not accepted!");
454       return VCL_INVALID_SESSION_INDEX;
455     }
456
457   session->session_state = STATE_DISCONNECT;
458   VDBG (0, "reset session %u [0x%llx]", sid, reset_msg->handle);
459   return sid;
460 }
461
462 static u32
463 vcl_session_bound_handler (vcl_worker_t * wrk, session_bound_msg_t * mp)
464 {
465   vcl_session_t *session;
466   u32 sid = mp->context;
467
468   session = vcl_session_get (wrk, sid);
469   if (mp->retval)
470     {
471       VERR ("vpp handle 0x%llx, sid %u: bind failed: %U", mp->handle, sid,
472             format_api_error, mp->retval);
473       if (session)
474         {
475           session->session_state = STATE_FAILED;
476           session->vpp_handle = mp->handle;
477           return sid;
478         }
479       else
480         {
481           clib_warning ("[%s] ERROR: vpp handle 0x%llx, sid %u: "
482                         "Invalid session index (%u)!",
483                         getpid (), mp->handle, sid);
484           return VCL_INVALID_SESSION_INDEX;
485         }
486     }
487
488   session->vpp_handle = mp->handle;
489   session->transport.is_ip4 = mp->lcl_is_ip4;
490   clib_memcpy_fast (&session->transport.lcl_ip, mp->lcl_ip,
491                     sizeof (ip46_address_t));
492   session->transport.lcl_port = mp->lcl_port;
493   vcl_session_table_add_listener (wrk, mp->handle, sid);
494   session->session_state = STATE_LISTEN;
495
496   session->vpp_evt_q = uword_to_pointer (mp->vpp_evt_q, svm_msg_q_t *);
497   vec_validate (wrk->vpp_event_queues, 0);
498   wrk->vpp_event_queues[0] = session->vpp_evt_q;
499
500   if (session->is_dgram)
501     {
502       svm_fifo_t *rx_fifo, *tx_fifo;
503       session->vpp_evt_q = uword_to_pointer (mp->vpp_evt_q, svm_msg_q_t *);
504       rx_fifo = uword_to_pointer (mp->rx_fifo, svm_fifo_t *);
505       rx_fifo->client_session_index = sid;
506       tx_fifo = uword_to_pointer (mp->tx_fifo, svm_fifo_t *);
507       tx_fifo->client_session_index = sid;
508       session->rx_fifo = rx_fifo;
509       session->tx_fifo = tx_fifo;
510     }
511
512   VDBG (0, "session %u [0x%llx]: listen succeeded!", sid, mp->handle);
513   return sid;
514 }
515
516 static void
517 vcl_session_unlisten_reply_handler (vcl_worker_t * wrk, void *data)
518 {
519   session_unlisten_reply_msg_t *mp = (session_unlisten_reply_msg_t *) data;
520   vcl_session_t *s;
521
522   s = vcl_session_get_w_vpp_handle (wrk, mp->handle);
523   if (!s || s->session_state != STATE_DISCONNECT)
524     {
525       VDBG (0, "Unlisten reply with wrong handle %llx", mp->handle);
526       return;
527     }
528
529   if (mp->retval)
530     VDBG (0, "ERROR: session %u [0xllx]: unlisten failed: %U",
531           s->session_index, mp->handle, format_api_error, ntohl (mp->retval));
532
533   if (mp->context != wrk->wrk_index)
534     VDBG (0, "wrong context");
535
536   vcl_session_table_del_vpp_handle (wrk, mp->handle);
537   vcl_session_free (wrk, s);
538 }
539
540 static vcl_session_t *
541 vcl_session_accepted (vcl_worker_t * wrk, session_accepted_msg_t * msg)
542 {
543   vcl_session_msg_t *vcl_msg;
544   vcl_session_t *session;
545
546   session = vcl_session_get_w_vpp_handle (wrk, msg->handle);
547   if (PREDICT_FALSE (session != 0))
548     VWRN ("session overlap handle %lu state %u!", msg->handle,
549           session->session_state);
550
551   session = vcl_session_table_lookup_listener (wrk, msg->listener_handle);
552   if (!session)
553     {
554       VERR ("couldn't find listen session: listener handle %llx",
555             msg->listener_handle);
556       return 0;
557     }
558
559   clib_fifo_add2 (session->accept_evts_fifo, vcl_msg);
560   vcl_msg->accepted_msg = *msg;
561   /* Session handle points to listener until fully accepted by app */
562   vcl_session_table_add_vpp_handle (wrk, msg->handle, session->session_index);
563
564   return session;
565 }
566
567 static vcl_session_t *
568 vcl_session_disconnected_handler (vcl_worker_t * wrk,
569                                   session_disconnected_msg_t * msg)
570 {
571   vcl_session_t *session;
572
573   session = vcl_session_get_w_vpp_handle (wrk, msg->handle);
574   if (!session)
575     {
576       VDBG (0, "request to disconnect unknown handle 0x%llx", msg->handle);
577       return 0;
578     }
579
580   /* Caught a disconnect before actually accepting the session */
581   if (session->session_state == STATE_LISTEN)
582     {
583       if (!vcl_flag_accepted_session (session, msg->handle,
584                                       VCL_ACCEPTED_F_CLOSED))
585         VDBG (0, "session was not accepted!");
586       return 0;
587     }
588
589   session->session_state = STATE_VPP_CLOSING;
590   return session;
591 }
592
593 static void
594 vcl_session_req_worker_update_handler (vcl_worker_t * wrk, void *data)
595 {
596   session_req_worker_update_msg_t *msg;
597   vcl_session_t *s;
598
599   msg = (session_req_worker_update_msg_t *) data;
600   s = vcl_session_get_w_vpp_handle (wrk, msg->session_handle);
601   if (!s)
602     return;
603
604   vec_add1 (wrk->pending_session_wrk_updates, s->session_index);
605 }
606
607 static void
608 vcl_session_worker_update_reply_handler (vcl_worker_t * wrk, void *data)
609 {
610   session_worker_update_reply_msg_t *msg;
611   vcl_session_t *s;
612
613   msg = (session_worker_update_reply_msg_t *) data;
614   s = vcl_session_get_w_vpp_handle (wrk, msg->handle);
615   if (!s)
616     {
617       VDBG (0, "unknown handle 0x%llx", msg->handle);
618       return;
619     }
620   if (vcl_wait_for_segment (msg->segment_handle))
621     {
622       clib_warning ("segment for session %u couldn't be mounted!",
623                     s->session_index);
624       return;
625     }
626
627   if (s->rx_fifo)
628     {
629       s->rx_fifo = uword_to_pointer (msg->rx_fifo, svm_fifo_t *);
630       s->tx_fifo = uword_to_pointer (msg->tx_fifo, svm_fifo_t *);
631       s->rx_fifo->client_session_index = s->session_index;
632       s->tx_fifo->client_session_index = s->session_index;
633       s->rx_fifo->client_thread_index = wrk->wrk_index;
634       s->tx_fifo->client_thread_index = wrk->wrk_index;
635     }
636   s->session_state = STATE_UPDATED;
637
638   VDBG (0, "session %u[0x%llx] moved to worker %u", s->session_index,
639         s->vpp_handle, wrk->wrk_index);
640 }
641
642 static int
643 vcl_handle_mq_event (vcl_worker_t * wrk, session_event_t * e)
644 {
645   session_disconnected_msg_t *disconnected_msg;
646   vcl_session_t *session;
647
648   switch (e->event_type)
649     {
650     case SESSION_IO_EVT_RX:
651     case SESSION_IO_EVT_TX:
652       session = vcl_session_get (wrk, e->session_index);
653       if (!session || !(session->session_state & STATE_OPEN))
654         break;
655       vec_add1 (wrk->unhandled_evts_vector, *e);
656       break;
657     case SESSION_CTRL_EVT_ACCEPTED:
658       vcl_session_accepted (wrk, (session_accepted_msg_t *) e->data);
659       break;
660     case SESSION_CTRL_EVT_CONNECTED:
661       vcl_session_connected_handler (wrk,
662                                      (session_connected_msg_t *) e->data);
663       break;
664     case SESSION_CTRL_EVT_DISCONNECTED:
665       disconnected_msg = (session_disconnected_msg_t *) e->data;
666       session = vcl_session_disconnected_handler (wrk, disconnected_msg);
667       if (!session)
668         break;
669       VDBG (0, "disconnected session %u [0x%llx]", session->session_index,
670             session->vpp_handle);
671       break;
672     case SESSION_CTRL_EVT_RESET:
673       vcl_session_reset_handler (wrk, (session_reset_msg_t *) e->data);
674       break;
675     case SESSION_CTRL_EVT_BOUND:
676       vcl_session_bound_handler (wrk, (session_bound_msg_t *) e->data);
677       break;
678     case SESSION_CTRL_EVT_UNLISTEN_REPLY:
679       vcl_session_unlisten_reply_handler (wrk, e->data);
680       break;
681     case SESSION_CTRL_EVT_REQ_WORKER_UPDATE:
682       vcl_session_req_worker_update_handler (wrk, e->data);
683       break;
684     case SESSION_CTRL_EVT_WORKER_UPDATE_REPLY:
685       vcl_session_worker_update_reply_handler (wrk, e->data);
686       break;
687     default:
688       clib_warning ("unhandled %u", e->event_type);
689     }
690   return VPPCOM_OK;
691 }
692
693 static int
694 vppcom_wait_for_session_state_change (u32 session_index,
695                                       vcl_session_state_t state,
696                                       f64 wait_for_time)
697 {
698   vcl_worker_t *wrk = vcl_worker_get_current ();
699   f64 timeout = clib_time_now (&wrk->clib_time) + wait_for_time;
700   vcl_session_t *volatile session;
701   svm_msg_q_msg_t msg;
702   session_event_t *e;
703
704   do
705     {
706       session = vcl_session_get (wrk, session_index);
707       if (PREDICT_FALSE (!session))
708         {
709           return VPPCOM_EBADFD;
710         }
711       if (session->session_state & state)
712         {
713           return VPPCOM_OK;
714         }
715       if (session->session_state & STATE_FAILED)
716         {
717           return VPPCOM_ECONNREFUSED;
718         }
719
720       if (svm_msg_q_sub (wrk->app_event_queue, &msg, SVM_Q_NOWAIT, 0))
721         {
722           usleep (100);
723           continue;
724         }
725       e = svm_msg_q_msg_data (wrk->app_event_queue, &msg);
726       vcl_handle_mq_event (wrk, e);
727       svm_msg_q_free_msg (wrk->app_event_queue, &msg);
728     }
729   while (clib_time_now (&wrk->clib_time) < timeout);
730
731   VDBG (0, "timeout waiting for state 0x%x (%s)", state,
732         vppcom_session_state_str (state));
733   vcl_evt (VCL_EVT_SESSION_TIMEOUT, session, session_state);
734
735   return VPPCOM_ETIMEDOUT;
736 }
737
738 static void
739 vcl_handle_pending_wrk_updates (vcl_worker_t * wrk)
740 {
741   vcl_session_state_t state;
742   vcl_session_t *s;
743   u32 *sip;
744
745   if (PREDICT_TRUE (vec_len (wrk->pending_session_wrk_updates) == 0))
746     return;
747
748   vec_foreach (sip, wrk->pending_session_wrk_updates)
749   {
750     s = vcl_session_get (wrk, *sip);
751     vcl_send_session_worker_update (wrk, s, wrk->wrk_index);
752     state = s->session_state;
753     vppcom_wait_for_session_state_change (s->session_index, STATE_UPDATED, 5);
754     s->session_state = state;
755   }
756   vec_reset_length (wrk->pending_session_wrk_updates);
757 }
758
759 void
760 vcl_flush_mq_events (void)
761 {
762   vcl_worker_t *wrk = vcl_worker_get_current ();
763   svm_msg_q_msg_t *msg;
764   session_event_t *e;
765   svm_msg_q_t *mq;
766   int i;
767
768   mq = wrk->app_event_queue;
769   svm_msg_q_lock (mq);
770   vcl_mq_dequeue_batch (wrk, mq);
771   svm_msg_q_unlock (mq);
772
773   for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
774     {
775       msg = vec_elt_at_index (wrk->mq_msg_vector, i);
776       e = svm_msg_q_msg_data (mq, msg);
777       vcl_handle_mq_event (wrk, e);
778       svm_msg_q_free_msg (mq, msg);
779     }
780   vec_reset_length (wrk->mq_msg_vector);
781   vcl_handle_pending_wrk_updates (wrk);
782 }
783
784 static int
785 vppcom_app_session_enable (void)
786 {
787   int rv;
788
789   if (vcm->app_state != STATE_APP_ENABLED)
790     {
791       vppcom_send_session_enable_disable (1 /* is_enabled == TRUE */ );
792       rv = vcl_wait_for_app_state_change (STATE_APP_ENABLED);
793       if (PREDICT_FALSE (rv))
794         {
795           VDBG (0, "VCL<%d>: application session enable timed out! "
796                 "returning %d (%s)", getpid (), rv, vppcom_retval_str (rv));
797           return rv;
798         }
799     }
800   return VPPCOM_OK;
801 }
802
803 static int
804 vppcom_app_attach (void)
805 {
806   int rv;
807
808   vppcom_app_send_attach ();
809   rv = vcl_wait_for_app_state_change (STATE_APP_ATTACHED);
810   if (PREDICT_FALSE (rv))
811     {
812       VDBG (0, "VCL<%d>: application attach timed out! returning %d (%s)",
813             getpid (), rv, vppcom_retval_str (rv));
814       return rv;
815     }
816
817   return VPPCOM_OK;
818 }
819
820 static int
821 vppcom_session_unbind (u32 session_handle)
822 {
823   vcl_worker_t *wrk = vcl_worker_get_current ();
824   vcl_session_t *session = 0;
825   u64 vpp_handle;
826
827   session = vcl_session_get_w_handle (wrk, session_handle);
828   if (!session)
829     return VPPCOM_EBADFD;
830
831   vpp_handle = session->vpp_handle;
832   session->vpp_handle = ~0;
833   session->session_state = STATE_DISCONNECT;
834
835   VDBG (1, "vpp handle 0x%llx, sid %u: sending unbind msg! new state"
836         " 0x%x (%s)", vpp_handle, session_handle, STATE_DISCONNECT,
837         vppcom_session_state_str (STATE_DISCONNECT));
838   vcl_evt (VCL_EVT_UNBIND, session);
839   vppcom_send_unbind_sock (wrk, vpp_handle);
840
841   return VPPCOM_OK;
842 }
843
844 static int
845 vppcom_session_disconnect (u32 session_handle)
846 {
847   vcl_worker_t *wrk = vcl_worker_get_current ();
848   svm_msg_q_t *vpp_evt_q;
849   vcl_session_t *session;
850   vcl_session_state_t state;
851   u64 vpp_handle;
852
853   session = vcl_session_get_w_handle (wrk, session_handle);
854   if (!session)
855     return VPPCOM_EBADFD;
856
857   vpp_handle = session->vpp_handle;
858   state = session->session_state;
859
860   VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u state 0x%x (%s)", getpid (),
861         vpp_handle, session_handle, state, vppcom_session_state_str (state));
862
863   if (PREDICT_FALSE (state & STATE_LISTEN))
864     {
865       clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
866                     "Cannot disconnect a listen socket!",
867                     getpid (), vpp_handle, session_handle);
868       return VPPCOM_EBADFD;
869     }
870
871   if (state & STATE_VPP_CLOSING)
872     {
873       vpp_evt_q = vcl_session_vpp_evt_q (wrk, session);
874       vcl_send_session_disconnected_reply (vpp_evt_q, wrk->my_client_index,
875                                            vpp_handle, 0);
876       VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: sending disconnect "
877             "REPLY...", getpid (), vpp_handle, session_handle);
878     }
879   else
880     {
881       VDBG (1, "VCL<%d>: vpp handle 0x%llx, sid %u: sending disconnect...",
882             getpid (), vpp_handle, session_handle);
883       vppcom_send_disconnect_session (vpp_handle);
884     }
885
886   return VPPCOM_OK;
887 }
888
889 /**
890  * Handle app exit
891  *
892  * Notify vpp of the disconnect and mark the worker as free. If we're the
893  * last worker, do a full cleanup otherwise, since we're probably a forked
894  * child, avoid syscalls as much as possible. We might've lost privileges.
895  */
896 void
897 vppcom_app_exit (void)
898 {
899   if (!pool_elts (vcm->workers))
900     return;
901   vcl_worker_cleanup (vcl_worker_get_current (), 1 /* notify vpp */ );
902   vcl_set_worker_index (~0);
903   vcl_elog_stop (vcm);
904   if (vec_len (vcm->workers) == 1)
905     vl_client_disconnect_from_vlib ();
906   else
907     vl_client_send_disconnect (1 /* vpp should cleanup */ );
908 }
909
910 /*
911  * VPPCOM Public API functions
912  */
913 int
914 vppcom_app_create (char *app_name)
915 {
916   vppcom_cfg_t *vcl_cfg = &vcm->cfg;
917   int rv;
918
919   if (vcm->is_init)
920     {
921       VDBG (1, "already initialized");
922       return VPPCOM_EEXIST;
923     }
924
925   vcm->is_init = 1;
926   vppcom_cfg (&vcm->cfg);
927   vcl_cfg = &vcm->cfg;
928
929   vcm->main_cpu = pthread_self ();
930   vcm->main_pid = getpid ();
931   vcm->app_name = format (0, "%s", app_name);
932   vppcom_init_error_string_table ();
933   svm_fifo_segment_main_init (&vcm->segment_main, vcl_cfg->segment_baseva,
934                               20 /* timeout in secs */ );
935   pool_alloc (vcm->workers, vcl_cfg->max_workers);
936   clib_spinlock_init (&vcm->workers_lock);
937   clib_rwlock_init (&vcm->segment_table_lock);
938   atexit (vppcom_app_exit);
939
940   /* Allocate default worker */
941   vcl_worker_alloc_and_init ();
942
943   /* API hookup and connect to VPP */
944   vppcom_api_hookup ();
945   vcl_elog_init (vcm);
946   vcm->app_state = STATE_APP_START;
947   rv = vppcom_connect_to_vpp (app_name);
948   if (rv)
949     {
950       VERR ("couldn't connect to VPP!");
951       return rv;
952     }
953   VDBG (0, "sending session enable");
954   rv = vppcom_app_session_enable ();
955   if (rv)
956     {
957       VERR ("vppcom_app_session_enable() failed!");
958       return rv;
959     }
960
961   VDBG (0, "sending app attach");
962   rv = vppcom_app_attach ();
963   if (rv)
964     {
965       VERR ("vppcom_app_attach() failed!");
966       return rv;
967     }
968
969   VDBG (0, "app_name '%s', my_client_index %d (0x%x)", app_name,
970         vcm->workers[0].my_client_index, vcm->workers[0].my_client_index);
971
972   return VPPCOM_OK;
973 }
974
975 void
976 vppcom_app_destroy (void)
977 {
978   int rv;
979   f64 orig_app_timeout;
980
981   if (!pool_elts (vcm->workers))
982     return;
983
984   vcl_evt (VCL_EVT_DETACH, vcm);
985
986   if (pool_elts (vcm->workers) == 1)
987     {
988       vppcom_app_send_detach ();
989       orig_app_timeout = vcm->cfg.app_timeout;
990       vcm->cfg.app_timeout = 2.0;
991       rv = vcl_wait_for_app_state_change (STATE_APP_ENABLED);
992       vcm->cfg.app_timeout = orig_app_timeout;
993       if (PREDICT_FALSE (rv))
994         VDBG (0, "application detach timed out! returning %d (%s)", rv,
995               vppcom_retval_str (rv));
996       vec_free (vcm->app_name);
997       vcl_worker_cleanup (vcl_worker_get_current (), 0 /* notify vpp */ );
998     }
999   else
1000     {
1001       vcl_worker_cleanup (vcl_worker_get_current (), 1 /* notify vpp */ );
1002     }
1003
1004   vcl_set_worker_index (~0);
1005   vcl_elog_stop (vcm);
1006   vl_client_disconnect_from_vlib ();
1007 }
1008
1009 int
1010 vppcom_session_create (u8 proto, u8 is_nonblocking)
1011 {
1012   vcl_worker_t *wrk = vcl_worker_get_current ();
1013   vcl_session_t *session;
1014
1015   session = vcl_session_alloc (wrk);
1016
1017   session->session_type = proto;
1018   session->session_state = STATE_START;
1019   session->vpp_handle = ~0;
1020   session->is_dgram = proto == VPPCOM_PROTO_UDP;
1021
1022   if (is_nonblocking)
1023     VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_NONBLOCK);
1024
1025   vcl_evt (VCL_EVT_CREATE, session, session_type, session->session_state,
1026            is_nonblocking, session_index);
1027
1028   VDBG (0, "created sid %u", session->session_index);
1029
1030   return vcl_session_handle (session);
1031 }
1032
1033 int
1034 vcl_session_cleanup (vcl_worker_t * wrk, vcl_session_t * session,
1035                      vcl_session_handle_t sh, u8 do_disconnect)
1036 {
1037   vcl_session_state_t state;
1038   u32 next_sh, vep_sh;
1039   int rv = VPPCOM_OK;
1040   u64 vpp_handle;
1041   u8 is_vep;
1042
1043   is_vep = session->is_vep;
1044   next_sh = session->vep.next_sh;
1045   vep_sh = session->vep.vep_sh;
1046   state = session->session_state;
1047   vpp_handle = session->vpp_handle;
1048
1049   VDBG (1, "session %u [0x%llx] closing", session->session_index, vpp_handle);
1050
1051   if (is_vep)
1052     {
1053       while (next_sh != ~0)
1054         {
1055           rv = vppcom_epoll_ctl (sh, EPOLL_CTL_DEL, next_sh, 0);
1056           if (PREDICT_FALSE (rv < 0))
1057             VDBG (0, "vpp handle 0x%llx, sid %u: EPOLL_CTL_DEL vep_idx %u"
1058                   " failed! rv %d (%s)", vpp_handle, next_sh, vep_sh, rv,
1059                   vppcom_retval_str (rv));
1060
1061           next_sh = session->vep.next_sh;
1062         }
1063     }
1064   else
1065     {
1066       if (session->is_vep_session)
1067         {
1068           rv = vppcom_epoll_ctl (vep_sh, EPOLL_CTL_DEL, sh, 0);
1069           if (rv < 0)
1070             VDBG (0, "session %u [0x%llx]: EPOLL_CTL_DEL vep_idx %u "
1071                   "failed! rv %d (%s)", session->session_index, vpp_handle,
1072                   vep_sh, rv, vppcom_retval_str (rv));
1073         }
1074
1075       if (!do_disconnect)
1076         {
1077           VDBG (1, "session %u [0x%llx] disconnect skipped",
1078                 session->session_index, vpp_handle);
1079           goto cleanup;
1080         }
1081
1082       if (state & STATE_LISTEN)
1083         {
1084           rv = vppcom_session_unbind (sh);
1085           if (PREDICT_FALSE (rv < 0))
1086             VDBG (0, "session %u [0x%llx]: listener unbind failed! "
1087                   "rv %d (%s)", session->session_index, vpp_handle, rv,
1088                   vppcom_retval_str (rv));
1089           return rv;
1090         }
1091       else if (state & STATE_OPEN)
1092         {
1093           rv = vppcom_session_disconnect (sh);
1094           if (PREDICT_FALSE (rv < 0))
1095             VDBG (0, "ERROR: session %u [0x%llx]: disconnect failed!"
1096                   " rv %d (%s)", session->session_index, vpp_handle,
1097                   rv, vppcom_retval_str (rv));
1098         }
1099       else if (state == STATE_DISCONNECT)
1100         {
1101           svm_msg_q_t *mq = vcl_session_vpp_evt_q (wrk, session);
1102           vcl_send_session_reset_reply (mq, wrk->my_client_index,
1103                                         session->vpp_handle, 0);
1104         }
1105     }
1106
1107   VDBG (0, "session %u [0x%llx] removed", session->session_index, vpp_handle);
1108
1109 cleanup:
1110   vcl_session_table_del_vpp_handle (wrk, vpp_handle);
1111   vcl_session_free (wrk, session);
1112   vcl_evt (VCL_EVT_CLOSE, session, rv);
1113
1114   return rv;
1115 }
1116
1117 int
1118 vppcom_session_close (uint32_t session_handle)
1119 {
1120   vcl_worker_t *wrk = vcl_worker_get_current ();
1121   vcl_session_t *session;
1122
1123   session = vcl_session_get_w_handle (wrk, session_handle);
1124   if (!session)
1125     return VPPCOM_EBADFD;
1126   return vcl_session_cleanup (wrk, session, session_handle,
1127                               1 /* do_disconnect */ );
1128 }
1129
1130 int
1131 vppcom_session_bind (uint32_t session_handle, vppcom_endpt_t * ep)
1132 {
1133   vcl_worker_t *wrk = vcl_worker_get_current ();
1134   vcl_session_t *session = 0;
1135
1136   if (!ep || !ep->ip)
1137     return VPPCOM_EINVAL;
1138
1139   session = vcl_session_get_w_handle (wrk, session_handle);
1140   if (!session)
1141     return VPPCOM_EBADFD;
1142
1143   if (session->is_vep)
1144     {
1145       clib_warning ("VCL<%d>: ERROR: sid %u: cannot "
1146                     "bind to an epoll session!", getpid (), session_handle);
1147       return VPPCOM_EBADFD;
1148     }
1149
1150   session->transport.is_ip4 = ep->is_ip4;
1151   if (ep->is_ip4)
1152     clib_memcpy_fast (&session->transport.lcl_ip.ip4, ep->ip,
1153                       sizeof (ip4_address_t));
1154   else
1155     clib_memcpy_fast (&session->transport.lcl_ip.ip6, ep->ip,
1156                       sizeof (ip6_address_t));
1157   session->transport.lcl_port = ep->port;
1158
1159   VDBG (0, "VCL<%d>: sid %u: binding to local %s address %U port %u, "
1160         "proto %s", getpid (), session_handle,
1161         session->transport.is_ip4 ? "IPv4" : "IPv6",
1162         format_ip46_address, &session->transport.lcl_ip,
1163         session->transport.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
1164         clib_net_to_host_u16 (session->transport.lcl_port),
1165         vppcom_proto_str (session->session_type));
1166   vcl_evt (VCL_EVT_BIND, session);
1167
1168   if (session->session_type == VPPCOM_PROTO_UDP)
1169     vppcom_session_listen (session_handle, 10);
1170
1171   return VPPCOM_OK;
1172 }
1173
1174 int
1175 vppcom_session_listen (uint32_t listen_sh, uint32_t q_len)
1176 {
1177   vcl_worker_t *wrk = vcl_worker_get_current ();
1178   vcl_session_t *listen_session = 0;
1179   u64 listen_vpp_handle;
1180   int rv;
1181
1182   listen_session = vcl_session_get_w_handle (wrk, listen_sh);
1183   if (!listen_session || listen_session->is_vep)
1184     return VPPCOM_EBADFD;
1185
1186   if (q_len == 0 || q_len == ~0)
1187     q_len = vcm->cfg.listen_queue_size;
1188
1189   listen_vpp_handle = listen_session->vpp_handle;
1190   if (listen_session->session_state & STATE_LISTEN)
1191     {
1192       VDBG (0, "session %u [0x%llx]: already in listen state!",
1193             listen_sh, listen_vpp_handle);
1194       return VPPCOM_OK;
1195     }
1196
1197   VDBG (0, "session %u [0x%llx]: sending vpp listen request...",
1198         listen_sh, listen_vpp_handle);
1199
1200   /*
1201    * Send listen request to vpp and wait for reply
1202    */
1203   vppcom_send_bind_sock (listen_session);
1204   rv = vppcom_wait_for_session_state_change (listen_session->session_index,
1205                                              STATE_LISTEN,
1206                                              vcm->cfg.session_timeout);
1207
1208   if (PREDICT_FALSE (rv))
1209     {
1210       listen_session = vcl_session_get_w_handle (wrk, listen_sh);
1211       VDBG (0, "session %u [0x%llx]: listen failed! returning %d (%s)",
1212             listen_sh, listen_session->vpp_handle, rv,
1213             vppcom_retval_str (rv));
1214       return rv;
1215     }
1216
1217   return VPPCOM_OK;
1218 }
1219
1220 int
1221 vppcom_session_tls_add_cert (uint32_t session_handle, char *cert,
1222                              uint32_t cert_len)
1223 {
1224
1225   vcl_worker_t *wrk = vcl_worker_get_current ();
1226   vcl_session_t *session = 0;
1227
1228   session = vcl_session_get_w_handle (wrk, session_handle);
1229   if (!session)
1230     return VPPCOM_EBADFD;
1231
1232   if (cert_len == 0 || cert_len == ~0)
1233     return VPPCOM_EBADFD;
1234
1235   /*
1236    * Send listen request to vpp and wait for reply
1237    */
1238   vppcom_send_application_tls_cert_add (session, cert, cert_len);
1239
1240   return VPPCOM_OK;
1241
1242 }
1243
1244 int
1245 vppcom_session_tls_add_key (uint32_t session_handle, char *key,
1246                             uint32_t key_len)
1247 {
1248
1249   vcl_worker_t *wrk = vcl_worker_get_current ();
1250   vcl_session_t *session = 0;
1251
1252   session = vcl_session_get_w_handle (wrk, session_handle);
1253   if (!session)
1254     return VPPCOM_EBADFD;
1255
1256   if (key_len == 0 || key_len == ~0)
1257     return VPPCOM_EBADFD;
1258
1259   /*
1260    * Send listen request to vpp and wait for reply
1261    */
1262   vppcom_send_application_tls_key_add (session, key, key_len);
1263
1264   return VPPCOM_OK;
1265
1266
1267 }
1268
1269 static int
1270 validate_args_session_accept_ (vcl_worker_t * wrk,
1271                                vcl_session_t * listen_session)
1272 {
1273   /* Input validation - expects spinlock on sessions_lockp */
1274   if (listen_session->is_vep)
1275     {
1276       clib_warning ("VCL<%d>: ERROR: sid %u: cannot accept on an "
1277                     "epoll session!", getpid (),
1278                     listen_session->session_index);
1279       return VPPCOM_EBADFD;
1280     }
1281
1282   if (listen_session->session_state != STATE_LISTEN)
1283     {
1284       clib_warning ("VCL<%d>: ERROR: vpp handle 0x%llx, sid %u: "
1285                     "not in listen state! state 0x%x (%s)", getpid (),
1286                     listen_session->vpp_handle, listen_session->session_index,
1287                     listen_session->session_state,
1288                     vppcom_session_state_str (listen_session->session_state));
1289       return VPPCOM_EBADFD;
1290     }
1291   return VPPCOM_OK;
1292 }
1293
1294 int
1295 vppcom_session_accept (uint32_t listen_session_handle, vppcom_endpt_t * ep,
1296                        uint32_t flags)
1297 {
1298   u32 client_session_index = ~0, listen_session_index, accept_flags = 0;
1299   vcl_worker_t *wrk = vcl_worker_get_current ();
1300   session_accepted_msg_t accepted_msg;
1301   vcl_session_t *listen_session = 0;
1302   vcl_session_t *client_session = 0;
1303   vcl_session_msg_t *evt;
1304   u64 listen_vpp_handle;
1305   svm_msg_q_msg_t msg;
1306   session_event_t *e;
1307   u8 is_nonblocking;
1308   int rv;
1309
1310   listen_session = vcl_session_get_w_handle (wrk, listen_session_handle);
1311   if (!listen_session)
1312     return VPPCOM_EBADFD;
1313
1314   listen_session_index = listen_session->session_index;
1315   if ((rv = validate_args_session_accept_ (wrk, listen_session)))
1316     return rv;
1317
1318   if (clib_fifo_elts (listen_session->accept_evts_fifo))
1319     {
1320       clib_fifo_sub2 (listen_session->accept_evts_fifo, evt);
1321       accept_flags = evt->flags;
1322       accepted_msg = evt->accepted_msg;
1323       goto handle;
1324     }
1325
1326   is_nonblocking = VCL_SESS_ATTR_TEST (listen_session->attr,
1327                                        VCL_SESS_ATTR_NONBLOCK);
1328   if (svm_msg_q_is_empty (wrk->app_event_queue) && is_nonblocking)
1329     return VPPCOM_EAGAIN;
1330
1331   while (1)
1332     {
1333       if (svm_msg_q_sub (wrk->app_event_queue, &msg, SVM_Q_WAIT, 0))
1334         return VPPCOM_EAGAIN;
1335
1336       e = svm_msg_q_msg_data (wrk->app_event_queue, &msg);
1337       if (e->event_type != SESSION_CTRL_EVT_ACCEPTED)
1338         {
1339           clib_warning ("discarded event: %u", e->event_type);
1340           svm_msg_q_free_msg (wrk->app_event_queue, &msg);
1341           continue;
1342         }
1343       clib_memcpy_fast (&accepted_msg, e->data, sizeof (accepted_msg));
1344       svm_msg_q_free_msg (wrk->app_event_queue, &msg);
1345       break;
1346     }
1347
1348 handle:
1349
1350   client_session_index = vcl_session_accepted_handler (wrk, &accepted_msg);
1351   listen_session = vcl_session_get (wrk, listen_session_index);
1352   client_session = vcl_session_get (wrk, client_session_index);
1353
1354   if (flags & O_NONBLOCK)
1355     VCL_SESS_ATTR_SET (client_session->attr, VCL_SESS_ATTR_NONBLOCK);
1356
1357   listen_vpp_handle = listen_session->vpp_handle;
1358   VDBG (1, "vpp handle 0x%llx, sid %u: Got a client request! "
1359         "vpp handle 0x%llx, sid %u, flags %d, is_nonblocking %u",
1360         listen_vpp_handle, listen_session_handle,
1361         client_session->vpp_handle, client_session_index,
1362         flags, VCL_SESS_ATTR_TEST (client_session->attr,
1363                                    VCL_SESS_ATTR_NONBLOCK));
1364
1365   if (ep)
1366     {
1367       ep->is_ip4 = client_session->transport.is_ip4;
1368       ep->port = client_session->transport.rmt_port;
1369       if (client_session->transport.is_ip4)
1370         clib_memcpy_fast (ep->ip, &client_session->transport.rmt_ip.ip4,
1371                           sizeof (ip4_address_t));
1372       else
1373         clib_memcpy_fast (ep->ip, &client_session->transport.rmt_ip.ip6,
1374                           sizeof (ip6_address_t));
1375     }
1376
1377   vcl_send_session_accepted_reply (client_session->vpp_evt_q,
1378                                    client_session->client_context,
1379                                    client_session->vpp_handle, 0);
1380
1381   VDBG (0, "listener %u [0x%llx] accepted %u [0x%llx] peer: %U:%u "
1382         "local: %U:%u", listen_session_handle, listen_vpp_handle,
1383         client_session_index, client_session->vpp_handle,
1384         format_ip46_address, &client_session->transport.rmt_ip,
1385         client_session->transport.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
1386         clib_net_to_host_u16 (client_session->transport.rmt_port),
1387         format_ip46_address, &client_session->transport.lcl_ip,
1388         client_session->transport.is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
1389         clib_net_to_host_u16 (client_session->transport.lcl_port));
1390   vcl_evt (VCL_EVT_ACCEPT, client_session, listen_session,
1391            client_session_index);
1392
1393   /*
1394    * Session might have been closed already
1395    */
1396   if (accept_flags)
1397     {
1398       if (accept_flags & VCL_ACCEPTED_F_CLOSED)
1399         client_session->session_state = STATE_VPP_CLOSING;
1400       else if (accept_flags & VCL_ACCEPTED_F_RESET)
1401         client_session->session_state = STATE_DISCONNECT;
1402     }
1403   return vcl_session_handle (client_session);
1404 }
1405
1406 int
1407 vppcom_session_connect (uint32_t session_handle, vppcom_endpt_t * server_ep)
1408 {
1409   vcl_worker_t *wrk = vcl_worker_get_current ();
1410   vcl_session_t *session = 0;
1411   u32 session_index;
1412   int rv;
1413
1414   session = vcl_session_get_w_handle (wrk, session_handle);
1415   if (!session)
1416     return VPPCOM_EBADFD;
1417   session_index = session->session_index;
1418
1419   if (PREDICT_FALSE (session->is_vep))
1420     {
1421       clib_warning ("VCL<%d>: ERROR: sid %u: cannot "
1422                     "connect on an epoll session!", getpid (),
1423                     session_handle);
1424       return VPPCOM_EBADFD;
1425     }
1426
1427   if (PREDICT_FALSE (session->session_state & CLIENT_STATE_OPEN))
1428     {
1429       VDBG (0, "session handle %u [0x%llx]: session already "
1430             "connected to %s %U port %d proto %s, state 0x%x (%s)",
1431             session_handle, session->vpp_handle,
1432             session->transport.is_ip4 ? "IPv4" : "IPv6",
1433             format_ip46_address,
1434             &session->transport.rmt_ip, session->transport.is_ip4 ?
1435             IP46_TYPE_IP4 : IP46_TYPE_IP6,
1436             clib_net_to_host_u16 (session->transport.rmt_port),
1437             vppcom_proto_str (session->session_type), session->session_state,
1438             vppcom_session_state_str (session->session_state));
1439       return VPPCOM_OK;
1440     }
1441
1442   session->transport.is_ip4 = server_ep->is_ip4;
1443   if (session->transport.is_ip4)
1444     clib_memcpy_fast (&session->transport.rmt_ip.ip4, server_ep->ip,
1445                       sizeof (ip4_address_t));
1446   else
1447     clib_memcpy_fast (&session->transport.rmt_ip.ip6, server_ep->ip,
1448                       sizeof (ip6_address_t));
1449   session->transport.rmt_port = server_ep->port;
1450
1451   VDBG (0, "session handle %u [0x%llx]: connecting to server %s %U "
1452         "port %d proto %s", session_handle, session->vpp_handle,
1453         session->transport.is_ip4 ? "IPv4" : "IPv6",
1454         format_ip46_address,
1455         &session->transport.rmt_ip, session->transport.is_ip4 ?
1456         IP46_TYPE_IP4 : IP46_TYPE_IP6,
1457         clib_net_to_host_u16 (session->transport.rmt_port),
1458         vppcom_proto_str (session->session_type));
1459
1460   /*
1461    * Send connect request and wait for reply from vpp
1462    */
1463   vppcom_send_connect_sock (session);
1464   rv = vppcom_wait_for_session_state_change (session_index, STATE_CONNECT,
1465                                              vcm->cfg.session_timeout);
1466
1467   session = vcl_session_get (wrk, session_index);
1468
1469   if (PREDICT_FALSE (rv))
1470     {
1471       if (VPPCOM_DEBUG > 0)
1472         {
1473           if (session)
1474             clib_warning ("VCL<%d>: vpp handle 0x%llx, sid %u: connect "
1475                           "failed! returning %d (%s)", getpid (),
1476                           session->vpp_handle, session_handle, rv,
1477                           vppcom_retval_str (rv));
1478           else
1479             clib_warning ("VCL<%d>: no session for sid %u: connect failed! "
1480                           "returning %d (%s)", getpid (),
1481                           session_handle, rv, vppcom_retval_str (rv));
1482         }
1483     }
1484   else
1485     VDBG (0, "VCL<%d>: vpp handle 0x%llx, sid %u: connected!",
1486           getpid (), session->vpp_handle, session_handle);
1487
1488   return rv;
1489 }
1490
1491 static u8
1492 vcl_is_rx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct)
1493 {
1494   return (e->event_type == SESSION_IO_EVT_RX && e->session_index == sid);
1495 }
1496
1497 static inline int
1498 vppcom_session_read_internal (uint32_t session_handle, void *buf, int n,
1499                               u8 peek)
1500 {
1501   vcl_worker_t *wrk = vcl_worker_get_current ();
1502   int n_read = 0, is_nonblocking;
1503   vcl_session_t *s = 0;
1504   svm_fifo_t *rx_fifo;
1505   svm_msg_q_msg_t msg;
1506   session_event_t *e;
1507   svm_msg_q_t *mq;
1508   u8 is_ct;
1509
1510   if (PREDICT_FALSE (!buf))
1511     return VPPCOM_EINVAL;
1512
1513   s = vcl_session_get_w_handle (wrk, session_handle);
1514   if (PREDICT_FALSE (!s || s->is_vep))
1515     return VPPCOM_EBADFD;
1516
1517   if (PREDICT_FALSE (!vcl_session_is_open (s)))
1518     {
1519       VDBG (0, "session handle %u[0x%llx] is not open! state 0x%x (%s)",
1520             s->session_index, s->vpp_handle, s->session_state,
1521             vppcom_session_state_str (s->session_state));
1522       return vcl_session_closed_error (s);
1523     }
1524
1525   is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
1526   is_ct = vcl_session_is_ct (s);
1527   mq = wrk->app_event_queue;
1528   rx_fifo = is_ct ? s->ct_rx_fifo : s->rx_fifo;
1529   s->has_rx_evt = 0;
1530
1531   if (svm_fifo_is_empty (rx_fifo))
1532     {
1533       if (is_nonblocking)
1534         {
1535           svm_fifo_unset_event (s->rx_fifo);
1536           return VPPCOM_EWOULDBLOCK;
1537         }
1538       while (svm_fifo_is_empty (rx_fifo))
1539         {
1540           if (vcl_session_is_closing (s))
1541             return vcl_session_closing_error (s);
1542
1543           svm_fifo_unset_event (s->rx_fifo);
1544           svm_msg_q_lock (mq);
1545           if (svm_msg_q_is_empty (mq))
1546             svm_msg_q_wait (mq);
1547
1548           svm_msg_q_sub_w_lock (mq, &msg);
1549           e = svm_msg_q_msg_data (mq, &msg);
1550           svm_msg_q_unlock (mq);
1551           if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct))
1552             vcl_handle_mq_event (wrk, e);
1553           svm_msg_q_free_msg (mq, &msg);
1554         }
1555     }
1556
1557   if (s->is_dgram)
1558     n_read = app_recv_dgram_raw (rx_fifo, buf, n, &s->transport, 0, peek);
1559   else
1560     n_read = app_recv_stream_raw (rx_fifo, buf, n, 0, peek);
1561
1562   if (svm_fifo_is_empty (rx_fifo))
1563     svm_fifo_unset_event (s->rx_fifo);
1564
1565   VDBG (2, "vpp handle 0x%llx, sid %u: read %d bytes from (%p)",
1566         s->vpp_handle, session_handle, n_read, rx_fifo);
1567
1568   return n_read;
1569 }
1570
1571 int
1572 vppcom_session_read (uint32_t session_handle, void *buf, size_t n)
1573 {
1574   return (vppcom_session_read_internal (session_handle, buf, n, 0));
1575 }
1576
1577 static int
1578 vppcom_session_peek (uint32_t session_handle, void *buf, int n)
1579 {
1580   return (vppcom_session_read_internal (session_handle, buf, n, 1));
1581 }
1582
1583 int
1584 vppcom_session_read_segments (uint32_t session_handle,
1585                               vppcom_data_segments_t ds)
1586 {
1587   vcl_worker_t *wrk = vcl_worker_get_current ();
1588   int n_read = 0, is_nonblocking;
1589   vcl_session_t *s = 0;
1590   svm_fifo_t *rx_fifo;
1591   svm_msg_q_msg_t msg;
1592   session_event_t *e;
1593   svm_msg_q_t *mq;
1594   u8 is_ct;
1595
1596   s = vcl_session_get_w_handle (wrk, session_handle);
1597   if (PREDICT_FALSE (!s || s->is_vep))
1598     return VPPCOM_EBADFD;
1599
1600   if (PREDICT_FALSE (!vcl_session_is_open (s)))
1601     return vcl_session_closed_error (s);
1602
1603   is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
1604   is_ct = vcl_session_is_ct (s);
1605   mq = is_ct ? s->our_evt_q : wrk->app_event_queue;
1606   rx_fifo = s->rx_fifo;
1607   s->has_rx_evt = 0;
1608
1609   if (is_ct)
1610     svm_fifo_unset_event (s->rx_fifo);
1611
1612   if (svm_fifo_is_empty (rx_fifo))
1613     {
1614       if (is_nonblocking)
1615         {
1616           svm_fifo_unset_event (rx_fifo);
1617           return VPPCOM_EWOULDBLOCK;
1618         }
1619       while (svm_fifo_is_empty (rx_fifo))
1620         {
1621           if (vcl_session_is_closing (s))
1622             return vcl_session_closing_error (s);
1623
1624           svm_fifo_unset_event (rx_fifo);
1625           svm_msg_q_lock (mq);
1626           if (svm_msg_q_is_empty (mq))
1627             svm_msg_q_wait (mq);
1628
1629           svm_msg_q_sub_w_lock (mq, &msg);
1630           e = svm_msg_q_msg_data (mq, &msg);
1631           svm_msg_q_unlock (mq);
1632           if (!vcl_is_rx_evt_for_session (e, s->session_index, is_ct))
1633             vcl_handle_mq_event (wrk, e);
1634           svm_msg_q_free_msg (mq, &msg);
1635         }
1636     }
1637
1638   n_read = svm_fifo_segments (rx_fifo, (svm_fifo_segment_t *) ds);
1639   svm_fifo_unset_event (rx_fifo);
1640
1641   return n_read;
1642 }
1643
1644 void
1645 vppcom_session_free_segments (uint32_t session_handle,
1646                               vppcom_data_segments_t ds)
1647 {
1648   vcl_worker_t *wrk = vcl_worker_get_current ();
1649   vcl_session_t *s;
1650
1651   s = vcl_session_get_w_handle (wrk, session_handle);
1652   if (PREDICT_FALSE (!s || s->is_vep))
1653     return;
1654
1655   svm_fifo_segments_free (s->rx_fifo, (svm_fifo_segment_t *) ds);
1656 }
1657
1658 int
1659 vppcom_data_segment_copy (void *buf, vppcom_data_segments_t ds, u32 max_bytes)
1660 {
1661   u32 first_copy = clib_min (ds[0].len, max_bytes);
1662   clib_memcpy_fast (buf, ds[0].data, first_copy);
1663   if (first_copy < max_bytes)
1664     {
1665       clib_memcpy_fast (buf + first_copy, ds[1].data,
1666                         clib_min (ds[1].len, max_bytes - first_copy));
1667     }
1668   return 0;
1669 }
1670
1671 static u8
1672 vcl_is_tx_evt_for_session (session_event_t * e, u32 sid, u8 is_ct)
1673 {
1674   return (e->event_type == SESSION_IO_EVT_TX && e->session_index == sid);
1675 }
1676
1677 static inline int
1678 vppcom_session_write_inline (uint32_t session_handle, void *buf, size_t n,
1679                              u8 is_flush)
1680 {
1681   vcl_worker_t *wrk = vcl_worker_get_current ();
1682   int n_write, is_nonblocking;
1683   vcl_session_t *s = 0;
1684   session_evt_type_t et;
1685   svm_msg_q_msg_t msg;
1686   svm_fifo_t *tx_fifo;
1687   session_event_t *e;
1688   svm_msg_q_t *mq;
1689   u8 is_ct;
1690
1691   if (PREDICT_FALSE (!buf))
1692     return VPPCOM_EINVAL;
1693
1694   s = vcl_session_get_w_handle (wrk, session_handle);
1695   if (PREDICT_FALSE (!s))
1696     return VPPCOM_EBADFD;
1697
1698   if (PREDICT_FALSE (s->is_vep))
1699     {
1700       VDBG (0, "ERROR: session %u [0x%llx]: cannot write to an epoll"
1701             " session!", s->session_index, s->vpp_handle);
1702       return VPPCOM_EBADFD;
1703     }
1704
1705   if (PREDICT_FALSE (!vcl_session_is_open (s)))
1706     {
1707       VDBG (1, "session %u [0x%llx]: is not open! state 0x%x (%s)",
1708             s->session_index, s->vpp_handle, s->session_state,
1709             vppcom_session_state_str (s->session_state));
1710       return vcl_session_closed_error (s);;
1711     }
1712
1713   is_ct = vcl_session_is_ct (s);
1714   tx_fifo = is_ct ? s->ct_tx_fifo : s->tx_fifo;
1715   is_nonblocking = VCL_SESS_ATTR_TEST (s->attr, VCL_SESS_ATTR_NONBLOCK);
1716   mq = wrk->app_event_queue;
1717   if (svm_fifo_is_full (tx_fifo))
1718     {
1719       if (is_nonblocking)
1720         {
1721           return VPPCOM_EWOULDBLOCK;
1722         }
1723       while (svm_fifo_is_full (tx_fifo))
1724         {
1725           svm_fifo_add_want_tx_ntf (tx_fifo, SVM_FIFO_WANT_TX_NOTIF);
1726           if (vcl_session_is_closing (s))
1727             return vcl_session_closing_error (s);
1728           svm_msg_q_lock (mq);
1729           if (svm_msg_q_is_empty (mq))
1730             svm_msg_q_wait (mq);
1731
1732           svm_msg_q_sub_w_lock (mq, &msg);
1733           e = svm_msg_q_msg_data (mq, &msg);
1734           svm_msg_q_unlock (mq);
1735
1736           if (!vcl_is_tx_evt_for_session (e, s->session_index, is_ct))
1737             vcl_handle_mq_event (wrk, e);
1738           svm_msg_q_free_msg (mq, &msg);
1739         }
1740     }
1741
1742   et = SESSION_IO_EVT_TX;
1743   if (is_flush && !is_ct)
1744     et = SESSION_IO_EVT_TX_FLUSH;
1745
1746   if (s->is_dgram)
1747     n_write = app_send_dgram_raw (tx_fifo, &s->transport,
1748                                   s->vpp_evt_q, buf, n, et,
1749                                   0 /* do_evt */ , SVM_Q_WAIT);
1750   else
1751     n_write = app_send_stream_raw (tx_fifo, s->vpp_evt_q, buf, n, et,
1752                                    0 /* do_evt */ , SVM_Q_WAIT);
1753
1754   if (svm_fifo_set_event (s->tx_fifo))
1755     app_send_io_evt_to_vpp (s->vpp_evt_q, s->tx_fifo->master_session_index,
1756                             et, SVM_Q_WAIT);
1757
1758   ASSERT (n_write > 0);
1759
1760   VDBG (2, "session %u [0x%llx]: wrote %d bytes", s->session_index,
1761         s->vpp_handle, n_write);
1762
1763   return n_write;
1764 }
1765
1766 int
1767 vppcom_session_write (uint32_t session_handle, void *buf, size_t n)
1768 {
1769   return vppcom_session_write_inline (session_handle, buf, n,
1770                                       0 /* is_flush */ );
1771 }
1772
1773 int
1774 vppcom_session_write_msg (uint32_t session_handle, void *buf, size_t n)
1775 {
1776   return vppcom_session_write_inline (session_handle, buf, n,
1777                                       1 /* is_flush */ );
1778 }
1779
1780 #define vcl_fifo_rx_evt_valid_or_break(_s)                              \
1781 if (PREDICT_FALSE (svm_fifo_is_empty (_s->rx_fifo)))                    \
1782   {                                                                     \
1783     if (!vcl_session_is_ct (_s))                                        \
1784       {                                                                 \
1785         svm_fifo_unset_event (_s->rx_fifo);                             \
1786         if (svm_fifo_is_empty (_s->rx_fifo))                            \
1787           break;                                                        \
1788       }                                                                 \
1789     else if (svm_fifo_is_empty (_s->ct_rx_fifo))                        \
1790       {                                                                 \
1791         svm_fifo_unset_event (_s->ct_rx_fifo);                          \
1792         if (svm_fifo_is_empty (_s->ct_rx_fifo))                         \
1793           break;                                                        \
1794       }                                                                 \
1795   }                                                                     \
1796
1797 static void
1798 vcl_select_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
1799                             unsigned long n_bits, unsigned long *read_map,
1800                             unsigned long *write_map,
1801                             unsigned long *except_map, u32 * bits_set)
1802 {
1803   session_disconnected_msg_t *disconnected_msg;
1804   session_connected_msg_t *connected_msg;
1805   vcl_session_t *session;
1806   u32 sid;
1807
1808   switch (e->event_type)
1809     {
1810     case SESSION_IO_EVT_RX:
1811       sid = e->session_index;
1812       session = vcl_session_get (wrk, sid);
1813       vcl_fifo_rx_evt_valid_or_break (session);
1814       if (!session)
1815         break;
1816       if (sid < n_bits && read_map)
1817         {
1818           clib_bitmap_set_no_check ((uword *) read_map, sid, 1);
1819           *bits_set += 1;
1820         }
1821       break;
1822     case SESSION_IO_EVT_TX:
1823       sid = e->session_index;
1824       session = vcl_session_get (wrk, sid);
1825       if (!session)
1826         break;
1827       if (sid < n_bits && write_map)
1828         {
1829           clib_bitmap_set_no_check ((uword *) write_map, sid, 1);
1830           *bits_set += 1;
1831         }
1832       break;
1833     case SESSION_CTRL_EVT_ACCEPTED:
1834       session = vcl_session_accepted (wrk,
1835                                       (session_accepted_msg_t *) e->data);
1836       if (!session)
1837         break;
1838       sid = session->session_index;
1839       if (sid < n_bits && read_map)
1840         {
1841           clib_bitmap_set_no_check ((uword *) read_map, sid, 1);
1842           *bits_set += 1;
1843         }
1844       break;
1845     case SESSION_CTRL_EVT_CONNECTED:
1846       connected_msg = (session_connected_msg_t *) e->data;
1847       vcl_session_connected_handler (wrk, connected_msg);
1848       break;
1849     case SESSION_CTRL_EVT_DISCONNECTED:
1850       disconnected_msg = (session_disconnected_msg_t *) e->data;
1851       session = vcl_session_disconnected_handler (wrk, disconnected_msg);
1852       if (!session)
1853         break;
1854       sid = session->session_index;
1855       if (sid < n_bits && except_map)
1856         {
1857           clib_bitmap_set_no_check ((uword *) except_map, sid, 1);
1858           *bits_set += 1;
1859         }
1860       break;
1861     case SESSION_CTRL_EVT_RESET:
1862       sid = vcl_session_reset_handler (wrk, (session_reset_msg_t *) e->data);
1863       if (sid < n_bits && except_map)
1864         {
1865           clib_bitmap_set_no_check ((uword *) except_map, sid, 1);
1866           *bits_set += 1;
1867         }
1868       break;
1869     case SESSION_CTRL_EVT_UNLISTEN_REPLY:
1870       vcl_session_unlisten_reply_handler (wrk, e->data);
1871       break;
1872     case SESSION_CTRL_EVT_WORKER_UPDATE_REPLY:
1873       vcl_session_worker_update_reply_handler (wrk, e->data);
1874       break;
1875     case SESSION_CTRL_EVT_REQ_WORKER_UPDATE:
1876       vcl_session_req_worker_update_handler (wrk, e->data);
1877       break;
1878     default:
1879       clib_warning ("unhandled: %u", e->event_type);
1880       break;
1881     }
1882 }
1883
1884 static int
1885 vcl_select_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
1886                       unsigned long n_bits, unsigned long *read_map,
1887                       unsigned long *write_map, unsigned long *except_map,
1888                       double time_to_wait, u32 * bits_set)
1889 {
1890   svm_msg_q_msg_t *msg;
1891   session_event_t *e;
1892   u32 i;
1893
1894   svm_msg_q_lock (mq);
1895   if (svm_msg_q_is_empty (mq))
1896     {
1897       if (*bits_set)
1898         {
1899           svm_msg_q_unlock (mq);
1900           return 0;
1901         }
1902
1903       if (!time_to_wait)
1904         {
1905           svm_msg_q_unlock (mq);
1906           return 0;
1907         }
1908       else if (time_to_wait < 0)
1909         {
1910           svm_msg_q_wait (mq);
1911         }
1912       else
1913         {
1914           if (svm_msg_q_timedwait (mq, time_to_wait))
1915             {
1916               svm_msg_q_unlock (mq);
1917               return 0;
1918             }
1919         }
1920     }
1921   vcl_mq_dequeue_batch (wrk, mq);
1922   svm_msg_q_unlock (mq);
1923
1924   for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
1925     {
1926       msg = vec_elt_at_index (wrk->mq_msg_vector, i);
1927       e = svm_msg_q_msg_data (mq, msg);
1928       vcl_select_handle_mq_event (wrk, e, n_bits, read_map, write_map,
1929                                   except_map, bits_set);
1930       svm_msg_q_free_msg (mq, msg);
1931     }
1932   vec_reset_length (wrk->mq_msg_vector);
1933   vcl_handle_pending_wrk_updates (wrk);
1934   return *bits_set;
1935 }
1936
1937 static int
1938 vppcom_select_condvar (vcl_worker_t * wrk, int n_bits,
1939                        vcl_si_set * read_map, vcl_si_set * write_map,
1940                        vcl_si_set * except_map, double time_to_wait,
1941                        u32 * bits_set)
1942 {
1943   double wait = 0, start = 0;
1944
1945   if (!*bits_set)
1946     {
1947       wait = time_to_wait;
1948       start = clib_time_now (&wrk->clib_time);
1949     }
1950
1951   do
1952     {
1953       vcl_select_handle_mq (wrk, wrk->app_event_queue, n_bits, read_map,
1954                             write_map, except_map, wait, bits_set);
1955       if (*bits_set)
1956         return *bits_set;
1957       if (wait == -1)
1958         continue;
1959
1960       wait = wait - (clib_time_now (&wrk->clib_time) - start);
1961     }
1962   while (wait > 0);
1963
1964   return 0;
1965 }
1966
1967 static int
1968 vppcom_select_eventfd (vcl_worker_t * wrk, int n_bits,
1969                        vcl_si_set * read_map, vcl_si_set * write_map,
1970                        vcl_si_set * except_map, double time_to_wait,
1971                        u32 * bits_set)
1972 {
1973   vcl_mq_evt_conn_t *mqc;
1974   int __clib_unused n_read;
1975   int n_mq_evts, i;
1976   u64 buf;
1977
1978   vec_validate (wrk->mq_events, pool_elts (wrk->mq_evt_conns));
1979   n_mq_evts = epoll_wait (wrk->mqs_epfd, wrk->mq_events,
1980                           vec_len (wrk->mq_events), time_to_wait);
1981   for (i = 0; i < n_mq_evts; i++)
1982     {
1983       mqc = vcl_mq_evt_conn_get (wrk, wrk->mq_events[i].data.u32);
1984       n_read = read (mqc->mq_fd, &buf, sizeof (buf));
1985       vcl_select_handle_mq (wrk, mqc->mq, n_bits, read_map, write_map,
1986                             except_map, 0, bits_set);
1987     }
1988
1989   return (n_mq_evts > 0 ? (int) *bits_set : 0);
1990 }
1991
1992 int
1993 vppcom_select (int n_bits, vcl_si_set * read_map, vcl_si_set * write_map,
1994                vcl_si_set * except_map, double time_to_wait)
1995 {
1996   u32 sid, minbits = clib_max (n_bits, BITS (uword)), bits_set = 0;
1997   vcl_worker_t *wrk = vcl_worker_get_current ();
1998   vcl_session_t *session = 0;
1999   int rv, i;
2000
2001   if (n_bits && read_map)
2002     {
2003       clib_bitmap_validate (wrk->rd_bitmap, minbits);
2004       clib_memcpy_fast (wrk->rd_bitmap, read_map,
2005                         vec_len (wrk->rd_bitmap) * sizeof (vcl_si_set));
2006       memset (read_map, 0, vec_len (wrk->rd_bitmap) * sizeof (vcl_si_set));
2007     }
2008   if (n_bits && write_map)
2009     {
2010       clib_bitmap_validate (wrk->wr_bitmap, minbits);
2011       clib_memcpy_fast (wrk->wr_bitmap, write_map,
2012                         vec_len (wrk->wr_bitmap) * sizeof (vcl_si_set));
2013       memset (write_map, 0, vec_len (wrk->wr_bitmap) * sizeof (vcl_si_set));
2014     }
2015   if (n_bits && except_map)
2016     {
2017       clib_bitmap_validate (wrk->ex_bitmap, minbits);
2018       clib_memcpy_fast (wrk->ex_bitmap, except_map,
2019                         vec_len (wrk->ex_bitmap) * sizeof (vcl_si_set));
2020       memset (except_map, 0, vec_len (wrk->ex_bitmap) * sizeof (vcl_si_set));
2021     }
2022
2023   if (!n_bits)
2024     return 0;
2025
2026   if (!write_map)
2027     goto check_rd;
2028
2029   /* *INDENT-OFF* */
2030   clib_bitmap_foreach (sid, wrk->wr_bitmap, ({
2031     if (!(session = vcl_session_get (wrk, sid)))
2032       {
2033         if (except_map && sid < minbits)
2034           clib_bitmap_set_no_check (except_map, sid, 1);
2035         continue;
2036       }
2037
2038     rv = svm_fifo_is_full (session->tx_fifo);
2039     if (!rv)
2040       {
2041         clib_bitmap_set_no_check ((uword*)write_map, sid, 1);
2042         bits_set++;
2043       }
2044     else
2045       svm_fifo_add_want_tx_ntf (session->tx_fifo, SVM_FIFO_WANT_TX_NOTIF);
2046   }));
2047
2048 check_rd:
2049   if (!read_map)
2050     goto check_mq;
2051
2052   clib_bitmap_foreach (sid, wrk->rd_bitmap, ({
2053     if (!(session = vcl_session_get (wrk, sid)))
2054       {
2055         if (except_map && sid < minbits)
2056           clib_bitmap_set_no_check (except_map, sid, 1);
2057         continue;
2058       }
2059
2060     rv = vcl_session_read_ready (session);
2061     if (rv)
2062       {
2063         clib_bitmap_set_no_check ((uword*)read_map, sid, 1);
2064         bits_set++;
2065       }
2066   }));
2067   /* *INDENT-ON* */
2068
2069 check_mq:
2070
2071   for (i = 0; i < vec_len (wrk->unhandled_evts_vector); i++)
2072     {
2073       vcl_select_handle_mq_event (wrk, &wrk->unhandled_evts_vector[i], n_bits,
2074                                   read_map, write_map, except_map, &bits_set);
2075     }
2076   vec_reset_length (wrk->unhandled_evts_vector);
2077
2078   if (vcm->cfg.use_mq_eventfd)
2079     vppcom_select_eventfd (wrk, n_bits, read_map, write_map, except_map,
2080                            time_to_wait, &bits_set);
2081   else
2082     vppcom_select_condvar (wrk, n_bits, read_map, write_map, except_map,
2083                            time_to_wait, &bits_set);
2084
2085   return (bits_set);
2086 }
2087
2088 static inline void
2089 vep_verify_epoll_chain (vcl_worker_t * wrk, u32 vep_idx)
2090 {
2091   vcl_session_t *session;
2092   vppcom_epoll_t *vep;
2093   u32 sid = vep_idx;
2094
2095   if (VPPCOM_DEBUG <= 2)
2096     return;
2097
2098   /* Assumes caller has acquired spinlock: vcm->sessions_lockp */
2099   session = vcl_session_get (wrk, vep_idx);
2100   if (PREDICT_FALSE (!session))
2101     {
2102       clib_warning ("VCL<%d>: ERROR: Invalid vep_idx (%u)!",
2103                     getpid (), vep_idx);
2104       goto done;
2105     }
2106   if (PREDICT_FALSE (!session->is_vep))
2107     {
2108       clib_warning ("VCL<%d>: ERROR: vep_idx (%u) is not a vep!",
2109                     getpid (), vep_idx);
2110       goto done;
2111     }
2112   vep = &session->vep;
2113   clib_warning ("VCL<%d>: vep_idx (%u): Dumping epoll chain\n"
2114                 "{\n"
2115                 "   is_vep         = %u\n"
2116                 "   is_vep_session = %u\n"
2117                 "   next_sid       = 0x%x (%u)\n"
2118                 "}\n", getpid (), vep_idx,
2119                 session->is_vep, session->is_vep_session,
2120                 vep->next_sh, vep->next_sh);
2121
2122   for (sid = vep->next_sh; sid != ~0; sid = vep->next_sh)
2123     {
2124       session = vcl_session_get (wrk, sid);
2125       if (PREDICT_FALSE (!session))
2126         {
2127           clib_warning ("VCL<%d>: ERROR: Invalid sid (%u)!", getpid (), sid);
2128           goto done;
2129         }
2130       if (PREDICT_FALSE (session->is_vep))
2131         clib_warning ("VCL<%d>: ERROR: sid (%u) is a vep!",
2132                       getpid (), vep_idx);
2133       else if (PREDICT_FALSE (!session->is_vep_session))
2134         {
2135           clib_warning ("VCL<%d>: ERROR: session (%u) "
2136                         "is not a vep session!", getpid (), sid);
2137           goto done;
2138         }
2139       vep = &session->vep;
2140       if (PREDICT_FALSE (vep->vep_sh != vep_idx))
2141         clib_warning ("VCL<%d>: ERROR: session (%u) vep_idx (%u) != "
2142                       "vep_idx (%u)!", getpid (),
2143                       sid, session->vep.vep_sh, vep_idx);
2144       if (session->is_vep_session)
2145         {
2146           clib_warning ("vep_idx[%u]: sid 0x%x (%u)\n"
2147                         "{\n"
2148                         "   next_sid       = 0x%x (%u)\n"
2149                         "   prev_sid       = 0x%x (%u)\n"
2150                         "   vep_idx        = 0x%x (%u)\n"
2151                         "   ev.events      = 0x%x\n"
2152                         "   ev.data.u64    = 0x%llx\n"
2153                         "   et_mask        = 0x%x\n"
2154                         "}\n",
2155                         vep_idx, sid, sid,
2156                         vep->next_sh, vep->next_sh,
2157                         vep->prev_sh, vep->prev_sh,
2158                         vep->vep_sh, vep->vep_sh,
2159                         vep->ev.events, vep->ev.data.u64, vep->et_mask);
2160         }
2161     }
2162
2163 done:
2164   clib_warning ("VCL<%d>: vep_idx (%u): Dump complete!\n",
2165                 getpid (), vep_idx);
2166 }
2167
2168 int
2169 vppcom_epoll_create (void)
2170 {
2171   vcl_worker_t *wrk = vcl_worker_get_current ();
2172   vcl_session_t *vep_session;
2173
2174   vep_session = vcl_session_alloc (wrk);
2175
2176   vep_session->is_vep = 1;
2177   vep_session->vep.vep_sh = ~0;
2178   vep_session->vep.next_sh = ~0;
2179   vep_session->vep.prev_sh = ~0;
2180   vep_session->vpp_handle = ~0;
2181
2182   vcl_evt (VCL_EVT_EPOLL_CREATE, vep_session, vep_session->session_index);
2183   VDBG (0, "Created vep_idx %u", vep_session->session_index);
2184
2185   return vcl_session_handle (vep_session);
2186 }
2187
2188 int
2189 vppcom_epoll_ctl (uint32_t vep_handle, int op, uint32_t session_handle,
2190                   struct epoll_event *event)
2191 {
2192   vcl_worker_t *wrk = vcl_worker_get_current ();
2193   vcl_session_t *vep_session;
2194   vcl_session_t *session;
2195   int rv = VPPCOM_OK;
2196
2197   if (vep_handle == session_handle)
2198     {
2199       VDBG (0, "vep_sh == session handle (%u)!", vep_handle);
2200       return VPPCOM_EINVAL;
2201     }
2202
2203   vep_session = vcl_session_get_w_handle (wrk, vep_handle);
2204   if (PREDICT_FALSE (!vep_session))
2205     {
2206       VDBG (0, "Invalid vep_sh (%u)!", vep_handle);
2207       return VPPCOM_EBADFD;
2208     }
2209   if (PREDICT_FALSE (!vep_session->is_vep))
2210     {
2211       VDBG (0, "vep_sh (%u) is not a vep!", vep_handle);
2212       return VPPCOM_EINVAL;
2213     }
2214
2215   ASSERT (vep_session->vep.vep_sh == ~0);
2216   ASSERT (vep_session->vep.prev_sh == ~0);
2217
2218   session = vcl_session_get_w_handle (wrk, session_handle);
2219   if (PREDICT_FALSE (!session))
2220     {
2221       VDBG (0, "Invalid session_handle (%u)!", session_handle);
2222       return VPPCOM_EBADFD;
2223     }
2224   if (PREDICT_FALSE (session->is_vep))
2225     {
2226       VDBG (0, "session_handle (%u) is a vep!", vep_handle);
2227       return VPPCOM_EINVAL;
2228     }
2229
2230   switch (op)
2231     {
2232     case EPOLL_CTL_ADD:
2233       if (PREDICT_FALSE (!event))
2234         {
2235           VDBG (0, "EPOLL_CTL_ADD: NULL pointer to epoll_event structure!");
2236           return VPPCOM_EINVAL;
2237         }
2238       if (vep_session->vep.next_sh != ~0)
2239         {
2240           vcl_session_t *next_session;
2241           next_session = vcl_session_get_w_handle (wrk,
2242                                                    vep_session->vep.next_sh);
2243           if (PREDICT_FALSE (!next_session))
2244             {
2245               VDBG (0, "EPOLL_CTL_ADD: Invalid vep.next_sid (%u) on "
2246                     "vep_idx (%u)!", vep_session->vep.next_sh, vep_handle);
2247               return VPPCOM_EBADFD;
2248             }
2249           ASSERT (next_session->vep.prev_sh == vep_handle);
2250           next_session->vep.prev_sh = session_handle;
2251         }
2252       session->vep.next_sh = vep_session->vep.next_sh;
2253       session->vep.prev_sh = vep_handle;
2254       session->vep.vep_sh = vep_handle;
2255       session->vep.et_mask = VEP_DEFAULT_ET_MASK;
2256       session->vep.ev = *event;
2257       session->is_vep = 0;
2258       session->is_vep_session = 1;
2259       vep_session->vep.next_sh = session_handle;
2260
2261       if (session->tx_fifo)
2262         svm_fifo_add_want_tx_ntf (session->tx_fifo,
2263                                   SVM_FIFO_WANT_TX_NOTIF_IF_FULL);
2264
2265       VDBG (1, "EPOLL_CTL_ADD: vep_sh %u, sh %u, events 0x%x, data 0x%llx!",
2266             vep_handle, session_handle, event->events, event->data.u64);
2267       vcl_evt (VCL_EVT_EPOLL_CTLADD, session, event->events, event->data.u64);
2268       break;
2269
2270     case EPOLL_CTL_MOD:
2271       if (PREDICT_FALSE (!event))
2272         {
2273           VDBG (0, "EPOLL_CTL_MOD: NULL pointer to epoll_event structure!");
2274           rv = VPPCOM_EINVAL;
2275           goto done;
2276         }
2277       else if (PREDICT_FALSE (!session->is_vep_session))
2278         {
2279           VDBG (0, "sid %u EPOLL_CTL_MOD: not a vep session!",
2280                 session_handle);
2281           rv = VPPCOM_EINVAL;
2282           goto done;
2283         }
2284       else if (PREDICT_FALSE (session->vep.vep_sh != vep_handle))
2285         {
2286           VDBG (0, "EPOLL_CTL_MOD: sh %u vep_sh (%u) != vep_sh (%u)!",
2287                 session_handle, session->vep.vep_sh, vep_handle);
2288           rv = VPPCOM_EINVAL;
2289           goto done;
2290         }
2291       session->vep.et_mask = VEP_DEFAULT_ET_MASK;
2292       session->vep.ev = *event;
2293       VDBG (1, "EPOLL_CTL_MOD: vep_sh %u, sh %u, events 0x%x, data 0x%llx!",
2294             vep_handle, session_handle, event->events, event->data.u64);
2295       break;
2296
2297     case EPOLL_CTL_DEL:
2298       if (PREDICT_FALSE (!session->is_vep_session))
2299         {
2300           VDBG (0, "EPOLL_CTL_DEL: %u not a vep session!", session_handle);
2301           rv = VPPCOM_EINVAL;
2302           goto done;
2303         }
2304       else if (PREDICT_FALSE (session->vep.vep_sh != vep_handle))
2305         {
2306           VDBG (0, "EPOLL_CTL_DEL: sh %u vep_sh (%u) != vep_sh (%u)!",
2307                 session_handle, session->vep.vep_sh, vep_handle);
2308           rv = VPPCOM_EINVAL;
2309           goto done;
2310         }
2311
2312       if (session->vep.prev_sh == vep_handle)
2313         vep_session->vep.next_sh = session->vep.next_sh;
2314       else
2315         {
2316           vcl_session_t *prev_session;
2317           prev_session = vcl_session_get_w_handle (wrk, session->vep.prev_sh);
2318           if (PREDICT_FALSE (!prev_session))
2319             {
2320               VDBG (0, "EPOLL_CTL_DEL: Invalid prev_sid (%u) on sid (%u)!",
2321                     session->vep.prev_sh, session_handle);
2322               return VPPCOM_EBADFD;
2323             }
2324           ASSERT (prev_session->vep.next_sh == session_handle);
2325           prev_session->vep.next_sh = session->vep.next_sh;
2326         }
2327       if (session->vep.next_sh != ~0)
2328         {
2329           vcl_session_t *next_session;
2330           next_session = vcl_session_get_w_handle (wrk, session->vep.next_sh);
2331           if (PREDICT_FALSE (!next_session))
2332             {
2333               VDBG (0, "EPOLL_CTL_DEL: Invalid next_sid (%u) on sid (%u)!",
2334                     session->vep.next_sh, session_handle);
2335               return VPPCOM_EBADFD;
2336             }
2337           ASSERT (next_session->vep.prev_sh == session_handle);
2338           next_session->vep.prev_sh = session->vep.prev_sh;
2339         }
2340
2341       memset (&session->vep, 0, sizeof (session->vep));
2342       session->vep.next_sh = ~0;
2343       session->vep.prev_sh = ~0;
2344       session->vep.vep_sh = ~0;
2345       session->is_vep_session = 0;
2346
2347       if (session->tx_fifo)
2348         svm_fifo_del_want_tx_ntf (session->tx_fifo, SVM_FIFO_NO_TX_NOTIF);
2349
2350       VDBG (1, "EPOLL_CTL_DEL: vep_idx %u, sid %u!", vep_handle,
2351             session_handle);
2352       vcl_evt (VCL_EVT_EPOLL_CTLDEL, session, vep_sh);
2353       break;
2354
2355     default:
2356       VDBG (0, "Invalid operation (%d)!", op);
2357       rv = VPPCOM_EINVAL;
2358     }
2359
2360   vep_verify_epoll_chain (wrk, vep_handle);
2361
2362 done:
2363   return rv;
2364 }
2365
2366 static inline void
2367 vcl_epoll_wait_handle_mq_event (vcl_worker_t * wrk, session_event_t * e,
2368                                 struct epoll_event *events, u32 * num_ev)
2369 {
2370   session_disconnected_msg_t *disconnected_msg;
2371   session_connected_msg_t *connected_msg;
2372   u32 sid = ~0, session_events;
2373   u64 session_evt_data = ~0;
2374   vcl_session_t *session;
2375   u8 add_event = 0;
2376
2377   switch (e->event_type)
2378     {
2379     case SESSION_IO_EVT_RX:
2380       sid = e->session_index;
2381       if (!(session = vcl_session_get (wrk, sid)))
2382         break;
2383       vcl_fifo_rx_evt_valid_or_break (session);
2384       session_events = session->vep.ev.events;
2385       if (!(EPOLLIN & session->vep.ev.events) || session->has_rx_evt)
2386         break;
2387       add_event = 1;
2388       events[*num_ev].events |= EPOLLIN;
2389       session_evt_data = session->vep.ev.data.u64;
2390       session->has_rx_evt = 1;
2391       break;
2392     case SESSION_IO_EVT_TX:
2393       sid = e->session_index;
2394       if (!(session = vcl_session_get (wrk, sid)))
2395         break;
2396       session_events = session->vep.ev.events;
2397       if (!(EPOLLOUT & session_events))
2398         break;
2399       add_event = 1;
2400       events[*num_ev].events |= EPOLLOUT;
2401       session_evt_data = session->vep.ev.data.u64;
2402       svm_fifo_reset_tx_ntf (session->tx_fifo);
2403       break;
2404     case SESSION_CTRL_EVT_ACCEPTED:
2405       session = vcl_session_accepted (wrk,
2406                                       (session_accepted_msg_t *) e->data);
2407       if (!session)
2408         break;
2409
2410       session_events = session->vep.ev.events;
2411       if (!(EPOLLIN & session_events))
2412         break;
2413
2414       add_event = 1;
2415       events[*num_ev].events |= EPOLLIN;
2416       session_evt_data = session->vep.ev.data.u64;
2417       break;
2418     case SESSION_CTRL_EVT_CONNECTED:
2419       connected_msg = (session_connected_msg_t *) e->data;
2420       vcl_session_connected_handler (wrk, connected_msg);
2421       /* Generate EPOLLOUT because there's no connected event */
2422       sid = vcl_session_index_from_vpp_handle (wrk, connected_msg->handle);
2423       if (!(session = vcl_session_get (wrk, sid)))
2424         break;
2425       session_events = session->vep.ev.events;
2426       if (!(EPOLLOUT & session_events))
2427         break;
2428       add_event = 1;
2429       events[*num_ev].events |= EPOLLOUT;
2430       session_evt_data = session->vep.ev.data.u64;
2431       break;
2432     case SESSION_CTRL_EVT_DISCONNECTED:
2433       disconnected_msg = (session_disconnected_msg_t *) e->data;
2434       session = vcl_session_disconnected_handler (wrk, disconnected_msg);
2435       if (!session)
2436         break;
2437       session_events = session->vep.ev.events;
2438       if (!((EPOLLHUP | EPOLLRDHUP) & session_events))
2439         break;
2440       add_event = 1;
2441       events[*num_ev].events |= EPOLLHUP | EPOLLRDHUP;
2442       session_evt_data = session->vep.ev.data.u64;
2443       break;
2444     case SESSION_CTRL_EVT_RESET:
2445       sid = vcl_session_reset_handler (wrk, (session_reset_msg_t *) e->data);
2446       if (!(session = vcl_session_get (wrk, sid)))
2447         break;
2448       session_events = session->vep.ev.events;
2449       if (!((EPOLLHUP | EPOLLRDHUP) & session_events))
2450         break;
2451       add_event = 1;
2452       events[*num_ev].events |= EPOLLHUP | EPOLLRDHUP;
2453       session_evt_data = session->vep.ev.data.u64;
2454       break;
2455     case SESSION_CTRL_EVT_UNLISTEN_REPLY:
2456       vcl_session_unlisten_reply_handler (wrk, e->data);
2457       break;
2458     case SESSION_CTRL_EVT_REQ_WORKER_UPDATE:
2459       vcl_session_req_worker_update_handler (wrk, e->data);
2460       break;
2461     case SESSION_CTRL_EVT_WORKER_UPDATE_REPLY:
2462       vcl_session_worker_update_reply_handler (wrk, e->data);
2463       break;
2464     default:
2465       VDBG (0, "unhandled: %u", e->event_type);
2466       break;
2467     }
2468
2469   if (add_event)
2470     {
2471       events[*num_ev].data.u64 = session_evt_data;
2472       if (EPOLLONESHOT & session_events)
2473         {
2474           session = vcl_session_get (wrk, sid);
2475           session->vep.ev.events = 0;
2476         }
2477       *num_ev += 1;
2478     }
2479 }
2480
2481 static int
2482 vcl_epoll_wait_handle_mq (vcl_worker_t * wrk, svm_msg_q_t * mq,
2483                           struct epoll_event *events, u32 maxevents,
2484                           double wait_for_time, u32 * num_ev)
2485 {
2486   svm_msg_q_msg_t *msg;
2487   session_event_t *e;
2488   int i;
2489
2490   if (vec_len (wrk->mq_msg_vector) && svm_msg_q_is_empty (mq))
2491     goto handle_dequeued;
2492
2493   svm_msg_q_lock (mq);
2494   if (svm_msg_q_is_empty (mq))
2495     {
2496       if (!wait_for_time)
2497         {
2498           svm_msg_q_unlock (mq);
2499           return 0;
2500         }
2501       else if (wait_for_time < 0)
2502         {
2503           svm_msg_q_wait (mq);
2504         }
2505       else
2506         {
2507           if (svm_msg_q_timedwait (mq, wait_for_time / 1e3))
2508             {
2509               svm_msg_q_unlock (mq);
2510               return 0;
2511             }
2512         }
2513     }
2514   vcl_mq_dequeue_batch (wrk, mq);
2515   svm_msg_q_unlock (mq);
2516
2517 handle_dequeued:
2518   for (i = 0; i < vec_len (wrk->mq_msg_vector); i++)
2519     {
2520       msg = vec_elt_at_index (wrk->mq_msg_vector, i);
2521       e = svm_msg_q_msg_data (mq, msg);
2522       if (*num_ev < maxevents)
2523         vcl_epoll_wait_handle_mq_event (wrk, e, events, num_ev);
2524       else
2525         vec_add1 (wrk->unhandled_evts_vector, *e);
2526       svm_msg_q_free_msg (mq, msg);
2527     }
2528   vec_reset_length (wrk->mq_msg_vector);
2529   vcl_handle_pending_wrk_updates (wrk);
2530   return *num_ev;
2531 }
2532
2533 static int
2534 vppcom_epoll_wait_condvar (vcl_worker_t * wrk, struct epoll_event *events,
2535                            int maxevents, u32 n_evts, double wait_for_time)
2536 {
2537   double wait = 0, start = 0;
2538
2539   if (!n_evts)
2540     {
2541       wait = wait_for_time;
2542       start = clib_time_now (&wrk->clib_time);
2543     }
2544
2545   do
2546     {
2547       vcl_epoll_wait_handle_mq (wrk, wrk->app_event_queue, events, maxevents,
2548                                 wait, &n_evts);
2549       if (n_evts)
2550         return n_evts;
2551       if (wait == -1)
2552         continue;
2553
2554       wait = wait - (clib_time_now (&wrk->clib_time) - start);
2555     }
2556   while (wait > 0);
2557
2558   return 0;
2559 }
2560
2561 static int
2562 vppcom_epoll_wait_eventfd (vcl_worker_t * wrk, struct epoll_event *events,
2563                            int maxevents, u32 n_evts, double wait_for_time)
2564 {
2565   vcl_mq_evt_conn_t *mqc;
2566   int __clib_unused n_read;
2567   int n_mq_evts, i;
2568   u64 buf;
2569
2570   vec_validate (wrk->mq_events, pool_elts (wrk->mq_evt_conns));
2571 again:
2572   n_mq_evts = epoll_wait (wrk->mqs_epfd, wrk->mq_events,
2573                           vec_len (wrk->mq_events), wait_for_time);
2574   for (i = 0; i < n_mq_evts; i++)
2575     {
2576       mqc = vcl_mq_evt_conn_get (wrk, wrk->mq_events[i].data.u32);
2577       n_read = read (mqc->mq_fd, &buf, sizeof (buf));
2578       vcl_epoll_wait_handle_mq (wrk, mqc->mq, events, maxevents, 0, &n_evts);
2579     }
2580   if (!n_evts && n_mq_evts > 0)
2581     goto again;
2582
2583   return (int) n_evts;
2584 }
2585
2586 int
2587 vppcom_epoll_wait (uint32_t vep_handle, struct epoll_event *events,
2588                    int maxevents, double wait_for_time)
2589 {
2590   vcl_worker_t *wrk = vcl_worker_get_current ();
2591   vcl_session_t *vep_session;
2592   u32 n_evts = 0;
2593   int i;
2594
2595   if (PREDICT_FALSE (maxevents <= 0))
2596     {
2597       clib_warning ("VCL<%d>: ERROR: Invalid maxevents (%d)!",
2598                     getpid (), maxevents);
2599       return VPPCOM_EINVAL;
2600     }
2601
2602   vep_session = vcl_session_get_w_handle (wrk, vep_handle);
2603   if (!vep_session)
2604     return VPPCOM_EBADFD;
2605
2606   if (PREDICT_FALSE (!vep_session->is_vep))
2607     {
2608       clib_warning ("VCL<%d>: ERROR: vep_idx (%u) is not a vep!",
2609                     getpid (), vep_handle);
2610       return VPPCOM_EINVAL;
2611     }
2612
2613   memset (events, 0, sizeof (*events) * maxevents);
2614
2615   if (vec_len (wrk->unhandled_evts_vector))
2616     {
2617       for (i = 0; i < vec_len (wrk->unhandled_evts_vector); i++)
2618         {
2619           vcl_epoll_wait_handle_mq_event (wrk, &wrk->unhandled_evts_vector[i],
2620                                           events, &n_evts);
2621           if (n_evts == maxevents)
2622             {
2623               i += 1;
2624               break;
2625             }
2626         }
2627       vec_delete (wrk->unhandled_evts_vector, i, 0);
2628     }
2629
2630   if (vcm->cfg.use_mq_eventfd)
2631     return vppcom_epoll_wait_eventfd (wrk, events, maxevents, n_evts,
2632                                       wait_for_time);
2633
2634   return vppcom_epoll_wait_condvar (wrk, events, maxevents, n_evts,
2635                                     wait_for_time);
2636 }
2637
2638 int
2639 vppcom_session_attr (uint32_t session_handle, uint32_t op,
2640                      void *buffer, uint32_t * buflen)
2641 {
2642   vcl_worker_t *wrk = vcl_worker_get_current ();
2643   vcl_session_t *session;
2644   int rv = VPPCOM_OK;
2645   u32 *flags = buffer, tmp_flags = 0;
2646   vppcom_endpt_t *ep = buffer;
2647
2648   session = vcl_session_get_w_handle (wrk, session_handle);
2649   if (!session)
2650     return VPPCOM_EBADFD;
2651
2652   switch (op)
2653     {
2654     case VPPCOM_ATTR_GET_NREAD:
2655       rv = vcl_session_read_ready (session);
2656       VDBG (2, "VPPCOM_ATTR_GET_NREAD: sid %u, nread = %d", rv);
2657       break;
2658
2659     case VPPCOM_ATTR_GET_NWRITE:
2660       rv = vcl_session_write_ready (session);
2661       VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_NWRITE: sid %u, nwrite = %d",
2662             getpid (), session_handle, rv);
2663       break;
2664
2665     case VPPCOM_ATTR_GET_FLAGS:
2666       if (PREDICT_TRUE (buffer && buflen && (*buflen >= sizeof (*flags))))
2667         {
2668           *flags = O_RDWR | (VCL_SESS_ATTR_TEST (session->attr,
2669                                                  VCL_SESS_ATTR_NONBLOCK));
2670           *buflen = sizeof (*flags);
2671           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_FLAGS: sid %u, flags = 0x%08x, "
2672                 "is_nonblocking = %u", getpid (),
2673                 session_handle, *flags,
2674                 VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_NONBLOCK));
2675         }
2676       else
2677         rv = VPPCOM_EINVAL;
2678       break;
2679
2680     case VPPCOM_ATTR_SET_FLAGS:
2681       if (PREDICT_TRUE (buffer && buflen && (*buflen == sizeof (*flags))))
2682         {
2683           if (*flags & O_NONBLOCK)
2684             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_NONBLOCK);
2685           else
2686             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_NONBLOCK);
2687
2688           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_FLAGS: sid %u, flags = 0x%08x,"
2689                 " is_nonblocking = %u",
2690                 getpid (), session_handle, *flags,
2691                 VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_NONBLOCK));
2692         }
2693       else
2694         rv = VPPCOM_EINVAL;
2695       break;
2696
2697     case VPPCOM_ATTR_GET_PEER_ADDR:
2698       if (PREDICT_TRUE (buffer && buflen &&
2699                         (*buflen >= sizeof (*ep)) && ep->ip))
2700         {
2701           ep->is_ip4 = session->transport.is_ip4;
2702           ep->port = session->transport.rmt_port;
2703           if (session->transport.is_ip4)
2704             clib_memcpy_fast (ep->ip, &session->transport.rmt_ip.ip4,
2705                               sizeof (ip4_address_t));
2706           else
2707             clib_memcpy_fast (ep->ip, &session->transport.rmt_ip.ip6,
2708                               sizeof (ip6_address_t));
2709           *buflen = sizeof (*ep);
2710           VDBG (1, "VCL<%d>: VPPCOM_ATTR_GET_PEER_ADDR: sid %u, is_ip4 = %u, "
2711                 "addr = %U, port %u", getpid (),
2712                 session_handle, ep->is_ip4, format_ip46_address,
2713                 &session->transport.rmt_ip,
2714                 ep->is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
2715                 clib_net_to_host_u16 (ep->port));
2716         }
2717       else
2718         rv = VPPCOM_EINVAL;
2719       break;
2720
2721     case VPPCOM_ATTR_GET_LCL_ADDR:
2722       if (PREDICT_TRUE (buffer && buflen &&
2723                         (*buflen >= sizeof (*ep)) && ep->ip))
2724         {
2725           ep->is_ip4 = session->transport.is_ip4;
2726           ep->port = session->transport.lcl_port;
2727           if (session->transport.is_ip4)
2728             clib_memcpy_fast (ep->ip, &session->transport.lcl_ip.ip4,
2729                               sizeof (ip4_address_t));
2730           else
2731             clib_memcpy_fast (ep->ip, &session->transport.lcl_ip.ip6,
2732                               sizeof (ip6_address_t));
2733           *buflen = sizeof (*ep);
2734           VDBG (1, "VCL<%d>: VPPCOM_ATTR_GET_LCL_ADDR: sid %u, is_ip4 = %u,"
2735                 " addr = %U port %d", getpid (),
2736                 session_handle, ep->is_ip4, format_ip46_address,
2737                 &session->transport.lcl_ip,
2738                 ep->is_ip4 ? IP46_TYPE_IP4 : IP46_TYPE_IP6,
2739                 clib_net_to_host_u16 (ep->port));
2740         }
2741       else
2742         rv = VPPCOM_EINVAL;
2743       break;
2744
2745     case VPPCOM_ATTR_GET_LIBC_EPFD:
2746       rv = session->libc_epfd;
2747       VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_LIBC_EPFD: libc_epfd %d",
2748             getpid (), rv);
2749       break;
2750
2751     case VPPCOM_ATTR_SET_LIBC_EPFD:
2752       if (PREDICT_TRUE (buffer && buflen &&
2753                         (*buflen == sizeof (session->libc_epfd))))
2754         {
2755           session->libc_epfd = *(int *) buffer;
2756           *buflen = sizeof (session->libc_epfd);
2757
2758           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_LIBC_EPFD: libc_epfd %d, "
2759                 "buflen %d", getpid (), session->libc_epfd, *buflen);
2760         }
2761       else
2762         rv = VPPCOM_EINVAL;
2763       break;
2764
2765     case VPPCOM_ATTR_GET_PROTOCOL:
2766       if (buffer && buflen && (*buflen >= sizeof (int)))
2767         {
2768           *(int *) buffer = session->session_type;
2769           *buflen = sizeof (int);
2770
2771           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_PROTOCOL: %d (%s), buflen %d",
2772                 getpid (), *(int *) buffer, *(int *) buffer ? "UDP" : "TCP",
2773                 *buflen);
2774         }
2775       else
2776         rv = VPPCOM_EINVAL;
2777       break;
2778
2779     case VPPCOM_ATTR_GET_LISTEN:
2780       if (buffer && buflen && (*buflen >= sizeof (int)))
2781         {
2782           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
2783                                                 VCL_SESS_ATTR_LISTEN);
2784           *buflen = sizeof (int);
2785
2786           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_LISTEN: %d, buflen %d",
2787                 getpid (), *(int *) buffer, *buflen);
2788         }
2789       else
2790         rv = VPPCOM_EINVAL;
2791       break;
2792
2793     case VPPCOM_ATTR_GET_ERROR:
2794       if (buffer && buflen && (*buflen >= sizeof (int)))
2795         {
2796           *(int *) buffer = 0;
2797           *buflen = sizeof (int);
2798
2799           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_ERROR: %d, buflen %d, #VPP-TBD#",
2800                 getpid (), *(int *) buffer, *buflen);
2801         }
2802       else
2803         rv = VPPCOM_EINVAL;
2804       break;
2805
2806     case VPPCOM_ATTR_GET_TX_FIFO_LEN:
2807       if (buffer && buflen && (*buflen >= sizeof (u32)))
2808         {
2809
2810           /* VPP-TBD */
2811           *(size_t *) buffer = (session->sndbuf_size ? session->sndbuf_size :
2812                                 session->tx_fifo ? session->tx_fifo->nitems :
2813                                 vcm->cfg.tx_fifo_size);
2814           *buflen = sizeof (u32);
2815
2816           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_TX_FIFO_LEN: %u (0x%x), "
2817                 "buflen %d, #VPP-TBD#", getpid (),
2818                 *(size_t *) buffer, *(size_t *) buffer, *buflen);
2819         }
2820       else
2821         rv = VPPCOM_EINVAL;
2822       break;
2823
2824     case VPPCOM_ATTR_SET_TX_FIFO_LEN:
2825       if (buffer && buflen && (*buflen == sizeof (u32)))
2826         {
2827           /* VPP-TBD */
2828           session->sndbuf_size = *(u32 *) buffer;
2829           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_TX_FIFO_LEN: %u (0x%x), "
2830                 "buflen %d, #VPP-TBD#", getpid (),
2831                 session->sndbuf_size, session->sndbuf_size, *buflen);
2832         }
2833       else
2834         rv = VPPCOM_EINVAL;
2835       break;
2836
2837     case VPPCOM_ATTR_GET_RX_FIFO_LEN:
2838       if (buffer && buflen && (*buflen >= sizeof (u32)))
2839         {
2840
2841           /* VPP-TBD */
2842           *(size_t *) buffer = (session->rcvbuf_size ? session->rcvbuf_size :
2843                                 session->rx_fifo ? session->rx_fifo->nitems :
2844                                 vcm->cfg.rx_fifo_size);
2845           *buflen = sizeof (u32);
2846
2847           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_RX_FIFO_LEN: %u (0x%x), "
2848                 "buflen %d, #VPP-TBD#", getpid (),
2849                 *(size_t *) buffer, *(size_t *) buffer, *buflen);
2850         }
2851       else
2852         rv = VPPCOM_EINVAL;
2853       break;
2854
2855     case VPPCOM_ATTR_SET_RX_FIFO_LEN:
2856       if (buffer && buflen && (*buflen == sizeof (u32)))
2857         {
2858           /* VPP-TBD */
2859           session->rcvbuf_size = *(u32 *) buffer;
2860           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_RX_FIFO_LEN: %u (0x%x), "
2861                 "buflen %d, #VPP-TBD#", getpid (),
2862                 session->sndbuf_size, session->sndbuf_size, *buflen);
2863         }
2864       else
2865         rv = VPPCOM_EINVAL;
2866       break;
2867
2868     case VPPCOM_ATTR_GET_REUSEADDR:
2869       if (buffer && buflen && (*buflen >= sizeof (int)))
2870         {
2871           /* VPP-TBD */
2872           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
2873                                                 VCL_SESS_ATTR_REUSEADDR);
2874           *buflen = sizeof (int);
2875
2876           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_REUSEADDR: %d, "
2877                 "buflen %d, #VPP-TBD#", getpid (), *(int *) buffer, *buflen);
2878         }
2879       else
2880         rv = VPPCOM_EINVAL;
2881       break;
2882
2883     case VPPCOM_ATTR_SET_REUSEADDR:
2884       if (buffer && buflen && (*buflen == sizeof (int)) &&
2885           !VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_LISTEN))
2886         {
2887           /* VPP-TBD */
2888           if (*(int *) buffer)
2889             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_REUSEADDR);
2890           else
2891             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_REUSEADDR);
2892
2893           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_REUSEADDR: %d, buflen %d,"
2894                 " #VPP-TBD#", getpid (),
2895                 VCL_SESS_ATTR_TEST (session->attr,
2896                                     VCL_SESS_ATTR_REUSEADDR), *buflen);
2897         }
2898       else
2899         rv = VPPCOM_EINVAL;
2900       break;
2901
2902     case VPPCOM_ATTR_GET_REUSEPORT:
2903       if (buffer && buflen && (*buflen >= sizeof (int)))
2904         {
2905           /* VPP-TBD */
2906           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
2907                                                 VCL_SESS_ATTR_REUSEPORT);
2908           *buflen = sizeof (int);
2909
2910           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_REUSEPORT: %d, buflen %d,"
2911                 " #VPP-TBD#", getpid (), *(int *) buffer, *buflen);
2912         }
2913       else
2914         rv = VPPCOM_EINVAL;
2915       break;
2916
2917     case VPPCOM_ATTR_SET_REUSEPORT:
2918       if (buffer && buflen && (*buflen == sizeof (int)) &&
2919           !VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_LISTEN))
2920         {
2921           /* VPP-TBD */
2922           if (*(int *) buffer)
2923             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_REUSEPORT);
2924           else
2925             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_REUSEPORT);
2926
2927           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_REUSEPORT: %d, buflen %d,"
2928                 " #VPP-TBD#", getpid (),
2929                 VCL_SESS_ATTR_TEST (session->attr,
2930                                     VCL_SESS_ATTR_REUSEPORT), *buflen);
2931         }
2932       else
2933         rv = VPPCOM_EINVAL;
2934       break;
2935
2936     case VPPCOM_ATTR_GET_BROADCAST:
2937       if (buffer && buflen && (*buflen >= sizeof (int)))
2938         {
2939           /* VPP-TBD */
2940           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
2941                                                 VCL_SESS_ATTR_BROADCAST);
2942           *buflen = sizeof (int);
2943
2944           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_BROADCAST: %d, buflen %d,"
2945                 " #VPP-TBD#", getpid (), *(int *) buffer, *buflen);
2946         }
2947       else
2948         rv = VPPCOM_EINVAL;
2949       break;
2950
2951     case VPPCOM_ATTR_SET_BROADCAST:
2952       if (buffer && buflen && (*buflen == sizeof (int)))
2953         {
2954           /* VPP-TBD */
2955           if (*(int *) buffer)
2956             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_BROADCAST);
2957           else
2958             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_BROADCAST);
2959
2960           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_BROADCAST: %d, buflen %d, "
2961                 "#VPP-TBD#", getpid (),
2962                 VCL_SESS_ATTR_TEST (session->attr,
2963                                     VCL_SESS_ATTR_BROADCAST), *buflen);
2964         }
2965       else
2966         rv = VPPCOM_EINVAL;
2967       break;
2968
2969     case VPPCOM_ATTR_GET_V6ONLY:
2970       if (buffer && buflen && (*buflen >= sizeof (int)))
2971         {
2972           /* VPP-TBD */
2973           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
2974                                                 VCL_SESS_ATTR_V6ONLY);
2975           *buflen = sizeof (int);
2976
2977           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_V6ONLY: %d, buflen %d, "
2978                 "#VPP-TBD#", getpid (), *(int *) buffer, *buflen);
2979         }
2980       else
2981         rv = VPPCOM_EINVAL;
2982       break;
2983
2984     case VPPCOM_ATTR_SET_V6ONLY:
2985       if (buffer && buflen && (*buflen == sizeof (int)))
2986         {
2987           /* VPP-TBD */
2988           if (*(int *) buffer)
2989             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_V6ONLY);
2990           else
2991             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_V6ONLY);
2992
2993           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_V6ONLY: %d, buflen %d, "
2994                 "#VPP-TBD#", getpid (),
2995                 VCL_SESS_ATTR_TEST (session->attr,
2996                                     VCL_SESS_ATTR_V6ONLY), *buflen);
2997         }
2998       else
2999         rv = VPPCOM_EINVAL;
3000       break;
3001
3002     case VPPCOM_ATTR_GET_KEEPALIVE:
3003       if (buffer && buflen && (*buflen >= sizeof (int)))
3004         {
3005           /* VPP-TBD */
3006           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
3007                                                 VCL_SESS_ATTR_KEEPALIVE);
3008           *buflen = sizeof (int);
3009
3010           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_KEEPALIVE: %d, buflen %d, "
3011                 "#VPP-TBD#", getpid (), *(int *) buffer, *buflen);
3012         }
3013       else
3014         rv = VPPCOM_EINVAL;
3015       break;
3016
3017     case VPPCOM_ATTR_SET_KEEPALIVE:
3018       if (buffer && buflen && (*buflen == sizeof (int)))
3019         {
3020           /* VPP-TBD */
3021           if (*(int *) buffer)
3022             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_KEEPALIVE);
3023           else
3024             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_KEEPALIVE);
3025
3026           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_KEEPALIVE: %d, buflen %d, "
3027                 "#VPP-TBD#", getpid (),
3028                 VCL_SESS_ATTR_TEST (session->attr,
3029                                     VCL_SESS_ATTR_KEEPALIVE), *buflen);
3030         }
3031       else
3032         rv = VPPCOM_EINVAL;
3033       break;
3034
3035     case VPPCOM_ATTR_GET_TCP_NODELAY:
3036       if (buffer && buflen && (*buflen >= sizeof (int)))
3037         {
3038           /* VPP-TBD */
3039           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
3040                                                 VCL_SESS_ATTR_TCP_NODELAY);
3041           *buflen = sizeof (int);
3042
3043           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_TCP_NODELAY: %d, buflen %d, "
3044                 "#VPP-TBD#", getpid (), *(int *) buffer, *buflen);
3045         }
3046       else
3047         rv = VPPCOM_EINVAL;
3048       break;
3049
3050     case VPPCOM_ATTR_SET_TCP_NODELAY:
3051       if (buffer && buflen && (*buflen == sizeof (int)))
3052         {
3053           /* VPP-TBD */
3054           if (*(int *) buffer)
3055             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_TCP_NODELAY);
3056           else
3057             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_TCP_NODELAY);
3058
3059           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_TCP_NODELAY: %d, buflen %d, "
3060                 "#VPP-TBD#", getpid (),
3061                 VCL_SESS_ATTR_TEST (session->attr,
3062                                     VCL_SESS_ATTR_TCP_NODELAY), *buflen);
3063         }
3064       else
3065         rv = VPPCOM_EINVAL;
3066       break;
3067
3068     case VPPCOM_ATTR_GET_TCP_KEEPIDLE:
3069       if (buffer && buflen && (*buflen >= sizeof (int)))
3070         {
3071           /* VPP-TBD */
3072           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
3073                                                 VCL_SESS_ATTR_TCP_KEEPIDLE);
3074           *buflen = sizeof (int);
3075
3076           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_TCP_KEEPIDLE: %d, buflen %d, "
3077                 "#VPP-TBD#", getpid (), *(int *) buffer, *buflen);
3078         }
3079       else
3080         rv = VPPCOM_EINVAL;
3081       break;
3082
3083     case VPPCOM_ATTR_SET_TCP_KEEPIDLE:
3084       if (buffer && buflen && (*buflen == sizeof (int)))
3085         {
3086           /* VPP-TBD */
3087           if (*(int *) buffer)
3088             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_TCP_KEEPIDLE);
3089           else
3090             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_TCP_KEEPIDLE);
3091
3092           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_TCP_KEEPIDLE: %d, buflen %d, "
3093                 "#VPP-TBD#", getpid (),
3094                 VCL_SESS_ATTR_TEST (session->attr,
3095                                     VCL_SESS_ATTR_TCP_KEEPIDLE), *buflen);
3096         }
3097       else
3098         rv = VPPCOM_EINVAL;
3099       break;
3100
3101     case VPPCOM_ATTR_GET_TCP_KEEPINTVL:
3102       if (buffer && buflen && (*buflen >= sizeof (int)))
3103         {
3104           /* VPP-TBD */
3105           *(int *) buffer = VCL_SESS_ATTR_TEST (session->attr,
3106                                                 VCL_SESS_ATTR_TCP_KEEPINTVL);
3107           *buflen = sizeof (int);
3108
3109           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_TCP_KEEPINTVL: %d, buflen %d, "
3110                 "#VPP-TBD#", getpid (), *(int *) buffer, *buflen);
3111         }
3112       else
3113         rv = VPPCOM_EINVAL;
3114       break;
3115
3116     case VPPCOM_ATTR_SET_TCP_KEEPINTVL:
3117       if (buffer && buflen && (*buflen == sizeof (int)))
3118         {
3119           /* VPP-TBD */
3120           if (*(int *) buffer)
3121             VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_TCP_KEEPINTVL);
3122           else
3123             VCL_SESS_ATTR_CLR (session->attr, VCL_SESS_ATTR_TCP_KEEPINTVL);
3124
3125           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_TCP_KEEPINTVL: %d, buflen %d, "
3126                 "#VPP-TBD#", getpid (),
3127                 VCL_SESS_ATTR_TEST (session->attr,
3128                                     VCL_SESS_ATTR_TCP_KEEPINTVL), *buflen);
3129         }
3130       else
3131         rv = VPPCOM_EINVAL;
3132       break;
3133
3134     case VPPCOM_ATTR_GET_TCP_USER_MSS:
3135       if (buffer && buflen && (*buflen >= sizeof (u32)))
3136         {
3137           /* VPP-TBD */
3138           *(u32 *) buffer = session->user_mss;
3139           *buflen = sizeof (int);
3140
3141           VDBG (2, "VCL<%d>: VPPCOM_ATTR_GET_TCP_USER_MSS: %d, buflen %d,"
3142                 " #VPP-TBD#", getpid (), *(int *) buffer, *buflen);
3143         }
3144       else
3145         rv = VPPCOM_EINVAL;
3146       break;
3147
3148     case VPPCOM_ATTR_SET_TCP_USER_MSS:
3149       if (buffer && buflen && (*buflen == sizeof (u32)))
3150         {
3151           /* VPP-TBD */
3152           session->user_mss = *(u32 *) buffer;
3153
3154           VDBG (2, "VCL<%d>: VPPCOM_ATTR_SET_TCP_USER_MSS: %u, buflen %d, "
3155                 "#VPP-TBD#", getpid (), session->user_mss, *buflen);
3156         }
3157       else
3158         rv = VPPCOM_EINVAL;
3159       break;
3160
3161     case VPPCOM_ATTR_SET_SHUT:
3162       if (*flags == SHUT_RD || *flags == SHUT_RDWR)
3163         VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_SHUT_RD);
3164       if (*flags == SHUT_WR || *flags == SHUT_RDWR)
3165         VCL_SESS_ATTR_SET (session->attr, VCL_SESS_ATTR_SHUT_WR);
3166       break;
3167
3168     case VPPCOM_ATTR_GET_SHUT:
3169       if (VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_SHUT_RD))
3170         tmp_flags = 1;
3171       if (VCL_SESS_ATTR_TEST (session->attr, VCL_SESS_ATTR_SHUT_WR))
3172         tmp_flags |= 2;
3173       if (tmp_flags == 1)
3174         *(int *) buffer = SHUT_RD;
3175       else if (tmp_flags == 2)
3176         *(int *) buffer = SHUT_WR;
3177       else if (tmp_flags == 3)
3178         *(int *) buffer = SHUT_RDWR;
3179       *buflen = sizeof (int);
3180       break;
3181     default:
3182       rv = VPPCOM_EINVAL;
3183       break;
3184     }
3185
3186   return rv;
3187 }
3188
3189 int
3190 vppcom_session_recvfrom (uint32_t session_handle, void *buffer,
3191                          uint32_t buflen, int flags, vppcom_endpt_t * ep)
3192 {
3193   vcl_worker_t *wrk = vcl_worker_get_current ();
3194   int rv = VPPCOM_OK;
3195   vcl_session_t *session = 0;
3196
3197   if (ep)
3198     {
3199       session = vcl_session_get_w_handle (wrk, session_handle);
3200       if (PREDICT_FALSE (!session))
3201         {
3202           VDBG (0, "VCL<%d>: invalid session, sid (%u) has been closed!",
3203                 getpid (), session_handle);
3204           return VPPCOM_EBADFD;
3205         }
3206       ep->is_ip4 = session->transport.is_ip4;
3207       ep->port = session->transport.rmt_port;
3208     }
3209
3210   if (flags == 0)
3211     rv = vppcom_session_read (session_handle, buffer, buflen);
3212   else if (flags & MSG_PEEK)
3213     rv = vppcom_session_peek (session_handle, buffer, buflen);
3214   else
3215     {
3216       VDBG (0, "Unsupport flags for recvfrom %d", flags);
3217       return VPPCOM_EAFNOSUPPORT;
3218     }
3219
3220   if (ep)
3221     {
3222       if (session->transport.is_ip4)
3223         clib_memcpy_fast (ep->ip, &session->transport.rmt_ip.ip4,
3224                           sizeof (ip4_address_t));
3225       else
3226         clib_memcpy_fast (ep->ip, &session->transport.rmt_ip.ip6,
3227                           sizeof (ip6_address_t));
3228     }
3229
3230   return rv;
3231 }
3232
3233 int
3234 vppcom_session_sendto (uint32_t session_handle, void *buffer,
3235                        uint32_t buflen, int flags, vppcom_endpt_t * ep)
3236 {
3237   if (!buffer)
3238     return VPPCOM_EINVAL;
3239
3240   if (ep)
3241     {
3242       // TBD
3243       return VPPCOM_EINVAL;
3244     }
3245
3246   if (flags)
3247     {
3248       // TBD check the flags and do the right thing
3249       VDBG (2, "VCL<%d>: handling flags 0x%u (%d) not implemented yet.",
3250             getpid (), flags, flags);
3251     }
3252
3253   return (vppcom_session_write_inline (session_handle, buffer, buflen, 1));
3254 }
3255
3256 int
3257 vppcom_poll (vcl_poll_t * vp, uint32_t n_sids, double wait_for_time)
3258 {
3259   vcl_worker_t *wrk = vcl_worker_get_current ();
3260   f64 timeout = clib_time_now (&wrk->clib_time) + wait_for_time;
3261   u32 i, keep_trying = 1;
3262   svm_msg_q_msg_t msg;
3263   session_event_t *e;
3264   int rv, num_ev = 0;
3265
3266   VDBG (3, "VCL<%d>: vp %p, nsids %u, wait_for_time %f",
3267         getpid (), vp, n_sids, wait_for_time);
3268
3269   if (!vp)
3270     return VPPCOM_EFAULT;
3271
3272   do
3273     {
3274       vcl_session_t *session;
3275
3276       /* Dequeue all events and drop all unhandled io events */
3277       while (svm_msg_q_sub (wrk->app_event_queue, &msg, SVM_Q_NOWAIT, 0) == 0)
3278         {
3279           e = svm_msg_q_msg_data (wrk->app_event_queue, &msg);
3280           vcl_handle_mq_event (wrk, e);
3281           svm_msg_q_free_msg (wrk->app_event_queue, &msg);
3282         }
3283       vec_reset_length (wrk->unhandled_evts_vector);
3284
3285       for (i = 0; i < n_sids; i++)
3286         {
3287           session = vcl_session_get (wrk, vp[i].sh);
3288           if (!session)
3289             {
3290               vp[i].revents = POLLHUP;
3291               num_ev++;
3292               continue;
3293             }
3294
3295           vp[i].revents = 0;
3296
3297           if (POLLIN & vp[i].events)
3298             {
3299               rv = vcl_session_read_ready (session);
3300               if (rv > 0)
3301                 {
3302                   vp[i].revents |= POLLIN;
3303                   num_ev++;
3304                 }
3305               else if (rv < 0)
3306                 {
3307                   switch (rv)
3308                     {
3309                     case VPPCOM_ECONNRESET:
3310                       vp[i].revents = POLLHUP;
3311                       break;
3312
3313                     default:
3314                       vp[i].revents = POLLERR;
3315                       break;
3316                     }
3317                   num_ev++;
3318                 }
3319             }
3320
3321           if (POLLOUT & vp[i].events)
3322             {
3323               rv = vcl_session_write_ready (session);
3324               if (rv > 0)
3325                 {
3326                   vp[i].revents |= POLLOUT;
3327                   num_ev++;
3328                 }
3329               else if (rv < 0)
3330                 {
3331                   switch (rv)
3332                     {
3333                     case VPPCOM_ECONNRESET:
3334                       vp[i].revents = POLLHUP;
3335                       break;
3336
3337                     default:
3338                       vp[i].revents = POLLERR;
3339                       break;
3340                     }
3341                   num_ev++;
3342                 }
3343             }
3344
3345           if (0)                // Note "done:" label used by VCL_SESSION_LOCK_AND_GET()
3346             {
3347               vp[i].revents = POLLNVAL;
3348               num_ev++;
3349             }
3350         }
3351       if (wait_for_time != -1)
3352         keep_trying = (clib_time_now (&wrk->clib_time) <= timeout) ? 1 : 0;
3353     }
3354   while ((num_ev == 0) && keep_trying);
3355
3356   if (VPPCOM_DEBUG > 3)
3357     {
3358       clib_warning ("VCL<%d>: returning %d", getpid (), num_ev);
3359       for (i = 0; i < n_sids; i++)
3360         {
3361           clib_warning ("VCL<%d>: vp[%d].sid %d (0x%x), .events 0x%x, "
3362                         ".revents 0x%x", getpid (), i, vp[i].sh, vp[i].sh,
3363                         vp[i].events, vp[i].revents);
3364         }
3365     }
3366   return num_ev;
3367 }
3368
3369 int
3370 vppcom_mq_epoll_fd (void)
3371 {
3372   vcl_worker_t *wrk = vcl_worker_get_current ();
3373   return wrk->mqs_epfd;
3374 }
3375
3376 int
3377 vppcom_session_index (vcl_session_handle_t session_handle)
3378 {
3379   return session_handle & 0xFFFFFF;
3380 }
3381
3382 int
3383 vppcom_session_worker (vcl_session_handle_t session_handle)
3384 {
3385   return session_handle >> 24;
3386 }
3387
3388 int
3389 vppcom_worker_register (void)
3390 {
3391   if (!vcl_worker_alloc_and_init ())
3392     return VPPCOM_EEXIST;
3393
3394   if (vcl_worker_set_bapi ())
3395     return VPPCOM_EEXIST;
3396
3397   if (vcl_worker_register_with_vpp ())
3398     return VPPCOM_EEXIST;
3399
3400   return VPPCOM_OK;
3401 }
3402
3403 int
3404 vppcom_worker_index (void)
3405 {
3406   return vcl_get_worker_index ();
3407 }
3408
3409 int
3410 vppcom_worker_mqs_epfd (void)
3411 {
3412   vcl_worker_t *wrk = vcl_worker_get_current ();
3413   if (!vcm->cfg.use_mq_eventfd)
3414     return -1;
3415   return wrk->mqs_epfd;
3416 }
3417
3418 /*
3419  * fd.io coding-style-patch-verification: ON
3420  *
3421  * Local Variables:
3422  * eval: (c-set-style "gnu")
3423  * End:
3424  */