5 from socket import AF_INET6
7 from framework import VppTestCase, VppTestRunner
8 from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathProto, VppIpTable
10 SRv6LocalSIDBehaviors,
15 SRv6PolicySteeringTypes,
19 from scapy.packet import Raw
20 from scapy.layers.l2 import Ether, Dot1Q
21 from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
22 from scapy.layers.inet import IP, UDP
27 class TestSRv6(VppTestCase):
32 super(TestSRv6, cls).setUpClass()
35 def tearDownClass(cls):
36 super(TestSRv6, cls).tearDownClass()
39 """Perform test setup before each test case."""
40 super(TestSRv6, self).setUp()
42 # packet sizes, inclusive L2 overhead
43 self.pg_packet_sizes = [64, 512, 1518, 9018]
46 self.reset_packet_infos()
49 """Clean up test setup after each test case."""
50 self.teardown_interfaces()
52 super(TestSRv6, self).tearDown()
54 def configure_interface(
55 self, interface, ipv6=False, ipv4=False, ipv6_table_id=0, ipv4_table_id=0
57 """Configure interface.
58 :param ipv6: configure IPv6 on interface
59 :param ipv4: configure IPv4 on interface
60 :param ipv6_table_id: FIB table_id for IPv6
61 :param ipv4_table_id: FIB table_id for IPv4
63 self.logger.debug("Configuring interface %s" % (interface.name))
65 self.logger.debug("Configuring IPv6")
66 interface.set_table_ip6(ipv6_table_id)
67 interface.config_ip6()
68 interface.resolve_ndp(timeout=5)
70 self.logger.debug("Configuring IPv4")
71 interface.set_table_ip4(ipv4_table_id)
72 interface.config_ip4()
73 interface.resolve_arp()
76 def setup_interfaces(self, ipv6=[], ipv4=[], ipv6_table_id=[], ipv4_table_id=[]):
77 """Create and configure interfaces.
79 :param ipv6: list of interface IPv6 capabilities
80 :param ipv4: list of interface IPv4 capabilities
81 :param ipv6_table_id: list of intf IPv6 FIB table_ids
82 :param ipv4_table_id: list of intf IPv4 FIB table_ids
83 :returns: List of created interfaces.
85 # how many interfaces?
90 self.logger.debug("Creating and configuring %d interfaces" % (count))
92 # fill up ipv6 and ipv4 lists if needed
93 # not enabled (False) is the default
95 ipv6 += (count - len(ipv6)) * [False]
97 ipv4 += (count - len(ipv4)) * [False]
99 # fill up table_id lists if needed
100 # table_id 0 (global) is the default
101 if len(ipv6_table_id) < count:
102 ipv6_table_id += (count - len(ipv6_table_id)) * [0]
103 if len(ipv4_table_id) < count:
104 ipv4_table_id += (count - len(ipv4_table_id)) * [0]
106 # create 'count' pg interfaces
107 self.create_pg_interfaces(range(count))
109 # setup all interfaces
110 for i in range(count):
111 intf = self.pg_interfaces[i]
112 self.configure_interface(
113 intf, ipv6[i], ipv4[i], ipv6_table_id[i], ipv4_table_id[i]
117 self.logger.debug(self.vapi.cli("show ip6 neighbors"))
119 self.logger.debug(self.vapi.cli("show ip4 neighbors"))
120 self.logger.debug(self.vapi.cli("show interface"))
121 self.logger.debug(self.vapi.cli("show hardware"))
123 return self.pg_interfaces
125 def teardown_interfaces(self):
126 """Unconfigure and bring down interface."""
127 self.logger.debug("Tearing down interfaces")
128 # tear down all interfaces
129 # AFAIK they cannot be deleted
130 for i in self.pg_interfaces:
131 self.logger.debug("Tear down interface %s" % (i.name))
137 @unittest.skipUnless(0, "PC to fix")
138 def test_SRv6_T_Encaps(self):
139 """Test SRv6 Transit.Encaps behavior for IPv6."""
140 # send traffic to one destination interface
141 # source and destination are IPv6 only
142 self.setup_interfaces(ipv6=[True, True])
144 # configure FIB entries
146 self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
148 route.add_vpp_config()
150 # configure encaps IPv6 source address
151 # needs to be done before SR Policy config
153 self.vapi.cli("set sr encaps source addr a3::")
156 # configure SRv6 Policy
157 # Note: segment list order: first -> last
158 sr_policy = VppSRv6Policy(
162 sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
165 segments=["a4::", "a5::", "a6::c7"],
168 sr_policy.add_vpp_config()
169 self.sr_policy = sr_policy
171 # log the sr policies
172 self.logger.info(self.vapi.cli("show sr policies"))
174 # steer IPv6 traffic to a7::/64 into SRv6 Policy
175 # use the bsid of the above self.sr_policy
176 pol_steering = VppSRv6Steering(
178 bsid=self.sr_policy.bsid,
181 traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
186 pol_steering.add_vpp_config()
188 # log the sr steering policies
189 self.logger.info(self.vapi.cli("show sr steering policies"))
192 count = len(self.pg_packet_sizes)
193 dst_inner = "a7::1234"
196 # create IPv6 packets without SRH
197 packet_header = self.create_packet_header_IPv6(dst_inner)
198 # create traffic stream pg0->pg1
201 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
205 # create IPv6 packets with SRH
206 # packets with segments-left 1, active segment a7::
207 packet_header = self.create_packet_header_IPv6_SRH(
208 sidlist=["a8::", "a7::", "a6::"], segleft=1
210 # create traffic stream pg0->pg1
213 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
217 # create IPv6 packets with SRH and IPv6
218 # packets with segments-left 1, active segment a7::
219 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
220 dst_inner, sidlist=["a8::", "a7::", "a6::"], segleft=1
222 # create traffic stream pg0->pg1
225 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
229 # send packets and verify received packets
230 self.send_and_verify_pkts(
231 self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_T_Encaps
234 # log the localsid counters
235 self.logger.info(self.vapi.cli("show sr localsid"))
238 pol_steering.remove_vpp_config()
239 self.logger.info(self.vapi.cli("show sr steering policies"))
242 self.sr_policy.remove_vpp_config()
243 self.logger.info(self.vapi.cli("show sr policies"))
249 self.teardown_interfaces()
251 @unittest.skipUnless(0, "PC to fix")
252 def test_SRv6_T_Insert(self):
253 """Test SRv6 Transit.Insert behavior (IPv6 only)."""
254 # send traffic to one destination interface
255 # source and destination are IPv6 only
256 self.setup_interfaces(ipv6=[True, True])
258 # configure FIB entries
260 self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
262 route.add_vpp_config()
264 # configure encaps IPv6 source address
265 # needs to be done before SR Policy config
267 self.vapi.cli("set sr encaps source addr a3::")
270 # configure SRv6 Policy
271 # Note: segment list order: first -> last
272 sr_policy = VppSRv6Policy(
276 sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
279 segments=["a4::", "a5::", "a6::c7"],
282 sr_policy.add_vpp_config()
283 self.sr_policy = sr_policy
285 # log the sr policies
286 self.logger.info(self.vapi.cli("show sr policies"))
288 # steer IPv6 traffic to a7::/64 into SRv6 Policy
289 # use the bsid of the above self.sr_policy
290 pol_steering = VppSRv6Steering(
292 bsid=self.sr_policy.bsid,
295 traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
300 pol_steering.add_vpp_config()
302 # log the sr steering policies
303 self.logger.info(self.vapi.cli("show sr steering policies"))
306 count = len(self.pg_packet_sizes)
307 dst_inner = "a7::1234"
310 # create IPv6 packets without SRH
311 packet_header = self.create_packet_header_IPv6(dst_inner)
312 # create traffic stream pg0->pg1
315 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
319 # create IPv6 packets with SRH
320 # packets with segments-left 1, active segment a7::
321 packet_header = self.create_packet_header_IPv6_SRH(
322 sidlist=["a8::", "a7::", "a6::"], segleft=1
324 # create traffic stream pg0->pg1
327 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
331 # send packets and verify received packets
332 self.send_and_verify_pkts(
333 self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_T_Insert
336 # log the localsid counters
337 self.logger.info(self.vapi.cli("show sr localsid"))
340 pol_steering.remove_vpp_config()
341 self.logger.info(self.vapi.cli("show sr steering policies"))
344 self.sr_policy.remove_vpp_config()
345 self.logger.info(self.vapi.cli("show sr policies"))
351 self.teardown_interfaces()
353 @unittest.skipUnless(0, "PC to fix")
354 def test_SRv6_T_Encaps_IPv4(self):
355 """Test SRv6 Transit.Encaps behavior for IPv4."""
356 # send traffic to one destination interface
357 # source interface is IPv4 only
358 # destination interface is IPv6 only
359 self.setup_interfaces(ipv6=[False, True], ipv4=[True, False])
361 # configure FIB entries
363 self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
365 route.add_vpp_config()
367 # configure encaps IPv6 source address
368 # needs to be done before SR Policy config
370 self.vapi.cli("set sr encaps source addr a3::")
373 # configure SRv6 Policy
374 # Note: segment list order: first -> last
375 sr_policy = VppSRv6Policy(
379 sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
382 segments=["a4::", "a5::", "a6::c7"],
385 sr_policy.add_vpp_config()
386 self.sr_policy = sr_policy
388 # log the sr policies
389 self.logger.info(self.vapi.cli("show sr policies"))
391 # steer IPv4 traffic to 7.1.1.0/24 into SRv6 Policy
392 # use the bsid of the above self.sr_policy
393 pol_steering = VppSRv6Steering(
395 bsid=self.sr_policy.bsid,
398 traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV4,
403 pol_steering.add_vpp_config()
405 # log the sr steering policies
406 self.logger.info(self.vapi.cli("show sr steering policies"))
409 count = len(self.pg_packet_sizes)
410 dst_inner = "7.1.1.123"
413 # create IPv4 packets
414 packet_header = self.create_packet_header_IPv4(dst_inner)
415 # create traffic stream pg0->pg1
418 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
422 # send packets and verify received packets
423 self.send_and_verify_pkts(
424 self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_T_Encaps_IPv4
427 # log the localsid counters
428 self.logger.info(self.vapi.cli("show sr localsid"))
431 pol_steering.remove_vpp_config()
432 self.logger.info(self.vapi.cli("show sr steering policies"))
435 self.sr_policy.remove_vpp_config()
436 self.logger.info(self.vapi.cli("show sr policies"))
442 self.teardown_interfaces()
444 @unittest.skip("VPP crashes after running this test")
445 def test_SRv6_T_Encaps_L2(self):
446 """Test SRv6 Transit.Encaps behavior for L2."""
447 # send traffic to one destination interface
448 # source interface is IPv4 only TODO?
449 # destination interface is IPv6 only
450 self.setup_interfaces(ipv6=[False, True], ipv4=[False, False])
452 # configure FIB entries
454 self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
456 route.add_vpp_config()
458 # configure encaps IPv6 source address
459 # needs to be done before SR Policy config
461 self.vapi.cli("set sr encaps source addr a3::")
464 # configure SRv6 Policy
465 # Note: segment list order: first -> last
466 sr_policy = VppSRv6Policy(
470 sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
473 segments=["a4::", "a5::", "a6::c7"],
476 sr_policy.add_vpp_config()
477 self.sr_policy = sr_policy
479 # log the sr policies
480 self.logger.info(self.vapi.cli("show sr policies"))
482 # steer L2 traffic into SRv6 Policy
483 # use the bsid of the above self.sr_policy
484 pol_steering = VppSRv6Steering(
486 bsid=self.sr_policy.bsid,
489 traffic_type=SRv6PolicySteeringTypes.SR_STEER_L2,
492 sw_if_index=self.pg0.sw_if_index,
494 pol_steering.add_vpp_config()
496 # log the sr steering policies
497 self.logger.info(self.vapi.cli("show sr steering policies"))
500 count = len(self.pg_packet_sizes)
503 # create L2 packets without dot1q header
504 packet_header = self.create_packet_header_L2()
505 # create traffic stream pg0->pg1
508 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
512 # create L2 packets with dot1q header
513 packet_header = self.create_packet_header_L2(vlan=123)
514 # create traffic stream pg0->pg1
517 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
521 # send packets and verify received packets
522 self.send_and_verify_pkts(
523 self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_T_Encaps_L2
526 # log the localsid counters
527 self.logger.info(self.vapi.cli("show sr localsid"))
530 pol_steering.remove_vpp_config()
531 self.logger.info(self.vapi.cli("show sr steering policies"))
534 self.sr_policy.remove_vpp_config()
535 self.logger.info(self.vapi.cli("show sr policies"))
541 self.teardown_interfaces()
543 def test_SRv6_End(self):
544 """Test SRv6 End (without PSP) behavior."""
545 # send traffic to one destination interface
546 # source and destination interfaces are IPv6 only
547 self.setup_interfaces(ipv6=[True, True])
549 # configure FIB entries
551 self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
553 route.add_vpp_config()
555 # configure SRv6 localSID End without PSP behavior
556 localsid = VppSRv6LocalSID(
559 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
566 localsid.add_vpp_config()
568 self.logger.debug(self.vapi.cli("show sr localsid"))
570 # create IPv6 packets with SRH (SL=2, SL=1, SL=0)
571 # send one packet per SL value per packet size
572 # SL=0 packet with localSID End with USP needs 2nd SRH
573 count = len(self.pg_packet_sizes)
574 dst_inner = "a4::1234"
577 # packets with segments-left 2, active segment a3::
578 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
579 dst_inner, sidlist=["a5::", "a4::", "a3::"], segleft=2
581 # create traffic stream pg0->pg1
584 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
588 # packets with segments-left 1, active segment a3::
589 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
590 dst_inner, sidlist=["a4::", "a3::", "a2::"], segleft=1
592 # add to traffic stream pg0->pg1
595 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
599 # TODO: test behavior with SL=0 packet (needs 2*SRH?)
601 expected_count = len(pkts)
603 # packets without SRH (should not crash)
604 packet_header = self.create_packet_header_IPv6("a3::")
605 # create traffic stream pg0->pg1
608 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
612 # send packets and verify received packets
613 self.send_and_verify_pkts(
617 self.compare_rx_tx_packet_End,
618 expected_count=expected_count,
621 # log the localsid counters
622 self.logger.info(self.vapi.cli("show sr localsid"))
624 # remove SRv6 localSIDs
625 localsid.remove_vpp_config()
631 self.teardown_interfaces()
633 def test_SRv6_End_with_PSP(self):
634 """Test SRv6 End with PSP behavior."""
635 # send traffic to one destination interface
636 # source and destination interfaces are IPv6 only
637 self.setup_interfaces(ipv6=[True, True])
639 # configure FIB entries
641 self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
643 route.add_vpp_config()
645 # configure SRv6 localSID End with PSP behavior
646 localsid = VppSRv6LocalSID(
649 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
656 localsid.add_vpp_config()
658 self.logger.debug(self.vapi.cli("show sr localsid"))
660 # create IPv6 packets with SRH (SL=2, SL=1)
661 # send one packet per SL value per packet size
662 # SL=0 packet with localSID End with PSP is dropped
663 count = len(self.pg_packet_sizes)
664 dst_inner = "a4::1234"
667 # packets with segments-left 2, active segment a3::
668 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
669 dst_inner, sidlist=["a5::", "a4::", "a3::"], segleft=2
671 # create traffic stream pg0->pg1
674 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
678 # packets with segments-left 1, active segment a3::
679 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
680 dst_inner, sidlist=["a4::", "a3::", "a2::"], segleft=1
682 # add to traffic stream pg0->pg1
685 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
689 # send packets and verify received packets
690 self.send_and_verify_pkts(
691 self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_End_PSP
694 # log the localsid counters
695 self.logger.info(self.vapi.cli("show sr localsid"))
697 # remove SRv6 localSIDs
698 localsid.remove_vpp_config()
704 self.teardown_interfaces()
706 def test_SRv6_End_X(self):
707 """Test SRv6 End.X (without PSP) behavior."""
708 # create three interfaces (1 source, 2 destinations)
709 # source and destination interfaces are IPv6 only
710 self.setup_interfaces(ipv6=[True, True, True])
712 # configure FIB entries
713 # a4::/64 via pg1 and pg2
719 VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index),
720 VppRoutePath(self.pg2.remote_ip6, self.pg2.sw_if_index),
723 route.add_vpp_config()
724 self.logger.debug(self.vapi.cli("show ip6 fib"))
726 # configure SRv6 localSID End.X without PSP behavior
727 # End.X points to interface pg1
728 localsid = VppSRv6LocalSID(
731 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
732 nh_addr=self.pg1.remote_ip6,
734 sw_if_index=self.pg1.sw_if_index,
738 localsid.add_vpp_config()
740 self.logger.debug(self.vapi.cli("show sr localsid"))
742 # create IPv6 packets with SRH (SL=2, SL=1)
743 # send one packet per SL value per packet size
744 # SL=0 packet with localSID End with PSP is dropped
745 count = len(self.pg_packet_sizes)
746 dst_inner = "a4::1234"
749 # packets with segments-left 2, active segment a3::c4
750 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
751 dst_inner, sidlist=["a5::", "a4::", "a3::c4"], segleft=2
753 # create traffic stream pg0->pg1
756 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
760 # packets with segments-left 1, active segment a3::c4
761 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
762 dst_inner, sidlist=["a4::", "a3::c4", "a2::"], segleft=1
764 # add to traffic stream pg0->pg1
767 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
771 # send packets and verify received packets
772 # using same comparison function as End (no PSP)
773 self.send_and_verify_pkts(
774 self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_End
777 # assert nothing was received on the other interface (pg2)
778 self.pg2.assert_nothing_captured(remark="mis-directed packet(s)")
780 # log the localsid counters
781 self.logger.info(self.vapi.cli("show sr localsid"))
783 # remove SRv6 localSIDs
784 localsid.remove_vpp_config()
790 self.teardown_interfaces()
792 def test_SRv6_End_X_with_PSP(self):
793 """Test SRv6 End.X with PSP behavior."""
794 # create three interfaces (1 source, 2 destinations)
795 # source and destination interfaces are IPv6 only
796 self.setup_interfaces(ipv6=[True, True, True])
798 # configure FIB entries
799 # a4::/64 via pg1 and pg2
805 VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index),
806 VppRoutePath(self.pg2.remote_ip6, self.pg2.sw_if_index),
809 route.add_vpp_config()
811 # configure SRv6 localSID End with PSP behavior
812 localsid = VppSRv6LocalSID(
815 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
816 nh_addr=self.pg1.remote_ip6,
818 sw_if_index=self.pg1.sw_if_index,
822 localsid.add_vpp_config()
824 self.logger.debug(self.vapi.cli("show sr localsid"))
826 # create IPv6 packets with SRH (SL=2, SL=1)
827 # send one packet per SL value per packet size
828 # SL=0 packet with localSID End with PSP is dropped
829 count = len(self.pg_packet_sizes)
830 dst_inner = "a4::1234"
833 # packets with segments-left 2, active segment a3::
834 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
835 dst_inner, sidlist=["a5::", "a4::", "a3::c4"], segleft=2
837 # create traffic stream pg0->pg1
840 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
844 # packets with segments-left 1, active segment a3::
845 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
846 dst_inner, sidlist=["a4::", "a3::c4", "a2::"], segleft=1
848 # add to traffic stream pg0->pg1
851 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
855 # send packets and verify received packets
856 # using same comparison function as End with PSP
857 self.send_and_verify_pkts(
858 self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_End_PSP
861 # assert nothing was received on the other interface (pg2)
862 self.pg2.assert_nothing_captured(remark="mis-directed packet(s)")
864 # log the localsid counters
865 self.logger.info(self.vapi.cli("show sr localsid"))
867 # remove SRv6 localSIDs
868 localsid.remove_vpp_config()
874 self.teardown_interfaces()
876 def test_SRv6_End_DX6(self):
877 """Test SRv6 End.DX6 behavior."""
878 # send traffic to one destination interface
879 # source and destination interfaces are IPv6 only
880 self.setup_interfaces(ipv6=[True, True])
882 # configure SRv6 localSID End.DX6 behavior
883 localsid = VppSRv6LocalSID(
886 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX6,
887 nh_addr=self.pg1.remote_ip6,
889 sw_if_index=self.pg1.sw_if_index,
893 localsid.add_vpp_config()
895 self.logger.debug(self.vapi.cli("show sr localsid"))
897 # create IPv6 packets with SRH (SL=0)
898 # send one packet per packet size
899 count = len(self.pg_packet_sizes)
900 dst_inner = "a4::1234" # inner header destination address
903 # packets with SRH, segments-left 0, active segment a3::c4
904 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
905 dst_inner, sidlist=["a3::c4", "a2::", "a1::"], segleft=0
907 # add to traffic stream pg0->pg1
910 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
914 # packets without SRH, IPv6 in IPv6
915 # outer IPv6 dest addr is the localsid End.DX6
916 packet_header = self.create_packet_header_IPv6_IPv6(
917 dst_inner, dst_outer="a3::c4"
919 # add to traffic stream pg0->pg1
922 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
926 # send packets and verify received packets
927 self.send_and_verify_pkts(
928 self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_End_DX6
931 # log the localsid counters
932 self.logger.info(self.vapi.cli("show sr localsid"))
934 # remove SRv6 localSIDs
935 localsid.remove_vpp_config()
938 self.teardown_interfaces()
940 def test_SRv6_End_DT6(self):
941 """Test SRv6 End.DT6 behavior."""
942 # create three interfaces (1 source, 2 destinations)
943 # all interfaces are IPv6 only
944 # source interface in global FIB (0)
945 # destination interfaces in global and vrf
947 ipt = VppIpTable(self, vrf_1, is_ip6=True)
949 self.setup_interfaces(ipv6=[True, True, True], ipv6_table_id=[0, 0, vrf_1])
951 # configure FIB entries
952 # a4::/64 is reachable
953 # via pg1 in table 0 (global)
954 # and via pg2 in table vrf_1
959 [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index, nh_table_id=0)],
962 route0.add_vpp_config()
969 self.pg2.remote_ip6, self.pg2.sw_if_index, nh_table_id=vrf_1
974 route1.add_vpp_config()
975 self.logger.debug(self.vapi.cli("show ip6 fib"))
977 # configure SRv6 localSID End.DT6 behavior
979 # fib_table: where the localsid is installed
980 # sw_if_index: in T-variants of localsid this is the vrf table_id
981 localsid = VppSRv6LocalSID(
984 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT6,
991 localsid.add_vpp_config()
993 self.logger.debug(self.vapi.cli("show sr localsid"))
995 # create IPv6 packets with SRH (SL=0)
996 # send one packet per packet size
997 count = len(self.pg_packet_sizes)
998 dst_inner = "a4::1234" # inner header destination address
1001 # packets with SRH, segments-left 0, active segment a3::c4
1002 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
1003 dst_inner, sidlist=["a3::c4", "a2::", "a1::"], segleft=0
1005 # add to traffic stream pg0->pg1
1008 self.pg0, self.pg2, packet_header, self.pg_packet_sizes, count
1012 # packets without SRH, IPv6 in IPv6
1013 # outer IPv6 dest addr is the localsid End.DT6
1014 packet_header = self.create_packet_header_IPv6_IPv6(
1015 dst_inner, dst_outer="a3::c4"
1017 # add to traffic stream pg0->pg1
1020 self.pg0, self.pg2, packet_header, self.pg_packet_sizes, count
1024 # send packets and verify received packets
1025 # using same comparison function as End.DX6
1026 self.send_and_verify_pkts(
1027 self.pg0, pkts, self.pg2, self.compare_rx_tx_packet_End_DX6
1030 # assert nothing was received on the other interface (pg2)
1031 self.pg1.assert_nothing_captured(remark="mis-directed packet(s)")
1033 # log the localsid counters
1034 self.logger.info(self.vapi.cli("show sr localsid"))
1036 # remove SRv6 localSIDs
1037 localsid.remove_vpp_config()
1039 # remove FIB entries
1042 # cleanup interfaces
1043 self.teardown_interfaces()
1045 def test_SRv6_End_DX4(self):
1046 """Test SRv6 End.DX4 behavior."""
1047 # send traffic to one destination interface
1048 # source interface is IPv6 only
1049 # destination interface is IPv4 only
1050 self.setup_interfaces(ipv6=[True, False], ipv4=[False, True])
1052 # configure SRv6 localSID End.DX4 behavior
1053 localsid = VppSRv6LocalSID(
1056 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX4,
1057 nh_addr=self.pg1.remote_ip4,
1059 sw_if_index=self.pg1.sw_if_index,
1063 localsid.add_vpp_config()
1065 self.logger.debug(self.vapi.cli("show sr localsid"))
1067 # send one packet per packet size
1068 count = len(self.pg_packet_sizes)
1069 dst_inner = "4.1.1.123" # inner header destination address
1072 # packets with SRH, segments-left 0, active segment a3::c4
1073 packet_header = self.create_packet_header_IPv6_SRH_IPv4(
1074 dst_inner, sidlist=["a3::c4", "a2::", "a1::"], segleft=0
1076 # add to traffic stream pg0->pg1
1079 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1083 # packets without SRH, IPv4 in IPv6
1084 # outer IPv6 dest addr is the localsid End.DX4
1085 packet_header = self.create_packet_header_IPv6_IPv4(
1086 dst_inner, dst_outer="a3::c4"
1088 # add to traffic stream pg0->pg1
1091 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1095 # send packets and verify received packets
1096 self.send_and_verify_pkts(
1097 self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_End_DX4
1100 # log the localsid counters
1101 self.logger.info(self.vapi.cli("show sr localsid"))
1103 # remove SRv6 localSIDs
1104 localsid.remove_vpp_config()
1106 # cleanup interfaces
1107 self.teardown_interfaces()
1109 def test_SRv6_End_DT4(self):
1110 """Test SRv6 End.DT4 behavior."""
1111 # create three interfaces (1 source, 2 destinations)
1112 # source interface is IPv6-only
1113 # destination interfaces are IPv4 only
1114 # source interface in global FIB (0)
1115 # destination interfaces in global and vrf
1117 ipt = VppIpTable(self, vrf_1)
1118 ipt.add_vpp_config()
1119 self.setup_interfaces(
1120 ipv6=[True, False, False],
1121 ipv4=[False, True, True],
1122 ipv6_table_id=[0, 0, 0],
1123 ipv4_table_id=[0, 0, vrf_1],
1126 # configure FIB entries
1127 # 4.1.1.0/24 is reachable
1128 # via pg1 in table 0 (global)
1129 # and via pg2 in table vrf_1
1130 route0 = VppIpRoute(
1134 [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index, nh_table_id=0)],
1137 route0.add_vpp_config()
1138 route1 = VppIpRoute(
1144 self.pg2.remote_ip4, self.pg2.sw_if_index, nh_table_id=vrf_1
1149 route1.add_vpp_config()
1150 self.logger.debug(self.vapi.cli("show ip fib"))
1152 # configure SRv6 localSID End.DT6 behavior
1154 # fib_table: where the localsid is installed
1155 # sw_if_index: in T-variants of localsid: vrf table_id
1156 localsid = VppSRv6LocalSID(
1159 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT4,
1166 localsid.add_vpp_config()
1168 self.logger.debug(self.vapi.cli("show sr localsid"))
1170 # create IPv6 packets with SRH (SL=0)
1171 # send one packet per packet size
1172 count = len(self.pg_packet_sizes)
1173 dst_inner = "4.1.1.123" # inner header destination address
1176 # packets with SRH, segments-left 0, active segment a3::c4
1177 packet_header = self.create_packet_header_IPv6_SRH_IPv4(
1178 dst_inner, sidlist=["a3::c4", "a2::", "a1::"], segleft=0
1180 # add to traffic stream pg0->pg1
1183 self.pg0, self.pg2, packet_header, self.pg_packet_sizes, count
1187 # packets without SRH, IPv6 in IPv6
1188 # outer IPv6 dest addr is the localsid End.DX4
1189 packet_header = self.create_packet_header_IPv6_IPv4(
1190 dst_inner, dst_outer="a3::c4"
1192 # add to traffic stream pg0->pg1
1195 self.pg0, self.pg2, packet_header, self.pg_packet_sizes, count
1199 # send packets and verify received packets
1200 # using same comparison function as End.DX4
1201 self.send_and_verify_pkts(
1202 self.pg0, pkts, self.pg2, self.compare_rx_tx_packet_End_DX4
1205 # assert nothing was received on the other interface (pg2)
1206 self.pg1.assert_nothing_captured(remark="mis-directed packet(s)")
1208 # log the localsid counters
1209 self.logger.info(self.vapi.cli("show sr localsid"))
1211 # remove SRv6 localSIDs
1212 localsid.remove_vpp_config()
1214 # remove FIB entries
1217 # cleanup interfaces
1218 self.teardown_interfaces()
1220 def test_SRv6_End_DX2(self):
1221 """Test SRv6 End.DX2 behavior."""
1222 # send traffic to one destination interface
1223 # source interface is IPv6 only
1224 self.setup_interfaces(ipv6=[True, False], ipv4=[False, False])
1226 # configure SRv6 localSID End.DX2 behavior
1227 localsid = VppSRv6LocalSID(
1230 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX2,
1233 sw_if_index=self.pg1.sw_if_index,
1237 localsid.add_vpp_config()
1239 self.logger.debug(self.vapi.cli("show sr localsid"))
1241 # send one packet per packet size
1242 count = len(self.pg_packet_sizes)
1245 # packets with SRH, segments-left 0, active segment a3::c4
1246 # L2 has no dot1q header
1247 packet_header = self.create_packet_header_IPv6_SRH_L2(
1248 sidlist=["a3::c4", "a2::", "a1::"], segleft=0, vlan=0
1250 # add to traffic stream pg0->pg1
1253 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1257 # packets with SRH, segments-left 0, active segment a3::c4
1258 # L2 has dot1q header
1259 packet_header = self.create_packet_header_IPv6_SRH_L2(
1260 sidlist=["a3::c4", "a2::", "a1::"], segleft=0, vlan=123
1262 # add to traffic stream pg0->pg1
1265 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1269 # packets without SRH, L2 in IPv6
1270 # outer IPv6 dest addr is the localsid End.DX2
1271 # L2 has no dot1q header
1272 packet_header = self.create_packet_header_IPv6_L2(dst_outer="a3::c4", vlan=0)
1273 # add to traffic stream pg0->pg1
1276 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1280 # packets without SRH, L2 in IPv6
1281 # outer IPv6 dest addr is the localsid End.DX2
1282 # L2 has dot1q header
1283 packet_header = self.create_packet_header_IPv6_L2(dst_outer="a3::c4", vlan=123)
1284 # add to traffic stream pg0->pg1
1287 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1291 # send packets and verify received packets
1292 self.send_and_verify_pkts(
1293 self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_End_DX2
1296 # log the localsid counters
1297 self.logger.info(self.vapi.cli("show sr localsid"))
1299 # remove SRv6 localSIDs
1300 localsid.remove_vpp_config()
1302 # cleanup interfaces
1303 self.teardown_interfaces()
1305 @unittest.skipUnless(0, "PC to fix")
1306 def test_SRv6_T_Insert_Classifier(self):
1307 """Test SRv6 Transit.Insert behavior (IPv6 only).
1308 steer packets using the classifier
1310 # send traffic to one destination interface
1311 # source and destination are IPv6 only
1312 self.setup_interfaces(ipv6=[False, False, False, True, True])
1314 # configure FIB entries
1316 self, "a4::", 64, [VppRoutePath(self.pg4.remote_ip6, self.pg4.sw_if_index)]
1318 route.add_vpp_config()
1320 # configure encaps IPv6 source address
1321 # needs to be done before SR Policy config
1323 self.vapi.cli("set sr encaps source addr a3::")
1326 # configure SRv6 Policy
1327 # Note: segment list order: first -> last
1328 sr_policy = VppSRv6Policy(
1332 sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
1335 segments=["a4::", "a5::", "a6::c7"],
1338 sr_policy.add_vpp_config()
1339 self.sr_policy = sr_policy
1341 # log the sr policies
1342 self.logger.info(self.vapi.cli("show sr policies"))
1344 # add classify table
1345 # mask on dst ip address prefix a7::/8
1346 mask = "{!s:0<16}".format("ff")
1347 r = self.vapi.classify_add_del_table(
1349 binascii.unhexlify(mask),
1350 match_n_vectors=(len(mask) - 1) // 32 + 1,
1351 current_data_flag=1,
1354 self.assertIsNotNone(r, "No response msg for add_del_table")
1355 table_index = r.new_table_index
1357 # add the source routing node as a ip6 inacl netxt node
1358 r = self.vapi.add_node_next("ip6-inacl", "sr-pl-rewrite-insert")
1359 inacl_next_node_index = r.node_index
1361 match = "{!s:0<16}".format("a7")
1362 r = self.vapi.classify_add_del_session(
1365 binascii.unhexlify(match),
1366 hit_next_index=inacl_next_node_index,
1370 self.assertIsNotNone(r, "No response msg for add_del_session")
1372 # log the classify table used in the steering policy
1373 self.logger.info(self.vapi.cli("show classify table"))
1375 r = self.vapi.input_acl_set_interface(
1376 is_add=1, sw_if_index=self.pg3.sw_if_index, ip6_table_index=table_index
1378 self.assertIsNotNone(r, "No response msg for input_acl_set_interface")
1381 self.logger.info(self.vapi.cli("show inacl type ip6"))
1384 count = len(self.pg_packet_sizes)
1385 dst_inner = "a7::1234"
1388 # create IPv6 packets without SRH
1389 packet_header = self.create_packet_header_IPv6(dst_inner)
1390 # create traffic stream pg3->pg4
1393 self.pg3, self.pg4, packet_header, self.pg_packet_sizes, count
1397 # create IPv6 packets with SRH
1398 # packets with segments-left 1, active segment a7::
1399 packet_header = self.create_packet_header_IPv6_SRH(
1400 sidlist=["a8::", "a7::", "a6::"], segleft=1
1402 # create traffic stream pg3->pg4
1405 self.pg3, self.pg4, packet_header, self.pg_packet_sizes, count
1409 # send packets and verify received packets
1410 self.send_and_verify_pkts(
1411 self.pg3, pkts, self.pg4, self.compare_rx_tx_packet_T_Insert
1414 # remove the interface l2 input feature
1415 r = self.vapi.input_acl_set_interface(
1416 is_add=0, sw_if_index=self.pg3.sw_if_index, ip6_table_index=table_index
1418 self.assertIsNotNone(r, "No response msg for input_acl_set_interface")
1420 # log the ip6 inacl after cleaning
1421 self.logger.info(self.vapi.cli("show inacl type ip6"))
1423 # log the localsid counters
1424 self.logger.info(self.vapi.cli("show sr localsid"))
1426 # remove classifier SR steering
1427 # classifier_steering.remove_vpp_config()
1428 self.logger.info(self.vapi.cli("show sr steering policies"))
1430 # remove SR Policies
1431 self.sr_policy.remove_vpp_config()
1432 self.logger.info(self.vapi.cli("show sr policies"))
1434 # remove classify session and table
1435 r = self.vapi.classify_add_del_session(
1436 0, table_index, binascii.unhexlify(match)
1438 self.assertIsNotNone(r, "No response msg for add_del_session")
1440 r = self.vapi.classify_add_del_table(
1441 0, binascii.unhexlify(mask), table_index=table_index
1443 self.assertIsNotNone(r, "No response msg for add_del_table")
1445 self.logger.info(self.vapi.cli("show classify table"))
1447 # remove FIB entries
1450 # cleanup interfaces
1451 self.teardown_interfaces()
1453 def compare_rx_tx_packet_T_Encaps(self, tx_pkt, rx_pkt):
1454 """Compare input and output packet after passing T.Encaps
1456 :param tx_pkt: transmitted packet
1457 :param rx_pkt: received packet
1459 # T.Encaps updates the headers as follows:
1460 # SR Policy seglist (S3, S2, S1)
1461 # SR Policy source C
1464 # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(A, B2)
1466 # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1)
1467 # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(a, B2)SRH(B3, B2, B1; SL=1)
1469 # get first (outer) IPv6 header of rx'ed packet
1470 rx_ip = rx_pkt.getlayer(IPv6)
1473 tx_ip = tx_pkt.getlayer(IPv6)
1475 # expected segment-list
1476 seglist = self.sr_policy.segments
1477 # reverse list to get order as in SRH
1478 tx_seglist = seglist[::-1]
1480 # get source address of SR Policy
1481 sr_policy_source = self.sr_policy.source
1483 # rx'ed packet should have SRH
1484 self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1486 rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1488 # received ip.src should be equal to SR Policy source
1489 self.assertEqual(rx_ip.src, sr_policy_source)
1490 # received ip.dst should be equal to expected sidlist[lastentry]
1491 self.assertEqual(rx_ip.dst, tx_seglist[-1])
1492 # rx'ed seglist should be equal to expected seglist
1493 self.assertEqual(rx_srh.addresses, tx_seglist)
1494 # segleft should be equal to size expected seglist-1
1495 self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
1496 # segleft should be equal to lastentry
1497 self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1499 # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1500 # except for the hop-limit field
1501 # -> update tx'ed hlim to the expected hlim
1502 tx_ip.hlim = tx_ip.hlim - 1
1504 self.assertEqual(rx_srh.payload, tx_ip)
1506 self.logger.debug("packet verification: SUCCESS")
1508 def compare_rx_tx_packet_T_Encaps_IPv4(self, tx_pkt, rx_pkt):
1509 """Compare input and output packet after passing T.Encaps for IPv4
1511 :param tx_pkt: transmitted packet
1512 :param rx_pkt: received packet
1514 # T.Encaps for IPv4 updates the headers as follows:
1515 # SR Policy seglist (S3, S2, S1)
1516 # SR Policy source C
1519 # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv4(A, B2)
1521 # get first (outer) IPv6 header of rx'ed packet
1522 rx_ip = rx_pkt.getlayer(IPv6)
1525 tx_ip = tx_pkt.getlayer(IP)
1527 # expected segment-list
1528 seglist = self.sr_policy.segments
1529 # reverse list to get order as in SRH
1530 tx_seglist = seglist[::-1]
1532 # get source address of SR Policy
1533 sr_policy_source = self.sr_policy.source
1535 # checks common to cases tx with and without SRH
1536 # rx'ed packet should have SRH and IPv4 header
1537 self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1538 self.assertTrue(rx_ip.payload.haslayer(IP))
1540 rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1542 # received ip.src should be equal to SR Policy source
1543 self.assertEqual(rx_ip.src, sr_policy_source)
1544 # received ip.dst should be equal to sidlist[lastentry]
1545 self.assertEqual(rx_ip.dst, tx_seglist[-1])
1546 # rx'ed seglist should be equal to seglist
1547 self.assertEqual(rx_srh.addresses, tx_seglist)
1548 # segleft should be equal to size seglist-1
1549 self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
1550 # segleft should be equal to lastentry
1551 self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1553 # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1554 # except for the ttl field and ip checksum
1555 # -> adjust tx'ed ttl to expected ttl
1556 tx_ip.ttl = tx_ip.ttl - 1
1557 # -> set tx'ed ip checksum to None and let scapy recompute
1559 # read back the pkt (with str()) to force computing these fields
1560 # probably other ways to accomplish this are possible
1561 tx_ip = IP(scapy.compat.raw(tx_ip))
1563 self.assertEqual(rx_srh.payload, tx_ip)
1565 self.logger.debug("packet verification: SUCCESS")
1567 def compare_rx_tx_packet_T_Encaps_L2(self, tx_pkt, rx_pkt):
1568 """Compare input and output packet after passing T.Encaps for L2
1570 :param tx_pkt: transmitted packet
1571 :param rx_pkt: received packet
1573 # T.Encaps for L2 updates the headers as follows:
1574 # SR Policy seglist (S3, S2, S1)
1575 # SR Policy source C
1578 # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)L2
1580 # get first (outer) IPv6 header of rx'ed packet
1581 rx_ip = rx_pkt.getlayer(IPv6)
1584 tx_ether = tx_pkt.getlayer(Ether)
1586 # expected segment-list
1587 seglist = self.sr_policy.segments
1588 # reverse list to get order as in SRH
1589 tx_seglist = seglist[::-1]
1591 # get source address of SR Policy
1592 sr_policy_source = self.sr_policy.source
1594 # rx'ed packet should have SRH
1595 self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1597 rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1599 # received ip.src should be equal to SR Policy source
1600 self.assertEqual(rx_ip.src, sr_policy_source)
1601 # received ip.dst should be equal to sidlist[lastentry]
1602 self.assertEqual(rx_ip.dst, tx_seglist[-1])
1603 # rx'ed seglist should be equal to seglist
1604 self.assertEqual(rx_srh.addresses, tx_seglist)
1605 # segleft should be equal to size seglist-1
1606 self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
1607 # segleft should be equal to lastentry
1608 self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1609 # nh should be "No Next Header" (143)
1610 self.assertEqual(rx_srh.nh, 143)
1612 # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1613 self.assertEqual(Ether(scapy.compat.raw(rx_srh.payload)), tx_ether)
1615 self.logger.debug("packet verification: SUCCESS")
1617 def compare_rx_tx_packet_T_Insert(self, tx_pkt, rx_pkt):
1618 """Compare input and output packet after passing T.Insert
1620 :param tx_pkt: transmitted packet
1621 :param rx_pkt: received packet
1623 # T.Insert updates the headers as follows:
1626 # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)
1628 # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1)
1629 # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)SRH(B3, B2, B1; SL=1)
1631 # get first (outer) IPv6 header of rx'ed packet
1632 rx_ip = rx_pkt.getlayer(IPv6)
1637 rx_udp = rx_pkt[UDP]
1639 tx_ip = tx_pkt.getlayer(IPv6)
1642 # some packets have been tx'ed with an SRH, some without it
1643 # get SRH if tx'ed packet has it
1644 if tx_pkt.haslayer(IPv6ExtHdrSegmentRouting):
1645 tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1646 tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1647 tx_udp = tx_pkt[UDP]
1649 # expected segment-list (make copy of SR Policy segment list)
1650 seglist = self.sr_policy.segments[:]
1651 # expected seglist has initial dest addr as last segment
1652 seglist.append(tx_ip.dst)
1653 # reverse list to get order as in SRH
1654 tx_seglist = seglist[::-1]
1656 # get source address of SR Policy
1657 sr_policy_source = self.sr_policy.source
1659 # checks common to cases tx with and without SRH
1660 # rx'ed packet should have SRH and only one IPv6 header
1661 self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1662 self.assertFalse(rx_ip.payload.haslayer(IPv6))
1664 rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1666 # rx'ed ip.src should be equal to tx'ed ip.src
1667 self.assertEqual(rx_ip.src, tx_ip.src)
1668 # rx'ed ip.dst should be equal to sidlist[lastentry]
1669 self.assertEqual(rx_ip.dst, tx_seglist[-1])
1671 # rx'ed seglist should be equal to expected seglist
1672 self.assertEqual(rx_srh.addresses, tx_seglist)
1673 # segleft should be equal to size(expected seglist)-1
1674 self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
1675 # segleft should be equal to lastentry
1676 self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1678 if tx_srh: # packet was tx'ed with SRH
1679 # packet should have 2nd SRH
1680 self.assertTrue(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
1682 rx_srh2 = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting, 2)
1684 # rx'ed srh2.addresses should be equal to tx'ed srh.addresses
1685 self.assertEqual(rx_srh2.addresses, tx_srh.addresses)
1686 # rx'ed srh2.segleft should be equal to tx'ed srh.segleft
1687 self.assertEqual(rx_srh2.segleft, tx_srh.segleft)
1688 # rx'ed srh2.lastentry should be equal to tx'ed srh.lastentry
1689 self.assertEqual(rx_srh2.lastentry, tx_srh.lastentry)
1691 else: # packet was tx'ed without SRH
1692 # rx packet should have no other SRH
1693 self.assertFalse(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
1695 # UDP layer should be unchanged
1696 self.assertEqual(rx_udp, tx_udp)
1698 self.logger.debug("packet verification: SUCCESS")
1700 def compare_rx_tx_packet_End(self, tx_pkt, rx_pkt):
1701 """Compare input and output packet after passing End (without PSP)
1703 :param tx_pkt: transmitted packet
1704 :param rx_pkt: received packet
1706 # End (no PSP) updates the headers as follows:
1708 # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2)
1709 # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1711 # get first (outer) IPv6 header of rx'ed packet
1712 rx_ip = rx_pkt.getlayer(IPv6)
1715 rx_udp = rx_pkt[UDP]
1717 tx_ip = tx_pkt.getlayer(IPv6)
1718 # we know the packet has been tx'ed
1719 # with an inner IPv6 header and an SRH
1720 tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1721 tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1722 tx_udp = tx_pkt[UDP]
1724 # common checks, regardless of tx segleft value
1725 # rx'ed packet should have 2nd IPv6 header
1726 self.assertTrue(rx_ip.payload.haslayer(IPv6))
1727 # get second (inner) IPv6 header
1728 rx_ip2 = rx_pkt.getlayer(IPv6, 2)
1730 if tx_ip.segleft > 0:
1731 # SRH should NOT have been popped:
1732 # End SID without PSP does not pop SRH if segleft>0
1733 self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1734 rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1736 # received ip.src should be equal to expected ip.src
1737 self.assertEqual(rx_ip.src, tx_ip.src)
1738 # sidlist should be unchanged
1739 self.assertEqual(rx_srh.addresses, tx_srh.addresses)
1740 # segleft should have been decremented
1741 self.assertEqual(rx_srh.segleft, tx_srh.segleft - 1)
1742 # received ip.dst should be equal to sidlist[segleft]
1743 self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
1744 # lastentry should be unchanged
1745 self.assertEqual(rx_srh.lastentry, tx_srh.lastentry)
1746 # inner IPv6 packet (ip2) should be unchanged
1747 self.assertEqual(rx_ip2.src, tx_ip2.src)
1748 self.assertEqual(rx_ip2.dst, tx_ip2.dst)
1749 # else: # tx_ip.segleft == 0
1750 # TODO: Does this work with 2 SRHs in ingress packet?
1752 # UDP layer should be unchanged
1753 self.assertEqual(rx_udp, tx_udp)
1755 self.logger.debug("packet verification: SUCCESS")
1757 def compare_rx_tx_packet_End_PSP(self, tx_pkt, rx_pkt):
1758 """Compare input and output packet after passing End with PSP
1760 :param tx_pkt: transmitted packet
1761 :param rx_pkt: received packet
1763 # End (PSP) updates the headers as follows:
1764 # IPv6 + SRH (SL>1):
1765 # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2)
1766 # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1767 # IPv6 + SRH (SL=1):
1768 # in: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1771 # get first (outer) IPv6 header of rx'ed packet
1772 rx_ip = rx_pkt.getlayer(IPv6)
1775 rx_udp = rx_pkt[UDP]
1777 tx_ip = tx_pkt.getlayer(IPv6)
1778 # we know the packet has been tx'ed
1779 # with an inner IPv6 header and an SRH
1780 tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1781 tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1782 tx_udp = tx_pkt[UDP]
1784 # common checks, regardless of tx segleft value
1785 self.assertTrue(rx_ip.payload.haslayer(IPv6))
1786 rx_ip2 = rx_pkt.getlayer(IPv6, 2)
1787 # inner IPv6 packet (ip2) should be unchanged
1788 self.assertEqual(rx_ip2.src, tx_ip2.src)
1789 self.assertEqual(rx_ip2.dst, tx_ip2.dst)
1791 if tx_ip.segleft > 1:
1792 # SRH should NOT have been popped:
1793 # End SID with PSP does not pop SRH if segleft>1
1794 # rx'ed packet should have SRH
1795 self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1796 rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1798 # received ip.src should be equal to expected ip.src
1799 self.assertEqual(rx_ip.src, tx_ip.src)
1800 # sidlist should be unchanged
1801 self.assertEqual(rx_srh.addresses, tx_srh.addresses)
1802 # segleft should have been decremented
1803 self.assertEqual(rx_srh.segleft, tx_srh.segleft - 1)
1804 # received ip.dst should be equal to sidlist[segleft]
1805 self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
1806 # lastentry should be unchanged
1807 self.assertEqual(rx_srh.lastentry, tx_srh.lastentry)
1809 else: # tx_ip.segleft <= 1
1810 # SRH should have been popped:
1811 # End SID with PSP and segleft=1 pops SRH
1812 # the two IPv6 headers are still present
1813 # outer IPv6 header has DA == last segment of popped SRH
1814 # SRH should not be present
1815 self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1816 # outer IPv6 header ip.src should be equal to tx'ed ip.src
1817 self.assertEqual(rx_ip.src, tx_ip.src)
1818 # outer IPv6 header ip.dst should be = to tx'ed sidlist[segleft-1]
1819 self.assertEqual(rx_ip.dst, tx_srh.addresses[tx_srh.segleft - 1])
1821 # UDP layer should be unchanged
1822 self.assertEqual(rx_udp, tx_udp)
1824 self.logger.debug("packet verification: SUCCESS")
1826 def compare_rx_tx_packet_End_DX6(self, tx_pkt, rx_pkt):
1827 """Compare input and output packet after passing End.DX6
1829 :param tx_pkt: transmitted packet
1830 :param rx_pkt: received packet
1832 # End.DX6 updates the headers as follows:
1833 # IPv6 + SRH (SL=0):
1834 # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv6(B, D)
1837 # in: IPv6(A, S3)IPv6(B, D)
1840 # get first (outer) IPv6 header of rx'ed packet
1841 rx_ip = rx_pkt.getlayer(IPv6)
1843 tx_ip = tx_pkt.getlayer(IPv6)
1844 tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1846 # verify if rx'ed packet has no SRH
1847 self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1849 # the whole rx_ip pkt should be equal to tx_ip2
1850 # except for the hlim field
1851 # -> adjust tx'ed hlim to expected hlim
1852 tx_ip2.hlim = tx_ip2.hlim - 1
1854 self.assertEqual(rx_ip, tx_ip2)
1856 self.logger.debug("packet verification: SUCCESS")
1858 def compare_rx_tx_packet_End_DX4(self, tx_pkt, rx_pkt):
1859 """Compare input and output packet after passing End.DX4
1861 :param tx_pkt: transmitted packet
1862 :param rx_pkt: received packet
1864 # End.DX4 updates the headers as follows:
1865 # IPv6 + SRH (SL=0):
1866 # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv4(B, D)
1869 # in: IPv6(A, S3)IPv4(B, D)
1872 # get IPv4 header of rx'ed packet
1873 rx_ip = rx_pkt.getlayer(IP)
1875 tx_ip = tx_pkt.getlayer(IPv6)
1876 tx_ip2 = tx_pkt.getlayer(IP)
1878 # verify if rx'ed packet has no SRH
1879 self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1881 # the whole rx_ip pkt should be equal to tx_ip2
1882 # except for the ttl field and ip checksum
1883 # -> adjust tx'ed ttl to expected ttl
1884 tx_ip2.ttl = tx_ip2.ttl - 1
1885 # -> set tx'ed ip checksum to None and let scapy recompute
1886 tx_ip2.chksum = None
1887 # read back the pkt (with str()) to force computing these fields
1888 # probably other ways to accomplish this are possible
1889 tx_ip2 = IP(scapy.compat.raw(tx_ip2))
1891 self.assertEqual(rx_ip, tx_ip2)
1893 self.logger.debug("packet verification: SUCCESS")
1895 def compare_rx_tx_packet_End_DX2(self, tx_pkt, rx_pkt):
1896 """Compare input and output packet after passing End.DX2
1898 :param tx_pkt: transmitted packet
1899 :param rx_pkt: received packet
1901 # End.DX2 updates the headers as follows:
1902 # IPv6 + SRH (SL=0):
1903 # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)L2
1909 # get IPv4 header of rx'ed packet
1910 rx_eth = rx_pkt.getlayer(Ether)
1912 tx_ip = tx_pkt.getlayer(IPv6)
1913 # we can't just get the 2nd Ether layer
1914 # get the Raw content and dissect it as Ether
1915 tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw]))
1917 # verify if rx'ed packet has no SRH
1918 self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1920 # the whole rx_eth pkt should be equal to tx_eth1
1921 self.assertEqual(rx_eth, tx_eth1)
1923 self.logger.debug("packet verification: SUCCESS")
1925 def create_stream(self, src_if, dst_if, packet_header, packet_sizes, count):
1926 """Create SRv6 input packet stream for defined interface.
1928 :param VppInterface src_if: Interface to create packet stream for
1929 :param VppInterface dst_if: destination interface of packet stream
1930 :param packet_header: Layer3 scapy packet headers,
1931 L2 is added when not provided,
1932 Raw(payload) with packet_info is added
1933 :param list packet_sizes: packet stream pckt sizes,sequentially applied
1934 to packets in stream have
1935 :param int count: number of packets in packet stream
1936 :return: list of packets
1938 self.logger.info("Creating packets")
1940 for i in range(0, count - 1):
1941 payload_info = self.create_packet_info(src_if, dst_if)
1942 self.logger.debug("Creating packet with index %d" % (payload_info.index))
1943 payload = self.info_to_payload(payload_info)
1944 # add L2 header if not yet provided in packet_header
1945 if packet_header.getlayer(0).name == "Ethernet":
1946 p = packet_header / Raw(payload)
1949 Ether(dst=src_if.local_mac, src=src_if.remote_mac)
1953 size = packet_sizes[i % len(packet_sizes)]
1954 self.logger.debug("Packet size %d" % (size))
1955 self.extend_packet(p, size)
1956 # we need to store the packet with the automatic fields computed
1957 # read back the dumped packet (with str())
1958 # to force computing these fields
1959 # probably other ways are possible
1960 p = Ether(scapy.compat.raw(p))
1961 payload_info.data = p.copy()
1962 self.logger.debug(ppp("Created packet:", p))
1964 self.logger.info("Done creating packets")
1967 def send_and_verify_pkts(
1968 self, input, pkts, output, compare_func, expected_count=None
1970 """Send packets and verify received packets using compare_func
1972 :param input: ingress interface of DUT
1973 :param pkts: list of packets to transmit
1974 :param output: egress interface of DUT
1975 :param compare_func: function to compare in and out packets
1976 :param expected_count: expected number of captured packets (if
1977 different than len(pkts))
1979 # add traffic stream to input interface
1980 input.add_stream(pkts)
1982 # enable capture on all interfaces
1983 self.pg_enable_capture(self.pg_interfaces)
1986 self.logger.info("Starting traffic")
1989 # get output capture
1990 self.logger.info("Getting packet capture")
1991 capture = output.get_capture(expected_count=expected_count)
1993 # assert nothing was captured on input interface
1994 input.assert_nothing_captured()
1996 # verify captured packets
1997 self.verify_captured_pkts(output, capture, compare_func)
1999 def create_packet_header_IPv6(self, dst):
2000 """Create packet header: IPv6 header, UDP header
2002 :param dst: IPv6 destination address
2004 IPv6 source address is 1234::1
2005 UDP source port and destination port are 1234
2008 p = IPv6(src="1234::1", dst=dst) / UDP(sport=1234, dport=1234)
2011 def create_packet_header_IPv6_SRH(self, sidlist, segleft):
2012 """Create packet header: IPv6 header with SRH, UDP header
2014 :param list sidlist: segment list
2015 :param int segleft: segments-left field value
2017 IPv6 destination address is set to sidlist[segleft]
2018 IPv6 source addresses are 1234::1 and 4321::1
2019 UDP source port and destination port are 1234
2023 IPv6(src="1234::1", dst=sidlist[segleft])
2024 / IPv6ExtHdrSegmentRouting(addresses=sidlist)
2025 / UDP(sport=1234, dport=1234)
2029 def create_packet_header_IPv6_SRH_IPv6(self, dst, sidlist, segleft):
2030 """Create packet header: IPv6 encapsulated in SRv6:
2031 IPv6 header with SRH, IPv6 header, UDP header
2033 :param ipv6address dst: inner IPv6 destination address
2034 :param list sidlist: segment list of outer IPv6 SRH
2035 :param int segleft: segments-left field of outer IPv6 SRH
2037 Outer IPv6 destination address is set to sidlist[segleft]
2038 IPv6 source addresses are 1234::1 and 4321::1
2039 UDP source port and destination port are 1234
2043 IPv6(src="1234::1", dst=sidlist[segleft])
2044 / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=41)
2045 / IPv6(src="4321::1", dst=dst)
2046 / UDP(sport=1234, dport=1234)
2050 def create_packet_header_IPv6_IPv6(self, dst_inner, dst_outer):
2051 """Create packet header: IPv6 encapsulated in IPv6:
2052 IPv6 header, IPv6 header, UDP header
2054 :param ipv6address dst_inner: inner IPv6 destination address
2055 :param ipv6address dst_outer: outer IPv6 destination address
2057 IPv6 source addresses are 1234::1 and 4321::1
2058 UDP source port and destination port are 1234
2062 IPv6(src="1234::1", dst=dst_outer)
2063 / IPv6(src="4321::1", dst=dst_inner)
2064 / UDP(sport=1234, dport=1234)
2068 def create_packet_header_IPv6_SRH_SRH_IPv6(
2069 self, dst, sidlist1, segleft1, sidlist2, segleft2
2071 """Create packet header: IPv6 encapsulated in SRv6 with 2 SRH:
2072 IPv6 header with SRH, 2nd SRH, IPv6 header, UDP header
2074 :param ipv6address dst: inner IPv6 destination address
2075 :param list sidlist1: segment list of outer IPv6 SRH
2076 :param int segleft1: segments-left field of outer IPv6 SRH
2077 :param list sidlist2: segment list of inner IPv6 SRH
2078 :param int segleft2: segments-left field of inner IPv6 SRH
2080 Outer IPv6 destination address is set to sidlist[segleft]
2081 IPv6 source addresses are 1234::1 and 4321::1
2082 UDP source port and destination port are 1234
2086 IPv6(src="1234::1", dst=sidlist1[segleft1])
2087 / IPv6ExtHdrSegmentRouting(addresses=sidlist1, segleft=segleft1, nh=43)
2088 / IPv6ExtHdrSegmentRouting(addresses=sidlist2, segleft=segleft2, nh=41)
2089 / IPv6(src="4321::1", dst=dst)
2090 / UDP(sport=1234, dport=1234)
2094 def create_packet_header_IPv4(self, dst):
2095 """Create packet header: IPv4 header, UDP header
2097 :param dst: IPv4 destination address
2099 IPv4 source address is 123.1.1.1
2100 UDP source port and destination port are 1234
2103 p = IP(src="123.1.1.1", dst=dst) / UDP(sport=1234, dport=1234)
2106 def create_packet_header_IPv6_IPv4(self, dst_inner, dst_outer):
2107 """Create packet header: IPv4 encapsulated in IPv6:
2108 IPv6 header, IPv4 header, UDP header
2110 :param ipv4address dst_inner: inner IPv4 destination address
2111 :param ipv6address dst_outer: outer IPv6 destination address
2113 IPv6 source address is 1234::1
2114 IPv4 source address is 123.1.1.1
2115 UDP source port and destination port are 1234
2119 IPv6(src="1234::1", dst=dst_outer)
2120 / IP(src="123.1.1.1", dst=dst_inner)
2121 / UDP(sport=1234, dport=1234)
2125 def create_packet_header_IPv6_SRH_IPv4(self, dst, sidlist, segleft):
2126 """Create packet header: IPv4 encapsulated in SRv6:
2127 IPv6 header with SRH, IPv4 header, UDP header
2129 :param ipv4address dst: inner IPv4 destination address
2130 :param list sidlist: segment list of outer IPv6 SRH
2131 :param int segleft: segments-left field of outer IPv6 SRH
2133 Outer IPv6 destination address is set to sidlist[segleft]
2134 IPv6 source address is 1234::1
2135 IPv4 source address is 123.1.1.1
2136 UDP source port and destination port are 1234
2140 IPv6(src="1234::1", dst=sidlist[segleft])
2141 / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=4)
2142 / IP(src="123.1.1.1", dst=dst)
2143 / UDP(sport=1234, dport=1234)
2147 def create_packet_header_L2(self, vlan=0):
2148 """Create packet header: L2 header
2150 :param vlan: if vlan!=0 then add 802.1q header
2152 # Note: the dst addr ('00:55:44:33:22:11') is used in
2153 # the compare function compare_rx_tx_packet_T_Encaps_L2
2154 # to detect presence of L2 in SRH payload
2155 p = Ether(src="00:11:22:33:44:55", dst="00:55:44:33:22:11")
2156 etype = 0x8137 # IPX
2159 p /= Dot1Q(vlan=vlan, type=etype)
2164 def create_packet_header_IPv6_SRH_L2(self, sidlist, segleft, vlan=0):
2165 """Create packet header: L2 encapsulated in SRv6:
2166 IPv6 header with SRH, L2
2168 :param list sidlist: segment list of outer IPv6 SRH
2169 :param int segleft: segments-left field of outer IPv6 SRH
2170 :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
2172 Outer IPv6 destination address is set to sidlist[segleft]
2173 IPv6 source address is 1234::1
2175 eth = Ether(src="00:11:22:33:44:55", dst="00:55:44:33:22:11")
2176 etype = 0x8137 # IPX
2179 eth /= Dot1Q(vlan=vlan, type=etype)
2184 IPv6(src="1234::1", dst=sidlist[segleft])
2185 / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=143)
2190 def create_packet_header_IPv6_L2(self, dst_outer, vlan=0):
2191 """Create packet header: L2 encapsulated in IPv6:
2194 :param ipv6address dst_outer: outer IPv6 destination address
2195 :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
2197 eth = Ether(src="00:11:22:33:44:55", dst="00:55:44:33:22:11")
2198 etype = 0x8137 # IPX
2201 eth /= Dot1Q(vlan=vlan, type=etype)
2205 p = IPv6(src="1234::1", dst=dst_outer, nh=143) / eth
2208 def get_payload_info(self, packet):
2209 """Extract the payload_info from the packet"""
2210 # in most cases, payload_info is in packet[Raw]
2211 # but packet[Raw] gives the complete payload
2212 # (incl L2 header) for the T.Encaps L2 case
2214 payload_info = self.payload_to_info(packet[Raw])
2217 # remote L2 header from packet[Raw]:
2218 # take packet[Raw], convert it to an Ether layer
2219 # and then extract Raw from it
2220 payload_info = self.payload_to_info(Ether(scapy.compat.r(packet[Raw]))[Raw])
2224 def verify_captured_pkts(self, dst_if, capture, compare_func):
2226 Verify captured packet stream for specified interface.
2227 Compare ingress with egress packets using the specified compare fn
2229 :param dst_if: egress interface of DUT
2230 :param capture: captured packets
2231 :param compare_func: function to compare in and out packet
2234 "Verifying capture on interface %s using function %s"
2235 % (dst_if.name, compare_func.__name__)
2239 for i in self.pg_interfaces:
2240 last_info[i.sw_if_index] = None
2241 dst_sw_if_index = dst_if.sw_if_index
2243 for packet in capture:
2245 # extract payload_info from packet's payload
2246 payload_info = self.get_payload_info(packet)
2247 packet_index = payload_info.index
2249 self.logger.debug("Verifying packet with index %d" % (packet_index))
2250 # packet should have arrived on the expected interface
2251 self.assertEqual(payload_info.dst, dst_sw_if_index)
2253 "Got packet on interface %s: src=%u (idx=%u)"
2254 % (dst_if.name, payload_info.src, packet_index)
2257 # search for payload_info with same src and dst if_index
2258 # this will give us the transmitted packet
2259 next_info = self.get_next_packet_info_for_interface2(
2260 payload_info.src, dst_sw_if_index, last_info[payload_info.src]
2262 last_info[payload_info.src] = next_info
2263 # next_info should not be None
2264 self.assertTrue(next_info is not None)
2265 # index of tx and rx packets should be equal
2266 self.assertEqual(packet_index, next_info.index)
2267 # data field of next_info contains the tx packet
2268 txed_packet = next_info.data
2271 ppp("Transmitted packet:", txed_packet)
2272 ) # ppp=Pretty Print Packet
2274 self.logger.debug(ppp("Received packet:", packet))
2276 # compare rcvd packet with expected packet using compare_func
2277 compare_func(txed_packet, packet)
2280 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2283 # FIXME: there is no need to check manually that all the packets
2284 # arrived (already done so by get_capture); checking here
2285 # prevents testing packets that are expected to be dropped, so
2286 # commenting this out for now
2288 # have all expected packets arrived?
2289 # for i in self.pg_interfaces:
2290 # remaining_packet = self.get_next_packet_info_for_interface2(
2291 # i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
2292 # self.assertTrue(remaining_packet is None,
2293 # "Interface %s: Packet expected from interface %s "
2294 # "didn't arrive" % (dst_if.name, i.name))
2297 if __name__ == "__main__":
2298 unittest.main(testRunner=VppTestRunner)