SRv6 tests
[vpp.git] / test / test_srv6.py
1 #!/usr/bin/env python
2
3 import unittest
4 from socket import AF_INET6
5
6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
8 from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
9     SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
10
11 from scapy.packet import Raw
12 from scapy.layers.l2 import Ether, Dot1Q
13 from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
14 from scapy.layers.inet import IP, UDP
15
16 from scapy.utils import inet_pton, inet_ntop
17
18 from util import ppp
19
20
21 class TestSRv6(VppTestCase):
22     """ SRv6 Test Case """
23
24     @classmethod
25     def setUpClass(self):
26         super(TestSRv6, self).setUpClass()
27
28     def setUp(self):
29         """ Perform test setup before each test case.
30         """
31         super(TestSRv6, self).setUp()
32
33         # packet sizes, inclusive L2 overhead
34         self.pg_packet_sizes = [64, 512, 1518, 9018]
35
36         # reset packet_infos
37         self.reset_packet_infos()
38
39     def tearDown(self):
40         """ Clean up test setup after each test case.
41         """
42         self.teardown_interfaces()
43
44         super(TestSRv6, self).tearDown()
45
46     def configure_interface(self,
47                             interface,
48                             ipv6=False, ipv4=False,
49                             ipv6_table_id=0, ipv4_table_id=0):
50         """ Configure interface.
51         :param ipv6: configure IPv6 on interface
52         :param ipv4: configure IPv4 on interface
53         :param ipv6_table_id: FIB table_id for IPv6
54         :param ipv4_table_id: FIB table_id for IPv4
55         """
56         self.logger.debug("Configuring interface %s" % (interface.name))
57         if ipv6:
58             self.logger.debug("Configuring IPv6")
59             interface.set_table_ip6(ipv6_table_id)
60             interface.config_ip6()
61             interface.resolve_ndp(timeout=5)
62         if ipv4:
63             self.logger.debug("Configuring IPv4")
64             interface.set_table_ip4(ipv4_table_id)
65             interface.config_ip4()
66             interface.resolve_arp()
67         interface.admin_up()
68
69     def setup_interfaces(self, ipv6=[], ipv4=[],
70                          ipv6_table_id=[], ipv4_table_id=[]):
71         """ Create and configure interfaces.
72
73         :param ipv6: list of interface IPv6 capabilities
74         :param ipv4: list of interface IPv4 capabilities
75         :param ipv6_table_id: list of intf IPv6 FIB table_ids
76         :param ipv4_table_id: list of intf IPv4 FIB table_ids
77         :returns: List of created interfaces.
78         """
79         # how many interfaces?
80         if len(ipv6):
81             count = len(ipv6)
82         else:
83             count = len(ipv4)
84         self.logger.debug("Creating and configuring %d interfaces" % (count))
85
86         # fill up ipv6 and ipv4 lists if needed
87         # not enabled (False) is the default
88         if len(ipv6) < count:
89             ipv6 += (count - len(ipv6)) * [False]
90         if len(ipv4) < count:
91             ipv4 += (count - len(ipv4)) * [False]
92
93         # fill up table_id lists if needed
94         # table_id 0 (global) is the default
95         if len(ipv6_table_id) < count:
96             ipv6_table_id += (count - len(ipv6_table_id)) * [0]
97         if len(ipv4_table_id) < count:
98             ipv4_table_id += (count - len(ipv4_table_id)) * [0]
99
100         # create 'count' pg interfaces
101         self.create_pg_interfaces(range(count))
102
103         # setup all interfaces
104         for i in range(count):
105             intf = self.pg_interfaces[i]
106             self.configure_interface(intf,
107                                      ipv6[i], ipv4[i],
108                                      ipv6_table_id[i], ipv4_table_id[i])
109
110         if any(ipv6):
111             self.logger.debug(self.vapi.cli("show ip6 neighbors"))
112         if any(ipv4):
113             self.logger.debug(self.vapi.cli("show ip arp"))
114         self.logger.debug(self.vapi.cli("show interface"))
115         self.logger.debug(self.vapi.cli("show hardware"))
116
117         return self.pg_interfaces
118
119     def teardown_interfaces(self):
120         """ Unconfigure and bring down interface.
121         """
122         self.logger.debug("Tearing down interfaces")
123         # tear down all interfaces
124         # AFAIK they cannot be deleted
125         for i in self.pg_interfaces:
126             self.logger.debug("Tear down interface %s" % (i.name))
127             i.admin_down()
128             i.unconfig()
129
130     def test_SRv6_T_Encaps(self):
131         """ Test SRv6 Transit.Encaps behavior for IPv6.
132         """
133         # send traffic to one destination interface
134         # source and destination are IPv6 only
135         self.setup_interfaces(ipv6=[True, True])
136
137         # configure FIB entries
138         route = VppIpRoute(self, "a4::", 64,
139                            [VppRoutePath(self.pg1.remote_ip6,
140                                          self.pg1.sw_if_index,
141                                          proto=DpoProto.DPO_PROTO_IP6)],
142                            is_ip6=1)
143         route.add_vpp_config()
144
145         # configure encaps IPv6 source address
146         # needs to be done before SR Policy config
147         # TODO: API?
148         self.vapi.cli("set sr encaps source addr a3::")
149
150         bsid = 'a3::9999:1'
151         # configure SRv6 Policy
152         # Note: segment list order: first -> last
153         sr_policy = VppSRv6Policy(
154             self, bsid=bsid,
155             is_encap=1,
156             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
157             weight=1, fib_table=0,
158             segments=['a4::', 'a5::', 'a6::c7'],
159             source='a3::')
160         sr_policy.add_vpp_config()
161         self.sr_policy = sr_policy
162
163         # log the sr policies
164         self.logger.info(self.vapi.cli("show sr policies"))
165
166         # steer IPv6 traffic to a7::/64 into SRv6 Policy
167         # use the bsid of the above self.sr_policy
168         pol_steering = VppSRv6Steering(
169                         self,
170                         bsid=self.sr_policy.bsid,
171                         prefix="a7::", mask_width=64,
172                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
173                         sr_policy_index=0, table_id=0,
174                         sw_if_index=0)
175         pol_steering.add_vpp_config()
176
177         # log the sr steering policies
178         self.logger.info(self.vapi.cli("show sr steering policies"))
179
180         # create packets
181         count = len(self.pg_packet_sizes)
182         dst_inner = 'a7::1234'
183         pkts = []
184
185         # create IPv6 packets without SRH
186         packet_header = self.create_packet_header_IPv6(dst_inner)
187         # create traffic stream pg0->pg1
188         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
189                                        self.pg_packet_sizes, count))
190
191         # create IPv6 packets with SRH
192         # packets with segments-left 1, active segment a7::
193         packet_header = self.create_packet_header_IPv6_SRH(
194             sidlist=['a8::', 'a7::', 'a6::'],
195             segleft=1)
196         # create traffic stream pg0->pg1
197         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
198                                        self.pg_packet_sizes, count))
199
200         # create IPv6 packets with SRH and IPv6
201         # packets with segments-left 1, active segment a7::
202         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
203             dst_inner,
204             sidlist=['a8::', 'a7::', 'a6::'],
205             segleft=1)
206         # create traffic stream pg0->pg1
207         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
208                                        self.pg_packet_sizes, count))
209
210         # send packets and verify received packets
211         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
212                                   self.compare_rx_tx_packet_T_Encaps)
213
214         # log the localsid counters
215         self.logger.info(self.vapi.cli("show sr localsid"))
216
217         # remove SR steering
218         pol_steering.remove_vpp_config()
219         self.logger.info(self.vapi.cli("show sr steering policies"))
220
221         # remove SR Policies
222         self.sr_policy.remove_vpp_config()
223         self.logger.info(self.vapi.cli("show sr policies"))
224
225         # remove FIB entries
226         # done by tearDown
227
228         # cleanup interfaces
229         self.teardown_interfaces()
230
231     def test_SRv6_T_Insert(self):
232         """ Test SRv6 Transit.Insert behavior (IPv6 only).
233         """
234         # send traffic to one destination interface
235         # source and destination are IPv6 only
236         self.setup_interfaces(ipv6=[True, True])
237
238         # configure FIB entries
239         route = VppIpRoute(self, "a4::", 64,
240                            [VppRoutePath(self.pg1.remote_ip6,
241                                          self.pg1.sw_if_index,
242                                          proto=DpoProto.DPO_PROTO_IP6)],
243                            is_ip6=1)
244         route.add_vpp_config()
245
246         # configure encaps IPv6 source address
247         # needs to be done before SR Policy config
248         # TODO: API?
249         self.vapi.cli("set sr encaps source addr a3::")
250
251         bsid = 'a3::9999:1'
252         # configure SRv6 Policy
253         # Note: segment list order: first -> last
254         sr_policy = VppSRv6Policy(
255             self, bsid=bsid,
256             is_encap=0,
257             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
258             weight=1, fib_table=0,
259             segments=['a4::', 'a5::', 'a6::c7'],
260             source='a3::')
261         sr_policy.add_vpp_config()
262         self.sr_policy = sr_policy
263
264         # log the sr policies
265         self.logger.info(self.vapi.cli("show sr policies"))
266
267         # steer IPv6 traffic to a7::/64 into SRv6 Policy
268         # use the bsid of the above self.sr_policy
269         pol_steering = VppSRv6Steering(
270                         self,
271                         bsid=self.sr_policy.bsid,
272                         prefix="a7::", mask_width=64,
273                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
274                         sr_policy_index=0, table_id=0,
275                         sw_if_index=0)
276         pol_steering.add_vpp_config()
277
278         # log the sr steering policies
279         self.logger.info(self.vapi.cli("show sr steering policies"))
280
281         # create packets
282         count = len(self.pg_packet_sizes)
283         dst_inner = 'a7::1234'
284         pkts = []
285
286         # create IPv6 packets without SRH
287         packet_header = self.create_packet_header_IPv6(dst_inner)
288         # create traffic stream pg0->pg1
289         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
290                                        self.pg_packet_sizes, count))
291
292         # create IPv6 packets with SRH
293         # packets with segments-left 1, active segment a7::
294         packet_header = self.create_packet_header_IPv6_SRH(
295             sidlist=['a8::', 'a7::', 'a6::'],
296             segleft=1)
297         # create traffic stream pg0->pg1
298         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
299                                        self.pg_packet_sizes, count))
300
301         # send packets and verify received packets
302         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
303                                   self.compare_rx_tx_packet_T_Insert)
304
305         # log the localsid counters
306         self.logger.info(self.vapi.cli("show sr localsid"))
307
308         # remove SR steering
309         pol_steering.remove_vpp_config()
310         self.logger.info(self.vapi.cli("show sr steering policies"))
311
312         # remove SR Policies
313         self.sr_policy.remove_vpp_config()
314         self.logger.info(self.vapi.cli("show sr policies"))
315
316         # remove FIB entries
317         # done by tearDown
318
319         # cleanup interfaces
320         self.teardown_interfaces()
321
322     def test_SRv6_T_Encaps_IPv4(self):
323         """ Test SRv6 Transit.Encaps behavior for IPv4.
324         """
325         # send traffic to one destination interface
326         # source interface is IPv4 only
327         # destination interface is IPv6 only
328         self.setup_interfaces(ipv6=[False, True], ipv4=[True, False])
329
330         # configure FIB entries
331         route = VppIpRoute(self, "a4::", 64,
332                            [VppRoutePath(self.pg1.remote_ip6,
333                                          self.pg1.sw_if_index,
334                                          proto=DpoProto.DPO_PROTO_IP6)],
335                            is_ip6=1)
336         route.add_vpp_config()
337
338         # configure encaps IPv6 source address
339         # needs to be done before SR Policy config
340         # TODO: API?
341         self.vapi.cli("set sr encaps source addr a3::")
342
343         bsid = 'a3::9999:1'
344         # configure SRv6 Policy
345         # Note: segment list order: first -> last
346         sr_policy = VppSRv6Policy(
347             self, bsid=bsid,
348             is_encap=1,
349             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
350             weight=1, fib_table=0,
351             segments=['a4::', 'a5::', 'a6::c7'],
352             source='a3::')
353         sr_policy.add_vpp_config()
354         self.sr_policy = sr_policy
355
356         # log the sr policies
357         self.logger.info(self.vapi.cli("show sr policies"))
358
359         # steer IPv4 traffic to 7.1.1.0/24 into SRv6 Policy
360         # use the bsid of the above self.sr_policy
361         pol_steering = VppSRv6Steering(
362                         self,
363                         bsid=self.sr_policy.bsid,
364                         prefix="7.1.1.0", mask_width=24,
365                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV4,
366                         sr_policy_index=0, table_id=0,
367                         sw_if_index=0)
368         pol_steering.add_vpp_config()
369
370         # log the sr steering policies
371         self.logger.info(self.vapi.cli("show sr steering policies"))
372
373         # create packets
374         count = len(self.pg_packet_sizes)
375         dst_inner = '7.1.1.123'
376         pkts = []
377
378         # create IPv4 packets
379         packet_header = self.create_packet_header_IPv4(dst_inner)
380         # create traffic stream pg0->pg1
381         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
382                                        self.pg_packet_sizes, count))
383
384         # send packets and verify received packets
385         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
386                                   self.compare_rx_tx_packet_T_Encaps_IPv4)
387
388         # log the localsid counters
389         self.logger.info(self.vapi.cli("show sr localsid"))
390
391         # remove SR steering
392         pol_steering.remove_vpp_config()
393         self.logger.info(self.vapi.cli("show sr steering policies"))
394
395         # remove SR Policies
396         self.sr_policy.remove_vpp_config()
397         self.logger.info(self.vapi.cli("show sr policies"))
398
399         # remove FIB entries
400         # done by tearDown
401
402         # cleanup interfaces
403         self.teardown_interfaces()
404
405     @unittest.skip("VPP crashes after running this test")
406     def test_SRv6_T_Encaps_L2(self):
407         """ Test SRv6 Transit.Encaps behavior for L2.
408         """
409         # send traffic to one destination interface
410         # source interface is IPv4 only TODO?
411         # destination interface is IPv6 only
412         self.setup_interfaces(ipv6=[False, True], ipv4=[False, False])
413
414         # configure FIB entries
415         route = VppIpRoute(self, "a4::", 64,
416                            [VppRoutePath(self.pg1.remote_ip6,
417                                          self.pg1.sw_if_index,
418                                          proto=DpoProto.DPO_PROTO_IP6)],
419                            is_ip6=1)
420         route.add_vpp_config()
421
422         # configure encaps IPv6 source address
423         # needs to be done before SR Policy config
424         # TODO: API?
425         self.vapi.cli("set sr encaps source addr a3::")
426
427         bsid = 'a3::9999:1'
428         # configure SRv6 Policy
429         # Note: segment list order: first -> last
430         sr_policy = VppSRv6Policy(
431             self, bsid=bsid,
432             is_encap=1,
433             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
434             weight=1, fib_table=0,
435             segments=['a4::', 'a5::', 'a6::c7'],
436             source='a3::')
437         sr_policy.add_vpp_config()
438         self.sr_policy = sr_policy
439
440         # log the sr policies
441         self.logger.info(self.vapi.cli("show sr policies"))
442
443         # steer L2 traffic into SRv6 Policy
444         # use the bsid of the above self.sr_policy
445         pol_steering = VppSRv6Steering(
446                         self,
447                         bsid=self.sr_policy.bsid,
448                         prefix="::", mask_width=0,
449                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_L2,
450                         sr_policy_index=0, table_id=0,
451                         sw_if_index=self.pg0.sw_if_index)
452         pol_steering.add_vpp_config()
453
454         # log the sr steering policies
455         self.logger.info(self.vapi.cli("show sr steering policies"))
456
457         # create packets
458         count = len(self.pg_packet_sizes)
459         pkts = []
460
461         # create L2 packets without dot1q header
462         packet_header = self.create_packet_header_L2()
463         # create traffic stream pg0->pg1
464         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
465                                        self.pg_packet_sizes, count))
466
467         # create L2 packets with dot1q header
468         packet_header = self.create_packet_header_L2(vlan=123)
469         # create traffic stream pg0->pg1
470         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
471                                        self.pg_packet_sizes, count))
472
473         # send packets and verify received packets
474         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
475                                   self.compare_rx_tx_packet_T_Encaps_L2)
476
477         # log the localsid counters
478         self.logger.info(self.vapi.cli("show sr localsid"))
479
480         # remove SR steering
481         pol_steering.remove_vpp_config()
482         self.logger.info(self.vapi.cli("show sr steering policies"))
483
484         # remove SR Policies
485         self.sr_policy.remove_vpp_config()
486         self.logger.info(self.vapi.cli("show sr policies"))
487
488         # remove FIB entries
489         # done by tearDown
490
491         # cleanup interfaces
492         self.teardown_interfaces()
493
494     def test_SRv6_End(self):
495         """ Test SRv6 End (without PSP) behavior.
496         """
497         # send traffic to one destination interface
498         # source and destination interfaces are IPv6 only
499         self.setup_interfaces(ipv6=[True, True])
500
501         # configure FIB entries
502         route = VppIpRoute(self, "a4::", 64,
503                            [VppRoutePath(self.pg1.remote_ip6,
504                                          self.pg1.sw_if_index,
505                                          proto=DpoProto.DPO_PROTO_IP6)],
506                            is_ip6=1)
507         route.add_vpp_config()
508
509         # configure SRv6 localSID End without PSP behavior
510         localsid = VppSRv6LocalSID(
511                         self, localsid_addr='A3::0',
512                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
513                         nh_addr='::',
514                         end_psp=0,
515                         sw_if_index=0,
516                         vlan_index=0,
517                         fib_table=0)
518         localsid.add_vpp_config()
519         # log the localsids
520         self.logger.debug(self.vapi.cli("show sr localsid"))
521
522         # create IPv6 packets with SRH (SL=2, SL=1, SL=0)
523         # send one packet per SL value per packet size
524         # SL=0 packet with localSID End with USP needs 2nd SRH
525         count = len(self.pg_packet_sizes)
526         dst_inner = 'a4::1234'
527         pkts = []
528
529         # packets with segments-left 2, active segment a3::
530         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
531                 dst_inner,
532                 sidlist=['a5::', 'a4::', 'a3::'],
533                 segleft=2)
534         # create traffic stream pg0->pg1
535         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
536                                        self.pg_packet_sizes, count))
537
538         # packets with segments-left 1, active segment a3::
539         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
540                 dst_inner,
541                 sidlist=['a4::', 'a3::', 'a2::'],
542                 segleft=1)
543         # add to traffic stream pg0->pg1
544         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
545                                        self.pg_packet_sizes, count))
546
547         # TODO: test behavior with SL=0 packet (needs 2*SRH?)
548
549         # send packets and verify received packets
550         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
551                                   self.compare_rx_tx_packet_End)
552
553         # log the localsid counters
554         self.logger.info(self.vapi.cli("show sr localsid"))
555
556         # remove SRv6 localSIDs
557         localsid.remove_vpp_config()
558
559         # remove FIB entries
560         # done by tearDown
561
562         # cleanup interfaces
563         self.teardown_interfaces()
564
565     def test_SRv6_End_with_PSP(self):
566         """ Test SRv6 End with PSP behavior.
567         """
568         # send traffic to one destination interface
569         # source and destination interfaces are IPv6 only
570         self.setup_interfaces(ipv6=[True, True])
571
572         # configure FIB entries
573         route = VppIpRoute(self, "a4::", 64,
574                            [VppRoutePath(self.pg1.remote_ip6,
575                                          self.pg1.sw_if_index,
576                                          proto=DpoProto.DPO_PROTO_IP6)],
577                            is_ip6=1)
578         route.add_vpp_config()
579
580         # configure SRv6 localSID End with PSP behavior
581         localsid = VppSRv6LocalSID(
582                         self, localsid_addr='A3::0',
583                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
584                         nh_addr='::',
585                         end_psp=1,
586                         sw_if_index=0,
587                         vlan_index=0,
588                         fib_table=0)
589         localsid.add_vpp_config()
590         # log the localsids
591         self.logger.debug(self.vapi.cli("show sr localsid"))
592
593         # create IPv6 packets with SRH (SL=2, SL=1)
594         # send one packet per SL value per packet size
595         # SL=0 packet with localSID End with PSP is dropped
596         count = len(self.pg_packet_sizes)
597         dst_inner = 'a4::1234'
598         pkts = []
599
600         # packets with segments-left 2, active segment a3::
601         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
602                     dst_inner,
603                     sidlist=['a5::', 'a4::', 'a3::'],
604                     segleft=2)
605         # create traffic stream pg0->pg1
606         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
607                                        self.pg_packet_sizes, count))
608
609         # packets with segments-left 1, active segment a3::
610         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
611                     dst_inner,
612                     sidlist=['a4::', 'a3::', 'a2::'],
613                     segleft=1)
614         # add to traffic stream pg0->pg1
615         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
616                                        self.pg_packet_sizes, count))
617
618         # send packets and verify received packets
619         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
620                                   self.compare_rx_tx_packet_End_PSP)
621
622         # log the localsid counters
623         self.logger.info(self.vapi.cli("show sr localsid"))
624
625         # remove SRv6 localSIDs
626         localsid.remove_vpp_config()
627
628         # remove FIB entries
629         # done by tearDown
630
631         # cleanup interfaces
632         self.teardown_interfaces()
633
634     def test_SRv6_End_X(self):
635         """ Test SRv6 End.X (without PSP) behavior.
636         """
637         # create three interfaces (1 source, 2 destinations)
638         # source and destination interfaces are IPv6 only
639         self.setup_interfaces(ipv6=[True, True, True])
640
641         # configure FIB entries
642         # a4::/64 via pg1 and pg2
643         route = VppIpRoute(self, "a4::", 64,
644                            [VppRoutePath(self.pg1.remote_ip6,
645                                          self.pg1.sw_if_index,
646                                          proto=DpoProto.DPO_PROTO_IP6),
647                             VppRoutePath(self.pg2.remote_ip6,
648                                          self.pg2.sw_if_index,
649                                          proto=DpoProto.DPO_PROTO_IP6)],
650                            is_ip6=1)
651         route.add_vpp_config()
652         self.logger.debug(self.vapi.cli("show ip6 fib"))
653
654         # configure SRv6 localSID End.X without PSP behavior
655         # End.X points to interface pg1
656         localsid = VppSRv6LocalSID(
657                         self, localsid_addr='A3::C4',
658                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
659                         nh_addr=self.pg1.remote_ip6,
660                         end_psp=0,
661                         sw_if_index=self.pg1.sw_if_index,
662                         vlan_index=0,
663                         fib_table=0)
664         localsid.add_vpp_config()
665         # log the localsids
666         self.logger.debug(self.vapi.cli("show sr localsid"))
667
668         # create IPv6 packets with SRH (SL=2, SL=1)
669         # send one packet per SL value per packet size
670         # SL=0 packet with localSID End with PSP is dropped
671         count = len(self.pg_packet_sizes)
672         dst_inner = 'a4::1234'
673         pkts = []
674
675         # packets with segments-left 2, active segment a3::c4
676         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
677                     dst_inner,
678                     sidlist=['a5::', 'a4::', 'a3::c4'],
679                     segleft=2)
680         # create traffic stream pg0->pg1
681         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
682                                        self.pg_packet_sizes, count))
683
684         # packets with segments-left 1, active segment a3::c4
685         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
686                     dst_inner,
687                     sidlist=['a4::', 'a3::c4', 'a2::'],
688                     segleft=1)
689         # add to traffic stream pg0->pg1
690         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
691                                        self.pg_packet_sizes, count))
692
693         # send packets and verify received packets
694         # using same comparison function as End (no PSP)
695         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
696                                   self.compare_rx_tx_packet_End)
697
698         # assert nothing was received on the other interface (pg2)
699         self.pg2.assert_nothing_captured("mis-directed packet(s)")
700
701         # log the localsid counters
702         self.logger.info(self.vapi.cli("show sr localsid"))
703
704         # remove SRv6 localSIDs
705         localsid.remove_vpp_config()
706
707         # remove FIB entries
708         # done by tearDown
709
710         # cleanup interfaces
711         self.teardown_interfaces()
712
713     def test_SRv6_End_X_with_PSP(self):
714         """ Test SRv6 End.X with PSP behavior.
715         """
716         # create three interfaces (1 source, 2 destinations)
717         # source and destination interfaces are IPv6 only
718         self.setup_interfaces(ipv6=[True, True, True])
719
720         # configure FIB entries
721         # a4::/64 via pg1 and pg2
722         route = VppIpRoute(self, "a4::", 64,
723                            [VppRoutePath(self.pg1.remote_ip6,
724                                          self.pg1.sw_if_index,
725                                          proto=DpoProto.DPO_PROTO_IP6),
726                             VppRoutePath(self.pg2.remote_ip6,
727                                          self.pg2.sw_if_index,
728                                          proto=DpoProto.DPO_PROTO_IP6)],
729                            is_ip6=1)
730         route.add_vpp_config()
731
732         # configure SRv6 localSID End with PSP behavior
733         localsid = VppSRv6LocalSID(
734                         self, localsid_addr='A3::C4',
735                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
736                         nh_addr=self.pg1.remote_ip6,
737                         end_psp=1,
738                         sw_if_index=self.pg1.sw_if_index,
739                         vlan_index=0,
740                         fib_table=0)
741         localsid.add_vpp_config()
742         # log the localsids
743         self.logger.debug(self.vapi.cli("show sr localsid"))
744
745         # create IPv6 packets with SRH (SL=2, SL=1)
746         # send one packet per SL value per packet size
747         # SL=0 packet with localSID End with PSP is dropped
748         count = len(self.pg_packet_sizes)
749         dst_inner = 'a4::1234'
750         pkts = []
751
752         # packets with segments-left 2, active segment a3::
753         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
754                     dst_inner,
755                     sidlist=['a5::', 'a4::', 'a3::c4'],
756                     segleft=2)
757         # create traffic stream pg0->pg1
758         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
759                                        self.pg_packet_sizes, count))
760
761         # packets with segments-left 1, active segment a3::
762         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
763                     dst_inner,
764                     sidlist=['a4::', 'a3::c4', 'a2::'],
765                     segleft=1)
766         # add to traffic stream pg0->pg1
767         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
768                                        self.pg_packet_sizes, count))
769
770         # send packets and verify received packets
771         # using same comparison function as End with PSP
772         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
773                                   self.compare_rx_tx_packet_End_PSP)
774
775         # assert nothing was received on the other interface (pg2)
776         self.pg2.assert_nothing_captured("mis-directed packet(s)")
777
778         # log the localsid counters
779         self.logger.info(self.vapi.cli("show sr localsid"))
780
781         # remove SRv6 localSIDs
782         localsid.remove_vpp_config()
783
784         # remove FIB entries
785         # done by tearDown
786
787         # cleanup interfaces
788         self.teardown_interfaces()
789
790     def test_SRv6_End_DX6(self):
791         """ Test SRv6 End.DX6 behavior.
792         """
793         # send traffic to one destination interface
794         # source and destination interfaces are IPv6 only
795         self.setup_interfaces(ipv6=[True, True])
796
797         # configure SRv6 localSID End.DX6 behavior
798         localsid = VppSRv6LocalSID(
799                         self, localsid_addr='a3::c4',
800                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX6,
801                         nh_addr=self.pg1.remote_ip6,
802                         end_psp=0,
803                         sw_if_index=self.pg1.sw_if_index,
804                         vlan_index=0,
805                         fib_table=0)
806         localsid.add_vpp_config()
807         # log the localsids
808         self.logger.debug(self.vapi.cli("show sr localsid"))
809
810         # create IPv6 packets with SRH (SL=0)
811         # send one packet per packet size
812         count = len(self.pg_packet_sizes)
813         dst_inner = 'a4::1234'  # inner header destination address
814         pkts = []
815
816         # packets with SRH, segments-left 0, active segment a3::c4
817         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
818                         dst_inner,
819                         sidlist=['a3::c4', 'a2::', 'a1::'],
820                         segleft=0)
821         # add to traffic stream pg0->pg1
822         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
823                                        self.pg_packet_sizes, count))
824
825         # packets without SRH, IPv6 in IPv6
826         # outer IPv6 dest addr is the localsid End.DX6
827         packet_header = self.create_packet_header_IPv6_IPv6(
828                                             dst_inner,
829                                             dst_outer='a3::c4')
830         # add to traffic stream pg0->pg1
831         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
832                                        self.pg_packet_sizes, count))
833
834         # send packets and verify received packets
835         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
836                                   self.compare_rx_tx_packet_End_DX6)
837
838         # log the localsid counters
839         self.logger.info(self.vapi.cli("show sr localsid"))
840
841         # remove SRv6 localSIDs
842         localsid.remove_vpp_config()
843
844         # cleanup interfaces
845         self.teardown_interfaces()
846
847     def test_SRv6_End_DT6(self):
848         """ Test SRv6 End.DT6 behavior.
849         """
850         # create three interfaces (1 source, 2 destinations)
851         # all interfaces are IPv6 only
852         # source interface in global FIB (0)
853         # destination interfaces in global and vrf
854         vrf_1 = 1
855         self.setup_interfaces(ipv6=[True, True, True],
856                               ipv6_table_id=[0, 0, vrf_1])
857
858         # configure FIB entries
859         # a4::/64 is reachable
860         #     via pg1 in table 0 (global)
861         #     and via pg2 in table vrf_1
862         route0 = VppIpRoute(self, "a4::", 64,
863                             [VppRoutePath(self.pg1.remote_ip6,
864                                           self.pg1.sw_if_index,
865                                           proto=DpoProto.DPO_PROTO_IP6,
866                                           nh_table_id=0)],
867                             table_id=0,
868                             is_ip6=1)
869         route0.add_vpp_config()
870         route1 = VppIpRoute(self, "a4::", 64,
871                             [VppRoutePath(self.pg2.remote_ip6,
872                                           self.pg2.sw_if_index,
873                                           proto=DpoProto.DPO_PROTO_IP6,
874                                           nh_table_id=vrf_1)],
875                             table_id=vrf_1,
876                             is_ip6=1)
877         route1.add_vpp_config()
878         self.logger.debug(self.vapi.cli("show ip6 fib"))
879
880         # configure SRv6 localSID End.DT6 behavior
881         # Note:
882         # fib_table: where the localsid is installed
883         # sw_if_index: in T-variants of localsid this is the vrf table_id
884         localsid = VppSRv6LocalSID(
885                         self, localsid_addr='a3::c4',
886                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT6,
887                         nh_addr='::',
888                         end_psp=0,
889                         sw_if_index=vrf_1,
890                         vlan_index=0,
891                         fib_table=0)
892         localsid.add_vpp_config()
893         # log the localsids
894         self.logger.debug(self.vapi.cli("show sr localsid"))
895
896         # create IPv6 packets with SRH (SL=0)
897         # send one packet per packet size
898         count = len(self.pg_packet_sizes)
899         dst_inner = 'a4::1234'  # inner header destination address
900         pkts = []
901
902         # packets with SRH, segments-left 0, active segment a3::c4
903         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
904                         dst_inner,
905                         sidlist=['a3::c4', 'a2::', 'a1::'],
906                         segleft=0)
907         # add to traffic stream pg0->pg1
908         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
909                                        self.pg_packet_sizes, count))
910
911         # packets without SRH, IPv6 in IPv6
912         # outer IPv6 dest addr is the localsid End.DT6
913         packet_header = self.create_packet_header_IPv6_IPv6(
914                                             dst_inner,
915                                             dst_outer='a3::c4')
916         # add to traffic stream pg0->pg1
917         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
918                                        self.pg_packet_sizes, count))
919
920         # send packets and verify received packets
921         # using same comparison function as End.DX6
922         self.send_and_verify_pkts(self.pg0, pkts, self.pg2,
923                                   self.compare_rx_tx_packet_End_DX6)
924
925         # assert nothing was received on the other interface (pg2)
926         self.pg1.assert_nothing_captured("mis-directed packet(s)")
927
928         # log the localsid counters
929         self.logger.info(self.vapi.cli("show sr localsid"))
930
931         # remove SRv6 localSIDs
932         localsid.remove_vpp_config()
933
934         # remove FIB entries
935         # done by tearDown
936
937         # cleanup interfaces
938         self.teardown_interfaces()
939
940     def test_SRv6_End_DX4(self):
941         """ Test SRv6 End.DX4 behavior.
942         """
943         # send traffic to one destination interface
944         # source interface is IPv6 only
945         # destination interface is IPv4 only
946         self.setup_interfaces(ipv6=[True, False], ipv4=[False, True])
947
948         # configure SRv6 localSID End.DX4 behavior
949         localsid = VppSRv6LocalSID(
950                         self, localsid_addr='a3::c4',
951                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX4,
952                         nh_addr=self.pg1.remote_ip4,
953                         end_psp=0,
954                         sw_if_index=self.pg1.sw_if_index,
955                         vlan_index=0,
956                         fib_table=0)
957         localsid.add_vpp_config()
958         # log the localsids
959         self.logger.debug(self.vapi.cli("show sr localsid"))
960
961         # send one packet per packet size
962         count = len(self.pg_packet_sizes)
963         dst_inner = '4.1.1.123'  # inner header destination address
964         pkts = []
965
966         # packets with SRH, segments-left 0, active segment a3::c4
967         packet_header = self.create_packet_header_IPv6_SRH_IPv4(
968                         dst_inner,
969                         sidlist=['a3::c4', 'a2::', 'a1::'],
970                         segleft=0)
971         # add to traffic stream pg0->pg1
972         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
973                                        self.pg_packet_sizes, count))
974
975         # packets without SRH, IPv4 in IPv6
976         # outer IPv6 dest addr is the localsid End.DX4
977         packet_header = self.create_packet_header_IPv6_IPv4(
978                                             dst_inner,
979                                             dst_outer='a3::c4')
980         # add to traffic stream pg0->pg1
981         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
982                                        self.pg_packet_sizes, count))
983
984         # send packets and verify received packets
985         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
986                                   self.compare_rx_tx_packet_End_DX4)
987
988         # log the localsid counters
989         self.logger.info(self.vapi.cli("show sr localsid"))
990
991         # remove SRv6 localSIDs
992         localsid.remove_vpp_config()
993
994         # cleanup interfaces
995         self.teardown_interfaces()
996
997     def test_SRv6_End_DT4(self):
998         """ Test SRv6 End.DT4 behavior.
999         """
1000         # create three interfaces (1 source, 2 destinations)
1001         # source interface is IPv6-only
1002         # destination interfaces are IPv4 only
1003         # source interface in global FIB (0)
1004         # destination interfaces in global and vrf
1005         vrf_1 = 1
1006         self.setup_interfaces(ipv6=[True, False, False],
1007                               ipv4=[False, True, True],
1008                               ipv6_table_id=[0, 0, 0],
1009                               ipv4_table_id=[0, 0, vrf_1])
1010
1011         # configure FIB entries
1012         # 4.1.1.0/24 is reachable
1013         #     via pg1 in table 0 (global)
1014         #     and via pg2 in table vrf_1
1015         route0 = VppIpRoute(self, "4.1.1.0", 24,
1016                             [VppRoutePath(self.pg1.remote_ip4,
1017                                           self.pg1.sw_if_index,
1018                                           nh_table_id=0)],
1019                             table_id=0,
1020                             is_ip6=0)
1021         route0.add_vpp_config()
1022         route1 = VppIpRoute(self, "4.1.1.0", 24,
1023                             [VppRoutePath(self.pg2.remote_ip4,
1024                                           self.pg2.sw_if_index,
1025                                           nh_table_id=vrf_1)],
1026                             table_id=vrf_1,
1027                             is_ip6=0)
1028         route1.add_vpp_config()
1029         self.logger.debug(self.vapi.cli("show ip fib"))
1030
1031         # configure SRv6 localSID End.DT6 behavior
1032         # Note:
1033         # fib_table: where the localsid is installed
1034         # sw_if_index: in T-variants of localsid: vrf table_id
1035         localsid = VppSRv6LocalSID(
1036                         self, localsid_addr='a3::c4',
1037                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT4,
1038                         nh_addr='::',
1039                         end_psp=0,
1040                         sw_if_index=vrf_1,
1041                         vlan_index=0,
1042                         fib_table=0)
1043         localsid.add_vpp_config()
1044         # log the localsids
1045         self.logger.debug(self.vapi.cli("show sr localsid"))
1046
1047         # create IPv6 packets with SRH (SL=0)
1048         # send one packet per packet size
1049         count = len(self.pg_packet_sizes)
1050         dst_inner = '4.1.1.123'  # inner header destination address
1051         pkts = []
1052
1053         # packets with SRH, segments-left 0, active segment a3::c4
1054         packet_header = self.create_packet_header_IPv6_SRH_IPv4(
1055                         dst_inner,
1056                         sidlist=['a3::c4', 'a2::', 'a1::'],
1057                         segleft=0)
1058         # add to traffic stream pg0->pg1
1059         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
1060                                        self.pg_packet_sizes, count))
1061
1062         # packets without SRH, IPv6 in IPv6
1063         # outer IPv6 dest addr is the localsid End.DX4
1064         packet_header = self.create_packet_header_IPv6_IPv4(
1065                                             dst_inner,
1066                                             dst_outer='a3::c4')
1067         # add to traffic stream pg0->pg1
1068         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
1069                                        self.pg_packet_sizes, count))
1070
1071         # send packets and verify received packets
1072         # using same comparison function as End.DX4
1073         self.send_and_verify_pkts(self.pg0, pkts, self.pg2,
1074                                   self.compare_rx_tx_packet_End_DX4)
1075
1076         # assert nothing was received on the other interface (pg2)
1077         self.pg1.assert_nothing_captured("mis-directed packet(s)")
1078
1079         # log the localsid counters
1080         self.logger.info(self.vapi.cli("show sr localsid"))
1081
1082         # remove SRv6 localSIDs
1083         localsid.remove_vpp_config()
1084
1085         # remove FIB entries
1086         # done by tearDown
1087
1088         # cleanup interfaces
1089         self.teardown_interfaces()
1090
1091     def test_SRv6_End_DX2(self):
1092         """ Test SRv6 End.DX2 behavior.
1093         """
1094         # send traffic to one destination interface
1095         # source interface is IPv6 only
1096         self.setup_interfaces(ipv6=[True, False], ipv4=[False, False])
1097
1098         # configure SRv6 localSID End.DX2 behavior
1099         localsid = VppSRv6LocalSID(
1100                         self, localsid_addr='a3::c4',
1101                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX2,
1102                         nh_addr='::',
1103                         end_psp=0,
1104                         sw_if_index=self.pg1.sw_if_index,
1105                         vlan_index=0,
1106                         fib_table=0)
1107         localsid.add_vpp_config()
1108         # log the localsids
1109         self.logger.debug(self.vapi.cli("show sr localsid"))
1110
1111         # send one packet per packet size
1112         count = len(self.pg_packet_sizes)
1113         pkts = []
1114
1115         # packets with SRH, segments-left 0, active segment a3::c4
1116         # L2 has no dot1q header
1117         packet_header = self.create_packet_header_IPv6_SRH_L2(
1118                             sidlist=['a3::c4', 'a2::', 'a1::'],
1119                             segleft=0,
1120                             vlan=0)
1121         # add to traffic stream pg0->pg1
1122         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1123                                        self.pg_packet_sizes, count))
1124
1125         # packets with SRH, segments-left 0, active segment a3::c4
1126         # L2 has dot1q header
1127         packet_header = self.create_packet_header_IPv6_SRH_L2(
1128                             sidlist=['a3::c4', 'a2::', 'a1::'],
1129                             segleft=0,
1130                             vlan=123)
1131         # add to traffic stream pg0->pg1
1132         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1133                                        self.pg_packet_sizes, count))
1134
1135         # packets without SRH, L2 in IPv6
1136         # outer IPv6 dest addr is the localsid End.DX2
1137         # L2 has no dot1q header
1138         packet_header = self.create_packet_header_IPv6_L2(
1139                                             dst_outer='a3::c4',
1140                                             vlan=0)
1141         # add to traffic stream pg0->pg1
1142         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1143                                        self.pg_packet_sizes, count))
1144
1145         # packets without SRH, L2 in IPv6
1146         # outer IPv6 dest addr is the localsid End.DX2
1147         # L2 has dot1q header
1148         packet_header = self.create_packet_header_IPv6_L2(
1149                                             dst_outer='a3::c4',
1150                                             vlan=123)
1151         # add to traffic stream pg0->pg1
1152         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1153                                        self.pg_packet_sizes, count))
1154
1155         # send packets and verify received packets
1156         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
1157                                   self.compare_rx_tx_packet_End_DX2)
1158
1159         # log the localsid counters
1160         self.logger.info(self.vapi.cli("show sr localsid"))
1161
1162         # remove SRv6 localSIDs
1163         localsid.remove_vpp_config()
1164
1165         # cleanup interfaces
1166         self.teardown_interfaces()
1167
1168     def compare_rx_tx_packet_T_Encaps(self, tx_pkt, rx_pkt):
1169         """ Compare input and output packet after passing T.Encaps
1170
1171         :param tx_pkt: transmitted packet
1172         :param rx_pkt: received packet
1173         """
1174         # T.Encaps updates the headers as follows:
1175         # SR Policy seglist (S3, S2, S1)
1176         # SR Policy source C
1177         # IPv6:
1178         # in: IPv6(A, B2)
1179         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(A, B2)
1180         # IPv6 + SRH:
1181         # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1)
1182         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(a, B2)SRH(B3, B2, B1; SL=1)
1183
1184         # get first (outer) IPv6 header of rx'ed packet
1185         rx_ip = rx_pkt.getlayer(IPv6)
1186         rx_srh = None
1187
1188         tx_ip = tx_pkt.getlayer(IPv6)
1189
1190         # expected segment-list
1191         seglist = self.sr_policy.segments
1192         # reverse list to get order as in SRH
1193         tx_seglist = seglist[::-1]
1194
1195         # get source address of SR Policy
1196         sr_policy_source = self.sr_policy.source
1197
1198         # rx'ed packet should have SRH
1199         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1200         # get SRH
1201         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1202
1203         # received ip.src should be equal to SR Policy source
1204         self.assertEqual(rx_ip.src, sr_policy_source)
1205         # received ip.dst should be equal to expected sidlist[lastentry]
1206         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1207         # rx'ed seglist should be equal to expected seglist
1208         self.assertEqual(rx_srh.addresses, tx_seglist)
1209         # segleft should be equal to size expected seglist-1
1210         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1211         # segleft should be equal to lastentry
1212         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1213
1214         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1215         # except for the hop-limit field
1216         #   -> update tx'ed hlim to the expected hlim
1217         tx_ip.hlim = tx_ip.hlim - 1
1218
1219         self.assertEqual(rx_srh.payload, tx_ip)
1220
1221         self.logger.debug("packet verification: SUCCESS")
1222
1223     def compare_rx_tx_packet_T_Encaps_IPv4(self, tx_pkt, rx_pkt):
1224         """ Compare input and output packet after passing T.Encaps for IPv4
1225
1226         :param tx_pkt: transmitted packet
1227         :param rx_pkt: received packet
1228         """
1229         # T.Encaps for IPv4 updates the headers as follows:
1230         # SR Policy seglist (S3, S2, S1)
1231         # SR Policy source C
1232         # IPv4:
1233         # in: IPv4(A, B2)
1234         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv4(A, B2)
1235
1236         # get first (outer) IPv6 header of rx'ed packet
1237         rx_ip = rx_pkt.getlayer(IPv6)
1238         rx_srh = None
1239
1240         tx_ip = tx_pkt.getlayer(IP)
1241
1242         # expected segment-list
1243         seglist = self.sr_policy.segments
1244         # reverse list to get order as in SRH
1245         tx_seglist = seglist[::-1]
1246
1247         # get source address of SR Policy
1248         sr_policy_source = self.sr_policy.source
1249
1250         # checks common to cases tx with and without SRH
1251         # rx'ed packet should have SRH and IPv4 header
1252         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1253         self.assertTrue(rx_ip.payload.haslayer(IP))
1254         # get SRH
1255         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1256
1257         # received ip.src should be equal to SR Policy source
1258         self.assertEqual(rx_ip.src, sr_policy_source)
1259         # received ip.dst should be equal to sidlist[lastentry]
1260         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1261         # rx'ed seglist should be equal to seglist
1262         self.assertEqual(rx_srh.addresses, tx_seglist)
1263         # segleft should be equal to size seglist-1
1264         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1265         # segleft should be equal to lastentry
1266         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1267
1268         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1269         # except for the ttl field and ip checksum
1270         #   -> adjust tx'ed ttl to expected ttl
1271         tx_ip.ttl = tx_ip.ttl - 1
1272         #   -> set tx'ed ip checksum to None and let scapy recompute
1273         tx_ip.chksum = None
1274         # read back the pkt (with str()) to force computing these fields
1275         # probably other ways to accomplish this are possible
1276         tx_ip = IP(str(tx_ip))
1277
1278         self.assertEqual(rx_srh.payload, tx_ip)
1279
1280         self.logger.debug("packet verification: SUCCESS")
1281
1282     def compare_rx_tx_packet_T_Encaps_L2(self, tx_pkt, rx_pkt):
1283         """ Compare input and output packet after passing T.Encaps for L2
1284
1285         :param tx_pkt: transmitted packet
1286         :param rx_pkt: received packet
1287         """
1288         # T.Encaps for L2 updates the headers as follows:
1289         # SR Policy seglist (S3, S2, S1)
1290         # SR Policy source C
1291         # L2:
1292         # in: L2
1293         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)L2
1294
1295         # get first (outer) IPv6 header of rx'ed packet
1296         rx_ip = rx_pkt.getlayer(IPv6)
1297         rx_srh = None
1298
1299         tx_ether = tx_pkt.getlayer(Ether)
1300
1301         # expected segment-list
1302         seglist = self.sr_policy.segments
1303         # reverse list to get order as in SRH
1304         tx_seglist = seglist[::-1]
1305
1306         # get source address of SR Policy
1307         sr_policy_source = self.sr_policy.source
1308
1309         # rx'ed packet should have SRH
1310         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1311         # get SRH
1312         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1313
1314         # received ip.src should be equal to SR Policy source
1315         self.assertEqual(rx_ip.src, sr_policy_source)
1316         # received ip.dst should be equal to sidlist[lastentry]
1317         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1318         # rx'ed seglist should be equal to seglist
1319         self.assertEqual(rx_srh.addresses, tx_seglist)
1320         # segleft should be equal to size seglist-1
1321         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1322         # segleft should be equal to lastentry
1323         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1324         # nh should be "No Next Header" (59)
1325         self.assertEqual(rx_srh.nh, 59)
1326
1327         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1328         self.assertEqual(Ether(str(rx_srh.payload)), tx_ether)
1329
1330         self.logger.debug("packet verification: SUCCESS")
1331
1332     def compare_rx_tx_packet_T_Insert(self, tx_pkt, rx_pkt):
1333         """ Compare input and output packet after passing T.Insert
1334
1335         :param tx_pkt: transmitted packet
1336         :param rx_pkt: received packet
1337         """
1338         # T.Insert updates the headers as follows:
1339         # IPv6:
1340         # in: IPv6(A, B2)
1341         # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)
1342         # IPv6 + SRH:
1343         # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1)
1344         # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)SRH(B3, B2, B1; SL=1)
1345
1346         # get first (outer) IPv6 header of rx'ed packet
1347         rx_ip = rx_pkt.getlayer(IPv6)
1348         rx_srh = None
1349         rx_ip2 = None
1350         rx_srh2 = None
1351         rx_ip3 = None
1352         rx_udp = rx_pkt[UDP]
1353
1354         tx_ip = tx_pkt.getlayer(IPv6)
1355         tx_srh = None
1356         tx_ip2 = None
1357         # some packets have been tx'ed with an SRH, some without it
1358         # get SRH if tx'ed packet has it
1359         if tx_pkt.haslayer(IPv6ExtHdrSegmentRouting):
1360             tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1361             tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1362         tx_udp = tx_pkt[UDP]
1363
1364         # expected segment-list (make copy of SR Policy segment list)
1365         seglist = self.sr_policy.segments[:]
1366         # expected seglist has initial dest addr as last segment
1367         seglist.append(tx_ip.dst)
1368         # reverse list to get order as in SRH
1369         tx_seglist = seglist[::-1]
1370
1371         # get source address of SR Policy
1372         sr_policy_source = self.sr_policy.source
1373
1374         # checks common to cases tx with and without SRH
1375         # rx'ed packet should have SRH and only one IPv6 header
1376         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1377         self.assertFalse(rx_ip.payload.haslayer(IPv6))
1378         # get SRH
1379         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1380
1381         # rx'ed ip.src should be equal to tx'ed ip.src
1382         self.assertEqual(rx_ip.src, tx_ip.src)
1383         # rx'ed ip.dst should be equal to sidlist[lastentry]
1384         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1385
1386         # rx'ed seglist should be equal to expected seglist
1387         self.assertEqual(rx_srh.addresses, tx_seglist)
1388         # segleft should be equal to size(expected seglist)-1
1389         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1390         # segleft should be equal to lastentry
1391         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1392
1393         if tx_srh:  # packet was tx'ed with SRH
1394             # packet should have 2nd SRH
1395             self.assertTrue(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
1396             # get 2nd SRH
1397             rx_srh2 = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting, 2)
1398
1399             # rx'ed srh2.addresses should be equal to tx'ed srh.addresses
1400             self.assertEqual(rx_srh2.addresses, tx_srh.addresses)
1401             # rx'ed srh2.segleft should be equal to tx'ed srh.segleft
1402             self.assertEqual(rx_srh2.segleft, tx_srh.segleft)
1403             # rx'ed srh2.lastentry should be equal to tx'ed srh.lastentry
1404             self.assertEqual(rx_srh2.lastentry, tx_srh.lastentry)
1405
1406         else:  # packet was tx'ed without SRH
1407             # rx packet should have no other SRH
1408             self.assertFalse(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
1409
1410         # UDP layer should be unchanged
1411         self.assertEqual(rx_udp, tx_udp)
1412
1413         self.logger.debug("packet verification: SUCCESS")
1414
1415     def compare_rx_tx_packet_End(self, tx_pkt, rx_pkt):
1416         """ Compare input and output packet after passing End (without PSP)
1417
1418         :param tx_pkt: transmitted packet
1419         :param rx_pkt: received packet
1420         """
1421         # End (no PSP) updates the headers as follows:
1422         # IPv6 + SRH:
1423         # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2)
1424         # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1425
1426         # get first (outer) IPv6 header of rx'ed packet
1427         rx_ip = rx_pkt.getlayer(IPv6)
1428         rx_srh = None
1429         rx_ip2 = None
1430         rx_udp = rx_pkt[UDP]
1431
1432         tx_ip = tx_pkt.getlayer(IPv6)
1433         # we know the packet has been tx'ed
1434         # with an inner IPv6 header and an SRH
1435         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1436         tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1437         tx_udp = tx_pkt[UDP]
1438
1439         # common checks, regardless of tx segleft value
1440         # rx'ed packet should have 2nd IPv6 header
1441         self.assertTrue(rx_ip.payload.haslayer(IPv6))
1442         # get second (inner) IPv6 header
1443         rx_ip2 = rx_pkt.getlayer(IPv6, 2)
1444
1445         if tx_ip.segleft > 0:
1446             # SRH should NOT have been popped:
1447             #   End SID without PSP does not pop SRH if segleft>0
1448             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1449             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1450
1451             # received ip.src should be equal to expected ip.src
1452             self.assertEqual(rx_ip.src, tx_ip.src)
1453             # sidlist should be unchanged
1454             self.assertEqual(rx_srh.addresses, tx_srh.addresses)
1455             # segleft should have been decremented
1456             self.assertEqual(rx_srh.segleft, tx_srh.segleft-1)
1457             # received ip.dst should be equal to sidlist[segleft]
1458             self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
1459             # lastentry should be unchanged
1460             self.assertEqual(rx_srh.lastentry, tx_srh.lastentry)
1461             # inner IPv6 packet (ip2) should be unchanged
1462             self.assertEqual(rx_ip2.src, tx_ip2.src)
1463             self.assertEqual(rx_ip2.dst, tx_ip2.dst)
1464         # else:  # tx_ip.segleft == 0
1465             # TODO: Does this work with 2 SRHs in ingress packet?
1466
1467         # UDP layer should be unchanged
1468         self.assertEqual(rx_udp, tx_udp)
1469
1470         self.logger.debug("packet verification: SUCCESS")
1471
1472     def compare_rx_tx_packet_End_PSP(self, tx_pkt, rx_pkt):
1473         """ Compare input and output packet after passing End with PSP
1474
1475         :param tx_pkt: transmitted packet
1476         :param rx_pkt: received packet
1477         """
1478         # End (PSP) updates the headers as follows:
1479         # IPv6 + SRH (SL>1):
1480         # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2)
1481         # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1482         # IPv6 + SRH (SL=1):
1483         # in: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1484         # out: IPv6(A, S3)
1485
1486         # get first (outer) IPv6 header of rx'ed packet
1487         rx_ip = rx_pkt.getlayer(IPv6)
1488         rx_srh = None
1489         rx_ip2 = None
1490         rx_udp = rx_pkt[UDP]
1491
1492         tx_ip = tx_pkt.getlayer(IPv6)
1493         # we know the packet has been tx'ed
1494         # with an inner IPv6 header and an SRH
1495         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1496         tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1497         tx_udp = tx_pkt[UDP]
1498
1499         # common checks, regardless of tx segleft value
1500         self.assertTrue(rx_ip.payload.haslayer(IPv6))
1501         rx_ip2 = rx_pkt.getlayer(IPv6, 2)
1502         # inner IPv6 packet (ip2) should be unchanged
1503         self.assertEqual(rx_ip2.src, tx_ip2.src)
1504         self.assertEqual(rx_ip2.dst, tx_ip2.dst)
1505
1506         if tx_ip.segleft > 1:
1507             # SRH should NOT have been popped:
1508             #   End SID with PSP does not pop SRH if segleft>1
1509             # rx'ed packet should have SRH
1510             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1511             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1512
1513             # received ip.src should be equal to expected ip.src
1514             self.assertEqual(rx_ip.src, tx_ip.src)
1515             # sidlist should be unchanged
1516             self.assertEqual(rx_srh.addresses, tx_srh.addresses)
1517             # segleft should have been decremented
1518             self.assertEqual(rx_srh.segleft, tx_srh.segleft-1)
1519             # received ip.dst should be equal to sidlist[segleft]
1520             self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
1521             # lastentry should be unchanged
1522             self.assertEqual(rx_srh.lastentry, tx_srh.lastentry)
1523
1524         else:  # tx_ip.segleft <= 1
1525             # SRH should have been popped:
1526             #   End SID with PSP and segleft=1 pops SRH
1527             # the two IPv6 headers are still present
1528             # outer IPv6 header has DA == last segment of popped SRH
1529             # SRH should not be present
1530             self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1531             # outer IPv6 header ip.src should be equal to tx'ed ip.src
1532             self.assertEqual(rx_ip.src, tx_ip.src)
1533             # outer IPv6 header ip.dst should be = to tx'ed sidlist[segleft-1]
1534             self.assertEqual(rx_ip.dst, tx_srh.addresses[tx_srh.segleft-1])
1535
1536         # UDP layer should be unchanged
1537         self.assertEqual(rx_udp, tx_udp)
1538
1539         self.logger.debug("packet verification: SUCCESS")
1540
1541     def compare_rx_tx_packet_End_DX6(self, tx_pkt, rx_pkt):
1542         """ Compare input and output packet after passing End.DX6
1543
1544         :param tx_pkt: transmitted packet
1545         :param rx_pkt: received packet
1546         """
1547         # End.DX6 updates the headers as follows:
1548         # IPv6 + SRH (SL=0):
1549         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv6(B, D)
1550         # out: IPv6(B, D)
1551         # IPv6:
1552         # in: IPv6(A, S3)IPv6(B, D)
1553         # out: IPv6(B, D)
1554
1555         # get first (outer) IPv6 header of rx'ed packet
1556         rx_ip = rx_pkt.getlayer(IPv6)
1557
1558         tx_ip = tx_pkt.getlayer(IPv6)
1559         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1560
1561         # verify if rx'ed packet has no SRH
1562         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1563
1564         # the whole rx_ip pkt should be equal to tx_ip2
1565         # except for the hlim field
1566         #   -> adjust tx'ed hlim to expected hlim
1567         tx_ip2.hlim = tx_ip2.hlim - 1
1568
1569         self.assertEqual(rx_ip, tx_ip2)
1570
1571         self.logger.debug("packet verification: SUCCESS")
1572
1573     def compare_rx_tx_packet_End_DX4(self, tx_pkt, rx_pkt):
1574         """ Compare input and output packet after passing End.DX4
1575
1576         :param tx_pkt: transmitted packet
1577         :param rx_pkt: received packet
1578         """
1579         # End.DX4 updates the headers as follows:
1580         # IPv6 + SRH (SL=0):
1581         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv4(B, D)
1582         # out: IPv4(B, D)
1583         # IPv6:
1584         # in: IPv6(A, S3)IPv4(B, D)
1585         # out: IPv4(B, D)
1586
1587         # get IPv4 header of rx'ed packet
1588         rx_ip = rx_pkt.getlayer(IP)
1589
1590         tx_ip = tx_pkt.getlayer(IPv6)
1591         tx_ip2 = tx_pkt.getlayer(IP)
1592
1593         # verify if rx'ed packet has no SRH
1594         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1595
1596         # the whole rx_ip pkt should be equal to tx_ip2
1597         # except for the ttl field and ip checksum
1598         #   -> adjust tx'ed ttl to expected ttl
1599         tx_ip2.ttl = tx_ip2.ttl - 1
1600         #   -> set tx'ed ip checksum to None and let scapy recompute
1601         tx_ip2.chksum = None
1602         # read back the pkt (with str()) to force computing these fields
1603         # probably other ways to accomplish this are possible
1604         tx_ip2 = IP(str(tx_ip2))
1605
1606         self.assertEqual(rx_ip, tx_ip2)
1607
1608         self.logger.debug("packet verification: SUCCESS")
1609
1610     def compare_rx_tx_packet_End_DX2(self, tx_pkt, rx_pkt):
1611         """ Compare input and output packet after passing End.DX2
1612
1613         :param tx_pkt: transmitted packet
1614         :param rx_pkt: received packet
1615         """
1616         # End.DX2 updates the headers as follows:
1617         # IPv6 + SRH (SL=0):
1618         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)L2
1619         # out: L2
1620         # IPv6:
1621         # in: IPv6(A, S3)L2
1622         # out: L2
1623
1624         # get IPv4 header of rx'ed packet
1625         rx_eth = rx_pkt.getlayer(Ether)
1626
1627         tx_ip = tx_pkt.getlayer(IPv6)
1628         # we can't just get the 2nd Ether layer
1629         # get the Raw content and dissect it as Ether
1630         tx_eth1 = Ether(str(tx_pkt[Raw]))
1631
1632         # verify if rx'ed packet has no SRH
1633         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1634
1635         # the whole rx_eth pkt should be equal to tx_eth1
1636         self.assertEqual(rx_eth, tx_eth1)
1637
1638         self.logger.debug("packet verification: SUCCESS")
1639
1640     def create_stream(self, src_if, dst_if, packet_header, packet_sizes,
1641                       count):
1642         """Create SRv6 input packet stream for defined interface.
1643
1644         :param VppInterface src_if: Interface to create packet stream for
1645         :param VppInterface dst_if: destination interface of packet stream
1646         :param packet_header: Layer3 scapy packet headers,
1647                 L2 is added when not provided,
1648                 Raw(payload) with packet_info is added
1649         :param list packet_sizes: packet stream pckt sizes,sequentially applied
1650                to packets in stream have
1651         :param int count: number of packets in packet stream
1652         :return: list of packets
1653         """
1654         self.logger.info("Creating packets")
1655         pkts = []
1656         for i in range(0, count-1):
1657             payload_info = self.create_packet_info(src_if, dst_if)
1658             self.logger.debug(
1659                 "Creating packet with index %d" % (payload_info.index))
1660             payload = self.info_to_payload(payload_info)
1661             # add L2 header if not yet provided in packet_header
1662             if packet_header.getlayer(0).name == 'Ethernet':
1663                 p = (packet_header /
1664                      Raw(payload))
1665             else:
1666                 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
1667                      packet_header /
1668                      Raw(payload))
1669             size = packet_sizes[i % len(packet_sizes)]
1670             self.logger.debug("Packet size %d" % (size))
1671             self.extend_packet(p, size)
1672             # we need to store the packet with the automatic fields computed
1673             # read back the dumped packet (with str())
1674             # to force computing these fields
1675             # probably other ways are possible
1676             p = Ether(str(p))
1677             payload_info.data = p.copy()
1678             self.logger.debug(ppp("Created packet:", p))
1679             pkts.append(p)
1680         self.logger.info("Done creating packets")
1681         return pkts
1682
1683     def send_and_verify_pkts(self, input, pkts, output, compare_func):
1684         """Send packets and verify received packets using compare_func
1685
1686         :param input: ingress interface of DUT
1687         :param pkts: list of packets to transmit
1688         :param output: egress interface of DUT
1689         :param compare_func: function to compare in and out packets
1690         """
1691         # add traffic stream to input interface
1692         input.add_stream(pkts)
1693
1694         # enable capture on all interfaces
1695         self.pg_enable_capture(self.pg_interfaces)
1696
1697         # start traffic
1698         self.logger.info("Starting traffic")
1699         self.pg_start()
1700
1701         # get output capture
1702         self.logger.info("Getting packet capture")
1703         capture = output.get_capture()
1704
1705         # assert nothing was captured on input interface
1706         input.assert_nothing_captured()
1707
1708         # verify captured packets
1709         self.verify_captured_pkts(output, capture, compare_func)
1710
1711     def create_packet_header_IPv6(self, dst):
1712         """Create packet header: IPv6 header, UDP header
1713
1714         :param dst: IPv6 destination address
1715
1716         IPv6 source address is 1234::1
1717         UDP source port and destination port are 1234
1718         """
1719
1720         p = (IPv6(src='1234::1', dst=dst) /
1721              UDP(sport=1234, dport=1234))
1722         return p
1723
1724     def create_packet_header_IPv6_SRH(self, sidlist, segleft):
1725         """Create packet header: IPv6 header with SRH, UDP header
1726
1727         :param list sidlist: segment list
1728         :param int segleft: segments-left field value
1729
1730         IPv6 destination address is set to sidlist[segleft]
1731         IPv6 source addresses are 1234::1 and 4321::1
1732         UDP source port and destination port are 1234
1733         """
1734
1735         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
1736              IPv6ExtHdrSegmentRouting(addresses=sidlist) /
1737              UDP(sport=1234, dport=1234))
1738         return p
1739
1740     def create_packet_header_IPv6_SRH_IPv6(self, dst, sidlist, segleft):
1741         """Create packet header: IPv6 encapsulated in SRv6:
1742         IPv6 header with SRH, IPv6 header, UDP header
1743
1744         :param ipv6address dst: inner IPv6 destination address
1745         :param list sidlist: segment list of outer IPv6 SRH
1746         :param int segleft: segments-left field of outer IPv6 SRH
1747
1748         Outer IPv6 destination address is set to sidlist[segleft]
1749         IPv6 source addresses are 1234::1 and 4321::1
1750         UDP source port and destination port are 1234
1751         """
1752
1753         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
1754              IPv6ExtHdrSegmentRouting(addresses=sidlist,
1755                                       segleft=segleft, nh=41) /
1756              IPv6(src='4321::1', dst=dst) /
1757              UDP(sport=1234, dport=1234))
1758         return p
1759
1760     def create_packet_header_IPv6_IPv6(self, dst_inner, dst_outer):
1761         """Create packet header: IPv6 encapsulated in IPv6:
1762         IPv6 header, IPv6 header, UDP header
1763
1764         :param ipv6address dst_inner: inner IPv6 destination address
1765         :param ipv6address dst_outer: outer IPv6 destination address
1766
1767         IPv6 source addresses are 1234::1 and 4321::1
1768         UDP source port and destination port are 1234
1769         """
1770
1771         p = (IPv6(src='1234::1', dst=dst_outer) /
1772              IPv6(src='4321::1', dst=dst_inner) /
1773              UDP(sport=1234, dport=1234))
1774         return p
1775
1776     def create_packet_header_IPv6_SRH_SRH_IPv6(self, dst, sidlist1, segleft1,
1777                                                sidlist2, segleft2):
1778         """Create packet header: IPv6 encapsulated in SRv6 with 2 SRH:
1779         IPv6 header with SRH, 2nd SRH, IPv6 header, UDP header
1780
1781         :param ipv6address dst: inner IPv6 destination address
1782         :param list sidlist1: segment list of outer IPv6 SRH
1783         :param int segleft1: segments-left field of outer IPv6 SRH
1784         :param list sidlist2: segment list of inner IPv6 SRH
1785         :param int segleft2: segments-left field of inner IPv6 SRH
1786
1787         Outer IPv6 destination address is set to sidlist[segleft]
1788         IPv6 source addresses are 1234::1 and 4321::1
1789         UDP source port and destination port are 1234
1790         """
1791
1792         p = (IPv6(src='1234::1', dst=sidlist1[segleft1]) /
1793              IPv6ExtHdrSegmentRouting(addresses=sidlist1,
1794                                       segleft=segleft1, nh=43) /
1795              IPv6ExtHdrSegmentRouting(addresses=sidlist2,
1796                                       segleft=segleft2, nh=41) /
1797              IPv6(src='4321::1', dst=dst) /
1798              UDP(sport=1234, dport=1234))
1799         return p
1800
1801     def create_packet_header_IPv4(self, dst):
1802         """Create packet header: IPv4 header, UDP header
1803
1804         :param dst: IPv4 destination address
1805
1806         IPv4 source address is 123.1.1.1
1807         UDP source port and destination port are 1234
1808         """
1809
1810         p = (IP(src='123.1.1.1', dst=dst) /
1811              UDP(sport=1234, dport=1234))
1812         return p
1813
1814     def create_packet_header_IPv6_IPv4(self, dst_inner, dst_outer):
1815         """Create packet header: IPv4 encapsulated in IPv6:
1816         IPv6 header, IPv4 header, UDP header
1817
1818         :param ipv4address dst_inner: inner IPv4 destination address
1819         :param ipv6address dst_outer: outer IPv6 destination address
1820
1821         IPv6 source address is 1234::1
1822         IPv4 source address is 123.1.1.1
1823         UDP source port and destination port are 1234
1824         """
1825
1826         p = (IPv6(src='1234::1', dst=dst_outer) /
1827              IP(src='123.1.1.1', dst=dst_inner) /
1828              UDP(sport=1234, dport=1234))
1829         return p
1830
1831     def create_packet_header_IPv6_SRH_IPv4(self, dst, sidlist, segleft):
1832         """Create packet header: IPv4 encapsulated in SRv6:
1833         IPv6 header with SRH, IPv4 header, UDP header
1834
1835         :param ipv4address dst: inner IPv4 destination address
1836         :param list sidlist: segment list of outer IPv6 SRH
1837         :param int segleft: segments-left field of outer IPv6 SRH
1838
1839         Outer IPv6 destination address is set to sidlist[segleft]
1840         IPv6 source address is 1234::1
1841         IPv4 source address is 123.1.1.1
1842         UDP source port and destination port are 1234
1843         """
1844
1845         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
1846              IPv6ExtHdrSegmentRouting(addresses=sidlist,
1847                                       segleft=segleft, nh=4) /
1848              IP(src='123.1.1.1', dst=dst) /
1849              UDP(sport=1234, dport=1234))
1850         return p
1851
1852     def create_packet_header_L2(self, vlan=0):
1853         """Create packet header: L2 header
1854
1855         :param vlan: if vlan!=0 then add 802.1q header
1856         """
1857         # Note: the dst addr ('00:55:44:33:22:11') is used in
1858         # the compare function compare_rx_tx_packet_T_Encaps_L2
1859         # to detect presence of L2 in SRH payload
1860         p = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
1861         etype = 0x8137  # IPX
1862         if vlan:
1863             # add 802.1q layer
1864             p /= Dot1Q(vlan=vlan, type=etype)
1865         else:
1866             p.type = etype
1867         return p
1868
1869     def create_packet_header_IPv6_SRH_L2(self, sidlist, segleft, vlan=0):
1870         """Create packet header: L2 encapsulated in SRv6:
1871         IPv6 header with SRH, L2
1872
1873         :param list sidlist: segment list of outer IPv6 SRH
1874         :param int segleft: segments-left field of outer IPv6 SRH
1875         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
1876
1877         Outer IPv6 destination address is set to sidlist[segleft]
1878         IPv6 source address is 1234::1
1879         """
1880         eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
1881         etype = 0x8137  # IPX
1882         if vlan:
1883             # add 802.1q layer
1884             eth /= Dot1Q(vlan=vlan, type=etype)
1885         else:
1886             eth.type = etype
1887
1888         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
1889              IPv6ExtHdrSegmentRouting(addresses=sidlist,
1890                                       segleft=segleft, nh=59) /
1891              eth)
1892         return p
1893
1894     def create_packet_header_IPv6_L2(self, dst_outer, vlan=0):
1895         """Create packet header: L2 encapsulated in IPv6:
1896         IPv6 header, L2
1897
1898         :param ipv6address dst_outer: outer IPv6 destination address
1899         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
1900         """
1901         eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
1902         etype = 0x8137  # IPX
1903         if vlan:
1904             # add 802.1q layer
1905             eth /= Dot1Q(vlan=vlan, type=etype)
1906         else:
1907             eth.type = etype
1908
1909         p = (IPv6(src='1234::1', dst=dst_outer, nh=59) / eth)
1910         return p
1911
1912     def get_payload_info(self, packet):
1913         """ Extract the payload_info from the packet
1914         """
1915         # in most cases, payload_info is in packet[Raw]
1916         # but packet[Raw] gives the complete payload
1917         # (incl L2 header) for the T.Encaps L2 case
1918         try:
1919             payload_info = self.payload_to_info(str(packet[Raw]))
1920
1921         except:
1922             # remote L2 header from packet[Raw]:
1923             # take packet[Raw], convert it to an Ether layer
1924             # and then extract Raw from it
1925             payload_info = self.payload_to_info(
1926                 str(Ether(str(packet[Raw]))[Raw]))
1927
1928         return payload_info
1929
1930     def verify_captured_pkts(self, dst_if, capture, compare_func):
1931         """
1932         Verify captured packet stream for specified interface.
1933         Compare ingress with egress packets using the specified compare fn
1934
1935         :param dst_if: egress interface of DUT
1936         :param capture: captured packets
1937         :param compare_func: function to compare in and out packet
1938         """
1939         self.logger.info("Verifying capture on interface %s using function %s"
1940                          % (dst_if.name, compare_func.func_name))
1941
1942         last_info = dict()
1943         for i in self.pg_interfaces:
1944             last_info[i.sw_if_index] = None
1945         dst_sw_if_index = dst_if.sw_if_index
1946
1947         for packet in capture:
1948             try:
1949                 # extract payload_info from packet's payload
1950                 payload_info = self.get_payload_info(packet)
1951                 packet_index = payload_info.index
1952
1953                 self.logger.debug("Verifying packet with index %d"
1954                                   % (packet_index))
1955                 # packet should have arrived on the expected interface
1956                 self.assertEqual(payload_info.dst, dst_sw_if_index)
1957                 self.logger.debug(
1958                     "Got packet on interface %s: src=%u (idx=%u)" %
1959                     (dst_if.name, payload_info.src, packet_index))
1960
1961                 # search for payload_info with same src and dst if_index
1962                 # this will give us the transmitted packet
1963                 next_info = self.get_next_packet_info_for_interface2(
1964                     payload_info.src, dst_sw_if_index,
1965                     last_info[payload_info.src])
1966                 last_info[payload_info.src] = next_info
1967                 # next_info should not be None
1968                 self.assertTrue(next_info is not None)
1969                 # index of tx and rx packets should be equal
1970                 self.assertEqual(packet_index, next_info.index)
1971                 # data field of next_info contains the tx packet
1972                 txed_packet = next_info.data
1973
1974                 self.logger.debug(ppp("Transmitted packet:",
1975                                       txed_packet))  # ppp=Pretty Print Packet
1976
1977                 self.logger.debug(ppp("Received packet:", packet))
1978
1979                 # compare rcvd packet with expected packet using compare_func
1980                 compare_func(txed_packet, packet)
1981
1982             except:
1983                 print packet.command()
1984                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1985                 raise
1986
1987         # have all expected packets arrived?
1988         for i in self.pg_interfaces:
1989             remaining_packet = self.get_next_packet_info_for_interface2(
1990                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
1991             self.assertTrue(remaining_packet is None,
1992                             "Interface %s: Packet expected from interface %s "
1993                             "didn't arrive" % (dst_if.name, i.name))
1994
1995
# When executed as a script (not imported), run the tests of this module
# with the VPP test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)