X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_vxlan.py;h=913fc4018e0c7443e040347e39e73ac2753f80c5;hb=d9b0c6fbf7aa5bd9af84264105b39c82028a4a29;hp=aa069dc6e7bfd373f73f2e231bd7a404ad0b24af;hpb=90cf21b5d8fd2d3e531e841dcd752311df5f8a50;p=vpp.git diff --git a/test/test_vxlan.py b/test/test_vxlan.py index aa069dc6e7b..913fc4018e0 100644 --- a/test/test_vxlan.py +++ b/test/test_vxlan.py @@ -1,19 +1,26 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 import socket -from util import ip4n_range, ip4_range, reassemble4 +from util import ip4_range, reassemble4 import unittest from framework import VppTestCase, VppTestRunner from template_bd import BridgeDomain -from scapy.layers.l2 import Ether, Raw +from scapy.layers.l2 import Ether +from scapy.layers.l2 import ARP +from scapy.packet import Raw, bind_layers from scapy.layers.inet import IP, UDP from scapy.layers.vxlan import VXLAN -from scapy.utils import atol + +import util +from vpp_ip_route import VppIpRoute, VppRoutePath +from vpp_vxlan_tunnel import VppVxlanTunnel +from vpp_ip import INVALID_INDEX +from vpp_neighbor import VppNeighbor class TestVxlan(BridgeDomain, VppTestCase): - """ VXLAN Test Case """ + """VXLAN Test Case""" def __init__(self, *args): BridgeDomain.__init__(self) @@ -24,14 +31,16 @@ class TestVxlan(BridgeDomain, VppTestCase): Encapsulate the original payload frame by adding VXLAN header with its UDP, IP and Ethernet fields """ - return (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) / - IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) / - UDP(sport=self.dport, dport=self.dport, chksum=0) / - VXLAN(vni=vni, flags=self.flags) / - pkt) + return ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) + / UDP(sport=self.dport, dport=self.dport, chksum=0) + / VXLAN(vni=vni, flags=self.flags) + / pkt + ) def ip_range(self, start, end): - """ range of remote ip's """ + """range of remote ip's""" return ip4_range(self.pg0.remote_ip4, start, end) def encap_mcast(self, pkt, src_ip, src_mac, vni): @@ -39,18 +48,20 @@ class TestVxlan(BridgeDomain, VppTestCase): Encapsulate the original payload frame by adding VXLAN header with its UDP, IP and Ethernet fields """ - return (Ether(src=src_mac, dst=self.mcast_mac) / - IP(src=src_ip, dst=self.mcast_ip4) / - UDP(sport=self.dport, dport=self.dport, chksum=0) / - VXLAN(vni=vni, flags=self.flags) / - pkt) + return ( + Ether(src=src_mac, dst=self.mcast_mac) + / IP(src=src_ip, dst=self.mcast_ip4) + / UDP(sport=self.dport, dport=self.dport, chksum=0) + / VXLAN(vni=vni, flags=self.flags) + / pkt + ) def decapsulate(self, pkt): """ Decapsulate the original payload frame by removing VXLAN header """ # check if is set I flag - self.assertEqual(pkt[VXLAN].flags, int('0x8', 16)) + self.assertEqual(pkt[VXLAN].flags, int("0x8", 16)) return pkt[VXLAN].payload # Method for checking VXLAN encapsulation. @@ -74,30 +85,42 @@ class TestVxlan(BridgeDomain, VppTestCase): self.assertEqual(pkt[IP].dst, type(self).mcast_ip4) # Verify UDP destination port is VXLAN 4789, source UDP port could be # arbitrary. - self.assertEqual(pkt[UDP].dport, type(self).dport) - # TODO: checksum check + self.assertEqual(pkt[UDP].dport, self.dport) + # Verify UDP checksum + self.assert_udp_checksum_valid(pkt) # Verify VNI self.assertEqual(pkt[VXLAN].vni, vni) @classmethod - def create_vxlan_flood_test_bd(cls, vni, n_ucast_tunnels): + def create_vxlan_flood_test_bd(cls, vni, n_ucast_tunnels, port): # Create 10 ucast vxlan tunnels under bd ip_range_start = 10 ip_range_end = ip_range_start + n_ucast_tunnels - next_hop_address = cls.pg0.remote_ip4n - for dest_ip4n in ip4n_range(next_hop_address, ip_range_start, - ip_range_end): - # add host route so dest_ip4n will not be resolved - cls.vapi.ip_add_del_route(dst_address=dest_ip4n, - dst_address_length=32, - next_hop_address=next_hop_address) - r = cls.vapi.vxlan_add_del_tunnel(src_address=cls.pg0.local_ip4n, - dst_address=dest_ip4n, vni=vni) - cls.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=r.sw_if_index, - bd_id=vni) + next_hop_address = cls.pg0.remote_ip4 + for dest_ip4 in ip4_range(next_hop_address, ip_range_start, ip_range_end): + # add host route so dest_ip4 will not be resolved + rip = VppIpRoute( + cls, + dest_ip4, + 32, + [VppRoutePath(next_hop_address, INVALID_INDEX)], + register=False, + ) + rip.add_vpp_config() + + r = VppVxlanTunnel( + cls, + src=cls.pg0.local_ip4, + src_port=port, + dst_port=port, + dst=dest_ip4, + vni=vni, + ) + r.add_vpp_config() + cls.vapi.sw_interface_set_l2_bridge(r.sw_if_index, bd_id=vni) @classmethod - def add_del_shared_mcast_dst_load(cls, is_add): + def add_del_shared_mcast_dst_load(cls, port, is_add): """ add or del tunnels sharing the same mcast dst to test vxlan ref_count mechanism @@ -106,44 +129,61 @@ class TestVxlan(BridgeDomain, VppTestCase): vni_start = 10000 vni_end = vni_start + n_shared_dst_tunnels for vni in range(vni_start, vni_end): - r = cls.vapi.vxlan_add_del_tunnel(src_address=cls.pg0.local_ip4n, - dst_address=cls.mcast_ip4n, - mcast_sw_if_index=1, - is_add=is_add, vni=vni) - if r.sw_if_index == 0xffffffff: - raise ValueError("bad sw_if_index: ~0") + r = VppVxlanTunnel( + cls, + src=cls.pg0.local_ip4, + src_port=port, + dst_port=port, + dst=cls.mcast_ip4, + mcast_sw_if_index=1, + vni=vni, + ) + if is_add: + r.add_vpp_config() + if r.sw_if_index == 0xFFFFFFFF: + raise ValueError("bad sw_if_index: ~0") + else: + r.remove_vpp_config() @classmethod - def add_shared_mcast_dst_load(cls): - cls.add_del_shared_mcast_dst_load(is_add=1) + def add_shared_mcast_dst_load(cls, port): + cls.add_del_shared_mcast_dst_load(port=port, is_add=1) @classmethod - def del_shared_mcast_dst_load(cls): - cls.add_del_shared_mcast_dst_load(is_add=0) + def del_shared_mcast_dst_load(cls, port): + cls.add_del_shared_mcast_dst_load(port=port, is_add=0) @classmethod - def add_del_mcast_tunnels_load(cls, is_add): + def add_del_mcast_tunnels_load(cls, port, is_add): """ add or del tunnels to test vxlan stability """ n_distinct_dst_tunnels = 200 ip_range_start = 10 ip_range_end = ip_range_start + n_distinct_dst_tunnels - for dest_ip4n in ip4n_range(cls.mcast_ip4n, ip_range_start, - ip_range_end): - vni = bytearray(dest_ip4n)[3] - cls.vapi.vxlan_add_del_tunnel(src_address=cls.pg0.local_ip4n, - dst_address=dest_ip4n, - mcast_sw_if_index=1, is_add=is_add, - vni=vni) + for dest_ip4 in ip4_range(cls.mcast_ip4, ip_range_start, ip_range_end): + vni = bytearray(socket.inet_pton(socket.AF_INET, dest_ip4))[3] + r = VppVxlanTunnel( + cls, + src=cls.pg0.local_ip4, + src_port=port, + dst_port=port, + dst=dest_ip4, + mcast_sw_if_index=1, + vni=vni, + ) + if is_add: + r.add_vpp_config() + else: + r.remove_vpp_config() @classmethod - def add_mcast_tunnels_load(cls): - cls.add_del_mcast_tunnels_load(is_add=1) + def add_mcast_tunnels_load(cls, port): + cls.add_del_mcast_tunnels_load(port=port, is_add=1) @classmethod - def del_mcast_tunnels_load(cls): - cls.add_del_mcast_tunnels_load(is_add=0) + def del_mcast_tunnels_load(cls, port): + cls.add_del_mcast_tunnels_load(port=port, is_add=0) # Class method to start the VXLAN test case. # Overrides setUpClass method in VppTestCase class. @@ -155,7 +195,6 @@ class TestVxlan(BridgeDomain, VppTestCase): super(TestVxlan, cls).setUpClass() try: - cls.dport = 4789 cls.flags = 0x8 # Create 2 pg interfaces. @@ -170,68 +209,92 @@ class TestVxlan(BridgeDomain, VppTestCase): cls.pg0.resolve_arp() # Our Multicast address - cls.mcast_ip4 = '239.1.1.1' - cls.mcast_ip4n = socket.inet_pton(socket.AF_INET, cls.mcast_ip4) - iplong = atol(cls.mcast_ip4) - cls.mcast_mac = "01:00:5e:%02x:%02x:%02x" % ( - (iplong >> 16) & 0x7F, (iplong >> 8) & 0xFF, iplong & 0xFF) - - # Create VXLAN VTEP on VPP pg0, and put vxlan_tunnel0 and pg1 - # into BD. - cls.single_tunnel_bd = 1 - r = cls.vapi.vxlan_add_del_tunnel(src_address=cls.pg0.local_ip4n, - dst_address=cls.pg0.remote_ip4n, - vni=cls.single_tunnel_bd) - cls.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=r.sw_if_index, - bd_id=cls.single_tunnel_bd) - cls.vapi.sw_interface_set_l2_bridge( - rx_sw_if_index=cls.pg1.sw_if_index, bd_id=cls.single_tunnel_bd) - - # Setup vni 2 to test multicast flooding - cls.n_ucast_tunnels = 10 - cls.mcast_flood_bd = 2 - cls.create_vxlan_flood_test_bd(cls.mcast_flood_bd, - cls.n_ucast_tunnels) - r = cls.vapi.vxlan_add_del_tunnel(src_address=cls.pg0.local_ip4n, - dst_address=cls.mcast_ip4n, - mcast_sw_if_index=1, - vni=cls.mcast_flood_bd) - cls.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=r.sw_if_index, - bd_id=cls.mcast_flood_bd) - cls.vapi.sw_interface_set_l2_bridge( - rx_sw_if_index=cls.pg2.sw_if_index, bd_id=cls.mcast_flood_bd) - - # Add and delete mcast tunnels to check stability - cls.add_shared_mcast_dst_load() - cls.add_mcast_tunnels_load() - cls.del_shared_mcast_dst_load() - cls.del_mcast_tunnels_load() - - # Setup vni 3 to test unicast flooding - cls.ucast_flood_bd = 3 - cls.create_vxlan_flood_test_bd(cls.ucast_flood_bd, - cls.n_ucast_tunnels) - cls.vapi.sw_interface_set_l2_bridge( - rx_sw_if_index=cls.pg3.sw_if_index, bd_id=cls.ucast_flood_bd) + cls.mcast_ip4 = "239.1.1.1" + cls.mcast_mac = util.mcast_ip_to_mac(cls.mcast_ip4) except Exception: - super(TestVxlan, cls).tearDownClass() + cls.tearDownClass() raise @classmethod def tearDownClass(cls): super(TestVxlan, cls).tearDownClass() - def test_encap_big_packet(self): - """ Encapsulation test send big frame from pg1 - Verify receipt of encapsulated frames on pg0 - """ - + def setUp(self): + super(TestVxlan, self).setUp() + + def createVxLANInterfaces(self, port=4789): + # Create VXLAN VTEP on VPP pg0, and put vxlan_tunnel0 and pg1 + # into BD. + self.dport = port + + self.single_tunnel_vni = 0x12345 + self.single_tunnel_bd = 1 + r = VppVxlanTunnel( + self, + src=self.pg0.local_ip4, + dst=self.pg0.remote_ip4, + src_port=self.dport, + dst_port=self.dport, + vni=self.single_tunnel_vni, + ) + r.add_vpp_config() + self.vapi.sw_interface_set_l2_bridge( + rx_sw_if_index=r.sw_if_index, bd_id=self.single_tunnel_bd + ) + self.vapi.sw_interface_set_l2_bridge( + rx_sw_if_index=self.pg1.sw_if_index, bd_id=self.single_tunnel_bd + ) + + # Setup vni 2 to test multicast flooding + self.n_ucast_tunnels = 10 + self.mcast_flood_bd = 2 + self.create_vxlan_flood_test_bd( + self.mcast_flood_bd, self.n_ucast_tunnels, self.dport + ) + r = VppVxlanTunnel( + self, + src=self.pg0.local_ip4, + dst=self.mcast_ip4, + src_port=self.dport, + dst_port=self.dport, + mcast_sw_if_index=1, + vni=self.mcast_flood_bd, + ) + r.add_vpp_config() + self.vapi.sw_interface_set_l2_bridge( + rx_sw_if_index=r.sw_if_index, bd_id=self.mcast_flood_bd + ) + self.vapi.sw_interface_set_l2_bridge( + rx_sw_if_index=self.pg2.sw_if_index, bd_id=self.mcast_flood_bd + ) + + # Add and delete mcast tunnels to check stability + self.add_shared_mcast_dst_load(self.dport) + self.add_mcast_tunnels_load(self.dport) + self.del_shared_mcast_dst_load(self.dport) + self.del_mcast_tunnels_load(self.dport) + + # Setup vni 3 to test unicast flooding + self.ucast_flood_bd = 3 + self.create_vxlan_flood_test_bd( + self.ucast_flood_bd, self.n_ucast_tunnels, self.dport + ) + self.vapi.sw_interface_set_l2_bridge( + rx_sw_if_index=self.pg3.sw_if_index, bd_id=self.ucast_flood_bd + ) + + # Set scapy listen custom port for VxLAN + bind_layers(UDP, VXLAN, dport=self.dport) + + def encap_big_packet(self): self.vapi.sw_interface_set_mtu(self.pg0.sw_if_index, [1500, 0, 0, 0]) - frame = (Ether(src='00:00:00:00:00:02', dst='00:00:00:00:00:01') / - IP(src='4.3.2.1', dst='1.2.3.4') / - UDP(sport=20000, dport=10000) / - Raw('\xa5' * 1450)) + frame = ( + Ether(src="00:00:00:00:00:02", dst="00:00:00:00:00:01") + / IP(src="4.3.2.1", dst="1.2.3.4") + / UDP(sport=20000, dport=10000) + / Raw(b"\xa5" * 1450) + ) self.pg1.add_stream([frame]) @@ -244,15 +307,101 @@ class TestVxlan(BridgeDomain, VppTestCase): ether = out[0] pkt = reassemble4(out) pkt = ether / pkt - self.check_encapsulation(pkt, self.single_tunnel_bd) + self.check_encapsulation(pkt, self.single_tunnel_vni) payload = self.decapsulate(pkt) # TODO: Scapy bug? # self.assert_eq_pkts(payload, frame) + """ + Tests with default port (4789) + """ + + def test_decap(self): + """Decapsulation test + from BridgeDoman + """ + self.createVxLANInterfaces() + super(TestVxlan, self).test_decap() + + def test_encap(self): + """Encapsulation test + from BridgeDoman + """ + self.createVxLANInterfaces() + super(TestVxlan, self).test_encap() + + def test_encap_big_packet(self): + """Encapsulation test send big frame from pg1 + Verify receipt of encapsulated frames on pg0 + """ + self.createVxLANInterfaces() + self.encap_big_packet() + + def test_ucast_flood(self): + """Unicast flood test + from BridgeDoman + """ + self.createVxLANInterfaces() + super(TestVxlan, self).test_ucast_flood() + + def test_mcast_flood(self): + """Multicast flood test + from BridgeDoman + """ + self.createVxLANInterfaces() + super(TestVxlan, self).test_mcast_flood() + + def test_mcast_rcv(self): + """Multicast receive test + from BridgeDoman + """ + self.createVxLANInterfaces() + super(TestVxlan, self).test_mcast_rcv() + + """ + Tests with custom port + """ + + def test_decap_custom_port(self): + """Decapsulation test custom port + from BridgeDoman + """ + self.createVxLANInterfaces(1111) + super(TestVxlan, self).test_decap() + + def test_encap_custom_port(self): + """Encapsulation test custom port + from BridgeDoman + """ + self.createVxLANInterfaces(1111) + super(TestVxlan, self).test_encap() + + def test_ucast_flood_custom_port(self): + """Unicast flood test custom port + from BridgeDoman + """ + self.createVxLANInterfaces(1111) + super(TestVxlan, self).test_ucast_flood() + + def test_mcast_flood_custom_port(self): + """Multicast flood test custom port + from BridgeDoman + """ + self.createVxLANInterfaces(1111) + super(TestVxlan, self).test_mcast_flood() + + def test_mcast_rcv_custom_port(self): + """Multicast receive test custom port + from BridgeDoman + """ + self.createVxLANInterfaces(1111) + super(TestVxlan, self).test_mcast_rcv() + # Method to define VPP actions before tear down of the test case. # Overrides tearDown method in VppTestCase class. # @param self The object pointer. + def tearDown(self): super(TestVxlan, self).tearDown() @@ -263,5 +412,109 @@ class TestVxlan(BridgeDomain, VppTestCase): self.logger.info(self.vapi.cli("show vxlan tunnel")) -if __name__ == '__main__': +class TestVxlan2(VppTestCase): + """VXLAN Test Case""" + + def setUp(self): + super(TestVxlan2, self).setUp() + + # Create 2 pg interfaces. + self.create_pg_interfaces(range(4)) + for pg in self.pg_interfaces: + pg.admin_up() + + # Configure IPv4 addresses on VPP pg0. + self.pg0.config_ip4() + self.pg0.resolve_arp() + + def tearDown(self): + super(TestVxlan2, self).tearDown() + + def test_xconnect(self): + """VXLAN source address not local""" + + # + # test the broken configuration of a VXLAN tunnel whose + # source address is not local ot the box. packets sent + # through the tunnel should be dropped + # + t = VppVxlanTunnel(self, src="10.0.0.5", dst=self.pg0.local_ip4, vni=1000) + t.add_vpp_config() + t.admin_up() + + self.vapi.sw_interface_set_l2_xconnect( + t.sw_if_index, self.pg1.sw_if_index, enable=1 + ) + self.vapi.sw_interface_set_l2_xconnect( + self.pg1.sw_if_index, t.sw_if_index, enable=1 + ) + + p = ( + Ether(src="00:11:22:33:44:55", dst="00:00:00:11:22:33") + / IP(src="4.3.2.1", dst="1.2.3.4") + / UDP(sport=20000, dport=10000) + / Raw(b"\xa5" * 1450) + ) + + rx = self.send_and_assert_no_replies(self.pg1, [p]) + + +class TestVxlanL2Mode(VppTestCase): + """VXLAN Test Case""" + + def setUp(self): + super(TestVxlanL2Mode, self).setUp() + + # Create 2 pg interfaces. + self.create_pg_interfaces(range(2)) + for pg in self.pg_interfaces: + pg.admin_up() + + # Configure IPv4 addresses on VPP pg0. + self.pg0.config_ip4() + self.pg0.resolve_arp() + + # Configure IPv4 addresses on VPP pg1. + self.pg1.config_ip4() + + def tearDown(self): + super(TestVxlanL2Mode, self).tearDown() + + def test_l2_mode(self): + """VXLAN L2 mode""" + t = VppVxlanTunnel( + self, src=self.pg0.local_ip4, dst=self.pg0.remote_ip4, vni=1000, is_l3=False + ) + t.add_vpp_config() + t.config_ip4() + t.admin_up() + + dstIP = t.local_ip4[:-1] + "2" + + # Create a packet to send + p = ( + Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) + / IP(src=self.pg1.local_ip4, dst=dstIP) + / UDP(sport=555, dport=556) + / Raw(b"\x00" * 80) + ) + + # Expect ARP request + rx = self.send_and_expect(self.pg1, [p], self.pg0) + for p in rx: + self.assertEqual(p[Ether].dst, self.pg0.remote_mac) + self.assertEqual(p[Ether].src, self.pg0.local_mac) + self.assertEqual(p[ARP].op, 1) + self.assertEqual(p[ARP].pdst, dstIP) + + # Resolve ARP + VppNeighbor(self, t.sw_if_index, self.pg1.remote_mac, dstIP).add_vpp_config() + + # Send packets + NUM_PKTS = 128 + rx = self.send_and_expect(self.pg1, p * NUM_PKTS, self.pg0) + self.assertEqual(NUM_PKTS, len(rx)) + + +if __name__ == "__main__": unittest.main(testRunner=VppTestRunner)