6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip import DpoProto
8 from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \
9 VppIpTable, VppMplsTable, VppMplsLabel
10 from vpp_mpls_tunnel_interface import VppMPLSTunnelInterface
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether
14 from scapy.layers.inet import IP, UDP, ICMP
15 from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
16 from scapy.contrib.mpls import MPLS
19 def verify_filter(capture, sent):
20 if not len(capture) == len(sent):
21 # filter out any IPv6 RAs from the capture
28 def verify_mpls_stack(tst, rx, mpls_labels):
29 # the rx'd packet must carry an MPLS label stack (ethertype 0x8847)
31 tst.assertEqual(eth.type, 0x8847)
35 for ii in range(len(mpls_labels)):
36 tst.assertEqual(rx_mpls.label, mpls_labels[ii].value)
37 tst.assertEqual(rx_mpls.cos, mpls_labels[ii].exp)
38 tst.assertEqual(rx_mpls.ttl, mpls_labels[ii].ttl)
40 if ii == len(mpls_labels) - 1:
41 tst.assertEqual(rx_mpls.s, 1)
44 tst.assertEqual(rx_mpls.s, 0)
45 # pop the label to expose the next
46 rx_mpls = rx_mpls[MPLS].payload
49 class TestSRMPLS(VppTestCase):
50 """ SR-MPLS Test Case """
54 super(TestSRMPLS, cls).setUpClass()
57 def tearDownClass(cls):
58 super(TestSRMPLS, cls).tearDownClass()
61 super(TestSRMPLS, self).setUp()
63 # create 4 pg interfaces
64 self.create_pg_interfaces(range(4))
66 # setup all the interfaces
67 # assign them different tables.
71 tbl = VppMplsTable(self, 0)
73 self.tables.append(tbl)
75 for i in self.pg_interfaces:
84 for i in self.pg_interfaces:
89 super(TestSRMPLS, self).tearDown()
91 def create_stream_ip4(self, src_if, dst_ip, ip_ttl=64, ip_dscp=0):
92 self.reset_packet_infos()
94 for i in range(0, 257):
95 info = self.create_packet_info(src_if, src_if)
96 payload = self.info_to_payload(info)
97 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
98 IP(src=src_if.remote_ip4, dst=dst_ip,
99 ttl=ip_ttl, tos=ip_dscp) /
100 UDP(sport=1234, dport=1234) /
106 def verify_capture_labelled_ip4(self, src_if, capture, sent,
107 mpls_labels, ip_ttl=None):
109 capture = verify_filter(capture, sent)
111 self.assertEqual(len(capture), len(sent))
113 for i in range(len(capture)):
119 verify_mpls_stack(self, rx, mpls_labels)
121 self.assertEqual(rx_ip.src, tx_ip.src)
122 self.assertEqual(rx_ip.dst, tx_ip.dst)
124 # the IP TTL has been decremented by one in forwarding
125 self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl)
127 self.assertEqual(rx_ip.ttl, ip_ttl)
132 def verify_capture_tunneled_ip4(self, src_if, capture, sent, mpls_labels):
134 capture = verify_filter(capture, sent)
136 self.assertEqual(len(capture), len(sent))
138 for i in range(len(capture)):
144 verify_mpls_stack(self, rx, mpls_labels)
146 self.assertEqual(rx_ip.src, tx_ip.src)
147 self.assertEqual(rx_ip.dst, tx_ip.dst)
148 # the IP TTL has been decremented by one in forwarding
149 self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl)
154 def test_sr_mpls(self):
158 # A simple MPLS xconnect - neos label in label out
160 route_32_eos = VppMplsRoute(self, 32, 0,
161 [VppRoutePath(self.pg0.remote_ip4,
162 self.pg0.sw_if_index,
163 labels=[VppMplsLabel(32)])])
164 route_32_eos.add_vpp_config()
167 # A binding SID with only one label
169 self.vapi.sr_mpls_policy_add(999, 1, 0, [32])
172 # A labeled IP route that resolves thru the binding SID
174 ip_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
175 [VppRoutePath("0.0.0.0",
178 labels=[VppMplsLabel(55)])])
179 ip_10_0_0_1.add_vpp_config()
181 tx = self.create_stream_ip4(self.pg1, "10.0.0.1")
182 rx = self.send_and_expect(self.pg1, tx, self.pg0)
183 self.verify_capture_labelled_ip4(self.pg0, rx, tx,
188 # An unlabeled IP route that resolves thru the binding SID
190 ip_10_0_0_1 = VppIpRoute(self, "10.0.0.2", 32,
191 [VppRoutePath("0.0.0.0",
194 ip_10_0_0_1.add_vpp_config()
196 tx = self.create_stream_ip4(self.pg1, "10.0.0.2")
197 rx = self.send_and_expect(self.pg1, tx, self.pg0)
198 self.verify_capture_labelled_ip4(self.pg0, rx, tx,
201 self.vapi.sr_mpls_policy_del(999)
204 # this time the SID has many labels pushed
206 self.vapi.sr_mpls_policy_add(999, 1, 0, [32, 33, 34])
208 tx = self.create_stream_ip4(self.pg1, "10.0.0.1")
209 rx = self.send_and_expect(self.pg1, tx, self.pg0)
210 self.verify_capture_labelled_ip4(self.pg0, rx, tx,
215 tx = self.create_stream_ip4(self.pg1, "10.0.0.2")
216 rx = self.send_and_expect(self.pg1, tx, self.pg0)
217 self.verify_capture_labelled_ip4(self.pg0, rx, tx,
223 # Resolve an MPLS tunnel via the SID
225 mpls_tun = VppMPLSTunnelInterface(
227 [VppRoutePath("0.0.0.0",
230 labels=[VppMplsLabel(44),
232 mpls_tun.add_vpp_config()
236 # add an unlabelled route through the new tunnel
238 route_10_0_0_3 = VppIpRoute(self, "10.0.0.3", 32,
239 [VppRoutePath("0.0.0.0",
240 mpls_tun._sw_if_index)])
241 route_10_0_0_3.add_vpp_config()
242 self.logger.info(self.vapi.cli("sh mpls tun 0"))
243 self.logger.info(self.vapi.cli("sh adj 21"))
245 tx = self.create_stream_ip4(self.pg1, "10.0.0.3")
246 rx = self.send_and_expect(self.pg1, tx, self.pg0)
247 self.verify_capture_tunneled_ip4(self.pg0, rx, tx,
255 # add a labelled route through the new tunnel
257 route_10_0_0_3 = VppIpRoute(self, "10.0.0.4", 32,
258 [VppRoutePath("0.0.0.0",
259 mpls_tun._sw_if_index,
260 labels=[VppMplsLabel(55)])])
261 route_10_0_0_3.add_vpp_config()
263 tx = self.create_stream_ip4(self.pg1, "10.0.0.4")
264 rx = self.send_and_expect(self.pg1, tx, self.pg0)
265 self.verify_capture_tunneled_ip4(self.pg0, rx, tx,
273 self.vapi.sr_mpls_policy_del(999)
276 if __name__ == '__main__':
277 unittest.main(testRunner=VppTestRunner)