4 from socket import AF_INET6
6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto, VppIpTable
8 from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
9 SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
11 from scapy.packet import Raw
12 from scapy.layers.l2 import Ether, Dot1Q
13 from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
14 from scapy.layers.inet import IP, UDP
16 from scapy.utils import inet_pton, inet_ntop
21 class TestSRv6(VppTestCase):
22 """ SRv6 Test Case """
26 super(TestSRv6, self).setUpClass()
29 """ Perform test setup before each test case.
31 super(TestSRv6, self).setUp()
33 # packet sizes, inclusive L2 overhead
34 self.pg_packet_sizes = [64, 512, 1518, 9018]
37 self.reset_packet_infos()
40 """ Clean up test setup after each test case.
42 self.teardown_interfaces()
44 super(TestSRv6, self).tearDown()
46 def configure_interface(self,
48 ipv6=False, ipv4=False,
49 ipv6_table_id=0, ipv4_table_id=0):
50 """ Configure interface.
51 :param ipv6: configure IPv6 on interface
52 :param ipv4: configure IPv4 on interface
53 :param ipv6_table_id: FIB table_id for IPv6
54 :param ipv4_table_id: FIB table_id for IPv4
56 self.logger.debug("Configuring interface %s" % (interface.name))
58 self.logger.debug("Configuring IPv6")
59 interface.set_table_ip6(ipv6_table_id)
60 interface.config_ip6()
61 interface.resolve_ndp(timeout=5)
63 self.logger.debug("Configuring IPv4")
64 interface.set_table_ip4(ipv4_table_id)
65 interface.config_ip4()
66 interface.resolve_arp()
69 def setup_interfaces(self, ipv6=[], ipv4=[],
70 ipv6_table_id=[], ipv4_table_id=[]):
71 """ Create and configure interfaces.
73 :param ipv6: list of interface IPv6 capabilities
74 :param ipv4: list of interface IPv4 capabilities
75 :param ipv6_table_id: list of intf IPv6 FIB table_ids
76 :param ipv4_table_id: list of intf IPv4 FIB table_ids
77 :returns: List of created interfaces.
79 # how many interfaces?
84 self.logger.debug("Creating and configuring %d interfaces" % (count))
86 # fill up ipv6 and ipv4 lists if needed
87 # not enabled (False) is the default
89 ipv6 += (count - len(ipv6)) * [False]
91 ipv4 += (count - len(ipv4)) * [False]
93 # fill up table_id lists if needed
94 # table_id 0 (global) is the default
95 if len(ipv6_table_id) < count:
96 ipv6_table_id += (count - len(ipv6_table_id)) * [0]
97 if len(ipv4_table_id) < count:
98 ipv4_table_id += (count - len(ipv4_table_id)) * [0]
100 # create 'count' pg interfaces
101 self.create_pg_interfaces(range(count))
105 ids = sorted(set(ipv4_table_id))
106 for i in range(len(ids)):
108 self.tables.append(VppIpTable(self, ids[i]))
109 ids = sorted(set(ipv6_table_id))
110 for i in range(len(ids)):
112 self.tables.append(VppIpTable(self, ids[i], is_ip6=1))
113 for t in self.tables:
116 # setup all interfaces
117 for i in range(count):
118 intf = self.pg_interfaces[i]
119 self.configure_interface(intf,
121 ipv6_table_id[i], ipv4_table_id[i])
124 self.logger.debug(self.vapi.cli("show ip6 neighbors"))
126 self.logger.debug(self.vapi.cli("show ip arp"))
127 self.logger.debug(self.vapi.cli("show interface"))
128 self.logger.debug(self.vapi.cli("show hardware"))
130 return self.pg_interfaces
132 def teardown_interfaces(self):
133 """ Unconfigure and bring down interface.
135 self.logger.debug("Tearing down interfaces")
136 # tear down all interfaces
137 # AFAIK they cannot be deleted
138 for i in self.pg_interfaces:
139 self.logger.debug("Tear down interface %s" % (i.name))
145 def test_SRv6_T_Encaps(self):
146 """ Test SRv6 Transit.Encaps behavior for IPv6.
148 # send traffic to one destination interface
149 # source and destination are IPv6 only
150 self.setup_interfaces(ipv6=[True, True])
152 # configure FIB entries
153 route = VppIpRoute(self, "a4::", 64,
154 [VppRoutePath(self.pg1.remote_ip6,
155 self.pg1.sw_if_index,
156 proto=DpoProto.DPO_PROTO_IP6)],
158 route.add_vpp_config()
160 # configure encaps IPv6 source address
161 # needs to be done before SR Policy config
163 self.vapi.cli("set sr encaps source addr a3::")
166 # configure SRv6 Policy
167 # Note: segment list order: first -> last
168 sr_policy = VppSRv6Policy(
171 sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
172 weight=1, fib_table=0,
173 segments=['a4::', 'a5::', 'a6::c7'],
175 sr_policy.add_vpp_config()
176 self.sr_policy = sr_policy
178 # log the sr policies
179 self.logger.info(self.vapi.cli("show sr policies"))
181 # steer IPv6 traffic to a7::/64 into SRv6 Policy
182 # use the bsid of the above self.sr_policy
183 pol_steering = VppSRv6Steering(
185 bsid=self.sr_policy.bsid,
186 prefix="a7::", mask_width=64,
187 traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
188 sr_policy_index=0, table_id=0,
190 pol_steering.add_vpp_config()
192 # log the sr steering policies
193 self.logger.info(self.vapi.cli("show sr steering policies"))
196 count = len(self.pg_packet_sizes)
197 dst_inner = 'a7::1234'
200 # create IPv6 packets without SRH
201 packet_header = self.create_packet_header_IPv6(dst_inner)
202 # create traffic stream pg0->pg1
203 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
204 self.pg_packet_sizes, count))
206 # create IPv6 packets with SRH
207 # packets with segments-left 1, active segment a7::
208 packet_header = self.create_packet_header_IPv6_SRH(
209 sidlist=['a8::', 'a7::', 'a6::'],
211 # create traffic stream pg0->pg1
212 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
213 self.pg_packet_sizes, count))
215 # create IPv6 packets with SRH and IPv6
216 # packets with segments-left 1, active segment a7::
217 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
219 sidlist=['a8::', 'a7::', 'a6::'],
221 # create traffic stream pg0->pg1
222 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
223 self.pg_packet_sizes, count))
225 # send packets and verify received packets
226 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
227 self.compare_rx_tx_packet_T_Encaps)
229 # log the localsid counters
230 self.logger.info(self.vapi.cli("show sr localsid"))
233 pol_steering.remove_vpp_config()
234 self.logger.info(self.vapi.cli("show sr steering policies"))
237 self.sr_policy.remove_vpp_config()
238 self.logger.info(self.vapi.cli("show sr policies"))
244 self.teardown_interfaces()
246 def test_SRv6_T_Insert(self):
247 """ Test SRv6 Transit.Insert behavior (IPv6 only).
249 # send traffic to one destination interface
250 # source and destination are IPv6 only
251 self.setup_interfaces(ipv6=[True, True])
253 # configure FIB entries
254 route = VppIpRoute(self, "a4::", 64,
255 [VppRoutePath(self.pg1.remote_ip6,
256 self.pg1.sw_if_index,
257 proto=DpoProto.DPO_PROTO_IP6)],
259 route.add_vpp_config()
261 # configure encaps IPv6 source address
262 # needs to be done before SR Policy config
264 self.vapi.cli("set sr encaps source addr a3::")
267 # configure SRv6 Policy
268 # Note: segment list order: first -> last
269 sr_policy = VppSRv6Policy(
272 sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
273 weight=1, fib_table=0,
274 segments=['a4::', 'a5::', 'a6::c7'],
276 sr_policy.add_vpp_config()
277 self.sr_policy = sr_policy
279 # log the sr policies
280 self.logger.info(self.vapi.cli("show sr policies"))
282 # steer IPv6 traffic to a7::/64 into SRv6 Policy
283 # use the bsid of the above self.sr_policy
284 pol_steering = VppSRv6Steering(
286 bsid=self.sr_policy.bsid,
287 prefix="a7::", mask_width=64,
288 traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
289 sr_policy_index=0, table_id=0,
291 pol_steering.add_vpp_config()
293 # log the sr steering policies
294 self.logger.info(self.vapi.cli("show sr steering policies"))
297 count = len(self.pg_packet_sizes)
298 dst_inner = 'a7::1234'
301 # create IPv6 packets without SRH
302 packet_header = self.create_packet_header_IPv6(dst_inner)
303 # create traffic stream pg0->pg1
304 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
305 self.pg_packet_sizes, count))
307 # create IPv6 packets with SRH
308 # packets with segments-left 1, active segment a7::
309 packet_header = self.create_packet_header_IPv6_SRH(
310 sidlist=['a8::', 'a7::', 'a6::'],
312 # create traffic stream pg0->pg1
313 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
314 self.pg_packet_sizes, count))
316 # send packets and verify received packets
317 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
318 self.compare_rx_tx_packet_T_Insert)
320 # log the localsid counters
321 self.logger.info(self.vapi.cli("show sr localsid"))
324 pol_steering.remove_vpp_config()
325 self.logger.info(self.vapi.cli("show sr steering policies"))
328 self.sr_policy.remove_vpp_config()
329 self.logger.info(self.vapi.cli("show sr policies"))
335 self.teardown_interfaces()
337 def test_SRv6_T_Encaps_IPv4(self):
338 """ Test SRv6 Transit.Encaps behavior for IPv4.
340 # send traffic to one destination interface
341 # source interface is IPv4 only
342 # destination interface is IPv6 only
343 self.setup_interfaces(ipv6=[False, True], ipv4=[True, False])
345 # configure FIB entries
346 route = VppIpRoute(self, "a4::", 64,
347 [VppRoutePath(self.pg1.remote_ip6,
348 self.pg1.sw_if_index,
349 proto=DpoProto.DPO_PROTO_IP6)],
351 route.add_vpp_config()
353 # configure encaps IPv6 source address
354 # needs to be done before SR Policy config
356 self.vapi.cli("set sr encaps source addr a3::")
359 # configure SRv6 Policy
360 # Note: segment list order: first -> last
361 sr_policy = VppSRv6Policy(
364 sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
365 weight=1, fib_table=0,
366 segments=['a4::', 'a5::', 'a6::c7'],
368 sr_policy.add_vpp_config()
369 self.sr_policy = sr_policy
371 # log the sr policies
372 self.logger.info(self.vapi.cli("show sr policies"))
374 # steer IPv4 traffic to 7.1.1.0/24 into SRv6 Policy
375 # use the bsid of the above self.sr_policy
376 pol_steering = VppSRv6Steering(
378 bsid=self.sr_policy.bsid,
379 prefix="7.1.1.0", mask_width=24,
380 traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV4,
381 sr_policy_index=0, table_id=0,
383 pol_steering.add_vpp_config()
385 # log the sr steering policies
386 self.logger.info(self.vapi.cli("show sr steering policies"))
389 count = len(self.pg_packet_sizes)
390 dst_inner = '7.1.1.123'
393 # create IPv4 packets
394 packet_header = self.create_packet_header_IPv4(dst_inner)
395 # create traffic stream pg0->pg1
396 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
397 self.pg_packet_sizes, count))
399 # send packets and verify received packets
400 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
401 self.compare_rx_tx_packet_T_Encaps_IPv4)
403 # log the localsid counters
404 self.logger.info(self.vapi.cli("show sr localsid"))
407 pol_steering.remove_vpp_config()
408 self.logger.info(self.vapi.cli("show sr steering policies"))
411 self.sr_policy.remove_vpp_config()
412 self.logger.info(self.vapi.cli("show sr policies"))
418 self.teardown_interfaces()
420 @unittest.skip("VPP crashes after running this test")
421 def test_SRv6_T_Encaps_L2(self):
422 """ Test SRv6 Transit.Encaps behavior for L2.
424 # send traffic to one destination interface
425 # source interface is IPv4 only TODO?
426 # destination interface is IPv6 only
427 self.setup_interfaces(ipv6=[False, True], ipv4=[False, False])
429 # configure FIB entries
430 route = VppIpRoute(self, "a4::", 64,
431 [VppRoutePath(self.pg1.remote_ip6,
432 self.pg1.sw_if_index,
433 proto=DpoProto.DPO_PROTO_IP6)],
435 route.add_vpp_config()
437 # configure encaps IPv6 source address
438 # needs to be done before SR Policy config
440 self.vapi.cli("set sr encaps source addr a3::")
443 # configure SRv6 Policy
444 # Note: segment list order: first -> last
445 sr_policy = VppSRv6Policy(
448 sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
449 weight=1, fib_table=0,
450 segments=['a4::', 'a5::', 'a6::c7'],
452 sr_policy.add_vpp_config()
453 self.sr_policy = sr_policy
455 # log the sr policies
456 self.logger.info(self.vapi.cli("show sr policies"))
458 # steer L2 traffic into SRv6 Policy
459 # use the bsid of the above self.sr_policy
460 pol_steering = VppSRv6Steering(
462 bsid=self.sr_policy.bsid,
463 prefix="::", mask_width=0,
464 traffic_type=SRv6PolicySteeringTypes.SR_STEER_L2,
465 sr_policy_index=0, table_id=0,
466 sw_if_index=self.pg0.sw_if_index)
467 pol_steering.add_vpp_config()
469 # log the sr steering policies
470 self.logger.info(self.vapi.cli("show sr steering policies"))
473 count = len(self.pg_packet_sizes)
476 # create L2 packets without dot1q header
477 packet_header = self.create_packet_header_L2()
478 # create traffic stream pg0->pg1
479 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
480 self.pg_packet_sizes, count))
482 # create L2 packets with dot1q header
483 packet_header = self.create_packet_header_L2(vlan=123)
484 # create traffic stream pg0->pg1
485 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
486 self.pg_packet_sizes, count))
488 # send packets and verify received packets
489 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
490 self.compare_rx_tx_packet_T_Encaps_L2)
492 # log the localsid counters
493 self.logger.info(self.vapi.cli("show sr localsid"))
496 pol_steering.remove_vpp_config()
497 self.logger.info(self.vapi.cli("show sr steering policies"))
500 self.sr_policy.remove_vpp_config()
501 self.logger.info(self.vapi.cli("show sr policies"))
507 self.teardown_interfaces()
509 def test_SRv6_End(self):
510 """ Test SRv6 End (without PSP) behavior.
512 # send traffic to one destination interface
513 # source and destination interfaces are IPv6 only
514 self.setup_interfaces(ipv6=[True, True])
516 # configure FIB entries
517 route = VppIpRoute(self, "a4::", 64,
518 [VppRoutePath(self.pg1.remote_ip6,
519 self.pg1.sw_if_index,
520 proto=DpoProto.DPO_PROTO_IP6)],
522 route.add_vpp_config()
524 # configure SRv6 localSID End without PSP behavior
525 localsid = VppSRv6LocalSID(
526 self, localsid_addr='A3::0',
527 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
533 localsid.add_vpp_config()
535 self.logger.debug(self.vapi.cli("show sr localsid"))
537 # create IPv6 packets with SRH (SL=2, SL=1, SL=0)
538 # send one packet per SL value per packet size
539 # SL=0 packet with localSID End with USP needs 2nd SRH
540 count = len(self.pg_packet_sizes)
541 dst_inner = 'a4::1234'
544 # packets with segments-left 2, active segment a3::
545 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
547 sidlist=['a5::', 'a4::', 'a3::'],
549 # create traffic stream pg0->pg1
550 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
551 self.pg_packet_sizes, count))
553 # packets with segments-left 1, active segment a3::
554 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
556 sidlist=['a4::', 'a3::', 'a2::'],
558 # add to traffic stream pg0->pg1
559 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
560 self.pg_packet_sizes, count))
562 # TODO: test behavior with SL=0 packet (needs 2*SRH?)
564 # send packets and verify received packets
565 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
566 self.compare_rx_tx_packet_End)
568 # log the localsid counters
569 self.logger.info(self.vapi.cli("show sr localsid"))
571 # remove SRv6 localSIDs
572 localsid.remove_vpp_config()
578 self.teardown_interfaces()
580 def test_SRv6_End_with_PSP(self):
581 """ Test SRv6 End with PSP behavior.
583 # send traffic to one destination interface
584 # source and destination interfaces are IPv6 only
585 self.setup_interfaces(ipv6=[True, True])
587 # configure FIB entries
588 route = VppIpRoute(self, "a4::", 64,
589 [VppRoutePath(self.pg1.remote_ip6,
590 self.pg1.sw_if_index,
591 proto=DpoProto.DPO_PROTO_IP6)],
593 route.add_vpp_config()
595 # configure SRv6 localSID End with PSP behavior
596 localsid = VppSRv6LocalSID(
597 self, localsid_addr='A3::0',
598 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
604 localsid.add_vpp_config()
606 self.logger.debug(self.vapi.cli("show sr localsid"))
608 # create IPv6 packets with SRH (SL=2, SL=1)
609 # send one packet per SL value per packet size
610 # SL=0 packet with localSID End with PSP is dropped
611 count = len(self.pg_packet_sizes)
612 dst_inner = 'a4::1234'
615 # packets with segments-left 2, active segment a3::
616 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
618 sidlist=['a5::', 'a4::', 'a3::'],
620 # create traffic stream pg0->pg1
621 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
622 self.pg_packet_sizes, count))
624 # packets with segments-left 1, active segment a3::
625 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
627 sidlist=['a4::', 'a3::', 'a2::'],
629 # add to traffic stream pg0->pg1
630 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
631 self.pg_packet_sizes, count))
633 # send packets and verify received packets
634 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
635 self.compare_rx_tx_packet_End_PSP)
637 # log the localsid counters
638 self.logger.info(self.vapi.cli("show sr localsid"))
640 # remove SRv6 localSIDs
641 localsid.remove_vpp_config()
647 self.teardown_interfaces()
649 def test_SRv6_End_X(self):
650 """ Test SRv6 End.X (without PSP) behavior.
652 # create three interfaces (1 source, 2 destinations)
653 # source and destination interfaces are IPv6 only
654 self.setup_interfaces(ipv6=[True, True, True])
656 # configure FIB entries
657 # a4::/64 via pg1 and pg2
658 route = VppIpRoute(self, "a4::", 64,
659 [VppRoutePath(self.pg1.remote_ip6,
660 self.pg1.sw_if_index,
661 proto=DpoProto.DPO_PROTO_IP6),
662 VppRoutePath(self.pg2.remote_ip6,
663 self.pg2.sw_if_index,
664 proto=DpoProto.DPO_PROTO_IP6)],
666 route.add_vpp_config()
667 self.logger.debug(self.vapi.cli("show ip6 fib"))
669 # configure SRv6 localSID End.X without PSP behavior
670 # End.X points to interface pg1
671 localsid = VppSRv6LocalSID(
672 self, localsid_addr='A3::C4',
673 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
674 nh_addr=self.pg1.remote_ip6,
676 sw_if_index=self.pg1.sw_if_index,
679 localsid.add_vpp_config()
681 self.logger.debug(self.vapi.cli("show sr localsid"))
683 # create IPv6 packets with SRH (SL=2, SL=1)
684 # send one packet per SL value per packet size
685 # SL=0 packet with localSID End with PSP is dropped
686 count = len(self.pg_packet_sizes)
687 dst_inner = 'a4::1234'
690 # packets with segments-left 2, active segment a3::c4
691 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
693 sidlist=['a5::', 'a4::', 'a3::c4'],
695 # create traffic stream pg0->pg1
696 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
697 self.pg_packet_sizes, count))
699 # packets with segments-left 1, active segment a3::c4
700 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
702 sidlist=['a4::', 'a3::c4', 'a2::'],
704 # add to traffic stream pg0->pg1
705 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
706 self.pg_packet_sizes, count))
708 # send packets and verify received packets
709 # using same comparison function as End (no PSP)
710 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
711 self.compare_rx_tx_packet_End)
713 # assert nothing was received on the other interface (pg2)
714 self.pg2.assert_nothing_captured("mis-directed packet(s)")
716 # log the localsid counters
717 self.logger.info(self.vapi.cli("show sr localsid"))
719 # remove SRv6 localSIDs
720 localsid.remove_vpp_config()
726 self.teardown_interfaces()
728 def test_SRv6_End_X_with_PSP(self):
729 """ Test SRv6 End.X with PSP behavior.
731 # create three interfaces (1 source, 2 destinations)
732 # source and destination interfaces are IPv6 only
733 self.setup_interfaces(ipv6=[True, True, True])
735 # configure FIB entries
736 # a4::/64 via pg1 and pg2
737 route = VppIpRoute(self, "a4::", 64,
738 [VppRoutePath(self.pg1.remote_ip6,
739 self.pg1.sw_if_index,
740 proto=DpoProto.DPO_PROTO_IP6),
741 VppRoutePath(self.pg2.remote_ip6,
742 self.pg2.sw_if_index,
743 proto=DpoProto.DPO_PROTO_IP6)],
745 route.add_vpp_config()
747 # configure SRv6 localSID End with PSP behavior
748 localsid = VppSRv6LocalSID(
749 self, localsid_addr='A3::C4',
750 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
751 nh_addr=self.pg1.remote_ip6,
753 sw_if_index=self.pg1.sw_if_index,
756 localsid.add_vpp_config()
758 self.logger.debug(self.vapi.cli("show sr localsid"))
760 # create IPv6 packets with SRH (SL=2, SL=1)
761 # send one packet per SL value per packet size
762 # SL=0 packet with localSID End with PSP is dropped
763 count = len(self.pg_packet_sizes)
764 dst_inner = 'a4::1234'
767 # packets with segments-left 2, active segment a3::
768 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
770 sidlist=['a5::', 'a4::', 'a3::c4'],
772 # create traffic stream pg0->pg1
773 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
774 self.pg_packet_sizes, count))
776 # packets with segments-left 1, active segment a3::
777 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
779 sidlist=['a4::', 'a3::c4', 'a2::'],
781 # add to traffic stream pg0->pg1
782 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
783 self.pg_packet_sizes, count))
785 # send packets and verify received packets
786 # using same comparison function as End with PSP
787 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
788 self.compare_rx_tx_packet_End_PSP)
790 # assert nothing was received on the other interface (pg2)
791 self.pg2.assert_nothing_captured("mis-directed packet(s)")
793 # log the localsid counters
794 self.logger.info(self.vapi.cli("show sr localsid"))
796 # remove SRv6 localSIDs
797 localsid.remove_vpp_config()
803 self.teardown_interfaces()
805 def test_SRv6_End_DX6(self):
806 """ Test SRv6 End.DX6 behavior.
808 # send traffic to one destination interface
809 # source and destination interfaces are IPv6 only
810 self.setup_interfaces(ipv6=[True, True])
812 # configure SRv6 localSID End.DX6 behavior
813 localsid = VppSRv6LocalSID(
814 self, localsid_addr='a3::c4',
815 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX6,
816 nh_addr=self.pg1.remote_ip6,
818 sw_if_index=self.pg1.sw_if_index,
821 localsid.add_vpp_config()
823 self.logger.debug(self.vapi.cli("show sr localsid"))
825 # create IPv6 packets with SRH (SL=0)
826 # send one packet per packet size
827 count = len(self.pg_packet_sizes)
828 dst_inner = 'a4::1234' # inner header destination address
831 # packets with SRH, segments-left 0, active segment a3::c4
832 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
834 sidlist=['a3::c4', 'a2::', 'a1::'],
836 # add to traffic stream pg0->pg1
837 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
838 self.pg_packet_sizes, count))
840 # packets without SRH, IPv6 in IPv6
841 # outer IPv6 dest addr is the localsid End.DX6
842 packet_header = self.create_packet_header_IPv6_IPv6(
845 # add to traffic stream pg0->pg1
846 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
847 self.pg_packet_sizes, count))
849 # send packets and verify received packets
850 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
851 self.compare_rx_tx_packet_End_DX6)
853 # log the localsid counters
854 self.logger.info(self.vapi.cli("show sr localsid"))
856 # remove SRv6 localSIDs
857 localsid.remove_vpp_config()
860 self.teardown_interfaces()
862 def test_SRv6_End_DT6(self):
863 """ Test SRv6 End.DT6 behavior.
865 # create three interfaces (1 source, 2 destinations)
866 # all interfaces are IPv6 only
867 # source interface in global FIB (0)
868 # destination interfaces in global and vrf
870 self.setup_interfaces(ipv6=[True, True, True],
871 ipv6_table_id=[0, 0, vrf_1])
873 # configure FIB entries
874 # a4::/64 is reachable
875 # via pg1 in table 0 (global)
876 # and via pg2 in table vrf_1
877 route0 = VppIpRoute(self, "a4::", 64,
878 [VppRoutePath(self.pg1.remote_ip6,
879 self.pg1.sw_if_index,
880 proto=DpoProto.DPO_PROTO_IP6,
884 route0.add_vpp_config()
885 route1 = VppIpRoute(self, "a4::", 64,
886 [VppRoutePath(self.pg2.remote_ip6,
887 self.pg2.sw_if_index,
888 proto=DpoProto.DPO_PROTO_IP6,
892 route1.add_vpp_config()
893 self.logger.debug(self.vapi.cli("show ip6 fib"))
895 # configure SRv6 localSID End.DT6 behavior
897 # fib_table: where the localsid is installed
898 # sw_if_index: in T-variants of localsid this is the vrf table_id
899 localsid = VppSRv6LocalSID(
900 self, localsid_addr='a3::c4',
901 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT6,
907 localsid.add_vpp_config()
909 self.logger.debug(self.vapi.cli("show sr localsid"))
911 # create IPv6 packets with SRH (SL=0)
912 # send one packet per packet size
913 count = len(self.pg_packet_sizes)
914 dst_inner = 'a4::1234' # inner header destination address
917 # packets with SRH, segments-left 0, active segment a3::c4
918 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
920 sidlist=['a3::c4', 'a2::', 'a1::'],
922 # add to traffic stream pg0->pg1
923 pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
924 self.pg_packet_sizes, count))
926 # packets without SRH, IPv6 in IPv6
927 # outer IPv6 dest addr is the localsid End.DT6
928 packet_header = self.create_packet_header_IPv6_IPv6(
931 # add to traffic stream pg0->pg1
932 pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
933 self.pg_packet_sizes, count))
935 # send packets and verify received packets
936 # using same comparison function as End.DX6
937 self.send_and_verify_pkts(self.pg0, pkts, self.pg2,
938 self.compare_rx_tx_packet_End_DX6)
940 # assert nothing was received on the other interface (pg2)
941 self.pg1.assert_nothing_captured("mis-directed packet(s)")
943 # log the localsid counters
944 self.logger.info(self.vapi.cli("show sr localsid"))
946 # remove SRv6 localSIDs
947 localsid.remove_vpp_config()
953 self.teardown_interfaces()
955 def test_SRv6_End_DX4(self):
956 """ Test SRv6 End.DX4 behavior.
958 # send traffic to one destination interface
959 # source interface is IPv6 only
960 # destination interface is IPv4 only
961 self.setup_interfaces(ipv6=[True, False], ipv4=[False, True])
963 # configure SRv6 localSID End.DX4 behavior
964 localsid = VppSRv6LocalSID(
965 self, localsid_addr='a3::c4',
966 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX4,
967 nh_addr=self.pg1.remote_ip4,
969 sw_if_index=self.pg1.sw_if_index,
972 localsid.add_vpp_config()
974 self.logger.debug(self.vapi.cli("show sr localsid"))
976 # send one packet per packet size
977 count = len(self.pg_packet_sizes)
978 dst_inner = '4.1.1.123' # inner header destination address
981 # packets with SRH, segments-left 0, active segment a3::c4
982 packet_header = self.create_packet_header_IPv6_SRH_IPv4(
984 sidlist=['a3::c4', 'a2::', 'a1::'],
986 # add to traffic stream pg0->pg1
987 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
988 self.pg_packet_sizes, count))
990 # packets without SRH, IPv4 in IPv6
991 # outer IPv6 dest addr is the localsid End.DX4
992 packet_header = self.create_packet_header_IPv6_IPv4(
995 # add to traffic stream pg0->pg1
996 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
997 self.pg_packet_sizes, count))
999 # send packets and verify received packets
1000 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
1001 self.compare_rx_tx_packet_End_DX4)
1003 # log the localsid counters
1004 self.logger.info(self.vapi.cli("show sr localsid"))
1006 # remove SRv6 localSIDs
1007 localsid.remove_vpp_config()
1009 # cleanup interfaces
1010 self.teardown_interfaces()
1012 def test_SRv6_End_DT4(self):
1013 """ Test SRv6 End.DT4 behavior.
1015 # create three interfaces (1 source, 2 destinations)
1016 # source interface is IPv6-only
1017 # destination interfaces are IPv4 only
1018 # source interface in global FIB (0)
1019 # destination interfaces in global and vrf
1021 self.setup_interfaces(ipv6=[True, False, False],
1022 ipv4=[False, True, True],
1023 ipv6_table_id=[0, 0, 0],
1024 ipv4_table_id=[0, 0, vrf_1])
1026 # configure FIB entries
1027 # 4.1.1.0/24 is reachable
1028 # via pg1 in table 0 (global)
1029 # and via pg2 in table vrf_1
1030 route0 = VppIpRoute(self, "4.1.1.0", 24,
1031 [VppRoutePath(self.pg1.remote_ip4,
1032 self.pg1.sw_if_index,
1036 route0.add_vpp_config()
1037 route1 = VppIpRoute(self, "4.1.1.0", 24,
1038 [VppRoutePath(self.pg2.remote_ip4,
1039 self.pg2.sw_if_index,
1040 nh_table_id=vrf_1)],
1043 route1.add_vpp_config()
1044 self.logger.debug(self.vapi.cli("show ip fib"))
1046 # configure SRv6 localSID End.DT6 behavior
1048 # fib_table: where the localsid is installed
1049 # sw_if_index: in T-variants of localsid: vrf table_id
1050 localsid = VppSRv6LocalSID(
1051 self, localsid_addr='a3::c4',
1052 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT4,
1058 localsid.add_vpp_config()
1060 self.logger.debug(self.vapi.cli("show sr localsid"))
1062 # create IPv6 packets with SRH (SL=0)
1063 # send one packet per packet size
1064 count = len(self.pg_packet_sizes)
1065 dst_inner = '4.1.1.123' # inner header destination address
1068 # packets with SRH, segments-left 0, active segment a3::c4
1069 packet_header = self.create_packet_header_IPv6_SRH_IPv4(
1071 sidlist=['a3::c4', 'a2::', 'a1::'],
1073 # add to traffic stream pg0->pg1
1074 pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
1075 self.pg_packet_sizes, count))
1077 # packets without SRH, IPv6 in IPv6
1078 # outer IPv6 dest addr is the localsid End.DX4
1079 packet_header = self.create_packet_header_IPv6_IPv4(
1082 # add to traffic stream pg0->pg1
1083 pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
1084 self.pg_packet_sizes, count))
1086 # send packets and verify received packets
1087 # using same comparison function as End.DX4
1088 self.send_and_verify_pkts(self.pg0, pkts, self.pg2,
1089 self.compare_rx_tx_packet_End_DX4)
1091 # assert nothing was received on the other interface (pg2)
1092 self.pg1.assert_nothing_captured("mis-directed packet(s)")
1094 # log the localsid counters
1095 self.logger.info(self.vapi.cli("show sr localsid"))
1097 # remove SRv6 localSIDs
1098 localsid.remove_vpp_config()
1100 # remove FIB entries
1103 # cleanup interfaces
1104 self.teardown_interfaces()
1106 def test_SRv6_End_DX2(self):
1107 """ Test SRv6 End.DX2 behavior.
1109 # send traffic to one destination interface
1110 # source interface is IPv6 only
1111 self.setup_interfaces(ipv6=[True, False], ipv4=[False, False])
1113 # configure SRv6 localSID End.DX2 behavior
1114 localsid = VppSRv6LocalSID(
1115 self, localsid_addr='a3::c4',
1116 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX2,
1119 sw_if_index=self.pg1.sw_if_index,
1122 localsid.add_vpp_config()
1124 self.logger.debug(self.vapi.cli("show sr localsid"))
1126 # send one packet per packet size
1127 count = len(self.pg_packet_sizes)
1130 # packets with SRH, segments-left 0, active segment a3::c4
1131 # L2 has no dot1q header
1132 packet_header = self.create_packet_header_IPv6_SRH_L2(
1133 sidlist=['a3::c4', 'a2::', 'a1::'],
1136 # add to traffic stream pg0->pg1
1137 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1138 self.pg_packet_sizes, count))
1140 # packets with SRH, segments-left 0, active segment a3::c4
1141 # L2 has dot1q header
1142 packet_header = self.create_packet_header_IPv6_SRH_L2(
1143 sidlist=['a3::c4', 'a2::', 'a1::'],
1146 # add to traffic stream pg0->pg1
1147 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1148 self.pg_packet_sizes, count))
1150 # packets without SRH, L2 in IPv6
1151 # outer IPv6 dest addr is the localsid End.DX2
1152 # L2 has no dot1q header
1153 packet_header = self.create_packet_header_IPv6_L2(
1156 # add to traffic stream pg0->pg1
1157 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1158 self.pg_packet_sizes, count))
1160 # packets without SRH, L2 in IPv6
1161 # outer IPv6 dest addr is the localsid End.DX2
1162 # L2 has dot1q header
1163 packet_header = self.create_packet_header_IPv6_L2(
1166 # add to traffic stream pg0->pg1
1167 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1168 self.pg_packet_sizes, count))
1170 # send packets and verify received packets
1171 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
1172 self.compare_rx_tx_packet_End_DX2)
1174 # log the localsid counters
1175 self.logger.info(self.vapi.cli("show sr localsid"))
1177 # remove SRv6 localSIDs
1178 localsid.remove_vpp_config()
1180 # cleanup interfaces
1181 self.teardown_interfaces()
1183 def compare_rx_tx_packet_T_Encaps(self, tx_pkt, rx_pkt):
1184 """ Compare input and output packet after passing T.Encaps
1186 :param tx_pkt: transmitted packet
1187 :param rx_pkt: received packet
1189 # T.Encaps updates the headers as follows:
1190 # SR Policy seglist (S3, S2, S1)
1191 # SR Policy source C
1194 # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(A, B2)
1196 # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1)
1197 # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(a, B2)SRH(B3, B2, B1; SL=1)
1199 # get first (outer) IPv6 header of rx'ed packet
1200 rx_ip = rx_pkt.getlayer(IPv6)
1203 tx_ip = tx_pkt.getlayer(IPv6)
1205 # expected segment-list
1206 seglist = self.sr_policy.segments
1207 # reverse list to get order as in SRH
1208 tx_seglist = seglist[::-1]
1210 # get source address of SR Policy
1211 sr_policy_source = self.sr_policy.source
1213 # rx'ed packet should have SRH
1214 self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1216 rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1218 # received ip.src should be equal to SR Policy source
1219 self.assertEqual(rx_ip.src, sr_policy_source)
1220 # received ip.dst should be equal to expected sidlist[lastentry]
1221 self.assertEqual(rx_ip.dst, tx_seglist[-1])
1222 # rx'ed seglist should be equal to expected seglist
1223 self.assertEqual(rx_srh.addresses, tx_seglist)
1224 # segleft should be equal to size expected seglist-1
1225 self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1226 # segleft should be equal to lastentry
1227 self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1229 # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1230 # except for the hop-limit field
1231 # -> update tx'ed hlim to the expected hlim
1232 tx_ip.hlim = tx_ip.hlim - 1
1234 self.assertEqual(rx_srh.payload, tx_ip)
1236 self.logger.debug("packet verification: SUCCESS")
1238 def compare_rx_tx_packet_T_Encaps_IPv4(self, tx_pkt, rx_pkt):
1239 """ Compare input and output packet after passing T.Encaps for IPv4
1241 :param tx_pkt: transmitted packet
1242 :param rx_pkt: received packet
1244 # T.Encaps for IPv4 updates the headers as follows:
1245 # SR Policy seglist (S3, S2, S1)
1246 # SR Policy source C
1249 # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv4(A, B2)
1251 # get first (outer) IPv6 header of rx'ed packet
1252 rx_ip = rx_pkt.getlayer(IPv6)
1255 tx_ip = tx_pkt.getlayer(IP)
1257 # expected segment-list
1258 seglist = self.sr_policy.segments
1259 # reverse list to get order as in SRH
1260 tx_seglist = seglist[::-1]
1262 # get source address of SR Policy
1263 sr_policy_source = self.sr_policy.source
1265 # checks common to cases tx with and without SRH
1266 # rx'ed packet should have SRH and IPv4 header
1267 self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1268 self.assertTrue(rx_ip.payload.haslayer(IP))
1270 rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1272 # received ip.src should be equal to SR Policy source
1273 self.assertEqual(rx_ip.src, sr_policy_source)
1274 # received ip.dst should be equal to sidlist[lastentry]
1275 self.assertEqual(rx_ip.dst, tx_seglist[-1])
1276 # rx'ed seglist should be equal to seglist
1277 self.assertEqual(rx_srh.addresses, tx_seglist)
1278 # segleft should be equal to size seglist-1
1279 self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1280 # segleft should be equal to lastentry
1281 self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1283 # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1284 # except for the ttl field and ip checksum
1285 # -> adjust tx'ed ttl to expected ttl
1286 tx_ip.ttl = tx_ip.ttl - 1
1287 # -> set tx'ed ip checksum to None and let scapy recompute
1289 # read back the pkt (with str()) to force computing these fields
1290 # probably other ways to accomplish this are possible
1291 tx_ip = IP(str(tx_ip))
1293 self.assertEqual(rx_srh.payload, tx_ip)
1295 self.logger.debug("packet verification: SUCCESS")
1297 def compare_rx_tx_packet_T_Encaps_L2(self, tx_pkt, rx_pkt):
1298 """ Compare input and output packet after passing T.Encaps for L2
1300 :param tx_pkt: transmitted packet
1301 :param rx_pkt: received packet
1303 # T.Encaps for L2 updates the headers as follows:
1304 # SR Policy seglist (S3, S2, S1)
1305 # SR Policy source C
1308 # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)L2
1310 # get first (outer) IPv6 header of rx'ed packet
1311 rx_ip = rx_pkt.getlayer(IPv6)
1314 tx_ether = tx_pkt.getlayer(Ether)
1316 # expected segment-list
1317 seglist = self.sr_policy.segments
1318 # reverse list to get order as in SRH
1319 tx_seglist = seglist[::-1]
1321 # get source address of SR Policy
1322 sr_policy_source = self.sr_policy.source
1324 # rx'ed packet should have SRH
1325 self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1327 rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1329 # received ip.src should be equal to SR Policy source
1330 self.assertEqual(rx_ip.src, sr_policy_source)
1331 # received ip.dst should be equal to sidlist[lastentry]
1332 self.assertEqual(rx_ip.dst, tx_seglist[-1])
1333 # rx'ed seglist should be equal to seglist
1334 self.assertEqual(rx_srh.addresses, tx_seglist)
1335 # segleft should be equal to size seglist-1
1336 self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1337 # segleft should be equal to lastentry
1338 self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1339 # nh should be "No Next Header" (59)
1340 self.assertEqual(rx_srh.nh, 59)
1342 # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1343 self.assertEqual(Ether(str(rx_srh.payload)), tx_ether)
1345 self.logger.debug("packet verification: SUCCESS")
1347 def compare_rx_tx_packet_T_Insert(self, tx_pkt, rx_pkt):
1348 """ Compare input and output packet after passing T.Insert
1350 :param tx_pkt: transmitted packet
1351 :param rx_pkt: received packet
1353 # T.Insert updates the headers as follows:
1356 # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)
1358 # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1)
1359 # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)SRH(B3, B2, B1; SL=1)
1361 # get first (outer) IPv6 header of rx'ed packet
1362 rx_ip = rx_pkt.getlayer(IPv6)
1367 rx_udp = rx_pkt[UDP]
1369 tx_ip = tx_pkt.getlayer(IPv6)
1372 # some packets have been tx'ed with an SRH, some without it
1373 # get SRH if tx'ed packet has it
1374 if tx_pkt.haslayer(IPv6ExtHdrSegmentRouting):
1375 tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1376 tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1377 tx_udp = tx_pkt[UDP]
1379 # expected segment-list (make copy of SR Policy segment list)
1380 seglist = self.sr_policy.segments[:]
1381 # expected seglist has initial dest addr as last segment
1382 seglist.append(tx_ip.dst)
1383 # reverse list to get order as in SRH
1384 tx_seglist = seglist[::-1]
1386 # get source address of SR Policy
1387 sr_policy_source = self.sr_policy.source
1389 # checks common to cases tx with and without SRH
1390 # rx'ed packet should have SRH and only one IPv6 header
1391 self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1392 self.assertFalse(rx_ip.payload.haslayer(IPv6))
1394 rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1396 # rx'ed ip.src should be equal to tx'ed ip.src
1397 self.assertEqual(rx_ip.src, tx_ip.src)
1398 # rx'ed ip.dst should be equal to sidlist[lastentry]
1399 self.assertEqual(rx_ip.dst, tx_seglist[-1])
1401 # rx'ed seglist should be equal to expected seglist
1402 self.assertEqual(rx_srh.addresses, tx_seglist)
1403 # segleft should be equal to size(expected seglist)-1
1404 self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1405 # segleft should be equal to lastentry
1406 self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1408 if tx_srh: # packet was tx'ed with SRH
1409 # packet should have 2nd SRH
1410 self.assertTrue(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
1412 rx_srh2 = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting, 2)
1414 # rx'ed srh2.addresses should be equal to tx'ed srh.addresses
1415 self.assertEqual(rx_srh2.addresses, tx_srh.addresses)
1416 # rx'ed srh2.segleft should be equal to tx'ed srh.segleft
1417 self.assertEqual(rx_srh2.segleft, tx_srh.segleft)
1418 # rx'ed srh2.lastentry should be equal to tx'ed srh.lastentry
1419 self.assertEqual(rx_srh2.lastentry, tx_srh.lastentry)
1421 else: # packet was tx'ed without SRH
1422 # rx packet should have no other SRH
1423 self.assertFalse(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
1425 # UDP layer should be unchanged
1426 self.assertEqual(rx_udp, tx_udp)
1428 self.logger.debug("packet verification: SUCCESS")
1430 def compare_rx_tx_packet_End(self, tx_pkt, rx_pkt):
1431 """ Compare input and output packet after passing End (without PSP)
1433 :param tx_pkt: transmitted packet
1434 :param rx_pkt: received packet
1436 # End (no PSP) updates the headers as follows:
1438 # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2)
1439 # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1441 # get first (outer) IPv6 header of rx'ed packet
1442 rx_ip = rx_pkt.getlayer(IPv6)
1445 rx_udp = rx_pkt[UDP]
1447 tx_ip = tx_pkt.getlayer(IPv6)
1448 # we know the packet has been tx'ed
1449 # with an inner IPv6 header and an SRH
1450 tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1451 tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1452 tx_udp = tx_pkt[UDP]
1454 # common checks, regardless of tx segleft value
1455 # rx'ed packet should have 2nd IPv6 header
1456 self.assertTrue(rx_ip.payload.haslayer(IPv6))
1457 # get second (inner) IPv6 header
1458 rx_ip2 = rx_pkt.getlayer(IPv6, 2)
1460 if tx_ip.segleft > 0:
1461 # SRH should NOT have been popped:
1462 # End SID without PSP does not pop SRH if segleft>0
1463 self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1464 rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1466 # received ip.src should be equal to expected ip.src
1467 self.assertEqual(rx_ip.src, tx_ip.src)
1468 # sidlist should be unchanged
1469 self.assertEqual(rx_srh.addresses, tx_srh.addresses)
1470 # segleft should have been decremented
1471 self.assertEqual(rx_srh.segleft, tx_srh.segleft-1)
1472 # received ip.dst should be equal to sidlist[segleft]
1473 self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
1474 # lastentry should be unchanged
1475 self.assertEqual(rx_srh.lastentry, tx_srh.lastentry)
1476 # inner IPv6 packet (ip2) should be unchanged
1477 self.assertEqual(rx_ip2.src, tx_ip2.src)
1478 self.assertEqual(rx_ip2.dst, tx_ip2.dst)
1479 # else: # tx_ip.segleft == 0
1480 # TODO: Does this work with 2 SRHs in ingress packet?
1482 # UDP layer should be unchanged
1483 self.assertEqual(rx_udp, tx_udp)
1485 self.logger.debug("packet verification: SUCCESS")
1487 def compare_rx_tx_packet_End_PSP(self, tx_pkt, rx_pkt):
1488 """ Compare input and output packet after passing End with PSP
1490 :param tx_pkt: transmitted packet
1491 :param rx_pkt: received packet
1493 # End (PSP) updates the headers as follows:
1494 # IPv6 + SRH (SL>1):
1495 # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2)
1496 # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1497 # IPv6 + SRH (SL=1):
1498 # in: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1501 # get first (outer) IPv6 header of rx'ed packet
1502 rx_ip = rx_pkt.getlayer(IPv6)
1505 rx_udp = rx_pkt[UDP]
1507 tx_ip = tx_pkt.getlayer(IPv6)
1508 # we know the packet has been tx'ed
1509 # with an inner IPv6 header and an SRH
1510 tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1511 tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1512 tx_udp = tx_pkt[UDP]
1514 # common checks, regardless of tx segleft value
1515 self.assertTrue(rx_ip.payload.haslayer(IPv6))
1516 rx_ip2 = rx_pkt.getlayer(IPv6, 2)
1517 # inner IPv6 packet (ip2) should be unchanged
1518 self.assertEqual(rx_ip2.src, tx_ip2.src)
1519 self.assertEqual(rx_ip2.dst, tx_ip2.dst)
1521 if tx_ip.segleft > 1:
1522 # SRH should NOT have been popped:
1523 # End SID with PSP does not pop SRH if segleft>1
1524 # rx'ed packet should have SRH
1525 self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1526 rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1528 # received ip.src should be equal to expected ip.src
1529 self.assertEqual(rx_ip.src, tx_ip.src)
1530 # sidlist should be unchanged
1531 self.assertEqual(rx_srh.addresses, tx_srh.addresses)
1532 # segleft should have been decremented
1533 self.assertEqual(rx_srh.segleft, tx_srh.segleft-1)
1534 # received ip.dst should be equal to sidlist[segleft]
1535 self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
1536 # lastentry should be unchanged
1537 self.assertEqual(rx_srh.lastentry, tx_srh.lastentry)
1539 else: # tx_ip.segleft <= 1
1540 # SRH should have been popped:
1541 # End SID with PSP and segleft=1 pops SRH
1542 # the two IPv6 headers are still present
1543 # outer IPv6 header has DA == last segment of popped SRH
1544 # SRH should not be present
1545 self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1546 # outer IPv6 header ip.src should be equal to tx'ed ip.src
1547 self.assertEqual(rx_ip.src, tx_ip.src)
1548 # outer IPv6 header ip.dst should be = to tx'ed sidlist[segleft-1]
1549 self.assertEqual(rx_ip.dst, tx_srh.addresses[tx_srh.segleft-1])
1551 # UDP layer should be unchanged
1552 self.assertEqual(rx_udp, tx_udp)
1554 self.logger.debug("packet verification: SUCCESS")
1556 def compare_rx_tx_packet_End_DX6(self, tx_pkt, rx_pkt):
1557 """ Compare input and output packet after passing End.DX6
1559 :param tx_pkt: transmitted packet
1560 :param rx_pkt: received packet
1562 # End.DX6 updates the headers as follows:
1563 # IPv6 + SRH (SL=0):
1564 # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv6(B, D)
1567 # in: IPv6(A, S3)IPv6(B, D)
1570 # get first (outer) IPv6 header of rx'ed packet
1571 rx_ip = rx_pkt.getlayer(IPv6)
1573 tx_ip = tx_pkt.getlayer(IPv6)
1574 tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1576 # verify if rx'ed packet has no SRH
1577 self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1579 # the whole rx_ip pkt should be equal to tx_ip2
1580 # except for the hlim field
1581 # -> adjust tx'ed hlim to expected hlim
1582 tx_ip2.hlim = tx_ip2.hlim - 1
1584 self.assertEqual(rx_ip, tx_ip2)
1586 self.logger.debug("packet verification: SUCCESS")
1588 def compare_rx_tx_packet_End_DX4(self, tx_pkt, rx_pkt):
1589 """ Compare input and output packet after passing End.DX4
1591 :param tx_pkt: transmitted packet
1592 :param rx_pkt: received packet
1594 # End.DX4 updates the headers as follows:
1595 # IPv6 + SRH (SL=0):
1596 # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv4(B, D)
1599 # in: IPv6(A, S3)IPv4(B, D)
1602 # get IPv4 header of rx'ed packet
1603 rx_ip = rx_pkt.getlayer(IP)
1605 tx_ip = tx_pkt.getlayer(IPv6)
1606 tx_ip2 = tx_pkt.getlayer(IP)
1608 # verify if rx'ed packet has no SRH
1609 self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1611 # the whole rx_ip pkt should be equal to tx_ip2
1612 # except for the ttl field and ip checksum
1613 # -> adjust tx'ed ttl to expected ttl
1614 tx_ip2.ttl = tx_ip2.ttl - 1
1615 # -> set tx'ed ip checksum to None and let scapy recompute
1616 tx_ip2.chksum = None
1617 # read back the pkt (with str()) to force computing these fields
1618 # probably other ways to accomplish this are possible
1619 tx_ip2 = IP(str(tx_ip2))
1621 self.assertEqual(rx_ip, tx_ip2)
1623 self.logger.debug("packet verification: SUCCESS")
1625 def compare_rx_tx_packet_End_DX2(self, tx_pkt, rx_pkt):
1626 """ Compare input and output packet after passing End.DX2
1628 :param tx_pkt: transmitted packet
1629 :param rx_pkt: received packet
1631 # End.DX2 updates the headers as follows:
1632 # IPv6 + SRH (SL=0):
1633 # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)L2
1639 # get IPv4 header of rx'ed packet
1640 rx_eth = rx_pkt.getlayer(Ether)
1642 tx_ip = tx_pkt.getlayer(IPv6)
1643 # we can't just get the 2nd Ether layer
1644 # get the Raw content and dissect it as Ether
1645 tx_eth1 = Ether(str(tx_pkt[Raw]))
1647 # verify if rx'ed packet has no SRH
1648 self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1650 # the whole rx_eth pkt should be equal to tx_eth1
1651 self.assertEqual(rx_eth, tx_eth1)
1653 self.logger.debug("packet verification: SUCCESS")
1655 def create_stream(self, src_if, dst_if, packet_header, packet_sizes,
1657 """Create SRv6 input packet stream for defined interface.
1659 :param VppInterface src_if: Interface to create packet stream for
1660 :param VppInterface dst_if: destination interface of packet stream
1661 :param packet_header: Layer3 scapy packet headers,
1662 L2 is added when not provided,
1663 Raw(payload) with packet_info is added
1664 :param list packet_sizes: packet stream pckt sizes,sequentially applied
1665 to packets in stream have
1666 :param int count: number of packets in packet stream
1667 :return: list of packets
1669 self.logger.info("Creating packets")
1671 for i in range(0, count-1):
1672 payload_info = self.create_packet_info(src_if, dst_if)
1674 "Creating packet with index %d" % (payload_info.index))
1675 payload = self.info_to_payload(payload_info)
1676 # add L2 header if not yet provided in packet_header
1677 if packet_header.getlayer(0).name == 'Ethernet':
1678 p = (packet_header /
1681 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
1684 size = packet_sizes[i % len(packet_sizes)]
1685 self.logger.debug("Packet size %d" % (size))
1686 self.extend_packet(p, size)
1687 # we need to store the packet with the automatic fields computed
1688 # read back the dumped packet (with str())
1689 # to force computing these fields
1690 # probably other ways are possible
1692 payload_info.data = p.copy()
1693 self.logger.debug(ppp("Created packet:", p))
1695 self.logger.info("Done creating packets")
1698 def send_and_verify_pkts(self, input, pkts, output, compare_func):
1699 """Send packets and verify received packets using compare_func
1701 :param input: ingress interface of DUT
1702 :param pkts: list of packets to transmit
1703 :param output: egress interface of DUT
1704 :param compare_func: function to compare in and out packets
1706 # add traffic stream to input interface
1707 input.add_stream(pkts)
1709 # enable capture on all interfaces
1710 self.pg_enable_capture(self.pg_interfaces)
1713 self.logger.info("Starting traffic")
1716 # get output capture
1717 self.logger.info("Getting packet capture")
1718 capture = output.get_capture()
1720 # assert nothing was captured on input interface
1721 input.assert_nothing_captured()
1723 # verify captured packets
1724 self.verify_captured_pkts(output, capture, compare_func)
1726 def create_packet_header_IPv6(self, dst):
1727 """Create packet header: IPv6 header, UDP header
1729 :param dst: IPv6 destination address
1731 IPv6 source address is 1234::1
1732 UDP source port and destination port are 1234
1735 p = (IPv6(src='1234::1', dst=dst) /
1736 UDP(sport=1234, dport=1234))
1739 def create_packet_header_IPv6_SRH(self, sidlist, segleft):
1740 """Create packet header: IPv6 header with SRH, UDP header
1742 :param list sidlist: segment list
1743 :param int segleft: segments-left field value
1745 IPv6 destination address is set to sidlist[segleft]
1746 IPv6 source addresses are 1234::1 and 4321::1
1747 UDP source port and destination port are 1234
1750 p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
1751 IPv6ExtHdrSegmentRouting(addresses=sidlist) /
1752 UDP(sport=1234, dport=1234))
1755 def create_packet_header_IPv6_SRH_IPv6(self, dst, sidlist, segleft):
1756 """Create packet header: IPv6 encapsulated in SRv6:
1757 IPv6 header with SRH, IPv6 header, UDP header
1759 :param ipv6address dst: inner IPv6 destination address
1760 :param list sidlist: segment list of outer IPv6 SRH
1761 :param int segleft: segments-left field of outer IPv6 SRH
1763 Outer IPv6 destination address is set to sidlist[segleft]
1764 IPv6 source addresses are 1234::1 and 4321::1
1765 UDP source port and destination port are 1234
1768 p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
1769 IPv6ExtHdrSegmentRouting(addresses=sidlist,
1770 segleft=segleft, nh=41) /
1771 IPv6(src='4321::1', dst=dst) /
1772 UDP(sport=1234, dport=1234))
1775 def create_packet_header_IPv6_IPv6(self, dst_inner, dst_outer):
1776 """Create packet header: IPv6 encapsulated in IPv6:
1777 IPv6 header, IPv6 header, UDP header
1779 :param ipv6address dst_inner: inner IPv6 destination address
1780 :param ipv6address dst_outer: outer IPv6 destination address
1782 IPv6 source addresses are 1234::1 and 4321::1
1783 UDP source port and destination port are 1234
1786 p = (IPv6(src='1234::1', dst=dst_outer) /
1787 IPv6(src='4321::1', dst=dst_inner) /
1788 UDP(sport=1234, dport=1234))
1791 def create_packet_header_IPv6_SRH_SRH_IPv6(self, dst, sidlist1, segleft1,
1792 sidlist2, segleft2):
1793 """Create packet header: IPv6 encapsulated in SRv6 with 2 SRH:
1794 IPv6 header with SRH, 2nd SRH, IPv6 header, UDP header
1796 :param ipv6address dst: inner IPv6 destination address
1797 :param list sidlist1: segment list of outer IPv6 SRH
1798 :param int segleft1: segments-left field of outer IPv6 SRH
1799 :param list sidlist2: segment list of inner IPv6 SRH
1800 :param int segleft2: segments-left field of inner IPv6 SRH
1802 Outer IPv6 destination address is set to sidlist[segleft]
1803 IPv6 source addresses are 1234::1 and 4321::1
1804 UDP source port and destination port are 1234
1807 p = (IPv6(src='1234::1', dst=sidlist1[segleft1]) /
1808 IPv6ExtHdrSegmentRouting(addresses=sidlist1,
1809 segleft=segleft1, nh=43) /
1810 IPv6ExtHdrSegmentRouting(addresses=sidlist2,
1811 segleft=segleft2, nh=41) /
1812 IPv6(src='4321::1', dst=dst) /
1813 UDP(sport=1234, dport=1234))
1816 def create_packet_header_IPv4(self, dst):
1817 """Create packet header: IPv4 header, UDP header
1819 :param dst: IPv4 destination address
1821 IPv4 source address is 123.1.1.1
1822 UDP source port and destination port are 1234
1825 p = (IP(src='123.1.1.1', dst=dst) /
1826 UDP(sport=1234, dport=1234))
1829 def create_packet_header_IPv6_IPv4(self, dst_inner, dst_outer):
1830 """Create packet header: IPv4 encapsulated in IPv6:
1831 IPv6 header, IPv4 header, UDP header
1833 :param ipv4address dst_inner: inner IPv4 destination address
1834 :param ipv6address dst_outer: outer IPv6 destination address
1836 IPv6 source address is 1234::1
1837 IPv4 source address is 123.1.1.1
1838 UDP source port and destination port are 1234
1841 p = (IPv6(src='1234::1', dst=dst_outer) /
1842 IP(src='123.1.1.1', dst=dst_inner) /
1843 UDP(sport=1234, dport=1234))
1846 def create_packet_header_IPv6_SRH_IPv4(self, dst, sidlist, segleft):
1847 """Create packet header: IPv4 encapsulated in SRv6:
1848 IPv6 header with SRH, IPv4 header, UDP header
1850 :param ipv4address dst: inner IPv4 destination address
1851 :param list sidlist: segment list of outer IPv6 SRH
1852 :param int segleft: segments-left field of outer IPv6 SRH
1854 Outer IPv6 destination address is set to sidlist[segleft]
1855 IPv6 source address is 1234::1
1856 IPv4 source address is 123.1.1.1
1857 UDP source port and destination port are 1234
1860 p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
1861 IPv6ExtHdrSegmentRouting(addresses=sidlist,
1862 segleft=segleft, nh=4) /
1863 IP(src='123.1.1.1', dst=dst) /
1864 UDP(sport=1234, dport=1234))
1867 def create_packet_header_L2(self, vlan=0):
1868 """Create packet header: L2 header
1870 :param vlan: if vlan!=0 then add 802.1q header
1872 # Note: the dst addr ('00:55:44:33:22:11') is used in
1873 # the compare function compare_rx_tx_packet_T_Encaps_L2
1874 # to detect presence of L2 in SRH payload
1875 p = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
1876 etype = 0x8137 # IPX
1879 p /= Dot1Q(vlan=vlan, type=etype)
1884 def create_packet_header_IPv6_SRH_L2(self, sidlist, segleft, vlan=0):
1885 """Create packet header: L2 encapsulated in SRv6:
1886 IPv6 header with SRH, L2
1888 :param list sidlist: segment list of outer IPv6 SRH
1889 :param int segleft: segments-left field of outer IPv6 SRH
1890 :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
1892 Outer IPv6 destination address is set to sidlist[segleft]
1893 IPv6 source address is 1234::1
1895 eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
1896 etype = 0x8137 # IPX
1899 eth /= Dot1Q(vlan=vlan, type=etype)
1903 p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
1904 IPv6ExtHdrSegmentRouting(addresses=sidlist,
1905 segleft=segleft, nh=59) /
1909 def create_packet_header_IPv6_L2(self, dst_outer, vlan=0):
1910 """Create packet header: L2 encapsulated in IPv6:
1913 :param ipv6address dst_outer: outer IPv6 destination address
1914 :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
1916 eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
1917 etype = 0x8137 # IPX
1920 eth /= Dot1Q(vlan=vlan, type=etype)
1924 p = (IPv6(src='1234::1', dst=dst_outer, nh=59) / eth)
1927 def get_payload_info(self, packet):
1928 """ Extract the payload_info from the packet
1930 # in most cases, payload_info is in packet[Raw]
1931 # but packet[Raw] gives the complete payload
1932 # (incl L2 header) for the T.Encaps L2 case
1934 payload_info = self.payload_to_info(str(packet[Raw]))
1937 # remote L2 header from packet[Raw]:
1938 # take packet[Raw], convert it to an Ether layer
1939 # and then extract Raw from it
1940 payload_info = self.payload_to_info(
1941 str(Ether(str(packet[Raw]))[Raw]))
1945 def verify_captured_pkts(self, dst_if, capture, compare_func):
1947 Verify captured packet stream for specified interface.
1948 Compare ingress with egress packets using the specified compare fn
1950 :param dst_if: egress interface of DUT
1951 :param capture: captured packets
1952 :param compare_func: function to compare in and out packet
1954 self.logger.info("Verifying capture on interface %s using function %s"
1955 % (dst_if.name, compare_func.func_name))
1958 for i in self.pg_interfaces:
1959 last_info[i.sw_if_index] = None
1960 dst_sw_if_index = dst_if.sw_if_index
1962 for packet in capture:
1964 # extract payload_info from packet's payload
1965 payload_info = self.get_payload_info(packet)
1966 packet_index = payload_info.index
1968 self.logger.debug("Verifying packet with index %d"
1970 # packet should have arrived on the expected interface
1971 self.assertEqual(payload_info.dst, dst_sw_if_index)
1973 "Got packet on interface %s: src=%u (idx=%u)" %
1974 (dst_if.name, payload_info.src, packet_index))
1976 # search for payload_info with same src and dst if_index
1977 # this will give us the transmitted packet
1978 next_info = self.get_next_packet_info_for_interface2(
1979 payload_info.src, dst_sw_if_index,
1980 last_info[payload_info.src])
1981 last_info[payload_info.src] = next_info
1982 # next_info should not be None
1983 self.assertTrue(next_info is not None)
1984 # index of tx and rx packets should be equal
1985 self.assertEqual(packet_index, next_info.index)
1986 # data field of next_info contains the tx packet
1987 txed_packet = next_info.data
1989 self.logger.debug(ppp("Transmitted packet:",
1990 txed_packet)) # ppp=Pretty Print Packet
1992 self.logger.debug(ppp("Received packet:", packet))
1994 # compare rcvd packet with expected packet using compare_func
1995 compare_func(txed_packet, packet)
1998 print packet.command()
1999 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2002 # have all expected packets arrived?
2003 for i in self.pg_interfaces:
2004 remaining_packet = self.get_next_packet_info_for_interface2(
2005 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
2006 self.assertTrue(remaining_packet is None,
2007 "Interface %s: Packet expected from interface %s "
2008 "didn't arrive" % (dst_if.name, i.name))
2011 if __name__ == '__main__':
2012 unittest.main(testRunner=VppTestRunner)