import socket
import unittest
+import scapy.compat
from scapy.contrib.mpls import MPLS
from scapy.layers.inet import IP, UDP, TCP, ICMP, icmptypes, icmpcodes
from scapy.layers.l2 import Ether, Dot1Q, ARP
class TestIPv4(VppTestCase):
""" IPv4 Test Case """
+ @classmethod
+ def setUpClass(cls):
+ super(TestIPv4, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPv4, cls).tearDownClass()
+
def setUp(self):
"""
Perform test setup before test case.
def tearDown(self):
"""Run standard test teardown and log ``show ip arp``."""
super(TestIPv4, self).tearDown()
- if not self.vpp_dead:
- self.logger.info(self.vapi.cli("show ip arp"))
- # info(self.vapi.cli("show ip fib")) # many entries
+
+ def show_commands_at_teardown(self):
+ self.logger.info(self.vapi.cli("show ip arp"))
+ # info(self.vapi.cli("show ip fib")) # many entries
def config_fib_entries(self, count):
"""For each interface add to the FIB table *count* routes to
for i in self.interfaces:
next_hop_address = i.local_ip4n
for j in range(count / n_int):
- self.vapi.ip_add_del_route(
- dest_addr, dest_addr_len, next_hop_address)
+ self.vapi.ip_add_del_route(dst_address=dest_addr,
+ dst_address_length=dest_addr_len,
+ next_hop_address=next_hop_address)
counter += 1
if counter / count * 100 > percent:
self.logger.info("Configure %d FIB entries .. %d%% done" %
try:
ip = packet[IP]
udp = packet[UDP]
- payload_info = self.payload_to_info(str(packet[Raw]))
+ payload_info = self.payload_to_info(packet[Raw])
packet_index = payload_info.index
self.assertEqual(payload_info.dst, dst_sw_if_index)
self.logger.debug(
class TestICMPEcho(VppTestCase):
""" ICMP Echo Test Case """
+ @classmethod
+ def setUpClass(cls):
+ super(TestICMPEcho, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestICMPEcho, cls).tearDownClass()
+
def setUp(self):
super(TestICMPEcho, self).setUp()
dest_addr_len = 32
n_next_hop_addr = socket.inet_pton(socket.AF_INET, next_hop_addr)
for _ in range(count):
- n_dest_addr = '{:08x}'.format(dest_addr).decode('hex')
- self.vapi.ip_add_del_route(n_dest_addr, dest_addr_len,
- n_next_hop_addr)
+ n_dest_addr = binascii.unhexlify('{:08x}'.format(dest_addr))
+ self.vapi.ip_add_del_route(dst_address=n_dest_addr,
+ dst_address_length=dest_addr_len,
+ next_hop_address=n_next_hop_addr)
added_ips.append(socket.inet_ntoa(n_dest_addr))
dest_addr += 1
return added_ips
dest_addr_len = 32
n_next_hop_addr = socket.inet_pton(socket.AF_INET, next_hop_addr)
for _ in range(count):
- n_dest_addr = '{:08x}'.format(dest_addr).decode('hex')
- self.vapi.ip_add_del_route(n_dest_addr, dest_addr_len,
- n_next_hop_addr, is_add=0)
+ n_dest_addr = binascii.unhexlify('{:08x}'.format(dest_addr))
+ self.vapi.ip_add_del_route(dst_address=n_dest_addr,
+ dst_address_length=dest_addr_len,
+ next_hop_address=n_next_hop_addr,
+ is_add=0)
removed_ips.append(socket.inet_ntoa(n_dest_addr))
dest_addr += 1
return removed_ips
def _find_ip_match(self, find_in, pkt):
for p in find_in:
- if self.payload_to_info(str(p[Raw])) == \
- self.payload_to_info(str(pkt[Raw])):
+ if self.payload_to_info(p[Raw]) == \
+ self.payload_to_info(pkt[Raw]):
if p[IP].src != pkt[IP].src:
break
if p[IP].dst != pkt[IP].dst:
for ip in ips:
self.assertTrue(_ip_in_route_dump(ip, fib_dump),
- 'IP {} is not in fib dump.'.format(ip))
+ 'IP {!s} is not in fib dump.'.format(ip))
def verify_not_in_route_dump(self, fib_dump, ips):
for ip in ips:
self.assertFalse(_ip_in_route_dump(ip, fib_dump),
- 'IP {} is in fib dump.'.format(ip))
+ 'IP {!s} is in fib dump.'.format(ip))
@classmethod
def setUpClass(cls):
super(TestIPv4FibCrud, cls).tearDownClass()
raise
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPv4FibCrud, cls).tearDownClass()
+
def setUp(self):
super(TestIPv4FibCrud, self).setUp()
self.reset_packet_infos()
+ self.configured_routes = []
+ self.deleted_routes = []
+
def test_1_add_routes(self):
""" Add 1k routes
- delete 10 routes check with traffic script.
"""
+ # config 1M FIB entries
+ self.configured_routes.extend(self.config_fib_many_to_one(
+ "10.0.0.0", self.pg0.remote_ip4, 100))
self.deleted_routes.extend(self.unconfig_fib_many_to_one(
"10.0.0.10", self.pg0.remote_ip4, 10))
for x in self.deleted_routes:
- re-add 5 routes check with traffic script.
- add 100 routes check with traffic script.
"""
+ # config 1M FIB entries
+ self.configured_routes.extend(self.config_fib_many_to_one(
+ "10.0.0.0", self.pg0.remote_ip4, 100))
+ self.deleted_routes.extend(self.unconfig_fib_many_to_one(
+ "10.0.0.10", self.pg0.remote_ip4, 10))
+ for x in self.deleted_routes:
+ self.configured_routes.remove(x)
+
tmp = self.config_fib_many_to_one(
"10.0.0.10", self.pg0.remote_ip4, 5)
self.configured_routes.extend(tmp)
class TestIPNull(VppTestCase):
""" IPv4 routes via NULL """
+ @classmethod
+ def setUpClass(cls):
+ super(TestIPNull, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPNull, cls).tearDownClass()
+
def setUp(self):
super(TestIPNull, self).setUp()
class TestIPDisabled(VppTestCase):
""" IPv4 disabled """
+ @classmethod
+ def setUpClass(cls):
+ super(TestIPDisabled, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPDisabled, cls).tearDownClass()
+
def setUp(self):
super(TestIPDisabled, self).setUp()
class TestIPSubNets(VppTestCase):
""" IPv4 Subnets """
+ @classmethod
+ def setUpClass(cls):
+ super(TestIPSubNets, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPSubNets, cls).tearDownClass()
+
def setUp(self):
super(TestIPSubNets, self).setUp()
# create a 2 pg interfaces
self.create_pg_interfaces(range(2))
- # pg0 we will use to experiemnt
+ # pg0 we will use to experiment
self.pg0.admin_up()
# pg1 is setup normally
#
ip_addr_n = socket.inet_pton(socket.AF_INET, "10.10.10.10")
- self.vapi.sw_interface_add_del_address(self.pg0.sw_if_index,
- ip_addr_n,
- 16)
+ self.vapi.sw_interface_add_del_address(
+ sw_if_index=self.pg0.sw_if_index, address=ip_addr_n,
+ address_length=16)
pn = (Ether(src=self.pg1.remote_mac,
dst=self.pg1.local_mac) /
self.send_and_assert_no_replies(self.pg1, pb, "IP Broadcast address")
# remove the sub-net and we are forwarding via the cover again
- self.vapi.sw_interface_add_del_address(self.pg0.sw_if_index,
- ip_addr_n,
- 16,
- is_add=0)
+ self.vapi.sw_interface_add_del_address(
+ sw_if_index=self.pg0.sw_if_index, address=ip_addr_n,
+ address_length=16, is_add=0)
self.pg1.add_stream(pn)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
#
ip_addr_n = socket.inet_pton(socket.AF_INET, "10.10.10.10")
- self.vapi.sw_interface_add_del_address(self.pg0.sw_if_index,
- ip_addr_n,
- 31)
+ self.vapi.sw_interface_add_del_address(
+ sw_if_index=self.pg0.sw_if_index, address=ip_addr_n,
+ address_length=31)
pn = (Ether(src=self.pg1.remote_mac,
dst=self.pg1.local_mac) /
rx[ARP]
# remove the sub-net and we are forwarding via the cover again
- self.vapi.sw_interface_add_del_address(self.pg0.sw_if_index,
- ip_addr_n,
- 31,
- is_add=0)
+ self.vapi.sw_interface_add_del_address(
+ sw_if_index=self.pg0.sw_if_index, address=ip_addr_n,
+ address_length=31, is_add=0)
self.pg1.add_stream(pn)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
class TestIPLoadBalance(VppTestCase):
""" IPv4 Load-Balancing """
+ @classmethod
+ def setUpClass(cls):
+ super(TestIPLoadBalance, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPLoadBalance, cls).tearDownClass()
+
def setUp(self):
super(TestIPLoadBalance, self).setUp()
input.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
+ rxs = []
for oo in outputs:
rx = oo._get_capture(1)
self.assertNotEqual(0, len(rx))
+ for r in rx:
+ rxs.append(r)
+ return rxs
def send_and_expect_one_itf(self, input, pkts, itf):
input.add_stream(pkts)
# src,dst
# We are not going to ensure equal amounts of packets across each link,
# since the hash algorithm is statistical and therefore this can never
- # be guaranteed. But wuth 64 different packets we do expect some
+ # be guaranteed. But with 64 different packets we do expect some
# balancing. So instead just ensure there is traffic on each link.
#
self.send_and_expect_load_balancing(self.pg0, port_ip_pkts,
# - now only the stream with differing source address will
# load-balance
#
- self.vapi.set_ip_flow_hash(0, src=1, dst=1, sport=0, dport=0)
+ self.vapi.set_ip_flow_hash(vrf_id=0, src=1, dst=1, sport=0, dport=0)
self.send_and_expect_load_balancing(self.pg0, src_ip_pkts,
[self.pg1, self.pg2])
#
# change the flow hash config back to defaults
#
- self.vapi.set_ip_flow_hash(0, src=1, dst=1, sport=1, dport=1)
+ self.vapi.set_ip_flow_hash(vrf_id=0, src=1, dst=1, sport=1, dport=1)
#
# Recursive prefixes
[self.pg1, self.pg2,
self.pg3, self.pg4])
+ #
+ # bring down pg1 expect LB to adjust to use only those that are pu
+ #
+ self.pg1.link_down()
+
+ rx = self.send_and_expect_load_balancing(self.pg0, src_pkts,
+ [self.pg2, self.pg3,
+ self.pg4])
+ self.assertEqual(len(src_pkts), len(rx))
+
+ #
+ # bring down pg2 expect LB to adjust to use only those that are pu
+ #
+ self.pg2.link_down()
+
+ rx = self.send_and_expect_load_balancing(self.pg0, src_pkts,
+ [self.pg3, self.pg4])
+ self.assertEqual(len(src_pkts), len(rx))
+
+ #
+ # bring the links back up - expect LB over all again
+ #
+ self.pg1.link_up()
+ self.pg2.link_up()
+
+ rx = self.send_and_expect_load_balancing(self.pg0, src_pkts,
+ [self.pg1, self.pg2,
+ self.pg3, self.pg4])
+ self.assertEqual(len(src_pkts), len(rx))
+
+ #
+ # The same link-up/down but this time admin state
+ #
+ self.pg1.admin_down()
+ self.pg2.admin_down()
+ rx = self.send_and_expect_load_balancing(self.pg0, src_pkts,
+ [self.pg3, self.pg4])
+ self.assertEqual(len(src_pkts), len(rx))
+ self.pg1.admin_up()
+ self.pg2.admin_up()
+ self.pg1.resolve_arp()
+ self.pg2.resolve_arp()
+ rx = self.send_and_expect_load_balancing(self.pg0, src_pkts,
+ [self.pg1, self.pg2,
+ self.pg3, self.pg4])
+ self.assertEqual(len(src_pkts), len(rx))
+
#
# Recursive prefixes
# - testing that 2 stages of load-balancing, no choices
route_1_1_1_2.add_vpp_config()
#
- # inject the packet on pg0 - expect load-balancing across all 4 paths
+ # inject the packet on pg0 - rx only on via routes output interface
#
self.vapi.cli("clear trace")
self.send_and_expect_one_itf(self.pg0, port_pkts, self.pg3)
+ #
+ # Add a LB route in the presence of a down link - expect no
+ # packets over the down link
+ #
+ self.pg3.link_down()
+
+ route_10_0_0_3 = VppIpRoute(self, "10.0.0.3", 32,
+ [VppRoutePath(self.pg3.remote_ip4,
+ self.pg3.sw_if_index),
+ VppRoutePath(self.pg4.remote_ip4,
+ self.pg4.sw_if_index)])
+ route_10_0_0_3.add_vpp_config()
+
+ port_pkts = []
+ for ii in range(257):
+ port_pkts.append(Ether(src=self.pg0.remote_mac,
+ dst=self.pg0.local_mac) /
+ IP(dst="10.0.0.3", src="20.0.0.2") /
+ UDP(sport=1234, dport=1234 + ii) /
+ Raw('\xa5' * 100))
+
+ self.send_and_expect_one_itf(self.pg0, port_pkts, self.pg4)
+
+ # bring the link back up
+ self.pg3.link_up()
+
+ rx = self.send_and_expect_load_balancing(self.pg0, port_pkts,
+ [self.pg3, self.pg4])
+ self.assertEqual(len(src_pkts), len(rx))
+
class TestIPVlan0(VppTestCase):
""" IPv4 VLAN-0 """
+ @classmethod
+ def setUpClass(cls):
+ super(TestIPVlan0, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPVlan0, cls).tearDownClass()
+
def setUp(self):
super(TestIPVlan0, self).setUp()
class TestIPPunt(VppTestCase):
""" IPv4 Punt Police/Redirect """
+ @classmethod
+ def setUpClass(cls):
+ super(TestIPPunt, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPPunt, cls).tearDownClass()
+
def setUp(self):
super(TestIPPunt, self).setUp()
#
# add a policer
#
- policer = self.vapi.policer_add_del("ip4-punt", 400, 0, 10, 0,
+ policer = self.vapi.policer_add_del(b"ip4-punt", 400, 0, 10, 0,
rate_type=1)
self.vapi.ip_punt_police(policer.policer_index)
self.pg_start()
#
- # the number of packet recieved should be greater than 0,
+ # the number of packet received should be greater than 0,
# but not equal to the number sent, since some were policed
#
rx = self.pg1._get_capture(1)
self.assertLess(len(rx), len(pkts))
#
- # remove the poilcer. back to full rx
+ # remove the policer. back to full rx
#
self.vapi.ip_punt_police(policer.policer_index, is_add=0)
- self.vapi.policer_add_del("ip4-punt", 400, 0, 10, 0,
+ self.vapi.policer_add_del(b"ip4-punt", 400, 0, 10, 0,
rate_type=1, is_add=0)
self.send_and_expect(self.pg0, pkts, self.pg1)
class TestIPDeag(VppTestCase):
""" IPv4 Deaggregate Routes """
+ @classmethod
+ def setUpClass(cls):
+ super(TestIPDeag, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPDeag, cls).tearDownClass()
+
def setUp(self):
super(TestIPDeag, self).setUp()
class TestIPInput(VppTestCase):
""" IPv4 Input Exceptions """
+ @classmethod
+ def setUpClass(cls):
+ super(TestIPInput, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPInput, cls).tearDownClass()
+
def setUp(self):
super(TestIPInput, self).setUp()
class TestIPDirectedBroadcast(VppTestCase):
""" IPv4 Directed Broadcast """
+ @classmethod
+ def setUpClass(cls):
+ super(TestIPDirectedBroadcast, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPDirectedBroadcast, cls).tearDownClass()
+
def setUp(self):
super(TestIPDirectedBroadcast, self).setUp()
class TestIPLPM(VppTestCase):
""" IPv4 longest Prefix Match """
+ @classmethod
+ def setUpClass(cls):
+ super(TestIPLPM, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPLPM, cls).tearDownClass()
+
def setUp(self):
super(TestIPLPM, self).setUp()
i.config_ip4()
i.resolve_arp()
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPv4Frag, cls).tearDownClass()
+
def test_frag_large_packets(self):
""" Fragmentation of large packets """