4 from socket import AF_INET6
6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
8 from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
9 SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
11 from scapy.packet import Raw
12 from scapy.layers.l2 import Ether, Dot1Q
13 from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
14 from scapy.layers.inet import IP, UDP
16 from scapy.utils import inet_pton, inet_ntop
21 class TestSRv6(VppTestCase):
22 """ SRv6 Test Case """
26 super(TestSRv6, self).setUpClass()
29 """ Perform test setup before each test case.
31 super(TestSRv6, self).setUp()
33 # packet sizes, including L2 overhead
34 self.pg_packet_sizes = [64, 512, 1518, 9018]
37 self.reset_packet_infos()
40 """ Clean up test setup after each test case.
42 self.teardown_interfaces()
44 super(TestSRv6, self).tearDown()
46 def configure_interface(self,
48 ipv6=False, ipv4=False,
49 ipv6_table_id=0, ipv4_table_id=0):
50 """ Configure interface.
51 :param ipv6: configure IPv6 on interface
52 :param ipv4: configure IPv4 on interface
53 :param ipv6_table_id: FIB table_id for IPv6
54 :param ipv4_table_id: FIB table_id for IPv4
56 self.logger.debug("Configuring interface %s" % (interface.name))
58 self.logger.debug("Configuring IPv6")
59 interface.set_table_ip6(ipv6_table_id)
60 interface.config_ip6()
61 interface.resolve_ndp(timeout=5)
63 self.logger.debug("Configuring IPv4")
64 interface.set_table_ip4(ipv4_table_id)
65 interface.config_ip4()
66 interface.resolve_arp()
69 def setup_interfaces(self, ipv6=[], ipv4=[],
70 ipv6_table_id=[], ipv4_table_id=[]):
71 """ Create and configure interfaces.
73 :param ipv6: list of interface IPv6 capabilities
74 :param ipv4: list of interface IPv4 capabilities
75 :param ipv6_table_id: list of intf IPv6 FIB table_ids
76 :param ipv4_table_id: list of intf IPv4 FIB table_ids
77 :returns: List of created interfaces.
79 # how many interfaces?
84 self.logger.debug("Creating and configuring %d interfaces" % (count))
86 # fill up ipv6 and ipv4 lists if needed
87 # not enabled (False) is the default
89 ipv6 += (count - len(ipv6)) * [False]
91 ipv4 += (count - len(ipv4)) * [False]
93 # fill up table_id lists if needed
94 # table_id 0 (global) is the default
95 if len(ipv6_table_id) < count:
96 ipv6_table_id += (count - len(ipv6_table_id)) * [0]
97 if len(ipv4_table_id) < count:
98 ipv4_table_id += (count - len(ipv4_table_id)) * [0]
100 # create 'count' pg interfaces
101 self.create_pg_interfaces(range(count))
103 # setup all interfaces
104 for i in range(count):
105 intf = self.pg_interfaces[i]
106 self.configure_interface(intf,
108 ipv6_table_id[i], ipv4_table_id[i])
111 self.logger.debug(self.vapi.cli("show ip6 neighbors"))
113 self.logger.debug(self.vapi.cli("show ip arp"))
114 self.logger.debug(self.vapi.cli("show interface"))
115 self.logger.debug(self.vapi.cli("show hardware"))
117 return self.pg_interfaces
119 def teardown_interfaces(self):
120 """ Unconfigure and bring down interface.
122 self.logger.debug("Tearing down interfaces")
123 # tear down all interfaces
124 # AFAIK they cannot be deleted
125 for i in self.pg_interfaces:
126 self.logger.debug("Tear down interface %s" % (i.name))
130 def test_SRv6_T_Encaps(self):
131 """ Test SRv6 Transit.Encaps behavior for IPv6.
133 # send traffic to one destination interface
134 # source and destination are IPv6 only
135 self.setup_interfaces(ipv6=[True, True])
137 # configure FIB entries
138 route = VppIpRoute(self, "a4::", 64,
139 [VppRoutePath(self.pg1.remote_ip6,
140 self.pg1.sw_if_index,
141 proto=DpoProto.DPO_PROTO_IP6)],
143 route.add_vpp_config()
145 # configure encaps IPv6 source address
146 # needs to be done before SR Policy config
148 self.vapi.cli("set sr encaps source addr a3::")
151 # configure SRv6 Policy
152 # Note: segment list order: first -> last
153 sr_policy = VppSRv6Policy(
156 sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
157 weight=1, fib_table=0,
158 segments=['a4::', 'a5::', 'a6::c7'],
160 sr_policy.add_vpp_config()
161 self.sr_policy = sr_policy
163 # log the sr policies
164 self.logger.info(self.vapi.cli("show sr policies"))
166 # steer IPv6 traffic to a7::/64 into SRv6 Policy
167 # use the bsid of the above self.sr_policy
168 pol_steering = VppSRv6Steering(
170 bsid=self.sr_policy.bsid,
171 prefix="a7::", mask_width=64,
172 traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
173 sr_policy_index=0, table_id=0,
175 pol_steering.add_vpp_config()
177 # log the sr steering policies
178 self.logger.info(self.vapi.cli("show sr steering policies"))
181 count = len(self.pg_packet_sizes)
182 dst_inner = 'a7::1234'
185 # create IPv6 packets without SRH
186 packet_header = self.create_packet_header_IPv6(dst_inner)
187 # create traffic stream pg0->pg1
188 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
189 self.pg_packet_sizes, count))
191 # create IPv6 packets with SRH
192 # packets with segments-left 1, active segment a7::
193 packet_header = self.create_packet_header_IPv6_SRH(
194 sidlist=['a8::', 'a7::', 'a6::'],
196 # create traffic stream pg0->pg1
197 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
198 self.pg_packet_sizes, count))
200 # create IPv6 packets with SRH and IPv6
201 # packets with segments-left 1, active segment a7::
202 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
204 sidlist=['a8::', 'a7::', 'a6::'],
206 # create traffic stream pg0->pg1
207 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
208 self.pg_packet_sizes, count))
210 # send packets and verify received packets
211 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
212 self.compare_rx_tx_packet_T_Encaps)
214 # log the localsid counters
215 self.logger.info(self.vapi.cli("show sr localsid"))
218 pol_steering.remove_vpp_config()
219 self.logger.info(self.vapi.cli("show sr steering policies"))
222 self.sr_policy.remove_vpp_config()
223 self.logger.info(self.vapi.cli("show sr policies"))
229 self.teardown_interfaces()
231 def test_SRv6_T_Insert(self):
232 """ Test SRv6 Transit.Insert behavior (IPv6 only).
234 # send traffic to one destination interface
235 # source and destination are IPv6 only
236 self.setup_interfaces(ipv6=[True, True])
238 # configure FIB entries
239 route = VppIpRoute(self, "a4::", 64,
240 [VppRoutePath(self.pg1.remote_ip6,
241 self.pg1.sw_if_index,
242 proto=DpoProto.DPO_PROTO_IP6)],
244 route.add_vpp_config()
246 # configure encaps IPv6 source address
247 # needs to be done before SR Policy config
249 self.vapi.cli("set sr encaps source addr a3::")
252 # configure SRv6 Policy
253 # Note: segment list order: first -> last
254 sr_policy = VppSRv6Policy(
257 sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
258 weight=1, fib_table=0,
259 segments=['a4::', 'a5::', 'a6::c7'],
261 sr_policy.add_vpp_config()
262 self.sr_policy = sr_policy
264 # log the sr policies
265 self.logger.info(self.vapi.cli("show sr policies"))
267 # steer IPv6 traffic to a7::/64 into SRv6 Policy
268 # use the bsid of the above self.sr_policy
269 pol_steering = VppSRv6Steering(
271 bsid=self.sr_policy.bsid,
272 prefix="a7::", mask_width=64,
273 traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
274 sr_policy_index=0, table_id=0,
276 pol_steering.add_vpp_config()
278 # log the sr steering policies
279 self.logger.info(self.vapi.cli("show sr steering policies"))
282 count = len(self.pg_packet_sizes)
283 dst_inner = 'a7::1234'
286 # create IPv6 packets without SRH
287 packet_header = self.create_packet_header_IPv6(dst_inner)
288 # create traffic stream pg0->pg1
289 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
290 self.pg_packet_sizes, count))
292 # create IPv6 packets with SRH
293 # packets with segments-left 1, active segment a7::
294 packet_header = self.create_packet_header_IPv6_SRH(
295 sidlist=['a8::', 'a7::', 'a6::'],
297 # create traffic stream pg0->pg1
298 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
299 self.pg_packet_sizes, count))
301 # send packets and verify received packets
302 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
303 self.compare_rx_tx_packet_T_Insert)
305 # log the localsid counters
306 self.logger.info(self.vapi.cli("show sr localsid"))
309 pol_steering.remove_vpp_config()
310 self.logger.info(self.vapi.cli("show sr steering policies"))
313 self.sr_policy.remove_vpp_config()
314 self.logger.info(self.vapi.cli("show sr policies"))
320 self.teardown_interfaces()
322 def test_SRv6_T_Encaps_IPv4(self):
323 """ Test SRv6 Transit.Encaps behavior for IPv4.
325 # send traffic to one destination interface
326 # source interface is IPv4 only
327 # destination interface is IPv6 only
328 self.setup_interfaces(ipv6=[False, True], ipv4=[True, False])
330 # configure FIB entries
331 route = VppIpRoute(self, "a4::", 64,
332 [VppRoutePath(self.pg1.remote_ip6,
333 self.pg1.sw_if_index,
334 proto=DpoProto.DPO_PROTO_IP6)],
336 route.add_vpp_config()
338 # configure encaps IPv6 source address
339 # needs to be done before SR Policy config
341 self.vapi.cli("set sr encaps source addr a3::")
344 # configure SRv6 Policy
345 # Note: segment list order: first -> last
346 sr_policy = VppSRv6Policy(
349 sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
350 weight=1, fib_table=0,
351 segments=['a4::', 'a5::', 'a6::c7'],
353 sr_policy.add_vpp_config()
354 self.sr_policy = sr_policy
356 # log the sr policies
357 self.logger.info(self.vapi.cli("show sr policies"))
359 # steer IPv4 traffic to 7.1.1.0/24 into SRv6 Policy
360 # use the bsid of the above self.sr_policy
361 pol_steering = VppSRv6Steering(
363 bsid=self.sr_policy.bsid,
364 prefix="7.1.1.0", mask_width=24,
365 traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV4,
366 sr_policy_index=0, table_id=0,
368 pol_steering.add_vpp_config()
370 # log the sr steering policies
371 self.logger.info(self.vapi.cli("show sr steering policies"))
374 count = len(self.pg_packet_sizes)
375 dst_inner = '7.1.1.123'
378 # create IPv4 packets
379 packet_header = self.create_packet_header_IPv4(dst_inner)
380 # create traffic stream pg0->pg1
381 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
382 self.pg_packet_sizes, count))
384 # send packets and verify received packets
385 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
386 self.compare_rx_tx_packet_T_Encaps_IPv4)
388 # log the localsid counters
389 self.logger.info(self.vapi.cli("show sr localsid"))
392 pol_steering.remove_vpp_config()
393 self.logger.info(self.vapi.cli("show sr steering policies"))
396 self.sr_policy.remove_vpp_config()
397 self.logger.info(self.vapi.cli("show sr policies"))
403 self.teardown_interfaces()
405 @unittest.skip("VPP crashes after running this test")
406 def test_SRv6_T_Encaps_L2(self):
407 """ Test SRv6 Transit.Encaps behavior for L2.
409 # send traffic to one destination interface
410 # source interface is L2 only (no IPv4 or IPv6 configured)
411 # destination interface is IPv6 only
412 self.setup_interfaces(ipv6=[False, True], ipv4=[False, False])
414 # configure FIB entries
415 route = VppIpRoute(self, "a4::", 64,
416 [VppRoutePath(self.pg1.remote_ip6,
417 self.pg1.sw_if_index,
418 proto=DpoProto.DPO_PROTO_IP6)],
420 route.add_vpp_config()
422 # configure encaps IPv6 source address
423 # needs to be done before SR Policy config
425 self.vapi.cli("set sr encaps source addr a3::")
428 # configure SRv6 Policy
429 # Note: segment list order: first -> last
430 sr_policy = VppSRv6Policy(
433 sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
434 weight=1, fib_table=0,
435 segments=['a4::', 'a5::', 'a6::c7'],
437 sr_policy.add_vpp_config()
438 self.sr_policy = sr_policy
440 # log the sr policies
441 self.logger.info(self.vapi.cli("show sr policies"))
443 # steer L2 traffic into SRv6 Policy
444 # use the bsid of the above self.sr_policy
445 pol_steering = VppSRv6Steering(
447 bsid=self.sr_policy.bsid,
448 prefix="::", mask_width=0,
449 traffic_type=SRv6PolicySteeringTypes.SR_STEER_L2,
450 sr_policy_index=0, table_id=0,
451 sw_if_index=self.pg0.sw_if_index)
452 pol_steering.add_vpp_config()
454 # log the sr steering policies
455 self.logger.info(self.vapi.cli("show sr steering policies"))
458 count = len(self.pg_packet_sizes)
461 # create L2 packets without dot1q header
462 packet_header = self.create_packet_header_L2()
463 # create traffic stream pg0->pg1
464 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
465 self.pg_packet_sizes, count))
467 # create L2 packets with dot1q header
468 packet_header = self.create_packet_header_L2(vlan=123)
469 # create traffic stream pg0->pg1
470 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
471 self.pg_packet_sizes, count))
473 # send packets and verify received packets
474 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
475 self.compare_rx_tx_packet_T_Encaps_L2)
477 # log the localsid counters
478 self.logger.info(self.vapi.cli("show sr localsid"))
481 pol_steering.remove_vpp_config()
482 self.logger.info(self.vapi.cli("show sr steering policies"))
485 self.sr_policy.remove_vpp_config()
486 self.logger.info(self.vapi.cli("show sr policies"))
492 self.teardown_interfaces()
494 def test_SRv6_End(self):
495 """ Test SRv6 End (without PSP) behavior.
497 # send traffic to one destination interface
498 # source and destination interfaces are IPv6 only
499 self.setup_interfaces(ipv6=[True, True])
501 # configure FIB entries
502 route = VppIpRoute(self, "a4::", 64,
503 [VppRoutePath(self.pg1.remote_ip6,
504 self.pg1.sw_if_index,
505 proto=DpoProto.DPO_PROTO_IP6)],
507 route.add_vpp_config()
509 # configure SRv6 localSID End without PSP behavior
510 localsid = VppSRv6LocalSID(
511 self, localsid_addr='A3::0',
512 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
518 localsid.add_vpp_config()
520 self.logger.debug(self.vapi.cli("show sr localsid"))
522 # create IPv6 packets with SRH (SL=2, SL=1); SL=0 is still a TODO
523 # send one packet per SL value per packet size
524 # SL=0 packet with localSID End with USP needs 2nd SRH
525 count = len(self.pg_packet_sizes)
526 dst_inner = 'a4::1234'
529 # packets with segments-left 2, active segment a3::
530 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
532 sidlist=['a5::', 'a4::', 'a3::'],
534 # create traffic stream pg0->pg1
535 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
536 self.pg_packet_sizes, count))
538 # packets with segments-left 1, active segment a3::
539 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
541 sidlist=['a4::', 'a3::', 'a2::'],
543 # add to traffic stream pg0->pg1
544 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
545 self.pg_packet_sizes, count))
547 # TODO: test behavior with SL=0 packet (needs 2*SRH?)
549 # send packets and verify received packets
550 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
551 self.compare_rx_tx_packet_End)
553 # log the localsid counters
554 self.logger.info(self.vapi.cli("show sr localsid"))
556 # remove SRv6 localSIDs
557 localsid.remove_vpp_config()
563 self.teardown_interfaces()
565 def test_SRv6_End_with_PSP(self):
566 """ Test SRv6 End with PSP behavior.
568 # send traffic to one destination interface
569 # source and destination interfaces are IPv6 only
570 self.setup_interfaces(ipv6=[True, True])
572 # configure FIB entries
573 route = VppIpRoute(self, "a4::", 64,
574 [VppRoutePath(self.pg1.remote_ip6,
575 self.pg1.sw_if_index,
576 proto=DpoProto.DPO_PROTO_IP6)],
578 route.add_vpp_config()
580 # configure SRv6 localSID End with PSP behavior
581 localsid = VppSRv6LocalSID(
582 self, localsid_addr='A3::0',
583 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
589 localsid.add_vpp_config()
591 self.logger.debug(self.vapi.cli("show sr localsid"))
593 # create IPv6 packets with SRH (SL=2, SL=1)
594 # send one packet per SL value per packet size
595 # SL=0 packet with localSID End with PSP is dropped
596 count = len(self.pg_packet_sizes)
597 dst_inner = 'a4::1234'
600 # packets with segments-left 2, active segment a3::
601 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
603 sidlist=['a5::', 'a4::', 'a3::'],
605 # create traffic stream pg0->pg1
606 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
607 self.pg_packet_sizes, count))
609 # packets with segments-left 1, active segment a3::
610 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
612 sidlist=['a4::', 'a3::', 'a2::'],
614 # add to traffic stream pg0->pg1
615 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
616 self.pg_packet_sizes, count))
618 # send packets and verify received packets
619 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
620 self.compare_rx_tx_packet_End_PSP)
622 # log the localsid counters
623 self.logger.info(self.vapi.cli("show sr localsid"))
625 # remove SRv6 localSIDs
626 localsid.remove_vpp_config()
632 self.teardown_interfaces()
634 def test_SRv6_End_X(self):
635 """ Test SRv6 End.X (without PSP) behavior.
637 # create three interfaces (1 source, 2 destinations)
638 # source and destination interfaces are IPv6 only
639 self.setup_interfaces(ipv6=[True, True, True])
641 # configure FIB entries
642 # a4::/64 via pg1 and pg2
643 route = VppIpRoute(self, "a4::", 64,
644 [VppRoutePath(self.pg1.remote_ip6,
645 self.pg1.sw_if_index,
646 proto=DpoProto.DPO_PROTO_IP6),
647 VppRoutePath(self.pg2.remote_ip6,
648 self.pg2.sw_if_index,
649 proto=DpoProto.DPO_PROTO_IP6)],
651 route.add_vpp_config()
652 self.logger.debug(self.vapi.cli("show ip6 fib"))
654 # configure SRv6 localSID End.X without PSP behavior
655 # End.X points to interface pg1
656 localsid = VppSRv6LocalSID(
657 self, localsid_addr='A3::C4',
658 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
659 nh_addr=self.pg1.remote_ip6,
661 sw_if_index=self.pg1.sw_if_index,
664 localsid.add_vpp_config()
666 self.logger.debug(self.vapi.cli("show sr localsid"))
668 # create IPv6 packets with SRH (SL=2, SL=1)
669 # send one packet per SL value per packet size
670 # SL=0 packet with localSID End with PSP is dropped
671 count = len(self.pg_packet_sizes)
672 dst_inner = 'a4::1234'
675 # packets with segments-left 2, active segment a3::c4
676 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
678 sidlist=['a5::', 'a4::', 'a3::c4'],
680 # create traffic stream pg0->pg1
681 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
682 self.pg_packet_sizes, count))
684 # packets with segments-left 1, active segment a3::c4
685 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
687 sidlist=['a4::', 'a3::c4', 'a2::'],
689 # add to traffic stream pg0->pg1
690 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
691 self.pg_packet_sizes, count))
693 # send packets and verify received packets
694 # using same comparison function as End (no PSP)
695 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
696 self.compare_rx_tx_packet_End)
698 # assert nothing was received on the other interface (pg2)
699 self.pg2.assert_nothing_captured("mis-directed packet(s)")
701 # log the localsid counters
702 self.logger.info(self.vapi.cli("show sr localsid"))
704 # remove SRv6 localSIDs
705 localsid.remove_vpp_config()
711 self.teardown_interfaces()
713 def test_SRv6_End_X_with_PSP(self):
714 """ Test SRv6 End.X with PSP behavior.
716 # create three interfaces (1 source, 2 destinations)
717 # source and destination interfaces are IPv6 only
718 self.setup_interfaces(ipv6=[True, True, True])
720 # configure FIB entries
721 # a4::/64 via pg1 and pg2
722 route = VppIpRoute(self, "a4::", 64,
723 [VppRoutePath(self.pg1.remote_ip6,
724 self.pg1.sw_if_index,
725 proto=DpoProto.DPO_PROTO_IP6),
726 VppRoutePath(self.pg2.remote_ip6,
727 self.pg2.sw_if_index,
728 proto=DpoProto.DPO_PROTO_IP6)],
730 route.add_vpp_config()
732 # configure SRv6 localSID End.X with PSP behavior
733 localsid = VppSRv6LocalSID(
734 self, localsid_addr='A3::C4',
735 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
736 nh_addr=self.pg1.remote_ip6,
738 sw_if_index=self.pg1.sw_if_index,
741 localsid.add_vpp_config()
743 self.logger.debug(self.vapi.cli("show sr localsid"))
745 # create IPv6 packets with SRH (SL=2, SL=1)
746 # send one packet per SL value per packet size
747 # SL=0 packet with localSID End with PSP is dropped
748 count = len(self.pg_packet_sizes)
749 dst_inner = 'a4::1234'
752 # packets with segments-left 2, active segment a3::c4
753 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
755 sidlist=['a5::', 'a4::', 'a3::c4'],
757 # create traffic stream pg0->pg1
758 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
759 self.pg_packet_sizes, count))
761 # packets with segments-left 1, active segment a3::c4
762 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
764 sidlist=['a4::', 'a3::c4', 'a2::'],
766 # add to traffic stream pg0->pg1
767 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
768 self.pg_packet_sizes, count))
770 # send packets and verify received packets
771 # using same comparison function as End with PSP
772 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
773 self.compare_rx_tx_packet_End_PSP)
775 # assert nothing was received on the other interface (pg2)
776 self.pg2.assert_nothing_captured("mis-directed packet(s)")
778 # log the localsid counters
779 self.logger.info(self.vapi.cli("show sr localsid"))
781 # remove SRv6 localSIDs
782 localsid.remove_vpp_config()
788 self.teardown_interfaces()
790 def test_SRv6_End_DX6(self):
791 """ Test SRv6 End.DX6 behavior.
793 # send traffic to one destination interface
794 # source and destination interfaces are IPv6 only
795 self.setup_interfaces(ipv6=[True, True])
797 # configure SRv6 localSID End.DX6 behavior
798 localsid = VppSRv6LocalSID(
799 self, localsid_addr='a3::c4',
800 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX6,
801 nh_addr=self.pg1.remote_ip6,
803 sw_if_index=self.pg1.sw_if_index,
806 localsid.add_vpp_config()
808 self.logger.debug(self.vapi.cli("show sr localsid"))
810 # create IPv6 packets with SRH (SL=0)
811 # send one packet per packet size
812 count = len(self.pg_packet_sizes)
813 dst_inner = 'a4::1234' # inner header destination address
816 # packets with SRH, segments-left 0, active segment a3::c4
817 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
819 sidlist=['a3::c4', 'a2::', 'a1::'],
821 # add to traffic stream pg0->pg1
822 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
823 self.pg_packet_sizes, count))
825 # packets without SRH, IPv6 in IPv6
826 # outer IPv6 dest addr is the localsid End.DX6
827 packet_header = self.create_packet_header_IPv6_IPv6(
830 # add to traffic stream pg0->pg1
831 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
832 self.pg_packet_sizes, count))
834 # send packets and verify received packets
835 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
836 self.compare_rx_tx_packet_End_DX6)
838 # log the localsid counters
839 self.logger.info(self.vapi.cli("show sr localsid"))
841 # remove SRv6 localSIDs
842 localsid.remove_vpp_config()
845 self.teardown_interfaces()
847 def test_SRv6_End_DT6(self):
848 """ Test SRv6 End.DT6 behavior.
850 # create three interfaces (1 source, 2 destinations)
851 # all interfaces are IPv6 only
852 # source interface in global FIB (0)
853 # destination interfaces in global and vrf
855 self.setup_interfaces(ipv6=[True, True, True],
856 ipv6_table_id=[0, 0, vrf_1])
858 # configure FIB entries
859 # a4::/64 is reachable
860 # via pg1 in table 0 (global)
861 # and via pg2 in table vrf_1
862 route0 = VppIpRoute(self, "a4::", 64,
863 [VppRoutePath(self.pg1.remote_ip6,
864 self.pg1.sw_if_index,
865 proto=DpoProto.DPO_PROTO_IP6,
869 route0.add_vpp_config()
870 route1 = VppIpRoute(self, "a4::", 64,
871 [VppRoutePath(self.pg2.remote_ip6,
872 self.pg2.sw_if_index,
873 proto=DpoProto.DPO_PROTO_IP6,
877 route1.add_vpp_config()
878 self.logger.debug(self.vapi.cli("show ip6 fib"))
880 # configure SRv6 localSID End.DT6 behavior
882 # fib_table: where the localsid is installed
883 # sw_if_index: in T-variants of localsid this is the vrf table_id
884 localsid = VppSRv6LocalSID(
885 self, localsid_addr='a3::c4',
886 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT6,
892 localsid.add_vpp_config()
894 self.logger.debug(self.vapi.cli("show sr localsid"))
896 # create IPv6 packets with SRH (SL=0)
897 # send one packet per packet size
898 count = len(self.pg_packet_sizes)
899 dst_inner = 'a4::1234' # inner header destination address
902 # packets with SRH, segments-left 0, active segment a3::c4
903 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
905 sidlist=['a3::c4', 'a2::', 'a1::'],
907 # add to traffic stream pg0->pg2
908 pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
909 self.pg_packet_sizes, count))
911 # packets without SRH, IPv6 in IPv6
912 # outer IPv6 dest addr is the localsid End.DT6
913 packet_header = self.create_packet_header_IPv6_IPv6(
916 # add to traffic stream pg0->pg2
917 pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
918 self.pg_packet_sizes, count))
920 # send packets and verify received packets
921 # using same comparison function as End.DX6
922 self.send_and_verify_pkts(self.pg0, pkts, self.pg2,
923 self.compare_rx_tx_packet_End_DX6)
925 # assert nothing was received on the other interface (pg1)
926 self.pg1.assert_nothing_captured("mis-directed packet(s)")
928 # log the localsid counters
929 self.logger.info(self.vapi.cli("show sr localsid"))
931 # remove SRv6 localSIDs
932 localsid.remove_vpp_config()
938 self.teardown_interfaces()
940 def test_SRv6_End_DX4(self):
941 """ Test SRv6 End.DX4 behavior.
943 # send traffic to one destination interface
944 # source interface is IPv6 only
945 # destination interface is IPv4 only
946 self.setup_interfaces(ipv6=[True, False], ipv4=[False, True])
948 # configure SRv6 localSID End.DX4 behavior
949 localsid = VppSRv6LocalSID(
950 self, localsid_addr='a3::c4',
951 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX4,
952 nh_addr=self.pg1.remote_ip4,
954 sw_if_index=self.pg1.sw_if_index,
957 localsid.add_vpp_config()
959 self.logger.debug(self.vapi.cli("show sr localsid"))
961 # send one packet per packet size
962 count = len(self.pg_packet_sizes)
963 dst_inner = '4.1.1.123' # inner header destination address
966 # packets with SRH, segments-left 0, active segment a3::c4
967 packet_header = self.create_packet_header_IPv6_SRH_IPv4(
969 sidlist=['a3::c4', 'a2::', 'a1::'],
971 # add to traffic stream pg0->pg1
972 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
973 self.pg_packet_sizes, count))
975 # packets without SRH, IPv4 in IPv6
976 # outer IPv6 dest addr is the localsid End.DX4
977 packet_header = self.create_packet_header_IPv6_IPv4(
980 # add to traffic stream pg0->pg1
981 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
982 self.pg_packet_sizes, count))
984 # send packets and verify received packets
985 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
986 self.compare_rx_tx_packet_End_DX4)
988 # log the localsid counters
989 self.logger.info(self.vapi.cli("show sr localsid"))
991 # remove SRv6 localSIDs
992 localsid.remove_vpp_config()
995 self.teardown_interfaces()
997 def test_SRv6_End_DT4(self):
998 """ Test SRv6 End.DT4 behavior.
1000 # create three interfaces (1 source, 2 destinations)
1001 # source interface is IPv6-only
1002 # destination interfaces are IPv4 only
1003 # source interface in global FIB (0)
1004 # destination interfaces in global and vrf
1006 self.setup_interfaces(ipv6=[True, False, False],
1007 ipv4=[False, True, True],
1008 ipv6_table_id=[0, 0, 0],
1009 ipv4_table_id=[0, 0, vrf_1])
1011 # configure FIB entries
1012 # 4.1.1.0/24 is reachable
1013 # via pg1 in table 0 (global)
1014 # and via pg2 in table vrf_1
1015 route0 = VppIpRoute(self, "4.1.1.0", 24,
1016 [VppRoutePath(self.pg1.remote_ip4,
1017 self.pg1.sw_if_index,
1021 route0.add_vpp_config()
1022 route1 = VppIpRoute(self, "4.1.1.0", 24,
1023 [VppRoutePath(self.pg2.remote_ip4,
1024 self.pg2.sw_if_index,
1025 nh_table_id=vrf_1)],
1028 route1.add_vpp_config()
1029 self.logger.debug(self.vapi.cli("show ip fib"))
1031 # configure SRv6 localSID End.DT4 behavior
1033 # fib_table: where the localsid is installed
1034 # sw_if_index: in T-variants of localsid: vrf table_id
1035 localsid = VppSRv6LocalSID(
1036 self, localsid_addr='a3::c4',
1037 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT4,
1043 localsid.add_vpp_config()
1045 self.logger.debug(self.vapi.cli("show sr localsid"))
1047 # create IPv6 packets with SRH (SL=0)
1048 # send one packet per packet size
1049 count = len(self.pg_packet_sizes)
1050 dst_inner = '4.1.1.123' # inner header destination address
1053 # packets with SRH, segments-left 0, active segment a3::c4
1054 packet_header = self.create_packet_header_IPv6_SRH_IPv4(
1056 sidlist=['a3::c4', 'a2::', 'a1::'],
1058 # add to traffic stream pg0->pg2
1059 pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
1060 self.pg_packet_sizes, count))
1062 # packets without SRH, IPv4 in IPv6
1063 # outer IPv6 dest addr is the localsid End.DT4
1064 packet_header = self.create_packet_header_IPv6_IPv4(
1067 # add to traffic stream pg0->pg2
1068 pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
1069 self.pg_packet_sizes, count))
1071 # send packets and verify received packets
1072 # using same comparison function as End.DX4
1073 self.send_and_verify_pkts(self.pg0, pkts, self.pg2,
1074 self.compare_rx_tx_packet_End_DX4)
1076 # assert nothing was received on the other interface (pg1)
1077 self.pg1.assert_nothing_captured("mis-directed packet(s)")
1079 # log the localsid counters
1080 self.logger.info(self.vapi.cli("show sr localsid"))
1082 # remove SRv6 localSIDs
1083 localsid.remove_vpp_config()
1085 # remove FIB entries
1088 # cleanup interfaces
1089 self.teardown_interfaces()
1091 def test_SRv6_End_DX2(self):
1092 """ Test SRv6 End.DX2 behavior.
1094 # send traffic to one destination interface
1095 # source interface is IPv6 only
1096 self.setup_interfaces(ipv6=[True, False], ipv4=[False, False])
1098 # configure SRv6 localSID End.DX2 behavior
1099 localsid = VppSRv6LocalSID(
1100 self, localsid_addr='a3::c4',
1101 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX2,
1104 sw_if_index=self.pg1.sw_if_index,
1107 localsid.add_vpp_config()
1109 self.logger.debug(self.vapi.cli("show sr localsid"))
1111 # send one packet per packet size
1112 count = len(self.pg_packet_sizes)
1115 # packets with SRH, segments-left 0, active segment a3::c4
1116 # L2 has no dot1q header
1117 packet_header = self.create_packet_header_IPv6_SRH_L2(
1118 sidlist=['a3::c4', 'a2::', 'a1::'],
1121 # add to traffic stream pg0->pg1
1122 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1123 self.pg_packet_sizes, count))
1125 # packets with SRH, segments-left 0, active segment a3::c4
1126 # L2 has dot1q header
1127 packet_header = self.create_packet_header_IPv6_SRH_L2(
1128 sidlist=['a3::c4', 'a2::', 'a1::'],
1131 # add to traffic stream pg0->pg1
1132 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1133 self.pg_packet_sizes, count))
1135 # packets without SRH, L2 in IPv6
1136 # outer IPv6 dest addr is the localsid End.DX2
1137 # L2 has no dot1q header
1138 packet_header = self.create_packet_header_IPv6_L2(
1141 # add to traffic stream pg0->pg1
1142 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1143 self.pg_packet_sizes, count))
1145 # packets without SRH, L2 in IPv6
1146 # outer IPv6 dest addr is the localsid End.DX2
1147 # L2 has dot1q header
1148 packet_header = self.create_packet_header_IPv6_L2(
1151 # add to traffic stream pg0->pg1
1152 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1153 self.pg_packet_sizes, count))
1155 # send packets and verify received packets
1156 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
1157 self.compare_rx_tx_packet_End_DX2)
1159 # log the localsid counters
1160 self.logger.info(self.vapi.cli("show sr localsid"))
1162 # remove SRv6 localSIDs
1163 localsid.remove_vpp_config()
1165 # cleanup interfaces
1166 self.teardown_interfaces()
def compare_rx_tx_packet_T_Encaps(self, tx_pkt, rx_pkt):
    """ Verify a received packet against its original after T.Encaps

    :param tx_pkt: transmitted packet
    :param rx_pkt: received packet
    """
    # Expected header transformation, for an SR Policy with
    # seglist (S3, S2, S1) and source C:
    #   IPv6:
    #     in:  IPv6(A, B2)
    #     out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(A, B2)
    #   IPv6 + SRH:
    #     in:  IPv6(A, B2)SRH(B3, B2, B1; SL=1)
    #     out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)
    #          IPv6(A, B2)SRH(B3, B2, B1; SL=1)

    # outermost IPv6 header on each side
    outer_ip = rx_pkt.getlayer(IPv6)
    sent_ip = tx_pkt.getlayer(IPv6)

    # the SRH lists the policy segments in reverse order
    expected_sids = self.sr_policy.segments[::-1]
    policy_src = self.sr_policy.source

    # the encapsulation must have added an SRH
    self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
    srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)

    # outer header: src is the policy source,
    # dst is the last entry of the expected SRH address list
    self.assertEqual(outer_ip.src, policy_src)
    self.assertEqual(outer_ip.dst, expected_sids[-1])
    # SRH: complete segment list, segleft pointing at its last entry
    self.assertEqual(srh.addresses, expected_sids)
    self.assertEqual(srh.segleft, len(expected_sids)-1)
    self.assertEqual(srh.segleft, srh.lastentry)

    # everything behind the SRH must equal the transmitted packet,
    # apart from the hop limit, which is expected to be one lower
    # -> bring the transmitted header in line before comparing
    sent_ip.hlim -= 1
    self.assertEqual(srh.payload, sent_ip)

    self.logger.debug("packet verification: SUCCESS")
