5 from socket import AF_INET6
7 from framework import VppTestCase
8 from asfframework import VppTestRunner
9 from vpp_ip_route import VppIpRoute, VppRoutePath, VppIpTable
10 from vpp_srv6 import (
11 SRv6LocalSIDBehaviors,
17 SRv6PolicySteeringTypes,
21 from scapy.packet import Raw
22 from scapy.layers.l2 import Ether, Dot1Q
23 from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
24 from scapy.layers.inet import IP, UDP
29 class TestSRv6(VppTestCase):
34 super(TestSRv6, cls).setUpClass()
37 def tearDownClass(cls):
38 super(TestSRv6, cls).tearDownClass()
41 """Perform test setup before each test case."""
42 super(TestSRv6, self).setUp()
44 # packet sizes, inclusive L2 overhead
45 self.pg_packet_sizes = [64, 512, 1518, 9018]
48 self.reset_packet_infos()
51 """Clean up test setup after each test case."""
52 self.teardown_interfaces()
54 super(TestSRv6, self).tearDown()
56 def configure_interface(
57 self, interface, ipv6=False, ipv4=False, ipv6_table_id=0, ipv4_table_id=0
59 """Configure interface.
60 :param ipv6: configure IPv6 on interface
61 :param ipv4: configure IPv4 on interface
62 :param ipv6_table_id: FIB table_id for IPv6
63 :param ipv4_table_id: FIB table_id for IPv4
65 self.logger.debug("Configuring interface %s" % (interface.name))
67 self.logger.debug("Configuring IPv6")
68 interface.set_table_ip6(ipv6_table_id)
69 interface.config_ip6()
70 interface.resolve_ndp(timeout=5)
72 self.logger.debug("Configuring IPv4")
73 interface.set_table_ip4(ipv4_table_id)
74 interface.config_ip4()
75 interface.resolve_arp()
78 def setup_interfaces(self, ipv6=[], ipv4=[], ipv6_table_id=[], ipv4_table_id=[]):
79 """Create and configure interfaces.
81 :param ipv6: list of interface IPv6 capabilities
82 :param ipv4: list of interface IPv4 capabilities
83 :param ipv6_table_id: list of intf IPv6 FIB table_ids
84 :param ipv4_table_id: list of intf IPv4 FIB table_ids
85 :returns: List of created interfaces.
87 # how many interfaces?
92 self.logger.debug("Creating and configuring %d interfaces" % (count))
94 # fill up ipv6 and ipv4 lists if needed
95 # not enabled (False) is the default
97 ipv6 += (count - len(ipv6)) * [False]
99 ipv4 += (count - len(ipv4)) * [False]
101 # fill up table_id lists if needed
102 # table_id 0 (global) is the default
103 if len(ipv6_table_id) < count:
104 ipv6_table_id += (count - len(ipv6_table_id)) * [0]
105 if len(ipv4_table_id) < count:
106 ipv4_table_id += (count - len(ipv4_table_id)) * [0]
108 # create 'count' pg interfaces
109 self.create_pg_interfaces(range(count))
111 # setup all interfaces
112 for i in range(count):
113 intf = self.pg_interfaces[i]
114 self.configure_interface(
115 intf, ipv6[i], ipv4[i], ipv6_table_id[i], ipv4_table_id[i]
119 self.logger.debug(self.vapi.cli("show ip6 neighbors"))
121 self.logger.debug(self.vapi.cli("show ip4 neighbors"))
122 self.logger.debug(self.vapi.cli("show interface"))
123 self.logger.debug(self.vapi.cli("show hardware"))
125 return self.pg_interfaces
127 def teardown_interfaces(self):
128 """Unconfigure and bring down interface."""
129 self.logger.debug("Tearing down interfaces")
130 # tear down all interfaces
131 # AFAIK they cannot be deleted
132 for i in self.pg_interfaces:
133 self.logger.debug("Tear down interface %s" % (i.name))
139 @unittest.skipUnless(0, "PC to fix")
140 def test_SRv6_T_Encaps(self):
141 """Test SRv6 Transit.Encaps behavior for IPv6."""
142 # send traffic to one destination interface
143 # source and destination are IPv6 only
144 self.setup_interfaces(ipv6=[True, True])
146 # configure FIB entries
148 self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
150 route.add_vpp_config()
152 # configure encaps IPv6 source address
153 # needs to be done before SR Policy config
155 self.vapi.cli("set sr encaps source addr a3::")
158 # configure SRv6 Policy
159 # Note: segment list order: first -> last
160 sr_policy = VppSRv6Policy(
164 sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
167 segments=["a4::", "a5::", "a6::c7"],
170 sr_policy.add_vpp_config()
171 self.sr_policy = sr_policy
173 # log the sr policies
174 self.logger.info(self.vapi.cli("show sr policies"))
176 # steer IPv6 traffic to a7::/64 into SRv6 Policy
177 # use the bsid of the above self.sr_policy
178 pol_steering = VppSRv6Steering(
180 bsid=self.sr_policy.bsid,
183 traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
188 pol_steering.add_vpp_config()
190 # log the sr steering policies
191 self.logger.info(self.vapi.cli("show sr steering policies"))
194 count = len(self.pg_packet_sizes)
195 dst_inner = "a7::1234"
198 # create IPv6 packets without SRH
199 packet_header = self.create_packet_header_IPv6(dst_inner)
200 # create traffic stream pg0->pg1
203 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
207 # create IPv6 packets with SRH
208 # packets with segments-left 1, active segment a7::
209 packet_header = self.create_packet_header_IPv6_SRH(
210 sidlist=["a8::", "a7::", "a6::"], segleft=1
212 # create traffic stream pg0->pg1
215 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
219 # create IPv6 packets with SRH and IPv6
220 # packets with segments-left 1, active segment a7::
221 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
222 dst_inner, sidlist=["a8::", "a7::", "a6::"], segleft=1
224 # create traffic stream pg0->pg1
227 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
231 # send packets and verify received packets
232 self.send_and_verify_pkts(
233 self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_T_Encaps
236 # log the localsid counters
237 self.logger.info(self.vapi.cli("show sr localsid"))
240 pol_steering.remove_vpp_config()
241 self.logger.info(self.vapi.cli("show sr steering policies"))
244 self.sr_policy.remove_vpp_config()
245 self.logger.info(self.vapi.cli("show sr policies"))
251 self.teardown_interfaces()
253 def test_SRv6_T_Encaps_with_v6src(self):
254 """Test SRv6 Transit.Encaps behavior for IPv6 and select multiple src v6addr case."""
255 # send traffic to one destination interface
256 # source and destination are IPv6 only
257 self.setup_interfaces(ipv6=[True, True])
259 # configure FIB entries
261 self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
263 route.add_vpp_config()
265 # configure encaps IPv6 source address
266 # needs to be done before SR Policy config
268 self.vapi.cli("set sr encaps source addr a3::")
271 other_src_ip = "b1::"
272 # configure SRv6 Policy
273 # Note: segment list order: first -> last
274 sr_policy = VppSRv6PolicyV2(
278 sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
281 segments=["a4::", "a5::", "a6::c7"],
282 encap_src=other_src_ip,
285 sr_policy.add_vpp_config()
286 self.sr_policy = sr_policy
288 # log the sr policies
289 self.logger.info(self.vapi.cli("show sr policies"))
291 # steer IPv6 traffic to a7::/64 into SRv6 Policy
292 # use the bsid of the above self.sr_policy
293 pol_steering = VppSRv6Steering(
295 bsid=self.sr_policy.bsid,
298 traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
303 pol_steering.add_vpp_config()
305 # log the sr steering policies
306 self.logger.info(self.vapi.cli("show sr steering-policies"))
309 count = len(self.pg_packet_sizes)
310 dst_inner = "a7::1234"
313 # create IPv6 packets without SRH
314 packet_header = self.create_packet_header_IPv6(dst_inner)
315 # create traffic stream pg0->pg1
318 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
322 # create IPv6 packets with SRH
323 # packets with segments-left 1, active segment a7::
324 packet_header = self.create_packet_header_IPv6_SRH(
325 sidlist=["a8::", "a7::", "a6::"], segleft=1
327 # create traffic stream pg0->pg1
330 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
334 # create IPv6 packets with SRH and IPv6
335 # packets with segments-left 1, active segment a7::
336 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
337 dst_inner, sidlist=["a8::", "a7::", "a6::"], segleft=1
339 # create traffic stream pg0->pg1
342 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
346 # send packets and verify received packets
347 self.send_and_verify_pkts(
348 self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_T_Encaps
351 # log the localsid counters
352 self.logger.info(self.vapi.cli("show sr localsid"))
355 pol_steering.remove_vpp_config()
356 self.logger.info(self.vapi.cli("show sr steering-policies"))
359 self.sr_policy.remove_vpp_config()
360 self.logger.info(self.vapi.cli("show sr policies"))
366 self.teardown_interfaces()
368 @unittest.skipUnless(0, "PC to fix")
369 def test_SRv6_T_Insert(self):
370 """Test SRv6 Transit.Insert behavior (IPv6 only)."""
371 # send traffic to one destination interface
372 # source and destination are IPv6 only
373 self.setup_interfaces(ipv6=[True, True])
375 # configure FIB entries
377 self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
379 route.add_vpp_config()
381 # configure encaps IPv6 source address
382 # needs to be done before SR Policy config
384 self.vapi.cli("set sr encaps source addr a3::")
387 # configure SRv6 Policy
388 # Note: segment list order: first -> last
389 sr_policy = VppSRv6Policy(
393 sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
396 segments=["a4::", "a5::", "a6::c7"],
399 sr_policy.add_vpp_config()
400 self.sr_policy = sr_policy
402 # log the sr policies
403 self.logger.info(self.vapi.cli("show sr policies"))
405 # steer IPv6 traffic to a7::/64 into SRv6 Policy
406 # use the bsid of the above self.sr_policy
407 pol_steering = VppSRv6Steering(
409 bsid=self.sr_policy.bsid,
412 traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
417 pol_steering.add_vpp_config()
419 # log the sr steering policies
420 self.logger.info(self.vapi.cli("show sr steering policies"))
423 count = len(self.pg_packet_sizes)
424 dst_inner = "a7::1234"
427 # create IPv6 packets without SRH
428 packet_header = self.create_packet_header_IPv6(dst_inner)
429 # create traffic stream pg0->pg1
432 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
436 # create IPv6 packets with SRH
437 # packets with segments-left 1, active segment a7::
438 packet_header = self.create_packet_header_IPv6_SRH(
439 sidlist=["a8::", "a7::", "a6::"], segleft=1
441 # create traffic stream pg0->pg1
444 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
448 # send packets and verify received packets
449 self.send_and_verify_pkts(
450 self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_T_Insert
453 # log the localsid counters
454 self.logger.info(self.vapi.cli("show sr localsid"))
457 pol_steering.remove_vpp_config()
458 self.logger.info(self.vapi.cli("show sr steering policies"))
461 self.sr_policy.remove_vpp_config()
462 self.logger.info(self.vapi.cli("show sr policies"))
468 self.teardown_interfaces()
470 @unittest.skipUnless(0, "PC to fix")
471 def test_SRv6_T_Encaps_IPv4(self):
472 """Test SRv6 Transit.Encaps behavior for IPv4."""
473 # send traffic to one destination interface
474 # source interface is IPv4 only
475 # destination interface is IPv6 only
476 self.setup_interfaces(ipv6=[False, True], ipv4=[True, False])
478 # configure FIB entries
480 self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
482 route.add_vpp_config()
484 # configure encaps IPv6 source address
485 # needs to be done before SR Policy config
487 self.vapi.cli("set sr encaps source addr a3::")
490 # configure SRv6 Policy
491 # Note: segment list order: first -> last
492 sr_policy = VppSRv6Policy(
496 sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
499 segments=["a4::", "a5::", "a6::c7"],
502 sr_policy.add_vpp_config()
503 self.sr_policy = sr_policy
505 # log the sr policies
506 self.logger.info(self.vapi.cli("show sr policies"))
508 # steer IPv4 traffic to 7.1.1.0/24 into SRv6 Policy
509 # use the bsid of the above self.sr_policy
510 pol_steering = VppSRv6Steering(
512 bsid=self.sr_policy.bsid,
515 traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV4,
520 pol_steering.add_vpp_config()
522 # log the sr steering policies
523 self.logger.info(self.vapi.cli("show sr steering policies"))
526 count = len(self.pg_packet_sizes)
527 dst_inner = "7.1.1.123"
530 # create IPv4 packets
531 packet_header = self.create_packet_header_IPv4(dst_inner)
532 # create traffic stream pg0->pg1
535 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
539 # send packets and verify received packets
540 self.send_and_verify_pkts(
541 self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_T_Encaps_IPv4
544 # log the localsid counters
545 self.logger.info(self.vapi.cli("show sr localsid"))
548 pol_steering.remove_vpp_config()
549 self.logger.info(self.vapi.cli("show sr steering policies"))
552 self.sr_policy.remove_vpp_config()
553 self.logger.info(self.vapi.cli("show sr policies"))
559 self.teardown_interfaces()
561 @unittest.skip("VPP crashes after running this test")
562 def test_SRv6_T_Encaps_L2(self):
563 """Test SRv6 Transit.Encaps behavior for L2."""
564 # send traffic to one destination interface
565 # source interface is IPv4 only TODO?
566 # destination interface is IPv6 only
567 self.setup_interfaces(ipv6=[False, True], ipv4=[False, False])
569 # configure FIB entries
571 self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
573 route.add_vpp_config()
575 # configure encaps IPv6 source address
576 # needs to be done before SR Policy config
578 self.vapi.cli("set sr encaps source addr a3::")
581 # configure SRv6 Policy
582 # Note: segment list order: first -> last
583 sr_policy = VppSRv6Policy(
587 sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
590 segments=["a4::", "a5::", "a6::c7"],
593 sr_policy.add_vpp_config()
594 self.sr_policy = sr_policy
596 # log the sr policies
597 self.logger.info(self.vapi.cli("show sr policies"))
599 # steer L2 traffic into SRv6 Policy
600 # use the bsid of the above self.sr_policy
601 pol_steering = VppSRv6Steering(
603 bsid=self.sr_policy.bsid,
606 traffic_type=SRv6PolicySteeringTypes.SR_STEER_L2,
609 sw_if_index=self.pg0.sw_if_index,
611 pol_steering.add_vpp_config()
613 # log the sr steering policies
614 self.logger.info(self.vapi.cli("show sr steering policies"))
617 count = len(self.pg_packet_sizes)
620 # create L2 packets without dot1q header
621 packet_header = self.create_packet_header_L2()
622 # create traffic stream pg0->pg1
625 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
629 # create L2 packets with dot1q header
630 packet_header = self.create_packet_header_L2(vlan=123)
631 # create traffic stream pg0->pg1
634 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
638 # send packets and verify received packets
639 self.send_and_verify_pkts(
640 self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_T_Encaps_L2
643 # log the localsid counters
644 self.logger.info(self.vapi.cli("show sr localsid"))
647 pol_steering.remove_vpp_config()
648 self.logger.info(self.vapi.cli("show sr steering policies"))
651 self.sr_policy.remove_vpp_config()
652 self.logger.info(self.vapi.cli("show sr policies"))
658 self.teardown_interfaces()
660 def test_SRv6_End(self):
661 """Test SRv6 End (without PSP) behavior."""
662 # send traffic to one destination interface
663 # source and destination interfaces are IPv6 only
664 self.setup_interfaces(ipv6=[True, True])
666 # configure FIB entries
668 self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
670 route.add_vpp_config()
672 # configure SRv6 localSID End without PSP behavior
673 localsid = VppSRv6LocalSID(
676 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
683 localsid.add_vpp_config()
685 self.logger.debug(self.vapi.cli("show sr localsid"))
687 # create IPv6 packets with SRH (SL=2, SL=1, SL=0)
688 # send one packet per SL value per packet size
689 # SL=0 packet with localSID End with USP needs 2nd SRH
690 count = len(self.pg_packet_sizes)
691 dst_inner = "a4::1234"
694 # packets with segments-left 2, active segment a3::
695 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
696 dst_inner, sidlist=["a5::", "a4::", "a3::"], segleft=2
698 # create traffic stream pg0->pg1
701 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
705 # packets with segments-left 1, active segment a3::
706 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
707 dst_inner, sidlist=["a4::", "a3::", "a2::"], segleft=1
709 # add to traffic stream pg0->pg1
712 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
716 # TODO: test behavior with SL=0 packet (needs 2*SRH?)
718 expected_count = len(pkts)
720 # packets without SRH (should not crash)
721 packet_header = self.create_packet_header_IPv6("a3::")
722 # create traffic stream pg0->pg1
725 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
729 # send packets and verify received packets
730 self.send_and_verify_pkts(
734 self.compare_rx_tx_packet_End,
735 expected_count=expected_count,
738 # log the localsid counters
739 self.logger.info(self.vapi.cli("show sr localsid"))
741 # remove SRv6 localSIDs
742 localsid.remove_vpp_config()
748 self.teardown_interfaces()
750 def test_SRv6_End_with_PSP(self):
751 """Test SRv6 End with PSP behavior."""
752 # send traffic to one destination interface
753 # source and destination interfaces are IPv6 only
754 self.setup_interfaces(ipv6=[True, True])
756 # configure FIB entries
758 self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
760 route.add_vpp_config()
762 # configure SRv6 localSID End with PSP behavior
763 localsid = VppSRv6LocalSID(
766 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
773 localsid.add_vpp_config()
775 self.logger.debug(self.vapi.cli("show sr localsid"))
777 # create IPv6 packets with SRH (SL=2, SL=1)
778 # send one packet per SL value per packet size
779 # SL=0 packet with localSID End with PSP is dropped
780 count = len(self.pg_packet_sizes)
781 dst_inner = "a4::1234"
784 # packets with segments-left 2, active segment a3::
785 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
786 dst_inner, sidlist=["a5::", "a4::", "a3::"], segleft=2
788 # create traffic stream pg0->pg1
791 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
795 # packets with segments-left 1, active segment a3::
796 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
797 dst_inner, sidlist=["a4::", "a3::", "a2::"], segleft=1
799 # add to traffic stream pg0->pg1
802 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
806 # send packets and verify received packets
807 self.send_and_verify_pkts(
808 self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_End_PSP
811 # log the localsid counters
812 self.logger.info(self.vapi.cli("show sr localsid"))
814 # remove SRv6 localSIDs
815 localsid.remove_vpp_config()
821 self.teardown_interfaces()
823 def test_SRv6_End_X(self):
824 """Test SRv6 End.X (without PSP) behavior."""
825 # create three interfaces (1 source, 2 destinations)
826 # source and destination interfaces are IPv6 only
827 self.setup_interfaces(ipv6=[True, True, True])
829 # configure FIB entries
830 # a4::/64 via pg1 and pg2
836 VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index),
837 VppRoutePath(self.pg2.remote_ip6, self.pg2.sw_if_index),
840 route.add_vpp_config()
841 self.logger.debug(self.vapi.cli("show ip6 fib"))
843 # configure SRv6 localSID End.X without PSP behavior
844 # End.X points to interface pg1
845 localsid = VppSRv6LocalSID(
848 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
849 nh_addr=self.pg1.remote_ip6,
851 sw_if_index=self.pg1.sw_if_index,
855 localsid.add_vpp_config()
857 self.logger.debug(self.vapi.cli("show sr localsid"))
859 # create IPv6 packets with SRH (SL=2, SL=1)
860 # send one packet per SL value per packet size
861 # SL=0 packet with localSID End with PSP is dropped
862 count = len(self.pg_packet_sizes)
863 dst_inner = "a4::1234"
866 # packets with segments-left 2, active segment a3::c4
867 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
868 dst_inner, sidlist=["a5::", "a4::", "a3::c4"], segleft=2
870 # create traffic stream pg0->pg1
873 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
877 # packets with segments-left 1, active segment a3::c4
878 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
879 dst_inner, sidlist=["a4::", "a3::c4", "a2::"], segleft=1
881 # add to traffic stream pg0->pg1
884 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
888 # send packets and verify received packets
889 # using same comparison function as End (no PSP)
890 self.send_and_verify_pkts(
891 self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_End
894 # assert nothing was received on the other interface (pg2)
895 self.pg2.assert_nothing_captured(remark="mis-directed packet(s)")
897 # log the localsid counters
898 self.logger.info(self.vapi.cli("show sr localsid"))
900 # remove SRv6 localSIDs
901 localsid.remove_vpp_config()
907 self.teardown_interfaces()
909 def test_SRv6_End_X_with_PSP(self):
910 """Test SRv6 End.X with PSP behavior."""
911 # create three interfaces (1 source, 2 destinations)
912 # source and destination interfaces are IPv6 only
913 self.setup_interfaces(ipv6=[True, True, True])
915 # configure FIB entries
916 # a4::/64 via pg1 and pg2
922 VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index),
923 VppRoutePath(self.pg2.remote_ip6, self.pg2.sw_if_index),
926 route.add_vpp_config()
928 # configure SRv6 localSID End with PSP behavior
929 localsid = VppSRv6LocalSID(
932 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
933 nh_addr=self.pg1.remote_ip6,
935 sw_if_index=self.pg1.sw_if_index,
939 localsid.add_vpp_config()
941 self.logger.debug(self.vapi.cli("show sr localsid"))
943 # create IPv6 packets with SRH (SL=2, SL=1)
944 # send one packet per SL value per packet size
945 # SL=0 packet with localSID End with PSP is dropped
946 count = len(self.pg_packet_sizes)
947 dst_inner = "a4::1234"
950 # packets with segments-left 2, active segment a3::
951 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
952 dst_inner, sidlist=["a5::", "a4::", "a3::c4"], segleft=2
954 # create traffic stream pg0->pg1
957 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
961 # packets with segments-left 1, active segment a3::
962 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
963 dst_inner, sidlist=["a4::", "a3::c4", "a2::"], segleft=1
965 # add to traffic stream pg0->pg1
968 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
972 # send packets and verify received packets
973 # using same comparison function as End with PSP
974 self.send_and_verify_pkts(
975 self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_End_PSP
978 # assert nothing was received on the other interface (pg2)
979 self.pg2.assert_nothing_captured(remark="mis-directed packet(s)")
981 # log the localsid counters
982 self.logger.info(self.vapi.cli("show sr localsid"))
984 # remove SRv6 localSIDs
985 localsid.remove_vpp_config()
991 self.teardown_interfaces()
993 def test_SRv6_End_DX6(self):
994 """Test SRv6 End.DX6 behavior."""
995 # send traffic to one destination interface
996 # source and destination interfaces are IPv6 only
997 self.setup_interfaces(ipv6=[True, True])
999 # configure SRv6 localSID End.DX6 behavior
1000 localsid = VppSRv6LocalSID(
1003 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX6,
1004 nh_addr=self.pg1.remote_ip6,
1006 sw_if_index=self.pg1.sw_if_index,
1010 localsid.add_vpp_config()
1012 self.logger.debug(self.vapi.cli("show sr localsid"))
1014 # create IPv6 packets with SRH (SL=0)
1015 # send one packet per packet size
1016 count = len(self.pg_packet_sizes)
1017 dst_inner = "a4::1234" # inner header destination address
1020 # packets with SRH, segments-left 0, active segment a3::c4
1021 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
1022 dst_inner, sidlist=["a3::c4", "a2::", "a1::"], segleft=0
1024 # add to traffic stream pg0->pg1
1027 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1031 # packets without SRH, IPv6 in IPv6
1032 # outer IPv6 dest addr is the localsid End.DX6
1033 packet_header = self.create_packet_header_IPv6_IPv6(
1034 dst_inner, dst_outer="a3::c4"
1036 # add to traffic stream pg0->pg1
1039 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1043 # send packets and verify received packets
1044 self.send_and_verify_pkts(
1045 self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_End_DX6
1048 # log the localsid counters
1049 self.logger.info(self.vapi.cli("show sr localsid"))
1051 # remove SRv6 localSIDs
1052 localsid.remove_vpp_config()
1054 # cleanup interfaces
1055 self.teardown_interfaces()
1057 def test_SRv6_End_DT6(self):
1058 """Test SRv6 End.DT6 behavior."""
1059 # create three interfaces (1 source, 2 destinations)
1060 # all interfaces are IPv6 only
1061 # source interface in global FIB (0)
1062 # destination interfaces in global and vrf
1064 ipt = VppIpTable(self, vrf_1, is_ip6=True)
1065 ipt.add_vpp_config()
1066 self.setup_interfaces(ipv6=[True, True, True], ipv6_table_id=[0, 0, vrf_1])
1068 # configure FIB entries
1069 # a4::/64 is reachable
1070 # via pg1 in table 0 (global)
1071 # and via pg2 in table vrf_1
1072 route0 = VppIpRoute(
1076 [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index, nh_table_id=0)],
1079 route0.add_vpp_config()
1080 route1 = VppIpRoute(
1086 self.pg2.remote_ip6, self.pg2.sw_if_index, nh_table_id=vrf_1
1091 route1.add_vpp_config()
1092 self.logger.debug(self.vapi.cli("show ip6 fib"))
1094 # configure SRv6 localSID End.DT6 behavior
1096 # fib_table: where the localsid is installed
1097 # sw_if_index: in T-variants of localsid this is the vrf table_id
1098 localsid = VppSRv6LocalSID(
1101 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT6,
1108 localsid.add_vpp_config()
1110 self.logger.debug(self.vapi.cli("show sr localsid"))
1112 # create IPv6 packets with SRH (SL=0)
1113 # send one packet per packet size
1114 count = len(self.pg_packet_sizes)
1115 dst_inner = "a4::1234" # inner header destination address
1118 # packets with SRH, segments-left 0, active segment a3::c4
1119 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
1120 dst_inner, sidlist=["a3::c4", "a2::", "a1::"], segleft=0
1122 # add to traffic stream pg0->pg1
1125 self.pg0, self.pg2, packet_header, self.pg_packet_sizes, count
1129 # packets without SRH, IPv6 in IPv6
1130 # outer IPv6 dest addr is the localsid End.DT6
1131 packet_header = self.create_packet_header_IPv6_IPv6(
1132 dst_inner, dst_outer="a3::c4"
1134 # add to traffic stream pg0->pg1
1137 self.pg0, self.pg2, packet_header, self.pg_packet_sizes, count
1141 # send packets and verify received packets
1142 # using same comparison function as End.DX6
1143 self.send_and_verify_pkts(
1144 self.pg0, pkts, self.pg2, self.compare_rx_tx_packet_End_DX6
1147 # assert nothing was received on the other interface (pg2)
1148 self.pg1.assert_nothing_captured(remark="mis-directed packet(s)")
1150 # log the localsid counters
1151 self.logger.info(self.vapi.cli("show sr localsid"))
1153 # remove SRv6 localSIDs
1154 localsid.remove_vpp_config()
1156 # remove FIB entries
1159 # cleanup interfaces
1160 self.teardown_interfaces()
1162 def test_SRv6_End_DX4(self):
1163 """Test SRv6 End.DX4 behavior."""
1164 # send traffic to one destination interface
1165 # source interface is IPv6 only
1166 # destination interface is IPv4 only
1167 self.setup_interfaces(ipv6=[True, False], ipv4=[False, True])
1169 # configure SRv6 localSID End.DX4 behavior
1170 localsid = VppSRv6LocalSID(
1173 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX4,
1174 nh_addr=self.pg1.remote_ip4,
1176 sw_if_index=self.pg1.sw_if_index,
1180 localsid.add_vpp_config()
1182 self.logger.debug(self.vapi.cli("show sr localsid"))
1184 # send one packet per packet size
1185 count = len(self.pg_packet_sizes)
1186 dst_inner = "4.1.1.123" # inner header destination address
1189 # packets with SRH, segments-left 0, active segment a3::c4
1190 packet_header = self.create_packet_header_IPv6_SRH_IPv4(
1191 dst_inner, sidlist=["a3::c4", "a2::", "a1::"], segleft=0
1193 # add to traffic stream pg0->pg1
1196 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1200 # packets without SRH, IPv4 in IPv6
1201 # outer IPv6 dest addr is the localsid End.DX4
1202 packet_header = self.create_packet_header_IPv6_IPv4(
1203 dst_inner, dst_outer="a3::c4"
1205 # add to traffic stream pg0->pg1
1208 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1212 # send packets and verify received packets
1213 self.send_and_verify_pkts(
1214 self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_End_DX4
1217 # log the localsid counters
1218 self.logger.info(self.vapi.cli("show sr localsid"))
1220 # remove SRv6 localSIDs
1221 localsid.remove_vpp_config()
1223 # cleanup interfaces
1224 self.teardown_interfaces()
1226 def test_SRv6_End_DT4(self):
1227 """Test SRv6 End.DT4 behavior."""
1228 # create three interfaces (1 source, 2 destinations)
1229 # source interface is IPv6-only
1230 # destination interfaces are IPv4 only
1231 # source interface in global FIB (0)
1232 # destination interfaces in global and vrf
1234 ipt = VppIpTable(self, vrf_1)
1235 ipt.add_vpp_config()
1236 self.setup_interfaces(
1237 ipv6=[True, False, False],
1238 ipv4=[False, True, True],
1239 ipv6_table_id=[0, 0, 0],
1240 ipv4_table_id=[0, 0, vrf_1],
1243 # configure FIB entries
1244 # 4.1.1.0/24 is reachable
1245 # via pg1 in table 0 (global)
1246 # and via pg2 in table vrf_1
1247 route0 = VppIpRoute(
1251 [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index, nh_table_id=0)],
1254 route0.add_vpp_config()
1255 route1 = VppIpRoute(
1261 self.pg2.remote_ip4, self.pg2.sw_if_index, nh_table_id=vrf_1
1266 route1.add_vpp_config()
1267 self.logger.debug(self.vapi.cli("show ip fib"))
1269 # configure SRv6 localSID End.DT6 behavior
1271 # fib_table: where the localsid is installed
1272 # sw_if_index: in T-variants of localsid: vrf table_id
1273 localsid = VppSRv6LocalSID(
1276 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT4,
1283 localsid.add_vpp_config()
1285 self.logger.debug(self.vapi.cli("show sr localsid"))
1287 # create IPv6 packets with SRH (SL=0)
1288 # send one packet per packet size
1289 count = len(self.pg_packet_sizes)
1290 dst_inner = "4.1.1.123" # inner header destination address
1293 # packets with SRH, segments-left 0, active segment a3::c4
1294 packet_header = self.create_packet_header_IPv6_SRH_IPv4(
1295 dst_inner, sidlist=["a3::c4", "a2::", "a1::"], segleft=0
1297 # add to traffic stream pg0->pg1
1300 self.pg0, self.pg2, packet_header, self.pg_packet_sizes, count
1304 # packets without SRH, IPv6 in IPv6
1305 # outer IPv6 dest addr is the localsid End.DX4
1306 packet_header = self.create_packet_header_IPv6_IPv4(
1307 dst_inner, dst_outer="a3::c4"
1309 # add to traffic stream pg0->pg1
1312 self.pg0, self.pg2, packet_header, self.pg_packet_sizes, count
1316 # send packets and verify received packets
1317 # using same comparison function as End.DX4
1318 self.send_and_verify_pkts(
1319 self.pg0, pkts, self.pg2, self.compare_rx_tx_packet_End_DX4
1322 # assert nothing was received on the other interface (pg2)
1323 self.pg1.assert_nothing_captured(remark="mis-directed packet(s)")
1325 # log the localsid counters
1326 self.logger.info(self.vapi.cli("show sr localsid"))
1328 # remove SRv6 localSIDs
1329 localsid.remove_vpp_config()
1331 # remove FIB entries
1334 # cleanup interfaces
1335 self.teardown_interfaces()
1337 def test_SRv6_End_DX2(self):
1338 """Test SRv6 End.DX2 behavior."""
1339 # send traffic to one destination interface
1340 # source interface is IPv6 only
1341 self.setup_interfaces(ipv6=[True, False], ipv4=[False, False])
1343 # configure SRv6 localSID End.DX2 behavior
1344 localsid = VppSRv6LocalSID(
1347 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX2,
1350 sw_if_index=self.pg1.sw_if_index,
1354 localsid.add_vpp_config()
1356 self.logger.debug(self.vapi.cli("show sr localsid"))
1358 # send one packet per packet size
1359 count = len(self.pg_packet_sizes)
1362 # packets with SRH, segments-left 0, active segment a3::c4
1363 # L2 has no dot1q header
1364 packet_header = self.create_packet_header_IPv6_SRH_L2(
1365 sidlist=["a3::c4", "a2::", "a1::"], segleft=0, vlan=0
1367 # add to traffic stream pg0->pg1
1370 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1374 # packets with SRH, segments-left 0, active segment a3::c4
1375 # L2 has dot1q header
1376 packet_header = self.create_packet_header_IPv6_SRH_L2(
1377 sidlist=["a3::c4", "a2::", "a1::"], segleft=0, vlan=123
1379 # add to traffic stream pg0->pg1
1382 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1386 # packets without SRH, L2 in IPv6
1387 # outer IPv6 dest addr is the localsid End.DX2
1388 # L2 has no dot1q header
1389 packet_header = self.create_packet_header_IPv6_L2(dst_outer="a3::c4", vlan=0)
1390 # add to traffic stream pg0->pg1
1393 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1397 # packets without SRH, L2 in IPv6
1398 # outer IPv6 dest addr is the localsid End.DX2
1399 # L2 has dot1q header
1400 packet_header = self.create_packet_header_IPv6_L2(dst_outer="a3::c4", vlan=123)
1401 # add to traffic stream pg0->pg1
1404 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1408 # send packets and verify received packets
1409 self.send_and_verify_pkts(
1410 self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_End_DX2
1413 # log the localsid counters
1414 self.logger.info(self.vapi.cli("show sr localsid"))
1416 # remove SRv6 localSIDs
1417 localsid.remove_vpp_config()
1419 # cleanup interfaces
1420 self.teardown_interfaces()
1422 @unittest.skipUnless(0, "PC to fix")
1423 def test_SRv6_T_Insert_Classifier(self):
1424 """Test SRv6 Transit.Insert behavior (IPv6 only).
1425 steer packets using the classifier
1427 # send traffic to one destination interface
1428 # source and destination are IPv6 only
1429 self.setup_interfaces(ipv6=[False, False, False, True, True])
1431 # configure FIB entries
1433 self, "a4::", 64, [VppRoutePath(self.pg4.remote_ip6, self.pg4.sw_if_index)]
1435 route.add_vpp_config()
1437 # configure encaps IPv6 source address
1438 # needs to be done before SR Policy config
1440 self.vapi.cli("set sr encaps source addr a3::")
1443 # configure SRv6 Policy
1444 # Note: segment list order: first -> last
1445 sr_policy = VppSRv6Policy(
1449 sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
1452 segments=["a4::", "a5::", "a6::c7"],
1455 sr_policy.add_vpp_config()
1456 self.sr_policy = sr_policy
1458 # log the sr policies
1459 self.logger.info(self.vapi.cli("show sr policies"))
1461 # add classify table
1462 # mask on dst ip address prefix a7::/8
1463 mask = "{!s:0<16}".format("ff")
1464 r = self.vapi.classify_add_del_table(
1466 binascii.unhexlify(mask),
1467 match_n_vectors=(len(mask) - 1) // 32 + 1,
1468 current_data_flag=1,
1471 self.assertIsNotNone(r, "No response msg for add_del_table")
1472 table_index = r.new_table_index
1474 # add the source routing node as a ip6 inacl netxt node
1475 r = self.vapi.add_node_next("ip6-inacl", "sr-pl-rewrite-insert")
1476 inacl_next_node_index = r.node_index
1478 match = "{!s:0<16}".format("a7")
1479 r = self.vapi.classify_add_del_session(
1482 binascii.unhexlify(match),
1483 hit_next_index=inacl_next_node_index,
1487 self.assertIsNotNone(r, "No response msg for add_del_session")
1489 # log the classify table used in the steering policy
1490 self.logger.info(self.vapi.cli("show classify table"))
1492 r = self.vapi.input_acl_set_interface(
1493 is_add=1, sw_if_index=self.pg3.sw_if_index, ip6_table_index=table_index
1495 self.assertIsNotNone(r, "No response msg for input_acl_set_interface")
1498 self.logger.info(self.vapi.cli("show inacl type ip6"))
1501 count = len(self.pg_packet_sizes)
1502 dst_inner = "a7::1234"
1505 # create IPv6 packets without SRH
1506 packet_header = self.create_packet_header_IPv6(dst_inner)
1507 # create traffic stream pg3->pg4
1510 self.pg3, self.pg4, packet_header, self.pg_packet_sizes, count
1514 # create IPv6 packets with SRH
1515 # packets with segments-left 1, active segment a7::
1516 packet_header = self.create_packet_header_IPv6_SRH(
1517 sidlist=["a8::", "a7::", "a6::"], segleft=1
1519 # create traffic stream pg3->pg4
1522 self.pg3, self.pg4, packet_header, self.pg_packet_sizes, count
1526 # send packets and verify received packets
1527 self.send_and_verify_pkts(
1528 self.pg3, pkts, self.pg4, self.compare_rx_tx_packet_T_Insert
1531 # remove the interface l2 input feature
1532 r = self.vapi.input_acl_set_interface(
1533 is_add=0, sw_if_index=self.pg3.sw_if_index, ip6_table_index=table_index
1535 self.assertIsNotNone(r, "No response msg for input_acl_set_interface")
1537 # log the ip6 inacl after cleaning
1538 self.logger.info(self.vapi.cli("show inacl type ip6"))
1540 # log the localsid counters
1541 self.logger.info(self.vapi.cli("show sr localsid"))
1543 # remove classifier SR steering
1544 # classifier_steering.remove_vpp_config()
1545 self.logger.info(self.vapi.cli("show sr steering policies"))
1547 # remove SR Policies
1548 self.sr_policy.remove_vpp_config()
1549 self.logger.info(self.vapi.cli("show sr policies"))
1551 # remove classify session and table
1552 r = self.vapi.classify_add_del_session(
1553 0, table_index, binascii.unhexlify(match)
1555 self.assertIsNotNone(r, "No response msg for add_del_session")
1557 r = self.vapi.classify_add_del_table(
1558 0, binascii.unhexlify(mask), table_index=table_index
1560 self.assertIsNotNone(r, "No response msg for add_del_table")
1562 self.logger.info(self.vapi.cli("show classify table"))
1564 # remove FIB entries
1567 # cleanup interfaces
1568 self.teardown_interfaces()
1570 def compare_rx_tx_packet_T_Encaps(self, tx_pkt, rx_pkt):
1571 """Compare input and output packet after passing T.Encaps
1573 :param tx_pkt: transmitted packet
1574 :param rx_pkt: received packet
1576 # T.Encaps updates the headers as follows:
1577 # SR Policy seglist (S3, S2, S1)
1578 # SR Policy source C
1581 # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(A, B2)
1583 # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1)
1584 # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(a, B2)SRH(B3, B2, B1; SL=1)
1586 # get first (outer) IPv6 header of rx'ed packet
1587 rx_ip = rx_pkt.getlayer(IPv6)
1590 tx_ip = tx_pkt.getlayer(IPv6)
1592 # expected segment-list
1593 seglist = self.sr_policy.segments
1594 # reverse list to get order as in SRH
1595 tx_seglist = seglist[::-1]
1597 # get source address of SR Policy
1598 sr_policy_source = self.sr_policy.source
1600 # rx'ed packet should have SRH
1601 self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1603 rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1605 # received ip.src should be equal to SR Policy source
1606 self.assertEqual(rx_ip.src, sr_policy_source)
1607 # received ip.dst should be equal to expected sidlist[lastentry]
1608 self.assertEqual(rx_ip.dst, tx_seglist[-1])
1609 # rx'ed seglist should be equal to expected seglist
1610 self.assertEqual(rx_srh.addresses, tx_seglist)
1611 # segleft should be equal to size expected seglist-1
1612 self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
1613 # segleft should be equal to lastentry
1614 self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1616 # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1617 # except for the hop-limit field
1618 # -> update tx'ed hlim to the expected hlim
1619 tx_ip.hlim = tx_ip.hlim - 1
1621 self.assertEqual(rx_srh.payload, tx_ip)
1623 self.logger.debug("packet verification: SUCCESS")
1625 def compare_rx_tx_packet_T_Encaps_IPv4(self, tx_pkt, rx_pkt):
1626 """Compare input and output packet after passing T.Encaps for IPv4
1628 :param tx_pkt: transmitted packet
1629 :param rx_pkt: received packet
1631 # T.Encaps for IPv4 updates the headers as follows:
1632 # SR Policy seglist (S3, S2, S1)
1633 # SR Policy source C
1636 # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv4(A, B2)
1638 # get first (outer) IPv6 header of rx'ed packet
1639 rx_ip = rx_pkt.getlayer(IPv6)
1642 tx_ip = tx_pkt.getlayer(IP)
1644 # expected segment-list
1645 seglist = self.sr_policy.segments
1646 # reverse list to get order as in SRH
1647 tx_seglist = seglist[::-1]
1649 # get source address of SR Policy
1650 sr_policy_source = self.sr_policy.source
1652 # checks common to cases tx with and without SRH
1653 # rx'ed packet should have SRH and IPv4 header
1654 self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1655 self.assertTrue(rx_ip.payload.haslayer(IP))
1657 rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1659 # received ip.src should be equal to SR Policy source
1660 self.assertEqual(rx_ip.src, sr_policy_source)
1661 # received ip.dst should be equal to sidlist[lastentry]
1662 self.assertEqual(rx_ip.dst, tx_seglist[-1])
1663 # rx'ed seglist should be equal to seglist
1664 self.assertEqual(rx_srh.addresses, tx_seglist)
1665 # segleft should be equal to size seglist-1
1666 self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
1667 # segleft should be equal to lastentry
1668 self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1670 # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1671 # except for the ttl field and ip checksum
1672 # -> adjust tx'ed ttl to expected ttl
1673 tx_ip.ttl = tx_ip.ttl - 1
1674 # -> set tx'ed ip checksum to None and let scapy recompute
1676 # read back the pkt (with str()) to force computing these fields
1677 # probably other ways to accomplish this are possible
1678 tx_ip = IP(scapy.compat.raw(tx_ip))
1680 self.assertEqual(rx_srh.payload, tx_ip)
1682 self.logger.debug("packet verification: SUCCESS")
1684 def compare_rx_tx_packet_T_Encaps_L2(self, tx_pkt, rx_pkt):
1685 """Compare input and output packet after passing T.Encaps for L2
1687 :param tx_pkt: transmitted packet
1688 :param rx_pkt: received packet
1690 # T.Encaps for L2 updates the headers as follows:
1691 # SR Policy seglist (S3, S2, S1)
1692 # SR Policy source C
1695 # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)L2
1697 # get first (outer) IPv6 header of rx'ed packet
1698 rx_ip = rx_pkt.getlayer(IPv6)
1701 tx_ether = tx_pkt.getlayer(Ether)
1703 # expected segment-list
1704 seglist = self.sr_policy.segments
1705 # reverse list to get order as in SRH
1706 tx_seglist = seglist[::-1]
1708 # get source address of SR Policy
1709 sr_policy_source = self.sr_policy.source
1711 # rx'ed packet should have SRH
1712 self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1714 rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1716 # received ip.src should be equal to SR Policy source
1717 self.assertEqual(rx_ip.src, sr_policy_source)
1718 # received ip.dst should be equal to sidlist[lastentry]
1719 self.assertEqual(rx_ip.dst, tx_seglist[-1])
1720 # rx'ed seglist should be equal to seglist
1721 self.assertEqual(rx_srh.addresses, tx_seglist)
1722 # segleft should be equal to size seglist-1
1723 self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
1724 # segleft should be equal to lastentry
1725 self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1726 # nh should be "No Next Header" (143)
1727 self.assertEqual(rx_srh.nh, 143)
1729 # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1730 self.assertEqual(Ether(scapy.compat.raw(rx_srh.payload)), tx_ether)
1732 self.logger.debug("packet verification: SUCCESS")
1734 def compare_rx_tx_packet_T_Insert(self, tx_pkt, rx_pkt):
1735 """Compare input and output packet after passing T.Insert
1737 :param tx_pkt: transmitted packet
1738 :param rx_pkt: received packet
1740 # T.Insert updates the headers as follows:
1743 # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)
1745 # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1)
1746 # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)SRH(B3, B2, B1; SL=1)
1748 # get first (outer) IPv6 header of rx'ed packet
1749 rx_ip = rx_pkt.getlayer(IPv6)
1754 rx_udp = rx_pkt[UDP]
1756 tx_ip = tx_pkt.getlayer(IPv6)
1759 # some packets have been tx'ed with an SRH, some without it
1760 # get SRH if tx'ed packet has it
1761 if tx_pkt.haslayer(IPv6ExtHdrSegmentRouting):
1762 tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1763 tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1764 tx_udp = tx_pkt[UDP]
1766 # expected segment-list (make copy of SR Policy segment list)
1767 seglist = self.sr_policy.segments[:]
1768 # expected seglist has initial dest addr as last segment
1769 seglist.append(tx_ip.dst)
1770 # reverse list to get order as in SRH
1771 tx_seglist = seglist[::-1]
1773 # get source address of SR Policy
1774 sr_policy_source = self.sr_policy.source
1776 # checks common to cases tx with and without SRH
1777 # rx'ed packet should have SRH and only one IPv6 header
1778 self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1779 self.assertFalse(rx_ip.payload.haslayer(IPv6))
1781 rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1783 # rx'ed ip.src should be equal to tx'ed ip.src
1784 self.assertEqual(rx_ip.src, tx_ip.src)
1785 # rx'ed ip.dst should be equal to sidlist[lastentry]
1786 self.assertEqual(rx_ip.dst, tx_seglist[-1])
1788 # rx'ed seglist should be equal to expected seglist
1789 self.assertEqual(rx_srh.addresses, tx_seglist)
1790 # segleft should be equal to size(expected seglist)-1
1791 self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
1792 # segleft should be equal to lastentry
1793 self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1795 if tx_srh: # packet was tx'ed with SRH
1796 # packet should have 2nd SRH
1797 self.assertTrue(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
1799 rx_srh2 = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting, 2)
1801 # rx'ed srh2.addresses should be equal to tx'ed srh.addresses
1802 self.assertEqual(rx_srh2.addresses, tx_srh.addresses)
1803 # rx'ed srh2.segleft should be equal to tx'ed srh.segleft
1804 self.assertEqual(rx_srh2.segleft, tx_srh.segleft)
1805 # rx'ed srh2.lastentry should be equal to tx'ed srh.lastentry
1806 self.assertEqual(rx_srh2.lastentry, tx_srh.lastentry)
1808 else: # packet was tx'ed without SRH
1809 # rx packet should have no other SRH
1810 self.assertFalse(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
1812 # UDP layer should be unchanged
1813 self.assertEqual(rx_udp, tx_udp)
1815 self.logger.debug("packet verification: SUCCESS")
1817 def compare_rx_tx_packet_End(self, tx_pkt, rx_pkt):
1818 """Compare input and output packet after passing End (without PSP)
1820 :param tx_pkt: transmitted packet
1821 :param rx_pkt: received packet
1823 # End (no PSP) updates the headers as follows:
1825 # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2)
1826 # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1828 # get first (outer) IPv6 header of rx'ed packet
1829 rx_ip = rx_pkt.getlayer(IPv6)
1832 rx_udp = rx_pkt[UDP]
1834 tx_ip = tx_pkt.getlayer(IPv6)
1835 # we know the packet has been tx'ed
1836 # with an inner IPv6 header and an SRH
1837 tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1838 tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1839 tx_udp = tx_pkt[UDP]
1841 # common checks, regardless of tx segleft value
1842 # rx'ed packet should have 2nd IPv6 header
1843 self.assertTrue(rx_ip.payload.haslayer(IPv6))
1844 # get second (inner) IPv6 header
1845 rx_ip2 = rx_pkt.getlayer(IPv6, 2)
1847 if tx_ip.segleft > 0:
1848 # SRH should NOT have been popped:
1849 # End SID without PSP does not pop SRH if segleft>0
1850 self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1851 rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1853 # received ip.src should be equal to expected ip.src
1854 self.assertEqual(rx_ip.src, tx_ip.src)
1855 # sidlist should be unchanged
1856 self.assertEqual(rx_srh.addresses, tx_srh.addresses)
1857 # segleft should have been decremented
1858 self.assertEqual(rx_srh.segleft, tx_srh.segleft - 1)
1859 # received ip.dst should be equal to sidlist[segleft]
1860 self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
1861 # lastentry should be unchanged
1862 self.assertEqual(rx_srh.lastentry, tx_srh.lastentry)
1863 # inner IPv6 packet (ip2) should be unchanged
1864 self.assertEqual(rx_ip2.src, tx_ip2.src)
1865 self.assertEqual(rx_ip2.dst, tx_ip2.dst)
1866 # else: # tx_ip.segleft == 0
1867 # TODO: Does this work with 2 SRHs in ingress packet?
1869 # UDP layer should be unchanged
1870 self.assertEqual(rx_udp, tx_udp)
1872 self.logger.debug("packet verification: SUCCESS")
1874 def compare_rx_tx_packet_End_PSP(self, tx_pkt, rx_pkt):
1875 """Compare input and output packet after passing End with PSP
1877 :param tx_pkt: transmitted packet
1878 :param rx_pkt: received packet
1880 # End (PSP) updates the headers as follows:
1881 # IPv6 + SRH (SL>1):
1882 # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2)
1883 # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1884 # IPv6 + SRH (SL=1):
1885 # in: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1888 # get first (outer) IPv6 header of rx'ed packet
1889 rx_ip = rx_pkt.getlayer(IPv6)
1892 rx_udp = rx_pkt[UDP]
1894 tx_ip = tx_pkt.getlayer(IPv6)
1895 # we know the packet has been tx'ed
1896 # with an inner IPv6 header and an SRH
1897 tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1898 tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1899 tx_udp = tx_pkt[UDP]
1901 # common checks, regardless of tx segleft value
1902 self.assertTrue(rx_ip.payload.haslayer(IPv6))
1903 rx_ip2 = rx_pkt.getlayer(IPv6, 2)
1904 # inner IPv6 packet (ip2) should be unchanged
1905 self.assertEqual(rx_ip2.src, tx_ip2.src)
1906 self.assertEqual(rx_ip2.dst, tx_ip2.dst)
1908 if tx_ip.segleft > 1:
1909 # SRH should NOT have been popped:
1910 # End SID with PSP does not pop SRH if segleft>1
1911 # rx'ed packet should have SRH
1912 self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1913 rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1915 # received ip.src should be equal to expected ip.src
1916 self.assertEqual(rx_ip.src, tx_ip.src)
1917 # sidlist should be unchanged
1918 self.assertEqual(rx_srh.addresses, tx_srh.addresses)
1919 # segleft should have been decremented
1920 self.assertEqual(rx_srh.segleft, tx_srh.segleft - 1)
1921 # received ip.dst should be equal to sidlist[segleft]
1922 self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
1923 # lastentry should be unchanged
1924 self.assertEqual(rx_srh.lastentry, tx_srh.lastentry)
1926 else: # tx_ip.segleft <= 1
1927 # SRH should have been popped:
1928 # End SID with PSP and segleft=1 pops SRH
1929 # the two IPv6 headers are still present
1930 # outer IPv6 header has DA == last segment of popped SRH
1931 # SRH should not be present
1932 self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1933 # outer IPv6 header ip.src should be equal to tx'ed ip.src
1934 self.assertEqual(rx_ip.src, tx_ip.src)
1935 # outer IPv6 header ip.dst should be = to tx'ed sidlist[segleft-1]
1936 self.assertEqual(rx_ip.dst, tx_srh.addresses[tx_srh.segleft - 1])
1938 # UDP layer should be unchanged
1939 self.assertEqual(rx_udp, tx_udp)
1941 self.logger.debug("packet verification: SUCCESS")
1943 def compare_rx_tx_packet_End_DX6(self, tx_pkt, rx_pkt):
1944 """Compare input and output packet after passing End.DX6
1946 :param tx_pkt: transmitted packet
1947 :param rx_pkt: received packet
1949 # End.DX6 updates the headers as follows:
1950 # IPv6 + SRH (SL=0):
1951 # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv6(B, D)
1954 # in: IPv6(A, S3)IPv6(B, D)
1957 # get first (outer) IPv6 header of rx'ed packet
1958 rx_ip = rx_pkt.getlayer(IPv6)
1960 tx_ip = tx_pkt.getlayer(IPv6)
1961 tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1963 # verify if rx'ed packet has no SRH
1964 self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1966 # the whole rx_ip pkt should be equal to tx_ip2
1967 # except for the hlim field
1968 # -> adjust tx'ed hlim to expected hlim
1969 tx_ip2.hlim = tx_ip2.hlim - 1
1971 self.assertEqual(rx_ip, tx_ip2)
1973 self.logger.debug("packet verification: SUCCESS")
1975 def compare_rx_tx_packet_End_DX4(self, tx_pkt, rx_pkt):
1976 """Compare input and output packet after passing End.DX4
1978 :param tx_pkt: transmitted packet
1979 :param rx_pkt: received packet
1981 # End.DX4 updates the headers as follows:
1982 # IPv6 + SRH (SL=0):
1983 # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv4(B, D)
1986 # in: IPv6(A, S3)IPv4(B, D)
1989 # get IPv4 header of rx'ed packet
1990 rx_ip = rx_pkt.getlayer(IP)
1992 tx_ip = tx_pkt.getlayer(IPv6)
1993 tx_ip2 = tx_pkt.getlayer(IP)
1995 # verify if rx'ed packet has no SRH
1996 self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1998 # the whole rx_ip pkt should be equal to tx_ip2
1999 # except for the ttl field and ip checksum
2000 # -> adjust tx'ed ttl to expected ttl
2001 tx_ip2.ttl = tx_ip2.ttl - 1
2002 # -> set tx'ed ip checksum to None and let scapy recompute
2003 tx_ip2.chksum = None
2004 # read back the pkt (with str()) to force computing these fields
2005 # probably other ways to accomplish this are possible
2006 tx_ip2 = IP(scapy.compat.raw(tx_ip2))
2008 self.assertEqual(rx_ip, tx_ip2)
2010 self.logger.debug("packet verification: SUCCESS")
2012 def compare_rx_tx_packet_End_DX2(self, tx_pkt, rx_pkt):
2013 """Compare input and output packet after passing End.DX2
2015 :param tx_pkt: transmitted packet
2016 :param rx_pkt: received packet
2018 # End.DX2 updates the headers as follows:
2019 # IPv6 + SRH (SL=0):
2020 # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)L2
2026 # get IPv4 header of rx'ed packet
2027 rx_eth = rx_pkt.getlayer(Ether)
2029 tx_ip = tx_pkt.getlayer(IPv6)
2030 # we can't just get the 2nd Ether layer
2031 # get the Raw content and dissect it as Ether
2032 tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw]))
2034 # verify if rx'ed packet has no SRH
2035 self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
2037 # the whole rx_eth pkt should be equal to tx_eth1
2038 self.assertEqual(rx_eth, tx_eth1)
2040 self.logger.debug("packet verification: SUCCESS")
2042 def create_stream(self, src_if, dst_if, packet_header, packet_sizes, count):
2043 """Create SRv6 input packet stream for defined interface.
2045 :param VppInterface src_if: Interface to create packet stream for
2046 :param VppInterface dst_if: destination interface of packet stream
2047 :param packet_header: Layer3 scapy packet headers,
2048 L2 is added when not provided,
2049 Raw(payload) with packet_info is added
2050 :param list packet_sizes: packet stream pckt sizes,sequentially applied
2051 to packets in stream have
2052 :param int count: number of packets in packet stream
2053 :return: list of packets
2055 self.logger.info("Creating packets")
2057 for i in range(0, count - 1):
2058 payload_info = self.create_packet_info(src_if, dst_if)
2059 self.logger.debug("Creating packet with index %d" % (payload_info.index))
2060 payload = self.info_to_payload(payload_info)
2061 # add L2 header if not yet provided in packet_header
2062 if packet_header.getlayer(0).name == "Ethernet":
2063 p = packet_header / Raw(payload)
2066 Ether(dst=src_if.local_mac, src=src_if.remote_mac)
2070 size = packet_sizes[i % len(packet_sizes)]
2071 self.logger.debug("Packet size %d" % (size))
2072 self.extend_packet(p, size)
2073 # we need to store the packet with the automatic fields computed
2074 # read back the dumped packet (with str())
2075 # to force computing these fields
2076 # probably other ways are possible
2077 p = Ether(scapy.compat.raw(p))
2078 payload_info.data = p.copy()
2079 self.logger.debug(ppp("Created packet:", p))
2081 self.logger.info("Done creating packets")
2084 def send_and_verify_pkts(
2085 self, input, pkts, output, compare_func, expected_count=None
2087 """Send packets and verify received packets using compare_func
2089 :param input: ingress interface of DUT
2090 :param pkts: list of packets to transmit
2091 :param output: egress interface of DUT
2092 :param compare_func: function to compare in and out packets
2093 :param expected_count: expected number of captured packets (if
2094 different than len(pkts))
2096 # add traffic stream to input interface
2097 input.add_stream(pkts)
2099 # enable capture on all interfaces
2100 self.pg_enable_capture(self.pg_interfaces)
2103 self.logger.info("Starting traffic")
2106 # get output capture
2107 self.logger.info("Getting packet capture")
2108 capture = output.get_capture(expected_count=expected_count)
2110 # assert nothing was captured on input interface
2111 input.assert_nothing_captured()
2113 # verify captured packets
2114 self.verify_captured_pkts(output, capture, compare_func)
2116 def create_packet_header_IPv6(self, dst):
2117 """Create packet header: IPv6 header, UDP header
2119 :param dst: IPv6 destination address
2121 IPv6 source address is 1234::1
2122 UDP source port and destination port are 1234
2125 p = IPv6(src="1234::1", dst=dst) / UDP(sport=1234, dport=1234)
2128 def create_packet_header_IPv6_SRH(self, sidlist, segleft):
2129 """Create packet header: IPv6 header with SRH, UDP header
2131 :param list sidlist: segment list
2132 :param int segleft: segments-left field value
2134 IPv6 destination address is set to sidlist[segleft]
2135 IPv6 source addresses are 1234::1 and 4321::1
2136 UDP source port and destination port are 1234
2140 IPv6(src="1234::1", dst=sidlist[segleft])
2141 / IPv6ExtHdrSegmentRouting(addresses=sidlist)
2142 / UDP(sport=1234, dport=1234)
2146 def create_packet_header_IPv6_SRH_IPv6(self, dst, sidlist, segleft):
2147 """Create packet header: IPv6 encapsulated in SRv6:
2148 IPv6 header with SRH, IPv6 header, UDP header
2150 :param ipv6address dst: inner IPv6 destination address
2151 :param list sidlist: segment list of outer IPv6 SRH
2152 :param int segleft: segments-left field of outer IPv6 SRH
2154 Outer IPv6 destination address is set to sidlist[segleft]
2155 IPv6 source addresses are 1234::1 and 4321::1
2156 UDP source port and destination port are 1234
2160 IPv6(src="1234::1", dst=sidlist[segleft])
2161 / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=41)
2162 / IPv6(src="4321::1", dst=dst)
2163 / UDP(sport=1234, dport=1234)
2167 def create_packet_header_IPv6_IPv6(self, dst_inner, dst_outer):
2168 """Create packet header: IPv6 encapsulated in IPv6:
2169 IPv6 header, IPv6 header, UDP header
2171 :param ipv6address dst_inner: inner IPv6 destination address
2172 :param ipv6address dst_outer: outer IPv6 destination address
2174 IPv6 source addresses are 1234::1 and 4321::1
2175 UDP source port and destination port are 1234
2179 IPv6(src="1234::1", dst=dst_outer)
2180 / IPv6(src="4321::1", dst=dst_inner)
2181 / UDP(sport=1234, dport=1234)
2185 def create_packet_header_IPv6_SRH_SRH_IPv6(
2186 self, dst, sidlist1, segleft1, sidlist2, segleft2
2188 """Create packet header: IPv6 encapsulated in SRv6 with 2 SRH:
2189 IPv6 header with SRH, 2nd SRH, IPv6 header, UDP header
2191 :param ipv6address dst: inner IPv6 destination address
2192 :param list sidlist1: segment list of outer IPv6 SRH
2193 :param int segleft1: segments-left field of outer IPv6 SRH
2194 :param list sidlist2: segment list of inner IPv6 SRH
2195 :param int segleft2: segments-left field of inner IPv6 SRH
2197 Outer IPv6 destination address is set to sidlist[segleft]
2198 IPv6 source addresses are 1234::1 and 4321::1
2199 UDP source port and destination port are 1234
2203 IPv6(src="1234::1", dst=sidlist1[segleft1])
2204 / IPv6ExtHdrSegmentRouting(addresses=sidlist1, segleft=segleft1, nh=43)
2205 / IPv6ExtHdrSegmentRouting(addresses=sidlist2, segleft=segleft2, nh=41)
2206 / IPv6(src="4321::1", dst=dst)
2207 / UDP(sport=1234, dport=1234)
2211 def create_packet_header_IPv4(self, dst):
2212 """Create packet header: IPv4 header, UDP header
2214 :param dst: IPv4 destination address
2216 IPv4 source address is 123.1.1.1
2217 UDP source port and destination port are 1234
2220 p = IP(src="123.1.1.1", dst=dst) / UDP(sport=1234, dport=1234)
2223 def create_packet_header_IPv6_IPv4(self, dst_inner, dst_outer):
2224 """Create packet header: IPv4 encapsulated in IPv6:
2225 IPv6 header, IPv4 header, UDP header
2227 :param ipv4address dst_inner: inner IPv4 destination address
2228 :param ipv6address dst_outer: outer IPv6 destination address
2230 IPv6 source address is 1234::1
2231 IPv4 source address is 123.1.1.1
2232 UDP source port and destination port are 1234
2236 IPv6(src="1234::1", dst=dst_outer)
2237 / IP(src="123.1.1.1", dst=dst_inner)
2238 / UDP(sport=1234, dport=1234)
2242 def create_packet_header_IPv6_SRH_IPv4(self, dst, sidlist, segleft):
2243 """Create packet header: IPv4 encapsulated in SRv6:
2244 IPv6 header with SRH, IPv4 header, UDP header
2246 :param ipv4address dst: inner IPv4 destination address
2247 :param list sidlist: segment list of outer IPv6 SRH
2248 :param int segleft: segments-left field of outer IPv6 SRH
2250 Outer IPv6 destination address is set to sidlist[segleft]
2251 IPv6 source address is 1234::1
2252 IPv4 source address is 123.1.1.1
2253 UDP source port and destination port are 1234
2257 IPv6(src="1234::1", dst=sidlist[segleft])
2258 / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=4)
2259 / IP(src="123.1.1.1", dst=dst)
2260 / UDP(sport=1234, dport=1234)
2264 def create_packet_header_L2(self, vlan=0):
2265 """Create packet header: L2 header
2267 :param vlan: if vlan!=0 then add 802.1q header
2269 # Note: the dst addr ('00:55:44:33:22:11') is used in
2270 # the compare function compare_rx_tx_packet_T_Encaps_L2
2271 # to detect presence of L2 in SRH payload
2272 p = Ether(src="00:11:22:33:44:55", dst="00:55:44:33:22:11")
2273 etype = 0x8137 # IPX
2276 p /= Dot1Q(vlan=vlan, type=etype)
2281 def create_packet_header_IPv6_SRH_L2(self, sidlist, segleft, vlan=0):
2282 """Create packet header: L2 encapsulated in SRv6:
2283 IPv6 header with SRH, L2
2285 :param list sidlist: segment list of outer IPv6 SRH
2286 :param int segleft: segments-left field of outer IPv6 SRH
2287 :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
2289 Outer IPv6 destination address is set to sidlist[segleft]
2290 IPv6 source address is 1234::1
2292 eth = Ether(src="00:11:22:33:44:55", dst="00:55:44:33:22:11")
2293 etype = 0x8137 # IPX
2296 eth /= Dot1Q(vlan=vlan, type=etype)
2301 IPv6(src="1234::1", dst=sidlist[segleft])
2302 / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=143)
2307 def create_packet_header_IPv6_L2(self, dst_outer, vlan=0):
2308 """Create packet header: L2 encapsulated in IPv6:
2311 :param ipv6address dst_outer: outer IPv6 destination address
2312 :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
2314 eth = Ether(src="00:11:22:33:44:55", dst="00:55:44:33:22:11")
2315 etype = 0x8137 # IPX
2318 eth /= Dot1Q(vlan=vlan, type=etype)
2322 p = IPv6(src="1234::1", dst=dst_outer, nh=143) / eth
2325 def get_payload_info(self, packet):
2326 """Extract the payload_info from the packet"""
2327 # in most cases, payload_info is in packet[Raw]
2328 # but packet[Raw] gives the complete payload
2329 # (incl L2 header) for the T.Encaps L2 case
2331 payload_info = self.payload_to_info(packet[Raw])
2334 # remote L2 header from packet[Raw]:
2335 # take packet[Raw], convert it to an Ether layer
2336 # and then extract Raw from it
2337 payload_info = self.payload_to_info(Ether(scapy.compat.r(packet[Raw]))[Raw])
2341 def verify_captured_pkts(self, dst_if, capture, compare_func):
2343 Verify captured packet stream for specified interface.
2344 Compare ingress with egress packets using the specified compare fn
2346 :param dst_if: egress interface of DUT
2347 :param capture: captured packets
2348 :param compare_func: function to compare in and out packet
2351 "Verifying capture on interface %s using function %s"
2352 % (dst_if.name, compare_func.__name__)
2356 for i in self.pg_interfaces:
2357 last_info[i.sw_if_index] = None
2358 dst_sw_if_index = dst_if.sw_if_index
2360 for packet in capture:
2362 # extract payload_info from packet's payload
2363 payload_info = self.get_payload_info(packet)
2364 packet_index = payload_info.index
2366 self.logger.debug("Verifying packet with index %d" % (packet_index))
2367 # packet should have arrived on the expected interface
2368 self.assertEqual(payload_info.dst, dst_sw_if_index)
2370 "Got packet on interface %s: src=%u (idx=%u)"
2371 % (dst_if.name, payload_info.src, packet_index)
2374 # search for payload_info with same src and dst if_index
2375 # this will give us the transmitted packet
2376 next_info = self.get_next_packet_info_for_interface2(
2377 payload_info.src, dst_sw_if_index, last_info[payload_info.src]
2379 last_info[payload_info.src] = next_info
2380 # next_info should not be None
2381 self.assertTrue(next_info is not None)
2382 # index of tx and rx packets should be equal
2383 self.assertEqual(packet_index, next_info.index)
2384 # data field of next_info contains the tx packet
2385 txed_packet = next_info.data
2388 ppp("Transmitted packet:", txed_packet)
2389 ) # ppp=Pretty Print Packet
2391 self.logger.debug(ppp("Received packet:", packet))
2393 # compare rcvd packet with expected packet using compare_func
2394 compare_func(txed_packet, packet)
2397 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2400 # FIXME: there is no need to check manually that all the packets
2401 # arrived (already done so by get_capture); checking here
2402 # prevents testing packets that are expected to be dropped, so
2403 # commenting this out for now
2405 # have all expected packets arrived?
2406 # for i in self.pg_interfaces:
2407 # remaining_packet = self.get_next_packet_info_for_interface2(
2408 # i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
2409 # self.assertTrue(remaining_packet is None,
2410 # "Interface %s: Packet expected from interface %s "
2411 # "didn't arrive" % (dst_if.name, i.name))
2414 if __name__ == "__main__":
2415 unittest.main(testRunner=VppTestRunner)