tests: replace pycodestyle with black
[vpp.git] / test / test_srv6.py
1 #!/usr/bin/env python3
2
3 import unittest
4 import binascii
5 from socket import AF_INET6
6
7 from framework import VppTestCase, VppTestRunner
8 from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathProto, VppIpTable
9 from vpp_srv6 import (
10     SRv6LocalSIDBehaviors,
11     VppSRv6LocalSID,
12     VppSRv6Policy,
13     SRv6PolicyType,
14     VppSRv6Steering,
15     SRv6PolicySteeringTypes,
16 )
17
18 import scapy.compat
19 from scapy.packet import Raw
20 from scapy.layers.l2 import Ether, Dot1Q
21 from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
22 from scapy.layers.inet import IP, UDP
23
24 from util import ppp
25
26
27 class TestSRv6(VppTestCase):
28     """SRv6 Test Case"""
29
30     @classmethod
31     def setUpClass(cls):
32         super(TestSRv6, cls).setUpClass()
33
34     @classmethod
35     def tearDownClass(cls):
36         super(TestSRv6, cls).tearDownClass()
37
38     def setUp(self):
39         """Perform test setup before each test case."""
40         super(TestSRv6, self).setUp()
41
42         # packet sizes, inclusive L2 overhead
43         self.pg_packet_sizes = [64, 512, 1518, 9018]
44
45         # reset packet_infos
46         self.reset_packet_infos()
47
48     def tearDown(self):
49         """Clean up test setup after each test case."""
50         self.teardown_interfaces()
51
52         super(TestSRv6, self).tearDown()
53
54     def configure_interface(
55         self, interface, ipv6=False, ipv4=False, ipv6_table_id=0, ipv4_table_id=0
56     ):
57         """Configure interface.
58         :param ipv6: configure IPv6 on interface
59         :param ipv4: configure IPv4 on interface
60         :param ipv6_table_id: FIB table_id for IPv6
61         :param ipv4_table_id: FIB table_id for IPv4
62         """
63         self.logger.debug("Configuring interface %s" % (interface.name))
64         if ipv6:
65             self.logger.debug("Configuring IPv6")
66             interface.set_table_ip6(ipv6_table_id)
67             interface.config_ip6()
68             interface.resolve_ndp(timeout=5)
69         if ipv4:
70             self.logger.debug("Configuring IPv4")
71             interface.set_table_ip4(ipv4_table_id)
72             interface.config_ip4()
73             interface.resolve_arp()
74         interface.admin_up()
75
76     def setup_interfaces(self, ipv6=[], ipv4=[], ipv6_table_id=[], ipv4_table_id=[]):
77         """Create and configure interfaces.
78
79         :param ipv6: list of interface IPv6 capabilities
80         :param ipv4: list of interface IPv4 capabilities
81         :param ipv6_table_id: list of intf IPv6 FIB table_ids
82         :param ipv4_table_id: list of intf IPv4 FIB table_ids
83         :returns: List of created interfaces.
84         """
85         # how many interfaces?
86         if len(ipv6):
87             count = len(ipv6)
88         else:
89             count = len(ipv4)
90         self.logger.debug("Creating and configuring %d interfaces" % (count))
91
92         # fill up ipv6 and ipv4 lists if needed
93         # not enabled (False) is the default
94         if len(ipv6) < count:
95             ipv6 += (count - len(ipv6)) * [False]
96         if len(ipv4) < count:
97             ipv4 += (count - len(ipv4)) * [False]
98
99         # fill up table_id lists if needed
100         # table_id 0 (global) is the default
101         if len(ipv6_table_id) < count:
102             ipv6_table_id += (count - len(ipv6_table_id)) * [0]
103         if len(ipv4_table_id) < count:
104             ipv4_table_id += (count - len(ipv4_table_id)) * [0]
105
106         # create 'count' pg interfaces
107         self.create_pg_interfaces(range(count))
108
109         # setup all interfaces
110         for i in range(count):
111             intf = self.pg_interfaces[i]
112             self.configure_interface(
113                 intf, ipv6[i], ipv4[i], ipv6_table_id[i], ipv4_table_id[i]
114             )
115
116         if any(ipv6):
117             self.logger.debug(self.vapi.cli("show ip6 neighbors"))
118         if any(ipv4):
119             self.logger.debug(self.vapi.cli("show ip4 neighbors"))
120         self.logger.debug(self.vapi.cli("show interface"))
121         self.logger.debug(self.vapi.cli("show hardware"))
122
123         return self.pg_interfaces
124
125     def teardown_interfaces(self):
126         """Unconfigure and bring down interface."""
127         self.logger.debug("Tearing down interfaces")
128         # tear down all interfaces
129         # AFAIK they cannot be deleted
130         for i in self.pg_interfaces:
131             self.logger.debug("Tear down interface %s" % (i.name))
132             i.admin_down()
133             i.unconfig()
134             i.set_table_ip4(0)
135             i.set_table_ip6(0)
136
137     @unittest.skipUnless(0, "PC to fix")
138     def test_SRv6_T_Encaps(self):
139         """Test SRv6 Transit.Encaps behavior for IPv6."""
140         # send traffic to one destination interface
141         # source and destination are IPv6 only
142         self.setup_interfaces(ipv6=[True, True])
143
144         # configure FIB entries
145         route = VppIpRoute(
146             self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
147         )
148         route.add_vpp_config()
149
150         # configure encaps IPv6 source address
151         # needs to be done before SR Policy config
152         # TODO: API?
153         self.vapi.cli("set sr encaps source addr a3::")
154
155         bsid = "a3::9999:1"
156         # configure SRv6 Policy
157         # Note: segment list order: first -> last
158         sr_policy = VppSRv6Policy(
159             self,
160             bsid=bsid,
161             is_encap=1,
162             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
163             weight=1,
164             fib_table=0,
165             segments=["a4::", "a5::", "a6::c7"],
166             source="a3::",
167         )
168         sr_policy.add_vpp_config()
169         self.sr_policy = sr_policy
170
171         # log the sr policies
172         self.logger.info(self.vapi.cli("show sr policies"))
173
174         # steer IPv6 traffic to a7::/64 into SRv6 Policy
175         # use the bsid of the above self.sr_policy
176         pol_steering = VppSRv6Steering(
177             self,
178             bsid=self.sr_policy.bsid,
179             prefix="a7::",
180             mask_width=64,
181             traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
182             sr_policy_index=0,
183             table_id=0,
184             sw_if_index=0,
185         )
186         pol_steering.add_vpp_config()
187
188         # log the sr steering policies
189         self.logger.info(self.vapi.cli("show sr steering policies"))
190
191         # create packets
192         count = len(self.pg_packet_sizes)
193         dst_inner = "a7::1234"
194         pkts = []
195
196         # create IPv6 packets without SRH
197         packet_header = self.create_packet_header_IPv6(dst_inner)
198         # create traffic stream pg0->pg1
199         pkts.extend(
200             self.create_stream(
201                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
202             )
203         )
204
205         # create IPv6 packets with SRH
206         # packets with segments-left 1, active segment a7::
207         packet_header = self.create_packet_header_IPv6_SRH(
208             sidlist=["a8::", "a7::", "a6::"], segleft=1
209         )
210         # create traffic stream pg0->pg1
211         pkts.extend(
212             self.create_stream(
213                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
214             )
215         )
216
217         # create IPv6 packets with SRH and IPv6
218         # packets with segments-left 1, active segment a7::
219         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
220             dst_inner, sidlist=["a8::", "a7::", "a6::"], segleft=1
221         )
222         # create traffic stream pg0->pg1
223         pkts.extend(
224             self.create_stream(
225                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
226             )
227         )
228
229         # send packets and verify received packets
230         self.send_and_verify_pkts(
231             self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_T_Encaps
232         )
233
234         # log the localsid counters
235         self.logger.info(self.vapi.cli("show sr localsid"))
236
237         # remove SR steering
238         pol_steering.remove_vpp_config()
239         self.logger.info(self.vapi.cli("show sr steering policies"))
240
241         # remove SR Policies
242         self.sr_policy.remove_vpp_config()
243         self.logger.info(self.vapi.cli("show sr policies"))
244
245         # remove FIB entries
246         # done by tearDown
247
248         # cleanup interfaces
249         self.teardown_interfaces()
250
251     @unittest.skipUnless(0, "PC to fix")
252     def test_SRv6_T_Insert(self):
253         """Test SRv6 Transit.Insert behavior (IPv6 only)."""
254         # send traffic to one destination interface
255         # source and destination are IPv6 only
256         self.setup_interfaces(ipv6=[True, True])
257
258         # configure FIB entries
259         route = VppIpRoute(
260             self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
261         )
262         route.add_vpp_config()
263
264         # configure encaps IPv6 source address
265         # needs to be done before SR Policy config
266         # TODO: API?
267         self.vapi.cli("set sr encaps source addr a3::")
268
269         bsid = "a3::9999:1"
270         # configure SRv6 Policy
271         # Note: segment list order: first -> last
272         sr_policy = VppSRv6Policy(
273             self,
274             bsid=bsid,
275             is_encap=0,
276             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
277             weight=1,
278             fib_table=0,
279             segments=["a4::", "a5::", "a6::c7"],
280             source="a3::",
281         )
282         sr_policy.add_vpp_config()
283         self.sr_policy = sr_policy
284
285         # log the sr policies
286         self.logger.info(self.vapi.cli("show sr policies"))
287
288         # steer IPv6 traffic to a7::/64 into SRv6 Policy
289         # use the bsid of the above self.sr_policy
290         pol_steering = VppSRv6Steering(
291             self,
292             bsid=self.sr_policy.bsid,
293             prefix="a7::",
294             mask_width=64,
295             traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
296             sr_policy_index=0,
297             table_id=0,
298             sw_if_index=0,
299         )
300         pol_steering.add_vpp_config()
301
302         # log the sr steering policies
303         self.logger.info(self.vapi.cli("show sr steering policies"))
304
305         # create packets
306         count = len(self.pg_packet_sizes)
307         dst_inner = "a7::1234"
308         pkts = []
309
310         # create IPv6 packets without SRH
311         packet_header = self.create_packet_header_IPv6(dst_inner)
312         # create traffic stream pg0->pg1
313         pkts.extend(
314             self.create_stream(
315                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
316             )
317         )
318
319         # create IPv6 packets with SRH
320         # packets with segments-left 1, active segment a7::
321         packet_header = self.create_packet_header_IPv6_SRH(
322             sidlist=["a8::", "a7::", "a6::"], segleft=1
323         )
324         # create traffic stream pg0->pg1
325         pkts.extend(
326             self.create_stream(
327                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
328             )
329         )
330
331         # send packets and verify received packets
332         self.send_and_verify_pkts(
333             self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_T_Insert
334         )
335
336         # log the localsid counters
337         self.logger.info(self.vapi.cli("show sr localsid"))
338
339         # remove SR steering
340         pol_steering.remove_vpp_config()
341         self.logger.info(self.vapi.cli("show sr steering policies"))
342
343         # remove SR Policies
344         self.sr_policy.remove_vpp_config()
345         self.logger.info(self.vapi.cli("show sr policies"))
346
347         # remove FIB entries
348         # done by tearDown
349
350         # cleanup interfaces
351         self.teardown_interfaces()
352
353     @unittest.skipUnless(0, "PC to fix")
354     def test_SRv6_T_Encaps_IPv4(self):
355         """Test SRv6 Transit.Encaps behavior for IPv4."""
356         # send traffic to one destination interface
357         # source interface is IPv4 only
358         # destination interface is IPv6 only
359         self.setup_interfaces(ipv6=[False, True], ipv4=[True, False])
360
361         # configure FIB entries
362         route = VppIpRoute(
363             self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
364         )
365         route.add_vpp_config()
366
367         # configure encaps IPv6 source address
368         # needs to be done before SR Policy config
369         # TODO: API?
370         self.vapi.cli("set sr encaps source addr a3::")
371
372         bsid = "a3::9999:1"
373         # configure SRv6 Policy
374         # Note: segment list order: first -> last
375         sr_policy = VppSRv6Policy(
376             self,
377             bsid=bsid,
378             is_encap=1,
379             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
380             weight=1,
381             fib_table=0,
382             segments=["a4::", "a5::", "a6::c7"],
383             source="a3::",
384         )
385         sr_policy.add_vpp_config()
386         self.sr_policy = sr_policy
387
388         # log the sr policies
389         self.logger.info(self.vapi.cli("show sr policies"))
390
391         # steer IPv4 traffic to 7.1.1.0/24 into SRv6 Policy
392         # use the bsid of the above self.sr_policy
393         pol_steering = VppSRv6Steering(
394             self,
395             bsid=self.sr_policy.bsid,
396             prefix="7.1.1.0",
397             mask_width=24,
398             traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV4,
399             sr_policy_index=0,
400             table_id=0,
401             sw_if_index=0,
402         )
403         pol_steering.add_vpp_config()
404
405         # log the sr steering policies
406         self.logger.info(self.vapi.cli("show sr steering policies"))
407
408         # create packets
409         count = len(self.pg_packet_sizes)
410         dst_inner = "7.1.1.123"
411         pkts = []
412
413         # create IPv4 packets
414         packet_header = self.create_packet_header_IPv4(dst_inner)
415         # create traffic stream pg0->pg1
416         pkts.extend(
417             self.create_stream(
418                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
419             )
420         )
421
422         # send packets and verify received packets
423         self.send_and_verify_pkts(
424             self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_T_Encaps_IPv4
425         )
426
427         # log the localsid counters
428         self.logger.info(self.vapi.cli("show sr localsid"))
429
430         # remove SR steering
431         pol_steering.remove_vpp_config()
432         self.logger.info(self.vapi.cli("show sr steering policies"))
433
434         # remove SR Policies
435         self.sr_policy.remove_vpp_config()
436         self.logger.info(self.vapi.cli("show sr policies"))
437
438         # remove FIB entries
439         # done by tearDown
440
441         # cleanup interfaces
442         self.teardown_interfaces()
443
444     @unittest.skip("VPP crashes after running this test")
445     def test_SRv6_T_Encaps_L2(self):
446         """Test SRv6 Transit.Encaps behavior for L2."""
447         # send traffic to one destination interface
448         # source interface is IPv4 only TODO?
449         # destination interface is IPv6 only
450         self.setup_interfaces(ipv6=[False, True], ipv4=[False, False])
451
452         # configure FIB entries
453         route = VppIpRoute(
454             self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
455         )
456         route.add_vpp_config()
457
458         # configure encaps IPv6 source address
459         # needs to be done before SR Policy config
460         # TODO: API?
461         self.vapi.cli("set sr encaps source addr a3::")
462
463         bsid = "a3::9999:1"
464         # configure SRv6 Policy
465         # Note: segment list order: first -> last
466         sr_policy = VppSRv6Policy(
467             self,
468             bsid=bsid,
469             is_encap=1,
470             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
471             weight=1,
472             fib_table=0,
473             segments=["a4::", "a5::", "a6::c7"],
474             source="a3::",
475         )
476         sr_policy.add_vpp_config()
477         self.sr_policy = sr_policy
478
479         # log the sr policies
480         self.logger.info(self.vapi.cli("show sr policies"))
481
482         # steer L2 traffic into SRv6 Policy
483         # use the bsid of the above self.sr_policy
484         pol_steering = VppSRv6Steering(
485             self,
486             bsid=self.sr_policy.bsid,
487             prefix="::",
488             mask_width=0,
489             traffic_type=SRv6PolicySteeringTypes.SR_STEER_L2,
490             sr_policy_index=0,
491             table_id=0,
492             sw_if_index=self.pg0.sw_if_index,
493         )
494         pol_steering.add_vpp_config()
495
496         # log the sr steering policies
497         self.logger.info(self.vapi.cli("show sr steering policies"))
498
499         # create packets
500         count = len(self.pg_packet_sizes)
501         pkts = []
502
503         # create L2 packets without dot1q header
504         packet_header = self.create_packet_header_L2()
505         # create traffic stream pg0->pg1
506         pkts.extend(
507             self.create_stream(
508                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
509             )
510         )
511
512         # create L2 packets with dot1q header
513         packet_header = self.create_packet_header_L2(vlan=123)
514         # create traffic stream pg0->pg1
515         pkts.extend(
516             self.create_stream(
517                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
518             )
519         )
520
521         # send packets and verify received packets
522         self.send_and_verify_pkts(
523             self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_T_Encaps_L2
524         )
525
526         # log the localsid counters
527         self.logger.info(self.vapi.cli("show sr localsid"))
528
529         # remove SR steering
530         pol_steering.remove_vpp_config()
531         self.logger.info(self.vapi.cli("show sr steering policies"))
532
533         # remove SR Policies
534         self.sr_policy.remove_vpp_config()
535         self.logger.info(self.vapi.cli("show sr policies"))
536
537         # remove FIB entries
538         # done by tearDown
539
540         # cleanup interfaces
541         self.teardown_interfaces()
542
543     def test_SRv6_End(self):
544         """Test SRv6 End (without PSP) behavior."""
545         # send traffic to one destination interface
546         # source and destination interfaces are IPv6 only
547         self.setup_interfaces(ipv6=[True, True])
548
549         # configure FIB entries
550         route = VppIpRoute(
551             self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
552         )
553         route.add_vpp_config()
554
555         # configure SRv6 localSID End without PSP behavior
556         localsid = VppSRv6LocalSID(
557             self,
558             localsid="A3::0",
559             behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
560             nh_addr=0,
561             end_psp=0,
562             sw_if_index=0,
563             vlan_index=0,
564             fib_table=0,
565         )
566         localsid.add_vpp_config()
567         # log the localsids
568         self.logger.debug(self.vapi.cli("show sr localsid"))
569
570         # create IPv6 packets with SRH (SL=2, SL=1, SL=0)
571         # send one packet per SL value per packet size
572         # SL=0 packet with localSID End with USP needs 2nd SRH
573         count = len(self.pg_packet_sizes)
574         dst_inner = "a4::1234"
575         pkts = []
576
577         # packets with segments-left 2, active segment a3::
578         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
579             dst_inner, sidlist=["a5::", "a4::", "a3::"], segleft=2
580         )
581         # create traffic stream pg0->pg1
582         pkts.extend(
583             self.create_stream(
584                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
585             )
586         )
587
588         # packets with segments-left 1, active segment a3::
589         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
590             dst_inner, sidlist=["a4::", "a3::", "a2::"], segleft=1
591         )
592         # add to traffic stream pg0->pg1
593         pkts.extend(
594             self.create_stream(
595                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
596             )
597         )
598
599         # TODO: test behavior with SL=0 packet (needs 2*SRH?)
600
601         expected_count = len(pkts)
602
603         # packets without SRH (should not crash)
604         packet_header = self.create_packet_header_IPv6("a3::")
605         # create traffic stream pg0->pg1
606         pkts.extend(
607             self.create_stream(
608                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
609             )
610         )
611
612         # send packets and verify received packets
613         self.send_and_verify_pkts(
614             self.pg0,
615             pkts,
616             self.pg1,
617             self.compare_rx_tx_packet_End,
618             expected_count=expected_count,
619         )
620
621         # log the localsid counters
622         self.logger.info(self.vapi.cli("show sr localsid"))
623
624         # remove SRv6 localSIDs
625         localsid.remove_vpp_config()
626
627         # remove FIB entries
628         # done by tearDown
629
630         # cleanup interfaces
631         self.teardown_interfaces()
632
633     def test_SRv6_End_with_PSP(self):
634         """Test SRv6 End with PSP behavior."""
635         # send traffic to one destination interface
636         # source and destination interfaces are IPv6 only
637         self.setup_interfaces(ipv6=[True, True])
638
639         # configure FIB entries
640         route = VppIpRoute(
641             self, "a4::", 64, [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index)]
642         )
643         route.add_vpp_config()
644
645         # configure SRv6 localSID End with PSP behavior
646         localsid = VppSRv6LocalSID(
647             self,
648             localsid="A3::0",
649             behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
650             nh_addr=0,
651             end_psp=1,
652             sw_if_index=0,
653             vlan_index=0,
654             fib_table=0,
655         )
656         localsid.add_vpp_config()
657         # log the localsids
658         self.logger.debug(self.vapi.cli("show sr localsid"))
659
660         # create IPv6 packets with SRH (SL=2, SL=1)
661         # send one packet per SL value per packet size
662         # SL=0 packet with localSID End with PSP is dropped
663         count = len(self.pg_packet_sizes)
664         dst_inner = "a4::1234"
665         pkts = []
666
667         # packets with segments-left 2, active segment a3::
668         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
669             dst_inner, sidlist=["a5::", "a4::", "a3::"], segleft=2
670         )
671         # create traffic stream pg0->pg1
672         pkts.extend(
673             self.create_stream(
674                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
675             )
676         )
677
678         # packets with segments-left 1, active segment a3::
679         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
680             dst_inner, sidlist=["a4::", "a3::", "a2::"], segleft=1
681         )
682         # add to traffic stream pg0->pg1
683         pkts.extend(
684             self.create_stream(
685                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
686             )
687         )
688
689         # send packets and verify received packets
690         self.send_and_verify_pkts(
691             self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_End_PSP
692         )
693
694         # log the localsid counters
695         self.logger.info(self.vapi.cli("show sr localsid"))
696
697         # remove SRv6 localSIDs
698         localsid.remove_vpp_config()
699
700         # remove FIB entries
701         # done by tearDown
702
703         # cleanup interfaces
704         self.teardown_interfaces()
705
706     def test_SRv6_End_X(self):
707         """Test SRv6 End.X (without PSP) behavior."""
708         # create three interfaces (1 source, 2 destinations)
709         # source and destination interfaces are IPv6 only
710         self.setup_interfaces(ipv6=[True, True, True])
711
712         # configure FIB entries
713         # a4::/64 via pg1 and pg2
714         route = VppIpRoute(
715             self,
716             "a4::",
717             64,
718             [
719                 VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index),
720                 VppRoutePath(self.pg2.remote_ip6, self.pg2.sw_if_index),
721             ],
722         )
723         route.add_vpp_config()
724         self.logger.debug(self.vapi.cli("show ip6 fib"))
725
726         # configure SRv6 localSID End.X without PSP behavior
727         # End.X points to interface pg1
728         localsid = VppSRv6LocalSID(
729             self,
730             localsid="A3::C4",
731             behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
732             nh_addr=self.pg1.remote_ip6,
733             end_psp=0,
734             sw_if_index=self.pg1.sw_if_index,
735             vlan_index=0,
736             fib_table=0,
737         )
738         localsid.add_vpp_config()
739         # log the localsids
740         self.logger.debug(self.vapi.cli("show sr localsid"))
741
742         # create IPv6 packets with SRH (SL=2, SL=1)
743         # send one packet per SL value per packet size
744         # SL=0 packet with localSID End with PSP is dropped
745         count = len(self.pg_packet_sizes)
746         dst_inner = "a4::1234"
747         pkts = []
748
749         # packets with segments-left 2, active segment a3::c4
750         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
751             dst_inner, sidlist=["a5::", "a4::", "a3::c4"], segleft=2
752         )
753         # create traffic stream pg0->pg1
754         pkts.extend(
755             self.create_stream(
756                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
757             )
758         )
759
760         # packets with segments-left 1, active segment a3::c4
761         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
762             dst_inner, sidlist=["a4::", "a3::c4", "a2::"], segleft=1
763         )
764         # add to traffic stream pg0->pg1
765         pkts.extend(
766             self.create_stream(
767                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
768             )
769         )
770
771         # send packets and verify received packets
772         # using same comparison function as End (no PSP)
773         self.send_and_verify_pkts(
774             self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_End
775         )
776
777         # assert nothing was received on the other interface (pg2)
778         self.pg2.assert_nothing_captured(remark="mis-directed packet(s)")
779
780         # log the localsid counters
781         self.logger.info(self.vapi.cli("show sr localsid"))
782
783         # remove SRv6 localSIDs
784         localsid.remove_vpp_config()
785
786         # remove FIB entries
787         # done by tearDown
788
789         # cleanup interfaces
790         self.teardown_interfaces()
791
792     def test_SRv6_End_X_with_PSP(self):
793         """Test SRv6 End.X with PSP behavior."""
794         # create three interfaces (1 source, 2 destinations)
795         # source and destination interfaces are IPv6 only
796         self.setup_interfaces(ipv6=[True, True, True])
797
798         # configure FIB entries
799         # a4::/64 via pg1 and pg2
800         route = VppIpRoute(
801             self,
802             "a4::",
803             64,
804             [
805                 VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index),
806                 VppRoutePath(self.pg2.remote_ip6, self.pg2.sw_if_index),
807             ],
808         )
809         route.add_vpp_config()
810
811         # configure SRv6 localSID End with PSP behavior
812         localsid = VppSRv6LocalSID(
813             self,
814             localsid="A3::C4",
815             behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
816             nh_addr=self.pg1.remote_ip6,
817             end_psp=1,
818             sw_if_index=self.pg1.sw_if_index,
819             vlan_index=0,
820             fib_table=0,
821         )
822         localsid.add_vpp_config()
823         # log the localsids
824         self.logger.debug(self.vapi.cli("show sr localsid"))
825
826         # create IPv6 packets with SRH (SL=2, SL=1)
827         # send one packet per SL value per packet size
828         # SL=0 packet with localSID End with PSP is dropped
829         count = len(self.pg_packet_sizes)
830         dst_inner = "a4::1234"
831         pkts = []
832
833         # packets with segments-left 2, active segment a3::
834         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
835             dst_inner, sidlist=["a5::", "a4::", "a3::c4"], segleft=2
836         )
837         # create traffic stream pg0->pg1
838         pkts.extend(
839             self.create_stream(
840                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
841             )
842         )
843
844         # packets with segments-left 1, active segment a3::
845         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
846             dst_inner, sidlist=["a4::", "a3::c4", "a2::"], segleft=1
847         )
848         # add to traffic stream pg0->pg1
849         pkts.extend(
850             self.create_stream(
851                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
852             )
853         )
854
855         # send packets and verify received packets
856         # using same comparison function as End with PSP
857         self.send_and_verify_pkts(
858             self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_End_PSP
859         )
860
861         # assert nothing was received on the other interface (pg2)
862         self.pg2.assert_nothing_captured(remark="mis-directed packet(s)")
863
864         # log the localsid counters
865         self.logger.info(self.vapi.cli("show sr localsid"))
866
867         # remove SRv6 localSIDs
868         localsid.remove_vpp_config()
869
870         # remove FIB entries
871         # done by tearDown
872
873         # cleanup interfaces
874         self.teardown_interfaces()
875
876     def test_SRv6_End_DX6(self):
877         """Test SRv6 End.DX6 behavior."""
878         # send traffic to one destination interface
879         # source and destination interfaces are IPv6 only
880         self.setup_interfaces(ipv6=[True, True])
881
882         # configure SRv6 localSID End.DX6 behavior
883         localsid = VppSRv6LocalSID(
884             self,
885             localsid="A3::C4",
886             behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX6,
887             nh_addr=self.pg1.remote_ip6,
888             end_psp=0,
889             sw_if_index=self.pg1.sw_if_index,
890             vlan_index=0,
891             fib_table=0,
892         )
893         localsid.add_vpp_config()
894         # log the localsids
895         self.logger.debug(self.vapi.cli("show sr localsid"))
896
897         # create IPv6 packets with SRH (SL=0)
898         # send one packet per packet size
899         count = len(self.pg_packet_sizes)
900         dst_inner = "a4::1234"  # inner header destination address
901         pkts = []
902
903         # packets with SRH, segments-left 0, active segment a3::c4
904         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
905             dst_inner, sidlist=["a3::c4", "a2::", "a1::"], segleft=0
906         )
907         # add to traffic stream pg0->pg1
908         pkts.extend(
909             self.create_stream(
910                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
911             )
912         )
913
914         # packets without SRH, IPv6 in IPv6
915         # outer IPv6 dest addr is the localsid End.DX6
916         packet_header = self.create_packet_header_IPv6_IPv6(
917             dst_inner, dst_outer="a3::c4"
918         )
919         # add to traffic stream pg0->pg1
920         pkts.extend(
921             self.create_stream(
922                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
923             )
924         )
925
926         # send packets and verify received packets
927         self.send_and_verify_pkts(
928             self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_End_DX6
929         )
930
931         # log the localsid counters
932         self.logger.info(self.vapi.cli("show sr localsid"))
933
934         # remove SRv6 localSIDs
935         localsid.remove_vpp_config()
936
937         # cleanup interfaces
938         self.teardown_interfaces()
939
940     def test_SRv6_End_DT6(self):
941         """Test SRv6 End.DT6 behavior."""
942         # create three interfaces (1 source, 2 destinations)
943         # all interfaces are IPv6 only
944         # source interface in global FIB (0)
945         # destination interfaces in global and vrf
946         vrf_1 = 1
947         ipt = VppIpTable(self, vrf_1, is_ip6=True)
948         ipt.add_vpp_config()
949         self.setup_interfaces(ipv6=[True, True, True], ipv6_table_id=[0, 0, vrf_1])
950
951         # configure FIB entries
952         # a4::/64 is reachable
953         #     via pg1 in table 0 (global)
954         #     and via pg2 in table vrf_1
955         route0 = VppIpRoute(
956             self,
957             "a4::",
958             64,
959             [VppRoutePath(self.pg1.remote_ip6, self.pg1.sw_if_index, nh_table_id=0)],
960             table_id=0,
961         )
962         route0.add_vpp_config()
963         route1 = VppIpRoute(
964             self,
965             "a4::",
966             64,
967             [
968                 VppRoutePath(
969                     self.pg2.remote_ip6, self.pg2.sw_if_index, nh_table_id=vrf_1
970                 )
971             ],
972             table_id=vrf_1,
973         )
974         route1.add_vpp_config()
975         self.logger.debug(self.vapi.cli("show ip6 fib"))
976
977         # configure SRv6 localSID End.DT6 behavior
978         # Note:
979         # fib_table: where the localsid is installed
980         # sw_if_index: in T-variants of localsid this is the vrf table_id
981         localsid = VppSRv6LocalSID(
982             self,
983             localsid="A3::C4",
984             behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT6,
985             nh_addr=0,
986             end_psp=0,
987             sw_if_index=vrf_1,
988             vlan_index=0,
989             fib_table=0,
990         )
991         localsid.add_vpp_config()
992         # log the localsids
993         self.logger.debug(self.vapi.cli("show sr localsid"))
994
995         # create IPv6 packets with SRH (SL=0)
996         # send one packet per packet size
997         count = len(self.pg_packet_sizes)
998         dst_inner = "a4::1234"  # inner header destination address
999         pkts = []
1000
1001         # packets with SRH, segments-left 0, active segment a3::c4
1002         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
1003             dst_inner, sidlist=["a3::c4", "a2::", "a1::"], segleft=0
1004         )
1005         # add to traffic stream pg0->pg1
1006         pkts.extend(
1007             self.create_stream(
1008                 self.pg0, self.pg2, packet_header, self.pg_packet_sizes, count
1009             )
1010         )
1011
1012         # packets without SRH, IPv6 in IPv6
1013         # outer IPv6 dest addr is the localsid End.DT6
1014         packet_header = self.create_packet_header_IPv6_IPv6(
1015             dst_inner, dst_outer="a3::c4"
1016         )
1017         # add to traffic stream pg0->pg1
1018         pkts.extend(
1019             self.create_stream(
1020                 self.pg0, self.pg2, packet_header, self.pg_packet_sizes, count
1021             )
1022         )
1023
1024         # send packets and verify received packets
1025         # using same comparison function as End.DX6
1026         self.send_and_verify_pkts(
1027             self.pg0, pkts, self.pg2, self.compare_rx_tx_packet_End_DX6
1028         )
1029
1030         # assert nothing was received on the other interface (pg2)
1031         self.pg1.assert_nothing_captured(remark="mis-directed packet(s)")
1032
1033         # log the localsid counters
1034         self.logger.info(self.vapi.cli("show sr localsid"))
1035
1036         # remove SRv6 localSIDs
1037         localsid.remove_vpp_config()
1038
1039         # remove FIB entries
1040         # done by tearDown
1041
1042         # cleanup interfaces
1043         self.teardown_interfaces()
1044
1045     def test_SRv6_End_DX4(self):
1046         """Test SRv6 End.DX4 behavior."""
1047         # send traffic to one destination interface
1048         # source interface is IPv6 only
1049         # destination interface is IPv4 only
1050         self.setup_interfaces(ipv6=[True, False], ipv4=[False, True])
1051
1052         # configure SRv6 localSID End.DX4 behavior
1053         localsid = VppSRv6LocalSID(
1054             self,
1055             localsid="A3::C4",
1056             behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX4,
1057             nh_addr=self.pg1.remote_ip4,
1058             end_psp=0,
1059             sw_if_index=self.pg1.sw_if_index,
1060             vlan_index=0,
1061             fib_table=0,
1062         )
1063         localsid.add_vpp_config()
1064         # log the localsids
1065         self.logger.debug(self.vapi.cli("show sr localsid"))
1066
1067         # send one packet per packet size
1068         count = len(self.pg_packet_sizes)
1069         dst_inner = "4.1.1.123"  # inner header destination address
1070         pkts = []
1071
1072         # packets with SRH, segments-left 0, active segment a3::c4
1073         packet_header = self.create_packet_header_IPv6_SRH_IPv4(
1074             dst_inner, sidlist=["a3::c4", "a2::", "a1::"], segleft=0
1075         )
1076         # add to traffic stream pg0->pg1
1077         pkts.extend(
1078             self.create_stream(
1079                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1080             )
1081         )
1082
1083         # packets without SRH, IPv4 in IPv6
1084         # outer IPv6 dest addr is the localsid End.DX4
1085         packet_header = self.create_packet_header_IPv6_IPv4(
1086             dst_inner, dst_outer="a3::c4"
1087         )
1088         # add to traffic stream pg0->pg1
1089         pkts.extend(
1090             self.create_stream(
1091                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1092             )
1093         )
1094
1095         # send packets and verify received packets
1096         self.send_and_verify_pkts(
1097             self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_End_DX4
1098         )
1099
1100         # log the localsid counters
1101         self.logger.info(self.vapi.cli("show sr localsid"))
1102
1103         # remove SRv6 localSIDs
1104         localsid.remove_vpp_config()
1105
1106         # cleanup interfaces
1107         self.teardown_interfaces()
1108
1109     def test_SRv6_End_DT4(self):
1110         """Test SRv6 End.DT4 behavior."""
1111         # create three interfaces (1 source, 2 destinations)
1112         # source interface is IPv6-only
1113         # destination interfaces are IPv4 only
1114         # source interface in global FIB (0)
1115         # destination interfaces in global and vrf
1116         vrf_1 = 1
1117         ipt = VppIpTable(self, vrf_1)
1118         ipt.add_vpp_config()
1119         self.setup_interfaces(
1120             ipv6=[True, False, False],
1121             ipv4=[False, True, True],
1122             ipv6_table_id=[0, 0, 0],
1123             ipv4_table_id=[0, 0, vrf_1],
1124         )
1125
1126         # configure FIB entries
1127         # 4.1.1.0/24 is reachable
1128         #     via pg1 in table 0 (global)
1129         #     and via pg2 in table vrf_1
1130         route0 = VppIpRoute(
1131             self,
1132             "4.1.1.0",
1133             24,
1134             [VppRoutePath(self.pg1.remote_ip4, self.pg1.sw_if_index, nh_table_id=0)],
1135             table_id=0,
1136         )
1137         route0.add_vpp_config()
1138         route1 = VppIpRoute(
1139             self,
1140             "4.1.1.0",
1141             24,
1142             [
1143                 VppRoutePath(
1144                     self.pg2.remote_ip4, self.pg2.sw_if_index, nh_table_id=vrf_1
1145                 )
1146             ],
1147             table_id=vrf_1,
1148         )
1149         route1.add_vpp_config()
1150         self.logger.debug(self.vapi.cli("show ip fib"))
1151
1152         # configure SRv6 localSID End.DT6 behavior
1153         # Note:
1154         # fib_table: where the localsid is installed
1155         # sw_if_index: in T-variants of localsid: vrf table_id
1156         localsid = VppSRv6LocalSID(
1157             self,
1158             localsid="A3::C4",
1159             behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT4,
1160             nh_addr=0,
1161             end_psp=0,
1162             sw_if_index=vrf_1,
1163             vlan_index=0,
1164             fib_table=0,
1165         )
1166         localsid.add_vpp_config()
1167         # log the localsids
1168         self.logger.debug(self.vapi.cli("show sr localsid"))
1169
1170         # create IPv6 packets with SRH (SL=0)
1171         # send one packet per packet size
1172         count = len(self.pg_packet_sizes)
1173         dst_inner = "4.1.1.123"  # inner header destination address
1174         pkts = []
1175
1176         # packets with SRH, segments-left 0, active segment a3::c4
1177         packet_header = self.create_packet_header_IPv6_SRH_IPv4(
1178             dst_inner, sidlist=["a3::c4", "a2::", "a1::"], segleft=0
1179         )
1180         # add to traffic stream pg0->pg1
1181         pkts.extend(
1182             self.create_stream(
1183                 self.pg0, self.pg2, packet_header, self.pg_packet_sizes, count
1184             )
1185         )
1186
1187         # packets without SRH, IPv6 in IPv6
1188         # outer IPv6 dest addr is the localsid End.DX4
1189         packet_header = self.create_packet_header_IPv6_IPv4(
1190             dst_inner, dst_outer="a3::c4"
1191         )
1192         # add to traffic stream pg0->pg1
1193         pkts.extend(
1194             self.create_stream(
1195                 self.pg0, self.pg2, packet_header, self.pg_packet_sizes, count
1196             )
1197         )
1198
1199         # send packets and verify received packets
1200         # using same comparison function as End.DX4
1201         self.send_and_verify_pkts(
1202             self.pg0, pkts, self.pg2, self.compare_rx_tx_packet_End_DX4
1203         )
1204
1205         # assert nothing was received on the other interface (pg2)
1206         self.pg1.assert_nothing_captured(remark="mis-directed packet(s)")
1207
1208         # log the localsid counters
1209         self.logger.info(self.vapi.cli("show sr localsid"))
1210
1211         # remove SRv6 localSIDs
1212         localsid.remove_vpp_config()
1213
1214         # remove FIB entries
1215         # done by tearDown
1216
1217         # cleanup interfaces
1218         self.teardown_interfaces()
1219
1220     def test_SRv6_End_DX2(self):
1221         """Test SRv6 End.DX2 behavior."""
1222         # send traffic to one destination interface
1223         # source interface is IPv6 only
1224         self.setup_interfaces(ipv6=[True, False], ipv4=[False, False])
1225
1226         # configure SRv6 localSID End.DX2 behavior
1227         localsid = VppSRv6LocalSID(
1228             self,
1229             localsid="A3::C4",
1230             behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX2,
1231             nh_addr=0,
1232             end_psp=0,
1233             sw_if_index=self.pg1.sw_if_index,
1234             vlan_index=0,
1235             fib_table=0,
1236         )
1237         localsid.add_vpp_config()
1238         # log the localsids
1239         self.logger.debug(self.vapi.cli("show sr localsid"))
1240
1241         # send one packet per packet size
1242         count = len(self.pg_packet_sizes)
1243         pkts = []
1244
1245         # packets with SRH, segments-left 0, active segment a3::c4
1246         # L2 has no dot1q header
1247         packet_header = self.create_packet_header_IPv6_SRH_L2(
1248             sidlist=["a3::c4", "a2::", "a1::"], segleft=0, vlan=0
1249         )
1250         # add to traffic stream pg0->pg1
1251         pkts.extend(
1252             self.create_stream(
1253                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1254             )
1255         )
1256
1257         # packets with SRH, segments-left 0, active segment a3::c4
1258         # L2 has dot1q header
1259         packet_header = self.create_packet_header_IPv6_SRH_L2(
1260             sidlist=["a3::c4", "a2::", "a1::"], segleft=0, vlan=123
1261         )
1262         # add to traffic stream pg0->pg1
1263         pkts.extend(
1264             self.create_stream(
1265                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1266             )
1267         )
1268
1269         # packets without SRH, L2 in IPv6
1270         # outer IPv6 dest addr is the localsid End.DX2
1271         # L2 has no dot1q header
1272         packet_header = self.create_packet_header_IPv6_L2(dst_outer="a3::c4", vlan=0)
1273         # add to traffic stream pg0->pg1
1274         pkts.extend(
1275             self.create_stream(
1276                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1277             )
1278         )
1279
1280         # packets without SRH, L2 in IPv6
1281         # outer IPv6 dest addr is the localsid End.DX2
1282         # L2 has dot1q header
1283         packet_header = self.create_packet_header_IPv6_L2(dst_outer="a3::c4", vlan=123)
1284         # add to traffic stream pg0->pg1
1285         pkts.extend(
1286             self.create_stream(
1287                 self.pg0, self.pg1, packet_header, self.pg_packet_sizes, count
1288             )
1289         )
1290
1291         # send packets and verify received packets
1292         self.send_and_verify_pkts(
1293             self.pg0, pkts, self.pg1, self.compare_rx_tx_packet_End_DX2
1294         )
1295
1296         # log the localsid counters
1297         self.logger.info(self.vapi.cli("show sr localsid"))
1298
1299         # remove SRv6 localSIDs
1300         localsid.remove_vpp_config()
1301
1302         # cleanup interfaces
1303         self.teardown_interfaces()
1304
1305     @unittest.skipUnless(0, "PC to fix")
1306     def test_SRv6_T_Insert_Classifier(self):
1307         """Test SRv6 Transit.Insert behavior (IPv6 only).
1308         steer packets using the classifier
1309         """
1310         # send traffic to one destination interface
1311         # source and destination are IPv6 only
1312         self.setup_interfaces(ipv6=[False, False, False, True, True])
1313
1314         # configure FIB entries
1315         route = VppIpRoute(
1316             self, "a4::", 64, [VppRoutePath(self.pg4.remote_ip6, self.pg4.sw_if_index)]
1317         )
1318         route.add_vpp_config()
1319
1320         # configure encaps IPv6 source address
1321         # needs to be done before SR Policy config
1322         # TODO: API?
1323         self.vapi.cli("set sr encaps source addr a3::")
1324
1325         bsid = "a3::9999:1"
1326         # configure SRv6 Policy
1327         # Note: segment list order: first -> last
1328         sr_policy = VppSRv6Policy(
1329             self,
1330             bsid=bsid,
1331             is_encap=0,
1332             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
1333             weight=1,
1334             fib_table=0,
1335             segments=["a4::", "a5::", "a6::c7"],
1336             source="a3::",
1337         )
1338         sr_policy.add_vpp_config()
1339         self.sr_policy = sr_policy
1340
1341         # log the sr policies
1342         self.logger.info(self.vapi.cli("show sr policies"))
1343
1344         # add classify table
1345         # mask on dst ip address prefix a7::/8
1346         mask = "{!s:0<16}".format("ff")
1347         r = self.vapi.classify_add_del_table(
1348             1,
1349             binascii.unhexlify(mask),
1350             match_n_vectors=(len(mask) - 1) // 32 + 1,
1351             current_data_flag=1,
1352             skip_n_vectors=2,
1353         )  # data offset
1354         self.assertIsNotNone(r, "No response msg for add_del_table")
1355         table_index = r.new_table_index
1356
1357         # add the source routing node as a ip6 inacl netxt node
1358         r = self.vapi.add_node_next("ip6-inacl", "sr-pl-rewrite-insert")
1359         inacl_next_node_index = r.node_index
1360
1361         match = "{!s:0<16}".format("a7")
1362         r = self.vapi.classify_add_del_session(
1363             1,
1364             table_index,
1365             binascii.unhexlify(match),
1366             hit_next_index=inacl_next_node_index,
1367             action=3,
1368             metadata=0,
1369         )  # sr policy index
1370         self.assertIsNotNone(r, "No response msg for add_del_session")
1371
1372         # log the classify table used in the steering policy
1373         self.logger.info(self.vapi.cli("show classify table"))
1374
1375         r = self.vapi.input_acl_set_interface(
1376             is_add=1, sw_if_index=self.pg3.sw_if_index, ip6_table_index=table_index
1377         )
1378         self.assertIsNotNone(r, "No response msg for input_acl_set_interface")
1379
1380         # log the ip6 inacl
1381         self.logger.info(self.vapi.cli("show inacl type ip6"))
1382
1383         # create packets
1384         count = len(self.pg_packet_sizes)
1385         dst_inner = "a7::1234"
1386         pkts = []
1387
1388         # create IPv6 packets without SRH
1389         packet_header = self.create_packet_header_IPv6(dst_inner)
1390         # create traffic stream pg3->pg4
1391         pkts.extend(
1392             self.create_stream(
1393                 self.pg3, self.pg4, packet_header, self.pg_packet_sizes, count
1394             )
1395         )
1396
1397         # create IPv6 packets with SRH
1398         # packets with segments-left 1, active segment a7::
1399         packet_header = self.create_packet_header_IPv6_SRH(
1400             sidlist=["a8::", "a7::", "a6::"], segleft=1
1401         )
1402         # create traffic stream pg3->pg4
1403         pkts.extend(
1404             self.create_stream(
1405                 self.pg3, self.pg4, packet_header, self.pg_packet_sizes, count
1406             )
1407         )
1408
1409         # send packets and verify received packets
1410         self.send_and_verify_pkts(
1411             self.pg3, pkts, self.pg4, self.compare_rx_tx_packet_T_Insert
1412         )
1413
1414         # remove the interface l2 input feature
1415         r = self.vapi.input_acl_set_interface(
1416             is_add=0, sw_if_index=self.pg3.sw_if_index, ip6_table_index=table_index
1417         )
1418         self.assertIsNotNone(r, "No response msg for input_acl_set_interface")
1419
1420         # log the ip6 inacl after cleaning
1421         self.logger.info(self.vapi.cli("show inacl type ip6"))
1422
1423         # log the localsid counters
1424         self.logger.info(self.vapi.cli("show sr localsid"))
1425
1426         # remove classifier SR steering
1427         # classifier_steering.remove_vpp_config()
1428         self.logger.info(self.vapi.cli("show sr steering policies"))
1429
1430         # remove SR Policies
1431         self.sr_policy.remove_vpp_config()
1432         self.logger.info(self.vapi.cli("show sr policies"))
1433
1434         # remove classify session and table
1435         r = self.vapi.classify_add_del_session(
1436             0, table_index, binascii.unhexlify(match)
1437         )
1438         self.assertIsNotNone(r, "No response msg for add_del_session")
1439
1440         r = self.vapi.classify_add_del_table(
1441             0, binascii.unhexlify(mask), table_index=table_index
1442         )
1443         self.assertIsNotNone(r, "No response msg for add_del_table")
1444
1445         self.logger.info(self.vapi.cli("show classify table"))
1446
1447         # remove FIB entries
1448         # done by tearDown
1449
1450         # cleanup interfaces
1451         self.teardown_interfaces()
1452
1453     def compare_rx_tx_packet_T_Encaps(self, tx_pkt, rx_pkt):
1454         """Compare input and output packet after passing T.Encaps
1455
1456         :param tx_pkt: transmitted packet
1457         :param rx_pkt: received packet
1458         """
1459         # T.Encaps updates the headers as follows:
1460         # SR Policy seglist (S3, S2, S1)
1461         # SR Policy source C
1462         # IPv6:
1463         # in: IPv6(A, B2)
1464         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(A, B2)
1465         # IPv6 + SRH:
1466         # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1)
1467         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(a, B2)SRH(B3, B2, B1; SL=1)
1468
1469         # get first (outer) IPv6 header of rx'ed packet
1470         rx_ip = rx_pkt.getlayer(IPv6)
1471         rx_srh = None
1472
1473         tx_ip = tx_pkt.getlayer(IPv6)
1474
1475         # expected segment-list
1476         seglist = self.sr_policy.segments
1477         # reverse list to get order as in SRH
1478         tx_seglist = seglist[::-1]
1479
1480         # get source address of SR Policy
1481         sr_policy_source = self.sr_policy.source
1482
1483         # rx'ed packet should have SRH
1484         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1485         # get SRH
1486         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1487
1488         # received ip.src should be equal to SR Policy source
1489         self.assertEqual(rx_ip.src, sr_policy_source)
1490         # received ip.dst should be equal to expected sidlist[lastentry]
1491         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1492         # rx'ed seglist should be equal to expected seglist
1493         self.assertEqual(rx_srh.addresses, tx_seglist)
1494         # segleft should be equal to size expected seglist-1
1495         self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
1496         # segleft should be equal to lastentry
1497         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1498
1499         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1500         # except for the hop-limit field
1501         #   -> update tx'ed hlim to the expected hlim
1502         tx_ip.hlim = tx_ip.hlim - 1
1503
1504         self.assertEqual(rx_srh.payload, tx_ip)
1505
1506         self.logger.debug("packet verification: SUCCESS")
1507
1508     def compare_rx_tx_packet_T_Encaps_IPv4(self, tx_pkt, rx_pkt):
1509         """Compare input and output packet after passing T.Encaps for IPv4
1510
1511         :param tx_pkt: transmitted packet
1512         :param rx_pkt: received packet
1513         """
1514         # T.Encaps for IPv4 updates the headers as follows:
1515         # SR Policy seglist (S3, S2, S1)
1516         # SR Policy source C
1517         # IPv4:
1518         # in: IPv4(A, B2)
1519         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv4(A, B2)
1520
1521         # get first (outer) IPv6 header of rx'ed packet
1522         rx_ip = rx_pkt.getlayer(IPv6)
1523         rx_srh = None
1524
1525         tx_ip = tx_pkt.getlayer(IP)
1526
1527         # expected segment-list
1528         seglist = self.sr_policy.segments
1529         # reverse list to get order as in SRH
1530         tx_seglist = seglist[::-1]
1531
1532         # get source address of SR Policy
1533         sr_policy_source = self.sr_policy.source
1534
1535         # checks common to cases tx with and without SRH
1536         # rx'ed packet should have SRH and IPv4 header
1537         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1538         self.assertTrue(rx_ip.payload.haslayer(IP))
1539         # get SRH
1540         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1541
1542         # received ip.src should be equal to SR Policy source
1543         self.assertEqual(rx_ip.src, sr_policy_source)
1544         # received ip.dst should be equal to sidlist[lastentry]
1545         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1546         # rx'ed seglist should be equal to seglist
1547         self.assertEqual(rx_srh.addresses, tx_seglist)
1548         # segleft should be equal to size seglist-1
1549         self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
1550         # segleft should be equal to lastentry
1551         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1552
1553         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1554         # except for the ttl field and ip checksum
1555         #   -> adjust tx'ed ttl to expected ttl
1556         tx_ip.ttl = tx_ip.ttl - 1
1557         #   -> set tx'ed ip checksum to None and let scapy recompute
1558         tx_ip.chksum = None
1559         # read back the pkt (with str()) to force computing these fields
1560         # probably other ways to accomplish this are possible
1561         tx_ip = IP(scapy.compat.raw(tx_ip))
1562
1563         self.assertEqual(rx_srh.payload, tx_ip)
1564
1565         self.logger.debug("packet verification: SUCCESS")
1566
1567     def compare_rx_tx_packet_T_Encaps_L2(self, tx_pkt, rx_pkt):
1568         """Compare input and output packet after passing T.Encaps for L2
1569
1570         :param tx_pkt: transmitted packet
1571         :param rx_pkt: received packet
1572         """
1573         # T.Encaps for L2 updates the headers as follows:
1574         # SR Policy seglist (S3, S2, S1)
1575         # SR Policy source C
1576         # L2:
1577         # in: L2
1578         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)L2
1579
1580         # get first (outer) IPv6 header of rx'ed packet
1581         rx_ip = rx_pkt.getlayer(IPv6)
1582         rx_srh = None
1583
1584         tx_ether = tx_pkt.getlayer(Ether)
1585
1586         # expected segment-list
1587         seglist = self.sr_policy.segments
1588         # reverse list to get order as in SRH
1589         tx_seglist = seglist[::-1]
1590
1591         # get source address of SR Policy
1592         sr_policy_source = self.sr_policy.source
1593
1594         # rx'ed packet should have SRH
1595         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1596         # get SRH
1597         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1598
1599         # received ip.src should be equal to SR Policy source
1600         self.assertEqual(rx_ip.src, sr_policy_source)
1601         # received ip.dst should be equal to sidlist[lastentry]
1602         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1603         # rx'ed seglist should be equal to seglist
1604         self.assertEqual(rx_srh.addresses, tx_seglist)
1605         # segleft should be equal to size seglist-1
1606         self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
1607         # segleft should be equal to lastentry
1608         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1609         # nh should be "No Next Header" (143)
1610         self.assertEqual(rx_srh.nh, 143)
1611
1612         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1613         self.assertEqual(Ether(scapy.compat.raw(rx_srh.payload)), tx_ether)
1614
1615         self.logger.debug("packet verification: SUCCESS")
1616
1617     def compare_rx_tx_packet_T_Insert(self, tx_pkt, rx_pkt):
1618         """Compare input and output packet after passing T.Insert
1619
1620         :param tx_pkt: transmitted packet
1621         :param rx_pkt: received packet
1622         """
1623         # T.Insert updates the headers as follows:
1624         # IPv6:
1625         # in: IPv6(A, B2)
1626         # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)
1627         # IPv6 + SRH:
1628         # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1)
1629         # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)SRH(B3, B2, B1; SL=1)
1630
1631         # get first (outer) IPv6 header of rx'ed packet
1632         rx_ip = rx_pkt.getlayer(IPv6)
1633         rx_srh = None
1634         rx_ip2 = None
1635         rx_srh2 = None
1636         rx_ip3 = None
1637         rx_udp = rx_pkt[UDP]
1638
1639         tx_ip = tx_pkt.getlayer(IPv6)
1640         tx_srh = None
1641         tx_ip2 = None
1642         # some packets have been tx'ed with an SRH, some without it
1643         # get SRH if tx'ed packet has it
1644         if tx_pkt.haslayer(IPv6ExtHdrSegmentRouting):
1645             tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1646             tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1647         tx_udp = tx_pkt[UDP]
1648
1649         # expected segment-list (make copy of SR Policy segment list)
1650         seglist = self.sr_policy.segments[:]
1651         # expected seglist has initial dest addr as last segment
1652         seglist.append(tx_ip.dst)
1653         # reverse list to get order as in SRH
1654         tx_seglist = seglist[::-1]
1655
1656         # get source address of SR Policy
1657         sr_policy_source = self.sr_policy.source
1658
1659         # checks common to cases tx with and without SRH
1660         # rx'ed packet should have SRH and only one IPv6 header
1661         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1662         self.assertFalse(rx_ip.payload.haslayer(IPv6))
1663         # get SRH
1664         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1665
1666         # rx'ed ip.src should be equal to tx'ed ip.src
1667         self.assertEqual(rx_ip.src, tx_ip.src)
1668         # rx'ed ip.dst should be equal to sidlist[lastentry]
1669         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1670
1671         # rx'ed seglist should be equal to expected seglist
1672         self.assertEqual(rx_srh.addresses, tx_seglist)
1673         # segleft should be equal to size(expected seglist)-1
1674         self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
1675         # segleft should be equal to lastentry
1676         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1677
1678         if tx_srh:  # packet was tx'ed with SRH
1679             # packet should have 2nd SRH
1680             self.assertTrue(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
1681             # get 2nd SRH
1682             rx_srh2 = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting, 2)
1683
1684             # rx'ed srh2.addresses should be equal to tx'ed srh.addresses
1685             self.assertEqual(rx_srh2.addresses, tx_srh.addresses)
1686             # rx'ed srh2.segleft should be equal to tx'ed srh.segleft
1687             self.assertEqual(rx_srh2.segleft, tx_srh.segleft)
1688             # rx'ed srh2.lastentry should be equal to tx'ed srh.lastentry
1689             self.assertEqual(rx_srh2.lastentry, tx_srh.lastentry)
1690
1691         else:  # packet was tx'ed without SRH
1692             # rx packet should have no other SRH
1693             self.assertFalse(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
1694
1695         # UDP layer should be unchanged
1696         self.assertEqual(rx_udp, tx_udp)
1697
1698         self.logger.debug("packet verification: SUCCESS")
1699
1700     def compare_rx_tx_packet_End(self, tx_pkt, rx_pkt):
1701         """Compare input and output packet after passing End (without PSP)
1702
1703         :param tx_pkt: transmitted packet
1704         :param rx_pkt: received packet
1705         """
1706         # End (no PSP) updates the headers as follows:
1707         # IPv6 + SRH:
1708         # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2)
1709         # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1710
1711         # get first (outer) IPv6 header of rx'ed packet
1712         rx_ip = rx_pkt.getlayer(IPv6)
1713         rx_srh = None
1714         rx_ip2 = None
1715         rx_udp = rx_pkt[UDP]
1716
1717         tx_ip = tx_pkt.getlayer(IPv6)
1718         # we know the packet has been tx'ed
1719         # with an inner IPv6 header and an SRH
1720         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1721         tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1722         tx_udp = tx_pkt[UDP]
1723
1724         # common checks, regardless of tx segleft value
1725         # rx'ed packet should have 2nd IPv6 header
1726         self.assertTrue(rx_ip.payload.haslayer(IPv6))
1727         # get second (inner) IPv6 header
1728         rx_ip2 = rx_pkt.getlayer(IPv6, 2)
1729
1730         if tx_ip.segleft > 0:
1731             # SRH should NOT have been popped:
1732             #   End SID without PSP does not pop SRH if segleft>0
1733             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1734             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1735
1736             # received ip.src should be equal to expected ip.src
1737             self.assertEqual(rx_ip.src, tx_ip.src)
1738             # sidlist should be unchanged
1739             self.assertEqual(rx_srh.addresses, tx_srh.addresses)
1740             # segleft should have been decremented
1741             self.assertEqual(rx_srh.segleft, tx_srh.segleft - 1)
1742             # received ip.dst should be equal to sidlist[segleft]
1743             self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
1744             # lastentry should be unchanged
1745             self.assertEqual(rx_srh.lastentry, tx_srh.lastentry)
1746             # inner IPv6 packet (ip2) should be unchanged
1747             self.assertEqual(rx_ip2.src, tx_ip2.src)
1748             self.assertEqual(rx_ip2.dst, tx_ip2.dst)
1749         # else:  # tx_ip.segleft == 0
1750         # TODO: Does this work with 2 SRHs in ingress packet?
1751
1752         # UDP layer should be unchanged
1753         self.assertEqual(rx_udp, tx_udp)
1754
1755         self.logger.debug("packet verification: SUCCESS")
1756
1757     def compare_rx_tx_packet_End_PSP(self, tx_pkt, rx_pkt):
1758         """Compare input and output packet after passing End with PSP
1759
1760         :param tx_pkt: transmitted packet
1761         :param rx_pkt: received packet
1762         """
1763         # End (PSP) updates the headers as follows:
1764         # IPv6 + SRH (SL>1):
1765         # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2)
1766         # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1767         # IPv6 + SRH (SL=1):
1768         # in: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1769         # out: IPv6(A, S3)
1770
1771         # get first (outer) IPv6 header of rx'ed packet
1772         rx_ip = rx_pkt.getlayer(IPv6)
1773         rx_srh = None
1774         rx_ip2 = None
1775         rx_udp = rx_pkt[UDP]
1776
1777         tx_ip = tx_pkt.getlayer(IPv6)
1778         # we know the packet has been tx'ed
1779         # with an inner IPv6 header and an SRH
1780         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1781         tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1782         tx_udp = tx_pkt[UDP]
1783
1784         # common checks, regardless of tx segleft value
1785         self.assertTrue(rx_ip.payload.haslayer(IPv6))
1786         rx_ip2 = rx_pkt.getlayer(IPv6, 2)
1787         # inner IPv6 packet (ip2) should be unchanged
1788         self.assertEqual(rx_ip2.src, tx_ip2.src)
1789         self.assertEqual(rx_ip2.dst, tx_ip2.dst)
1790
1791         if tx_ip.segleft > 1:
1792             # SRH should NOT have been popped:
1793             #   End SID with PSP does not pop SRH if segleft>1
1794             # rx'ed packet should have SRH
1795             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1796             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1797
1798             # received ip.src should be equal to expected ip.src
1799             self.assertEqual(rx_ip.src, tx_ip.src)
1800             # sidlist should be unchanged
1801             self.assertEqual(rx_srh.addresses, tx_srh.addresses)
1802             # segleft should have been decremented
1803             self.assertEqual(rx_srh.segleft, tx_srh.segleft - 1)
1804             # received ip.dst should be equal to sidlist[segleft]
1805             self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
1806             # lastentry should be unchanged
1807             self.assertEqual(rx_srh.lastentry, tx_srh.lastentry)
1808
1809         else:  # tx_ip.segleft <= 1
1810             # SRH should have been popped:
1811             #   End SID with PSP and segleft=1 pops SRH
1812             # the two IPv6 headers are still present
1813             # outer IPv6 header has DA == last segment of popped SRH
1814             # SRH should not be present
1815             self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1816             # outer IPv6 header ip.src should be equal to tx'ed ip.src
1817             self.assertEqual(rx_ip.src, tx_ip.src)
1818             # outer IPv6 header ip.dst should be = to tx'ed sidlist[segleft-1]
1819             self.assertEqual(rx_ip.dst, tx_srh.addresses[tx_srh.segleft - 1])
1820
1821         # UDP layer should be unchanged
1822         self.assertEqual(rx_udp, tx_udp)
1823
1824         self.logger.debug("packet verification: SUCCESS")
1825
1826     def compare_rx_tx_packet_End_DX6(self, tx_pkt, rx_pkt):
1827         """Compare input and output packet after passing End.DX6
1828
1829         :param tx_pkt: transmitted packet
1830         :param rx_pkt: received packet
1831         """
1832         # End.DX6 updates the headers as follows:
1833         # IPv6 + SRH (SL=0):
1834         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv6(B, D)
1835         # out: IPv6(B, D)
1836         # IPv6:
1837         # in: IPv6(A, S3)IPv6(B, D)
1838         # out: IPv6(B, D)
1839
1840         # get first (outer) IPv6 header of rx'ed packet
1841         rx_ip = rx_pkt.getlayer(IPv6)
1842
1843         tx_ip = tx_pkt.getlayer(IPv6)
1844         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1845
1846         # verify if rx'ed packet has no SRH
1847         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1848
1849         # the whole rx_ip pkt should be equal to tx_ip2
1850         # except for the hlim field
1851         #   -> adjust tx'ed hlim to expected hlim
1852         tx_ip2.hlim = tx_ip2.hlim - 1
1853
1854         self.assertEqual(rx_ip, tx_ip2)
1855
1856         self.logger.debug("packet verification: SUCCESS")
1857
1858     def compare_rx_tx_packet_End_DX4(self, tx_pkt, rx_pkt):
1859         """Compare input and output packet after passing End.DX4
1860
1861         :param tx_pkt: transmitted packet
1862         :param rx_pkt: received packet
1863         """
1864         # End.DX4 updates the headers as follows:
1865         # IPv6 + SRH (SL=0):
1866         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv4(B, D)
1867         # out: IPv4(B, D)
1868         # IPv6:
1869         # in: IPv6(A, S3)IPv4(B, D)
1870         # out: IPv4(B, D)
1871
1872         # get IPv4 header of rx'ed packet
1873         rx_ip = rx_pkt.getlayer(IP)
1874
1875         tx_ip = tx_pkt.getlayer(IPv6)
1876         tx_ip2 = tx_pkt.getlayer(IP)
1877
1878         # verify if rx'ed packet has no SRH
1879         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1880
1881         # the whole rx_ip pkt should be equal to tx_ip2
1882         # except for the ttl field and ip checksum
1883         #   -> adjust tx'ed ttl to expected ttl
1884         tx_ip2.ttl = tx_ip2.ttl - 1
1885         #   -> set tx'ed ip checksum to None and let scapy recompute
1886         tx_ip2.chksum = None
1887         # read back the pkt (with str()) to force computing these fields
1888         # probably other ways to accomplish this are possible
1889         tx_ip2 = IP(scapy.compat.raw(tx_ip2))
1890
1891         self.assertEqual(rx_ip, tx_ip2)
1892
1893         self.logger.debug("packet verification: SUCCESS")
1894
1895     def compare_rx_tx_packet_End_DX2(self, tx_pkt, rx_pkt):
1896         """Compare input and output packet after passing End.DX2
1897
1898         :param tx_pkt: transmitted packet
1899         :param rx_pkt: received packet
1900         """
1901         # End.DX2 updates the headers as follows:
1902         # IPv6 + SRH (SL=0):
1903         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)L2
1904         # out: L2
1905         # IPv6:
1906         # in: IPv6(A, S3)L2
1907         # out: L2
1908
1909         # get IPv4 header of rx'ed packet
1910         rx_eth = rx_pkt.getlayer(Ether)
1911
1912         tx_ip = tx_pkt.getlayer(IPv6)
1913         # we can't just get the 2nd Ether layer
1914         # get the Raw content and dissect it as Ether
1915         tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw]))
1916
1917         # verify if rx'ed packet has no SRH
1918         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1919
1920         # the whole rx_eth pkt should be equal to tx_eth1
1921         self.assertEqual(rx_eth, tx_eth1)
1922
1923         self.logger.debug("packet verification: SUCCESS")
1924
1925     def create_stream(self, src_if, dst_if, packet_header, packet_sizes, count):
1926         """Create SRv6 input packet stream for defined interface.
1927
1928         :param VppInterface src_if: Interface to create packet stream for
1929         :param VppInterface dst_if: destination interface of packet stream
1930         :param packet_header: Layer3 scapy packet headers,
1931                 L2 is added when not provided,
1932                 Raw(payload) with packet_info is added
1933         :param list packet_sizes: packet stream pckt sizes,sequentially applied
1934                to packets in stream have
1935         :param int count: number of packets in packet stream
1936         :return: list of packets
1937         """
1938         self.logger.info("Creating packets")
1939         pkts = []
1940         for i in range(0, count - 1):
1941             payload_info = self.create_packet_info(src_if, dst_if)
1942             self.logger.debug("Creating packet with index %d" % (payload_info.index))
1943             payload = self.info_to_payload(payload_info)
1944             # add L2 header if not yet provided in packet_header
1945             if packet_header.getlayer(0).name == "Ethernet":
1946                 p = packet_header / Raw(payload)
1947             else:
1948                 p = (
1949                     Ether(dst=src_if.local_mac, src=src_if.remote_mac)
1950                     / packet_header
1951                     / Raw(payload)
1952                 )
1953             size = packet_sizes[i % len(packet_sizes)]
1954             self.logger.debug("Packet size %d" % (size))
1955             self.extend_packet(p, size)
1956             # we need to store the packet with the automatic fields computed
1957             # read back the dumped packet (with str())
1958             # to force computing these fields
1959             # probably other ways are possible
1960             p = Ether(scapy.compat.raw(p))
1961             payload_info.data = p.copy()
1962             self.logger.debug(ppp("Created packet:", p))
1963             pkts.append(p)
1964         self.logger.info("Done creating packets")
1965         return pkts
1966
1967     def send_and_verify_pkts(
1968         self, input, pkts, output, compare_func, expected_count=None
1969     ):
1970         """Send packets and verify received packets using compare_func
1971
1972         :param input: ingress interface of DUT
1973         :param pkts: list of packets to transmit
1974         :param output: egress interface of DUT
1975         :param compare_func: function to compare in and out packets
1976         :param expected_count: expected number of captured packets (if
1977                different than len(pkts))
1978         """
1979         # add traffic stream to input interface
1980         input.add_stream(pkts)
1981
1982         # enable capture on all interfaces
1983         self.pg_enable_capture(self.pg_interfaces)
1984
1985         # start traffic
1986         self.logger.info("Starting traffic")
1987         self.pg_start()
1988
1989         # get output capture
1990         self.logger.info("Getting packet capture")
1991         capture = output.get_capture(expected_count=expected_count)
1992
1993         # assert nothing was captured on input interface
1994         input.assert_nothing_captured()
1995
1996         # verify captured packets
1997         self.verify_captured_pkts(output, capture, compare_func)
1998
1999     def create_packet_header_IPv6(self, dst):
2000         """Create packet header: IPv6 header, UDP header
2001
2002         :param dst: IPv6 destination address
2003
2004         IPv6 source address is 1234::1
2005         UDP source port and destination port are 1234
2006         """
2007
2008         p = IPv6(src="1234::1", dst=dst) / UDP(sport=1234, dport=1234)
2009         return p
2010
2011     def create_packet_header_IPv6_SRH(self, sidlist, segleft):
2012         """Create packet header: IPv6 header with SRH, UDP header
2013
2014         :param list sidlist: segment list
2015         :param int segleft: segments-left field value
2016
2017         IPv6 destination address is set to sidlist[segleft]
2018         IPv6 source addresses are 1234::1 and 4321::1
2019         UDP source port and destination port are 1234
2020         """
2021
2022         p = (
2023             IPv6(src="1234::1", dst=sidlist[segleft])
2024             / IPv6ExtHdrSegmentRouting(addresses=sidlist)
2025             / UDP(sport=1234, dport=1234)
2026         )
2027         return p
2028
2029     def create_packet_header_IPv6_SRH_IPv6(self, dst, sidlist, segleft):
2030         """Create packet header: IPv6 encapsulated in SRv6:
2031         IPv6 header with SRH, IPv6 header, UDP header
2032
2033         :param ipv6address dst: inner IPv6 destination address
2034         :param list sidlist: segment list of outer IPv6 SRH
2035         :param int segleft: segments-left field of outer IPv6 SRH
2036
2037         Outer IPv6 destination address is set to sidlist[segleft]
2038         IPv6 source addresses are 1234::1 and 4321::1
2039         UDP source port and destination port are 1234
2040         """
2041
2042         p = (
2043             IPv6(src="1234::1", dst=sidlist[segleft])
2044             / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=41)
2045             / IPv6(src="4321::1", dst=dst)
2046             / UDP(sport=1234, dport=1234)
2047         )
2048         return p
2049
2050     def create_packet_header_IPv6_IPv6(self, dst_inner, dst_outer):
2051         """Create packet header: IPv6 encapsulated in IPv6:
2052         IPv6 header, IPv6 header, UDP header
2053
2054         :param ipv6address dst_inner: inner IPv6 destination address
2055         :param ipv6address dst_outer: outer IPv6 destination address
2056
2057         IPv6 source addresses are 1234::1 and 4321::1
2058         UDP source port and destination port are 1234
2059         """
2060
2061         p = (
2062             IPv6(src="1234::1", dst=dst_outer)
2063             / IPv6(src="4321::1", dst=dst_inner)
2064             / UDP(sport=1234, dport=1234)
2065         )
2066         return p
2067
2068     def create_packet_header_IPv6_SRH_SRH_IPv6(
2069         self, dst, sidlist1, segleft1, sidlist2, segleft2
2070     ):
2071         """Create packet header: IPv6 encapsulated in SRv6 with 2 SRH:
2072         IPv6 header with SRH, 2nd SRH, IPv6 header, UDP header
2073
2074         :param ipv6address dst: inner IPv6 destination address
2075         :param list sidlist1: segment list of outer IPv6 SRH
2076         :param int segleft1: segments-left field of outer IPv6 SRH
2077         :param list sidlist2: segment list of inner IPv6 SRH
2078         :param int segleft2: segments-left field of inner IPv6 SRH
2079
2080         Outer IPv6 destination address is set to sidlist[segleft]
2081         IPv6 source addresses are 1234::1 and 4321::1
2082         UDP source port and destination port are 1234
2083         """
2084
2085         p = (
2086             IPv6(src="1234::1", dst=sidlist1[segleft1])
2087             / IPv6ExtHdrSegmentRouting(addresses=sidlist1, segleft=segleft1, nh=43)
2088             / IPv6ExtHdrSegmentRouting(addresses=sidlist2, segleft=segleft2, nh=41)
2089             / IPv6(src="4321::1", dst=dst)
2090             / UDP(sport=1234, dport=1234)
2091         )
2092         return p
2093
2094     def create_packet_header_IPv4(self, dst):
2095         """Create packet header: IPv4 header, UDP header
2096
2097         :param dst: IPv4 destination address
2098
2099         IPv4 source address is 123.1.1.1
2100         UDP source port and destination port are 1234
2101         """
2102
2103         p = IP(src="123.1.1.1", dst=dst) / UDP(sport=1234, dport=1234)
2104         return p
2105
2106     def create_packet_header_IPv6_IPv4(self, dst_inner, dst_outer):
2107         """Create packet header: IPv4 encapsulated in IPv6:
2108         IPv6 header, IPv4 header, UDP header
2109
2110         :param ipv4address dst_inner: inner IPv4 destination address
2111         :param ipv6address dst_outer: outer IPv6 destination address
2112
2113         IPv6 source address is 1234::1
2114         IPv4 source address is 123.1.1.1
2115         UDP source port and destination port are 1234
2116         """
2117
2118         p = (
2119             IPv6(src="1234::1", dst=dst_outer)
2120             / IP(src="123.1.1.1", dst=dst_inner)
2121             / UDP(sport=1234, dport=1234)
2122         )
2123         return p
2124
2125     def create_packet_header_IPv6_SRH_IPv4(self, dst, sidlist, segleft):
2126         """Create packet header: IPv4 encapsulated in SRv6:
2127         IPv6 header with SRH, IPv4 header, UDP header
2128
2129         :param ipv4address dst: inner IPv4 destination address
2130         :param list sidlist: segment list of outer IPv6 SRH
2131         :param int segleft: segments-left field of outer IPv6 SRH
2132
2133         Outer IPv6 destination address is set to sidlist[segleft]
2134         IPv6 source address is 1234::1
2135         IPv4 source address is 123.1.1.1
2136         UDP source port and destination port are 1234
2137         """
2138
2139         p = (
2140             IPv6(src="1234::1", dst=sidlist[segleft])
2141             / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=4)
2142             / IP(src="123.1.1.1", dst=dst)
2143             / UDP(sport=1234, dport=1234)
2144         )
2145         return p
2146
2147     def create_packet_header_L2(self, vlan=0):
2148         """Create packet header: L2 header
2149
2150         :param vlan: if vlan!=0 then add 802.1q header
2151         """
2152         # Note: the dst addr ('00:55:44:33:22:11') is used in
2153         # the compare function compare_rx_tx_packet_T_Encaps_L2
2154         # to detect presence of L2 in SRH payload
2155         p = Ether(src="00:11:22:33:44:55", dst="00:55:44:33:22:11")
2156         etype = 0x8137  # IPX
2157         if vlan:
2158             # add 802.1q layer
2159             p /= Dot1Q(vlan=vlan, type=etype)
2160         else:
2161             p.type = etype
2162         return p
2163
2164     def create_packet_header_IPv6_SRH_L2(self, sidlist, segleft, vlan=0):
2165         """Create packet header: L2 encapsulated in SRv6:
2166         IPv6 header with SRH, L2
2167
2168         :param list sidlist: segment list of outer IPv6 SRH
2169         :param int segleft: segments-left field of outer IPv6 SRH
2170         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
2171
2172         Outer IPv6 destination address is set to sidlist[segleft]
2173         IPv6 source address is 1234::1
2174         """
2175         eth = Ether(src="00:11:22:33:44:55", dst="00:55:44:33:22:11")
2176         etype = 0x8137  # IPX
2177         if vlan:
2178             # add 802.1q layer
2179             eth /= Dot1Q(vlan=vlan, type=etype)
2180         else:
2181             eth.type = etype
2182
2183         p = (
2184             IPv6(src="1234::1", dst=sidlist[segleft])
2185             / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=143)
2186             / eth
2187         )
2188         return p
2189
2190     def create_packet_header_IPv6_L2(self, dst_outer, vlan=0):
2191         """Create packet header: L2 encapsulated in IPv6:
2192         IPv6 header, L2
2193
2194         :param ipv6address dst_outer: outer IPv6 destination address
2195         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
2196         """
2197         eth = Ether(src="00:11:22:33:44:55", dst="00:55:44:33:22:11")
2198         etype = 0x8137  # IPX
2199         if vlan:
2200             # add 802.1q layer
2201             eth /= Dot1Q(vlan=vlan, type=etype)
2202         else:
2203             eth.type = etype
2204
2205         p = IPv6(src="1234::1", dst=dst_outer, nh=143) / eth
2206         return p
2207
2208     def get_payload_info(self, packet):
2209         """Extract the payload_info from the packet"""
2210         # in most cases, payload_info is in packet[Raw]
2211         # but packet[Raw] gives the complete payload
2212         # (incl L2 header) for the T.Encaps L2 case
2213         try:
2214             payload_info = self.payload_to_info(packet[Raw])
2215
2216         except:
2217             # remote L2 header from packet[Raw]:
2218             # take packet[Raw], convert it to an Ether layer
2219             # and then extract Raw from it
2220             payload_info = self.payload_to_info(Ether(scapy.compat.r(packet[Raw]))[Raw])
2221
2222         return payload_info
2223
2224     def verify_captured_pkts(self, dst_if, capture, compare_func):
2225         """
2226         Verify captured packet stream for specified interface.
2227         Compare ingress with egress packets using the specified compare fn
2228
2229         :param dst_if: egress interface of DUT
2230         :param capture: captured packets
2231         :param compare_func: function to compare in and out packet
2232         """
2233         self.logger.info(
2234             "Verifying capture on interface %s using function %s"
2235             % (dst_if.name, compare_func.__name__)
2236         )
2237
2238         last_info = dict()
2239         for i in self.pg_interfaces:
2240             last_info[i.sw_if_index] = None
2241         dst_sw_if_index = dst_if.sw_if_index
2242
2243         for packet in capture:
2244             try:
2245                 # extract payload_info from packet's payload
2246                 payload_info = self.get_payload_info(packet)
2247                 packet_index = payload_info.index
2248
2249                 self.logger.debug("Verifying packet with index %d" % (packet_index))
2250                 # packet should have arrived on the expected interface
2251                 self.assertEqual(payload_info.dst, dst_sw_if_index)
2252                 self.logger.debug(
2253                     "Got packet on interface %s: src=%u (idx=%u)"
2254                     % (dst_if.name, payload_info.src, packet_index)
2255                 )
2256
2257                 # search for payload_info with same src and dst if_index
2258                 # this will give us the transmitted packet
2259                 next_info = self.get_next_packet_info_for_interface2(
2260                     payload_info.src, dst_sw_if_index, last_info[payload_info.src]
2261                 )
2262                 last_info[payload_info.src] = next_info
2263                 # next_info should not be None
2264                 self.assertTrue(next_info is not None)
2265                 # index of tx and rx packets should be equal
2266                 self.assertEqual(packet_index, next_info.index)
2267                 # data field of next_info contains the tx packet
2268                 txed_packet = next_info.data
2269
2270                 self.logger.debug(
2271                     ppp("Transmitted packet:", txed_packet)
2272                 )  # ppp=Pretty Print Packet
2273
2274                 self.logger.debug(ppp("Received packet:", packet))
2275
2276                 # compare rcvd packet with expected packet using compare_func
2277                 compare_func(txed_packet, packet)
2278
2279             except:
2280                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2281                 raise
2282
2283         # FIXME: there is no need to check manually that all the packets
2284         #        arrived (already done so by get_capture); checking here
2285         #        prevents testing packets that are expected to be dropped, so
2286         #        commenting this out for now
2287
2288         # have all expected packets arrived?
2289         # for i in self.pg_interfaces:
2290         #    remaining_packet = self.get_next_packet_info_for_interface2(
2291         #        i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
2292         #    self.assertTrue(remaining_packet is None,
2293         #                    "Interface %s: Packet expected from interface %s "
2294         #                    "didn't arrive" % (dst_if.name, i.name))
2295
2296
2297 if __name__ == "__main__":
2298     unittest.main(testRunner=VppTestRunner)