6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip import DpoProto
8 from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \
9 VppIpTable, VppMplsTable, VppMplsLabel
10 from vpp_mpls_tunnel_interface import VppMPLSTunnelInterface
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether
14 from scapy.layers.inet import IP, UDP, ICMP
15 from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
16 from scapy.contrib.mpls import MPLS
19 def verify_filter(capture, sent):
20 if not len(capture) == len(sent):
21 # filter out any IPv6 RAs from the capture
28 def verify_mpls_stack(tst, rx, mpls_labels):
29 # the rx'd packet must still be MPLS encapsulated (ethertype 0x8847)
31 tst.assertEqual(eth.type, 0x8847)
35 for ii in range(len(mpls_labels)):
36 tst.assertEqual(rx_mpls.label, mpls_labels[ii].value)
37 tst.assertEqual(rx_mpls.cos, mpls_labels[ii].exp)
38 tst.assertEqual(rx_mpls.ttl, mpls_labels[ii].ttl)
40 if ii == len(mpls_labels) - 1:
41 tst.assertEqual(rx_mpls.s, 1)
44 tst.assertEqual(rx_mpls.s, 0)
45 # pop the label to expose the next
46 rx_mpls = rx_mpls[MPLS].payload
49 class TestSRMPLS(VppTestCase):
50 """ SR-MPLS Test Case """
54 super(TestSRMPLS, cls).setUpClass()
57 def tearDownClass(cls):
58 super(TestSRMPLS, cls).tearDownClass()
61 super(TestSRMPLS, self).setUp()
63 # create 4 pg interfaces
64 self.create_pg_interfaces(range(4))
66 # setup all of the pg interfaces
67 # assign them different tables.
71 tbl = VppMplsTable(self, 0)
73 self.tables.append(tbl)
75 for i in self.pg_interfaces:
84 for i in self.pg_interfaces:
90 super(TestSRMPLS, self).tearDown()
92 def create_stream_ip4(self, src_if, dst_ip, ip_ttl=64, ip_dscp=0):
93 self.reset_packet_infos()
95 for i in range(0, 257):
96 info = self.create_packet_info(src_if, src_if)
97 payload = self.info_to_payload(info)
98 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
99 IP(src=src_if.remote_ip4, dst=dst_ip,
100 ttl=ip_ttl, tos=ip_dscp) /
101 UDP(sport=1234, dport=1234) /
107 def verify_capture_labelled_ip4(self, src_if, capture, sent,
108 mpls_labels, ip_ttl=None):
110 capture = verify_filter(capture, sent)
112 self.assertEqual(len(capture), len(sent))
114 for i in range(len(capture)):
120 verify_mpls_stack(self, rx, mpls_labels)
122 self.assertEqual(rx_ip.src, tx_ip.src)
123 self.assertEqual(rx_ip.dst, tx_ip.dst)
125 # forwarding has decremented the IP TTL by one relative to the sent packet
126 self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl)
128 self.assertEqual(rx_ip.ttl, ip_ttl)
133 def verify_capture_tunneled_ip4(self, src_if, capture, sent, mpls_labels):
135 capture = verify_filter(capture, sent)
137 self.assertEqual(len(capture), len(sent))
139 for i in range(len(capture)):
145 verify_mpls_stack(self, rx, mpls_labels)
147 self.assertEqual(rx_ip.src, tx_ip.src)
148 self.assertEqual(rx_ip.dst, tx_ip.dst)
149 # forwarding has decremented the IP TTL by one relative to the sent packet
150 self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl)
155 def test_sr_mpls(self):
159 # A simple MPLS xconnect - non-EOS label 32 in, label 32 out
161 route_32_eos = VppMplsRoute(self, 32, 0,
162 [VppRoutePath(self.pg0.remote_ip4,
163 self.pg0.sw_if_index,
164 labels=[VppMplsLabel(32)])])
165 route_32_eos.add_vpp_config()
168 # A binding SID with only one label
170 self.vapi.sr_mpls_policy_add(999, 1, 0, [32])
173 # A labeled IP route that resolves thru the binding SID
175 ip_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
176 [VppRoutePath("0.0.0.0",
179 labels=[VppMplsLabel(55)])])
180 ip_10_0_0_1.add_vpp_config()
182 tx = self.create_stream_ip4(self.pg1, "10.0.0.1")
183 rx = self.send_and_expect(self.pg1, tx, self.pg0)
184 self.verify_capture_labelled_ip4(self.pg0, rx, tx,
189 # An unlabeled IP route that resolves thru the binding SID
191 ip_10_0_0_1 = VppIpRoute(self, "10.0.0.2", 32,
192 [VppRoutePath("0.0.0.0",
195 ip_10_0_0_1.add_vpp_config()
197 tx = self.create_stream_ip4(self.pg1, "10.0.0.2")
198 rx = self.send_and_expect(self.pg1, tx, self.pg0)
199 self.verify_capture_labelled_ip4(self.pg0, rx, tx,
202 self.vapi.sr_mpls_policy_del(999)
205 # this time the SID has many labels pushed
207 self.vapi.sr_mpls_policy_add(999, 1, 0, [32, 33, 34])
209 tx = self.create_stream_ip4(self.pg1, "10.0.0.1")
210 rx = self.send_and_expect(self.pg1, tx, self.pg0)
211 self.verify_capture_labelled_ip4(self.pg0, rx, tx,
216 tx = self.create_stream_ip4(self.pg1, "10.0.0.2")
217 rx = self.send_and_expect(self.pg1, tx, self.pg0)
218 self.verify_capture_labelled_ip4(self.pg0, rx, tx,
224 # Resolve an MPLS tunnel via the SID
226 mpls_tun = VppMPLSTunnelInterface(
228 [VppRoutePath("0.0.0.0",
231 labels=[VppMplsLabel(44),
233 mpls_tun.add_vpp_config()
237 # add an unlabelled route through the new tunnel
239 route_10_0_0_3 = VppIpRoute(self, "10.0.0.3", 32,
240 [VppRoutePath("0.0.0.0",
241 mpls_tun._sw_if_index)])
242 route_10_0_0_3.add_vpp_config()
243 self.logger.info(self.vapi.cli("sh mpls tun 0"))
244 self.logger.info(self.vapi.cli("sh adj 21"))
246 tx = self.create_stream_ip4(self.pg1, "10.0.0.3")
247 rx = self.send_and_expect(self.pg1, tx, self.pg0)
248 self.verify_capture_tunneled_ip4(self.pg0, rx, tx,
256 # add a labelled route through the new tunnel
258 route_10_0_0_3 = VppIpRoute(self, "10.0.0.4", 32,
259 [VppRoutePath("0.0.0.0",
260 mpls_tun._sw_if_index,
261 labels=[VppMplsLabel(55)])])
262 route_10_0_0_3.add_vpp_config()
264 tx = self.create_stream_ip4(self.pg1, "10.0.0.4")
265 rx = self.send_and_expect(self.pg1, tx, self.pg0)
266 self.verify_capture_tunneled_ip4(self.pg0, rx, tx,
274 self.vapi.sr_mpls_policy_del(999)
277 if __name__ == '__main__':
278 unittest.main(testRunner=VppTestRunner)