api: add new stream message convention 91/20191/15
authorOle Troan <ot@cisco.com>
Wed, 20 May 2020 13:47:06 +0000 (15:47 +0200)
committerNeale Ranns <nranns@cisco.com>
Mon, 25 May 2020 11:22:34 +0000 (11:22 +0000)
Instead of having to wrap dump/detail calls in control ping, send details messages in between a normal
reply / request pair. As expressed in the below service statement.

Example:

service {
  rpc map_domains_gets returns map_domains_get_reply
    stream map_domain_details;
};

define map_domains_get
{
  u32 client_index;
  u32 context;
  u32 cursor;
};

define map_domains_get_reply
{
  u32 context;
  i32 retval;
  u32 cursor;
};

To avoid blocking the main thread for too long, the replies are now sent in client message queue size
chunks. The reply message returns VNET_API_ERROR_EAGAIN when there is more to read.
The API handler must also include a "cursor" that is used to the next call to the get function.

API handler example:
  REPLY_AND_DETAILS_MACRO (VL_API_MAP_DOMAINS_GET_REPLY, mm->domains,
  ({
    send_domain_details (cursor, rp, mp->context);
  }));

The macro starts from cursor and iterates through the pool
until vl_api_process_may_suspend() returns true or the iteration
reaches the end of the list.

Client Example:

cursor = 0
d = []
while True:
    rv, details = map_domains_get(cursor=cursor)
    d += details
    if rv.retval == 0 or rv.retval != -165:
        break
    cursor = rv.cursor

or the convenience iterator:
for x in vpp.details_iter(vpp.api.map_domains_get):
  pass

or

list(details_iter(map_domains_get))

Change-Id: Iad9f6b41b0ef886adb584c97708dd91cf552749e
Type: feature
Signed-off-by: Ole Troan <ot@cisco.com>
src/plugins/map/map.api
src/plugins/map/map_api.c
src/plugins/map/test/test_map.py
src/tools/vppapigen/vppapigen.py
src/tools/vppapigen/vppapigen_json.py
src/vlibapi/api_helper_macros.h
src/vlibmemory/api.h
src/vnet/api_errno.h
src/vpp-api/python/vpp_papi/vpp_papi.py

index 79deac8..0ae1901 100644 (file)
@@ -13,7 +13,7 @@
  * limitations under the License.
  */
 
-option version = "4.1.1";
+option version = "4.2.1";
 
 import "vnet/ip/ip_types.api";
 import "vnet/interface_types.api";
@@ -91,8 +91,28 @@ autoreply define map_add_del_rule
 /** \brief Get list of map domains
     @param client_index - opaque cookie to identify the sender
 */
+service {
+  rpc map_domains_get returns map_domains_get_reply
+    stream map_domain_details;
+};
+
+define map_domains_get
+{
+  u32 client_index;
+  u32 context;
+  u32 cursor;
+};
+
+define map_domains_get_reply
+{
+  u32 context;
+  i32 retval;
+  u32 cursor;
+};
+
 define map_domain_dump
 {
+  option deprecated="v20.12";
   u32 client_index;
   u32 context;
 };
index 7327732..13f0552 100644 (file)
@@ -86,14 +86,48 @@ vl_api_map_add_del_rule_t_handler (vl_api_map_add_del_rule_t * mp)
 }
 
 static void
