import unittest
import socket
+from framework import tag_fixme_vpp_workers
from framework import VppTestCase, VppTestRunner
from vpp_ip import DpoProto, INVALID_INDEX
from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \
VppMplsIpBind, VppIpMRoute, VppMRoutePath, \
- MRouteItfFlags, MRouteEntryFlags, VppIpTable, VppMplsTable, \
+ VppIpTable, VppMplsTable, \
VppMplsLabel, MplsLspMode, find_mpls_route, \
FibPathProto, FibPathType, FibPathFlags, VppMplsLabel, MplsLspMode
from vpp_mpls_tunnel_interface import VppMPLSTunnelInterface
+from vpp_papi import VppEnum
import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether, ARP
-from scapy.layers.inet import IP, UDP, ICMP
-from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
+from scapy.layers.inet import IP, UDP, ICMP, icmptypes, icmpcodes
+from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded, ICMPv6EchoRequest, \
+ ICMPv6PacketTooBig
from scapy.contrib.mpls import MPLS
NUM_PKTS = 67
rx_mpls = rx_mpls[MPLS].payload
+@tag_fixme_vpp_workers
class TestMPLS(VppTestCase):
""" MPLS Test Case """
return pkts
def create_stream_labelled_ip6(self, src_if, mpls_labels,
- hlim=64, dst_ip=None):
+ hlim=64, dst_ip=None,
+ ping=0, ip_itf=None):
if dst_ip is None:
dst_ip = src_if.remote_ip6
self.reset_packet_infos()
for l in mpls_labels:
p = p / MPLS(label=l.value, ttl=l.ttl, cos=l.exp)
- p = p / (IPv6(src=src_if.remote_ip6, dst=dst_ip, hlim=hlim) /
- UDP(sport=1234, dport=1234) /
- Raw(payload))
+ if ping:
+ p = p / (IPv6(src=ip_itf.remote_ip6,
+ dst=ip_itf.local_ip6) /
+ ICMPv6EchoRequest())
+ else:
+ p = p / (IPv6(src=src_if.remote_ip6, dst=dst_ip, hlim=hlim) /
+ UDP(sport=1234, dport=1234) /
+ Raw(payload))
info.data = p.copy()
pkts.append(p)
return pkts
raise
def verify_capture_ip6(self, src_if, capture, sent,
- ip_hlim=None, ip_dscp=0):
+ ip_hlim=None, ip_dscp=0,
+ ping_resp=0):
try:
self.assertEqual(len(capture), len(sent))
tx_ip = tx[IPv6]
rx_ip = rx[IPv6]
- self.assertEqual(rx_ip.src, tx_ip.src)
- self.assertEqual(rx_ip.dst, tx_ip.dst)
- self.assertEqual(rx_ip.tc, ip_dscp)
- # IP processing post pop has decremented the TTL
- if not ip_hlim:
- self.assertEqual(rx_ip.hlim + 1, tx_ip.hlim)
+ if not ping_resp:
+ self.assertEqual(rx_ip.src, tx_ip.src)
+ self.assertEqual(rx_ip.dst, tx_ip.dst)
+ self.assertEqual(rx_ip.tc, ip_dscp)
+ # IP processing post pop has decremented the TTL
+ if not ip_hlim:
+ self.assertEqual(rx_ip.hlim + 1, tx_ip.hlim)
+ else:
+ self.assertEqual(rx_ip.hlim, ip_hlim)
else:
- self.assertEqual(rx_ip.hlim, ip_hlim)
-
+ self.assertEqual(rx_ip.src, tx_ip.dst)
+ self.assertEqual(rx_ip.dst, tx_ip.src)
except:
raise
def verify_capture_ip6_icmp(self, src_if, capture, sent):
try:
- self.assertEqual(len(capture), len(sent))
+ # rate limited ICMP
+ self.assertTrue(len(capture) <= len(sent))
for i in range(len(capture)):
tx = sent[i]
except:
raise
+ def verify_capture_fragmented_labelled_ip6(self, src_if, capture, sent,
+ mpls_labels, ip_ttl=None):
+ try:
+ capture = verify_filter(capture, sent)
+
+ for i in range(len(capture)):
+ tx = sent[0]
+ rx = capture[i]
+ tx_ip = tx[IPv6]
+ rx.show()
+ rx_ip = IPv6(rx[MPLS].payload)
+ rx_ip.show()
+
+ verify_mpls_stack(self, rx, mpls_labels)
+
+ self.assertEqual(rx_ip.src, tx_ip.src)
+ self.assertEqual(rx_ip.dst, tx_ip.dst)
+ if not ip_ttl:
+ # IP processing post pop has decremented the hop-limit
+ self.assertEqual(rx_ip.hlim + 1, tx_ip.hlim)
+ else:
+ self.assertEqual(rx_ip.hlim, ip_ttl)
+ except:
+ raise
+
def test_swap(self):
""" MPLS label swap tests """
[VppMplsLabel(333, ttl=64)],
dst_ip=self.pg1.remote_ip6,
hlim=1)
- rx = self.send_and_expect(self.pg0, tx, self.pg0)
+ rx = self.send_and_expect_some(self.pg0, tx, self.pg0)
self.verify_capture_ip6_icmp(self.pg0, rx, tx)
#
tx = self.create_stream_labelled_ip6(self.pg0, [VppMplsLabel(334)],
dst_ip=self.pg1.remote_ip6,
hlim=0)
- rx = self.send_and_expect(self.pg0, tx, self.pg0)
+ rx = self.send_and_expect_some(self.pg0, tx, self.pg0)
self.verify_capture_ip6_icmp(self.pg0, rx, tx)
#
self.pg0.sw_if_index,
labels=[VppMplsLabel(32)])])
route_10_0_0_1.add_vpp_config()
+ route_1000_1 = VppIpRoute(self, "1000::1", 128,
+ [VppRoutePath(self.pg0.remote_ip6,
+ self.pg0.sw_if_index,
+ labels=[VppMplsLabel(32)])])
+ route_1000_1.add_vpp_config()
#
# a stream that matches the route for 10.0.0.1
self.verify_capture_fragmented_labelled_ip4(self.pg0, rx, tx,
[VppMplsLabel(32)])
+ # packets with DF bit set generate ICMP
+ for t in tx:
+ t[IP].flags = 'DF'
+ rxs = self.send_and_expect_some(self.pg0, tx, self.pg0)
+
+ for rx in rxs:
+ self.assertEqual(icmptypes[rx[ICMP].type], "dest-unreach")
+ self.assertEqual(icmpcodes[rx[ICMP].type][rx[ICMP].code],
+ "fragmentation-needed")
+ # the link MTU is 9000, the MPLS over head is 4 bytes
+ self.assertEqual(rx[ICMP].nexthopmtu, 9000 - 4)
+
+ self.assertEqual(self.statistics.get_err_counter(
+ "/err/mpls-frag/can't fragment this packet"),
+ len(tx))
+ #
+ # a stream that matches the route for 1000::1/128
+ # PG0 is in the default table
+ #
+ tx = self.create_stream_ip6(self.pg0, "1000::1")
+ for i in range(0, 257):
+ self.extend_packet(tx[i], 10000)
+
+ rxs = self.send_and_expect_some(self.pg0, tx, self.pg0)
+ for rx in rxs:
+ self.assertEqual(rx[ICMPv6PacketTooBig].mtu, 9000 - 4)
+
#
# cleanup
#
def test_mpls_tunnel_many(self):
""" MPLS Multiple Tunnels """
- for ii in range(10):
+ for ii in range(100):
mpls_tun = VppMPLSTunnelInterface(
self,
[VppRoutePath(self.pg0.remote_ip4,
VppMplsLabel(46, MplsLspMode.UNIFORM)])])
mpls_tun.add_vpp_config()
mpls_tun.admin_up()
+ for ii in range(100):
+ mpls_tun = VppMPLSTunnelInterface(
+ self,
+ [VppRoutePath(self.pg0.remote_ip4,
+ self.pg0.sw_if_index,
+ labels=[VppMplsLabel(44, ttl=32),
+ VppMplsLabel(46, MplsLspMode.UNIFORM)])],
+ is_l2=1)
+ mpls_tun.add_vpp_config()
+ mpls_tun.admin_up()
def test_v4_exp_null(self):
""" MPLS V4 Explicit NULL test """
0xffffffff,
nh_table_id=1)])
route_35_eos.add_vpp_config()
+ route_356_eos = VppMplsRoute(
+ self, 356, 1,
+ [VppRoutePath("0::0",
+ 0xffffffff,
+ nh_table_id=1)],
+ eos_proto=FibPathProto.FIB_PATH_NH_PROTO_IP6)
+ route_356_eos.add_vpp_config()
#
# ping an interface in the non-default table
self.pg0, [VppMplsLabel(35)], ping=1, ip_itf=self.pg1)
rx = self.send_and_expect(self.pg0, tx, self.pg1)
self.verify_capture_ip4(self.pg1, rx, tx, ping_resp=1)
+ tx = self.create_stream_labelled_ip6(
+ self.pg0, [VppMplsLabel(356)], ping=1, ip_itf=self.pg1)
+ rx = self.send_and_expect(self.pg0, tx, self.pg1)
+ self.verify_capture_ip6(self.pg1, rx, tx, ping_resp=1)
#
# Double pop
def test_mcast_head(self):
""" MPLS Multicast Head-end """
+ MRouteItfFlags = VppEnum.vl_api_mfib_itf_flags_t
+ MRouteEntryFlags = VppEnum.vl_api_mfib_entry_flags_t
+
#
# Create a multicast tunnel with two replications
#
self,
"0.0.0.0",
"232.1.1.1", 32,
- MRouteEntryFlags.MFIB_ENTRY_FLAG_NONE,
+ MRouteEntryFlags.MFIB_API_ENTRY_FLAG_NONE,
[VppMRoutePath(self.pg0.sw_if_index,
- MRouteItfFlags.MFIB_ITF_FLAG_ACCEPT),
+ MRouteItfFlags.MFIB_API_ITF_FLAG_ACCEPT),
VppMRoutePath(mpls_tun._sw_if_index,
- MRouteItfFlags.MFIB_ITF_FLAG_FORWARD)])
+ MRouteItfFlags.MFIB_API_ITF_FLAG_FORWARD)])
route_232_1_1_1.add_vpp_config()
self.logger.info(self.vapi.cli("sh ip mfib index 0"))
def test_mcast_ip4_tail(self):
""" MPLS IPv4 Multicast Tail """
+ MRouteItfFlags = VppEnum.vl_api_mfib_itf_flags_t
+ MRouteEntryFlags = VppEnum.vl_api_mfib_entry_flags_t
+
#
# Add a multicast route that will forward the traffic
# post-disposition
self,
"0.0.0.0",
"232.1.1.1", 32,
- MRouteEntryFlags.MFIB_ENTRY_FLAG_NONE,
+ MRouteEntryFlags.MFIB_API_ENTRY_FLAG_NONE,
table_id=1,
paths=[VppMRoutePath(self.pg1.sw_if_index,
- MRouteItfFlags.MFIB_ITF_FLAG_FORWARD)])
+ MRouteItfFlags.MFIB_API_ITF_FLAG_FORWARD)])
route_232_1_1_1.add_vpp_config()
#
def test_mcast_ip6_tail(self):
""" MPLS IPv6 Multicast Tail """
+ MRouteItfFlags = VppEnum.vl_api_mfib_itf_flags_t
+ MRouteEntryFlags = VppEnum.vl_api_mfib_entry_flags_t
+
#
# Add a multicast route that will forward the traffic
# post-disposition
self,
"::",
"ff01::1", 32,
- MRouteEntryFlags.MFIB_ENTRY_FLAG_NONE,
+ MRouteEntryFlags.MFIB_API_ENTRY_FLAG_NONE,
table_id=1,
paths=[VppMRoutePath(self.pg1.sw_if_index,
- MRouteItfFlags.MFIB_ITF_FLAG_FORWARD,
+ MRouteItfFlags.MFIB_API_ITF_FLAG_FORWARD,
proto=FibPathProto.FIB_PATH_NH_PROTO_IP6)])
route_ff.add_vpp_config()
[VppMplsLabel(34)],
dst_ip="ff01::1",
hlim=1)
- rx = self.send_and_expect(self.pg0, tx, self.pg0)
+ rx = self.send_and_expect_some(self.pg0, tx, self.pg0)
self.verify_capture_ip6_icmp(self.pg0, rx, tx)
#
VppMplsLabel(32),
VppMplsLabel(99)])
+ def test_attached(self):
+ """ Attach Routes with Local Label """
+
+ #
+ # test that if a local label is associated with an attached/connected
+ # prefix, that we can reach hosts in the prefix.
+ #
+ binding = VppMplsIpBind(self, 44,
+ self.pg0._local_ip4_subnet,
+ self.pg0.local_ip4_prefix_len)
+ binding.add_vpp_config()
+
+ tx = (Ether(src=self.pg1.remote_mac,
+ dst=self.pg1.local_mac) /
+ MPLS(label=44, ttl=64) /
+ IP(src=self.pg0.remote_ip4, dst=self.pg0.remote_ip4) /
+ UDP(sport=1234, dport=1234) /
+ Raw(b'\xa5' * 100))
+ rxs = self.send_and_expect(self.pg0, [tx], self.pg0)
+ for rx in rxs:
+ # if there's an ARP then the label is linked to the glean
+ # which is wrong.
+ self.assertFalse(rx.haslayer(ARP))
+ # it should be unicasted to the host
+ self.assertEqual(rx[Ether].dst, self.pg0.remote_mac)
+ self.assertEqual(rx[IP].dst, self.pg0.remote_ip4)
+
class TestMPLSDisabled(VppTestCase):
""" MPLS disabled """
def test_mpls_disabled(self):
""" MPLS Disabled """
+ self.logger.info(self.vapi.cli("show mpls interface"))
+ self.logger.info(self.vapi.cli("show mpls interface pg1"))
+ self.logger.info(self.vapi.cli("show mpls interface pg0"))
+
tx = (Ether(src=self.pg1.remote_mac,
dst=self.pg1.local_mac) /
MPLS(label=32, ttl=64) /
#
self.pg1.enable_mpls()
+ self.logger.info(self.vapi.cli("show mpls interface"))
+ self.logger.info(self.vapi.cli("show mpls interface pg1"))
+
#
# Now we get packets through
#