tests: replace pycodestyle with black
[vpp.git] / test / test_nat66.py
1 #!/usr/bin/env python3
2
3 import ipaddress
4 import random
5 import socket
6 import struct
7 import unittest
8 from io import BytesIO
9
10 import scapy.compat
11 from framework import VppTestCase, VppTestRunner
12 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
13 from scapy.all import (
14     bind_layers,
15     Packet,
16     ByteEnumField,
17     ShortField,
18     IPField,
19     IntField,
20     LongField,
21     XByteField,
22     FlagsField,
23     FieldLenField,
24     PacketListField,
25 )
26 from scapy.data import IP_PROTOS
27 from scapy.layers.inet import IP, TCP, UDP, ICMP
28 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
29 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6, IPv6ExtHdrFragment
30 from scapy.layers.inet6 import (
31     IPv6,
32     ICMPv6EchoRequest,
33     ICMPv6EchoReply,
34     ICMPv6ND_NS,
35     ICMPv6ND_NA,
36     ICMPv6NDOptDstLLAddr,
37     fragment6,
38 )
39 from scapy.layers.l2 import Ether, ARP, GRE
40 from scapy.packet import Raw
41 from syslog_rfc5424_parser import SyslogMessage, ParseError
42 from syslog_rfc5424_parser.constants import SyslogSeverity
43 from util import ip4_range
44 from util import ppc, ppp
45 from vpp_acl import AclRule, VppAcl, VppAclInterface
46 from vpp_ip_route import VppIpRoute, VppRoutePath
47 from vpp_neighbor import VppNeighbor
48 from vpp_papi import VppEnum
49
50
class TestNAT66(VppTestCase):
    """NAT66 Test Cases"""

    @classmethod
    def setUpClass(cls):
        # One-time fixture: create two packet-generator interfaces (pg0, pg1),
        # bring them up and configure IPv6 plus IPv6 neighbors on each.
        super().setUpClass()

        # External address used by the static mappings in the tests below.
        cls.nat_addr = "fd01:ff::2"
        cls.create_pg_interfaces(range(2))
        cls.interfaces = list(cls.pg_interfaces)

        for i in cls.interfaces:
            i.admin_up()
            i.config_ip6()
            i.configure_ipv6_neighbors()

    @property
    def config_flags(self):
        # Shorthand for the NAT config-flags enum exposed through VppEnum.
        return VppEnum.vl_api_nat_config_flags_t

    def plugin_enable(self):
        self.vapi.nat66_plugin_enable_disable(enable=1)

    def plugin_disable(self):
        self.vapi.nat66_plugin_enable_disable(enable=0)

    def setUp(self):
        # The plugin is enabled per test and disabled again in tearDown.
        super().setUp()
        self.plugin_enable()

    def tearDown(self):
        super().tearDown()
        # Only talk to VPP if it is still alive.
        if not self.vpp_dead:
            self.plugin_disable()

    def _nat66_stream(self, pg_if, dst_ip6, icmp6_layer):
        """Build the TCP / UDP / ICMPv6 / GRE packet set sent from pg_if.

        :param pg_if: interface whose MACs and remote IPv6 address are used
            for the Ethernet header and the IPv6 source.
        :param dst_ip6: IPv6 destination address of every packet.
        :param icmp6_layer: ICMPv6 layer class instantiated for the ICMP
            packet (e.g. ICMPv6EchoRequest or ICMPv6EchoReply).
        :returns: list of four packets, in TCP, UDP, ICMPv6, GRE order.
        """

        def headers():
            # Fresh L2/L3 headers for every packet so no layer is shared.
            return Ether(dst=pg_if.local_mac, src=pg_if.remote_mac) / IPv6(
                src=pg_if.remote_ip6, dst=dst_ip6
            )

        return [
            headers() / TCP(),
            headers() / UDP(),
            headers() / icmp6_layer(),
            headers() / GRE() / IP() / TCP(),
        ]

    def _nat66_send_and_capture(self, tx_if, rx_if, pkts):
        """Send pkts on tx_if and return the len(pkts) packets seen on rx_if."""
        tx_if.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return rx_if.get_capture(len(pkts))

    def _nat66_check_addrs(self, capture, src_ip6, dst_ip6, checksums=True):
        """Assert every captured packet has the expected IPv6 src/dst.

        :param capture: iterable of captured packets.
        :param src_ip6: expected IPv6 source address.
        :param dst_ip6: expected IPv6 destination address.
        :param checksums: when True, also validate packet checksums.
        """
        for packet in capture:
            try:
                self.assertEqual(packet[IPv6].src, src_ip6)
                self.assertEqual(packet[IPv6].dst, dst_ip6)
                if checksums:
                    self.assert_packet_checksums_valid(packet)
            except Exception:
                # Dump the offending packet before re-raising the failure.
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise

    def test_static(self):
        """1:1 NAT66 test"""
        # pg0 is added with NAT_IS_INSIDE, pg1 without any flags.
        flags = self.config_flags.NAT_IS_INSIDE
        self.vapi.nat66_add_del_interface(
            is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
        )
        self.vapi.nat66_add_del_interface(is_add=1, sw_if_index=self.pg1.sw_if_index)
        self.vapi.nat66_add_del_static_mapping(
            local_ip_address=self.pg0.remote_ip6,
            external_ip_address=self.nat_addr,
            is_add=1,
        )

        # in2out: source must be rewritten to the external (NAT) address
        in2out = self._nat66_stream(self.pg0, self.pg1.remote_ip6, ICMPv6EchoRequest)
        capture = self._nat66_send_and_capture(self.pg0, self.pg1, in2out)
        self._nat66_check_addrs(capture, self.nat_addr, self.pg1.remote_ip6)

        # out2in: destination must be rewritten back to the local address
        out2in = self._nat66_stream(self.pg1, self.nat_addr, ICMPv6EchoReply)
        capture = self._nat66_send_and_capture(self.pg1, self.pg0, out2in)
        self._nat66_check_addrs(capture, self.pg1.remote_ip6, self.pg0.remote_ip6)

        # The single mapping must have counted every packet of both streams.
        sm = self.vapi.nat66_static_mapping_dump()
        self.assertEqual(len(sm), 1)
        self.assertEqual(sm[0].total_pkts, len(in2out) + len(out2in))

    def test_check_no_translate(self):
        """NAT66 translate only when egress interface is outside interface"""
        # Both interfaces are added with NAT_IS_INSIDE, so traffic from pg0
        # to pg1 is expected to leave with its addresses unchanged.
        flags = self.config_flags.NAT_IS_INSIDE
        self.vapi.nat66_add_del_interface(
            is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
        )
        self.vapi.nat66_add_del_interface(
            is_add=1, flags=flags, sw_if_index=self.pg1.sw_if_index
        )
        self.vapi.nat66_add_del_static_mapping(
            local_ip_address=self.pg0.remote_ip6,
            external_ip_address=self.nat_addr,
            is_add=1,
        )

        # in2out
        p = (
            Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
            / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6)
            / UDP()
        )
        capture = self._nat66_send_and_capture(self.pg0, self.pg1, [p])
        # Addresses only; this test does not validate checksums.
        self._nat66_check_addrs(
            capture, self.pg0.remote_ip6, self.pg1.remote_ip6, checksums=False
        )
218
219
# When executed directly, run the tests in this module through unittest
# using the VppTestRunner imported from the framework module.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)