def compare_rx_tx_packet_T_Encaps_IPv4(self, tx_pkt, rx_pkt):
    """ Verify a received packet against its original after T.Encaps
    of an IPv4 packet

    :param tx_pkt: transmitted packet
    :param rx_pkt: received packet
    """
    # Expected header transformation, for an SR Policy with
    # seglist (S3, S2, S1) and source C:
    #   IPv4:
    #     in:  IPv4(A, B2)
    #     out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv4(A, B2)

    # outer IPv6 header of the received packet, IPv4 header as sent
    outer_ip = rx_pkt.getlayer(IPv6)
    sent_ip = tx_pkt.getlayer(IP)

    # the SRH lists the policy segments in reverse order
    expected_sids = self.sr_policy.segments[::-1]
    policy_src = self.sr_policy.source

    # the received packet must carry an SRH and an IPv4 header
    self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
    self.assertTrue(outer_ip.payload.haslayer(IP))
    srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)

    # outer header: src is the policy source,
    # dst is the last entry of the expected SRH address list
    self.assertEqual(outer_ip.src, policy_src)
    self.assertEqual(outer_ip.dst, expected_sids[-1])
    # SRH: complete segment list, segleft pointing at its last entry
    self.assertEqual(srh.addresses, expected_sids)
    self.assertEqual(srh.segleft, len(expected_sids)-1)
    self.assertEqual(srh.segleft, srh.lastentry)

    # everything behind the SRH must equal the transmitted packet,
    # apart from the ttl (expected one lower) and the ip checksum
    sent_ip.ttl -= 1
    # clear the checksum so that scapy recomputes it
    sent_ip.chksum = None
    # dump and re-dissect the header (with str())
    # to force the computed fields to be filled in
    sent_ip = IP(str(sent_ip))

    self.assertEqual(srh.payload, sent_ip)

    self.logger.debug("packet verification: SUCCESS")
