5 from framework import VppTestCase, VppTestRunner
6 from template_bd import BridgeDomain
8 from scapy.layers.l2 import Ether
9 from scapy.packet import Raw, bind_layers
10 from scapy.layers.inet6 import IP, IPv6, UDP
11 from scapy.layers.vxlan import VXLAN
14 from vpp_ip_route import VppIpRoute, VppRoutePath
15 from vpp_vxlan_tunnel import VppVxlanTunnel
16 from vpp_ip import INVALID_INDEX
19 class TestVxlan6(BridgeDomain, VppTestCase):
20 """VXLAN over IPv6 Test Case"""
22 def __init__(self, *args):
23 BridgeDomain.__init__(self)
24 VppTestCase.__init__(self, *args)
26 def encapsulate(self, pkt, vni):
28 Encapsulate the original payload frame by adding VXLAN header with its
29 UDP, IP and Ethernet fields
32 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
33 / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
34 / UDP(sport=self.dport, dport=self.dport, chksum=0)
35 / VXLAN(vni=vni, flags=self.flags)
40 def ip_range(cls, s, e):
41 """range of remote ip's"""
42 tmp = cls.pg0.remote_ip6.rsplit(":", 1)[0]
43 return ("%s:%x" % (tmp, i) for i in range(s, e))
45 def encap_mcast(self, pkt, src_ip, src_mac, vni):
47 Encapsulate the original payload frame by adding VXLAN header with its
48 UDP, IP and Ethernet fields
51 Ether(src=src_mac, dst=self.mcast_mac)
52 / IPv6(src=src_ip, dst=self.mcast_ip6)
53 / UDP(sport=self.dport, dport=self.dport, chksum=0)
54 / VXLAN(vni=vni, flags=self.flags)
58 def decapsulate(self, pkt):
60 Decapsulate the original payload frame by removing VXLAN header
62 # check if is set I flag
63 self.assertEqual(pkt[VXLAN].flags, int("0x8", 16))
64 return pkt[VXLAN].payload
66 # Method for checking VXLAN encapsulation.
68 def check_encapsulation(self, pkt, vni, local_only=False, mcast_pkt=False):
69 # TODO: add error messages
70 # Verify source MAC is VPP_MAC and destination MAC is MY_MAC resolved
72 self.assertEqual(pkt[Ether].src, self.pg0.local_mac)
75 self.assertEqual(pkt[Ether].dst, self.pg0.remote_mac)
77 self.assertEqual(pkt[Ether].dst, type(self).mcast_mac)
78 # Verify VXLAN tunnel source IP is VPP_IP and destination IP is MY_IP.
79 self.assertEqual(pkt[IPv6].src, self.pg0.local_ip6)
82 self.assertEqual(pkt[IPv6].dst, self.pg0.remote_ip6)
84 self.assertEqual(pkt[IPv6].dst, type(self).mcast_ip6)
85 # Verify UDP destination port is VXLAN 4789, source UDP port could be
87 self.assertEqual(pkt[UDP].dport, self.dport)
89 self.assert_udp_checksum_valid(pkt, ignore_zero_checksum=False)
91 self.assertEqual(pkt[VXLAN].vni, vni)
94 def create_vxlan_flood_test_bd(cls, vni, n_ucast_tunnels, port):
95 # Create 10 ucast vxlan tunnels under bd
97 end = start + n_ucast_tunnels
98 for dest_ip6 in cls.ip_range(start, end):
99 # add host route so dest ip will not be resolved
104 [VppRoutePath(cls.pg0.remote_ip6, INVALID_INDEX)],
110 src=cls.pg0.local_ip6,
117 cls.vapi.sw_interface_set_l2_bridge(r.sw_if_index, bd_id=vni)
120 def add_mcast_tunnels_load(cls):
121 cls.add_del_mcast_tunnels_load(is_add=1)
124 def del_mcast_tunnels_load(cls):
125 cls.add_del_mcast_tunnels_load(is_add=0)
127 # Class method to start the VXLAN test case.
128 # Overrides setUpClass method in VppTestCase class.
129 # Python try..except statement is used to ensure that the tear down of
130 # the class will be executed even if exception is raised.
131 # @param cls The class pointer.
134 super(TestVxlan6, cls).setUpClass()
139 # Create 2 pg interfaces.
140 cls.create_pg_interfaces(range(4))
141 for pg in cls.pg_interfaces:
144 # Configure IPv6 addresses on VPP pg0.
147 # Resolve MAC address for VPP's IP address on pg0.
148 cls.pg0.resolve_ndp()
150 # Our Multicast address
151 cls.mcast_ip6 = "ff0e::1"
152 cls.mcast_mac = util.mcast_ip_to_mac(cls.mcast_ip6)
154 super(TestVxlan6, cls).tearDownClass()
158 def tearDownClass(cls):
159 super(TestVxlan6, cls).tearDownClass()
162 super(TestVxlan6, self).setUp()
164 def createVxLANInterfaces(self, port=4789):
165 # Create VXLAN VTEP on VPP pg0, and put vxlan_tunnel0 and pg1
169 self.single_tunnel_vni = 0x12345
170 self.single_tunnel_bd = 1
173 src=self.pg0.local_ip6,
174 dst=self.pg0.remote_ip6,
177 vni=self.single_tunnel_vni,
180 self.vapi.sw_interface_set_l2_bridge(
181 rx_sw_if_index=r.sw_if_index, bd_id=self.single_tunnel_bd
183 self.vapi.sw_interface_set_l2_bridge(
184 rx_sw_if_index=self.pg1.sw_if_index, bd_id=self.single_tunnel_bd
187 # Setup vni 2 to test multicast flooding
188 self.n_ucast_tunnels = 10
189 self.mcast_flood_bd = 2
190 self.create_vxlan_flood_test_bd(
191 self.mcast_flood_bd, self.n_ucast_tunnels, self.dport
195 src=self.pg0.local_ip6,
200 vni=self.mcast_flood_bd,
203 self.vapi.sw_interface_set_l2_bridge(
204 rx_sw_if_index=r.sw_if_index, bd_id=self.mcast_flood_bd
206 self.vapi.sw_interface_set_l2_bridge(
207 rx_sw_if_index=self.pg2.sw_if_index, bd_id=self.mcast_flood_bd
210 # Setup vni 3 to test unicast flooding
211 self.ucast_flood_bd = 3
212 self.create_vxlan_flood_test_bd(
213 self.ucast_flood_bd, self.n_ucast_tunnels, self.dport
215 self.vapi.sw_interface_set_l2_bridge(
216 rx_sw_if_index=self.pg3.sw_if_index, bd_id=self.ucast_flood_bd
219 # Set scapy listen custom port for VxLAN
220 bind_layers(UDP, VXLAN, dport=self.dport)
222 # Method to define VPP actions before tear down of the test case.
223 # Overrides tearDown method in VppTestCase class.
224 # @param self The object pointer.
226 super(TestVxlan6, self).tearDown()
228 def show_commands_at_teardown(self):
229 self.logger.info(self.vapi.cli("show bridge-domain 1 detail"))
230 self.logger.info(self.vapi.cli("show bridge-domain 2 detail"))
231 self.logger.info(self.vapi.cli("show bridge-domain 3 detail"))
232 self.logger.info(self.vapi.cli("show vxlan tunnel"))
234 def encap_fragmented_packet(self):
236 Ether(src="00:00:00:00:00:02", dst="00:00:00:00:00:01")
237 / IP(src="4.3.2.1", dst="1.2.3.4")
238 / UDP(sport=20000, dport=10000)
239 / Raw(b"\xa5" * 1000)
242 frags = util.fragment_rfc791(frame, 400)
244 self.pg1.add_stream(frags)
246 self.pg0.enable_capture()
250 out = self.pg0.get_capture(3)
254 payload.append(self.decapsulate(pkt))
255 self.check_encapsulation(pkt, self.single_tunnel_vni)
257 reassembled = util.reassemble4(payload)
259 self.assertEqual(Ether(raw(frame))[IP], reassembled[IP])
262 Tests with default port (4789)
265 def test_decap(self):
266 """Decapsulation test
269 self.createVxLANInterfaces()
270 super(TestVxlan6, self).test_decap()
272 def test_encap(self):
273 """Encapsulation test
276 self.createVxLANInterfaces()
277 super(TestVxlan6, self).test_encap()
279 def test_encap_fragmented_packet(self):
280 """Encapsulation test send fragments from pg1
281 Verify receipt of encapsulated frames on pg0
283 self.createVxLANInterfaces()
284 self.encap_fragmented_packet()
286 def test_ucast_flood(self):
287 """Unicast flood test
290 self.createVxLANInterfaces()
291 super(TestVxlan6, self).test_ucast_flood()
293 def test_mcast_flood(self):
294 """Multicast flood test
297 self.createVxLANInterfaces()
298 super(TestVxlan6, self).test_mcast_flood()
300 def test_mcast_rcv(self):
301 """Multicast receive test
304 self.createVxLANInterfaces()
305 super(TestVxlan6, self).test_mcast_rcv()
308 Tests with custom port
311 def test_decap_custom_port(self):
312 """Decapsulation test custom port
315 self.createVxLANInterfaces(1111)
316 super(TestVxlan6, self).test_decap()
318 def test_encap_custom_port(self):
319 """Encapsulation test custom port
322 self.createVxLANInterfaces(1111)
323 super(TestVxlan6, self).test_encap()
325 def test_ucast_flood_custom_port(self):
326 """Unicast flood test custom port
329 self.createVxLANInterfaces(1111)
330 super(TestVxlan6, self).test_ucast_flood()
332 def test_mcast_flood_custom_port(self):
333 """Multicast flood test custom port
336 self.createVxLANInterfaces(1111)
337 super(TestVxlan6, self).test_mcast_flood()
339 def test_mcast_rcv_custom_port(self):
340 """Multicast receive test custom port
343 self.createVxLANInterfaces(1111)
344 super(TestVxlan6, self).test_mcast_rcv()
347 if __name__ == "__main__":
348 unittest.main(testRunner=VppTestRunner)