Fixed bugs in SRv6 API
[vpp.git] / test / test_srv6.py
1 #!/usr/bin/env python
2
3 import unittest
4 import binascii
5 from socket import AF_INET6
6
7 from framework import VppTestCase, VppTestRunner
8 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto, VppIpTable
9 from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
10     SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
11
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether, Dot1Q
14 from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
15 from scapy.layers.inet import IP, UDP
16
17 from scapy.utils import inet_pton, inet_ntop
18
19 from util import ppp
20
21
22 class TestSRv6(VppTestCase):
23     """ SRv6 Test Case """
24
25     @classmethod
26     def setUpClass(self):
27         super(TestSRv6, self).setUpClass()
28
29     def setUp(self):
30         """ Perform test setup before each test case.
31         """
32         super(TestSRv6, self).setUp()
33
34         # packet sizes, inclusive L2 overhead
35         self.pg_packet_sizes = [64, 512, 1518, 9018]
36
37         # reset packet_infos
38         self.reset_packet_infos()
39
40     def tearDown(self):
41         """ Clean up test setup after each test case.
42         """
43         self.teardown_interfaces()
44
45         super(TestSRv6, self).tearDown()
46
47     def configure_interface(self,
48                             interface,
49                             ipv6=False, ipv4=False,
50                             ipv6_table_id=0, ipv4_table_id=0):
51         """ Configure interface.
52         :param ipv6: configure IPv6 on interface
53         :param ipv4: configure IPv4 on interface
54         :param ipv6_table_id: FIB table_id for IPv6
55         :param ipv4_table_id: FIB table_id for IPv4
56         """
57         self.logger.debug("Configuring interface %s" % (interface.name))
58         if ipv6:
59             self.logger.debug("Configuring IPv6")
60             interface.set_table_ip6(ipv6_table_id)
61             interface.config_ip6()
62             interface.resolve_ndp(timeout=5)
63         if ipv4:
64             self.logger.debug("Configuring IPv4")
65             interface.set_table_ip4(ipv4_table_id)
66             interface.config_ip4()
67             interface.resolve_arp()
68         interface.admin_up()
69
70     def setup_interfaces(self, ipv6=[], ipv4=[],
71                          ipv6_table_id=[], ipv4_table_id=[]):
72         """ Create and configure interfaces.
73
74         :param ipv6: list of interface IPv6 capabilities
75         :param ipv4: list of interface IPv4 capabilities
76         :param ipv6_table_id: list of intf IPv6 FIB table_ids
77         :param ipv4_table_id: list of intf IPv4 FIB table_ids
78         :returns: List of created interfaces.
79         """
80         # how many interfaces?
81         if len(ipv6):
82             count = len(ipv6)
83         else:
84             count = len(ipv4)
85         self.logger.debug("Creating and configuring %d interfaces" % (count))
86
87         # fill up ipv6 and ipv4 lists if needed
88         # not enabled (False) is the default
89         if len(ipv6) < count:
90             ipv6 += (count - len(ipv6)) * [False]
91         if len(ipv4) < count:
92             ipv4 += (count - len(ipv4)) * [False]
93
94         # fill up table_id lists if needed
95         # table_id 0 (global) is the default
96         if len(ipv6_table_id) < count:
97             ipv6_table_id += (count - len(ipv6_table_id)) * [0]
98         if len(ipv4_table_id) < count:
99             ipv4_table_id += (count - len(ipv4_table_id)) * [0]
100
101         # create 'count' pg interfaces
102         self.create_pg_interfaces(range(count))
103
104         # setup all interfaces
105         for i in range(count):
106             intf = self.pg_interfaces[i]
107             self.configure_interface(intf,
108                                      ipv6[i], ipv4[i],
109                                      ipv6_table_id[i], ipv4_table_id[i])
110
111         if any(ipv6):
112             self.logger.debug(self.vapi.cli("show ip6 neighbors"))
113         if any(ipv4):
114             self.logger.debug(self.vapi.cli("show ip arp"))
115         self.logger.debug(self.vapi.cli("show interface"))
116         self.logger.debug(self.vapi.cli("show hardware"))
117
118         return self.pg_interfaces
119
120     def teardown_interfaces(self):
121         """ Unconfigure and bring down interface.
122         """
123         self.logger.debug("Tearing down interfaces")
124         # tear down all interfaces
125         # AFAIK they cannot be deleted
126         for i in self.pg_interfaces:
127             self.logger.debug("Tear down interface %s" % (i.name))
128             i.admin_down()
129             i.unconfig()
130             i.set_table_ip4(0)
131             i.set_table_ip6(0)
132
133     @unittest.skipUnless(0, "PC to fix")
134     def test_SRv6_T_Encaps(self):
135         """ Test SRv6 Transit.Encaps behavior for IPv6.
136         """
137         # send traffic to one destination interface
138         # source and destination are IPv6 only
139         self.setup_interfaces(ipv6=[True, True])
140
141         # configure FIB entries
142         route = VppIpRoute(self, "a4::", 64,
143                            [VppRoutePath(self.pg1.remote_ip6,
144                                          self.pg1.sw_if_index,
145                                          proto=DpoProto.DPO_PROTO_IP6)],
146                            is_ip6=1)
147         route.add_vpp_config()
148
149         # configure encaps IPv6 source address
150         # needs to be done before SR Policy config
151         # TODO: API?
152         self.vapi.cli("set sr encaps source addr a3::")
153
154         bsid = 'a3::9999:1'
155         # configure SRv6 Policy
156         # Note: segment list order: first -> last
157         sr_policy = VppSRv6Policy(
158             self, bsid=bsid,
159             is_encap=1,
160             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
161             weight=1, fib_table=0,
162             segments=['a4::', 'a5::', 'a6::c7'],
163             source='a3::')
164         sr_policy.add_vpp_config()
165         self.sr_policy = sr_policy
166
167         # log the sr policies
168         self.logger.info(self.vapi.cli("show sr policies"))
169
170         # steer IPv6 traffic to a7::/64 into SRv6 Policy
171         # use the bsid of the above self.sr_policy
172         pol_steering = VppSRv6Steering(
173                         self,
174                         bsid=self.sr_policy.bsid,
175                         prefix="a7::", mask_width=64,
176                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
177                         sr_policy_index=0, table_id=0,
178                         sw_if_index=0)
179         pol_steering.add_vpp_config()
180
181         # log the sr steering policies
182         self.logger.info(self.vapi.cli("show sr steering policies"))
183
184         # create packets
185         count = len(self.pg_packet_sizes)
186         dst_inner = 'a7::1234'
187         pkts = []
188
189         # create IPv6 packets without SRH
190         packet_header = self.create_packet_header_IPv6(dst_inner)
191         # create traffic stream pg0->pg1
192         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
193                                        self.pg_packet_sizes, count))
194
195         # create IPv6 packets with SRH
196         # packets with segments-left 1, active segment a7::
197         packet_header = self.create_packet_header_IPv6_SRH(
198             sidlist=['a8::', 'a7::', 'a6::'],
199             segleft=1)
200         # create traffic stream pg0->pg1
201         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
202                                        self.pg_packet_sizes, count))
203
204         # create IPv6 packets with SRH and IPv6
205         # packets with segments-left 1, active segment a7::
206         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
207             dst_inner,
208             sidlist=['a8::', 'a7::', 'a6::'],
209             segleft=1)
210         # create traffic stream pg0->pg1
211         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
212                                        self.pg_packet_sizes, count))
213
214         # send packets and verify received packets
215         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
216                                   self.compare_rx_tx_packet_T_Encaps)
217
218         # log the localsid counters
219         self.logger.info(self.vapi.cli("show sr localsid"))
220
221         # remove SR steering
222         pol_steering.remove_vpp_config()
223         self.logger.info(self.vapi.cli("show sr steering policies"))
224
225         # remove SR Policies
226         self.sr_policy.remove_vpp_config()
227         self.logger.info(self.vapi.cli("show sr policies"))
228
229         # remove FIB entries
230         # done by tearDown
231
232         # cleanup interfaces
233         self.teardown_interfaces()
234
235     @unittest.skipUnless(0, "PC to fix")
236     def test_SRv6_T_Insert(self):
237         """ Test SRv6 Transit.Insert behavior (IPv6 only).
238         """
239         # send traffic to one destination interface
240         # source and destination are IPv6 only
241         self.setup_interfaces(ipv6=[True, True])
242
243         # configure FIB entries
244         route = VppIpRoute(self, "a4::", 64,
245                            [VppRoutePath(self.pg1.remote_ip6,
246                                          self.pg1.sw_if_index,
247                                          proto=DpoProto.DPO_PROTO_IP6)],
248                            is_ip6=1)
249         route.add_vpp_config()
250
251         # configure encaps IPv6 source address
252         # needs to be done before SR Policy config
253         # TODO: API?
254         self.vapi.cli("set sr encaps source addr a3::")
255
256         bsid = 'a3::9999:1'
257         # configure SRv6 Policy
258         # Note: segment list order: first -> last
259         sr_policy = VppSRv6Policy(
260             self, bsid=bsid,
261             is_encap=0,
262             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
263             weight=1, fib_table=0,
264             segments=['a4::', 'a5::', 'a6::c7'],
265             source='a3::')
266         sr_policy.add_vpp_config()
267         self.sr_policy = sr_policy
268
269         # log the sr policies
270         self.logger.info(self.vapi.cli("show sr policies"))
271
272         # steer IPv6 traffic to a7::/64 into SRv6 Policy
273         # use the bsid of the above self.sr_policy
274         pol_steering = VppSRv6Steering(
275                         self,
276                         bsid=self.sr_policy.bsid,
277                         prefix="a7::", mask_width=64,
278                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
279                         sr_policy_index=0, table_id=0,
280                         sw_if_index=0)
281         pol_steering.add_vpp_config()
282
283         # log the sr steering policies
284         self.logger.info(self.vapi.cli("show sr steering policies"))
285
286         # create packets
287         count = len(self.pg_packet_sizes)
288         dst_inner = 'a7::1234'
289         pkts = []
290
291         # create IPv6 packets without SRH
292         packet_header = self.create_packet_header_IPv6(dst_inner)
293         # create traffic stream pg0->pg1
294         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
295                                        self.pg_packet_sizes, count))
296
297         # create IPv6 packets with SRH
298         # packets with segments-left 1, active segment a7::
299         packet_header = self.create_packet_header_IPv6_SRH(
300             sidlist=['a8::', 'a7::', 'a6::'],
301             segleft=1)
302         # create traffic stream pg0->pg1
303         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
304                                        self.pg_packet_sizes, count))
305
306         # send packets and verify received packets
307         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
308                                   self.compare_rx_tx_packet_T_Insert)
309
310         # log the localsid counters
311         self.logger.info(self.vapi.cli("show sr localsid"))
312
313         # remove SR steering
314         pol_steering.remove_vpp_config()
315         self.logger.info(self.vapi.cli("show sr steering policies"))
316
317         # remove SR Policies
318         self.sr_policy.remove_vpp_config()
319         self.logger.info(self.vapi.cli("show sr policies"))
320
321         # remove FIB entries
322         # done by tearDown
323
324         # cleanup interfaces
325         self.teardown_interfaces()
326
327     @unittest.skipUnless(0, "PC to fix")
328     def test_SRv6_T_Encaps_IPv4(self):
329         """ Test SRv6 Transit.Encaps behavior for IPv4.
330         """
331         # send traffic to one destination interface
332         # source interface is IPv4 only
333         # destination interface is IPv6 only
334         self.setup_interfaces(ipv6=[False, True], ipv4=[True, False])
335
336         # configure FIB entries
337         route = VppIpRoute(self, "a4::", 64,
338                            [VppRoutePath(self.pg1.remote_ip6,
339                                          self.pg1.sw_if_index,
340                                          proto=DpoProto.DPO_PROTO_IP6)],
341                            is_ip6=1)
342         route.add_vpp_config()
343
344         # configure encaps IPv6 source address
345         # needs to be done before SR Policy config
346         # TODO: API?
347         self.vapi.cli("set sr encaps source addr a3::")
348
349         bsid = 'a3::9999:1'
350         # configure SRv6 Policy
351         # Note: segment list order: first -> last
352         sr_policy = VppSRv6Policy(
353             self, bsid=bsid,
354             is_encap=1,
355             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
356             weight=1, fib_table=0,
357             segments=['a4::', 'a5::', 'a6::c7'],
358             source='a3::')
359         sr_policy.add_vpp_config()
360         self.sr_policy = sr_policy
361
362         # log the sr policies
363         self.logger.info(self.vapi.cli("show sr policies"))
364
365         # steer IPv4 traffic to 7.1.1.0/24 into SRv6 Policy
366         # use the bsid of the above self.sr_policy
367         pol_steering = VppSRv6Steering(
368                         self,
369                         bsid=self.sr_policy.bsid,
370                         prefix="7.1.1.0", mask_width=24,
371                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV4,
372                         sr_policy_index=0, table_id=0,
373                         sw_if_index=0)
374         pol_steering.add_vpp_config()
375
376         # log the sr steering policies
377         self.logger.info(self.vapi.cli("show sr steering policies"))
378
379         # create packets
380         count = len(self.pg_packet_sizes)
381         dst_inner = '7.1.1.123'
382         pkts = []
383
384         # create IPv4 packets
385         packet_header = self.create_packet_header_IPv4(dst_inner)
386         # create traffic stream pg0->pg1
387         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
388                                        self.pg_packet_sizes, count))
389
390         # send packets and verify received packets
391         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
392                                   self.compare_rx_tx_packet_T_Encaps_IPv4)
393
394         # log the localsid counters
395         self.logger.info(self.vapi.cli("show sr localsid"))
396
397         # remove SR steering
398         pol_steering.remove_vpp_config()
399         self.logger.info(self.vapi.cli("show sr steering policies"))
400
401         # remove SR Policies
402         self.sr_policy.remove_vpp_config()
403         self.logger.info(self.vapi.cli("show sr policies"))
404
405         # remove FIB entries
406         # done by tearDown
407
408         # cleanup interfaces
409         self.teardown_interfaces()
410
411     @unittest.skip("VPP crashes after running this test")
412     def test_SRv6_T_Encaps_L2(self):
413         """ Test SRv6 Transit.Encaps behavior for L2.
414         """
415         # send traffic to one destination interface
416         # source interface is IPv4 only TODO?
417         # destination interface is IPv6 only
418         self.setup_interfaces(ipv6=[False, True], ipv4=[False, False])
419
420         # configure FIB entries
421         route = VppIpRoute(self, "a4::", 64,
422                            [VppRoutePath(self.pg1.remote_ip6,
423                                          self.pg1.sw_if_index,
424                                          proto=DpoProto.DPO_PROTO_IP6)],
425                            is_ip6=1)
426         route.add_vpp_config()
427
428         # configure encaps IPv6 source address
429         # needs to be done before SR Policy config
430         # TODO: API?
431         self.vapi.cli("set sr encaps source addr a3::")
432
433         bsid = 'a3::9999:1'
434         # configure SRv6 Policy
435         # Note: segment list order: first -> last
436         sr_policy = VppSRv6Policy(
437             self, bsid=bsid,
438             is_encap=1,
439             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
440             weight=1, fib_table=0,
441             segments=['a4::', 'a5::', 'a6::c7'],
442             source='a3::')
443         sr_policy.add_vpp_config()
444         self.sr_policy = sr_policy
445
446         # log the sr policies
447         self.logger.info(self.vapi.cli("show sr policies"))
448
449         # steer L2 traffic into SRv6 Policy
450         # use the bsid of the above self.sr_policy
451         pol_steering = VppSRv6Steering(
452                         self,
453                         bsid=self.sr_policy.bsid,
454                         prefix="::", mask_width=0,
455                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_L2,
456                         sr_policy_index=0, table_id=0,
457                         sw_if_index=self.pg0.sw_if_index)
458         pol_steering.add_vpp_config()
459
460         # log the sr steering policies
461         self.logger.info(self.vapi.cli("show sr steering policies"))
462
463         # create packets
464         count = len(self.pg_packet_sizes)
465         pkts = []
466
467         # create L2 packets without dot1q header
468         packet_header = self.create_packet_header_L2()
469         # create traffic stream pg0->pg1
470         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
471                                        self.pg_packet_sizes, count))
472
473         # create L2 packets with dot1q header
474         packet_header = self.create_packet_header_L2(vlan=123)
475         # create traffic stream pg0->pg1
476         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
477                                        self.pg_packet_sizes, count))
478
479         # send packets and verify received packets
480         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
481                                   self.compare_rx_tx_packet_T_Encaps_L2)
482
483         # log the localsid counters
484         self.logger.info(self.vapi.cli("show sr localsid"))
485
486         # remove SR steering
487         pol_steering.remove_vpp_config()
488         self.logger.info(self.vapi.cli("show sr steering policies"))
489
490         # remove SR Policies
491         self.sr_policy.remove_vpp_config()
492         self.logger.info(self.vapi.cli("show sr policies"))
493
494         # remove FIB entries
495         # done by tearDown
496
497         # cleanup interfaces
498         self.teardown_interfaces()
499
500     def test_SRv6_End(self):
501         """ Test SRv6 End (without PSP) behavior.
502         """
503         # send traffic to one destination interface
504         # source and destination interfaces are IPv6 only
505         self.setup_interfaces(ipv6=[True, True])
506
507         # configure FIB entries
508         route = VppIpRoute(self, "a4::", 64,
509                            [VppRoutePath(self.pg1.remote_ip6,
510                                          self.pg1.sw_if_index,
511                                          proto=DpoProto.DPO_PROTO_IP6)],
512                            is_ip6=1)
513         route.add_vpp_config()
514
515         # configure SRv6 localSID End without PSP behavior
516         localsid = VppSRv6LocalSID(
517                         self, localsid={'addr': 'A3::0'},
518                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
519                         nh_addr4='0.0.0.0',
520                         nh_addr6='::',
521                         end_psp=0,
522                         sw_if_index=0,
523                         vlan_index=0,
524                         fib_table=0)
525         localsid.add_vpp_config()
526         # log the localsids
527         self.logger.debug(self.vapi.cli("show sr localsid"))
528
529         # create IPv6 packets with SRH (SL=2, SL=1, SL=0)
530         # send one packet per SL value per packet size
531         # SL=0 packet with localSID End with USP needs 2nd SRH
532         count = len(self.pg_packet_sizes)
533         dst_inner = 'a4::1234'
534         pkts = []
535
536         # packets with segments-left 2, active segment a3::
537         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
538                 dst_inner,
539                 sidlist=['a5::', 'a4::', 'a3::'],
540                 segleft=2)
541         # create traffic stream pg0->pg1
542         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
543                                        self.pg_packet_sizes, count))
544
545         # packets with segments-left 1, active segment a3::
546         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
547                 dst_inner,
548                 sidlist=['a4::', 'a3::', 'a2::'],
549                 segleft=1)
550         # add to traffic stream pg0->pg1
551         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
552                                        self.pg_packet_sizes, count))
553
554         # TODO: test behavior with SL=0 packet (needs 2*SRH?)
555
556         # send packets and verify received packets
557         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
558                                   self.compare_rx_tx_packet_End)
559
560         # log the localsid counters
561         self.logger.info(self.vapi.cli("show sr localsid"))
562
563         # remove SRv6 localSIDs
564         localsid.remove_vpp_config()
565
566         # remove FIB entries
567         # done by tearDown
568
569         # cleanup interfaces
570         self.teardown_interfaces()
571
572     def test_SRv6_End_with_PSP(self):
573         """ Test SRv6 End with PSP behavior.
574         """
575         # send traffic to one destination interface
576         # source and destination interfaces are IPv6 only
577         self.setup_interfaces(ipv6=[True, True])
578
579         # configure FIB entries
580         route = VppIpRoute(self, "a4::", 64,
581                            [VppRoutePath(self.pg1.remote_ip6,
582                                          self.pg1.sw_if_index,
583                                          proto=DpoProto.DPO_PROTO_IP6)],
584                            is_ip6=1)
585         route.add_vpp_config()
586
587         # configure SRv6 localSID End with PSP behavior
588         localsid = VppSRv6LocalSID(
589                         self, localsid={'addr': 'A3::0'},
590                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
591                         nh_addr4='0.0.0.0',
592                         nh_addr6='::',
593                         end_psp=1,
594                         sw_if_index=0,
595                         vlan_index=0,
596                         fib_table=0)
597         localsid.add_vpp_config()
598         # log the localsids
599         self.logger.debug(self.vapi.cli("show sr localsid"))
600
601         # create IPv6 packets with SRH (SL=2, SL=1)
602         # send one packet per SL value per packet size
603         # SL=0 packet with localSID End with PSP is dropped
604         count = len(self.pg_packet_sizes)
605         dst_inner = 'a4::1234'
606         pkts = []
607
608         # packets with segments-left 2, active segment a3::
609         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
610                     dst_inner,
611                     sidlist=['a5::', 'a4::', 'a3::'],
612                     segleft=2)
613         # create traffic stream pg0->pg1
614         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
615                                        self.pg_packet_sizes, count))
616
617         # packets with segments-left 1, active segment a3::
618         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
619                     dst_inner,
620                     sidlist=['a4::', 'a3::', 'a2::'],
621                     segleft=1)
622         # add to traffic stream pg0->pg1
623         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
624                                        self.pg_packet_sizes, count))
625
626         # send packets and verify received packets
627         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
628                                   self.compare_rx_tx_packet_End_PSP)
629
630         # log the localsid counters
631         self.logger.info(self.vapi.cli("show sr localsid"))
632
633         # remove SRv6 localSIDs
634         localsid.remove_vpp_config()
635
636         # remove FIB entries
637         # done by tearDown
638
639         # cleanup interfaces
640         self.teardown_interfaces()
641
642     def test_SRv6_End_X(self):
643         """ Test SRv6 End.X (without PSP) behavior.
644         """
645         # create three interfaces (1 source, 2 destinations)
646         # source and destination interfaces are IPv6 only
647         self.setup_interfaces(ipv6=[True, True, True])
648
649         # configure FIB entries
650         # a4::/64 via pg1 and pg2
651         route = VppIpRoute(self, "a4::", 64,
652                            [VppRoutePath(self.pg1.remote_ip6,
653                                          self.pg1.sw_if_index,
654                                          proto=DpoProto.DPO_PROTO_IP6),
655                             VppRoutePath(self.pg2.remote_ip6,
656                                          self.pg2.sw_if_index,
657                                          proto=DpoProto.DPO_PROTO_IP6)],
658                            is_ip6=1)
659         route.add_vpp_config()
660         self.logger.debug(self.vapi.cli("show ip6 fib"))
661
662         # configure SRv6 localSID End.X without PSP behavior
663         # End.X points to interface pg1
664         localsid = VppSRv6LocalSID(
665                         self, localsid={'addr': 'A3::C4'},
666                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
667                         nh_addr4='0.0.0.0',
668                         nh_addr6=self.pg1.remote_ip6,
669                         end_psp=0,
670                         sw_if_index=self.pg1.sw_if_index,
671                         vlan_index=0,
672                         fib_table=0)
673         localsid.add_vpp_config()
674         # log the localsids
675         self.logger.debug(self.vapi.cli("show sr localsid"))
676
677         # create IPv6 packets with SRH (SL=2, SL=1)
678         # send one packet per SL value per packet size
679         # SL=0 packet with localSID End with PSP is dropped
680         count = len(self.pg_packet_sizes)
681         dst_inner = 'a4::1234'
682         pkts = []
683
684         # packets with segments-left 2, active segment a3::c4
685         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
686                     dst_inner,
687                     sidlist=['a5::', 'a4::', 'a3::c4'],
688                     segleft=2)
689         # create traffic stream pg0->pg1
690         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
691                                        self.pg_packet_sizes, count))
692
693         # packets with segments-left 1, active segment a3::c4
694         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
695                     dst_inner,
696                     sidlist=['a4::', 'a3::c4', 'a2::'],
697                     segleft=1)
698         # add to traffic stream pg0->pg1
699         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
700                                        self.pg_packet_sizes, count))
701
702         # send packets and verify received packets
703         # using same comparison function as End (no PSP)
704         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
705                                   self.compare_rx_tx_packet_End)
706
707         # assert nothing was received on the other interface (pg2)
708         self.pg2.assert_nothing_captured("mis-directed packet(s)")
709
710         # log the localsid counters
711         self.logger.info(self.vapi.cli("show sr localsid"))
712
713         # remove SRv6 localSIDs
714         localsid.remove_vpp_config()
715
716         # remove FIB entries
717         # done by tearDown
718
719         # cleanup interfaces
720         self.teardown_interfaces()
721
722     def test_SRv6_End_X_with_PSP(self):
723         """ Test SRv6 End.X with PSP behavior.
724         """
725         # create three interfaces (1 source, 2 destinations)
726         # source and destination interfaces are IPv6 only
727         self.setup_interfaces(ipv6=[True, True, True])
728
729         # configure FIB entries
730         # a4::/64 via pg1 and pg2
731         route = VppIpRoute(self, "a4::", 64,
732                            [VppRoutePath(self.pg1.remote_ip6,
733                                          self.pg1.sw_if_index,
734                                          proto=DpoProto.DPO_PROTO_IP6),
735                             VppRoutePath(self.pg2.remote_ip6,
736                                          self.pg2.sw_if_index,
737                                          proto=DpoProto.DPO_PROTO_IP6)],
738                            is_ip6=1)
739         route.add_vpp_config()
740
741         # configure SRv6 localSID End with PSP behavior
742         localsid = VppSRv6LocalSID(
743                         self, localsid={'addr': 'A3::C4'},
744                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
745                         nh_addr4='0.0.0.0',
746                         nh_addr6=self.pg1.remote_ip6,
747                         end_psp=1,
748                         sw_if_index=self.pg1.sw_if_index,
749                         vlan_index=0,
750                         fib_table=0)
751         localsid.add_vpp_config()
752         # log the localsids
753         self.logger.debug(self.vapi.cli("show sr localsid"))
754
755         # create IPv6 packets with SRH (SL=2, SL=1)
756         # send one packet per SL value per packet size
757         # SL=0 packet with localSID End with PSP is dropped
758         count = len(self.pg_packet_sizes)
759         dst_inner = 'a4::1234'
760         pkts = []
761
762         # packets with segments-left 2, active segment a3::
763         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
764                     dst_inner,
765                     sidlist=['a5::', 'a4::', 'a3::c4'],
766                     segleft=2)
767         # create traffic stream pg0->pg1
768         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
769                                        self.pg_packet_sizes, count))
770
771         # packets with segments-left 1, active segment a3::
772         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
773                     dst_inner,
774                     sidlist=['a4::', 'a3::c4', 'a2::'],
775                     segleft=1)
776         # add to traffic stream pg0->pg1
777         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
778                                        self.pg_packet_sizes, count))
779
780         # send packets and verify received packets
781         # using same comparison function as End with PSP
782         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
783                                   self.compare_rx_tx_packet_End_PSP)
784
785         # assert nothing was received on the other interface (pg2)
786         self.pg2.assert_nothing_captured("mis-directed packet(s)")
787
788         # log the localsid counters
789         self.logger.info(self.vapi.cli("show sr localsid"))
790
791         # remove SRv6 localSIDs
792         localsid.remove_vpp_config()
793
794         # remove FIB entries
795         # done by tearDown
796
797         # cleanup interfaces
798         self.teardown_interfaces()
799
800     def test_SRv6_End_DX6(self):
801         """ Test SRv6 End.DX6 behavior.
802         """
803         # send traffic to one destination interface
804         # source and destination interfaces are IPv6 only
805         self.setup_interfaces(ipv6=[True, True])
806
807         # configure SRv6 localSID End.DX6 behavior
808         localsid = VppSRv6LocalSID(
809                         self, localsid={'addr': 'A3::C4'},
810                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX6,
811                         nh_addr4='0.0.0.0',
812                         nh_addr6=self.pg1.remote_ip6,
813                         end_psp=0,
814                         sw_if_index=self.pg1.sw_if_index,
815                         vlan_index=0,
816                         fib_table=0)
817         localsid.add_vpp_config()
818         # log the localsids
819         self.logger.debug(self.vapi.cli("show sr localsid"))
820
821         # create IPv6 packets with SRH (SL=0)
822         # send one packet per packet size
823         count = len(self.pg_packet_sizes)
824         dst_inner = 'a4::1234'  # inner header destination address
825         pkts = []
826
827         # packets with SRH, segments-left 0, active segment a3::c4
828         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
829                         dst_inner,
830                         sidlist=['a3::c4', 'a2::', 'a1::'],
831                         segleft=0)
832         # add to traffic stream pg0->pg1
833         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
834                                        self.pg_packet_sizes, count))
835
836         # packets without SRH, IPv6 in IPv6
837         # outer IPv6 dest addr is the localsid End.DX6
838         packet_header = self.create_packet_header_IPv6_IPv6(
839                                             dst_inner,
840                                             dst_outer='a3::c4')
841         # add to traffic stream pg0->pg1
842         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
843                                        self.pg_packet_sizes, count))
844
845         # send packets and verify received packets
846         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
847                                   self.compare_rx_tx_packet_End_DX6)
848
849         # log the localsid counters
850         self.logger.info(self.vapi.cli("show sr localsid"))
851
852         # remove SRv6 localSIDs
853         localsid.remove_vpp_config()
854
855         # cleanup interfaces
856         self.teardown_interfaces()
857
858     def test_SRv6_End_DT6(self):
859         """ Test SRv6 End.DT6 behavior.
860         """
861         # create three interfaces (1 source, 2 destinations)
862         # all interfaces are IPv6 only
863         # source interface in global FIB (0)
864         # destination interfaces in global and vrf
865         vrf_1 = 1
866         ipt = VppIpTable(self, vrf_1, is_ip6=True)
867         ipt.add_vpp_config()
868         self.setup_interfaces(ipv6=[True, True, True],
869                               ipv6_table_id=[0, 0, vrf_1])
870
871         # configure FIB entries
872         # a4::/64 is reachable
873         #     via pg1 in table 0 (global)
874         #     and via pg2 in table vrf_1
875         route0 = VppIpRoute(self, "a4::", 64,
876                             [VppRoutePath(self.pg1.remote_ip6,
877                                           self.pg1.sw_if_index,
878                                           proto=DpoProto.DPO_PROTO_IP6,
879                                           nh_table_id=0)],
880                             table_id=0,
881                             is_ip6=1)
882         route0.add_vpp_config()
883         route1 = VppIpRoute(self, "a4::", 64,
884                             [VppRoutePath(self.pg2.remote_ip6,
885                                           self.pg2.sw_if_index,
886                                           proto=DpoProto.DPO_PROTO_IP6,
887                                           nh_table_id=vrf_1)],
888                             table_id=vrf_1,
889                             is_ip6=1)
890         route1.add_vpp_config()
891         self.logger.debug(self.vapi.cli("show ip6 fib"))
892
893         # configure SRv6 localSID End.DT6 behavior
894         # Note:
895         # fib_table: where the localsid is installed
896         # sw_if_index: in T-variants of localsid this is the vrf table_id
897         localsid = VppSRv6LocalSID(
898                         self, localsid={'addr': 'A3::C4'},
899                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT6,
900                         nh_addr4='0.0.0.0',
901                         nh_addr6='::',
902                         end_psp=0,
903                         sw_if_index=vrf_1,
904                         vlan_index=0,
905                         fib_table=0)
906         localsid.add_vpp_config()
907         # log the localsids
908         self.logger.debug(self.vapi.cli("show sr localsid"))
909
910         # create IPv6 packets with SRH (SL=0)
911         # send one packet per packet size
912         count = len(self.pg_packet_sizes)
913         dst_inner = 'a4::1234'  # inner header destination address
914         pkts = []
915
916         # packets with SRH, segments-left 0, active segment a3::c4
917         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
918                         dst_inner,
919                         sidlist=['a3::c4', 'a2::', 'a1::'],
920                         segleft=0)
921         # add to traffic stream pg0->pg1
922         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
923                                        self.pg_packet_sizes, count))
924
925         # packets without SRH, IPv6 in IPv6
926         # outer IPv6 dest addr is the localsid End.DT6
927         packet_header = self.create_packet_header_IPv6_IPv6(
928                                             dst_inner,
929                                             dst_outer='a3::c4')
930         # add to traffic stream pg0->pg1
931         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
932                                        self.pg_packet_sizes, count))
933
934         # send packets and verify received packets
935         # using same comparison function as End.DX6
936         self.send_and_verify_pkts(self.pg0, pkts, self.pg2,
937                                   self.compare_rx_tx_packet_End_DX6)
938
939         # assert nothing was received on the other interface (pg2)
940         self.pg1.assert_nothing_captured("mis-directed packet(s)")
941
942         # log the localsid counters
943         self.logger.info(self.vapi.cli("show sr localsid"))
944
945         # remove SRv6 localSIDs
946         localsid.remove_vpp_config()
947
948         # remove FIB entries
949         # done by tearDown
950
951         # cleanup interfaces
952         self.teardown_interfaces()
953
954     def test_SRv6_End_DX4(self):
955         """ Test SRv6 End.DX4 behavior.
956         """
957         # send traffic to one destination interface
958         # source interface is IPv6 only
959         # destination interface is IPv4 only
960         self.setup_interfaces(ipv6=[True, False], ipv4=[False, True])
961
962         # configure SRv6 localSID End.DX4 behavior
963         localsid = VppSRv6LocalSID(
964                         self, localsid={'addr': 'A3::C4'},
965                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX4,
966                         nh_addr4=self.pg1.remote_ip4,
967                         nh_addr6='::',
968                         end_psp=0,
969                         sw_if_index=self.pg1.sw_if_index,
970                         vlan_index=0,
971                         fib_table=0)
972         localsid.add_vpp_config()
973         # log the localsids
974         self.logger.debug(self.vapi.cli("show sr localsid"))
975
976         # send one packet per packet size
977         count = len(self.pg_packet_sizes)
978         dst_inner = '4.1.1.123'  # inner header destination address
979         pkts = []
980
981         # packets with SRH, segments-left 0, active segment a3::c4
982         packet_header = self.create_packet_header_IPv6_SRH_IPv4(
983                         dst_inner,
984                         sidlist=['a3::c4', 'a2::', 'a1::'],
985                         segleft=0)
986         # add to traffic stream pg0->pg1
987         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
988                                        self.pg_packet_sizes, count))
989
990         # packets without SRH, IPv4 in IPv6
991         # outer IPv6 dest addr is the localsid End.DX4
992         packet_header = self.create_packet_header_IPv6_IPv4(
993                                             dst_inner,
994                                             dst_outer='a3::c4')
995         # add to traffic stream pg0->pg1
996         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
997                                        self.pg_packet_sizes, count))
998
999         # send packets and verify received packets
1000         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
1001                                   self.compare_rx_tx_packet_End_DX4)
1002
1003         # log the localsid counters
1004         self.logger.info(self.vapi.cli("show sr localsid"))
1005
1006         # remove SRv6 localSIDs
1007         localsid.remove_vpp_config()
1008
1009         # cleanup interfaces
1010         self.teardown_interfaces()
1011
1012     def test_SRv6_End_DT4(self):
1013         """ Test SRv6 End.DT4 behavior.
1014         """
1015         # create three interfaces (1 source, 2 destinations)
1016         # source interface is IPv6-only
1017         # destination interfaces are IPv4 only
1018         # source interface in global FIB (0)
1019         # destination interfaces in global and vrf
1020         vrf_1 = 1
1021         ipt = VppIpTable(self, vrf_1)
1022         ipt.add_vpp_config()
1023         self.setup_interfaces(ipv6=[True, False, False],
1024                               ipv4=[False, True, True],
1025                               ipv6_table_id=[0, 0, 0],
1026                               ipv4_table_id=[0, 0, vrf_1])
1027
1028         # configure FIB entries
1029         # 4.1.1.0/24 is reachable
1030         #     via pg1 in table 0 (global)
1031         #     and via pg2 in table vrf_1
1032         route0 = VppIpRoute(self, "4.1.1.0", 24,
1033                             [VppRoutePath(self.pg1.remote_ip4,
1034                                           self.pg1.sw_if_index,
1035                                           nh_table_id=0)],
1036                             table_id=0,
1037                             is_ip6=0)
1038         route0.add_vpp_config()
1039         route1 = VppIpRoute(self, "4.1.1.0", 24,
1040                             [VppRoutePath(self.pg2.remote_ip4,
1041                                           self.pg2.sw_if_index,
1042                                           nh_table_id=vrf_1)],
1043                             table_id=vrf_1,
1044                             is_ip6=0)
1045         route1.add_vpp_config()
1046         self.logger.debug(self.vapi.cli("show ip fib"))
1047
1048         # configure SRv6 localSID End.DT6 behavior
1049         # Note:
1050         # fib_table: where the localsid is installed
1051         # sw_if_index: in T-variants of localsid: vrf table_id
1052         localsid = VppSRv6LocalSID(
1053                         self, localsid={'addr': 'A3::C4'},
1054                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT4,
1055                         nh_addr4='0.0.0.0',
1056                         nh_addr6='::',
1057                         end_psp=0,
1058                         sw_if_index=vrf_1,
1059                         vlan_index=0,
1060                         fib_table=0)
1061         localsid.add_vpp_config()
1062         # log the localsids
1063         self.logger.debug(self.vapi.cli("show sr localsid"))
1064
1065         # create IPv6 packets with SRH (SL=0)
1066         # send one packet per packet size
1067         count = len(self.pg_packet_sizes)
1068         dst_inner = '4.1.1.123'  # inner header destination address
1069         pkts = []
1070
1071         # packets with SRH, segments-left 0, active segment a3::c4
1072         packet_header = self.create_packet_header_IPv6_SRH_IPv4(
1073                         dst_inner,
1074                         sidlist=['a3::c4', 'a2::', 'a1::'],
1075                         segleft=0)
1076         # add to traffic stream pg0->pg1
1077         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
1078                                        self.pg_packet_sizes, count))
1079
1080         # packets without SRH, IPv6 in IPv6
1081         # outer IPv6 dest addr is the localsid End.DX4
1082         packet_header = self.create_packet_header_IPv6_IPv4(
1083                                             dst_inner,
1084                                             dst_outer='a3::c4')
1085         # add to traffic stream pg0->pg1
1086         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
1087                                        self.pg_packet_sizes, count))
1088
1089         # send packets and verify received packets
1090         # using same comparison function as End.DX4
1091         self.send_and_verify_pkts(self.pg0, pkts, self.pg2,
1092                                   self.compare_rx_tx_packet_End_DX4)
1093
1094         # assert nothing was received on the other interface (pg2)
1095         self.pg1.assert_nothing_captured("mis-directed packet(s)")
1096
1097         # log the localsid counters
1098         self.logger.info(self.vapi.cli("show sr localsid"))
1099
1100         # remove SRv6 localSIDs
1101         localsid.remove_vpp_config()
1102
1103         # remove FIB entries
1104         # done by tearDown
1105
1106         # cleanup interfaces
1107         self.teardown_interfaces()
1108
1109     def test_SRv6_End_DX2(self):
1110         """ Test SRv6 End.DX2 behavior.
1111         """
1112         # send traffic to one destination interface
1113         # source interface is IPv6 only
1114         self.setup_interfaces(ipv6=[True, False], ipv4=[False, False])
1115
1116         # configure SRv6 localSID End.DX2 behavior
1117         localsid = VppSRv6LocalSID(
1118                         self, localsid={'addr': 'A3::C4'},
1119                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX2,
1120                         nh_addr4='0.0.0.0',
1121                         nh_addr6='::',
1122                         end_psp=0,
1123                         sw_if_index=self.pg1.sw_if_index,
1124                         vlan_index=0,
1125                         fib_table=0)
1126         localsid.add_vpp_config()
1127         # log the localsids
1128         self.logger.debug(self.vapi.cli("show sr localsid"))
1129
1130         # send one packet per packet size
1131         count = len(self.pg_packet_sizes)
1132         pkts = []
1133
1134         # packets with SRH, segments-left 0, active segment a3::c4
1135         # L2 has no dot1q header
1136         packet_header = self.create_packet_header_IPv6_SRH_L2(
1137                             sidlist=['a3::c4', 'a2::', 'a1::'],
1138                             segleft=0,
1139                             vlan=0)
1140         # add to traffic stream pg0->pg1
1141         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1142                                        self.pg_packet_sizes, count))
1143
1144         # packets with SRH, segments-left 0, active segment a3::c4
1145         # L2 has dot1q header
1146         packet_header = self.create_packet_header_IPv6_SRH_L2(
1147                             sidlist=['a3::c4', 'a2::', 'a1::'],
1148                             segleft=0,
1149                             vlan=123)
1150         # add to traffic stream pg0->pg1
1151         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1152                                        self.pg_packet_sizes, count))
1153
1154         # packets without SRH, L2 in IPv6
1155         # outer IPv6 dest addr is the localsid End.DX2
1156         # L2 has no dot1q header
1157         packet_header = self.create_packet_header_IPv6_L2(
1158                                             dst_outer='a3::c4',
1159                                             vlan=0)
1160         # add to traffic stream pg0->pg1
1161         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1162                                        self.pg_packet_sizes, count))
1163
1164         # packets without SRH, L2 in IPv6
1165         # outer IPv6 dest addr is the localsid End.DX2
1166         # L2 has dot1q header
1167         packet_header = self.create_packet_header_IPv6_L2(
1168                                             dst_outer='a3::c4',
1169                                             vlan=123)
1170         # add to traffic stream pg0->pg1
1171         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1172                                        self.pg_packet_sizes, count))
1173
1174         # send packets and verify received packets
1175         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
1176                                   self.compare_rx_tx_packet_End_DX2)
1177
1178         # log the localsid counters
1179         self.logger.info(self.vapi.cli("show sr localsid"))
1180
1181         # remove SRv6 localSIDs
1182         localsid.remove_vpp_config()
1183
1184         # cleanup interfaces
1185         self.teardown_interfaces()
1186
1187     @unittest.skipUnless(0, "PC to fix")
1188     def test_SRv6_T_Insert_Classifier(self):
1189         """ Test SRv6 Transit.Insert behavior (IPv6 only).
1190             steer packets using the classifier
1191         """
1192         # send traffic to one destination interface
1193         # source and destination are IPv6 only
1194         self.setup_interfaces(ipv6=[False, False, False, True, True])
1195
1196         # configure FIB entries
1197         route = VppIpRoute(self, "a4::", 64,
1198                            [VppRoutePath(self.pg4.remote_ip6,
1199                                          self.pg4.sw_if_index,
1200                                          proto=DpoProto.DPO_PROTO_IP6)],
1201                            is_ip6=1)
1202         route.add_vpp_config()
1203
1204         # configure encaps IPv6 source address
1205         # needs to be done before SR Policy config
1206         # TODO: API?
1207         self.vapi.cli("set sr encaps source addr a3::")
1208
1209         bsid = 'a3::9999:1'
1210         # configure SRv6 Policy
1211         # Note: segment list order: first -> last
1212         sr_policy = VppSRv6Policy(
1213             self, bsid=bsid,
1214             is_encap=0,
1215             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
1216             weight=1, fib_table=0,
1217             segments=['a4::', 'a5::', 'a6::c7'],
1218             source='a3::')
1219         sr_policy.add_vpp_config()
1220         self.sr_policy = sr_policy
1221
1222         # log the sr policies
1223         self.logger.info(self.vapi.cli("show sr policies"))
1224
1225         # add classify table
1226         # mask on dst ip address prefix a7::/8
1227         mask = '{:0<16}'.format('ff')
1228         r = self.vapi.classify_add_del_table(
1229             1,
1230             binascii.unhexlify(mask),
1231             match_n_vectors=(len(mask) - 1) // 32 + 1,
1232             current_data_flag=1,
1233             skip_n_vectors=2)  # data offset
1234         self.assertIsNotNone(r, msg='No response msg for add_del_table')
1235         table_index = r.new_table_index
1236
1237         # add the source routign node as a ip6 inacl netxt node
1238         r = self.vapi.add_node_next('ip6-inacl',
1239                                     'sr-pl-rewrite-insert')
1240         inacl_next_node_index = r.node_index
1241
1242         match = '{:0<16}'.format('a7')
1243         r = self.vapi.classify_add_del_session(
1244             1,
1245             table_index,
1246             binascii.unhexlify(match),
1247             hit_next_index=inacl_next_node_index,
1248             action=3,
1249             metadata=0)  # sr policy index
1250         self.assertIsNotNone(r, msg='No response msg for add_del_session')
1251
1252         # log the classify table used in the steering policy
1253         self.logger.info(self.vapi.cli("show classify table"))
1254
1255         r = self.vapi.input_acl_set_interface(
1256             is_add=1,
1257             sw_if_index=self.pg3.sw_if_index,
1258             ip6_table_index=table_index)
1259         self.assertIsNotNone(r,
1260                              msg='No response msg for input_acl_set_interface')
1261
1262         # log the ip6 inacl
1263         self.logger.info(self.vapi.cli("show inacl type ip6"))
1264
1265         # create packets
1266         count = len(self.pg_packet_sizes)
1267         dst_inner = 'a7::1234'
1268         pkts = []
1269
1270         # create IPv6 packets without SRH
1271         packet_header = self.create_packet_header_IPv6(dst_inner)
1272         # create traffic stream pg3->pg4
1273         pkts.extend(self.create_stream(self.pg3, self.pg4, packet_header,
1274                                        self.pg_packet_sizes, count))
1275
1276         # create IPv6 packets with SRH
1277         # packets with segments-left 1, active segment a7::
1278         packet_header = self.create_packet_header_IPv6_SRH(
1279             sidlist=['a8::', 'a7::', 'a6::'],
1280             segleft=1)
1281         # create traffic stream pg3->pg4
1282         pkts.extend(self.create_stream(self.pg3, self.pg4, packet_header,
1283                                        self.pg_packet_sizes, count))
1284
1285         # send packets and verify received packets
1286         self.send_and_verify_pkts(self.pg3, pkts, self.pg4,
1287                                   self.compare_rx_tx_packet_T_Insert)
1288
1289         # remove the interface l2 input feature
1290         r = self.vapi.input_acl_set_interface(
1291             is_add=0,
1292             sw_if_index=self.pg3.sw_if_index,
1293             ip6_table_index=table_index)
1294         self.assertIsNotNone(r,
1295                              msg='No response msg for input_acl_set_interface')
1296
1297         # log the ip6 inacl after cleaning
1298         self.logger.info(self.vapi.cli("show inacl type ip6"))
1299
1300         # log the localsid counters
1301         self.logger.info(self.vapi.cli("show sr localsid"))
1302
1303         # remove classifier SR steering
1304         # classifier_steering.remove_vpp_config()
1305         self.logger.info(self.vapi.cli("show sr steering policies"))
1306
1307         # remove SR Policies
1308         self.sr_policy.remove_vpp_config()
1309         self.logger.info(self.vapi.cli("show sr policies"))
1310
1311         # remove classify session and table
1312         r = self.vapi.classify_add_del_session(
1313             0,
1314             table_index,
1315             binascii.unhexlify(match))
1316         self.assertIsNotNone(r, msg='No response msg for add_del_session')
1317
1318         r = self.vapi.classify_add_del_table(
1319             0,
1320             binascii.unhexlify(mask),
1321             table_index=table_index)
1322         self.assertIsNotNone(r, msg='No response msg for add_del_table')
1323
1324         self.logger.info(self.vapi.cli("show classify table"))
1325
1326         # remove FIB entries
1327         # done by tearDown
1328
1329         # cleanup interfaces
1330         self.teardown_interfaces()
1331
1332     def compare_rx_tx_packet_T_Encaps(self, tx_pkt, rx_pkt):
1333         """ Compare input and output packet after passing T.Encaps
1334
1335         :param tx_pkt: transmitted packet
1336         :param rx_pkt: received packet
1337         """
1338         # T.Encaps updates the headers as follows:
1339         # SR Policy seglist (S3, S2, S1)
1340         # SR Policy source C
1341         # IPv6:
1342         # in: IPv6(A, B2)
1343         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(A, B2)
1344         # IPv6 + SRH:
1345         # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1)
1346         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(a, B2)SRH(B3, B2, B1; SL=1)
1347
1348         # get first (outer) IPv6 header of rx'ed packet
1349         rx_ip = rx_pkt.getlayer(IPv6)
1350         rx_srh = None
1351
1352         tx_ip = tx_pkt.getlayer(IPv6)
1353
1354         # expected segment-list
1355         seglist = self.sr_policy.segments
1356         # reverse list to get order as in SRH
1357         tx_seglist = seglist[::-1]
1358
1359         # get source address of SR Policy
1360         sr_policy_source = self.sr_policy.source
1361
1362         # rx'ed packet should have SRH
1363         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1364         # get SRH
1365         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1366
1367         # received ip.src should be equal to SR Policy source
1368         self.assertEqual(rx_ip.src, sr_policy_source)
1369         # received ip.dst should be equal to expected sidlist[lastentry]
1370         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1371         # rx'ed seglist should be equal to expected seglist
1372         self.assertEqual(rx_srh.addresses, tx_seglist)
1373         # segleft should be equal to size expected seglist-1
1374         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1375         # segleft should be equal to lastentry
1376         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1377
1378         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1379         # except for the hop-limit field
1380         #   -> update tx'ed hlim to the expected hlim
1381         tx_ip.hlim = tx_ip.hlim - 1
1382
1383         self.assertEqual(rx_srh.payload, tx_ip)
1384
1385         self.logger.debug("packet verification: SUCCESS")
1386
1387     def compare_rx_tx_packet_T_Encaps_IPv4(self, tx_pkt, rx_pkt):
1388         """ Compare input and output packet after passing T.Encaps for IPv4
1389
1390         :param tx_pkt: transmitted packet
1391         :param rx_pkt: received packet
1392         """
1393         # T.Encaps for IPv4 updates the headers as follows:
1394         # SR Policy seglist (S3, S2, S1)
1395         # SR Policy source C
1396         # IPv4:
1397         # in: IPv4(A, B2)
1398         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv4(A, B2)
1399
1400         # get first (outer) IPv6 header of rx'ed packet
1401         rx_ip = rx_pkt.getlayer(IPv6)
1402         rx_srh = None
1403
1404         tx_ip = tx_pkt.getlayer(IP)
1405
1406         # expected segment-list
1407         seglist = self.sr_policy.segments
1408         # reverse list to get order as in SRH
1409         tx_seglist = seglist[::-1]
1410
1411         # get source address of SR Policy
1412         sr_policy_source = self.sr_policy.source
1413
1414         # checks common to cases tx with and without SRH
1415         # rx'ed packet should have SRH and IPv4 header
1416         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1417         self.assertTrue(rx_ip.payload.haslayer(IP))
1418         # get SRH
1419         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1420
1421         # received ip.src should be equal to SR Policy source
1422         self.assertEqual(rx_ip.src, sr_policy_source)
1423         # received ip.dst should be equal to sidlist[lastentry]
1424         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1425         # rx'ed seglist should be equal to seglist
1426         self.assertEqual(rx_srh.addresses, tx_seglist)
1427         # segleft should be equal to size seglist-1
1428         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1429         # segleft should be equal to lastentry
1430         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1431
1432         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1433         # except for the ttl field and ip checksum
1434         #   -> adjust tx'ed ttl to expected ttl
1435         tx_ip.ttl = tx_ip.ttl - 1
1436         #   -> set tx'ed ip checksum to None and let scapy recompute
1437         tx_ip.chksum = None
1438         # read back the pkt (with str()) to force computing these fields
1439         # probably other ways to accomplish this are possible
1440         tx_ip = IP(str(tx_ip))
1441
1442         self.assertEqual(rx_srh.payload, tx_ip)
1443
1444         self.logger.debug("packet verification: SUCCESS")
1445
1446     def compare_rx_tx_packet_T_Encaps_L2(self, tx_pkt, rx_pkt):
1447         """ Compare input and output packet after passing T.Encaps for L2
1448
1449         :param tx_pkt: transmitted packet
1450         :param rx_pkt: received packet
1451         """
1452         # T.Encaps for L2 updates the headers as follows:
1453         # SR Policy seglist (S3, S2, S1)
1454         # SR Policy source C
1455         # L2:
1456         # in: L2
1457         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)L2
1458
1459         # get first (outer) IPv6 header of rx'ed packet
1460         rx_ip = rx_pkt.getlayer(IPv6)
1461         rx_srh = None
1462
1463         tx_ether = tx_pkt.getlayer(Ether)
1464
1465         # expected segment-list
1466         seglist = self.sr_policy.segments
1467         # reverse list to get order as in SRH
1468         tx_seglist = seglist[::-1]
1469
1470         # get source address of SR Policy
1471         sr_policy_source = self.sr_policy.source
1472
1473         # rx'ed packet should have SRH
1474         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1475         # get SRH
1476         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1477
1478         # received ip.src should be equal to SR Policy source
1479         self.assertEqual(rx_ip.src, sr_policy_source)
1480         # received ip.dst should be equal to sidlist[lastentry]
1481         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1482         # rx'ed seglist should be equal to seglist
1483         self.assertEqual(rx_srh.addresses, tx_seglist)
1484         # segleft should be equal to size seglist-1
1485         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1486         # segleft should be equal to lastentry
1487         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1488         # nh should be "No Next Header" (59)
1489         self.assertEqual(rx_srh.nh, 59)
1490
1491         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1492         self.assertEqual(Ether(str(rx_srh.payload)), tx_ether)
1493
1494         self.logger.debug("packet verification: SUCCESS")
1495
1496     def compare_rx_tx_packet_T_Insert(self, tx_pkt, rx_pkt):
1497         """ Compare input and output packet after passing T.Insert
1498
1499         :param tx_pkt: transmitted packet
1500         :param rx_pkt: received packet
1501         """
1502         # T.Insert updates the headers as follows:
1503         # IPv6:
1504         # in: IPv6(A, B2)
1505         # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)
1506         # IPv6 + SRH:
1507         # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1)
1508         # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)SRH(B3, B2, B1; SL=1)
1509
1510         # get first (outer) IPv6 header of rx'ed packet
1511         rx_ip = rx_pkt.getlayer(IPv6)
1512         rx_srh = None
1513         rx_ip2 = None
1514         rx_srh2 = None
1515         rx_ip3 = None
1516         rx_udp = rx_pkt[UDP]
1517
1518         tx_ip = tx_pkt.getlayer(IPv6)
1519         tx_srh = None
1520         tx_ip2 = None
1521         # some packets have been tx'ed with an SRH, some without it
1522         # get SRH if tx'ed packet has it
1523         if tx_pkt.haslayer(IPv6ExtHdrSegmentRouting):
1524             tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1525             tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1526         tx_udp = tx_pkt[UDP]
1527
1528         # expected segment-list (make copy of SR Policy segment list)
1529         seglist = self.sr_policy.segments[:]
1530         # expected seglist has initial dest addr as last segment
1531         seglist.append(tx_ip.dst)
1532         # reverse list to get order as in SRH
1533         tx_seglist = seglist[::-1]
1534
1535         # get source address of SR Policy
1536         sr_policy_source = self.sr_policy.source
1537
1538         # checks common to cases tx with and without SRH
1539         # rx'ed packet should have SRH and only one IPv6 header
1540         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1541         self.assertFalse(rx_ip.payload.haslayer(IPv6))
1542         # get SRH
1543         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1544
1545         # rx'ed ip.src should be equal to tx'ed ip.src
1546         self.assertEqual(rx_ip.src, tx_ip.src)
1547         # rx'ed ip.dst should be equal to sidlist[lastentry]
1548         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1549
1550         # rx'ed seglist should be equal to expected seglist
1551         self.assertEqual(rx_srh.addresses, tx_seglist)
1552         # segleft should be equal to size(expected seglist)-1
1553         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1554         # segleft should be equal to lastentry
1555         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1556
1557         if tx_srh:  # packet was tx'ed with SRH
1558             # packet should have 2nd SRH
1559             self.assertTrue(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
1560             # get 2nd SRH
1561             rx_srh2 = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting, 2)
1562
1563             # rx'ed srh2.addresses should be equal to tx'ed srh.addresses
1564             self.assertEqual(rx_srh2.addresses, tx_srh.addresses)
1565             # rx'ed srh2.segleft should be equal to tx'ed srh.segleft
1566             self.assertEqual(rx_srh2.segleft, tx_srh.segleft)
1567             # rx'ed srh2.lastentry should be equal to tx'ed srh.lastentry
1568             self.assertEqual(rx_srh2.lastentry, tx_srh.lastentry)
1569
1570         else:  # packet was tx'ed without SRH
1571             # rx packet should have no other SRH
1572             self.assertFalse(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
1573
1574         # UDP layer should be unchanged
1575         self.assertEqual(rx_udp, tx_udp)
1576
1577         self.logger.debug("packet verification: SUCCESS")
1578
1579     def compare_rx_tx_packet_End(self, tx_pkt, rx_pkt):
1580         """ Compare input and output packet after passing End (without PSP)
1581
1582         :param tx_pkt: transmitted packet
1583         :param rx_pkt: received packet
1584         """
1585         # End (no PSP) updates the headers as follows:
1586         # IPv6 + SRH:
1587         # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2)
1588         # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1589
1590         # get first (outer) IPv6 header of rx'ed packet
1591         rx_ip = rx_pkt.getlayer(IPv6)
1592         rx_srh = None
1593         rx_ip2 = None
1594         rx_udp = rx_pkt[UDP]
1595
1596         tx_ip = tx_pkt.getlayer(IPv6)
1597         # we know the packet has been tx'ed
1598         # with an inner IPv6 header and an SRH
1599         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1600         tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1601         tx_udp = tx_pkt[UDP]
1602
1603         # common checks, regardless of tx segleft value
1604         # rx'ed packet should have 2nd IPv6 header
1605         self.assertTrue(rx_ip.payload.haslayer(IPv6))
1606         # get second (inner) IPv6 header
1607         rx_ip2 = rx_pkt.getlayer(IPv6, 2)
1608
1609         if tx_ip.segleft > 0:
1610             # SRH should NOT have been popped:
1611             #   End SID without PSP does not pop SRH if segleft>0
1612             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1613             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1614
1615             # received ip.src should be equal to expected ip.src
1616             self.assertEqual(rx_ip.src, tx_ip.src)
1617             # sidlist should be unchanged
1618             self.assertEqual(rx_srh.addresses, tx_srh.addresses)
1619             # segleft should have been decremented
1620             self.assertEqual(rx_srh.segleft, tx_srh.segleft-1)
1621             # received ip.dst should be equal to sidlist[segleft]
1622             self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
1623             # lastentry should be unchanged
1624             self.assertEqual(rx_srh.lastentry, tx_srh.lastentry)
1625             # inner IPv6 packet (ip2) should be unchanged
1626             self.assertEqual(rx_ip2.src, tx_ip2.src)
1627             self.assertEqual(rx_ip2.dst, tx_ip2.dst)
1628         # else:  # tx_ip.segleft == 0
1629             # TODO: Does this work with 2 SRHs in ingress packet?
1630
1631         # UDP layer should be unchanged
1632         self.assertEqual(rx_udp, tx_udp)
1633
1634         self.logger.debug("packet verification: SUCCESS")
1635
1636     def compare_rx_tx_packet_End_PSP(self, tx_pkt, rx_pkt):
1637         """ Compare input and output packet after passing End with PSP
1638
1639         :param tx_pkt: transmitted packet
1640         :param rx_pkt: received packet
1641         """
1642         # End (PSP) updates the headers as follows:
1643         # IPv6 + SRH (SL>1):
1644         # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2)
1645         # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1646         # IPv6 + SRH (SL=1):
1647         # in: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1648         # out: IPv6(A, S3)
1649
1650         # get first (outer) IPv6 header of rx'ed packet
1651         rx_ip = rx_pkt.getlayer(IPv6)
1652         rx_srh = None
1653         rx_ip2 = None
1654         rx_udp = rx_pkt[UDP]
1655
1656         tx_ip = tx_pkt.getlayer(IPv6)
1657         # we know the packet has been tx'ed
1658         # with an inner IPv6 header and an SRH
1659         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1660         tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1661         tx_udp = tx_pkt[UDP]
1662
1663         # common checks, regardless of tx segleft value
1664         self.assertTrue(rx_ip.payload.haslayer(IPv6))
1665         rx_ip2 = rx_pkt.getlayer(IPv6, 2)
1666         # inner IPv6 packet (ip2) should be unchanged
1667         self.assertEqual(rx_ip2.src, tx_ip2.src)
1668         self.assertEqual(rx_ip2.dst, tx_ip2.dst)
1669
1670         if tx_ip.segleft > 1:
1671             # SRH should NOT have been popped:
1672             #   End SID with PSP does not pop SRH if segleft>1
1673             # rx'ed packet should have SRH
1674             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1675             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1676
1677             # received ip.src should be equal to expected ip.src
1678             self.assertEqual(rx_ip.src, tx_ip.src)
1679             # sidlist should be unchanged
1680             self.assertEqual(rx_srh.addresses, tx_srh.addresses)
1681             # segleft should have been decremented
1682             self.assertEqual(rx_srh.segleft, tx_srh.segleft-1)
1683             # received ip.dst should be equal to sidlist[segleft]
1684             self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
1685             # lastentry should be unchanged
1686             self.assertEqual(rx_srh.lastentry, tx_srh.lastentry)
1687
1688         else:  # tx_ip.segleft <= 1
1689             # SRH should have been popped:
1690             #   End SID with PSP and segleft=1 pops SRH
1691             # the two IPv6 headers are still present
1692             # outer IPv6 header has DA == last segment of popped SRH
1693             # SRH should not be present
1694             self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1695             # outer IPv6 header ip.src should be equal to tx'ed ip.src
1696             self.assertEqual(rx_ip.src, tx_ip.src)
1697             # outer IPv6 header ip.dst should be = to tx'ed sidlist[segleft-1]
1698             self.assertEqual(rx_ip.dst, tx_srh.addresses[tx_srh.segleft-1])
1699
1700         # UDP layer should be unchanged
1701         self.assertEqual(rx_udp, tx_udp)
1702
1703         self.logger.debug("packet verification: SUCCESS")
1704
1705     def compare_rx_tx_packet_End_DX6(self, tx_pkt, rx_pkt):
1706         """ Compare input and output packet after passing End.DX6
1707
1708         :param tx_pkt: transmitted packet
1709         :param rx_pkt: received packet
1710         """
1711         # End.DX6 updates the headers as follows:
1712         # IPv6 + SRH (SL=0):
1713         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv6(B, D)
1714         # out: IPv6(B, D)
1715         # IPv6:
1716         # in: IPv6(A, S3)IPv6(B, D)
1717         # out: IPv6(B, D)
1718
1719         # get first (outer) IPv6 header of rx'ed packet
1720         rx_ip = rx_pkt.getlayer(IPv6)
1721
1722         tx_ip = tx_pkt.getlayer(IPv6)
1723         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1724
1725         # verify if rx'ed packet has no SRH
1726         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1727
1728         # the whole rx_ip pkt should be equal to tx_ip2
1729         # except for the hlim field
1730         #   -> adjust tx'ed hlim to expected hlim
1731         tx_ip2.hlim = tx_ip2.hlim - 1
1732
1733         self.assertEqual(rx_ip, tx_ip2)
1734
1735         self.logger.debug("packet verification: SUCCESS")
1736
1737     def compare_rx_tx_packet_End_DX4(self, tx_pkt, rx_pkt):
1738         """ Compare input and output packet after passing End.DX4
1739
1740         :param tx_pkt: transmitted packet
1741         :param rx_pkt: received packet
1742         """
1743         # End.DX4 updates the headers as follows:
1744         # IPv6 + SRH (SL=0):
1745         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv4(B, D)
1746         # out: IPv4(B, D)
1747         # IPv6:
1748         # in: IPv6(A, S3)IPv4(B, D)
1749         # out: IPv4(B, D)
1750
1751         # get IPv4 header of rx'ed packet
1752         rx_ip = rx_pkt.getlayer(IP)
1753
1754         tx_ip = tx_pkt.getlayer(IPv6)
1755         tx_ip2 = tx_pkt.getlayer(IP)
1756
1757         # verify if rx'ed packet has no SRH
1758         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1759
1760         # the whole rx_ip pkt should be equal to tx_ip2
1761         # except for the ttl field and ip checksum
1762         #   -> adjust tx'ed ttl to expected ttl
1763         tx_ip2.ttl = tx_ip2.ttl - 1
1764         #   -> set tx'ed ip checksum to None and let scapy recompute
1765         tx_ip2.chksum = None
1766         # read back the pkt (with str()) to force computing these fields
1767         # probably other ways to accomplish this are possible
1768         tx_ip2 = IP(str(tx_ip2))
1769
1770         self.assertEqual(rx_ip, tx_ip2)
1771
1772         self.logger.debug("packet verification: SUCCESS")
1773
1774     def compare_rx_tx_packet_End_DX2(self, tx_pkt, rx_pkt):
1775         """ Compare input and output packet after passing End.DX2
1776
1777         :param tx_pkt: transmitted packet
1778         :param rx_pkt: received packet
1779         """
1780         # End.DX2 updates the headers as follows:
1781         # IPv6 + SRH (SL=0):
1782         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)L2
1783         # out: L2
1784         # IPv6:
1785         # in: IPv6(A, S3)L2
1786         # out: L2
1787
1788         # get IPv4 header of rx'ed packet
1789         rx_eth = rx_pkt.getlayer(Ether)
1790
1791         tx_ip = tx_pkt.getlayer(IPv6)
1792         # we can't just get the 2nd Ether layer
1793         # get the Raw content and dissect it as Ether
1794         tx_eth1 = Ether(str(tx_pkt[Raw]))
1795
1796         # verify if rx'ed packet has no SRH
1797         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1798
1799         # the whole rx_eth pkt should be equal to tx_eth1
1800         self.assertEqual(rx_eth, tx_eth1)
1801
1802         self.logger.debug("packet verification: SUCCESS")
1803
1804     def create_stream(self, src_if, dst_if, packet_header, packet_sizes,
1805                       count):
1806         """Create SRv6 input packet stream for defined interface.
1807
1808         :param VppInterface src_if: Interface to create packet stream for
1809         :param VppInterface dst_if: destination interface of packet stream
1810         :param packet_header: Layer3 scapy packet headers,
1811                 L2 is added when not provided,
1812                 Raw(payload) with packet_info is added
1813         :param list packet_sizes: packet stream pckt sizes,sequentially applied
1814                to packets in stream have
1815         :param int count: number of packets in packet stream
1816         :return: list of packets
1817         """
1818         self.logger.info("Creating packets")
1819         pkts = []
1820         for i in range(0, count-1):
1821             payload_info = self.create_packet_info(src_if, dst_if)
1822             self.logger.debug(
1823                 "Creating packet with index %d" % (payload_info.index))
1824             payload = self.info_to_payload(payload_info)
1825             # add L2 header if not yet provided in packet_header
1826             if packet_header.getlayer(0).name == 'Ethernet':
1827                 p = (packet_header /
1828                      Raw(payload))
1829             else:
1830                 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
1831                      packet_header /
1832                      Raw(payload))
1833             size = packet_sizes[i % len(packet_sizes)]
1834             self.logger.debug("Packet size %d" % (size))
1835             self.extend_packet(p, size)
1836             # we need to store the packet with the automatic fields computed
1837             # read back the dumped packet (with str())
1838             # to force computing these fields
1839             # probably other ways are possible
1840             p = Ether(str(p))
1841             payload_info.data = p.copy()
1842             self.logger.debug(ppp("Created packet:", p))
1843             pkts.append(p)
1844         self.logger.info("Done creating packets")
1845         return pkts
1846
1847     def send_and_verify_pkts(self, input, pkts, output, compare_func):
1848         """Send packets and verify received packets using compare_func
1849
1850         :param input: ingress interface of DUT
1851         :param pkts: list of packets to transmit
1852         :param output: egress interface of DUT
1853         :param compare_func: function to compare in and out packets
1854         """
1855         # add traffic stream to input interface
1856         input.add_stream(pkts)
1857
1858         # enable capture on all interfaces
1859         self.pg_enable_capture(self.pg_interfaces)
1860
1861         # start traffic
1862         self.logger.info("Starting traffic")
1863         self.pg_start()
1864
1865         # get output capture
1866         self.logger.info("Getting packet capture")
1867         capture = output.get_capture()
1868
1869         # assert nothing was captured on input interface
1870         input.assert_nothing_captured()
1871
1872         # verify captured packets
1873         self.verify_captured_pkts(output, capture, compare_func)
1874
1875     def create_packet_header_IPv6(self, dst):
1876         """Create packet header: IPv6 header, UDP header
1877
1878         :param dst: IPv6 destination address
1879
1880         IPv6 source address is 1234::1
1881         UDP source port and destination port are 1234
1882         """
1883
1884         p = (IPv6(src='1234::1', dst=dst) /
1885              UDP(sport=1234, dport=1234))
1886         return p
1887
1888     def create_packet_header_IPv6_SRH(self, sidlist, segleft):
1889         """Create packet header: IPv6 header with SRH, UDP header
1890
1891         :param list sidlist: segment list
1892         :param int segleft: segments-left field value
1893
1894         IPv6 destination address is set to sidlist[segleft]
1895         IPv6 source addresses are 1234::1 and 4321::1
1896         UDP source port and destination port are 1234
1897         """
1898
1899         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
1900              IPv6ExtHdrSegmentRouting(addresses=sidlist) /
1901              UDP(sport=1234, dport=1234))
1902         return p
1903
1904     def create_packet_header_IPv6_SRH_IPv6(self, dst, sidlist, segleft):
1905         """Create packet header: IPv6 encapsulated in SRv6:
1906         IPv6 header with SRH, IPv6 header, UDP header
1907
1908         :param ipv6address dst: inner IPv6 destination address
1909         :param list sidlist: segment list of outer IPv6 SRH
1910         :param int segleft: segments-left field of outer IPv6 SRH
1911
1912         Outer IPv6 destination address is set to sidlist[segleft]
1913         IPv6 source addresses are 1234::1 and 4321::1
1914         UDP source port and destination port are 1234
1915         """
1916
1917         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
1918              IPv6ExtHdrSegmentRouting(addresses=sidlist,
1919                                       segleft=segleft, nh=41) /
1920              IPv6(src='4321::1', dst=dst) /
1921              UDP(sport=1234, dport=1234))
1922         return p
1923
1924     def create_packet_header_IPv6_IPv6(self, dst_inner, dst_outer):
1925         """Create packet header: IPv6 encapsulated in IPv6:
1926         IPv6 header, IPv6 header, UDP header
1927
1928         :param ipv6address dst_inner: inner IPv6 destination address
1929         :param ipv6address dst_outer: outer IPv6 destination address
1930
1931         IPv6 source addresses are 1234::1 and 4321::1
1932         UDP source port and destination port are 1234
1933         """
1934
1935         p = (IPv6(src='1234::1', dst=dst_outer) /
1936              IPv6(src='4321::1', dst=dst_inner) /
1937              UDP(sport=1234, dport=1234))
1938         return p
1939
1940     def create_packet_header_IPv6_SRH_SRH_IPv6(self, dst, sidlist1, segleft1,
1941                                                sidlist2, segleft2):
1942         """Create packet header: IPv6 encapsulated in SRv6 with 2 SRH:
1943         IPv6 header with SRH, 2nd SRH, IPv6 header, UDP header
1944
1945         :param ipv6address dst: inner IPv6 destination address
1946         :param list sidlist1: segment list of outer IPv6 SRH
1947         :param int segleft1: segments-left field of outer IPv6 SRH
1948         :param list sidlist2: segment list of inner IPv6 SRH
1949         :param int segleft2: segments-left field of inner IPv6 SRH
1950
1951         Outer IPv6 destination address is set to sidlist[segleft]
1952         IPv6 source addresses are 1234::1 and 4321::1
1953         UDP source port and destination port are 1234
1954         """
1955
1956         p = (IPv6(src='1234::1', dst=sidlist1[segleft1]) /
1957              IPv6ExtHdrSegmentRouting(addresses=sidlist1,
1958                                       segleft=segleft1, nh=43) /
1959              IPv6ExtHdrSegmentRouting(addresses=sidlist2,
1960                                       segleft=segleft2, nh=41) /
1961              IPv6(src='4321::1', dst=dst) /
1962              UDP(sport=1234, dport=1234))
1963         return p
1964
1965     def create_packet_header_IPv4(self, dst):
1966         """Create packet header: IPv4 header, UDP header
1967
1968         :param dst: IPv4 destination address
1969
1970         IPv4 source address is 123.1.1.1
1971         UDP source port and destination port are 1234
1972         """
1973
1974         p = (IP(src='123.1.1.1', dst=dst) /
1975              UDP(sport=1234, dport=1234))
1976         return p
1977
1978     def create_packet_header_IPv6_IPv4(self, dst_inner, dst_outer):
1979         """Create packet header: IPv4 encapsulated in IPv6:
1980         IPv6 header, IPv4 header, UDP header
1981
1982         :param ipv4address dst_inner: inner IPv4 destination address
1983         :param ipv6address dst_outer: outer IPv6 destination address
1984
1985         IPv6 source address is 1234::1
1986         IPv4 source address is 123.1.1.1
1987         UDP source port and destination port are 1234
1988         """
1989
1990         p = (IPv6(src='1234::1', dst=dst_outer) /
1991              IP(src='123.1.1.1', dst=dst_inner) /
1992              UDP(sport=1234, dport=1234))
1993         return p
1994
1995     def create_packet_header_IPv6_SRH_IPv4(self, dst, sidlist, segleft):
1996         """Create packet header: IPv4 encapsulated in SRv6:
1997         IPv6 header with SRH, IPv4 header, UDP header
1998
1999         :param ipv4address dst: inner IPv4 destination address
2000         :param list sidlist: segment list of outer IPv6 SRH
2001         :param int segleft: segments-left field of outer IPv6 SRH
2002
2003         Outer IPv6 destination address is set to sidlist[segleft]
2004         IPv6 source address is 1234::1
2005         IPv4 source address is 123.1.1.1
2006         UDP source port and destination port are 1234
2007         """
2008
2009         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
2010              IPv6ExtHdrSegmentRouting(addresses=sidlist,
2011                                       segleft=segleft, nh=4) /
2012              IP(src='123.1.1.1', dst=dst) /
2013              UDP(sport=1234, dport=1234))
2014         return p
2015
2016     def create_packet_header_L2(self, vlan=0):
2017         """Create packet header: L2 header
2018
2019         :param vlan: if vlan!=0 then add 802.1q header
2020         """
2021         # Note: the dst addr ('00:55:44:33:22:11') is used in
2022         # the compare function compare_rx_tx_packet_T_Encaps_L2
2023         # to detect presence of L2 in SRH payload
2024         p = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
2025         etype = 0x8137  # IPX
2026         if vlan:
2027             # add 802.1q layer
2028             p /= Dot1Q(vlan=vlan, type=etype)
2029         else:
2030             p.type = etype
2031         return p
2032
2033     def create_packet_header_IPv6_SRH_L2(self, sidlist, segleft, vlan=0):
2034         """Create packet header: L2 encapsulated in SRv6:
2035         IPv6 header with SRH, L2
2036
2037         :param list sidlist: segment list of outer IPv6 SRH
2038         :param int segleft: segments-left field of outer IPv6 SRH
2039         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
2040
2041         Outer IPv6 destination address is set to sidlist[segleft]
2042         IPv6 source address is 1234::1
2043         """
2044         eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
2045         etype = 0x8137  # IPX
2046         if vlan:
2047             # add 802.1q layer
2048             eth /= Dot1Q(vlan=vlan, type=etype)
2049         else:
2050             eth.type = etype
2051
2052         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
2053              IPv6ExtHdrSegmentRouting(addresses=sidlist,
2054                                       segleft=segleft, nh=59) /
2055              eth)
2056         return p
2057
2058     def create_packet_header_IPv6_L2(self, dst_outer, vlan=0):
2059         """Create packet header: L2 encapsulated in IPv6:
2060         IPv6 header, L2
2061
2062         :param ipv6address dst_outer: outer IPv6 destination address
2063         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
2064         """
2065         eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
2066         etype = 0x8137  # IPX
2067         if vlan:
2068             # add 802.1q layer
2069             eth /= Dot1Q(vlan=vlan, type=etype)
2070         else:
2071             eth.type = etype
2072
2073         p = (IPv6(src='1234::1', dst=dst_outer, nh=59) / eth)
2074         return p
2075
2076     def get_payload_info(self, packet):
2077         """ Extract the payload_info from the packet
2078         """
2079         # in most cases, payload_info is in packet[Raw]
2080         # but packet[Raw] gives the complete payload
2081         # (incl L2 header) for the T.Encaps L2 case
2082         try:
2083             payload_info = self.payload_to_info(str(packet[Raw]))
2084
2085         except:
2086             # remote L2 header from packet[Raw]:
2087             # take packet[Raw], convert it to an Ether layer
2088             # and then extract Raw from it
2089             payload_info = self.payload_to_info(
2090                 str(Ether(str(packet[Raw]))[Raw]))
2091
2092         return payload_info
2093
2094     def verify_captured_pkts(self, dst_if, capture, compare_func):
2095         """
2096         Verify captured packet stream for specified interface.
2097         Compare ingress with egress packets using the specified compare fn
2098
2099         :param dst_if: egress interface of DUT
2100         :param capture: captured packets
2101         :param compare_func: function to compare in and out packet
2102         """
2103         self.logger.info("Verifying capture on interface %s using function %s"
2104                          % (dst_if.name, compare_func.func_name))
2105
2106         last_info = dict()
2107         for i in self.pg_interfaces:
2108             last_info[i.sw_if_index] = None
2109         dst_sw_if_index = dst_if.sw_if_index
2110
2111         for packet in capture:
2112             try:
2113                 # extract payload_info from packet's payload
2114                 payload_info = self.get_payload_info(packet)
2115                 packet_index = payload_info.index
2116
2117                 self.logger.debug("Verifying packet with index %d"
2118                                   % (packet_index))
2119                 # packet should have arrived on the expected interface
2120                 self.assertEqual(payload_info.dst, dst_sw_if_index)
2121                 self.logger.debug(
2122                     "Got packet on interface %s: src=%u (idx=%u)" %
2123                     (dst_if.name, payload_info.src, packet_index))
2124
2125                 # search for payload_info with same src and dst if_index
2126                 # this will give us the transmitted packet
2127                 next_info = self.get_next_packet_info_for_interface2(
2128                     payload_info.src, dst_sw_if_index,
2129                     last_info[payload_info.src])
2130                 last_info[payload_info.src] = next_info
2131                 # next_info should not be None
2132                 self.assertTrue(next_info is not None)
2133                 # index of tx and rx packets should be equal
2134                 self.assertEqual(packet_index, next_info.index)
2135                 # data field of next_info contains the tx packet
2136                 txed_packet = next_info.data
2137
2138                 self.logger.debug(ppp("Transmitted packet:",
2139                                       txed_packet))  # ppp=Pretty Print Packet
2140
2141                 self.logger.debug(ppp("Received packet:", packet))
2142
2143                 # compare rcvd packet with expected packet using compare_func
2144                 compare_func(txed_packet, packet)
2145
2146             except:
2147                 print packet.command()
2148                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2149                 raise
2150
2151         # have all expected packets arrived?
2152         for i in self.pg_interfaces:
2153             remaining_packet = self.get_next_packet_info_for_interface2(
2154                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
2155             self.assertTrue(remaining_packet is None,
2156                             "Interface %s: Packet expected from interface %s "
2157                             "didn't arrive" % (dst_if.name, i.name))
2158
2159
2160 if __name__ == '__main__':
2161     unittest.main(testRunner=VppTestRunner)