X-Git-Url: https://gerrit.fd.io/r/gitweb?a=blobdiff_plain;f=test%2Ftest_abf.py;h=856d02a8185d719a5c7e67c16a6d1e9c688f8d2f;hb=6e1eaad216c41ce1cb4af13a2214f4d86e094414;hp=69d561bc19c999a19a83918bc458db410a18e8a1;hpb=f726f539185609cb4f3aee7f34906e8b538cd33e;p=vpp.git diff --git a/test/test_abf.py b/test/test_abf.py index 69d561bc19c..856d02a8185 100644 --- a/test/test_abf.py +++ b/test/test_abf.py @@ -1,19 +1,29 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 from socket import inet_pton, inet_ntop, AF_INET, AF_INET6 import unittest from framework import VppTestCase, VppTestRunner from vpp_ip import DpoProto -from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsLabel, VppIpTable +from vpp_ip_route import ( + VppIpRoute, + VppRoutePath, + VppMplsLabel, + VppIpTable, + FibPathProto, +) +from vpp_acl import AclRule, VppAcl from scapy.packet import Raw from scapy.layers.l2 import Ether from scapy.layers.inet import IP, UDP from scapy.layers.inet6 import IPv6 +from ipaddress import IPv4Network, IPv6Network from vpp_object import VppObject +NUM_PKTS = 67 + def find_abf_policy(test, id): policies = test.vapi.abf_policy_dump() @@ -26,83 +36,53 @@ def find_abf_policy(test, id): def find_abf_itf_attach(test, id, sw_if_index): attachs = test.vapi.abf_itf_attach_dump() for a in attachs: - if id == a.attach.policy_id and \ - sw_if_index == a.attach.sw_if_index: + if id == a.attach.policy_id and sw_if_index == a.attach.sw_if_index: return True return False class VppAbfPolicy(VppObject): - - def __init__(self, - test, - policy_id, - acl, - paths): + def __init__(self, test, policy_id, acl, paths): self._test = test self.policy_id = policy_id self.acl = acl self.paths = paths - - def encode_paths(self): - br_paths = [] - for p in self.paths: - lstack = [] - for l in p.nh_labels: - if type(l) == VppMplsLabel: - lstack.append(l.encode()) - else: - lstack.append({'label': l, 'ttl': 255}) - n_labels = len(lstack) - while (len(lstack) < 16): - lstack.append({}) - br_paths.append({'next_hop': p.nh_addr, - 'weight': 1, - 'afi': p.proto, - 'sw_if_index': 0xffffffff, - 'preference': 0, - 'table_id': p.nh_table_id, - 'next_hop_id': p.next_hop_id, - 'is_udp_encap': p.is_udp_encap, - 'n_labels': n_labels, - 'label_stack': lstack}) - return br_paths + self.encoded_paths = [] + for path in self.paths: + self.encoded_paths.append(path.encode()) def add_vpp_config(self): self._test.vapi.abf_policy_add_del( 1, - {'policy_id': self.policy_id, - 'acl_index': self.acl.acl_index, - 'n_paths': len(self.paths), - 'paths': self.encode_paths()}) + { + "policy_id": self.policy_id, + "acl_index": self.acl.acl_index, + "n_paths": len(self.paths), + "paths": self.encoded_paths, + }, + ) self._test.registry.register(self, self._test.logger) def remove_vpp_config(self): self._test.vapi.abf_policy_add_del( 0, - {'policy_id': self.policy_id, - 'acl_index': self.acl.acl_index, - 'n_paths': len(self.paths), - 'paths': self.encode_paths()}) + { + "policy_id": self.policy_id, + "acl_index": self.acl.acl_index, + "n_paths": len(self.paths), + "paths": self.encoded_paths, + }, + ) def query_vpp_config(self): return find_abf_policy(self._test, self.policy_id) - def __str__(self): - return self.object_id() - def object_id(self): - return ("abf-policy-%d" % self.policy_id) + return "abf-policy-%d" % self.policy_id class VppAbfAttach(VppObject): - - def __init__(self, - test, - policy_id, - sw_if_index, - priority, - is_ipv6=0): + def __init__(self, test, policy_id, sw_if_index, priority, is_ipv6=0): self._test = test self.policy_id = policy_id self.sw_if_index = sw_if_index @@ -112,34 +92,43 @@ class VppAbfAttach(VppObject): def add_vpp_config(self): self._test.vapi.abf_itf_attach_add_del( 1, - {'policy_id': self.policy_id, - 'sw_if_index': self.sw_if_index, - 'priority': self.priority, - 'is_ipv6': self.is_ipv6}) + { + "policy_id": self.policy_id, + "sw_if_index": self.sw_if_index, + "priority": self.priority, + "is_ipv6": self.is_ipv6, + }, + ) self._test.registry.register(self, self._test.logger) def remove_vpp_config(self): self._test.vapi.abf_itf_attach_add_del( 0, - {'policy_id': self.policy_id, - 'sw_if_index': self.sw_if_index, - 'priority': self.priority, - 'is_ipv6': self.is_ipv6}) + { + "policy_id": self.policy_id, + "sw_if_index": self.sw_if_index, + "priority": self.priority, + "is_ipv6": self.is_ipv6, + }, + ) def query_vpp_config(self): - return find_abf_itf_attach(self._test, - self.policy_id, - self.sw_if_index) - - def __str__(self): - return self.object_id() + return find_abf_itf_attach(self._test, self.policy_id, self.sw_if_index) def object_id(self): - return ("abf-attach-%d-%d" % (self.policy_id, self.sw_if_index)) + return "abf-attach-%d-%d" % (self.policy_id, self.sw_if_index) class TestAbf(VppTestCase): - """ ABF Test Case """ + """ABF Test Case""" + + @classmethod + def setUpClass(cls): + super(TestAbf, cls).setUpClass() + + @classmethod + def tearDownClass(cls): + super(TestAbf, cls).tearDownClass() def setUp(self): super(TestAbf, self).setUp() @@ -157,13 +146,11 @@ class TestAbf(VppTestCase): for i in self.pg_interfaces: i.unconfig_ip4() i.unconfig_ip6() - i.ip6_disable() i.admin_down() super(TestAbf, self).tearDown() def test_abf4(self): - """ IPv4 ACL Based Forwarding - """ + """IPv4 ACL Based Forwarding""" # # We are not testing the various matching capabilities @@ -171,32 +158,29 @@ class TestAbf(VppTestCase): # the application of ACLs to a forwarding path to achieve # ABF # So we construct just a few ACLs to ensure the ABF policies - # are correclty constructed and used. And a few path types + # are correctly constructed and used. And a few path types # to test the API path decoding. # # # Rule 1 # - rule_1 = ({'is_permit': 1, - 'is_ipv6': 0, - 'proto': 17, - 'srcport_or_icmptype_first': 1234, - 'srcport_or_icmptype_last': 1234, - 'src_ip_prefix_len': 32, - 'src_ip_addr': inet_pton(AF_INET, "1.1.1.1"), - 'dstport_or_icmpcode_first': 1234, - 'dstport_or_icmpcode_last': 1234, - 'dst_ip_prefix_len': 32, - 'dst_ip_addr': inet_pton(AF_INET, "1.1.1.2")}) - acl_1 = self.vapi.acl_add_replace(acl_index=4294967295, r=[rule_1]) + rule_1 = AclRule( + is_permit=1, + proto=17, + ports=1234, + src_prefix=IPv4Network("1.1.1.1/32"), + dst_prefix=IPv4Network("1.1.1.2/32"), + ) + acl_1 = VppAcl(self, rules=[rule_1]) + acl_1.add_vpp_config() # # ABF policy for ACL 1 - path via interface 1 # - abf_1 = VppAbfPolicy(self, 10, acl_1, - [VppRoutePath(self.pg1.remote_ip4, - self.pg1.sw_if_index)]) + abf_1 = VppAbfPolicy( + self, 10, acl_1, [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index)] + ) abf_1.add_vpp_config() # @@ -209,42 +193,43 @@ class TestAbf(VppTestCase): # fire in packet matching the ACL src,dst. If it's forwarded # then the ABF was successful, since default routing will drop it # - p_1 = (Ether(src=self.pg0.remote_mac, - dst=self.pg0.local_mac) / - IP(src="1.1.1.1", dst="1.1.1.2") / - UDP(sport=1234, dport=1234) / - Raw('\xa5' * 100)) - self.send_and_expect(self.pg0, p_1*65, self.pg1) + p_1 = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IP(src="1.1.1.1", dst="1.1.1.2") + / UDP(sport=1234, dport=1234) + / Raw(b"\xa5" * 100) + ) + self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg1) # # Attach a 'better' priority policy to the same interface # - abf_2 = VppAbfPolicy(self, 11, acl_1, - [VppRoutePath(self.pg2.remote_ip4, - self.pg2.sw_if_index)]) + abf_2 = VppAbfPolicy( + self, 11, acl_1, [VppRoutePath(self.pg2.remote_ip4, self.pg2.sw_if_index)] + ) abf_2.add_vpp_config() attach_2 = VppAbfAttach(self, 11, self.pg0.sw_if_index, 40) attach_2.add_vpp_config() - self.send_and_expect(self.pg0, p_1*65, self.pg2) + self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg2) # # Attach a policy with priority in the middle # - abf_3 = VppAbfPolicy(self, 12, acl_1, - [VppRoutePath(self.pg3.remote_ip4, - self.pg3.sw_if_index)]) + abf_3 = VppAbfPolicy( + self, 12, acl_1, [VppRoutePath(self.pg3.remote_ip4, self.pg3.sw_if_index)] + ) abf_3.add_vpp_config() attach_3 = VppAbfAttach(self, 12, self.pg0.sw_if_index, 45) attach_3.add_vpp_config() - self.send_and_expect(self.pg0, p_1*65, self.pg2) + self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg2) # # remove the best priority # attach_2.remove_vpp_config() - self.send_and_expect(self.pg0, p_1*65, self.pg3) + self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg3) # # Attach one of the same policies to Pg1 @@ -252,19 +237,20 @@ class TestAbf(VppTestCase): attach_4 = VppAbfAttach(self, 12, self.pg1.sw_if_index, 45) attach_4.add_vpp_config() - p_2 = (Ether(src=self.pg1.remote_mac, - dst=self.pg1.local_mac) / - IP(src="1.1.1.1", dst="1.1.1.2") / - UDP(sport=1234, dport=1234) / - Raw('\xa5' * 100)) - self.send_and_expect(self.pg1, p_2 * 65, self.pg3) + p_2 = ( + Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) + / IP(src="1.1.1.1", dst="1.1.1.2") + / UDP(sport=1234, dport=1234) + / Raw(b"\xa5" * 100) + ) + self.send_and_expect(self.pg1, p_2 * NUM_PKTS, self.pg3) # # detach the policy from PG1, now expect traffic to be dropped # attach_4.remove_vpp_config() - self.send_and_assert_no_replies(self.pg1, p_2 * 65, "Detached") + self.send_and_assert_no_replies(self.pg1, p_2 * NUM_PKTS, "Detached") # # Swap to route via a next-hop in the non-default table @@ -277,22 +263,27 @@ class TestAbf(VppTestCase): self.pg4.config_ip4() self.pg4.resolve_arp() - abf_13 = VppAbfPolicy(self, 13, acl_1, - [VppRoutePath(self.pg4.remote_ip4, - 0xffffffff, - nh_table_id=table_20.table_id)]) + abf_13 = VppAbfPolicy( + self, + 13, + acl_1, + [ + VppRoutePath( + self.pg4.remote_ip4, 0xFFFFFFFF, nh_table_id=table_20.table_id + ) + ], + ) abf_13.add_vpp_config() attach_5 = VppAbfAttach(self, 13, self.pg0.sw_if_index, 30) attach_5.add_vpp_config() - self.send_and_expect(self.pg0, p_1*65, self.pg4) + self.send_and_expect(self.pg0, p_1 * NUM_PKTS, self.pg4) self.pg4.unconfig_ip4() self.pg4.set_table_ip4(0) def test_abf6(self): - """ IPv6 ACL Based Forwarding - """ + """IPv6 ACL Based Forwarding""" # # Simple test for matching IPv6 packets @@ -301,63 +292,57 @@ class TestAbf(VppTestCase): # # Rule 1 # - rule_1 = ({'is_permit': 1, - 'is_ipv6': 1, - 'proto': 17, - 'srcport_or_icmptype_first': 1234, - 'srcport_or_icmptype_last': 1234, - 'src_ip_prefix_len': 128, - 'src_ip_addr': inet_pton(AF_INET6, "2001::2"), - 'dstport_or_icmpcode_first': 1234, - 'dstport_or_icmpcode_last': 1234, - 'dst_ip_prefix_len': 128, - 'dst_ip_addr': inet_pton(AF_INET6, "2001::1")}) - acl_1 = self.vapi.acl_add_replace(acl_index=4294967295, - r=[rule_1]) + rule_1 = AclRule( + is_permit=1, + proto=17, + ports=1234, + src_prefix=IPv6Network("2001::2/128"), + dst_prefix=IPv6Network("2001::1/128"), + ) + acl_1 = VppAcl(self, rules=[rule_1]) + acl_1.add_vpp_config() # # ABF policy for ACL 1 - path via interface 1 # - abf_1 = VppAbfPolicy(self, 10, acl_1, - [VppRoutePath("3001::1", - 0xffffffff, - proto=DpoProto.DPO_PROTO_IP6)]) + abf_1 = VppAbfPolicy(self, 10, acl_1, [VppRoutePath("3001::1", 0xFFFFFFFF)]) abf_1.add_vpp_config() - attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, - 45, is_ipv6=True) + attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index, 45, is_ipv6=True) attach_1.add_vpp_config() # # a packet matching the rule # - p = (Ether(src=self.pg0.remote_mac, - dst=self.pg0.local_mac) / - IPv6(src="2001::2", dst="2001::1") / - UDP(sport=1234, dport=1234) / - Raw('\xa5' * 100)) + p = ( + Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) + / IPv6(src="2001::2", dst="2001::1") + / UDP(sport=1234, dport=1234) + / Raw(b"\xa5" * 100) + ) # # packets are dropped because there is no route to the policy's # next hop # - self.send_and_assert_no_replies(self.pg1, p * 65, "no route") + self.send_and_assert_no_replies(self.pg1, p * NUM_PKTS, "no route") # # add a route resolving the next-hop # - route = VppIpRoute(self, "3001::1", 32, - [VppRoutePath(self.pg1.remote_ip6, - self.pg1.sw_if_index, - proto=DpoProto.DPO_PROTO_IP6)], - is_ip6=1) + route = VppIpRoute( + self, + "3001::1", + 32, + [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)], + ) route.add_vpp_config() # # now expect packets forwarded. # - self.send_and_expect(self.pg0, p * 65, self.pg1) + self.send_and_expect(self.pg0, p * NUM_PKTS, self.pg1) -if __name__ == '__main__': +if __name__ == "__main__": unittest.main(testRunner=VppTestRunner)