12 from framework import VppTestCase, VppTestRunner, running_extended_tests
13 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
14 from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \
15 IPField, IntField, LongField, XByteField, FlagsField, FieldLenField, \
17 from scapy.data import IP_PROTOS
18 from scapy.layers.inet import IP, TCP, UDP, ICMP
19 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
20 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
21 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
22 ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, fragment6
23 from scapy.layers.l2 import Ether, ARP, GRE
24 from scapy.packet import Raw
25 from syslog_rfc5424_parser import SyslogMessage, ParseError
26 from syslog_rfc5424_parser.constants import SyslogSeverity
27 from util import ip4_range
28 from util import ppc, ppp
29 from vpp_acl import AclRule, VppAcl, VppAclInterface
30 from vpp_ip_route import VppIpRoute, VppRoutePath
31 from vpp_neighbor import VppNeighbor
32 from vpp_papi import VppEnum
class MethodHolder(VppTestCase):
    """ NAT create capture and verify method holder """

    @property
    def config_flags(self):
        """Return the VPP API enum holding the NAT configuration flags.

        This is a property because the test cases in this file read it as
        an attribute (``self.config_flags.NAT_IS_INSIDE``); as a plain
        method that access would fail with AttributeError.
        """
        return VppEnum.vl_api_nat_config_flags_t
