nat: improve nat44-ed outside address distribution
[vpp.git] / test / test_srv6.py
1 #!/usr/bin/env python3
2
3 import unittest
4 import binascii
5 from socket import AF_INET6
6
7 from framework import VppTestCase, VppTestRunner
8 from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathProto, VppIpTable
9 from vpp_srv6 import (
10     SRv6LocalSIDBehaviors,
11     VppSRv6LocalSID,
12     VppSRv6Policy,
13     VppSRv6PolicyV2,
14     SRv6PolicyType,
15     VppSRv6Steering,
16     SRv6PolicySteeringTypes,
17 )
18
19 import scapy.compat
20 from scapy.packet import Raw
21 from scapy.layers.l2 import Ether, Dot1Q
22 from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
23 from scapy.layers.inet import IP, UDP
24
25 from util import ppp
26
27
28 class TestSRv6(VppTestCase):
29     """SRv6 Test Case"""
30
31     @classmethod
32     def setUpClass(cls):
33         super(TestSRv6, cls).setUpClass()
34
35     @classmethod
36     def tearDownClass(cls):
37         super(TestSRv6, cls).tearDownClass()
38
39     def setUp(self):
40         """Perform test setup before each test case."""
41         super(TestSRv6, self).setUp()
42
43         # packet sizes, inclusive L2 overhead
44         self.pg_packet_sizes = [64, 512, 1518, 9018]
45
46         # reset packet_infos
47         self.reset_packet_infos()
48
49     def tearDown(self):
50         """Clean up test setup after each test case."""
51         self.teardown_interfaces()
52
53         super(TestSRv6, self).tearDown()
54
55     def configure_interface(
56         self, interface, ipv6=False, ipv4=False, ipv6_table_id=0, ipv4_table_id=0
57     ):
58         """Configure interface.
59         :param ipv6: configure IPv6 on interface
60         :param ipv4: configure IPv4 on interface
61         :param ipv6_table_id: FIB table_id for IPv6
62         :param ipv4_table_id: FIB table_id for IPv4
63         """
64         self.logger.debug("Configuring interface %s" % (interface.name))
65         if ipv6:
66             self.logger.debug("Configuring IPv6")
67             interface.set_table_ip6(ipv6_table_id)
68             interface.config_ip6()
69             interface.resolve_ndp(timeout=5)
70         if ipv4:
71             self.logger.debug("Configuring IPv4")
72             interface.set_table_ip4(ipv4_table_id)
73             interface.config_ip4()
74             interface.resolve_arp()
75         interface.admin_up()
76
77     def setup_interfaces(self, ipv6=[], ipv4=[], ipv6_table_id=[], ipv4_table_id=[]):
78         """Create and configure interfaces.
79
80         :param ipv6: list of interface IPv6 capabilities
81         :param ipv4: list of interface IPv4 capabilities
82         :param ipv6_table_id: list of intf IPv6 FIB table_ids
83         :param ipv4_table_id: list of intf IPv4 FIB table_ids
84         :returns: List of created interfaces.
85         """
86         # how many interfaces?
87         if len(ipv6):
88             count = len(ipv6)
89         else:
90             count = len(ipv4)
91         self.logger.debug("Creating and configuring %d interfaces" % (count))
92
93         # fill up ipv6 and ipv4 lists if needed
94         # not enabled (False) is the default
95         if len(ipv6) < count:
96             ipv6 += (count - len(ipv6)) * [False]
97         if len(ipv4) < count:
98             ipv4 += (count - len(ipv4)) * [False]
99
100         # fill up table_id lists if needed
101         # table_id 0 (global) is the default
102         if len(ipv6_table_id) < count:
103             ipv6_table_id += (count - len(ipv6_table_id)) * [0]
104         if len(ipv4_table_id) < count:
105             ipv4_table_id += (count - len(ipv4_table_id)) * [0]
106
107         # create 'count' pg interfaces
108         self.create_pg_interfaces(range(count))
109
110         # setup all interfaces
111         for i in range(count):
112             intf = self.pg_interfaces[i]
113             self.configure_interface(
114                 intf, ipv6[i], ipv4[i], ipv6_table_id[i], ipv4_table_id[i]
115             )
116
117         if any(ipv6):
118             self.logger.debug(self.vapi.cli("show ip6 neighbors"))
119         if any(ipv4):
120             self.logger.debug(self.vapi.cli("show ip4 neighbors"))
121         self.logger.debug(self.vapi.cli("show interface"))
122         self.logger.debug(self.vapi.cli("show hardware"))
123
124         return self.pg_interfaces
125
126     def teardown_interfaces(self):
127         """Unconfigure and bring down interface."""
128         self.logger.debug("Tearing down interfaces")
129         # tear down all interfaces
130         # AFAIK they cannot be deleted
131         for i in self.pg_interfaces:
132             self.logger.debug("Tear down interface %s" % (i.name))
133             i.admin_down()
134             i.unconfig()
135             i.set_table_ip4(0)
136             i.set_table_ip6(0)
137
138     @unittest.skipUnless(0, "PC to fix")
139     def test_SRv6_T_Encaps(self):
140         """Test SRv6 Transit.Encaps behavior for IPv6."""
141         # send traffic to one destination interface
142         # source and destination are IPv6 only
143         self.setup_interfaces(ipv6=[True, True])
144
145         # configure FIB entries
146         route = VppIpRoute(
147             self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
148         )
149         route.add_vpp_config()
150
151         # configure encaps IPv6 source address
152         # needs to be done before SR Policy config
153         # TODO: API?
154         self.vapi.cli("set sr encaps source addr a3::")
155
156         bsid = "a3::9999:1"
157         # configure SRv6 Policy
158         # Note: segment list order: first -> last
159         sr_policy = VppSRv6Policy(
160             self,
161             bsid=bsid,
162             is_encap=1,
163             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
164             weight=1,
165             fib_table=0,
166             segments=["a4::", "a5::", "a6::c7"],
167             source="a3::",
168         )
169         sr_policy.add_vpp_config()
170         self.sr_policy = sr_policy
171
172         # log the sr policies
173         self.logger.info(self.vapi.cli("show sr policies"))
174
175         # steer IPv6 traffic to a7::/64 into SRv6 Policy
176         # use the bsid of the above self.sr_policy
177         pol_steering = VppSRv6Steering(
178             self,
179             bsid=self.sr_policy.bsid,
180             prefix="a7::",
181             mask_width=64,
182             traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
183             sr_policy_index=0,
184             table_id=0,
185             sw_if_index=0,
186         )
187         pol_steering.add_vpp_config()
188
189         # log the sr steering policies
190         self.logger.info(self.vapi.cli("show sr steering policies"))
191
192         # create packets
193         count = len(self.pg_packet_sizes)
194         dst_inner = "a7::1234"
195         pkts = []
196
197         # create IPv6 packets without SRH
198         packet_header = self.create_packet_header_IPv6(dst_inner)
199         # create traffic stream pg0->pg1
200         pkts.extend(
201             self.create_stream(
202                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
203             )
204         )
205
206         # create IPv6 packets with SRH
207         # packets with segments-left 1, active segment a7::
208         packet_header = self.create_packet_header_IPv6_SRH(
209             sidlist=["a8::", "a7::", "a6::"], segleft=1
210         )
211         # create traffic stream pg0->pg1
212         pkts.extend(
213             self.create_stream(
214                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
215             )
216         )
217
218         # create IPv6 packets with SRH and IPv6
219         # packets with segments-left 1, active segment a7::
220         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
221             dst_inner, sidlist=["a8::", "a7::", "a6::"], segleft=1
222         )
223         # create traffic stream pg0->pg1
224         pkts.extend(
225             self.create_stream(
226                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
227             )
228         )
229
230         # send packets and verify received packets
231         self.send_and_verify_pkts(
232             self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_T_Encaps
233         )
234
235         # log the localsid counters
236         self.logger.info(self.vapi.cli("show sr localsid"))
237
238         # remove SR steering
239         pol_steering.remove_vpp_config()
240         self.logger.info(self.vapi.cli("show sr steering policies"))
241
242         # remove SR Policies
243         self.sr_policy.remove_vpp_config()
244         self.logger.info(self.vapi.cli("show sr policies"))
245
246         # remove FIB entries
247         # done by tearDown
248
249         # cleanup interfaces
250         self.teardown_interfaces()
251
252     def test_SRv6_T_Encaps_with_v6src(self):
253         """Test SRv6 Transit.Encaps behavior for IPv6 and select multiple src v6addr case."""
254         # send traffic to one destination interface
255         # source and destination are IPv6 only
256         self.setup_interfaces(ipv6=[True, True])
257
258         # configure FIB entries
259         route = VppIpRoute(
260             self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
261         )
262         route.add_vpp_config()
263
264         # configure encaps IPv6 source address
265         # needs to be done before SR Policy config
266         # TODO: API?
267         self.vapi.cli("set sr encaps source addr a3::")
268
269         bsid = "a3::9999:1"
270         other_src_ip = "b1::"
271         # configure SRv6 Policy
272         # Note: segment list order: first -> last
273         sr_policy = VppSRv6PolicyV2(
274             self,
275             bsid=bsid,
276             is_encap=1,
277             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
278             weight=1,
279             fib_table=0,
280             segments=["a4::", "a5::", "a6::c7"],
281             encap_src=other_src_ip,
282             source=other_src_ip,
283         )
284         sr_policy.add_vpp_config()
285         self.sr_policy = sr_policy
286
287         # log the sr policies
288         self.logger.info(self.vapi.cli("show sr policies"))
289
290         # steer IPv6 traffic to a7::/64 into SRv6 Policy
291         # use the bsid of the above self.sr_policy
292         pol_steering = VppSRv6Steering(
293             self,
294             bsid=self.sr_policy.bsid,
295             prefix="a7::",
296             mask_width=64,
297             traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
298             sr_policy_index=0,
299             table_id=0,
300             sw_if_index=0,
301         )
302         pol_steering.add_vpp_config()
303
304         # log the sr steering policies
305         self.logger.info(self.vapi.cli("show sr steering-policies"))
306
307         # create packets
308         count = len(self.pg_packet_sizes)
309         dst_inner = "a7::1234"
310         pkts = []
311
312         # create IPv6 packets without SRH
313         packet_header = self.create_packet_header_IPv6(dst_inner)
314         # create traffic stream pg0->pg1
315         pkts.extend(
316             self.create_stream(
317                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
318             )
319         )
320
321         # create IPv6 packets with SRH
322         # packets with segments-left 1, active segment a7::
323         packet_header = self.create_packet_header_IPv6_SRH(
324             sidlist=["a8::", "a7::", "a6::"], segleft=1
325         )
326         # create traffic stream pg0->pg1
327         pkts.extend(
328             self.create_stream(
329                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
330             )
331         )
332
333         # create IPv6 packets with SRH and IPv6
334         # packets with segments-left 1, active segment a7::
335         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
336             dst_inner, sidlist=["a8::", "a7::", "a6::"], segleft=1
337         )
338         # create traffic stream pg0->pg1
339         pkts.extend(
340             self.create_stream(
341                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
342             )
343         )
344
345         # send packets and verify received packets
346         self.send_and_verify_pkts(
347             self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_T_Encaps
348         )
349
350         # log the localsid counters
351         self.logger.info(self.vapi.cli("show sr localsid"))
352
353         # remove SR steering
354         pol_steering.remove_vpp_config()
355         self.logger.info(self.vapi.cli("show sr steering-policies"))
356
357         # remove SR Policies
358         self.sr_policy.remove_vpp_config()
359         self.logger.info(self.vapi.cli("show sr policies"))
360
361         # remove FIB entries
362         # done by tearDown
363
364         # cleanup interfaces
365         self.teardown_interfaces()
366
367     @unittest.skipUnless(0, "PC to fix")
368     def test_SRv6_T_Insert(self):
369         """Test SRv6 Transit.Insert behavior (IPv6 only)."""
370         # send traffic to one destination interface
371         # source and destination are IPv6 only
372         self.setup_interfaces(ipv6=[True, True])
373
374         # configure FIB entries
375         route = VppIpRoute(
376             self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
377         )
378         route.add_vpp_config()
379
380         # configure encaps IPv6 source address
381         # needs to be done before SR Policy config
382         # TODO: API?
383         self.vapi.cli("set sr encaps source addr a3::")
384
385         bsid = "a3::9999:1"
386         # configure SRv6 Policy
387         # Note: segment list order: first -> last
388         sr_policy = VppSRv6Policy(
389             self,
390             bsid=bsid,
391             is_encap=0,
392             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
393             weight=1,
394             fib_table=0,
395             segments=["a4::", "a5::", "a6::c7"],
396             source="a3::",
397         )
398         sr_policy.add_vpp_config()
399         self.sr_policy = sr_policy
400
401         # log the sr policies
402         self.logger.info(self.vapi.cli("show sr policies"))
403
404         # steer IPv6 traffic to a7::/64 into SRv6 Policy
405         # use the bsid of the above self.sr_policy
406         pol_steering = VppSRv6Steering(
407             self,
408             bsid=self.sr_policy.bsid,
409             prefix="a7::",
410             mask_width=64,
411             traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
412             sr_policy_index=0,
413             table_id=0,
414             sw_if_index=0,
415         )
416         pol_steering.add_vpp_config()
417
418         # log the sr steering policies
419         self.logger.info(self.vapi.cli("show sr steering policies"))
420
421         # create packets
422         count = len(self.pg_packet_sizes)
423         dst_inner = "a7::1234"
424         pkts = []
425
426         # create IPv6 packets without SRH
427         packet_header = self.create_packet_header_IPv6(dst_inner)
428         # create traffic stream pg0->pg1
429         pkts.extend(
430             self.create_stream(
431                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
432             )
433         )
434
435         # create IPv6 packets with SRH
436         # packets with segments-left 1, active segment a7::
437         packet_header = self.create_packet_header_IPv6_SRH(
438             sidlist=["a8::", "a7::", "a6::"], segleft=1
439         )
440         # create traffic stream pg0->pg1
441         pkts.extend(
442             self.create_stream(
443                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
444             )
445         )
446
447         # send packets and verify received packets
448         self.send_and_verify_pkts(
449             self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_T_Insert
450         )
451
452         # log the localsid counters
453         self.logger.info(self.vapi.cli("show sr localsid"))
454
455         # remove SR steering
456         pol_steering.remove_vpp_config()
457         self.logger.info(self.vapi.cli("show sr steering policies"))
458
459         # remove SR Policies
460         self.sr_policy.remove_vpp_config()
461         self.logger.info(self.vapi.cli("show sr policies"))
462
463         # remove FIB entries
464         # done by tearDown
465
466         # cleanup interfaces
467         self.teardown_interfaces()
468
469     @unittest.skipUnless(0, "PC to fix")
470     def test_SRv6_T_Encaps_IPv4(self):
471         """Test SRv6 Transit.Encaps behavior for IPv4."""
472         # send traffic to one destination interface
473         # source interface is IPv4 only
474         # destination interface is IPv6 only
475         self.setup_interfaces(ipv6=[False, True], ipv4=[True, False])
476
477         # configure FIB entries
478         route = VppIpRoute(
479             self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
480         )
481         route.add_vpp_config()
482
483         # configure encaps IPv6 source address
484         # needs to be done before SR Policy config
485         # TODO: API?
486         self.vapi.cli("set sr encaps source addr a3::")
487
488         bsid = "a3::9999:1"
489         # configure SRv6 Policy
490         # Note: segment list order: first -> last
491         sr_policy = VppSRv6Policy(
492             self,
493             bsid=bsid,
494             is_encap=1,
495             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
496             weight=1,
497             fib_table=0,
498             segments=["a4::", "a5::", "a6::c7"],
499             source="a3::",
500         )
501         sr_policy.add_vpp_config()
502         self.sr_policy = sr_policy
503
504         # log the sr policies
505         self.logger.info(self.vapi.cli("show sr policies"))
506
507         # steer IPv4 traffic to 7.1.1.0/24 into SRv6 Policy
508         # use the bsid of the above self.sr_policy
509         pol_steering = VppSRv6Steering(
510             self,
511             bsid=self.sr_policy.bsid,
512             prefix="7.1.1.0",
513             mask_width=24,
514             traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV4,
515             sr_policy_index=0,
516             table_id=0,
517             sw_if_index=0,
518         )
519         pol_steering.add_vpp_config()
520
521         # log the sr steering policies
522         self.logger.info(self.vapi.cli("show sr steering policies"))
523
524         # create packets
525         count = len(self.pg_packet_sizes)
526         dst_inner = "7.1.1.123"
527         pkts = []
528
529         # create IPv4 packets
530         packet_header = self.create_packet_header_IPv4(dst_inner)
531         # create traffic stream pg0->pg1
532         pkts.extend(
533             self.create_stream(
534                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
535             )
536         )
537
538         # send packets and verify received packets
539         self.send_and_verify_pkts(
540             self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_T_Encaps_IPv4
541         )
542
543         # log the localsid counters
544         self.logger.info(self.vapi.cli("show sr localsid"))
545
546         # remove SR steering
547         pol_steering.remove_vpp_config()
548         self.logger.info(self.vapi.cli("show sr steering policies"))
549
550         # remove SR Policies
551         self.sr_policy.remove_vpp_config()
552         self.logger.info(self.vapi.cli("show sr policies"))
553
554         # remove FIB entries
555         # done by tearDown
556
557         # cleanup interfaces
558         self.teardown_interfaces()
559
560     @unittest.skip("VPP crashes after running this test")
561     def test_SRv6_T_Encaps_L2(self):
562         """Test SRv6 Transit.Encaps behavior for L2."""
563         # send traffic to one destination interface
564         # source interface is IPv4 only TODO?
565         # destination interface is IPv6 only
566         self.setup_interfaces(ipv6=[False, True], ipv4=[False, False])
567
568         # configure FIB entries
569         route = VppIpRoute(
570             self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
571         )
572         route.add_vpp_config()
573
574         # configure encaps IPv6 source address
575         # needs to be done before SR Policy config
576         # TODO: API?
577         self.vapi.cli("set sr encaps source addr a3::")
578
579         bsid = "a3::9999:1"
580         # configure SRv6 Policy
581         # Note: segment list order: first -> last
582         sr_policy = VppSRv6Policy(
583             self,
584             bsid=bsid,
585             is_encap=1,
586             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
587             weight=1,
588             fib_table=0,
589             segments=["a4::", "a5::", "a6::c7"],
590             source="a3::",
591         )
592         sr_policy.add_vpp_config()
593         self.sr_policy = sr_policy
594
595         # log the sr policies
596         self.logger.info(self.vapi.cli("show sr policies"))
597
598         # steer L2 traffic into SRv6 Policy
599         # use the bsid of the above self.sr_policy
600         pol_steering = VppSRv6Steering(
601             self,
602             bsid=self.sr_policy.bsid,
603             prefix="::",
604             mask_width=0,
605             traffic_type=SRv6PolicySteeringTypes.SR_STEER_L2,
606             sr_policy_index=0,
607             table_id=0,
608             sw_if_index=self.pg0.sw_if_index,
609         )
610         pol_steering.add_vpp_config()
611
612         # log the sr steering policies
613         self.logger.info(self.vapi.cli("show sr steering policies"))
614
615         # create packets
616         count = len(self.pg_packet_sizes)
617         pkts = []
618
619         # create L2 packets without dot1q header
620         packet_header = self.create_packet_header_L2()
621         # create traffic stream pg0->pg1
622         pkts.extend(
623             self.create_stream(
624                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
625             )
626         )
627
628         # create L2 packets with dot1q header
629         packet_header = self.create_packet_header_L2(vlan=123)
630         # create traffic stream pg0->pg1
631         pkts.extend(
632             self.create_stream(
633                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
634             )
635         )
636
637         # send packets and verify received packets
638         self.send_and_verify_pkts(
639             self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_T_Encaps_L2
640         )
641
642         # log the localsid counters
643         self.logger.info(self.vapi.cli("show sr localsid"))
644
645         # remove SR steering
646         pol_steering.remove_vpp_config()
647         self.logger.info(self.vapi.cli("show sr steering policies"))
648
649         # remove SR Policies
650         self.sr_policy.remove_vpp_config()
651         self.logger.info(self.vapi.cli("show sr policies"))
652
653         # remove FIB entries
654         # done by tearDown
655
656         # cleanup interfaces
657         self.teardown_interfaces()
658
659     def test_SRv6_End(self):
660         """Test SRv6 End (without PSP) behavior."""
661         # send traffic to one destination interface
662         # source and destination interfaces are IPv6 only
663         self.setup_interfaces(ipv6=[True, True])
664
665         # configure FIB entries
666         route = VppIpRoute(
667             self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
668         )
669         route.add_vpp_config()
670
671         # configure SRv6 localSID End without PSP behavior
672         localsid = VppSRv6LocalSID(
673             self,
674             localsid="A3::0",
675             behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
676             nh_addr=0,
677             end_psp=0,
678             sw_if_index=0,
679             vlan_index=0,
680             fib_table=0,
681         )
682         localsid.add_vpp_config()
683         # log the localsids
684         self.logger.debug(self.vapi.cli("show sr localsid"))
685
686         # create IPv6 packets with SRH (SL=2, SL=1, SL=0)
687         # send one packet per SL value per packet size
688         # SL=0 packet with localSID End with USP needs 2nd SRH
689         count = len(self.pg_packet_sizes)
690         dst_inner = "a4::1234"
691         pkts = []
692
693         # packets with segments-left 2, active segment a3::
694         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
695             dst_inner, sidlist=["a5::", "a4::", "a3::"], segleft=2
696         )
697         # create traffic stream pg0->pg1
698         pkts.extend(
699             self.create_stream(
700                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
701             )
702         )
703
704         # packets with segments-left 1, active segment a3::
705         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
706             dst_inner, sidlist=["a4::", "a3::", "a2::"], segleft=1
707         )
708         # add to traffic stream pg0->pg1
709         pkts.extend(
710             self.create_stream(
711                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
712             )
713         )
714
715         # TODO: test behavior with SL=0 packet (needs 2*SRH?)
716
717         expected_count = len(pkts)
718
719         # packets without SRH (should not crash)
720         packet_header = self.create_packet_header_IPv6("a3::")
721         # create traffic stream pg0->pg1
722         pkts.extend(
723             self.create_stream(
724                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
725             )
726         )
727
728         # send packets and verify received packets
729         self.send_and_verify_pkts(
730             self.pg0,
731             pkts,
732             self.pg1,
733             self.compare_rx_tx_packet_End,
734             expected_count=expected_count,
735         )
736
737         # log the localsid counters
738         self.logger.info(self.vapi.cli("show sr localsid"))
739
740         # remove SRv6 localSIDs
741         localsid.remove_vpp_config()
742
743         # remove FIB entries
744         # done by tearDown
745
746         # cleanup interfaces
747         self.teardown_interfaces()
748
749     def test_SRv6_End_with_PSP(self):
750         """Test SRv6 End with PSP behavior."""
751         # send traffic to one destination interface
752         # source and destination interfaces are IPv6 only
753         self.setup_interfaces(ipv6=[True, True])
754
755         # configure FIB entries
756         route = VppIpRoute(
757             self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
758         )
759         route.add_vpp_config()
760
761         # configure SRv6 localSID End with PSP behavior
762         localsid = VppSRv6LocalSID(
763             self,
764             localsid="A3::0",
765             behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
766             nh_addr=0,
767             end_psp=1,
768             sw_if_index=0,
769             vlan_index=0,
770             fib_table=0,
771         )
772         localsid.add_vpp_config()
773         # log the localsids
774         self.logger.debug(self.vapi.cli("show sr localsid"))
775
776         # create IPv6 packets with SRH (SL=2, SL=1)
777         # send one packet per SL value per packet size
778         # SL=0 packet with localSID End with PSP is dropped
779         count = len(self.pg_packet_sizes)
780         dst_inner = "a4::1234"
781         pkts = []
782
783         # packets with segments-left 2, active segment a3::
784         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
785             dst_inner, sidlist=["a5::", "a4::", "a3::"], segleft=2
786         )
787         # create traffic stream pg0->pg1
788         pkts.extend(
789             self.create_stream(
790                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
791             )
792         )
793
794         # packets with segments-left 1, active segment a3::
795         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
796             dst_inner, sidlist=["a4::", "a3::", "a2::"], segleft=1
797         )
798         # add to traffic stream pg0->pg1
799         pkts.extend(
800             self.create_stream(
801                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
802             )
803         )
804
805         # send packets and verify received packets
806         self.send_and_verify_pkts(
807             self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_End_PSP
808         )
809
810         # log the localsid counters
811         self.logger.info(self.vapi.cli("show sr localsid"))
812
813         # remove SRv6 localSIDs
814         localsid.remove_vpp_config()
815
816         # remove FIB entries
817         # done by tearDown
818
819         # cleanup interfaces
820         self.teardown_interfaces()
821
822     def test_SRv6_End_X(self):
823         """Test SRv6 End.X (without PSP) behavior."""
824         # create three interfaces (1 source, 2 destinations)
825         # source and destination interfaces are IPv6 only
826         self.setup_interfaces(ipv6=[True, True, True])
827
828         # configure FIB entries
829         # a4::/64 via pg1 and pg2
830         route = VppIpRoute(
831             self,
832             "a4::",
833             64,
834             [
835                 VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index),
836                 VppRoutePath(self.pg2.remote_ip6, self.pg2.sw_if_index),
837             ],
838         )
839         route.add_vpp_config()
840         self.logger.debug(self.vapi.cli("show ip6 fib"))
841
842         # configure SRv6 localSID End.X without PSP behavior
843         # End.X points to interface pg1
844         localsid = VppSRv6LocalSID(
845             self,
846             localsid="A3::C4",
847             behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
848             nh_addr=self.pg1.remote_ip6,
849             end_psp=0,
850             sw_if_index=self.pg1.sw_if_index,
851             vlan_index=0,
852             fib_table=0,
853         )
854         localsid.add_vpp_config()
855         # log the localsids
856         self.logger.debug(self.vapi.cli("show sr localsid"))
857
858         # create IPv6 packets with SRH (SL=2, SL=1)
859         # send one packet per SL value per packet size
860         # SL=0 packet with localSID End with PSP is dropped
861         count = len(self.pg_packet_sizes)
862         dst_inner = "a4::1234"
863         pkts = []
864
865         # packets with segments-left 2, active segment a3::c4
866         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
867             dst_inner, sidlist=["a5::", "a4::", "a3::c4"], segleft=2
868         )
869         # create traffic stream pg0->pg1
870         pkts.extend(
871             self.create_stream(
872                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
873             )
874         )
875
876         # packets with segments-left 1, active segment a3::c4
877         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
878             dst_inner, sidlist=["a4::", "a3::c4", "a2::"], segleft=1
879         )
880         # add to traffic stream pg0->pg1
881         pkts.extend(
882             self.create_stream(
883                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
884             )
885         )
886
887         # send packets and verify received packets
888         # using same comparison function as End (no PSP)
889         self.send_and_verify_pkts(
890             self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_End
891         )
892
893         # assert nothing was received on the other interface (pg2)
894         self.pg2.assert_nothing_captured(remark="mis-directed packet(s)")
895
896         # log the localsid counters
897         self.logger.info(self.vapi.cli("show sr localsid"))
898
899         # remove SRv6 localSIDs
900         localsid.remove_vpp_config()
901
902         # remove FIB entries
903         # done by tearDown
904
905         # cleanup interfaces
906         self.teardown_interfaces()
907
908     def test_SRv6_End_X_with_PSP(self):
909         """Test SRv6 End.X with PSP behavior."""
910         # create three interfaces (1 source, 2 destinations)
911         # source and destination interfaces are IPv6 only
912         self.setup_interfaces(ipv6=[True, True, True])
913
914         # configure FIB entries
915         # a4::/64 via pg1 and pg2
916         route = VppIpRoute(
917             self,
918             "a4::",
919             64,
920             [
921                 VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index),
922                 VppRoutePath(self.pg2.remote_ip6, self.pg2.sw_if_index),
923             ],
924         )
925         route.add_vpp_config()
926
927         # configure SRv6 localSID End with PSP behavior
928         localsid = VppSRv6LocalSID(
929             self,
930             localsid="A3::C4",
931             behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
932             nh_addr=self.pg1.remote_ip6,
933             end_psp=1,
934             sw_if_index=self.pg1.sw_if_index,
935             vlan_index=0,
936             fib_table=0,
937         )
938         localsid.add_vpp_config()
939         # log the localsids
940         self.logger.debug(self.vapi.cli("show sr localsid"))
941
942         # create IPv6 packets with SRH (SL=2, SL=1)
943         # send one packet per SL value per packet size
944         # SL=0 packet with localSID End with PSP is dropped
945         count = len(self.pg_packet_sizes)
946         dst_inner = "a4::1234"
947         pkts = []
948
949         # packets with segments-left 2, active segment a3::
950         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
951             dst_inner, sidlist=["a5::", "a4::", "a3::c4"], segleft=2
952         )
953         # create traffic stream pg0->pg1
954         pkts.extend(
955             self.create_stream(
956                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
957             )
958         )
959
960         # packets with segments-left 1, active segment a3::
961         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
962             dst_inner, sidlist=["a4::", "a3::c4", "a2::"], segleft=1
963         )
964         # add to traffic stream pg0->pg1
965         pkts.extend(
966             self.create_stream(
967                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
968             )
969         )
970
971         # send packets and verify received packets
972         # using same comparison function as End with PSP
973         self.send_and_verify_pkts(
974             self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_End_PSP
975         )
976
977         # assert nothing was received on the other interface (pg2)
978         self.pg2.assert_nothing_captured(remark="mis-directed packet(s)")
979
980         # log the localsid counters
981         self.logger.info(self.vapi.cli("show sr localsid"))
982
983         # remove SRv6 localSIDs
984         localsid.remove_vpp_config()
985
986         # remove FIB entries
987         # done by tearDown
988
989         # cleanup interfaces
990         self.teardown_interfaces()
991
992     def test_SRv6_End_DX6(self):
993         """Test SRv6 End.DX6 behavior."""
994         # send traffic to one destination interface
995         # source and destination interfaces are IPv6 only
996         self.setup_interfaces(ipv6=[True, True])
997
998         # configure SRv6 localSID End.DX6 behavior
999         localsid = VppSRv6LocalSID(
1000             self,
1001             localsid="A3::C4",
1002             behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX6,
1003             nh_addr=self.pg1.remote_ip6,
1004             end_psp=0,
1005             sw_if_index=self.pg1.sw_if_index,
1006             vlan_index=0,
1007             fib_table=0,
1008         )
1009         localsid.add_vpp_config()
1010         # log the localsids
1011         self.logger.debug(self.vapi.cli("show sr localsid"))
1012
1013         # create IPv6 packets with SRH (SL=0)
1014         # send one packet per packet size
1015         count = len(self.pg_packet_sizes)
1016         dst_inner = "a4::1234"  # inner header destination address
1017         pkts = []
1018
1019         # packets with SRH, segments-left 0, active segment a3::c4
1020         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
1021             dst_inner, sidlist=["a3::c4", "a2::", "a1::"], segleft=0
1022         )
1023         # add to traffic stream pg0->pg1
1024         pkts.extend(
1025             self.create_stream(
1026                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1027             )
1028         )
1029
1030         # packets without SRH, IPv6 in IPv6
1031         # outer IPv6 dest addr is the localsid End.DX6
1032         packet_header = self.create_packet_header_IPv6_IPv6(
1033             dst_inner, dst_outer="a3::c4"
1034         )
1035         # add to traffic stream pg0->pg1
1036         pkts.extend(
1037             self.create_stream(
1038                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1039             )
1040         )
1041
1042         # send packets and verify received packets
1043         self.send_and_verify_pkts(
1044             self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_End_DX6
1045         )
1046
1047         # log the localsid counters
1048         self.logger.info(self.vapi.cli("show sr localsid"))
1049
1050         # remove SRv6 localSIDs
1051         localsid.remove_vpp_config()
1052
1053         # cleanup interfaces
1054         self.teardown_interfaces()
1055
1056     def test_SRv6_End_DT6(self):
1057         """Test SRv6 End.DT6 behavior."""
1058         # create three interfaces (1 source, 2 destinations)
1059         # all interfaces are IPv6 only
1060         # source interface in global FIB (0)
1061         # destination interfaces in global and vrf
1062         vrf_1 = 1
1063         ipt = VppIpTable(self, vrf_1, is_ip6=True)
1064         ipt.add_vpp_config()
1065         self.setup_interfaces(ipv6=[True, True, True], ipv6_table_id=[0, 0, vrf_1])
1066
1067         # configure FIB entries
1068         # a4::/64 is reachable
1069         #     via pg1 in table 0 (global)
1070         #     and via pg2 in table vrf_1
1071         route0 = VppIpRoute(
1072             self,
1073             "a4::",
1074             64,
1075             [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index, nh_table_id=0)],
1076             table_id=0,
1077         )
1078         route0.add_vpp_config()
1079         route1 = VppIpRoute(
1080             self,
1081             "a4::",
1082             64,
1083             [
1084                 VppRoutePath(
1085                     self.pg2.remote_ip6, self.pg2.sw_if_index, nh_table_id=vrf_1
1086                 )
1087             ],
1088             table_id=vrf_1,
1089         )
1090         route1.add_vpp_config()
1091         self.logger.debug(self.vapi.cli("show ip6 fib"))
1092
1093         # configure SRv6 localSID End.DT6 behavior
1094         # Note:
1095         # fib_table: where the localsid is installed
1096         # sw_if_index: in T-variants of localsid this is the vrf table_id
1097         localsid = VppSRv6LocalSID(
1098             self,
1099             localsid="A3::C4",
1100             behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT6,
1101             nh_addr=0,
1102             end_psp=0,
1103             sw_if_index=vrf_1,
1104             vlan_index=0,
1105             fib_table=0,
1106         )
1107         localsid.add_vpp_config()
1108         # log the localsids
1109         self.logger.debug(self.vapi.cli("show sr localsid"))
1110
1111         # create IPv6 packets with SRH (SL=0)
1112         # send one packet per packet size
1113         count = len(self.pg_packet_sizes)
1114         dst_inner = "a4::1234"  # inner header destination address
1115         pkts = []
1116
1117         # packets with SRH, segments-left 0, active segment a3::c4
1118         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
1119             dst_inner, sidlist=["a3::c4", "a2::", "a1::"], segleft=0
1120         )
1121         # add to traffic stream pg0->pg1
1122         pkts.extend(
1123             self.create_stream(
1124                 self.pg0, self.pg2, packet_header, self.pg_packet_sizes, count
1125             )
1126         )
1127
1128         # packets without SRH, IPv6 in IPv6
1129         # outer IPv6 dest addr is the localsid End.DT6
1130         packet_header = self.create_packet_header_IPv6_IPv6(
1131             dst_inner, dst_outer="a3::c4"
1132         )
1133         # add to traffic stream pg0->pg1
1134         pkts.extend(
1135             self.create_stream(
1136                 self.pg0, self.pg2, packet_header, self.pg_packet_sizes, count
1137             )
1138         )
1139
1140         # send packets and verify received packets
1141         # using same comparison function as End.DX6
1142         self.send_and_verify_pkts(
1143             self.pg0, pkts, self.pg2, self.compare_rx_tx_packet_End_DX6
1144         )
1145
1146         # assert nothing was received on the other interface (pg2)
1147         self.pg1.assert_nothing_captured(remark="mis-directed packet(s)")
1148
1149         # log the localsid counters
1150         self.logger.info(self.vapi.cli("show sr localsid"))
1151
1152         # remove SRv6 localSIDs
1153         localsid.remove_vpp_config()
1154
1155         # remove FIB entries
1156         # done by tearDown
1157
1158         # cleanup interfaces
1159         self.teardown_interfaces()
1160
1161     def test_SRv6_End_DX4(self):
1162         """Test SRv6 End.DX4 behavior."""
1163         # send traffic to one destination interface
1164         # source interface is IPv6 only
1165         # destination interface is IPv4 only
1166         self.setup_interfaces(ipv6=[True, False], ipv4=[False, True])
1167
1168         # configure SRv6 localSID End.DX4 behavior
1169         localsid = VppSRv6LocalSID(
1170             self,
1171             localsid="A3::C4",
1172             behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX4,
1173             nh_addr=self.pg1.remote_ip4,
1174             end_psp=0,
1175             sw_if_index=self.pg1.sw_if_index,
1176             vlan_index=0,
1177             fib_table=0,
1178         )
1179         localsid.add_vpp_config()
1180         # log the localsids
1181         self.logger.debug(self.vapi.cli("show sr localsid"))
1182
1183         # send one packet per packet size
1184         count = len(self.pg_packet_sizes)
1185         dst_inner = "4.1.1.123"  # inner header destination address
1186         pkts = []
1187
1188         # packets with SRH, segments-left 0, active segment a3::c4
1189         packet_header = self.create_packet_header_IPv6_SRH_IPv4(
1190             dst_inner, sidlist=["a3::c4", "a2::", "a1::"], segleft=0
1191         )
1192         # add to traffic stream pg0->pg1
1193         pkts.extend(
1194             self.create_stream(
1195                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1196             )
1197         )
1198
1199         # packets without SRH, IPv4 in IPv6
1200         # outer IPv6 dest addr is the localsid End.DX4
1201         packet_header = self.create_packet_header_IPv6_IPv4(
1202             dst_inner, dst_outer="a3::c4"
1203         )
1204         # add to traffic stream pg0->pg1
1205         pkts.extend(
1206             self.create_stream(
1207                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1208             )
1209         )
1210
1211         # send packets and verify received packets
1212         self.send_and_verify_pkts(
1213             self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_End_DX4
1214         )
1215
1216         # log the localsid counters
1217         self.logger.info(self.vapi.cli("show sr localsid"))
1218
1219         # remove SRv6 localSIDs
1220         localsid.remove_vpp_config()
1221
1222         # cleanup interfaces
1223         self.teardown_interfaces()
1224
1225     def test_SRv6_End_DT4(self):
1226         """Test SRv6 End.DT4 behavior."""
1227         # create three interfaces (1 source, 2 destinations)
1228         # source interface is IPv6-only
1229         # destination interfaces are IPv4 only
1230         # source interface in global FIB (0)
1231         # destination interfaces in global and vrf
1232         vrf_1 = 1
1233         ipt = VppIpTable(self, vrf_1)
1234         ipt.add_vpp_config()
1235         self.setup_interfaces(
1236             ipv6=[True, False, False],
1237             ipv4=[False, True, True],
1238             ipv6_table_id=[0, 0, 0],
1239             ipv4_table_id=[0, 0, vrf_1],
1240         )
1241
1242         # configure FIB entries
1243         # 4.1.1.0/24 is reachable
1244         #     via pg1 in table 0 (global)
1245         #     and via pg2 in table vrf_1
1246         route0 = VppIpRoute(
1247             self,
1248             "4.1.1.0",
1249             24,
1250             [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index, nh_table_id=0)],
1251             table_id=0,
1252         )
1253         route0.add_vpp_config()
1254         route1 = VppIpRoute(
1255             self,
1256             "4.1.1.0",
1257             24,
1258             [
1259                 VppRoutePath(
1260                     self.pg2.remote_ip4, self.pg2.sw_if_index, nh_table_id=vrf_1
1261                 )
1262             ],
1263             table_id=vrf_1,
1264         )
1265         route1.add_vpp_config()
1266         self.logger.debug(self.vapi.cli("show ip fib"))
1267
1268         # configure SRv6 localSID End.DT6 behavior
1269         # Note:
1270         # fib_table: where the localsid is installed
1271         # sw_if_index: in T-variants of localsid: vrf table_id
1272         localsid = VppSRv6LocalSID(
1273             self,
1274             localsid="A3::C4",
1275             behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT4,
1276             nh_addr=0,
1277             end_psp=0,
1278             sw_if_index=vrf_1,
1279             vlan_index=0,
1280             fib_table=0,
1281         )
1282         localsid.add_vpp_config()
1283         # log the localsids
1284         self.logger.debug(self.vapi.cli("show sr localsid"))
1285
1286         # create IPv6 packets with SRH (SL=0)
1287         # send one packet per packet size
1288         count = len(self.pg_packet_sizes)
1289         dst_inner = "4.1.1.123"  # inner header destination address
1290         pkts = []
1291
1292         # packets with SRH, segments-left 0, active segment a3::c4
1293         packet_header = self.create_packet_header_IPv6_SRH_IPv4(
1294             dst_inner, sidlist=["a3::c4", "a2::", "a1::"], segleft=0
1295         )
1296         # add to traffic stream pg0->pg1
1297         pkts.extend(
1298             self.create_stream(
1299                 self.pg0, self.pg2, packet_header, self.pg_packet_sizes, count
1300             )
1301         )
1302
1303         # packets without SRH, IPv6 in IPv6
1304         # outer IPv6 dest addr is the localsid End.DX4
1305         packet_header = self.create_packet_header_IPv6_IPv4(
1306             dst_inner, dst_outer="a3::c4"
1307         )
1308         # add to traffic stream pg0->pg1
1309         pkts.extend(
1310             self.create_stream(
1311                 self.pg0, self.pg2, packet_header, self.pg_packet_sizes, count
1312             )
1313         )
1314
1315         # send packets and verify received packets
1316         # using same comparison function as End.DX4
1317         self.send_and_verify_pkts(
1318             self.pg0, pkts, self.pg2, self.compare_rx_tx_packet_End_DX4
1319         )
1320
1321         # assert nothing was received on the other interface (pg2)
1322         self.pg1.assert_nothing_captured(remark="mis-directed packet(s)")
1323
1324         # log the localsid counters
1325         self.logger.info(self.vapi.cli("show sr localsid"))
1326
1327         # remove SRv6 localSIDs
1328         localsid.remove_vpp_config()
1329
1330         # remove FIB entries
1331         # done by tearDown
1332
1333         # cleanup interfaces
1334         self.teardown_interfaces()
1335
1336     def test_SRv6_End_DX2(self):
1337         """Test SRv6 End.DX2 behavior."""
1338         # send traffic to one destination interface
1339         # source interface is IPv6 only
1340         self.setup_interfaces(ipv6=[True, False], ipv4=[False, False])
1341
1342         # configure SRv6 localSID End.DX2 behavior
1343         localsid = VppSRv6LocalSID(
1344             self,
1345             localsid="A3::C4",
1346             behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX2,
1347             nh_addr=0,
1348             end_psp=0,
1349             sw_if_index=self.pg1.sw_if_index,
1350             vlan_index=0,
1351             fib_table=0,
1352         )
1353         localsid.add_vpp_config()
1354         # log the localsids
1355         self.logger.debug(self.vapi.cli("show sr localsid"))
1356
1357         # send one packet per packet size
1358         count = len(self.pg_packet_sizes)
1359         pkts = []
1360
1361         # packets with SRH, segments-left 0, active segment a3::c4
1362         # L2 has no dot1q header
1363         packet_header = self.create_packet_header_IPv6_SRH_L2(
1364             sidlist=["a3::c4", "a2::", "a1::"], segleft=0, vlan=0
1365         )
1366         # add to traffic stream pg0->pg1
1367         pkts.extend(
1368             self.create_stream(
1369                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1370             )
1371         )
1372
1373         # packets with SRH, segments-left 0, active segment a3::c4
1374         # L2 has dot1q header
1375         packet_header = self.create_packet_header_IPv6_SRH_L2(
1376             sidlist=["a3::c4", "a2::", "a1::"], segleft=0, vlan=123
1377         )
1378         # add to traffic stream pg0->pg1
1379         pkts.extend(
1380             self.create_stream(
1381                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1382             )
1383         )
1384
1385         # packets without SRH, L2 in IPv6
1386         # outer IPv6 dest addr is the localsid End.DX2
1387         # L2 has no dot1q header
1388         packet_header = self.create_packet_header_IPv6_L2(dst_outer="a3::c4", vlan=0)
1389         # add to traffic stream pg0->pg1
1390         pkts.extend(
1391             self.create_stream(
1392                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1393             )
1394         )
1395
1396         # packets without SRH, L2 in IPv6
1397         # outer IPv6 dest addr is the localsid End.DX2
1398         # L2 has dot1q header
1399         packet_header = self.create_packet_header_IPv6_L2(dst_outer="a3::c4", vlan=123)
1400         # add to traffic stream pg0->pg1
1401         pkts.extend(
1402             self.create_stream(
1403                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1404             )
1405         )
1406
1407         # send packets and verify received packets
1408         self.send_and_verify_pkts(
1409             self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_End_DX2
1410         )
1411
1412         # log the localsid counters
1413         self.logger.info(self.vapi.cli("show sr localsid"))
1414
1415         # remove SRv6 localSIDs
1416         localsid.remove_vpp_config()
1417
1418         # cleanup interfaces
1419         self.teardown_interfaces()
1420
1421     @unittest.skipUnless(0, "PC to fix")
1422     def test_SRv6_T_Insert_Classifier(self):
1423         """Test SRv6 Transit.Insert behavior (IPv6 only).
1424         steer packets using the classifier
1425         """
1426         # send traffic to one destination interface
1427         # source and destination are IPv6 only
1428         self.setup_interfaces(ipv6=[False, False, False, True, True])
1429
1430         # configure FIB entries
1431         route = VppIpRoute(
1432             self, "a4::", 64, [VppRoutePath(self.pg4.remote_ip6, self.pg4.sw_if_index)]
1433         )
1434         route.add_vpp_config()
1435
1436         # configure encaps IPv6 source address
1437         # needs to be done before SR Policy config
1438         # TODO: API?
1439         self.vapi.cli("set sr encaps source addr a3::")
1440
1441         bsid = "a3::9999:1"
1442         # configure SRv6 Policy
1443         # Note: segment list order: first -> last
1444         sr_policy = VppSRv6Policy(
1445             self,
1446             bsid=bsid,
1447             is_encap=0,
1448             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
1449             weight=1,
1450             fib_table=0,
1451             segments=["a4::", "a5::", "a6::c7"],
1452             source="a3::",
1453         )
1454         sr_policy.add_vpp_config()
1455         self.sr_policy = sr_policy
1456
1457         # log the sr policies
1458         self.logger.info(self.vapi.cli("show sr policies"))
1459
1460         # add classify table
1461         # mask on dst ip address prefix a7::/8
1462         mask = "{!s:0<16}".format("ff")
1463         r = self.vapi.classify_add_del_table(
1464             1,
1465             binascii.unhexlify(mask),
1466             match_n_vectors=(len(mask) - 1) // 32 + 1,
1467             current_data_flag=1,
1468             skip_n_vectors=2,
1469         )  # data offset
1470         self.assertIsNotNone(r, "No response msg for add_del_table")
1471         table_index = r.new_table_index
1472
1473         # add the source routing node as a ip6 inacl netxt node
1474         r = self.vapi.add_node_next("ip6-inacl", "sr-pl-rewrite-insert")
1475         inacl_next_node_index = r.node_index
1476
1477         match = "{!s:0<16}".format("a7")
1478         r = self.vapi.classify_add_del_session(
1479             1,
1480             table_index,
1481             binascii.unhexlify(match),
1482             hit_next_index=inacl_next_node_index,
1483             action=3,
1484             metadata=0,
1485         )  # sr policy index
1486         self.assertIsNotNone(r, "No response msg for add_del_session")
1487
1488         # log the classify table used in the steering policy
1489         self.logger.info(self.vapi.cli("show classify table"))
1490
1491         r = self.vapi.input_acl_set_interface(
1492             is_add=1, sw_if_index=self.pg3.sw_if_index, ip6_table_index=table_index
1493         )
1494         self.assertIsNotNone(r, "No response msg for input_acl_set_interface")
1495
1496         # log the ip6 inacl
1497         self.logger.info(self.vapi.cli("show inacl type ip6"))
1498
1499         # create packets
1500         count = len(self.pg_packet_sizes)
1501         dst_inner = "a7::1234"
1502         pkts = []
1503
1504         # create IPv6 packets without SRH
1505         packet_header = self.create_packet_header_IPv6(dst_inner)
1506         # create traffic stream pg3->pg4
1507         pkts.extend(
1508             self.create_stream(
1509                 self.pg3, self.pg4, packet_header, self.pg_packet_sizes, count
1510             )
1511         )
1512
1513         # create IPv6 packets with SRH
1514         # packets with segments-left 1, active segment a7::
1515         packet_header = self.create_packet_header_IPv6_SRH(
1516             sidlist=["a8::", "a7::", "a6::"], segleft=1
1517         )
1518         # create traffic stream pg3->pg4
1519         pkts.extend(
1520             self.create_stream(
1521                 self.pg3, self.pg4, packet_header, self.pg_packet_sizes, count
1522             )
1523         )
1524
1525         # send packets and verify received packets
1526         self.send_and_verify_pkts(
1527             self.pg3, pkts, self.pg4, self.compare_rx_tx_packet_T_Insert
1528         )
1529
1530         # remove the interface l2 input feature
1531         r = self.vapi.input_acl_set_interface(
1532             is_add=0, sw_if_index=self.pg3.sw_if_index, ip6_table_index=table_index
1533         )
1534         self.assertIsNotNone(r, "No response msg for input_acl_set_interface")
1535
1536         # log the ip6 inacl after cleaning
1537         self.logger.info(self.vapi.cli("show inacl type ip6"))
1538
1539         # log the localsid counters
1540         self.logger.info(self.vapi.cli("show sr localsid"))
1541
1542         # remove classifier SR steering
1543         # classifier_steering.remove_vpp_config()
1544         self.logger.info(self.vapi.cli("show sr steering policies"))
1545
1546         # remove SR Policies
1547         self.sr_policy.remove_vpp_config()
1548         self.logger.info(self.vapi.cli("show sr policies"))
1549
1550         # remove classify session and table
1551         r = self.vapi.classify_add_del_session(
1552             0, table_index, binascii.unhexlify(match)
1553         )
1554         self.assertIsNotNone(r, "No response msg for add_del_session")
1555
1556         r = self.vapi.classify_add_del_table(
1557             0, binascii.unhexlify(mask), table_index=table_index
1558         )
1559         self.assertIsNotNone(r, "No response msg for add_del_table")
1560
1561         self.logger.info(self.vapi.cli("show classify table"))
1562
1563         # remove FIB entries
1564         # done by tearDown
1565
1566         # cleanup interfaces
1567         self.teardown_interfaces()
1568
1569     def compare_rx_tx_packet_T_Encaps(self, tx_pkt, rx_pkt):
1570         """Compare input and output packet after passing T.Encaps
1571
1572         :param tx_pkt: transmitted packet
1573         :param rx_pkt: received packet
1574         """
1575         # T.Encaps updates the headers as follows:
1576         # SR Policy seglist (S3, S2, S1)
1577         # SR Policy source C
1578         # IPv6:
1579         # in: IPv6(A, B2)
1580         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(A, B2)
1581         # IPv6 + SRH:
1582         # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1)
1583         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(a, B2)SRH(B3, B2, B1; SL=1)
1584
1585         # get first (outer) IPv6 header of rx'ed packet
1586         rx_ip = rx_pkt.getlayer(IPv6)
1587         rx_srh = None
1588
1589         tx_ip = tx_pkt.getlayer(IPv6)
1590
1591         # expected segment-list
1592         seglist = self.sr_policy.segments
1593         # reverse list to get order as in SRH
1594         tx_seglist = seglist[::-1]
1595
1596         # get source address of SR Policy
1597         sr_policy_source = self.sr_policy.source
1598
1599         # rx'ed packet should have SRH
1600         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1601         # get SRH
1602         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1603
1604         # received ip.src should be equal to SR Policy source
1605         self.assertEqual(rx_ip.src, sr_policy_source)
1606         # received ip.dst should be equal to expected sidlist[lastentry]
1607         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1608         # rx'ed seglist should be equal to expected seglist
1609         self.assertEqual(rx_srh.addresses, tx_seglist)
1610         # segleft should be equal to size expected seglist-1
1611         self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
1612         # segleft should be equal to lastentry
1613         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1614
1615         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1616         # except for the hop-limit field
1617         #   -> update tx'ed hlim to the expected hlim
1618         tx_ip.hlim = tx_ip.hlim - 1
1619
1620         self.assertEqual(rx_srh.payload, tx_ip)
1621
1622         self.logger.debug("packet verification: SUCCESS")
1623
1624     def compare_rx_tx_packet_T_Encaps_IPv4(self, tx_pkt, rx_pkt):
1625         """Compare input and output packet after passing T.Encaps for IPv4
1626
1627         :param tx_pkt: transmitted packet
1628         :param rx_pkt: received packet
1629         """
1630         # T.Encaps for IPv4 updates the headers as follows:
1631         # SR Policy seglist (S3, S2, S1)
1632         # SR Policy source C
1633         # IPv4:
1634         # in: IPv4(A, B2)
1635         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv4(A, B2)
1636
1637         # get first (outer) IPv6 header of rx'ed packet
1638         rx_ip = rx_pkt.getlayer(IPv6)
1639         rx_srh = None
1640
1641         tx_ip = tx_pkt.getlayer(IP)
1642
1643         # expected segment-list
1644         seglist = self.sr_policy.segments
1645         # reverse list to get order as in SRH
1646         tx_seglist = seglist[::-1]
1647
1648         # get source address of SR Policy
1649         sr_policy_source = self.sr_policy.source
1650
1651         # checks common to cases tx with and without SRH
1652         # rx'ed packet should have SRH and IPv4 header
1653         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1654         self.assertTrue(rx_ip.payload.haslayer(IP))
1655         # get SRH
1656         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1657
1658         # received ip.src should be equal to SR Policy source
1659         self.assertEqual(rx_ip.src, sr_policy_source)
1660         # received ip.dst should be equal to sidlist[lastentry]
1661         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1662         # rx'ed seglist should be equal to seglist
1663         self.assertEqual(rx_srh.addresses, tx_seglist)
1664         # segleft should be equal to size seglist-1
1665         self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
1666         # segleft should be equal to lastentry
1667         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1668
1669         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1670         # except for the ttl field and ip checksum
1671         #   -> adjust tx'ed ttl to expected ttl
1672         tx_ip.ttl = tx_ip.ttl - 1
1673         #   -> set tx'ed ip checksum to None and let scapy recompute
1674         tx_ip.chksum = None
1675         # read back the pkt (with str()) to force computing these fields
1676         # probably other ways to accomplish this are possible
1677         tx_ip = IP(scapy.compat.raw(tx_ip))
1678
1679         self.assertEqual(rx_srh.payload, tx_ip)
1680
1681         self.logger.debug("packet verification: SUCCESS")
1682
1683     def compare_rx_tx_packet_T_Encaps_L2(self, tx_pkt, rx_pkt):
1684         """Compare input and output packet after passing T.Encaps for L2
1685
1686         :param tx_pkt: transmitted packet
1687         :param rx_pkt: received packet
1688         """
1689         # T.Encaps for L2 updates the headers as follows:
1690         # SR Policy seglist (S3, S2, S1)
1691         # SR Policy source C
1692         # L2:
1693         # in: L2
1694         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)L2
1695
1696         # get first (outer) IPv6 header of rx'ed packet
1697         rx_ip = rx_pkt.getlayer(IPv6)
1698         rx_srh = None
1699
1700         tx_ether = tx_pkt.getlayer(Ether)
1701
1702         # expected segment-list
1703         seglist = self.sr_policy.segments
1704         # reverse list to get order as in SRH
1705         tx_seglist = seglist[::-1]
1706
1707         # get source address of SR Policy
1708         sr_policy_source = self.sr_policy.source
1709
1710         # rx'ed packet should have SRH
1711         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1712         # get SRH
1713         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1714
1715         # received ip.src should be equal to SR Policy source
1716         self.assertEqual(rx_ip.src, sr_policy_source)
1717         # received ip.dst should be equal to sidlist[lastentry]
1718         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1719         # rx'ed seglist should be equal to seglist
1720         self.assertEqual(rx_srh.addresses, tx_seglist)
1721         # segleft should be equal to size seglist-1
1722         self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
1723         # segleft should be equal to lastentry
1724         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1725         # nh should be "No Next Header" (143)
1726         self.assertEqual(rx_srh.nh, 143)
1727
1728         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1729         self.assertEqual(Ether(scapy.compat.raw(rx_srh.payload)), tx_ether)
1730
1731         self.logger.debug("packet verification: SUCCESS")
1732
1733     def compare_rx_tx_packet_T_Insert(self, tx_pkt, rx_pkt):
1734         """Compare input and output packet after passing T.Insert
1735
1736         :param tx_pkt: transmitted packet
1737         :param rx_pkt: received packet
1738         """
1739         # T.Insert updates the headers as follows:
1740         # IPv6:
1741         # in: IPv6(A, B2)
1742         # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)
1743         # IPv6 + SRH:
1744         # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1)
1745         # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)SRH(B3, B2, B1; SL=1)
1746
1747         # get first (outer) IPv6 header of rx'ed packet
1748         rx_ip = rx_pkt.getlayer(IPv6)
1749         rx_srh = None
1750         rx_ip2 = None
1751         rx_srh2 = None
1752         rx_ip3 = None
1753         rx_udp = rx_pkt[UDP]
1754
1755         tx_ip = tx_pkt.getlayer(IPv6)
1756         tx_srh = None
1757         tx_ip2 = None
1758         # some packets have been tx'ed with an SRH, some without it
1759         # get SRH if tx'ed packet has it
1760         if tx_pkt.haslayer(IPv6ExtHdrSegmentRouting):
1761             tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1762             tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1763         tx_udp = tx_pkt[UDP]
1764
1765         # expected segment-list (make copy of SR Policy segment list)
1766         seglist = self.sr_policy.segments[:]
1767         # expected seglist has initial dest addr as last segment
1768         seglist.append(tx_ip.dst)
1769         # reverse list to get order as in SRH
1770         tx_seglist = seglist[::-1]
1771
1772         # get source address of SR Policy
1773         sr_policy_source = self.sr_policy.source
1774
1775         # checks common to cases tx with and without SRH
1776         # rx'ed packet should have SRH and only one IPv6 header
1777         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1778         self.assertFalse(rx_ip.payload.haslayer(IPv6))
1779         # get SRH
1780         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1781
1782         # rx'ed ip.src should be equal to tx'ed ip.src
1783         self.assertEqual(rx_ip.src, tx_ip.src)
1784         # rx'ed ip.dst should be equal to sidlist[lastentry]
1785         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1786
1787         # rx'ed seglist should be equal to expected seglist
1788         self.assertEqual(rx_srh.addresses, tx_seglist)
1789         # segleft should be equal to size(expected seglist)-1
1790         self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
1791         # segleft should be equal to lastentry
1792         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1793
1794         if tx_srh:  # packet was tx'ed with SRH
1795             # packet should have 2nd SRH
1796             self.assertTrue(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
1797             # get 2nd SRH
1798             rx_srh2 = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting, 2)
1799
1800             # rx'ed srh2.addresses should be equal to tx'ed srh.addresses
1801             self.assertEqual(rx_srh2.addresses, tx_srh.addresses)
1802             # rx'ed srh2.segleft should be equal to tx'ed srh.segleft
1803             self.assertEqual(rx_srh2.segleft, tx_srh.segleft)
1804             # rx'ed srh2.lastentry should be equal to tx'ed srh.lastentry
1805             self.assertEqual(rx_srh2.lastentry, tx_srh.lastentry)
1806
1807         else:  # packet was tx'ed without SRH
1808             # rx packet should have no other SRH
1809             self.assertFalse(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
1810
1811         # UDP layer should be unchanged
1812         self.assertEqual(rx_udp, tx_udp)
1813
1814         self.logger.debug("packet verification: SUCCESS")
1815
1816     def compare_rx_tx_packet_End(self, tx_pkt, rx_pkt):
1817         """Compare input and output packet after passing End (without PSP)
1818
1819         :param tx_pkt: transmitted packet
1820         :param rx_pkt: received packet
1821         """
1822         # End (no PSP) updates the headers as follows:
1823         # IPv6 + SRH:
1824         # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2)
1825         # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1826
1827         # get first (outer) IPv6 header of rx'ed packet
1828         rx_ip = rx_pkt.getlayer(IPv6)
1829         rx_srh = None
1830         rx_ip2 = None
1831         rx_udp = rx_pkt[UDP]
1832
1833         tx_ip = tx_pkt.getlayer(IPv6)
1834         # we know the packet has been tx'ed
1835         # with an inner IPv6 header and an SRH
1836         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1837         tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1838         tx_udp = tx_pkt[UDP]
1839
1840         # common checks, regardless of tx segleft value
1841         # rx'ed packet should have 2nd IPv6 header
1842         self.assertTrue(rx_ip.payload.haslayer(IPv6))
1843         # get second (inner) IPv6 header
1844         rx_ip2 = rx_pkt.getlayer(IPv6, 2)
1845
1846         if tx_ip.segleft > 0:
1847             # SRH should NOT have been popped:
1848             #   End SID without PSP does not pop SRH if segleft>0
1849             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1850             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1851
1852             # received ip.src should be equal to expected ip.src
1853             self.assertEqual(rx_ip.src, tx_ip.src)
1854             # sidlist should be unchanged
1855             self.assertEqual(rx_srh.addresses, tx_srh.addresses)
1856             # segleft should have been decremented
1857             self.assertEqual(rx_srh.segleft, tx_srh.segleft - 1)
1858             # received ip.dst should be equal to sidlist[segleft]
1859             self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
1860             # lastentry should be unchanged
1861             self.assertEqual(rx_srh.lastentry, tx_srh.lastentry)
1862             # inner IPv6 packet (ip2) should be unchanged
1863             self.assertEqual(rx_ip2.src, tx_ip2.src)
1864             self.assertEqual(rx_ip2.dst, tx_ip2.dst)
1865         # else:  # tx_ip.segleft == 0
1866         # TODO: Does this work with 2 SRHs in ingress packet?
1867
1868         # UDP layer should be unchanged
1869         self.assertEqual(rx_udp, tx_udp)
1870
1871         self.logger.debug("packet verification: SUCCESS")
1872
1873     def compare_rx_tx_packet_End_PSP(self, tx_pkt, rx_pkt):
1874         """Compare input and output packet after passing End with PSP
1875
1876         :param tx_pkt: transmitted packet
1877         :param rx_pkt: received packet
1878         """
1879         # End (PSP) updates the headers as follows:
1880         # IPv6 + SRH (SL>1):
1881         # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2)
1882         # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1883         # IPv6 + SRH (SL=1):
1884         # in: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1885         # out: IPv6(A, S3)
1886
1887         # get first (outer) IPv6 header of rx'ed packet
1888         rx_ip = rx_pkt.getlayer(IPv6)
1889         rx_srh = None
1890         rx_ip2 = None
1891         rx_udp = rx_pkt[UDP]
1892
1893         tx_ip = tx_pkt.getlayer(IPv6)
1894         # we know the packet has been tx'ed
1895         # with an inner IPv6 header and an SRH
1896         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1897         tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1898         tx_udp = tx_pkt[UDP]
1899
1900         # common checks, regardless of tx segleft value
1901         self.assertTrue(rx_ip.payload.haslayer(IPv6))
1902         rx_ip2 = rx_pkt.getlayer(IPv6, 2)
1903         # inner IPv6 packet (ip2) should be unchanged
1904         self.assertEqual(rx_ip2.src, tx_ip2.src)
1905         self.assertEqual(rx_ip2.dst, tx_ip2.dst)
1906
1907         if tx_ip.segleft > 1:
1908             # SRH should NOT have been popped:
1909             #   End SID with PSP does not pop SRH if segleft>1
1910             # rx'ed packet should have SRH
1911             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1912             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1913
1914             # received ip.src should be equal to expected ip.src
1915             self.assertEqual(rx_ip.src, tx_ip.src)
1916             # sidlist should be unchanged
1917             self.assertEqual(rx_srh.addresses, tx_srh.addresses)
1918             # segleft should have been decremented
1919             self.assertEqual(rx_srh.segleft, tx_srh.segleft - 1)
1920             # received ip.dst should be equal to sidlist[segleft]
1921             self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
1922             # lastentry should be unchanged
1923             self.assertEqual(rx_srh.lastentry, tx_srh.lastentry)
1924
1925         else:  # tx_ip.segleft <= 1
1926             # SRH should have been popped:
1927             #   End SID with PSP and segleft=1 pops SRH
1928             # the two IPv6 headers are still present
1929             # outer IPv6 header has DA == last segment of popped SRH
1930             # SRH should not be present
1931             self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1932             # outer IPv6 header ip.src should be equal to tx'ed ip.src
1933             self.assertEqual(rx_ip.src, tx_ip.src)
1934             # outer IPv6 header ip.dst should be = to tx'ed sidlist[segleft-1]
1935             self.assertEqual(rx_ip.dst, tx_srh.addresses[tx_srh.segleft - 1])
1936
1937         # UDP layer should be unchanged
1938         self.assertEqual(rx_udp, tx_udp)
1939
1940         self.logger.debug("packet verification: SUCCESS")
1941
1942     def compare_rx_tx_packet_End_DX6(self, tx_pkt, rx_pkt):
1943         """Compare input and output packet after passing End.DX6
1944
1945         :param tx_pkt: transmitted packet
1946         :param rx_pkt: received packet
1947         """
1948         # End.DX6 updates the headers as follows:
1949         # IPv6 + SRH (SL=0):
1950         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv6(B, D)
1951         # out: IPv6(B, D)
1952         # IPv6:
1953         # in: IPv6(A, S3)IPv6(B, D)
1954         # out: IPv6(B, D)
1955
1956         # get first (outer) IPv6 header of rx'ed packet
1957         rx_ip = rx_pkt.getlayer(IPv6)
1958
1959         tx_ip = tx_pkt.getlayer(IPv6)
1960         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1961
1962         # verify if rx'ed packet has no SRH
1963         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1964
1965         # the whole rx_ip pkt should be equal to tx_ip2
1966         # except for the hlim field
1967         #   -> adjust tx'ed hlim to expected hlim
1968         tx_ip2.hlim = tx_ip2.hlim - 1
1969
1970         self.assertEqual(rx_ip, tx_ip2)
1971
1972         self.logger.debug("packet verification: SUCCESS")
1973
1974     def compare_rx_tx_packet_End_DX4(self, tx_pkt, rx_pkt):
1975         """Compare input and output packet after passing End.DX4
1976
1977         :param tx_pkt: transmitted packet
1978         :param rx_pkt: received packet
1979         """
1980         # End.DX4 updates the headers as follows:
1981         # IPv6 + SRH (SL=0):
1982         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv4(B, D)
1983         # out: IPv4(B, D)
1984         # IPv6:
1985         # in: IPv6(A, S3)IPv4(B, D)
1986         # out: IPv4(B, D)
1987
1988         # get IPv4 header of rx'ed packet
1989         rx_ip = rx_pkt.getlayer(IP)
1990
1991         tx_ip = tx_pkt.getlayer(IPv6)
1992         tx_ip2 = tx_pkt.getlayer(IP)
1993
1994         # verify if rx'ed packet has no SRH
1995         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1996
1997         # the whole rx_ip pkt should be equal to tx_ip2
1998         # except for the ttl field and ip checksum
1999         #   -> adjust tx'ed ttl to expected ttl
2000         tx_ip2.ttl = tx_ip2.ttl - 1
2001         #   -> set tx'ed ip checksum to None and let scapy recompute
2002         tx_ip2.chksum = None
2003         # read back the pkt (with str()) to force computing these fields
2004         # probably other ways to accomplish this are possible
2005         tx_ip2 = IP(scapy.compat.raw(tx_ip2))
2006
2007         self.assertEqual(rx_ip, tx_ip2)
2008
2009         self.logger.debug("packet verification: SUCCESS")
2010
2011     def compare_rx_tx_packet_End_DX2(self, tx_pkt, rx_pkt):
2012         """Compare input and output packet after passing End.DX2
2013
2014         :param tx_pkt: transmitted packet
2015         :param rx_pkt: received packet
2016         """
2017         # End.DX2 updates the headers as follows:
2018         # IPv6 + SRH (SL=0):
2019         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)L2
2020         # out: L2
2021         # IPv6:
2022         # in: IPv6(A, S3)L2
2023         # out: L2
2024
2025         # get IPv4 header of rx'ed packet
2026         rx_eth = rx_pkt.getlayer(Ether)
2027
2028         tx_ip = tx_pkt.getlayer(IPv6)
2029         # we can't just get the 2nd Ether layer
2030         # get the Raw content and dissect it as Ether
2031         tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw]))
2032
2033         # verify if rx'ed packet has no SRH
2034         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
2035
2036         # the whole rx_eth pkt should be equal to tx_eth1
2037         self.assertEqual(rx_eth, tx_eth1)
2038
2039         self.logger.debug("packet verification: SUCCESS")
2040
2041     def create_stream(self, src_if, dst_if, packet_header, packet_sizes, count):
2042         """Create SRv6 input packet stream for defined interface.
2043
2044         :param VppInterface src_if: Interface to create packet stream for
2045         :param VppInterface dst_if: destination interface of packet stream
2046         :param packet_header: Layer3 scapy packet headers,
2047                 L2 is added when not provided,
2048                 Raw(payload) with packet_info is added
2049         :param list packet_sizes: packet stream pckt sizes,sequentially applied
2050                to packets in stream have
2051         :param int count: number of packets in packet stream
2052         :return: list of packets
2053         """
2054         self.logger.info("Creating packets")
2055         pkts = []
2056         for i in range(0, count - 1):
2057             payload_info = self.create_packet_info(src_if, dst_if)
2058             self.logger.debug("Creating packet with index %d" % (payload_info.index))
2059             payload = self.info_to_payload(payload_info)
2060             # add L2 header if not yet provided in packet_header
2061             if packet_header.getlayer(0).name == "Ethernet":
2062                 p = packet_header / Raw(payload)
2063             else:
2064                 p = (
2065                     Ether(dst=src_if.local_mac, src=src_if.remote_mac)
2066                     / packet_header
2067                     / Raw(payload)
2068                 )
2069             size = packet_sizes[i % len(packet_sizes)]
2070             self.logger.debug("Packet size %d" % (size))
2071             self.extend_packet(p, size)
2072             # we need to store the packet with the automatic fields computed
2073             # read back the dumped packet (with str())
2074             # to force computing these fields
2075             # probably other ways are possible
2076             p = Ether(scapy.compat.raw(p))
2077             payload_info.data = p.copy()
2078             self.logger.debug(ppp("Created packet:", p))
2079             pkts.append(p)
2080         self.logger.info("Done creating packets")
2081         return pkts
2082
2083     def send_and_verify_pkts(
2084         self, input, pkts, output, compare_func, expected_count=None
2085     ):
2086         """Send packets and verify received packets using compare_func
2087
2088         :param input: ingress interface of DUT
2089         :param pkts: list of packets to transmit
2090         :param output: egress interface of DUT
2091         :param compare_func: function to compare in and out packets
2092         :param expected_count: expected number of captured packets (if
2093                different than len(pkts))
2094         """
2095         # add traffic stream to input interface
2096         input.add_stream(pkts)
2097
2098         # enable capture on all interfaces
2099         self.pg_enable_capture(self.pg_interfaces)
2100
2101         # start traffic
2102         self.logger.info("Starting traffic")
2103         self.pg_start()
2104
2105         # get output capture
2106         self.logger.info("Getting packet capture")
2107         capture = output.get_capture(expected_count=expected_count)
2108
2109         # assert nothing was captured on input interface
2110         input.assert_nothing_captured()
2111
2112         # verify captured packets
2113         self.verify_captured_pkts(output, capture, compare_func)
2114
2115     def create_packet_header_IPv6(self, dst):
2116         """Create packet header: IPv6 header, UDP header
2117
2118         :param dst: IPv6 destination address
2119
2120         IPv6 source address is 1234::1
2121         UDP source port and destination port are 1234
2122         """
2123
2124         p = IPv6(src="1234::1", dst=dst) / UDP(sport=1234, dport=1234)
2125         return p
2126
2127     def create_packet_header_IPv6_SRH(self, sidlist, segleft):
2128         """Create packet header: IPv6 header with SRH, UDP header
2129
2130         :param list sidlist: segment list
2131         :param int segleft: segments-left field value
2132
2133         IPv6 destination address is set to sidlist[segleft]
2134         IPv6 source addresses are 1234::1 and 4321::1
2135         UDP source port and destination port are 1234
2136         """
2137
2138         p = (
2139             IPv6(src="1234::1", dst=sidlist[segleft])
2140             / IPv6ExtHdrSegmentRouting(addresses=sidlist)
2141             / UDP(sport=1234, dport=1234)
2142         )
2143         return p
2144
2145     def create_packet_header_IPv6_SRH_IPv6(self, dst, sidlist, segleft):
2146         """Create packet header: IPv6 encapsulated in SRv6:
2147         IPv6 header with SRH, IPv6 header, UDP header
2148
2149         :param ipv6address dst: inner IPv6 destination address
2150         :param list sidlist: segment list of outer IPv6 SRH
2151         :param int segleft: segments-left field of outer IPv6 SRH
2152
2153         Outer IPv6 destination address is set to sidlist[segleft]
2154         IPv6 source addresses are 1234::1 and 4321::1
2155         UDP source port and destination port are 1234
2156         """
2157
2158         p = (
2159             IPv6(src="1234::1", dst=sidlist[segleft])
2160             / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=41)
2161             / IPv6(src="4321::1", dst=dst)
2162             / UDP(sport=1234, dport=1234)
2163         )
2164         return p
2165
2166     def create_packet_header_IPv6_IPv6(self, dst_inner, dst_outer):
2167         """Create packet header: IPv6 encapsulated in IPv6:
2168         IPv6 header, IPv6 header, UDP header
2169
2170         :param ipv6address dst_inner: inner IPv6 destination address
2171         :param ipv6address dst_outer: outer IPv6 destination address
2172
2173         IPv6 source addresses are 1234::1 and 4321::1
2174         UDP source port and destination port are 1234
2175         """
2176
2177         p = (
2178             IPv6(src="1234::1", dst=dst_outer)
2179             / IPv6(src="4321::1", dst=dst_inner)
2180             / UDP(sport=1234, dport=1234)
2181         )
2182         return p
2183
2184     def create_packet_header_IPv6_SRH_SRH_IPv6(
2185         self, dst, sidlist1, segleft1, sidlist2, segleft2
2186     ):
2187         """Create packet header: IPv6 encapsulated in SRv6 with 2 SRH:
2188         IPv6 header with SRH, 2nd SRH, IPv6 header, UDP header
2189
2190         :param ipv6address dst: inner IPv6 destination address
2191         :param list sidlist1: segment list of outer IPv6 SRH
2192         :param int segleft1: segments-left field of outer IPv6 SRH
2193         :param list sidlist2: segment list of inner IPv6 SRH
2194         :param int segleft2: segments-left field of inner IPv6 SRH
2195
2196         Outer IPv6 destination address is set to sidlist[segleft]
2197         IPv6 source addresses are 1234::1 and 4321::1
2198         UDP source port and destination port are 1234
2199         """
2200
2201         p = (
2202             IPv6(src="1234::1", dst=sidlist1[segleft1])
2203             / IPv6ExtHdrSegmentRouting(addresses=sidlist1, segleft=segleft1, nh=43)
2204             / IPv6ExtHdrSegmentRouting(addresses=sidlist2, segleft=segleft2, nh=41)
2205             / IPv6(src="4321::1", dst=dst)
2206             / UDP(sport=1234, dport=1234)
2207         )
2208         return p
2209
2210     def create_packet_header_IPv4(self, dst):
2211         """Create packet header: IPv4 header, UDP header
2212
2213         :param dst: IPv4 destination address
2214
2215         IPv4 source address is 123.1.1.1
2216         UDP source port and destination port are 1234
2217         """
2218
2219         p = IP(src="123.1.1.1", dst=dst) / UDP(sport=1234, dport=1234)
2220         return p
2221
2222     def create_packet_header_IPv6_IPv4(self, dst_inner, dst_outer):
2223         """Create packet header: IPv4 encapsulated in IPv6:
2224         IPv6 header, IPv4 header, UDP header
2225
2226         :param ipv4address dst_inner: inner IPv4 destination address
2227         :param ipv6address dst_outer: outer IPv6 destination address
2228
2229         IPv6 source address is 1234::1
2230         IPv4 source address is 123.1.1.1
2231         UDP source port and destination port are 1234
2232         """
2233
2234         p = (
2235             IPv6(src="1234::1", dst=dst_outer)
2236             / IP(src="123.1.1.1", dst=dst_inner)
2237             / UDP(sport=1234, dport=1234)
2238         )
2239         return p
2240
2241     def create_packet_header_IPv6_SRH_IPv4(self, dst, sidlist, segleft):
2242         """Create packet header: IPv4 encapsulated in SRv6:
2243         IPv6 header with SRH, IPv4 header, UDP header
2244
2245         :param ipv4address dst: inner IPv4 destination address
2246         :param list sidlist: segment list of outer IPv6 SRH
2247         :param int segleft: segments-left field of outer IPv6 SRH
2248
2249         Outer IPv6 destination address is set to sidlist[segleft]
2250         IPv6 source address is 1234::1
2251         IPv4 source address is 123.1.1.1
2252         UDP source port and destination port are 1234
2253         """
2254
2255         p = (
2256             IPv6(src="1234::1", dst=sidlist[segleft])
2257             / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=4)
2258             / IP(src="123.1.1.1", dst=dst)
2259             / UDP(sport=1234, dport=1234)
2260         )
2261         return p
2262
2263     def create_packet_header_L2(self, vlan=0):
2264         """Create packet header: L2 header
2265
2266         :param vlan: if vlan!=0 then add 802.1q header
2267         """
2268         # Note: the dst addr ('00:55:44:33:22:11') is used in
2269         # the compare function compare_rx_tx_packet_T_Encaps_L2
2270         # to detect presence of L2 in SRH payload
2271         p = Ether(src="00:11:22:33:44:55", dst="00:55:44:33:22:11")
2272         etype = 0x8137  # IPX
2273         if vlan:
2274             # add 802.1q layer
2275             p /= Dot1Q(vlan=vlan, type=etype)
2276         else:
2277             p.type = etype
2278         return p
2279
2280     def create_packet_header_IPv6_SRH_L2(self, sidlist, segleft, vlan=0):
2281         """Create packet header: L2 encapsulated in SRv6:
2282         IPv6 header with SRH, L2
2283
2284         :param list sidlist: segment list of outer IPv6 SRH
2285         :param int segleft: segments-left field of outer IPv6 SRH
2286         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
2287
2288         Outer IPv6 destination address is set to sidlist[segleft]
2289         IPv6 source address is 1234::1
2290         """
2291         eth = Ether(src="00:11:22:33:44:55", dst="00:55:44:33:22:11")
2292         etype = 0x8137  # IPX
2293         if vlan:
2294             # add 802.1q layer
2295             eth /= Dot1Q(vlan=vlan, type=etype)
2296         else:
2297             eth.type = etype
2298
2299         p = (
2300             IPv6(src="1234::1", dst=sidlist[segleft])
2301             / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=143)
2302             / eth
2303         )
2304         return p
2305
2306     def create_packet_header_IPv6_L2(self, dst_outer, vlan=0):
2307         """Create packet header: L2 encapsulated in IPv6:
2308         IPv6 header, L2
2309
2310         :param ipv6address dst_outer: outer IPv6 destination address
2311         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
2312         """
2313         eth = Ether(src="00:11:22:33:44:55", dst="00:55:44:33:22:11")
2314         etype = 0x8137  # IPX
2315         if vlan:
2316             # add 802.1q layer
2317             eth /= Dot1Q(vlan=vlan, type=etype)
2318         else:
2319             eth.type = etype
2320
2321         p = IPv6(src="1234::1", dst=dst_outer, nh=143) / eth
2322         return p
2323
2324     def get_payload_info(self, packet):
2325         """Extract the payload_info from the packet"""
2326         # in most cases, payload_info is in packet[Raw]
2327         # but packet[Raw] gives the complete payload
2328         # (incl L2 header) for the T.Encaps L2 case
2329         try:
2330             payload_info = self.payload_to_info(packet[Raw])
2331
2332         except:
2333             # remote L2 header from packet[Raw]:
2334             # take packet[Raw], convert it to an Ether layer
2335             # and then extract Raw from it
2336             payload_info = self.payload_to_info(Ether(scapy.compat.r(packet[Raw]))[Raw])
2337
2338         return payload_info
2339
2340     def verify_captured_pkts(self, dst_if, capture, compare_func):
2341         """
2342         Verify captured packet stream for specified interface.
2343         Compare ingress with egress packets using the specified compare fn
2344
2345         :param dst_if: egress interface of DUT
2346         :param capture: captured packets
2347         :param compare_func: function to compare in and out packet
2348         """
2349         self.logger.info(
2350             "Verifying capture on interface %s using function %s"
2351             % (dst_if.name, compare_func.__name__)
2352         )
2353
2354         last_info = dict()
2355         for i in self.pg_interfaces:
2356             last_info[i.sw_if_index] = None
2357         dst_sw_if_index = dst_if.sw_if_index
2358
2359         for packet in capture:
2360             try:
2361                 # extract payload_info from packet's payload
2362                 payload_info = self.get_payload_info(packet)
2363                 packet_index = payload_info.index
2364
2365                 self.logger.debug("Verifying packet with index %d" % (packet_index))
2366                 # packet should have arrived on the expected interface
2367                 self.assertEqual(payload_info.dst, dst_sw_if_index)
2368                 self.logger.debug(
2369                     "Got packet on interface %s: src=%u (idx=%u)"
2370                     % (dst_if.name, payload_info.src, packet_index)
2371                 )
2372
2373                 # search for payload_info with same src and dst if_index
2374                 # this will give us the transmitted packet
2375                 next_info = self.get_next_packet_info_for_interface2(
2376                     payload_info.src, dst_sw_if_index, last_info[payload_info.src]
2377                 )
2378                 last_info[payload_info.src] = next_info
2379                 # next_info should not be None
2380                 self.assertTrue(next_info is not None)
2381                 # index of tx and rx packets should be equal
2382                 self.assertEqual(packet_index, next_info.index)
2383                 # data field of next_info contains the tx packet
2384                 txed_packet = next_info.data
2385
2386                 self.logger.debug(
2387                     ppp("Transmitted packet:", txed_packet)
2388                 )  # ppp=Pretty Print Packet
2389
2390                 self.logger.debug(ppp("Received packet:", packet))
2391
2392                 # compare rcvd packet with expected packet using compare_func
2393                 compare_func(txed_packet, packet)
2394
2395             except:
2396                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2397                 raise
2398
2399         # FIXME: there is no need to check manually that all the packets
2400         #        arrived (already done so by get_capture); checking here
2401         #        prevents testing packets that are expected to be dropped, so
2402         #        commenting this out for now
2403
2404         # have all expected packets arrived?
2405         # for i in self.pg_interfaces:
2406         #    remaining_packet = self.get_next_packet_info_for_interface2(
2407         #        i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
2408         #    self.assertTrue(remaining_packet is None,
2409         #                    "Interface %s: Packet expected from interface %s "
2410         #                    "didn't arrive" % (dst_if.name, i.name))
2411
2412
2413 if __name__ == "__main__":
2414     unittest.main(testRunner=VppTestRunner)