vapi: support services 91/38491/9
authorStanislav Zaikin <zstaseg@gmail.com>
Thu, 21 Jul 2022 17:07:50 +0000 (19:07 +0200)
committerOle Tr�an <otroan@employees.org>
Fri, 18 Aug 2023 06:09:10 +0000 (06:09 +0000)
Add missing support for

service { rpc X_get returns X_get_reply stream X_details; }

Type: improvement

Change-Id: I27555f61a2974e414cb6554f32c550b8ee5eb037
Signed-off-by: Stanislav Zaikin <stanislav.zaikin@46labs.com>
Signed-off-by: Klement Sekera <klement.sekera@gmail.com>
src/vpp-api/vapi/vapi.c
src/vpp-api/vapi/vapi.hpp
src/vpp-api/vapi/vapi_c_gen.py
src/vpp-api/vapi/vapi_c_test.c
src/vpp-api/vapi/vapi_cpp_gen.py
src/vpp-api/vapi/vapi_cpp_test.cpp
src/vpp-api/vapi/vapi_internal.h
src/vpp-api/vapi/vapi_json_parser.py

index 7700eb0..45241e1 100644 (file)
@@ -63,7 +63,8 @@ typedef struct
   u32 context;
   vapi_cb_t callback;
   void *callback_ctx;
-  bool is_dump;
+  vapi_msg_id_t response_id;
+  enum vapi_request_type type;
 } vapi_req_t;
 
 static const u32 context_counter_mask = (1 << 31);
@@ -137,15 +138,17 @@ vapi_requests_end (vapi_ctx_t ctx)
 }
 
 void
