Enforce FIB table creation before use
[vpp.git] / test / test_srv6.py
1 #!/usr/bin/env python
2
3 import unittest
4 from socket import AF_INET6
5
6 from framework import VppTestCase, VppTestRunner
7 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto, VppIpTable
8 from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
9     SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
10
11 from scapy.packet import Raw
12 from scapy.layers.l2 import Ether, Dot1Q
13 from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
14 from scapy.layers.inet import IP, UDP
15
16 from scapy.utils import inet_pton, inet_ntop
17
18 from util import ppp
19
20
21 class TestSRv6(VppTestCase):
22     """ SRv6 Test Case """
23
24     @classmethod
25     def setUpClass(self):
26         super(TestSRv6, self).setUpClass()
27
28     def setUp(self):
29         """ Perform test setup before each test case.
30         """
31         super(TestSRv6, self).setUp()
32
33         # packet sizes, inclusive L2 overhead
34         self.pg_packet_sizes = [64, 512, 1518, 9018]
35
36         # reset packet_infos
37         self.reset_packet_infos()
38
39     def tearDown(self):
40         """ Clean up test setup after each test case.
41         """
42         self.teardown_interfaces()
43
44         super(TestSRv6, self).tearDown()
45
46     def configure_interface(self,
47                             interface,
48                             ipv6=False, ipv4=False,
49                             ipv6_table_id=0, ipv4_table_id=0):
50         """ Configure interface.
51         :param ipv6: configure IPv6 on interface
52         :param ipv4: configure IPv4 on interface
53         :param ipv6_table_id: FIB table_id for IPv6
54         :param ipv4_table_id: FIB table_id for IPv4
55         """
56         self.logger.debug("Configuring interface %s" % (interface.name))
57         if ipv6:
58             self.logger.debug("Configuring IPv6")
59             interface.set_table_ip6(ipv6_table_id)
60             interface.config_ip6()
61             interface.resolve_ndp(timeout=5)
62         if ipv4:
63             self.logger.debug("Configuring IPv4")
64             interface.set_table_ip4(ipv4_table_id)
65             interface.config_ip4()
66             interface.resolve_arp()
67         interface.admin_up()
68
69     def setup_interfaces(self, ipv6=[], ipv4=[],
70                          ipv6_table_id=[], ipv4_table_id=[]):
71         """ Create and configure interfaces.
72
73         :param ipv6: list of interface IPv6 capabilities
74         :param ipv4: list of interface IPv4 capabilities
75         :param ipv6_table_id: list of intf IPv6 FIB table_ids
76         :param ipv4_table_id: list of intf IPv4 FIB table_ids
77         :returns: List of created interfaces.
78         """
79         # how many interfaces?
80         if len(ipv6):
81             count = len(ipv6)
82         else:
83             count = len(ipv4)
84         self.logger.debug("Creating and configuring %d interfaces" % (count))
85
86         # fill up ipv6 and ipv4 lists if needed
87         # not enabled (False) is the default
88         if len(ipv6) < count:
89             ipv6 += (count - len(ipv6)) * [False]
90         if len(ipv4) < count:
91             ipv4 += (count - len(ipv4)) * [False]
92
93         # fill up table_id lists if needed
94         # table_id 0 (global) is the default
95         if len(ipv6_table_id) < count:
96             ipv6_table_id += (count - len(ipv6_table_id)) * [0]
97         if len(ipv4_table_id) < count:
98             ipv4_table_id += (count - len(ipv4_table_id)) * [0]
99
100         # create 'count' pg interfaces
101         self.create_pg_interfaces(range(count))
102
103         # create all tables
104         self.tables = []
105         ids = sorted(set(ipv4_table_id))
106         for i in range(len(ids)):
107             if 0 != ids[i]:
108                 self.tables.append(VppIpTable(self, ids[i]))
109         ids = sorted(set(ipv6_table_id))
110         for i in range(len(ids)):
111             if 0 != ids[i]:
112                 self.tables.append(VppIpTable(self, ids[i], is_ip6=1))
113         for t in self.tables:
114             t.add_vpp_config()
115
116         # setup all interfaces
117         for i in range(count):
118             intf = self.pg_interfaces[i]
119             self.configure_interface(intf,
120                                      ipv6[i], ipv4[i],
121                                      ipv6_table_id[i], ipv4_table_id[i])
122
123         if any(ipv6):
124             self.logger.debug(self.vapi.cli("show ip6 neighbors"))
125         if any(ipv4):
126             self.logger.debug(self.vapi.cli("show ip arp"))
127         self.logger.debug(self.vapi.cli("show interface"))
128         self.logger.debug(self.vapi.cli("show hardware"))
129
130         return self.pg_interfaces
131
132     def teardown_interfaces(self):
133         """ Unconfigure and bring down interface.
134         """
135         self.logger.debug("Tearing down interfaces")
136         # tear down all interfaces
137         # AFAIK they cannot be deleted
138         for i in self.pg_interfaces:
139             self.logger.debug("Tear down interface %s" % (i.name))
140             i.unconfig()
141             i.set_table_ip4(0)
142             i.set_table_ip6(0)
143             i.admin_down()
144
145     def test_SRv6_T_Encaps(self):
146         """ Test SRv6 Transit.Encaps behavior for IPv6.
147         """
148         # send traffic to one destination interface
149         # source and destination are IPv6 only
150         self.setup_interfaces(ipv6=[True, True])
151
152         # configure FIB entries
153         route = VppIpRoute(self, "a4::", 64,
154                            [VppRoutePath(self.pg1.remote_ip6,
155                                          self.pg1.sw_if_index,
156                                          proto=DpoProto.DPO_PROTO_IP6)],
157                            is_ip6=1)
158         route.add_vpp_config()
159
160         # configure encaps IPv6 source address
161         # needs to be done before SR Policy config
162         # TODO: API?
163         self.vapi.cli("set sr encaps source addr a3::")
164
165         bsid = 'a3::9999:1'
166         # configure SRv6 Policy
167         # Note: segment list order: first -> last
168         sr_policy = VppSRv6Policy(
169             self, bsid=bsid,
170             is_encap=1,
171             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
172             weight=1, fib_table=0,
173             segments=['a4::', 'a5::', 'a6::c7'],
174             source='a3::')
175         sr_policy.add_vpp_config()
176         self.sr_policy = sr_policy
177
178         # log the sr policies
179         self.logger.info(self.vapi.cli("show sr policies"))
180
181         # steer IPv6 traffic to a7::/64 into SRv6 Policy
182         # use the bsid of the above self.sr_policy
183         pol_steering = VppSRv6Steering(
184                         self,
185                         bsid=self.sr_policy.bsid,
186                         prefix="a7::", mask_width=64,
187                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
188                         sr_policy_index=0, table_id=0,
189                         sw_if_index=0)
190         pol_steering.add_vpp_config()
191
192         # log the sr steering policies
193         self.logger.info(self.vapi.cli("show sr steering policies"))
194
195         # create packets
196         count = len(self.pg_packet_sizes)
197         dst_inner = 'a7::1234'
198         pkts = []
199
200         # create IPv6 packets without SRH
201         packet_header = self.create_packet_header_IPv6(dst_inner)
202         # create traffic stream pg0->pg1
203         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
204                                        self.pg_packet_sizes, count))
205
206         # create IPv6 packets with SRH
207         # packets with segments-left 1, active segment a7::
208         packet_header = self.create_packet_header_IPv6_SRH(
209             sidlist=['a8::', 'a7::', 'a6::'],
210             segleft=1)
211         # create traffic stream pg0->pg1
212         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
213                                        self.pg_packet_sizes, count))
214
215         # create IPv6 packets with SRH and IPv6
216         # packets with segments-left 1, active segment a7::
217         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
218             dst_inner,
219             sidlist=['a8::', 'a7::', 'a6::'],
220             segleft=1)
221         # create traffic stream pg0->pg1
222         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
223                                        self.pg_packet_sizes, count))
224
225         # send packets and verify received packets
226         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
227                                   self.compare_rx_tx_packet_T_Encaps)
228
229         # log the localsid counters
230         self.logger.info(self.vapi.cli("show sr localsid"))
231
232         # remove SR steering
233         pol_steering.remove_vpp_config()
234         self.logger.info(self.vapi.cli("show sr steering policies"))
235
236         # remove SR Policies
237         self.sr_policy.remove_vpp_config()
238         self.logger.info(self.vapi.cli("show sr policies"))
239
240         # remove FIB entries
241         # done by tearDown
242
243         # cleanup interfaces
244         self.teardown_interfaces()
245
246     def test_SRv6_T_Insert(self):
247         """ Test SRv6 Transit.Insert behavior (IPv6 only).
248         """
249         # send traffic to one destination interface
250         # source and destination are IPv6 only
251         self.setup_interfaces(ipv6=[True, True])
252
253         # configure FIB entries
254         route = VppIpRoute(self, "a4::", 64,
255                            [VppRoutePath(self.pg1.remote_ip6,
256                                          self.pg1.sw_if_index,
257                                          proto=DpoProto.DPO_PROTO_IP6)],
258                            is_ip6=1)
259         route.add_vpp_config()
260
261         # configure encaps IPv6 source address
262         # needs to be done before SR Policy config
263         # TODO: API?
264         self.vapi.cli("set sr encaps source addr a3::")
265
266         bsid = 'a3::9999:1'
267         # configure SRv6 Policy
268         # Note: segment list order: first -> last
269         sr_policy = VppSRv6Policy(
270             self, bsid=bsid,
271             is_encap=0,
272             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
273             weight=1, fib_table=0,
274             segments=['a4::', 'a5::', 'a6::c7'],
275             source='a3::')
276         sr_policy.add_vpp_config()
277         self.sr_policy = sr_policy
278
279         # log the sr policies
280         self.logger.info(self.vapi.cli("show sr policies"))
281
282         # steer IPv6 traffic to a7::/64 into SRv6 Policy
283         # use the bsid of the above self.sr_policy
284         pol_steering = VppSRv6Steering(
285                         self,
286                         bsid=self.sr_policy.bsid,
287                         prefix="a7::", mask_width=64,
288                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV6,
289                         sr_policy_index=0, table_id=0,
290                         sw_if_index=0)
291         pol_steering.add_vpp_config()
292
293         # log the sr steering policies
294         self.logger.info(self.vapi.cli("show sr steering policies"))
295
296         # create packets
297         count = len(self.pg_packet_sizes)
298         dst_inner = 'a7::1234'
299         pkts = []
300
301         # create IPv6 packets without SRH
302         packet_header = self.create_packet_header_IPv6(dst_inner)
303         # create traffic stream pg0->pg1
304         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
305                                        self.pg_packet_sizes, count))
306
307         # create IPv6 packets with SRH
308         # packets with segments-left 1, active segment a7::
309         packet_header = self.create_packet_header_IPv6_SRH(
310             sidlist=['a8::', 'a7::', 'a6::'],
311             segleft=1)
312         # create traffic stream pg0->pg1
313         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
314                                        self.pg_packet_sizes, count))
315
316         # send packets and verify received packets
317         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
318                                   self.compare_rx_tx_packet_T_Insert)
319
320         # log the localsid counters
321         self.logger.info(self.vapi.cli("show sr localsid"))
322
323         # remove SR steering
324         pol_steering.remove_vpp_config()
325         self.logger.info(self.vapi.cli("show sr steering policies"))
326
327         # remove SR Policies
328         self.sr_policy.remove_vpp_config()
329         self.logger.info(self.vapi.cli("show sr policies"))
330
331         # remove FIB entries
332         # done by tearDown
333
334         # cleanup interfaces
335         self.teardown_interfaces()
336
337     def test_SRv6_T_Encaps_IPv4(self):
338         """ Test SRv6 Transit.Encaps behavior for IPv4.
339         """
340         # send traffic to one destination interface
341         # source interface is IPv4 only
342         # destination interface is IPv6 only
343         self.setup_interfaces(ipv6=[False, True], ipv4=[True, False])
344
345         # configure FIB entries
346         route = VppIpRoute(self, "a4::", 64,
347                            [VppRoutePath(self.pg1.remote_ip6,
348                                          self.pg1.sw_if_index,
349                                          proto=DpoProto.DPO_PROTO_IP6)],
350                            is_ip6=1)
351         route.add_vpp_config()
352
353         # configure encaps IPv6 source address
354         # needs to be done before SR Policy config
355         # TODO: API?
356         self.vapi.cli("set sr encaps source addr a3::")
357
358         bsid = 'a3::9999:1'
359         # configure SRv6 Policy
360         # Note: segment list order: first -> last
361         sr_policy = VppSRv6Policy(
362             self, bsid=bsid,
363             is_encap=1,
364             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
365             weight=1, fib_table=0,
366             segments=['a4::', 'a5::', 'a6::c7'],
367             source='a3::')
368         sr_policy.add_vpp_config()
369         self.sr_policy = sr_policy
370
371         # log the sr policies
372         self.logger.info(self.vapi.cli("show sr policies"))
373
374         # steer IPv4 traffic to 7.1.1.0/24 into SRv6 Policy
375         # use the bsid of the above self.sr_policy
376         pol_steering = VppSRv6Steering(
377                         self,
378                         bsid=self.sr_policy.bsid,
379                         prefix="7.1.1.0", mask_width=24,
380                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_IPV4,
381                         sr_policy_index=0, table_id=0,
382                         sw_if_index=0)
383         pol_steering.add_vpp_config()
384
385         # log the sr steering policies
386         self.logger.info(self.vapi.cli("show sr steering policies"))
387
388         # create packets
389         count = len(self.pg_packet_sizes)
390         dst_inner = '7.1.1.123'
391         pkts = []
392
393         # create IPv4 packets
394         packet_header = self.create_packet_header_IPv4(dst_inner)
395         # create traffic stream pg0->pg1
396         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
397                                        self.pg_packet_sizes, count))
398
399         # send packets and verify received packets
400         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
401                                   self.compare_rx_tx_packet_T_Encaps_IPv4)
402
403         # log the localsid counters
404         self.logger.info(self.vapi.cli("show sr localsid"))
405
406         # remove SR steering
407         pol_steering.remove_vpp_config()
408         self.logger.info(self.vapi.cli("show sr steering policies"))
409
410         # remove SR Policies
411         self.sr_policy.remove_vpp_config()
412         self.logger.info(self.vapi.cli("show sr policies"))
413
414         # remove FIB entries
415         # done by tearDown
416
417         # cleanup interfaces
418         self.teardown_interfaces()
419
420     @unittest.skip("VPP crashes after running this test")
421     def test_SRv6_T_Encaps_L2(self):
422         """ Test SRv6 Transit.Encaps behavior for L2.
423         """
424         # send traffic to one destination interface
425         # source interface is IPv4 only TODO?
426         # destination interface is IPv6 only
427         self.setup_interfaces(ipv6=[False, True], ipv4=[False, False])
428
429         # configure FIB entries
430         route = VppIpRoute(self, "a4::", 64,
431                            [VppRoutePath(self.pg1.remote_ip6,
432                                          self.pg1.sw_if_index,
433                                          proto=DpoProto.DPO_PROTO_IP6)],
434                            is_ip6=1)
435         route.add_vpp_config()
436
437         # configure encaps IPv6 source address
438         # needs to be done before SR Policy config
439         # TODO: API?
440         self.vapi.cli("set sr encaps source addr a3::")
441
442         bsid = 'a3::9999:1'
443         # configure SRv6 Policy
444         # Note: segment list order: first -> last
445         sr_policy = VppSRv6Policy(
446             self, bsid=bsid,
447             is_encap=1,
448             sr_type=SRv6PolicyType.SR_POLICY_TYPE_DEFAULT,
449             weight=1, fib_table=0,
450             segments=['a4::', 'a5::', 'a6::c7'],
451             source='a3::')
452         sr_policy.add_vpp_config()
453         self.sr_policy = sr_policy
454
455         # log the sr policies
456         self.logger.info(self.vapi.cli("show sr policies"))
457
458         # steer L2 traffic into SRv6 Policy
459         # use the bsid of the above self.sr_policy
460         pol_steering = VppSRv6Steering(
461                         self,
462                         bsid=self.sr_policy.bsid,
463                         prefix="::", mask_width=0,
464                         traffic_type=SRv6PolicySteeringTypes.SR_STEER_L2,
465                         sr_policy_index=0, table_id=0,
466                         sw_if_index=self.pg0.sw_if_index)
467         pol_steering.add_vpp_config()
468
469         # log the sr steering policies
470         self.logger.info(self.vapi.cli("show sr steering policies"))
471
472         # create packets
473         count = len(self.pg_packet_sizes)
474         pkts = []
475
476         # create L2 packets without dot1q header
477         packet_header = self.create_packet_header_L2()
478         # create traffic stream pg0->pg1
479         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
480                                        self.pg_packet_sizes, count))
481
482         # create L2 packets with dot1q header
483         packet_header = self.create_packet_header_L2(vlan=123)
484         # create traffic stream pg0->pg1
485         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
486                                        self.pg_packet_sizes, count))
487
488         # send packets and verify received packets
489         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
490                                   self.compare_rx_tx_packet_T_Encaps_L2)
491
492         # log the localsid counters
493         self.logger.info(self.vapi.cli("show sr localsid"))
494
495         # remove SR steering
496         pol_steering.remove_vpp_config()
497         self.logger.info(self.vapi.cli("show sr steering policies"))
498
499         # remove SR Policies
500         self.sr_policy.remove_vpp_config()
501         self.logger.info(self.vapi.cli("show sr policies"))
502
503         # remove FIB entries
504         # done by tearDown
505
506         # cleanup interfaces
507         self.teardown_interfaces()
508
509     def test_SRv6_End(self):
510         """ Test SRv6 End (without PSP) behavior.
511         """
512         # send traffic to one destination interface
513         # source and destination interfaces are IPv6 only
514         self.setup_interfaces(ipv6=[True, True])
515
516         # configure FIB entries
517         route = VppIpRoute(self, "a4::", 64,
518                            [VppRoutePath(self.pg1.remote_ip6,
519                                          self.pg1.sw_if_index,
520                                          proto=DpoProto.DPO_PROTO_IP6)],
521                            is_ip6=1)
522         route.add_vpp_config()
523
524         # configure SRv6 localSID End without PSP behavior
525         localsid = VppSRv6LocalSID(
526                         self, localsid_addr='A3::0',
527                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
528                         nh_addr='::',
529                         end_psp=0,
530                         sw_if_index=0,
531                         vlan_index=0,
532                         fib_table=0)
533         localsid.add_vpp_config()
534         # log the localsids
535         self.logger.debug(self.vapi.cli("show sr localsid"))
536
537         # create IPv6 packets with SRH (SL=2, SL=1, SL=0)
538         # send one packet per SL value per packet size
539         # SL=0 packet with localSID End with USP needs 2nd SRH
540         count = len(self.pg_packet_sizes)
541         dst_inner = 'a4::1234'
542         pkts = []
543
544         # packets with segments-left 2, active segment a3::
545         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
546                 dst_inner,
547                 sidlist=['a5::', 'a4::', 'a3::'],
548                 segleft=2)
549         # create traffic stream pg0->pg1
550         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
551                                        self.pg_packet_sizes, count))
552
553         # packets with segments-left 1, active segment a3::
554         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
555                 dst_inner,
556                 sidlist=['a4::', 'a3::', 'a2::'],
557                 segleft=1)
558         # add to traffic stream pg0->pg1
559         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
560                                        self.pg_packet_sizes, count))
561
562         # TODO: test behavior with SL=0 packet (needs 2*SRH?)
563
564         # send packets and verify received packets
565         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
566                                   self.compare_rx_tx_packet_End)
567
568         # log the localsid counters
569         self.logger.info(self.vapi.cli("show sr localsid"))
570
571         # remove SRv6 localSIDs
572         localsid.remove_vpp_config()
573
574         # remove FIB entries
575         # done by tearDown
576
577         # cleanup interfaces
578         self.teardown_interfaces()
579
580     def test_SRv6_End_with_PSP(self):
581         """ Test SRv6 End with PSP behavior.
582         """
583         # send traffic to one destination interface
584         # source and destination interfaces are IPv6 only
585         self.setup_interfaces(ipv6=[True, True])
586
587         # configure FIB entries
588         route = VppIpRoute(self, "a4::", 64,
589                            [VppRoutePath(self.pg1.remote_ip6,
590                                          self.pg1.sw_if_index,
591                                          proto=DpoProto.DPO_PROTO_IP6)],
592                            is_ip6=1)
593         route.add_vpp_config()
594
595         # configure SRv6 localSID End with PSP behavior
596         localsid = VppSRv6LocalSID(
597                         self, localsid_addr='A3::0',
598                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_END,
599                         nh_addr='::',
600                         end_psp=1,
601                         sw_if_index=0,
602                         vlan_index=0,
603                         fib_table=0)
604         localsid.add_vpp_config()
605         # log the localsids
606         self.logger.debug(self.vapi.cli("show sr localsid"))
607
608         # create IPv6 packets with SRH (SL=2, SL=1)
609         # send one packet per SL value per packet size
610         # SL=0 packet with localSID End with PSP is dropped
611         count = len(self.pg_packet_sizes)
612         dst_inner = 'a4::1234'
613         pkts = []
614
615         # packets with segments-left 2, active segment a3::
616         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
617                     dst_inner,
618                     sidlist=['a5::', 'a4::', 'a3::'],
619                     segleft=2)
620         # create traffic stream pg0->pg1
621         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
622                                        self.pg_packet_sizes, count))
623
624         # packets with segments-left 1, active segment a3::
625         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
626                     dst_inner,
627                     sidlist=['a4::', 'a3::', 'a2::'],
628                     segleft=1)
629         # add to traffic stream pg0->pg1
630         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
631                                        self.pg_packet_sizes, count))
632
633         # send packets and verify received packets
634         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
635                                   self.compare_rx_tx_packet_End_PSP)
636
637         # log the localsid counters
638         self.logger.info(self.vapi.cli("show sr localsid"))
639
640         # remove SRv6 localSIDs
641         localsid.remove_vpp_config()
642
643         # remove FIB entries
644         # done by tearDown
645
646         # cleanup interfaces
647         self.teardown_interfaces()
648
649     def test_SRv6_End_X(self):
650         """ Test SRv6 End.X (without PSP) behavior.
651         """
652         # create three interfaces (1 source, 2 destinations)
653         # source and destination interfaces are IPv6 only
654         self.setup_interfaces(ipv6=[True, True, True])
655
656         # configure FIB entries
657         # a4::/64 via pg1 and pg2
658         route = VppIpRoute(self, "a4::", 64,
659                            [VppRoutePath(self.pg1.remote_ip6,
660                                          self.pg1.sw_if_index,
661                                          proto=DpoProto.DPO_PROTO_IP6),
662                             VppRoutePath(self.pg2.remote_ip6,
663                                          self.pg2.sw_if_index,
664                                          proto=DpoProto.DPO_PROTO_IP6)],
665                            is_ip6=1)
666         route.add_vpp_config()
667         self.logger.debug(self.vapi.cli("show ip6 fib"))
668
669         # configure SRv6 localSID End.X without PSP behavior
670         # End.X points to interface pg1
671         localsid = VppSRv6LocalSID(
672                         self, localsid_addr='A3::C4',
673                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
674                         nh_addr=self.pg1.remote_ip6,
675                         end_psp=0,
676                         sw_if_index=self.pg1.sw_if_index,
677                         vlan_index=0,
678                         fib_table=0)
679         localsid.add_vpp_config()
680         # log the localsids
681         self.logger.debug(self.vapi.cli("show sr localsid"))
682
683         # create IPv6 packets with SRH (SL=2, SL=1)
684         # send one packet per SL value per packet size
685         # SL=0 packet with localSID End with PSP is dropped
686         count = len(self.pg_packet_sizes)
687         dst_inner = 'a4::1234'
688         pkts = []
689
690         # packets with segments-left 2, active segment a3::c4
691         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
692                     dst_inner,
693                     sidlist=['a5::', 'a4::', 'a3::c4'],
694                     segleft=2)
695         # create traffic stream pg0->pg1
696         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
697                                        self.pg_packet_sizes, count))
698
699         # packets with segments-left 1, active segment a3::c4
700         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
701                     dst_inner,
702                     sidlist=['a4::', 'a3::c4', 'a2::'],
703                     segleft=1)
704         # add to traffic stream pg0->pg1
705         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
706                                        self.pg_packet_sizes, count))
707
708         # send packets and verify received packets
709         # using same comparison function as End (no PSP)
710         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
711                                   self.compare_rx_tx_packet_End)
712
713         # assert nothing was received on the other interface (pg2)
714         self.pg2.assert_nothing_captured("mis-directed packet(s)")
715
716         # log the localsid counters
717         self.logger.info(self.vapi.cli("show sr localsid"))
718
719         # remove SRv6 localSIDs
720         localsid.remove_vpp_config()
721
722         # remove FIB entries
723         # done by tearDown
724
725         # cleanup interfaces
726         self.teardown_interfaces()
727
728     def test_SRv6_End_X_with_PSP(self):
729         """ Test SRv6 End.X with PSP behavior.
730         """
731         # create three interfaces (1 source, 2 destinations)
732         # source and destination interfaces are IPv6 only
733         self.setup_interfaces(ipv6=[True, True, True])
734
735         # configure FIB entries
736         # a4::/64 via pg1 and pg2
737         route = VppIpRoute(self, "a4::", 64,
738                            [VppRoutePath(self.pg1.remote_ip6,
739                                          self.pg1.sw_if_index,
740                                          proto=DpoProto.DPO_PROTO_IP6),
741                             VppRoutePath(self.pg2.remote_ip6,
742                                          self.pg2.sw_if_index,
743                                          proto=DpoProto.DPO_PROTO_IP6)],
744                            is_ip6=1)
745         route.add_vpp_config()
746
747         # configure SRv6 localSID End with PSP behavior
748         localsid = VppSRv6LocalSID(
749                         self, localsid_addr='A3::C4',
750                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_X,
751                         nh_addr=self.pg1.remote_ip6,
752                         end_psp=1,
753                         sw_if_index=self.pg1.sw_if_index,
754                         vlan_index=0,
755                         fib_table=0)
756         localsid.add_vpp_config()
757         # log the localsids
758         self.logger.debug(self.vapi.cli("show sr localsid"))
759
760         # create IPv6 packets with SRH (SL=2, SL=1)
761         # send one packet per SL value per packet size
762         # SL=0 packet with localSID End with PSP is dropped
763         count = len(self.pg_packet_sizes)
764         dst_inner = 'a4::1234'
765         pkts = []
766
767         # packets with segments-left 2, active segment a3::
768         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
769                     dst_inner,
770                     sidlist=['a5::', 'a4::', 'a3::c4'],
771                     segleft=2)
772         # create traffic stream pg0->pg1
773         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
774                                        self.pg_packet_sizes, count))
775
776         # packets with segments-left 1, active segment a3::
777         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
778                     dst_inner,
779                     sidlist=['a4::', 'a3::c4', 'a2::'],
780                     segleft=1)
781         # add to traffic stream pg0->pg1
782         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
783                                        self.pg_packet_sizes, count))
784
785         # send packets and verify received packets
786         # using same comparison function as End with PSP
787         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
788                                   self.compare_rx_tx_packet_End_PSP)
789
790         # assert nothing was received on the other interface (pg2)
791         self.pg2.assert_nothing_captured("mis-directed packet(s)")
792
793         # log the localsid counters
794         self.logger.info(self.vapi.cli("show sr localsid"))
795
796         # remove SRv6 localSIDs
797         localsid.remove_vpp_config()
798
799         # remove FIB entries
800         # done by tearDown
801
802         # cleanup interfaces
803         self.teardown_interfaces()
804
805     def test_SRv6_End_DX6(self):
806         """ Test SRv6 End.DX6 behavior.
807         """
808         # send traffic to one destination interface
809         # source and destination interfaces are IPv6 only
810         self.setup_interfaces(ipv6=[True, True])
811
812         # configure SRv6 localSID End.DX6 behavior
813         localsid = VppSRv6LocalSID(
814                         self, localsid_addr='a3::c4',
815                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX6,
816                         nh_addr=self.pg1.remote_ip6,
817                         end_psp=0,
818                         sw_if_index=self.pg1.sw_if_index,
819                         vlan_index=0,
820                         fib_table=0)
821         localsid.add_vpp_config()
822         # log the localsids
823         self.logger.debug(self.vapi.cli("show sr localsid"))
824
825         # create IPv6 packets with SRH (SL=0)
826         # send one packet per packet size
827         count = len(self.pg_packet_sizes)
828         dst_inner = 'a4::1234'  # inner header destination address
829         pkts = []
830
831         # packets with SRH, segments-left 0, active segment a3::c4
832         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
833                         dst_inner,
834                         sidlist=['a3::c4', 'a2::', 'a1::'],
835                         segleft=0)
836         # add to traffic stream pg0->pg1
837         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
838                                        self.pg_packet_sizes, count))
839
840         # packets without SRH, IPv6 in IPv6
841         # outer IPv6 dest addr is the localsid End.DX6
842         packet_header = self.create_packet_header_IPv6_IPv6(
843                                             dst_inner,
844                                             dst_outer='a3::c4')
845         # add to traffic stream pg0->pg1
846         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
847                                        self.pg_packet_sizes, count))
848
849         # send packets and verify received packets
850         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
851                                   self.compare_rx_tx_packet_End_DX6)
852
853         # log the localsid counters
854         self.logger.info(self.vapi.cli("show sr localsid"))
855
856         # remove SRv6 localSIDs
857         localsid.remove_vpp_config()
858
859         # cleanup interfaces
860         self.teardown_interfaces()
861
862     def test_SRv6_End_DT6(self):
863         """ Test SRv6 End.DT6 behavior.
864         """
865         # create three interfaces (1 source, 2 destinations)
866         # all interfaces are IPv6 only
867         # source interface in global FIB (0)
868         # destination interfaces in global and vrf
869         vrf_1 = 1
870         self.setup_interfaces(ipv6=[True, True, True],
871                               ipv6_table_id=[0, 0, vrf_1])
872
873         # configure FIB entries
874         # a4::/64 is reachable
875         #     via pg1 in table 0 (global)
876         #     and via pg2 in table vrf_1
877         route0 = VppIpRoute(self, "a4::", 64,
878                             [VppRoutePath(self.pg1.remote_ip6,
879                                           self.pg1.sw_if_index,
880                                           proto=DpoProto.DPO_PROTO_IP6,
881                                           nh_table_id=0)],
882                             table_id=0,
883                             is_ip6=1)
884         route0.add_vpp_config()
885         route1 = VppIpRoute(self, "a4::", 64,
886                             [VppRoutePath(self.pg2.remote_ip6,
887                                           self.pg2.sw_if_index,
888                                           proto=DpoProto.DPO_PROTO_IP6,
889                                           nh_table_id=vrf_1)],
890                             table_id=vrf_1,
891                             is_ip6=1)
892         route1.add_vpp_config()
893         self.logger.debug(self.vapi.cli("show ip6 fib"))
894
895         # configure SRv6 localSID End.DT6 behavior
896         # Note:
897         # fib_table: where the localsid is installed
898         # sw_if_index: in T-variants of localsid this is the vrf table_id
899         localsid = VppSRv6LocalSID(
900                         self, localsid_addr='a3::c4',
901                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT6,
902                         nh_addr='::',
903                         end_psp=0,
904                         sw_if_index=vrf_1,
905                         vlan_index=0,
906                         fib_table=0)
907         localsid.add_vpp_config()
908         # log the localsids
909         self.logger.debug(self.vapi.cli("show sr localsid"))
910
911         # create IPv6 packets with SRH (SL=0)
912         # send one packet per packet size
913         count = len(self.pg_packet_sizes)
914         dst_inner = 'a4::1234'  # inner header destination address
915         pkts = []
916
917         # packets with SRH, segments-left 0, active segment a3::c4
918         packet_header = self.create_packet_header_IPv6_SRH_IPv6(
919                         dst_inner,
920                         sidlist=['a3::c4', 'a2::', 'a1::'],
921                         segleft=0)
922         # add to traffic stream pg0->pg1
923         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
924                                        self.pg_packet_sizes, count))
925
926         # packets without SRH, IPv6 in IPv6
927         # outer IPv6 dest addr is the localsid End.DT6
928         packet_header = self.create_packet_header_IPv6_IPv6(
929                                             dst_inner,
930                                             dst_outer='a3::c4')
931         # add to traffic stream pg0->pg1
932         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
933                                        self.pg_packet_sizes, count))
934
935         # send packets and verify received packets
936         # using same comparison function as End.DX6
937         self.send_and_verify_pkts(self.pg0, pkts, self.pg2,
938                                   self.compare_rx_tx_packet_End_DX6)
939
940         # assert nothing was received on the other interface (pg2)
941         self.pg1.assert_nothing_captured("mis-directed packet(s)")
942
943         # log the localsid counters
944         self.logger.info(self.vapi.cli("show sr localsid"))
945
946         # remove SRv6 localSIDs
947         localsid.remove_vpp_config()
948
949         # remove FIB entries
950         # done by tearDown
951
952         # cleanup interfaces
953         self.teardown_interfaces()
954
955     def test_SRv6_End_DX4(self):
956         """ Test SRv6 End.DX4 behavior.
957         """
958         # send traffic to one destination interface
959         # source interface is IPv6 only
960         # destination interface is IPv4 only
961         self.setup_interfaces(ipv6=[True, False], ipv4=[False, True])
962
963         # configure SRv6 localSID End.DX4 behavior
964         localsid = VppSRv6LocalSID(
965                         self, localsid_addr='a3::c4',
966                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX4,
967                         nh_addr=self.pg1.remote_ip4,
968                         end_psp=0,
969                         sw_if_index=self.pg1.sw_if_index,
970                         vlan_index=0,
971                         fib_table=0)
972         localsid.add_vpp_config()
973         # log the localsids
974         self.logger.debug(self.vapi.cli("show sr localsid"))
975
976         # send one packet per packet size
977         count = len(self.pg_packet_sizes)
978         dst_inner = '4.1.1.123'  # inner header destination address
979         pkts = []
980
981         # packets with SRH, segments-left 0, active segment a3::c4
982         packet_header = self.create_packet_header_IPv6_SRH_IPv4(
983                         dst_inner,
984                         sidlist=['a3::c4', 'a2::', 'a1::'],
985                         segleft=0)
986         # add to traffic stream pg0->pg1
987         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
988                                        self.pg_packet_sizes, count))
989
990         # packets without SRH, IPv4 in IPv6
991         # outer IPv6 dest addr is the localsid End.DX4
992         packet_header = self.create_packet_header_IPv6_IPv4(
993                                             dst_inner,
994                                             dst_outer='a3::c4')
995         # add to traffic stream pg0->pg1
996         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
997                                        self.pg_packet_sizes, count))
998
999         # send packets and verify received packets
1000         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
1001                                   self.compare_rx_tx_packet_End_DX4)
1002
1003         # log the localsid counters
1004         self.logger.info(self.vapi.cli("show sr localsid"))
1005
1006         # remove SRv6 localSIDs
1007         localsid.remove_vpp_config()
1008
1009         # cleanup interfaces
1010         self.teardown_interfaces()
1011
1012     def test_SRv6_End_DT4(self):
1013         """ Test SRv6 End.DT4 behavior.
1014         """
1015         # create three interfaces (1 source, 2 destinations)
1016         # source interface is IPv6-only
1017         # destination interfaces are IPv4 only
1018         # source interface in global FIB (0)
1019         # destination interfaces in global and vrf
1020         vrf_1 = 1
1021         self.setup_interfaces(ipv6=[True, False, False],
1022                               ipv4=[False, True, True],
1023                               ipv6_table_id=[0, 0, 0],
1024                               ipv4_table_id=[0, 0, vrf_1])
1025
1026         # configure FIB entries
1027         # 4.1.1.0/24 is reachable
1028         #     via pg1 in table 0 (global)
1029         #     and via pg2 in table vrf_1
1030         route0 = VppIpRoute(self, "4.1.1.0", 24,
1031                             [VppRoutePath(self.pg1.remote_ip4,
1032                                           self.pg1.sw_if_index,
1033                                           nh_table_id=0)],
1034                             table_id=0,
1035                             is_ip6=0)
1036         route0.add_vpp_config()
1037         route1 = VppIpRoute(self, "4.1.1.0", 24,
1038                             [VppRoutePath(self.pg2.remote_ip4,
1039                                           self.pg2.sw_if_index,
1040                                           nh_table_id=vrf_1)],
1041                             table_id=vrf_1,
1042                             is_ip6=0)
1043         route1.add_vpp_config()
1044         self.logger.debug(self.vapi.cli("show ip fib"))
1045
1046         # configure SRv6 localSID End.DT6 behavior
1047         # Note:
1048         # fib_table: where the localsid is installed
1049         # sw_if_index: in T-variants of localsid: vrf table_id
1050         localsid = VppSRv6LocalSID(
1051                         self, localsid_addr='a3::c4',
1052                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DT4,
1053                         nh_addr='::',
1054                         end_psp=0,
1055                         sw_if_index=vrf_1,
1056                         vlan_index=0,
1057                         fib_table=0)
1058         localsid.add_vpp_config()
1059         # log the localsids
1060         self.logger.debug(self.vapi.cli("show sr localsid"))
1061
1062         # create IPv6 packets with SRH (SL=0)
1063         # send one packet per packet size
1064         count = len(self.pg_packet_sizes)
1065         dst_inner = '4.1.1.123'  # inner header destination address
1066         pkts = []
1067
1068         # packets with SRH, segments-left 0, active segment a3::c4
1069         packet_header = self.create_packet_header_IPv6_SRH_IPv4(
1070                         dst_inner,
1071                         sidlist=['a3::c4', 'a2::', 'a1::'],
1072                         segleft=0)
1073         # add to traffic stream pg0->pg1
1074         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
1075                                        self.pg_packet_sizes, count))
1076
1077         # packets without SRH, IPv6 in IPv6
1078         # outer IPv6 dest addr is the localsid End.DX4
1079         packet_header = self.create_packet_header_IPv6_IPv4(
1080                                             dst_inner,
1081                                             dst_outer='a3::c4')
1082         # add to traffic stream pg0->pg1
1083         pkts.extend(self.create_stream(self.pg0, self.pg2, packet_header,
1084                                        self.pg_packet_sizes, count))
1085
1086         # send packets and verify received packets
1087         # using same comparison function as End.DX4
1088         self.send_and_verify_pkts(self.pg0, pkts, self.pg2,
1089                                   self.compare_rx_tx_packet_End_DX4)
1090
1091         # assert nothing was received on the other interface (pg2)
1092         self.pg1.assert_nothing_captured("mis-directed packet(s)")
1093
1094         # log the localsid counters
1095         self.logger.info(self.vapi.cli("show sr localsid"))
1096
1097         # remove SRv6 localSIDs
1098         localsid.remove_vpp_config()
1099
1100         # remove FIB entries
1101         # done by tearDown
1102
1103         # cleanup interfaces
1104         self.teardown_interfaces()
1105
1106     def test_SRv6_End_DX2(self):
1107         """ Test SRv6 End.DX2 behavior.
1108         """
1109         # send traffic to one destination interface
1110         # source interface is IPv6 only
1111         self.setup_interfaces(ipv6=[True, False], ipv4=[False, False])
1112
1113         # configure SRv6 localSID End.DX2 behavior
1114         localsid = VppSRv6LocalSID(
1115                         self, localsid_addr='a3::c4',
1116                         behavior=SRv6LocalSIDBehaviors.SR_BEHAVIOR_DX2,
1117                         nh_addr='::',
1118                         end_psp=0,
1119                         sw_if_index=self.pg1.sw_if_index,
1120                         vlan_index=0,
1121                         fib_table=0)
1122         localsid.add_vpp_config()
1123         # log the localsids
1124         self.logger.debug(self.vapi.cli("show sr localsid"))
1125
1126         # send one packet per packet size
1127         count = len(self.pg_packet_sizes)
1128         pkts = []
1129
1130         # packets with SRH, segments-left 0, active segment a3::c4
1131         # L2 has no dot1q header
1132         packet_header = self.create_packet_header_IPv6_SRH_L2(
1133                             sidlist=['a3::c4', 'a2::', 'a1::'],
1134                             segleft=0,
1135                             vlan=0)
1136         # add to traffic stream pg0->pg1
1137         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1138                                        self.pg_packet_sizes, count))
1139
1140         # packets with SRH, segments-left 0, active segment a3::c4
1141         # L2 has dot1q header
1142         packet_header = self.create_packet_header_IPv6_SRH_L2(
1143                             sidlist=['a3::c4', 'a2::', 'a1::'],
1144                             segleft=0,
1145                             vlan=123)
1146         # add to traffic stream pg0->pg1
1147         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1148                                        self.pg_packet_sizes, count))
1149
1150         # packets without SRH, L2 in IPv6
1151         # outer IPv6 dest addr is the localsid End.DX2
1152         # L2 has no dot1q header
1153         packet_header = self.create_packet_header_IPv6_L2(
1154                                             dst_outer='a3::c4',
1155                                             vlan=0)
1156         # add to traffic stream pg0->pg1
1157         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1158                                        self.pg_packet_sizes, count))
1159
1160         # packets without SRH, L2 in IPv6
1161         # outer IPv6 dest addr is the localsid End.DX2
1162         # L2 has dot1q header
1163         packet_header = self.create_packet_header_IPv6_L2(
1164                                             dst_outer='a3::c4',
1165                                             vlan=123)
1166         # add to traffic stream pg0->pg1
1167         pkts.extend(self.create_stream(self.pg0, self.pg1, packet_header,
1168                                        self.pg_packet_sizes, count))
1169
1170         # send packets and verify received packets
1171         self.send_and_verify_pkts(self.pg0, pkts, self.pg1,
1172                                   self.compare_rx_tx_packet_End_DX2)
1173
1174         # log the localsid counters
1175         self.logger.info(self.vapi.cli("show sr localsid"))
1176
1177         # remove SRv6 localSIDs
1178         localsid.remove_vpp_config()
1179
1180         # cleanup interfaces
1181         self.teardown_interfaces()
1182
1183     def compare_rx_tx_packet_T_Encaps(self, tx_pkt, rx_pkt):
1184         """ Compare input and output packet after passing T.Encaps
1185
1186         :param tx_pkt: transmitted packet
1187         :param rx_pkt: received packet
1188         """
1189         # T.Encaps updates the headers as follows:
1190         # SR Policy seglist (S3, S2, S1)
1191         # SR Policy source C
1192         # IPv6:
1193         # in: IPv6(A, B2)
1194         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(A, B2)
1195         # IPv6 + SRH:
1196         # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1)
1197         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv6(a, B2)SRH(B3, B2, B1; SL=1)
1198
1199         # get first (outer) IPv6 header of rx'ed packet
1200         rx_ip = rx_pkt.getlayer(IPv6)
1201         rx_srh = None
1202
1203         tx_ip = tx_pkt.getlayer(IPv6)
1204
1205         # expected segment-list
1206         seglist = self.sr_policy.segments
1207         # reverse list to get order as in SRH
1208         tx_seglist = seglist[::-1]
1209
1210         # get source address of SR Policy
1211         sr_policy_source = self.sr_policy.source
1212
1213         # rx'ed packet should have SRH
1214         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1215         # get SRH
1216         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1217
1218         # received ip.src should be equal to SR Policy source
1219         self.assertEqual(rx_ip.src, sr_policy_source)
1220         # received ip.dst should be equal to expected sidlist[lastentry]
1221         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1222         # rx'ed seglist should be equal to expected seglist
1223         self.assertEqual(rx_srh.addresses, tx_seglist)
1224         # segleft should be equal to size expected seglist-1
1225         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1226         # segleft should be equal to lastentry
1227         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1228
1229         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1230         # except for the hop-limit field
1231         #   -> update tx'ed hlim to the expected hlim
1232         tx_ip.hlim = tx_ip.hlim - 1
1233
1234         self.assertEqual(rx_srh.payload, tx_ip)
1235
1236         self.logger.debug("packet verification: SUCCESS")
1237
1238     def compare_rx_tx_packet_T_Encaps_IPv4(self, tx_pkt, rx_pkt):
1239         """ Compare input and output packet after passing T.Encaps for IPv4
1240
1241         :param tx_pkt: transmitted packet
1242         :param rx_pkt: received packet
1243         """
1244         # T.Encaps for IPv4 updates the headers as follows:
1245         # SR Policy seglist (S3, S2, S1)
1246         # SR Policy source C
1247         # IPv4:
1248         # in: IPv4(A, B2)
1249         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)IPv4(A, B2)
1250
1251         # get first (outer) IPv6 header of rx'ed packet
1252         rx_ip = rx_pkt.getlayer(IPv6)
1253         rx_srh = None
1254
1255         tx_ip = tx_pkt.getlayer(IP)
1256
1257         # expected segment-list
1258         seglist = self.sr_policy.segments
1259         # reverse list to get order as in SRH
1260         tx_seglist = seglist[::-1]
1261
1262         # get source address of SR Policy
1263         sr_policy_source = self.sr_policy.source
1264
1265         # checks common to cases tx with and without SRH
1266         # rx'ed packet should have SRH and IPv4 header
1267         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1268         self.assertTrue(rx_ip.payload.haslayer(IP))
1269         # get SRH
1270         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1271
1272         # received ip.src should be equal to SR Policy source
1273         self.assertEqual(rx_ip.src, sr_policy_source)
1274         # received ip.dst should be equal to sidlist[lastentry]
1275         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1276         # rx'ed seglist should be equal to seglist
1277         self.assertEqual(rx_srh.addresses, tx_seglist)
1278         # segleft should be equal to size seglist-1
1279         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1280         # segleft should be equal to lastentry
1281         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1282
1283         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1284         # except for the ttl field and ip checksum
1285         #   -> adjust tx'ed ttl to expected ttl
1286         tx_ip.ttl = tx_ip.ttl - 1
1287         #   -> set tx'ed ip checksum to None and let scapy recompute
1288         tx_ip.chksum = None
1289         # read back the pkt (with str()) to force computing these fields
1290         # probably other ways to accomplish this are possible
1291         tx_ip = IP(str(tx_ip))
1292
1293         self.assertEqual(rx_srh.payload, tx_ip)
1294
1295         self.logger.debug("packet verification: SUCCESS")
1296
1297     def compare_rx_tx_packet_T_Encaps_L2(self, tx_pkt, rx_pkt):
1298         """ Compare input and output packet after passing T.Encaps for L2
1299
1300         :param tx_pkt: transmitted packet
1301         :param rx_pkt: received packet
1302         """
1303         # T.Encaps for L2 updates the headers as follows:
1304         # SR Policy seglist (S3, S2, S1)
1305         # SR Policy source C
1306         # L2:
1307         # in: L2
1308         # out: IPv6(C, S1)SRH(S3, S2, S1; SL=2)L2
1309
1310         # get first (outer) IPv6 header of rx'ed packet
1311         rx_ip = rx_pkt.getlayer(IPv6)
1312         rx_srh = None
1313
1314         tx_ether = tx_pkt.getlayer(Ether)
1315
1316         # expected segment-list
1317         seglist = self.sr_policy.segments
1318         # reverse list to get order as in SRH
1319         tx_seglist = seglist[::-1]
1320
1321         # get source address of SR Policy
1322         sr_policy_source = self.sr_policy.source
1323
1324         # rx'ed packet should have SRH
1325         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1326         # get SRH
1327         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1328
1329         # received ip.src should be equal to SR Policy source
1330         self.assertEqual(rx_ip.src, sr_policy_source)
1331         # received ip.dst should be equal to sidlist[lastentry]
1332         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1333         # rx'ed seglist should be equal to seglist
1334         self.assertEqual(rx_srh.addresses, tx_seglist)
1335         # segleft should be equal to size seglist-1
1336         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1337         # segleft should be equal to lastentry
1338         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1339         # nh should be "No Next Header" (59)
1340         self.assertEqual(rx_srh.nh, 59)
1341
1342         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
1343         self.assertEqual(Ether(str(rx_srh.payload)), tx_ether)
1344
1345         self.logger.debug("packet verification: SUCCESS")
1346
1347     def compare_rx_tx_packet_T_Insert(self, tx_pkt, rx_pkt):
1348         """ Compare input and output packet after passing T.Insert
1349
1350         :param tx_pkt: transmitted packet
1351         :param rx_pkt: received packet
1352         """
1353         # T.Insert updates the headers as follows:
1354         # IPv6:
1355         # in: IPv6(A, B2)
1356         # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)
1357         # IPv6 + SRH:
1358         # in: IPv6(A, B2)SRH(B3, B2, B1; SL=1)
1359         # out: IPv6(A, S1)SRH(B2, S3, S2, S1; SL=3)SRH(B3, B2, B1; SL=1)
1360
1361         # get first (outer) IPv6 header of rx'ed packet
1362         rx_ip = rx_pkt.getlayer(IPv6)
1363         rx_srh = None
1364         rx_ip2 = None
1365         rx_srh2 = None
1366         rx_ip3 = None
1367         rx_udp = rx_pkt[UDP]
1368
1369         tx_ip = tx_pkt.getlayer(IPv6)
1370         tx_srh = None
1371         tx_ip2 = None
1372         # some packets have been tx'ed with an SRH, some without it
1373         # get SRH if tx'ed packet has it
1374         if tx_pkt.haslayer(IPv6ExtHdrSegmentRouting):
1375             tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1376             tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1377         tx_udp = tx_pkt[UDP]
1378
1379         # expected segment-list (make copy of SR Policy segment list)
1380         seglist = self.sr_policy.segments[:]
1381         # expected seglist has initial dest addr as last segment
1382         seglist.append(tx_ip.dst)
1383         # reverse list to get order as in SRH
1384         tx_seglist = seglist[::-1]
1385
1386         # get source address of SR Policy
1387         sr_policy_source = self.sr_policy.source
1388
1389         # checks common to cases tx with and without SRH
1390         # rx'ed packet should have SRH and only one IPv6 header
1391         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1392         self.assertFalse(rx_ip.payload.haslayer(IPv6))
1393         # get SRH
1394         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1395
1396         # rx'ed ip.src should be equal to tx'ed ip.src
1397         self.assertEqual(rx_ip.src, tx_ip.src)
1398         # rx'ed ip.dst should be equal to sidlist[lastentry]
1399         self.assertEqual(rx_ip.dst, tx_seglist[-1])
1400
1401         # rx'ed seglist should be equal to expected seglist
1402         self.assertEqual(rx_srh.addresses, tx_seglist)
1403         # segleft should be equal to size(expected seglist)-1
1404         self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
1405         # segleft should be equal to lastentry
1406         self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
1407
1408         if tx_srh:  # packet was tx'ed with SRH
1409             # packet should have 2nd SRH
1410             self.assertTrue(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
1411             # get 2nd SRH
1412             rx_srh2 = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting, 2)
1413
1414             # rx'ed srh2.addresses should be equal to tx'ed srh.addresses
1415             self.assertEqual(rx_srh2.addresses, tx_srh.addresses)
1416             # rx'ed srh2.segleft should be equal to tx'ed srh.segleft
1417             self.assertEqual(rx_srh2.segleft, tx_srh.segleft)
1418             # rx'ed srh2.lastentry should be equal to tx'ed srh.lastentry
1419             self.assertEqual(rx_srh2.lastentry, tx_srh.lastentry)
1420
1421         else:  # packet was tx'ed without SRH
1422             # rx packet should have no other SRH
1423             self.assertFalse(rx_srh.payload.haslayer(IPv6ExtHdrSegmentRouting))
1424
1425         # UDP layer should be unchanged
1426         self.assertEqual(rx_udp, tx_udp)
1427
1428         self.logger.debug("packet verification: SUCCESS")
1429
1430     def compare_rx_tx_packet_End(self, tx_pkt, rx_pkt):
1431         """ Compare input and output packet after passing End (without PSP)
1432
1433         :param tx_pkt: transmitted packet
1434         :param rx_pkt: received packet
1435         """
1436         # End (no PSP) updates the headers as follows:
1437         # IPv6 + SRH:
1438         # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2)
1439         # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1440
1441         # get first (outer) IPv6 header of rx'ed packet
1442         rx_ip = rx_pkt.getlayer(IPv6)
1443         rx_srh = None
1444         rx_ip2 = None
1445         rx_udp = rx_pkt[UDP]
1446
1447         tx_ip = tx_pkt.getlayer(IPv6)
1448         # we know the packet has been tx'ed
1449         # with an inner IPv6 header and an SRH
1450         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1451         tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1452         tx_udp = tx_pkt[UDP]
1453
1454         # common checks, regardless of tx segleft value
1455         # rx'ed packet should have 2nd IPv6 header
1456         self.assertTrue(rx_ip.payload.haslayer(IPv6))
1457         # get second (inner) IPv6 header
1458         rx_ip2 = rx_pkt.getlayer(IPv6, 2)
1459
1460         if tx_ip.segleft > 0:
1461             # SRH should NOT have been popped:
1462             #   End SID without PSP does not pop SRH if segleft>0
1463             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1464             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1465
1466             # received ip.src should be equal to expected ip.src
1467             self.assertEqual(rx_ip.src, tx_ip.src)
1468             # sidlist should be unchanged
1469             self.assertEqual(rx_srh.addresses, tx_srh.addresses)
1470             # segleft should have been decremented
1471             self.assertEqual(rx_srh.segleft, tx_srh.segleft-1)
1472             # received ip.dst should be equal to sidlist[segleft]
1473             self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
1474             # lastentry should be unchanged
1475             self.assertEqual(rx_srh.lastentry, tx_srh.lastentry)
1476             # inner IPv6 packet (ip2) should be unchanged
1477             self.assertEqual(rx_ip2.src, tx_ip2.src)
1478             self.assertEqual(rx_ip2.dst, tx_ip2.dst)
1479         # else:  # tx_ip.segleft == 0
1480             # TODO: Does this work with 2 SRHs in ingress packet?
1481
1482         # UDP layer should be unchanged
1483         self.assertEqual(rx_udp, tx_udp)
1484
1485         self.logger.debug("packet verification: SUCCESS")
1486
1487     def compare_rx_tx_packet_End_PSP(self, tx_pkt, rx_pkt):
1488         """ Compare input and output packet after passing End with PSP
1489
1490         :param tx_pkt: transmitted packet
1491         :param rx_pkt: received packet
1492         """
1493         # End (PSP) updates the headers as follows:
1494         # IPv6 + SRH (SL>1):
1495         # in: IPv6(A, S1)SRH(S3, S2, S1; SL=2)
1496         # out: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1497         # IPv6 + SRH (SL=1):
1498         # in: IPv6(A, S2)SRH(S3, S2, S1; SL=1)
1499         # out: IPv6(A, S3)
1500
1501         # get first (outer) IPv6 header of rx'ed packet
1502         rx_ip = rx_pkt.getlayer(IPv6)
1503         rx_srh = None
1504         rx_ip2 = None
1505         rx_udp = rx_pkt[UDP]
1506
1507         tx_ip = tx_pkt.getlayer(IPv6)
1508         # we know the packet has been tx'ed
1509         # with an inner IPv6 header and an SRH
1510         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1511         tx_srh = tx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1512         tx_udp = tx_pkt[UDP]
1513
1514         # common checks, regardless of tx segleft value
1515         self.assertTrue(rx_ip.payload.haslayer(IPv6))
1516         rx_ip2 = rx_pkt.getlayer(IPv6, 2)
1517         # inner IPv6 packet (ip2) should be unchanged
1518         self.assertEqual(rx_ip2.src, tx_ip2.src)
1519         self.assertEqual(rx_ip2.dst, tx_ip2.dst)
1520
1521         if tx_ip.segleft > 1:
1522             # SRH should NOT have been popped:
1523             #   End SID with PSP does not pop SRH if segleft>1
1524             # rx'ed packet should have SRH
1525             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1526             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
1527
1528             # received ip.src should be equal to expected ip.src
1529             self.assertEqual(rx_ip.src, tx_ip.src)
1530             # sidlist should be unchanged
1531             self.assertEqual(rx_srh.addresses, tx_srh.addresses)
1532             # segleft should have been decremented
1533             self.assertEqual(rx_srh.segleft, tx_srh.segleft-1)
1534             # received ip.dst should be equal to sidlist[segleft]
1535             self.assertEqual(rx_ip.dst, rx_srh.addresses[rx_srh.segleft])
1536             # lastentry should be unchanged
1537             self.assertEqual(rx_srh.lastentry, tx_srh.lastentry)
1538
1539         else:  # tx_ip.segleft <= 1
1540             # SRH should have been popped:
1541             #   End SID with PSP and segleft=1 pops SRH
1542             # the two IPv6 headers are still present
1543             # outer IPv6 header has DA == last segment of popped SRH
1544             # SRH should not be present
1545             self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1546             # outer IPv6 header ip.src should be equal to tx'ed ip.src
1547             self.assertEqual(rx_ip.src, tx_ip.src)
1548             # outer IPv6 header ip.dst should be = to tx'ed sidlist[segleft-1]
1549             self.assertEqual(rx_ip.dst, tx_srh.addresses[tx_srh.segleft-1])
1550
1551         # UDP layer should be unchanged
1552         self.assertEqual(rx_udp, tx_udp)
1553
1554         self.logger.debug("packet verification: SUCCESS")
1555
1556     def compare_rx_tx_packet_End_DX6(self, tx_pkt, rx_pkt):
1557         """ Compare input and output packet after passing End.DX6
1558
1559         :param tx_pkt: transmitted packet
1560         :param rx_pkt: received packet
1561         """
1562         # End.DX6 updates the headers as follows:
1563         # IPv6 + SRH (SL=0):
1564         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv6(B, D)
1565         # out: IPv6(B, D)
1566         # IPv6:
1567         # in: IPv6(A, S3)IPv6(B, D)
1568         # out: IPv6(B, D)
1569
1570         # get first (outer) IPv6 header of rx'ed packet
1571         rx_ip = rx_pkt.getlayer(IPv6)
1572
1573         tx_ip = tx_pkt.getlayer(IPv6)
1574         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
1575
1576         # verify if rx'ed packet has no SRH
1577         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1578
1579         # the whole rx_ip pkt should be equal to tx_ip2
1580         # except for the hlim field
1581         #   -> adjust tx'ed hlim to expected hlim
1582         tx_ip2.hlim = tx_ip2.hlim - 1
1583
1584         self.assertEqual(rx_ip, tx_ip2)
1585
1586         self.logger.debug("packet verification: SUCCESS")
1587
1588     def compare_rx_tx_packet_End_DX4(self, tx_pkt, rx_pkt):
1589         """ Compare input and output packet after passing End.DX4
1590
1591         :param tx_pkt: transmitted packet
1592         :param rx_pkt: received packet
1593         """
1594         # End.DX4 updates the headers as follows:
1595         # IPv6 + SRH (SL=0):
1596         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)IPv4(B, D)
1597         # out: IPv4(B, D)
1598         # IPv6:
1599         # in: IPv6(A, S3)IPv4(B, D)
1600         # out: IPv4(B, D)
1601
1602         # get IPv4 header of rx'ed packet
1603         rx_ip = rx_pkt.getlayer(IP)
1604
1605         tx_ip = tx_pkt.getlayer(IPv6)
1606         tx_ip2 = tx_pkt.getlayer(IP)
1607
1608         # verify if rx'ed packet has no SRH
1609         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1610
1611         # the whole rx_ip pkt should be equal to tx_ip2
1612         # except for the ttl field and ip checksum
1613         #   -> adjust tx'ed ttl to expected ttl
1614         tx_ip2.ttl = tx_ip2.ttl - 1
1615         #   -> set tx'ed ip checksum to None and let scapy recompute
1616         tx_ip2.chksum = None
1617         # read back the pkt (with str()) to force computing these fields
1618         # probably other ways to accomplish this are possible
1619         tx_ip2 = IP(str(tx_ip2))
1620
1621         self.assertEqual(rx_ip, tx_ip2)
1622
1623         self.logger.debug("packet verification: SUCCESS")
1624
1625     def compare_rx_tx_packet_End_DX2(self, tx_pkt, rx_pkt):
1626         """ Compare input and output packet after passing End.DX2
1627
1628         :param tx_pkt: transmitted packet
1629         :param rx_pkt: received packet
1630         """
1631         # End.DX2 updates the headers as follows:
1632         # IPv6 + SRH (SL=0):
1633         # in: IPv6(A, S3)SRH(S3, S2, S1; SL=0)L2
1634         # out: L2
1635         # IPv6:
1636         # in: IPv6(A, S3)L2
1637         # out: L2
1638
1639         # get IPv4 header of rx'ed packet
1640         rx_eth = rx_pkt.getlayer(Ether)
1641
1642         tx_ip = tx_pkt.getlayer(IPv6)
1643         # we can't just get the 2nd Ether layer
1644         # get the Raw content and dissect it as Ether
1645         tx_eth1 = Ether(str(tx_pkt[Raw]))
1646
1647         # verify if rx'ed packet has no SRH
1648         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
1649
1650         # the whole rx_eth pkt should be equal to tx_eth1
1651         self.assertEqual(rx_eth, tx_eth1)
1652
1653         self.logger.debug("packet verification: SUCCESS")
1654
1655     def create_stream(self, src_if, dst_if, packet_header, packet_sizes,
1656                       count):
1657         """Create SRv6 input packet stream for defined interface.
1658
1659         :param VppInterface src_if: Interface to create packet stream for
1660         :param VppInterface dst_if: destination interface of packet stream
1661         :param packet_header: Layer3 scapy packet headers,
1662                 L2 is added when not provided,
1663                 Raw(payload) with packet_info is added
1664         :param list packet_sizes: packet stream pckt sizes,sequentially applied
1665                to packets in stream have
1666         :param int count: number of packets in packet stream
1667         :return: list of packets
1668         """
1669         self.logger.info("Creating packets")
1670         pkts = []
1671         for i in range(0, count-1):
1672             payload_info = self.create_packet_info(src_if, dst_if)
1673             self.logger.debug(
1674                 "Creating packet with index %d" % (payload_info.index))
1675             payload = self.info_to_payload(payload_info)
1676             # add L2 header if not yet provided in packet_header
1677             if packet_header.getlayer(0).name == 'Ethernet':
1678                 p = (packet_header /
1679                      Raw(payload))
1680             else:
1681                 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
1682                      packet_header /
1683                      Raw(payload))
1684             size = packet_sizes[i % len(packet_sizes)]
1685             self.logger.debug("Packet size %d" % (size))
1686             self.extend_packet(p, size)
1687             # we need to store the packet with the automatic fields computed
1688             # read back the dumped packet (with str())
1689             # to force computing these fields
1690             # probably other ways are possible
1691             p = Ether(str(p))
1692             payload_info.data = p.copy()
1693             self.logger.debug(ppp("Created packet:", p))
1694             pkts.append(p)
1695         self.logger.info("Done creating packets")
1696         return pkts
1697
1698     def send_and_verify_pkts(self, input, pkts, output, compare_func):
1699         """Send packets and verify received packets using compare_func
1700
1701         :param input: ingress interface of DUT
1702         :param pkts: list of packets to transmit
1703         :param output: egress interface of DUT
1704         :param compare_func: function to compare in and out packets
1705         """
1706         # add traffic stream to input interface
1707         input.add_stream(pkts)
1708
1709         # enable capture on all interfaces
1710         self.pg_enable_capture(self.pg_interfaces)
1711
1712         # start traffic
1713         self.logger.info("Starting traffic")
1714         self.pg_start()
1715
1716         # get output capture
1717         self.logger.info("Getting packet capture")
1718         capture = output.get_capture()
1719
1720         # assert nothing was captured on input interface
1721         input.assert_nothing_captured()
1722
1723         # verify captured packets
1724         self.verify_captured_pkts(output, capture, compare_func)
1725
1726     def create_packet_header_IPv6(self, dst):
1727         """Create packet header: IPv6 header, UDP header
1728
1729         :param dst: IPv6 destination address
1730
1731         IPv6 source address is 1234::1
1732         UDP source port and destination port are 1234
1733         """
1734
1735         p = (IPv6(src='1234::1', dst=dst) /
1736              UDP(sport=1234, dport=1234))
1737         return p
1738
1739     def create_packet_header_IPv6_SRH(self, sidlist, segleft):
1740         """Create packet header: IPv6 header with SRH, UDP header
1741
1742         :param list sidlist: segment list
1743         :param int segleft: segments-left field value
1744
1745         IPv6 destination address is set to sidlist[segleft]
1746         IPv6 source addresses are 1234::1 and 4321::1
1747         UDP source port and destination port are 1234
1748         """
1749
1750         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
1751              IPv6ExtHdrSegmentRouting(addresses=sidlist) /
1752              UDP(sport=1234, dport=1234))
1753         return p
1754
1755     def create_packet_header_IPv6_SRH_IPv6(self, dst, sidlist, segleft):
1756         """Create packet header: IPv6 encapsulated in SRv6:
1757         IPv6 header with SRH, IPv6 header, UDP header
1758
1759         :param ipv6address dst: inner IPv6 destination address
1760         :param list sidlist: segment list of outer IPv6 SRH
1761         :param int segleft: segments-left field of outer IPv6 SRH
1762
1763         Outer IPv6 destination address is set to sidlist[segleft]
1764         IPv6 source addresses are 1234::1 and 4321::1
1765         UDP source port and destination port are 1234
1766         """
1767
1768         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
1769              IPv6ExtHdrSegmentRouting(addresses=sidlist,
1770                                       segleft=segleft, nh=41) /
1771              IPv6(src='4321::1', dst=dst) /
1772              UDP(sport=1234, dport=1234))
1773         return p
1774
1775     def create_packet_header_IPv6_IPv6(self, dst_inner, dst_outer):
1776         """Create packet header: IPv6 encapsulated in IPv6:
1777         IPv6 header, IPv6 header, UDP header
1778
1779         :param ipv6address dst_inner: inner IPv6 destination address
1780         :param ipv6address dst_outer: outer IPv6 destination address
1781
1782         IPv6 source addresses are 1234::1 and 4321::1
1783         UDP source port and destination port are 1234
1784         """
1785
1786         p = (IPv6(src='1234::1', dst=dst_outer) /
1787              IPv6(src='4321::1', dst=dst_inner) /
1788              UDP(sport=1234, dport=1234))
1789         return p
1790
1791     def create_packet_header_IPv6_SRH_SRH_IPv6(self, dst, sidlist1, segleft1,
1792                                                sidlist2, segleft2):
1793         """Create packet header: IPv6 encapsulated in SRv6 with 2 SRH:
1794         IPv6 header with SRH, 2nd SRH, IPv6 header, UDP header
1795
1796         :param ipv6address dst: inner IPv6 destination address
1797         :param list sidlist1: segment list of outer IPv6 SRH
1798         :param int segleft1: segments-left field of outer IPv6 SRH
1799         :param list sidlist2: segment list of inner IPv6 SRH
1800         :param int segleft2: segments-left field of inner IPv6 SRH
1801
1802         Outer IPv6 destination address is set to sidlist[segleft]
1803         IPv6 source addresses are 1234::1 and 4321::1
1804         UDP source port and destination port are 1234
1805         """
1806
1807         p = (IPv6(src='1234::1', dst=sidlist1[segleft1]) /
1808              IPv6ExtHdrSegmentRouting(addresses=sidlist1,
1809                                       segleft=segleft1, nh=43) /
1810              IPv6ExtHdrSegmentRouting(addresses=sidlist2,
1811                                       segleft=segleft2, nh=41) /
1812              IPv6(src='4321::1', dst=dst) /
1813              UDP(sport=1234, dport=1234))
1814         return p
1815
1816     def create_packet_header_IPv4(self, dst):
1817         """Create packet header: IPv4 header, UDP header
1818
1819         :param dst: IPv4 destination address
1820
1821         IPv4 source address is 123.1.1.1
1822         UDP source port and destination port are 1234
1823         """
1824
1825         p = (IP(src='123.1.1.1', dst=dst) /
1826              UDP(sport=1234, dport=1234))
1827         return p
1828
1829     def create_packet_header_IPv6_IPv4(self, dst_inner, dst_outer):
1830         """Create packet header: IPv4 encapsulated in IPv6:
1831         IPv6 header, IPv4 header, UDP header
1832
1833         :param ipv4address dst_inner: inner IPv4 destination address
1834         :param ipv6address dst_outer: outer IPv6 destination address
1835
1836         IPv6 source address is 1234::1
1837         IPv4 source address is 123.1.1.1
1838         UDP source port and destination port are 1234
1839         """
1840
1841         p = (IPv6(src='1234::1', dst=dst_outer) /
1842              IP(src='123.1.1.1', dst=dst_inner) /
1843              UDP(sport=1234, dport=1234))
1844         return p
1845
1846     def create_packet_header_IPv6_SRH_IPv4(self, dst, sidlist, segleft):
1847         """Create packet header: IPv4 encapsulated in SRv6:
1848         IPv6 header with SRH, IPv4 header, UDP header
1849
1850         :param ipv4address dst: inner IPv4 destination address
1851         :param list sidlist: segment list of outer IPv6 SRH
1852         :param int segleft: segments-left field of outer IPv6 SRH
1853
1854         Outer IPv6 destination address is set to sidlist[segleft]
1855         IPv6 source address is 1234::1
1856         IPv4 source address is 123.1.1.1
1857         UDP source port and destination port are 1234
1858         """
1859
1860         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
1861              IPv6ExtHdrSegmentRouting(addresses=sidlist,
1862                                       segleft=segleft, nh=4) /
1863              IP(src='123.1.1.1', dst=dst) /
1864              UDP(sport=1234, dport=1234))
1865         return p
1866
1867     def create_packet_header_L2(self, vlan=0):
1868         """Create packet header: L2 header
1869
1870         :param vlan: if vlan!=0 then add 802.1q header
1871         """
1872         # Note: the dst addr ('00:55:44:33:22:11') is used in
1873         # the compare function compare_rx_tx_packet_T_Encaps_L2
1874         # to detect presence of L2 in SRH payload
1875         p = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
1876         etype = 0x8137  # IPX
1877         if vlan:
1878             # add 802.1q layer
1879             p /= Dot1Q(vlan=vlan, type=etype)
1880         else:
1881             p.type = etype
1882         return p
1883
1884     def create_packet_header_IPv6_SRH_L2(self, sidlist, segleft, vlan=0):
1885         """Create packet header: L2 encapsulated in SRv6:
1886         IPv6 header with SRH, L2
1887
1888         :param list sidlist: segment list of outer IPv6 SRH
1889         :param int segleft: segments-left field of outer IPv6 SRH
1890         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
1891
1892         Outer IPv6 destination address is set to sidlist[segleft]
1893         IPv6 source address is 1234::1
1894         """
1895         eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
1896         etype = 0x8137  # IPX
1897         if vlan:
1898             # add 802.1q layer
1899             eth /= Dot1Q(vlan=vlan, type=etype)
1900         else:
1901             eth.type = etype
1902
1903         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
1904              IPv6ExtHdrSegmentRouting(addresses=sidlist,
1905                                       segleft=segleft, nh=59) /
1906              eth)
1907         return p
1908
1909     def create_packet_header_IPv6_L2(self, dst_outer, vlan=0):
1910         """Create packet header: L2 encapsulated in IPv6:
1911         IPv6 header, L2
1912
1913         :param ipv6address dst_outer: outer IPv6 destination address
1914         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
1915         """
1916         eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
1917         etype = 0x8137  # IPX
1918         if vlan:
1919             # add 802.1q layer
1920             eth /= Dot1Q(vlan=vlan, type=etype)
1921         else:
1922             eth.type = etype
1923
1924         p = (IPv6(src='1234::1', dst=dst_outer, nh=59) / eth)
1925         return p
1926
1927     def get_payload_info(self, packet):
1928         """ Extract the payload_info from the packet
1929         """
1930         # in most cases, payload_info is in packet[Raw]
1931         # but packet[Raw] gives the complete payload
1932         # (incl L2 header) for the T.Encaps L2 case
1933         try:
1934             payload_info = self.payload_to_info(str(packet[Raw]))
1935
1936         except:
1937             # remote L2 header from packet[Raw]:
1938             # take packet[Raw], convert it to an Ether layer
1939             # and then extract Raw from it
1940             payload_info = self.payload_to_info(
1941                 str(Ether(str(packet[Raw]))[Raw]))
1942
1943         return payload_info
1944
1945     def verify_captured_pkts(self, dst_if, capture, compare_func):
1946         """
1947         Verify captured packet stream for specified interface.
1948         Compare ingress with egress packets using the specified compare fn
1949
1950         :param dst_if: egress interface of DUT
1951         :param capture: captured packets
1952         :param compare_func: function to compare in and out packet
1953         """
1954         self.logger.info("Verifying capture on interface %s using function %s"
1955                          % (dst_if.name, compare_func.func_name))
1956
1957         last_info = dict()
1958         for i in self.pg_interfaces:
1959             last_info[i.sw_if_index] = None
1960         dst_sw_if_index = dst_if.sw_if_index
1961
1962         for packet in capture:
1963             try:
1964                 # extract payload_info from packet's payload
1965                 payload_info = self.get_payload_info(packet)
1966                 packet_index = payload_info.index
1967
1968                 self.logger.debug("Verifying packet with index %d"
1969                                   % (packet_index))
1970                 # packet should have arrived on the expected interface
1971                 self.assertEqual(payload_info.dst, dst_sw_if_index)
1972                 self.logger.debug(
1973                     "Got packet on interface %s: src=%u (idx=%u)" %
1974                     (dst_if.name, payload_info.src, packet_index))
1975
1976                 # search for payload_info with same src and dst if_index
1977                 # this will give us the transmitted packet
1978                 next_info = self.get_next_packet_info_for_interface2(
1979                     payload_info.src, dst_sw_if_index,
1980                     last_info[payload_info.src])
1981                 last_info[payload_info.src] = next_info
1982                 # next_info should not be None
1983                 self.assertTrue(next_info is not None)
1984                 # index of tx and rx packets should be equal
1985                 self.assertEqual(packet_index, next_info.index)
1986                 # data field of next_info contains the tx packet
1987                 txed_packet = next_info.data
1988
1989                 self.logger.debug(ppp("Transmitted packet:",
1990                                       txed_packet))  # ppp=Pretty Print Packet
1991
1992                 self.logger.debug(ppp("Received packet:", packet))
1993
1994                 # compare rcvd packet with expected packet using compare_func
1995                 compare_func(txed_packet, packet)
1996
1997             except:
1998                 print packet.command()
1999                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2000                 raise
2001
2002         # have all expected packets arrived?
2003         for i in self.pg_interfaces:
2004             remaining_packet = self.get_next_packet_info_for_interface2(
2005                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
2006             self.assertTrue(remaining_packet is None,
2007                             "Interface %s: Packet expected from interface %s "
2008                             "didn't arrive" % (dst_if.name, i.name))
2009
2010
# when executed as a script, run the tests of this module
# using the VPP test runner
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)