Tests Cleanup: Fix missing calls to setUpClass/tearDownClass.
[vpp.git] / test / test_srv6.py
1 #!/usr/bin/env python
2
3 import unittest
4 import binascii
5 from socket import AF_INET6
6
7 from framework import VppTestCase, VppTestRunner
8 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto, VppIpTable
9 from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
10     SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
11
12 import scapy.compat
13 from scapy.packet import Raw
14 from scapy.layers.l2 import Ether, Dot1Q
15 from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
16 from scapy.layers.inet import IP, UDP
17
18 from scapy.utils import inet_pton, inet_ntop
19
20 from util import ppp
21
22
23 class TestSRv6(VppTestCase):
24     """ SRv6 Test Case """
25
26     @classmethod
27     def setUpClass(cls):
28         super(TestSRv6, cls).setUpClass()
29
30     @classmethod
31     def tearDownClass(cls):
32         super(TestSRv6, cls).tearDownClass()
33
34     def setUp(self):
35         """ Perform test setup before each test case.
36         """
37         super(TestSRv6, self).setUp()
38
39         # packet sizes, inclusive L2 overhead
40         self.pg_packet_sizes = [64, 512, 1518, 9018]
41
42         # reset packet_infos
43         self.reset_packet_infos()
44
45     def tearDown(self):
46         """ Clean up test setup after each test case.
47         """
48         self.teardown_interfaces()
49
50         super(TestSRv6, self).tearDown()
51
52     def configure_interface(self,
53                             interface,
54                             ipv6=False, ipv4=False,
55                             ipv6_table_id=0, ipv4_table_id=0):
56         """ Configure interface.
57         :param ipv6: configure IPv6 on interface
58         :param ipv4: configure IPv4 on interface
59         :param ipv6_table_id: FIB table_id for IPv6
60         :param ipv4_table_id: FIB table_id for IPv4
61         """
62         self.logger.debug("Configuring interface %s" % (interface.name))
63         if ipv6:
64             self.logger.debug("Configuring IPv6")
65             interface.set_table_ip6(ipv6_table_id)
66             interface.config_ip6()
67             interface.resolve_ndp(timeout=5)
68         if ipv4:
69             self.logger.debug("Configuring IPv4")
70             interface.set_table_ip4(ipv4_table_id)
71             interface.config_ip4()
72             interface.resolve_arp()
73         interface.admin_up()
74
75     def setup_interfaces(self, ipv6=[], ipv4=[],
76                          ipv6_table_id=[], ipv4_table_id=[]):
77         """ Create and configure interfaces.
78
79         :param ipv6: list of interface IPv6 capabilities
80         :param ipv4: list of interface IPv4 capabilities
81         :param ipv6_table_id: list of intf IPv6 FIB table_ids
82         :param ipv4_table_id: list of intf IPv4 FIB table_ids
83         :returns: List of created interfaces.
84         """
85         # how many interfaces?
86         if len(ipv6):
87             count = len(ipv6)
88         else:
89             count = len(ipv4)
90         self.logger.debug("Creating and configuring %d interfaces" % (count))
91
92         # fill up ipv6 and ipv4 lists if needed
93         # not enabled (False) is the default
94         if len(ipv6) < count:
95             ipv6 += (count - len(ipv6)) * [False]
96         if len(ipv4) < count:
97             ipv4 += (count - len(ipv4)) * [False]
98
99         # fill up table_id lists if needed
100         # table_id 0 (global) is the default
101         if len(ipv6_table_id) < count:
102             ipv6_table_id += (count - len(ipv6_table_id)) * [0]
103         if len(ipv4_table_id) < count:
104             ipv4_table_id += (count - len(ipv4_table_id)) * [0]
105
106         # create 'count' pg interfaces
107         self.create_pg_interfaces(range(count))
108
109         # setup all interfaces
110         for i in range(count):
111             intf = self.pg_interfaces[i]
112             self.configure_interface(intf,
113                                      ipv6[i], ipv4[i],
114                                      ipv6_table_id[i], ipv4_table_id[i])
115
116         if any(ipv6):
117             self.logger.debug(self.vapi.cli("show ip6 neighbors"))
118         if any(ipv4):
119             self.logger.debug(self.vapi.cli("show ip arp"))
120         self.logger.debug(self.vapi.cli("show interface"))
121         self.logger.debug(self.vapi.cli("show hardware"))
122
123         return self.pg_interfaces
124
125     def teardown_interfaces(self):
126         """ Unconfigure and bring down interface.
127         """
128         self.logger.debug("Tearing down interfaces")
129         # tear down all interfaces
130         # AFAIK they cannot be deleted
131         for i in self.pg_interfaces:
132             self.logger.debug("Tear down interface %s" % (i.name))
133             i.admin_down()
134             i.unconfig()
135             i.set_table_ip4(0)
136             i.set_table_ip6(0)
137
138     @unittest.skipUnless(0, "PC to fix")
139     def test_SRv6_T_Encaps(self):
140         """ Test SRv6 Transit.Encaps behavior for IPv6.
141         """
142         # send traffic to one destination interface
143         # source and destination are IPv6 only
144         self.setup_interfaces(ipv6=[True, True])
145
146         # configure FIB entries
147         route = VppIpRoute(self, "a4::", 64,
148                            [VppRoutePath(self.pg1.remote_ip6,
149                                          self.pg1.sw_if_index,
150                                          proto=DpoProto.DPO_PROTO_IP6)],
151                            is_ip6=1)
152         route.add_vpp_config()
153
154         # configure encaps IPv6 source address
155         # needs to be done before SR Policy config
156         # TODO: API?
157         self.vapi.cli("set sr encaps source addr a3::")
158
159         bsid = 'a3::9999:1'
160         # configure SRv6 Policy
161         # Note: segment list order: first -> last
162         sr_policy = VppSRv6Policy(
163             self, bsid=bsid,
164             is_encap=1,
165             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
166             weight=1, fib_table=0,
167             segments=['a4::', 'a5::', 'a6::c7'],
168             source='a3::')
169         sr_policy.add_vpp_config()
170         self.sr_policy = sr_policy
171
172         # log the sr policies
173         self.logger.info(self.vapi.cli("show sr policies"))
174
175         # steer IPv6 traffic to a7::/64 into SRv6 Policy
176         # use the bsid of the above self.sr_policy
177         pol_steering = VppSRv6Steering(
178                         self,
179                         bsid=self.sr_policy.bsid,
180                         prefix="a7::", mask_width=64,
181                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
182                         sr_policy_index=0, table_id=0,
183                         sw_if_index=0)
184         pol_steering.add_vpp_config()
185
186         # log the sr steering policies
187         self.logger.info(self.vapi.cli("show sr steering policies"))
188
189         # create packets
190         count = len(self.pg_packet_sizes)
191         dst_inner = 'a7::1234'
192         pkts = []
193
194         # create IPv6 packets without SRH
195         packet_header = self.create_packet_header_IPv6(dst_inner)
196         # create traffic stream pg0->pg1
197         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
198                                        self.pg_packet_sizes, count))
199
200         # create IPv6 packets with SRH
201         # packets with segments-left 1, active segment a7::
202         packet_header = self.create_packet_header_IPv6_SRH(
203             sidlist=['a8::', 'a7::', 'a6::'],
204             segleft=1)
205         # create traffic stream pg0->pg1
206         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
207                                        self.pg_packet_sizes, count))
208
209         # create IPv6 packets with SRH and IPv6
210         # packets with segments-left 1, active segment a7::
211         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
212             dst_inner,
213             sidlist=['a8::', 'a7::', 'a6::'],
214             segleft=1)
215         # create traffic stream pg0->pg1
216         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
217                                        self.pg_packet_sizes, count))
218
219         # send packets and verify received packets
220         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
221                                   self.compare_rx_tx_packet_T_Encaps)
222
223         # log the localsid counters
224         self.logger.info(self.vapi.cli("show sr localsid"))
225
226         # remove SR steering
227         pol_steering.remove_vpp_config()
228         self.logger.info(self.vapi.cli("show sr steering policies"))
229
230         # remove SR Policies
231         self.sr_policy.remove_vpp_config()
232         self.logger.info(self.vapi.cli("show sr policies"))
233
234         # remove FIB entries
235         # done by tearDown
236
237         # cleanup interfaces
238         self.teardown_interfaces()
239
240     @unittest.skipUnless(0, "PC to fix")
241     def test_SRv6_T_Insert(self):
242         """ Test SRv6 Transit.Insert behavior (IPv6 only).
243         """
244         # send traffic to one destination interface
245         # source and destination are IPv6 only
246         self.setup_interfaces(ipv6=[True, True])
247
248         # configure FIB entries
249         route = VppIpRoute(self, "a4::", 64,
250                            [VppRoutePath(self.pg1.remote_ip6,
251                                          self.pg1.sw_if_index,
252                                          proto=DpoProto.DPO_PROTO_IP6)],
253                            is_ip6=1)
254         route.add_vpp_config()
255
256         # configure encaps IPv6 source address
257         # needs to be done before SR Policy config
258         # TODO: API?
259         self.vapi.cli("set sr encaps source addr a3::")
260
261         bsid = 'a3::9999:1'
262         # configure SRv6 Policy
263         # Note: segment list order: first -> last
264         sr_policy = VppSRv6Policy(
265             self, bsid=bsid,
266             is_encap=0,
267             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
268             weight=1, fib_table=0,
269             segments=['a4::', 'a5::', 'a6::c7'],
270             source='a3::')
271         sr_policy.add_vpp_config()
272         self.sr_policy = sr_policy
273
274         # log the sr policies
275         self.logger.info(self.vapi.cli("show sr policies"))
276
277         # steer IPv6 traffic to a7::/64 into SRv6 Policy
278         # use the bsid of the above self.sr_policy
279         pol_steering = VppSRv6Steering(
280                         self,
281                         bsid=self.sr_policy.bsid,
282                         prefix="a7::", mask_width=64,
283                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
284                         sr_policy_index=0, table_id=0,
285                         sw_if_index=0)
286         pol_steering.add_vpp_config()
287
288         # log the sr steering policies
289         self.logger.info(self.vapi.cli("show sr steering policies"))
290
291         # create packets
292         count = len(self.pg_packet_sizes)
293         dst_inner = 'a7::1234'
294         pkts = []
295
296         # create IPv6 packets without SRH
297         packet_header = self.create_packet_header_IPv6(dst_inner)
298         # create traffic stream pg0->pg1
299         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
300                                        self.pg_packet_sizes, count))
301
302         # create IPv6 packets with SRH
303         # packets with segments-left 1, active segment a7::
304         packet_header = self.create_packet_header_IPv6_SRH(
305             sidlist=['a8::', 'a7::', 'a6::'],
306             segleft=1)
307         # create traffic stream pg0->pg1
308         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
309                                        self.pg_packet_sizes, count))
310
311         # send packets and verify received packets
312         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
313                                   self.compare_rx_tx_packet_T_Insert)
314
315         # log the localsid counters
316         self.logger.info(self.vapi.cli("show sr localsid"))
317
318         # remove SR steering
319         pol_steering.remove_vpp_config()
320         self.logger.info(self.vapi.cli("show sr steering policies"))
321
322         # remove SR Policies
323         self.sr_policy.remove_vpp_config()
324         self.logger.info(self.vapi.cli("show sr policies"))
325
326         # remove FIB entries
327         # done by tearDown
328
329         # cleanup interfaces
330         self.teardown_interfaces()
331
332     @unittest.skipUnless(0, "PC to fix")
333     def test_SRv6_T_Encaps_IPv4(self):
334         """ Test SRv6 Transit.Encaps behavior for IPv4.
335         """
336         # send traffic to one destination interface
337         # source interface is IPv4 only
338         # destination interface is IPv6 only
339         self.setup_interfaces(ipv6=[False, True], ipv4=[True, False])
340
341         # configure FIB entries
342         route = VppIpRoute(self, "a4::", 64,
343                            [VppRoutePath(self.pg1.remote_ip6,
344                                          self.pg1.sw_if_index,
345                                          proto=DpoProto.DPO_PROTO_IP6)],
346                            is_ip6=1)
347         route.add_vpp_config()
348
349         # configure encaps IPv6 source address
350         # needs to be done before SR Policy config
351         # TODO: API?
352         self.vapi.cli("set sr encaps source addr a3::")
353
354         bsid = 'a3::9999:1'
355         # configure SRv6 Policy
356         # Note: segment list order: first -> last
357         sr_policy = VppSRv6Policy(
358             self, bsid=bsid,
359             is_encap=1,
360             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
361             weight=1, fib_table=0,
362             segments=['a4::', 'a5::', 'a6::c7'],
363             source='a3::')
364         sr_policy.add_vpp_config()
365         self.sr_policy = sr_policy
366
367         # log the sr policies
368         self.logger.info(self.vapi.cli("show sr policies"))
369
370         # steer IPv4 traffic to 7.1.1.0/24 into SRv6 Policy
371         # use the bsid of the above self.sr_policy
372         pol_steering = VppSRv6Steering(
373                         self,
374                         bsid=self.sr_policy.bsid,
375                         prefix="7.1.1.0", mask_width=24,
376                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV4,
377                         sr_policy_index=0, table_id=0,
378                         sw_if_index=0)
379         pol_steering.add_vpp_config()
380
381         # log the sr steering policies
382         self.logger.info(self.vapi.cli("show sr steering policies"))
383
384         # create packets
385         count = len(self.pg_packet_sizes)
386         dst_inner = '7.1.1.123'
387         pkts = []
388
389         # create IPv4 packets
390         packet_header = self.create_packet_header_IPv4(dst_inner)
391         # create traffic stream pg0->pg1
392         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
393                                        self.pg_packet_sizes, count))
394
395         # send packets and verify received packets
396         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
397                                   self.compare_rx_tx_packet_T_Encaps_IPv4)
398
399         # log the localsid counters
400         self.logger.info(self.vapi.cli("show sr localsid"))
401
402         # remove SR steering
403         pol_steering.remove_vpp_config()
404         self.logger.info(self.vapi.cli("show sr steering policies"))
405
406         # remove SR Policies
407         self.sr_policy.remove_vpp_config()
408         self.logger.info(self.vapi.cli("show sr policies"))
409
410         # remove FIB entries
411         # done by tearDown
412
413         # cleanup interfaces
414         self.teardown_interfaces()
415
416     @unittest.skip("VPP crashes after running this test")
417     def test_SRv6_T_Encaps_L2(self):
418         """ Test SRv6 Transit.Encaps behavior for L2.
419         """
420         # send traffic to one destination interface
421         # source interface is IPv4 only TODO?
422         # destination interface is IPv6 only
423         self.setup_interfaces(ipv6=[False, True], ipv4=[False, False])
424
425         # configure FIB entries
426         route = VppIpRoute(self, "a4::", 64,
427                            [VppRoutePath(self.pg1.remote_ip6,
428                                          self.pg1.sw_if_index,
429                                          proto=DpoProto.DPO_PROTO_IP6)],
430                            is_ip6=1)
431         route.add_vpp_config()
432
433         # configure encaps IPv6 source address
434         # needs to be done before SR Policy config
435         # TODO: API?
436         self.vapi.cli("set sr encaps source addr a3::")
437
438         bsid = 'a3::9999:1'
439         # configure SRv6 Policy
440         # Note: segment list order: first -> last
441         sr_policy = VppSRv6Policy(
442             self, bsid=bsid,
443             is_encap=1,
444             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
445             weight=1, fib_table=0,
446             segments=['a4::', 'a5::', 'a6::c7'],
447             source='a3::')
448         sr_policy.add_vpp_config()
449         self.sr_policy = sr_policy
450
451         # log the sr policies
452         self.logger.info(self.vapi.cli("show sr policies"))
453
454         # steer L2 traffic into SRv6 Policy
455         # use the bsid of the above self.sr_policy
456         pol_steering = VppSRv6Steering(
457                         self,
458                         bsid=self.sr_policy.bsid,
459                         prefix="::", mask_width=0,
460                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_L2,
461                         sr_policy_index=0, table_id=0,
462                         sw_if_index=self.pg0.sw_if_index)
463         pol_steering.add_vpp_config()
464
465         # log the sr steering policies
466         self.logger.info(self.vapi.cli("show sr steering policies"))
467
468         # create packets
469         count = len(self.pg_packet_sizes)
470         pkts = []
471
472         # create L2 packets without dot1q header
473         packet_header = self.create_packet_header_L2()
474         # create traffic stream pg0->pg1
475         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
476                                        self.pg_packet_sizes, count))
477
478         # create L2 packets with dot1q header
479         packet_header = self.create_packet_header_L2(vlan=123)
480         # create traffic stream pg0->pg1
481         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
482                                        self.pg_packet_sizes, count))
483
484         # send packets and verify received packets
485         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
486                                   self.compare_rx_tx_packet_T_Encaps_L2)
487
488         # log the localsid counters
489         self.logger.info(self.vapi.cli("show sr localsid"))
490
491         # remove SR steering
492         pol_steering.remove_vpp_config()
493         self.logger.info(self.vapi.cli("show sr steering policies"))
494
495         # remove SR Policies
496         self.sr_policy.remove_vpp_config()
497         self.logger.info(self.vapi.cli("show sr policies"))
498
499         # remove FIB entries
500         # done by tearDown
501
502         # cleanup interfaces
503         self.teardown_interfaces()
504
505     def test_SRv6_End(self):
506         """ Test SRv6 End (without PSP) behavior.
507         """
508         # send traffic to one destination interface
509         # source and destination interfaces are IPv6 only
510         self.setup_interfaces(ipv6=[True, True])
511
512         # configure FIB entries
513         route = VppIpRoute(self, "a4::", 64,
514                            [VppRoutePath(self.pg1.remote_ip6,
515                                          self.pg1.sw_if_index,
516                                          proto=DpoProto.DPO_PROTO_IP6)],
517                            is_ip6=1)
518         route.add_vpp_config()
519
520         # configure SRv6 localSID End without PSP behavior
521         localsid = VppSRv6LocalSID(
522                         self, localsid={'addr': 'A3::0'},
523                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
524                         nh_addr4='0.0.0.0',
525                         nh_addr6='::',
526                         end_psp=0,
527                         sw_if_index=0,
528                         vlan_index=0,
529                         fib_table=0)
530         localsid.add_vpp_config()
531         # log the localsids
532         self.logger.debug(self.vapi.cli("show sr localsid"))
533
534         # create IPv6 packets with SRH (SL=2, SL=1, SL=0)
535         # send one packet per SL value per packet size
536         # SL=0 packet with localSID End with USP needs 2nd SRH
537         count = len(self.pg_packet_sizes)
538         dst_inner = 'a4::1234'
539         pkts = []
540
541         # packets with segments-left 2, active segment a3::
542         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
543                 dst_inner,
544                 sidlist=['a5::', 'a4::', 'a3::'],
545                 segleft=2)
546         # create traffic stream pg0->pg1
547         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
548                                        self.pg_packet_sizes, count))
549
550         # packets with segments-left 1, active segment a3::
551         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
552                 dst_inner,
553                 sidlist=['a4::', 'a3::', 'a2::'],
554                 segleft=1)
555         # add to traffic stream pg0->pg1
556         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
557                                        self.pg_packet_sizes, count))
558
559         # TODO: test behavior with SL=0 packet (needs 2*SRH?)
560
561         # send packets and verify received packets
562         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
563                                   self.compare_rx_tx_packet_End)
564
565         # log the localsid counters
566         self.logger.info(self.vapi.cli("show sr localsid"))
567
568         # remove SRv6 localSIDs
569         localsid.remove_vpp_config()
570
571         # remove FIB entries
572         # done by tearDown
573
574         # cleanup interfaces
575         self.teardown_interfaces()
576
577     def test_SRv6_End_with_PSP(self):
578         """ Test SRv6 End with PSP behavior.
579         """
580         # send traffic to one destination interface
581         # source and destination interfaces are IPv6 only
582         self.setup_interfaces(ipv6=[True, True])
583
584         # configure FIB entries
585         route = VppIpRoute(self, "a4::", 64,
586                            [VppRoutePath(self.pg1.remote_ip6,
587                                          self.pg1.sw_if_index,
588                                          proto=DpoProto.DPO_PROTO_IP6)],
589                            is_ip6=1)
590         route.add_vpp_config()
591
592         # configure SRv6 localSID End with PSP behavior
593         localsid = VppSRv6LocalSID(
594                         self, localsid={'addr': 'A3::0'},
595                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
596                         nh_addr4='0.0.0.0',
597                         nh_addr6='::',
598                         end_psp=1,
599                         sw_if_index=0,
600                         vlan_index=0,
601                         fib_table=0)
602         localsid.add_vpp_config()
603         # log the localsids
604         self.logger.debug(self.vapi.cli("show sr localsid"))
605
606         # create IPv6 packets with SRH (SL=2, SL=1)
607         # send one packet per SL value per packet size
608         # SL=0 packet with localSID End with PSP is dropped
609         count = len(self.pg_packet_sizes)
610         dst_inner = 'a4::1234'
611         pkts = []
612
613         # packets with segments-left 2, active segment a3::
614         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
615                     dst_inner,
616                     sidlist=['a5::', 'a4::', 'a3::'],
617                     segleft=2)
618         # create traffic stream pg0->pg1
619         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
620                                        self.pg_packet_sizes, count))
621
622         # packets with segments-left 1, active segment a3::
623         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
624                     dst_inner,
625                     sidlist=['a4::', 'a3::', 'a2::'],
626                     segleft=1)
627         # add to traffic stream pg0->pg1
628         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
629                                        self.pg_packet_sizes, count))
630
631         # send packets and verify received packets
632         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
633                                   self.compare_rx_tx_packet_End_PSP)
634
635         # log the localsid counters
636         self.logger.info(self.vapi.cli("show sr localsid"))
637
638         # remove SRv6 localSIDs
639         localsid.remove_vpp_config()
640
641         # remove FIB entries
642         # done by tearDown
643
644         # cleanup interfaces
645         self.teardown_interfaces()
646
647     def test_SRv6_End_X(self):
648         """ Test SRv6 End.X (without PSP) behavior.
649         """
650         # create three interfaces (1 source, 2 destinations)
651         # source and destination interfaces are IPv6 only
652         self.setup_interfaces(ipv6=[True, True, True])
653
654         # configure FIB entries
655         # a4::/64 via pg1 and pg2
656         route = VppIpRoute(self, "a4::", 64,
657                            [VppRoutePath(self.pg1.remote_ip6,
658                                          self.pg1.sw_if_index,
659                                          proto=DpoProto.DPO_PROTO_IP6),
660                             VppRoutePath(self.pg2.remote_ip6,
661                                          self.pg2.sw_if_index,
662                                          proto=DpoProto.DPO_PROTO_IP6)],
663                            is_ip6=1)
664         route.add_vpp_config()
665         self.logger.debug(self.vapi.cli("show ip6 fib"))
666
667         # configure SRv6 localSID End.X without PSP behavior
668         # End.X points to interface pg1
669         localsid = VppSRv6LocalSID(
670                         self, localsid={'addr': 'A3::C4'},
671                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
672                         nh_addr4='0.0.0.0',
673                         nh_addr6=self.pg1.remote_ip6,
674                         end_psp=0,
675                         sw_if_index=self.pg1.sw_if_index,
676                         vlan_index=0,
677                         fib_table=0)
678         localsid.add_vpp_config()
679         # log the localsids
680         self.logger.debug(self.vapi.cli("show sr localsid"))
681
682         # create IPv6 packets with SRH (SL=2, SL=1)
683         # send one packet per SL value per packet size
684         # SL=0 packet with localSID End with PSP is dropped
685         count = len(self.pg_packet_sizes)
686         dst_inner = 'a4::1234'
687         pkts = []
688
689         # packets with segments-left 2, active segment a3::c4
690         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
691                     dst_inner,
692                     sidlist=['a5::', 'a4::', 'a3::c4'],
693                     segleft=2)
694         # create traffic stream pg0->pg1
695         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
696                                        self.pg_packet_sizes, count))
697
698         # packets with segments-left 1, active segment a3::c4
699         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
700                     dst_inner,
701                     sidlist=['a4::', 'a3::c4', 'a2::'],
702                     segleft=1)
703         # add to traffic stream pg0->pg1
704         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
705                                        self.pg_packet_sizes, count))
706
707         # send packets and verify received packets
708         # using same comparison function as End (no PSP)
709         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
710                                   self.compare_rx_tx_packet_End)
711
712         # assert nothing was received on the other interface (pg2)
713         self.pg2.assert_nothing_captured("mis-directed packet(s)")
714
715         # log the localsid counters
716         self.logger.info(self.vapi.cli("show sr localsid"))
717
718         # remove SRv6 localSIDs
719         localsid.remove_vpp_config()
720
721         # remove FIB entries
722         # done by tearDown
723
724         # cleanup interfaces
725         self.teardown_interfaces()
726
727     def test_SRv6_End_X_with_PSP(self):
728         """ Test SRv6 End.X with PSP behavior.
729         """
730         # create three interfaces (1 source, 2 destinations)
731         # source and destination interfaces are IPv6 only
732         self.setup_interfaces(ipv6=[True, True, True])
733
734         # configure FIB entries
735         # a4::/64 via pg1 and pg2
736         route = VppIpRoute(self, "a4::", 64,
737                            [VppRoutePath(self.pg1.remote_ip6,
738                                          self.pg1.sw_if_index,
739                                          proto=DpoProto.DPO_PROTO_IP6),
740                             VppRoutePath(self.pg2.remote_ip6,
741                                          self.pg2.sw_if_index,
742                                          proto=DpoProto.DPO_PROTO_IP6)],
743                            is_ip6=1)
744         route.add_vpp_config()
745
746         # configure SRv6 localSID End with PSP behavior
747         localsid = VppSRv6LocalSID(
748                         self, localsid={'addr': 'A3::C4'},
749                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
750                         nh_addr4='0.0.0.0',
751                         nh_addr6=self.pg1.remote_ip6,
752                         end_psp=1,
753                         sw_if_index=self.pg1.sw_if_index,
754                         vlan_index=0,
755                         fib_table=0)
756         localsid.add_vpp_config()
757         # log the localsids
758         self.logger.debug(self.vapi.cli("show sr localsid"))
759
760         # create IPv6 packets with SRH (SL=2, SL=1)
761         # send one packet per SL value per packet size
762         # SL=0 packet with localSID End with PSP is dropped
763         count = len(self.pg_packet_sizes)
764         dst_inner = 'a4::1234'
765         pkts = []
766
767         # packets with segments-left 2, active segment a3::
768         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
769                     dst_inner,
770                     sidlist=['a5::', 'a4::', 'a3::c4'],
771                     segleft=2)
772         # create traffic stream pg0->pg1
773         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
774                                        self.pg_packet_sizes, count))
775
776         # packets with segments-left 1, active segment a3::
777         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
778                     dst_inner,
779                     sidlist=['a4::', 'a3::c4', 'a2::'],
780                     segleft=1)
781         # add to traffic stream pg0->pg1
782         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
783                                        self.pg_packet_sizes, count))
784
785         # send packets and verify received packets
786         # using same comparison function as End with PSP
787         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
788                                   self.compare_rx_tx_packet_End_PSP)
789
790         # assert nothing was received on the other interface (pg2)
791         self.pg2.assert_nothing_captured("mis-directed packet(s)")
792
793         # log the localsid counters
794         self.logger.info(self.vapi.cli("show sr localsid"))
795
796         # remove SRv6 localSIDs
797         localsid.remove_vpp_config()
798
799         # remove FIB entries
800         # done by tearDown
801
802         # cleanup interfaces
803         self.teardown_interfaces()
804
805     def test_SRv6_End_DX6(self):
806         """ Test SRv6 End.DX6 behavior.
807         """
808         # send traffic to one destination interface
809         # source and destination interfaces are IPv6 only
810         self.setup_interfaces(ipv6=[True, True])
811
812         # configure SRv6 localSID End.DX6 behavior
813         localsid = VppSRv6LocalSID(
814                         self, localsid={'addr': 'A3::C4'},
815                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX6,
816                         nh_addr4='0.0.0.0',
817                         nh_addr6=self.pg1.remote_ip6,
818                         end_psp=0,
819                         sw_if_index=self.pg1.sw_if_index,
820                         vlan_index=0,
821                         fib_table=0)
822         localsid.add_vpp_config()
823         # log the localsids
824         self.logger.debug(self.vapi.cli("show sr localsid"))
825
826         # create IPv6 packets with SRH (SL=0)
827         # send one packet per packet size
828         count = len(self.pg_packet_sizes)
829         dst_inner = 'a4::1234'  # inner header destination address
830         pkts = []
831
832         # packets with SRH, segments-left 0, active segment a3::c4
833         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
834                         dst_inner,
835                         sidlist=['a3::c4', 'a2::', 'a1::'],
836                         segleft=0)
837         # add to traffic stream pg0->pg1
838         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
839                                        self.pg_packet_sizes, count))
840
841         # packets without SRH, IPv6 in IPv6
842         # outer IPv6 dest addr is the localsid End.DX6
843         packet_header = self.create_packet_header_IPv6_IPv6(
844                                             dst_inner,
845                                             dst_outer='a3::c4')
846         # add to traffic stream pg0->pg1
847         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
848                                        self.pg_packet_sizes, count))
849
850         # send packets and verify received packets
851         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
852                                   self.compare_rx_tx_packet_End_DX6)
853
854         # log the localsid counters
855         self.logger.info(self.vapi.cli("show sr localsid"))
856
857         # remove SRv6 localSIDs
858         localsid.remove_vpp_config()
859
860         # cleanup interfaces
861         self.teardown_interfaces()
862
863     def test_SRv6_End_DT6(self):
864         """ Test SRv6 End.DT6 behavior.
865         """
866         # create three interfaces (1 source, 2 destinations)
867         # all interfaces are IPv6 only
868         # source interface in global FIB (0)
869         # destination interfaces in global and vrf
870         vrf_1 = 1
871         ipt = VppIpTable(self, vrf_1, is_ip6=True)
872         ipt.add_vpp_config()
873         self.setup_interfaces(ipv6=[True, True, True],
874                               ipv6_table_id=[0, 0, vrf_1])
875
876         # configure FIB entries
877         # a4::/64 is reachable
878         #     via pg1 in table 0 (global)
879         #     and via pg2 in table vrf_1
880         route0 = VppIpRoute(self, "a4::", 64,
881                             [VppRoutePath(self.pg1.remote_ip6,
882                                           self.pg1.sw_if_index,
883                                           proto=DpoProto.DPO_PROTO_IP6,
884                                           nh_table_id=0)],
885                             table_id=0,
886                             is_ip6=1)
887         route0.add_vpp_config()
888         route1 = VppIpRoute(self, "a4::", 64,
889                             [VppRoutePath(self.pg2.remote_ip6,
890                                           self.pg2.sw_if_index,
891                                           proto=DpoProto.DPO_PROTO_IP6,
892                                           nh_table_id=vrf_1)],
893                             table_id=vrf_1,
894                             is_ip6=1)
895         route1.add_vpp_config()
896         self.logger.debug(self.vapi.cli("show ip6 fib"))
897
898         # configure SRv6 localSID End.DT6 behavior
899         # Note:
900         # fib_table: where the localsid is installed
901         # sw_if_index: in T-variants of localsid this is the vrf table_id
902         localsid = VppSRv6LocalSID(
903                         self, localsid={'addr': 'A3::C4'},
904                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT6,
905                         nh_addr4='0.0.0.0',
906                         nh_addr6='::',
907                         end_psp=0,
908                         sw_if_index=vrf_1,
909                         vlan_index=0,
910                         fib_table=0)
911         localsid.add_vpp_config()
912         # log the localsids
913         self.logger.debug(self.vapi.cli("show sr localsid"))
914
915         # create IPv6 packets with SRH (SL=0)
916         # send one packet per packet size
917         count = len(self.pg_packet_sizes)
918         dst_inner = 'a4::1234'  # inner header destination address
919         pkts = []
920
921         # packets with SRH, segments-left 0, active segment a3::c4
922         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
923                         dst_inner,
924                         sidlist=['a3::c4', 'a2::', 'a1::'],
925                         segleft=0)
926         # add to traffic stream pg0->pg1
927         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
928                                        self.pg_packet_sizes, count))
929
930         # packets without SRH, IPv6 in IPv6
931         # outer IPv6 dest addr is the localsid End.DT6
932         packet_header = self.create_packet_header_IPv6_IPv6(
933                                             dst_inner,
934                                             dst_outer='a3::c4')
935         # add to traffic stream pg0->pg1
936         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
937                                        self.pg_packet_sizes, count))
938
939         # send packets and verify received packets
940         # using same comparison function as End.DX6
941         self.send_and_verify_pkts(self.pg0, pkts, self.pg2,
942                                   self.compare_rx_tx_packet_End_DX6)
943
944         # assert nothing was received on the other interface (pg2)
945         self.pg1.assert_nothing_captured("mis-directed packet(s)")
946
947         # log the localsid counters
948         self.logger.info(self.vapi.cli("show sr localsid"))
949
950         # remove SRv6 localSIDs
951         localsid.remove_vpp_config()
952
953         # remove FIB entries
954         # done by tearDown
955
956         # cleanup interfaces
957         self.teardown_interfaces()
958
959     def test_SRv6_End_DX4(self):
960         """ Test SRv6 End.DX4 behavior.
961         """
962         # send traffic to one destination interface
963         # source interface is IPv6 only
964         # destination interface is IPv4 only
965         self.setup_interfaces(ipv6=[True, False], ipv4=[False, True])
966
967         # configure SRv6 localSID End.DX4 behavior
968         localsid = VppSRv6LocalSID(
969                         self, localsid={'addr': 'A3::C4'},
970                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX4,
971                         nh_addr4=self.pg1.remote_ip4,
972                         nh_addr6='::',
973                         end_psp=0,
974                         sw_if_index=self.pg1.sw_if_index,
975                         vlan_index=0,
976                         fib_table=0)
977         localsid.add_vpp_config()
978         # log the localsids
979         self.logger.debug(self.vapi.cli("show sr localsid"))
980
981         # send one packet per packet size
982         count = len(self.pg_packet_sizes)
983         dst_inner = '4.1.1.123'  # inner header destination address
984         pkts = []
985
986         # packets with SRH, segments-left 0, active segment a3::c4
987         packet_header = self.create_packet_header_IPv6_SRH_IPv4(
988                         dst_inner,
989                         sidlist=['a3::c4', 'a2::', 'a1::'],
990                         segleft=0)
991         # add to traffic stream pg0->pg1
992         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
993                                        self.pg_packet_sizes, count))
994
995         # packets without SRH, IPv4 in IPv6
996         # outer IPv6 dest addr is the localsid End.DX4
997         packet_header = self.create_packet_header_IPv6_IPv4(
998                                             dst_inner,
999                                             dst_outer='a3::c4')
1000         # add to traffic stream pg0->pg1
1001         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1002                                        self.pg_packet_sizes, count))
1003
1004         # send packets and verify received packets
1005         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
1006                                   self.compare_rx_tx_packet_End_DX4)
1007
1008         # log the localsid counters
1009         self.logger.info(self.vapi.cli("show sr localsid"))
1010
1011         # remove SRv6 localSIDs
1012         localsid.remove_vpp_config()
1013
1014         # cleanup interfaces
1015         self.teardown_interfaces()
1016
1017     def test_SRv6_End_DT4(self):
1018         """ Test SRv6 End.DT4 behavior.
1019         """
1020         # create three interfaces (1 source, 2 destinations)
1021         # source interface is IPv6-only
1022         # destination interfaces are IPv4 only
1023         # source interface in global FIB (0)
1024         # destination interfaces in global and vrf
1025         vrf_1 = 1
1026         ipt = VppIpTable(self, vrf_1)
1027         ipt.add_vpp_config()
1028         self.setup_interfaces(ipv6=[True, False, False],
1029                               ipv4=[False, True, True],
1030                               ipv6_table_id=[0, 0, 0],
1031                               ipv4_table_id=[0, 0, vrf_1])
1032
1033         # configure FIB entries
1034         # 4.1.1.0/24 is reachable
1035         #     via pg1 in table 0 (global)
1036         #     and via pg2 in table vrf_1
1037         route0 = VppIpRoute(self, "4.1.1.0", 24,
1038                             [VppRoutePath(self.pg1.remote_ip4,
1039                                           self.pg1.sw_if_index,
1040                                           nh_table_id=0)],
1041                             table_id=0,
1042                             is_ip6=0)
1043         route0.add_vpp_config()
1044         route1 = VppIpRoute(self, "4.1.1.0", 24,
1045                             [VppRoutePath(self.pg2.remote_ip4,
1046                                           self.pg2.sw_if_index,
1047                                           nh_table_id=vrf_1)],
1048                             table_id=vrf_1,
1049                             is_ip6=0)
1050         route1.add_vpp_config()
1051         self.logger.debug(self.vapi.cli("show ip fib"))
1052
1053         # configure SRv6 localSID End.DT6 behavior
1054         # Note:
1055         # fib_table: where the localsid is installed
1056         # sw_if_index: in T-variants of localsid: vrf table_id
1057         localsid = VppSRv6LocalSID(
1058                         self, localsid={'addr': 'A3::C4'},
1059                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT4,
1060                         nh_addr4='0.0.0.0',
1061                         nh_addr6='::',
1062                         end_psp=0,
1063                         sw_if_index=vrf_1,
1064                         vlan_index=0,
1065                         fib_table=0)
1066         localsid.add_vpp_config()
1067         # log the localsids
1068         self.logger.debug(self.vapi.cli("show sr localsid"))
1069
1070         # create IPv6 packets with SRH (SL=0)
1071         # send one packet per packet size
1072         count = len(self.pg_packet_sizes)
1073         dst_inner = '4.1.1.123'  # inner header destination address
1074         pkts = []
1075
1076         # packets with SRH, segments-left 0, active segment a3::c4
1077         packet_header = self.create_packet_header_IPv6_SRH_IPv4(
1078                         dst_inner,
1079                         sidlist=['a3::c4', 'a2::', 'a1::'],
1080                         segleft=0)
1081         # add to traffic stream pg0->pg1
1082         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
1083                                        self.pg_packet_sizes, count))
1084
1085         # packets without SRH, IPv6 in IPv6
1086         # outer IPv6 dest addr is the localsid End.DX4
1087         packet_header = self.create_packet_header_IPv6_IPv4(
1088                                             dst_inner,
1089                                             dst_outer='a3::c4')
1090         # add to traffic stream pg0->pg1
1091         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
1092                                        self.pg_packet_sizes, count))
1093
1094         # send packets and verify received packets
1095         # using same comparison function as End.DX4
1096         self.send_and_verify_pkts(self.pg0, pkts, self.pg2,
1097                                   self.compare_rx_tx_packet_End_DX4)
1098
1099         # assert nothing was received on the other interface (pg2)
1100         self.pg1.assert_nothing_captured("mis-directed packet(s)")
1101
1102         # log the localsid counters
1103         self.logger.info(self.vapi.cli("show sr localsid"))
1104
1105         # remove SRv6 localSIDs
1106         localsid.remove_vpp_config()
1107
1108         # remove FIB entries
1109         # done by tearDown
1110
1111         # cleanup interfaces
1112         self.teardown_interfaces()
1113
1114     def test_SRv6_End_DX2(self):
1115         """ Test SRv6 End.DX2 behavior.
1116         """
1117         # send traffic to one destination interface
1118         # source interface is IPv6 only
1119         self.setup_interfaces(ipv6=[True, False], ipv4=[False, False])
1120
1121         # configure SRv6 localSID End.DX2 behavior
1122         localsid = VppSRv6LocalSID(
1123                         self, localsid={'addr': 'A3::C4'},
1124                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX2,
1125                         nh_addr4='0.0.0.0',
1126                         nh_addr6='::',
1127                         end_psp=0,
1128                         sw_if_index=self.pg1.sw_if_index,
1129                         vlan_index=0,
1130                         fib_table=0)
1131         localsid.add_vpp_config()
1132         # log the localsids
1133         self.logger.debug(self.vapi.cli("show sr localsid"))
1134
1135         # send one packet per packet size
1136         count = len(self.pg_packet_sizes)
1137         pkts = []
1138
1139         # packets with SRH, segments-left 0, active segment a3::c4
1140         # L2 has no dot1q header
1141         packet_header = self.create_packet_header_IPv6_SRH_L2(
1142                             sidlist=['a3::c4', 'a2::', 'a1::'],
1143                             segleft=0,
1144                             vlan=0)
1145         # add to traffic stream pg0->pg1
1146         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1147                                        self.pg_packet_sizes, count))
1148
1149         # packets with SRH, segments-left 0, active segment a3::c4
1150         # L2 has dot1q header
1151         packet_header = self.create_packet_header_IPv6_SRH_L2(
1152                             sidlist=['a3::c4', 'a2::', 'a1::'],
1153                             segleft=0,
1154                             vlan=123)
1155         # add to traffic stream pg0->pg1
1156         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1157                                        self.pg_packet_sizes, count))
1158
1159         # packets without SRH, L2 in IPv6
1160         # outer IPv6 dest addr is the localsid End.DX2
1161         # L2 has no dot1q header
1162         packet_header = self.create_packet_header_IPv6_L2(
1163                                             dst_outer='a3::c4',
1164                                             vlan=0)
1165         # add to traffic stream pg0->pg1
1166         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1167                                        self.pg_packet_sizes, count))
1168
1169         # packets without SRH, L2 in IPv6
1170         # outer IPv6 dest addr is the localsid End.DX2
1171         # L2 has dot1q header
1172         packet_header = self.create_packet_header_IPv6_L2(
1173                                             dst_outer='a3::c4',
1174                                             vlan=123)
1175         # add to traffic stream pg0->pg1
1176         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1177                                        self.pg_packet_sizes, count))
1178
1179         # send packets and verify received packets
1180         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
1181                                   self.compare_rx_tx_packet_End_DX2)
1182
1183         # log the localsid counters
1184         self.logger.info(self.vapi.cli("show sr localsid"))
1185
1186         # remove SRv6 localSIDs
1187         localsid.remove_vpp_config()
1188
1189         # cleanup interfaces
1190         self.teardown_interfaces()
1191
1192     @unittest.skipUnless(0, "PC to fix")
1193     def test_SRv6_T_Insert_Classifier(self):
1194         """ Test SRv6 Transit.Insert behavior (IPv6 only).
1195             steer packets using the classifier
1196         """
1197         # send traffic to one destination interface
1198         # source and destination are IPv6 only
1199         self.setup_interfaces(ipv6=[False, False, False, True, True])
1200
1201         # configure FIB entries
1202         route = VppIpRoute(self, "a4::", 64,
1203                            [VppRoutePath(self.pg4.remote_ip6,
1204                                          self.pg4.sw_if_index,
1205                                          proto=DpoProto.DPO_PROTO_IP6)],
1206                            is_ip6=1)
1207         route.add_vpp_config()
1208
1209         # configure encaps IPv6 source address
1210         # needs to be done before SR Policy config
1211         # TODO: API?
1212         self.vapi.cli("set sr encaps source addr a3::")
1213
1214         bsid = 'a3::9999:1'
1215         # configure SRv6 Policy
1216         # Note: segment list order: first -> last
1217         sr_policy = VppSRv6Policy(
1218             self, bsid=bsid,
1219             is_encap=0,
1220             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
1221             weight=1, fib_table=0,
1222             segments=['a4::', 'a5::', 'a6::c7'],
1223             source='a3::')
1224         sr_policy.add_vpp_config()
1225         self.sr_policy = sr_policy
1226
1227         # log the sr policies
1228         self.logger.info(self.vapi.cli("show sr policies"))
1229
1230         # add classify table
1231         # mask on dst ip address prefix a7::/8
1232         mask = '{!s:0<16}'.format('ff')
1233         r = self.vapi.classify_add_del_table(
1234             1,
1235             binascii.unhexlify(mask),
1236             match_n_vectors=(len(mask) - 1) // 32 + 1,
1237             current_data_flag=1,
1238             skip_n_vectors=2)  # data offset
1239         self.assertIsNotNone(r, 'No response msg for add_del_table')
1240         table_index = r.new_table_index
1241
1242         # add the source routing node as a ip6 inacl netxt node
1243         r = self.vapi.add_node_next('ip6-inacl',
1244                                     'sr-pl-rewrite-insert')
1245         inacl_next_node_index = r.node_index
1246
1247         match = '{!s:0<16}'.format('a7')
1248         r = self.vapi.classify_add_del_session(
1249             1,
1250             table_index,
1251             binascii.unhexlify(match),
1252             hit_next_index=inacl_next_node_index,
1253             action=3,
1254             metadata=0)  # sr policy index
1255         self.assertIsNotNone(r, 'No response msg for add_del_session')
1256
1257         # log the classify table used in the steering policy
1258         self.logger.info(self.vapi.cli("show classify table"))
1259
1260         r = self.vapi.input_acl_set_interface(
1261             is_add=1,
1262             sw_if_index=self.pg3.sw_if_index,
1263             ip6_table_index=table_index)
1264         self.assertIsNotNone(r,
1265                              'No response msg for input_acl_set_interface')
1266
1267         # log the ip6 inacl
1268         self.logger.info(self.vapi.cli("show inacl type ip6"))
1269
1270         # create packets
1271         count = len(self.pg_packet_sizes)
1272         dst_inner = 'a7::1234'
1273         pkts = []
1274
1275         # create IPv6 packets without SRH
1276         packet_header = self.create_packet_header_IPv6(dst_inner)
1277         # create traffic stream pg3->pg4
1278         pkts.extend(self.create_stream(self.pg3, self.pg4, packet_header,
1279                                        self.pg_packet_sizes, count))
1280
1281         # create IPv6 packets with SRH
1282         # packets with segments-left 1, active segment a7::
1283         packet_header = self.create_packet_header_IPv6_SRH(
1284             sidlist=['a8::', 'a7::', 'a6::'],
1285             segleft=1)
1286         # create traffic stream pg3->pg4
1287         pkts.extend(self.create_stream(self.pg3, self.pg4, packet_header,
1288                                        self.pg_packet_sizes, count))
1289
1290         # send packets and verify received packets
1291         self.send_and_verify_pkts(self.pg3, pkts, self.pg4,
1292                                   self.compare_rx_tx_packet_T_Insert)
1293
1294         # remove the interface l2 input feature
1295         r = self.vapi.input_acl_set_interface(
1296             is_add=0,
1297             sw_if_index=self.pg3.sw_if_index,
1298             ip6_table_index=table_index)
1299         self.assertIsNotNone(r,
1300                              'No response msg for input_acl_set_interface')
1301
1302         # log the ip6 inacl after cleaning
1303         self.logger.info(self.vapi.cli("show inacl type ip6"))
1304
1305         # log the localsid counters
1306         self.logger.info(self.vapi.cli("show sr localsid"))
1307
1308         # remove classifier SR steering
1309         # classifier_steering.remove_vpp_config()
1310         self.logger.info(self.vapi.cli("show sr steering policies"))
1311
1312         # remove SR Policies
1313         self.sr_policy.remove_vpp_config()
1314         self.logger.info(self.vapi.cli("show sr policies"))
1315
1316         # remove classify session and table
1317         r = self.vapi.classify_add_del_session(
1318             0,
1319             table_index,
1320             binascii.unhexlify(match))
1321         self.assertIsNotNone(r, 'No response msg for add_del_session')
1322
1323         r = self.vapi.classify_add_del_table(
1324             0,
1325             binascii.unhexlify(mask),
1326             table_index=table_index)
1327         self.assertIsNotNone(r, 'No response msg for add_del_table')
1328
1329         self.logger.info(self.vapi.cli("show classify table"))
1330
1331         # remove FIB entries
1332         # done by tearDown
1333
1334         # cleanup interfaces
1335         self.teardown_interfaces()
1336
1337     def compare_rx_tx_packet_T_Encaps(self, tx_pkt, rx_pkt):
1338         """ Compare input and output packet after passing T.Encaps
1339
1340         :param tx_pkt: transmitted packet
1341         :param rx_pkt: received packet
1342         """
1343         # T.Encaps updates the headers as follows:
1344         # SR Policy seglist (S3, S2, S1)
1345         # SR Policy source C
1346         # IPv6:
1347         # in: IPv6(A, B2)
1348         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(A, B2)
1349         # IPv6 + SRH:
1350         # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1)
1351         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(a, B2)SRH(B3, B2, B1; SL=1)
1352
1353         # get first (outer) IPv6 header of rx'ed packet
1354         rx_ip = rx_pkt.getlayer(IPv6)
1355         rx_srh = None
1356
1357         tx_ip = tx_pkt.getlayer(IPv6)
1358
1359         # expected segment-list
1360         seglist = self.sr_policy.segments
1361         # reverse list to get order as in SRH
1362         tx_seglist = seglist[::-1]
1363
1364         # get source address of SR Policy
1365         sr_policy_source = self.sr_policy.source
1366
1367         # rx'ed packet should have SRH
1368         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1369         # get SRH
1370         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1371
1372         # received ip.src should be equal to SR Policy source
1373         self.assertEqual(rx_ip.src, sr_policy_source)
1374         # received ip.dst should be equal to expected sidlist[lastentry]
1375         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1376         # rx'ed seglist should be equal to expected seglist
1377         self.assertEqual(rx_srh.addresses, tx_seglist)
1378         # segleft should be equal to size expected seglist-1
1379         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1380         # segleft should be equal to lastentry
1381         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1382
1383         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1384         # except for the hop-limit field
1385         #   -> update tx'ed hlim to the expected hlim
1386         tx_ip.hlim = tx_ip.hlim - 1
1387
1388         self.assertEqual(rx_srh.payload, tx_ip)
1389
1390         self.logger.debug("packet verification: SUCCESS")
1391
1392     def compare_rx_tx_packet_T_Encaps_IPv4(self, tx_pkt, rx_pkt):
1393         """ Compare input and output packet after passing T.Encaps for IPv4
1394
1395         :param tx_pkt: transmitted packet
1396         :param rx_pkt: received packet
1397         """
1398         # T.Encaps for IPv4 updates the headers as follows:
1399         # SR Policy seglist (S3, S2, S1)
1400         # SR Policy source C
1401         # IPv4:
1402         # in: IPv4(A, B2)
1403         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv4(A, B2)
1404
1405         # get first (outer) IPv6 header of rx'ed packet
1406         rx_ip = rx_pkt.getlayer(IPv6)
1407         rx_srh = None
1408
1409         tx_ip = tx_pkt.getlayer(IP)
1410
1411         # expected segment-list
1412         seglist = self.sr_policy.segments
1413         # reverse list to get order as in SRH
1414         tx_seglist = seglist[::-1]
1415
1416         # get source address of SR Policy
1417         sr_policy_source = self.sr_policy.source
1418
1419         # checks common to cases tx with and without SRH
1420         # rx'ed packet should have SRH and IPv4 header
1421         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1422         self.assertTrue(rx_ip.payload.haslayer(IP))
1423         # get SRH
1424         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1425
1426         # received ip.src should be equal to SR Policy source
1427         self.assertEqual(rx_ip.src, sr_policy_source)
1428         # received ip.dst should be equal to sidlist[lastentry]
1429         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1430         # rx'ed seglist should be equal to seglist
1431         self.assertEqual(rx_srh.addresses, tx_seglist)
1432         # segleft should be equal to size seglist-1
1433         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1434         # segleft should be equal to lastentry
1435         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1436
1437         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1438         # except for the ttl field and ip checksum
1439         #   -> adjust tx'ed ttl to expected ttl
1440         tx_ip.ttl = tx_ip.ttl - 1
1441         #   -> set tx'ed ip checksum to None and let scapy recompute
1442         tx_ip.chksum = None
1443         # read back the pkt (with str()) to force computing these fields
1444         # probably other ways to accomplish this are possible
1445         tx_ip = IP(scapy.compat.raw(tx_ip))
1446
1447         self.assertEqual(rx_srh.payload, tx_ip)
1448
1449         self.logger.debug("packet verification: SUCCESS")
1450
1451     def compare_rx_tx_packet_T_Encaps_L2(self, tx_pkt, rx_pkt):
1452         """ Compare input and output packet after passing T.Encaps for L2
1453
1454         :param tx_pkt: transmitted packet
1455         :param rx_pkt: received packet
1456         """
1457         # T.Encaps for L2 updates the headers as follows:
1458         # SR Policy seglist (S3, S2, S1)
1459         # SR Policy source C
1460         # L2:
1461         # in: L2
1462         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)L2
1463
1464         # get first (outer) IPv6 header of rx'ed packet
1465         rx_ip = rx_pkt.getlayer(IPv6)
1466         rx_srh = None
1467
1468         tx_ether = tx_pkt.getlayer(Ether)
1469
1470         # expected segment-list
1471         seglist = self.sr_policy.segments
1472         # reverse list to get order as in SRH
1473         tx_seglist = seglist[::-1]
1474
1475         # get source address of SR Policy
1476         sr_policy_source = self.sr_policy.source
1477
1478         # rx'ed packet should have SRH
1479         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1480         # get SRH
1481         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1482
1483         # received ip.src should be equal to SR Policy source
1484         self.assertEqual(rx_ip.src, sr_policy_source)
1485         # received ip.dst should be equal to sidlist[lastentry]
1486         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1487         # rx'ed seglist should be equal to seglist
1488         self.assertEqual(rx_srh.addresses, tx_seglist)
1489         # segleft should be equal to size seglist-1
1490         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1491         # segleft should be equal to lastentry
1492         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1493         # nh should be "No Next Header" (59)
1494         self.assertEqual(rx_srh.nh, 59)
1495
1496         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1497         self.assertEqual(Ether(scapy.compat.raw(rx_srh.payload)), tx_ether)
1498
1499         self.logger.debug("packet verification: SUCCESS")
1500
1501     def compare_rx_tx_packet_T_Insert(self, tx_pkt, rx_pkt):
1502         """ Compare input and output packet after passing T.Insert
1503
1504         :param tx_pkt: transmitted packet
1505         :param rx_pkt: received packet
1506         """
1507         # T.Insert updates the headers as follows:
1508         # IPv6:
1509         # in: IPv6(A, B2)
1510         # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)
1511         # IPv6 + SRH:
1512         # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1)
1513         # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)SRH(B3, B2, B1; SL=1)
1514
1515         # get first (outer) IPv6 header of rx'ed packet
1516         rx_ip = rx_pkt.getlayer(IPv6)
1517         rx_srh = None
1518         rx_ip2 = None
1519         rx_srh2 = None
1520         rx_ip3 = None
1521         rx_udp = rx_pkt[UDP]
1522
1523         tx_ip = tx_pkt.getlayer(IPv6)
1524         tx_srh = None
1525         tx_ip2 = None
1526         # some packets have been tx'ed with an SRH, some without it
1527         # get SRH if tx'ed packet has it
1528         if tx_pkt.haslayer(IPv6ExtHdrSegmentRouting):
1529             tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1530             tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1531         tx_udp = tx_pkt[UDP]
1532
1533         # expected segment-list (make copy of SR Policy segment list)
1534         seglist = self.sr_policy.segments[:]
1535         # expected seglist has initial dest addr as last segment
1536         seglist.append(tx_ip.dst)
1537         # reverse list to get order as in SRH
1538         tx_seglist = seglist[::-1]
1539
1540         # get source address of SR Policy
1541         sr_policy_source = self.sr_policy.source
1542
1543         # checks common to cases tx with and without SRH
1544         # rx'ed packet should have SRH and only one IPv6 header
1545         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1546         self.assertFalse(rx_ip.payload.haslayer(IPv6))
1547         # get SRH
1548         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1549
1550         # rx'ed ip.src should be equal to tx'ed ip.src
1551         self.assertEqual(rx_ip.src, tx_ip.src)
1552         # rx'ed ip.dst should be equal to sidlist[lastentry]
1553         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1554
1555         # rx'ed seglist should be equal to expected seglist
1556         self.assertEqual(rx_srh.addresses, tx_seglist)
1557         # segleft should be equal to size(expected seglist)-1
1558         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1559         # segleft should be equal to lastentry
1560         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1561
1562         if tx_srh:  # packet was tx'ed with SRH
1563             # packet should have 2nd SRH
1564             self.assertTrue(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
1565             # get 2nd SRH
1566             rx_srh2 = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting, 2)
1567
1568             # rx'ed srh2.addresses should be equal to tx'ed srh.addresses
1569             self.assertEqual(rx_srh2.addresses, tx_srh.addresses)
1570             # rx'ed srh2.segleft should be equal to tx'ed srh.segleft
1571             self.assertEqual(rx_srh2.segleft, tx_srh.segleft)
1572             # rx'ed srh2.lastentry should be equal to tx'ed srh.lastentry
1573             self.assertEqual(rx_srh2.lastentry, tx_srh.lastentry)
1574
1575         else:  # packet was tx'ed without SRH
1576             # rx packet should have no other SRH
1577             self.assertFalse(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
1578
1579         # UDP layer should be unchanged
1580         self.assertEqual(rx_udp, tx_udp)
1581
1582         self.logger.debug("packet verification: SUCCESS")
1583
1584     def compare_rx_tx_packet_End(self, tx_pkt, rx_pkt):
1585         """ Compare input and output packet after passing End (without PSP)
1586
1587         :param tx_pkt: transmitted packet
1588         :param rx_pkt: received packet
1589         """
1590         # End (no PSP) updates the headers as follows:
1591         # IPv6 + SRH:
1592         # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2)
1593         # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1594
1595         # get first (outer) IPv6 header of rx'ed packet
1596         rx_ip = rx_pkt.getlayer(IPv6)
1597         rx_srh = None
1598         rx_ip2 = None
1599         rx_udp = rx_pkt[UDP]
1600
1601         tx_ip = tx_pkt.getlayer(IPv6)
1602         # we know the packet has been tx'ed
1603         # with an inner IPv6 header and an SRH
1604         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1605         tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1606         tx_udp = tx_pkt[UDP]
1607
1608         # common checks, regardless of tx segleft value
1609         # rx'ed packet should have 2nd IPv6 header
1610         self.assertTrue(rx_ip.payload.haslayer(IPv6))
1611         # get second (inner) IPv6 header
1612         rx_ip2 = rx_pkt.getlayer(IPv6, 2)
1613
1614         if tx_ip.segleft > 0:
1615             # SRH should NOT have been popped:
1616             #   End SID without PSP does not pop SRH if segleft>0
1617             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1618             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1619
1620             # received ip.src should be equal to expected ip.src
1621             self.assertEqual(rx_ip.src, tx_ip.src)
1622             # sidlist should be unchanged
1623             self.assertEqual(rx_srh.addresses, tx_srh.addresses)
1624             # segleft should have been decremented
1625             self.assertEqual(rx_srh.segleft, tx_srh.segleft-1)
1626             # received ip.dst should be equal to sidlist[segleft]
1627             self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
1628             # lastentry should be unchanged
1629             self.assertEqual(rx_srh.lastentry, tx_srh.lastentry)
1630             # inner IPv6 packet (ip2) should be unchanged
1631             self.assertEqual(rx_ip2.src, tx_ip2.src)
1632             self.assertEqual(rx_ip2.dst, tx_ip2.dst)
1633         # else:  # tx_ip.segleft == 0
1634             # TODO: Does this work with 2 SRHs in ingress packet?
1635
1636         # UDP layer should be unchanged
1637         self.assertEqual(rx_udp, tx_udp)
1638
1639         self.logger.debug("packet verification: SUCCESS")
1640
1641     def compare_rx_tx_packet_End_PSP(self, tx_pkt, rx_pkt):
1642         """ Compare input and output packet after passing End with PSP
1643
1644         :param tx_pkt: transmitted packet
1645         :param rx_pkt: received packet
1646         """
1647         # End (PSP) updates the headers as follows:
1648         # IPv6 + SRH (SL>1):
1649         # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2)
1650         # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1651         # IPv6 + SRH (SL=1):
1652         # in: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1653         # out: IPv6(A, S3)
1654
1655         # get first (outer) IPv6 header of rx'ed packet
1656         rx_ip = rx_pkt.getlayer(IPv6)
1657         rx_srh = None
1658         rx_ip2 = None
1659         rx_udp = rx_pkt[UDP]
1660
1661         tx_ip = tx_pkt.getlayer(IPv6)
1662         # we know the packet has been tx'ed
1663         # with an inner IPv6 header and an SRH
1664         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1665         tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1666         tx_udp = tx_pkt[UDP]
1667
1668         # common checks, regardless of tx segleft value
1669         self.assertTrue(rx_ip.payload.haslayer(IPv6))
1670         rx_ip2 = rx_pkt.getlayer(IPv6, 2)
1671         # inner IPv6 packet (ip2) should be unchanged
1672         self.assertEqual(rx_ip2.src, tx_ip2.src)
1673         self.assertEqual(rx_ip2.dst, tx_ip2.dst)
1674
1675         if tx_ip.segleft > 1:
1676             # SRH should NOT have been popped:
1677             #   End SID with PSP does not pop SRH if segleft>1
1678             # rx'ed packet should have SRH
1679             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1680             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1681
1682             # received ip.src should be equal to expected ip.src
1683             self.assertEqual(rx_ip.src, tx_ip.src)
1684             # sidlist should be unchanged
1685             self.assertEqual(rx_srh.addresses, tx_srh.addresses)
1686             # segleft should have been decremented
1687             self.assertEqual(rx_srh.segleft, tx_srh.segleft-1)
1688             # received ip.dst should be equal to sidlist[segleft]
1689             self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
1690             # lastentry should be unchanged
1691             self.assertEqual(rx_srh.lastentry, tx_srh.lastentry)
1692
1693         else:  # tx_ip.segleft <= 1
1694             # SRH should have been popped:
1695             #   End SID with PSP and segleft=1 pops SRH
1696             # the two IPv6 headers are still present
1697             # outer IPv6 header has DA == last segment of popped SRH
1698             # SRH should not be present
1699             self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1700             # outer IPv6 header ip.src should be equal to tx'ed ip.src
1701             self.assertEqual(rx_ip.src, tx_ip.src)
1702             # outer IPv6 header ip.dst should be = to tx'ed sidlist[segleft-1]
1703             self.assertEqual(rx_ip.dst, tx_srh.addresses[tx_srh.segleft-1])
1704
1705         # UDP layer should be unchanged
1706         self.assertEqual(rx_udp, tx_udp)
1707
1708         self.logger.debug("packet verification: SUCCESS")
1709
1710     def compare_rx_tx_packet_End_DX6(self, tx_pkt, rx_pkt):
1711         """ Compare input and output packet after passing End.DX6
1712
1713         :param tx_pkt: transmitted packet
1714         :param rx_pkt: received packet
1715         """
1716         # End.DX6 updates the headers as follows:
1717         # IPv6 + SRH (SL=0):
1718         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv6(B, D)
1719         # out: IPv6(B, D)
1720         # IPv6:
1721         # in: IPv6(A, S3)IPv6(B, D)
1722         # out: IPv6(B, D)
1723
1724         # get first (outer) IPv6 header of rx'ed packet
1725         rx_ip = rx_pkt.getlayer(IPv6)
1726
1727         tx_ip = tx_pkt.getlayer(IPv6)
1728         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1729
1730         # verify if rx'ed packet has no SRH
1731         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1732
1733         # the whole rx_ip pkt should be equal to tx_ip2
1734         # except for the hlim field
1735         #   -> adjust tx'ed hlim to expected hlim
1736         tx_ip2.hlim = tx_ip2.hlim - 1
1737
1738         self.assertEqual(rx_ip, tx_ip2)
1739
1740         self.logger.debug("packet verification: SUCCESS")
1741
1742     def compare_rx_tx_packet_End_DX4(self, tx_pkt, rx_pkt):
1743         """ Compare input and output packet after passing End.DX4
1744
1745         :param tx_pkt: transmitted packet
1746         :param rx_pkt: received packet
1747         """
1748         # End.DX4 updates the headers as follows:
1749         # IPv6 + SRH (SL=0):
1750         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv4(B, D)
1751         # out: IPv4(B, D)
1752         # IPv6:
1753         # in: IPv6(A, S3)IPv4(B, D)
1754         # out: IPv4(B, D)
1755
1756         # get IPv4 header of rx'ed packet
1757         rx_ip = rx_pkt.getlayer(IP)
1758
1759         tx_ip = tx_pkt.getlayer(IPv6)
1760         tx_ip2 = tx_pkt.getlayer(IP)
1761
1762         # verify if rx'ed packet has no SRH
1763         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1764
1765         # the whole rx_ip pkt should be equal to tx_ip2
1766         # except for the ttl field and ip checksum
1767         #   -> adjust tx'ed ttl to expected ttl
1768         tx_ip2.ttl = tx_ip2.ttl - 1
1769         #   -> set tx'ed ip checksum to None and let scapy recompute
1770         tx_ip2.chksum = None
1771         # read back the pkt (with str()) to force computing these fields
1772         # probably other ways to accomplish this are possible
1773         tx_ip2 = IP(scapy.compat.raw(tx_ip2))
1774
1775         self.assertEqual(rx_ip, tx_ip2)
1776
1777         self.logger.debug("packet verification: SUCCESS")
1778
1779     def compare_rx_tx_packet_End_DX2(self, tx_pkt, rx_pkt):
1780         """ Compare input and output packet after passing End.DX2
1781
1782         :param tx_pkt: transmitted packet
1783         :param rx_pkt: received packet
1784         """
1785         # End.DX2 updates the headers as follows:
1786         # IPv6 + SRH (SL=0):
1787         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)L2
1788         # out: L2
1789         # IPv6:
1790         # in: IPv6(A, S3)L2
1791         # out: L2
1792
1793         # get IPv4 header of rx'ed packet
1794         rx_eth = rx_pkt.getlayer(Ether)
1795
1796         tx_ip = tx_pkt.getlayer(IPv6)
1797         # we can't just get the 2nd Ether layer
1798         # get the Raw content and dissect it as Ether
1799         tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw]))
1800
1801         # verify if rx'ed packet has no SRH
1802         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1803
1804         # the whole rx_eth pkt should be equal to tx_eth1
1805         self.assertEqual(rx_eth, tx_eth1)
1806
1807         self.logger.debug("packet verification: SUCCESS")
1808
1809     def create_stream(self, src_if, dst_if, packet_header, packet_sizes,
1810                       count):
1811         """Create SRv6 input packet stream for defined interface.
1812
1813         :param VppInterface src_if: Interface to create packet stream for
1814         :param VppInterface dst_if: destination interface of packet stream
1815         :param packet_header: Layer3 scapy packet headers,
1816                 L2 is added when not provided,
1817                 Raw(payload) with packet_info is added
1818         :param list packet_sizes: packet stream pckt sizes,sequentially applied
1819                to packets in stream have
1820         :param int count: number of packets in packet stream
1821         :return: list of packets
1822         """
1823         self.logger.info("Creating packets")
1824         pkts = []
1825         for i in range(0, count-1):
1826             payload_info = self.create_packet_info(src_if, dst_if)
1827             self.logger.debug(
1828                 "Creating packet with index %d" % (payload_info.index))
1829             payload = self.info_to_payload(payload_info)
1830             # add L2 header if not yet provided in packet_header
1831             if packet_header.getlayer(0).name == 'Ethernet':
1832                 p = (packet_header /
1833                      Raw(payload))
1834             else:
1835                 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
1836                      packet_header /
1837                      Raw(payload))
1838             size = packet_sizes[i % len(packet_sizes)]
1839             self.logger.debug("Packet size %d" % (size))
1840             self.extend_packet(p, size)
1841             # we need to store the packet with the automatic fields computed
1842             # read back the dumped packet (with str())
1843             # to force computing these fields
1844             # probably other ways are possible
1845             p = Ether(scapy.compat.raw(p))
1846             payload_info.data = p.copy()
1847             self.logger.debug(ppp("Created packet:", p))
1848             pkts.append(p)
1849         self.logger.info("Done creating packets")
1850         return pkts
1851
1852     def send_and_verify_pkts(self, input, pkts, output, compare_func):
1853         """Send packets and verify received packets using compare_func
1854
1855         :param input: ingress interface of DUT
1856         :param pkts: list of packets to transmit
1857         :param output: egress interface of DUT
1858         :param compare_func: function to compare in and out packets
1859         """
1860         # add traffic stream to input interface
1861         input.add_stream(pkts)
1862
1863         # enable capture on all interfaces
1864         self.pg_enable_capture(self.pg_interfaces)
1865
1866         # start traffic
1867         self.logger.info("Starting traffic")
1868         self.pg_start()
1869
1870         # get output capture
1871         self.logger.info("Getting packet capture")
1872         capture = output.get_capture()
1873
1874         # assert nothing was captured on input interface
1875         input.assert_nothing_captured()
1876
1877         # verify captured packets
1878         self.verify_captured_pkts(output, capture, compare_func)
1879
1880     def create_packet_header_IPv6(self, dst):
1881         """Create packet header: IPv6 header, UDP header
1882
1883         :param dst: IPv6 destination address
1884
1885         IPv6 source address is 1234::1
1886         UDP source port and destination port are 1234
1887         """
1888
1889         p = (IPv6(src='1234::1', dst=dst) /
1890              UDP(sport=1234, dport=1234))
1891         return p
1892
1893     def create_packet_header_IPv6_SRH(self, sidlist, segleft):
1894         """Create packet header: IPv6 header with SRH, UDP header
1895
1896         :param list sidlist: segment list
1897         :param int segleft: segments-left field value
1898
1899         IPv6 destination address is set to sidlist[segleft]
1900         IPv6 source addresses are 1234::1 and 4321::1
1901         UDP source port and destination port are 1234
1902         """
1903
1904         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
1905              IPv6ExtHdrSegmentRouting(addresses=sidlist) /
1906              UDP(sport=1234, dport=1234))
1907         return p
1908
1909     def create_packet_header_IPv6_SRH_IPv6(self, dst, sidlist, segleft):
1910         """Create packet header: IPv6 encapsulated in SRv6:
1911         IPv6 header with SRH, IPv6 header, UDP header
1912
1913         :param ipv6address dst: inner IPv6 destination address
1914         :param list sidlist: segment list of outer IPv6 SRH
1915         :param int segleft: segments-left field of outer IPv6 SRH
1916
1917         Outer IPv6 destination address is set to sidlist[segleft]
1918         IPv6 source addresses are 1234::1 and 4321::1
1919         UDP source port and destination port are 1234
1920         """
1921
1922         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
1923              IPv6ExtHdrSegmentRouting(addresses=sidlist,
1924                                       segleft=segleft, nh=41) /
1925              IPv6(src='4321::1', dst=dst) /
1926              UDP(sport=1234, dport=1234))
1927         return p
1928
1929     def create_packet_header_IPv6_IPv6(self, dst_inner, dst_outer):
1930         """Create packet header: IPv6 encapsulated in IPv6:
1931         IPv6 header, IPv6 header, UDP header
1932
1933         :param ipv6address dst_inner: inner IPv6 destination address
1934         :param ipv6address dst_outer: outer IPv6 destination address
1935
1936         IPv6 source addresses are 1234::1 and 4321::1
1937         UDP source port and destination port are 1234
1938         """
1939
1940         p = (IPv6(src='1234::1', dst=dst_outer) /
1941              IPv6(src='4321::1', dst=dst_inner) /
1942              UDP(sport=1234, dport=1234))
1943         return p
1944
1945     def create_packet_header_IPv6_SRH_SRH_IPv6(self, dst, sidlist1, segleft1,
1946                                                sidlist2, segleft2):
1947         """Create packet header: IPv6 encapsulated in SRv6 with 2 SRH:
1948         IPv6 header with SRH, 2nd SRH, IPv6 header, UDP header
1949
1950         :param ipv6address dst: inner IPv6 destination address
1951         :param list sidlist1: segment list of outer IPv6 SRH
1952         :param int segleft1: segments-left field of outer IPv6 SRH
1953         :param list sidlist2: segment list of inner IPv6 SRH
1954         :param int segleft2: segments-left field of inner IPv6 SRH
1955
1956         Outer IPv6 destination address is set to sidlist[segleft]
1957         IPv6 source addresses are 1234::1 and 4321::1
1958         UDP source port and destination port are 1234
1959         """
1960
1961         p = (IPv6(src='1234::1', dst=sidlist1[segleft1]) /
1962              IPv6ExtHdrSegmentRouting(addresses=sidlist1,
1963                                       segleft=segleft1, nh=43) /
1964              IPv6ExtHdrSegmentRouting(addresses=sidlist2,
1965                                       segleft=segleft2, nh=41) /
1966              IPv6(src='4321::1', dst=dst) /
1967              UDP(sport=1234, dport=1234))
1968         return p
1969
1970     def create_packet_header_IPv4(self, dst):
1971         """Create packet header: IPv4 header, UDP header
1972
1973         :param dst: IPv4 destination address
1974
1975         IPv4 source address is 123.1.1.1
1976         UDP source port and destination port are 1234
1977         """
1978
1979         p = (IP(src='123.1.1.1', dst=dst) /
1980              UDP(sport=1234, dport=1234))
1981         return p
1982
1983     def create_packet_header_IPv6_IPv4(self, dst_inner, dst_outer):
1984         """Create packet header: IPv4 encapsulated in IPv6:
1985         IPv6 header, IPv4 header, UDP header
1986
1987         :param ipv4address dst_inner: inner IPv4 destination address
1988         :param ipv6address dst_outer: outer IPv6 destination address
1989
1990         IPv6 source address is 1234::1
1991         IPv4 source address is 123.1.1.1
1992         UDP source port and destination port are 1234
1993         """
1994
1995         p = (IPv6(src='1234::1', dst=dst_outer) /
1996              IP(src='123.1.1.1', dst=dst_inner) /
1997              UDP(sport=1234, dport=1234))
1998         return p
1999
2000     def create_packet_header_IPv6_SRH_IPv4(self, dst, sidlist, segleft):
2001         """Create packet header: IPv4 encapsulated in SRv6:
2002         IPv6 header with SRH, IPv4 header, UDP header
2003
2004         :param ipv4address dst: inner IPv4 destination address
2005         :param list sidlist: segment list of outer IPv6 SRH
2006         :param int segleft: segments-left field of outer IPv6 SRH
2007
2008         Outer IPv6 destination address is set to sidlist[segleft]
2009         IPv6 source address is 1234::1
2010         IPv4 source address is 123.1.1.1
2011         UDP source port and destination port are 1234
2012         """
2013
2014         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
2015              IPv6ExtHdrSegmentRouting(addresses=sidlist,
2016                                       segleft=segleft, nh=4) /
2017              IP(src='123.1.1.1', dst=dst) /
2018              UDP(sport=1234, dport=1234))
2019         return p
2020
2021     def create_packet_header_L2(self, vlan=0):
2022         """Create packet header: L2 header
2023
2024         :param vlan: if vlan!=0 then add 802.1q header
2025         """
2026         # Note: the dst addr ('00:55:44:33:22:11') is used in
2027         # the compare function compare_rx_tx_packet_T_Encaps_L2
2028         # to detect presence of L2 in SRH payload
2029         p = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
2030         etype = 0x8137  # IPX
2031         if vlan:
2032             # add 802.1q layer
2033             p /= Dot1Q(vlan=vlan, type=etype)
2034         else:
2035             p.type = etype
2036         return p
2037
2038     def create_packet_header_IPv6_SRH_L2(self, sidlist, segleft, vlan=0):
2039         """Create packet header: L2 encapsulated in SRv6:
2040         IPv6 header with SRH, L2
2041
2042         :param list sidlist: segment list of outer IPv6 SRH
2043         :param int segleft: segments-left field of outer IPv6 SRH
2044         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
2045
2046         Outer IPv6 destination address is set to sidlist[segleft]
2047         IPv6 source address is 1234::1
2048         """
2049         eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
2050         etype = 0x8137  # IPX
2051         if vlan:
2052             # add 802.1q layer
2053             eth /= Dot1Q(vlan=vlan, type=etype)
2054         else:
2055             eth.type = etype
2056
2057         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
2058              IPv6ExtHdrSegmentRouting(addresses=sidlist,
2059                                       segleft=segleft, nh=59) /
2060              eth)
2061         return p
2062
2063     def create_packet_header_IPv6_L2(self, dst_outer, vlan=0):
2064         """Create packet header: L2 encapsulated in IPv6:
2065         IPv6 header, L2
2066
2067         :param ipv6address dst_outer: outer IPv6 destination address
2068         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
2069         """
2070         eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
2071         etype = 0x8137  # IPX
2072         if vlan:
2073             # add 802.1q layer
2074             eth /= Dot1Q(vlan=vlan, type=etype)
2075         else:
2076             eth.type = etype
2077
2078         p = (IPv6(src='1234::1', dst=dst_outer, nh=59) / eth)
2079         return p
2080
2081     def get_payload_info(self, packet):
2082         """ Extract the payload_info from the packet
2083         """
2084         # in most cases, payload_info is in packet[Raw]
2085         # but packet[Raw] gives the complete payload
2086         # (incl L2 header) for the T.Encaps L2 case
2087         try:
2088             payload_info = self.payload_to_info(packet[Raw])
2089
2090         except:
2091             # remote L2 header from packet[Raw]:
2092             # take packet[Raw], convert it to an Ether layer
2093             # and then extract Raw from it
2094             payload_info = self.payload_to_info(
2095                 Ether(scapy.compat.r(packet[Raw]))[Raw])
2096
2097         return payload_info
2098
2099     def verify_captured_pkts(self, dst_if, capture, compare_func):
2100         """
2101         Verify captured packet stream for specified interface.
2102         Compare ingress with egress packets using the specified compare fn
2103
2104         :param dst_if: egress interface of DUT
2105         :param capture: captured packets
2106         :param compare_func: function to compare in and out packet
2107         """
2108         self.logger.info("Verifying capture on interface %s using function %s"
2109                          % (dst_if.name, compare_func.__name__))
2110
2111         last_info = dict()
2112         for i in self.pg_interfaces:
2113             last_info[i.sw_if_index] = None
2114         dst_sw_if_index = dst_if.sw_if_index
2115
2116         for packet in capture:
2117             try:
2118                 # extract payload_info from packet's payload
2119                 payload_info = self.get_payload_info(packet)
2120                 packet_index = payload_info.index
2121
2122                 self.logger.debug("Verifying packet with index %d"
2123                                   % (packet_index))
2124                 # packet should have arrived on the expected interface
2125                 self.assertEqual(payload_info.dst, dst_sw_if_index)
2126                 self.logger.debug(
2127                     "Got packet on interface %s: src=%u (idx=%u)" %
2128                     (dst_if.name, payload_info.src, packet_index))
2129
2130                 # search for payload_info with same src and dst if_index
2131                 # this will give us the transmitted packet
2132                 next_info = self.get_next_packet_info_for_interface2(
2133                     payload_info.src, dst_sw_if_index,
2134                     last_info[payload_info.src])
2135                 last_info[payload_info.src] = next_info
2136                 # next_info should not be None
2137                 self.assertTrue(next_info is not None)
2138                 # index of tx and rx packets should be equal
2139                 self.assertEqual(packet_index, next_info.index)
2140                 # data field of next_info contains the tx packet
2141                 txed_packet = next_info.data
2142
2143                 self.logger.debug(ppp("Transmitted packet:",
2144                                       txed_packet))  # ppp=Pretty Print Packet
2145
2146                 self.logger.debug(ppp("Received packet:", packet))
2147
2148                 # compare rcvd packet with expected packet using compare_func
2149                 compare_func(txed_packet, packet)
2150
2151             except:
2152                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2153                 raise
2154
2155         # have all expected packets arrived?
2156         for i in self.pg_interfaces:
2157             remaining_packet = self.get_next_packet_info_for_interface2(
2158                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
2159             self.assertTrue(remaining_packet is None,
2160                             "Interface %s: Packet expected from interface %s "
2161                             "didn't arrive" % (dst_if.name, i.name))
2162
2163
2164 if __name__ == '__main__':
2165     unittest.main(testRunner=VppTestRunner)