from framework import VppTestCase, VppTestRunner
from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \
VppMplsIpBind, VppIpMRoute, VppMRoutePath, \
- MRouteItfFlags, MRouteEntryFlags, DpoProto
+ MRouteItfFlags, MRouteEntryFlags, DpoProto, VppIpTable, VppMplsTable
from vpp_mpls_tunnel_interface import VppMPLSTunnelInterface
from scapy.packet import Raw
# setup both interfaces
# assign them different tables.
table_id = 0
+ self.tables = []
+
+ tbl = VppMplsTable(self, 0)
+ tbl.add_vpp_config()
+ self.tables.append(tbl)
for i in self.pg_interfaces:
i.admin_up()
+
+ if table_id != 0:
+ tbl = VppIpTable(self, table_id)
+ tbl.add_vpp_config()
+ self.tables.append(tbl)
+ tbl = VppIpTable(self, table_id, is_ip6=1)
+ tbl.add_vpp_config()
+ self.tables.append(tbl)
+
i.set_table_ip4(table_id)
i.set_table_ip6(table_id)
i.config_ip4()
table_id += 1
def tearDown(self):
- super(TestMPLS, self).tearDown()
for i in self.pg_interfaces:
i.unconfig_ip4()
i.unconfig_ip6()
i.ip6_disable()
+ i.set_table_ip4(0)
+ i.set_table_ip6(0)
+ i.disable_mpls()
i.admin_down()
+ super(TestMPLS, self).tearDown()
# the default of 64 matches the IP packet TTL default
def create_stream_labelled_ip4(
ping=0,
ip_itf=None,
dst_ip=None,
+ chksum=None,
n=257):
self.reset_packet_infos()
pkts = []
dst=ip_itf.local_ip4) /
ICMP())
+ if chksum:
+ p[IP].chksum = chksum
info.data = p.copy()
pkts.append(p)
return pkts
return pkts
def create_stream_labelled_ip6(self, src_if, mpls_label, mpls_ttl,
- dst_ip=None):
+ dst_ip=None, hlim=64):
if dst_ip is None:
dst_ip = src_if.remote_ip6
self.reset_packet_infos()
payload = self.info_to_payload(info)
p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
MPLS(label=mpls_label, ttl=mpls_ttl) /
- IPv6(src=src_if.remote_ip6, dst=dst_ip) /
+ IPv6(src=src_if.remote_ip6, dst=dst_ip, hlim=hlim) /
UDP(sport=1234, dport=1234) /
Raw(payload))
info.data = p.copy()
rx = self.pg1.get_capture(257)
self.verify_capture_ip4(self.pg1, rx, tx)
+ #
+ # disposed packets have an invalid IPv4 checkusm
+ #
+ tx = self.create_stream_labelled_ip4(self.pg0, [34],
+ dst_ip="232.1.1.1", n=65,
+ chksum=1)
+ self.send_and_assert_no_replies(self.pg0, tx, "Invalid Checksum")
+
#
# set the RPF-ID of the enrtry to not match the input packet's
#
tx = self.create_stream_labelled_ip4(self.pg0, [34],
dst_ip="232.1.1.1")
self.send_and_assert_no_replies(self.pg0, tx, "RPF-ID drop 56")
+ self.logger.error(self.vapi.cli("sh error"))
def test_mcast_ip6_tail(self):
""" MPLS IPv6 Multicast Tail """
rx = self.pg1.get_capture(257)
self.verify_capture_ip6(self.pg1, rx, tx)
+ #
+ # disposed packets have hop-limit = 1
+ #
+ tx = self.create_stream_labelled_ip6(self.pg0, [34], 255,
+ dst_ip="ff01::1", hlim=1)
+ self.send_and_assert_no_replies(self.pg0, tx, "Hop Limt Expired")
+
#
# set the RPF-ID of the enrtry to not match the input packet's
#
# create 2 pg interfaces
self.create_pg_interfaces(range(2))
+ self.tbl = VppMplsTable(self, 0)
+ self.tbl.add_vpp_config()
+
# PG0 is MPLS enalbed
self.pg0.admin_up()
self.pg0.config_ip4()
self.pg1.admin_up()
def tearDown(self):
- super(TestMPLSDisabled, self).tearDown()
for i in self.pg_interfaces:
i.unconfig_ip4()
i.admin_down()
+ self.pg0.disable_mpls()
+ super(TestMPLSDisabled, self).tearDown()
+
def send_and_assert_no_replies(self, intf, pkts, remark):
intf.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
# create 2 pg interfaces
self.create_pg_interfaces(range(4))
+ mpls_tbl = VppMplsTable(self, 0)
+ mpls_tbl.add_vpp_config()
+ tbl4 = VppIpTable(self, 1)
+ tbl4.add_vpp_config()
+ tbl6 = VppIpTable(self, 1, is_ip6=1)
+ tbl6.add_vpp_config()
+
# core links
self.pg0.admin_up()
self.pg0.config_ip4()
self.pg3.resolve_ndp()
def tearDown(self):
- super(TestMPLSPIC, self).tearDown()
self.pg0.disable_mpls()
+ self.pg1.disable_mpls()
for i in self.pg_interfaces:
i.unconfig_ip4()
i.unconfig_ip6()
i.set_table_ip4(0)
i.set_table_ip6(0)
i.admin_down()
+ super(TestMPLSPIC, self).tearDown()
def test_mpls_ibgp_pic(self):
""" MPLS iBGP PIC edge convergence
# create 2 pg interfaces
self.create_pg_interfaces(range(2))
+ # create the default MPLS table
+ self.tables = []
+ tbl = VppMplsTable(self, 0)
+ tbl.add_vpp_config()
+ self.tables.append(tbl)
+
# use pg0 as the core facing interface
self.pg0.admin_up()
self.pg0.config_ip4()
self.pg0.resolve_arp()
self.pg0.enable_mpls()
- # use the other 2 for customer facg L2 links
+ # use the other 2 for customer facing L2 links
for i in self.pg_interfaces[1:]:
i.admin_up()
def tearDown(self):
- super(TestMPLSL2, self).tearDown()
for i in self.pg_interfaces[1:]:
i.admin_down()
self.pg0.disable_mpls()
self.pg0.unconfig_ip4()
self.pg0.admin_down()
+ super(TestMPLSL2, self).tearDown()
def verify_capture_tunneled_ethernet(self, capture, sent, mpls_labels,
ttl=255, top=None):