import unittest
from socket import AF_INET6

from framework import VppTestCase, VppTestRunner
from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathProto, VppIpTable
from vpp_srv6 import (
    SRv6LocalSIDBehaviors,
    SRv6PolicySteeringTypes,
)

import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether, Dot1Q
from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
from scapy.layers.inet import IP, UDP

from util import ppp
class TestSRv6As(VppTestCase):
    """SRv6 Static Proxy plugin Test Case"""

    @classmethod
    def setUpClass(cls):
        super(TestSRv6As, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestSRv6As, cls).tearDownClass()

    def setUp(self):
        """Perform test setup before each test case."""
        super(TestSRv6As, self).setUp()

        # packet sizes, inclusive L2 overhead
        self.pg_packet_sizes = [64, 512, 1518, 9018]

        # reset packet_infos
        self.reset_packet_infos()

    def tearDown(self):
        """Clean up test setup after each test case."""
        self.teardown_interfaces()

        super(TestSRv6As, self).tearDown()

    def configure_interface(
        self, interface, ipv6=False, ipv4=False, ipv6_table_id=0, ipv4_table_id=0
    ):
        """Configure interface.

        :param interface: interface to configure
        :param ipv6: configure IPv6 on interface
        :param ipv4: configure IPv4 on interface
        :param ipv6_table_id: FIB table_id for IPv6
        :param ipv4_table_id: FIB table_id for IPv4
        """
        self.logger.debug("Configuring interface %s" % (interface.name))
        if ipv6:
            self.logger.debug("Configuring IPv6")
            interface.set_table_ip6(ipv6_table_id)
            interface.config_ip6()
            interface.resolve_ndp(timeout=5)
        if ipv4:
            self.logger.debug("Configuring IPv4")
            interface.set_table_ip4(ipv4_table_id)
            interface.config_ip4()
            interface.resolve_arp()
        interface.admin_up()

    def setup_interfaces(
        self, ipv6=None, ipv4=None, ipv6_table_id=None, ipv4_table_id=None
    ):
        """Create and configure interfaces.

        The number of interfaces is the length of the longer of ``ipv6`` and
        ``ipv4``; shorter lists are padded with their default value.

        :param ipv6: list of interface IPv6 capabilities
        :param ipv4: list of interface IPv4 capabilities
        :param ipv6_table_id: list of intf IPv6 FIB table_ids
        :param ipv4_table_id: list of intf IPv4 FIB table_ids
        :returns: List of created interfaces.
        """
        # Work on private copies: the lists are padded below, and padding the
        # caller's list (or a shared default) would leak state between calls.
        ipv6 = list(ipv6) if ipv6 is not None else []
        ipv4 = list(ipv4) if ipv4 is not None else []
        ipv6_table_id = list(ipv6_table_id) if ipv6_table_id is not None else []
        ipv4_table_id = list(ipv4_table_id) if ipv4_table_id is not None else []

        # how many interfaces?
        count = max(len(ipv6), len(ipv4))
        self.logger.debug("Creating and configuring %d interfaces" % (count))

        # fill up ipv6 and ipv4 lists if needed
        # not enabled (False) is the default
        # (a non-positive multiplier yields an empty list, so no guard needed)
        ipv6 += (count - len(ipv6)) * [False]
        ipv4 += (count - len(ipv4)) * [False]

        # fill up table_id lists if needed
        # table_id 0 (global) is the default
        ipv6_table_id += (count - len(ipv6_table_id)) * [0]
        ipv4_table_id += (count - len(ipv4_table_id)) * [0]

        # create 'count' pg interfaces
        self.create_pg_interfaces(range(count))

        # setup all interfaces
        for i in range(count):
            intf = self.pg_interfaces[i]
            self.configure_interface(
                intf, ipv6[i], ipv4[i], ipv6_table_id[i], ipv4_table_id[i]
            )

        if any(ipv6):
            self.logger.debug(self.vapi.cli("show ip6 neighbors"))
        if any(ipv4):
            self.logger.debug(self.vapi.cli("show ip4 neighbors"))
        self.logger.debug(self.vapi.cli("show interface"))
        self.logger.debug(self.vapi.cli("show hardware"))

        return self.pg_interfaces

    def teardown_interfaces(self):
        """Unconfigure and bring down interface."""
        self.logger.debug("Tearing down interfaces")
        # tear down all interfaces
        # AFAIK they cannot be deleted
        for i in self.pg_interfaces:
            self.logger.debug("Tear down interface %s" % (i.name))
            i.admin_down()
            i.unconfig()
            i.set_table_ip4(0)
            i.set_table_ip6(0)

    # In every test below the localSID under test is the "…::a6" entry of
    # sid_list, selected by test_sid_index. The segments after it are what the
    # proxy re-applies on the return path: a single remaining segment means
    # "no SRH", two remaining segments mean "with SRH".

    def test_SRv6_End_AS_IPv6_noSRH(self):
        """Test SRv6 End.AS behavior with IPv6 traffic and no SRH rewrite."""
        self.run_SRv6_End_AS_IPv6(
            sid_list=["a1::", "a2::a6", "a3::"],
            test_sid_index=1,
            rewrite_src_addr="a2::",
        )

    def test_SRv6_End_AS_IPv6_SRH(self):
        """Test SRv6 End.AS behavior with IPv6 traffic and SRH rewrite."""
        self.run_SRv6_End_AS_IPv6(
            sid_list=["a1::a6", "a2::", "a3::"],
            test_sid_index=0,
            rewrite_src_addr="a1::",
        )

    def test_SRv6_End_AS_IPv4_noSRH(self):
        """Test SRv6 End.AS behavior with IPv4 traffic and no SRH rewrite."""
        self.run_SRv6_End_AS_IPv4(
            sid_list=["a1::", "a2::a6", "a3::"],
            test_sid_index=1,
            rewrite_src_addr="a2::",
        )

    def test_SRv6_End_AS_IPv4_SRH(self):
        """Test SRv6 End.AS behavior with IPv4 traffic and SRH rewrite."""
        self.run_SRv6_End_AS_IPv4(
            sid_list=["a1::a6", "a2::", "a3::"],
            test_sid_index=0,
            rewrite_src_addr="a1::",
        )

    def test_SRv6_End_AS_L2_noSRH(self):
        """Test SRv6 End.AS behavior with L2 traffic and no SRH rewrite."""
        self.run_SRv6_End_AS_L2(
            sid_list=["a1::", "a2::a6", "a3::"],
            test_sid_index=1,
            rewrite_src_addr="a2::",
        )

    def test_SRv6_End_AS_L2_SRH(self):
        """Test SRv6 End.AS behavior with L2 traffic and SRH rewrite."""
        self.run_SRv6_End_AS_L2(
            sid_list=["a1::a6", "a2::", "a3::"],
            test_sid_index=0,
            rewrite_src_addr="a1::",
        )

    def _run_SRv6_End_AS(
        self,
        sid_list,
        test_sid_index,
        rewrite_src_addr,
        ipv6,
        ipv4,
        next_hop_attr,
        encap_header_func,
        plain_header_func,
        compare_out,
        compare_in,
    ):
        """Shared driver for the End.AS tests (L2 / IPv6 / IPv4 payloads).

        :param sid_list: segment list; sid_list[test_sid_index] is the
                         localSID under test
        :param test_sid_index: index of the localSID under test in sid_list
        :param rewrite_src_addr: source address the proxy uses when
                                 re-encapsulating returning traffic
        :param ipv6: per-interface IPv6 enable flags for setup_interfaces
        :param ipv4: per-interface IPv4 enable flags for setup_interfaces
                     (None for none)
        :param next_hop_attr: name of the pg1 attribute used as the "nh"
                              argument of the localSID, or None to omit "nh"
        :param encap_header_func: builds the SRv6-encapsulated header sent
                                  pg0->pg1; called with sidlist= and segleft=
        :param plain_header_func: builds the header of the returning traffic
                                  sent pg1->pg0; called without arguments
        :param compare_out: compare function for the pg0->pg1 direction
        :param compare_in: compare function for the pg1->pg0 direction
        """
        local_sid = sid_list[test_sid_index]
        self.rewrite_src_addr = rewrite_src_addr
        self.rewrite_sid_list = sid_list[test_sid_index + 1 :]

        # send traffic to one destination interface
        # (interfaces must exist before pg0/pg1 attributes are read below)
        self.setup_interfaces(ipv6=ipv6, ipv4=ipv4)

        # configure route to next segment
        route = VppIpRoute(
            self,
            sid_list[test_sid_index + 1],
            128,
            [VppRoutePath(self.pg0.remote_ip6, self.pg0.sw_if_index)],
        )
        route.add_vpp_config()

        # configure SRv6 localSID behavior
        cli_parts = ["sr localsid address ", local_sid, " behavior end.as"]
        if next_hop_attr is not None:
            cli_parts += [" nh ", getattr(self.pg1, next_hop_attr)]
        cli_parts += [
            " oif ",
            self.pg1.name,
            " iif ",
            self.pg1.name,
            " src ",
            self.rewrite_src_addr,
        ]
        for s in self.rewrite_sid_list:
            cli_parts += [" next ", s]
        self.vapi.cli("".join(cli_parts))

        # log the localsids
        self.logger.debug(self.vapi.cli("show sr localsid"))

        # send one packet per packet size
        count = len(self.pg_packet_sizes)

        # prepare payload in SRv6 headers
        # (the SRH carries the segment list in reverse order)
        packet_header1 = encap_header_func(
            sidlist=sid_list[::-1], segleft=len(sid_list) - test_sid_index - 1
        )

        # generate packets (pg0->pg1)
        pkts1 = self.create_stream(
            self.pg0, self.pg1, packet_header1, self.pg_packet_sizes, count
        )

        # send packets and verify received packets
        self.send_and_verify_pkts(self.pg0, pkts1, self.pg1, compare_out)

        # log the localsid counters
        self.logger.info(self.vapi.cli("show sr localsid"))

        # prepare header for returning packets
        packet_header2 = plain_header_func()

        # generate returning packets (pg1->pg0)
        pkts2 = self.create_stream(
            self.pg1, self.pg0, packet_header2, self.pg_packet_sizes, count
        )

        # send packets and verify received packets
        self.send_and_verify_pkts(self.pg1, pkts2, self.pg0, compare_in)

        # log the localsid counters
        self.logger.info(self.vapi.cli("show sr localsid"))

        # remove SRv6 localSIDs
        self.vapi.cli("sr localsid del address " + local_sid)

        # cleanup interfaces
        self.teardown_interfaces()

    def run_SRv6_End_AS_L2(self, sid_list, test_sid_index, rewrite_src_addr):
        """Run SRv6 End.AS test with L2 traffic."""
        # source interface is IPv6 only, destination interface is L2 only
        self._run_SRv6_End_AS(
            sid_list,
            test_sid_index,
            rewrite_src_addr,
            ipv6=[True, False],
            ipv4=None,
            next_hop_attr=None,
            encap_header_func=self.create_packet_header_IPv6_SRH_L2,
            plain_header_func=self.create_packet_header_L2,
            compare_out=self.compare_rx_tx_packet_End_AS_L2_out,
            compare_in=self.compare_rx_tx_packet_End_AS_L2_in,
        )

    def run_SRv6_End_AS_IPv6(self, sid_list, test_sid_index, rewrite_src_addr):
        """Run SRv6 End.AS test with IPv6 traffic."""
        # source and destination interfaces are IPv6 only
        self._run_SRv6_End_AS(
            sid_list,
            test_sid_index,
            rewrite_src_addr,
            ipv6=[True, True],
            ipv4=None,
            next_hop_attr="remote_ip6",
            encap_header_func=self.create_packet_header_IPv6_SRH_IPv6,
            plain_header_func=self.create_packet_header_IPv6,
            compare_out=self.compare_rx_tx_packet_End_AS_IPv6_out,
            compare_in=self.compare_rx_tx_packet_End_AS_IPv6_in,
        )

    def run_SRv6_End_AS_IPv4(self, sid_list, test_sid_index, rewrite_src_addr):
        """Run SRv6 End.AS test with IPv4 traffic."""
        # source interface is IPv6 + IPv4, destination interface is IPv4 only
        self._run_SRv6_End_AS(
            sid_list,
            test_sid_index,
            rewrite_src_addr,
            ipv6=[True, False],
            ipv4=[True, True],
            next_hop_attr="remote_ip4",
            encap_header_func=self.create_packet_header_IPv6_SRH_IPv4,
            plain_header_func=self.create_packet_header_IPv4,
            compare_out=self.compare_rx_tx_packet_End_AS_IPv4_out,
            compare_in=self.compare_rx_tx_packet_End_AS_IPv4_in,
        )

    def _verify_End_AS_rewrite(self, rx_pkt):
        """Verify the outer headers the proxy put on a returning packet.

        Checks the outer IPv6 source/destination against rewrite_src_addr and
        rewrite_sid_list, and the SRH (present only when more than one
        segment remains).

        :param rx_pkt: received packet
        :returns: tuple (outer IPv6 layer, SRH layer or None, inner payload)
        """
        # get first (outer) IPv6 header of rx'ed packet
        rx_ip = rx_pkt.getlayer(IPv6)

        # expected segment-list (SRH order)
        tx_seglist = self.rewrite_sid_list[::-1]

        # received ip.src should be equal to SR Policy source
        self.assertEqual(rx_ip.src, self.rewrite_src_addr)
        # received ip.dst should be equal to expected sidlist[lastentry]
        self.assertEqual(rx_ip.dst, tx_seglist[-1])

        if len(tx_seglist) <= 1:
            # rx'ed packet should NOT have SRH
            self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
            return rx_ip, None, rx_ip.payload

        # rx'ed packet should have SRH
        self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
        rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
        # rx'ed seglist should be equal to expected seglist
        self.assertEqual(rx_srh.addresses, tx_seglist)
        # segleft should be equal to size expected seglist-1
        self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
        # segleft should be equal to lastentry
        self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
        return rx_ip, rx_srh, rx_srh.payload

    def compare_rx_tx_packet_End_AS_IPv6_in(self, tx_pkt, rx_pkt):
        """Compare input and output packet after passing End.AS

        :param tx_pkt: transmitted packet
        :param rx_pkt: received packet
        """
        _, _, payload = self._verify_End_AS_rewrite(rx_pkt)

        tx_ip = tx_pkt.getlayer(IPv6)

        # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
        # except for the hop-limit field
        #   -> update tx'ed hlim to the expected hlim
        tx_ip.hlim = tx_ip.hlim - 1

        self.assertEqual(payload, tx_ip)

        self.logger.debug("packet verification: SUCCESS")

    def compare_rx_tx_packet_End_AS_IPv4_in(self, tx_pkt, rx_pkt):
        """Compare input and output packet after passing End.AS

        :param tx_pkt: transmitted packet
        :param rx_pkt: received packet
        """
        rx_ip, rx_srh, payload = self._verify_End_AS_rewrite(rx_pkt)
        if rx_srh is not None:
            # rx'ed packet should also carry an IPv4 header
            self.assertTrue(rx_ip.payload.haslayer(IP))

        tx_ip = tx_pkt.getlayer(IP)

        # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
        # except for the ttl field and ip checksum
        #   -> adjust tx'ed ttl to expected ttl
        tx_ip.ttl = tx_ip.ttl - 1
        #   -> set tx'ed ip checksum to None and let scapy recompute
        tx_ip.chksum = None
        # read back the pkt (from its raw bytes) to force computing these
        # fields; probably other ways to accomplish this are possible
        tx_ip = IP(scapy.compat.raw(tx_ip))

        self.assertEqual(payload, tx_ip)

        self.logger.debug("packet verification: SUCCESS")

    def compare_rx_tx_packet_End_AS_L2_in(self, tx_pkt, rx_pkt):
        """Compare input and output packet after passing End.AS

        :param tx_pkt: transmitted packet
        :param rx_pkt: received packet
        """
        _, rx_srh, payload = self._verify_End_AS_rewrite(rx_pkt)
        if rx_srh is not None:
            # nh should be 143 (IANA protocol number for Ethernet)
            self.assertEqual(rx_srh.nh, 143)

        tx_ether = tx_pkt.getlayer(Ether)

        # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
        self.assertEqual(Ether(scapy.compat.raw(payload)), tx_ether)

        self.logger.debug("packet verification: SUCCESS")

    def compare_rx_tx_packet_End_AS_IPv6_out(self, tx_pkt, rx_pkt):
        """Compare input and output packet after passing End.AS with IPv6

        :param tx_pkt: transmitted packet
        :param rx_pkt: received packet
        """
        # get first (outer) IPv6 header of rx'ed packet
        rx_ip = rx_pkt.getlayer(IPv6)

        # second IPv6 layer of the tx'ed packet, i.e. the encapsulated one
        tx_ip2 = tx_pkt.getlayer(IPv6, 2)

        # verify if rx'ed packet has no SRH
        self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))

        # the whole rx_ip pkt should be equal to tx_ip2
        # except for the hlim field
        #   -> adjust tx'ed hlim to expected hlim
        tx_ip2.hlim = tx_ip2.hlim - 1

        self.assertEqual(rx_ip, tx_ip2)

        self.logger.debug("packet verification: SUCCESS")

    def compare_rx_tx_packet_End_AS_IPv4_out(self, tx_pkt, rx_pkt):
        """Compare input and output packet after passing End.AS with IPv4

        :param tx_pkt: transmitted packet
        :param rx_pkt: received packet
        """
        # get IPv4 header of rx'ed packet
        rx_ip = rx_pkt.getlayer(IP)

        # encapsulated IPv4 header of the tx'ed packet
        tx_ip2 = tx_pkt.getlayer(IP)

        # verify if rx'ed packet has no SRH
        self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))

        # the whole rx_ip pkt should be equal to tx_ip2
        # except for the ttl field and ip checksum
        #   -> adjust tx'ed ttl to expected ttl
        tx_ip2.ttl = tx_ip2.ttl - 1
        #   -> set tx'ed ip checksum to None and let scapy recompute
        tx_ip2.chksum = None
        # read back the pkt (from its raw bytes) to force computing these
        # fields; probably other ways to accomplish this are possible
        tx_ip2 = IP(scapy.compat.raw(tx_ip2))

        self.assertEqual(rx_ip, tx_ip2)

        self.logger.debug("packet verification: SUCCESS")

    def compare_rx_tx_packet_End_AS_L2_out(self, tx_pkt, rx_pkt):
        """Compare input and output packet after passing End.AS with L2

        :param tx_pkt: transmitted packet
        :param rx_pkt: received packet
        """
        # get Ethernet header of rx'ed packet
        rx_eth = rx_pkt.getlayer(Ether)

        # we can't just get the 2nd Ether layer
        # get the Raw content and dissect it as Ether
        tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw]))

        # verify if rx'ed packet has no SRH
        self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))

        # the whole rx_eth pkt should be equal to tx_eth1
        self.assertEqual(rx_eth, tx_eth1)

        self.logger.debug("packet verification: SUCCESS")

    def create_stream(self, src_if, dst_if, packet_header, packet_sizes, count):
        """Create SRv6 input packet stream for defined interface.

        :param VppInterface src_if: Interface to create packet stream for
        :param VppInterface dst_if: destination interface of packet stream
        :param packet_header: Layer3 scapy packet headers,
                L2 is added when not provided,
                Raw(payload) with packet_info is added
        :param list packet_sizes: packet stream pckt sizes, sequentially
                applied to packets in stream
        :param int count: number of packets in packet stream
                (see the review note on the loop below)
        :return: list of packets
        """
        self.logger.info("Creating packets")
        pkts = []
        # NOTE(review): this range yields count - 1 packets, not count, so
        # callers passing count == len(packet_sizes) never send the last
        # size. Preserved as-is; confirm whether this is intentional before
        # changing it.
        for i in range(0, count - 1):
            payload_info = self.create_packet_info(src_if, dst_if)
            self.logger.debug("Creating packet with index %d" % (payload_info.index))
            payload = self.info_to_payload(payload_info)
            # add L2 header if not yet provided in packet_header
            if packet_header.getlayer(0).name == "Ethernet":
                p = packet_header / Raw(payload)
            else:
                p = (
                    Ether(dst=src_if.local_mac, src=src_if.remote_mac)
                    / packet_header
                    / Raw(payload)
                )
            size = packet_sizes[i % len(packet_sizes)]
            self.logger.debug("Packet size %d" % (size))
            self.extend_packet(p, size)
            # we need to store the packet with the automatic fields computed
            # read back the dumped packet (from its raw bytes)
            # to force computing these fields
            # probably other ways are possible
            p = Ether(scapy.compat.raw(p))
            payload_info.data = p.copy()
            self.logger.debug(ppp("Created packet:", p))
            pkts.append(p)
        self.logger.info("Done creating packets")
        return pkts

    def send_and_verify_pkts(self, input, pkts, output, compare_func):
        """Send packets and verify received packets using compare_func

        :param input: ingress interface of DUT
        :param pkts: list of packets to transmit
        :param output: egress interface of DUT
        :param compare_func: function to compare in and out packets
        """
        # add traffic stream to input interface
        input.add_stream(pkts)

        # enable capture on all interfaces
        self.pg_enable_capture(self.pg_interfaces)

        # start traffic
        self.logger.info("Starting traffic")
        self.pg_start()

        # get output capture
        self.logger.info("Getting packet capture")
        capture = output.get_capture()

        # assert nothing was captured on input interface
        # input.assert_nothing_captured()

        # verify captured packets
        self.verify_captured_pkts(output, capture, compare_func)

    def create_packet_header_IPv6(self):
        """Create packet header: IPv6 header, UDP header

        IPv6 source address is 1234::1
        IPv6 destination address is 4321::1
        UDP source port and destination port are 1234
        """
        return IPv6(src="1234::1", dst="4321::1") / UDP(sport=1234, dport=1234)

    def create_packet_header_IPv6_SRH_IPv6(self, sidlist, segleft):
        """Create packet header: IPv6 encapsulated in SRv6:
        IPv6 header with SRH, IPv6 header, UDP header

        :param list sidlist: segment list of outer IPv6 SRH
        :param int segleft: segments-left field of outer IPv6 SRH

        Outer IPv6 source address is set to 5678::1
        Outer IPv6 destination address is set to sidlist[segleft]
        IPv6 source addresses is 1234::1
        IPv6 destination address is 4321::1
        UDP source port and destination port are 1234
        """
        return (
            IPv6(src="5678::1", dst=sidlist[segleft])
            / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=41)
            / IPv6(src="1234::1", dst="4321::1")
            / UDP(sport=1234, dport=1234)
        )

    def create_packet_header_IPv4(self):
        """Create packet header: IPv4 header, UDP header

        IPv4 source address is 123.1.1.1
        IPv4 destination address is 124.1.1.1
        UDP source port and destination port are 1234
        """
        return IP(src="123.1.1.1", dst="124.1.1.1") / UDP(sport=1234, dport=1234)

    def create_packet_header_IPv6_SRH_IPv4(self, sidlist, segleft):
        """Create packet header: IPv4 encapsulated in SRv6:
        IPv6 header with SRH, IPv4 header, UDP header

        :param list sidlist: segment list of outer IPv6 SRH
        :param int segleft: segments-left field of outer IPv6 SRH

        Outer IPv6 destination address is set to sidlist[segleft]
        IPv6 source address is 1234::1
        IPv4 source address is 123.1.1.1
        IPv4 destination address is 124.1.1.1
        UDP source port and destination port are 1234
        """
        return (
            IPv6(src="1234::1", dst=sidlist[segleft])
            / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=4)
            / IP(src="123.1.1.1", dst="124.1.1.1")
            / UDP(sport=1234, dport=1234)
        )

    def create_packet_header_L2(self, vlan=0):
        """Create packet header: L2 header

        :param vlan: if vlan!=0 then add 802.1q header
        """
        p = Ether(src="00:11:22:33:44:55", dst="00:55:44:33:22:11")
        etype = 0x8137  # IPX
        if vlan:
            # add 802.1q layer
            p /= Dot1Q(vlan=vlan, type=etype)
        else:
            p.type = etype
        return p

    def create_packet_header_IPv6_SRH_L2(self, sidlist, segleft, vlan=0):
        """Create packet header: L2 encapsulated in SRv6:
        IPv6 header with SRH, L2

        :param list sidlist: segment list of outer IPv6 SRH
        :param int segleft: segments-left field of outer IPv6 SRH
        :param vlan: L2 vlan; if vlan!=0 then add 802.1q header

        Outer IPv6 destination address is set to sidlist[segleft]
        IPv6 source address is 1234::1
        """
        eth = Ether(src="00:11:22:33:44:55", dst="00:55:44:33:22:11")
        etype = 0x8137  # IPX
        if vlan:
            # add 802.1q layer
            eth /= Dot1Q(vlan=vlan, type=etype)
        else:
            eth.type = etype

        return (
            IPv6(src="1234::1", dst=sidlist[segleft])
            / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=143)
            / eth
        )

    def get_payload_info(self, packet):
        """Extract the payload_info from the packet"""
        # in most cases, payload_info is in packet[Raw]
        # but packet[Raw] gives the complete payload
        # (incl L2 header) when an L2 frame is encapsulated
        try:
            payload_info = self.payload_to_info(packet[Raw])

        except Exception:
            # remove L2 header from packet[Raw]:
            # take packet[Raw], convert it to an Ether layer
            # and then extract Raw from it
            payload_info = self.payload_to_info(
                Ether(scapy.compat.raw(packet[Raw]))[Raw]
            )

        return payload_info

    def verify_captured_pkts(self, dst_if, capture, compare_func):
        """
        Verify captured packet stream for specified interface.
        Compare ingress with egress packets using the specified compare fn

        :param dst_if: egress interface of DUT
        :param capture: captured packets
        :param compare_func: function to compare in and out packet
        """
        self.logger.info(
            "Verifying capture on interface %s using function %s"
            % (dst_if.name, compare_func.__name__)
        )

        # per source interface: the last packet_info matched so far
        last_info = {i.sw_if_index: None for i in self.pg_interfaces}
        dst_sw_if_index = dst_if.sw_if_index

        for packet in capture:
            try:
                # extract payload_info from packet's payload
                payload_info = self.get_payload_info(packet)
                packet_index = payload_info.index

                self.logger.debug("Verifying packet with index %d" % (packet_index))
                # packet should have arrived on the expected interface
                self.assertEqual(payload_info.dst, dst_sw_if_index)
                self.logger.debug(
                    "Got packet on interface %s: src=%u (idx=%u)"
                    % (dst_if.name, payload_info.src, packet_index)
                )

                # search for payload_info with same src and dst if_index
                # this will give us the transmitted packet
                next_info = self.get_next_packet_info_for_interface2(
                    payload_info.src, dst_sw_if_index, last_info[payload_info.src]
                )
                last_info[payload_info.src] = next_info
                # next_info should not be None
                self.assertTrue(next_info is not None)
                # index of tx and rx packets should be equal
                self.assertEqual(packet_index, next_info.index)
                # data field of next_info contains the tx packet
                txed_packet = next_info.data

                self.logger.debug(
                    ppp("Transmitted packet:", txed_packet)
                )  # ppp=Pretty Print Packet

                self.logger.debug(ppp("Received packet:", packet))

                # compare rcvd packet with expected packet using compare_func
                compare_func(txed_packet, packet)

            except Exception:
                # log the offending packet, then let the failure propagate
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise

        # have all expected packets arrived?
        for i in self.pg_interfaces:
            remaining_packet = self.get_next_packet_info_for_interface2(
                i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index]
            )
            self.assertTrue(
                remaining_packet is None,
                "Interface %s: Packet expected from interface %s "
                "didn't arrive" % (dst_if.name, i.name),
            )
# Allow running this test module directly with the VPP test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)