6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip import DpoProto
8 from vpp_ip_route import (
16 from vpp_mpls_tunnel_interface import VppMPLSTunnelInterface
18 from scapy.packet import Raw
19 from scapy.layers.l2 import Ether
20 from scapy.layers.inet import IP, UDP, ICMP
21 from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
22 from scapy.contrib.mpls import MPLS
25 def verify_filter(capture, sent):
26 if not len(capture) == len(sent):
27 # filter out any IPv6 RAs from the capture
34 def verify_mpls_stack(tst, rx, mpls_labels):
35 # the rx'd packet has the MPLS label popped
37 tst.assertEqual(eth.type, 0x8847)
41 for ii in range(len(mpls_labels)):
42 tst.assertEqual(rx_mpls.label, mpls_labels[ii].value)
43 tst.assertEqual(rx_mpls.cos, mpls_labels[ii].exp)
44 tst.assertEqual(rx_mpls.ttl, mpls_labels[ii].ttl)
46 if ii == len(mpls_labels) - 1:
47 tst.assertEqual(rx_mpls.s, 1)
50 tst.assertEqual(rx_mpls.s, 0)
51 # pop the label to expose the next
52 rx_mpls = rx_mpls[MPLS].payload
55 class TestSRMPLS(VppTestCase):
56 """SR-MPLS Test Case"""
60 super(TestSRMPLS, cls).setUpClass()
63 def tearDownClass(cls):
64 super(TestSRMPLS, cls).tearDownClass()
67 super(TestSRMPLS, self).setUp()
69 # create 2 pg interfaces
70 self.create_pg_interfaces(range(4))
72 # setup both interfaces
73 # assign them different tables.
77 tbl = VppMplsTable(self, 0)
79 self.tables.append(tbl)
81 for i in self.pg_interfaces:
90 for i in self.pg_interfaces:
95 super(TestSRMPLS, self).tearDown()
97 def create_stream_ip4(self, src_if, dst_ip, ip_ttl=64, ip_dscp=0):
98 self.reset_packet_infos()
100 for i in range(0, 257):
101 info = self.create_packet_info(src_if, src_if)
102 payload = self.info_to_payload(info)
104 Ether(dst=src_if.local_mac, src=src_if.remote_mac)
105 / IP(src=src_if.remote_ip4, dst=dst_ip, ttl=ip_ttl, tos=ip_dscp)
106 / UDP(sport=1234, dport=1234)
113 def verify_capture_labelled_ip4(
114 self, src_if, capture, sent, mpls_labels, ip_ttl=None
117 capture = verify_filter(capture, sent)
119 self.assertEqual(len(capture), len(sent))
121 for i in range(len(capture)):
127 verify_mpls_stack(self, rx, mpls_labels)
129 self.assertEqual(rx_ip.src, tx_ip.src)
130 self.assertEqual(rx_ip.dst, tx_ip.dst)
132 # IP processing post pop has decremented the TTL
133 self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl)
135 self.assertEqual(rx_ip.ttl, ip_ttl)
140 def verify_capture_tunneled_ip4(self, src_if, capture, sent, mpls_labels):
142 capture = verify_filter(capture, sent)
144 self.assertEqual(len(capture), len(sent))
146 for i in range(len(capture)):
152 verify_mpls_stack(self, rx, mpls_labels)
154 self.assertEqual(rx_ip.src, tx_ip.src)
155 self.assertEqual(rx_ip.dst, tx_ip.dst)
156 # IP processing post pop has decremented the TTL
157 self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl)
162 def test_sr_mpls(self):
166 # A simple MPLS xconnect - neos label in label out
168 route_32_eos = VppMplsRoute(
174 self.pg0.remote_ip4, self.pg0.sw_if_index, labels=[VppMplsLabel(32)]
178 route_32_eos.add_vpp_config()
181 # A binding SID with only one label
183 self.vapi.sr_mpls_policy_add(999, 1, 0, [32])
186 # A labeled IP route that resolves thru the binding SID
188 ip_10_0_0_1 = VppIpRoute(
194 "0.0.0.0", 0xFFFFFFFF, nh_via_label=999, labels=[VppMplsLabel(55)]
198 ip_10_0_0_1.add_vpp_config()
200 tx = self.create_stream_ip4(self.pg1, "10.0.0.1")
201 rx = self.send_and_expect(self.pg1, tx, self.pg0)
202 self.verify_capture_labelled_ip4(
203 self.pg0, rx, tx, [VppMplsLabel(32), VppMplsLabel(55)]
207 # An unlabeled IP route that resolves thru the binding SID
209 ip_10_0_0_1 = VppIpRoute(
213 [VppRoutePath("0.0.0.0", 0xFFFFFFFF, nh_via_label=999)],
215 ip_10_0_0_1.add_vpp_config()
217 tx = self.create_stream_ip4(self.pg1, "10.0.0.2")
218 rx = self.send_and_expect(self.pg1, tx, self.pg0)
219 self.verify_capture_labelled_ip4(self.pg0, rx, tx, [VppMplsLabel(32)])
221 self.vapi.sr_mpls_policy_del(999)
224 # this time the SID has many labels pushed
226 self.vapi.sr_mpls_policy_add(999, 1, 0, [32, 33, 34])
228 tx = self.create_stream_ip4(self.pg1, "10.0.0.1")
229 rx = self.send_and_expect(self.pg1, tx, self.pg0)
230 self.verify_capture_labelled_ip4(
234 [VppMplsLabel(32), VppMplsLabel(33), VppMplsLabel(34), VppMplsLabel(55)],
236 tx = self.create_stream_ip4(self.pg1, "10.0.0.2")
237 rx = self.send_and_expect(self.pg1, tx, self.pg0)
238 self.verify_capture_labelled_ip4(
239 self.pg0, rx, tx, [VppMplsLabel(32), VppMplsLabel(33), VppMplsLabel(34)]
243 # Resolve an MPLS tunnel via the SID
245 mpls_tun = VppMPLSTunnelInterface(
252 labels=[VppMplsLabel(44), VppMplsLabel(46)],
256 mpls_tun.add_vpp_config()
260 # add an unlabelled route through the new tunnel
262 route_10_0_0_3 = VppIpRoute(
263 self, "10.0.0.3", 32, [VppRoutePath("0.0.0.0", mpls_tun._sw_if_index)]
265 route_10_0_0_3.add_vpp_config()
266 self.logger.info(self.vapi.cli("sh mpls tun 0"))
267 self.logger.info(self.vapi.cli("sh adj 21"))
269 tx = self.create_stream_ip4(self.pg1, "10.0.0.3")
270 rx = self.send_and_expect(self.pg1, tx, self.pg0)
271 self.verify_capture_tunneled_ip4(
285 # add a labelled route through the new tunnel
287 route_10_0_0_3 = VppIpRoute(
291 [VppRoutePath("0.0.0.0", mpls_tun._sw_if_index, labels=[VppMplsLabel(55)])],
293 route_10_0_0_3.add_vpp_config()
295 tx = self.create_stream_ip4(self.pg1, "10.0.0.4")
296 rx = self.send_and_expect(self.pg1, tx, self.pg0)
297 self.verify_capture_tunneled_ip4(
311 self.vapi.sr_mpls_policy_del(999)
314 if __name__ == "__main__":
315 unittest.main(testRunner=VppTestRunner)