-#!/usr/bin/env python
+#!/usr/bin/env python3
import socket
import unittest
from vpp_ip import DpoProto
from vpp_ip_route import VppIpRoute, VppRoutePath, find_route, VppIpMRoute, \
VppMRoutePath, MRouteItfFlags, MRouteEntryFlags, VppMplsIpBind, \
- VppMplsRoute, VppMplsTable, VppIpTable, FibPathType
+ VppMplsRoute, VppMplsTable, VppIpTable, FibPathType, FibPathProto, \
+ VppIpInterfaceAddress, find_route_in_dump, find_mroute_in_dump
from vpp_neighbor import find_nbr, VppNeighbor
from vpp_pg_interface import is_ipv6_misc
from vpp_sub_interface import VppSubInterface, VppDot1QSubint
-from ipaddress import IPv6Network, IPv4Network, IPv6Address
+from ipaddress import IPv6Network, IPv6Address
AF_INET6 = socket.AF_INET6
:param int packet_size: Required packet size.
:param Scapy pkt: Packet to be modified.
"""
- dst_if_idx = packet_size / 10 % 2
+ dst_if_idx = int(packet_size / 10 % 2)
dst_if = self.flows[src_if][dst_if_idx]
info = self.create_packet_info(src_if, dst_if)
payload = self.info_to_payload(info)
# the options are nested in the scapy packet in way that i cannot
# decipher how to decode. this 1st layer of option always returns
# nested classes, so a direct obj1=obj2 comparison always fails.
- # however, the getlayer(.., 2) does give one instnace.
+ # however, the getlayer(.., 2) does give one instance.
# so we cheat here and construct a new opt instance for comparison
rd = ICMPv6NDOptPrefixInfo(
prefixlen=raos.prefixlen,
rd = rx.getlayer(
ICMPv6NDOptPrefixInfo, ii + 2)
else:
- self.assertEqual(pi_opt, raos)
+ self.assertEqual(pi_opt, raos, 'Expected: %s, received: %s'
+ % (pi_opt.show(dump=True),
+ raos.show(dump=True)))
def send_and_expect_ra(self, intf, pkts, remark, dst_ip=None,
filter_out_fn=is_ipv6_misc,
#
# Configure The RA to announce the links prefix
#
- self.pg0.ip6_ra_prefix(self.pg0.local_ip6,
- self.pg0.local_ip6_prefix_len)
+ self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
+ self.pg0.local_ip6_prefix_len))
#
# RAs should now contain the prefix information option
# Change the prefix info to not off-link
# L-flag is clear
#
- self.pg0.ip6_ra_prefix(self.pg0.local_ip6,
- self.pg0.local_ip6_prefix_len,
+ self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
+ self.pg0.local_ip6_prefix_len),
off_link=1)
opt = ICMPv6NDOptPrefixInfo(
# Change the prefix info to not off-link, no-autoconfig
# L and A flag are clear in the advert
#
- self.pg0.ip6_ra_prefix(self.pg0.local_ip6,
- self.pg0.local_ip6_prefix_len,
+ self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
+ self.pg0.local_ip6_prefix_len),
off_link=1,
no_autoconfig=1)
# Change the flag settings back to the defaults
# L and A flag are set in the advert
#
- self.pg0.ip6_ra_prefix(self.pg0.local_ip6,
- self.pg0.local_ip6_prefix_len)
+ self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
+ self.pg0.local_ip6_prefix_len))
opt = ICMPv6NDOptPrefixInfo(
prefixlen=self.pg0.local_ip6_prefix_len,
# Change the prefix info to not off-link, no-autoconfig
# L and A flag are clear in the advert
#
- self.pg0.ip6_ra_prefix(self.pg0.local_ip6,
- self.pg0.local_ip6_prefix_len,
+ self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
+ self.pg0.local_ip6_prefix_len),
off_link=1,
no_autoconfig=1)
# Use the reset to defaults option to revert to defaults
# L and A flag are clear in the advert
#
- self.pg0.ip6_ra_prefix(self.pg0.local_ip6,
- self.pg0.local_ip6_prefix_len,
+ self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
+ self.pg0.local_ip6_prefix_len),
use_default=1)
opt = ICMPv6NDOptPrefixInfo(
#
# Advertise Another prefix. With no L-flag/A-flag
#
- self.pg0.ip6_ra_prefix(self.pg1.local_ip6,
- self.pg1.local_ip6_prefix_len,
+ self.pg0.ip6_ra_prefix('%s/%s' % (self.pg1.local_ip6,
+ self.pg1.local_ip6_prefix_len),
off_link=1,
no_autoconfig=1)
# Remove the first prefix-info - expect the second is still in the
# advert
#
- self.pg0.ip6_ra_prefix(self.pg0.local_ip6,
- self.pg0.local_ip6_prefix_len,
+ self.pg0.ip6_ra_prefix('%s/%s' % (self.pg0.local_ip6,
+ self.pg0.local_ip6_prefix_len),
is_no=1)
opt = ICMPv6NDOptPrefixInfo(
#
# Remove the second prefix-info - expect no prefix-info in the adverts
#
- self.pg0.ip6_ra_prefix(self.pg1.local_ip6,
- self.pg1.local_ip6_prefix_len,
+ self.pg0.ip6_ra_prefix('%s/%s' % (self.pg1.local_ip6,
+ self.pg1.local_ip6_prefix_len),
is_no=1)
self.pg0.ip6_ra_config(send_unicast=1)
self.pg0.ip6_ra_config(no=1, suppress=1, send_unicast=0)
+class TestIPv6IfAddrRoute(VppTestCase):
+ """ IPv6 Interface Addr Route Test Case """
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestIPv6IfAddrRoute, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPv6IfAddrRoute, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestIPv6IfAddrRoute, self).setUp()
+
+ # create 1 pg interface
+ self.create_pg_interfaces(range(1))
+
+ for i in self.pg_interfaces:
+ i.admin_up()
+ i.config_ip6()
+ i.resolve_ndp()
+
+ def tearDown(self):
+ super(TestIPv6IfAddrRoute, self).tearDown()
+ for i in self.pg_interfaces:
+ i.unconfig_ip6()
+ i.admin_down()
+
+ def test_ipv6_ifaddrs_same_prefix(self):
+ """ IPv6 Interface Addresses Same Prefix test
+
+ Test scenario:
+
+ - Verify no route in FIB for prefix 2001:10::/64
+ - Configure IPv4 address 2001:10::10/64 on an interface
+ - Verify route in FIB for prefix 2001:10::/64
+ - Configure IPv4 address 2001:10::20/64 on an interface
+ - Delete 2001:10::10/64 from interface
+ - Verify route in FIB for prefix 2001:10::/64
+ - Delete 2001:10::20/64 from interface
+ - Verify no route in FIB for prefix 2001:10::/64
+ """
+
+ addr1 = "2001:10::10"
+ addr2 = "2001:10::20"
+
+ if_addr1 = VppIpInterfaceAddress(self, self.pg0, addr1, 64)
+ if_addr2 = VppIpInterfaceAddress(self, self.pg0, addr2, 64)
+ self.assertFalse(if_addr1.query_vpp_config())
+ self.assertFalse(find_route(self, addr1, 128))
+ self.assertFalse(find_route(self, addr2, 128))
+
+ # configure first address, verify route present
+ if_addr1.add_vpp_config()
+ self.assertTrue(if_addr1.query_vpp_config())
+ self.assertTrue(find_route(self, addr1, 128))
+ self.assertFalse(find_route(self, addr2, 128))
+
+ # configure second address, delete first, verify route not removed
+ if_addr2.add_vpp_config()
+ if_addr1.remove_vpp_config()
+ self.assertFalse(if_addr1.query_vpp_config())
+ self.assertTrue(if_addr2.query_vpp_config())
+ self.assertFalse(find_route(self, addr1, 128))
+ self.assertTrue(find_route(self, addr2, 128))
+
+ # delete second address, verify route removed
+ if_addr2.remove_vpp_config()
+ self.assertFalse(if_addr1.query_vpp_config())
+ self.assertFalse(find_route(self, addr1, 128))
+ self.assertFalse(find_route(self, addr2, 128))
+
+
class TestICMPv6Echo(VppTestCase):
""" ICMPv6 Echo Test Case """
self.pg0.add_stream([packet])
self.pg_start()
- self.sleep(0.1)
+ self.sleep_on_vpp_time(0.1)
fib = self.vapi.ip_route_dump(0, True)
self.pg0.add_stream([packet])
self.pg_start()
- self.sleep(0.1)
+ self.sleep_on_vpp_time(0.1)
# check that default route is deleted
fib = self.vapi.ip_route_dump(0, True)
default_routes = self.get_default_routes(fib)
self.assertEqual(len(default_routes), 0)
- self.sleep(0.1)
+ self.sleep_on_vpp_time(0.1)
# send RA
packet = self.create_ra_packet(self.pg0)
self.pg0.add_stream([packet])
self.pg_start()
- self.sleep(0.1)
+ self.sleep_on_vpp_time(0.1)
# check FIB for new default route
fib = self.vapi.ip_route_dump(0, True)
self.pg0.add_stream([packet])
self.pg_start()
- self.sleep(0.1)
+ self.sleep_on_vpp_time(0.1)
# check that default route still exists
fib = self.vapi.ip_route_dump(0, True)
self.assertEqual(dr['sw_if_index'], self.pg0.sw_if_index)
self.assertEqual(dr['next_hop'], router_address)
- self.sleep(1)
+ self.sleep_on_vpp_time(1)
# check that default route is deleted
fib = self.vapi.ip_route_dump(0, True)
strict=False)
self.assertEqual(prefix, IPv6Network(text_type('1::/20')))
- self.sleep(1)
+ self.sleep_on_vpp_time(1)
# check that SLAAC address is deleted
fib = self.vapi.ip_route_dump(0, True)
IPv6(dst=self.pg0._remote_hosts[2].ip6,
src=self.pg0.remote_ip6) /
inet6.UDP(sport=10000, dport=20000) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
self.pg0.add_stream(t)
self.pg_enable_capture(self.pg_interfaces)
IPv6(dst=self.pg0._remote_hosts[2].ip6,
src=self.pg0._remote_hosts[3].ip6) /
inet6.UDP(sport=10000, dport=20000) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
self.pg2.add_stream(t2)
self.pg_enable_capture(self.pg_interfaces)
dst=self.pg0.local_mac) /
IPv6(src=self.pg0.remote_ip6, dst="2001::1") /
inet6.UDP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
#
# A route via IP NULL that will reply with ICMP unreachables
dst=self.pg1.local_mac) /
IPv6(src="2001::1", dst=self.pg0.remote_ip6) /
inet6.UDP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
pm = (Ether(src=self.pg1.remote_mac,
dst=self.pg1.local_mac) /
IPv6(src="2001::1", dst="ffef::1") /
inet6.UDP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
#
# PG1 does not forward IP traffic
port_ip_hdr = (
IPv6(dst="3000::1", src="3000:1::1") /
inet6.UDP(sport=1234, dport=1234 + ii) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
port_ip_pkts.append((Ether(src=self.pg0.remote_mac,
dst=self.pg0.local_mac) /
port_ip_hdr))
src_ip_hdr = (
IPv6(dst="3000::1", src="3000:1::%d" % ii) /
inet6.UDP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
src_ip_pkts.append((Ether(src=self.pg0.remote_mac,
dst=self.pg0.local_mac) /
src_ip_hdr))
src="4000:1::1") /
inet6.UDP(sport=1234,
dport=1234 + ii) /
- Raw('\xa5' * 100)))
+ Raw(b'\xa5' * 100)))
src_pkts.append((Ether(src=self.pg0.remote_mac,
dst=self.pg0.local_mac) /
IPv6(dst="4000::1",
src="4000:1::%d" % ii) /
inet6.UDP(sport=1234, dport=1234) /
- Raw('\xa5' * 100)))
+ Raw(b'\xa5' * 100)))
route_3000_2 = VppIpRoute(self, "3000::2", 128,
[VppRoutePath(self.pg3.remote_ip6,
src="6000:1::1") /
inet6.UDP(sport=1234,
dport=1234 + ii) /
- Raw('\xa5' * 100)))
+ Raw(b'\xa5' * 100)))
route_5000_2 = VppIpRoute(self, "5000::2", 128,
[VppRoutePath(self.pg3.remote_ip6,
IPv6(src=self.pg0.remote_ip6,
dst=self.pg0.local_ip6) /
inet6.TCP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
pkts = p * 1025
dst=self.pg0.local_mac) /
IPv6(src="5::5", dst="1::1") /
inet6.TCP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
p_src = (Ether(src=self.pg0.remote_mac,
dst=self.pg0.local_mac) /
IPv6(src="2::2", dst="1::2") /
inet6.TCP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
pkts_dst = p_dst * 257
pkts_src = p_src * 257
dst=self.pg0.local_mac) /
IPv6(src="3::4", dst="3::3") /
inet6.TCP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
self.send_and_assert_no_replies(self.pg0, p_l * 257,
"IP lookup loop")
dst=self.pg1.remote_ip6,
hlim=1) /
inet6.UDP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
rx = self.send_and_expect(self.pg0, p_version * NUM_PKTS, self.pg0)
rx = rx[0]
dst=dst or self.pg1.remote_ip6,
version=3) /
l4 /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
self.send_and_assert_no_replies(self.pg0, p_version * NUM_PKTS,
remark=msg or "",
IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) /
IPv6ExtHdrHopByHop() /
inet6.UDP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
+
+class TestIPReplace(VppTestCase):
+ """ IPv6 Table Replace """
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestIPReplace, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPReplace, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestIPReplace, self).setUp()
+
+ self.create_pg_interfaces(range(4))
+
+ table_id = 1
+ self.tables = []
+
+ for i in self.pg_interfaces:
+ i.admin_up()
+ i.config_ip6()
+ i.resolve_arp()
+ i.generate_remote_hosts(2)
+ self.tables.append(VppIpTable(self, table_id,
+ True).add_vpp_config())
+ table_id += 1
+
+ def tearDown(self):
+ super(TestIPReplace, self).tearDown()
+ for i in self.pg_interfaces:
+ i.admin_down()
+ i.unconfig_ip4()
+
+ def test_replace(self):
+ """ IP Table Replace """
+
+ N_ROUTES = 20
+ links = [self.pg0, self.pg1, self.pg2, self.pg3]
+ routes = [[], [], [], []]
+
+ # the sizes of 'empty' tables
+ for t in self.tables:
+ self.assertEqual(len(t.dump()), 2)
+ self.assertEqual(len(t.mdump()), 5)
+
+ # load up the tables with some routes
+ for ii, t in enumerate(self.tables):
+ for jj in range(1, N_ROUTES):
+ uni = VppIpRoute(
+ self, "2001::%d" % jj if jj != 0 else "2001::", 128,
+ [VppRoutePath(links[ii].remote_hosts[0].ip6,
+ links[ii].sw_if_index),
+ VppRoutePath(links[ii].remote_hosts[1].ip6,
+ links[ii].sw_if_index)],
+ table_id=t.table_id).add_vpp_config()
+ multi = VppIpMRoute(
+ self, "::",
+ "ff:2001::%d" % jj, 128,
+ MRouteEntryFlags.MFIB_ENTRY_FLAG_NONE,
+ [VppMRoutePath(self.pg0.sw_if_index,
+ MRouteItfFlags.MFIB_ITF_FLAG_ACCEPT,
+ proto=FibPathProto.FIB_PATH_NH_PROTO_IP6),
+ VppMRoutePath(self.pg1.sw_if_index,
+ MRouteItfFlags.MFIB_ITF_FLAG_FORWARD,
+ proto=FibPathProto.FIB_PATH_NH_PROTO_IP6),
+ VppMRoutePath(self.pg2.sw_if_index,
+ MRouteItfFlags.MFIB_ITF_FLAG_FORWARD,
+ proto=FibPathProto.FIB_PATH_NH_PROTO_IP6),
+ VppMRoutePath(self.pg3.sw_if_index,
+ MRouteItfFlags.MFIB_ITF_FLAG_FORWARD,
+ proto=FibPathProto.FIB_PATH_NH_PROTO_IP6)],
+ table_id=t.table_id).add_vpp_config()
+ routes[ii].append({'uni': uni,
+ 'multi': multi})
+
+ #
+ # replace the tables a few times
+ #
+ for kk in range(3):
+ # replace each table
+ for t in self.tables:
+ t.replace_begin()
+
+ # all the routes are still there
+ for ii, t in enumerate(self.tables):
+ dump = t.dump()
+ mdump = t.mdump()
+ for r in routes[ii]:
+ self.assertTrue(find_route_in_dump(dump, r['uni'], t))
+ self.assertTrue(find_mroute_in_dump(mdump, r['multi'], t))
+
+ # redownload the even numbered routes
+ for ii, t in enumerate(self.tables):
+ for jj in range(0, N_ROUTES, 2):
+ routes[ii][jj]['uni'].add_vpp_config()
+ routes[ii][jj]['multi'].add_vpp_config()
+
+ # signal each table converged
+ for t in self.tables:
+ t.replace_end()
+
+ # we should find the even routes, but not the odd
+ for ii, t in enumerate(self.tables):
+ dump = t.dump()
+ mdump = t.mdump()
+ for jj in range(0, N_ROUTES, 2):
+ self.assertTrue(find_route_in_dump(
+ dump, routes[ii][jj]['uni'], t))
+ self.assertTrue(find_mroute_in_dump(
+ mdump, routes[ii][jj]['multi'], t))
+ for jj in range(1, N_ROUTES - 1, 2):
+ self.assertFalse(find_route_in_dump(
+ dump, routes[ii][jj]['uni'], t))
+ self.assertFalse(find_mroute_in_dump(
+ mdump, routes[ii][jj]['multi'], t))
+
+ # reload all the routes
+ for ii, t in enumerate(self.tables):
+ for r in routes[ii]:
+ r['uni'].add_vpp_config()
+ r['multi'].add_vpp_config()
+
+ # all the routes are still there
+ for ii, t in enumerate(self.tables):
+ dump = t.dump()
+ mdump = t.mdump()
+ for r in routes[ii]:
+ self.assertTrue(find_route_in_dump(dump, r['uni'], t))
+ self.assertTrue(find_mroute_in_dump(mdump, r['multi'], t))
+
+ #
+ # finally flush the tables for good measure
+ #
+ for t in self.tables:
+ t.flush()
+ self.assertEqual(len(t.dump()), 2)
+ self.assertEqual(len(t.mdump()), 5)
+
+
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)