tests: Add support for getting corefile patterns on FreeBSD
[vpp.git] / test / test_vxlan.py
1 #!/usr/bin/env python3
2
3 import socket
4 from util import ip4_range, reassemble4
5 import unittest
6 from framework import VppTestCase
7 from asfframework import VppTestRunner
8 from template_bd import BridgeDomain
9
10 from scapy.layers.l2 import Ether
11 from scapy.layers.l2 import ARP
12 from scapy.packet import Raw, bind_layers
13 from scapy.layers.inet import IP, UDP
14 from scapy.layers.vxlan import VXLAN
15
16 import util
17 from vpp_ip_route import VppIpRoute, VppRoutePath
18 from vpp_vxlan_tunnel import VppVxlanTunnel
19 from vpp_ip import INVALID_INDEX
20 from vpp_neighbor import VppNeighbor
21
22
class TestVxlan(BridgeDomain, VppTestCase):
    """VXLAN Test Case

    Runs the bridge-domain tests inherited from BridgeDomain over VXLAN
    tunnels terminated on pg0, once on the default UDP port (4789) and once
    on a custom port (1111).
    """

    def __init__(self, *args):
        # Both bases are initialised explicitly; BridgeDomain takes no
        # arguments while VppTestCase receives the unittest arguments.
        BridgeDomain.__init__(self)
        VppTestCase.__init__(self, *args)

    def encapsulate(self, pkt, vni):
        """
        Encapsulate the original payload frame by adding VXLAN header with its
        UDP, IP and Ethernet fields.

        The outer headers go from the pg0 remote (test) side to the pg0 local
        (VPP) side, using ``self.dport`` for both UDP ports.

        :param pkt: inner frame to encapsulate
        :param vni: VXLAN network identifier to put in the header
        :returns: the encapsulated scapy packet
        """
        return (
            Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
            / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
            / UDP(sport=self.dport, dport=self.dport, chksum=0)
            / VXLAN(vni=vni, flags=self.flags)
            / pkt
        )

    def ip_range(self, start, end):
        """range of remote ip's"""
        return ip4_range(self.pg0.remote_ip4, start, end)

    def encap_mcast(self, pkt, src_ip, src_mac, vni):
        """
        Encapsulate the original payload frame by adding VXLAN header with its
        UDP, IP and Ethernet fields, addressed to the test multicast group.

        :param pkt: inner frame to encapsulate
        :param src_ip: outer IPv4 source address
        :param src_mac: outer Ethernet source address
        :param vni: VXLAN network identifier to put in the header
        :returns: the encapsulated scapy packet
        """
        return (
            Ether(src=src_mac, dst=self.mcast_mac)
            / IP(src=src_ip, dst=self.mcast_ip4)
            / UDP(sport=self.dport, dport=self.dport, chksum=0)
            / VXLAN(vni=vni, flags=self.flags)
            / pkt
        )

    def decapsulate(self, pkt):
        """
        Decapsulate the original payload frame by removing VXLAN header.

        Asserts that the VXLAN flags equal 0x8 (the I flag) before returning
        the inner payload.
        """
        # check if is set I flag
        self.assertEqual(pkt[VXLAN].flags, 0x8)
        return pkt[VXLAN].payload

    # Method for checking VXLAN encapsulation.
    #
    def check_encapsulation(self, pkt, vni, local_only=False, mcast_pkt=False):
        """Assert that ``pkt`` is a correctly VXLAN-encapsulated frame.

        :param pkt: captured packet to verify
        :param vni: expected VXLAN network identifier
        :param local_only: when True only the source (VPP side) MAC/IP are
            checked and the destination checks are skipped
        :param mcast_pkt: when True the destination MAC/IP are compared with
            the class multicast MAC/IP instead of the pg0 remote ones
        """
        # TODO: add error messages
        # Verify source MAC is VPP_MAC and destination MAC is MY_MAC resolved
        #  by VPP using ARP.
        self.assertEqual(pkt[Ether].src, self.pg0.local_mac)
        if not local_only:
            if not mcast_pkt:
                self.assertEqual(pkt[Ether].dst, self.pg0.remote_mac)
            else:
                self.assertEqual(pkt[Ether].dst, type(self).mcast_mac)
        # Verify VXLAN tunnel source IP is VPP_IP and destination IP is MY_IP.
        self.assertEqual(pkt[IP].src, self.pg0.local_ip4)
        if not local_only:
            if not mcast_pkt:
                self.assertEqual(pkt[IP].dst, self.pg0.remote_ip4)
            else:
                self.assertEqual(pkt[IP].dst, type(self).mcast_ip4)
        # Verify UDP destination port is the configured VXLAN port; the
        #  source UDP port could be arbitrary.
        self.assertEqual(pkt[UDP].dport, self.dport)
        # Verify UDP checksum
        self.assert_udp_checksum_valid(pkt)
        # Verify VNI
        self.assertEqual(pkt[VXLAN].vni, vni)

    @classmethod
    def create_vxlan_flood_test_bd(cls, vni, n_ucast_tunnels, port):
        """Create ``n_ucast_tunnels`` unicast VXLAN tunnels in bridge domain
        ``vni``.

        :param vni: used both as the tunnel VNI and as the bridge-domain id
        :param n_ucast_tunnels: number of unicast tunnels to create
        :param port: UDP port used as both source and destination port
        """
        # Tunnel destinations are taken from the pg0 remote address range,
        # starting at offset 10.
        ip_range_start = 10
        ip_range_end = ip_range_start + n_ucast_tunnels
        next_hop_address = cls.pg0.remote_ip4
        for dest_ip4 in ip4_range(next_hop_address, ip_range_start, ip_range_end):
            # add host route so dest_ip4 will not be resolved
            rip = VppIpRoute(
                cls,
                dest_ip4,
                32,
                [VppRoutePath(next_hop_address, INVALID_INDEX)],
                register=False,
            )
            rip.add_vpp_config()

            r = VppVxlanTunnel(
                cls,
                src=cls.pg0.local_ip4,
                src_port=port,
                dst_port=port,
                dst=dest_ip4,
                vni=vni,
            )
            r.add_vpp_config()
            cls.vapi.sw_interface_set_l2_bridge(r.sw_if_index, bd_id=vni)

    @classmethod
    def add_del_shared_mcast_dst_load(cls, port, is_add):
        """
        add or del tunnels sharing the same mcast dst
        to test vxlan ref_count mechanism

        :param port: UDP port used as both source and destination port
        :param is_add: truthy to add the tunnels, falsy to remove them
        :raises ValueError: if an added tunnel reports sw_if_index ~0
        """
        n_shared_dst_tunnels = 20
        vni_start = 10000
        vni_end = vni_start + n_shared_dst_tunnels
        for vni in range(vni_start, vni_end):
            # NOTE(review): mcast_sw_if_index is hard-coded to 1, presumably
            # the sw_if_index of pg0 - confirm.
            r = VppVxlanTunnel(
                cls,
                src=cls.pg0.local_ip4,
                src_port=port,
                dst_port=port,
                dst=cls.mcast_ip4,
                mcast_sw_if_index=1,
                vni=vni,
            )
            if is_add:
                r.add_vpp_config()
                if r.sw_if_index == 0xFFFFFFFF:
                    raise ValueError("bad sw_if_index: ~0")
            else:
                r.remove_vpp_config()

    @classmethod
    def add_shared_mcast_dst_load(cls, port):
        """Add the tunnels that share one multicast destination."""
        cls.add_del_shared_mcast_dst_load(port=port, is_add=True)

    @classmethod
    def del_shared_mcast_dst_load(cls, port):
        """Remove the tunnels that share one multicast destination."""
        cls.add_del_shared_mcast_dst_load(port=port, is_add=False)

    @classmethod
    def add_del_mcast_tunnels_load(cls, port, is_add):
        """
        add or del tunnels to test vxlan stability

        One tunnel is created per multicast destination; the VNI of each
        tunnel is the last octet of its destination address.

        :param port: UDP port used as both source and destination port
        :param is_add: truthy to add the tunnels, falsy to remove them
        """
        n_distinct_dst_tunnels = 200
        ip_range_start = 10
        ip_range_end = ip_range_start + n_distinct_dst_tunnels
        for dest_ip4 in ip4_range(cls.mcast_ip4, ip_range_start, ip_range_end):
            # Indexing bytes yields an int in Python 3, no bytearray needed.
            vni = socket.inet_pton(socket.AF_INET, dest_ip4)[3]
            # NOTE(review): mcast_sw_if_index is hard-coded to 1, presumably
            # the sw_if_index of pg0 - confirm.
            r = VppVxlanTunnel(
                cls,
                src=cls.pg0.local_ip4,
                src_port=port,
                dst_port=port,
                dst=dest_ip4,
                mcast_sw_if_index=1,
                vni=vni,
            )
            if is_add:
                r.add_vpp_config()
            else:
                r.remove_vpp_config()

    @classmethod
    def add_mcast_tunnels_load(cls, port):
        """Add the distinct-destination multicast tunnels."""
        cls.add_del_mcast_tunnels_load(port=port, is_add=True)

    @classmethod
    def del_mcast_tunnels_load(cls, port):
        """Remove the distinct-destination multicast tunnels."""
        cls.add_del_mcast_tunnels_load(port=port, is_add=False)

    # Class method to start the VXLAN test case.
    #  Overrides setUpClass method in VppTestCase class.
    #  Python try..except statement is used to ensure that the tear down of
    #  the class will be executed even if exception is raised.
    #  @param cls The class pointer.
    @classmethod
    def setUpClass(cls):
        super(TestVxlan, cls).setUpClass()

        try:
            # VXLAN header flags used when building packets (0x8 = I flag).
            cls.flags = 0x8

            # Create 4 pg interfaces (pg0..pg3) and bring them up.
            cls.create_pg_interfaces(range(4))
            for pg in cls.pg_interfaces:
                pg.admin_up()

            # Configure IPv4 addresses on VPP pg0.
            cls.pg0.config_ip4()

            # Resolve MAC address for VPP's IP address on pg0.
            cls.pg0.resolve_arp()

            # Our Multicast address
            cls.mcast_ip4 = "239.1.1.1"
            cls.mcast_mac = util.mcast_ip_to_mac(cls.mcast_ip4)
        except Exception:
            cls.tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        super(TestVxlan, cls).tearDownClass()

    def setUp(self):
        super(TestVxlan, self).setUp()

    def createVxLANInterfaces(self, port=4789):
        """Build the tunnels and bridge domains used by the tests.

        Sets up bridge domain 1 (single unicast tunnel + pg1), bridge domain
        2 (unicast tunnels, one multicast tunnel + pg2) and bridge domain 3
        (unicast tunnels + pg3), and binds scapy's VXLAN layer to ``port``.

        :param port: UDP port used for every tunnel (source and destination)
        """
        # Create VXLAN VTEP on VPP pg0, and put vxlan_tunnel0 and pg1
        #  into BD.
        self.dport = port

        self.single_tunnel_vni = 0x12345
        self.single_tunnel_bd = 1
        r = VppVxlanTunnel(
            self,
            src=self.pg0.local_ip4,
            dst=self.pg0.remote_ip4,
            src_port=self.dport,
            dst_port=self.dport,
            vni=self.single_tunnel_vni,
        )
        r.add_vpp_config()
        self.vapi.sw_interface_set_l2_bridge(
            rx_sw_if_index=r.sw_if_index, bd_id=self.single_tunnel_bd
        )
        self.vapi.sw_interface_set_l2_bridge(
            rx_sw_if_index=self.pg1.sw_if_index, bd_id=self.single_tunnel_bd
        )

        # Setup vni 2 to test multicast flooding
        self.n_ucast_tunnels = 10
        self.mcast_flood_bd = 2
        self.create_vxlan_flood_test_bd(
            self.mcast_flood_bd, self.n_ucast_tunnels, self.dport
        )
        r = VppVxlanTunnel(
            self,
            src=self.pg0.local_ip4,
            dst=self.mcast_ip4,
            src_port=self.dport,
            dst_port=self.dport,
            mcast_sw_if_index=1,
            vni=self.mcast_flood_bd,
        )
        r.add_vpp_config()
        self.vapi.sw_interface_set_l2_bridge(
            rx_sw_if_index=r.sw_if_index, bd_id=self.mcast_flood_bd
        )
        self.vapi.sw_interface_set_l2_bridge(
            rx_sw_if_index=self.pg2.sw_if_index, bd_id=self.mcast_flood_bd
        )

        # Add and delete mcast tunnels to check stability
        self.add_shared_mcast_dst_load(self.dport)
        self.add_mcast_tunnels_load(self.dport)
        self.del_shared_mcast_dst_load(self.dport)
        self.del_mcast_tunnels_load(self.dport)

        # Setup vni 3 to test unicast flooding
        self.ucast_flood_bd = 3
        self.create_vxlan_flood_test_bd(
            self.ucast_flood_bd, self.n_ucast_tunnels, self.dport
        )
        self.vapi.sw_interface_set_l2_bridge(
            rx_sw_if_index=self.pg3.sw_if_index, bd_id=self.ucast_flood_bd
        )

        # Set scapy listen custom port for VxLAN
        bind_layers(UDP, VXLAN, dport=self.dport)

    def encap_big_packet(self):
        """Send one large frame from pg1 and verify its encapsulation on pg0.

        The pg0 MTU is set to 1500 and a frame with a 1450-byte payload is
        sent, so two frames are expected on pg0; they are reassembled before
        the encapsulation is checked.
        """
        self.vapi.sw_interface_set_mtu(self.pg0.sw_if_index, [1500, 0, 0, 0])

        frame = (
            Ether(src="00:00:00:00:00:02", dst="00:00:00:00:00:01")
            / IP(src="4.3.2.1", dst="1.2.3.4")
            / UDP(sport=20000, dport=10000)
            / Raw(b"\xa5" * 1450)
        )

        self.pg1.add_stream([frame])

        self.pg0.enable_capture()

        self.pg_start()

        # Capture both frames, reassemble them and put the Ethernet header
        # of the first captured frame back in front of the result.
        out = self.pg0.get_capture(2)
        ether = out[0]
        pkt = reassemble4(out)
        pkt = ether / pkt
        self.check_encapsulation(pkt, self.single_tunnel_vni)

        # Called for its flag assertion; the returned payload is not compared.
        # TODO: Scapy bug? Comparing the decapsulated payload with `frame`
        # via assert_eq_pkts is disabled.
        self.decapsulate(pkt)

    #
    # Tests with default port (4789)
    #

    def test_decap(self):
        """Decapsulation test
        from BridgeDomain
        """
        self.createVxLANInterfaces()
        super(TestVxlan, self).test_decap()

    def test_encap(self):
        """Encapsulation test
        from BridgeDomain
        """
        self.createVxLANInterfaces()
        super(TestVxlan, self).test_encap()

    def test_encap_big_packet(self):
        """Encapsulation test send big frame from pg1
        Verify receipt of encapsulated frames on pg0
        """
        self.createVxLANInterfaces()
        self.encap_big_packet()

    def test_ucast_flood(self):
        """Unicast flood test
        from BridgeDomain
        """
        self.createVxLANInterfaces()
        super(TestVxlan, self).test_ucast_flood()

    def test_mcast_flood(self):
        """Multicast flood test
        from BridgeDomain
        """
        self.createVxLANInterfaces()
        super(TestVxlan, self).test_mcast_flood()

    def test_mcast_rcv(self):
        """Multicast receive test
        from BridgeDomain
        """
        self.createVxLANInterfaces()
        super(TestVxlan, self).test_mcast_rcv()

    #
    # Tests with custom port
    #

    def test_decap_custom_port(self):
        """Decapsulation test custom port
        from BridgeDomain
        """
        self.createVxLANInterfaces(1111)
        super(TestVxlan, self).test_decap()

    def test_encap_custom_port(self):
        """Encapsulation test custom port
        from BridgeDomain
        """
        self.createVxLANInterfaces(1111)
        super(TestVxlan, self).test_encap()

    def test_ucast_flood_custom_port(self):
        """Unicast flood test custom port
        from BridgeDomain
        """
        self.createVxLANInterfaces(1111)
        super(TestVxlan, self).test_ucast_flood()

    def test_mcast_flood_custom_port(self):
        """Multicast flood test custom port
        from BridgeDomain
        """
        self.createVxLANInterfaces(1111)
        super(TestVxlan, self).test_mcast_flood()

    def test_mcast_rcv_custom_port(self):
        """Multicast receive test custom port
        from BridgeDomain
        """
        self.createVxLANInterfaces(1111)
        super(TestVxlan, self).test_mcast_rcv()

    # Method to define VPP actions before tear down of the test case.
    #  Overrides tearDown method in VppTestCase class.
    #  @param self The object pointer.

    def tearDown(self):
        super(TestVxlan, self).tearDown()

    def show_commands_at_teardown(self):
        """Log the state of bridge domains 1-3 and of the VXLAN tunnels."""
        for bd_id in (1, 2, 3):
            self.logger.info(self.vapi.cli(f"show bridge-domain {bd_id} detail"))
        self.logger.info(self.vapi.cli("show vxlan tunnel"))
