vcl: support inter worker rpc
[vpp.git] / src / vnet / session / session_node.c
index 0f5030b..763b789 100644 (file)
@@ -49,9 +49,7 @@ session_mq_listen_handler (void *data)
 
   clib_memset (a, 0, sizeof (*a));
   a->sep.is_ip4 = mp->is_ip4;
-  clib_memcpy_fast (&a->sep.ip, &mp->ip, sizeof (mp->ip));
-  if (mp->is_ip4)
-    ip46_address_mask_ip4 (&a->sep.ip);
+  ip_copy (&a->sep.ip, &mp->ip, mp->is_ip4);
   a->sep.port = mp->port;
   a->sep.fib_index = mp->vrf;
   a->sep.sw_if_index = ENDPOINT_INVALID_INDEX;
@@ -60,6 +58,7 @@ session_mq_listen_handler (void *data)
   a->sep_ext.crypto_engine = mp->crypto_engine;
   a->app_index = app->app_index;
   a->wrk_map_index = mp->wrk_index;
+  a->sep_ext.transport_flags = mp->flags;
 
   if ((rv = vnet_listen (a)))
     clib_warning ("listen returned: %d", rv);
@@ -491,6 +490,33 @@ session_mq_worker_update_handler (void *data)
     app_worker_close_notify (app_wrk, s);
 }
 
+static void
+session_mq_app_wrk_rpc_handler (void *data)
+{
+  session_app_wrk_rpc_msg_t *mp = (session_app_wrk_rpc_msg_t *) data;
+  svm_msg_q_msg_t _msg, *msg = &_msg;
+  session_app_wrk_rpc_msg_t *rmp;
+  app_worker_t *app_wrk;
+  session_event_t *evt;
+  application_t *app;
+
+  app = application_lookup (mp->client_index);
+  if (!app)
+    return;
+
+  app_wrk = application_get_worker (app, mp->wrk_index);
+
+  svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
+                                      SESSION_MQ_CTRL_EVT_RING, SVM_Q_WAIT,
+                                      msg);
+  evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
+  clib_memset (evt, 0, sizeof (*evt));
+  evt->event_type = SESSION_CTRL_EVT_APP_WRK_RPC;
+  rmp = (session_app_wrk_rpc_msg_t *) evt->data;
+  clib_memcpy (rmp->data, mp->data, sizeof (mp->data));
+  svm_msg_q_add_and_unlock (app_wrk->event_queue, msg);
+}
+
 vlib_node_registration_t session_queue_node;
 
 typedef struct
