ikev2: fix incorrect api message
[vpp.git] / src / plugins / nat / test / test_nat66.py
1 #!/usr/bin/env python3
2
3 import ipaddress
4 import random
5 import socket
6 import struct
7 import unittest
8 from io import BytesIO
9 from time import sleep
10
11 import scapy.compat
12 from framework import VppTestCase, VppTestRunner, running_extended_tests
13 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
14 from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \
15     IPField, IntField, LongField, XByteField, FlagsField, FieldLenField, \
16     PacketListField
17 from scapy.data import IP_PROTOS
18 from scapy.layers.inet import IP, TCP, UDP, ICMP
19 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
20 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
21 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply, \
22     ICMPv6ND_NS, ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, fragment6
23 from scapy.layers.l2 import Ether, ARP, GRE
24 from scapy.packet import Raw
25 from syslog_rfc5424_parser import SyslogMessage, ParseError
26 from syslog_rfc5424_parser.constants import SyslogSeverity
27 from util import ip4_range
28 from util import ppc, ppp
29 from vpp_acl import AclRule, VppAcl, VppAclInterface
30 from vpp_ip_route import VppIpRoute, VppRoutePath
31 from vpp_neighbor import VppNeighbor
32 from vpp_papi import VppEnum
33
34
class MethodHolder(VppTestCase):
    """ NAT create capture and verify method holder """

    @property
    def config_flags(self):
        """ Return the ``vl_api_nat_config_flags_t`` enum taken from VppEnum """
        nat_config_flags = VppEnum.vl_api_nat_config_flags_t
        return nat_config_flags
