fib: fib api updates
[vpp.git] / test / test_srv6.py
1 #!/usr/bin/env python
2
3 import unittest
4 import binascii
5 from socket import AF_INET6
6
7 from framework import VppTestCase, VppTestRunner
8 from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathProto, VppIpTable
9 from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
10     SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
11
12 import scapy.compat
13 from scapy.packet import Raw
14 from scapy.layers.l2 import Ether, Dot1Q
15 from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
16 from scapy.layers.inet import IP, UDP
17
18 from scapy.utils import inet_pton, inet_ntop
19
20 from util import ppp
21
22
23 class TestSRv6(VppTestCase):
24     """ SRv6 Test Case """
25
    @classmethod
    def setUpClass(cls):
        """ Class-level setup; only delegates to the parent class. """
        super(TestSRv6, cls).setUpClass()
29
    @classmethod
    def tearDownClass(cls):
        """ Class-level teardown; only delegates to the parent class. """
        super(TestSRv6, cls).tearDownClass()
33
    def setUp(self):
        """ Perform test setup before each test case.

        Runs the parent setUp, defines the packet sizes the tests pass to
        create_stream() and calls reset_packet_infos().
        """
        super(TestSRv6, self).setUp()

        # packet sizes, inclusive L2 overhead
        self.pg_packet_sizes = [64, 512, 1518, 9018]

        # reset packet_infos
        self.reset_packet_infos()
44
    def tearDown(self):
        """ Clean up test setup after each test case.

        The interfaces are torn down first, then the parent tearDown runs.
        """
        # NOTE(review): the tests visible in this class also call
        # teardown_interfaces() as their last step, so after a passing
        # test the interfaces are torn down a second time here
        self.teardown_interfaces()

        super(TestSRv6, self).tearDown()
51
52     def configure_interface(self,
53                             interface,
54                             ipv6=False, ipv4=False,
55                             ipv6_table_id=0, ipv4_table_id=0):
56         """ Configure interface.
57         :param ipv6: configure IPv6 on interface
58         :param ipv4: configure IPv4 on interface
59         :param ipv6_table_id: FIB table_id for IPv6
60         :param ipv4_table_id: FIB table_id for IPv4
61         """
62         self.logger.debug("Configuring interface %s" % (interface.name))
63         if ipv6:
64             self.logger.debug("Configuring IPv6")
65             interface.set_table_ip6(ipv6_table_id)
66             interface.config_ip6()
67             interface.resolve_ndp(timeout=5)
68         if ipv4:
69             self.logger.debug("Configuring IPv4")
70             interface.set_table_ip4(ipv4_table_id)
71             interface.config_ip4()
72             interface.resolve_arp()
73         interface.admin_up()
74
75     def setup_interfaces(self, ipv6=[], ipv4=[],
76                          ipv6_table_id=[], ipv4_table_id=[]):
77         """ Create and configure interfaces.
78
79         :param ipv6: list of interface IPv6 capabilities
80         :param ipv4: list of interface IPv4 capabilities
81         :param ipv6_table_id: list of intf IPv6 FIB table_ids
82         :param ipv4_table_id: list of intf IPv4 FIB table_ids
83         :returns: List of created interfaces.
84         """
85         # how many interfaces?
86         if len(ipv6):
87             count = len(ipv6)
88         else:
89             count = len(ipv4)
90         self.logger.debug("Creating and configuring %d interfaces" % (count))
91
92         # fill up ipv6 and ipv4 lists if needed
93         # not enabled (False) is the default
94         if len(ipv6) < count:
95             ipv6 += (count - len(ipv6)) * [False]
96         if len(ipv4) < count:
97             ipv4 += (count - len(ipv4)) * [False]
98
99         # fill up table_id lists if needed
100         # table_id 0 (global) is the default
101         if len(ipv6_table_id) < count:
102             ipv6_table_id += (count - len(ipv6_table_id)) * [0]
103         if len(ipv4_table_id) < count:
104             ipv4_table_id += (count - len(ipv4_table_id)) * [0]
105
106         # create 'count' pg interfaces
107         self.create_pg_interfaces(range(count))
108
109         # setup all interfaces
110         for i in range(count):
111             intf = self.pg_interfaces[i]
112             self.configure_interface(intf,
113                                      ipv6[i], ipv4[i],
114                                      ipv6_table_id[i], ipv4_table_id[i])
115
116         if any(ipv6):
117             self.logger.debug(self.vapi.cli("show ip6 neighbors"))
118         if any(ipv4):
119             self.logger.debug(self.vapi.cli("show ip arp"))
120         self.logger.debug(self.vapi.cli("show interface"))
121         self.logger.debug(self.vapi.cli("show hardware"))
122
123         return self.pg_interfaces
124
125     def teardown_interfaces(self):
126         """ Unconfigure and bring down interface.
127         """
128         self.logger.debug("Tearing down interfaces")
129         # tear down all interfaces
130         # AFAIK they cannot be deleted
131         for i in self.pg_interfaces:
132             self.logger.debug("Tear down interface %s" % (i.name))
133             i.admin_down()
134             i.unconfig()
135             i.set_table_ip4(0)
136             i.set_table_ip6(0)
137
    @unittest.skipUnless(0, "PC to fix")
    def test_SRv6_T_Encaps(self):
        """ Test SRv6 Transit.Encaps behavior for IPv6.

        Currently disabled: the skipUnless(0, ...) condition is always
        false, so the test is always skipped ("PC to fix").

        Three kinds of packets are sent from pg0 (plain IPv6, IPv6 with
        SRH, IPv6 with SRH and inner IPv6) and handed, together with
        compare_rx_tx_packet_T_Encaps, to send_and_verify_pkts() for
        checking on pg1.
        """
        # send traffic to one destination interface
        # source and destination are IPv6 only
        self.setup_interfaces(ipv6=[True, True])

        # configure FIB entries
        # a4::/64 (contains the first segment of the SR policy) via pg1
        route = VppIpRoute(self, "a4::", 64,
                           [VppRoutePath(self.pg1.remote_ip6,
                                         self.pg1.sw_if_index)])
        route.add_vpp_config()

        # configure encaps IPv6 source address
        # needs to be done before SR Policy config
        # TODO: API?
        self.vapi.cli("set sr encaps source addr a3::")

        bsid = 'a3::9999:1'
        # configure SRv6 Policy (is_encap=1: encapsulation mode)
        # Note: segment list order: first -> last
        sr_policy = VppSRv6Policy(
            self, bsid=bsid,
            is_encap=1,
            sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
            weight=1, fib_table=0,
            segments=['a4::', 'a5::', 'a6::c7'],
            source='a3::')
        sr_policy.add_vpp_config()
        # kept on the instance; read back below for steering and removal
        self.sr_policy = sr_policy

        # log the sr policies
        self.logger.info(self.vapi.cli("show sr policies"))

        # steer IPv6 traffic to a7::/64 into SRv6 Policy
        # use the bsid of the above self.sr_policy
        pol_steering = VppSRv6Steering(
                        self,
                        bsid=self.sr_policy.bsid,
                        prefix="a7::", mask_width=64,
                        traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
                        sr_policy_index=0, table_id=0,
                        sw_if_index=0)
        pol_steering.add_vpp_config()

        # log the sr steering policies
        self.logger.info(self.vapi.cli("show sr steering policies"))

        # create packets
        # 'count' packets are requested per stream, one per packet size
        count = len(self.pg_packet_sizes)
        # inner destination lies inside the steered prefix a7::/64
        dst_inner = 'a7::1234'
        pkts = []

        # create IPv6 packets without SRH
        packet_header = self.create_packet_header_IPv6(dst_inner)
        # create traffic stream pg0->pg1
        pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
                                       self.pg_packet_sizes, count))

        # create IPv6 packets with SRH
        # packets with segments-left 1, active segment a7::
        packet_header = self.create_packet_header_IPv6_SRH(
            sidlist=['a8::', 'a7::', 'a6::'],
            segleft=1)
        # create traffic stream pg0->pg1
        pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
                                       self.pg_packet_sizes, count))

        # create IPv6 packets with SRH and IPv6
        # packets with segments-left 1, active segment a7::
        packet_header = self.create_packet_header_IPv6_SRH_IPv6(
            dst_inner,
            sidlist=['a8::', 'a7::', 'a6::'],
            segleft=1)
        # create traffic stream pg0->pg1
        pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
                                       self.pg_packet_sizes, count))

        # send packets and verify received packets
        self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
                                  self.compare_rx_tx_packet_T_Encaps)

        # log the localsid counters
        self.logger.info(self.vapi.cli("show sr localsid"))

        # remove SR steering
        pol_steering.remove_vpp_config()
        self.logger.info(self.vapi.cli("show sr steering policies"))

        # remove SR Policies
        self.sr_policy.remove_vpp_config()
        self.logger.info(self.vapi.cli("show sr policies"))

        # remove FIB entries
        # done by tearDown

        # cleanup interfaces
        # (tearDown() calls teardown_interfaces() as well)
        self.teardown_interfaces()