414
415
class TestVxlan2(VppTestCase):
    """VXLAN Test Case

    Checks that traffic sent through a VXLAN tunnel whose source address is
    not local to VPP produces no replies.
    """

    def setUp(self):
        super(TestVxlan2, self).setUp()

        # Create 4 pg interfaces (pg0..pg3) and bring them up.
        self.create_pg_interfaces(range(4))
        for pg in self.pg_interfaces:
            pg.admin_up()

        # Configure IPv4 addresses on VPP pg0.
        self.pg0.config_ip4()
        self.pg0.resolve_arp()

    def tearDown(self):
        super(TestVxlan2, self).tearDown()

    def test_xconnect(self):
        """VXLAN source address not local"""

        #
        # test the broken configuration of a VXLAN tunnel whose
        # source address is not local to the box. packets sent
        # through the tunnel should be dropped
        #
        t = VppVxlanTunnel(self, src="10.0.0.5", dst=self.pg0.local_ip4, vni=1000)
        t.add_vpp_config()
        t.admin_up()

        # Cross-connect the tunnel and pg1 in both directions.
        self.vapi.sw_interface_set_l2_xconnect(
            t.sw_if_index, self.pg1.sw_if_index, enable=1
        )
        self.vapi.sw_interface_set_l2_xconnect(
            self.pg1.sw_if_index, t.sw_if_index, enable=1
        )

        p = (
            Ether(src="00:11:22:33:44:55", dst="00:00:00:11:22:33")
            / IP(src="4.3.2.1", dst="1.2.3.4")
            / UDP(sport=20000, dport=10000)
            / Raw(b"\xa5" * 1450)
        )

        # The helper performs the assertion itself; nothing is returned
        # that this test needs.
        self.send_and_assert_no_replies(self.pg1, [p])
