4 from util import ip4_range, reassemble4
6 from framework import VppTestCase
7 from asfframework import VppTestRunner
8 from template_bd import BridgeDomain
10 from scapy.layers.l2 import Ether
11 from scapy.layers.l2 import ARP
12 from scapy.packet import Raw, bind_layers
13 from scapy.layers.inet import IP, UDP
14 from scapy.layers.inet6 import IPv6
15 from scapy.layers.vxlan import VXLAN
16 from scapy.contrib.mpls import MPLS
19 from vpp_ip_route import VppIpRoute, VppRoutePath
20 from vpp_vxlan_tunnel import VppVxlanTunnel
21 from vpp_ip import INVALID_INDEX
22 from vpp_neighbor import VppNeighbor
# NOTE(review): this copy looks like a lossy extract: every line has a stray
# leading number, indentation is gone, and the numbering skips (e.g. 35 -> 38),
# so statements are probably missing. The notes below describe only what the
# surviving lines show; anything that depends on a missing line is hedged.
#
# VXLAN test case that mixes the BridgeDomain template into VppTestCase.
25 class TestVxlan(BridgeDomain, VppTestCase):
# Initialise both bases explicitly: BridgeDomain with no arguments, then
# VppTestCase with the positional arguments that were passed in.
28 def __init__(self, *args):
29 BridgeDomain.__init__(self)
30 VppTestCase.__init__(self, *args)
# Build the outer headers for a frame arriving from pg0's remote side:
# Ether and IP go from pg0 remote to pg0 local, UDP uses self.dport for both
# source and destination with chksum=0, VXLAN carries `vni` and self.flags.
# NOTE(review): how `pkt` is appended and returned is not visible here.
32 def encapsulate(self, pkt, vni):
34 Encapsulate the original payload frame by adding VXLAN header with its
35 UDP, IP and Ethernet fields
38 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
39 / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4)
40 / UDP(sport=self.dport, dport=self.dport, chksum=0)
41 / VXLAN(vni=vni, flags=self.flags)
# Return ip4_range() over pg0's remote IPv4 address for [start, end);
# the exact range semantics are defined by util.ip4_range.
45 def ip_range(self, start, end):
46 """range of remote ip's"""
47 return ip4_range(self.pg0.remote_ip4, start, end)
# Multicast variant of encapsulate(): the caller supplies the source IP and
# MAC, the destination is the class multicast MAC / IPv4 address.
# NOTE(review): how `pkt` is appended and returned is not visible here.
49 def encap_mcast(self, pkt, src_ip, src_mac, vni):
51 Encapsulate the original payload frame by adding VXLAN header with its
52 UDP, IP and Ethernet fields
55 Ether(src=src_mac, dst=self.mcast_mac)
56 / IP(src=src_ip, dst=self.mcast_ip4)
57 / UDP(sport=self.dport, dport=self.dport, chksum=0)
58 / VXLAN(vni=vni, flags=self.flags)
# Assert the VXLAN flags field equals 0x8, then return the VXLAN payload.
62 def decapsulate(self, pkt):
64 Decapsulate the original payload frame by removing VXLAN header
66 # check that the I flag (0x8) is set
67 self.assertEqual(pkt[VXLAN].flags, int("0x8", 16))
68 return pkt[VXLAN].payload
70 # Method for checking VXLAN encapsulation.
# Asserts on the outer Ether/IP/UDP/VXLAN fields of `pkt` against pg0 and
# `vni`. Both a unicast and a multicast destination assertion appear below.
# NOTE(review): presumably `local_only` / `mcast_pkt` choose between them,
# but the conditionals are not in this view -- confirm against the full file.
72 def check_encapsulation(self, pkt, vni, local_only=False, mcast_pkt=False):
73 # TODO: add error messages
74 # Verify source MAC is VPP_MAC and destination MAC is MY_MAC resolved
76 self.assertEqual(pkt[Ether].src, self.pg0.local_mac)
79 self.assertEqual(pkt[Ether].dst, self.pg0.remote_mac)
81 self.assertEqual(pkt[Ether].dst, type(self).mcast_mac)
82 # Verify VXLAN tunnel source IP is VPP_IP and destination IP is MY_IP.
83 self.assertEqual(pkt[IP].src, self.pg0.local_ip4)
86 self.assertEqual(pkt[IP].dst, self.pg0.remote_ip4)
88 self.assertEqual(pkt[IP].dst, type(self).mcast_ip4)
89 # Verify UDP destination port is VXLAN 4789, source UDP port could be
# NOTE(review): the assertion compares against self.dport, which the
# *_custom_port tests suggest is not always 4789.
91 self.assertEqual(pkt[UDP].dport, self.dport)
93 self.assert_udp_checksum_valid(pkt)
95 self.assertEqual(pkt[VXLAN].vni, vni)
# Populate bridge domain `vni` for the flood tests: loops over addresses
# derived from pg0's remote IPv4 address and bridges each resulting
# `r.sw_if_index` into bd_id=vni.
# NOTE(review): `ip_range_start` and the construction of `r` are not visible.
98 def create_vxlan_flood_test_bd(cls, vni, n_ucast_tunnels, port):
99 # Create n_ucast_tunnels ucast vxlan tunnels under bd
101 ip_range_end = ip_range_start + n_ucast_tunnels
102 next_hop_address = cls.pg0.remote_ip4
103 for dest_ip4 in ip4_range(next_hop_address, ip_range_start, ip_range_end):
104 # add host route so dest_ip4 will not be resolved
109 [VppRoutePath(next_hop_address, INVALID_INDEX)],
116 src=cls.pg0.local_ip4,
123 cls.vapi.sw_interface_set_l2_bridge(r.sw_if_index, bd_id=vni)
# Iterates 20 consecutive VNIs; raises ValueError when a tunnel reports
# sw_if_index 0xFFFFFFFF, and calls remove_vpp_config() on `r`.
# NOTE(review): `vni_start`, the tunnel constructor call and the `is_add`
# branching are not visible in this view.
126 def add_del_shared_mcast_dst_load(cls, port, is_add):
128 add or del tunnels sharing the same mcast dst
129 to test vxlan ref_count mechanism
131 n_shared_dst_tunnels = 20
133 vni_end = vni_start + n_shared_dst_tunnels
134 for vni in range(vni_start, vni_end):
137 src=cls.pg0.local_ip4,
146 if r.sw_if_index == 0xFFFFFFFF:
147 raise ValueError("bad sw_if_index: ~0")
149 r.remove_vpp_config()
# Thin wrapper: add_del_shared_mcast_dst_load with is_add=1.
152 def add_shared_mcast_dst_load(cls, port):
153 cls.add_del_shared_mcast_dst_load(port=port, is_add=1)
# Thin wrapper: add_del_shared_mcast_dst_load with is_add=0.
156 def del_shared_mcast_dst_load(cls, port):
157 cls.add_del_shared_mcast_dst_load(port=port, is_add=0)
# Iterates 200 destination addresses derived from cls.mcast_ip4 and uses the
# last octet of each address as the VNI.
# NOTE(review): `ip_range_start`, the tunnel constructor call and the
# `is_add` branching are not visible in this view.
160 def add_del_mcast_tunnels_load(cls, port, is_add):
162 add or del tunnels to test vxlan stability
164 n_distinct_dst_tunnels = 200
166 ip_range_end = ip_range_start + n_distinct_dst_tunnels
167 for dest_ip4 in ip4_range(cls.mcast_ip4, ip_range_start, ip_range_end):
# NOTE(review): `socket` is not among the visible import lines -- confirm.
168 vni = bytearray(socket.inet_pton(socket.AF_INET, dest_ip4))[3]
171 src=cls.pg0.local_ip4,
181 r.remove_vpp_config()
# Thin wrapper: add_del_mcast_tunnels_load with is_add=1.
184 def add_mcast_tunnels_load(cls, port):
185 cls.add_del_mcast_tunnels_load(port=port, is_add=1)
# Thin wrapper: add_del_mcast_tunnels_load with is_add=0.
188 def del_mcast_tunnels_load(cls, port):
189 cls.add_del_mcast_tunnels_load(port=port, is_add=0)
191 # Class method to start the VXLAN test case.
192 # Overrides setUpClass method in VppTestCase class.
193 # Python try..except statement is used to ensure that the tear down of
194 # the class will be executed even if exception is raised.
195 # @param cls The class pointer.
# NOTE(review): the `def` line and the try/except mentioned above are not in
# this view; the statements below are the surviving body.
198 super(TestVxlan, cls).setUpClass()
203 # Create 4 pg interfaces (range(4)).
204 cls.create_pg_interfaces(range(4))
205 for pg in cls.pg_interfaces:
208 # Configure IPv4 addresses on VPP pg0.
211 # Resolve MAC address for VPP's IP address on pg0.
212 cls.pg0.resolve_arp()
214 # Our Multicast address
215 cls.mcast_ip4 = "239.1.1.1"
# NOTE(review): the `util` module itself is not among the visible import
# lines (only names imported from it) -- confirm.
216 cls.mcast_mac = util.mcast_ip_to_mac(cls.mcast_ip4)
# Delegates class-level teardown to the parent implementation.
222 def tearDownClass(cls):
223 super(TestVxlan, cls).tearDownClass()
# Surviving body line of the per-test setUp (its `def` line is not visible).
226 super(TestVxlan, self).setUp()
# Build the per-test topology. Visible steps: a single tunnel (VNI 0x12345)
# bridged with pg1 in bd 1, a multicast flood bd 2 with pg2, add/delete of
# the tunnel "load" helpers, a unicast flood bd 3 with pg3, and binding
# scapy's VXLAN layer to UDP dport self.dport.
# NOTE(review): presumably self.dport is set from `port` on a line that is
# not visible here -- confirm.
228 def createVxLANInterfaces(self, port=4789):
229 # Create VXLAN VTEP on VPP pg0, and put vxlan_tunnel0 and pg1
233 self.single_tunnel_vni = 0x12345
234 self.single_tunnel_bd = 1
237 src=self.pg0.local_ip4,
238 dst=self.pg0.remote_ip4,
241 vni=self.single_tunnel_vni,
244 self.vapi.sw_interface_set_l2_bridge(
245 rx_sw_if_index=r.sw_if_index, bd_id=self.single_tunnel_bd
247 self.vapi.sw_interface_set_l2_bridge(
248 rx_sw_if_index=self.pg1.sw_if_index, bd_id=self.single_tunnel_bd
251 # Setup vni 2 to test multicast flooding
252 self.n_ucast_tunnels = 10
253 self.mcast_flood_bd = 2
254 self.create_vxlan_flood_test_bd(
255 self.mcast_flood_bd, self.n_ucast_tunnels, self.dport
259 src=self.pg0.local_ip4,
264 vni=self.mcast_flood_bd,
267 self.vapi.sw_interface_set_l2_bridge(
268 rx_sw_if_index=r.sw_if_index, bd_id=self.mcast_flood_bd
270 self.vapi.sw_interface_set_l2_bridge(
271 rx_sw_if_index=self.pg2.sw_if_index, bd_id=self.mcast_flood_bd
274 # Add and delete mcast tunnels to check stability
275 self.add_shared_mcast_dst_load(self.dport)
276 self.add_mcast_tunnels_load(self.dport)
277 self.del_shared_mcast_dst_load(self.dport)
278 self.del_mcast_tunnels_load(self.dport)
280 # Setup vni 3 to test unicast flooding
281 self.ucast_flood_bd = 3
282 self.create_vxlan_flood_test_bd(
283 self.ucast_flood_bd, self.n_ucast_tunnels, self.dport
285 self.vapi.sw_interface_set_l2_bridge(
286 rx_sw_if_index=self.pg3.sw_if_index, bd_id=self.ucast_flood_bd
289 # Set scapy listen custom port for VxLAN
290 bind_layers(UDP, VXLAN, dport=self.dport)
# Encapsulation check over four inner frame types (IPv4, IPv6, MPLS+IPv4,
# MPLS+IPv6). The nested helper sends frames on pg1, captures n packets on
# pg0, validates the outer headers and inner payload of each, and requires at
# least min(n, 2) distinct UDP source ports.
# NOTE(review): how `frames` is derived from `frame` is not visible.
292 def encap_packets(self):
293 def encap_frames(frame, n=10):
296 # Provide IP flow hash difference.
302 self.pg1.add_stream(frames)
304 self.pg0.enable_capture()
307 # Pick received frames and check if they're correctly encapsulated.
308 out = self.pg0.get_capture(n)
312 self.check_encapsulation(pkt, self.single_tunnel_vni)
314 payload = self.decapsulate(pkt)
315 self.assert_eq_pkts(payload, frames[i])
317 sports.add(pkt[UDP].sport)
319 # Check src port randomization presence, not concerned with the
320 # src ports split ratio, just as long as there are more than one.
321 self.assertGreaterEqual(len(sports), min(n, 2))
324 Ether(src="00:00:00:00:00:02", dst="00:00:00:00:00:01")
325 / IP(src="4.3.2.1", dst="1.2.3.4")
326 / UDP(sport=20000, dport=10000)
329 encap_frames(frame_ip4)
332 Ether(src="00:00:00:00:00:02", dst="00:00:00:00:00:01")
333 / IPv6(src="2001:db8::4321", dst="2001:db8::1234")
334 / UDP(sport=20000, dport=10000)
337 encap_frames(frame_ip6)
340 Ether(src="00:00:00:00:00:02", dst="00:00:00:00:00:01")
341 / MPLS(label=44, ttl=64)
342 / IP(src="4.3.2.1", dst="1.2.3.4")
343 / UDP(sport=20000, dport=10000)
346 encap_frames(frame_mpls4)
349 Ether(src="00:00:00:00:00:02", dst="00:00:00:00:00:01")
350 / MPLS(label=44, ttl=64)
351 / IPv6(src="2001:db8::4321", dst="2001:db8::1234")
352 / UDP(sport=20000, dport=10000)
355 encap_frames(frame_mpls6)
# Sets pg0's MTU to [1500, 0, 0, 0], sends one frame carrying 1450 payload
# bytes on pg1, expects exactly 2 packets on pg0, reassembles them with
# reassemble4() and checks the encapsulation of the result.
357 def encap_big_packet(self):
358 self.vapi.sw_interface_set_mtu(self.pg0.sw_if_index, [1500, 0, 0, 0])
361 Ether(src="00:00:00:00:00:02", dst="00:00:00:00:00:01")
362 / IP(src="4.3.2.1", dst="1.2.3.4")
363 / UDP(sport=20000, dport=10000)
364 / Raw(b"\xa5" * 1450)
367 self.pg1.add_stream([frame])
369 self.pg0.enable_capture()
373 # Pick first received frame and check if it's correctly encapsulated.
374 out = self.pg0.get_capture(2)
376 pkt = reassemble4(out)
378 self.check_encapsulation(pkt, self.single_tunnel_vni)
380 payload = self.decapsulate(pkt)
# The payload comparison below is disabled in the original; `payload` is
# therefore computed but not compared in the visible lines.
382 # self.assert_eq_pkts(payload, frame)
385 Tests with default port (4789)
# Each default-port test builds the topology with createVxLANInterfaces()
# (port defaults to 4789) and then runs the named check.
388 def test_decap(self):
389 """Decapsulation test
392 self.createVxLANInterfaces()
393 super(TestVxlan, self).test_decap()
# NOTE(review): only the topology setup survives for this test; the call
# that performs the encapsulation check is not visible.
395 def test_encap(self):
396 """Encapsulation test
399 self.createVxLANInterfaces()
402 def test_encap_big_packet(self):
403 """Encapsulation test send big frame from pg1
404 Verify receipt of encapsulated frames on pg0
406 self.createVxLANInterfaces()
407 self.encap_big_packet()
409 def test_ucast_flood(self):
410 """Unicast flood test
413 self.createVxLANInterfaces()
414 super(TestVxlan, self).test_ucast_flood()
416 def test_mcast_flood(self):
417 """Multicast flood test
420 self.createVxLANInterfaces()
421 super(TestVxlan, self).test_mcast_flood()
423 def test_mcast_rcv(self):
424 """Multicast receive test
427 self.createVxLANInterfaces()
428 super(TestVxlan, self).test_mcast_rcv()
431 Tests with custom port
# Same checks as above with the topology built for port 1111; these call the
# parent-class test implementations directly.
434 def test_decap_custom_port(self):
435 """Decapsulation test custom port
438 self.createVxLANInterfaces(1111)
439 super(TestVxlan, self).test_decap()
441 def test_encap_custom_port(self):
442 """Encapsulation test custom port
445 self.createVxLANInterfaces(1111)
446 super(TestVxlan, self).test_encap()
448 def test_ucast_flood_custom_port(self):
449 """Unicast flood test custom port
452 self.createVxLANInterfaces(1111)
453 super(TestVxlan, self).test_ucast_flood()
455 def test_mcast_flood_custom_port(self):
456 """Multicast flood test custom port
459 self.createVxLANInterfaces(1111)
460 super(TestVxlan, self).test_mcast_flood()
462 def test_mcast_rcv_custom_port(self):
463 """Multicast receive test custom port
466 self.createVxLANInterfaces(1111)
467 super(TestVxlan, self).test_mcast_rcv()
469 # Method to define VPP actions before tear down of the test case.
470 # Overrides tearDown method in VppTestCase class.
471 # @param self The object pointer.
# NOTE(review): the `def` line of tearDown is not in this view.
474 super(TestVxlan, self).tearDown()
# Logs the CLI output for bridge domains 1-3 and the VXLAN tunnel list.
476 def show_commands_at_teardown(self):
477 self.logger.info(self.vapi.cli("show bridge-domain 1 detail"))
478 self.logger.info(self.vapi.cli("show bridge-domain 2 detail"))
479 self.logger.info(self.vapi.cli("show bridge-domain 3 detail"))
480 self.logger.info(self.vapi.cli("show vxlan tunnel"))
# NOTE(review): lossy extract -- stray leading numbers, no indentation, and
# gaps in the numbering (e.g. 484 -> 487), so the `def` lines of setUp and
# tearDown and some statements are not visible.
#
# Negative test: a VXLAN tunnel whose source address (10.0.0.5) is not one of
# the addresses configured in the visible setup is cross-connected with pg1,
# and traffic sent into pg1 must produce no replies.
483 class TestVxlan2(VppTestCase):
484 """VXLAN Test Case"""
# Surviving body of the per-test setup.
487 super(TestVxlan2, self).setUp()
489 # Create 4 pg interfaces (range(4)).
490 self.create_pg_interfaces(range(4))
491 for pg in self.pg_interfaces:
494 # Configure IPv4 addresses on VPP pg0.
495 self.pg0.config_ip4()
496 self.pg0.resolve_arp()
# Surviving body of the per-test teardown.
499 super(TestVxlan2, self).tearDown()
501 def test_xconnect(self):
502 """VXLAN source address not local"""
505 # test the broken configuration of a VXLAN tunnel whose
506 # source address is not local to the box. packets sent
507 # through the tunnel should be dropped
509 t = VppVxlanTunnel(self, src="10.0.0.5", dst=self.pg0.local_ip4, vni=1000)
# L2 cross-connect is enabled in both directions between the tunnel and pg1.
513 self.vapi.sw_interface_set_l2_xconnect(
514 t.sw_if_index, self.pg1.sw_if_index, enable=1
516 self.vapi.sw_interface_set_l2_xconnect(
517 self.pg1.sw_if_index, t.sw_if_index, enable=1
# Test frame: Ether/IP/UDP with 1450 payload bytes of 0xa5.
521 Ether(src="00:11:22:33:44:55", dst="00:00:00:11:22:33")
522 / IP(src="4.3.2.1", dst="1.2.3.4")
523 / UDP(sport=20000, dport=10000)
524 / Raw(b"\xa5" * 1450)
# Send on pg1 and assert that nothing comes back.
527 rx = self.send_and_assert_no_replies(self.pg1, [p])
# NOTE(review): lossy extract -- stray leading numbers, no indentation, and
# gaps in the numbering (e.g. 531 -> 534, 566 -> 571), so the `def` lines of
# setUp and tearDown and some statements are not visible.
#
# Exercises a VXLAN tunnel created with is_l3=False between pg0's local and
# remote IPv4 addresses, sending traffic in on pg1 and expecting it on pg0.
530 class TestVxlanL2Mode(VppTestCase):
531 """VXLAN Test Case"""
# Surviving body of the per-test setup.
534 super(TestVxlanL2Mode, self).setUp()
536 # Create 2 pg interfaces.
537 self.create_pg_interfaces(range(2))
538 for pg in self.pg_interfaces:
541 # Configure IPv4 addresses on VPP pg0.
542 self.pg0.config_ip4()
543 self.pg0.resolve_arp()
545 # Configure IPv4 addresses on VPP pg1.
546 self.pg1.config_ip4()
# Surviving body of the per-test teardown.
549 super(TestVxlanL2Mode, self).tearDown()
551 def test_l2_mode(self):
554 self, src=self.pg0.local_ip4, dst=self.pg0.remote_ip4, vni=1000, is_l3=False
# Destination is t.local_ip4 with its final character replaced by "2".
560 dstIP = t.local_ip4[:-1] + "2"
562 # Create a packet to send
564 Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac)
565 / IP(src=self.pg1.local_ip4, dst=dstIP)
566 / UDP(sport=555, dport=556)
# First send: one packet in on pg1, output expected on pg0.
571 rx = self.send_and_expect(self.pg1, [p], self.pg0)
# The assertions expect an ARP with op 1 targeting dstIP, sourced from pg0's
# local MAC towards pg0's remote MAC.
# NOTE(review): they read `p`, the same name as the packet that was sent;
# presumably a loop over `rx` rebinds it on a line not visible here -- confirm.
573 self.assertEqual(p[Ether].dst, self.pg0.remote_mac)
574 self.assertEqual(p[Ether].src, self.pg0.local_mac)
575 self.assertEqual(p[ARP].op, 1)
576 self.assertEqual(p[ARP].pdst, dstIP)
# Add a neighbour entry for dstIP on the tunnel interface (MAC: pg1 remote).
579 VppNeighbor(self, t.sw_if_index, self.pg1.remote_mac, dstIP).add_vpp_config()
# Second send: NUM_PKTS copies are sent and the same number expected on pg0.
# NOTE(review): NUM_PKTS is not defined or imported in the visible lines.
583 rx = self.send_and_expect(self.pg1, p * NUM_PKTS, self.pg0)
584 self.assertEqual(NUM_PKTS, len(rx))
# Script entry point: run the tests in this module with VPP's test runner.
# NOTE(review): `unittest` is not among the visible import lines -- confirm it
# is imported in the full file.
587 if __name__ == "__main__":
588 unittest.main(testRunner=VppTestRunner)