tests: Add support for getting corefile patterns on FreeBSD
[vpp.git] / test / test_policer_input.py
1 #!/usr/bin/env python3
2 # Copyright (c) 2021 Graphiant, Inc.
3
4 import unittest
5 from scapy.layers.inet import IP, UDP
6 from scapy.layers.l2 import Ether
7 from scapy.packet import Raw
8 from framework import VppTestCase
9 from asfframework import VppTestRunner
10 from vpp_papi import VppEnum
11 from vpp_policer import VppPolicer, PolicerAction, Dir
12
13 NUM_PKTS = 67
14
15
16 class TestPolicerInput(VppTestCase):
17     """Policer on an interface"""
18
19     vpp_worker_count = 2
20
21     def setUp(self):
22         super(TestPolicerInput, self).setUp()
23
24         self.create_pg_interfaces(range(2))
25         for i in self.pg_interfaces:
26             i.admin_up()
27             i.config_ip4()
28             i.resolve_arp()
29
30         self.pkt = (
31             Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
32             / IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4)
33             / UDP(sport=1234, dport=1234)
34             / Raw(b"\xa5" * 100)
35         )
36
37     def tearDown(self):
38         for i in self.pg_interfaces:
39             i.unconfig_ip4()
40             i.admin_down()
41         super(TestPolicerInput, self).tearDown()
42
43     def policer_interface_test(self, dir: Dir):
44         pkts = self.pkt * NUM_PKTS
45
46         action_tx = PolicerAction(
47             VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
48         )
49         policer = VppPolicer(
50             self,
51             "pol1",
52             80,
53             0,
54             1000,
55             0,
56             conform_action=action_tx,
57             exceed_action=action_tx,
58             violate_action=action_tx,
59         )
60         policer.add_vpp_config()
61
62         sw_if_index = self.pg0.sw_if_index if dir == Dir.RX else self.pg1.sw_if_index
63
64         # Start policing on pg0
65         policer.apply_vpp_config(sw_if_index, dir, True)
66
67         rx = self.send_and_expect(self.pg0, pkts, self.pg1, worker=0)
68         stats = policer.get_stats()
69
70         # Single rate, 2 colour policer - expect conform, violate but no exceed
71         self.assertGreater(stats["conform_packets"], 0)
72         self.assertEqual(stats["exceed_packets"], 0)
73         self.assertGreater(stats["violate_packets"], 0)
74
75         # Stop policing on pg0
76         policer.apply_vpp_config(sw_if_index, dir, False)
77
78         rx = self.send_and_expect(self.pg0, pkts, self.pg1, worker=0)
79
80         statsnew = policer.get_stats()
81
82         # No new packets counted
83         self.assertEqual(stats, statsnew)
84
85         policer.remove_vpp_config()
86
87     def test_policer_input(self):
88         """Input Policing"""
89         self.policer_interface_test(Dir.RX)
90
91     def test_policer_output(self):
92         """Output Policing"""
93         self.policer_interface_test(Dir.TX)
94
95     def test_policer_reset(self):
96         """Policer reset bucket"""
97         pkts = self.pkt * NUM_PKTS
98
99         action_tx = PolicerAction(
100             VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
101         )
102         policer = VppPolicer(
103             self,
104             "pol1",
105             1,
106             0,
107             10000,
108             0,
109             conform_action=action_tx,
110             exceed_action=action_tx,
111             violate_action=action_tx,
112         )
113         policer.add_vpp_config()
114
115         # Start policing on pg0
116         policer.apply_vpp_config(self.pg0.sw_if_index, Dir.RX, True)
117
118         self.send_and_expect(self.pg0, pkts, self.pg1, worker=0)
119         details = policer.get_details()
120
121         self.assertGreater(details.current_limit, details.current_bucket)
122
123         self.send_and_expect(self.pg0, pkts, self.pg1, worker=0)
124         self.vapi.policer_reset(policer_index=policer.policer_index)
125         details = policer.get_details()
126
127         self.assertEqual(details.current_limit, details.current_bucket)
128
129         policer.apply_vpp_config(self.pg0.sw_if_index, Dir.RX, False)
130
131         policer.remove_vpp_config()
132
133     def test_policer_update(self):
134         """Policer update"""
135         pkts = self.pkt * NUM_PKTS
136
137         action_tx = PolicerAction(
138             VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
139         )
140         policer = VppPolicer(
141             self,
142             "pol1",
143             1,
144             0,
145             10000,
146             0,
147             conform_action=action_tx,
148             exceed_action=action_tx,
149             violate_action=action_tx,
150         )
151         policer.add_vpp_config()
152
153         # Start policing on pg0
154         policer.apply_vpp_config(self.pg0.sw_if_index, Dir.RX, True)
155
156         self.send_and_expect(self.pg0, pkts, self.pg1, worker=0)
157         details_before = policer.get_details()
158
159         self.assertGreater(details_before.current_limit, details_before.current_bucket)
160
161         policer.cir = 8000
162         policer.commited_burst = 100000
163         policer.update()
164
165         details_after = policer.get_details()
166
167         self.assertGreater(details_after.cir, details_before.cir)
168         self.assertGreater(details_after.cb, details_before.cb)
169
170         policer.apply_vpp_config(self.pg0.sw_if_index, Dir.RX, False)
171
172         policer.remove_vpp_config()
173
174     def policer_handoff_test(self, dir: Dir):
175         pkts = self.pkt * NUM_PKTS
176
177         action_tx = PolicerAction(
178             VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT, 0
179         )
180         policer = VppPolicer(
181             self,
182             "pol2",
183             80,
184             0,
185             1000,
186             0,
187             conform_action=action_tx,
188             exceed_action=action_tx,
189             violate_action=action_tx,
190         )
191         policer.add_vpp_config()
192
193         sw_if_index = self.pg0.sw_if_index if dir == Dir.RX else self.pg1.sw_if_index
194
195         # Bind the policer to worker 1
196         policer.bind_vpp_config(1, True)
197
198         # Start policing on pg0
199         policer.apply_vpp_config(sw_if_index, dir, True)
200
201         for worker in [0, 1]:
202             self.send_and_expect(self.pg0, pkts, self.pg1, worker=worker)
203             self.logger.debug(self.vapi.cli("show trace max 100"))
204
205         stats = policer.get_stats()
206         stats0 = policer.get_stats(worker=0)
207         stats1 = policer.get_stats(worker=1)
208
209         # Worker 1, should have done all the policing
210         self.assertEqual(stats, stats1)
211
212         # Worker 0, should have handed everything off
213         self.assertEqual(stats0["conform_packets"], 0)
214         self.assertEqual(stats0["exceed_packets"], 0)
215         self.assertEqual(stats0["violate_packets"], 0)
216
217         # Unbind the policer from worker 1 and repeat
218         policer.bind_vpp_config(1, False)
219         for worker in [0, 1]:
220             self.send_and_expect(self.pg0, pkts, self.pg1, worker=worker)
221             self.logger.debug(self.vapi.cli("show trace max 100"))
222
223         # The policer should auto-bind to worker 0 when packets arrive
224         stats = policer.get_stats()
225
226         # The 2 workers should now have policed the same amount
227         stats = policer.get_stats()
228         stats0 = policer.get_stats(worker=0)
229         stats1 = policer.get_stats(worker=1)
230
231         self.assertGreater(stats0["conform_packets"], 0)
232         self.assertEqual(stats0["exceed_packets"], 0)
233         self.assertGreater(stats0["violate_packets"], 0)
234
235         self.assertGreater(stats1["conform_packets"], 0)
236         self.assertEqual(stats1["exceed_packets"], 0)
237         self.assertGreater(stats1["violate_packets"], 0)
238
239         self.assertEqual(
240             stats0["conform_packets"] + stats1["conform_packets"],
241             stats["conform_packets"],
242         )
243
244         self.assertEqual(
245             stats0["violate_packets"] + stats1["violate_packets"],
246             stats["violate_packets"],
247         )
248
249         # Stop policing on pg0
250         policer.apply_vpp_config(sw_if_index, dir, False)
251
252         policer.remove_vpp_config()
253
254     def test_policer_handoff_input(self):
255         """Worker thread handoff policer input"""
256         self.policer_handoff_test(Dir.RX)
257
258     def test_policer_handoff_output(self):
259         """Worker thread handoff policer output"""
260         self.policer_handoff_test(Dir.TX)
261
262
263 if __name__ == "__main__":
264     unittest.main(testRunner=VppTestRunner)