5 from socket import AF_INET6
7 from framework import VppTestCase, VppTestRunner
8 from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathProto, VppIpTable
10 SRv6LocalSIDBehaviors,
16 SRv6PolicySteeringTypes,
20 from scapy.packet import Raw
21 from scapy.layers.l2 import Ether, Dot1Q
22 from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
23 from scapy.layers.inet import IP, UDP
28 class TestSRv6(VppTestCase):
33 super(TestSRv6, cls).setUpClass()
36 def tearDownClass(cls):
37 super(TestSRv6, cls).tearDownClass()
40 """Perform test setup before each test case."""
41 super(TestSRv6, self).setUp()
43 # packet sizes, inclusive L2 overhead
44 self.pg_packet_sizes = [64, 512, 1518, 9018]
47 self.reset_packet_infos()
50 """Clean up test setup after each test case."""
51 self.teardown_interfaces()
53 super(TestSRv6, self).tearDown()
55 def configure_interface(
56 self, interface, ipv6=False, ipv4=False, ipv6_table_id=0, ipv4_table_id=0
58 """Configure interface.
59 :param ipv6: configure IPv6 on interface
60 :param ipv4: configure IPv4 on interface
61 :param ipv6_table_id: FIB table_id for IPv6
62 :param ipv4_table_id: FIB table_id for IPv4
64 self.logger.debug("Configuring interface %s" % (interface.name))
66 self.logger.debug("Configuring IPv6")
67 interface.set_table_ip6(ipv6_table_id)
68 interface.config_ip6()
69 interface.resolve_ndp(timeout=5)
71 self.logger.debug("Configuring IPv4")
72 interface.set_table_ip4(ipv4_table_id)
73 interface.config_ip4()
74 interface.resolve_arp()
77 def setup_interfaces(self, ipv6=[], ipv4=[], ipv6_table_id=[], ipv4_table_id=[]):
78 """Create and configure interfaces.
80 :param ipv6: list of interface IPv6 capabilities
81 :param ipv4: list of interface IPv4 capabilities
82 :param ipv6_table_id: list of intf IPv6 FIB table_ids
83 :param ipv4_table_id: list of intf IPv4 FIB table_ids
84 :returns: List of created interfaces.
86 # how many interfaces?
91 self.logger.debug("Creating and configuring %d interfaces" % (count))
93 # fill up ipv6 and ipv4 lists if needed
94 # not enabled (False) is the default
96 ipv6 += (count - len(ipv6)) * [False]
98 ipv4 += (count - len(ipv4)) * [False]
100 # fill up table_id lists if needed
101 # table_id 0 (global) is the default
102 if len(ipv6_table_id) < count:
103 ipv6_table_id += (count - len(ipv6_table_id)) * [0]
104 if len(ipv4_table_id) < count:
105 ipv4_table_id += (count - len(ipv4_table_id)) * [0]
107 # create 'count' pg interfaces
108 self.create_pg_interfaces(range(count))
110 # setup all interfaces
111 for i in range(count):
112 intf = self.pg_interfaces[i]
113 self.configure_interface(
114 intf, ipv6[i], ipv4[i], ipv6_table_id[i], ipv4_table_id[i]
118 self.logger.debug(self.vapi.cli("show ip6 neighbors"))
120 self.logger.debug(self.vapi.cli("show ip4 neighbors"))
121 self.logger.debug(self.vapi.cli("show interface"))
122 self.logger.debug(self.vapi.cli("show hardware"))
124 return self.pg_interfaces
126 def teardown_interfaces(self):
127 """Unconfigure and bring down interface."""
128 self.logger.debug("Tearing down interfaces")
129 # tear down all interfaces
130 # AFAIK they cannot be deleted
131 for i in self.pg_interfaces:
132 self.logger.debug("Tear down interface %s" % (i.name))
138 @unittest.skipUnless(0, "PC to fix")
139 def test_SRv6_T_Encaps(self):
140 """Test SRv6 Transit.Encaps behavior for IPv6."""
141 # send traffic to one destination interface
142 # source and destination are IPv6 only
143 self.setup_interfaces(ipv6=[True, True])
145 # configure FIB entries
147 self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
149 route.add_vpp_config()
151 # configure encaps IPv6 source address
152 # needs to be done before SR Policy config
154 self.vapi.cli("set sr encaps source addr a3::")
157 # configure SRv6 Policy
158 # Note: segment list order: first -> last
159 sr_policy = VppSRv6Policy(
163 sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
166 segments=["a4::", "a5::", "a6::c7"],
169 sr_policy.add_vpp_config()
170 self.sr_policy = sr_policy
172 # log the sr policies
173 self.logger.info(self.vapi.cli("show sr policies"))
175 # steer IPv6 traffic to a7::/64 into SRv6 Policy
176 # use the bsid of the above self.sr_policy
177 pol_steering = VppSRv6Steering(
179 bsid=self.sr_policy.bsid,
182 traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
187 pol_steering.add_vpp_config()
189 # log the sr steering policies
190 self.logger.info(self.vapi.cli("show sr steering policies"))
193 count = len(self.pg_packet_sizes)
194 dst_inner = "a7::1234"
197 # create IPv6 packets without SRH
198 packet_header = self.create_packet_header_IPv6(dst_inner)
199 # create traffic stream pg0->pg1
202 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
206 # create IPv6 packets with SRH
207 # packets with segments-left 1, active segment a7::
208 packet_header = self.create_packet_header_IPv6_SRH(
209 sidlist=["a8::", "a7::", "a6::"], segleft=1
211 # create traffic stream pg0->pg1
214 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
218 # create IPv6 packets with SRH and IPv6
219 # packets with segments-left 1, active segment a7::
220 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
221 dst_inner, sidlist=["a8::", "a7::", "a6::"], segleft=1
223 # create traffic stream pg0->pg1
226 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
230 # send packets and verify received packets
231 self.send_and_verify_pkts(
232 self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_T_Encaps
235 # log the localsid counters
236 self.logger.info(self.vapi.cli("show sr localsid"))
239 pol_steering.remove_vpp_config()
240 self.logger.info(self.vapi.cli("show sr steering policies"))
243 self.sr_policy.remove_vpp_config()
244 self.logger.info(self.vapi.cli("show sr policies"))
250 self.teardown_interfaces()
252 def test_SRv6_T_Encaps_with_v6src(self):
253 """Test SRv6 Transit.Encaps behavior for IPv6 and select multiple src v6addr case."""
254 # send traffic to one destination interface
255 # source and destination are IPv6 only
256 self.setup_interfaces(ipv6=[True, True])
258 # configure FIB entries
260 self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
262 route.add_vpp_config()
264 # configure encaps IPv6 source address
265 # needs to be done before SR Policy config
267 self.vapi.cli("set sr encaps source addr a3::")
270 other_src_ip = "b1::"
271 # configure SRv6 Policy
272 # Note: segment list order: first -> last
273 sr_policy = VppSRv6PolicyV2(
277 sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
280 segments=["a4::", "a5::", "a6::c7"],
281 encap_src=other_src_ip,
284 sr_policy.add_vpp_config()
285 self.sr_policy = sr_policy
287 # log the sr policies
288 self.logger.info(self.vapi.cli("show sr policies"))
290 # steer IPv6 traffic to a7::/64 into SRv6 Policy
291 # use the bsid of the above self.sr_policy
292 pol_steering = VppSRv6Steering(
294 bsid=self.sr_policy.bsid,
297 traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
302 pol_steering.add_vpp_config()
304 # log the sr steering policies
305 self.logger.info(self.vapi.cli("show sr steering-policies"))
308 count = len(self.pg_packet_sizes)
309 dst_inner = "a7::1234"
312 # create IPv6 packets without SRH
313 packet_header = self.create_packet_header_IPv6(dst_inner)
314 # create traffic stream pg0->pg1
317 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
321 # create IPv6 packets with SRH
322 # packets with segments-left 1, active segment a7::
323 packet_header = self.create_packet_header_IPv6_SRH(
324 sidlist=["a8::", "a7::", "a6::"], segleft=1
326 # create traffic stream pg0->pg1
329 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
333 # create IPv6 packets with SRH and IPv6
334 # packets with segments-left 1, active segment a7::
335 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
336 dst_inner, sidlist=["a8::", "a7::", "a6::"], segleft=1
338 # create traffic stream pg0->pg1
341 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
345 # send packets and verify received packets
346 self.send_and_verify_pkts(
347 self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_T_Encaps
350 # log the localsid counters
351 self.logger.info(self.vapi.cli("show sr localsid"))
354 pol_steering.remove_vpp_config()
355 self.logger.info(self.vapi.cli("show sr steering-policies"))
358 self.sr_policy.remove_vpp_config()
359 self.logger.info(self.vapi.cli("show sr policies"))
365 self.teardown_interfaces()
367 @unittest.skipUnless(0, "PC to fix")
368 def test_SRv6_T_Insert(self):
369 """Test SRv6 Transit.Insert behavior (IPv6 only)."""
370 # send traffic to one destination interface
371 # source and destination are IPv6 only
372 self.setup_interfaces(ipv6=[True, True])
374 # configure FIB entries
376 self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
378 route.add_vpp_config()
380 # configure encaps IPv6 source address
381 # needs to be done before SR Policy config
383 self.vapi.cli("set sr encaps source addr a3::")
386 # configure SRv6 Policy
387 # Note: segment list order: first -> last
388 sr_policy = VppSRv6Policy(
392 sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
395 segments=["a4::", "a5::", "a6::c7"],
398 sr_policy.add_vpp_config()
399 self.sr_policy = sr_policy
401 # log the sr policies
402 self.logger.info(self.vapi.cli("show sr policies"))
404 # steer IPv6 traffic to a7::/64 into SRv6 Policy
405 # use the bsid of the above self.sr_policy
406 pol_steering = VppSRv6Steering(
408 bsid=self.sr_policy.bsid,
411 traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
416 pol_steering.add_vpp_config()
418 # log the sr steering policies
419 self.logger.info(self.vapi.cli("show sr steering policies"))
422 count = len(self.pg_packet_sizes)
423 dst_inner = "a7::1234"
426 # create IPv6 packets without SRH
427 packet_header = self.create_packet_header_IPv6(dst_inner)
428 # create traffic stream pg0->pg1
431 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
435 # create IPv6 packets with SRH
436 # packets with segments-left 1, active segment a7::
437 packet_header = self.create_packet_header_IPv6_SRH(
438 sidlist=["a8::", "a7::", "a6::"], segleft=1
440 # create traffic stream pg0->pg1
443 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
447 # send packets and verify received packets
448 self.send_and_verify_pkts(
449 self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_T_Insert
452 # log the localsid counters
453 self.logger.info(self.vapi.cli("show sr localsid"))
456 pol_steering.remove_vpp_config()
457 self.logger.info(self.vapi.cli("show sr steering policies"))
460 self.sr_policy.remove_vpp_config()
461 self.logger.info(self.vapi.cli("show sr policies"))
467 self.teardown_interfaces()
469 @unittest.skipUnless(0, "PC to fix")
470 def test_SRv6_T_Encaps_IPv4(self):
471 """Test SRv6 Transit.Encaps behavior for IPv4."""
472 # send traffic to one destination interface
473 # source interface is IPv4 only
474 # destination interface is IPv6 only
475 self.setup_interfaces(ipv6=[False, True], ipv4=[True, False])
477 # configure FIB entries
479 self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
481 route.add_vpp_config()
483 # configure encaps IPv6 source address
484 # needs to be done before SR Policy config
486 self.vapi.cli("set sr encaps source addr a3::")
489 # configure SRv6 Policy
490 # Note: segment list order: first -> last
491 sr_policy = VppSRv6Policy(
495 sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
498 segments=["a4::", "a5::", "a6::c7"],
501 sr_policy.add_vpp_config()
502 self.sr_policy = sr_policy
504 # log the sr policies
505 self.logger.info(self.vapi.cli("show sr policies"))
507 # steer IPv4 traffic to 7.1.1.0/24 into SRv6 Policy
508 # use the bsid of the above self.sr_policy
509 pol_steering = VppSRv6Steering(
511 bsid=self.sr_policy.bsid,
514 traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV4,
519 pol_steering.add_vpp_config()
521 # log the sr steering policies
522 self.logger.info(self.vapi.cli("show sr steering policies"))
525 count = len(self.pg_packet_sizes)
526 dst_inner = "7.1.1.123"
529 # create IPv4 packets
530 packet_header = self.create_packet_header_IPv4(dst_inner)
531 # create traffic stream pg0->pg1
534 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
538 # send packets and verify received packets
539 self.send_and_verify_pkts(
540 self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_T_Encaps_IPv4
543 # log the localsid counters
544 self.logger.info(self.vapi.cli("show sr localsid"))
547 pol_steering.remove_vpp_config()
548 self.logger.info(self.vapi.cli("show sr steering policies"))
551 self.sr_policy.remove_vpp_config()
552 self.logger.info(self.vapi.cli("show sr policies"))
558 self.teardown_interfaces()
560 @unittest.skip("VPP crashes after running this test")
561 def test_SRv6_T_Encaps_L2(self):
562 """Test SRv6 Transit.Encaps behavior for L2."""
563 # send traffic to one destination interface
564 # source interface is IPv4 only TODO?
565 # destination interface is IPv6 only
566 self.setup_interfaces(ipv6=[False, True], ipv4=[False, False])
568 # configure FIB entries
570 self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
572 route.add_vpp_config()
574 # configure encaps IPv6 source address
575 # needs to be done before SR Policy config
577 self.vapi.cli("set sr encaps source addr a3::")
580 # configure SRv6 Policy
581 # Note: segment list order: first -> last
582 sr_policy = VppSRv6Policy(
586 sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
589 segments=["a4::", "a5::", "a6::c7"],
592 sr_policy.add_vpp_config()
593 self.sr_policy = sr_policy
595 # log the sr policies
596 self.logger.info(self.vapi.cli("show sr policies"))
598 # steer L2 traffic into SRv6 Policy
599 # use the bsid of the above self.sr_policy
600 pol_steering = VppSRv6Steering(
602 bsid=self.sr_policy.bsid,
605 traffic_type=SRv6PolicySteeringTypes.SR_STEER_L2,
608 sw_if_index=self.pg0.sw_if_index,
610 pol_steering.add_vpp_config()
612 # log the sr steering policies
613 self.logger.info(self.vapi.cli("show sr steering policies"))
616 count = len(self.pg_packet_sizes)
619 # create L2 packets without dot1q header
620 packet_header = self.create_packet_header_L2()
621 # create traffic stream pg0->pg1
624 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
628 # create L2 packets with dot1q header
629 packet_header = self.create_packet_header_L2(vlan=123)
630 # create traffic stream pg0->pg1
633 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
637 # send packets and verify received packets
638 self.send_and_verify_pkts(
639 self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_T_Encaps_L2
642 # log the localsid counters
643 self.logger.info(self.vapi.cli("show sr localsid"))
646 pol_steering.remove_vpp_config()
647 self.logger.info(self.vapi.cli("show sr steering policies"))
650 self.sr_policy.remove_vpp_config()
651 self.logger.info(self.vapi.cli("show sr policies"))
657 self.teardown_interfaces()
659 def test_SRv6_End(self):
660 """Test SRv6 End (without PSP) behavior."""
661 # send traffic to one destination interface
662 # source and destination interfaces are IPv6 only
663 self.setup_interfaces(ipv6=[True, True])
665 # configure FIB entries
667 self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
669 route.add_vpp_config()
671 # configure SRv6 localSID End without PSP behavior
672 localsid = VppSRv6LocalSID(
675 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
682 localsid.add_vpp_config()
684 self.logger.debug(self.vapi.cli("show sr localsid"))
686 # create IPv6 packets with SRH (SL=2, SL=1, SL=0)
687 # send one packet per SL value per packet size
688 # SL=0 packet with localSID End with USP needs 2nd SRH
689 count = len(self.pg_packet_sizes)
690 dst_inner = "a4::1234"
693 # packets with segments-left 2, active segment a3::
694 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
695 dst_inner, sidlist=["a5::", "a4::", "a3::"], segleft=2
697 # create traffic stream pg0->pg1
700 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
704 # packets with segments-left 1, active segment a3::
705 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
706 dst_inner, sidlist=["a4::", "a3::", "a2::"], segleft=1
708 # add to traffic stream pg0->pg1
711 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
715 # TODO: test behavior with SL=0 packet (needs 2*SRH?)
717 expected_count = len(pkts)
719 # packets without SRH (should not crash)
720 packet_header = self.create_packet_header_IPv6("a3::")
721 # create traffic stream pg0->pg1
724 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
728 # send packets and verify received packets
729 self.send_and_verify_pkts(
733 self.compare_rx_tx_packet_End,
734 expected_count=expected_count,
737 # log the localsid counters
738 self.logger.info(self.vapi.cli("show sr localsid"))
740 # remove SRv6 localSIDs
741 localsid.remove_vpp_config()
747 self.teardown_interfaces()
749 def test_SRv6_End_with_PSP(self):
750 """Test SRv6 End with PSP behavior."""
751 # send traffic to one destination interface
752 # source and destination interfaces are IPv6 only
753 self.setup_interfaces(ipv6=[True, True])
755 # configure FIB entries
757 self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
759 route.add_vpp_config()
761 # configure SRv6 localSID End with PSP behavior
762 localsid = VppSRv6LocalSID(
765 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
772 localsid.add_vpp_config()
774 self.logger.debug(self.vapi.cli("show sr localsid"))
776 # create IPv6 packets with SRH (SL=2, SL=1)
777 # send one packet per SL value per packet size
778 # SL=0 packet with localSID End with PSP is dropped
779 count = len(self.pg_packet_sizes)
780 dst_inner = "a4::1234"
783 # packets with segments-left 2, active segment a3::
784 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
785 dst_inner, sidlist=["a5::", "a4::", "a3::"], segleft=2
787 # create traffic stream pg0->pg1
790 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
794 # packets with segments-left 1, active segment a3::
795 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
796 dst_inner, sidlist=["a4::", "a3::", "a2::"], segleft=1
798 # add to traffic stream pg0->pg1
801 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
805 # send packets and verify received packets
806 self.send_and_verify_pkts(
807 self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_End_PSP
810 # log the localsid counters
811 self.logger.info(self.vapi.cli("show sr localsid"))
813 # remove SRv6 localSIDs
814 localsid.remove_vpp_config()
820 self.teardown_interfaces()
822 def test_SRv6_End_X(self):
823 """Test SRv6 End.X (without PSP) behavior."""
824 # create three interfaces (1 source, 2 destinations)
825 # source and destination interfaces are IPv6 only
826 self.setup_interfaces(ipv6=[True, True, True])
828 # configure FIB entries
829 # a4::/64 via pg1 and pg2
835 VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index),
836 VppRoutePath(self.pg2.remote_ip6, self.pg2.sw_if_index),
839 route.add_vpp_config()
840 self.logger.debug(self.vapi.cli("show ip6 fib"))
842 # configure SRv6 localSID End.X without PSP behavior
843 # End.X points to interface pg1
844 localsid = VppSRv6LocalSID(
847 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
848 nh_addr=self.pg1.remote_ip6,
850 sw_if_index=self.pg1.sw_if_index,
854 localsid.add_vpp_config()
856 self.logger.debug(self.vapi.cli("show sr localsid"))
858 # create IPv6 packets with SRH (SL=2, SL=1)
859 # send one packet per SL value per packet size
860 # SL=0 packet with localSID End with PSP is dropped
861 count = len(self.pg_packet_sizes)
862 dst_inner = "a4::1234"
865 # packets with segments-left 2, active segment a3::c4
866 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
867 dst_inner, sidlist=["a5::", "a4::", "a3::c4"], segleft=2
869 # create traffic stream pg0->pg1
872 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
876 # packets with segments-left 1, active segment a3::c4
877 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
878 dst_inner, sidlist=["a4::", "a3::c4", "a2::"], segleft=1
880 # add to traffic stream pg0->pg1
883 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
887 # send packets and verify received packets
888 # using same comparison function as End (no PSP)
889 self.send_and_verify_pkts(
890 self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_End
893 # assert nothing was received on the other interface (pg2)
894 self.pg2.assert_nothing_captured(remark="mis-directed packet(s)")
896 # log the localsid counters
897 self.logger.info(self.vapi.cli("show sr localsid"))
899 # remove SRv6 localSIDs
900 localsid.remove_vpp_config()
906 self.teardown_interfaces()
908 def test_SRv6_End_X_with_PSP(self):
909 """Test SRv6 End.X with PSP behavior."""
910 # create three interfaces (1 source, 2 destinations)
911 # source and destination interfaces are IPv6 only
912 self.setup_interfaces(ipv6=[True, True, True])
914 # configure FIB entries
915 # a4::/64 via pg1 and pg2
921 VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index),
922 VppRoutePath(self.pg2.remote_ip6, self.pg2.sw_if_index),
925 route.add_vpp_config()
927 # configure SRv6 localSID End with PSP behavior
928 localsid = VppSRv6LocalSID(
931 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
932 nh_addr=self.pg1.remote_ip6,
934 sw_if_index=self.pg1.sw_if_index,
938 localsid.add_vpp_config()
940 self.logger.debug(self.vapi.cli("show sr localsid"))
942 # create IPv6 packets with SRH (SL=2, SL=1)
943 # send one packet per SL value per packet size
944 # SL=0 packet with localSID End with PSP is dropped
945 count = len(self.pg_packet_sizes)
946 dst_inner = "a4::1234"
949 # packets with segments-left 2, active segment a3::
950 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
951 dst_inner, sidlist=["a5::", "a4::", "a3::c4"], segleft=2
953 # create traffic stream pg0->pg1
956 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
960 # packets with segments-left 1, active segment a3::
961 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
962 dst_inner, sidlist=["a4::", "a3::c4", "a2::"], segleft=1
964 # add to traffic stream pg0->pg1
967 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
971 # send packets and verify received packets
972 # using same comparison function as End with PSP
973 self.send_and_verify_pkts(
974 self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_End_PSP
977 # assert nothing was received on the other interface (pg2)
978 self.pg2.assert_nothing_captured(remark="mis-directed packet(s)")
980 # log the localsid counters
981 self.logger.info(self.vapi.cli("show sr localsid"))
983 # remove SRv6 localSIDs
984 localsid.remove_vpp_config()
990 self.teardown_interfaces()
992 def test_SRv6_End_DX6(self):
993 """Test SRv6 End.DX6 behavior."""
994 # send traffic to one destination interface
995 # source and destination interfaces are IPv6 only
996 self.setup_interfaces(ipv6=[True, True])
998 # configure SRv6 localSID End.DX6 behavior
999 localsid = VppSRv6LocalSID(
1002 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX6,
1003 nh_addr=self.pg1.remote_ip6,
1005 sw_if_index=self.pg1.sw_if_index,
1009 localsid.add_vpp_config()
1011 self.logger.debug(self.vapi.cli("show sr localsid"))
1013 # create IPv6 packets with SRH (SL=0)
1014 # send one packet per packet size
1015 count = len(self.pg_packet_sizes)
1016 dst_inner = "a4::1234" # inner header destination address
1019 # packets with SRH, segments-left 0, active segment a3::c4
1020 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
1021 dst_inner, sidlist=["a3::c4", "a2::", "a1::"], segleft=0
1023 # add to traffic stream pg0->pg1
1026 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1030 # packets without SRH, IPv6 in IPv6
1031 # outer IPv6 dest addr is the localsid End.DX6
1032 packet_header = self.create_packet_header_IPv6_IPv6(
1033 dst_inner, dst_outer="a3::c4"
1035 # add to traffic stream pg0->pg1
1038 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1042 # send packets and verify received packets
1043 self.send_and_verify_pkts(
1044 self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_End_DX6
1047 # log the localsid counters
1048 self.logger.info(self.vapi.cli("show sr localsid"))
1050 # remove SRv6 localSIDs
1051 localsid.remove_vpp_config()
1053 # cleanup interfaces
1054 self.teardown_interfaces()
1056 def test_SRv6_End_DT6(self):
1057 """Test SRv6 End.DT6 behavior."""
1058 # create three interfaces (1 source, 2 destinations)
1059 # all interfaces are IPv6 only
1060 # source interface in global FIB (0)
1061 # destination interfaces in global and vrf
1063 ipt = VppIpTable(self, vrf_1, is_ip6=True)
1064 ipt.add_vpp_config()
1065 self.setup_interfaces(ipv6=[True, True, True], ipv6_table_id=[0, 0, vrf_1])
1067 # configure FIB entries
1068 # a4::/64 is reachable
1069 # via pg1 in table 0 (global)
1070 # and via pg2 in table vrf_1
1071 route0 = VppIpRoute(
1075 [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index, nh_table_id=0)],
1078 route0.add_vpp_config()
1079 route1 = VppIpRoute(
1085 self.pg2.remote_ip6, self.pg2.sw_if_index, nh_table_id=vrf_1
1090 route1.add_vpp_config()
1091 self.logger.debug(self.vapi.cli("show ip6 fib"))
1093 # configure SRv6 localSID End.DT6 behavior
1095 # fib_table: where the localsid is installed
1096 # sw_if_index: in T-variants of localsid this is the vrf table_id
1097 localsid = VppSRv6LocalSID(
1100 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT6,
1107 localsid.add_vpp_config()
1109 self.logger.debug(self.vapi.cli("show sr localsid"))
1111 # create IPv6 packets with SRH (SL=0)
1112 # send one packet per packet size
1113 count = len(self.pg_packet_sizes)
1114 dst_inner = "a4::1234" # inner header destination address
1117 # packets with SRH, segments-left 0, active segment a3::c4
1118 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
1119 dst_inner, sidlist=["a3::c4", "a2::", "a1::"], segleft=0
1121 # add to traffic stream pg0->pg1
1124 self.pg0, self.pg2, packet_header, self.pg_packet_sizes, count
1128 # packets without SRH, IPv6 in IPv6
1129 # outer IPv6 dest addr is the localsid End.DT6
1130 packet_header = self.create_packet_header_IPv6_IPv6(
1131 dst_inner, dst_outer="a3::c4"
1133 # add to traffic stream pg0->pg1
1136 self.pg0, self.pg2, packet_header, self.pg_packet_sizes, count
1140 # send packets and verify received packets
1141 # using same comparison function as End.DX6
1142 self.send_and_verify_pkts(
1143 self.pg0, pkts, self.pg2, self.compare_rx_tx_packet_End_DX6
1146 # assert nothing was received on the other interface (pg2)
1147 self.pg1.assert_nothing_captured(remark="mis-directed packet(s)")
1149 # log the localsid counters
1150 self.logger.info(self.vapi.cli("show sr localsid"))
1152 # remove SRv6 localSIDs
1153 localsid.remove_vpp_config()
1155 # remove FIB entries
1158 # cleanup interfaces
1159 self.teardown_interfaces()
1161 def test_SRv6_End_DX4(self):
1162 """Test SRv6 End.DX4 behavior."""
1163 # send traffic to one destination interface
1164 # source interface is IPv6 only
1165 # destination interface is IPv4 only
1166 self.setup_interfaces(ipv6=[True, False], ipv4=[False, True])
1168 # configure SRv6 localSID End.DX4 behavior
1169 localsid = VppSRv6LocalSID(
1172 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX4,
1173 nh_addr=self.pg1.remote_ip4,
1175 sw_if_index=self.pg1.sw_if_index,
1179 localsid.add_vpp_config()
1181 self.logger.debug(self.vapi.cli("show sr localsid"))
1183 # send one packet per packet size
1184 count = len(self.pg_packet_sizes)
1185 dst_inner = "4.1.1.123" # inner header destination address
1188 # packets with SRH, segments-left 0, active segment a3::c4
1189 packet_header = self.create_packet_header_IPv6_SRH_IPv4(
1190 dst_inner, sidlist=["a3::c4", "a2::", "a1::"], segleft=0
1192 # add to traffic stream pg0->pg1
1195 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1199 # packets without SRH, IPv4 in IPv6
1200 # outer IPv6 dest addr is the localsid End.DX4
1201 packet_header = self.create_packet_header_IPv6_IPv4(
1202 dst_inner, dst_outer="a3::c4"
1204 # add to traffic stream pg0->pg1
1207 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1211 # send packets and verify received packets
1212 self.send_and_verify_pkts(
1213 self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_End_DX4
1216 # log the localsid counters
1217 self.logger.info(self.vapi.cli("show sr localsid"))
1219 # remove SRv6 localSIDs
1220 localsid.remove_vpp_config()
1222 # cleanup interfaces
1223 self.teardown_interfaces()
1225 def test_SRv6_End_DT4(self):
1226 """Test SRv6 End.DT4 behavior."""
1227 # create three interfaces (1 source, 2 destinations)
1228 # source interface is IPv6-only
1229 # destination interfaces are IPv4 only
1230 # source interface in global FIB (0)
1231 # destination interfaces in global and vrf
1233 ipt = VppIpTable(self, vrf_1)
1234 ipt.add_vpp_config()
1235 self.setup_interfaces(
1236 ipv6=[True, False, False],
1237 ipv4=[False, True, True],
1238 ipv6_table_id=[0, 0, 0],
1239 ipv4_table_id=[0, 0, vrf_1],
1242 # configure FIB entries
1243 # 4.1.1.0/24 is reachable
1244 # via pg1 in table 0 (global)
1245 # and via pg2 in table vrf_1
1246 route0 = VppIpRoute(
1250 [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index, nh_table_id=0)],
1253 route0.add_vpp_config()
1254 route1 = VppIpRoute(
1260 self.pg2.remote_ip4, self.pg2.sw_if_index, nh_table_id=vrf_1
1265 route1.add_vpp_config()
1266 self.logger.debug(self.vapi.cli("show ip fib"))
1268 # configure SRv6 localSID End.DT6 behavior
1270 # fib_table: where the localsid is installed
1271 # sw_if_index: in T-variants of localsid: vrf table_id
1272 localsid = VppSRv6LocalSID(
1275 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT4,
1282 localsid.add_vpp_config()
1284 self.logger.debug(self.vapi.cli("show sr localsid"))
1286 # create IPv6 packets with SRH (SL=0)
1287 # send one packet per packet size
1288 count = len(self.pg_packet_sizes)
1289 dst_inner = "4.1.1.123" # inner header destination address
1292 # packets with SRH, segments-left 0, active segment a3::c4
1293 packet_header = self.create_packet_header_IPv6_SRH_IPv4(
1294 dst_inner, sidlist=["a3::c4", "a2::", "a1::"], segleft=0
1296 # add to traffic stream pg0->pg1
1299 self.pg0, self.pg2, packet_header, self.pg_packet_sizes, count
1303 # packets without SRH, IPv6 in IPv6
1304 # outer IPv6 dest addr is the localsid End.DX4
1305 packet_header = self.create_packet_header_IPv6_IPv4(
1306 dst_inner, dst_outer="a3::c4"
1308 # add to traffic stream pg0->pg1
1311 self.pg0, self.pg2, packet_header, self.pg_packet_sizes, count
1315 # send packets and verify received packets
1316 # using same comparison function as End.DX4
1317 self.send_and_verify_pkts(
1318 self.pg0, pkts, self.pg2, self.compare_rx_tx_packet_End_DX4
1321 # assert nothing was received on the other interface (pg2)
1322 self.pg1.assert_nothing_captured(remark="mis-directed packet(s)")
1324 # log the localsid counters
1325 self.logger.info(self.vapi.cli("show sr localsid"))
1327 # remove SRv6 localSIDs
1328 localsid.remove_vpp_config()
1330 # remove FIB entries
1333 # cleanup interfaces
1334 self.teardown_interfaces()
1336 def test_SRv6_End_DX2(self):
1337 """Test SRv6 End.DX2 behavior."""
1338 # send traffic to one destination interface
1339 # source interface is IPv6 only
1340 self.setup_interfaces(ipv6=[True, False], ipv4=[False, False])
1342 # configure SRv6 localSID End.DX2 behavior
1343 localsid = VppSRv6LocalSID(
1346 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX2,
1349 sw_if_index=self.pg1.sw_if_index,
1353 localsid.add_vpp_config()
1355 self.logger.debug(self.vapi.cli("show sr localsid"))
1357 # send one packet per packet size
1358 count = len(self.pg_packet_sizes)
1361 # packets with SRH, segments-left 0, active segment a3::c4
1362 # L2 has no dot1q header
1363 packet_header = self.create_packet_header_IPv6_SRH_L2(
1364 sidlist=["a3::c4", "a2::", "a1::"], segleft=0, vlan=0
1366 # add to traffic stream pg0->pg1
1369 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1373 # packets with SRH, segments-left 0, active segment a3::c4
1374 # L2 has dot1q header
1375 packet_header = self.create_packet_header_IPv6_SRH_L2(
1376 sidlist=["a3::c4", "a2::", "a1::"], segleft=0, vlan=123
1378 # add to traffic stream pg0->pg1
1381 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1385 # packets without SRH, L2 in IPv6
1386 # outer IPv6 dest addr is the localsid End.DX2
1387 # L2 has no dot1q header
1388 packet_header = self.create_packet_header_IPv6_L2(dst_outer="a3::c4", vlan=0)
1389 # add to traffic stream pg0->pg1
1392 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1396 # packets without SRH, L2 in IPv6
1397 # outer IPv6 dest addr is the localsid End.DX2
1398 # L2 has dot1q header
1399 packet_header = self.create_packet_header_IPv6_L2(dst_outer="a3::c4", vlan=123)
1400 # add to traffic stream pg0->pg1
1403 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1407 # send packets and verify received packets
1408 self.send_and_verify_pkts(
1409 self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_End_DX2
1412 # log the localsid counters
1413 self.logger.info(self.vapi.cli("show sr localsid"))
1415 # remove SRv6 localSIDs
1416 localsid.remove_vpp_config()
1418 # cleanup interfaces
1419 self.teardown_interfaces()
1421 @unittest.skipUnless(0, "PC to fix")
1422 def test_SRv6_T_Insert_Classifier(self):
1423 """Test SRv6 Transit.Insert behavior (IPv6 only).
1424 steer packets using the classifier
1426 # send traffic to one destination interface
1427 # source and destination are IPv6 only
1428 self.setup_interfaces(ipv6=[False, False, False, True, True])
1430 # configure FIB entries
1432 self, "a4::", 64, [VppRoutePath(self.pg4.remote_ip6, self.pg4.sw_if_index)]
1434 route.add_vpp_config()
1436 # configure encaps IPv6 source address
1437 # needs to be done before SR Policy config
1439 self.vapi.cli("set sr encaps source addr a3::")
1442 # configure SRv6 Policy
1443 # Note: segment list order: first -> last
1444 sr_policy = VppSRv6Policy(
1448 sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
1451 segments=["a4::", "a5::", "a6::c7"],
1454 sr_policy.add_vpp_config()
1455 self.sr_policy = sr_policy
1457 # log the sr policies
1458 self.logger.info(self.vapi.cli("show sr policies"))
1460 # add classify table
1461 # mask on dst ip address prefix a7::/8
1462 mask = "{!s:0<16}".format("ff")
1463 r = self.vapi.classify_add_del_table(
1465 binascii.unhexlify(mask),
1466 match_n_vectors=(len(mask) - 1) // 32 + 1,
1467 current_data_flag=1,
1470 self.assertIsNotNone(r, "No response msg for add_del_table")
1471 table_index = r.new_table_index
1473 # add the source routing node as a ip6 inacl netxt node
1474 r = self.vapi.add_node_next("ip6-inacl", "sr-pl-rewrite-insert")
1475 inacl_next_node_index = r.node_index
1477 match = "{!s:0<16}".format("a7")
1478 r = self.vapi.classify_add_del_session(
1481 binascii.unhexlify(match),
1482 hit_next_index=inacl_next_node_index,
1486 self.assertIsNotNone(r, "No response msg for add_del_session")
1488 # log the classify table used in the steering policy
1489 self.logger.info(self.vapi.cli("show classify table"))
1491 r = self.vapi.input_acl_set_interface(
1492 is_add=1, sw_if_index=self.pg3.sw_if_index, ip6_table_index=table_index
1494 self.assertIsNotNone(r, "No response msg for input_acl_set_interface")
1497 self.logger.info(self.vapi.cli("show inacl type ip6"))
1500 count = len(self.pg_packet_sizes)
1501 dst_inner = "a7::1234"
1504 # create IPv6 packets without SRH
1505 packet_header = self.create_packet_header_IPv6(dst_inner)
1506 # create traffic stream pg3->pg4
1509 self.pg3, self.pg4, packet_header, self.pg_packet_sizes, count
1513 # create IPv6 packets with SRH
1514 # packets with segments-left 1, active segment a7::
1515 packet_header = self.create_packet_header_IPv6_SRH(
1516 sidlist=["a8::", "a7::", "a6::"], segleft=1
1518 # create traffic stream pg3->pg4
1521 self.pg3, self.pg4, packet_header, self.pg_packet_sizes, count
1525 # send packets and verify received packets
1526 self.send_and_verify_pkts(
1527 self.pg3, pkts, self.pg4, self.compare_rx_tx_packet_T_Insert
1530 # remove the interface l2 input feature
1531 r = self.vapi.input_acl_set_interface(
1532 is_add=0, sw_if_index=self.pg3.sw_if_index, ip6_table_index=table_index
1534 self.assertIsNotNone(r, "No response msg for input_acl_set_interface")
1536 # log the ip6 inacl after cleaning
1537 self.logger.info(self.vapi.cli("show inacl type ip6"))
1539 # log the localsid counters
1540 self.logger.info(self.vapi.cli("show sr localsid"))
1542 # remove classifier SR steering
1543 # classifier_steering.remove_vpp_config()
1544 self.logger.info(self.vapi.cli("show sr steering policies"))
1546 # remove SR Policies
1547 self.sr_policy.remove_vpp_config()
1548 self.logger.info(self.vapi.cli("show sr policies"))
1550 # remove classify session and table
1551 r = self.vapi.classify_add_del_session(
1552 0, table_index, binascii.unhexlify(match)
1554 self.assertIsNotNone(r, "No response msg for add_del_session")
1556 r = self.vapi.classify_add_del_table(
1557 0, binascii.unhexlify(mask), table_index=table_index
1559 self.assertIsNotNone(r, "No response msg for add_del_table")
1561 self.logger.info(self.vapi.cli("show classify table"))
1563 # remove FIB entries
1566 # cleanup interfaces
1567 self.teardown_interfaces()
1569 def compare_rx_tx_packet_T_Encaps(self, tx_pkt, rx_pkt):
1570 """Compare input and output packet after passing T.Encaps
1572 :param tx_pkt: transmitted packet
1573 :param rx_pkt: received packet
1575 # T.Encaps updates the headers as follows:
1576 # SR Policy seglist (S3, S2, S1)
1577 # SR Policy source C
1580 # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(A, B2)
1582 # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1)
1583 # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(a, B2)SRH(B3, B2, B1; SL=1)
1585 # get first (outer) IPv6 header of rx'ed packet
1586 rx_ip = rx_pkt.getlayer(IPv6)
1589 tx_ip = tx_pkt.getlayer(IPv6)
1591 # expected segment-list
1592 seglist = self.sr_policy.segments
1593 # reverse list to get order as in SRH
1594 tx_seglist = seglist[::-1]
1596 # get source address of SR Policy
1597 sr_policy_source = self.sr_policy.source
1599 # rx'ed packet should have SRH
1600 self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1602 rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1604 # received ip.src should be equal to SR Policy source
1605 self.assertEqual(rx_ip.src, sr_policy_source)
1606 # received ip.dst should be equal to expected sidlist[lastentry]
1607 self.assertEqual(rx_ip.dst, tx_seglist[-1])
1608 # rx'ed seglist should be equal to expected seglist
1609 self.assertEqual(rx_srh.addresses, tx_seglist)
1610 # segleft should be equal to size expected seglist-1
1611 self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
1612 # segleft should be equal to lastentry
1613 self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1615 # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1616 # except for the hop-limit field
1617 # -> update tx'ed hlim to the expected hlim
1618 tx_ip.hlim = tx_ip.hlim - 1
1620 self.assertEqual(rx_srh.payload, tx_ip)
1622 self.logger.debug("packet verification: SUCCESS")
1624 def compare_rx_tx_packet_T_Encaps_IPv4(self, tx_pkt, rx_pkt):
1625 """Compare input and output packet after passing T.Encaps for IPv4
1627 :param tx_pkt: transmitted packet
1628 :param rx_pkt: received packet
1630 # T.Encaps for IPv4 updates the headers as follows:
1631 # SR Policy seglist (S3, S2, S1)
1632 # SR Policy source C
1635 # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv4(A, B2)
1637 # get first (outer) IPv6 header of rx'ed packet
1638 rx_ip = rx_pkt.getlayer(IPv6)
1641 tx_ip = tx_pkt.getlayer(IP)
1643 # expected segment-list
1644 seglist = self.sr_policy.segments
1645 # reverse list to get order as in SRH
1646 tx_seglist = seglist[::-1]
1648 # get source address of SR Policy
1649 sr_policy_source = self.sr_policy.source
1651 # checks common to cases tx with and without SRH
1652 # rx'ed packet should have SRH and IPv4 header
1653 self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1654 self.assertTrue(rx_ip.payload.haslayer(IP))
1656 rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1658 # received ip.src should be equal to SR Policy source
1659 self.assertEqual(rx_ip.src, sr_policy_source)
1660 # received ip.dst should be equal to sidlist[lastentry]
1661 self.assertEqual(rx_ip.dst, tx_seglist[-1])
1662 # rx'ed seglist should be equal to seglist
1663 self.assertEqual(rx_srh.addresses, tx_seglist)
1664 # segleft should be equal to size seglist-1
1665 self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
1666 # segleft should be equal to lastentry
1667 self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1669 # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1670 # except for the ttl field and ip checksum
1671 # -> adjust tx'ed ttl to expected ttl
1672 tx_ip.ttl = tx_ip.ttl - 1
1673 # -> set tx'ed ip checksum to None and let scapy recompute
1675 # read back the pkt (with str()) to force computing these fields
1676 # probably other ways to accomplish this are possible
1677 tx_ip = IP(scapy.compat.raw(tx_ip))
1679 self.assertEqual(rx_srh.payload, tx_ip)
1681 self.logger.debug("packet verification: SUCCESS")
1683 def compare_rx_tx_packet_T_Encaps_L2(self, tx_pkt, rx_pkt):
1684 """Compare input and output packet after passing T.Encaps for L2
1686 :param tx_pkt: transmitted packet
1687 :param rx_pkt: received packet
1689 # T.Encaps for L2 updates the headers as follows:
1690 # SR Policy seglist (S3, S2, S1)
1691 # SR Policy source C
1694 # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)L2
1696 # get first (outer) IPv6 header of rx'ed packet
1697 rx_ip = rx_pkt.getlayer(IPv6)
1700 tx_ether = tx_pkt.getlayer(Ether)
1702 # expected segment-list
1703 seglist = self.sr_policy.segments
1704 # reverse list to get order as in SRH
1705 tx_seglist = seglist[::-1]
1707 # get source address of SR Policy
1708 sr_policy_source = self.sr_policy.source
1710 # rx'ed packet should have SRH
1711 self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1713 rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1715 # received ip.src should be equal to SR Policy source
1716 self.assertEqual(rx_ip.src, sr_policy_source)
1717 # received ip.dst should be equal to sidlist[lastentry]
1718 self.assertEqual(rx_ip.dst, tx_seglist[-1])
1719 # rx'ed seglist should be equal to seglist
1720 self.assertEqual(rx_srh.addresses, tx_seglist)
1721 # segleft should be equal to size seglist-1
1722 self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
1723 # segleft should be equal to lastentry
1724 self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1725 # nh should be "No Next Header" (143)
1726 self.assertEqual(rx_srh.nh, 143)
1728 # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1729 self.assertEqual(Ether(scapy.compat.raw(rx_srh.payload)), tx_ether)
1731 self.logger.debug("packet verification: SUCCESS")
1733 def compare_rx_tx_packet_T_Insert(self, tx_pkt, rx_pkt):
1734 """Compare input and output packet after passing T.Insert
1736 :param tx_pkt: transmitted packet
1737 :param rx_pkt: received packet
1739 # T.Insert updates the headers as follows:
1742 # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)
1744 # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1)
1745 # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)SRH(B3, B2, B1; SL=1)
1747 # get first (outer) IPv6 header of rx'ed packet
1748 rx_ip = rx_pkt.getlayer(IPv6)
1753 rx_udp = rx_pkt[UDP]
1755 tx_ip = tx_pkt.getlayer(IPv6)
1758 # some packets have been tx'ed with an SRH, some without it
1759 # get SRH if tx'ed packet has it
1760 if tx_pkt.haslayer(IPv6ExtHdrSegmentRouting):
1761 tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1762 tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1763 tx_udp = tx_pkt[UDP]
1765 # expected segment-list (make copy of SR Policy segment list)
1766 seglist = self.sr_policy.segments[:]
1767 # expected seglist has initial dest addr as last segment
1768 seglist.append(tx_ip.dst)
1769 # reverse list to get order as in SRH
1770 tx_seglist = seglist[::-1]
1772 # get source address of SR Policy
1773 sr_policy_source = self.sr_policy.source
1775 # checks common to cases tx with and without SRH
1776 # rx'ed packet should have SRH and only one IPv6 header
1777 self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1778 self.assertFalse(rx_ip.payload.haslayer(IPv6))
1780 rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1782 # rx'ed ip.src should be equal to tx'ed ip.src
1783 self.assertEqual(rx_ip.src, tx_ip.src)
1784 # rx'ed ip.dst should be equal to sidlist[lastentry]
1785 self.assertEqual(rx_ip.dst, tx_seglist[-1])
1787 # rx'ed seglist should be equal to expected seglist
1788 self.assertEqual(rx_srh.addresses, tx_seglist)
1789 # segleft should be equal to size(expected seglist)-1
1790 self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
1791 # segleft should be equal to lastentry
1792 self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1794 if tx_srh: # packet was tx'ed with SRH
1795 # packet should have 2nd SRH
1796 self.assertTrue(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
1798 rx_srh2 = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting, 2)
1800 # rx'ed srh2.addresses should be equal to tx'ed srh.addresses
1801 self.assertEqual(rx_srh2.addresses, tx_srh.addresses)
1802 # rx'ed srh2.segleft should be equal to tx'ed srh.segleft
1803 self.assertEqual(rx_srh2.segleft, tx_srh.segleft)
1804 # rx'ed srh2.lastentry should be equal to tx'ed srh.lastentry
1805 self.assertEqual(rx_srh2.lastentry, tx_srh.lastentry)
1807 else: # packet was tx'ed without SRH
1808 # rx packet should have no other SRH
1809 self.assertFalse(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
1811 # UDP layer should be unchanged
1812 self.assertEqual(rx_udp, tx_udp)
1814 self.logger.debug("packet verification: SUCCESS")
1816 def compare_rx_tx_packet_End(self, tx_pkt, rx_pkt):
1817 """Compare input and output packet after passing End (without PSP)
1819 :param tx_pkt: transmitted packet
1820 :param rx_pkt: received packet
1822 # End (no PSP) updates the headers as follows:
1824 # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2)
1825 # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1827 # get first (outer) IPv6 header of rx'ed packet
1828 rx_ip = rx_pkt.getlayer(IPv6)
1831 rx_udp = rx_pkt[UDP]
1833 tx_ip = tx_pkt.getlayer(IPv6)
1834 # we know the packet has been tx'ed
1835 # with an inner IPv6 header and an SRH
1836 tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1837 tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1838 tx_udp = tx_pkt[UDP]
1840 # common checks, regardless of tx segleft value
1841 # rx'ed packet should have 2nd IPv6 header
1842 self.assertTrue(rx_ip.payload.haslayer(IPv6))
1843 # get second (inner) IPv6 header
1844 rx_ip2 = rx_pkt.getlayer(IPv6, 2)
1846 if tx_ip.segleft > 0:
1847 # SRH should NOT have been popped:
1848 # End SID without PSP does not pop SRH if segleft>0
1849 self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1850 rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1852 # received ip.src should be equal to expected ip.src
1853 self.assertEqual(rx_ip.src, tx_ip.src)
1854 # sidlist should be unchanged
1855 self.assertEqual(rx_srh.addresses, tx_srh.addresses)
1856 # segleft should have been decremented
1857 self.assertEqual(rx_srh.segleft, tx_srh.segleft - 1)
1858 # received ip.dst should be equal to sidlist[segleft]
1859 self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
1860 # lastentry should be unchanged
1861 self.assertEqual(rx_srh.lastentry, tx_srh.lastentry)
1862 # inner IPv6 packet (ip2) should be unchanged
1863 self.assertEqual(rx_ip2.src, tx_ip2.src)
1864 self.assertEqual(rx_ip2.dst, tx_ip2.dst)
1865 # else: # tx_ip.segleft == 0
1866 # TODO: Does this work with 2 SRHs in ingress packet?
1868 # UDP layer should be unchanged
1869 self.assertEqual(rx_udp, tx_udp)
1871 self.logger.debug("packet verification: SUCCESS")
1873 def compare_rx_tx_packet_End_PSP(self, tx_pkt, rx_pkt):
1874 """Compare input and output packet after passing End with PSP
1876 :param tx_pkt: transmitted packet
1877 :param rx_pkt: received packet
1879 # End (PSP) updates the headers as follows:
1880 # IPv6 + SRH (SL>1):
1881 # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2)
1882 # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1883 # IPv6 + SRH (SL=1):
1884 # in: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1887 # get first (outer) IPv6 header of rx'ed packet
1888 rx_ip = rx_pkt.getlayer(IPv6)
1891 rx_udp = rx_pkt[UDP]
1893 tx_ip = tx_pkt.getlayer(IPv6)
1894 # we know the packet has been tx'ed
1895 # with an inner IPv6 header and an SRH
1896 tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1897 tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1898 tx_udp = tx_pkt[UDP]
1900 # common checks, regardless of tx segleft value
1901 self.assertTrue(rx_ip.payload.haslayer(IPv6))
1902 rx_ip2 = rx_pkt.getlayer(IPv6, 2)
1903 # inner IPv6 packet (ip2) should be unchanged
1904 self.assertEqual(rx_ip2.src, tx_ip2.src)
1905 self.assertEqual(rx_ip2.dst, tx_ip2.dst)
1907 if tx_ip.segleft > 1:
1908 # SRH should NOT have been popped:
1909 # End SID with PSP does not pop SRH if segleft>1
1910 # rx'ed packet should have SRH
1911 self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1912 rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1914 # received ip.src should be equal to expected ip.src
1915 self.assertEqual(rx_ip.src, tx_ip.src)
1916 # sidlist should be unchanged
1917 self.assertEqual(rx_srh.addresses, tx_srh.addresses)
1918 # segleft should have been decremented
1919 self.assertEqual(rx_srh.segleft, tx_srh.segleft - 1)
1920 # received ip.dst should be equal to sidlist[segleft]
1921 self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
1922 # lastentry should be unchanged
1923 self.assertEqual(rx_srh.lastentry, tx_srh.lastentry)
1925 else: # tx_ip.segleft <= 1
1926 # SRH should have been popped:
1927 # End SID with PSP and segleft=1 pops SRH
1928 # the two IPv6 headers are still present
1929 # outer IPv6 header has DA == last segment of popped SRH
1930 # SRH should not be present
1931 self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1932 # outer IPv6 header ip.src should be equal to tx'ed ip.src
1933 self.assertEqual(rx_ip.src, tx_ip.src)
1934 # outer IPv6 header ip.dst should be = to tx'ed sidlist[segleft-1]
1935 self.assertEqual(rx_ip.dst, tx_srh.addresses[tx_srh.segleft - 1])
1937 # UDP layer should be unchanged
1938 self.assertEqual(rx_udp, tx_udp)
1940 self.logger.debug("packet verification: SUCCESS")
1942 def compare_rx_tx_packet_End_DX6(self, tx_pkt, rx_pkt):
1943 """Compare input and output packet after passing End.DX6
1945 :param tx_pkt: transmitted packet
1946 :param rx_pkt: received packet
1948 # End.DX6 updates the headers as follows:
1949 # IPv6 + SRH (SL=0):
1950 # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv6(B, D)
1953 # in: IPv6(A, S3)IPv6(B, D)
1956 # get first (outer) IPv6 header of rx'ed packet
1957 rx_ip = rx_pkt.getlayer(IPv6)
1959 tx_ip = tx_pkt.getlayer(IPv6)
1960 tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1962 # verify if rx'ed packet has no SRH
1963 self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1965 # the whole rx_ip pkt should be equal to tx_ip2
1966 # except for the hlim field
1967 # -> adjust tx'ed hlim to expected hlim
1968 tx_ip2.hlim = tx_ip2.hlim - 1
1970 self.assertEqual(rx_ip, tx_ip2)
1972 self.logger.debug("packet verification: SUCCESS")
1974 def compare_rx_tx_packet_End_DX4(self, tx_pkt, rx_pkt):
1975 """Compare input and output packet after passing End.DX4
1977 :param tx_pkt: transmitted packet
1978 :param rx_pkt: received packet
1980 # End.DX4 updates the headers as follows:
1981 # IPv6 + SRH (SL=0):
1982 # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv4(B, D)
1985 # in: IPv6(A, S3)IPv4(B, D)
1988 # get IPv4 header of rx'ed packet
1989 rx_ip = rx_pkt.getlayer(IP)
1991 tx_ip = tx_pkt.getlayer(IPv6)
1992 tx_ip2 = tx_pkt.getlayer(IP)
1994 # verify if rx'ed packet has no SRH
1995 self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1997 # the whole rx_ip pkt should be equal to tx_ip2
1998 # except for the ttl field and ip checksum
1999 # -> adjust tx'ed ttl to expected ttl
2000 tx_ip2.ttl = tx_ip2.ttl - 1
2001 # -> set tx'ed ip checksum to None and let scapy recompute
2002 tx_ip2.chksum = None
2003 # read back the pkt (with str()) to force computing these fields
2004 # probably other ways to accomplish this are possible
2005 tx_ip2 = IP(scapy.compat.raw(tx_ip2))
2007 self.assertEqual(rx_ip, tx_ip2)
2009 self.logger.debug("packet verification: SUCCESS")
2011 def compare_rx_tx_packet_End_DX2(self, tx_pkt, rx_pkt):
2012 """Compare input and output packet after passing End.DX2
2014 :param tx_pkt: transmitted packet
2015 :param rx_pkt: received packet
2017 # End.DX2 updates the headers as follows:
2018 # IPv6 + SRH (SL=0):
2019 # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)L2
2025 # get IPv4 header of rx'ed packet
2026 rx_eth = rx_pkt.getlayer(Ether)
2028 tx_ip = tx_pkt.getlayer(IPv6)
2029 # we can't just get the 2nd Ether layer
2030 # get the Raw content and dissect it as Ether
2031 tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw]))
2033 # verify if rx'ed packet has no SRH
2034 self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
2036 # the whole rx_eth pkt should be equal to tx_eth1
2037 self.assertEqual(rx_eth, tx_eth1)
2039 self.logger.debug("packet verification: SUCCESS")
2041 def create_stream(self, src_if, dst_if, packet_header, packet_sizes, count):
2042 """Create SRv6 input packet stream for defined interface.
2044 :param VppInterface src_if: Interface to create packet stream for
2045 :param VppInterface dst_if: destination interface of packet stream
2046 :param packet_header: Layer3 scapy packet headers,
2047 L2 is added when not provided,
2048 Raw(payload) with packet_info is added
2049 :param list packet_sizes: packet stream pckt sizes,sequentially applied
2050 to packets in stream have
2051 :param int count: number of packets in packet stream
2052 :return: list of packets
2054 self.logger.info("Creating packets")
2056 for i in range(0, count - 1):
2057 payload_info = self.create_packet_info(src_if, dst_if)
2058 self.logger.debug("Creating packet with index %d" % (payload_info.index))
2059 payload = self.info_to_payload(payload_info)
2060 # add L2 header if not yet provided in packet_header
2061 if packet_header.getlayer(0).name == "Ethernet":
2062 p = packet_header / Raw(payload)
2065 Ether(dst=src_if.local_mac, src=src_if.remote_mac)
2069 size = packet_sizes[i % len(packet_sizes)]
2070 self.logger.debug("Packet size %d" % (size))
2071 self.extend_packet(p, size)
2072 # we need to store the packet with the automatic fields computed
2073 # read back the dumped packet (with str())
2074 # to force computing these fields
2075 # probably other ways are possible
2076 p = Ether(scapy.compat.raw(p))
2077 payload_info.data = p.copy()
2078 self.logger.debug(ppp("Created packet:", p))
2080 self.logger.info("Done creating packets")
2083 def send_and_verify_pkts(
2084 self, input, pkts, output, compare_func, expected_count=None
2086 """Send packets and verify received packets using compare_func
2088 :param input: ingress interface of DUT
2089 :param pkts: list of packets to transmit
2090 :param output: egress interface of DUT
2091 :param compare_func: function to compare in and out packets
2092 :param expected_count: expected number of captured packets (if
2093 different than len(pkts))
2095 # add traffic stream to input interface
2096 input.add_stream(pkts)
2098 # enable capture on all interfaces
2099 self.pg_enable_capture(self.pg_interfaces)
2102 self.logger.info("Starting traffic")
2105 # get output capture
2106 self.logger.info("Getting packet capture")
2107 capture = output.get_capture(expected_count=expected_count)
2109 # assert nothing was captured on input interface
2110 input.assert_nothing_captured()
2112 # verify captured packets
2113 self.verify_captured_pkts(output, capture, compare_func)
2115 def create_packet_header_IPv6(self, dst):
2116 """Create packet header: IPv6 header, UDP header
2118 :param dst: IPv6 destination address
2120 IPv6 source address is 1234::1
2121 UDP source port and destination port are 1234
2124 p = IPv6(src="1234::1", dst=dst) / UDP(sport=1234, dport=1234)
2127 def create_packet_header_IPv6_SRH(self, sidlist, segleft):
2128 """Create packet header: IPv6 header with SRH, UDP header
2130 :param list sidlist: segment list
2131 :param int segleft: segments-left field value
2133 IPv6 destination address is set to sidlist[segleft]
2134 IPv6 source addresses are 1234::1 and 4321::1
2135 UDP source port and destination port are 1234
2139 IPv6(src="1234::1", dst=sidlist[segleft])
2140 / IPv6ExtHdrSegmentRouting(addresses=sidlist)
2141 / UDP(sport=1234, dport=1234)
2145 def create_packet_header_IPv6_SRH_IPv6(self, dst, sidlist, segleft):
2146 """Create packet header: IPv6 encapsulated in SRv6:
2147 IPv6 header with SRH, IPv6 header, UDP header
2149 :param ipv6address dst: inner IPv6 destination address
2150 :param list sidlist: segment list of outer IPv6 SRH
2151 :param int segleft: segments-left field of outer IPv6 SRH
2153 Outer IPv6 destination address is set to sidlist[segleft]
2154 IPv6 source addresses are 1234::1 and 4321::1
2155 UDP source port and destination port are 1234
2159 IPv6(src="1234::1", dst=sidlist[segleft])
2160 / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=41)
2161 / IPv6(src="4321::1", dst=dst)
2162 / UDP(sport=1234, dport=1234)
2166 def create_packet_header_IPv6_IPv6(self, dst_inner, dst_outer):
2167 """Create packet header: IPv6 encapsulated in IPv6:
2168 IPv6 header, IPv6 header, UDP header
2170 :param ipv6address dst_inner: inner IPv6 destination address
2171 :param ipv6address dst_outer: outer IPv6 destination address
2173 IPv6 source addresses are 1234::1 and 4321::1
2174 UDP source port and destination port are 1234
2178 IPv6(src="1234::1", dst=dst_outer)
2179 / IPv6(src="4321::1", dst=dst_inner)
2180 / UDP(sport=1234, dport=1234)
2184 def create_packet_header_IPv6_SRH_SRH_IPv6(
2185 self, dst, sidlist1, segleft1, sidlist2, segleft2
2187 """Create packet header: IPv6 encapsulated in SRv6 with 2 SRH:
2188 IPv6 header with SRH, 2nd SRH, IPv6 header, UDP header
2190 :param ipv6address dst: inner IPv6 destination address
2191 :param list sidlist1: segment list of outer IPv6 SRH
2192 :param int segleft1: segments-left field of outer IPv6 SRH
2193 :param list sidlist2: segment list of inner IPv6 SRH
2194 :param int segleft2: segments-left field of inner IPv6 SRH
2196 Outer IPv6 destination address is set to sidlist[segleft]
2197 IPv6 source addresses are 1234::1 and 4321::1
2198 UDP source port and destination port are 1234
2202 IPv6(src="1234::1", dst=sidlist1[segleft1])
2203 / IPv6ExtHdrSegmentRouting(addresses=sidlist1, segleft=segleft1, nh=43)
2204 / IPv6ExtHdrSegmentRouting(addresses=sidlist2, segleft=segleft2, nh=41)
2205 / IPv6(src="4321::1", dst=dst)
2206 / UDP(sport=1234, dport=1234)
2210 def create_packet_header_IPv4(self, dst):
2211 """Create packet header: IPv4 header, UDP header
2213 :param dst: IPv4 destination address
2215 IPv4 source address is 123.1.1.1
2216 UDP source port and destination port are 1234
2219 p = IP(src="123.1.1.1", dst=dst) / UDP(sport=1234, dport=1234)
2222 def create_packet_header_IPv6_IPv4(self, dst_inner, dst_outer):
2223 """Create packet header: IPv4 encapsulated in IPv6:
2224 IPv6 header, IPv4 header, UDP header
2226 :param ipv4address dst_inner: inner IPv4 destination address
2227 :param ipv6address dst_outer: outer IPv6 destination address
2229 IPv6 source address is 1234::1
2230 IPv4 source address is 123.1.1.1
2231 UDP source port and destination port are 1234
2235 IPv6(src="1234::1", dst=dst_outer)
2236 / IP(src="123.1.1.1", dst=dst_inner)
2237 / UDP(sport=1234, dport=1234)
2241 def create_packet_header_IPv6_SRH_IPv4(self, dst, sidlist, segleft):
2242 """Create packet header: IPv4 encapsulated in SRv6:
2243 IPv6 header with SRH, IPv4 header, UDP header
2245 :param ipv4address dst: inner IPv4 destination address
2246 :param list sidlist: segment list of outer IPv6 SRH
2247 :param int segleft: segments-left field of outer IPv6 SRH
2249 Outer IPv6 destination address is set to sidlist[segleft]
2250 IPv6 source address is 1234::1
2251 IPv4 source address is 123.1.1.1
2252 UDP source port and destination port are 1234
2256 IPv6(src="1234::1", dst=sidlist[segleft])
2257 / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=4)
2258 / IP(src="123.1.1.1", dst=dst)
2259 / UDP(sport=1234, dport=1234)
2263 def create_packet_header_L2(self, vlan=0):
2264 """Create packet header: L2 header
2266 :param vlan: if vlan!=0 then add 802.1q header
2268 # Note: the dst addr ('00:55:44:33:22:11') is used in
2269 # the compare function compare_rx_tx_packet_T_Encaps_L2
2270 # to detect presence of L2 in SRH payload
2271 p = Ether(src="00:11:22:33:44:55", dst="00:55:44:33:22:11")
2272 etype = 0x8137 # IPX
2275 p /= Dot1Q(vlan=vlan, type=etype)
2280 def create_packet_header_IPv6_SRH_L2(self, sidlist, segleft, vlan=0):
2281 """Create packet header: L2 encapsulated in SRv6:
2282 IPv6 header with SRH, L2
2284 :param list sidlist: segment list of outer IPv6 SRH
2285 :param int segleft: segments-left field of outer IPv6 SRH
2286 :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
2288 Outer IPv6 destination address is set to sidlist[segleft]
2289 IPv6 source address is 1234::1
2291 eth = Ether(src="00:11:22:33:44:55", dst="00:55:44:33:22:11")
2292 etype = 0x8137 # IPX
2295 eth /= Dot1Q(vlan=vlan, type=etype)
2300 IPv6(src="1234::1", dst=sidlist[segleft])
2301 / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=143)
2306 def create_packet_header_IPv6_L2(self, dst_outer, vlan=0):
2307 """Create packet header: L2 encapsulated in IPv6:
2310 :param ipv6address dst_outer: outer IPv6 destination address
2311 :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
2313 eth = Ether(src="00:11:22:33:44:55", dst="00:55:44:33:22:11")
2314 etype = 0x8137 # IPX
2317 eth /= Dot1Q(vlan=vlan, type=etype)
2321 p = IPv6(src="1234::1", dst=dst_outer, nh=143) / eth
2324 def get_payload_info(self, packet):
2325 """Extract the payload_info from the packet"""
2326 # in most cases, payload_info is in packet[Raw]
2327 # but packet[Raw] gives the complete payload
2328 # (incl L2 header) for the T.Encaps L2 case
2330 payload_info = self.payload_to_info(packet[Raw])
2333 # remote L2 header from packet[Raw]:
2334 # take packet[Raw], convert it to an Ether layer
2335 # and then extract Raw from it
2336 payload_info = self.payload_to_info(Ether(scapy.compat.r(packet[Raw]))[Raw])
2340 def verify_captured_pkts(self, dst_if, capture, compare_func):
2342 Verify captured packet stream for specified interface.
2343 Compare ingress with egress packets using the specified compare fn
2345 :param dst_if: egress interface of DUT
2346 :param capture: captured packets
2347 :param compare_func: function to compare in and out packet
2350 "Verifying capture on interface %s using function %s"
2351 % (dst_if.name, compare_func.__name__)
2355 for i in self.pg_interfaces:
2356 last_info[i.sw_if_index] = None
2357 dst_sw_if_index = dst_if.sw_if_index
2359 for packet in capture:
2361 # extract payload_info from packet's payload
2362 payload_info = self.get_payload_info(packet)
2363 packet_index = payload_info.index
2365 self.logger.debug("Verifying packet with index %d" % (packet_index))
2366 # packet should have arrived on the expected interface
2367 self.assertEqual(payload_info.dst, dst_sw_if_index)
2369 "Got packet on interface %s: src=%u (idx=%u)"
2370 % (dst_if.name, payload_info.src, packet_index)
2373 # search for payload_info with same src and dst if_index
2374 # this will give us the transmitted packet
2375 next_info = self.get_next_packet_info_for_interface2(
2376 payload_info.src, dst_sw_if_index, last_info[payload_info.src]
2378 last_info[payload_info.src] = next_info
2379 # next_info should not be None
2380 self.assertTrue(next_info is not None)
2381 # index of tx and rx packets should be equal
2382 self.assertEqual(packet_index, next_info.index)
2383 # data field of next_info contains the tx packet
2384 txed_packet = next_info.data
2387 ppp("Transmitted packet:", txed_packet)
2388 ) # ppp=Pretty Print Packet
2390 self.logger.debug(ppp("Received packet:", packet))
2392 # compare rcvd packet with expected packet using compare_func
2393 compare_func(txed_packet, packet)
2396 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2399 # FIXME: there is no need to check manually that all the packets
2400 # arrived (already done so by get_capture); checking here
2401 # prevents testing packets that are expected to be dropped, so
2402 # commenting this out for now
2404 # have all expected packets arrived?
2405 # for i in self.pg_interfaces:
2406 # remaining_packet = self.get_next_packet_info_for_interface2(
2407 # i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
2408 # self.assertTrue(remaining_packet is None,
2409 # "Interface %s: Packet expected from interface %s "
2410 # "didn't arrive" % (dst_if.name, i.name))
2413 if __name__ == "__main__":
2414 unittest.main(testRunner=VppTestRunner)