-vl_api_map_domain_dump_t_handler (vl_api_map_domain_dump_t * mp)
+send_domain_details (u32 map_domain_index, vl_api_registration_t * rp,
+                    u32 context)
 {
+  map_main_t *mm = &map_main;
   vl_api_map_domain_details_t *rmp;
+  map_domain_t *d = pool_elt_at_index (mm->domains, map_domain_index);
+
+  /* Make sure every field is initiated (or don't skip the clib_memset()) */
+  map_domain_extra_t *de =
+    vec_elt_at_index (mm->domain_extras, map_domain_index);
+  int tag_len = clib_min (ARRAY_LEN (rmp->tag), vec_len (de->tag) + 1);
+
+  /* *INDENT-OFF* */
+  REPLY_MACRO_DETAILS4(VL_API_MAP_DOMAIN_DETAILS, rp, context,
+  ({
+    rmp->domain_index = htonl (map_domain_index);
+    clib_memcpy (&rmp->ip6_prefix.address, &d->ip6_prefix,
+                sizeof (rmp->ip6_prefix.address));
+    clib_memcpy (&rmp->ip4_prefix.address, &d->ip4_prefix,
+                sizeof (rmp->ip4_prefix.address));
+    clib_memcpy (&rmp->ip6_src.address, &d->ip6_src,
+                sizeof (rmp->ip6_src.address));
+    rmp->ip6_prefix.len = d->ip6_prefix_len;
+    rmp->ip4_prefix.len = d->ip4_prefix_len;
+    rmp->ip6_src.len = d->ip6_src_len;
+    rmp->ea_bits_len = d->ea_bits_len;
+    rmp->psid_offset = d->psid_offset;
+    rmp->psid_length = d->psid_length;
+    rmp->flags = d->flags;
+    rmp->mtu = htons (d->mtu);
+    memcpy (rmp->tag, de->tag, tag_len - 1);
+    rmp->tag[tag_len - 1] = '\0';
+  }));
+  /* *INDENT-ON* */
+}
+
+static void
+vl_api_map_domain_dump_t_handler (vl_api_map_domain_dump_t * mp)
+{
   map_main_t *mm = &map_main;
-  map_domain_t *d;
-  map_domain_extra_t *de;
+  int i;
   vl_api_registration_t *reg;
-  u32 map_domain_index;
 
   if (pool_elts (mm->domains) == 0)
     return;
@@ -103,33 +137,28 @@ vl_api_map_domain_dump_t_handler (vl_api_map_domain_dump_t * mp)
     return;
 
   /* *INDENT-OFF* */
-  pool_foreach(d, mm->domains,
+  pool_foreach_index(i, mm->domains,
   ({
-    map_domain_index = d - mm->domains;
-    de = vec_elt_at_index(mm->domain_extras, map_domain_index);
-    int tag_len = clib_min(ARRAY_LEN(rmp->tag), vec_len(de->tag) + 1);
-
-    /* Make sure every field is initiated (or don't skip the clib_memset()) */
-    rmp = vl_msg_api_alloc (sizeof (*rmp) + tag_len);
-
-    rmp->_vl_msg_id = htons(VL_API_MAP_DOMAIN_DETAILS + mm->msg_id_base);
-    rmp->context = mp->context;
-    rmp->domain_index = htonl(map_domain_index);
-    clib_memcpy(&rmp->ip6_prefix.address, &d->ip6_prefix, sizeof(rmp->ip6_prefix.address));
-    clib_memcpy(&rmp->ip4_prefix.address, &d->ip4_prefix, sizeof(rmp->ip4_prefix.address));
-    clib_memcpy(&rmp->ip6_src.address, &d->ip6_src, sizeof(rmp->ip6_src.address));
-    rmp->ip6_prefix.len = d->ip6_prefix_len;
-    rmp->ip4_prefix.len = d->ip4_prefix_len;
-    rmp->ip6_src.len = d->ip6_src_len;
-    rmp->ea_bits_len = d->ea_bits_len;
-    rmp->psid_offset = d->psid_offset;
-    rmp->psid_length = d->psid_length;
-    rmp->flags = d->flags;
-    rmp->mtu = htons(d->mtu);
-    memcpy(rmp->tag, de->tag, tag_len-1);
-    rmp->tag[tag_len-1] = '\0';
+    send_domain_details(i, reg, mp->context);
+  }));
+  /* *INDENT-ON* */
+}
 
-    vl_api_send_msg (reg, (u8 *) rmp);
+static void
+vl_api_map_domains_get_t_handler (vl_api_map_domains_get_t * mp)
+{
+  map_main_t *mm = &map_main;
+  vl_api_map_domains_get_reply_t *rmp;
+
+  i32 rv = 0;
+
+  if (pool_elts (mm->domains) == 0)
+    return;
+
+  /* *INDENT-OFF* */
+  REPLY_AND_DETAILS_MACRO (VL_API_MAP_DOMAINS_GET_REPLY, mm->domains,
+  ({
+    send_domain_details (cursor, rp, mp->context);
   }));
   /* *INDENT-ON* */
 }
index 59c2333..93ea3f0 100644 (file)
@@ -100,6 +100,48 @@ class TestMAP(VppTestCase):
         self.assertEqual(rv[0].tag, tag,
                          "output produced incorrect tag value.")
 
+    def create_domains(self, ip4_pfx_str, ip6_pfx_str, ip6_src_str):
+        ip4_pfx = ipaddress.ip_network(ip4_pfx_str)
+        ip6_dst = ipaddress.ip_network(ip6_pfx_str)
+        mod = ip4_pfx.num_addresses / 1024
+        indicies = []
+        for i in range(ip4_pfx.num_addresses):
+            rv = self.vapi.map_add_domain(ip6_prefix=ip6_pfx_str,
+                                          ip4_prefix=str(ip4_pfx[i]) + "/32",
+                                          ip6_src=ip6_src_str)
+            indicies.append(rv.index)
+        return indicies
+
+    def test_api_map_domains_get(self):
+        # Create a bunch of domains
+        domains = self.create_domains('130.67.0.0/24', '2001::/32',
+                                      '2001::1/128')
+        self.assertEqual(len(domains), 256)
+
+        d = []
+        cursor = 0
+
+        # Invalid cursor
+        rv, details = self.vapi.map_domains_get(cursor=1234)
+        self.assertEqual(rv.retval, -7)
+
+        # Delete a domain in the middle of walk
+        rv, details = self.vapi.map_domains_get(cursor=0)
+        self.assertEqual(rv.retval, -165)
+        self.vapi.map_del_domain(index=rv.cursor)
+        domains.remove(rv.cursor)
+
+        # Continue at point of deleted cursor
+        rv, details = self.vapi.map_domains_get(cursor=rv.cursor)
+        self.assertEqual(rv.retval, -165)
+
+        d = list(self.vapi.vpp.details_iter(self.vapi.map_domains_get))
+        self.assertEqual(len(d), 255)
+
+        # Clean up
+        for i in domains:
+            self.vapi.map_del_domain(index=i)
+
     def test_map_e_udp(self):
         """ MAP-E UDP"""
 
@@ -916,5 +958,6 @@ class TestMAP(VppTestCase):
                                                 ip6_nh_address="4001::1",
                                                 is_add=0)
 