def compare_rx_tx_packet_T_Encaps_L2(self, tx_pkt, rx_pkt):
    """ Verify a received packet against its original after T.Encaps
    of an L2 frame

    :param tx_pkt: transmitted packet
    :param rx_pkt: received packet
    """
    # Expected header transformation, for an SR Policy with
    # seglist (S3, S2, S1) and source C:
    #   L2:
    #     in:  L2
    #     out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)L2

    # outer IPv6 header of the received packet, L2 frame as sent
    outer_ip = rx_pkt.getlayer(IPv6)
    sent_frame = tx_pkt.getlayer(Ether)

    # the SRH lists the policy segments in reverse order
    expected_sids = self.sr_policy.segments[::-1]
    policy_src = self.sr_policy.source

    # the encapsulation must have added an SRH
    self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
    srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)

    # outer header: src is the policy source,
    # dst is the last entry of the expected SRH address list
    self.assertEqual(outer_ip.src, policy_src)
    self.assertEqual(outer_ip.dst, expected_sids[-1])
    # SRH: complete segment list, segleft pointing at its last entry
    self.assertEqual(srh.addresses, expected_sids)
    self.assertEqual(srh.segleft, len(expected_sids)-1)
    self.assertEqual(srh.segleft, srh.lastentry)
    # next header must be "No Next Header" (59)
    self.assertEqual(srh.nh, 59)

    # the SRH payload, dissected as Ethernet,
    # must equal the transmitted frame
    carried_frame = Ether(str(srh.payload))
    self.assertEqual(carried_frame, sent_frame)

    self.logger.debug("packet verification: SUCCESS")
