X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_snat.py;h=b85c3dfeff4ab7e4bd41997be0a8d92441d2010f;hb=406eb1df440ded950533780b2b7bbab10fc3da10;hp=0708d440714e0fe14d41b57f6f1571e7601b9c95;hpb=d7f75cdf672ff1b323175a50b853d63c1242e65c;p=vpp.git diff --git a/test/test_snat.py b/test/test_snat.py index 0708d440714..b85c3dfeff4 100644 --- a/test/test_snat.py +++ b/test/test_snat.py @@ -4,13 +4,14 @@ import socket import unittest import struct -from framework import VppTestCase, VppTestRunner +from framework import VppTestCase, VppTestRunner, running_extended_tests from scapy.layers.inet import IP, TCP, UDP, ICMP from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror from scapy.layers.l2 import Ether, ARP from scapy.data import IP_PROTOS from util import ppp from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder +from time import sleep class MethodHolder(VppTestCase): @@ -312,7 +313,7 @@ class TestSNAT(MethodHolder): cls.icmp_id_out = 6305 cls.snat_addr = '10.0.0.3' - cls.create_pg_interfaces(range(8)) + cls.create_pg_interfaces(range(9)) cls.interfaces = list(cls.pg_interfaces[0:4]) for i in cls.interfaces: @@ -343,6 +344,7 @@ class TestSNAT(MethodHolder): i.resolve_arp() cls.pg7.admin_up() + cls.pg8.admin_up() except Exception: super(TestSNAT, cls).tearDownClass() 
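Review bot: `from time import sleep` is appended after the local `util`/`ipfix` imports, while the file keeps its stdlib imports (`socket`, `unittest`, `struct`) in the top group; move it up next to `import struct` so the stdlib/third-party/local grouping stays intact. It is used only by the extended `test_session_timeout`, so `import time` with `time.sleep(15)` at the call site would also make that blocking call stand out when reading the test.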
@@ -352,6 +354,26 @@ class TestSNAT(MethodHolder): """ Clear SNAT configuration. """ + # I found no elegant way to do this + self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n, + dst_address_length=32, + next_hop_address=self.pg7.remote_ip4n, + next_hop_sw_if_index=self.pg7.sw_if_index, + is_add=0) + self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n, + dst_address_length=32, + next_hop_address=self.pg8.remote_ip4n, + next_hop_sw_if_index=self.pg8.sw_if_index, + is_add=0) + + for intf in [self.pg7, self.pg8]: + neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index) + for n in neighbors: + self.vapi.ip_neighbor_add_del(intf.sw_if_index, + n.mac_address, + n.ip_address, + is_add=0) + if self.pg7.has_ip4_config: self.pg7.unconfig_ip4() @@ -559,7 +581,7 @@ class TestSNAT(MethodHolder): self.verify_capture_out_with_icmp_errors(capture) def test_ping_out_interface_from_outside(self): - """ Ping SNAT out interface from outside """ + """ Ping SNAT out interface from outside network """ self.snat_add_address(self.snat_addr) self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index) 
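Review bot: The comment `# I found no elegant way to do this` says nothing about what is being undone; replace it with `# pg7/pg8 carry no IP config: drop the static /32 routes and neighbours the *_ipless_interfaces tests install` so the next reader knows this block mirrors that setup. The route and neighbour teardown now runs in every test's cleanup, including tests that never added them; if `ip_add_del_route(..., is_add=0)` on an absent route ever surfaces an error through vapi, every test in the class would fail in `clear_snat`, so guarding it (for example a flag set by the tests that add the entries) would be safer — whether the vapi wrapper raises on that retval is not visible from the hunk. `clear_snat` starts and ends outside this hunk.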
@@ -586,6 +608,36 @@ class TestSNAT(MethodHolder): "(outside network):", packet)) raise + def test_ping_internal_host_from_outside(self): + """ Ping internal host from outside network """ + + self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr) + self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index) + self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index, + is_inside=0) + + # out2in + pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) / + IP(src=self.pg1.remote_ip4, dst=self.snat_addr, ttl=64) / + ICMP(id=self.icmp_id_out, type='echo-request')) + self.pg1.add_stream(pkt) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture(1) + self.verify_capture_in(capture, self.pg0, packet_num=1) + self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp) + + # in2out + pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) / + IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) / + ICMP(id=self.icmp_id_in, type='echo-reply')) + self.pg0.add_stream(pkt) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(1) + self.verify_capture_out(capture, same_port=True, packet_num=1) + self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp) + def test_static_in(self): """ SNAT 1:1 NAT initialized from inside network """ 
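Review bot: Neither direction of `test_ping_internal_host_from_outside` checks the ICMP type of the translated packet, only the IP protocol, so a reply forwarded as an echo-request (or the reverse) would still pass; after each `self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)` add `self.assert_equal(capture[0][ICMP].type, 8)` for the out2in echo-request and `self.assert_equal(capture[0][ICMP].type, 0)` for the in2out echo-reply. `verify_capture_in` and `verify_capture_out` are defined outside this hunk; if they already validate the type the extra asserts are redundant, but that cannot be seen from here.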
@@ -1280,6 +1332,145 @@ class TestSNAT(MethodHolder): capture = self.pg2.get_capture(len(pkts)) self.verify_capture_out(capture, nat_ip1) + def test_dynamic_ipless_interfaces(self): + """ SNAT interfaces without configured ip dynamic map """ + + self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index, + self.pg7.remote_mac, + self.pg7.remote_ip4n, + is_static=1) + self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index, + self.pg8.remote_mac, + self.pg8.remote_ip4n, + is_static=1) + + self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n, + dst_address_length=32, + next_hop_address=self.pg7.remote_ip4n, + next_hop_sw_if_index=self.pg7.sw_if_index) + self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n, + dst_address_length=32, + next_hop_address=self.pg8.remote_ip4n, + next_hop_sw_if_index=self.pg8.sw_if_index) + + self.snat_add_address(self.snat_addr) + self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index) + self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index, + is_inside=0) + + # in2out + pkts = self.create_stream_in(self.pg7, self.pg8) + self.pg7.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg8.get_capture(len(pkts)) + self.verify_capture_out(capture) + + # out2in + pkts = self.create_stream_out(self.pg8, self.snat_addr) + self.pg8.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg7.get_capture(len(pkts)) + self.verify_capture_in(capture, self.pg7) + + def test_static_ipless_interfaces(self): + """ SNAT 1:1 NAT interfaces without configured ip """ + + self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index, + self.pg7.remote_mac, + self.pg7.remote_ip4n, + is_static=1) + self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index, + self.pg8.remote_mac, + self.pg8.remote_ip4n, + is_static=1) + + self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n, + dst_address_length=32, + next_hop_address=self.pg7.remote_ip4n, + 
next_hop_sw_if_index=self.pg7.sw_if_index) + self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n, + dst_address_length=32, + next_hop_address=self.pg8.remote_ip4n, + next_hop_sw_if_index=self.pg8.sw_if_index) + + self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr) + self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index) + self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index, + is_inside=0) + + # out2in + pkts = self.create_stream_out(self.pg8) + self.pg8.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg7.get_capture(len(pkts)) + self.verify_capture_in(capture, self.pg7) + + # in2out + pkts = self.create_stream_in(self.pg7, self.pg8) + self.pg7.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg8.get_capture(len(pkts)) + self.verify_capture_out(capture, self.snat_addr, True) + + def test_static_with_port_ipless_interfaces(self): + """ SNAT 1:1 NAT with port interfaces without configured ip """ + + self.tcp_port_out = 30606 + self.udp_port_out = 30607 + self.icmp_id_out = 30608 + + self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index, + self.pg7.remote_mac, + self.pg7.remote_ip4n, + is_static=1) + self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index, + self.pg8.remote_mac, + self.pg8.remote_ip4n, + is_static=1) + + self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n, + dst_address_length=32, + next_hop_address=self.pg7.remote_ip4n, + next_hop_sw_if_index=self.pg7.sw_if_index) + self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n, + dst_address_length=32, + next_hop_address=self.pg8.remote_ip4n, + next_hop_sw_if_index=self.pg8.sw_if_index) + + self.snat_add_address(self.snat_addr) + self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr, + self.tcp_port_in, self.tcp_port_out, + proto=IP_PROTOS.tcp) + self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr, + self.udp_port_in, self.udp_port_out, + 
proto=IP_PROTOS.udp) + self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr, + self.icmp_id_in, self.icmp_id_out, + proto=IP_PROTOS.icmp) + self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index) + self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index, + is_inside=0) + + # out2in + pkts = self.create_stream_out(self.pg8) + self.pg8.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg7.get_capture(len(pkts)) + self.verify_capture_in(capture, self.pg7) + + # in2out + pkts = self.create_stream_in(self.pg7, self.pg8) + self.pg7.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg8.get_capture(len(pkts)) + self.verify_capture_out(capture) + def tearDown(self): super(TestSNAT, self).tearDown() if not self.vpp_dead: @@ -1300,7 +1491,14 @@ class TestDeterministicNAT(MethodHolder): super(TestDeterministicNAT, cls).setUpClass() try: - cls.create_pg_interfaces(range(2)) + cls.tcp_port_in = 6303 + cls.tcp_external_port = 6303 + cls.udp_port_in = 6304 + cls.udp_external_port = 6304 + cls.icmp_id_in = 6305 + cls.snat_addr = '10.0.0.3' + + cls.create_pg_interfaces(range(3)) cls.interfaces = list(cls.pg_interfaces) for i in cls.interfaces: 
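Review bot: The two `ip_neighbor_add_del` and two `ip_add_del_route` calls are repeated verbatim in `test_dynamic_ipless_interfaces`, `test_static_ipless_interfaces` and `test_static_with_port_ipless_interfaces`; a small helper removes the triplication and, since `clear_snat` now undoes exactly these four entries, keeps the add and remove sides as one obvious pair. The helper below preserves the original call order (both neighbours, then both routes). `test_static_with_port_ipless_interfaces` also rebinds `self.tcp_port_out`/`self.udp_port_out`/`self.icmp_id_out` on the instance, shadowing the class values from setUpClass for that test only; a comment such as `# per-test external ports; shadow the class defaults for this instance` would save a reader from hunting for a reset.
```python
    def _setup_ipless_interfaces(self):
        """Install static neighbours and /32 routes towards the IP-less pg7/pg8 peers."""
        for intf in (self.pg7, self.pg8):
            self.vapi.ip_neighbor_add_del(intf.sw_if_index,
                                          intf.remote_mac,
                                          intf.remote_ip4n,
                                          is_static=1)
        for intf in (self.pg7, self.pg8):
            self.vapi.ip_add_del_route(dst_address=intf.remote_ip4n,
                                       dst_address_length=32,
                                       next_hop_address=intf.remote_ip4n,
                                       next_hop_sw_if_index=intf.sw_if_index)

    def test_dynamic_ipless_interfaces(self):
        """ SNAT interfaces without configured ip dynamic map """
        self._setup_ipless_interfaces()
        # … unchanged: snat_add_address, feature enable, in2out and out2in checks …
```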
@@ -1308,10 +1506,158 @@ class TestDeterministicNAT(MethodHolder): i.config_ip4() i.resolve_arp() + cls.pg0.generate_remote_hosts(2) + cls.pg0.configure_ipv4_neighbors() + except Exception: super(TestDeterministicNAT, cls).tearDownClass() raise + def create_stream_in(self, in_if, out_if, ttl=64): + """ + Create packet stream for inside network + + :param in_if: Inside interface + :param out_if: Outside interface + :param ttl: TTL of generated packets + """ + pkts = [] + # TCP + p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / + IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) / + TCP(sport=self.tcp_port_in, dport=self.tcp_external_port)) + pkts.append(p) + + # UDP + p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / + IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) / + UDP(sport=self.udp_port_in, dport=self.udp_external_port)) + pkts.append(p) + + # ICMP + p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) / + IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) / + ICMP(id=self.icmp_id_in, type='echo-request')) + pkts.append(p) + + return pkts + + def create_stream_out(self, out_if, dst_ip=None, ttl=64): + """ + Create packet stream for outside network + + :param out_if: Outside interface + :param dst_ip: Destination IP address (Default use global SNAT address) + :param ttl: TTL of generated packets + """ + if dst_ip is None: + dst_ip = self.snat_addr + pkts = [] + # TCP + p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / + IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / + TCP(dport=self.tcp_port_out, sport=self.tcp_external_port)) + pkts.append(p) + + # UDP + p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / + IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / + UDP(dport=self.udp_port_out, sport=self.udp_external_port)) + pkts.append(p) + + # ICMP + p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) / + IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) / + ICMP(id=self.icmp_external_id, type='echo-reply')) + 
pkts.append(p) + + return pkts + + def verify_capture_out(self, capture, nat_ip=None, packet_num=3): + """ + Verify captured packets on outside network + + :param capture: Captured packets + :param nat_ip: Translated IP address (Default use global SNAT address) + :param same_port: Sorce port number is not translated (Default False) + :param packet_num: Expected number of packets (Default 3) + """ + if nat_ip is None: + nat_ip = self.snat_addr + self.assertEqual(packet_num, len(capture)) + for packet in capture: + try: + self.assertEqual(packet[IP].src, nat_ip) + if packet.haslayer(TCP): + self.tcp_port_out = packet[TCP].sport + elif packet.haslayer(UDP): + self.udp_port_out = packet[UDP].sport + else: + self.icmp_external_id = packet[ICMP].id + except: + self.logger.error(ppp("Unexpected or invalid packet " + "(outside network):", packet)) + raise + + def initiate_tcp_session(self, in_if, out_if): + """ + Initiates TCP session + + :param in_if: Inside interface + :param out_if: Outside interface + """ + try: + # SYN packet in->out + p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) / + IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) / + TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, + flags="S")) + in_if.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = out_if.get_capture(1) + p = capture[0] + self.tcp_port_out = p[TCP].sport + + # SYN + ACK packet out->in + p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) / + IP(src=out_if.remote_ip4, dst=self.snat_addr) / + TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, + flags="SA")) + out_if.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + in_if.get_capture(1) + + # ACK packet in->out + p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) / + IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) / + TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, + flags="A")) + in_if.add_stream(p) + 
self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + out_if.get_capture(1) + + except: + self.logger.error("TCP 3 way handshake failed") + raise + + def verify_ipfix_max_entries_per_user(self, data): + """ + Verify IPFIX maximum entries per user exceeded event + + :param data: Decoded IPFIX data records + """ + self.assertEqual(1, len(data)) + record = data[0] + # natEvent + self.assertEqual(ord(record[230]), 13) + # natQuotaExceededEvent + self.assertEqual('\x03\x00\x00\x00', record[466]) + # sourceIPv4Address + self.assertEqual(self.pg0.remote_ip4n, record[8]) + def test_deterministic_mode(self): """ S-NAT run deterministic mode """ in_addr = '172.16.255.0' 
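Review bot: The docstring of the new `verify_capture_out` documents a `:param same_port:` that is not in its signature `(capture, nat_ip=None, packet_num=3)` (apparently carried over from the TestSNAT variant) and misspells "Source"; drop that line so the docstring matches the method. `create_stream_out` reads `self.tcp_port_out`, `self.udp_port_out` and `self.icmp_external_id`, none of which setUpClass defines — they are assigned only by `verify_capture_out` or `initiate_tcp_session`, so a test that builds the out2in stream first fails with AttributeError; a docstring note such as `Requires a prior in2out verify_capture_out() or initiate_tcp_session() to learn the translated ports` makes that ordering explicit. The bare `except:` in `verify_capture_out` and `initiate_tcp_session` also intercepts KeyboardInterrupt during a hung capture; `except Exception:` keeps the logging and re-raise while letting an interrupt through.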
@@ -1345,10 +1691,413 @@ class TestDeterministicNAT(MethodHolder): deterministic_mappings = self.vapi.snat_det_map_dump() self.assertEqual(len(deterministic_mappings), 0) + def test_set_timeouts(self): + """ Set deterministic NAT timeouts """ + timeouts_before = self.vapi.snat_det_get_timeouts() + + self.vapi.snat_det_set_timeouts(timeouts_before.udp + 10, + timeouts_before.tcp_established + 10, + timeouts_before.tcp_transitory + 10, + timeouts_before.icmp + 10) + + timeouts_after = self.vapi.snat_det_get_timeouts() + + self.assertNotEqual(timeouts_before.udp, timeouts_after.udp) + self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp) + self.assertNotEqual(timeouts_before.tcp_established, + timeouts_after.tcp_established) + self.assertNotEqual(timeouts_before.tcp_transitory, + timeouts_after.tcp_transitory) + + def test_det_in(self): + """ CGNAT translation test (TCP, UDP, ICMP) """ + + nat_ip = "10.0.0.10" + + self.vapi.snat_add_det_map(self.pg0.remote_ip4n, + 32, + socket.inet_aton(nat_ip), + 32) + self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index) + self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index, + is_inside=0) + + # in2out + pkts = self.create_stream_in(self.pg0, self.pg1) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(len(pkts)) + self.verify_capture_out(capture, nat_ip) + + # out2in + pkts = self.create_stream_out(self.pg1, nat_ip) + self.pg1.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture(len(pkts)) + self.verify_capture_in(capture, self.pg0) + + # session dump test + sessions = self.vapi.snat_det_session_dump(self.pg0.remote_ip4n) + self.assertEqual(len(sessions), 3) + + # TCP session + s = sessions[0] + self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n) + self.assertEqual(s.in_port, self.tcp_port_in) + self.assertEqual(s.out_port, self.tcp_port_out) + 
self.assertEqual(s.ext_port, self.tcp_external_port) + + # UDP session + s = sessions[1] + self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n) + self.assertEqual(s.in_port, self.udp_port_in) + self.assertEqual(s.out_port, self.udp_port_out) + self.assertEqual(s.ext_port, self.udp_external_port) + + # ICMP session + s = sessions[2] + self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n) + self.assertEqual(s.in_port, self.icmp_id_in) + self.assertEqual(s.out_port, self.icmp_external_id) + + def test_multiple_users(self): + """ CGNAT multiple users """ + + nat_ip = "10.0.0.10" + port_in = 80 + external_port = 6303 + + host0 = self.pg0.remote_hosts[0] + host1 = self.pg0.remote_hosts[1] + + self.vapi.snat_add_det_map(host0.ip4n, + 24, + socket.inet_aton(nat_ip), + 32) + self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index) + self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index, + is_inside=0) + + # host0 to out + p = (Ether(src=host0.mac, dst=self.pg0.local_mac) / + IP(src=host0.ip4, dst=self.pg1.remote_ip4) / + TCP(sport=port_in, dport=external_port)) + self.pg0.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(1) + p = capture[0] + try: + ip = p[IP] + tcp = p[TCP] + self.assertEqual(ip.src, nat_ip) + self.assertEqual(ip.dst, self.pg1.remote_ip4) + self.assertEqual(tcp.dport, external_port) + port_out0 = tcp.sport + except: + self.logger.error(ppp("Unexpected or invalid packet:", p)) + raise + + # host1 to out + p = (Ether(src=host1.mac, dst=self.pg0.local_mac) / + IP(src=host1.ip4, dst=self.pg1.remote_ip4) / + TCP(sport=port_in, dport=external_port)) + self.pg0.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(1) + p = capture[0] + try: + ip = p[IP] + tcp = p[TCP] + self.assertEqual(ip.src, nat_ip) + self.assertEqual(ip.dst, self.pg1.remote_ip4) + self.assertEqual(tcp.dport, external_port) + port_out1 = tcp.sport + except: + 
self.logger.error(ppp("Unexpected or invalid packet:", p)) + raise + + dms = self.vapi.snat_det_map_dump() + self.assertEqual(1, len(dms)) + self.assertEqual(2, dms[0].ses_num) + + # out to host0 + p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / + IP(src=self.pg1.remote_ip4, dst=nat_ip) / + TCP(sport=external_port, dport=port_out0)) + self.pg1.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture(1) + p = capture[0] + try: + ip = p[IP] + tcp = p[TCP] + self.assertEqual(ip.src, self.pg1.remote_ip4) + self.assertEqual(ip.dst, host0.ip4) + self.assertEqual(tcp.dport, port_in) + self.assertEqual(tcp.sport, external_port) + except: + self.logger.error(ppp("Unexpected or invalid packet:", p)) + raise + + # out to host1 + p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / + IP(src=self.pg1.remote_ip4, dst=nat_ip) / + TCP(sport=external_port, dport=port_out1)) + self.pg1.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg0.get_capture(1) + p = capture[0] + try: + ip = p[IP] + tcp = p[TCP] + self.assertEqual(ip.src, self.pg1.remote_ip4) + self.assertEqual(ip.dst, host1.ip4) + self.assertEqual(tcp.dport, port_in) + self.assertEqual(tcp.sport, external_port) + except: + self.logger.error(ppp("Unexpected or invalid packet", p)) + raise + + # session close api test + self.vapi.snat_det_close_session_out(socket.inet_aton(nat_ip), + port_out1, + self.pg1.remote_ip4n, + external_port) + dms = self.vapi.snat_det_map_dump() + self.assertEqual(dms[0].ses_num, 1) + + self.vapi.snat_det_close_session_in(host0.ip4n, + port_in, + self.pg1.remote_ip4n, + external_port) + dms = self.vapi.snat_det_map_dump() + self.assertEqual(dms[0].ses_num, 0) + + def test_tcp_session_close_detection_in(self): + """ CGNAT TCP session close initiated from inside network """ + self.vapi.snat_add_det_map(self.pg0.remote_ip4n, + 32, + socket.inet_aton(self.snat_addr), + 32) + 
self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index) + self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index, + is_inside=0) + + self.initiate_tcp_session(self.pg0, self.pg1) + + # close the session from inside + try: + # FIN packet in -> out + p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / + IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / + TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, + flags="F")) + self.pg0.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + self.pg1.get_capture(1) + + pkts = [] + + # ACK packet out -> in + p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / + IP(src=self.pg1.remote_ip4, dst=self.snat_addr) / + TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, + flags="A")) + pkts.append(p) + + # FIN packet out -> in + p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / + IP(src=self.pg1.remote_ip4, dst=self.snat_addr) / + TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, + flags="F")) + pkts.append(p) + + self.pg1.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + self.pg0.get_capture(2) + + # ACK packet in -> out + p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / + IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / + TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, + flags="A")) + self.pg0.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + self.pg1.get_capture(1) + + # Check if snat closed the session + dms = self.vapi.snat_det_map_dump() + self.assertEqual(0, dms[0].ses_num) + except: + self.logger.error("TCP session termination failed") + raise + + def test_tcp_session_close_detection_out(self): + """ CGNAT TCP session close initiated from outside network """ + self.vapi.snat_add_det_map(self.pg0.remote_ip4n, + 32, + socket.inet_aton(self.snat_addr), + 32) + self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index) + 
self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index, + is_inside=0) + + self.initiate_tcp_session(self.pg0, self.pg1) + + # close the session from outside + try: + # FIN packet out -> in + p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / + IP(src=self.pg1.remote_ip4, dst=self.snat_addr) / + TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, + flags="F")) + self.pg1.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + self.pg0.get_capture(1) + + pkts = [] + + # ACK packet in -> out + p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / + IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / + TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, + flags="A")) + pkts.append(p) + + # ACK packet in -> out + p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / + IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / + TCP(sport=self.tcp_port_in, dport=self.tcp_external_port, + flags="F")) + pkts.append(p) + + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + self.pg1.get_capture(2) + + # ACK packet out -> in + p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) / + IP(src=self.pg1.remote_ip4, dst=self.snat_addr) / + TCP(sport=self.tcp_external_port, dport=self.tcp_port_out, + flags="A")) + self.pg1.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + self.pg0.get_capture(1) + + # Check if snat closed the session + dms = self.vapi.snat_det_map_dump() + self.assertEqual(0, dms[0].ses_num) + except: + self.logger.error("TCP session termination failed") + raise + + @unittest.skipUnless(running_extended_tests(), "part of extended tests") + def test_session_timeout(self): + """ CGNAT session timeouts """ + self.vapi.snat_add_det_map(self.pg0.remote_ip4n, + 32, + socket.inet_aton(self.snat_addr), + 32) + self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index) + 
self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index, + is_inside=0) + + self.initiate_tcp_session(self.pg0, self.pg1) + self.vapi.snat_det_set_timeouts(5, 5, 5, 5) + pkts = self.create_stream_in(self.pg0, self.pg1) + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(len(pkts)) + sleep(15) + + dms = self.vapi.snat_det_map_dump() + self.assertEqual(0, dms[0].ses_num) + + def test_session_limit_per_user(self): + """ CGNAT maximum 1000 sessions per user should be created """ + self.vapi.snat_add_det_map(self.pg0.remote_ip4n, + 32, + socket.inet_aton(self.snat_addr), + 32) + self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index) + self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index, + is_inside=0) + self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n, + src_address=self.pg2.local_ip4n, + path_mtu=512, + template_interval=10) + self.vapi.snat_ipfix() + + pkts = [] + for port in range(1025, 2025): + p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / + IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / + UDP(sport=port, dport=port)) + pkts.append(p) + + self.pg0.add_stream(pkts) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.get_capture(len(pkts)) + + p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / + IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) / + UDP(sport=3001, dport=3002)) + self.pg0.add_stream(p) + self.pg_enable_capture(self.pg_interfaces) + self.pg_start() + capture = self.pg1.assert_nothing_captured() + + # verify ICMP error packet + capture = self.pg0.get_capture(1) + p = capture[0] + self.assertTrue(p.haslayer(ICMP)) + icmp = p[ICMP] + self.assertEqual(icmp.type, 3) + self.assertEqual(icmp.code, 1) + self.assertTrue(icmp.haslayer(IPerror)) + inner_ip = icmp[IPerror] + self.assertEqual(inner_ip[UDPerror].sport, 3001) + self.assertEqual(inner_ip[UDPerror].dport, 3002) + + dms = 
self.vapi.snat_det_map_dump() + + self.assertEqual(1000, dms[0].ses_num) + + # verify IPFIX logging + self.vapi.cli("ipfix flush") # FIXME this should be an API call + capture = self.pg2.get_capture(2) + ipfix = IPFIXDecoder() + # first load template + for p in capture: + self.assertTrue(p.haslayer(IPFIX)) + if p.haslayer(Template): + ipfix.add_template(p.getlayer(Template)) + # verify events in data set + for p in capture: + if p.haslayer(Data): + data = ipfix.decode_data_set(p.getlayer(Set)) + self.verify_ipfix_max_entries_per_user(data) + def clear_snat(self): """ Clear SNAT configuration. """ + self.vapi.snat_ipfix(enable=0) + self.vapi.snat_det_set_timeouts() deterministic_mappings = self.vapi.snat_det_map_dump() for dsm in deterministic_mappings: self.vapi.snat_add_det_map(dsm.in_addr, @@ -1357,6 +2106,12 @@ class TestDeterministicNAT(MethodHolder): dsm.out_plen, is_add=0) + interfaces = self.vapi.snat_interface_dump() + for intf in interfaces: + self.vapi.snat_interface_add_del_feature(intf.sw_if_index, + intf.is_inside, + is_add=0) + def tearDown(self): super(TestDeterministicNAT, self).tearDown() if not self.vpp_dead:
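Review bot: `test_set_timeouts` only asserts that each timeout changed, not that it became the value that was set, so an API that clamps the value or swaps the tcp_established/tcp_transitory arguments would still pass; assert the exact values instead, e.g. `self.assertEqual(timeouts_before.udp + 10, timeouts_after.udp)` for each of the four fields. In `test_tcp_session_close_detection_out` the second packet appended to `pkts` is labelled `# ACK packet in -> out` but carries `flags="F"`; the comment should read `# FIN packet in -> out`. `test_session_timeout` pins its result with a fixed `sleep(15)` against 5-second timeouts and binds `capture` without using it, as does `test_session_limit_per_user` with `capture = self.pg1.assert_nothing_captured()`; polling `snat_det_map_dump()` until `ses_num` reaches 0 (with a deadline) would cut the wall time, and the unused names should go.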