u32 context;
vapi_cb_t callback;
void *callback_ctx;
- bool is_dump;
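+ /* reply msg id and request kind, used by the dispatcher to decide
+ * when a request is complete */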
+ vapi_msg_id_t response_id;
+ enum vapi_request_type type;
} vapi_req_t;
static const u32 context_counter_mask = (1U << 31);
}
void
-vapi_store_request (vapi_ctx_t ctx, u32 context, bool is_dump,
- vapi_cb_t callback, void *callback_ctx)
+vapi_store_request (vapi_ctx_t ctx, u32 context, vapi_msg_id_t response_id,
+ enum vapi_request_type request_type, vapi_cb_t callback,
+ void *callback_ctx)
{
assert (!vapi_requests_full (ctx));
/* if the mutex is not held, bad things will happen */
assert (0 != pthread_mutex_trylock (&ctx->requests_mutex));
const int requests_end = vapi_requests_end (ctx);
vapi_req_t *slot = &ctx->requests[requests_end];
- slot->is_dump = is_dump;
+ slot->type = request_type;
+ slot->response_id = response_id;
slot->context = context;
slot->callback = callback;
slot->callback_ctx = callback_ctx;
ctx->requests_count++;
}
-int
+static int
vapi_client_connect (vapi_ctx_t ctx, const char *name, int ctx_quota,
int input_queue_size, bool keepalive)
{
return (rv);
}
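+/* Build a memclnt_delete message and post it to VPP's shared-memory input
+ * queue; do_cleanup is passed through to VPP's delete handler. */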
+static void
+vapi_client_send_disconnect (vapi_ctx_t ctx, u8 do_cleanup)
+{
+ vl_api_memclnt_delete_t *mp;
+ vl_shmem_hdr_t *shmem_hdr;
+ api_main_t *am = vlibapi_get_main ();
+
+ ASSERT (am->vlib_rp);
+ shmem_hdr = am->shmem_hdr;
+ ASSERT (shmem_hdr && shmem_hdr->vl_input_queue);
+
+ mp = vl_msg_api_alloc (sizeof (vl_api_memclnt_delete_t));
+ clib_memset (mp, 0, sizeof (*mp));
+ mp->_vl_msg_id = ntohs (VL_API_MEMCLNT_DELETE);
+ mp->index = ctx->my_client_index;
+ mp->do_cleanup = do_cleanup;
+
+ vl_msg_api_send_shmem (shmem_hdr->vl_input_queue, (u8 *) &mp);
+}
+
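+/* Send the disconnect request, then drain our input queue until the
+ * memclnt_delete_reply arrives or a ~2 second timeout expires. */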
+static int
+vapi_client_disconnect (vapi_ctx_t ctx)
+{
+ vl_api_memclnt_delete_reply_t *rp;
+ svm_queue_t *vl_input_queue;
+ time_t begin;
+ msgbuf_t *msgbuf;
+
+ vl_input_queue = ctx->vl_input_queue;
+ vapi_client_send_disconnect (ctx, 0 /* wait for reply */);
+
+ /*
+ * Have to be careful here, in case the client is disconnecting
+ * because e.g. the vlib process died, or is unresponsive.
+ */
+ begin = time (0);
+ while (1)
+ {
+ time_t now;
+
+ now = time (0);
+
+ if (now >= (begin + 2))
+ {
+ clib_warning ("peer unresponsive, give up");
+ ctx->my_client_index = ~0;
+ return -1;
+ }
+ if (svm_queue_sub (vl_input_queue, (u8 *) &rp, SVM_Q_NOWAIT, 0) < 0)
+ continue;
+
+ VL_MSG_API_UNPOISON (rp);
+
+ /* drain the queue */
+ if (ntohs (rp->_vl_msg_id) != VL_API_MEMCLNT_DELETE_REPLY)
+ {
+ clib_warning ("queue drain: %d", ntohs (rp->_vl_msg_id));
+ msgbuf = (msgbuf_t *) ((u8 *) rp - offsetof (msgbuf_t, data));
+ vl_msg_api_handler ((void *) rp, ntohl (msgbuf->data_len));
+ continue;
+ }
+ msgbuf = (msgbuf_t *) ((u8 *) rp - offsetof (msgbuf_t, data));
+ vl_msg_api_handler ((void *) rp, ntohl (msgbuf->data_len));
+ break;
+ }
+
+ vapi_api_name_and_crc_free (ctx);
+ return 0;
+}
+
u32
vapi_api_get_msg_index (vapi_ctx_t ctx, u8 *name_and_crc)
{
}
return VAPI_OK;
fail:
- vl_client_disconnect ();
+ vapi_client_disconnect (ctx);
vl_client_api_unmap ();
return rv;
}
}
return VAPI_OK;
fail:
- vl_client_disconnect ();
+ vapi_client_disconnect (ctx);
return rv;
}
svm_queue_t *vl_input_queue;
time_t begin;
vl_input_queue = ctx->vl_input_queue;
- vl_client_send_disconnect (0 /* wait for reply */);
+ vapi_client_send_disconnect (ctx, 0 /* wait for reply */);
/*
* Have to be careful here, in case the client is disconnecting
int payload_offset = vapi_get_payload_offset (id);
void *payload = ((u8 *) msg) + payload_offset;
bool is_last = true;
- if (ctx->requests[tmp].is_dump)
+ switch (ctx->requests[tmp].type)
{
+ case VAPI_REQUEST_STREAM:
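+ /* stream items arrive with the stored response_id; a different msg id
+ * marks the end of the stream */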
+ if (ctx->requests[tmp].response_id == id)
+ {
+ is_last = false;
+ }
+ else
+ {
+ VAPI_DBG ("Stream response ID doesn't match current ID, move to "
+ "next ID");
+ clib_memset (&ctx->requests[tmp], 0,
+ sizeof (ctx->requests[tmp]));
+ ++ctx->requests_start;
+ --ctx->requests_count;
+ if (ctx->requests_start == ctx->requests_size)
+ {
+ ctx->requests_start = 0;
+ }
+ tmp = ctx->requests_start;
+ if (ctx->requests[tmp].context != context)
+ {
+ VAPI_ERR ("Unexpected context %u, expected context %u!",
+ ctx->requests[tmp].context, context);
+ }
+ }
+ break;
+ case VAPI_REQUEST_DUMP:
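+ /* dump replies keep coming until the control_ping_reply arrives */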
if (vapi_msg_id_control_ping_reply == id)
{
payload = NULL;
}
else
{
is_last = false;
}
+ break;
+ case VAPI_REQUEST_REG:
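+ /* plain request: the single reply is always the last one */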
+ break;
}
if (payload_offset != -1)
{
return VAPI_EINVAL;
}
const vapi_msg_id_t id = ctx->vl_msg_id_to_vapi_msg_t[vpp_id];
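+ /* swap to host byte order ahead of the size check, so any length fields
+ * the check reads are already host-order */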
+ vapi_get_swap_to_host_func (id) (msg);
if (vapi_verify_msg_size (id, msg, size))
{
vapi_msg_free (ctx, msg);
return VAPI_EINVAL;
}
u32 context;
- vapi_get_swap_to_host_func (id) (msg);
if (vapi_msg_is_with_context (id))
{
context = *(u32 *) (((u8 *) msg) + vapi_get_context_offset (id));
return __vapi_metadata.msgs[id]->name;
}
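+/* Ask the client rx thread (if one is running) to exit by signalling the
+ * context's input queue; a no-op when the context is not connected. */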
+void
+vapi_stop_rx_thread (vapi_ctx_t ctx)
+{
+ if (!ctx || !ctx->connected || !ctx->vl_input_queue)
+ {
+ return;
+ }
+
+ vl_client_stop_rx_thread (ctx->vl_input_queue);
+}
/*
* fd.io coding-style-patch-verification: ON
*