self.logger.info(self.vapi.cli("show classify table verbose"))
self.logger.info(self.vapi.cli("show ip fib"))
- def config_pbr_fib_entry(self, intf):
+ def config_pbr_fib_entry(self, intf, is_add=1):
"""Configure fib entry to route traffic toward PBR VRF table
:param VppInterface intf: destination interface to be routed for PBR.
self.vapi.ip_add_del_route(intf.local_ip4n,
addr_len,
intf.remote_ip4n,
- table_id=self.pbr_vrfid)
+ table_id=self.pbr_vrfid,
+ is_add=is_add)
def create_stream(self, src_if, dst_if, packet_sizes):
"""Create input packet stream for defined interfaces.
payload_info = self.payload_to_info(str(packet[Raw]))
packet_index = payload_info.index
self.assertEqual(payload_info.dst, dst_sw_if_index)
- self.logger.debug("Got packet on port %s: src=%u (id=%u)" %
- (dst_if.name, payload_info.src, packet_index))
+ self.logger.debug(
+ "Got packet on port %s: src=%u (id=%u)" %
+ (dst_if.name, payload_info.src, packet_index))
next_info = self.get_next_packet_info_for_interface2(
payload_info.src, dst_sw_if_index,
last_info[payload_info.src])
"Interface %s: Packet expected from interface %s "
"didn't arrive" % (dst_if.name, i.name))
+ def verify_vrf(self, vrf_id):
+ """
+ Check if the FIB table / VRF ID is configured.
+
+ :param int vrf_id: The FIB table / VRF ID to be verified.
+ :return: 1 if the FIB table / VRF ID is configured, otherwise return 0.
+ """
+ ip_fib_dump = self.vapi.ip_fib_dump()
+ vrf_count = 0
+ for ip_fib_details in ip_fib_dump:
+ if ip_fib_details[2] == vrf_id:
+ vrf_count += 1
+ if vrf_count == 0:
+ self.logger.info("IPv4 VRF ID %d is not configured" % vrf_id)
+ return 0
+ else:
+ self.logger.info("IPv4 VRF ID %d is configured" % vrf_id)
+ return 1
+
@staticmethod
def build_ip_mask(proto='', src_ip='', dst_ip='',
src_port='', dst_port=''):
pkts = self.create_stream(self.pg0, self.pg2, self.pg_if_packet_sizes)
self.pg0.add_stream(pkts)
- self.create_classify_table(
- 'mac', self.build_mac_mask(src_mac='ffffffffffff'), data_offset=-14)
+ self.create_classify_table('mac',
+ self.build_mac_mask(src_mac='ffffffffffff'),
+ data_offset=-14)
self.create_classify_session(
self.pg0, self.acl_tbl_idx.get('mac'),
self.build_mac_match(src_mac=self.pg0.remote_mac))
pkts = self.create_stream(self.pg0, self.pg3, self.pg_if_packet_sizes)
self.pg0.add_stream(pkts)
- self.create_classify_table('pbr', self.build_ip_mask(src_ip='ffffffff'))
+ self.create_classify_table(
+ 'pbr', self.build_ip_mask(
+ src_ip='ffffffff'))
pbr_option = 1
+ # this will create the VRF/table in which we will insert the route
self.create_classify_session(
self.pg0, self.acl_tbl_idx.get('pbr'),
self.build_ip_match(src_ip=self.pg0.remote_ip4),
pbr_option, self.pbr_vrfid)
+ self.assertTrue(self.verify_vrf(self.pbr_vrfid))
self.config_pbr_fib_entry(self.pg3)
self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get('pbr'))
self.pg1.assert_nothing_captured(remark="packets forwarded")
self.pg2.assert_nothing_captured(remark="packets forwarded")
+ # remove the classify session and the route
+ self.config_pbr_fib_entry(self.pg3, is_add=0)
+ self.create_classify_session(
+ self.pg0, self.acl_tbl_idx.get('pbr'),
+ self.build_ip_match(src_ip=self.pg0.remote_ip4),
+ pbr_option, self.pbr_vrfid, is_add=0)
+
+ # and the table should be gone.
+ self.assertFalse(self.verify_vrf(self.pbr_vrfid))
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)