6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip_route import IpRoute, RoutePath, MplsRoute, MplsIpBind
9 from scapy.packet import Raw
10 from scapy.layers.l2 import Ether
11 from scapy.layers.inet import IP, UDP
12 from scapy.layers.inet6 import IPv6
13 from scapy.contrib.mpls import MPLS
17 class TestMPLS(VppTestCase):
18 """ MPLS Test Case """
22 super(TestMPLS, cls).setUpClass()
25 super(TestMPLS, self).setUp()
27 # create 2 pg interfaces
28 self.create_pg_interfaces(range(2))
30 # setup both interfaces
31 # assign them different tables.
34 for i in self.pg_interfaces:
36 i.set_table_ip4(table_id)
37 i.set_table_ip6(table_id)
46 super(TestMPLS, self).tearDown()
48 # the MPLS TTL defaults to 255; pass mpls_ttl to override it
49 def create_stream_labelled_ip4(self, src_if, mpls_labels, mpls_ttl=255):
51 for i in range(0, 257):
52 info = self.create_packet_info(src_if.sw_if_index,
54 payload = self.info_to_payload(info)
55 p = Ether(dst=src_if.local_mac, src=src_if.remote_mac)
57 for ii in range(len(mpls_labels)):
58 if ii == len(mpls_labels) - 1:
59 p = p / MPLS(label=mpls_labels[ii], ttl=mpls_ttl, s=1)
61 p = p / MPLS(label=mpls_labels[ii], ttl=mpls_ttl, s=0)
62 p = (p / IP(src=src_if.remote_ip4, dst=src_if.remote_ip4) /
63 UDP(sport=1234, dport=1234) /
69 def create_stream_ip4(self, src_if, dst_ip):
71 for i in range(0, 257):
72 info = self.create_packet_info(src_if.sw_if_index,
74 payload = self.info_to_payload(info)
75 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
76 IP(src=src_if.remote_ip4, dst=dst_ip) /
77 UDP(sport=1234, dport=1234) /
83 def create_stream_labelled_ip6(self, src_if, mpls_label, mpls_ttl):
85 for i in range(0, 257):
86 info = self.create_packet_info(src_if.sw_if_index,
88 payload = self.info_to_payload(info)
89 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
90 MPLS(label=mpls_label, ttl=mpls_ttl) /
91 IPv6(src=src_if.remote_ip6, dst=src_if.remote_ip6) /
92 UDP(sport=1234, dport=1234) /
99 def verify_filter(capture, sent):
100 if not len(capture) == len(sent):
101 # filter out any IPv6 RAs from the capture
107 def verify_capture_ip4(self, src_if, capture, sent):
109 capture = self.verify_filter(capture, sent)
111 self.assertEqual(len(capture), len(sent))
113 for i in range(len(capture)):
117 # the rx'd packet has the MPLS label popped
119 self.assertEqual(eth.type, 0x800)
124 self.assertEqual(rx_ip.src, tx_ip.src)
125 self.assertEqual(rx_ip.dst, tx_ip.dst)
126 # IP processing post pop has decremented the TTL
127 self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl)
132 def verify_mpls_stack(self, rx, mpls_labels, ttl=255, num=0):
133 # the rx'd packet is still MPLS encapsulated (ethertype 0x8847)
135 self.assertEqual(eth.type, 0x8847)
139 for ii in range(len(mpls_labels)):
140 self.assertEqual(rx_mpls.label, mpls_labels[ii])
141 self.assertEqual(rx_mpls.cos, 0)
143 self.assertEqual(rx_mpls.ttl, ttl)
145 self.assertEqual(rx_mpls.ttl, 255)
147 if ii == len(mpls_labels) - 1:
148 self.assertEqual(rx_mpls.s, 1)
151 self.assertEqual(rx_mpls.s, 0)
152 # pop the label to expose the next
153 rx_mpls = rx_mpls[MPLS].payload
155 def verify_capture_labelled_ip4(self, src_if, capture, sent,
158 capture = self.verify_filter(capture, sent)
160 self.assertEqual(len(capture), len(sent))
162 for i in range(len(capture)):
168 # the MPLS TTL is copied from the IP
169 self.verify_mpls_stack(
170 rx, mpls_labels, rx_ip.ttl, len(mpls_labels) - 1)
172 self.assertEqual(rx_ip.src, tx_ip.src)
173 self.assertEqual(rx_ip.dst, tx_ip.dst)
174 # the IP header under the label stack has had its TTL decremented by one
175 self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl)
180 def verify_capture_tunneled_ip4(self, src_if, capture, sent, mpls_labels):
182 capture = self.verify_filter(capture, sent)
184 self.assertEqual(len(capture), len(sent))
186 for i in range(len(capture)):
192 # the MPLS TTL is 255 since it enters a new tunnel
193 self.verify_mpls_stack(
194 rx, mpls_labels, 255, len(mpls_labels) - 1)
196 self.assertEqual(rx_ip.src, tx_ip.src)
197 self.assertEqual(rx_ip.dst, tx_ip.dst)
198 # the IP header under the tunnel labels has had its TTL decremented by one
199 self.assertEqual(rx_ip.ttl + 1, tx_ip.ttl)
204 def verify_capture_labelled(self, src_if, capture, sent,
205 mpls_labels, ttl=254, num=0):
207 capture = self.verify_filter(capture, sent)
209 self.assertEqual(len(capture), len(sent))
211 for i in range(len(capture)):
213 self.verify_mpls_stack(rx, mpls_labels, ttl, num)
217 def verify_capture_ip6(self, src_if, capture, sent):
219 self.assertEqual(len(capture), len(sent))
221 for i in range(len(capture)):
225 # the rx'd packet has the MPLS label popped
227 self.assertEqual(eth.type, 0x86DD)
232 self.assertEqual(rx_ip.src, tx_ip.src)
233 self.assertEqual(rx_ip.dst, tx_ip.dst)
234 # IPv6 processing post pop has decremented the hop limit
235 self.assertEqual(rx_ip.hlim + 1, tx_ip.hlim)
241 """ MPLS label swap tests """
244 # A simple MPLS xconnect - eos label in label out
246 route_32_eos = MplsRoute(self, 32, 1,
247 [RoutePath(self.pg0.remote_ip4,
248 self.pg0.sw_if_index,
250 route_32_eos.add_vpp_config()
253 # a stream that matches the EOS route for label 32
254 # PG0 is in the default table
256 self.vapi.cli("clear trace")
257 tx = self.create_stream_labelled_ip4(self.pg0, [32])
258 self.pg0.add_stream(tx)
260 self.pg_enable_capture(self.pg_interfaces)
263 rx = self.pg0.get_capture()
264 self.verify_capture_labelled_ip4(self.pg0, rx, tx, [33])
267 # A simple MPLS xconnect - non-eos label in label out
269 route_32_neos = MplsRoute(self, 32, 0,
270 [RoutePath(self.pg0.remote_ip4,
271 self.pg0.sw_if_index,
273 route_32_neos.add_vpp_config()
276 # a stream that matches the non-EOS route for label 32
277 # PG0 is in the default table
279 self.vapi.cli("clear trace")
280 tx = self.create_stream_labelled_ip4(self.pg0, [32, 99])
281 self.pg0.add_stream(tx)
283 self.pg_enable_capture(self.pg_interfaces)
286 rx = self.pg0.get_capture()
287 self.verify_capture_labelled(self.pg0, rx, tx, [33, 99])
290 # An MPLS xconnect - EOS label in IP out
292 route_33_eos = MplsRoute(self, 33, 1,
293 [RoutePath(self.pg0.remote_ip4,
294 self.pg0.sw_if_index,
296 route_33_eos.add_vpp_config()
298 self.vapi.cli("clear trace")
299 tx = self.create_stream_labelled_ip4(self.pg0, [33])
300 self.pg0.add_stream(tx)
302 self.pg_enable_capture(self.pg_interfaces)
305 rx = self.pg0.get_capture()
306 self.verify_capture_ip4(self.pg0, rx, tx)
309 # An MPLS xconnect - non-EOS label in IP out - an invalid configuration
310 # so this traffic should be dropped.
312 route_33_neos = MplsRoute(self, 33, 0,
313 [RoutePath(self.pg0.remote_ip4,
314 self.pg0.sw_if_index,
316 route_33_neos.add_vpp_config()
318 self.vapi.cli("clear trace")
319 tx = self.create_stream_labelled_ip4(self.pg0, [33, 99])
320 self.pg0.add_stream(tx)
322 self.pg_enable_capture(self.pg_interfaces)
325 rx = self.pg0.get_capture()
327 self.assertEqual(0, len(rx))
329 self.logger.error("MPLS non-EOS packets popped and forwarded")
330 self.logger.error(ppp("", rx))
334 # A recursive EOS x-connect, which resolves through another x-connect
336 route_34_eos = MplsRoute(self, 34, 1,
337 [RoutePath("0.0.0.0",
341 route_34_eos.add_vpp_config()
343 self.vapi.cli("clear trace")
344 tx = self.create_stream_labelled_ip4(self.pg0, [34])
345 self.pg0.add_stream(tx)
347 self.pg_enable_capture(self.pg_interfaces)
350 rx = self.pg0.get_capture()
351 self.verify_capture_labelled_ip4(self.pg0, rx, tx, [33, 44, 45])
354 # A recursive non-EOS x-connect, which resolves through another
357 route_34_neos = MplsRoute(self, 34, 0,
358 [RoutePath("0.0.0.0",
362 route_34_neos.add_vpp_config()
364 self.vapi.cli("clear trace")
365 tx = self.create_stream_labelled_ip4(self.pg0, [34, 99])
366 self.pg0.add_stream(tx)
368 self.pg_enable_capture(self.pg_interfaces)
371 rx = self.pg0.get_capture()
372 # it's the 2nd (counting from 0) label in the stack that is swapped
373 self.verify_capture_labelled(self.pg0, rx, tx, [33, 44, 46, 99], num=2)
376 # a recursive IP route that resolves through the recursive non-eos
379 ip_10_0_0_1 = IpRoute(self, "10.0.0.1", 32,
380 [RoutePath("0.0.0.0",
384 ip_10_0_0_1.add_vpp_config()
386 self.vapi.cli("clear trace")
387 tx = self.create_stream_ip4(self.pg0, "10.0.0.1")
388 self.pg0.add_stream(tx)
390 self.pg_enable_capture(self.pg_interfaces)
393 rx = self.pg0.get_capture()
394 self.verify_capture_labelled_ip4(self.pg0, rx, tx, [33, 44, 46, 55])
396 ip_10_0_0_1.remove_vpp_config()
397 route_34_neos.remove_vpp_config()
398 route_34_eos.remove_vpp_config()
399 route_33_neos.remove_vpp_config()
400 route_33_eos.remove_vpp_config()
401 route_32_neos.remove_vpp_config()
402 route_32_eos.remove_vpp_config()
405 """ MPLS Local Label Binding test """
408 # Add a non-recursive route with a single out label
410 route_10_0_0_1 = IpRoute(self, "10.0.0.1", 32,
411 [RoutePath(self.pg0.remote_ip4,
412 self.pg0.sw_if_index,
414 route_10_0_0_1.add_vpp_config()
416 # bind a local label to the route
417 binding = MplsIpBind(self, 44, "10.0.0.1", 32)
418 binding.add_vpp_config()
421 self.vapi.cli("clear trace")
422 tx = self.create_stream_labelled_ip4(self.pg0, [44, 99])
423 self.pg0.add_stream(tx)
425 self.pg_enable_capture(self.pg_interfaces)
428 rx = self.pg0.get_capture()
429 self.verify_capture_labelled(self.pg0, rx, tx, [45, 99])
432 self.vapi.cli("clear trace")
433 tx = self.create_stream_labelled_ip4(self.pg0, [44])
434 self.pg0.add_stream(tx)
436 self.pg_enable_capture(self.pg_interfaces)
439 rx = self.pg0.get_capture()
440 self.verify_capture_labelled(self.pg0, rx, tx, [45])
443 self.vapi.cli("clear trace")
444 tx = self.create_stream_ip4(self.pg0, "10.0.0.1")
445 self.pg0.add_stream(tx)
447 self.pg_enable_capture(self.pg_interfaces)
450 rx = self.pg0.get_capture()
451 self.verify_capture_labelled_ip4(self.pg0, rx, tx, [45])
456 binding.remove_vpp_config()
457 route_10_0_0_1.remove_vpp_config()
459 def test_imposition(self):
460 """ MPLS label imposition test """
463 # Add a non-recursive route with a single out label
465 route_10_0_0_1 = IpRoute(self, "10.0.0.1", 32,
466 [RoutePath(self.pg0.remote_ip4,
467 self.pg0.sw_if_index,
469 route_10_0_0_1.add_vpp_config()
472 # a stream that matches the route for 10.0.0.1
473 # PG0 is in the default table
475 self.vapi.cli("clear trace")
476 tx = self.create_stream_ip4(self.pg0, "10.0.0.1")
477 self.pg0.add_stream(tx)
479 self.pg_enable_capture(self.pg_interfaces)
482 rx = self.pg0.get_capture()
483 self.verify_capture_labelled_ip4(self.pg0, rx, tx, [32])
486 # Add a non-recursive route with 3 out labels
488 route_10_0_0_2 = IpRoute(self, "10.0.0.2", 32,
489 [RoutePath(self.pg0.remote_ip4,
490 self.pg0.sw_if_index,
491 labels=[32, 33, 34])])
492 route_10_0_0_2.add_vpp_config()
495 # a stream that matches the route for 10.0.0.2
496 # PG0 is in the default table
498 self.vapi.cli("clear trace")
499 tx = self.create_stream_ip4(self.pg0, "10.0.0.2")
500 self.pg0.add_stream(tx)
502 self.pg_enable_capture(self.pg_interfaces)
505 rx = self.pg0.get_capture()
506 self.verify_capture_labelled_ip4(self.pg0, rx, tx, [32, 33, 34])
509 # add a recursive path, with output label, via the 1 label route
511 route_11_0_0_1 = IpRoute(self, "11.0.0.1", 32,
512 [RoutePath("10.0.0.1",
515 route_11_0_0_1.add_vpp_config()
518 # a stream that matches the route for 11.0.0.1, should pick up
519 # the label stack for 11.0.0.1 and 10.0.0.1
521 self.vapi.cli("clear trace")
522 tx = self.create_stream_ip4(self.pg0, "11.0.0.1")
523 self.pg0.add_stream(tx)
525 self.pg_enable_capture(self.pg_interfaces)
528 rx = self.pg0.get_capture()
529 self.verify_capture_labelled_ip4(self.pg0, rx, tx, [32, 44])
532 # add a recursive path, with 2 labels, via the 3 label route
534 route_11_0_0_2 = IpRoute(self, "11.0.0.2", 32,
535 [RoutePath("10.0.0.2",
538 route_11_0_0_2.add_vpp_config()
541 # a stream that matches the route for 11.0.0.2, should pick up
542 # the label stack for 11.0.0.2 and 10.0.0.2
544 self.vapi.cli("clear trace")
545 tx = self.create_stream_ip4(self.pg0, "11.0.0.2")
546 self.pg0.add_stream(tx)
548 self.pg_enable_capture(self.pg_interfaces)
551 rx = self.pg0.get_capture()
552 self.verify_capture_labelled_ip4(
553 self.pg0, rx, tx, [32, 33, 34, 44, 45])
558 route_11_0_0_2.remove_vpp_config()
559 route_11_0_0_1.remove_vpp_config()
560 route_10_0_0_2.remove_vpp_config()
561 route_10_0_0_1.remove_vpp_config()
563 def test_tunnel(self):
564 """ MPLS Tunnel Tests """
567 # Create a tunnel with a single out label
569 nh_addr = socket.inet_pton(socket.AF_INET, self.pg0.remote_ip4)
571 reply = self.vapi.mpls_tunnel_add_del(
572 0xffffffff, # don't know the if index yet
575 self.pg0.sw_if_index,
576 0, # next-hop-table-id
581 self.vapi.sw_interface_set_flags(reply.sw_if_index, admin_up_down=1)
584 # add an unlabelled route through the new tunnel
586 dest_addr = socket.inet_pton(socket.AF_INET, "10.0.0.3")
587 nh_addr = socket.inet_pton(socket.AF_INET, "0.0.0.0")
590 self.vapi.ip_add_del_route(
593 nh_addr, # all zeros next-hop - tunnel is p2p
594 reply.sw_if_index, # sw_if_index of the new tunnel
596 0, # next-hop-table-id
602 self.vapi.cli("clear trace")
603 tx = self.create_stream_ip4(self.pg0, "10.0.0.3")
604 self.pg0.add_stream(tx)
606 self.pg_enable_capture(self.pg_interfaces)
609 rx = self.pg0.get_capture()
610 self.verify_capture_tunneled_ip4(self.pg0, rx, tx, [44, 46])
612 def test_v4_exp_null(self):
613 """ MPLS V4 Explicit NULL test """
616 # The first test case has an MPLS TTL of 0
617 # all packets should be dropped
619 tx = self.create_stream_labelled_ip4(self.pg0, [0], 0)
620 self.pg0.add_stream(tx)
622 self.pg_enable_capture(self.pg_interfaces)
625 rx = self.pg0.get_capture()
628 self.assertEqual(0, len(rx))
630 self.logger.error("MPLS TTL=0 packets forwarded")
631 self.logger.error(ppp("", rx))
635 # a stream with a non-zero MPLS TTL
636 # PG0 is in the default table
638 self.vapi.cli("clear trace")
639 tx = self.create_stream_labelled_ip4(self.pg0, [0])
640 self.pg0.add_stream(tx)
642 self.pg_enable_capture(self.pg_interfaces)
645 rx = self.pg0.get_capture()
646 self.verify_capture_ip4(self.pg0, rx, tx)
649 # a stream with a non-zero MPLS TTL
651 # we are ensuring the post-pop lookup occurs in the VRF table
653 self.vapi.cli("clear trace")
654 tx = self.create_stream_labelled_ip4(self.pg1, [0])
655 self.pg1.add_stream(tx)
657 self.pg_enable_capture(self.pg_interfaces)
660 rx = self.pg1.get_capture()
661 self.verify_capture_ip4(self.pg0, rx, tx)
663 def test_v6_exp_null(self):
664 """ MPLS V6 Explicit NULL test """
667 # a stream with a non-zero MPLS TTL
668 # PG0 is in the default table
670 self.vapi.cli("clear trace")
671 tx = self.create_stream_labelled_ip6(self.pg0, 2, 2)
672 self.pg0.add_stream(tx)
674 self.pg_enable_capture(self.pg_interfaces)
677 rx = self.pg0.get_capture()
678 self.verify_capture_ip6(self.pg0, rx, tx)
681 # a stream with a non-zero MPLS TTL
683 # we are ensuring the post-pop lookup occurs in the VRF table
685 self.vapi.cli("clear trace")
686 tx = self.create_stream_labelled_ip6(self.pg1, 2, 2)
687 self.pg1.add_stream(tx)
689 self.pg_enable_capture(self.pg_interfaces)
692 rx = self.pg1.get_capture()
693 self.verify_capture_ip6(self.pg0, rx, tx)
696 if __name__ == '__main__':
697 unittest.main(testRunner=VppTestRunner)