5 from socket import AF_INET6
7 from framework import VppTestCase, VppTestRunner
8 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto, VppIpTable
9 from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
10 SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
13 from scapy.packet import Raw
14 from scapy.layers.l2 import Ether, Dot1Q
15 from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
16 from scapy.layers.inet import IP, UDP
18 from scapy.utils import inet_pton, inet_ntop
23 class TestSRv6(VppTestCase):
24 """ SRv6 Test Case """
28 super(TestSRv6, self).setUpClass()
# The call above is all the class-level setup does: it defers to the
# parent class' setUpClass().
31 """ Perform test setup before each test case.
33 super(TestSRv6, self).setUp()
35 # packet sizes, inclusive L2 overhead
# The tests pass this list, together with its length, to create_stream()
# when they build their traffic.
36 self.pg_packet_sizes = [64, 512, 1518, 9018]
# presumably clears packet bookkeeping left over from a previous test
# -- TODO confirm against the framework
39 self.reset_packet_infos()
42 """ Clean up test setup after each test case.
44 self.teardown_interfaces()
# NOTE(review): the tests also call teardown_interfaces() themselves as
# their last step; presumably the call above covers tests that abort
# before reaching it -- confirm.
46 super(TestSRv6, self).tearDown()
48 def configure_interface(self,
50 ipv6=False, ipv4=False,
51 ipv6_table_id=0, ipv4_table_id=0):
# Per-interface helper used by setup_interfaces().  For each address family
# it logs what it is doing and then calls, in this order, set_table_ipX(),
# config_ipX() and the neighbour resolution (resolve_ndp / resolve_arp) on
# the interface object.
# The statements below read `interface` (name, set_table_ip6, ...), and
# setup_interfaces() passes the pg interface as the first argument.
52 """ Configure interface.
53 :param ipv6: configure IPv6 on interface
54 :param ipv4: configure IPv4 on interface
55 :param ipv6_table_id: FIB table_id for IPv6
56 :param ipv4_table_id: FIB table_id for IPv4
58 self.logger.debug("Configuring interface %s" % (interface.name))
# IPv6 part: bind to the IPv6 FIB table, configure, resolve via NDP
60 self.logger.debug("Configuring IPv6")
61 interface.set_table_ip6(ipv6_table_id)
62 interface.config_ip6()
# NDP resolution is the only step given an explicit timeout (5);
# the IPv4 ARP resolution below uses the helper's default.
63 interface.resolve_ndp(timeout=5)
# IPv4 part: bind to the IPv4 FIB table, configure, resolve via ARP
65 self.logger.debug("Configuring IPv4")
66 interface.set_table_ip4(ipv4_table_id)
67 interface.config_ip4()
68 interface.resolve_arp()
def setup_interfaces(self, ipv6=None, ipv4=None,
                     ipv6_table_id=None, ipv4_table_id=None):
    """ Create and configure interfaces.

    One pg interface is created per entry of the longer of ``ipv6`` and
    ``ipv4``; shorter lists are padded with their defaults.  The argument
    lists are copied first and are never modified.

    :param ipv6: list of interface IPv6 capabilities
    :param ipv4: list of interface IPv4 capabilities
    :param ipv6_table_id: list of intf IPv6 FIB table_ids
    :param ipv4_table_id: list of intf IPv4 FIB table_ids
    :returns: List of created interfaces.
    """
    # Work on private copies.  Padding the argument lists in place would
    # leak state between calls through shared default lists (a 3-interface
    # test would make every later 2-interface test create 3 interfaces)
    # and would also modify lists owned by the caller.
    ipv6 = list(ipv6 or [])
    ipv4 = list(ipv4 or [])
    ipv6_table_id = list(ipv6_table_id or [])
    ipv4_table_id = list(ipv4_table_id or [])

    # how many interfaces?
    count = max(len(ipv6), len(ipv4))
    self.logger.debug("Creating and configuring %d interfaces" % (count))

    # fill up ipv6 and ipv4 lists if needed
    # not enabled (False) is the default
    # (a non-positive multiplier yields an empty list, so no guard needed)
    ipv6 += (count - len(ipv6)) * [False]
    ipv4 += (count - len(ipv4)) * [False]

    # fill up table_id lists if needed
    # table_id 0 (global) is the default
    ipv6_table_id += (count - len(ipv6_table_id)) * [0]
    ipv4_table_id += (count - len(ipv4_table_id)) * [0]

    # create 'count' pg interfaces
    self.create_pg_interfaces(range(count))

    # setup all interfaces
    # keyword arguments keep the flag/table_id pairing explicit
    for i in range(count):
        self.configure_interface(self.pg_interfaces[i],
                                 ipv6=ipv6[i], ipv4=ipv4[i],
                                 ipv6_table_id=ipv6_table_id[i],
                                 ipv4_table_id=ipv4_table_id[i])

    # dump neighbour tables only for the address families in use
    if any(ipv6):
        self.logger.debug(self.vapi.cli("show ip6 neighbors"))
    if any(ipv4):
        self.logger.debug(self.vapi.cli("show ip arp"))
    self.logger.debug(self.vapi.cli("show interface"))
    self.logger.debug(self.vapi.cli("show hardware"))

    return self.pg_interfaces
121 def teardown_interfaces(self):
# Called from tearDown() and as the last step of every test.  Walks
# self.pg_interfaces (the list returned by setup_interfaces()) and logs
# each interface name as it is handled.
122 """ Unconfigure and bring down interface.
124 self.logger.debug("Tearing down interfaces")
125 # tear down all interfaces
126 # AFAIK they cannot be deleted
# i.e. the pg interfaces are reset rather than removed
127 for i in self.pg_interfaces:
128 self.logger.debug("Tear down interface %s" % (i.name))
134 @unittest.skipUnless(0, "PC to fix")
135 def test_SRv6_T_Encaps(self):
# The skipUnless condition is the constant 0, so this test is always
# skipped (reason "PC to fix").
# Flow: two IPv6 interfaces; a4::/64 is routed via pg1; an SR policy with
# segment list a4::, a5::, a6::c7 is created and IPv6 traffic to a7::/64
# is steered into it through the policy's BSID.  Three pg0->pg1 streams
# (plain IPv6, IPv6+SRH, IPv6+SRH+IPv6) are sent and each received packet
# is checked by compare_rx_tx_packet_T_Encaps.
136 """ Test SRv6 Transit.Encaps behavior for IPv6.
138 # send traffic to one destination interface
139 # source and destination are IPv6 only
140 self.setup_interfaces(ipv6=[True, True])
142 # configure FIB entries
# a4:: is the first segment of the policy below, reachable via pg1
143 route = VppIpRoute(self, "a4::", 64,
144 [VppRoutePath(self.pg1.remote_ip6,
145 self.pg1.sw_if_index,
146 proto=DpoProto.DPO_PROTO_IP6)],
148 route.add_vpp_config()
150 # configure encaps IPv6 source address
151 # needs to be done before SR Policy config
153 self.vapi.cli("set sr encaps source addr a3::")
156 # configure SRv6 Policy
157 # Note: segment list order: first -> last
158 sr_policy = VppSRv6Policy(
161 sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
162 weight=1, fib_table=0,
163 segments=['a4::', 'a5::', 'a6::c7'],
165 sr_policy.add_vpp_config()
# kept on self so the steering config and the cleanup below can reach it
166 self.sr_policy = sr_policy
168 # log the sr policies
169 self.logger.info(self.vapi.cli("show sr policies"))
171 # steer IPv6 traffic to a7::/64 into SRv6 Policy
172 # use the bsid of the above self.sr_policy
173 pol_steering = VppSRv6Steering(
175 bsid=self.sr_policy.bsid,
176 prefix="a7::", mask_width=64,
177 traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
178 sr_policy_index=0, table_id=0,
180 pol_steering.add_vpp_config()
182 # log the sr steering policies
183 self.logger.info(self.vapi.cli("show sr steering policies"))
# one packet size per entry of pg_packet_sizes; dst_inner falls inside
# the steered prefix a7::/64
186 count = len(self.pg_packet_sizes)
187 dst_inner = 'a7::1234'
190 # create IPv6 packets without SRH
191 packet_header = self.create_packet_header_IPv6(dst_inner)
192 # create traffic stream pg0->pg1
193 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
194 self.pg_packet_sizes, count))
196 # create IPv6 packets with SRH
197 # packets with segments-left 1, active segment a7::
198 packet_header = self.create_packet_header_IPv6_SRH(
199 sidlist=['a8::', 'a7::', 'a6::'],
201 # create traffic stream pg0->pg1
202 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
203 self.pg_packet_sizes, count))
205 # create IPv6 packets with SRH and IPv6
206 # packets with segments-left 1, active segment a7::
207 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
209 sidlist=['a8::', 'a7::', 'a6::'],
211 # create traffic stream pg0->pg1
212 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
213 self.pg_packet_sizes, count))
215 # send packets and verify received packets
216 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
217 self.compare_rx_tx_packet_T_Encaps)
219 # log the localsid counters
220 self.logger.info(self.vapi.cli("show sr localsid"))
# cleanup: steering is removed before the policy it refers to
223 pol_steering.remove_vpp_config()
224 self.logger.info(self.vapi.cli("show sr steering policies"))
227 self.sr_policy.remove_vpp_config()
228 self.logger.info(self.vapi.cli("show sr policies"))
234 self.teardown_interfaces()
236 @unittest.skipUnless(0, "PC to fix")
237 def test_SRv6_T_Insert(self):
# The skipUnless condition is the constant 0, so this test is always
# skipped (reason "PC to fix").
# Same topology, policy (a4::, a5::, a6::c7) and a7::/64 steering as the
# T.Encaps test, but only two pg0->pg1 streams are sent (plain IPv6 and
# IPv6+SRH) and the result is checked by compare_rx_tx_packet_T_Insert.
238 """ Test SRv6 Transit.Insert behavior (IPv6 only).
240 # send traffic to one destination interface
241 # source and destination are IPv6 only
242 self.setup_interfaces(ipv6=[True, True])
244 # configure FIB entries
# a4:: is the first segment of the policy below, reachable via pg1
245 route = VppIpRoute(self, "a4::", 64,
246 [VppRoutePath(self.pg1.remote_ip6,
247 self.pg1.sw_if_index,
248 proto=DpoProto.DPO_PROTO_IP6)],
250 route.add_vpp_config()
252 # configure encaps IPv6 source address
253 # needs to be done before SR Policy config
255 self.vapi.cli("set sr encaps source addr a3::")
258 # configure SRv6 Policy
259 # Note: segment list order: first -> last
260 sr_policy = VppSRv6Policy(
263 sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
264 weight=1, fib_table=0,
265 segments=['a4::', 'a5::', 'a6::c7'],
267 sr_policy.add_vpp_config()
# kept on self so the steering config and the cleanup below can reach it
268 self.sr_policy = sr_policy
270 # log the sr policies
271 self.logger.info(self.vapi.cli("show sr policies"))
273 # steer IPv6 traffic to a7::/64 into SRv6 Policy
274 # use the bsid of the above self.sr_policy
275 pol_steering = VppSRv6Steering(
277 bsid=self.sr_policy.bsid,
278 prefix="a7::", mask_width=64,
279 traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
280 sr_policy_index=0, table_id=0,
282 pol_steering.add_vpp_config()
284 # log the sr steering policies
285 self.logger.info(self.vapi.cli("show sr steering policies"))
# dst_inner falls inside the steered prefix a7::/64
288 count = len(self.pg_packet_sizes)
289 dst_inner = 'a7::1234'
292 # create IPv6 packets without SRH
293 packet_header = self.create_packet_header_IPv6(dst_inner)
294 # create traffic stream pg0->pg1
295 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
296 self.pg_packet_sizes, count))
298 # create IPv6 packets with SRH
299 # packets with segments-left 1, active segment a7::
300 packet_header = self.create_packet_header_IPv6_SRH(
301 sidlist=['a8::', 'a7::', 'a6::'],
303 # create traffic stream pg0->pg1
304 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
305 self.pg_packet_sizes, count))
307 # send packets and verify received packets
308 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
309 self.compare_rx_tx_packet_T_Insert)
311 # log the localsid counters
312 self.logger.info(self.vapi.cli("show sr localsid"))
# cleanup: steering is removed before the policy it refers to
315 pol_steering.remove_vpp_config()
316 self.logger.info(self.vapi.cli("show sr steering policies"))
319 self.sr_policy.remove_vpp_config()
320 self.logger.info(self.vapi.cli("show sr policies"))
326 self.teardown_interfaces()
328 @unittest.skipUnless(0, "PC to fix")
329 def test_SRv6_T_Encaps_IPv4(self):
# The skipUnless condition is the constant 0, so this test is always
# skipped (reason "PC to fix").
# Flow: pg0 is IPv4-only, pg1 is IPv6-only; IPv4 traffic to 7.1.1.0/24 is
# steered into an SR policy (a4::, a5::, a6::c7) whose first segment is
# routed via pg1.  One pg0->pg1 stream of plain IPv4 packets is sent and
# checked by compare_rx_tx_packet_T_Encaps_IPv4.
330 """ Test SRv6 Transit.Encaps behavior for IPv4.
332 # send traffic to one destination interface
333 # source interface is IPv4 only
334 # destination interface is IPv6 only
335 self.setup_interfaces(ipv6=[False, True], ipv4=[True, False])
337 # configure FIB entries
338 route = VppIpRoute(self, "a4::", 64,
339 [VppRoutePath(self.pg1.remote_ip6,
340 self.pg1.sw_if_index,
341 proto=DpoProto.DPO_PROTO_IP6)],
343 route.add_vpp_config()
345 # configure encaps IPv6 source address
346 # needs to be done before SR Policy config
348 self.vapi.cli("set sr encaps source addr a3::")
351 # configure SRv6 Policy
352 # Note: segment list order: first -> last
353 sr_policy = VppSRv6Policy(
356 sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
357 weight=1, fib_table=0,
358 segments=['a4::', 'a5::', 'a6::c7'],
360 sr_policy.add_vpp_config()
# kept on self so the steering config and the cleanup below can reach it
361 self.sr_policy = sr_policy
363 # log the sr policies
364 self.logger.info(self.vapi.cli("show sr policies"))
366 # steer IPv4 traffic to 7.1.1.0/24 into SRv6 Policy
367 # use the bsid of the above self.sr_policy
368 pol_steering = VppSRv6Steering(
370 bsid=self.sr_policy.bsid,
371 prefix="7.1.1.0", mask_width=24,
372 traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV4,
373 sr_policy_index=0, table_id=0,
375 pol_steering.add_vpp_config()
377 # log the sr steering policies
378 self.logger.info(self.vapi.cli("show sr steering policies"))
# dst_inner falls inside the steered prefix 7.1.1.0/24
381 count = len(self.pg_packet_sizes)
382 dst_inner = '7.1.1.123'
385 # create IPv4 packets
386 packet_header = self.create_packet_header_IPv4(dst_inner)
387 # create traffic stream pg0->pg1
388 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
389 self.pg_packet_sizes, count))
391 # send packets and verify received packets
392 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
393 self.compare_rx_tx_packet_T_Encaps_IPv4)
395 # log the localsid counters
396 self.logger.info(self.vapi.cli("show sr localsid"))
# cleanup: steering is removed before the policy it refers to
399 pol_steering.remove_vpp_config()
400 self.logger.info(self.vapi.cli("show sr steering policies"))
403 self.sr_policy.remove_vpp_config()
404 self.logger.info(self.vapi.cli("show sr policies"))
410 self.teardown_interfaces()
412 @unittest.skip("VPP crashes after running this test")
413 def test_SRv6_T_Encaps_L2(self):
# Unconditionally skipped by the decorator above.
# Flow: L2 traffic received on pg0 is steered (SR_STEER_L2, keyed on
# pg0's sw_if_index) into an SR policy (a4::, a5::, a6::c7) whose first
# segment is routed via pg1.  Two pg0->pg1 streams are built (untagged and
# dot1q vlan 123) and checked by compare_rx_tx_packet_T_Encaps_L2.
414 """ Test SRv6 Transit.Encaps behavior for L2.
416 # send traffic to one destination interface
417 # source interface is IPv4 only TODO?
# NOTE(review): the call below enables neither IPv4 nor IPv6 on pg0
# (ipv4=[False, False]), so the "IPv4 only" remark above does not match
# the code.
418 # destination interface is IPv6 only
419 self.setup_interfaces(ipv6=[False, True], ipv4=[False, False])
421 # configure FIB entries
422 route = VppIpRoute(self, "a4::", 64,
423 [VppRoutePath(self.pg1.remote_ip6,
424 self.pg1.sw_if_index,
425 proto=DpoProto.DPO_PROTO_IP6)],
427 route.add_vpp_config()
429 # configure encaps IPv6 source address
430 # needs to be done before SR Policy config
432 self.vapi.cli("set sr encaps source addr a3::")
435 # configure SRv6 Policy
436 # Note: segment list order: first -> last
437 sr_policy = VppSRv6Policy(
440 sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
441 weight=1, fib_table=0,
442 segments=['a4::', 'a5::', 'a6::c7'],
444 sr_policy.add_vpp_config()
# kept on self so the steering config and the cleanup below can reach it
445 self.sr_policy = sr_policy
447 # log the sr policies
448 self.logger.info(self.vapi.cli("show sr policies"))
450 # steer L2 traffic into SRv6 Policy
451 # use the bsid of the above self.sr_policy
# for L2 steering the prefix is the empty "::"/0 and the incoming
# interface (pg0) is passed instead
452 pol_steering = VppSRv6Steering(
454 bsid=self.sr_policy.bsid,
455 prefix="::", mask_width=0,
456 traffic_type=SRv6PolicySteeringTypes.SR_STEER_L2,
457 sr_policy_index=0, table_id=0,
458 sw_if_index=self.pg0.sw_if_index)
459 pol_steering.add_vpp_config()
461 # log the sr steering policies
462 self.logger.info(self.vapi.cli("show sr steering policies"))
465 count = len(self.pg_packet_sizes)
468 # create L2 packets without dot1q header
469 packet_header = self.create_packet_header_L2()
470 # create traffic stream pg0->pg1
471 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
472 self.pg_packet_sizes, count))
474 # create L2 packets with dot1q header
475 packet_header = self.create_packet_header_L2(vlan=123)
476 # create traffic stream pg0->pg1
477 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
478 self.pg_packet_sizes, count))
480 # send packets and verify received packets
481 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
482 self.compare_rx_tx_packet_T_Encaps_L2)
484 # log the localsid counters
485 self.logger.info(self.vapi.cli("show sr localsid"))
# cleanup: steering is removed before the policy it refers to
488 pol_steering.remove_vpp_config()
489 self.logger.info(self.vapi.cli("show sr steering policies"))
492 self.sr_policy.remove_vpp_config()
493 self.logger.info(self.vapi.cli("show sr policies"))
499 self.teardown_interfaces()
501 def test_SRv6_End(self):
# Flow: two IPv6 interfaces; a4::/64 is routed via pg1; a localSID A3::0
# with behavior SR_BEHAVIOR_END is installed.  Two pg0->pg1 streams of
# IPv6+SRH+IPv6 packets are sent, one whose SID list puts a3:: at
# segments-left 2 and one at segments-left 1, and the received packets are
# checked by compare_rx_tx_packet_End.
502 """ Test SRv6 End (without PSP) behavior.
504 # send traffic to one destination interface
505 # source and destination interfaces are IPv6 only
506 self.setup_interfaces(ipv6=[True, True])
508 # configure FIB entries
# a4:: is the segment that follows a3:: in both SID lists below
509 route = VppIpRoute(self, "a4::", 64,
510 [VppRoutePath(self.pg1.remote_ip6,
511 self.pg1.sw_if_index,
512 proto=DpoProto.DPO_PROTO_IP6)],
514 route.add_vpp_config()
516 # configure SRv6 localSID End without PSP behavior
517 localsid = VppSRv6LocalSID(
518 self, localsid={'addr': 'A3::0'},
519 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
526 localsid.add_vpp_config()
528 self.logger.debug(self.vapi.cli("show sr localsid"))
530 # create IPv6 packets with SRH (SL=2, SL=1, SL=0)
531 # send one packet per SL value per packet size
532 # SL=0 packet with localSID End with USP needs 2nd SRH
# only the SL=2 and SL=1 streams are actually built below; SL=0 is
# still a TODO further down
533 count = len(self.pg_packet_sizes)
534 dst_inner = 'a4::1234'
537 # packets with segments-left 2, active segment a3::
538 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
540 sidlist=['a5::', 'a4::', 'a3::'],
542 # create traffic stream pg0->pg1
543 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
544 self.pg_packet_sizes, count))
546 # packets with segments-left 1, active segment a3::
547 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
549 sidlist=['a4::', 'a3::', 'a2::'],
551 # add to traffic stream pg0->pg1
552 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
553 self.pg_packet_sizes, count))
555 # TODO: test behavior with SL=0 packet (needs 2*SRH?)
557 # send packets and verify received packets
558 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
559 self.compare_rx_tx_packet_End)
561 # log the localsid counters
562 self.logger.info(self.vapi.cli("show sr localsid"))
564 # remove SRv6 localSIDs
565 localsid.remove_vpp_config()
571 self.teardown_interfaces()
573 def test_SRv6_End_with_PSP(self):
# Same topology, localSID address (A3::0) and SID lists as test_SRv6_End;
# the difference visible here is the verification callback,
# compare_rx_tx_packet_End_PSP.
574 """ Test SRv6 End with PSP behavior.
576 # send traffic to one destination interface
577 # source and destination interfaces are IPv6 only
578 self.setup_interfaces(ipv6=[True, True])
580 # configure FIB entries
# a4:: is the segment that follows a3:: in both SID lists below
581 route = VppIpRoute(self, "a4::", 64,
582 [VppRoutePath(self.pg1.remote_ip6,
583 self.pg1.sw_if_index,
584 proto=DpoProto.DPO_PROTO_IP6)],
586 route.add_vpp_config()
588 # configure SRv6 localSID End with PSP behavior
# the behavior enum is the same SR_BEHAVIOR_END as in the non-PSP test
589 localsid = VppSRv6LocalSID(
590 self, localsid={'addr': 'A3::0'},
591 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
598 localsid.add_vpp_config()
600 self.logger.debug(self.vapi.cli("show sr localsid"))
602 # create IPv6 packets with SRH (SL=2, SL=1)
603 # send one packet per SL value per packet size
604 # SL=0 packet with localSID End with PSP is dropped
605 count = len(self.pg_packet_sizes)
606 dst_inner = 'a4::1234'
609 # packets with segments-left 2, active segment a3::
610 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
612 sidlist=['a5::', 'a4::', 'a3::'],
614 # create traffic stream pg0->pg1
615 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
616 self.pg_packet_sizes, count))
618 # packets with segments-left 1, active segment a3::
619 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
621 sidlist=['a4::', 'a3::', 'a2::'],
623 # add to traffic stream pg0->pg1
624 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
625 self.pg_packet_sizes, count))
627 # send packets and verify received packets
628 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
629 self.compare_rx_tx_packet_End_PSP)
631 # log the localsid counters
632 self.logger.info(self.vapi.cli("show sr localsid"))
634 # remove SRv6 localSIDs
635 localsid.remove_vpp_config()
641 self.teardown_interfaces()
643 def test_SRv6_End_X(self):
# Flow: three IPv6 interfaces; a4::/64 is reachable via both pg1 and pg2,
# while the localSID A3::C4 (SR_BEHAVIOR_X) names pg1 as next hop and
# output interface.  The test sends SL=2 and SL=1 streams from pg0,
# verifies them on pg1 with compare_rx_tx_packet_End, and asserts that
# nothing was captured on pg2.
644 """ Test SRv6 End.X (without PSP) behavior.
646 # create three interfaces (1 source, 2 destinations)
647 # source and destination interfaces are IPv6 only
648 self.setup_interfaces(ipv6=[True, True, True])
650 # configure FIB entries
651 # a4::/64 via pg1 and pg2
# two paths, so a plain FIB lookup alone would not pin traffic to pg1
652 route = VppIpRoute(self, "a4::", 64,
653 [VppRoutePath(self.pg1.remote_ip6,
654 self.pg1.sw_if_index,
655 proto=DpoProto.DPO_PROTO_IP6),
656 VppRoutePath(self.pg2.remote_ip6,
657 self.pg2.sw_if_index,
658 proto=DpoProto.DPO_PROTO_IP6)],
660 route.add_vpp_config()
661 self.logger.debug(self.vapi.cli("show ip6 fib"))
663 # configure SRv6 localSID End.X without PSP behavior
664 # End.X points to interface pg1
665 localsid = VppSRv6LocalSID(
666 self, localsid={'addr': 'A3::C4'},
667 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
669 nh_addr6=self.pg1.remote_ip6,
671 sw_if_index=self.pg1.sw_if_index,
674 localsid.add_vpp_config()
676 self.logger.debug(self.vapi.cli("show sr localsid"))
678 # create IPv6 packets with SRH (SL=2, SL=1)
679 # send one packet per SL value per packet size
680 # SL=0 packet with localSID End with PSP is dropped
# NOTE(review): the line above mentions PSP although this is the
# non-PSP variant -- looks carried over from the PSP test; confirm.
681 count = len(self.pg_packet_sizes)
682 dst_inner = 'a4::1234'
685 # packets with segments-left 2, active segment a3::c4
686 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
688 sidlist=['a5::', 'a4::', 'a3::c4'],
690 # create traffic stream pg0->pg1
691 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
692 self.pg_packet_sizes, count))
694 # packets with segments-left 1, active segment a3::c4
695 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
697 sidlist=['a4::', 'a3::c4', 'a2::'],
699 # add to traffic stream pg0->pg1
700 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
701 self.pg_packet_sizes, count))
703 # send packets and verify received packets
704 # using same comparison function as End (no PSP)
705 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
706 self.compare_rx_tx_packet_End)
708 # assert nothing was received on the other interface (pg2)
709 self.pg2.assert_nothing_captured("mis-directed packet(s)")
711 # log the localsid counters
712 self.logger.info(self.vapi.cli("show sr localsid"))
714 # remove SRv6 localSIDs
715 localsid.remove_vpp_config()
721 self.teardown_interfaces()
723 def test_SRv6_End_X_with_PSP(self):
# Same topology, localSID address (A3::C4, SR_BEHAVIOR_X towards pg1) and
# SID lists as test_SRv6_End_X; the received packets are checked with
# compare_rx_tx_packet_End_PSP and pg2 must stay empty.
724 """ Test SRv6 End.X with PSP behavior.
726 # create three interfaces (1 source, 2 destinations)
727 # source and destination interfaces are IPv6 only
728 self.setup_interfaces(ipv6=[True, True, True])
730 # configure FIB entries
731 # a4::/64 via pg1 and pg2
732 route = VppIpRoute(self, "a4::", 64,
733 [VppRoutePath(self.pg1.remote_ip6,
734 self.pg1.sw_if_index,
735 proto=DpoProto.DPO_PROTO_IP6),
736 VppRoutePath(self.pg2.remote_ip6,
737 self.pg2.sw_if_index,
738 proto=DpoProto.DPO_PROTO_IP6)],
740 route.add_vpp_config()
742 # configure SRv6 localSID End with PSP behavior
# NOTE(review): the behavior configured below is SR_BEHAVIOR_X
# (End.X), not plain End as the line above says.
743 localsid = VppSRv6LocalSID(
744 self, localsid={'addr': 'A3::C4'},
745 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
747 nh_addr6=self.pg1.remote_ip6,
749 sw_if_index=self.pg1.sw_if_index,
752 localsid.add_vpp_config()
754 self.logger.debug(self.vapi.cli("show sr localsid"))
756 # create IPv6 packets with SRH (SL=2, SL=1)
757 # send one packet per SL value per packet size
758 # SL=0 packet with localSID End with PSP is dropped
759 count = len(self.pg_packet_sizes)
760 dst_inner = 'a4::1234'
763 # packets with segments-left 2, active segment a3::
# NOTE(review): the SID used in the list below is a3::c4
764 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
766 sidlist=['a5::', 'a4::', 'a3::c4'],
768 # create traffic stream pg0->pg1
769 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
770 self.pg_packet_sizes, count))
772 # packets with segments-left 1, active segment a3::
# NOTE(review): the SID used in the list below is a3::c4
773 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
775 sidlist=['a4::', 'a3::c4', 'a2::'],
777 # add to traffic stream pg0->pg1
778 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
779 self.pg_packet_sizes, count))
781 # send packets and verify received packets
782 # using same comparison function as End with PSP
783 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
784 self.compare_rx_tx_packet_End_PSP)
786 # assert nothing was received on the other interface (pg2)
787 self.pg2.assert_nothing_captured("mis-directed packet(s)")
789 # log the localsid counters
790 self.logger.info(self.vapi.cli("show sr localsid"))
792 # remove SRv6 localSIDs
793 localsid.remove_vpp_config()
799 self.teardown_interfaces()
801 def test_SRv6_End_DX6(self):
# Flow: two IPv6 interfaces; no FIB route is added in this test, because
# the localSID A3::C4 (SR_BEHAVIOR_DX6) itself names pg1's remote address
# and sw_if_index.  Two pg0->pg1 streams are sent (IPv6+SRH+IPv6 with the
# localSID first in the SID list, and IPv6-in-IPv6 without SRH) and
# checked by compare_rx_tx_packet_End_DX6.
802 """ Test SRv6 End.DX6 behavior.
804 # send traffic to one destination interface
805 # source and destination interfaces are IPv6 only
806 self.setup_interfaces(ipv6=[True, True])
808 # configure SRv6 localSID End.DX6 behavior
809 localsid = VppSRv6LocalSID(
810 self, localsid={'addr': 'A3::C4'},
811 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX6,
813 nh_addr6=self.pg1.remote_ip6,
815 sw_if_index=self.pg1.sw_if_index,
818 localsid.add_vpp_config()
820 self.logger.debug(self.vapi.cli("show sr localsid"))
822 # create IPv6 packets with SRH (SL=0)
823 # send one packet per packet size
824 count = len(self.pg_packet_sizes)
825 dst_inner = 'a4::1234' # inner header destination address
828 # packets with SRH, segments-left 0, active segment a3::c4
829 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
831 sidlist=['a3::c4', 'a2::', 'a1::'],
833 # add to traffic stream pg0->pg1
834 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
835 self.pg_packet_sizes, count))
837 # packets without SRH, IPv6 in IPv6
838 # outer IPv6 dest addr is the localsid End.DX6
839 packet_header = self.create_packet_header_IPv6_IPv6(
842 # add to traffic stream pg0->pg1
843 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
844 self.pg_packet_sizes, count))
846 # send packets and verify received packets
847 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
848 self.compare_rx_tx_packet_End_DX6)
850 # log the localsid counters
851 self.logger.info(self.vapi.cli("show sr localsid"))
853 # remove SRv6 localSIDs
854 localsid.remove_vpp_config()
857 self.teardown_interfaces()
859 def test_SRv6_End_DT6(self):
# Flow: three IPv6 interfaces, pg0 and pg1 in table 0 and pg2 in table
# vrf_1; a4::/64 is routed via pg1 and, by a second route, via pg2.  A
# localSID A3::C4 with SR_BEHAVIOR_DT6 is installed.  Both streams are
# built pg0->pg2 and verified on pg2 with the End.DX6 comparison function;
# pg1 must capture nothing.
860 """ Test SRv6 End.DT6 behavior.
862 # create three interfaces (1 source, 2 destinations)
863 # all interfaces are IPv6 only
864 # source interface in global FIB (0)
865 # destination interfaces in global and vrf
867 ipt = VppIpTable(self, vrf_1, is_ip6=True)
869 self.setup_interfaces(ipv6=[True, True, True],
870 ipv6_table_id=[0, 0, vrf_1])
872 # configure FIB entries
873 # a4::/64 is reachable
874 # via pg1 in table 0 (global)
875 # and via pg2 in table vrf_1
876 route0 = VppIpRoute(self, "a4::", 64,
877 [VppRoutePath(self.pg1.remote_ip6,
878 self.pg1.sw_if_index,
879 proto=DpoProto.DPO_PROTO_IP6,
883 route0.add_vpp_config()
884 route1 = VppIpRoute(self, "a4::", 64,
885 [VppRoutePath(self.pg2.remote_ip6,
886 self.pg2.sw_if_index,
887 proto=DpoProto.DPO_PROTO_IP6,
891 route1.add_vpp_config()
892 self.logger.debug(self.vapi.cli("show ip6 fib"))
894 # configure SRv6 localSID End.DT6 behavior
896 # fib_table: where the localsid is installed
897 # sw_if_index: in T-variants of localsid this is the vrf table_id
898 localsid = VppSRv6LocalSID(
899 self, localsid={'addr': 'A3::C4'},
900 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT6,
907 localsid.add_vpp_config()
909 self.logger.debug(self.vapi.cli("show sr localsid"))
911 # create IPv6 packets with SRH (SL=0)
912 # send one packet per packet size
913 count = len(self.pg_packet_sizes)
914 dst_inner = 'a4::1234' # inner header destination address
917 # packets with SRH, segments-left 0, active segment a3::c4
918 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
920 sidlist=['a3::c4', 'a2::', 'a1::'],
922 # add to traffic stream pg0->pg1
# NOTE(review): the stream created below is pg0->pg2, not pg0->pg1
923 pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
924 self.pg_packet_sizes, count))
926 # packets without SRH, IPv6 in IPv6
927 # outer IPv6 dest addr is the localsid End.DT6
928 packet_header = self.create_packet_header_IPv6_IPv6(
931 # add to traffic stream pg0->pg1
# NOTE(review): the stream created below is pg0->pg2, not pg0->pg1
932 pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
933 self.pg_packet_sizes, count))
935 # send packets and verify received packets
936 # using same comparison function as End.DX6
937 self.send_and_verify_pkts(self.pg0, pkts, self.pg2,
938 self.compare_rx_tx_packet_End_DX6)
940 # assert nothing was received on the other interface (pg2)
# NOTE(review): the interface checked below is pg1 (the one in the
# global table); pg2 is where the traffic is expected.
941 self.pg1.assert_nothing_captured("mis-directed packet(s)")
943 # log the localsid counters
944 self.logger.info(self.vapi.cli("show sr localsid"))
946 # remove SRv6 localSIDs
947 localsid.remove_vpp_config()
953 self.teardown_interfaces()
955 def test_SRv6_End_DX4(self):
# Flow: pg0 is IPv6-only and pg1 is IPv4-only; the localSID A3::C4
# (SR_BEHAVIOR_DX4) names pg1's remote IPv4 address and sw_if_index.  Two
# pg0->pg1 streams are sent (IPv6+SRH+IPv4 with the localSID first in the
# SID list, and IPv4-in-IPv6 without SRH) and checked by
# compare_rx_tx_packet_End_DX4.
956 """ Test SRv6 End.DX4 behavior.
958 # send traffic to one destination interface
959 # source interface is IPv6 only
960 # destination interface is IPv4 only
961 self.setup_interfaces(ipv6=[True, False], ipv4=[False, True])
963 # configure SRv6 localSID End.DX4 behavior
964 localsid = VppSRv6LocalSID(
965 self, localsid={'addr': 'A3::C4'},
966 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX4,
967 nh_addr4=self.pg1.remote_ip4,
970 sw_if_index=self.pg1.sw_if_index,
973 localsid.add_vpp_config()
975 self.logger.debug(self.vapi.cli("show sr localsid"))
977 # send one packet per packet size
978 count = len(self.pg_packet_sizes)
979 dst_inner = '4.1.1.123' # inner header destination address
982 # packets with SRH, segments-left 0, active segment a3::c4
983 packet_header = self.create_packet_header_IPv6_SRH_IPv4(
985 sidlist=['a3::c4', 'a2::', 'a1::'],
987 # add to traffic stream pg0->pg1
988 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
989 self.pg_packet_sizes, count))
991 # packets without SRH, IPv4 in IPv6
992 # outer IPv6 dest addr is the localsid End.DX4
993 packet_header = self.create_packet_header_IPv6_IPv4(
996 # add to traffic stream pg0->pg1
997 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
998 self.pg_packet_sizes, count))
1000 # send packets and verify received packets
1001 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
1002 self.compare_rx_tx_packet_End_DX4)
1004 # log the localsid counters
1005 self.logger.info(self.vapi.cli("show sr localsid"))
1007 # remove SRv6 localSIDs
1008 localsid.remove_vpp_config()
1010 # cleanup interfaces
1011 self.teardown_interfaces()
1013 def test_SRv6_End_DT4(self):
# Flow: pg0 is IPv6-only in table 0; pg1 and pg2 are IPv4-only, with pg2's
# IPv4 side in table vrf_1.  4.1.1.0/24 is routed via pg1 and, by a second
# route, via pg2.  A localSID A3::C4 with SR_BEHAVIOR_DT4 is installed.
# Both streams are built pg0->pg2 and verified on pg2 with the End.DX4
# comparison function; pg1 must capture nothing.
1014 """ Test SRv6 End.DT4 behavior.
1016 # create three interfaces (1 source, 2 destinations)
1017 # source interface is IPv6-only
1018 # destination interfaces are IPv4 only
1019 # source interface in global FIB (0)
1020 # destination interfaces in global and vrf
# the VRF table is created before any interface is bound to it
1022 ipt = VppIpTable(self, vrf_1)
1023 ipt.add_vpp_config()
1024 self.setup_interfaces(ipv6=[True, False, False],
1025 ipv4=[False, True, True],
1026 ipv6_table_id=[0, 0, 0],
1027 ipv4_table_id=[0, 0, vrf_1])
1029 # configure FIB entries
1030 # 4.1.1.0/24 is reachable
1031 # via pg1 in table 0 (global)
1032 # and via pg2 in table vrf_1
1033 route0 = VppIpRoute(self, "4.1.1.0", 24,
1034 [VppRoutePath(self.pg1.remote_ip4,
1035 self.pg1.sw_if_index,
1039 route0.add_vpp_config()
1040 route1 = VppIpRoute(self, "4.1.1.0", 24,
1041 [VppRoutePath(self.pg2.remote_ip4,
1042 self.pg2.sw_if_index,
1043 nh_table_id=vrf_1)],
1046 route1.add_vpp_config()
1047 self.logger.debug(self.vapi.cli("show ip fib"))
1049 # configure SRv6 localSID End.DT6 behavior
# NOTE(review): the behavior configured below is SR_BEHAVIOR_DT4,
# not DT6 as the line above says.
1051 # fib_table: where the localsid is installed
1052 # sw_if_index: in T-variants of localsid: vrf table_id
1053 localsid = VppSRv6LocalSID(
1054 self, localsid={'addr': 'A3::C4'},
1055 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT4,
1062 localsid.add_vpp_config()
1064 self.logger.debug(self.vapi.cli("show sr localsid"))
1066 # create IPv6 packets with SRH (SL=0)
1067 # send one packet per packet size
1068 count = len(self.pg_packet_sizes)
1069 dst_inner = '4.1.1.123' # inner header destination address
1072 # packets with SRH, segments-left 0, active segment a3::c4
1073 packet_header = self.create_packet_header_IPv6_SRH_IPv4(
1075 sidlist=['a3::c4', 'a2::', 'a1::'],
1077 # add to traffic stream pg0->pg1
# NOTE(review): the stream created below is pg0->pg2, not pg0->pg1
1078 pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
1079 self.pg_packet_sizes, count))
1081 # packets without SRH, IPv6 in IPv6
1082 # outer IPv6 dest addr is the localsid End.DX4
# NOTE(review): the header built below is IPv4 in IPv6
# (create_packet_header_IPv6_IPv4) and the localSID under test is
# End.DT4.
1083 packet_header = self.create_packet_header_IPv6_IPv4(
1086 # add to traffic stream pg0->pg1
# NOTE(review): the stream created below is pg0->pg2, not pg0->pg1
1087 pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
1088 self.pg_packet_sizes, count))
1090 # send packets and verify received packets
1091 # using same comparison function as End.DX4
1092 self.send_and_verify_pkts(self.pg0, pkts, self.pg2,
1093 self.compare_rx_tx_packet_End_DX4)
1095 # assert nothing was received on the other interface (pg2)
# NOTE(review): the interface checked below is pg1 (the one in the
# global table); pg2 is where the traffic is expected.
1096 self.pg1.assert_nothing_captured("mis-directed packet(s)")
1098 # log the localsid counters
1099 self.logger.info(self.vapi.cli("show sr localsid"))
1101 # remove SRv6 localSIDs
1102 localsid.remove_vpp_config()
1104 # remove FIB entries
1107 # cleanup interfaces
1108 self.teardown_interfaces()
1110 def test_SRv6_End_DX2(self):
# Flow: pg0 is IPv6-only and pg1 has neither IPv4 nor IPv6 enabled; the
# localSID A3::C4 (SR_BEHAVIOR_DX2) names pg1's sw_if_index.  Four
# pg0->pg1 streams are sent: L2 behind IPv6+SRH and L2 directly in IPv6,
# each without and with a dot1q header, and the received frames are
# checked by compare_rx_tx_packet_End_DX2.
1111 """ Test SRv6 End.DX2 behavior.
1113 # send traffic to one destination interface
1114 # source interface is IPv6 only
1115 self.setup_interfaces(ipv6=[True, False], ipv4=[False, False])
1117 # configure SRv6 localSID End.DX2 behavior
1118 localsid = VppSRv6LocalSID(
1119 self, localsid={'addr': 'A3::C4'},
1120 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX2,
1124 sw_if_index=self.pg1.sw_if_index,
1127 localsid.add_vpp_config()
1129 self.logger.debug(self.vapi.cli("show sr localsid"))
1131 # send one packet per packet size
1132 count = len(self.pg_packet_sizes)
1135 # packets with SRH, segments-left 0, active segment a3::c4
1136 # L2 has no dot1q header
1137 packet_header = self.create_packet_header_IPv6_SRH_L2(
1138 sidlist=['a3::c4', 'a2::', 'a1::'],
1141 # add to traffic stream pg0->pg1
1142 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1143 self.pg_packet_sizes, count))
1145 # packets with SRH, segments-left 0, active segment a3::c4
1146 # L2 has dot1q header
1147 packet_header = self.create_packet_header_IPv6_SRH_L2(
1148 sidlist=['a3::c4', 'a2::', 'a1::'],
1151 # add to traffic stream pg0->pg1
1152 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1153 self.pg_packet_sizes, count))
1155 # packets without SRH, L2 in IPv6
1156 # outer IPv6 dest addr is the localsid End.DX2
1157 # L2 has no dot1q header
1158 packet_header = self.create_packet_header_IPv6_L2(
1161 # add to traffic stream pg0->pg1
1162 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1163 self.pg_packet_sizes, count))
1165 # packets without SRH, L2 in IPv6
1166 # outer IPv6 dest addr is the localsid End.DX2
1167 # L2 has dot1q header
1168 packet_header = self.create_packet_header_IPv6_L2(
1171 # add to traffic stream pg0->pg1
1172 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1173 self.pg_packet_sizes, count))
1175 # send packets and verify received packets
# all four streams are sent in one batch and verified on pg1
1176 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
1177 self.compare_rx_tx_packet_End_DX2)
1179 # log the localsid counters
1180 self.logger.info(self.vapi.cli("show sr localsid"))
1182 # remove SRv6 localSIDs
1183 localsid.remove_vpp_config()
1185 # cleanup interfaces
1186 self.teardown_interfaces()
@unittest.skipUnless(0, "PC to fix")
def test_SRv6_T_Insert_Classifier(self):
    """ Test SRv6 Transit.Insert behavior (IPv6 only).
        steer packets using the classifier
    """
    # NOTE(review): the listing this block was reviewed from had lost
    # several short lines (is_ip6/is_add flags, bsid, is_encap, source,
    # positional table_index/action arguments, segleft). They are
    # restored from context -- confirm against the canonical file.

    # send traffic to one destination interface
    # source (pg3) and destination (pg4) are IPv6 only
    self.setup_interfaces(ipv6=[False, False, False, True, True])

    # configure FIB entries: a4::/64 via pg4
    route = VppIpRoute(self, "a4::", 64,
                       [VppRoutePath(self.pg4.remote_ip6,
                                     self.pg4.sw_if_index,
                                     proto=DpoProto.DPO_PROTO_IP6)],
                       is_ip6=1)
    route.add_vpp_config()

    # configure encaps IPv6 source address
    # needs to be done before SR Policy config
    # TODO: API?
    self.vapi.cli("set sr encaps source addr a3::")

    bsid = 'a3::9999:1'
    # configure SRv6 Policy
    # Note: segment list order: first -> last
    policy = VppSRv6Policy(
        self, bsid=bsid,
        is_encap=0,
        sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
        weight=1, fib_table=0,
        segments=['a4::', 'a5::', 'a6::c7'],
        source='a3::')
    policy.add_vpp_config()
    # the compare function reads the policy back from self.sr_policy
    self.sr_policy = policy

    # log the sr policies
    self.logger.info(self.vapi.cli("show sr policies"))

    # add classify table
    # mask on dst ip address prefix a7::/8
    mask = '{!s:0<16}'.format('ff')
    reply = self.vapi.classify_add_del_table(
        1,
        binascii.unhexlify(mask),
        match_n_vectors=(len(mask) - 1) // 32 + 1,
        current_data_flag=1,
        skip_n_vectors=2)  # data offset
    self.assertIsNotNone(reply, 'No response msg for add_del_table')
    table_index = reply.new_table_index

    # add the source routing node as an ip6 inacl next node
    reply = self.vapi.add_node_next('ip6-inacl',
                                    'sr-pl-rewrite-insert')
    inacl_next_node_index = reply.node_index

    match = '{!s:0<16}'.format('a7')
    reply = self.vapi.classify_add_del_session(
        1,
        table_index,
        binascii.unhexlify(match),
        hit_next_index=inacl_next_node_index,
        action=3,
        metadata=0)  # sr policy index
    self.assertIsNotNone(reply, 'No response msg for add_del_session')

    # log the classify table used in the steering policy
    self.logger.info(self.vapi.cli("show classify table"))

    # attach the classify table to pg3 as ip6 input ACL
    reply = self.vapi.input_acl_set_interface(
        is_add=1,
        sw_if_index=self.pg3.sw_if_index,
        ip6_table_index=table_index)
    self.assertIsNotNone(reply,
                         'No response msg for input_acl_set_interface')

    # log the ip6 inacl
    self.logger.info(self.vapi.cli("show inacl type ip6"))

    # create packets
    count = len(self.pg_packet_sizes)
    dst_inner = 'a7::1234'
    pkts = []

    # create IPv6 packets without SRH
    packet_header = self.create_packet_header_IPv6(dst_inner)
    # create traffic stream pg3->pg4
    pkts.extend(self.create_stream(self.pg3, self.pg4, packet_header,
                                   self.pg_packet_sizes, count))

    # create IPv6 packets with SRH
    # packets with segments-left 1, active segment a7::
    packet_header = self.create_packet_header_IPv6_SRH(
        sidlist=['a8::', 'a7::', 'a6::'],
        segleft=1)
    # create traffic stream pg3->pg4
    pkts.extend(self.create_stream(self.pg3, self.pg4, packet_header,
                                   self.pg_packet_sizes, count))

    # send packets and verify received packets
    self.send_and_verify_pkts(self.pg3, pkts, self.pg4,
                              self.compare_rx_tx_packet_T_Insert)

    # detach the ip6 input ACL from pg3 again
    reply = self.vapi.input_acl_set_interface(
        is_add=0,
        sw_if_index=self.pg3.sw_if_index,
        ip6_table_index=table_index)
    self.assertIsNotNone(reply,
                         'No response msg for input_acl_set_interface')

    # log the ip6 inacl after cleaning
    self.logger.info(self.vapi.cli("show inacl type ip6"))

    # log the localsid counters
    self.logger.info(self.vapi.cli("show sr localsid"))

    # no classifier steering object exists in this test; only log
    # the steering policies
    self.logger.info(self.vapi.cli("show sr steering policies"))

    # remove SR Policies
    self.sr_policy.remove_vpp_config()
    self.logger.info(self.vapi.cli("show sr policies"))

    # remove classify session and table
    reply = self.vapi.classify_add_del_session(
        0,
        table_index,
        binascii.unhexlify(match))
    self.assertIsNotNone(reply, 'No response msg for add_del_session')

    reply = self.vapi.classify_add_del_table(
        0,
        binascii.unhexlify(mask),
        table_index=table_index)
    self.assertIsNotNone(reply, 'No response msg for add_del_table')

    self.logger.info(self.vapi.cli("show classify table"))

    # remove FIB entries
    # done by tearDown

    # cleanup interfaces
    self.teardown_interfaces()
def compare_rx_tx_packet_T_Encaps(self, tx_pkt, rx_pkt):
    """ Check a received packet against its transmitted original
    after T.Encaps processing.

    :param tx_pkt: transmitted packet
    :param rx_pkt: received packet
    """
    # Expected rewrite for an SR Policy with seglist (S3, S2, S1)
    # and source C:
    #   IPv6:
    #     in:  IPv6(A, B2)
    #     out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(A, B2)
    #   IPv6 + SRH:
    #     in:  IPv6(A, B2)SRH(B3, B2, B1; SL=1)
    #     out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(A, B2)SRH(B3, B2, B1; SL=1)

    # outermost IPv6 header of both packets
    outer_rx = rx_pkt.getlayer(IPv6)
    orig_ip = tx_pkt.getlayer(IPv6)

    # the SRH lists the policy segments in reverse order
    expected_sids = self.sr_policy.segments[::-1]
    policy_source = self.sr_policy.source

    # the encapsulation must have added an SRH
    self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
    srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)

    # outer source is the SR Policy source
    self.assertEqual(outer_rx.src, policy_source)
    # outer destination is the last entry of the SRH segment list
    self.assertEqual(outer_rx.dst, expected_sids[-1])
    # SRH carries exactly the expected segment list
    self.assertEqual(srh.addresses, expected_sids)
    # segments-left points at the last entry ...
    self.assertEqual(srh.segleft, len(expected_sids) - 1)
    # ... and therefore equals lastentry
    self.assertEqual(srh.segleft, srh.lastentry)

    # everything behind the SRH must be the tx'ed packet, except that
    # the hop limit is expected to be decremented by one
    orig_ip.hlim -= 1
    self.assertEqual(srh.payload, orig_ip)

    self.logger.debug("packet verification: SUCCESS")
def compare_rx_tx_packet_T_Encaps_IPv4(self, tx_pkt, rx_pkt):
    """ Check a received packet against its transmitted original
    after T.Encaps processing of an IPv4 packet.

    :param tx_pkt: transmitted packet
    :param rx_pkt: received packet
    """
    # Expected rewrite for an SR Policy with seglist (S3, S2, S1)
    # and source C:
    #   IPv4:
    #     in:  IPv4(A, B2)
    #     out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv4(A, B2)

    # outer IPv6 header of the rx'ed packet, IPv4 header of the tx'ed one
    outer_rx = rx_pkt.getlayer(IPv6)
    orig_ip4 = tx_pkt.getlayer(IP)

    # the SRH lists the policy segments in reverse order
    expected_sids = self.sr_policy.segments[::-1]
    policy_source = self.sr_policy.source

    # rx'ed packet must carry an SRH and an IPv4 header
    self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
    self.assertTrue(outer_rx.payload.haslayer(IP))
    srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)

    # outer source is the SR Policy source
    self.assertEqual(outer_rx.src, policy_source)
    # outer destination is the last entry of the SRH segment list
    self.assertEqual(outer_rx.dst, expected_sids[-1])
    # SRH carries exactly the expected segment list
    self.assertEqual(srh.addresses, expected_sids)
    # segments-left points at the last entry ...
    self.assertEqual(srh.segleft, len(expected_sids) - 1)
    # ... and therefore equals lastentry
    self.assertEqual(srh.segleft, srh.lastentry)

    # everything behind the SRH must be the tx'ed packet, except for
    # the ttl (decremented by one) and hence the IPv4 checksum
    orig_ip4.ttl -= 1
    # clear the checksum and rebuild the packet from its raw bytes so
    # that scapy recomputes it
    orig_ip4.chksum = None
    orig_ip4 = IP(scapy.compat.raw(orig_ip4))

    self.assertEqual(srh.payload, orig_ip4)

    self.logger.debug("packet verification: SUCCESS")
def compare_rx_tx_packet_T_Encaps_L2(self, tx_pkt, rx_pkt):
    """ Check a received packet against its transmitted original
    after T.Encaps processing of an L2 frame.

    :param tx_pkt: transmitted packet
    :param rx_pkt: received packet
    """
    # Expected rewrite for an SR Policy with seglist (S3, S2, S1)
    # and source C:
    #   L2:
    #     in:  L2
    #     out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)L2

    # outer IPv6 header of the rx'ed packet, Ethernet header of the tx'ed
    outer_rx = rx_pkt.getlayer(IPv6)
    orig_frame = tx_pkt.getlayer(Ether)

    # the SRH lists the policy segments in reverse order
    expected_sids = self.sr_policy.segments[::-1]
    policy_source = self.sr_policy.source

    # the encapsulation must have added an SRH
    self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
    srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)

    # outer source is the SR Policy source
    self.assertEqual(outer_rx.src, policy_source)
    # outer destination is the last entry of the SRH segment list
    self.assertEqual(outer_rx.dst, expected_sids[-1])
    # SRH carries exactly the expected segment list
    self.assertEqual(srh.addresses, expected_sids)
    # segments-left points at the last entry ...
    self.assertEqual(srh.segleft, len(expected_sids) - 1)
    # ... and therefore equals lastentry
    self.assertEqual(srh.segleft, srh.lastentry)
    # next header must be "No Next Header" (59)
    self.assertEqual(srh.nh, 59)

    # the SRH payload, re-dissected as Ethernet, must equal the
    # tx'ed frame
    carried_frame = Ether(scapy.compat.raw(srh.payload))
    self.assertEqual(carried_frame, orig_frame)

    self.logger.debug("packet verification: SUCCESS")
def compare_rx_tx_packet_T_Insert(self, tx_pkt, rx_pkt):
    """ Check a received packet against its transmitted original
    after T.Insert processing.

    :param tx_pkt: transmitted packet
    :param rx_pkt: received packet
    """
    # Expected rewrite for an SR Policy with seglist (S3, S2, S1):
    #   IPv6:
    #     in:  IPv6(A, B2)
    #     out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)
    #   IPv6 + SRH:
    #     in:  IPv6(A, B2)SRH(B3, B2, B1; SL=1)
    #     out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)SRH(B3, B2, B1; SL=1)

    outer_rx = rx_pkt.getlayer(IPv6)
    udp_rx = rx_pkt[UDP]

    orig_ip = tx_pkt.getlayer(IPv6)
    # only some packets are tx'ed with an SRH; remember it when present
    orig_srh = None
    if tx_pkt.haslayer(IPv6ExtHdrSegmentRouting):
        orig_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
    udp_tx = tx_pkt[UDP]

    # inserted SRH, in SRH order: the original destination first,
    # followed by the policy segments reversed
    expected_sids = [orig_ip.dst] + self.sr_policy.segments[::-1]

    # checks common to both cases (tx'ed with or without SRH):
    # an SRH was inserted and no second IPv6 header was added
    self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
    self.assertFalse(outer_rx.payload.haslayer(IPv6))
    srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)

    # source address is left untouched
    self.assertEqual(outer_rx.src, orig_ip.src)
    # destination is the last entry of the inserted segment list
    self.assertEqual(outer_rx.dst, expected_sids[-1])

    # inserted SRH carries exactly the expected segment list
    self.assertEqual(srh.addresses, expected_sids)
    # segments-left points at the last entry ...
    self.assertEqual(srh.segleft, len(expected_sids) - 1)
    # ... and therefore equals lastentry
    self.assertEqual(srh.segleft, srh.lastentry)

    if orig_srh:  # packet was tx'ed with SRH
        # the original SRH must follow the inserted one, unchanged
        self.assertTrue(srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
        second_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting, 2)

        self.assertEqual(second_srh.addresses, orig_srh.addresses)
        self.assertEqual(second_srh.segleft, orig_srh.segleft)
        self.assertEqual(second_srh.lastentry, orig_srh.lastentry)

    else:  # packet was tx'ed without SRH
        # the inserted SRH must be the only one
        self.assertFalse(srh.payload.haslayer(IPv6ExtHdrSegmentRouting))

    # UDP layer should be unchanged
    self.assertEqual(udp_rx, udp_tx)

    self.logger.debug("packet verification: SUCCESS")
def compare_rx_tx_packet_End(self, tx_pkt, rx_pkt):
    """ Check a received packet against its transmitted original
    after End processing (without PSP).

    :param tx_pkt: transmitted packet
    :param rx_pkt: received packet
    """
    # Expected rewrite for End (no PSP):
    #   IPv6 + SRH:
    #     in:  IPv6(A, S1)SRH(S3, S2, S1; SL=2)
    #     out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)

    outer_rx = rx_pkt.getlayer(IPv6)
    udp_rx = rx_pkt[UDP]

    # the tx'ed packet is known to carry an SRH and an inner IPv6 header
    outer_tx = tx_pkt.getlayer(IPv6)
    inner_tx = tx_pkt.getlayer(IPv6, 2)
    srh_tx = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
    udp_tx = tx_pkt[UDP]

    # whatever the segleft value, the inner IPv6 header must survive
    self.assertTrue(outer_rx.payload.haslayer(IPv6))
    inner_rx = rx_pkt.getlayer(IPv6, 2)

    # segleft is looked up starting from the outer IPv6 layer
    if outer_tx.segleft > 0:
        # End without PSP keeps the SRH while segleft > 0
        self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
        srh_rx = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)

        # source address is left untouched
        self.assertEqual(outer_rx.src, outer_tx.src)
        # segment list is left untouched
        self.assertEqual(srh_rx.addresses, srh_tx.addresses)
        # segments-left has been decremented
        self.assertEqual(srh_rx.segleft, srh_tx.segleft - 1)
        # destination is the segment that segments-left now points at
        self.assertEqual(outer_rx.dst, srh_rx.addresses[srh_rx.segleft])
        # lastentry is left untouched
        self.assertEqual(srh_rx.lastentry, srh_tx.lastentry)
        # inner IPv6 addresses are left untouched
        self.assertEqual(inner_rx.src, inner_tx.src)
        self.assertEqual(inner_rx.dst, inner_tx.dst)
    # segleft == 0 is not checked here
    # TODO: Does this work with 2 SRHs in ingress packet?

    # UDP layer should be unchanged
    self.assertEqual(udp_rx, udp_tx)

    self.logger.debug("packet verification: SUCCESS")
def compare_rx_tx_packet_End_PSP(self, tx_pkt, rx_pkt):
    """ Check a received packet against its transmitted original
    after End processing with PSP.

    :param tx_pkt: transmitted packet
    :param rx_pkt: received packet
    """
    # Expected rewrite for End (PSP):
    #   IPv6 + SRH (SL>1):
    #     in:  IPv6(A, S1)SRH(S3, S2, S1; SL=2)
    #     out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
    #   IPv6 + SRH (SL=1):
    #     in:  IPv6(A, S2)SRH(S3, S2, S1; SL=1)
    #     out: IPv6(A, S3)

    outer_rx = rx_pkt.getlayer(IPv6)
    udp_rx = rx_pkt[UDP]

    # the tx'ed packet is known to carry an SRH and an inner IPv6 header
    outer_tx = tx_pkt.getlayer(IPv6)
    inner_tx = tx_pkt.getlayer(IPv6, 2)
    srh_tx = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
    udp_tx = tx_pkt[UDP]

    # whatever the segleft value, the inner IPv6 header must survive
    # with its addresses unchanged
    self.assertTrue(outer_rx.payload.haslayer(IPv6))
    inner_rx = rx_pkt.getlayer(IPv6, 2)
    self.assertEqual(inner_rx.src, inner_tx.src)
    self.assertEqual(inner_rx.dst, inner_tx.dst)

    # segleft is looked up starting from the outer IPv6 layer
    if outer_tx.segleft > 1:
        # End with PSP keeps the SRH while segleft > 1
        self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
        srh_rx = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)

        # source address is left untouched
        self.assertEqual(outer_rx.src, outer_tx.src)
        # segment list is left untouched
        self.assertEqual(srh_rx.addresses, srh_tx.addresses)
        # segments-left has been decremented
        self.assertEqual(srh_rx.segleft, srh_tx.segleft - 1)
        # destination is the segment that segments-left now points at
        self.assertEqual(outer_rx.dst, srh_rx.addresses[srh_rx.segleft])
        # lastentry is left untouched
        self.assertEqual(srh_rx.lastentry, srh_tx.lastentry)

    else:  # segleft <= 1
        # End with PSP and segleft=1 pops the SRH; both IPv6 headers
        # remain and the outer destination becomes the next segment
        self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
        # source address is left untouched
        self.assertEqual(outer_rx.src, outer_tx.src)
        # destination is tx'ed sidlist[segleft-1]
        self.assertEqual(outer_rx.dst,
                         srh_tx.addresses[srh_tx.segleft - 1])

    # UDP layer should be unchanged
    self.assertEqual(udp_rx, udp_tx)

    self.logger.debug("packet verification: SUCCESS")
def compare_rx_tx_packet_End_DX6(self, tx_pkt, rx_pkt):
    """ Check a received packet against its transmitted original
    after End.DX6 processing.

    :param tx_pkt: transmitted packet
    :param rx_pkt: received packet
    """
    # Expected rewrite for End.DX6:
    #   IPv6 + SRH (SL=0):
    #     in:  IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv6(B, D)
    #     out: IPv6(B, D)
    #   IPv6:
    #     in:  IPv6(A, S3)IPv6(B, D)
    #     out: IPv6(B, D)

    # the rx'ed packet starts with what was the inner IPv6 header
    decapped = rx_pkt.getlayer(IPv6)
    inner_tx = tx_pkt.getlayer(IPv6, 2)

    # the SRH must be gone after decapsulation
    self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))

    # the rx'ed packet must equal the tx'ed inner packet, except that
    # the hop limit is expected to be decremented by one
    inner_tx.hlim -= 1
    self.assertEqual(decapped, inner_tx)

    self.logger.debug("packet verification: SUCCESS")
def compare_rx_tx_packet_End_DX4(self, tx_pkt, rx_pkt):
    """ Check a received packet against its transmitted original
    after End.DX4 processing.

    :param tx_pkt: transmitted packet
    :param rx_pkt: received packet
    """
    # Expected rewrite for End.DX4:
    #   IPv6 + SRH (SL=0):
    #     in:  IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv4(B, D)
    #     out: IPv4(B, D)
    #   IPv6:
    #     in:  IPv6(A, S3)IPv4(B, D)
    #     out: IPv4(B, D)

    # IPv4 header of the rx'ed packet and of the tx'ed (inner) packet
    decapped = rx_pkt.getlayer(IP)
    inner_tx = tx_pkt.getlayer(IP)

    # the SRH must be gone after decapsulation
    self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))

    # the rx'ed packet must equal the tx'ed inner packet, except for
    # the ttl (decremented by one) and hence the IPv4 checksum
    inner_tx.ttl -= 1
    # clear the checksum and rebuild the packet from its raw bytes so
    # that scapy recomputes it
    inner_tx.chksum = None
    inner_tx = IP(scapy.compat.raw(inner_tx))

    self.assertEqual(decapped, inner_tx)

    self.logger.debug("packet verification: SUCCESS")
def compare_rx_tx_packet_End_DX2(self, tx_pkt, rx_pkt):
    """ Check a received packet against its transmitted original
    after End.DX2 processing.

    :param tx_pkt: transmitted packet
    :param rx_pkt: received packet
    """
    # Expected rewrite for End.DX2:
    #   IPv6 + SRH (SL=0):
    #     in:  IPv6(A, S3)SRH(S3, S2, S1; SL=0)L2
    #     out: L2
    #   IPv6:
    #     in:  IPv6(A, S3)L2
    #     out: L2

    # Ethernet header of the rx'ed packet
    decapped_frame = rx_pkt.getlayer(Ether)

    # the encapsulated frame is not dissected as a second Ether layer;
    # take the Raw content of the tx'ed packet and dissect it as Ether
    carried_frame = Ether(scapy.compat.raw(tx_pkt[Raw]))

    # the SRH must be gone after decapsulation
    self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))

    # the rx'ed frame must equal the frame that was encapsulated
    self.assertEqual(decapped_frame, carried_frame)

    self.logger.debug("packet verification: SUCCESS")
def create_stream(self, src_if, dst_if, packet_header, packet_sizes,
                  count):
    """Create SRv6 input packet stream for defined interface.

    :param VppInterface src_if: Interface to create packet stream for
    :param VppInterface dst_if: destination interface of packet stream
    :param packet_header: Layer3 scapy packet headers,
            L2 is added when not provided,
            Raw(payload) with packet_info is added
    :param list packet_sizes: packet sizes, applied to the packets in
            sequence (wrapping around when the list is exhausted)
    :param int count: size of the requested stream; note that the loop
            below generates ``count - 1`` packets
    :return: list of packets
    """
    self.logger.info("Creating packets")
    stream = []
    # NOTE(review): only count-1 packets are built, so with
    # count == len(packet_sizes) the last size is never used. Kept
    # as-is because callers may depend on it -- confirm the intent.
    for seq in range(count - 1):
        info = self.create_packet_info(src_if, dst_if)
        self.logger.debug(
            "Creating packet with index %d" % (info.index))
        payload = self.info_to_payload(info)

        # prepend an L2 header unless the caller already supplied one
        header_has_l2 = packet_header.getlayer(0).name == 'Ethernet'
        if header_has_l2:
            frame = packet_header / Raw(payload)
        else:
            l2 = Ether(dst=src_if.local_mac, src=src_if.remote_mac)
            frame = l2 / packet_header / Raw(payload)

        size = packet_sizes[seq % len(packet_sizes)]
        self.logger.debug("Packet size %d" % (size))
        self.extend_packet(frame, size)

        # rebuild the packet from its raw bytes so that the stored
        # copy has all automatic fields computed
        frame = Ether(scapy.compat.raw(frame))
        info.data = frame.copy()
        self.logger.debug(ppp("Created packet:", frame))
        stream.append(frame)

    self.logger.info("Done creating packets")
    return stream
def send_and_verify_pkts(self, input, pkts, output, compare_func):
    """Send packets and verify received packets using compare_func

    :param input: ingress interface of DUT
    :param pkts: list of packets to transmit
    :param output: egress interface of DUT
    :param compare_func: function to compare in and out packets
    """
    # queue the stream on the ingress interface and capture everywhere
    input.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)

    # start traffic
    self.logger.info("Starting traffic")
    self.pg_start()

    # fetch what arrived on the egress interface
    self.logger.info("Getting packet capture")
    rx_capture = output.get_capture()

    # nothing may be reflected back to the ingress interface
    input.assert_nothing_captured()

    # compare every captured packet with its transmitted original
    self.verify_captured_pkts(output, rx_capture, compare_func)
def create_packet_header_IPv6(self, dst):
    """Create packet header: IPv6 header, UDP header

    :param dst: IPv6 destination address

    IPv6 source address is 1234::1
    UDP source port and destination port are 1234
    """
    ip6 = IPv6(src='1234::1', dst=dst)
    udp = UDP(sport=1234, dport=1234)
    return ip6 / udp
def create_packet_header_IPv6_SRH(self, sidlist, segleft):
    """Create packet header: IPv6 header with SRH, UDP header

    :param list sidlist: segment list
    :param int segleft: segments-left field value

    IPv6 destination address is set to sidlist[segleft]
    IPv6 source address is 1234::1
    UDP source port and destination port are 1234
    """
    # segleft is set explicitly on the SRH so that it agrees with the
    # destination address chosen from sidlist[segleft]; without it
    # scapy fills in its own default for the field
    p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
         IPv6ExtHdrSegmentRouting(addresses=sidlist,
                                  segleft=segleft) /
         UDP(sport=1234, dport=1234))
    return p
def create_packet_header_IPv6_SRH_IPv6(self, dst, sidlist, segleft):
    """Create packet header: IPv6 encapsulated in SRv6:
    IPv6 header with SRH, IPv6 header, UDP header

    :param ipv6address dst: inner IPv6 destination address
    :param list sidlist: segment list of outer IPv6 SRH
    :param int segleft: segments-left field of outer IPv6 SRH

    Outer IPv6 destination address is set to sidlist[segleft]
    IPv6 source addresses are 1234::1 and 4321::1
    UDP source port and destination port are 1234
    """
    outer_ip6 = IPv6(src='1234::1', dst=sidlist[segleft])
    # next header 41: an IPv6 header follows the SRH
    srh = IPv6ExtHdrSegmentRouting(addresses=sidlist,
                                   segleft=segleft, nh=41)
    inner_ip6 = IPv6(src='4321::1', dst=dst)
    udp = UDP(sport=1234, dport=1234)
    return outer_ip6 / srh / inner_ip6 / udp
def create_packet_header_IPv6_IPv6(self, dst_inner, dst_outer):
    """Create packet header: IPv6 encapsulated in IPv6:
    IPv6 header, IPv6 header, UDP header

    :param ipv6address dst_inner: inner IPv6 destination address
    :param ipv6address dst_outer: outer IPv6 destination address

    IPv6 source addresses are 1234::1 and 4321::1
    UDP source port and destination port are 1234
    """
    outer_ip6 = IPv6(src='1234::1', dst=dst_outer)
    inner_ip6 = IPv6(src='4321::1', dst=dst_inner)
    udp = UDP(sport=1234, dport=1234)
    return outer_ip6 / inner_ip6 / udp
def create_packet_header_IPv6_SRH_SRH_IPv6(self, dst, sidlist1, segleft1,
                                           sidlist2, segleft2):
    """Create packet header: IPv6 encapsulated in SRv6 with 2 SRH:
    IPv6 header with SRH, 2nd SRH, IPv6 header, UDP header

    :param ipv6address dst: inner IPv6 destination address
    :param list sidlist1: segment list of outer IPv6 SRH
    :param int segleft1: segments-left field of outer IPv6 SRH
    :param list sidlist2: segment list of inner IPv6 SRH
    :param int segleft2: segments-left field of inner IPv6 SRH

    Outer IPv6 destination address is set to sidlist1[segleft1]
    IPv6 source addresses are 1234::1 and 4321::1
    UDP source port and destination port are 1234
    """
    outer_ip6 = IPv6(src='1234::1', dst=sidlist1[segleft1])
    # next header 43: another routing header follows
    first_srh = IPv6ExtHdrSegmentRouting(addresses=sidlist1,
                                         segleft=segleft1, nh=43)
    # next header 41: an IPv6 header follows
    second_srh = IPv6ExtHdrSegmentRouting(addresses=sidlist2,
                                          segleft=segleft2, nh=41)
    inner_ip6 = IPv6(src='4321::1', dst=dst)
    udp = UDP(sport=1234, dport=1234)
    return outer_ip6 / first_srh / second_srh / inner_ip6 / udp
def create_packet_header_IPv4(self, dst):
    """Create packet header: IPv4 header, UDP header

    :param dst: IPv4 destination address

    IPv4 source address is 123.1.1.1
    UDP source port and destination port are 1234
    """
    ip4 = IP(src='123.1.1.1', dst=dst)
    udp = UDP(sport=1234, dport=1234)
    return ip4 / udp
def create_packet_header_IPv6_IPv4(self, dst_inner, dst_outer):
    """Create packet header: IPv4 encapsulated in IPv6:
    IPv6 header, IPv4 header, UDP header

    :param ipv4address dst_inner: inner IPv4 destination address
    :param ipv6address dst_outer: outer IPv6 destination address

    IPv6 source address is 1234::1
    IPv4 source address is 123.1.1.1
    UDP source port and destination port are 1234
    """
    outer_ip6 = IPv6(src='1234::1', dst=dst_outer)
    inner_ip4 = IP(src='123.1.1.1', dst=dst_inner)
    udp = UDP(sport=1234, dport=1234)
    return outer_ip6 / inner_ip4 / udp
def create_packet_header_IPv6_SRH_IPv4(self, dst, sidlist, segleft):
    """Create packet header: IPv4 encapsulated in SRv6:
    IPv6 header with SRH, IPv4 header, UDP header

    :param ipv4address dst: inner IPv4 destination address
    :param list sidlist: segment list of outer IPv6 SRH
    :param int segleft: segments-left field of outer IPv6 SRH

    Outer IPv6 destination address is set to sidlist[segleft]
    IPv6 source address is 1234::1
    IPv4 source address is 123.1.1.1
    UDP source port and destination port are 1234
    """
    outer_ip6 = IPv6(src='1234::1', dst=sidlist[segleft])
    # next header 4: an IPv4 header follows the SRH
    srh = IPv6ExtHdrSegmentRouting(addresses=sidlist,
                                   segleft=segleft, nh=4)
    inner_ip4 = IP(src='123.1.1.1', dst=dst)
    udp = UDP(sport=1234, dport=1234)
    return outer_ip6 / srh / inner_ip4 / udp
def create_packet_header_L2(self, vlan=0):
    """Create packet header: L2 header

    :param vlan: if vlan!=0 then add 802.1q header
    """
    # Note: the dst addr ('00:55:44:33:22:11') is used in
    # the compare function compare_rx_tx_packet_T_Encaps_L2
    # to detect presence of L2 in SRH payload
    ethertype = 0x8137  # IPX
    frame = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
    if not vlan:
        # untagged: the ethertype goes straight into the Ethernet header
        frame.type = ethertype
        return frame
    # tagged: the ethertype moves into the 802.1q header
    return frame / Dot1Q(vlan=vlan, type=ethertype)
def create_packet_header_IPv6_SRH_L2(self, sidlist, segleft, vlan=0):
    """Create packet header: L2 encapsulated in SRv6:
    IPv6 header with SRH, L2

    :param list sidlist: segment list of outer IPv6 SRH
    :param int segleft: segments-left field of outer IPv6 SRH
    :param vlan: L2 vlan; if vlan!=0 then add 802.1q header

    Outer IPv6 destination address is set to sidlist[segleft]
    IPv6 source address is 1234::1
    """
    ethertype = 0x8137  # IPX
    frame = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
    if vlan:
        # tagged: the ethertype moves into the 802.1q header
        frame = frame / Dot1Q(vlan=vlan, type=ethertype)
    else:
        frame.type = ethertype

    outer_ip6 = IPv6(src='1234::1', dst=sidlist[segleft])
    # next header 59 ("No Next Header"): the L2 frame follows the SRH
    srh = IPv6ExtHdrSegmentRouting(addresses=sidlist,
                                   segleft=segleft, nh=59)
    return outer_ip6 / srh / frame
def create_packet_header_IPv6_L2(self, dst_outer, vlan=0):
    """Create packet header: L2 encapsulated in IPv6:
    IPv6 header, L2

    :param ipv6address dst_outer: outer IPv6 destination address
    :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
    """
    ethertype = 0x8137  # IPX
    frame = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
    if vlan:
        # tagged: the ethertype moves into the 802.1q header
        frame = frame / Dot1Q(vlan=vlan, type=ethertype)
    else:
        frame.type = ethertype

    # next header 59 ("No Next Header"): the L2 frame follows directly
    outer_ip6 = IPv6(src='1234::1', dst=dst_outer, nh=59)
    return outer_ip6 / frame
def get_payload_info(self, packet):
    """ Extract the payload_info from the packet

    :param packet: captured packet carrying a payload_info in its Raw
                   layer
    :returns: the payload_info decoded by payload_to_info
    """
    # in most cases, payload_info is in packet[Raw]
    # but packet[Raw] gives the complete payload
    # (incl L2 header) for the T.Encaps L2 case
    try:
        payload_info = self.payload_to_info(packet[Raw])

    except Exception:
        # remove L2 header from packet[Raw]:
        # take packet[Raw], convert it to an Ether layer
        # and then extract Raw from it
        payload_info = self.payload_to_info(
            Ether(scapy.compat.raw(packet[Raw]))[Raw])

    return payload_info
def verify_captured_pkts(self, dst_if, capture, compare_func):
    """
    Verify captured packet stream for specified interface.
    Compare ingress with egress packets using the specified compare fn

    :param dst_if: egress interface of DUT
    :param capture: captured packets
    :param compare_func: function to compare in and out packet
    """
    self.logger.info("Verifying capture on interface %s using function %s"
                     % (dst_if.name, compare_func.__name__))

    # per source interface: the packet info matched most recently
    last_info = {intf.sw_if_index: None for intf in self.pg_interfaces}
    dst_sw_if_index = dst_if.sw_if_index

    for rx_packet in capture:
        try:
            # the payload identifies the stream the packet belongs to
            info = self.get_payload_info(rx_packet)
            rx_index = info.index

            self.logger.debug("Verifying packet with index %d"
                              % (rx_index))
            # packet should have arrived on the expected interface
            self.assertEqual(info.dst, dst_sw_if_index)
            self.logger.debug(
                "Got packet on interface %s: src=%u (idx=%u)" %
                (dst_if.name, info.src, rx_index))

            # the next stored packet info with the same src and dst
            # if_index describes the transmitted packet
            tx_info = self.get_next_packet_info_for_interface2(
                info.src, dst_sw_if_index,
                last_info[info.src])
            last_info[info.src] = tx_info
            # a transmitted packet must exist ...
            self.assertIsNotNone(tx_info)
            # ... and carry the same index as the received one
            self.assertEqual(rx_index, tx_info.index)
            # data field of the packet info contains the tx packet
            tx_packet = tx_info.data

            self.logger.debug(ppp("Transmitted packet:",
                                  tx_packet))  # ppp=Pretty Print Packet

            self.logger.debug(ppp("Received packet:", rx_packet))

            # compare rcvd packet with expected packet using compare_func
            compare_func(tx_packet, rx_packet)

        except BaseException:
            # log the offending packet, then let the failure propagate
            self.logger.error(ppp("Unexpected or invalid packet:",
                                  rx_packet))
            raise

    # have all expected packets arrived?
    for intf in self.pg_interfaces:
        leftover = self.get_next_packet_info_for_interface2(
            intf.sw_if_index, dst_sw_if_index,
            last_info[intf.sw_if_index])
        self.assertIsNone(leftover,
                          "Interface %s: Packet expected from interface %s "
                          "didn't arrive" % (dst_if.name, intf.name))
# run the test cases with the VPP test runner when executed as a script
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)