def compare_rx_tx_packet_T_Insert(self, tx_pkt, rx_pkt):
    """ Compare input and output packet after passing T.Insert

    :param tx_pkt: transmitted packet (with or without SRH)
    :param rx_pkt: received packet
    """
    # T.Insert updates the headers as follows:
    # IPv6:
    #   in:  IPv6(A, B2)
    #   out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)
    # IPv6 + SRH:
    #   in:  IPv6(A, B2)SRH(B3, B2, B1; SL=1)
    #   out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)SRH(B3, B2, B1; SL=1)

    # get first (outer) IPv6 header and UDP layer of rx'ed packet
    rx_ip = rx_pkt.getlayer(IPv6)
    rx_udp = rx_pkt[UDP]

    tx_ip = tx_pkt.getlayer(IPv6)
    # some packets have been tx'ed with an SRH, some without it
    # get SRH if tx'ed packet has it
    tx_srh = None
    if tx_pkt.haslayer(IPv6ExtHdrSegmentRouting):
        tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
    tx_udp = tx_pkt[UDP]

    # expected segment-list (make copy of SR Policy segment list)
    seglist = self.sr_policy.segments[:]
    # expected seglist has initial dest addr as last segment
    seglist.append(tx_ip.dst)
    # reverse list to get order as in SRH
    tx_seglist = seglist[::-1]

    # checks common to cases tx with and without SRH
    # rx'ed packet should have SRH and only one IPv6 header
    self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
    self.assertFalse(rx_ip.payload.haslayer(IPv6))
    # get SRH
    rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)

    # rx'ed ip.src should be equal to tx'ed ip.src
    self.assertEqual(rx_ip.src, tx_ip.src)
    # rx'ed ip.dst should be equal to sidlist[lastentry]
    self.assertEqual(rx_ip.dst, tx_seglist[-1])

    # rx'ed seglist should be equal to expected seglist
    self.assertEqual(rx_srh.addresses, tx_seglist)
    # segleft should be equal to size(expected seglist)-1
    self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
    # segleft should be equal to lastentry
    self.assertEqual(rx_srh.segleft, rx_srh.lastentry)

    # test for presence explicitly instead of relying on
    # the truth value of a packet object
    if tx_srh is not None:  # packet was tx'ed with SRH
        # packet should have 2nd SRH
        self.assertTrue(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
        # get 2nd SRH
        rx_srh2 = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting, 2)

        # rx'ed srh2.addresses should be equal to tx'ed srh.addresses
        self.assertEqual(rx_srh2.addresses, tx_srh.addresses)
        # rx'ed srh2.segleft should be equal to tx'ed srh.segleft
        self.assertEqual(rx_srh2.segleft, tx_srh.segleft)
        # rx'ed srh2.lastentry should be equal to tx'ed srh.lastentry
        self.assertEqual(rx_srh2.lastentry, tx_srh.lastentry)

    else:  # packet was tx'ed without SRH
        # rx packet should have no other SRH
        self.assertFalse(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))

    # UDP layer should be unchanged
    self.assertEqual(rx_udp, tx_udp)

    self.logger.debug("packet verification: SUCCESS")
