tests: Use errno value rather than a specific int
[vpp.git] / test / test_nat66.py
1 #!/usr/bin/env python3
2
3 import unittest
4
5 from framework import VppTestCase
6 from asfframework import VppTestRunner
7 from scapy.layers.inet import IP, TCP, UDP
8 from scapy.layers.inet6 import (
9     IPv6,
10     ICMPv6EchoRequest,
11     ICMPv6EchoReply,
12 )
13 from scapy.layers.l2 import Ether, GRE
14 from util import ppp
15 from vpp_papi import VppEnum
16
17
18 class TestNAT66(VppTestCase):
19     """NAT66 Test Cases"""
20
21     @classmethod
22     def setUpClass(cls):
23         super(TestNAT66, cls).setUpClass()
24
25         cls.nat_addr = "fd01:ff::2"
26         cls.create_pg_interfaces(range(2))
27         cls.interfaces = list(cls.pg_interfaces)
28
29         for i in cls.interfaces:
30             i.admin_up()
31             i.config_ip6()
32             i.configure_ipv6_neighbors()
33
34     @property
35     def config_flags(self):
36         return VppEnum.vl_api_nat_config_flags_t
37
38     def plugin_enable(self):
39         self.vapi.nat66_plugin_enable_disable(enable=1)
40
41     def plugin_disable(self):
42         self.vapi.nat66_plugin_enable_disable(enable=0)
43
44     def setUp(self):
45         super(TestNAT66, self).setUp()
46         self.plugin_enable()
47
48     def tearDown(self):
49         super(TestNAT66, self).tearDown()
50         if not self.vpp_dead:
51             self.plugin_disable()
52
53     def test_static(self):
54         """1:1 NAT66 test"""
55         flags = self.config_flags.NAT_IS_INSIDE
56         self.vapi.nat66_add_del_interface(
57             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
58         )
59         self.vapi.nat66_add_del_interface(is_add=1, sw_if_index=self.pg1.sw_if_index)
60         self.vapi.nat66_add_del_static_mapping(
61             local_ip_address=self.pg0.remote_ip6,
62             external_ip_address=self.nat_addr,
63             is_add=1,
64         )
65
66         # in2out
67         pkts = []
68         p = (
69             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
70             / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6)
71             / TCP()
72         )
73         pkts.append(p)
74         p = (
75             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
76             / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6)
77             / UDP()
78         )
79         pkts.append(p)
80         p = (
81             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
82             / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6)
83             / ICMPv6EchoRequest()
84         )
85         pkts.append(p)
86         p = (
87             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
88             / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6)
89             / GRE()
90             / IP()
91             / TCP()
92         )
93         pkts.append(p)
94         self.pg0.add_stream(pkts)
95         self.pg_enable_capture(self.pg_interfaces)
96         self.pg_start()
97         capture = self.pg1.get_capture(len(pkts))
98
99         for packet in capture:
100             try:
101                 self.assertEqual(packet[IPv6].src, self.nat_addr)
102                 self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
103                 self.assert_packet_checksums_valid(packet)
104             except:
105                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
106                 raise
107
108         # out2in
109         pkts = []
110         p = (
111             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
112             / IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr)
113             / TCP()
114         )
115         pkts.append(p)
116         p = (
117             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
118             / IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr)
119             / UDP()
120         )
121         pkts.append(p)
122         p = (
123             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
124             / IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr)
125             / ICMPv6EchoReply()
126         )
127         pkts.append(p)
128         p = (
129             Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
130             / IPv6(src=self.pg1.remote_ip6, dst=self.nat_addr)
131             / GRE()
132             / IP()
133             / TCP()
134         )
135         pkts.append(p)
136         self.pg1.add_stream(pkts)
137         self.pg_enable_capture(self.pg_interfaces)
138         self.pg_start()
139         capture = self.pg0.get_capture(len(pkts))
140         for packet in capture:
141             try:
142                 self.assertEqual(packet[IPv6].src, self.pg1.remote_ip6)
143                 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
144                 self.assert_packet_checksums_valid(packet)
145             except:
146                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
147                 raise
148
149         sm = self.vapi.nat66_static_mapping_dump()
150         self.assertEqual(len(sm), 1)
151         self.assertEqual(sm[0].total_pkts, 8)
152
153     def test_check_no_translate(self):
154         """NAT66 translate only when egress interface is outside interface"""
155         flags = self.config_flags.NAT_IS_INSIDE
156         self.vapi.nat66_add_del_interface(
157             is_add=1, flags=flags, sw_if_index=self.pg0.sw_if_index
158         )
159         self.vapi.nat66_add_del_interface(
160             is_add=1, flags=flags, sw_if_index=self.pg1.sw_if_index
161         )
162         self.vapi.nat66_add_del_static_mapping(
163             local_ip_address=self.pg0.remote_ip6,
164             external_ip_address=self.nat_addr,
165             is_add=1,
166         )
167
168         # in2out
169         p = (
170             Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac)
171             / IPv6(src=self.pg0.remote_ip6, dst=self.pg1.remote_ip6)
172             / UDP()
173         )
174         self.pg0.add_stream([p])
175         self.pg_enable_capture(self.pg_interfaces)
176         self.pg_start()
177         capture = self.pg1.get_capture(1)
178         packet = capture[0]
179         try:
180             self.assertEqual(packet[IPv6].src, self.pg0.remote_ip6)
181             self.assertEqual(packet[IPv6].dst, self.pg1.remote_ip6)
182         except:
183             self.logger.error(ppp("Unexpected or invalid packet:", packet))
184             raise
185
186
187 if __name__ == "__main__":
188     unittest.main(testRunner=VppTestRunner)