+ deterministic_mappings = self.vapi.snat_det_map_dump()
+ self.assertEqual(len(deterministic_mappings), 1)
+ dsm = deterministic_mappings[0]
+ self.assertEqual(in_addr_n, dsm.in_addr[:4])
+ self.assertEqual(in_plen, dsm.in_plen)
+ self.assertEqual(out_addr_n, dsm.out_addr[:4])
+ self.assertEqual(out_plen, dsm.out_plen)
+
+ self.clear_snat()
+ deterministic_mappings = self.vapi.snat_det_map_dump()
+ self.assertEqual(len(deterministic_mappings), 0)
+
+ def test_set_timeouts(self):
+ """ Set deterministic NAT timeouts """
+ timeouts_before = self.vapi.snat_det_get_timeouts()
+
+ self.vapi.snat_det_set_timeouts(timeouts_before.udp + 10,
+ timeouts_before.tcp_established + 10,
+ timeouts_before.tcp_transitory + 10,
+ timeouts_before.icmp + 10)
+
+ timeouts_after = self.vapi.snat_det_get_timeouts()
+
+ self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
+ self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
+ self.assertNotEqual(timeouts_before.tcp_established,
+ timeouts_after.tcp_established)
+ self.assertNotEqual(timeouts_before.tcp_transitory,
+ timeouts_after.tcp_transitory)
+
+ def test_det_in(self):
+ """ CGNAT translation test (TCP, UDP, ICMP) """
+
+ nat_ip = "10.0.0.10"
+
+ self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
+ 32,
+ socket.inet_aton(nat_ip),
+ 32)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ # in2out
+ pkts = self.create_stream_in(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out(capture, nat_ip)
+
+ # out2in
+ pkts = self.create_stream_out(self.pg1, nat_ip)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ self.verify_capture_in(capture, self.pg0)
+
+ # session dump test
+ sessions = self.vapi.snat_det_session_dump(self.pg0.remote_ip4n)
+ self.assertEqual(len(sessions), 3)
+
+ # TCP session
+ s = sessions[0]
+ self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
+ self.assertEqual(s.in_port, self.tcp_port_in)
+ self.assertEqual(s.out_port, self.tcp_port_out)
+ self.assertEqual(s.ext_port, self.tcp_external_port)
+
+ # UDP session
+ s = sessions[1]
+ self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
+ self.assertEqual(s.in_port, self.udp_port_in)
+ self.assertEqual(s.out_port, self.udp_port_out)
+ self.assertEqual(s.ext_port, self.udp_external_port)
+
+ # ICMP session
+ s = sessions[2]
+ self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
+ self.assertEqual(s.in_port, self.icmp_id_in)
+ self.assertEqual(s.out_port, self.icmp_external_id)
+
+ def test_multiple_users(self):
+ """ CGNAT multiple users """
+
+ nat_ip = "10.0.0.10"
+ port_in = 80
+ external_port = 6303
+
+ host0 = self.pg0.remote_hosts[0]
+ host1 = self.pg0.remote_hosts[1]
+
+ self.vapi.snat_add_det_map(host0.ip4n,
+ 24,
+ socket.inet_aton(nat_ip),
+ 32)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ # host0 to out
+ p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
+ IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=port_in, dport=external_port))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, nat_ip)
+ self.assertEqual(ip.dst, self.pg1.remote_ip4)
+ self.assertEqual(tcp.dport, external_port)
+ port_out0 = tcp.sport
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ # host1 to out
+ p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
+ IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=port_in, dport=external_port))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, nat_ip)
+ self.assertEqual(ip.dst, self.pg1.remote_ip4)
+ self.assertEqual(tcp.dport, external_port)
+ port_out1 = tcp.sport
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ dms = self.vapi.snat_det_map_dump()
+ self.assertEqual(1, len(dms))
+ self.assertEqual(2, dms[0].ses_num)
+
+ # out to host0
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=nat_ip) /
+ TCP(sport=external_port, dport=port_out0))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, self.pg1.remote_ip4)
+ self.assertEqual(ip.dst, host0.ip4)
+ self.assertEqual(tcp.dport, port_in)
+ self.assertEqual(tcp.sport, external_port)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ # out to host1
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=nat_ip) /
+ TCP(sport=external_port, dport=port_out1))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, self.pg1.remote_ip4)
+ self.assertEqual(ip.dst, host1.ip4)
+ self.assertEqual(tcp.dport, port_in)
+ self.assertEqual(tcp.sport, external_port)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet", p))
+ raise
+
+ # session close api test
+ self.vapi.snat_det_close_session_out(socket.inet_aton(nat_ip),
+ port_out1,
+ self.pg1.remote_ip4n,
+ external_port)
+ dms = self.vapi.snat_det_map_dump()
+ self.assertEqual(dms[0].ses_num, 1)
+
+ self.vapi.snat_det_close_session_in(host0.ip4n,
+ port_in,
+ self.pg1.remote_ip4n,
+ external_port)
+ dms = self.vapi.snat_det_map_dump()
+ self.assertEqual(dms[0].ses_num, 0)
+
+ def test_tcp_session_close_detection_in(self):
+ """ CGNAT TCP session close initiated from inside network """
+ self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
+ 32,
+ socket.inet_aton(self.snat_addr),
+ 32)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ self.initiate_tcp_session(self.pg0, self.pg1)
+
+ # close the session from inside
+ try:
+ # FIN packet in -> out
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
+ flags="F"))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg1.get_capture(1)
+
+ pkts = []
+
+ # ACK packet out -> in
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
+ TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
+ flags="A"))
+ pkts.append(p)
+
+ # FIN packet out -> in
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
+ TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
+ flags="F"))
+ pkts.append(p)
+
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg0.get_capture(2)
+
+ # ACK packet in -> out
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
+ flags="A"))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg1.get_capture(1)
+
+ # Check if snat closed the session
+ dms = self.vapi.snat_det_map_dump()
+ self.assertEqual(0, dms[0].ses_num)
+ except:
+ self.logger.error("TCP session termination failed")
+ raise
+
+ def test_tcp_session_close_detection_out(self):
+ """ CGNAT TCP session close initiated from outside network """
+ self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
+ 32,
+ socket.inet_aton(self.snat_addr),
+ 32)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ self.initiate_tcp_session(self.pg0, self.pg1)
+
+ # close the session from outside
+ try:
+ # FIN packet out -> in
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
+ TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
+ flags="F"))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg0.get_capture(1)
+
+ pkts = []
+
+ # ACK packet in -> out
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
+ flags="A"))
+ pkts.append(p)
+
+ # ACK packet in -> out
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
+ flags="F"))
+ pkts.append(p)
+
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg1.get_capture(2)
+
+ # ACK packet out -> in
+ p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
+ IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
+ TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
+ flags="A"))
+ self.pg1.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ self.pg0.get_capture(1)
+
+ # Check if snat closed the session
+ dms = self.vapi.snat_det_map_dump()
+ self.assertEqual(0, dms[0].ses_num)
+ except:
+ self.logger.error("TCP session termination failed")
+ raise
+
+ @unittest.skipUnless(running_extended_tests(), "part of extended tests")
+ def test_session_timeout(self):
+ """ CGNAT session timeouts """
+ self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
+ 32,
+ socket.inet_aton(self.snat_addr),
+ 32)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ self.initiate_tcp_session(self.pg0, self.pg1)
+ self.vapi.snat_det_set_timeouts(5, 5, 5, 5)
+ pkts = self.create_stream_in(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+ sleep(15)
+
+ dms = self.vapi.snat_det_map_dump()
+ self.assertEqual(0, dms[0].ses_num)
+
+ def test_session_limit_per_user(self):
+ """ CGNAT maximum 1000 sessions per user should be created """
+ self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
+ 32,
+ socket.inet_aton(self.snat_addr),
+ 32)
+ self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+ self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
+ src_address=self.pg2.local_ip4n,
+ path_mtu=512,
+ template_interval=10)
+ self.vapi.snat_ipfix()
+
+ pkts = []
+ for port in range(1025, 2025):
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ UDP(sport=port, dport=port))
+ pkts.append(p)
+
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+
+ p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
+ UDP(sport=3001, dport=3002))
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.assert_nothing_captured()
+
+ # verify ICMP error packet
+ capture = self.pg0.get_capture(1)
+ p = capture[0]
+ self.assertTrue(p.haslayer(ICMP))
+ icmp = p[ICMP]
+ self.assertEqual(icmp.type, 3)
+ self.assertEqual(icmp.code, 1)
+ self.assertTrue(icmp.haslayer(IPerror))
+ inner_ip = icmp[IPerror]
+ self.assertEqual(inner_ip[UDPerror].sport, 3001)
+ self.assertEqual(inner_ip[UDPerror].dport, 3002)
+
+ dms = self.vapi.snat_det_map_dump()
+
+ self.assertEqual(1000, dms[0].ses_num)
+
+ # verify IPFIX logging
+ self.vapi.cli("ipfix flush") # FIXME this should be an API call
+ capture = self.pg2.get_capture(2)
+ ipfix = IPFIXDecoder()
+ # first load template
+ for p in capture:
+ self.assertTrue(p.haslayer(IPFIX))
+ if p.haslayer(Template):
+ ipfix.add_template(p.getlayer(Template))
+ # verify events in data set
+ for p in capture:
+ if p.haslayer(Data):
+ data = ipfix.decode_data_set(p.getlayer(Set))
+ self.verify_ipfix_max_entries_per_user(data)
+
+ def clear_snat(self):
+ """
+ Clear SNAT configuration.
+ """
+ self.vapi.snat_ipfix(enable=0)
+ self.vapi.snat_det_set_timeouts()
+ deterministic_mappings = self.vapi.snat_det_map_dump()
+ for dsm in deterministic_mappings:
+ self.vapi.snat_add_det_map(dsm.in_addr,
+ dsm.in_plen,
+ dsm.out_addr,
+ dsm.out_plen,
+ is_add=0)
+
+ interfaces = self.vapi.snat_interface_dump()
+ for intf in interfaces:
+ self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
+ intf.is_inside,
+ is_add=0)
+