def compare_rx_tx_packet_End(self, tx_pkt, rx_pkt):
    """ Verify a received packet against its original after End
    (without PSP)

    :param tx_pkt: transmitted packet
    :param rx_pkt: received packet
    """
    # Expected header transformation for End (no PSP):
    #   IPv6 + SRH:
    #     in:  IPv6(A, S1)SRH(S3, S2, S1; SL=2)
    #     out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)

    # outer IPv6 header and UDP layer of the received packet
    outer_ip = rx_pkt.getlayer(IPv6)
    udp_after = rx_pkt[UDP]

    # the transmitted packet is known to carry
    # an SRH and an inner IPv6 header
    sent_outer = tx_pkt.getlayer(IPv6)
    sent_inner = tx_pkt.getlayer(IPv6, 2)
    sent_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
    udp_before = tx_pkt[UDP]

    # whatever the tx segleft was,
    # the inner IPv6 header must still be present
    self.assertTrue(outer_ip.payload.haslayer(IPv6))
    inner_ip = rx_pkt.getlayer(IPv6, 2)

    # NOTE(review): segleft is read through the IPv6 layer object;
    # presumably scapy resolves it from the SRH underneath - confirm
    if sent_outer.segleft > 0:
        # End without PSP leaves the SRH in place while segleft>0
        self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
        srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)

        # source address is untouched
        self.assertEqual(outer_ip.src, sent_outer.src)
        # segment list is untouched
        self.assertEqual(srh.addresses, sent_srh.addresses)
        # segleft went down by one ...
        self.assertEqual(srh.segleft, sent_srh.segleft-1)
        # ... and the destination is the segment it now points at
        self.assertEqual(outer_ip.dst, srh.addresses[srh.segleft])
        # lastentry is untouched
        self.assertEqual(srh.lastentry, sent_srh.lastentry)
        # inner IPv6 addresses are untouched
        self.assertEqual(inner_ip.src, sent_inner.src)
        self.assertEqual(inner_ip.dst, sent_inner.dst)
    # else:  # tx_ip.segleft == 0
        # TODO: Does this work with 2 SRHs in ingress packet?

    # UDP layer should be unchanged
    self.assertEqual(udp_after, udp_before)

    self.logger.debug("packet verification: SUCCESS")
