Scapy upgrade to 2.4.0.rc5
[vpp.git] / test / test_srv6.py
1 #!/usr/bin/env python
2
3 import unittest
4 import binascii
5 from socket import AF_INET6
6
7 from framework import VppTestCase, VppTestRunner
8 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
9 from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
10     SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
11
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether, Dot1Q
14 from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
15 from scapy.layers.inet import IP, UDP
16
17 from scapy.utils import inet_pton, inet_ntop
18
19 from util import ppp
20
21
22 class TestSRv6(VppTestCase):
23     """ SRv6 Test Case """
24
25     @classmethod
26     def setUpClass(self):
27         super(TestSRv6, self).setUpClass()
28
29     def setUp(self):
30         """ Perform test setup before each test case.
31         """
32         super(TestSRv6, self).setUp()
33
34         # packet sizes, inclusive L2 overhead
35         self.pg_packet_sizes = [64, 512, 1518, 9018]
36
37         # reset packet_infos
38         self.reset_packet_infos()
39
40     def tearDown(self):
41         """ Clean up test setup after each test case.
42         """
43         self.teardown_interfaces()
44
45         super(TestSRv6, self).tearDown()
46
47     def configure_interface(self,
48                             interface,
49                             ipv6=False, ipv4=False,
50                             ipv6_table_id=0, ipv4_table_id=0):
51         """ Configure interface.
52         :param ipv6: configure IPv6 on interface
53         :param ipv4: configure IPv4 on interface
54         :param ipv6_table_id: FIB table_id for IPv6
55         :param ipv4_table_id: FIB table_id for IPv4
56         """
57         self.logger.debug("Configuring interface %s" % (interface.name))
58         if ipv6:
59             self.logger.debug("Configuring IPv6")
60             interface.set_table_ip6(ipv6_table_id)
61             interface.config_ip6()
62             interface.resolve_ndp(timeout=5)
63         if ipv4:
64             self.logger.debug("Configuring IPv4")
65             interface.set_table_ip4(ipv4_table_id)
66             interface.config_ip4()
67             interface.resolve_arp()
68         interface.admin_up()
69
70     def setup_interfaces(self, ipv6=[], ipv4=[],
71                          ipv6_table_id=[], ipv4_table_id=[]):
72         """ Create and configure interfaces.
73
74         :param ipv6: list of interface IPv6 capabilities
75         :param ipv4: list of interface IPv4 capabilities
76         :param ipv6_table_id: list of intf IPv6 FIB table_ids
77         :param ipv4_table_id: list of intf IPv4 FIB table_ids
78         :returns: List of created interfaces.
79         """
80         # how many interfaces?
81         if len(ipv6):
82             count = len(ipv6)
83         else:
84             count = len(ipv4)
85         self.logger.debug("Creating and configuring %d interfaces" % (count))
86
87         # fill up ipv6 and ipv4 lists if needed
88         # not enabled (False) is the default
89         if len(ipv6) < count:
90             ipv6 += (count - len(ipv6)) * [False]
91         if len(ipv4) < count:
92             ipv4 += (count - len(ipv4)) * [False]
93
94         # fill up table_id lists if needed
95         # table_id 0 (global) is the default
96         if len(ipv6_table_id) < count:
97             ipv6_table_id += (count - len(ipv6_table_id)) * [0]
98         if len(ipv4_table_id) < count:
99             ipv4_table_id += (count - len(ipv4_table_id)) * [0]
100
101         # create 'count' pg interfaces
102         self.create_pg_interfaces(range(count))
103
104         # setup all interfaces
105         for i in range(count):
106             intf = self.pg_interfaces[i]
107             self.configure_interface(intf,
108                                      ipv6[i], ipv4[i],
109                                      ipv6_table_id[i], ipv4_table_id[i])
110
111         if any(ipv6):
112             self.logger.debug(self.vapi.cli("show ip6 neighbors"))
113         if any(ipv4):
114             self.logger.debug(self.vapi.cli("show ip arp"))
115         self.logger.debug(self.vapi.cli("show interface"))
116         self.logger.debug(self.vapi.cli("show hardware"))
117
118         return self.pg_interfaces
119
120     def teardown_interfaces(self):
121         """ Unconfigure and bring down interface.
122         """
123         self.logger.debug("Tearing down interfaces")
124         # tear down all interfaces
125         # AFAIK they cannot be deleted
126         for i in self.pg_interfaces:
127             self.logger.debug("Tear down interface %s" % (i.name))
128             i.admin_down()
129             i.unconfig()
130
131     @unittest.skipUnless(0, "PC to fix")
132     def test_SRv6_T_Encaps(self):
133         """ Test SRv6 Transit.Encaps behavior for IPv6.
134         """
135         # send traffic to one destination interface
136         # source and destination are IPv6 only
137         self.setup_interfaces(ipv6=[True, True])
138
139         # configure FIB entries
140         route = VppIpRoute(self, "a4::", 64,
141                            [VppRoutePath(self.pg1.remote_ip6,
142                                          self.pg1.sw_if_index,
143                                          proto=DpoProto.DPO_PROTO_IP6)],
144                            is_ip6=1)
145         route.add_vpp_config()
146
147         # configure encaps IPv6 source address
148         # needs to be done before SR Policy config
149         # TODO: API?
150         self.vapi.cli("set sr encaps source addr a3::")
151
152         bsid = 'a3::9999:1'
153         # configure SRv6 Policy
154         # Note: segment list order: first -> last
155         sr_policy = VppSRv6Policy(
156             self, bsid=bsid,
157             is_encap=1,
158             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
159             weight=1, fib_table=0,
160             segments=['a4::', 'a5::', 'a6::c7'],
161             source='a3::')
162         sr_policy.add_vpp_config()
163         self.sr_policy = sr_policy
164
165         # log the sr policies
166         self.logger.info(self.vapi.cli("show sr policies"))
167
168         # steer IPv6 traffic to a7::/64 into SRv6 Policy
169         # use the bsid of the above self.sr_policy
170         pol_steering = VppSRv6Steering(
171                         self,
172                         bsid=self.sr_policy.bsid,
173                         prefix="a7::", mask_width=64,
174                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
175                         sr_policy_index=0, table_id=0,
176                         sw_if_index=0)
177         pol_steering.add_vpp_config()
178
179         # log the sr steering policies
180         self.logger.info(self.vapi.cli("show sr steering policies"))
181
182         # create packets
183         count = len(self.pg_packet_sizes)
184         dst_inner = 'a7::1234'
185         pkts = []
186
187         # create IPv6 packets without SRH
188         packet_header = self.create_packet_header_IPv6(dst_inner)
189         # create traffic stream pg0->pg1
190         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
191                                        self.pg_packet_sizes, count))
192
193         # create IPv6 packets with SRH
194         # packets with segments-left 1, active segment a7::
195         packet_header = self.create_packet_header_IPv6_SRH(
196             sidlist=['a8::', 'a7::', 'a6::'],
197             segleft=1)
198         # create traffic stream pg0->pg1
199         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
200                                        self.pg_packet_sizes, count))
201
202         # create IPv6 packets with SRH and IPv6
203         # packets with segments-left 1, active segment a7::
204         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
205             dst_inner,
206             sidlist=['a8::', 'a7::', 'a6::'],
207             segleft=1)
208         # create traffic stream pg0->pg1
209         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
210                                        self.pg_packet_sizes, count))
211
212         # send packets and verify received packets
213         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
214                                   self.compare_rx_tx_packet_T_Encaps)
215
216         # log the localsid counters
217         self.logger.info(self.vapi.cli("show sr localsid"))
218
219         # remove SR steering
220         pol_steering.remove_vpp_config()
221         self.logger.info(self.vapi.cli("show sr steering policies"))
222
223         # remove SR Policies
224         self.sr_policy.remove_vpp_config()
225         self.logger.info(self.vapi.cli("show sr policies"))
226
227         # remove FIB entries
228         # done by tearDown
229
230         # cleanup interfaces
231         self.teardown_interfaces()
232
233     @unittest.skipUnless(0, "PC to fix")
234     def test_SRv6_T_Insert(self):
235         """ Test SRv6 Transit.Insert behavior (IPv6 only).
236         """
237         # send traffic to one destination interface
238         # source and destination are IPv6 only
239         self.setup_interfaces(ipv6=[True, True])
240
241         # configure FIB entries
242         route = VppIpRoute(self, "a4::", 64,
243                            [VppRoutePath(self.pg1.remote_ip6,
244                                          self.pg1.sw_if_index,
245                                          proto=DpoProto.DPO_PROTO_IP6)],
246                            is_ip6=1)
247         route.add_vpp_config()
248
249         # configure encaps IPv6 source address
250         # needs to be done before SR Policy config
251         # TODO: API?
252         self.vapi.cli("set sr encaps source addr a3::")
253
254         bsid = 'a3::9999:1'
255         # configure SRv6 Policy
256         # Note: segment list order: first -> last
257         sr_policy = VppSRv6Policy(
258             self, bsid=bsid,
259             is_encap=0,
260             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
261             weight=1, fib_table=0,
262             segments=['a4::', 'a5::', 'a6::c7'],
263             source='a3::')
264         sr_policy.add_vpp_config()
265         self.sr_policy = sr_policy
266
267         # log the sr policies
268         self.logger.info(self.vapi.cli("show sr policies"))
269
270         # steer IPv6 traffic to a7::/64 into SRv6 Policy
271         # use the bsid of the above self.sr_policy
272         pol_steering = VppSRv6Steering(
273                         self,
274                         bsid=self.sr_policy.bsid,
275                         prefix="a7::", mask_width=64,
276                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
277                         sr_policy_index=0, table_id=0,
278                         sw_if_index=0)
279         pol_steering.add_vpp_config()
280
281         # log the sr steering policies
282         self.logger.info(self.vapi.cli("show sr steering policies"))
283
284         # create packets
285         count = len(self.pg_packet_sizes)
286         dst_inner = 'a7::1234'
287         pkts = []
288
289         # create IPv6 packets without SRH
290         packet_header = self.create_packet_header_IPv6(dst_inner)
291         # create traffic stream pg0->pg1
292         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
293                                        self.pg_packet_sizes, count))
294
295         # create IPv6 packets with SRH
296         # packets with segments-left 1, active segment a7::
297         packet_header = self.create_packet_header_IPv6_SRH(
298             sidlist=['a8::', 'a7::', 'a6::'],
299             segleft=1)
300         # create traffic stream pg0->pg1
301         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
302                                        self.pg_packet_sizes, count))
303
304         # send packets and verify received packets
305         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
306                                   self.compare_rx_tx_packet_T_Insert)
307
308         # log the localsid counters
309         self.logger.info(self.vapi.cli("show sr localsid"))
310
311         # remove SR steering
312         pol_steering.remove_vpp_config()
313         self.logger.info(self.vapi.cli("show sr steering policies"))
314
315         # remove SR Policies
316         self.sr_policy.remove_vpp_config()
317         self.logger.info(self.vapi.cli("show sr policies"))
318
319         # remove FIB entries
320         # done by tearDown
321
322         # cleanup interfaces
323         self.teardown_interfaces()
324
325     @unittest.skipUnless(0, "PC to fix")
326     def test_SRv6_T_Encaps_IPv4(self):
327         """ Test SRv6 Transit.Encaps behavior for IPv4.
328         """
329         # send traffic to one destination interface
330         # source interface is IPv4 only
331         # destination interface is IPv6 only
332         self.setup_interfaces(ipv6=[False, True], ipv4=[True, False])
333
334         # configure FIB entries
335         route = VppIpRoute(self, "a4::", 64,
336                            [VppRoutePath(self.pg1.remote_ip6,
337                                          self.pg1.sw_if_index,
338                                          proto=DpoProto.DPO_PROTO_IP6)],
339                            is_ip6=1)
340         route.add_vpp_config()
341
342         # configure encaps IPv6 source address
343         # needs to be done before SR Policy config
344         # TODO: API?
345         self.vapi.cli("set sr encaps source addr a3::")
346
347         bsid = 'a3::9999:1'
348         # configure SRv6 Policy
349         # Note: segment list order: first -> last
350         sr_policy = VppSRv6Policy(
351             self, bsid=bsid,
352             is_encap=1,
353             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
354             weight=1, fib_table=0,
355             segments=['a4::', 'a5::', 'a6::c7'],
356             source='a3::')
357         sr_policy.add_vpp_config()
358         self.sr_policy = sr_policy
359
360         # log the sr policies
361         self.logger.info(self.vapi.cli("show sr policies"))
362
363         # steer IPv4 traffic to 7.1.1.0/24 into SRv6 Policy
364         # use the bsid of the above self.sr_policy
365         pol_steering = VppSRv6Steering(
366                         self,
367                         bsid=self.sr_policy.bsid,
368                         prefix="7.1.1.0", mask_width=24,
369                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV4,
370                         sr_policy_index=0, table_id=0,
371                         sw_if_index=0)
372         pol_steering.add_vpp_config()
373
374         # log the sr steering policies
375         self.logger.info(self.vapi.cli("show sr steering policies"))
376
377         # create packets
378         count = len(self.pg_packet_sizes)
379         dst_inner = '7.1.1.123'
380         pkts = []
381
382         # create IPv4 packets
383         packet_header = self.create_packet_header_IPv4(dst_inner)
384         # create traffic stream pg0->pg1
385         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
386                                        self.pg_packet_sizes, count))
387
388         # send packets and verify received packets
389         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
390                                   self.compare_rx_tx_packet_T_Encaps_IPv4)
391
392         # log the localsid counters
393         self.logger.info(self.vapi.cli("show sr localsid"))
394
395         # remove SR steering
396         pol_steering.remove_vpp_config()
397         self.logger.info(self.vapi.cli("show sr steering policies"))
398
399         # remove SR Policies
400         self.sr_policy.remove_vpp_config()
401         self.logger.info(self.vapi.cli("show sr policies"))
402
403         # remove FIB entries
404         # done by tearDown
405
406         # cleanup interfaces
407         self.teardown_interfaces()
408
409     @unittest.skip("VPP crashes after running this test")
410     def test_SRv6_T_Encaps_L2(self):
411         """ Test SRv6 Transit.Encaps behavior for L2.
412         """
413         # send traffic to one destination interface
414         # source interface is IPv4 only TODO?
415         # destination interface is IPv6 only
416         self.setup_interfaces(ipv6=[False, True], ipv4=[False, False])
417
418         # configure FIB entries
419         route = VppIpRoute(self, "a4::", 64,
420                            [VppRoutePath(self.pg1.remote_ip6,
421                                          self.pg1.sw_if_index,
422                                          proto=DpoProto.DPO_PROTO_IP6)],
423                            is_ip6=1)
424         route.add_vpp_config()
425
426         # configure encaps IPv6 source address
427         # needs to be done before SR Policy config
428         # TODO: API?
429         self.vapi.cli("set sr encaps source addr a3::")
430
431         bsid = 'a3::9999:1'
432         # configure SRv6 Policy
433         # Note: segment list order: first -> last
434         sr_policy = VppSRv6Policy(
435             self, bsid=bsid,
436             is_encap=1,
437             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
438             weight=1, fib_table=0,
439             segments=['a4::', 'a5::', 'a6::c7'],
440             source='a3::')
441         sr_policy.add_vpp_config()
442         self.sr_policy = sr_policy
443
444         # log the sr policies
445         self.logger.info(self.vapi.cli("show sr policies"))
446
447         # steer L2 traffic into SRv6 Policy
448         # use the bsid of the above self.sr_policy
449         pol_steering = VppSRv6Steering(
450                         self,
451                         bsid=self.sr_policy.bsid,
452                         prefix="::", mask_width=0,
453                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_L2,
454                         sr_policy_index=0, table_id=0,
455                         sw_if_index=self.pg0.sw_if_index)
456         pol_steering.add_vpp_config()
457
458         # log the sr steering policies
459         self.logger.info(self.vapi.cli("show sr steering policies"))
460
461         # create packets
462         count = len(self.pg_packet_sizes)
463         pkts = []
464
465         # create L2 packets without dot1q header
466         packet_header = self.create_packet_header_L2()
467         # create traffic stream pg0->pg1
468         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
469                                        self.pg_packet_sizes, count))
470
471         # create L2 packets with dot1q header
472         packet_header = self.create_packet_header_L2(vlan=123)
473         # create traffic stream pg0->pg1
474         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
475                                        self.pg_packet_sizes, count))
476
477         # send packets and verify received packets
478         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
479                                   self.compare_rx_tx_packet_T_Encaps_L2)
480
481         # log the localsid counters
482         self.logger.info(self.vapi.cli("show sr localsid"))
483
484         # remove SR steering
485         pol_steering.remove_vpp_config()
486         self.logger.info(self.vapi.cli("show sr steering policies"))
487
488         # remove SR Policies
489         self.sr_policy.remove_vpp_config()
490         self.logger.info(self.vapi.cli("show sr policies"))
491
492         # remove FIB entries
493         # done by tearDown
494
495         # cleanup interfaces
496         self.teardown_interfaces()
497
498     def test_SRv6_End(self):
499         """ Test SRv6 End (without PSP) behavior.
500         """
501         # send traffic to one destination interface
502         # source and destination interfaces are IPv6 only
503         self.setup_interfaces(ipv6=[True, True])
504
505         # configure FIB entries
506         route = VppIpRoute(self, "a4::", 64,
507                            [VppRoutePath(self.pg1.remote_ip6,
508                                          self.pg1.sw_if_index,
509                                          proto=DpoProto.DPO_PROTO_IP6)],
510                            is_ip6=1)
511         route.add_vpp_config()
512
513         # configure SRv6 localSID End without PSP behavior
514         localsid = VppSRv6LocalSID(
515                         self, localsid_addr='A3::0',
516                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
517                         nh_addr='::',
518                         end_psp=0,
519                         sw_if_index=0,
520                         vlan_index=0,
521                         fib_table=0)
522         localsid.add_vpp_config()
523         # log the localsids
524         self.logger.debug(self.vapi.cli("show sr localsid"))
525
526         # create IPv6 packets with SRH (SL=2, SL=1, SL=0)
527         # send one packet per SL value per packet size
528         # SL=0 packet with localSID End with USP needs 2nd SRH
529         count = len(self.pg_packet_sizes)
530         dst_inner = 'a4::1234'
531         pkts = []
532
533         # packets with segments-left 2, active segment a3::
534         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
535                 dst_inner,
536                 sidlist=['a5::', 'a4::', 'a3::'],
537                 segleft=2)
538         # create traffic stream pg0->pg1
539         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
540                                        self.pg_packet_sizes, count))
541
542         # packets with segments-left 1, active segment a3::
543         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
544                 dst_inner,
545                 sidlist=['a4::', 'a3::', 'a2::'],
546                 segleft=1)
547         # add to traffic stream pg0->pg1
548         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
549                                        self.pg_packet_sizes, count))
550
551         # TODO: test behavior with SL=0 packet (needs 2*SRH?)
552
553         # send packets and verify received packets
554         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
555                                   self.compare_rx_tx_packet_End)
556
557         # log the localsid counters
558         self.logger.info(self.vapi.cli("show sr localsid"))
559
560         # remove SRv6 localSIDs
561         localsid.remove_vpp_config()
562
563         # remove FIB entries
564         # done by tearDown
565
566         # cleanup interfaces
567         self.teardown_interfaces()
568
569     def test_SRv6_End_with_PSP(self):
570         """ Test SRv6 End with PSP behavior.
571         """
572         # send traffic to one destination interface
573         # source and destination interfaces are IPv6 only
574         self.setup_interfaces(ipv6=[True, True])
575
576         # configure FIB entries
577         route = VppIpRoute(self, "a4::", 64,
578                            [VppRoutePath(self.pg1.remote_ip6,
579                                          self.pg1.sw_if_index,
580                                          proto=DpoProto.DPO_PROTO_IP6)],
581                            is_ip6=1)
582         route.add_vpp_config()
583
584         # configure SRv6 localSID End with PSP behavior
585         localsid = VppSRv6LocalSID(
586                         self, localsid_addr='A3::0',
587                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
588                         nh_addr='::',
589                         end_psp=1,
590                         sw_if_index=0,
591                         vlan_index=0,
592                         fib_table=0)
593         localsid.add_vpp_config()
594         # log the localsids
595         self.logger.debug(self.vapi.cli("show sr localsid"))
596
597         # create IPv6 packets with SRH (SL=2, SL=1)
598         # send one packet per SL value per packet size
599         # SL=0 packet with localSID End with PSP is dropped
600         count = len(self.pg_packet_sizes)
601         dst_inner = 'a4::1234'
602         pkts = []
603
604         # packets with segments-left 2, active segment a3::
605         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
606                     dst_inner,
607                     sidlist=['a5::', 'a4::', 'a3::'],
608                     segleft=2)
609         # create traffic stream pg0->pg1
610         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
611                                        self.pg_packet_sizes, count))
612
613         # packets with segments-left 1, active segment a3::
614         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
615                     dst_inner,
616                     sidlist=['a4::', 'a3::', 'a2::'],
617                     segleft=1)
618         # add to traffic stream pg0->pg1
619         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
620                                        self.pg_packet_sizes, count))
621
622         # send packets and verify received packets
623         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
624                                   self.compare_rx_tx_packet_End_PSP)
625
626         # log the localsid counters
627         self.logger.info(self.vapi.cli("show sr localsid"))
628
629         # remove SRv6 localSIDs
630         localsid.remove_vpp_config()
631
632         # remove FIB entries
633         # done by tearDown
634
635         # cleanup interfaces
636         self.teardown_interfaces()
637
638     def test_SRv6_End_X(self):
639         """ Test SRv6 End.X (without PSP) behavior.
640         """
641         # create three interfaces (1 source, 2 destinations)
642         # source and destination interfaces are IPv6 only
643         self.setup_interfaces(ipv6=[True, True, True])
644
645         # configure FIB entries
646         # a4::/64 via pg1 and pg2
647         route = VppIpRoute(self, "a4::", 64,
648                            [VppRoutePath(self.pg1.remote_ip6,
649                                          self.pg1.sw_if_index,
650                                          proto=DpoProto.DPO_PROTO_IP6),
651                             VppRoutePath(self.pg2.remote_ip6,
652                                          self.pg2.sw_if_index,
653                                          proto=DpoProto.DPO_PROTO_IP6)],
654                            is_ip6=1)
655         route.add_vpp_config()
656         self.logger.debug(self.vapi.cli("show ip6 fib"))
657
658         # configure SRv6 localSID End.X without PSP behavior
659         # End.X points to interface pg1
660         localsid = VppSRv6LocalSID(
661                         self, localsid_addr='A3::C4',
662                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
663                         nh_addr=self.pg1.remote_ip6,
664                         end_psp=0,
665                         sw_if_index=self.pg1.sw_if_index,
666                         vlan_index=0,
667                         fib_table=0)
668         localsid.add_vpp_config()
669         # log the localsids
670         self.logger.debug(self.vapi.cli("show sr localsid"))
671
672         # create IPv6 packets with SRH (SL=2, SL=1)
673         # send one packet per SL value per packet size
674         # SL=0 packet with localSID End with PSP is dropped
675         count = len(self.pg_packet_sizes)
676         dst_inner = 'a4::1234'
677         pkts = []
678
679         # packets with segments-left 2, active segment a3::c4
680         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
681                     dst_inner,
682                     sidlist=['a5::', 'a4::', 'a3::c4'],
683                     segleft=2)
684         # create traffic stream pg0->pg1
685         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
686                                        self.pg_packet_sizes, count))
687
688         # packets with segments-left 1, active segment a3::c4
689         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
690                     dst_inner,
691                     sidlist=['a4::', 'a3::c4', 'a2::'],
692                     segleft=1)
693         # add to traffic stream pg0->pg1
694         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
695                                        self.pg_packet_sizes, count))
696
697         # send packets and verify received packets
698         # using same comparison function as End (no PSP)
699         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
700                                   self.compare_rx_tx_packet_End)
701
702         # assert nothing was received on the other interface (pg2)
703         self.pg2.assert_nothing_captured("mis-directed packet(s)")
704
705         # log the localsid counters
706         self.logger.info(self.vapi.cli("show sr localsid"))
707
708         # remove SRv6 localSIDs
709         localsid.remove_vpp_config()
710
711         # remove FIB entries
712         # done by tearDown
713
714         # cleanup interfaces
715         self.teardown_interfaces()
716
717     def test_SRv6_End_X_with_PSP(self):
718         """ Test SRv6 End.X with PSP behavior.
719         """
720         # create three interfaces (1 source, 2 destinations)
721         # source and destination interfaces are IPv6 only
722         self.setup_interfaces(ipv6=[True, True, True])
723
724         # configure FIB entries
725         # a4::/64 via pg1 and pg2
726         route = VppIpRoute(self, "a4::", 64,
727                            [VppRoutePath(self.pg1.remote_ip6,
728                                          self.pg1.sw_if_index,
729                                          proto=DpoProto.DPO_PROTO_IP6),
730                             VppRoutePath(self.pg2.remote_ip6,
731                                          self.pg2.sw_if_index,
732                                          proto=DpoProto.DPO_PROTO_IP6)],
733                            is_ip6=1)
734         route.add_vpp_config()
735
736         # configure SRv6 localSID End with PSP behavior
737         localsid = VppSRv6LocalSID(
738                         self, localsid_addr='A3::C4',
739                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
740                         nh_addr=self.pg1.remote_ip6,
741                         end_psp=1,
742                         sw_if_index=self.pg1.sw_if_index,
743                         vlan_index=0,
744                         fib_table=0)
745         localsid.add_vpp_config()
746         # log the localsids
747         self.logger.debug(self.vapi.cli("show sr localsid"))
748
749         # create IPv6 packets with SRH (SL=2, SL=1)
750         # send one packet per SL value per packet size
751         # SL=0 packet with localSID End with PSP is dropped
752         count = len(self.pg_packet_sizes)
753         dst_inner = 'a4::1234'
754         pkts = []
755
756         # packets with segments-left 2, active segment a3::
757         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
758                     dst_inner,
759                     sidlist=['a5::', 'a4::', 'a3::c4'],
760                     segleft=2)
761         # create traffic stream pg0->pg1
762         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
763                                        self.pg_packet_sizes, count))
764
765         # packets with segments-left 1, active segment a3::
766         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
767                     dst_inner,
768                     sidlist=['a4::', 'a3::c4', 'a2::'],
769                     segleft=1)
770         # add to traffic stream pg0->pg1
771         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
772                                        self.pg_packet_sizes, count))
773
774         # send packets and verify received packets
775         # using same comparison function as End with PSP
776         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
777                                   self.compare_rx_tx_packet_End_PSP)
778
779         # assert nothing was received on the other interface (pg2)
780         self.pg2.assert_nothing_captured("mis-directed packet(s)")
781
782         # log the localsid counters
783         self.logger.info(self.vapi.cli("show sr localsid"))
784
785         # remove SRv6 localSIDs
786         localsid.remove_vpp_config()
787
788         # remove FIB entries
789         # done by tearDown
790
791         # cleanup interfaces
792         self.teardown_interfaces()
793
794     def test_SRv6_End_DX6(self):
795         """ Test SRv6 End.DX6 behavior.
796         """
797         # send traffic to one destination interface
798         # source and destination interfaces are IPv6 only
799         self.setup_interfaces(ipv6=[True, True])
800
801         # configure SRv6 localSID End.DX6 behavior
802         localsid = VppSRv6LocalSID(
803                         self, localsid_addr='a3::c4',
804                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX6,
805                         nh_addr=self.pg1.remote_ip6,
806                         end_psp=0,
807                         sw_if_index=self.pg1.sw_if_index,
808                         vlan_index=0,
809                         fib_table=0)
810         localsid.add_vpp_config()
811         # log the localsids
812         self.logger.debug(self.vapi.cli("show sr localsid"))
813
814         # create IPv6 packets with SRH (SL=0)
815         # send one packet per packet size
816         count = len(self.pg_packet_sizes)
817         dst_inner = 'a4::1234'  # inner header destination address
818         pkts = []
819
820         # packets with SRH, segments-left 0, active segment a3::c4
821         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
822                         dst_inner,
823                         sidlist=['a3::c4', 'a2::', 'a1::'],
824                         segleft=0)
825         # add to traffic stream pg0->pg1
826         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
827                                        self.pg_packet_sizes, count))
828
829         # packets without SRH, IPv6 in IPv6
830         # outer IPv6 dest addr is the localsid End.DX6
831         packet_header = self.create_packet_header_IPv6_IPv6(
832                                             dst_inner,
833                                             dst_outer='a3::c4')
834         # add to traffic stream pg0->pg1
835         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
836                                        self.pg_packet_sizes, count))
837
838         # send packets and verify received packets
839         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
840                                   self.compare_rx_tx_packet_End_DX6)
841
842         # log the localsid counters
843         self.logger.info(self.vapi.cli("show sr localsid"))
844
845         # remove SRv6 localSIDs
846         localsid.remove_vpp_config()
847
848         # cleanup interfaces
849         self.teardown_interfaces()
850
851     def test_SRv6_End_DT6(self):
852         """ Test SRv6 End.DT6 behavior.
853         """
854         # create three interfaces (1 source, 2 destinations)
855         # all interfaces are IPv6 only
856         # source interface in global FIB (0)
857         # destination interfaces in global and vrf
858         vrf_1 = 1
859         self.setup_interfaces(ipv6=[True, True, True],
860                               ipv6_table_id=[0, 0, vrf_1])
861
862         # configure FIB entries
863         # a4::/64 is reachable
864         #     via pg1 in table 0 (global)
865         #     and via pg2 in table vrf_1
866         route0 = VppIpRoute(self, "a4::", 64,
867                             [VppRoutePath(self.pg1.remote_ip6,
868                                           self.pg1.sw_if_index,
869                                           proto=DpoProto.DPO_PROTO_IP6,
870                                           nh_table_id=0)],
871                             table_id=0,
872                             is_ip6=1)
873         route0.add_vpp_config()
874         route1 = VppIpRoute(self, "a4::", 64,
875                             [VppRoutePath(self.pg2.remote_ip6,
876                                           self.pg2.sw_if_index,
877                                           proto=DpoProto.DPO_PROTO_IP6,
878                                           nh_table_id=vrf_1)],
879                             table_id=vrf_1,
880                             is_ip6=1)
881         route1.add_vpp_config()
882         self.logger.debug(self.vapi.cli("show ip6 fib"))
883
884         # configure SRv6 localSID End.DT6 behavior
885         # Note:
886         # fib_table: where the localsid is installed
887         # sw_if_index: in T-variants of localsid this is the vrf table_id
888         localsid = VppSRv6LocalSID(
889                         self, localsid_addr='a3::c4',
890                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT6,
891                         nh_addr='::',
892                         end_psp=0,
893                         sw_if_index=vrf_1,
894                         vlan_index=0,
895                         fib_table=0)
896         localsid.add_vpp_config()
897         # log the localsids
898         self.logger.debug(self.vapi.cli("show sr localsid"))
899
900         # create IPv6 packets with SRH (SL=0)
901         # send one packet per packet size
902         count = len(self.pg_packet_sizes)
903         dst_inner = 'a4::1234'  # inner header destination address
904         pkts = []
905
906         # packets with SRH, segments-left 0, active segment a3::c4
907         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
908                         dst_inner,
909                         sidlist=['a3::c4', 'a2::', 'a1::'],
910                         segleft=0)
911         # add to traffic stream pg0->pg1
912         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
913                                        self.pg_packet_sizes, count))
914
915         # packets without SRH, IPv6 in IPv6
916         # outer IPv6 dest addr is the localsid End.DT6
917         packet_header = self.create_packet_header_IPv6_IPv6(
918                                             dst_inner,
919                                             dst_outer='a3::c4')
920         # add to traffic stream pg0->pg1
921         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
922                                        self.pg_packet_sizes, count))
923
924         # send packets and verify received packets
925         # using same comparison function as End.DX6
926         self.send_and_verify_pkts(self.pg0, pkts, self.pg2,
927                                   self.compare_rx_tx_packet_End_DX6)
928
929         # assert nothing was received on the other interface (pg2)
930         self.pg1.assert_nothing_captured("mis-directed packet(s)")
931
932         # log the localsid counters
933         self.logger.info(self.vapi.cli("show sr localsid"))
934
935         # remove SRv6 localSIDs
936         localsid.remove_vpp_config()
937
938         # remove FIB entries
939         # done by tearDown
940
941         # cleanup interfaces
942         self.teardown_interfaces()
943
944     def test_SRv6_End_DX4(self):
945         """ Test SRv6 End.DX4 behavior.
946         """
947         # send traffic to one destination interface
948         # source interface is IPv6 only
949         # destination interface is IPv4 only
950         self.setup_interfaces(ipv6=[True, False], ipv4=[False, True])
951
952         # configure SRv6 localSID End.DX4 behavior
953         localsid = VppSRv6LocalSID(
954                         self, localsid_addr='a3::c4',
955                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX4,
956                         nh_addr=self.pg1.remote_ip4,
957                         end_psp=0,
958                         sw_if_index=self.pg1.sw_if_index,
959                         vlan_index=0,
960                         fib_table=0)
961         localsid.add_vpp_config()
962         # log the localsids
963         self.logger.debug(self.vapi.cli("show sr localsid"))
964
965         # send one packet per packet size
966         count = len(self.pg_packet_sizes)
967         dst_inner = '4.1.1.123'  # inner header destination address
968         pkts = []
969
970         # packets with SRH, segments-left 0, active segment a3::c4
971         packet_header = self.create_packet_header_IPv6_SRH_IPv4(
972                         dst_inner,
973                         sidlist=['a3::c4', 'a2::', 'a1::'],
974                         segleft=0)
975         # add to traffic stream pg0->pg1
976         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
977                                        self.pg_packet_sizes, count))
978
979         # packets without SRH, IPv4 in IPv6
980         # outer IPv6 dest addr is the localsid End.DX4
981         packet_header = self.create_packet_header_IPv6_IPv4(
982                                             dst_inner,
983                                             dst_outer='a3::c4')
984         # add to traffic stream pg0->pg1
985         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
986                                        self.pg_packet_sizes, count))
987
988         # send packets and verify received packets
989         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
990                                   self.compare_rx_tx_packet_End_DX4)
991
992         # log the localsid counters
993         self.logger.info(self.vapi.cli("show sr localsid"))
994
995         # remove SRv6 localSIDs
996         localsid.remove_vpp_config()
997
998         # cleanup interfaces
999         self.teardown_interfaces()
1000
1001     def test_SRv6_End_DT4(self):
1002         """ Test SRv6 End.DT4 behavior.
1003         """
1004         # create three interfaces (1 source, 2 destinations)
1005         # source interface is IPv6-only
1006         # destination interfaces are IPv4 only
1007         # source interface in global FIB (0)
1008         # destination interfaces in global and vrf
1009         vrf_1 = 1
1010         self.setup_interfaces(ipv6=[True, False, False],
1011                               ipv4=[False, True, True],
1012                               ipv6_table_id=[0, 0, 0],
1013                               ipv4_table_id=[0, 0, vrf_1])
1014
1015         # configure FIB entries
1016         # 4.1.1.0/24 is reachable
1017         #     via pg1 in table 0 (global)
1018         #     and via pg2 in table vrf_1
1019         route0 = VppIpRoute(self, "4.1.1.0", 24,
1020                             [VppRoutePath(self.pg1.remote_ip4,
1021                                           self.pg1.sw_if_index,
1022                                           nh_table_id=0)],
1023                             table_id=0,
1024                             is_ip6=0)
1025         route0.add_vpp_config()
1026         route1 = VppIpRoute(self, "4.1.1.0", 24,
1027                             [VppRoutePath(self.pg2.remote_ip4,
1028                                           self.pg2.sw_if_index,
1029                                           nh_table_id=vrf_1)],
1030                             table_id=vrf_1,
1031                             is_ip6=0)
1032         route1.add_vpp_config()
1033         self.logger.debug(self.vapi.cli("show ip fib"))
1034
1035         # configure SRv6 localSID End.DT6 behavior
1036         # Note:
1037         # fib_table: where the localsid is installed
1038         # sw_if_index: in T-variants of localsid: vrf table_id
1039         localsid = VppSRv6LocalSID(
1040                         self, localsid_addr='a3::c4',
1041                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT4,
1042                         nh_addr='::',
1043                         end_psp=0,
1044                         sw_if_index=vrf_1,
1045                         vlan_index=0,
1046                         fib_table=0)
1047         localsid.add_vpp_config()
1048         # log the localsids
1049         self.logger.debug(self.vapi.cli("show sr localsid"))
1050
1051         # create IPv6 packets with SRH (SL=0)
1052         # send one packet per packet size
1053         count = len(self.pg_packet_sizes)
1054         dst_inner = '4.1.1.123'  # inner header destination address
1055         pkts = []
1056
1057         # packets with SRH, segments-left 0, active segment a3::c4
1058         packet_header = self.create_packet_header_IPv6_SRH_IPv4(
1059                         dst_inner,
1060                         sidlist=['a3::c4', 'a2::', 'a1::'],
1061                         segleft=0)
1062         # add to traffic stream pg0->pg1
1063         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
1064                                        self.pg_packet_sizes, count))
1065
1066         # packets without SRH, IPv6 in IPv6
1067         # outer IPv6 dest addr is the localsid End.DX4
1068         packet_header = self.create_packet_header_IPv6_IPv4(
1069                                             dst_inner,
1070                                             dst_outer='a3::c4')
1071         # add to traffic stream pg0->pg1
1072         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
1073                                        self.pg_packet_sizes, count))
1074
1075         # send packets and verify received packets
1076         # using same comparison function as End.DX4
1077         self.send_and_verify_pkts(self.pg0, pkts, self.pg2,
1078                                   self.compare_rx_tx_packet_End_DX4)
1079
1080         # assert nothing was received on the other interface (pg2)
1081         self.pg1.assert_nothing_captured("mis-directed packet(s)")
1082
1083         # log the localsid counters
1084         self.logger.info(self.vapi.cli("show sr localsid"))
1085
1086         # remove SRv6 localSIDs
1087         localsid.remove_vpp_config()
1088
1089         # remove FIB entries
1090         # done by tearDown
1091
1092         # cleanup interfaces
1093         self.teardown_interfaces()
1094
1095     def test_SRv6_End_DX2(self):
1096         """ Test SRv6 End.DX2 behavior.
1097         """
1098         # send traffic to one destination interface
1099         # source interface is IPv6 only
1100         self.setup_interfaces(ipv6=[True, False], ipv4=[False, False])
1101
1102         # configure SRv6 localSID End.DX2 behavior
1103         localsid = VppSRv6LocalSID(
1104                         self, localsid_addr='a3::c4',
1105                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX2,
1106                         nh_addr='::',
1107                         end_psp=0,
1108                         sw_if_index=self.pg1.sw_if_index,
1109                         vlan_index=0,
1110                         fib_table=0)
1111         localsid.add_vpp_config()
1112         # log the localsids
1113         self.logger.debug(self.vapi.cli("show sr localsid"))
1114
1115         # send one packet per packet size
1116         count = len(self.pg_packet_sizes)
1117         pkts = []
1118
1119         # packets with SRH, segments-left 0, active segment a3::c4
1120         # L2 has no dot1q header
1121         packet_header = self.create_packet_header_IPv6_SRH_L2(
1122                             sidlist=['a3::c4', 'a2::', 'a1::'],
1123                             segleft=0,
1124                             vlan=0)
1125         # add to traffic stream pg0->pg1
1126         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1127                                        self.pg_packet_sizes, count))
1128
1129         # packets with SRH, segments-left 0, active segment a3::c4
1130         # L2 has dot1q header
1131         packet_header = self.create_packet_header_IPv6_SRH_L2(
1132                             sidlist=['a3::c4', 'a2::', 'a1::'],
1133                             segleft=0,
1134                             vlan=123)
1135         # add to traffic stream pg0->pg1
1136         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1137                                        self.pg_packet_sizes, count))
1138
1139         # packets without SRH, L2 in IPv6
1140         # outer IPv6 dest addr is the localsid End.DX2
1141         # L2 has no dot1q header
1142         packet_header = self.create_packet_header_IPv6_L2(
1143                                             dst_outer='a3::c4',
1144                                             vlan=0)
1145         # add to traffic stream pg0->pg1
1146         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1147                                        self.pg_packet_sizes, count))
1148
1149         # packets without SRH, L2 in IPv6
1150         # outer IPv6 dest addr is the localsid End.DX2
1151         # L2 has dot1q header
1152         packet_header = self.create_packet_header_IPv6_L2(
1153                                             dst_outer='a3::c4',
1154                                             vlan=123)
1155         # add to traffic stream pg0->pg1
1156         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1157                                        self.pg_packet_sizes, count))
1158
1159         # send packets and verify received packets
1160         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
1161                                   self.compare_rx_tx_packet_End_DX2)
1162
1163         # log the localsid counters
1164         self.logger.info(self.vapi.cli("show sr localsid"))
1165
1166         # remove SRv6 localSIDs
1167         localsid.remove_vpp_config()
1168
1169         # cleanup interfaces
1170         self.teardown_interfaces()
1171
1172     @unittest.skipUnless(0, "PC to fix")
1173     def test_SRv6_T_Insert_Classifier(self):
1174         """ Test SRv6 Transit.Insert behavior (IPv6 only).
1175             steer packets using the classifier
1176         """
1177         # send traffic to one destination interface
1178         # source and destination are IPv6 only
1179         self.setup_interfaces(ipv6=[False, False, False, True, True])
1180
1181         # configure FIB entries
1182         route = VppIpRoute(self, "a4::", 64,
1183                            [VppRoutePath(self.pg4.remote_ip6,
1184                                          self.pg4.sw_if_index,
1185                                          proto=DpoProto.DPO_PROTO_IP6)],
1186                            is_ip6=1)
1187         route.add_vpp_config()
1188
1189         # configure encaps IPv6 source address
1190         # needs to be done before SR Policy config
1191         # TODO: API?
1192         self.vapi.cli("set sr encaps source addr a3::")
1193
1194         bsid = 'a3::9999:1'
1195         # configure SRv6 Policy
1196         # Note: segment list order: first -> last
1197         sr_policy = VppSRv6Policy(
1198             self, bsid=bsid,
1199             is_encap=0,
1200             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
1201             weight=1, fib_table=0,
1202             segments=['a4::', 'a5::', 'a6::c7'],
1203             source='a3::')
1204         sr_policy.add_vpp_config()
1205         self.sr_policy = sr_policy
1206
1207         # log the sr policies
1208         self.logger.info(self.vapi.cli("show sr policies"))
1209
1210         # add classify table
1211         # mask on dst ip address prefix a7::/8
1212         mask = '{:0<16}'.format('ff')
1213         r = self.vapi.classify_add_del_table(
1214             1,
1215             binascii.unhexlify(mask),
1216             match_n_vectors=(len(mask) - 1) // 32 + 1,
1217             current_data_flag=1,
1218             skip_n_vectors=2)  # data offset
1219         self.assertIsNotNone(r, msg='No response msg for add_del_table')
1220         table_index = r.new_table_index
1221
1222         # add the source routign node as a ip6 inacl netxt node
1223         r = self.vapi.add_node_next('ip6-inacl',
1224                                     'sr-pl-rewrite-insert')
1225         inacl_next_node_index = r.node_index
1226
1227         match = '{:0<16}'.format('a7')
1228         r = self.vapi.classify_add_del_session(
1229             1,
1230             table_index,
1231             binascii.unhexlify(match),
1232             hit_next_index=inacl_next_node_index,
1233             action=3,
1234             metadata=0)  # sr policy index
1235         self.assertIsNotNone(r, msg='No response msg for add_del_session')
1236
1237         # log the classify table used in the steering policy
1238         self.logger.info(self.vapi.cli("show classify table"))
1239
1240         r = self.vapi.input_acl_set_interface(
1241             is_add=1,
1242             sw_if_index=self.pg3.sw_if_index,
1243             ip6_table_index=table_index)
1244         self.assertIsNotNone(r,
1245                              msg='No response msg for input_acl_set_interface')
1246
1247         # log the ip6 inacl
1248         self.logger.info(self.vapi.cli("show inacl type ip6"))
1249
1250         # create packets
1251         count = len(self.pg_packet_sizes)
1252         dst_inner = 'a7::1234'
1253         pkts = []
1254
1255         # create IPv6 packets without SRH
1256         packet_header = self.create_packet_header_IPv6(dst_inner)
1257         # create traffic stream pg3->pg4
1258         pkts.extend(self.create_stream(self.pg3, self.pg4, packet_header,
1259                                        self.pg_packet_sizes, count))
1260
1261         # create IPv6 packets with SRH
1262         # packets with segments-left 1, active segment a7::
1263         packet_header = self.create_packet_header_IPv6_SRH(
1264             sidlist=['a8::', 'a7::', 'a6::'],
1265             segleft=1)
1266         # create traffic stream pg3->pg4
1267         pkts.extend(self.create_stream(self.pg3, self.pg4, packet_header,
1268                                        self.pg_packet_sizes, count))
1269
1270         # send packets and verify received packets
1271         self.send_and_verify_pkts(self.pg3, pkts, self.pg4,
1272                                   self.compare_rx_tx_packet_T_Insert)
1273
1274         # remove the interface l2 input feature
1275         r = self.vapi.input_acl_set_interface(
1276             is_add=0,
1277             sw_if_index=self.pg3.sw_if_index,
1278             ip6_table_index=table_index)
1279         self.assertIsNotNone(r,
1280                              msg='No response msg for input_acl_set_interface')
1281
1282         # log the ip6 inacl after cleaning
1283         self.logger.info(self.vapi.cli("show inacl type ip6"))
1284
1285         # log the localsid counters
1286         self.logger.info(self.vapi.cli("show sr localsid"))
1287
1288         # remove classifier SR steering
1289         # classifier_steering.remove_vpp_config()
1290         self.logger.info(self.vapi.cli("show sr steering policies"))
1291
1292         # remove SR Policies
1293         self.sr_policy.remove_vpp_config()
1294         self.logger.info(self.vapi.cli("show sr policies"))
1295
1296         # remove classify session and table
1297         r = self.vapi.classify_add_del_session(
1298             0,
1299             table_index,
1300             binascii.unhexlify(match))
1301         self.assertIsNotNone(r, msg='No response msg for add_del_session')
1302
1303         r = self.vapi.classify_add_del_table(
1304             0,
1305             binascii.unhexlify(mask),
1306             table_index=table_index)
1307         self.assertIsNotNone(r, msg='No response msg for add_del_table')
1308
1309         self.logger.info(self.vapi.cli("show classify table"))
1310
1311         # remove FIB entries
1312         # done by tearDown
1313
1314         # cleanup interfaces
1315         self.teardown_interfaces()
1316
1317     def compare_rx_tx_packet_T_Encaps(self, tx_pkt, rx_pkt):
1318         """ Compare input and output packet after passing T.Encaps
1319
1320         :param tx_pkt: transmitted packet
1321         :param rx_pkt: received packet
1322         """
1323         # T.Encaps updates the headers as follows:
1324         # SR Policy seglist (S3, S2, S1)
1325         # SR Policy source C
1326         # IPv6:
1327         # in: IPv6(A, B2)
1328         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(A, B2)
1329         # IPv6 + SRH:
1330         # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1)
1331         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(a, B2)SRH(B3, B2, B1; SL=1)
1332
1333         # get first (outer) IPv6 header of rx'ed packet
1334         rx_ip = rx_pkt.getlayer(IPv6)
1335         rx_srh = None
1336
1337         tx_ip = tx_pkt.getlayer(IPv6)
1338
1339         # expected segment-list
1340         seglist = self.sr_policy.segments
1341         # reverse list to get order as in SRH
1342         tx_seglist = seglist[::-1]
1343
1344         # get source address of SR Policy
1345         sr_policy_source = self.sr_policy.source
1346
1347         # rx'ed packet should have SRH
1348         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1349         # get SRH
1350         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1351
1352         # received ip.src should be equal to SR Policy source
1353         self.assertEqual(rx_ip.src, sr_policy_source)
1354         # received ip.dst should be equal to expected sidlist[lastentry]
1355         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1356         # rx'ed seglist should be equal to expected seglist
1357         self.assertEqual(rx_srh.addresses, tx_seglist)
1358         # segleft should be equal to size expected seglist-1
1359         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1360         # segleft should be equal to lastentry
1361         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1362
1363         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1364         # except for the hop-limit field
1365         #   -> update tx'ed hlim to the expected hlim
1366         tx_ip.hlim = tx_ip.hlim - 1
1367
1368         self.assertEqual(rx_srh.payload, tx_ip)
1369
1370         self.logger.debug("packet verification: SUCCESS")
1371
1372     def compare_rx_tx_packet_T_Encaps_IPv4(self, tx_pkt, rx_pkt):
1373         """ Compare input and output packet after passing T.Encaps for IPv4
1374
1375         :param tx_pkt: transmitted packet
1376         :param rx_pkt: received packet
1377         """
1378         # T.Encaps for IPv4 updates the headers as follows:
1379         # SR Policy seglist (S3, S2, S1)
1380         # SR Policy source C
1381         # IPv4:
1382         # in: IPv4(A, B2)
1383         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv4(A, B2)
1384
1385         # get first (outer) IPv6 header of rx'ed packet
1386         rx_ip = rx_pkt.getlayer(IPv6)
1387         rx_srh = None
1388
1389         tx_ip = tx_pkt.getlayer(IP)
1390
1391         # expected segment-list
1392         seglist = self.sr_policy.segments
1393         # reverse list to get order as in SRH
1394         tx_seglist = seglist[::-1]
1395
1396         # get source address of SR Policy
1397         sr_policy_source = self.sr_policy.source
1398
1399         # checks common to cases tx with and without SRH
1400         # rx'ed packet should have SRH and IPv4 header
1401         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1402         self.assertTrue(rx_ip.payload.haslayer(IP))
1403         # get SRH
1404         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1405
1406         # received ip.src should be equal to SR Policy source
1407         self.assertEqual(rx_ip.src, sr_policy_source)
1408         # received ip.dst should be equal to sidlist[lastentry]
1409         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1410         # rx'ed seglist should be equal to seglist
1411         self.assertEqual(rx_srh.addresses, tx_seglist)
1412         # segleft should be equal to size seglist-1
1413         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1414         # segleft should be equal to lastentry
1415         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1416
1417         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1418         # except for the ttl field and ip checksum
1419         #   -> adjust tx'ed ttl to expected ttl
1420         tx_ip.ttl = tx_ip.ttl - 1
1421         #   -> set tx'ed ip checksum to None and let scapy recompute
1422         tx_ip.chksum = None
1423         # read back the pkt (with str()) to force computing these fields
1424         # probably other ways to accomplish this are possible
1425         tx_ip = IP(str(tx_ip))
1426
1427         self.assertEqual(rx_srh.payload, tx_ip)
1428
1429         self.logger.debug("packet verification: SUCCESS")
1430
1431     def compare_rx_tx_packet_T_Encaps_L2(self, tx_pkt, rx_pkt):
1432         """ Compare input and output packet after passing T.Encaps for L2
1433
1434         :param tx_pkt: transmitted packet
1435         :param rx_pkt: received packet
1436         """
1437         # T.Encaps for L2 updates the headers as follows:
1438         # SR Policy seglist (S3, S2, S1)
1439         # SR Policy source C
1440         # L2:
1441         # in: L2
1442         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)L2
1443
1444         # get first (outer) IPv6 header of rx'ed packet
1445         rx_ip = rx_pkt.getlayer(IPv6)
1446         rx_srh = None
1447
1448         tx_ether = tx_pkt.getlayer(Ether)
1449
1450         # expected segment-list
1451         seglist = self.sr_policy.segments
1452         # reverse list to get order as in SRH
1453         tx_seglist = seglist[::-1]
1454
1455         # get source address of SR Policy
1456         sr_policy_source = self.sr_policy.source
1457
1458         # rx'ed packet should have SRH
1459         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1460         # get SRH
1461         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1462
1463         # received ip.src should be equal to SR Policy source
1464         self.assertEqual(rx_ip.src, sr_policy_source)
1465         # received ip.dst should be equal to sidlist[lastentry]
1466         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1467         # rx'ed seglist should be equal to seglist
1468         self.assertEqual(rx_srh.addresses, tx_seglist)
1469         # segleft should be equal to size seglist-1
1470         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1471         # segleft should be equal to lastentry
1472         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1473         # nh should be "No Next Header" (59)
1474         self.assertEqual(rx_srh.nh, 59)
1475
1476         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1477         self.assertEqual(Ether(str(rx_srh.payload)), tx_ether)
1478
1479         self.logger.debug("packet verification: SUCCESS")
1480
1481     def compare_rx_tx_packet_T_Insert(self, tx_pkt, rx_pkt):
1482         """ Compare input and output packet after passing T.Insert
1483
1484         :param tx_pkt: transmitted packet
1485         :param rx_pkt: received packet
1486         """
1487         # T.Insert updates the headers as follows:
1488         # IPv6:
1489         # in: IPv6(A, B2)
1490         # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)
1491         # IPv6 + SRH:
1492         # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1)
1493         # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)SRH(B3, B2, B1; SL=1)
1494
1495         # get first (outer) IPv6 header of rx'ed packet
1496         rx_ip = rx_pkt.getlayer(IPv6)
1497         rx_srh = None
1498         rx_ip2 = None
1499         rx_srh2 = None
1500         rx_ip3 = None
1501         rx_udp = rx_pkt[UDP]
1502
1503         tx_ip = tx_pkt.getlayer(IPv6)
1504         tx_srh = None
1505         tx_ip2 = None
1506         # some packets have been tx'ed with an SRH, some without it
1507         # get SRH if tx'ed packet has it
1508         if tx_pkt.haslayer(IPv6ExtHdrSegmentRouting):
1509             tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1510             tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1511         tx_udp = tx_pkt[UDP]
1512
1513         # expected segment-list (make copy of SR Policy segment list)
1514         seglist = self.sr_policy.segments[:]
1515         # expected seglist has initial dest addr as last segment
1516         seglist.append(tx_ip.dst)
1517         # reverse list to get order as in SRH
1518         tx_seglist = seglist[::-1]
1519
1520         # get source address of SR Policy
1521         sr_policy_source = self.sr_policy.source
1522
1523         # checks common to cases tx with and without SRH
1524         # rx'ed packet should have SRH and only one IPv6 header
1525         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1526         self.assertFalse(rx_ip.payload.haslayer(IPv6))
1527         # get SRH
1528         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1529
1530         # rx'ed ip.src should be equal to tx'ed ip.src
1531         self.assertEqual(rx_ip.src, tx_ip.src)
1532         # rx'ed ip.dst should be equal to sidlist[lastentry]
1533         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1534
1535         # rx'ed seglist should be equal to expected seglist
1536         self.assertEqual(rx_srh.addresses, tx_seglist)
1537         # segleft should be equal to size(expected seglist)-1
1538         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1539         # segleft should be equal to lastentry
1540         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1541
1542         if tx_srh:  # packet was tx'ed with SRH
1543             # packet should have 2nd SRH
1544             self.assertTrue(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
1545             # get 2nd SRH
1546             rx_srh2 = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting, 2)
1547
1548             # rx'ed srh2.addresses should be equal to tx'ed srh.addresses
1549             self.assertEqual(rx_srh2.addresses, tx_srh.addresses)
1550             # rx'ed srh2.segleft should be equal to tx'ed srh.segleft
1551             self.assertEqual(rx_srh2.segleft, tx_srh.segleft)
1552             # rx'ed srh2.lastentry should be equal to tx'ed srh.lastentry
1553             self.assertEqual(rx_srh2.lastentry, tx_srh.lastentry)
1554
1555         else:  # packet was tx'ed without SRH
1556             # rx packet should have no other SRH
1557             self.assertFalse(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
1558
1559         # UDP layer should be unchanged
1560         self.assertEqual(rx_udp, tx_udp)
1561
1562         self.logger.debug("packet verification: SUCCESS")
1563
1564     def compare_rx_tx_packet_End(self, tx_pkt, rx_pkt):
1565         """ Compare input and output packet after passing End (without PSP)
1566
1567         :param tx_pkt: transmitted packet
1568         :param rx_pkt: received packet
1569         """
1570         # End (no PSP) updates the headers as follows:
1571         # IPv6 + SRH:
1572         # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2)
1573         # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1574
1575         # get first (outer) IPv6 header of rx'ed packet
1576         rx_ip = rx_pkt.getlayer(IPv6)
1577         rx_srh = None
1578         rx_ip2 = None
1579         rx_udp = rx_pkt[UDP]
1580
1581         tx_ip = tx_pkt.getlayer(IPv6)
1582         # we know the packet has been tx'ed
1583         # with an inner IPv6 header and an SRH
1584         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1585         tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1586         tx_udp = tx_pkt[UDP]
1587
1588         # common checks, regardless of tx segleft value
1589         # rx'ed packet should have 2nd IPv6 header
1590         self.assertTrue(rx_ip.payload.haslayer(IPv6))
1591         # get second (inner) IPv6 header
1592         rx_ip2 = rx_pkt.getlayer(IPv6, 2)
1593
1594         if tx_ip.segleft > 0:
1595             # SRH should NOT have been popped:
1596             #   End SID without PSP does not pop SRH if segleft>0
1597             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1598             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1599
1600             # received ip.src should be equal to expected ip.src
1601             self.assertEqual(rx_ip.src, tx_ip.src)
1602             # sidlist should be unchanged
1603             self.assertEqual(rx_srh.addresses, tx_srh.addresses)
1604             # segleft should have been decremented
1605             self.assertEqual(rx_srh.segleft, tx_srh.segleft-1)
1606             # received ip.dst should be equal to sidlist[segleft]
1607             self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
1608             # lastentry should be unchanged
1609             self.assertEqual(rx_srh.lastentry, tx_srh.lastentry)
1610             # inner IPv6 packet (ip2) should be unchanged
1611             self.assertEqual(rx_ip2.src, tx_ip2.src)
1612             self.assertEqual(rx_ip2.dst, tx_ip2.dst)
1613         # else:  # tx_ip.segleft == 0
1614             # TODO: Does this work with 2 SRHs in ingress packet?
1615
1616         # UDP layer should be unchanged
1617         self.assertEqual(rx_udp, tx_udp)
1618
1619         self.logger.debug("packet verification: SUCCESS")
1620
1621     def compare_rx_tx_packet_End_PSP(self, tx_pkt, rx_pkt):
1622         """ Compare input and output packet after passing End with PSP
1623
1624         :param tx_pkt: transmitted packet
1625         :param rx_pkt: received packet
1626         """
1627         # End (PSP) updates the headers as follows:
1628         # IPv6 + SRH (SL>1):
1629         # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2)
1630         # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1631         # IPv6 + SRH (SL=1):
1632         # in: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1633         # out: IPv6(A, S3)
1634
1635         # get first (outer) IPv6 header of rx'ed packet
1636         rx_ip = rx_pkt.getlayer(IPv6)
1637         rx_srh = None
1638         rx_ip2 = None
1639         rx_udp = rx_pkt[UDP]
1640
1641         tx_ip = tx_pkt.getlayer(IPv6)
1642         # we know the packet has been tx'ed
1643         # with an inner IPv6 header and an SRH
1644         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1645         tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1646         tx_udp = tx_pkt[UDP]
1647
1648         # common checks, regardless of tx segleft value
1649         self.assertTrue(rx_ip.payload.haslayer(IPv6))
1650         rx_ip2 = rx_pkt.getlayer(IPv6, 2)
1651         # inner IPv6 packet (ip2) should be unchanged
1652         self.assertEqual(rx_ip2.src, tx_ip2.src)
1653         self.assertEqual(rx_ip2.dst, tx_ip2.dst)
1654
1655         if tx_ip.segleft > 1:
1656             # SRH should NOT have been popped:
1657             #   End SID with PSP does not pop SRH if segleft>1
1658             # rx'ed packet should have SRH
1659             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1660             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1661
1662             # received ip.src should be equal to expected ip.src
1663             self.assertEqual(rx_ip.src, tx_ip.src)
1664             # sidlist should be unchanged
1665             self.assertEqual(rx_srh.addresses, tx_srh.addresses)
1666             # segleft should have been decremented
1667             self.assertEqual(rx_srh.segleft, tx_srh.segleft-1)
1668             # received ip.dst should be equal to sidlist[segleft]
1669             self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
1670             # lastentry should be unchanged
1671             self.assertEqual(rx_srh.lastentry, tx_srh.lastentry)
1672
1673         else:  # tx_ip.segleft <= 1
1674             # SRH should have been popped:
1675             #   End SID with PSP and segleft=1 pops SRH
1676             # the two IPv6 headers are still present
1677             # outer IPv6 header has DA == last segment of popped SRH
1678             # SRH should not be present
1679             self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1680             # outer IPv6 header ip.src should be equal to tx'ed ip.src
1681             self.assertEqual(rx_ip.src, tx_ip.src)
1682             # outer IPv6 header ip.dst should be = to tx'ed sidlist[segleft-1]
1683             self.assertEqual(rx_ip.dst, tx_srh.addresses[tx_srh.segleft-1])
1684
1685         # UDP layer should be unchanged
1686         self.assertEqual(rx_udp, tx_udp)
1687
1688         self.logger.debug("packet verification: SUCCESS")
1689
1690     def compare_rx_tx_packet_End_DX6(self, tx_pkt, rx_pkt):
1691         """ Compare input and output packet after passing End.DX6
1692
1693         :param tx_pkt: transmitted packet
1694         :param rx_pkt: received packet
1695         """
1696         # End.DX6 updates the headers as follows:
1697         # IPv6 + SRH (SL=0):
1698         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv6(B, D)
1699         # out: IPv6(B, D)
1700         # IPv6:
1701         # in: IPv6(A, S3)IPv6(B, D)
1702         # out: IPv6(B, D)
1703
1704         # get first (outer) IPv6 header of rx'ed packet
1705         rx_ip = rx_pkt.getlayer(IPv6)
1706
1707         tx_ip = tx_pkt.getlayer(IPv6)
1708         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1709
1710         # verify if rx'ed packet has no SRH
1711         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1712
1713         # the whole rx_ip pkt should be equal to tx_ip2
1714         # except for the hlim field
1715         #   -> adjust tx'ed hlim to expected hlim
1716         tx_ip2.hlim = tx_ip2.hlim - 1
1717
1718         self.assertEqual(rx_ip, tx_ip2)
1719
1720         self.logger.debug("packet verification: SUCCESS")
1721
1722     def compare_rx_tx_packet_End_DX4(self, tx_pkt, rx_pkt):
1723         """ Compare input and output packet after passing End.DX4
1724
1725         :param tx_pkt: transmitted packet
1726         :param rx_pkt: received packet
1727         """
1728         # End.DX4 updates the headers as follows:
1729         # IPv6 + SRH (SL=0):
1730         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv4(B, D)
1731         # out: IPv4(B, D)
1732         # IPv6:
1733         # in: IPv6(A, S3)IPv4(B, D)
1734         # out: IPv4(B, D)
1735
1736         # get IPv4 header of rx'ed packet
1737         rx_ip = rx_pkt.getlayer(IP)
1738
1739         tx_ip = tx_pkt.getlayer(IPv6)
1740         tx_ip2 = tx_pkt.getlayer(IP)
1741
1742         # verify if rx'ed packet has no SRH
1743         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1744
1745         # the whole rx_ip pkt should be equal to tx_ip2
1746         # except for the ttl field and ip checksum
1747         #   -> adjust tx'ed ttl to expected ttl
1748         tx_ip2.ttl = tx_ip2.ttl - 1
1749         #   -> set tx'ed ip checksum to None and let scapy recompute
1750         tx_ip2.chksum = None
1751         # read back the pkt (with str()) to force computing these fields
1752         # probably other ways to accomplish this are possible
1753         tx_ip2 = IP(str(tx_ip2))
1754
1755         self.assertEqual(rx_ip, tx_ip2)
1756
1757         self.logger.debug("packet verification: SUCCESS")
1758
1759     def compare_rx_tx_packet_End_DX2(self, tx_pkt, rx_pkt):
1760         """ Compare input and output packet after passing End.DX2
1761
1762         :param tx_pkt: transmitted packet
1763         :param rx_pkt: received packet
1764         """
1765         # End.DX2 updates the headers as follows:
1766         # IPv6 + SRH (SL=0):
1767         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)L2
1768         # out: L2
1769         # IPv6:
1770         # in: IPv6(A, S3)L2
1771         # out: L2
1772
1773         # get IPv4 header of rx'ed packet
1774         rx_eth = rx_pkt.getlayer(Ether)
1775
1776         tx_ip = tx_pkt.getlayer(IPv6)
1777         # we can't just get the 2nd Ether layer
1778         # get the Raw content and dissect it as Ether
1779         tx_eth1 = Ether(str(tx_pkt[Raw]))
1780
1781         # verify if rx'ed packet has no SRH
1782         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1783
1784         # the whole rx_eth pkt should be equal to tx_eth1
1785         self.assertEqual(rx_eth, tx_eth1)
1786
1787         self.logger.debug("packet verification: SUCCESS")
1788
1789     def create_stream(self, src_if, dst_if, packet_header, packet_sizes,
1790                       count):
1791         """Create SRv6 input packet stream for defined interface.
1792
1793         :param VppInterface src_if: Interface to create packet stream for
1794         :param VppInterface dst_if: destination interface of packet stream
1795         :param packet_header: Layer3 scapy packet headers,
1796                 L2 is added when not provided,
1797                 Raw(payload) with packet_info is added
1798         :param list packet_sizes: packet stream pckt sizes,sequentially applied
1799                to packets in stream have
1800         :param int count: number of packets in packet stream
1801         :return: list of packets
1802         """
1803         self.logger.info("Creating packets")
1804         pkts = []
1805         for i in range(0, count-1):
1806             payload_info = self.create_packet_info(src_if, dst_if)
1807             self.logger.debug(
1808                 "Creating packet with index %d" % (payload_info.index))
1809             payload = self.info_to_payload(payload_info)
1810             # add L2 header if not yet provided in packet_header
1811             if packet_header.getlayer(0).name == 'Ethernet':
1812                 p = (packet_header /
1813                      Raw(payload))
1814             else:
1815                 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
1816                      packet_header /
1817                      Raw(payload))
1818             size = packet_sizes[i % len(packet_sizes)]
1819             self.logger.debug("Packet size %d" % (size))
1820             self.extend_packet(p, size)
1821             # we need to store the packet with the automatic fields computed
1822             # read back the dumped packet (with str())
1823             # to force computing these fields
1824             # probably other ways are possible
1825             p = Ether(str(p))
1826             payload_info.data = p.copy()
1827             self.logger.debug(ppp("Created packet:", p))
1828             pkts.append(p)
1829         self.logger.info("Done creating packets")
1830         return pkts
1831
1832     def send_and_verify_pkts(self, input, pkts, output, compare_func):
1833         """Send packets and verify received packets using compare_func
1834
1835         :param input: ingress interface of DUT
1836         :param pkts: list of packets to transmit
1837         :param output: egress interface of DUT
1838         :param compare_func: function to compare in and out packets
1839         """
1840         # add traffic stream to input interface
1841         input.add_stream(pkts)
1842
1843         # enable capture on all interfaces
1844         self.pg_enable_capture(self.pg_interfaces)
1845
1846         # start traffic
1847         self.logger.info("Starting traffic")
1848         self.pg_start()
1849
1850         # get output capture
1851         self.logger.info("Getting packet capture")
1852         capture = output.get_capture()
1853
1854         # assert nothing was captured on input interface
1855         input.assert_nothing_captured()
1856
1857         # verify captured packets
1858         self.verify_captured_pkts(output, capture, compare_func)
1859
1860     def create_packet_header_IPv6(self, dst):
1861         """Create packet header: IPv6 header, UDP header
1862
1863         :param dst: IPv6 destination address
1864
1865         IPv6 source address is 1234::1
1866         UDP source port and destination port are 1234
1867         """
1868
1869         p = (IPv6(src='1234::1', dst=dst) /
1870              UDP(sport=1234, dport=1234))
1871         return p
1872
1873     def create_packet_header_IPv6_SRH(self, sidlist, segleft):
1874         """Create packet header: IPv6 header with SRH, UDP header
1875
1876         :param list sidlist: segment list
1877         :param int segleft: segments-left field value
1878
1879         IPv6 destination address is set to sidlist[segleft]
1880         IPv6 source addresses are 1234::1 and 4321::1
1881         UDP source port and destination port are 1234
1882         """
1883
1884         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
1885              IPv6ExtHdrSegmentRouting(addresses=sidlist) /
1886              UDP(sport=1234, dport=1234))
1887         return p
1888
1889     def create_packet_header_IPv6_SRH_IPv6(self, dst, sidlist, segleft):
1890         """Create packet header: IPv6 encapsulated in SRv6:
1891         IPv6 header with SRH, IPv6 header, UDP header
1892
1893         :param ipv6address dst: inner IPv6 destination address
1894         :param list sidlist: segment list of outer IPv6 SRH
1895         :param int segleft: segments-left field of outer IPv6 SRH
1896
1897         Outer IPv6 destination address is set to sidlist[segleft]
1898         IPv6 source addresses are 1234::1 and 4321::1
1899         UDP source port and destination port are 1234
1900         """
1901
1902         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
1903              IPv6ExtHdrSegmentRouting(addresses=sidlist,
1904                                       segleft=segleft, nh=41) /
1905              IPv6(src='4321::1', dst=dst) /
1906              UDP(sport=1234, dport=1234))
1907         return p
1908
1909     def create_packet_header_IPv6_IPv6(self, dst_inner, dst_outer):
1910         """Create packet header: IPv6 encapsulated in IPv6:
1911         IPv6 header, IPv6 header, UDP header
1912
1913         :param ipv6address dst_inner: inner IPv6 destination address
1914         :param ipv6address dst_outer: outer IPv6 destination address
1915
1916         IPv6 source addresses are 1234::1 and 4321::1
1917         UDP source port and destination port are 1234
1918         """
1919
1920         p = (IPv6(src='1234::1', dst=dst_outer) /
1921              IPv6(src='4321::1', dst=dst_inner) /
1922              UDP(sport=1234, dport=1234))
1923         return p
1924
1925     def create_packet_header_IPv6_SRH_SRH_IPv6(self, dst, sidlist1, segleft1,
1926                                                sidlist2, segleft2):
1927         """Create packet header: IPv6 encapsulated in SRv6 with 2 SRH:
1928         IPv6 header with SRH, 2nd SRH, IPv6 header, UDP header
1929
1930         :param ipv6address dst: inner IPv6 destination address
1931         :param list sidlist1: segment list of outer IPv6 SRH
1932         :param int segleft1: segments-left field of outer IPv6 SRH
1933         :param list sidlist2: segment list of inner IPv6 SRH
1934         :param int segleft2: segments-left field of inner IPv6 SRH
1935
1936         Outer IPv6 destination address is set to sidlist[segleft]
1937         IPv6 source addresses are 1234::1 and 4321::1
1938         UDP source port and destination port are 1234
1939         """
1940
1941         p = (IPv6(src='1234::1', dst=sidlist1[segleft1]) /
1942              IPv6ExtHdrSegmentRouting(addresses=sidlist1,
1943                                       segleft=segleft1, nh=43) /
1944              IPv6ExtHdrSegmentRouting(addresses=sidlist2,
1945                                       segleft=segleft2, nh=41) /
1946              IPv6(src='4321::1', dst=dst) /
1947              UDP(sport=1234, dport=1234))
1948         return p
1949
1950     def create_packet_header_IPv4(self, dst):
1951         """Create packet header: IPv4 header, UDP header
1952
1953         :param dst: IPv4 destination address
1954
1955         IPv4 source address is 123.1.1.1
1956         UDP source port and destination port are 1234
1957         """
1958
1959         p = (IP(src='123.1.1.1', dst=dst) /
1960              UDP(sport=1234, dport=1234))
1961         return p
1962
1963     def create_packet_header_IPv6_IPv4(self, dst_inner, dst_outer):
1964         """Create packet header: IPv4 encapsulated in IPv6:
1965         IPv6 header, IPv4 header, UDP header
1966
1967         :param ipv4address dst_inner: inner IPv4 destination address
1968         :param ipv6address dst_outer: outer IPv6 destination address
1969
1970         IPv6 source address is 1234::1
1971         IPv4 source address is 123.1.1.1
1972         UDP source port and destination port are 1234
1973         """
1974
1975         p = (IPv6(src='1234::1', dst=dst_outer) /
1976              IP(src='123.1.1.1', dst=dst_inner) /
1977              UDP(sport=1234, dport=1234))
1978         return p
1979
1980     def create_packet_header_IPv6_SRH_IPv4(self, dst, sidlist, segleft):
1981         """Create packet header: IPv4 encapsulated in SRv6:
1982         IPv6 header with SRH, IPv4 header, UDP header
1983
1984         :param ipv4address dst: inner IPv4 destination address
1985         :param list sidlist: segment list of outer IPv6 SRH
1986         :param int segleft: segments-left field of outer IPv6 SRH
1987
1988         Outer IPv6 destination address is set to sidlist[segleft]
1989         IPv6 source address is 1234::1
1990         IPv4 source address is 123.1.1.1
1991         UDP source port and destination port are 1234
1992         """
1993
1994         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
1995              IPv6ExtHdrSegmentRouting(addresses=sidlist,
1996                                       segleft=segleft, nh=4) /
1997              IP(src='123.1.1.1', dst=dst) /
1998              UDP(sport=1234, dport=1234))
1999         return p
2000
2001     def create_packet_header_L2(self, vlan=0):
2002         """Create packet header: L2 header
2003
2004         :param vlan: if vlan!=0 then add 802.1q header
2005         """
2006         # Note: the dst addr ('00:55:44:33:22:11') is used in
2007         # the compare function compare_rx_tx_packet_T_Encaps_L2
2008         # to detect presence of L2 in SRH payload
2009         p = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
2010         etype = 0x8137  # IPX
2011         if vlan:
2012             # add 802.1q layer
2013             p /= Dot1Q(vlan=vlan, type=etype)
2014         else:
2015             p.type = etype
2016         return p
2017
2018     def create_packet_header_IPv6_SRH_L2(self, sidlist, segleft, vlan=0):
2019         """Create packet header: L2 encapsulated in SRv6:
2020         IPv6 header with SRH, L2
2021
2022         :param list sidlist: segment list of outer IPv6 SRH
2023         :param int segleft: segments-left field of outer IPv6 SRH
2024         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
2025
2026         Outer IPv6 destination address is set to sidlist[segleft]
2027         IPv6 source address is 1234::1
2028         """
2029         eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
2030         etype = 0x8137  # IPX
2031         if vlan:
2032             # add 802.1q layer
2033             eth /= Dot1Q(vlan=vlan, type=etype)
2034         else:
2035             eth.type = etype
2036
2037         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
2038              IPv6ExtHdrSegmentRouting(addresses=sidlist,
2039                                       segleft=segleft, nh=59) /
2040              eth)
2041         return p
2042
2043     def create_packet_header_IPv6_L2(self, dst_outer, vlan=0):
2044         """Create packet header: L2 encapsulated in IPv6:
2045         IPv6 header, L2
2046
2047         :param ipv6address dst_outer: outer IPv6 destination address
2048         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
2049         """
2050         eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
2051         etype = 0x8137  # IPX
2052         if vlan:
2053             # add 802.1q layer
2054             eth /= Dot1Q(vlan=vlan, type=etype)
2055         else:
2056             eth.type = etype
2057
2058         p = (IPv6(src='1234::1', dst=dst_outer, nh=59) / eth)
2059         return p
2060
2061     def get_payload_info(self, packet):
2062         """ Extract the payload_info from the packet
2063         """
2064         # in most cases, payload_info is in packet[Raw]
2065         # but packet[Raw] gives the complete payload
2066         # (incl L2 header) for the T.Encaps L2 case
2067         try:
2068             payload_info = self.payload_to_info(str(packet[Raw]))
2069
2070         except:
2071             # remote L2 header from packet[Raw]:
2072             # take packet[Raw], convert it to an Ether layer
2073             # and then extract Raw from it
2074             payload_info = self.payload_to_info(
2075                 str(Ether(str(packet[Raw]))[Raw]))
2076
2077         return payload_info
2078
2079     def verify_captured_pkts(self, dst_if, capture, compare_func):
2080         """
2081         Verify captured packet stream for specified interface.
2082         Compare ingress with egress packets using the specified compare fn
2083
2084         :param dst_if: egress interface of DUT
2085         :param capture: captured packets
2086         :param compare_func: function to compare in and out packet
2087         """
2088         self.logger.info("Verifying capture on interface %s using function %s"
2089                          % (dst_if.name, compare_func.func_name))
2090
2091         last_info = dict()
2092         for i in self.pg_interfaces:
2093             last_info[i.sw_if_index] = None
2094         dst_sw_if_index = dst_if.sw_if_index
2095
2096         for packet in capture:
2097             try:
2098                 # extract payload_info from packet's payload
2099                 payload_info = self.get_payload_info(packet)
2100                 packet_index = payload_info.index
2101
2102                 self.logger.debug("Verifying packet with index %d"
2103                                   % (packet_index))
2104                 # packet should have arrived on the expected interface
2105                 self.assertEqual(payload_info.dst, dst_sw_if_index)
2106                 self.logger.debug(
2107                     "Got packet on interface %s: src=%u (idx=%u)" %
2108                     (dst_if.name, payload_info.src, packet_index))
2109
2110                 # search for payload_info with same src and dst if_index
2111                 # this will give us the transmitted packet
2112                 next_info = self.get_next_packet_info_for_interface2(
2113                     payload_info.src, dst_sw_if_index,
2114                     last_info[payload_info.src])
2115                 last_info[payload_info.src] = next_info
2116                 # next_info should not be None
2117                 self.assertTrue(next_info is not None)
2118                 # index of tx and rx packets should be equal
2119                 self.assertEqual(packet_index, next_info.index)
2120                 # data field of next_info contains the tx packet
2121                 txed_packet = next_info.data
2122
2123                 self.logger.debug(ppp("Transmitted packet:",
2124                                       txed_packet))  # ppp=Pretty Print Packet
2125
2126                 self.logger.debug(ppp("Received packet:", packet))
2127
2128                 # compare rcvd packet with expected packet using compare_func
2129                 compare_func(txed_packet, packet)
2130
2131             except:
2132                 print packet.command()
2133                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2134                 raise
2135
2136         # have all expected packets arrived?
2137         for i in self.pg_interfaces:
2138             remaining_packet = self.get_next_packet_info_for_interface2(
2139                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
2140             self.assertTrue(remaining_packet is None,
2141                             "Interface %s: Packet expected from interface %s "
2142                             "didn't arrive" % (dst_if.name, i.name))
2143
2144
2145 if __name__ == '__main__':
2146     unittest.main(testRunner=VppTestRunner)