#!/usr/bin/env python3
-import socket
import unittest
-from framework import VppTestCase, VppTestRunner
+from framework import VppTestCase
+from asfframework import VppTestRunner
from template_bd import BridgeDomain
from scapy.layers.l2 import Ether
-from scapy.layers.inet6 import IPv6, UDP
+from scapy.packet import Raw, bind_layers
+from scapy.layers.inet6 import IP, IPv6, UDP
from scapy.layers.vxlan import VXLAN
-from scapy.utils import atol
+
+import util
from vpp_ip_route import VppIpRoute, VppRoutePath
+from vpp_vxlan_tunnel import VppVxlanTunnel
from vpp_ip import INVALID_INDEX
class TestVxlan6(BridgeDomain, VppTestCase):
- """ VXLAN over IPv6 Test Case """
+ """VXLAN over IPv6 Test Case"""
def __init__(self, *args):
BridgeDomain.__init__(self)
Encapsulate the original payload frame by adding VXLAN header with its
UDP, IP and Ethernet fields
"""
- return (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
- IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) /
- UDP(sport=self.dport, dport=self.dport, chksum=0) /
- VXLAN(vni=vni, flags=self.flags) /
- pkt)
+ return (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6)
+ / UDP(sport=self.dport, dport=self.dport, chksum=0)
+ / VXLAN(vni=vni, flags=self.flags)
+ / pkt
+ )
@classmethod
def ip_range(cls, s, e):
- """ range of remote ip's """
- tmp = cls.pg0.remote_ip6.rsplit(':', 1)[0]
+ """range of remote ip's"""
+ tmp = cls.pg0.remote_ip6.rsplit(":", 1)[0]
return ("%s:%x" % (tmp, i) for i in range(s, e))
def encap_mcast(self, pkt, src_ip, src_mac, vni):
Encapsulate the original payload frame by adding VXLAN header with its
UDP, IP and Ethernet fields
"""
- return (Ether(src=src_mac, dst=self.mcast_mac) /
- IPv6(src=src_ip, dst=self.mcast_ip6) /
- UDP(sport=self.dport, dport=self.dport, chksum=0) /
- VXLAN(vni=vni, flags=self.flags) /
- pkt)
+ return (
+ Ether(src=src_mac, dst=self.mcast_mac)
+ / IPv6(src=src_ip, dst=self.mcast_ip6)
+ / UDP(sport=self.dport, dport=self.dport, chksum=0)
+ / VXLAN(vni=vni, flags=self.flags)
+ / pkt
+ )
def decapsulate(self, pkt):
"""
Decapsulate the original payload frame by removing VXLAN header
"""
# check if is set I flag
- self.assertEqual(pkt[VXLAN].flags, int('0x8', 16))
+ self.assertEqual(pkt[VXLAN].flags, int("0x8", 16))
return pkt[VXLAN].payload
# Method for checking VXLAN encapsulation.
self.assertEqual(pkt[IPv6].dst, type(self).mcast_ip6)
# Verify UDP destination port is VXLAN 4789, source UDP port could be
# arbitrary.
- self.assertEqual(pkt[UDP].dport, type(self).dport)
- # TODO: checksum check
+ self.assertEqual(pkt[UDP].dport, self.dport)
+ # Verify UDP checksum
+ self.assert_udp_checksum_valid(pkt, ignore_zero_checksum=False)
# Verify VNI
self.assertEqual(pkt[VXLAN].vni, vni)
@classmethod
- def create_vxlan_flood_test_bd(cls, vni, n_ucast_tunnels):
+ def create_vxlan_flood_test_bd(cls, vni, n_ucast_tunnels, port):
# Create 10 ucast vxlan tunnels under bd
start = 10
end = start + n_ucast_tunnels
for dest_ip6 in cls.ip_range(start, end):
- dest_ip6n = socket.inet_pton(socket.AF_INET6, dest_ip6)
# add host route so dest ip will not be resolved
- rip = VppIpRoute(cls, dest_ip6, 128,
- [VppRoutePath(cls.pg0.remote_ip6, INVALID_INDEX)],
- register=False)
+ rip = VppIpRoute(
+ cls,
+ dest_ip6,
+ 128,
+ [VppRoutePath(cls.pg0.remote_ip6, INVALID_INDEX)],
+ register=False,
+ )
rip.add_vpp_config()
- r = cls.vapi.vxlan_add_del_tunnel(src_address=cls.pg0.local_ip6n,
- dst_address=dest_ip6n, is_ipv6=1,
- vni=vni)
+ r = VppVxlanTunnel(
+ cls,
+ src=cls.pg0.local_ip6,
+ src_port=port,
+ dst_port=port,
+ dst=dest_ip6,
+ vni=vni,
+ )
+ r.add_vpp_config()
cls.vapi.sw_interface_set_l2_bridge(r.sw_if_index, bd_id=vni)
@classmethod
super(TestVxlan6, cls).setUpClass()
try:
- cls.dport = 4789
cls.flags = 0x8
# Create 2 pg interfaces.
for pg in cls.pg_interfaces:
pg.admin_up()
- # Configure IPv4 addresses on VPP pg0.
+ # Configure IPv6 addresses on VPP pg0.
cls.pg0.config_ip6()
# Resolve MAC address for VPP's IP address on pg0.
cls.pg0.resolve_ndp()
- cls.mcast_ip6 = 'ff0e::1'
- cls.mcast_ip6n = socket.inet_pton(socket.AF_INET6, cls.mcast_ip6)
- cls.mcast_mac = "33:33:00:00:00:%02x" % (1)
-
- # Create VXLAN VTEP on VPP pg0, and put vxlan_tunnel0 and pg1
- # into BD.
- cls.single_tunnel_bd = 1
- r = cls.vapi.vxlan_add_del_tunnel(src_address=cls.pg0.local_ip6n,
- dst_address=cls.pg0.remote_ip6n,
- is_ipv6=1,
- vni=cls.single_tunnel_bd)
- cls.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=r.sw_if_index,
- bd_id=cls.single_tunnel_bd)
- cls.vapi.sw_interface_set_l2_bridge(
- rx_sw_if_index=cls.pg1.sw_if_index, bd_id=cls.single_tunnel_bd)
-
- # Setup vni 2 to test multicast flooding
- cls.n_ucast_tunnels = 10
- cls.mcast_flood_bd = 2
- cls.create_vxlan_flood_test_bd(cls.mcast_flood_bd,
- cls.n_ucast_tunnels)
- r = cls.vapi.vxlan_add_del_tunnel(src_address=cls.pg0.local_ip6n,
- dst_address=cls.mcast_ip6n,
- mcast_sw_if_index=1, is_ipv6=1,
- vni=cls.mcast_flood_bd)
- cls.vapi.sw_interface_set_l2_bridge(rx_sw_if_index=r.sw_if_index,
- bd_id=cls.mcast_flood_bd)
- cls.vapi.sw_interface_set_l2_bridge(
- rx_sw_if_index=cls.pg2.sw_if_index, bd_id=cls.mcast_flood_bd)
-
- # Setup vni 3 to test unicast flooding
- cls.ucast_flood_bd = 3
- cls.create_vxlan_flood_test_bd(cls.ucast_flood_bd,
- cls.n_ucast_tunnels)
- cls.vapi.sw_interface_set_l2_bridge(
- rx_sw_if_index=cls.pg3.sw_if_index, bd_id=cls.ucast_flood_bd)
+ # Our Multicast address
+ cls.mcast_ip6 = "ff0e::1"
+ cls.mcast_mac = util.mcast_ip_to_mac(cls.mcast_ip6)
except Exception:
super(TestVxlan6, cls).tearDownClass()
raise
def tearDownClass(cls):
super(TestVxlan6, cls).tearDownClass()
+ def setUp(self):
+ super(TestVxlan6, self).setUp()
+
+ def createVxLANInterfaces(self, port=4789):
+ # Create VXLAN VTEP on VPP pg0, and put vxlan_tunnel0 and pg1
+ # into BD.
+ self.dport = port
+
+ self.single_tunnel_vni = 0x12345
+ self.single_tunnel_bd = 1
+ r = VppVxlanTunnel(
+ self,
+ src=self.pg0.local_ip6,
+ dst=self.pg0.remote_ip6,
+ src_port=self.dport,
+ dst_port=self.dport,
+ vni=self.single_tunnel_vni,
+ )
+ r.add_vpp_config()
+ self.vapi.sw_interface_set_l2_bridge(
+ rx_sw_if_index=r.sw_if_index, bd_id=self.single_tunnel_bd
+ )
+ self.vapi.sw_interface_set_l2_bridge(
+ rx_sw_if_index=self.pg1.sw_if_index, bd_id=self.single_tunnel_bd
+ )
+
+ # Setup vni 2 to test multicast flooding
+ self.n_ucast_tunnels = 10
+ self.mcast_flood_bd = 2
+ self.create_vxlan_flood_test_bd(
+ self.mcast_flood_bd, self.n_ucast_tunnels, self.dport
+ )
+ r = VppVxlanTunnel(
+ self,
+ src=self.pg0.local_ip6,
+ dst=self.mcast_ip6,
+ src_port=self.dport,
+ dst_port=self.dport,
+ mcast_sw_if_index=1,
+ vni=self.mcast_flood_bd,
+ )
+ r.add_vpp_config()
+ self.vapi.sw_interface_set_l2_bridge(
+ rx_sw_if_index=r.sw_if_index, bd_id=self.mcast_flood_bd
+ )
+ self.vapi.sw_interface_set_l2_bridge(
+ rx_sw_if_index=self.pg2.sw_if_index, bd_id=self.mcast_flood_bd
+ )
+
+ # Setup vni 3 to test unicast flooding
+ self.ucast_flood_bd = 3
+ self.create_vxlan_flood_test_bd(
+ self.ucast_flood_bd, self.n_ucast_tunnels, self.dport
+ )
+ self.vapi.sw_interface_set_l2_bridge(
+ rx_sw_if_index=self.pg3.sw_if_index, bd_id=self.ucast_flood_bd
+ )
+
+ # Set scapy listen custom port for VxLAN
+ bind_layers(UDP, VXLAN, dport=self.dport)
+
# Method to define VPP actions before tear down of the test case.
# Overrides tearDown method in VppTestCase class.
# @param self The object pointer.
self.logger.info(self.vapi.cli("show bridge-domain 3 detail"))
self.logger.info(self.vapi.cli("show vxlan tunnel"))
+ def encap_fragmented_packet(self):
+ frame = (
+ Ether(src="00:00:00:00:00:02", dst="00:00:00:00:00:01")
+ / IP(src="4.3.2.1", dst="1.2.3.4")
+ / UDP(sport=20000, dport=10000)
+ / Raw(b"\xa5" * 1000)
+ )
+
+ frags = util.fragment_rfc791(frame, 400)
+
+ self.pg1.add_stream(frags)
+
+ self.pg0.enable_capture()
+
+ self.pg_start()
+
+ out = self.pg0.get_capture(3)
+
+ payload = []
+ for pkt in out:
+ payload.append(self.decapsulate(pkt))
+ self.check_encapsulation(pkt, self.single_tunnel_vni)
+
+ reassembled = util.reassemble4(payload)
+
+ self.assertEqual(Ether(raw(frame))[IP], reassembled[IP])
+
+ """
+ Tests with default port (4789)
+ """
+
+ def test_decap(self):
+ """Decapsulation test
+ from BridgeDoman
+ """
+ self.createVxLANInterfaces()
+ super(TestVxlan6, self).test_decap()
+
+ def test_encap(self):
+ """Encapsulation test
+ from BridgeDoman
+ """
+ self.createVxLANInterfaces()
+ super(TestVxlan6, self).test_encap()
+
+ def test_encap_fragmented_packet(self):
+ """Encapsulation test send fragments from pg1
+ Verify receipt of encapsulated frames on pg0
+ """
+ self.createVxLANInterfaces()
+ self.encap_fragmented_packet()
+
+ def test_ucast_flood(self):
+ """Unicast flood test
+ from BridgeDoman
+ """
+ self.createVxLANInterfaces()
+ super(TestVxlan6, self).test_ucast_flood()
+
+ def test_mcast_flood(self):
+ """Multicast flood test
+ from BridgeDoman
+ """
+ self.createVxLANInterfaces()
+ super(TestVxlan6, self).test_mcast_flood()
+
+ def test_mcast_rcv(self):
+ """Multicast receive test
+ from BridgeDoman
+ """
+ self.createVxLANInterfaces()
+ super(TestVxlan6, self).test_mcast_rcv()
+
+ """
+ Tests with custom port
+ """
+
+ def test_decap_custom_port(self):
+ """Decapsulation test custom port
+ from BridgeDoman
+ """
+ self.createVxLANInterfaces(1111)
+ super(TestVxlan6, self).test_decap()
+
+ def test_encap_custom_port(self):
+ """Encapsulation test custom port
+ from BridgeDoman
+ """
+ self.createVxLANInterfaces(1111)
+ super(TestVxlan6, self).test_encap()
+
+ def test_ucast_flood_custom_port(self):
+ """Unicast flood test custom port
+ from BridgeDoman
+ """
+ self.createVxLANInterfaces(1111)
+ super(TestVxlan6, self).test_ucast_flood()
+
+ def test_mcast_flood_custom_port(self):
+ """Multicast flood test custom port
+ from BridgeDoman
+ """
+ self.createVxLANInterfaces(1111)
+ super(TestVxlan6, self).test_mcast_flood()
+
+ def test_mcast_rcv_custom_port(self):
+ """Multicast receive test custom port
+ from BridgeDoman
+ """
+ self.createVxLANInterfaces(1111)
+ super(TestVxlan6, self).test_mcast_rcv()
+
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)