-#!/usr/bin/env python
-import random
-import socket
+#!/usr/bin/env python3
import unittest
-from framework import VppTestCase, VppTestRunner
-from vpp_sub_interface import VppSubInterface, VppDot1QSubint
-from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
-from vpp_papi_provider import L2_VTR_OP
+from framework import VppTestCase
+from asfframework import VppTestRunner
+from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathType
+from vpp_l2 import L2_PORT_TYPE
+from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
+from vpp_acl import AclRule, VppAcl, VppAclInterface
from scapy.packet import Raw
-from scapy.layers.l2 import Ether, Dot1Q, ARP
+from scapy.layers.l2 import Ether, Dot1Q
from scapy.layers.inet import IP, UDP
-from util import ppp
+from socket import AF_INET
+from ipaddress import IPv4Network
+
+NUM_PKTS = 67
class TestDVR(VppTestCase):
- """ IPv4 Load-Balancing """
+ """Distributed Virtual Router"""
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestDVR, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestDVR, cls).tearDownClass()
def setUp(self):
super(TestDVR, self).setUp()
self.create_pg_interfaces(range(4))
- self.create_loopback_interfaces(range(1))
+ self.create_loopback_interfaces(1)
for i in self.pg_interfaces:
i.admin_up()
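Review bot: The added `setUpClass` and `tearDownClass` overrides only call the parent implementation, so as shown they change nothing and could be dropped until class-level setup is actually needed. If they are kept as placeholders, a short comment such as `# no class-level state yet; kept as an extension point` would make that explicit. `setUp` continues past the end of this hunk.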
super(TestDVR, self).tearDown()
+ def assert_same_mac_addr(self, tx, rx):
+ t_eth = tx[Ether]
+ for p in rx:
+ r_eth = p[Ether]
+ self.assertEqual(t_eth.src, r_eth.src)
+ self.assertEqual(t_eth.dst, r_eth.dst)
+
+ def assert_has_vlan_tag(self, tag, rx):
+ for p in rx:
+ r_1q = p[Dot1Q]
+ self.assertEqual(tag, r_1q.vlan)
+
+ def assert_has_no_tag(self, rx):
+ for p in rx:
+ self.assertFalse(p.haslayer(Dot1Q))
+
def test_dvr(self):
- """ Distributed Virtual Router """
+ """Distributed Virtual Router"""
#
# A packet destined to an IP address that is L2 bridged via
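Review bot: `assert_has_vlan_tag` indexes `p[Dot1Q]` directly, so a frame that arrives untagged ends the test with scapy's layer-lookup error rather than an assertion failure that names the expected tag. Adding `self.assertTrue(p.haslayer(Dot1Q), "expected VLAN tag %d" % tag)` before the lookup keeps this a reported failure, in line with how `assert_has_no_tag` already uses `haslayer`. The three helpers would also benefit from one-line docstrings, for example `"""Assert every received frame carries VLAN id *tag*."""`. `test_dvr` starts in this hunk and continues beyond it.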
ip_tag_bridged = "10.10.10.11"
any_src_addr = "1.1.1.1"
- pkt_no_tag = (Ether(src=self.pg0.remote_mac,
- dst=self.loop0.local_mac) /
- IP(src=any_src_addr,
- dst=ip_non_tag_bridged) /
- UDP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
- pkt_tag = (Ether(src=self.pg0.remote_mac,
- dst=self.loop0.local_mac) /
- IP(src=any_src_addr,
- dst=ip_tag_bridged) /
- UDP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ pkt_no_tag = (
+ Ether(src=self.pg0.remote_mac, dst=self.loop0.local_mac)
+ / IP(src=any_src_addr, dst=ip_non_tag_bridged)
+ / UDP(sport=1234, dport=1234)
+ / Raw(b"\xa5" * 100)
+ )
+ pkt_tag = (
+ Ether(src=self.pg0.remote_mac, dst=self.loop0.local_mac)
+ / IP(src=any_src_addr, dst=ip_tag_bridged)
+ / UDP(sport=1234, dport=1234)
+ / Raw(b"\xa5" * 100)
+ )
#
# Two sub-interfaces so we can test VLAN tag push/pop
#
# Put all the interfaces into a new bridge domain
#
- self.vapi.sw_interface_set_l2_bridge(self.pg0.sw_if_index, 1)
- self.vapi.sw_interface_set_l2_bridge(self.pg1.sw_if_index, 1)
- self.vapi.sw_interface_set_l2_bridge(sub_if_on_pg2.sw_if_index, 1)
- self.vapi.sw_interface_set_l2_bridge(sub_if_on_pg3.sw_if_index, 1)
- self.vapi.sw_interface_set_l2_bridge(self.loop0.sw_if_index, 1, bvi=1)
-
- self.vapi.sw_interface_set_l2_tag_rewrite(sub_if_on_pg2.sw_if_index,
- L2_VTR_OP.L2_POP_1,
- 92)
- self.vapi.sw_interface_set_l2_tag_rewrite(sub_if_on_pg3.sw_if_index,
- L2_VTR_OP.L2_POP_1,
- 93)
+ self.vapi.sw_interface_set_l2_bridge(
+ rx_sw_if_index=self.pg0.sw_if_index, bd_id=1
+ )
+ self.vapi.sw_interface_set_l2_bridge(
+ rx_sw_if_index=self.pg1.sw_if_index, bd_id=1
+ )
+ self.vapi.sw_interface_set_l2_bridge(
+ rx_sw_if_index=sub_if_on_pg2.sw_if_index, bd_id=1
+ )
+ self.vapi.sw_interface_set_l2_bridge(
+ rx_sw_if_index=sub_if_on_pg3.sw_if_index, bd_id=1
+ )
+ self.vapi.sw_interface_set_l2_bridge(
+ rx_sw_if_index=self.loop0.sw_if_index, bd_id=1, port_type=L2_PORT_TYPE.BVI
+ )
+
+ self.vapi.l2_interface_vlan_tag_rewrite(
+ sw_if_index=sub_if_on_pg2.sw_if_index,
+ vtr_op=L2_VTR_OP.L2_POP_1,
+ push_dot1q=92,
+ )
+ self.vapi.l2_interface_vlan_tag_rewrite(
+ sw_if_index=sub_if_on_pg3.sw_if_index,
+ vtr_op=L2_VTR_OP.L2_POP_1,
+ push_dot1q=93,
+ )
#
# Add routes to bridge the traffic via a tagged an nontagged interface
#
route_no_tag = VppIpRoute(
- self, ip_non_tag_bridged, 32,
- [VppRoutePath("0.0.0.0",
- self.pg1.sw_if_index,
- proto=DpoProto.DPO_PROTO_ETHERNET)])
+ self,
+ ip_non_tag_bridged,
+ 32,
+ [
+ VppRoutePath(
+ "0.0.0.0", self.pg1.sw_if_index, type=FibPathType.FIB_PATH_TYPE_DVR
+ )
+ ],
+ )
route_no_tag.add_vpp_config()
#
# Inject the packet that arrives and leaves on a non-tagged interface
# Since it's 'bridged' expect that the MAC headed is unchanged.
#
- self.pg0.add_stream(pkt_no_tag)
-
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
-
- rx = self.pg1.get_capture(1)
-
- self.assertEqual(rx[0][Ether].dst, pkt_no_tag[Ether].dst)
- self.assertEqual(rx[0][Ether].src, pkt_no_tag[Ether].src)
+ rx = self.send_and_expect(self.pg0, pkt_no_tag * NUM_PKTS, self.pg1)
+ self.assert_same_mac_addr(pkt_no_tag, rx)
+ self.assert_has_no_tag(rx)
#
# Add routes to bridge the traffic via a tagged interface
#
- route_no_tag = VppIpRoute(
- self, ip_tag_bridged, 32,
- [VppRoutePath("0.0.0.0",
- sub_if_on_pg3.sw_if_index,
- proto=DpoProto.DPO_PROTO_ETHERNET)])
- route_no_tag.add_vpp_config()
+ route_with_tag = VppIpRoute(
+ self,
+ ip_tag_bridged,
+ 32,
+ [
+ VppRoutePath(
+ "0.0.0.0",
+ sub_if_on_pg3.sw_if_index,
+ type=FibPathType.FIB_PATH_TYPE_DVR,
+ )
+ ],
+ )
+ route_with_tag.add_vpp_config()
#
- # Inject the packet that arrives and leaves on a non-tagged interface
- # Since it's 'bridged' expect that the MAC headed is unchanged.
+ # Inject the packet that arrives non-tag and leaves on a tagged
+ # interface
#
- self.pg0.add_stream(pkt_tag)
+ rx = self.send_and_expect(self.pg0, pkt_tag * NUM_PKTS, self.pg3)
+ self.assert_same_mac_addr(pkt_tag, rx)
+ self.assert_has_vlan_tag(93, rx)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
+ #
+ # Tag to tag
+ #
+ pkt_tag_to_tag = (
+ Ether(src=self.pg2.remote_mac, dst=self.loop0.local_mac)
+ / Dot1Q(vlan=92)
+ / IP(src=any_src_addr, dst=ip_tag_bridged)
+ / UDP(sport=1234, dport=1234)
+ / Raw(b"\xa5" * 100)
+ )
+
+ rx = self.send_and_expect(self.pg2, pkt_tag_to_tag * NUM_PKTS, self.pg3)
+ self.assert_same_mac_addr(pkt_tag_to_tag, rx)
+ self.assert_has_vlan_tag(93, rx)
- rx = self.pg3.get_capture(1)
+ #
+ # Tag to non-Tag
+ #
+ pkt_tag_to_non_tag = (
+ Ether(src=self.pg2.remote_mac, dst=self.loop0.local_mac)
+ / Dot1Q(vlan=92)
+ / IP(src=any_src_addr, dst=ip_non_tag_bridged)
+ / UDP(sport=1234, dport=1234)
+ / Raw(b"\xa5" * 100)
+ )
+
+ rx = self.send_and_expect(self.pg2, pkt_tag_to_non_tag * NUM_PKTS, self.pg1)
+ self.assert_same_mac_addr(pkt_tag_to_tag, rx)
+ self.assert_has_no_tag(rx)
- self.assertEqual(rx[0][Ether].dst, pkt_tag[Ether].dst)
- self.assertEqual(rx[0][Ether].src, pkt_tag[Ether].src)
- self.assertEqual(rx[0][Dot1Q].vlan, 93)
+ #
+ # Add an output L3 ACL that will block the traffic
+ #
+ rule_1 = AclRule(
+ is_permit=0,
+ proto=17,
+ ports=1234,
+ src_prefix=IPv4Network((any_src_addr, 32)),
+ dst_prefix=IPv4Network((ip_non_tag_bridged, 32)),
+ )
+ acl = VppAcl(self, rules=[rule_1])
+ acl.add_vpp_config()
#
- # Tag to tag
+ # Apply the ACL on the output interface
#
- pkt_tag_to_tag = (Ether(src=self.pg2.remote_mac,
- dst=self.loop0.local_mac) /
- Dot1Q(vlan=92) /
- IP(src=any_src_addr,
- dst=ip_tag_bridged) /
- UDP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ acl_if1 = VppAclInterface(
+ self, sw_if_index=self.pg1.sw_if_index, n_input=0, acls=[acl]
+ )
+ acl_if1.add_vpp_config()
- self.pg2.add_stream(pkt_tag_to_tag)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- rx = self.pg3.get_capture(1)
+ #
+ # Send packet's that should match the ACL and be dropped
+ #
+ rx = self.send_and_assert_no_replies(self.pg2, pkt_tag_to_non_tag * NUM_PKTS)
- self.assertEqual(rx[0][Ether].dst, pkt_tag_to_tag[Ether].dst)
- self.assertEqual(rx[0][Ether].src, pkt_tag_to_tag[Ether].src)
- self.assertEqual(rx[0][Dot1Q].vlan, 93)
+ #
+ # cleanup
+ #
+ acl_if1.remove_vpp_config()
+ acl.remove_vpp_config()
+
+ self.vapi.sw_interface_set_l2_bridge(
+ rx_sw_if_index=self.pg0.sw_if_index, bd_id=1, enable=0
+ )
+ self.vapi.sw_interface_set_l2_bridge(
+ rx_sw_if_index=self.pg1.sw_if_index, bd_id=1, enable=0
+ )
+ self.vapi.sw_interface_set_l2_bridge(
+ rx_sw_if_index=sub_if_on_pg2.sw_if_index, bd_id=1, enable=0
+ )
+ self.vapi.sw_interface_set_l2_bridge(
+ rx_sw_if_index=sub_if_on_pg3.sw_if_index, bd_id=1, enable=0
+ )
+ self.vapi.sw_interface_set_l2_bridge(
+ rx_sw_if_index=self.loop0.sw_if_index,
+ bd_id=1,
+ port_type=L2_PORT_TYPE.BVI,
+ enable=0,
+ )
#
- # Tag to non-Tag
+ # Do a FIB dump to make sure the paths are correctly reported as DVR
#
- pkt_tag_to_non_tag = (Ether(src=self.pg2.remote_mac,
- dst=self.loop0.local_mac) /
- Dot1Q(vlan=92) /
- IP(src=any_src_addr,
- dst=ip_non_tag_bridged) /
- UDP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ routes = self.vapi.ip_route_dump(0)
+
+ for r in routes:
+ if ip_tag_bridged == str(r.route.prefix.network_address):
+ self.assertEqual(
+ r.route.paths[0].sw_if_index, sub_if_on_pg3.sw_if_index
+ )
+ self.assertEqual(r.route.paths[0].type, FibPathType.FIB_PATH_TYPE_DVR)
+ if ip_non_tag_bridged == str(r.route.prefix.network_address):
+ self.assertEqual(r.route.paths[0].sw_if_index, self.pg1.sw_if_index)
+ self.assertEqual(r.route.paths[0].type, FibPathType.FIB_PATH_TYPE_DVR)
- self.pg2.add_stream(pkt_tag_to_non_tag)
- self.pg_enable_capture(self.pg_interfaces)
- self.pg_start()
- rx = self.pg1.get_capture(1)
+ #
+ # the explicit route delete is require so it happens before
+ # the sbu-interface delete. subinterface delete is required
+ # because that object type does not use the object registry
+ #
+ route_no_tag.remove_vpp_config()
+ route_with_tag.remove_vpp_config()
+ sub_if_on_pg3.remove_vpp_config()
+ sub_if_on_pg2.remove_vpp_config()
- self.assertEqual(rx[0][Ether].dst, pkt_tag_to_tag[Ether].dst)
- self.assertEqual(rx[0][Ether].src, pkt_tag_to_tag[Ether].src)
- self.assertFalse(rx[0].haslayer(Dot1Q))
-if __name__ == '__main__':
+if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)
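Review bot: The output-ACL step near the end of `test_dvr` only asserts that the denied flow disappears, so it would still pass if binding the ACL broke forwarding altogether, and nothing checks that removing the ACL restores the path. A positive control while the ACL is bound, `self.send_and_expect(self.pg2, pkt_tag_to_tag * NUM_PKTS, self.pg3)`, and a re-check right after `acl.remove_vpp_config()`, `self.send_and_expect(self.pg2, pkt_tag_to_non_tag * NUM_PKTS, self.pg1)`, would tie the drop to the rule on pg1. Two smaller points in the same hunk: the tag-to-non-tag case compares MACs against `pkt_tag_to_tag` where `pkt_tag_to_non_tag` was presumably meant (the Ethernet headers are identical, so it passes either way), and the FIB-dump loop asserts nothing if neither prefix appears in the dump, so counting matches and asserting the count after the loop would close that gap. `test_dvr` begins before this hunk.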