add classify session action set-sr-policy-index
[vpp.git] / test / test_srv6.py
1 #!/usr/bin/env python
2
3 import unittest
4 import binascii
5 from socket import AF_INET6
6
7 from framework import VppTestCase, VppTestRunner
8 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
9 from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
10     SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
11
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether, Dot1Q
14 from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
15 from scapy.layers.inet import IP, UDP
16
17 from scapy.utils import inet_pton, inet_ntop
18
19 from util import ppp
20
21
22 class TestSRv6(VppTestCase):
23     """ SRv6 Test Case """
24
25     @classmethod
26     def setUpClass(self):
27         super(TestSRv6, self).setUpClass()
28
29     def setUp(self):
30         """ Perform test setup before each test case.
31         """
32         super(TestSRv6, self).setUp()
33
34         # packet sizes, inclusive L2 overhead
35         self.pg_packet_sizes = [64, 512, 1518, 9018]
36
37         # reset packet_infos
38         self.reset_packet_infos()
39
40     def tearDown(self):
41         """ Clean up test setup after each test case.
42         """
43         self.teardown_interfaces()
44
45         super(TestSRv6, self).tearDown()
46
47     def configure_interface(self,
48                             interface,
49                             ipv6=False, ipv4=False,
50                             ipv6_table_id=0, ipv4_table_id=0):
51         """ Configure interface.
52         :param ipv6: configure IPv6 on interface
53         :param ipv4: configure IPv4 on interface
54         :param ipv6_table_id: FIB table_id for IPv6
55         :param ipv4_table_id: FIB table_id for IPv4
56         """
57         self.logger.debug("Configuring interface %s" % (interface.name))
58         if ipv6:
59             self.logger.debug("Configuring IPv6")
60             interface.set_table_ip6(ipv6_table_id)
61             interface.config_ip6()
62             interface.resolve_ndp(timeout=5)
63         if ipv4:
64             self.logger.debug("Configuring IPv4")
65             interface.set_table_ip4(ipv4_table_id)
66             interface.config_ip4()
67             interface.resolve_arp()
68         interface.admin_up()
69
70     def setup_interfaces(self, ipv6=[], ipv4=[],
71                          ipv6_table_id=[], ipv4_table_id=[]):
72         """ Create and configure interfaces.
73
74         :param ipv6: list of interface IPv6 capabilities
75         :param ipv4: list of interface IPv4 capabilities
76         :param ipv6_table_id: list of intf IPv6 FIB table_ids
77         :param ipv4_table_id: list of intf IPv4 FIB table_ids
78         :returns: List of created interfaces.
79         """
80         # how many interfaces?
81         if len(ipv6):
82             count = len(ipv6)
83         else:
84             count = len(ipv4)
85         self.logger.debug("Creating and configuring %d interfaces" % (count))
86
87         # fill up ipv6 and ipv4 lists if needed
88         # not enabled (False) is the default
89         if len(ipv6) < count:
90             ipv6 += (count - len(ipv6)) * [False]
91         if len(ipv4) < count:
92             ipv4 += (count - len(ipv4)) * [False]
93
94         # fill up table_id lists if needed
95         # table_id 0 (global) is the default
96         if len(ipv6_table_id) < count:
97             ipv6_table_id += (count - len(ipv6_table_id)) * [0]
98         if len(ipv4_table_id) < count:
99             ipv4_table_id += (count - len(ipv4_table_id)) * [0]
100
101         # create 'count' pg interfaces
102         self.create_pg_interfaces(range(count))
103
104         # setup all interfaces
105         for i in range(count):
106             intf = self.pg_interfaces[i]
107             self.configure_interface(intf,
108                                      ipv6[i], ipv4[i],
109                                      ipv6_table_id[i], ipv4_table_id[i])
110
111         if any(ipv6):
112             self.logger.debug(self.vapi.cli("show ip6 neighbors"))
113         if any(ipv4):
114             self.logger.debug(self.vapi.cli("show ip arp"))
115         self.logger.debug(self.vapi.cli("show interface"))
116         self.logger.debug(self.vapi.cli("show hardware"))
117
118         return self.pg_interfaces
119
120     def teardown_interfaces(self):
121         """ Unconfigure and bring down interface.
122         """
123         self.logger.debug("Tearing down interfaces")
124         # tear down all interfaces
125         # AFAIK they cannot be deleted
126         for i in self.pg_interfaces:
127             self.logger.debug("Tear down interface %s" % (i.name))
128             i.admin_down()
129             i.unconfig()
130
131     def test_SRv6_T_Encaps(self):
132         """ Test SRv6 Transit.Encaps behavior for IPv6.
133         """
134         # send traffic to one destination interface
135         # source and destination are IPv6 only
136         self.setup_interfaces(ipv6=[True, True])
137
138         # configure FIB entries
139         route = VppIpRoute(self, "a4::", 64,
140                            [VppRoutePath(self.pg1.remote_ip6,
141                                          self.pg1.sw_if_index,
142                                          proto=DpoProto.DPO_PROTO_IP6)],
143                            is_ip6=1)
144         route.add_vpp_config()
145
146         # configure encaps IPv6 source address
147         # needs to be done before SR Policy config
148         # TODO: API?
149         self.vapi.cli("set sr encaps source addr a3::")
150
151         bsid = 'a3::9999:1'
152         # configure SRv6 Policy
153         # Note: segment list order: first -> last
154         sr_policy = VppSRv6Policy(
155             self, bsid=bsid,
156             is_encap=1,
157             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
158             weight=1, fib_table=0,
159             segments=['a4::', 'a5::', 'a6::c7'],
160             source='a3::')
161         sr_policy.add_vpp_config()
162         self.sr_policy = sr_policy
163
164         # log the sr policies
165         self.logger.info(self.vapi.cli("show sr policies"))
166
167         # steer IPv6 traffic to a7::/64 into SRv6 Policy
168         # use the bsid of the above self.sr_policy
169         pol_steering = VppSRv6Steering(
170                         self,
171                         bsid=self.sr_policy.bsid,
172                         prefix="a7::", mask_width=64,
173                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
174                         sr_policy_index=0, table_id=0,
175                         sw_if_index=0)
176         pol_steering.add_vpp_config()
177
178         # log the sr steering policies
179         self.logger.info(self.vapi.cli("show sr steering policies"))
180
181         # create packets
182         count = len(self.pg_packet_sizes)
183         dst_inner = 'a7::1234'
184         pkts = []
185
186         # create IPv6 packets without SRH
187         packet_header = self.create_packet_header_IPv6(dst_inner)
188         # create traffic stream pg0->pg1
189         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
190                                        self.pg_packet_sizes, count))
191
192         # create IPv6 packets with SRH
193         # packets with segments-left 1, active segment a7::
194         packet_header = self.create_packet_header_IPv6_SRH(
195             sidlist=['a8::', 'a7::', 'a6::'],
196             segleft=1)
197         # create traffic stream pg0->pg1
198         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
199                                        self.pg_packet_sizes, count))
200
201         # create IPv6 packets with SRH and IPv6
202         # packets with segments-left 1, active segment a7::
203         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
204             dst_inner,
205             sidlist=['a8::', 'a7::', 'a6::'],
206             segleft=1)
207         # create traffic stream pg0->pg1
208         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
209                                        self.pg_packet_sizes, count))
210
211         # send packets and verify received packets
212         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
213                                   self.compare_rx_tx_packet_T_Encaps)
214
215         # log the localsid counters
216         self.logger.info(self.vapi.cli("show sr localsid"))
217
218         # remove SR steering
219         pol_steering.remove_vpp_config()
220         self.logger.info(self.vapi.cli("show sr steering policies"))
221
222         # remove SR Policies
223         self.sr_policy.remove_vpp_config()
224         self.logger.info(self.vapi.cli("show sr policies"))
225
226         # remove FIB entries
227         # done by tearDown
228
229         # cleanup interfaces
230         self.teardown_interfaces()
231
232     def test_SRv6_T_Insert(self):
233         """ Test SRv6 Transit.Insert behavior (IPv6 only).
234         """
235         # send traffic to one destination interface
236         # source and destination are IPv6 only
237         self.setup_interfaces(ipv6=[True, True])
238
239         # configure FIB entries
240         route = VppIpRoute(self, "a4::", 64,
241                            [VppRoutePath(self.pg1.remote_ip6,
242                                          self.pg1.sw_if_index,
243                                          proto=DpoProto.DPO_PROTO_IP6)],
244                            is_ip6=1)
245         route.add_vpp_config()
246
247         # configure encaps IPv6 source address
248         # needs to be done before SR Policy config
249         # TODO: API?
250         self.vapi.cli("set sr encaps source addr a3::")
251
252         bsid = 'a3::9999:1'
253         # configure SRv6 Policy
254         # Note: segment list order: first -> last
255         sr_policy = VppSRv6Policy(
256             self, bsid=bsid,
257             is_encap=0,
258             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
259             weight=1, fib_table=0,
260             segments=['a4::', 'a5::', 'a6::c7'],
261             source='a3::')
262         sr_policy.add_vpp_config()
263         self.sr_policy = sr_policy
264
265         # log the sr policies
266         self.logger.info(self.vapi.cli("show sr policies"))
267
268         # steer IPv6 traffic to a7::/64 into SRv6 Policy
269         # use the bsid of the above self.sr_policy
270         pol_steering = VppSRv6Steering(
271                         self,
272                         bsid=self.sr_policy.bsid,
273                         prefix="a7::", mask_width=64,
274                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
275                         sr_policy_index=0, table_id=0,
276                         sw_if_index=0)
277         pol_steering.add_vpp_config()
278
279         # log the sr steering policies
280         self.logger.info(self.vapi.cli("show sr steering policies"))
281
282         # create packets
283         count = len(self.pg_packet_sizes)
284         dst_inner = 'a7::1234'
285         pkts = []
286
287         # create IPv6 packets without SRH
288         packet_header = self.create_packet_header_IPv6(dst_inner)
289         # create traffic stream pg0->pg1
290         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
291                                        self.pg_packet_sizes, count))
292
293         # create IPv6 packets with SRH
294         # packets with segments-left 1, active segment a7::
295         packet_header = self.create_packet_header_IPv6_SRH(
296             sidlist=['a8::', 'a7::', 'a6::'],
297             segleft=1)
298         # create traffic stream pg0->pg1
299         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
300                                        self.pg_packet_sizes, count))
301
302         # send packets and verify received packets
303         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
304                                   self.compare_rx_tx_packet_T_Insert)
305
306         # log the localsid counters
307         self.logger.info(self.vapi.cli("show sr localsid"))
308
309         # remove SR steering
310         pol_steering.remove_vpp_config()
311         self.logger.info(self.vapi.cli("show sr steering policies"))
312
313         # remove SR Policies
314         self.sr_policy.remove_vpp_config()
315         self.logger.info(self.vapi.cli("show sr policies"))
316
317         # remove FIB entries
318         # done by tearDown
319
320         # cleanup interfaces
321         self.teardown_interfaces()
322
323     def test_SRv6_T_Encaps_IPv4(self):
324         """ Test SRv6 Transit.Encaps behavior for IPv4.
325         """
326         # send traffic to one destination interface
327         # source interface is IPv4 only
328         # destination interface is IPv6 only
329         self.setup_interfaces(ipv6=[False, True], ipv4=[True, False])
330
331         # configure FIB entries
332         route = VppIpRoute(self, "a4::", 64,
333                            [VppRoutePath(self.pg1.remote_ip6,
334                                          self.pg1.sw_if_index,
335                                          proto=DpoProto.DPO_PROTO_IP6)],
336                            is_ip6=1)
337         route.add_vpp_config()
338
339         # configure encaps IPv6 source address
340         # needs to be done before SR Policy config
341         # TODO: API?
342         self.vapi.cli("set sr encaps source addr a3::")
343
344         bsid = 'a3::9999:1'
345         # configure SRv6 Policy
346         # Note: segment list order: first -> last
347         sr_policy = VppSRv6Policy(
348             self, bsid=bsid,
349             is_encap=1,
350             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
351             weight=1, fib_table=0,
352             segments=['a4::', 'a5::', 'a6::c7'],
353             source='a3::')
354         sr_policy.add_vpp_config()
355         self.sr_policy = sr_policy
356
357         # log the sr policies
358         self.logger.info(self.vapi.cli("show sr policies"))
359
360         # steer IPv4 traffic to 7.1.1.0/24 into SRv6 Policy
361         # use the bsid of the above self.sr_policy
362         pol_steering = VppSRv6Steering(
363                         self,
364                         bsid=self.sr_policy.bsid,
365                         prefix="7.1.1.0", mask_width=24,
366                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV4,
367                         sr_policy_index=0, table_id=0,
368                         sw_if_index=0)
369         pol_steering.add_vpp_config()
370
371         # log the sr steering policies
372         self.logger.info(self.vapi.cli("show sr steering policies"))
373
374         # create packets
375         count = len(self.pg_packet_sizes)
376         dst_inner = '7.1.1.123'
377         pkts = []
378
379         # create IPv4 packets
380         packet_header = self.create_packet_header_IPv4(dst_inner)
381         # create traffic stream pg0->pg1
382         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
383                                        self.pg_packet_sizes, count))
384
385         # send packets and verify received packets
386         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
387                                   self.compare_rx_tx_packet_T_Encaps_IPv4)
388
389         # log the localsid counters
390         self.logger.info(self.vapi.cli("show sr localsid"))
391
392         # remove SR steering
393         pol_steering.remove_vpp_config()
394         self.logger.info(self.vapi.cli("show sr steering policies"))
395
396         # remove SR Policies
397         self.sr_policy.remove_vpp_config()
398         self.logger.info(self.vapi.cli("show sr policies"))
399
400         # remove FIB entries
401         # done by tearDown
402
403         # cleanup interfaces
404         self.teardown_interfaces()
405
406     @unittest.skip("VPP crashes after running this test")
407     def test_SRv6_T_Encaps_L2(self):
408         """ Test SRv6 Transit.Encaps behavior for L2.
409         """
410         # send traffic to one destination interface
411         # source interface is IPv4 only TODO?
412         # destination interface is IPv6 only
413         self.setup_interfaces(ipv6=[False, True], ipv4=[False, False])
414
415         # configure FIB entries
416         route = VppIpRoute(self, "a4::", 64,
417                            [VppRoutePath(self.pg1.remote_ip6,
418                                          self.pg1.sw_if_index,
419                                          proto=DpoProto.DPO_PROTO_IP6)],
420                            is_ip6=1)
421         route.add_vpp_config()
422
423         # configure encaps IPv6 source address
424         # needs to be done before SR Policy config
425         # TODO: API?
426         self.vapi.cli("set sr encaps source addr a3::")
427
428         bsid = 'a3::9999:1'
429         # configure SRv6 Policy
430         # Note: segment list order: first -> last
431         sr_policy = VppSRv6Policy(
432             self, bsid=bsid,
433             is_encap=1,
434             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
435             weight=1, fib_table=0,
436             segments=['a4::', 'a5::', 'a6::c7'],
437             source='a3::')
438         sr_policy.add_vpp_config()
439         self.sr_policy = sr_policy
440
441         # log the sr policies
442         self.logger.info(self.vapi.cli("show sr policies"))
443
444         # steer L2 traffic into SRv6 Policy
445         # use the bsid of the above self.sr_policy
446         pol_steering = VppSRv6Steering(
447                         self,
448                         bsid=self.sr_policy.bsid,
449                         prefix="::", mask_width=0,
450                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_L2,
451                         sr_policy_index=0, table_id=0,
452                         sw_if_index=self.pg0.sw_if_index)
453         pol_steering.add_vpp_config()
454
455         # log the sr steering policies
456         self.logger.info(self.vapi.cli("show sr steering policies"))
457
458         # create packets
459         count = len(self.pg_packet_sizes)
460         pkts = []
461
462         # create L2 packets without dot1q header
463         packet_header = self.create_packet_header_L2()
464         # create traffic stream pg0->pg1
465         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
466                                        self.pg_packet_sizes, count))
467
468         # create L2 packets with dot1q header
469         packet_header = self.create_packet_header_L2(vlan=123)
470         # create traffic stream pg0->pg1
471         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
472                                        self.pg_packet_sizes, count))
473
474         # send packets and verify received packets
475         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
476                                   self.compare_rx_tx_packet_T_Encaps_L2)
477
478         # log the localsid counters
479         self.logger.info(self.vapi.cli("show sr localsid"))
480
481         # remove SR steering
482         pol_steering.remove_vpp_config()
483         self.logger.info(self.vapi.cli("show sr steering policies"))
484
485         # remove SR Policies
486         self.sr_policy.remove_vpp_config()
487         self.logger.info(self.vapi.cli("show sr policies"))
488
489         # remove FIB entries
490         # done by tearDown
491
492         # cleanup interfaces
493         self.teardown_interfaces()
494
495     def test_SRv6_End(self):
496         """ Test SRv6 End (without PSP) behavior.
497         """
498         # send traffic to one destination interface
499         # source and destination interfaces are IPv6 only
500         self.setup_interfaces(ipv6=[True, True])
501
502         # configure FIB entries
503         route = VppIpRoute(self, "a4::", 64,
504                            [VppRoutePath(self.pg1.remote_ip6,
505                                          self.pg1.sw_if_index,
506                                          proto=DpoProto.DPO_PROTO_IP6)],
507                            is_ip6=1)
508         route.add_vpp_config()
509
510         # configure SRv6 localSID End without PSP behavior
511         localsid = VppSRv6LocalSID(
512                         self, localsid_addr='A3::0',
513                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
514                         nh_addr='::',
515                         end_psp=0,
516                         sw_if_index=0,
517                         vlan_index=0,
518                         fib_table=0)
519         localsid.add_vpp_config()
520         # log the localsids
521         self.logger.debug(self.vapi.cli("show sr localsid"))
522
523         # create IPv6 packets with SRH (SL=2, SL=1, SL=0)
524         # send one packet per SL value per packet size
525         # SL=0 packet with localSID End with USP needs 2nd SRH
526         count = len(self.pg_packet_sizes)
527         dst_inner = 'a4::1234'
528         pkts = []
529
530         # packets with segments-left 2, active segment a3::
531         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
532                 dst_inner,
533                 sidlist=['a5::', 'a4::', 'a3::'],
534                 segleft=2)
535         # create traffic stream pg0->pg1
536         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
537                                        self.pg_packet_sizes, count))
538
539         # packets with segments-left 1, active segment a3::
540         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
541                 dst_inner,
542                 sidlist=['a4::', 'a3::', 'a2::'],
543                 segleft=1)
544         # add to traffic stream pg0->pg1
545         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
546                                        self.pg_packet_sizes, count))
547
548         # TODO: test behavior with SL=0 packet (needs 2*SRH?)
549
550         # send packets and verify received packets
551         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
552                                   self.compare_rx_tx_packet_End)
553
554         # log the localsid counters
555         self.logger.info(self.vapi.cli("show sr localsid"))
556
557         # remove SRv6 localSIDs
558         localsid.remove_vpp_config()
559
560         # remove FIB entries
561         # done by tearDown
562
563         # cleanup interfaces
564         self.teardown_interfaces()
565
566     def test_SRv6_End_with_PSP(self):
567         """ Test SRv6 End with PSP behavior.
568         """
569         # send traffic to one destination interface
570         # source and destination interfaces are IPv6 only
571         self.setup_interfaces(ipv6=[True, True])
572
573         # configure FIB entries
574         route = VppIpRoute(self, "a4::", 64,
575                            [VppRoutePath(self.pg1.remote_ip6,
576                                          self.pg1.sw_if_index,
577                                          proto=DpoProto.DPO_PROTO_IP6)],
578                            is_ip6=1)
579         route.add_vpp_config()
580
581         # configure SRv6 localSID End with PSP behavior
582         localsid = VppSRv6LocalSID(
583                         self, localsid_addr='A3::0',
584                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
585                         nh_addr='::',
586                         end_psp=1,
587                         sw_if_index=0,
588                         vlan_index=0,
589                         fib_table=0)
590         localsid.add_vpp_config()
591         # log the localsids
592         self.logger.debug(self.vapi.cli("show sr localsid"))
593
594         # create IPv6 packets with SRH (SL=2, SL=1)
595         # send one packet per SL value per packet size
596         # SL=0 packet with localSID End with PSP is dropped
597         count = len(self.pg_packet_sizes)
598         dst_inner = 'a4::1234'
599         pkts = []
600
601         # packets with segments-left 2, active segment a3::
602         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
603                     dst_inner,
604                     sidlist=['a5::', 'a4::', 'a3::'],
605                     segleft=2)
606         # create traffic stream pg0->pg1
607         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
608                                        self.pg_packet_sizes, count))
609
610         # packets with segments-left 1, active segment a3::
611         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
612                     dst_inner,
613                     sidlist=['a4::', 'a3::', 'a2::'],
614                     segleft=1)
615         # add to traffic stream pg0->pg1
616         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
617                                        self.pg_packet_sizes, count))
618
619         # send packets and verify received packets
620         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
621                                   self.compare_rx_tx_packet_End_PSP)
622
623         # log the localsid counters
624         self.logger.info(self.vapi.cli("show sr localsid"))
625
626         # remove SRv6 localSIDs
627         localsid.remove_vpp_config()
628
629         # remove FIB entries
630         # done by tearDown
631
632         # cleanup interfaces
633         self.teardown_interfaces()
634
635     def test_SRv6_End_X(self):
636         """ Test SRv6 End.X (without PSP) behavior.
637         """
638         # create three interfaces (1 source, 2 destinations)
639         # source and destination interfaces are IPv6 only
640         self.setup_interfaces(ipv6=[True, True, True])
641
642         # configure FIB entries
643         # a4::/64 via pg1 and pg2
644         route = VppIpRoute(self, "a4::", 64,
645                            [VppRoutePath(self.pg1.remote_ip6,
646                                          self.pg1.sw_if_index,
647                                          proto=DpoProto.DPO_PROTO_IP6),
648                             VppRoutePath(self.pg2.remote_ip6,
649                                          self.pg2.sw_if_index,
650                                          proto=DpoProto.DPO_PROTO_IP6)],
651                            is_ip6=1)
652         route.add_vpp_config()
653         self.logger.debug(self.vapi.cli("show ip6 fib"))
654
655         # configure SRv6 localSID End.X without PSP behavior
656         # End.X points to interface pg1
657         localsid = VppSRv6LocalSID(
658                         self, localsid_addr='A3::C4',
659                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
660                         nh_addr=self.pg1.remote_ip6,
661                         end_psp=0,
662                         sw_if_index=self.pg1.sw_if_index,
663                         vlan_index=0,
664                         fib_table=0)
665         localsid.add_vpp_config()
666         # log the localsids
667         self.logger.debug(self.vapi.cli("show sr localsid"))
668
669         # create IPv6 packets with SRH (SL=2, SL=1)
670         # send one packet per SL value per packet size
671         # SL=0 packet with localSID End with PSP is dropped
672         count = len(self.pg_packet_sizes)
673         dst_inner = 'a4::1234'
674         pkts = []
675
676         # packets with segments-left 2, active segment a3::c4
677         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
678                     dst_inner,
679                     sidlist=['a5::', 'a4::', 'a3::c4'],
680                     segleft=2)
681         # create traffic stream pg0->pg1
682         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
683                                        self.pg_packet_sizes, count))
684
685         # packets with segments-left 1, active segment a3::c4
686         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
687                     dst_inner,
688                     sidlist=['a4::', 'a3::c4', 'a2::'],
689                     segleft=1)
690         # add to traffic stream pg0->pg1
691         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
692                                        self.pg_packet_sizes, count))
693
694         # send packets and verify received packets
695         # using same comparison function as End (no PSP)
696         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
697                                   self.compare_rx_tx_packet_End)
698
699         # assert nothing was received on the other interface (pg2)
700         self.pg2.assert_nothing_captured("mis-directed packet(s)")
701
702         # log the localsid counters
703         self.logger.info(self.vapi.cli("show sr localsid"))
704
705         # remove SRv6 localSIDs
706         localsid.remove_vpp_config()
707
708         # remove FIB entries
709         # done by tearDown
710
711         # cleanup interfaces
712         self.teardown_interfaces()
713
714     def test_SRv6_End_X_with_PSP(self):
715         """ Test SRv6 End.X with PSP behavior.
716         """
717         # create three interfaces (1 source, 2 destinations)
718         # source and destination interfaces are IPv6 only
719         self.setup_interfaces(ipv6=[True, True, True])
720
721         # configure FIB entries
722         # a4::/64 via pg1 and pg2
723         route = VppIpRoute(self, "a4::", 64,
724                            [VppRoutePath(self.pg1.remote_ip6,
725                                          self.pg1.sw_if_index,
726                                          proto=DpoProto.DPO_PROTO_IP6),
727                             VppRoutePath(self.pg2.remote_ip6,
728                                          self.pg2.sw_if_index,
729                                          proto=DpoProto.DPO_PROTO_IP6)],
730                            is_ip6=1)
731         route.add_vpp_config()
732
733         # configure SRv6 localSID End with PSP behavior
734         localsid = VppSRv6LocalSID(
735                         self, localsid_addr='A3::C4',
736                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
737                         nh_addr=self.pg1.remote_ip6,
738                         end_psp=1,
739                         sw_if_index=self.pg1.sw_if_index,
740                         vlan_index=0,
741                         fib_table=0)
742         localsid.add_vpp_config()
743         # log the localsids
744         self.logger.debug(self.vapi.cli("show sr localsid"))
745
746         # create IPv6 packets with SRH (SL=2, SL=1)
747         # send one packet per SL value per packet size
748         # SL=0 packet with localSID End with PSP is dropped
749         count = len(self.pg_packet_sizes)
750         dst_inner = 'a4::1234'
751         pkts = []
752
753         # packets with segments-left 2, active segment a3::
754         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
755                     dst_inner,
756                     sidlist=['a5::', 'a4::', 'a3::c4'],
757                     segleft=2)
758         # create traffic stream pg0->pg1
759         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
760                                        self.pg_packet_sizes, count))
761
762         # packets with segments-left 1, active segment a3::
763         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
764                     dst_inner,
765                     sidlist=['a4::', 'a3::c4', 'a2::'],
766                     segleft=1)
767         # add to traffic stream pg0->pg1
768         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
769                                        self.pg_packet_sizes, count))
770
771         # send packets and verify received packets
772         # using same comparison function as End with PSP
773         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
774                                   self.compare_rx_tx_packet_End_PSP)
775
776         # assert nothing was received on the other interface (pg2)
777         self.pg2.assert_nothing_captured("mis-directed packet(s)")
778
779         # log the localsid counters
780         self.logger.info(self.vapi.cli("show sr localsid"))
781
782         # remove SRv6 localSIDs
783         localsid.remove_vpp_config()
784
785         # remove FIB entries
786         # done by tearDown
787
788         # cleanup interfaces
789         self.teardown_interfaces()
790
791     def test_SRv6_End_DX6(self):
792         """ Test SRv6 End.DX6 behavior.
793         """
794         # send traffic to one destination interface
795         # source and destination interfaces are IPv6 only
796         self.setup_interfaces(ipv6=[True, True])
797
798         # configure SRv6 localSID End.DX6 behavior
799         localsid = VppSRv6LocalSID(
800                         self, localsid_addr='a3::c4',
801                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX6,
802                         nh_addr=self.pg1.remote_ip6,
803                         end_psp=0,
804                         sw_if_index=self.pg1.sw_if_index,
805                         vlan_index=0,
806                         fib_table=0)
807         localsid.add_vpp_config()
808         # log the localsids
809         self.logger.debug(self.vapi.cli("show sr localsid"))
810
811         # create IPv6 packets with SRH (SL=0)
812         # send one packet per packet size
813         count = len(self.pg_packet_sizes)
814         dst_inner = 'a4::1234'  # inner header destination address
815         pkts = []
816
817         # packets with SRH, segments-left 0, active segment a3::c4
818         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
819                         dst_inner,
820                         sidlist=['a3::c4', 'a2::', 'a1::'],
821                         segleft=0)
822         # add to traffic stream pg0->pg1
823         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
824                                        self.pg_packet_sizes, count))
825
826         # packets without SRH, IPv6 in IPv6
827         # outer IPv6 dest addr is the localsid End.DX6
828         packet_header = self.create_packet_header_IPv6_IPv6(
829                                             dst_inner,
830                                             dst_outer='a3::c4')
831         # add to traffic stream pg0->pg1
832         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
833                                        self.pg_packet_sizes, count))
834
835         # send packets and verify received packets
836         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
837                                   self.compare_rx_tx_packet_End_DX6)
838
839         # log the localsid counters
840         self.logger.info(self.vapi.cli("show sr localsid"))
841
842         # remove SRv6 localSIDs
843         localsid.remove_vpp_config()
844
845         # cleanup interfaces
846         self.teardown_interfaces()
847
848     def test_SRv6_End_DT6(self):
849         """ Test SRv6 End.DT6 behavior.
850         """
851         # create three interfaces (1 source, 2 destinations)
852         # all interfaces are IPv6 only
853         # source interface in global FIB (0)
854         # destination interfaces in global and vrf
855         vrf_1 = 1
856         self.setup_interfaces(ipv6=[True, True, True],
857                               ipv6_table_id=[0, 0, vrf_1])
858
859         # configure FIB entries
860         # a4::/64 is reachable
861         #     via pg1 in table 0 (global)
862         #     and via pg2 in table vrf_1
863         route0 = VppIpRoute(self, "a4::", 64,
864                             [VppRoutePath(self.pg1.remote_ip6,
865                                           self.pg1.sw_if_index,
866                                           proto=DpoProto.DPO_PROTO_IP6,
867                                           nh_table_id=0)],
868                             table_id=0,
869                             is_ip6=1)
870         route0.add_vpp_config()
871         route1 = VppIpRoute(self, "a4::", 64,
872                             [VppRoutePath(self.pg2.remote_ip6,
873                                           self.pg2.sw_if_index,
874                                           proto=DpoProto.DPO_PROTO_IP6,
875                                           nh_table_id=vrf_1)],
876                             table_id=vrf_1,
877                             is_ip6=1)
878         route1.add_vpp_config()
879         self.logger.debug(self.vapi.cli("show ip6 fib"))
880
881         # configure SRv6 localSID End.DT6 behavior
882         # Note:
883         # fib_table: where the localsid is installed
884         # sw_if_index: in T-variants of localsid this is the vrf table_id
885         localsid = VppSRv6LocalSID(
886                         self, localsid_addr='a3::c4',
887                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT6,
888                         nh_addr='::',
889                         end_psp=0,
890                         sw_if_index=vrf_1,
891                         vlan_index=0,
892                         fib_table=0)
893         localsid.add_vpp_config()
894         # log the localsids
895         self.logger.debug(self.vapi.cli("show sr localsid"))
896
897         # create IPv6 packets with SRH (SL=0)
898         # send one packet per packet size
899         count = len(self.pg_packet_sizes)
900         dst_inner = 'a4::1234'  # inner header destination address
901         pkts = []
902
903         # packets with SRH, segments-left 0, active segment a3::c4
904         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
905                         dst_inner,
906                         sidlist=['a3::c4', 'a2::', 'a1::'],
907                         segleft=0)
908         # add to traffic stream pg0->pg1
909         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
910                                        self.pg_packet_sizes, count))
911
912         # packets without SRH, IPv6 in IPv6
913         # outer IPv6 dest addr is the localsid End.DT6
914         packet_header = self.create_packet_header_IPv6_IPv6(
915                                             dst_inner,
916                                             dst_outer='a3::c4')
917         # add to traffic stream pg0->pg1
918         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
919                                        self.pg_packet_sizes, count))
920
921         # send packets and verify received packets
922         # using same comparison function as End.DX6
923         self.send_and_verify_pkts(self.pg0, pkts, self.pg2,
924                                   self.compare_rx_tx_packet_End_DX6)
925
926         # assert nothing was received on the other interface (pg2)
927         self.pg1.assert_nothing_captured("mis-directed packet(s)")
928
929         # log the localsid counters
930         self.logger.info(self.vapi.cli("show sr localsid"))
931
932         # remove SRv6 localSIDs
933         localsid.remove_vpp_config()
934
935         # remove FIB entries
936         # done by tearDown
937
938         # cleanup interfaces
939         self.teardown_interfaces()
940
941     def test_SRv6_End_DX4(self):
942         """ Test SRv6 End.DX4 behavior.
943         """
944         # send traffic to one destination interface
945         # source interface is IPv6 only
946         # destination interface is IPv4 only
947         self.setup_interfaces(ipv6=[True, False], ipv4=[False, True])
948
949         # configure SRv6 localSID End.DX4 behavior
950         localsid = VppSRv6LocalSID(
951                         self, localsid_addr='a3::c4',
952                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX4,
953                         nh_addr=self.pg1.remote_ip4,
954                         end_psp=0,
955                         sw_if_index=self.pg1.sw_if_index,
956                         vlan_index=0,
957                         fib_table=0)
958         localsid.add_vpp_config()
959         # log the localsids
960         self.logger.debug(self.vapi.cli("show sr localsid"))
961
962         # send one packet per packet size
963         count = len(self.pg_packet_sizes)
964         dst_inner = '4.1.1.123'  # inner header destination address
965         pkts = []
966
967         # packets with SRH, segments-left 0, active segment a3::c4
968         packet_header = self.create_packet_header_IPv6_SRH_IPv4(
969                         dst_inner,
970                         sidlist=['a3::c4', 'a2::', 'a1::'],
971                         segleft=0)
972         # add to traffic stream pg0->pg1
973         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
974                                        self.pg_packet_sizes, count))
975
976         # packets without SRH, IPv4 in IPv6
977         # outer IPv6 dest addr is the localsid End.DX4
978         packet_header = self.create_packet_header_IPv6_IPv4(
979                                             dst_inner,
980                                             dst_outer='a3::c4')
981         # add to traffic stream pg0->pg1
982         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
983                                        self.pg_packet_sizes, count))
984
985         # send packets and verify received packets
986         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
987                                   self.compare_rx_tx_packet_End_DX4)
988
989         # log the localsid counters
990         self.logger.info(self.vapi.cli("show sr localsid"))
991
992         # remove SRv6 localSIDs
993         localsid.remove_vpp_config()
994
995         # cleanup interfaces
996         self.teardown_interfaces()
997
998     def test_SRv6_End_DT4(self):
999         """ Test SRv6 End.DT4 behavior.
1000         """
1001         # create three interfaces (1 source, 2 destinations)
1002         # source interface is IPv6-only
1003         # destination interfaces are IPv4 only
1004         # source interface in global FIB (0)
1005         # destination interfaces in global and vrf
1006         vrf_1 = 1
1007         self.setup_interfaces(ipv6=[True, False, False],
1008                               ipv4=[False, True, True],
1009                               ipv6_table_id=[0, 0, 0],
1010                               ipv4_table_id=[0, 0, vrf_1])
1011
1012         # configure FIB entries
1013         # 4.1.1.0/24 is reachable
1014         #     via pg1 in table 0 (global)
1015         #     and via pg2 in table vrf_1
1016         route0 = VppIpRoute(self, "4.1.1.0", 24,
1017                             [VppRoutePath(self.pg1.remote_ip4,
1018                                           self.pg1.sw_if_index,
1019                                           nh_table_id=0)],
1020                             table_id=0,
1021                             is_ip6=0)
1022         route0.add_vpp_config()
1023         route1 = VppIpRoute(self, "4.1.1.0", 24,
1024                             [VppRoutePath(self.pg2.remote_ip4,
1025                                           self.pg2.sw_if_index,
1026                                           nh_table_id=vrf_1)],
1027                             table_id=vrf_1,
1028                             is_ip6=0)
1029         route1.add_vpp_config()
1030         self.logger.debug(self.vapi.cli("show ip fib"))
1031
1032         # configure SRv6 localSID End.DT6 behavior
1033         # Note:
1034         # fib_table: where the localsid is installed
1035         # sw_if_index: in T-variants of localsid: vrf table_id
1036         localsid = VppSRv6LocalSID(
1037                         self, localsid_addr='a3::c4',
1038                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT4,
1039                         nh_addr='::',
1040                         end_psp=0,
1041                         sw_if_index=vrf_1,
1042                         vlan_index=0,
1043                         fib_table=0)
1044         localsid.add_vpp_config()
1045         # log the localsids
1046         self.logger.debug(self.vapi.cli("show sr localsid"))
1047
1048         # create IPv6 packets with SRH (SL=0)
1049         # send one packet per packet size
1050         count = len(self.pg_packet_sizes)
1051         dst_inner = '4.1.1.123'  # inner header destination address
1052         pkts = []
1053
1054         # packets with SRH, segments-left 0, active segment a3::c4
1055         packet_header = self.create_packet_header_IPv6_SRH_IPv4(
1056                         dst_inner,
1057                         sidlist=['a3::c4', 'a2::', 'a1::'],
1058                         segleft=0)
1059         # add to traffic stream pg0->pg1
1060         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
1061                                        self.pg_packet_sizes, count))
1062
1063         # packets without SRH, IPv6 in IPv6
1064         # outer IPv6 dest addr is the localsid End.DX4
1065         packet_header = self.create_packet_header_IPv6_IPv4(
1066                                             dst_inner,
1067                                             dst_outer='a3::c4')
1068         # add to traffic stream pg0->pg1
1069         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
1070                                        self.pg_packet_sizes, count))
1071
1072         # send packets and verify received packets
1073         # using same comparison function as End.DX4
1074         self.send_and_verify_pkts(self.pg0, pkts, self.pg2,
1075                                   self.compare_rx_tx_packet_End_DX4)
1076
1077         # assert nothing was received on the other interface (pg2)
1078         self.pg1.assert_nothing_captured("mis-directed packet(s)")
1079
1080         # log the localsid counters
1081         self.logger.info(self.vapi.cli("show sr localsid"))
1082
1083         # remove SRv6 localSIDs
1084         localsid.remove_vpp_config()
1085
1086         # remove FIB entries
1087         # done by tearDown
1088
1089         # cleanup interfaces
1090         self.teardown_interfaces()
1091
1092     def test_SRv6_End_DX2(self):
1093         """ Test SRv6 End.DX2 behavior.
1094         """
1095         # send traffic to one destination interface
1096         # source interface is IPv6 only
1097         self.setup_interfaces(ipv6=[True, False], ipv4=[False, False])
1098
1099         # configure SRv6 localSID End.DX2 behavior
1100         localsid = VppSRv6LocalSID(
1101                         self, localsid_addr='a3::c4',
1102                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX2,
1103                         nh_addr='::',
1104                         end_psp=0,
1105                         sw_if_index=self.pg1.sw_if_index,
1106                         vlan_index=0,
1107                         fib_table=0)
1108         localsid.add_vpp_config()
1109         # log the localsids
1110         self.logger.debug(self.vapi.cli("show sr localsid"))
1111
1112         # send one packet per packet size
1113         count = len(self.pg_packet_sizes)
1114         pkts = []
1115
1116         # packets with SRH, segments-left 0, active segment a3::c4
1117         # L2 has no dot1q header
1118         packet_header = self.create_packet_header_IPv6_SRH_L2(
1119                             sidlist=['a3::c4', 'a2::', 'a1::'],
1120                             segleft=0,
1121                             vlan=0)
1122         # add to traffic stream pg0->pg1
1123         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1124                                        self.pg_packet_sizes, count))
1125
1126         # packets with SRH, segments-left 0, active segment a3::c4
1127         # L2 has dot1q header
1128         packet_header = self.create_packet_header_IPv6_SRH_L2(
1129                             sidlist=['a3::c4', 'a2::', 'a1::'],
1130                             segleft=0,
1131                             vlan=123)
1132         # add to traffic stream pg0->pg1
1133         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1134                                        self.pg_packet_sizes, count))
1135
1136         # packets without SRH, L2 in IPv6
1137         # outer IPv6 dest addr is the localsid End.DX2
1138         # L2 has no dot1q header
1139         packet_header = self.create_packet_header_IPv6_L2(
1140                                             dst_outer='a3::c4',
1141                                             vlan=0)
1142         # add to traffic stream pg0->pg1
1143         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1144                                        self.pg_packet_sizes, count))
1145
1146         # packets without SRH, L2 in IPv6
1147         # outer IPv6 dest addr is the localsid End.DX2
1148         # L2 has dot1q header
1149         packet_header = self.create_packet_header_IPv6_L2(
1150                                             dst_outer='a3::c4',
1151                                             vlan=123)
1152         # add to traffic stream pg0->pg1
1153         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1154                                        self.pg_packet_sizes, count))
1155
1156         # send packets and verify received packets
1157         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
1158                                   self.compare_rx_tx_packet_End_DX2)
1159
1160         # log the localsid counters
1161         self.logger.info(self.vapi.cli("show sr localsid"))
1162
1163         # remove SRv6 localSIDs
1164         localsid.remove_vpp_config()
1165
1166         # cleanup interfaces
1167         self.teardown_interfaces()
1168
1169     def test_SRv6_T_Insert_Classifier(self):
1170         """ Test SRv6 Transit.Insert behavior (IPv6 only).
1171             steer packets using the classifier
1172         """
1173         # send traffic to one destination interface
1174         # source and destination are IPv6 only
1175         self.setup_interfaces(ipv6=[False, False, False, True, True])
1176
1177         # configure FIB entries
1178         route = VppIpRoute(self, "a4::", 64,
1179                            [VppRoutePath(self.pg4.remote_ip6,
1180                                          self.pg4.sw_if_index,
1181                                          proto=DpoProto.DPO_PROTO_IP6)],
1182                            is_ip6=1)
1183         route.add_vpp_config()
1184
1185         # configure encaps IPv6 source address
1186         # needs to be done before SR Policy config
1187         # TODO: API?
1188         self.vapi.cli("set sr encaps source addr a3::")
1189
1190         bsid = 'a3::9999:1'
1191         # configure SRv6 Policy
1192         # Note: segment list order: first -> last
1193         sr_policy = VppSRv6Policy(
1194             self, bsid=bsid,
1195             is_encap=0,
1196             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
1197             weight=1, fib_table=0,
1198             segments=['a4::', 'a5::', 'a6::c7'],
1199             source='a3::')
1200         sr_policy.add_vpp_config()
1201         self.sr_policy = sr_policy
1202
1203         # log the sr policies
1204         self.logger.info(self.vapi.cli("show sr policies"))
1205
1206         # add classify table
1207         # mask on dst ip address prefix a7::/8
1208         mask = '{:0<16}'.format('ff')
1209         r = self.vapi.classify_add_del_table(
1210             1,
1211             binascii.unhexlify(mask),
1212             match_n_vectors=(len(mask) - 1) // 32 + 1,
1213             current_data_flag=1,
1214             skip_n_vectors=2)  # data offset
1215         self.assertIsNotNone(r, msg='No response msg for add_del_table')
1216         table_index = r.new_table_index
1217
1218         # add the source routign node as a ip6 inacl netxt node
1219         r = self.vapi.add_node_next('ip6-inacl',
1220                                     'sr-pl-rewrite-insert')
1221         inacl_next_node_index = r.node_index
1222
1223         match = '{:0<16}'.format('a7')
1224         r = self.vapi.classify_add_del_session(
1225             1,
1226             table_index,
1227             binascii.unhexlify(match),
1228             hit_next_index=inacl_next_node_index,
1229             action=3,
1230             metadata=0)  # sr policy index
1231         self.assertIsNotNone(r, msg='No response msg for add_del_session')
1232
1233         # log the classify table used in the steering policy
1234         self.logger.info(self.vapi.cli("show classify table"))
1235
1236         r = self.vapi.input_acl_set_interface(
1237             is_add=1,
1238             sw_if_index=self.pg3.sw_if_index,
1239             ip6_table_index=table_index)
1240         self.assertIsNotNone(r,
1241                              msg='No response msg for input_acl_set_interface')
1242
1243         # log the ip6 inacl
1244         self.logger.info(self.vapi.cli("show inacl type ip6"))
1245
1246         # create packets
1247         count = len(self.pg_packet_sizes)
1248         dst_inner = 'a7::1234'
1249         pkts = []
1250
1251         # create IPv6 packets without SRH
1252         packet_header = self.create_packet_header_IPv6(dst_inner)
1253         # create traffic stream pg3->pg4
1254         pkts.extend(self.create_stream(self.pg3, self.pg4, packet_header,
1255                                        self.pg_packet_sizes, count))
1256
1257         # create IPv6 packets with SRH
1258         # packets with segments-left 1, active segment a7::
1259         packet_header = self.create_packet_header_IPv6_SRH(
1260             sidlist=['a8::', 'a7::', 'a6::'],
1261             segleft=1)
1262         # create traffic stream pg3->pg4
1263         pkts.extend(self.create_stream(self.pg3, self.pg4, packet_header,
1264                                        self.pg_packet_sizes, count))
1265
1266         # send packets and verify received packets
1267         self.send_and_verify_pkts(self.pg3, pkts, self.pg4,
1268                                   self.compare_rx_tx_packet_T_Insert)
1269
1270         # remove the interface l2 input feature
1271         r = self.vapi.input_acl_set_interface(
1272             is_add=0,
1273             sw_if_index=self.pg3.sw_if_index,
1274             ip6_table_index=table_index)
1275         self.assertIsNotNone(r,
1276                              msg='No response msg for input_acl_set_interface')
1277
1278         # log the ip6 inacl after cleaning
1279         self.logger.info(self.vapi.cli("show inacl type ip6"))
1280
1281         # log the localsid counters
1282         self.logger.info(self.vapi.cli("show sr localsid"))
1283
1284         # remove classifier SR steering
1285         # classifier_steering.remove_vpp_config()
1286         self.logger.info(self.vapi.cli("show sr steering policies"))
1287
1288         # remove SR Policies
1289         self.sr_policy.remove_vpp_config()
1290         self.logger.info(self.vapi.cli("show sr policies"))
1291
1292         # remove classify session and table
1293         r = self.vapi.classify_add_del_session(
1294             0,
1295             table_index,
1296             binascii.unhexlify(match))
1297         self.assertIsNotNone(r, msg='No response msg for add_del_session')
1298
1299         r = self.vapi.classify_add_del_table(
1300             0,
1301             binascii.unhexlify(mask),
1302             table_index=table_index)
1303         self.assertIsNotNone(r, msg='No response msg for add_del_table')
1304
1305         self.logger.info(self.vapi.cli("show classify table"))
1306
1307         # remove FIB entries
1308         # done by tearDown
1309
1310         # cleanup interfaces
1311         self.teardown_interfaces()
1312
1313     def compare_rx_tx_packet_T_Encaps(self, tx_pkt, rx_pkt):
1314         """ Compare input and output packet after passing T.Encaps
1315
1316         :param tx_pkt: transmitted packet
1317         :param rx_pkt: received packet
1318         """
1319         # T.Encaps updates the headers as follows:
1320         # SR Policy seglist (S3, S2, S1)
1321         # SR Policy source C
1322         # IPv6:
1323         # in: IPv6(A, B2)
1324         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(A, B2)
1325         # IPv6 + SRH:
1326         # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1)
1327         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(a, B2)SRH(B3, B2, B1; SL=1)
1328
1329         # get first (outer) IPv6 header of rx'ed packet
1330         rx_ip = rx_pkt.getlayer(IPv6)
1331         rx_srh = None
1332
1333         tx_ip = tx_pkt.getlayer(IPv6)
1334
1335         # expected segment-list
1336         seglist = self.sr_policy.segments
1337         # reverse list to get order as in SRH
1338         tx_seglist = seglist[::-1]
1339
1340         # get source address of SR Policy
1341         sr_policy_source = self.sr_policy.source
1342
1343         # rx'ed packet should have SRH
1344         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1345         # get SRH
1346         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1347
1348         # received ip.src should be equal to SR Policy source
1349         self.assertEqual(rx_ip.src, sr_policy_source)
1350         # received ip.dst should be equal to expected sidlist[lastentry]
1351         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1352         # rx'ed seglist should be equal to expected seglist
1353         self.assertEqual(rx_srh.addresses, tx_seglist)
1354         # segleft should be equal to size expected seglist-1
1355         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1356         # segleft should be equal to lastentry
1357         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1358
1359         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1360         # except for the hop-limit field
1361         #   -> update tx'ed hlim to the expected hlim
1362         tx_ip.hlim = tx_ip.hlim - 1
1363
1364         self.assertEqual(rx_srh.payload, tx_ip)
1365
1366         self.logger.debug("packet verification: SUCCESS")
1367
1368     def compare_rx_tx_packet_T_Encaps_IPv4(self, tx_pkt, rx_pkt):
1369         """ Compare input and output packet after passing T.Encaps for IPv4
1370
1371         :param tx_pkt: transmitted packet
1372         :param rx_pkt: received packet
1373         """
1374         # T.Encaps for IPv4 updates the headers as follows:
1375         # SR Policy seglist (S3, S2, S1)
1376         # SR Policy source C
1377         # IPv4:
1378         # in: IPv4(A, B2)
1379         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv4(A, B2)
1380
1381         # get first (outer) IPv6 header of rx'ed packet
1382         rx_ip = rx_pkt.getlayer(IPv6)
1383         rx_srh = None
1384
1385         tx_ip = tx_pkt.getlayer(IP)
1386
1387         # expected segment-list
1388         seglist = self.sr_policy.segments
1389         # reverse list to get order as in SRH
1390         tx_seglist = seglist[::-1]
1391
1392         # get source address of SR Policy
1393         sr_policy_source = self.sr_policy.source
1394
1395         # checks common to cases tx with and without SRH
1396         # rx'ed packet should have SRH and IPv4 header
1397         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1398         self.assertTrue(rx_ip.payload.haslayer(IP))
1399         # get SRH
1400         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1401
1402         # received ip.src should be equal to SR Policy source
1403         self.assertEqual(rx_ip.src, sr_policy_source)
1404         # received ip.dst should be equal to sidlist[lastentry]
1405         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1406         # rx'ed seglist should be equal to seglist
1407         self.assertEqual(rx_srh.addresses, tx_seglist)
1408         # segleft should be equal to size seglist-1
1409         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1410         # segleft should be equal to lastentry
1411         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1412
1413         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1414         # except for the ttl field and ip checksum
1415         #   -> adjust tx'ed ttl to expected ttl
1416         tx_ip.ttl = tx_ip.ttl - 1
1417         #   -> set tx'ed ip checksum to None and let scapy recompute
1418         tx_ip.chksum = None
1419         # read back the pkt (with str()) to force computing these fields
1420         # probably other ways to accomplish this are possible
1421         tx_ip = IP(str(tx_ip))
1422
1423         self.assertEqual(rx_srh.payload, tx_ip)
1424
1425         self.logger.debug("packet verification: SUCCESS")
1426
1427     def compare_rx_tx_packet_T_Encaps_L2(self, tx_pkt, rx_pkt):
1428         """ Compare input and output packet after passing T.Encaps for L2
1429
1430         :param tx_pkt: transmitted packet
1431         :param rx_pkt: received packet
1432         """
1433         # T.Encaps for L2 updates the headers as follows:
1434         # SR Policy seglist (S3, S2, S1)
1435         # SR Policy source C
1436         # L2:
1437         # in: L2
1438         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)L2
1439
1440         # get first (outer) IPv6 header of rx'ed packet
1441         rx_ip = rx_pkt.getlayer(IPv6)
1442         rx_srh = None
1443
1444         tx_ether = tx_pkt.getlayer(Ether)
1445
1446         # expected segment-list
1447         seglist = self.sr_policy.segments
1448         # reverse list to get order as in SRH
1449         tx_seglist = seglist[::-1]
1450
1451         # get source address of SR Policy
1452         sr_policy_source = self.sr_policy.source
1453
1454         # rx'ed packet should have SRH
1455         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1456         # get SRH
1457         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1458
1459         # received ip.src should be equal to SR Policy source
1460         self.assertEqual(rx_ip.src, sr_policy_source)
1461         # received ip.dst should be equal to sidlist[lastentry]
1462         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1463         # rx'ed seglist should be equal to seglist
1464         self.assertEqual(rx_srh.addresses, tx_seglist)
1465         # segleft should be equal to size seglist-1
1466         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1467         # segleft should be equal to lastentry
1468         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1469         # nh should be "No Next Header" (59)
1470         self.assertEqual(rx_srh.nh, 59)
1471
1472         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1473         self.assertEqual(Ether(str(rx_srh.payload)), tx_ether)
1474
1475         self.logger.debug("packet verification: SUCCESS")
1476
1477     def compare_rx_tx_packet_T_Insert(self, tx_pkt, rx_pkt):
1478         """ Compare input and output packet after passing T.Insert
1479
1480         :param tx_pkt: transmitted packet
1481         :param rx_pkt: received packet
1482         """
1483         # T.Insert updates the headers as follows:
1484         # IPv6:
1485         # in: IPv6(A, B2)
1486         # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)
1487         # IPv6 + SRH:
1488         # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1)
1489         # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)SRH(B3, B2, B1; SL=1)
1490
1491         # get first (outer) IPv6 header of rx'ed packet
1492         rx_ip = rx_pkt.getlayer(IPv6)
1493         rx_srh = None
1494         rx_ip2 = None
1495         rx_srh2 = None
1496         rx_ip3 = None
1497         rx_udp = rx_pkt[UDP]
1498
1499         tx_ip = tx_pkt.getlayer(IPv6)
1500         tx_srh = None
1501         tx_ip2 = None
1502         # some packets have been tx'ed with an SRH, some without it
1503         # get SRH if tx'ed packet has it
1504         if tx_pkt.haslayer(IPv6ExtHdrSegmentRouting):
1505             tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1506             tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1507         tx_udp = tx_pkt[UDP]
1508
1509         # expected segment-list (make copy of SR Policy segment list)
1510         seglist = self.sr_policy.segments[:]
1511         # expected seglist has initial dest addr as last segment
1512         seglist.append(tx_ip.dst)
1513         # reverse list to get order as in SRH
1514         tx_seglist = seglist[::-1]
1515
1516         # get source address of SR Policy
1517         sr_policy_source = self.sr_policy.source
1518
1519         # checks common to cases tx with and without SRH
1520         # rx'ed packet should have SRH and only one IPv6 header
1521         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1522         self.assertFalse(rx_ip.payload.haslayer(IPv6))
1523         # get SRH
1524         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1525
1526         # rx'ed ip.src should be equal to tx'ed ip.src
1527         self.assertEqual(rx_ip.src, tx_ip.src)
1528         # rx'ed ip.dst should be equal to sidlist[lastentry]
1529         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1530
1531         # rx'ed seglist should be equal to expected seglist
1532         self.assertEqual(rx_srh.addresses, tx_seglist)
1533         # segleft should be equal to size(expected seglist)-1
1534         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1535         # segleft should be equal to lastentry
1536         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1537
1538         if tx_srh:  # packet was tx'ed with SRH
1539             # packet should have 2nd SRH
1540             self.assertTrue(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
1541             # get 2nd SRH
1542             rx_srh2 = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting, 2)
1543
1544             # rx'ed srh2.addresses should be equal to tx'ed srh.addresses
1545             self.assertEqual(rx_srh2.addresses, tx_srh.addresses)
1546             # rx'ed srh2.segleft should be equal to tx'ed srh.segleft
1547             self.assertEqual(rx_srh2.segleft, tx_srh.segleft)
1548             # rx'ed srh2.lastentry should be equal to tx'ed srh.lastentry
1549             self.assertEqual(rx_srh2.lastentry, tx_srh.lastentry)
1550
1551         else:  # packet was tx'ed without SRH
1552             # rx packet should have no other SRH
1553             self.assertFalse(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
1554
1555         # UDP layer should be unchanged
1556         self.assertEqual(rx_udp, tx_udp)
1557
1558         self.logger.debug("packet verification: SUCCESS")
1559
1560     def compare_rx_tx_packet_End(self, tx_pkt, rx_pkt):
1561         """ Compare input and output packet after passing End (without PSP)
1562
1563         :param tx_pkt: transmitted packet
1564         :param rx_pkt: received packet
1565         """
1566         # End (no PSP) updates the headers as follows:
1567         # IPv6 + SRH:
1568         # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2)
1569         # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1570
1571         # get first (outer) IPv6 header of rx'ed packet
1572         rx_ip = rx_pkt.getlayer(IPv6)
1573         rx_srh = None
1574         rx_ip2 = None
1575         rx_udp = rx_pkt[UDP]
1576
1577         tx_ip = tx_pkt.getlayer(IPv6)
1578         # we know the packet has been tx'ed
1579         # with an inner IPv6 header and an SRH
1580         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1581         tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1582         tx_udp = tx_pkt[UDP]
1583
1584         # common checks, regardless of tx segleft value
1585         # rx'ed packet should have 2nd IPv6 header
1586         self.assertTrue(rx_ip.payload.haslayer(IPv6))
1587         # get second (inner) IPv6 header
1588         rx_ip2 = rx_pkt.getlayer(IPv6, 2)
1589
1590         if tx_ip.segleft > 0:
1591             # SRH should NOT have been popped:
1592             #   End SID without PSP does not pop SRH if segleft>0
1593             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1594             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1595
1596             # received ip.src should be equal to expected ip.src
1597             self.assertEqual(rx_ip.src, tx_ip.src)
1598             # sidlist should be unchanged
1599             self.assertEqual(rx_srh.addresses, tx_srh.addresses)
1600             # segleft should have been decremented
1601             self.assertEqual(rx_srh.segleft, tx_srh.segleft-1)
1602             # received ip.dst should be equal to sidlist[segleft]
1603             self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
1604             # lastentry should be unchanged
1605             self.assertEqual(rx_srh.lastentry, tx_srh.lastentry)
1606             # inner IPv6 packet (ip2) should be unchanged
1607             self.assertEqual(rx_ip2.src, tx_ip2.src)
1608             self.assertEqual(rx_ip2.dst, tx_ip2.dst)
1609         # else:  # tx_ip.segleft == 0
1610             # TODO: Does this work with 2 SRHs in ingress packet?
1611
1612         # UDP layer should be unchanged
1613         self.assertEqual(rx_udp, tx_udp)
1614
1615         self.logger.debug("packet verification: SUCCESS")
1616
1617     def compare_rx_tx_packet_End_PSP(self, tx_pkt, rx_pkt):
1618         """ Compare input and output packet after passing End with PSP
1619
1620         :param tx_pkt: transmitted packet
1621         :param rx_pkt: received packet
1622         """
1623         # End (PSP) updates the headers as follows:
1624         # IPv6 + SRH (SL>1):
1625         # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2)
1626         # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1627         # IPv6 + SRH (SL=1):
1628         # in: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1629         # out: IPv6(A, S3)
1630
1631         # get first (outer) IPv6 header of rx'ed packet
1632         rx_ip = rx_pkt.getlayer(IPv6)
1633         rx_srh = None
1634         rx_ip2 = None
1635         rx_udp = rx_pkt[UDP]
1636
1637         tx_ip = tx_pkt.getlayer(IPv6)
1638         # we know the packet has been tx'ed
1639         # with an inner IPv6 header and an SRH
1640         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1641         tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1642         tx_udp = tx_pkt[UDP]
1643
1644         # common checks, regardless of tx segleft value
1645         self.assertTrue(rx_ip.payload.haslayer(IPv6))
1646         rx_ip2 = rx_pkt.getlayer(IPv6, 2)
1647         # inner IPv6 packet (ip2) should be unchanged
1648         self.assertEqual(rx_ip2.src, tx_ip2.src)
1649         self.assertEqual(rx_ip2.dst, tx_ip2.dst)
1650
1651         if tx_ip.segleft > 1:
1652             # SRH should NOT have been popped:
1653             #   End SID with PSP does not pop SRH if segleft>1
1654             # rx'ed packet should have SRH
1655             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1656             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1657
1658             # received ip.src should be equal to expected ip.src
1659             self.assertEqual(rx_ip.src, tx_ip.src)
1660             # sidlist should be unchanged
1661             self.assertEqual(rx_srh.addresses, tx_srh.addresses)
1662             # segleft should have been decremented
1663             self.assertEqual(rx_srh.segleft, tx_srh.segleft-1)
1664             # received ip.dst should be equal to sidlist[segleft]
1665             self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
1666             # lastentry should be unchanged
1667             self.assertEqual(rx_srh.lastentry, tx_srh.lastentry)
1668
1669         else:  # tx_ip.segleft <= 1
1670             # SRH should have been popped:
1671             #   End SID with PSP and segleft=1 pops SRH
1672             # the two IPv6 headers are still present
1673             # outer IPv6 header has DA == last segment of popped SRH
1674             # SRH should not be present
1675             self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1676             # outer IPv6 header ip.src should be equal to tx'ed ip.src
1677             self.assertEqual(rx_ip.src, tx_ip.src)
1678             # outer IPv6 header ip.dst should be = to tx'ed sidlist[segleft-1]
1679             self.assertEqual(rx_ip.dst, tx_srh.addresses[tx_srh.segleft-1])
1680
1681         # UDP layer should be unchanged
1682         self.assertEqual(rx_udp, tx_udp)
1683
1684         self.logger.debug("packet verification: SUCCESS")
1685
1686     def compare_rx_tx_packet_End_DX6(self, tx_pkt, rx_pkt):
1687         """ Compare input and output packet after passing End.DX6
1688
1689         :param tx_pkt: transmitted packet
1690         :param rx_pkt: received packet
1691         """
1692         # End.DX6 updates the headers as follows:
1693         # IPv6 + SRH (SL=0):
1694         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv6(B, D)
1695         # out: IPv6(B, D)
1696         # IPv6:
1697         # in: IPv6(A, S3)IPv6(B, D)
1698         # out: IPv6(B, D)
1699
1700         # get first (outer) IPv6 header of rx'ed packet
1701         rx_ip = rx_pkt.getlayer(IPv6)
1702
1703         tx_ip = tx_pkt.getlayer(IPv6)
1704         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1705
1706         # verify if rx'ed packet has no SRH
1707         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1708
1709         # the whole rx_ip pkt should be equal to tx_ip2
1710         # except for the hlim field
1711         #   -> adjust tx'ed hlim to expected hlim
1712         tx_ip2.hlim = tx_ip2.hlim - 1
1713
1714         self.assertEqual(rx_ip, tx_ip2)
1715
1716         self.logger.debug("packet verification: SUCCESS")
1717
1718     def compare_rx_tx_packet_End_DX4(self, tx_pkt, rx_pkt):
1719         """ Compare input and output packet after passing End.DX4
1720
1721         :param tx_pkt: transmitted packet
1722         :param rx_pkt: received packet
1723         """
1724         # End.DX4 updates the headers as follows:
1725         # IPv6 + SRH (SL=0):
1726         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv4(B, D)
1727         # out: IPv4(B, D)
1728         # IPv6:
1729         # in: IPv6(A, S3)IPv4(B, D)
1730         # out: IPv4(B, D)
1731
1732         # get IPv4 header of rx'ed packet
1733         rx_ip = rx_pkt.getlayer(IP)
1734
1735         tx_ip = tx_pkt.getlayer(IPv6)
1736         tx_ip2 = tx_pkt.getlayer(IP)
1737
1738         # verify if rx'ed packet has no SRH
1739         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1740
1741         # the whole rx_ip pkt should be equal to tx_ip2
1742         # except for the ttl field and ip checksum
1743         #   -> adjust tx'ed ttl to expected ttl
1744         tx_ip2.ttl = tx_ip2.ttl - 1
1745         #   -> set tx'ed ip checksum to None and let scapy recompute
1746         tx_ip2.chksum = None
1747         # read back the pkt (with str()) to force computing these fields
1748         # probably other ways to accomplish this are possible
1749         tx_ip2 = IP(str(tx_ip2))
1750
1751         self.assertEqual(rx_ip, tx_ip2)
1752
1753         self.logger.debug("packet verification: SUCCESS")
1754
1755     def compare_rx_tx_packet_End_DX2(self, tx_pkt, rx_pkt):
1756         """ Compare input and output packet after passing End.DX2
1757
1758         :param tx_pkt: transmitted packet
1759         :param rx_pkt: received packet
1760         """
1761         # End.DX2 updates the headers as follows:
1762         # IPv6 + SRH (SL=0):
1763         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)L2
1764         # out: L2
1765         # IPv6:
1766         # in: IPv6(A, S3)L2
1767         # out: L2
1768
1769         # get IPv4 header of rx'ed packet
1770         rx_eth = rx_pkt.getlayer(Ether)
1771
1772         tx_ip = tx_pkt.getlayer(IPv6)
1773         # we can't just get the 2nd Ether layer
1774         # get the Raw content and dissect it as Ether
1775         tx_eth1 = Ether(str(tx_pkt[Raw]))
1776
1777         # verify if rx'ed packet has no SRH
1778         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1779
1780         # the whole rx_eth pkt should be equal to tx_eth1
1781         self.assertEqual(rx_eth, tx_eth1)
1782
1783         self.logger.debug("packet verification: SUCCESS")
1784
1785     def create_stream(self, src_if, dst_if, packet_header, packet_sizes,
1786                       count):
1787         """Create SRv6 input packet stream for defined interface.
1788
1789         :param VppInterface src_if: Interface to create packet stream for
1790         :param VppInterface dst_if: destination interface of packet stream
1791         :param packet_header: Layer3 scapy packet headers,
1792                 L2 is added when not provided,
1793                 Raw(payload) with packet_info is added
1794         :param list packet_sizes: packet stream pckt sizes,sequentially applied
1795                to packets in stream have
1796         :param int count: number of packets in packet stream
1797         :return: list of packets
1798         """
1799         self.logger.info("Creating packets")
1800         pkts = []
1801         for i in range(0, count-1):
1802             payload_info = self.create_packet_info(src_if, dst_if)
1803             self.logger.debug(
1804                 "Creating packet with index %d" % (payload_info.index))
1805             payload = self.info_to_payload(payload_info)
1806             # add L2 header if not yet provided in packet_header
1807             if packet_header.getlayer(0).name == 'Ethernet':
1808                 p = (packet_header /
1809                      Raw(payload))
1810             else:
1811                 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
1812                      packet_header /
1813                      Raw(payload))
1814             size = packet_sizes[i % len(packet_sizes)]
1815             self.logger.debug("Packet size %d" % (size))
1816             self.extend_packet(p, size)
1817             # we need to store the packet with the automatic fields computed
1818             # read back the dumped packet (with str())
1819             # to force computing these fields
1820             # probably other ways are possible
1821             p = Ether(str(p))
1822             payload_info.data = p.copy()
1823             self.logger.debug(ppp("Created packet:", p))
1824             pkts.append(p)
1825         self.logger.info("Done creating packets")
1826         return pkts
1827
1828     def send_and_verify_pkts(self, input, pkts, output, compare_func):
1829         """Send packets and verify received packets using compare_func
1830
1831         :param input: ingress interface of DUT
1832         :param pkts: list of packets to transmit
1833         :param output: egress interface of DUT
1834         :param compare_func: function to compare in and out packets
1835         """
1836         # add traffic stream to input interface
1837         input.add_stream(pkts)
1838
1839         # enable capture on all interfaces
1840         self.pg_enable_capture(self.pg_interfaces)
1841
1842         # start traffic
1843         self.logger.info("Starting traffic")
1844         self.pg_start()
1845
1846         # get output capture
1847         self.logger.info("Getting packet capture")
1848         capture = output.get_capture()
1849
1850         # assert nothing was captured on input interface
1851         input.assert_nothing_captured()
1852
1853         # verify captured packets
1854         self.verify_captured_pkts(output, capture, compare_func)
1855
1856     def create_packet_header_IPv6(self, dst):
1857         """Create packet header: IPv6 header, UDP header
1858
1859         :param dst: IPv6 destination address
1860
1861         IPv6 source address is 1234::1
1862         UDP source port and destination port are 1234
1863         """
1864
1865         p = (IPv6(src='1234::1', dst=dst) /
1866              UDP(sport=1234, dport=1234))
1867         return p
1868
1869     def create_packet_header_IPv6_SRH(self, sidlist, segleft):
1870         """Create packet header: IPv6 header with SRH, UDP header
1871
1872         :param list sidlist: segment list
1873         :param int segleft: segments-left field value
1874
1875         IPv6 destination address is set to sidlist[segleft]
1876         IPv6 source addresses are 1234::1 and 4321::1
1877         UDP source port and destination port are 1234
1878         """
1879
1880         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
1881              IPv6ExtHdrSegmentRouting(addresses=sidlist) /
1882              UDP(sport=1234, dport=1234))
1883         return p
1884
1885     def create_packet_header_IPv6_SRH_IPv6(self, dst, sidlist, segleft):
1886         """Create packet header: IPv6 encapsulated in SRv6:
1887         IPv6 header with SRH, IPv6 header, UDP header
1888
1889         :param ipv6address dst: inner IPv6 destination address
1890         :param list sidlist: segment list of outer IPv6 SRH
1891         :param int segleft: segments-left field of outer IPv6 SRH
1892
1893         Outer IPv6 destination address is set to sidlist[segleft]
1894         IPv6 source addresses are 1234::1 and 4321::1
1895         UDP source port and destination port are 1234
1896         """
1897
1898         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
1899              IPv6ExtHdrSegmentRouting(addresses=sidlist,
1900                                       segleft=segleft, nh=41) /
1901              IPv6(src='4321::1', dst=dst) /
1902              UDP(sport=1234, dport=1234))
1903         return p
1904
1905     def create_packet_header_IPv6_IPv6(self, dst_inner, dst_outer):
1906         """Create packet header: IPv6 encapsulated in IPv6:
1907         IPv6 header, IPv6 header, UDP header
1908
1909         :param ipv6address dst_inner: inner IPv6 destination address
1910         :param ipv6address dst_outer: outer IPv6 destination address
1911
1912         IPv6 source addresses are 1234::1 and 4321::1
1913         UDP source port and destination port are 1234
1914         """
1915
1916         p = (IPv6(src='1234::1', dst=dst_outer) /
1917              IPv6(src='4321::1', dst=dst_inner) /
1918              UDP(sport=1234, dport=1234))
1919         return p
1920
1921     def create_packet_header_IPv6_SRH_SRH_IPv6(self, dst, sidlist1, segleft1,
1922                                                sidlist2, segleft2):
1923         """Create packet header: IPv6 encapsulated in SRv6 with 2 SRH:
1924         IPv6 header with SRH, 2nd SRH, IPv6 header, UDP header
1925
1926         :param ipv6address dst: inner IPv6 destination address
1927         :param list sidlist1: segment list of outer IPv6 SRH
1928         :param int segleft1: segments-left field of outer IPv6 SRH
1929         :param list sidlist2: segment list of inner IPv6 SRH
1930         :param int segleft2: segments-left field of inner IPv6 SRH
1931
1932         Outer IPv6 destination address is set to sidlist[segleft]
1933         IPv6 source addresses are 1234::1 and 4321::1
1934         UDP source port and destination port are 1234
1935         """
1936
1937         p = (IPv6(src='1234::1', dst=sidlist1[segleft1]) /
1938              IPv6ExtHdrSegmentRouting(addresses=sidlist1,
1939                                       segleft=segleft1, nh=43) /
1940              IPv6ExtHdrSegmentRouting(addresses=sidlist2,
1941                                       segleft=segleft2, nh=41) /
1942              IPv6(src='4321::1', dst=dst) /
1943              UDP(sport=1234, dport=1234))
1944         return p
1945
1946     def create_packet_header_IPv4(self, dst):
1947         """Create packet header: IPv4 header, UDP header
1948
1949         :param dst: IPv4 destination address
1950
1951         IPv4 source address is 123.1.1.1
1952         UDP source port and destination port are 1234
1953         """
1954
1955         p = (IP(src='123.1.1.1', dst=dst) /
1956              UDP(sport=1234, dport=1234))
1957         return p
1958
1959     def create_packet_header_IPv6_IPv4(self, dst_inner, dst_outer):
1960         """Create packet header: IPv4 encapsulated in IPv6:
1961         IPv6 header, IPv4 header, UDP header
1962
1963         :param ipv4address dst_inner: inner IPv4 destination address
1964         :param ipv6address dst_outer: outer IPv6 destination address
1965
1966         IPv6 source address is 1234::1
1967         IPv4 source address is 123.1.1.1
1968         UDP source port and destination port are 1234
1969         """
1970
1971         p = (IPv6(src='1234::1', dst=dst_outer) /
1972              IP(src='123.1.1.1', dst=dst_inner) /
1973              UDP(sport=1234, dport=1234))
1974         return p
1975
1976     def create_packet_header_IPv6_SRH_IPv4(self, dst, sidlist, segleft):
1977         """Create packet header: IPv4 encapsulated in SRv6:
1978         IPv6 header with SRH, IPv4 header, UDP header
1979
1980         :param ipv4address dst: inner IPv4 destination address
1981         :param list sidlist: segment list of outer IPv6 SRH
1982         :param int segleft: segments-left field of outer IPv6 SRH
1983
1984         Outer IPv6 destination address is set to sidlist[segleft]
1985         IPv6 source address is 1234::1
1986         IPv4 source address is 123.1.1.1
1987         UDP source port and destination port are 1234
1988         """
1989
1990         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
1991              IPv6ExtHdrSegmentRouting(addresses=sidlist,
1992                                       segleft=segleft, nh=4) /
1993              IP(src='123.1.1.1', dst=dst) /
1994              UDP(sport=1234, dport=1234))
1995         return p
1996
1997     def create_packet_header_L2(self, vlan=0):
1998         """Create packet header: L2 header
1999
2000         :param vlan: if vlan!=0 then add 802.1q header
2001         """
2002         # Note: the dst addr ('00:55:44:33:22:11') is used in
2003         # the compare function compare_rx_tx_packet_T_Encaps_L2
2004         # to detect presence of L2 in SRH payload
2005         p = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
2006         etype = 0x8137  # IPX
2007         if vlan:
2008             # add 802.1q layer
2009             p /= Dot1Q(vlan=vlan, type=etype)
2010         else:
2011             p.type = etype
2012         return p
2013
2014     def create_packet_header_IPv6_SRH_L2(self, sidlist, segleft, vlan=0):
2015         """Create packet header: L2 encapsulated in SRv6:
2016         IPv6 header with SRH, L2
2017
2018         :param list sidlist: segment list of outer IPv6 SRH
2019         :param int segleft: segments-left field of outer IPv6 SRH
2020         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
2021
2022         Outer IPv6 destination address is set to sidlist[segleft]
2023         IPv6 source address is 1234::1
2024         """
2025         eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
2026         etype = 0x8137  # IPX
2027         if vlan:
2028             # add 802.1q layer
2029             eth /= Dot1Q(vlan=vlan, type=etype)
2030         else:
2031             eth.type = etype
2032
2033         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
2034              IPv6ExtHdrSegmentRouting(addresses=sidlist,
2035                                       segleft=segleft, nh=59) /
2036              eth)
2037         return p
2038
2039     def create_packet_header_IPv6_L2(self, dst_outer, vlan=0):
2040         """Create packet header: L2 encapsulated in IPv6:
2041         IPv6 header, L2
2042
2043         :param ipv6address dst_outer: outer IPv6 destination address
2044         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
2045         """
2046         eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
2047         etype = 0x8137  # IPX
2048         if vlan:
2049             # add 802.1q layer
2050             eth /= Dot1Q(vlan=vlan, type=etype)
2051         else:
2052             eth.type = etype
2053
2054         p = (IPv6(src='1234::1', dst=dst_outer, nh=59) / eth)
2055         return p
2056
2057     def get_payload_info(self, packet):
2058         """ Extract the payload_info from the packet
2059         """
2060         # in most cases, payload_info is in packet[Raw]
2061         # but packet[Raw] gives the complete payload
2062         # (incl L2 header) for the T.Encaps L2 case
2063         try:
2064             payload_info = self.payload_to_info(str(packet[Raw]))
2065
2066         except:
2067             # remote L2 header from packet[Raw]:
2068             # take packet[Raw], convert it to an Ether layer
2069             # and then extract Raw from it
2070             payload_info = self.payload_to_info(
2071                 str(Ether(str(packet[Raw]))[Raw]))
2072
2073         return payload_info
2074
2075     def verify_captured_pkts(self, dst_if, capture, compare_func):
2076         """
2077         Verify captured packet stream for specified interface.
2078         Compare ingress with egress packets using the specified compare fn
2079
2080         :param dst_if: egress interface of DUT
2081         :param capture: captured packets
2082         :param compare_func: function to compare in and out packet
2083         """
2084         self.logger.info("Verifying capture on interface %s using function %s"
2085                          % (dst_if.name, compare_func.func_name))
2086
2087         last_info = dict()
2088         for i in self.pg_interfaces:
2089             last_info[i.sw_if_index] = None
2090         dst_sw_if_index = dst_if.sw_if_index
2091
2092         for packet in capture:
2093             try:
2094                 # extract payload_info from packet's payload
2095                 payload_info = self.get_payload_info(packet)
2096                 packet_index = payload_info.index
2097
2098                 self.logger.debug("Verifying packet with index %d"
2099                                   % (packet_index))
2100                 # packet should have arrived on the expected interface
2101                 self.assertEqual(payload_info.dst, dst_sw_if_index)
2102                 self.logger.debug(
2103                     "Got packet on interface %s: src=%u (idx=%u)" %
2104                     (dst_if.name, payload_info.src, packet_index))
2105
2106                 # search for payload_info with same src and dst if_index
2107                 # this will give us the transmitted packet
2108                 next_info = self.get_next_packet_info_for_interface2(
2109                     payload_info.src, dst_sw_if_index,
2110                     last_info[payload_info.src])
2111                 last_info[payload_info.src] = next_info
2112                 # next_info should not be None
2113                 self.assertTrue(next_info is not None)
2114                 # index of tx and rx packets should be equal
2115                 self.assertEqual(packet_index, next_info.index)
2116                 # data field of next_info contains the tx packet
2117                 txed_packet = next_info.data
2118
2119                 self.logger.debug(ppp("Transmitted packet:",
2120                                       txed_packet))  # ppp=Pretty Print Packet
2121
2122                 self.logger.debug(ppp("Received packet:", packet))
2123
2124                 # compare rcvd packet with expected packet using compare_func
2125                 compare_func(txed_packet, packet)
2126
2127             except:
2128                 print packet.command()
2129                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2130                 raise
2131
2132         # have all expected packets arrived?
2133         for i in self.pg_interfaces:
2134             remaining_packet = self.get_next_packet_info_for_interface2(
2135                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
2136             self.assertTrue(remaining_packet is None,
2137                             "Interface %s: Packet expected from interface %s "
2138                             "didn't arrive" % (dst_if.name, i.name))
2139
2140
2141 if __name__ == '__main__':
2142     unittest.main(testRunner=VppTestRunner)