5 from socket import AF_INET6
7 from framework import VppTestCase, VppTestRunner
8 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto, VppIpTable
9 from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
10 SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether, Dot1Q
14 from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
15 from scapy.layers.inet import IP, UDP
17 from scapy.utils import inet_pton, inet_ntop
22 class TestSRv6(VppTestCase):
23 """ SRv6 Test Case """
27 super(TestSRv6, self).setUpClass()
30 """ Perform test setup before each test case.
32 super(TestSRv6, self).setUp()
34 # packet sizes, inclusive L2 overhead
35 self.pg_packet_sizes = [64, 512, 1518, 9018]
38 self.reset_packet_infos()
41 """ Clean up test setup after each test case.
43 self.teardown_interfaces()
45 super(TestSRv6, self).tearDown()
47 def configure_interface(self,
49 ipv6=False, ipv4=False,
50 ipv6_table_id=0, ipv4_table_id=0):
51 """ Configure interface.
52 :param ipv6: configure IPv6 on interface
53 :param ipv4: configure IPv4 on interface
54 :param ipv6_table_id: FIB table_id for IPv6
55 :param ipv4_table_id: FIB table_id for IPv4
57 self.logger.debug("Configuring interface %s" % (interface.name))
59 self.logger.debug("Configuring IPv6")
60 interface.set_table_ip6(ipv6_table_id)
61 interface.config_ip6()
62 interface.resolve_ndp(timeout=5)
64 self.logger.debug("Configuring IPv4")
65 interface.set_table_ip4(ipv4_table_id)
66 interface.config_ip4()
67 interface.resolve_arp()
70 def setup_interfaces(self, ipv6=[], ipv4=[],
71 ipv6_table_id=[], ipv4_table_id=[]):
72 """ Create and configure interfaces.
74 :param ipv6: list of interface IPv6 capabilities
75 :param ipv4: list of interface IPv4 capabilities
76 :param ipv6_table_id: list of intf IPv6 FIB table_ids
77 :param ipv4_table_id: list of intf IPv4 FIB table_ids
78 :returns: List of created interfaces.
80 # how many interfaces?
85 self.logger.debug("Creating and configuring %d interfaces" % (count))
87 # fill up ipv6 and ipv4 lists if needed
88 # not enabled (False) is the default
90 ipv6 += (count - len(ipv6)) * [False]
92 ipv4 += (count - len(ipv4)) * [False]
94 # fill up table_id lists if needed
95 # table_id 0 (global) is the default
96 if len(ipv6_table_id) < count:
97 ipv6_table_id += (count - len(ipv6_table_id)) * [0]
98 if len(ipv4_table_id) < count:
99 ipv4_table_id += (count - len(ipv4_table_id)) * [0]
101 # create 'count' pg interfaces
102 self.create_pg_interfaces(range(count))
104 # setup all interfaces
105 for i in range(count):
106 intf = self.pg_interfaces[i]
107 self.configure_interface(intf,
109 ipv6_table_id[i], ipv4_table_id[i])
112 self.logger.debug(self.vapi.cli("show ip6 neighbors"))
114 self.logger.debug(self.vapi.cli("show ip arp"))
115 self.logger.debug(self.vapi.cli("show interface"))
116 self.logger.debug(self.vapi.cli("show hardware"))
118 return self.pg_interfaces
120 def teardown_interfaces(self):
121 """ Unconfigure and bring down interface.
123 self.logger.debug("Tearing down interfaces")
124 # tear down all interfaces
125 # AFAIK they cannot be deleted
126 for i in self.pg_interfaces:
127 self.logger.debug("Tear down interface %s" % (i.name))
133 @unittest.skipUnless(0, "PC to fix")
134 def test_SRv6_T_Encaps(self):
135 """ Test SRv6 Transit.Encaps behavior for IPv6.
137 # send traffic to one destination interface
138 # source and destination are IPv6 only
139 self.setup_interfaces(ipv6=[True, True])
141 # configure FIB entries
142 route = VppIpRoute(self, "a4::", 64,
143 [VppRoutePath(self.pg1.remote_ip6,
144 self.pg1.sw_if_index,
145 proto=DpoProto.DPO_PROTO_IP6)],
147 route.add_vpp_config()
149 # configure encaps IPv6 source address
150 # needs to be done before SR Policy config
152 self.vapi.cli("set sr encaps source addr a3::")
155 # configure SRv6 Policy
156 # Note: segment list order: first -> last
157 sr_policy = VppSRv6Policy(
160 sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
161 weight=1, fib_table=0,
162 segments=['a4::', 'a5::', 'a6::c7'],
164 sr_policy.add_vpp_config()
165 self.sr_policy = sr_policy
167 # log the sr policies
168 self.logger.info(self.vapi.cli("show sr policies"))
170 # steer IPv6 traffic to a7::/64 into SRv6 Policy
171 # use the bsid of the above self.sr_policy
172 pol_steering = VppSRv6Steering(
174 bsid=self.sr_policy.bsid,
175 prefix="a7::", mask_width=64,
176 traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
177 sr_policy_index=0, table_id=0,
179 pol_steering.add_vpp_config()
181 # log the sr steering policies
182 self.logger.info(self.vapi.cli("show sr steering policies"))
185 count = len(self.pg_packet_sizes)
186 dst_inner = 'a7::1234'
189 # create IPv6 packets without SRH
190 packet_header = self.create_packet_header_IPv6(dst_inner)
191 # create traffic stream pg0->pg1
192 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
193 self.pg_packet_sizes, count))
195 # create IPv6 packets with SRH
196 # packets with segments-left 1, active segment a7::
197 packet_header = self.create_packet_header_IPv6_SRH(
198 sidlist=['a8::', 'a7::', 'a6::'],
200 # create traffic stream pg0->pg1
201 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
202 self.pg_packet_sizes, count))
204 # create IPv6 packets with SRH and IPv6
205 # packets with segments-left 1, active segment a7::
206 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
208 sidlist=['a8::', 'a7::', 'a6::'],
210 # create traffic stream pg0->pg1
211 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
212 self.pg_packet_sizes, count))
214 # send packets and verify received packets
215 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
216 self.compare_rx_tx_packet_T_Encaps)
218 # log the localsid counters
219 self.logger.info(self.vapi.cli("show sr localsid"))
222 pol_steering.remove_vpp_config()
223 self.logger.info(self.vapi.cli("show sr steering policies"))
226 self.sr_policy.remove_vpp_config()
227 self.logger.info(self.vapi.cli("show sr policies"))
233 self.teardown_interfaces()
235 @unittest.skipUnless(0, "PC to fix")
236 def test_SRv6_T_Insert(self):
237 """ Test SRv6 Transit.Insert behavior (IPv6 only).
239 # send traffic to one destination interface
240 # source and destination are IPv6 only
241 self.setup_interfaces(ipv6=[True, True])
243 # configure FIB entries
244 route = VppIpRoute(self, "a4::", 64,
245 [VppRoutePath(self.pg1.remote_ip6,
246 self.pg1.sw_if_index,
247 proto=DpoProto.DPO_PROTO_IP6)],
249 route.add_vpp_config()
251 # configure encaps IPv6 source address
252 # needs to be done before SR Policy config
254 self.vapi.cli("set sr encaps source addr a3::")
257 # configure SRv6 Policy
258 # Note: segment list order: first -> last
259 sr_policy = VppSRv6Policy(
262 sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
263 weight=1, fib_table=0,
264 segments=['a4::', 'a5::', 'a6::c7'],
266 sr_policy.add_vpp_config()
267 self.sr_policy = sr_policy
269 # log the sr policies
270 self.logger.info(self.vapi.cli("show sr policies"))
272 # steer IPv6 traffic to a7::/64 into SRv6 Policy
273 # use the bsid of the above self.sr_policy
274 pol_steering = VppSRv6Steering(
276 bsid=self.sr_policy.bsid,
277 prefix="a7::", mask_width=64,
278 traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
279 sr_policy_index=0, table_id=0,
281 pol_steering.add_vpp_config()
283 # log the sr steering policies
284 self.logger.info(self.vapi.cli("show sr steering policies"))
287 count = len(self.pg_packet_sizes)
288 dst_inner = 'a7::1234'
291 # create IPv6 packets without SRH
292 packet_header = self.create_packet_header_IPv6(dst_inner)
293 # create traffic stream pg0->pg1
294 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
295 self.pg_packet_sizes, count))
297 # create IPv6 packets with SRH
298 # packets with segments-left 1, active segment a7::
299 packet_header = self.create_packet_header_IPv6_SRH(
300 sidlist=['a8::', 'a7::', 'a6::'],
302 # create traffic stream pg0->pg1
303 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
304 self.pg_packet_sizes, count))
306 # send packets and verify received packets
307 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
308 self.compare_rx_tx_packet_T_Insert)
310 # log the localsid counters
311 self.logger.info(self.vapi.cli("show sr localsid"))
314 pol_steering.remove_vpp_config()
315 self.logger.info(self.vapi.cli("show sr steering policies"))
318 self.sr_policy.remove_vpp_config()
319 self.logger.info(self.vapi.cli("show sr policies"))
325 self.teardown_interfaces()
327 @unittest.skipUnless(0, "PC to fix")
328 def test_SRv6_T_Encaps_IPv4(self):
329 """ Test SRv6 Transit.Encaps behavior for IPv4.
331 # send traffic to one destination interface
332 # source interface is IPv4 only
333 # destination interface is IPv6 only
334 self.setup_interfaces(ipv6=[False, True], ipv4=[True, False])
336 # configure FIB entries
337 route = VppIpRoute(self, "a4::", 64,
338 [VppRoutePath(self.pg1.remote_ip6,
339 self.pg1.sw_if_index,
340 proto=DpoProto.DPO_PROTO_IP6)],
342 route.add_vpp_config()
344 # configure encaps IPv6 source address
345 # needs to be done before SR Policy config
347 self.vapi.cli("set sr encaps source addr a3::")
350 # configure SRv6 Policy
351 # Note: segment list order: first -> last
352 sr_policy = VppSRv6Policy(
355 sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
356 weight=1, fib_table=0,
357 segments=['a4::', 'a5::', 'a6::c7'],
359 sr_policy.add_vpp_config()
360 self.sr_policy = sr_policy
362 # log the sr policies
363 self.logger.info(self.vapi.cli("show sr policies"))
365 # steer IPv4 traffic to 7.1.1.0/24 into SRv6 Policy
366 # use the bsid of the above self.sr_policy
367 pol_steering = VppSRv6Steering(
369 bsid=self.sr_policy.bsid,
370 prefix="7.1.1.0", mask_width=24,
371 traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV4,
372 sr_policy_index=0, table_id=0,
374 pol_steering.add_vpp_config()
376 # log the sr steering policies
377 self.logger.info(self.vapi.cli("show sr steering policies"))
380 count = len(self.pg_packet_sizes)
381 dst_inner = '7.1.1.123'
384 # create IPv4 packets
385 packet_header = self.create_packet_header_IPv4(dst_inner)
386 # create traffic stream pg0->pg1
387 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
388 self.pg_packet_sizes, count))
390 # send packets and verify received packets
391 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
392 self.compare_rx_tx_packet_T_Encaps_IPv4)
394 # log the localsid counters
395 self.logger.info(self.vapi.cli("show sr localsid"))
398 pol_steering.remove_vpp_config()
399 self.logger.info(self.vapi.cli("show sr steering policies"))
402 self.sr_policy.remove_vpp_config()
403 self.logger.info(self.vapi.cli("show sr policies"))
409 self.teardown_interfaces()
411 @unittest.skip("VPP crashes after running this test")
412 def test_SRv6_T_Encaps_L2(self):
413 """ Test SRv6 Transit.Encaps behavior for L2.
415 # send traffic to one destination interface
416 # source interface is IPv4 only TODO?
417 # destination interface is IPv6 only
418 self.setup_interfaces(ipv6=[False, True], ipv4=[False, False])
420 # configure FIB entries
421 route = VppIpRoute(self, "a4::", 64,
422 [VppRoutePath(self.pg1.remote_ip6,
423 self.pg1.sw_if_index,
424 proto=DpoProto.DPO_PROTO_IP6)],
426 route.add_vpp_config()
428 # configure encaps IPv6 source address
429 # needs to be done before SR Policy config
431 self.vapi.cli("set sr encaps source addr a3::")
434 # configure SRv6 Policy
435 # Note: segment list order: first -> last
436 sr_policy = VppSRv6Policy(
439 sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
440 weight=1, fib_table=0,
441 segments=['a4::', 'a5::', 'a6::c7'],
443 sr_policy.add_vpp_config()
444 self.sr_policy = sr_policy
446 # log the sr policies
447 self.logger.info(self.vapi.cli("show sr policies"))
449 # steer L2 traffic into SRv6 Policy
450 # use the bsid of the above self.sr_policy
451 pol_steering = VppSRv6Steering(
453 bsid=self.sr_policy.bsid,
454 prefix="::", mask_width=0,
455 traffic_type=SRv6PolicySteeringTypes.SR_STEER_L2,
456 sr_policy_index=0, table_id=0,
457 sw_if_index=self.pg0.sw_if_index)
458 pol_steering.add_vpp_config()
460 # log the sr steering policies
461 self.logger.info(self.vapi.cli("show sr steering policies"))
464 count = len(self.pg_packet_sizes)
467 # create L2 packets without dot1q header
468 packet_header = self.create_packet_header_L2()
469 # create traffic stream pg0->pg1
470 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
471 self.pg_packet_sizes, count))
473 # create L2 packets with dot1q header
474 packet_header = self.create_packet_header_L2(vlan=123)
475 # create traffic stream pg0->pg1
476 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
477 self.pg_packet_sizes, count))
479 # send packets and verify received packets
480 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
481 self.compare_rx_tx_packet_T_Encaps_L2)
483 # log the localsid counters
484 self.logger.info(self.vapi.cli("show sr localsid"))
487 pol_steering.remove_vpp_config()
488 self.logger.info(self.vapi.cli("show sr steering policies"))
491 self.sr_policy.remove_vpp_config()
492 self.logger.info(self.vapi.cli("show sr policies"))
498 self.teardown_interfaces()
500 def test_SRv6_End(self):
501 """ Test SRv6 End (without PSP) behavior.
503 # send traffic to one destination interface
504 # source and destination interfaces are IPv6 only
505 self.setup_interfaces(ipv6=[True, True])
507 # configure FIB entries
508 route = VppIpRoute(self, "a4::", 64,
509 [VppRoutePath(self.pg1.remote_ip6,
510 self.pg1.sw_if_index,
511 proto=DpoProto.DPO_PROTO_IP6)],
513 route.add_vpp_config()
515 # configure SRv6 localSID End without PSP behavior
516 localsid = VppSRv6LocalSID(
517 self, localsid={'addr': 'A3::0'},
518 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
525 localsid.add_vpp_config()
527 self.logger.debug(self.vapi.cli("show sr localsid"))
529 # create IPv6 packets with SRH (SL=2, SL=1, SL=0)
530 # send one packet per SL value per packet size
531 # SL=0 packet with localSID End with USP needs 2nd SRH
532 count = len(self.pg_packet_sizes)
533 dst_inner = 'a4::1234'
536 # packets with segments-left 2, active segment a3::
537 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
539 sidlist=['a5::', 'a4::', 'a3::'],
541 # create traffic stream pg0->pg1
542 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
543 self.pg_packet_sizes, count))
545 # packets with segments-left 1, active segment a3::
546 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
548 sidlist=['a4::', 'a3::', 'a2::'],
550 # add to traffic stream pg0->pg1
551 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
552 self.pg_packet_sizes, count))
554 # TODO: test behavior with SL=0 packet (needs 2*SRH?)
556 # send packets and verify received packets
557 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
558 self.compare_rx_tx_packet_End)
560 # log the localsid counters
561 self.logger.info(self.vapi.cli("show sr localsid"))
563 # remove SRv6 localSIDs
564 localsid.remove_vpp_config()
570 self.teardown_interfaces()
572 def test_SRv6_End_with_PSP(self):
573 """ Test SRv6 End with PSP behavior.
575 # send traffic to one destination interface
576 # source and destination interfaces are IPv6 only
577 self.setup_interfaces(ipv6=[True, True])
579 # configure FIB entries
580 route = VppIpRoute(self, "a4::", 64,
581 [VppRoutePath(self.pg1.remote_ip6,
582 self.pg1.sw_if_index,
583 proto=DpoProto.DPO_PROTO_IP6)],
585 route.add_vpp_config()
587 # configure SRv6 localSID End with PSP behavior
588 localsid = VppSRv6LocalSID(
589 self, localsid={'addr': 'A3::0'},
590 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
597 localsid.add_vpp_config()
599 self.logger.debug(self.vapi.cli("show sr localsid"))
601 # create IPv6 packets with SRH (SL=2, SL=1)
602 # send one packet per SL value per packet size
603 # SL=0 packet with localSID End with PSP is dropped
604 count = len(self.pg_packet_sizes)
605 dst_inner = 'a4::1234'
608 # packets with segments-left 2, active segment a3::
609 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
611 sidlist=['a5::', 'a4::', 'a3::'],
613 # create traffic stream pg0->pg1
614 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
615 self.pg_packet_sizes, count))
617 # packets with segments-left 1, active segment a3::
618 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
620 sidlist=['a4::', 'a3::', 'a2::'],
622 # add to traffic stream pg0->pg1
623 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
624 self.pg_packet_sizes, count))
626 # send packets and verify received packets
627 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
628 self.compare_rx_tx_packet_End_PSP)
630 # log the localsid counters
631 self.logger.info(self.vapi.cli("show sr localsid"))
633 # remove SRv6 localSIDs
634 localsid.remove_vpp_config()
640 self.teardown_interfaces()
642 def test_SRv6_End_X(self):
643 """ Test SRv6 End.X (without PSP) behavior.
645 # create three interfaces (1 source, 2 destinations)
646 # source and destination interfaces are IPv6 only
647 self.setup_interfaces(ipv6=[True, True, True])
649 # configure FIB entries
650 # a4::/64 via pg1 and pg2
651 route = VppIpRoute(self, "a4::", 64,
652 [VppRoutePath(self.pg1.remote_ip6,
653 self.pg1.sw_if_index,
654 proto=DpoProto.DPO_PROTO_IP6),
655 VppRoutePath(self.pg2.remote_ip6,
656 self.pg2.sw_if_index,
657 proto=DpoProto.DPO_PROTO_IP6)],
659 route.add_vpp_config()
660 self.logger.debug(self.vapi.cli("show ip6 fib"))
662 # configure SRv6 localSID End.X without PSP behavior
663 # End.X points to interface pg1
664 localsid = VppSRv6LocalSID(
665 self, localsid={'addr': 'A3::C4'},
666 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
668 nh_addr6=self.pg1.remote_ip6,
670 sw_if_index=self.pg1.sw_if_index,
673 localsid.add_vpp_config()
675 self.logger.debug(self.vapi.cli("show sr localsid"))
677 # create IPv6 packets with SRH (SL=2, SL=1)
678 # send one packet per SL value per packet size
679 # SL=0 packet with localSID End with PSP is dropped
680 count = len(self.pg_packet_sizes)
681 dst_inner = 'a4::1234'
684 # packets with segments-left 2, active segment a3::c4
685 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
687 sidlist=['a5::', 'a4::', 'a3::c4'],
689 # create traffic stream pg0->pg1
690 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
691 self.pg_packet_sizes, count))
693 # packets with segments-left 1, active segment a3::c4
694 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
696 sidlist=['a4::', 'a3::c4', 'a2::'],
698 # add to traffic stream pg0->pg1
699 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
700 self.pg_packet_sizes, count))
702 # send packets and verify received packets
703 # using same comparison function as End (no PSP)
704 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
705 self.compare_rx_tx_packet_End)
707 # assert nothing was received on the other interface (pg2)
708 self.pg2.assert_nothing_captured("mis-directed packet(s)")
710 # log the localsid counters
711 self.logger.info(self.vapi.cli("show sr localsid"))
713 # remove SRv6 localSIDs
714 localsid.remove_vpp_config()
720 self.teardown_interfaces()
722 def test_SRv6_End_X_with_PSP(self):
723 """ Test SRv6 End.X with PSP behavior.
725 # create three interfaces (1 source, 2 destinations)
726 # source and destination interfaces are IPv6 only
727 self.setup_interfaces(ipv6=[True, True, True])
729 # configure FIB entries
730 # a4::/64 via pg1 and pg2
731 route = VppIpRoute(self, "a4::", 64,
732 [VppRoutePath(self.pg1.remote_ip6,
733 self.pg1.sw_if_index,
734 proto=DpoProto.DPO_PROTO_IP6),
735 VppRoutePath(self.pg2.remote_ip6,
736 self.pg2.sw_if_index,
737 proto=DpoProto.DPO_PROTO_IP6)],
739 route.add_vpp_config()
741 # configure SRv6 localSID End with PSP behavior
742 localsid = VppSRv6LocalSID(
743 self, localsid={'addr': 'A3::C4'},
744 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
746 nh_addr6=self.pg1.remote_ip6,
748 sw_if_index=self.pg1.sw_if_index,
751 localsid.add_vpp_config()
753 self.logger.debug(self.vapi.cli("show sr localsid"))
755 # create IPv6 packets with SRH (SL=2, SL=1)
756 # send one packet per SL value per packet size
757 # SL=0 packet with localSID End with PSP is dropped
758 count = len(self.pg_packet_sizes)
759 dst_inner = 'a4::1234'
762 # packets with segments-left 2, active segment a3::
763 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
765 sidlist=['a5::', 'a4::', 'a3::c4'],
767 # create traffic stream pg0->pg1
768 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
769 self.pg_packet_sizes, count))
771 # packets with segments-left 1, active segment a3::
772 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
774 sidlist=['a4::', 'a3::c4', 'a2::'],
776 # add to traffic stream pg0->pg1
777 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
778 self.pg_packet_sizes, count))
780 # send packets and verify received packets
781 # using same comparison function as End with PSP
782 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
783 self.compare_rx_tx_packet_End_PSP)
785 # assert nothing was received on the other interface (pg2)
786 self.pg2.assert_nothing_captured("mis-directed packet(s)")
788 # log the localsid counters
789 self.logger.info(self.vapi.cli("show sr localsid"))
791 # remove SRv6 localSIDs
792 localsid.remove_vpp_config()
798 self.teardown_interfaces()
800 def test_SRv6_End_DX6(self):
801 """ Test SRv6 End.DX6 behavior.
803 # send traffic to one destination interface
804 # source and destination interfaces are IPv6 only
805 self.setup_interfaces(ipv6=[True, True])
807 # configure SRv6 localSID End.DX6 behavior
808 localsid = VppSRv6LocalSID(
809 self, localsid={'addr': 'A3::C4'},
810 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX6,
812 nh_addr6=self.pg1.remote_ip6,
814 sw_if_index=self.pg1.sw_if_index,
817 localsid.add_vpp_config()
819 self.logger.debug(self.vapi.cli("show sr localsid"))
821 # create IPv6 packets with SRH (SL=0)
822 # send one packet per packet size
823 count = len(self.pg_packet_sizes)
824 dst_inner = 'a4::1234' # inner header destination address
827 # packets with SRH, segments-left 0, active segment a3::c4
828 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
830 sidlist=['a3::c4', 'a2::', 'a1::'],
832 # add to traffic stream pg0->pg1
833 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
834 self.pg_packet_sizes, count))
836 # packets without SRH, IPv6 in IPv6
837 # outer IPv6 dest addr is the localsid End.DX6
838 packet_header = self.create_packet_header_IPv6_IPv6(
841 # add to traffic stream pg0->pg1
842 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
843 self.pg_packet_sizes, count))
845 # send packets and verify received packets
846 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
847 self.compare_rx_tx_packet_End_DX6)
849 # log the localsid counters
850 self.logger.info(self.vapi.cli("show sr localsid"))
852 # remove SRv6 localSIDs
853 localsid.remove_vpp_config()
856 self.teardown_interfaces()
858 def test_SRv6_End_DT6(self):
859 """ Test SRv6 End.DT6 behavior.
861 # create three interfaces (1 source, 2 destinations)
862 # all interfaces are IPv6 only
863 # source interface in global FIB (0)
864 # destination interfaces in global and vrf
866 ipt = VppIpTable(self, vrf_1, is_ip6=True)
868 self.setup_interfaces(ipv6=[True, True, True],
869 ipv6_table_id=[0, 0, vrf_1])
871 # configure FIB entries
872 # a4::/64 is reachable
873 # via pg1 in table 0 (global)
874 # and via pg2 in table vrf_1
875 route0 = VppIpRoute(self, "a4::", 64,
876 [VppRoutePath(self.pg1.remote_ip6,
877 self.pg1.sw_if_index,
878 proto=DpoProto.DPO_PROTO_IP6,
882 route0.add_vpp_config()
883 route1 = VppIpRoute(self, "a4::", 64,
884 [VppRoutePath(self.pg2.remote_ip6,
885 self.pg2.sw_if_index,
886 proto=DpoProto.DPO_PROTO_IP6,
890 route1.add_vpp_config()
891 self.logger.debug(self.vapi.cli("show ip6 fib"))
893 # configure SRv6 localSID End.DT6 behavior
895 # fib_table: where the localsid is installed
896 # sw_if_index: in T-variants of localsid this is the vrf table_id
897 localsid = VppSRv6LocalSID(
898 self, localsid={'addr': 'A3::C4'},
899 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT6,
906 localsid.add_vpp_config()
908 self.logger.debug(self.vapi.cli("show sr localsid"))
910 # create IPv6 packets with SRH (SL=0)
911 # send one packet per packet size
912 count = len(self.pg_packet_sizes)
913 dst_inner = 'a4::1234' # inner header destination address
916 # packets with SRH, segments-left 0, active segment a3::c4
917 packet_header = self.create_packet_header_IPv6_SRH_IPv6(
919 sidlist=['a3::c4', 'a2::', 'a1::'],
921 # add to traffic stream pg0->pg1
922 pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
923 self.pg_packet_sizes, count))
925 # packets without SRH, IPv6 in IPv6
926 # outer IPv6 dest addr is the localsid End.DT6
927 packet_header = self.create_packet_header_IPv6_IPv6(
930 # add to traffic stream pg0->pg1
931 pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
932 self.pg_packet_sizes, count))
934 # send packets and verify received packets
935 # using same comparison function as End.DX6
936 self.send_and_verify_pkts(self.pg0, pkts, self.pg2,
937 self.compare_rx_tx_packet_End_DX6)
939 # assert nothing was received on the other interface (pg2)
940 self.pg1.assert_nothing_captured("mis-directed packet(s)")
942 # log the localsid counters
943 self.logger.info(self.vapi.cli("show sr localsid"))
945 # remove SRv6 localSIDs
946 localsid.remove_vpp_config()
952 self.teardown_interfaces()
954 def test_SRv6_End_DX4(self):
955 """ Test SRv6 End.DX4 behavior.
957 # send traffic to one destination interface
958 # source interface is IPv6 only
959 # destination interface is IPv4 only
960 self.setup_interfaces(ipv6=[True, False], ipv4=[False, True])
962 # configure SRv6 localSID End.DX4 behavior
963 localsid = VppSRv6LocalSID(
964 self, localsid={'addr': 'A3::C4'},
965 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX4,
966 nh_addr4=self.pg1.remote_ip4,
969 sw_if_index=self.pg1.sw_if_index,
972 localsid.add_vpp_config()
974 self.logger.debug(self.vapi.cli("show sr localsid"))
976 # send one packet per packet size
977 count = len(self.pg_packet_sizes)
978 dst_inner = '4.1.1.123' # inner header destination address
981 # packets with SRH, segments-left 0, active segment a3::c4
982 packet_header = self.create_packet_header_IPv6_SRH_IPv4(
984 sidlist=['a3::c4', 'a2::', 'a1::'],
986 # add to traffic stream pg0->pg1
987 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
988 self.pg_packet_sizes, count))
990 # packets without SRH, IPv4 in IPv6
991 # outer IPv6 dest addr is the localsid End.DX4
992 packet_header = self.create_packet_header_IPv6_IPv4(
995 # add to traffic stream pg0->pg1
996 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
997 self.pg_packet_sizes, count))
999 # send packets and verify received packets
1000 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
1001 self.compare_rx_tx_packet_End_DX4)
1003 # log the localsid counters
1004 self.logger.info(self.vapi.cli("show sr localsid"))
1006 # remove SRv6 localSIDs
1007 localsid.remove_vpp_config()
1009 # cleanup interfaces
1010 self.teardown_interfaces()
1012 def test_SRv6_End_DT4(self):
1013 """ Test SRv6 End.DT4 behavior.
1015 # create three interfaces (1 source, 2 destinations)
1016 # source interface is IPv6-only
1017 # destination interfaces are IPv4 only
1018 # source interface in global FIB (0)
1019 # destination interfaces in global and vrf
1021 ipt = VppIpTable(self, vrf_1)
1022 ipt.add_vpp_config()
1023 self.setup_interfaces(ipv6=[True, False, False],
1024 ipv4=[False, True, True],
1025 ipv6_table_id=[0, 0, 0],
1026 ipv4_table_id=[0, 0, vrf_1])
1028 # configure FIB entries
1029 # 4.1.1.0/24 is reachable
1030 # via pg1 in table 0 (global)
1031 # and via pg2 in table vrf_1
1032 route0 = VppIpRoute(self, "4.1.1.0", 24,
1033 [VppRoutePath(self.pg1.remote_ip4,
1034 self.pg1.sw_if_index,
1038 route0.add_vpp_config()
1039 route1 = VppIpRoute(self, "4.1.1.0", 24,
1040 [VppRoutePath(self.pg2.remote_ip4,
1041 self.pg2.sw_if_index,
1042 nh_table_id=vrf_1)],
1045 route1.add_vpp_config()
1046 self.logger.debug(self.vapi.cli("show ip fib"))
1048 # configure SRv6 localSID End.DT6 behavior
1050 # fib_table: where the localsid is installed
1051 # sw_if_index: in T-variants of localsid: vrf table_id
1052 localsid = VppSRv6LocalSID(
1053 self, localsid={'addr': 'A3::C4'},
1054 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT4,
1061 localsid.add_vpp_config()
1063 self.logger.debug(self.vapi.cli("show sr localsid"))
1065 # create IPv6 packets with SRH (SL=0)
1066 # send one packet per packet size
1067 count = len(self.pg_packet_sizes)
1068 dst_inner = '4.1.1.123' # inner header destination address
1071 # packets with SRH, segments-left 0, active segment a3::c4
1072 packet_header = self.create_packet_header_IPv6_SRH_IPv4(
1074 sidlist=['a3::c4', 'a2::', 'a1::'],
1076 # add to traffic stream pg0->pg1
1077 pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
1078 self.pg_packet_sizes, count))
1080 # packets without SRH, IPv6 in IPv6
1081 # outer IPv6 dest addr is the localsid End.DX4
1082 packet_header = self.create_packet_header_IPv6_IPv4(
1085 # add to traffic stream pg0->pg1
1086 pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
1087 self.pg_packet_sizes, count))
1089 # send packets and verify received packets
1090 # using same comparison function as End.DX4
1091 self.send_and_verify_pkts(self.pg0, pkts, self.pg2,
1092 self.compare_rx_tx_packet_End_DX4)
1094 # assert nothing was received on the other interface (pg2)
1095 self.pg1.assert_nothing_captured("mis-directed packet(s)")
1097 # log the localsid counters
1098 self.logger.info(self.vapi.cli("show sr localsid"))
1100 # remove SRv6 localSIDs
1101 localsid.remove_vpp_config()
1103 # remove FIB entries
1106 # cleanup interfaces
1107 self.teardown_interfaces()
1109 def test_SRv6_End_DX2(self):
1110 """ Test SRv6 End.DX2 behavior.
1112 # send traffic to one destination interface
1113 # source interface is IPv6 only
1114 self.setup_interfaces(ipv6=[True, False], ipv4=[False, False])
1116 # configure SRv6 localSID End.DX2 behavior
1117 localsid = VppSRv6LocalSID(
1118 self, localsid={'addr': 'A3::C4'},
1119 behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX2,
1123 sw_if_index=self.pg1.sw_if_index,
1126 localsid.add_vpp_config()
1128 self.logger.debug(self.vapi.cli("show sr localsid"))
1130 # send one packet per packet size
1131 count = len(self.pg_packet_sizes)
1134 # packets with SRH, segments-left 0, active segment a3::c4
1135 # L2 has no dot1q header
1136 packet_header = self.create_packet_header_IPv6_SRH_L2(
1137 sidlist=['a3::c4', 'a2::', 'a1::'],
1140 # add to traffic stream pg0->pg1
1141 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1142 self.pg_packet_sizes, count))
1144 # packets with SRH, segments-left 0, active segment a3::c4
1145 # L2 has dot1q header
1146 packet_header = self.create_packet_header_IPv6_SRH_L2(
1147 sidlist=['a3::c4', 'a2::', 'a1::'],
1150 # add to traffic stream pg0->pg1
1151 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1152 self.pg_packet_sizes, count))
1154 # packets without SRH, L2 in IPv6
1155 # outer IPv6 dest addr is the localsid End.DX2
1156 # L2 has no dot1q header
1157 packet_header = self.create_packet_header_IPv6_L2(
1160 # add to traffic stream pg0->pg1
1161 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1162 self.pg_packet_sizes, count))
1164 # packets without SRH, L2 in IPv6
1165 # outer IPv6 dest addr is the localsid End.DX2
1166 # L2 has dot1q header
1167 packet_header = self.create_packet_header_IPv6_L2(
1170 # add to traffic stream pg0->pg1
1171 pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1172 self.pg_packet_sizes, count))
1174 # send packets and verify received packets
1175 self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
1176 self.compare_rx_tx_packet_End_DX2)
1178 # log the localsid counters
1179 self.logger.info(self.vapi.cli("show sr localsid"))
1181 # remove SRv6 localSIDs
1182 localsid.remove_vpp_config()
1184 # cleanup interfaces
1185 self.teardown_interfaces()
1187 @unittest.skipUnless(0, "PC to fix")
1188 def test_SRv6_T_Insert_Classifier(self):
1189 """ Test SRv6 Transit.Insert behavior (IPv6 only).
1190 steer packets using the classifier
1192 # send traffic to one destination interface
1193 # source and destination are IPv6 only
1194 self.setup_interfaces(ipv6=[False, False, False, True, True])
1196 # configure FIB entries
1197 route = VppIpRoute(self, "a4::", 64,
1198 [VppRoutePath(self.pg4.remote_ip6,
1199 self.pg4.sw_if_index,
1200 proto=DpoProto.DPO_PROTO_IP6)],
1202 route.add_vpp_config()
1204 # configure encaps IPv6 source address
1205 # needs to be done before SR Policy config
1207 self.vapi.cli("set sr encaps source addr a3::")
1210 # configure SRv6 Policy
1211 # Note: segment list order: first -> last
1212 sr_policy = VppSRv6Policy(
1215 sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
1216 weight=1, fib_table=0,
1217 segments=['a4::', 'a5::', 'a6::c7'],
1219 sr_policy.add_vpp_config()
1220 self.sr_policy = sr_policy
1222 # log the sr policies
1223 self.logger.info(self.vapi.cli("show sr policies"))
1225 # add classify table
1226 # mask on dst ip address prefix a7::/8
1227 mask = '{:0<16}'.format('ff')
1228 r = self.vapi.classify_add_del_table(
1230 binascii.unhexlify(mask),
1231 match_n_vectors=(len(mask) - 1) // 32 + 1,
1232 current_data_flag=1,
1233 skip_n_vectors=2) # data offset
1234 self.assertIsNotNone(r, msg='No response msg for add_del_table')
1235 table_index = r.new_table_index
1237 # add the source routign node as a ip6 inacl netxt node
1238 r = self.vapi.add_node_next('ip6-inacl',
1239 'sr-pl-rewrite-insert')
1240 inacl_next_node_index = r.node_index
1242 match = '{:0<16}'.format('a7')
1243 r = self.vapi.classify_add_del_session(
1246 binascii.unhexlify(match),
1247 hit_next_index=inacl_next_node_index,
1249 metadata=0) # sr policy index
1250 self.assertIsNotNone(r, msg='No response msg for add_del_session')
1252 # log the classify table used in the steering policy
1253 self.logger.info(self.vapi.cli("show classify table"))
1255 r = self.vapi.input_acl_set_interface(
1257 sw_if_index=self.pg3.sw_if_index,
1258 ip6_table_index=table_index)
1259 self.assertIsNotNone(r,
1260 msg='No response msg for input_acl_set_interface')
1263 self.logger.info(self.vapi.cli("show inacl type ip6"))
1266 count = len(self.pg_packet_sizes)
1267 dst_inner = 'a7::1234'
1270 # create IPv6 packets without SRH
1271 packet_header = self.create_packet_header_IPv6(dst_inner)
1272 # create traffic stream pg3->pg4
1273 pkts.extend(self.create_stream(self.pg3, self.pg4, packet_header,
1274 self.pg_packet_sizes, count))
1276 # create IPv6 packets with SRH
1277 # packets with segments-left 1, active segment a7::
1278 packet_header = self.create_packet_header_IPv6_SRH(
1279 sidlist=['a8::', 'a7::', 'a6::'],
1281 # create traffic stream pg3->pg4
1282 pkts.extend(self.create_stream(self.pg3, self.pg4, packet_header,
1283 self.pg_packet_sizes, count))
1285 # send packets and verify received packets
1286 self.send_and_verify_pkts(self.pg3, pkts, self.pg4,
1287 self.compare_rx_tx_packet_T_Insert)
1289 # remove the interface l2 input feature
1290 r = self.vapi.input_acl_set_interface(
1292 sw_if_index=self.pg3.sw_if_index,
1293 ip6_table_index=table_index)
1294 self.assertIsNotNone(r,
1295 msg='No response msg for input_acl_set_interface')
1297 # log the ip6 inacl after cleaning
1298 self.logger.info(self.vapi.cli("show inacl type ip6"))
1300 # log the localsid counters
1301 self.logger.info(self.vapi.cli("show sr localsid"))
1303 # remove classifier SR steering
1304 # classifier_steering.remove_vpp_config()
1305 self.logger.info(self.vapi.cli("show sr steering policies"))
1307 # remove SR Policies
1308 self.sr_policy.remove_vpp_config()
1309 self.logger.info(self.vapi.cli("show sr policies"))
1311 # remove classify session and table
1312 r = self.vapi.classify_add_del_session(
1315 binascii.unhexlify(match))
1316 self.assertIsNotNone(r, msg='No response msg for add_del_session')
1318 r = self.vapi.classify_add_del_table(
1320 binascii.unhexlify(mask),
1321 table_index=table_index)
1322 self.assertIsNotNone(r, msg='No response msg for add_del_table')
1324 self.logger.info(self.vapi.cli("show classify table"))
1326 # remove FIB entries
1329 # cleanup interfaces
1330 self.teardown_interfaces()
1332 def compare_rx_tx_packet_T_Encaps(self, tx_pkt, rx_pkt):
1333 """ Compare input and output packet after passing T.Encaps
1335 :param tx_pkt: transmitted packet
1336 :param rx_pkt: received packet
1338 # T.Encaps updates the headers as follows:
1339 # SR Policy seglist (S3, S2, S1)
1340 # SR Policy source C
1343 # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(A, B2)
1345 # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1)
1346 # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(a, B2)SRH(B3, B2, B1; SL=1)
1348 # get first (outer) IPv6 header of rx'ed packet
1349 rx_ip = rx_pkt.getlayer(IPv6)
1352 tx_ip = tx_pkt.getlayer(IPv6)
1354 # expected segment-list
1355 seglist = self.sr_policy.segments
1356 # reverse list to get order as in SRH
1357 tx_seglist = seglist[::-1]
1359 # get source address of SR Policy
1360 sr_policy_source = self.sr_policy.source
1362 # rx'ed packet should have SRH
1363 self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1365 rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1367 # received ip.src should be equal to SR Policy source
1368 self.assertEqual(rx_ip.src, sr_policy_source)
1369 # received ip.dst should be equal to expected sidlist[lastentry]
1370 self.assertEqual(rx_ip.dst, tx_seglist[-1])
1371 # rx'ed seglist should be equal to expected seglist
1372 self.assertEqual(rx_srh.addresses, tx_seglist)
1373 # segleft should be equal to size expected seglist-1
1374 self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1375 # segleft should be equal to lastentry
1376 self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1378 # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1379 # except for the hop-limit field
1380 # -> update tx'ed hlim to the expected hlim
1381 tx_ip.hlim = tx_ip.hlim - 1
1383 self.assertEqual(rx_srh.payload, tx_ip)
1385 self.logger.debug("packet verification: SUCCESS")
1387 def compare_rx_tx_packet_T_Encaps_IPv4(self, tx_pkt, rx_pkt):
1388 """ Compare input and output packet after passing T.Encaps for IPv4
1390 :param tx_pkt: transmitted packet
1391 :param rx_pkt: received packet
1393 # T.Encaps for IPv4 updates the headers as follows:
1394 # SR Policy seglist (S3, S2, S1)
1395 # SR Policy source C
1398 # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv4(A, B2)
1400 # get first (outer) IPv6 header of rx'ed packet
1401 rx_ip = rx_pkt.getlayer(IPv6)
1404 tx_ip = tx_pkt.getlayer(IP)
1406 # expected segment-list
1407 seglist = self.sr_policy.segments
1408 # reverse list to get order as in SRH
1409 tx_seglist = seglist[::-1]
1411 # get source address of SR Policy
1412 sr_policy_source = self.sr_policy.source
1414 # checks common to cases tx with and without SRH
1415 # rx'ed packet should have SRH and IPv4 header
1416 self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1417 self.assertTrue(rx_ip.payload.haslayer(IP))
1419 rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1421 # received ip.src should be equal to SR Policy source
1422 self.assertEqual(rx_ip.src, sr_policy_source)
1423 # received ip.dst should be equal to sidlist[lastentry]
1424 self.assertEqual(rx_ip.dst, tx_seglist[-1])
1425 # rx'ed seglist should be equal to seglist
1426 self.assertEqual(rx_srh.addresses, tx_seglist)
1427 # segleft should be equal to size seglist-1
1428 self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1429 # segleft should be equal to lastentry
1430 self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1432 # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1433 # except for the ttl field and ip checksum
1434 # -> adjust tx'ed ttl to expected ttl
1435 tx_ip.ttl = tx_ip.ttl - 1
1436 # -> set tx'ed ip checksum to None and let scapy recompute
1438 # read back the pkt (with str()) to force computing these fields
1439 # probably other ways to accomplish this are possible
1440 tx_ip = IP(str(tx_ip))
1442 self.assertEqual(rx_srh.payload, tx_ip)
1444 self.logger.debug("packet verification: SUCCESS")
1446 def compare_rx_tx_packet_T_Encaps_L2(self, tx_pkt, rx_pkt):
1447 """ Compare input and output packet after passing T.Encaps for L2
1449 :param tx_pkt: transmitted packet
1450 :param rx_pkt: received packet
1452 # T.Encaps for L2 updates the headers as follows:
1453 # SR Policy seglist (S3, S2, S1)
1454 # SR Policy source C
1457 # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)L2
1459 # get first (outer) IPv6 header of rx'ed packet
1460 rx_ip = rx_pkt.getlayer(IPv6)
1463 tx_ether = tx_pkt.getlayer(Ether)
1465 # expected segment-list
1466 seglist = self.sr_policy.segments
1467 # reverse list to get order as in SRH
1468 tx_seglist = seglist[::-1]
1470 # get source address of SR Policy
1471 sr_policy_source = self.sr_policy.source
1473 # rx'ed packet should have SRH
1474 self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1476 rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1478 # received ip.src should be equal to SR Policy source
1479 self.assertEqual(rx_ip.src, sr_policy_source)
1480 # received ip.dst should be equal to sidlist[lastentry]
1481 self.assertEqual(rx_ip.dst, tx_seglist[-1])
1482 # rx'ed seglist should be equal to seglist
1483 self.assertEqual(rx_srh.addresses, tx_seglist)
1484 # segleft should be equal to size seglist-1
1485 self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1486 # segleft should be equal to lastentry
1487 self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1488 # nh should be "No Next Header" (59)
1489 self.assertEqual(rx_srh.nh, 59)
1491 # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1492 self.assertEqual(Ether(str(rx_srh.payload)), tx_ether)
1494 self.logger.debug("packet verification: SUCCESS")
1496 def compare_rx_tx_packet_T_Insert(self, tx_pkt, rx_pkt):
1497 """ Compare input and output packet after passing T.Insert
1499 :param tx_pkt: transmitted packet
1500 :param rx_pkt: received packet
1502 # T.Insert updates the headers as follows:
1505 # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)
1507 # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1)
1508 # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)SRH(B3, B2, B1; SL=1)
1510 # get first (outer) IPv6 header of rx'ed packet
1511 rx_ip = rx_pkt.getlayer(IPv6)
1516 rx_udp = rx_pkt[UDP]
1518 tx_ip = tx_pkt.getlayer(IPv6)
1521 # some packets have been tx'ed with an SRH, some without it
1522 # get SRH if tx'ed packet has it
1523 if tx_pkt.haslayer(IPv6ExtHdrSegmentRouting):
1524 tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1525 tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1526 tx_udp = tx_pkt[UDP]
1528 # expected segment-list (make copy of SR Policy segment list)
1529 seglist = self.sr_policy.segments[:]
1530 # expected seglist has initial dest addr as last segment
1531 seglist.append(tx_ip.dst)
1532 # reverse list to get order as in SRH
1533 tx_seglist = seglist[::-1]
1535 # get source address of SR Policy
1536 sr_policy_source = self.sr_policy.source
1538 # checks common to cases tx with and without SRH
1539 # rx'ed packet should have SRH and only one IPv6 header
1540 self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1541 self.assertFalse(rx_ip.payload.haslayer(IPv6))
1543 rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1545 # rx'ed ip.src should be equal to tx'ed ip.src
1546 self.assertEqual(rx_ip.src, tx_ip.src)
1547 # rx'ed ip.dst should be equal to sidlist[lastentry]
1548 self.assertEqual(rx_ip.dst, tx_seglist[-1])
1550 # rx'ed seglist should be equal to expected seglist
1551 self.assertEqual(rx_srh.addresses, tx_seglist)
1552 # segleft should be equal to size(expected seglist)-1
1553 self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1554 # segleft should be equal to lastentry
1555 self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1557 if tx_srh: # packet was tx'ed with SRH
1558 # packet should have 2nd SRH
1559 self.assertTrue(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
1561 rx_srh2 = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting, 2)
1563 # rx'ed srh2.addresses should be equal to tx'ed srh.addresses
1564 self.assertEqual(rx_srh2.addresses, tx_srh.addresses)
1565 # rx'ed srh2.segleft should be equal to tx'ed srh.segleft
1566 self.assertEqual(rx_srh2.segleft, tx_srh.segleft)
1567 # rx'ed srh2.lastentry should be equal to tx'ed srh.lastentry
1568 self.assertEqual(rx_srh2.lastentry, tx_srh.lastentry)
1570 else: # packet was tx'ed without SRH
1571 # rx packet should have no other SRH
1572 self.assertFalse(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
1574 # UDP layer should be unchanged
1575 self.assertEqual(rx_udp, tx_udp)
1577 self.logger.debug("packet verification: SUCCESS")
1579 def compare_rx_tx_packet_End(self, tx_pkt, rx_pkt):
1580 """ Compare input and output packet after passing End (without PSP)
1582 :param tx_pkt: transmitted packet
1583 :param rx_pkt: received packet
1585 # End (no PSP) updates the headers as follows:
1587 # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2)
1588 # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1590 # get first (outer) IPv6 header of rx'ed packet
1591 rx_ip = rx_pkt.getlayer(IPv6)
1594 rx_udp = rx_pkt[UDP]
1596 tx_ip = tx_pkt.getlayer(IPv6)
1597 # we know the packet has been tx'ed
1598 # with an inner IPv6 header and an SRH
1599 tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1600 tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1601 tx_udp = tx_pkt[UDP]
1603 # common checks, regardless of tx segleft value
1604 # rx'ed packet should have 2nd IPv6 header
1605 self.assertTrue(rx_ip.payload.haslayer(IPv6))
1606 # get second (inner) IPv6 header
1607 rx_ip2 = rx_pkt.getlayer(IPv6, 2)
1609 if tx_ip.segleft > 0:
1610 # SRH should NOT have been popped:
1611 # End SID without PSP does not pop SRH if segleft>0
1612 self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1613 rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1615 # received ip.src should be equal to expected ip.src
1616 self.assertEqual(rx_ip.src, tx_ip.src)
1617 # sidlist should be unchanged
1618 self.assertEqual(rx_srh.addresses, tx_srh.addresses)
1619 # segleft should have been decremented
1620 self.assertEqual(rx_srh.segleft, tx_srh.segleft-1)
1621 # received ip.dst should be equal to sidlist[segleft]
1622 self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
1623 # lastentry should be unchanged
1624 self.assertEqual(rx_srh.lastentry, tx_srh.lastentry)
1625 # inner IPv6 packet (ip2) should be unchanged
1626 self.assertEqual(rx_ip2.src, tx_ip2.src)
1627 self.assertEqual(rx_ip2.dst, tx_ip2.dst)
1628 # else: # tx_ip.segleft == 0
1629 # TODO: Does this work with 2 SRHs in ingress packet?
1631 # UDP layer should be unchanged
1632 self.assertEqual(rx_udp, tx_udp)
1634 self.logger.debug("packet verification: SUCCESS")
1636 def compare_rx_tx_packet_End_PSP(self, tx_pkt, rx_pkt):
1637 """ Compare input and output packet after passing End with PSP
1639 :param tx_pkt: transmitted packet
1640 :param rx_pkt: received packet
1642 # End (PSP) updates the headers as follows:
1643 # IPv6 + SRH (SL>1):
1644 # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2)
1645 # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1646 # IPv6 + SRH (SL=1):
1647 # in: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1650 # get first (outer) IPv6 header of rx'ed packet
1651 rx_ip = rx_pkt.getlayer(IPv6)
1654 rx_udp = rx_pkt[UDP]
1656 tx_ip = tx_pkt.getlayer(IPv6)
1657 # we know the packet has been tx'ed
1658 # with an inner IPv6 header and an SRH
1659 tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1660 tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1661 tx_udp = tx_pkt[UDP]
1663 # common checks, regardless of tx segleft value
1664 self.assertTrue(rx_ip.payload.haslayer(IPv6))
1665 rx_ip2 = rx_pkt.getlayer(IPv6, 2)
1666 # inner IPv6 packet (ip2) should be unchanged
1667 self.assertEqual(rx_ip2.src, tx_ip2.src)
1668 self.assertEqual(rx_ip2.dst, tx_ip2.dst)
1670 if tx_ip.segleft > 1:
1671 # SRH should NOT have been popped:
1672 # End SID with PSP does not pop SRH if segleft>1
1673 # rx'ed packet should have SRH
1674 self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1675 rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1677 # received ip.src should be equal to expected ip.src
1678 self.assertEqual(rx_ip.src, tx_ip.src)
1679 # sidlist should be unchanged
1680 self.assertEqual(rx_srh.addresses, tx_srh.addresses)
1681 # segleft should have been decremented
1682 self.assertEqual(rx_srh.segleft, tx_srh.segleft-1)
1683 # received ip.dst should be equal to sidlist[segleft]
1684 self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
1685 # lastentry should be unchanged
1686 self.assertEqual(rx_srh.lastentry, tx_srh.lastentry)
1688 else: # tx_ip.segleft <= 1
1689 # SRH should have been popped:
1690 # End SID with PSP and segleft=1 pops SRH
1691 # the two IPv6 headers are still present
1692 # outer IPv6 header has DA == last segment of popped SRH
1693 # SRH should not be present
1694 self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1695 # outer IPv6 header ip.src should be equal to tx'ed ip.src
1696 self.assertEqual(rx_ip.src, tx_ip.src)
1697 # outer IPv6 header ip.dst should be = to tx'ed sidlist[segleft-1]
1698 self.assertEqual(rx_ip.dst, tx_srh.addresses[tx_srh.segleft-1])
1700 # UDP layer should be unchanged
1701 self.assertEqual(rx_udp, tx_udp)
1703 self.logger.debug("packet verification: SUCCESS")
1705 def compare_rx_tx_packet_End_DX6(self, tx_pkt, rx_pkt):
1706 """ Compare input and output packet after passing End.DX6
1708 :param tx_pkt: transmitted packet
1709 :param rx_pkt: received packet
1711 # End.DX6 updates the headers as follows:
1712 # IPv6 + SRH (SL=0):
1713 # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv6(B, D)
1716 # in: IPv6(A, S3)IPv6(B, D)
1719 # get first (outer) IPv6 header of rx'ed packet
1720 rx_ip = rx_pkt.getlayer(IPv6)
1722 tx_ip = tx_pkt.getlayer(IPv6)
1723 tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1725 # verify if rx'ed packet has no SRH
1726 self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1728 # the whole rx_ip pkt should be equal to tx_ip2
1729 # except for the hlim field
1730 # -> adjust tx'ed hlim to expected hlim
1731 tx_ip2.hlim = tx_ip2.hlim - 1
1733 self.assertEqual(rx_ip, tx_ip2)
1735 self.logger.debug("packet verification: SUCCESS")
1737 def compare_rx_tx_packet_End_DX4(self, tx_pkt, rx_pkt):
1738 """ Compare input and output packet after passing End.DX4
1740 :param tx_pkt: transmitted packet
1741 :param rx_pkt: received packet
1743 # End.DX4 updates the headers as follows:
1744 # IPv6 + SRH (SL=0):
1745 # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv4(B, D)
1748 # in: IPv6(A, S3)IPv4(B, D)
1751 # get IPv4 header of rx'ed packet
1752 rx_ip = rx_pkt.getlayer(IP)
1754 tx_ip = tx_pkt.getlayer(IPv6)
1755 tx_ip2 = tx_pkt.getlayer(IP)
1757 # verify if rx'ed packet has no SRH
1758 self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1760 # the whole rx_ip pkt should be equal to tx_ip2
1761 # except for the ttl field and ip checksum
1762 # -> adjust tx'ed ttl to expected ttl
1763 tx_ip2.ttl = tx_ip2.ttl - 1
1764 # -> set tx'ed ip checksum to None and let scapy recompute
1765 tx_ip2.chksum = None
1766 # read back the pkt (with str()) to force computing these fields
1767 # probably other ways to accomplish this are possible
1768 tx_ip2 = IP(str(tx_ip2))
1770 self.assertEqual(rx_ip, tx_ip2)
1772 self.logger.debug("packet verification: SUCCESS")
1774 def compare_rx_tx_packet_End_DX2(self, tx_pkt, rx_pkt):
1775 """ Compare input and output packet after passing End.DX2
1777 :param tx_pkt: transmitted packet
1778 :param rx_pkt: received packet
1780 # End.DX2 updates the headers as follows:
1781 # IPv6 + SRH (SL=0):
1782 # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)L2
1788 # get IPv4 header of rx'ed packet
1789 rx_eth = rx_pkt.getlayer(Ether)
1791 tx_ip = tx_pkt.getlayer(IPv6)
1792 # we can't just get the 2nd Ether layer
1793 # get the Raw content and dissect it as Ether
1794 tx_eth1 = Ether(str(tx_pkt[Raw]))
1796 # verify if rx'ed packet has no SRH
1797 self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1799 # the whole rx_eth pkt should be equal to tx_eth1
1800 self.assertEqual(rx_eth, tx_eth1)
1802 self.logger.debug("packet verification: SUCCESS")
1804 def create_stream(self, src_if, dst_if, packet_header, packet_sizes,
1806 """Create SRv6 input packet stream for defined interface.
1808 :param VppInterface src_if: Interface to create packet stream for
1809 :param VppInterface dst_if: destination interface of packet stream
1810 :param packet_header: Layer3 scapy packet headers,
1811 L2 is added when not provided,
1812 Raw(payload) with packet_info is added
1813 :param list packet_sizes: packet stream pckt sizes,sequentially applied
1814 to packets in stream have
1815 :param int count: number of packets in packet stream
1816 :return: list of packets
1818 self.logger.info("Creating packets")
1820 for i in range(0, count-1):
1821 payload_info = self.create_packet_info(src_if, dst_if)
1823 "Creating packet with index %d" % (payload_info.index))
1824 payload = self.info_to_payload(payload_info)
1825 # add L2 header if not yet provided in packet_header
1826 if packet_header.getlayer(0).name == 'Ethernet':
1827 p = (packet_header /
1830 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
1833 size = packet_sizes[i % len(packet_sizes)]
1834 self.logger.debug("Packet size %d" % (size))
1835 self.extend_packet(p, size)
1836 # we need to store the packet with the automatic fields computed
1837 # read back the dumped packet (with str())
1838 # to force computing these fields
1839 # probably other ways are possible
1841 payload_info.data = p.copy()
1842 self.logger.debug(ppp("Created packet:", p))
1844 self.logger.info("Done creating packets")
1847 def send_and_verify_pkts(self, input, pkts, output, compare_func):
1848 """Send packets and verify received packets using compare_func
1850 :param input: ingress interface of DUT
1851 :param pkts: list of packets to transmit
1852 :param output: egress interface of DUT
1853 :param compare_func: function to compare in and out packets
1855 # add traffic stream to input interface
1856 input.add_stream(pkts)
1858 # enable capture on all interfaces
1859 self.pg_enable_capture(self.pg_interfaces)
1862 self.logger.info("Starting traffic")
1865 # get output capture
1866 self.logger.info("Getting packet capture")
1867 capture = output.get_capture()
1869 # assert nothing was captured on input interface
1870 input.assert_nothing_captured()
1872 # verify captured packets
1873 self.verify_captured_pkts(output, capture, compare_func)
1875 def create_packet_header_IPv6(self, dst):
1876 """Create packet header: IPv6 header, UDP header
1878 :param dst: IPv6 destination address
1880 IPv6 source address is 1234::1
1881 UDP source port and destination port are 1234
1884 p = (IPv6(src='1234::1', dst=dst) /
1885 UDP(sport=1234, dport=1234))
1888 def create_packet_header_IPv6_SRH(self, sidlist, segleft):
1889 """Create packet header: IPv6 header with SRH, UDP header
1891 :param list sidlist: segment list
1892 :param int segleft: segments-left field value
1894 IPv6 destination address is set to sidlist[segleft]
1895 IPv6 source addresses are 1234::1 and 4321::1
1896 UDP source port and destination port are 1234
1899 p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
1900 IPv6ExtHdrSegmentRouting(addresses=sidlist) /
1901 UDP(sport=1234, dport=1234))
1904 def create_packet_header_IPv6_SRH_IPv6(self, dst, sidlist, segleft):
1905 """Create packet header: IPv6 encapsulated in SRv6:
1906 IPv6 header with SRH, IPv6 header, UDP header
1908 :param ipv6address dst: inner IPv6 destination address
1909 :param list sidlist: segment list of outer IPv6 SRH
1910 :param int segleft: segments-left field of outer IPv6 SRH
1912 Outer IPv6 destination address is set to sidlist[segleft]
1913 IPv6 source addresses are 1234::1 and 4321::1
1914 UDP source port and destination port are 1234
1917 p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
1918 IPv6ExtHdrSegmentRouting(addresses=sidlist,
1919 segleft=segleft, nh=41) /
1920 IPv6(src='4321::1', dst=dst) /
1921 UDP(sport=1234, dport=1234))
1924 def create_packet_header_IPv6_IPv6(self, dst_inner, dst_outer):
1925 """Create packet header: IPv6 encapsulated in IPv6:
1926 IPv6 header, IPv6 header, UDP header
1928 :param ipv6address dst_inner: inner IPv6 destination address
1929 :param ipv6address dst_outer: outer IPv6 destination address
1931 IPv6 source addresses are 1234::1 and 4321::1
1932 UDP source port and destination port are 1234
1935 p = (IPv6(src='1234::1', dst=dst_outer) /
1936 IPv6(src='4321::1', dst=dst_inner) /
1937 UDP(sport=1234, dport=1234))
1940 def create_packet_header_IPv6_SRH_SRH_IPv6(self, dst, sidlist1, segleft1,
1941 sidlist2, segleft2):
1942 """Create packet header: IPv6 encapsulated in SRv6 with 2 SRH:
1943 IPv6 header with SRH, 2nd SRH, IPv6 header, UDP header
1945 :param ipv6address dst: inner IPv6 destination address
1946 :param list sidlist1: segment list of outer IPv6 SRH
1947 :param int segleft1: segments-left field of outer IPv6 SRH
1948 :param list sidlist2: segment list of inner IPv6 SRH
1949 :param int segleft2: segments-left field of inner IPv6 SRH
1951 Outer IPv6 destination address is set to sidlist[segleft]
1952 IPv6 source addresses are 1234::1 and 4321::1
1953 UDP source port and destination port are 1234
1956 p = (IPv6(src='1234::1', dst=sidlist1[segleft1]) /
1957 IPv6ExtHdrSegmentRouting(addresses=sidlist1,
1958 segleft=segleft1, nh=43) /
1959 IPv6ExtHdrSegmentRouting(addresses=sidlist2,
1960 segleft=segleft2, nh=41) /
1961 IPv6(src='4321::1', dst=dst) /
1962 UDP(sport=1234, dport=1234))
1965 def create_packet_header_IPv4(self, dst):
1966 """Create packet header: IPv4 header, UDP header
1968 :param dst: IPv4 destination address
1970 IPv4 source address is 123.1.1.1
1971 UDP source port and destination port are 1234
1974 p = (IP(src='123.1.1.1', dst=dst) /
1975 UDP(sport=1234, dport=1234))
1978 def create_packet_header_IPv6_IPv4(self, dst_inner, dst_outer):
1979 """Create packet header: IPv4 encapsulated in IPv6:
1980 IPv6 header, IPv4 header, UDP header
1982 :param ipv4address dst_inner: inner IPv4 destination address
1983 :param ipv6address dst_outer: outer IPv6 destination address
1985 IPv6 source address is 1234::1
1986 IPv4 source address is 123.1.1.1
1987 UDP source port and destination port are 1234
1990 p = (IPv6(src='1234::1', dst=dst_outer) /
1991 IP(src='123.1.1.1', dst=dst_inner) /
1992 UDP(sport=1234, dport=1234))
1995 def create_packet_header_IPv6_SRH_IPv4(self, dst, sidlist, segleft):
1996 """Create packet header: IPv4 encapsulated in SRv6:
1997 IPv6 header with SRH, IPv4 header, UDP header
1999 :param ipv4address dst: inner IPv4 destination address
2000 :param list sidlist: segment list of outer IPv6 SRH
2001 :param int segleft: segments-left field of outer IPv6 SRH
2003 Outer IPv6 destination address is set to sidlist[segleft]
2004 IPv6 source address is 1234::1
2005 IPv4 source address is 123.1.1.1
2006 UDP source port and destination port are 1234
2009 p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
2010 IPv6ExtHdrSegmentRouting(addresses=sidlist,
2011 segleft=segleft, nh=4) /
2012 IP(src='123.1.1.1', dst=dst) /
2013 UDP(sport=1234, dport=1234))
2016 def create_packet_header_L2(self, vlan=0):
2017 """Create packet header: L2 header
2019 :param vlan: if vlan!=0 then add 802.1q header
2021 # Note: the dst addr ('00:55:44:33:22:11') is used in
2022 # the compare function compare_rx_tx_packet_T_Encaps_L2
2023 # to detect presence of L2 in SRH payload
2024 p = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
2025 etype = 0x8137 # IPX
2028 p /= Dot1Q(vlan=vlan, type=etype)
2033 def create_packet_header_IPv6_SRH_L2(self, sidlist, segleft, vlan=0):
2034 """Create packet header: L2 encapsulated in SRv6:
2035 IPv6 header with SRH, L2
2037 :param list sidlist: segment list of outer IPv6 SRH
2038 :param int segleft: segments-left field of outer IPv6 SRH
2039 :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
2041 Outer IPv6 destination address is set to sidlist[segleft]
2042 IPv6 source address is 1234::1
2044 eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
2045 etype = 0x8137 # IPX
2048 eth /= Dot1Q(vlan=vlan, type=etype)
2052 p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
2053 IPv6ExtHdrSegmentRouting(addresses=sidlist,
2054 segleft=segleft, nh=59) /
2058 def create_packet_header_IPv6_L2(self, dst_outer, vlan=0):
2059 """Create packet header: L2 encapsulated in IPv6:
2062 :param ipv6address dst_outer: outer IPv6 destination address
2063 :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
2065 eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
2066 etype = 0x8137 # IPX
2069 eth /= Dot1Q(vlan=vlan, type=etype)
2073 p = (IPv6(src='1234::1', dst=dst_outer, nh=59) / eth)
2076 def get_payload_info(self, packet):
2077 """ Extract the payload_info from the packet
2079 # in most cases, payload_info is in packet[Raw]
2080 # but packet[Raw] gives the complete payload
2081 # (incl L2 header) for the T.Encaps L2 case
2083 payload_info = self.payload_to_info(str(packet[Raw]))
2086 # remote L2 header from packet[Raw]:
2087 # take packet[Raw], convert it to an Ether layer
2088 # and then extract Raw from it
2089 payload_info = self.payload_to_info(
2090 str(Ether(str(packet[Raw]))[Raw]))
2094 def verify_captured_pkts(self, dst_if, capture, compare_func):
2096 Verify captured packet stream for specified interface.
2097 Compare ingress with egress packets using the specified compare fn
2099 :param dst_if: egress interface of DUT
2100 :param capture: captured packets
2101 :param compare_func: function to compare in and out packet
2103 self.logger.info("Verifying capture on interface %s using function %s"
2104 % (dst_if.name, compare_func.func_name))
2107 for i in self.pg_interfaces:
2108 last_info[i.sw_if_index] = None
2109 dst_sw_if_index = dst_if.sw_if_index
2111 for packet in capture:
2113 # extract payload_info from packet's payload
2114 payload_info = self.get_payload_info(packet)
2115 packet_index = payload_info.index
2117 self.logger.debug("Verifying packet with index %d"
2119 # packet should have arrived on the expected interface
2120 self.assertEqual(payload_info.dst, dst_sw_if_index)
2122 "Got packet on interface %s: src=%u (idx=%u)" %
2123 (dst_if.name, payload_info.src, packet_index))
2125 # search for payload_info with same src and dst if_index
2126 # this will give us the transmitted packet
2127 next_info = self.get_next_packet_info_for_interface2(
2128 payload_info.src, dst_sw_if_index,
2129 last_info[payload_info.src])
2130 last_info[payload_info.src] = next_info
2131 # next_info should not be None
2132 self.assertTrue(next_info is not None)
2133 # index of tx and rx packets should be equal
2134 self.assertEqual(packet_index, next_info.index)
2135 # data field of next_info contains the tx packet
2136 txed_packet = next_info.data
2138 self.logger.debug(ppp("Transmitted packet:",
2139 txed_packet)) # ppp=Pretty Print Packet
2141 self.logger.debug(ppp("Received packet:", packet))
2143 # compare rcvd packet with expected packet using compare_func
2144 compare_func(txed_packet, packet)
2147 print packet.command()
2148 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2151 # have all expected packets arrived?
2152 for i in self.pg_interfaces:
2153 remaining_packet = self.get_next_packet_info_for_interface2(
2154 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
2155 self.assertTrue(remaining_packet is None,
2156 "Interface %s: Packet expected from interface %s "
2157 "didn't arrive" % (dst_if.name, i.name))
2160 if __name__ == '__main__':
2161 unittest.main(testRunner=VppTestRunner)