-#!/usr/bin/env python
+#!/usr/bin/env python3
import socket
import unittest
from framework import VppTestCase, VppTestRunner
from util import ppp, ip6_normalize, mk_ll_addr
-from vpp_ip import DpoProto, VppIpAddress
+from vpp_ip import DpoProto
from vpp_ip_route import VppIpRoute, VppRoutePath, find_route, VppIpMRoute, \
VppMRoutePath, MRouteItfFlags, MRouteEntryFlags, VppMplsIpBind, \
- VppMplsRoute, VppMplsTable, VppIpTable, FibPathType, \
- VppIpInterfaceAddress
+ VppMplsRoute, VppMplsTable, VppIpTable, FibPathType, FibPathProto, \
+ VppIpInterfaceAddress, find_route_in_dump, find_mroute_in_dump
from vpp_neighbor import find_nbr, VppNeighbor
from vpp_pg_interface import is_ipv6_misc
from vpp_sub_interface import VppSubInterface, VppDot1QSubint
-from ipaddress import IPv6Network, IPv4Network, IPv6Address
+from ipaddress import IPv6Network, IPv6Address
AF_INET6 = socket.AF_INET6
addr1 = "2001:10::10"
addr2 = "2001:10::20"
- if_addr1 = VppIpInterfaceAddress(self, self.pg0,
- VppIpAddress(addr1), 64)
- if_addr2 = VppIpInterfaceAddress(self, self.pg0,
- VppIpAddress(addr2), 64)
- self.assertFalse(if_addr1.query_vpp_config()) # 2001:10::/64
+ if_addr1 = VppIpInterfaceAddress(self, self.pg0, addr1, 64)
+ if_addr2 = VppIpInterfaceAddress(self, self.pg0, addr2, 64)
+ self.assertFalse(if_addr1.query_vpp_config())
self.assertFalse(find_route(self, addr1, 128))
self.assertFalse(find_route(self, addr2, 128))
# configure first address, verify route present
if_addr1.add_vpp_config()
- self.assertTrue(if_addr1.query_vpp_config()) # 2001:10::/64
+ self.assertTrue(if_addr1.query_vpp_config())
self.assertTrue(find_route(self, addr1, 128))
self.assertFalse(find_route(self, addr2, 128))
# configure second address, delete first, verify route not removed
if_addr2.add_vpp_config()
if_addr1.remove_vpp_config()
- self.assertTrue(if_addr1.query_vpp_config()) # 2001:10::/64
+ self.assertFalse(if_addr1.query_vpp_config())
+ self.assertTrue(if_addr2.query_vpp_config())
self.assertFalse(find_route(self, addr1, 128))
self.assertTrue(find_route(self, addr2, 128))
# delete second address, verify route removed
if_addr2.remove_vpp_config()
- self.assertFalse(if_addr1.query_vpp_config()) # 2001:10::/64
+ self.assertFalse(if_addr1.query_vpp_config())
self.assertFalse(find_route(self, addr1, 128))
self.assertFalse(find_route(self, addr2, 128))
IPv6(dst=self.pg0._remote_hosts[2].ip6,
src=self.pg0.remote_ip6) /
inet6.UDP(sport=10000, dport=20000) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
self.pg0.add_stream(t)
self.pg_enable_capture(self.pg_interfaces)
IPv6(dst=self.pg0._remote_hosts[2].ip6,
src=self.pg0._remote_hosts[3].ip6) /
inet6.UDP(sport=10000, dport=20000) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
self.pg2.add_stream(t2)
self.pg_enable_capture(self.pg_interfaces)
dst=self.pg0.local_mac) /
IPv6(src=self.pg0.remote_ip6, dst="2001::1") /
inet6.UDP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
#
# A route via IP NULL that will reply with ICMP unreachables
dst=self.pg1.local_mac) /
IPv6(src="2001::1", dst=self.pg0.remote_ip6) /
inet6.UDP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
pm = (Ether(src=self.pg1.remote_mac,
dst=self.pg1.local_mac) /
IPv6(src="2001::1", dst="ffef::1") /
inet6.UDP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
#
# PG1 does not forward IP traffic
port_ip_hdr = (
IPv6(dst="3000::1", src="3000:1::1") /
inet6.UDP(sport=1234, dport=1234 + ii) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
port_ip_pkts.append((Ether(src=self.pg0.remote_mac,
dst=self.pg0.local_mac) /
port_ip_hdr))
src_ip_hdr = (
IPv6(dst="3000::1", src="3000:1::%d" % ii) /
inet6.UDP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
src_ip_pkts.append((Ether(src=self.pg0.remote_mac,
dst=self.pg0.local_mac) /
src_ip_hdr))
src="4000:1::1") /
inet6.UDP(sport=1234,
dport=1234 + ii) /
- Raw('\xa5' * 100)))
+ Raw(b'\xa5' * 100)))
src_pkts.append((Ether(src=self.pg0.remote_mac,
dst=self.pg0.local_mac) /
IPv6(dst="4000::1",
src="4000:1::%d" % ii) /
inet6.UDP(sport=1234, dport=1234) /
- Raw('\xa5' * 100)))
+ Raw(b'\xa5' * 100)))
route_3000_2 = VppIpRoute(self, "3000::2", 128,
[VppRoutePath(self.pg3.remote_ip6,
src="6000:1::1") /
inet6.UDP(sport=1234,
dport=1234 + ii) /
- Raw('\xa5' * 100)))
+ Raw(b'\xa5' * 100)))
route_5000_2 = VppIpRoute(self, "5000::2", 128,
[VppRoutePath(self.pg3.remote_ip6,
IPv6(src=self.pg0.remote_ip6,
dst=self.pg0.local_ip6) /
inet6.TCP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
pkts = p * 1025
dst=self.pg0.local_mac) /
IPv6(src="5::5", dst="1::1") /
inet6.TCP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
p_src = (Ether(src=self.pg0.remote_mac,
dst=self.pg0.local_mac) /
IPv6(src="2::2", dst="1::2") /
inet6.TCP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
pkts_dst = p_dst * 257
pkts_src = p_src * 257
dst=self.pg0.local_mac) /
IPv6(src="3::4", dst="3::3") /
inet6.TCP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
self.send_and_assert_no_replies(self.pg0, p_l * 257,
"IP lookup loop")
dst=self.pg1.remote_ip6,
hlim=1) /
inet6.UDP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
rx = self.send_and_expect(self.pg0, p_version * NUM_PKTS, self.pg0)
rx = rx[0]
dst=dst or self.pg1.remote_ip6,
version=3) /
l4 /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
self.send_and_assert_no_replies(self.pg0, p_version * NUM_PKTS,
remark=msg or "",
IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) /
IPv6ExtHdrHopByHop() /
inet6.UDP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ Raw(b'\xa5' * 100))
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
+
+class TestIPReplace(VppTestCase):
+ """ IPv6 Table Replace """
+
+ @classmethod
+ def setUpClass(cls):
+ super(TestIPReplace, cls).setUpClass()
+
+ @classmethod
+ def tearDownClass(cls):
+ super(TestIPReplace, cls).tearDownClass()
+
+ def setUp(self):
+ super(TestIPReplace, self).setUp()
+
+ self.create_pg_interfaces(range(4))
+
+ table_id = 1
+ self.tables = []
+
+ for i in self.pg_interfaces:
+ i.admin_up()
+ i.config_ip6()
+ i.resolve_arp()
+ i.generate_remote_hosts(2)
+ self.tables.append(VppIpTable(self, table_id,
+ True).add_vpp_config())
+ table_id += 1
+
+ def tearDown(self):
+ super(TestIPReplace, self).tearDown()
+ for i in self.pg_interfaces:
+ i.admin_down()
+ i.unconfig_ip4()
+
+ def test_replace(self):
+ """ IP Table Replace """
+
+ N_ROUTES = 20
+ links = [self.pg0, self.pg1, self.pg2, self.pg3]
+ routes = [[], [], [], []]
+
+ # the sizes of 'empty' tables
+ for t in self.tables:
+ self.assertEqual(len(t.dump()), 2)
+ self.assertEqual(len(t.mdump()), 5)
+
+ # load up the tables with some routes
+ for ii, t in enumerate(self.tables):
+ for jj in range(1, N_ROUTES):
+ uni = VppIpRoute(
+ self, "2001::%d" % jj if jj != 0 else "2001::", 128,
+ [VppRoutePath(links[ii].remote_hosts[0].ip6,
+ links[ii].sw_if_index),
+ VppRoutePath(links[ii].remote_hosts[1].ip6,
+ links[ii].sw_if_index)],
+ table_id=t.table_id).add_vpp_config()
+ multi = VppIpMRoute(
+ self, "::",
+ "ff:2001::%d" % jj, 128,
+ MRouteEntryFlags.MFIB_ENTRY_FLAG_NONE,
+ [VppMRoutePath(self.pg0.sw_if_index,
+ MRouteItfFlags.MFIB_ITF_FLAG_ACCEPT,
+ proto=FibPathProto.FIB_PATH_NH_PROTO_IP6),
+ VppMRoutePath(self.pg1.sw_if_index,
+ MRouteItfFlags.MFIB_ITF_FLAG_FORWARD,
+ proto=FibPathProto.FIB_PATH_NH_PROTO_IP6),
+ VppMRoutePath(self.pg2.sw_if_index,
+ MRouteItfFlags.MFIB_ITF_FLAG_FORWARD,
+ proto=FibPathProto.FIB_PATH_NH_PROTO_IP6),
+ VppMRoutePath(self.pg3.sw_if_index,
+ MRouteItfFlags.MFIB_ITF_FLAG_FORWARD,
+ proto=FibPathProto.FIB_PATH_NH_PROTO_IP6)],
+ table_id=t.table_id).add_vpp_config()
+ routes[ii].append({'uni': uni,
+ 'multi': multi})
+
+ #
+ # replace the tables a few times
+ #
+ for kk in range(3):
+ # replace each table
+ for t in self.tables:
+ t.replace_begin()
+
+ # all the routes are still there
+ for ii, t in enumerate(self.tables):
+ dump = t.dump()
+ mdump = t.mdump()
+ for r in routes[ii]:
+ self.assertTrue(find_route_in_dump(dump, r['uni'], t))
+ self.assertTrue(find_mroute_in_dump(mdump, r['multi'], t))
+
+ # redownload the even numbered routes
+ for ii, t in enumerate(self.tables):
+ for jj in range(0, N_ROUTES, 2):
+ routes[ii][jj]['uni'].add_vpp_config()
+ routes[ii][jj]['multi'].add_vpp_config()
+
+ # signal each table converged
+ for t in self.tables:
+ t.replace_end()
+
+ # we should find the even routes, but not the odd
+ for ii, t in enumerate(self.tables):
+ dump = t.dump()
+ mdump = t.mdump()
+ for jj in range(0, N_ROUTES, 2):
+ self.assertTrue(find_route_in_dump(
+ dump, routes[ii][jj]['uni'], t))
+ self.assertTrue(find_mroute_in_dump(
+ mdump, routes[ii][jj]['multi'], t))
+ for jj in range(1, N_ROUTES - 1, 2):
+ self.assertFalse(find_route_in_dump(
+ dump, routes[ii][jj]['uni'], t))
+ self.assertFalse(find_mroute_in_dump(
+ mdump, routes[ii][jj]['multi'], t))
+
+ # reload all the routes
+ for ii, t in enumerate(self.tables):
+ for r in routes[ii]:
+ r['uni'].add_vpp_config()
+ r['multi'].add_vpp_config()
+
+ # all the routes are still there
+ for ii, t in enumerate(self.tables):
+ dump = t.dump()
+ mdump = t.mdump()
+ for r in routes[ii]:
+ self.assertTrue(find_route_in_dump(dump, r['uni'], t))
+ self.assertTrue(find_mroute_in_dump(mdump, r['multi'], t))
+
+ #
+ # finally flush the tables for good measure
+ #
+ for t in self.tables:
+ t.flush()
+ self.assertEqual(len(t.dump()), 2)
+ self.assertEqual(len(t.mdump()), 5)
+
+
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)