+
 if __name__ == '__main__':
     unittest.main(testRunner=VppTestRunner)
index 06bfbff..94e770e 100755 (executable)
@@ -176,10 +176,11 @@ def vla_is_last_check(name, block):
 
 
 class Service():
-    def __init__(self, caller, reply, events=None, stream=False):
+    def __init__(self, caller, reply, events=None, stream_message=None, stream=False):
         self.caller = caller
         self.reply = reply
         self.stream = stream
+        self.stream_message = stream_message
         self.events = [] if events is None else events
 
 
@@ -511,6 +512,10 @@ class VPPAPIParser(object):
         else:
             p[0] = Service(p[2], p[4])
 
+    def p_service_statement2(self, p):
+        '''service_statement : RPC ID RETURNS ID STREAM ID ';' '''
+        p[0] = Service(p[2], p[4], stream_message=p[6], stream=True)
+
     def p_event_list(self, p):
         '''event_list : events
                       | event_list events '''
index 35dcbca..6e7aaa2 100644 (file)
@@ -26,6 +26,8 @@ def walk_services(s):
         d = {'reply': e.reply}
         if e.stream:
             d['stream'] = True
+        if e.stream_message:
+            d['stream_msg'] = e.stream_message
         if e.events:
             d['events'] = e.events
         r[e.caller] = d
index b19d4f9..5750257 100644 (file)
@@ -90,6 +90,15 @@ do {                                                                    \
     vl_api_send_msg (rp, (u8 *)rmp);                                    \
 } while(0);
 
+#define REPLY_MACRO_DETAILS4(t, rp, context, body)                     \
+do {                                                                    \
+    rmp = vl_msg_api_alloc (sizeof (*rmp));                             \
+    rmp->_vl_msg_id = htons((t)+(REPLY_MSG_ID_BASE));                   \
+    rmp->context = context;                                             \
+    do {body;} while (0);                                               \
+    vl_api_send_msg (rp, (u8 *)rmp);                                    \
+} while(0);
+
 #define REPLY_MACRO3(t, n, body)                                        \
 do {                                                                    \
     vl_api_registration_t *rp;                                          \
@@ -153,6 +162,34 @@ do {                                                                    \
     vl_api_send_msg (rp, (u8 *)rmp);                                    \
 } while(0);
 
+#define REPLY_AND_DETAILS_MACRO(t, p, body)                    \
+do {                                                           \
+  vl_api_registration_t *rp;                                   \
+  rp = vl_api_client_index_to_registration (mp->client_index); \
+  if (rp == 0)                                                 \
+    return;                                                    \
+  u32 cursor = clib_net_to_host_u32 (mp->cursor);              \
+  vlib_main_t *vm = vlib_get_main ();                          \
+  f64 start = vlib_time_now (vm);                              \
+  if (pool_is_free_index (p, cursor)) {                                \
+    cursor = pool_next_index (p, cursor);                      \
+    if (cursor == ~0)                                          \
+      rv = VNET_API_ERROR_INVALID_VALUE;                       \
+  }                                                            \
+  while (cursor != ~0) {                                       \
+    do {body;} while (0);                                      \
+    cursor = pool_next_index (p, cursor);                      \
+    if (vl_api_process_may_suspend (vm, rp, start)) {          \
+      if (cursor != ~0)                                                \
+        rv = VNET_API_ERROR_EAGAIN;                            \
+      break;                                                   \
+    }                                                          \
+  }                                                            \
+  REPLY_MACRO2 (t, ({                                          \
+    rmp->cursor = clib_host_to_net_u32 (cursor);               \
+  }));                                                         \
+} while(0);
+
 /* "trust, but verify" */
 
 static inline uword
index 6cd645b..6628053 100644 (file)
@@ -53,6 +53,28 @@ vl_api_can_send_msg (vl_api_registration_t * rp)
     return vl_mem_api_can_send (rp->vl_input_queue);
 }
 
