VppMRoutePath, MRouteItfFlags, MRouteEntryFlags, VppMplsIpBind, \
VppMplsTable, VppIpTable
from vpp_sub_interface import VppSubInterface, VppDot1QSubint, VppDot1ADSubint
+from vpp_papi import VppEnum
+
+NUM_PKTS = 67
class TestIPv4(VppTestCase):
""" IPv4 Test Case """
+ @classmethod
+ def setUpClass(cls):
+ super(TestIPv4, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPv4, cls).tearDownClass()
+
def setUp(self):
"""
Perform test setup before test case.
def tearDown(self):
"""Run standard test teardown and log ``show ip arp``."""
super(TestIPv4, self).tearDown()
- if not self.vpp_dead:
- self.logger.info(self.vapi.cli("show ip arp"))
- # info(self.vapi.cli("show ip fib")) # many entries
+
+ def show_commands_at_teardown(self):
+ self.logger.info(self.vapi.cli("show ip arp"))
+ # info(self.vapi.cli("show ip fib")) # many entries
def config_fib_entries(self, count):
"""For each interface add to the FIB table *count* routes to
class TestICMPEcho(VppTestCase):
""" ICMP Echo Test Case """
+ @classmethod
+ def setUpClass(cls):
+ super(TestICMPEcho, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestICMPEcho, cls).tearDownClass()
+
def setUp(self):
super(TestICMPEcho, self).setUp()
super(TestIPv4FibCrud, cls).tearDownClass()
raise
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPv4FibCrud, cls).tearDownClass()
+
def setUp(self):
super(TestIPv4FibCrud, self).setUp()
self.reset_packet_infos()
class TestIPNull(VppTestCase):
""" IPv4 routes via NULL """
+ @classmethod
+ def setUpClass(cls):
+ super(TestIPNull, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPNull, cls).tearDownClass()
+
def setUp(self):
super(TestIPNull, self).setUp()
self.pg1.sw_if_index)])
r1.add_vpp_config()
- rx = self.send_and_expect(self.pg0, p * 65, self.pg1)
+ rx = self.send_and_expect(self.pg0, p * NUM_PKTS, self.pg1)
#
# insert a more specific as a drop
r2 = VppIpRoute(self, "1.1.1.1", 32, [], is_drop=1)
r2.add_vpp_config()
- self.send_and_assert_no_replies(self.pg0, p * 65, "Drop Route")
+ self.send_and_assert_no_replies(self.pg0, p * NUM_PKTS, "Drop Route")
r2.remove_vpp_config()
- rx = self.send_and_expect(self.pg0, p * 65, self.pg1)
+ rx = self.send_and_expect(self.pg0, p * NUM_PKTS, self.pg1)
class TestIPDisabled(VppTestCase):
""" IPv4 disabled """
+ @classmethod
+ def setUpClass(cls):
+ super(TestIPDisabled, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPDisabled, cls).tearDownClass()
+
def setUp(self):
super(TestIPDisabled, self).setUp()
class TestIPSubNets(VppTestCase):
""" IPv4 Subnets """
+ @classmethod
+ def setUpClass(cls):
+ super(TestIPSubNets, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPSubNets, cls).tearDownClass()
+
def setUp(self):
super(TestIPSubNets, self).setUp()
# create a 2 pg interfaces
self.create_pg_interfaces(range(2))
- # pg0 we will use to experiemnt
+ # pg0 we will use to experiment
self.pg0.admin_up()
# pg1 is setup normally
class TestIPLoadBalance(VppTestCase):
""" IPv4 Load-Balancing """
+ @classmethod
+ def setUpClass(cls):
+ super(TestIPLoadBalance, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPLoadBalance, cls).tearDownClass()
+
def setUp(self):
super(TestIPLoadBalance, self).setUp()
input.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
+ rxs = []
for oo in outputs:
rx = oo._get_capture(1)
self.assertNotEqual(0, len(rx))
+ for r in rx:
+ rxs.append(r)
+ return rxs
def send_and_expect_one_itf(self, input, pkts, itf):
input.add_stream(pkts)
src_ip_pkts = []
src_mpls_pkts = []
- for ii in range(65):
+ for ii in range(NUM_PKTS):
port_ip_hdr = (IP(dst="10.0.0.1", src="20.0.0.1") /
UDP(sport=1234, dport=1234 + ii) /
Raw('\xa5' * 100))
# src,dst
# We are not going to ensure equal amounts of packets across each link,
# since the hash algorithm is statistical and therefore this can never
- # be guaranteed. But wuth 64 different packets we do expect some
+ # be guaranteed. But with 64 different packets we do expect some
# balancing. So instead just ensure there is traffic on each link.
#
self.send_and_expect_load_balancing(self.pg0, port_ip_pkts,
[self.pg1, self.pg2,
self.pg3, self.pg4])
+ #
+ # bring down pg1 expect LB to adjust to use only those that are pu
+ #
+ self.pg1.link_down()
+
+ rx = self.send_and_expect_load_balancing(self.pg0, src_pkts,
+ [self.pg2, self.pg3,
+ self.pg4])
+ self.assertEqual(len(src_pkts), len(rx))
+
+ #
+ # bring down pg2 expect LB to adjust to use only those that are pu
+ #
+ self.pg2.link_down()
+
+ rx = self.send_and_expect_load_balancing(self.pg0, src_pkts,
+ [self.pg3, self.pg4])
+ self.assertEqual(len(src_pkts), len(rx))
+
+ #
+ # bring the links back up - expect LB over all again
+ #
+ self.pg1.link_up()
+ self.pg2.link_up()
+
+ rx = self.send_and_expect_load_balancing(self.pg0, src_pkts,
+ [self.pg1, self.pg2,
+ self.pg3, self.pg4])
+ self.assertEqual(len(src_pkts), len(rx))
+
+ #
+ # The same link-up/down but this time admin state
+ #
+ self.pg1.admin_down()
+ self.pg2.admin_down()
+ rx = self.send_and_expect_load_balancing(self.pg0, src_pkts,
+ [self.pg3, self.pg4])
+ self.assertEqual(len(src_pkts), len(rx))
+ self.pg1.admin_up()
+ self.pg2.admin_up()
+ self.pg1.resolve_arp()
+ self.pg2.resolve_arp()
+ rx = self.send_and_expect_load_balancing(self.pg0, src_pkts,
+ [self.pg1, self.pg2,
+ self.pg3, self.pg4])
+ self.assertEqual(len(src_pkts), len(rx))
+
#
# Recursive prefixes
# - testing that 2 stages of load-balancing, no choices
route_1_1_1_2.add_vpp_config()
#
- # inject the packet on pg0 - expect load-balancing across all 4 paths
+ # inject the packet on pg0 - rx only on via routes output interface
#
self.vapi.cli("clear trace")
self.send_and_expect_one_itf(self.pg0, port_pkts, self.pg3)
+ #
+ # Add a LB route in the presence of a down link - expect no
+ # packets over the down link
+ #
+ self.pg3.link_down()
+
+ route_10_0_0_3 = VppIpRoute(self, "10.0.0.3", 32,
+ [VppRoutePath(self.pg3.remote_ip4,
+ self.pg3.sw_if_index),
+ VppRoutePath(self.pg4.remote_ip4,
+ self.pg4.sw_if_index)])
+ route_10_0_0_3.add_vpp_config()
+
+ port_pkts = []
+ for ii in range(257):
+ port_pkts.append(Ether(src=self.pg0.remote_mac,
+ dst=self.pg0.local_mac) /
+ IP(dst="10.0.0.3", src="20.0.0.2") /
+ UDP(sport=1234, dport=1234 + ii) /
+ Raw('\xa5' * 100))
+
+ self.send_and_expect_one_itf(self.pg0, port_pkts, self.pg4)
+
+ # bring the link back up
+ self.pg3.link_up()
+
+ rx = self.send_and_expect_load_balancing(self.pg0, port_pkts,
+ [self.pg3, self.pg4])
+ self.assertEqual(len(src_pkts), len(rx))
+
class TestIPVlan0(VppTestCase):
""" IPv4 VLAN-0 """
+ @classmethod
+ def setUpClass(cls):
+ super(TestIPVlan0, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPVlan0, cls).tearDownClass()
+
def setUp(self):
super(TestIPVlan0, self).setUp()
IP(dst=self.pg1.remote_ip4,
src=self.pg0.remote_ip4) /
UDP(sport=1234, dport=1234) /
- Raw('\xa5' * 100)) * 65
+ Raw('\xa5' * 100)) * NUM_PKTS
#
# Expect that packets sent on VLAN-0 are forwarded on the
class TestIPPunt(VppTestCase):
""" IPv4 Punt Police/Redirect """
+ @classmethod
+ def setUpClass(cls):
+ super(TestIPPunt, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPPunt, cls).tearDownClass()
+
def setUp(self):
super(TestIPPunt, self).setUp()
def test_ip_punt(self):
""" IP punt police and redirect """
+ # use UDP packet that have a port we need to explicitly
+ # register to get punted.
+ pt_l4 = VppEnum.vl_api_punt_type_t.PUNT_API_TYPE_L4
+ af_ip4 = VppEnum.vl_api_address_family_t.ADDRESS_IP4
+ udp_proto = VppEnum.vl_api_ip_proto_t.IP_API_PROTO_UDP
+ punt_udp = {
+ 'type': pt_l4,
+ 'punt': {
+ 'l4': {
+ 'af': af_ip4,
+ 'protocol': udp_proto,
+ 'port': 1234,
+ }
+ }
+ }
+
+ self.vapi.set_punt(is_add=1, punt=punt_udp)
+
p = (Ether(src=self.pg0.remote_mac,
dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
- TCP(sport=1234, dport=1234) /
+ UDP(sport=1234, dport=1234) /
Raw('\xa5' * 100))
pkts = p * 1025
self.pg_start()
#
- # the number of packet recieved should be greater than 0,
+ # the number of packet received should be greater than 0,
# but not equal to the number sent, since some were policed
#
rx = self.pg1._get_capture(1)
self.assertLess(len(rx), len(pkts))
#
- # remove the poilcer. back to full rx
+ # remove the policer. back to full rx
#
self.vapi.ip_punt_police(policer.policer_index, is_add=0)
self.vapi.policer_add_del(b"ip4-punt", 400, 0, 10, 0,
class TestIPDeag(VppTestCase):
""" IPv4 Deaggregate Routes """
+ @classmethod
+ def setUpClass(cls):
+ super(TestIPDeag, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPDeag, cls).tearDownClass()
+
def setUp(self):
super(TestIPDeag, self).setUp()
class TestIPInput(VppTestCase):
""" IPv4 Input Exceptions """
+ @classmethod
+ def setUpClass(cls):
+ super(TestIPInput, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPInput, cls).tearDownClass()
+
def setUp(self):
super(TestIPInput, self).setUp()
UDP(sport=1234, dport=1234) /
Raw('\xa5' * 100))
- rx = self.send_and_expect(self.pg0, p_short * 65, self.pg1)
+ rx = self.send_and_expect(self.pg0, p_short * NUM_PKTS, self.pg1)
#
# Packet too long - this is dropped
UDP(sport=1234, dport=1234) /
Raw('\xa5' * 100))
- rx = self.send_and_assert_no_replies(self.pg0, p_long * 65,
+ rx = self.send_and_assert_no_replies(self.pg0, p_long * NUM_PKTS,
"too long")
#
UDP(sport=1234, dport=1234) /
Raw('\xa5' * 100))
- rx = self.send_and_assert_no_replies(self.pg0, p_chksum * 65,
+ rx = self.send_and_assert_no_replies(self.pg0, p_chksum * NUM_PKTS,
"bad checksum")
#
UDP(sport=1234, dport=1234) /
Raw('\xa5' * 100))
- rx = self.send_and_assert_no_replies(self.pg0, p_ver * 65,
+ rx = self.send_and_assert_no_replies(self.pg0, p_ver * NUM_PKTS,
"funky version")
#
UDP(sport=1234, dport=1234) /
Raw('\xa5' * 100))
- rx = self.send_and_assert_no_replies(self.pg0, p_frag * 65,
+ rx = self.send_and_assert_no_replies(self.pg0, p_frag * NUM_PKTS,
"frag offset")
#
UDP(sport=1234, dport=1234) /
Raw('\xa5' * 100))
- rx = self.send_and_expect(self.pg0, p_ttl * 65, self.pg0)
+ rx = self.send_and_expect(self.pg0, p_ttl * NUM_PKTS, self.pg0)
rx = rx[0]
icmp = rx[ICMP]
self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [1500, 0, 0, 0])
- rx = self.send_and_expect(self.pg0, p_mtu * 65, self.pg0)
+ rx = self.send_and_expect(self.pg0, p_mtu * NUM_PKTS, self.pg0)
rx = rx[0]
icmp = rx[ICMP]
self.assertEqual(icmp.dst, self.pg1.remote_ip4)
self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [2500, 0, 0, 0])
- rx = self.send_and_expect(self.pg0, p_mtu * 65, self.pg1)
+ rx = self.send_and_expect(self.pg0, p_mtu * NUM_PKTS, self.pg1)
# Reset MTU for subsequent tests
self.vapi.sw_interface_set_mtu(self.pg1.sw_if_index, [9000, 0, 0, 0])
class TestIPDirectedBroadcast(VppTestCase):
""" IPv4 Directed Broadcast """
+ @classmethod
+ def setUpClass(cls):
+ super(TestIPDirectedBroadcast, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPDirectedBroadcast, cls).tearDownClass()
+
def setUp(self):
super(TestIPDirectedBroadcast, self).setUp()
#
# test packet is L2 broadcast
#
- rx = self.send_and_expect(self.pg1, p0 * 65, self.pg0)
+ rx = self.send_and_expect(self.pg1, p0 * NUM_PKTS, self.pg0)
self.assertTrue(rx[0][Ether].dst, "ff:ff:ff:ff:ff:ff")
- self.send_and_assert_no_replies(self.pg0, p1 * 65,
+ self.send_and_assert_no_replies(self.pg0, p1 * NUM_PKTS,
"directed broadcast disabled")
#
#
self.vapi.sw_interface_set_ip_directed_broadcast(
self.pg0.sw_if_index, 0)
- self.send_and_assert_no_replies(self.pg1, p0 * 65,
+ self.send_and_assert_no_replies(self.pg1, p0 * NUM_PKTS,
"directed broadcast disabled")
self.vapi.sw_interface_set_ip_directed_broadcast(
self.pg0.sw_if_index, 1)
- rx = self.send_and_expect(self.pg1, p0 * 65, self.pg0)
+ rx = self.send_and_expect(self.pg1, p0 * NUM_PKTS, self.pg0)
self.pg0.unconfig_ip4()
self.pg1.unconfig_ip4()
class TestIPLPM(VppTestCase):
""" IPv4 longest Prefix Match """
+ @classmethod
+ def setUpClass(cls):
+ super(TestIPLPM, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPLPM, cls).tearDownClass()
+
def setUp(self):
super(TestIPLPM, self).setUp()
Raw('\xa5' * 2000))
self.logger.info(self.vapi.cli("sh ip fib mtrie"))
- rx = self.send_and_expect(self.pg0, p_8 * 65, self.pg2)
- rx = self.send_and_expect(self.pg0, p_24 * 65, self.pg1)
+ rx = self.send_and_expect(self.pg0, p_8 * NUM_PKTS, self.pg2)
+ rx = self.send_and_expect(self.pg0, p_24 * NUM_PKTS, self.pg1)
class TestIPv4Frag(VppTestCase):
i.config_ip4()
i.resolve_arp()
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPv4Frag, cls).tearDownClass()
+
def test_frag_large_packets(self):
""" Fragmentation of large packets """