IP table bind allowed only if table exists
[vpp.git] / test / test_srv6.py
1 #!/usr/bin/env python
2
3 import unittest
4 import binascii
5 from socket import AF_INET6
6
7 from framework import VppTestCase, VppTestRunner
8 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto, VppIpTable
9 from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
10     SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
11
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether, Dot1Q
14 from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
15 from scapy.layers.inet import IP, UDP
16
17 from scapy.utils import inet_pton, inet_ntop
18
19 from util import ppp
20
21
22 class TestSRv6(VppTestCase):
23     """ SRv6 Test Case """
24
25     @classmethod
26     def setUpClass(self):
27         super(TestSRv6, self).setUpClass()
28
29     def setUp(self):
30         """ Perform test setup before each test case.
31         """
32         super(TestSRv6, self).setUp()
33
34         # packet sizes, inclusive L2 overhead
35         self.pg_packet_sizes = [64, 512, 1518, 9018]
36
37         # reset packet_infos
38         self.reset_packet_infos()
39
40     def tearDown(self):
41         """ Clean up test setup after each test case.
42         """
43         self.teardown_interfaces()
44
45         super(TestSRv6, self).tearDown()
46
47     def configure_interface(self,
48                             interface,
49                             ipv6=False, ipv4=False,
50                             ipv6_table_id=0, ipv4_table_id=0):
51         """ Configure interface.
52         :param ipv6: configure IPv6 on interface
53         :param ipv4: configure IPv4 on interface
54         :param ipv6_table_id: FIB table_id for IPv6
55         :param ipv4_table_id: FIB table_id for IPv4
56         """
57         self.logger.debug("Configuring interface %s" % (interface.name))
58         if ipv6:
59             self.logger.debug("Configuring IPv6")
60             interface.set_table_ip6(ipv6_table_id)
61             interface.config_ip6()
62             interface.resolve_ndp(timeout=5)
63         if ipv4:
64             self.logger.debug("Configuring IPv4")
65             interface.set_table_ip4(ipv4_table_id)
66             interface.config_ip4()
67             interface.resolve_arp()
68         interface.admin_up()
69
70     def setup_interfaces(self, ipv6=[], ipv4=[],
71                          ipv6_table_id=[], ipv4_table_id=[]):
72         """ Create and configure interfaces.
73
74         :param ipv6: list of interface IPv6 capabilities
75         :param ipv4: list of interface IPv4 capabilities
76         :param ipv6_table_id: list of intf IPv6 FIB table_ids
77         :param ipv4_table_id: list of intf IPv4 FIB table_ids
78         :returns: List of created interfaces.
79         """
80         # how many interfaces?
81         if len(ipv6):
82             count = len(ipv6)
83         else:
84             count = len(ipv4)
85         self.logger.debug("Creating and configuring %d interfaces" % (count))
86
87         # fill up ipv6 and ipv4 lists if needed
88         # not enabled (False) is the default
89         if len(ipv6) < count:
90             ipv6 += (count - len(ipv6)) * [False]
91         if len(ipv4) < count:
92             ipv4 += (count - len(ipv4)) * [False]
93
94         # fill up table_id lists if needed
95         # table_id 0 (global) is the default
96         if len(ipv6_table_id) < count:
97             ipv6_table_id += (count - len(ipv6_table_id)) * [0]
98         if len(ipv4_table_id) < count:
99             ipv4_table_id += (count - len(ipv4_table_id)) * [0]
100
101         # create 'count' pg interfaces
102         self.create_pg_interfaces(range(count))
103
104         # setup all interfaces
105         for i in range(count):
106             intf = self.pg_interfaces[i]
107             self.configure_interface(intf,
108                                      ipv6[i], ipv4[i],
109                                      ipv6_table_id[i], ipv4_table_id[i])
110
111         if any(ipv6):
112             self.logger.debug(self.vapi.cli("show ip6 neighbors"))
113         if any(ipv4):
114             self.logger.debug(self.vapi.cli("show ip arp"))
115         self.logger.debug(self.vapi.cli("show interface"))
116         self.logger.debug(self.vapi.cli("show hardware"))
117
118         return self.pg_interfaces
119
120     def teardown_interfaces(self):
121         """ Unconfigure and bring down interface.
122         """
123         self.logger.debug("Tearing down interfaces")
124         # tear down all interfaces
125         # AFAIK they cannot be deleted
126         for i in self.pg_interfaces:
127             self.logger.debug("Tear down interface %s" % (i.name))
128             i.admin_down()
129             i.unconfig()
130             i.set_table_ip4(0)
131             i.set_table_ip6(0)
132
133     @unittest.skipUnless(0, "PC to fix")
134     def test_SRv6_T_Encaps(self):
135         """ Test SRv6 Transit.Encaps behavior for IPv6.
136         """
137         # send traffic to one destination interface
138         # source and destination are IPv6 only
139         self.setup_interfaces(ipv6=[True, True])
140
141         # configure FIB entries
142         route = VppIpRoute(self, "a4::", 64,
143                            [VppRoutePath(self.pg1.remote_ip6,
144                                          self.pg1.sw_if_index,
145                                          proto=DpoProto.DPO_PROTO_IP6)],
146                            is_ip6=1)
147         route.add_vpp_config()
148
149         # configure encaps IPv6 source address
150         # needs to be done before SR Policy config
151         # TODO: API?
152         self.vapi.cli("set sr encaps source addr a3::")
153
154         bsid = 'a3::9999:1'
155         # configure SRv6 Policy
156         # Note: segment list order: first -> last
157         sr_policy = VppSRv6Policy(
158             self, bsid=bsid,
159             is_encap=1,
160             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
161             weight=1, fib_table=0,
162             segments=['a4::', 'a5::', 'a6::c7'],
163             source='a3::')
164         sr_policy.add_vpp_config()
165         self.sr_policy = sr_policy
166
167         # log the sr policies
168         self.logger.info(self.vapi.cli("show sr policies"))
169
170         # steer IPv6 traffic to a7::/64 into SRv6 Policy
171         # use the bsid of the above self.sr_policy
172         pol_steering = VppSRv6Steering(
173                         self,
174                         bsid=self.sr_policy.bsid,
175                         prefix="a7::", mask_width=64,
176                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
177                         sr_policy_index=0, table_id=0,
178                         sw_if_index=0)
179         pol_steering.add_vpp_config()
180
181         # log the sr steering policies
182         self.logger.info(self.vapi.cli("show sr steering policies"))
183
184         # create packets
185         count = len(self.pg_packet_sizes)
186         dst_inner = 'a7::1234'
187         pkts = []
188
189         # create IPv6 packets without SRH
190         packet_header = self.create_packet_header_IPv6(dst_inner)
191         # create traffic stream pg0->pg1
192         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
193                                        self.pg_packet_sizes, count))
194
195         # create IPv6 packets with SRH
196         # packets with segments-left 1, active segment a7::
197         packet_header = self.create_packet_header_IPv6_SRH(
198             sidlist=['a8::', 'a7::', 'a6::'],
199             segleft=1)
200         # create traffic stream pg0->pg1
201         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
202                                        self.pg_packet_sizes, count))
203
204         # create IPv6 packets with SRH and IPv6
205         # packets with segments-left 1, active segment a7::
206         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
207             dst_inner,
208             sidlist=['a8::', 'a7::', 'a6::'],
209             segleft=1)
210         # create traffic stream pg0->pg1
211         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
212                                        self.pg_packet_sizes, count))
213
214         # send packets and verify received packets
215         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
216                                   self.compare_rx_tx_packet_T_Encaps)
217
218         # log the localsid counters
219         self.logger.info(self.vapi.cli("show sr localsid"))
220
221         # remove SR steering
222         pol_steering.remove_vpp_config()
223         self.logger.info(self.vapi.cli("show sr steering policies"))
224
225         # remove SR Policies
226         self.sr_policy.remove_vpp_config()
227         self.logger.info(self.vapi.cli("show sr policies"))
228
229         # remove FIB entries
230         # done by tearDown
231
232         # cleanup interfaces
233         self.teardown_interfaces()
234
235     @unittest.skipUnless(0, "PC to fix")
236     def test_SRv6_T_Insert(self):
237         """ Test SRv6 Transit.Insert behavior (IPv6 only).
238         """
239         # send traffic to one destination interface
240         # source and destination are IPv6 only
241         self.setup_interfaces(ipv6=[True, True])
242
243         # configure FIB entries
244         route = VppIpRoute(self, "a4::", 64,
245                            [VppRoutePath(self.pg1.remote_ip6,
246                                          self.pg1.sw_if_index,
247                                          proto=DpoProto.DPO_PROTO_IP6)],
248                            is_ip6=1)
249         route.add_vpp_config()
250
251         # configure encaps IPv6 source address
252         # needs to be done before SR Policy config
253         # TODO: API?
254         self.vapi.cli("set sr encaps source addr a3::")
255
256         bsid = 'a3::9999:1'
257         # configure SRv6 Policy
258         # Note: segment list order: first -> last
259         sr_policy = VppSRv6Policy(
260             self, bsid=bsid,
261             is_encap=0,
262             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
263             weight=1, fib_table=0,
264             segments=['a4::', 'a5::', 'a6::c7'],
265             source='a3::')
266         sr_policy.add_vpp_config()
267         self.sr_policy = sr_policy
268
269         # log the sr policies
270         self.logger.info(self.vapi.cli("show sr policies"))
271
272         # steer IPv6 traffic to a7::/64 into SRv6 Policy
273         # use the bsid of the above self.sr_policy
274         pol_steering = VppSRv6Steering(
275                         self,
276                         bsid=self.sr_policy.bsid,
277                         prefix="a7::", mask_width=64,
278                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
279                         sr_policy_index=0, table_id=0,
280                         sw_if_index=0)
281         pol_steering.add_vpp_config()
282
283         # log the sr steering policies
284         self.logger.info(self.vapi.cli("show sr steering policies"))
285
286         # create packets
287         count = len(self.pg_packet_sizes)
288         dst_inner = 'a7::1234'
289         pkts = []
290
291         # create IPv6 packets without SRH
292         packet_header = self.create_packet_header_IPv6(dst_inner)
293         # create traffic stream pg0->pg1
294         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
295                                        self.pg_packet_sizes, count))
296
297         # create IPv6 packets with SRH
298         # packets with segments-left 1, active segment a7::
299         packet_header = self.create_packet_header_IPv6_SRH(
300             sidlist=['a8::', 'a7::', 'a6::'],
301             segleft=1)
302         # create traffic stream pg0->pg1
303         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
304                                        self.pg_packet_sizes, count))
305
306         # send packets and verify received packets
307         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
308                                   self.compare_rx_tx_packet_T_Insert)
309
310         # log the localsid counters
311         self.logger.info(self.vapi.cli("show sr localsid"))
312
313         # remove SR steering
314         pol_steering.remove_vpp_config()
315         self.logger.info(self.vapi.cli("show sr steering policies"))
316
317         # remove SR Policies
318         self.sr_policy.remove_vpp_config()
319         self.logger.info(self.vapi.cli("show sr policies"))
320
321         # remove FIB entries
322         # done by tearDown
323
324         # cleanup interfaces
325         self.teardown_interfaces()
326
327     @unittest.skipUnless(0, "PC to fix")
328     def test_SRv6_T_Encaps_IPv4(self):
329         """ Test SRv6 Transit.Encaps behavior for IPv4.
330         """
331         # send traffic to one destination interface
332         # source interface is IPv4 only
333         # destination interface is IPv6 only
334         self.setup_interfaces(ipv6=[False, True], ipv4=[True, False])
335
336         # configure FIB entries
337         route = VppIpRoute(self, "a4::", 64,
338                            [VppRoutePath(self.pg1.remote_ip6,
339                                          self.pg1.sw_if_index,
340                                          proto=DpoProto.DPO_PROTO_IP6)],
341                            is_ip6=1)
342         route.add_vpp_config()
343
344         # configure encaps IPv6 source address
345         # needs to be done before SR Policy config
346         # TODO: API?
347         self.vapi.cli("set sr encaps source addr a3::")
348
349         bsid = 'a3::9999:1'
350         # configure SRv6 Policy
351         # Note: segment list order: first -> last
352         sr_policy = VppSRv6Policy(
353             self, bsid=bsid,
354             is_encap=1,
355             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
356             weight=1, fib_table=0,
357             segments=['a4::', 'a5::', 'a6::c7'],
358             source='a3::')
359         sr_policy.add_vpp_config()
360         self.sr_policy = sr_policy
361
362         # log the sr policies
363         self.logger.info(self.vapi.cli("show sr policies"))
364
365         # steer IPv4 traffic to 7.1.1.0/24 into SRv6 Policy
366         # use the bsid of the above self.sr_policy
367         pol_steering = VppSRv6Steering(
368                         self,
369                         bsid=self.sr_policy.bsid,
370                         prefix="7.1.1.0", mask_width=24,
371                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV4,
372                         sr_policy_index=0, table_id=0,
373                         sw_if_index=0)
374         pol_steering.add_vpp_config()
375
376         # log the sr steering policies
377         self.logger.info(self.vapi.cli("show sr steering policies"))
378
379         # create packets
380         count = len(self.pg_packet_sizes)
381         dst_inner = '7.1.1.123'
382         pkts = []
383
384         # create IPv4 packets
385         packet_header = self.create_packet_header_IPv4(dst_inner)
386         # create traffic stream pg0->pg1
387         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
388                                        self.pg_packet_sizes, count))
389
390         # send packets and verify received packets
391         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
392                                   self.compare_rx_tx_packet_T_Encaps_IPv4)
393
394         # log the localsid counters
395         self.logger.info(self.vapi.cli("show sr localsid"))
396
397         # remove SR steering
398         pol_steering.remove_vpp_config()
399         self.logger.info(self.vapi.cli("show sr steering policies"))
400
401         # remove SR Policies
402         self.sr_policy.remove_vpp_config()
403         self.logger.info(self.vapi.cli("show sr policies"))
404
405         # remove FIB entries
406         # done by tearDown
407
408         # cleanup interfaces
409         self.teardown_interfaces()
410
411     @unittest.skip("VPP crashes after running this test")
412     def test_SRv6_T_Encaps_L2(self):
413         """ Test SRv6 Transit.Encaps behavior for L2.
414         """
415         # send traffic to one destination interface
416         # source interface is IPv4 only TODO?
417         # destination interface is IPv6 only
418         self.setup_interfaces(ipv6=[False, True], ipv4=[False, False])
419
420         # configure FIB entries
421         route = VppIpRoute(self, "a4::", 64,
422                            [VppRoutePath(self.pg1.remote_ip6,
423                                          self.pg1.sw_if_index,
424                                          proto=DpoProto.DPO_PROTO_IP6)],
425                            is_ip6=1)
426         route.add_vpp_config()
427
428         # configure encaps IPv6 source address
429         # needs to be done before SR Policy config
430         # TODO: API?
431         self.vapi.cli("set sr encaps source addr a3::")
432
433         bsid = 'a3::9999:1'
434         # configure SRv6 Policy
435         # Note: segment list order: first -> last
436         sr_policy = VppSRv6Policy(
437             self, bsid=bsid,
438             is_encap=1,
439             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
440             weight=1, fib_table=0,
441             segments=['a4::', 'a5::', 'a6::c7'],
442             source='a3::')
443         sr_policy.add_vpp_config()
444         self.sr_policy = sr_policy
445
446         # log the sr policies
447         self.logger.info(self.vapi.cli("show sr policies"))
448
449         # steer L2 traffic into SRv6 Policy
450         # use the bsid of the above self.sr_policy
451         pol_steering = VppSRv6Steering(
452                         self,
453                         bsid=self.sr_policy.bsid,
454                         prefix="::", mask_width=0,
455                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_L2,
456                         sr_policy_index=0, table_id=0,
457                         sw_if_index=self.pg0.sw_if_index)
458         pol_steering.add_vpp_config()
459
460         # log the sr steering policies
461         self.logger.info(self.vapi.cli("show sr steering policies"))
462
463         # create packets
464         count = len(self.pg_packet_sizes)
465         pkts = []
466
467         # create L2 packets without dot1q header
468         packet_header = self.create_packet_header_L2()
469         # create traffic stream pg0->pg1
470         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
471                                        self.pg_packet_sizes, count))
472
473         # create L2 packets with dot1q header
474         packet_header = self.create_packet_header_L2(vlan=123)
475         # create traffic stream pg0->pg1
476         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
477                                        self.pg_packet_sizes, count))
478
479         # send packets and verify received packets
480         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
481                                   self.compare_rx_tx_packet_T_Encaps_L2)
482
483         # log the localsid counters
484         self.logger.info(self.vapi.cli("show sr localsid"))
485
486         # remove SR steering
487         pol_steering.remove_vpp_config()
488         self.logger.info(self.vapi.cli("show sr steering policies"))
489
490         # remove SR Policies
491         self.sr_policy.remove_vpp_config()
492         self.logger.info(self.vapi.cli("show sr policies"))
493
494         # remove FIB entries
495         # done by tearDown
496
497         # cleanup interfaces
498         self.teardown_interfaces()
499
500     def test_SRv6_End(self):
501         """ Test SRv6 End (without PSP) behavior.
502         """
503         # send traffic to one destination interface
504         # source and destination interfaces are IPv6 only
505         self.setup_interfaces(ipv6=[True, True])
506
507         # configure FIB entries
508         route = VppIpRoute(self, "a4::", 64,
509                            [VppRoutePath(self.pg1.remote_ip6,
510                                          self.pg1.sw_if_index,
511                                          proto=DpoProto.DPO_PROTO_IP6)],
512                            is_ip6=1)
513         route.add_vpp_config()
514
515         # configure SRv6 localSID End without PSP behavior
516         localsid = VppSRv6LocalSID(
517                         self, localsid_addr='A3::0',
518                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
519                         nh_addr='::',
520                         end_psp=0,
521                         sw_if_index=0,
522                         vlan_index=0,
523                         fib_table=0)
524         localsid.add_vpp_config()
525         # log the localsids
526         self.logger.debug(self.vapi.cli("show sr localsid"))
527
528         # create IPv6 packets with SRH (SL=2, SL=1, SL=0)
529         # send one packet per SL value per packet size
530         # SL=0 packet with localSID End with USP needs 2nd SRH
531         count = len(self.pg_packet_sizes)
532         dst_inner = 'a4::1234'
533         pkts = []
534
535         # packets with segments-left 2, active segment a3::
536         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
537                 dst_inner,
538                 sidlist=['a5::', 'a4::', 'a3::'],
539                 segleft=2)
540         # create traffic stream pg0->pg1
541         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
542                                        self.pg_packet_sizes, count))
543
544         # packets with segments-left 1, active segment a3::
545         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
546                 dst_inner,
547                 sidlist=['a4::', 'a3::', 'a2::'],
548                 segleft=1)
549         # add to traffic stream pg0->pg1
550         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
551                                        self.pg_packet_sizes, count))
552
553         # TODO: test behavior with SL=0 packet (needs 2*SRH?)
554
555         # send packets and verify received packets
556         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
557                                   self.compare_rx_tx_packet_End)
558
559         # log the localsid counters
560         self.logger.info(self.vapi.cli("show sr localsid"))
561
562         # remove SRv6 localSIDs
563         localsid.remove_vpp_config()
564
565         # remove FIB entries
566         # done by tearDown
567
568         # cleanup interfaces
569         self.teardown_interfaces()
570
571     def test_SRv6_End_with_PSP(self):
572         """ Test SRv6 End with PSP behavior.
573         """
574         # send traffic to one destination interface
575         # source and destination interfaces are IPv6 only
576         self.setup_interfaces(ipv6=[True, True])
577
578         # configure FIB entries
579         route = VppIpRoute(self, "a4::", 64,
580                            [VppRoutePath(self.pg1.remote_ip6,
581                                          self.pg1.sw_if_index,
582                                          proto=DpoProto.DPO_PROTO_IP6)],
583                            is_ip6=1)
584         route.add_vpp_config()
585
586         # configure SRv6 localSID End with PSP behavior
587         localsid = VppSRv6LocalSID(
588                         self, localsid_addr='A3::0',
589                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
590                         nh_addr='::',
591                         end_psp=1,
592                         sw_if_index=0,
593                         vlan_index=0,
594                         fib_table=0)
595         localsid.add_vpp_config()
596         # log the localsids
597         self.logger.debug(self.vapi.cli("show sr localsid"))
598
599         # create IPv6 packets with SRH (SL=2, SL=1)
600         # send one packet per SL value per packet size
601         # SL=0 packet with localSID End with PSP is dropped
602         count = len(self.pg_packet_sizes)
603         dst_inner = 'a4::1234'
604         pkts = []
605
606         # packets with segments-left 2, active segment a3::
607         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
608                     dst_inner,
609                     sidlist=['a5::', 'a4::', 'a3::'],
610                     segleft=2)
611         # create traffic stream pg0->pg1
612         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
613                                        self.pg_packet_sizes, count))
614
615         # packets with segments-left 1, active segment a3::
616         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
617                     dst_inner,
618                     sidlist=['a4::', 'a3::', 'a2::'],
619                     segleft=1)
620         # add to traffic stream pg0->pg1
621         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
622                                        self.pg_packet_sizes, count))
623
624         # send packets and verify received packets
625         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
626                                   self.compare_rx_tx_packet_End_PSP)
627
628         # log the localsid counters
629         self.logger.info(self.vapi.cli("show sr localsid"))
630
631         # remove SRv6 localSIDs
632         localsid.remove_vpp_config()
633
634         # remove FIB entries
635         # done by tearDown
636
637         # cleanup interfaces
638         self.teardown_interfaces()
639
640     def test_SRv6_End_X(self):
641         """ Test SRv6 End.X (without PSP) behavior.
642         """
643         # create three interfaces (1 source, 2 destinations)
644         # source and destination interfaces are IPv6 only
645         self.setup_interfaces(ipv6=[True, True, True])
646
647         # configure FIB entries
648         # a4::/64 via pg1 and pg2
649         route = VppIpRoute(self, "a4::", 64,
650                            [VppRoutePath(self.pg1.remote_ip6,
651                                          self.pg1.sw_if_index,
652                                          proto=DpoProto.DPO_PROTO_IP6),
653                             VppRoutePath(self.pg2.remote_ip6,
654                                          self.pg2.sw_if_index,
655                                          proto=DpoProto.DPO_PROTO_IP6)],
656                            is_ip6=1)
657         route.add_vpp_config()
658         self.logger.debug(self.vapi.cli("show ip6 fib"))
659
660         # configure SRv6 localSID End.X without PSP behavior
661         # End.X points to interface pg1
662         localsid = VppSRv6LocalSID(
663                         self, localsid_addr='A3::C4',
664                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
665                         nh_addr=self.pg1.remote_ip6,
666                         end_psp=0,
667                         sw_if_index=self.pg1.sw_if_index,
668                         vlan_index=0,
669                         fib_table=0)
670         localsid.add_vpp_config()
671         # log the localsids
672         self.logger.debug(self.vapi.cli("show sr localsid"))
673
674         # create IPv6 packets with SRH (SL=2, SL=1)
675         # send one packet per SL value per packet size
676         # SL=0 packet with localSID End with PSP is dropped
677         count = len(self.pg_packet_sizes)
678         dst_inner = 'a4::1234'
679         pkts = []
680
681         # packets with segments-left 2, active segment a3::c4
682         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
683                     dst_inner,
684                     sidlist=['a5::', 'a4::', 'a3::c4'],
685                     segleft=2)
686         # create traffic stream pg0->pg1
687         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
688                                        self.pg_packet_sizes, count))
689
690         # packets with segments-left 1, active segment a3::c4
691         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
692                     dst_inner,
693                     sidlist=['a4::', 'a3::c4', 'a2::'],
694                     segleft=1)
695         # add to traffic stream pg0->pg1
696         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
697                                        self.pg_packet_sizes, count))
698
699         # send packets and verify received packets
700         # using same comparison function as End (no PSP)
701         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
702                                   self.compare_rx_tx_packet_End)
703
704         # assert nothing was received on the other interface (pg2)
705         self.pg2.assert_nothing_captured("mis-directed packet(s)")
706
707         # log the localsid counters
708         self.logger.info(self.vapi.cli("show sr localsid"))
709
710         # remove SRv6 localSIDs
711         localsid.remove_vpp_config()
712
713         # remove FIB entries
714         # done by tearDown
715
716         # cleanup interfaces
717         self.teardown_interfaces()
718
719     def test_SRv6_End_X_with_PSP(self):
720         """ Test SRv6 End.X with PSP behavior.
721         """
722         # create three interfaces (1 source, 2 destinations)
723         # source and destination interfaces are IPv6 only
724         self.setup_interfaces(ipv6=[True, True, True])
725
726         # configure FIB entries
727         # a4::/64 via pg1 and pg2
728         route = VppIpRoute(self, "a4::", 64,
729                            [VppRoutePath(self.pg1.remote_ip6,
730                                          self.pg1.sw_if_index,
731                                          proto=DpoProto.DPO_PROTO_IP6),
732                             VppRoutePath(self.pg2.remote_ip6,
733                                          self.pg2.sw_if_index,
734                                          proto=DpoProto.DPO_PROTO_IP6)],
735                            is_ip6=1)
736         route.add_vpp_config()
737
738         # configure SRv6 localSID End with PSP behavior
739         localsid = VppSRv6LocalSID(
740                         self, localsid_addr='A3::C4',
741                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
742                         nh_addr=self.pg1.remote_ip6,
743                         end_psp=1,
744                         sw_if_index=self.pg1.sw_if_index,
745                         vlan_index=0,
746                         fib_table=0)
747         localsid.add_vpp_config()
748         # log the localsids
749         self.logger.debug(self.vapi.cli("show sr localsid"))
750
751         # create IPv6 packets with SRH (SL=2, SL=1)
752         # send one packet per SL value per packet size
753         # SL=0 packet with localSID End with PSP is dropped
754         count = len(self.pg_packet_sizes)
755         dst_inner = 'a4::1234'
756         pkts = []
757
758         # packets with segments-left 2, active segment a3::
759         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
760                     dst_inner,
761                     sidlist=['a5::', 'a4::', 'a3::c4'],
762                     segleft=2)
763         # create traffic stream pg0->pg1
764         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
765                                        self.pg_packet_sizes, count))
766
767         # packets with segments-left 1, active segment a3::
768         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
769                     dst_inner,
770                     sidlist=['a4::', 'a3::c4', 'a2::'],
771                     segleft=1)
772         # add to traffic stream pg0->pg1
773         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
774                                        self.pg_packet_sizes, count))
775
776         # send packets and verify received packets
777         # using same comparison function as End with PSP
778         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
779                                   self.compare_rx_tx_packet_End_PSP)
780
781         # assert nothing was received on the other interface (pg2)
782         self.pg2.assert_nothing_captured("mis-directed packet(s)")
783
784         # log the localsid counters
785         self.logger.info(self.vapi.cli("show sr localsid"))
786
787         # remove SRv6 localSIDs
788         localsid.remove_vpp_config()
789
790         # remove FIB entries
791         # done by tearDown
792
793         # cleanup interfaces
794         self.teardown_interfaces()
795
796     def test_SRv6_End_DX6(self):
797         """ Test SRv6 End.DX6 behavior.
798         """
799         # send traffic to one destination interface
800         # source and destination interfaces are IPv6 only
801         self.setup_interfaces(ipv6=[True, True])
802
803         # configure SRv6 localSID End.DX6 behavior
804         localsid = VppSRv6LocalSID(
805                         self, localsid_addr='a3::c4',
806                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX6,
807                         nh_addr=self.pg1.remote_ip6,
808                         end_psp=0,
809                         sw_if_index=self.pg1.sw_if_index,
810                         vlan_index=0,
811                         fib_table=0)
812         localsid.add_vpp_config()
813         # log the localsids
814         self.logger.debug(self.vapi.cli("show sr localsid"))
815
816         # create IPv6 packets with SRH (SL=0)
817         # send one packet per packet size
818         count = len(self.pg_packet_sizes)
819         dst_inner = 'a4::1234'  # inner header destination address
820         pkts = []
821
822         # packets with SRH, segments-left 0, active segment a3::c4
823         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
824                         dst_inner,
825                         sidlist=['a3::c4', 'a2::', 'a1::'],
826                         segleft=0)
827         # add to traffic stream pg0->pg1
828         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
829                                        self.pg_packet_sizes, count))
830
831         # packets without SRH, IPv6 in IPv6
832         # outer IPv6 dest addr is the localsid End.DX6
833         packet_header = self.create_packet_header_IPv6_IPv6(
834                                             dst_inner,
835                                             dst_outer='a3::c4')
836         # add to traffic stream pg0->pg1
837         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
838                                        self.pg_packet_sizes, count))
839
840         # send packets and verify received packets
841         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
842                                   self.compare_rx_tx_packet_End_DX6)
843
844         # log the localsid counters
845         self.logger.info(self.vapi.cli("show sr localsid"))
846
847         # remove SRv6 localSIDs
848         localsid.remove_vpp_config()
849
850         # cleanup interfaces
851         self.teardown_interfaces()
852
853     def test_SRv6_End_DT6(self):
854         """ Test SRv6 End.DT6 behavior.
855         """
856         # create three interfaces (1 source, 2 destinations)
857         # all interfaces are IPv6 only
858         # source interface in global FIB (0)
859         # destination interfaces in global and vrf
860         vrf_1 = 1
861         ipt = VppIpTable(self, vrf_1, is_ip6=True)
862         ipt.add_vpp_config()
863         self.setup_interfaces(ipv6=[True, True, True],
864                               ipv6_table_id=[0, 0, vrf_1])
865
866         # configure FIB entries
867         # a4::/64 is reachable
868         #     via pg1 in table 0 (global)
869         #     and via pg2 in table vrf_1
870         route0 = VppIpRoute(self, "a4::", 64,
871                             [VppRoutePath(self.pg1.remote_ip6,
872                                           self.pg1.sw_if_index,
873                                           proto=DpoProto.DPO_PROTO_IP6,
874                                           nh_table_id=0)],
875                             table_id=0,
876                             is_ip6=1)
877         route0.add_vpp_config()
878         route1 = VppIpRoute(self, "a4::", 64,
879                             [VppRoutePath(self.pg2.remote_ip6,
880                                           self.pg2.sw_if_index,
881                                           proto=DpoProto.DPO_PROTO_IP6,
882                                           nh_table_id=vrf_1)],
883                             table_id=vrf_1,
884                             is_ip6=1)
885         route1.add_vpp_config()
886         self.logger.debug(self.vapi.cli("show ip6 fib"))
887
888         # configure SRv6 localSID End.DT6 behavior
889         # Note:
890         # fib_table: where the localsid is installed
891         # sw_if_index: in T-variants of localsid this is the vrf table_id
892         localsid = VppSRv6LocalSID(
893                         self, localsid_addr='a3::c4',
894                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT6,
895                         nh_addr='::',
896                         end_psp=0,
897                         sw_if_index=vrf_1,
898                         vlan_index=0,
899                         fib_table=0)
900         localsid.add_vpp_config()
901         # log the localsids
902         self.logger.debug(self.vapi.cli("show sr localsid"))
903
904         # create IPv6 packets with SRH (SL=0)
905         # send one packet per packet size
906         count = len(self.pg_packet_sizes)
907         dst_inner = 'a4::1234'  # inner header destination address
908         pkts = []
909
910         # packets with SRH, segments-left 0, active segment a3::c4
911         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
912                         dst_inner,
913                         sidlist=['a3::c4', 'a2::', 'a1::'],
914                         segleft=0)
915         # add to traffic stream pg0->pg1
916         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
917                                        self.pg_packet_sizes, count))
918
919         # packets without SRH, IPv6 in IPv6
920         # outer IPv6 dest addr is the localsid End.DT6
921         packet_header = self.create_packet_header_IPv6_IPv6(
922                                             dst_inner,
923                                             dst_outer='a3::c4')
924         # add to traffic stream pg0->pg1
925         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
926                                        self.pg_packet_sizes, count))
927
928         # send packets and verify received packets
929         # using same comparison function as End.DX6
930         self.send_and_verify_pkts(self.pg0, pkts, self.pg2,
931                                   self.compare_rx_tx_packet_End_DX6)
932
933         # assert nothing was received on the other interface (pg2)
934         self.pg1.assert_nothing_captured("mis-directed packet(s)")
935
936         # log the localsid counters
937         self.logger.info(self.vapi.cli("show sr localsid"))
938
939         # remove SRv6 localSIDs
940         localsid.remove_vpp_config()
941
942         # remove FIB entries
943         # done by tearDown
944
945         # cleanup interfaces
946         self.teardown_interfaces()
947
948     def test_SRv6_End_DX4(self):
949         """ Test SRv6 End.DX4 behavior.
950         """
951         # send traffic to one destination interface
952         # source interface is IPv6 only
953         # destination interface is IPv4 only
954         self.setup_interfaces(ipv6=[True, False], ipv4=[False, True])
955
956         # configure SRv6 localSID End.DX4 behavior
957         localsid = VppSRv6LocalSID(
958                         self, localsid_addr='a3::c4',
959                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX4,
960                         nh_addr=self.pg1.remote_ip4,
961                         end_psp=0,
962                         sw_if_index=self.pg1.sw_if_index,
963                         vlan_index=0,
964                         fib_table=0)
965         localsid.add_vpp_config()
966         # log the localsids
967         self.logger.debug(self.vapi.cli("show sr localsid"))
968
969         # send one packet per packet size
970         count = len(self.pg_packet_sizes)
971         dst_inner = '4.1.1.123'  # inner header destination address
972         pkts = []
973
974         # packets with SRH, segments-left 0, active segment a3::c4
975         packet_header = self.create_packet_header_IPv6_SRH_IPv4(
976                         dst_inner,
977                         sidlist=['a3::c4', 'a2::', 'a1::'],
978                         segleft=0)
979         # add to traffic stream pg0->pg1
980         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
981                                        self.pg_packet_sizes, count))
982
983         # packets without SRH, IPv4 in IPv6
984         # outer IPv6 dest addr is the localsid End.DX4
985         packet_header = self.create_packet_header_IPv6_IPv4(
986                                             dst_inner,
987                                             dst_outer='a3::c4')
988         # add to traffic stream pg0->pg1
989         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
990                                        self.pg_packet_sizes, count))
991
992         # send packets and verify received packets
993         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
994                                   self.compare_rx_tx_packet_End_DX4)
995
996         # log the localsid counters
997         self.logger.info(self.vapi.cli("show sr localsid"))
998
999         # remove SRv6 localSIDs
1000         localsid.remove_vpp_config()
1001
1002         # cleanup interfaces
1003         self.teardown_interfaces()
1004
1005     def test_SRv6_End_DT4(self):
1006         """ Test SRv6 End.DT4 behavior.
1007         """
1008         # create three interfaces (1 source, 2 destinations)
1009         # source interface is IPv6-only
1010         # destination interfaces are IPv4 only
1011         # source interface in global FIB (0)
1012         # destination interfaces in global and vrf
1013         vrf_1 = 1
1014         ipt = VppIpTable(self, vrf_1)
1015         ipt.add_vpp_config()
1016         self.setup_interfaces(ipv6=[True, False, False],
1017                               ipv4=[False, True, True],
1018                               ipv6_table_id=[0, 0, 0],
1019                               ipv4_table_id=[0, 0, vrf_1])
1020
1021         # configure FIB entries
1022         # 4.1.1.0/24 is reachable
1023         #     via pg1 in table 0 (global)
1024         #     and via pg2 in table vrf_1
1025         route0 = VppIpRoute(self, "4.1.1.0", 24,
1026                             [VppRoutePath(self.pg1.remote_ip4,
1027                                           self.pg1.sw_if_index,
1028                                           nh_table_id=0)],
1029                             table_id=0,
1030                             is_ip6=0)
1031         route0.add_vpp_config()
1032         route1 = VppIpRoute(self, "4.1.1.0", 24,
1033                             [VppRoutePath(self.pg2.remote_ip4,
1034                                           self.pg2.sw_if_index,
1035                                           nh_table_id=vrf_1)],
1036                             table_id=vrf_1,
1037                             is_ip6=0)
1038         route1.add_vpp_config()
1039         self.logger.debug(self.vapi.cli("show ip fib"))
1040
1041         # configure SRv6 localSID End.DT6 behavior
1042         # Note:
1043         # fib_table: where the localsid is installed
1044         # sw_if_index: in T-variants of localsid: vrf table_id
1045         localsid = VppSRv6LocalSID(
1046                         self, localsid_addr='a3::c4',
1047                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT4,
1048                         nh_addr='::',
1049                         end_psp=0,
1050                         sw_if_index=vrf_1,
1051                         vlan_index=0,
1052                         fib_table=0)
1053         localsid.add_vpp_config()
1054         # log the localsids
1055         self.logger.debug(self.vapi.cli("show sr localsid"))
1056
1057         # create IPv6 packets with SRH (SL=0)
1058         # send one packet per packet size
1059         count = len(self.pg_packet_sizes)
1060         dst_inner = '4.1.1.123'  # inner header destination address
1061         pkts = []
1062
1063         # packets with SRH, segments-left 0, active segment a3::c4
1064         packet_header = self.create_packet_header_IPv6_SRH_IPv4(
1065                         dst_inner,
1066                         sidlist=['a3::c4', 'a2::', 'a1::'],
1067                         segleft=0)
1068         # add to traffic stream pg0->pg1
1069         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
1070                                        self.pg_packet_sizes, count))
1071
1072         # packets without SRH, IPv6 in IPv6
1073         # outer IPv6 dest addr is the localsid End.DX4
1074         packet_header = self.create_packet_header_IPv6_IPv4(
1075                                             dst_inner,
1076                                             dst_outer='a3::c4')
1077         # add to traffic stream pg0->pg1
1078         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
1079                                        self.pg_packet_sizes, count))
1080
1081         # send packets and verify received packets
1082         # using same comparison function as End.DX4
1083         self.send_and_verify_pkts(self.pg0, pkts, self.pg2,
1084                                   self.compare_rx_tx_packet_End_DX4)
1085
1086         # assert nothing was received on the other interface (pg2)
1087         self.pg1.assert_nothing_captured("mis-directed packet(s)")
1088
1089         # log the localsid counters
1090         self.logger.info(self.vapi.cli("show sr localsid"))
1091
1092         # remove SRv6 localSIDs
1093         localsid.remove_vpp_config()
1094
1095         # remove FIB entries
1096         # done by tearDown
1097
1098         # cleanup interfaces
1099         self.teardown_interfaces()
1100
1101     def test_SRv6_End_DX2(self):
1102         """ Test SRv6 End.DX2 behavior.
1103         """
1104         # send traffic to one destination interface
1105         # source interface is IPv6 only
1106         self.setup_interfaces(ipv6=[True, False], ipv4=[False, False])
1107
1108         # configure SRv6 localSID End.DX2 behavior
1109         localsid = VppSRv6LocalSID(
1110                         self, localsid_addr='a3::c4',
1111                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX2,
1112                         nh_addr='::',
1113                         end_psp=0,
1114                         sw_if_index=self.pg1.sw_if_index,
1115                         vlan_index=0,
1116                         fib_table=0)
1117         localsid.add_vpp_config()
1118         # log the localsids
1119         self.logger.debug(self.vapi.cli("show sr localsid"))
1120
1121         # send one packet per packet size
1122         count = len(self.pg_packet_sizes)
1123         pkts = []
1124
1125         # packets with SRH, segments-left 0, active segment a3::c4
1126         # L2 has no dot1q header
1127         packet_header = self.create_packet_header_IPv6_SRH_L2(
1128                             sidlist=['a3::c4', 'a2::', 'a1::'],
1129                             segleft=0,
1130                             vlan=0)
1131         # add to traffic stream pg0->pg1
1132         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1133                                        self.pg_packet_sizes, count))
1134
1135         # packets with SRH, segments-left 0, active segment a3::c4
1136         # L2 has dot1q header
1137         packet_header = self.create_packet_header_IPv6_SRH_L2(
1138                             sidlist=['a3::c4', 'a2::', 'a1::'],
1139                             segleft=0,
1140                             vlan=123)
1141         # add to traffic stream pg0->pg1
1142         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1143                                        self.pg_packet_sizes, count))
1144
1145         # packets without SRH, L2 in IPv6
1146         # outer IPv6 dest addr is the localsid End.DX2
1147         # L2 has no dot1q header
1148         packet_header = self.create_packet_header_IPv6_L2(
1149                                             dst_outer='a3::c4',
1150                                             vlan=0)
1151         # add to traffic stream pg0->pg1
1152         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1153                                        self.pg_packet_sizes, count))
1154
1155         # packets without SRH, L2 in IPv6
1156         # outer IPv6 dest addr is the localsid End.DX2
1157         # L2 has dot1q header
1158         packet_header = self.create_packet_header_IPv6_L2(
1159                                             dst_outer='a3::c4',
1160                                             vlan=123)
1161         # add to traffic stream pg0->pg1
1162         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1163                                        self.pg_packet_sizes, count))
1164
1165         # send packets and verify received packets
1166         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
1167                                   self.compare_rx_tx_packet_End_DX2)
1168
1169         # log the localsid counters
1170         self.logger.info(self.vapi.cli("show sr localsid"))
1171
1172         # remove SRv6 localSIDs
1173         localsid.remove_vpp_config()
1174
1175         # cleanup interfaces
1176         self.teardown_interfaces()
1177
1178     @unittest.skipUnless(0, "PC to fix")
1179     def test_SRv6_T_Insert_Classifier(self):
1180         """ Test SRv6 Transit.Insert behavior (IPv6 only).
1181             steer packets using the classifier
1182         """
1183         # send traffic to one destination interface
1184         # source and destination are IPv6 only
1185         self.setup_interfaces(ipv6=[False, False, False, True, True])
1186
1187         # configure FIB entries
1188         route = VppIpRoute(self, "a4::", 64,
1189                            [VppRoutePath(self.pg4.remote_ip6,
1190                                          self.pg4.sw_if_index,
1191                                          proto=DpoProto.DPO_PROTO_IP6)],
1192                            is_ip6=1)
1193         route.add_vpp_config()
1194
1195         # configure encaps IPv6 source address
1196         # needs to be done before SR Policy config
1197         # TODO: API?
1198         self.vapi.cli("set sr encaps source addr a3::")
1199
1200         bsid = 'a3::9999:1'
1201         # configure SRv6 Policy
1202         # Note: segment list order: first -> last
1203         sr_policy = VppSRv6Policy(
1204             self, bsid=bsid,
1205             is_encap=0,
1206             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
1207             weight=1, fib_table=0,
1208             segments=['a4::', 'a5::', 'a6::c7'],
1209             source='a3::')
1210         sr_policy.add_vpp_config()
1211         self.sr_policy = sr_policy
1212
1213         # log the sr policies
1214         self.logger.info(self.vapi.cli("show sr policies"))
1215
1216         # add classify table
1217         # mask on dst ip address prefix a7::/8
1218         mask = '{:0<16}'.format('ff')
1219         r = self.vapi.classify_add_del_table(
1220             1,
1221             binascii.unhexlify(mask),
1222             match_n_vectors=(len(mask) - 1) // 32 + 1,
1223             current_data_flag=1,
1224             skip_n_vectors=2)  # data offset
1225         self.assertIsNotNone(r, msg='No response msg for add_del_table')
1226         table_index = r.new_table_index
1227
1228         # add the source routign node as a ip6 inacl netxt node
1229         r = self.vapi.add_node_next('ip6-inacl',
1230                                     'sr-pl-rewrite-insert')
1231         inacl_next_node_index = r.node_index
1232
1233         match = '{:0<16}'.format('a7')
1234         r = self.vapi.classify_add_del_session(
1235             1,
1236             table_index,
1237             binascii.unhexlify(match),
1238             hit_next_index=inacl_next_node_index,
1239             action=3,
1240             metadata=0)  # sr policy index
1241         self.assertIsNotNone(r, msg='No response msg for add_del_session')
1242
1243         # log the classify table used in the steering policy
1244         self.logger.info(self.vapi.cli("show classify table"))
1245
1246         r = self.vapi.input_acl_set_interface(
1247             is_add=1,
1248             sw_if_index=self.pg3.sw_if_index,
1249             ip6_table_index=table_index)
1250         self.assertIsNotNone(r,
1251                              msg='No response msg for input_acl_set_interface')
1252
1253         # log the ip6 inacl
1254         self.logger.info(self.vapi.cli("show inacl type ip6"))
1255
1256         # create packets
1257         count = len(self.pg_packet_sizes)
1258         dst_inner = 'a7::1234'
1259         pkts = []
1260
1261         # create IPv6 packets without SRH
1262         packet_header = self.create_packet_header_IPv6(dst_inner)
1263         # create traffic stream pg3->pg4
1264         pkts.extend(self.create_stream(self.pg3, self.pg4, packet_header,
1265                                        self.pg_packet_sizes, count))
1266
1267         # create IPv6 packets with SRH
1268         # packets with segments-left 1, active segment a7::
1269         packet_header = self.create_packet_header_IPv6_SRH(
1270             sidlist=['a8::', 'a7::', 'a6::'],
1271             segleft=1)
1272         # create traffic stream pg3->pg4
1273         pkts.extend(self.create_stream(self.pg3, self.pg4, packet_header,
1274                                        self.pg_packet_sizes, count))
1275
1276         # send packets and verify received packets
1277         self.send_and_verify_pkts(self.pg3, pkts, self.pg4,
1278                                   self.compare_rx_tx_packet_T_Insert)
1279
1280         # remove the interface l2 input feature
1281         r = self.vapi.input_acl_set_interface(
1282             is_add=0,
1283             sw_if_index=self.pg3.sw_if_index,
1284             ip6_table_index=table_index)
1285         self.assertIsNotNone(r,
1286                              msg='No response msg for input_acl_set_interface')
1287
1288         # log the ip6 inacl after cleaning
1289         self.logger.info(self.vapi.cli("show inacl type ip6"))
1290
1291         # log the localsid counters
1292         self.logger.info(self.vapi.cli("show sr localsid"))
1293
1294         # remove classifier SR steering
1295         # classifier_steering.remove_vpp_config()
1296         self.logger.info(self.vapi.cli("show sr steering policies"))
1297
1298         # remove SR Policies
1299         self.sr_policy.remove_vpp_config()
1300         self.logger.info(self.vapi.cli("show sr policies"))
1301
1302         # remove classify session and table
1303         r = self.vapi.classify_add_del_session(
1304             0,
1305             table_index,
1306             binascii.unhexlify(match))
1307         self.assertIsNotNone(r, msg='No response msg for add_del_session')
1308
1309         r = self.vapi.classify_add_del_table(
1310             0,
1311             binascii.unhexlify(mask),
1312             table_index=table_index)
1313         self.assertIsNotNone(r, msg='No response msg for add_del_table')
1314
1315         self.logger.info(self.vapi.cli("show classify table"))
1316
1317         # remove FIB entries
1318         # done by tearDown
1319
1320         # cleanup interfaces
1321         self.teardown_interfaces()
1322
1323     def compare_rx_tx_packet_T_Encaps(self, tx_pkt, rx_pkt):
1324         """ Compare input and output packet after passing T.Encaps
1325
1326         :param tx_pkt: transmitted packet
1327         :param rx_pkt: received packet
1328         """
1329         # T.Encaps updates the headers as follows:
1330         # SR Policy seglist (S3, S2, S1)
1331         # SR Policy source C
1332         # IPv6:
1333         # in: IPv6(A, B2)
1334         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(A, B2)
1335         # IPv6 + SRH:
1336         # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1)
1337         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(a, B2)SRH(B3, B2, B1; SL=1)
1338
1339         # get first (outer) IPv6 header of rx'ed packet
1340         rx_ip = rx_pkt.getlayer(IPv6)
1341         rx_srh = None
1342
1343         tx_ip = tx_pkt.getlayer(IPv6)
1344
1345         # expected segment-list
1346         seglist = self.sr_policy.segments
1347         # reverse list to get order as in SRH
1348         tx_seglist = seglist[::-1]
1349
1350         # get source address of SR Policy
1351         sr_policy_source = self.sr_policy.source
1352
1353         # rx'ed packet should have SRH
1354         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1355         # get SRH
1356         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1357
1358         # received ip.src should be equal to SR Policy source
1359         self.assertEqual(rx_ip.src, sr_policy_source)
1360         # received ip.dst should be equal to expected sidlist[lastentry]
1361         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1362         # rx'ed seglist should be equal to expected seglist
1363         self.assertEqual(rx_srh.addresses, tx_seglist)
1364         # segleft should be equal to size expected seglist-1
1365         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1366         # segleft should be equal to lastentry
1367         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1368
1369         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1370         # except for the hop-limit field
1371         #   -> update tx'ed hlim to the expected hlim
1372         tx_ip.hlim = tx_ip.hlim - 1
1373
1374         self.assertEqual(rx_srh.payload, tx_ip)
1375
1376         self.logger.debug("packet verification: SUCCESS")
1377
1378     def compare_rx_tx_packet_T_Encaps_IPv4(self, tx_pkt, rx_pkt):
1379         """ Compare input and output packet after passing T.Encaps for IPv4
1380
1381         :param tx_pkt: transmitted packet
1382         :param rx_pkt: received packet
1383         """
1384         # T.Encaps for IPv4 updates the headers as follows:
1385         # SR Policy seglist (S3, S2, S1)
1386         # SR Policy source C
1387         # IPv4:
1388         # in: IPv4(A, B2)
1389         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv4(A, B2)
1390
1391         # get first (outer) IPv6 header of rx'ed packet
1392         rx_ip = rx_pkt.getlayer(IPv6)
1393         rx_srh = None
1394
1395         tx_ip = tx_pkt.getlayer(IP)
1396
1397         # expected segment-list
1398         seglist = self.sr_policy.segments
1399         # reverse list to get order as in SRH
1400         tx_seglist = seglist[::-1]
1401
1402         # get source address of SR Policy
1403         sr_policy_source = self.sr_policy.source
1404
1405         # checks common to cases tx with and without SRH
1406         # rx'ed packet should have SRH and IPv4 header
1407         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1408         self.assertTrue(rx_ip.payload.haslayer(IP))
1409         # get SRH
1410         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1411
1412         # received ip.src should be equal to SR Policy source
1413         self.assertEqual(rx_ip.src, sr_policy_source)
1414         # received ip.dst should be equal to sidlist[lastentry]
1415         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1416         # rx'ed seglist should be equal to seglist
1417         self.assertEqual(rx_srh.addresses, tx_seglist)
1418         # segleft should be equal to size seglist-1
1419         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1420         # segleft should be equal to lastentry
1421         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1422
1423         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1424         # except for the ttl field and ip checksum
1425         #   -> adjust tx'ed ttl to expected ttl
1426         tx_ip.ttl = tx_ip.ttl - 1
1427         #   -> set tx'ed ip checksum to None and let scapy recompute
1428         tx_ip.chksum = None
1429         # read back the pkt (with str()) to force computing these fields
1430         # probably other ways to accomplish this are possible
1431         tx_ip = IP(str(tx_ip))
1432
1433         self.assertEqual(rx_srh.payload, tx_ip)
1434
1435         self.logger.debug("packet verification: SUCCESS")
1436
1437     def compare_rx_tx_packet_T_Encaps_L2(self, tx_pkt, rx_pkt):
1438         """ Compare input and output packet after passing T.Encaps for L2
1439
1440         :param tx_pkt: transmitted packet
1441         :param rx_pkt: received packet
1442         """
1443         # T.Encaps for L2 updates the headers as follows:
1444         # SR Policy seglist (S3, S2, S1)
1445         # SR Policy source C
1446         # L2:
1447         # in: L2
1448         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)L2
1449
1450         # get first (outer) IPv6 header of rx'ed packet
1451         rx_ip = rx_pkt.getlayer(IPv6)
1452         rx_srh = None
1453
1454         tx_ether = tx_pkt.getlayer(Ether)
1455
1456         # expected segment-list
1457         seglist = self.sr_policy.segments
1458         # reverse list to get order as in SRH
1459         tx_seglist = seglist[::-1]
1460
1461         # get source address of SR Policy
1462         sr_policy_source = self.sr_policy.source
1463
1464         # rx'ed packet should have SRH
1465         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1466         # get SRH
1467         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1468
1469         # received ip.src should be equal to SR Policy source
1470         self.assertEqual(rx_ip.src, sr_policy_source)
1471         # received ip.dst should be equal to sidlist[lastentry]
1472         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1473         # rx'ed seglist should be equal to seglist
1474         self.assertEqual(rx_srh.addresses, tx_seglist)
1475         # segleft should be equal to size seglist-1
1476         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1477         # segleft should be equal to lastentry
1478         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1479         # nh should be "No Next Header" (59)
1480         self.assertEqual(rx_srh.nh, 59)
1481
1482         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1483         self.assertEqual(Ether(str(rx_srh.payload)), tx_ether)
1484
1485         self.logger.debug("packet verification: SUCCESS")
1486
1487     def compare_rx_tx_packet_T_Insert(self, tx_pkt, rx_pkt):
1488         """ Compare input and output packet after passing T.Insert
1489
1490         :param tx_pkt: transmitted packet
1491         :param rx_pkt: received packet
1492         """
1493         # T.Insert updates the headers as follows:
1494         # IPv6:
1495         # in: IPv6(A, B2)
1496         # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)
1497         # IPv6 + SRH:
1498         # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1)
1499         # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)SRH(B3, B2, B1; SL=1)
1500
1501         # get first (outer) IPv6 header of rx'ed packet
1502         rx_ip = rx_pkt.getlayer(IPv6)
1503         rx_srh = None
1504         rx_ip2 = None
1505         rx_srh2 = None
1506         rx_ip3 = None
1507         rx_udp = rx_pkt[UDP]
1508
1509         tx_ip = tx_pkt.getlayer(IPv6)
1510         tx_srh = None
1511         tx_ip2 = None
1512         # some packets have been tx'ed with an SRH, some without it
1513         # get SRH if tx'ed packet has it
1514         if tx_pkt.haslayer(IPv6ExtHdrSegmentRouting):
1515             tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1516             tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1517         tx_udp = tx_pkt[UDP]
1518
1519         # expected segment-list (make copy of SR Policy segment list)
1520         seglist = self.sr_policy.segments[:]
1521         # expected seglist has initial dest addr as last segment
1522         seglist.append(tx_ip.dst)
1523         # reverse list to get order as in SRH
1524         tx_seglist = seglist[::-1]
1525
1526         # get source address of SR Policy
1527         sr_policy_source = self.sr_policy.source
1528
1529         # checks common to cases tx with and without SRH
1530         # rx'ed packet should have SRH and only one IPv6 header
1531         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1532         self.assertFalse(rx_ip.payload.haslayer(IPv6))
1533         # get SRH
1534         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1535
1536         # rx'ed ip.src should be equal to tx'ed ip.src
1537         self.assertEqual(rx_ip.src, tx_ip.src)
1538         # rx'ed ip.dst should be equal to sidlist[lastentry]
1539         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1540
1541         # rx'ed seglist should be equal to expected seglist
1542         self.assertEqual(rx_srh.addresses, tx_seglist)
1543         # segleft should be equal to size(expected seglist)-1
1544         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1545         # segleft should be equal to lastentry
1546         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1547
1548         if tx_srh:  # packet was tx'ed with SRH
1549             # packet should have 2nd SRH
1550             self.assertTrue(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
1551             # get 2nd SRH
1552             rx_srh2 = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting, 2)
1553
1554             # rx'ed srh2.addresses should be equal to tx'ed srh.addresses
1555             self.assertEqual(rx_srh2.addresses, tx_srh.addresses)
1556             # rx'ed srh2.segleft should be equal to tx'ed srh.segleft
1557             self.assertEqual(rx_srh2.segleft, tx_srh.segleft)
1558             # rx'ed srh2.lastentry should be equal to tx'ed srh.lastentry
1559             self.assertEqual(rx_srh2.lastentry, tx_srh.lastentry)
1560
1561         else:  # packet was tx'ed without SRH
1562             # rx packet should have no other SRH
1563             self.assertFalse(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
1564
1565         # UDP layer should be unchanged
1566         self.assertEqual(rx_udp, tx_udp)
1567
1568         self.logger.debug("packet verification: SUCCESS")
1569
1570     def compare_rx_tx_packet_End(self, tx_pkt, rx_pkt):
1571         """ Compare input and output packet after passing End (without PSP)
1572
1573         :param tx_pkt: transmitted packet
1574         :param rx_pkt: received packet
1575         """
1576         # End (no PSP) updates the headers as follows:
1577         # IPv6 + SRH:
1578         # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2)
1579         # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1580
1581         # get first (outer) IPv6 header of rx'ed packet
1582         rx_ip = rx_pkt.getlayer(IPv6)
1583         rx_srh = None
1584         rx_ip2 = None
1585         rx_udp = rx_pkt[UDP]
1586
1587         tx_ip = tx_pkt.getlayer(IPv6)
1588         # we know the packet has been tx'ed
1589         # with an inner IPv6 header and an SRH
1590         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1591         tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1592         tx_udp = tx_pkt[UDP]
1593
1594         # common checks, regardless of tx segleft value
1595         # rx'ed packet should have 2nd IPv6 header
1596         self.assertTrue(rx_ip.payload.haslayer(IPv6))
1597         # get second (inner) IPv6 header
1598         rx_ip2 = rx_pkt.getlayer(IPv6, 2)
1599
1600         if tx_ip.segleft > 0:
1601             # SRH should NOT have been popped:
1602             #   End SID without PSP does not pop SRH if segleft>0
1603             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1604             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1605
1606             # received ip.src should be equal to expected ip.src
1607             self.assertEqual(rx_ip.src, tx_ip.src)
1608             # sidlist should be unchanged
1609             self.assertEqual(rx_srh.addresses, tx_srh.addresses)
1610             # segleft should have been decremented
1611             self.assertEqual(rx_srh.segleft, tx_srh.segleft-1)
1612             # received ip.dst should be equal to sidlist[segleft]
1613             self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
1614             # lastentry should be unchanged
1615             self.assertEqual(rx_srh.lastentry, tx_srh.lastentry)
1616             # inner IPv6 packet (ip2) should be unchanged
1617             self.assertEqual(rx_ip2.src, tx_ip2.src)
1618             self.assertEqual(rx_ip2.dst, tx_ip2.dst)
1619         # else:  # tx_ip.segleft == 0
1620             # TODO: Does this work with 2 SRHs in ingress packet?
1621
1622         # UDP layer should be unchanged
1623         self.assertEqual(rx_udp, tx_udp)
1624
1625         self.logger.debug("packet verification: SUCCESS")
1626
1627     def compare_rx_tx_packet_End_PSP(self, tx_pkt, rx_pkt):
1628         """ Compare input and output packet after passing End with PSP
1629
1630         :param tx_pkt: transmitted packet
1631         :param rx_pkt: received packet
1632         """
1633         # End (PSP) updates the headers as follows:
1634         # IPv6 + SRH (SL>1):
1635         # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2)
1636         # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1637         # IPv6 + SRH (SL=1):
1638         # in: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1639         # out: IPv6(A, S3)
1640
1641         # get first (outer) IPv6 header of rx'ed packet
1642         rx_ip = rx_pkt.getlayer(IPv6)
1643         rx_srh = None
1644         rx_ip2 = None
1645         rx_udp = rx_pkt[UDP]
1646
1647         tx_ip = tx_pkt.getlayer(IPv6)
1648         # we know the packet has been tx'ed
1649         # with an inner IPv6 header and an SRH
1650         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1651         tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1652         tx_udp = tx_pkt[UDP]
1653
1654         # common checks, regardless of tx segleft value
1655         self.assertTrue(rx_ip.payload.haslayer(IPv6))
1656         rx_ip2 = rx_pkt.getlayer(IPv6, 2)
1657         # inner IPv6 packet (ip2) should be unchanged
1658         self.assertEqual(rx_ip2.src, tx_ip2.src)
1659         self.assertEqual(rx_ip2.dst, tx_ip2.dst)
1660
1661         if tx_ip.segleft > 1:
1662             # SRH should NOT have been popped:
1663             #   End SID with PSP does not pop SRH if segleft>1
1664             # rx'ed packet should have SRH
1665             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1666             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1667
1668             # received ip.src should be equal to expected ip.src
1669             self.assertEqual(rx_ip.src, tx_ip.src)
1670             # sidlist should be unchanged
1671             self.assertEqual(rx_srh.addresses, tx_srh.addresses)
1672             # segleft should have been decremented
1673             self.assertEqual(rx_srh.segleft, tx_srh.segleft-1)
1674             # received ip.dst should be equal to sidlist[segleft]
1675             self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
1676             # lastentry should be unchanged
1677             self.assertEqual(rx_srh.lastentry, tx_srh.lastentry)
1678
1679         else:  # tx_ip.segleft <= 1
1680             # SRH should have been popped:
1681             #   End SID with PSP and segleft=1 pops SRH
1682             # the two IPv6 headers are still present
1683             # outer IPv6 header has DA == last segment of popped SRH
1684             # SRH should not be present
1685             self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1686             # outer IPv6 header ip.src should be equal to tx'ed ip.src
1687             self.assertEqual(rx_ip.src, tx_ip.src)
1688             # outer IPv6 header ip.dst should be = to tx'ed sidlist[segleft-1]
1689             self.assertEqual(rx_ip.dst, tx_srh.addresses[tx_srh.segleft-1])
1690
1691         # UDP layer should be unchanged
1692         self.assertEqual(rx_udp, tx_udp)
1693
1694         self.logger.debug("packet verification: SUCCESS")
1695
1696     def compare_rx_tx_packet_End_DX6(self, tx_pkt, rx_pkt):
1697         """ Compare input and output packet after passing End.DX6
1698
1699         :param tx_pkt: transmitted packet
1700         :param rx_pkt: received packet
1701         """
1702         # End.DX6 updates the headers as follows:
1703         # IPv6 + SRH (SL=0):
1704         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv6(B, D)
1705         # out: IPv6(B, D)
1706         # IPv6:
1707         # in: IPv6(A, S3)IPv6(B, D)
1708         # out: IPv6(B, D)
1709
1710         # get first (outer) IPv6 header of rx'ed packet
1711         rx_ip = rx_pkt.getlayer(IPv6)
1712
1713         tx_ip = tx_pkt.getlayer(IPv6)
1714         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1715
1716         # verify if rx'ed packet has no SRH
1717         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1718
1719         # the whole rx_ip pkt should be equal to tx_ip2
1720         # except for the hlim field
1721         #   -> adjust tx'ed hlim to expected hlim
1722         tx_ip2.hlim = tx_ip2.hlim - 1
1723
1724         self.assertEqual(rx_ip, tx_ip2)
1725
1726         self.logger.debug("packet verification: SUCCESS")
1727
1728     def compare_rx_tx_packet_End_DX4(self, tx_pkt, rx_pkt):
1729         """ Compare input and output packet after passing End.DX4
1730
1731         :param tx_pkt: transmitted packet
1732         :param rx_pkt: received packet
1733         """
1734         # End.DX4 updates the headers as follows:
1735         # IPv6 + SRH (SL=0):
1736         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv4(B, D)
1737         # out: IPv4(B, D)
1738         # IPv6:
1739         # in: IPv6(A, S3)IPv4(B, D)
1740         # out: IPv4(B, D)
1741
1742         # get IPv4 header of rx'ed packet
1743         rx_ip = rx_pkt.getlayer(IP)
1744
1745         tx_ip = tx_pkt.getlayer(IPv6)
1746         tx_ip2 = tx_pkt.getlayer(IP)
1747
1748         # verify if rx'ed packet has no SRH
1749         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1750
1751         # the whole rx_ip pkt should be equal to tx_ip2
1752         # except for the ttl field and ip checksum
1753         #   -> adjust tx'ed ttl to expected ttl
1754         tx_ip2.ttl = tx_ip2.ttl - 1
1755         #   -> set tx'ed ip checksum to None and let scapy recompute
1756         tx_ip2.chksum = None
1757         # read back the pkt (with str()) to force computing these fields
1758         # probably other ways to accomplish this are possible
1759         tx_ip2 = IP(str(tx_ip2))
1760
1761         self.assertEqual(rx_ip, tx_ip2)
1762
1763         self.logger.debug("packet verification: SUCCESS")
1764
1765     def compare_rx_tx_packet_End_DX2(self, tx_pkt, rx_pkt):
1766         """ Compare input and output packet after passing End.DX2
1767
1768         :param tx_pkt: transmitted packet
1769         :param rx_pkt: received packet
1770         """
1771         # End.DX2 updates the headers as follows:
1772         # IPv6 + SRH (SL=0):
1773         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)L2
1774         # out: L2
1775         # IPv6:
1776         # in: IPv6(A, S3)L2
1777         # out: L2
1778
1779         # get IPv4 header of rx'ed packet
1780         rx_eth = rx_pkt.getlayer(Ether)
1781
1782         tx_ip = tx_pkt.getlayer(IPv6)
1783         # we can't just get the 2nd Ether layer
1784         # get the Raw content and dissect it as Ether
1785         tx_eth1 = Ether(str(tx_pkt[Raw]))
1786
1787         # verify if rx'ed packet has no SRH
1788         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1789
1790         # the whole rx_eth pkt should be equal to tx_eth1
1791         self.assertEqual(rx_eth, tx_eth1)
1792
1793         self.logger.debug("packet verification: SUCCESS")
1794
1795     def create_stream(self, src_if, dst_if, packet_header, packet_sizes,
1796                       count):
1797         """Create SRv6 input packet stream for defined interface.
1798
1799         :param VppInterface src_if: Interface to create packet stream for
1800         :param VppInterface dst_if: destination interface of packet stream
1801         :param packet_header: Layer3 scapy packet headers,
1802                 L2 is added when not provided,
1803                 Raw(payload) with packet_info is added
1804         :param list packet_sizes: packet stream pckt sizes,sequentially applied
1805                to packets in stream have
1806         :param int count: number of packets in packet stream
1807         :return: list of packets
1808         """
1809         self.logger.info("Creating packets")
1810         pkts = []
1811         for i in range(0, count-1):
1812             payload_info = self.create_packet_info(src_if, dst_if)
1813             self.logger.debug(
1814                 "Creating packet with index %d" % (payload_info.index))
1815             payload = self.info_to_payload(payload_info)
1816             # add L2 header if not yet provided in packet_header
1817             if packet_header.getlayer(0).name == 'Ethernet':
1818                 p = (packet_header /
1819                      Raw(payload))
1820             else:
1821                 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
1822                      packet_header /
1823                      Raw(payload))
1824             size = packet_sizes[i % len(packet_sizes)]
1825             self.logger.debug("Packet size %d" % (size))
1826             self.extend_packet(p, size)
1827             # we need to store the packet with the automatic fields computed
1828             # read back the dumped packet (with str())
1829             # to force computing these fields
1830             # probably other ways are possible
1831             p = Ether(str(p))
1832             payload_info.data = p.copy()
1833             self.logger.debug(ppp("Created packet:", p))
1834             pkts.append(p)
1835         self.logger.info("Done creating packets")
1836         return pkts
1837
1838     def send_and_verify_pkts(self, input, pkts, output, compare_func):
1839         """Send packets and verify received packets using compare_func
1840
1841         :param input: ingress interface of DUT
1842         :param pkts: list of packets to transmit
1843         :param output: egress interface of DUT
1844         :param compare_func: function to compare in and out packets
1845         """
1846         # add traffic stream to input interface
1847         input.add_stream(pkts)
1848
1849         # enable capture on all interfaces
1850         self.pg_enable_capture(self.pg_interfaces)
1851
1852         # start traffic
1853         self.logger.info("Starting traffic")
1854         self.pg_start()
1855
1856         # get output capture
1857         self.logger.info("Getting packet capture")
1858         capture = output.get_capture()
1859
1860         # assert nothing was captured on input interface
1861         input.assert_nothing_captured()
1862
1863         # verify captured packets
1864         self.verify_captured_pkts(output, capture, compare_func)
1865
1866     def create_packet_header_IPv6(self, dst):
1867         """Create packet header: IPv6 header, UDP header
1868
1869         :param dst: IPv6 destination address
1870
1871         IPv6 source address is 1234::1
1872         UDP source port and destination port are 1234
1873         """
1874
1875         p = (IPv6(src='1234::1', dst=dst) /
1876              UDP(sport=1234, dport=1234))
1877         return p
1878
1879     def create_packet_header_IPv6_SRH(self, sidlist, segleft):
1880         """Create packet header: IPv6 header with SRH, UDP header
1881
1882         :param list sidlist: segment list
1883         :param int segleft: segments-left field value
1884
1885         IPv6 destination address is set to sidlist[segleft]
1886         IPv6 source addresses are 1234::1 and 4321::1
1887         UDP source port and destination port are 1234
1888         """
1889
1890         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
1891              IPv6ExtHdrSegmentRouting(addresses=sidlist) /
1892              UDP(sport=1234, dport=1234))
1893         return p
1894
1895     def create_packet_header_IPv6_SRH_IPv6(self, dst, sidlist, segleft):
1896         """Create packet header: IPv6 encapsulated in SRv6:
1897         IPv6 header with SRH, IPv6 header, UDP header
1898
1899         :param ipv6address dst: inner IPv6 destination address
1900         :param list sidlist: segment list of outer IPv6 SRH
1901         :param int segleft: segments-left field of outer IPv6 SRH
1902
1903         Outer IPv6 destination address is set to sidlist[segleft]
1904         IPv6 source addresses are 1234::1 and 4321::1
1905         UDP source port and destination port are 1234
1906         """
1907
1908         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
1909              IPv6ExtHdrSegmentRouting(addresses=sidlist,
1910                                       segleft=segleft, nh=41) /
1911              IPv6(src='4321::1', dst=dst) /
1912              UDP(sport=1234, dport=1234))
1913         return p
1914
1915     def create_packet_header_IPv6_IPv6(self, dst_inner, dst_outer):
1916         """Create packet header: IPv6 encapsulated in IPv6:
1917         IPv6 header, IPv6 header, UDP header
1918
1919         :param ipv6address dst_inner: inner IPv6 destination address
1920         :param ipv6address dst_outer: outer IPv6 destination address
1921
1922         IPv6 source addresses are 1234::1 and 4321::1
1923         UDP source port and destination port are 1234
1924         """
1925
1926         p = (IPv6(src='1234::1', dst=dst_outer) /
1927              IPv6(src='4321::1', dst=dst_inner) /
1928              UDP(sport=1234, dport=1234))
1929         return p
1930
1931     def create_packet_header_IPv6_SRH_SRH_IPv6(self, dst, sidlist1, segleft1,
1932                                                sidlist2, segleft2):
1933         """Create packet header: IPv6 encapsulated in SRv6 with 2 SRH:
1934         IPv6 header with SRH, 2nd SRH, IPv6 header, UDP header
1935
1936         :param ipv6address dst: inner IPv6 destination address
1937         :param list sidlist1: segment list of outer IPv6 SRH
1938         :param int segleft1: segments-left field of outer IPv6 SRH
1939         :param list sidlist2: segment list of inner IPv6 SRH
1940         :param int segleft2: segments-left field of inner IPv6 SRH
1941
1942         Outer IPv6 destination address is set to sidlist[segleft]
1943         IPv6 source addresses are 1234::1 and 4321::1
1944         UDP source port and destination port are 1234
1945         """
1946
1947         p = (IPv6(src='1234::1', dst=sidlist1[segleft1]) /
1948              IPv6ExtHdrSegmentRouting(addresses=sidlist1,
1949                                       segleft=segleft1, nh=43) /
1950              IPv6ExtHdrSegmentRouting(addresses=sidlist2,
1951                                       segleft=segleft2, nh=41) /
1952              IPv6(src='4321::1', dst=dst) /
1953              UDP(sport=1234, dport=1234))
1954         return p
1955
1956     def create_packet_header_IPv4(self, dst):
1957         """Create packet header: IPv4 header, UDP header
1958
1959         :param dst: IPv4 destination address
1960
1961         IPv4 source address is 123.1.1.1
1962         UDP source port and destination port are 1234
1963         """
1964
1965         p = (IP(src='123.1.1.1', dst=dst) /
1966              UDP(sport=1234, dport=1234))
1967         return p
1968
1969     def create_packet_header_IPv6_IPv4(self, dst_inner, dst_outer):
1970         """Create packet header: IPv4 encapsulated in IPv6:
1971         IPv6 header, IPv4 header, UDP header
1972
1973         :param ipv4address dst_inner: inner IPv4 destination address
1974         :param ipv6address dst_outer: outer IPv6 destination address
1975
1976         IPv6 source address is 1234::1
1977         IPv4 source address is 123.1.1.1
1978         UDP source port and destination port are 1234
1979         """
1980
1981         p = (IPv6(src='1234::1', dst=dst_outer) /
1982              IP(src='123.1.1.1', dst=dst_inner) /
1983              UDP(sport=1234, dport=1234))
1984         return p
1985
1986     def create_packet_header_IPv6_SRH_IPv4(self, dst, sidlist, segleft):
1987         """Create packet header: IPv4 encapsulated in SRv6:
1988         IPv6 header with SRH, IPv4 header, UDP header
1989
1990         :param ipv4address dst: inner IPv4 destination address
1991         :param list sidlist: segment list of outer IPv6 SRH
1992         :param int segleft: segments-left field of outer IPv6 SRH
1993
1994         Outer IPv6 destination address is set to sidlist[segleft]
1995         IPv6 source address is 1234::1
1996         IPv4 source address is 123.1.1.1
1997         UDP source port and destination port are 1234
1998         """
1999
2000         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
2001              IPv6ExtHdrSegmentRouting(addresses=sidlist,
2002                                       segleft=segleft, nh=4) /
2003              IP(src='123.1.1.1', dst=dst) /
2004              UDP(sport=1234, dport=1234))
2005         return p
2006
2007     def create_packet_header_L2(self, vlan=0):
2008         """Create packet header: L2 header
2009
2010         :param vlan: if vlan!=0 then add 802.1q header
2011         """
2012         # Note: the dst addr ('00:55:44:33:22:11') is used in
2013         # the compare function compare_rx_tx_packet_T_Encaps_L2
2014         # to detect presence of L2 in SRH payload
2015         p = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
2016         etype = 0x8137  # IPX
2017         if vlan:
2018             # add 802.1q layer
2019             p /= Dot1Q(vlan=vlan, type=etype)
2020         else:
2021             p.type = etype
2022         return p
2023
2024     def create_packet_header_IPv6_SRH_L2(self, sidlist, segleft, vlan=0):
2025         """Create packet header: L2 encapsulated in SRv6:
2026         IPv6 header with SRH, L2
2027
2028         :param list sidlist: segment list of outer IPv6 SRH
2029         :param int segleft: segments-left field of outer IPv6 SRH
2030         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
2031
2032         Outer IPv6 destination address is set to sidlist[segleft]
2033         IPv6 source address is 1234::1
2034         """
2035         eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
2036         etype = 0x8137  # IPX
2037         if vlan:
2038             # add 802.1q layer
2039             eth /= Dot1Q(vlan=vlan, type=etype)
2040         else:
2041             eth.type = etype
2042
2043         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
2044              IPv6ExtHdrSegmentRouting(addresses=sidlist,
2045                                       segleft=segleft, nh=59) /
2046              eth)
2047         return p
2048
2049     def create_packet_header_IPv6_L2(self, dst_outer, vlan=0):
2050         """Create packet header: L2 encapsulated in IPv6:
2051         IPv6 header, L2
2052
2053         :param ipv6address dst_outer: outer IPv6 destination address
2054         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
2055         """
2056         eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
2057         etype = 0x8137  # IPX
2058         if vlan:
2059             # add 802.1q layer
2060             eth /= Dot1Q(vlan=vlan, type=etype)
2061         else:
2062             eth.type = etype
2063
2064         p = (IPv6(src='1234::1', dst=dst_outer, nh=59) / eth)
2065         return p
2066
2067     def get_payload_info(self, packet):
2068         """ Extract the payload_info from the packet
2069         """
2070         # in most cases, payload_info is in packet[Raw]
2071         # but packet[Raw] gives the complete payload
2072         # (incl L2 header) for the T.Encaps L2 case
2073         try:
2074             payload_info = self.payload_to_info(str(packet[Raw]))
2075
2076         except:
2077             # remote L2 header from packet[Raw]:
2078             # take packet[Raw], convert it to an Ether layer
2079             # and then extract Raw from it
2080             payload_info = self.payload_to_info(
2081                 str(Ether(str(packet[Raw]))[Raw]))
2082
2083         return payload_info
2084
2085     def verify_captured_pkts(self, dst_if, capture, compare_func):
2086         """
2087         Verify captured packet stream for specified interface.
2088         Compare ingress with egress packets using the specified compare fn
2089
2090         :param dst_if: egress interface of DUT
2091         :param capture: captured packets
2092         :param compare_func: function to compare in and out packet
2093         """
2094         self.logger.info("Verifying capture on interface %s using function %s"
2095                          % (dst_if.name, compare_func.func_name))
2096
2097         last_info = dict()
2098         for i in self.pg_interfaces:
2099             last_info[i.sw_if_index] = None
2100         dst_sw_if_index = dst_if.sw_if_index
2101
2102         for packet in capture:
2103             try:
2104                 # extract payload_info from packet's payload
2105                 payload_info = self.get_payload_info(packet)
2106                 packet_index = payload_info.index
2107
2108                 self.logger.debug("Verifying packet with index %d"
2109                                   % (packet_index))
2110                 # packet should have arrived on the expected interface
2111                 self.assertEqual(payload_info.dst, dst_sw_if_index)
2112                 self.logger.debug(
2113                     "Got packet on interface %s: src=%u (idx=%u)" %
2114                     (dst_if.name, payload_info.src, packet_index))
2115
2116                 # search for payload_info with same src and dst if_index
2117                 # this will give us the transmitted packet
2118                 next_info = self.get_next_packet_info_for_interface2(
2119                     payload_info.src, dst_sw_if_index,
2120                     last_info[payload_info.src])
2121                 last_info[payload_info.src] = next_info
2122                 # next_info should not be None
2123                 self.assertTrue(next_info is not None)
2124                 # index of tx and rx packets should be equal
2125                 self.assertEqual(packet_index, next_info.index)
2126                 # data field of next_info contains the tx packet
2127                 txed_packet = next_info.data
2128
2129                 self.logger.debug(ppp("Transmitted packet:",
2130                                       txed_packet))  # ppp=Pretty Print Packet
2131
2132                 self.logger.debug(ppp("Received packet:", packet))
2133
2134                 # compare rcvd packet with expected packet using compare_func
2135                 compare_func(txed_packet, packet)
2136
2137             except:
2138                 print packet.command()
2139                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2140                 raise
2141
2142         # have all expected packets arrived?
2143         for i in self.pg_interfaces:
2144             remaining_packet = self.get_next_packet_info_for_interface2(
2145                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
2146             self.assertTrue(remaining_packet is None,
2147                             "Interface %s: Packet expected from interface %s "
2148                             "didn't arrive" % (dst_if.name, i.name))
2149
2150
2151 if __name__ == '__main__':
2152     unittest.main(testRunner=VppTestRunner)