nat: fix HA multi-worker issues 46/31646/17
authorKlement Sekera <ksekera@cisco.com>
Mon, 15 Mar 2021 15:34:01 +0000 (16:34 +0100)
committerOle Tr�an <otroan@employees.org>
Tue, 30 Mar 2021 12:26:25 +0000 (12:26 +0000)
Use correct vlib_main() in various code parts. Fix tests.

Type: fix
Signed-off-by: Klement Sekera <ksekera@cisco.com>
Change-Id: Ia379f3b686599532dedaafad2278c4097a3f03f3

src/plugins/nat/nat44-ei/nat44_ei.c
src/plugins/nat/nat44-ei/nat44_ei_api.c
src/plugins/nat/nat44-ei/nat44_ei_cli.c
src/plugins/nat/nat44-ei/nat44_ei_ha.c
src/plugins/nat/nat44-ei/nat44_ei_ha.h
src/plugins/nat/test/test_nat44_ei.py

index 253dd78..77c224d 100644 (file)
@@ -1785,27 +1785,23 @@ nat44_ei_del_session (nat44_ei_main_t *nm, ip4_address_t *addr, u16 port,
 {
   nat44_ei_main_per_thread_data_t *tnm;
   clib_bihash_kv_8_8_t kv, value;
-  ip4_header_t ip;
   u32 fib_index = fib_table_find (FIB_PROTOCOL_IP4, vrf_id);
   nat44_ei_session_t *s;
   clib_bihash_8_8_t *t;
 
-  ip.dst_address.as_u32 = ip.src_address.as_u32 = addr->as_u32;
-  if (nm->num_workers > 1)
-    tnm =
-      vec_elt_at_index (nm->per_thread_data,
-                       nat44_ei_get_in2out_worker_index (&ip, fib_index, 0));
-  else
-    tnm = vec_elt_at_index (nm->per_thread_data, nm->num_workers);
-
   init_nat_k (&kv, *addr, port, fib_index, proto);
   t = is_in ? &nm->in2out : &nm->out2in;
   if (!clib_bihash_search_8_8 (t, &kv, &value))
     {
-      if (pool_is_free_index (tnm->sessions, value.value))
+      // this is called from API/CLI, so the world is stopped here
+      // it's safe to manipulate arbitrary per-thread data
+      u32 thread_index = nat_value_get_thread_index (&value);
+      tnm = vec_elt_at_index (nm->per_thread_data, thread_index);
+      u32 session_index = nat_value_get_session_index (&value);
+      if (pool_is_free_index (tnm->sessions, session_index))
        return VNET_API_ERROR_UNSPECIFIED;
 
-      s = pool_elt_at_index (tnm->sessions, value.value);
+      s = pool_elt_at_index (tnm->sessions, session_index);
       nat44_ei_free_session_data_v2 (nm, s, tnm - nm->per_thread_data, 0);
       nat44_ei_delete_session (nm, s, tnm - nm->per_thread_data);
       return 0;
index 5ec07b0..427140f 100644 (file)
@@ -303,7 +303,8 @@ vl_api_nat44_ei_ha_set_listener_t_handler (
   int rv;
 
   memcpy (&addr, &mp->ip_address, sizeof (addr));
-  rv = nat_ha_set_listener (&addr, clib_net_to_host_u16 (mp->port),
+  rv = nat_ha_set_listener (vlib_get_main (), &addr,
+                           clib_net_to_host_u16 (mp->port),
                            clib_net_to_host_u32 (mp->path_mtu));
 
   REPLY_MACRO (VL_API_NAT44_EI_HA_SET_LISTENER_REPLY);
@@ -339,9 +340,9 @@ vl_api_nat44_ei_ha_set_failover_t_handler (
   int rv;
 
   memcpy (&addr, &mp->ip_address, sizeof (addr));
-  rv =
-    nat_ha_set_failover (&addr, clib_net_to_host_u16 (mp->port),
-                        clib_net_to_host_u32 (mp->session_refresh_interval));
+  rv = nat_ha_set_failover (
+    vlib_get_main (), &addr, clib_net_to_host_u16 (mp->port),
+    clib_net_to_host_u32 (mp->session_refresh_interval));
 
   REPLY_MACRO (VL_API_NAT44_EI_HA_SET_FAILOVER_REPLY);
 }
index 3aa3a2f..96c6de3 100644 (file)
@@ -616,7 +616,7 @@ nat_ha_failover_command_fn (vlib_main_t *vm, unformat_input_t *input,
        }
     }
 
-  rv = nat_ha_set_failover (&addr, (u16) port, session_refresh_interval);
+  rv = nat_ha_set_failover (vm, &addr, (u16) port, session_refresh_interval);
   if (rv)
     error = clib_error_return (0, "set HA failover failed");
 
@@ -654,7 +654,7 @@ nat_ha_listener_command_fn (vlib_main_t *vm, unformat_input_t *input,
        }
     }
 
-  rv = nat_ha_set_listener (&addr, (u16) port, path_mtu);
+  rv = nat_ha_set_listener (vm, &addr, (u16) port, path_mtu);
   if (rv)
     error = clib_error_return (0, "set HA listener failed");
 
index 3d634dc..344d104 100644 (file)
@@ -143,7 +143,6 @@ typedef struct nat_ha_main_s
   u32 session_refresh_interval;
   /* counters */
   vlib_simple_counter_main_t counters[NAT_HA_N_COUNTERS];
-  vlib_main_t *vlib_main;
   /* sequence number counter */
   u32 sequence_number;
   /* 1 if resync in progress */
@@ -326,13 +325,13 @@ nat_ha_resync_fin (void)
 
 /* cache HA NAT data waiting for ACK */
 static int
-nat_ha_resend_queue_add (u32 seq, u8 * data, u8 data_len, u8 is_resync,
-                        u32 thread_index)
+nat_ha_resend_queue_add (vlib_main_t *vm, u32 seq, u8 *data, u8 data_len,
+                        u8 is_resync, u32 vlib_thread_index)
 {
   nat_ha_main_t *ha = &nat_ha_main;
-  nat_ha_per_thread_data_t *td = &ha->per_thread_data[thread_index];
+  nat_ha_per_thread_data_t *td = &ha->per_thread_data[vlib_thread_index];
   nat_ha_resend_entry_t *entry;
-  f64 now = vlib_time_now (ha->vlib_main);
+  f64 now = vlib_time_now (vm);
 
   vec_add2 (td->resend_queue, entry, 1);
   clib_memset (entry, 0, sizeof (*entry));
@@ -376,17 +375,17 @@ nat_ha_ack_recv (u32 seq, u32 thread_index)
 
 /* scan non-ACKed HA NAT for retry */
 static void
-nat_ha_resend_scan (f64 now, u32 thread_index)
+nat_ha_resend_scan (vlib_main_t *vm, u32 thread_index)
 {
   nat44_ei_main_t *nm = &nat44_ei_main;
   nat_ha_main_t *ha = &nat_ha_main;
   nat_ha_per_thread_data_t *td = &ha->per_thread_data[thread_index];
   u32 i, *del, *to_delete = 0;
-  vlib_main_t *vm = ha->vlib_main;
   vlib_buffer_t *b = 0;
   vlib_frame_t *f;
   u32 bi, *to_next;
   ip4_header_t *ip;
+  f64 now = vlib_time_now (vm);
 
   vec_foreach_index (i, td->resend_queue)
   {
@@ -485,7 +484,6 @@ nat_ha_init (vlib_main_t * vm, u32 num_workers, u32 num_threads)
 
   nat_ha_set_node_indexes (ha, vm);
 
-  ha->vlib_main = vm;
   ha->fq_index = ~0;
 
   ha->num_workers = num_workers;
@@ -501,14 +499,15 @@ nat_ha_init (vlib_main_t * vm, u32 num_workers, u32 num_threads)
 }
 
 int
-nat_ha_set_listener (ip4_address_t * addr, u16 port, u32 path_mtu)
+nat_ha_set_listener (vlib_main_t *vm, ip4_address_t *addr, u16 port,
+                    u32 path_mtu)
 {
   nat44_ei_main_t *nm = &nat44_ei_main;
   nat_ha_main_t *ha = &nat_ha_main;
 
   /* unregister previously set UDP port */
   if (ha->src_port)
-    udp_unregister_dst_port (ha->vlib_main, ha->src_port, 1);
+    udp_unregister_dst_port (vm, ha->src_port, 1);
 
   ha->src_ip_address.as_u32 = addr->as_u32;
   ha->src_port = port;
@@ -521,12 +520,11 @@ nat_ha_set_listener (ip4_address_t * addr, u16 port, u32 path_mtu)
        {
          if (ha->fq_index == ~0)
            ha->fq_index = vlib_frame_queue_main_init (ha->ha_node_index, 0);
-         udp_register_dst_port (ha->vlib_main, port,
-                                ha->ha_handoff_node_index, 1);
+         udp_register_dst_port (vm, port, ha->ha_handoff_node_index, 1);
        }
       else
        {
-         udp_register_dst_port (ha->vlib_main, port, ha->ha_node_index, 1);
+         udp_register_dst_port (vm, port, ha->ha_node_index, 1);
        }
       nat_elog_info_X1 (nm, "HA listening on port %d for state sync", "i4",
                        port);
@@ -546,7 +544,7 @@ nat_ha_get_listener (ip4_address_t * addr, u16 * port, u32 * path_mtu)
 }
 
 int
-nat_ha_set_failover (ip4_address_t * addr, u16 port,
+nat_ha_set_failover (vlib_main_t *vm, ip4_address_t *addr, u16 port,
                     u32 session_refresh_interval)
 {
   nat_ha_main_t *ha = &nat_ha_main;
@@ -555,7 +553,7 @@ nat_ha_set_failover (ip4_address_t * addr, u16 port,
   ha->dst_port = port;
   ha->session_refresh_interval = session_refresh_interval;
 
-  vlib_process_signal_event (ha->vlib_main, ha->ha_process_node_index, 1, 0);
+  vlib_process_signal_event (vm, ha->ha_process_node_index, 1, 0);
 
   return 0;
 }
@@ -703,15 +701,15 @@ nat_ha_header_create (vlib_buffer_t * b, u32 * offset, u32 thread_index)
 }
 
 static inline void
-nat_ha_send (vlib_frame_t * f, vlib_buffer_t * b, u8 is_resync,
-            u32 thread_index)
+nat_ha_send (vlib_frame_t *f, vlib_buffer_t *b, u8 is_resync,
+            u32 vlib_thread_index)
 {
   nat_ha_main_t *ha = &nat_ha_main;
-  nat_ha_per_thread_data_t *td = &ha->per_thread_data[thread_index];
+  nat_ha_per_thread_data_t *td = &ha->per_thread_data[vlib_thread_index];
   nat_ha_message_header_t *h;
   ip4_header_t *ip;
   udp_header_t *udp;
-  vlib_main_t *vm = vlib_get_main_by_index (thread_index);
+  vlib_main_t *vm = vlib_get_main_by_index (vlib_thread_index);
 
   ip = vlib_buffer_get_current (b);
   udp = ip4_next_header (ip);
@@ -723,21 +721,22 @@ nat_ha_send (vlib_frame_t * f, vlib_buffer_t * b, u8 is_resync,
   ip->checksum = ip4_header_checksum (ip);
   udp->length = clib_host_to_net_u16 (b->current_length - sizeof (*ip));
 
-  nat_ha_resend_queue_add (h->sequence_number, (u8 *) ip, b->current_length,
-                          is_resync, thread_index);
+  nat_ha_resend_queue_add (vm, h->sequence_number, (u8 *) ip,
+                          b->current_length, is_resync, vlib_thread_index);
 
   vlib_put_frame_to_node (vm, ip4_lookup_node.index, f);
 }
 
 /* add NAT HA protocol event */
 static_always_inline void
-nat_ha_event_add (nat_ha_event_t * event, u8 do_flush, u32 thread_index,
+nat_ha_event_add (nat_ha_event_t *event, u8 do_flush, u32 session_thread_index,
                  u8 is_resync)
 {
   nat44_ei_main_t *nm = &nat44_ei_main;
   nat_ha_main_t *ha = &nat_ha_main;
-  nat_ha_per_thread_data_t *td = &ha->per_thread_data[thread_index];
-  vlib_main_t *vm = vlib_get_main_by_index (thread_index);
+  u32 vlib_thread_index = vlib_get_thread_index ();
+  nat_ha_per_thread_data_t *td = &ha->per_thread_data[vlib_thread_index];
+  vlib_main_t *vm = vlib_get_main_by_index (vlib_thread_index);
   vlib_buffer_t *b = 0;
   vlib_frame_t *f;
   u32 bi = ~0, offset;
@@ -778,7 +777,7 @@ nat_ha_event_add (nat_ha_event_t * event, u8 do_flush, u32 thread_index,
     }
 
   if (PREDICT_FALSE (td->state_sync_count == 0))
-    nat_ha_header_create (b, &offset, thread_index);
+    nat_ha_header_create (b, &offset, session_thread_index);
 
   if (PREDICT_TRUE (do_flush == 0))
     {
@@ -790,19 +789,17 @@ nat_ha_event_add (nat_ha_event_t * event, u8 do_flush, u32 thread_index,
       switch (event->event_type)
        {
        case NAT_HA_ADD:
-         vlib_increment_simple_counter (&ha->counters
-                                        [NAT_HA_COUNTER_SEND_ADD],
-                                        thread_index, 0, 1);
+         vlib_increment_simple_counter (
+           &ha->counters[NAT_HA_COUNTER_SEND_ADD], vlib_thread_index, 0, 1);
          break;
        case NAT_HA_DEL:
-         vlib_increment_simple_counter (&ha->counters
-                                        [NAT_HA_COUNTER_SEND_DEL],
-                                        thread_index, 0, 1);
+         vlib_increment_simple_counter (
+           &ha->counters[NAT_HA_COUNTER_SEND_DEL], vlib_thread_index, 0, 1);
          break;
        case NAT_HA_REFRESH:
-         vlib_increment_simple_counter (&ha->counters
-                                        [NAT_HA_COUNTER_SEND_REFRESH],
-                                        thread_index, 0, 1);
+         vlib_increment_simple_counter (
+           &ha->counters[NAT_HA_COUNTER_SEND_REFRESH], vlib_thread_index, 0,
+           1);
          break;
        default:
          break;
@@ -812,7 +809,7 @@ nat_ha_event_add (nat_ha_event_t * event, u8 do_flush, u32 thread_index,
   if (PREDICT_FALSE
       (do_flush || offset + (sizeof (*event)) > ha->state_sync_path_mtu))
     {
-      nat_ha_send (f, b, is_resync, thread_index);
+      nat_ha_send (f, b, is_resync, vlib_thread_index);
       td->state_sync_buffer = 0;
       td->state_sync_frame = 0;
       td->state_sync_count = 0;
@@ -868,8 +865,8 @@ nat_ha_sadd (ip4_address_t * in_addr, u16 in_port, ip4_address_t * out_addr,
 }
 
 void
-nat_ha_sdel (ip4_address_t * out_addr, u16 out_port, ip4_address_t * eh_addr,
-            u16 eh_port, u8 proto, u32 fib_index, u32 thread_index)
+nat_ha_sdel (ip4_address_t *out_addr, u16 out_port, ip4_address_t *eh_addr,
+            u16 eh_port, u8 proto, u32 fib_index, u32 session_thread_index)
 {
   nat_ha_event_t event;
 
@@ -883,7 +880,7 @@ nat_ha_sdel (ip4_address_t * out_addr, u16 out_port, ip4_address_t * eh_addr,
   event.eh_port = eh_port;
   event.fib_index = clib_host_to_net_u32 (fib_index);
   event.protocol = proto;
-  nat_ha_event_add (&event, 0, thread_index, 0);
+  nat_ha_event_add (&event, 0, session_thread_index, 0);
 }
 
 void
@@ -933,7 +930,7 @@ nat_ha_worker_fn (vlib_main_t * vm, vlib_node_runtime_t * rt,
   /* flush HA NAT data under construction */
   nat_ha_event_add (0, 1, thread_index, 0);
   /* scan if we need to resend some non-ACKed data */
-  nat_ha_resend_scan (vlib_time_now (vm), thread_index);
+  nat_ha_resend_scan (vm, thread_index);
   return 0;
 }
 
index c466d4c..6fb749c 100644 (file)
@@ -62,7 +62,8 @@ void nat_ha_init (vlib_main_t * vm, u32 num_workers, u32 num_threads);
  *
  * @returns 0 on success, non-zero value otherwise.
  */
-int nat_ha_set_listener (ip4_address_t * addr, u16 port, u32 path_mtu);
+int nat_ha_set_listener (vlib_main_t *vm, ip4_address_t *addr, u16 port,
+                        u32 path_mtu);
 
 /**
  * @brief Get HA listener/local configuration
@@ -79,7 +80,7 @@ void nat_ha_get_listener (ip4_address_t * addr, u16 * port, u32 * path_mtu);
  *
  * @returns 0 on success, non-zero value otherwise.
  */
-int nat_ha_set_failover (ip4_address_t * addr, u16 port,
+int nat_ha_set_failover (vlib_main_t *vm, ip4_address_t *addr, u16 port,
                         u32 session_refresh_interval);
 
 /**
@@ -120,11 +121,11 @@ void nat_ha_sadd (ip4_address_t * in_addr, u16 in_port,
  * @param eh_port external host L4 port number
  * @param proto L4 protocol
  * @param fib_index fib index
- * @param thread_index thread index
+ * @param session_thread_index index of thread where this session was stored
  */
-void nat_ha_sdel (ip4_address_t * out_addr, u16 out_port,
-                 ip4_address_t * eh_addr, u16 eh_port, u8 proto,
-                 u32 fib_index, u32 thread_index);
+void nat_ha_sdel (ip4_address_t *out_addr, u16 out_port,
+                 ip4_address_t *eh_addr, u16 eh_port, u8 proto, u32 fib_index,
+                 u32 session_thread_index);
 
 /**
  * @brief Create session refresh HA event
index cd1941a..04ed9a3 100644 (file)
@@ -632,9 +632,9 @@ class MethodHolder(VppTestCase):
         """ Verify that there is no NAT44EI user """
         users = self.vapi.nat44_ei_user_dump()
         self.assertEqual(len(users), 0)
-        users = self.statistics.get_counter('/nat44-ei/total-users')
+        users = self.statistics['/nat44-ei/total-users']
         self.assertEqual(users[0][0], 0)
-        sessions = self.statistics.get_counter('/nat44-ei/total-sessions')
+        sessions = self.statistics['/nat44-ei/total-sessions']
         self.assertEqual(sessions[0][0], 0)
 
     def verify_syslog_apmap(self, data, is_add=True):
@@ -852,6 +852,16 @@ class MethodHolder(VppTestCase):
             self.assertEqual(data, p[Raw].load)
 
 
+def get_nat44_ei_in2out_worker_index(ip, vpp_worker_count):
+    if 0 == vpp_worker_count:
+        return 0
+    numeric = socket.inet_aton(ip)
+    numeric = struct.unpack("!L", numeric)[0]
+    numeric = socket.htonl(numeric)
+    h = numeric + (numeric >> 8) + (numeric >> 16) + (numeric >> 24)
+    return 1 + h % vpp_worker_count
+
+
 @tag_fixme_vpp_workers
 class TestNAT44EI(MethodHolder):
     """ NAT44EI Test Cases """
@@ -963,14 +973,14 @@ class TestNAT44EI(MethodHolder):
         capture = self.pg1.get_capture(len(pkts))
         self.verify_capture_out(capture)
 
-        sessions = self.statistics.get_counter('/nat44-ei/total-sessions')
-        self.assertTrue(sessions[0][0] > 0)
+        sessions = self.statistics['/nat44-ei/total-sessions']
+        self.assertGreater(sessions[:, 0].sum(), 0, "Session count invalid")
         self.logger.info("sessions before clearing: %s" % sessions[0][0])
 
         self.vapi.cli("clear nat44 ei sessions")
 
-        sessions = self.statistics.get_counter('/nat44-ei/total-sessions')
-        self.assertEqual(sessions[0][0], 0)
+        sessions = self.statistics['/nat44-ei/total-sessions']
+        self.assertEqual(sessions[:, 0].sum(), 0, "Session count invalid")
         self.logger.info("sessions after clearing: %s" % sessions[0][0])
 
     def test_dynamic(self):
@@ -985,12 +995,10 @@ class TestNAT44EI(MethodHolder):
             is_add=1)
 
         # in2out
-        tcpn = self.statistics.get_counter('/nat44-ei/in2out/slowpath/tcp')[0]
-        udpn = self.statistics.get_counter('/nat44-ei/in2out/slowpath/udp')[0]
-        icmpn = self.statistics.get_counter(
-            '/nat44-ei/in2out/slowpath/icmp')[0]
-        drops = self.statistics.get_counter(
-            '/nat44-ei/in2out/slowpath/drops')[0]
+        tcpn = self.statistics['/nat44-ei/in2out/slowpath/tcp']
+        udpn = self.statistics['/nat44-ei/in2out/slowpath/udp']
+        icmpn = self.statistics['/nat44-ei/in2out/slowpath/icmp']
+        drops = self.statistics['/nat44-ei/in2out/slowpath/drops']
 
         pkts = self.create_stream_in(self.pg0, self.pg1)
         self.pg0.add_stream(pkts)
@@ -1000,22 +1008,20 @@ class TestNAT44EI(MethodHolder):
         self.verify_capture_out(capture)
 
         if_idx = self.pg0.sw_if_index
-        cnt = self.statistics.get_counter('/nat44-ei/in2out/slowpath/tcp')[0]
-        self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2)
-        cnt = self.statistics.get_counter('/nat44-ei/in2out/slowpath/udp')[0]
-        self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
-        cnt = self.statistics.get_counter('/nat44-ei/in2out/slowpath/icmp')[0]
-        self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
-        cnt = self.statistics.get_counter('/nat44-ei/in2out/slowpath/drops')[0]
-        self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
+        cnt = self.statistics['/nat44-ei/in2out/slowpath/tcp']
+        self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2)
+        cnt = self.statistics['/nat44-ei/in2out/slowpath/udp']
+        self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1)
+        cnt = self.statistics['/nat44-ei/in2out/slowpath/icmp']
+        self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1)
+        cnt = self.statistics['/nat44-ei/in2out/slowpath/drops']
+        self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0)
 
         # out2in
-        tcpn = self.statistics.get_counter('/nat44-ei/out2in/slowpath/tcp')[0]
-        udpn = self.statistics.get_counter('/nat44-ei/out2in/slowpath/udp')[0]
-        icmpn = self.statistics.get_counter(
-            '/nat44-ei/out2in/slowpath/icmp')[0]
-        drops = self.statistics.get_counter(
-            '/nat44-ei/out2in/slowpath/drops')[0]
+        tcpn = self.statistics['/nat44-ei/out2in/slowpath/tcp']
+        udpn = self.statistics['/nat44-ei/out2in/slowpath/udp']
+        icmpn = self.statistics['/nat44-ei/out2in/slowpath/icmp']
+        drops = self.statistics['/nat44-ei/out2in/slowpath/drops']
 
         pkts = self.create_stream_out(self.pg1)
         self.pg1.add_stream(pkts)
@@ -1025,19 +1031,19 @@ class TestNAT44EI(MethodHolder):
         self.verify_capture_in(capture, self.pg0)
 
         if_idx = self.pg1.sw_if_index
-        cnt = self.statistics.get_counter('/nat44-ei/out2in/slowpath/tcp')[0]
-        self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2)
-        cnt = self.statistics.get_counter('/nat44-ei/out2in/slowpath/udp')[0]
-        self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
-        cnt = self.statistics.get_counter('/nat44-ei/out2in/slowpath/icmp')[0]
-        self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
-        cnt = self.statistics.get_counter('/nat44-ei/out2in/slowpath/drops')[0]
-        self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
-
-        users = self.statistics.get_counter('/nat44-ei/total-users')
-        self.assertEqual(users[0][0], 1)
-        sessions = self.statistics.get_counter('/nat44-ei/total-sessions')
-        self.assertEqual(sessions[0][0], 3)
+        cnt = self.statistics['/nat44-ei/out2in/slowpath/tcp']
+        self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2)
+        cnt = self.statistics['/nat44-ei/out2in/slowpath/udp']
+        self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1)
+        cnt = self.statistics['/nat44-ei/out2in/slowpath/icmp']
+        self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1)
+        cnt = self.statistics['/nat44-ei/out2in/slowpath/drops']
+        self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0)
+
+        users = self.statistics['/nat44-ei/total-users']
+        self.assertEqual(users[:, 0].sum(), 1)
+        sessions = self.statistics['/nat44-ei/total-sessions']
+        self.assertEqual(sessions[:, 0].sum(), 3)
 
     def test_dynamic_icmp_errors_in2out_ttl_1(self):
         """ NAT44EI handling of client packets with TTL=1 """
@@ -1828,7 +1834,7 @@ class TestNAT44EI(MethodHolder):
                                       server_in_port, server_out_port,
                                       proto=IP_PROTOS.tcp)
 
-        cnt = self.statistics.get_counter('/nat44-ei/hairpinning')[0]
+        cnt = self.statistics['/nat44-ei/hairpinning']
         # send packet from host to server
         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
              IP(src=host.ip4, dst=self.nat_addr) /
@@ -1851,9 +1857,9 @@ class TestNAT44EI(MethodHolder):
             self.logger.error(ppp("Unexpected or invalid packet:", p))
             raise
 
-        after = self.statistics.get_counter('/nat44-ei/hairpinning')[0]
+        after = self.statistics['/nat44-ei/hairpinning']
         if_idx = self.pg0.sw_if_index
-        self.assertEqual(after[if_idx] - cnt[if_idx], 1)
+        self.assertEqual(after[:, if_idx].sum() - cnt[:, if_idx].sum(), 1)
 
         # send reply from server to host
         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
@@ -1876,9 +1882,10 @@ class TestNAT44EI(MethodHolder):
             self.logger.error(ppp("Unexpected or invalid packet:", p))
             raise
 
-        after = self.statistics.get_counter('/nat44-ei/hairpinning')[0]
+        after = self.statistics['/nat44-ei/hairpinning']
         if_idx = self.pg0.sw_if_index
-        self.assertEqual(after[if_idx] - cnt[if_idx], 2)
+        self.assertEqual(after[:, if_idx].sum() - cnt[:, if_idx].sum(),
+                         2+(1 if self.vpp_worker_count > 0 else 0))
 
     def test_hairpinning2(self):
         """ NAT44EI hairpinning - 1:1 NAT"""
@@ -2086,7 +2093,7 @@ class TestNAT44EI(MethodHolder):
         self.pg_start()
         # Here VPP used to crash due to an infinite loop
 
-        cnt = self.statistics.get_counter('/nat44-ei/hairpinning')[0]
+        cnt = self.statistics['/nat44-ei/hairpinning']
         # send packet from host to server
         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
              IP(src=host.ip4, dst=self.nat_addr) /
@@ -2109,9 +2116,9 @@ class TestNAT44EI(MethodHolder):
             self.logger.error(ppp("Unexpected or invalid packet:", p))
             raise
 
-        after = self.statistics.get_counter('/nat44-ei/hairpinning')[0]
+        after = self.statistics['/nat44-ei/hairpinning']
         if_idx = self.pg0.sw_if_index
-        self.assertEqual(after[if_idx] - cnt[if_idx], 1)
+        self.assertEqual(after[:, if_idx].sum() - cnt[:, if_idx].sum(), 1)
 
         # send reply from server to host
         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
@@ -2134,9 +2141,10 @@ class TestNAT44EI(MethodHolder):
             self.logger.error(ppp("Unexpected or invalid packet:", p))
             raise
 
-        after = self.statistics.get_counter('/nat44-ei/hairpinning')[0]
+        after = self.statistics['/nat44-ei/hairpinning']
         if_idx = self.pg0.sw_if_index
-        self.assertEqual(after[if_idx] - cnt[if_idx], 2)
+        self.assertEqual(after[:, if_idx].sum() - cnt[:, if_idx].sum(),
+                         2+(1 if self.vpp_worker_count > 0 else 0))
 
     def test_interface_addr(self):
         """ NAT44EI acquire addresses from interface """
@@ -2368,7 +2376,8 @@ class TestNAT44EI(MethodHolder):
             sw_if_index=self.pg1.sw_if_index,
             is_add=1)
 
-        max_sessions = self.max_translations
+        max_sessions_per_thread = self.max_translations
+        max_sessions = max(1, self.vpp_worker_count) * max_sessions_per_thread
 
         pkts = []
         for i in range(0, max_sessions):
@@ -2416,7 +2425,7 @@ class TestNAT44EI(MethodHolder):
         for p in capture:
             if p.haslayer(Data):
                 data = ipfix.decode_data_set(p.getlayer(Set))
-                self.verify_ipfix_max_sessions(data, max_sessions)
+                self.verify_ipfix_max_sessions(data, max_sessions_per_thread)
 
     def test_syslog_apmap(self):
         """ NAT44EI syslog address and port mapping creation and deletion """
@@ -3400,8 +3409,8 @@ class TestNAT44EI(MethodHolder):
         self.verify_capture_out(capture)
         # active send HA events
         self.vapi.nat44_ei_ha_flush()
-        stats = self.statistics.get_counter('/nat44-ei/ha/add-event-send')
-        self.assertEqual(stats[0][0], 3)
+        stats = self.statistics['/nat44-ei/ha/add-event-send']
+        self.assertEqual(stats[:, 0].sum(), 3)
         capture = self.pg3.get_capture(1)
         p = capture[0]
         self.assert_packet_checksums_valid(p)
@@ -3418,7 +3427,7 @@ class TestNAT44EI(MethodHolder):
             self.assertEqual(udp.sport, 12345)
             self.assertEqual(udp.dport, 12346)
             self.assertEqual(hanat.version, 1)
-            self.assertEqual(hanat.thread_index, 0)
+            self.assertEqual(hanat.thread_index, 0)
             self.assertEqual(hanat.count, 3)
             seq = hanat.sequence_number
             for event in hanat.events:
@@ -3431,11 +3440,12 @@ class TestNAT44EI(MethodHolder):
         ack = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
                IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
                UDP(sport=12346, dport=12345) /
-               HANATStateSync(sequence_number=seq, flags='ACK'))
+               HANATStateSync(sequence_number=seq, flags='ACK',
+                              thread_index=hanat.thread_index))
         self.pg3.add_stream(ack)
         self.pg_start()
-        stats = self.statistics.get_counter('/nat44-ei/ha/ack-recv')
-        self.assertEqual(stats[0][0], 1)
+        stats = self.statistics['/nat44-ei/ha/ack-recv']
+        self.assertEqual(stats[:, 0].sum(), 1)
 
         # delete one session
         self.pg_enable_capture(self.pg_interfaces)
@@ -3443,8 +3453,8 @@ class TestNAT44EI(MethodHolder):
             address=self.pg0.remote_ip4, port=self.tcp_port_in,
             protocol=IP_PROTOS.tcp, flags=self.config_flags.NAT44_EI_IF_INSIDE)
         self.vapi.nat44_ei_ha_flush()
-        stats = self.statistics.get_counter('/nat44-ei/ha/del-event-send')
-        self.assertEqual(stats[0][0], 1)
+        stats = self.statistics['/nat44-ei/ha/del-event-send']
+        self.assertEqual(stats[:, 0].sum(), 1)
         capture = self.pg3.get_capture(1)
         p = capture[0]
         try:
@@ -3458,10 +3468,10 @@ class TestNAT44EI(MethodHolder):
         # do not send ACK, active retry send HA event again
         self.pg_enable_capture(self.pg_interfaces)
         sleep(12)
-        stats = self.statistics.get_counter('/nat44-ei/ha/retry-count')
-        self.assertEqual(stats[0][0], 3)
-        stats = self.statistics.get_counter('/nat44-ei/ha/missed-count')
-        self.assertEqual(stats[0][0], 1)
+        stats = self.statistics['/nat44-ei/ha/retry-count']
+        self.assertEqual(stats[:, 0].sum(), 3)
+        stats = self.statistics['/nat44-ei/ha/missed-count']
+        self.assertEqual(stats[:, 0].sum(), 1)
         capture = self.pg3.get_capture(3)
         for packet in capture:
             self.assertEqual(packet, p)
@@ -3473,8 +3483,8 @@ class TestNAT44EI(MethodHolder):
         self.pg_start()
         self.pg0.get_capture(2)
         self.vapi.nat44_ei_ha_flush()
-        stats = self.statistics.get_counter('/nat44-ei/ha/refresh-event-send')
-        self.assertEqual(stats[0][0], 2)
+        stats = self.statistics['/nat44-ei/ha/refresh-event-send']
+        self.assertEqual(stats[:, 0].sum(), 2)
         capture = self.pg3.get_capture(1)
         p = capture[0]
         self.assert_packet_checksums_valid(p)
@@ -3500,14 +3510,16 @@ class TestNAT44EI(MethodHolder):
                 self.assertEqual(event.total_pkts, 2)
                 self.assertGreater(event.total_bytes, 0)
 
+        stats = self.statistics['/nat44-ei/ha/ack-recv']
         ack = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
                IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
                UDP(sport=12346, dport=12345) /
-               HANATStateSync(sequence_number=seq, flags='ACK'))
+               HANATStateSync(sequence_number=seq, flags='ACK',
+                              thread_index=hanat.thread_index))
         self.pg3.add_stream(ack)
         self.pg_start()
-        stats = self.statistics.get_counter('/nat44-ei/ha/ack-recv')
-        self.assertEqual(stats[0][0], 2)
+        stats = self.statistics['/nat44-ei/ha/ack-recv']
+        self.assertEqual(stats[:, 0].sum(), 2)
 
     def test_ha_recv(self):
         """ NAT44EI Receive HA session synchronization events (passive) """
@@ -3523,8 +3535,23 @@ class TestNAT44EI(MethodHolder):
                                            port=12345, path_mtu=512)
         bind_layers(UDP, HANATStateSync, sport=12345)
 
-        self.tcp_port_out = random.randint(1025, 65535)
-        self.udp_port_out = random.randint(1025, 65535)
+        # this is a bit tricky - HA dictates thread index due to how it's
+        # designed, but once we use HA to create a session, we also want
+        # to pass a packet through said session. so the session must end
+        # up on the correct thread from both directions - in2out (based on
+        # IP address) and out2in (based on outside port)
+
+        # first choose a thread index which is correct for IP
+        thread_index = get_nat44_ei_in2out_worker_index(self.pg0.remote_ip4,
+                                                        self.vpp_worker_count)
+
+        # now pick a port which is correct for given thread
+        port_per_thread = int((0xffff-1024) / max(1, self.vpp_worker_count))
+        self.tcp_port_out = 1024 + random.randint(1, port_per_thread)
+        self.udp_port_out = 1024 + random.randint(1, port_per_thread)
+        if self.vpp_worker_count > 0:
+            self.tcp_port_out += port_per_thread * (thread_index - 1)
+            self.udp_port_out += port_per_thread * (thread_index - 1)
 
         # send HA session add events to failover/passive
         p = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
@@ -3544,7 +3571,8 @@ class TestNAT44EI(MethodHolder):
                        eh_addr=self.pg1.remote_ip4,
                        ehn_addr=self.pg1.remote_ip4,
                        eh_port=self.udp_external_port,
-                       ehn_port=self.udp_external_port, fib_index=0)]))
+                       ehn_port=self.udp_external_port, fib_index=0)],
+                 thread_index=thread_index))
 
         self.pg3.add_stream(p)
         self.pg_enable_capture(self.pg_interfaces)
@@ -3561,15 +3589,15 @@ class TestNAT44EI(MethodHolder):
             self.assertEqual(hanat.sequence_number, 1)
             self.assertEqual(hanat.flags, 'ACK')
             self.assertEqual(hanat.version, 1)
-            self.assertEqual(hanat.thread_index, 0)
-        stats = self.statistics.get_counter('/nat44-ei/ha/ack-send')
-        self.assertEqual(stats[0][0], 1)
-        stats = self.statistics.get_counter('/nat44-ei/ha/add-event-recv')
-        self.assertEqual(stats[0][0], 2)
-        users = self.statistics.get_counter('/nat44-ei/total-users')
-        self.assertEqual(users[0][0], 1)
-        sessions = self.statistics.get_counter('/nat44-ei/total-sessions')
-        self.assertEqual(sessions[0][0], 2)
+            self.assertEqual(hanat.thread_index, thread_index)
+        stats = self.statistics['/nat44-ei/ha/ack-send']
+        self.assertEqual(stats[:, 0].sum(), 1)
+        stats = self.statistics['/nat44-ei/ha/add-event-recv']
+        self.assertEqual(stats[:, 0].sum(), 2)
+        users = self.statistics['/nat44-ei/total-users']
+        self.assertEqual(users[:, 0].sum(), 1)
+        sessions = self.statistics['/nat44-ei/total-sessions']
+        self.assertEqual(sessions[:, 0].sum(), 2)
         users = self.vapi.nat44_ei_user_dump()
         self.assertEqual(len(users), 1)
         self.assertEqual(str(users[0].ip_address),
@@ -3600,7 +3628,8 @@ class TestNAT44EI(MethodHolder):
                        eh_addr=self.pg1.remote_ip4,
                        ehn_addr=self.pg1.remote_ip4,
                        eh_port=self.udp_external_port,
-                       ehn_port=self.udp_external_port, fib_index=0)]))
+                       ehn_port=self.udp_external_port, fib_index=0)],
+                 thread_index=thread_index))
 
         self.pg3.add_stream(p)
         self.pg_enable_capture(self.pg_interfaces)
@@ -3625,8 +3654,8 @@ class TestNAT44EI(MethodHolder):
         sessions = self.vapi.nat44_ei_user_session_dump(users[0].ip_address,
                                                         users[0].vrf_id)
         self.assertEqual(len(sessions), 1)
-        stats = self.statistics.get_counter('/nat44-ei/ha/del-event-recv')
-        self.assertEqual(stats[0][0], 1)
+        stats = self.statistics['/nat44-ei/ha/del-event-recv']
+        self.assertEqual(stats[:, 0].sum(), 1)
 
         stats = self.statistics.get_err_counter(
             '/err/nat44-ei-ha/pkts-processed')
@@ -3644,7 +3673,8 @@ class TestNAT44EI(MethodHolder):
                        ehn_addr=self.pg1.remote_ip4,
                        eh_port=self.tcp_external_port,
                        ehn_port=self.tcp_external_port, fib_index=0,
-                       total_bytes=1024, total_pkts=2)]))
+                       total_bytes=1024, total_pkts=2)],
+                 thread_index=thread_index))
         self.pg3.add_stream(p)
         self.pg_enable_capture(self.pg_interfaces)
         self.pg_start()
@@ -3670,9 +3700,8 @@ class TestNAT44EI(MethodHolder):
         session = sessions[0]
         self.assertEqual(session.total_bytes, 1024)
         self.assertEqual(session.total_pkts, 2)
-        stats = self.statistics.get_counter(
-            '/nat44-ei/ha/refresh-event-recv')
-        self.assertEqual(stats[0][0], 1)
+        stats = self.statistics['/nat44-ei/ha/refresh-event-recv']
+        self.assertEqual(stats[:, 0].sum(), 1)
 
         stats = self.statistics.get_err_counter(
             '/err/nat44-ei-ha/pkts-processed')
@@ -3898,9 +3927,7 @@ class TestNAT44Out2InDPO(MethodHolder):
 
 class TestNAT44EIMW(MethodHolder):
     """ NAT44EI Test Cases (multiple workers) """
-
     vpp_worker_count = 2
-
     max_translations = 10240
     max_users = 10240
 
@@ -4013,7 +4040,7 @@ class TestNAT44EIMW(MethodHolder):
                                       server_in_port, server_out_port,
                                       proto=IP_PROTOS.tcp)
 
-        cnt = self.statistics.get_counter('/nat44-ei/hairpinning')
+        cnt = self.statistics['/nat44-ei/hairpinning']
         # send packet from host to server
         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
              IP(src=host.ip4, dst=self.nat_addr) /
@@ -4036,7 +4063,7 @@ class TestNAT44EIMW(MethodHolder):
             self.logger.error(ppp("Unexpected or invalid packet:", p))
             raise
 
-        after = self.statistics.get_counter('/nat44-ei/hairpinning')
+        after = self.statistics['/nat44-ei/hairpinning']
 
         if_idx = self.pg0.sw_if_index
         self.assertEqual(after[worker_2][if_idx] - cnt[worker_1][if_idx], 1)
@@ -4062,7 +4089,7 @@ class TestNAT44EIMW(MethodHolder):
             self.logger.error(ppp("Unexpected or invalid packet:", p))
             raise
 
-        after = self.statistics.get_counter('/nat44-ei/hairpinning')
+        after = self.statistics['/nat44-ei/hairpinning']
         if_idx = self.pg0.sw_if_index
         self.assertEqual(after[worker_1][if_idx] - cnt[worker_1][if_idx], 1)
         self.assertEqual(after[worker_2][if_idx] - cnt[worker_2][if_idx], 2)