+ def test_deag(self):
+ """ MPLS Deagg """
+
+ #
+ # A de-agg route - next-hop lookup in default table
+ #
+ route_34_eos = VppMplsRoute(self, 34, 1,
+ [VppRoutePath("0.0.0.0",
+ 0xffffffff,
+ nh_table_id=0)])
+ route_34_eos.add_vpp_config()
+
+ #
+ # ping an interface in the default table
+ # PG0 is in the default table
+ #
+ self.vapi.cli("clear trace")
+ tx = self.create_stream_labelled_ip4(self.pg0, [34], ping=1,
+ ip_itf=self.pg0)
+ self.pg0.add_stream(tx)
+
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ rx = self.pg0.get_capture()
+ self.verify_capture_ip4(self.pg0, rx, tx, ping_resp=1)
+
+ #
+ # A de-agg route - next-hop lookup in non-default table
+ #
+ route_35_eos = VppMplsRoute(self, 35, 1,
+ [VppRoutePath("0.0.0.0",
+ 0xffffffff,
+ nh_table_id=1)])
+ route_35_eos.add_vpp_config()
+
+ #
+ # ping an interface in the non-default table
+ # PG0 is in the default table. packet arrive labelled in the
+ # default table and egress unlabelled in the non-default
+ #
+ self.vapi.cli("clear trace")
+ tx = self.create_stream_labelled_ip4(
+ self.pg0, [35], ping=1, ip_itf=self.pg1)
+ self.pg0.add_stream(tx)
+
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ packet_count = self.get_packet_count_for_if_idx(self.pg0.sw_if_index)
+ rx = self.pg1.get_capture(packet_count)
+ self.verify_capture_ip4(self.pg1, rx, tx, ping_resp=1)
+
+ route_35_eos.remove_vpp_config()
+ route_34_eos.remove_vpp_config()
+
+
+class TestMPLSDisabled(VppTestCase):
+ """ MPLS disabled """
+
+ def setUp(self):
+ super(TestMPLSDisabled, self).setUp()
+
+ # create 2 pg interfaces
+ self.create_pg_interfaces(range(2))
+
+ # PG0 is MPLS enalbed
+ self.pg0.admin_up()
+ self.pg0.config_ip4()
+ self.pg0.resolve_arp()
+ self.pg0.enable_mpls()
+
+ # PG 1 is not MPLS enabled
+ self.pg1.admin_up()
+
+ def tearDown(self):
+ super(TestMPLSDisabled, self).tearDown()
+ for i in self.pg_interfaces:
+ i.unconfig_ip4()
+ i.admin_down()
+
+ def send_and_assert_no_replies(self, intf, pkts, remark):
+ intf.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ for i in self.pg_interfaces:
+ i.get_capture(0)
+ i.assert_nothing_captured(remark=remark)
+
+ def test_mpls_disabled(self):
+ """ MPLS Disabled """
+
+ tx = (Ether(src=self.pg1.remote_mac,
+ dst=self.pg1.local_mac) /
+ MPLS(label=32, ttl=64) /
+ IPv6(src="2001::1", dst=self.pg0.remote_ip6) /
+ UDP(sport=1234, dport=1234) /
+ Raw('\xa5' * 100))
+
+ #
+ # A simple MPLS xconnect - eos label in label out
+ #
+ route_32_eos = VppMplsRoute(self, 32, 1,
+ [VppRoutePath(self.pg0.remote_ip4,
+ self.pg0.sw_if_index,
+ labels=[33])])
+ route_32_eos.add_vpp_config()
+
+ #
+ # PG1 does not forward IP traffic
+ #
+ self.send_and_assert_no_replies(self.pg1, tx, "MPLS disabled")
+
+ #
+ # MPLS enable PG1
+ #
+ self.pg1.enable_mpls()
+
+ #
+ # Now we get packets through
+ #
+ self.pg1.add_stream(tx)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ rx = self.pg0.get_capture(1)
+
+ #
+ # Disable PG1
+ #
+ self.pg1.disable_mpls()
+
+ #
+ # PG1 does not forward IP traffic
+ #
+ self.send_and_assert_no_replies(self.pg1, tx, "IPv6 disabled")
+ self.send_and_assert_no_replies(self.pg1, tx, "IPv6 disabled")
+