5 from framework import VppTestCase
6 from asfframework import VppTestRunner
7 from vpp_ip_route import VppIpRoute, VppRoutePath
10 from scapy.packet import Raw
11 from scapy.layers.l2 import Ether, Dot1Q
12 from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
13 from scapy.layers.inet import IP, UDP
class TestSRv6As(VppTestCase):
    """SRv6 Static Proxy plugin Test Case"""

    # NOTE(review): the reviewed listing had lines elided. Statements tagged
    # "restored" below were not visible and were rebuilt from the surrounding
    # comments and call structure; confirm each one against the real file.

    @classmethod
    def setUpClass(cls):  # restored def line
        super(TestSRv6As, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestSRv6As, cls).tearDownClass()

    def setUp(self):  # restored def line
        """Perform test setup before each test case."""
        super(TestSRv6As, self).setUp()

        # packet sizes, inclusive L2 overhead
        self.pg_packet_sizes = [64, 512, 1518, 9018]

        # reset packet_infos
        self.reset_packet_infos()

    def tearDown(self):  # restored def line
        """Clean up test setup after each test case."""
        self.teardown_interfaces()

        super(TestSRv6As, self).tearDown()

    def configure_interface(
        self, interface, ipv6=False, ipv4=False, ipv6_table_id=0, ipv4_table_id=0
    ):
        """Configure interface.

        :param interface: interface object to configure
        :param ipv6: configure IPv6 on interface
        :param ipv4: configure IPv4 on interface
        :param ipv6_table_id: FIB table_id for IPv6
        :param ipv4_table_id: FIB table_id for IPv4
        """
        self.logger.debug("Configuring interface %s" % (interface.name))
        if ipv6:  # restored guard
            self.logger.debug("Configuring IPv6")
            interface.set_table_ip6(ipv6_table_id)
            interface.config_ip6()
            interface.resolve_ndp(timeout=5)
        if ipv4:  # restored guard
            self.logger.debug("Configuring IPv4")
            interface.set_table_ip4(ipv4_table_id)
            interface.config_ip4()
            interface.resolve_arp()
        # restored -- TODO confirm: bring the interface up
        interface.admin_up()

    def setup_interfaces(
        self, ipv6=None, ipv4=None, ipv6_table_id=None, ipv4_table_id=None
    ):
        """Create and configure interfaces.

        The number of interfaces is the longer of ``ipv6`` and ``ipv4``;
        shorter lists are padded with the defaults (``False`` for the
        capability lists, table_id ``0`` for the table_id lists).

        The arguments are copied before padding, so neither the caller's
        lists nor any default value is modified (the previous ``[]``
        defaults were extended in place and leaked between calls).

        :param ipv6: list of interface IPv6 capabilities
        :param ipv4: list of interface IPv4 capabilities
        :param ipv6_table_id: list of intf IPv6 FIB table_ids
        :param ipv4_table_id: list of intf IPv4 FIB table_ids
        :returns: List of created interfaces.
        """
        ipv6 = list(ipv6) if ipv6 is not None else []
        ipv4 = list(ipv4) if ipv4 is not None else []
        ipv6_table_id = list(ipv6_table_id) if ipv6_table_id is not None else []
        ipv4_table_id = list(ipv4_table_id) if ipv4_table_id is not None else []

        # how many interfaces?
        # restored -- TODO confirm: count is the longer of ipv6 / ipv4
        count = max(len(ipv6), len(ipv4))
        self.logger.debug("Creating and configuring %d interfaces" % (count))

        # fill up ipv6 and ipv4 lists if needed
        # not enabled (False) is the default
        # (a non-positive multiplier yields [], so no length guard is needed)
        ipv6 += (count - len(ipv6)) * [False]
        ipv4 += (count - len(ipv4)) * [False]

        # fill up table_id lists if needed
        # table_id 0 (global) is the default
        ipv6_table_id += (count - len(ipv6_table_id)) * [0]
        ipv4_table_id += (count - len(ipv4_table_id)) * [0]

        # create 'count' pg interfaces
        self.create_pg_interfaces(range(count))

        # setup all interfaces
        for i in range(count):
            intf = self.pg_interfaces[i]
            self.configure_interface(
                intf, ipv6[i], ipv4[i], ipv6_table_id[i], ipv4_table_id[i]
            )

        # restored guards -- TODO confirm: only dump the neighbor tables of
        # address families that were actually enabled
        if any(ipv6):
            self.logger.debug(self.vapi.cli("show ip6 neighbors"))
        if any(ipv4):
            self.logger.debug(self.vapi.cli("show ip4 neighbors"))
        self.logger.debug(self.vapi.cli("show interface"))
        self.logger.debug(self.vapi.cli("show hardware"))

        return self.pg_interfaces

    def teardown_interfaces(self):
        """Unconfigure and bring down interface."""
        self.logger.debug("Tearing down interfaces")
        # tear down all interfaces
        # AFAIK they cannot be deleted
        for i in self.pg_interfaces:
            self.logger.debug("Tear down interface %s" % (i.name))
            # restored -- TODO confirm: the four calls below were elided
            i.admin_down()
            i.unconfig()
            i.set_table_ip4(0)
            i.set_table_ip6(0)

    # In the test_* methods below, test_sid_index was elided in the listing.
    # It is restored as the only value consistent with the comparers:
    # "noSRH" needs exactly one SID after the local SID (index 1),
    # "SRH" needs more than one (index 0).

    def test_SRv6_End_AS_IPv6_noSRH(self):
        """Test SRv6 End.AS behavior with IPv6 traffic and no SRH rewrite."""
        self.run_SRv6_End_AS_IPv6(
            sid_list=["a1::", "a2::a6", "a3::"],
            test_sid_index=1,
            rewrite_src_addr="a2::",
        )

    def test_SRv6_End_AS_IPv6_SRH(self):
        """Test SRv6 End.AS behavior with IPv6 traffic and SRH rewrite."""
        self.run_SRv6_End_AS_IPv6(
            sid_list=["a1::a6", "a2::", "a3::"],
            test_sid_index=0,
            rewrite_src_addr="a1::",
        )

    def test_SRv6_End_AS_IPv4_noSRH(self):
        """Test SRv6 End.AS behavior with IPv4 traffic and no SRH rewrite."""
        self.run_SRv6_End_AS_IPv4(
            sid_list=["a1::", "a2::a6", "a3::"],
            test_sid_index=1,
            rewrite_src_addr="a2::",
        )

    def test_SRv6_End_AS_IPv4_SRH(self):
        """Test SRv6 End.AS behavior with IPv4 traffic and SRH rewrite."""
        self.run_SRv6_End_AS_IPv4(
            sid_list=["a1::a6", "a2::", "a3::"],
            test_sid_index=0,
            rewrite_src_addr="a1::",
        )

    def test_SRv6_End_AS_L2_noSRH(self):
        """Test SRv6 End.AS behavior with L2 traffic and no SRH rewrite."""
        self.run_SRv6_End_AS_L2(
            sid_list=["a1::", "a2::a6", "a3::"],
            test_sid_index=1,
            rewrite_src_addr="a2::",
        )

    def test_SRv6_End_AS_L2_SRH(self):
        """Test SRv6 End.AS behavior with L2 traffic and SRH rewrite."""
        self.run_SRv6_End_AS_L2(
            sid_list=["a1::a6", "a2::", "a3::"],
            test_sid_index=0,
            rewrite_src_addr="a1::",
        )

    def _run_SRv6_End_AS(
        self,
        sid_list,
        test_sid_index,
        rewrite_src_addr,
        intf_config,
        nh_attr,
        make_encap_header,
        make_plain_header,
        compare_out,
        compare_in,
    ):
        """Shared body of the run_SRv6_End_AS_* scenarios.

        Sets up the interfaces, installs a route to the next segment and
        the End.AS localSID, sends a stream pg0->pg1 and a returning
        stream pg1->pg0 (verifying each capture), then removes the
        localSID and tears the interfaces down.

        :param sid_list: segment list used by the test
        :param test_sid_index: index in sid_list of the localSID under test
        :param rewrite_src_addr: value passed as ``src`` to the localSID
        :param intf_config: keyword arguments for setup_interfaces()
        :param nh_attr: name of the pg1 attribute used as ``nh`` in the
            localSID CLI, or None to omit ``nh``
        :param make_encap_header: callable(sidlist=, segleft=) building the
            header of the pg0->pg1 packets
        :param make_plain_header: callable() building the header of the
            returning pg1->pg0 packets
        :param compare_out: compare function for packets captured on pg1
        :param compare_in: compare function for packets captured on pg0
        """
        # read back by the compare_rx_tx_packet_End_AS_*_in functions
        self.rewrite_src_addr = rewrite_src_addr
        self.rewrite_sid_list = sid_list[test_sid_index + 1 :]

        # send traffic to one destination interface
        self.setup_interfaces(**intf_config)

        # configure route to next segment
        # restored -- TODO confirm: the ``self`` and ``128`` arguments
        route = VppIpRoute(
            self,
            sid_list[test_sid_index + 1],
            128,
            [VppRoutePath(self.pg0.remote_ip6, self.pg0.sw_if_index)],
        )
        route.add_vpp_config()

        # configure SRv6 localSID behavior
        # restored -- TODO confirm: only the leading literal, the SID, the
        # nh address and the src address were visible; the keywords
        # " behavior end.as", " nh ", " oif ", " iif ", " src " and the use
        # of pg1.name were elided
        cli_str = "sr localsid address " + sid_list[test_sid_index] + " behavior end.as"
        if nh_attr is not None:
            cli_str += " nh " + getattr(self.pg1, nh_attr)
        cli_str += (
            " oif "
            + self.pg1.name
            + " iif "
            + self.pg1.name
            + " src "
            + self.rewrite_src_addr
        )
        for s in self.rewrite_sid_list:
            cli_str += " next " + s
        self.vapi.cli(cli_str)

        # log the localsids
        self.logger.debug(self.vapi.cli("show sr localsid"))

        # send one packet per packet size
        count = len(self.pg_packet_sizes)

        # prepare encapsulated headers and generate packets (pg0->pg1)
        packet_header1 = make_encap_header(
            sidlist=sid_list[::-1], segleft=len(sid_list) - test_sid_index - 1
        )
        pkts1 = self.create_stream(
            self.pg0, self.pg1, packet_header1, self.pg_packet_sizes, count
        )

        # send packets and verify received packets
        self.send_and_verify_pkts(self.pg0, pkts1, self.pg1, compare_out)

        # log the localsid counters
        self.logger.info(self.vapi.cli("show sr localsid"))

        # prepare plain header and generate returning packets (pg1->pg0)
        packet_header2 = make_plain_header()
        pkts2 = self.create_stream(
            self.pg1, self.pg0, packet_header2, self.pg_packet_sizes, count
        )

        # send packets and verify received packets
        self.send_and_verify_pkts(self.pg1, pkts2, self.pg0, compare_in)

        # log the localsid counters
        self.logger.info(self.vapi.cli("show sr localsid"))

        # remove SRv6 localSIDs
        self.vapi.cli("sr localsid del address " + sid_list[test_sid_index])

        # cleanup interfaces
        self.teardown_interfaces()

    def run_SRv6_End_AS_L2(self, sid_list, test_sid_index, rewrite_src_addr):
        """Run SRv6 End.AS test with L2 traffic."""
        self._run_SRv6_End_AS(
            sid_list,
            test_sid_index,
            rewrite_src_addr,
            # pg0 is IPv6-enabled, pg1 has no IP configuration
            intf_config={"ipv6": [True, False]},
            # the L2 localSID is configured without a next-hop address
            nh_attr=None,
            # vlan is left at its default of 0 (the original passed vlan=0)
            make_encap_header=self.create_packet_header_IPv6_SRH_L2,
            make_plain_header=self.create_packet_header_L2,
            compare_out=self.compare_rx_tx_packet_End_AS_L2_out,
            compare_in=self.compare_rx_tx_packet_End_AS_L2_in,
        )

    def run_SRv6_End_AS_IPv6(self, sid_list, test_sid_index, rewrite_src_addr):
        """Run SRv6 End.AS test with IPv6 traffic."""
        self._run_SRv6_End_AS(
            sid_list,
            test_sid_index,
            rewrite_src_addr,
            # source and destination interfaces are IPv6 only
            intf_config={"ipv6": [True, True]},
            nh_attr="remote_ip6",
            make_encap_header=self.create_packet_header_IPv6_SRH_IPv6,
            make_plain_header=self.create_packet_header_IPv6,
            compare_out=self.compare_rx_tx_packet_End_AS_IPv6_out,
            compare_in=self.compare_rx_tx_packet_End_AS_IPv6_in,
        )

    def run_SRv6_End_AS_IPv4(self, sid_list, test_sid_index, rewrite_src_addr):
        """Run SRv6 End.AS test with IPv4 traffic."""
        self._run_SRv6_End_AS(
            sid_list,
            test_sid_index,
            rewrite_src_addr,
            # pg0 is IPv6 + IPv4, pg1 is IPv4 only
            intf_config={"ipv6": [True, False], "ipv4": [True, True]},
            nh_attr="remote_ip4",
            make_encap_header=self.create_packet_header_IPv6_SRH_IPv4,
            make_plain_header=self.create_packet_header_IPv4,
            compare_out=self.compare_rx_tx_packet_End_AS_IPv4_out,
            compare_in=self.compare_rx_tx_packet_End_AS_IPv4_in,
        )

    def _verify_rewritten_outer_header(self, rx_pkt):
        """Check the outer IPv6 header (and SRH, if expected) of rx_pkt.

        Shared by the compare_rx_tx_packet_End_AS_*_in functions.

        :param rx_pkt: received packet
        :returns: tuple (rx_ip, rx_srh, payload); rx_srh is None when no
            SRH is expected, payload is whatever follows the SRH (or the
            outer IPv6 header when there is no SRH)
        """
        # get first (outer) IPv6 header of rx'ed packet
        rx_ip = rx_pkt.getlayer(IPv6)

        # expected segment-list (SRH order)
        tx_seglist = self.rewrite_sid_list[::-1]

        # received ip.src should be equal to SR Policy source
        self.assertEqual(rx_ip.src, self.rewrite_src_addr)
        # received ip.dst should be equal to expected sidlist[lastentry]
        self.assertEqual(rx_ip.dst, tx_seglist[-1])

        if len(tx_seglist) > 1:
            # rx'ed packet should have SRH
            self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
            rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
            # rx'ed seglist should be equal to expected seglist
            self.assertEqual(rx_srh.addresses, tx_seglist)
            # segleft should be equal to size expected seglist-1
            self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
            # segleft should be equal to lastentry
            self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
            return rx_ip, rx_srh, rx_srh.payload

        # rx'ed packet should NOT have SRH
        self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
        return rx_ip, None, rx_ip.payload

    def compare_rx_tx_packet_End_AS_IPv6_in(self, tx_pkt, rx_pkt):
        """Compare input and output packet after passing End.AS

        Used for the returning (pg1->pg0) IPv6 stream. Note that the
        hop limit of tx_pkt's IPv6 layer is decremented in place.

        :param tx_pkt: transmitted packet
        :param rx_pkt: received packet
        """
        tx_ip = tx_pkt.getlayer(IPv6)

        _, _, payload = self._verify_rewritten_outer_header(rx_pkt)

        # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
        # except for the hop-limit field
        # -> update tx'ed hlim to the expected hlim
        tx_ip.hlim = tx_ip.hlim - 1

        self.assertEqual(payload, tx_ip)

        self.logger.debug("packet verification: SUCCESS")

    def compare_rx_tx_packet_End_AS_IPv4_in(self, tx_pkt, rx_pkt):
        """Compare input and output packet after passing End.AS

        Used for the returning (pg1->pg0) IPv4 stream. Note that the ttl
        and checksum of tx_pkt's IPv4 layer are modified in place.

        :param tx_pkt: transmitted packet
        :param rx_pkt: received packet
        """
        tx_ip = tx_pkt.getlayer(IP)

        rx_ip, rx_srh, payload = self._verify_rewritten_outer_header(rx_pkt)
        if rx_srh is not None:
            # rx'ed packet with SRH should also carry the IPv4 header
            self.assertTrue(rx_ip.payload.haslayer(IP))

        # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
        # except for the ttl field and ip checksum
        # -> adjust tx'ed ttl to expected ttl
        tx_ip.ttl = tx_ip.ttl - 1
        # -> set tx'ed ip checksum to None and let scapy recompute
        # restored -- TODO confirm: statement elided, rebuilt from the
        # comment above
        tx_ip.chksum = None
        # rebuild the packet from its raw bytes to force computing these
        # fields; probably other ways to accomplish this are possible
        tx_ip = IP(scapy.compat.raw(tx_ip))

        self.assertEqual(payload, tx_ip)

        self.logger.debug("packet verification: SUCCESS")

    def compare_rx_tx_packet_End_AS_L2_in(self, tx_pkt, rx_pkt):
        """Compare input and output packet after passing End.AS

        Used for the returning (pg1->pg0) L2 stream.

        :param tx_pkt: transmitted packet
        :param rx_pkt: received packet
        """
        tx_ether = tx_pkt.getlayer(Ether)

        _, rx_srh, payload = self._verify_rewritten_outer_header(rx_pkt)
        if rx_srh is not None:
            # nh should be 143, the IANA protocol number for Ethernet
            # (the earlier comment called it "No Next Header", which is 59)
            self.assertEqual(rx_srh.nh, 143)

        # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
        self.assertEqual(Ether(scapy.compat.raw(payload)), tx_ether)

        self.logger.debug("packet verification: SUCCESS")

    def compare_rx_tx_packet_End_AS_IPv6_out(self, tx_pkt, rx_pkt):
        """Compare input and output packet after passing End.AS with IPv6

        Used for the pg0->pg1 stream. Note that the hop limit of
        tx_pkt's inner IPv6 layer is decremented in place.

        :param tx_pkt: transmitted packet
        :param rx_pkt: received packet
        """
        # get first (outer) IPv6 header of rx'ed packet
        rx_ip = rx_pkt.getlayer(IPv6)

        # second IPv6 layer of the tx'ed packet, i.e. the inner header
        tx_ip2 = tx_pkt.getlayer(IPv6, 2)

        # verify if rx'ed packet has no SRH
        self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))

        # the whole rx_ip pkt should be equal to tx_ip2
        # except for the hlim field
        # -> adjust tx'ed hlim to expected hlim
        tx_ip2.hlim = tx_ip2.hlim - 1

        self.assertEqual(rx_ip, tx_ip2)

        self.logger.debug("packet verification: SUCCESS")

    def compare_rx_tx_packet_End_AS_IPv4_out(self, tx_pkt, rx_pkt):
        """Compare input and output packet after passing End.AS with IPv4

        Used for the pg0->pg1 stream. Note that the ttl and checksum of
        tx_pkt's IPv4 layer are modified in place.

        :param tx_pkt: transmitted packet
        :param rx_pkt: received packet
        """
        # get IPv4 header of rx'ed packet
        rx_ip = rx_pkt.getlayer(IP)

        tx_ip2 = tx_pkt.getlayer(IP)

        # verify if rx'ed packet has no SRH
        self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))

        # the whole rx_ip pkt should be equal to tx_ip2
        # except for the ttl field and ip checksum
        # -> adjust tx'ed ttl to expected ttl
        tx_ip2.ttl = tx_ip2.ttl - 1
        # -> set tx'ed ip checksum to None and let scapy recompute
        # restored -- TODO confirm: statement elided, rebuilt from the
        # comment above
        tx_ip2.chksum = None
        # rebuild the packet from its raw bytes to force computing these
        # fields; probably other ways to accomplish this are possible
        tx_ip2 = IP(scapy.compat.raw(tx_ip2))

        self.assertEqual(rx_ip, tx_ip2)

        self.logger.debug("packet verification: SUCCESS")

    def compare_rx_tx_packet_End_AS_L2_out(self, tx_pkt, rx_pkt):
        """Compare input and output packet after passing End.AS with L2

        Used for the pg0->pg1 stream.

        :param tx_pkt: transmitted packet
        :param rx_pkt: received packet
        """
        # get Ethernet header of rx'ed packet
        rx_eth = rx_pkt.getlayer(Ether)

        # we can't just get the 2nd Ether layer
        # get the Raw content and dissect it as Ether
        tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw]))

        # verify if rx'ed packet has no SRH
        self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))

        # the whole rx_eth pkt should be equal to tx_eth1
        self.assertEqual(rx_eth, tx_eth1)

        self.logger.debug("packet verification: SUCCESS")

    def create_stream(self, src_if, dst_if, packet_header, packet_sizes, count):
        """Create SRv6 input packet stream for defined interface.

        :param VppInterface src_if: Interface to create packet stream for
        :param VppInterface dst_if: destination interface of packet stream
        :param packet_header: Layer3 scapy packet headers,
                L2 is added when not provided,
                Raw(payload) with packet_info is added
        :param list packet_sizes: packet sizes, applied cyclically to the
                packets in the stream
        :param int count: loop bound; see the note in the body
        :return: list of packets
        """
        self.logger.info("Creating packets")
        pkts = []  # restored
        # NOTE(review): this iterates count - 1 times, so one packet fewer
        # than `count` is created; with count == len(packet_sizes), as
        # passed by the callers in this class, the last size is never used.
        # Kept unchanged because fixing it alters the traffic sent --
        # confirm whether it is intentional.
        for i in range(0, count - 1):
            payload_info = self.create_packet_info(src_if, dst_if)
            self.logger.debug("Creating packet with index %d" % (payload_info.index))
            payload = self.info_to_payload(payload_info)
            # add L2 header if not yet provided in packet_header
            if packet_header.getlayer(0).name == "Ethernet":
                p = packet_header / Raw(payload)
            else:
                # restored -- TODO confirm: only the Ether(...) operand was
                # visible in this branch
                p = (
                    Ether(dst=src_if.local_mac, src=src_if.remote_mac)
                    / packet_header
                    / Raw(payload)
                )
            size = packet_sizes[i % len(packet_sizes)]
            self.logger.debug("Packet size %d" % (size))
            self.extend_packet(p, size)
            # we need to store the packet with the automatic fields computed:
            # rebuild it from its raw bytes to force computing these fields
            # (probably other ways are possible)
            p = Ether(scapy.compat.raw(p))
            payload_info.data = p.copy()
            self.logger.debug(ppp("Created packet:", p))
            pkts.append(p)  # restored
        self.logger.info("Done creating packets")
        return pkts  # restored

    def send_and_verify_pkts(self, input, pkts, output, compare_func):
        """Send packets and verify received packets using compare_func

        :param input: ingress interface of DUT
        :param pkts: list of packets to transmit
        :param output: egress interface of DUT
        :param compare_func: function to compare in and out packets
        """
        # add traffic stream to input interface
        input.add_stream(pkts)

        # enable capture on all interfaces
        self.pg_enable_capture(self.pg_interfaces)

        # start traffic
        self.logger.info("Starting traffic")
        # restored -- TODO confirm: the call that starts the traffic was elided
        self.pg_start()

        # get output capture
        self.logger.info("Getting packet capture")
        capture = output.get_capture()

        # assert nothing was captured on input interface
        # input.assert_nothing_captured()

        # verify captured packets
        self.verify_captured_pkts(output, capture, compare_func)

    def create_packet_header_IPv6(self):
        """Create packet header: IPv6 header, UDP header

        IPv6 source address is 1234::1
        IPv6 destination address is 4321::1
        UDP source port and destination port are 1234
        """
        return IPv6(src="1234::1", dst="4321::1") / UDP(sport=1234, dport=1234)

    def create_packet_header_IPv6_SRH_IPv6(self, sidlist, segleft):
        """Create packet header: IPv6 encapsulated in SRv6:
        IPv6 header with SRH, IPv6 header, UDP header

        :param list sidlist: segment list of outer IPv6 SRH
        :param int segleft: segments-left field of outer IPv6 SRH

        Outer IPv6 source address is set to 5678::1
        Outer IPv6 destination address is set to sidlist[segleft]
        IPv6 source addresses is 1234::1
        IPv6 destination address is 4321::1
        UDP source port and destination port are 1234
        """
        return (
            IPv6(src="5678::1", dst=sidlist[segleft])
            / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=41)
            / IPv6(src="1234::1", dst="4321::1")
            / UDP(sport=1234, dport=1234)
        )

    def create_packet_header_IPv4(self):
        """Create packet header: IPv4 header, UDP header

        IPv4 source address is 123.1.1.1
        IPv4 destination address is 124.1.1.1
        UDP source port and destination port are 1234
        """
        return IP(src="123.1.1.1", dst="124.1.1.1") / UDP(sport=1234, dport=1234)

    def create_packet_header_IPv6_SRH_IPv4(self, sidlist, segleft):
        """Create packet header: IPv4 encapsulated in SRv6:
        IPv6 header with SRH, IPv4 header, UDP header

        :param list sidlist: segment list of outer IPv6 SRH
        :param int segleft: segments-left field of outer IPv6 SRH

        Outer IPv6 destination address is set to sidlist[segleft]
        IPv6 source address is 1234::1
        IPv4 source address is 123.1.1.1
        IPv4 destination address is 124.1.1.1
        UDP source port and destination port are 1234
        """
        return (
            IPv6(src="1234::1", dst=sidlist[segleft])
            / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=4)
            / IP(src="123.1.1.1", dst="124.1.1.1")
            / UDP(sport=1234, dport=1234)
        )

    def create_packet_header_L2(self, vlan=0):
        """Create packet header: L2 header

        :param vlan: if vlan!=0 then add 802.1q header
        """
        # NOTE(review): the original comment said the dst addr
        # ('00:55:44:33:22:11') is used by compare_rx_tx_packet_T_Encaps_L2
        # to detect L2 in the SRH payload; no such function exists in this
        # class, so it is presumably a leftover -- confirm.
        p = Ether(src="00:11:22:33:44:55", dst="00:55:44:33:22:11")
        # restored -- TODO confirm: the etype value and the if/else around
        # the Dot1Q layer were elided; only the Dot1Q(...) line was visible
        etype = 0x8137  # IPX
        if vlan:
            # add 802.1q layer
            p /= Dot1Q(vlan=vlan, type=etype)
        else:
            p.type = etype
        return p

    def create_packet_header_IPv6_SRH_L2(self, sidlist, segleft, vlan=0):
        """Create packet header: L2 encapsulated in SRv6:
        IPv6 header with SRH, L2

        :param list sidlist: segment list of outer IPv6 SRH
        :param int segleft: segments-left field of outer IPv6 SRH
        :param vlan: L2 vlan; if vlan!=0 then add 802.1q header

        Outer IPv6 destination address is set to sidlist[segleft]
        IPv6 source address is 1234::1
        """
        # the inner frame uses the same Ether/Dot1Q construction as
        # create_packet_header_L2, so build it there instead of duplicating
        eth = self.create_packet_header_L2(vlan)

        return (
            IPv6(src="1234::1", dst=sidlist[segleft])
            / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=143)
            / eth  # restored -- TODO confirm
        )

    def get_payload_info(self, packet):
        """Extract the payload_info from the packet"""
        # in most cases, payload_info is in packet[Raw]
        # but packet[Raw] gives the complete payload
        # (incl L2 header) when an L2 frame is carried as payload
        # restored -- TODO confirm: the try/except framing was elided; the
        # handler is written as `except Exception` because the exception
        # raised by payload_to_info() is not visible here
        try:
            return self.payload_to_info(packet[Raw])
        except Exception:
            # remove L2 header from packet[Raw]:
            # take packet[Raw], convert it to an Ether layer
            # and then extract Raw from it
            return self.payload_to_info(Ether(scapy.compat.raw(packet[Raw]))[Raw])

    def verify_captured_pkts(self, dst_if, capture, compare_func):
        """
        Verify captured packet stream for specified interface.
        Compare ingress with egress packets using the specified compare fn

        :param dst_if: egress interface of DUT
        :param capture: captured packets
        :param compare_func: function to compare in and out packet
        """
        self.logger.info(  # restored -- TODO confirm log level
            "Verifying capture on interface %s using function %s"
            % (dst_if.name, compare_func.__name__)
        )

        # last packet_info seen per source interface, keyed by sw_if_index
        last_info = {i.sw_if_index: None for i in self.pg_interfaces}
        dst_sw_if_index = dst_if.sw_if_index

        for packet in capture:
            # restored -- TODO confirm: the try/except framing was elided
            try:
                # extract payload_info from packet's payload
                payload_info = self.get_payload_info(packet)
                packet_index = payload_info.index

                self.logger.debug("Verifying packet with index %d" % (packet_index))
                # packet should have arrived on the expected interface
                self.assertEqual(payload_info.dst, dst_sw_if_index)
                self.logger.debug(
                    "Got packet on interface %s: src=%u (idx=%u)"
                    % (dst_if.name, payload_info.src, packet_index)
                )

                # search for payload_info with same src and dst if_index
                # this will give us the transmitted packet
                next_info = self.get_next_packet_info_for_interface2(
                    payload_info.src, dst_sw_if_index, last_info[payload_info.src]
                )
                last_info[payload_info.src] = next_info
                # next_info should not be None
                self.assertTrue(next_info is not None)
                # index of tx and rx packets should be equal
                self.assertEqual(packet_index, next_info.index)
                # data field of next_info contains the tx packet
                txed_packet = next_info.data

                self.logger.debug(
                    ppp("Transmitted packet:", txed_packet)
                )  # ppp=Pretty Print Packet

                self.logger.debug(ppp("Received packet:", packet))

                # compare rcvd packet with expected packet using compare_func
                compare_func(txed_packet, packet)

            except Exception:
                # log the offending packet, then let the failure propagate
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise

        # have all expected packets arrived?
        for i in self.pg_interfaces:
            remaining_packet = self.get_next_packet_info_for_interface2(
                i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index]
            )
            self.assertTrue(
                remaining_packet is None,
                "Interface %s: Packet expected from interface %s "
                "didn't arrive" % (dst_if.name, i.name),
            )
if __name__ == "__main__":
    # Executed directly: run this module's tests with VppTestRunner.
    unittest.main(testRunner=VppTestRunner)