vcl/session: add api for changing session app worker
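
Within session_node.c, this change adds a handler for
SESSION_CTRL_EVT_WORKER_UPDATE control events: the session is handed over
to the app worker requested by its owner, the new owner gets a reply
carrying the session's rx/tx fifos and segment handle, and io/disconnect
events that may have been lost during the switch are resent. The
accepted/reset reply handlers are also hardened against sessions that
closed while waiting for the app, postponed disconnects now wait a bounded
number of dispatch cycles for the tx fifo to drain, and tx flush events
are dispatched like regular tx events after the transport is asked to
flush pending data.

For illustration only, a minimal sketch of how an app-side worker could
build such a request with the same message-queue primitives used on the
vpp side below. The helper name and its vpp_mq parameter are assumptions
for the example, not part of this patch; the real client-side plumbing
lives in the VCL/application layer and headers are omitted here.

/* Hypothetical app-side helper: ask vpp to move a session to another
 * worker of the same app. The message fields mirror those consumed by
 * session_mq_worker_update_handler. */
static void
app_request_session_worker_update (svm_msg_q_t * vpp_mq, u32 client_index,
				   u64 session_handle, u32 req_wrk_index,
				   u32 new_wrk_index)
{
  svm_msg_q_msg_t _msg, *msg = &_msg;
  session_worker_update_msg_t *mp;
  session_event_t *evt;

  /* Grab a message from the control event ring and fill it in */
  svm_msg_q_lock_and_alloc_msg_w_ring (vpp_mq, SESSION_MQ_CTRL_EVT_RING,
				       SVM_Q_WAIT, msg);
  svm_msg_q_unlock (vpp_mq);
  evt = svm_msg_q_msg_data (vpp_mq, msg);
  clib_memset (evt, 0, sizeof (*evt));
  evt->event_type = SESSION_CTRL_EVT_WORKER_UPDATE;
  mp = (session_worker_update_msg_t *) evt->data;
  mp->client_index = client_index;
  mp->handle = session_handle;
  mp->req_wrk_index = req_wrk_index;	/* wrk map index of the requester */
  mp->wrk_index = new_wrk_index;	/* worker that should take the session */
  svm_msg_q_add (vpp_mq, msg, SVM_Q_WAIT);
}
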
[vpp.git] / src / vnet / session / session_node.c
index 1457668..880f163 100644
@@ -29,6 +29,7 @@ session_mq_accepted_reply_handler (void *data)
 {
   session_accepted_reply_msg_t *mp = (session_accepted_reply_msg_t *) data;
   vnet_disconnect_args_t _a = { 0 }, *a = &_a;
+  stream_session_state_t old_state;
   app_worker_t *app_wrk;
   local_session_t *ls;
   stream_session_t *s;
@@ -65,19 +66,28 @@ session_mq_accepted_reply_handler (void *data)
     {
       s = session_get_from_handle_if_valid (mp->handle);
       if (!s)
-       {
-         clib_warning ("session doesn't exist");
-         return;
-       }
+       return;
+
       app_wrk = app_worker_get (s->app_wrk_index);
       if (app_wrk->app_index != mp->context)
        {
          clib_warning ("app doesn't own session");
          return;
        }
+
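+      /* Remember the state prior to the app's reply so a close that
+       * raced the reply can be detected below */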
+      old_state = s->session_state;
       s->session_state = SESSION_STATE_READY;
       if (!svm_fifo_is_empty (s->server_rx_fifo))
        app_worker_lock_and_send_event (app_wrk, s, FIFO_EVENT_APP_RX);
+
+      /* Closed while waiting for app to reply. Resend disconnect */
+      if (old_state >= SESSION_STATE_TRANSPORT_CLOSING)
+       {
+         application_t *app = application_get (app_wrk->app_index);
+         app->cb_fns.session_disconnect_callback (s);
+         s->session_state = old_state;
+         return;
+       }
     }
 }
 
@@ -92,17 +102,17 @@ session_mq_reset_reply_handler (void *data)
   u32 index, thread_index;
 
   mp = (session_reset_reply_msg_t *) data;
-  app = application_lookup (mp->client_index);
+  app = application_lookup (mp->context);
   if (!app)
     return;
 
   session_parse_handle (mp->handle, &index, &thread_index);
   s = session_get_if_valid (index, thread_index);
-  if (!s)
-    {
-      SESSION_DBG ("Invalid session!");
-      return;
-    }
+
+  /* Session was already closed or cleaned up */
+  if (!s || s->session_state != SESSION_STATE_TRANSPORT_CLOSING)
+    return;
+
   app_wrk = app_worker_get (s->app_wrk_index);
   if (!app_wrk || app_wrk->app_index != app->app_index)
     {
@@ -163,7 +173,7 @@ session_mq_disconnected_handler (void *data)
   svm_msg_q_unlock (app_wrk->event_queue);
   evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
   clib_memset (evt, 0, sizeof (*evt));
-  evt->event_type = SESSION_CTRL_EVT_DISCONNECTED;
+  evt->event_type = SESSION_CTRL_EVT_DISCONNECTED_REPLY;
   rmp = (session_disconnected_reply_msg_t *) evt->data;
   rmp->handle = mp->handle;
   rmp->context = mp->context;
@@ -197,6 +207,86 @@ session_mq_disconnected_reply_handler (void *data)
     }
 }
 
+static void
+session_mq_worker_update_handler (void *data)
+{
+  session_worker_update_msg_t *mp = (session_worker_update_msg_t *) data;
+  session_worker_update_reply_msg_t *rmp;
+  svm_msg_q_msg_t _msg, *msg = &_msg;
+  app_worker_t *app_wrk;
+  u32 owner_app_wrk_map;
+  session_event_t *evt;
+  stream_session_t *s;
+  application_t *app;
+
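+  /* Validate the requesting app and make sure it owns the session */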
+  app = application_lookup (mp->client_index);
+  if (!app)
+    return;
+  if (!(s = session_get_from_handle_if_valid (mp->handle)))
+    {
+      clib_warning ("invalid handle %llu", mp->handle);
+      return;
+    }
+  app_wrk = app_worker_get (s->app_wrk_index);
+  if (app_wrk->app_index != app->app_index)
+    {
+      clib_warning ("app %u does not own session %llu", app->app_index,
+                   mp->handle);
+      return;
+    }
+  owner_app_wrk_map = app_wrk->wrk_map_index;
+  app_wrk = application_get_worker (app, mp->wrk_index);
+
+  /* The update must come from the new owner. If the current owner sent
+   * it, ask the new worker to request the update for itself */
+  if (mp->req_wrk_index == owner_app_wrk_map)
+    {
+      session_req_worker_update_msg_t *wump;
+
+      svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
+                                          SESSION_MQ_CTRL_EVT_RING,
+                                          SVM_Q_WAIT, msg);
+      svm_msg_q_unlock (app_wrk->event_queue);
+      evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
+      clib_memset (evt, 0, sizeof (*evt));
+      evt->event_type = SESSION_CTRL_EVT_REQ_WORKER_UPDATE;
+      wump = (session_req_worker_update_msg_t *) evt->data;
+      wump->session_handle = mp->handle;
+      svm_msg_q_add (app_wrk->event_queue, msg, SVM_Q_WAIT);
+      return;
+    }
+
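+  /* Hand the session over to the requested worker */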
+  app_worker_own_session (app_wrk, s);
+
+  /*
+   * Send reply
+   */
+  svm_msg_q_lock_and_alloc_msg_w_ring (app_wrk->event_queue,
+                                      SESSION_MQ_CTRL_EVT_RING,
+                                      SVM_Q_WAIT, msg);
+  svm_msg_q_unlock (app_wrk->event_queue);
+  evt = svm_msg_q_msg_data (app_wrk->event_queue, msg);
+  clib_memset (evt, 0, sizeof (*evt));
+  evt->event_type = SESSION_CTRL_EVT_WORKER_UPDATE_REPLY;
+  rmp = (session_worker_update_reply_msg_t *) evt->data;
+  rmp->handle = mp->handle;
+  rmp->rx_fifo = pointer_to_uword (s->server_rx_fifo);
+  rmp->tx_fifo = pointer_to_uword (s->server_tx_fifo);
+  rmp->segment_handle = session_segment_handle (s);
+  svm_msg_q_add (app_wrk->event_queue, msg, SVM_Q_WAIT);
+
+  /*
+   * Resend any events that may have been lost while the session was
+   * being moved to the new worker
+   */
+  if (!svm_fifo_is_empty (s->server_tx_fifo))
+    session_send_io_evt_to_thread (s->server_tx_fifo, FIFO_EVENT_APP_TX);
+
+  if (!svm_fifo_is_empty (s->server_rx_fifo))
+    app_worker_lock_and_send_event (app_wrk, s, FIFO_EVENT_APP_RX);
+
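+  /* If the session started closing while ownership was changing, make
+   * sure the new worker is notified */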
+  if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSING)
+    app->cb_fns.session_disconnect_callback (s);
+}
+
 vlib_node_registration_t session_queue_node;
 
 typedef struct
@@ -448,7 +538,7 @@ session_tx_not_ready (stream_session_t * s, u8 peek_data)
        * session is not ready or closed */
       if (s->session_state < SESSION_STATE_READY)
        return 1;
-      if (s->session_state == SESSION_STATE_CLOSED)
+      if (s->session_state >= SESSION_STATE_TRANSPORT_CLOSED)
        return 2;
     }
   return 0;
@@ -531,8 +621,7 @@ session_tx_set_dequeue_params (vlib_main_t * vm, session_tx_context_t * ctx,
       ctx->max_len_to_snd = max_segs * ctx->snd_mss;
     }
 
-  n_bytes_per_buf = vlib_buffer_free_list_buffer_size (vm,
-                                                      VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX);
+  n_bytes_per_buf = VLIB_BUFFER_DATA_SIZE;
   ASSERT (n_bytes_per_buf > MAX_HDRS_LEN);
   n_bytes_per_seg = MAX_HDRS_LEN + ctx->snd_mss;
   ctx->n_bufs_per_seg = ceil ((f64) n_bytes_per_seg / n_bytes_per_buf);
@@ -572,9 +661,17 @@ session_tx_fifo_read_and_snd_i (vlib_main_t * vm, vlib_node_runtime_t * node,
   ctx->transport_vft = transport_protocol_get_vft (tp);
   ctx->tc = session_tx_get_transport (ctx, peek_data);
   ctx->snd_mss = ctx->transport_vft->send_mss (ctx->tc);
-  ctx->snd_space =
-    transport_connection_snd_space (ctx->tc, vm->clib_time.last_cpu_time,
-                                   ctx->snd_mss);
+
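+  /* On flush requests, let the transport push out any pending data */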
+  if (PREDICT_FALSE (e->event_type == SESSION_IO_EVT_TX_FLUSH))
+    {
+      if (ctx->transport_vft->flush_data)
+       ctx->transport_vft->flush_data (ctx->tc);
+    }
+
+  ctx->snd_space = transport_connection_snd_space (ctx->tc,
+                                                  vm->clib_time.
+                                                  last_cpu_time,
+                                                  ctx->snd_mss);
   if (ctx->snd_space == 0 || ctx->snd_mss == 0)
     {
       vec_add1 (wrk->pending_event_vector, *e);
@@ -803,6 +900,9 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
        {
          vec_add2 (fifo_events, e, 1);
          svm_msg_q_sub_w_lock (mq, msg);
+         /* Works because reply messages are smaller than a session evt.
+          * If we ever need to support bigger messages this needs to be
+          * fixed */
          clib_memcpy_fast (e, svm_msg_q_msg_data (mq, msg), sizeof (*e));
          svm_msg_q_free_msg (mq, msg);
        }
@@ -828,6 +928,7 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
       e = &fifo_events[i];
       switch (e->event_type)
        {
+       case SESSION_IO_EVT_TX_FLUSH:
        case FIFO_EVENT_APP_TX:
         /* Don't try to send more than one frame per dispatch cycle */
          if (n_tx_packets == VLIB_FRAME_SIZE)
@@ -864,23 +965,24 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
            }
          break;
        case FIFO_EVENT_DISCONNECT:
-         /* Make sure stream disconnects run after the pending list is
-          * drained */
-         s = session_get_from_handle (e->session_handle);
-         if (!e->postponed)
-           {
-             e->postponed = 1;
-             vec_add1 (wrk->pending_disconnects, *e);
-             continue;
-           }
-         /* If tx queue is still not empty, wait */
-         if (svm_fifo_max_dequeue (s->server_tx_fifo))
+         s = session_get_from_handle_if_valid (e->session_handle);
+         if (PREDICT_FALSE (!s))
+           break;
+
+         /* Make sure session disconnects run after the pending list is
+          * drained, i.e., postpone the event the first time it is seen.
+          * On later passes, if the tx queue is still not empty, keep
+          * waiting for a bounded number of dispatch cycles */
+         if (!e->postponed
+             || (e->postponed < 200
+                 && svm_fifo_max_dequeue (s->server_tx_fifo)))
            {
+             e->postponed += 1;
              vec_add1 (wrk->pending_disconnects, *e);
              continue;
            }
 
-         stream_session_disconnect_transport (s);
+         session_transport_close (s);
          break;
        case FIFO_EVENT_BUILTIN_RX:
          s = session_event_get_session (e, thread_index);
@@ -914,6 +1016,9 @@ session_queue_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
        case SESSION_CTRL_EVT_RESET_REPLY:
          session_mq_reset_reply_handler (e->data);
          break;
+       case SESSION_CTRL_EVT_WORKER_UPDATE:
+         session_mq_worker_update_handler (e->data);
+         break;
        default:
          clib_warning ("unhandled event type %d", e->event_type);
        }
@@ -986,7 +1091,8 @@ dump_thread_0_event_queue (void)
 
        case FIFO_EVENT_RPC:
          fformat (stdout, "[%04d] RPC call %llx with %llx\n",
-                  i, (u64) (e->rpc_args.fp), (u64) (e->rpc_args.arg));
+                  i, (u64) (uword) (e->rpc_args.fp),
+                  (u64) (uword) (e->rpc_args.arg));
          break;
 
        default: