-#!/usr/bin/env python
+#!/usr/bin/env python3
import binascii
import socket
from scapy.layers.l2 import Ether
from scapy.layers.inet import IP, UDP, TCP
from util import ppp
+from vpp_ip_route import VppIpRoute, VppRoutePath
+from vpp_ip import INVALID_INDEX
class TestClassifier(VppTestCase):
super(TestClassifier, cls).setUpClass()
cls.acl_active_table = ''
+ @classmethod
+ def tearDownClass(cls):
+ super(TestClassifier, cls).tearDownClass()
+
def setUp(self):
"""
Perform test setup before test case.
def tearDown(self):
"""Run standard test teardown and acl related log."""
if not self.vpp_dead:
- self.logger.info(self.vapi.ppcli("show inacl type ip4"))
- self.logger.info(self.vapi.ppcli("show outacl type ip4"))
- self.logger.info(self.vapi.cli("show classify table verbose"))
- self.logger.info(self.vapi.cli("show ip fib"))
if self.acl_active_table == 'ip_out':
self.output_acl_set_interface(
self.pg0, self.acl_tbl_idx.get(self.acl_active_table), 0)
super(TestClassifier, self).tearDown()
- def config_pbr_fib_entry(self, intf, is_add=1):
- """Configure fib entry to route traffic toward PBR VRF table
-
- :param VppInterface intf: destination interface to be routed for PBR.
-
- """
- addr_len = 24
- self.vapi.ip_add_del_route(intf.local_ip4n,
- addr_len,
- intf.remote_ip4n,
- table_id=self.pbr_vrfid,
- is_add=is_add)
+ def show_commands_at_teardown(self):
+ self.logger.info(self.vapi.ppcli("show inacl type ip4"))
+ self.logger.info(self.vapi.ppcli("show outacl type ip4"))
+ self.logger.info(self.vapi.cli("show classify table verbose"))
+ self.logger.info(self.vapi.cli("show ip fib"))
def create_stream(self, src_if, dst_if, packet_sizes,
proto_l=UDP(sport=1234, dport=5678)):
:param int vrf_id: The FIB table / VRF ID to be verified.
:return: 1 if the FIB table / VRF ID is configured, otherwise return 0.
"""
- ip_fib_dump = self.vapi.ip_fib_dump()
- vrf_count = 0
- for ip_fib_details in ip_fib_dump:
- if ip_fib_details[2] == vrf_id:
- vrf_count += 1
+ ip_fib_dump = self.vapi.ip_route_dump(vrf_id, False)
+ vrf_count = len(ip_fib_dump)
+
if vrf_count == 0:
self.logger.info("IPv4 VRF ID %d is not configured" % vrf_id)
return 0
:param int dst_port: destination port number "x"
"""
if src_ip:
- src_ip = binascii.hexlify(socket.inet_aton(src_ip))
+ src_ip = binascii.hexlify(socket.inet_aton(src_ip)).decode('ascii')
if dst_ip:
- dst_ip = binascii.hexlify(socket.inet_aton(dst_ip))
+ dst_ip = binascii.hexlify(socket.inet_aton(dst_ip)).decode('ascii')
return ('{!s:0>20}{!s:0>12}{!s:0>8}{!s:0>4}{!s:0>4}'.format(
hex(proto)[2:], src_ip, dst_ip, hex(src_port)[2:],
class TestClassifierIP(TestClassifier):
""" Classifier IP Test Case """
+ @classmethod
+ def setUpClass(cls):
+ super(TestClassifierIP, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestClassifierIP, cls).tearDownClass()
+
def test_iacl_src_ip(self):
""" Source IP iACL test
class TestClassifierUDP(TestClassifier):
""" Classifier UDP proto Test Case """
+ @classmethod
+ def setUpClass(cls):
+ super(TestClassifierUDP, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestClassifierUDP, cls).tearDownClass()
+
def test_iacl_proto_udp(self):
""" UDP protocol iACL test
class TestClassifierTCP(TestClassifier):
""" Classifier TCP proto Test Case """
+ @classmethod
+ def setUpClass(cls):
+ super(TestClassifierTCP, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestClassifierTCP, cls).tearDownClass()
+
def test_iacl_proto_tcp(self):
""" TCP protocol iACL test
class TestClassifierIPOut(TestClassifier):
""" Classifier output IP Test Case """
+ @classmethod
+ def setUpClass(cls):
+ super(TestClassifierIPOut, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestClassifierIPOut, cls).tearDownClass()
+
def test_acl_ip_out(self):
""" Output IP ACL test
class TestClassifierMAC(TestClassifier):
""" Classifier MAC Test Case """
+ @classmethod
+ def setUpClass(cls):
+ super(TestClassifierMAC, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestClassifierMAC, cls).tearDownClass()
+
def test_acl_mac(self):
""" MAC ACL test
class TestClassifierPBR(TestClassifier):
""" Classifier PBR Test Case """
+ @classmethod
+ def setUpClass(cls):
+ super(TestClassifierPBR, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestClassifierPBR, cls).tearDownClass()
+
def test_acl_pbr(self):
""" IP PBR test
self.build_ip_match(src_ip=self.pg0.remote_ip4),
pbr_option, self.pbr_vrfid)
self.assertTrue(self.verify_vrf(self.pbr_vrfid))
- self.config_pbr_fib_entry(self.pg3)
+ r = VppIpRoute(self, self.pg3.local_ip4, 24,
+ [VppRoutePath(self.pg3.remote_ip4,
+ INVALID_INDEX)],
+ table_id=self.pbr_vrfid)
+ r.add_vpp_config()
+
self.input_acl_set_interface(self.pg0, self.acl_tbl_idx.get(key))
self.pg_enable_capture(self.pg_interfaces)
self.pg2.assert_nothing_captured(remark="packets forwarded")
# remove the classify session and the route
- self.config_pbr_fib_entry(self.pg3, is_add=0)
+ r.remove_vpp_config()
self.create_classify_session(
self.acl_tbl_idx.get(key),
self.build_ip_match(src_ip=self.pg0.remote_ip4),