11 from framework import VppTestCase, VppTestRunner
12 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
13 from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \
14 IPField, IntField, LongField, XByteField, FlagsField, FieldLenField, \
16 from scapy.data import IP_PROTOS
17 from scapy.layers.inet import IP, TCP, UDP, ICMP
18 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
19 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
20 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
21 ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, fragment6
22 from scapy.layers.l2 import Ether, ARP, GRE
23 from scapy.packet import Raw
24 from syslog_rfc5424_parser import SyslogMessage, ParseError
25 from syslog_rfc5424_parser.constants import SyslogSeverity
26 from util import ip4_range
27 from util import ppc, ppp
28 from vpp_acl import AclRule, VppAcl, VppAclInterface
29 from vpp_ip_route import VppIpRoute, VppRoutePath
30 from vpp_neighbor import VppNeighbor
31 from vpp_papi import VppEnum
34 class TestNAT66(VppTestCase):
35 """ NAT66 Test Cases """
39 super(TestNAT66, cls).setUpClass()
41 cls.nat_addr = 'fd01:ff::2'
42 cls.create_pg_interfaces(range(2))
43 cls.interfaces = list(cls.pg_interfaces)
45 for i in cls.interfaces:
48 i.configure_ipv6_neighbors()
def config_flags(self):
    """Return the NAT config-flags enum type exposed by ``VppEnum``.

    The tests in this class read members from the result, for example
    ``self.config_flags.NAT_IS_INSIDE``.
    """
    # NOTE(review): callers use this as an attribute rather than calling it,
    # so it is presumably wrapped by ``@property`` on a preceding line --
    # confirm the decorator is present.
    return VppEnum.vl_api_nat_config_flags_t
def plugin_enable(self):
    """Enable the NAT66 plugin via ``nat66_plugin_enable_disable(enable=1)``."""
    self.vapi.nat66_plugin_enable_disable(enable=1)
def plugin_disable(self):
    """Disable the NAT66 plugin via ``nat66_plugin_enable_disable(enable=0)``."""
    self.vapi.nat66_plugin_enable_disable(enable=0)
61 super(TestNAT66, self).setUp()
65 super(TestNAT66, self).tearDown()
69 def test_static(self):
70 """ 1:1 NAT66 test """
# Checks a static (1:1) NAT66 mapping in both directions between pg0 and pg1.
# pg0 is registered with the NAT_IS_INSIDE flag; pg1 is registered without
# any flags (presumably making it the outside interface -- confirm).
71 flags = self.config_flags.NAT_IS_INSIDE
72 self.vapi.nat66_add_del_interface(is_add=1, flags=flags,
73 sw_if_index=self.pg0.sw_if_index)
74 self.vapi.nat66_add_del_interface(is_add=1,
75 sw_if_index=self.pg1.sw_if_index)
# Static mapping: pg0's remote host address is the local address and
# self.nat_addr is the external address.
# NOTE(review): any further arguments of this call are not visible here.
76 self.vapi.nat66_add_del_static_mapping(
77 local_ip_address=self.pg0.remote_ip6,
78 external_ip_address=self.nat_addr,
# pg0 -> pg1 direction: four packets with IPv6 source pg0.remote_ip6 and
# destination pg1.remote_ip6.
# NOTE(review): the upper-layer payload of each packet and the construction
# of the pkts list are not visible here.
83 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
84 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
87 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
88 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
91 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
92 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
95 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
96 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
# Inject the stream on pg0 and request len(pkts) captured packets from pg1.
99 self.pg0.add_stream(pkts)
100 self.pg_enable_capture(self.pg_interfaces)
102 capture = self.pg1.get_capture(len(pkts))
# Every captured packet must have its source rewritten to the NAT address,
# keep its original destination, and carry valid checksums.
104 for packet in capture:
106 self.assertEqual(packet[IPv6].src, self.nat_addr)
107 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
108 self.assert_packet_checksums_valid(packet)
# Logs the offending packet (presumably from an exception handler whose
# framing lines are not visible here).
110 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# pg1 -> pg0 direction: four packets with IPv6 source pg1.remote_ip6 and
# destination self.nat_addr; the last one carries GRE / IP / TCP.
115 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
116 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
119 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
120 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
123 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
124 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
127 p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
128 IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr) /
129 GRE() / IP() / TCP())
# Inject the stream on pg1 and request len(pkts) captured packets from pg0.
131 self.pg1.add_stream(pkts)
132 self.pg_enable_capture(self.pg_interfaces)
134 capture = self.pg0.get_capture(len(pkts))
# The source must stay pg1.remote_ip6 while the destination is translated
# from the NAT address back to pg0.remote_ip6; checksums must be valid.
135 for packet in capture:
137 self.assertEqual(packet[IPv6].src, self.pg1.remote_ip6)
138 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
139 self.assert_packet_checksums_valid(packet)
141 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# The static-mapping dump must contain exactly one entry whose total_pkts
# counter is 8 (presumably the 4 + 4 packets sent above -- confirm).
144 sm = self.vapi.nat66_static_mapping_dump()
145 self.assertEqual(len(sm), 1)
146 self.assertEqual(sm[0].total_pkts, 8)
148 def test_check_no_translate(self):
149 """ NAT66 translate only when egress interface is outside interface """
# Both pg0 and pg1 are registered with the NAT_IS_INSIDE flag, so traffic
# from pg0 to pg1 is expected to pass untranslated.
150 flags = self.config_flags.NAT_IS_INSIDE
151 self.vapi.nat66_add_del_interface(is_add=1, flags=flags,
152 sw_if_index=self.pg0.sw_if_index)
153 self.vapi.nat66_add_del_interface(is_add=1, flags=flags,
154 sw_if_index=self.pg1.sw_if_index)
# Static mapping: pg0's remote host address is the local address and
# self.nat_addr is the external address.
# NOTE(review): any further arguments of this call are not visible here.
155 self.vapi.nat66_add_del_static_mapping(
156 local_ip_address=self.pg0.remote_ip6,
157 external_ip_address=self.nat_addr,
# Single packet with IPv6 source pg0.remote_ip6 and destination
# pg1.remote_ip6.
# NOTE(review): the upper-layer payload is not visible here.
161 p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
162 IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
# Inject the packet on pg0 and request one captured packet from pg1.
164 self.pg0.add_stream([p])
165 self.pg_enable_capture(self.pg_interfaces)
167 capture = self.pg1.get_capture(1)
# Source and destination must both be unchanged, i.e. no NAT was applied.
# NOTE(review): the line that binds `packet` is not visible here.
170 self.assertEqual(packet[IPv6].src, self.pg0.remote_ip6)
171 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
# Logs the offending packet (presumably from an exception handler whose
# framing lines are not visible here).
173 self.logger.error(ppp("Unexpected or invalid packet:", packet))
# Script entry point: run this module's tests through unittest using the
# VppTestRunner imported from the framework module.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)