IPIP: Add IP{v4,v6} over IP{v4,v6} configured tunnel support.
[vpp.git] / test / test_ipip.py
1 #
2 # IP{4,6} over IP{v,6} tunnel functional tests
3 #
4
5 import unittest
6 from scapy.layers.inet import IP, UDP, ICMP
7 from scapy.layers.inet6 import IPv6
8 from scapy.layers.l2 import Ether, GRE
9 from scapy.packet import Raw
10
11 from framework import VppTestCase
12 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
13 from util import ppp
14 from ipaddress import *
15
16 """ Testipip is a subclass of  VPPTestCase classes.
17
18 IPIP tests.
19
20 """
21
22
23 class TestIPIP(VppTestCase):
24     """ IPIP Test Case """
25
26     @classmethod
27     def setUpClass(cls):
28         super(TestIPIP, cls).setUpClass()
29         try:
30             cls.create_pg_interfaces(range(2))
31             cls.interfaces = list(cls.pg_interfaces)
32         except Exception:
33             super(TestIPIP, cls).tearDownClass()
34             raise
35
36     def setUp(cls):
37         super(TestIPIP, cls).setUp()
38         try:
39             for i in cls.interfaces:
40                 i.admin_up()
41                 i.config_ip4()
42                 i.config_ip6()
43                 i.disable_ipv6_ra()
44                 i.resolve_arp()
45                 i.resolve_ndp()
46         except Exception:
47             super(TestIPIP, cls).tearDown()
48             raise
49
50     def tearDown(self):
51         super(TestIPIP, self).tearDown()
52         if not self.vpp_dead:
53             self.vapi.cli("show hardware")
54         for i in self.pg_interfaces:
55             i.unconfig_ip4()
56             i.unconfig_ip6()
57             i.admin_down()
58         self.vapi.cli("show error")
59
60     def validate(self, rx, expected):
61         expected = expected.__class__(str(expected))
62         if rx != expected:
63             print('RX packet:')
64             print(rx.show())
65             print('EXPECTED packet:')
66             print(expected.show())
67             self.assertDictEqual(rx, expected)
68
69     def payload(self, len):
70         return 'x' * len
71
72     def test_ipip4(self):
73         """ ip{v4,v6} over ip4 test """
74         p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
75         p_ip6 = IPv6(src="1::1", dst="DEAD::1", nh='UDP')
76         p_ip4 = IP(src="1.2.3.4", dst="130.67.0.1")
77         p_payload = UDP(sport=1234, dport=1234)
78
79         # IPv4 transport
80         rv = self.vapi.ipip_add_tunnel(
81             src_address=str(ip_address(self.pg0.local_ip4).packed),
82             dst_address=str(ip_address(self.pg1.remote_ip4).packed),
83             is_ipv6=0)
84         self.assertEqual(rv.retval, 0)
85         sw_if_index = rv.sw_if_index
86
87         # Set interface up and enable IP on it
88         rv = self.vapi.sw_interface_set_flags(sw_if_index, 1)
89         self.assertEqual(rv.retval, 0)
90         rv = self.vapi.sw_interface_set_unnumbered(
91             ip_sw_if_index=self.pg0.sw_if_index,
92             sw_if_index=sw_if_index)
93         self.assertEqual(rv.retval, 0)
94
95         # Add IPv4 and IPv6 routes via tunnel interface
96         ip4_via_tunnel = VppIpRoute(
97             self, "130.67.0.0", 16,
98             [VppRoutePath("0.0.0.0",
99                           sw_if_index,
100                           proto=DpoProto.DPO_PROTO_IP4)], is_ip6=0)
101         ip4_via_tunnel.add_vpp_config()
102
103         ip6_via_tunnel = VppIpRoute(
104             self, "dead::", 16,
105             [VppRoutePath("::",
106                           sw_if_index,
107                           proto=DpoProto.DPO_PROTO_IP6)], is_ip6=1)
108         ip6_via_tunnel.add_vpp_config()
109
110         # IPv6 in to IPv4 tunnel
111         p6 = (p_ether / p_ip6 / p_payload)
112         p_inner_ip6 = p_ip6
113         p_inner_ip6.hlim -= 1
114         p6_reply = (IP(src=self.pg0.local_ip4, dst=self.pg1.remote_ip4,
115                        proto='ipv6', id=0) / p_inner_ip6 / p_payload)
116         p6_reply.ttl -= 1
117         rx = self.send_and_expect(self.pg0, p6*10, self.pg1)
118         for p in rx:
119             self.validate(p[1], p6_reply)
120
121         # IPv4 in to IPv4 tunnel
122         p4 = (p_ether / p_ip4 / p_payload)
123         p_ip4_inner = p_ip4
124         p_ip4_inner.ttl -= 1
125         p4_reply = (IP(src=self.pg0.local_ip4,
126                        dst=self.pg1.remote_ip4) / p_ip4_inner / p_payload)
127         p4_reply.ttl -= 1
128         p4_reply.id = 0
129         rx = self.send_and_expect(self.pg0, p4*10, self.pg1)
130         for p in rx:
131             self.validate(p[1], p4_reply)
132
133         # Decapsulation
134         p_ether = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
135
136         # IPv4 tunnel to IPv4
137         p_ip4 = IP(src="1.2.3.4", dst=self.pg0.remote_ip4)
138         p4 = (p_ether / IP(src=self.pg1.remote_ip4,
139                            dst=self.pg0.local_ip4) / p_ip4 / p_payload)
140         p4_reply = (p_ip4 / p_payload)
141         p4_reply.ttl -= 1
142         rx = self.send_and_expect(self.pg1, p4*10, self.pg0)
143         for p in rx:
144             self.validate(p[1], p4_reply)
145
146         # IPv4 tunnel to IPv6
147         p_ip6 = IPv6(src="1:2:3::4", dst=self.pg0.remote_ip6)
148         p6 = (p_ether / IP(src=self.pg1.remote_ip4,
149                            dst=self.pg0.local_ip4) / p_ip6 / p_payload)
150         p6_reply = (p_ip6 / p_payload)
151         p6_reply.hlim = 63
152         rx = self.send_and_expect(self.pg1, p6*10, self.pg0)
153         for p in rx:
154             self.validate(p[1], p6_reply)
155
156     def test_ipip6(self):
157         """ ip{v4,v6} over ip6 test """
158         p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
159         p_ip6 = IPv6(src="1::1", dst="DEAD::1", nh='UDP')
160         p_ip4 = IP(src="1.2.3.4", dst="130.67.0.1")
161         p_payload = UDP(sport=1234, dport=1234)
162
163         # IPv6 transport
164         rv = self.vapi.ipip_add_tunnel(
165             src_address=str(ip_address(self.pg0.local_ip6).packed),
166             dst_address=str(ip_address(self.pg1.remote_ip6).packed))
167         self.assertEqual(rv.retval, 0)
168
169         sw_if_index = rv.sw_if_index
170
171         rv = self.vapi.sw_interface_set_flags(sw_if_index, 1)
172         self.assertEqual(rv.retval, 0)
173         rv = self.vapi.sw_interface_set_unnumbered(
174             ip_sw_if_index=self.pg0.sw_if_index, sw_if_index=sw_if_index)
175         self.assertEqual(rv.retval, 0)
176
177         # Add IPv4 and IPv6 routes via tunnel interface
178         ip4_via_tunnel = VppIpRoute(
179             self, "130.67.0.0", 16,
180             [VppRoutePath("0.0.0.0",
181                           sw_if_index,
182                           proto=DpoProto.DPO_PROTO_IP4)], is_ip6=0)
183         ip4_via_tunnel.add_vpp_config()
184
185         ip6_via_tunnel = VppIpRoute(
186             self, "dead::", 16,
187             [VppRoutePath("::",
188                           sw_if_index,
189                           proto=DpoProto.DPO_PROTO_IP6)], is_ip6=1)
190         ip6_via_tunnel.add_vpp_config()
191
192         # Encapsulation
193
194         # IPv6 in to IPv6 tunnel
195         p6 = (p_ether / p_ip6 / p_payload)
196         p6_reply = (IPv6(src=self.pg0.local_ip6,
197                          dst=self.pg1.remote_ip6, hlim=63) / p_ip6 / p_payload)
198         p6_reply[1].hlim -= 1
199         rx = self.send_and_expect(self.pg0, p6*10, self.pg1)
200         for p in rx:
201             self.validate(p[1], p6_reply)
202
203         # IPv4 in to IPv6 tunnel
204         p4 = (p_ether / p_ip4 / p_payload)
205         p4_reply = (IPv6(src=self.pg0.local_ip6,
206                          dst=self.pg1.remote_ip6, hlim=63) / p_ip4 / p_payload)
207         p4_reply[1].ttl -= 1
208         rx = self.send_and_expect(self.pg0, p4*10, self.pg1)
209         for p in rx:
210             self.validate(p[1], p4_reply)
211
212         # Decapsulation
213
214         p_ether = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
215
216         # IPv6 tunnel to IPv4
217         p_ip4 = IP(src="1.2.3.4", dst=self.pg0.remote_ip4)
218         p4 = (p_ether / IPv6(src=self.pg1.remote_ip6,
219                              dst=self.pg0.local_ip6) / p_ip4 / p_payload)
220         p4_reply = (p_ip4 / p_payload)
221         p4_reply.ttl -= 1
222         rx = self.send_and_expect(self.pg1, p4*10, self.pg0)
223         for p in rx:
224             self.validate(p[1], p4_reply)
225
226         # IPv6 tunnel to IPv6
227         p_ip6 = IPv6(src="1:2:3::4", dst=self.pg0.remote_ip6)
228         p6 = (p_ether / IPv6(src=self.pg1.remote_ip6,
229                              dst=self.pg0.local_ip6) / p_ip6 / p_payload)
230         p6_reply = (p_ip6 / p_payload)
231         p6_reply.hlim = 63
232         rx = self.send_and_expect(self.pg1, p6*10, self.pg0)
233         for p in rx:
234             self.validate(p[1], p6_reply)
235
236     def test_ipip_create(self):
237         """ ipip create / delete interface test """
238         rv = self.vapi.ipip_add_tunnel(
239             src_address=str(ip_address('1.2.3.4').packed),
240             dst_address=str(ip_address('2.3.4.5').packed), is_ipv6=0)
241         self.assertEqual(rv.retval, 0)
242         sw_if_index = rv.sw_if_index
243         rv = self.vapi.ipip_del_tunnel(sw_if_index)
244         self.assertEqual(rv.retval, 0)
245
246
247 if __name__ == '__main__':
248     unittest.main(testRunner=VppTestRunner)