import socket
from framework import VppTestCase, VppTestRunner
-from vpp_sub_interface import VppSubInterface, VppDot1QSubint, VppDot1ADSubint
from vpp_ip_route import IpRoute, RoutePath, MplsRoute, MplsIpBind
from scapy.packet import Raw
from scapy.layers.l2 import Ether
-from scapy.layers.inet import IP, UDP
+from scapy.layers.inet import IP, UDP, ICMP
from scapy.layers.inet6 import IPv6
from scapy.contrib.mpls import MPLS
-from util import ppp
-
class TestMPLS(VppTestCase):
super(TestMPLS, self).tearDown()
# the default of 64 matches the IP packet TTL default
- def create_stream_labelled_ip4(self, src_if, mpls_labels, mpls_ttl=255):
+ def create_stream_labelled_ip4(
+ self,
+ src_if,
+ mpls_labels,
+ mpls_ttl=255,
+ ping=0,
+ ip_itf=None):
+ self.reset_packet_infos()
pkts = []
for i in range(0, 257):
- info = self.create_packet_info(src_if.sw_if_index,
- src_if.sw_if_index)
+ info = self.create_packet_info(src_if, src_if)
payload = self.info_to_payload(info)
p = Ether(dst=src_if.local_mac, src=src_if.remote_mac)
p = p / MPLS(label=mpls_labels[ii], ttl=mpls_ttl, s=1)
else:
p = p / MPLS(label=mpls_labels[ii], ttl=mpls_ttl, s=0)
- p = (p / IP(src=src_if.remote_ip4, dst=src_if.remote_ip4) /
- UDP(sport=1234, dport=1234) /
- Raw(payload))
+ if not ping:
+ p = (p / IP(src=src_if.local_ip4, dst=src_if.remote_ip4) /
+ UDP(sport=1234, dport=1234) /
+ Raw(payload))
+ else:
+ p = (p / IP(src=ip_itf.remote_ip4,
+ dst=ip_itf.local_ip4) /
+ ICMP())
+
info.data = p.copy()
pkts.append(p)
return pkts
def create_stream_ip4(self, src_if, dst_ip):
+ self.reset_packet_infos()
pkts = []
for i in range(0, 257):
- info = self.create_packet_info(src_if.sw_if_index,
- src_if.sw_if_index)
+ info = self.create_packet_info(src_if, src_if)
payload = self.info_to_payload(info)
p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
IP(src=src_if.remote_ip4, dst=dst_ip) /
return pkts
def create_stream_labelled_ip6(self, src_if, mpls_label, mpls_ttl):
+ self.reset_packet_infos()
pkts = []
for i in range(0, 257):
- info = self.create_packet_info(src_if.sw_if_index,
- src_if.sw_if_index)
+ info = self.create_packet_info(src_if, src_if)
payload = self.info_to_payload(info)
p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
MPLS(label=mpls_label, ttl=mpls_ttl) /
pkts.append(p)
return pkts
- def verify_filter(self, capture, sent):
+ @staticmethod
+ def verify_filter(capture, sent):
if not len(capture) == len(sent):
- # filter out any IPv6 RAs from the captur
+ # filter out any IPv6 RAs from the capture
for p in capture:
- if (p.haslayer(IPv6)):
+ if p.haslayer(IPv6):
capture.remove(p)
return capture
- def verify_capture_ip4(self, src_if, capture, sent):
+ def verify_capture_ip4(self, src_if, capture, sent, ping_resp=0):
try:
capture = self.verify_filter(capture, sent)
tx_ip = tx[IP]
rx_ip = rx[IP]
- self.assertEqual(rx_ip.src, tx_ip.src)
- self.assertEqual(rx_ip.dst, tx_ip.dst)
- # IP processing post pop has decremented the TTL
- self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl)
+ if not ping_resp:
+ self.assertEqual(rx_ip.src, tx_ip.src)
+ self.assertEqual(rx_ip.dst, tx_ip.dst)
+ # IP processing post pop has decremented the TTL
+ self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl)
+ else:
+ self.assertEqual(rx_ip.src, tx_ip.dst)
+ self.assertEqual(rx_ip.dst, tx_ip.src)
except:
raise
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
-
- rx = self.pg0.get_capture()
- try:
- self.assertEqual(0, len(rx))
- except:
- error("MPLS non-EOS packets popped and forwarded")
- error(packet.show())
- raise
+ self.pg0.assert_nothing_captured(
+ remark="MPLS non-EOS packets popped and forwarded")
#
# A recursive EOS x-connect, which resolves through another x-connect
labels=[44, 45])])
route_34_eos.add_vpp_config()
- self.vapi.cli("clear trace")
tx = self.create_stream_labelled_ip4(self.pg0, [34])
self.pg0.add_stream(tx)
self.verify_capture_labelled_ip4(self.pg0, rx, tx, [33, 44, 45])
#
- # A recursive non-EOS x-connect, which resolves through another x-connect
+ # A recursive non-EOS x-connect, which resolves through another
+ # x-connect
#
route_34_neos = MplsRoute(self, 34, 0,
[RoutePath("0.0.0.0",
self.pg_start()
rx = self.pg0.get_capture()
- # it's the 2nd (counting from 0) lael in the stack that is swapped
+ # it's the 2nd (counting from 0) label in the stack that is swapped
self.verify_capture_labelled(self.pg0, rx, tx, [33, 44, 46, 99], num=2)
#
- # an recursive IP route that resolves through the recursive non-eos x-connect
+ # an recursive IP route that resolves through the recursive non-eos
+ # x-connect
#
ip_10_0_0_1 = IpRoute(self, "10.0.0.1", 32,
[RoutePath("0.0.0.0",
self.verify_capture_labelled_ip4(self.pg0, rx, tx, [32, 33, 34])
#
- # add a recursive path, with ouput label, via the 1 label route
+ # add a recursive path, with output label, via the 1 label route
#
route_11_0_0_1 = IpRoute(self, "11.0.0.1", 32,
[RoutePath("10.0.0.1",
#
nh_addr = socket.inet_pton(socket.AF_INET, self.pg0.remote_ip4)
- reply = self.vapi.mpls_tunnel_add_del(0xffffffff, # don't know the if index yet
- 1, # IPv4 next-hop
- nh_addr,
- self.pg0.sw_if_index,
- 0, # next-hop-table-id
- 1, # next-hop-weight
- 2, # num-out-labels,
- [44, 46])
+ reply = self.vapi.mpls_tunnel_add_del(
+ 0xffffffff, # don't know the if index yet
+ 1, # IPv4 next-hop
+ nh_addr,
+ self.pg0.sw_if_index,
+ 0, # next-hop-table-id
+ 1, # next-hop-weight
+ 2, # num-out-labels,
+ [44, 46])
self.vapi.sw_interface_set_flags(reply.sw_if_index, admin_up_down=1)
#
nh_addr = socket.inet_pton(socket.AF_INET, "0.0.0.0")
dest_addr_len = 32
- self.vapi.ip_add_del_route(dest_addr,
- dest_addr_len,
- nh_addr, # all zeros next-hop - tunnel is p2p
- reply.sw_if_index, # sw_if_index of the new tunnel
- 0, # table-id
- 0, # next-hop-table-id
- 1, # next-hop-weight
- 0, # num-out-labels,
- []) # out-label
+ self.vapi.ip_add_del_route(
+ dest_addr,
+ dest_addr_len,
+ nh_addr, # all zeros next-hop - tunnel is p2p
+ reply.sw_if_index, # sw_if_index of the new tunnel
+ 0, # table-id
+ 0, # next-hop-table-id
+ 1, # next-hop-weight
+ 0, # num-out-labels,
+ []) # out-label
self.vapi.cli("clear trace")
tx = self.create_stream_ip4(self.pg0, "10.0.0.3")
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- rx = self.pg0.get_capture()
-
- try:
- self.assertEqual(0, len(rx))
- except:
- self.logger.error("MPLS TTL=0 packets forwarded")
- self.logger.error(ppp("", rx))
- raise
+ self.pg0.assert_nothing_captured(remark="MPLS TTL=0 packets forwarded")
#
# a stream with a non-zero MPLS TTL
# PG0 is in the default table
#
- self.vapi.cli("clear trace")
tx = self.create_stream_labelled_ip4(self.pg0, [0])
self.pg0.add_stream(tx)
rx = self.pg1.get_capture()
self.verify_capture_ip6(self.pg0, rx, tx)
+ def test_deag(self):
+ """ MPLS Deagg """
+
+ #
+ # A de-agg route - next-hop lookup in default table
+ #
+ route_34_eos = MplsRoute(self, 34, 1,
+ [RoutePath("0.0.0.0",
+ 0xffffffff,
+ nh_table_id=0)])
+ route_34_eos.add_vpp_config()
+
+ #
+ # ping an interface in the default table
+ # PG0 is in the default table
+ #
+ self.vapi.cli("clear trace")
+ tx = self.create_stream_labelled_ip4(self.pg0, [34], ping=1,
+ ip_itf=self.pg0)
+ self.pg0.add_stream(tx)
+
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ rx = self.pg0.get_capture()
+ self.verify_capture_ip4(self.pg0, rx, tx, ping_resp=1)
+
+ #
+ # A de-agg route - next-hop lookup in non-default table
+ #
+ route_35_eos = MplsRoute(self, 35, 1,
+ [RoutePath("0.0.0.0",
+ 0xffffffff,
+ nh_table_id=1)])
+ route_35_eos.add_vpp_config()
+
+ #
+ # ping an interface in the non-default table
+ # PG0 is in the default table. packet arrive labelled in the
+ # default table and egress unlabelled in the non-default
+ #
+ self.vapi.cli("clear trace")
+ tx = self.create_stream_labelled_ip4(
+ self.pg0, [35], ping=1, ip_itf=self.pg1)
+ self.pg0.add_stream(tx)
+
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ packet_count = self.get_packet_count_for_if_idx(self.pg0.sw_if_index)
+ rx = self.pg1.get_capture(packet_count)
+ self.verify_capture_ip4(self.pg1, rx, tx, ping_resp=1)
+
+ route_35_eos.remove_vpp_config()
+ route_34_eos.remove_vpp_config()
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)