40
41
class TestNAT66(MethodHolder):
    """ NAT66 Test Cases """

    @classmethod
    def setUpClass(cls):
        """ Create two pg interfaces and bring them up with IPv6 configured """
        super(TestNAT66, cls).setUpClass()

        # External (translated) address used by the static mappings below.
        cls.nat_addr = 'fd01:ff::2'

        cls.create_pg_interfaces(range(2))
        cls.interfaces = list(cls.pg_interfaces)

        for i in cls.interfaces:
            i.admin_up()
            i.config_ip6()
            i.configure_ipv6_neighbors()

    @classmethod
    def tearDownClass(cls):
        """ Delegate class-level cleanup to the parent class """
        super(TestNAT66, cls).tearDownClass()

    def _nat66_stream(self, src_if, dst_ip6, icmp6_layer):
        """
        Build one packet per tested protocol, as sent by the host on src_if.

        :param src_if: pg interface the packets will be injected on; its
                       remote MAC/IPv6 are used as source, its local MAC
                       as Ethernet destination.
        :param dst_ip6: IPv6 destination address of every packet.
        :param icmp6_layer: ICMPv6 layer instance to use for the ICMPv6
                            packet (echo request or echo reply).
        :returns: list of four packets in the order TCP, UDP, ICMPv6,
                  GRE/IP/TCP.
        """
        payloads = (TCP(), UDP(), icmp6_layer, GRE() / IP() / TCP())
        return [Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
                IPv6(src=src_if.remote_ip6, dst=dst_ip6) /
                payload
                for payload in payloads]

    def _nat66_send(self, src_if, dst_if, pkts):
        """
        Inject pkts on src_if and return what is captured on dst_if.

        :param src_if: pg interface the stream is added to.
        :param dst_if: pg interface the capture is read from.
        :param pkts: list of packets to send; exactly len(pkts) packets
                     are requested from the capture.
        :returns: the capture obtained from dst_if.
        """
        src_if.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return dst_if.get_capture(len(pkts))

    def _nat66_verify(self, packets, src_ip6, dst_ip6, check_checksums=True):
        """
        Assert the IPv6 addresses (and optionally checksums) of packets.

        :param packets: iterable of captured packets to check.
        :param src_ip6: expected IPv6 source address.
        :param dst_ip6: expected IPv6 destination address.
        :param check_checksums: when True also run
                                assert_packet_checksums_valid on each packet.
        :raises: whatever the failing check raised, after the offending
                 packet has been logged.
        """
        for packet in packets:
            try:
                self.assertEqual(packet[IPv6].src, src_ip6)
                self.assertEqual(packet[IPv6].dst, dst_ip6)
                if check_checksums:
                    self.assert_packet_checksums_valid(packet)
            except Exception:
                # Log the packet that failed, then let the failure propagate.
                # Exception (not a bare except) keeps KeyboardInterrupt and
                # SystemExit from being reported as a bad packet.
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise

    def test_static(self):
        """ 1:1 NAT66 test """
        # pg0 is flagged as inside; pg1 is added without any flags.
        flags = self.config_flags.NAT_IS_INSIDE
        self.vapi.nat66_add_del_interface(is_add=1, flags=flags,
                                          sw_if_index=self.pg0.sw_if_index)
        self.vapi.nat66_add_del_interface(is_add=1,
                                          sw_if_index=self.pg1.sw_if_index)
        self.vapi.nat66_add_del_static_mapping(
            local_ip_address=self.pg0.remote_ip6,
            external_ip_address=self.nat_addr,
            is_add=1)

        # in2out: source must be rewritten to the NAT address
        pkts = self._nat66_stream(self.pg0, self.pg1.remote_ip6,
                                  ICMPv6EchoRequest())
        capture = self._nat66_send(self.pg0, self.pg1, pkts)
        self._nat66_verify(capture, self.nat_addr, self.pg1.remote_ip6)

        # out2in: destination must be rewritten back to the local address
        pkts = self._nat66_stream(self.pg1, self.nat_addr,
                                  ICMPv6EchoReply())
        capture = self._nat66_send(self.pg1, self.pg0, pkts)
        self._nat66_verify(capture, self.pg1.remote_ip6, self.pg0.remote_ip6)

        sm = self.vapi.nat66_static_mapping_dump()
        self.assertEqual(len(sm), 1)
        # 4 packets in each direction were sent above
        self.assertEqual(sm[0].total_pkts, 8)

    def test_check_no_translate(self):
        """ NAT66 translate only when egress interface is outside interface """
        # Both interfaces are flagged as inside here.
        flags = self.config_flags.NAT_IS_INSIDE
        self.vapi.nat66_add_del_interface(is_add=1, flags=flags,
                                          sw_if_index=self.pg0.sw_if_index)
        self.vapi.nat66_add_del_interface(is_add=1, flags=flags,
                                          sw_if_index=self.pg1.sw_if_index)
        self.vapi.nat66_add_del_static_mapping(
            local_ip_address=self.pg0.remote_ip6,
            external_ip_address=self.nat_addr,
            is_add=1)

        # in2out: addresses are expected to arrive unchanged
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6) /
             UDP())
        capture = self._nat66_send(self.pg0, self.pg1, [p])
        # Only the first captured packet is inspected, and only its
        # addresses (no checksum validation in this test).
        self._nat66_verify([capture[0]], self.pg0.remote_ip6,
                           self.pg1.remote_ip6, check_checksums=False)

    def clear_nat66(self):
        """
        Clear NAT66 configuration.

        Removes every interface returned by nat66_interface_dump and every
        mapping returned by nat66_static_mapping_dump.
        """
        interfaces = self.vapi.nat66_interface_dump()
        for intf in interfaces:
            self.vapi.nat66_add_del_interface(is_add=0, flags=intf.flags,
                                              sw_if_index=intf.sw_if_index)

        static_mappings = self.vapi.nat66_static_mapping_dump()
        for sm in static_mappings:
            self.vapi.nat66_add_del_static_mapping(
                local_ip_address=sm.local_ip_address,
                external_ip_address=sm.external_ip_address, vrf_id=sm.vrf_id,
                is_add=0)

    def tearDown(self):
        """ Run the parent tearDown, then remove the NAT66 configuration """
        super(TestNAT66, self).tearDown()
        self.clear_nat66()

    def show_commands_at_teardown(self):
        """ Log the NAT66 interface and static mapping CLI output """
        self.logger.info(self.vapi.cli("show nat66 interfaces"))
        self.logger.info(self.vapi.cli("show nat66 static mappings"))
193
194
# Script entry point: when executed directly, run this module's test cases
# through unittest using the VPP test runner.
if __name__ == '__main__':
    unittest.main(
        testRunner=VppTestRunner,
    )