template <typename Req, typename Resp, typename... Args> friend class Dump;
+ template <typename Req, typename Resp, typename StreamMessage,
+ typename... Args>
+ friend class Stream;
+
template <typename M> friend class Event_registration;
};
* @param name application name
* @param chroot_prefix shared memory prefix
* @param max_queued_request max number of outstanding requests queued
+ * @param handle_keepalives handle memclnt_keepalive automatically
*
* @return VAPI_OK on success, other error code on error
*/
vapi_error_e connect (const char *name, const char *chroot_prefix,
- int max_outstanding_requests, int response_queue_size)
+ int max_outstanding_requests, int response_queue_size,
+ bool handle_keepalives = true)
{
return vapi_connect (vapi_ctx, name, chroot_prefix,
max_outstanding_requests, response_queue_size,
- VAPI_MODE_BLOCKING);
+ VAPI_MODE_BLOCKING, handle_keepalives);
}
/**
*
* @return VAPI_OK on success, other error code on error
*/
- vapi_error_e dispatch (const Common_req *limit = nullptr)
+ vapi_error_e dispatch (const Common_req *limit = nullptr, u32 time = 5)
{
std::lock_guard<std::mutex> lock (dispatch_mutex);
vapi_error_e rv = VAPI_OK;
{
void *shm_data;
size_t shm_data_size;
- rv = vapi_recv (vapi_ctx, &shm_data, &shm_data_size);
+ rv = vapi_recv (vapi_ctx, &shm_data, &shm_data_size, SVM_Q_TIMEDWAIT,
+ time);
if (VAPI_OK != rv)
{
return rv;
template <typename Req, typename Resp, typename... Args> friend class Dump;
+ template <typename Req, typename Resp, typename StreamMessage,
+ typename... Args>
+ friend class Stream;
+
template <typename M> friend class Result_set;
template <typename M> friend class Event_registration;
template <typename Req, typename Resp, typename... Args> class Dump;
+template <typename Req, typename Resp, typename StreamMessage,
+ typename... Args>
+class Stream;
+
template <class, class = void> struct vapi_has_payload_trait : std::false_type
{
};
static void set_msg_id (vapi_msg_id_t id)
{
- assert ((INVALID_MSG_ID == *msg_id_holder ()) ||
+ assert ((VAPI_INVALID_MSG_ID == *msg_id_holder ()) ||
(id == *msg_id_holder ()));
*msg_id_holder () = id;
}
static vapi_msg_id_t *msg_id_holder ()
{
- static vapi_msg_id_t my_id{INVALID_MSG_ID};
+ static vapi_msg_id_t my_id{VAPI_INVALID_MSG_ID};
return &my_id;
}
template <typename Req, typename Resp, typename... Args> friend class Dump;
+ template <typename Req, typename Resp, typename StreamMessage,
+ typename... Args>
+ friend class Stream;
+
template <typename X> friend class Event_registration;
template <typename X> friend class Result_set;
bool complete;
std::vector<Msg<M>, typename Msg<M>::Msg_allocator> set;
+ template <typename Req, typename Resp, typename StreamMessage,
+ typename... Args>
+ friend class Stream;
+
template <typename Req, typename Resp, typename... Args> friend class Dump;
template <typename X> friend class Event_registration;
};
+/**
+ * Class representing a RPC request - zero or more identical responses to a
+ * single request message with a response
+ */
+template <typename Req, typename Resp, typename StreamMessage,
+ typename... Args>
+class Stream : public Common_req
+{
+public:
+ Stream (
+ Connection &con, Args... args,
+ std::function<vapi_error_e (Stream<Req, Resp, StreamMessage, Args...> &)>
+ cb = nullptr)
+ : Common_req{ con }, request{ con, vapi_alloc<Req> (con, args...) },
+ response{ con, nullptr }, result_set{ con }, callback{ cb }
+ {
+ }
+
+ Stream (const Stream &) = delete;
+
+ virtual ~Stream () {}
+
+ virtual std::tuple<vapi_error_e, bool>
+ assign_response (vapi_msg_id_t id, void *shm_data)
+ {
+ if (id == response.get_msg_id ())
+ {
+ response.assign_response (id, shm_data);
+ result_set.mark_complete ();
+ set_response_state (RESPONSE_READY);
+ if (nullptr != callback)
+ {
+ return std::make_pair (callback (*this), true);
+ }
+ return std::make_pair (VAPI_OK, true);
+ }
+ else
+ {
+ result_set.assign_response (id, shm_data);
+ }
+ return std::make_pair (VAPI_OK, false);
+ }
+
+ vapi_error_e
+ execute ()
+ {
+ return con.send (this);
+ }
+
+ const Msg<Req> &
+ get_request (void)
+ {
+ return request;
+ }
+
+ const Msg<Resp> &
+ get_response (void)
+ {
+ return response;
+ }
+
+ using resp_type = typename Msg<StreamMessage>::shm_data_type;
+
+ const Result_set<StreamMessage> &
+ get_result_set (void) const
+ {
+ return result_set;
+ }
+
+private:
+ Msg<Req> request;
+ Msg<Resp> response;
+ Result_set<StreamMessage> result_set;
+ std::function<vapi_error_e (Stream<Req, Resp, StreamMessage, Args...> &)>
+ callback;
+
+ friend class Connection;
+ friend class Result_set<StreamMessage>;
+};
+
/**
* Class representing a dump request - zero or more identical responses to a
* single request message