policer: output interface policer
[vpp.git] / test / test_policer_input.py
1 #!/usr/bin/env python3
2 # Copyright (c) 2021 Graphiant, Inc.
3
4 import unittest
5 import scapy.compat
6 from scapy.layers.inet import IP, UDP
7 from scapy.layers.l2 import Ether
8 from scapy.packet import Raw
9 from framework import VppTestCase, VppTestRunner
10 from vpp_papi import VppEnum
11 from vpp_policer import VppPolicer, PolicerAction, Dir
12
# Number of copies of the test packet sent in each burst
# (the tests build their packet list as ``self.pkt * NUM_PKTS``).
NUM_PKTS = 67
14
15
class TestPolicerInput(VppTestCase):
    """ Policer on an interface """
    # Two workers are requested because the handoff tests send traffic on
    # worker 0 and worker 1 and read per-worker policer statistics.
    vpp_worker_count = 2

    def setUp(self):
        """Create pg0/pg1 with IPv4 configured and build the test packet."""
        super().setUp()

        self.create_pg_interfaces(range(2))
        for i in self.pg_interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()

        # UDP packet injected on pg0 and addressed to the host behind pg1,
        # so every test sends on pg0 and expects the traffic on pg1.
        self.pkt = (Ether(src=self.pg0.remote_mac,
                          dst=self.pg0.local_mac) /
                    IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
                    UDP(sport=1234, dport=1234) /
                    Raw(b'\xa5' * 100))

    def tearDown(self):
        """Remove the IPv4 configuration and bring the interfaces down."""
        for i in self.pg_interfaces:
            i.unconfig_ip4()
            i.admin_down()
        super().tearDown()

    def _add_policer(self, name):
        """Create policer *name* in VPP and return the VppPolicer object.

        Conform, exceed and violate all use the "transmit" action; the
        tests rely on every packet still arriving on pg1 while the policer
        only classifies and counts them.
        """
        action_tx = PolicerAction(
            VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT,
            0)
        # NOTE(review): the positional values are presumably
        # cir=80, eir=0, cb=1000, eb=0 - confirm against VppPolicer.
        policer = VppPolicer(self, name, 80, 0, 1000, 0,
                             conform_action=action_tx,
                             exceed_action=action_tx,
                             violate_action=action_tx)
        policer.add_vpp_config()
        return policer

    def _policed_sw_if_index(self, dir: Dir):
        """Return the interface index to police for direction *dir*.

        Traffic flows pg0 -> pg1, so input policing is applied on pg0 and
        output policing on pg1.
        """
        if dir == Dir.RX:
            return self.pg0.sw_if_index
        return self.pg1.sw_if_index

    def _send_on_each_worker(self, pkts):
        """Send *pkts* pg0 -> pg1 once on worker 0 and once on worker 1."""
        for worker in [0, 1]:
            self.send_and_expect(self.pg0, pkts, self.pg1, worker=worker)
            self.logger.debug(self.vapi.cli("show trace max 100"))

    def policer_interface_test(self, dir: Dir):
        """Check that an interface policer counts only while applied.

        :param dir: Dir.RX to police pg0 input, Dir.TX to police pg1 output.
        """
        pkts = self.pkt * NUM_PKTS

        policer = self._add_policer("pol1")
        sw_if_index = self._policed_sw_if_index(dir)

        # Start policing on the selected interface (pg0 for RX, pg1 for TX)
        policer.apply_vpp_config(sw_if_index, dir, True)

        self.send_and_expect(self.pg0, pkts, self.pg1, worker=0)
        stats = policer.get_stats()

        # Single rate, 2 colour policer - expect conform, violate but no exceed
        self.assertGreater(stats['conform_packets'], 0)
        self.assertEqual(stats['exceed_packets'], 0)
        self.assertGreater(stats['violate_packets'], 0)

        # Stop policing on the selected interface
        policer.apply_vpp_config(sw_if_index, dir, False)

        self.send_and_expect(self.pg0, pkts, self.pg1, worker=0)

        statsnew = policer.get_stats()

        # No new packets counted
        self.assertEqual(stats, statsnew)

        policer.remove_vpp_config()

    def test_policer_input(self):
        """ Input Policing """
        self.policer_interface_test(Dir.RX)

    def test_policer_output(self):
        """ Output Policing """
        self.policer_interface_test(Dir.TX)

    def policer_handoff_test(self, dir: Dir):
        """Check policing when the policer is bound to, then unbound from,
        worker 1 while traffic is sent on both workers.

        :param dir: Dir.RX to police pg0 input, Dir.TX to police pg1 output.
        """
        pkts = self.pkt * NUM_PKTS

        policer = self._add_policer("pol2")
        sw_if_index = self._policed_sw_if_index(dir)

        # Bind the policer to worker 1
        policer.bind_vpp_config(1, True)

        # Start policing on the selected interface (pg0 for RX, pg1 for TX)
        policer.apply_vpp_config(sw_if_index, dir, True)

        self._send_on_each_worker(pkts)

        stats = policer.get_stats()
        stats0 = policer.get_stats(worker=0)
        stats1 = policer.get_stats(worker=1)

        # Worker 1, should have done all the policing
        self.assertEqual(stats, stats1)

        # Worker 0, should have handed everything off
        self.assertEqual(stats0['conform_packets'], 0)
        self.assertEqual(stats0['exceed_packets'], 0)
        self.assertEqual(stats0['violate_packets'], 0)

        # Unbind the policer from worker 1 and repeat
        policer.bind_vpp_config(1, False)
        self._send_on_each_worker(pkts)

        # The policer should auto-bind to worker 0 when packets arrive, so
        # both workers should now have policed traffic: worker 1 in the
        # first round, worker 0 in the second.
        stats = policer.get_stats()
        stats0 = policer.get_stats(worker=0)
        stats1 = policer.get_stats(worker=1)

        self.assertGreater(stats0['conform_packets'], 0)
        self.assertEqual(stats0['exceed_packets'], 0)
        self.assertGreater(stats0['violate_packets'], 0)

        self.assertGreater(stats1['conform_packets'], 0)
        self.assertEqual(stats1['exceed_packets'], 0)
        self.assertGreater(stats1['violate_packets'], 0)

        # The combined statistics must be the sum of the per-worker ones
        self.assertEqual(stats0['conform_packets'] + stats1['conform_packets'],
                         stats['conform_packets'])

        self.assertEqual(stats0['violate_packets'] + stats1['violate_packets'],
                         stats['violate_packets'])

        # Stop policing on the selected interface
        policer.apply_vpp_config(sw_if_index, dir, False)

        policer.remove_vpp_config()

    def test_policer_handoff_input(self):
        """ Worker thread handoff policer input"""
        self.policer_handoff_test(Dir.RX)

    def test_policer_handoff_output(self):
        """ Worker thread handoff policer output"""
        self.policer_handoff_test(Dir.TX)
166
167
if __name__ == "__main__":
    # Executed directly: run this module's tests with the VPP test runner.
    unittest.main(testRunner=VppTestRunner)