Test framework: StringIO fixes for Python3
[vpp.git] / test / test_ipip.py
1 #!/usr/bin/env python
"""IP{4,6} over IP{4,6} tunnel functional tests"""
3
4 import unittest
5 from scapy.layers.inet6 import IPv6, Ether, IP, UDP, IPv6ExtHdrFragment
6 from scapy.all import fragment, fragment6, RandShort, defragment6
7 from framework import VppTestCase, VppTestRunner
8 from vpp_ip import DpoProto
9 from vpp_ip_route import VppIpRoute, VppRoutePath, VppIpTable
10 from socket import AF_INET, AF_INET6, inet_pton
11 from util import reassemble4
12
13
""" TestIPIP and TestIPIP6 are subclasses of VppTestCase.

IPIP tests.

"""
19
20
class TestIPIP(VppTestCase):
    """ IPIP Test Case """

    @classmethod
    def setUpClass(cls):
        """Create the two packet-generator interfaces used by every test."""
        super(TestIPIP, cls).setUpClass()
        cls.create_pg_interfaces(range(2))
        cls.interfaces = list(cls.pg_interfaces)

    def setUp(self):
        """Bring each pg interface up and configure IPv4/IPv6 on it."""
        super(TestIPIP, self).setUp()
        for i in self.interfaces:
            i.admin_up()
            i.config_ip4()
            i.config_ip6()
            i.disable_ipv6_ra()
            i.resolve_arp()
            i.resolve_ndp()

    def tearDown(self):
        """Run the base teardown, then unconfigure the pg interfaces.

        The interfaces are only touched while VPP is still alive.
        """
        super(TestIPIP, self).tearDown()
        if not self.vpp_dead:
            for i in self.pg_interfaces:
                i.unconfig_ip4()
                i.unconfig_ip6()
                i.admin_down()

    def validate(self, rx, expected):
        """Assert that the received packet equals the expected one.

        The expected packet is re-parsed through its own class so that
        both sides of the comparison are dissected packets.
        """
        self.assertEqual(rx, expected.__class__(expected))

    def generate_ip4_frags(self, payload_length, fragment_size):
        """Build a fragmented IPv4-in-IPv4 packet and its expected decap.

        :param payload_length: number of payload bytes carried after UDP.
        :param fragment_size: fragment size handed to scapy's fragment().
        :returns: tuple (frags, p4_reply) where frags is the list of outer
                  fragments to inject on pg1 and p4_reply is the inner
                  packet with its TTL decremented by one.
        """
        p_ether = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
        p_payload = UDP(sport=1234, dport=1234) / self.payload(payload_length)
        p_ip4 = IP(src="1.2.3.4", dst=self.pg0.remote_ip4)
        outer_ip4 = (p_ether / IP(src=self.pg1.remote_ip4,
                                  id=RandShort(),
                                  dst=self.pg0.local_ip4) / p_ip4 / p_payload)
        frags = fragment(outer_ip4, fragment_size)
        p4_reply = (p_ip4 / p_payload)
        p4_reply.ttl -= 1
        return frags, p4_reply

    def test_ipip4(self):
        """ ip{v4,v6} over ip4 test """
        decap_counter = '/err/ipip4-input/packets decapsulated'

        p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
        p_ip6 = IPv6(src="1::1", dst="DEAD::1", nh='UDP', tc=42)
        p_ip4 = IP(src="1.2.3.4", dst="130.67.0.1", tos=42)
        p_payload = UDP(sport=1234, dport=1234)

        # IPv4 transport
        rv = self.vapi.ipip_add_tunnel(
            src_address=self.pg0.local_ip4n,
            dst_address=self.pg1.remote_ip4n,
            is_ipv6=0, tc_tos=0xFF)
        sw_if_index = rv.sw_if_index

        # Set interface up and enable IP on it
        self.vapi.sw_interface_set_flags(sw_if_index, 1)
        self.vapi.sw_interface_set_unnumbered(
            ip_sw_if_index=self.pg0.sw_if_index,
            sw_if_index=sw_if_index)

        # Add IPv4 and IPv6 routes via tunnel interface
        ip4_via_tunnel = VppIpRoute(
            self, "130.67.0.0", 16,
            [VppRoutePath("0.0.0.0",
                          sw_if_index,
                          proto=DpoProto.DPO_PROTO_IP4)], is_ip6=0)
        ip4_via_tunnel.add_vpp_config()

        ip6_via_tunnel = VppIpRoute(
            self, "dead::", 16,
            [VppRoutePath("::",
                          sw_if_index,
                          proto=DpoProto.DPO_PROTO_IP6)], is_ip6=1)
        ip6_via_tunnel.add_vpp_config()

        # IPv6 in to IPv4 tunnel
        # p6 is built before the hop limit is decremented, so the packet
        # that is sent keeps the original value.
        p6 = (p_ether / p_ip6 / p_payload)
        p_inner_ip6 = p_ip6
        p_inner_ip6.hlim -= 1
        p6_reply = (IP(src=self.pg0.local_ip4, dst=self.pg1.remote_ip4,
                       proto='ipv6', id=0, tos=42) / p_inner_ip6 / p_payload)
        p6_reply.ttl -= 1
        rx = self.send_and_expect(self.pg0, p6*10, self.pg1)
        for p in rx:
            self.validate(p[1], p6_reply)

        # IPv4 in to IPv4 tunnel
        p4 = (p_ether / p_ip4 / p_payload)
        p_ip4_inner = p_ip4
        p_ip4_inner.ttl -= 1
        p4_reply = (IP(src=self.pg0.local_ip4, dst=self.pg1.remote_ip4,
                       tos=42) /
                    p_ip4_inner / p_payload)
        p4_reply.ttl -= 1
        p4_reply.id = 0
        rx = self.send_and_expect(self.pg0, p4*10, self.pg1)
        for p in rx:
            self.validate(p[1], p4_reply)

        # Decapsulation
        p_ether = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)

        # IPv4 tunnel to IPv4
        p_ip4 = IP(src="1.2.3.4", dst=self.pg0.remote_ip4)
        p4 = (p_ether / IP(src=self.pg1.remote_ip4,
                           dst=self.pg0.local_ip4) / p_ip4 / p_payload)
        p4_reply = (p_ip4 / p_payload)
        p4_reply.ttl -= 1
        rx = self.send_and_expect(self.pg1, p4*10, self.pg0)
        for p in rx:
            self.validate(p[1], p4_reply)

        # The counter checks below use absolute values: 10 packets so far.
        err = self.statistics.get_counter(decap_counter)
        self.assertEqual(err, 10)

        # IPv4 tunnel to IPv6
        p_ip6 = IPv6(src="1:2:3::4", dst=self.pg0.remote_ip6)
        p6 = (p_ether / IP(src=self.pg1.remote_ip4,
                           dst=self.pg0.local_ip4) / p_ip6 / p_payload)
        p6_reply = (p_ip6 / p_payload)
        p6_reply.hlim = 63
        rx = self.send_and_expect(self.pg1, p6*10, self.pg0)
        for p in rx:
            self.validate(p[1], p6_reply)

        err = self.statistics.get_counter(decap_counter)
        self.assertEqual(err, 20)

        #
        # Fragmentation / Reassembly and Re-fragmentation
        #
        self.vapi.ip_reassembly_enable_disable(
            sw_if_index=self.pg1.sw_if_index,
            enable_ip4=1)

        # Send lots of fragments, verify reassembled packet
        frags, p4_reply = self.generate_ip4_frags(3131, 1400)
        f = []
        for i in range(0, 1000):
            f.extend(frags)
        self.pg1.add_stream(f)
        self.pg_enable_capture()
        self.pg_start()
        rx = self.pg0.get_capture(1000)

        for p in rx:
            self.validate(p[1], p4_reply)

        err = self.statistics.get_counter(decap_counter)
        self.assertEqual(err, 1020)

        # One fragmented packet per payload size, each paired with its own
        # expected reply. The replies are appended as whole packets so the
        # expectation list stays aligned one-to-one with the capture.
        f = []
        r = []
        for i in range(1, 90):
            frags, p4_reply = self.generate_ip4_frags(i * 100, 1000)
            f.extend(frags)
            r.append(p4_reply)
        self.pg_enable_capture()
        self.pg1.add_stream(f)
        self.pg_start()
        rx = self.pg0.get_capture(89)
        for p, expected in zip(rx, r):
            self.validate(p[1], expected)

        # Now try with re-fragmentation
        #
        # Send fragments to tunnel head-end, for the tunnel head end
        # to reassemble and then refragment
        #
        # NOTE(review): the pg0 MTU changed here is not restored at the end
        # of the test - confirm nothing later depends on the previous value.
        self.vapi.sw_interface_set_mtu(self.pg0.sw_if_index, [576, 0, 0, 0])
        frags, p4_reply = self.generate_ip4_frags(3123, 1200)
        self.pg_enable_capture()
        self.pg1.add_stream(frags)
        self.pg_start()
        rx = self.pg0.get_capture(6)
        reass_pkt = reassemble4(rx)
        p4_reply.ttl -= 1
        p4_reply.id = 256
        self.validate(reass_pkt, p4_reply)

        self.vapi.sw_interface_set_mtu(self.pg0.sw_if_index, [1600, 0, 0, 0])
        frags, p4_reply = self.generate_ip4_frags(3123, 1200)
        self.pg_enable_capture()
        self.pg1.add_stream(frags)
        self.pg_start()
        rx = self.pg0.get_capture(2)
        reass_pkt = reassemble4(rx)
        p4_reply.ttl -= 1
        p4_reply.id = 512
        self.validate(reass_pkt, p4_reply)

    def test_ipip_create(self):
        """ ipip create / delete interface test """
        rv = self.vapi.ipip_add_tunnel(
            src_address=inet_pton(AF_INET, '1.2.3.4'),
            dst_address=inet_pton(AF_INET, '2.3.4.5'), is_ipv6=0)
        sw_if_index = rv.sw_if_index
        self.vapi.ipip_del_tunnel(sw_if_index)

    def test_ipip_vrf_create(self):
        """ ipip create / delete interface VRF test """

        # Table 20 must exist before a tunnel can be created in it.
        t = VppIpTable(self, 20)
        t.add_vpp_config()
        rv = self.vapi.ipip_add_tunnel(
            src_address=inet_pton(AF_INET, '1.2.3.4'),
            dst_address=inet_pton(AF_INET, '2.3.4.5'), is_ipv6=0,
            table_id=20)
        sw_if_index = rv.sw_if_index
        self.vapi.ipip_del_tunnel(sw_if_index)

    def payload(self, len):
        """Return a filler payload of `len` 'x' characters.

        The parameter name shadows the builtin; it is kept unchanged so
        existing callers are unaffected.
        """
        return 'x' * len
