private:
Connection &con;
- Common_req (Connection &con) : con{con}, response_state{RESPONSE_NOT_READY}
+ Common_req (Connection &con)
+ : con (con), context{0}, response_state{RESPONSE_NOT_READY}
{
}
template <typename Req, typename Resp, typename... Args> friend class Dump;
+ template <typename Req, typename Resp, typename StreamMessage,
+ typename... Args>
+ friend class Stream;
+
template <typename M> friend class Event_registration;
};
* @param name application name
* @param chroot_prefix shared memory prefix
* @param max_queued_request max number of outstanding requests queued
+ * @param handle_keepalives handle memclnt_keepalive automatically
*
* @return VAPI_OK on success, other error code on error
*/
vapi_error_e connect (const char *name, const char *chroot_prefix,
- int max_outstanding_requests, int response_queue_size)
+ int max_outstanding_requests, int response_queue_size,
+ bool handle_keepalives = true)
{
return vapi_connect (vapi_ctx, name, chroot_prefix,
max_outstanding_requests, response_queue_size,
- VAPI_MODE_BLOCKING);
+ VAPI_MODE_BLOCKING, handle_keepalives);
}
/**
*
* @return VAPI_OK on success, other error code on error
*/
- vapi_error_e dispatch (const Common_req *limit = nullptr)
+ vapi_error_e dispatch (const Common_req *limit = nullptr, u32 time = 5)
{
std::lock_guard<std::mutex> lock (dispatch_mutex);
vapi_error_e rv = VAPI_OK;
{
void *shm_data;
size_t shm_data_size;
- rv = vapi_recv (vapi_ctx, &shm_data, &shm_data_size);
+ rv = vapi_recv (vapi_ctx, &shm_data, &shm_data_size, SVM_Q_TIMEDWAIT,
+ time);
if (VAPI_OK != rv)
{
return rv;
template <typename Req, typename Resp, typename... Args> friend class Dump;
+ template <typename Req, typename Resp, typename StreamMessage,
+ typename... Args>
+ friend class Stream;
+
template <typename M> friend class Result_set;
template <typename M> friend class Event_registration;
template <typename Req, typename Resp, typename... Args> class Dump;
+template <typename Req, typename Resp, typename StreamMessage,
+ typename... Args>
+class Stream;
+
template <class, class = void> struct vapi_has_payload_trait : std::false_type
{
};
static void set_msg_id (vapi_msg_id_t id)
{
- assert ((~0 == *msg_id_holder ()) || (id == *msg_id_holder ()));
+ assert ((VAPI_INVALID_MSG_ID == *msg_id_holder ()) ||
+ (id == *msg_id_holder ()));
*msg_id_holder () = id;
}
static vapi_msg_id_t *msg_id_holder ()
{
- static vapi_msg_id_t my_id{~0};
+ static vapi_msg_id_t my_id{VAPI_INVALID_MSG_ID};
return &my_id;
}
- Msg (Connection &con, void *shm_data) throw (Msg_not_available_exception)
- : con{con}
+ Msg (Connection &con, void *shm_data) : con{con}
{
if (!con.is_msg_available (get_msg_id ()))
{
this, shm_data);
}
- void assign_response (vapi_msg_id_t resp_id,
- void *shm_data) throw (Unexpected_msg_id_exception)
+ void assign_response (vapi_msg_id_t resp_id, void *shm_data)
{
assert (nullptr == this->shm_data);
if (resp_id != get_msg_id ())
template <typename Req, typename Resp, typename... Args> friend class Dump;
+ template <typename Req, typename Resp, typename StreamMessage,
+ typename... Args>
+ friend class Stream;
+
template <typename X> friend class Event_registration;
template <typename X> friend class Result_set;
{
public:
Request (Connection &con, Args... args,
- std::function<vapi_error_e (Request<Req, Resp, Args...> &)>
- callback = nullptr)
- : Common_req{con}, callback{callback},
- request{con, vapi_alloc<Req> (con, args...)}, response{con, nullptr}
+ std::function<vapi_error_e (Request<Req, Resp, Args...> &)>
+ callback = nullptr)
+ : Common_req{ con }, callback{ std::move (callback) },
+ request{ con, vapi_alloc<Req> (con, args...) }, response{ con,
+ nullptr }
{
}
complete = true;
}
- void assign_response (vapi_msg_id_t resp_id,
- void *shm_data) throw (Unexpected_msg_id_exception)
+ void assign_response (vapi_msg_id_t resp_id, void *shm_data)
{
if (resp_id != Msg<M>::get_msg_id ())
{
}
}
- Result_set (Connection &con) : con{con}, complete{false}
+ Result_set (Connection &con) : con (con), complete{false}
{
}
bool complete;
std::vector<Msg<M>, typename Msg<M>::Msg_allocator> set;
+ template <typename Req, typename Resp, typename StreamMessage,
+ typename... Args>
+ friend class Stream;
+
template <typename Req, typename Resp, typename... Args> friend class Dump;
template <typename X> friend class Event_registration;
};
+/**
+ * Class representing a RPC request - zero or more identical responses to a
+ * single request message with a response
+ */
+template <typename Req, typename Resp, typename StreamMessage,
+ typename... Args>
+class Stream : public Common_req
+{
+public:
+ Stream (
+ Connection &con, Args... args,
+ std::function<vapi_error_e (Stream<Req, Resp, StreamMessage, Args...> &)>
+ cb = nullptr)
+ : Common_req{ con }, request{ con, vapi_alloc<Req> (con, args...) },
+ response{ con, nullptr }, result_set{ con }, callback{ std::move (cb) }
+ {
+ }
+
+ Stream (const Stream &) = delete;
+
+ virtual ~Stream () {}
+
+ virtual std::tuple<vapi_error_e, bool>
+ assign_response (vapi_msg_id_t id, void *shm_data)
+ {
+ if (id == response.get_msg_id ())
+ {
+ response.assign_response (id, shm_data);
+ result_set.mark_complete ();
+ set_response_state (RESPONSE_READY);
+ if (nullptr != callback)
+ {
+ return std::make_pair (callback (*this), true);
+ }
+ return std::make_pair (VAPI_OK, true);
+ }
+ else
+ {
+ result_set.assign_response (id, shm_data);
+ }
+ return std::make_pair (VAPI_OK, false);
+ }
+
+ vapi_error_e
+ execute ()
+ {
+ return con.send (this);
+ }
+
+ const Msg<Req> &
+ get_request (void)
+ {
+ return request;
+ }
+
+ const Msg<Resp> &
+ get_response (void)
+ {
+ return response;
+ }
+
+ using resp_type = typename Msg<StreamMessage>::shm_data_type;
+
+ const Result_set<StreamMessage> &
+ get_result_set (void) const
+ {
+ return result_set;
+ }
+
+private:
+ Msg<Req> request;
+ Msg<Resp> response;
+ Result_set<StreamMessage> result_set;
+ std::function<vapi_error_e (Stream<Req, Resp, StreamMessage, Args...> &)>
+ callback;
+
+ friend class Connection;
+ friend class Result_set<StreamMessage>;
+};
+
/**
* Class representing a dump request - zero or more identical responses to a
* single request message
{
public:
Dump (Connection &con, Args... args,
- std::function<vapi_error_e (Dump<Req, Resp, Args...> &)> callback =
- nullptr)
- : Common_req{con}, request{con, vapi_alloc<Req> (con, args...)},
- result_set{con}, callback{callback}
+ std::function<vapi_error_e (Dump<Req, Resp, Args...> &)> callback =
+ nullptr)
+ : Common_req{ con }, request{ con, vapi_alloc<Req> (con, args...) },
+ result_set{ con }, callback{ std::move (callback) }
{
}
{
public:
Event_registration (
- Connection &con,
- std::function<vapi_error_e (Event_registration<M> &)> callback =
- nullptr) throw (Msg_not_available_exception)
- : Common_req{con}, result_set{con}, callback{callback}
+ Connection &con,
+ std::function<vapi_error_e (Event_registration<M> &)> callback = nullptr)
+ : Common_req{ con }, result_set{ con }, callback{ std::move (callback) }
{
if (!con.is_msg_available (M::get_msg_id ()))
{