Typos. A bunch of typos I've been collecting.
[vpp.git] / test / test_srv6.py
1 #!/usr/bin/env python
2
3 import unittest
4 import binascii
5 from socket import AF_INET6
6
7 from framework import VppTestCase, VppTestRunner
8 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto, VppIpTable
9 from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
10     SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
11
12 import scapy.compat
13 from scapy.packet import Raw
14 from scapy.layers.l2 import Ether, Dot1Q
15 from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
16 from scapy.layers.inet import IP, UDP
17
18 from scapy.utils import inet_pton, inet_ntop
19
20 from util import ppp
21
22
23 class TestSRv6(VppTestCase):
24     """ SRv6 Test Case """
25
26     @classmethod
27     def setUpClass(self):
28         super(TestSRv6, self).setUpClass()
29
30     def setUp(self):
31         """ Perform test setup before each test case.
32         """
33         super(TestSRv6, self).setUp()
34
35         # packet sizes, inclusive L2 overhead
36         self.pg_packet_sizes = [64, 512, 1518, 9018]
37
38         # reset packet_infos
39         self.reset_packet_infos()
40
41     def tearDown(self):
42         """ Clean up test setup after each test case.
43         """
44         self.teardown_interfaces()
45
46         super(TestSRv6, self).tearDown()
47
48     def configure_interface(self,
49                             interface,
50                             ipv6=False, ipv4=False,
51                             ipv6_table_id=0, ipv4_table_id=0):
52         """ Configure interface.
53         :param ipv6: configure IPv6 on interface
54         :param ipv4: configure IPv4 on interface
55         :param ipv6_table_id: FIB table_id for IPv6
56         :param ipv4_table_id: FIB table_id for IPv4
57         """
58         self.logger.debug("Configuring interface %s" % (interface.name))
59         if ipv6:
60             self.logger.debug("Configuring IPv6")
61             interface.set_table_ip6(ipv6_table_id)
62             interface.config_ip6()
63             interface.resolve_ndp(timeout=5)
64         if ipv4:
65             self.logger.debug("Configuring IPv4")
66             interface.set_table_ip4(ipv4_table_id)
67             interface.config_ip4()
68             interface.resolve_arp()
69         interface.admin_up()
70
71     def setup_interfaces(self, ipv6=[], ipv4=[],
72                          ipv6_table_id=[], ipv4_table_id=[]):
73         """ Create and configure interfaces.
74
75         :param ipv6: list of interface IPv6 capabilities
76         :param ipv4: list of interface IPv4 capabilities
77         :param ipv6_table_id: list of intf IPv6 FIB table_ids
78         :param ipv4_table_id: list of intf IPv4 FIB table_ids
79         :returns: List of created interfaces.
80         """
81         # how many interfaces?
82         if len(ipv6):
83             count = len(ipv6)
84         else:
85             count = len(ipv4)
86         self.logger.debug("Creating and configuring %d interfaces" % (count))
87
88         # fill up ipv6 and ipv4 lists if needed
89         # not enabled (False) is the default
90         if len(ipv6) < count:
91             ipv6 += (count - len(ipv6)) * [False]
92         if len(ipv4) < count:
93             ipv4 += (count - len(ipv4)) * [False]
94
95         # fill up table_id lists if needed
96         # table_id 0 (global) is the default
97         if len(ipv6_table_id) < count:
98             ipv6_table_id += (count - len(ipv6_table_id)) * [0]
99         if len(ipv4_table_id) < count:
100             ipv4_table_id += (count - len(ipv4_table_id)) * [0]
101
102         # create 'count' pg interfaces
103         self.create_pg_interfaces(range(count))
104
105         # setup all interfaces
106         for i in range(count):
107             intf = self.pg_interfaces[i]
108             self.configure_interface(intf,
109                                      ipv6[i], ipv4[i],
110                                      ipv6_table_id[i], ipv4_table_id[i])
111
112         if any(ipv6):
113             self.logger.debug(self.vapi.cli("show ip6 neighbors"))
114         if any(ipv4):
115             self.logger.debug(self.vapi.cli("show ip arp"))
116         self.logger.debug(self.vapi.cli("show interface"))
117         self.logger.debug(self.vapi.cli("show hardware"))
118
119         return self.pg_interfaces
120
121     def teardown_interfaces(self):
122         """ Unconfigure and bring down interface.
123         """
124         self.logger.debug("Tearing down interfaces")
125         # tear down all interfaces
126         # AFAIK they cannot be deleted
127         for i in self.pg_interfaces:
128             self.logger.debug("Tear down interface %s" % (i.name))
129             i.admin_down()
130             i.unconfig()
131             i.set_table_ip4(0)
132             i.set_table_ip6(0)
133
134     @unittest.skipUnless(0, "PC to fix")
135     def test_SRv6_T_Encaps(self):
136         """ Test SRv6 Transit.Encaps behavior for IPv6.
137         """
138         # send traffic to one destination interface
139         # source and destination are IPv6 only
140         self.setup_interfaces(ipv6=[True, True])
141
142         # configure FIB entries
143         route = VppIpRoute(self, "a4::", 64,
144                            [VppRoutePath(self.pg1.remote_ip6,
145                                          self.pg1.sw_if_index,
146                                          proto=DpoProto.DPO_PROTO_IP6)],
147                            is_ip6=1)
148         route.add_vpp_config()
149
150         # configure encaps IPv6 source address
151         # needs to be done before SR Policy config
152         # TODO: API?
153         self.vapi.cli("set sr encaps source addr a3::")
154
155         bsid = 'a3::9999:1'
156         # configure SRv6 Policy
157         # Note: segment list order: first -> last
158         sr_policy = VppSRv6Policy(
159             self, bsid=bsid,
160             is_encap=1,
161             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
162             weight=1, fib_table=0,
163             segments=['a4::', 'a5::', 'a6::c7'],
164             source='a3::')
165         sr_policy.add_vpp_config()
166         self.sr_policy = sr_policy
167
168         # log the sr policies
169         self.logger.info(self.vapi.cli("show sr policies"))
170
171         # steer IPv6 traffic to a7::/64 into SRv6 Policy
172         # use the bsid of the above self.sr_policy
173         pol_steering = VppSRv6Steering(
174                         self,
175                         bsid=self.sr_policy.bsid,
176                         prefix="a7::", mask_width=64,
177                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
178                         sr_policy_index=0, table_id=0,
179                         sw_if_index=0)
180         pol_steering.add_vpp_config()
181
182         # log the sr steering policies
183         self.logger.info(self.vapi.cli("show sr steering policies"))
184
185         # create packets
186         count = len(self.pg_packet_sizes)
187         dst_inner = 'a7::1234'
188         pkts = []
189
190         # create IPv6 packets without SRH
191         packet_header = self.create_packet_header_IPv6(dst_inner)
192         # create traffic stream pg0->pg1
193         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
194                                        self.pg_packet_sizes, count))
195
196         # create IPv6 packets with SRH
197         # packets with segments-left 1, active segment a7::
198         packet_header = self.create_packet_header_IPv6_SRH(
199             sidlist=['a8::', 'a7::', 'a6::'],
200             segleft=1)
201         # create traffic stream pg0->pg1
202         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
203                                        self.pg_packet_sizes, count))
204
205         # create IPv6 packets with SRH and IPv6
206         # packets with segments-left 1, active segment a7::
207         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
208             dst_inner,
209             sidlist=['a8::', 'a7::', 'a6::'],
210             segleft=1)
211         # create traffic stream pg0->pg1
212         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
213                                        self.pg_packet_sizes, count))
214
215         # send packets and verify received packets
216         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
217                                   self.compare_rx_tx_packet_T_Encaps)
218
219         # log the localsid counters
220         self.logger.info(self.vapi.cli("show sr localsid"))
221
222         # remove SR steering
223         pol_steering.remove_vpp_config()
224         self.logger.info(self.vapi.cli("show sr steering policies"))
225
226         # remove SR Policies
227         self.sr_policy.remove_vpp_config()
228         self.logger.info(self.vapi.cli("show sr policies"))
229
230         # remove FIB entries
231         # done by tearDown
232
233         # cleanup interfaces
234         self.teardown_interfaces()
235
236     @unittest.skipUnless(0, "PC to fix")
237     def test_SRv6_T_Insert(self):
238         """ Test SRv6 Transit.Insert behavior (IPv6 only).
239         """
240         # send traffic to one destination interface
241         # source and destination are IPv6 only
242         self.setup_interfaces(ipv6=[True, True])
243
244         # configure FIB entries
245         route = VppIpRoute(self, "a4::", 64,
246                            [VppRoutePath(self.pg1.remote_ip6,
247                                          self.pg1.sw_if_index,
248                                          proto=DpoProto.DPO_PROTO_IP6)],
249                            is_ip6=1)
250         route.add_vpp_config()
251
252         # configure encaps IPv6 source address
253         # needs to be done before SR Policy config
254         # TODO: API?
255         self.vapi.cli("set sr encaps source addr a3::")
256
257         bsid = 'a3::9999:1'
258         # configure SRv6 Policy
259         # Note: segment list order: first -> last
260         sr_policy = VppSRv6Policy(
261             self, bsid=bsid,
262             is_encap=0,
263             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
264             weight=1, fib_table=0,
265             segments=['a4::', 'a5::', 'a6::c7'],
266             source='a3::')
267         sr_policy.add_vpp_config()
268         self.sr_policy = sr_policy
269
270         # log the sr policies
271         self.logger.info(self.vapi.cli("show sr policies"))
272
273         # steer IPv6 traffic to a7::/64 into SRv6 Policy
274         # use the bsid of the above self.sr_policy
275         pol_steering = VppSRv6Steering(
276                         self,
277                         bsid=self.sr_policy.bsid,
278                         prefix="a7::", mask_width=64,
279                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
280                         sr_policy_index=0, table_id=0,
281                         sw_if_index=0)
282         pol_steering.add_vpp_config()
283
284         # log the sr steering policies
285         self.logger.info(self.vapi.cli("show sr steering policies"))
286
287         # create packets
288         count = len(self.pg_packet_sizes)
289         dst_inner = 'a7::1234'
290         pkts = []
291
292         # create IPv6 packets without SRH
293         packet_header = self.create_packet_header_IPv6(dst_inner)
294         # create traffic stream pg0->pg1
295         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
296                                        self.pg_packet_sizes, count))
297
298         # create IPv6 packets with SRH
299         # packets with segments-left 1, active segment a7::
300         packet_header = self.create_packet_header_IPv6_SRH(
301             sidlist=['a8::', 'a7::', 'a6::'],
302             segleft=1)
303         # create traffic stream pg0->pg1
304         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
305                                        self.pg_packet_sizes, count))
306
307         # send packets and verify received packets
308         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
309                                   self.compare_rx_tx_packet_T_Insert)
310
311         # log the localsid counters
312         self.logger.info(self.vapi.cli("show sr localsid"))
313
314         # remove SR steering
315         pol_steering.remove_vpp_config()
316         self.logger.info(self.vapi.cli("show sr steering policies"))
317
318         # remove SR Policies
319         self.sr_policy.remove_vpp_config()
320         self.logger.info(self.vapi.cli("show sr policies"))
321
322         # remove FIB entries
323         # done by tearDown
324
325         # cleanup interfaces
326         self.teardown_interfaces()
327
328     @unittest.skipUnless(0, "PC to fix")
329     def test_SRv6_T_Encaps_IPv4(self):
330         """ Test SRv6 Transit.Encaps behavior for IPv4.
331         """
332         # send traffic to one destination interface
333         # source interface is IPv4 only
334         # destination interface is IPv6 only
335         self.setup_interfaces(ipv6=[False, True], ipv4=[True, False])
336
337         # configure FIB entries
338         route = VppIpRoute(self, "a4::", 64,
339                            [VppRoutePath(self.pg1.remote_ip6,
340                                          self.pg1.sw_if_index,
341                                          proto=DpoProto.DPO_PROTO_IP6)],
342                            is_ip6=1)
343         route.add_vpp_config()
344
345         # configure encaps IPv6 source address
346         # needs to be done before SR Policy config
347         # TODO: API?
348         self.vapi.cli("set sr encaps source addr a3::")
349
350         bsid = 'a3::9999:1'
351         # configure SRv6 Policy
352         # Note: segment list order: first -> last
353         sr_policy = VppSRv6Policy(
354             self, bsid=bsid,
355             is_encap=1,
356             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
357             weight=1, fib_table=0,
358             segments=['a4::', 'a5::', 'a6::c7'],
359             source='a3::')
360         sr_policy.add_vpp_config()
361         self.sr_policy = sr_policy
362
363         # log the sr policies
364         self.logger.info(self.vapi.cli("show sr policies"))
365
366         # steer IPv4 traffic to 7.1.1.0/24 into SRv6 Policy
367         # use the bsid of the above self.sr_policy
368         pol_steering = VppSRv6Steering(
369                         self,
370                         bsid=self.sr_policy.bsid,
371                         prefix="7.1.1.0", mask_width=24,
372                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV4,
373                         sr_policy_index=0, table_id=0,
374                         sw_if_index=0)
375         pol_steering.add_vpp_config()
376
377         # log the sr steering policies
378         self.logger.info(self.vapi.cli("show sr steering policies"))
379
380         # create packets
381         count = len(self.pg_packet_sizes)
382         dst_inner = '7.1.1.123'
383         pkts = []
384
385         # create IPv4 packets
386         packet_header = self.create_packet_header_IPv4(dst_inner)
387         # create traffic stream pg0->pg1
388         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
389                                        self.pg_packet_sizes, count))
390
391         # send packets and verify received packets
392         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
393                                   self.compare_rx_tx_packet_T_Encaps_IPv4)
394
395         # log the localsid counters
396         self.logger.info(self.vapi.cli("show sr localsid"))
397
398         # remove SR steering
399         pol_steering.remove_vpp_config()
400         self.logger.info(self.vapi.cli("show sr steering policies"))
401
402         # remove SR Policies
403         self.sr_policy.remove_vpp_config()
404         self.logger.info(self.vapi.cli("show sr policies"))
405
406         # remove FIB entries
407         # done by tearDown
408
409         # cleanup interfaces
410         self.teardown_interfaces()
411
412     @unittest.skip("VPP crashes after running this test")
413     def test_SRv6_T_Encaps_L2(self):
414         """ Test SRv6 Transit.Encaps behavior for L2.
415         """
416         # send traffic to one destination interface
417         # source interface is IPv4 only TODO?
418         # destination interface is IPv6 only
419         self.setup_interfaces(ipv6=[False, True], ipv4=[False, False])
420
421         # configure FIB entries
422         route = VppIpRoute(self, "a4::", 64,
423                            [VppRoutePath(self.pg1.remote_ip6,
424                                          self.pg1.sw_if_index,
425                                          proto=DpoProto.DPO_PROTO_IP6)],
426                            is_ip6=1)
427         route.add_vpp_config()
428
429         # configure encaps IPv6 source address
430         # needs to be done before SR Policy config
431         # TODO: API?
432         self.vapi.cli("set sr encaps source addr a3::")
433
434         bsid = 'a3::9999:1'
435         # configure SRv6 Policy
436         # Note: segment list order: first -> last
437         sr_policy = VppSRv6Policy(
438             self, bsid=bsid,
439             is_encap=1,
440             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
441             weight=1, fib_table=0,
442             segments=['a4::', 'a5::', 'a6::c7'],
443             source='a3::')
444         sr_policy.add_vpp_config()
445         self.sr_policy = sr_policy
446
447         # log the sr policies
448         self.logger.info(self.vapi.cli("show sr policies"))
449
450         # steer L2 traffic into SRv6 Policy
451         # use the bsid of the above self.sr_policy
452         pol_steering = VppSRv6Steering(
453                         self,
454                         bsid=self.sr_policy.bsid,
455                         prefix="::", mask_width=0,
456                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_L2,
457                         sr_policy_index=0, table_id=0,
458                         sw_if_index=self.pg0.sw_if_index)
459         pol_steering.add_vpp_config()
460
461         # log the sr steering policies
462         self.logger.info(self.vapi.cli("show sr steering policies"))
463
464         # create packets
465         count = len(self.pg_packet_sizes)
466         pkts = []
467
468         # create L2 packets without dot1q header
469         packet_header = self.create_packet_header_L2()
470         # create traffic stream pg0->pg1
471         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
472                                        self.pg_packet_sizes, count))
473
474         # create L2 packets with dot1q header
475         packet_header = self.create_packet_header_L2(vlan=123)
476         # create traffic stream pg0->pg1
477         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
478                                        self.pg_packet_sizes, count))
479
480         # send packets and verify received packets
481         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
482                                   self.compare_rx_tx_packet_T_Encaps_L2)
483
484         # log the localsid counters
485         self.logger.info(self.vapi.cli("show sr localsid"))
486
487         # remove SR steering
488         pol_steering.remove_vpp_config()
489         self.logger.info(self.vapi.cli("show sr steering policies"))
490
491         # remove SR Policies
492         self.sr_policy.remove_vpp_config()
493         self.logger.info(self.vapi.cli("show sr policies"))
494
495         # remove FIB entries
496         # done by tearDown
497
498         # cleanup interfaces
499         self.teardown_interfaces()
500
501     def test_SRv6_End(self):
502         """ Test SRv6 End (without PSP) behavior.
503         """
504         # send traffic to one destination interface
505         # source and destination interfaces are IPv6 only
506         self.setup_interfaces(ipv6=[True, True])
507
508         # configure FIB entries
509         route = VppIpRoute(self, "a4::", 64,
510                            [VppRoutePath(self.pg1.remote_ip6,
511                                          self.pg1.sw_if_index,
512                                          proto=DpoProto.DPO_PROTO_IP6)],
513                            is_ip6=1)
514         route.add_vpp_config()
515
516         # configure SRv6 localSID End without PSP behavior
517         localsid = VppSRv6LocalSID(
518                         self, localsid={'addr': 'A3::0'},
519                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
520                         nh_addr4='0.0.0.0',
521                         nh_addr6='::',
522                         end_psp=0,
523                         sw_if_index=0,
524                         vlan_index=0,
525                         fib_table=0)
526         localsid.add_vpp_config()
527         # log the localsids
528         self.logger.debug(self.vapi.cli("show sr localsid"))
529
530         # create IPv6 packets with SRH (SL=2, SL=1, SL=0)
531         # send one packet per SL value per packet size
532         # SL=0 packet with localSID End with USP needs 2nd SRH
533         count = len(self.pg_packet_sizes)
534         dst_inner = 'a4::1234'
535         pkts = []
536
537         # packets with segments-left 2, active segment a3::
538         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
539                 dst_inner,
540                 sidlist=['a5::', 'a4::', 'a3::'],
541                 segleft=2)
542         # create traffic stream pg0->pg1
543         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
544                                        self.pg_packet_sizes, count))
545
546         # packets with segments-left 1, active segment a3::
547         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
548                 dst_inner,
549                 sidlist=['a4::', 'a3::', 'a2::'],
550                 segleft=1)
551         # add to traffic stream pg0->pg1
552         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
553                                        self.pg_packet_sizes, count))
554
555         # TODO: test behavior with SL=0 packet (needs 2*SRH?)
556
557         # send packets and verify received packets
558         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
559                                   self.compare_rx_tx_packet_End)
560
561         # log the localsid counters
562         self.logger.info(self.vapi.cli("show sr localsid"))
563
564         # remove SRv6 localSIDs
565         localsid.remove_vpp_config()
566
567         # remove FIB entries
568         # done by tearDown
569
570         # cleanup interfaces
571         self.teardown_interfaces()
572
573     def test_SRv6_End_with_PSP(self):
574         """ Test SRv6 End with PSP behavior.
575         """
576         # send traffic to one destination interface
577         # source and destination interfaces are IPv6 only
578         self.setup_interfaces(ipv6=[True, True])
579
580         # configure FIB entries
581         route = VppIpRoute(self, "a4::", 64,
582                            [VppRoutePath(self.pg1.remote_ip6,
583                                          self.pg1.sw_if_index,
584                                          proto=DpoProto.DPO_PROTO_IP6)],
585                            is_ip6=1)
586         route.add_vpp_config()
587
588         # configure SRv6 localSID End with PSP behavior
589         localsid = VppSRv6LocalSID(
590                         self, localsid={'addr': 'A3::0'},
591                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
592                         nh_addr4='0.0.0.0',
593                         nh_addr6='::',
594                         end_psp=1,
595                         sw_if_index=0,
596                         vlan_index=0,
597                         fib_table=0)
598         localsid.add_vpp_config()
599         # log the localsids
600         self.logger.debug(self.vapi.cli("show sr localsid"))
601
602         # create IPv6 packets with SRH (SL=2, SL=1)
603         # send one packet per SL value per packet size
604         # SL=0 packet with localSID End with PSP is dropped
605         count = len(self.pg_packet_sizes)
606         dst_inner = 'a4::1234'
607         pkts = []
608
609         # packets with segments-left 2, active segment a3::
610         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
611                     dst_inner,
612                     sidlist=['a5::', 'a4::', 'a3::'],
613                     segleft=2)
614         # create traffic stream pg0->pg1
615         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
616                                        self.pg_packet_sizes, count))
617
618         # packets with segments-left 1, active segment a3::
619         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
620                     dst_inner,
621                     sidlist=['a4::', 'a3::', 'a2::'],
622                     segleft=1)
623         # add to traffic stream pg0->pg1
624         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
625                                        self.pg_packet_sizes, count))
626
627         # send packets and verify received packets
628         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
629                                   self.compare_rx_tx_packet_End_PSP)
630
631         # log the localsid counters
632         self.logger.info(self.vapi.cli("show sr localsid"))
633
634         # remove SRv6 localSIDs
635         localsid.remove_vpp_config()
636
637         # remove FIB entries
638         # done by tearDown
639
640         # cleanup interfaces
641         self.teardown_interfaces()
642
643     def test_SRv6_End_X(self):
644         """ Test SRv6 End.X (without PSP) behavior.
645         """
646         # create three interfaces (1 source, 2 destinations)
647         # source and destination interfaces are IPv6 only
648         self.setup_interfaces(ipv6=[True, True, True])
649
650         # configure FIB entries
651         # a4::/64 via pg1 and pg2
652         route = VppIpRoute(self, "a4::", 64,
653                            [VppRoutePath(self.pg1.remote_ip6,
654                                          self.pg1.sw_if_index,
655                                          proto=DpoProto.DPO_PROTO_IP6),
656                             VppRoutePath(self.pg2.remote_ip6,
657                                          self.pg2.sw_if_index,
658                                          proto=DpoProto.DPO_PROTO_IP6)],
659                            is_ip6=1)
660         route.add_vpp_config()
661         self.logger.debug(self.vapi.cli("show ip6 fib"))
662
663         # configure SRv6 localSID End.X without PSP behavior
664         # End.X points to interface pg1
665         localsid = VppSRv6LocalSID(
666                         self, localsid={'addr': 'A3::C4'},
667                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
668                         nh_addr4='0.0.0.0',
669                         nh_addr6=self.pg1.remote_ip6,
670                         end_psp=0,
671                         sw_if_index=self.pg1.sw_if_index,
672                         vlan_index=0,
673                         fib_table=0)
674         localsid.add_vpp_config()
675         # log the localsids
676         self.logger.debug(self.vapi.cli("show sr localsid"))
677
678         # create IPv6 packets with SRH (SL=2, SL=1)
679         # send one packet per SL value per packet size
680         # SL=0 packet with localSID End with PSP is dropped
681         count = len(self.pg_packet_sizes)
682         dst_inner = 'a4::1234'
683         pkts = []
684
685         # packets with segments-left 2, active segment a3::c4
686         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
687                     dst_inner,
688                     sidlist=['a5::', 'a4::', 'a3::c4'],
689                     segleft=2)
690         # create traffic stream pg0->pg1
691         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
692                                        self.pg_packet_sizes, count))
693
694         # packets with segments-left 1, active segment a3::c4
695         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
696                     dst_inner,
697                     sidlist=['a4::', 'a3::c4', 'a2::'],
698                     segleft=1)
699         # add to traffic stream pg0->pg1
700         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
701                                        self.pg_packet_sizes, count))
702
703         # send packets and verify received packets
704         # using same comparison function as End (no PSP)
705         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
706                                   self.compare_rx_tx_packet_End)
707
708         # assert nothing was received on the other interface (pg2)
709         self.pg2.assert_nothing_captured("mis-directed packet(s)")
710
711         # log the localsid counters
712         self.logger.info(self.vapi.cli("show sr localsid"))
713
714         # remove SRv6 localSIDs
715         localsid.remove_vpp_config()
716
717         # remove FIB entries
718         # done by tearDown
719
720         # cleanup interfaces
721         self.teardown_interfaces()
722
723     def test_SRv6_End_X_with_PSP(self):
724         """ Test SRv6 End.X with PSP behavior.
725         """
726         # create three interfaces (1 source, 2 destinations)
727         # source and destination interfaces are IPv6 only
728         self.setup_interfaces(ipv6=[True, True, True])
729
730         # configure FIB entries
731         # a4::/64 via pg1 and pg2
732         route = VppIpRoute(self, "a4::", 64,
733                            [VppRoutePath(self.pg1.remote_ip6,
734                                          self.pg1.sw_if_index,
735                                          proto=DpoProto.DPO_PROTO_IP6),
736                             VppRoutePath(self.pg2.remote_ip6,
737                                          self.pg2.sw_if_index,
738                                          proto=DpoProto.DPO_PROTO_IP6)],
739                            is_ip6=1)
740         route.add_vpp_config()
741
742         # configure SRv6 localSID End with PSP behavior
743         localsid = VppSRv6LocalSID(
744                         self, localsid={'addr': 'A3::C4'},
745                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
746                         nh_addr4='0.0.0.0',
747                         nh_addr6=self.pg1.remote_ip6,
748                         end_psp=1,
749                         sw_if_index=self.pg1.sw_if_index,
750                         vlan_index=0,
751                         fib_table=0)
752         localsid.add_vpp_config()
753         # log the localsids
754         self.logger.debug(self.vapi.cli("show sr localsid"))
755
756         # create IPv6 packets with SRH (SL=2, SL=1)
757         # send one packet per SL value per packet size
758         # SL=0 packet with localSID End with PSP is dropped
759         count = len(self.pg_packet_sizes)
760         dst_inner = 'a4::1234'
761         pkts = []
762
763         # packets with segments-left 2, active segment a3::
764         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
765                     dst_inner,
766                     sidlist=['a5::', 'a4::', 'a3::c4'],
767                     segleft=2)
768         # create traffic stream pg0->pg1
769         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
770                                        self.pg_packet_sizes, count))
771
772         # packets with segments-left 1, active segment a3::
773         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
774                     dst_inner,
775                     sidlist=['a4::', 'a3::c4', 'a2::'],
776                     segleft=1)
777         # add to traffic stream pg0->pg1
778         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
779                                        self.pg_packet_sizes, count))
780
781         # send packets and verify received packets
782         # using same comparison function as End with PSP
783         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
784                                   self.compare_rx_tx_packet_End_PSP)
785
786         # assert nothing was received on the other interface (pg2)
787         self.pg2.assert_nothing_captured("mis-directed packet(s)")
788
789         # log the localsid counters
790         self.logger.info(self.vapi.cli("show sr localsid"))
791
792         # remove SRv6 localSIDs
793         localsid.remove_vpp_config()
794
795         # remove FIB entries
796         # done by tearDown
797
798         # cleanup interfaces
799         self.teardown_interfaces()
800
801     def test_SRv6_End_DX6(self):
802         """ Test SRv6 End.DX6 behavior.
803         """
804         # send traffic to one destination interface
805         # source and destination interfaces are IPv6 only
806         self.setup_interfaces(ipv6=[True, True])
807
808         # configure SRv6 localSID End.DX6 behavior
809         localsid = VppSRv6LocalSID(
810                         self, localsid={'addr': 'A3::C4'},
811                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX6,
812                         nh_addr4='0.0.0.0',
813                         nh_addr6=self.pg1.remote_ip6,
814                         end_psp=0,
815                         sw_if_index=self.pg1.sw_if_index,
816                         vlan_index=0,
817                         fib_table=0)
818         localsid.add_vpp_config()
819         # log the localsids
820         self.logger.debug(self.vapi.cli("show sr localsid"))
821
822         # create IPv6 packets with SRH (SL=0)
823         # send one packet per packet size
824         count = len(self.pg_packet_sizes)
825         dst_inner = 'a4::1234'  # inner header destination address
826         pkts = []
827
828         # packets with SRH, segments-left 0, active segment a3::c4
829         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
830                         dst_inner,
831                         sidlist=['a3::c4', 'a2::', 'a1::'],
832                         segleft=0)
833         # add to traffic stream pg0->pg1
834         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
835                                        self.pg_packet_sizes, count))
836
837         # packets without SRH, IPv6 in IPv6
838         # outer IPv6 dest addr is the localsid End.DX6
839         packet_header = self.create_packet_header_IPv6_IPv6(
840                                             dst_inner,
841                                             dst_outer='a3::c4')
842         # add to traffic stream pg0->pg1
843         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
844                                        self.pg_packet_sizes, count))
845
846         # send packets and verify received packets
847         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
848                                   self.compare_rx_tx_packet_End_DX6)
849
850         # log the localsid counters
851         self.logger.info(self.vapi.cli("show sr localsid"))
852
853         # remove SRv6 localSIDs
854         localsid.remove_vpp_config()
855
856         # cleanup interfaces
857         self.teardown_interfaces()
858
859     def test_SRv6_End_DT6(self):
860         """ Test SRv6 End.DT6 behavior.
861         """
862         # create three interfaces (1 source, 2 destinations)
863         # all interfaces are IPv6 only
864         # source interface in global FIB (0)
865         # destination interfaces in global and vrf
866         vrf_1 = 1
867         ipt = VppIpTable(self, vrf_1, is_ip6=True)
868         ipt.add_vpp_config()
869         self.setup_interfaces(ipv6=[True, True, True],
870                               ipv6_table_id=[0, 0, vrf_1])
871
872         # configure FIB entries
873         # a4::/64 is reachable
874         #     via pg1 in table 0 (global)
875         #     and via pg2 in table vrf_1
876         route0 = VppIpRoute(self, "a4::", 64,
877                             [VppRoutePath(self.pg1.remote_ip6,
878                                           self.pg1.sw_if_index,
879                                           proto=DpoProto.DPO_PROTO_IP6,
880                                           nh_table_id=0)],
881                             table_id=0,
882                             is_ip6=1)
883         route0.add_vpp_config()
884         route1 = VppIpRoute(self, "a4::", 64,
885                             [VppRoutePath(self.pg2.remote_ip6,
886                                           self.pg2.sw_if_index,
887                                           proto=DpoProto.DPO_PROTO_IP6,
888                                           nh_table_id=vrf_1)],
889                             table_id=vrf_1,
890                             is_ip6=1)
891         route1.add_vpp_config()
892         self.logger.debug(self.vapi.cli("show ip6 fib"))
893
894         # configure SRv6 localSID End.DT6 behavior
895         # Note:
896         # fib_table: where the localsid is installed
897         # sw_if_index: in T-variants of localsid this is the vrf table_id
898         localsid = VppSRv6LocalSID(
899                         self, localsid={'addr': 'A3::C4'},
900                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT6,
901                         nh_addr4='0.0.0.0',
902                         nh_addr6='::',
903                         end_psp=0,
904                         sw_if_index=vrf_1,
905                         vlan_index=0,
906                         fib_table=0)
907         localsid.add_vpp_config()
908         # log the localsids
909         self.logger.debug(self.vapi.cli("show sr localsid"))
910
911         # create IPv6 packets with SRH (SL=0)
912         # send one packet per packet size
913         count = len(self.pg_packet_sizes)
914         dst_inner = 'a4::1234'  # inner header destination address
915         pkts = []
916
917         # packets with SRH, segments-left 0, active segment a3::c4
918         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
919                         dst_inner,
920                         sidlist=['a3::c4', 'a2::', 'a1::'],
921                         segleft=0)
922         # add to traffic stream pg0->pg1
923         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
924                                        self.pg_packet_sizes, count))
925
926         # packets without SRH, IPv6 in IPv6
927         # outer IPv6 dest addr is the localsid End.DT6
928         packet_header = self.create_packet_header_IPv6_IPv6(
929                                             dst_inner,
930                                             dst_outer='a3::c4')
931         # add to traffic stream pg0->pg1
932         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
933                                        self.pg_packet_sizes, count))
934
935         # send packets and verify received packets
936         # using same comparison function as End.DX6
937         self.send_and_verify_pkts(self.pg0, pkts, self.pg2,
938                                   self.compare_rx_tx_packet_End_DX6)
939
940         # assert nothing was received on the other interface (pg2)
941         self.pg1.assert_nothing_captured("mis-directed packet(s)")
942
943         # log the localsid counters
944         self.logger.info(self.vapi.cli("show sr localsid"))
945
946         # remove SRv6 localSIDs
947         localsid.remove_vpp_config()
948
949         # remove FIB entries
950         # done by tearDown
951
952         # cleanup interfaces
953         self.teardown_interfaces()
954
955     def test_SRv6_End_DX4(self):
956         """ Test SRv6 End.DX4 behavior.
957         """
958         # send traffic to one destination interface
959         # source interface is IPv6 only
960         # destination interface is IPv4 only
961         self.setup_interfaces(ipv6=[True, False], ipv4=[False, True])
962
963         # configure SRv6 localSID End.DX4 behavior
964         localsid = VppSRv6LocalSID(
965                         self, localsid={'addr': 'A3::C4'},
966                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX4,
967                         nh_addr4=self.pg1.remote_ip4,
968                         nh_addr6='::',
969                         end_psp=0,
970                         sw_if_index=self.pg1.sw_if_index,
971                         vlan_index=0,
972                         fib_table=0)
973         localsid.add_vpp_config()
974         # log the localsids
975         self.logger.debug(self.vapi.cli("show sr localsid"))
976
977         # send one packet per packet size
978         count = len(self.pg_packet_sizes)
979         dst_inner = '4.1.1.123'  # inner header destination address
980         pkts = []
981
982         # packets with SRH, segments-left 0, active segment a3::c4
983         packet_header = self.create_packet_header_IPv6_SRH_IPv4(
984                         dst_inner,
985                         sidlist=['a3::c4', 'a2::', 'a1::'],
986                         segleft=0)
987         # add to traffic stream pg0->pg1
988         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
989                                        self.pg_packet_sizes, count))
990
991         # packets without SRH, IPv4 in IPv6
992         # outer IPv6 dest addr is the localsid End.DX4
993         packet_header = self.create_packet_header_IPv6_IPv4(
994                                             dst_inner,
995                                             dst_outer='a3::c4')
996         # add to traffic stream pg0->pg1
997         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
998                                        self.pg_packet_sizes, count))
999
1000         # send packets and verify received packets
1001         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
1002                                   self.compare_rx_tx_packet_End_DX4)
1003
1004         # log the localsid counters
1005         self.logger.info(self.vapi.cli("show sr localsid"))
1006
1007         # remove SRv6 localSIDs
1008         localsid.remove_vpp_config()
1009
1010         # cleanup interfaces
1011         self.teardown_interfaces()
1012
1013     def test_SRv6_End_DT4(self):
1014         """ Test SRv6 End.DT4 behavior.
1015         """
1016         # create three interfaces (1 source, 2 destinations)
1017         # source interface is IPv6-only
1018         # destination interfaces are IPv4 only
1019         # source interface in global FIB (0)
1020         # destination interfaces in global and vrf
1021         vrf_1 = 1
1022         ipt = VppIpTable(self, vrf_1)
1023         ipt.add_vpp_config()
1024         self.setup_interfaces(ipv6=[True, False, False],
1025                               ipv4=[False, True, True],
1026                               ipv6_table_id=[0, 0, 0],
1027                               ipv4_table_id=[0, 0, vrf_1])
1028
1029         # configure FIB entries
1030         # 4.1.1.0/24 is reachable
1031         #     via pg1 in table 0 (global)
1032         #     and via pg2 in table vrf_1
1033         route0 = VppIpRoute(self, "4.1.1.0", 24,
1034                             [VppRoutePath(self.pg1.remote_ip4,
1035                                           self.pg1.sw_if_index,
1036                                           nh_table_id=0)],
1037                             table_id=0,
1038                             is_ip6=0)
1039         route0.add_vpp_config()
1040         route1 = VppIpRoute(self, "4.1.1.0", 24,
1041                             [VppRoutePath(self.pg2.remote_ip4,
1042                                           self.pg2.sw_if_index,
1043                                           nh_table_id=vrf_1)],
1044                             table_id=vrf_1,
1045                             is_ip6=0)
1046         route1.add_vpp_config()
1047         self.logger.debug(self.vapi.cli("show ip fib"))
1048
1049         # configure SRv6 localSID End.DT6 behavior
1050         # Note:
1051         # fib_table: where the localsid is installed
1052         # sw_if_index: in T-variants of localsid: vrf table_id
1053         localsid = VppSRv6LocalSID(
1054                         self, localsid={'addr': 'A3::C4'},
1055                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT4,
1056                         nh_addr4='0.0.0.0',
1057                         nh_addr6='::',
1058                         end_psp=0,
1059                         sw_if_index=vrf_1,
1060                         vlan_index=0,
1061                         fib_table=0)
1062         localsid.add_vpp_config()
1063         # log the localsids
1064         self.logger.debug(self.vapi.cli("show sr localsid"))
1065
1066         # create IPv6 packets with SRH (SL=0)
1067         # send one packet per packet size
1068         count = len(self.pg_packet_sizes)
1069         dst_inner = '4.1.1.123'  # inner header destination address
1070         pkts = []
1071
1072         # packets with SRH, segments-left 0, active segment a3::c4
1073         packet_header = self.create_packet_header_IPv6_SRH_IPv4(
1074                         dst_inner,
1075                         sidlist=['a3::c4', 'a2::', 'a1::'],
1076                         segleft=0)
1077         # add to traffic stream pg0->pg1
1078         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
1079                                        self.pg_packet_sizes, count))
1080
1081         # packets without SRH, IPv6 in IPv6
1082         # outer IPv6 dest addr is the localsid End.DX4
1083         packet_header = self.create_packet_header_IPv6_IPv4(
1084                                             dst_inner,
1085                                             dst_outer='a3::c4')
1086         # add to traffic stream pg0->pg1
1087         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
1088                                        self.pg_packet_sizes, count))
1089
1090         # send packets and verify received packets
1091         # using same comparison function as End.DX4
1092         self.send_and_verify_pkts(self.pg0, pkts, self.pg2,
1093                                   self.compare_rx_tx_packet_End_DX4)
1094
1095         # assert nothing was received on the other interface (pg2)
1096         self.pg1.assert_nothing_captured("mis-directed packet(s)")
1097
1098         # log the localsid counters
1099         self.logger.info(self.vapi.cli("show sr localsid"))
1100
1101         # remove SRv6 localSIDs
1102         localsid.remove_vpp_config()
1103
1104         # remove FIB entries
1105         # done by tearDown
1106
1107         # cleanup interfaces
1108         self.teardown_interfaces()
1109
1110     def test_SRv6_End_DX2(self):
1111         """ Test SRv6 End.DX2 behavior.
1112         """
1113         # send traffic to one destination interface
1114         # source interface is IPv6 only
1115         self.setup_interfaces(ipv6=[True, False], ipv4=[False, False])
1116
1117         # configure SRv6 localSID End.DX2 behavior
1118         localsid = VppSRv6LocalSID(
1119                         self, localsid={'addr': 'A3::C4'},
1120                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX2,
1121                         nh_addr4='0.0.0.0',
1122                         nh_addr6='::',
1123                         end_psp=0,
1124                         sw_if_index=self.pg1.sw_if_index,
1125                         vlan_index=0,
1126                         fib_table=0)
1127         localsid.add_vpp_config()
1128         # log the localsids
1129         self.logger.debug(self.vapi.cli("show sr localsid"))
1130
1131         # send one packet per packet size
1132         count = len(self.pg_packet_sizes)
1133         pkts = []
1134
1135         # packets with SRH, segments-left 0, active segment a3::c4
1136         # L2 has no dot1q header
1137         packet_header = self.create_packet_header_IPv6_SRH_L2(
1138                             sidlist=['a3::c4', 'a2::', 'a1::'],
1139                             segleft=0,
1140                             vlan=0)
1141         # add to traffic stream pg0->pg1
1142         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1143                                        self.pg_packet_sizes, count))
1144
1145         # packets with SRH, segments-left 0, active segment a3::c4
1146         # L2 has dot1q header
1147         packet_header = self.create_packet_header_IPv6_SRH_L2(
1148                             sidlist=['a3::c4', 'a2::', 'a1::'],
1149                             segleft=0,
1150                             vlan=123)
1151         # add to traffic stream pg0->pg1
1152         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1153                                        self.pg_packet_sizes, count))
1154
1155         # packets without SRH, L2 in IPv6
1156         # outer IPv6 dest addr is the localsid End.DX2
1157         # L2 has no dot1q header
1158         packet_header = self.create_packet_header_IPv6_L2(
1159                                             dst_outer='a3::c4',
1160                                             vlan=0)
1161         # add to traffic stream pg0->pg1
1162         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1163                                        self.pg_packet_sizes, count))
1164
1165         # packets without SRH, L2 in IPv6
1166         # outer IPv6 dest addr is the localsid End.DX2
1167         # L2 has dot1q header
1168         packet_header = self.create_packet_header_IPv6_L2(
1169                                             dst_outer='a3::c4',
1170                                             vlan=123)
1171         # add to traffic stream pg0->pg1
1172         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1173                                        self.pg_packet_sizes, count))
1174
1175         # send packets and verify received packets
1176         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
1177                                   self.compare_rx_tx_packet_End_DX2)
1178
1179         # log the localsid counters
1180         self.logger.info(self.vapi.cli("show sr localsid"))
1181
1182         # remove SRv6 localSIDs
1183         localsid.remove_vpp_config()
1184
1185         # cleanup interfaces
1186         self.teardown_interfaces()
1187
1188     @unittest.skipUnless(0, "PC to fix")
1189     def test_SRv6_T_Insert_Classifier(self):
1190         """ Test SRv6 Transit.Insert behavior (IPv6 only).
1191             steer packets using the classifier
1192         """
1193         # send traffic to one destination interface
1194         # source and destination are IPv6 only
1195         self.setup_interfaces(ipv6=[False, False, False, True, True])
1196
1197         # configure FIB entries
1198         route = VppIpRoute(self, "a4::", 64,
1199                            [VppRoutePath(self.pg4.remote_ip6,
1200                                          self.pg4.sw_if_index,
1201                                          proto=DpoProto.DPO_PROTO_IP6)],
1202                            is_ip6=1)
1203         route.add_vpp_config()
1204
1205         # configure encaps IPv6 source address
1206         # needs to be done before SR Policy config
1207         # TODO: API?
1208         self.vapi.cli("set sr encaps source addr a3::")
1209
1210         bsid = 'a3::9999:1'
1211         # configure SRv6 Policy
1212         # Note: segment list order: first -> last
1213         sr_policy = VppSRv6Policy(
1214             self, bsid=bsid,
1215             is_encap=0,
1216             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
1217             weight=1, fib_table=0,
1218             segments=['a4::', 'a5::', 'a6::c7'],
1219             source='a3::')
1220         sr_policy.add_vpp_config()
1221         self.sr_policy = sr_policy
1222
1223         # log the sr policies
1224         self.logger.info(self.vapi.cli("show sr policies"))
1225
1226         # add classify table
1227         # mask on dst ip address prefix a7::/8
1228         mask = '{!s:0<16}'.format('ff')
1229         r = self.vapi.classify_add_del_table(
1230             1,
1231             binascii.unhexlify(mask),
1232             match_n_vectors=(len(mask) - 1) // 32 + 1,
1233             current_data_flag=1,
1234             skip_n_vectors=2)  # data offset
1235         self.assertIsNotNone(r, 'No response msg for add_del_table')
1236         table_index = r.new_table_index
1237
1238         # add the source routing node as a ip6 inacl netxt node
1239         r = self.vapi.add_node_next('ip6-inacl',
1240                                     'sr-pl-rewrite-insert')
1241         inacl_next_node_index = r.node_index
1242
1243         match = '{!s:0<16}'.format('a7')
1244         r = self.vapi.classify_add_del_session(
1245             1,
1246             table_index,
1247             binascii.unhexlify(match),
1248             hit_next_index=inacl_next_node_index,
1249             action=3,
1250             metadata=0)  # sr policy index
1251         self.assertIsNotNone(r, 'No response msg for add_del_session')
1252
1253         # log the classify table used in the steering policy
1254         self.logger.info(self.vapi.cli("show classify table"))
1255
1256         r = self.vapi.input_acl_set_interface(
1257             is_add=1,
1258             sw_if_index=self.pg3.sw_if_index,
1259             ip6_table_index=table_index)
1260         self.assertIsNotNone(r,
1261                              'No response msg for input_acl_set_interface')
1262
1263         # log the ip6 inacl
1264         self.logger.info(self.vapi.cli("show inacl type ip6"))
1265
1266         # create packets
1267         count = len(self.pg_packet_sizes)
1268         dst_inner = 'a7::1234'
1269         pkts = []
1270
1271         # create IPv6 packets without SRH
1272         packet_header = self.create_packet_header_IPv6(dst_inner)
1273         # create traffic stream pg3->pg4
1274         pkts.extend(self.create_stream(self.pg3, self.pg4, packet_header,
1275                                        self.pg_packet_sizes, count))
1276
1277         # create IPv6 packets with SRH
1278         # packets with segments-left 1, active segment a7::
1279         packet_header = self.create_packet_header_IPv6_SRH(
1280             sidlist=['a8::', 'a7::', 'a6::'],
1281             segleft=1)
1282         # create traffic stream pg3->pg4
1283         pkts.extend(self.create_stream(self.pg3, self.pg4, packet_header,
1284                                        self.pg_packet_sizes, count))
1285
1286         # send packets and verify received packets
1287         self.send_and_verify_pkts(self.pg3, pkts, self.pg4,
1288                                   self.compare_rx_tx_packet_T_Insert)
1289
1290         # remove the interface l2 input feature
1291         r = self.vapi.input_acl_set_interface(
1292             is_add=0,
1293             sw_if_index=self.pg3.sw_if_index,
1294             ip6_table_index=table_index)
1295         self.assertIsNotNone(r,
1296                              'No response msg for input_acl_set_interface')
1297
1298         # log the ip6 inacl after cleaning
1299         self.logger.info(self.vapi.cli("show inacl type ip6"))
1300
1301         # log the localsid counters
1302         self.logger.info(self.vapi.cli("show sr localsid"))
1303
1304         # remove classifier SR steering
1305         # classifier_steering.remove_vpp_config()
1306         self.logger.info(self.vapi.cli("show sr steering policies"))
1307
1308         # remove SR Policies
1309         self.sr_policy.remove_vpp_config()
1310         self.logger.info(self.vapi.cli("show sr policies"))
1311
1312         # remove classify session and table
1313         r = self.vapi.classify_add_del_session(
1314             0,
1315             table_index,
1316             binascii.unhexlify(match))
1317         self.assertIsNotNone(r, 'No response msg for add_del_session')
1318
1319         r = self.vapi.classify_add_del_table(
1320             0,
1321             binascii.unhexlify(mask),
1322             table_index=table_index)
1323         self.assertIsNotNone(r, 'No response msg for add_del_table')
1324
1325         self.logger.info(self.vapi.cli("show classify table"))
1326
1327         # remove FIB entries
1328         # done by tearDown
1329
1330         # cleanup interfaces
1331         self.teardown_interfaces()
1332
1333     def compare_rx_tx_packet_T_Encaps(self, tx_pkt, rx_pkt):
1334         """ Compare input and output packet after passing T.Encaps
1335
1336         :param tx_pkt: transmitted packet
1337         :param rx_pkt: received packet
1338         """
1339         # T.Encaps updates the headers as follows:
1340         # SR Policy seglist (S3, S2, S1)
1341         # SR Policy source C
1342         # IPv6:
1343         # in: IPv6(A, B2)
1344         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(A, B2)
1345         # IPv6 + SRH:
1346         # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1)
1347         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(a, B2)SRH(B3, B2, B1; SL=1)
1348
1349         # get first (outer) IPv6 header of rx'ed packet
1350         rx_ip = rx_pkt.getlayer(IPv6)
1351         rx_srh = None
1352
1353         tx_ip = tx_pkt.getlayer(IPv6)
1354
1355         # expected segment-list
1356         seglist = self.sr_policy.segments
1357         # reverse list to get order as in SRH
1358         tx_seglist = seglist[::-1]
1359
1360         # get source address of SR Policy
1361         sr_policy_source = self.sr_policy.source
1362
1363         # rx'ed packet should have SRH
1364         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1365         # get SRH
1366         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1367
1368         # received ip.src should be equal to SR Policy source
1369         self.assertEqual(rx_ip.src, sr_policy_source)
1370         # received ip.dst should be equal to expected sidlist[lastentry]
1371         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1372         # rx'ed seglist should be equal to expected seglist
1373         self.assertEqual(rx_srh.addresses, tx_seglist)
1374         # segleft should be equal to size expected seglist-1
1375         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1376         # segleft should be equal to lastentry
1377         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1378
1379         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1380         # except for the hop-limit field
1381         #   -> update tx'ed hlim to the expected hlim
1382         tx_ip.hlim = tx_ip.hlim - 1
1383
1384         self.assertEqual(rx_srh.payload, tx_ip)
1385
1386         self.logger.debug("packet verification: SUCCESS")
1387
1388     def compare_rx_tx_packet_T_Encaps_IPv4(self, tx_pkt, rx_pkt):
1389         """ Compare input and output packet after passing T.Encaps for IPv4
1390
1391         :param tx_pkt: transmitted packet
1392         :param rx_pkt: received packet
1393         """
1394         # T.Encaps for IPv4 updates the headers as follows:
1395         # SR Policy seglist (S3, S2, S1)
1396         # SR Policy source C
1397         # IPv4:
1398         # in: IPv4(A, B2)
1399         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv4(A, B2)
1400
1401         # get first (outer) IPv6 header of rx'ed packet
1402         rx_ip = rx_pkt.getlayer(IPv6)
1403         rx_srh = None
1404
1405         tx_ip = tx_pkt.getlayer(IP)
1406
1407         # expected segment-list
1408         seglist = self.sr_policy.segments
1409         # reverse list to get order as in SRH
1410         tx_seglist = seglist[::-1]
1411
1412         # get source address of SR Policy
1413         sr_policy_source = self.sr_policy.source
1414
1415         # checks common to cases tx with and without SRH
1416         # rx'ed packet should have SRH and IPv4 header
1417         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1418         self.assertTrue(rx_ip.payload.haslayer(IP))
1419         # get SRH
1420         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1421
1422         # received ip.src should be equal to SR Policy source
1423         self.assertEqual(rx_ip.src, sr_policy_source)
1424         # received ip.dst should be equal to sidlist[lastentry]
1425         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1426         # rx'ed seglist should be equal to seglist
1427         self.assertEqual(rx_srh.addresses, tx_seglist)
1428         # segleft should be equal to size seglist-1
1429         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1430         # segleft should be equal to lastentry
1431         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1432
1433         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1434         # except for the ttl field and ip checksum
1435         #   -> adjust tx'ed ttl to expected ttl
1436         tx_ip.ttl = tx_ip.ttl - 1
1437         #   -> set tx'ed ip checksum to None and let scapy recompute
1438         tx_ip.chksum = None
1439         # read back the pkt (with str()) to force computing these fields
1440         # probably other ways to accomplish this are possible
1441         tx_ip = IP(scapy.compat.raw(tx_ip))
1442
1443         self.assertEqual(rx_srh.payload, tx_ip)
1444
1445         self.logger.debug("packet verification: SUCCESS")
1446
1447     def compare_rx_tx_packet_T_Encaps_L2(self, tx_pkt, rx_pkt):
1448         """ Compare input and output packet after passing T.Encaps for L2
1449
1450         :param tx_pkt: transmitted packet
1451         :param rx_pkt: received packet
1452         """
1453         # T.Encaps for L2 updates the headers as follows:
1454         # SR Policy seglist (S3, S2, S1)
1455         # SR Policy source C
1456         # L2:
1457         # in: L2
1458         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)L2
1459
1460         # get first (outer) IPv6 header of rx'ed packet
1461         rx_ip = rx_pkt.getlayer(IPv6)
1462         rx_srh = None
1463
1464         tx_ether = tx_pkt.getlayer(Ether)
1465
1466         # expected segment-list
1467         seglist = self.sr_policy.segments
1468         # reverse list to get order as in SRH
1469         tx_seglist = seglist[::-1]
1470
1471         # get source address of SR Policy
1472         sr_policy_source = self.sr_policy.source
1473
1474         # rx'ed packet should have SRH
1475         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1476         # get SRH
1477         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1478
1479         # received ip.src should be equal to SR Policy source
1480         self.assertEqual(rx_ip.src, sr_policy_source)
1481         # received ip.dst should be equal to sidlist[lastentry]
1482         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1483         # rx'ed seglist should be equal to seglist
1484         self.assertEqual(rx_srh.addresses, tx_seglist)
1485         # segleft should be equal to size seglist-1
1486         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1487         # segleft should be equal to lastentry
1488         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1489         # nh should be "No Next Header" (59)
1490         self.assertEqual(rx_srh.nh, 59)
1491
1492         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1493         self.assertEqual(Ether(scapy.compat.raw(rx_srh.payload)), tx_ether)
1494
1495         self.logger.debug("packet verification: SUCCESS")
1496
1497     def compare_rx_tx_packet_T_Insert(self, tx_pkt, rx_pkt):
1498         """ Compare input and output packet after passing T.Insert
1499
1500         :param tx_pkt: transmitted packet
1501         :param rx_pkt: received packet
1502         """
1503         # T.Insert updates the headers as follows:
1504         # IPv6:
1505         # in: IPv6(A, B2)
1506         # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)
1507         # IPv6 + SRH:
1508         # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1)
1509         # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)SRH(B3, B2, B1; SL=1)
1510
1511         # get first (outer) IPv6 header of rx'ed packet
1512         rx_ip = rx_pkt.getlayer(IPv6)
1513         rx_srh = None
1514         rx_ip2 = None
1515         rx_srh2 = None
1516         rx_ip3 = None
1517         rx_udp = rx_pkt[UDP]
1518
1519         tx_ip = tx_pkt.getlayer(IPv6)
1520         tx_srh = None
1521         tx_ip2 = None
1522         # some packets have been tx'ed with an SRH, some without it
1523         # get SRH if tx'ed packet has it
1524         if tx_pkt.haslayer(IPv6ExtHdrSegmentRouting):
1525             tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1526             tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1527         tx_udp = tx_pkt[UDP]
1528
1529         # expected segment-list (make copy of SR Policy segment list)
1530         seglist = self.sr_policy.segments[:]
1531         # expected seglist has initial dest addr as last segment
1532         seglist.append(tx_ip.dst)
1533         # reverse list to get order as in SRH
1534         tx_seglist = seglist[::-1]
1535
1536         # get source address of SR Policy
1537         sr_policy_source = self.sr_policy.source
1538
1539         # checks common to cases tx with and without SRH
1540         # rx'ed packet should have SRH and only one IPv6 header
1541         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1542         self.assertFalse(rx_ip.payload.haslayer(IPv6))
1543         # get SRH
1544         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1545
1546         # rx'ed ip.src should be equal to tx'ed ip.src
1547         self.assertEqual(rx_ip.src, tx_ip.src)
1548         # rx'ed ip.dst should be equal to sidlist[lastentry]
1549         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1550
1551         # rx'ed seglist should be equal to expected seglist
1552         self.assertEqual(rx_srh.addresses, tx_seglist)
1553         # segleft should be equal to size(expected seglist)-1
1554         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1555         # segleft should be equal to lastentry
1556         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1557
1558         if tx_srh:  # packet was tx'ed with SRH
1559             # packet should have 2nd SRH
1560             self.assertTrue(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
1561             # get 2nd SRH
1562             rx_srh2 = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting, 2)
1563
1564             # rx'ed srh2.addresses should be equal to tx'ed srh.addresses
1565             self.assertEqual(rx_srh2.addresses, tx_srh.addresses)
1566             # rx'ed srh2.segleft should be equal to tx'ed srh.segleft
1567             self.assertEqual(rx_srh2.segleft, tx_srh.segleft)
1568             # rx'ed srh2.lastentry should be equal to tx'ed srh.lastentry
1569             self.assertEqual(rx_srh2.lastentry, tx_srh.lastentry)
1570
1571         else:  # packet was tx'ed without SRH
1572             # rx packet should have no other SRH
1573             self.assertFalse(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
1574
1575         # UDP layer should be unchanged
1576         self.assertEqual(rx_udp, tx_udp)
1577
1578         self.logger.debug("packet verification: SUCCESS")
1579
1580     def compare_rx_tx_packet_End(self, tx_pkt, rx_pkt):
1581         """ Compare input and output packet after passing End (without PSP)
1582
1583         :param tx_pkt: transmitted packet
1584         :param rx_pkt: received packet
1585         """
1586         # End (no PSP) updates the headers as follows:
1587         # IPv6 + SRH:
1588         # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2)
1589         # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1590
1591         # get first (outer) IPv6 header of rx'ed packet
1592         rx_ip = rx_pkt.getlayer(IPv6)
1593         rx_srh = None
1594         rx_ip2 = None
1595         rx_udp = rx_pkt[UDP]
1596
1597         tx_ip = tx_pkt.getlayer(IPv6)
1598         # we know the packet has been tx'ed
1599         # with an inner IPv6 header and an SRH
1600         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1601         tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1602         tx_udp = tx_pkt[UDP]
1603
1604         # common checks, regardless of tx segleft value
1605         # rx'ed packet should have 2nd IPv6 header
1606         self.assertTrue(rx_ip.payload.haslayer(IPv6))
1607         # get second (inner) IPv6 header
1608         rx_ip2 = rx_pkt.getlayer(IPv6, 2)
1609
1610         if tx_ip.segleft > 0:
1611             # SRH should NOT have been popped:
1612             #   End SID without PSP does not pop SRH if segleft>0
1613             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1614             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1615
1616             # received ip.src should be equal to expected ip.src
1617             self.assertEqual(rx_ip.src, tx_ip.src)
1618             # sidlist should be unchanged
1619             self.assertEqual(rx_srh.addresses, tx_srh.addresses)
1620             # segleft should have been decremented
1621             self.assertEqual(rx_srh.segleft, tx_srh.segleft-1)
1622             # received ip.dst should be equal to sidlist[segleft]
1623             self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
1624             # lastentry should be unchanged
1625             self.assertEqual(rx_srh.lastentry, tx_srh.lastentry)
1626             # inner IPv6 packet (ip2) should be unchanged
1627             self.assertEqual(rx_ip2.src, tx_ip2.src)
1628             self.assertEqual(rx_ip2.dst, tx_ip2.dst)
1629         # else:  # tx_ip.segleft == 0
1630             # TODO: Does this work with 2 SRHs in ingress packet?
1631
1632         # UDP layer should be unchanged
1633         self.assertEqual(rx_udp, tx_udp)
1634
1635         self.logger.debug("packet verification: SUCCESS")
1636
1637     def compare_rx_tx_packet_End_PSP(self, tx_pkt, rx_pkt):
1638         """ Compare input and output packet after passing End with PSP
1639
1640         :param tx_pkt: transmitted packet
1641         :param rx_pkt: received packet
1642         """
1643         # End (PSP) updates the headers as follows:
1644         # IPv6 + SRH (SL>1):
1645         # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2)
1646         # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1647         # IPv6 + SRH (SL=1):
1648         # in: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1649         # out: IPv6(A, S3)
1650
1651         # get first (outer) IPv6 header of rx'ed packet
1652         rx_ip = rx_pkt.getlayer(IPv6)
1653         rx_srh = None
1654         rx_ip2 = None
1655         rx_udp = rx_pkt[UDP]
1656
1657         tx_ip = tx_pkt.getlayer(IPv6)
1658         # we know the packet has been tx'ed
1659         # with an inner IPv6 header and an SRH
1660         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1661         tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1662         tx_udp = tx_pkt[UDP]
1663
1664         # common checks, regardless of tx segleft value
1665         self.assertTrue(rx_ip.payload.haslayer(IPv6))
1666         rx_ip2 = rx_pkt.getlayer(IPv6, 2)
1667         # inner IPv6 packet (ip2) should be unchanged
1668         self.assertEqual(rx_ip2.src, tx_ip2.src)
1669         self.assertEqual(rx_ip2.dst, tx_ip2.dst)
1670
1671         if tx_ip.segleft > 1:
1672             # SRH should NOT have been popped:
1673             #   End SID with PSP does not pop SRH if segleft>1
1674             # rx'ed packet should have SRH
1675             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1676             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1677
1678             # received ip.src should be equal to expected ip.src
1679             self.assertEqual(rx_ip.src, tx_ip.src)
1680             # sidlist should be unchanged
1681             self.assertEqual(rx_srh.addresses, tx_srh.addresses)
1682             # segleft should have been decremented
1683             self.assertEqual(rx_srh.segleft, tx_srh.segleft-1)
1684             # received ip.dst should be equal to sidlist[segleft]
1685             self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
1686             # lastentry should be unchanged
1687             self.assertEqual(rx_srh.lastentry, tx_srh.lastentry)
1688
1689         else:  # tx_ip.segleft <= 1
1690             # SRH should have been popped:
1691             #   End SID with PSP and segleft=1 pops SRH
1692             # the two IPv6 headers are still present
1693             # outer IPv6 header has DA == last segment of popped SRH
1694             # SRH should not be present
1695             self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1696             # outer IPv6 header ip.src should be equal to tx'ed ip.src
1697             self.assertEqual(rx_ip.src, tx_ip.src)
1698             # outer IPv6 header ip.dst should be = to tx'ed sidlist[segleft-1]
1699             self.assertEqual(rx_ip.dst, tx_srh.addresses[tx_srh.segleft-1])
1700
1701         # UDP layer should be unchanged
1702         self.assertEqual(rx_udp, tx_udp)
1703
1704         self.logger.debug("packet verification: SUCCESS")
1705
1706     def compare_rx_tx_packet_End_DX6(self, tx_pkt, rx_pkt):
1707         """ Compare input and output packet after passing End.DX6
1708
1709         :param tx_pkt: transmitted packet
1710         :param rx_pkt: received packet
1711         """
1712         # End.DX6 updates the headers as follows:
1713         # IPv6 + SRH (SL=0):
1714         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv6(B, D)
1715         # out: IPv6(B, D)
1716         # IPv6:
1717         # in: IPv6(A, S3)IPv6(B, D)
1718         # out: IPv6(B, D)
1719
1720         # get first (outer) IPv6 header of rx'ed packet
1721         rx_ip = rx_pkt.getlayer(IPv6)
1722
1723         tx_ip = tx_pkt.getlayer(IPv6)
1724         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1725
1726         # verify if rx'ed packet has no SRH
1727         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1728
1729         # the whole rx_ip pkt should be equal to tx_ip2
1730         # except for the hlim field
1731         #   -> adjust tx'ed hlim to expected hlim
1732         tx_ip2.hlim = tx_ip2.hlim - 1
1733
1734         self.assertEqual(rx_ip, tx_ip2)
1735
1736         self.logger.debug("packet verification: SUCCESS")
1737
1738     def compare_rx_tx_packet_End_DX4(self, tx_pkt, rx_pkt):
1739         """ Compare input and output packet after passing End.DX4
1740
1741         :param tx_pkt: transmitted packet
1742         :param rx_pkt: received packet
1743         """
1744         # End.DX4 updates the headers as follows:
1745         # IPv6 + SRH (SL=0):
1746         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv4(B, D)
1747         # out: IPv4(B, D)
1748         # IPv6:
1749         # in: IPv6(A, S3)IPv4(B, D)
1750         # out: IPv4(B, D)
1751
1752         # get IPv4 header of rx'ed packet
1753         rx_ip = rx_pkt.getlayer(IP)
1754
1755         tx_ip = tx_pkt.getlayer(IPv6)
1756         tx_ip2 = tx_pkt.getlayer(IP)
1757
1758         # verify if rx'ed packet has no SRH
1759         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1760
1761         # the whole rx_ip pkt should be equal to tx_ip2
1762         # except for the ttl field and ip checksum
1763         #   -> adjust tx'ed ttl to expected ttl
1764         tx_ip2.ttl = tx_ip2.ttl - 1
1765         #   -> set tx'ed ip checksum to None and let scapy recompute
1766         tx_ip2.chksum = None
1767         # read back the pkt (with str()) to force computing these fields
1768         # probably other ways to accomplish this are possible
1769         tx_ip2 = IP(scapy.compat.raw(tx_ip2))
1770
1771         self.assertEqual(rx_ip, tx_ip2)
1772
1773         self.logger.debug("packet verification: SUCCESS")
1774
1775     def compare_rx_tx_packet_End_DX2(self, tx_pkt, rx_pkt):
1776         """ Compare input and output packet after passing End.DX2
1777
1778         :param tx_pkt: transmitted packet
1779         :param rx_pkt: received packet
1780         """
1781         # End.DX2 updates the headers as follows:
1782         # IPv6 + SRH (SL=0):
1783         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)L2
1784         # out: L2
1785         # IPv6:
1786         # in: IPv6(A, S3)L2
1787         # out: L2
1788
1789         # get IPv4 header of rx'ed packet
1790         rx_eth = rx_pkt.getlayer(Ether)
1791
1792         tx_ip = tx_pkt.getlayer(IPv6)
1793         # we can't just get the 2nd Ether layer
1794         # get the Raw content and dissect it as Ether
1795         tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw]))
1796
1797         # verify if rx'ed packet has no SRH
1798         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1799
1800         # the whole rx_eth pkt should be equal to tx_eth1
1801         self.assertEqual(rx_eth, tx_eth1)
1802
1803         self.logger.debug("packet verification: SUCCESS")
1804
1805     def create_stream(self, src_if, dst_if, packet_header, packet_sizes,
1806                       count):
1807         """Create SRv6 input packet stream for defined interface.
1808
1809         :param VppInterface src_if: Interface to create packet stream for
1810         :param VppInterface dst_if: destination interface of packet stream
1811         :param packet_header: Layer3 scapy packet headers,
1812                 L2 is added when not provided,
1813                 Raw(payload) with packet_info is added
1814         :param list packet_sizes: packet stream pckt sizes,sequentially applied
1815                to packets in stream have
1816         :param int count: number of packets in packet stream
1817         :return: list of packets
1818         """
1819         self.logger.info("Creating packets")
1820         pkts = []
1821         for i in range(0, count-1):
1822             payload_info = self.create_packet_info(src_if, dst_if)
1823             self.logger.debug(
1824                 "Creating packet with index %d" % (payload_info.index))
1825             payload = self.info_to_payload(payload_info)
1826             # add L2 header if not yet provided in packet_header
1827             if packet_header.getlayer(0).name == 'Ethernet':
1828                 p = (packet_header /
1829                      Raw(payload))
1830             else:
1831                 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
1832                      packet_header /
1833                      Raw(payload))
1834             size = packet_sizes[i % len(packet_sizes)]
1835             self.logger.debug("Packet size %d" % (size))
1836             self.extend_packet(p, size)
1837             # we need to store the packet with the automatic fields computed
1838             # read back the dumped packet (with str())
1839             # to force computing these fields
1840             # probably other ways are possible
1841             p = Ether(scapy.compat.raw(p))
1842             payload_info.data = p.copy()
1843             self.logger.debug(ppp("Created packet:", p))
1844             pkts.append(p)
1845         self.logger.info("Done creating packets")
1846         return pkts
1847
1848     def send_and_verify_pkts(self, input, pkts, output, compare_func):
1849         """Send packets and verify received packets using compare_func
1850
1851         :param input: ingress interface of DUT
1852         :param pkts: list of packets to transmit
1853         :param output: egress interface of DUT
1854         :param compare_func: function to compare in and out packets
1855         """
1856         # add traffic stream to input interface
1857         input.add_stream(pkts)
1858
1859         # enable capture on all interfaces
1860         self.pg_enable_capture(self.pg_interfaces)
1861
1862         # start traffic
1863         self.logger.info("Starting traffic")
1864         self.pg_start()
1865
1866         # get output capture
1867         self.logger.info("Getting packet capture")
1868         capture = output.get_capture()
1869
1870         # assert nothing was captured on input interface
1871         input.assert_nothing_captured()
1872
1873         # verify captured packets
1874         self.verify_captured_pkts(output, capture, compare_func)
1875
1876     def create_packet_header_IPv6(self, dst):
1877         """Create packet header: IPv6 header, UDP header
1878
1879         :param dst: IPv6 destination address
1880
1881         IPv6 source address is 1234::1
1882         UDP source port and destination port are 1234
1883         """
1884
1885         p = (IPv6(src='1234::1', dst=dst) /
1886              UDP(sport=1234, dport=1234))
1887         return p
1888
1889     def create_packet_header_IPv6_SRH(self, sidlist, segleft):
1890         """Create packet header: IPv6 header with SRH, UDP header
1891
1892         :param list sidlist: segment list
1893         :param int segleft: segments-left field value
1894
1895         IPv6 destination address is set to sidlist[segleft]
1896         IPv6 source addresses are 1234::1 and 4321::1
1897         UDP source port and destination port are 1234
1898         """
1899
1900         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
1901              IPv6ExtHdrSegmentRouting(addresses=sidlist) /
1902              UDP(sport=1234, dport=1234))
1903         return p
1904
1905     def create_packet_header_IPv6_SRH_IPv6(self, dst, sidlist, segleft):
1906         """Create packet header: IPv6 encapsulated in SRv6:
1907         IPv6 header with SRH, IPv6 header, UDP header
1908
1909         :param ipv6address dst: inner IPv6 destination address
1910         :param list sidlist: segment list of outer IPv6 SRH
1911         :param int segleft: segments-left field of outer IPv6 SRH
1912
1913         Outer IPv6 destination address is set to sidlist[segleft]
1914         IPv6 source addresses are 1234::1 and 4321::1
1915         UDP source port and destination port are 1234
1916         """
1917
1918         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
1919              IPv6ExtHdrSegmentRouting(addresses=sidlist,
1920                                       segleft=segleft, nh=41) /
1921              IPv6(src='4321::1', dst=dst) /
1922              UDP(sport=1234, dport=1234))
1923         return p
1924
1925     def create_packet_header_IPv6_IPv6(self, dst_inner, dst_outer):
1926         """Create packet header: IPv6 encapsulated in IPv6:
1927         IPv6 header, IPv6 header, UDP header
1928
1929         :param ipv6address dst_inner: inner IPv6 destination address
1930         :param ipv6address dst_outer: outer IPv6 destination address
1931
1932         IPv6 source addresses are 1234::1 and 4321::1
1933         UDP source port and destination port are 1234
1934         """
1935
1936         p = (IPv6(src='1234::1', dst=dst_outer) /
1937              IPv6(src='4321::1', dst=dst_inner) /
1938              UDP(sport=1234, dport=1234))
1939         return p
1940
1941     def create_packet_header_IPv6_SRH_SRH_IPv6(self, dst, sidlist1, segleft1,
1942                                                sidlist2, segleft2):
1943         """Create packet header: IPv6 encapsulated in SRv6 with 2 SRH:
1944         IPv6 header with SRH, 2nd SRH, IPv6 header, UDP header
1945
1946         :param ipv6address dst: inner IPv6 destination address
1947         :param list sidlist1: segment list of outer IPv6 SRH
1948         :param int segleft1: segments-left field of outer IPv6 SRH
1949         :param list sidlist2: segment list of inner IPv6 SRH
1950         :param int segleft2: segments-left field of inner IPv6 SRH
1951
1952         Outer IPv6 destination address is set to sidlist[segleft]
1953         IPv6 source addresses are 1234::1 and 4321::1
1954         UDP source port and destination port are 1234
1955         """
1956
1957         p = (IPv6(src='1234::1', dst=sidlist1[segleft1]) /
1958              IPv6ExtHdrSegmentRouting(addresses=sidlist1,
1959                                       segleft=segleft1, nh=43) /
1960              IPv6ExtHdrSegmentRouting(addresses=sidlist2,
1961                                       segleft=segleft2, nh=41) /
1962              IPv6(src='4321::1', dst=dst) /
1963              UDP(sport=1234, dport=1234))
1964         return p
1965
1966     def create_packet_header_IPv4(self, dst):
1967         """Create packet header: IPv4 header, UDP header
1968
1969         :param dst: IPv4 destination address
1970
1971         IPv4 source address is 123.1.1.1
1972         UDP source port and destination port are 1234
1973         """
1974
1975         p = (IP(src='123.1.1.1', dst=dst) /
1976              UDP(sport=1234, dport=1234))
1977         return p
1978
1979     def create_packet_header_IPv6_IPv4(self, dst_inner, dst_outer):
1980         """Create packet header: IPv4 encapsulated in IPv6:
1981         IPv6 header, IPv4 header, UDP header
1982
1983         :param ipv4address dst_inner: inner IPv4 destination address
1984         :param ipv6address dst_outer: outer IPv6 destination address
1985
1986         IPv6 source address is 1234::1
1987         IPv4 source address is 123.1.1.1
1988         UDP source port and destination port are 1234
1989         """
1990
1991         p = (IPv6(src='1234::1', dst=dst_outer) /
1992              IP(src='123.1.1.1', dst=dst_inner) /
1993              UDP(sport=1234, dport=1234))
1994         return p
1995
1996     def create_packet_header_IPv6_SRH_IPv4(self, dst, sidlist, segleft):
1997         """Create packet header: IPv4 encapsulated in SRv6:
1998         IPv6 header with SRH, IPv4 header, UDP header
1999
2000         :param ipv4address dst: inner IPv4 destination address
2001         :param list sidlist: segment list of outer IPv6 SRH
2002         :param int segleft: segments-left field of outer IPv6 SRH
2003
2004         Outer IPv6 destination address is set to sidlist[segleft]
2005         IPv6 source address is 1234::1
2006         IPv4 source address is 123.1.1.1
2007         UDP source port and destination port are 1234
2008         """
2009
2010         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
2011              IPv6ExtHdrSegmentRouting(addresses=sidlist,
2012                                       segleft=segleft, nh=4) /
2013              IP(src='123.1.1.1', dst=dst) /
2014              UDP(sport=1234, dport=1234))
2015         return p
2016
2017     def create_packet_header_L2(self, vlan=0):
2018         """Create packet header: L2 header
2019
2020         :param vlan: if vlan!=0 then add 802.1q header
2021         """
2022         # Note: the dst addr ('00:55:44:33:22:11') is used in
2023         # the compare function compare_rx_tx_packet_T_Encaps_L2
2024         # to detect presence of L2 in SRH payload
2025         p = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
2026         etype = 0x8137  # IPX
2027         if vlan:
2028             # add 802.1q layer
2029             p /= Dot1Q(vlan=vlan, type=etype)
2030         else:
2031             p.type = etype
2032         return p
2033
2034     def create_packet_header_IPv6_SRH_L2(self, sidlist, segleft, vlan=0):
2035         """Create packet header: L2 encapsulated in SRv6:
2036         IPv6 header with SRH, L2
2037
2038         :param list sidlist: segment list of outer IPv6 SRH
2039         :param int segleft: segments-left field of outer IPv6 SRH
2040         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
2041
2042         Outer IPv6 destination address is set to sidlist[segleft]
2043         IPv6 source address is 1234::1
2044         """
2045         eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
2046         etype = 0x8137  # IPX
2047         if vlan:
2048             # add 802.1q layer
2049             eth /= Dot1Q(vlan=vlan, type=etype)
2050         else:
2051             eth.type = etype
2052
2053         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
2054              IPv6ExtHdrSegmentRouting(addresses=sidlist,
2055                                       segleft=segleft, nh=59) /
2056              eth)
2057         return p
2058
2059     def create_packet_header_IPv6_L2(self, dst_outer, vlan=0):
2060         """Create packet header: L2 encapsulated in IPv6:
2061         IPv6 header, L2
2062
2063         :param ipv6address dst_outer: outer IPv6 destination address
2064         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
2065         """
2066         eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
2067         etype = 0x8137  # IPX
2068         if vlan:
2069             # add 802.1q layer
2070             eth /= Dot1Q(vlan=vlan, type=etype)
2071         else:
2072             eth.type = etype
2073
2074         p = (IPv6(src='1234::1', dst=dst_outer, nh=59) / eth)
2075         return p
2076
2077     def get_payload_info(self, packet):
2078         """ Extract the payload_info from the packet
2079         """
2080         # in most cases, payload_info is in packet[Raw]
2081         # but packet[Raw] gives the complete payload
2082         # (incl L2 header) for the T.Encaps L2 case
2083         try:
2084             payload_info = self.payload_to_info(packet[Raw])
2085
2086         except:
2087             # remote L2 header from packet[Raw]:
2088             # take packet[Raw], convert it to an Ether layer
2089             # and then extract Raw from it
2090             payload_info = self.payload_to_info(
2091                 Ether(scapy.compat.r(packet[Raw]))[Raw])
2092
2093         return payload_info
2094
2095     def verify_captured_pkts(self, dst_if, capture, compare_func):
2096         """
2097         Verify captured packet stream for specified interface.
2098         Compare ingress with egress packets using the specified compare fn
2099
2100         :param dst_if: egress interface of DUT
2101         :param capture: captured packets
2102         :param compare_func: function to compare in and out packet
2103         """
2104         self.logger.info("Verifying capture on interface %s using function %s"
2105                          % (dst_if.name, compare_func.__name__))
2106
2107         last_info = dict()
2108         for i in self.pg_interfaces:
2109             last_info[i.sw_if_index] = None
2110         dst_sw_if_index = dst_if.sw_if_index
2111
2112         for packet in capture:
2113             try:
2114                 # extract payload_info from packet's payload
2115                 payload_info = self.get_payload_info(packet)
2116                 packet_index = payload_info.index
2117
2118                 self.logger.debug("Verifying packet with index %d"
2119                                   % (packet_index))
2120                 # packet should have arrived on the expected interface
2121                 self.assertEqual(payload_info.dst, dst_sw_if_index)
2122                 self.logger.debug(
2123                     "Got packet on interface %s: src=%u (idx=%u)" %
2124                     (dst_if.name, payload_info.src, packet_index))
2125
2126                 # search for payload_info with same src and dst if_index
2127                 # this will give us the transmitted packet
2128                 next_info = self.get_next_packet_info_for_interface2(
2129                     payload_info.src, dst_sw_if_index,
2130                     last_info[payload_info.src])
2131                 last_info[payload_info.src] = next_info
2132                 # next_info should not be None
2133                 self.assertTrue(next_info is not None)
2134                 # index of tx and rx packets should be equal
2135                 self.assertEqual(packet_index, next_info.index)
2136                 # data field of next_info contains the tx packet
2137                 txed_packet = next_info.data
2138
2139                 self.logger.debug(ppp("Transmitted packet:",
2140                                       txed_packet))  # ppp=Pretty Print Packet
2141
2142                 self.logger.debug(ppp("Received packet:", packet))
2143
2144                 # compare rcvd packet with expected packet using compare_func
2145                 compare_func(txed_packet, packet)
2146
2147             except:
2148                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2149                 raise
2150
2151         # have all expected packets arrived?
2152         for i in self.pg_interfaces:
2153             remaining_packet = self.get_next_packet_info_for_interface2(
2154                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
2155             self.assertTrue(remaining_packet is None,
2156                             "Interface %s: Packet expected from interface %s "
2157                             "didn't arrive" % (dst_if.name, i.name))
2158
2159
2160 if __name__ == '__main__':
2161     unittest.main(testRunner=VppTestRunner)