4 from framework import VppTestCase
5 from asfframework import VppTestRunner
6 from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathType
7 from vpp_l2 import L2_PORT_TYPE
8 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
9 from vpp_acl import AclRule, VppAcl, VppAclInterface
11 from scapy.packet import Raw
12 from scapy.layers.l2 import Ether, Dot1Q
13 from scapy.layers.inet import IP, UDP
14 from socket import AF_INET
15 from ipaddress import IPv4Network
20 class TestDVR(VppTestCase):
21 """Distributed Virtual Router"""
25 super(TestDVR, cls).setUpClass()
28 def tearDownClass(cls):
29 super(TestDVR, cls).tearDownClass()
32 super(TestDVR, self).setUp()
34 self.create_pg_interfaces(range(4))
35 self.create_loopback_interfaces(1)
37 for i in self.pg_interfaces:
40 self.loop0.config_ip4()
43 for i in self.pg_interfaces:
45 self.loop0.unconfig_ip4()
47 super(TestDVR, self).tearDown()
49 def assert_same_mac_addr(self, tx, rx):
53 self.assertEqual(t_eth.src, r_eth.src)
54 self.assertEqual(t_eth.dst, r_eth.dst)
56 def assert_has_vlan_tag(self, tag, rx):
59 self.assertEqual(tag, r_1q.vlan)
61 def assert_has_no_tag(self, rx):
63 self.assertFalse(p.haslayer(Dot1Q))
66 """Distributed Virtual Router"""
69 # A packet destined to an IP address that is L2 bridged via
72 ip_non_tag_bridged = "10.10.10.10"
73 ip_tag_bridged = "10.10.10.11"
74 any_src_addr = "1.1.1.1"
77 Ether(src=self.pg0.remote_mac, dst=self.loop0.local_mac)
78 / IP(src=any_src_addr, dst=ip_non_tag_bridged)
79 / UDP(sport=1234, dport=1234)
83 Ether(src=self.pg0.remote_mac, dst=self.loop0.local_mac)
84 / IP(src=any_src_addr, dst=ip_tag_bridged)
85 / UDP(sport=1234, dport=1234)
90 # Two sub-interfaces so we can test VLAN tag push/pop
92 sub_if_on_pg2 = VppDot1QSubint(self, self.pg2, 92)
93 sub_if_on_pg3 = VppDot1QSubint(self, self.pg3, 93)
94 sub_if_on_pg2.admin_up()
95 sub_if_on_pg3.admin_up()
98 # Put all the interfaces into a new bridge domain
100 self.vapi.sw_interface_set_l2_bridge(
101 rx_sw_if_index=self.pg0.sw_if_index, bd_id=1
103 self.vapi.sw_interface_set_l2_bridge(
104 rx_sw_if_index=self.pg1.sw_if_index, bd_id=1
106 self.vapi.sw_interface_set_l2_bridge(
107 rx_sw_if_index=sub_if_on_pg2.sw_if_index, bd_id=1
109 self.vapi.sw_interface_set_l2_bridge(
110 rx_sw_if_index=sub_if_on_pg3.sw_if_index, bd_id=1
112 self.vapi.sw_interface_set_l2_bridge(
113 rx_sw_if_index=self.loop0.sw_if_index, bd_id=1, port_type=L2_PORT_TYPE.BVI
116 self.vapi.l2_interface_vlan_tag_rewrite(
117 sw_if_index=sub_if_on_pg2.sw_if_index,
118 vtr_op=L2_VTR_OP.L2_POP_1,
121 self.vapi.l2_interface_vlan_tag_rewrite(
122 sw_if_index=sub_if_on_pg3.sw_if_index,
123 vtr_op=L2_VTR_OP.L2_POP_1,
128 # Add routes to bridge the traffic via a tagged an nontagged interface
130 route_no_tag = VppIpRoute(
136 "0.0.0.0", self.pg1.sw_if_index, type=FibPathType.FIB_PATH_TYPE_DVR
140 route_no_tag.add_vpp_config()
143 # Inject the packet that arrives and leaves on a non-tagged interface
144 # Since it's 'bridged' expect that the MAC headed is unchanged.
146 rx = self.send_and_expect(self.pg0, pkt_no_tag * NUM_PKTS, self.pg1)
147 self.assert_same_mac_addr(pkt_no_tag, rx)
148 self.assert_has_no_tag(rx)
151 # Add routes to bridge the traffic via a tagged interface
153 route_with_tag = VppIpRoute(
160 sub_if_on_pg3.sw_if_index,
161 type=FibPathType.FIB_PATH_TYPE_DVR,
165 route_with_tag.add_vpp_config()
168 # Inject the packet that arrives non-tag and leaves on a tagged
171 rx = self.send_and_expect(self.pg0, pkt_tag * NUM_PKTS, self.pg3)
172 self.assert_same_mac_addr(pkt_tag, rx)
173 self.assert_has_vlan_tag(93, rx)
179 Ether(src=self.pg2.remote_mac, dst=self.loop0.local_mac)
181 / IP(src=any_src_addr, dst=ip_tag_bridged)
182 / UDP(sport=1234, dport=1234)
186 rx = self.send_and_expect(self.pg2, pkt_tag_to_tag * NUM_PKTS, self.pg3)
187 self.assert_same_mac_addr(pkt_tag_to_tag, rx)
188 self.assert_has_vlan_tag(93, rx)
193 pkt_tag_to_non_tag = (
194 Ether(src=self.pg2.remote_mac, dst=self.loop0.local_mac)
196 / IP(src=any_src_addr, dst=ip_non_tag_bridged)
197 / UDP(sport=1234, dport=1234)
201 rx = self.send_and_expect(self.pg2, pkt_tag_to_non_tag * NUM_PKTS, self.pg1)
202 self.assert_same_mac_addr(pkt_tag_to_tag, rx)
203 self.assert_has_no_tag(rx)
206 # Add an output L3 ACL that will block the traffic
212 src_prefix=IPv4Network((any_src_addr, 32)),
213 dst_prefix=IPv4Network((ip_non_tag_bridged, 32)),
215 acl = VppAcl(self, rules=[rule_1])
219 # Apply the ACL on the output interface
221 acl_if1 = VppAclInterface(
222 self, sw_if_index=self.pg1.sw_if_index, n_input=0, acls=[acl]
224 acl_if1.add_vpp_config()
227 # Send packet's that should match the ACL and be dropped
229 rx = self.send_and_assert_no_replies(self.pg2, pkt_tag_to_non_tag * NUM_PKTS)
234 acl_if1.remove_vpp_config()
235 acl.remove_vpp_config()
237 self.vapi.sw_interface_set_l2_bridge(
238 rx_sw_if_index=self.pg0.sw_if_index, bd_id=1, enable=0
240 self.vapi.sw_interface_set_l2_bridge(
241 rx_sw_if_index=self.pg1.sw_if_index, bd_id=1, enable=0
243 self.vapi.sw_interface_set_l2_bridge(
244 rx_sw_if_index=sub_if_on_pg2.sw_if_index, bd_id=1, enable=0
246 self.vapi.sw_interface_set_l2_bridge(
247 rx_sw_if_index=sub_if_on_pg3.sw_if_index, bd_id=1, enable=0
249 self.vapi.sw_interface_set_l2_bridge(
250 rx_sw_if_index=self.loop0.sw_if_index,
252 port_type=L2_PORT_TYPE.BVI,
257 # Do a FIB dump to make sure the paths are correctly reported as DVR
259 routes = self.vapi.ip_route_dump(0)
262 if ip_tag_bridged == str(r.route.prefix.network_address):
264 r.route.paths[0].sw_if_index, sub_if_on_pg3.sw_if_index
266 self.assertEqual(r.route.paths[0].type, FibPathType.FIB_PATH_TYPE_DVR)
267 if ip_non_tag_bridged == str(r.route.prefix.network_address):
268 self.assertEqual(r.route.paths[0].sw_if_index, self.pg1.sw_if_index)
269 self.assertEqual(r.route.paths[0].type, FibPathType.FIB_PATH_TYPE_DVR)
272 # the explicit route delete is require so it happens before
273 # the sbu-interface delete. subinterface delete is required
274 # because that object type does not use the object registry
276 route_no_tag.remove_vpp_config()
277 route_with_tag.remove_vpp_config()
278 sub_if_on_pg3.remove_vpp_config()
279 sub_if_on_pg2.remove_vpp_config()
282 if __name__ == "__main__":
283 unittest.main(testRunner=VppTestRunner)