X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_snat.py;h=b85c3dfeff4ab7e4bd41997be0a8d92441d2010f;hb=406eb1df440ded950533780b2b7bbab10fc3da10;hp=fe5af418aeaf784b0f21c2b1660417f1052b5797;hpb=6bc8c6493c8e7e1401130f8c9ca34c9b2915a7ad;p=vpp.git diff --git a/test/test_snat.py b/test/test_snat.py index fe5af418aea..b85c3dfeff4 100644 --- a/test/test_snat.py +++ b/test/test_snat.py @@ -313,7 +313,7 @@ class TestSNAT(MethodHolder): cls.icmp_id_out = 6305 cls.snat_addr = '10.0.0.3' - cls.create_pg_interfaces(range(8)) + cls.create_pg_interfaces(range(9)) cls.interfaces = list(cls.pg_interfaces[0:4]) for i in cls.interfaces: @@ -344,6 +344,7 @@ class TestSNAT(MethodHolder): i.resolve_arp() cls.pg7.admin_up() + cls.pg8.admin_up() except Exception: super(TestSNAT, cls).tearDownClass() @@ -353,6 +354,26 @@ class TestSNAT(MethodHolder): """ Clear SNAT configuration. """ + # I found no elegant way to do this + self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n, + dst_address_length=32, + next_hop_address=self.pg7.remote_ip4n, + next_hop_sw_if_index=self.pg7.sw_if_index, + is_add=0) + self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n, + dst_address_length=32, + next_hop_address=self.pg8.remote_ip4n, + next_hop_sw_if_index=self.pg8.sw_if_index, + is_add=0) + + for intf in [self.pg7, self.pg8]: + neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index) + for n in neighbors: + self.vapi.ip_neighbor_add_del(intf.sw_if_index, + n.mac_address, + n.ip_address, + is_add=0) + if self.pg7.has_ip4_config: self.pg7.unconfig_ip4() @@ -1311,6 +1332,145 @@ class TestSNAT(MethodHolder): capture = self.pg2.get_capture(len(pkts)) self.verify_capture_out(capture, nat_ip1) + def test_dynamic_ipless_interfaces(self): + """ SNAT interfaces without configured ip dynamic map """ + + self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index, + self.pg7.remote_mac, + self.pg7.remote_ip4n, + is_static=1) + self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index, + self.pg8.remote_mac, + self.pg8.remote_ip4n, + is_static=1) + + self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n, + dst_address_length=32, + next_hop_address=self.pg7.remote_ip4n, + next_hop_sw_if_index=self.pg7.sw_if_index) + self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n, + dst_address_length=32, + next_hop_address=self.pg8.remote_ip4n, + next_hop_sw_if_index=self.pg8.sw_if_index) + + self.snat_add_address(self.snat_addr) + self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index) + self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index, + is_inside=0) + + # in2out + pkts = self.create_stream_in(self.pg7, self.pg8) + self.pg7.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg8.get_capture(len(pkts)) + self.verify_capture_out(capture) + + # out2in + pkts = self.create_stream_out(self.pg8, self.snat_addr) + self.pg8.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg7.get_capture(len(pkts)) + self.verify_capture_in(capture, self.pg7) + + def test_static_ipless_interfaces(self): + """ SNAT 1:1 NAT interfaces without configured ip """ + + self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index, + self.pg7.remote_mac, + self.pg7.remote_ip4n, + is_static=1) + self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index, + self.pg8.remote_mac, + self.pg8.remote_ip4n, + is_static=1) + + self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n, + dst_address_length=32, + next_hop_address=self.pg7.remote_ip4n, + next_hop_sw_if_index=self.pg7.sw_if_index) + self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n, + dst_address_length=32, + next_hop_address=self.pg8.remote_ip4n, + next_hop_sw_if_index=self.pg8.sw_if_index) + + self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr) + self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index) + self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index, + is_inside=0) + + # out2in + pkts = self.create_stream_out(self.pg8) + self.pg8.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg7.get_capture(len(pkts)) + self.verify_capture_in(capture, self.pg7) + + # in2out + pkts = self.create_stream_in(self.pg7, self.pg8) + self.pg7.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg8.get_capture(len(pkts)) + self.verify_capture_out(capture, self.snat_addr, True) + + def test_static_with_port_ipless_interfaces(self): + """ SNAT 1:1 NAT with port interfaces without configured ip """ + + self.tcp_port_out = 30606 + self.udp_port_out = 30607 + self.icmp_id_out = 30608 + + self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index, + self.pg7.remote_mac, + self.pg7.remote_ip4n, + is_static=1) + self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index, + self.pg8.remote_mac, + self.pg8.remote_ip4n, + is_static=1) + + self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n, + dst_address_length=32, + next_hop_address=self.pg7.remote_ip4n, + next_hop_sw_if_index=self.pg7.sw_if_index) + self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n, + dst_address_length=32, + next_hop_address=self.pg8.remote_ip4n, + next_hop_sw_if_index=self.pg8.sw_if_index) + + self.snat_add_address(self.snat_addr) + self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr, + self.tcp_port_in, self.tcp_port_out, + proto=IP_PROTOS.tcp) + self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr, + self.udp_port_in, self.udp_port_out, + proto=IP_PROTOS.udp) + self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr, + self.icmp_id_in, self.icmp_id_out, + proto=IP_PROTOS.icmp) + self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index) + self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index, + is_inside=0) + + # out2in + pkts = self.create_stream_out(self.pg8) + self.pg8.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg7.get_capture(len(pkts)) + self.verify_capture_in(capture, self.pg7) + + # in2out + pkts = self.create_stream_in(self.pg7, self.pg8) + self.pg7.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg8.get_capture(len(pkts)) + self.verify_capture_out(capture) + def tearDown(self): super(TestSNAT, self).tearDown() if not self.vpp_dead: @@ -1332,9 +1492,9 @@ class TestDeterministicNAT(MethodHolder): try: cls.tcp_port_in = 6303 - cls.tcp_port_out = 6303 + cls.tcp_external_port = 6303 cls.udp_port_in = 6304 - cls.udp_port_out = 6304 + cls.udp_external_port = 6304 cls.icmp_id_in = 6305 cls.snat_addr = '10.0.0.3' @@ -1365,13 +1525,13 @@ class TestDeterministicNAT(MethodHolder): # TCP p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) / - TCP(sport=self.tcp_port_in, dport=self.tcp_port_out)) + TCP(sport=self.tcp_port_in, dport=self.tcp_external_port)) pkts.append(p) # UDP p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) / - UDP(sport=self.udp_port_in, dport=self.udp_port_out)) + UDP(sport=self.udp_port_in, dport=self.udp_external_port)) pkts.append(p) # ICMP @@ -1396,13 +1556,13 @@ class TestDeterministicNAT(MethodHolder): # TCP p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / - TCP(dport=self.tcp_external_port, sport=self.tcp_port_out)) + TCP(dport=self.tcp_port_out, sport=self.tcp_external_port)) pkts.append(p) # UDP p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / - UDP(dport=self.udp_external_port, sport=self.udp_port_out)) + UDP(dport=self.udp_port_out, sport=self.udp_external_port)) pkts.append(p) # ICMP @@ -1429,9 +1589,9 @@ class TestDeterministicNAT(MethodHolder): try: self.assertEqual(packet[IP].src, nat_ip) if packet.haslayer(TCP): - self.tcp_external_port = packet[TCP].sport + self.tcp_port_out = packet[TCP].sport elif packet.haslayer(UDP): - self.udp_external_port = packet[UDP].sport + self.udp_port_out = packet[UDP].sport else: self.icmp_external_id = packet[ICMP].id except: @@ -1450,19 +1610,19 @@ class TestDeterministicNAT(MethodHolder): # SYN packet in->out p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) / - TCP(sport=self.tcp_port_in, dport=self.tcp_port_out, + TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="S")) in_if.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = out_if.get_capture(1) p = capture[0] - self.tcp_external_port = p[TCP].sport + self.tcp_port_out = p[TCP].sport # SYN + ACK packet out->in p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) / IP(src=out_if.remote_ip4, dst=self.snat_addr) / - TCP(sport=self.tcp_port_out, dport=self.tcp_external_port, + TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, flags="SA")) out_if.add_stream(p) self.pg_enable_capture(self.pg_interfaces) @@ -1472,7 +1632,7 @@ class TestDeterministicNAT(MethodHolder): # ACK packet in->out p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) / IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) / - TCP(sport=self.tcp_port_in, dport=self.tcp_port_out, + TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="A")) in_if.add_stream(p) self.pg_enable_capture(self.pg_interfaces) @@ -1553,12 +1713,6 @@ class TestDeterministicNAT(MethodHolder): """ CGNAT translation test (TCP, UDP, ICMP) """ nat_ip = "10.0.0.10" - self.tcp_port_out = 6303 - self.udp_port_out = 6304 - self.icmp_id_in = 6305 - self.tcp_external_port = 7303 - self.udp_external_port = 7304 - self.icmp_id_out = 7305 self.vapi.snat_add_det_map(self.pg0.remote_ip4n, 32, @@ -1584,12 +1738,36 @@ class TestDeterministicNAT(MethodHolder): capture = self.pg0.get_capture(len(pkts)) self.verify_capture_in(capture, self.pg0) + # session dump test + sessions = self.vapi.snat_det_session_dump(self.pg0.remote_ip4n) + self.assertEqual(len(sessions), 3) + + # TCP session + s = sessions[0] + self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n) + self.assertEqual(s.in_port, self.tcp_port_in) + self.assertEqual(s.out_port, self.tcp_port_out) + self.assertEqual(s.ext_port, self.tcp_external_port) + + # UDP session + s = sessions[1] + self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n) + self.assertEqual(s.in_port, self.udp_port_in) + self.assertEqual(s.out_port, self.udp_port_out) + self.assertEqual(s.ext_port, self.udp_external_port) + + # ICMP session + s = sessions[2] + self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n) + self.assertEqual(s.in_port, self.icmp_id_in) + self.assertEqual(s.out_port, self.icmp_external_id) + def test_multiple_users(self): """ CGNAT multiple users """ nat_ip = "10.0.0.10" port_in = 80 - port_out = 6303 + external_port = 6303 host0 = self.pg0.remote_hosts[0] host1 = self.pg0.remote_hosts[1] @@ -1605,7 +1783,7 @@ class TestDeterministicNAT(MethodHolder): # host0 to out p = (Ether(src=host0.mac, dst=self.pg0.local_mac) / IP(src=host0.ip4, dst=self.pg1.remote_ip4) / - TCP(sport=port_in, dport=port_out)) + TCP(sport=port_in, dport=external_port)) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1616,8 +1794,8 @@ class TestDeterministicNAT(MethodHolder): tcp = p[TCP] self.assertEqual(ip.src, nat_ip) self.assertEqual(ip.dst, self.pg1.remote_ip4) - self.assertEqual(tcp.dport, port_out) - external_port0 = tcp.sport + self.assertEqual(tcp.dport, external_port) + port_out0 = tcp.sport except: self.logger.error(ppp("Unexpected or invalid packet:", p)) raise @@ -1625,7 +1803,7 @@ class TestDeterministicNAT(MethodHolder): # host1 to out p = (Ether(src=host1.mac, dst=self.pg0.local_mac) / IP(src=host1.ip4, dst=self.pg1.remote_ip4) / - TCP(sport=port_in, dport=port_out)) + TCP(sport=port_in, dport=external_port)) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1636,8 +1814,8 @@ class TestDeterministicNAT(MethodHolder): tcp = p[TCP] self.assertEqual(ip.src, nat_ip) self.assertEqual(ip.dst, self.pg1.remote_ip4) - self.assertEqual(tcp.dport, port_out) - external_port1 = tcp.sport + self.assertEqual(tcp.dport, external_port) + port_out1 = tcp.sport except: self.logger.error(ppp("Unexpected or invalid packet:", p)) raise @@ -1649,7 +1827,7 @@ class TestDeterministicNAT(MethodHolder): # out to host0 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / IP(src=self.pg1.remote_ip4, dst=nat_ip) / - TCP(sport=port_out, dport=external_port0)) + TCP(sport=external_port, dport=port_out0)) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1661,7 +1839,7 @@ class TestDeterministicNAT(MethodHolder): self.assertEqual(ip.src, self.pg1.remote_ip4) self.assertEqual(ip.dst, host0.ip4) self.assertEqual(tcp.dport, port_in) - self.assertEqual(tcp.sport, port_out) + self.assertEqual(tcp.sport, external_port) except: self.logger.error(ppp("Unexpected or invalid packet:", p)) raise @@ -1669,7 +1847,7 @@ class TestDeterministicNAT(MethodHolder): # out to host1 p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / IP(src=self.pg1.remote_ip4, dst=nat_ip) / - TCP(sport=port_out, dport=external_port1)) + TCP(sport=external_port, dport=port_out1)) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() @@ -1681,23 +1859,23 @@ class TestDeterministicNAT(MethodHolder): self.assertEqual(ip.src, self.pg1.remote_ip4) self.assertEqual(ip.dst, host1.ip4) self.assertEqual(tcp.dport, port_in) - self.assertEqual(tcp.sport, port_out) + self.assertEqual(tcp.sport, external_port) except: self.logger.error(ppp("Unexpected or invalid packet", p)) raise # session close api test self.vapi.snat_det_close_session_out(socket.inet_aton(nat_ip), - external_port1, + port_out1, self.pg1.remote_ip4n, - port_out) + external_port) dms = self.vapi.snat_det_map_dump() self.assertEqual(dms[0].ses_num, 1) self.vapi.snat_det_close_session_in(host0.ip4n, port_in, self.pg1.remote_ip4n, - port_out) + external_port) dms = self.vapi.snat_det_map_dump() self.assertEqual(dms[0].ses_num, 0) @@ -1718,7 +1896,7 @@ class TestDeterministicNAT(MethodHolder): # FIN packet in -> out p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=self.tcp_port_in, dport=self.tcp_port_out, + TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="F")) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) @@ -1730,14 +1908,14 @@ class TestDeterministicNAT(MethodHolder): # ACK packet out -> in p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / IP(src=self.pg1.remote_ip4, dst=self.snat_addr) / - TCP(sport=self.tcp_port_out, dport=self.tcp_external_port, + TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, flags="A")) pkts.append(p) # FIN packet out -> in p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / IP(src=self.pg1.remote_ip4, dst=self.snat_addr) / - TCP(sport=self.tcp_port_out, dport=self.tcp_external_port, + TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, flags="F")) pkts.append(p) @@ -1749,7 +1927,7 @@ class TestDeterministicNAT(MethodHolder): # ACK packet in -> out p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=self.tcp_port_in, dport=self.tcp_port_out, + TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="A")) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) @@ -1780,7 +1958,7 @@ class TestDeterministicNAT(MethodHolder): # FIN packet out -> in p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / IP(src=self.pg1.remote_ip4, dst=self.snat_addr) / - TCP(sport=self.tcp_port_out, dport=self.tcp_external_port, + TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, flags="F")) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) @@ -1792,14 +1970,14 @@ class TestDeterministicNAT(MethodHolder): # ACK packet in -> out p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=self.tcp_port_in, dport=self.tcp_port_out, + TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="A")) pkts.append(p) # ACK packet in -> out p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - TCP(sport=self.tcp_port_in, dport=self.tcp_port_out, + TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, flags="F")) pkts.append(p) @@ -1811,7 +1989,7 @@ class TestDeterministicNAT(MethodHolder): # ACK packet out -> in p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / IP(src=self.pg1.remote_ip4, dst=self.snat_addr) / - TCP(sport=self.tcp_port_out, dport=self.tcp_external_port, + TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, flags="A")) self.pg1.add_stream(p) self.pg_enable_capture(self.pg_interfaces) @@ -1877,12 +2055,24 @@ class TestDeterministicNAT(MethodHolder): p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / - UDP(sport=3000, dport=3000)) + UDP(sport=3001, dport=3002)) self.pg0.add_stream(p) self.pg_enable_capture(self.pg_interfaces) self.pg_start() capture = self.pg1.assert_nothing_captured() + # verify ICMP error packet + capture = self.pg0.get_capture(1) + p = capture[0] + self.assertTrue(p.haslayer(ICMP)) + icmp = p[ICMP] + self.assertEqual(icmp.type, 3) + self.assertEqual(icmp.code, 1) + self.assertTrue(icmp.haslayer(IPerror)) + inner_ip = icmp[IPerror] + self.assertEqual(inner_ip[UDPerror].sport, 3001) + self.assertEqual(inner_ip[UDPerror].dport, 3002) + dms = self.vapi.snat_det_map_dump() self.assertEqual(1000, dms[0].ses_num)