5 from framework import VppTestCase
6 from asfframework import VppTestRunner
7 from vpp_ip_route import (
14 from vpp_mpls_tunnel_interface import VppMPLSTunnelInterface
16 from scapy.packet import Raw
17 from scapy.layers.l2 import Ether
18 from scapy.layers.inet import IP, UDP
19 from scapy.layers.inet6 import IPv6
20 from scapy.contrib.mpls import MPLS
23 def verify_filter(capture, sent):
24 if not len(capture) == len(sent):
25 # filter out any IPv6 RAs from the capture
32 def verify_mpls_stack(tst, rx, mpls_labels):
33 # the rx'd packet has the MPLS label popped
35 tst.assertEqual(eth.type, 0x8847)
39 for ii in range(len(mpls_labels)):
40 tst.assertEqual(rx_mpls.label, mpls_labels[ii].value)
41 tst.assertEqual(rx_mpls.cos, mpls_labels[ii].exp)
42 tst.assertEqual(rx_mpls.ttl, mpls_labels[ii].ttl)
44 if ii == len(mpls_labels) - 1:
45 tst.assertEqual(rx_mpls.s, 1)
48 tst.assertEqual(rx_mpls.s, 0)
49 # pop the label to expose the next
50 rx_mpls = rx_mpls[MPLS].payload
53 class TestSRMPLS(VppTestCase):
54 """SR-MPLS Test Case"""
58 super(TestSRMPLS, cls).setUpClass()
61 def tearDownClass(cls):
62 super(TestSRMPLS, cls).tearDownClass()
65 super(TestSRMPLS, self).setUp()
67 # create 2 pg interfaces
68 self.create_pg_interfaces(range(4))
70 # setup both interfaces
71 # assign them different tables.
75 tbl = VppMplsTable(self, 0)
77 self.tables.append(tbl)
79 for i in self.pg_interfaces:
88 for i in self.pg_interfaces:
93 super(TestSRMPLS, self).tearDown()
95 def create_stream_ip4(self, src_if, dst_ip, ip_ttl=64, ip_dscp=0):
96 self.reset_packet_infos()
98 for i in range(0, 257):
99 info = self.create_packet_info(src_if, src_if)
100 payload = self.info_to_payload(info)
102 Ether(dst=src_if.local_mac, src=src_if.remote_mac)
103 / IP(src=src_if.remote_ip4, dst=dst_ip, ttl=ip_ttl, tos=ip_dscp)
104 / UDP(sport=1234, dport=1234)
111 def verify_capture_labelled_ip4(
112 self, src_if, capture, sent, mpls_labels, ip_ttl=None
115 capture = verify_filter(capture, sent)
117 self.assertEqual(len(capture), len(sent))
119 for i in range(len(capture)):
125 verify_mpls_stack(self, rx, mpls_labels)
127 self.assertEqual(rx_ip.src, tx_ip.src)
128 self.assertEqual(rx_ip.dst, tx_ip.dst)
130 # IP processing post pop has decremented the TTL
131 self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl)
133 self.assertEqual(rx_ip.ttl, ip_ttl)
138 def verify_capture_tunneled_ip4(self, src_if, capture, sent, mpls_labels):
140 capture = verify_filter(capture, sent)
142 self.assertEqual(len(capture), len(sent))
144 for i in range(len(capture)):
150 verify_mpls_stack(self, rx, mpls_labels)
152 self.assertEqual(rx_ip.src, tx_ip.src)
153 self.assertEqual(rx_ip.dst, tx_ip.dst)
154 # IP processing post pop has decremented the TTL
155 self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl)
160 def test_sr_mpls(self):
164 # A simple MPLS xconnect - neos label in label out
166 route_32_eos = VppMplsRoute(
172 self.pg0.remote_ip4, self.pg0.sw_if_index, labels=[VppMplsLabel(32)]
176 route_32_eos.add_vpp_config()
179 # A binding SID with only one label
181 self.vapi.sr_mpls_policy_add(999, 1, 0, [32])
184 # A labeled IP route that resolves thru the binding SID
186 ip_10_0_0_1 = VppIpRoute(
192 "0.0.0.0", 0xFFFFFFFF, nh_via_label=999, labels=[VppMplsLabel(55)]
196 ip_10_0_0_1.add_vpp_config()
198 tx = self.create_stream_ip4(self.pg1, "10.0.0.1")
199 rx = self.send_and_expect(self.pg1, tx, self.pg0)
200 self.verify_capture_labelled_ip4(
201 self.pg0, rx, tx, [VppMplsLabel(32), VppMplsLabel(55)]
205 # An unlabeled IP route that resolves thru the binding SID
207 ip_10_0_0_1 = VppIpRoute(
211 [VppRoutePath("0.0.0.0", 0xFFFFFFFF, nh_via_label=999)],
213 ip_10_0_0_1.add_vpp_config()
215 tx = self.create_stream_ip4(self.pg1, "10.0.0.2")
216 rx = self.send_and_expect(self.pg1, tx, self.pg0)
217 self.verify_capture_labelled_ip4(self.pg0, rx, tx, [VppMplsLabel(32)])
219 self.vapi.sr_mpls_policy_del(999)
222 # this time the SID has many labels pushed
224 self.vapi.sr_mpls_policy_add(999, 1, 0, [32, 33, 34])
226 tx = self.create_stream_ip4(self.pg1, "10.0.0.1")
227 rx = self.send_and_expect(self.pg1, tx, self.pg0)
228 self.verify_capture_labelled_ip4(
232 [VppMplsLabel(32), VppMplsLabel(33), VppMplsLabel(34), VppMplsLabel(55)],
234 tx = self.create_stream_ip4(self.pg1, "10.0.0.2")
235 rx = self.send_and_expect(self.pg1, tx, self.pg0)
236 self.verify_capture_labelled_ip4(
237 self.pg0, rx, tx, [VppMplsLabel(32), VppMplsLabel(33), VppMplsLabel(34)]
241 # Resolve an MPLS tunnel via the SID
243 mpls_tun = VppMPLSTunnelInterface(
250 labels=[VppMplsLabel(44), VppMplsLabel(46)],
254 mpls_tun.add_vpp_config()
258 # add an unlabelled route through the new tunnel
260 route_10_0_0_3 = VppIpRoute(
261 self, "10.0.0.3", 32, [VppRoutePath("0.0.0.0", mpls_tun._sw_if_index)]
263 route_10_0_0_3.add_vpp_config()
264 self.logger.info(self.vapi.cli("sh mpls tun 0"))
265 self.logger.info(self.vapi.cli("sh adj 21"))
267 tx = self.create_stream_ip4(self.pg1, "10.0.0.3")
268 rx = self.send_and_expect(self.pg1, tx, self.pg0)
269 self.verify_capture_tunneled_ip4(
283 # add a labelled route through the new tunnel
285 route_10_0_0_3 = VppIpRoute(
289 [VppRoutePath("0.0.0.0", mpls_tun._sw_if_index, labels=[VppMplsLabel(55)])],
291 route_10_0_0_3.add_vpp_config()
293 tx = self.create_stream_ip4(self.pg1, "10.0.0.4")
294 rx = self.send_and_expect(self.pg1, tx, self.pg0)
295 self.verify_capture_tunneled_ip4(
309 self.vapi.sr_mpls_policy_del(999)
312 if __name__ == "__main__":
313 unittest.main(testRunner=VppTestRunner)