from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
from time import sleep
from util import ip4_range
+from util import mactobinary
class MethodHolder(VppTestCase):
cls.ipfix_src_port = 4739
cls.ipfix_domain_id = 1
- cls.create_pg_interfaces(range(9))
+ cls.create_pg_interfaces(range(10))
cls.interfaces = list(cls.pg_interfaces[0:4])
for i in cls.interfaces:
cls.pg7.admin_up()
cls.pg8.admin_up()
+ cls.pg9.generate_remote_hosts(2)
+ cls.pg9.config_ip4()
+ ip_addr_n = socket.inet_pton(socket.AF_INET, "10.0.0.1")
+ cls.vapi.sw_interface_add_del_address(cls.pg9.sw_if_index,
+ ip_addr_n,
+ 24)
+ cls.pg9.admin_up()
+ cls.pg9.resolve_arp()
+ cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
+ cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
+ cls.pg9.resolve_arp()
+
except Exception:
super(TestNAT44, cls).tearDownClass()
raise
interfaces = self.vapi.nat44_interface_dump()
for intf in interfaces:
+ if intf.is_inside > 1:
+ self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
+ 0,
+ is_add=0)
self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
intf.is_inside,
is_add=0)
lb_sm.external_port,
lb_sm.protocol,
lb_sm.vrf_id,
- is_add=0)
+ is_add=0,
+ local_num=0,
+ locals=[])
adresses = self.vapi.nat44_address_dump()
for addr in adresses:
""" NAT44 interfaces without configured IP address """
self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
- self.pg7.remote_mac,
+ mactobinary(self.pg7.remote_mac),
self.pg7.remote_ip4n,
is_static=1)
self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
- self.pg8.remote_mac,
+ mactobinary(self.pg8.remote_mac),
self.pg8.remote_ip4n,
is_static=1)
""" NAT44 interfaces without configured IP address - 1:1 NAT """
self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
- self.pg7.remote_mac,
+ mactobinary(self.pg7.remote_mac),
self.pg7.remote_ip4n,
is_static=1)
self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
- self.pg8.remote_mac,
+ mactobinary(self.pg8.remote_mac),
self.pg8.remote_ip4n,
is_static=1)
self.icmp_id_out = 30608
self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
- self.pg7.remote_mac,
+ mactobinary(self.pg7.remote_mac),
self.pg7.remote_ip4n,
is_static=1)
self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
- self.pg8.remote_mac,
+ mactobinary(self.pg8.remote_mac),
self.pg8.remote_ip4n,
is_static=1)
self.logger.error(ppp("Unexpected or invalid packet:"), p)
raise
+ def test_one_armed_nat44(self):
+ """ One armed NAT44 """
+ remote_host = self.pg9.remote_hosts[0]
+ local_host = self.pg9.remote_hosts[1]
+ external_port = 0
+
+ self.nat44_add_address(self.nat_addr)
+ self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index)
+ self.vapi.nat44_interface_add_del_feature(self.pg9.sw_if_index,
+ is_inside=0)
+
+ # in2out
+ p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
+ IP(src=local_host.ip4, dst=remote_host.ip4) /
+ TCP(sport=12345, dport=80))
+ self.pg9.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg9.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, self.nat_addr)
+ self.assertEqual(ip.dst, remote_host.ip4)
+ self.assertNotEqual(tcp.sport, 12345)
+ external_port = tcp.sport
+ self.assertEqual(tcp.dport, 80)
+ self.check_tcp_checksum(p)
+ self.check_ip_checksum(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ # out2in
+ p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
+ IP(src=remote_host.ip4, dst=self.nat_addr) /
+ TCP(sport=80, dport=external_port))
+ self.pg9.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg9.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, remote_host.ip4)
+ self.assertEqual(ip.dst, local_host.ip4)
+ self.assertEqual(tcp.sport, 80)
+ self.assertEqual(tcp.dport, 12345)
+ self.check_tcp_checksum(p)
+ self.check_ip_checksum(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ def test_del_session(self):
+ """ Delete NAT44 session """
+ self.nat44_add_address(self.nat_addr)
+ self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
+ self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
+ is_inside=0)
+
+ pkts = self.create_stream_in(self.pg0, self.pg1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+
+ sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
+ nsessions = len(sessions)
+
+ self.vapi.nat44_del_session(sessions[0].inside_ip_address,
+ sessions[0].inside_port,
+ sessions[0].protocol)
+ self.vapi.nat44_del_session(sessions[1].outside_ip_address,
+ sessions[1].outside_port,
+ sessions[1].protocol,
+ is_in=0)
+
+ sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4n, 0)
+ self.assertEqual(nsessions - len(sessions), 2)
+
def tearDown(self):
super(TestNAT44, self).tearDown()
if not self.vpp_dead:
cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
cls.vrf1_nat_addr)
- cls.create_pg_interfaces(range(3))
+ cls.create_pg_interfaces(range(4))
cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
cls.ip6_interfaces.append(cls.pg_interfaces[2])
cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
i.config_ip4()
i.resolve_arp()
+ cls.pg3.admin_up()
+ cls.pg3.config_ip4()
+ cls.pg3.resolve_arp()
+ cls.pg3.config_ip6()
+ cls.pg3.configure_ipv6_neighbors()
+
except Exception:
super(TestNAT64, cls).tearDownClass()
raise
self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
+ def test_one_armed_nat64(self):
+ """ One armed NAT64 """
+ external_port = 0
+ remote_host_ip6 = self.compose_ip6(self.pg3.remote_ip4,
+ '64:ff9b::',
+ 96)
+
+ self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
+ self.nat_addr_n)
+ self.vapi.nat64_add_del_interface(self.pg3.sw_if_index)
+ self.vapi.nat64_add_del_interface(self.pg3.sw_if_index, is_inside=0)
+
+ # in2out
+ p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
+ IPv6(src=self.pg3.remote_ip6, dst=remote_host_ip6) /
+ TCP(sport=12345, dport=80))
+ self.pg3.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg3.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IP]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, self.nat_addr)
+ self.assertEqual(ip.dst, self.pg3.remote_ip4)
+ self.assertNotEqual(tcp.sport, 12345)
+ external_port = tcp.sport
+ self.assertEqual(tcp.dport, 80)
+ self.check_tcp_checksum(p)
+ self.check_ip_checksum(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
+ # out2in
+ p = (Ether(src=self.pg3.remote_mac, dst=self.pg3.local_mac) /
+ IP(src=self.pg3.remote_ip4, dst=self.nat_addr) /
+ TCP(sport=80, dport=external_port))
+ self.pg3.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg3.get_capture(1)
+ p = capture[0]
+ try:
+ ip = p[IPv6]
+ tcp = p[TCP]
+ self.assertEqual(ip.src, remote_host_ip6)
+ self.assertEqual(ip.dst, self.pg3.remote_ip6)
+ self.assertEqual(tcp.sport, 80)
+ self.assertEqual(tcp.dport, 12345)
+ self.check_tcp_checksum(p)
+ except:
+ self.logger.error(ppp("Unexpected or invalid packet:", p))
+ raise
+
def nat64_get_ses_num(self):
"""
Return number of active NAT64 sessions.
interfaces = self.vapi.nat64_interface_dump()
for intf in interfaces:
+ if intf.is_inside > 1:
+ self.vapi.nat64_add_del_interface(intf.sw_if_index,
+ 0,
+ is_add=0)
self.vapi.nat64_add_del_interface(intf.sw_if_index,
intf.is_inside,
is_add=0)