237
    @unittest.skipUnless(0, "PC to fix")
    def test_SRv6_T_Insert(self):
        """ Test SRv6 Transit.Insert behavior (IPv6 only).

        Currently disabled: the skipUnless(0, ...) condition is always
        false, so the test is always skipped ("PC to fix").

        Two kinds of packets are sent from pg0 (plain IPv6 and IPv6 with
        SRH) and handed, together with compare_rx_tx_packet_T_Insert, to
        send_and_verify_pkts() for checking on pg1.
        """
        # send traffic to one destination interface
        # source and destination are IPv6 only
        self.setup_interfaces(ipv6=[True, True])

        # configure FIB entries
        # a4::/64 (contains the first segment of the SR policy) via pg1
        route = VppIpRoute(self, "a4::", 64,
                           [VppRoutePath(self.pg1.remote_ip6,
                                         self.pg1.sw_if_index)])
        route.add_vpp_config()

        # configure encaps IPv6 source address
        # needs to be done before SR Policy config
        # TODO: API?
        self.vapi.cli("set sr encaps source addr a3::")

        bsid = 'a3::9999:1'
        # configure SRv6 Policy (is_encap=0: insert mode, not encaps)
        # Note: segment list order: first -> last
        sr_policy = VppSRv6Policy(
            self, bsid=bsid,
            is_encap=0,
            sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
            weight=1, fib_table=0,
            segments=['a4::', 'a5::', 'a6::c7'],
            source='a3::')
        sr_policy.add_vpp_config()
        # kept on the instance; read back below for steering and removal
        self.sr_policy = sr_policy

        # log the sr policies
        self.logger.info(self.vapi.cli("show sr policies"))

        # steer IPv6 traffic to a7::/64 into SRv6 Policy
        # use the bsid of the above self.sr_policy
        pol_steering = VppSRv6Steering(
                        self,
                        bsid=self.sr_policy.bsid,
                        prefix="a7::", mask_width=64,
                        traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
                        sr_policy_index=0, table_id=0,
                        sw_if_index=0)
        pol_steering.add_vpp_config()

        # log the sr steering policies
        self.logger.info(self.vapi.cli("show sr steering policies"))

        # create packets
        # 'count' packets are requested per stream, one per packet size
        count = len(self.pg_packet_sizes)
        # inner destination lies inside the steered prefix a7::/64
        dst_inner = 'a7::1234'
        pkts = []

        # create IPv6 packets without SRH
        packet_header = self.create_packet_header_IPv6(dst_inner)
        # create traffic stream pg0->pg1
        pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
                                       self.pg_packet_sizes, count))

        # create IPv6 packets with SRH
        # packets with segments-left 1, active segment a7::
        packet_header = self.create_packet_header_IPv6_SRH(
            sidlist=['a8::', 'a7::', 'a6::'],
            segleft=1)
        # create traffic stream pg0->pg1
        pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
                                       self.pg_packet_sizes, count))

        # send packets and verify received packets
        self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
                                  self.compare_rx_tx_packet_T_Insert)

        # log the localsid counters
        self.logger.info(self.vapi.cli("show sr localsid"))

        # remove SR steering
        pol_steering.remove_vpp_config()
        self.logger.info(self.vapi.cli("show sr steering policies"))

        # remove SR Policies
        self.sr_policy.remove_vpp_config()
        self.logger.info(self.vapi.cli("show sr policies"))

        # remove FIB entries
        # done by tearDown

        # cleanup interfaces
        # (tearDown() calls teardown_interfaces() as well)
        self.teardown_interfaces()
327
    @unittest.skipUnless(0, "PC to fix")
    def test_SRv6_T_Encaps_IPv4(self):
        """ Test SRv6 Transit.Encaps behavior for IPv4.

        Currently disabled: the skipUnless(0, ...) condition is always
        false, so the test is always skipped ("PC to fix").

        IPv4 packets towards 7.1.1.0/24 are sent from pg0 and handed,
        together with compare_rx_tx_packet_T_Encaps_IPv4, to
        send_and_verify_pkts() for checking on pg1.
        """
        # send traffic to one destination interface
        # source interface is IPv4 only
        # destination interface is IPv6 only
        self.setup_interfaces(ipv6=[False, True], ipv4=[True, False])

        # configure FIB entries
        # a4::/64 (contains the first segment of the SR policy) via pg1
        route = VppIpRoute(self, "a4::", 64,
                           [VppRoutePath(self.pg1.remote_ip6,
                                         self.pg1.sw_if_index)])
        route.add_vpp_config()

        # configure encaps IPv6 source address
        # needs to be done before SR Policy config
        # TODO: API?
        self.vapi.cli("set sr encaps source addr a3::")

        bsid = 'a3::9999:1'
        # configure SRv6 Policy (is_encap=1: encapsulation mode)
        # Note: segment list order: first -> last
        sr_policy = VppSRv6Policy(
            self, bsid=bsid,
            is_encap=1,
            sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
            weight=1, fib_table=0,
            segments=['a4::', 'a5::', 'a6::c7'],
            source='a3::')
        sr_policy.add_vpp_config()
        # kept on the instance; read back below for steering and removal
        self.sr_policy = sr_policy

        # log the sr policies
        self.logger.info(self.vapi.cli("show sr policies"))

        # steer IPv4 traffic to 7.1.1.0/24 into SRv6 Policy
        # use the bsid of the above self.sr_policy
        pol_steering = VppSRv6Steering(
                        self,
                        bsid=self.sr_policy.bsid,
                        prefix="7.1.1.0", mask_width=24,
                        traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV4,
                        sr_policy_index=0, table_id=0,
                        sw_if_index=0)
        pol_steering.add_vpp_config()

        # log the sr steering policies
        self.logger.info(self.vapi.cli("show sr steering policies"))

        # create packets
        # 'count' packets are requested per stream, one per packet size
        count = len(self.pg_packet_sizes)
        # destination lies inside the steered prefix 7.1.1.0/24
        dst_inner = '7.1.1.123'
        pkts = []

        # create IPv4 packets
        packet_header = self.create_packet_header_IPv4(dst_inner)
        # create traffic stream pg0->pg1
        pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
                                       self.pg_packet_sizes, count))

        # send packets and verify received packets
        self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
                                  self.compare_rx_tx_packet_T_Encaps_IPv4)

        # log the localsid counters
        self.logger.info(self.vapi.cli("show sr localsid"))

        # remove SR steering
        pol_steering.remove_vpp_config()
        self.logger.info(self.vapi.cli("show sr steering policies"))

        # remove SR Policies
        self.sr_policy.remove_vpp_config()
        self.logger.info(self.vapi.cli("show sr policies"))

        # remove FIB entries
        # done by tearDown

        # cleanup interfaces
        # (tearDown() calls teardown_interfaces() as well)
        self.teardown_interfaces()
409
    @unittest.skip("VPP crashes after running this test")
    def test_SRv6_T_Encaps_L2(self):
        """ Test SRv6 Transit.Encaps behavior for L2.

        Currently disabled unconditionally (see the skip reason above).

        L2 frames with and without a dot1q header are sent from pg0 and
        handed, together with compare_rx_tx_packet_T_Encaps_L2, to
        send_and_verify_pkts() for checking on pg1.
        """
        # send traffic to one destination interface
        # source interface (pg0) has neither IPv4 nor IPv6 enabled
        # destination interface is IPv6 only
        self.setup_interfaces(ipv6=[False, True], ipv4=[False, False])

        # configure FIB entries
        # a4::/64 (contains the first segment of the SR policy) via pg1
        route = VppIpRoute(self, "a4::", 64,
                           [VppRoutePath(self.pg1.remote_ip6,
                                         self.pg1.sw_if_index)])
        route.add_vpp_config()

        # configure encaps IPv6 source address
        # needs to be done before SR Policy config
        # TODO: API?
        self.vapi.cli("set sr encaps source addr a3::")

        bsid = 'a3::9999:1'
        # configure SRv6 Policy (is_encap=1: encapsulation mode)
        # Note: segment list order: first -> last
        sr_policy = VppSRv6Policy(
            self, bsid=bsid,
            is_encap=1,
            sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
            weight=1, fib_table=0,
            segments=['a4::', 'a5::', 'a6::c7'],
            source='a3::')
        sr_policy.add_vpp_config()
        # kept on the instance; read back below for steering and removal
        self.sr_policy = sr_policy

        # log the sr policies
        self.logger.info(self.vapi.cli("show sr policies"))

        # steer L2 traffic into SRv6 Policy
        # use the bsid of the above self.sr_policy
        # the steering is bound to pg0 through sw_if_index; the prefix
        # passed here is the all-zero ::/0
        pol_steering = VppSRv6Steering(
                        self,
                        bsid=self.sr_policy.bsid,
                        prefix="::", mask_width=0,
                        traffic_type=SRv6PolicySteeringTypes.SR_STEER_L2,
                        sr_policy_index=0, table_id=0,
                        sw_if_index=self.pg0.sw_if_index)
        pol_steering.add_vpp_config()

        # log the sr steering policies
        self.logger.info(self.vapi.cli("show sr steering policies"))

        # create packets
        # 'count' packets are requested per stream, one per packet size
        count = len(self.pg_packet_sizes)
        pkts = []

        # create L2 packets without dot1q header
        packet_header = self.create_packet_header_L2()
        # create traffic stream pg0->pg1
        pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
                                       self.pg_packet_sizes, count))

        # create L2 packets with dot1q header
        packet_header = self.create_packet_header_L2(vlan=123)
        # create traffic stream pg0->pg1
        pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
                                       self.pg_packet_sizes, count))

        # send packets and verify received packets
        self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
                                  self.compare_rx_tx_packet_T_Encaps_L2)

        # log the localsid counters
        self.logger.info(self.vapi.cli("show sr localsid"))

        # remove SR steering
        pol_steering.remove_vpp_config()
        self.logger.info(self.vapi.cli("show sr steering policies"))

        # remove SR Policies
        self.sr_policy.remove_vpp_config()
        self.logger.info(self.vapi.cli("show sr policies"))

        # remove FIB entries
        # done by tearDown

        # cleanup interfaces
        # (tearDown() calls teardown_interfaces() as well)
        self.teardown_interfaces()
