From 630ab5846bceddf8d663e9f488a2dc0378949827 Mon Sep 17 00:00:00 2001 From: Klement Sekera Date: Fri, 19 Jul 2019 09:14:19 +0000 Subject: [PATCH] ip: reassembly: send packet out on correct worker Note which worker received fragment with offset zero and use this worker to send out the reassembled packet. Type: fix Change-Id: I1d3cee16788db3b230682525239c0100d51dc380 Signed-off-by: Klement Sekera --- src/vnet/ip/ip4_reassembly.c | 207 ++++++++++++++++++------- src/vnet/ip/ip6_reassembly.c | 172 ++++++++++++++++----- test/framework.py | 5 +- test/test_reassembly.py | 354 ++++++++++++++++++++++++++++++++++++++++++- 4 files changed, 642 insertions(+), 96 deletions(-) diff --git a/src/vnet/ip/ip4_reassembly.c b/src/vnet/ip/ip4_reassembly.c index b82bafeac54..7159b8a5232 100644 --- a/src/vnet/ip/ip4_reassembly.c +++ b/src/vnet/ip/ip4_reassembly.c @@ -25,6 +25,7 @@ #include #include #include +#include #define MSEC_PER_SEC 1000 #define IP4_REASS_TIMEOUT_DEFAULT_MS 100 @@ -61,6 +62,7 @@ typedef enum IP4_REASS_RC_TOO_MANY_FRAGMENTS, IP4_REASS_RC_INTERNAL_ERROR, IP4_REASS_RC_NO_BUF, + IP4_REASS_RC_HANDOFF, } ip4_reass_rc_t; typedef struct @@ -85,7 +87,7 @@ typedef union struct { u32 reass_index; - u32 thread_index; + u32 memory_owner_thread_index; }; u64 as_u64; } ip4_reass_val_t; @@ -139,10 +141,16 @@ typedef struct u16 min_fragment_length; // number of fragments in this reassembly u32 fragments_n; + // thread owning memory for this context (whose pool contains this ctx) + u32 memory_owner_thread_index; + // thread which received fragment with offset 0 and which sends out the + // completed reassembly + u32 sendout_thread_index; } ip4_reass_t; typedef struct { + // pool of reassembly contexts ip4_reass_t *pool; u32 reass_n; u32 id_counter; @@ -167,7 +175,6 @@ typedef struct // convenience vlib_main_t *vlib_main; - vnet_main_t *vnet_main; // node index of ip4-drop node u32 ip4_drop_idx; @@ -176,7 +183,6 @@ typedef struct /** Worker handoff */ u32 fq_index; u32 fq_feature_index; - } ip4_reass_main_t; extern ip4_reass_main_t ip4_reass_main; @@ -200,6 +206,7 @@ typedef enum RANGE_DISCARD, RANGE_OVERLAP, FINALIZE, + HANDOFF, } ip4_reass_trace_operation_e; typedef struct @@ -219,6 +226,8 @@ typedef struct ip4_reass_range_trace_t trace_range; u32 size_diff; u32 op_id; + u32 thread_id; + u32 thread_id_to; u32 fragment_first; u32 fragment_last; u32 total_data_len; @@ -256,11 +265,17 @@ format_ip4_reass_trace (u8 * s, va_list * args) CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *); CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *); ip4_reass_trace_t *t = va_arg (*args, ip4_reass_trace_t *); - s = format (s, "reass id: %u, op id: %u ", t->reass_id, t->op_id); - u32 indent = format_get_indent (s); - s = format (s, "first bi: %u, data len: %u, ip/fragment[%u, %u]", - t->trace_range.first_bi, t->total_data_len, t->fragment_first, - t->fragment_last); + u32 indent = 0; + if (~0 != t->reass_id) + { + s = format (s, "reass id: %u, op id: %u, ", t->reass_id, t->op_id); + indent = format_get_indent (s); + s = + format (s, + "first bi: %u, data len: %u, ip/fragment[%u, %u]", + t->trace_range.first_bi, t->total_data_len, t->fragment_first, + t->fragment_last); + } switch (t->action) { case RANGE_SHRINK: @@ -283,28 +298,36 @@ format_ip4_reass_trace (u8 * s, va_list * args) case FINALIZE: s = format (s, "\n%Ufinalize reassembly", format_white_space, indent); break; + case HANDOFF: + s = + format (s, "handoff from thread #%u to thread #%u", t->thread_id, + t->thread_id_to); + break; } return s; } static void ip4_reass_add_trace (vlib_main_t * vm, vlib_node_runtime_t * node, - ip4_reass_main_t * rm, ip4_reass_t * reass, u32 bi, - ip4_reass_trace_operation_e action, u32 size_diff) + ip4_reass_main_t * rm, u32 reass_id, u32 op_id, + u32 bi, u32 first_bi, u32 data_len, + ip4_reass_trace_operation_e action, u32 size_diff, + u32 thread_id_to) { vlib_buffer_t *b = vlib_get_buffer (vm, bi); vnet_buffer_opaque_t *vnb = vnet_buffer (b); ip4_reass_trace_t *t = vlib_add_trace (vm, node, b, sizeof (t[0])); - t->reass_id = reass->id; + t->reass_id = reass_id; t->action = action; ip4_reass_trace_details (vm, bi, &t->trace_range); t->size_diff = size_diff; - t->op_id = reass->trace_op_counter; - ++reass->trace_op_counter; + t->op_id = op_id; + t->thread_id = vm->thread_index; + t->thread_id_to = thread_id_to; t->fragment_first = vnb->ip.reass.fragment_first; t->fragment_last = vnb->ip.reass.fragment_last; - t->trace_range.first_bi = reass->first_bi; - t->total_data_len = reass->data_len; + t->trace_range.first_bi = first_bi; + t->total_data_len = data_len; #if 0 static u8 *s = NULL; s = format (s, "%U", format_ip4_reass_trace, NULL, NULL, t); @@ -314,17 +337,22 @@ ip4_reass_add_trace (vlib_main_t * vm, vlib_node_runtime_t * node, #endif } +always_inline void +ip4_reass_free_ctx (ip4_reass_per_thread_t * rt, ip4_reass_t * reass) +{ + pool_put (rt->pool, reass); + --rt->reass_n; +} always_inline void -ip4_reass_free (ip4_reass_main_t * rm, ip4_reass_per_thread_t * rt, - ip4_reass_t * reass) +ip4_reass_free (vlib_main_t * vm, ip4_reass_main_t * rm, + ip4_reass_per_thread_t * rt, ip4_reass_t * reass) { clib_bihash_kv_16_8_t kv; kv.key[0] = reass->key.as_u64[0]; kv.key[1] = reass->key.as_u64[1]; clib_bihash_add_del_16_8 (&rm->hash, &kv, 0); - pool_put (rt->pool, reass); - --rt->reass_n; + return ip4_reass_free_ctx (rt, reass); } always_inline void @@ -396,23 +424,30 @@ ip4_reass_find_or_create (vlib_main_t * vm, vlib_node_runtime_t * node, ip4_reass_main_t * rm, ip4_reass_per_thread_t * rt, ip4_reass_kv_t * kv, u8 * do_handoff) { - ip4_reass_t *reass = NULL; - f64 now = vlib_time_now (rm->vlib_main); + ip4_reass_t *reass; + f64 now; +again: + + reass = NULL; + now = vlib_time_now (vm); if (!clib_bihash_search_16_8 (&rm->hash, (clib_bihash_kv_16_8_t *) kv, (clib_bihash_kv_16_8_t *) kv)) { - if (vm->thread_index != kv->v.thread_index) + reass = + pool_elt_at_index (rm->per_thread_data + [kv->v.memory_owner_thread_index].pool, + kv->v.reass_index); + if (vm->thread_index != reass->memory_owner_thread_index) { *do_handoff = 1; - return NULL; + return reass; } - reass = pool_elt_at_index (rt->pool, kv->v.reass_index); if (now > reass->last_heard + rm->timeout) { ip4_reass_drop_all (vm, node, rm, reass); - ip4_reass_free (rm, rt, reass); + ip4_reass_free (vm, rm, rt, reass); reass = NULL; } } @@ -433,6 +468,7 @@ ip4_reass_find_or_create (vlib_main_t * vm, vlib_node_runtime_t * node, pool_get (rt->pool, reass); clib_memset (reass, 0, sizeof (*reass)); reass->id = ((u64) vm->thread_index * 1000000000) + rt->id_counter; + reass->memory_owner_thread_index = vm->thread_index; ++rt->id_counter; reass->first_bi = ~0; reass->last_packet_octet = ~0; @@ -445,13 +481,18 @@ ip4_reass_find_or_create (vlib_main_t * vm, vlib_node_runtime_t * node, reass->key.as_u64[0] = ((clib_bihash_kv_16_8_t *) kv)->key[0]; reass->key.as_u64[1] = ((clib_bihash_kv_16_8_t *) kv)->key[1]; kv->v.reass_index = (reass - rt->pool); - kv->v.thread_index = vm->thread_index; + kv->v.memory_owner_thread_index = vm->thread_index; reass->last_heard = now; - if (clib_bihash_add_del_16_8 (&rm->hash, (clib_bihash_kv_16_8_t *) kv, 1)) + int rv = + clib_bihash_add_del_16_8 (&rm->hash, (clib_bihash_kv_16_8_t *) kv, 2); + if (rv) { - ip4_reass_free (rm, rt, reass); + ip4_reass_free_ctx (rt, reass); reass = NULL; + // if other worker created a context already work with the other copy + if (-2 == rv) + goto again; } return reass; @@ -615,7 +656,10 @@ ip4_reass_finalize (vlib_main_t * vm, vlib_node_runtime_t * node, first_b->flags &= ~VLIB_BUFFER_EXT_HDR_VALID; if (PREDICT_FALSE (first_b->flags & VLIB_BUFFER_IS_TRACED)) { - ip4_reass_add_trace (vm, node, rm, reass, reass->first_bi, FINALIZE, 0); + ip4_reass_add_trace (vm, node, rm, reass->id, reass->trace_op_counter, + reass->first_bi, reass->first_bi, reass->data_len, + FINALIZE, 0, ~0); + ++reass->trace_op_counter; #if 0 // following code does a hexdump of packet fragments to stdout ... do @@ -654,7 +698,7 @@ ip4_reass_finalize (vlib_main_t * vm, vlib_node_runtime_t * node, } vnet_buffer (first_b)->ip.reass.estimated_mtu = reass->min_fragment_length; *error0 = IP4_ERROR_NONE; - ip4_reass_free (rm, rt, reass); + ip4_reass_free (vm, rm, rt, reass); reass = NULL; return IP4_REASS_RC_OK; } @@ -728,8 +772,11 @@ ip4_reass_remove_range_from_chain (vlib_main_t * vm, u32 to_be_freed_bi = discard_bi; if (PREDICT_FALSE (discard_b->flags & VLIB_BUFFER_IS_TRACED)) { - ip4_reass_add_trace (vm, node, rm, reass, discard_bi, RANGE_DISCARD, - 0); + ip4_reass_add_trace (vm, node, rm, reass->id, + reass->trace_op_counter, discard_bi, + reass->first_bi, reass->data_len, + RANGE_DISCARD, 0, ~0); + ++reass->trace_op_counter; } if (discard_b->flags & VLIB_BUFFER_NEXT_PRESENT) { @@ -753,7 +800,7 @@ always_inline ip4_reass_rc_t ip4_reass_update (vlib_main_t * vm, vlib_node_runtime_t * node, ip4_reass_main_t * rm, ip4_reass_per_thread_t * rt, ip4_reass_t * reass, u32 * bi0, u32 * next0, u32 * error0, - bool is_custom_app) + bool is_custom_app, u32 * handoff_thread_idx) { ip4_reass_rc_t rc = IP4_REASS_RC_OK; int consumed = 0; @@ -794,7 +841,10 @@ ip4_reass_update (vlib_main_t * vm, vlib_node_runtime_t * node, } if (PREDICT_FALSE (fb->flags & VLIB_BUFFER_IS_TRACED)) { - ip4_reass_add_trace (vm, node, rm, reass, *bi0, RANGE_NEW, 0); + ip4_reass_add_trace (vm, node, rm, reass->id, + reass->trace_op_counter, *bi0, reass->first_bi, + reass->data_len, RANGE_NEW, 0, ~0); + ++reass->trace_op_counter; } *bi0 = ~0; reass->min_fragment_length = clib_net_to_host_u16 (fip->length); @@ -848,8 +898,11 @@ ip4_reass_update (vlib_main_t * vm, vlib_node_runtime_t * node, // this fragment is a (sub)part of existing range, ignore it if (PREDICT_FALSE (fb->flags & VLIB_BUFFER_IS_TRACED)) { - ip4_reass_add_trace (vm, node, rm, reass, *bi0, - RANGE_OVERLAP, 0); + ip4_reass_add_trace (vm, node, rm, reass->id, + reass->trace_op_counter, *bi0, + reass->first_bi, reass->data_len, + RANGE_OVERLAP, 0, ~0); + ++reass->trace_op_counter; } break; } @@ -868,9 +921,12 @@ ip4_reass_update (vlib_main_t * vm, vlib_node_runtime_t * node, reass->data_len -= overlap; if (PREDICT_FALSE (fb->flags & VLIB_BUFFER_IS_TRACED)) { - ip4_reass_add_trace (vm, node, rm, reass, - candidate_range_bi, RANGE_SHRINK, - overlap); + ip4_reass_add_trace (vm, node, rm, reass->id, + reass->trace_op_counter, + candidate_range_bi, + reass->first_bi, reass->data_len, + RANGE_SHRINK, 0, ~0); + ++reass->trace_op_counter; } rc = ip4_reass_insert_range_in_chain (vm, rm, rt, reass, @@ -961,14 +1017,24 @@ ip4_reass_update (vlib_main_t * vm, vlib_node_runtime_t * node, { if (PREDICT_FALSE (fb->flags & VLIB_BUFFER_IS_TRACED)) { - ip4_reass_add_trace (vm, node, rm, reass, *bi0, RANGE_NEW, 0); + ip4_reass_add_trace (vm, node, rm, reass->id, + reass->trace_op_counter, *bi0, reass->first_bi, + reass->data_len, RANGE_NEW, 0, ~0); + ++reass->trace_op_counter; } } if (~0 != reass->last_packet_octet && reass->data_len == reass->last_packet_octet + 1) { - return ip4_reass_finalize (vm, node, rm, rt, reass, bi0, next0, error0, - is_custom_app); + *handoff_thread_idx = reass->sendout_thread_index; + rc = + ip4_reass_finalize (vm, node, rm, rt, reass, bi0, next0, error0, + is_custom_app); + if (IP4_REASS_RC_OK == rc + && reass->memory_owner_thread_index != reass->sendout_thread_index) + { + rc = IP4_REASS_RC_HANDOFF; + } } else { @@ -1056,33 +1122,53 @@ ip4_reassembly_inline (vlib_main_t * vm, vlib_node_runtime_t * node, ip4_reass_t *reass = ip4_reass_find_or_create (vm, node, rm, rt, &kv, &do_handoff); - + if (reass) + { + const u32 fragment_first = + ip4_get_fragment_offset_bytes (ip0); + if (0 == fragment_first) + { + reass->sendout_thread_index = vm->thread_index; + } + } if (PREDICT_FALSE (do_handoff)) { next0 = IP4_REASSEMBLY_NEXT_HANDOFF; if (is_feature) vnet_buffer (b0)->ip. reass.owner_feature_thread_index = - kv.v.thread_index; + kv.v.memory_owner_thread_index; else vnet_buffer (b0)->ip.reass.owner_thread_index = - kv.v.thread_index; + kv.v.memory_owner_thread_index; } else if (reass) { + u32 handoff_thread_idx; switch (ip4_reass_update (vm, node, rm, rt, reass, &bi0, &next0, - &error0, is_custom_app)) + &error0, is_custom_app, &handoff_thread_idx)) { case IP4_REASS_RC_OK: /* nothing to do here */ break; + case IP4_REASS_RC_HANDOFF: + next0 = IP4_REASSEMBLY_NEXT_HANDOFF; + b0 = vlib_get_buffer (vm, bi0); + if (is_feature) + vnet_buffer (b0)->ip. + reass.owner_feature_thread_index = + handoff_thread_idx; + else + vnet_buffer (b0)->ip.reass.owner_thread_index = + handoff_thread_idx; + break; case IP4_REASS_RC_TOO_MANY_FRAGMENTS: vlib_node_increment_counter (vm, node->node_index, IP4_ERROR_REASS_FRAGMENT_CHAIN_TOO_LONG, 1); ip4_reass_drop_all (vm, node, rm, reass); - ip4_reass_free (rm, rt, reass); + ip4_reass_free (vm, rm, rt, reass); goto next_packet; break; case IP4_REASS_RC_NO_BUF: @@ -1090,7 +1176,7 @@ ip4_reassembly_inline (vlib_main_t * vm, vlib_node_runtime_t * node, IP4_ERROR_REASS_NO_BUF, 1); ip4_reass_drop_all (vm, node, rm, reass); - ip4_reass_free (rm, rt, reass); + ip4_reass_free (vm, rm, rt, reass); goto next_packet; break; case IP4_REASS_RC_INTERNAL_ERROR: @@ -1099,7 +1185,7 @@ ip4_reassembly_inline (vlib_main_t * vm, vlib_node_runtime_t * node, IP4_ERROR_REASS_INTERNAL_ERROR, 1); ip4_reass_drop_all (vm, node, rm, reass); - ip4_reass_free (rm, rt, reass); + ip4_reass_free (vm, rm, rt, reass); goto next_packet; break; } @@ -1119,7 +1205,24 @@ ip4_reassembly_inline (vlib_main_t * vm, vlib_node_runtime_t * node, to_next[0] = bi0; to_next += 1; n_left_to_next -= 1; - if (is_feature && IP4_ERROR_NONE == error0) + if (next0 == IP4_REASSEMBLY_NEXT_HANDOFF) + { + if (PREDICT_FALSE (b0->flags & VLIB_BUFFER_IS_TRACED)) + { + if (is_feature) + ip4_reass_add_trace (vm, node, rm, ~0, + ~0, + bi0, ~0, ~0, HANDOFF, 0, + vnet_buffer (b0)->ip. + reass.owner_feature_thread_index); + else + ip4_reass_add_trace (vm, node, rm, ~0, ~0, bi0, + ~0, ~0, HANDOFF, 0, + vnet_buffer (b0)->ip. + reass.owner_thread_index); + } + } + else if (is_feature && IP4_ERROR_NONE == error0) { b0 = vlib_get_buffer (vm, bi0); vnet_feature_next (&next0, b0); @@ -1318,7 +1421,6 @@ ip4_reass_init_function (vlib_main_t * vm) vlib_node_t *node; rm->vlib_main = vm; - rm->vnet_main = vnet_get_main (); vec_validate (rm->per_thread_data, vlib_num_workers ()); ip4_reass_per_thread_t *rt; @@ -1348,7 +1450,6 @@ ip4_reass_init_function (vlib_main_t * vm) rm->fq_feature_index = vlib_frame_queue_main_init (ip4_reass_node_feature.index, 0); - return error; } @@ -1410,7 +1511,7 @@ ip4_reass_walk_expired (vlib_main_t * vm, { ip4_reass_t *reass = pool_elt_at_index (rt->pool, i[0]); ip4_reass_drop_all (vm, node, rm, reass); - ip4_reass_free (rm, rt, reass); + ip4_reass_free (vm, rm, rt, reass); } /* *INDENT-ON* */ diff --git a/src/vnet/ip/ip6_reassembly.c b/src/vnet/ip/ip6_reassembly.c index 20bb7720526..a65697b15b2 100644 --- a/src/vnet/ip/ip6_reassembly.c +++ b/src/vnet/ip/ip6_reassembly.c @@ -39,6 +39,7 @@ typedef enum IP6_REASS_RC_INTERNAL_ERROR, IP6_REASS_RC_TOO_MANY_FRAGMENTS, IP6_REASS_RC_NO_BUF, + IP6_REASS_RC_HANDOFF, } ip6_reass_rc_t; typedef struct @@ -63,7 +64,7 @@ typedef union struct { u32 reass_index; - u32 thread_index; + u32 memory_owner_thread_index; }; u64 as_u64; } ip6_reass_val_t; @@ -118,6 +119,11 @@ typedef struct u16 min_fragment_length; // number of fragments for this reassembly u32 fragments_n; + // thread owning memory for this context (whose pool contains this ctx) + u32 memory_owner_thread_index; + // thread which received fragment with offset 0 and which sends out the + // completed reassembly + u32 sendout_thread_index; } ip6_reass_t; typedef struct @@ -147,7 +153,6 @@ typedef struct // convenience vlib_main_t *vlib_main; - vnet_main_t *vnet_main; // node index of ip6-drop node u32 ip6_drop_idx; @@ -183,6 +188,7 @@ typedef enum ICMP_ERROR_FL_TOO_BIG, ICMP_ERROR_FL_NOT_MULT_8, FINALIZE, + HANDOFF, } ip6_reass_trace_operation_e; typedef struct @@ -200,11 +206,12 @@ typedef struct ip6_reass_trace_operation_e action; u32 reass_id; ip6_reass_range_trace_t trace_range; - u32 size_diff; u32 op_id; u32 fragment_first; u32 fragment_last; u32 total_data_len; + u32 thread_id; + u32 thread_id_to; } ip6_reass_trace_t; static void @@ -236,11 +243,15 @@ format_ip6_reass_trace (u8 * s, va_list * args) CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *); CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *); ip6_reass_trace_t *t = va_arg (*args, ip6_reass_trace_t *); - s = format (s, "reass id: %u, op id: %u ", t->reass_id, t->op_id); - u32 indent = format_get_indent (s); - s = format (s, "first bi: %u, data len: %u, ip/fragment[%u, %u]", - t->trace_range.first_bi, t->total_data_len, t->fragment_first, - t->fragment_last); + u32 indent = 0; + if (~0 != t->reass_id) + { + s = format (s, "reass id: %u, op id: %u ", t->reass_id, t->op_id); + indent = format_get_indent (s); + s = format (s, "first bi: %u, data len: %u, ip/fragment[%u, %u]", + t->trace_range.first_bi, t->total_data_len, + t->fragment_first, t->fragment_last); + } switch (t->action) { case RANGE_NEW: @@ -268,29 +279,34 @@ format_ip6_reass_trace (u8 * s, va_list * args) case FINALIZE: s = format (s, "\n%Ufinalize reassembly", format_white_space, indent); break; + case HANDOFF: + s = + format (s, "handoff from thread #%u to thread #%u", t->thread_id, + t->thread_id_to); + break; } return s; } static void ip6_reass_add_trace (vlib_main_t * vm, vlib_node_runtime_t * node, - ip6_reass_main_t * rm, ip6_reass_t * reass, - u32 bi, ip6_reass_trace_operation_e action, - u32 size_diff) + ip6_reass_main_t * rm, u32 reass_id, u32 op_id, + u32 bi, u32 first_bi, u32 data_len, + ip6_reass_trace_operation_e action, u32 thread_id_to) { vlib_buffer_t *b = vlib_get_buffer (vm, bi); vnet_buffer_opaque_t *vnb = vnet_buffer (b); ip6_reass_trace_t *t = vlib_add_trace (vm, node, b, sizeof (t[0])); - t->reass_id = reass->id; + t->reass_id = reass_id; t->action = action; ip6_reass_trace_details (vm, bi, &t->trace_range); - t->size_diff = size_diff; - t->op_id = reass->trace_op_counter; - ++reass->trace_op_counter; + t->op_id = op_id; + t->thread_id = vm->thread_index; + t->thread_id_to = thread_id_to; t->fragment_first = vnb->ip.reass.fragment_first; t->fragment_last = vnb->ip.reass.fragment_last; - t->trace_range.first_bi = reass->first_bi; - t->total_data_len = reass->data_len; + t->trace_range.first_bi = first_bi; + t->total_data_len = data_len; #if 0 static u8 *s = NULL; s = format (s, "%U", format_ip6_reass_trace, NULL, NULL, t); @@ -300,6 +316,13 @@ ip6_reass_add_trace (vlib_main_t * vm, vlib_node_runtime_t * node, #endif } +always_inline void +ip6_reass_free_ctx (ip6_reass_per_thread_t * rt, ip6_reass_t * reass) +{ + pool_put (rt->pool, reass); + --rt->reass_n; +} + always_inline void ip6_reass_free (ip6_reass_main_t * rm, ip6_reass_per_thread_t * rt, ip6_reass_t * reass) @@ -312,8 +335,7 @@ ip6_reass_free (ip6_reass_main_t * rm, ip6_reass_per_thread_t * rt, kv.key[4] = reass->key.as_u64[4]; kv.key[5] = reass->key.as_u64[5]; clib_bihash_add_del_48_8 (&rm->hash, &kv, 0); - pool_put (rt->pool, reass); - --rt->reass_n; + ip6_reass_free_ctx (rt, reass); } always_inline void @@ -398,8 +420,11 @@ ip6_reass_on_timeout (vlib_main_t * vm, vlib_node_runtime_t * node, *icmp_bi = reass->first_bi; if (PREDICT_FALSE (b->flags & VLIB_BUFFER_IS_TRACED)) { - ip6_reass_add_trace (vm, node, rm, reass, reass->first_bi, - ICMP_ERROR_RT_EXCEEDED, 0); + ip6_reass_add_trace (vm, node, rm, reass->id, + reass->trace_op_counter, reass->first_bi, + reass->first_bi, reass->data_len, + ICMP_ERROR_RT_EXCEEDED, ~0); + ++reass->trace_op_counter; } // fragment with offset zero received - send icmp message back if (b->flags & VLIB_BUFFER_NEXT_PRESENT) @@ -425,18 +450,26 @@ ip6_reass_find_or_create (vlib_main_t * vm, vlib_node_runtime_t * node, ip6_reass_main_t * rm, ip6_reass_per_thread_t * rt, ip6_reass_kv_t * kv, u32 * icmp_bi, u8 * do_handoff) { - ip6_reass_t *reass = NULL; - f64 now = vlib_time_now (rm->vlib_main); + ip6_reass_t *reass; + f64 now; + +again: + + reass = NULL; + now = vlib_time_now (vm); if (!clib_bihash_search_48_8 (&rm->hash, (clib_bihash_kv_48_8_t *) kv, (clib_bihash_kv_48_8_t *) kv)) { - if (vm->thread_index != kv->v.thread_index) + reass = + pool_elt_at_index (rm->per_thread_data + [kv->v.memory_owner_thread_index].pool, + kv->v.reass_index); + if (vm->thread_index != kv->v.memory_owner_thread_index) { *do_handoff = 1; - return NULL; + return reass; } - reass = pool_elt_at_index (rt->pool, kv->v.reass_index); if (now > reass->last_heard + rm->timeout) { @@ -478,13 +511,18 @@ ip6_reass_find_or_create (vlib_main_t * vm, vlib_node_runtime_t * node, reass->key.as_u64[4] = ((clib_bihash_kv_48_8_t *) kv)->key[4]; reass->key.as_u64[5] = ((clib_bihash_kv_48_8_t *) kv)->key[5]; kv->v.reass_index = (reass - rt->pool); - kv->v.thread_index = vm->thread_index; + kv->v.memory_owner_thread_index = vm->thread_index; reass->last_heard = now; - if (clib_bihash_add_del_48_8 (&rm->hash, (clib_bihash_kv_48_8_t *) kv, 1)) + int rv = + clib_bihash_add_del_48_8 (&rm->hash, (clib_bihash_kv_48_8_t *) kv, 2); + if (rv) { - ip6_reass_free (rm, rt, reass); + ip6_reass_free_ctx (rt, reass); reass = NULL; + // if other worker created a context already work with the other copy + if (-2 == rv) + goto again; } return reass; @@ -669,7 +707,10 @@ ip6_reass_finalize (vlib_main_t * vm, vlib_node_runtime_t * node, first_b->flags &= ~VLIB_BUFFER_EXT_HDR_VALID; if (PREDICT_FALSE (first_b->flags & VLIB_BUFFER_IS_TRACED)) { - ip6_reass_add_trace (vm, node, rm, reass, reass->first_bi, FINALIZE, 0); + ip6_reass_add_trace (vm, node, rm, reass->id, reass->trace_op_counter, + reass->first_bi, reass->first_bi, reass->data_len, + FINALIZE, ~0); + ++reass->trace_op_counter; #if 0 // following code does a hexdump of packet fragments to stdout ... do @@ -745,7 +786,8 @@ always_inline ip6_reass_rc_t ip6_reass_update (vlib_main_t * vm, vlib_node_runtime_t * node, ip6_reass_main_t * rm, ip6_reass_per_thread_t * rt, ip6_reass_t * reass, u32 * bi0, u32 * next0, u32 * error0, - ip6_frag_hdr_t * frag_hdr, bool is_custom_app) + ip6_frag_hdr_t * frag_hdr, bool is_custom_app, + u32 * handoff_thread_idx) { int consumed = 0; vlib_buffer_t *fb = vlib_get_buffer (vm, *bi0); @@ -835,8 +877,11 @@ ip6_reass_update (vlib_main_t * vm, vlib_node_runtime_t * node, ip6_reass_free (rm, rt, reass); if (PREDICT_FALSE (fb->flags & VLIB_BUFFER_IS_TRACED)) { - ip6_reass_add_trace (vm, node, rm, reass, *bi0, RANGE_OVERLAP, - 0); + ip6_reass_add_trace (vm, node, rm, reass->id, + reass->trace_op_counter, *bi0, + reass->first_bi, reass->data_len, + RANGE_OVERLAP, ~0); + ++reass->trace_op_counter; } *next0 = IP6_REASSEMBLY_NEXT_DROP; *error0 = IP6_ERROR_REASS_OVERLAPPING_FRAGMENT; @@ -850,7 +895,10 @@ check_if_done_maybe: { if (PREDICT_FALSE (fb->flags & VLIB_BUFFER_IS_TRACED)) { - ip6_reass_add_trace (vm, node, rm, reass, *bi0, RANGE_NEW, 0); + ip6_reass_add_trace (vm, node, rm, reass->id, + reass->trace_op_counter, *bi0, reass->first_bi, + reass->data_len, RANGE_NEW, ~0); + ++reass->trace_op_counter; } } if (~0 != reass->last_packet_octet && @@ -858,6 +906,16 @@ check_if_done_maybe: { return ip6_reass_finalize (vm, node, rm, rt, reass, bi0, next0, error0, is_custom_app); + *handoff_thread_idx = reass->sendout_thread_index; + ip6_reass_rc_t rc = + ip6_reass_finalize (vm, node, rm, rt, reass, bi0, next0, error0, + is_custom_app); + if (IP6_REASS_RC_OK == rc + && reass->memory_owner_thread_index != reass->sendout_thread_index) + { + return IP6_REASS_RC_HANDOFF; + } + return rc; } else { @@ -1023,24 +1081,44 @@ ip6_reassembly_inline (vlib_main_t * vm, vlib_node_runtime_t * node, ip6_reass_find_or_create (vm, node, rm, rt, &kv, &icmp_bi, &do_handoff); + if (reass) + { + const u32 fragment_first = ip6_frag_hdr_offset (frag_hdr); + if (0 == fragment_first) + { + reass->sendout_thread_index = vm->thread_index; + } + } if (PREDICT_FALSE (do_handoff)) { next0 = IP6_REASSEMBLY_NEXT_HANDOFF; if (is_feature) vnet_buffer (b0)->ip.reass.owner_feature_thread_index = - kv.v.thread_index; + kv.v.memory_owner_thread_index; else vnet_buffer (b0)->ip.reass.owner_thread_index = - kv.v.thread_index; + kv.v.memory_owner_thread_index; } else if (reass) { + u32 handoff_thread_idx; switch (ip6_reass_update (vm, node, rm, rt, reass, &bi0, &next0, - &error0, frag_hdr, is_custom_app)) + &error0, frag_hdr, is_custom_app, + &handoff_thread_idx)) { case IP6_REASS_RC_OK: /* nothing to do here */ break; + case IP6_REASS_RC_HANDOFF: + next0 = IP6_REASSEMBLY_NEXT_HANDOFF; + b0 = vlib_get_buffer (vm, bi0); + if (is_feature) + vnet_buffer (b0)->ip.reass.owner_feature_thread_index = + handoff_thread_idx; + else + vnet_buffer (b0)->ip.reass.owner_thread_index = + handoff_thread_idx; + break; case IP6_REASS_RC_TOO_MANY_FRAGMENTS: vlib_node_increment_counter (vm, node->node_index, IP6_ERROR_REASS_FRAGMENT_CHAIN_TOO_LONG, @@ -1089,7 +1167,24 @@ ip6_reassembly_inline (vlib_main_t * vm, vlib_node_runtime_t * node, to_next[0] = bi0; to_next += 1; n_left_to_next -= 1; - if (is_feature && IP6_ERROR_NONE == error0) + if (next0 == IP6_REASSEMBLY_NEXT_HANDOFF) + { + if (PREDICT_FALSE (b0->flags & VLIB_BUFFER_IS_TRACED)) + { + if (is_feature) + ip6_reass_add_trace (vm, node, rm, ~0, + ~0, + bi0, ~0, ~0, HANDOFF, + vnet_buffer (b0)->ip. + reass.owner_feature_thread_index); + else + ip6_reass_add_trace (vm, node, rm, ~0, ~0, bi0, + ~0, ~0, HANDOFF, + vnet_buffer (b0)->ip. + reass.owner_thread_index); + } + } + else if (is_feature && IP6_ERROR_NONE == error0) { b0 = vlib_get_buffer (vm, bi0); vnet_feature_next (&next0, b0); @@ -1296,7 +1391,6 @@ ip6_reass_init_function (vlib_main_t * vm) vlib_node_t *node; rm->vlib_main = vm; - rm->vnet_main = vnet_get_main (); vec_validate (rm->per_thread_data, vlib_num_workers ()); ip6_reass_per_thread_t *rt; diff --git a/test/framework.py b/test/framework.py index 2bfb4b2fcd9..307da8f22f2 100644 --- a/test/framework.py +++ b/test/framework.py @@ -362,13 +362,16 @@ class VppTestCase(unittest.TestCase): coredump_size = "coredump-size unlimited" cpu_core_number = cls.get_least_used_cpu() + if not hasattr(cls, "worker_config"): + cls.worker_config = "" cls.vpp_cmdline = [cls.vpp_bin, "unix", "{", "nodaemon", debug_cli, "full-coredump", coredump_size, "runtime-dir", cls.tempdir, "}", "api-trace", "{", "on", "}", "api-segment", "{", "prefix", cls.shm_prefix, "}", "cpu", "{", - "main-core", str(cpu_core_number), "}", + "main-core", str(cpu_core_number), + cls.worker_config, "}", "statseg", "{", "socket-name", cls.stats_sock, "}", "socksvr", "{", "socket-name", cls.api_sock, "}", "plugins", diff --git a/test/test_reassembly.py b/test/test_reassembly.py index e95d533684e..4c8712f5bc7 100644 --- a/test/test_reassembly.py +++ b/test/test_reassembly.py @@ -2,7 +2,7 @@ import six import unittest -from random import shuffle +from random import shuffle, choice, randrange from framework import VppTestCase, VppTestRunner @@ -10,11 +10,10 @@ import scapy.compat from scapy.packet import Raw from scapy.layers.l2 import Ether, GRE from scapy.layers.inet import IP, UDP, ICMP -from util import ppp, fragment_rfc791, fragment_rfc8200 from scapy.layers.inet6 import IPv6, IPv6ExtHdrFragment, ICMPv6ParamProblem,\ ICMPv6TimeExceeded from framework import VppTestCase, VppTestRunner -from util import ppp, fragment_rfc791, fragment_rfc8200 +from util import ppp, ppc, fragment_rfc791, fragment_rfc8200 from vpp_gre_interface import VppGreInterface from vpp_ip import DpoProto from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathProto @@ -22,6 +21,9 @@ from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathProto # 35 is enough to have >257 400-byte fragments test_packet_count = 35 +# number of workers used for multi-worker test cases +worker_count = 3 + class TestIPv4Reassembly(VppTestCase): """ IPv4 Reassembly """ @@ -499,6 +501,179 @@ class TestIPv4Reassembly(VppTestCase): self.src_if.assert_nothing_captured() +class TestIPv4MWReassembly(VppTestCase): + """ IPv4 Reassembly (multiple workers) """ + worker_config = "workers %d" % worker_count + + @classmethod + def setUpClass(cls): + super(TestIPv4MWReassembly, cls).setUpClass() + + cls.create_pg_interfaces(range(worker_count+1)) + cls.src_if = cls.pg0 + cls.send_ifs = cls.pg_interfaces[:-1] + cls.dst_if = cls.pg_interfaces[-1] + + # setup all interfaces + for i in cls.pg_interfaces: + i.admin_up() + i.config_ip4() + i.resolve_arp() + + # packets sizes reduced here because we are generating packets without + # Ethernet headers, which are added later (diff fragments go via + # different interfaces) + cls.packet_sizes = [64-len(Ether()), 512-len(Ether()), + 1518-len(Ether()), 9018-len(Ether())] + cls.padding = " abcdefghijklmn" + cls.create_stream(cls.packet_sizes) + cls.create_fragments() + + @classmethod + def tearDownClass(cls): + super(TestIPv4MWReassembly, cls).tearDownClass() + + def setUp(self): + """ Test setup - force timeout on existing reassemblies """ + super(TestIPv4MWReassembly, self).setUp() + for intf in self.send_ifs: + self.vapi.ip_reassembly_enable_disable( + sw_if_index=intf.sw_if_index, enable_ip4=True) + self.vapi.ip_reassembly_set(timeout_ms=0, max_reassemblies=1000, + max_reassembly_length=1000, + expire_walk_interval_ms=10) + self.sleep(.25) + self.vapi.ip_reassembly_set(timeout_ms=1000000, max_reassemblies=1000, + max_reassembly_length=1000, + expire_walk_interval_ms=10000) + + def tearDown(self): + super(TestIPv4MWReassembly, self).tearDown() + + def show_commands_at_teardown(self): + self.logger.debug(self.vapi.ppcli("show ip4-reassembly details")) + self.logger.debug(self.vapi.ppcli("show buffers")) + + @classmethod + def create_stream(cls, packet_sizes, packet_count=test_packet_count): + """Create input packet stream + + :param list packet_sizes: Required packet sizes. + """ + for i in range(0, packet_count): + info = cls.create_packet_info(cls.src_if, cls.src_if) + payload = cls.info_to_payload(info) + p = (IP(id=info.index, src=cls.src_if.remote_ip4, + dst=cls.dst_if.remote_ip4) / + UDP(sport=1234, dport=5678) / + Raw(payload)) + size = packet_sizes[(i // 2) % len(packet_sizes)] + cls.extend_packet(p, size, cls.padding) + info.data = p + + @classmethod + def create_fragments(cls): + infos = cls._packet_infos + cls.pkt_infos = [] + for index, info in six.iteritems(infos): + p = info.data + # cls.logger.debug(ppp("Packet:", + # p.__class__(scapy.compat.raw(p)))) + fragments_400 = fragment_rfc791(p, 400) + cls.pkt_infos.append((index, fragments_400)) + cls.fragments_400 = [ + x for (_, frags) in cls.pkt_infos for x in frags] + cls.logger.debug("Fragmented %s packets into %s 400-byte fragments, " % + (len(infos), len(cls.fragments_400))) + + def verify_capture(self, capture, dropped_packet_indexes=[]): + """Verify captured packet stream. + + :param list capture: Captured packet stream. + """ + info = None + seen = set() + for packet in capture: + try: + self.logger.debug(ppp("Got packet:", packet)) + ip = packet[IP] + udp = packet[UDP] + payload_info = self.payload_to_info(packet[Raw]) + packet_index = payload_info.index + self.assertTrue( + packet_index not in dropped_packet_indexes, + ppp("Packet received, but should be dropped:", packet)) + if packet_index in seen: + raise Exception(ppp("Duplicate packet received", packet)) + seen.add(packet_index) + self.assertEqual(payload_info.dst, self.src_if.sw_if_index) + info = self._packet_infos[packet_index] + self.assertTrue(info is not None) + self.assertEqual(packet_index, info.index) + saved_packet = info.data + self.assertEqual(ip.src, saved_packet[IP].src) + self.assertEqual(ip.dst, saved_packet[IP].dst) + self.assertEqual(udp.payload, saved_packet[UDP].payload) + except Exception: + self.logger.error(ppp("Unexpected or invalid packet:", packet)) + raise + for index in self._packet_infos: + self.assertTrue(index in seen or index in dropped_packet_indexes, + "Packet with packet_index %d not received" % index) + + def send_packets(self, packets): + for counter in range(worker_count): + if 0 == len(packets[counter]): + continue + send_if = self.send_ifs[counter] + send_if.add_stream( + (Ether(dst=send_if.local_mac, src=send_if.remote_mac) / x + for x in packets[counter]), + worker=counter) + self.pg_start() + + def test_worker_conflict(self): + """ 1st and FO=0 fragments on different workers """ + + # in first wave we send fragments which don't start at offset 0 + # then we send fragments with offset 0 on a different thread + # then the rest of packets on a random thread + first_packets = [[] for n in range(worker_count)] + second_packets = [[] for n in range(worker_count)] + rest_of_packets = [[] for n in range(worker_count)] + for (_, p) in self.pkt_infos: + wi = randrange(worker_count) + second_packets[wi].append(p[0]) + if len(p) <= 1: + continue + wi2 = wi + while wi2 == wi: + wi2 = randrange(worker_count) + first_packets[wi2].append(p[1]) + wi3 = randrange(worker_count) + rest_of_packets[wi3].extend(p[2:]) + + self.pg_enable_capture() + self.send_packets(first_packets) + self.send_packets(second_packets) + self.send_packets(rest_of_packets) + + packets = self.dst_if.get_capture(len(self.pkt_infos)) + self.verify_capture(packets) + for send_if in self.send_ifs: + send_if.assert_nothing_captured() + + self.pg_enable_capture() + self.send_packets(first_packets) + self.send_packets(second_packets) + self.send_packets(rest_of_packets) + + packets = self.dst_if.get_capture(len(self.pkt_infos)) + self.verify_capture(packets) + for send_if in self.send_ifs: + send_if.assert_nothing_captured() + + class TestIPv6Reassembly(VppTestCase): """ IPv6 Reassembly """ @@ -937,6 +1112,179 @@ class TestIPv6Reassembly(VppTestCase): self.assert_equal(icmp[ICMPv6ParamProblem].code, 0, "ICMP code") +class TestIPv6MWReassembly(VppTestCase): + """ IPv6 Reassembly (multiple workers) """ + worker_config = "workers %d" % worker_count + + @classmethod + def setUpClass(cls): + super(TestIPv6MWReassembly, cls).setUpClass() + + cls.create_pg_interfaces(range(worker_count+1)) + cls.src_if = cls.pg0 + cls.send_ifs = cls.pg_interfaces[:-1] + cls.dst_if = cls.pg_interfaces[-1] + + # setup all interfaces + for i in cls.pg_interfaces: + i.admin_up() + i.config_ip6() + i.resolve_ndp() + + # packets sizes reduced here because we are generating packets without + # Ethernet headers, which are added later (diff fragments go via + # different interfaces) + cls.packet_sizes = [64-len(Ether()), 512-len(Ether()), + 1518-len(Ether()), 9018-len(Ether())] + cls.padding = " abcdefghijklmn" + cls.create_stream(cls.packet_sizes) + cls.create_fragments() + + @classmethod + def tearDownClass(cls): + super(TestIPv6MWReassembly, cls).tearDownClass() + + def setUp(self): + """ Test setup - force timeout on existing reassemblies """ + super(TestIPv6MWReassembly, self).setUp() + for intf in self.send_ifs: + self.vapi.ip_reassembly_enable_disable( + sw_if_index=intf.sw_if_index, enable_ip6=True) + self.vapi.ip_reassembly_set(timeout_ms=0, max_reassemblies=1000, + max_reassembly_length=1000, + expire_walk_interval_ms=10, is_ip6=1) + self.sleep(.25) + self.vapi.ip_reassembly_set(timeout_ms=1000000, max_reassemblies=1000, + max_reassembly_length=1000, + expire_walk_interval_ms=1000, is_ip6=1) + + def tearDown(self): + super(TestIPv6MWReassembly, self).tearDown() + + def show_commands_at_teardown(self): + self.logger.debug(self.vapi.ppcli("show ip6-reassembly details")) + self.logger.debug(self.vapi.ppcli("show buffers")) + + @classmethod + def create_stream(cls, packet_sizes, packet_count=test_packet_count): + """Create input packet stream + + :param list packet_sizes: Required packet sizes. + """ + for i in range(0, packet_count): + info = cls.create_packet_info(cls.src_if, cls.src_if) + payload = cls.info_to_payload(info) + p = (IPv6(src=cls.src_if.remote_ip6, + dst=cls.dst_if.remote_ip6) / + UDP(sport=1234, dport=5678) / + Raw(payload)) + size = packet_sizes[(i // 2) % len(packet_sizes)] + cls.extend_packet(p, size, cls.padding) + info.data = p + + @classmethod + def create_fragments(cls): + infos = cls._packet_infos + cls.pkt_infos = [] + for index, info in six.iteritems(infos): + p = info.data + # cls.logger.debug(ppp("Packet:", + # p.__class__(scapy.compat.raw(p)))) + fragments_400 = fragment_rfc8200(p, index, 400) + cls.pkt_infos.append((index, fragments_400)) + cls.fragments_400 = [ + x for (_, frags) in cls.pkt_infos for x in frags] + cls.logger.debug("Fragmented %s packets into %s 400-byte fragments, " % + (len(infos), len(cls.fragments_400))) + + def verify_capture(self, capture, dropped_packet_indexes=[]): + """Verify captured packet strea . + + :param list capture: Captured packet stream. + """ + info = None + seen = set() + for packet in capture: + try: + self.logger.debug(ppp("Got packet:", packet)) + ip = packet[IPv6] + udp = packet[UDP] + payload_info = self.payload_to_info(packet[Raw]) + packet_index = payload_info.index + self.assertTrue( + packet_index not in dropped_packet_indexes, + ppp("Packet received, but should be dropped:", packet)) + if packet_index in seen: + raise Exception(ppp("Duplicate packet received", packet)) + seen.add(packet_index) + self.assertEqual(payload_info.dst, self.src_if.sw_if_index) + info = self._packet_infos[packet_index] + self.assertTrue(info is not None) + self.assertEqual(packet_index, info.index) + saved_packet = info.data + self.assertEqual(ip.src, saved_packet[IPv6].src) + self.assertEqual(ip.dst, saved_packet[IPv6].dst) + self.assertEqual(udp.payload, saved_packet[UDP].payload) + except Exception: + self.logger.error(ppp("Unexpected or invalid packet:", packet)) + raise + for index in self._packet_infos: + self.assertTrue(index in seen or index in dropped_packet_indexes, + "Packet with packet_index %d not received" % index) + + def send_packets(self, packets): + for counter in range(worker_count): + if 0 == len(packets[counter]): + continue + send_if = self.send_ifs[counter] + send_if.add_stream( + (Ether(dst=send_if.local_mac, src=send_if.remote_mac) / x + for x in packets[counter]), + worker=counter) + self.pg_start() + + def test_worker_conflict(self): + """ 1st and FO=0 fragments on different workers """ + + # in first wave we send fragments which don't start at offset 0 + # then we send fragments with offset 0 on a different thread + # then the rest of packets on a random thread + first_packets = [[] for n in range(worker_count)] + second_packets = [[] for n in range(worker_count)] + rest_of_packets = [[] for n in range(worker_count)] + for (_, p) in self.pkt_infos: + wi = randrange(worker_count) + second_packets[wi].append(p[0]) + if len(p) <= 1: + continue + wi2 = wi + while wi2 == wi: + wi2 = randrange(worker_count) + first_packets[wi2].append(p[1]) + wi3 = randrange(worker_count) + rest_of_packets[wi3].extend(p[2:]) + + self.pg_enable_capture() + self.send_packets(first_packets) + self.send_packets(second_packets) + self.send_packets(rest_of_packets) + + packets = self.dst_if.get_capture(len(self.pkt_infos)) + self.verify_capture(packets) + for send_if in self.send_ifs: + send_if.assert_nothing_captured() + + self.pg_enable_capture() + self.send_packets(first_packets) + self.send_packets(second_packets) + self.send_packets(rest_of_packets) + + packets = self.dst_if.get_capture(len(self.pkt_infos)) + self.verify_capture(packets) + for send_if in self.send_ifs: + send_if.assert_nothing_captured() + + class TestIPv4ReassemblyLocalNode(VppTestCase): """ IPv4 Reassembly for packets coming to ip4-local node """ -- 2.16.6