2 # Copyright (c) 2021 Graphiant, Inc.
6 from scapy.layers.inet import IP, UDP
7 from scapy.layers.l2 import Ether
8 from scapy.packet import Raw
9 from framework import VppTestCase, VppTestRunner
10 from vpp_papi import VppEnum
11 from vpp_policer import VppPolicer, PolicerAction, Dir
16 class TestPolicerInput(VppTestCase):
17 """ Policer on an interface """
21 super(TestPolicerInput, self).setUp()
23 self.create_pg_interfaces(range(2))
24 for i in self.pg_interfaces:
29 self.pkt = (Ether(src=self.pg0.remote_mac,
30 dst=self.pg0.local_mac) /
31 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
32 UDP(sport=1234, dport=1234) /
36 for i in self.pg_interfaces:
39 super(TestPolicerInput, self).tearDown()
41 def policer_interface_test(self, dir: Dir):
42 pkts = self.pkt * NUM_PKTS
44 action_tx = PolicerAction(
45 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT,
47 policer = VppPolicer(self, "pol1", 80, 0, 1000, 0,
48 conform_action=action_tx,
49 exceed_action=action_tx,
50 violate_action=action_tx)
51 policer.add_vpp_config()
53 sw_if_index = (self.pg0.sw_if_index
55 else self.pg1.sw_if_index)
57 # Start policing on pg0
58 policer.apply_vpp_config(sw_if_index, dir, True)
60 rx = self.send_and_expect(self.pg0, pkts, self.pg1, worker=0)
61 stats = policer.get_stats()
63 # Single rate, 2 colour policer - expect conform, violate but no exceed
64 self.assertGreater(stats['conform_packets'], 0)
65 self.assertEqual(stats['exceed_packets'], 0)
66 self.assertGreater(stats['violate_packets'], 0)
68 # Stop policing on pg0
69 policer.apply_vpp_config(sw_if_index, dir, False)
71 rx = self.send_and_expect(self.pg0, pkts, self.pg1, worker=0)
73 statsnew = policer.get_stats()
75 # No new packets counted
76 self.assertEqual(stats, statsnew)
78 policer.remove_vpp_config()
80 def test_policer_input(self):
81 """ Input Policing """
82 self.policer_interface_test(Dir.RX)
84 def test_policer_output(self):
85 """ Output Policing """
86 self.policer_interface_test(Dir.TX)
88 def policer_handoff_test(self, dir: Dir):
89 pkts = self.pkt * NUM_PKTS
91 action_tx = PolicerAction(
92 VppEnum.vl_api_sse2_qos_action_type_t.SSE2_QOS_ACTION_API_TRANSMIT,
94 policer = VppPolicer(self, "pol2", 80, 0, 1000, 0,
95 conform_action=action_tx,
96 exceed_action=action_tx,
97 violate_action=action_tx)
98 policer.add_vpp_config()
100 sw_if_index = (self.pg0.sw_if_index
102 else self.pg1.sw_if_index)
104 # Bind the policer to worker 1
105 policer.bind_vpp_config(1, True)
107 # Start policing on pg0
108 policer.apply_vpp_config(sw_if_index, dir, True)
110 for worker in [0, 1]:
111 self.send_and_expect(self.pg0, pkts, self.pg1, worker=worker)
112 self.logger.debug(self.vapi.cli("show trace max 100"))
114 stats = policer.get_stats()
115 stats0 = policer.get_stats(worker=0)
116 stats1 = policer.get_stats(worker=1)
118 # Worker 1, should have done all the policing
119 self.assertEqual(stats, stats1)
121 # Worker 0, should have handed everything off
122 self.assertEqual(stats0['conform_packets'], 0)
123 self.assertEqual(stats0['exceed_packets'], 0)
124 self.assertEqual(stats0['violate_packets'], 0)
126 # Unbind the policer from worker 1 and repeat
127 policer.bind_vpp_config(1, False)
128 for worker in [0, 1]:
129 self.send_and_expect(self.pg0, pkts, self.pg1, worker=worker)
130 self.logger.debug(self.vapi.cli("show trace max 100"))
132 # The policer should auto-bind to worker 0 when packets arrive
133 stats = policer.get_stats()
135 # The 2 workers should now have policed the same amount
136 stats = policer.get_stats()
137 stats0 = policer.get_stats(worker=0)
138 stats1 = policer.get_stats(worker=1)
140 self.assertGreater(stats0['conform_packets'], 0)
141 self.assertEqual(stats0['exceed_packets'], 0)
142 self.assertGreater(stats0['violate_packets'], 0)
144 self.assertGreater(stats1['conform_packets'], 0)
145 self.assertEqual(stats1['exceed_packets'], 0)
146 self.assertGreater(stats1['violate_packets'], 0)
148 self.assertEqual(stats0['conform_packets'] + stats1['conform_packets'],
149 stats['conform_packets'])
151 self.assertEqual(stats0['violate_packets'] + stats1['violate_packets'],
152 stats['violate_packets'])
154 # Stop policing on pg0
155 policer.apply_vpp_config(sw_if_index, dir, False)
157 policer.remove_vpp_config()
159 def test_policer_handoff_input(self):
160 """ Worker thread handoff policer input"""
161 self.policer_handoff_test(Dir.RX)
163 def test_policer_handoff_output(self):
164 """ Worker thread handoff policer output"""
165 self.policer_handoff_test(Dir.TX)
168 if __name__ == '__main__':
169 unittest.main(testRunner=VppTestRunner)