tests: Add support for getting corefile patterns on FreeBSD
[vpp.git] / test / test_srv6_ad.py
1 #!/usr/bin/env python3
2
3 import unittest
4
5 from framework import VppTestCase
6 from asfframework import VppTestRunner
7 from vpp_ip import DpoProto
8 from vpp_ip_route import VppIpRoute, VppRoutePath
9
10
11 import scapy.compat
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether, Dot1Q
14 from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
15 from scapy.layers.inet import IP, UDP
16
17 from util import ppp
18
19
20 class TestSRv6Ad(VppTestCase):
21     """SRv6 Dynamic Proxy plugin Test Case"""
22
23     @classmethod
24     def setUpClass(self):
25         super(TestSRv6Ad, self).setUpClass()
26
27     @classmethod
28     def tearDownClass(cls):
29         super(TestSRv6Ad, cls).tearDownClass()
30
31     def setUp(self):
32         """Perform test setup before each test case."""
33         super(TestSRv6Ad, self).setUp()
34
35         # packet sizes, inclusive L2 overhead
36         self.pg_packet_sizes = [64, 512, 1518, 9018]
37
38         # reset packet_infos
39         self.reset_packet_infos()
40
41     def tearDown(self):
42         """Clean up test setup after each test case."""
43         self.teardown_interfaces()
44
45         super(TestSRv6Ad, self).tearDown()
46
47     def configure_interface(
48         self, interface, ipv6=False, ipv4=False, ipv6_table_id=0, ipv4_table_id=0
49     ):
50         """Configure interface.
51         :param ipv6: configure IPv6 on interface
52         :param ipv4: configure IPv4 on interface
53         :param ipv6_table_id: FIB table_id for IPv6
54         :param ipv4_table_id: FIB table_id for IPv4
55         """
56         self.logger.debug("Configuring interface %s" % (interface.name))
57         if ipv6:
58             self.logger.debug("Configuring IPv6")
59             interface.set_table_ip6(ipv6_table_id)
60             interface.config_ip6()
61             interface.resolve_ndp(timeout=5)
62         if ipv4:
63             self.logger.debug("Configuring IPv4")
64             interface.set_table_ip4(ipv4_table_id)
65             interface.config_ip4()
66             interface.resolve_arp()
67         interface.admin_up()
68
69     def setup_interfaces(self, ipv6=[], ipv4=[], ipv6_table_id=[], ipv4_table_id=[]):
70         """Create and configure interfaces.
71
72         :param ipv6: list of interface IPv6 capabilities
73         :param ipv4: list of interface IPv4 capabilities
74         :param ipv6_table_id: list of intf IPv6 FIB table_ids
75         :param ipv4_table_id: list of intf IPv4 FIB table_ids
76         :returns: List of created interfaces.
77         """
78         # how many interfaces?
79         if len(ipv6):
80             count = len(ipv6)
81         else:
82             count = len(ipv4)
83         self.logger.debug("Creating and configuring %d interfaces" % (count))
84
85         # fill up ipv6 and ipv4 lists if needed
86         # not enabled (False) is the default
87         if len(ipv6) < count:
88             ipv6 += (count - len(ipv6)) * [False]
89         if len(ipv4) < count:
90             ipv4 += (count - len(ipv4)) * [False]
91
92         # fill up table_id lists if needed
93         # table_id 0 (global) is the default
94         if len(ipv6_table_id) < count:
95             ipv6_table_id += (count - len(ipv6_table_id)) * [0]
96         if len(ipv4_table_id) < count:
97             ipv4_table_id += (count - len(ipv4_table_id)) * [0]
98
99         # create 'count' pg interfaces
100         self.create_pg_interfaces(range(count))
101
102         # setup all interfaces
103         for i in range(count):
104             intf = self.pg_interfaces[i]
105             self.configure_interface(
106                 intf, ipv6[i], ipv4[i], ipv6_table_id[i], ipv4_table_id[i]
107             )
108
109         if any(ipv6):
110             self.logger.debug(self.vapi.cli("show ip6 neighbors"))
111         if any(ipv4):
112             self.logger.debug(self.vapi.cli("show ip4 neighbors"))
113         self.logger.debug(self.vapi.cli("show interface"))
114         self.logger.debug(self.vapi.cli("show hardware"))
115
116         return self.pg_interfaces
117
118     def teardown_interfaces(self):
119         """Unconfigure and bring down interface."""
120         self.logger.debug("Tearing down interfaces")
121         # tear down all interfaces
122         # AFAIK they cannot be deleted
123         for i in self.pg_interfaces:
124             self.logger.debug("Tear down interface %s" % (i.name))
125             i.admin_down()
126             i.unconfig()
127             i.set_table_ip4(0)
128             i.set_table_ip6(0)
129
130     def test_SRv6_End_AD_IPv6(self):
131         """Test SRv6 End.AD behavior with IPv6 traffic."""
132         self.src_addr = "a0::"
133         self.sid_list = ["a1::", "a2::a6", "a3::"]
134         self.test_sid_index = 1
135
136         # send traffic to one destination interface
137         # source and destination interfaces are IPv6 only
138         self.setup_interfaces(ipv6=[True, True])
139
140         # configure route to next segment
141         route = VppIpRoute(
142             self,
143             self.sid_list[self.test_sid_index + 1],
144             128,
145             [
146                 VppRoutePath(
147                     self.pg0.remote_ip6,
148                     self.pg0.sw_if_index,
149                     proto=DpoProto.DPO_PROTO_IP6,
150                 )
151             ],
152         )
153         route.add_vpp_config()
154
155         # configure SRv6 localSID behavior
156         cli_str = (
157             "sr localsid address "
158             + self.sid_list[self.test_sid_index]
159             + " behavior end.ad"
160             + " nh "
161             + self.pg1.remote_ip6
162             + " oif "
163             + self.pg1.name
164             + " iif "
165             + self.pg1.name
166         )
167         self.vapi.cli(cli_str)
168
169         # log the localsids
170         self.logger.debug(self.vapi.cli("show sr localsid"))
171
172         # send one packet per packet size
173         count = len(self.pg_packet_sizes)
174
175         # prepare IPv6 in SRv6 headers
176         packet_header1 = self.create_packet_header_IPv6_SRH_IPv6(
177             srcaddr=self.src_addr,
178             sidlist=self.sid_list[::-1],
179             segleft=len(self.sid_list) - self.test_sid_index - 1,
180         )
181
182         # generate packets (pg0->pg1)
183         pkts1 = self.create_stream(
184             self.pg0, self.pg1, packet_header1, self.pg_packet_sizes, count
185         )
186
187         # send packets and verify received packets
188         self.send_and_verify_pkts(
189             self.pg0, pkts1, self.pg1, self.compare_rx_tx_packet_End_AD_IPv6_out
190         )
191
192         # log the localsid counters
193         self.logger.info(self.vapi.cli("show sr localsid"))
194
195         # prepare IPv6 header for returning packets
196         packet_header2 = self.create_packet_header_IPv6()
197
198         # generate returning packets (pg1->pg0)
199         pkts2 = self.create_stream(
200             self.pg1, self.pg0, packet_header2, self.pg_packet_sizes, count
201         )
202
203         # send packets and verify received packets
204         self.send_and_verify_pkts(
205             self.pg1, pkts2, self.pg0, self.compare_rx_tx_packet_End_AD_IPv6_in
206         )
207
208         # log the localsid counters
209         self.logger.info(self.vapi.cli("show sr localsid"))
210
211         # remove SRv6 localSIDs
212         cli_str = "sr localsid del address " + self.sid_list[self.test_sid_index]
213         self.vapi.cli(cli_str)
214
215         # cleanup interfaces
216         self.teardown_interfaces()
217
218     def compare_rx_tx_packet_End_AD_IPv6_out(self, tx_pkt, rx_pkt):
219         """Compare input and output packet after passing End.AD with IPv6
220
221         :param tx_pkt: transmitted packet
222         :param rx_pkt: received packet
223         """
224
225         # get first (outer) IPv6 header of rx'ed packet
226         rx_ip = rx_pkt.getlayer(IPv6)
227
228         tx_ip = tx_pkt.getlayer(IPv6)
229         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
230
231         # verify if rx'ed packet has no SRH
232         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
233
234         # the whole rx_ip pkt should be equal to tx_ip2
235         # except for the hlim field
236         #   -> adjust tx'ed hlim to expected hlim
237         tx_ip2.hlim = tx_ip2.hlim - 1
238
239         self.assertEqual(rx_ip, tx_ip2)
240
241         self.logger.debug("packet verification: SUCCESS")
242
243     def compare_rx_tx_packet_End_AD_IPv6_in(self, tx_pkt, rx_pkt):
244         """Compare input and output packet after passing End.AD
245
246         :param tx_pkt: transmitted packet
247         :param rx_pkt: received packet
248         """
249
250         # get first (outer) IPv6 header of rx'ed packet
251         rx_ip = rx_pkt.getlayer(IPv6)
252         # received ip.src should be equal to SR Policy source
253         self.assertEqual(rx_ip.src, self.src_addr)
254         # received ip.dst should be equal to expected sidlist next segment
255         self.assertEqual(rx_ip.dst, self.sid_list[self.test_sid_index + 1])
256
257         # rx'ed packet should have SRH
258         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
259
260         # get SRH
261         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
262         # rx'ed seglist should be equal to SID-list in reversed order
263         self.assertEqual(rx_srh.addresses, self.sid_list[::-1])
264         # segleft should be equal to previous segleft value minus 1
265         self.assertEqual(rx_srh.segleft, len(self.sid_list) - self.test_sid_index - 2)
266         # lastentry should be equal to the SID-list length minus 1
267         self.assertEqual(rx_srh.lastentry, len(self.sid_list) - 1)
268
269         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
270         # except for the hop-limit field
271         tx_ip = tx_pkt.getlayer(IPv6)
272         #   -> update tx'ed hlim to the expected hlim
273         tx_ip.hlim -= 1
274         #   -> check payload
275         self.assertEqual(rx_srh.payload, tx_ip)
276
277         self.logger.debug("packet verification: SUCCESS")
278
279     def test_SRv6_End_AD_IPv4(self):
280         """Test SRv6 End.AD behavior with IPv4 traffic."""
281         self.src_addr = "a0::"
282         self.sid_list = ["a1::", "a2::a4", "a3::"]
283         self.test_sid_index = 1
284
285         # send traffic to one destination interface
286         # source and destination interfaces are IPv6 only
287         self.setup_interfaces(ipv6=[True, False], ipv4=[False, True])
288
289         # configure route to next segment
290         route = VppIpRoute(
291             self,
292             self.sid_list[self.test_sid_index + 1],
293             128,
294             [
295                 VppRoutePath(
296                     self.pg0.remote_ip6,
297                     self.pg0.sw_if_index,
298                     proto=DpoProto.DPO_PROTO_IP6,
299                 )
300             ],
301         )
302         route.add_vpp_config()
303
304         # configure SRv6 localSID behavior
305         cli_str = (
306             "sr localsid address "
307             + self.sid_list[self.test_sid_index]
308             + " behavior end.ad"
309             + " nh "
310             + self.pg1.remote_ip4
311             + " oif "
312             + self.pg1.name
313             + " iif "
314             + self.pg1.name
315         )
316         self.vapi.cli(cli_str)
317
318         # log the localsids
319         self.logger.debug(self.vapi.cli("show sr localsid"))
320
321         # send one packet per packet size
322         count = len(self.pg_packet_sizes)
323
324         # prepare IPv4 in SRv6 headers
325         packet_header1 = self.create_packet_header_IPv6_SRH_IPv4(
326             srcaddr=self.src_addr,
327             sidlist=self.sid_list[::-1],
328             segleft=len(self.sid_list) - self.test_sid_index - 1,
329         )
330
331         # generate packets (pg0->pg1)
332         pkts1 = self.create_stream(
333             self.pg0, self.pg1, packet_header1, self.pg_packet_sizes, count
334         )
335
336         # send packets and verify received packets
337         self.send_and_verify_pkts(
338             self.pg0, pkts1, self.pg1, self.compare_rx_tx_packet_End_AD_IPv4_out
339         )
340
341         # log the localsid counters
342         self.logger.info(self.vapi.cli("show sr localsid"))
343
344         # prepare IPv6 header for returning packets
345         packet_header2 = self.create_packet_header_IPv4()
346
347         # generate returning packets (pg1->pg0)
348         pkts2 = self.create_stream(
349             self.pg1, self.pg0, packet_header2, self.pg_packet_sizes, count
350         )
351
352         # send packets and verify received packets
353         self.send_and_verify_pkts(
354             self.pg1, pkts2, self.pg0, self.compare_rx_tx_packet_End_AD_IPv4_in
355         )
356
357         # log the localsid counters
358         self.logger.info(self.vapi.cli("show sr localsid"))
359
360         # remove SRv6 localSIDs
361         cli_str = "sr localsid del address " + self.sid_list[self.test_sid_index]
362         self.vapi.cli(cli_str)
363
364         # cleanup interfaces
365         self.teardown_interfaces()
366
367     def compare_rx_tx_packet_End_AD_IPv4_out(self, tx_pkt, rx_pkt):
368         """Compare input and output packet after passing End.AD with IPv4
369
370         :param tx_pkt: transmitted packet
371         :param rx_pkt: received packet
372         """
373
374         # get IPv4 header of rx'ed packet
375         rx_ip = rx_pkt.getlayer(IP)
376
377         tx_ip = tx_pkt.getlayer(IPv6)
378         tx_ip2 = tx_pkt.getlayer(IP)
379
380         # verify if rx'ed packet has no SRH
381         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
382
383         # the whole rx_ip pkt should be equal to tx_ip2
384         # except for the ttl field and ip checksum
385         #   -> adjust tx'ed ttl to expected ttl
386         tx_ip2.ttl = tx_ip2.ttl - 1
387         #   -> set tx'ed ip checksum to None and let scapy recompute
388         tx_ip2.chksum = None
389         # read back the pkt (with str()) to force computing these fields
390         # probably other ways to accomplish this are possible
391         tx_ip2 = IP(scapy.compat.raw(tx_ip2))
392
393         self.assertEqual(rx_ip, tx_ip2)
394
395         self.logger.debug("packet verification: SUCCESS")
396
397     def compare_rx_tx_packet_End_AD_IPv4_in(self, tx_pkt, rx_pkt):
398         """Compare input and output packet after passing End.AD
399
400         :param tx_pkt: transmitted packet
401         :param rx_pkt: received packet
402         """
403
404         # get first (outer) IPv6 header of rx'ed packet
405         rx_ip = rx_pkt.getlayer(IPv6)
406         # received ip.src should be equal to SR Policy source
407         self.assertEqual(rx_ip.src, self.src_addr)
408         # received ip.dst should be equal to expected sidlist next segment
409         self.assertEqual(rx_ip.dst, self.sid_list[self.test_sid_index + 1])
410
411         # rx'ed packet should have SRH
412         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
413
414         # get SRH
415         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
416         # rx'ed seglist should be equal to SID-list in reversed order
417         self.assertEqual(rx_srh.addresses, self.sid_list[::-1])
418         # segleft should be equal to previous segleft value minus 1
419         self.assertEqual(rx_srh.segleft, len(self.sid_list) - self.test_sid_index - 2)
420         # lastentry should be equal to the SID-list length minus 1
421         self.assertEqual(rx_srh.lastentry, len(self.sid_list) - 1)
422
423         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
424         # except for the ttl field and ip checksum
425         tx_ip = tx_pkt.getlayer(IP)
426         #   -> adjust tx'ed ttl to expected ttl
427         tx_ip.ttl = tx_ip.ttl - 1
428         #   -> set tx'ed ip checksum to None and let scapy recompute
429         tx_ip.chksum = None
430         #   -> read back the pkt (with str()) to force computing these fields
431         # probably other ways to accomplish this are possible
432         self.assertEqual(rx_srh.payload, IP(scapy.compat.raw(tx_ip)))
433
434         self.logger.debug("packet verification: SUCCESS")
435
436     def test_SRv6_End_AD_L2(self):
437         """Test SRv6 End.AD behavior with L2 traffic."""
438         self.src_addr = "a0::"
439         self.sid_list = ["a1::", "a2::a4", "a3::"]
440         self.test_sid_index = 1
441
442         # send traffic to one destination interface
443         # source and destination interfaces are IPv6 only
444         self.setup_interfaces(ipv6=[True, False])
445
446         # configure route to next segment
447         route = VppIpRoute(
448             self,
449             self.sid_list[self.test_sid_index + 1],
450             128,
451             [
452                 VppRoutePath(
453                     self.pg0.remote_ip6,
454                     self.pg0.sw_if_index,
455                     proto=DpoProto.DPO_PROTO_IP6,
456                 )
457             ],
458         )
459         route.add_vpp_config()
460
461         # configure SRv6 localSID behavior
462         cli_str = (
463             "sr localsid address "
464             + self.sid_list[self.test_sid_index]
465             + " behavior end.ad"
466             + " oif "
467             + self.pg1.name
468             + " iif "
469             + self.pg1.name
470         )
471         self.vapi.cli(cli_str)
472
473         # log the localsids
474         self.logger.debug(self.vapi.cli("show sr localsid"))
475
476         # send one packet per packet size
477         count = len(self.pg_packet_sizes)
478
479         # prepare L2 in SRv6 headers
480         packet_header1 = self.create_packet_header_IPv6_SRH_L2(
481             srcaddr=self.src_addr,
482             sidlist=self.sid_list[::-1],
483             segleft=len(self.sid_list) - self.test_sid_index - 1,
484             vlan=0,
485         )
486
487         # generate packets (pg0->pg1)
488         pkts1 = self.create_stream(
489             self.pg0, self.pg1, packet_header1, self.pg_packet_sizes, count
490         )
491
492         # send packets and verify received packets
493         self.send_and_verify_pkts(
494             self.pg0, pkts1, self.pg1, self.compare_rx_tx_packet_End_AD_L2_out
495         )
496
497         # log the localsid counters
498         self.logger.info(self.vapi.cli("show sr localsid"))
499
500         # prepare L2 header for returning packets
501         packet_header2 = self.create_packet_header_L2()
502
503         # generate returning packets (pg1->pg0)
504         pkts2 = self.create_stream(
505             self.pg1, self.pg0, packet_header2, self.pg_packet_sizes, count
506         )
507
508         # send packets and verify received packets
509         self.send_and_verify_pkts(
510             self.pg1, pkts2, self.pg0, self.compare_rx_tx_packet_End_AD_L2_in
511         )
512
513         # log the localsid counters
514         self.logger.info(self.vapi.cli("show sr localsid"))
515
516         # remove SRv6 localSIDs
517         cli_str = "sr localsid del address " + self.sid_list[self.test_sid_index]
518         self.vapi.cli(cli_str)
519
520         # cleanup interfaces
521         self.teardown_interfaces()
522
523     def compare_rx_tx_packet_End_AD_L2_out(self, tx_pkt, rx_pkt):
524         """Compare input and output packet after passing End.AD with L2
525
526         :param tx_pkt: transmitted packet
527         :param rx_pkt: received packet
528         """
529
530         # get IPv4 header of rx'ed packet
531         rx_eth = rx_pkt.getlayer(Ether)
532
533         tx_ip = tx_pkt.getlayer(IPv6)
534         # we can't just get the 2nd Ether layer
535         # get the Raw content and dissect it as Ether
536         tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw]))
537
538         # verify if rx'ed packet has no SRH
539         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
540
541         # the whole rx_eth pkt should be equal to tx_eth1
542         self.assertEqual(rx_eth, tx_eth1)
543
544         self.logger.debug("packet verification: SUCCESS")
545
546     def compare_rx_tx_packet_End_AD_L2_in(self, tx_pkt, rx_pkt):
547         """Compare input and output packet after passing End.AD
548
549         :param tx_pkt: transmitted packet
550         :param rx_pkt: received packet
551         """
552
553         ####
554         # get first (outer) IPv6 header of rx'ed packet
555         rx_ip = rx_pkt.getlayer(IPv6)
556         # received ip.src should be equal to SR Policy source
557         self.assertEqual(rx_ip.src, self.src_addr)
558         # received ip.dst should be equal to expected sidlist next segment
559         self.assertEqual(rx_ip.dst, self.sid_list[self.test_sid_index + 1])
560
561         # rx'ed packet should have SRH
562         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
563
564         # get SRH
565         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
566         # rx'ed seglist should be equal to SID-list in reversed order
567         self.assertEqual(rx_srh.addresses, self.sid_list[::-1])
568         # segleft should be equal to previous segleft value minus 1
569         self.assertEqual(rx_srh.segleft, len(self.sid_list) - self.test_sid_index - 2)
570         # lastentry should be equal to the SID-list length minus 1
571         self.assertEqual(rx_srh.lastentry, len(self.sid_list) - 1)
572
573         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
574         tx_ether = tx_pkt.getlayer(Ether)
575         self.assertEqual(Ether(scapy.compat.raw(rx_srh.payload)), tx_ether)
576
577         self.logger.debug("packet verification: SUCCESS")
578
579     def create_stream(self, src_if, dst_if, packet_header, packet_sizes, count):
580         """Create SRv6 input packet stream for defined interface.
581
582         :param VppInterface src_if: Interface to create packet stream for
583         :param VppInterface dst_if: destination interface of packet stream
584         :param packet_header: Layer3 scapy packet headers,
585                 L2 is added when not provided,
586                 Raw(payload) with packet_info is added
587         :param list packet_sizes: packet stream pckt sizes,sequentially applied
588                to packets in stream have
589         :param int count: number of packets in packet stream
590         :return: list of packets
591         """
592         self.logger.info("Creating packets")
593         pkts = []
594         for i in range(0, count - 1):
595             payload_info = self.create_packet_info(src_if, dst_if)
596             self.logger.debug("Creating packet with index %d" % (payload_info.index))
597             payload = self.info_to_payload(payload_info)
598             # add L2 header if not yet provided in packet_header
599             if packet_header.getlayer(0).name == "Ethernet":
600                 p = packet_header / Raw(payload)
601             else:
602                 p = (
603                     Ether(dst=src_if.local_mac, src=src_if.remote_mac)
604                     / packet_header
605                     / Raw(payload)
606                 )
607             size = packet_sizes[i % len(packet_sizes)]
608             self.logger.debug("Packet size %d" % (size))
609             self.extend_packet(p, size)
610             # we need to store the packet with the automatic fields computed
611             # read back the dumped packet (with str())
612             # to force computing these fields
613             # probably other ways are possible
614             p = Ether(scapy.compat.raw(p))
615             payload_info.data = p.copy()
616             self.logger.debug(ppp("Created packet:", p))
617             pkts.append(p)
618         self.logger.info("Done creating packets")
619         return pkts
620
621     def send_and_verify_pkts(self, input, pkts, output, compare_func):
622         """Send packets and verify received packets using compare_func
623
624         :param input: ingress interface of DUT
625         :param pkts: list of packets to transmit
626         :param output: egress interface of DUT
627         :param compare_func: function to compare in and out packets
628         """
629         # add traffic stream to input interface
630         input.add_stream(pkts)
631
632         # enable capture on all interfaces
633         self.pg_enable_capture(self.pg_interfaces)
634
635         # start traffic
636         self.logger.info("Starting traffic")
637         self.pg_start()
638
639         # get output capture
640         self.logger.info("Getting packet capture")
641         capture = output.get_capture()
642
643         # assert nothing was captured on input interface
644         # input.assert_nothing_captured()
645
646         # verify captured packets
647         self.verify_captured_pkts(output, capture, compare_func)
648
649     def create_packet_header_IPv6(self):
650         """Create packet header: IPv6 header, UDP header
651
652         :param dst: IPv6 destination address
653
654         IPv6 source address is 1234::1
655         IPv6 destination address is 4321::1
656         UDP source port and destination port are 1234
657         """
658
659         p = IPv6(src="1234::1", dst="4321::1") / UDP(sport=1234, dport=1234)
660         return p
661
662     def create_packet_header_IPv6_SRH_IPv6(self, srcaddr, sidlist, segleft):
663         """Create packet header: IPv6 encapsulated in SRv6:
664         IPv6 header with SRH, IPv6 header, UDP header
665
666         :param int srcaddr: outer source address
667         :param list sidlist: segment list of outer IPv6 SRH
668         :param int segleft: segments-left field of outer IPv6 SRH
669
670         Outer IPv6 source address is set to srcaddr
671         Outer IPv6 destination address is set to sidlist[segleft]
672         Inner IPv6 source addresses is 1234::1
673         Inner IPv6 destination address is 4321::1
674         UDP source port and destination port are 1234
675         """
676
677         p = (
678             IPv6(src=srcaddr, dst=sidlist[segleft])
679             / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=41)
680             / IPv6(src="1234::1", dst="4321::1")
681             / UDP(sport=1234, dport=1234)
682         )
683         return p
684
685     def create_packet_header_IPv4(self):
686         """Create packet header: IPv4 header, UDP header
687
688         :param dst: IPv4 destination address
689
690         IPv4 source address is 123.1.1.1
691         IPv4 destination address is 124.1.1.1
692         UDP source port and destination port are 1234
693         """
694
695         p = IP(src="123.1.1.1", dst="124.1.1.1") / UDP(sport=1234, dport=1234)
696         return p
697
698     def create_packet_header_IPv6_SRH_IPv4(self, srcaddr, sidlist, segleft):
699         """Create packet header: IPv4 encapsulated in SRv6:
700         IPv6 header with SRH, IPv4 header, UDP header
701
702         :param int srcaddr: outer source address
703         :param list sidlist: segment list of outer IPv6 SRH
704         :param int segleft: segments-left field of outer IPv6 SRH
705
706         Outer IPv6 source address is set to srcaddr
707         Outer IPv6 destination address is set to sidlist[segleft]
708         Inner IPv4 source address is 123.1.1.1
709         Inner IPv4 destination address is 124.1.1.1
710         UDP source port and destination port are 1234
711         """
712
713         p = (
714             IPv6(src=srcaddr, dst=sidlist[segleft])
715             / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=4)
716             / IP(src="123.1.1.1", dst="124.1.1.1")
717             / UDP(sport=1234, dport=1234)
718         )
719         return p
720
721     def create_packet_header_L2(self, vlan=0):
722         """Create packet header: L2 header
723
724         :param vlan: if vlan!=0 then add 802.1q header
725         """
726         # Note: the dst addr ('00:55:44:33:22:11') is used in
727         # the compare function compare_rx_tx_packet_T_Encaps_L2
728         # to detect presence of L2 in SRH payload
729         p = Ether(src="00:11:22:33:44:55", dst="00:55:44:33:22:11")
730         etype = 0x8137  # IPX
731         if vlan:
732             # add 802.1q layer
733             p /= Dot1Q(vlan=vlan, type=etype)
734         else:
735             p.type = etype
736         return p
737
738     def create_packet_header_IPv6_SRH_L2(self, srcaddr, sidlist, segleft, vlan=0):
739         """Create packet header: L2 encapsulated in SRv6:
740         IPv6 header with SRH, L2
741
742         :param int srcaddr: IPv6 source address
743         :param list sidlist: segment list of outer IPv6 SRH
744         :param int segleft: segments-left field of outer IPv6 SRH
745         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
746
747         IPv6 source address is set to srcaddr
748         IPv6 destination address is set to sidlist[segleft]
749         """
750         eth = Ether(src="00:11:22:33:44:55", dst="00:55:44:33:22:11")
751         etype = 0x8137  # IPX
752         if vlan:
753             # add 802.1q layer
754             eth /= Dot1Q(vlan=vlan, type=etype)
755         else:
756             eth.type = etype
757
758         p = (
759             IPv6(src=srcaddr, dst=sidlist[segleft])
760             / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=143)
761             / eth
762         )
763         return p
764
765     def get_payload_info(self, packet):
766         """Extract the payload_info from the packet"""
767         # in most cases, payload_info is in packet[Raw]
768         # but packet[Raw] gives the complete payload
769         # (incl L2 header) for the T.Encaps L2 case
770         try:
771             payload_info = self.payload_to_info(packet[Raw])
772
773         except:
774             # remote L2 header from packet[Raw]:
775             # take packet[Raw], convert it to an Ether layer
776             # and then extract Raw from it
777             payload_info = self.payload_to_info(
778                 Ether(scapy.compat.raw(packet[Raw]))[Raw]
779             )
780
781         return payload_info
782
783     def verify_captured_pkts(self, dst_if, capture, compare_func):
784         """
785         Verify captured packet stream for specified interface.
786         Compare ingress with egress packets using the specified compare fn
787
788         :param dst_if: egress interface of DUT
789         :param capture: captured packets
790         :param compare_func: function to compare in and out packet
791         """
792         self.logger.info(
793             "Verifying capture on interface %s using function %s"
794             % (dst_if.name, compare_func.__name__)
795         )
796
797         last_info = dict()
798         for i in self.pg_interfaces:
799             last_info[i.sw_if_index] = None
800         dst_sw_if_index = dst_if.sw_if_index
801
802         for packet in capture:
803             try:
804                 # extract payload_info from packet's payload
805                 payload_info = self.get_payload_info(packet)
806                 packet_index = payload_info.index
807
808                 self.logger.debug("Verifying packet with index %d" % (packet_index))
809                 # packet should have arrived on the expected interface
810                 self.assertEqual(payload_info.dst, dst_sw_if_index)
811                 self.logger.debug(
812                     "Got packet on interface %s: src=%u (idx=%u)"
813                     % (dst_if.name, payload_info.src, packet_index)
814                 )
815
816                 # search for payload_info with same src and dst if_index
817                 # this will give us the transmitted packet
818                 next_info = self.get_next_packet_info_for_interface2(
819                     payload_info.src, dst_sw_if_index, last_info[payload_info.src]
820                 )
821                 last_info[payload_info.src] = next_info
822                 # next_info should not be None
823                 self.assertTrue(next_info is not None)
824                 # index of tx and rx packets should be equal
825                 self.assertEqual(packet_index, next_info.index)
826                 # data field of next_info contains the tx packet
827                 txed_packet = next_info.data
828
829                 self.logger.debug(
830                     ppp("Transmitted packet:", txed_packet)
831                 )  # ppp=Pretty Print Packet
832
833                 self.logger.debug(ppp("Received packet:", packet))
834
835                 # compare rcvd packet with expected packet using compare_func
836                 compare_func(txed_packet, packet)
837
838             except:
839                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
840                 raise
841
842         # have all expected packets arrived?
843         for i in self.pg_interfaces:
844             remaining_packet = self.get_next_packet_info_for_interface2(
845                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index]
846             )
847             self.assertTrue(
848                 remaining_packet is None,
849                 "Interface %s: Packet expected from interface %s "
850                 "didn't arrive" % (dst_if.name, i.name),
851             )
852
853
854 if __name__ == "__main__":
855     unittest.main(testRunner=VppTestRunner)