2 # Copyright (c) 2021 Graphiant, Inc.
6 from scapy.layers.inet import IP, UDP
7 from scapy.layers.l2 import Ether
8 from scapy.packet import Raw
9 from framework import VppTestCase, VppTestRunner
10 from vpp_papi import VppEnum
11 from vpp_policer import VppPolicer, PolicerAction, Dir
16 class TestPolicerInput(VppTestCase):
17 """Policer on an interface"""
22 super(TestPolicerInput, self).setUp()
24 self.create_pg_interfaces(range(2))
25 for i in self.pg_interfaces:
31 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
32 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
33 / UDP(sport=1234, dport=1234)
38 for i in self.pg_interfaces:
41 super(TestPolicerInput, self).tearDown()
43 def policer_interface_test(self, dir: Dir):
44 pkts = self.pkt * NUM_PKTS
46 action_tx = PolicerAction(
47 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
56 conform_action=action_tx,
57 exceed_action=action_tx,
58 violate_action=action_tx,
60 policer.add_vpp_config()
62 sw_if_index = self.pg0.sw_if_index if dir == Dir.RX else self.pg1.sw_if_index
64 # Start policing on pg0
65 policer.apply_vpp_config(sw_if_index, dir, True)
67 rx = self.send_and_expect(self.pg0, pkts, self.pg1, worker=0)
68 stats = policer.get_stats()
70 # Single rate, 2 colour policer - expect conform, violate but no exceed
71 self.assertGreater(stats["conform_packets"], 0)
72 self.assertEqual(stats["exceed_packets"], 0)
73 self.assertGreater(stats["violate_packets"], 0)
75 # Stop policing on pg0
76 policer.apply_vpp_config(sw_if_index, dir, False)
78 rx = self.send_and_expect(self.pg0, pkts, self.pg1, worker=0)
80 statsnew = policer.get_stats()
82 # No new packets counted
83 self.assertEqual(stats, statsnew)
85 policer.remove_vpp_config()
87 def test_policer_input(self):
89 self.policer_interface_test(Dir.RX)
91 def test_policer_output(self):
93 self.policer_interface_test(Dir.TX)
95 def policer_handoff_test(self, dir: Dir):
96 pkts = self.pkt * NUM_PKTS
98 action_tx = PolicerAction(
99 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
101 policer = VppPolicer(
108 conform_action=action_tx,
109 exceed_action=action_tx,
110 violate_action=action_tx,
112 policer.add_vpp_config()
114 sw_if_index = self.pg0.sw_if_index if dir == Dir.RX else self.pg1.sw_if_index
116 # Bind the policer to worker 1
117 policer.bind_vpp_config(1, True)
119 # Start policing on pg0
120 policer.apply_vpp_config(sw_if_index, dir, True)
122 for worker in [0, 1]:
123 self.send_and_expect(self.pg0, pkts, self.pg1, worker=worker)
124 self.logger.debug(self.vapi.cli("show trace max 100"))
126 stats = policer.get_stats()
127 stats0 = policer.get_stats(worker=0)
128 stats1 = policer.get_stats(worker=1)
130 # Worker 1, should have done all the policing
131 self.assertEqual(stats, stats1)
133 # Worker 0, should have handed everything off
134 self.assertEqual(stats0["conform_packets"], 0)
135 self.assertEqual(stats0["exceed_packets"], 0)
136 self.assertEqual(stats0["violate_packets"], 0)
138 # Unbind the policer from worker 1 and repeat
139 policer.bind_vpp_config(1, False)
140 for worker in [0, 1]:
141 self.send_and_expect(self.pg0, pkts, self.pg1, worker=worker)
142 self.logger.debug(self.vapi.cli("show trace max 100"))
144 # The policer should auto-bind to worker 0 when packets arrive
145 stats = policer.get_stats()
147 # The 2 workers should now have policed the same amount
148 stats = policer.get_stats()
149 stats0 = policer.get_stats(worker=0)
150 stats1 = policer.get_stats(worker=1)
152 self.assertGreater(stats0["conform_packets"], 0)
153 self.assertEqual(stats0["exceed_packets"], 0)
154 self.assertGreater(stats0["violate_packets"], 0)
156 self.assertGreater(stats1["conform_packets"], 0)
157 self.assertEqual(stats1["exceed_packets"], 0)
158 self.assertGreater(stats1["violate_packets"], 0)
161 stats0["conform_packets"] + stats1["conform_packets"],
162 stats["conform_packets"],
166 stats0["violate_packets"] + stats1["violate_packets"],
167 stats["violate_packets"],
170 # Stop policing on pg0
171 policer.apply_vpp_config(sw_if_index, dir, False)
173 policer.remove_vpp_config()
175 def test_policer_handoff_input(self):
176 """Worker thread handoff policer input"""
177 self.policer_handoff_test(Dir.RX)
179 def test_policer_handoff_output(self):
180 """Worker thread handoff policer output"""
181 self.policer_handoff_test(Dir.TX)
184 if __name__ == "__main__":
185 unittest.main(testRunner=VppTestRunner)