496
497     def test_SRv6_End(self):
498         """ Test SRv6 End (without PSP) behavior.
499         """
500         # send traffic to one destination interface
501         # source and destination interfaces are IPv6 only
502         self.setup_interfaces(ipv6=[True, True])
503
504         # configure FIB entries
505         route = VppIpRoute(self, "a4::", 64,
506                            [VppRoutePath(self.pg1.remote_ip6,
507                                          self.pg1.sw_if_index)])
508         route.add_vpp_config()
509
510         # configure SRv6 localSID End without PSP behavior
511         localsid = VppSRv6LocalSID(
512                         self, localsid={'addr': 'A3::0'},
513                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
514                         nh_addr4='0.0.0.0',
515                         nh_addr6='::',
516                         end_psp=0,
517                         sw_if_index=0,
518                         vlan_index=0,
519                         fib_table=0)
520         localsid.add_vpp_config()
521         # log the localsids
522         self.logger.debug(self.vapi.cli("show sr localsid"))
523
524         # create IPv6 packets with SRH (SL=2, SL=1, SL=0)
525         # send one packet per SL value per packet size
526         # SL=0 packet with localSID End with USP needs 2nd SRH
527         count = len(self.pg_packet_sizes)
528         dst_inner = 'a4::1234'
529         pkts = []
530
531         # packets with segments-left 2, active segment a3::
532         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
533                 dst_inner,
534                 sidlist=['a5::', 'a4::', 'a3::'],
535                 segleft=2)
536         # create traffic stream pg0->pg1
537         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
538                                        self.pg_packet_sizes, count))
539
540         # packets with segments-left 1, active segment a3::
541         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
542                 dst_inner,
543                 sidlist=['a4::', 'a3::', 'a2::'],
544                 segleft=1)
545         # add to traffic stream pg0->pg1
546         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
547                                        self.pg_packet_sizes, count))
548
549         # TODO: test behavior with SL=0 packet (needs 2*SRH?)
550
551         # send packets and verify received packets
552         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
553                                   self.compare_rx_tx_packet_End)
554
555         # log the localsid counters
556         self.logger.info(self.vapi.cli("show sr localsid"))
557
558         # remove SRv6 localSIDs
559         localsid.remove_vpp_config()
560
561         # remove FIB entries
562         # done by tearDown
563
564         # cleanup interfaces
565         self.teardown_interfaces()
566
567     def test_SRv6_End_with_PSP(self):
568         """ Test SRv6 End with PSP behavior.
569         """
570         # send traffic to one destination interface
571         # source and destination interfaces are IPv6 only
572         self.setup_interfaces(ipv6=[True, True])
573
574         # configure FIB entries
575         route = VppIpRoute(self, "a4::", 64,
576                            [VppRoutePath(self.pg1.remote_ip6,
577                                          self.pg1.sw_if_index)])
578         route.add_vpp_config()
579
580         # configure SRv6 localSID End with PSP behavior
581         localsid = VppSRv6LocalSID(
582                         self, localsid={'addr': 'A3::0'},
583                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
584                         nh_addr4='0.0.0.0',
585                         nh_addr6='::',
586                         end_psp=1,
587                         sw_if_index=0,
588                         vlan_index=0,
589                         fib_table=0)
590         localsid.add_vpp_config()
591         # log the localsids
592         self.logger.debug(self.vapi.cli("show sr localsid"))
593
594         # create IPv6 packets with SRH (SL=2, SL=1)
595         # send one packet per SL value per packet size
596         # SL=0 packet with localSID End with PSP is dropped
597         count = len(self.pg_packet_sizes)
598         dst_inner = 'a4::1234'
599         pkts = []
600
601         # packets with segments-left 2, active segment a3::
602         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
603                     dst_inner,
604                     sidlist=['a5::', 'a4::', 'a3::'],
605                     segleft=2)
606         # create traffic stream pg0->pg1
607         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
608                                        self.pg_packet_sizes, count))
609
610         # packets with segments-left 1, active segment a3::
611         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
612                     dst_inner,
613                     sidlist=['a4::', 'a3::', 'a2::'],
614                     segleft=1)
615         # add to traffic stream pg0->pg1
616         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
617                                        self.pg_packet_sizes, count))
618
619         # send packets and verify received packets
620         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
621                                   self.compare_rx_tx_packet_End_PSP)
622
623         # log the localsid counters
624         self.logger.info(self.vapi.cli("show sr localsid"))
625
626         # remove SRv6 localSIDs
627         localsid.remove_vpp_config()
628
629         # remove FIB entries
630         # done by tearDown
631
632         # cleanup interfaces
633         self.teardown_interfaces()
634
635     def test_SRv6_End_X(self):
636         """ Test SRv6 End.X (without PSP) behavior.
637         """
638         # create three interfaces (1 source, 2 destinations)
639         # source and destination interfaces are IPv6 only
640         self.setup_interfaces(ipv6=[True, True, True])
641
642         # configure FIB entries
643         # a4::/64 via pg1 and pg2
644         route = VppIpRoute(self, "a4::", 64,
645                            [VppRoutePath(self.pg1.remote_ip6,
646                                          self.pg1.sw_if_index),
647                             VppRoutePath(self.pg2.remote_ip6,
648                                          self.pg2.sw_if_index)])
649         route.add_vpp_config()
650         self.logger.debug(self.vapi.cli("show ip6 fib"))
651
652         # configure SRv6 localSID End.X without PSP behavior
653         # End.X points to interface pg1
654         localsid = VppSRv6LocalSID(
655                         self, localsid={'addr': 'A3::C4'},
656                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
657                         nh_addr4='0.0.0.0',
658                         nh_addr6=self.pg1.remote_ip6,
659                         end_psp=0,
660                         sw_if_index=self.pg1.sw_if_index,
661                         vlan_index=0,
662                         fib_table=0)
663         localsid.add_vpp_config()
664         # log the localsids
665         self.logger.debug(self.vapi.cli("show sr localsid"))
666
667         # create IPv6 packets with SRH (SL=2, SL=1)
668         # send one packet per SL value per packet size
669         # SL=0 packet with localSID End with PSP is dropped
670         count = len(self.pg_packet_sizes)
671         dst_inner = 'a4::1234'
672         pkts = []
673
674         # packets with segments-left 2, active segment a3::c4
675         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
676                     dst_inner,
677                     sidlist=['a5::', 'a4::', 'a3::c4'],
678                     segleft=2)
679         # create traffic stream pg0->pg1
680         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
681                                        self.pg_packet_sizes, count))
682
683         # packets with segments-left 1, active segment a3::c4
684         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
685                     dst_inner,
686                     sidlist=['a4::', 'a3::c4', 'a2::'],
687                     segleft=1)
688         # add to traffic stream pg0->pg1
689         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
690                                        self.pg_packet_sizes, count))
691
692         # send packets and verify received packets
693         # using same comparison function as End (no PSP)
694         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
695                                   self.compare_rx_tx_packet_End)
696
697         # assert nothing was received on the other interface (pg2)
698         self.pg2.assert_nothing_captured("mis-directed packet(s)")
699
700         # log the localsid counters
701         self.logger.info(self.vapi.cli("show sr localsid"))
702
703         # remove SRv6 localSIDs
704         localsid.remove_vpp_config()
705
706         # remove FIB entries
707         # done by tearDown
708
709         # cleanup interfaces
710         self.teardown_interfaces()
711
712     def test_SRv6_End_X_with_PSP(self):
713         """ Test SRv6 End.X with PSP behavior.
714         """
715         # create three interfaces (1 source, 2 destinations)
716         # source and destination interfaces are IPv6 only
717         self.setup_interfaces(ipv6=[True, True, True])
718
719         # configure FIB entries
720         # a4::/64 via pg1 and pg2
721         route = VppIpRoute(self, "a4::", 64,
722                            [VppRoutePath(
723                                self.pg1.remote_ip6,
724                                self.pg1.sw_if_index),
725                             VppRoutePath(self.pg2.remote_ip6,
726                                          self.pg2.sw_if_index)])
727         route.add_vpp_config()
728
729         # configure SRv6 localSID End with PSP behavior
730         localsid = VppSRv6LocalSID(
731                         self, localsid={'addr': 'A3::C4'},
732                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
733                         nh_addr4='0.0.0.0',
734                         nh_addr6=self.pg1.remote_ip6,
735                         end_psp=1,
736                         sw_if_index=self.pg1.sw_if_index,
737                         vlan_index=0,
738                         fib_table=0)
739         localsid.add_vpp_config()
740         # log the localsids
741         self.logger.debug(self.vapi.cli("show sr localsid"))
742
743         # create IPv6 packets with SRH (SL=2, SL=1)
744         # send one packet per SL value per packet size
745         # SL=0 packet with localSID End with PSP is dropped
746         count = len(self.pg_packet_sizes)
747         dst_inner = 'a4::1234'
748         pkts = []
749
750         # packets with segments-left 2, active segment a3::
751         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
752                     dst_inner,
753                     sidlist=['a5::', 'a4::', 'a3::c4'],
754                     segleft=2)
755         # create traffic stream pg0->pg1
756         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
757                                        self.pg_packet_sizes, count))
758
759         # packets with segments-left 1, active segment a3::
760         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
761                     dst_inner,
762                     sidlist=['a4::', 'a3::c4', 'a2::'],
763                     segleft=1)
764         # add to traffic stream pg0->pg1
765         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
766                                        self.pg_packet_sizes, count))
767
768         # send packets and verify received packets
769         # using same comparison function as End with PSP
770         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
771                                   self.compare_rx_tx_packet_End_PSP)
772
773         # assert nothing was received on the other interface (pg2)
774         self.pg2.assert_nothing_captured("mis-directed packet(s)")
775
776         # log the localsid counters
777         self.logger.info(self.vapi.cli("show sr localsid"))
778
779         # remove SRv6 localSIDs
780         localsid.remove_vpp_config()
781
782         # remove FIB entries
783         # done by tearDown
784
785         # cleanup interfaces
786         self.teardown_interfaces()
787
788     def test_SRv6_End_DX6(self):
789         """ Test SRv6 End.DX6 behavior.
790         """
791         # send traffic to one destination interface
792         # source and destination interfaces are IPv6 only
793         self.setup_interfaces(ipv6=[True, True])
794
795         # configure SRv6 localSID End.DX6 behavior
796         localsid = VppSRv6LocalSID(
797                         self, localsid={'addr': 'A3::C4'},
798                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX6,
799                         nh_addr4='0.0.0.0',
800                         nh_addr6=self.pg1.remote_ip6,
801                         end_psp=0,
802                         sw_if_index=self.pg1.sw_if_index,
803                         vlan_index=0,
804                         fib_table=0)
805         localsid.add_vpp_config()
806         # log the localsids
807         self.logger.debug(self.vapi.cli("show sr localsid"))
808
809         # create IPv6 packets with SRH (SL=0)
810         # send one packet per packet size
811         count = len(self.pg_packet_sizes)
812         dst_inner = 'a4::1234'  # inner header destination address
813         pkts = []
814
815         # packets with SRH, segments-left 0, active segment a3::c4
816         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
817                         dst_inner,
818                         sidlist=['a3::c4', 'a2::', 'a1::'],
819                         segleft=0)
820         # add to traffic stream pg0->pg1
821         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
822                                        self.pg_packet_sizes, count))
823
824         # packets without SRH, IPv6 in IPv6
825         # outer IPv6 dest addr is the localsid End.DX6
826         packet_header = self.create_packet_header_IPv6_IPv6(
827                                             dst_inner,
828                                             dst_outer='a3::c4')
829         # add to traffic stream pg0->pg1
830         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
831                                        self.pg_packet_sizes, count))
832
833         # send packets and verify received packets
834         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
835                                   self.compare_rx_tx_packet_End_DX6)
836
837         # log the localsid counters
838         self.logger.info(self.vapi.cli("show sr localsid"))
839
840         # remove SRv6 localSIDs
841         localsid.remove_vpp_config()
842
843         # cleanup interfaces
844         self.teardown_interfaces()
845
846     def test_SRv6_End_DT6(self):
847         """ Test SRv6 End.DT6 behavior.
848         """
849         # create three interfaces (1 source, 2 destinations)
850         # all interfaces are IPv6 only
851         # source interface in global FIB (0)
852         # destination interfaces in global and vrf
853         vrf_1 = 1
854         ipt = VppIpTable(self, vrf_1, is_ip6=True)
855         ipt.add_vpp_config()
856         self.setup_interfaces(ipv6=[True, True, True],
857                               ipv6_table_id=[0, 0, vrf_1])
858
859         # configure FIB entries
860         # a4::/64 is reachable
861         #     via pg1 in table 0 (global)
862         #     and via pg2 in table vrf_1
863         route0 = VppIpRoute(self, "a4::", 64,
864                             [VppRoutePath(self.pg1.remote_ip6,
865                                           self.pg1.sw_if_index,
866                                           nh_table_id=0)],
867                             table_id=0)
868         route0.add_vpp_config()
869         route1 = VppIpRoute(self, "a4::", 64,
870                             [VppRoutePath(self.pg2.remote_ip6,
871                                           self.pg2.sw_if_index,
872                                           nh_table_id=vrf_1)],
873                             table_id=vrf_1)
874         route1.add_vpp_config()
875         self.logger.debug(self.vapi.cli("show ip6 fib"))
876
877         # configure SRv6 localSID End.DT6 behavior
878         # Note:
879         # fib_table: where the localsid is installed
880         # sw_if_index: in T-variants of localsid this is the vrf table_id
881         localsid = VppSRv6LocalSID(
882                         self, localsid={'addr': 'A3::C4'},
883                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT6,
884                         nh_addr4='0.0.0.0',
885                         nh_addr6='::',
886                         end_psp=0,
887                         sw_if_index=vrf_1,
888                         vlan_index=0,
889                         fib_table=0)
890         localsid.add_vpp_config()
891         # log the localsids
892         self.logger.debug(self.vapi.cli("show sr localsid"))
893
894         # create IPv6 packets with SRH (SL=0)
895         # send one packet per packet size
896         count = len(self.pg_packet_sizes)
897         dst_inner = 'a4::1234'  # inner header destination address
898         pkts = []
899
900         # packets with SRH, segments-left 0, active segment a3::c4
901         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
902                         dst_inner,
903                         sidlist=['a3::c4', 'a2::', 'a1::'],
904                         segleft=0)
905         # add to traffic stream pg0->pg1
906         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
907                                        self.pg_packet_sizes, count))
908
909         # packets without SRH, IPv6 in IPv6
910         # outer IPv6 dest addr is the localsid End.DT6
911         packet_header = self.create_packet_header_IPv6_IPv6(
912                                             dst_inner,
913                                             dst_outer='a3::c4')
914         # add to traffic stream pg0->pg1
915         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
916                                        self.pg_packet_sizes, count))
917
918         # send packets and verify received packets
919         # using same comparison function as End.DX6
920         self.send_and_verify_pkts(self.pg0, pkts, self.pg2,
921                                   self.compare_rx_tx_packet_End_DX6)
922
923         # assert nothing was received on the other interface (pg2)
924         self.pg1.assert_nothing_captured("mis-directed packet(s)")
925
926         # log the localsid counters
927         self.logger.info(self.vapi.cli("show sr localsid"))
928
929         # remove SRv6 localSIDs
930         localsid.remove_vpp_config()
931
932         # remove FIB entries
933         # done by tearDown
934
935         # cleanup interfaces
936         self.teardown_interfaces()
937
938     def test_SRv6_End_DX4(self):
939         """ Test SRv6 End.DX4 behavior.
940         """
941         # send traffic to one destination interface
942         # source interface is IPv6 only
943         # destination interface is IPv4 only
944         self.setup_interfaces(ipv6=[True, False], ipv4=[False, True])
945
946         # configure SRv6 localSID End.DX4 behavior
947         localsid = VppSRv6LocalSID(
948                         self, localsid={'addr': 'A3::C4'},
949                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX4,
950                         nh_addr4=self.pg1.remote_ip4,
951                         nh_addr6='::',
952                         end_psp=0,
953                         sw_if_index=self.pg1.sw_if_index,
954                         vlan_index=0,
955                         fib_table=0)
956         localsid.add_vpp_config()
957         # log the localsids
958         self.logger.debug(self.vapi.cli("show sr localsid"))
959
960         # send one packet per packet size
961         count = len(self.pg_packet_sizes)
962         dst_inner = '4.1.1.123'  # inner header destination address
963         pkts = []
964
965         # packets with SRH, segments-left 0, active segment a3::c4
966         packet_header = self.create_packet_header_IPv6_SRH_IPv4(
967                         dst_inner,
968                         sidlist=['a3::c4', 'a2::', 'a1::'],
969                         segleft=0)
970         # add to traffic stream pg0->pg1
971         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
972                                        self.pg_packet_sizes, count))
973
974         # packets without SRH, IPv4 in IPv6
975         # outer IPv6 dest addr is the localsid End.DX4
976         packet_header = self.create_packet_header_IPv6_IPv4(
977                                             dst_inner,
978                                             dst_outer='a3::c4')
979         # add to traffic stream pg0->pg1
980         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
981                                        self.pg_packet_sizes, count))
982
983         # send packets and verify received packets
984         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
985                                   self.compare_rx_tx_packet_End_DX4)
986
987         # log the localsid counters
988         self.logger.info(self.vapi.cli("show sr localsid"))
989
990         # remove SRv6 localSIDs
991         localsid.remove_vpp_config()
992
993         # cleanup interfaces
994         self.teardown_interfaces()
995
996     def test_SRv6_End_DT4(self):
997         """ Test SRv6 End.DT4 behavior.
998         """
999         # create three interfaces (1 source, 2 destinations)
1000         # source interface is IPv6-only
1001         # destination interfaces are IPv4 only
1002         # source interface in global FIB (0)
1003         # destination interfaces in global and vrf
1004         vrf_1 = 1
1005         ipt = VppIpTable(self, vrf_1)
1006         ipt.add_vpp_config()
1007         self.setup_interfaces(ipv6=[True, False, False],
1008                               ipv4=[False, True, True],
1009                               ipv6_table_id=[0, 0, 0],
1010                               ipv4_table_id=[0, 0, vrf_1])
1011
1012         # configure FIB entries
1013         # 4.1.1.0/24 is reachable
1014         #     via pg1 in table 0 (global)
1015         #     and via pg2 in table vrf_1
1016         route0 = VppIpRoute(self, "4.1.1.0", 24,
1017                             [VppRoutePath(self.pg1.remote_ip4,
1018                                           self.pg1.sw_if_index,
1019                                           nh_table_id=0)],
1020                             table_id=0)
1021         route0.add_vpp_config()
1022         route1 = VppIpRoute(self, "4.1.1.0", 24,
1023                             [VppRoutePath(self.pg2.remote_ip4,
1024                                           self.pg2.sw_if_index,
1025                                           nh_table_id=vrf_1)],
1026                             table_id=vrf_1)
1027         route1.add_vpp_config()
1028         self.logger.debug(self.vapi.cli("show ip fib"))
1029
1030         # configure SRv6 localSID End.DT6 behavior
1031         # Note:
1032         # fib_table: where the localsid is installed
1033         # sw_if_index: in T-variants of localsid: vrf table_id
1034         localsid = VppSRv6LocalSID(
1035                         self, localsid={'addr': 'A3::C4'},
1036                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT4,
1037                         nh_addr4='0.0.0.0',
1038                         nh_addr6='::',
1039                         end_psp=0,
1040                         sw_if_index=vrf_1,
1041                         vlan_index=0,
1042                         fib_table=0)
1043         localsid.add_vpp_config()
1044         # log the localsids
1045         self.logger.debug(self.vapi.cli("show sr localsid"))
1046
1047         # create IPv6 packets with SRH (SL=0)
1048         # send one packet per packet size
1049         count = len(self.pg_packet_sizes)
1050         dst_inner = '4.1.1.123'  # inner header destination address
1051         pkts = []
1052
1053         # packets with SRH, segments-left 0, active segment a3::c4
1054         packet_header = self.create_packet_header_IPv6_SRH_IPv4(
1055                         dst_inner,
1056                         sidlist=['a3::c4', 'a2::', 'a1::'],
1057                         segleft=0)
1058         # add to traffic stream pg0->pg1
1059         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
1060                                        self.pg_packet_sizes, count))
1061
1062         # packets without SRH, IPv6 in IPv6
1063         # outer IPv6 dest addr is the localsid End.DX4
1064         packet_header = self.create_packet_header_IPv6_IPv4(
1065                                             dst_inner,
1066                                             dst_outer='a3::c4')
1067         # add to traffic stream pg0->pg1
1068         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
1069                                        self.pg_packet_sizes, count))
1070
1071         # send packets and verify received packets
1072         # using same comparison function as End.DX4
1073         self.send_and_verify_pkts(self.pg0, pkts, self.pg2,
1074                                   self.compare_rx_tx_packet_End_DX4)
1075
1076         # assert nothing was received on the other interface (pg2)
1077         self.pg1.assert_nothing_captured("mis-directed packet(s)")
1078
1079         # log the localsid counters
1080         self.logger.info(self.vapi.cli("show sr localsid"))
1081
1082         # remove SRv6 localSIDs
1083         localsid.remove_vpp_config()
1084
1085         # remove FIB entries
1086         # done by tearDown
1087
1088         # cleanup interfaces
1089         self.teardown_interfaces()
1090
1091     def test_SRv6_End_DX2(self):
1092         """ Test SRv6 End.DX2 behavior.
1093         """
1094         # send traffic to one destination interface
1095         # source interface is IPv6 only
1096         self.setup_interfaces(ipv6=[True, False], ipv4=[False, False])
1097
1098         # configure SRv6 localSID End.DX2 behavior
1099         localsid = VppSRv6LocalSID(
1100                         self, localsid={'addr': 'A3::C4'},
1101                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX2,
1102                         nh_addr4='0.0.0.0',
1103                         nh_addr6='::',
1104                         end_psp=0,
1105                         sw_if_index=self.pg1.sw_if_index,
1106                         vlan_index=0,
1107                         fib_table=0)
1108         localsid.add_vpp_config()
1109         # log the localsids
1110         self.logger.debug(self.vapi.cli("show sr localsid"))
1111
1112         # send one packet per packet size
1113         count = len(self.pg_packet_sizes)
1114         pkts = []
1115
1116         # packets with SRH, segments-left 0, active segment a3::c4
1117         # L2 has no dot1q header
1118         packet_header = self.create_packet_header_IPv6_SRH_L2(
1119                             sidlist=['a3::c4', 'a2::', 'a1::'],
1120                             segleft=0,
1121                             vlan=0)
1122         # add to traffic stream pg0->pg1
1123         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1124                                        self.pg_packet_sizes, count))
1125
1126         # packets with SRH, segments-left 0, active segment a3::c4
1127         # L2 has dot1q header
1128         packet_header = self.create_packet_header_IPv6_SRH_L2(
1129                             sidlist=['a3::c4', 'a2::', 'a1::'],
1130                             segleft=0,
1131                             vlan=123)
1132         # add to traffic stream pg0->pg1
1133         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1134                                        self.pg_packet_sizes, count))
1135
1136         # packets without SRH, L2 in IPv6
1137         # outer IPv6 dest addr is the localsid End.DX2
1138         # L2 has no dot1q header
1139         packet_header = self.create_packet_header_IPv6_L2(
1140                                             dst_outer='a3::c4',
1141                                             vlan=0)
1142         # add to traffic stream pg0->pg1
1143         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1144                                        self.pg_packet_sizes, count))
1145
1146         # packets without SRH, L2 in IPv6
1147         # outer IPv6 dest addr is the localsid End.DX2
1148         # L2 has dot1q header
1149         packet_header = self.create_packet_header_IPv6_L2(
1150                                             dst_outer='a3::c4',
1151                                             vlan=123)
1152         # add to traffic stream pg0->pg1
1153         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1154                                        self.pg_packet_sizes, count))
1155
1156         # send packets and verify received packets
1157         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
1158                                   self.compare_rx_tx_packet_End_DX2)
1159
1160         # log the localsid counters
1161         self.logger.info(self.vapi.cli("show sr localsid"))
1162
1163         # remove SRv6 localSIDs
1164         localsid.remove_vpp_config()
1165
1166         # cleanup interfaces
1167         self.teardown_interfaces()
1168
1169     @unittest.skipUnless(0, "PC to fix")
1170     def test_SRv6_T_Insert_Classifier(self):
1171         """ Test SRv6 Transit.Insert behavior (IPv6 only).
1172             steer packets using the classifier
1173         """
1174         # send traffic to one destination interface
1175         # source and destination are IPv6 only
1176         self.setup_interfaces(ipv6=[False, False, False, True, True])
1177
1178         # configure FIB entries
1179         route = VppIpRoute(self, "a4::", 64,
1180                            [VppRoutePath(
1181                                self.pg4.remote_ip6,
1182                                self.pg4.sw_if_index)])
1183         route.add_vpp_config()
1184
1185         # configure encaps IPv6 source address
1186         # needs to be done before SR Policy config
1187         # TODO: API?
1188         self.vapi.cli("set sr encaps source addr a3::")
1189
1190         bsid = 'a3::9999:1'
1191         # configure SRv6 Policy
1192         # Note: segment list order: first -> last
1193         sr_policy = VppSRv6Policy(
1194             self, bsid=bsid,
1195             is_encap=0,
1196             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
1197             weight=1, fib_table=0,
1198             segments=['a4::', 'a5::', 'a6::c7'],
1199             source='a3::')
1200         sr_policy.add_vpp_config()
1201         self.sr_policy = sr_policy
1202
1203         # log the sr policies
1204         self.logger.info(self.vapi.cli("show sr policies"))
1205
1206         # add classify table
1207         # mask on dst ip address prefix a7::/8
1208         mask = '{!s:0<16}'.format('ff')
1209         r = self.vapi.classify_add_del_table(
1210             1,
1211             binascii.unhexlify(mask),
1212             match_n_vectors=(len(mask) - 1) // 32 + 1,
1213             current_data_flag=1,
1214             skip_n_vectors=2)  # data offset
1215         self.assertIsNotNone(r, 'No response msg for add_del_table')
1216         table_index = r.new_table_index
1217
1218         # add the source routing node as a ip6 inacl netxt node
1219         r = self.vapi.add_node_next('ip6-inacl',
1220                                     'sr-pl-rewrite-insert')
1221         inacl_next_node_index = r.node_index
1222
1223         match = '{!s:0<16}'.format('a7')
1224         r = self.vapi.classify_add_del_session(
1225             1,
1226             table_index,
1227             binascii.unhexlify(match),
1228             hit_next_index=inacl_next_node_index,
1229             action=3,
1230             metadata=0)  # sr policy index
1231         self.assertIsNotNone(r, 'No response msg for add_del_session')
1232
1233         # log the classify table used in the steering policy
1234         self.logger.info(self.vapi.cli("show classify table"))
1235
1236         r = self.vapi.input_acl_set_interface(
1237             is_add=1,
1238             sw_if_index=self.pg3.sw_if_index,
1239             ip6_table_index=table_index)
1240         self.assertIsNotNone(r,
1241                              'No response msg for input_acl_set_interface')
1242
1243         # log the ip6 inacl
1244         self.logger.info(self.vapi.cli("show inacl type ip6"))
1245
1246         # create packets
1247         count = len(self.pg_packet_sizes)
1248         dst_inner = 'a7::1234'
1249         pkts = []
1250
1251         # create IPv6 packets without SRH
1252         packet_header = self.create_packet_header_IPv6(dst_inner)
1253         # create traffic stream pg3->pg4
1254         pkts.extend(self.create_stream(self.pg3, self.pg4, packet_header,
1255                                        self.pg_packet_sizes, count))
1256
1257         # create IPv6 packets with SRH
1258         # packets with segments-left 1, active segment a7::
1259         packet_header = self.create_packet_header_IPv6_SRH(
1260             sidlist=['a8::', 'a7::', 'a6::'],
1261             segleft=1)
1262         # create traffic stream pg3->pg4
1263         pkts.extend(self.create_stream(self.pg3, self.pg4, packet_header,
1264                                        self.pg_packet_sizes, count))
1265
1266         # send packets and verify received packets
1267         self.send_and_verify_pkts(self.pg3, pkts, self.pg4,
1268                                   self.compare_rx_tx_packet_T_Insert)
1269
1270         # remove the interface l2 input feature
1271         r = self.vapi.input_acl_set_interface(
1272             is_add=0,
1273             sw_if_index=self.pg3.sw_if_index,
1274             ip6_table_index=table_index)
1275         self.assertIsNotNone(r,
1276                              'No response msg for input_acl_set_interface')
1277
1278         # log the ip6 inacl after cleaning
1279         self.logger.info(self.vapi.cli("show inacl type ip6"))
1280
1281         # log the localsid counters
1282         self.logger.info(self.vapi.cli("show sr localsid"))
1283
1284         # remove classifier SR steering
1285         # classifier_steering.remove_vpp_config()
1286         self.logger.info(self.vapi.cli("show sr steering policies"))
1287
1288         # remove SR Policies
1289         self.sr_policy.remove_vpp_config()
1290         self.logger.info(self.vapi.cli("show sr policies"))
1291
1292         # remove classify session and table
1293         r = self.vapi.classify_add_del_session(
1294             0,
1295             table_index,
1296             binascii.unhexlify(match))
1297         self.assertIsNotNone(r, 'No response msg for add_del_session')
1298
1299         r = self.vapi.classify_add_del_table(
1300             0,
1301             binascii.unhexlify(mask),
1302             table_index=table_index)
1303         self.assertIsNotNone(r, 'No response msg for add_del_table')
1304
1305         self.logger.info(self.vapi.cli("show classify table"))
1306
1307         # remove FIB entries
1308         # done by tearDown
1309
1310         # cleanup interfaces
1311         self.teardown_interfaces()
1312
1313     def compare_rx_tx_packet_T_Encaps(self, tx_pkt, rx_pkt):
1314         """ Compare input and output packet after passing T.Encaps
1315
1316         :param tx_pkt: transmitted packet
1317         :param rx_pkt: received packet
1318         """
1319         # T.Encaps updates the headers as follows:
1320         # SR Policy seglist (S3, S2, S1)
1321         # SR Policy source C
1322         # IPv6:
1323         # in: IPv6(A, B2)
1324         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(A, B2)
1325         # IPv6 + SRH:
1326         # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1)
1327         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(a, B2)SRH(B3, B2, B1; SL=1)
1328
1329         # get first (outer) IPv6 header of rx'ed packet
1330         rx_ip = rx_pkt.getlayer(IPv6)
1331         rx_srh = None
1332
1333         tx_ip = tx_pkt.getlayer(IPv6)
1334
1335         # expected segment-list
1336         seglist = self.sr_policy.segments
1337         # reverse list to get order as in SRH
1338         tx_seglist = seglist[::-1]
1339
1340         # get source address of SR Policy
1341         sr_policy_source = self.sr_policy.source
1342
1343         # rx'ed packet should have SRH
1344         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1345         # get SRH
1346         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1347
1348         # received ip.src should be equal to SR Policy source
1349         self.assertEqual(rx_ip.src, sr_policy_source)
1350         # received ip.dst should be equal to expected sidlist[lastentry]
1351         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1352         # rx'ed seglist should be equal to expected seglist
1353         self.assertEqual(rx_srh.addresses, tx_seglist)
1354         # segleft should be equal to size expected seglist-1
1355         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1356         # segleft should be equal to lastentry
1357         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1358
1359         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1360         # except for the hop-limit field
1361         #   -> update tx'ed hlim to the expected hlim
1362         tx_ip.hlim = tx_ip.hlim - 1
1363
1364         self.assertEqual(rx_srh.payload, tx_ip)
1365
1366         self.logger.debug("packet verification: SUCCESS")
1367
1368     def compare_rx_tx_packet_T_Encaps_IPv4(self, tx_pkt, rx_pkt):
1369         """ Compare input and output packet after passing T.Encaps for IPv4
1370
1371         :param tx_pkt: transmitted packet
1372         :param rx_pkt: received packet
1373         """
1374         # T.Encaps for IPv4 updates the headers as follows:
1375         # SR Policy seglist (S3, S2, S1)
1376         # SR Policy source C
1377         # IPv4:
1378         # in: IPv4(A, B2)
1379         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv4(A, B2)
1380
1381         # get first (outer) IPv6 header of rx'ed packet
1382         rx_ip = rx_pkt.getlayer(IPv6)
1383         rx_srh = None
1384
1385         tx_ip = tx_pkt.getlayer(IP)
1386
1387         # expected segment-list
1388         seglist = self.sr_policy.segments
1389         # reverse list to get order as in SRH
1390         tx_seglist = seglist[::-1]
1391
1392         # get source address of SR Policy
1393         sr_policy_source = self.sr_policy.source
1394
1395         # checks common to cases tx with and without SRH
1396         # rx'ed packet should have SRH and IPv4 header
1397         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1398         self.assertTrue(rx_ip.payload.haslayer(IP))
1399         # get SRH
1400         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1401
1402         # received ip.src should be equal to SR Policy source
1403         self.assertEqual(rx_ip.src, sr_policy_source)
1404         # received ip.dst should be equal to sidlist[lastentry]
1405         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1406         # rx'ed seglist should be equal to seglist
1407         self.assertEqual(rx_srh.addresses, tx_seglist)
1408         # segleft should be equal to size seglist-1
1409         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1410         # segleft should be equal to lastentry
1411         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1412
1413         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1414         # except for the ttl field and ip checksum
1415         #   -> adjust tx'ed ttl to expected ttl
1416         tx_ip.ttl = tx_ip.ttl - 1
1417         #   -> set tx'ed ip checksum to None and let scapy recompute
1418         tx_ip.chksum = None
1419         # read back the pkt (with str()) to force computing these fields
1420         # probably other ways to accomplish this are possible
1421         tx_ip = IP(scapy.compat.raw(tx_ip))
1422
1423         self.assertEqual(rx_srh.payload, tx_ip)
1424
1425         self.logger.debug("packet verification: SUCCESS")
1426
1427     def compare_rx_tx_packet_T_Encaps_L2(self, tx_pkt, rx_pkt):
1428         """ Compare input and output packet after passing T.Encaps for L2
1429
1430         :param tx_pkt: transmitted packet
1431         :param rx_pkt: received packet
1432         """
1433         # T.Encaps for L2 updates the headers as follows:
1434         # SR Policy seglist (S3, S2, S1)
1435         # SR Policy source C
1436         # L2:
1437         # in: L2
1438         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)L2
1439
1440         # get first (outer) IPv6 header of rx'ed packet
1441         rx_ip = rx_pkt.getlayer(IPv6)
1442         rx_srh = None
1443
1444         tx_ether = tx_pkt.getlayer(Ether)
1445
1446         # expected segment-list
1447         seglist = self.sr_policy.segments
1448         # reverse list to get order as in SRH
1449         tx_seglist = seglist[::-1]
1450
1451         # get source address of SR Policy
1452         sr_policy_source = self.sr_policy.source
1453
1454         # rx'ed packet should have SRH
1455         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1456         # get SRH
1457         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1458
1459         # received ip.src should be equal to SR Policy source
1460         self.assertEqual(rx_ip.src, sr_policy_source)
1461         # received ip.dst should be equal to sidlist[lastentry]
1462         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1463         # rx'ed seglist should be equal to seglist
1464         self.assertEqual(rx_srh.addresses, tx_seglist)
1465         # segleft should be equal to size seglist-1
1466         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1467         # segleft should be equal to lastentry
1468         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1469         # nh should be "No Next Header" (59)
1470         self.assertEqual(rx_srh.nh, 59)
1471
1472         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1473         self.assertEqual(Ether(scapy.compat.raw(rx_srh.payload)), tx_ether)
1474
1475         self.logger.debug("packet verification: SUCCESS")
1476
1477     def compare_rx_tx_packet_T_Insert(self, tx_pkt, rx_pkt):
1478         """ Compare input and output packet after passing T.Insert
1479
1480         :param tx_pkt: transmitted packet
1481         :param rx_pkt: received packet
1482         """
1483         # T.Insert updates the headers as follows:
1484         # IPv6:
1485         # in: IPv6(A, B2)
1486         # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)
1487         # IPv6 + SRH:
1488         # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1)
1489         # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)SRH(B3, B2, B1; SL=1)
1490
1491         # get first (outer) IPv6 header of rx'ed packet
1492         rx_ip = rx_pkt.getlayer(IPv6)
1493         rx_srh = None
1494         rx_ip2 = None
1495         rx_srh2 = None
1496         rx_ip3 = None
1497         rx_udp = rx_pkt[UDP]
1498
1499         tx_ip = tx_pkt.getlayer(IPv6)
1500         tx_srh = None
1501         tx_ip2 = None
1502         # some packets have been tx'ed with an SRH, some without it
1503         # get SRH if tx'ed packet has it
1504         if tx_pkt.haslayer(IPv6ExtHdrSegmentRouting):
1505             tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1506             tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1507         tx_udp = tx_pkt[UDP]
1508
1509         # expected segment-list (make copy of SR Policy segment list)
1510         seglist = self.sr_policy.segments[:]
1511         # expected seglist has initial dest addr as last segment
1512         seglist.append(tx_ip.dst)
1513         # reverse list to get order as in SRH
1514         tx_seglist = seglist[::-1]
1515
1516         # get source address of SR Policy
1517         sr_policy_source = self.sr_policy.source
1518
1519         # checks common to cases tx with and without SRH
1520         # rx'ed packet should have SRH and only one IPv6 header
1521         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1522         self.assertFalse(rx_ip.payload.haslayer(IPv6))
1523         # get SRH
1524         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1525
1526         # rx'ed ip.src should be equal to tx'ed ip.src
1527         self.assertEqual(rx_ip.src, tx_ip.src)
1528         # rx'ed ip.dst should be equal to sidlist[lastentry]
1529         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1530
1531         # rx'ed seglist should be equal to expected seglist
1532         self.assertEqual(rx_srh.addresses, tx_seglist)
1533         # segleft should be equal to size(expected seglist)-1
1534         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1535         # segleft should be equal to lastentry
1536         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1537
1538         if tx_srh:  # packet was tx'ed with SRH
1539             # packet should have 2nd SRH
1540             self.assertTrue(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
1541             # get 2nd SRH
1542             rx_srh2 = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting, 2)
1543
1544             # rx'ed srh2.addresses should be equal to tx'ed srh.addresses
1545             self.assertEqual(rx_srh2.addresses, tx_srh.addresses)
1546             # rx'ed srh2.segleft should be equal to tx'ed srh.segleft
1547             self.assertEqual(rx_srh2.segleft, tx_srh.segleft)
1548             # rx'ed srh2.lastentry should be equal to tx'ed srh.lastentry
1549             self.assertEqual(rx_srh2.lastentry, tx_srh.lastentry)
1550
1551         else:  # packet was tx'ed without SRH
1552             # rx packet should have no other SRH
1553             self.assertFalse(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
1554
1555         # UDP layer should be unchanged
1556         self.assertEqual(rx_udp, tx_udp)
1557
1558         self.logger.debug("packet verification: SUCCESS")
1559
1560     def compare_rx_tx_packet_End(self, tx_pkt, rx_pkt):
1561         """ Compare input and output packet after passing End (without PSP)
1562
1563         :param tx_pkt: transmitted packet
1564         :param rx_pkt: received packet
1565         """
1566         # End (no PSP) updates the headers as follows:
1567         # IPv6 + SRH:
1568         # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2)
1569         # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1570
1571         # get first (outer) IPv6 header of rx'ed packet
1572         rx_ip = rx_pkt.getlayer(IPv6)
1573         rx_srh = None
1574         rx_ip2 = None
1575         rx_udp = rx_pkt[UDP]
1576
1577         tx_ip = tx_pkt.getlayer(IPv6)
1578         # we know the packet has been tx'ed
1579         # with an inner IPv6 header and an SRH
1580         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1581         tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1582         tx_udp = tx_pkt[UDP]
1583
1584         # common checks, regardless of tx segleft value
1585         # rx'ed packet should have 2nd IPv6 header
1586         self.assertTrue(rx_ip.payload.haslayer(IPv6))
1587         # get second (inner) IPv6 header
1588         rx_ip2 = rx_pkt.getlayer(IPv6, 2)
1589
1590         if tx_ip.segleft > 0:
1591             # SRH should NOT have been popped:
1592             #   End SID without PSP does not pop SRH if segleft>0
1593             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1594             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1595
1596             # received ip.src should be equal to expected ip.src
1597             self.assertEqual(rx_ip.src, tx_ip.src)
1598             # sidlist should be unchanged
1599             self.assertEqual(rx_srh.addresses, tx_srh.addresses)
1600             # segleft should have been decremented
1601             self.assertEqual(rx_srh.segleft, tx_srh.segleft-1)
1602             # received ip.dst should be equal to sidlist[segleft]
1603             self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
1604             # lastentry should be unchanged
1605             self.assertEqual(rx_srh.lastentry, tx_srh.lastentry)
1606             # inner IPv6 packet (ip2) should be unchanged
1607             self.assertEqual(rx_ip2.src, tx_ip2.src)
1608             self.assertEqual(rx_ip2.dst, tx_ip2.dst)
1609         # else:  # tx_ip.segleft == 0
1610             # TODO: Does this work with 2 SRHs in ingress packet?
1611
1612         # UDP layer should be unchanged
1613         self.assertEqual(rx_udp, tx_udp)
1614
1615         self.logger.debug("packet verification: SUCCESS")
1616
1617     def compare_rx_tx_packet_End_PSP(self, tx_pkt, rx_pkt):
1618         """ Compare input and output packet after passing End with PSP
1619
1620         :param tx_pkt: transmitted packet
1621         :param rx_pkt: received packet
1622         """
1623         # End (PSP) updates the headers as follows:
1624         # IPv6 + SRH (SL>1):
1625         # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2)
1626         # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1627         # IPv6 + SRH (SL=1):
1628         # in: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1629         # out: IPv6(A, S3)
1630
1631         # get first (outer) IPv6 header of rx'ed packet
1632         rx_ip = rx_pkt.getlayer(IPv6)
1633         rx_srh = None
1634         rx_ip2 = None
1635         rx_udp = rx_pkt[UDP]
1636
1637         tx_ip = tx_pkt.getlayer(IPv6)
1638         # we know the packet has been tx'ed
1639         # with an inner IPv6 header and an SRH
1640         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1641         tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1642         tx_udp = tx_pkt[UDP]
1643
1644         # common checks, regardless of tx segleft value
1645         self.assertTrue(rx_ip.payload.haslayer(IPv6))
1646         rx_ip2 = rx_pkt.getlayer(IPv6, 2)
1647         # inner IPv6 packet (ip2) should be unchanged
1648         self.assertEqual(rx_ip2.src, tx_ip2.src)
1649         self.assertEqual(rx_ip2.dst, tx_ip2.dst)
1650
1651         if tx_ip.segleft > 1:
1652             # SRH should NOT have been popped:
1653             #   End SID with PSP does not pop SRH if segleft>1
1654             # rx'ed packet should have SRH
1655             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1656             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1657
1658             # received ip.src should be equal to expected ip.src
1659             self.assertEqual(rx_ip.src, tx_ip.src)
1660             # sidlist should be unchanged
1661             self.assertEqual(rx_srh.addresses, tx_srh.addresses)
1662             # segleft should have been decremented
1663             self.assertEqual(rx_srh.segleft, tx_srh.segleft-1)
1664             # received ip.dst should be equal to sidlist[segleft]
1665             self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
1666             # lastentry should be unchanged
1667             self.assertEqual(rx_srh.lastentry, tx_srh.lastentry)
1668
1669         else:  # tx_ip.segleft <= 1
1670             # SRH should have been popped:
1671             #   End SID with PSP and segleft=1 pops SRH
1672             # the two IPv6 headers are still present
1673             # outer IPv6 header has DA == last segment of popped SRH
1674             # SRH should not be present
1675             self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1676             # outer IPv6 header ip.src should be equal to tx'ed ip.src
1677             self.assertEqual(rx_ip.src, tx_ip.src)
1678             # outer IPv6 header ip.dst should be = to tx'ed sidlist[segleft-1]
1679             self.assertEqual(rx_ip.dst, tx_srh.addresses[tx_srh.segleft-1])
1680
1681         # UDP layer should be unchanged
1682         self.assertEqual(rx_udp, tx_udp)
1683
1684         self.logger.debug("packet verification: SUCCESS")
1685
1686     def compare_rx_tx_packet_End_DX6(self, tx_pkt, rx_pkt):
1687         """ Compare input and output packet after passing End.DX6
1688
1689         :param tx_pkt: transmitted packet
1690         :param rx_pkt: received packet
1691         """
1692         # End.DX6 updates the headers as follows:
1693         # IPv6 + SRH (SL=0):
1694         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv6(B, D)
1695         # out: IPv6(B, D)
1696         # IPv6:
1697         # in: IPv6(A, S3)IPv6(B, D)
1698         # out: IPv6(B, D)
1699
1700         # get first (outer) IPv6 header of rx'ed packet
1701         rx_ip = rx_pkt.getlayer(IPv6)
1702
1703         tx_ip = tx_pkt.getlayer(IPv6)
1704         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1705
1706         # verify if rx'ed packet has no SRH
1707         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1708
1709         # the whole rx_ip pkt should be equal to tx_ip2
1710         # except for the hlim field
1711         #   -> adjust tx'ed hlim to expected hlim
1712         tx_ip2.hlim = tx_ip2.hlim - 1
1713
1714         self.assertEqual(rx_ip, tx_ip2)
1715
1716         self.logger.debug("packet verification: SUCCESS")
1717
1718     def compare_rx_tx_packet_End_DX4(self, tx_pkt, rx_pkt):
1719         """ Compare input and output packet after passing End.DX4
1720
1721         :param tx_pkt: transmitted packet
1722         :param rx_pkt: received packet
1723         """
1724         # End.DX4 updates the headers as follows:
1725         # IPv6 + SRH (SL=0):
1726         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv4(B, D)
1727         # out: IPv4(B, D)
1728         # IPv6:
1729         # in: IPv6(A, S3)IPv4(B, D)
1730         # out: IPv4(B, D)
1731
1732         # get IPv4 header of rx'ed packet
1733         rx_ip = rx_pkt.getlayer(IP)
1734
1735         tx_ip = tx_pkt.getlayer(IPv6)
1736         tx_ip2 = tx_pkt.getlayer(IP)
1737
1738         # verify if rx'ed packet has no SRH
1739         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1740
1741         # the whole rx_ip pkt should be equal to tx_ip2
1742         # except for the ttl field and ip checksum
1743         #   -> adjust tx'ed ttl to expected ttl
1744         tx_ip2.ttl = tx_ip2.ttl - 1
1745         #   -> set tx'ed ip checksum to None and let scapy recompute
1746         tx_ip2.chksum = None
1747         # read back the pkt (with str()) to force computing these fields
1748         # probably other ways to accomplish this are possible
1749         tx_ip2 = IP(scapy.compat.raw(tx_ip2))
1750
1751         self.assertEqual(rx_ip, tx_ip2)
1752
1753         self.logger.debug("packet verification: SUCCESS")
1754
1755     def compare_rx_tx_packet_End_DX2(self, tx_pkt, rx_pkt):
1756         """ Compare input and output packet after passing End.DX2
1757
1758         :param tx_pkt: transmitted packet
1759         :param rx_pkt: received packet
1760         """
1761         # End.DX2 updates the headers as follows:
1762         # IPv6 + SRH (SL=0):
1763         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)L2
1764         # out: L2
1765         # IPv6:
1766         # in: IPv6(A, S3)L2
1767         # out: L2
1768
1769         # get IPv4 header of rx'ed packet
1770         rx_eth = rx_pkt.getlayer(Ether)
1771
1772         tx_ip = tx_pkt.getlayer(IPv6)
1773         # we can't just get the 2nd Ether layer
1774         # get the Raw content and dissect it as Ether
1775         tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw]))
1776
1777         # verify if rx'ed packet has no SRH
1778         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1779
1780         # the whole rx_eth pkt should be equal to tx_eth1
1781         self.assertEqual(rx_eth, tx_eth1)
1782
1783         self.logger.debug("packet verification: SUCCESS")
1784
1785     def create_stream(self, src_if, dst_if, packet_header, packet_sizes,
1786                       count):
1787         """Create SRv6 input packet stream for defined interface.
1788
1789         :param VppInterface src_if: Interface to create packet stream for
1790         :param VppInterface dst_if: destination interface of packet stream
1791         :param packet_header: Layer3 scapy packet headers,
1792                 L2 is added when not provided,
1793                 Raw(payload) with packet_info is added
1794         :param list packet_sizes: packet stream pckt sizes,sequentially applied
1795                to packets in stream have
1796         :param int count: number of packets in packet stream
1797         :return: list of packets
1798         """
1799         self.logger.info("Creating packets")
1800         pkts = []
1801         for i in range(0, count-1):
1802             payload_info = self.create_packet_info(src_if, dst_if)
1803             self.logger.debug(
1804                 "Creating packet with index %d" % (payload_info.index))
1805             payload = self.info_to_payload(payload_info)
1806             # add L2 header if not yet provided in packet_header
1807             if packet_header.getlayer(0).name == 'Ethernet':
1808                 p = (packet_header /
1809                      Raw(payload))
1810             else:
1811                 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
1812                      packet_header /
1813                      Raw(payload))
1814             size = packet_sizes[i % len(packet_sizes)]
1815             self.logger.debug("Packet size %d" % (size))
1816             self.extend_packet(p, size)
1817             # we need to store the packet with the automatic fields computed
1818             # read back the dumped packet (with str())
1819             # to force computing these fields
1820             # probably other ways are possible
1821             p = Ether(scapy.compat.raw(p))
1822             payload_info.data = p.copy()
1823             self.logger.debug(ppp("Created packet:", p))
1824             pkts.append(p)
1825         self.logger.info("Done creating packets")
1826         return pkts
1827
    def send_and_verify_pkts(self, input, pkts, output, compare_func):
        """Send packets and verify received packets using compare_func

        The steps are strictly ordered: queue the stream, enable capture,
        start traffic, fetch the egress capture, check that the ingress
        interface captured nothing and finally verify the capture.

        :param input: ingress interface of DUT
        :param pkts: list of packets to transmit
        :param output: egress interface of DUT
        :param compare_func: function to compare in and out packets;
               passed on to verify_captured_pkts
        """
        # add traffic stream to input interface
        input.add_stream(pkts)

        # enable capture on all interfaces
        # (needed before traffic is started)
        self.pg_enable_capture(self.pg_interfaces)

        # start traffic
        self.logger.info("Starting traffic")
        self.pg_start()

        # get output capture
        self.logger.info("Getting packet capture")
        capture = output.get_capture()

        # assert nothing was captured on input interface
        input.assert_nothing_captured()

        # verify captured packets
        # (each rx'ed packet is compared with its tx'ed counterpart)
        self.verify_captured_pkts(output, capture, compare_func)
