3a1d48b1a7e55cebe12b15bbd1a1129a87b30023
[vpp.git] / src / vnet / session / application_local.c
1 /*
2  * Copyright (c) 2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include <vnet/session/application_local.h>
17 #include <vnet/session/session.h>
18
19 typedef enum ct_segment_flags_
20 {
21   CT_SEGMENT_F_CLIENT_DETACHED = 1 << 0,
22   CT_SEGMENT_F_SERVER_DETACHED = 1 << 1,
23 } ct_segment_flags_t;
24
25 typedef struct ct_segment_
26 {
27   u32 client_n_sessions;
28   u32 server_n_sessions;
29   u32 seg_ctx_index;
30   u32 ct_seg_index;
31   u32 segment_index;
32   ct_segment_flags_t flags;
33 } ct_segment_t;
34
35 typedef struct ct_segments_
36 {
37   u32 sm_index;
38   u32 server_wrk;
39   u32 client_wrk;
40   u32 fifo_pair_bytes;
41   ct_segment_t *segments;
42 } ct_segments_ctx_t;
43
44 typedef struct ct_cleanup_req_
45 {
46   u32 ct_index;
47 } ct_cleanup_req_t;
48
49 typedef struct ct_worker_
50 {
51   ct_connection_t *connections;       /**< Per-worker connection pools */
52   ct_cleanup_req_t *pending_cleanups; /**< Fifo of pending indices */
53   u8 have_cleanups;                   /**< Set if cleanup rpc pending */
54 } ct_worker_t;
55
56 typedef struct ct_main_
57 {
58   ct_worker_t *wrk;                     /**< Per-worker state */
59   u32 n_workers;                        /**< Number of vpp workers */
60   u32 n_sessions;                       /**< Cumulative sessions counter */
61   u32 *ho_reusable;                     /**< Vector of reusable ho indices */
62   clib_spinlock_t ho_reuseable_lock;    /**< Lock for reusable ho indices */
63   clib_rwlock_t app_segs_lock;          /**< RW lock for seg contexts */
64   uword *app_segs_ctxs_table;           /**< App handle to segment pool map */
65   ct_segments_ctx_t *app_seg_ctxs;      /**< Pool of ct segment contexts */
66 } ct_main_t;
67
68 static ct_main_t ct_main;
69
70 static inline ct_worker_t *
71 ct_worker_get (u32 thread_index)
72 {
73   return &ct_main.wrk[thread_index];
74 }
75
76 static ct_connection_t *
77 ct_connection_alloc (u32 thread_index)
78 {
79   ct_worker_t *wrk = ct_worker_get (thread_index);
80   ct_connection_t *ct;
81
82   pool_get_zero (wrk->connections, ct);
83   ct->c_c_index = ct - wrk->connections;
84   ct->c_thread_index = thread_index;
85   ct->client_wrk = ~0;
86   ct->server_wrk = ~0;
87   ct->seg_ctx_index = ~0;
88   ct->ct_seg_index = ~0;
89   return ct;
90 }
91
92 static ct_connection_t *
93 ct_connection_get (u32 ct_index, u32 thread_index)
94 {
95   ct_worker_t *wrk = ct_worker_get (thread_index);
96
97   if (pool_is_free_index (wrk->connections, ct_index))
98     return 0;
99   return pool_elt_at_index (wrk->connections, ct_index);
100 }
101
102 static void
103 ct_connection_free (ct_connection_t * ct)
104 {
105   ct_worker_t *wrk = ct_worker_get (ct->c_thread_index);
106
107   if (CLIB_DEBUG)
108     {
109       clib_memset (ct, 0xfc, sizeof (*ct));
110       pool_put (wrk->connections, ct);
111       return;
112     }
113   pool_put (wrk->connections, ct);
114 }
115
116 static ct_connection_t *
117 ct_half_open_alloc (void)
118 {
119   ct_main_t *cm = &ct_main;
120   u32 *hip;
121
122   clib_spinlock_lock (&cm->ho_reuseable_lock);
123   vec_foreach (hip, cm->ho_reusable)
124     pool_put_index (cm->wrk[0].connections, *hip);
125   vec_reset_length (cm->ho_reusable);
126   clib_spinlock_unlock (&cm->ho_reuseable_lock);
127
128   return ct_connection_alloc (0);
129 }
130
131 void
132 ct_half_open_add_reusable (u32 ho_index)
133 {
134   ct_main_t *cm = &ct_main;
135
136   clib_spinlock_lock (&cm->ho_reuseable_lock);
137   vec_add1 (cm->ho_reusable, ho_index);
138   clib_spinlock_unlock (&cm->ho_reuseable_lock);
139 }
140
141 session_t *
142 ct_session_get_peer (session_t * s)
143 {
144   ct_connection_t *ct, *peer_ct;
145   ct = ct_connection_get (s->connection_index, s->thread_index);
146   peer_ct = ct_connection_get (ct->peer_index, s->thread_index);
147   return session_get (peer_ct->c_s_index, s->thread_index);
148 }
149
150 void
151 ct_session_endpoint (session_t * ll, session_endpoint_t * sep)
152 {
153   ct_connection_t *ct;
154   ct = (ct_connection_t *) session_get_transport (ll);
155   sep->transport_proto = ct->actual_tp;
156   sep->port = ct->c_lcl_port;
157   sep->is_ip4 = ct->c_is_ip4;
158   ip_copy (&sep->ip, &ct->c_lcl_ip, ct->c_is_ip4);
159 }
160
161 static void
162 ct_set_invalid_app_wrk (ct_connection_t *ct, u8 is_client)
163 {
164   ct_connection_t *peer_ct;
165
166   peer_ct = ct_connection_get (ct->peer_index, ct->c_thread_index);
167
168   if (is_client)
169     {
170       ct->client_wrk = APP_INVALID_INDEX;
171       if (peer_ct)
172         ct->client_wrk = APP_INVALID_INDEX;
173     }
174   else
175     {
176       ct->server_wrk = APP_INVALID_INDEX;
177       if (peer_ct)
178         ct->server_wrk = APP_INVALID_INDEX;
179     }
180 }
181
182 static void
183 ct_session_dealloc_fifos (ct_connection_t *ct, svm_fifo_t *rx_fifo,
184                           svm_fifo_t *tx_fifo)
185 {
186   ct_segments_ctx_t *seg_ctx;
187   ct_main_t *cm = &ct_main;
188   segment_manager_t *sm;
189   app_worker_t *app_wrk;
190   ct_segment_t *ct_seg;
191   fifo_segment_t *fs;
192   u32 seg_index;
193   session_t *s;
194   int cnt;
195
196   /*
197    * Cleanup fifos
198    */
199
200   sm = segment_manager_get (rx_fifo->segment_manager);
201   seg_index = rx_fifo->segment_index;
202
203   fs = segment_manager_get_segment_w_lock (sm, seg_index);
204   fifo_segment_free_fifo (fs, rx_fifo);
205   fifo_segment_free_fifo (fs, tx_fifo);
206   segment_manager_segment_reader_unlock (sm);
207
208   /*
209    * Atomically update segment context with readers lock
210    */
211
212   clib_rwlock_reader_lock (&cm->app_segs_lock);
213
214   seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, ct->seg_ctx_index);
215   ct_seg = pool_elt_at_index (seg_ctx->segments, ct->ct_seg_index);
216
217   if (ct->flags & CT_CONN_F_CLIENT)
218     {
219       cnt =
220         __atomic_sub_fetch (&ct_seg->client_n_sessions, 1, __ATOMIC_RELAXED);
221     }
222   else
223     {
224       cnt =
225         __atomic_sub_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
226     }
227
228   clib_rwlock_reader_unlock (&cm->app_segs_lock);
229
230   /*
231    * No need to do any app updates, return
232    */
233   ASSERT (cnt >= 0);
234   if (cnt)
235     return;
236
237   /*
238    * Grab exclusive lock and update flags unless some other thread
239    * added more sessions
240    */
241   clib_rwlock_writer_lock (&cm->app_segs_lock);
242
243   seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, ct->seg_ctx_index);
244   ct_seg = pool_elt_at_index (seg_ctx->segments, ct->ct_seg_index);
245   if (ct->flags & CT_CONN_F_CLIENT)
246     {
247       cnt = ct_seg->client_n_sessions;
248       if (cnt)
249         goto done;
250       ct_seg->flags |= CT_SEGMENT_F_CLIENT_DETACHED;
251       s = session_get (ct->c_s_index, ct->c_thread_index);
252       if (s->app_wrk_index == APP_INVALID_INDEX)
253         ct_set_invalid_app_wrk (ct, 1 /* is_client */);
254     }
255   else
256     {
257       cnt = ct_seg->server_n_sessions;
258       if (cnt)
259         goto done;
260       ct_seg->flags |= CT_SEGMENT_F_SERVER_DETACHED;
261       s = session_get (ct->c_s_index, ct->c_thread_index);
262       if (s->app_wrk_index == APP_INVALID_INDEX)
263         ct_set_invalid_app_wrk (ct, 0 /* is_client */);
264     }
265
266   if (!(ct_seg->flags & CT_SEGMENT_F_CLIENT_DETACHED) ||
267       !(ct_seg->flags & CT_SEGMENT_F_SERVER_DETACHED))
268     goto done;
269
270   /*
271    * Remove segment context because both client and server detached
272    */
273
274   pool_put_index (seg_ctx->segments, ct->ct_seg_index);
275
276   /*
277    * No more segment indices left, remove the segments context
278    */
279   if (!pool_elts (seg_ctx->segments))
280     {
281       u64 table_handle = seg_ctx->client_wrk << 16 | seg_ctx->server_wrk;
282       table_handle = (u64) seg_ctx->sm_index << 32 | table_handle;
283       hash_unset (cm->app_segs_ctxs_table, table_handle);
284       pool_free (seg_ctx->segments);
285       pool_put_index (cm->app_seg_ctxs, ct->seg_ctx_index);
286     }
287
288   /*
289    * Segment to be removed so notify both apps
290    */
291
292   app_wrk = app_worker_get_if_valid (ct->client_wrk);
293   /* Determine if client app still needs notification, i.e., if it is
294    * still attached. If client detached and this is the last ct session
295    * on this segment, then its connects segment manager should also be
296    * detached, so do not send notification */
297   if (app_wrk)
298     {
299       segment_manager_t *csm;
300       csm = app_worker_get_connect_segment_manager (app_wrk);
301       if (!segment_manager_app_detached (csm))
302         app_worker_del_segment_notify (app_wrk, ct->segment_handle);
303     }
304
305   if (!segment_manager_app_detached (sm))
306     {
307       app_wrk = app_worker_get (ct->server_wrk);
308       app_worker_del_segment_notify (app_wrk, ct->segment_handle);
309     }
310
311   segment_manager_lock_and_del_segment (sm, seg_index);
312
313   /* Cleanup segment manager if needed. If server detaches there's a chance
314    * the client's sessions will hold up segment removal */
315   if (segment_manager_app_detached (sm) && !segment_manager_has_fifos (sm))
316     segment_manager_free_safe (sm);
317
318 done:
319
320   clib_rwlock_writer_unlock (&cm->app_segs_lock);
321 }
322
323 int
324 ct_session_connect_notify (session_t *ss, session_error_t err)
325 {
326   u32 ss_index, opaque, thread_index;
327   ct_connection_t *sct, *cct;
328   app_worker_t *client_wrk;
329   session_t *cs;
330
331   ss_index = ss->session_index;
332   thread_index = ss->thread_index;
333   sct = (ct_connection_t *) session_get_transport (ss);
334   client_wrk = app_worker_get (sct->client_wrk);
335   opaque = sct->client_opaque;
336
337   cct = ct_connection_get (sct->peer_index, thread_index);
338
339   /* Client closed while waiting for reply from server */
340   if (PREDICT_FALSE (!cct))
341     {
342       session_transport_closing_notify (&sct->connection);
343       session_transport_delete_notify (&sct->connection);
344       ct_connection_free (sct);
345       return 0;
346     }
347
348   session_half_open_delete_notify (&cct->connection);
349   cct->flags &= ~CT_CONN_F_HALF_OPEN;
350
351   if (PREDICT_FALSE (err))
352     goto connect_error;
353
354   /*
355    * Alloc client session
356    */
357
358   cs = session_alloc (thread_index);
359   ss = session_get (ss_index, thread_index);
360   cs->session_type = ss->session_type;
361   cs->listener_handle = SESSION_INVALID_HANDLE;
362   cs->session_state = SESSION_STATE_CONNECTING;
363   cs->app_wrk_index = client_wrk->wrk_index;
364   cs->connection_index = cct->c_c_index;
365   cct->c_s_index = cs->session_index;
366
367   /* This will allocate fifos for the session. They won't be used for
368    * exchanging data but they will be used to close the connection if
369    * the segment manager/worker is freed */
370   if ((err = app_worker_init_connected (client_wrk, cs)))
371     {
372       session_free (cs);
373       session_close (ss);
374       err = SESSION_E_ALLOC;
375       goto connect_error;
376     }
377
378   cs->session_state = SESSION_STATE_CONNECTING;
379
380   if (app_worker_connect_notify (client_wrk, cs, 0, opaque))
381     {
382       segment_manager_dealloc_fifos (cs->rx_fifo, cs->tx_fifo);
383       session_free (cs);
384       session_close (ss);
385       goto cleanup_client;
386     }
387
388   cs = session_get (cct->c_s_index, cct->c_thread_index);
389   cs->session_state = SESSION_STATE_READY;
390
391   return 0;
392
393 connect_error:
394
395   app_worker_connect_notify (client_wrk, 0, err, cct->client_opaque);
396
397 cleanup_client:
398
399   if (cct->client_rx_fifo)
400     ct_session_dealloc_fifos (cct, cct->client_rx_fifo, cct->client_tx_fifo);
401   ct_connection_free (cct);
402   return -1;
403 }
404
405 static inline ct_segment_t *
406 ct_lookup_free_segment (ct_main_t *cm, segment_manager_t *sm,
407                         u32 seg_ctx_index)
408 {
409   uword free_bytes, max_free_bytes;
410   ct_segment_t *ct_seg, *res = 0;
411   ct_segments_ctx_t *seg_ctx;
412   fifo_segment_t *fs;
413   u32 max_fifos;
414
415   seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, seg_ctx_index);
416   max_free_bytes = seg_ctx->fifo_pair_bytes;
417
418   pool_foreach (ct_seg, seg_ctx->segments)
419     {
420       /* Client or server has detached so segment cannot be used */
421       fs = segment_manager_get_segment (sm, ct_seg->segment_index);
422       free_bytes = fifo_segment_available_bytes (fs);
423       max_fifos = fifo_segment_size (fs) / seg_ctx->fifo_pair_bytes;
424       if (free_bytes > max_free_bytes &&
425           fifo_segment_num_fifos (fs) / 2 < max_fifos)
426         {
427           max_free_bytes = free_bytes;
428           res = ct_seg;
429         }
430     }
431
432   return res;
433 }
434
435 static ct_segment_t *
436 ct_alloc_segment (ct_main_t *cm, app_worker_t *server_wrk, u64 table_handle,
437                   segment_manager_t *sm, u32 client_wrk_index)
438 {
439   u32 seg_ctx_index = ~0, sm_index, pair_bytes;
440   segment_manager_props_t *props;
441   const u32 margin = 16 << 10;
442   ct_segments_ctx_t *seg_ctx;
443   app_worker_t *client_wrk;
444   u64 seg_size, seg_handle;
445   application_t *server;
446   ct_segment_t *ct_seg;
447   uword *spp;
448   int fs_index;
449
450   server = application_get (server_wrk->app_index);
451   props = application_segment_manager_properties (server);
452   sm_index = segment_manager_index (sm);
453   pair_bytes = props->rx_fifo_size + props->tx_fifo_size + margin;
454
455   /*
456    * Make sure another thread did not alloc a segment while acquiring the lock
457    */
458
459   spp = hash_get (cm->app_segs_ctxs_table, table_handle);
460   if (spp)
461     {
462       seg_ctx_index = *spp;
463       ct_seg = ct_lookup_free_segment (cm, sm, seg_ctx_index);
464       if (ct_seg)
465         return ct_seg;
466     }
467
468   /*
469    * No segment, try to alloc one and notify the server and the client.
470    * Make sure the segment is not used for other fifos
471    */
472   seg_size = clib_max (props->segment_size, 128 << 20);
473   fs_index =
474     segment_manager_add_segment2 (sm, seg_size, FIFO_SEGMENT_F_CUSTOM_USE);
475   if (fs_index < 0)
476     return 0;
477
478   if (seg_ctx_index == ~0)
479     {
480       pool_get_zero (cm->app_seg_ctxs, seg_ctx);
481       seg_ctx_index = seg_ctx - cm->app_seg_ctxs;
482       hash_set (cm->app_segs_ctxs_table, table_handle, seg_ctx_index);
483       seg_ctx->server_wrk = server_wrk->wrk_index;
484       seg_ctx->client_wrk = client_wrk_index;
485       seg_ctx->sm_index = sm_index;
486       seg_ctx->fifo_pair_bytes = pair_bytes;
487     }
488   else
489     {
490       seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, seg_ctx_index);
491     }
492
493   pool_get_zero (seg_ctx->segments, ct_seg);
494   ct_seg->segment_index = fs_index;
495   ct_seg->server_n_sessions = 0;
496   ct_seg->client_n_sessions = 0;
497   ct_seg->ct_seg_index = ct_seg - seg_ctx->segments;
498   ct_seg->seg_ctx_index = seg_ctx_index;
499
500   /* New segment, notify the server and client */
501   seg_handle = segment_manager_make_segment_handle (sm_index, fs_index);
502   if (app_worker_add_segment_notify (server_wrk, seg_handle))
503     goto error;
504
505   client_wrk = app_worker_get (client_wrk_index);
506   if (app_worker_add_segment_notify (client_wrk, seg_handle))
507     {
508       app_worker_del_segment_notify (server_wrk, seg_handle);
509       goto error;
510     }
511
512   return ct_seg;
513
514 error:
515
516   segment_manager_lock_and_del_segment (sm, fs_index);
517   pool_put_index (seg_ctx->segments, ct_seg->seg_ctx_index);
518   return 0;
519 }
520
521 static int
522 ct_init_accepted_session (app_worker_t *server_wrk, ct_connection_t *ct,
523                           session_t *ls, session_t *ll)
524 {
525   segment_manager_props_t *props;
526   u64 seg_handle, table_handle;
527   u32 sm_index, fs_index = ~0;
528   ct_segments_ctx_t *seg_ctx;
529   ct_main_t *cm = &ct_main;
530   application_t *server;
531   segment_manager_t *sm;
532   ct_segment_t *ct_seg;
533   fifo_segment_t *fs;
534   uword *spp;
535   int rv;
536
537   sm = app_worker_get_listen_segment_manager (server_wrk, ll);
538   sm_index = segment_manager_index (sm);
539   server = application_get (server_wrk->app_index);
540   props = application_segment_manager_properties (server);
541
542   table_handle = ct->client_wrk << 16 | server_wrk->wrk_index;
543   table_handle = (u64) sm_index << 32 | table_handle;
544
545   /*
546    * Check if we already have a segment that can hold the fifos
547    */
548
549   clib_rwlock_reader_lock (&cm->app_segs_lock);
550
551   spp = hash_get (cm->app_segs_ctxs_table, table_handle);
552   if (spp)
553     {
554       ct_seg = ct_lookup_free_segment (cm, sm, *spp);
555       if (ct_seg)
556         {
557           ct->seg_ctx_index = ct_seg->seg_ctx_index;
558           ct->ct_seg_index = ct_seg->ct_seg_index;
559           fs_index = ct_seg->segment_index;
560           ct_seg->flags &=
561             ~(CT_SEGMENT_F_SERVER_DETACHED | CT_SEGMENT_F_CLIENT_DETACHED);
562           __atomic_add_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
563           __atomic_add_fetch (&ct_seg->client_n_sessions, 1, __ATOMIC_RELAXED);
564         }
565     }
566
567   clib_rwlock_reader_unlock (&cm->app_segs_lock);
568
569   /*
570    * If not, grab exclusive lock and allocate segment
571    */
572   if (fs_index == ~0)
573     {
574       clib_rwlock_writer_lock (&cm->app_segs_lock);
575
576       ct_seg =
577         ct_alloc_segment (cm, server_wrk, table_handle, sm, ct->client_wrk);
578       if (!ct_seg)
579         {
580           clib_rwlock_writer_unlock (&cm->app_segs_lock);
581           return -1;
582         }
583
584       ct->seg_ctx_index = ct_seg->seg_ctx_index;
585       ct->ct_seg_index = ct_seg->ct_seg_index;
586       ct_seg->server_n_sessions += 1;
587       ct_seg->client_n_sessions += 1;
588       fs_index = ct_seg->segment_index;
589
590       clib_rwlock_writer_unlock (&cm->app_segs_lock);
591     }
592
593   /*
594    * Allocate and initialize the fifos
595    */
596   fs = segment_manager_get_segment_w_lock (sm, fs_index);
597   rv = segment_manager_try_alloc_fifos (
598     fs, ls->thread_index, props->rx_fifo_size, props->tx_fifo_size,
599     &ls->rx_fifo, &ls->tx_fifo);
600   if (rv)
601     {
602       segment_manager_segment_reader_unlock (sm);
603
604       clib_rwlock_reader_lock (&cm->app_segs_lock);
605
606       seg_ctx = pool_elt_at_index (cm->app_seg_ctxs, ct->seg_ctx_index);
607       ct_seg = pool_elt_at_index (seg_ctx->segments, ct->ct_seg_index);
608       __atomic_sub_fetch (&ct_seg->server_n_sessions, 1, __ATOMIC_RELAXED);
609       __atomic_sub_fetch (&ct_seg->client_n_sessions, 1, __ATOMIC_RELAXED);
610
611       clib_rwlock_reader_unlock (&cm->app_segs_lock);
612
613       return rv;
614     }
615
616   ls->rx_fifo->shr->master_session_index = ls->session_index;
617   ls->tx_fifo->shr->master_session_index = ls->session_index;
618   ls->rx_fifo->master_thread_index = ls->thread_index;
619   ls->tx_fifo->master_thread_index = ls->thread_index;
620   ls->rx_fifo->segment_manager = sm_index;
621   ls->tx_fifo->segment_manager = sm_index;
622   ls->rx_fifo->segment_index = fs_index;
623   ls->tx_fifo->segment_index = fs_index;
624
625   seg_handle = segment_manager_segment_handle (sm, fs);
626   segment_manager_segment_reader_unlock (sm);
627
628   ct->segment_handle = seg_handle;
629
630   return 0;
631 }
632
633 static void
634 ct_accept_rpc_wrk_handler (void *accept_args)
635 {
636   u32 cct_index, ho_index, thread_index, ll_index;
637   ct_connection_t *sct, *cct, *ho;
638   transport_connection_t *ll_ct;
639   app_worker_t *server_wrk;
640   session_t *ss, *ll;
641
642   /*
643    * Alloc client ct and initialize from ho
644    */
645   thread_index = vlib_get_thread_index ();
646   cct = ct_connection_alloc (thread_index);
647   cct_index = cct->c_c_index;
648
649   ho_index = pointer_to_uword (accept_args);
650   ho = ct_connection_get (ho_index, 0);
651
652   /* Unlikely but half-open session and transport could have been freed */
653   if (PREDICT_FALSE (!ho))
654     {
655       ct_connection_free (cct);
656       return;
657     }
658
659   clib_memcpy (cct, ho, sizeof (*ho));
660   cct->c_c_index = cct_index;
661   cct->c_thread_index = thread_index;
662   cct->flags |= CT_CONN_F_HALF_OPEN;
663
664   /* Notify session layer that half-open is on a different thread
665    * and mark ho connection index reusable. Avoids another rpc
666    */
667   session_half_open_migrate_notify (&cct->connection);
668   session_half_open_migrated_notify (&cct->connection);
669   ct_half_open_add_reusable (ho_index);
670
671   /*
672    * Alloc and init server transport
673    */
674
675   ll_index = cct->peer_index;
676   ll = listen_session_get (ll_index);
677   sct = ct_connection_alloc (thread_index);
678   /* Transport not necessarily ct but it might, so grab after sct alloc */
679   ll_ct = listen_session_get_transport (ll);
680
681   /* Make sure cct is valid after sct alloc */
682   cct = ct_connection_get (cct_index, thread_index);
683
684   sct->c_rmt_port = 0;
685   sct->c_lcl_port = ll_ct->lcl_port;
686   sct->c_is_ip4 = cct->c_is_ip4;
687   clib_memcpy (&sct->c_lcl_ip, &cct->c_rmt_ip, sizeof (cct->c_rmt_ip));
688   sct->client_wrk = cct->client_wrk;
689   sct->c_proto = TRANSPORT_PROTO_NONE;
690   sct->client_opaque = cct->client_opaque;
691   sct->actual_tp = cct->actual_tp;
692
693   sct->peer_index = cct->c_c_index;
694   cct->peer_index = sct->c_c_index;
695
696   /*
697    * Accept server session. Client session is created only after
698    * server confirms accept.
699    */
700   ss = session_alloc (thread_index);
701   ll = listen_session_get (ll_index);
702   ss->session_type = session_type_from_proto_and_ip (TRANSPORT_PROTO_NONE,
703                                                      sct->c_is_ip4);
704   ss->connection_index = sct->c_c_index;
705   ss->listener_handle = listen_session_get_handle (ll);
706   ss->session_state = SESSION_STATE_CREATED;
707
708   server_wrk = application_listener_select_worker (ll);
709   ss->app_wrk_index = server_wrk->wrk_index;
710
711   sct->c_s_index = ss->session_index;
712   sct->server_wrk = ss->app_wrk_index;
713
714   if (ct_init_accepted_session (server_wrk, sct, ss, ll))
715     {
716       ct_session_connect_notify (ss, SESSION_E_ALLOC);
717       ct_connection_free (sct);
718       session_free (ss);
719       return;
720     }
721
722   cct->server_wrk = sct->server_wrk;
723   cct->seg_ctx_index = sct->seg_ctx_index;
724   cct->ct_seg_index = sct->ct_seg_index;
725   cct->client_rx_fifo = ss->tx_fifo;
726   cct->client_tx_fifo = ss->rx_fifo;
727   cct->client_rx_fifo->refcnt++;
728   cct->client_tx_fifo->refcnt++;
729   cct->segment_handle = sct->segment_handle;
730
731   ss->session_state = SESSION_STATE_ACCEPTING;
732   if (app_worker_accept_notify (server_wrk, ss))
733     {
734       ct_session_connect_notify (ss, SESSION_E_REFUSED);
735       ct_session_dealloc_fifos (sct, ss->rx_fifo, ss->tx_fifo);
736       ct_connection_free (sct);
737       session_free (ss);
738     }
739 }
740
741 static int
742 ct_connect (app_worker_t * client_wrk, session_t * ll,
743             session_endpoint_cfg_t * sep)
744 {
745   u32 thread_index, ho_index;
746   ct_main_t *cm = &ct_main;
747   ct_connection_t *ho;
748
749   /* Simple round-robin policy for spreading sessions over workers. We skip
750    * thread index 0, i.e., offset the index by 1, when we have workers as it
751    * is the one dedicated to main thread. Note that n_workers does not include
752    * main thread */
753   cm->n_sessions += 1;
754   thread_index = cm->n_workers ? (cm->n_sessions % cm->n_workers) + 1 : 0;
755
756   /*
757    * Alloc and init client half-open transport
758    */
759
760   ho = ct_half_open_alloc ();
761   ho_index = ho->c_c_index;
762   ho->c_rmt_port = sep->port;
763   ho->c_lcl_port = 0;
764   ho->c_is_ip4 = sep->is_ip4;
765   ho->client_opaque = sep->opaque;
766   ho->client_wrk = client_wrk->wrk_index;
767   ho->peer_index = ll->session_index;
768   ho->c_proto = TRANSPORT_PROTO_NONE;
769   ho->c_flags |= TRANSPORT_CONNECTION_F_NO_LOOKUP;
770   clib_memcpy (&ho->c_rmt_ip, &sep->ip, sizeof (sep->ip));
771   ho->flags |= CT_CONN_F_CLIENT;
772   ho->c_s_index = ~0;
773   ho->actual_tp = sep->original_tp;
774
775   /*
776    * Accept connection on thread selected above. Connected reply comes
777    * after server accepts the connection.
778    */
779
780   session_send_rpc_evt_to_thread_force (thread_index,
781                                         ct_accept_rpc_wrk_handler,
782                                         uword_to_pointer (ho_index, void *));
783
784   return ho_index;
785 }
786
787 static u32
788 ct_start_listen (u32 app_listener_index, transport_endpoint_t * tep)
789 {
790   session_endpoint_cfg_t *sep;
791   ct_connection_t *ct;
792
793   sep = (session_endpoint_cfg_t *) tep;
794   ct = ct_connection_alloc (0);
795   ct->server_wrk = sep->app_wrk_index;
796   ct->c_is_ip4 = sep->is_ip4;
797   clib_memcpy (&ct->c_lcl_ip, &sep->ip, sizeof (sep->ip));
798   ct->c_lcl_port = sep->port;
799   ct->c_s_index = app_listener_index;
800   ct->actual_tp = sep->transport_proto;
801   return ct->c_c_index;
802 }
803
804 static u32
805 ct_stop_listen (u32 ct_index)
806 {
807   ct_connection_t *ct;
808   ct = ct_connection_get (ct_index, 0);
809   ct_connection_free (ct);
810   return 0;
811 }
812
813 static transport_connection_t *
814 ct_listener_get (u32 ct_index)
815 {
816   return (transport_connection_t *) ct_connection_get (ct_index, 0);
817 }
818
819 static transport_connection_t *
820 ct_half_open_get (u32 ct_index)
821 {
822   return (transport_connection_t *) ct_connection_get (ct_index, 0);
823 }
824
825 static void
826 ct_session_cleanup (u32 conn_index, u32 thread_index)
827 {
828   ct_connection_t *ct, *peer_ct;
829
830   ct = ct_connection_get (conn_index, thread_index);
831   if (!ct)
832     return;
833
834   peer_ct = ct_connection_get (ct->peer_index, thread_index);
835   if (peer_ct)
836     peer_ct->peer_index = ~0;
837
838   ct_connection_free (ct);
839 }
840
841 static void
842 ct_cleanup_ho (u32 ho_index)
843 {
844   ct_connection_free (ct_connection_get (ho_index, 0));
845 }
846
847 static int
848 ct_session_connect (transport_endpoint_cfg_t * tep)
849 {
850   session_endpoint_cfg_t *sep_ext;
851   session_endpoint_t _sep, *sep = &_sep;
852   app_worker_t *app_wrk;
853   session_handle_t lh;
854   application_t *app;
855   app_listener_t *al;
856   u32 table_index;
857   session_t *ll;
858   u8 fib_proto;
859
860   sep_ext = (session_endpoint_cfg_t *) tep;
861   _sep = *(session_endpoint_t *) tep;
862   app_wrk = app_worker_get (sep_ext->app_wrk_index);
863   app = application_get (app_wrk->app_index);
864
865   sep->transport_proto = sep_ext->original_tp;
866   table_index = application_local_session_table (app);
867   lh = session_lookup_local_endpoint (table_index, sep);
868   if (lh == SESSION_DROP_HANDLE)
869     return SESSION_E_FILTERED;
870
871   if (lh == SESSION_INVALID_HANDLE)
872     goto global_scope;
873
874   ll = listen_session_get_from_handle (lh);
875   al = app_listener_get_w_session (ll);
876
877   /*
878    * Break loop if rule in local table points to connecting app. This
879    * can happen if client is a generic proxy. Route connect through
880    * global table instead.
881    */
882   if (al->app_index == app->app_index)
883     goto global_scope;
884
885   return ct_connect (app_wrk, ll, sep_ext);
886
887   /*
888    * If nothing found, check the global scope for locally attached
889    * destinations. Make sure first that we're allowed to.
890    */
891
892 global_scope:
893   if (session_endpoint_is_local (sep))
894     return SESSION_E_NOROUTE;
895
896   if (!application_has_global_scope (app))
897     return SESSION_E_SCOPE;
898
899   fib_proto = session_endpoint_fib_proto (sep);
900   table_index = session_lookup_get_index_for_fib (fib_proto, sep->fib_index);
901   ll = session_lookup_listener_wildcard (table_index, sep);
902
903   /* Avoid connecting app to own listener */
904   if (ll && ll->app_index != app->app_index)
905     return ct_connect (app_wrk, ll, sep_ext);
906
907   /* Failed to connect but no error */
908   return SESSION_E_LOCAL_CONNECT;
909 }
910
911 static void
912 ct_session_postponed_cleanup (ct_connection_t *ct)
913 {
914   app_worker_t *app_wrk;
915   session_t *s;
916
917   s = session_get (ct->c_s_index, ct->c_thread_index);
918   app_wrk = app_worker_get_if_valid (s->app_wrk_index);
919
920   if (ct->flags & CT_CONN_F_CLIENT)
921     {
922       if (app_wrk)
923         app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_TRANSPORT);
924
925       /* Normal free for client session as the fifos are allocated through
926        * the connects segment manager in a segment that's not shared with
927        * the server */
928       ct_session_dealloc_fifos (ct, ct->client_rx_fifo, ct->client_tx_fifo);
929       session_free_w_fifos (s);
930     }
931   else
932     {
933       /* Manual session and fifo segment cleanup to avoid implicit
934        * segment manager cleanups and notifications */
935       app_wrk = app_worker_get_if_valid (s->app_wrk_index);
936       if (app_wrk)
937         {
938           app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_TRANSPORT);
939           app_worker_cleanup_notify (app_wrk, s, SESSION_CLEANUP_SESSION);
940         }
941
942       ct_session_dealloc_fifos (ct, s->rx_fifo, s->tx_fifo);
943       session_free (s);
944     }
945
946   ct_connection_free (ct);
947 }
948
949 static void
950 ct_handle_cleanups (void *args)
951 {
952   uword thread_index = pointer_to_uword (args);
953   const u32 max_cleanups = 100;
954   ct_cleanup_req_t *req;
955   ct_connection_t *ct;
956   u32 n_to_handle = 0;
957   ct_worker_t *wrk;
958   session_t *s;
959
960   wrk = ct_worker_get (thread_index);
961   wrk->have_cleanups = 0;
962   n_to_handle = clib_fifo_elts (wrk->pending_cleanups);
963   n_to_handle = clib_min (n_to_handle, max_cleanups);
964
965   while (n_to_handle)
966     {
967       clib_fifo_sub2 (wrk->pending_cleanups, req);
968       ct = ct_connection_get (req->ct_index, thread_index);
969       s = session_get (ct->c_s_index, ct->c_thread_index);
970       if (!svm_fifo_has_event (s->tx_fifo))
971         ct_session_postponed_cleanup (ct);
972       else
973         clib_fifo_add1 (wrk->pending_cleanups, *req);
974       n_to_handle -= 1;
975     }
976
977   if (clib_fifo_elts (wrk->pending_cleanups))
978     {
979       wrk->have_cleanups = 1;
980       session_send_rpc_evt_to_thread_force (
981         thread_index, ct_handle_cleanups,
982         uword_to_pointer (thread_index, void *));
983     }
984 }
985
986 static void
987 ct_program_cleanup (ct_connection_t *ct)
988 {
989   ct_cleanup_req_t *req;
990   uword thread_index;
991   ct_worker_t *wrk;
992
993   thread_index = ct->c_thread_index;
994   wrk = ct_worker_get (ct->c_thread_index);
995
996   clib_fifo_add2 (wrk->pending_cleanups, req);
997   req->ct_index = ct->c_c_index;
998
999   if (wrk->have_cleanups)
1000     return;
1001
1002   wrk->have_cleanups = 1;
1003   session_send_rpc_evt_to_thread_force (
1004     thread_index, ct_handle_cleanups, uword_to_pointer (thread_index, void *));
1005 }
1006
1007 static inline int
1008 ct_close_is_reset (ct_connection_t *ct, session_t *s)
1009 {
1010   if (ct->flags & CT_CONN_F_CLIENT)
1011     return (svm_fifo_max_dequeue (ct->client_rx_fifo) > 0);
1012   else
1013     return (svm_fifo_max_dequeue (s->rx_fifo) > 0);
1014 }
1015
1016 static void
1017 ct_session_close (u32 ct_index, u32 thread_index)
1018 {
1019   ct_connection_t *ct, *peer_ct;
1020   session_t *s;
1021
1022   ct = ct_connection_get (ct_index, thread_index);
1023   s = session_get (ct->c_s_index, ct->c_thread_index);
1024   peer_ct = ct_connection_get (ct->peer_index, thread_index);
1025   if (peer_ct)
1026     {
1027       peer_ct->peer_index = ~0;
1028       /* Make sure session was allocated */
1029       if (peer_ct->flags & CT_CONN_F_HALF_OPEN)
1030         {
1031           ct_session_connect_notify (s, SESSION_E_REFUSED);
1032         }
1033       else if (peer_ct->c_s_index != ~0)
1034         {
1035           if (ct_close_is_reset (ct, s))
1036             session_transport_reset_notify (&peer_ct->connection);
1037           else
1038             session_transport_closing_notify (&peer_ct->connection);
1039         }
1040       else
1041         {
1042           /* should not happen */
1043           clib_warning ("ct peer without session");
1044           ct_connection_free (peer_ct);
1045         }
1046     }
1047
1048   /* Do not send closed notify to make sure pending tx events are
1049    * still delivered and program cleanup */
1050   ct_program_cleanup (ct);
1051 }
1052
1053 static transport_connection_t *
1054 ct_session_get (u32 ct_index, u32 thread_index)
1055 {
1056   return (transport_connection_t *) ct_connection_get (ct_index,
1057                                                        thread_index);
1058 }
1059
1060 static u8 *
1061 format_ct_connection_id (u8 * s, va_list * args)
1062 {
1063   ct_connection_t *ct = va_arg (*args, ct_connection_t *);
1064   if (!ct)
1065     return s;
1066   if (ct->c_is_ip4)
1067     {
1068       s = format (s, "[%d:%d][CT:%U] %U:%d->%U:%d", ct->c_thread_index,
1069                   ct->c_s_index, format_transport_proto_short, ct->actual_tp,
1070                   format_ip4_address, &ct->c_lcl_ip4,
1071                   clib_net_to_host_u16 (ct->c_lcl_port), format_ip4_address,
1072                   &ct->c_rmt_ip4, clib_net_to_host_u16 (ct->c_rmt_port));
1073     }
1074   else
1075     {
1076       s = format (s, "[%d:%d][CT:%U] %U:%d->%U:%d", ct->c_thread_index,
1077                   ct->c_s_index, format_transport_proto_short, ct->actual_tp,
1078                   format_ip6_address, &ct->c_lcl_ip6,
1079                   clib_net_to_host_u16 (ct->c_lcl_port), format_ip6_address,
1080                   &ct->c_rmt_ip6, clib_net_to_host_u16 (ct->c_rmt_port));
1081     }
1082
1083   return s;
1084 }
1085
1086 static int
1087 ct_custom_tx (void *session, transport_send_params_t * sp)
1088 {
1089   session_t *s = (session_t *) session;
1090   if (session_has_transport (s))
1091     return 0;
1092   /* If event enqueued towards peer, remove from scheduler and remove
1093    * session tx flag, i.e., accept new tx events. Unset fifo flag now to
1094    * avoid missing events if peer did not clear fifo flag yet, which is
1095    * interpreted as successful notification and session is descheduled. */
1096   svm_fifo_unset_event (s->tx_fifo);
1097   if (!ct_session_tx (s))
1098     sp->flags = TRANSPORT_SND_F_DESCHED;
1099
1100   /* The scheduler uses packet count as a means of upper bounding the amount
1101    * of work done per dispatch. So make it look like we have sent something */
1102   return 1;
1103 }
1104
1105 static int
1106 ct_app_rx_evt (transport_connection_t * tc)
1107 {
1108   ct_connection_t *ct = (ct_connection_t *) tc, *peer_ct;
1109   session_t *ps;
1110
1111   peer_ct = ct_connection_get (ct->peer_index, tc->thread_index);
1112   if (!peer_ct)
1113     return -1;
1114   ps = session_get (peer_ct->c_s_index, peer_ct->c_thread_index);
1115   return session_dequeue_notify (ps);
1116 }
1117
1118 static u8 *
1119 format_ct_listener (u8 * s, va_list * args)
1120 {
1121   u32 tc_index = va_arg (*args, u32);
1122   u32 __clib_unused thread_index = va_arg (*args, u32);
1123   u32 __clib_unused verbose = va_arg (*args, u32);
1124   ct_connection_t *ct = ct_connection_get (tc_index, 0);
1125   s = format (s, "%-" SESSION_CLI_ID_LEN "U", format_ct_connection_id, ct);
1126   if (verbose)
1127     s = format (s, "%-" SESSION_CLI_STATE_LEN "s", "LISTEN");
1128   return s;
1129 }
1130
1131 static u8 *
1132 format_ct_half_open (u8 *s, va_list *args)
1133 {
1134   u32 ho_index = va_arg (*args, u32);
1135   u32 verbose = va_arg (*args, u32);
1136   ct_connection_t *ct = ct_connection_get (ho_index, 0);
1137   s = format (s, "%-" SESSION_CLI_ID_LEN "U", format_ct_connection_id, ct);
1138   if (verbose)
1139     s = format (s, "%-" SESSION_CLI_STATE_LEN "s", "HALF-OPEN");
1140   return s;
1141 }
1142
1143 static u8 *
1144 format_ct_connection (u8 * s, va_list * args)
1145 {
1146   ct_connection_t *ct = va_arg (*args, ct_connection_t *);
1147   u32 verbose = va_arg (*args, u32);
1148
1149   if (!ct)
1150     return s;
1151   s = format (s, "%-" SESSION_CLI_ID_LEN "U", format_ct_connection_id, ct);
1152   if (verbose)
1153     {
1154       s = format (s, "%-" SESSION_CLI_STATE_LEN "s", "ESTABLISHED");
1155       if (verbose > 1)
1156         {
1157           s = format (s, "\n");
1158         }
1159     }
1160   return s;
1161 }
1162
1163 static u8 *
1164 format_ct_session (u8 * s, va_list * args)
1165 {
1166   u32 ct_index = va_arg (*args, u32);
1167   u32 thread_index = va_arg (*args, u32);
1168   u32 verbose = va_arg (*args, u32);
1169   ct_connection_t *ct;
1170
1171   ct = ct_connection_get (ct_index, thread_index);
1172   if (!ct)
1173     {
1174       s = format (s, "empty\n");
1175       return s;
1176     }
1177
1178   s = format (s, "%U", format_ct_connection, ct, verbose);
1179   return s;
1180 }
1181
1182 clib_error_t *
1183 ct_enable_disable (vlib_main_t * vm, u8 is_en)
1184 {
1185   vlib_thread_main_t *vtm = &vlib_thread_main;
1186   ct_main_t *cm = &ct_main;
1187
1188   cm->n_workers = vlib_num_workers ();
1189   vec_validate (cm->wrk, vtm->n_vlib_mains);
1190   clib_spinlock_init (&cm->ho_reuseable_lock);
1191   clib_rwlock_init (&cm->app_segs_lock);
1192   return 0;
1193 }
1194
1195 /* *INDENT-OFF* */
1196 static const transport_proto_vft_t cut_thru_proto = {
1197   .enable = ct_enable_disable,
1198   .start_listen = ct_start_listen,
1199   .stop_listen = ct_stop_listen,
1200   .get_connection = ct_session_get,
1201   .get_listener = ct_listener_get,
1202   .get_half_open = ct_half_open_get,
1203   .cleanup = ct_session_cleanup,
1204   .cleanup_ho = ct_cleanup_ho,
1205   .connect = ct_session_connect,
1206   .close = ct_session_close,
1207   .custom_tx = ct_custom_tx,
1208   .app_rx_evt = ct_app_rx_evt,
1209   .format_listener = format_ct_listener,
1210   .format_half_open = format_ct_half_open,
1211   .format_connection = format_ct_session,
1212   .transport_options = {
1213     .name = "ct",
1214     .short_name = "C",
1215     .tx_type = TRANSPORT_TX_INTERNAL,
1216     .service_type = TRANSPORT_SERVICE_VC,
1217   },
1218 };
1219 /* *INDENT-ON* */
1220
1221 int
1222 ct_session_tx (session_t * s)
1223 {
1224   ct_connection_t *ct, *peer_ct;
1225   session_t *peer_s;
1226
1227   ct = (ct_connection_t *) session_get_transport (s);
1228   peer_ct = ct_connection_get (ct->peer_index, ct->c_thread_index);
1229   if (!peer_ct)
1230     return 0;
1231   peer_s = session_get (peer_ct->c_s_index, peer_ct->c_thread_index);
1232   if (peer_s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
1233     return 0;
1234   return session_enqueue_notify (peer_s);
1235 }
1236
1237 static clib_error_t *
1238 ct_transport_init (vlib_main_t * vm)
1239 {
1240   transport_register_protocol (TRANSPORT_PROTO_NONE, &cut_thru_proto,
1241                                FIB_PROTOCOL_IP4, ~0);
1242   transport_register_protocol (TRANSPORT_PROTO_NONE, &cut_thru_proto,
1243                                FIB_PROTOCOL_IP6, ~0);
1244   return 0;
1245 }
1246
1247 VLIB_INIT_FUNCTION (ct_transport_init);
1248
1249 /*
1250  * fd.io coding-style-patch-verification: ON
1251  *
1252  * Local Variables:
1253  * eval: (c-set-style "gnu")
1254  * End:
1255  */