2 """IP{4,6} over IP{v,6} tunnel functional tests"""
5 from scapy.layers.inet6 import IPv6, Ether, IP, UDP, IPv6ExtHdrFragment
6 from scapy.all import fragment, fragment6, RandShort, defragment6
7 from framework import VppTestCase, VppTestRunner
8 from vpp_ip import DpoProto
9 from vpp_ip_route import VppIpRoute, VppRoutePath, VppIpTable, FibPathProto
10 from socket import AF_INET, AF_INET6, inet_pton
11 from util import reassemble4
13 """ Testipip is a subclass of VPPTestCase classes.
20 def ipip_add_tunnel(test, src, dst, table_id=0, tc_tos=0xff):
21 """ Add a IPIP tunnel """
22 return test.vapi.ipip_add_tunnel(
27 'instance': 0xffffffff,
33 class TestIPIP(VppTestCase):
34 """ IPIP Test Case """
38 super(TestIPIP, cls).setUpClass()
39 cls.create_pg_interfaces(range(2))
40 cls.interfaces = list(cls.pg_interfaces)
43 def tearDownClass(cls):
44 super(TestIPIP, cls).tearDownClass()
47 super(TestIPIP, self).setUp()
48 for i in self.interfaces:
57 super(TestIPIP, self).tearDown()
59 for i in self.pg_interfaces:
64 def validate(self, rx, expected):
65 self.assertEqual(rx, expected.__class__(expected))
67 def generate_ip4_frags(self, payload_length, fragment_size):
68 p_ether = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
69 p_payload = UDP(sport=1234, dport=1234) / self.payload(payload_length)
70 p_ip4 = IP(src="1.2.3.4", dst=self.pg0.remote_ip4)
71 outer_ip4 = (p_ether / IP(src=self.pg1.remote_ip4,
73 dst=self.pg0.local_ip4) / p_ip4 / p_payload)
74 frags = fragment(outer_ip4, fragment_size)
75 p4_reply = (p_ip4 / p_payload)
77 return frags, p4_reply
80 """ ip{v4,v6} over ip4 test """
81 p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
82 p_ip6 = IPv6(src="1::1", dst="DEAD::1", nh='UDP', tc=42)
83 p_ip4 = IP(src="1.2.3.4", dst="130.67.0.1", tos=42)
84 p_payload = UDP(sport=1234, dport=1234)
87 rv = ipip_add_tunnel(self,
91 sw_if_index = rv.sw_if_index
93 # Set interface up and enable IP on it
94 self.vapi.sw_interface_set_flags(sw_if_index, 1)
95 self.vapi.sw_interface_set_unnumbered(
96 sw_if_index=self.pg0.sw_if_index,
97 unnumbered_sw_if_index=sw_if_index)
99 # Add IPv4 and IPv6 routes via tunnel interface
100 ip4_via_tunnel = VppIpRoute(
101 self, "130.67.0.0", 16,
102 [VppRoutePath("0.0.0.0",
104 proto=FibPathProto.FIB_PATH_NH_PROTO_IP4)])
105 ip4_via_tunnel.add_vpp_config()
107 ip6_via_tunnel = VppIpRoute(
111 proto=FibPathProto.FIB_PATH_NH_PROTO_IP6)])
112 ip6_via_tunnel.add_vpp_config()
114 # IPv6 in to IPv4 tunnel
115 p6 = (p_ether / p_ip6 / p_payload)
117 p_inner_ip6.hlim -= 1
118 p6_reply = (IP(src=self.pg0.local_ip4, dst=self.pg1.remote_ip4,
119 proto='ipv6', id=0, tos=42) / p_inner_ip6 / p_payload)
121 rx = self.send_and_expect(self.pg0, p6 * 10, self.pg1)
123 self.validate(p[1], p6_reply)
125 # IPv4 in to IPv4 tunnel
126 p4 = (p_ether / p_ip4 / p_payload)
129 p4_reply = (IP(src=self.pg0.local_ip4, dst=self.pg1.remote_ip4,
131 p_ip4_inner / p_payload)
134 rx = self.send_and_expect(self.pg0, p4 * 10, self.pg1)
136 self.validate(p[1], p4_reply)
139 p_ether = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
141 # IPv4 tunnel to IPv4
142 p_ip4 = IP(src="1.2.3.4", dst=self.pg0.remote_ip4)
143 p4 = (p_ether / IP(src=self.pg1.remote_ip4,
144 dst=self.pg0.local_ip4) / p_ip4 / p_payload)
145 p4_reply = (p_ip4 / p_payload)
147 rx = self.send_and_expect(self.pg1, p4 * 10, self.pg0)
149 self.validate(p[1], p4_reply)
151 err = self.statistics.get_err_counter(
152 '/err/ipip4-input/packets decapsulated')
153 self.assertEqual(err, 10)
155 # IPv4 tunnel to IPv6
156 p_ip6 = IPv6(src="1:2:3::4", dst=self.pg0.remote_ip6)
157 p6 = (p_ether / IP(src=self.pg1.remote_ip4,
158 dst=self.pg0.local_ip4) / p_ip6 / p_payload)
159 p6_reply = (p_ip6 / p_payload)
161 rx = self.send_and_expect(self.pg1, p6 * 10, self.pg0)
163 self.validate(p[1], p6_reply)
165 err = self.statistics.get_err_counter(
166 '/err/ipip4-input/packets decapsulated')
167 self.assertEqual(err, 20)
170 # Fragmentation / Reassembly and Re-fragmentation
172 rv = self.vapi.ip_reassembly_enable_disable(
173 sw_if_index=self.pg1.sw_if_index,
176 self.vapi.ip_reassembly_set(timeout_ms=1000, max_reassemblies=1000,
177 max_reassembly_length=1000,
178 expire_walk_interval_ms=10000,
181 # Send lots of fragments, verify reassembled packet
182 frags, p4_reply = self.generate_ip4_frags(3131, 1400)
184 for i in range(0, 1000):
186 self.pg1.add_stream(f)
187 self.pg_enable_capture()
189 rx = self.pg0.get_capture(1000)
192 self.validate(p[1], p4_reply)
194 err = self.statistics.get_err_counter(
195 '/err/ipip4-input/packets decapsulated')
196 self.assertEqual(err, 1020)
200 for i in range(1, 90):
201 frags, p4_reply = self.generate_ip4_frags(i * 100, 1000)
204 self.pg_enable_capture()
205 self.pg1.add_stream(f)
207 rx = self.pg0.get_capture(89)
210 self.validate(p[1], r[i])
213 # Now try with re-fragmentation
215 # Send fragments to tunnel head-end, for the tunnel head end
216 # to reassemble and then refragment
218 self.vapi.sw_interface_set_mtu(self.pg0.sw_if_index, [576, 0, 0, 0])
219 frags, p4_reply = self.generate_ip4_frags(3123, 1200)
220 self.pg_enable_capture()
221 self.pg1.add_stream(frags)
223 rx = self.pg0.get_capture(6)
224 reass_pkt = reassemble4(rx)
227 self.validate(reass_pkt, p4_reply)
229 self.vapi.sw_interface_set_mtu(self.pg0.sw_if_index, [1600, 0, 0, 0])
230 frags, p4_reply = self.generate_ip4_frags(3123, 1200)
231 self.pg_enable_capture()
232 self.pg1.add_stream(frags)
234 rx = self.pg0.get_capture(2)
235 reass_pkt = reassemble4(rx)
238 self.validate(reass_pkt, p4_reply)
240 def test_ipip_create(self):
241 """ ipip create / delete interface test """
242 rv = ipip_add_tunnel(self, '1.2.3.4', '2.3.4.5')
243 sw_if_index = rv.sw_if_index
244 self.vapi.ipip_del_tunnel(sw_if_index)
246 def test_ipip_vrf_create(self):
247 """ ipip create / delete interface VRF test """
249 t = VppIpTable(self, 20)
251 rv = ipip_add_tunnel(self, '1.2.3.4', '2.3.4.5', table_id=20)
252 sw_if_index = rv.sw_if_index
253 self.vapi.ipip_del_tunnel(sw_if_index)
255 def payload(self, len):
259 class TestIPIP6(VppTestCase):
260 """ IPIP6 Test Case """
264 super(TestIPIP6, cls).setUpClass()
265 cls.create_pg_interfaces(range(2))
266 cls.interfaces = list(cls.pg_interfaces)
269 def tearDownClass(cls):
270 super(TestIPIP6, cls).tearDownClass()
273 super(TestIPIP6, self).setUp()
274 for i in self.interfaces:
284 if not self.vpp_dead:
285 self.destroy_tunnel()
286 for i in self.pg_interfaces:
290 super(TestIPIP6, self).tearDown()
292 def setup_tunnel(self):
294 rv = ipip_add_tunnel(self,
299 sw_if_index = rv.sw_if_index
300 self.tunnel_if_index = sw_if_index
301 self.vapi.sw_interface_set_flags(sw_if_index, 1)
302 self.vapi.sw_interface_set_unnumbered(
303 sw_if_index=self.pg0.sw_if_index,
304 unnumbered_sw_if_index=sw_if_index)
306 # Add IPv4 and IPv6 routes via tunnel interface
307 ip4_via_tunnel = VppIpRoute(
308 self, "130.67.0.0", 16,
309 [VppRoutePath("0.0.0.0",
311 proto=FibPathProto.FIB_PATH_NH_PROTO_IP4)])
312 ip4_via_tunnel.add_vpp_config()
314 ip6_via_tunnel = VppIpRoute(
318 proto=FibPathProto.FIB_PATH_NH_PROTO_IP6)])
319 ip6_via_tunnel.add_vpp_config()
321 self.tunnel_ip6_via_tunnel = ip6_via_tunnel
322 self.tunnel_ip4_via_tunnel = ip4_via_tunnel
324 def destroy_tunnel(self):
326 self.tunnel_ip4_via_tunnel.remove_vpp_config()
327 self.tunnel_ip6_via_tunnel.remove_vpp_config()
329 rv = self.vapi.ipip_del_tunnel(sw_if_index=self.tunnel_if_index)
331 def validate(self, rx, expected):
332 self.assertEqual(rx, expected.__class__(expected))
334 def generate_ip6_frags(self, payload_length, fragment_size):
335 p_ether = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
336 p_payload = UDP(sport=1234, dport=1234) / self.payload(payload_length)
337 p_ip6 = IPv6(src="1::1", dst=self.pg0.remote_ip6)
338 outer_ip6 = (p_ether / IPv6(src=self.pg1.remote_ip6,
339 dst=self.pg0.local_ip6) /
340 IPv6ExtHdrFragment() / p_ip6 / p_payload)
341 frags = fragment6(outer_ip6, fragment_size)
342 p6_reply = (p_ip6 / p_payload)
344 return frags, p6_reply
346 def generate_ip6_hairpin_frags(self, payload_length, fragment_size):
347 p_ether = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
348 p_payload = UDP(sport=1234, dport=1234) / self.payload(payload_length)
349 p_ip6 = IPv6(src="1::1", dst="dead::1")
350 outer_ip6 = (p_ether / IPv6(src=self.pg1.remote_ip6,
351 dst=self.pg0.local_ip6) /
352 IPv6ExtHdrFragment() / p_ip6 / p_payload)
353 frags = fragment6(outer_ip6, fragment_size)
355 p6_reply = (IPv6(src=self.pg0.local_ip6, dst=self.pg1.remote_ip6,
356 hlim=63) / p_ip6 / p_payload)
358 return frags, p6_reply
360 def test_encap(self):
361 """ ip{v4,v6} over ip6 test encap """
362 p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
363 p_ip6 = IPv6(src="1::1", dst="DEAD::1", tc=42, nh='UDP')
364 p_ip4 = IP(src="1.2.3.4", dst="130.67.0.1", tos=42)
365 p_payload = UDP(sport=1234, dport=1234)
368 # IPv6 in to IPv6 tunnel
369 p6 = (p_ether / p_ip6 / p_payload)
370 p6_reply = (IPv6(src=self.pg0.local_ip6, dst=self.pg1.remote_ip6,
373 p6_reply[1].hlim -= 1
374 rx = self.send_and_expect(self.pg0, p6 * 11, self.pg1)
376 self.validate(p[1], p6_reply)
378 # IPv4 in to IPv6 tunnel
379 p4 = (p_ether / p_ip4 / p_payload)
380 p4_reply = (IPv6(src=self.pg0.local_ip6,
381 dst=self.pg1.remote_ip6, hlim=64, tc=42) /
384 rx = self.send_and_expect(self.pg0, p4 * 11, self.pg1)
386 self.validate(p[1], p4_reply)
388 def test_decap(self):
389 """ ip{v4,v6} over ip6 test decap """
391 p_ether = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
392 p_ip6 = IPv6(src="1::1", dst="DEAD::1", tc=42, nh='UDP')
393 p_ip4 = IP(src="1.2.3.4", dst=self.pg0.remote_ip4)
394 p_payload = UDP(sport=1234, dport=1234)
397 # IPv6 tunnel to IPv4
399 p4 = (p_ether / IPv6(src=self.pg1.remote_ip6,
400 dst=self.pg0.local_ip6) / p_ip4 / p_payload)
401 p4_reply = (p_ip4 / p_payload)
403 rx = self.send_and_expect(self.pg1, p4 * 11, self.pg0)
405 self.validate(p[1], p4_reply)
407 # IPv6 tunnel to IPv6
408 p_ip6 = IPv6(src="1:2:3::4", dst=self.pg0.remote_ip6)
409 p6 = (p_ether / IPv6(src=self.pg1.remote_ip6,
410 dst=self.pg0.local_ip6) / p_ip6 / p_payload)
411 p6_reply = (p_ip6 / p_payload)
413 rx = self.send_and_expect(self.pg1, p6 * 11, self.pg0)
415 self.validate(p[1], p6_reply)
418 """ ip{v4,v6} over ip6 test frag """
420 p_ether = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
421 p_ip6 = IPv6(src="1::1", dst="DEAD::1", tc=42, nh='UDP')
422 p_ip4 = IP(src="1.2.3.4", dst=self.pg0.remote_ip4)
423 p_payload = UDP(sport=1234, dport=1234)
426 # Fragmentation / Reassembly and Re-fragmentation
428 rv = self.vapi.ip_reassembly_enable_disable(
429 sw_if_index=self.pg1.sw_if_index,
432 self.vapi.ip_reassembly_set(timeout_ms=1000, max_reassemblies=1000,
433 max_reassembly_length=1000,
434 expire_walk_interval_ms=10000,
437 # Send lots of fragments, verify reassembled packet
438 before_cnt = self.statistics.get_err_counter(
439 '/err/ipip6-input/packets decapsulated')
440 frags, p6_reply = self.generate_ip6_frags(3131, 1400)
442 for i in range(0, 1000):
444 self.pg1.add_stream(f)
445 self.pg_enable_capture()
447 rx = self.pg0.get_capture(1000)
450 self.validate(p[1], p6_reply)
452 cnt = self.statistics.get_err_counter(
453 '/err/ipip6-input/packets decapsulated')
454 self.assertEqual(cnt, before_cnt + 1000)
458 # TODO: Check out why reassembly of atomic fragments don't work
459 for i in range(10, 90):
460 frags, p6_reply = self.generate_ip6_frags(i * 100, 1000)
463 self.pg_enable_capture()
464 self.pg1.add_stream(f)
466 rx = self.pg0.get_capture(80)
469 self.validate(p[1], r[i])
472 # Simple fragmentation
473 p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
474 self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [1280, 0, 0, 0])
476 # IPv6 in to IPv6 tunnel
477 p_payload = UDP(sport=1234, dport=1234) / self.payload(1300)
479 p6 = (p_ether / p_ip6 / p_payload)
480 p6_reply = (IPv6(src=self.pg0.local_ip6, dst=self.pg1.remote_ip6,
483 p6_reply[1].hlim -= 1
484 self.pg_enable_capture()
485 self.pg0.add_stream(p6)
487 rx = self.pg1.get_capture(2)
489 # Scapy defragment doesn't deal well with multiple layers
490 # of same type / Ethernet header first
491 f = [p[1] for p in rx]
492 reass_pkt = defragment6(f)
493 self.validate(reass_pkt, p6_reply)
495 # Now try with re-fragmentation
497 # Send large fragments to tunnel head-end, for the tunnel head end
498 # to reassemble and then refragment out the tunnel again.
501 self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [1280, 0, 0, 0])
502 frags, p6_reply = self.generate_ip6_hairpin_frags(8000, 1200)
503 self.pg_enable_capture()
504 self.pg1.add_stream(frags)
506 rx = self.pg1.get_capture(7)
507 f = [p[1] for p in rx]
508 reass_pkt = defragment6(f)
510 self.validate(reass_pkt, p6_reply)
512 def test_ipip_create(self):
513 """ ipip create / delete interface test """
514 rv = ipip_add_tunnel(self, '1.2.3.4', '2.3.4.5')
515 sw_if_index = rv.sw_if_index
516 self.vapi.ipip_del_tunnel(sw_if_index)
518 def test_ipip_vrf_create(self):
519 """ ipip create / delete interface VRF test """
521 t = VppIpTable(self, 20)
523 rv = ipip_add_tunnel(self, '1.2.3.4', '2.3.4.5', table_id=20)
524 sw_if_index = rv.sw_if_index
525 self.vapi.ipip_del_tunnel(sw_if_index)
527 def payload(self, len):
531 if __name__ == '__main__':
532 unittest.main(testRunner=VppTestRunner)