241
242
class TestIPIP6(VppTestCase):
    """ IPIP6 Test Case """

    @classmethod
    def setUpClass(cls):
        """Create the two packet-generator interfaces used by every test."""
        super(TestIPIP6, cls).setUpClass()
        cls.create_pg_interfaces(range(2))
        cls.interfaces = list(cls.pg_interfaces)

    def setUp(self):
        """Run the base setUp, configure the pg interfaces, add the tunnel."""
        # The base-class setUp must run as well, as it does in TestIPIP.
        super(TestIPIP6, self).setUp()
        for i in self.interfaces:
            i.admin_up()
            i.config_ip4()
            i.config_ip6()
            i.disable_ipv6_ra()
            i.resolve_arp()
            i.resolve_ndp()
        self.setup_tunnel()

    def tearDown(self):
        """Remove the tunnel and interface config, then run base teardown.

        VPP is only touched while it is still alive; the base-class
        tearDown is always invoked, as it is in TestIPIP.
        """
        if not self.vpp_dead:
            self.destroy_tunnel()
            for i in self.pg_interfaces:
                i.unconfig_ip4()
                i.unconfig_ip6()
                i.admin_down()
        super(TestIPIP6, self).tearDown()

    def setup_tunnel(self):
        """Create the IPv6-transport tunnel and route test prefixes via it.

        Stores the tunnel interface index and both routes on the instance
        so that destroy_tunnel() can remove them again.
        """
        # IPv6 transport
        rv = self.vapi.ipip_add_tunnel(
            src_address=self.pg0.local_ip6n,
            dst_address=self.pg1.remote_ip6n, tc_tos=255)

        sw_if_index = rv.sw_if_index
        self.tunnel_if_index = sw_if_index
        self.vapi.sw_interface_set_flags(sw_if_index, 1)
        self.vapi.sw_interface_set_unnumbered(
            ip_sw_if_index=self.pg0.sw_if_index, sw_if_index=sw_if_index)

        # Add IPv4 and IPv6 routes via tunnel interface
        ip4_via_tunnel = VppIpRoute(
            self, "130.67.0.0", 16,
            [VppRoutePath("0.0.0.0",
                          sw_if_index,
                          proto=DpoProto.DPO_PROTO_IP4)], is_ip6=0)
        ip4_via_tunnel.add_vpp_config()

        ip6_via_tunnel = VppIpRoute(
            self, "dead::", 16,
            [VppRoutePath("::",
                          sw_if_index,
                          proto=DpoProto.DPO_PROTO_IP6)], is_ip6=1)
        ip6_via_tunnel.add_vpp_config()

        self.tunnel_ip6_via_tunnel = ip6_via_tunnel
        self.tunnel_ip4_via_tunnel = ip4_via_tunnel

    def destroy_tunnel(self):
        """Remove the routes added by setup_tunnel(), then the tunnel."""
        # Routes first, since they point at the tunnel interface.
        self.tunnel_ip4_via_tunnel.remove_vpp_config()
        self.tunnel_ip6_via_tunnel.remove_vpp_config()

        self.vapi.ipip_del_tunnel(sw_if_index=self.tunnel_if_index)

    def validate(self, rx, expected):
        """Assert that the received packet equals the expected one.

        The expected packet is re-parsed through its own class so that
        both sides of the comparison are dissected packets.
        """
        self.assertEqual(rx, expected.__class__(expected))

    def generate_ip6_frags(self, payload_length, fragment_size):
        """Build a fragmented IPv6-in-IPv6 packet and its expected decap.

        :param payload_length: number of payload bytes carried after UDP.
        :param fragment_size: fragment size handed to scapy's fragment6().
        :returns: tuple (frags, p6_reply) where frags is the list of outer
                  fragments to inject on pg1 and p6_reply is the inner
                  packet with its hop limit decremented by one.
        """
        p_ether = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
        p_payload = UDP(sport=1234, dport=1234) / self.payload(payload_length)
        p_ip6 = IPv6(src="1::1", dst=self.pg0.remote_ip6)
        outer_ip6 = (p_ether / IPv6(src=self.pg1.remote_ip6,
                                    dst=self.pg0.local_ip6) /
                     IPv6ExtHdrFragment() / p_ip6 / p_payload)
        frags = fragment6(outer_ip6, fragment_size)
        p6_reply = (p_ip6 / p_payload)
        p6_reply.hlim -= 1
        return frags, p6_reply

    def generate_ip6_hairpin_frags(self, payload_length, fragment_size):
        """Build fragments whose inner packet is routed back into the tunnel.

        The inner destination is "dead::1", which setup_tunnel() routes via
        the tunnel interface, so the expected reply is the inner packet
        (hop limit decremented) re-encapsulated from pg0 towards pg1.

        :param payload_length: number of payload bytes carried after UDP.
        :param fragment_size: fragment size handed to scapy's fragment6().
        :returns: tuple (frags, p6_reply).
        """
        p_ether = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
        p_payload = UDP(sport=1234, dport=1234) / self.payload(payload_length)
        p_ip6 = IPv6(src="1::1", dst="dead::1")
        outer_ip6 = (p_ether / IPv6(src=self.pg1.remote_ip6,
                                    dst=self.pg0.local_ip6) /
                     IPv6ExtHdrFragment() / p_ip6 / p_payload)
        frags = fragment6(outer_ip6, fragment_size)
        p_ip6.hlim -= 1
        p6_reply = (IPv6(src=self.pg0.local_ip6, dst=self.pg1.remote_ip6,
                         hlim=63) / p_ip6 / p_payload)

        return frags, p6_reply

    def test_encap(self):
        """ ip{v4,v6} over ip6 test encap """
        p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
        p_ip6 = IPv6(src="1::1", dst="DEAD::1", tc=42, nh='UDP')
        p_ip4 = IP(src="1.2.3.4", dst="130.67.0.1", tos=42)
        p_payload = UDP(sport=1234, dport=1234)

        # Encapsulation
        # IPv6 in to IPv6 tunnel
        p6 = (p_ether / p_ip6 / p_payload)
        p6_reply = (IPv6(src=self.pg0.local_ip6, dst=self.pg1.remote_ip6,
                         hlim=64, tc=42) /
                    p_ip6 / p_payload)
        # Layer [1] is the inner header of the expected packet.
        p6_reply[1].hlim -= 1
        rx = self.send_and_expect(self.pg0, p6*11, self.pg1)
        for p in rx:
            self.validate(p[1], p6_reply)

        # IPv4 in to IPv6 tunnel
        p4 = (p_ether / p_ip4 / p_payload)
        p4_reply = (IPv6(src=self.pg0.local_ip6,
                         dst=self.pg1.remote_ip6, hlim=64, tc=42) /
                    p_ip4 / p_payload)
        p4_reply[1].ttl -= 1
        rx = self.send_and_expect(self.pg0, p4*11, self.pg1)
        for p in rx:
            self.validate(p[1], p4_reply)

    def test_decap(self):
        """ ip{v4,v6} over ip6 test decap """

        p_ether = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
        p_ip4 = IP(src="1.2.3.4", dst=self.pg0.remote_ip4)
        p_payload = UDP(sport=1234, dport=1234)

        # Decapsulation
        # IPv6 tunnel to IPv4

        p4 = (p_ether / IPv6(src=self.pg1.remote_ip6,
                             dst=self.pg0.local_ip6) / p_ip4 / p_payload)
        p4_reply = (p_ip4 / p_payload)
        p4_reply.ttl -= 1
        rx = self.send_and_expect(self.pg1, p4*11, self.pg0)
        for p in rx:
            self.validate(p[1], p4_reply)

        # IPv6 tunnel to IPv6
        p_ip6 = IPv6(src="1:2:3::4", dst=self.pg0.remote_ip6)
        p6 = (p_ether / IPv6(src=self.pg1.remote_ip6,
                             dst=self.pg0.local_ip6) / p_ip6 / p_payload)
        p6_reply = (p_ip6 / p_payload)
        p6_reply.hlim = 63
        rx = self.send_and_expect(self.pg1, p6*11, self.pg0)
        for p in rx:
            self.validate(p[1], p6_reply)

    def test_frag(self):
        """ ip{v4,v6} over ip6 test frag """

        p_ip6 = IPv6(src="1::1", dst="DEAD::1", tc=42, nh='UDP')

        #
        # Fragmentation / Reassembly and Re-fragmentation
        #
        self.vapi.ip_reassembly_enable_disable(
            sw_if_index=self.pg1.sw_if_index,
            enable_ip6=1)

        # Send lots of fragments, verify reassembled packet
        # The counter is sampled first so the check below is relative.
        before_cnt = self.statistics.get_counter(
            '/err/ipip6-input/packets decapsulated')
        frags, p6_reply = self.generate_ip6_frags(3131, 1400)
        f = []
        for i in range(0, 1000):
            f.extend(frags)
        self.pg1.add_stream(f)
        self.pg_enable_capture()
        self.pg_start()
        rx = self.pg0.get_capture(1000)

        for p in rx:
            self.validate(p[1], p6_reply)

        cnt = self.statistics.get_counter(
            '/err/ipip6-input/packets decapsulated')
        self.assertEqual(cnt, before_cnt + 1000)

        # One fragmented packet per payload size, each paired with its own
        # expected reply. The replies are appended as whole packets so the
        # expectation list stays aligned one-to-one with the capture.
        f = []
        r = []
        # TODO: Check out why reassembly of atomic fragments doesn't work
        for i in range(10, 90):
            frags, p6_reply = self.generate_ip6_frags(i * 100, 1000)
            f.extend(frags)
            r.append(p6_reply)
        self.pg_enable_capture()
        self.pg1.add_stream(f)
        self.pg_start()
        rx = self.pg0.get_capture(80)
        for p, expected in zip(rx, r):
            self.validate(p[1], expected)

        # Simple fragmentation
        p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
        # NOTE(review): the pg1 MTU changed here is not restored at the end
        # of the test - confirm no other test depends on the previous value.
        self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [1280, 0, 0, 0])

        # IPv6 in to IPv6 tunnel
        p_payload = UDP(sport=1234, dport=1234) / self.payload(1300)

        p6 = (p_ether / p_ip6 / p_payload)
        p6_reply = (IPv6(src=self.pg0.local_ip6, dst=self.pg1.remote_ip6,
                         hlim=63, tc=42) /
                    p_ip6 / p_payload)
        p6_reply[1].hlim -= 1
        self.pg_enable_capture()
        self.pg0.add_stream(p6)
        self.pg_start()
        rx = self.pg1.get_capture(2)

        # Scapy defragment doesn't deal well with multiple layers
        # of same type / Ethernet header first
        f = [p[1] for p in rx]
        reass_pkt = defragment6(f)
        self.validate(reass_pkt, p6_reply)

        # Now try with re-fragmentation
        #
        # Send large fragments to tunnel head-end, for the tunnel head end
        # to reassemble and then refragment out the tunnel again.
        # Hair-pinning
        #
        self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [1280, 0, 0, 0])
        frags, p6_reply = self.generate_ip6_hairpin_frags(8000, 1200)
        self.pg_enable_capture()
        self.pg1.add_stream(frags)
        self.pg_start()
        rx = self.pg1.get_capture(7)
        f = [p[1] for p in rx]
        reass_pkt = defragment6(f)
        # NOTE(review): p6_reply is built from IPv6/UDP layers only, which
        # presumably have no 'id' field, so this assignment looks like a
        # no-op for the comparison - confirm before removing.
        p6_reply.id = 256
        self.validate(reass_pkt, p6_reply)

    def test_ipip_create(self):
        """ ipip create / delete interface test """
        rv = self.vapi.ipip_add_tunnel(
            src_address=inet_pton(AF_INET, '1.2.3.4'),
            dst_address=inet_pton(AF_INET, '2.3.4.5'), is_ipv6=0)
        sw_if_index = rv.sw_if_index
        self.vapi.ipip_del_tunnel(sw_if_index)

    def test_ipip_vrf_create(self):
        """ ipip create / delete interface VRF test """

        # Table 20 must exist before a tunnel can be created in it.
        t = VppIpTable(self, 20)
        t.add_vpp_config()
        rv = self.vapi.ipip_add_tunnel(
            src_address=inet_pton(AF_INET, '1.2.3.4'),
            dst_address=inet_pton(AF_INET, '2.3.4.5'), is_ipv6=0,
            table_id=20)
        sw_if_index = rv.sw_if_index
        self.vapi.ipip_del_tunnel(sw_if_index)

    def payload(self, len):
        """Return a filler payload of `len` 'x' characters.

        The parameter name shadows the builtin; it is kept unchanged so
        existing callers are unaffected.
        """
        return 'x' * len
506
507
# Allow running this module directly as a script, using the VPP test runner
# in place of unittest's default one.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)