from framework import VppTestCase, VppTestRunner
from vpp_ip import DpoProto
-from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsLabel
+from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsLabel, \
+ VppIpTable, FibPathProto
from scapy.packet import Raw
from scapy.layers.l2 import Ether
from vpp_object import VppObject
+NUM_PKTS = 67
+
def find_abf_policy(test, id):
policies = test.vapi.abf_policy_dump()
self.policy_id = policy_id
self.acl = acl
self.paths = paths
-
- def encode_paths(self):
- br_paths = []
- for p in self.paths:
- lstack = []
- for l in p.nh_labels:
- if type(l) == VppMplsLabel:
- lstack.append(l.encode())
- else:
- lstack.append({'label': l, 'ttl': 255})
- n_labels = len(lstack)
- while (len(lstack) < 16):
- lstack.append({})
- br_paths.append({'next_hop': p.nh_addr,
- 'weight': 1,
- 'afi': p.proto,
- 'sw_if_index': 0xffffffff,
- 'preference': 0,
- 'table_id': p.nh_table_id,
- 'next_hop_id': p.next_hop_id,
- 'is_udp_encap': p.is_udp_encap,
- 'n_labels': n_labels,
- 'label_stack': lstack})
- return br_paths
+ self.encoded_paths = []
+ for path in self.paths:
+ self.encoded_paths.append(path.encode())
def add_vpp_config(self):
self._test.vapi.abf_policy_add_del(
{'policy_id': self.policy_id,
'acl_index': self.acl.acl_index,
'n_paths': len(self.paths),
- 'paths': self.encode_paths()})
+ 'paths': self.encoded_paths})
self._test.registry.register(self, self._test.logger)
def remove_vpp_config(self):
{'policy_id': self.policy_id,
'acl_index': self.acl.acl_index,
'n_paths': len(self.paths),
- 'paths': self.encode_paths()})
+ 'paths': self.encoded_paths})
def query_vpp_config(self):
return find_abf_policy(self._test, self.policy_id)
- def __str__(self):
- return self.object_id()
-
def object_id(self):
return ("abf-policy-%d" % self.policy_id)
self.policy_id,
self.sw_if_index)
- def __str__(self):
- return self.object_id()
-
def object_id(self):
return ("abf-attach-%d-%d" % (self.policy_id, self.sw_if_index))
class TestAbf(VppTestCase):
""" ABF Test Case """
+ @classmethod
+ def setUpClass(cls):
+ super(TestAbf, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestAbf, cls).tearDownClass()
+
def setUp(self):
super(TestAbf, self).setUp()
- self.create_pg_interfaces(range(4))
+ self.create_pg_interfaces(range(5))
- for i in self.pg_interfaces:
+ for i in self.pg_interfaces[:4]:
i.admin_up()
i.config_ip4()
i.resolve_arp()
# the application of ACLs to a forwarding path to achieve
# ABF
# So we construct just a few ACLs to ensure the ABF policies
- # are correclty constructed and used. And a few path types
+ # are correctly constructed and used. And a few path types
# to test the API path decoding.
#
IP(src="1.1.1.1", dst="1.1.1.2") /
UDP(sport=1234, dport=1234) /
Raw('\xa5' * 100))
- self.send_and_expect(self.pg0, p_1*65, self.pg1)
+ self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg1)
#
# Attach a 'better' priority policy to the same interface
attach_2 = VppAbfAttach(self, 11, self.pg0.sw_if_index, 40)
attach_2.add_vpp_config()
- self.send_and_expect(self.pg0, p_1*65, self.pg2)
+ self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg2)
#
# Attach a policy with priority in the middle
attach_3 = VppAbfAttach(self, 12, self.pg0.sw_if_index, 45)
attach_3.add_vpp_config()
- self.send_and_expect(self.pg0, p_1*65, self.pg2)
+ self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg2)
#
# remove the best priority
#
attach_2.remove_vpp_config()
- self.send_and_expect(self.pg0, p_1*65, self.pg3)
+ self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg3)
#
# Attach one of the same policies to Pg1
IP(src="1.1.1.1", dst="1.1.1.2") /
UDP(sport=1234, dport=1234) /
Raw('\xa5' * 100))
- self.send_and_expect(self.pg1, p_2 * 65, self.pg3)
+ self.send_and_expect(self.pg1, p_2 * NUM_PKTS, self.pg3)
#
# detach the policy from PG1, now expect traffic to be dropped
#
attach_4.remove_vpp_config()
- self.send_and_assert_no_replies(self.pg1, p_2 * 65, "Detached")
+ self.send_and_assert_no_replies(self.pg1, p_2 * NUM_PKTS, "Detached")
+
+ #
+ # Swap to route via a next-hop in the non-default table
+ #
+ table_20 = VppIpTable(self, 20)
+ table_20.add_vpp_config()
+
+ self.pg4.set_table_ip4(table_20.table_id)
+ self.pg4.admin_up()
+ self.pg4.config_ip4()
+ self.pg4.resolve_arp()
+
+ abf_13 = VppAbfPolicy(self, 13, acl_1,
+ [VppRoutePath(self.pg4.remote_ip4,
+ 0xffffffff,
+ nh_table_id=table_20.table_id)])
+ abf_13.add_vpp_config()
+ attach_5 = VppAbfAttach(self, 13, self.pg0.sw_if_index, 30)
+ attach_5.add_vpp_config()
+
+ self.send_and_expect(self.pg0, p_1*NUM_PKTS, self.pg4)
+
+ self.pg4.unconfig_ip4()
+ self.pg4.set_table_ip4(0)
def test_abf6(self):
""" IPv6 ACL Based Forwarding
#
abf_1 = VppAbfPolicy(self, 10, acl_1,
[VppRoutePath("3001::1",
- 0xffffffff,
- proto=DpoProto.DPO_PROTO_IP6)])
+ 0xffffffff)])
abf_1.add_vpp_config()
attach_1 = VppAbfAttach(self, 10, self.pg0.sw_if_index,
# packets are dropped because there is no route to the policy's
# next hop
#
- self.send_and_assert_no_replies(self.pg1, p * 65, "no route")
+ self.send_and_assert_no_replies(self.pg1, p * NUM_PKTS, "no route")
#
# add a route resolving the next-hop
#
route = VppIpRoute(self, "3001::1", 32,
[VppRoutePath(self.pg1.remote_ip6,
- self.pg1.sw_if_index,
- proto=DpoProto.DPO_PROTO_IP6)],
- is_ip6=1)
+ self.pg1.sw_if_index)])
route.add_vpp_config()
#
# now expect packets forwarded.
#
- self.send_and_expect(self.pg0, p * 65, self.pg1)
+ self.send_and_expect(self.pg0, p * NUM_PKTS, self.pg1)
if __name__ == '__main__':