@@ -611,7 +637,13 @@ session_tx_fifo_chain_tail (vlib_main_t * vm, session_tx_context_t * ctx,
                {
                  u32 offset = hdr->data_length + SESSION_CONN_HDR_LEN;
                  svm_fifo_dequeue_drop (f, offset);
+                 if (ctx->left_to_snd > n_bytes_read)
+                   svm_fifo_peek (ctx->s->tx_fifo, 0, sizeof (ctx->hdr),
+                                  (u8 *) & ctx->hdr);
                }
+             else if (ctx->left_to_snd == n_bytes_read)
+               svm_fifo_overwrite_head (ctx->s->tx_fifo, (u8 *) & ctx->hdr,
+                                        sizeof (session_dgram_pre_hdr_t));
            }
          else
            n_bytes_read = svm_fifo_dequeue (ctx->s->tx_fifo,
@@ -690,7 +722,13 @@ session_tx_fill_buffer (vlib_main_t * vm, session_tx_context_t * ctx,
            {
              offset = hdr->data_length + SESSION_CONN_HDR_LEN;
              svm_fifo_dequeue_drop (f, offset);
+             if (ctx->left_to_snd > n_bytes_read)
+               svm_fifo_peek (ctx->s->tx_fifo, 0, sizeof (ctx->hdr),
+                              (u8 *) & ctx->hdr);
            }
+         else if (ctx->left_to_snd == n_bytes_read)
+           svm_fifo_overwrite_head (ctx->s->tx_fifo, (u8 *) & ctx->hdr,
+                                    sizeof (session_dgram_pre_hdr_t));
        }
       else
        {
@@ -758,7 +796,10 @@ session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx,
                               u32 max_segs, u8 peek_data)
 {
   u32 n_bytes_per_buf, n_bytes_per_seg;
+
+  n_bytes_per_buf = vlib_buffer_get_default_data_size (vm);
   ctx->max_dequeue = svm_fifo_max_dequeue_cons (ctx->s->tx_fifo);
+
   if (peek_data)
     {
       /* Offset in rx fifo from where to peek data */
@@ -773,15 +814,49 @@ session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx,
     {
       if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM)
        {
+         u32 len, chain_limit;
+
          if (ctx->max_dequeue <= sizeof (ctx->hdr))
            {
              ctx->max_len_to_snd = 0;
              return;
            }
+
          svm_fifo_peek (ctx->s->tx_fifo, 0, sizeof (ctx->hdr),
                         (u8 *) & ctx->hdr);
          ASSERT (ctx->hdr.data_length > ctx->hdr.data_offset);
-         ctx->max_dequeue = ctx->hdr.data_length - ctx->hdr.data_offset;
+         len = ctx->hdr.data_length - ctx->hdr.data_offset;
+
+         /* Process multiple dgrams if smaller than min (buf_space, mss).
+          * This avoids handling multiple dgrams if they require buffer
+          * chains */
+         chain_limit = clib_min (n_bytes_per_buf - TRANSPORT_MAX_HDRS_LEN,
+                                 ctx->sp.snd_mss);
+         if (ctx->hdr.data_length <= chain_limit)
+           {
+             u32 first_dgram_len, dgram_len, offset, max_offset;
+             session_dgram_hdr_t hdr;
+
+             ctx->sp.snd_mss = clib_min (ctx->sp.snd_mss, len);
+             offset = ctx->hdr.data_length + sizeof (session_dgram_hdr_t);
+             first_dgram_len = len;
+             max_offset = clib_min (ctx->max_dequeue, 16 << 10);
+
+             while (offset < max_offset)
+               {
+                 svm_fifo_peek (ctx->s->tx_fifo, offset, sizeof (ctx->hdr),
+                                (u8 *) & hdr);
+                 ASSERT (hdr.data_length > hdr.data_offset);
+                 dgram_len = hdr.data_length - hdr.data_offset;
+                 if (len + dgram_len > ctx->max_dequeue
+                     || first_dgram_len != dgram_len)
+                   break;
+                 len += dgram_len;
+                 offset += sizeof (hdr) + hdr.data_length;
+               }
+           }
+
+         ctx->max_dequeue = len;
        }
     }
   ASSERT (ctx->max_dequeue > 0);
@@ -809,7 +884,6 @@ session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx,
       ctx->max_len_to_snd = max_segs * ctx->sp.snd_mss;
     }
 
-  n_bytes_per_buf = vlib_buffer_get_default_data_size (vm);
   ASSERT (n_bytes_per_buf > TRANSPORT_MAX_HDRS_LEN);
   if (ctx->n_segs_per_evt > 1)
     {
@@ -959,6 +1033,8 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
       return SESSION_TX_NO_BUFFERS;
     }
 
+  transport_connection_update_tx_bytes (ctx->tc, ctx->max_len_to_snd);
+
   ctx->left_to_snd = ctx->max_len_to_snd;
   n_left = ctx->n_segs_per_evt;
 
@@ -1032,7 +1108,6 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
     vlib_buffer_free (vm, wrk->tx_buffers, n_bufs);
 
   *n_tx_packets += ctx->n_segs_per_evt;
-  transport_connection_update_tx_bytes (ctx->tc, ctx->max_len_to_snd);
 
   SESSION_EVT (SESSION_EVT_DEQ, ctx->s, ctx->max_len_to_snd, ctx->max_dequeue,
               ctx->s->tx_fifo->has_event, wrk->last_vlib_time);
@@ -1048,11 +1123,6 @@ session_tx_fifo_read_and_snd_i (session_worker_t * wrk,
 
   if (!peek_data)
     {
-      /* Fix dgram pre header */
-      if (ctx->transport_vft->transport_options.tx_type == TRANSPORT_TX_DGRAM
-         && ctx->max_len_to_snd < ctx->max_dequeue)
-       svm_fifo_overwrite_head (ctx->s->tx_fifo, (u8 *) & ctx->hdr,
-                                sizeof (session_dgram_pre_hdr_t));
       if (svm_fifo_needs_deq_ntf (ctx->s->tx_fifo, ctx->max_len_to_snd))
        session_dequeue_notify (ctx->s);
     }
@@ -1183,6 +1253,9 @@ session_event_dispatch_ctrl (session_worker_t * wrk, session_evt_elt_t * elt)
     case SESSION_CTRL_EVT_APP_DETACH:
       app_mq_detach_handler (session_evt_ctrl_data (wrk, elt));
       break;
+    case SESSION_CTRL_EVT_APP_WRK_RPC:
+      session_mq_app_wrk_rpc_handler (session_evt_ctrl_data (wrk, elt));
+      break;
     default:
       clib_warning ("unhandled event type %d", e->event_type);
     }