def compare_rx_tx_packet_End_PSP(self, tx_pkt, rx_pkt):
    """ Verify a received packet against its original after End with PSP

    :param tx_pkt: transmitted packet
    :param rx_pkt: received packet
    """
    # Expected header transformation for End (PSP):
    #   IPv6 + SRH (SL>1):
    #     in:  IPv6(A, S1)SRH(S3, S2, S1; SL=2)
    #     out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
    #   IPv6 + SRH (SL=1):
    #     in:  IPv6(A, S2)SRH(S3, S2, S1; SL=1)
    #     out: IPv6(A, S3)

    # outer IPv6 header and UDP layer of the received packet
    outer_ip = rx_pkt.getlayer(IPv6)
    udp_after = rx_pkt[UDP]

    # the transmitted packet is known to carry
    # an SRH and an inner IPv6 header
    sent_outer = tx_pkt.getlayer(IPv6)
    sent_inner = tx_pkt.getlayer(IPv6, 2)
    sent_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
    udp_before = tx_pkt[UDP]

    # whatever the tx segleft was, the inner IPv6 header
    # must still be present with unchanged addresses
    self.assertTrue(outer_ip.payload.haslayer(IPv6))
    inner_ip = rx_pkt.getlayer(IPv6, 2)
    self.assertEqual(inner_ip.src, sent_inner.src)
    self.assertEqual(inner_ip.dst, sent_inner.dst)

    # NOTE(review): segleft is read through the IPv6 layer object;
    # presumably scapy resolves it from the SRH underneath - confirm
    if sent_outer.segleft > 1:
        # End with PSP leaves the SRH in place while segleft>1
        self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
        srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)

        # source address is untouched
        self.assertEqual(outer_ip.src, sent_outer.src)
        # segment list is untouched
        self.assertEqual(srh.addresses, sent_srh.addresses)
        # segleft went down by one ...
        self.assertEqual(srh.segleft, sent_srh.segleft-1)
        # ... and the destination is the segment it now points at
        self.assertEqual(outer_ip.dst, srh.addresses[srh.segleft])
        # lastentry is untouched
        self.assertEqual(srh.lastentry, sent_srh.lastentry)

    else:  # tx_ip.segleft <= 1
        # End with PSP and segleft=1 pops the SRH;
        # both IPv6 headers remain
        self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
        # source address is untouched
        self.assertEqual(outer_ip.src, sent_outer.src)
        # destination is the tx'ed sidlist[segleft-1]
        self.assertEqual(outer_ip.dst,
                         sent_srh.addresses[sent_srh.segleft-1])

    # UDP layer should be unchanged
    self.assertEqual(udp_after, udp_before)

    self.logger.debug("packet verification: SUCCESS")
def compare_rx_tx_packet_End_DX6(self, tx_pkt, rx_pkt):
    """ Compare input and output packet after passing End.DX6

    :param tx_pkt: transmitted packet
    :param rx_pkt: received packet
    """
    # End.DX6 updates the headers as follows:
    # IPv6 + SRH (SL=0):
    #   in:  IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv6(B, D)
    #   out: IPv6(B, D)
    # IPv6:
    #   in:  IPv6(A, S3)IPv6(B, D)
    #   out: IPv6(B, D)

    # get first (outer) IPv6 header of rx'ed packet
    rx_ip = rx_pkt.getlayer(IPv6)

    # get second (inner) IPv6 header of tx'ed packet;
    # the tx'ed outer header is not needed for the comparison
    tx_ip2 = tx_pkt.getlayer(IPv6, 2)

    # verify if rx'ed packet has no SRH
    self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))

    # the whole rx_ip pkt should be equal to tx_ip2
    # except for the hlim field
    #   -> adjust tx'ed hlim to expected hlim
    tx_ip2.hlim = tx_ip2.hlim - 1

    self.assertEqual(rx_ip, tx_ip2)

    self.logger.debug("packet verification: SUCCESS")
