ip: reassembly: send packet out on correct worker 76/20976/5
author: Klement Sekera <ksekera@cisco.com>
Fri, 19 Jul 2019 09:14:19 +0000 (09:14 +0000)
committer: Ole Trøan <otroan@employees.org>
Tue, 20 Aug 2019 13:33:30 +0000 (13:33 +0000)
Note which worker received fragment with offset zero and use this worker
to send out the reassembled packet.

Type: fix
Change-Id: I1d3cee16788db3b230682525239c0100d51dc380
Signed-off-by: Klement Sekera <ksekera@cisco.com>
src/vnet/ip/ip4_reassembly.c
src/vnet/ip/ip6_reassembly.c
test/framework.py
test/test_reassembly.py

index b82bafe..7159b8a 100644 (file)
@@ -25,6 +25,7 @@
 #include <vnet/ip/ip.h>
 #include <vppinfra/bihash_16_8.h>
 #include <vnet/ip/ip4_reassembly.h>
+#include <stddef.h>
 
 #define MSEC_PER_SEC 1000
 #define IP4_REASS_TIMEOUT_DEFAULT_MS 100
@@ -61,6 +62,7 @@ typedef enum
   IP4_REASS_RC_TOO_MANY_FRAGMENTS,
   IP4_REASS_RC_INTERNAL_ERROR,
   IP4_REASS_RC_NO_BUF,
+  IP4_REASS_RC_HANDOFF,
 } ip4_reass_rc_t;
 
 typedef struct
@@ -85,7 +87,7 @@ typedef union
   struct
   {
     u32 reass_index;
-    u32 thread_index;
+    u32 memory_owner_thread_index;
   };
   u64 as_u64;
 } ip4_reass_val_t;
@@ -139,10 +141,16 @@ typedef struct
   u16 min_fragment_length;
   // number of fragments in this reassembly
   u32 fragments_n;
+  // thread owning memory for this context (whose pool contains this ctx)
+  u32 memory_owner_thread_index;
+  // thread which received fragment with offset 0 and which sends out the
+  // completed reassembly
+  u32 sendout_thread_index;
 } ip4_reass_t;
 
 typedef struct
 {
+  // pool of reassembly contexts
   ip4_reass_t *pool;
   u32 reass_n;
   u32 id_counter;
@@ -167,7 +175,6 @@ typedef struct
 
   // convenience
   vlib_main_t *vlib_main;
-  vnet_main_t *vnet_main;
 
   // node index of ip4-drop node
   u32 ip4_drop_idx;
@@ -176,7 +183,6 @@ typedef struct
   /** Worker handoff */
   u32 fq_index;
   u32 fq_feature_index;
-
 } ip4_reass_main_t;
 
 extern ip4_reass_main_t ip4_reass_main;
@@ -200,6 +206,7 @@ typedef enum
   RANGE_DISCARD,
   RANGE_OVERLAP,
   FINALIZE,
+  HANDOFF,
 } ip4_reass_trace_operation_e;
 
 typedef struct
@@ -219,6 +226,8 @@ typedef struct
   ip4_reass_range_trace_t trace_range;
   u32 size_diff;
   u32 op_id;
+  u32 thread_id;
+  u32 thread_id_to;
   u32 fragment_first;
   u32 fragment_last;
   u32 total_data_len;
@@ -256,11 +265,17 @@ format_ip4_reass_trace (u8 * s, va_list * args)
   CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *);
   CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
   ip4_reass_trace_t *t = va_arg (*args, ip4_reass_trace_t *);