-vapi_store_request (vapi_ctx_t ctx, u32 context, bool is_dump,
-                   vapi_cb_t callback, void *callback_ctx)
+vapi_store_request (vapi_ctx_t ctx, u32 context, vapi_msg_id_t response_id,
+                   enum vapi_request_type request_type, vapi_cb_t callback,
+                   void *callback_ctx)
 {
   assert (!vapi_requests_full (ctx));
   /* if the mutex is not held, bad things will happen */
   assert (0 != pthread_mutex_trylock (&ctx->requests_mutex));
   const int requests_end = vapi_requests_end (ctx);
   vapi_req_t *slot = &ctx->requests[requests_end];
-  slot->is_dump = is_dump;
+  slot->type = request_type;
+  slot->response_id = response_id;
   slot->context = context;
   slot->callback = callback;
   slot->callback_ctx = callback_ctx;
@@ -1116,8 +1119,34 @@ vapi_dispatch_response (vapi_ctx_t ctx, vapi_msg_id_t id,
       int payload_offset = vapi_get_payload_offset (id);
       void *payload = ((u8 *) msg) + payload_offset;
       bool is_last = true;
-      if (ctx->requests[tmp].is_dump)
+      switch (ctx->requests[tmp].type)
        {
+       case VAPI_REQUEST_STREAM:
+         if (ctx->requests[tmp].response_id == id)
+           {
+             is_last = false;
+           }
+         else
+           {
+             VAPI_DBG ("Stream response ID doesn't match current ID, move to "
+                       "next ID");
+             clib_memset (&ctx->requests[tmp], 0,
+                          sizeof (ctx->requests[tmp]));
+             ++ctx->requests_start;
+             --ctx->requests_count;
+             if (ctx->requests_start == ctx->requests_size)
+               {
+                 ctx->requests_start = 0;
+               }
+             tmp = ctx->requests_start;
+             if (ctx->requests[tmp].context != context)
+               {
+                 VAPI_ERR ("Unexpected context %u, expected context %u!",
+                           ctx->requests[tmp].context, context);
+               }
+           }
+         break;
+       case VAPI_REQUEST_DUMP:
          if (vapi_msg_id_control_ping_reply == id)
            {
              payload = NULL;
@@ -1126,6 +1155,9 @@ vapi_dispatch_response (vapi_ctx_t ctx, vapi_msg_id_t id,
            {
              is_last = false;
            }
+         break;
+       case VAPI_REQUEST_REG:
+         break;
        }
       if (payload_offset != -1)
        {
index a1e33a9..58d1706 100644 (file)
@@ -140,6 +140,10 @@ private:
 
   template <typename Req, typename Resp, typename... Args> friend class Dump;
 
+  template <typename Req, typename Resp, typename StreamMessage,
+           typename... Args>
+  friend class Stream;
+
   template <typename M> friend class Event_registration;
 };
 
@@ -451,6 +455,10 @@ private:
 
   template <typename Req, typename Resp, typename... Args> friend class Dump;
 
+  template <typename Req, typename Resp, typename StreamMessage,
+           typename... Args>
+  friend class Stream;
+
   template <typename M> friend class Result_set;
 
   template <typename M> friend class Event_registration;
@@ -497,6 +505,10 @@ template <typename Req, typename Resp, typename... Args> class Request;
 
 template <typename Req, typename Resp, typename... Args> class Dump;
 
+template <typename Req, typename Resp, typename StreamMessage,
+         typename... Args>
+class Stream;
+
 template <class, class = void> struct vapi_has_payload_trait : std::false_type
 {
 };
@@ -627,6 +639,10 @@ private:
 
   template <typename Req, typename Resp, typename... Args> friend class Dump;
 
+  template <typename Req, typename Resp, typename StreamMessage,
+           typename... Args>
+  friend class Stream;
+
   template <typename X> friend class Event_registration;
 
   template <typename X> friend class Result_set;
@@ -772,11 +788,95 @@ private:
   bool complete;
   std::vector<Msg<M>, typename Msg<M>::Msg_allocator> set;
 
+  template <typename Req, typename Resp, typename StreamMessage,
+           typename... Args>
+  friend class Stream;
+
   template <typename Req, typename Resp, typename... Args> friend class Dump;
 
   template <typename X> friend class Event_registration;
 };
 
+/**
+ * Class representing a RPC request - zero or more identical responses to a
+ * single request message with a response
+ */
+template <typename Req, typename Resp, typename StreamMessage,
+         typename... Args>
+class Stream : public Common_req
+{
+public:
+  Stream (
+    Connection &con, Args... args,
+    std::function<vapi_error_e (Stream<Req, Resp, StreamMessage, Args...> &)>
+      cb = nullptr)
+      : Common_req{ con }, request{ con, vapi_alloc<Req> (con, args...) },
+       response{ con, nullptr }, result_set{ con }, callback{ cb }
+  {
+  }
+
+  Stream (const Stream &) = delete;
+
+  virtual ~Stream () {}
+
+  virtual std::tuple<vapi_error_e, bool>
+  assign_response (vapi_msg_id_t id, void *shm_data)
+  {
+    if (id == response.get_msg_id ())
+      {
+       response.assign_response (id, shm_data);
+       result_set.mark_complete ();
+       set_response_state (RESPONSE_READY);
+       if (nullptr != callback)
+         {
+           return std::make_pair (callback (*this), true);
+         }
+       return std::make_pair (VAPI_OK, true);
+      }
+    else
+      {
+       result_set.assign_response (id, shm_data);
+      }
+    return std::make_pair (VAPI_OK, false);
+  }
+
+  vapi_error_e
+  execute ()
+  {
+    return con.send (this);
+  }
+
+  const Msg<Req> &
+  get_request (void)
+  {
+    return request;
+  }
+
+  const Msg<Resp> &
+  get_response (void)
+  {
+    return response;
+  }
+
+  using resp_type = typename Msg<StreamMessage>::shm_data_type;
+
+  const Result_set<StreamMessage> &
+  get_result_set (void) const
+  {
+    return result_set;
+  }
+
+private:
+  Msg<Req> request;
+  Msg<Resp> response;
+  Result_set<StreamMessage> result_set;
+  std::function<vapi_error_e (Stream<Req, Resp, StreamMessage, Args...> &)>
+    callback;
+
+  friend class Connection;
+  friend class Result_set<StreamMessage>;
+};
+
 /**
  * Class representing a dump request - zero or more identical responses to a
  * single request message
index 84d9aca..37f5ac1 100755 (executable)
@@ -477,7 +477,7 @@ class CMessage(Message):
                 {{
                   VAPI_ERR("Truncated '{self.name}' msg received, received %lu"
                     "bytes, expected %lu bytes.", buf_size,
-                    sizeof({self.get_calc_msg_size_func_name()}));
+                    {self.get_calc_msg_size_func_name()}(msg));
                   return -1;
                 }}
               return 0;
@@ -615,45 +615,66 @@ class CMessage(Message):
         return "vapi_%s" % self.name
 
     def get_op_func_decl(self):
-        if self.reply.has_payload():
-            return "vapi_error_e %s(%s)" % (
-                self.get_op_func_name(),
-                ",\n  ".join(
-                    [
-                        "struct vapi_ctx_s *ctx",
-                        "%s *msg" % self.get_c_name(),
-                        "vapi_error_e (*callback)(struct vapi_ctx_s *ctx",
-                        "                         void *callback_ctx",
-                        "                         vapi_error_e rv",
-                        "                         bool is_last",
-                        "                         %s *reply)"
-                        % self.reply.get_payload_struct_name(),
-                        "void *callback_ctx",
-                    ]
-                ),
-            )
-        else:
-            return "vapi_error_e %s(%s)" % (
-                self.get_op_func_name(),
-                ",\n  ".join(
-                    [
-                        "struct vapi_ctx_s *ctx",
-                        "%s *msg" % self.get_c_name(),
-                        "vapi_error_e (*callback)(struct vapi_ctx_s *ctx",
-                        "                         void *callback_ctx",
-                        "                         vapi_error_e rv",
-                        "                         bool is_last)",
-                        "void *callback_ctx",
-                    ]
-                ),
-            )
+        stream_param_lines = []
+        if self.has_stream_msg:
+            stream_param_lines = [
+                "vapi_error_e (*details_callback)(struct vapi_ctx_s *ctx",
+                "                                 void *callback_ctx",
+                "                                 vapi_error_e rv",
+                "                                 bool is_last",
+                "                                 %s *details)"
+                % self.stream_msg.get_payload_struct_name(),
+                "void *details_callback_ctx",
+            ]
+
+        return "vapi_error_e %s(%s)" % (
+            self.get_op_func_name(),
+            ",\n  ".join(
+                [
+                    "struct vapi_ctx_s *ctx",
+                    "%s *msg" % self.get_c_name(),
+                    "vapi_error_e (*reply_callback)(struct vapi_ctx_s *ctx",
+                    "                               void *callback_ctx",
+                    "                               vapi_error_e rv",
+                    "                               bool is_last",
+                    "                               %s *reply)"
+                    % self.reply.get_payload_struct_name(),
+                ]
+                + [
+                    "void *reply_callback_ctx",
+                ]
+                + stream_param_lines
+            ),
+        )
 
     def get_op_func_def(self):
+        param_check_lines = ["  if (!msg || !reply_callback) {"]
+        store_request_lines = [
+            "    vapi_store_request(ctx, req_context, %s, %s, "
+            % (
+                self.reply.get_msg_id_name(),
+                "VAPI_REQUEST_DUMP" if self.reply_is_stream else "VAPI_REQUEST_REG",
+            ),
+            "                       (vapi_cb_t)reply_callback, reply_callback_ctx);",
+        ]
+        if self.has_stream_msg:
+            param_check_lines = [
+                "  if (!msg || !reply_callback || !details_callback) {"
+            ]
+            store_request_lines = [
+                f"    vapi_store_request(ctx, req_context, {self.stream_msg.get_msg_id_name()}, VAPI_REQUEST_STREAM, ",
+                "                       (vapi_cb_t)details_callback, details_callback_ctx);",
+                f"    vapi_store_request(ctx, req_context, {self.reply.get_msg_id_name()}, VAPI_REQUEST_REG, ",
+                "                       (vapi_cb_t)reply_callback, reply_callback_ctx);",
+            ]
+
         return "\n".join(
             [
                 "%s" % self.get_op_func_decl(),
                 "{",
-                "  if (!msg || !callback) {",
+            ]
+            + param_check_lines
+            + [
                 "    return VAPI_EINVAL;",
                 "  }",
                 "  if (vapi_is_nonblocking(ctx) && vapi_requests_full(ctx)) {",
@@ -669,14 +690,12 @@ class CMessage(Message):
                 (
                     "  if (VAPI_OK == (rv = vapi_send_with_control_ping "
                     "(ctx, msg, req_context))) {"
-                    if self.reply_is_stream
+                    if (self.reply_is_stream and not self.has_stream_msg)
                     else "  if (VAPI_OK == (rv = vapi_send (ctx, msg))) {"
                 ),
-                (
-                    "    vapi_store_request(ctx, req_context, %s, "
-                    "(vapi_cb_t)callback, callback_ctx);"
-                    % ("true" if self.reply_is_stream else "false")
-                ),
+            ]
+            + store_request_lines
+            + [
                 "    if (VAPI_OK != vapi_producer_unlock (ctx)) {",
                 "      abort (); /* this really shouldn't happen */",
                 "    }",
@@ -792,6 +811,8 @@ def emit_definition(parser, json_file, emitted, o):
             emit_definition(parser, json_file, emitted, x)
     if hasattr(o, "reply"):
         emit_definition(parser, json_file, emitted, o.reply)
+    if hasattr(o, "stream_msg"):
+        emit_definition(parser, json_file, emitted, o.stream_msg)
     if hasattr(o, "get_c_def"):
         if (
             o not in parser.enums_by_json[json_file]
@@ -820,14 +841,14 @@ def emit_definition(parser, json_file, emitted, o):
             print("%s%s" % (function_attrs, o.get_calc_msg_size_func_def()))
             print("")
             print("%s%s" % (function_attrs, o.get_verify_msg_size_func_def()))
-            if not o.is_reply and not o.is_event:
+            if not o.is_reply and not o.is_event and not o.is_stream:
                 print("")
                 print("%s%s" % (function_attrs, o.get_alloc_func_def()))
                 print("")
                 print("%s%s" % (function_attrs, o.get_op_func_def()))
             print("")
             print("%s" % o.get_c_constructor())
-            if o.is_reply or o.is_event:
+            if (o.is_reply or o.is_event) and not o.is_stream:
                 print("")
                 print("%s%s;" % (function_attrs, o.get_event_cb_func_def()))
         elif hasattr(o, "get_swap_to_be_func_def"):
index 99a93fb..5eccb0f 100644 (file)
@@ -28,6 +28,7 @@
 #include <vapi/vlib.api.vapi.h>
 #include <vapi/vpe.api.vapi.h>
 #include <vapi/interface.api.vapi.h>
+#include <vapi/mss_clamp.api.vapi.h>
 #include <vapi/l2.api.vapi.h>
 #include <fake.api.vapi.h>
 
@@ -36,6 +37,7 @@
 
 DEFINE_VAPI_MSG_IDS_VPE_API_JSON;
 DEFINE_VAPI_MSG_IDS_INTERFACE_API_JSON;
+DEFINE_VAPI_MSG_IDS_MSS_CLAMP_API_JSON;
 DEFINE_VAPI_MSG_IDS_L2_API_JSON;
 DEFINE_VAPI_MSG_IDS_FAKE_API_JSON;
 
@@ -481,6 +483,48 @@ sw_interface_dump_cb (struct vapi_ctx_s *ctx, void *callback_ctx,
   return VAPI_OK;
 }
 
+vapi_error_e
+vapi_mss_clamp_enable_disable_reply_cb (
+  struct vapi_ctx_s *ctx, void *callback_ctx, vapi_error_e rv, bool is_last,
+  vapi_payload_mss_clamp_enable_disable_reply *reply)
+{
+  bool *x = callback_ctx;
+  *x = true;
+  return VAPI_OK;
+}
+
+vapi_error_e
+vapi_mss_clamp_get_reply_cb (struct vapi_ctx_s *ctx, void *callback_ctx,
+                            vapi_error_e rv, bool is_last,
+                            vapi_payload_mss_clamp_get_reply *reply)
+{
+  int *counter = callback_ctx;
+  ck_assert_int_gt (*counter, 0); // make sure details were called first
+  ++*counter;
+  ck_assert_int_eq (is_last, true);
+  printf ("Got mss clamp reply error %d\n", rv);
+  ck_assert_int_eq (rv, VAPI_OK);
+  printf ("counter is %d", *counter);
+  return VAPI_OK;
+}
+
+vapi_error_e
+vapi_mss_clamp_get_details_cb (struct vapi_ctx_s *ctx, void *callback_ctx,
+                              vapi_error_e rv, bool is_last,
+                              vapi_payload_mss_clamp_details *details)
+{
+  int *counter = callback_ctx;
+  ++*counter;
+  if (!is_last)
+    {
+      printf ("Got ipv4 mss clamp to %u for sw_if_index %u\n",
+             details->ipv4_mss, details->sw_if_index);
+      ck_assert_int_eq (details->ipv4_mss, 1000 + details->sw_if_index);
+    }
+  printf ("counter is %d", *counter);
+  return VAPI_OK;
+}
+
 START_TEST (test_loopbacks_1)
 {
   printf ("--- Create/delete loopbacks using blocking API ---\n");
@@ -521,6 +565,37 @@ START_TEST (test_loopbacks_1)
              mac_addresses[i][3], mac_addresses[i][4], mac_addresses[i][5],
              sw_if_indexes[i]);
     }
+
+  { // new context
+    for (int i = 0; i < num_ifs; ++i)
+      {
+       vapi_msg_mss_clamp_enable_disable *mc =
+         vapi_alloc_mss_clamp_enable_disable (ctx);
+       mc->payload.sw_if_index = sw_if_indexes[i];
+       mc->payload.ipv4_mss = 1000 + sw_if_indexes[i];
+       mc->payload.ipv4_direction = MSS_CLAMP_DIR_RX;
+       bool reply_ctx = false;
+       printf ("Set ipv4 mss clamp to %u for sw_if_index %u\n",
+               mc->payload.ipv4_mss, mc->payload.sw_if_index);
+       vapi_error_e rv = vapi_mss_clamp_enable_disable (
+         ctx, mc, vapi_mss_clamp_enable_disable_reply_cb, &reply_ctx);
+       ck_assert_int_eq (VAPI_OK, rv);
+       ck_assert_int_eq (reply_ctx, true);
+      }
+  }
+
+  { // new context
+    int counter = 0;
+    vapi_msg_mss_clamp_get *msg = vapi_alloc_mss_clamp_get (ctx);
+    msg->payload.sw_if_index = ~0;
+    vapi_error_e rv =
+      vapi_mss_clamp_get (ctx, msg, vapi_mss_clamp_get_reply_cb, &counter,
+                         vapi_mss_clamp_get_details_cb, &counter);
+    printf ("counter is %d", counter);
+    ck_assert_int_eq (VAPI_OK, rv);
+    ck_assert_int_eq (counter, num_ifs + 1);
+  }
+
   bool seen[num_ifs];
   sw_interface_dump_ctx dctx = { false, num_ifs, sw_if_indexes, seen, 0 };
   vapi_msg_sw_interface_dump *dump;
index 33744a3..165730c 100755 (executable)
@@ -96,6 +96,13 @@ class CppMessage(CMessage):
         return "%s%s" % (self.name[0].upper(), self.name[1:])
 
     def get_req_template_name(self):
+        if self.has_stream_msg:
+            return "Stream<%s, %s, %s>" % (
+                self.get_c_name(),
+                self.reply.get_c_name(),
+                self.stream_msg.get_c_name(),
+            )
+
         if self.reply_is_stream:
             template = "Dump"
         else:
@@ -196,7 +203,7 @@ def gen_json_header(parser, logger, j, io, gen_h_prefix, add_debug_comments):
             print("/* m.get_cpp_constructor() */")
         print("%s" % m.get_cpp_constructor())
         print("")
-        if not m.is_reply and not m.is_event:
+        if not m.is_reply and not m.is_event and not m.is_stream:
             if add_debug_comments:
                 print("/* m.get_alloc_template_instantiation() */")
             print("%s" % m.get_alloc_template_instantiation())
@@ -210,6 +217,8 @@ def gen_json_header(parser, logger, j, io, gen_h_prefix, add_debug_comments):
                 print("/* m.get_reply_type_alias() */")
             print("%s" % m.get_reply_type_alias())
             continue
+        if m.is_stream:
+            continue
         if add_debug_comments:
             print("/* m.get_req_template_instantiation() */")
         print("%s" % m.get_req_template_instantiation())
index c0e0cdc..25df5b7 100644 (file)
 #include <vapi/vapi.hpp>
 #include <vapi/vpe.api.vapi.hpp>
 #include <vapi/interface.api.vapi.hpp>
+#include <vapi/mss_clamp.api.vapi.hpp>
 #include <fake.api.vapi.hpp>
 
 DEFINE_VAPI_MSG_IDS_VPE_API_JSON;
 DEFINE_VAPI_MSG_IDS_INTERFACE_API_JSON;
+DEFINE_VAPI_MSG_IDS_MSS_CLAMP_API_JSON;
 DEFINE_VAPI_MSG_IDS_FAKE_API_JSON;
 
 static char *app_name = nullptr;
@@ -144,6 +146,51 @@ START_TEST (test_loopbacks_1)
               sw_if_indexes[i]);
     }
 
+  { // new context
+    for (int i = 0; i < num_ifs; ++i)
+      {
+       Mss_clamp_enable_disable d (con);
+       auto &req = d.get_request ().get_payload ();
+       req.sw_if_index = sw_if_indexes[i];
+       req.ipv4_mss = 1420;
+       req.ipv4_direction = vapi_enum_mss_clamp_dir::MSS_CLAMP_DIR_RX;
+       auto rv = d.execute ();
+       ck_assert_int_eq (VAPI_OK, rv);
+       WAIT_FOR_RESPONSE (d, rv);
+       ck_assert_int_eq (VAPI_OK, rv);
+      }
+  }
+
+  { // new context
+    bool seen[num_ifs] = { 0 };
+    Mss_clamp_get d (con);
+    d.get_request ().get_payload ().sw_if_index = ~0;
+    auto rv = d.execute ();
+    ck_assert_int_eq (VAPI_OK, rv);
+    WAIT_FOR_RESPONSE (d, rv);
+    ck_assert_int_eq (VAPI_OK, rv);
+    auto &rs = d.get_result_set ();
+    for (auto &r : rs)
+      {
+       auto &p = r.get_payload ();
+       ck_assert_int_eq (p.ipv4_mss, 1420);
+       printf ("tcp-clamp: sw_if_idx %u ip4-mss %d dir %d\n", p.sw_if_index,
+               p.ipv4_mss, p.ipv4_direction);
+       for (int i = 0; i < num_ifs; ++i)
+         {
+           if (sw_if_indexes[i] == p.sw_if_index)
+             {
+               ck_assert_int_eq (0, seen[i]);
+               seen[i] = true;
+             }
+         }
+      }
+    for (int i = 0; i < num_ifs; ++i)
+      {
+       ck_assert_int_eq (1, seen[i]);
+      }
+  }
+
   { // new context
     bool seen[num_ifs] = {0};
     Sw_interface_dump d (con);
index 49c0417..ca47dd1 100644 (file)
@@ -118,8 +118,18 @@ bool vapi_requests_full (vapi_ctx_t ctx);
 size_t vapi_get_request_count (vapi_ctx_t ctx);
 size_t vapi_get_max_request_count (vapi_ctx_t ctx);
 u32 vapi_gen_req_context (vapi_ctx_t ctx);
-void vapi_store_request (vapi_ctx_t ctx, u32 context, bool is_dump,
-                        vapi_cb_t callback, void *callback_ctx);
+
+enum vapi_request_type
+{
+  VAPI_REQUEST_REG = 0,
+  VAPI_REQUEST_DUMP = 1,
+  VAPI_REQUEST_STREAM = 2,
+};
+
+void vapi_store_request (vapi_ctx_t ctx, u32 context,
+                        vapi_msg_id_t response_id,
+                        enum vapi_request_type type, vapi_cb_t callback,
+                        void *callback_ctx);
 int vapi_get_payload_offset (vapi_msg_id_t id);
 void (*vapi_get_swap_to_host_func (vapi_msg_id_t id)) (void *payload);
 void (*vapi_get_swap_to_be_func (vapi_msg_id_t id)) (void *payload);
index 4f29f95..00c234f 100644 (file)
@@ -158,6 +158,7 @@ class Message(object):
         self.header = None
         self.is_reply = json_parser.is_reply(self.name)
         self.is_event = json_parser.is_event(self.name)
+        self.is_stream = json_parser.is_stream(self.name)
         fields = []
         for header in get_msg_header_defs(
             struct_type_class, field_class, json_parser, logger
@@ -346,6 +347,7 @@ class JsonParser(object):
         self.types["string"] = simple_type_class("vl_api_string_t")
         self.replies = set()
         self.events = set()
+        self.streams = set()
         self.simple_type_class = simple_type_class
         self.enum_class = enum_class
         self.union_class = union_class
@@ -384,6 +386,8 @@ class JsonParser(object):
                 if "events" in self.services[k]:
                     for x in self.services[k]["events"]:
                         self.events.add(x)
+                if "stream_msg" in self.services[k]:
+                    self.streams.add(self.services[k]["stream_msg"])
             for e in j["enums"]:
                 name = e[0]
                 value_pairs = e[1:-1]
@@ -521,6 +525,20 @@ class JsonParser(object):
     def is_event(self, message):
         return message in self.events
 
+    def is_stream(self, message):
+        return message in self.streams
+
+    def has_stream_msg(self, message):
+        return (
+            message.name in self.services
+            and "stream_msg" in self.services[message.name]
+        )
+
+    def get_stream_msg(self, message):
+        if not self.has_stream_msg(message):
+            return None
+        return self.messages[self.services[message.name]["stream_msg"]]
+
     def get_reply(self, message):
         return self.messages[self.services[message]["reply"]]
 
@@ -532,13 +550,15 @@ class JsonParser(object):
             remove = []
             for n, m in j.items():
                 try:
-                    if not m.is_reply and not m.is_event:
+                    if not m.is_reply and not m.is_event and not m.is_stream:
                         try:
                             m.reply = self.get_reply(n)
+                            m.reply_is_stream = False
+                            m.has_stream_msg = self.has_stream_msg(m)
                             if "stream" in self.services[m.name]:
                                 m.reply_is_stream = self.services[m.name]["stream"]
-                            else:
-                                m.reply_is_stream = False
+                            if m.has_stream_msg:
+                                m.stream_msg = self.get_stream_msg(m)
                             m.reply.request = m
                         except:
                             raise ParseError("Cannot find reply to message `%s'" % n)