def compare_rx_tx_packet_End_DX4(self, tx_pkt, rx_pkt):
    """ Compare input and output packet after passing End.DX4

    :param tx_pkt: transmitted packet
    :param rx_pkt: received packet
    """
    # End.DX4 updates the headers as follows:
    # IPv6 + SRH (SL=0):
    #   in:  IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv4(B, D)
    #   out: IPv4(B, D)
    # IPv6:
    #   in:  IPv6(A, S3)IPv4(B, D)
    #   out: IPv4(B, D)

    # get IPv4 header of rx'ed packet
    rx_ip = rx_pkt.getlayer(IP)

    # get (inner) IPv4 header of tx'ed packet;
    # the tx'ed outer IPv6 header is not needed for the comparison
    tx_ip2 = tx_pkt.getlayer(IP)

    # verify if rx'ed packet has no SRH
    self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))

    # the whole rx_ip pkt should be equal to tx_ip2
    # except for the ttl field and ip checksum
    #   -> adjust tx'ed ttl to expected ttl
    tx_ip2.ttl = tx_ip2.ttl - 1
    #   -> set tx'ed ip checksum to None and let scapy recompute
    tx_ip2.chksum = None
    # read back the pkt (with str()) to force computing these fields
    # probably other ways to accomplish this are possible
    tx_ip2 = IP(str(tx_ip2))

    self.assertEqual(rx_ip, tx_ip2)

    self.logger.debug("packet verification: SUCCESS")
def compare_rx_tx_packet_End_DX2(self, tx_pkt, rx_pkt):
    """ Compare input and output packet after passing End.DX2

    :param tx_pkt: transmitted packet
    :param rx_pkt: received packet
    """
    # End.DX2 updates the headers as follows:
    # IPv6 + SRH (SL=0):
    #   in:  IPv6(A, S3)SRH(S3, S2, S1; SL=0)L2
    #   out: L2
    # IPv6:
    #   in:  IPv6(A, S3)L2
    #   out: L2

    # get Ethernet header of rx'ed packet
    rx_eth = rx_pkt.getlayer(Ether)

    # we can't just get the 2nd Ether layer of the tx'ed packet
    # get the Raw content and dissect it as Ether
    tx_eth1 = Ether(str(tx_pkt[Raw]))

    # verify if rx'ed packet has no SRH
    self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))

    # the whole rx_eth pkt should be equal to tx_eth1
    self.assertEqual(rx_eth, tx_eth1)

    self.logger.debug("packet verification: SUCCESS")
def create_stream(self, src_if, dst_if, packet_header, packet_sizes,
                  count):
    """Build the SRv6 input packet stream for an interface.

    :param VppInterface src_if: Interface to create packet stream for
    :param VppInterface dst_if: destination interface of packet stream
    :param packet_header: Layer3 scapy packet headers,
            L2 is added when not provided,
            Raw(payload) with packet_info is added
    :param list packet_sizes: packet sizes, applied to the packets
            of the stream in sequence
    :param int count: number of packets in packet stream
    :return: list of packets
    """
    self.logger.info("Creating packets")
    stream = []
    # NOTE(review): this loop runs count-1 times, so one packet fewer
    # than 'count' is produced - confirm whether that is intended
    for seq in range(count - 1):
        info = self.create_packet_info(src_if, dst_if)
        self.logger.debug(
            "Creating packet with index %d" % (info.index))
        payload = self.info_to_payload(info)
        # prepend an L2 header unless packet_header already has one
        if packet_header.getlayer(0).name != 'Ethernet':
            frame = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
                     packet_header /
                     Raw(payload))
        else:
            frame = (packet_header /
                     Raw(payload))
        size = packet_sizes[seq % len(packet_sizes)]
        self.logger.debug("Packet size %d" % (size))
        self.extend_packet(frame, size)
        # the stored packet needs its automatic fields computed:
        # dump it (with str()) and dissect it again to force that
        frame = Ether(str(frame))
        info.data = frame.copy()
        self.logger.debug(ppp("Created packet:", frame))
        stream.append(frame)
    self.logger.info("Done creating packets")
    return stream
def send_and_verify_pkts(self, input, pkts, output, compare_func):
    """Transmit a packet stream and check what the DUT sends out

    :param input: ingress interface of DUT
    :param pkts: list of packets to transmit
    :param output: egress interface of DUT
    :param compare_func: function to compare in and out packets
    """
    # queue the stream on the ingress interface
    # and capture on every interface
    input.add_stream(pkts)
    self.pg_enable_capture(self.pg_interfaces)

    # start traffic
    self.logger.info("Starting traffic")
    self.pg_start()

    # collect what arrived on the egress interface
    self.logger.info("Getting packet capture")
    received = output.get_capture()

    # nothing may show up on the ingress interface
    input.assert_nothing_captured()

    # compare every captured packet with its transmitted original
    self.verify_captured_pkts(output, received, compare_func)
def create_packet_header_IPv6(self, dst):
    """Build a packet header: IPv6 header, UDP header

    :param dst: IPv6 destination address

    The IPv6 source address is 1234::1,
    UDP source and destination port are both 1234
    """
    ip6 = IPv6(src='1234::1', dst=dst)
    udp = UDP(sport=1234, dport=1234)
    return ip6 / udp
def create_packet_header_IPv6_SRH(self, sidlist, segleft):
    """Create packet header: IPv6 header with SRH, UDP header

    :param list sidlist: segment list
    :param int segleft: segments-left field value

    IPv6 destination address is set to sidlist[segleft]
    IPv6 source address is 1234::1
    UDP source port and destination port are 1234
    """

    # pass segleft on to the SRH so that the segments-left field
    # agrees with the destination address chosen from the sidlist
    # (it used to be left to scapy's default)
    p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
         IPv6ExtHdrSegmentRouting(addresses=sidlist,
                                  segleft=segleft) /
         UDP(sport=1234, dport=1234))
    return p
def create_packet_header_IPv6_SRH_IPv6(self, dst, sidlist, segleft):
    """Build a packet header for IPv6 encapsulated in SRv6:
    IPv6 header with SRH, IPv6 header, UDP header

    :param ipv6address dst: inner IPv6 destination address
    :param list sidlist: segment list of outer IPv6 SRH
    :param int segleft: segments-left field of outer IPv6 SRH

    The outer IPv6 destination address is sidlist[segleft],
    the IPv6 source addresses are 1234::1 (outer) and 4321::1 (inner),
    UDP source and destination port are both 1234
    """
    outer = IPv6(src='1234::1', dst=sidlist[segleft])
    # nh=41: the SRH is followed by an IPv6 header
    srh = IPv6ExtHdrSegmentRouting(addresses=sidlist,
                                   segleft=segleft, nh=41)
    inner = IPv6(src='4321::1', dst=dst)
    udp = UDP(sport=1234, dport=1234)
    return outer / srh / inner / udp
def create_packet_header_IPv6_IPv6(self, dst_inner, dst_outer):
    """Build a packet header for IPv6 encapsulated in IPv6:
    IPv6 header, IPv6 header, UDP header

    :param ipv6address dst_inner: inner IPv6 destination address
    :param ipv6address dst_outer: outer IPv6 destination address

    The IPv6 source addresses are 1234::1 (outer) and 4321::1 (inner),
    UDP source and destination port are both 1234
    """
    outer = IPv6(src='1234::1', dst=dst_outer)
    inner = IPv6(src='4321::1', dst=dst_inner)
    udp = UDP(sport=1234, dport=1234)
    return outer / inner / udp
def create_packet_header_IPv6_SRH_SRH_IPv6(self, dst, sidlist1, segleft1,
                                           sidlist2, segleft2):
    """Build a packet header for IPv6 encapsulated in SRv6 with 2 SRH:
    IPv6 header with SRH, 2nd SRH, IPv6 header, UDP header

    :param ipv6address dst: inner IPv6 destination address
    :param list sidlist1: segment list of outer IPv6 SRH
    :param int segleft1: segments-left field of outer IPv6 SRH
    :param list sidlist2: segment list of inner IPv6 SRH
    :param int segleft2: segments-left field of inner IPv6 SRH

    The outer IPv6 destination address is sidlist1[segleft1],
    the IPv6 source addresses are 1234::1 (outer) and 4321::1 (inner),
    UDP source and destination port are both 1234
    """
    outer = IPv6(src='1234::1', dst=sidlist1[segleft1])
    # nh=43: the first SRH is followed by another routing header
    first_srh = IPv6ExtHdrSegmentRouting(addresses=sidlist1,
                                         segleft=segleft1, nh=43)
    # nh=41: the second SRH is followed by an IPv6 header
    second_srh = IPv6ExtHdrSegmentRouting(addresses=sidlist2,
                                          segleft=segleft2, nh=41)
    inner = IPv6(src='4321::1', dst=dst)
    udp = UDP(sport=1234, dport=1234)
    return outer / first_srh / second_srh / inner / udp
