6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \
8 VppMplsIpBind, VppIpMRoute, VppMRoutePath, \
9 MRouteItfFlags, MRouteEntryFlags, DpoProto, VppIpTable, VppMplsTable, \
10 VppMplsLabel, MplsLspMode
11 from vpp_mpls_tunnel_interface import VppMPLSTunnelInterface
13 from scapy.packet import Raw
14 from scapy.layers.l2 import Ether
15 from scapy.layers.inet import IP, UDP, ICMP
16 from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
17 from scapy.contrib.mpls import MPLS
20 def verify_filter(capture, sent):
21 if not len(capture) == len(sent):
22 # filter out any IPv6 RAs from the capture
29 def verify_mpls_stack(tst, rx, mpls_labels):
30 # the rx'd packet has the MPLS label popped
32 tst.assertEqual(eth.type, 0x8847)
36 for ii in range(len(mpls_labels)):
37 tst.assertEqual(rx_mpls.label, mpls_labels[ii].value)
38 tst.assertEqual(rx_mpls.cos, mpls_labels[ii].exp)
39 tst.assertEqual(rx_mpls.ttl, mpls_labels[ii].ttl)
41 if ii == len(mpls_labels) - 1:
42 tst.assertEqual(rx_mpls.s, 1)
45 tst.assertEqual(rx_mpls.s, 0)
46 # pop the label to expose the next
47 rx_mpls = rx_mpls[MPLS].payload
50 class TestSRMPLS(VppTestCase):
51 """ SR-MPLS Test Case """
54 super(TestSRMPLS, self).setUp()
56 # create 2 pg interfaces
57 self.create_pg_interfaces(range(4))
59 # setup both interfaces
60 # assign them different tables.
64 tbl = VppMplsTable(self, 0)
66 self.tables.append(tbl)
68 for i in self.pg_interfaces:
77 for i in self.pg_interfaces:
83 super(TestSRMPLS, self).tearDown()
85 def create_stream_ip4(self, src_if, dst_ip, ip_ttl=64, ip_dscp=0):
86 self.reset_packet_infos()
88 for i in range(0, 257):
89 info = self.create_packet_info(src_if, src_if)
90 payload = self.info_to_payload(info)
91 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
92 IP(src=src_if.remote_ip4, dst=dst_ip,
93 ttl=ip_ttl, tos=ip_dscp) /
94 UDP(sport=1234, dport=1234) /
100 def verify_capture_labelled_ip4(self, src_if, capture, sent,
101 mpls_labels, ip_ttl=None):
103 capture = verify_filter(capture, sent)
105 self.assertEqual(len(capture), len(sent))
107 for i in range(len(capture)):
113 verify_mpls_stack(self, rx, mpls_labels)
115 self.assertEqual(rx_ip.src, tx_ip.src)
116 self.assertEqual(rx_ip.dst, tx_ip.dst)
118 # IP processing post pop has decremented the TTL
119 self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl)
121 self.assertEqual(rx_ip.ttl, ip_ttl)
126 def verify_capture_tunneled_ip4(self, src_if, capture, sent, mpls_labels):
128 capture = verify_filter(capture, sent)
130 self.assertEqual(len(capture), len(sent))
132 for i in range(len(capture)):
138 verify_mpls_stack(self, rx, mpls_labels)
140 self.assertEqual(rx_ip.src, tx_ip.src)
141 self.assertEqual(rx_ip.dst, tx_ip.dst)
142 # IP processing post pop has decremented the TTL
143 self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl)
148 def test_sr_mpls(self):
152 # A simple MPLS xconnect - neos label in label out
154 route_32_eos = VppMplsRoute(self, 32, 0,
155 [VppRoutePath(self.pg0.remote_ip4,
156 self.pg0.sw_if_index,
157 labels=[VppMplsLabel(32)])])
158 route_32_eos.add_vpp_config()
161 # A binding SID with only one label
163 self.vapi.sr_mpls_policy_add(999, 1, 0, [32])
166 # A labeled IP route that resolves thru the binding SID
168 ip_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
169 [VppRoutePath("0.0.0.0",
172 labels=[VppMplsLabel(55)])])
173 ip_10_0_0_1.add_vpp_config()
175 tx = self.create_stream_ip4(self.pg1, "10.0.0.1")
176 rx = self.send_and_expect(self.pg1, tx, self.pg0)
177 self.verify_capture_labelled_ip4(self.pg0, rx, tx,
182 # An unlabeled IP route that resolves thru the binding SID
184 ip_10_0_0_1 = VppIpRoute(self, "10.0.0.2", 32,
185 [VppRoutePath("0.0.0.0",
188 ip_10_0_0_1.add_vpp_config()
190 tx = self.create_stream_ip4(self.pg1, "10.0.0.2")
191 rx = self.send_and_expect(self.pg1, tx, self.pg0)
192 self.verify_capture_labelled_ip4(self.pg0, rx, tx,
195 self.vapi.sr_mpls_policy_del(999)
198 # this time the SID has many labels pushed
200 self.vapi.sr_mpls_policy_add(999, 1, 0, [32, 33, 34])
202 tx = self.create_stream_ip4(self.pg1, "10.0.0.1")
203 rx = self.send_and_expect(self.pg1, tx, self.pg0)
204 self.verify_capture_labelled_ip4(self.pg0, rx, tx,
209 tx = self.create_stream_ip4(self.pg1, "10.0.0.2")
210 rx = self.send_and_expect(self.pg1, tx, self.pg0)
211 self.verify_capture_labelled_ip4(self.pg0, rx, tx,
217 # Resolve an MPLS tunnel via the SID
219 mpls_tun = VppMPLSTunnelInterface(
221 [VppRoutePath("0.0.0.0",
224 labels=[VppMplsLabel(44),
226 mpls_tun.add_vpp_config()
230 # add an unlabelled route through the new tunnel
232 route_10_0_0_3 = VppIpRoute(self, "10.0.0.3", 32,
233 [VppRoutePath("0.0.0.0",
234 mpls_tun._sw_if_index)])
235 route_10_0_0_3.add_vpp_config()
236 self.logger.info(self.vapi.cli("sh mpls tun 0"))
237 self.logger.info(self.vapi.cli("sh adj 21"))
239 tx = self.create_stream_ip4(self.pg1, "10.0.0.3")
240 rx = self.send_and_expect(self.pg1, tx, self.pg0)
241 self.verify_capture_tunneled_ip4(self.pg0, rx, tx,
249 # add a labelled route through the new tunnel
251 route_10_0_0_3 = VppIpRoute(self, "10.0.0.4", 32,
252 [VppRoutePath("0.0.0.0",
253 mpls_tun._sw_if_index,
254 labels=[VppMplsLabel(55)])])
255 route_10_0_0_3.add_vpp_config()
257 tx = self.create_stream_ip4(self.pg1, "10.0.0.4")
258 rx = self.send_and_expect(self.pg1, tx, self.pg0)
259 self.verify_capture_tunneled_ip4(self.pg0, rx, tx,
267 self.vapi.sr_mpls_policy_del(999)
270 if __name__ == '__main__':
271 unittest.main(testRunner=VppTestRunner)