from scapy.packet import Raw
from scapy.layers.l2 import Ether
-from scapy.layers.inet import IP, UDP
+from scapy.layers.inet import IP, UDP, ICMP
from scapy.layers.inet6 import IPv6
from scapy.contrib.mpls import MPLS
-from util import ppp
class TestMPLS(VppTestCase):
super(TestMPLS, self).tearDown()
# the default of 64 matches the IP packet TTL default
- def create_stream_labelled_ip4(self, src_if, mpls_labels, mpls_ttl=255):
+ def create_stream_labelled_ip4(
+ self,
+ src_if,
+ mpls_labels,
+ mpls_ttl=255,
+ ping=0,
+ ip_itf=None):
+ self.reset_packet_infos()
pkts = []
for i in range(0, 257):
- info = self.create_packet_info(src_if.sw_if_index,
- src_if.sw_if_index)
+ info = self.create_packet_info(src_if, src_if)
payload = self.info_to_payload(info)
p = Ether(dst=src_if.local_mac, src=src_if.remote_mac)
p = p / MPLS(label=mpls_labels[ii], ttl=mpls_ttl, s=1)
else:
p = p / MPLS(label=mpls_labels[ii], ttl=mpls_ttl, s=0)
- p = (p / IP(src=src_if.remote_ip4, dst=src_if.remote_ip4) /
- UDP(sport=1234, dport=1234) /
- Raw(payload))
+ if not ping:
+ p = (p / IP(src=src_if.local_ip4, dst=src_if.remote_ip4) /
+ UDP(sport=1234, dport=1234) /
+ Raw(payload))
+ else:
+ p = (p / IP(src=ip_itf.remote_ip4,
+ dst=ip_itf.local_ip4) /
+ ICMP())
+
info.data = p.copy()
pkts.append(p)
return pkts
def create_stream_ip4(self, src_if, dst_ip):
+ self.reset_packet_infos()
pkts = []
for i in range(0, 257):
- info = self.create_packet_info(src_if.sw_if_index,
- src_if.sw_if_index)
+ info = self.create_packet_info(src_if, src_if)
payload = self.info_to_payload(info)
p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
IP(src=src_if.remote_ip4, dst=dst_ip) /
return pkts
def create_stream_labelled_ip6(self, src_if, mpls_label, mpls_ttl):
+ self.reset_packet_infos()
pkts = []
for i in range(0, 257):
- info = self.create_packet_info(src_if.sw_if_index,
- src_if.sw_if_index)
+ info = self.create_packet_info(src_if, src_if)
payload = self.info_to_payload(info)
p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
MPLS(label=mpls_label, ttl=mpls_ttl) /
capture.remove(p)
return capture
- def verify_capture_ip4(self, src_if, capture, sent):
+ def verify_capture_ip4(self, src_if, capture, sent, ping_resp=0):
try:
capture = self.verify_filter(capture, sent)
tx_ip = tx[IP]
rx_ip = rx[IP]
- self.assertEqual(rx_ip.src, tx_ip.src)
- self.assertEqual(rx_ip.dst, tx_ip.dst)
- # IP processing post pop has decremented the TTL
- self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl)
+ if not ping_resp:
+ self.assertEqual(rx_ip.src, tx_ip.src)
+ self.assertEqual(rx_ip.dst, tx_ip.dst)
+ # IP processing post pop has decremented the TTL
+ self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl)
+ else:
+ self.assertEqual(rx_ip.src, tx_ip.dst)
+ self.assertEqual(rx_ip.dst, tx_ip.src)
except:
raise
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
-
- rx = self.pg0.get_capture()
- try:
- self.assertEqual(0, len(rx))
- except:
- self.logger.error("MPLS non-EOS packets popped and forwarded")
- self.logger.error(ppp("", rx))
- raise
+ self.pg0.assert_nothing_captured(
+ remark="MPLS non-EOS packets popped and forwarded")
#
# A recursive EOS x-connect, which resolves through another x-connect
labels=[44, 45])])
route_34_eos.add_vpp_config()
- self.vapi.cli("clear trace")
tx = self.create_stream_labelled_ip4(self.pg0, [34])
self.pg0.add_stream(tx)
0, # next-hop-table-id
1, # next-hop-weight
2, # num-out-labels,
- [44, 46]
- )
+ [44, 46])
self.vapi.sw_interface_set_flags(reply.sw_if_index, admin_up_down=1)
#
0, # next-hop-table-id
1, # next-hop-weight
0, # num-out-labels,
- [] # out-label
- )
+ []) # out-label
self.vapi.cli("clear trace")
tx = self.create_stream_ip4(self.pg0, "10.0.0.3")
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
- rx = self.pg0.get_capture()
-
- try:
- self.assertEqual(0, len(rx))
- except:
- self.logger.error("MPLS TTL=0 packets forwarded")
- self.logger.error(ppp("", rx))
- raise
+ self.pg0.assert_nothing_captured(remark="MPLS TTL=0 packets forwarded")
#
# a stream with a non-zero MPLS TTL
# PG0 is in the default table
#
- self.vapi.cli("clear trace")
tx = self.create_stream_labelled_ip4(self.pg0, [0])
self.pg0.add_stream(tx)
rx = self.pg1.get_capture()
self.verify_capture_ip6(self.pg0, rx, tx)
+ def test_deag(self):
+ """ MPLS Deagg """
+
+ #
+ # A de-agg route - next-hop lookup in default table
+ #
+ route_34_eos = MplsRoute(self, 34, 1,
+ [RoutePath("0.0.0.0",
+ 0xffffffff,
+ nh_table_id=0)])
+ route_34_eos.add_vpp_config()
+
+ #
+ # ping an interface in the default table
+ # PG0 is in the default table
+ #
+ self.vapi.cli("clear trace")
+ tx = self.create_stream_labelled_ip4(self.pg0, [34], ping=1,
+ ip_itf=self.pg0)
+ self.pg0.add_stream(tx)
+
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ rx = self.pg0.get_capture()
+ self.verify_capture_ip4(self.pg0, rx, tx, ping_resp=1)
+
+ #
+ # A de-agg route - next-hop lookup in non-default table
+ #
+ route_35_eos = MplsRoute(self, 35, 1,
+ [RoutePath("0.0.0.0",
+ 0xffffffff,
+ nh_table_id=1)])
+ route_35_eos.add_vpp_config()
+
+ #
+ # ping an interface in the non-default table
+ # PG0 is in the default table. packet arrive labelled in the
+ # default table and egress unlabelled in the non-default
+ #
+ self.vapi.cli("clear trace")
+ tx = self.create_stream_labelled_ip4(
+ self.pg0, [35], ping=1, ip_itf=self.pg1)
+ self.pg0.add_stream(tx)
+
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ packet_count = self.get_packet_count_for_if_idx(self.pg0.sw_if_index)
+ rx = self.pg1.get_capture(packet_count)
+ self.verify_capture_ip4(self.pg1, rx, tx, ping_resp=1)
+
+ route_35_eos.remove_vpp_config()
+ route_34_eos.remove_vpp_config()
if __name__ == '__main__':
unittest.main(testRunner=VppTestRunner)