2 # Copyright (c) 2021 Graphiant, Inc.
5 from scapy.layers.inet import IP, UDP
6 from scapy.layers.l2 import Ether
7 from scapy.packet import Raw
8 from framework import VppTestCase
9 from asfframework import VppTestRunner
10 from vpp_papi import VppEnum
11 from vpp_policer import VppPolicer, PolicerAction, Dir
16 class TestPolicerInput(VppTestCase):
17 """Policer on an interface"""
22 super(TestPolicerInput, self).setUp()
24 self.create_pg_interfaces(range(2))
25 for i in self.pg_interfaces:
31 Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
32 / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
33 / UDP(sport=1234, dport=1234)
38 for i in self.pg_interfaces:
41 super(TestPolicerInput, self).tearDown()
43 def policer_interface_test(self, dir: Dir):
44 pkts = self.pkt * NUM_PKTS
46 action_tx = PolicerAction(
47 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
56 conform_action=action_tx,
57 exceed_action=action_tx,
58 violate_action=action_tx,
60 policer.add_vpp_config()
62 sw_if_index = self.pg0.sw_if_index if dir == Dir.RX else self.pg1.sw_if_index
64 # Start policing on pg0
65 policer.apply_vpp_config(sw_if_index, dir, True)
67 rx = self.send_and_expect(self.pg0, pkts, self.pg1, worker=0)
68 stats = policer.get_stats()
70 # Single rate, 2 colour policer - expect conform, violate but no exceed
71 self.assertGreater(stats["conform_packets"], 0)
72 self.assertEqual(stats["exceed_packets"], 0)
73 self.assertGreater(stats["violate_packets"], 0)
75 # Stop policing on pg0
76 policer.apply_vpp_config(sw_if_index, dir, False)
78 rx = self.send_and_expect(self.pg0, pkts, self.pg1, worker=0)
80 statsnew = policer.get_stats()
82 # No new packets counted
83 self.assertEqual(stats, statsnew)
85 policer.remove_vpp_config()
87 def test_policer_input(self):
89 self.policer_interface_test(Dir.RX)
91 def test_policer_output(self):
93 self.policer_interface_test(Dir.TX)
95 def test_policer_reset(self):
96 """Policer reset bucket"""
97 pkts = self.pkt * NUM_PKTS
99 action_tx = PolicerAction(
100 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
102 policer = VppPolicer(
109 conform_action=action_tx,
110 exceed_action=action_tx,
111 violate_action=action_tx,
113 policer.add_vpp_config()
115 # Start policing on pg0
116 policer.apply_vpp_config(self.pg0.sw_if_index, Dir.RX, True)
118 self.send_and_expect(self.pg0, pkts, self.pg1, worker=0)
119 details = policer.get_details()
121 self.assertGreater(details.current_limit, details.current_bucket)
123 self.send_and_expect(self.pg0, pkts, self.pg1, worker=0)
124 self.vapi.policer_reset(policer_index=policer.policer_index)
125 details = policer.get_details()
127 self.assertEqual(details.current_limit, details.current_bucket)
129 policer.apply_vpp_config(self.pg0.sw_if_index, Dir.RX, False)
131 policer.remove_vpp_config()
133 def test_policer_update(self):
135 pkts = self.pkt * NUM_PKTS
137 action_tx = PolicerAction(
138 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
140 policer = VppPolicer(
147 conform_action=action_tx,
148 exceed_action=action_tx,
149 violate_action=action_tx,
151 policer.add_vpp_config()
153 # Start policing on pg0
154 policer.apply_vpp_config(self.pg0.sw_if_index, Dir.RX, True)
156 self.send_and_expect(self.pg0, pkts, self.pg1, worker=0)
157 details_before = policer.get_details()
159 self.assertGreater(details_before.current_limit, details_before.current_bucket)
162 policer.commited_burst = 100000
165 details_after = policer.get_details()
167 self.assertGreater(details_after.cir, details_before.cir)
168 self.assertGreater(details_after.cb, details_before.cb)
170 policer.apply_vpp_config(self.pg0.sw_if_index, Dir.RX, False)
172 policer.remove_vpp_config()
174 def policer_handoff_test(self, dir: Dir):
175 pkts = self.pkt * NUM_PKTS
177 action_tx = PolicerAction(
178 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
180 policer = VppPolicer(
187 conform_action=action_tx,
188 exceed_action=action_tx,
189 violate_action=action_tx,
191 policer.add_vpp_config()
193 sw_if_index = self.pg0.sw_if_index if dir == Dir.RX else self.pg1.sw_if_index
195 # Bind the policer to worker 1
196 policer.bind_vpp_config(1, True)
198 # Start policing on pg0
199 policer.apply_vpp_config(sw_if_index, dir, True)
201 for worker in [0, 1]:
202 self.send_and_expect(self.pg0, pkts, self.pg1, worker=worker)
203 self.logger.debug(self.vapi.cli("show trace max 100"))
205 stats = policer.get_stats()
206 stats0 = policer.get_stats(worker=0)
207 stats1 = policer.get_stats(worker=1)
209 # Worker 1, should have done all the policing
210 self.assertEqual(stats, stats1)
212 # Worker 0, should have handed everything off
213 self.assertEqual(stats0["conform_packets"], 0)
214 self.assertEqual(stats0["exceed_packets"], 0)
215 self.assertEqual(stats0["violate_packets"], 0)
217 # Unbind the policer from worker 1 and repeat
218 policer.bind_vpp_config(1, False)
219 for worker in [0, 1]:
220 self.send_and_expect(self.pg0, pkts, self.pg1, worker=worker)
221 self.logger.debug(self.vapi.cli("show trace max 100"))
223 # The policer should auto-bind to worker 0 when packets arrive
224 stats = policer.get_stats()
226 # The 2 workers should now have policed the same amount
227 stats = policer.get_stats()
228 stats0 = policer.get_stats(worker=0)
229 stats1 = policer.get_stats(worker=1)
231 self.assertGreater(stats0["conform_packets"], 0)
232 self.assertEqual(stats0["exceed_packets"], 0)
233 self.assertGreater(stats0["violate_packets"], 0)
235 self.assertGreater(stats1["conform_packets"], 0)
236 self.assertEqual(stats1["exceed_packets"], 0)
237 self.assertGreater(stats1["violate_packets"], 0)
240 stats0["conform_packets"] + stats1["conform_packets"],
241 stats["conform_packets"],
245 stats0["violate_packets"] + stats1["violate_packets"],
246 stats["violate_packets"],
249 # Stop policing on pg0
250 policer.apply_vpp_config(sw_if_index, dir, False)
252 policer.remove_vpp_config()
254 def test_policer_handoff_input(self):
255 """Worker thread handoff policer input"""
256 self.policer_handoff_test(Dir.RX)
258 def test_policer_handoff_output(self):
259 """Worker thread handoff policer output"""
260 self.policer_handoff_test(Dir.TX)
263 if __name__ == "__main__":
264 unittest.main(testRunner=VppTestRunner)