Instead of having to wrap dump/detail calls in control ping, send details messages in between a normal
reply / request pair. As expressed in the below service statement.
Example:
service {
rpc map_domains_gets returns map_domains_get_reply
stream map_domain_details;
};
define map_domains_get
{
u32 client_index;
u32 context;
u32 cursor;
};
define map_domains_get_reply
{
u32 context;
i32 retval;
u32 cursor;
};
To avoid blocking the main thread for too long, the replies are now sent in client message queue size
chunks. The reply message returns VNET_API_ERROR_EAGAIN when there is more to read.
The API handler must also include a "cursor" that is used to the next call to the get function.
API handler example:
REPLY_AND_DETAILS_MACRO (VL_API_MAP_DOMAINS_GET_REPLY, mm->domains,
({
send_domain_details (cursor, rp, mp->context);
}));
The macro starts from cursor and iterates through the pool
until vl_api_process_may_suspend() returns true or the iteration
reaches the end of the list.
Client Example:
cursor = 0
d = []
while True:
rv, details = map_domains_get(cursor=cursor)
d += details
if rv.retval == 0 or rv.retval != -165:
break
cursor = rv.cursor
or the convenience iterator:
for x in vpp.details_iter(vpp.api.map_domains_get):
pass
or
list(details_iter(map_domains_get))
Change-Id: Iad9f6b41b0ef886adb584c97708dd91cf552749e
Type: feature
Signed-off-by: Ole Troan <ot@cisco.com>
* limitations under the License.
*/
-option version = "4.1.1";
+option version = "4.2.1";
import "vnet/ip/ip_types.api";
import "vnet/interface_types.api";
/** \brief Get list of map domains
@param client_index - opaque cookie to identify the sender
*/
+service {
+ rpc map_domains_get returns map_domains_get_reply
+ stream map_domain_details;
+};
+
+define map_domains_get
+{
+ u32 client_index;
+ u32 context;
+ u32 cursor;
+};
+
+define map_domains_get_reply
+{
+ u32 context;
+ i32 retval;
+ u32 cursor;
+};
+
define map_domain_dump
{
+ option deprecated="v20.12";
u32 client_index;
u32 context;
};
}
static void
-vl_api_map_domain_dump_t_handler (vl_api_map_domain_dump_t * mp)
+send_domain_details (u32 map_domain_index, vl_api_registration_t * rp,
+ u32 context)
{
+ map_main_t *mm = &map_main;
vl_api_map_domain_details_t *rmp;
+ map_domain_t *d = pool_elt_at_index (mm->domains, map_domain_index);
+
+ /* Make sure every field is initiated (or don't skip the clib_memset()) */
+ map_domain_extra_t *de =
+ vec_elt_at_index (mm->domain_extras, map_domain_index);
+ int tag_len = clib_min (ARRAY_LEN (rmp->tag), vec_len (de->tag) + 1);
+
+ /* *INDENT-OFF* */
+ REPLY_MACRO_DETAILS4(VL_API_MAP_DOMAIN_DETAILS, rp, context,
+ ({
+ rmp->domain_index = htonl (map_domain_index);
+ clib_memcpy (&rmp->ip6_prefix.address, &d->ip6_prefix,
+ sizeof (rmp->ip6_prefix.address));
+ clib_memcpy (&rmp->ip4_prefix.address, &d->ip4_prefix,
+ sizeof (rmp->ip4_prefix.address));
+ clib_memcpy (&rmp->ip6_src.address, &d->ip6_src,
+ sizeof (rmp->ip6_src.address));
+ rmp->ip6_prefix.len = d->ip6_prefix_len;
+ rmp->ip4_prefix.len = d->ip4_prefix_len;
+ rmp->ip6_src.len = d->ip6_src_len;
+ rmp->ea_bits_len = d->ea_bits_len;
+ rmp->psid_offset = d->psid_offset;
+ rmp->psid_length = d->psid_length;
+ rmp->flags = d->flags;
+ rmp->mtu = htons (d->mtu);
+ memcpy (rmp->tag, de->tag, tag_len - 1);
+ rmp->tag[tag_len - 1] = '\0';
+ }));
+ /* *INDENT-ON* */
+}
+
+static void
+vl_api_map_domain_dump_t_handler (vl_api_map_domain_dump_t * mp)
+{
map_main_t *mm = &map_main;
- map_domain_t *d;
- map_domain_extra_t *de;
+ int i;
vl_api_registration_t *reg;
- u32 map_domain_index;
if (pool_elts (mm->domains) == 0)
return;
return;
/* *INDENT-OFF* */
- pool_foreach(d, mm->domains,
+ pool_foreach_index(i, mm->domains,
({
- map_domain_index = d - mm->domains;
- de = vec_elt_at_index(mm->domain_extras, map_domain_index);
- int tag_len = clib_min(ARRAY_LEN(rmp->tag), vec_len(de->tag) + 1);
-
- /* Make sure every field is initiated (or don't skip the clib_memset()) */
- rmp = vl_msg_api_alloc (sizeof (*rmp) + tag_len);
-
- rmp->_vl_msg_id = htons(VL_API_MAP_DOMAIN_DETAILS + mm->msg_id_base);
- rmp->context = mp->context;
- rmp->domain_index = htonl(map_domain_index);
- clib_memcpy(&rmp->ip6_prefix.address, &d->ip6_prefix, sizeof(rmp->ip6_prefix.address));
- clib_memcpy(&rmp->ip4_prefix.address, &d->ip4_prefix, sizeof(rmp->ip4_prefix.address));
- clib_memcpy(&rmp->ip6_src.address, &d->ip6_src, sizeof(rmp->ip6_src.address));
- rmp->ip6_prefix.len = d->ip6_prefix_len;
- rmp->ip4_prefix.len = d->ip4_prefix_len;
- rmp->ip6_src.len = d->ip6_src_len;
- rmp->ea_bits_len = d->ea_bits_len;
- rmp->psid_offset = d->psid_offset;
- rmp->psid_length = d->psid_length;
- rmp->flags = d->flags;
- rmp->mtu = htons(d->mtu);
- memcpy(rmp->tag, de->tag, tag_len-1);
- rmp->tag[tag_len-1] = '\0';
+ send_domain_details(i, reg, mp->context);
+ }));
+ /* *INDENT-ON* */
+}
- vl_api_send_msg (reg, (u8 *) rmp);
+static void
+vl_api_map_domains_get_t_handler (vl_api_map_domains_get_t * mp)
+{
+ map_main_t *mm = &map_main;
+ vl_api_map_domains_get_reply_t *rmp;
+
+ i32 rv = 0;
+
+ if (pool_elts (mm->domains) == 0)
+ return;
+
+ /* *INDENT-OFF* */
+ REPLY_AND_DETAILS_MACRO (VL_API_MAP_DOMAINS_GET_REPLY, mm->domains,
+ ({
+ send_domain_details (cursor, rp, mp->context);
}));
/* *INDENT-ON* */
}
self.assertEqual(rv[0].tag, tag,
"output produced incorrect tag value.")
+ def create_domains(self, ip4_pfx_str, ip6_pfx_str, ip6_src_str):
+ ip4_pfx = ipaddress.ip_network(ip4_pfx_str)
+ ip6_dst = ipaddress.ip_network(ip6_pfx_str)
+ mod = ip4_pfx.num_addresses / 1024
+ indicies = []
+ for i in range(ip4_pfx.num_addresses):
+ rv = self.vapi.map_add_domain(ip6_prefix=ip6_pfx_str,
+ ip4_prefix=str(ip4_pfx[i]) + "/32",
+ ip6_src=ip6_src_str)
+ indicies.append(rv.index)
+ return indicies
+
+ def test_api_map_domains_get(self):
+ # Create a bunch of domains
+ domains = self.create_domains('130.67.0.0/24', '2001::/32',
+ '2001::1/128')
+ self.assertEqual(len(domains), 256)
+
+ d = []
+ cursor = 0
+
+ # Invalid cursor
+ rv, details = self.vapi.map_domains_get(cursor=1234)
+ self.assertEqual(rv.retval, -7)
+
+ # Delete a domain in the middle of walk
+ rv, details = self.vapi.map_domains_get(cursor=0)
+ self.assertEqual(rv.retval, -165)
+ self.vapi.map_del_domain(index=rv.cursor)
+ domains.remove(rv.cursor)
+
+ # Continue at point of deleted cursor
+ rv, details = self.vapi.map_domains_get(cursor=rv.cursor)
+ self.assertEqual(rv.retval, -165)
+
+ d = list(self.vapi.vpp.details_iter(self.vapi.map_domains_get))
+ self.assertEqual(len(d), 255)
+
+ # Clean up
+ for i in domains:
+ self.vapi.map_del_domain(index=i)
+
def test_map_e_udp(self):
""" MAP-E UDP"""
ip6_nh_address="4001::1",
is_add=0)
+
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)
class Service():
- def __init__(self, caller, reply, events=None, stream=False):
+ def __init__(self, caller, reply, events=None, stream_message=None, stream=False):
self.caller = caller
self.reply = reply
self.stream = stream
+ self.stream_message = stream_message
self.events = [] if events is None else events
else:
p[0] = Service(p[2], p[4])
+ def p_service_statement2(self, p):
+ '''service_statement : RPC ID RETURNS ID STREAM ID ';' '''
+ p[0] = Service(p[2], p[4], stream_message=p[6], stream=True)
+
def p_event_list(self, p):
'''event_list : events
| event_list events '''
d = {'reply': e.reply}
if e.stream:
d['stream'] = True
+ if e.stream_message:
+ d['stream_msg'] = e.stream_message
if e.events:
d['events'] = e.events
r[e.caller] = d
vl_api_send_msg (rp, (u8 *)rmp); \
} while(0);
+#define REPLY_MACRO_DETAILS4(t, rp, context, body) \
+do { \
+ rmp = vl_msg_api_alloc (sizeof (*rmp)); \
+ rmp->_vl_msg_id = htons((t)+(REPLY_MSG_ID_BASE)); \
+ rmp->context = context; \
+ do {body;} while (0); \
+ vl_api_send_msg (rp, (u8 *)rmp); \
+} while(0);
+
#define REPLY_MACRO3(t, n, body) \
do { \
vl_api_registration_t *rp; \
vl_api_send_msg (rp, (u8 *)rmp); \
} while(0);
+#define REPLY_AND_DETAILS_MACRO(t, p, body) \
+do { \
+ vl_api_registration_t *rp; \
+ rp = vl_api_client_index_to_registration (mp->client_index); \
+ if (rp == 0) \
+ return; \
+ u32 cursor = clib_net_to_host_u32 (mp->cursor); \
+ vlib_main_t *vm = vlib_get_main (); \
+ f64 start = vlib_time_now (vm); \
+ if (pool_is_free_index (p, cursor)) { \
+ cursor = pool_next_index (p, cursor); \
+ if (cursor == ~0) \
+ rv = VNET_API_ERROR_INVALID_VALUE; \
+ } \
+ while (cursor != ~0) { \
+ do {body;} while (0); \
+ cursor = pool_next_index (p, cursor); \
+ if (vl_api_process_may_suspend (vm, rp, start)) { \
+ if (cursor != ~0) \
+ rv = VNET_API_ERROR_EAGAIN; \
+ break; \
+ } \
+ } \
+ REPLY_MACRO2 (t, ({ \
+ rmp->cursor = clib_host_to_net_u32 (cursor); \
+ })); \
+} while(0);
+
/* "trust, but verify" */
static inline uword
return vl_mem_api_can_send (rp->vl_input_queue);
}
+/*
+ * Suggests to an API handler to relinguish control. Currently limits
+ * an handler to a maximum of 1ms or it earlier if the client queue is
+ * full.
+ *
+ * May be enhanced in the future based on other performance
+ * characteristics of the main thread.
+ */
+#define VL_API_MAX_TIME_IN_HANDLER 0.001 /* 1 ms */
+always_inline int
+vl_api_process_may_suspend (vlib_main_t * vm, vl_api_registration_t * rp,
+ f64 start)
+{
+ /* Is client queue full (leave space for reply message) */
+ if (rp->registration_type <= REGISTRATION_TYPE_SHMEM &&
+ rp->vl_input_queue->cursize + 1 >= rp->vl_input_queue->maxsize)
+ return true;
+ if (vlib_time_now (vm) > start + VL_API_MAX_TIME_IN_HANDLER)
+ return true;
+ return false;
+}
+
always_inline vl_api_registration_t *
vl_api_client_index_to_registration (u32 index)
{
_(LIMIT_EXCEEDED, -162, "limit exceeded") \
_(IKE_NO_PORT, -163, "port not managed by IKE") \
_(UDP_PORT_TAKEN, -164, "UDP port already taken") \
+_(EAGAIN, -165, "Retry stream call with cursor") \
typedef enum
{
# Create function for client side messages.
if name in self.services:
- if 'stream' in self.services[name] and \
- self.services[name]['stream']:
- multipart = True
- else:
- multipart = False
- f = self.make_function(msg, i, multipart, do_async)
+ f = self.make_function(msg, i, self.services[name], do_async)
setattr(self._api, name, FuncWrapper(f))
else:
self.logger.debug(
n[1]['avg'], n[1]['max'])
return s
- def _call_vpp(self, i, msgdef, multipart, **kwargs):
+ def _call_vpp(self, i, msgdef, service, **kwargs):
"""Given a message, send the message and await a reply.
msgdef - the message packing definition
self.transport.write(b)
- if multipart:
- # Send a ping after the request - we use its response
- # to detect that we have seen all results.
- self._control_ping(context)
+ msgreply = service['reply']
+ stream = True if 'stream' in service else False
+ if stream:
+ if 'stream_msg' in service:
+ # New service['reply'] = _reply and service['stream_message'] = _details
+ stream_message = service['stream_msg']
+ modern =True
+ else:
+ # Old service['reply'] = _details
+ stream_message = msgreply
+ msgreply = 'control_ping_reply'
+ modern = False
+ # Send a ping after the request - we use its response
+ # to detect that we have seen all results.
+ self._control_ping(context)
# Block until we get a reply.
rl = []
# Message being queued
self.message_queue.put_nowait(r)
continue
-
- if not multipart:
+ if msgname != msgreply and (stream and (msgname != stream_message)):
+ print('REPLY MISMATCH', msgreply, msgname, stream_message, stream)
+ if not stream:
rl = r
break
- if msgname == 'control_ping_reply':
+ if msgname == msgreply:
+ if modern: # Return both reply and list
+ rl = r, rl
break
rl.append(r)
self.logger, self.read_timeout, self.use_socket,
self.server_address)
+ def details_iter(self, f, **kwargs):
+ cursor = 0
+ while True:
+ kwargs['cursor'] = cursor
+ rv, details = f(**kwargs)
+ #
+ # Convert to yield from details when we only support python 3
+ #
+ for d in details:
+ yield d
+ if rv.retval == 0 or rv.retval != -165:
+ break
+ cursor = rv.cursor
# Provide the old name for backward compatibility.
VPP = VPPApiClient