tests: replace pycodestyle with black
[vpp.git] / test / test_srv6_ad.py
1 #!/usr/bin/env python3
2
3 import unittest
4 import binascii
5 from socket import AF_INET6
6
7 from framework import VppTestCase, VppTestRunner
8 from vpp_ip import DpoProto
9 from vpp_ip_route import VppIpRoute, VppRoutePath, VppIpTable
10 from vpp_srv6 import (
11     SRv6LocalSIDBehaviors,
12     VppSRv6LocalSID,
13     VppSRv6Policy,
14     SRv6PolicyType,
15     VppSRv6Steering,
16     SRv6PolicySteeringTypes,
17 )
18
19 import scapy.compat
20 from scapy.packet import Raw
21 from scapy.layers.l2 import Ether, Dot1Q
22 from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
23 from scapy.layers.inet import IP, UDP
24
25 from util import ppp
26
27
28 class TestSRv6Ad(VppTestCase):
29     """SRv6 Dynamic Proxy plugin Test Case"""
30
31     @classmethod
32     def setUpClass(self):
33         super(TestSRv6Ad, self).setUpClass()
34
35     @classmethod
36     def tearDownClass(cls):
37         super(TestSRv6Ad, cls).tearDownClass()
38
39     def setUp(self):
40         """Perform test setup before each test case."""
41         super(TestSRv6Ad, self).setUp()
42
43         # packet sizes, inclusive L2 overhead
44         self.pg_packet_sizes = [64, 512, 1518, 9018]
45
46         # reset packet_infos
47         self.reset_packet_infos()
48
49     def tearDown(self):
50         """Clean up test setup after each test case."""
51         self.teardown_interfaces()
52
53         super(TestSRv6Ad, self).tearDown()
54
55     def configure_interface(
56         self, interface, ipv6=False, ipv4=False, ipv6_table_id=0, ipv4_table_id=0
57     ):
58         """Configure interface.
59         :param ipv6: configure IPv6 on interface
60         :param ipv4: configure IPv4 on interface
61         :param ipv6_table_id: FIB table_id for IPv6
62         :param ipv4_table_id: FIB table_id for IPv4
63         """
64         self.logger.debug("Configuring interface %s" % (interface.name))
65         if ipv6:
66             self.logger.debug("Configuring IPv6")
67             interface.set_table_ip6(ipv6_table_id)
68             interface.config_ip6()
69             interface.resolve_ndp(timeout=5)
70         if ipv4:
71             self.logger.debug("Configuring IPv4")
72             interface.set_table_ip4(ipv4_table_id)
73             interface.config_ip4()
74             interface.resolve_arp()
75         interface.admin_up()
76
77     def setup_interfaces(self, ipv6=[], ipv4=[], ipv6_table_id=[], ipv4_table_id=[]):
78         """Create and configure interfaces.
79
80         :param ipv6: list of interface IPv6 capabilities
81         :param ipv4: list of interface IPv4 capabilities
82         :param ipv6_table_id: list of intf IPv6 FIB table_ids
83         :param ipv4_table_id: list of intf IPv4 FIB table_ids
84         :returns: List of created interfaces.
85         """
86         # how many interfaces?
87         if len(ipv6):
88             count = len(ipv6)
89         else:
90             count = len(ipv4)
91         self.logger.debug("Creating and configuring %d interfaces" % (count))
92
93         # fill up ipv6 and ipv4 lists if needed
94         # not enabled (False) is the default
95         if len(ipv6) < count:
96             ipv6 += (count - len(ipv6)) * [False]
97         if len(ipv4) < count:
98             ipv4 += (count - len(ipv4)) * [False]
99
100         # fill up table_id lists if needed
101         # table_id 0 (global) is the default
102         if len(ipv6_table_id) < count:
103             ipv6_table_id += (count - len(ipv6_table_id)) * [0]
104         if len(ipv4_table_id) < count:
105             ipv4_table_id += (count - len(ipv4_table_id)) * [0]
106
107         # create 'count' pg interfaces
108         self.create_pg_interfaces(range(count))
109
110         # setup all interfaces
111         for i in range(count):
112             intf = self.pg_interfaces[i]
113             self.configure_interface(
114                 intf, ipv6[i], ipv4[i], ipv6_table_id[i], ipv4_table_id[i]
115             )
116
117         if any(ipv6):
118             self.logger.debug(self.vapi.cli("show ip6 neighbors"))
119         if any(ipv4):
120             self.logger.debug(self.vapi.cli("show ip4 neighbors"))
121         self.logger.debug(self.vapi.cli("show interface"))
122         self.logger.debug(self.vapi.cli("show hardware"))
123
124         return self.pg_interfaces
125
126     def teardown_interfaces(self):
127         """Unconfigure and bring down interface."""
128         self.logger.debug("Tearing down interfaces")
129         # tear down all interfaces
130         # AFAIK they cannot be deleted
131         for i in self.pg_interfaces:
132             self.logger.debug("Tear down interface %s" % (i.name))
133             i.admin_down()
134             i.unconfig()
135             i.set_table_ip4(0)
136             i.set_table_ip6(0)
137
138     def test_SRv6_End_AD_IPv6(self):
139         """Test SRv6 End.AD behavior with IPv6 traffic."""
140         self.src_addr = "a0::"
141         self.sid_list = ["a1::", "a2::a6", "a3::"]
142         self.test_sid_index = 1
143
144         # send traffic to one destination interface
145         # source and destination interfaces are IPv6 only
146         self.setup_interfaces(ipv6=[True, True])
147
148         # configure route to next segment
149         route = VppIpRoute(
150             self,
151             self.sid_list[self.test_sid_index + 1],
152             128,
153             [
154                 VppRoutePath(
155                     self.pg0.remote_ip6,
156                     self.pg0.sw_if_index,
157                     proto=DpoProto.DPO_PROTO_IP6,
158                 )
159             ],
160         )
161         route.add_vpp_config()
162
163         # configure SRv6 localSID behavior
164         cli_str = (
165             "sr localsid address "
166             + self.sid_list[self.test_sid_index]
167             + " behavior end.ad"
168             + " nh "
169             + self.pg1.remote_ip6
170             + " oif "
171             + self.pg1.name
172             + " iif "
173             + self.pg1.name
174         )
175         self.vapi.cli(cli_str)
176
177         # log the localsids
178         self.logger.debug(self.vapi.cli("show sr localsid"))
179
180         # send one packet per packet size
181         count = len(self.pg_packet_sizes)
182
183         # prepare IPv6 in SRv6 headers
184         packet_header1 = self.create_packet_header_IPv6_SRH_IPv6(
185             srcaddr=self.src_addr,
186             sidlist=self.sid_list[::-1],
187             segleft=len(self.sid_list) - self.test_sid_index - 1,
188         )
189
190         # generate packets (pg0->pg1)
191         pkts1 = self.create_stream(
192             self.pg0, self.pg1, packet_header1, self.pg_packet_sizes, count
193         )
194
195         # send packets and verify received packets
196         self.send_and_verify_pkts(
197             self.pg0, pkts1, self.pg1, self.compare_rx_tx_packet_End_AD_IPv6_out
198         )
199
200         # log the localsid counters
201         self.logger.info(self.vapi.cli("show sr localsid"))
202
203         # prepare IPv6 header for returning packets
204         packet_header2 = self.create_packet_header_IPv6()
205
206         # generate returning packets (pg1->pg0)
207         pkts2 = self.create_stream(
208             self.pg1, self.pg0, packet_header2, self.pg_packet_sizes, count
209         )
210
211         # send packets and verify received packets
212         self.send_and_verify_pkts(
213             self.pg1, pkts2, self.pg0, self.compare_rx_tx_packet_End_AD_IPv6_in
214         )
215
216         # log the localsid counters
217         self.logger.info(self.vapi.cli("show sr localsid"))
218
219         # remove SRv6 localSIDs
220         cli_str = "sr localsid del address " + self.sid_list[self.test_sid_index]
221         self.vapi.cli(cli_str)
222
223         # cleanup interfaces
224         self.teardown_interfaces()
225
226     def compare_rx_tx_packet_End_AD_IPv6_out(self, tx_pkt, rx_pkt):
227         """Compare input and output packet after passing End.AD with IPv6
228
229         :param tx_pkt: transmitted packet
230         :param rx_pkt: received packet
231         """
232
233         # get first (outer) IPv6 header of rx'ed packet
234         rx_ip = rx_pkt.getlayer(IPv6)
235
236         tx_ip = tx_pkt.getlayer(IPv6)
237         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
238
239         # verify if rx'ed packet has no SRH
240         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
241
242         # the whole rx_ip pkt should be equal to tx_ip2
243         # except for the hlim field
244         #   -> adjust tx'ed hlim to expected hlim
245         tx_ip2.hlim = tx_ip2.hlim - 1
246
247         self.assertEqual(rx_ip, tx_ip2)
248
249         self.logger.debug("packet verification: SUCCESS")
250
251     def compare_rx_tx_packet_End_AD_IPv6_in(self, tx_pkt, rx_pkt):
252         """Compare input and output packet after passing End.AD
253
254         :param tx_pkt: transmitted packet
255         :param rx_pkt: received packet
256         """
257
258         # get first (outer) IPv6 header of rx'ed packet
259         rx_ip = rx_pkt.getlayer(IPv6)
260         # received ip.src should be equal to SR Policy source
261         self.assertEqual(rx_ip.src, self.src_addr)
262         # received ip.dst should be equal to expected sidlist next segment
263         self.assertEqual(rx_ip.dst, self.sid_list[self.test_sid_index + 1])
264
265         # rx'ed packet should have SRH
266         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
267
268         # get SRH
269         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
270         # rx'ed seglist should be equal to SID-list in reversed order
271         self.assertEqual(rx_srh.addresses, self.sid_list[::-1])
272         # segleft should be equal to previous segleft value minus 1
273         self.assertEqual(rx_srh.segleft, len(self.sid_list) - self.test_sid_index - 2)
274         # lastentry should be equal to the SID-list length minus 1
275         self.assertEqual(rx_srh.lastentry, len(self.sid_list) - 1)
276
277         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
278         # except for the hop-limit field
279         tx_ip = tx_pkt.getlayer(IPv6)
280         #   -> update tx'ed hlim to the expected hlim
281         tx_ip.hlim -= 1
282         #   -> check payload
283         self.assertEqual(rx_srh.payload, tx_ip)
284
285         self.logger.debug("packet verification: SUCCESS")
286
287     def test_SRv6_End_AD_IPv4(self):
288         """Test SRv6 End.AD behavior with IPv4 traffic."""
289         self.src_addr = "a0::"
290         self.sid_list = ["a1::", "a2::a4", "a3::"]
291         self.test_sid_index = 1
292
293         # send traffic to one destination interface
294         # source and destination interfaces are IPv6 only
295         self.setup_interfaces(ipv6=[True, False], ipv4=[False, True])
296
297         # configure route to next segment
298         route = VppIpRoute(
299             self,
300             self.sid_list[self.test_sid_index + 1],
301             128,
302             [
303                 VppRoutePath(
304                     self.pg0.remote_ip6,
305                     self.pg0.sw_if_index,
306                     proto=DpoProto.DPO_PROTO_IP6,
307                 )
308             ],
309         )
310         route.add_vpp_config()
311
312         # configure SRv6 localSID behavior
313         cli_str = (
314             "sr localsid address "
315             + self.sid_list[self.test_sid_index]
316             + " behavior end.ad"
317             + " nh "
318             + self.pg1.remote_ip4
319             + " oif "
320             + self.pg1.name
321             + " iif "
322             + self.pg1.name
323         )
324         self.vapi.cli(cli_str)
325
326         # log the localsids
327         self.logger.debug(self.vapi.cli("show sr localsid"))
328
329         # send one packet per packet size
330         count = len(self.pg_packet_sizes)
331
332         # prepare IPv4 in SRv6 headers
333         packet_header1 = self.create_packet_header_IPv6_SRH_IPv4(
334             srcaddr=self.src_addr,
335             sidlist=self.sid_list[::-1],
336             segleft=len(self.sid_list) - self.test_sid_index - 1,
337         )
338
339         # generate packets (pg0->pg1)
340         pkts1 = self.create_stream(
341             self.pg0, self.pg1, packet_header1, self.pg_packet_sizes, count
342         )
343
344         # send packets and verify received packets
345         self.send_and_verify_pkts(
346             self.pg0, pkts1, self.pg1, self.compare_rx_tx_packet_End_AD_IPv4_out
347         )
348
349         # log the localsid counters
350         self.logger.info(self.vapi.cli("show sr localsid"))
351
352         # prepare IPv6 header for returning packets
353         packet_header2 = self.create_packet_header_IPv4()
354
355         # generate returning packets (pg1->pg0)
356         pkts2 = self.create_stream(
357             self.pg1, self.pg0, packet_header2, self.pg_packet_sizes, count
358         )
359
360         # send packets and verify received packets
361         self.send_and_verify_pkts(
362             self.pg1, pkts2, self.pg0, self.compare_rx_tx_packet_End_AD_IPv4_in
363         )
364
365         # log the localsid counters
366         self.logger.info(self.vapi.cli("show sr localsid"))
367
368         # remove SRv6 localSIDs
369         cli_str = "sr localsid del address " + self.sid_list[self.test_sid_index]
370         self.vapi.cli(cli_str)
371
372         # cleanup interfaces
373         self.teardown_interfaces()
374
375     def compare_rx_tx_packet_End_AD_IPv4_out(self, tx_pkt, rx_pkt):
376         """Compare input and output packet after passing End.AD with IPv4
377
378         :param tx_pkt: transmitted packet
379         :param rx_pkt: received packet
380         """
381
382         # get IPv4 header of rx'ed packet
383         rx_ip = rx_pkt.getlayer(IP)
384
385         tx_ip = tx_pkt.getlayer(IPv6)
386         tx_ip2 = tx_pkt.getlayer(IP)
387
388         # verify if rx'ed packet has no SRH
389         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
390
391         # the whole rx_ip pkt should be equal to tx_ip2
392         # except for the ttl field and ip checksum
393         #   -> adjust tx'ed ttl to expected ttl
394         tx_ip2.ttl = tx_ip2.ttl - 1
395         #   -> set tx'ed ip checksum to None and let scapy recompute
396         tx_ip2.chksum = None
397         # read back the pkt (with str()) to force computing these fields
398         # probably other ways to accomplish this are possible
399         tx_ip2 = IP(scapy.compat.raw(tx_ip2))
400
401         self.assertEqual(rx_ip, tx_ip2)
402
403         self.logger.debug("packet verification: SUCCESS")
404
405     def compare_rx_tx_packet_End_AD_IPv4_in(self, tx_pkt, rx_pkt):
406         """Compare input and output packet after passing End.AD
407
408         :param tx_pkt: transmitted packet
409         :param rx_pkt: received packet
410         """
411
412         # get first (outer) IPv6 header of rx'ed packet
413         rx_ip = rx_pkt.getlayer(IPv6)
414         # received ip.src should be equal to SR Policy source
415         self.assertEqual(rx_ip.src, self.src_addr)
416         # received ip.dst should be equal to expected sidlist next segment
417         self.assertEqual(rx_ip.dst, self.sid_list[self.test_sid_index + 1])
418
419         # rx'ed packet should have SRH
420         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
421
422         # get SRH
423         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
424         # rx'ed seglist should be equal to SID-list in reversed order
425         self.assertEqual(rx_srh.addresses, self.sid_list[::-1])
426         # segleft should be equal to previous segleft value minus 1
427         self.assertEqual(rx_srh.segleft, len(self.sid_list) - self.test_sid_index - 2)
428         # lastentry should be equal to the SID-list length minus 1
429         self.assertEqual(rx_srh.lastentry, len(self.sid_list) - 1)
430
431         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
432         # except for the ttl field and ip checksum
433         tx_ip = tx_pkt.getlayer(IP)
434         #   -> adjust tx'ed ttl to expected ttl
435         tx_ip.ttl = tx_ip.ttl - 1
436         #   -> set tx'ed ip checksum to None and let scapy recompute
437         tx_ip.chksum = None
438         #   -> read back the pkt (with str()) to force computing these fields
439         # probably other ways to accomplish this are possible
440         self.assertEqual(rx_srh.payload, IP(scapy.compat.raw(tx_ip)))
441
442         self.logger.debug("packet verification: SUCCESS")
443
444     def test_SRv6_End_AD_L2(self):
445         """Test SRv6 End.AD behavior with L2 traffic."""
446         self.src_addr = "a0::"
447         self.sid_list = ["a1::", "a2::a4", "a3::"]
448         self.test_sid_index = 1
449
450         # send traffic to one destination interface
451         # source and destination interfaces are IPv6 only
452         self.setup_interfaces(ipv6=[True, False])
453
454         # configure route to next segment
455         route = VppIpRoute(
456             self,
457             self.sid_list[self.test_sid_index + 1],
458             128,
459             [
460                 VppRoutePath(
461                     self.pg0.remote_ip6,
462                     self.pg0.sw_if_index,
463                     proto=DpoProto.DPO_PROTO_IP6,
464                 )
465             ],
466         )
467         route.add_vpp_config()
468
469         # configure SRv6 localSID behavior
470         cli_str = (
471             "sr localsid address "
472             + self.sid_list[self.test_sid_index]
473             + " behavior end.ad"
474             + " oif "
475             + self.pg1.name
476             + " iif "
477             + self.pg1.name
478         )
479         self.vapi.cli(cli_str)
480
481         # log the localsids
482         self.logger.debug(self.vapi.cli("show sr localsid"))
483
484         # send one packet per packet size
485         count = len(self.pg_packet_sizes)
486
487         # prepare L2 in SRv6 headers
488         packet_header1 = self.create_packet_header_IPv6_SRH_L2(
489             srcaddr=self.src_addr,
490             sidlist=self.sid_list[::-1],
491             segleft=len(self.sid_list) - self.test_sid_index - 1,
492             vlan=0,
493         )
494
495         # generate packets (pg0->pg1)
496         pkts1 = self.create_stream(
497             self.pg0, self.pg1, packet_header1, self.pg_packet_sizes, count
498         )
499
500         # send packets and verify received packets
501         self.send_and_verify_pkts(
502             self.pg0, pkts1, self.pg1, self.compare_rx_tx_packet_End_AD_L2_out
503         )
504
505         # log the localsid counters
506         self.logger.info(self.vapi.cli("show sr localsid"))
507
508         # prepare L2 header for returning packets
509         packet_header2 = self.create_packet_header_L2()
510
511         # generate returning packets (pg1->pg0)
512         pkts2 = self.create_stream(
513             self.pg1, self.pg0, packet_header2, self.pg_packet_sizes, count
514         )
515
516         # send packets and verify received packets
517         self.send_and_verify_pkts(
518             self.pg1, pkts2, self.pg0, self.compare_rx_tx_packet_End_AD_L2_in
519         )
520
521         # log the localsid counters
522         self.logger.info(self.vapi.cli("show sr localsid"))
523
524         # remove SRv6 localSIDs
525         cli_str = "sr localsid del address " + self.sid_list[self.test_sid_index]
526         self.vapi.cli(cli_str)
527
528         # cleanup interfaces
529         self.teardown_interfaces()
530
531     def compare_rx_tx_packet_End_AD_L2_out(self, tx_pkt, rx_pkt):
532         """Compare input and output packet after passing End.AD with L2
533
534         :param tx_pkt: transmitted packet
535         :param rx_pkt: received packet
536         """
537
538         # get IPv4 header of rx'ed packet
539         rx_eth = rx_pkt.getlayer(Ether)
540
541         tx_ip = tx_pkt.getlayer(IPv6)
542         # we can't just get the 2nd Ether layer
543         # get the Raw content and dissect it as Ether
544         tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw]))
545
546         # verify if rx'ed packet has no SRH
547         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
548
549         # the whole rx_eth pkt should be equal to tx_eth1
550         self.assertEqual(rx_eth, tx_eth1)
551
552         self.logger.debug("packet verification: SUCCESS")
553
554     def compare_rx_tx_packet_End_AD_L2_in(self, tx_pkt, rx_pkt):
555         """Compare input and output packet after passing End.AD
556
557         :param tx_pkt: transmitted packet
558         :param rx_pkt: received packet
559         """
560
561         ####
562         # get first (outer) IPv6 header of rx'ed packet
563         rx_ip = rx_pkt.getlayer(IPv6)
564         # received ip.src should be equal to SR Policy source
565         self.assertEqual(rx_ip.src, self.src_addr)
566         # received ip.dst should be equal to expected sidlist next segment
567         self.assertEqual(rx_ip.dst, self.sid_list[self.test_sid_index + 1])
568
569         # rx'ed packet should have SRH
570         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
571
572         # get SRH
573         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
574         # rx'ed seglist should be equal to SID-list in reversed order
575         self.assertEqual(rx_srh.addresses, self.sid_list[::-1])
576         # segleft should be equal to previous segleft value minus 1
577         self.assertEqual(rx_srh.segleft, len(self.sid_list) - self.test_sid_index - 2)
578         # lastentry should be equal to the SID-list length minus 1
579         self.assertEqual(rx_srh.lastentry, len(self.sid_list) - 1)
580
581         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
582         tx_ether = tx_pkt.getlayer(Ether)
583         self.assertEqual(Ether(scapy.compat.raw(rx_srh.payload)), tx_ether)
584
585         self.logger.debug("packet verification: SUCCESS")
586
587     def create_stream(self, src_if, dst_if, packet_header, packet_sizes, count):
588         """Create SRv6 input packet stream for defined interface.
589
590         :param VppInterface src_if: Interface to create packet stream for
591         :param VppInterface dst_if: destination interface of packet stream
592         :param packet_header: Layer3 scapy packet headers,
593                 L2 is added when not provided,
594                 Raw(payload) with packet_info is added
595         :param list packet_sizes: packet stream pckt sizes,sequentially applied
596                to packets in stream have
597         :param int count: number of packets in packet stream
598         :return: list of packets
599         """
600         self.logger.info("Creating packets")
601         pkts = []
602         for i in range(0, count - 1):
603             payload_info = self.create_packet_info(src_if, dst_if)
604             self.logger.debug("Creating packet with index %d" % (payload_info.index))
605             payload = self.info_to_payload(payload_info)
606             # add L2 header if not yet provided in packet_header
607             if packet_header.getlayer(0).name == "Ethernet":
608                 p = packet_header / Raw(payload)
609             else:
610                 p = (
611                     Ether(dst=src_if.local_mac, src=src_if.remote_mac)
612                     / packet_header
613                     / Raw(payload)
614                 )
615             size = packet_sizes[i % len(packet_sizes)]
616             self.logger.debug("Packet size %d" % (size))
617             self.extend_packet(p, size)
618             # we need to store the packet with the automatic fields computed
619             # read back the dumped packet (with str())
620             # to force computing these fields
621             # probably other ways are possible
622             p = Ether(scapy.compat.raw(p))
623             payload_info.data = p.copy()
624             self.logger.debug(ppp("Created packet:", p))
625             pkts.append(p)
626         self.logger.info("Done creating packets")
627         return pkts
628
629     def send_and_verify_pkts(self, input, pkts, output, compare_func):
630         """Send packets and verify received packets using compare_func
631
632         :param input: ingress interface of DUT
633         :param pkts: list of packets to transmit
634         :param output: egress interface of DUT
635         :param compare_func: function to compare in and out packets
636         """
637         # add traffic stream to input interface
638         input.add_stream(pkts)
639
640         # enable capture on all interfaces
641         self.pg_enable_capture(self.pg_interfaces)
642
643         # start traffic
644         self.logger.info("Starting traffic")
645         self.pg_start()
646
647         # get output capture
648         self.logger.info("Getting packet capture")
649         capture = output.get_capture()
650
651         # assert nothing was captured on input interface
652         # input.assert_nothing_captured()
653
654         # verify captured packets
655         self.verify_captured_pkts(output, capture, compare_func)
656
657     def create_packet_header_IPv6(self):
658         """Create packet header: IPv6 header, UDP header
659
660         :param dst: IPv6 destination address
661
662         IPv6 source address is 1234::1
663         IPv6 destination address is 4321::1
664         UDP source port and destination port are 1234
665         """
666
667         p = IPv6(src="1234::1", dst="4321::1") / UDP(sport=1234, dport=1234)
668         return p
669
670     def create_packet_header_IPv6_SRH_IPv6(self, srcaddr, sidlist, segleft):
671         """Create packet header: IPv6 encapsulated in SRv6:
672         IPv6 header with SRH, IPv6 header, UDP header
673
674         :param int srcaddr: outer source address
675         :param list sidlist: segment list of outer IPv6 SRH
676         :param int segleft: segments-left field of outer IPv6 SRH
677
678         Outer IPv6 source address is set to srcaddr
679         Outer IPv6 destination address is set to sidlist[segleft]
680         Inner IPv6 source addresses is 1234::1
681         Inner IPv6 destination address is 4321::1
682         UDP source port and destination port are 1234
683         """
684
685         p = (
686             IPv6(src=srcaddr, dst=sidlist[segleft])
687             / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=41)
688             / IPv6(src="1234::1", dst="4321::1")
689             / UDP(sport=1234, dport=1234)
690         )
691         return p
692
693     def create_packet_header_IPv4(self):
694         """Create packet header: IPv4 header, UDP header
695
696         :param dst: IPv4 destination address
697
698         IPv4 source address is 123.1.1.1
699         IPv4 destination address is 124.1.1.1
700         UDP source port and destination port are 1234
701         """
702
703         p = IP(src="123.1.1.1", dst="124.1.1.1") / UDP(sport=1234, dport=1234)
704         return p
705
706     def create_packet_header_IPv6_SRH_IPv4(self, srcaddr, sidlist, segleft):
707         """Create packet header: IPv4 encapsulated in SRv6:
708         IPv6 header with SRH, IPv4 header, UDP header
709
710         :param int srcaddr: outer source address
711         :param list sidlist: segment list of outer IPv6 SRH
712         :param int segleft: segments-left field of outer IPv6 SRH
713
714         Outer IPv6 source address is set to srcaddr
715         Outer IPv6 destination address is set to sidlist[segleft]
716         Inner IPv4 source address is 123.1.1.1
717         Inner IPv4 destination address is 124.1.1.1
718         UDP source port and destination port are 1234
719         """
720
721         p = (
722             IPv6(src=srcaddr, dst=sidlist[segleft])
723             / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=4)
724             / IP(src="123.1.1.1", dst="124.1.1.1")
725             / UDP(sport=1234, dport=1234)
726         )
727         return p
728
729     def create_packet_header_L2(self, vlan=0):
730         """Create packet header: L2 header
731
732         :param vlan: if vlan!=0 then add 802.1q header
733         """
734         # Note: the dst addr ('00:55:44:33:22:11') is used in
735         # the compare function compare_rx_tx_packet_T_Encaps_L2
736         # to detect presence of L2 in SRH payload
737         p = Ether(src="00:11:22:33:44:55", dst="00:55:44:33:22:11")
738         etype = 0x8137  # IPX
739         if vlan:
740             # add 802.1q layer
741             p /= Dot1Q(vlan=vlan, type=etype)
742         else:
743             p.type = etype
744         return p
745
746     def create_packet_header_IPv6_SRH_L2(self, srcaddr, sidlist, segleft, vlan=0):
747         """Create packet header: L2 encapsulated in SRv6:
748         IPv6 header with SRH, L2
749
750         :param int srcaddr: IPv6 source address
751         :param list sidlist: segment list of outer IPv6 SRH
752         :param int segleft: segments-left field of outer IPv6 SRH
753         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
754
755         IPv6 source address is set to srcaddr
756         IPv6 destination address is set to sidlist[segleft]
757         """
758         eth = Ether(src="00:11:22:33:44:55", dst="00:55:44:33:22:11")
759         etype = 0x8137  # IPX
760         if vlan:
761             # add 802.1q layer
762             eth /= Dot1Q(vlan=vlan, type=etype)
763         else:
764             eth.type = etype
765
766         p = (
767             IPv6(src=srcaddr, dst=sidlist[segleft])
768             / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=143)
769             / eth
770         )
771         return p
772
773     def get_payload_info(self, packet):
774         """Extract the payload_info from the packet"""
775         # in most cases, payload_info is in packet[Raw]
776         # but packet[Raw] gives the complete payload
777         # (incl L2 header) for the T.Encaps L2 case
778         try:
779             payload_info = self.payload_to_info(packet[Raw])
780
781         except:
782             # remote L2 header from packet[Raw]:
783             # take packet[Raw], convert it to an Ether layer
784             # and then extract Raw from it
785             payload_info = self.payload_to_info(
786                 Ether(scapy.compat.raw(packet[Raw]))[Raw]
787             )
788
789         return payload_info
790
791     def verify_captured_pkts(self, dst_if, capture, compare_func):
792         """
793         Verify captured packet stream for specified interface.
794         Compare ingress with egress packets using the specified compare fn
795
796         :param dst_if: egress interface of DUT
797         :param capture: captured packets
798         :param compare_func: function to compare in and out packet
799         """
800         self.logger.info(
801             "Verifying capture on interface %s using function %s"
802             % (dst_if.name, compare_func.__name__)
803         )
804
805         last_info = dict()
806         for i in self.pg_interfaces:
807             last_info[i.sw_if_index] = None
808         dst_sw_if_index = dst_if.sw_if_index
809
810         for packet in capture:
811             try:
812                 # extract payload_info from packet's payload
813                 payload_info = self.get_payload_info(packet)
814                 packet_index = payload_info.index
815
816                 self.logger.debug("Verifying packet with index %d" % (packet_index))
817                 # packet should have arrived on the expected interface
818                 self.assertEqual(payload_info.dst, dst_sw_if_index)
819                 self.logger.debug(
820                     "Got packet on interface %s: src=%u (idx=%u)"
821                     % (dst_if.name, payload_info.src, packet_index)
822                 )
823
824                 # search for payload_info with same src and dst if_index
825                 # this will give us the transmitted packet
826                 next_info = self.get_next_packet_info_for_interface2(
827                     payload_info.src, dst_sw_if_index, last_info[payload_info.src]
828                 )
829                 last_info[payload_info.src] = next_info
830                 # next_info should not be None
831                 self.assertTrue(next_info is not None)
832                 # index of tx and rx packets should be equal
833                 self.assertEqual(packet_index, next_info.index)
834                 # data field of next_info contains the tx packet
835                 txed_packet = next_info.data
836
837                 self.logger.debug(
838                     ppp("Transmitted packet:", txed_packet)
839                 )  # ppp=Pretty Print Packet
840
841                 self.logger.debug(ppp("Received packet:", packet))
842
843                 # compare rcvd packet with expected packet using compare_func
844                 compare_func(txed_packet, packet)
845
846             except:
847                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
848                 raise
849
850         # have all expected packets arrived?
851         for i in self.pg_interfaces:
852             remaining_packet = self.get_next_packet_info_for_interface2(
853                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index]
854             )
855             self.assertTrue(
856                 remaining_packet is None,
857                 "Interface %s: Packet expected from interface %s "
858                 "didn't arrive" % (dst_if.name, i.name),
859             )
860
861
862 if __name__ == "__main__":
863     unittest.main(testRunner=VppTestRunner)