#!/usr/bin/env python

import unittest
import socket
from logging import *

from framework import VppTestCase, VppTestRunner
from vpp_sub_interface import VppSubInterface, VppDot1QSubint, VppDot1ADSubint

from scapy.packet import Raw
from scapy.layers.l2 import Ether, Dot1Q, ARP
from scapy.layers.inet import IP, UDP
from scapy.layers.inet6 import ICMPv6ND_NS, IPv6, UDP
from scapy.contrib.mpls import MPLS


class TestMPLS(VppTestCase):
    """ MPLS Test Case """

    # Number of packets generated for every test stream.
    PKT_COUNT = 257

    @classmethod
    def setUpClass(cls):
        super(TestMPLS, cls).setUpClass()

    def setUp(self):
        super(TestMPLS, self).setUp()

        # create 3 pg interfaces
        self.create_pg_interfaces(range(3))

        # set up every interface, assigning each one a different table:
        # pg0 -> table 0 (default), pg1 -> table 1, pg2 -> table 2
        for table_id, pg_if in enumerate(self.pg_interfaces):
            pg_if.admin_up()
            pg_if.set_table_ip4(table_id)
            pg_if.set_table_ip6(table_id)
            pg_if.config_ip4()
            pg_if.config_ip6()
            pg_if.enable_mpls()
            pg_if.resolve_arp()
            pg_if.resolve_ndp()

    def tearDown(self):
        super(TestMPLS, self).tearDown()

    def _create_stream(self, src_if, mpls_label, mpls_ttl, ip_cls, ip_addr):
        """Build PKT_COUNT MPLS-encapsulated UDP packets to send on src_if.

        :param src_if: interface the packets will be injected on.
        :param mpls_label: value of the single MPLS label.
        :param mpls_ttl: TTL carried in the MPLS header.
        :param ip_cls: scapy L3 header class (IP or IPv6).
        :param ip_addr: address used as BOTH source and destination of the
            inner header (the callers pass src_if's remote address).
        :returns: list of scapy packets.
        """
        pkts = []
        for _ in range(self.PKT_COUNT):
            info = self.create_packet_info(src_if.sw_if_index,
                                           src_if.sw_if_index)
            payload = self.info_to_payload(info)
            p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
                 MPLS(label=mpls_label, ttl=mpls_ttl) /
                 ip_cls(src=ip_addr, dst=ip_addr) /
                 UDP(sport=1234, dport=1234) /
                 Raw(payload))
            info.data = p.copy()
            pkts.append(p)
        return pkts

    def create_stream_ip4(self, src_if, mpls_label, mpls_ttl):
        """Return a stream of MPLS-labelled IPv4/UDP packets for src_if."""
        return self._create_stream(src_if, mpls_label, mpls_ttl,
                                   IP, src_if.remote_ip4)

    def create_stream_ip6(self, src_if, mpls_label, mpls_ttl):
        """Return a stream of MPLS-labelled IPv6/UDP packets for src_if."""
        return self._create_stream(src_if, mpls_label, mpls_ttl,
                                   IPv6, src_if.remote_ip6)

    def _send_and_capture(self, src_if, pkts):
        """Inject pkts on src_if, run the packet generator and return the
        packets captured on that same interface."""
        src_if.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return src_if.get_capture()

    def verify_capture_ip4(self, src_if, capture, sent):
        """Check each captured packet is the sent IPv4 packet, label popped.

        :param src_if: interface the stream was sent on (currently unused).
        :param capture: packets received.
        :param sent: packets transmitted, in the same order as capture.
        """
        self.assertEqual(len(capture), len(sent))

        for tx, rx in zip(sent, capture):
            # the rx'd packet has the MPLS label popped
            eth = rx[Ether]
            self.assertEqual(eth.type, 0x800)

            tx_ip = tx[IP]
            rx_ip = rx[IP]

            self.assertEqual(rx_ip.src, tx_ip.src)
            self.assertEqual(rx_ip.dst, tx_ip.dst)
            # IP processing post pop has decremented the TTL
            self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl)

    def verify_capture_ip6(self, src_if, capture, sent):
        """Check each captured packet is the sent IPv6 packet, label popped.

        :param src_if: interface the stream was sent on (currently unused).
        :param capture: packets received.
        :param sent: packets transmitted, in the same order as capture.
        """
        self.assertEqual(len(capture), len(sent))

        for tx, rx in zip(sent, capture):
            # the rx'd packet has the MPLS label popped
            eth = rx[Ether]
            self.assertEqual(eth.type, 0x86DD)

            tx_ip = tx[IPv6]
            rx_ip = rx[IPv6]

            self.assertEqual(rx_ip.src, tx_ip.src)
            self.assertEqual(rx_ip.dst, tx_ip.dst)
            # IP processing post pop has decremented the hop limit
            self.assertEqual(rx_ip.hlim + 1, tx_ip.hlim)

    def test_v4_exp_null(self):
        """ MPLS V4 Explicit NULL test """

        #
        # The first test case has an MPLS TTL of 0
        # all packets should be dropped
        #
        tx = self.create_stream_ip4(self.pg0, 0, 0)
        rx = self._send_and_capture(self.pg0, tx)

        try:
            self.assertEqual(0, len(rx))
        except Exception:
            error("MPLS TTL=0 packets forwarded")
            # Log what was unexpectedly received. The original code
            # referenced an undefined name here, which would have replaced
            # the real failure with a NameError.
            for pkt in rx or []:
                error(repr(pkt))
            raise

        #
        # a stream with a non-zero MPLS TTL
        # PG0 is in the default table
        #
        self.vapi.cli("clear trace")
        tx = self.create_stream_ip4(self.pg0, 0, 2)
        rx = self._send_and_capture(self.pg0, tx)
        self.verify_capture_ip4(self.pg0, rx, tx)

        #
        # a stream with a non-zero MPLS TTL
        # PG1 is in table 1
        # we are ensuring the post-pop lookup occurs in the VRF table
        #
        self.vapi.cli("clear trace")
        tx = self.create_stream_ip4(self.pg1, 0, 2)
        rx = self._send_and_capture(self.pg1, tx)
        self.verify_capture_ip4(self.pg1, rx, tx)

    def test_v6_exp_null(self):
        """ MPLS V6 Explicit NULL test """

        #
        # a stream with a non-zero MPLS TTL
        # PG0 is in the default table
        #
        self.vapi.cli("clear trace")
        tx = self.create_stream_ip6(self.pg0, 2, 2)
        rx = self._send_and_capture(self.pg0, tx)
        self.verify_capture_ip6(self.pg0, rx, tx)

        #
        # a stream with a non-zero MPLS TTL
        # PG1 is in table 1
        # we are ensuring the post-pop lookup occurs in the VRF table
        #
        self.vapi.cli("clear trace")
        tx = self.create_stream_ip6(self.pg1, 2, 2)
        rx = self._send_and_capture(self.pg1, tx)
        self.verify_capture_ip6(self.pg1, rx, tx)


if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)