-  s = format (s, "reass id: %u, op id: %u ", t->reass_id, t->op_id);
-  u32 indent = format_get_indent (s);
-  s = format (s, "first bi: %u, data len: %u, ip/fragment[%u, %u]",
-             t->trace_range.first_bi, t->total_data_len, t->fragment_first,
-             t->fragment_last);
+  u32 indent = 0;
+  if (~0 != t->reass_id)
+    {
+      s = format (s, "reass id: %u, op id: %u, ", t->reass_id, t->op_id);
+      indent = format_get_indent (s);
+      s =
+       format (s,
+               "first bi: %u, data len: %u, ip/fragment[%u, %u]",
+               t->trace_range.first_bi, t->total_data_len, t->fragment_first,
+               t->fragment_last);
+    }
   switch (t->action)
     {
     case RANGE_SHRINK:
@@ -283,28 +298,36 @@ format_ip4_reass_trace (u8 * s, va_list * args)
     case FINALIZE:
       s = format (s, "\n%Ufinalize reassembly", format_white_space, indent);
       break;
+    case HANDOFF:
+      s =
+       format (s, "handoff from thread #%u to thread #%u", t->thread_id,
+               t->thread_id_to);
+      break;
     }
   return s;
 }
 
 static void
 ip4_reass_add_trace (vlib_main_t * vm, vlib_node_runtime_t * node,
-                    ip4_reass_main_t * rm, ip4_reass_t * reass, u32 bi,
-                    ip4_reass_trace_operation_e action, u32 size_diff)
+                    ip4_reass_main_t * rm, u32 reass_id, u32 op_id,
+                    u32 bi, u32 first_bi, u32 data_len,
+                    ip4_reass_trace_operation_e action, u32 size_diff,
+                    u32 thread_id_to)
 {
   vlib_buffer_t *b = vlib_get_buffer (vm, bi);
   vnet_buffer_opaque_t *vnb = vnet_buffer (b);
   ip4_reass_trace_t *t = vlib_add_trace (vm, node, b, sizeof (t[0]));
-  t->reass_id = reass->id;
+  t->reass_id = reass_id;
   t->action = action;
   ip4_reass_trace_details (vm, bi, &t->trace_range);
   t->size_diff = size_diff;
-  t->op_id = reass->trace_op_counter;
-  ++reass->trace_op_counter;
+  t->op_id = op_id;
+  t->thread_id = vm->thread_index;
+  t->thread_id_to = thread_id_to;
   t->fragment_first = vnb->ip.reass.fragment_first;
   t->fragment_last = vnb->ip.reass.fragment_last;
-  t->trace_range.first_bi = reass->first_bi;
-  t->total_data_len = reass->data_len;
+  t->trace_range.first_bi = first_bi;
+  t->total_data_len = data_len;
 #if 0
   static u8 *s = NULL;
   s = format (s, "%U", format_ip4_reass_trace, NULL, NULL, t);
@@ -314,17 +337,22 @@ ip4_reass_add_trace (vlib_main_t * vm, vlib_node_runtime_t * node,
 #endif
 }
 
+always_inline void
+ip4_reass_free_ctx (ip4_reass_per_thread_t * rt, ip4_reass_t * reass)
+{
+  pool_put (rt->pool, reass);
+  --rt->reass_n;
+}
 
 always_inline void
-ip4_reass_free (ip4_reass_main_t * rm, ip4_reass_per_thread_t * rt,
-               ip4_reass_t * reass)
+ip4_reass_free (vlib_main_t * vm, ip4_reass_main_t * rm,
+               ip4_reass_per_thread_t * rt, ip4_reass_t * reass)
 {
   clib_bihash_kv_16_8_t kv;
   kv.key[0] = reass->key.as_u64[0];
   kv.key[1] = reass->key.as_u64[1];
   clib_bihash_add_del_16_8 (&rm->hash, &kv, 0);
-  pool_put (rt->pool, reass);
-  --rt->reass_n;
+  return ip4_reass_free_ctx (rt, reass);
 }
 
 always_inline void
@@ -396,23 +424,30 @@ ip4_reass_find_or_create (vlib_main_t * vm, vlib_node_runtime_t * node,
                          ip4_reass_main_t * rm, ip4_reass_per_thread_t * rt,
                          ip4_reass_kv_t * kv, u8 * do_handoff)
 {
-  ip4_reass_t *reass = NULL;
-  f64 now = vlib_time_now (rm->vlib_main);
+  ip4_reass_t *reass;
+  f64 now;
 
+again:
+
+  reass = NULL;
+  now = vlib_time_now (vm);
   if (!clib_bihash_search_16_8
       (&rm->hash, (clib_bihash_kv_16_8_t *) kv, (clib_bihash_kv_16_8_t *) kv))
     {
-      if (vm->thread_index != kv->v.thread_index)
+      reass =
+       pool_elt_at_index (rm->per_thread_data
+                          [kv->v.memory_owner_thread_index].pool,
+                          kv->v.reass_index);
+      if (vm->thread_index != reass->memory_owner_thread_index)
        {
          *do_handoff = 1;
-         return NULL;
+         return reass;
        }
-      reass = pool_elt_at_index (rt->pool, kv->v.reass_index);
 
       if (now > reass->last_heard + rm->timeout)
        {
          ip4_reass_drop_all (vm, node, rm, reass);
-         ip4_reass_free (rm, rt, reass);
+         ip4_reass_free (vm, rm, rt, reass);
          reass = NULL;
        }
     }
@@ -433,6 +468,7 @@ ip4_reass_find_or_create (vlib_main_t * vm, vlib_node_runtime_t * node,
       pool_get (rt->pool, reass);
       clib_memset (reass, 0, sizeof (*reass));
       reass->id = ((u64) vm->thread_index * 1000000000) + rt->id_counter;
+      reass->memory_owner_thread_index = vm->thread_index;
       ++rt->id_counter;
       reass->first_bi = ~0;
       reass->last_packet_octet = ~0;
@@ -445,13 +481,18 @@ ip4_reass_find_or_create (vlib_main_t * vm, vlib_node_runtime_t * node,
   reass->key.as_u64[0] = ((clib_bihash_kv_16_8_t *) kv)->key[0];
   reass->key.as_u64[1] = ((clib_bihash_kv_16_8_t *) kv)->key[1];
   kv->v.reass_index = (reass - rt->pool);
-  kv->v.thread_index = vm->thread_index;
+  kv->v.memory_owner_thread_index = vm->thread_index;
   reass->last_heard = now;
 
-  if (clib_bihash_add_del_16_8 (&rm->hash, (clib_bihash_kv_16_8_t *) kv, 1))
+  int rv =
+    clib_bihash_add_del_16_8 (&rm->hash, (clib_bihash_kv_16_8_t *) kv, 2);
+  if (rv)
     {
-      ip4_reass_free (rm, rt, reass);
+      ip4_reass_free_ctx (rt, reass);
       reass = NULL;
+      // if other worker created a context already work with the other copy
+      if (-2 == rv)
+       goto again;
     }
 
   return reass;
@@ -615,7 +656,10 @@ ip4_reass_finalize (vlib_main_t * vm, vlib_node_runtime_t * node,
   first_b->flags &= ~VLIB_BUFFER_EXT_HDR_VALID;
   if (PREDICT_FALSE (first_b->flags & VLIB_BUFFER_IS_TRACED))
     {
-      ip4_reass_add_trace (vm, node, rm, reass, reass->first_bi, FINALIZE, 0);
+      ip4_reass_add_trace (vm, node, rm, reass->id, reass->trace_op_counter,
+                          reass->first_bi, reass->first_bi, reass->data_len,
+                          FINALIZE, 0, ~0);
+      ++reass->trace_op_counter;
 #if 0
       // following code does a hexdump of packet fragments to stdout ...
       do
@@ -654,7 +698,7 @@ ip4_reass_finalize (vlib_main_t * vm, vlib_node_runtime_t * node,
     }
   vnet_buffer (first_b)->ip.reass.estimated_mtu = reass->min_fragment_length;
   *error0 = IP4_ERROR_NONE;
-  ip4_reass_free (rm, rt, reass);
+  ip4_reass_free (vm, rm, rt, reass);
   reass = NULL;
   return IP4_REASS_RC_OK;
 }
@@ -728,8 +772,11 @@ ip4_reass_remove_range_from_chain (vlib_main_t * vm,
       u32 to_be_freed_bi = discard_bi;
       if (PREDICT_FALSE (discard_b->flags & VLIB_BUFFER_IS_TRACED))
        {
-         ip4_reass_add_trace (vm, node, rm, reass, discard_bi, RANGE_DISCARD,
-                              0);
+         ip4_reass_add_trace (vm, node, rm, reass->id,
+                              reass->trace_op_counter, discard_bi,
+                              reass->first_bi, reass->data_len,
+                              RANGE_DISCARD, 0, ~0);
+         ++reass->trace_op_counter;
        }
       if (discard_b->flags & VLIB_BUFFER_NEXT_PRESENT)
        {
@@ -753,7 +800,7 @@ always_inline ip4_reass_rc_t
 ip4_reass_update (vlib_main_t * vm, vlib_node_runtime_t * node,
                  ip4_reass_main_t * rm, ip4_reass_per_thread_t * rt,
                  ip4_reass_t * reass, u32 * bi0, u32 * next0, u32 * error0,
-                 bool is_custom_app)
+                 bool is_custom_app, u32 * handoff_thread_idx)
 {
   ip4_reass_rc_t rc = IP4_REASS_RC_OK;
   int consumed = 0;
@@ -794,7 +841,10 @@ ip4_reass_update (vlib_main_t * vm, vlib_node_runtime_t * node,
        }
       if (PREDICT_FALSE (fb->flags & VLIB_BUFFER_IS_TRACED))
        {
-         ip4_reass_add_trace (vm, node, rm, reass, *bi0, RANGE_NEW, 0);
+         ip4_reass_add_trace (vm, node, rm, reass->id,
+                              reass->trace_op_counter, *bi0, reass->first_bi,
+                              reass->data_len, RANGE_NEW, 0, ~0);
+         ++reass->trace_op_counter;
        }
       *bi0 = ~0;
       reass->min_fragment_length = clib_net_to_host_u16 (fip->length);
@@ -848,8 +898,11 @@ ip4_reass_update (vlib_main_t * vm, vlib_node_runtime_t * node,
              // this fragment is a (sub)part of existing range, ignore it
              if (PREDICT_FALSE (fb->flags & VLIB_BUFFER_IS_TRACED))
                {
-                 ip4_reass_add_trace (vm, node, rm, reass, *bi0,
-                                      RANGE_OVERLAP, 0);
+                 ip4_reass_add_trace (vm, node, rm, reass->id,
+                                      reass->trace_op_counter, *bi0,
+                                      reass->first_bi, reass->data_len,
+                                      RANGE_OVERLAP, 0, ~0);
+                 ++reass->trace_op_counter;
                }
              break;
            }
@@ -868,9 +921,12 @@ ip4_reass_update (vlib_main_t * vm, vlib_node_runtime_t * node,
                  reass->data_len -= overlap;
                  if (PREDICT_FALSE (fb->flags & VLIB_BUFFER_IS_TRACED))
                    {
-                     ip4_reass_add_trace (vm, node, rm, reass,
-                                          candidate_range_bi, RANGE_SHRINK,
-                                          overlap);
+                     ip4_reass_add_trace (vm, node, rm, reass->id,
+                                          reass->trace_op_counter,
+                                          candidate_range_bi,
+                                          reass->first_bi, reass->data_len,
+                                          RANGE_SHRINK, 0, ~0);
+                     ++reass->trace_op_counter;
                    }
                  rc =
                    ip4_reass_insert_range_in_chain (vm, rm, rt, reass,
@@ -961,14 +1017,24 @@ ip4_reass_update (vlib_main_t * vm, vlib_node_runtime_t * node,
     {
       if (PREDICT_FALSE (fb->flags & VLIB_BUFFER_IS_TRACED))
        {
-         ip4_reass_add_trace (vm, node, rm, reass, *bi0, RANGE_NEW, 0);
+         ip4_reass_add_trace (vm, node, rm, reass->id,
+                              reass->trace_op_counter, *bi0, reass->first_bi,
+                              reass->data_len, RANGE_NEW, 0, ~0);
+         ++reass->trace_op_counter;
        }
     }
   if (~0 != reass->last_packet_octet &&
       reass->data_len == reass->last_packet_octet + 1)
     {
-      return ip4_reass_finalize (vm, node, rm, rt, reass, bi0, next0, error0,
-                                is_custom_app);
+      *handoff_thread_idx = reass->sendout_thread_index;
+      rc =
+       ip4_reass_finalize (vm, node, rm, rt, reass, bi0, next0, error0,
+                           is_custom_app);
+      if (IP4_REASS_RC_OK == rc
+         && reass->memory_owner_thread_index != reass->sendout_thread_index)
+       {
+         rc = IP4_REASS_RC_HANDOFF;
+       }
     }
   else
     {
@@ -1056,33 +1122,53 @@ ip4_reassembly_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
                  ip4_reass_t *reass =
                    ip4_reass_find_or_create (vm, node, rm, rt, &kv,
                                              &do_handoff);
-
+                 if (reass)
+                   {
+                     const u32 fragment_first =
+                       ip4_get_fragment_offset_bytes (ip0);
+                     if (0 == fragment_first)
+                       {
+                         reass->sendout_thread_index = vm->thread_index;
+                       }
+                   }
                  if (PREDICT_FALSE (do_handoff))
                    {
                      next0 = IP4_REASSEMBLY_NEXT_HANDOFF;
                      if (is_feature)
                        vnet_buffer (b0)->ip.
                          reass.owner_feature_thread_index =
-                         kv.v.thread_index;
+                         kv.v.memory_owner_thread_index;
                      else
                        vnet_buffer (b0)->ip.reass.owner_thread_index =
-                         kv.v.thread_index;
+                         kv.v.memory_owner_thread_index;
                    }
                  else if (reass)
                    {
+                     u32 handoff_thread_idx;
                      switch (ip4_reass_update
                              (vm, node, rm, rt, reass, &bi0, &next0,
-                              &error0, is_custom_app))
+                              &error0, is_custom_app, &handoff_thread_idx))
                        {
                        case IP4_REASS_RC_OK:
                          /* nothing to do here */
                          break;
+                       case IP4_REASS_RC_HANDOFF:
+                         next0 = IP4_REASSEMBLY_NEXT_HANDOFF;
+                         b0 = vlib_get_buffer (vm, bi0);
+                         if (is_feature)
+                           vnet_buffer (b0)->ip.
+                             reass.owner_feature_thread_index =
+                             handoff_thread_idx;
+                         else
+                           vnet_buffer (b0)->ip.reass.owner_thread_index =
+                             handoff_thread_idx;
+                         break;
                        case IP4_REASS_RC_TOO_MANY_FRAGMENTS:
                          vlib_node_increment_counter (vm, node->node_index,
                                                       IP4_ERROR_REASS_FRAGMENT_CHAIN_TOO_LONG,
                                                       1);
                          ip4_reass_drop_all (vm, node, rm, reass);
-                         ip4_reass_free (rm, rt, reass);
+                         ip4_reass_free (vm, rm, rt, reass);
                          goto next_packet;
                          break;
                        case IP4_REASS_RC_NO_BUF:
@@ -1090,7 +1176,7 @@ ip4_reassembly_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
                                                       IP4_ERROR_REASS_NO_BUF,
                                                       1);
                          ip4_reass_drop_all (vm, node, rm, reass);
-                         ip4_reass_free (rm, rt, reass);
+                         ip4_reass_free (vm, rm, rt, reass);
                          goto next_packet;
                          break;
                        case IP4_REASS_RC_INTERNAL_ERROR:
@@ -1099,7 +1185,7 @@ ip4_reassembly_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
                                                       IP4_ERROR_REASS_INTERNAL_ERROR,
                                                       1);
                          ip4_reass_drop_all (vm, node, rm, reass);
-                         ip4_reass_free (rm, rt, reass);
+                         ip4_reass_free (vm, rm, rt, reass);
                          goto next_packet;
                          break;
                        }
@@ -1119,7 +1205,24 @@ ip4_reassembly_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
              to_next[0] = bi0;
              to_next += 1;
              n_left_to_next -= 1;
-             if (is_feature && IP4_ERROR_NONE == error0)
+             if (next0 == IP4_REASSEMBLY_NEXT_HANDOFF)
+               {
+                 if (PREDICT_FALSE (b0->flags & VLIB_BUFFER_IS_TRACED))
+                   {
+                     if (is_feature)
+                       ip4_reass_add_trace (vm, node, rm, ~0,
+                                            ~0,
+                                            bi0, ~0, ~0, HANDOFF, 0,
+                                            vnet_buffer (b0)->ip.
+                                            reass.owner_feature_thread_index);
+                     else
+                       ip4_reass_add_trace (vm, node, rm, ~0, ~0, bi0,
+                                            ~0, ~0, HANDOFF, 0,
+                                            vnet_buffer (b0)->ip.
+                                            reass.owner_thread_index);
+                   }
+               }
+             else if (is_feature && IP4_ERROR_NONE == error0)
                {
                  b0 = vlib_get_buffer (vm, bi0);
                  vnet_feature_next (&next0, b0);
@@ -1318,7 +1421,6 @@ ip4_reass_init_function (vlib_main_t * vm)
   vlib_node_t *node;
 
   rm->vlib_main = vm;
-  rm->vnet_main = vnet_get_main ();
 
   vec_validate (rm->per_thread_data, vlib_num_workers ());
   ip4_reass_per_thread_t *rt;
@@ -1348,7 +1450,6 @@ ip4_reass_init_function (vlib_main_t * vm)
   rm->fq_feature_index =
     vlib_frame_queue_main_init (ip4_reass_node_feature.index, 0);
 
-
   return error;
 }
 
@@ -1410,7 +1511,7 @@ ip4_reass_walk_expired (vlib_main_t * vm,
           {
             ip4_reass_t *reass = pool_elt_at_index (rt->pool, i[0]);
             ip4_reass_drop_all (vm, node, rm, reass);
-            ip4_reass_free (rm, rt, reass);
+            ip4_reass_free (vm, rm, rt, reass);
           }
           /* *INDENT-ON* */
 
index 20bb772..a65697b 100644 (file)
@@ -39,6 +39,7 @@ typedef enum
   IP6_REASS_RC_INTERNAL_ERROR,
   IP6_REASS_RC_TOO_MANY_FRAGMENTS,
   IP6_REASS_RC_NO_BUF,
+  IP6_REASS_RC_HANDOFF,
 } ip6_reass_rc_t;
 
 typedef struct
@@ -63,7 +64,7 @@ typedef union
   struct
   {
     u32 reass_index;
-    u32 thread_index;
+    u32 memory_owner_thread_index;
   };
   u64 as_u64;
 } ip6_reass_val_t;
@@ -118,6 +119,11 @@ typedef struct
   u16 min_fragment_length;
   // number of fragments for this reassembly
   u32 fragments_n;
+  // thread owning memory for this context (whose pool contains this ctx)
+  u32 memory_owner_thread_index;
+  // thread which received fragment with offset 0 and which sends out the
+  // completed reassembly
+  u32 sendout_thread_index;
 } ip6_reass_t;
 
 typedef struct
@@ -147,7 +153,6 @@ typedef struct
 
   // convenience
   vlib_main_t *vlib_main;
-  vnet_main_t *vnet_main;
 
   // node index of ip6-drop node
   u32 ip6_drop_idx;
@@ -183,6 +188,7 @@ typedef enum
   ICMP_ERROR_FL_TOO_BIG,
   ICMP_ERROR_FL_NOT_MULT_8,
   FINALIZE,
+  HANDOFF,
 } ip6_reass_trace_operation_e;
 
 typedef struct
@@ -200,11 +206,12 @@ typedef struct
   ip6_reass_trace_operation_e action;
   u32 reass_id;
   ip6_reass_range_trace_t trace_range;
-  u32 size_diff;
   u32 op_id;
   u32 fragment_first;
   u32 fragment_last;
   u32 total_data_len;
+  u32 thread_id;
+  u32 thread_id_to;
 } ip6_reass_trace_t;
 
 static void
@@ -236,11 +243,15 @@ format_ip6_reass_trace (u8 * s, va_list * args)
   CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *);
   CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
   ip6_reass_trace_t *t = va_arg (*args, ip6_reass_trace_t *);
-  s = format (s, "reass id: %u, op id: %u ", t->reass_id, t->op_id);
-  u32 indent = format_get_indent (s);
-  s = format (s, "first bi: %u, data len: %u, ip/fragment[%u, %u]",
-             t->trace_range.first_bi, t->total_data_len, t->fragment_first,
-             t->fragment_last);
+  u32 indent = 0;
+  if (~0 != t->reass_id)
+    {
+      s = format (s, "reass id: %u, op id: %u ", t->reass_id, t->op_id);
+      indent = format_get_indent (s);
+      s = format (s, "first bi: %u, data len: %u, ip/fragment[%u, %u]",
+                 t->trace_range.first_bi, t->total_data_len,
+                 t->fragment_first, t->fragment_last);
+    }
   switch (t->action)
     {
     case RANGE_NEW:
@@ -268,29 +279,34 @@ format_ip6_reass_trace (u8 * s, va_list * args)
     case FINALIZE:
       s = format (s, "\n%Ufinalize reassembly", format_white_space, indent);
       break;
+    case HANDOFF:
+      s =
+       format (s, "handoff from thread #%u to thread #%u", t->thread_id,
+               t->thread_id_to);
+      break;
     }
   return s;
 }
 
 static void
 ip6_reass_add_trace (vlib_main_t * vm, vlib_node_runtime_t * node,
-                    ip6_reass_main_t * rm, ip6_reass_t * reass,
-                    u32 bi, ip6_reass_trace_operation_e action,
-                    u32 size_diff)
+                    ip6_reass_main_t * rm, u32 reass_id, u32 op_id,
+                    u32 bi, u32 first_bi, u32 data_len,
+                    ip6_reass_trace_operation_e action, u32 thread_id_to)
 {
   vlib_buffer_t *b = vlib_get_buffer (vm, bi);
   vnet_buffer_opaque_t *vnb = vnet_buffer (b);
   ip6_reass_trace_t *t = vlib_add_trace (vm, node, b, sizeof (t[0]));
-  t->reass_id = reass->id;
+  t->reass_id = reass_id;
   t->action = action;
   ip6_reass_trace_details (vm, bi, &t->trace_range);
-  t->size_diff = size_diff;
-  t->op_id = reass->trace_op_counter;
-  ++reass->trace_op_counter;
+  t->op_id = op_id;
+  t->thread_id = vm->thread_index;
+  t->thread_id_to = thread_id_to;
   t->fragment_first = vnb->ip.reass.fragment_first;
   t->fragment_last = vnb->ip.reass.fragment_last;
-  t->trace_range.first_bi = reass->first_bi;
-  t->total_data_len = reass->data_len;
+  t->trace_range.first_bi = first_bi;
+  t->total_data_len = data_len;
 #if 0
   static u8 *s = NULL;
   s = format (s, "%U", format_ip6_reass_trace, NULL, NULL, t);
@@ -300,6 +316,13 @@ ip6_reass_add_trace (vlib_main_t * vm, vlib_node_runtime_t * node,
 #endif
 }
 
+always_inline void
+ip6_reass_free_ctx (ip6_reass_per_thread_t * rt, ip6_reass_t * reass)
+{
+  pool_put (rt->pool, reass);
+  --rt->reass_n;
+}
+
 always_inline void
 ip6_reass_free (ip6_reass_main_t * rm, ip6_reass_per_thread_t * rt,
                ip6_reass_t * reass)
@@ -312,8 +335,7 @@ ip6_reass_free (ip6_reass_main_t * rm, ip6_reass_per_thread_t * rt,
   kv.key[4] = reass->key.as_u64[4];
   kv.key[5] = reass->key.as_u64[5];
   clib_bihash_add_del_48_8 (&rm->hash, &kv, 0);
-  pool_put (rt->pool, reass);
-  --rt->reass_n;
+  ip6_reass_free_ctx (rt, reass);
 }
 
 always_inline void
@@ -398,8 +420,11 @@ ip6_reass_on_timeout (vlib_main_t * vm, vlib_node_runtime_t * node,
          *icmp_bi = reass->first_bi;
          if (PREDICT_FALSE (b->flags & VLIB_BUFFER_IS_TRACED))
            {
-             ip6_reass_add_trace (vm, node, rm, reass, reass->first_bi,
-                                  ICMP_ERROR_RT_EXCEEDED, 0);
+             ip6_reass_add_trace (vm, node, rm, reass->id,
+                                  reass->trace_op_counter, reass->first_bi,
+                                  reass->first_bi, reass->data_len,
+                                  ICMP_ERROR_RT_EXCEEDED, ~0);
+             ++reass->trace_op_counter;
            }
          // fragment with offset zero received - send icmp message back
          if (b->flags & VLIB_BUFFER_NEXT_PRESENT)
@@ -425,18 +450,26 @@ ip6_reass_find_or_create (vlib_main_t * vm, vlib_node_runtime_t * node,
                          ip6_reass_main_t * rm, ip6_reass_per_thread_t * rt,
                          ip6_reass_kv_t * kv, u32 * icmp_bi, u8 * do_handoff)
 {
-  ip6_reass_t *reass = NULL;
-  f64 now = vlib_time_now (rm->vlib_main);
+  ip6_reass_t *reass;
+  f64 now;
+
+again:
+
+  reass = NULL;
+  now = vlib_time_now (vm);
 
   if (!clib_bihash_search_48_8
       (&rm->hash, (clib_bihash_kv_48_8_t *) kv, (clib_bihash_kv_48_8_t *) kv))
     {
-      if (vm->thread_index != kv->v.thread_index)
+      reass =
+       pool_elt_at_index (rm->per_thread_data
+                          [kv->v.memory_owner_thread_index].pool,
+                          kv->v.reass_index);
+      if (vm->thread_index != kv->v.memory_owner_thread_index)
        {
          *do_handoff = 1;
-         return NULL;
+         return reass;
        }
-      reass = pool_elt_at_index (rt->pool, kv->v.reass_index);
 
       if (now > reass->last_heard + rm->timeout)
        {
@@ -478,13 +511,18 @@ ip6_reass_find_or_create (vlib_main_t * vm, vlib_node_runtime_t * node,
   reass->key.as_u64[4] = ((clib_bihash_kv_48_8_t *) kv)->key[4];
   reass->key.as_u64[5] = ((clib_bihash_kv_48_8_t *) kv)->key[5];
   kv->v.reass_index = (reass - rt->pool);
-  kv->v.thread_index = vm->thread_index;
+  kv->v.memory_owner_thread_index = vm->thread_index;
   reass->last_heard = now;
 
-  if (clib_bihash_add_del_48_8 (&rm->hash, (clib_bihash_kv_48_8_t *) kv, 1))
+  int rv =
+    clib_bihash_add_del_48_8 (&rm->hash, (clib_bihash_kv_48_8_t *) kv, 2);
+  if (rv)
     {
-      ip6_reass_free (rm, rt, reass);
+      ip6_reass_free_ctx (rt, reass);
       reass = NULL;
+      // if other worker created a context already work with the other copy
+      if (-2 == rv)
+       goto again;
     }
 
   return reass;
@@ -669,7 +707,10 @@ ip6_reass_finalize (vlib_main_t * vm, vlib_node_runtime_t * node,
   first_b->flags &= ~VLIB_BUFFER_EXT_HDR_VALID;
   if (PREDICT_FALSE (first_b->flags & VLIB_BUFFER_IS_TRACED))
     {
-      ip6_reass_add_trace (vm, node, rm, reass, reass->first_bi, FINALIZE, 0);
+      ip6_reass_add_trace (vm, node, rm, reass->id, reass->trace_op_counter,
+                          reass->first_bi, reass->first_bi, reass->data_len,
+                          FINALIZE, ~0);
+      ++reass->trace_op_counter;
 #if 0
       // following code does a hexdump of packet fragments to stdout ...
       do
@@ -745,7 +786,8 @@ always_inline ip6_reass_rc_t
 ip6_reass_update (vlib_main_t * vm, vlib_node_runtime_t * node,
                  ip6_reass_main_t * rm, ip6_reass_per_thread_t * rt,
                  ip6_reass_t * reass, u32 * bi0, u32 * next0, u32 * error0,
-                 ip6_frag_hdr_t * frag_hdr, bool is_custom_app)
+                 ip6_frag_hdr_t * frag_hdr, bool is_custom_app,
+                 u32 * handoff_thread_idx)
 {
   int consumed = 0;
   vlib_buffer_t *fb = vlib_get_buffer (vm, *bi0);
@@ -835,8 +877,11 @@ ip6_reass_update (vlib_main_t * vm, vlib_node_runtime_t * node,
          ip6_reass_free (rm, rt, reass);
          if (PREDICT_FALSE (fb->flags & VLIB_BUFFER_IS_TRACED))
            {
-             ip6_reass_add_trace (vm, node, rm, reass, *bi0, RANGE_OVERLAP,
-                                  0);
+             ip6_reass_add_trace (vm, node, rm, reass->id,
+                                  reass->trace_op_counter, *bi0,
+                                  reass->first_bi, reass->data_len,
+                                  RANGE_OVERLAP, ~0);
+             ++reass->trace_op_counter;
            }
          *next0 = IP6_REASSEMBLY_NEXT_DROP;
          *error0 = IP6_ERROR_REASS_OVERLAPPING_FRAGMENT;
@@ -850,7 +895,10 @@ check_if_done_maybe:
     {
       if (PREDICT_FALSE (fb->flags & VLIB_BUFFER_IS_TRACED))
        {
-         ip6_reass_add_trace (vm, node, rm, reass, *bi0, RANGE_NEW, 0);
+         ip6_reass_add_trace (vm, node, rm, reass->id,
+                              reass->trace_op_counter, *bi0, reass->first_bi,
+                              reass->data_len, RANGE_NEW, ~0);
+         ++reass->trace_op_counter;
        }
     }
   if (~0 != reass->last_packet_octet &&
@@ -858,6 +906,16 @@ check_if_done_maybe:
     {
-      return ip6_reass_finalize (vm, node, rm, rt, reass, bi0, next0, error0,
-                                is_custom_app);
+      *handoff_thread_idx = reass->sendout_thread_index;
+      ip6_reass_rc_t rc =
+       ip6_reass_finalize (vm, node, rm, rt, reass, bi0, next0, error0,
+                           is_custom_app);
+      if (IP6_REASS_RC_OK == rc
+         && reass->memory_owner_thread_index != reass->sendout_thread_index)
+       {
+         return IP6_REASS_RC_HANDOFF;
+       }
+      return rc;
     }
   else
     {
@@ -1023,24 +1081,44 @@ ip6_reassembly_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
            ip6_reass_find_or_create (vm, node, rm, rt, &kv, &icmp_bi,
                                      &do_handoff);
 
+         if (reass)
+           {
+             const u32 fragment_first = ip6_frag_hdr_offset (frag_hdr);
+             if (0 == fragment_first)
+               {
+                 reass->sendout_thread_index = vm->thread_index;
+               }
+           }
          if (PREDICT_FALSE (do_handoff))
            {
              next0 = IP6_REASSEMBLY_NEXT_HANDOFF;
              if (is_feature)
                vnet_buffer (b0)->ip.reass.owner_feature_thread_index =
-                 kv.v.thread_index;
+                 kv.v.memory_owner_thread_index;
              else
                vnet_buffer (b0)->ip.reass.owner_thread_index =
-                 kv.v.thread_index;
+                 kv.v.memory_owner_thread_index;
            }
          else if (reass)
            {
+             u32 handoff_thread_idx;
              switch (ip6_reass_update (vm, node, rm, rt, reass, &bi0, &next0,
-                                       &error0, frag_hdr, is_custom_app))
+                                       &error0, frag_hdr, is_custom_app,
+                                       &handoff_thread_idx))
                {
                case IP6_REASS_RC_OK:
                  /* nothing to do here */
                  break;
+               case IP6_REASS_RC_HANDOFF:
+                 next0 = IP6_REASSEMBLY_NEXT_HANDOFF;
+                 b0 = vlib_get_buffer (vm, bi0);
+                 if (is_feature)
+                   vnet_buffer (b0)->ip.reass.owner_feature_thread_index =
+                     handoff_thread_idx;
+                 else
+                   vnet_buffer (b0)->ip.reass.owner_thread_index =
+                     handoff_thread_idx;
+                 break;
                case IP6_REASS_RC_TOO_MANY_FRAGMENTS:
                  vlib_node_increment_counter (vm, node->node_index,
                                               IP6_ERROR_REASS_FRAGMENT_CHAIN_TOO_LONG,
@@ -1089,7 +1167,24 @@ ip6_reassembly_inline (vlib_main_t * vm, vlib_node_runtime_t * node,
              to_next[0] = bi0;
              to_next += 1;
              n_left_to_next -= 1;
-             if (is_feature && IP6_ERROR_NONE == error0)
+             if (next0 == IP6_REASSEMBLY_NEXT_HANDOFF)
+               {
+                 if (PREDICT_FALSE (b0->flags & VLIB_BUFFER_IS_TRACED))
+                   {
+                     if (is_feature)
+                       ip6_reass_add_trace (vm, node, rm, ~0,
+                                            ~0,
+                                            bi0, ~0, ~0, HANDOFF,
+                                            vnet_buffer (b0)->ip.
+                                            reass.owner_feature_thread_index);
+                     else
+                       ip6_reass_add_trace (vm, node, rm, ~0, ~0, bi0,
+                                            ~0, ~0, HANDOFF,
+                                            vnet_buffer (b0)->ip.
+                                            reass.owner_thread_index);
+                   }
+               }
+             else if (is_feature && IP6_ERROR_NONE == error0)
                {
                  b0 = vlib_get_buffer (vm, bi0);
                  vnet_feature_next (&next0, b0);
@@ -1296,7 +1391,6 @@ ip6_reass_init_function (vlib_main_t * vm)
   vlib_node_t *node;
 
   rm->vlib_main = vm;
-  rm->vnet_main = vnet_get_main ();
 
   vec_validate (rm->per_thread_data, vlib_num_workers ());
   ip6_reass_per_thread_t *rt;
index 2bfb4b2..307da8f 100644 (file)
@@ -362,13 +362,16 @@ class VppTestCase(unittest.TestCase):
             coredump_size = "coredump-size unlimited"
 
         cpu_core_number = cls.get_least_used_cpu()
+        if not hasattr(cls, "worker_config"):
+            cls.worker_config = ""
 
         cls.vpp_cmdline = [cls.vpp_bin, "unix",
                            "{", "nodaemon", debug_cli, "full-coredump",
                            coredump_size, "runtime-dir", cls.tempdir, "}",
                            "api-trace", "{", "on", "}", "api-segment", "{",
                            "prefix", cls.shm_prefix, "}", "cpu", "{",
-                           "main-core", str(cpu_core_number), "}",
+                           "main-core", str(cpu_core_number),
+                           cls.worker_config, "}",
                            "statseg", "{", "socket-name", cls.stats_sock, "}",
                            "socksvr", "{", "socket-name", cls.api_sock, "}",
                            "plugins",
index e95d533..4c8712f 100644 (file)
@@ -2,7 +2,7 @@
 
 import six
 import unittest
-from random import shuffle
+from random import shuffle, choice, randrange
 
 from framework import VppTestCase, VppTestRunner
 
@@ -10,11 +10,10 @@ import scapy.compat
 from scapy.packet import Raw
 from scapy.layers.l2 import Ether, GRE
 from scapy.layers.inet import IP, UDP, ICMP
-from util import ppp, fragment_rfc791, fragment_rfc8200
 from scapy.layers.inet6 import IPv6, IPv6ExtHdrFragment, ICMPv6ParamProblem,\
     ICMPv6TimeExceeded
 from framework import VppTestCase, VppTestRunner
-from util import ppp, fragment_rfc791, fragment_rfc8200
+from util import ppp, ppc, fragment_rfc791, fragment_rfc8200
 from vpp_gre_interface import VppGreInterface
 from vpp_ip import DpoProto
 from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathProto
@@ -22,6 +21,9 @@ from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathProto
 # 35 is enough to have >257 400-byte fragments
 test_packet_count = 35
 
+# number of workers used for multi-worker test cases
+worker_count = 3
+
 
 class TestIPv4Reassembly(VppTestCase):
     """ IPv4 Reassembly """
@@ -499,6 +501,179 @@ class TestIPv4Reassembly(VppTestCase):
         self.src_if.assert_nothing_captured()
 
 
+class TestIPv4MWReassembly(VppTestCase):
+    """ IPv4 Reassembly (multiple workers) """
+    worker_config = "workers %d" % worker_count
+
+    @classmethod
+    def setUpClass(cls):
+        super(TestIPv4MWReassembly, cls).setUpClass()
+
+        cls.create_pg_interfaces(range(worker_count+1))
+        cls.src_if = cls.pg0
+        cls.send_ifs = cls.pg_interfaces[:-1]
+        cls.dst_if = cls.pg_interfaces[-1]
+
+        # setup all interfaces
+        for i in cls.pg_interfaces:
+            i.admin_up()
+            i.config_ip4()
+            i.resolve_arp()
+
+        # packets sizes reduced here because we are generating packets without
+        # Ethernet headers, which are added later (diff fragments go via
+        # different interfaces)
+        cls.packet_sizes = [64-len(Ether()), 512-len(Ether()),
+                            1518-len(Ether()), 9018-len(Ether())]
+        cls.padding = " abcdefghijklmn"
+        cls.create_stream(cls.packet_sizes)
+        cls.create_fragments()
+
+    @classmethod
+    def tearDownClass(cls):
+        super(TestIPv4MWReassembly, cls).tearDownClass()
+
+    def setUp(self):
+        """ Test setup - force timeout on existing reassemblies """
+        super(TestIPv4MWReassembly, self).setUp()
+        for intf in self.send_ifs:
+            self.vapi.ip_reassembly_enable_disable(
+                sw_if_index=intf.sw_if_index, enable_ip4=True)
+        self.vapi.ip_reassembly_set(timeout_ms=0, max_reassemblies=1000,
+                                    max_reassembly_length=1000,
+                                    expire_walk_interval_ms=10)
+        self.sleep(.25)
+        self.vapi.ip_reassembly_set(timeout_ms=1000000, max_reassemblies=1000,
+                                    max_reassembly_length=1000,
+                                    expire_walk_interval_ms=10000)
+
+    def tearDown(self):
+        super(TestIPv4MWReassembly, self).tearDown()
+
+    def show_commands_at_teardown(self):
+        self.logger.debug(self.vapi.ppcli("show ip4-reassembly details"))
+        self.logger.debug(self.vapi.ppcli("show buffers"))
+
+    @classmethod
+    def create_stream(cls, packet_sizes, packet_count=test_packet_count):
+        """Create input packet stream
+
+        :param list packet_sizes: Required packet sizes.
+        """
+        for i in range(0, packet_count):
+            info = cls.create_packet_info(cls.src_if, cls.src_if)
+            payload = cls.info_to_payload(info)
+            p = (IP(id=info.index, src=cls.src_if.remote_ip4,
+                    dst=cls.dst_if.remote_ip4) /
+                 UDP(sport=1234, dport=5678) /
+                 Raw(payload))
+            size = packet_sizes[(i // 2) % len(packet_sizes)]
+            cls.extend_packet(p, size, cls.padding)
+            info.data = p
+
+    @classmethod
+    def create_fragments(cls):
+        infos = cls._packet_infos
+        cls.pkt_infos = []
+        for index, info in six.iteritems(infos):
+            p = info.data
+            # cls.logger.debug(ppp("Packet:",
+            #                      p.__class__(scapy.compat.raw(p))))
+            fragments_400 = fragment_rfc791(p, 400)
+            cls.pkt_infos.append((index, fragments_400))
+        cls.fragments_400 = [
+            x for (_, frags) in cls.pkt_infos for x in frags]
+        cls.logger.debug("Fragmented %s packets into %s 400-byte fragments, " %
+                         (len(infos), len(cls.fragments_400)))
+
+    def verify_capture(self, capture, dropped_packet_indexes=[]):
+        """Verify captured packet stream.
+
+        :param list capture: Captured packet stream.
+        """
+        info = None
+        seen = set()
+        for packet in capture:
+            try:
+                self.logger.debug(ppp("Got packet:", packet))
+                ip = packet[IP]
+                udp = packet[UDP]
+                payload_info = self.payload_to_info(packet[Raw])
+                packet_index = payload_info.index
+                self.assertTrue(
+                    packet_index not in dropped_packet_indexes,
+                    ppp("Packet received, but should be dropped:", packet))
+                if packet_index in seen:
+                    raise Exception(ppp("Duplicate packet received", packet))
+                seen.add(packet_index)
+                self.assertEqual(payload_info.dst, self.src_if.sw_if_index)
+                info = self._packet_infos[packet_index]
+                self.assertTrue(info is not None)
+                self.assertEqual(packet_index, info.index)
+                saved_packet = info.data
+                self.assertEqual(ip.src, saved_packet[IP].src)
+                self.assertEqual(ip.dst, saved_packet[IP].dst)
+                self.assertEqual(udp.payload, saved_packet[UDP].payload)
+            except Exception:
+                self.logger.error(ppp("Unexpected or invalid packet:", packet))
+                raise
+        for index in self._packet_infos:
+            self.assertTrue(index in seen or index in dropped_packet_indexes,
+                            "Packet with packet_index %d not received" % index)
+
+    def send_packets(self, packets):
+        for counter in range(worker_count):
+            if 0 == len(packets[counter]):
+                continue
+            send_if = self.send_ifs[counter]
+            send_if.add_stream(
+                (Ether(dst=send_if.local_mac, src=send_if.remote_mac) / x
+                 for x in packets[counter]),
+                worker=counter)
+        self.pg_start()
+
+    def test_worker_conflict(self):
+        """ 1st and FO=0 fragments on different workers """
+
+        # in first wave we send fragments which don't start at offset 0
+        # then we send fragments with offset 0 on a different thread
+        # then the rest of packets on a random thread
+        first_packets = [[] for n in range(worker_count)]
+        second_packets = [[] for n in range(worker_count)]
+        rest_of_packets = [[] for n in range(worker_count)]
+        for (_, p) in self.pkt_infos:
+            wi = randrange(worker_count)
+            second_packets[wi].append(p[0])
+            if len(p) <= 1:
+                continue
+            wi2 = wi
+            while wi2 == wi:
+                wi2 = randrange(worker_count)
+            first_packets[wi2].append(p[1])
+            wi3 = randrange(worker_count)
+            rest_of_packets[wi3].extend(p[2:])
+
+        self.pg_enable_capture()
+        self.send_packets(first_packets)
+        self.send_packets(second_packets)
+        self.send_packets(rest_of_packets)
+
+        packets = self.dst_if.get_capture(len(self.pkt_infos))
+        self.verify_capture(packets)
+        for send_if in self.send_ifs:
+            send_if.assert_nothing_captured()
+
+        self.pg_enable_capture()
+        self.send_packets(first_packets)
+        self.send_packets(second_packets)
+        self.send_packets(rest_of_packets)
+
+        packets = self.dst_if.get_capture(len(self.pkt_infos))
+        self.verify_capture(packets)
+        for send_if in self.send_ifs:
+            send_if.assert_nothing_captured()
+
+
 class TestIPv6Reassembly(VppTestCase):
     """ IPv6 Reassembly """
 
@@ -937,6 +1112,179 @@ class TestIPv6Reassembly(VppTestCase):
         self.assert_equal(icmp[ICMPv6ParamProblem].code, 0, "ICMP code")
 
 
+class TestIPv6MWReassembly(VppTestCase):
+    """ IPv6 Reassembly (multiple workers) """
+    worker_config = "workers %d" % worker_count
+
+    @classmethod
+    def setUpClass(cls):
+        super(TestIPv6MWReassembly, cls).setUpClass()
+
+        cls.create_pg_interfaces(range(worker_count+1))
+        cls.src_if = cls.pg0
+        cls.send_ifs = cls.pg_interfaces[:-1]
+        cls.dst_if = cls.pg_interfaces[-1]
+
+        # setup all interfaces
+        for i in cls.pg_interfaces:
+            i.admin_up()
+            i.config_ip6()
+            i.resolve_ndp()
+
+        # packets sizes reduced here because we are generating packets without
+        # Ethernet headers, which are added later (diff fragments go via
+        # different interfaces)
+        cls.packet_sizes = [64-len(Ether()), 512-len(Ether()),
+                            1518-len(Ether()), 9018-len(Ether())]
+        cls.padding = " abcdefghijklmn"
+        cls.create_stream(cls.packet_sizes)
+        cls.create_fragments()
+
+    @classmethod
+    def tearDownClass(cls):
+        super(TestIPv6MWReassembly, cls).tearDownClass()
+
+    def setUp(self):
+        """ Test setup - force timeout on existing reassemblies """
+        super(TestIPv6MWReassembly, self).setUp()
+        for intf in self.send_ifs:
+            self.vapi.ip_reassembly_enable_disable(
+                sw_if_index=intf.sw_if_index, enable_ip6=True)
+        self.vapi.ip_reassembly_set(timeout_ms=0, max_reassemblies=1000,
+                                    max_reassembly_length=1000,
+                                    expire_walk_interval_ms=10, is_ip6=1)
+        self.sleep(.25)
+        self.vapi.ip_reassembly_set(timeout_ms=1000000, max_reassemblies=1000,
+                                    max_reassembly_length=1000,
+                                    expire_walk_interval_ms=1000, is_ip6=1)
+
+    def tearDown(self):
+        super(TestIPv6MWReassembly, self).tearDown()
+
+    def show_commands_at_teardown(self):
+        self.logger.debug(self.vapi.ppcli("show ip6-reassembly details"))
+        self.logger.debug(self.vapi.ppcli("show buffers"))
+
+    @classmethod
+    def create_stream(cls, packet_sizes, packet_count=test_packet_count):
+        """Create input packet stream
+
+        :param list packet_sizes: Required packet sizes.
+        """
+        for i in range(0, packet_count):
+            info = cls.create_packet_info(cls.src_if, cls.src_if)
+            payload = cls.info_to_payload(info)
+            p = (IPv6(src=cls.src_if.remote_ip6,
+                      dst=cls.dst_if.remote_ip6) /
+                 UDP(sport=1234, dport=5678) /
+                 Raw(payload))
+            size = packet_sizes[(i // 2) % len(packet_sizes)]
+            cls.extend_packet(p, size, cls.padding)
+            info.data = p
+
+    @classmethod
+    def create_fragments(cls):
+        infos = cls._packet_infos
+        cls.pkt_infos = []
+        for index, info in six.iteritems(infos):
+            p = info.data
+            # cls.logger.debug(ppp("Packet:",
+            #                      p.__class__(scapy.compat.raw(p))))
+            fragments_400 = fragment_rfc8200(p, index, 400)
+            cls.pkt_infos.append((index, fragments_400))
+        cls.fragments_400 = [
+            x for (_, frags) in cls.pkt_infos for x in frags]
+        cls.logger.debug("Fragmented %s packets into %s 400-byte fragments, " %
+                         (len(infos), len(cls.fragments_400)))
+
+    def verify_capture(self, capture, dropped_packet_indexes=[]):
+        """Verify captured packet stream.
+
+        :param list capture: Captured packet stream.
+        """
+        info = None
+        seen = set()
+        for packet in capture:
+            try:
+                self.logger.debug(ppp("Got packet:", packet))
+                ip = packet[IPv6]
+                udp = packet[UDP]
+                payload_info = self.payload_to_info(packet[Raw])
+                packet_index = payload_info.index
+                self.assertTrue(
+                    packet_index not in dropped_packet_indexes,
+                    ppp("Packet received, but should be dropped:", packet))
+                if packet_index in seen:
+                    raise Exception(ppp("Duplicate packet received", packet))
+                seen.add(packet_index)
+                self.assertEqual(payload_info.dst, self.src_if.sw_if_index)
+                info = self._packet_infos[packet_index]
+                self.assertTrue(info is not None)
+                self.assertEqual(packet_index, info.index)
+                saved_packet = info.data
+                self.assertEqual(ip.src, saved_packet[IPv6].src)
+                self.assertEqual(ip.dst, saved_packet[IPv6].dst)
+                self.assertEqual(udp.payload, saved_packet[UDP].payload)
+            except Exception:
+                self.logger.error(ppp("Unexpected or invalid packet:", packet))
+                raise
+        for index in self._packet_infos:
+            self.assertTrue(index in seen or index in dropped_packet_indexes,
+                            "Packet with packet_index %d not received" % index)
+
+    def send_packets(self, packets):
+        for counter in range(worker_count):
+            if 0 == len(packets[counter]):
+                continue
+            send_if = self.send_ifs[counter]
+            send_if.add_stream(
+                (Ether(dst=send_if.local_mac, src=send_if.remote_mac) / x
+                 for x in packets[counter]),
+                worker=counter)
+        self.pg_start()
+
+    def test_worker_conflict(self):
+        """ 1st and FO=0 fragments on different workers """
+
+        # in first wave we send fragments which don't start at offset 0
+        # then we send fragments with offset 0 on a different thread
+        # then the rest of packets on a random thread
+        first_packets = [[] for n in range(worker_count)]
+        second_packets = [[] for n in range(worker_count)]
+        rest_of_packets = [[] for n in range(worker_count)]
+        for (_, p) in self.pkt_infos:
+            wi = randrange(worker_count)
+            second_packets[wi].append(p[0])
+            if len(p) <= 1:
+                continue
+            wi2 = wi
+            while wi2 == wi:
+                wi2 = randrange(worker_count)
+            first_packets[wi2].append(p[1])
+            wi3 = randrange(worker_count)
+            rest_of_packets[wi3].extend(p[2:])
+
+        self.pg_enable_capture()
+        self.send_packets(first_packets)
+        self.send_packets(second_packets)
+        self.send_packets(rest_of_packets)
+
+        packets = self.dst_if.get_capture(len(self.pkt_infos))
+        self.verify_capture(packets)
+        for send_if in self.send_ifs:
+            send_if.assert_nothing_captured()
+
+        self.pg_enable_capture()
+        self.send_packets(first_packets)
+        self.send_packets(second_packets)
+        self.send_packets(rest_of_packets)
+
+        packets = self.dst_if.get_capture(len(self.pkt_infos))
+        self.verify_capture(packets)
+        for send_if in self.send_ifs:
+            send_if.assert_nothing_captured()
+
+
 class TestIPv4ReassemblyLocalNode(VppTestCase):
     """ IPv4 Reassembly for packets coming to ip4-local node """