4 from util import ip4_range, reassemble4
6 from framework import VppTestCase, VppTestRunner
7 from template_bd import BridgeDomain
9 from scapy.layers.l2 import Ether
10 from scapy.layers.l2 import ARP
11 from scapy.packet import Raw, bind_layers
12 from scapy.layers.inet import IP, UDP
13 from scapy.layers.vxlan import VXLAN
16 from vpp_ip_route import VppIpRoute, VppRoutePath
17 from vpp_vxlan_tunnel import VppVxlanTunnel
18 from vpp_ip import INVALID_INDEX
19 from vpp_neighbor import VppNeighbor
# VXLAN test case: mixes the BridgeDomain template tests into VppTestCase.
# NOTE(review): every line of this listing still carries the line number of
# the file it was extracted from, and the gaps in that numbering show that
# docstring delimiters, decorators and some statements are absent. The
# comments added below describe only what the visible lines do.
22 class TestVxlan(BridgeDomain, VppTestCase):
# Initialise both base classes explicitly: BridgeDomain without arguments,
# VppTestCase with the positional arguments given to this constructor.
25 def __init__(self, *args):
26 BridgeDomain.__init__(self)
27 VppTestCase.__init__(self, *args)
# Outer headers of a unicast VXLAN frame addressed from pg0's remote side to
# pg0's local side: source and destination UDP port are both self.dport, the
# UDP checksum field is forced to 0 and the VXLAN header carries `vni` and
# self.flags.
# NOTE(review): the statement wrapping this header chain (and whatever is
# stacked after the VXLAN header) is not visible here - confirm in the full
# file.
29 def encapsulate(self, pkt, vni):
31 Encapsulate the original payload frame by adding VXLAN header with its
32 UDP, IP and Ethernet fields
35 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
36 / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
37 / UDP(sport=self.dport, dport=self.dport, chksum=0)
38 / VXLAN(vni=vni, flags=self.flags)
# Thin wrapper: passes pg0's remote IPv4 address plus `start` / `end` to
# ip4_range and returns its result unchanged.
42 def ip_range(self, start, end):
43 """range of remote ip's"""
44 return ip4_range(self.pg0.remote_ip4, start, end)
# Multicast counterpart of encapsulate(): the caller supplies the outer source
# MAC / IP, the outer destination is self.mcast_mac / self.mcast_ip4.
# NOTE(review): as in encapsulate(), the statement wrapping the header chain
# is not visible here.
46 def encap_mcast(self, pkt, src_ip, src_mac, vni):
48 Encapsulate the original payload frame by adding VXLAN header with its
49 UDP, IP and Ethernet fields
52 Ether(src=src_mac, dst=self.mcast_mac)
53 / IP(src=src_ip, dst=self.mcast_ip4)
54 / UDP(sport=self.dport, dport=self.dport, chksum=0)
55 / VXLAN(vni=vni, flags=self.flags)
# Asserts that the VXLAN flags field equals 0x8, then returns the payload
# that follows the VXLAN header of `pkt`.
59 def decapsulate(self, pkt):
61 Decapsulate the original payload frame by removing VXLAN header
63 # check that the I flag is set
64 self.assertEqual(pkt[VXLAN].flags, int("0x8", 16))
65 return pkt[VXLAN].payload
67 # Method for checking VXLAN encapsulation.
# Always-visible checks: outer source MAC / IP are pg0's local ones, the UDP
# destination port equals self.dport, the UDP checksum is valid and the VNI
# equals `vni`.
# NOTE(review): for the destination MAC and IP both a unicast (pg0 remote)
# and a multicast (class-level mcast_mac / mcast_ip4) assertion appear; the
# conditions choosing between them - presumably driven by local_only and
# mcast_pkt - are not visible in this listing.
69 def check_encapsulation(self, pkt, vni, local_only=False, mcast_pkt=False):
70 # TODO: add error messages
71 # Verify source MAC is VPP_MAC and destination MAC is MY_MAC resolved
73 self.assertEqual(pkt[Ether].src, self.pg0.local_mac)
76 self.assertEqual(pkt[Ether].dst, self.pg0.remote_mac)
78 self.assertEqual(pkt[Ether].dst, type(self).mcast_mac)
79 # Verify VXLAN tunnel source IP is VPP_IP and destination IP is MY_IP.
80 self.assertEqual(pkt[IP].src, self.pg0.local_ip4)
83 self.assertEqual(pkt[IP].dst, self.pg0.remote_ip4)
85 self.assertEqual(pkt[IP].dst, type(self).mcast_ip4)
86 # Verify UDP destination port is VXLAN 4789, source UDP port could be
88 self.assertEqual(pkt[UDP].dport, self.dport)
90 self.assert_udp_checksum_valid(pkt)
92 self.assertEqual(pkt[VXLAN].vni, vni)
# Iterates over n_ucast_tunnels addresses produced by ip4_range from pg0's
# remote IPv4 address; per address the visible fragments set up a route via
# that same remote address and attach `r.sw_if_index` to the bridge domain
# whose id is `vni` (callers pass their bridge-domain number as `vni`).
# NOTE(review): takes `cls`, so presumably decorated as a classmethod; the
# decorator, the value of ip_range_start and the statements creating the
# route and `r` are not visible here.
95 def create_vxlan_flood_test_bd(cls, vni, n_ucast_tunnels, port):
96 # Create n_ucast_tunnels ucast vxlan tunnels under bd
98 ip_range_end = ip_range_start + n_ucast_tunnels
99 next_hop_address = cls.pg0.remote_ip4
100 for dest_ip4 in ip4_range(next_hop_address, ip_range_start, ip_range_end):
101 # add host route so dest_ip4 will not be resolved
106 [VppRoutePath(next_hop_address, INVALID_INDEX)],
113 src=cls.pg0.local_ip4,
120 cls.vapi.sw_interface_set_l2_bridge(r.sw_if_index, bd_id=vni)
# Walks 20 consecutive VNIs starting at vni_start. Raises ValueError when
# `r.sw_if_index` is 0xFFFFFFFF (~0); otherwise the visible path ends in
# r.remove_vpp_config().
# NOTE(review): how `is_add` selects between adding and removing, the value
# of vni_start and the construction of `r` are not visible here.
123 def add_del_shared_mcast_dst_load(cls, port, is_add):
125 add or del tunnels sharing the same mcast dst
126 to test vxlan ref_count mechanism
128 n_shared_dst_tunnels = 20
130 vni_end = vni_start + n_shared_dst_tunnels
131 for vni in range(vni_start, vni_end):
134 src=cls.pg0.local_ip4,
143 if r.sw_if_index == 0xFFFFFFFF:
144 raise ValueError("bad sw_if_index: ~0")
146 r.remove_vpp_config()
# Convenience wrapper: add_del_shared_mcast_dst_load with is_add=1.
149 def add_shared_mcast_dst_load(cls, port):
150 cls.add_del_shared_mcast_dst_load(port=port, is_add=1)
# Convenience wrapper: add_del_shared_mcast_dst_load with is_add=0.
153 def del_shared_mcast_dst_load(cls, port):
154 cls.add_del_shared_mcast_dst_load(port=port, is_add=0)
# Walks 200 addresses produced by ip4_range from cls.mcast_ip4; the VNI used
# for each one is the last byte of that IPv4 address.
# NOTE(review): the value of ip_range_start, the construction of `r` and the
# use of `is_add` are not visible here.
157 def add_del_mcast_tunnels_load(cls, port, is_add):
159 add or del tunnels to test vxlan stability
161 n_distinct_dst_tunnels = 200
163 ip_range_end = ip_range_start + n_distinct_dst_tunnels
164 for dest_ip4 in ip4_range(cls.mcast_ip4, ip_range_start, ip_range_end):
165 vni = bytearray(socket.inet_pton(socket.AF_INET, dest_ip4))[3]
168 src=cls.pg0.local_ip4,
178 r.remove_vpp_config()
# Convenience wrapper: add_del_mcast_tunnels_load with is_add=1.
181 def add_mcast_tunnels_load(cls, port):
182 cls.add_del_mcast_tunnels_load(port=port, is_add=1)
# Convenience wrapper: add_del_mcast_tunnels_load with is_add=0.
185 def del_mcast_tunnels_load(cls, port):
186 cls.add_del_mcast_tunnels_load(port=port, is_add=0)
188 # Class method to start the VXLAN test case.
189 # Overrides setUpClass method in VppTestCase class.
190 # Python try..except statement is used to ensure that the tear down of
191 # the class will be executed even if exception is raised.
192 # @param cls The class pointer.
# NOTE(review): the def line of this class-level setup and the try/except
# mentioned above are not visible in this listing. The visible statements
# create pg0..pg3, resolve ARP on pg0 and store the multicast group address
# "239.1.1.1" together with the MAC that util.mcast_ip_to_mac derives from it.
195 super(TestVxlan, cls).setUpClass()
200 # Create 4 pg interfaces.
201 cls.create_pg_interfaces(range(4))
202 for pg in cls.pg_interfaces:
205 # Configure IPv4 addresses on VPP pg0.
208 # Resolve MAC address for VPP's IP address on pg0.
209 cls.pg0.resolve_arp()
211 # Our Multicast address
212 cls.mcast_ip4 = "239.1.1.1"
213 cls.mcast_mac = util.mcast_ip_to_mac(cls.mcast_ip4)
# Class-level teardown: only defers to the parent implementation.
219 def tearDownClass(cls):
220 super(TestVxlan, cls).tearDownClass()
# Per-test setup body (its def line is not visible here): only defers to the
# parent implementation.
223 super(TestVxlan, self).setUp()
# Builds the topology shared by all tests of this class:
#   bridge domain 1 - a tunnel with VNI 0x12345 (pg0 local -> pg0 remote)
#                     plus pg1,
#   bridge domain 2 - the flood tunnels from create_vxlan_flood_test_bd, one
#                     more tunnel with VNI 2, plus pg2,
#   bridge domain 3 - another set of flood tunnels plus pg3.
# Between steps 2 and 3 the shared-destination and distinct-destination
# multicast tunnel loads are added and removed again.
# NOTE(review): self.dport is read throughout but its assignment is not
# visible - presumably it is set from `port`; the tunnel constructor calls
# binding `r` are not visible either.
225 def createVxLANInterfaces(self, port=4789):
226 # Create VXLAN VTEP on VPP pg0, and put vxlan_tunnel0 and pg1
230 self.single_tunnel_vni = 0x12345
231 self.single_tunnel_bd = 1
234 src=self.pg0.local_ip4,
235 dst=self.pg0.remote_ip4,
238 vni=self.single_tunnel_vni,
241 self.vapi.sw_interface_set_l2_bridge(
242 rx_sw_if_index=r.sw_if_index, bd_id=self.single_tunnel_bd
244 self.vapi.sw_interface_set_l2_bridge(
245 rx_sw_if_index=self.pg1.sw_if_index, bd_id=self.single_tunnel_bd
248 # Setup vni 2 to test multicast flooding
249 self.n_ucast_tunnels = 10
250 self.mcast_flood_bd = 2
251 self.create_vxlan_flood_test_bd(
252 self.mcast_flood_bd, self.n_ucast_tunnels, self.dport
256 src=self.pg0.local_ip4,
261 vni=self.mcast_flood_bd,
264 self.vapi.sw_interface_set_l2_bridge(
265 rx_sw_if_index=r.sw_if_index, bd_id=self.mcast_flood_bd
267 self.vapi.sw_interface_set_l2_bridge(
268 rx_sw_if_index=self.pg2.sw_if_index, bd_id=self.mcast_flood_bd
271 # Add and delete mcast tunnels to check stability
272 self.add_shared_mcast_dst_load(self.dport)
273 self.add_mcast_tunnels_load(self.dport)
274 self.del_shared_mcast_dst_load(self.dport)
275 self.del_mcast_tunnels_load(self.dport)
277 # Setup vni 3 to test unicast flooding
278 self.ucast_flood_bd = 3
279 self.create_vxlan_flood_test_bd(
280 self.ucast_flood_bd, self.n_ucast_tunnels, self.dport
282 self.vapi.sw_interface_set_l2_bridge(
283 rx_sw_if_index=self.pg3.sw_if_index, bd_id=self.ucast_flood_bd
286 # Make scapy dissect UDP packets whose dport is self.dport as VXLAN
287 bind_layers(UDP, VXLAN, dport=self.dport)
# Sets pg0's MTU to 1500, queues one frame with a 1450-byte payload on pg1
# and expects two frames on pg0, which are merged by reassemble4 before the
# encapsulation check.
# NOTE(review): the statement binding `frame` and the call that starts the
# packet generator are not visible here.
289 def encap_big_packet(self):
290 self.vapi.sw_interface_set_mtu(self.pg0.sw_if_index, [1500, 0, 0, 0])
293 Ether(src="00:00:00:00:00:02", dst="00:00:00:00:00:01")
294 / IP(src="4.3.2.1", dst="1.2.3.4")
295 / UDP(sport=20000, dport=10000)
296 / Raw(b"\xa5" * 1450)
299 self.pg1.add_stream([frame])
301 self.pg0.enable_capture()
305 # Capture the two frames expected on pg0, reassemble them and check the encapsulation.
306 out = self.pg0.get_capture(2)
308 pkt = reassemble4(out)
310 self.check_encapsulation(pkt, self.single_tunnel_vni)
312 payload = self.decapsulate(pkt)
314 # self.assert_eq_pkts(payload, frame)
317 Tests with default port (4789)
# Each test below first builds the topology with the default port argument
# and then runs the test of the same name inherited through the parent
# classes.
320 def test_decap(self):
321 """Decapsulation test
324 self.createVxLANInterfaces()
325 super(TestVxlan, self).test_decap()
327 def test_encap(self):
328 """Encapsulation test
331 self.createVxLANInterfaces()
332 super(TestVxlan, self).test_encap()
# Unlike its neighbours this test runs the local encap_big_packet() helper
# instead of an inherited test.
334 def test_encap_big_packet(self):
335 """Encapsulation test send big frame from pg1
336 Verify receipt of encapsulated frames on pg0
338 self.createVxLANInterfaces()
339 self.encap_big_packet()
341 def test_ucast_flood(self):
342 """Unicast flood test
345 self.createVxLANInterfaces()
346 super(TestVxlan, self).test_ucast_flood()
348 def test_mcast_flood(self):
349 """Multicast flood test
352 self.createVxLANInterfaces()
353 super(TestVxlan, self).test_mcast_flood()
355 def test_mcast_rcv(self):
356 """Multicast receive test
359 self.createVxLANInterfaces()
360 super(TestVxlan, self).test_mcast_rcv()
363 Tests with custom port
# Same inherited tests as above, but with the topology built for port 1111.
366 def test_decap_custom_port(self):
367 """Decapsulation test custom port
370 self.createVxLANInterfaces(1111)
371 super(TestVxlan, self).test_decap()
373 def test_encap_custom_port(self):
374 """Encapsulation test custom port
377 self.createVxLANInterfaces(1111)
378 super(TestVxlan, self).test_encap()
380 def test_ucast_flood_custom_port(self):
381 """Unicast flood test custom port
384 self.createVxLANInterfaces(1111)
385 super(TestVxlan, self).test_ucast_flood()
387 def test_mcast_flood_custom_port(self):
388 """Multicast flood test custom port
391 self.createVxLANInterfaces(1111)
392 super(TestVxlan, self).test_mcast_flood()
394 def test_mcast_rcv_custom_port(self):
395 """Multicast receive test custom port
398 self.createVxLANInterfaces(1111)
399 super(TestVxlan, self).test_mcast_rcv()
401 # Method to define VPP actions before tear down of the test case.
402 # Overrides tearDown method in VppTestCase class.
403 # @param self The object pointer.
# NOTE(review): the def line of this teardown is not visible here; the only
# visible statement defers to the parent implementation.
406 super(TestVxlan, self).tearDown()
# Logs the CLI output for bridge domains 1, 2 and 3 and for the VXLAN tunnel
# table at info level.
408 def show_commands_at_teardown(self):
409 self.logger.info(self.vapi.cli("show bridge-domain 1 detail"))
410 self.logger.info(self.vapi.cli("show bridge-domain 2 detail"))
411 self.logger.info(self.vapi.cli("show bridge-domain 3 detail"))
412 self.logger.info(self.vapi.cli("show vxlan tunnel"))
# Negative test: a VXLAN tunnel whose source address is not configured on
# the box must not forward traffic.
# NOTE(review): this listing carries the line numbers of the file it was
# extracted from; gaps in that numbering show that the setUp / tearDown def
# lines and some statements of test_xconnect are absent.
415 class TestVxlan2(VppTestCase):
416 """VXLAN Test Case"""
# Per-test setup body (def line not visible): parent setup, then four pg
# interfaces, with an IPv4 address and ARP resolution on pg0.
419 super(TestVxlan2, self).setUp()
421 # Create 4 pg interfaces.
422 self.create_pg_interfaces(range(4))
423 for pg in self.pg_interfaces:
426 # Configure IPv4 addresses on VPP pg0.
427 self.pg0.config_ip4()
428 self.pg0.resolve_arp()
# Per-test teardown body (def line not visible): only defers to the parent.
431 super(TestVxlan2, self).tearDown()
# Cross-connects pg1 with a tunnel sourced from 10.0.0.5 and checks that a
# frame sent into pg1 produces no reply.
# NOTE(review): the statements that configure the tunnel in VPP and bind the
# test frame to `p` are not visible here.
433 def test_xconnect(self):
434 """VXLAN source address not local"""
437 # test the broken configuration of a VXLAN tunnel whose
438 # source address is not local to the box. packets sent
439 # through the tunnel should be dropped
441 t = VppVxlanTunnel(self, src="10.0.0.5", dst=self.pg0.local_ip4, vni=1000)
# Cross-connect the tunnel and pg1 in both directions.
445 self.vapi.sw_interface_set_l2_xconnect(
446 t.sw_if_index, self.pg1.sw_if_index, enable=1
448 self.vapi.sw_interface_set_l2_xconnect(
449 self.pg1.sw_if_index, t.sw_if_index, enable=1
453 Ether(src="00:11:22:33:44:55", dst="00:00:00:11:22:33")
454 / IP(src="4.3.2.1", dst="1.2.3.4")
455 / UDP(sport=20000, dport=10000)
456 / Raw(b"\xa5" * 1450)
# Nothing may come back for the frame injected on pg1.
459 rx = self.send_and_assert_no_replies(self.pg1, [p])
# Test of a VXLAN tunnel created with is_l3=False between pg0's local and
# remote IPv4 addresses.
# NOTE(review): this listing carries the line numbers of the file it was
# extracted from; gaps in that numbering show that the setUp / tearDown def
# lines and several statements of test_l2_mode are absent.
462 class TestVxlanL2Mode(VppTestCase):
463 """VXLAN Test Case"""
# Per-test setup body (def line not visible): parent setup, two pg
# interfaces, IPv4 on pg0 (with ARP resolution) and on pg1.
466 super(TestVxlanL2Mode, self).setUp()
468 # Create 2 pg interfaces.
469 self.create_pg_interfaces(range(2))
470 for pg in self.pg_interfaces:
473 # Configure IPv4 addresses on VPP pg0.
474 self.pg0.config_ip4()
475 self.pg0.resolve_arp()
477 # Configure IPv4 addresses on VPP pg1.
478 self.pg1.config_ip4()
# Per-test teardown body (def line not visible): only defers to the parent.
481 super(TestVxlanL2Mode, self).tearDown()
# Flow of the visible statements: send one UDP packet from pg1 towards dstIP
# and expect an ARP request for dstIP on pg0; add a neighbour entry for dstIP
# on the tunnel interface; then send NUM_PKTS copies and expect as many
# packets on pg0.
# NOTE(review): the statement binding `t` (only its argument line is
# visible), the binding of `p` and the assignment of NUM_PKTS are not
# visible here.
483 def test_l2_mode(self):
486 self, src=self.pg0.local_ip4, dst=self.pg0.remote_ip4, vni=1000, is_l3=False
# Destination: t.local_ip4 with its last character replaced by "2".
492 dstIP = t.local_ip4[:-1] + "2"
494 # Create a packet to send
496 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
497 / IP(src=self.pg1.local_ip4, dst=dstIP)
498 / UDP(sport=555, dport=556)
503 rx = self.send_and_expect(self.pg1, [p], self.pg0)
# Expect an ARP request (op == 1) for dstIP from pg0's local MAC to pg0's
# remote MAC.
# NOTE(review): these assertions index `p`, not `rx`; whether `p` is rebound
# to the received packets (e.g. by a loop over rx) is not visible here.
505 self.assertEqual(p[Ether].dst, self.pg0.remote_mac)
506 self.assertEqual(p[Ether].src, self.pg0.local_mac)
507 self.assertEqual(p[ARP].op, 1)
508 self.assertEqual(p[ARP].pdst, dstIP)
# Add a neighbour entry for dstIP on the tunnel interface, using pg1's
# remote MAC.
511 VppNeighbor(self, t.sw_if_index, self.pg1.remote_mac, dstIP).add_vpp_config()
515 rx = self.send_and_expect(self.pg1, p * NUM_PKTS, self.pg0)
516 self.assertEqual(NUM_PKTS, len(rx))
# Script entry point: run this module's tests through unittest with
# VppTestRunner as the test runner.
# NOTE(review): `unittest` is not among the import lines visible in this
# listing - confirm the import exists in the full file.
519 if __name__ == "__main__":
520 unittest.main(testRunner=VppTestRunner)