def create_packet_header_IPv4(self, dst):
    """Build a packet header: IPv4 header, UDP header

    :param dst: IPv4 destination address

    The IPv4 source address is 123.1.1.1,
    UDP source and destination port are both 1234
    """
    ip4 = IP(src='123.1.1.1', dst=dst)
    udp = UDP(sport=1234, dport=1234)
    return ip4 / udp
def create_packet_header_IPv6_IPv4(self, dst_inner, dst_outer):
    """Build a packet header for IPv4 encapsulated in IPv6:
    IPv6 header, IPv4 header, UDP header

    :param ipv4address dst_inner: inner IPv4 destination address
    :param ipv6address dst_outer: outer IPv6 destination address

    The IPv6 source address is 1234::1,
    the IPv4 source address is 123.1.1.1,
    UDP source and destination port are both 1234
    """
    outer = IPv6(src='1234::1', dst=dst_outer)
    inner = IP(src='123.1.1.1', dst=dst_inner)
    udp = UDP(sport=1234, dport=1234)
    return outer / inner / udp
def create_packet_header_IPv6_SRH_IPv4(self, dst, sidlist, segleft):
    """Build a packet header for IPv4 encapsulated in SRv6:
    IPv6 header with SRH, IPv4 header, UDP header

    :param ipv4address dst: inner IPv4 destination address
    :param list sidlist: segment list of outer IPv6 SRH
    :param int segleft: segments-left field of outer IPv6 SRH

    The outer IPv6 destination address is sidlist[segleft],
    the IPv6 source address is 1234::1,
    the IPv4 source address is 123.1.1.1,
    UDP source and destination port are both 1234
    """
    outer = IPv6(src='1234::1', dst=sidlist[segleft])
    # nh=4: the SRH is followed by an IPv4 header
    srh = IPv6ExtHdrSegmentRouting(addresses=sidlist,
                                   segleft=segleft, nh=4)
    inner = IP(src='123.1.1.1', dst=dst)
    udp = UDP(sport=1234, dport=1234)
    return outer / srh / inner / udp
def create_packet_header_L2(self, vlan=0):
    """Build a packet header: L2 header

    :param vlan: if vlan!=0 then add 802.1q header
    """
    # Note: the dst addr ('00:55:44:33:22:11') is used in
    # the compare function compare_rx_tx_packet_T_Encaps_L2
    # to detect presence of L2 in SRH payload
    frame = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
    ipx_type = 0x8137  # IPX
    if not vlan:
        # untagged: the ethertype goes into the Ethernet header itself
        frame.type = ipx_type
        return frame
    # tagged: the 802.1q layer carries the ethertype
    frame /= Dot1Q(vlan=vlan, type=ipx_type)
    return frame
def create_packet_header_IPv6_SRH_L2(self, sidlist, segleft, vlan=0):
    """Create packet header: L2 encapsulated in SRv6:
    IPv6 header with SRH, L2

    :param list sidlist: segment list of outer IPv6 SRH
    :param int segleft: segments-left field of outer IPv6 SRH
    :param vlan: L2 vlan; if vlan!=0 then add 802.1q header

    Outer IPv6 destination address is set to sidlist[segleft]
    IPv6 source address is 1234::1
    """
    # build the encapsulated frame with the shared L2 helper
    # rather than repeating its Ether/Dot1Q construction here
    eth = self.create_packet_header_L2(vlan)

    # nh=59 ("No Next Header"): the L2 frame follows the SRH directly
    p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
         IPv6ExtHdrSegmentRouting(addresses=sidlist,
                                  segleft=segleft, nh=59) /
         eth)
    return p
def create_packet_header_IPv6_L2(self, dst_outer, vlan=0):
    """Create packet header: L2 encapsulated in IPv6:
    IPv6 header, L2

    :param ipv6address dst_outer: outer IPv6 destination address
    :param vlan: L2 vlan; if vlan!=0 then add 802.1q header

    IPv6 source address is 1234::1
    """
    # build the encapsulated frame with the shared L2 helper
    # rather than repeating its Ether/Dot1Q construction here
    eth = self.create_packet_header_L2(vlan)

    # nh=59 ("No Next Header"): the L2 frame follows the IPv6 header
    p = (IPv6(src='1234::1', dst=dst_outer, nh=59) / eth)
    return p
def get_payload_info(self, packet):
    """ Extract the payload_info from the packet

    :param packet: packet whose Raw payload carries the payload_info
    :returns: the payload_info decoded by payload_to_info()
    """
    # in most cases, payload_info is in packet[Raw]
    # but packet[Raw] gives the complete payload
    # (incl L2 header) for the T.Encaps L2 case
    try:
        payload_info = self.payload_to_info(str(packet[Raw]))

    # catch Exception rather than everything, so that
    # KeyboardInterrupt/SystemExit are not turned into a retry
    except Exception:
        # remove L2 header from packet[Raw]:
        # take packet[Raw], convert it to an Ether layer
        # and then extract Raw from it
        payload_info = self.payload_to_info(
            str(Ether(str(packet[Raw]))[Raw]))

    return payload_info
def verify_captured_pkts(self, dst_if, capture, compare_func):
    """
    Verify captured packet stream for specified interface.
    Compare ingress with egress packets using the specified compare fn

    :param dst_if: egress interface of DUT
    :param capture: captured packets
    :param compare_func: function to compare in and out packet,
            called as compare_func(txed_packet, rxed_packet)
    """
    # __name__ is available on Python 2 and 3 (func_name is not)
    self.logger.info("Verifying capture on interface %s using function %s"
                     % (dst_if.name, compare_func.__name__))

    # per source interface: the last packet_info matched so far
    last_info = dict()
    for i in self.pg_interfaces:
        last_info[i.sw_if_index] = None
    dst_sw_if_index = dst_if.sw_if_index

    for packet in capture:
        try:
            # extract payload_info from packet's payload
            payload_info = self.get_payload_info(packet)
            packet_index = payload_info.index

            self.logger.debug("Verifying packet with index %d"
                              % (packet_index))
            # packet should have arrived on the expected interface
            self.assertEqual(payload_info.dst, dst_sw_if_index)
            self.logger.debug(
                "Got packet on interface %s: src=%u (idx=%u)" %
                (dst_if.name, payload_info.src, packet_index))

            # search for payload_info with same src and dst if_index
            # this will give us the transmitted packet
            next_info = self.get_next_packet_info_for_interface2(
                payload_info.src, dst_sw_if_index,
                last_info[payload_info.src])
            last_info[payload_info.src] = next_info
            # next_info should not be None
            self.assertTrue(next_info is not None)
            # index of tx and rx packets should be equal
            self.assertEqual(packet_index, next_info.index)
            # data field of next_info contains the tx packet
            txed_packet = next_info.data

            self.logger.debug(ppp("Transmitted packet:",
                                  txed_packet))  # ppp=Pretty Print Packet

            self.logger.debug(ppp("Received packet:", packet))

            # compare rcvd packet with expected packet using compare_func
            compare_func(txed_packet, packet)

        except Exception:
            # show the offending packet, then let the failure propagate
            print(packet.command())
            self.logger.error(ppp("Unexpected or invalid packet:", packet))
            raise

    # have all expected packets arrived?
    for i in self.pg_interfaces:
        remaining_packet = self.get_next_packet_info_for_interface2(
            i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
        self.assertTrue(remaining_packet is None,
                        "Interface %s: Packet expected from interface %s "
                        "didn't arrive" % (dst_if.name, i.name))
# when executed directly, run this module's tests with the VPP test runner
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)