6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip import DpoProto
8 from vpp_ip_route import VppIpRoute, VppRoutePath, VppMplsRoute, \
9 VppIpTable, VppMplsTable, VppMplsLabel
10 from vpp_mpls_tunnel_interface import VppMPLSTunnelInterface
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether
14 from scapy.layers.inet import IP, UDP, ICMP
15 from scapy.layers.inet6 import IPv6, ICMPv6TimeExceeded
16 from scapy.contrib.mpls import MPLS
19 def verify_filter(capture, sent):
20 if not len(capture) == len(sent):
21 # filter out any IPv6 RAs from the capture
28 def verify_mpls_stack(tst, rx, mpls_labels):
29 # the rx'd packet has the MPLS label popped
31 tst.assertEqual(eth.type, 0x8847)
35 for ii in range(len(mpls_labels)):
36 tst.assertEqual(rx_mpls.label, mpls_labels[ii].value)
37 tst.assertEqual(rx_mpls.cos, mpls_labels[ii].exp)
38 tst.assertEqual(rx_mpls.ttl, mpls_labels[ii].ttl)
40 if ii == len(mpls_labels) - 1:
41 tst.assertEqual(rx_mpls.s, 1)
44 tst.assertEqual(rx_mpls.s, 0)
45 # pop the label to expose the next
46 rx_mpls = rx_mpls[MPLS].payload
49 class TestSRMPLS(VppTestCase):
50 """ SR-MPLS Test Case """
53 super(TestSRMPLS, self).setUp()
55 # create 2 pg interfaces
56 self.create_pg_interfaces(range(4))
58 # setup both interfaces
59 # assign them different tables.
63 tbl = VppMplsTable(self, 0)
65 self.tables.append(tbl)
67 for i in self.pg_interfaces:
76 for i in self.pg_interfaces:
82 super(TestSRMPLS, self).tearDown()
84 def create_stream_ip4(self, src_if, dst_ip, ip_ttl=64, ip_dscp=0):
85 self.reset_packet_infos()
87 for i in range(0, 257):
88 info = self.create_packet_info(src_if, src_if)
89 payload = self.info_to_payload(info)
90 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
91 IP(src=src_if.remote_ip4, dst=dst_ip,
92 ttl=ip_ttl, tos=ip_dscp) /
93 UDP(sport=1234, dport=1234) /
99 def verify_capture_labelled_ip4(self, src_if, capture, sent,
100 mpls_labels, ip_ttl=None):
102 capture = verify_filter(capture, sent)
104 self.assertEqual(len(capture), len(sent))
106 for i in range(len(capture)):
112 verify_mpls_stack(self, rx, mpls_labels)
114 self.assertEqual(rx_ip.src, tx_ip.src)
115 self.assertEqual(rx_ip.dst, tx_ip.dst)
117 # IP processing post pop has decremented the TTL
118 self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl)
120 self.assertEqual(rx_ip.ttl, ip_ttl)
125 def verify_capture_tunneled_ip4(self, src_if, capture, sent, mpls_labels):
127 capture = verify_filter(capture, sent)
129 self.assertEqual(len(capture), len(sent))
131 for i in range(len(capture)):
137 verify_mpls_stack(self, rx, mpls_labels)
139 self.assertEqual(rx_ip.src, tx_ip.src)
140 self.assertEqual(rx_ip.dst, tx_ip.dst)
141 # IP processing post pop has decremented the TTL
142 self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl)
147 def test_sr_mpls(self):
151 # A simple MPLS xconnect - neos label in label out
153 route_32_eos = VppMplsRoute(self, 32, 0,
154 [VppRoutePath(self.pg0.remote_ip4,
155 self.pg0.sw_if_index,
156 labels=[VppMplsLabel(32)])])
157 route_32_eos.add_vpp_config()
160 # A binding SID with only one label
162 self.vapi.sr_mpls_policy_add(999, 1, 0, [32])
165 # A labeled IP route that resolves thru the binding SID
167 ip_10_0_0_1 = VppIpRoute(self, "10.0.0.1", 32,
168 [VppRoutePath("0.0.0.0",
171 labels=[VppMplsLabel(55)])])
172 ip_10_0_0_1.add_vpp_config()
174 tx = self.create_stream_ip4(self.pg1, "10.0.0.1")
175 rx = self.send_and_expect(self.pg1, tx, self.pg0)
176 self.verify_capture_labelled_ip4(self.pg0, rx, tx,
181 # An unlabeled IP route that resolves thru the binding SID
183 ip_10_0_0_1 = VppIpRoute(self, "10.0.0.2", 32,
184 [VppRoutePath("0.0.0.0",
187 ip_10_0_0_1.add_vpp_config()
189 tx = self.create_stream_ip4(self.pg1, "10.0.0.2")
190 rx = self.send_and_expect(self.pg1, tx, self.pg0)
191 self.verify_capture_labelled_ip4(self.pg0, rx, tx,
194 self.vapi.sr_mpls_policy_del(999)
197 # this time the SID has many labels pushed
199 self.vapi.sr_mpls_policy_add(999, 1, 0, [32, 33, 34])
201 tx = self.create_stream_ip4(self.pg1, "10.0.0.1")
202 rx = self.send_and_expect(self.pg1, tx, self.pg0)
203 self.verify_capture_labelled_ip4(self.pg0, rx, tx,
208 tx = self.create_stream_ip4(self.pg1, "10.0.0.2")
209 rx = self.send_and_expect(self.pg1, tx, self.pg0)
210 self.verify_capture_labelled_ip4(self.pg0, rx, tx,
216 # Resolve an MPLS tunnel via the SID
218 mpls_tun = VppMPLSTunnelInterface(
220 [VppRoutePath("0.0.0.0",
223 labels=[VppMplsLabel(44),
225 mpls_tun.add_vpp_config()
229 # add an unlabelled route through the new tunnel
231 route_10_0_0_3 = VppIpRoute(self, "10.0.0.3", 32,
232 [VppRoutePath("0.0.0.0",
233 mpls_tun._sw_if_index)])
234 route_10_0_0_3.add_vpp_config()
235 self.logger.info(self.vapi.cli("sh mpls tun 0"))
236 self.logger.info(self.vapi.cli("sh adj 21"))
238 tx = self.create_stream_ip4(self.pg1, "10.0.0.3")
239 rx = self.send_and_expect(self.pg1, tx, self.pg0)
240 self.verify_capture_tunneled_ip4(self.pg0, rx, tx,
248 # add a labelled route through the new tunnel
250 route_10_0_0_3 = VppIpRoute(self, "10.0.0.4", 32,
251 [VppRoutePath("0.0.0.0",
252 mpls_tun._sw_if_index,
253 labels=[VppMplsLabel(55)])])
254 route_10_0_0_3.add_vpp_config()
256 tx = self.create_stream_ip4(self.pg1, "10.0.0.4")
257 rx = self.send_and_expect(self.pg1, tx, self.pg0)
258 self.verify_capture_tunneled_ip4(self.pg0, rx, tx,
266 self.vapi.sr_mpls_policy_del(999)
269 if __name__ == '__main__':
270 unittest.main(testRunner=VppTestRunner)