ethernet: check destination mac for L3 in ethernet-input node
[vpp.git] / test / test_srv6.py
1 #!/usr/bin/env python3
2
3 import unittest
4 import binascii
5 from socket import AF_INET6
6
7 from framework import VppTestCase
8 from asfframework import VppTestRunner
9 from vpp_ip_route import VppIpRoute, VppRoutePath, VppIpTable
10 from vpp_srv6 import (
11     SRv6LocalSIDBehaviors,
12     VppSRv6LocalSID,
13     VppSRv6Policy,
14     VppSRv6PolicyV2,
15     SRv6PolicyType,
16     VppSRv6Steering,
17     SRv6PolicySteeringTypes,
18 )
19
20 import scapy.compat
21 from scapy.packet import Raw
22 from scapy.layers.l2 import Ether, Dot1Q
23 from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
24 from scapy.layers.inet import IP, UDP
25
26 from util import ppp
27
28
29 class TestSRv6(VppTestCase):
30     """SRv6 Test Case"""
31
32     @classmethod
33     def setUpClass(cls):
34         super(TestSRv6, cls).setUpClass()
35
36     @classmethod
37     def tearDownClass(cls):
38         super(TestSRv6, cls).tearDownClass()
39
40     def setUp(self):
41         """Perform test setup before each test case."""
42         super(TestSRv6, self).setUp()
43
44         # packet sizes, inclusive L2 overhead
45         self.pg_packet_sizes = [64, 512, 1518, 9018]
46
47         # reset packet_infos
48         self.reset_packet_infos()
49
50     def tearDown(self):
51         """Clean up test setup after each test case."""
52         self.teardown_interfaces()
53
54         super(TestSRv6, self).tearDown()
55
56     def configure_interface(
57         self, interface, ipv6=False, ipv4=False, ipv6_table_id=0, ipv4_table_id=0
58     ):
59         """Configure interface.
60         :param ipv6: configure IPv6 on interface
61         :param ipv4: configure IPv4 on interface
62         :param ipv6_table_id: FIB table_id for IPv6
63         :param ipv4_table_id: FIB table_id for IPv4
64         """
65         self.logger.debug("Configuring interface %s" % (interface.name))
66         if ipv6:
67             self.logger.debug("Configuring IPv6")
68             interface.set_table_ip6(ipv6_table_id)
69             interface.config_ip6()
70             interface.resolve_ndp(timeout=5)
71         if ipv4:
72             self.logger.debug("Configuring IPv4")
73             interface.set_table_ip4(ipv4_table_id)
74             interface.config_ip4()
75             interface.resolve_arp()
76         interface.admin_up()
77
78     def setup_interfaces(self, ipv6=[], ipv4=[], ipv6_table_id=[], ipv4_table_id=[]):
79         """Create and configure interfaces.
80
81         :param ipv6: list of interface IPv6 capabilities
82         :param ipv4: list of interface IPv4 capabilities
83         :param ipv6_table_id: list of intf IPv6 FIB table_ids
84         :param ipv4_table_id: list of intf IPv4 FIB table_ids
85         :returns: List of created interfaces.
86         """
87         # how many interfaces?
88         if len(ipv6):
89             count = len(ipv6)
90         else:
91             count = len(ipv4)
92         self.logger.debug("Creating and configuring %d interfaces" % (count))
93
94         # fill up ipv6 and ipv4 lists if needed
95         # not enabled (False) is the default
96         if len(ipv6) < count:
97             ipv6 += (count - len(ipv6)) * [False]
98         if len(ipv4) < count:
99             ipv4 += (count - len(ipv4)) * [False]
100
101         # fill up table_id lists if needed
102         # table_id 0 (global) is the default
103         if len(ipv6_table_id) < count:
104             ipv6_table_id += (count - len(ipv6_table_id)) * [0]
105         if len(ipv4_table_id) < count:
106             ipv4_table_id += (count - len(ipv4_table_id)) * [0]
107
108         # create 'count' pg interfaces
109         self.create_pg_interfaces(range(count))
110
111         # setup all interfaces
112         for i in range(count):
113             intf = self.pg_interfaces[i]
114             self.configure_interface(
115                 intf, ipv6[i], ipv4[i], ipv6_table_id[i], ipv4_table_id[i]
116             )
117
118         if any(ipv6):
119             self.logger.debug(self.vapi.cli("show ip6 neighbors"))
120         if any(ipv4):
121             self.logger.debug(self.vapi.cli("show ip4 neighbors"))
122         self.logger.debug(self.vapi.cli("show interface"))
123         self.logger.debug(self.vapi.cli("show hardware"))
124
125         return self.pg_interfaces
126
127     def teardown_interfaces(self):
128         """Unconfigure and bring down interface."""
129         self.logger.debug("Tearing down interfaces")
130         # tear down all interfaces
131         # AFAIK they cannot be deleted
132         for i in self.pg_interfaces:
133             self.logger.debug("Tear down interface %s" % (i.name))
134             i.admin_down()
135             i.unconfig()
136             i.set_table_ip4(0)
137             i.set_table_ip6(0)
138
139     @unittest.skipUnless(0, "PC to fix")
140     def test_SRv6_T_Encaps(self):
141         """Test SRv6 Transit.Encaps behavior for IPv6."""
142         # send traffic to one destination interface
143         # source and destination are IPv6 only
144         self.setup_interfaces(ipv6=[True, True])
145
146         # configure FIB entries
147         route = VppIpRoute(
148             self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
149         )
150         route.add_vpp_config()
151
152         # configure encaps IPv6 source address
153         # needs to be done before SR Policy config
154         # TODO: API?
155         self.vapi.cli("set sr encaps source addr a3::")
156
157         bsid = "a3::9999:1"
158         # configure SRv6 Policy
159         # Note: segment list order: first -> last
160         sr_policy = VppSRv6Policy(
161             self,
162             bsid=bsid,
163             is_encap=1,
164             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
165             weight=1,
166             fib_table=0,
167             segments=["a4::", "a5::", "a6::c7"],
168             source="a3::",
169         )
170         sr_policy.add_vpp_config()
171         self.sr_policy = sr_policy
172
173         # log the sr policies
174         self.logger.info(self.vapi.cli("show sr policies"))
175
176         # steer IPv6 traffic to a7::/64 into SRv6 Policy
177         # use the bsid of the above self.sr_policy
178         pol_steering = VppSRv6Steering(
179             self,
180             bsid=self.sr_policy.bsid,
181             prefix="a7::",
182             mask_width=64,
183             traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
184             sr_policy_index=0,
185             table_id=0,
186             sw_if_index=0,
187         )
188         pol_steering.add_vpp_config()
189
190         # log the sr steering policies
191         self.logger.info(self.vapi.cli("show sr steering policies"))
192
193         # create packets
194         count = len(self.pg_packet_sizes)
195         dst_inner = "a7::1234"
196         pkts = []
197
198         # create IPv6 packets without SRH
199         packet_header = self.create_packet_header_IPv6(dst_inner)
200         # create traffic stream pg0->pg1
201         pkts.extend(
202             self.create_stream(
203                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
204             )
205         )
206
207         # create IPv6 packets with SRH
208         # packets with segments-left 1, active segment a7::
209         packet_header = self.create_packet_header_IPv6_SRH(
210             sidlist=["a8::", "a7::", "a6::"], segleft=1
211         )
212         # create traffic stream pg0->pg1
213         pkts.extend(
214             self.create_stream(
215                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
216             )
217         )
218
219         # create IPv6 packets with SRH and IPv6
220         # packets with segments-left 1, active segment a7::
221         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
222             dst_inner, sidlist=["a8::", "a7::", "a6::"], segleft=1
223         )
224         # create traffic stream pg0->pg1
225         pkts.extend(
226             self.create_stream(
227                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
228             )
229         )
230
231         # send packets and verify received packets
232         self.send_and_verify_pkts(
233             self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_T_Encaps
234         )
235
236         # log the localsid counters
237         self.logger.info(self.vapi.cli("show sr localsid"))
238
239         # remove SR steering
240         pol_steering.remove_vpp_config()
241         self.logger.info(self.vapi.cli("show sr steering policies"))
242
243         # remove SR Policies
244         self.sr_policy.remove_vpp_config()
245         self.logger.info(self.vapi.cli("show sr policies"))
246
247         # remove FIB entries
248         # done by tearDown
249
250         # cleanup interfaces
251         self.teardown_interfaces()
252
253     def test_SRv6_T_Encaps_with_v6src(self):
254         """Test SRv6 Transit.Encaps behavior for IPv6 and select multiple src v6addr case."""
255         # send traffic to one destination interface
256         # source and destination are IPv6 only
257         self.setup_interfaces(ipv6=[True, True])
258
259         # configure FIB entries
260         route = VppIpRoute(
261             self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
262         )
263         route.add_vpp_config()
264
265         # configure encaps IPv6 source address
266         # needs to be done before SR Policy config
267         # TODO: API?
268         self.vapi.cli("set sr encaps source addr a3::")
269
270         bsid = "a3::9999:1"
271         other_src_ip = "b1::"
272         # configure SRv6 Policy
273         # Note: segment list order: first -> last
274         sr_policy = VppSRv6PolicyV2(
275             self,
276             bsid=bsid,
277             is_encap=1,
278             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
279             weight=1,
280             fib_table=0,
281             segments=["a4::", "a5::", "a6::c7"],
282             encap_src=other_src_ip,
283             source=other_src_ip,
284         )
285         sr_policy.add_vpp_config()
286         self.sr_policy = sr_policy
287
288         # log the sr policies
289         self.logger.info(self.vapi.cli("show sr policies"))
290
291         # steer IPv6 traffic to a7::/64 into SRv6 Policy
292         # use the bsid of the above self.sr_policy
293         pol_steering = VppSRv6Steering(
294             self,
295             bsid=self.sr_policy.bsid,
296             prefix="a7::",
297             mask_width=64,
298             traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
299             sr_policy_index=0,
300             table_id=0,
301             sw_if_index=0,
302         )
303         pol_steering.add_vpp_config()
304
305         # log the sr steering policies
306         self.logger.info(self.vapi.cli("show sr steering-policies"))
307
308         # create packets
309         count = len(self.pg_packet_sizes)
310         dst_inner = "a7::1234"
311         pkts = []
312
313         # create IPv6 packets without SRH
314         packet_header = self.create_packet_header_IPv6(dst_inner)
315         # create traffic stream pg0->pg1
316         pkts.extend(
317             self.create_stream(
318                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
319             )
320         )
321
322         # create IPv6 packets with SRH
323         # packets with segments-left 1, active segment a7::
324         packet_header = self.create_packet_header_IPv6_SRH(
325             sidlist=["a8::", "a7::", "a6::"], segleft=1
326         )
327         # create traffic stream pg0->pg1
328         pkts.extend(
329             self.create_stream(
330                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
331             )
332         )
333
334         # create IPv6 packets with SRH and IPv6
335         # packets with segments-left 1, active segment a7::
336         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
337             dst_inner, sidlist=["a8::", "a7::", "a6::"], segleft=1
338         )
339         # create traffic stream pg0->pg1
340         pkts.extend(
341             self.create_stream(
342                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
343             )
344         )
345
346         # send packets and verify received packets
347         self.send_and_verify_pkts(
348             self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_T_Encaps
349         )
350
351         # log the localsid counters
352         self.logger.info(self.vapi.cli("show sr localsid"))
353
354         # remove SR steering
355         pol_steering.remove_vpp_config()
356         self.logger.info(self.vapi.cli("show sr steering-policies"))
357
358         # remove SR Policies
359         self.sr_policy.remove_vpp_config()
360         self.logger.info(self.vapi.cli("show sr policies"))
361
362         # remove FIB entries
363         # done by tearDown
364
365         # cleanup interfaces
366         self.teardown_interfaces()
367
368     @unittest.skipUnless(0, "PC to fix")
369     def test_SRv6_T_Insert(self):
370         """Test SRv6 Transit.Insert behavior (IPv6 only)."""
371         # send traffic to one destination interface
372         # source and destination are IPv6 only
373         self.setup_interfaces(ipv6=[True, True])
374
375         # configure FIB entries
376         route = VppIpRoute(
377             self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
378         )
379         route.add_vpp_config()
380
381         # configure encaps IPv6 source address
382         # needs to be done before SR Policy config
383         # TODO: API?
384         self.vapi.cli("set sr encaps source addr a3::")
385
386         bsid = "a3::9999:1"
387         # configure SRv6 Policy
388         # Note: segment list order: first -> last
389         sr_policy = VppSRv6Policy(
390             self,
391             bsid=bsid,
392             is_encap=0,
393             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
394             weight=1,
395             fib_table=0,
396             segments=["a4::", "a5::", "a6::c7"],
397             source="a3::",
398         )
399         sr_policy.add_vpp_config()
400         self.sr_policy = sr_policy
401
402         # log the sr policies
403         self.logger.info(self.vapi.cli("show sr policies"))
404
405         # steer IPv6 traffic to a7::/64 into SRv6 Policy
406         # use the bsid of the above self.sr_policy
407         pol_steering = VppSRv6Steering(
408             self,
409             bsid=self.sr_policy.bsid,
410             prefix="a7::",
411             mask_width=64,
412             traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
413             sr_policy_index=0,
414             table_id=0,
415             sw_if_index=0,
416         )
417         pol_steering.add_vpp_config()
418
419         # log the sr steering policies
420         self.logger.info(self.vapi.cli("show sr steering policies"))
421
422         # create packets
423         count = len(self.pg_packet_sizes)
424         dst_inner = "a7::1234"
425         pkts = []
426
427         # create IPv6 packets without SRH
428         packet_header = self.create_packet_header_IPv6(dst_inner)
429         # create traffic stream pg0->pg1
430         pkts.extend(
431             self.create_stream(
432                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
433             )
434         )
435
436         # create IPv6 packets with SRH
437         # packets with segments-left 1, active segment a7::
438         packet_header = self.create_packet_header_IPv6_SRH(
439             sidlist=["a8::", "a7::", "a6::"], segleft=1
440         )
441         # create traffic stream pg0->pg1
442         pkts.extend(
443             self.create_stream(
444                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
445             )
446         )
447
448         # send packets and verify received packets
449         self.send_and_verify_pkts(
450             self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_T_Insert
451         )
452
453         # log the localsid counters
454         self.logger.info(self.vapi.cli("show sr localsid"))
455
456         # remove SR steering
457         pol_steering.remove_vpp_config()
458         self.logger.info(self.vapi.cli("show sr steering policies"))
459
460         # remove SR Policies
461         self.sr_policy.remove_vpp_config()
462         self.logger.info(self.vapi.cli("show sr policies"))
463
464         # remove FIB entries
465         # done by tearDown
466
467         # cleanup interfaces
468         self.teardown_interfaces()
469
470     @unittest.skipUnless(0, "PC to fix")
471     def test_SRv6_T_Encaps_IPv4(self):
472         """Test SRv6 Transit.Encaps behavior for IPv4."""
473         # send traffic to one destination interface
474         # source interface is IPv4 only
475         # destination interface is IPv6 only
476         self.setup_interfaces(ipv6=[False, True], ipv4=[True, False])
477
478         # configure FIB entries
479         route = VppIpRoute(
480             self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
481         )
482         route.add_vpp_config()
483
484         # configure encaps IPv6 source address
485         # needs to be done before SR Policy config
486         # TODO: API?
487         self.vapi.cli("set sr encaps source addr a3::")
488
489         bsid = "a3::9999:1"
490         # configure SRv6 Policy
491         # Note: segment list order: first -> last
492         sr_policy = VppSRv6Policy(
493             self,
494             bsid=bsid,
495             is_encap=1,
496             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
497             weight=1,
498             fib_table=0,
499             segments=["a4::", "a5::", "a6::c7"],
500             source="a3::",
501         )
502         sr_policy.add_vpp_config()
503         self.sr_policy = sr_policy
504
505         # log the sr policies
506         self.logger.info(self.vapi.cli("show sr policies"))
507
508         # steer IPv4 traffic to 7.1.1.0/24 into SRv6 Policy
509         # use the bsid of the above self.sr_policy
510         pol_steering = VppSRv6Steering(
511             self,
512             bsid=self.sr_policy.bsid,
513             prefix="7.1.1.0",
514             mask_width=24,
515             traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV4,
516             sr_policy_index=0,
517             table_id=0,
518             sw_if_index=0,
519         )
520         pol_steering.add_vpp_config()
521
522         # log the sr steering policies
523         self.logger.info(self.vapi.cli("show sr steering policies"))
524
525         # create packets
526         count = len(self.pg_packet_sizes)
527         dst_inner = "7.1.1.123"
528         pkts = []
529
530         # create IPv4 packets
531         packet_header = self.create_packet_header_IPv4(dst_inner)
532         # create traffic stream pg0->pg1
533         pkts.extend(
534             self.create_stream(
535                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
536             )
537         )
538
539         # send packets and verify received packets
540         self.send_and_verify_pkts(
541             self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_T_Encaps_IPv4
542         )
543
544         # log the localsid counters
545         self.logger.info(self.vapi.cli("show sr localsid"))
546
547         # remove SR steering
548         pol_steering.remove_vpp_config()
549         self.logger.info(self.vapi.cli("show sr steering policies"))
550
551         # remove SR Policies
552         self.sr_policy.remove_vpp_config()
553         self.logger.info(self.vapi.cli("show sr policies"))
554
555         # remove FIB entries
556         # done by tearDown
557
558         # cleanup interfaces
559         self.teardown_interfaces()
560
561     @unittest.skip("VPP crashes after running this test")
562     def test_SRv6_T_Encaps_L2(self):
563         """Test SRv6 Transit.Encaps behavior for L2."""
564         # send traffic to one destination interface
565         # source interface is IPv4 only TODO?
566         # destination interface is IPv6 only
567         self.setup_interfaces(ipv6=[False, True], ipv4=[False, False])
568
569         # configure FIB entries
570         route = VppIpRoute(
571             self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
572         )
573         route.add_vpp_config()
574
575         # configure encaps IPv6 source address
576         # needs to be done before SR Policy config
577         # TODO: API?
578         self.vapi.cli("set sr encaps source addr a3::")
579
580         bsid = "a3::9999:1"
581         # configure SRv6 Policy
582         # Note: segment list order: first -> last
583         sr_policy = VppSRv6Policy(
584             self,
585             bsid=bsid,
586             is_encap=1,
587             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
588             weight=1,
589             fib_table=0,
590             segments=["a4::", "a5::", "a6::c7"],
591             source="a3::",
592         )
593         sr_policy.add_vpp_config()
594         self.sr_policy = sr_policy
595
596         # log the sr policies
597         self.logger.info(self.vapi.cli("show sr policies"))
598
599         # steer L2 traffic into SRv6 Policy
600         # use the bsid of the above self.sr_policy
601         pol_steering = VppSRv6Steering(
602             self,
603             bsid=self.sr_policy.bsid,
604             prefix="::",
605             mask_width=0,
606             traffic_type=SRv6PolicySteeringTypes.SR_STEER_L2,
607             sr_policy_index=0,
608             table_id=0,
609             sw_if_index=self.pg0.sw_if_index,
610         )
611         pol_steering.add_vpp_config()
612
613         # log the sr steering policies
614         self.logger.info(self.vapi.cli("show sr steering policies"))
615
616         # create packets
617         count = len(self.pg_packet_sizes)
618         pkts = []
619
620         # create L2 packets without dot1q header
621         packet_header = self.create_packet_header_L2()
622         # create traffic stream pg0->pg1
623         pkts.extend(
624             self.create_stream(
625                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
626             )
627         )
628
629         # create L2 packets with dot1q header
630         packet_header = self.create_packet_header_L2(vlan=123)
631         # create traffic stream pg0->pg1
632         pkts.extend(
633             self.create_stream(
634                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
635             )
636         )
637
638         # send packets and verify received packets
639         self.send_and_verify_pkts(
640             self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_T_Encaps_L2
641         )
642
643         # log the localsid counters
644         self.logger.info(self.vapi.cli("show sr localsid"))
645
646         # remove SR steering
647         pol_steering.remove_vpp_config()
648         self.logger.info(self.vapi.cli("show sr steering policies"))
649
650         # remove SR Policies
651         self.sr_policy.remove_vpp_config()
652         self.logger.info(self.vapi.cli("show sr policies"))
653
654         # remove FIB entries
655         # done by tearDown
656
657         # cleanup interfaces
658         self.teardown_interfaces()
659
660     def test_SRv6_End(self):
661         """Test SRv6 End (without PSP) behavior."""
662         # send traffic to one destination interface
663         # source and destination interfaces are IPv6 only
664         self.setup_interfaces(ipv6=[True, True])
665
666         # configure FIB entries
667         route = VppIpRoute(
668             self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
669         )
670         route.add_vpp_config()
671
672         # configure SRv6 localSID End without PSP behavior
673         localsid = VppSRv6LocalSID(
674             self,
675             localsid="A3::0",
676             behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
677             nh_addr=0,
678             end_psp=0,
679             sw_if_index=0,
680             vlan_index=0,
681             fib_table=0,
682         )
683         localsid.add_vpp_config()
684         # log the localsids
685         self.logger.debug(self.vapi.cli("show sr localsid"))
686
687         # create IPv6 packets with SRH (SL=2, SL=1, SL=0)
688         # send one packet per SL value per packet size
689         # SL=0 packet with localSID End with USP needs 2nd SRH
690         count = len(self.pg_packet_sizes)
691         dst_inner = "a4::1234"
692         pkts = []
693
694         # packets with segments-left 2, active segment a3::
695         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
696             dst_inner, sidlist=["a5::", "a4::", "a3::"], segleft=2
697         )
698         # create traffic stream pg0->pg1
699         pkts.extend(
700             self.create_stream(
701                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
702             )
703         )
704
705         # packets with segments-left 1, active segment a3::
706         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
707             dst_inner, sidlist=["a4::", "a3::", "a2::"], segleft=1
708         )
709         # add to traffic stream pg0->pg1
710         pkts.extend(
711             self.create_stream(
712                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
713             )
714         )
715
716         # TODO: test behavior with SL=0 packet (needs 2*SRH?)
717
718         expected_count = len(pkts)
719
720         # packets without SRH (should not crash)
721         packet_header = self.create_packet_header_IPv6("a3::")
722         # create traffic stream pg0->pg1
723         pkts.extend(
724             self.create_stream(
725                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
726             )
727         )
728
729         # send packets and verify received packets
730         self.send_and_verify_pkts(
731             self.pg0,
732             pkts,
733             self.pg1,
734             self.compare_rx_tx_packet_End,
735             expected_count=expected_count,
736         )
737
738         # log the localsid counters
739         self.logger.info(self.vapi.cli("show sr localsid"))
740
741         # remove SRv6 localSIDs
742         localsid.remove_vpp_config()
743
744         # remove FIB entries
745         # done by tearDown
746
747         # cleanup interfaces
748         self.teardown_interfaces()
749
750     def test_SRv6_End_with_PSP(self):
751         """Test SRv6 End with PSP behavior."""
752         # send traffic to one destination interface
753         # source and destination interfaces are IPv6 only
754         self.setup_interfaces(ipv6=[True, True])
755
756         # configure FIB entries
757         route = VppIpRoute(
758             self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
759         )
760         route.add_vpp_config()
761
762         # configure SRv6 localSID End with PSP behavior
763         localsid = VppSRv6LocalSID(
764             self,
765             localsid="A3::0",
766             behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
767             nh_addr=0,
768             end_psp=1,
769             sw_if_index=0,
770             vlan_index=0,
771             fib_table=0,
772         )
773         localsid.add_vpp_config()
774         # log the localsids
775         self.logger.debug(self.vapi.cli("show sr localsid"))
776
777         # create IPv6 packets with SRH (SL=2, SL=1)
778         # send one packet per SL value per packet size
779         # SL=0 packet with localSID End with PSP is dropped
780         count = len(self.pg_packet_sizes)
781         dst_inner = "a4::1234"
782         pkts = []
783
784         # packets with segments-left 2, active segment a3::
785         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
786             dst_inner, sidlist=["a5::", "a4::", "a3::"], segleft=2
787         )
788         # create traffic stream pg0->pg1
789         pkts.extend(
790             self.create_stream(
791                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
792             )
793         )
794
795         # packets with segments-left 1, active segment a3::
796         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
797             dst_inner, sidlist=["a4::", "a3::", "a2::"], segleft=1
798         )
799         # add to traffic stream pg0->pg1
800         pkts.extend(
801             self.create_stream(
802                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
803             )
804         )
805
806         # send packets and verify received packets
807         self.send_and_verify_pkts(
808             self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_End_PSP
809         )
810
811         # log the localsid counters
812         self.logger.info(self.vapi.cli("show sr localsid"))
813
814         # remove SRv6 localSIDs
815         localsid.remove_vpp_config()
816
817         # remove FIB entries
818         # done by tearDown
819
820         # cleanup interfaces
821         self.teardown_interfaces()
822
823     def test_SRv6_End_X(self):
824         """Test SRv6 End.X (without PSP) behavior."""
825         # create three interfaces (1 source, 2 destinations)
826         # source and destination interfaces are IPv6 only
827         self.setup_interfaces(ipv6=[True, True, True])
828
829         # configure FIB entries
830         # a4::/64 via pg1 and pg2
831         route = VppIpRoute(
832             self,
833             "a4::",
834             64,
835             [
836                 VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index),
837                 VppRoutePath(self.pg2.remote_ip6, self.pg2.sw_if_index),
838             ],
839         )
840         route.add_vpp_config()
841         self.logger.debug(self.vapi.cli("show ip6 fib"))
842
843         # configure SRv6 localSID End.X without PSP behavior
844         # End.X points to interface pg1
845         localsid = VppSRv6LocalSID(
846             self,
847             localsid="A3::C4",
848             behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
849             nh_addr=self.pg1.remote_ip6,
850             end_psp=0,
851             sw_if_index=self.pg1.sw_if_index,
852             vlan_index=0,
853             fib_table=0,
854         )
855         localsid.add_vpp_config()
856         # log the localsids
857         self.logger.debug(self.vapi.cli("show sr localsid"))
858
859         # create IPv6 packets with SRH (SL=2, SL=1)
860         # send one packet per SL value per packet size
861         # SL=0 packet with localSID End with PSP is dropped
862         count = len(self.pg_packet_sizes)
863         dst_inner = "a4::1234"
864         pkts = []
865
866         # packets with segments-left 2, active segment a3::c4
867         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
868             dst_inner, sidlist=["a5::", "a4::", "a3::c4"], segleft=2
869         )
870         # create traffic stream pg0->pg1
871         pkts.extend(
872             self.create_stream(
873                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
874             )
875         )
876
877         # packets with segments-left 1, active segment a3::c4
878         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
879             dst_inner, sidlist=["a4::", "a3::c4", "a2::"], segleft=1
880         )
881         # add to traffic stream pg0->pg1
882         pkts.extend(
883             self.create_stream(
884                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
885             )
886         )
887
888         # send packets and verify received packets
889         # using same comparison function as End (no PSP)
890         self.send_and_verify_pkts(
891             self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_End
892         )
893
894         # assert nothing was received on the other interface (pg2)
895         self.pg2.assert_nothing_captured(remark="mis-directed packet(s)")
896
897         # log the localsid counters
898         self.logger.info(self.vapi.cli("show sr localsid"))
899
900         # remove SRv6 localSIDs
901         localsid.remove_vpp_config()
902
903         # remove FIB entries
904         # done by tearDown
905
906         # cleanup interfaces
907         self.teardown_interfaces()
908
909     def test_SRv6_End_X_with_PSP(self):
910         """Test SRv6 End.X with PSP behavior."""
911         # create three interfaces (1 source, 2 destinations)
912         # source and destination interfaces are IPv6 only
913         self.setup_interfaces(ipv6=[True, True, True])
914
915         # configure FIB entries
916         # a4::/64 via pg1 and pg2
917         route = VppIpRoute(
918             self,
919             "a4::",
920             64,
921             [
922                 VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index),
923                 VppRoutePath(self.pg2.remote_ip6, self.pg2.sw_if_index),
924             ],
925         )
926         route.add_vpp_config()
927
928         # configure SRv6 localSID End with PSP behavior
929         localsid = VppSRv6LocalSID(
930             self,
931             localsid="A3::C4",
932             behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
933             nh_addr=self.pg1.remote_ip6,
934             end_psp=1,
935             sw_if_index=self.pg1.sw_if_index,
936             vlan_index=0,
937             fib_table=0,
938         )
939         localsid.add_vpp_config()
940         # log the localsids
941         self.logger.debug(self.vapi.cli("show sr localsid"))
942
943         # create IPv6 packets with SRH (SL=2, SL=1)
944         # send one packet per SL value per packet size
945         # SL=0 packet with localSID End with PSP is dropped
946         count = len(self.pg_packet_sizes)
947         dst_inner = "a4::1234"
948         pkts = []
949
950         # packets with segments-left 2, active segment a3::
951         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
952             dst_inner, sidlist=["a5::", "a4::", "a3::c4"], segleft=2
953         )
954         # create traffic stream pg0->pg1
955         pkts.extend(
956             self.create_stream(
957                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
958             )
959         )
960
961         # packets with segments-left 1, active segment a3::
962         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
963             dst_inner, sidlist=["a4::", "a3::c4", "a2::"], segleft=1
964         )
965         # add to traffic stream pg0->pg1
966         pkts.extend(
967             self.create_stream(
968                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
969             )
970         )
971
972         # send packets and verify received packets
973         # using same comparison function as End with PSP
974         self.send_and_verify_pkts(
975             self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_End_PSP
976         )
977
978         # assert nothing was received on the other interface (pg2)
979         self.pg2.assert_nothing_captured(remark="mis-directed packet(s)")
980
981         # log the localsid counters
982         self.logger.info(self.vapi.cli("show sr localsid"))
983
984         # remove SRv6 localSIDs
985         localsid.remove_vpp_config()
986
987         # remove FIB entries
988         # done by tearDown
989
990         # cleanup interfaces
991         self.teardown_interfaces()
992
993     def test_SRv6_End_DX6(self):
994         """Test SRv6 End.DX6 behavior."""
995         # send traffic to one destination interface
996         # source and destination interfaces are IPv6 only
997         self.setup_interfaces(ipv6=[True, True])
998
999         # configure SRv6 localSID End.DX6 behavior
1000         localsid = VppSRv6LocalSID(
1001             self,
1002             localsid="A3::C4",
1003             behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX6,
1004             nh_addr=self.pg1.remote_ip6,
1005             end_psp=0,
1006             sw_if_index=self.pg1.sw_if_index,
1007             vlan_index=0,
1008             fib_table=0,
1009         )
1010         localsid.add_vpp_config()
1011         # log the localsids
1012         self.logger.debug(self.vapi.cli("show sr localsid"))
1013
1014         # create IPv6 packets with SRH (SL=0)
1015         # send one packet per packet size
1016         count = len(self.pg_packet_sizes)
1017         dst_inner = "a4::1234"  # inner header destination address
1018         pkts = []
1019
1020         # packets with SRH, segments-left 0, active segment a3::c4
1021         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
1022             dst_inner, sidlist=["a3::c4", "a2::", "a1::"], segleft=0
1023         )
1024         # add to traffic stream pg0->pg1
1025         pkts.extend(
1026             self.create_stream(
1027                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1028             )
1029         )
1030
1031         # packets without SRH, IPv6 in IPv6
1032         # outer IPv6 dest addr is the localsid End.DX6
1033         packet_header = self.create_packet_header_IPv6_IPv6(
1034             dst_inner, dst_outer="a3::c4"
1035         )
1036         # add to traffic stream pg0->pg1
1037         pkts.extend(
1038             self.create_stream(
1039                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1040             )
1041         )
1042
1043         # send packets and verify received packets
1044         self.send_and_verify_pkts(
1045             self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_End_DX6
1046         )
1047
1048         # log the localsid counters
1049         self.logger.info(self.vapi.cli("show sr localsid"))
1050
1051         # remove SRv6 localSIDs
1052         localsid.remove_vpp_config()
1053
1054         # cleanup interfaces
1055         self.teardown_interfaces()
1056
1057     def test_SRv6_End_DT6(self):
1058         """Test SRv6 End.DT6 behavior."""
1059         # create three interfaces (1 source, 2 destinations)
1060         # all interfaces are IPv6 only
1061         # source interface in global FIB (0)
1062         # destination interfaces in global and vrf
1063         vrf_1 = 1
1064         ipt = VppIpTable(self, vrf_1, is_ip6=True)
1065         ipt.add_vpp_config()
1066         self.setup_interfaces(ipv6=[True, True, True], ipv6_table_id=[0, 0, vrf_1])
1067
1068         # configure FIB entries
1069         # a4::/64 is reachable
1070         #     via pg1 in table 0 (global)
1071         #     and via pg2 in table vrf_1
1072         route0 = VppIpRoute(
1073             self,
1074             "a4::",
1075             64,
1076             [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index, nh_table_id=0)],
1077             table_id=0,
1078         )
1079         route0.add_vpp_config()
1080         route1 = VppIpRoute(
1081             self,
1082             "a4::",
1083             64,
1084             [
1085                 VppRoutePath(
1086                     self.pg2.remote_ip6, self.pg2.sw_if_index, nh_table_id=vrf_1
1087                 )
1088             ],
1089             table_id=vrf_1,
1090         )
1091         route1.add_vpp_config()
1092         self.logger.debug(self.vapi.cli("show ip6 fib"))
1093
1094         # configure SRv6 localSID End.DT6 behavior
1095         # Note:
1096         # fib_table: where the localsid is installed
1097         # sw_if_index: in T-variants of localsid this is the vrf table_id
1098         localsid = VppSRv6LocalSID(
1099             self,
1100             localsid="A3::C4",
1101             behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT6,
1102             nh_addr=0,
1103             end_psp=0,
1104             sw_if_index=vrf_1,
1105             vlan_index=0,
1106             fib_table=0,
1107         )
1108         localsid.add_vpp_config()
1109         # log the localsids
1110         self.logger.debug(self.vapi.cli("show sr localsid"))
1111
1112         # create IPv6 packets with SRH (SL=0)
1113         # send one packet per packet size
1114         count = len(self.pg_packet_sizes)
1115         dst_inner = "a4::1234"  # inner header destination address
1116         pkts = []
1117
1118         # packets with SRH, segments-left 0, active segment a3::c4
1119         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
1120             dst_inner, sidlist=["a3::c4", "a2::", "a1::"], segleft=0
1121         )
1122         # add to traffic stream pg0->pg1
1123         pkts.extend(
1124             self.create_stream(
1125                 self.pg0, self.pg2, packet_header, self.pg_packet_sizes, count
1126             )
1127         )
1128
1129         # packets without SRH, IPv6 in IPv6
1130         # outer IPv6 dest addr is the localsid End.DT6
1131         packet_header = self.create_packet_header_IPv6_IPv6(
1132             dst_inner, dst_outer="a3::c4"
1133         )
1134         # add to traffic stream pg0->pg1
1135         pkts.extend(
1136             self.create_stream(
1137                 self.pg0, self.pg2, packet_header, self.pg_packet_sizes, count
1138             )
1139         )
1140
1141         # send packets and verify received packets
1142         # using same comparison function as End.DX6
1143         self.send_and_verify_pkts(
1144             self.pg0, pkts, self.pg2, self.compare_rx_tx_packet_End_DX6
1145         )
1146
1147         # assert nothing was received on the other interface (pg2)
1148         self.pg1.assert_nothing_captured(remark="mis-directed packet(s)")
1149
1150         # log the localsid counters
1151         self.logger.info(self.vapi.cli("show sr localsid"))
1152
1153         # remove SRv6 localSIDs
1154         localsid.remove_vpp_config()
1155
1156         # remove FIB entries
1157         # done by tearDown
1158
1159         # cleanup interfaces
1160         self.teardown_interfaces()
1161
1162     def test_SRv6_End_DX4(self):
1163         """Test SRv6 End.DX4 behavior."""
1164         # send traffic to one destination interface
1165         # source interface is IPv6 only
1166         # destination interface is IPv4 only
1167         self.setup_interfaces(ipv6=[True, False], ipv4=[False, True])
1168
1169         # configure SRv6 localSID End.DX4 behavior
1170         localsid = VppSRv6LocalSID(
1171             self,
1172             localsid="A3::C4",
1173             behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX4,
1174             nh_addr=self.pg1.remote_ip4,
1175             end_psp=0,
1176             sw_if_index=self.pg1.sw_if_index,
1177             vlan_index=0,
1178             fib_table=0,
1179         )
1180         localsid.add_vpp_config()
1181         # log the localsids
1182         self.logger.debug(self.vapi.cli("show sr localsid"))
1183
1184         # send one packet per packet size
1185         count = len(self.pg_packet_sizes)
1186         dst_inner = "4.1.1.123"  # inner header destination address
1187         pkts = []
1188
1189         # packets with SRH, segments-left 0, active segment a3::c4
1190         packet_header = self.create_packet_header_IPv6_SRH_IPv4(
1191             dst_inner, sidlist=["a3::c4", "a2::", "a1::"], segleft=0
1192         )
1193         # add to traffic stream pg0->pg1
1194         pkts.extend(
1195             self.create_stream(
1196                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1197             )
1198         )
1199
1200         # packets without SRH, IPv4 in IPv6
1201         # outer IPv6 dest addr is the localsid End.DX4
1202         packet_header = self.create_packet_header_IPv6_IPv4(
1203             dst_inner, dst_outer="a3::c4"
1204         )
1205         # add to traffic stream pg0->pg1
1206         pkts.extend(
1207             self.create_stream(
1208                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1209             )
1210         )
1211
1212         # send packets and verify received packets
1213         self.send_and_verify_pkts(
1214             self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_End_DX4
1215         )
1216
1217         # log the localsid counters
1218         self.logger.info(self.vapi.cli("show sr localsid"))
1219
1220         # remove SRv6 localSIDs
1221         localsid.remove_vpp_config()
1222
1223         # cleanup interfaces
1224         self.teardown_interfaces()
1225
1226     def test_SRv6_End_DT4(self):
1227         """Test SRv6 End.DT4 behavior."""
1228         # create three interfaces (1 source, 2 destinations)
1229         # source interface is IPv6-only
1230         # destination interfaces are IPv4 only
1231         # source interface in global FIB (0)
1232         # destination interfaces in global and vrf
1233         vrf_1 = 1
1234         ipt = VppIpTable(self, vrf_1)
1235         ipt.add_vpp_config()
1236         self.setup_interfaces(
1237             ipv6=[True, False, False],
1238             ipv4=[False, True, True],
1239             ipv6_table_id=[0, 0, 0],
1240             ipv4_table_id=[0, 0, vrf_1],
1241         )
1242
1243         # configure FIB entries
1244         # 4.1.1.0/24 is reachable
1245         #     via pg1 in table 0 (global)
1246         #     and via pg2 in table vrf_1
1247         route0 = VppIpRoute(
1248             self,
1249             "4.1.1.0",
1250             24,
1251             [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index, nh_table_id=0)],
1252             table_id=0,
1253         )
1254         route0.add_vpp_config()
1255         route1 = VppIpRoute(
1256             self,
1257             "4.1.1.0",
1258             24,
1259             [
1260                 VppRoutePath(
1261                     self.pg2.remote_ip4, self.pg2.sw_if_index, nh_table_id=vrf_1
1262                 )
1263             ],
1264             table_id=vrf_1,
1265         )
1266         route1.add_vpp_config()
1267         self.logger.debug(self.vapi.cli("show ip fib"))
1268
1269         # configure SRv6 localSID End.DT6 behavior
1270         # Note:
1271         # fib_table: where the localsid is installed
1272         # sw_if_index: in T-variants of localsid: vrf table_id
1273         localsid = VppSRv6LocalSID(
1274             self,
1275             localsid="A3::C4",
1276             behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT4,
1277             nh_addr=0,
1278             end_psp=0,
1279             sw_if_index=vrf_1,
1280             vlan_index=0,
1281             fib_table=0,
1282         )
1283         localsid.add_vpp_config()
1284         # log the localsids
1285         self.logger.debug(self.vapi.cli("show sr localsid"))
1286
1287         # create IPv6 packets with SRH (SL=0)
1288         # send one packet per packet size
1289         count = len(self.pg_packet_sizes)
1290         dst_inner = "4.1.1.123"  # inner header destination address
1291         pkts = []
1292
1293         # packets with SRH, segments-left 0, active segment a3::c4
1294         packet_header = self.create_packet_header_IPv6_SRH_IPv4(
1295             dst_inner, sidlist=["a3::c4", "a2::", "a1::"], segleft=0
1296         )
1297         # add to traffic stream pg0->pg1
1298         pkts.extend(
1299             self.create_stream(
1300                 self.pg0, self.pg2, packet_header, self.pg_packet_sizes, count
1301             )
1302         )
1303
1304         # packets without SRH, IPv6 in IPv6
1305         # outer IPv6 dest addr is the localsid End.DX4
1306         packet_header = self.create_packet_header_IPv6_IPv4(
1307             dst_inner, dst_outer="a3::c4"
1308         )
1309         # add to traffic stream pg0->pg1
1310         pkts.extend(
1311             self.create_stream(
1312                 self.pg0, self.pg2, packet_header, self.pg_packet_sizes, count
1313             )
1314         )
1315
1316         # send packets and verify received packets
1317         # using same comparison function as End.DX4
1318         self.send_and_verify_pkts(
1319             self.pg0, pkts, self.pg2, self.compare_rx_tx_packet_End_DX4
1320         )
1321
1322         # assert nothing was received on the other interface (pg2)
1323         self.pg1.assert_nothing_captured(remark="mis-directed packet(s)")
1324
1325         # log the localsid counters
1326         self.logger.info(self.vapi.cli("show sr localsid"))
1327
1328         # remove SRv6 localSIDs
1329         localsid.remove_vpp_config()
1330
1331         # remove FIB entries
1332         # done by tearDown
1333
1334         # cleanup interfaces
1335         self.teardown_interfaces()
1336
1337     def test_SRv6_End_DX2(self):
1338         """Test SRv6 End.DX2 behavior."""
1339         # send traffic to one destination interface
1340         # source interface is IPv6 only
1341         self.setup_interfaces(ipv6=[True, False], ipv4=[False, False])
1342
1343         # configure SRv6 localSID End.DX2 behavior
1344         localsid = VppSRv6LocalSID(
1345             self,
1346             localsid="A3::C4",
1347             behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX2,
1348             nh_addr=0,
1349             end_psp=0,
1350             sw_if_index=self.pg1.sw_if_index,
1351             vlan_index=0,
1352             fib_table=0,
1353         )
1354         localsid.add_vpp_config()
1355         # log the localsids
1356         self.logger.debug(self.vapi.cli("show sr localsid"))
1357
1358         # send one packet per packet size
1359         count = len(self.pg_packet_sizes)
1360         pkts = []
1361
1362         # packets with SRH, segments-left 0, active segment a3::c4
1363         # L2 has no dot1q header
1364         packet_header = self.create_packet_header_IPv6_SRH_L2(
1365             sidlist=["a3::c4", "a2::", "a1::"], segleft=0, vlan=0
1366         )
1367         # add to traffic stream pg0->pg1
1368         pkts.extend(
1369             self.create_stream(
1370                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1371             )
1372         )
1373
1374         # packets with SRH, segments-left 0, active segment a3::c4
1375         # L2 has dot1q header
1376         packet_header = self.create_packet_header_IPv6_SRH_L2(
1377             sidlist=["a3::c4", "a2::", "a1::"], segleft=0, vlan=123
1378         )
1379         # add to traffic stream pg0->pg1
1380         pkts.extend(
1381             self.create_stream(
1382                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1383             )
1384         )
1385
1386         # packets without SRH, L2 in IPv6
1387         # outer IPv6 dest addr is the localsid End.DX2
1388         # L2 has no dot1q header
1389         packet_header = self.create_packet_header_IPv6_L2(dst_outer="a3::c4", vlan=0)
1390         # add to traffic stream pg0->pg1
1391         pkts.extend(
1392             self.create_stream(
1393                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1394             )
1395         )
1396
1397         # packets without SRH, L2 in IPv6
1398         # outer IPv6 dest addr is the localsid End.DX2
1399         # L2 has dot1q header
1400         packet_header = self.create_packet_header_IPv6_L2(dst_outer="a3::c4", vlan=123)
1401         # add to traffic stream pg0->pg1
1402         pkts.extend(
1403             self.create_stream(
1404                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1405             )
1406         )
1407
1408         # send packets and verify received packets
1409         self.send_and_verify_pkts(
1410             self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_End_DX2
1411         )
1412
1413         # log the localsid counters
1414         self.logger.info(self.vapi.cli("show sr localsid"))
1415
1416         # remove SRv6 localSIDs
1417         localsid.remove_vpp_config()
1418
1419         # cleanup interfaces
1420         self.teardown_interfaces()
1421
1422     @unittest.skipUnless(0, "PC to fix")
1423     def test_SRv6_T_Insert_Classifier(self):
1424         """Test SRv6 Transit.Insert behavior (IPv6 only).
1425         steer packets using the classifier
1426         """
1427         # send traffic to one destination interface
1428         # source and destination are IPv6 only
1429         self.setup_interfaces(ipv6=[False, False, False, True, True])
1430
1431         # configure FIB entries
1432         route = VppIpRoute(
1433             self, "a4::", 64, [VppRoutePath(self.pg4.remote_ip6, self.pg4.sw_if_index)]
1434         )
1435         route.add_vpp_config()
1436
1437         # configure encaps IPv6 source address
1438         # needs to be done before SR Policy config
1439         # TODO: API?
1440         self.vapi.cli("set sr encaps source addr a3::")
1441
1442         bsid = "a3::9999:1"
1443         # configure SRv6 Policy
1444         # Note: segment list order: first -> last
1445         sr_policy = VppSRv6Policy(
1446             self,
1447             bsid=bsid,
1448             is_encap=0,
1449             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
1450             weight=1,
1451             fib_table=0,
1452             segments=["a4::", "a5::", "a6::c7"],
1453             source="a3::",
1454         )
1455         sr_policy.add_vpp_config()
1456         self.sr_policy = sr_policy
1457
1458         # log the sr policies
1459         self.logger.info(self.vapi.cli("show sr policies"))
1460
1461         # add classify table
1462         # mask on dst ip address prefix a7::/8
1463         mask = "{!s:0<16}".format("ff")
1464         r = self.vapi.classify_add_del_table(
1465             1,
1466             binascii.unhexlify(mask),
1467             match_n_vectors=(len(mask) - 1) // 32 + 1,
1468             current_data_flag=1,
1469             skip_n_vectors=2,
1470         )  # data offset
1471         self.assertIsNotNone(r, "No response msg for add_del_table")
1472         table_index = r.new_table_index
1473
1474         # add the source routing node as a ip6 inacl netxt node
1475         r = self.vapi.add_node_next("ip6-inacl", "sr-pl-rewrite-insert")
1476         inacl_next_node_index = r.node_index
1477
1478         match = "{!s:0<16}".format("a7")
1479         r = self.vapi.classify_add_del_session(
1480             1,
1481             table_index,
1482             binascii.unhexlify(match),
1483             hit_next_index=inacl_next_node_index,
1484             action=3,
1485             metadata=0,
1486         )  # sr policy index
1487         self.assertIsNotNone(r, "No response msg for add_del_session")
1488
1489         # log the classify table used in the steering policy
1490         self.logger.info(self.vapi.cli("show classify table"))
1491
1492         r = self.vapi.input_acl_set_interface(
1493             is_add=1, sw_if_index=self.pg3.sw_if_index, ip6_table_index=table_index
1494         )
1495         self.assertIsNotNone(r, "No response msg for input_acl_set_interface")
1496
1497         # log the ip6 inacl
1498         self.logger.info(self.vapi.cli("show inacl type ip6"))
1499
1500         # create packets
1501         count = len(self.pg_packet_sizes)
1502         dst_inner = "a7::1234"
1503         pkts = []
1504
1505         # create IPv6 packets without SRH
1506         packet_header = self.create_packet_header_IPv6(dst_inner)
1507         # create traffic stream pg3->pg4
1508         pkts.extend(
1509             self.create_stream(
1510                 self.pg3, self.pg4, packet_header, self.pg_packet_sizes, count
1511             )
1512         )
1513
1514         # create IPv6 packets with SRH
1515         # packets with segments-left 1, active segment a7::
1516         packet_header = self.create_packet_header_IPv6_SRH(
1517             sidlist=["a8::", "a7::", "a6::"], segleft=1
1518         )
1519         # create traffic stream pg3->pg4
1520         pkts.extend(
1521             self.create_stream(
1522                 self.pg3, self.pg4, packet_header, self.pg_packet_sizes, count
1523             )
1524         )
1525
1526         # send packets and verify received packets
1527         self.send_and_verify_pkts(
1528             self.pg3, pkts, self.pg4, self.compare_rx_tx_packet_T_Insert
1529         )
1530
1531         # remove the interface l2 input feature
1532         r = self.vapi.input_acl_set_interface(
1533             is_add=0, sw_if_index=self.pg3.sw_if_index, ip6_table_index=table_index
1534         )
1535         self.assertIsNotNone(r, "No response msg for input_acl_set_interface")
1536
1537         # log the ip6 inacl after cleaning
1538         self.logger.info(self.vapi.cli("show inacl type ip6"))
1539
1540         # log the localsid counters
1541         self.logger.info(self.vapi.cli("show sr localsid"))
1542
1543         # remove classifier SR steering
1544         # classifier_steering.remove_vpp_config()
1545         self.logger.info(self.vapi.cli("show sr steering policies"))
1546
1547         # remove SR Policies
1548         self.sr_policy.remove_vpp_config()
1549         self.logger.info(self.vapi.cli("show sr policies"))
1550
1551         # remove classify session and table
1552         r = self.vapi.classify_add_del_session(
1553             0, table_index, binascii.unhexlify(match)
1554         )
1555         self.assertIsNotNone(r, "No response msg for add_del_session")
1556
1557         r = self.vapi.classify_add_del_table(
1558             0, binascii.unhexlify(mask), table_index=table_index
1559         )
1560         self.assertIsNotNone(r, "No response msg for add_del_table")
1561
1562         self.logger.info(self.vapi.cli("show classify table"))
1563
1564         # remove FIB entries
1565         # done by tearDown
1566
1567         # cleanup interfaces
1568         self.teardown_interfaces()
1569
1570     def compare_rx_tx_packet_T_Encaps(self, tx_pkt, rx_pkt):
1571         """Compare input and output packet after passing T.Encaps
1572
1573         :param tx_pkt: transmitted packet
1574         :param rx_pkt: received packet
1575         """
1576         # T.Encaps updates the headers as follows:
1577         # SR Policy seglist (S3, S2, S1)
1578         # SR Policy source C
1579         # IPv6:
1580         # in: IPv6(A, B2)
1581         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(A, B2)
1582         # IPv6 + SRH:
1583         # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1)
1584         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(a, B2)SRH(B3, B2, B1; SL=1)
1585
1586         # get first (outer) IPv6 header of rx'ed packet
1587         rx_ip = rx_pkt.getlayer(IPv6)
1588         rx_srh = None
1589
1590         tx_ip = tx_pkt.getlayer(IPv6)
1591
1592         # expected segment-list
1593         seglist = self.sr_policy.segments
1594         # reverse list to get order as in SRH
1595         tx_seglist = seglist[::-1]
1596
1597         # get source address of SR Policy
1598         sr_policy_source = self.sr_policy.source
1599
1600         # rx'ed packet should have SRH
1601         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1602         # get SRH
1603         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1604
1605         # received ip.src should be equal to SR Policy source
1606         self.assertEqual(rx_ip.src, sr_policy_source)
1607         # received ip.dst should be equal to expected sidlist[lastentry]
1608         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1609         # rx'ed seglist should be equal to expected seglist
1610         self.assertEqual(rx_srh.addresses, tx_seglist)
1611         # segleft should be equal to size expected seglist-1
1612         self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
1613         # segleft should be equal to lastentry
1614         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1615
1616         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1617         # except for the hop-limit field
1618         #   -> update tx'ed hlim to the expected hlim
1619         tx_ip.hlim = tx_ip.hlim - 1
1620
1621         self.assertEqual(rx_srh.payload, tx_ip)
1622
1623         self.logger.debug("packet verification: SUCCESS")
1624
1625     def compare_rx_tx_packet_T_Encaps_IPv4(self, tx_pkt, rx_pkt):
1626         """Compare input and output packet after passing T.Encaps for IPv4
1627
1628         :param tx_pkt: transmitted packet
1629         :param rx_pkt: received packet
1630         """
1631         # T.Encaps for IPv4 updates the headers as follows:
1632         # SR Policy seglist (S3, S2, S1)
1633         # SR Policy source C
1634         # IPv4:
1635         # in: IPv4(A, B2)
1636         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv4(A, B2)
1637
1638         # get first (outer) IPv6 header of rx'ed packet
1639         rx_ip = rx_pkt.getlayer(IPv6)
1640         rx_srh = None
1641
1642         tx_ip = tx_pkt.getlayer(IP)
1643
1644         # expected segment-list
1645         seglist = self.sr_policy.segments
1646         # reverse list to get order as in SRH
1647         tx_seglist = seglist[::-1]
1648
1649         # get source address of SR Policy
1650         sr_policy_source = self.sr_policy.source
1651
1652         # checks common to cases tx with and without SRH
1653         # rx'ed packet should have SRH and IPv4 header
1654         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1655         self.assertTrue(rx_ip.payload.haslayer(IP))
1656         # get SRH
1657         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1658
1659         # received ip.src should be equal to SR Policy source
1660         self.assertEqual(rx_ip.src, sr_policy_source)
1661         # received ip.dst should be equal to sidlist[lastentry]
1662         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1663         # rx'ed seglist should be equal to seglist
1664         self.assertEqual(rx_srh.addresses, tx_seglist)
1665         # segleft should be equal to size seglist-1
1666         self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
1667         # segleft should be equal to lastentry
1668         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1669
1670         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1671         # except for the ttl field and ip checksum
1672         #   -> adjust tx'ed ttl to expected ttl
1673         tx_ip.ttl = tx_ip.ttl - 1
1674         #   -> set tx'ed ip checksum to None and let scapy recompute
1675         tx_ip.chksum = None
1676         # read back the pkt (with str()) to force computing these fields
1677         # probably other ways to accomplish this are possible
1678         tx_ip = IP(scapy.compat.raw(tx_ip))
1679
1680         self.assertEqual(rx_srh.payload, tx_ip)
1681
1682         self.logger.debug("packet verification: SUCCESS")
1683
1684     def compare_rx_tx_packet_T_Encaps_L2(self, tx_pkt, rx_pkt):
1685         """Compare input and output packet after passing T.Encaps for L2
1686
1687         :param tx_pkt: transmitted packet
1688         :param rx_pkt: received packet
1689         """
1690         # T.Encaps for L2 updates the headers as follows:
1691         # SR Policy seglist (S3, S2, S1)
1692         # SR Policy source C
1693         # L2:
1694         # in: L2
1695         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)L2
1696
1697         # get first (outer) IPv6 header of rx'ed packet
1698         rx_ip = rx_pkt.getlayer(IPv6)
1699         rx_srh = None
1700
1701         tx_ether = tx_pkt.getlayer(Ether)
1702
1703         # expected segment-list
1704         seglist = self.sr_policy.segments
1705         # reverse list to get order as in SRH
1706         tx_seglist = seglist[::-1]
1707
1708         # get source address of SR Policy
1709         sr_policy_source = self.sr_policy.source
1710
1711         # rx'ed packet should have SRH
1712         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1713         # get SRH
1714         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1715
1716         # received ip.src should be equal to SR Policy source
1717         self.assertEqual(rx_ip.src, sr_policy_source)
1718         # received ip.dst should be equal to sidlist[lastentry]
1719         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1720         # rx'ed seglist should be equal to seglist
1721         self.assertEqual(rx_srh.addresses, tx_seglist)
1722         # segleft should be equal to size seglist-1
1723         self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
1724         # segleft should be equal to lastentry
1725         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1726         # nh should be "No Next Header" (143)
1727         self.assertEqual(rx_srh.nh, 143)
1728
1729         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1730         self.assertEqual(Ether(scapy.compat.raw(rx_srh.payload)), tx_ether)
1731
1732         self.logger.debug("packet verification: SUCCESS")
1733
1734     def compare_rx_tx_packet_T_Insert(self, tx_pkt, rx_pkt):
1735         """Compare input and output packet after passing T.Insert
1736
1737         :param tx_pkt: transmitted packet
1738         :param rx_pkt: received packet
1739         """
1740         # T.Insert updates the headers as follows:
1741         # IPv6:
1742         # in: IPv6(A, B2)
1743         # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)
1744         # IPv6 + SRH:
1745         # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1)
1746         # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)SRH(B3, B2, B1; SL=1)
1747
1748         # get first (outer) IPv6 header of rx'ed packet
1749         rx_ip = rx_pkt.getlayer(IPv6)
1750         rx_srh = None
1751         rx_ip2 = None
1752         rx_srh2 = None
1753         rx_ip3 = None
1754         rx_udp = rx_pkt[UDP]
1755
1756         tx_ip = tx_pkt.getlayer(IPv6)
1757         tx_srh = None
1758         tx_ip2 = None
1759         # some packets have been tx'ed with an SRH, some without it
1760         # get SRH if tx'ed packet has it
1761         if tx_pkt.haslayer(IPv6ExtHdrSegmentRouting):
1762             tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1763             tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1764         tx_udp = tx_pkt[UDP]
1765
1766         # expected segment-list (make copy of SR Policy segment list)
1767         seglist = self.sr_policy.segments[:]
1768         # expected seglist has initial dest addr as last segment
1769         seglist.append(tx_ip.dst)
1770         # reverse list to get order as in SRH
1771         tx_seglist = seglist[::-1]
1772
1773         # get source address of SR Policy
1774         sr_policy_source = self.sr_policy.source
1775
1776         # checks common to cases tx with and without SRH
1777         # rx'ed packet should have SRH and only one IPv6 header
1778         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1779         self.assertFalse(rx_ip.payload.haslayer(IPv6))
1780         # get SRH
1781         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1782
1783         # rx'ed ip.src should be equal to tx'ed ip.src
1784         self.assertEqual(rx_ip.src, tx_ip.src)
1785         # rx'ed ip.dst should be equal to sidlist[lastentry]
1786         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1787
1788         # rx'ed seglist should be equal to expected seglist
1789         self.assertEqual(rx_srh.addresses, tx_seglist)
1790         # segleft should be equal to size(expected seglist)-1
1791         self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
1792         # segleft should be equal to lastentry
1793         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1794
1795         if tx_srh:  # packet was tx'ed with SRH
1796             # packet should have 2nd SRH
1797             self.assertTrue(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
1798             # get 2nd SRH
1799             rx_srh2 = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting, 2)
1800
1801             # rx'ed srh2.addresses should be equal to tx'ed srh.addresses
1802             self.assertEqual(rx_srh2.addresses, tx_srh.addresses)
1803             # rx'ed srh2.segleft should be equal to tx'ed srh.segleft
1804             self.assertEqual(rx_srh2.segleft, tx_srh.segleft)
1805             # rx'ed srh2.lastentry should be equal to tx'ed srh.lastentry
1806             self.assertEqual(rx_srh2.lastentry, tx_srh.lastentry)
1807
1808         else:  # packet was tx'ed without SRH
1809             # rx packet should have no other SRH
1810             self.assertFalse(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
1811
1812         # UDP layer should be unchanged
1813         self.assertEqual(rx_udp, tx_udp)
1814
1815         self.logger.debug("packet verification: SUCCESS")
1816
1817     def compare_rx_tx_packet_End(self, tx_pkt, rx_pkt):
1818         """Compare input and output packet after passing End (without PSP)
1819
1820         :param tx_pkt: transmitted packet
1821         :param rx_pkt: received packet
1822         """
1823         # End (no PSP) updates the headers as follows:
1824         # IPv6 + SRH:
1825         # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2)
1826         # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1827
1828         # get first (outer) IPv6 header of rx'ed packet
1829         rx_ip = rx_pkt.getlayer(IPv6)
1830         rx_srh = None
1831         rx_ip2 = None
1832         rx_udp = rx_pkt[UDP]
1833
1834         tx_ip = tx_pkt.getlayer(IPv6)
1835         # we know the packet has been tx'ed
1836         # with an inner IPv6 header and an SRH
1837         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1838         tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1839         tx_udp = tx_pkt[UDP]
1840
1841         # common checks, regardless of tx segleft value
1842         # rx'ed packet should have 2nd IPv6 header
1843         self.assertTrue(rx_ip.payload.haslayer(IPv6))
1844         # get second (inner) IPv6 header
1845         rx_ip2 = rx_pkt.getlayer(IPv6, 2)
1846
1847         if tx_ip.segleft > 0:
1848             # SRH should NOT have been popped:
1849             #   End SID without PSP does not pop SRH if segleft>0
1850             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1851             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1852
1853             # received ip.src should be equal to expected ip.src
1854             self.assertEqual(rx_ip.src, tx_ip.src)
1855             # sidlist should be unchanged
1856             self.assertEqual(rx_srh.addresses, tx_srh.addresses)
1857             # segleft should have been decremented
1858             self.assertEqual(rx_srh.segleft, tx_srh.segleft - 1)
1859             # received ip.dst should be equal to sidlist[segleft]
1860             self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
1861             # lastentry should be unchanged
1862             self.assertEqual(rx_srh.lastentry, tx_srh.lastentry)
1863             # inner IPv6 packet (ip2) should be unchanged
1864             self.assertEqual(rx_ip2.src, tx_ip2.src)
1865             self.assertEqual(rx_ip2.dst, tx_ip2.dst)
1866         # else:  # tx_ip.segleft == 0
1867         # TODO: Does this work with 2 SRHs in ingress packet?
1868
1869         # UDP layer should be unchanged
1870         self.assertEqual(rx_udp, tx_udp)
1871
1872         self.logger.debug("packet verification: SUCCESS")
1873
1874     def compare_rx_tx_packet_End_PSP(self, tx_pkt, rx_pkt):
1875         """Compare input and output packet after passing End with PSP
1876
1877         :param tx_pkt: transmitted packet
1878         :param rx_pkt: received packet
1879         """
1880         # End (PSP) updates the headers as follows:
1881         # IPv6 + SRH (SL>1):
1882         # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2)
1883         # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1884         # IPv6 + SRH (SL=1):
1885         # in: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1886         # out: IPv6(A, S3)
1887
1888         # get first (outer) IPv6 header of rx'ed packet
1889         rx_ip = rx_pkt.getlayer(IPv6)
1890         rx_srh = None
1891         rx_ip2 = None
1892         rx_udp = rx_pkt[UDP]
1893
1894         tx_ip = tx_pkt.getlayer(IPv6)
1895         # we know the packet has been tx'ed
1896         # with an inner IPv6 header and an SRH
1897         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1898         tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1899         tx_udp = tx_pkt[UDP]
1900
1901         # common checks, regardless of tx segleft value
1902         self.assertTrue(rx_ip.payload.haslayer(IPv6))
1903         rx_ip2 = rx_pkt.getlayer(IPv6, 2)
1904         # inner IPv6 packet (ip2) should be unchanged
1905         self.assertEqual(rx_ip2.src, tx_ip2.src)
1906         self.assertEqual(rx_ip2.dst, tx_ip2.dst)
1907
1908         if tx_ip.segleft > 1:
1909             # SRH should NOT have been popped:
1910             #   End SID with PSP does not pop SRH if segleft>1
1911             # rx'ed packet should have SRH
1912             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1913             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1914
1915             # received ip.src should be equal to expected ip.src
1916             self.assertEqual(rx_ip.src, tx_ip.src)
1917             # sidlist should be unchanged
1918             self.assertEqual(rx_srh.addresses, tx_srh.addresses)
1919             # segleft should have been decremented
1920             self.assertEqual(rx_srh.segleft, tx_srh.segleft - 1)
1921             # received ip.dst should be equal to sidlist[segleft]
1922             self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
1923             # lastentry should be unchanged
1924             self.assertEqual(rx_srh.lastentry, tx_srh.lastentry)
1925
1926         else:  # tx_ip.segleft <= 1
1927             # SRH should have been popped:
1928             #   End SID with PSP and segleft=1 pops SRH
1929             # the two IPv6 headers are still present
1930             # outer IPv6 header has DA == last segment of popped SRH
1931             # SRH should not be present
1932             self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1933             # outer IPv6 header ip.src should be equal to tx'ed ip.src
1934             self.assertEqual(rx_ip.src, tx_ip.src)
1935             # outer IPv6 header ip.dst should be = to tx'ed sidlist[segleft-1]
1936             self.assertEqual(rx_ip.dst, tx_srh.addresses[tx_srh.segleft - 1])
1937
1938         # UDP layer should be unchanged
1939         self.assertEqual(rx_udp, tx_udp)
1940
1941         self.logger.debug("packet verification: SUCCESS")
1942
1943     def compare_rx_tx_packet_End_DX6(self, tx_pkt, rx_pkt):
1944         """Compare input and output packet after passing End.DX6
1945
1946         :param tx_pkt: transmitted packet
1947         :param rx_pkt: received packet
1948         """
1949         # End.DX6 updates the headers as follows:
1950         # IPv6 + SRH (SL=0):
1951         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv6(B, D)
1952         # out: IPv6(B, D)
1953         # IPv6:
1954         # in: IPv6(A, S3)IPv6(B, D)
1955         # out: IPv6(B, D)
1956
1957         # get first (outer) IPv6 header of rx'ed packet
1958         rx_ip = rx_pkt.getlayer(IPv6)
1959
1960         tx_ip = tx_pkt.getlayer(IPv6)
1961         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1962
1963         # verify if rx'ed packet has no SRH
1964         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1965
1966         # the whole rx_ip pkt should be equal to tx_ip2
1967         # except for the hlim field
1968         #   -> adjust tx'ed hlim to expected hlim
1969         tx_ip2.hlim = tx_ip2.hlim - 1
1970
1971         self.assertEqual(rx_ip, tx_ip2)
1972
1973         self.logger.debug("packet verification: SUCCESS")
1974
1975     def compare_rx_tx_packet_End_DX4(self, tx_pkt, rx_pkt):
1976         """Compare input and output packet after passing End.DX4
1977
1978         :param tx_pkt: transmitted packet
1979         :param rx_pkt: received packet
1980         """
1981         # End.DX4 updates the headers as follows:
1982         # IPv6 + SRH (SL=0):
1983         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv4(B, D)
1984         # out: IPv4(B, D)
1985         # IPv6:
1986         # in: IPv6(A, S3)IPv4(B, D)
1987         # out: IPv4(B, D)
1988
1989         # get IPv4 header of rx'ed packet
1990         rx_ip = rx_pkt.getlayer(IP)
1991
1992         tx_ip = tx_pkt.getlayer(IPv6)
1993         tx_ip2 = tx_pkt.getlayer(IP)
1994
1995         # verify if rx'ed packet has no SRH
1996         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1997
1998         # the whole rx_ip pkt should be equal to tx_ip2
1999         # except for the ttl field and ip checksum
2000         #   -> adjust tx'ed ttl to expected ttl
2001         tx_ip2.ttl = tx_ip2.ttl - 1
2002         #   -> set tx'ed ip checksum to None and let scapy recompute
2003         tx_ip2.chksum = None
2004         # read back the pkt (with str()) to force computing these fields
2005         # probably other ways to accomplish this are possible
2006         tx_ip2 = IP(scapy.compat.raw(tx_ip2))
2007
2008         self.assertEqual(rx_ip, tx_ip2)
2009
2010         self.logger.debug("packet verification: SUCCESS")
2011
2012     def compare_rx_tx_packet_End_DX2(self, tx_pkt, rx_pkt):
2013         """Compare input and output packet after passing End.DX2
2014
2015         :param tx_pkt: transmitted packet
2016         :param rx_pkt: received packet
2017         """
2018         # End.DX2 updates the headers as follows:
2019         # IPv6 + SRH (SL=0):
2020         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)L2
2021         # out: L2
2022         # IPv6:
2023         # in: IPv6(A, S3)L2
2024         # out: L2
2025
2026         # get IPv4 header of rx'ed packet
2027         rx_eth = rx_pkt.getlayer(Ether)
2028
2029         tx_ip = tx_pkt.getlayer(IPv6)
2030         # we can't just get the 2nd Ether layer
2031         # get the Raw content and dissect it as Ether
2032         tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw]))
2033
2034         # verify if rx'ed packet has no SRH
2035         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
2036
2037         # the whole rx_eth pkt should be equal to tx_eth1
2038         self.assertEqual(rx_eth, tx_eth1)
2039
2040         self.logger.debug("packet verification: SUCCESS")
2041
2042     def create_stream(self, src_if, dst_if, packet_header, packet_sizes, count):
2043         """Create SRv6 input packet stream for defined interface.
2044
2045         :param VppInterface src_if: Interface to create packet stream for
2046         :param VppInterface dst_if: destination interface of packet stream
2047         :param packet_header: Layer3 scapy packet headers,
2048                 L2 is added when not provided,
2049                 Raw(payload) with packet_info is added
2050         :param list packet_sizes: packet stream pckt sizes,sequentially applied
2051                to packets in stream have
2052         :param int count: number of packets in packet stream
2053         :return: list of packets
2054         """
2055         self.logger.info("Creating packets")
2056         pkts = []
2057         for i in range(0, count - 1):
2058             payload_info = self.create_packet_info(src_if, dst_if)
2059             self.logger.debug("Creating packet with index %d" % (payload_info.index))
2060             payload = self.info_to_payload(payload_info)
2061             # add L2 header if not yet provided in packet_header
2062             if packet_header.getlayer(0).name == "Ethernet":
2063                 p = packet_header / Raw(payload)
2064             else:
2065                 p = (
2066                     Ether(dst=src_if.local_mac, src=src_if.remote_mac)
2067                     / packet_header
2068                     / Raw(payload)
2069                 )
2070             size = packet_sizes[i % len(packet_sizes)]
2071             self.logger.debug("Packet size %d" % (size))
2072             self.extend_packet(p, size)
2073             # we need to store the packet with the automatic fields computed
2074             # read back the dumped packet (with str())
2075             # to force computing these fields
2076             # probably other ways are possible
2077             p = Ether(scapy.compat.raw(p))
2078             payload_info.data = p.copy()
2079             self.logger.debug(ppp("Created packet:", p))
2080             pkts.append(p)
2081         self.logger.info("Done creating packets")
2082         return pkts
2083
2084     def send_and_verify_pkts(
2085         self, input, pkts, output, compare_func, expected_count=None
2086     ):
2087         """Send packets and verify received packets using compare_func
2088
2089         :param input: ingress interface of DUT
2090         :param pkts: list of packets to transmit
2091         :param output: egress interface of DUT
2092         :param compare_func: function to compare in and out packets
2093         :param expected_count: expected number of captured packets (if
2094                different than len(pkts))
2095         """
2096         # add traffic stream to input interface
2097         input.add_stream(pkts)
2098
2099         # enable capture on all interfaces
2100         self.pg_enable_capture(self.pg_interfaces)
2101
2102         # start traffic
2103         self.logger.info("Starting traffic")
2104         self.pg_start()
2105
2106         # get output capture
2107         self.logger.info("Getting packet capture")
2108         capture = output.get_capture(expected_count=expected_count)
2109
2110         # assert nothing was captured on input interface
2111         input.assert_nothing_captured()
2112
2113         # verify captured packets
2114         self.verify_captured_pkts(output, capture, compare_func)
2115
2116     def create_packet_header_IPv6(self, dst):
2117         """Create packet header: IPv6 header, UDP header
2118
2119         :param dst: IPv6 destination address
2120
2121         IPv6 source address is 1234::1
2122         UDP source port and destination port are 1234
2123         """
2124
2125         p = IPv6(src="1234::1", dst=dst) / UDP(sport=1234, dport=1234)
2126         return p
2127
2128     def create_packet_header_IPv6_SRH(self, sidlist, segleft):
2129         """Create packet header: IPv6 header with SRH, UDP header
2130
2131         :param list sidlist: segment list
2132         :param int segleft: segments-left field value
2133
2134         IPv6 destination address is set to sidlist[segleft]
2135         IPv6 source addresses are 1234::1 and 4321::1
2136         UDP source port and destination port are 1234
2137         """
2138
2139         p = (
2140             IPv6(src="1234::1", dst=sidlist[segleft])
2141             / IPv6ExtHdrSegmentRouting(addresses=sidlist)
2142             / UDP(sport=1234, dport=1234)
2143         )
2144         return p
2145
2146     def create_packet_header_IPv6_SRH_IPv6(self, dst, sidlist, segleft):
2147         """Create packet header: IPv6 encapsulated in SRv6:
2148         IPv6 header with SRH, IPv6 header, UDP header
2149
2150         :param ipv6address dst: inner IPv6 destination address
2151         :param list sidlist: segment list of outer IPv6 SRH
2152         :param int segleft: segments-left field of outer IPv6 SRH
2153
2154         Outer IPv6 destination address is set to sidlist[segleft]
2155         IPv6 source addresses are 1234::1 and 4321::1
2156         UDP source port and destination port are 1234
2157         """
2158
2159         p = (
2160             IPv6(src="1234::1", dst=sidlist[segleft])
2161             / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=41)
2162             / IPv6(src="4321::1", dst=dst)
2163             / UDP(sport=1234, dport=1234)
2164         )
2165         return p
2166
2167     def create_packet_header_IPv6_IPv6(self, dst_inner, dst_outer):
2168         """Create packet header: IPv6 encapsulated in IPv6:
2169         IPv6 header, IPv6 header, UDP header
2170
2171         :param ipv6address dst_inner: inner IPv6 destination address
2172         :param ipv6address dst_outer: outer IPv6 destination address
2173
2174         IPv6 source addresses are 1234::1 and 4321::1
2175         UDP source port and destination port are 1234
2176         """
2177
2178         p = (
2179             IPv6(src="1234::1", dst=dst_outer)
2180             / IPv6(src="4321::1", dst=dst_inner)
2181             / UDP(sport=1234, dport=1234)
2182         )
2183         return p
2184
2185     def create_packet_header_IPv6_SRH_SRH_IPv6(
2186         self, dst, sidlist1, segleft1, sidlist2, segleft2
2187     ):
2188         """Create packet header: IPv6 encapsulated in SRv6 with 2 SRH:
2189         IPv6 header with SRH, 2nd SRH, IPv6 header, UDP header
2190
2191         :param ipv6address dst: inner IPv6 destination address
2192         :param list sidlist1: segment list of outer IPv6 SRH
2193         :param int segleft1: segments-left field of outer IPv6 SRH
2194         :param list sidlist2: segment list of inner IPv6 SRH
2195         :param int segleft2: segments-left field of inner IPv6 SRH
2196
2197         Outer IPv6 destination address is set to sidlist[segleft]
2198         IPv6 source addresses are 1234::1 and 4321::1
2199         UDP source port and destination port are 1234
2200         """
2201
2202         p = (
2203             IPv6(src="1234::1", dst=sidlist1[segleft1])
2204             / IPv6ExtHdrSegmentRouting(addresses=sidlist1, segleft=segleft1, nh=43)
2205             / IPv6ExtHdrSegmentRouting(addresses=sidlist2, segleft=segleft2, nh=41)
2206             / IPv6(src="4321::1", dst=dst)
2207             / UDP(sport=1234, dport=1234)
2208         )
2209         return p
2210
2211     def create_packet_header_IPv4(self, dst):
2212         """Create packet header: IPv4 header, UDP header
2213
2214         :param dst: IPv4 destination address
2215
2216         IPv4 source address is 123.1.1.1
2217         UDP source port and destination port are 1234
2218         """
2219
2220         p = IP(src="123.1.1.1", dst=dst) / UDP(sport=1234, dport=1234)
2221         return p
2222
2223     def create_packet_header_IPv6_IPv4(self, dst_inner, dst_outer):
2224         """Create packet header: IPv4 encapsulated in IPv6:
2225         IPv6 header, IPv4 header, UDP header
2226
2227         :param ipv4address dst_inner: inner IPv4 destination address
2228         :param ipv6address dst_outer: outer IPv6 destination address
2229
2230         IPv6 source address is 1234::1
2231         IPv4 source address is 123.1.1.1
2232         UDP source port and destination port are 1234
2233         """
2234
2235         p = (
2236             IPv6(src="1234::1", dst=dst_outer)
2237             / IP(src="123.1.1.1", dst=dst_inner)
2238             / UDP(sport=1234, dport=1234)
2239         )
2240         return p
2241
2242     def create_packet_header_IPv6_SRH_IPv4(self, dst, sidlist, segleft):
2243         """Create packet header: IPv4 encapsulated in SRv6:
2244         IPv6 header with SRH, IPv4 header, UDP header
2245
2246         :param ipv4address dst: inner IPv4 destination address
2247         :param list sidlist: segment list of outer IPv6 SRH
2248         :param int segleft: segments-left field of outer IPv6 SRH
2249
2250         Outer IPv6 destination address is set to sidlist[segleft]
2251         IPv6 source address is 1234::1
2252         IPv4 source address is 123.1.1.1
2253         UDP source port and destination port are 1234
2254         """
2255
2256         p = (
2257             IPv6(src="1234::1", dst=sidlist[segleft])
2258             / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=4)
2259             / IP(src="123.1.1.1", dst=dst)
2260             / UDP(sport=1234, dport=1234)
2261         )
2262         return p
2263
2264     def create_packet_header_L2(self, vlan=0):
2265         """Create packet header: L2 header
2266
2267         :param vlan: if vlan!=0 then add 802.1q header
2268         """
2269         # Note: the dst addr ('00:55:44:33:22:11') is used in
2270         # the compare function compare_rx_tx_packet_T_Encaps_L2
2271         # to detect presence of L2 in SRH payload
2272         p = Ether(src="00:11:22:33:44:55", dst="00:55:44:33:22:11")
2273         etype = 0x8137  # IPX
2274         if vlan:
2275             # add 802.1q layer
2276             p /= Dot1Q(vlan=vlan, type=etype)
2277         else:
2278             p.type = etype
2279         return p
2280
2281     def create_packet_header_IPv6_SRH_L2(self, sidlist, segleft, vlan=0):
2282         """Create packet header: L2 encapsulated in SRv6:
2283         IPv6 header with SRH, L2
2284
2285         :param list sidlist: segment list of outer IPv6 SRH
2286         :param int segleft: segments-left field of outer IPv6 SRH
2287         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
2288
2289         Outer IPv6 destination address is set to sidlist[segleft]
2290         IPv6 source address is 1234::1
2291         """
2292         eth = Ether(src="00:11:22:33:44:55", dst="00:55:44:33:22:11")
2293         etype = 0x8137  # IPX
2294         if vlan:
2295             # add 802.1q layer
2296             eth /= Dot1Q(vlan=vlan, type=etype)
2297         else:
2298             eth.type = etype
2299
2300         p = (
2301             IPv6(src="1234::1", dst=sidlist[segleft])
2302             / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=143)
2303             / eth
2304         )
2305         return p
2306
2307     def create_packet_header_IPv6_L2(self, dst_outer, vlan=0):
2308         """Create packet header: L2 encapsulated in IPv6:
2309         IPv6 header, L2
2310
2311         :param ipv6address dst_outer: outer IPv6 destination address
2312         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
2313         """
2314         eth = Ether(src="00:11:22:33:44:55", dst="00:55:44:33:22:11")
2315         etype = 0x8137  # IPX
2316         if vlan:
2317             # add 802.1q layer
2318             eth /= Dot1Q(vlan=vlan, type=etype)
2319         else:
2320             eth.type = etype
2321
2322         p = IPv6(src="1234::1", dst=dst_outer, nh=143) / eth
2323         return p
2324
2325     def get_payload_info(self, packet):
2326         """Extract the payload_info from the packet"""
2327         # in most cases, payload_info is in packet[Raw]
2328         # but packet[Raw] gives the complete payload
2329         # (incl L2 header) for the T.Encaps L2 case
2330         try:
2331             payload_info = self.payload_to_info(packet[Raw])
2332
2333         except:
2334             # remote L2 header from packet[Raw]:
2335             # take packet[Raw], convert it to an Ether layer
2336             # and then extract Raw from it
2337             payload_info = self.payload_to_info(Ether(scapy.compat.r(packet[Raw]))[Raw])
2338
2339         return payload_info
2340
2341     def verify_captured_pkts(self, dst_if, capture, compare_func):
2342         """
2343         Verify captured packet stream for specified interface.
2344         Compare ingress with egress packets using the specified compare fn
2345
2346         :param dst_if: egress interface of DUT
2347         :param capture: captured packets
2348         :param compare_func: function to compare in and out packet
2349         """
2350         self.logger.info(
2351             "Verifying capture on interface %s using function %s"
2352             % (dst_if.name, compare_func.__name__)
2353         )
2354
2355         last_info = dict()
2356         for i in self.pg_interfaces:
2357             last_info[i.sw_if_index] = None
2358         dst_sw_if_index = dst_if.sw_if_index
2359
2360         for packet in capture:
2361             try:
2362                 # extract payload_info from packet's payload
2363                 payload_info = self.get_payload_info(packet)
2364                 packet_index = payload_info.index
2365
2366                 self.logger.debug("Verifying packet with index %d" % (packet_index))
2367                 # packet should have arrived on the expected interface
2368                 self.assertEqual(payload_info.dst, dst_sw_if_index)
2369                 self.logger.debug(
2370                     "Got packet on interface %s: src=%u (idx=%u)"
2371                     % (dst_if.name, payload_info.src, packet_index)
2372                 )
2373
2374                 # search for payload_info with same src and dst if_index
2375                 # this will give us the transmitted packet
2376                 next_info = self.get_next_packet_info_for_interface2(
2377                     payload_info.src, dst_sw_if_index, last_info[payload_info.src]
2378                 )
2379                 last_info[payload_info.src] = next_info
2380                 # next_info should not be None
2381                 self.assertTrue(next_info is not None)
2382                 # index of tx and rx packets should be equal
2383                 self.assertEqual(packet_index, next_info.index)
2384                 # data field of next_info contains the tx packet
2385                 txed_packet = next_info.data
2386
2387                 self.logger.debug(
2388                     ppp("Transmitted packet:", txed_packet)
2389                 )  # ppp=Pretty Print Packet
2390
2391                 self.logger.debug(ppp("Received packet:", packet))
2392
2393                 # compare rcvd packet with expected packet using compare_func
2394                 compare_func(txed_packet, packet)
2395
2396             except:
2397                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2398                 raise
2399
2400         # FIXME: there is no need to check manually that all the packets
2401         #        arrived (already done so by get_capture); checking here
2402         #        prevents testing packets that are expected to be dropped, so
2403         #        commenting this out for now
2404
2405         # have all expected packets arrived?
2406         # for i in self.pg_interfaces:
2407         #    remaining_packet = self.get_next_packet_info_for_interface2(
2408         #        i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
2409         #    self.assertTrue(remaining_packet is None,
2410         #                    "Interface %s: Packet expected from interface %s "
2411         #                    "didn't arrive" % (dst_if.name, i.name))
2412
2413
2414 if __name__ == "__main__":
2415     unittest.main(testRunner=VppTestRunner)