6RD: Rewritten 6RD RFC5969 support.
[vpp.git] / test / test_sixrd.py
1 #
2 # 6RD RFC5969 functional tests
3 #
4
5 import unittest
6 from scapy.layers.inet import IP, UDP, ICMP
7 from scapy.layers.inet6 import IPv6
8 from scapy.layers.l2 import Ether, GRE
9 from scapy.packet import Raw
10
11 from framework import VppTestCase
12 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
13 from util import ppp
14 from ipaddress import *
15
16 """ Test6rd is a subclass of  VPPTestCase classes.
17
18 6RD tests.
19
20 """
21
22
23 class Test6RD(VppTestCase):
24     """ 6RD Test Case """
25
26     @classmethod
27     def setUpClass(cls):
28         super(Test6RD, cls).setUpClass()
29         try:
30             cls.create_pg_interfaces(range(2))
31             cls.interfaces = list(cls.pg_interfaces)
32         except Exception:
33             super(Test6RD, cls).tearDownClass()
34             raise
35
36     def setUp(cls):
37         super(Test6RD, cls).setUp()
38         try:
39             for i in cls.interfaces:
40                 i.admin_up()
41                 i.config_ip4()
42                 i.config_ip6()
43                 i.disable_ipv6_ra()
44                 i.resolve_arp()
45                 i.resolve_ndp()
46         except Exception:
47             super(Test6RD, cls).tearDown()
48             raise
49
50     def tearDown(self):
51         super(Test6RD, self).tearDown()
52         if not self.vpp_dead:
53             self.vapi.cli("show hardware")
54         for i in self.pg_interfaces:
55             i.unconfig_ip4()
56             i.unconfig_ip6()
57             i.admin_down()
58         if type(self.tunnel_index) is list:
59             for sw_if_index in self.tunnel_index:
60                 self.vapi.sixrd_del_tunnel(sw_if_index)
61         else:
62             self.vapi.sixrd_del_tunnel(self.tunnel_index)
63         self.vapi.cli("show error")
64
65     def validate_6in4(self, rx, expected):
66         if IP not in rx:
67             self.fail()
68         if IPv6 not in rx:
69             self.fail()
70
71         self.assertEqual(rx[IP].src, expected[IP].src)
72         self.assertEqual(rx[IP].dst, expected[IP].dst)
73         self.assertEqual(rx[IP].proto, expected[IP].proto)
74         self.assertEqual(rx[IPv6].src, expected[IPv6].src)
75         self.assertEqual(rx[IPv6].dst, expected[IPv6].dst)
76
77     def validate_4in6(self, rx, expected):
78         if IPv6 not in rx:
79             self.fail()
80         if IP in rx:
81             self.fail()
82
83         self.assertTrue(rx[IPv6].src == expected[IPv6].src)
84         self.assertTrue(rx[IPv6].dst == expected[IPv6].dst)
85         self.assertTrue(rx[IPv6].nh == expected[IPv6].nh)
86
87     def payload(self, len):
88         return 'x' * len
89
90     def test_6rd_ip6_to_ip4(self):
91         """ ip6 -> ip4 (encap) 6rd test """
92         p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
93         p_ip6 = IPv6(src="1::1", dst="2002:AC10:0202::1", nh='UDP')
94
95         rv = self.vapi.sixrd_add_tunnel(
96             0, str(ip_address('2002::').packed), 16,
97             str(ip_address('0.0.0.0').packed), 0,
98             str(ip_address(self.pg0.local_ip4).packed), 0, True)
99
100         self.assertEqual(rv.retval, 0)
101         self.tunnel_index = rv.sw_if_index
102         self.vapi.cli("show ip6 fib")
103         p_payload = UDP(sport=1234, dport=1234)
104         p = (p_ether / p_ip6 / p_payload)
105
106         p_reply = (IP(src=self.pg0.local_ip4, dst=self.pg1.remote_ip4,
107                       proto='ipv6') / p_ip6)
108
109         rx = self.send_and_expect(self.pg0, p*10, self.pg1)
110         for p in rx:
111             self.validate_6in4(p, p_reply)
112
113         # MTU tests (default is 1480)
114         plen = 1481 - 40 - 8
115         p_ip6 = IPv6(src="1::1", dst="2002:AC10:0202::1")
116         p_payload = UDP(sport=1234, dport=1234) / Raw(self.payload(plen))
117         p = (p_ether / p_ip6 / p_payload)
118
119         p_reply = (IP(src=self.pg0.local_ip4, dst=self.pg1.remote_ip4,
120                       proto='ipv6') / p_ip6)
121
122         rx = self.send_and_assert_no_replies(self.pg0, p*10)
123
124     def test_6rd_ip4_to_ip6(self):
125         """ ip4 -> ip6 (decap) 6rd test """
126
127         rv = self.vapi.sixrd_add_tunnel(
128             0, str(ip_address('2002::').packed),
129             16, str(ip_address('0.0.0.0').packed),
130             0, str(ip_address(self.pg0.local_ip4).packed), 0, True)
131         self.assertEqual(rv.retval, 0)
132         self.tunnel_index = rv.sw_if_index
133         self.vapi.cli("show ip6 fib")
134         p_ip6 = (IPv6(src="2002:AC10:0202::1", dst=self.pg1.remote_ip6) /
135                  UDP(sport=1234, dport=1234))
136
137         p = (Ether(src=self.pg0.remote_mac,
138                    dst=self.pg0.local_mac) /
139              IP(src=self.pg1.remote_ip4, dst=self.pg0.local_ip4) /
140              p_ip6)
141
142         p_reply = p_ip6
143
144         rx = self.send_and_expect(self.pg0, p*10, self.pg1)
145         for p in rx:
146             self.validate_4in6(p, p_reply)
147
148     def test_6rd_ip4_to_ip6_multiple(self):
149         """ ip4 -> ip6 (decap) 6rd test """
150
151         self.tunnel_index = []
152         rv = self.vapi.sixrd_add_tunnel(
153             0, str(ip_address('2002::').packed),
154             16, str(ip_address('0.0.0.0').packed),
155             0, str(ip_address(self.pg0.local_ip4).packed), 0, True)
156         self.assertEqual(rv.retval, 0)
157         self.tunnel_index.append(rv.sw_if_index)
158         rv = self.vapi.sixrd_add_tunnel(
159             0, str(ip_address('2003::').packed),
160             16, str(ip_address('0.0.0.0').packed),
161             0, str(ip_address(self.pg1.local_ip4).packed), 0, True)
162         self.assertEqual(rv.retval, 0)
163
164         self.tunnel_index.append(rv.sw_if_index)
165
166         self.vapi.cli("show ip6 fib")
167         p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
168         p_ip4 = IP(src=self.pg1.remote_ip4, dst=self.pg0.local_ip4)
169         p_ip6_1 = (IPv6(src="2002:AC10:0202::1", dst=self.pg1.remote_ip6) /
170                    UDP(sport=1234, dport=1234))
171         p_ip6_2 = (IPv6(src="2003:AC10:0202::1", dst=self.pg1.remote_ip6) /
172                    UDP(sport=1234, dport=1234))
173
174         p = (p_ether / p_ip4 / p_ip6_1)
175         rx = self.send_and_expect(self.pg0, p*10, self.pg1)
176         for p in rx:
177             self.validate_4in6(p, p_ip6_1)
178
179         p = (p_ether / p_ip4 / p_ip6_2)
180         rx = self.send_and_expect(self.pg0, p*10, self.pg1)
181         for p in rx:
182             self.validate_4in6(p, p_ip6_2)
183
184     def test_6rd_ip4_to_ip6_suffix(self):
185         """ ip4 -> ip6 (decap) 6rd test """
186
187         rv = self.vapi.sixrd_add_tunnel(
188             0, str(ip_address('2002::').packed), 16,
189             str(ip_address('172.0.0.0').packed), 8,
190             str(ip_address(self.pg0.local_ip4).packed), 0, True)
191         self.assertEqual(rv.retval, 0)
192
193         self.tunnel_index = rv.sw_if_index
194
195         self.vapi.cli("show ip6 fib")
196         p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
197         p_ip4 = IP(src=self.pg1.remote_ip4, dst=self.pg0.local_ip4)
198         p_ip6 = (IPv6(src="2002:1002:0200::1", dst=self.pg1.remote_ip6) /
199                  UDP(sport=1234, dport=1234))
200
201         p = (p_ether / p_ip4 / p_ip6)
202         rx = self.send_and_expect(self.pg0, p*10, self.pg1)
203         for p in rx:
204             self.validate_4in6(p, p_ip6)
205
206     def test_6rd_ip4_to_ip6_sec_check(self):
207         """ ip4 -> ip6 (decap) security check 6rd test """
208
209         rv = self.vapi.sixrd_add_tunnel(
210             0, str(ip_address('2002::').packed),
211             16, str(ip_address('0.0.0.0').packed),
212             0, str(ip_address(self.pg0.local_ip4).packed), 0, True)
213         self.assertEqual(rv.retval, 0)
214         self.tunnel_index = rv.sw_if_index
215         self.vapi.cli("show ip6 fib")
216         p_ip6 = (IPv6(src="2002:AC10:0202::1", dst=self.pg1.remote_ip6) /
217                  UDP(sport=1234, dport=1234))
218         p_ip6_fail = (IPv6(src="2002:DEAD:0202::1", dst=self.pg1.remote_ip6) /
219                       UDP(sport=1234, dport=1234))
220
221         p = (Ether(src=self.pg0.remote_mac,
222                    dst=self.pg0.local_mac) /
223              IP(src=self.pg1.remote_ip4, dst=self.pg0.local_ip4) /
224              p_ip6)
225
226         p_reply = p_ip6
227
228         rx = self.send_and_expect(self.pg0, p*10, self.pg1)
229         for p in rx:
230             self.validate_4in6(p, p_reply)
231
232         p = (Ether(src=self.pg0.remote_mac,
233                    dst=self.pg0.local_mac) /
234              IP(src=self.pg1.remote_ip4, dst=self.pg0.local_ip4) /
235              p_ip6_fail)
236         rx = self.send_and_assert_no_replies(self.pg0, p*10)
237
238     def test_6rd_bgp_tunnel(self):
239         """ 6rd BGP tunnel """
240
241         rv = self.vapi.sixrd_add_tunnel(
242             0, str(ip_address('2002::').packed),
243             16, str(ip_address('0.0.0.0').packed),
244             0, str(ip_address(self.pg0.local_ip4).packed), 0, False)
245         self.assertEqual(rv.retval, 0)
246         self.tunnel_index = rv.sw_if_index
247
248         default_route = VppIpRoute(
249             self, "DEAD::", 16, [VppRoutePath("2002:0808:0808::",
250                                               self.tunnel_index,
251                                               proto=DpoProto.DPO_PROTO_IP6)],
252             is_ip6=1)
253         default_route.add_vpp_config()
254
255         ip4_route = VppIpRoute(self, "8.0.0.0", 8,
256                                [VppRoutePath(self.pg1.remote_ip4, 0xFFFFFFFF)])
257         ip4_route.add_vpp_config()
258
259         # Via recursive route 6 -> 4
260         p = (Ether(src=self.pg0.remote_mac,
261                    dst=self.pg0.local_mac) /
262              IPv6(src="1::1", dst="DEAD:BEEF::1") /
263              UDP(sport=1234, dport=1234))
264
265         p_reply = (IP(src=self.pg0.local_ip4, dst="8.8.8.8",
266                       proto='ipv6') /
267                    IPv6(src='1::1', dst='DEAD:BEEF::1', nh='UDP'))
268
269         rx = self.send_and_expect(self.pg0, p*10, self.pg1)
270         for p in rx:
271             self.validate_6in4(p, p_reply)
272
273         # Via recursive route 4 -> 6 (Security check must be disabled)
274         p_ip6 = (IPv6(src="DEAD:BEEF::1", dst=self.pg1.remote_ip6) /
275                  UDP(sport=1234, dport=1234))
276         p = (Ether(src=self.pg0.remote_mac,
277                    dst=self.pg0.local_mac) /
278              IP(src="8.8.8.8", dst=self.pg0.local_ip4) /
279              p_ip6)
280
281         p_reply = p_ip6
282
283         rx = self.send_and_expect(self.pg0, p*10, self.pg1)
284         for p in rx:
285             self.validate_4in6(p, p_reply)
286
287
288 if __name__ == '__main__':
289     unittest.main(testRunner=VppTestRunner)