from scapy.packet import Raw
from framework import VppTestCase, VppTestRunner
from vpp_papi import VppEnum
-from vpp_policer import VppPolicer, PolicerAction
+from vpp_policer import VppPolicer, PolicerAction, Dir
NUM_PKTS = 67
class TestPolicerInput(VppTestCase):
- """ Policer on an input interface """
+ """Policer on an interface"""
+
vpp_worker_count = 2
def setUp(self):
i.config_ip4()
i.resolve_arp()
- self.pkt = (Ether(src=self.pg0.remote_mac,
- dst=self.pg0.local_mac) /
- IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- UDP(sport=1234, dport=1234) /
- Raw(b'\xa5' * 100))
+ self.pkt = (
+ Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
+ / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
+ / UDP(sport=1234, dport=1234)
+ / Raw(b"\xa5" * 100)
+ )
def tearDown(self):
for i in self.pg_interfaces:
i.admin_down()
super(TestPolicerInput, self).tearDown()
- def test_policer_input(self):
- """ Input Policing """
+ def policer_interface_test(self, dir: Dir):
pkts = self.pkt * NUM_PKTS
action_tx = PolicerAction(
- VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT,
- 0)
- policer = VppPolicer(self, "pol1", 80, 0, 1000, 0,
- conform_action=action_tx,
- exceed_action=action_tx,
- violate_action=action_tx)
+ VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
+ )
+ policer = VppPolicer(
+ self,
+ "pol1",
+ 80,
+ 0,
+ 1000,
+ 0,
+ conform_action=action_tx,
+ exceed_action=action_tx,
+ violate_action=action_tx,
+ )
policer.add_vpp_config()
+ sw_if_index = self.pg0.sw_if_index if dir == Dir.RX else self.pg1.sw_if_index
+
# Start policing on pg0
- policer.apply_vpp_config(self.pg0.sw_if_index, True)
+ policer.apply_vpp_config(sw_if_index, dir, True)
rx = self.send_and_expect(self.pg0, pkts, self.pg1, worker=0)
stats = policer.get_stats()
# Single rate, 2 colour policer - expect conform, violate but no exceed
- self.assertGreater(stats['conform_packets'], 0)
- self.assertEqual(stats['exceed_packets'], 0)
- self.assertGreater(stats['violate_packets'], 0)
+ self.assertGreater(stats["conform_packets"], 0)
+ self.assertEqual(stats["exceed_packets"], 0)
+ self.assertGreater(stats["violate_packets"], 0)
# Stop policing on pg0
- policer.apply_vpp_config(self.pg0.sw_if_index, False)
+ policer.apply_vpp_config(sw_if_index, dir, False)
rx = self.send_and_expect(self.pg0, pkts, self.pg1, worker=0)
policer.remove_vpp_config()
- def test_policer_handoff(self):
- """ Worker thread handoff """
+ def test_policer_input(self):
+ """Input Policing"""
+ self.policer_interface_test(Dir.RX)
+
+ def test_policer_output(self):
+ """Output Policing"""
+ self.policer_interface_test(Dir.TX)
+
+ def policer_handoff_test(self, dir: Dir):
pkts = self.pkt * NUM_PKTS
action_tx = PolicerAction(
- VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT,
- 0)
- policer = VppPolicer(self, "pol2", 80, 0, 1000, 0,
- conform_action=action_tx,
- exceed_action=action_tx,
- violate_action=action_tx)
+ VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
+ )
+ policer = VppPolicer(
+ self,
+ "pol2",
+ 80,
+ 0,
+ 1000,
+ 0,
+ conform_action=action_tx,
+ exceed_action=action_tx,
+ violate_action=action_tx,
+ )
policer.add_vpp_config()
+ sw_if_index = self.pg0.sw_if_index if dir == Dir.RX else self.pg1.sw_if_index
+
# Bind the policer to worker 1
policer.bind_vpp_config(1, True)
# Start policing on pg0
- policer.apply_vpp_config(self.pg0.sw_if_index, True)
+ policer.apply_vpp_config(sw_if_index, dir, True)
for worker in [0, 1]:
self.send_and_expect(self.pg0, pkts, self.pg1, worker=worker)
self.assertEqual(stats, stats1)
# Worker 0, should have handed everything off
- self.assertEqual(stats0['conform_packets'], 0)
- self.assertEqual(stats0['exceed_packets'], 0)
- self.assertEqual(stats0['violate_packets'], 0)
+ self.assertEqual(stats0["conform_packets"], 0)
+ self.assertEqual(stats0["exceed_packets"], 0)
+ self.assertEqual(stats0["violate_packets"], 0)
# Unbind the policer from worker 1 and repeat
policer.bind_vpp_config(1, False)
stats0 = policer.get_stats(worker=0)
stats1 = policer.get_stats(worker=1)
- self.assertGreater(stats0['conform_packets'], 0)
- self.assertEqual(stats0['exceed_packets'], 0)
- self.assertGreater(stats0['violate_packets'], 0)
+ self.assertGreater(stats0["conform_packets"], 0)
+ self.assertEqual(stats0["exceed_packets"], 0)
+ self.assertGreater(stats0["violate_packets"], 0)
- self.assertGreater(stats1['conform_packets'], 0)
- self.assertEqual(stats1['exceed_packets'], 0)
- self.assertGreater(stats1['violate_packets'], 0)
+ self.assertGreater(stats1["conform_packets"], 0)
+ self.assertEqual(stats1["exceed_packets"], 0)
+ self.assertGreater(stats1["violate_packets"], 0)
- self.assertEqual(stats0['conform_packets'] + stats1['conform_packets'],
- stats['conform_packets'])
+ self.assertEqual(
+ stats0["conform_packets"] + stats1["conform_packets"],
+ stats["conform_packets"],
+ )
- self.assertEqual(stats0['violate_packets'] + stats1['violate_packets'],
- stats['violate_packets'])
+ self.assertEqual(
+ stats0["violate_packets"] + stats1["violate_packets"],
+ stats["violate_packets"],
+ )
# Stop policing on pg0
- policer.apply_vpp_config(self.pg0.sw_if_index, False)
+ policer.apply_vpp_config(sw_if_index, dir, False)
policer.remove_vpp_config()
-if __name__ == '__main__':
+ def test_policer_handoff_input(self):
+ """Worker thread handoff policer input"""
+ self.policer_handoff_test(Dir.RX)
+
+ def test_policer_handoff_output(self):
+ """Worker thread handoff policer output"""
+ self.policer_handoff_test(Dir.TX)
+
+
+if __name__ == "__main__":
unittest.main(testRunner=VppTestRunner)