sr: srv6 API cleanup
[vpp.git] / src / plugins / srv6-am / test / test_srv6.py
1 #!/usr/bin/env python3
2
3 import unittest
4 import binascii
5 from socket import AF_INET6
6
7 from framework import VppTestCase, VppTestRunner
8 from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathProto, VppIpTable
9 from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
10     SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
11
12 import scapy.compat
13 from scapy.packet import Raw
14 from scapy.layers.l2 import Ether, Dot1Q
15 from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
16 from scapy.layers.inet import IP, UDP
17
18 from util import ppp
19
20
21 class TestSRv6(VppTestCase):
22     """ SRv6 Test Case """
23
24     @classmethod
25     def setUpClass(cls):
26         super(TestSRv6, cls).setUpClass()
27
28     @classmethod
29     def tearDownClass(cls):
30         super(TestSRv6, cls).tearDownClass()
31
32     def setUp(self):
33         """ Perform test setup before each test case.
34         """
35         super(TestSRv6, self).setUp()
36
37         # packet sizes, inclusive L2 overhead
38         self.pg_packet_sizes = [64, 512, 1518, 9018]
39
40         # reset packet_infos
41         self.reset_packet_infos()
42
43     def tearDown(self):
44         """ Clean up test setup after each test case.
45         """
46         self.teardown_interfaces()
47
48         super(TestSRv6, self).tearDown()
49
50     def configure_interface(self,
51                             interface,
52                             ipv6=False, ipv4=False,
53                             ipv6_table_id=0, ipv4_table_id=0):
54         """ Configure interface.
55         :param ipv6: configure IPv6 on interface
56         :param ipv4: configure IPv4 on interface
57         :param ipv6_table_id: FIB table_id for IPv6
58         :param ipv4_table_id: FIB table_id for IPv4
59         """
60         self.logger.debug("Configuring interface %s" % (interface.name))
61         if ipv6:
62             self.logger.debug("Configuring IPv6")
63             interface.set_table_ip6(ipv6_table_id)
64             interface.config_ip6()
65             interface.resolve_ndp(timeout=5)
66         if ipv4:
67             self.logger.debug("Configuring IPv4")
68             interface.set_table_ip4(ipv4_table_id)
69             interface.config_ip4()
70             interface.resolve_arp()
71         interface.admin_up()
72
73     def setup_interfaces(self, ipv6=[], ipv4=[],
74                          ipv6_table_id=[], ipv4_table_id=[]):
75         """ Create and configure interfaces.
76
77         :param ipv6: list of interface IPv6 capabilities
78         :param ipv4: list of interface IPv4 capabilities
79         :param ipv6_table_id: list of intf IPv6 FIB table_ids
80         :param ipv4_table_id: list of intf IPv4 FIB table_ids
81         :returns: List of created interfaces.
82         """
83         # how many interfaces?
84         if len(ipv6):
85             count = len(ipv6)
86         else:
87             count = len(ipv4)
88         self.logger.debug("Creating and configuring %d interfaces" % (count))
89
90         # fill up ipv6 and ipv4 lists if needed
91         # not enabled (False) is the default
92         if len(ipv6) < count:
93             ipv6 += (count - len(ipv6)) * [False]
94         if len(ipv4) < count:
95             ipv4 += (count - len(ipv4)) * [False]
96
97         # fill up table_id lists if needed
98         # table_id 0 (global) is the default
99         if len(ipv6_table_id) < count:
100             ipv6_table_id += (count - len(ipv6_table_id)) * [0]
101         if len(ipv4_table_id) < count:
102             ipv4_table_id += (count - len(ipv4_table_id)) * [0]
103
104         # create 'count' pg interfaces
105         self.create_pg_interfaces(range(count))
106
107         # setup all interfaces
108         for i in range(count):
109             intf = self.pg_interfaces[i]
110             self.configure_interface(intf,
111                                      ipv6[i], ipv4[i],
112                                      ipv6_table_id[i], ipv4_table_id[i])
113
114         if any(ipv6):
115             self.logger.debug(self.vapi.cli("show ip6 neighbors"))
116         if any(ipv4):
117             self.logger.debug(self.vapi.cli("show ip4 neighbors"))
118         self.logger.debug(self.vapi.cli("show interface"))
119         self.logger.debug(self.vapi.cli("show hardware"))
120
121         return self.pg_interfaces
122
123     def teardown_interfaces(self):
124         """ Unconfigure and bring down interface.
125         """
126         self.logger.debug("Tearing down interfaces")
127         # tear down all interfaces
128         # AFAIK they cannot be deleted
129         for i in self.pg_interfaces:
130             self.logger.debug("Tear down interface %s" % (i.name))
131             i.admin_down()
132             i.unconfig()
133             i.set_table_ip4(0)
134             i.set_table_ip6(0)
135
136     @unittest.skipUnless(0, "PC to fix")
137     def test_SRv6_T_Encaps(self):
138         """ Test SRv6 Transit.Encaps behavior for IPv6.
139         """
140         # send traffic to one destination interface
141         # source and destination are IPv6 only
142         self.setup_interfaces(ipv6=[True, True])
143
144         # configure FIB entries
145         route = VppIpRoute(self, "a4::", 64,
146                            [VppRoutePath(self.pg1.remote_ip6,
147                                          self.pg1.sw_if_index)])
148         route.add_vpp_config()
149
150         # configure encaps IPv6 source address
151         # needs to be done before SR Policy config
152         # TODO: API?
153         self.vapi.cli("set sr encaps source addr a3::")
154
155         bsid = 'a3::9999:1'
156         # configure SRv6 Policy
157         # Note: segment list order: first -> last
158         sr_policy = VppSRv6Policy(
159             self, bsid=bsid,
160             is_encap=1,
161             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
162             weight=1, fib_table=0,
163             segments=['a4::', 'a5::', 'a6::c7'],
164             source='a3::')
165         sr_policy.add_vpp_config()
166         self.sr_policy = sr_policy
167
168         # log the sr policies
169         self.logger.info(self.vapi.cli("show sr policies"))
170
171         # steer IPv6 traffic to a7::/64 into SRv6 Policy
172         # use the bsid of the above self.sr_policy
173         pol_steering = VppSRv6Steering(
174                         self,
175                         bsid=self.sr_policy.bsid,
176                         prefix="a7::", mask_width=64,
177                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
178                         sr_policy_index=0, table_id=0,
179                         sw_if_index=0)
180         pol_steering.add_vpp_config()
181
182         # log the sr steering policies
183         self.logger.info(self.vapi.cli("show sr steering policies"))
184
185         # create packets
186         count = len(self.pg_packet_sizes)
187         dst_inner = 'a7::1234'
188         pkts = []
189
190         # create IPv6 packets without SRH
191         packet_header = self.create_packet_header_IPv6(dst_inner)
192         # create traffic stream pg0->pg1
193         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
194                                        self.pg_packet_sizes, count))
195
196         # create IPv6 packets with SRH
197         # packets with segments-left 1, active segment a7::
198         packet_header = self.create_packet_header_IPv6_SRH(
199             sidlist=['a8::', 'a7::', 'a6::'],
200             segleft=1)
201         # create traffic stream pg0->pg1
202         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
203                                        self.pg_packet_sizes, count))
204
205         # create IPv6 packets with SRH and IPv6
206         # packets with segments-left 1, active segment a7::
207         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
208             dst_inner,
209             sidlist=['a8::', 'a7::', 'a6::'],
210             segleft=1)
211         # create traffic stream pg0->pg1
212         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
213                                        self.pg_packet_sizes, count))
214
215         # send packets and verify received packets
216         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
217                                   self.compare_rx_tx_packet_T_Encaps)
218
219         # log the localsid counters
220         self.logger.info(self.vapi.cli("show sr localsid"))
221
222         # remove SR steering
223         pol_steering.remove_vpp_config()
224         self.logger.info(self.vapi.cli("show sr steering policies"))
225
226         # remove SR Policies
227         self.sr_policy.remove_vpp_config()
228         self.logger.info(self.vapi.cli("show sr policies"))
229
230         # remove FIB entries
231         # done by tearDown
232
233         # cleanup interfaces
234         self.teardown_interfaces()
235
236     @unittest.skipUnless(0, "PC to fix")
237     def test_SRv6_T_Insert(self):
238         """ Test SRv6 Transit.Insert behavior (IPv6 only).
239         """
240         # send traffic to one destination interface
241         # source and destination are IPv6 only
242         self.setup_interfaces(ipv6=[True, True])
243
244         # configure FIB entries
245         route = VppIpRoute(self, "a4::", 64,
246                            [VppRoutePath(self.pg1.remote_ip6,
247                                          self.pg1.sw_if_index)])
248         route.add_vpp_config()
249
250         # configure encaps IPv6 source address
251         # needs to be done before SR Policy config
252         # TODO: API?
253         self.vapi.cli("set sr encaps source addr a3::")
254
255         bsid = 'a3::9999:1'
256         # configure SRv6 Policy
257         # Note: segment list order: first -> last
258         sr_policy = VppSRv6Policy(
259             self, bsid=bsid,
260             is_encap=0,
261             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
262             weight=1, fib_table=0,
263             segments=['a4::', 'a5::', 'a6::c7'],
264             source='a3::')
265         sr_policy.add_vpp_config()
266         self.sr_policy = sr_policy
267
268         # log the sr policies
269         self.logger.info(self.vapi.cli("show sr policies"))
270
271         # steer IPv6 traffic to a7::/64 into SRv6 Policy
272         # use the bsid of the above self.sr_policy
273         pol_steering = VppSRv6Steering(
274                         self,
275                         bsid=self.sr_policy.bsid,
276                         prefix="a7::", mask_width=64,
277                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
278                         sr_policy_index=0, table_id=0,
279                         sw_if_index=0)
280         pol_steering.add_vpp_config()
281
282         # log the sr steering policies
283         self.logger.info(self.vapi.cli("show sr steering policies"))
284
285         # create packets
286         count = len(self.pg_packet_sizes)
287         dst_inner = 'a7::1234'
288         pkts = []
289
290         # create IPv6 packets without SRH
291         packet_header = self.create_packet_header_IPv6(dst_inner)
292         # create traffic stream pg0->pg1
293         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
294                                        self.pg_packet_sizes, count))
295
296         # create IPv6 packets with SRH
297         # packets with segments-left 1, active segment a7::
298         packet_header = self.create_packet_header_IPv6_SRH(
299             sidlist=['a8::', 'a7::', 'a6::'],
300             segleft=1)
301         # create traffic stream pg0->pg1
302         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
303                                        self.pg_packet_sizes, count))
304
305         # send packets and verify received packets
306         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
307                                   self.compare_rx_tx_packet_T_Insert)
308
309         # log the localsid counters
310         self.logger.info(self.vapi.cli("show sr localsid"))
311
312         # remove SR steering
313         pol_steering.remove_vpp_config()
314         self.logger.info(self.vapi.cli("show sr steering policies"))
315
316         # remove SR Policies
317         self.sr_policy.remove_vpp_config()
318         self.logger.info(self.vapi.cli("show sr policies"))
319
320         # remove FIB entries
321         # done by tearDown
322
323         # cleanup interfaces
324         self.teardown_interfaces()
325
326     @unittest.skipUnless(0, "PC to fix")
327     def test_SRv6_T_Encaps_IPv4(self):
328         """ Test SRv6 Transit.Encaps behavior for IPv4.
329         """
330         # send traffic to one destination interface
331         # source interface is IPv4 only
332         # destination interface is IPv6 only
333         self.setup_interfaces(ipv6=[False, True], ipv4=[True, False])
334
335         # configure FIB entries
336         route = VppIpRoute(self, "a4::", 64,
337                            [VppRoutePath(self.pg1.remote_ip6,
338                                          self.pg1.sw_if_index)])
339         route.add_vpp_config()
340
341         # configure encaps IPv6 source address
342         # needs to be done before SR Policy config
343         # TODO: API?
344         self.vapi.cli("set sr encaps source addr a3::")
345
346         bsid = 'a3::9999:1'
347         # configure SRv6 Policy
348         # Note: segment list order: first -> last
349         sr_policy = VppSRv6Policy(
350             self, bsid=bsid,
351             is_encap=1,
352             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
353             weight=1, fib_table=0,
354             segments=['a4::', 'a5::', 'a6::c7'],
355             source='a3::')
356         sr_policy.add_vpp_config()
357         self.sr_policy = sr_policy
358
359         # log the sr policies
360         self.logger.info(self.vapi.cli("show sr policies"))
361
362         # steer IPv4 traffic to 7.1.1.0/24 into SRv6 Policy
363         # use the bsid of the above self.sr_policy
364         pol_steering = VppSRv6Steering(
365                         self,
366                         bsid=self.sr_policy.bsid,
367                         prefix="7.1.1.0", mask_width=24,
368                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV4,
369                         sr_policy_index=0, table_id=0,
370                         sw_if_index=0)
371         pol_steering.add_vpp_config()
372
373         # log the sr steering policies
374         self.logger.info(self.vapi.cli("show sr steering policies"))
375
376         # create packets
377         count = len(self.pg_packet_sizes)
378         dst_inner = '7.1.1.123'
379         pkts = []
380
381         # create IPv4 packets
382         packet_header = self.create_packet_header_IPv4(dst_inner)
383         # create traffic stream pg0->pg1
384         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
385                                        self.pg_packet_sizes, count))
386
387         # send packets and verify received packets
388         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
389                                   self.compare_rx_tx_packet_T_Encaps_IPv4)
390
391         # log the localsid counters
392         self.logger.info(self.vapi.cli("show sr localsid"))
393
394         # remove SR steering
395         pol_steering.remove_vpp_config()
396         self.logger.info(self.vapi.cli("show sr steering policies"))
397
398         # remove SR Policies
399         self.sr_policy.remove_vpp_config()
400         self.logger.info(self.vapi.cli("show sr policies"))
401
402         # remove FIB entries
403         # done by tearDown
404
405         # cleanup interfaces
406         self.teardown_interfaces()
407
408     @unittest.skip("VPP crashes after running this test")
409     def test_SRv6_T_Encaps_L2(self):
410         """ Test SRv6 Transit.Encaps behavior for L2.
411         """
412         # send traffic to one destination interface
413         # source interface is IPv4 only TODO?
414         # destination interface is IPv6 only
415         self.setup_interfaces(ipv6=[False, True], ipv4=[False, False])
416
417         # configure FIB entries
418         route = VppIpRoute(self, "a4::", 64,
419                            [VppRoutePath(self.pg1.remote_ip6,
420                                          self.pg1.sw_if_index)])
421         route.add_vpp_config()
422
423         # configure encaps IPv6 source address
424         # needs to be done before SR Policy config
425         # TODO: API?
426         self.vapi.cli("set sr encaps source addr a3::")
427
428         bsid = 'a3::9999:1'
429         # configure SRv6 Policy
430         # Note: segment list order: first -> last
431         sr_policy = VppSRv6Policy(
432             self, bsid=bsid,
433             is_encap=1,
434             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
435             weight=1, fib_table=0,
436             segments=['a4::', 'a5::', 'a6::c7'],
437             source='a3::')
438         sr_policy.add_vpp_config()
439         self.sr_policy = sr_policy
440
441         # log the sr policies
442         self.logger.info(self.vapi.cli("show sr policies"))
443
444         # steer L2 traffic into SRv6 Policy
445         # use the bsid of the above self.sr_policy
446         pol_steering = VppSRv6Steering(
447                         self,
448                         bsid=self.sr_policy.bsid,
449                         prefix="::", mask_width=0,
450                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_L2,
451                         sr_policy_index=0, table_id=0,
452                         sw_if_index=self.pg0.sw_if_index)
453         pol_steering.add_vpp_config()
454
455         # log the sr steering policies
456         self.logger.info(self.vapi.cli("show sr steering policies"))
457
458         # create packets
459         count = len(self.pg_packet_sizes)
460         pkts = []
461
462         # create L2 packets without dot1q header
463         packet_header = self.create_packet_header_L2()
464         # create traffic stream pg0->pg1
465         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
466                                        self.pg_packet_sizes, count))
467
468         # create L2 packets with dot1q header
469         packet_header = self.create_packet_header_L2(vlan=123)
470         # create traffic stream pg0->pg1
471         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
472                                        self.pg_packet_sizes, count))
473
474         # send packets and verify received packets
475         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
476                                   self.compare_rx_tx_packet_T_Encaps_L2)
477
478         # log the localsid counters
479         self.logger.info(self.vapi.cli("show sr localsid"))
480
481         # remove SR steering
482         pol_steering.remove_vpp_config()
483         self.logger.info(self.vapi.cli("show sr steering policies"))
484
485         # remove SR Policies
486         self.sr_policy.remove_vpp_config()
487         self.logger.info(self.vapi.cli("show sr policies"))
488
489         # remove FIB entries
490         # done by tearDown
491
492         # cleanup interfaces
493         self.teardown_interfaces()
494
495     def test_SRv6_End(self):
496         """ Test SRv6 End (without PSP) behavior.
497         """
498         # send traffic to one destination interface
499         # source and destination interfaces are IPv6 only
500         self.setup_interfaces(ipv6=[True, True])
501
502         # configure FIB entries
503         route = VppIpRoute(self, "a4::", 64,
504                            [VppRoutePath(self.pg1.remote_ip6,
505                                          self.pg1.sw_if_index)])
506         route.add_vpp_config()
507
508         # configure SRv6 localSID End without PSP behavior
509         localsid = VppSRv6LocalSID(
510                         self, localsid='A3::0',
511                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
512                         nh_addr=0,
513                         end_psp=0,
514                         sw_if_index=0,
515                         vlan_index=0,
516                         fib_table=0)
517         localsid.add_vpp_config()
518         # log the localsids
519         self.logger.debug(self.vapi.cli("show sr localsid"))
520
521         # create IPv6 packets with SRH (SL=2, SL=1, SL=0)
522         # send one packet per SL value per packet size
523         # SL=0 packet with localSID End with USP needs 2nd SRH
524         count = len(self.pg_packet_sizes)
525         dst_inner = 'a4::1234'
526         pkts = []
527
528         # packets with segments-left 2, active segment a3::
529         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
530                 dst_inner,
531                 sidlist=['a5::', 'a4::', 'a3::'],
532                 segleft=2)
533         # create traffic stream pg0->pg1
534         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
535                                        self.pg_packet_sizes, count))
536
537         # packets with segments-left 1, active segment a3::
538         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
539                 dst_inner,
540                 sidlist=['a4::', 'a3::', 'a2::'],
541                 segleft=1)
542         # add to traffic stream pg0->pg1
543         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
544                                        self.pg_packet_sizes, count))
545
546         # TODO: test behavior with SL=0 packet (needs 2*SRH?)
547
548         expected_count = len(pkts)
549
550         # packets without SRH (should not crash)
551         packet_header = self.create_packet_header_IPv6('a3::')
552         # create traffic stream pg0->pg1
553         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
554                                        self.pg_packet_sizes, count))
555
556         # send packets and verify received packets
557         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
558                                   self.compare_rx_tx_packet_End,
559                                   expected_count=expected_count)
560
561         # log the localsid counters
562         self.logger.info(self.vapi.cli("show sr localsid"))
563
564         # remove SRv6 localSIDs
565         localsid.remove_vpp_config()
566
567         # remove FIB entries
568         # done by tearDown
569
570         # cleanup interfaces
571         self.teardown_interfaces()
572
573     def test_SRv6_End_with_PSP(self):
574         """ Test SRv6 End with PSP behavior.
575         """
576         # send traffic to one destination interface
577         # source and destination interfaces are IPv6 only
578         self.setup_interfaces(ipv6=[True, True])
579
580         # configure FIB entries
581         route = VppIpRoute(self, "a4::", 64,
582                            [VppRoutePath(self.pg1.remote_ip6,
583                                          self.pg1.sw_if_index)])
584         route.add_vpp_config()
585
586         # configure SRv6 localSID End with PSP behavior
587         localsid = VppSRv6LocalSID(
588                         self, localsid='A3::0',
589                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
590                         nh_addr=0,
591                         end_psp=1,
592                         sw_if_index=0,
593                         vlan_index=0,
594                         fib_table=0)
595         localsid.add_vpp_config()
596         # log the localsids
597         self.logger.debug(self.vapi.cli("show sr localsid"))
598
599         # create IPv6 packets with SRH (SL=2, SL=1)
600         # send one packet per SL value per packet size
601         # SL=0 packet with localSID End with PSP is dropped
602         count = len(self.pg_packet_sizes)
603         dst_inner = 'a4::1234'
604         pkts = []
605
606         # packets with segments-left 2, active segment a3::
607         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
608                     dst_inner,
609                     sidlist=['a5::', 'a4::', 'a3::'],
610                     segleft=2)
611         # create traffic stream pg0->pg1
612         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
613                                        self.pg_packet_sizes, count))
614
615         # packets with segments-left 1, active segment a3::
616         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
617                     dst_inner,
618                     sidlist=['a4::', 'a3::', 'a2::'],
619                     segleft=1)
620         # add to traffic stream pg0->pg1
621         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
622                                        self.pg_packet_sizes, count))
623
624         # send packets and verify received packets
625         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
626                                   self.compare_rx_tx_packet_End_PSP)
627
628         # log the localsid counters
629         self.logger.info(self.vapi.cli("show sr localsid"))
630
631         # remove SRv6 localSIDs
632         localsid.remove_vpp_config()
633
634         # remove FIB entries
635         # done by tearDown
636
637         # cleanup interfaces
638         self.teardown_interfaces()
639
640     def test_SRv6_End_X(self):
641         """ Test SRv6 End.X (without PSP) behavior.
642         """
643         # create three interfaces (1 source, 2 destinations)
644         # source and destination interfaces are IPv6 only
645         self.setup_interfaces(ipv6=[True, True, True])
646
647         # configure FIB entries
648         # a4::/64 via pg1 and pg2
649         route = VppIpRoute(self, "a4::", 64,
650                            [VppRoutePath(self.pg1.remote_ip6,
651                                          self.pg1.sw_if_index),
652                             VppRoutePath(self.pg2.remote_ip6,
653                                          self.pg2.sw_if_index)])
654         route.add_vpp_config()
655         self.logger.debug(self.vapi.cli("show ip6 fib"))
656
657         # configure SRv6 localSID End.X without PSP behavior
658         # End.X points to interface pg1
659         localsid = VppSRv6LocalSID(
660                         self, localsid='A3::C4',
661                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
662                         nh_addr=self.pg1.remote_ip6,
663                         end_psp=0,
664                         sw_if_index=self.pg1.sw_if_index,
665                         vlan_index=0,
666                         fib_table=0)
667         localsid.add_vpp_config()
668         # log the localsids
669         self.logger.debug(self.vapi.cli("show sr localsid"))
670
671         # create IPv6 packets with SRH (SL=2, SL=1)
672         # send one packet per SL value per packet size
673         # SL=0 packet with localSID End with PSP is dropped
674         count = len(self.pg_packet_sizes)
675         dst_inner = 'a4::1234'
676         pkts = []
677
678         # packets with segments-left 2, active segment a3::c4
679         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
680                     dst_inner,
681                     sidlist=['a5::', 'a4::', 'a3::c4'],
682                     segleft=2)
683         # create traffic stream pg0->pg1
684         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
685                                        self.pg_packet_sizes, count))
686
687         # packets with segments-left 1, active segment a3::c4
688         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
689                     dst_inner,
690                     sidlist=['a4::', 'a3::c4', 'a2::'],
691                     segleft=1)
692         # add to traffic stream pg0->pg1
693         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
694                                        self.pg_packet_sizes, count))
695
696         # send packets and verify received packets
697         # using same comparison function as End (no PSP)
698         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
699                                   self.compare_rx_tx_packet_End)
700
701         # assert nothing was received on the other interface (pg2)
702         self.pg2.assert_nothing_captured("mis-directed packet(s)")
703
704         # log the localsid counters
705         self.logger.info(self.vapi.cli("show sr localsid"))
706
707         # remove SRv6 localSIDs
708         localsid.remove_vpp_config()
709
710         # remove FIB entries
711         # done by tearDown
712
713         # cleanup interfaces
714         self.teardown_interfaces()
715
716     def test_SRv6_End_X_with_PSP(self):
717         """ Test SRv6 End.X with PSP behavior.
718         """
719         # create three interfaces (1 source, 2 destinations)
720         # source and destination interfaces are IPv6 only
721         self.setup_interfaces(ipv6=[True, True, True])
722
723         # configure FIB entries
724         # a4::/64 via pg1 and pg2
725         route = VppIpRoute(self, "a4::", 64,
726                            [VppRoutePath(
727                                self.pg1.remote_ip6,
728                                self.pg1.sw_if_index),
729                             VppRoutePath(self.pg2.remote_ip6,
730                                          self.pg2.sw_if_index)])
731         route.add_vpp_config()
732
733         # configure SRv6 localSID End with PSP behavior
734         localsid = VppSRv6LocalSID(
735                         self, localsid='A3::C4',
736                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
737                         nh_addr=self.pg1.remote_ip6,
738                         end_psp=1,
739                         sw_if_index=self.pg1.sw_if_index,
740                         vlan_index=0,
741                         fib_table=0)
742         localsid.add_vpp_config()
743         # log the localsids
744         self.logger.debug(self.vapi.cli("show sr localsid"))
745
746         # create IPv6 packets with SRH (SL=2, SL=1)
747         # send one packet per SL value per packet size
748         # SL=0 packet with localSID End with PSP is dropped
749         count = len(self.pg_packet_sizes)
750         dst_inner = 'a4::1234'
751         pkts = []
752
753         # packets with segments-left 2, active segment a3::
754         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
755                     dst_inner,
756                     sidlist=['a5::', 'a4::', 'a3::c4'],
757                     segleft=2)
758         # create traffic stream pg0->pg1
759         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
760                                        self.pg_packet_sizes, count))
761
762         # packets with segments-left 1, active segment a3::
763         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
764                     dst_inner,
765                     sidlist=['a4::', 'a3::c4', 'a2::'],
766                     segleft=1)
767         # add to traffic stream pg0->pg1
768         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
769                                        self.pg_packet_sizes, count))
770
771         # send packets and verify received packets
772         # using same comparison function as End with PSP
773         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
774                                   self.compare_rx_tx_packet_End_PSP)
775
776         # assert nothing was received on the other interface (pg2)
777         self.pg2.assert_nothing_captured("mis-directed packet(s)")
778
779         # log the localsid counters
780         self.logger.info(self.vapi.cli("show sr localsid"))
781
782         # remove SRv6 localSIDs
783         localsid.remove_vpp_config()
784
785         # remove FIB entries
786         # done by tearDown
787
788         # cleanup interfaces
789         self.teardown_interfaces()
790
791     def test_SRv6_End_DX6(self):
792         """ Test SRv6 End.DX6 behavior.
793         """
794         # send traffic to one destination interface
795         # source and destination interfaces are IPv6 only
796         self.setup_interfaces(ipv6=[True, True])
797
798         # configure SRv6 localSID End.DX6 behavior
799         localsid = VppSRv6LocalSID(
800                         self, localsid='A3::C4',
801                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX6,
802                         nh_addr=self.pg1.remote_ip6,
803                         end_psp=0,
804                         sw_if_index=self.pg1.sw_if_index,
805                         vlan_index=0,
806                         fib_table=0)
807         localsid.add_vpp_config()
808         # log the localsids
809         self.logger.debug(self.vapi.cli("show sr localsid"))
810
811         # create IPv6 packets with SRH (SL=0)
812         # send one packet per packet size
813         count = len(self.pg_packet_sizes)
814         dst_inner = 'a4::1234'  # inner header destination address
815         pkts = []
816
817         # packets with SRH, segments-left 0, active segment a3::c4
818         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
819                         dst_inner,
820                         sidlist=['a3::c4', 'a2::', 'a1::'],
821                         segleft=0)
822         # add to traffic stream pg0->pg1
823         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
824                                        self.pg_packet_sizes, count))
825
826         # packets without SRH, IPv6 in IPv6
827         # outer IPv6 dest addr is the localsid End.DX6
828         packet_header = self.create_packet_header_IPv6_IPv6(
829                                             dst_inner,
830                                             dst_outer='a3::c4')
831         # add to traffic stream pg0->pg1
832         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
833                                        self.pg_packet_sizes, count))
834
835         # send packets and verify received packets
836         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
837                                   self.compare_rx_tx_packet_End_DX6)
838
839         # log the localsid counters
840         self.logger.info(self.vapi.cli("show sr localsid"))
841
842         # remove SRv6 localSIDs
843         localsid.remove_vpp_config()
844
845         # cleanup interfaces
846         self.teardown_interfaces()
847
848     def test_SRv6_End_DT6(self):
849         """ Test SRv6 End.DT6 behavior.
850         """
851         # create three interfaces (1 source, 2 destinations)
852         # all interfaces are IPv6 only
853         # source interface in global FIB (0)
854         # destination interfaces in global and vrf
855         vrf_1 = 1
856         ipt = VppIpTable(self, vrf_1, is_ip6=True)
857         ipt.add_vpp_config()
858         self.setup_interfaces(ipv6=[True, True, True],
859                               ipv6_table_id=[0, 0, vrf_1])
860
861         # configure FIB entries
862         # a4::/64 is reachable
863         #     via pg1 in table 0 (global)
864         #     and via pg2 in table vrf_1
865         route0 = VppIpRoute(self, "a4::", 64,
866                             [VppRoutePath(self.pg1.remote_ip6,
867                                           self.pg1.sw_if_index,
868                                           nh_table_id=0)],
869                             table_id=0)
870         route0.add_vpp_config()
871         route1 = VppIpRoute(self, "a4::", 64,
872                             [VppRoutePath(self.pg2.remote_ip6,
873                                           self.pg2.sw_if_index,
874                                           nh_table_id=vrf_1)],
875                             table_id=vrf_1)
876         route1.add_vpp_config()
877         self.logger.debug(self.vapi.cli("show ip6 fib"))
878
879         # configure SRv6 localSID End.DT6 behavior
880         # Note:
881         # fib_table: where the localsid is installed
882         # sw_if_index: in T-variants of localsid this is the vrf table_id
883         localsid = VppSRv6LocalSID(
884                         self, localsid='A3::C4',
885                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT6,
886                         nh_addr=0,
887                         end_psp=0,
888                         sw_if_index=vrf_1,
889                         vlan_index=0,
890                         fib_table=0)
891         localsid.add_vpp_config()
892         # log the localsids
893         self.logger.debug(self.vapi.cli("show sr localsid"))
894
895         # create IPv6 packets with SRH (SL=0)
896         # send one packet per packet size
897         count = len(self.pg_packet_sizes)
898         dst_inner = 'a4::1234'  # inner header destination address
899         pkts = []
900
901         # packets with SRH, segments-left 0, active segment a3::c4
902         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
903                         dst_inner,
904                         sidlist=['a3::c4', 'a2::', 'a1::'],
905                         segleft=0)
906         # add to traffic stream pg0->pg1
907         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
908                                        self.pg_packet_sizes, count))
909
910         # packets without SRH, IPv6 in IPv6
911         # outer IPv6 dest addr is the localsid End.DT6
912         packet_header = self.create_packet_header_IPv6_IPv6(
913                                             dst_inner,
914                                             dst_outer='a3::c4')
915         # add to traffic stream pg0->pg1
916         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
917                                        self.pg_packet_sizes, count))
918
919         # send packets and verify received packets
920         # using same comparison function as End.DX6
921         self.send_and_verify_pkts(self.pg0, pkts, self.pg2,
922                                   self.compare_rx_tx_packet_End_DX6)
923
924         # assert nothing was received on the other interface (pg2)
925         self.pg1.assert_nothing_captured("mis-directed packet(s)")
926
927         # log the localsid counters
928         self.logger.info(self.vapi.cli("show sr localsid"))
929
930         # remove SRv6 localSIDs
931         localsid.remove_vpp_config()
932
933         # remove FIB entries
934         # done by tearDown
935
936         # cleanup interfaces
937         self.teardown_interfaces()
938
939     def test_SRv6_End_DX4(self):
940         """ Test SRv6 End.DX4 behavior.
941         """
942         # send traffic to one destination interface
943         # source interface is IPv6 only
944         # destination interface is IPv4 only
945         self.setup_interfaces(ipv6=[True, False], ipv4=[False, True])
946
947         # configure SRv6 localSID End.DX4 behavior
948         localsid = VppSRv6LocalSID(
949                         self, localsid='A3::C4',
950                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX4,
951                         nh_addr=self.pg1.remote_ip4,
952                         end_psp=0,
953                         sw_if_index=self.pg1.sw_if_index,
954                         vlan_index=0,
955                         fib_table=0)
956         localsid.add_vpp_config()
957         # log the localsids
958         self.logger.debug(self.vapi.cli("show sr localsid"))
959
960         # send one packet per packet size
961         count = len(self.pg_packet_sizes)
962         dst_inner = '4.1.1.123'  # inner header destination address
963         pkts = []
964
965         # packets with SRH, segments-left 0, active segment a3::c4
966         packet_header = self.create_packet_header_IPv6_SRH_IPv4(
967                         dst_inner,
968                         sidlist=['a3::c4', 'a2::', 'a1::'],
969                         segleft=0)
970         # add to traffic stream pg0->pg1
971         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
972                                        self.pg_packet_sizes, count))
973
974         # packets without SRH, IPv4 in IPv6
975         # outer IPv6 dest addr is the localsid End.DX4
976         packet_header = self.create_packet_header_IPv6_IPv4(
977                                             dst_inner,
978                                             dst_outer='a3::c4')
979         # add to traffic stream pg0->pg1
980         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
981                                        self.pg_packet_sizes, count))
982
983         # send packets and verify received packets
984         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
985                                   self.compare_rx_tx_packet_End_DX4)
986
987         # log the localsid counters
988         self.logger.info(self.vapi.cli("show sr localsid"))
989
990         # remove SRv6 localSIDs
991         localsid.remove_vpp_config()
992
993         # cleanup interfaces
994         self.teardown_interfaces()
995
996     def test_SRv6_End_DT4(self):
997         """ Test SRv6 End.DT4 behavior.
998         """
999         # create three interfaces (1 source, 2 destinations)
1000         # source interface is IPv6-only
1001         # destination interfaces are IPv4 only
1002         # source interface in global FIB (0)
1003         # destination interfaces in global and vrf
1004         vrf_1 = 1
1005         ipt = VppIpTable(self, vrf_1)
1006         ipt.add_vpp_config()
1007         self.setup_interfaces(ipv6=[True, False, False],
1008                               ipv4=[False, True, True],
1009                               ipv6_table_id=[0, 0, 0],
1010                               ipv4_table_id=[0, 0, vrf_1])
1011
1012         # configure FIB entries
1013         # 4.1.1.0/24 is reachable
1014         #     via pg1 in table 0 (global)
1015         #     and via pg2 in table vrf_1
1016         route0 = VppIpRoute(self, "4.1.1.0", 24,
1017                             [VppRoutePath(self.pg1.remote_ip4,
1018                                           self.pg1.sw_if_index,
1019                                           nh_table_id=0)],
1020                             table_id=0)
1021         route0.add_vpp_config()
1022         route1 = VppIpRoute(self, "4.1.1.0", 24,
1023                             [VppRoutePath(self.pg2.remote_ip4,
1024                                           self.pg2.sw_if_index,
1025                                           nh_table_id=vrf_1)],
1026                             table_id=vrf_1)
1027         route1.add_vpp_config()
1028         self.logger.debug(self.vapi.cli("show ip fib"))
1029
1030         # configure SRv6 localSID End.DT6 behavior
1031         # Note:
1032         # fib_table: where the localsid is installed
1033         # sw_if_index: in T-variants of localsid: vrf table_id
1034         localsid = VppSRv6LocalSID(
1035                         self, localsid='A3::C4',
1036                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT4,
1037                         nh_addr=0,
1038                         end_psp=0,
1039                         sw_if_index=vrf_1,
1040                         vlan_index=0,
1041                         fib_table=0)
1042         localsid.add_vpp_config()
1043         # log the localsids
1044         self.logger.debug(self.vapi.cli("show sr localsid"))
1045
1046         # create IPv6 packets with SRH (SL=0)
1047         # send one packet per packet size
1048         count = len(self.pg_packet_sizes)
1049         dst_inner = '4.1.1.123'  # inner header destination address
1050         pkts = []
1051
1052         # packets with SRH, segments-left 0, active segment a3::c4
1053         packet_header = self.create_packet_header_IPv6_SRH_IPv4(
1054                         dst_inner,
1055                         sidlist=['a3::c4', 'a2::', 'a1::'],
1056                         segleft=0)
1057         # add to traffic stream pg0->pg1
1058         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
1059                                        self.pg_packet_sizes, count))
1060
1061         # packets without SRH, IPv6 in IPv6
1062         # outer IPv6 dest addr is the localsid End.DX4
1063         packet_header = self.create_packet_header_IPv6_IPv4(
1064                                             dst_inner,
1065                                             dst_outer='a3::c4')
1066         # add to traffic stream pg0->pg1
1067         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
1068                                        self.pg_packet_sizes, count))
1069
1070         # send packets and verify received packets
1071         # using same comparison function as End.DX4
1072         self.send_and_verify_pkts(self.pg0, pkts, self.pg2,
1073                                   self.compare_rx_tx_packet_End_DX4)
1074
1075         # assert nothing was received on the other interface (pg2)
1076         self.pg1.assert_nothing_captured("mis-directed packet(s)")
1077
1078         # log the localsid counters
1079         self.logger.info(self.vapi.cli("show sr localsid"))
1080
1081         # remove SRv6 localSIDs
1082         localsid.remove_vpp_config()
1083
1084         # remove FIB entries
1085         # done by tearDown
1086
1087         # cleanup interfaces
1088         self.teardown_interfaces()
1089
1090     def test_SRv6_End_DX2(self):
1091         """ Test SRv6 End.DX2 behavior.
1092         """
1093         # send traffic to one destination interface
1094         # source interface is IPv6 only
1095         self.setup_interfaces(ipv6=[True, False], ipv4=[False, False])
1096
1097         # configure SRv6 localSID End.DX2 behavior
1098         localsid = VppSRv6LocalSID(
1099                         self, localsid='A3::C4',
1100                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX2,
1101                         nh_addr=0,
1102                         end_psp=0,
1103                         sw_if_index=self.pg1.sw_if_index,
1104                         vlan_index=0,
1105                         fib_table=0)
1106         localsid.add_vpp_config()
1107         # log the localsids
1108         self.logger.debug(self.vapi.cli("show sr localsid"))
1109
1110         # send one packet per packet size
1111         count = len(self.pg_packet_sizes)
1112         pkts = []
1113
1114         # packets with SRH, segments-left 0, active segment a3::c4
1115         # L2 has no dot1q header
1116         packet_header = self.create_packet_header_IPv6_SRH_L2(
1117                             sidlist=['a3::c4', 'a2::', 'a1::'],
1118                             segleft=0,
1119                             vlan=0)
1120         # add to traffic stream pg0->pg1
1121         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1122                                        self.pg_packet_sizes, count))
1123
1124         # packets with SRH, segments-left 0, active segment a3::c4
1125         # L2 has dot1q header
1126         packet_header = self.create_packet_header_IPv6_SRH_L2(
1127                             sidlist=['a3::c4', 'a2::', 'a1::'],
1128                             segleft=0,
1129                             vlan=123)
1130         # add to traffic stream pg0->pg1
1131         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1132                                        self.pg_packet_sizes, count))
1133
1134         # packets without SRH, L2 in IPv6
1135         # outer IPv6 dest addr is the localsid End.DX2
1136         # L2 has no dot1q header
1137         packet_header = self.create_packet_header_IPv6_L2(
1138                                             dst_outer='a3::c4',
1139                                             vlan=0)
1140         # add to traffic stream pg0->pg1
1141         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1142                                        self.pg_packet_sizes, count))
1143
1144         # packets without SRH, L2 in IPv6
1145         # outer IPv6 dest addr is the localsid End.DX2
1146         # L2 has dot1q header
1147         packet_header = self.create_packet_header_IPv6_L2(
1148                                             dst_outer='a3::c4',
1149                                             vlan=123)
1150         # add to traffic stream pg0->pg1
1151         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1152                                        self.pg_packet_sizes, count))
1153
1154         # send packets and verify received packets
1155         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
1156                                   self.compare_rx_tx_packet_End_DX2)
1157
1158         # log the localsid counters
1159         self.logger.info(self.vapi.cli("show sr localsid"))
1160
1161         # remove SRv6 localSIDs
1162         localsid.remove_vpp_config()
1163
1164         # cleanup interfaces
1165         self.teardown_interfaces()
1166
1167     @unittest.skipUnless(0, "PC to fix")
1168     def test_SRv6_T_Insert_Classifier(self):
1169         """ Test SRv6 Transit.Insert behavior (IPv6 only).
1170             steer packets using the classifier
1171         """
1172         # send traffic to one destination interface
1173         # source and destination are IPv6 only
1174         self.setup_interfaces(ipv6=[False, False, False, True, True])
1175
1176         # configure FIB entries
1177         route = VppIpRoute(self, "a4::", 64,
1178                            [VppRoutePath(
1179                                self.pg4.remote_ip6,
1180                                self.pg4.sw_if_index)])
1181         route.add_vpp_config()
1182
1183         # configure encaps IPv6 source address
1184         # needs to be done before SR Policy config
1185         # TODO: API?
1186         self.vapi.cli("set sr encaps source addr a3::")
1187
1188         bsid = 'a3::9999:1'
1189         # configure SRv6 Policy
1190         # Note: segment list order: first -> last
1191         sr_policy = VppSRv6Policy(
1192             self, bsid=bsid,
1193             is_encap=0,
1194             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
1195             weight=1, fib_table=0,
1196             segments=['a4::', 'a5::', 'a6::c7'],
1197             source='a3::')
1198         sr_policy.add_vpp_config()
1199         self.sr_policy = sr_policy
1200
1201         # log the sr policies
1202         self.logger.info(self.vapi.cli("show sr policies"))
1203
1204         # add classify table
1205         # mask on dst ip address prefix a7::/8
1206         mask = '{!s:0<16}'.format('ff')
1207         r = self.vapi.classify_add_del_table(
1208             1,
1209             binascii.unhexlify(mask),
1210             match_n_vectors=(len(mask) - 1) // 32 + 1,
1211             current_data_flag=1,
1212             skip_n_vectors=2)  # data offset
1213         self.assertIsNotNone(r, 'No response msg for add_del_table')
1214         table_index = r.new_table_index
1215
1216         # add the source routing node as a ip6 inacl netxt node
1217         r = self.vapi.add_node_next('ip6-inacl',
1218                                     'sr-pl-rewrite-insert')
1219         inacl_next_node_index = r.node_index
1220
1221         match = '{!s:0<16}'.format('a7')
1222         r = self.vapi.classify_add_del_session(
1223             1,
1224             table_index,
1225             binascii.unhexlify(match),
1226             hit_next_index=inacl_next_node_index,
1227             action=3,
1228             metadata=0)  # sr policy index
1229         self.assertIsNotNone(r, 'No response msg for add_del_session')
1230
1231         # log the classify table used in the steering policy
1232         self.logger.info(self.vapi.cli("show classify table"))
1233
1234         r = self.vapi.input_acl_set_interface(
1235             is_add=1,
1236             sw_if_index=self.pg3.sw_if_index,
1237             ip6_table_index=table_index)
1238         self.assertIsNotNone(r,
1239                              'No response msg for input_acl_set_interface')
1240
1241         # log the ip6 inacl
1242         self.logger.info(self.vapi.cli("show inacl type ip6"))
1243
1244         # create packets
1245         count = len(self.pg_packet_sizes)
1246         dst_inner = 'a7::1234'
1247         pkts = []
1248
1249         # create IPv6 packets without SRH
1250         packet_header = self.create_packet_header_IPv6(dst_inner)
1251         # create traffic stream pg3->pg4
1252         pkts.extend(self.create_stream(self.pg3, self.pg4, packet_header,
1253                                        self.pg_packet_sizes, count))
1254
1255         # create IPv6 packets with SRH
1256         # packets with segments-left 1, active segment a7::
1257         packet_header = self.create_packet_header_IPv6_SRH(
1258             sidlist=['a8::', 'a7::', 'a6::'],
1259             segleft=1)
1260         # create traffic stream pg3->pg4
1261         pkts.extend(self.create_stream(self.pg3, self.pg4, packet_header,
1262                                        self.pg_packet_sizes, count))
1263
1264         # send packets and verify received packets
1265         self.send_and_verify_pkts(self.pg3, pkts, self.pg4,
1266                                   self.compare_rx_tx_packet_T_Insert)
1267
1268         # remove the interface l2 input feature
1269         r = self.vapi.input_acl_set_interface(
1270             is_add=0,
1271             sw_if_index=self.pg3.sw_if_index,
1272             ip6_table_index=table_index)
1273         self.assertIsNotNone(r,
1274                              'No response msg for input_acl_set_interface')
1275
1276         # log the ip6 inacl after cleaning
1277         self.logger.info(self.vapi.cli("show inacl type ip6"))
1278
1279         # log the localsid counters
1280         self.logger.info(self.vapi.cli("show sr localsid"))
1281
1282         # remove classifier SR steering
1283         # classifier_steering.remove_vpp_config()
1284         self.logger.info(self.vapi.cli("show sr steering policies"))
1285
1286         # remove SR Policies
1287         self.sr_policy.remove_vpp_config()
1288         self.logger.info(self.vapi.cli("show sr policies"))
1289
1290         # remove classify session and table
1291         r = self.vapi.classify_add_del_session(
1292             0,
1293             table_index,
1294             binascii.unhexlify(match))
1295         self.assertIsNotNone(r, 'No response msg for add_del_session')
1296
1297         r = self.vapi.classify_add_del_table(
1298             0,
1299             binascii.unhexlify(mask),
1300             table_index=table_index)
1301         self.assertIsNotNone(r, 'No response msg for add_del_table')
1302
1303         self.logger.info(self.vapi.cli("show classify table"))
1304
1305         # remove FIB entries
1306         # done by tearDown
1307
1308         # cleanup interfaces
1309         self.teardown_interfaces()
1310
1311     def compare_rx_tx_packet_T_Encaps(self, tx_pkt, rx_pkt):
1312         """ Compare input and output packet after passing T.Encaps
1313
1314         :param tx_pkt: transmitted packet
1315         :param rx_pkt: received packet
1316         """
1317         # T.Encaps updates the headers as follows:
1318         # SR Policy seglist (S3, S2, S1)
1319         # SR Policy source C
1320         # IPv6:
1321         # in: IPv6(A, B2)
1322         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(A, B2)
1323         # IPv6 + SRH:
1324         # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1)
1325         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(a, B2)SRH(B3, B2, B1; SL=1)
1326
1327         # get first (outer) IPv6 header of rx'ed packet
1328         rx_ip = rx_pkt.getlayer(IPv6)
1329         rx_srh = None
1330
1331         tx_ip = tx_pkt.getlayer(IPv6)
1332
1333         # expected segment-list
1334         seglist = self.sr_policy.segments
1335         # reverse list to get order as in SRH
1336         tx_seglist = seglist[::-1]
1337
1338         # get source address of SR Policy
1339         sr_policy_source = self.sr_policy.source
1340
1341         # rx'ed packet should have SRH
1342         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1343         # get SRH
1344         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1345
1346         # received ip.src should be equal to SR Policy source
1347         self.assertEqual(rx_ip.src, sr_policy_source)
1348         # received ip.dst should be equal to expected sidlist[lastentry]
1349         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1350         # rx'ed seglist should be equal to expected seglist
1351         self.assertEqual(rx_srh.addresses, tx_seglist)
1352         # segleft should be equal to size expected seglist-1
1353         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1354         # segleft should be equal to lastentry
1355         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1356
1357         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1358         # except for the hop-limit field
1359         #   -> update tx'ed hlim to the expected hlim
1360         tx_ip.hlim = tx_ip.hlim - 1
1361
1362         self.assertEqual(rx_srh.payload, tx_ip)
1363
1364         self.logger.debug("packet verification: SUCCESS")
1365
1366     def compare_rx_tx_packet_T_Encaps_IPv4(self, tx_pkt, rx_pkt):
1367         """ Compare input and output packet after passing T.Encaps for IPv4
1368
1369         :param tx_pkt: transmitted packet
1370         :param rx_pkt: received packet
1371         """
1372         # T.Encaps for IPv4 updates the headers as follows:
1373         # SR Policy seglist (S3, S2, S1)
1374         # SR Policy source C
1375         # IPv4:
1376         # in: IPv4(A, B2)
1377         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv4(A, B2)
1378
1379         # get first (outer) IPv6 header of rx'ed packet
1380         rx_ip = rx_pkt.getlayer(IPv6)
1381         rx_srh = None
1382
1383         tx_ip = tx_pkt.getlayer(IP)
1384
1385         # expected segment-list
1386         seglist = self.sr_policy.segments
1387         # reverse list to get order as in SRH
1388         tx_seglist = seglist[::-1]
1389
1390         # get source address of SR Policy
1391         sr_policy_source = self.sr_policy.source
1392
1393         # checks common to cases tx with and without SRH
1394         # rx'ed packet should have SRH and IPv4 header
1395         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1396         self.assertTrue(rx_ip.payload.haslayer(IP))
1397         # get SRH
1398         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1399
1400         # received ip.src should be equal to SR Policy source
1401         self.assertEqual(rx_ip.src, sr_policy_source)
1402         # received ip.dst should be equal to sidlist[lastentry]
1403         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1404         # rx'ed seglist should be equal to seglist
1405         self.assertEqual(rx_srh.addresses, tx_seglist)
1406         # segleft should be equal to size seglist-1
1407         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1408         # segleft should be equal to lastentry
1409         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1410
1411         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1412         # except for the ttl field and ip checksum
1413         #   -> adjust tx'ed ttl to expected ttl
1414         tx_ip.ttl = tx_ip.ttl - 1
1415         #   -> set tx'ed ip checksum to None and let scapy recompute
1416         tx_ip.chksum = None
1417         # read back the pkt (with str()) to force computing these fields
1418         # probably other ways to accomplish this are possible
1419         tx_ip = IP(scapy.compat.raw(tx_ip))
1420
1421         self.assertEqual(rx_srh.payload, tx_ip)
1422
1423         self.logger.debug("packet verification: SUCCESS")
1424
1425     def compare_rx_tx_packet_T_Encaps_L2(self, tx_pkt, rx_pkt):
1426         """ Compare input and output packet after passing T.Encaps for L2
1427
1428         :param tx_pkt: transmitted packet
1429         :param rx_pkt: received packet
1430         """
1431         # T.Encaps for L2 updates the headers as follows:
1432         # SR Policy seglist (S3, S2, S1)
1433         # SR Policy source C
1434         # L2:
1435         # in: L2
1436         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)L2
1437
1438         # get first (outer) IPv6 header of rx'ed packet
1439         rx_ip = rx_pkt.getlayer(IPv6)
1440         rx_srh = None
1441
1442         tx_ether = tx_pkt.getlayer(Ether)
1443
1444         # expected segment-list
1445         seglist = self.sr_policy.segments
1446         # reverse list to get order as in SRH
1447         tx_seglist = seglist[::-1]
1448
1449         # get source address of SR Policy
1450         sr_policy_source = self.sr_policy.source
1451
1452         # rx'ed packet should have SRH
1453         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1454         # get SRH
1455         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1456
1457         # received ip.src should be equal to SR Policy source
1458         self.assertEqual(rx_ip.src, sr_policy_source)
1459         # received ip.dst should be equal to sidlist[lastentry]
1460         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1461         # rx'ed seglist should be equal to seglist
1462         self.assertEqual(rx_srh.addresses, tx_seglist)
1463         # segleft should be equal to size seglist-1
1464         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1465         # segleft should be equal to lastentry
1466         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1467         # nh should be "No Next Header" (143)
1468         self.assertEqual(rx_srh.nh, 143)
1469
1470         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1471         self.assertEqual(Ether(scapy.compat.raw(rx_srh.payload)), tx_ether)
1472
1473         self.logger.debug("packet verification: SUCCESS")
1474
1475     def compare_rx_tx_packet_T_Insert(self, tx_pkt, rx_pkt):
1476         """ Compare input and output packet after passing T.Insert
1477
1478         :param tx_pkt: transmitted packet
1479         :param rx_pkt: received packet
1480         """
1481         # T.Insert updates the headers as follows:
1482         # IPv6:
1483         # in: IPv6(A, B2)
1484         # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)
1485         # IPv6 + SRH:
1486         # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1)
1487         # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)SRH(B3, B2, B1; SL=1)
1488
1489         # get first (outer) IPv6 header of rx'ed packet
1490         rx_ip = rx_pkt.getlayer(IPv6)
1491         rx_srh = None
1492         rx_ip2 = None
1493         rx_srh2 = None
1494         rx_ip3 = None
1495         rx_udp = rx_pkt[UDP]
1496
1497         tx_ip = tx_pkt.getlayer(IPv6)
1498         tx_srh = None
1499         tx_ip2 = None
1500         # some packets have been tx'ed with an SRH, some without it
1501         # get SRH if tx'ed packet has it
1502         if tx_pkt.haslayer(IPv6ExtHdrSegmentRouting):
1503             tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1504             tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1505         tx_udp = tx_pkt[UDP]
1506
1507         # expected segment-list (make copy of SR Policy segment list)
1508         seglist = self.sr_policy.segments[:]
1509         # expected seglist has initial dest addr as last segment
1510         seglist.append(tx_ip.dst)
1511         # reverse list to get order as in SRH
1512         tx_seglist = seglist[::-1]
1513
1514         # get source address of SR Policy
1515         sr_policy_source = self.sr_policy.source
1516
1517         # checks common to cases tx with and without SRH
1518         # rx'ed packet should have SRH and only one IPv6 header
1519         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1520         self.assertFalse(rx_ip.payload.haslayer(IPv6))
1521         # get SRH
1522         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1523
1524         # rx'ed ip.src should be equal to tx'ed ip.src
1525         self.assertEqual(rx_ip.src, tx_ip.src)
1526         # rx'ed ip.dst should be equal to sidlist[lastentry]
1527         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1528
1529         # rx'ed seglist should be equal to expected seglist
1530         self.assertEqual(rx_srh.addresses, tx_seglist)
1531         # segleft should be equal to size(expected seglist)-1
1532         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1533         # segleft should be equal to lastentry
1534         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1535
1536         if tx_srh:  # packet was tx'ed with SRH
1537             # packet should have 2nd SRH
1538             self.assertTrue(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
1539             # get 2nd SRH
1540             rx_srh2 = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting, 2)
1541
1542             # rx'ed srh2.addresses should be equal to tx'ed srh.addresses
1543             self.assertEqual(rx_srh2.addresses, tx_srh.addresses)
1544             # rx'ed srh2.segleft should be equal to tx'ed srh.segleft
1545             self.assertEqual(rx_srh2.segleft, tx_srh.segleft)
1546             # rx'ed srh2.lastentry should be equal to tx'ed srh.lastentry
1547             self.assertEqual(rx_srh2.lastentry, tx_srh.lastentry)
1548
1549         else:  # packet was tx'ed without SRH
1550             # rx packet should have no other SRH
1551             self.assertFalse(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
1552
1553         # UDP layer should be unchanged
1554         self.assertEqual(rx_udp, tx_udp)
1555
1556         self.logger.debug("packet verification: SUCCESS")
1557
1558     def compare_rx_tx_packet_End(self, tx_pkt, rx_pkt):
1559         """ Compare input and output packet after passing End (without PSP)
1560
1561         :param tx_pkt: transmitted packet
1562         :param rx_pkt: received packet
1563         """
1564         # End (no PSP) updates the headers as follows:
1565         # IPv6 + SRH:
1566         # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2)
1567         # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1568
1569         # get first (outer) IPv6 header of rx'ed packet
1570         rx_ip = rx_pkt.getlayer(IPv6)
1571         rx_srh = None
1572         rx_ip2 = None
1573         rx_udp = rx_pkt[UDP]
1574
1575         tx_ip = tx_pkt.getlayer(IPv6)
1576         # we know the packet has been tx'ed
1577         # with an inner IPv6 header and an SRH
1578         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1579         tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1580         tx_udp = tx_pkt[UDP]
1581
1582         # common checks, regardless of tx segleft value
1583         # rx'ed packet should have 2nd IPv6 header
1584         self.assertTrue(rx_ip.payload.haslayer(IPv6))
1585         # get second (inner) IPv6 header
1586         rx_ip2 = rx_pkt.getlayer(IPv6, 2)
1587
1588         if tx_ip.segleft > 0:
1589             # SRH should NOT have been popped:
1590             #   End SID without PSP does not pop SRH if segleft>0
1591             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1592             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1593
1594             # received ip.src should be equal to expected ip.src
1595             self.assertEqual(rx_ip.src, tx_ip.src)
1596             # sidlist should be unchanged
1597             self.assertEqual(rx_srh.addresses, tx_srh.addresses)
1598             # segleft should have been decremented
1599             self.assertEqual(rx_srh.segleft, tx_srh.segleft-1)
1600             # received ip.dst should be equal to sidlist[segleft]
1601             self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
1602             # lastentry should be unchanged
1603             self.assertEqual(rx_srh.lastentry, tx_srh.lastentry)
1604             # inner IPv6 packet (ip2) should be unchanged
1605             self.assertEqual(rx_ip2.src, tx_ip2.src)
1606             self.assertEqual(rx_ip2.dst, tx_ip2.dst)
1607         # else:  # tx_ip.segleft == 0
1608             # TODO: Does this work with 2 SRHs in ingress packet?
1609
1610         # UDP layer should be unchanged
1611         self.assertEqual(rx_udp, tx_udp)
1612
1613         self.logger.debug("packet verification: SUCCESS")
1614
1615     def compare_rx_tx_packet_End_PSP(self, tx_pkt, rx_pkt):
1616         """ Compare input and output packet after passing End with PSP
1617
1618         :param tx_pkt: transmitted packet
1619         :param rx_pkt: received packet
1620         """
1621         # End (PSP) updates the headers as follows:
1622         # IPv6 + SRH (SL>1):
1623         # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2)
1624         # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1625         # IPv6 + SRH (SL=1):
1626         # in: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1627         # out: IPv6(A, S3)
1628
1629         # get first (outer) IPv6 header of rx'ed packet
1630         rx_ip = rx_pkt.getlayer(IPv6)
1631         rx_srh = None
1632         rx_ip2 = None
1633         rx_udp = rx_pkt[UDP]
1634
1635         tx_ip = tx_pkt.getlayer(IPv6)
1636         # we know the packet has been tx'ed
1637         # with an inner IPv6 header and an SRH
1638         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1639         tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1640         tx_udp = tx_pkt[UDP]
1641
1642         # common checks, regardless of tx segleft value
1643         self.assertTrue(rx_ip.payload.haslayer(IPv6))
1644         rx_ip2 = rx_pkt.getlayer(IPv6, 2)
1645         # inner IPv6 packet (ip2) should be unchanged
1646         self.assertEqual(rx_ip2.src, tx_ip2.src)
1647         self.assertEqual(rx_ip2.dst, tx_ip2.dst)
1648
1649         if tx_ip.segleft > 1:
1650             # SRH should NOT have been popped:
1651             #   End SID with PSP does not pop SRH if segleft>1
1652             # rx'ed packet should have SRH
1653             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1654             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1655
1656             # received ip.src should be equal to expected ip.src
1657             self.assertEqual(rx_ip.src, tx_ip.src)
1658             # sidlist should be unchanged
1659             self.assertEqual(rx_srh.addresses, tx_srh.addresses)
1660             # segleft should have been decremented
1661             self.assertEqual(rx_srh.segleft, tx_srh.segleft-1)
1662             # received ip.dst should be equal to sidlist[segleft]
1663             self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
1664             # lastentry should be unchanged
1665             self.assertEqual(rx_srh.lastentry, tx_srh.lastentry)
1666
1667         else:  # tx_ip.segleft <= 1
1668             # SRH should have been popped:
1669             #   End SID with PSP and segleft=1 pops SRH
1670             # the two IPv6 headers are still present
1671             # outer IPv6 header has DA == last segment of popped SRH
1672             # SRH should not be present
1673             self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1674             # outer IPv6 header ip.src should be equal to tx'ed ip.src
1675             self.assertEqual(rx_ip.src, tx_ip.src)
1676             # outer IPv6 header ip.dst should be = to tx'ed sidlist[segleft-1]
1677             self.assertEqual(rx_ip.dst, tx_srh.addresses[tx_srh.segleft-1])
1678
1679         # UDP layer should be unchanged
1680         self.assertEqual(rx_udp, tx_udp)
1681
1682         self.logger.debug("packet verification: SUCCESS")
1683
1684     def compare_rx_tx_packet_End_DX6(self, tx_pkt, rx_pkt):
1685         """ Compare input and output packet after passing End.DX6
1686
1687         :param tx_pkt: transmitted packet
1688         :param rx_pkt: received packet
1689         """
1690         # End.DX6 updates the headers as follows:
1691         # IPv6 + SRH (SL=0):
1692         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv6(B, D)
1693         # out: IPv6(B, D)
1694         # IPv6:
1695         # in: IPv6(A, S3)IPv6(B, D)
1696         # out: IPv6(B, D)
1697
1698         # get first (outer) IPv6 header of rx'ed packet
1699         rx_ip = rx_pkt.getlayer(IPv6)
1700
1701         tx_ip = tx_pkt.getlayer(IPv6)
1702         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1703
1704         # verify if rx'ed packet has no SRH
1705         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1706
1707         # the whole rx_ip pkt should be equal to tx_ip2
1708         # except for the hlim field
1709         #   -> adjust tx'ed hlim to expected hlim
1710         tx_ip2.hlim = tx_ip2.hlim - 1
1711
1712         self.assertEqual(rx_ip, tx_ip2)
1713
1714         self.logger.debug("packet verification: SUCCESS")
1715
1716     def compare_rx_tx_packet_End_DX4(self, tx_pkt, rx_pkt):
1717         """ Compare input and output packet after passing End.DX4
1718
1719         :param tx_pkt: transmitted packet
1720         :param rx_pkt: received packet
1721         """
1722         # End.DX4 updates the headers as follows:
1723         # IPv6 + SRH (SL=0):
1724         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv4(B, D)
1725         # out: IPv4(B, D)
1726         # IPv6:
1727         # in: IPv6(A, S3)IPv4(B, D)
1728         # out: IPv4(B, D)
1729
1730         # get IPv4 header of rx'ed packet
1731         rx_ip = rx_pkt.getlayer(IP)
1732
1733         tx_ip = tx_pkt.getlayer(IPv6)
1734         tx_ip2 = tx_pkt.getlayer(IP)
1735
1736         # verify if rx'ed packet has no SRH
1737         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1738
1739         # the whole rx_ip pkt should be equal to tx_ip2
1740         # except for the ttl field and ip checksum
1741         #   -> adjust tx'ed ttl to expected ttl
1742         tx_ip2.ttl = tx_ip2.ttl - 1
1743         #   -> set tx'ed ip checksum to None and let scapy recompute
1744         tx_ip2.chksum = None
1745         # read back the pkt (with str()) to force computing these fields
1746         # probably other ways to accomplish this are possible
1747         tx_ip2 = IP(scapy.compat.raw(tx_ip2))
1748
1749         self.assertEqual(rx_ip, tx_ip2)
1750
1751         self.logger.debug("packet verification: SUCCESS")
1752
1753     def compare_rx_tx_packet_End_DX2(self, tx_pkt, rx_pkt):
1754         """ Compare input and output packet after passing End.DX2
1755
1756         :param tx_pkt: transmitted packet
1757         :param rx_pkt: received packet
1758         """
1759         # End.DX2 updates the headers as follows:
1760         # IPv6 + SRH (SL=0):
1761         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)L2
1762         # out: L2
1763         # IPv6:
1764         # in: IPv6(A, S3)L2
1765         # out: L2
1766
1767         # get IPv4 header of rx'ed packet
1768         rx_eth = rx_pkt.getlayer(Ether)
1769
1770         tx_ip = tx_pkt.getlayer(IPv6)
1771         # we can't just get the 2nd Ether layer
1772         # get the Raw content and dissect it as Ether
1773         tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw]))
1774
1775         # verify if rx'ed packet has no SRH
1776         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1777
1778         # the whole rx_eth pkt should be equal to tx_eth1
1779         self.assertEqual(rx_eth, tx_eth1)
1780
1781         self.logger.debug("packet verification: SUCCESS")
1782
1783     def create_stream(self, src_if, dst_if, packet_header, packet_sizes,
1784                       count):
1785         """Create SRv6 input packet stream for defined interface.
1786
1787         :param VppInterface src_if: Interface to create packet stream for
1788         :param VppInterface dst_if: destination interface of packet stream
1789         :param packet_header: Layer3 scapy packet headers,
1790                 L2 is added when not provided,
1791                 Raw(payload) with packet_info is added
1792         :param list packet_sizes: packet stream pckt sizes,sequentially applied
1793                to packets in stream have
1794         :param int count: number of packets in packet stream
1795         :return: list of packets
1796         """
1797         self.logger.info("Creating packets")
1798         pkts = []
1799         for i in range(0, count-1):
1800             payload_info = self.create_packet_info(src_if, dst_if)
1801             self.logger.debug(
1802                 "Creating packet with index %d" % (payload_info.index))
1803             payload = self.info_to_payload(payload_info)
1804             # add L2 header if not yet provided in packet_header
1805             if packet_header.getlayer(0).name == 'Ethernet':
1806                 p = (packet_header /
1807                      Raw(payload))
1808             else:
1809                 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
1810                      packet_header /
1811                      Raw(payload))
1812             size = packet_sizes[i % len(packet_sizes)]
1813             self.logger.debug("Packet size %d" % (size))
1814             self.extend_packet(p, size)
1815             # we need to store the packet with the automatic fields computed
1816             # read back the dumped packet (with str())
1817             # to force computing these fields
1818             # probably other ways are possible
1819             p = Ether(scapy.compat.raw(p))
1820             payload_info.data = p.copy()
1821             self.logger.debug(ppp("Created packet:", p))
1822             pkts.append(p)
1823         self.logger.info("Done creating packets")
1824         return pkts
1825
1826     def send_and_verify_pkts(self, input, pkts, output, compare_func,
1827                              expected_count=None):
1828         """Send packets and verify received packets using compare_func
1829
1830         :param input: ingress interface of DUT
1831         :param pkts: list of packets to transmit
1832         :param output: egress interface of DUT
1833         :param compare_func: function to compare in and out packets
1834         :param expected_count: expected number of captured packets (if
1835                different than len(pkts))
1836         """
1837         # add traffic stream to input interface
1838         input.add_stream(pkts)
1839
1840         # enable capture on all interfaces
1841         self.pg_enable_capture(self.pg_interfaces)
1842
1843         # start traffic
1844         self.logger.info("Starting traffic")
1845         self.pg_start()
1846
1847         # get output capture
1848         self.logger.info("Getting packet capture")
1849         capture = output.get_capture(expected_count=expected_count)
1850
1851         # assert nothing was captured on input interface
1852         input.assert_nothing_captured()
1853
1854         # verify captured packets
1855         self.verify_captured_pkts(output, capture, compare_func)
1856
1857     def create_packet_header_IPv6(self, dst):
1858         """Create packet header: IPv6 header, UDP header
1859
1860         :param dst: IPv6 destination address
1861
1862         IPv6 source address is 1234::1
1863         UDP source port and destination port are 1234
1864         """
1865
1866         p = (IPv6(src='1234::1', dst=dst) /
1867              UDP(sport=1234, dport=1234))
1868         return p
1869
1870     def create_packet_header_IPv6_SRH(self, sidlist, segleft):
1871         """Create packet header: IPv6 header with SRH, UDP header
1872
1873         :param list sidlist: segment list
1874         :param int segleft: segments-left field value
1875
1876         IPv6 destination address is set to sidlist[segleft]
1877         IPv6 source addresses are 1234::1 and 4321::1
1878         UDP source port and destination port are 1234
1879         """
1880
1881         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
1882              IPv6ExtHdrSegmentRouting(addresses=sidlist) /
1883              UDP(sport=1234, dport=1234))
1884         return p
1885
1886     def create_packet_header_IPv6_SRH_IPv6(self, dst, sidlist, segleft):
1887         """Create packet header: IPv6 encapsulated in SRv6:
1888         IPv6 header with SRH, IPv6 header, UDP header
1889
1890         :param ipv6address dst: inner IPv6 destination address
1891         :param list sidlist: segment list of outer IPv6 SRH
1892         :param int segleft: segments-left field of outer IPv6 SRH
1893
1894         Outer IPv6 destination address is set to sidlist[segleft]
1895         IPv6 source addresses are 1234::1 and 4321::1
1896         UDP source port and destination port are 1234
1897         """
1898
1899         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
1900              IPv6ExtHdrSegmentRouting(addresses=sidlist,
1901                                       segleft=segleft, nh=41) /
1902              IPv6(src='4321::1', dst=dst) /
1903              UDP(sport=1234, dport=1234))
1904         return p
1905
1906     def create_packet_header_IPv6_IPv6(self, dst_inner, dst_outer):
1907         """Create packet header: IPv6 encapsulated in IPv6:
1908         IPv6 header, IPv6 header, UDP header
1909
1910         :param ipv6address dst_inner: inner IPv6 destination address
1911         :param ipv6address dst_outer: outer IPv6 destination address
1912
1913         IPv6 source addresses are 1234::1 and 4321::1
1914         UDP source port and destination port are 1234
1915         """
1916
1917         p = (IPv6(src='1234::1', dst=dst_outer) /
1918              IPv6(src='4321::1', dst=dst_inner) /
1919              UDP(sport=1234, dport=1234))
1920         return p
1921
1922     def create_packet_header_IPv6_SRH_SRH_IPv6(self, dst, sidlist1, segleft1,
1923                                                sidlist2, segleft2):
1924         """Create packet header: IPv6 encapsulated in SRv6 with 2 SRH:
1925         IPv6 header with SRH, 2nd SRH, IPv6 header, UDP header
1926
1927         :param ipv6address dst: inner IPv6 destination address
1928         :param list sidlist1: segment list of outer IPv6 SRH
1929         :param int segleft1: segments-left field of outer IPv6 SRH
1930         :param list sidlist2: segment list of inner IPv6 SRH
1931         :param int segleft2: segments-left field of inner IPv6 SRH
1932
1933         Outer IPv6 destination address is set to sidlist[segleft]
1934         IPv6 source addresses are 1234::1 and 4321::1
1935         UDP source port and destination port are 1234
1936         """
1937
1938         p = (IPv6(src='1234::1', dst=sidlist1[segleft1]) /
1939              IPv6ExtHdrSegmentRouting(addresses=sidlist1,
1940                                       segleft=segleft1, nh=43) /
1941              IPv6ExtHdrSegmentRouting(addresses=sidlist2,
1942                                       segleft=segleft2, nh=41) /
1943              IPv6(src='4321::1', dst=dst) /
1944              UDP(sport=1234, dport=1234))
1945         return p
1946
1947     def create_packet_header_IPv4(self, dst):
1948         """Create packet header: IPv4 header, UDP header
1949
1950         :param dst: IPv4 destination address
1951
1952         IPv4 source address is 123.1.1.1
1953         UDP source port and destination port are 1234
1954         """
1955
1956         p = (IP(src='123.1.1.1', dst=dst) /
1957              UDP(sport=1234, dport=1234))
1958         return p
1959
1960     def create_packet_header_IPv6_IPv4(self, dst_inner, dst_outer):
1961         """Create packet header: IPv4 encapsulated in IPv6:
1962         IPv6 header, IPv4 header, UDP header
1963
1964         :param ipv4address dst_inner: inner IPv4 destination address
1965         :param ipv6address dst_outer: outer IPv6 destination address
1966
1967         IPv6 source address is 1234::1
1968         IPv4 source address is 123.1.1.1
1969         UDP source port and destination port are 1234
1970         """
1971
1972         p = (IPv6(src='1234::1', dst=dst_outer) /
1973              IP(src='123.1.1.1', dst=dst_inner) /
1974              UDP(sport=1234, dport=1234))
1975         return p
1976
1977     def create_packet_header_IPv6_SRH_IPv4(self, dst, sidlist, segleft):
1978         """Create packet header: IPv4 encapsulated in SRv6:
1979         IPv6 header with SRH, IPv4 header, UDP header
1980
1981         :param ipv4address dst: inner IPv4 destination address
1982         :param list sidlist: segment list of outer IPv6 SRH
1983         :param int segleft: segments-left field of outer IPv6 SRH
1984
1985         Outer IPv6 destination address is set to sidlist[segleft]
1986         IPv6 source address is 1234::1
1987         IPv4 source address is 123.1.1.1
1988         UDP source port and destination port are 1234
1989         """
1990
1991         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
1992              IPv6ExtHdrSegmentRouting(addresses=sidlist,
1993                                       segleft=segleft, nh=4) /
1994              IP(src='123.1.1.1', dst=dst) /
1995              UDP(sport=1234, dport=1234))
1996         return p
1997
1998     def create_packet_header_L2(self, vlan=0):
1999         """Create packet header: L2 header
2000
2001         :param vlan: if vlan!=0 then add 802.1q header
2002         """
2003         # Note: the dst addr ('00:55:44:33:22:11') is used in
2004         # the compare function compare_rx_tx_packet_T_Encaps_L2
2005         # to detect presence of L2 in SRH payload
2006         p = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
2007         etype = 0x8137  # IPX
2008         if vlan:
2009             # add 802.1q layer
2010             p /= Dot1Q(vlan=vlan, type=etype)
2011         else:
2012             p.type = etype
2013         return p
2014
2015     def create_packet_header_IPv6_SRH_L2(self, sidlist, segleft, vlan=0):
2016         """Create packet header: L2 encapsulated in SRv6:
2017         IPv6 header with SRH, L2
2018
2019         :param list sidlist: segment list of outer IPv6 SRH
2020         :param int segleft: segments-left field of outer IPv6 SRH
2021         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
2022
2023         Outer IPv6 destination address is set to sidlist[segleft]
2024         IPv6 source address is 1234::1
2025         """
2026         eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
2027         etype = 0x8137  # IPX
2028         if vlan:
2029             # add 802.1q layer
2030             eth /= Dot1Q(vlan=vlan, type=etype)
2031         else:
2032             eth.type = etype
2033
2034         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
2035              IPv6ExtHdrSegmentRouting(addresses=sidlist,
2036                                       segleft=segleft, nh=143) /
2037              eth)
2038         return p
2039
2040     def create_packet_header_IPv6_L2(self, dst_outer, vlan=0):
2041         """Create packet header: L2 encapsulated in IPv6:
2042         IPv6 header, L2
2043
2044         :param ipv6address dst_outer: outer IPv6 destination address
2045         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
2046         """
2047         eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
2048         etype = 0x8137  # IPX
2049         if vlan:
2050             # add 802.1q layer
2051             eth /= Dot1Q(vlan=vlan, type=etype)
2052         else:
2053             eth.type = etype
2054
2055         p = (IPv6(src='1234::1', dst=dst_outer, nh=143) / eth)
2056         return p
2057
2058     def get_payload_info(self, packet):
2059         """ Extract the payload_info from the packet
2060         """
2061         # in most cases, payload_info is in packet[Raw]
2062         # but packet[Raw] gives the complete payload
2063         # (incl L2 header) for the T.Encaps L2 case
2064         try:
2065             payload_info = self.payload_to_info(packet[Raw])
2066
2067         except:
2068             # remote L2 header from packet[Raw]:
2069             # take packet[Raw], convert it to an Ether layer
2070             # and then extract Raw from it
2071             payload_info = self.payload_to_info(
2072                 Ether(scapy.compat.r(packet[Raw]))[Raw])
2073
2074         return payload_info
2075
2076     def verify_captured_pkts(self, dst_if, capture, compare_func):
2077         """
2078         Verify captured packet stream for specified interface.
2079         Compare ingress with egress packets using the specified compare fn
2080
2081         :param dst_if: egress interface of DUT
2082         :param capture: captured packets
2083         :param compare_func: function to compare in and out packet
2084         """
2085         self.logger.info("Verifying capture on interface %s using function %s"
2086                          % (dst_if.name, compare_func.__name__))
2087
2088         last_info = dict()
2089         for i in self.pg_interfaces:
2090             last_info[i.sw_if_index] = None
2091         dst_sw_if_index = dst_if.sw_if_index
2092
2093         for packet in capture:
2094             try:
2095                 # extract payload_info from packet's payload
2096                 payload_info = self.get_payload_info(packet)
2097                 packet_index = payload_info.index
2098
2099                 self.logger.debug("Verifying packet with index %d"
2100                                   % (packet_index))
2101                 # packet should have arrived on the expected interface
2102                 self.assertEqual(payload_info.dst, dst_sw_if_index)
2103                 self.logger.debug(
2104                     "Got packet on interface %s: src=%u (idx=%u)" %
2105                     (dst_if.name, payload_info.src, packet_index))
2106
2107                 # search for payload_info with same src and dst if_index
2108                 # this will give us the transmitted packet
2109                 next_info = self.get_next_packet_info_for_interface2(
2110                     payload_info.src, dst_sw_if_index,
2111                     last_info[payload_info.src])
2112                 last_info[payload_info.src] = next_info
2113                 # next_info should not be None
2114                 self.assertTrue(next_info is not None)
2115                 # index of tx and rx packets should be equal
2116                 self.assertEqual(packet_index, next_info.index)
2117                 # data field of next_info contains the tx packet
2118                 txed_packet = next_info.data
2119
2120                 self.logger.debug(ppp("Transmitted packet:",
2121                                       txed_packet))  # ppp=Pretty Print Packet
2122
2123                 self.logger.debug(ppp("Received packet:", packet))
2124
2125                 # compare rcvd packet with expected packet using compare_func
2126                 compare_func(txed_packet, packet)
2127
2128             except:
2129                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2130                 raise
2131
2132         # FIXME: there is no need to check manually that all the packets
2133         #        arrived (already done so by get_capture); checking here
2134         #        prevents testing packets that are expected to be dropped, so
2135         #        commenting this out for now
2136
2137         # have all expected packets arrived?
2138         # for i in self.pg_interfaces:
2139         #    remaining_packet = self.get_next_packet_info_for_interface2(
2140         #        i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
2141         #    self.assertTrue(remaining_packet is None,
2142         #                    "Interface %s: Packet expected from interface %s "
2143         #                    "didn't arrive" % (dst_if.name, i.name))
2144
2145
# When this file is executed directly, run its test cases through
# unittest using VppTestRunner as the test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)