+/*
+ * Suggests to an API handler to relinguish control. Currently limits
+ * an handler to a maximum of 1ms or it earlier if the client queue is
+ * full.
+ *
+ * May be enhanced in the future based on other performance
+ * characteristics of the main thread.
+ */
+#define VL_API_MAX_TIME_IN_HANDLER 0.001       /* 1 ms */
+always_inline int
+vl_api_process_may_suspend (vlib_main_t * vm, vl_api_registration_t * rp,
+                           f64 start)
+{
+  /* Is client queue full (leave space for reply message) */
+  if (rp->registration_type <= REGISTRATION_TYPE_SHMEM &&
+      rp->vl_input_queue->cursize + 1 >= rp->vl_input_queue->maxsize)
+    return true;
+  if (vlib_time_now (vm) > start + VL_API_MAX_TIME_IN_HANDLER)
+    return true;
+  return false;
+}
+
 always_inline vl_api_registration_t *
 vl_api_client_index_to_registration (u32 index)
 {
index 15b17a8..dd206ce 100644 (file)
@@ -156,6 +156,7 @@ _(MISSING_CERT_KEY, -161, "Missing certifcate or key")                  \
 _(LIMIT_EXCEEDED, -162, "limit exceeded")                               \
 _(IKE_NO_PORT, -163, "port not managed by IKE")                         \
 _(UDP_PORT_TAKEN, -164, "UDP port already taken")                       \
+_(EAGAIN, -165, "Retry stream call with cursor")                        \
 
 typedef enum
 {
index 6c17fa8..1921687 100644 (file)
@@ -472,12 +472,7 @@ class VPPApiClient(object):
 
                 # Create function for client side messages.
                 if name in self.services:
-                    if 'stream' in self.services[name] and \
-                       self.services[name]['stream']:
-                        multipart = True
-                    else:
-                        multipart = False
-                    f = self.make_function(msg, i, multipart, do_async)
+                    f = self.make_function(msg, i, self.services[name], do_async)
                     setattr(self._api, name, FuncWrapper(f))
             else:
                 self.logger.debug(
@@ -644,7 +639,7 @@ class VPPApiClient(object):
                                                            n[1]['avg'], n[1]['max'])
         return s
 
-    def _call_vpp(self, i, msgdef, multipart, **kwargs):
+    def _call_vpp(self, i, msgdef, service, **kwargs):
         """Given a message, send the message and await a reply.
 
         msgdef - the message packing definition
@@ -686,10 +681,21 @@ class VPPApiClient(object):
 
         self.transport.write(b)
 
-        if multipart:
-            # Send a ping after the request - we use its response
-            # to detect that we have seen all results.
-            self._control_ping(context)
+        msgreply = service['reply']
+        stream = True if 'stream' in service else False
+        if stream:
+            if 'stream_msg' in service:
+                # New service['reply'] = _reply and service['stream_message'] = _details
+                stream_message = service['stream_msg']
+                modern =True
+            else:
+                # Old  service['reply'] = _details
+                stream_message = msgreply
+                msgreply = 'control_ping_reply'
+                modern = False
+                # Send a ping after the request - we use its response
+                # to detect that we have seen all results.
+                self._control_ping(context)
 
         # Block until we get a reply.
         rl = []
@@ -702,11 +708,14 @@ class VPPApiClient(object):
                 # Message being queued
                 self.message_queue.put_nowait(r)
                 continue
-
-            if not multipart:
+            if msgname != msgreply and (stream and (msgname != stream_message)):
+                print('REPLY MISMATCH', msgreply, msgname, stream_message, stream)
+            if not stream:
                 rl = r
                 break
-            if msgname == 'control_ping_reply':
+            if msgname == msgreply:
+                if modern: # Return both reply and list
+                    rl = r, rl
                 break
 
             rl.append(r)
@@ -847,6 +856,19 @@ class VPPApiClient(object):
                    self.logger, self.read_timeout, self.use_socket,
                    self.server_address)
 
+    def details_iter(self, f, **kwargs):
+        cursor = 0
+        while True:
+            kwargs['cursor'] = cursor
+            rv, details = f(**kwargs)
+            #
+            # Convert to yield from details when we only support python 3
+            #
+            for d in details:
+                yield d
+            if rv.retval == 0 or rv.retval != -165:
+                break
+            cursor = rv.cursor
 
 # Provide the old name for backward compatibility.
 VPP = VPPApiClient