sr: fix possible null-pointer dereference
[vpp.git] / src / plugins / srv6-am / test / test_srv6.py
1 #!/usr/bin/env python3
2
3 import unittest
4 import binascii
5 from socket import AF_INET6
6
7 from framework import VppTestCase, VppTestRunner
8 from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathProto, VppIpTable
9 from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
10     SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
11
12 import scapy.compat
13 from scapy.packet import Raw
14 from scapy.layers.l2 import Ether, Dot1Q
15 from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
16 from scapy.layers.inet import IP, UDP
17
18 from util import ppp
19
20
21 class TestSRv6(VppTestCase):
22     """ SRv6 Test Case """
23
24     @classmethod
25     def setUpClass(cls):
26         super(TestSRv6, cls).setUpClass()
27
28     @classmethod
29     def tearDownClass(cls):
30         super(TestSRv6, cls).tearDownClass()
31
32     def setUp(self):
33         """ Perform test setup before each test case.
34         """
35         super(TestSRv6, self).setUp()
36
37         # packet sizes, inclusive L2 overhead
38         self.pg_packet_sizes = [64, 512, 1518, 9018]
39
40         # reset packet_infos
41         self.reset_packet_infos()
42
43     def tearDown(self):
44         """ Clean up test setup after each test case.
45         """
46         self.teardown_interfaces()
47
48         super(TestSRv6, self).tearDown()
49
50     def configure_interface(self,
51                             interface,
52                             ipv6=False, ipv4=False,
53                             ipv6_table_id=0, ipv4_table_id=0):
54         """ Configure interface.
55         :param ipv6: configure IPv6 on interface
56         :param ipv4: configure IPv4 on interface
57         :param ipv6_table_id: FIB table_id for IPv6
58         :param ipv4_table_id: FIB table_id for IPv4
59         """
60         self.logger.debug("Configuring interface %s" % (interface.name))
61         if ipv6:
62             self.logger.debug("Configuring IPv6")
63             interface.set_table_ip6(ipv6_table_id)
64             interface.config_ip6()
65             interface.resolve_ndp(timeout=5)
66         if ipv4:
67             self.logger.debug("Configuring IPv4")
68             interface.set_table_ip4(ipv4_table_id)
69             interface.config_ip4()
70             interface.resolve_arp()
71         interface.admin_up()
72
73     def setup_interfaces(self, ipv6=[], ipv4=[],
74                          ipv6_table_id=[], ipv4_table_id=[]):
75         """ Create and configure interfaces.
76
77         :param ipv6: list of interface IPv6 capabilities
78         :param ipv4: list of interface IPv4 capabilities
79         :param ipv6_table_id: list of intf IPv6 FIB table_ids
80         :param ipv4_table_id: list of intf IPv4 FIB table_ids
81         :returns: List of created interfaces.
82         """
83         # how many interfaces?
84         if len(ipv6):
85             count = len(ipv6)
86         else:
87             count = len(ipv4)
88         self.logger.debug("Creating and configuring %d interfaces" % (count))
89
90         # fill up ipv6 and ipv4 lists if needed
91         # not enabled (False) is the default
92         if len(ipv6) < count:
93             ipv6 += (count - len(ipv6)) * [False]
94         if len(ipv4) < count:
95             ipv4 += (count - len(ipv4)) * [False]
96
97         # fill up table_id lists if needed
98         # table_id 0 (global) is the default
99         if len(ipv6_table_id) < count:
100             ipv6_table_id += (count - len(ipv6_table_id)) * [0]
101         if len(ipv4_table_id) < count:
102             ipv4_table_id += (count - len(ipv4_table_id)) * [0]
103
104         # create 'count' pg interfaces
105         self.create_pg_interfaces(range(count))
106
107         # setup all interfaces
108         for i in range(count):
109             intf = self.pg_interfaces[i]
110             self.configure_interface(intf,
111                                      ipv6[i], ipv4[i],
112                                      ipv6_table_id[i], ipv4_table_id[i])
113
114         if any(ipv6):
115             self.logger.debug(self.vapi.cli("show ip6 neighbors"))
116         if any(ipv4):
117             self.logger.debug(self.vapi.cli("show ip4 neighbors"))
118         self.logger.debug(self.vapi.cli("show interface"))
119         self.logger.debug(self.vapi.cli("show hardware"))
120
121         return self.pg_interfaces
122
123     def teardown_interfaces(self):
124         """ Unconfigure and bring down interface.
125         """
126         self.logger.debug("Tearing down interfaces")
127         # tear down all interfaces
128         # AFAIK they cannot be deleted
129         for i in self.pg_interfaces:
130             self.logger.debug("Tear down interface %s" % (i.name))
131             i.admin_down()
132             i.unconfig()
133             i.set_table_ip4(0)
134             i.set_table_ip6(0)
135
136     @unittest.skipUnless(0, "PC to fix")
137     def test_SRv6_T_Encaps(self):
138         """ Test SRv6 Transit.Encaps behavior for IPv6.
139         """
140         # send traffic to one destination interface
141         # source and destination are IPv6 only
142         self.setup_interfaces(ipv6=[True, True])
143
144         # configure FIB entries
145         route = VppIpRoute(self, "a4::", 64,
146                            [VppRoutePath(self.pg1.remote_ip6,
147                                          self.pg1.sw_if_index)])
148         route.add_vpp_config()
149
150         # configure encaps IPv6 source address
151         # needs to be done before SR Policy config
152         # TODO: API?
153         self.vapi.cli("set sr encaps source addr a3::")
154
155         bsid = 'a3::9999:1'
156         # configure SRv6 Policy
157         # Note: segment list order: first -> last
158         sr_policy = VppSRv6Policy(
159             self, bsid=bsid,
160             is_encap=1,
161             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
162             weight=1, fib_table=0,
163             segments=['a4::', 'a5::', 'a6::c7'],
164             source='a3::')
165         sr_policy.add_vpp_config()
166         self.sr_policy = sr_policy
167
168         # log the sr policies
169         self.logger.info(self.vapi.cli("show sr policies"))
170
171         # steer IPv6 traffic to a7::/64 into SRv6 Policy
172         # use the bsid of the above self.sr_policy
173         pol_steering = VppSRv6Steering(
174                         self,
175                         bsid=self.sr_policy.bsid,
176                         prefix="a7::", mask_width=64,
177                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
178                         sr_policy_index=0, table_id=0,
179                         sw_if_index=0)
180         pol_steering.add_vpp_config()
181
182         # log the sr steering policies
183         self.logger.info(self.vapi.cli("show sr steering policies"))
184
185         # create packets
186         count = len(self.pg_packet_sizes)
187         dst_inner = 'a7::1234'
188         pkts = []
189
190         # create IPv6 packets without SRH
191         packet_header = self.create_packet_header_IPv6(dst_inner)
192         # create traffic stream pg0->pg1
193         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
194                                        self.pg_packet_sizes, count))
195
196         # create IPv6 packets with SRH
197         # packets with segments-left 1, active segment a7::
198         packet_header = self.create_packet_header_IPv6_SRH(
199             sidlist=['a8::', 'a7::', 'a6::'],
200             segleft=1)
201         # create traffic stream pg0->pg1
202         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
203                                        self.pg_packet_sizes, count))
204
205         # create IPv6 packets with SRH and IPv6
206         # packets with segments-left 1, active segment a7::
207         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
208             dst_inner,
209             sidlist=['a8::', 'a7::', 'a6::'],
210             segleft=1)
211         # create traffic stream pg0->pg1
212         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
213                                        self.pg_packet_sizes, count))
214
215         # send packets and verify received packets
216         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
217                                   self.compare_rx_tx_packet_T_Encaps)
218
219         # log the localsid counters
220         self.logger.info(self.vapi.cli("show sr localsid"))
221
222         # remove SR steering
223         pol_steering.remove_vpp_config()
224         self.logger.info(self.vapi.cli("show sr steering policies"))
225
226         # remove SR Policies
227         self.sr_policy.remove_vpp_config()
228         self.logger.info(self.vapi.cli("show sr policies"))
229
230         # remove FIB entries
231         # done by tearDown
232
233         # cleanup interfaces
234         self.teardown_interfaces()
235
236     @unittest.skipUnless(0, "PC to fix")
237     def test_SRv6_T_Insert(self):
238         """ Test SRv6 Transit.Insert behavior (IPv6 only).
239         """
240         # send traffic to one destination interface
241         # source and destination are IPv6 only
242         self.setup_interfaces(ipv6=[True, True])
243
244         # configure FIB entries
245         route = VppIpRoute(self, "a4::", 64,
246                            [VppRoutePath(self.pg1.remote_ip6,
247                                          self.pg1.sw_if_index)])
248         route.add_vpp_config()
249
250         # configure encaps IPv6 source address
251         # needs to be done before SR Policy config
252         # TODO: API?
253         self.vapi.cli("set sr encaps source addr a3::")
254
255         bsid = 'a3::9999:1'
256         # configure SRv6 Policy
257         # Note: segment list order: first -> last
258         sr_policy = VppSRv6Policy(
259             self, bsid=bsid,
260             is_encap=0,
261             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
262             weight=1, fib_table=0,
263             segments=['a4::', 'a5::', 'a6::c7'],
264             source='a3::')
265         sr_policy.add_vpp_config()
266         self.sr_policy = sr_policy
267
268         # log the sr policies
269         self.logger.info(self.vapi.cli("show sr policies"))
270
271         # steer IPv6 traffic to a7::/64 into SRv6 Policy
272         # use the bsid of the above self.sr_policy
273         pol_steering = VppSRv6Steering(
274                         self,
275                         bsid=self.sr_policy.bsid,
276                         prefix="a7::", mask_width=64,
277                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
278                         sr_policy_index=0, table_id=0,
279                         sw_if_index=0)
280         pol_steering.add_vpp_config()
281
282         # log the sr steering policies
283         self.logger.info(self.vapi.cli("show sr steering policies"))
284
285         # create packets
286         count = len(self.pg_packet_sizes)
287         dst_inner = 'a7::1234'
288         pkts = []
289
290         # create IPv6 packets without SRH
291         packet_header = self.create_packet_header_IPv6(dst_inner)
292         # create traffic stream pg0->pg1
293         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
294                                        self.pg_packet_sizes, count))
295
296         # create IPv6 packets with SRH
297         # packets with segments-left 1, active segment a7::
298         packet_header = self.create_packet_header_IPv6_SRH(
299             sidlist=['a8::', 'a7::', 'a6::'],
300             segleft=1)
301         # create traffic stream pg0->pg1
302         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
303                                        self.pg_packet_sizes, count))
304
305         # send packets and verify received packets
306         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
307                                   self.compare_rx_tx_packet_T_Insert)
308
309         # log the localsid counters
310         self.logger.info(self.vapi.cli("show sr localsid"))
311
312         # remove SR steering
313         pol_steering.remove_vpp_config()
314         self.logger.info(self.vapi.cli("show sr steering policies"))
315
316         # remove SR Policies
317         self.sr_policy.remove_vpp_config()
318         self.logger.info(self.vapi.cli("show sr policies"))
319
320         # remove FIB entries
321         # done by tearDown
322
323         # cleanup interfaces
324         self.teardown_interfaces()
325
326     @unittest.skipUnless(0, "PC to fix")
327     def test_SRv6_T_Encaps_IPv4(self):
328         """ Test SRv6 Transit.Encaps behavior for IPv4.
329         """
330         # send traffic to one destination interface
331         # source interface is IPv4 only
332         # destination interface is IPv6 only
333         self.setup_interfaces(ipv6=[False, True], ipv4=[True, False])
334
335         # configure FIB entries
336         route = VppIpRoute(self, "a4::", 64,
337                            [VppRoutePath(self.pg1.remote_ip6,
338                                          self.pg1.sw_if_index)])
339         route.add_vpp_config()
340
341         # configure encaps IPv6 source address
342         # needs to be done before SR Policy config
343         # TODO: API?
344         self.vapi.cli("set sr encaps source addr a3::")
345
346         bsid = 'a3::9999:1'
347         # configure SRv6 Policy
348         # Note: segment list order: first -> last
349         sr_policy = VppSRv6Policy(
350             self, bsid=bsid,
351             is_encap=1,
352             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
353             weight=1, fib_table=0,
354             segments=['a4::', 'a5::', 'a6::c7'],
355             source='a3::')
356         sr_policy.add_vpp_config()
357         self.sr_policy = sr_policy
358
359         # log the sr policies
360         self.logger.info(self.vapi.cli("show sr policies"))
361
362         # steer IPv4 traffic to 7.1.1.0/24 into SRv6 Policy
363         # use the bsid of the above self.sr_policy
364         pol_steering = VppSRv6Steering(
365                         self,
366                         bsid=self.sr_policy.bsid,
367                         prefix="7.1.1.0", mask_width=24,
368                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV4,
369                         sr_policy_index=0, table_id=0,
370                         sw_if_index=0)
371         pol_steering.add_vpp_config()
372
373         # log the sr steering policies
374         self.logger.info(self.vapi.cli("show sr steering policies"))
375
376         # create packets
377         count = len(self.pg_packet_sizes)
378         dst_inner = '7.1.1.123'
379         pkts = []
380
381         # create IPv4 packets
382         packet_header = self.create_packet_header_IPv4(dst_inner)
383         # create traffic stream pg0->pg1
384         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
385                                        self.pg_packet_sizes, count))
386
387         # send packets and verify received packets
388         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
389                                   self.compare_rx_tx_packet_T_Encaps_IPv4)
390
391         # log the localsid counters
392         self.logger.info(self.vapi.cli("show sr localsid"))
393
394         # remove SR steering
395         pol_steering.remove_vpp_config()
396         self.logger.info(self.vapi.cli("show sr steering policies"))
397
398         # remove SR Policies
399         self.sr_policy.remove_vpp_config()
400         self.logger.info(self.vapi.cli("show sr policies"))
401
402         # remove FIB entries
403         # done by tearDown
404
405         # cleanup interfaces
406         self.teardown_interfaces()
407
408     @unittest.skip("VPP crashes after running this test")
409     def test_SRv6_T_Encaps_L2(self):
410         """ Test SRv6 Transit.Encaps behavior for L2.
411         """
412         # send traffic to one destination interface
413         # source interface is IPv4 only TODO?
414         # destination interface is IPv6 only
415         self.setup_interfaces(ipv6=[False, True], ipv4=[False, False])
416
417         # configure FIB entries
418         route = VppIpRoute(self, "a4::", 64,
419                            [VppRoutePath(self.pg1.remote_ip6,
420                                          self.pg1.sw_if_index)])
421         route.add_vpp_config()
422
423         # configure encaps IPv6 source address
424         # needs to be done before SR Policy config
425         # TODO: API?
426         self.vapi.cli("set sr encaps source addr a3::")
427
428         bsid = 'a3::9999:1'
429         # configure SRv6 Policy
430         # Note: segment list order: first -> last
431         sr_policy = VppSRv6Policy(
432             self, bsid=bsid,
433             is_encap=1,
434             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
435             weight=1, fib_table=0,
436             segments=['a4::', 'a5::', 'a6::c7'],
437             source='a3::')
438         sr_policy.add_vpp_config()
439         self.sr_policy = sr_policy
440
441         # log the sr policies
442         self.logger.info(self.vapi.cli("show sr policies"))
443
444         # steer L2 traffic into SRv6 Policy
445         # use the bsid of the above self.sr_policy
446         pol_steering = VppSRv6Steering(
447                         self,
448                         bsid=self.sr_policy.bsid,
449                         prefix="::", mask_width=0,
450                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_L2,
451                         sr_policy_index=0, table_id=0,
452                         sw_if_index=self.pg0.sw_if_index)
453         pol_steering.add_vpp_config()
454
455         # log the sr steering policies
456         self.logger.info(self.vapi.cli("show sr steering policies"))
457
458         # create packets
459         count = len(self.pg_packet_sizes)
460         pkts = []
461
462         # create L2 packets without dot1q header
463         packet_header = self.create_packet_header_L2()
464         # create traffic stream pg0->pg1
465         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
466                                        self.pg_packet_sizes, count))
467
468         # create L2 packets with dot1q header
469         packet_header = self.create_packet_header_L2(vlan=123)
470         # create traffic stream pg0->pg1
471         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
472                                        self.pg_packet_sizes, count))
473
474         # send packets and verify received packets
475         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
476                                   self.compare_rx_tx_packet_T_Encaps_L2)
477
478         # log the localsid counters
479         self.logger.info(self.vapi.cli("show sr localsid"))
480
481         # remove SR steering
482         pol_steering.remove_vpp_config()
483         self.logger.info(self.vapi.cli("show sr steering policies"))
484
485         # remove SR Policies
486         self.sr_policy.remove_vpp_config()
487         self.logger.info(self.vapi.cli("show sr policies"))
488
489         # remove FIB entries
490         # done by tearDown
491
492         # cleanup interfaces
493         self.teardown_interfaces()
494
495     def test_SRv6_End(self):
496         """ Test SRv6 End (without PSP) behavior.
497         """
498         # send traffic to one destination interface
499         # source and destination interfaces are IPv6 only
500         self.setup_interfaces(ipv6=[True, True])
501
502         # configure FIB entries
503         route = VppIpRoute(self, "a4::", 64,
504                            [VppRoutePath(self.pg1.remote_ip6,
505                                          self.pg1.sw_if_index)])
506         route.add_vpp_config()
507
508         # configure SRv6 localSID End without PSP behavior
509         localsid = VppSRv6LocalSID(
510                         self, localsid={'addr': 'A3::0'},
511                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
512                         nh_addr4='0.0.0.0',
513                         nh_addr6='::',
514                         end_psp=0,
515                         sw_if_index=0,
516                         vlan_index=0,
517                         fib_table=0)
518         localsid.add_vpp_config()
519         # log the localsids
520         self.logger.debug(self.vapi.cli("show sr localsid"))
521
522         # create IPv6 packets with SRH (SL=2, SL=1, SL=0)
523         # send one packet per SL value per packet size
524         # SL=0 packet with localSID End with USP needs 2nd SRH
525         count = len(self.pg_packet_sizes)
526         dst_inner = 'a4::1234'
527         pkts = []
528
529         # packets with segments-left 2, active segment a3::
530         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
531                 dst_inner,
532                 sidlist=['a5::', 'a4::', 'a3::'],
533                 segleft=2)
534         # create traffic stream pg0->pg1
535         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
536                                        self.pg_packet_sizes, count))
537
538         # packets with segments-left 1, active segment a3::
539         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
540                 dst_inner,
541                 sidlist=['a4::', 'a3::', 'a2::'],
542                 segleft=1)
543         # add to traffic stream pg0->pg1
544         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
545                                        self.pg_packet_sizes, count))
546
547         # TODO: test behavior with SL=0 packet (needs 2*SRH?)
548
549         expected_count = len(pkts)
550
551         # packets without SRH (should not crash)
552         packet_header = self.create_packet_header_IPv6('a3::')
553         # create traffic stream pg0->pg1
554         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
555                                        self.pg_packet_sizes, count))
556
557         # send packets and verify received packets
558         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
559                                   self.compare_rx_tx_packet_End,
560                                   expected_count=expected_count)
561
562         # log the localsid counters
563         self.logger.info(self.vapi.cli("show sr localsid"))
564
565         # remove SRv6 localSIDs
566         localsid.remove_vpp_config()
567
568         # remove FIB entries
569         # done by tearDown
570
571         # cleanup interfaces
572         self.teardown_interfaces()
573
574     def test_SRv6_End_with_PSP(self):
575         """ Test SRv6 End with PSP behavior.
576         """
577         # send traffic to one destination interface
578         # source and destination interfaces are IPv6 only
579         self.setup_interfaces(ipv6=[True, True])
580
581         # configure FIB entries
582         route = VppIpRoute(self, "a4::", 64,
583                            [VppRoutePath(self.pg1.remote_ip6,
584                                          self.pg1.sw_if_index)])
585         route.add_vpp_config()
586
587         # configure SRv6 localSID End with PSP behavior
588         localsid = VppSRv6LocalSID(
589                         self, localsid={'addr': 'A3::0'},
590                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
591                         nh_addr4='0.0.0.0',
592                         nh_addr6='::',
593                         end_psp=1,
594                         sw_if_index=0,
595                         vlan_index=0,
596                         fib_table=0)
597         localsid.add_vpp_config()
598         # log the localsids
599         self.logger.debug(self.vapi.cli("show sr localsid"))
600
601         # create IPv6 packets with SRH (SL=2, SL=1)
602         # send one packet per SL value per packet size
603         # SL=0 packet with localSID End with PSP is dropped
604         count = len(self.pg_packet_sizes)
605         dst_inner = 'a4::1234'
606         pkts = []
607
608         # packets with segments-left 2, active segment a3::
609         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
610                     dst_inner,
611                     sidlist=['a5::', 'a4::', 'a3::'],
612                     segleft=2)
613         # create traffic stream pg0->pg1
614         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
615                                        self.pg_packet_sizes, count))
616
617         # packets with segments-left 1, active segment a3::
618         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
619                     dst_inner,
620                     sidlist=['a4::', 'a3::', 'a2::'],
621                     segleft=1)
622         # add to traffic stream pg0->pg1
623         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
624                                        self.pg_packet_sizes, count))
625
626         # send packets and verify received packets
627         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
628                                   self.compare_rx_tx_packet_End_PSP)
629
630         # log the localsid counters
631         self.logger.info(self.vapi.cli("show sr localsid"))
632
633         # remove SRv6 localSIDs
634         localsid.remove_vpp_config()
635
636         # remove FIB entries
637         # done by tearDown
638
639         # cleanup interfaces
640         self.teardown_interfaces()
641
642     def test_SRv6_End_X(self):
643         """ Test SRv6 End.X (without PSP) behavior.
644         """
645         # create three interfaces (1 source, 2 destinations)
646         # source and destination interfaces are IPv6 only
647         self.setup_interfaces(ipv6=[True, True, True])
648
649         # configure FIB entries
650         # a4::/64 via pg1 and pg2
651         route = VppIpRoute(self, "a4::", 64,
652                            [VppRoutePath(self.pg1.remote_ip6,
653                                          self.pg1.sw_if_index),
654                             VppRoutePath(self.pg2.remote_ip6,
655                                          self.pg2.sw_if_index)])
656         route.add_vpp_config()
657         self.logger.debug(self.vapi.cli("show ip6 fib"))
658
659         # configure SRv6 localSID End.X without PSP behavior
660         # End.X points to interface pg1
661         localsid = VppSRv6LocalSID(
662                         self, localsid={'addr': 'A3::C4'},
663                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
664                         nh_addr4='0.0.0.0',
665                         nh_addr6=self.pg1.remote_ip6,
666                         end_psp=0,
667                         sw_if_index=self.pg1.sw_if_index,
668                         vlan_index=0,
669                         fib_table=0)
670         localsid.add_vpp_config()
671         # log the localsids
672         self.logger.debug(self.vapi.cli("show sr localsid"))
673
674         # create IPv6 packets with SRH (SL=2, SL=1)
675         # send one packet per SL value per packet size
676         # SL=0 packet with localSID End with PSP is dropped
677         count = len(self.pg_packet_sizes)
678         dst_inner = 'a4::1234'
679         pkts = []
680
681         # packets with segments-left 2, active segment a3::c4
682         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
683                     dst_inner,
684                     sidlist=['a5::', 'a4::', 'a3::c4'],
685                     segleft=2)
686         # create traffic stream pg0->pg1
687         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
688                                        self.pg_packet_sizes, count))
689
690         # packets with segments-left 1, active segment a3::c4
691         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
692                     dst_inner,
693                     sidlist=['a4::', 'a3::c4', 'a2::'],
694                     segleft=1)
695         # add to traffic stream pg0->pg1
696         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
697                                        self.pg_packet_sizes, count))
698
699         # send packets and verify received packets
700         # using same comparison function as End (no PSP)
701         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
702                                   self.compare_rx_tx_packet_End)
703
704         # assert nothing was received on the other interface (pg2)
705         self.pg2.assert_nothing_captured("mis-directed packet(s)")
706
707         # log the localsid counters
708         self.logger.info(self.vapi.cli("show sr localsid"))
709
710         # remove SRv6 localSIDs
711         localsid.remove_vpp_config()
712
713         # remove FIB entries
714         # done by tearDown
715
716         # cleanup interfaces
717         self.teardown_interfaces()
718
719     def test_SRv6_End_X_with_PSP(self):
720         """ Test SRv6 End.X with PSP behavior.
721         """
722         # create three interfaces (1 source, 2 destinations)
723         # source and destination interfaces are IPv6 only
724         self.setup_interfaces(ipv6=[True, True, True])
725
726         # configure FIB entries
727         # a4::/64 via pg1 and pg2
728         route = VppIpRoute(self, "a4::", 64,
729                            [VppRoutePath(
730                                self.pg1.remote_ip6,
731                                self.pg1.sw_if_index),
732                             VppRoutePath(self.pg2.remote_ip6,
733                                          self.pg2.sw_if_index)])
734         route.add_vpp_config()
735
736         # configure SRv6 localSID End with PSP behavior
737         localsid = VppSRv6LocalSID(
738                         self, localsid={'addr': 'A3::C4'},
739                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
740                         nh_addr4='0.0.0.0',
741                         nh_addr6=self.pg1.remote_ip6,
742                         end_psp=1,
743                         sw_if_index=self.pg1.sw_if_index,
744                         vlan_index=0,
745                         fib_table=0)
746         localsid.add_vpp_config()
747         # log the localsids
748         self.logger.debug(self.vapi.cli("show sr localsid"))
749
750         # create IPv6 packets with SRH (SL=2, SL=1)
751         # send one packet per SL value per packet size
752         # SL=0 packet with localSID End with PSP is dropped
753         count = len(self.pg_packet_sizes)
754         dst_inner = 'a4::1234'
755         pkts = []
756
757         # packets with segments-left 2, active segment a3::
758         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
759                     dst_inner,
760                     sidlist=['a5::', 'a4::', 'a3::c4'],
761                     segleft=2)
762         # create traffic stream pg0->pg1
763         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
764                                        self.pg_packet_sizes, count))
765
766         # packets with segments-left 1, active segment a3::
767         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
768                     dst_inner,
769                     sidlist=['a4::', 'a3::c4', 'a2::'],
770                     segleft=1)
771         # add to traffic stream pg0->pg1
772         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
773                                        self.pg_packet_sizes, count))
774
775         # send packets and verify received packets
776         # using same comparison function as End with PSP
777         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
778                                   self.compare_rx_tx_packet_End_PSP)
779
780         # assert nothing was received on the other interface (pg2)
781         self.pg2.assert_nothing_captured("mis-directed packet(s)")
782
783         # log the localsid counters
784         self.logger.info(self.vapi.cli("show sr localsid"))
785
786         # remove SRv6 localSIDs
787         localsid.remove_vpp_config()
788
789         # remove FIB entries
790         # done by tearDown
791
792         # cleanup interfaces
793         self.teardown_interfaces()
794
795     def test_SRv6_End_DX6(self):
796         """ Test SRv6 End.DX6 behavior.
797         """
798         # send traffic to one destination interface
799         # source and destination interfaces are IPv6 only
800         self.setup_interfaces(ipv6=[True, True])
801
802         # configure SRv6 localSID End.DX6 behavior
803         localsid = VppSRv6LocalSID(
804                         self, localsid={'addr': 'A3::C4'},
805                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX6,
806                         nh_addr4='0.0.0.0',
807                         nh_addr6=self.pg1.remote_ip6,
808                         end_psp=0,
809                         sw_if_index=self.pg1.sw_if_index,
810                         vlan_index=0,
811                         fib_table=0)
812         localsid.add_vpp_config()
813         # log the localsids
814         self.logger.debug(self.vapi.cli("show sr localsid"))
815
816         # create IPv6 packets with SRH (SL=0)
817         # send one packet per packet size
818         count = len(self.pg_packet_sizes)
819         dst_inner = 'a4::1234'  # inner header destination address
820         pkts = []
821
822         # packets with SRH, segments-left 0, active segment a3::c4
823         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
824                         dst_inner,
825                         sidlist=['a3::c4', 'a2::', 'a1::'],
826                         segleft=0)
827         # add to traffic stream pg0->pg1
828         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
829                                        self.pg_packet_sizes, count))
830
831         # packets without SRH, IPv6 in IPv6
832         # outer IPv6 dest addr is the localsid End.DX6
833         packet_header = self.create_packet_header_IPv6_IPv6(
834                                             dst_inner,
835                                             dst_outer='a3::c4')
836         # add to traffic stream pg0->pg1
837         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
838                                        self.pg_packet_sizes, count))
839
840         # send packets and verify received packets
841         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
842                                   self.compare_rx_tx_packet_End_DX6)
843
844         # log the localsid counters
845         self.logger.info(self.vapi.cli("show sr localsid"))
846
847         # remove SRv6 localSIDs
848         localsid.remove_vpp_config()
849
850         # cleanup interfaces
851         self.teardown_interfaces()
852
853     def test_SRv6_End_DT6(self):
854         """ Test SRv6 End.DT6 behavior.
855         """
856         # create three interfaces (1 source, 2 destinations)
857         # all interfaces are IPv6 only
858         # source interface in global FIB (0)
859         # destination interfaces in global and vrf
860         vrf_1 = 1
861         ipt = VppIpTable(self, vrf_1, is_ip6=True)
862         ipt.add_vpp_config()
863         self.setup_interfaces(ipv6=[True, True, True],
864                               ipv6_table_id=[0, 0, vrf_1])
865
866         # configure FIB entries
867         # a4::/64 is reachable
868         #     via pg1 in table 0 (global)
869         #     and via pg2 in table vrf_1
870         route0 = VppIpRoute(self, "a4::", 64,
871                             [VppRoutePath(self.pg1.remote_ip6,
872                                           self.pg1.sw_if_index,
873                                           nh_table_id=0)],
874                             table_id=0)
875         route0.add_vpp_config()
876         route1 = VppIpRoute(self, "a4::", 64,
877                             [VppRoutePath(self.pg2.remote_ip6,
878                                           self.pg2.sw_if_index,
879                                           nh_table_id=vrf_1)],
880                             table_id=vrf_1)
881         route1.add_vpp_config()
882         self.logger.debug(self.vapi.cli("show ip6 fib"))
883
884         # configure SRv6 localSID End.DT6 behavior
885         # Note:
886         # fib_table: where the localsid is installed
887         # sw_if_index: in T-variants of localsid this is the vrf table_id
888         localsid = VppSRv6LocalSID(
889                         self, localsid={'addr': 'A3::C4'},
890                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT6,
891                         nh_addr4='0.0.0.0',
892                         nh_addr6='::',
893                         end_psp=0,
894                         sw_if_index=vrf_1,
895                         vlan_index=0,
896                         fib_table=0)
897         localsid.add_vpp_config()
898         # log the localsids
899         self.logger.debug(self.vapi.cli("show sr localsid"))
900
901         # create IPv6 packets with SRH (SL=0)
902         # send one packet per packet size
903         count = len(self.pg_packet_sizes)
904         dst_inner = 'a4::1234'  # inner header destination address
905         pkts = []
906
907         # packets with SRH, segments-left 0, active segment a3::c4
908         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
909                         dst_inner,
910                         sidlist=['a3::c4', 'a2::', 'a1::'],
911                         segleft=0)
912         # add to traffic stream pg0->pg1
913         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
914                                        self.pg_packet_sizes, count))
915
916         # packets without SRH, IPv6 in IPv6
917         # outer IPv6 dest addr is the localsid End.DT6
918         packet_header = self.create_packet_header_IPv6_IPv6(
919                                             dst_inner,
920                                             dst_outer='a3::c4')
921         # add to traffic stream pg0->pg1
922         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
923                                        self.pg_packet_sizes, count))
924
925         # send packets and verify received packets
926         # using same comparison function as End.DX6
927         self.send_and_verify_pkts(self.pg0, pkts, self.pg2,
928                                   self.compare_rx_tx_packet_End_DX6)
929
930         # assert nothing was received on the other interface (pg2)
931         self.pg1.assert_nothing_captured("mis-directed packet(s)")
932
933         # log the localsid counters
934         self.logger.info(self.vapi.cli("show sr localsid"))
935
936         # remove SRv6 localSIDs
937         localsid.remove_vpp_config()
938
939         # remove FIB entries
940         # done by tearDown
941
942         # cleanup interfaces
943         self.teardown_interfaces()
944
945     def test_SRv6_End_DX4(self):
946         """ Test SRv6 End.DX4 behavior.
947         """
948         # send traffic to one destination interface
949         # source interface is IPv6 only
950         # destination interface is IPv4 only
951         self.setup_interfaces(ipv6=[True, False], ipv4=[False, True])
952
953         # configure SRv6 localSID End.DX4 behavior
954         localsid = VppSRv6LocalSID(
955                         self, localsid={'addr': 'A3::C4'},
956                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX4,
957                         nh_addr4=self.pg1.remote_ip4,
958                         nh_addr6='::',
959                         end_psp=0,
960                         sw_if_index=self.pg1.sw_if_index,
961                         vlan_index=0,
962                         fib_table=0)
963         localsid.add_vpp_config()
964         # log the localsids
965         self.logger.debug(self.vapi.cli("show sr localsid"))
966
967         # send one packet per packet size
968         count = len(self.pg_packet_sizes)
969         dst_inner = '4.1.1.123'  # inner header destination address
970         pkts = []
971
972         # packets with SRH, segments-left 0, active segment a3::c4
973         packet_header = self.create_packet_header_IPv6_SRH_IPv4(
974                         dst_inner,
975                         sidlist=['a3::c4', 'a2::', 'a1::'],
976                         segleft=0)
977         # add to traffic stream pg0->pg1
978         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
979                                        self.pg_packet_sizes, count))
980
981         # packets without SRH, IPv4 in IPv6
982         # outer IPv6 dest addr is the localsid End.DX4
983         packet_header = self.create_packet_header_IPv6_IPv4(
984                                             dst_inner,
985                                             dst_outer='a3::c4')
986         # add to traffic stream pg0->pg1
987         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
988                                        self.pg_packet_sizes, count))
989
990         # send packets and verify received packets
991         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
992                                   self.compare_rx_tx_packet_End_DX4)
993
994         # log the localsid counters
995         self.logger.info(self.vapi.cli("show sr localsid"))
996
997         # remove SRv6 localSIDs
998         localsid.remove_vpp_config()
999
1000         # cleanup interfaces
1001         self.teardown_interfaces()
1002
1003     def test_SRv6_End_DT4(self):
1004         """ Test SRv6 End.DT4 behavior.
1005         """
1006         # create three interfaces (1 source, 2 destinations)
1007         # source interface is IPv6-only
1008         # destination interfaces are IPv4 only
1009         # source interface in global FIB (0)
1010         # destination interfaces in global and vrf
1011         vrf_1 = 1
1012         ipt = VppIpTable(self, vrf_1)
1013         ipt.add_vpp_config()
1014         self.setup_interfaces(ipv6=[True, False, False],
1015                               ipv4=[False, True, True],
1016                               ipv6_table_id=[0, 0, 0],
1017                               ipv4_table_id=[0, 0, vrf_1])
1018
1019         # configure FIB entries
1020         # 4.1.1.0/24 is reachable
1021         #     via pg1 in table 0 (global)
1022         #     and via pg2 in table vrf_1
1023         route0 = VppIpRoute(self, "4.1.1.0", 24,
1024                             [VppRoutePath(self.pg1.remote_ip4,
1025                                           self.pg1.sw_if_index,
1026                                           nh_table_id=0)],
1027                             table_id=0)
1028         route0.add_vpp_config()
1029         route1 = VppIpRoute(self, "4.1.1.0", 24,
1030                             [VppRoutePath(self.pg2.remote_ip4,
1031                                           self.pg2.sw_if_index,
1032                                           nh_table_id=vrf_1)],
1033                             table_id=vrf_1)
1034         route1.add_vpp_config()
1035         self.logger.debug(self.vapi.cli("show ip fib"))
1036
1037         # configure SRv6 localSID End.DT6 behavior
1038         # Note:
1039         # fib_table: where the localsid is installed
1040         # sw_if_index: in T-variants of localsid: vrf table_id
1041         localsid = VppSRv6LocalSID(
1042                         self, localsid={'addr': 'A3::C4'},
1043                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT4,
1044                         nh_addr4='0.0.0.0',
1045                         nh_addr6='::',
1046                         end_psp=0,
1047                         sw_if_index=vrf_1,
1048                         vlan_index=0,
1049                         fib_table=0)
1050         localsid.add_vpp_config()
1051         # log the localsids
1052         self.logger.debug(self.vapi.cli("show sr localsid"))
1053
1054         # create IPv6 packets with SRH (SL=0)
1055         # send one packet per packet size
1056         count = len(self.pg_packet_sizes)
1057         dst_inner = '4.1.1.123'  # inner header destination address
1058         pkts = []
1059
1060         # packets with SRH, segments-left 0, active segment a3::c4
1061         packet_header = self.create_packet_header_IPv6_SRH_IPv4(
1062                         dst_inner,
1063                         sidlist=['a3::c4', 'a2::', 'a1::'],
1064                         segleft=0)
1065         # add to traffic stream pg0->pg1
1066         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
1067                                        self.pg_packet_sizes, count))
1068
1069         # packets without SRH, IPv6 in IPv6
1070         # outer IPv6 dest addr is the localsid End.DX4
1071         packet_header = self.create_packet_header_IPv6_IPv4(
1072                                             dst_inner,
1073                                             dst_outer='a3::c4')
1074         # add to traffic stream pg0->pg1
1075         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
1076                                        self.pg_packet_sizes, count))
1077
1078         # send packets and verify received packets
1079         # using same comparison function as End.DX4
1080         self.send_and_verify_pkts(self.pg0, pkts, self.pg2,
1081                                   self.compare_rx_tx_packet_End_DX4)
1082
1083         # assert nothing was received on the other interface (pg2)
1084         self.pg1.assert_nothing_captured("mis-directed packet(s)")
1085
1086         # log the localsid counters
1087         self.logger.info(self.vapi.cli("show sr localsid"))
1088
1089         # remove SRv6 localSIDs
1090         localsid.remove_vpp_config()
1091
1092         # remove FIB entries
1093         # done by tearDown
1094
1095         # cleanup interfaces
1096         self.teardown_interfaces()
1097
1098     def test_SRv6_End_DX2(self):
1099         """ Test SRv6 End.DX2 behavior.
1100         """
1101         # send traffic to one destination interface
1102         # source interface is IPv6 only
1103         self.setup_interfaces(ipv6=[True, False], ipv4=[False, False])
1104
1105         # configure SRv6 localSID End.DX2 behavior
1106         localsid = VppSRv6LocalSID(
1107                         self, localsid={'addr': 'A3::C4'},
1108                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX2,
1109                         nh_addr4='0.0.0.0',
1110                         nh_addr6='::',
1111                         end_psp=0,
1112                         sw_if_index=self.pg1.sw_if_index,
1113                         vlan_index=0,
1114                         fib_table=0)
1115         localsid.add_vpp_config()
1116         # log the localsids
1117         self.logger.debug(self.vapi.cli("show sr localsid"))
1118
1119         # send one packet per packet size
1120         count = len(self.pg_packet_sizes)
1121         pkts = []
1122
1123         # packets with SRH, segments-left 0, active segment a3::c4
1124         # L2 has no dot1q header
1125         packet_header = self.create_packet_header_IPv6_SRH_L2(
1126                             sidlist=['a3::c4', 'a2::', 'a1::'],
1127                             segleft=0,
1128                             vlan=0)
1129         # add to traffic stream pg0->pg1
1130         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1131                                        self.pg_packet_sizes, count))
1132
1133         # packets with SRH, segments-left 0, active segment a3::c4
1134         # L2 has dot1q header
1135         packet_header = self.create_packet_header_IPv6_SRH_L2(
1136                             sidlist=['a3::c4', 'a2::', 'a1::'],
1137                             segleft=0,
1138                             vlan=123)
1139         # add to traffic stream pg0->pg1
1140         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1141                                        self.pg_packet_sizes, count))
1142
1143         # packets without SRH, L2 in IPv6
1144         # outer IPv6 dest addr is the localsid End.DX2
1145         # L2 has no dot1q header
1146         packet_header = self.create_packet_header_IPv6_L2(
1147                                             dst_outer='a3::c4',
1148                                             vlan=0)
1149         # add to traffic stream pg0->pg1
1150         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1151                                        self.pg_packet_sizes, count))
1152
1153         # packets without SRH, L2 in IPv6
1154         # outer IPv6 dest addr is the localsid End.DX2
1155         # L2 has dot1q header
1156         packet_header = self.create_packet_header_IPv6_L2(
1157                                             dst_outer='a3::c4',
1158                                             vlan=123)
1159         # add to traffic stream pg0->pg1
1160         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1161                                        self.pg_packet_sizes, count))
1162
1163         # send packets and verify received packets
1164         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
1165                                   self.compare_rx_tx_packet_End_DX2)
1166
1167         # log the localsid counters
1168         self.logger.info(self.vapi.cli("show sr localsid"))
1169
1170         # remove SRv6 localSIDs
1171         localsid.remove_vpp_config()
1172
1173         # cleanup interfaces
1174         self.teardown_interfaces()
1175
1176     @unittest.skipUnless(0, "PC to fix")
1177     def test_SRv6_T_Insert_Classifier(self):
1178         """ Test SRv6 Transit.Insert behavior (IPv6 only).
1179             steer packets using the classifier
1180         """
1181         # send traffic to one destination interface
1182         # source and destination are IPv6 only
1183         self.setup_interfaces(ipv6=[False, False, False, True, True])
1184
1185         # configure FIB entries
1186         route = VppIpRoute(self, "a4::", 64,
1187                            [VppRoutePath(
1188                                self.pg4.remote_ip6,
1189                                self.pg4.sw_if_index)])
1190         route.add_vpp_config()
1191
1192         # configure encaps IPv6 source address
1193         # needs to be done before SR Policy config
1194         # TODO: API?
1195         self.vapi.cli("set sr encaps source addr a3::")
1196
1197         bsid = 'a3::9999:1'
1198         # configure SRv6 Policy
1199         # Note: segment list order: first -> last
1200         sr_policy = VppSRv6Policy(
1201             self, bsid=bsid,
1202             is_encap=0,
1203             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
1204             weight=1, fib_table=0,
1205             segments=['a4::', 'a5::', 'a6::c7'],
1206             source='a3::')
1207         sr_policy.add_vpp_config()
1208         self.sr_policy = sr_policy
1209
1210         # log the sr policies
1211         self.logger.info(self.vapi.cli("show sr policies"))
1212
1213         # add classify table
1214         # mask on dst ip address prefix a7::/8
1215         mask = '{!s:0<16}'.format('ff')
1216         r = self.vapi.classify_add_del_table(
1217             1,
1218             binascii.unhexlify(mask),
1219             match_n_vectors=(len(mask) - 1) // 32 + 1,
1220             current_data_flag=1,
1221             skip_n_vectors=2)  # data offset
1222         self.assertIsNotNone(r, 'No response msg for add_del_table')
1223         table_index = r.new_table_index
1224
1225         # add the source routing node as a ip6 inacl netxt node
1226         r = self.vapi.add_node_next('ip6-inacl',
1227                                     'sr-pl-rewrite-insert')
1228         inacl_next_node_index = r.node_index
1229
1230         match = '{!s:0<16}'.format('a7')
1231         r = self.vapi.classify_add_del_session(
1232             1,
1233             table_index,
1234             binascii.unhexlify(match),
1235             hit_next_index=inacl_next_node_index,
1236             action=3,
1237             metadata=0)  # sr policy index
1238         self.assertIsNotNone(r, 'No response msg for add_del_session')
1239
1240         # log the classify table used in the steering policy
1241         self.logger.info(self.vapi.cli("show classify table"))
1242
1243         r = self.vapi.input_acl_set_interface(
1244             is_add=1,
1245             sw_if_index=self.pg3.sw_if_index,
1246             ip6_table_index=table_index)
1247         self.assertIsNotNone(r,
1248                              'No response msg for input_acl_set_interface')
1249
1250         # log the ip6 inacl
1251         self.logger.info(self.vapi.cli("show inacl type ip6"))
1252
1253         # create packets
1254         count = len(self.pg_packet_sizes)
1255         dst_inner = 'a7::1234'
1256         pkts = []
1257
1258         # create IPv6 packets without SRH
1259         packet_header = self.create_packet_header_IPv6(dst_inner)
1260         # create traffic stream pg3->pg4
1261         pkts.extend(self.create_stream(self.pg3, self.pg4, packet_header,
1262                                        self.pg_packet_sizes, count))
1263
1264         # create IPv6 packets with SRH
1265         # packets with segments-left 1, active segment a7::
1266         packet_header = self.create_packet_header_IPv6_SRH(
1267             sidlist=['a8::', 'a7::', 'a6::'],
1268             segleft=1)
1269         # create traffic stream pg3->pg4
1270         pkts.extend(self.create_stream(self.pg3, self.pg4, packet_header,
1271                                        self.pg_packet_sizes, count))
1272
1273         # send packets and verify received packets
1274         self.send_and_verify_pkts(self.pg3, pkts, self.pg4,
1275                                   self.compare_rx_tx_packet_T_Insert)
1276
1277         # remove the interface l2 input feature
1278         r = self.vapi.input_acl_set_interface(
1279             is_add=0,
1280             sw_if_index=self.pg3.sw_if_index,
1281             ip6_table_index=table_index)
1282         self.assertIsNotNone(r,
1283                              'No response msg for input_acl_set_interface')
1284
1285         # log the ip6 inacl after cleaning
1286         self.logger.info(self.vapi.cli("show inacl type ip6"))
1287
1288         # log the localsid counters
1289         self.logger.info(self.vapi.cli("show sr localsid"))
1290
1291         # remove classifier SR steering
1292         # classifier_steering.remove_vpp_config()
1293         self.logger.info(self.vapi.cli("show sr steering policies"))
1294
1295         # remove SR Policies
1296         self.sr_policy.remove_vpp_config()
1297         self.logger.info(self.vapi.cli("show sr policies"))
1298
1299         # remove classify session and table
1300         r = self.vapi.classify_add_del_session(
1301             0,
1302             table_index,
1303             binascii.unhexlify(match))
1304         self.assertIsNotNone(r, 'No response msg for add_del_session')
1305
1306         r = self.vapi.classify_add_del_table(
1307             0,
1308             binascii.unhexlify(mask),
1309             table_index=table_index)
1310         self.assertIsNotNone(r, 'No response msg for add_del_table')
1311
1312         self.logger.info(self.vapi.cli("show classify table"))
1313
1314         # remove FIB entries
1315         # done by tearDown
1316
1317         # cleanup interfaces
1318         self.teardown_interfaces()
1319
1320     def compare_rx_tx_packet_T_Encaps(self, tx_pkt, rx_pkt):
1321         """ Compare input and output packet after passing T.Encaps
1322
1323         :param tx_pkt: transmitted packet
1324         :param rx_pkt: received packet
1325         """
1326         # T.Encaps updates the headers as follows:
1327         # SR Policy seglist (S3, S2, S1)
1328         # SR Policy source C
1329         # IPv6:
1330         # in: IPv6(A, B2)
1331         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(A, B2)
1332         # IPv6 + SRH:
1333         # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1)
1334         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(a, B2)SRH(B3, B2, B1; SL=1)
1335
1336         # get first (outer) IPv6 header of rx'ed packet
1337         rx_ip = rx_pkt.getlayer(IPv6)
1338         rx_srh = None
1339
1340         tx_ip = tx_pkt.getlayer(IPv6)
1341
1342         # expected segment-list
1343         seglist = self.sr_policy.segments
1344         # reverse list to get order as in SRH
1345         tx_seglist = seglist[::-1]
1346
1347         # get source address of SR Policy
1348         sr_policy_source = self.sr_policy.source
1349
1350         # rx'ed packet should have SRH
1351         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1352         # get SRH
1353         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1354
1355         # received ip.src should be equal to SR Policy source
1356         self.assertEqual(rx_ip.src, sr_policy_source)
1357         # received ip.dst should be equal to expected sidlist[lastentry]
1358         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1359         # rx'ed seglist should be equal to expected seglist
1360         self.assertEqual(rx_srh.addresses, tx_seglist)
1361         # segleft should be equal to size expected seglist-1
1362         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1363         # segleft should be equal to lastentry
1364         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1365
1366         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1367         # except for the hop-limit field
1368         #   -> update tx'ed hlim to the expected hlim
1369         tx_ip.hlim = tx_ip.hlim - 1
1370
1371         self.assertEqual(rx_srh.payload, tx_ip)
1372
1373         self.logger.debug("packet verification: SUCCESS")
1374
1375     def compare_rx_tx_packet_T_Encaps_IPv4(self, tx_pkt, rx_pkt):
1376         """ Compare input and output packet after passing T.Encaps for IPv4
1377
1378         :param tx_pkt: transmitted packet
1379         :param rx_pkt: received packet
1380         """
1381         # T.Encaps for IPv4 updates the headers as follows:
1382         # SR Policy seglist (S3, S2, S1)
1383         # SR Policy source C
1384         # IPv4:
1385         # in: IPv4(A, B2)
1386         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv4(A, B2)
1387
1388         # get first (outer) IPv6 header of rx'ed packet
1389         rx_ip = rx_pkt.getlayer(IPv6)
1390         rx_srh = None
1391
1392         tx_ip = tx_pkt.getlayer(IP)
1393
1394         # expected segment-list
1395         seglist = self.sr_policy.segments
1396         # reverse list to get order as in SRH
1397         tx_seglist = seglist[::-1]
1398
1399         # get source address of SR Policy
1400         sr_policy_source = self.sr_policy.source
1401
1402         # checks common to cases tx with and without SRH
1403         # rx'ed packet should have SRH and IPv4 header
1404         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1405         self.assertTrue(rx_ip.payload.haslayer(IP))
1406         # get SRH
1407         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1408
1409         # received ip.src should be equal to SR Policy source
1410         self.assertEqual(rx_ip.src, sr_policy_source)
1411         # received ip.dst should be equal to sidlist[lastentry]
1412         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1413         # rx'ed seglist should be equal to seglist
1414         self.assertEqual(rx_srh.addresses, tx_seglist)
1415         # segleft should be equal to size seglist-1
1416         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1417         # segleft should be equal to lastentry
1418         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1419
1420         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1421         # except for the ttl field and ip checksum
1422         #   -> adjust tx'ed ttl to expected ttl
1423         tx_ip.ttl = tx_ip.ttl - 1
1424         #   -> set tx'ed ip checksum to None and let scapy recompute
1425         tx_ip.chksum = None
1426         # read back the pkt (with str()) to force computing these fields
1427         # probably other ways to accomplish this are possible
1428         tx_ip = IP(scapy.compat.raw(tx_ip))
1429
1430         self.assertEqual(rx_srh.payload, tx_ip)
1431
1432         self.logger.debug("packet verification: SUCCESS")
1433
1434     def compare_rx_tx_packet_T_Encaps_L2(self, tx_pkt, rx_pkt):
1435         """ Compare input and output packet after passing T.Encaps for L2
1436
1437         :param tx_pkt: transmitted packet
1438         :param rx_pkt: received packet
1439         """
1440         # T.Encaps for L2 updates the headers as follows:
1441         # SR Policy seglist (S3, S2, S1)
1442         # SR Policy source C
1443         # L2:
1444         # in: L2
1445         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)L2
1446
1447         # get first (outer) IPv6 header of rx'ed packet
1448         rx_ip = rx_pkt.getlayer(IPv6)
1449         rx_srh = None
1450
1451         tx_ether = tx_pkt.getlayer(Ether)
1452
1453         # expected segment-list
1454         seglist = self.sr_policy.segments
1455         # reverse list to get order as in SRH
1456         tx_seglist = seglist[::-1]
1457
1458         # get source address of SR Policy
1459         sr_policy_source = self.sr_policy.source
1460
1461         # rx'ed packet should have SRH
1462         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1463         # get SRH
1464         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1465
1466         # received ip.src should be equal to SR Policy source
1467         self.assertEqual(rx_ip.src, sr_policy_source)
1468         # received ip.dst should be equal to sidlist[lastentry]
1469         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1470         # rx'ed seglist should be equal to seglist
1471         self.assertEqual(rx_srh.addresses, tx_seglist)
1472         # segleft should be equal to size seglist-1
1473         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1474         # segleft should be equal to lastentry
1475         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1476         # nh should be "No Next Header" (59)
1477         self.assertEqual(rx_srh.nh, 59)
1478
1479         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1480         self.assertEqual(Ether(scapy.compat.raw(rx_srh.payload)), tx_ether)
1481
1482         self.logger.debug("packet verification: SUCCESS")
1483
1484     def compare_rx_tx_packet_T_Insert(self, tx_pkt, rx_pkt):
1485         """ Compare input and output packet after passing T.Insert
1486
1487         :param tx_pkt: transmitted packet
1488         :param rx_pkt: received packet
1489         """
1490         # T.Insert updates the headers as follows:
1491         # IPv6:
1492         # in: IPv6(A, B2)
1493         # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)
1494         # IPv6 + SRH:
1495         # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1)
1496         # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)SRH(B3, B2, B1; SL=1)
1497
1498         # get first (outer) IPv6 header of rx'ed packet
1499         rx_ip = rx_pkt.getlayer(IPv6)
1500         rx_srh = None
1501         rx_ip2 = None
1502         rx_srh2 = None
1503         rx_ip3 = None
1504         rx_udp = rx_pkt[UDP]
1505
1506         tx_ip = tx_pkt.getlayer(IPv6)
1507         tx_srh = None
1508         tx_ip2 = None
1509         # some packets have been tx'ed with an SRH, some without it
1510         # get SRH if tx'ed packet has it
1511         if tx_pkt.haslayer(IPv6ExtHdrSegmentRouting):
1512             tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1513             tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1514         tx_udp = tx_pkt[UDP]
1515
1516         # expected segment-list (make copy of SR Policy segment list)
1517         seglist = self.sr_policy.segments[:]
1518         # expected seglist has initial dest addr as last segment
1519         seglist.append(tx_ip.dst)
1520         # reverse list to get order as in SRH
1521         tx_seglist = seglist[::-1]
1522
1523         # get source address of SR Policy
1524         sr_policy_source = self.sr_policy.source
1525
1526         # checks common to cases tx with and without SRH
1527         # rx'ed packet should have SRH and only one IPv6 header
1528         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1529         self.assertFalse(rx_ip.payload.haslayer(IPv6))
1530         # get SRH
1531         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1532
1533         # rx'ed ip.src should be equal to tx'ed ip.src
1534         self.assertEqual(rx_ip.src, tx_ip.src)
1535         # rx'ed ip.dst should be equal to sidlist[lastentry]
1536         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1537
1538         # rx'ed seglist should be equal to expected seglist
1539         self.assertEqual(rx_srh.addresses, tx_seglist)
1540         # segleft should be equal to size(expected seglist)-1
1541         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1542         # segleft should be equal to lastentry
1543         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1544
1545         if tx_srh:  # packet was tx'ed with SRH
1546             # packet should have 2nd SRH
1547             self.assertTrue(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
1548             # get 2nd SRH
1549             rx_srh2 = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting, 2)
1550
1551             # rx'ed srh2.addresses should be equal to tx'ed srh.addresses
1552             self.assertEqual(rx_srh2.addresses, tx_srh.addresses)
1553             # rx'ed srh2.segleft should be equal to tx'ed srh.segleft
1554             self.assertEqual(rx_srh2.segleft, tx_srh.segleft)
1555             # rx'ed srh2.lastentry should be equal to tx'ed srh.lastentry
1556             self.assertEqual(rx_srh2.lastentry, tx_srh.lastentry)
1557
1558         else:  # packet was tx'ed without SRH
1559             # rx packet should have no other SRH
1560             self.assertFalse(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
1561
1562         # UDP layer should be unchanged
1563         self.assertEqual(rx_udp, tx_udp)
1564
1565         self.logger.debug("packet verification: SUCCESS")
1566
1567     def compare_rx_tx_packet_End(self, tx_pkt, rx_pkt):
1568         """ Compare input and output packet after passing End (without PSP)
1569
1570         :param tx_pkt: transmitted packet
1571         :param rx_pkt: received packet
1572         """
1573         # End (no PSP) updates the headers as follows:
1574         # IPv6 + SRH:
1575         # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2)
1576         # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1577
1578         # get first (outer) IPv6 header of rx'ed packet
1579         rx_ip = rx_pkt.getlayer(IPv6)
1580         rx_srh = None
1581         rx_ip2 = None
1582         rx_udp = rx_pkt[UDP]
1583
1584         tx_ip = tx_pkt.getlayer(IPv6)
1585         # we know the packet has been tx'ed
1586         # with an inner IPv6 header and an SRH
1587         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1588         tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1589         tx_udp = tx_pkt[UDP]
1590
1591         # common checks, regardless of tx segleft value
1592         # rx'ed packet should have 2nd IPv6 header
1593         self.assertTrue(rx_ip.payload.haslayer(IPv6))
1594         # get second (inner) IPv6 header
1595         rx_ip2 = rx_pkt.getlayer(IPv6, 2)
1596
1597         if tx_ip.segleft > 0:
1598             # SRH should NOT have been popped:
1599             #   End SID without PSP does not pop SRH if segleft>0
1600             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1601             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1602
1603             # received ip.src should be equal to expected ip.src
1604             self.assertEqual(rx_ip.src, tx_ip.src)
1605             # sidlist should be unchanged
1606             self.assertEqual(rx_srh.addresses, tx_srh.addresses)
1607             # segleft should have been decremented
1608             self.assertEqual(rx_srh.segleft, tx_srh.segleft-1)
1609             # received ip.dst should be equal to sidlist[segleft]
1610             self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
1611             # lastentry should be unchanged
1612             self.assertEqual(rx_srh.lastentry, tx_srh.lastentry)
1613             # inner IPv6 packet (ip2) should be unchanged
1614             self.assertEqual(rx_ip2.src, tx_ip2.src)
1615             self.assertEqual(rx_ip2.dst, tx_ip2.dst)
1616         # else:  # tx_ip.segleft == 0
1617             # TODO: Does this work with 2 SRHs in ingress packet?
1618
1619         # UDP layer should be unchanged
1620         self.assertEqual(rx_udp, tx_udp)
1621
1622         self.logger.debug("packet verification: SUCCESS")
1623
1624     def compare_rx_tx_packet_End_PSP(self, tx_pkt, rx_pkt):
1625         """ Compare input and output packet after passing End with PSP
1626
1627         :param tx_pkt: transmitted packet
1628         :param rx_pkt: received packet
1629         """
1630         # End (PSP) updates the headers as follows:
1631         # IPv6 + SRH (SL>1):
1632         # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2)
1633         # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1634         # IPv6 + SRH (SL=1):
1635         # in: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1636         # out: IPv6(A, S3)
1637
1638         # get first (outer) IPv6 header of rx'ed packet
1639         rx_ip = rx_pkt.getlayer(IPv6)
1640         rx_srh = None
1641         rx_ip2 = None
1642         rx_udp = rx_pkt[UDP]
1643
1644         tx_ip = tx_pkt.getlayer(IPv6)
1645         # we know the packet has been tx'ed
1646         # with an inner IPv6 header and an SRH
1647         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1648         tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1649         tx_udp = tx_pkt[UDP]
1650
1651         # common checks, regardless of tx segleft value
1652         self.assertTrue(rx_ip.payload.haslayer(IPv6))
1653         rx_ip2 = rx_pkt.getlayer(IPv6, 2)
1654         # inner IPv6 packet (ip2) should be unchanged
1655         self.assertEqual(rx_ip2.src, tx_ip2.src)
1656         self.assertEqual(rx_ip2.dst, tx_ip2.dst)
1657
1658         if tx_ip.segleft > 1:
1659             # SRH should NOT have been popped:
1660             #   End SID with PSP does not pop SRH if segleft>1
1661             # rx'ed packet should have SRH
1662             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1663             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1664
1665             # received ip.src should be equal to expected ip.src
1666             self.assertEqual(rx_ip.src, tx_ip.src)
1667             # sidlist should be unchanged
1668             self.assertEqual(rx_srh.addresses, tx_srh.addresses)
1669             # segleft should have been decremented
1670             self.assertEqual(rx_srh.segleft, tx_srh.segleft-1)
1671             # received ip.dst should be equal to sidlist[segleft]
1672             self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
1673             # lastentry should be unchanged
1674             self.assertEqual(rx_srh.lastentry, tx_srh.lastentry)
1675
1676         else:  # tx_ip.segleft <= 1
1677             # SRH should have been popped:
1678             #   End SID with PSP and segleft=1 pops SRH
1679             # the two IPv6 headers are still present
1680             # outer IPv6 header has DA == last segment of popped SRH
1681             # SRH should not be present
1682             self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1683             # outer IPv6 header ip.src should be equal to tx'ed ip.src
1684             self.assertEqual(rx_ip.src, tx_ip.src)
1685             # outer IPv6 header ip.dst should be = to tx'ed sidlist[segleft-1]
1686             self.assertEqual(rx_ip.dst, tx_srh.addresses[tx_srh.segleft-1])
1687
1688         # UDP layer should be unchanged
1689         self.assertEqual(rx_udp, tx_udp)
1690
1691         self.logger.debug("packet verification: SUCCESS")
1692
1693     def compare_rx_tx_packet_End_DX6(self, tx_pkt, rx_pkt):
1694         """ Compare input and output packet after passing End.DX6
1695
1696         :param tx_pkt: transmitted packet
1697         :param rx_pkt: received packet
1698         """
1699         # End.DX6 updates the headers as follows:
1700         # IPv6 + SRH (SL=0):
1701         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv6(B, D)
1702         # out: IPv6(B, D)
1703         # IPv6:
1704         # in: IPv6(A, S3)IPv6(B, D)
1705         # out: IPv6(B, D)
1706
1707         # get first (outer) IPv6 header of rx'ed packet
1708         rx_ip = rx_pkt.getlayer(IPv6)
1709
1710         tx_ip = tx_pkt.getlayer(IPv6)
1711         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1712
1713         # verify if rx'ed packet has no SRH
1714         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1715
1716         # the whole rx_ip pkt should be equal to tx_ip2
1717         # except for the hlim field
1718         #   -> adjust tx'ed hlim to expected hlim
1719         tx_ip2.hlim = tx_ip2.hlim - 1
1720
1721         self.assertEqual(rx_ip, tx_ip2)
1722
1723         self.logger.debug("packet verification: SUCCESS")
1724
1725     def compare_rx_tx_packet_End_DX4(self, tx_pkt, rx_pkt):
1726         """ Compare input and output packet after passing End.DX4
1727
1728         :param tx_pkt: transmitted packet
1729         :param rx_pkt: received packet
1730         """
1731         # End.DX4 updates the headers as follows:
1732         # IPv6 + SRH (SL=0):
1733         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv4(B, D)
1734         # out: IPv4(B, D)
1735         # IPv6:
1736         # in: IPv6(A, S3)IPv4(B, D)
1737         # out: IPv4(B, D)
1738
1739         # get IPv4 header of rx'ed packet
1740         rx_ip = rx_pkt.getlayer(IP)
1741
1742         tx_ip = tx_pkt.getlayer(IPv6)
1743         tx_ip2 = tx_pkt.getlayer(IP)
1744
1745         # verify if rx'ed packet has no SRH
1746         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1747
1748         # the whole rx_ip pkt should be equal to tx_ip2
1749         # except for the ttl field and ip checksum
1750         #   -> adjust tx'ed ttl to expected ttl
1751         tx_ip2.ttl = tx_ip2.ttl - 1
1752         #   -> set tx'ed ip checksum to None and let scapy recompute
1753         tx_ip2.chksum = None
1754         # read back the pkt (with str()) to force computing these fields
1755         # probably other ways to accomplish this are possible
1756         tx_ip2 = IP(scapy.compat.raw(tx_ip2))
1757
1758         self.assertEqual(rx_ip, tx_ip2)
1759
1760         self.logger.debug("packet verification: SUCCESS")
1761
1762     def compare_rx_tx_packet_End_DX2(self, tx_pkt, rx_pkt):
1763         """ Compare input and output packet after passing End.DX2
1764
1765         :param tx_pkt: transmitted packet
1766         :param rx_pkt: received packet
1767         """
1768         # End.DX2 updates the headers as follows:
1769         # IPv6 + SRH (SL=0):
1770         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)L2
1771         # out: L2
1772         # IPv6:
1773         # in: IPv6(A, S3)L2
1774         # out: L2
1775
1776         # get IPv4 header of rx'ed packet
1777         rx_eth = rx_pkt.getlayer(Ether)
1778
1779         tx_ip = tx_pkt.getlayer(IPv6)
1780         # we can't just get the 2nd Ether layer
1781         # get the Raw content and dissect it as Ether
1782         tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw]))
1783
1784         # verify if rx'ed packet has no SRH
1785         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1786
1787         # the whole rx_eth pkt should be equal to tx_eth1
1788         self.assertEqual(rx_eth, tx_eth1)
1789
1790         self.logger.debug("packet verification: SUCCESS")
1791
1792     def create_stream(self, src_if, dst_if, packet_header, packet_sizes,
1793                       count):
1794         """Create SRv6 input packet stream for defined interface.
1795
1796         :param VppInterface src_if: Interface to create packet stream for
1797         :param VppInterface dst_if: destination interface of packet stream
1798         :param packet_header: Layer3 scapy packet headers,
1799                 L2 is added when not provided,
1800                 Raw(payload) with packet_info is added
1801         :param list packet_sizes: packet stream pckt sizes,sequentially applied
1802                to packets in stream have
1803         :param int count: number of packets in packet stream
1804         :return: list of packets
1805         """
1806         self.logger.info("Creating packets")
1807         pkts = []
1808         for i in range(0, count-1):
1809             payload_info = self.create_packet_info(src_if, dst_if)
1810             self.logger.debug(
1811                 "Creating packet with index %d" % (payload_info.index))
1812             payload = self.info_to_payload(payload_info)
1813             # add L2 header if not yet provided in packet_header
1814             if packet_header.getlayer(0).name == 'Ethernet':
1815                 p = (packet_header /
1816                      Raw(payload))
1817             else:
1818                 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
1819                      packet_header /
1820                      Raw(payload))
1821             size = packet_sizes[i % len(packet_sizes)]
1822             self.logger.debug("Packet size %d" % (size))
1823             self.extend_packet(p, size)
1824             # we need to store the packet with the automatic fields computed
1825             # read back the dumped packet (with str())
1826             # to force computing these fields
1827             # probably other ways are possible
1828             p = Ether(scapy.compat.raw(p))
1829             payload_info.data = p.copy()
1830             self.logger.debug(ppp("Created packet:", p))
1831             pkts.append(p)
1832         self.logger.info("Done creating packets")
1833         return pkts
1834
1835     def send_and_verify_pkts(self, input, pkts, output, compare_func,
1836                              expected_count=None):
1837         """Send packets and verify received packets using compare_func
1838
1839         :param input: ingress interface of DUT
1840         :param pkts: list of packets to transmit
1841         :param output: egress interface of DUT
1842         :param compare_func: function to compare in and out packets
1843         :param expected_count: expected number of captured packets (if
1844                different than len(pkts))
1845         """
1846         # add traffic stream to input interface
1847         input.add_stream(pkts)
1848
1849         # enable capture on all interfaces
1850         self.pg_enable_capture(self.pg_interfaces)
1851
1852         # start traffic
1853         self.logger.info("Starting traffic")
1854         self.pg_start()
1855
1856         # get output capture
1857         self.logger.info("Getting packet capture")
1858         capture = output.get_capture(expected_count=expected_count)
1859
1860         # assert nothing was captured on input interface
1861         input.assert_nothing_captured()
1862
1863         # verify captured packets
1864         self.verify_captured_pkts(output, capture, compare_func)
1865
1866     def create_packet_header_IPv6(self, dst):
1867         """Create packet header: IPv6 header, UDP header
1868
1869         :param dst: IPv6 destination address
1870
1871         IPv6 source address is 1234::1
1872         UDP source port and destination port are 1234
1873         """
1874
1875         p = (IPv6(src='1234::1', dst=dst) /
1876              UDP(sport=1234, dport=1234))
1877         return p
1878
1879     def create_packet_header_IPv6_SRH(self, sidlist, segleft):
1880         """Create packet header: IPv6 header with SRH, UDP header
1881
1882         :param list sidlist: segment list
1883         :param int segleft: segments-left field value
1884
1885         IPv6 destination address is set to sidlist[segleft]
1886         IPv6 source addresses are 1234::1 and 4321::1
1887         UDP source port and destination port are 1234
1888         """
1889
1890         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
1891              IPv6ExtHdrSegmentRouting(addresses=sidlist) /
1892              UDP(sport=1234, dport=1234))
1893         return p
1894
1895     def create_packet_header_IPv6_SRH_IPv6(self, dst, sidlist, segleft):
1896         """Create packet header: IPv6 encapsulated in SRv6:
1897         IPv6 header with SRH, IPv6 header, UDP header
1898
1899         :param ipv6address dst: inner IPv6 destination address
1900         :param list sidlist: segment list of outer IPv6 SRH
1901         :param int segleft: segments-left field of outer IPv6 SRH
1902
1903         Outer IPv6 destination address is set to sidlist[segleft]
1904         IPv6 source addresses are 1234::1 and 4321::1
1905         UDP source port and destination port are 1234
1906         """
1907
1908         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
1909              IPv6ExtHdrSegmentRouting(addresses=sidlist,
1910                                       segleft=segleft, nh=41) /
1911              IPv6(src='4321::1', dst=dst) /
1912              UDP(sport=1234, dport=1234))
1913         return p
1914
1915     def create_packet_header_IPv6_IPv6(self, dst_inner, dst_outer):
1916         """Create packet header: IPv6 encapsulated in IPv6:
1917         IPv6 header, IPv6 header, UDP header
1918
1919         :param ipv6address dst_inner: inner IPv6 destination address
1920         :param ipv6address dst_outer: outer IPv6 destination address
1921
1922         IPv6 source addresses are 1234::1 and 4321::1
1923         UDP source port and destination port are 1234
1924         """
1925
1926         p = (IPv6(src='1234::1', dst=dst_outer) /
1927              IPv6(src='4321::1', dst=dst_inner) /
1928              UDP(sport=1234, dport=1234))
1929         return p
1930
1931     def create_packet_header_IPv6_SRH_SRH_IPv6(self, dst, sidlist1, segleft1,
1932                                                sidlist2, segleft2):
1933         """Create packet header: IPv6 encapsulated in SRv6 with 2 SRH:
1934         IPv6 header with SRH, 2nd SRH, IPv6 header, UDP header
1935
1936         :param ipv6address dst: inner IPv6 destination address
1937         :param list sidlist1: segment list of outer IPv6 SRH
1938         :param int segleft1: segments-left field of outer IPv6 SRH
1939         :param list sidlist2: segment list of inner IPv6 SRH
1940         :param int segleft2: segments-left field of inner IPv6 SRH
1941
1942         Outer IPv6 destination address is set to sidlist[segleft]
1943         IPv6 source addresses are 1234::1 and 4321::1
1944         UDP source port and destination port are 1234
1945         """
1946
1947         p = (IPv6(src='1234::1', dst=sidlist1[segleft1]) /
1948              IPv6ExtHdrSegmentRouting(addresses=sidlist1,
1949                                       segleft=segleft1, nh=43) /
1950              IPv6ExtHdrSegmentRouting(addresses=sidlist2,
1951                                       segleft=segleft2, nh=41) /
1952              IPv6(src='4321::1', dst=dst) /
1953              UDP(sport=1234, dport=1234))
1954         return p
1955
1956     def create_packet_header_IPv4(self, dst):
1957         """Create packet header: IPv4 header, UDP header
1958
1959         :param dst: IPv4 destination address
1960
1961         IPv4 source address is 123.1.1.1
1962         UDP source port and destination port are 1234
1963         """
1964
1965         p = (IP(src='123.1.1.1', dst=dst) /
1966              UDP(sport=1234, dport=1234))
1967         return p
1968
1969     def create_packet_header_IPv6_IPv4(self, dst_inner, dst_outer):
1970         """Create packet header: IPv4 encapsulated in IPv6:
1971         IPv6 header, IPv4 header, UDP header
1972
1973         :param ipv4address dst_inner: inner IPv4 destination address
1974         :param ipv6address dst_outer: outer IPv6 destination address
1975
1976         IPv6 source address is 1234::1
1977         IPv4 source address is 123.1.1.1
1978         UDP source port and destination port are 1234
1979         """
1980
1981         p = (IPv6(src='1234::1', dst=dst_outer) /
1982              IP(src='123.1.1.1', dst=dst_inner) /
1983              UDP(sport=1234, dport=1234))
1984         return p
1985
1986     def create_packet_header_IPv6_SRH_IPv4(self, dst, sidlist, segleft):
1987         """Create packet header: IPv4 encapsulated in SRv6:
1988         IPv6 header with SRH, IPv4 header, UDP header
1989
1990         :param ipv4address dst: inner IPv4 destination address
1991         :param list sidlist: segment list of outer IPv6 SRH
1992         :param int segleft: segments-left field of outer IPv6 SRH
1993
1994         Outer IPv6 destination address is set to sidlist[segleft]
1995         IPv6 source address is 1234::1
1996         IPv4 source address is 123.1.1.1
1997         UDP source port and destination port are 1234
1998         """
1999
2000         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
2001              IPv6ExtHdrSegmentRouting(addresses=sidlist,
2002                                       segleft=segleft, nh=4) /
2003              IP(src='123.1.1.1', dst=dst) /
2004              UDP(sport=1234, dport=1234))
2005         return p
2006
2007     def create_packet_header_L2(self, vlan=0):
2008         """Create packet header: L2 header
2009
2010         :param vlan: if vlan!=0 then add 802.1q header
2011         """
2012         # Note: the dst addr ('00:55:44:33:22:11') is used in
2013         # the compare function compare_rx_tx_packet_T_Encaps_L2
2014         # to detect presence of L2 in SRH payload
2015         p = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
2016         etype = 0x8137  # IPX
2017         if vlan:
2018             # add 802.1q layer
2019             p /= Dot1Q(vlan=vlan, type=etype)
2020         else:
2021             p.type = etype
2022         return p
2023
2024     def create_packet_header_IPv6_SRH_L2(self, sidlist, segleft, vlan=0):
2025         """Create packet header: L2 encapsulated in SRv6:
2026         IPv6 header with SRH, L2
2027
2028         :param list sidlist: segment list of outer IPv6 SRH
2029         :param int segleft: segments-left field of outer IPv6 SRH
2030         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
2031
2032         Outer IPv6 destination address is set to sidlist[segleft]
2033         IPv6 source address is 1234::1
2034         """
2035         eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
2036         etype = 0x8137  # IPX
2037         if vlan:
2038             # add 802.1q layer
2039             eth /= Dot1Q(vlan=vlan, type=etype)
2040         else:
2041             eth.type = etype
2042
2043         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
2044              IPv6ExtHdrSegmentRouting(addresses=sidlist,
2045                                       segleft=segleft, nh=59) /
2046              eth)
2047         return p
2048
2049     def create_packet_header_IPv6_L2(self, dst_outer, vlan=0):
2050         """Create packet header: L2 encapsulated in IPv6:
2051         IPv6 header, L2
2052
2053         :param ipv6address dst_outer: outer IPv6 destination address
2054         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
2055         """
2056         eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
2057         etype = 0x8137  # IPX
2058         if vlan:
2059             # add 802.1q layer
2060             eth /= Dot1Q(vlan=vlan, type=etype)
2061         else:
2062             eth.type = etype
2063
2064         p = (IPv6(src='1234::1', dst=dst_outer, nh=59) / eth)
2065         return p
2066
2067     def get_payload_info(self, packet):
2068         """ Extract the payload_info from the packet
2069         """
2070         # in most cases, payload_info is in packet[Raw]
2071         # but packet[Raw] gives the complete payload
2072         # (incl L2 header) for the T.Encaps L2 case
2073         try:
2074             payload_info = self.payload_to_info(packet[Raw])
2075
2076         except:
2077             # remote L2 header from packet[Raw]:
2078             # take packet[Raw], convert it to an Ether layer
2079             # and then extract Raw from it
2080             payload_info = self.payload_to_info(
2081                 Ether(scapy.compat.r(packet[Raw]))[Raw])
2082
2083         return payload_info
2084
2085     def verify_captured_pkts(self, dst_if, capture, compare_func):
2086         """
2087         Verify captured packet stream for specified interface.
2088         Compare ingress with egress packets using the specified compare fn
2089
2090         :param dst_if: egress interface of DUT
2091         :param capture: captured packets
2092         :param compare_func: function to compare in and out packet
2093         """
2094         self.logger.info("Verifying capture on interface %s using function %s"
2095                          % (dst_if.name, compare_func.__name__))
2096
2097         last_info = dict()
2098         for i in self.pg_interfaces:
2099             last_info[i.sw_if_index] = None
2100         dst_sw_if_index = dst_if.sw_if_index
2101
2102         for packet in capture:
2103             try:
2104                 # extract payload_info from packet's payload
2105                 payload_info = self.get_payload_info(packet)
2106                 packet_index = payload_info.index
2107
2108                 self.logger.debug("Verifying packet with index %d"
2109                                   % (packet_index))
2110                 # packet should have arrived on the expected interface
2111                 self.assertEqual(payload_info.dst, dst_sw_if_index)
2112                 self.logger.debug(
2113                     "Got packet on interface %s: src=%u (idx=%u)" %
2114                     (dst_if.name, payload_info.src, packet_index))
2115
2116                 # search for payload_info with same src and dst if_index
2117                 # this will give us the transmitted packet
2118                 next_info = self.get_next_packet_info_for_interface2(
2119                     payload_info.src, dst_sw_if_index,
2120                     last_info[payload_info.src])
2121                 last_info[payload_info.src] = next_info
2122                 # next_info should not be None
2123                 self.assertTrue(next_info is not None)
2124                 # index of tx and rx packets should be equal
2125                 self.assertEqual(packet_index, next_info.index)
2126                 # data field of next_info contains the tx packet
2127                 txed_packet = next_info.data
2128
2129                 self.logger.debug(ppp("Transmitted packet:",
2130                                       txed_packet))  # ppp=Pretty Print Packet
2131
2132                 self.logger.debug(ppp("Received packet:", packet))
2133
2134                 # compare rcvd packet with expected packet using compare_func
2135                 compare_func(txed_packet, packet)
2136
2137             except:
2138                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2139                 raise
2140
2141         # FIXME: there is no need to check manually that all the packets
2142         #        arrived (already done so by get_capture); checking here
2143         #        prevents testing packets that are expected to be dropped, so
2144         #        commenting this out for now
2145
2146         # have all expected packets arrived?
2147         # for i in self.pg_interfaces:
2148         #    remaining_packet = self.get_next_packet_info_for_interface2(
2149         #        i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
2150         #    self.assertTrue(remaining_packet is None,
2151         #                    "Interface %s: Packet expected from interface %s "
2152         #                    "didn't arrive" % (dst_if.name, i.name))
2153
2154
2155 if __name__ == '__main__':
2156     unittest.main(testRunner=VppTestRunner)