+ self.twice_nat_common(lb=True)
+
+ def test_self_twice_nat_lb_positive(self):
+ """ Self Twice NAT44 local service load balancing (positive test) """
+ self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
+ client_id=1)
+
+ def test_self_twice_nat_lb_negative(self):
+ """ Self Twice NAT44 local service load balancing (negative test) """
+ self.twice_nat_common(lb=True, self_twice_nat=True, same_pg=True,
+ client_id=2)
+
+ def test_twice_nat_interface_addr(self):
+ """ Acquire twice NAT44 addresses from interface """
+ self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index, twice_nat=1)
+
+ # no address in NAT pool
+ adresses = self.vapi.nat44_address_dump()
+ self.assertEqual(0, len(adresses))
+
+ # configure interface address and check NAT address pool
+ self.pg7.config_ip4()
+ adresses = self.vapi.nat44_address_dump()
+ self.assertEqual(1, len(adresses))
+ self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
+ self.assertEqual(adresses[0].twice_nat, 1)
+
+ # remove interface address and check NAT address pool
+ self.pg7.unconfig_ip4()
+ adresses = self.vapi.nat44_address_dump()
+ self.assertEqual(0, len(adresses))
+
+ def test_ipfix_max_frags(self):
+ """ IPFIX logging maximum fragments pending reassembly exceeded """
+ self.nat44_add_address(self.nat_addr)
+ self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+ self.vapi.nat_set_reass(max_frag=0)
+ self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
+ src_address=self.pg3.local_ip4n,
+ path_mtu=512,
+ template_interval=10)
+ self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
+ src_port=self.ipfix_src_port)
+
+ data = "A" * 4 + "B" * 16 + "C" * 3
+ self.tcp_port_in = random.randint(1025, 65535)
+ pkts = self.create_stream_frag(self.pg0,
+ self.pg1.remote_ip4,
+ self.tcp_port_in,
+ 20,
+ data)
+ self.pg0.add_stream(pkts[-1])
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ frags = self.pg1.get_capture(0)
+ self.vapi.cli("ipfix flush") # FIXME this should be an API call
+ capture = self.pg3.get_capture(9)
+ ipfix = IPFIXDecoder()
+ # first load template
+ for p in capture:
+ self.assertTrue(p.haslayer(IPFIX))
+ self.assertEqual(p[IP].src, self.pg3.local_ip4)
+ self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
+ self.assertEqual(p[UDP].sport, self.ipfix_src_port)
+ self.assertEqual(p[UDP].dport, 4739)
+ self.assertEqual(p[IPFIX].observationDomainID,
+ self.ipfix_domain_id)
+ if p.haslayer(Template):
+ ipfix.add_template(p.getlayer(Template))
+ # verify events in data set
+ for p in capture:
+ if p.haslayer(Data):
+ data = ipfix.decode_data_set(p.getlayer(Set))
+ self.verify_ipfix_max_fragments_ip4(data, 0,
+ self.pg0.remote_ip4n)
+
+ def test_tcp_session_close_in(self):
+ """ Close TCP session from inside network """
+ self.nat44_add_address(self.nat_addr)
+ self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
+ start_sessnum = len(sessions)
+
+ self.initiate_tcp_session(self.pg0, self.pg1)
+
+ # close the session from inside
+ try:
+ # FIN packet in -> out
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
+ flags="FA", seq=100, ack=300))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg1.get_capture(1)
+
+ pkts = []
+
+ # ACK packet out -> in
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
+ flags="A", seq=300, ack=101))
+ pkts.append(p)
+
+ # FIN packet out -> in
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
+ flags="FA", seq=300, ack=101))
+ pkts.append(p)
+
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg0.get_capture(2)
+
+ # ACK packet in -> out
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
+ flags="A", seq=101, ack=301))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg1.get_capture(1)
+
+ self.initiate_tcp_session(self.pg0, self.pg1)
+ sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
+ 0)
+ self.assertEqual(len(sessions) - start_sessnum, 1)
+ except:
+ self.logger.error("TCP session termination failed")
+ raise
+
+ def test_tcp_session_close_out(self):
+ """ Close TCP session from outside network """
+ self.nat44_add_address(self.nat_addr)
+ self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
+ start_sessnum = len(sessions)
+
+ self.initiate_tcp_session(self.pg0, self.pg1)
+
+ # close the session from outside
+ try:
+ # FIN packet out -> in
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
+ flags="FA", seq=100, ack=300))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg0.get_capture(1)
+
+ # FIN+ACK packet in -> out
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
+ flags="FA", seq=300, ack=101))
+
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg1.get_capture(1)
+
+ # ACK packet out -> in
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
+ flags="A", seq=101, ack=301))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg0.get_capture(1)
+
+ self.initiate_tcp_session(self.pg0, self.pg1)
+ sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
+ 0)
+ self.assertEqual(len(sessions) - start_sessnum, 1)
+ except:
+ self.logger.error("TCP session termination failed")
+ raise
+
+ def test_tcp_session_close_simultaneous(self):
+ """ Close TCP session from inside network """
+ self.nat44_add_address(self.nat_addr)
+ self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
+ start_sessnum = len(sessions)
+
+ self.initiate_tcp_session(self.pg0, self.pg1)
+
+ # close the session from inside
+ try:
+ # FIN packet in -> out
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
+ flags="FA", seq=100, ack=300))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg1.get_capture(1)
+
+ # FIN packet out -> in
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
+ flags="FA", seq=300, ack=100))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg0.get_capture(1)
+
+ # ACK packet in -> out
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
+ flags="A", seq=101, ack=301))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg1.get_capture(1)
+
+ # ACK packet out -> in
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
+ flags="A", seq=301, ack=101))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg0.get_capture(1)
+
+ self.initiate_tcp_session(self.pg0, self.pg1)
+ sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n,
+ 0)
+ self.assertEqual(len(sessions) - start_sessnum, 1)
+ except:
+ self.logger.error("TCP session termination failed")
+ raise
+
+ def tearDown(self):
+ super(TestNAT44, self).tearDown()
+ if not self.vpp_dead:
+ self.logger.info(self.vapi.cli("show nat44 addresses"))
+ self.logger.info(self.vapi.cli("show nat44 interfaces"))
+ self.logger.info(self.vapi.cli("show nat44 static mappings"))
+ self.logger.info(self.vapi.cli("show nat44 interface address"))
+ self.logger.info(self.vapi.cli("show nat44 sessions detail"))
+ self.logger.info(self.vapi.cli("show nat virtual-reassembly"))
+ self.vapi.cli("nat addr-port-assignment-alg default")
+ self.clear_nat44()
+
+
+class TestNAT44Out2InDPO(MethodHolder):
+ """ NAT44 Test Cases using out2in DPO """
+
+ @classmethod
+ def setUpConstants(cls):
+ super(TestNAT44Out2InDPO, cls).setUpConstants()
+ cls.vpp_cmdline.extend(["nat", "{", "out2in dpo", "}"])
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestNAT44Out2InDPO, cls).setUpClass()
+
+ try:
+ cls.tcp_port_in = 6303
+ cls.tcp_port_out = 6303
+ cls.udp_port_in = 6304
+ cls.udp_port_out = 6304
+ cls.icmp_id_in = 6305
+ cls.icmp_id_out = 6305
+ cls.nat_addr = '10.0.0.3'
+ cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
+ cls.dst_ip4 = '192.168.70.1'
+
+ cls.create_pg_interfaces(range(2))
+
+ cls.pg0.admin_up()
+ cls.pg0.config_ip4()
+ cls.pg0.resolve_arp()
+
+ cls.pg1.admin_up()
+ cls.pg1.config_ip6()
+ cls.pg1.resolve_ndp()
+
+ cls.vapi.ip_add_del_route(is_ipv6=True, dst_address='\x00'*16,
+ dst_address_length=0,
+ next_hop_address=cls.pg1.remote_ip6n,
+ next_hop_sw_if_index=cls.pg1.sw_if_index)