From: Klement Sekera Date: Wed, 31 Mar 2021 11:38:09 +0000 (+0200) Subject: nat: fix multi worker scenarios X-Git-Tag: v21.10-rc0~220 X-Git-Url: https://gerrit.fd.io/r/gitweb?a=commitdiff_plain;h=1fbf034e3ea97d507e41420248c99da7bdc916bf;p=vpp.git nat: fix multi worker scenarios Properly select worker from hash table in out2in direction, increase number of worker threads in MW test to 4 to test these cases. Type: fix Change-Id: I76eda5761ff497b85b031dd913a64b7fcb53b33a Signed-off-by: Klement Sekera --- diff --git a/src/plugins/nat/nat44-ed/nat44_ed.c b/src/plugins/nat/nat44-ed/nat44_ed.c index 99029bb83b8..58aade568b9 100644 --- a/src/plugins/nat/nat44-ed/nat44_ed.c +++ b/src/plugins/nat/nat44-ed/nat44_ed.c @@ -2603,7 +2603,6 @@ nat44_ed_get_out2in_worker_index (vlib_buffer_t *b, ip4_header_t *ip, snat_main_t *sm = &snat_main; clib_bihash_kv_8_8_t kv, value; clib_bihash_kv_16_8_t kv16, value16; - snat_main_per_thread_data_t *tsm; u32 proto, next_worker_index = 0; u16 port; @@ -2612,29 +2611,7 @@ nat44_ed_get_out2in_worker_index (vlib_buffer_t *b, ip4_header_t *ip, proto = ip_proto_to_nat_proto (ip->protocol); - if (PREDICT_TRUE (proto == NAT_PROTOCOL_UDP || proto == NAT_PROTOCOL_TCP)) - { - init_ed_k (&kv16, ip->dst_address, vnet_buffer (b)->ip.reass.l4_dst_port, - ip->src_address, vnet_buffer (b)->ip.reass.l4_src_port, - rx_fib_index, ip->protocol); - - if (PREDICT_TRUE ( - !clib_bihash_search_16_8 (&sm->flow_hash, &kv16, &value16))) - { - tsm = - vec_elt_at_index (sm->per_thread_data, - ed_value_get_thread_index (&value16)); - vnet_buffer2 (b)->nat.cached_session_index = - ed_value_get_session_index (&value16); - next_worker_index = sm->first_worker_index + tsm->thread_index; - nat_elog_debug_handoff ( - sm, "HANDOFF OUT2IN (session)", next_worker_index, rx_fib_index, - clib_net_to_host_u32 (ip->src_address.as_u32), - clib_net_to_host_u32 (ip->dst_address.as_u32)); - return next_worker_index; - } - } - else if (proto == NAT_PROTOCOL_ICMP) + if (PREDICT_FALSE (proto == NAT_PROTOCOL_ICMP)) { ip4_address_t lookup_saddr, lookup_daddr; u16 lookup_sport, lookup_dport; @@ -2648,10 +2625,7 @@ nat44_ed_get_out2in_worker_index (vlib_buffer_t *b, ip4_header_t *ip, if (PREDICT_TRUE ( !clib_bihash_search_16_8 (&sm->flow_hash, &kv16, &value16))) { - tsm = - vec_elt_at_index (sm->per_thread_data, - ed_value_get_thread_index (&value16)); - next_worker_index = sm->first_worker_index + tsm->thread_index; + next_worker_index = ed_value_get_thread_index (&value16); nat_elog_debug_handoff ( sm, "HANDOFF OUT2IN (session)", next_worker_index, rx_fib_index, clib_net_to_host_u32 (ip->src_address.as_u32), @@ -2661,6 +2635,23 @@ nat44_ed_get_out2in_worker_index (vlib_buffer_t *b, ip4_header_t *ip, } } + init_ed_k (&kv16, ip->src_address, vnet_buffer (b)->ip.reass.l4_src_port, + ip->dst_address, vnet_buffer (b)->ip.reass.l4_dst_port, + rx_fib_index, ip->protocol); + + if (PREDICT_TRUE ( + !clib_bihash_search_16_8 (&sm->flow_hash, &kv16, &value16))) + { + vnet_buffer2 (b)->nat.cached_session_index = + ed_value_get_session_index (&value16); + next_worker_index = ed_value_get_thread_index (&value16); + nat_elog_debug_handoff (sm, "HANDOFF OUT2IN (session)", + next_worker_index, rx_fib_index, + clib_net_to_host_u32 (ip->src_address.as_u32), + clib_net_to_host_u32 (ip->dst_address.as_u32)); + return next_worker_index; + } + /* first try static mappings without port */ if (PREDICT_FALSE (pool_elts (sm->static_mappings))) { diff --git a/src/plugins/nat/nat44-ed/nat44_ed_out2in.c b/src/plugins/nat/nat44-ed/nat44_ed_out2in.c index f46433c9e53..e8cf7c930fa 100644 --- a/src/plugins/nat/nat44-ed/nat44_ed_out2in.c +++ b/src/plugins/nat/nat44-ed/nat44_ed_out2in.c @@ -37,6 +37,15 @@ static char *nat_out2in_ed_error_strings[] = { #undef _ }; +typedef enum +{ + NAT_ED_SP_REASON_NO_REASON, + NAT_ED_SP_REASON_LOOKUP_FAILED, + NAT_ED_SP_REASON_VRF_EXPIRED, + NAT_ED_SP_TCP_CLOSED, + NAT_ED_SP_SESS_EXPIRED, +} nat_slow_path_reason_e; + typedef struct { u32 sw_if_index; @@ -49,8 +58,29 @@ typedef struct u8 is_slow_path; u8 translation_via_i2of; u8 lookup_skipped; + nat_slow_path_reason_e slow_path_reason; } nat44_ed_out2in_trace_t; +static u8 * +format_slow_path_reason (u8 *s, va_list *args) +{ + nat_slow_path_reason_e reason = va_arg (*args, nat_slow_path_reason_e); + switch (reason) + { + case NAT_ED_SP_REASON_NO_REASON: + return format (s, "no reason for slow path"); + case NAT_ED_SP_REASON_LOOKUP_FAILED: + return format (s, "slow path because lookup failed"); + case NAT_ED_SP_REASON_VRF_EXPIRED: + return format (s, "slow path because vrf expired"); + case NAT_ED_SP_TCP_CLOSED: + return format (s, "slow path because tcp closed"); + case NAT_ED_SP_SESS_EXPIRED: + return format (s, "slow path because session expired"); + } + return format (s, "invalid reason value"); +} + static u8 * format_nat44_ed_out2in_trace (u8 * s, va_list * args) { @@ -85,6 +115,7 @@ format_nat44_ed_out2in_trace (u8 * s, va_list * args) s = format (s, "\n search key %U", format_ed_session_kvp, &t->search_key); } + s = format (s, "\n %U", format_slow_path_reason, t->slow_path_reason); } return s; @@ -790,6 +821,7 @@ nat44_ed_out2in_fast_path_node_fn_inline (vlib_main_t * vm, snat_session_t *s0 = 0; clib_bihash_kv_16_8_t kv0, value0; nat_translation_error_e translation_error = NAT_ED_TRNSL_ERR_SUCCESS; + nat_slow_path_reason_e slow_path_reason = NAT_ED_SP_REASON_NO_REASON; nat_6t_flow_t *f = 0; nat_6t_t lookup; int lookup_skipped = 0; @@ -891,6 +923,7 @@ nat44_ed_out2in_fast_path_node_fn_inline (vlib_main_t * vm, if (clib_bihash_search_16_8 (&sm->flow_hash, &kv0, &value0)) { // flow does not exist go slow path + slow_path_reason = NAT_ED_SP_REASON_LOOKUP_FAILED; next[0] = NAT_NEXT_OUT2IN_ED_SLOW_PATH; goto trace0; } @@ -905,6 +938,7 @@ nat44_ed_out2in_fast_path_node_fn_inline (vlib_main_t * vm, // session is closed, go slow path nat_free_session_data (sm, s0, thread_index, 0); nat_ed_session_delete (sm, s0, thread_index, 1); + slow_path_reason = NAT_ED_SP_REASON_VRF_EXPIRED; next[0] = NAT_NEXT_OUT2IN_ED_SLOW_PATH; goto trace0; } @@ -914,6 +948,7 @@ nat44_ed_out2in_fast_path_node_fn_inline (vlib_main_t * vm, if (now >= s0->tcp_closed_timestamp) { // session is closed, go slow path, freed in slow path + slow_path_reason = NAT_ED_SP_TCP_CLOSED; next[0] = NAT_NEXT_OUT2IN_ED_SLOW_PATH; } else @@ -934,6 +969,7 @@ nat44_ed_out2in_fast_path_node_fn_inline (vlib_main_t * vm, // session is closed, go slow path nat_free_session_data (sm, s0, thread_index, 0); nat_ed_session_delete (sm, s0, thread_index, 1); + slow_path_reason = NAT_ED_SP_SESS_EXPIRED; next[0] = NAT_NEXT_OUT2IN_ED_SLOW_PATH; goto trace0; } @@ -1042,6 +1078,7 @@ nat44_ed_out2in_fast_path_node_fn_inline (vlib_main_t * vm, t->translation_error = translation_error; clib_memcpy (&t->search_key, &kv0, sizeof (t->search_key)); t->lookup_skipped = lookup_skipped; + t->slow_path_reason = slow_path_reason; if (s0) { diff --git a/src/plugins/nat/test/test_nat44_ed.py b/src/plugins/nat/test/test_nat44_ed.py index 7f61eed5a41..2ce7f23dac9 100644 --- a/src/plugins/nat/test/test_nat44_ed.py +++ b/src/plugins/nat/test/test_nat44_ed.py @@ -2,7 +2,7 @@ import unittest from io import BytesIO -from random import randint +from random import randint, shuffle, choice import scapy.compat from framework import VppTestCase, VppTestRunner @@ -1953,7 +1953,8 @@ class TestNAT44ED(NAT44EDTestCase): class TestNAT44EDMW(TestNAT44ED): """ NAT44ED MW Test Case """ - vpp_worker_count = 1 + vpp_worker_count = 4 + max_sessions = 5000 @unittest.skip('MW fix required') def test_users_dump(self): @@ -2014,6 +2015,10 @@ class TestNAT44EDMW(TestNAT44ED): def test_dynamic(self): """ NAT44ED dynamic translation test """ + pkt_count = 1500 + tcp_port_offset = 20 + udp_port_offset = 20 + icmp_id_offset = 20 self.nat_add_address(self.nat_addr) self.nat_add_inside_interface(self.pg0) @@ -2025,14 +2030,31 @@ class TestNAT44EDMW(TestNAT44ED): ic1 = self.statistics['/nat44-ed/in2out/slowpath/icmp'] dc1 = self.statistics['/nat44-ed/in2out/slowpath/drops'] - pkts = self.create_stream_in(self.pg0, self.pg1) - # TODO: specify worker=idx, also stats have to - # know from which worker to take capture - self.pg0.add_stream(pkts) + i2o_pkts = [[] for x in range(0, self.vpp_worker_count)] + + for i in range(pkt_count): + p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / + TCP(sport=tcp_port_offset + i, dport=20)) + i2o_pkts[p[TCP].sport % self.vpp_worker_count].append(p) + + p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / + UDP(sport=udp_port_offset + i, dport=20)) + i2o_pkts[p[UDP].sport % self.vpp_worker_count].append(p) + + p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / + ICMP(id=icmp_id_offset + i, type='echo-request')) + i2o_pkts[p[ICMP].id % self.vpp_worker_count].append(p) + + for i in range(0, self.vpp_worker_count): + if len(i2o_pkts[i]) > 0: + self.pg0.add_stream(i2o_pkts[i], worker=i) + self.pg_enable_capture(self.pg_interfaces) self.pg_start() - capture = self.pg1.get_capture(len(pkts)) - self.verify_capture_out(capture, ignore_port=True) + capture = self.pg1.get_capture(pkt_count * 3) if_idx = self.pg0.sw_if_index tc2 = self.statistics['/nat44-ed/in2out/slowpath/tcp'] @@ -2040,23 +2062,82 @@ class TestNAT44EDMW(TestNAT44ED): ic2 = self.statistics['/nat44-ed/in2out/slowpath/icmp'] dc2 = self.statistics['/nat44-ed/in2out/slowpath/drops'] - self.assertEqual(tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), 2) - self.assertEqual(uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), 1) - self.assertEqual(ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), 1) + self.assertEqual( + tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), pkt_count) + self.assertEqual( + uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), pkt_count) + self.assertEqual( + ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), pkt_count) self.assertEqual(dc2[:, if_idx].sum() - dc1[:, if_idx].sum(), 0) + self.logger.info(self.vapi.cli("show trace")) + # out2in tc1 = self.statistics['/nat44-ed/out2in/fastpath/tcp'] uc1 = self.statistics['/nat44-ed/out2in/fastpath/udp'] ic1 = self.statistics['/nat44-ed/out2in/fastpath/icmp'] dc1 = self.statistics['/nat44-ed/out2in/fastpath/drops'] - pkts = self.create_stream_out(self.pg1) - self.pg1.add_stream(pkts) + recvd_tcp_ports = set() + recvd_udp_ports = set() + recvd_icmp_ids = set() + + for p in capture: + if TCP in p: + recvd_tcp_ports.add(p[TCP].sport) + if UDP in p: + recvd_udp_ports.add(p[UDP].sport) + if ICMP in p: + recvd_icmp_ids.add(p[ICMP].id) + + recvd_tcp_ports = list(recvd_tcp_ports) + recvd_udp_ports = list(recvd_udp_ports) + recvd_icmp_ids = list(recvd_icmp_ids) + + o2i_pkts = [[] for x in range(0, self.vpp_worker_count)] + for i in range(pkt_count): + p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / + IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / + TCP(dport=choice(recvd_tcp_ports), sport=20)) + o2i_pkts[p[TCP].dport % self.vpp_worker_count].append(p) + + p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / + IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / + UDP(dport=choice(recvd_udp_ports), sport=20)) + o2i_pkts[p[UDP].dport % self.vpp_worker_count].append(p) + + p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / + IP(src=self.pg1.remote_ip4, dst=self.nat_addr) / + ICMP(id=choice(recvd_icmp_ids), type='echo-reply')) + o2i_pkts[p[ICMP].id % self.vpp_worker_count].append(p) + + for i in range(0, self.vpp_worker_count): + if len(o2i_pkts[i]) > 0: + self.pg1.add_stream(o2i_pkts[i], worker=i) + self.pg_enable_capture(self.pg_interfaces) self.pg_start() - capture = self.pg0.get_capture(len(pkts)) - self.verify_capture_in(capture, self.pg0) + capture = self.pg0.get_capture(pkt_count * 3) + for packet in capture: + try: + self.assert_packet_checksums_valid(packet) + self.assertEqual(packet[IP].dst, self.pg0.remote_ip4) + if packet.haslayer(TCP): + self.assert_in_range( + packet[TCP].dport, tcp_port_offset, + tcp_port_offset + pkt_count, "dst TCP port") + elif packet.haslayer(UDP): + self.assert_in_range( + packet[UDP].dport, udp_port_offset, + udp_port_offset + pkt_count, "dst UDP port") + else: + self.assert_in_range( + packet[ICMP].id, icmp_id_offset, + icmp_id_offset + pkt_count, "ICMP id") + except: + self.logger.error(ppp("Unexpected or invalid packet " + "(inside network):", packet)) + raise if_idx = self.pg1.sw_if_index tc2 = self.statistics['/nat44-ed/out2in/fastpath/tcp'] @@ -2064,13 +2145,17 @@ class TestNAT44EDMW(TestNAT44ED): ic2 = self.statistics['/nat44-ed/out2in/fastpath/icmp'] dc2 = self.statistics['/nat44-ed/out2in/fastpath/drops'] - self.assertEqual(tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), 2) - self.assertEqual(uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), 1) - self.assertEqual(ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), 1) + self.assertEqual( + tc2[:, if_idx].sum() - tc1[:, if_idx].sum(), pkt_count) + self.assertEqual( + uc2[:, if_idx].sum() - uc1[:, if_idx].sum(), pkt_count) + self.assertEqual( + ic2[:, if_idx].sum() - ic1[:, if_idx].sum(), pkt_count) self.assertEqual(dc2[:, if_idx].sum() - dc1[:, if_idx].sum(), 0) sc = self.statistics['/nat44-ed/total-sessions'] - self.assertEqual(sc[:, 0].sum(), 3) + self.assertEqual(sc[:, 0].sum(), len(recvd_tcp_ports) + + len(recvd_udp_ports) + len(recvd_icmp_ids)) def test_frag_in_order(self): """ NAT44ED translate fragments arriving in order """ @@ -2697,7 +2782,7 @@ class TestNAT44EDMW(TestNAT44ED): server1_n += 1 else: server2_n += 1 - self.assertGreater(server1_n, server2_n) + self.assertGreaterEqual(server1_n, server2_n) local = { 'addr': server3.ip4,