MTU: Setting of MTU on software interface (instead of hardware interface)
[vpp.git] / test / test_ipip.py
1 #!/usr/bin/env python
2 """IP{4,6} over IP{v,6} tunnel functional tests"""
3
4 import unittest
5 from scapy.layers.inet6 import IPv6, Ether, IP, UDP, ICMPv6PacketTooBig
6 from scapy.layers.inet import ICMP
7 from framework import VppTestCase, VppTestRunner
8 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
9 from socket import AF_INET, AF_INET6, inet_pton
10
11 """ Testipip is a subclass of  VPPTestCase classes.
12
13 IPIP tests.
14
15 """
16
17
18 class TestIPIP(VppTestCase):
19     """ IPIP Test Case """
20
21     @classmethod
22     def setUpClass(cls):
23         super(TestIPIP, cls).setUpClass()
24         cls.create_pg_interfaces(range(2))
25         cls.interfaces = list(cls.pg_interfaces)
26
27     def setUp(cls):
28         super(TestIPIP, cls).setUp()
29         for i in cls.interfaces:
30             i.admin_up()
31             i.config_ip4()
32             i.config_ip6()
33             i.disable_ipv6_ra()
34             i.resolve_arp()
35             i.resolve_ndp()
36
37     def tearDown(self):
38         super(TestIPIP, self).tearDown()
39         if not self.vpp_dead:
40             for i in self.pg_interfaces:
41                 i.unconfig_ip4()
42                 i.unconfig_ip6()
43                 i.admin_down()
44
45     def validate(self, rx, expected):
46         self.assertEqual(rx, expected.__class__(str(expected)))
47
48     def validate_bytes(self, rx, expected):
49         self.assertEqual(rx, expected)
50
51     def payload(self, len):
52         return 'x' * len
53
54     def test_ipip4(self):
55         """ ip{v4,v6} over ip4 test """
56         p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
57         p_ip6 = IPv6(src="1::1", dst="DEAD::1", nh='UDP')
58         p_ip4 = IP(src=self.pg0.remote_ip4, dst="130.67.0.1")
59         p_payload = UDP(sport=1234, dport=1234) / self.payload(1300)
60
61         # IPv4 transport
62         rv = self.vapi.ipip_add_tunnel(
63             src_address=self.pg0.local_ip4n,
64             dst_address=self.pg1.remote_ip4n,
65             is_ipv6=0)
66         sw_if_index = rv.sw_if_index
67
68         # Set interface up and enable IP on it
69         self.vapi.sw_interface_set_flags(sw_if_index, 1)
70         self.vapi.sw_interface_set_unnumbered(
71             ip_sw_if_index=self.pg0.sw_if_index,
72             sw_if_index=sw_if_index)
73
74         # Add IPv4 and IPv6 routes via tunnel interface
75         ip4_via_tunnel = VppIpRoute(
76             self, "130.67.0.0", 16,
77             [VppRoutePath("0.0.0.0",
78                           sw_if_index,
79                           proto=DpoProto.DPO_PROTO_IP4)], is_ip6=0)
80         ip4_via_tunnel.add_vpp_config()
81
82         ip6_via_tunnel = VppIpRoute(
83             self, "dead::", 16,
84             [VppRoutePath("::",
85                           sw_if_index,
86                           proto=DpoProto.DPO_PROTO_IP6)], is_ip6=1)
87         ip6_via_tunnel.add_vpp_config()
88
89         # IPv6 in to IPv4 tunnel
90         p6 = (p_ether / p_ip6 / p_payload)
91         p_inner_ip6 = p_ip6
92         p_inner_ip6.hlim -= 1
93         p6_reply = (IP(src=self.pg0.local_ip4, dst=self.pg1.remote_ip4,
94                        proto='ipv6', id=0) / p_inner_ip6 / p_payload)
95         p6_reply.ttl -= 1
96         rx = self.send_and_expect(self.pg0, p6*10, self.pg1)
97         for p in rx:
98             self.validate(p[1], p6_reply)
99
100         # IPv4 in to IPv4 tunnel
101         p4 = (p_ether / p_ip4 / p_payload)
102         p_ip4_inner = p_ip4
103         p_ip4_inner.ttl -= 1
104         p4_reply = (IP(src=self.pg0.local_ip4,
105                        dst=self.pg1.remote_ip4) / p_ip4_inner / p_payload)
106         p4_reply.ttl -= 1
107         p4_reply.id = 0
108         rx = self.send_and_expect(self.pg0, p4*11, self.pg1)
109         for p in rx:
110             self.validate(p[1], p4_reply)
111
112         # MTU (only checked on encap)
113         rv = self.vapi.sw_interface_set_mtu(sw_if_index, 576)
114         rv = self.vapi.sw_interface_dump()
115         for i in rv:
116             if i.sw_if_index == sw_if_index:
117                 self.assertEqual(i.mtu, 576)
118                 break
119
120         # Should fail. Too large MTU
121         p4.flags = 'DF'
122         p_icmp4 = ICMP(type='dest-unreach', code='fragmentation-needed',
123                        nexthopmtu=576, chksum=0xb6c7)
124         icmp4_reply = (IP(src=self.pg0.local_ip4,
125                           dst=self.pg0.remote_ip4,
126                           ttl=254, len=576, id=0) /
127                        p_icmp4 / p_ip4 / p_payload)
128         icmp4_reply[1].flags = 'DF'
129         n = icmp4_reply.__class__(str(icmp4_reply))
130         s = str(icmp4_reply)
131         icmp4_reply = s[0:576]
132         rx = self.send_and_expect(self.pg0, p4*9, self.pg0)
133         for p in rx:
134             self.validate_bytes(str(p[1]), icmp4_reply)
135
136         # Reset MTU
137         rv = self.vapi.sw_interface_set_mtu(sw_if_index, 1480)
138
139         # Decapsulation
140         p_ether = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
141
142         # IPv4 tunnel to IPv4
143         p_ip4 = IP(src="1.2.3.4", dst=self.pg0.remote_ip4)
144         p4 = (p_ether / IP(src=self.pg1.remote_ip4,
145                            dst=self.pg0.local_ip4) / p_ip4 / p_payload)
146         p4_reply = (p_ip4 / p_payload)
147         p4_reply.ttl -= 1
148         rx = self.send_and_expect(self.pg1, p4*10, self.pg0)
149         for p in rx:
150             self.validate(p[1], p4_reply)
151
152         # IPv4 tunnel to IPv6
153         p_ip6 = IPv6(src="1:2:3::4", dst=self.pg0.remote_ip6)
154         p6 = (p_ether / IP(src=self.pg1.remote_ip4,
155                            dst=self.pg0.local_ip4) / p_ip6 / p_payload)
156         p6_reply = (p_ip6 / p_payload)
157         p6_reply.hlim = 63
158         rx = self.send_and_expect(self.pg1, p6*10, self.pg0)
159         for p in rx:
160             self.validate(p[1], p6_reply)
161
162     def test_ipip6(self):
163         """ ip{v4,v6} over ip6 test """
164         p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
165         p_ip6 = IPv6(src=self.pg0.remote_ip6, dst="DEAD::1", nh='UDP')
166         p_ip4 = IP(src="1.2.3.4", dst="130.67.0.1")
167         p_payload = UDP(sport=1234, dport=1234) / self.payload(1300)
168
169         # IPv6 transport
170         rv = self.vapi.ipip_add_tunnel(
171             src_address=self.pg0.local_ip6n,
172             dst_address=self.pg1.remote_ip6n)
173
174         sw_if_index = rv.sw_if_index
175
176         self.vapi.sw_interface_set_flags(sw_if_index, 1)
177         self.vapi.sw_interface_set_unnumbered(
178             ip_sw_if_index=self.pg0.sw_if_index, sw_if_index=sw_if_index)
179
180         # Add IPv4 and IPv6 routes via tunnel interface
181         ip4_via_tunnel = VppIpRoute(
182             self, "130.67.0.0", 16,
183             [VppRoutePath("0.0.0.0",
184                           sw_if_index,
185                           proto=DpoProto.DPO_PROTO_IP4)], is_ip6=0)
186         ip4_via_tunnel.add_vpp_config()
187
188         ip6_via_tunnel = VppIpRoute(
189             self, "dead::", 16,
190             [VppRoutePath("::",
191                           sw_if_index,
192                           proto=DpoProto.DPO_PROTO_IP6)], is_ip6=1)
193         ip6_via_tunnel.add_vpp_config()
194
195         # Encapsulation
196
197         # IPv6 in to IPv6 tunnel
198         p6 = (p_ether / p_ip6 / p_payload)
199         p6_reply = (IPv6(src=self.pg0.local_ip6,
200                          dst=self.pg1.remote_ip6, hlim=63) / p_ip6 / p_payload)
201         p6_reply[1].hlim -= 1
202         rx = self.send_and_expect(self.pg0, p6*10, self.pg1)
203         for p in rx:
204             self.validate(p[1], p6_reply)
205
206         # MTU (only checked on encap)
207         rv = self.vapi.sw_interface_set_mtu(sw_if_index, 1280)
208         rv = self.vapi.sw_interface_dump()
209         for i in rv:
210             if i.sw_if_index == sw_if_index:
211                 self.assertEqual(i.mtu, 1280)
212                 break
213
214         # Should fail. Too large MTU
215         p_icmp6 = ICMPv6PacketTooBig(mtu=1280, cksum=0xd401)
216         icmp6_reply = (IPv6(src=self.pg0.local_ip6,
217                             dst=self.pg0.remote_ip6,
218                             hlim=254, plen=1240) /
219                        p_icmp6 / p_ip6 / p_payload)
220         icmp6_reply[2].hlim -= 1
221         s = str(icmp6_reply)
222         icmp6_reply = s[0:1280]
223         rx = self.send_and_expect(self.pg0, p6*9, self.pg0)
224         for p in rx:
225             self.validate_bytes(str(p[1]), icmp6_reply)
226
227         # Reset MTU
228         rv = self.vapi.sw_interface_set_mtu(sw_if_index, 1460)
229
230         # IPv4 in to IPv6 tunnel
231         p4 = (p_ether / p_ip4 / p_payload)
232         p4_reply = (IPv6(src=self.pg0.local_ip6,
233                          dst=self.pg1.remote_ip6, hlim=63) / p_ip4 / p_payload)
234         p4_reply[1].ttl -= 1
235         rx = self.send_and_expect(self.pg0, p4*10, self.pg1)
236         for p in rx:
237             self.validate(p[1], p4_reply)
238
239         # Decapsulation
240
241         p_ether = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
242
243         # IPv6 tunnel to IPv4
244         p_ip4 = IP(src="1.2.3.4", dst=self.pg0.remote_ip4)
245         p4 = (p_ether / IPv6(src=self.pg1.remote_ip6,
246                              dst=self.pg0.local_ip6) / p_ip4 / p_payload)
247         p4_reply = (p_ip4 / p_payload)
248         p4_reply.ttl -= 1
249         rx = self.send_and_expect(self.pg1, p4*10, self.pg0)
250         for p in rx:
251             self.validate(p[1], p4_reply)
252
253         # IPv6 tunnel to IPv6
254         p_ip6 = IPv6(src="1:2:3::4", dst=self.pg0.remote_ip6)
255         p6 = (p_ether / IPv6(src=self.pg1.remote_ip6,
256                              dst=self.pg0.local_ip6) / p_ip6 / p_payload)
257         p6_reply = (p_ip6 / p_payload)
258         p6_reply.hlim = 63
259         rx = self.send_and_expect(self.pg1, p6*10, self.pg0)
260         for p in rx:
261             self.validate(p[1], p6_reply)
262
263     def test_ipip_create(self):
264         """ ipip create / delete interface test """
265         rv = self.vapi.ipip_add_tunnel(
266             src_address=inet_pton(AF_INET, '1.2.3.4'),
267             dst_address=inet_pton(AF_INET, '2.3.4.5'), is_ipv6=0)
268         sw_if_index = rv.sw_if_index
269         self.vapi.ipip_del_tunnel(sw_if_index)
270
271
272 if __name__ == '__main__':
273     unittest.main(testRunner=VppTestRunner)