tests: changes for scapy 2.4.3 migration
[vpp.git] / src / plugins / srv6-am / test / test_srv6.py
1 #!/usr/bin/env python3
2
3 import unittest
4 import binascii
5 from socket import AF_INET6
6
7 from framework import VppTestCase, VppTestRunner
8 from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathProto, VppIpTable
9 from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
10     SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
11
12 import scapy.compat
13 from scapy.packet import Raw
14 from scapy.layers.l2 import Ether, Dot1Q
15 from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
16 from scapy.layers.inet import IP, UDP
17
18 from util import ppp
19
20
21 class TestSRv6(VppTestCase):
22     """ SRv6 Test Case """
23
24     @classmethod
25     def setUpClass(cls):
26         super(TestSRv6, cls).setUpClass()
27
28     @classmethod
29     def tearDownClass(cls):
30         super(TestSRv6, cls).tearDownClass()
31
32     def setUp(self):
33         """ Perform test setup before each test case.
34         """
35         super(TestSRv6, self).setUp()
36
37         # packet sizes, inclusive L2 overhead
38         self.pg_packet_sizes = [64, 512, 1518, 9018]
39
40         # reset packet_infos
41         self.reset_packet_infos()
42
43     def tearDown(self):
44         """ Clean up test setup after each test case.
45         """
46         self.teardown_interfaces()
47
48         super(TestSRv6, self).tearDown()
49
50     def configure_interface(self,
51                             interface,
52                             ipv6=False, ipv4=False,
53                             ipv6_table_id=0, ipv4_table_id=0):
54         """ Configure interface.
55         :param ipv6: configure IPv6 on interface
56         :param ipv4: configure IPv4 on interface
57         :param ipv6_table_id: FIB table_id for IPv6
58         :param ipv4_table_id: FIB table_id for IPv4
59         """
60         self.logger.debug("Configuring interface %s" % (interface.name))
61         if ipv6:
62             self.logger.debug("Configuring IPv6")
63             interface.set_table_ip6(ipv6_table_id)
64             interface.config_ip6()
65             interface.resolve_ndp(timeout=5)
66         if ipv4:
67             self.logger.debug("Configuring IPv4")
68             interface.set_table_ip4(ipv4_table_id)
69             interface.config_ip4()
70             interface.resolve_arp()
71         interface.admin_up()
72
73     def setup_interfaces(self, ipv6=[], ipv4=[],
74                          ipv6_table_id=[], ipv4_table_id=[]):
75         """ Create and configure interfaces.
76
77         :param ipv6: list of interface IPv6 capabilities
78         :param ipv4: list of interface IPv4 capabilities
79         :param ipv6_table_id: list of intf IPv6 FIB table_ids
80         :param ipv4_table_id: list of intf IPv4 FIB table_ids
81         :returns: List of created interfaces.
82         """
83         # how many interfaces?
84         if len(ipv6):
85             count = len(ipv6)
86         else:
87             count = len(ipv4)
88         self.logger.debug("Creating and configuring %d interfaces" % (count))
89
90         # fill up ipv6 and ipv4 lists if needed
91         # not enabled (False) is the default
92         if len(ipv6) < count:
93             ipv6 += (count - len(ipv6)) * [False]
94         if len(ipv4) < count:
95             ipv4 += (count - len(ipv4)) * [False]
96
97         # fill up table_id lists if needed
98         # table_id 0 (global) is the default
99         if len(ipv6_table_id) < count:
100             ipv6_table_id += (count - len(ipv6_table_id)) * [0]
101         if len(ipv4_table_id) < count:
102             ipv4_table_id += (count - len(ipv4_table_id)) * [0]
103
104         # create 'count' pg interfaces
105         self.create_pg_interfaces(range(count))
106
107         # setup all interfaces
108         for i in range(count):
109             intf = self.pg_interfaces[i]
110             self.configure_interface(intf,
111                                      ipv6[i], ipv4[i],
112                                      ipv6_table_id[i], ipv4_table_id[i])
113
114         if any(ipv6):
115             self.logger.debug(self.vapi.cli("show ip6 neighbors"))
116         if any(ipv4):
117             self.logger.debug(self.vapi.cli("show ip arp"))
118         self.logger.debug(self.vapi.cli("show interface"))
119         self.logger.debug(self.vapi.cli("show hardware"))
120
121         return self.pg_interfaces
122
123     def teardown_interfaces(self):
124         """ Unconfigure and bring down interface.
125         """
126         self.logger.debug("Tearing down interfaces")
127         # tear down all interfaces
128         # AFAIK they cannot be deleted
129         for i in self.pg_interfaces:
130             self.logger.debug("Tear down interface %s" % (i.name))
131             i.admin_down()
132             i.unconfig()
133             i.set_table_ip4(0)
134             i.set_table_ip6(0)
135
136     @unittest.skipUnless(0, "PC to fix")
137     def test_SRv6_T_Encaps(self):
138         """ Test SRv6 Transit.Encaps behavior for IPv6.
139         """
140         # send traffic to one destination interface
141         # source and destination are IPv6 only
142         self.setup_interfaces(ipv6=[True, True])
143
144         # configure FIB entries
145         route = VppIpRoute(self, "a4::", 64,
146                            [VppRoutePath(self.pg1.remote_ip6,
147                                          self.pg1.sw_if_index)])
148         route.add_vpp_config()
149
150         # configure encaps IPv6 source address
151         # needs to be done before SR Policy config
152         # TODO: API?
153         self.vapi.cli("set sr encaps source addr a3::")
154
155         bsid = 'a3::9999:1'
156         # configure SRv6 Policy
157         # Note: segment list order: first -> last
158         sr_policy = VppSRv6Policy(
159             self, bsid=bsid,
160             is_encap=1,
161             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
162             weight=1, fib_table=0,
163             segments=['a4::', 'a5::', 'a6::c7'],
164             source='a3::')
165         sr_policy.add_vpp_config()
166         self.sr_policy = sr_policy
167
168         # log the sr policies
169         self.logger.info(self.vapi.cli("show sr policies"))
170
171         # steer IPv6 traffic to a7::/64 into SRv6 Policy
172         # use the bsid of the above self.sr_policy
173         pol_steering = VppSRv6Steering(
174                         self,
175                         bsid=self.sr_policy.bsid,
176                         prefix="a7::", mask_width=64,
177                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
178                         sr_policy_index=0, table_id=0,
179                         sw_if_index=0)
180         pol_steering.add_vpp_config()
181
182         # log the sr steering policies
183         self.logger.info(self.vapi.cli("show sr steering policies"))
184
185         # create packets
186         count = len(self.pg_packet_sizes)
187         dst_inner = 'a7::1234'
188         pkts = []
189
190         # create IPv6 packets without SRH
191         packet_header = self.create_packet_header_IPv6(dst_inner)
192         # create traffic stream pg0->pg1
193         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
194                                        self.pg_packet_sizes, count))
195
196         # create IPv6 packets with SRH
197         # packets with segments-left 1, active segment a7::
198         packet_header = self.create_packet_header_IPv6_SRH(
199             sidlist=['a8::', 'a7::', 'a6::'],
200             segleft=1)
201         # create traffic stream pg0->pg1
202         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
203                                        self.pg_packet_sizes, count))
204
205         # create IPv6 packets with SRH and IPv6
206         # packets with segments-left 1, active segment a7::
207         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
208             dst_inner,
209             sidlist=['a8::', 'a7::', 'a6::'],
210             segleft=1)
211         # create traffic stream pg0->pg1
212         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
213                                        self.pg_packet_sizes, count))
214
215         # send packets and verify received packets
216         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
217                                   self.compare_rx_tx_packet_T_Encaps)
218
219         # log the localsid counters
220         self.logger.info(self.vapi.cli("show sr localsid"))
221
222         # remove SR steering
223         pol_steering.remove_vpp_config()
224         self.logger.info(self.vapi.cli("show sr steering policies"))
225
226         # remove SR Policies
227         self.sr_policy.remove_vpp_config()
228         self.logger.info(self.vapi.cli("show sr policies"))
229
230         # remove FIB entries
231         # done by tearDown
232
233         # cleanup interfaces
234         self.teardown_interfaces()
235
236     @unittest.skipUnless(0, "PC to fix")
237     def test_SRv6_T_Insert(self):
238         """ Test SRv6 Transit.Insert behavior (IPv6 only).
239         """
240         # send traffic to one destination interface
241         # source and destination are IPv6 only
242         self.setup_interfaces(ipv6=[True, True])
243
244         # configure FIB entries
245         route = VppIpRoute(self, "a4::", 64,
246                            [VppRoutePath(self.pg1.remote_ip6,
247                                          self.pg1.sw_if_index)])
248         route.add_vpp_config()
249
250         # configure encaps IPv6 source address
251         # needs to be done before SR Policy config
252         # TODO: API?
253         self.vapi.cli("set sr encaps source addr a3::")
254
255         bsid = 'a3::9999:1'
256         # configure SRv6 Policy
257         # Note: segment list order: first -> last
258         sr_policy = VppSRv6Policy(
259             self, bsid=bsid,
260             is_encap=0,
261             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
262             weight=1, fib_table=0,
263             segments=['a4::', 'a5::', 'a6::c7'],
264             source='a3::')
265         sr_policy.add_vpp_config()
266         self.sr_policy = sr_policy
267
268         # log the sr policies
269         self.logger.info(self.vapi.cli("show sr policies"))
270
271         # steer IPv6 traffic to a7::/64 into SRv6 Policy
272         # use the bsid of the above self.sr_policy
273         pol_steering = VppSRv6Steering(
274                         self,
275                         bsid=self.sr_policy.bsid,
276                         prefix="a7::", mask_width=64,
277                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
278                         sr_policy_index=0, table_id=0,
279                         sw_if_index=0)
280         pol_steering.add_vpp_config()
281
282         # log the sr steering policies
283         self.logger.info(self.vapi.cli("show sr steering policies"))
284
285         # create packets
286         count = len(self.pg_packet_sizes)
287         dst_inner = 'a7::1234'
288         pkts = []
289
290         # create IPv6 packets without SRH
291         packet_header = self.create_packet_header_IPv6(dst_inner)
292         # create traffic stream pg0->pg1
293         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
294                                        self.pg_packet_sizes, count))
295
296         # create IPv6 packets with SRH
297         # packets with segments-left 1, active segment a7::
298         packet_header = self.create_packet_header_IPv6_SRH(
299             sidlist=['a8::', 'a7::', 'a6::'],
300             segleft=1)
301         # create traffic stream pg0->pg1
302         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
303                                        self.pg_packet_sizes, count))
304
305         # send packets and verify received packets
306         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
307                                   self.compare_rx_tx_packet_T_Insert)
308
309         # log the localsid counters
310         self.logger.info(self.vapi.cli("show sr localsid"))
311
312         # remove SR steering
313         pol_steering.remove_vpp_config()
314         self.logger.info(self.vapi.cli("show sr steering policies"))
315
316         # remove SR Policies
317         self.sr_policy.remove_vpp_config()
318         self.logger.info(self.vapi.cli("show sr policies"))
319
320         # remove FIB entries
321         # done by tearDown
322
323         # cleanup interfaces
324         self.teardown_interfaces()
325
326     @unittest.skipUnless(0, "PC to fix")
327     def test_SRv6_T_Encaps_IPv4(self):
328         """ Test SRv6 Transit.Encaps behavior for IPv4.
329         """
330         # send traffic to one destination interface
331         # source interface is IPv4 only
332         # destination interface is IPv6 only
333         self.setup_interfaces(ipv6=[False, True], ipv4=[True, False])
334
335         # configure FIB entries
336         route = VppIpRoute(self, "a4::", 64,
337                            [VppRoutePath(self.pg1.remote_ip6,
338                                          self.pg1.sw_if_index)])
339         route.add_vpp_config()
340
341         # configure encaps IPv6 source address
342         # needs to be done before SR Policy config
343         # TODO: API?
344         self.vapi.cli("set sr encaps source addr a3::")
345
346         bsid = 'a3::9999:1'
347         # configure SRv6 Policy
348         # Note: segment list order: first -> last
349         sr_policy = VppSRv6Policy(
350             self, bsid=bsid,
351             is_encap=1,
352             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
353             weight=1, fib_table=0,
354             segments=['a4::', 'a5::', 'a6::c7'],
355             source='a3::')
356         sr_policy.add_vpp_config()
357         self.sr_policy = sr_policy
358
359         # log the sr policies
360         self.logger.info(self.vapi.cli("show sr policies"))
361
362         # steer IPv4 traffic to 7.1.1.0/24 into SRv6 Policy
363         # use the bsid of the above self.sr_policy
364         pol_steering = VppSRv6Steering(
365                         self,
366                         bsid=self.sr_policy.bsid,
367                         prefix="7.1.1.0", mask_width=24,
368                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV4,
369                         sr_policy_index=0, table_id=0,
370                         sw_if_index=0)
371         pol_steering.add_vpp_config()
372
373         # log the sr steering policies
374         self.logger.info(self.vapi.cli("show sr steering policies"))
375
376         # create packets
377         count = len(self.pg_packet_sizes)
378         dst_inner = '7.1.1.123'
379         pkts = []
380
381         # create IPv4 packets
382         packet_header = self.create_packet_header_IPv4(dst_inner)
383         # create traffic stream pg0->pg1
384         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
385                                        self.pg_packet_sizes, count))
386
387         # send packets and verify received packets
388         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
389                                   self.compare_rx_tx_packet_T_Encaps_IPv4)
390
391         # log the localsid counters
392         self.logger.info(self.vapi.cli("show sr localsid"))
393
394         # remove SR steering
395         pol_steering.remove_vpp_config()
396         self.logger.info(self.vapi.cli("show sr steering policies"))
397
398         # remove SR Policies
399         self.sr_policy.remove_vpp_config()
400         self.logger.info(self.vapi.cli("show sr policies"))
401
402         # remove FIB entries
403         # done by tearDown
404
405         # cleanup interfaces
406         self.teardown_interfaces()
407
408     @unittest.skip("VPP crashes after running this test")
409     def test_SRv6_T_Encaps_L2(self):
410         """ Test SRv6 Transit.Encaps behavior for L2.
411         """
412         # send traffic to one destination interface
413         # source interface is IPv4 only TODO?
414         # destination interface is IPv6 only
415         self.setup_interfaces(ipv6=[False, True], ipv4=[False, False])
416
417         # configure FIB entries
418         route = VppIpRoute(self, "a4::", 64,
419                            [VppRoutePath(self.pg1.remote_ip6,
420                                          self.pg1.sw_if_index)])
421         route.add_vpp_config()
422
423         # configure encaps IPv6 source address
424         # needs to be done before SR Policy config
425         # TODO: API?
426         self.vapi.cli("set sr encaps source addr a3::")
427
428         bsid = 'a3::9999:1'
429         # configure SRv6 Policy
430         # Note: segment list order: first -> last
431         sr_policy = VppSRv6Policy(
432             self, bsid=bsid,
433             is_encap=1,
434             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
435             weight=1, fib_table=0,
436             segments=['a4::', 'a5::', 'a6::c7'],
437             source='a3::')
438         sr_policy.add_vpp_config()
439         self.sr_policy = sr_policy
440
441         # log the sr policies
442         self.logger.info(self.vapi.cli("show sr policies"))
443
444         # steer L2 traffic into SRv6 Policy
445         # use the bsid of the above self.sr_policy
446         pol_steering = VppSRv6Steering(
447                         self,
448                         bsid=self.sr_policy.bsid,
449                         prefix="::", mask_width=0,
450                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_L2,
451                         sr_policy_index=0, table_id=0,
452                         sw_if_index=self.pg0.sw_if_index)
453         pol_steering.add_vpp_config()
454
455         # log the sr steering policies
456         self.logger.info(self.vapi.cli("show sr steering policies"))
457
458         # create packets
459         count = len(self.pg_packet_sizes)
460         pkts = []
461
462         # create L2 packets without dot1q header
463         packet_header = self.create_packet_header_L2()
464         # create traffic stream pg0->pg1
465         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
466                                        self.pg_packet_sizes, count))
467
468         # create L2 packets with dot1q header
469         packet_header = self.create_packet_header_L2(vlan=123)
470         # create traffic stream pg0->pg1
471         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
472                                        self.pg_packet_sizes, count))
473
474         # send packets and verify received packets
475         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
476                                   self.compare_rx_tx_packet_T_Encaps_L2)
477
478         # log the localsid counters
479         self.logger.info(self.vapi.cli("show sr localsid"))
480
481         # remove SR steering
482         pol_steering.remove_vpp_config()
483         self.logger.info(self.vapi.cli("show sr steering policies"))
484
485         # remove SR Policies
486         self.sr_policy.remove_vpp_config()
487         self.logger.info(self.vapi.cli("show sr policies"))
488
489         # remove FIB entries
490         # done by tearDown
491
492         # cleanup interfaces
493         self.teardown_interfaces()
494
495     def test_SRv6_End(self):
496         """ Test SRv6 End (without PSP) behavior.
497         """
498         # send traffic to one destination interface
499         # source and destination interfaces are IPv6 only
500         self.setup_interfaces(ipv6=[True, True])
501
502         # configure FIB entries
503         route = VppIpRoute(self, "a4::", 64,
504                            [VppRoutePath(self.pg1.remote_ip6,
505                                          self.pg1.sw_if_index)])
506         route.add_vpp_config()
507
508         # configure SRv6 localSID End without PSP behavior
509         localsid = VppSRv6LocalSID(
510                         self, localsid={'addr': 'A3::0'},
511                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
512                         nh_addr4='0.0.0.0',
513                         nh_addr6='::',
514                         end_psp=0,
515                         sw_if_index=0,
516                         vlan_index=0,
517                         fib_table=0)
518         localsid.add_vpp_config()
519         # log the localsids
520         self.logger.debug(self.vapi.cli("show sr localsid"))
521
522         # create IPv6 packets with SRH (SL=2, SL=1, SL=0)
523         # send one packet per SL value per packet size
524         # SL=0 packet with localSID End with USP needs 2nd SRH
525         count = len(self.pg_packet_sizes)
526         dst_inner = 'a4::1234'
527         pkts = []
528
529         # packets with segments-left 2, active segment a3::
530         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
531                 dst_inner,
532                 sidlist=['a5::', 'a4::', 'a3::'],
533                 segleft=2)
534         # create traffic stream pg0->pg1
535         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
536                                        self.pg_packet_sizes, count))
537
538         # packets with segments-left 1, active segment a3::
539         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
540                 dst_inner,
541                 sidlist=['a4::', 'a3::', 'a2::'],
542                 segleft=1)
543         # add to traffic stream pg0->pg1
544         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
545                                        self.pg_packet_sizes, count))
546
547         # TODO: test behavior with SL=0 packet (needs 2*SRH?)
548
549         # send packets and verify received packets
550         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
551                                   self.compare_rx_tx_packet_End)
552
553         # log the localsid counters
554         self.logger.info(self.vapi.cli("show sr localsid"))
555
556         # remove SRv6 localSIDs
557         localsid.remove_vpp_config()
558
559         # remove FIB entries
560         # done by tearDown
561
562         # cleanup interfaces
563         self.teardown_interfaces()
564
565     def test_SRv6_End_with_PSP(self):
566         """ Test SRv6 End with PSP behavior.
567         """
568         # send traffic to one destination interface
569         # source and destination interfaces are IPv6 only
570         self.setup_interfaces(ipv6=[True, True])
571
572         # configure FIB entries
573         route = VppIpRoute(self, "a4::", 64,
574                            [VppRoutePath(self.pg1.remote_ip6,
575                                          self.pg1.sw_if_index)])
576         route.add_vpp_config()
577
578         # configure SRv6 localSID End with PSP behavior
579         localsid = VppSRv6LocalSID(
580                         self, localsid={'addr': 'A3::0'},
581                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
582                         nh_addr4='0.0.0.0',
583                         nh_addr6='::',
584                         end_psp=1,
585                         sw_if_index=0,
586                         vlan_index=0,
587                         fib_table=0)
588         localsid.add_vpp_config()
589         # log the localsids
590         self.logger.debug(self.vapi.cli("show sr localsid"))
591
592         # create IPv6 packets with SRH (SL=2, SL=1)
593         # send one packet per SL value per packet size
594         # SL=0 packet with localSID End with PSP is dropped
595         count = len(self.pg_packet_sizes)
596         dst_inner = 'a4::1234'
597         pkts = []
598
599         # packets with segments-left 2, active segment a3::
600         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
601                     dst_inner,
602                     sidlist=['a5::', 'a4::', 'a3::'],
603                     segleft=2)
604         # create traffic stream pg0->pg1
605         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
606                                        self.pg_packet_sizes, count))
607
608         # packets with segments-left 1, active segment a3::
609         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
610                     dst_inner,
611                     sidlist=['a4::', 'a3::', 'a2::'],
612                     segleft=1)
613         # add to traffic stream pg0->pg1
614         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
615                                        self.pg_packet_sizes, count))
616
617         # send packets and verify received packets
618         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
619                                   self.compare_rx_tx_packet_End_PSP)
620
621         # log the localsid counters
622         self.logger.info(self.vapi.cli("show sr localsid"))
623
624         # remove SRv6 localSIDs
625         localsid.remove_vpp_config()
626
627         # remove FIB entries
628         # done by tearDown
629
630         # cleanup interfaces
631         self.teardown_interfaces()
632
633     def test_SRv6_End_X(self):
634         """ Test SRv6 End.X (without PSP) behavior.
635         """
636         # create three interfaces (1 source, 2 destinations)
637         # source and destination interfaces are IPv6 only
638         self.setup_interfaces(ipv6=[True, True, True])
639
640         # configure FIB entries
641         # a4::/64 via pg1 and pg2
642         route = VppIpRoute(self, "a4::", 64,
643                            [VppRoutePath(self.pg1.remote_ip6,
644                                          self.pg1.sw_if_index),
645                             VppRoutePath(self.pg2.remote_ip6,
646                                          self.pg2.sw_if_index)])
647         route.add_vpp_config()
648         self.logger.debug(self.vapi.cli("show ip6 fib"))
649
650         # configure SRv6 localSID End.X without PSP behavior
651         # End.X points to interface pg1
652         localsid = VppSRv6LocalSID(
653                         self, localsid={'addr': 'A3::C4'},
654                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
655                         nh_addr4='0.0.0.0',
656                         nh_addr6=self.pg1.remote_ip6,
657                         end_psp=0,
658                         sw_if_index=self.pg1.sw_if_index,
659                         vlan_index=0,
660                         fib_table=0)
661         localsid.add_vpp_config()
662         # log the localsids
663         self.logger.debug(self.vapi.cli("show sr localsid"))
664
665         # create IPv6 packets with SRH (SL=2, SL=1)
666         # send one packet per SL value per packet size
667         # SL=0 packet with localSID End with PSP is dropped
668         count = len(self.pg_packet_sizes)
669         dst_inner = 'a4::1234'
670         pkts = []
671
672         # packets with segments-left 2, active segment a3::c4
673         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
674                     dst_inner,
675                     sidlist=['a5::', 'a4::', 'a3::c4'],
676                     segleft=2)
677         # create traffic stream pg0->pg1
678         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
679                                        self.pg_packet_sizes, count))
680
681         # packets with segments-left 1, active segment a3::c4
682         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
683                     dst_inner,
684                     sidlist=['a4::', 'a3::c4', 'a2::'],
685                     segleft=1)
686         # add to traffic stream pg0->pg1
687         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
688                                        self.pg_packet_sizes, count))
689
690         # send packets and verify received packets
691         # using same comparison function as End (no PSP)
692         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
693                                   self.compare_rx_tx_packet_End)
694
695         # assert nothing was received on the other interface (pg2)
696         self.pg2.assert_nothing_captured("mis-directed packet(s)")
697
698         # log the localsid counters
699         self.logger.info(self.vapi.cli("show sr localsid"))
700
701         # remove SRv6 localSIDs
702         localsid.remove_vpp_config()
703
704         # remove FIB entries
705         # done by tearDown
706
707         # cleanup interfaces
708         self.teardown_interfaces()
709
710     def test_SRv6_End_X_with_PSP(self):
711         """ Test SRv6 End.X with PSP behavior.
712         """
713         # create three interfaces (1 source, 2 destinations)
714         # source and destination interfaces are IPv6 only
715         self.setup_interfaces(ipv6=[True, True, True])
716
717         # configure FIB entries
718         # a4::/64 via pg1 and pg2
719         route = VppIpRoute(self, "a4::", 64,
720                            [VppRoutePath(
721                                self.pg1.remote_ip6,
722                                self.pg1.sw_if_index),
723                             VppRoutePath(self.pg2.remote_ip6,
724                                          self.pg2.sw_if_index)])
725         route.add_vpp_config()
726
727         # configure SRv6 localSID End with PSP behavior
728         localsid = VppSRv6LocalSID(
729                         self, localsid={'addr': 'A3::C4'},
730                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
731                         nh_addr4='0.0.0.0',
732                         nh_addr6=self.pg1.remote_ip6,
733                         end_psp=1,
734                         sw_if_index=self.pg1.sw_if_index,
735                         vlan_index=0,
736                         fib_table=0)
737         localsid.add_vpp_config()
738         # log the localsids
739         self.logger.debug(self.vapi.cli("show sr localsid"))
740
741         # create IPv6 packets with SRH (SL=2, SL=1)
742         # send one packet per SL value per packet size
743         # SL=0 packet with localSID End with PSP is dropped
744         count = len(self.pg_packet_sizes)
745         dst_inner = 'a4::1234'
746         pkts = []
747
748         # packets with segments-left 2, active segment a3::
749         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
750                     dst_inner,
751                     sidlist=['a5::', 'a4::', 'a3::c4'],
752                     segleft=2)
753         # create traffic stream pg0->pg1
754         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
755                                        self.pg_packet_sizes, count))
756
757         # packets with segments-left 1, active segment a3::
758         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
759                     dst_inner,
760                     sidlist=['a4::', 'a3::c4', 'a2::'],
761                     segleft=1)
762         # add to traffic stream pg0->pg1
763         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
764                                        self.pg_packet_sizes, count))
765
766         # send packets and verify received packets
767         # using same comparison function as End with PSP
768         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
769                                   self.compare_rx_tx_packet_End_PSP)
770
771         # assert nothing was received on the other interface (pg2)
772         self.pg2.assert_nothing_captured("mis-directed packet(s)")
773
774         # log the localsid counters
775         self.logger.info(self.vapi.cli("show sr localsid"))
776
777         # remove SRv6 localSIDs
778         localsid.remove_vpp_config()
779
780         # remove FIB entries
781         # done by tearDown
782
783         # cleanup interfaces
784         self.teardown_interfaces()
785
786     def test_SRv6_End_DX6(self):
787         """ Test SRv6 End.DX6 behavior.
788         """
789         # send traffic to one destination interface
790         # source and destination interfaces are IPv6 only
791         self.setup_interfaces(ipv6=[True, True])
792
793         # configure SRv6 localSID End.DX6 behavior
794         localsid = VppSRv6LocalSID(
795                         self, localsid={'addr': 'A3::C4'},
796                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX6,
797                         nh_addr4='0.0.0.0',
798                         nh_addr6=self.pg1.remote_ip6,
799                         end_psp=0,
800                         sw_if_index=self.pg1.sw_if_index,
801                         vlan_index=0,
802                         fib_table=0)
803         localsid.add_vpp_config()
804         # log the localsids
805         self.logger.debug(self.vapi.cli("show sr localsid"))
806
807         # create IPv6 packets with SRH (SL=0)
808         # send one packet per packet size
809         count = len(self.pg_packet_sizes)
810         dst_inner = 'a4::1234'  # inner header destination address
811         pkts = []
812
813         # packets with SRH, segments-left 0, active segment a3::c4
814         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
815                         dst_inner,
816                         sidlist=['a3::c4', 'a2::', 'a1::'],
817                         segleft=0)
818         # add to traffic stream pg0->pg1
819         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
820                                        self.pg_packet_sizes, count))
821
822         # packets without SRH, IPv6 in IPv6
823         # outer IPv6 dest addr is the localsid End.DX6
824         packet_header = self.create_packet_header_IPv6_IPv6(
825                                             dst_inner,
826                                             dst_outer='a3::c4')
827         # add to traffic stream pg0->pg1
828         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
829                                        self.pg_packet_sizes, count))
830
831         # send packets and verify received packets
832         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
833                                   self.compare_rx_tx_packet_End_DX6)
834
835         # log the localsid counters
836         self.logger.info(self.vapi.cli("show sr localsid"))
837
838         # remove SRv6 localSIDs
839         localsid.remove_vpp_config()
840
841         # cleanup interfaces
842         self.teardown_interfaces()
843
844     def test_SRv6_End_DT6(self):
845         """ Test SRv6 End.DT6 behavior.
846         """
847         # create three interfaces (1 source, 2 destinations)
848         # all interfaces are IPv6 only
849         # source interface in global FIB (0)
850         # destination interfaces in global and vrf
851         vrf_1 = 1
852         ipt = VppIpTable(self, vrf_1, is_ip6=True)
853         ipt.add_vpp_config()
854         self.setup_interfaces(ipv6=[True, True, True],
855                               ipv6_table_id=[0, 0, vrf_1])
856
857         # configure FIB entries
858         # a4::/64 is reachable
859         #     via pg1 in table 0 (global)
860         #     and via pg2 in table vrf_1
861         route0 = VppIpRoute(self, "a4::", 64,
862                             [VppRoutePath(self.pg1.remote_ip6,
863                                           self.pg1.sw_if_index,
864                                           nh_table_id=0)],
865                             table_id=0)
866         route0.add_vpp_config()
867         route1 = VppIpRoute(self, "a4::", 64,
868                             [VppRoutePath(self.pg2.remote_ip6,
869                                           self.pg2.sw_if_index,
870                                           nh_table_id=vrf_1)],
871                             table_id=vrf_1)
872         route1.add_vpp_config()
873         self.logger.debug(self.vapi.cli("show ip6 fib"))
874
875         # configure SRv6 localSID End.DT6 behavior
876         # Note:
877         # fib_table: where the localsid is installed
878         # sw_if_index: in T-variants of localsid this is the vrf table_id
879         localsid = VppSRv6LocalSID(
880                         self, localsid={'addr': 'A3::C4'},
881                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT6,
882                         nh_addr4='0.0.0.0',
883                         nh_addr6='::',
884                         end_psp=0,
885                         sw_if_index=vrf_1,
886                         vlan_index=0,
887                         fib_table=0)
888         localsid.add_vpp_config()
889         # log the localsids
890         self.logger.debug(self.vapi.cli("show sr localsid"))
891
892         # create IPv6 packets with SRH (SL=0)
893         # send one packet per packet size
894         count = len(self.pg_packet_sizes)
895         dst_inner = 'a4::1234'  # inner header destination address
896         pkts = []
897
898         # packets with SRH, segments-left 0, active segment a3::c4
899         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
900                         dst_inner,
901                         sidlist=['a3::c4', 'a2::', 'a1::'],
902                         segleft=0)
903         # add to traffic stream pg0->pg1
904         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
905                                        self.pg_packet_sizes, count))
906
907         # packets without SRH, IPv6 in IPv6
908         # outer IPv6 dest addr is the localsid End.DT6
909         packet_header = self.create_packet_header_IPv6_IPv6(
910                                             dst_inner,
911                                             dst_outer='a3::c4')
912         # add to traffic stream pg0->pg1
913         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
914                                        self.pg_packet_sizes, count))
915
916         # send packets and verify received packets
917         # using same comparison function as End.DX6
918         self.send_and_verify_pkts(self.pg0, pkts, self.pg2,
919                                   self.compare_rx_tx_packet_End_DX6)
920
921         # assert nothing was received on the other interface (pg2)
922         self.pg1.assert_nothing_captured("mis-directed packet(s)")
923
924         # log the localsid counters
925         self.logger.info(self.vapi.cli("show sr localsid"))
926
927         # remove SRv6 localSIDs
928         localsid.remove_vpp_config()
929
930         # remove FIB entries
931         # done by tearDown
932
933         # cleanup interfaces
934         self.teardown_interfaces()
935
936     def test_SRv6_End_DX4(self):
937         """ Test SRv6 End.DX4 behavior.
938         """
939         # send traffic to one destination interface
940         # source interface is IPv6 only
941         # destination interface is IPv4 only
942         self.setup_interfaces(ipv6=[True, False], ipv4=[False, True])
943
944         # configure SRv6 localSID End.DX4 behavior
945         localsid = VppSRv6LocalSID(
946                         self, localsid={'addr': 'A3::C4'},
947                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX4,
948                         nh_addr4=self.pg1.remote_ip4,
949                         nh_addr6='::',
950                         end_psp=0,
951                         sw_if_index=self.pg1.sw_if_index,
952                         vlan_index=0,
953                         fib_table=0)
954         localsid.add_vpp_config()
955         # log the localsids
956         self.logger.debug(self.vapi.cli("show sr localsid"))
957
958         # send one packet per packet size
959         count = len(self.pg_packet_sizes)
960         dst_inner = '4.1.1.123'  # inner header destination address
961         pkts = []
962
963         # packets with SRH, segments-left 0, active segment a3::c4
964         packet_header = self.create_packet_header_IPv6_SRH_IPv4(
965                         dst_inner,
966                         sidlist=['a3::c4', 'a2::', 'a1::'],
967                         segleft=0)
968         # add to traffic stream pg0->pg1
969         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
970                                        self.pg_packet_sizes, count))
971
972         # packets without SRH, IPv4 in IPv6
973         # outer IPv6 dest addr is the localsid End.DX4
974         packet_header = self.create_packet_header_IPv6_IPv4(
975                                             dst_inner,
976                                             dst_outer='a3::c4')
977         # add to traffic stream pg0->pg1
978         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
979                                        self.pg_packet_sizes, count))
980
981         # send packets and verify received packets
982         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
983                                   self.compare_rx_tx_packet_End_DX4)
984
985         # log the localsid counters
986         self.logger.info(self.vapi.cli("show sr localsid"))
987
988         # remove SRv6 localSIDs
989         localsid.remove_vpp_config()
990
991         # cleanup interfaces
992         self.teardown_interfaces()
993
994     def test_SRv6_End_DT4(self):
995         """ Test SRv6 End.DT4 behavior.
996         """
997         # create three interfaces (1 source, 2 destinations)
998         # source interface is IPv6-only
999         # destination interfaces are IPv4 only
1000         # source interface in global FIB (0)
1001         # destination interfaces in global and vrf
1002         vrf_1 = 1
1003         ipt = VppIpTable(self, vrf_1)
1004         ipt.add_vpp_config()
1005         self.setup_interfaces(ipv6=[True, False, False],
1006                               ipv4=[False, True, True],
1007                               ipv6_table_id=[0, 0, 0],
1008                               ipv4_table_id=[0, 0, vrf_1])
1009
1010         # configure FIB entries
1011         # 4.1.1.0/24 is reachable
1012         #     via pg1 in table 0 (global)
1013         #     and via pg2 in table vrf_1
1014         route0 = VppIpRoute(self, "4.1.1.0", 24,
1015                             [VppRoutePath(self.pg1.remote_ip4,
1016                                           self.pg1.sw_if_index,
1017                                           nh_table_id=0)],
1018                             table_id=0)
1019         route0.add_vpp_config()
1020         route1 = VppIpRoute(self, "4.1.1.0", 24,
1021                             [VppRoutePath(self.pg2.remote_ip4,
1022                                           self.pg2.sw_if_index,
1023                                           nh_table_id=vrf_1)],
1024                             table_id=vrf_1)
1025         route1.add_vpp_config()
1026         self.logger.debug(self.vapi.cli("show ip fib"))
1027
1028         # configure SRv6 localSID End.DT6 behavior
1029         # Note:
1030         # fib_table: where the localsid is installed
1031         # sw_if_index: in T-variants of localsid: vrf table_id
1032         localsid = VppSRv6LocalSID(
1033                         self, localsid={'addr': 'A3::C4'},
1034                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT4,
1035                         nh_addr4='0.0.0.0',
1036                         nh_addr6='::',
1037                         end_psp=0,
1038                         sw_if_index=vrf_1,
1039                         vlan_index=0,
1040                         fib_table=0)
1041         localsid.add_vpp_config()
1042         # log the localsids
1043         self.logger.debug(self.vapi.cli("show sr localsid"))
1044
1045         # create IPv6 packets with SRH (SL=0)
1046         # send one packet per packet size
1047         count = len(self.pg_packet_sizes)
1048         dst_inner = '4.1.1.123'  # inner header destination address
1049         pkts = []
1050
1051         # packets with SRH, segments-left 0, active segment a3::c4
1052         packet_header = self.create_packet_header_IPv6_SRH_IPv4(
1053                         dst_inner,
1054                         sidlist=['a3::c4', 'a2::', 'a1::'],
1055                         segleft=0)
1056         # add to traffic stream pg0->pg1
1057         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
1058                                        self.pg_packet_sizes, count))
1059
1060         # packets without SRH, IPv6 in IPv6
1061         # outer IPv6 dest addr is the localsid End.DX4
1062         packet_header = self.create_packet_header_IPv6_IPv4(
1063                                             dst_inner,
1064                                             dst_outer='a3::c4')
1065         # add to traffic stream pg0->pg1
1066         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
1067                                        self.pg_packet_sizes, count))
1068
1069         # send packets and verify received packets
1070         # using same comparison function as End.DX4
1071         self.send_and_verify_pkts(self.pg0, pkts, self.pg2,
1072                                   self.compare_rx_tx_packet_End_DX4)
1073
1074         # assert nothing was received on the other interface (pg2)
1075         self.pg1.assert_nothing_captured("mis-directed packet(s)")
1076
1077         # log the localsid counters
1078         self.logger.info(self.vapi.cli("show sr localsid"))
1079
1080         # remove SRv6 localSIDs
1081         localsid.remove_vpp_config()
1082
1083         # remove FIB entries
1084         # done by tearDown
1085
1086         # cleanup interfaces
1087         self.teardown_interfaces()
1088
1089     def test_SRv6_End_DX2(self):
1090         """ Test SRv6 End.DX2 behavior.
1091         """
1092         # send traffic to one destination interface
1093         # source interface is IPv6 only
1094         self.setup_interfaces(ipv6=[True, False], ipv4=[False, False])
1095
1096         # configure SRv6 localSID End.DX2 behavior
1097         localsid = VppSRv6LocalSID(
1098                         self, localsid={'addr': 'A3::C4'},
1099                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX2,
1100                         nh_addr4='0.0.0.0',
1101                         nh_addr6='::',
1102                         end_psp=0,
1103                         sw_if_index=self.pg1.sw_if_index,
1104                         vlan_index=0,
1105                         fib_table=0)
1106         localsid.add_vpp_config()
1107         # log the localsids
1108         self.logger.debug(self.vapi.cli("show sr localsid"))
1109
1110         # send one packet per packet size
1111         count = len(self.pg_packet_sizes)
1112         pkts = []
1113
1114         # packets with SRH, segments-left 0, active segment a3::c4
1115         # L2 has no dot1q header
1116         packet_header = self.create_packet_header_IPv6_SRH_L2(
1117                             sidlist=['a3::c4', 'a2::', 'a1::'],
1118                             segleft=0,
1119                             vlan=0)
1120         # add to traffic stream pg0->pg1
1121         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1122                                        self.pg_packet_sizes, count))
1123
1124         # packets with SRH, segments-left 0, active segment a3::c4
1125         # L2 has dot1q header
1126         packet_header = self.create_packet_header_IPv6_SRH_L2(
1127                             sidlist=['a3::c4', 'a2::', 'a1::'],
1128                             segleft=0,
1129                             vlan=123)
1130         # add to traffic stream pg0->pg1
1131         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1132                                        self.pg_packet_sizes, count))
1133
1134         # packets without SRH, L2 in IPv6
1135         # outer IPv6 dest addr is the localsid End.DX2
1136         # L2 has no dot1q header
1137         packet_header = self.create_packet_header_IPv6_L2(
1138                                             dst_outer='a3::c4',
1139                                             vlan=0)
1140         # add to traffic stream pg0->pg1
1141         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1142                                        self.pg_packet_sizes, count))
1143
1144         # packets without SRH, L2 in IPv6
1145         # outer IPv6 dest addr is the localsid End.DX2
1146         # L2 has dot1q header
1147         packet_header = self.create_packet_header_IPv6_L2(
1148                                             dst_outer='a3::c4',
1149                                             vlan=123)
1150         # add to traffic stream pg0->pg1
1151         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1152                                        self.pg_packet_sizes, count))
1153
1154         # send packets and verify received packets
1155         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
1156                                   self.compare_rx_tx_packet_End_DX2)
1157
1158         # log the localsid counters
1159         self.logger.info(self.vapi.cli("show sr localsid"))
1160
1161         # remove SRv6 localSIDs
1162         localsid.remove_vpp_config()
1163
1164         # cleanup interfaces
1165         self.teardown_interfaces()
1166
1167     @unittest.skipUnless(0, "PC to fix")
1168     def test_SRv6_T_Insert_Classifier(self):
1169         """ Test SRv6 Transit.Insert behavior (IPv6 only).
1170             steer packets using the classifier
1171         """
1172         # send traffic to one destination interface
1173         # source and destination are IPv6 only
1174         self.setup_interfaces(ipv6=[False, False, False, True, True])
1175
1176         # configure FIB entries
1177         route = VppIpRoute(self, "a4::", 64,
1178                            [VppRoutePath(
1179                                self.pg4.remote_ip6,
1180                                self.pg4.sw_if_index)])
1181         route.add_vpp_config()
1182
1183         # configure encaps IPv6 source address
1184         # needs to be done before SR Policy config
1185         # TODO: API?
1186         self.vapi.cli("set sr encaps source addr a3::")
1187
1188         bsid = 'a3::9999:1'
1189         # configure SRv6 Policy
1190         # Note: segment list order: first -> last
1191         sr_policy = VppSRv6Policy(
1192             self, bsid=bsid,
1193             is_encap=0,
1194             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
1195             weight=1, fib_table=0,
1196             segments=['a4::', 'a5::', 'a6::c7'],
1197             source='a3::')
1198         sr_policy.add_vpp_config()
1199         self.sr_policy = sr_policy
1200
1201         # log the sr policies
1202         self.logger.info(self.vapi.cli("show sr policies"))
1203
1204         # add classify table
1205         # mask on dst ip address prefix a7::/8
1206         mask = '{!s:0<16}'.format('ff')
1207         r = self.vapi.classify_add_del_table(
1208             1,
1209             binascii.unhexlify(mask),
1210             match_n_vectors=(len(mask) - 1) // 32 + 1,
1211             current_data_flag=1,
1212             skip_n_vectors=2)  # data offset
1213         self.assertIsNotNone(r, 'No response msg for add_del_table')
1214         table_index = r.new_table_index
1215
1216         # add the source routing node as a ip6 inacl netxt node
1217         r = self.vapi.add_node_next('ip6-inacl',
1218                                     'sr-pl-rewrite-insert')
1219         inacl_next_node_index = r.node_index
1220
1221         match = '{!s:0<16}'.format('a7')
1222         r = self.vapi.classify_add_del_session(
1223             1,
1224             table_index,
1225             binascii.unhexlify(match),
1226             hit_next_index=inacl_next_node_index,
1227             action=3,
1228             metadata=0)  # sr policy index
1229         self.assertIsNotNone(r, 'No response msg for add_del_session')
1230
1231         # log the classify table used in the steering policy
1232         self.logger.info(self.vapi.cli("show classify table"))
1233
1234         r = self.vapi.input_acl_set_interface(
1235             is_add=1,
1236             sw_if_index=self.pg3.sw_if_index,
1237             ip6_table_index=table_index)
1238         self.assertIsNotNone(r,
1239                              'No response msg for input_acl_set_interface')
1240
1241         # log the ip6 inacl
1242         self.logger.info(self.vapi.cli("show inacl type ip6"))
1243
1244         # create packets
1245         count = len(self.pg_packet_sizes)
1246         dst_inner = 'a7::1234'
1247         pkts = []
1248
1249         # create IPv6 packets without SRH
1250         packet_header = self.create_packet_header_IPv6(dst_inner)
1251         # create traffic stream pg3->pg4
1252         pkts.extend(self.create_stream(self.pg3, self.pg4, packet_header,
1253                                        self.pg_packet_sizes, count))
1254
1255         # create IPv6 packets with SRH
1256         # packets with segments-left 1, active segment a7::
1257         packet_header = self.create_packet_header_IPv6_SRH(
1258             sidlist=['a8::', 'a7::', 'a6::'],
1259             segleft=1)
1260         # create traffic stream pg3->pg4
1261         pkts.extend(self.create_stream(self.pg3, self.pg4, packet_header,
1262                                        self.pg_packet_sizes, count))
1263
1264         # send packets and verify received packets
1265         self.send_and_verify_pkts(self.pg3, pkts, self.pg4,
1266                                   self.compare_rx_tx_packet_T_Insert)
1267
1268         # remove the interface l2 input feature
1269         r = self.vapi.input_acl_set_interface(
1270             is_add=0,
1271             sw_if_index=self.pg3.sw_if_index,
1272             ip6_table_index=table_index)
1273         self.assertIsNotNone(r,
1274                              'No response msg for input_acl_set_interface')
1275
1276         # log the ip6 inacl after cleaning
1277         self.logger.info(self.vapi.cli("show inacl type ip6"))
1278
1279         # log the localsid counters
1280         self.logger.info(self.vapi.cli("show sr localsid"))
1281
1282         # remove classifier SR steering
1283         # classifier_steering.remove_vpp_config()
1284         self.logger.info(self.vapi.cli("show sr steering policies"))
1285
1286         # remove SR Policies
1287         self.sr_policy.remove_vpp_config()
1288         self.logger.info(self.vapi.cli("show sr policies"))
1289
1290         # remove classify session and table
1291         r = self.vapi.classify_add_del_session(
1292             0,
1293             table_index,
1294             binascii.unhexlify(match))
1295         self.assertIsNotNone(r, 'No response msg for add_del_session')
1296
1297         r = self.vapi.classify_add_del_table(
1298             0,
1299             binascii.unhexlify(mask),
1300             table_index=table_index)
1301         self.assertIsNotNone(r, 'No response msg for add_del_table')
1302
1303         self.logger.info(self.vapi.cli("show classify table"))
1304
1305         # remove FIB entries
1306         # done by tearDown
1307
1308         # cleanup interfaces
1309         self.teardown_interfaces()
1310
1311     def compare_rx_tx_packet_T_Encaps(self, tx_pkt, rx_pkt):
1312         """ Compare input and output packet after passing T.Encaps
1313
1314         :param tx_pkt: transmitted packet
1315         :param rx_pkt: received packet
1316         """
1317         # T.Encaps updates the headers as follows:
1318         # SR Policy seglist (S3, S2, S1)
1319         # SR Policy source C
1320         # IPv6:
1321         # in: IPv6(A, B2)
1322         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(A, B2)
1323         # IPv6 + SRH:
1324         # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1)
1325         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(a, B2)SRH(B3, B2, B1; SL=1)
1326
1327         # get first (outer) IPv6 header of rx'ed packet
1328         rx_ip = rx_pkt.getlayer(IPv6)
1329         rx_srh = None
1330
1331         tx_ip = tx_pkt.getlayer(IPv6)
1332
1333         # expected segment-list
1334         seglist = self.sr_policy.segments
1335         # reverse list to get order as in SRH
1336         tx_seglist = seglist[::-1]
1337
1338         # get source address of SR Policy
1339         sr_policy_source = self.sr_policy.source
1340
1341         # rx'ed packet should have SRH
1342         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1343         # get SRH
1344         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1345
1346         # received ip.src should be equal to SR Policy source
1347         self.assertEqual(rx_ip.src, sr_policy_source)
1348         # received ip.dst should be equal to expected sidlist[lastentry]
1349         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1350         # rx'ed seglist should be equal to expected seglist
1351         self.assertEqual(rx_srh.addresses, tx_seglist)
1352         # segleft should be equal to size expected seglist-1
1353         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1354         # segleft should be equal to lastentry
1355         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1356
1357         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1358         # except for the hop-limit field
1359         #   -> update tx'ed hlim to the expected hlim
1360         tx_ip.hlim = tx_ip.hlim - 1
1361
1362         self.assertEqual(rx_srh.payload, tx_ip)
1363
1364         self.logger.debug("packet verification: SUCCESS")
1365
1366     def compare_rx_tx_packet_T_Encaps_IPv4(self, tx_pkt, rx_pkt):
1367         """ Compare input and output packet after passing T.Encaps for IPv4
1368
1369         :param tx_pkt: transmitted packet
1370         :param rx_pkt: received packet
1371         """
1372         # T.Encaps for IPv4 updates the headers as follows:
1373         # SR Policy seglist (S3, S2, S1)
1374         # SR Policy source C
1375         # IPv4:
1376         # in: IPv4(A, B2)
1377         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv4(A, B2)
1378
1379         # get first (outer) IPv6 header of rx'ed packet
1380         rx_ip = rx_pkt.getlayer(IPv6)
1381         rx_srh = None
1382
1383         tx_ip = tx_pkt.getlayer(IP)
1384
1385         # expected segment-list
1386         seglist = self.sr_policy.segments
1387         # reverse list to get order as in SRH
1388         tx_seglist = seglist[::-1]
1389
1390         # get source address of SR Policy
1391         sr_policy_source = self.sr_policy.source
1392
1393         # checks common to cases tx with and without SRH
1394         # rx'ed packet should have SRH and IPv4 header
1395         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1396         self.assertTrue(rx_ip.payload.haslayer(IP))
1397         # get SRH
1398         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1399
1400         # received ip.src should be equal to SR Policy source
1401         self.assertEqual(rx_ip.src, sr_policy_source)
1402         # received ip.dst should be equal to sidlist[lastentry]
1403         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1404         # rx'ed seglist should be equal to seglist
1405         self.assertEqual(rx_srh.addresses, tx_seglist)
1406         # segleft should be equal to size seglist-1
1407         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1408         # segleft should be equal to lastentry
1409         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1410
1411         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1412         # except for the ttl field and ip checksum
1413         #   -> adjust tx'ed ttl to expected ttl
1414         tx_ip.ttl = tx_ip.ttl - 1
1415         #   -> set tx'ed ip checksum to None and let scapy recompute
1416         tx_ip.chksum = None
1417         # read back the pkt (with str()) to force computing these fields
1418         # probably other ways to accomplish this are possible
1419         tx_ip = IP(scapy.compat.raw(tx_ip))
1420
1421         self.assertEqual(rx_srh.payload, tx_ip)
1422
1423         self.logger.debug("packet verification: SUCCESS")
1424
1425     def compare_rx_tx_packet_T_Encaps_L2(self, tx_pkt, rx_pkt):
1426         """ Compare input and output packet after passing T.Encaps for L2
1427
1428         :param tx_pkt: transmitted packet
1429         :param rx_pkt: received packet
1430         """
1431         # T.Encaps for L2 updates the headers as follows:
1432         # SR Policy seglist (S3, S2, S1)
1433         # SR Policy source C
1434         # L2:
1435         # in: L2
1436         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)L2
1437
1438         # get first (outer) IPv6 header of rx'ed packet
1439         rx_ip = rx_pkt.getlayer(IPv6)
1440         rx_srh = None
1441
1442         tx_ether = tx_pkt.getlayer(Ether)
1443
1444         # expected segment-list
1445         seglist = self.sr_policy.segments
1446         # reverse list to get order as in SRH
1447         tx_seglist = seglist[::-1]
1448
1449         # get source address of SR Policy
1450         sr_policy_source = self.sr_policy.source
1451
1452         # rx'ed packet should have SRH
1453         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1454         # get SRH
1455         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1456
1457         # received ip.src should be equal to SR Policy source
1458         self.assertEqual(rx_ip.src, sr_policy_source)
1459         # received ip.dst should be equal to sidlist[lastentry]
1460         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1461         # rx'ed seglist should be equal to seglist
1462         self.assertEqual(rx_srh.addresses, tx_seglist)
1463         # segleft should be equal to size seglist-1
1464         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1465         # segleft should be equal to lastentry
1466         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1467         # nh should be "No Next Header" (59)
1468         self.assertEqual(rx_srh.nh, 59)
1469
1470         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1471         self.assertEqual(Ether(scapy.compat.raw(rx_srh.payload)), tx_ether)
1472
1473         self.logger.debug("packet verification: SUCCESS")
1474
1475     def compare_rx_tx_packet_T_Insert(self, tx_pkt, rx_pkt):
1476         """ Compare input and output packet after passing T.Insert
1477
1478         :param tx_pkt: transmitted packet
1479         :param rx_pkt: received packet
1480         """
1481         # T.Insert updates the headers as follows:
1482         # IPv6:
1483         # in: IPv6(A, B2)
1484         # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)
1485         # IPv6 + SRH:
1486         # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1)
1487         # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)SRH(B3, B2, B1; SL=1)
1488
1489         # get first (outer) IPv6 header of rx'ed packet
1490         rx_ip = rx_pkt.getlayer(IPv6)
1491         rx_srh = None
1492         rx_ip2 = None
1493         rx_srh2 = None
1494         rx_ip3 = None
1495         rx_udp = rx_pkt[UDP]
1496
1497         tx_ip = tx_pkt.getlayer(IPv6)
1498         tx_srh = None
1499         tx_ip2 = None
1500         # some packets have been tx'ed with an SRH, some without it
1501         # get SRH if tx'ed packet has it
1502         if tx_pkt.haslayer(IPv6ExtHdrSegmentRouting):
1503             tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1504             tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1505         tx_udp = tx_pkt[UDP]
1506
1507         # expected segment-list (make copy of SR Policy segment list)
1508         seglist = self.sr_policy.segments[:]
1509         # expected seglist has initial dest addr as last segment
1510         seglist.append(tx_ip.dst)
1511         # reverse list to get order as in SRH
1512         tx_seglist = seglist[::-1]
1513
1514         # get source address of SR Policy
1515         sr_policy_source = self.sr_policy.source
1516
1517         # checks common to cases tx with and without SRH
1518         # rx'ed packet should have SRH and only one IPv6 header
1519         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1520         self.assertFalse(rx_ip.payload.haslayer(IPv6))
1521         # get SRH
1522         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1523
1524         # rx'ed ip.src should be equal to tx'ed ip.src
1525         self.assertEqual(rx_ip.src, tx_ip.src)
1526         # rx'ed ip.dst should be equal to sidlist[lastentry]
1527         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1528
1529         # rx'ed seglist should be equal to expected seglist
1530         self.assertEqual(rx_srh.addresses, tx_seglist)
1531         # segleft should be equal to size(expected seglist)-1
1532         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1533         # segleft should be equal to lastentry
1534         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1535
1536         if tx_srh:  # packet was tx'ed with SRH
1537             # packet should have 2nd SRH
1538             self.assertTrue(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
1539             # get 2nd SRH
1540             rx_srh2 = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting, 2)
1541
1542             # rx'ed srh2.addresses should be equal to tx'ed srh.addresses
1543             self.assertEqual(rx_srh2.addresses, tx_srh.addresses)
1544             # rx'ed srh2.segleft should be equal to tx'ed srh.segleft
1545             self.assertEqual(rx_srh2.segleft, tx_srh.segleft)
1546             # rx'ed srh2.lastentry should be equal to tx'ed srh.lastentry
1547             self.assertEqual(rx_srh2.lastentry, tx_srh.lastentry)
1548
1549         else:  # packet was tx'ed without SRH
1550             # rx packet should have no other SRH
1551             self.assertFalse(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
1552
1553         # UDP layer should be unchanged
1554         self.assertEqual(rx_udp, tx_udp)
1555
1556         self.logger.debug("packet verification: SUCCESS")
1557
1558     def compare_rx_tx_packet_End(self, tx_pkt, rx_pkt):
1559         """ Compare input and output packet after passing End (without PSP)
1560
1561         :param tx_pkt: transmitted packet
1562         :param rx_pkt: received packet
1563         """
1564         # End (no PSP) updates the headers as follows:
1565         # IPv6 + SRH:
1566         # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2)
1567         # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1568
1569         # get first (outer) IPv6 header of rx'ed packet
1570         rx_ip = rx_pkt.getlayer(IPv6)
1571         rx_srh = None
1572         rx_ip2 = None
1573         rx_udp = rx_pkt[UDP]
1574
1575         tx_ip = tx_pkt.getlayer(IPv6)
1576         # we know the packet has been tx'ed
1577         # with an inner IPv6 header and an SRH
1578         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1579         tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1580         tx_udp = tx_pkt[UDP]
1581
1582         # common checks, regardless of tx segleft value
1583         # rx'ed packet should have 2nd IPv6 header
1584         self.assertTrue(rx_ip.payload.haslayer(IPv6))
1585         # get second (inner) IPv6 header
1586         rx_ip2 = rx_pkt.getlayer(IPv6, 2)
1587
1588         if tx_ip.segleft > 0:
1589             # SRH should NOT have been popped:
1590             #   End SID without PSP does not pop SRH if segleft>0
1591             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1592             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1593
1594             # received ip.src should be equal to expected ip.src
1595             self.assertEqual(rx_ip.src, tx_ip.src)
1596             # sidlist should be unchanged
1597             self.assertEqual(rx_srh.addresses, tx_srh.addresses)
1598             # segleft should have been decremented
1599             self.assertEqual(rx_srh.segleft, tx_srh.segleft-1)
1600             # received ip.dst should be equal to sidlist[segleft]
1601             self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
1602             # lastentry should be unchanged
1603             self.assertEqual(rx_srh.lastentry, tx_srh.lastentry)
1604             # inner IPv6 packet (ip2) should be unchanged
1605             self.assertEqual(rx_ip2.src, tx_ip2.src)
1606             self.assertEqual(rx_ip2.dst, tx_ip2.dst)
1607         # else:  # tx_ip.segleft == 0
1608             # TODO: Does this work with 2 SRHs in ingress packet?
1609
1610         # UDP layer should be unchanged
1611         self.assertEqual(rx_udp, tx_udp)
1612
1613         self.logger.debug("packet verification: SUCCESS")
1614
1615     def compare_rx_tx_packet_End_PSP(self, tx_pkt, rx_pkt):
1616         """ Compare input and output packet after passing End with PSP
1617
1618         :param tx_pkt: transmitted packet
1619         :param rx_pkt: received packet
1620         """
1621         # End (PSP) updates the headers as follows:
1622         # IPv6 + SRH (SL>1):
1623         # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2)
1624         # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1625         # IPv6 + SRH (SL=1):
1626         # in: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1627         # out: IPv6(A, S3)
1628
1629         # get first (outer) IPv6 header of rx'ed packet
1630         rx_ip = rx_pkt.getlayer(IPv6)
1631         rx_srh = None
1632         rx_ip2 = None
1633         rx_udp = rx_pkt[UDP]
1634
1635         tx_ip = tx_pkt.getlayer(IPv6)
1636         # we know the packet has been tx'ed
1637         # with an inner IPv6 header and an SRH
1638         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1639         tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1640         tx_udp = tx_pkt[UDP]
1641
1642         # common checks, regardless of tx segleft value
1643         self.assertTrue(rx_ip.payload.haslayer(IPv6))
1644         rx_ip2 = rx_pkt.getlayer(IPv6, 2)
1645         # inner IPv6 packet (ip2) should be unchanged
1646         self.assertEqual(rx_ip2.src, tx_ip2.src)
1647         self.assertEqual(rx_ip2.dst, tx_ip2.dst)
1648
1649         if tx_ip.segleft > 1:
1650             # SRH should NOT have been popped:
1651             #   End SID with PSP does not pop SRH if segleft>1
1652             # rx'ed packet should have SRH
1653             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1654             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1655
1656             # received ip.src should be equal to expected ip.src
1657             self.assertEqual(rx_ip.src, tx_ip.src)
1658             # sidlist should be unchanged
1659             self.assertEqual(rx_srh.addresses, tx_srh.addresses)
1660             # segleft should have been decremented
1661             self.assertEqual(rx_srh.segleft, tx_srh.segleft-1)
1662             # received ip.dst should be equal to sidlist[segleft]
1663             self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
1664             # lastentry should be unchanged
1665             self.assertEqual(rx_srh.lastentry, tx_srh.lastentry)
1666
1667         else:  # tx_ip.segleft <= 1
1668             # SRH should have been popped:
1669             #   End SID with PSP and segleft=1 pops SRH
1670             # the two IPv6 headers are still present
1671             # outer IPv6 header has DA == last segment of popped SRH
1672             # SRH should not be present
1673             self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1674             # outer IPv6 header ip.src should be equal to tx'ed ip.src
1675             self.assertEqual(rx_ip.src, tx_ip.src)
1676             # outer IPv6 header ip.dst should be = to tx'ed sidlist[segleft-1]
1677             self.assertEqual(rx_ip.dst, tx_srh.addresses[tx_srh.segleft-1])
1678
1679         # UDP layer should be unchanged
1680         self.assertEqual(rx_udp, tx_udp)
1681
1682         self.logger.debug("packet verification: SUCCESS")
1683
1684     def compare_rx_tx_packet_End_DX6(self, tx_pkt, rx_pkt):
1685         """ Compare input and output packet after passing End.DX6
1686
1687         :param tx_pkt: transmitted packet
1688         :param rx_pkt: received packet
1689         """
1690         # End.DX6 updates the headers as follows:
1691         # IPv6 + SRH (SL=0):
1692         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv6(B, D)
1693         # out: IPv6(B, D)
1694         # IPv6:
1695         # in: IPv6(A, S3)IPv6(B, D)
1696         # out: IPv6(B, D)
1697
1698         # get first (outer) IPv6 header of rx'ed packet
1699         rx_ip = rx_pkt.getlayer(IPv6)
1700
1701         tx_ip = tx_pkt.getlayer(IPv6)
1702         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1703
1704         # verify if rx'ed packet has no SRH
1705         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1706
1707         # the whole rx_ip pkt should be equal to tx_ip2
1708         # except for the hlim field
1709         #   -> adjust tx'ed hlim to expected hlim
1710         tx_ip2.hlim = tx_ip2.hlim - 1
1711
1712         self.assertEqual(rx_ip, tx_ip2)
1713
1714         self.logger.debug("packet verification: SUCCESS")
1715
1716     def compare_rx_tx_packet_End_DX4(self, tx_pkt, rx_pkt):
1717         """ Compare input and output packet after passing End.DX4
1718
1719         :param tx_pkt: transmitted packet
1720         :param rx_pkt: received packet
1721         """
1722         # End.DX4 updates the headers as follows:
1723         # IPv6 + SRH (SL=0):
1724         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv4(B, D)
1725         # out: IPv4(B, D)
1726         # IPv6:
1727         # in: IPv6(A, S3)IPv4(B, D)
1728         # out: IPv4(B, D)
1729
1730         # get IPv4 header of rx'ed packet
1731         rx_ip = rx_pkt.getlayer(IP)
1732
1733         tx_ip = tx_pkt.getlayer(IPv6)
1734         tx_ip2 = tx_pkt.getlayer(IP)
1735
1736         # verify if rx'ed packet has no SRH
1737         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1738
1739         # the whole rx_ip pkt should be equal to tx_ip2
1740         # except for the ttl field and ip checksum
1741         #   -> adjust tx'ed ttl to expected ttl
1742         tx_ip2.ttl = tx_ip2.ttl - 1
1743         #   -> set tx'ed ip checksum to None and let scapy recompute
1744         tx_ip2.chksum = None
1745         # read back the pkt (with str()) to force computing these fields
1746         # probably other ways to accomplish this are possible
1747         tx_ip2 = IP(scapy.compat.raw(tx_ip2))
1748
1749         self.assertEqual(rx_ip, tx_ip2)
1750
1751         self.logger.debug("packet verification: SUCCESS")
1752
1753     def compare_rx_tx_packet_End_DX2(self, tx_pkt, rx_pkt):
1754         """ Compare input and output packet after passing End.DX2
1755
1756         :param tx_pkt: transmitted packet
1757         :param rx_pkt: received packet
1758         """
1759         # End.DX2 updates the headers as follows:
1760         # IPv6 + SRH (SL=0):
1761         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)L2
1762         # out: L2
1763         # IPv6:
1764         # in: IPv6(A, S3)L2
1765         # out: L2
1766
1767         # get IPv4 header of rx'ed packet
1768         rx_eth = rx_pkt.getlayer(Ether)
1769
1770         tx_ip = tx_pkt.getlayer(IPv6)
1771         # we can't just get the 2nd Ether layer
1772         # get the Raw content and dissect it as Ether
1773         tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw]))
1774
1775         # verify if rx'ed packet has no SRH
1776         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1777
1778         # the whole rx_eth pkt should be equal to tx_eth1
1779         self.assertEqual(rx_eth, tx_eth1)
1780
1781         self.logger.debug("packet verification: SUCCESS")
1782
1783     def create_stream(self, src_if, dst_if, packet_header, packet_sizes,
1784                       count):
1785         """Create SRv6 input packet stream for defined interface.
1786
1787         :param VppInterface src_if: Interface to create packet stream for
1788         :param VppInterface dst_if: destination interface of packet stream
1789         :param packet_header: Layer3 scapy packet headers,
1790                 L2 is added when not provided,
1791                 Raw(payload) with packet_info is added
1792         :param list packet_sizes: packet stream pckt sizes,sequentially applied
1793                to packets in stream have
1794         :param int count: number of packets in packet stream
1795         :return: list of packets
1796         """
1797         self.logger.info("Creating packets")
1798         pkts = []
1799         for i in range(0, count-1):
1800             payload_info = self.create_packet_info(src_if, dst_if)
1801             self.logger.debug(
1802                 "Creating packet with index %d" % (payload_info.index))
1803             payload = self.info_to_payload(payload_info)
1804             # add L2 header if not yet provided in packet_header
1805             if packet_header.getlayer(0).name == 'Ethernet':
1806                 p = (packet_header /
1807                      Raw(payload))
1808             else:
1809                 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
1810                      packet_header /
1811                      Raw(payload))
1812             size = packet_sizes[i % len(packet_sizes)]
1813             self.logger.debug("Packet size %d" % (size))
1814             self.extend_packet(p, size)
1815             # we need to store the packet with the automatic fields computed
1816             # read back the dumped packet (with str())
1817             # to force computing these fields
1818             # probably other ways are possible
1819             p = Ether(scapy.compat.raw(p))
1820             payload_info.data = p.copy()
1821             self.logger.debug(ppp("Created packet:", p))
1822             pkts.append(p)
1823         self.logger.info("Done creating packets")
1824         return pkts
1825
1826     def send_and_verify_pkts(self, input, pkts, output, compare_func):
1827         """Send packets and verify received packets using compare_func
1828
1829         :param input: ingress interface of DUT
1830         :param pkts: list of packets to transmit
1831         :param output: egress interface of DUT
1832         :param compare_func: function to compare in and out packets
1833         """
1834         # add traffic stream to input interface
1835         input.add_stream(pkts)
1836
1837         # enable capture on all interfaces
1838         self.pg_enable_capture(self.pg_interfaces)
1839
1840         # start traffic
1841         self.logger.info("Starting traffic")
1842         self.pg_start()
1843
1844         # get output capture
1845         self.logger.info("Getting packet capture")
1846         capture = output.get_capture()
1847
1848         # assert nothing was captured on input interface
1849         input.assert_nothing_captured()
1850
1851         # verify captured packets
1852         self.verify_captured_pkts(output, capture, compare_func)
1853
1854     def create_packet_header_IPv6(self, dst):
1855         """Create packet header: IPv6 header, UDP header
1856
1857         :param dst: IPv6 destination address
1858
1859         IPv6 source address is 1234::1
1860         UDP source port and destination port are 1234
1861         """
1862
1863         p = (IPv6(src='1234::1', dst=dst) /
1864              UDP(sport=1234, dport=1234))
1865         return p
1866
1867     def create_packet_header_IPv6_SRH(self, sidlist, segleft):
1868         """Create packet header: IPv6 header with SRH, UDP header
1869
1870         :param list sidlist: segment list
1871         :param int segleft: segments-left field value
1872
1873         IPv6 destination address is set to sidlist[segleft]
1874         IPv6 source addresses are 1234::1 and 4321::1
1875         UDP source port and destination port are 1234
1876         """
1877
1878         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
1879              IPv6ExtHdrSegmentRouting(addresses=sidlist) /
1880              UDP(sport=1234, dport=1234))
1881         return p
1882
1883     def create_packet_header_IPv6_SRH_IPv6(self, dst, sidlist, segleft):
1884         """Create packet header: IPv6 encapsulated in SRv6:
1885         IPv6 header with SRH, IPv6 header, UDP header
1886
1887         :param ipv6address dst: inner IPv6 destination address
1888         :param list sidlist: segment list of outer IPv6 SRH
1889         :param int segleft: segments-left field of outer IPv6 SRH
1890
1891         Outer IPv6 destination address is set to sidlist[segleft]
1892         IPv6 source addresses are 1234::1 and 4321::1
1893         UDP source port and destination port are 1234
1894         """
1895
1896         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
1897              IPv6ExtHdrSegmentRouting(addresses=sidlist,
1898                                       segleft=segleft, nh=41) /
1899              IPv6(src='4321::1', dst=dst) /
1900              UDP(sport=1234, dport=1234))
1901         return p
1902
1903     def create_packet_header_IPv6_IPv6(self, dst_inner, dst_outer):
1904         """Create packet header: IPv6 encapsulated in IPv6:
1905         IPv6 header, IPv6 header, UDP header
1906
1907         :param ipv6address dst_inner: inner IPv6 destination address
1908         :param ipv6address dst_outer: outer IPv6 destination address
1909
1910         IPv6 source addresses are 1234::1 and 4321::1
1911         UDP source port and destination port are 1234
1912         """
1913
1914         p = (IPv6(src='1234::1', dst=dst_outer) /
1915              IPv6(src='4321::1', dst=dst_inner) /
1916              UDP(sport=1234, dport=1234))
1917         return p
1918
1919     def create_packet_header_IPv6_SRH_SRH_IPv6(self, dst, sidlist1, segleft1,
1920                                                sidlist2, segleft2):
1921         """Create packet header: IPv6 encapsulated in SRv6 with 2 SRH:
1922         IPv6 header with SRH, 2nd SRH, IPv6 header, UDP header
1923
1924         :param ipv6address dst: inner IPv6 destination address
1925         :param list sidlist1: segment list of outer IPv6 SRH
1926         :param int segleft1: segments-left field of outer IPv6 SRH
1927         :param list sidlist2: segment list of inner IPv6 SRH
1928         :param int segleft2: segments-left field of inner IPv6 SRH
1929
1930         Outer IPv6 destination address is set to sidlist[segleft]
1931         IPv6 source addresses are 1234::1 and 4321::1
1932         UDP source port and destination port are 1234
1933         """
1934
1935         p = (IPv6(src='1234::1', dst=sidlist1[segleft1]) /
1936              IPv6ExtHdrSegmentRouting(addresses=sidlist1,
1937                                       segleft=segleft1, nh=43) /
1938              IPv6ExtHdrSegmentRouting(addresses=sidlist2,
1939                                       segleft=segleft2, nh=41) /
1940              IPv6(src='4321::1', dst=dst) /
1941              UDP(sport=1234, dport=1234))
1942         return p
1943
1944     def create_packet_header_IPv4(self, dst):
1945         """Create packet header: IPv4 header, UDP header
1946
1947         :param dst: IPv4 destination address
1948
1949         IPv4 source address is 123.1.1.1
1950         UDP source port and destination port are 1234
1951         """
1952
1953         p = (IP(src='123.1.1.1', dst=dst) /
1954              UDP(sport=1234, dport=1234))
1955         return p
1956
1957     def create_packet_header_IPv6_IPv4(self, dst_inner, dst_outer):
1958         """Create packet header: IPv4 encapsulated in IPv6:
1959         IPv6 header, IPv4 header, UDP header
1960
1961         :param ipv4address dst_inner: inner IPv4 destination address
1962         :param ipv6address dst_outer: outer IPv6 destination address
1963
1964         IPv6 source address is 1234::1
1965         IPv4 source address is 123.1.1.1
1966         UDP source port and destination port are 1234
1967         """
1968
1969         p = (IPv6(src='1234::1', dst=dst_outer) /
1970              IP(src='123.1.1.1', dst=dst_inner) /
1971              UDP(sport=1234, dport=1234))
1972         return p
1973
1974     def create_packet_header_IPv6_SRH_IPv4(self, dst, sidlist, segleft):
1975         """Create packet header: IPv4 encapsulated in SRv6:
1976         IPv6 header with SRH, IPv4 header, UDP header
1977
1978         :param ipv4address dst: inner IPv4 destination address
1979         :param list sidlist: segment list of outer IPv6 SRH
1980         :param int segleft: segments-left field of outer IPv6 SRH
1981
1982         Outer IPv6 destination address is set to sidlist[segleft]
1983         IPv6 source address is 1234::1
1984         IPv4 source address is 123.1.1.1
1985         UDP source port and destination port are 1234
1986         """
1987
1988         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
1989              IPv6ExtHdrSegmentRouting(addresses=sidlist,
1990                                       segleft=segleft, nh=4) /
1991              IP(src='123.1.1.1', dst=dst) /
1992              UDP(sport=1234, dport=1234))
1993         return p
1994
1995     def create_packet_header_L2(self, vlan=0):
1996         """Create packet header: L2 header
1997
1998         :param vlan: if vlan!=0 then add 802.1q header
1999         """
2000         # Note: the dst addr ('00:55:44:33:22:11') is used in
2001         # the compare function compare_rx_tx_packet_T_Encaps_L2
2002         # to detect presence of L2 in SRH payload
2003         p = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
2004         etype = 0x8137  # IPX
2005         if vlan:
2006             # add 802.1q layer
2007             p /= Dot1Q(vlan=vlan, type=etype)
2008         else:
2009             p.type = etype
2010         return p
2011
2012     def create_packet_header_IPv6_SRH_L2(self, sidlist, segleft, vlan=0):
2013         """Create packet header: L2 encapsulated in SRv6:
2014         IPv6 header with SRH, L2
2015
2016         :param list sidlist: segment list of outer IPv6 SRH
2017         :param int segleft: segments-left field of outer IPv6 SRH
2018         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
2019
2020         Outer IPv6 destination address is set to sidlist[segleft]
2021         IPv6 source address is 1234::1
2022         """
2023         eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
2024         etype = 0x8137  # IPX
2025         if vlan:
2026             # add 802.1q layer
2027             eth /= Dot1Q(vlan=vlan, type=etype)
2028         else:
2029             eth.type = etype
2030
2031         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
2032              IPv6ExtHdrSegmentRouting(addresses=sidlist,
2033                                       segleft=segleft, nh=59) /
2034              eth)
2035         return p
2036
2037     def create_packet_header_IPv6_L2(self, dst_outer, vlan=0):
2038         """Create packet header: L2 encapsulated in IPv6:
2039         IPv6 header, L2
2040
2041         :param ipv6address dst_outer: outer IPv6 destination address
2042         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
2043         """
2044         eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
2045         etype = 0x8137  # IPX
2046         if vlan:
2047             # add 802.1q layer
2048             eth /= Dot1Q(vlan=vlan, type=etype)
2049         else:
2050             eth.type = etype
2051
2052         p = (IPv6(src='1234::1', dst=dst_outer, nh=59) / eth)
2053         return p
2054
2055     def get_payload_info(self, packet):
2056         """ Extract the payload_info from the packet
2057         """
2058         # in most cases, payload_info is in packet[Raw]
2059         # but packet[Raw] gives the complete payload
2060         # (incl L2 header) for the T.Encaps L2 case
2061         try:
2062             payload_info = self.payload_to_info(packet[Raw])
2063
2064         except:
2065             # remote L2 header from packet[Raw]:
2066             # take packet[Raw], convert it to an Ether layer
2067             # and then extract Raw from it
2068             payload_info = self.payload_to_info(
2069                 Ether(scapy.compat.r(packet[Raw]))[Raw])
2070
2071         return payload_info
2072
2073     def verify_captured_pkts(self, dst_if, capture, compare_func):
2074         """
2075         Verify captured packet stream for specified interface.
2076         Compare ingress with egress packets using the specified compare fn
2077
2078         :param dst_if: egress interface of DUT
2079         :param capture: captured packets
2080         :param compare_func: function to compare in and out packet
2081         """
2082         self.logger.info("Verifying capture on interface %s using function %s"
2083                          % (dst_if.name, compare_func.__name__))
2084
2085         last_info = dict()
2086         for i in self.pg_interfaces:
2087             last_info[i.sw_if_index] = None
2088         dst_sw_if_index = dst_if.sw_if_index
2089
2090         for packet in capture:
2091             try:
2092                 # extract payload_info from packet's payload
2093                 payload_info = self.get_payload_info(packet)
2094                 packet_index = payload_info.index
2095
2096                 self.logger.debug("Verifying packet with index %d"
2097                                   % (packet_index))
2098                 # packet should have arrived on the expected interface
2099                 self.assertEqual(payload_info.dst, dst_sw_if_index)
2100                 self.logger.debug(
2101                     "Got packet on interface %s: src=%u (idx=%u)" %
2102                     (dst_if.name, payload_info.src, packet_index))
2103
2104                 # search for payload_info with same src and dst if_index
2105                 # this will give us the transmitted packet
2106                 next_info = self.get_next_packet_info_for_interface2(
2107                     payload_info.src, dst_sw_if_index,
2108                     last_info[payload_info.src])
2109                 last_info[payload_info.src] = next_info
2110                 # next_info should not be None
2111                 self.assertTrue(next_info is not None)
2112                 # index of tx and rx packets should be equal
2113                 self.assertEqual(packet_index, next_info.index)
2114                 # data field of next_info contains the tx packet
2115                 txed_packet = next_info.data
2116
2117                 self.logger.debug(ppp("Transmitted packet:",
2118                                       txed_packet))  # ppp=Pretty Print Packet
2119
2120                 self.logger.debug(ppp("Received packet:", packet))
2121
2122                 # compare rcvd packet with expected packet using compare_func
2123                 compare_func(txed_packet, packet)
2124
2125             except:
2126                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2127                 raise
2128
2129         # have all expected packets arrived?
2130         for i in self.pg_interfaces:
2131             remaining_packet = self.get_next_packet_info_for_interface2(
2132                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
2133             self.assertTrue(remaining_packet is None,
2134                             "Interface %s: Packet expected from interface %s "
2135                             "didn't arrive" % (dst_if.name, i.name))
2136
2137
2138 if __name__ == '__main__':
2139     unittest.main(testRunner=VppTestRunner)