+ @classmethod
+ def tearDownClass(cls):
+ super(TestVxlan, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestVxlan, self).setUp()
+ # Create VXLAN VTEP on VPP pg0, and put vxlan_tunnel0 and pg1
+ # into BD.
+ self.single_tunnel_bd = 1
+ r = VppVxlanTunnel(self, src=self.pg0.local_ip4,
+ dst=self.pg0.remote_ip4, vni=self.single_tunnel_bd)
+ r.add_vpp_config()
+ self.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=r.sw_if_index,
+ bd_id=self.single_tunnel_bd)
+ self.vapi.sw_interface_set_l2_bridge(
+ rx_sw_if_index=self.pg1.sw_if_index, bd_id=self.single_tunnel_bd)
+
+ # Setup vni 2 to test multicast flooding
+ self.n_ucast_tunnels = 10
+ self.mcast_flood_bd = 2
+ self.create_vxlan_flood_test_bd(self.mcast_flood_bd,
+ self.n_ucast_tunnels)
+ r = VppVxlanTunnel(self, src=self.pg0.local_ip4, dst=self.mcast_ip4,
+ mcast_sw_if_index=1, vni=self.mcast_flood_bd)
+ r.add_vpp_config()
+ self.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=r.sw_if_index,
+ bd_id=self.mcast_flood_bd)
+ self.vapi.sw_interface_set_l2_bridge(
+ rx_sw_if_index=self.pg2.sw_if_index, bd_id=self.mcast_flood_bd)
+
+ # Add and delete mcast tunnels to check stability
+ self.add_shared_mcast_dst_load()
+ self.add_mcast_tunnels_load()
+ self.del_shared_mcast_dst_load()
+ self.del_mcast_tunnels_load()
+
+ # Setup vni 3 to test unicast flooding
+ self.ucast_flood_bd = 3
+ self.create_vxlan_flood_test_bd(self.ucast_flood_bd,
+ self.n_ucast_tunnels)
+ self.vapi.sw_interface_set_l2_bridge(
+ rx_sw_if_index=self.pg3.sw_if_index, bd_id=self.ucast_flood_bd)
+
+ def test_decap(self):
+ """ Decapsulation test
+ from BridgeDoman
+ """
+ super(TestVxlan, self).test_decap()
+
+ def test_encap_big_packet(self):
+ """ Encapsulation test send big frame from pg1
+ Verify receipt of encapsulated frames on pg0
+ """
+
+ self.vapi.sw_interface_set_mtu(self.pg0.sw_if_index, [1500, 0, 0, 0])
+
+ frame = (Ether(src='00:00:00:00:00:02', dst='00:00:00:00:00:01') /
+ IP(src='4.3.2.1', dst='1.2.3.4') /
+ UDP(sport=20000, dport=10000) /
+ Raw(b'\xa5' * 1450))
+
+ self.pg1.add_stream([frame])
+
+ self.pg0.enable_capture()
+
+ self.pg_start()
+
+ # Pick first received frame and check if it's correctly encapsulated.
+ out = self.pg0.get_capture(2)
+ ether = out[0]
+ pkt = reassemble4(out)
+ pkt = ether / pkt
+ self.check_encapsulation(pkt, self.single_tunnel_bd)
+
+ payload = self.decapsulate(pkt)
+ # TODO: Scapy bug?
+ # self.assert_eq_pkts(payload, frame)
+