import socket
from framework import VppTestCase, VppTestRunner
-from vpp_sub_interface import VppSubInterface, VppDot1QSubint, VppDot1ADSubint
-from vpp_ip_route import IpRoute, RoutePath, MplsRoute, MplsIpBind
+from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \
+ VppMplsIpBind
from scapy.packet import Raw
from scapy.layers.l2 import Ether
-from scapy.layers.inet import IP, UDP
+from scapy.layers.inet import IP, UDP, ICMP
from scapy.layers.inet6 import IPv6
from scapy.contrib.mpls import MPLS
-from util import ppp
-
class TestMPLS(VppTestCase):
""" MPLS Test Case """
- @classmethod
- def setUpClass(cls):
- super(TestMPLS, cls).setUpClass()
-
def setUp(self):
super(TestMPLS, self).setUp()
def tearDown(self):
super(TestMPLS, self).tearDown()
+ for i in self.pg_interfaces:
+ i.unconfig_ip4()
+ i.unconfig_ip6()
+ i.ip6_disable()
+ i.admin_down()
# the default of 64 matches the IP packet TTL default
- def create_stream_labelled_ip4(self, src_if, mpls_labels, mpls_ttl=255):
+ def create_stream_labelled_ip4(
+ self,
+ src_if,
+ mpls_labels,
+ mpls_ttl=255,
+ ping=0,
+ ip_itf=None):
+ self.reset_packet_infos()
pkts = []
for i in range(0, 257):
- info = self.create_packet_info(src_if.sw_if_index,
- src_if.sw_if_index)
+ info = self.create_packet_info(src_if, src_if)
payload = self.info_to_payload(info)
p = Ether(dst=src_if.local_mac, src=src_if.remote_mac)
p = p / MPLS(label=mpls_labels[ii], ttl=mpls_ttl, s=1)
else:
p = p / MPLS(label=mpls_labels[ii], ttl=mpls_ttl, s=0)
- p = (p / IP(src=src_if.remote_ip4, dst=src_if.remote_ip4) /
- UDP(sport=1234, dport=1234) /
- Raw(payload))
+ if not ping:
+ p = (p / IP(src=src_if.local_ip4, dst=src_if.remote_ip4) /
+ UDP(sport=1234, dport=1234) /
+ Raw(payload))
+ else:
+ p = (p / IP(src=ip_itf.remote_ip4,
+ dst=ip_itf.local_ip4) /
+ ICMP())
+
info.data = p.copy()
pkts.append(p)
return pkts
def create_stream_ip4(self, src_if, dst_ip):
+ self.reset_packet_infos()
pkts = []
for i in range(0, 257):
- info = self.create_packet_info(src_if.sw_if_index,
- src_if.sw_if_index)
+ info = self.create_packet_info(src_if, src_if)
payload = self.info_to_payload(info)
p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
IP(src=src_if.remote_ip4, dst=dst_ip) /
return pkts
def create_stream_labelled_ip6(self, src_if, mpls_label, mpls_ttl):
+ self.reset_packet_infos()
pkts = []
for i in range(0, 257):
- info = self.create_packet_info(src_if.sw_if_index,
- src_if.sw_if_index)
+ info = self.create_packet_info(src_if, src_if)
payload = self.info_to_payload(info)
p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
MPLS(label=mpls_label, ttl=mpls_ttl) /
pkts.append(p)
return pkts
- def verify_filter(self, capture, sent):
+ @staticmethod
+ def verify_filter(capture, sent):
if not len(capture) == len(sent):
- # filter out any IPv6 RAs from the captur
+ # filter out any IPv6 RAs from the capture
for p in capture:
- if (p.haslayer(IPv6)):
+ if p.haslayer(IPv6):
capture.remove(p)
return capture
- def verify_capture_ip4(self, src_if, capture, sent):
+ def verify_capture_ip4(self, src_if, capture, sent, ping_resp=0):
try:
capture = self.verify_filter(capture, sent)
tx_ip = tx[IP]
rx_ip = rx[IP]
- self.assertEqual(rx_ip.src, tx_ip.src)
- self.assertEqual(rx_ip.dst, tx_ip.dst)
- # IP processing post pop has decremented the TTL
- self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl)
+ if not ping_resp:
+ self.assertEqual(rx_ip.src, tx_ip.src)
+ self.assertEqual(rx_ip.dst, tx_ip.dst)
+ # IP processing post pop has decremented the TTL
+ self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl)
+ else:
+ self.assertEqual(rx_ip.src, tx_ip.dst)
+ self.assertEqual(rx_ip.dst, tx_ip.src)
except:
raise
#
# A simple MPLS xconnect - eos label in label out
#
- route_32_eos = MplsRoute(self, 32, 1,
- [RoutePath(self.pg0.remote_ip4,
- self.pg0.sw_if_index,
- labels=[33])])
+ route_32_eos = VppMplsRoute(self, 32, 1,
+ [VppRoutePath(self.pg0.remote_ip4,
+ self.pg0.sw_if_index,
+ labels=[33])])
route_32_eos.add_vpp_config()
#
#
# A simple MPLS xconnect - non-eos label in label out
#
- route_32_neos = MplsRoute(self, 32, 0,
- [RoutePath(self.pg0.remote_ip4,
- self.pg0.sw_if_index,
- labels=[33])])
+ route_32_neos = VppMplsRoute(self, 32, 0,
+ [VppRoutePath(self.pg0.remote_ip4,
+ self.pg0.sw_if_index,
+ labels=[33])])
route_32_neos.add_vpp_config()
#
#
# An MPLS xconnect - EOS label in IP out
#
- route_33_eos = MplsRoute(self, 33, 1,
- [RoutePath(self.pg0.remote_ip4,
- self.pg0.sw_if_index,
- labels=[])])
+ route_33_eos = VppMplsRoute(self, 33, 1,
+ [VppRoutePath(self.pg0.remote_ip4,
+ self.pg0.sw_if_index,
+ labels=[])])
route_33_eos.add_vpp_config()
self.vapi.cli("clear trace")
# An MPLS xconnect - non-EOS label in IP out - an invalid configuration
# so this traffic should be dropped.
#
- route_33_neos = MplsRoute(self, 33, 0,
- [RoutePath(self.pg0.remote_ip4,
- self.pg0.sw_if_index,
- labels=[])])
+ route_33_neos = VppMplsRoute(self, 33, 0,
+ [VppRoutePath(self.pg0.remote_ip4,
+ self.pg0.sw_if_index,
+ labels=[])])
route_33_neos.add_vpp_config()
self.vapi.cli("clear trace")
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
-
- rx = self.pg0.get_capture()
- try:
- self.assertEqual(0, len(rx))
- except:
- error("MPLS non-EOS packets popped and forwarded")
- error(packet.show())
- raise
+ self.pg0.assert_nothing_captured(
+ remark="MPLS non-EOS packets popped and forwarded")
#
# A recursive EOS x-connect, which resolves through another x-connect
#
- route_34_eos = MplsRoute(self, 34, 1,
- [RoutePath("0.0.0.0",
- 0xffffffff,
- nh_via_label=32,
- labels=[44, 45])])
+ route_34_eos = VppMplsRoute(self, 34, 1,
+ [VppRoutePath("0.0.0.0",
+ 0xffffffff,
+ nh_via_label=32,
+ labels=[44, 45])])
route_34_eos.add_vpp_config()
- self.vapi.cli("clear trace")
tx = self.create_stream_labelled_ip4(self.pg0, [34])
self.pg0.add_stream(tx)
self.verify_capture_labelled_ip4(self.pg0, rx, tx, [33, 44, 45])
#
- # A recursive non-EOS x-connect, which resolves through another x-connect
+ # A recursive non-EOS x-connect, which resolves through another
+ # x-connect
#
- route_34_neos = MplsRoute(self, 34, 0,
- [RoutePath("0.0.0.0",
- 0xffffffff,
- nh_via_label=32,
- labels=[44, 46])])
+ route_34_neos = VppMplsRoute(self, 34, 0,
+ [VppRoutePath("0.0.0.0",
+ 0xffffffff,
+ nh_via_label=32,
+ labels=[44, 46])])
route_34_neos.add_vpp_config()
self.vapi.cli("clear trace")
self.pg_start()
rx = self.pg0.get_capture()
- # it's the 2nd (counting from 0) lael in the stack that is swapped
+ # it's the 2nd (counting from 0) label in the stack that is swapped
self.verify_capture_labelled(self.pg0, rx, tx, [33, 44, 46, 99], num=2)
#
- # an recursive IP route that resolves through the recursive non-eos x-connect
+ # an recursive IP route that resolves through the recursive non-eos
+ # x-connect
#
- ip_10_0_0_1 = IpRoute(self, "10.0.0.1", 32,
- [RoutePath("0.0.0.0",
- 0xffffffff,
- nh_via_label=34,
- labels=[55])])
+ ip_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
+ [VppRoutePath("0.0.0.0",
+ 0xffffffff,
+ nh_via_label=34,
+ labels=[55])])
ip_10_0_0_1.add_vpp_config()
self.vapi.cli("clear trace")
#
# Add a non-recursive route with a single out label
#
- route_10_0_0_1 = IpRoute(self, "10.0.0.1", 32,
- [RoutePath(self.pg0.remote_ip4,
- self.pg0.sw_if_index,
- labels=[45])])
+ route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
+ [VppRoutePath(self.pg0.remote_ip4,
+ self.pg0.sw_if_index,
+ labels=[45])])
route_10_0_0_1.add_vpp_config()
# bind a local label to the route
- binding = MplsIpBind(self, 44, "10.0.0.1", 32)
+ binding = VppMplsIpBind(self, 44, "10.0.0.1", 32)
binding.add_vpp_config()
# non-EOS stream
#
# Add a non-recursive route with a single out label
#
- route_10_0_0_1 = IpRoute(self, "10.0.0.1", 32,
- [RoutePath(self.pg0.remote_ip4,
- self.pg0.sw_if_index,
- labels=[32])])
+ route_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
+ [VppRoutePath(self.pg0.remote_ip4,
+ self.pg0.sw_if_index,
+ labels=[32])])
route_10_0_0_1.add_vpp_config()
#
#
# Add a non-recursive route with a 3 out labels
#
- route_10_0_0_2 = IpRoute(self, "10.0.0.2", 32,
- [RoutePath(self.pg0.remote_ip4,
- self.pg0.sw_if_index,
- labels=[32, 33, 34])])
+ route_10_0_0_2 = VppIpRoute(self, "10.0.0.2", 32,
+ [VppRoutePath(self.pg0.remote_ip4,
+ self.pg0.sw_if_index,
+ labels=[32, 33, 34])])
route_10_0_0_2.add_vpp_config()
#
self.verify_capture_labelled_ip4(self.pg0, rx, tx, [32, 33, 34])
#
- # add a recursive path, with ouput label, via the 1 label route
+ # add a recursive path, with output label, via the 1 label route
#
- route_11_0_0_1 = IpRoute(self, "11.0.0.1", 32,
- [RoutePath("10.0.0.1",
- 0xffffffff,
- labels=[44])])
+ route_11_0_0_1 = VppIpRoute(self, "11.0.0.1", 32,
+ [VppRoutePath("10.0.0.1",
+ 0xffffffff,
+ labels=[44])])
route_11_0_0_1.add_vpp_config()
#
#
# add a recursive path, with 2 labels, via the 3 label route
#
- route_11_0_0_2 = IpRoute(self, "11.0.0.2", 32,
- [RoutePath("10.0.0.2",
- 0xffffffff,
- labels=[44, 45])])
+ route_11_0_0_2 = VppIpRoute(self, "11.0.0.2", 32,
+ [VppRoutePath("10.0.0.2",
+ 0xffffffff,
+ labels=[44, 45])])
route_11_0_0_2.add_vpp_config()
#
#
nh_addr = socket.inet_pton(socket.AF_INET, self.pg0.remote_ip4)
- reply = self.vapi.mpls_tunnel_add_del(0xffffffff, # don't know the if index yet
- 1, # IPv4 next-hop
- nh_addr,
- self.pg0.sw_if_index,
- 0, # next-hop-table-id
- 1, # next-hop-weight
- 2, # num-out-labels,
- [44, 46])
+ reply = self.vapi.mpls_tunnel_add_del(
+ 0xffffffff, # don't know the if index yet
+ 1, # IPv4 next-hop
+ nh_addr,
+ self.pg0.sw_if_index,
+ 0, # next-hop-table-id
+ 1, # next-hop-weight
+ 2, # num-out-labels,
+ [44, 46])
self.vapi.sw_interface_set_flags(reply.sw_if_index, admin_up_down=1)
#
# add an unlabelled route through the new tunnel
#
- dest_addr = socket.inet_pton(socket.AF_INET, "10.0.0.3")
- nh_addr = socket.inet_pton(socket.AF_INET, "0.0.0.0")
- dest_addr_len = 32
-
- self.vapi.ip_add_del_route(dest_addr,
- dest_addr_len,
- nh_addr, # all zeros next-hop - tunnel is p2p
- reply.sw_if_index, # sw_if_index of the new tunnel
- 0, # table-id
- 0, # next-hop-table-id
- 1, # next-hop-weight
- 0, # num-out-labels,
- []) # out-label
+ route_10_0_0_3 = VppIpRoute(self, "10.0.0.3", 32,
+ [VppRoutePath("0.0.0.0",
+ reply.sw_if_index)])
+ route_10_0_0_3.add_vpp_config()
self.vapi.cli("clear trace")
tx = self.create_stream_ip4(self.pg0, "10.0.0.3")
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- rx = self.pg0.get_capture()
-
- try:
- self.assertEqual(0, len(rx))
- except:
- self.logger.error("MPLS TTL=0 packets forwarded")
- self.logger.error(ppp("", rx))
- raise
+ self.pg0.assert_nothing_captured(remark="MPLS TTL=0 packets forwarded")
#
# a stream with a non-zero MPLS TTL
# PG0 is in the default table
#
- self.vapi.cli("clear trace")
tx = self.create_stream_labelled_ip4(self.pg0, [0])
self.pg0.add_stream(tx)
rx = self.pg1.get_capture()
self.verify_capture_ip6(self.pg0, rx, tx)
+ def test_deag(self):
+ """ MPLS Deagg """
+
+ #
+ # A de-agg route - next-hop lookup in default table
+ #
+ route_34_eos = VppMplsRoute(self, 34, 1,
+ [VppRoutePath("0.0.0.0",
+ 0xffffffff,
+ nh_table_id=0)])
+ route_34_eos.add_vpp_config()
+
+ #
+ # ping an interface in the default table
+ # PG0 is in the default table
+ #
+ self.vapi.cli("clear trace")
+ tx = self.create_stream_labelled_ip4(self.pg0, [34], ping=1,
+ ip_itf=self.pg0)
+ self.pg0.add_stream(tx)
+
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ rx = self.pg0.get_capture()
+ self.verify_capture_ip4(self.pg0, rx, tx, ping_resp=1)
+
+ #
+ # A de-agg route - next-hop lookup in non-default table
+ #
+ route_35_eos = VppMplsRoute(self, 35, 1,
+ [VppRoutePath("0.0.0.0",
+ 0xffffffff,
+ nh_table_id=1)])
+ route_35_eos.add_vpp_config()
+
+ #
+ # ping an interface in the non-default table
+ # PG0 is in the default table. packet arrive labelled in the
+ # default table and egress unlabelled in the non-default
+ #
+ self.vapi.cli("clear trace")
+ tx = self.create_stream_labelled_ip4(
+ self.pg0, [35], ping=1, ip_itf=self.pg1)
+ self.pg0.add_stream(tx)
+
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ packet_count = self.get_packet_count_for_if_idx(self.pg0.sw_if_index)
+ rx = self.pg1.get_capture(packet_count)
+ self.verify_capture_ip4(self.pg1, rx, tx, ping_resp=1)
+
+ route_35_eos.remove_vpp_config()
+ route_34_eos.remove_vpp_config()
+
+
+class TestMPLSDisabled(VppTestCase):
+ """ MPLS disabled """
+
+ def setUp(self):
+ super(TestMPLSDisabled, self).setUp()
+
+ # create 2 pg interfaces
+ self.create_pg_interfaces(range(2))
+
+ # PG0 is MPLS enalbed
+ self.pg0.admin_up()
+ self.pg0.config_ip4()
+ self.pg0.resolve_arp()
+ self.pg0.enable_mpls()
+
+ # PG 1 is not MPLS enabled
+ self.pg1.admin_up()
+
+ def tearDown(self):
+ super(TestMPLSDisabled, self).tearDown()
+ for i in self.pg_interfaces:
+ i.unconfig_ip4()
+ i.admin_down()
+
+ def send_and_assert_no_replies(self, intf, pkts, remark):
+ intf.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ for i in self.pg_interfaces:
+ i.get_capture(0)
+ i.assert_nothing_captured(remark=remark)
+
+ def test_mpls_disabled(self):
+ """ MPLS Disabled """
+
+ tx = (Ether(src=self.pg1.remote_mac,
+ dst=self.pg1.local_mac) /
+ MPLS(label=32, ttl=64) /
+ IPv6(src="2001::1", dst=self.pg0.remote_ip6) /
+ UDP(sport=1234, dport=1234) /
+ Raw('\xa5' * 100))
+
+ #
+ # A simple MPLS xconnect - eos label in label out
+ #
+ route_32_eos = VppMplsRoute(self, 32, 1,
+ [VppRoutePath(self.pg0.remote_ip4,
+ self.pg0.sw_if_index,
+ labels=[33])])
+ route_32_eos.add_vpp_config()
+
+ #
+ # PG1 does not forward IP traffic
+ #
+ self.send_and_assert_no_replies(self.pg1, tx, "MPLS disabled")
+
+ #
+ # MPLS enable PG1
+ #
+ self.pg1.enable_mpls()
+
+ #
+ # Now we get packets through
+ #
+ self.pg1.add_stream(tx)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ rx = self.pg0.get_capture(1)
+
+ #
+ # Disable PG1
+ #
+ self.pg1.disable_mpls()
+
+ #
+ # PG1 does not forward IP traffic
+ #
+ self.send_and_assert_no_replies(self.pg1, tx, "IPv6 disabled")
+ self.send_and_assert_no_replies(self.pg1, tx, "IPv6 disabled")
+
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)