X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_abf.py;h=221a793fed37f653b3ddfe8b313720f45d65130f;hb=097fa66b9;hp=69d561bc19c999a19a83918bc458db410a18e8a1;hpb=f726f539185609cb4f3aee7f34906e8b538cd33e;p=vpp.git diff --git a/test/test_abf.py b/test/test_abf.py index 69d561bc19c..221a793fed3 100644 --- a/test/test_abf.py +++ b/test/test_abf.py @@ -5,7 +5,8 @@ import unittest from framework import VppTestCase, VppTestRunner from vpp_ip import DpoProto -from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsLabel, VppIpTable +from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsLabel, \ + VppIpTable, FibPathProto from scapy.packet import Raw from scapy.layers.l2 import Ether @@ -14,6 +15,8 @@ from scapy.layers.inet6 import IPv6 from vpp_object import VppObject +NUM_PKTS = 67 + def find_abf_policy(test, id): policies = test.vapi.abf_policy_dump() @@ -43,30 +46,9 @@ class VppAbfPolicy(VppObject): self.policy_id = policy_id self.acl = acl self.paths = paths - - def encode_paths(self): - br_paths = [] - for p in self.paths: - lstack = [] - for l in p.nh_labels: - if type(l) == VppMplsLabel: - lstack.append(l.encode()) - else: - lstack.append({'label': l, 'ttl': 255}) - n_labels = len(lstack) - while (len(lstack) < 16): - lstack.append({}) - br_paths.append({'next_hop': p.nh_addr, - 'weight': 1, - 'afi': p.proto, - 'sw_if_index': 0xffffffff, - 'preference': 0, - 'table_id': p.nh_table_id, - 'next_hop_id': p.next_hop_id, - 'is_udp_encap': p.is_udp_encap, - 'n_labels': n_labels, - 'label_stack': lstack}) - return br_paths + self.encoded_paths = [] + for path in self.paths: + self.encoded_paths.append(path.encode()) def add_vpp_config(self): self._test.vapi.abf_policy_add_del( @@ -74,7 +56,7 @@ class VppAbfPolicy(VppObject): {'policy_id': self.policy_id, 'acl_index': self.acl.acl_index, 'n_paths': len(self.paths), - 'paths': self.encode_paths()}) + 'paths': self.encoded_paths}) self._test.registry.register(self, self._test.logger) def remove_vpp_config(self): @@ -83,14 +65,11 @@ class VppAbfPolicy(VppObject): {'policy_id': self.policy_id, 'acl_index': self.acl.acl_index, 'n_paths': len(self.paths), - 'paths': self.encode_paths()}) + 'paths': self.encoded_paths}) def query_vpp_config(self): return find_abf_policy(self._test, self.policy_id) - def __str__(self): - return self.object_id() - def object_id(self): return ("abf-policy-%d" % self.policy_id) @@ -131,9 +110,6 @@ class VppAbfAttach(VppObject): self.policy_id, self.sw_if_index) - def __str__(self): - return self.object_id() - def object_id(self): return ("abf-attach-%d-%d" % (self.policy_id, self.sw_if_index)) @@ -141,6 +117,14 @@ class VppAbfAttach(VppObject): class TestAbf(VppTestCase): """ ABF Test Case """ + @classmethod + def setUpClass(cls): + super(TestAbf, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(TestAbf, cls).tearDownClass() + def setUp(self): super(TestAbf, self).setUp() @@ -171,7 +155,7 @@ class TestAbf(VppTestCase): # the application of ACLs to a forwarding path to achieve # ABF # So we construct just a few ACLs to ensure the ABF policies - # are correclty constructed and used. And a few path types + # are correctly constructed and used. And a few path types # to test the API path decoding. # @@ -214,7 +198,7 @@ class TestAbf(VppTestCase): IP(src="1.1.1.1", dst="1.1.1.2") / UDP(sport=1234, dport=1234) / Raw('\xa5' * 100)) - self.send_and_expect(self.pg0, p_1*65, self.pg1) + self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg1) # # Attach a 'better' priority policy to the same interface @@ -226,7 +210,7 @@ class TestAbf(VppTestCase): attach_2 = VppAbfAttach(self, 11, self.pg0.sw_if_index, 40) attach_2.add_vpp_config() - self.send_and_expect(self.pg0, p_1*65, self.pg2) + self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg2) # # Attach a policy with priority in the middle @@ -238,13 +222,13 @@ class TestAbf(VppTestCase): attach_3 = VppAbfAttach(self, 12, self.pg0.sw_if_index, 45) attach_3.add_vpp_config() - self.send_and_expect(self.pg0, p_1*65, self.pg2) + self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg2) # # remove the best priority # attach_2.remove_vpp_config() - self.send_and_expect(self.pg0, p_1*65, self.pg3) + self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg3) # # Attach one of the same policies to Pg1 @@ -257,14 +241,14 @@ class TestAbf(VppTestCase): IP(src="1.1.1.1", dst="1.1.1.2") / UDP(sport=1234, dport=1234) / Raw('\xa5' * 100)) - self.send_and_expect(self.pg1, p_2 * 65, self.pg3) + self.send_and_expect(self.pg1, p_2 * NUM_PKTS, self.pg3) # # detach the policy from PG1, now expect traffic to be dropped # attach_4.remove_vpp_config() - self.send_and_assert_no_replies(self.pg1, p_2 * 65, "Detached") + self.send_and_assert_no_replies(self.pg1, p_2 * NUM_PKTS, "Detached") # # Swap to route via a next-hop in the non-default table @@ -285,7 +269,7 @@ class TestAbf(VppTestCase): attach_5 = VppAbfAttach(self, 13, self.pg0.sw_if_index, 30) attach_5.add_vpp_config() - self.send_and_expect(self.pg0, p_1*65, self.pg4) + self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg4) self.pg4.unconfig_ip4() self.pg4.set_table_ip4(0) @@ -320,8 +304,7 @@ class TestAbf(VppTestCase): # abf_1 = VppAbfPolicy(self, 10, acl_1, [VppRoutePath("3001::1", - 0xffffffff, - proto=DpoProto.DPO_PROTO_IP6)]) + 0xffffffff)]) abf_1.add_vpp_config() attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, @@ -341,22 +324,20 @@ class TestAbf(VppTestCase): # packets are dropped because there is no route to the policy's # next hop # - self.send_and_assert_no_replies(self.pg1, p * 65, "no route") + self.send_and_assert_no_replies(self.pg1, p * NUM_PKTS, "no route") # # add a route resolving the next-hop # route = VppIpRoute(self, "3001::1", 32, [VppRoutePath(self.pg1.remote_ip6, - self.pg1.sw_if_index, - proto=DpoProto.DPO_PROTO_IP6)], - is_ip6=1) + self.pg1.sw_if_index)]) route.add_vpp_config() # # now expect packets forwarded. # - self.send_and_expect(self.pg0, p * 65, self.pg1) + self.send_and_expect(self.pg0, p * NUM_PKTS, self.pg1) if __name__ == '__main__':