4 from framework import VppTestCase, VppTestRunner
5 from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathType
6 from vpp_l2 import L2_PORT_TYPE
7 from vpp_sub_interface import L2_VTR_OP, VppDot1QSubint
8 from vpp_acl import AclRule, VppAcl, VppAclInterface
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether, Dot1Q
12 from scapy.layers.inet import IP, UDP
13 from socket import AF_INET, inet_pton
14 from ipaddress import IPv4Network
19 class TestDVR(VppTestCase):
20 """Distributed Virtual Router"""
24 super(TestDVR, cls).setUpClass()
27 def tearDownClass(cls):
28 super(TestDVR, cls).tearDownClass()
31 super(TestDVR, self).setUp()
33 self.create_pg_interfaces(range(4))
34 self.create_loopback_interfaces(1)
36 for i in self.pg_interfaces:
39 self.loop0.config_ip4()
42 for i in self.pg_interfaces:
44 self.loop0.unconfig_ip4()
46 super(TestDVR, self).tearDown()
48 def assert_same_mac_addr(self, tx, rx):
52 self.assertEqual(t_eth.src, r_eth.src)
53 self.assertEqual(t_eth.dst, r_eth.dst)
55 def assert_has_vlan_tag(self, tag, rx):
58 self.assertEqual(tag, r_1q.vlan)
60 def assert_has_no_tag(self, rx):
62 self.assertFalse(p.haslayer(Dot1Q))
65 """Distributed Virtual Router"""
68 # A packet destined to an IP address that is L2 bridged via
71 ip_non_tag_bridged = "10.10.10.10"
72 ip_tag_bridged = "10.10.10.11"
73 any_src_addr = "1.1.1.1"
76 Ether(src=self.pg0.remote_mac, dst=self.loop0.local_mac)
77 / IP(src=any_src_addr, dst=ip_non_tag_bridged)
78 / UDP(sport=1234, dport=1234)
82 Ether(src=self.pg0.remote_mac, dst=self.loop0.local_mac)
83 / IP(src=any_src_addr, dst=ip_tag_bridged)
84 / UDP(sport=1234, dport=1234)
89 # Two sub-interfaces so we can test VLAN tag push/pop
91 sub_if_on_pg2 = VppDot1QSubint(self, self.pg2, 92)
92 sub_if_on_pg3 = VppDot1QSubint(self, self.pg3, 93)
93 sub_if_on_pg2.admin_up()
94 sub_if_on_pg3.admin_up()
97 # Put all the interfaces into a new bridge domain
99 self.vapi.sw_interface_set_l2_bridge(
100 rx_sw_if_index=self.pg0.sw_if_index, bd_id=1
102 self.vapi.sw_interface_set_l2_bridge(
103 rx_sw_if_index=self.pg1.sw_if_index, bd_id=1
105 self.vapi.sw_interface_set_l2_bridge(
106 rx_sw_if_index=sub_if_on_pg2.sw_if_index, bd_id=1
108 self.vapi.sw_interface_set_l2_bridge(
109 rx_sw_if_index=sub_if_on_pg3.sw_if_index, bd_id=1
111 self.vapi.sw_interface_set_l2_bridge(
112 rx_sw_if_index=self.loop0.sw_if_index, bd_id=1, port_type=L2_PORT_TYPE.BVI
115 self.vapi.l2_interface_vlan_tag_rewrite(
116 sw_if_index=sub_if_on_pg2.sw_if_index,
117 vtr_op=L2_VTR_OP.L2_POP_1,
120 self.vapi.l2_interface_vlan_tag_rewrite(
121 sw_if_index=sub_if_on_pg3.sw_if_index,
122 vtr_op=L2_VTR_OP.L2_POP_1,
127 # Add routes to bridge the traffic via a tagged an nontagged interface
129 route_no_tag = VppIpRoute(
135 "0.0.0.0", self.pg1.sw_if_index, type=FibPathType.FIB_PATH_TYPE_DVR
139 route_no_tag.add_vpp_config()
142 # Inject the packet that arrives and leaves on a non-tagged interface
143 # Since it's 'bridged' expect that the MAC headed is unchanged.
145 rx = self.send_and_expect(self.pg0, pkt_no_tag * NUM_PKTS, self.pg1)
146 self.assert_same_mac_addr(pkt_no_tag, rx)
147 self.assert_has_no_tag(rx)
150 # Add routes to bridge the traffic via a tagged interface
152 route_with_tag = VppIpRoute(
159 sub_if_on_pg3.sw_if_index,
160 type=FibPathType.FIB_PATH_TYPE_DVR,
164 route_with_tag.add_vpp_config()
167 # Inject the packet that arrives non-tag and leaves on a tagged
170 rx = self.send_and_expect(self.pg0, pkt_tag * NUM_PKTS, self.pg3)
171 self.assert_same_mac_addr(pkt_tag, rx)
172 self.assert_has_vlan_tag(93, rx)
178 Ether(src=self.pg2.remote_mac, dst=self.loop0.local_mac)
180 / IP(src=any_src_addr, dst=ip_tag_bridged)
181 / UDP(sport=1234, dport=1234)
185 rx = self.send_and_expect(self.pg2, pkt_tag_to_tag * NUM_PKTS, self.pg3)
186 self.assert_same_mac_addr(pkt_tag_to_tag, rx)
187 self.assert_has_vlan_tag(93, rx)
192 pkt_tag_to_non_tag = (
193 Ether(src=self.pg2.remote_mac, dst=self.loop0.local_mac)
195 / IP(src=any_src_addr, dst=ip_non_tag_bridged)
196 / UDP(sport=1234, dport=1234)
200 rx = self.send_and_expect(self.pg2, pkt_tag_to_non_tag * NUM_PKTS, self.pg1)
201 self.assert_same_mac_addr(pkt_tag_to_tag, rx)
202 self.assert_has_no_tag(rx)
205 # Add an output L3 ACL that will block the traffic
211 src_prefix=IPv4Network((any_src_addr, 32)),
212 dst_prefix=IPv4Network((ip_non_tag_bridged, 32)),
214 acl = VppAcl(self, rules=[rule_1])
218 # Apply the ACL on the output interface
220 acl_if1 = VppAclInterface(
221 self, sw_if_index=self.pg1.sw_if_index, n_input=0, acls=[acl]
223 acl_if1.add_vpp_config()
226 # Send packet's that should match the ACL and be dropped
228 rx = self.send_and_assert_no_replies(self.pg2, pkt_tag_to_non_tag * NUM_PKTS)
233 acl_if1.remove_vpp_config()
234 acl.remove_vpp_config()
236 self.vapi.sw_interface_set_l2_bridge(
237 rx_sw_if_index=self.pg0.sw_if_index, bd_id=1, enable=0
239 self.vapi.sw_interface_set_l2_bridge(
240 rx_sw_if_index=self.pg1.sw_if_index, bd_id=1, enable=0
242 self.vapi.sw_interface_set_l2_bridge(
243 rx_sw_if_index=sub_if_on_pg2.sw_if_index, bd_id=1, enable=0
245 self.vapi.sw_interface_set_l2_bridge(
246 rx_sw_if_index=sub_if_on_pg3.sw_if_index, bd_id=1, enable=0
248 self.vapi.sw_interface_set_l2_bridge(
249 rx_sw_if_index=self.loop0.sw_if_index,
251 port_type=L2_PORT_TYPE.BVI,
256 # Do a FIB dump to make sure the paths are correctly reported as DVR
258 routes = self.vapi.ip_route_dump(0)
261 if ip_tag_bridged == str(r.route.prefix.network_address):
263 r.route.paths[0].sw_if_index, sub_if_on_pg3.sw_if_index
265 self.assertEqual(r.route.paths[0].type, FibPathType.FIB_PATH_TYPE_DVR)
266 if ip_non_tag_bridged == str(r.route.prefix.network_address):
267 self.assertEqual(r.route.paths[0].sw_if_index, self.pg1.sw_if_index)
268 self.assertEqual(r.route.paths[0].type, FibPathType.FIB_PATH_TYPE_DVR)
271 # the explicit route delete is require so it happens before
272 # the sbu-interface delete. subinterface delete is required
273 # because that object type does not use the object registry
275 route_no_tag.remove_vpp_config()
276 route_with_tag.remove_vpp_config()
277 sub_if_on_pg3.remove_vpp_config()
278 sub_if_on_pg2.remove_vpp_config()
281 if __name__ == "__main__":
282 unittest.main(testRunner=VppTestRunner)