from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP, TCP
from util import ppp
+from vpp_ip_route import VppIpRoute, VppRoutePath
+from vpp_ip import INVALID_INDEX
class TestClassifier(VppTestCase):
self.logger.info(self.vapi.cli("show classify table verbose"))
self.logger.info(self.vapi.cli("show ip fib"))
- def config_pbr_fib_entry(self, intf, is_add=1):
- """Configure fib entry to route traffic toward PBR VRF table
-
- :param VppInterface intf: destination interface to be routed for PBR.
-
- """
- addr_len = 24
- self.vapi.ip_add_del_route(dst_address=intf.local_ip4n,
- dst_address_length=addr_len,
- next_hop_address=intf.remote_ip4n,
- table_id=self.pbr_vrfid, is_add=is_add)
-
def create_stream(self, src_if, dst_if, packet_sizes,
proto_l=UDP(sport=1234, dport=5678)):
"""Create input packet stream for defined interfaces.
:param int vrf_id: The FIB table / VRF ID to be verified.
:return: 1 if the FIB table / VRF ID is configured, otherwise return 0.
"""
- ip_fib_dump = self.vapi.ip_fib_dump()
- vrf_count = 0
- for ip_fib_details in ip_fib_dump:
- if ip_fib_details[2] == vrf_id:
- vrf_count += 1
+ ip_fib_dump = self.vapi.ip_route_dump(vrf_id, False)
+ vrf_count = len(ip_fib_dump)
+
if vrf_count == 0:
self.logger.info("IPv4 VRF ID %d is not configured" % vrf_id)
return 0
self.build_ip_match(src_ip=self.pg0.remote_ip4),
pbr_option, self.pbr_vrfid)
self.assertTrue(self.verify_vrf(self.pbr_vrfid))
- self.config_pbr_fib_entry(self.pg3)
+ r = VppIpRoute(self, self.pg3.local_ip4, 24,
+ [VppRoutePath(self.pg3.remote_ip4,
+ INVALID_INDEX)],
+ table_id=self.pbr_vrfid)
+ r.add_vpp_config()
+
self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
self.pg_enable_capture(self.pg_interfaces)
self.pg2.assert_nothing_captured(remark="packets forwarded")
# remove the classify session and the route
- self.config_pbr_fib_entry(self.pg3, is_add=0)
+ r.remove_vpp_config()
self.create_classify_session(
self.acl_tbl_idx.get(key),
self.build_ip_match(src_ip=self.pg0.remote_ip4),