class TestNAT66(MethodHolder):
    """ NAT66 Test Cases """

    # NOTE(review): this class was rebuilt from a listing with gaps (method
    # headers, trailing call arguments, packet payload layers and the
    # try/except framing were not visible).  Every reconstructed piece is
    # tagged "NOTE(review)" below and must be confirmed against the
    # authoritative file.  The class and test-method docstrings are kept
    # verbatim because unittest reports a test by its docstring.

    @classmethod
    def setUpClass(cls):
        """Create two packet-generator interfaces and set them up for IPv6."""
        super(TestNAT66, cls).setUpClass()

        # Used by the tests as the external address of the static mapping.
        cls.nat_addr = 'fd01:ff::2'

        cls.create_pg_interfaces(range(2))
        cls.interfaces = list(cls.pg_interfaces)

        for i in cls.interfaces:
            # NOTE(review): admin_up()/config_ip6() were not visible in the
            # reviewed listing (two lines are missing here) -- confirm.
            i.admin_up()
            i.config_ip6()
            i.configure_ipv6_neighbors()

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the base class."""
        super(TestNAT66, cls).tearDownClass()

    @staticmethod
    def _build_ipv6_packets(tx_if, dst_ip6, payloads):
        """Build one Ether/IPv6 frame per payload, sourced from *tx_if*'s
        remote host and addressed to *dst_ip6*."""
        return [Ether(dst=tx_if.local_mac, src=tx_if.remote_mac) /
                IPv6(src=tx_if.remote_ip6, dst=dst_ip6) /
                payload
                for payload in payloads]

    def _send_and_capture(self, tx_if, rx_if, pkts):
        """Queue *pkts* on *tx_if* and return what *rx_if* captures."""
        tx_if.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        # NOTE(review): pg_start() was not visible in the reviewed listing
        # (one line is missing between enabling and reading the capture)
        # -- confirm.
        self.pg_start()
        return rx_if.get_capture(len(pkts))

    def _verify_ipv6_capture(self, capture, src, dst, verify_checksums=True):
        """Assert every captured packet has IPv6 *src*/*dst*; log the
        offending packet before re-raising on failure."""
        for packet in capture:
            # NOTE(review): the try/except/raise framing was not visible in
            # the reviewed listing; without it the error below would be
            # logged for every packet -- confirm.
            try:
                self.assertEqual(packet[IPv6].src, src)
                self.assertEqual(packet[IPv6].dst, dst)
                if verify_checksums:
                    self.assert_packet_checksums_valid(packet)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise

    def test_static(self):
        """ 1:1 NAT66 test """
        flags = self.config_flags.NAT_IS_INSIDE
        # pg0 is registered with the inside flag, pg1 without it.
        self.vapi.nat66_add_del_interface(is_add=1, flags=flags,
                                          sw_if_index=self.pg0.sw_if_index)
        self.vapi.nat66_add_del_interface(is_add=1,
                                          sw_if_index=self.pg1.sw_if_index)
        # NOTE(review): the trailing is_add=1 argument was not visible in
        # the reviewed listing -- confirm.
        self.vapi.nat66_add_del_static_mapping(
            local_ip_address=self.pg0.remote_ip6,
            external_ip_address=self.nat_addr,
            is_add=1)

        # in2out: four packets from the pg0 host towards the pg1 host; the
        # source address must come out rewritten to the NAT address.
        # NOTE(review): the payload layers of these four packets were not
        # visible in the reviewed listing; they mirror the out2in set
        # below -- confirm.
        in2out = self._build_ipv6_packets(
            self.pg0, self.pg1.remote_ip6,
            [TCP(), UDP(), ICMPv6EchoRequest(), GRE() / IP() / TCP()])
        capture = self._send_and_capture(self.pg0, self.pg1, in2out)
        self._verify_ipv6_capture(capture,
                                  self.nat_addr, self.pg1.remote_ip6)

        # out2in: four packets from the pg1 host towards the NAT address;
        # the destination must come out rewritten to the pg0 host.
        # NOTE(review): only the GRE()/IP()/TCP() payload was visible in the
        # reviewed listing; the first three are reconstructed -- confirm.
        out2in = self._build_ipv6_packets(
            self.pg1, self.nat_addr,
            [TCP(), UDP(), ICMPv6EchoReply(), GRE() / IP() / TCP()])
        capture = self._send_and_capture(self.pg1, self.pg0, out2in)
        self._verify_ipv6_capture(capture,
                                  self.pg1.remote_ip6, self.pg0.remote_ip6)

        sm = self.vapi.nat66_static_mapping_dump()
        self.assertEqual(len(sm), 1)
        # Both directions count against the single mapping (4 + 4 = 8).
        self.assertEqual(sm[0].total_pkts, len(in2out) + len(out2in))

    def test_check_no_translate(self):
        """ NAT66 translate only when egress interface is outside interface """
        flags = self.config_flags.NAT_IS_INSIDE
        # Both interfaces carry the inside flag here, so the packet must
        # leave pg1 with its original source address.
        self.vapi.nat66_add_del_interface(is_add=1, flags=flags,
                                          sw_if_index=self.pg0.sw_if_index)
        self.vapi.nat66_add_del_interface(is_add=1, flags=flags,
                                          sw_if_index=self.pg1.sw_if_index)
        # NOTE(review): the trailing is_add=1 argument was not visible in
        # the reviewed listing -- confirm.
        self.vapi.nat66_add_del_static_mapping(
            local_ip_address=self.pg0.remote_ip6,
            external_ip_address=self.nat_addr,
            is_add=1)

        # in2out
        # NOTE(review): the payload layer of this packet was not visible in
        # the reviewed listing; UDP() is a reconstruction -- confirm.
        pkts = self._build_ipv6_packets(self.pg0, self.pg1.remote_ip6,
                                        [UDP()])
        capture = self._send_and_capture(self.pg0, self.pg1, pkts)
        # Only the addresses are checked in this test, not the checksums.
        self._verify_ipv6_capture(capture,
                                  self.pg0.remote_ip6, self.pg1.remote_ip6,
                                  verify_checksums=False)

    def clear_nat66(self):
        """
        Clear NAT66 configuration.

        Removes every interface and static mapping that the NAT66 dump
        calls currently report.
        """
        interfaces = self.vapi.nat66_interface_dump()
        for intf in interfaces:
            self.vapi.nat66_add_del_interface(is_add=0, flags=intf.flags,
                                              sw_if_index=intf.sw_if_index)

        static_mappings = self.vapi.nat66_static_mapping_dump()
        for sm in static_mappings:
            # NOTE(review): the trailing is_add=0 argument was not visible
            # in the reviewed listing -- confirm.
            self.vapi.nat66_add_del_static_mapping(
                local_ip_address=sm.local_ip_address,
                external_ip_address=sm.external_ip_address, vrf_id=sm.vrf_id,
                is_add=0)

    def tearDown(self):
        """Run the base teardown, then drop the NAT66 state of the test."""
        super(TestNAT66, self).tearDown()
        # NOTE(review): the tearDown header and this clear_nat66() call were
        # not visible in the reviewed listing -- confirm.
        self.clear_nat66()

    def show_commands_at_teardown(self):
        """Log the NAT66 interface and static-mapping CLI output."""
        self.logger.info(self.vapi.cli("show nat66 interfaces"))
        self.logger.info(self.vapi.cli("show nat66 static mappings"))
# Script entry point: hand this module's test cases to unittest, using the
# VPP test runner imported at the top of the file.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)