461
462
class TestVxlanL2Mode(VppTestCase):
    """VXLAN Test Case

    Exercises a VXLAN tunnel created with ``is_l3=False`` that carries an
    IPv4 address: first the ARP request for an unresolved destination, then
    forwarding once a neighbor entry has been added.
    """

    def setUp(self):
        super(TestVxlanL2Mode, self).setUp()

        # Create 2 pg interfaces.
        self.create_pg_interfaces(range(2))
        for pg in self.pg_interfaces:
            pg.admin_up()

        # Configure IPv4 addresses on VPP pg0.
        self.pg0.config_ip4()
        self.pg0.resolve_arp()

        # Configure IPv4 addresses on VPP pg1.
        self.pg1.config_ip4()

    def tearDown(self):
        super(TestVxlanL2Mode, self).tearDown()

    def test_l2_mode(self):
        """VXLAN L2 mode"""
        t = VppVxlanTunnel(
            self, src=self.pg0.local_ip4, dst=self.pg0.remote_ip4, vni=1000, is_l3=False
        )
        t.add_vpp_config()
        t.config_ip4()
        t.admin_up()

        # Destination is the tunnel's own address with its last character
        # replaced by "2".
        # NOTE(review): assumes the last octet of t.local_ip4 is a single
        # digit - confirm.
        dstIP = t.local_ip4[:-1] + "2"

        # Create a packet to send
        p = (
            Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
            / IP(src=self.pg1.local_ip4, dst=dstIP)
            / UDP(sport=555, dport=556)
            / Raw(b"\x00" * 80)
        )

        # Expect ARP request
        rx = self.send_and_expect(self.pg1, [p], self.pg0)
        # Use a dedicated loop variable: reusing `p` here would rebind it to
        # the last captured ARP frame, and the burst below would then resend
        # that frame instead of the UDP test packet.
        for arp_req in rx:
            self.assertEqual(arp_req[Ether].dst, self.pg0.remote_mac)
            self.assertEqual(arp_req[Ether].src, self.pg0.local_mac)
            self.assertEqual(arp_req[ARP].op, 1)
            self.assertEqual(arp_req[ARP].pdst, dstIP)

        # Resolve ARP
        VppNeighbor(self, t.sw_if_index, self.pg1.remote_mac, dstIP).add_vpp_config()

        # Send packets
        NUM_PKTS = 128
        rx = self.send_and_expect(self.pg1, p * NUM_PKTS, self.pg0)
        self.assertEqual(NUM_PKTS, len(rx))
518
519
# When executed directly as a script, run this module's tests through
# unittest using VppTestRunner as the test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)