1855
1856     def create_packet_header_IPv6(self, dst):
1857         """Create packet header: IPv6 header, UDP header
1858
1859         :param dst: IPv6 destination address
1860
1861         IPv6 source address is 1234::1
1862         UDP source port and destination port are 1234
1863         """
1864
1865         p = (IPv6(src='1234::1', dst=dst) /
1866              UDP(sport=1234, dport=1234))
1867         return p
1868
1869     def create_packet_header_IPv6_SRH(self, sidlist, segleft):
1870         """Create packet header: IPv6 header with SRH, UDP header
1871
1872         :param list sidlist: segment list
1873         :param int segleft: segments-left field value
1874
1875         IPv6 destination address is set to sidlist[segleft]
1876         IPv6 source addresses are 1234::1 and 4321::1
1877         UDP source port and destination port are 1234
1878         """
1879
1880         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
1881              IPv6ExtHdrSegmentRouting(addresses=sidlist) /
1882              UDP(sport=1234, dport=1234))
1883         return p
1884
1885     def create_packet_header_IPv6_SRH_IPv6(self, dst, sidlist, segleft):
1886         """Create packet header: IPv6 encapsulated in SRv6:
1887         IPv6 header with SRH, IPv6 header, UDP header
1888
1889         :param ipv6address dst: inner IPv6 destination address
1890         :param list sidlist: segment list of outer IPv6 SRH
1891         :param int segleft: segments-left field of outer IPv6 SRH
1892
1893         Outer IPv6 destination address is set to sidlist[segleft]
1894         IPv6 source addresses are 1234::1 and 4321::1
1895         UDP source port and destination port are 1234
1896         """
1897
1898         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
1899              IPv6ExtHdrSegmentRouting(addresses=sidlist,
1900                                       segleft=segleft, nh=41) /
1901              IPv6(src='4321::1', dst=dst) /
1902              UDP(sport=1234, dport=1234))
1903         return p
1904
1905     def create_packet_header_IPv6_IPv6(self, dst_inner, dst_outer):
1906         """Create packet header: IPv6 encapsulated in IPv6:
1907         IPv6 header, IPv6 header, UDP header
1908
1909         :param ipv6address dst_inner: inner IPv6 destination address
1910         :param ipv6address dst_outer: outer IPv6 destination address
1911
1912         IPv6 source addresses are 1234::1 and 4321::1
1913         UDP source port and destination port are 1234
1914         """
1915
1916         p = (IPv6(src='1234::1', dst=dst_outer) /
1917              IPv6(src='4321::1', dst=dst_inner) /
1918              UDP(sport=1234, dport=1234))
1919         return p
1920
1921     def create_packet_header_IPv6_SRH_SRH_IPv6(self, dst, sidlist1, segleft1,
1922                                                sidlist2, segleft2):
1923         """Create packet header: IPv6 encapsulated in SRv6 with 2 SRH:
1924         IPv6 header with SRH, 2nd SRH, IPv6 header, UDP header
1925
1926         :param ipv6address dst: inner IPv6 destination address
1927         :param list sidlist1: segment list of outer IPv6 SRH
1928         :param int segleft1: segments-left field of outer IPv6 SRH
1929         :param list sidlist2: segment list of inner IPv6 SRH
1930         :param int segleft2: segments-left field of inner IPv6 SRH
1931
1932         Outer IPv6 destination address is set to sidlist[segleft]
1933         IPv6 source addresses are 1234::1 and 4321::1
1934         UDP source port and destination port are 1234
1935         """
1936
1937         p = (IPv6(src='1234::1', dst=sidlist1[segleft1]) /
1938              IPv6ExtHdrSegmentRouting(addresses=sidlist1,
1939                                       segleft=segleft1, nh=43) /
1940              IPv6ExtHdrSegmentRouting(addresses=sidlist2,
1941                                       segleft=segleft2, nh=41) /
1942              IPv6(src='4321::1', dst=dst) /
1943              UDP(sport=1234, dport=1234))
1944         return p
1945
1946     def create_packet_header_IPv4(self, dst):
1947         """Create packet header: IPv4 header, UDP header
1948
1949         :param dst: IPv4 destination address
1950
1951         IPv4 source address is 123.1.1.1
1952         UDP source port and destination port are 1234
1953         """
1954
1955         p = (IP(src='123.1.1.1', dst=dst) /
1956              UDP(sport=1234, dport=1234))
1957         return p
1958
1959     def create_packet_header_IPv6_IPv4(self, dst_inner, dst_outer):
1960         """Create packet header: IPv4 encapsulated in IPv6:
1961         IPv6 header, IPv4 header, UDP header
1962
1963         :param ipv4address dst_inner: inner IPv4 destination address
1964         :param ipv6address dst_outer: outer IPv6 destination address
1965
1966         IPv6 source address is 1234::1
1967         IPv4 source address is 123.1.1.1
1968         UDP source port and destination port are 1234
1969         """
1970
1971         p = (IPv6(src='1234::1', dst=dst_outer) /
1972              IP(src='123.1.1.1', dst=dst_inner) /
1973              UDP(sport=1234, dport=1234))
1974         return p
1975
1976     def create_packet_header_IPv6_SRH_IPv4(self, dst, sidlist, segleft):
1977         """Create packet header: IPv4 encapsulated in SRv6:
1978         IPv6 header with SRH, IPv4 header, UDP header
1979
1980         :param ipv4address dst: inner IPv4 destination address
1981         :param list sidlist: segment list of outer IPv6 SRH
1982         :param int segleft: segments-left field of outer IPv6 SRH
1983
1984         Outer IPv6 destination address is set to sidlist[segleft]
1985         IPv6 source address is 1234::1
1986         IPv4 source address is 123.1.1.1
1987         UDP source port and destination port are 1234
1988         """
1989
1990         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
1991              IPv6ExtHdrSegmentRouting(addresses=sidlist,
1992                                       segleft=segleft, nh=4) /
1993              IP(src='123.1.1.1', dst=dst) /
1994              UDP(sport=1234, dport=1234))
1995         return p
1996
1997     def create_packet_header_L2(self, vlan=0):
1998         """Create packet header: L2 header
1999
2000         :param vlan: if vlan!=0 then add 802.1q header
2001         """
2002         # Note: the dst addr ('00:55:44:33:22:11') is used in
2003         # the compare function compare_rx_tx_packet_T_Encaps_L2
2004         # to detect presence of L2 in SRH payload
2005         p = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
2006         etype = 0x8137  # IPX
2007         if vlan:
2008             # add 802.1q layer
2009             p /= Dot1Q(vlan=vlan, type=etype)
2010         else:
2011             p.type = etype
2012         return p
2013
2014     def create_packet_header_IPv6_SRH_L2(self, sidlist, segleft, vlan=0):
2015         """Create packet header: L2 encapsulated in SRv6:
2016         IPv6 header with SRH, L2
2017
2018         :param list sidlist: segment list of outer IPv6 SRH
2019         :param int segleft: segments-left field of outer IPv6 SRH
2020         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
2021
2022         Outer IPv6 destination address is set to sidlist[segleft]
2023         IPv6 source address is 1234::1
2024         """
2025         eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
2026         etype = 0x8137  # IPX
2027         if vlan:
2028             # add 802.1q layer
2029             eth /= Dot1Q(vlan=vlan, type=etype)
2030         else:
2031             eth.type = etype
2032
2033         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
2034              IPv6ExtHdrSegmentRouting(addresses=sidlist,
2035                                       segleft=segleft, nh=59) /
2036              eth)
2037         return p
2038
2039     def create_packet_header_IPv6_L2(self, dst_outer, vlan=0):
2040         """Create packet header: L2 encapsulated in IPv6:
2041         IPv6 header, L2
2042
2043         :param ipv6address dst_outer: outer IPv6 destination address
2044         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
2045         """
2046         eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
2047         etype = 0x8137  # IPX
2048         if vlan:
2049             # add 802.1q layer
2050             eth /= Dot1Q(vlan=vlan, type=etype)
2051         else:
2052             eth.type = etype
2053
2054         p = (IPv6(src='1234::1', dst=dst_outer, nh=59) / eth)
2055         return p
2056
2057     def get_payload_info(self, packet):
2058         """ Extract the payload_info from the packet
2059         """
2060         # in most cases, payload_info is in packet[Raw]
2061         # but packet[Raw] gives the complete payload
2062         # (incl L2 header) for the T.Encaps L2 case
2063         try:
2064             payload_info = self.payload_to_info(packet[Raw])
2065
2066         except:
2067             # remote L2 header from packet[Raw]:
2068             # take packet[Raw], convert it to an Ether layer
2069             # and then extract Raw from it
2070             payload_info = self.payload_to_info(
2071                 Ether(scapy.compat.r(packet[Raw]))[Raw])
2072
2073         return payload_info
2074
2075     def verify_captured_pkts(self, dst_if, capture, compare_func):
2076         """
2077         Verify captured packet stream for specified interface.
2078         Compare ingress with egress packets using the specified compare fn
2079
2080         :param dst_if: egress interface of DUT
2081         :param capture: captured packets
2082         :param compare_func: function to compare in and out packet
2083         """
2084         self.logger.info("Verifying capture on interface %s using function %s"
2085                          % (dst_if.name, compare_func.__name__))
2086
2087         last_info = dict()
2088         for i in self.pg_interfaces:
2089             last_info[i.sw_if_index] = None
2090         dst_sw_if_index = dst_if.sw_if_index
2091
2092         for packet in capture:
2093             try:
2094                 # extract payload_info from packet's payload
2095                 payload_info = self.get_payload_info(packet)
2096                 packet_index = payload_info.index
2097
2098                 self.logger.debug("Verifying packet with index %d"
2099                                   % (packet_index))
2100                 # packet should have arrived on the expected interface
2101                 self.assertEqual(payload_info.dst, dst_sw_if_index)
2102                 self.logger.debug(
2103                     "Got packet on interface %s: src=%u (idx=%u)" %
2104                     (dst_if.name, payload_info.src, packet_index))
2105
2106                 # search for payload_info with same src and dst if_index
2107                 # this will give us the transmitted packet
2108                 next_info = self.get_next_packet_info_for_interface2(
2109                     payload_info.src, dst_sw_if_index,
2110                     last_info[payload_info.src])
2111                 last_info[payload_info.src] = next_info
2112                 # next_info should not be None
2113                 self.assertTrue(next_info is not None)
2114                 # index of tx and rx packets should be equal
2115                 self.assertEqual(packet_index, next_info.index)
2116                 # data field of next_info contains the tx packet
2117                 txed_packet = next_info.data
2118
2119                 self.logger.debug(ppp("Transmitted packet:",
2120                                       txed_packet))  # ppp=Pretty Print Packet
2121
2122                 self.logger.debug(ppp("Received packet:", packet))
2123
2124                 # compare rcvd packet with expected packet using compare_func
2125                 compare_func(txed_packet, packet)
2126
2127             except:
2128                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2129                 raise
2130
2131         # have all expected packets arrived?
2132         for i in self.pg_interfaces:
2133             remaining_packet = self.get_next_packet_info_for_interface2(
2134                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
2135             self.assertTrue(remaining_packet is None,
2136                             "Interface %s: Packet expected from interface %s "
2137                             "didn't arrive" % (dst_if.name, i.name))
2138
2139
# run the tests with the VPP test runner when executed as a script
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)