MAP: Convert from DPO to input feature.
[vpp.git] / test / test_srv6_ad.py
1 #!/usr/bin/env python
2
3 import unittest
4 import binascii
5 from socket import AF_INET6
6
7 from framework import VppTestCase, VppTestRunner
8 from vpp_ip import DpoProto
9 from vpp_ip_route import VppIpRoute, VppRoutePath, VppIpTable
10 from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
11     SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
12
13 from scapy.packet import Raw
14 from scapy.layers.l2 import Ether, Dot1Q
15 from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
16 from scapy.layers.inet import IP, UDP
17
18 from scapy.utils import inet_pton, inet_ntop
19
20 from util import ppp
21
22
23 class TestSRv6(VppTestCase):
24     """ SRv6 Dynamic Proxy plugin Test Case """
25
26     @classmethod
27     def setUpClass(self):
28         super(TestSRv6, self).setUpClass()
29
30     def setUp(self):
31         """ Perform test setup before each test case.
32         """
33         super(TestSRv6, self).setUp()
34
35         # packet sizes, inclusive L2 overhead
36         self.pg_packet_sizes = [64, 512, 1518, 9018]
37
38         # reset packet_infos
39         self.reset_packet_infos()
40
41     def tearDown(self):
42         """ Clean up test setup after each test case.
43         """
44         self.teardown_interfaces()
45
46         super(TestSRv6, self).tearDown()
47
48     def configure_interface(self,
49                             interface,
50                             ipv6=False, ipv4=False,
51                             ipv6_table_id=0, ipv4_table_id=0):
52         """ Configure interface.
53         :param ipv6: configure IPv6 on interface
54         :param ipv4: configure IPv4 on interface
55         :param ipv6_table_id: FIB table_id for IPv6
56         :param ipv4_table_id: FIB table_id for IPv4
57         """
58         self.logger.debug("Configuring interface %s" % (interface.name))
59         if ipv6:
60             self.logger.debug("Configuring IPv6")
61             interface.set_table_ip6(ipv6_table_id)
62             interface.config_ip6()
63             interface.resolve_ndp(timeout=5)
64         if ipv4:
65             self.logger.debug("Configuring IPv4")
66             interface.set_table_ip4(ipv4_table_id)
67             interface.config_ip4()
68             interface.resolve_arp()
69         interface.admin_up()
70
71     def setup_interfaces(self, ipv6=[], ipv4=[],
72                          ipv6_table_id=[], ipv4_table_id=[]):
73         """ Create and configure interfaces.
74
75         :param ipv6: list of interface IPv6 capabilities
76         :param ipv4: list of interface IPv4 capabilities
77         :param ipv6_table_id: list of intf IPv6 FIB table_ids
78         :param ipv4_table_id: list of intf IPv4 FIB table_ids
79         :returns: List of created interfaces.
80         """
81         # how many interfaces?
82         if len(ipv6):
83             count = len(ipv6)
84         else:
85             count = len(ipv4)
86         self.logger.debug("Creating and configuring %d interfaces" % (count))
87
88         # fill up ipv6 and ipv4 lists if needed
89         # not enabled (False) is the default
90         if len(ipv6) < count:
91             ipv6 += (count - len(ipv6)) * [False]
92         if len(ipv4) < count:
93             ipv4 += (count - len(ipv4)) * [False]
94
95         # fill up table_id lists if needed
96         # table_id 0 (global) is the default
97         if len(ipv6_table_id) < count:
98             ipv6_table_id += (count - len(ipv6_table_id)) * [0]
99         if len(ipv4_table_id) < count:
100             ipv4_table_id += (count - len(ipv4_table_id)) * [0]
101
102         # create 'count' pg interfaces
103         self.create_pg_interfaces(range(count))
104
105         # setup all interfaces
106         for i in range(count):
107             intf = self.pg_interfaces[i]
108             self.configure_interface(intf,
109                                      ipv6[i], ipv4[i],
110                                      ipv6_table_id[i], ipv4_table_id[i])
111
112         if any(ipv6):
113             self.logger.debug(self.vapi.cli("show ip6 neighbors"))
114         if any(ipv4):
115             self.logger.debug(self.vapi.cli("show ip arp"))
116         self.logger.debug(self.vapi.cli("show interface"))
117         self.logger.debug(self.vapi.cli("show hardware"))
118
119         return self.pg_interfaces
120
121     def teardown_interfaces(self):
122         """ Unconfigure and bring down interface.
123         """
124         self.logger.debug("Tearing down interfaces")
125         # tear down all interfaces
126         # AFAIK they cannot be deleted
127         for i in self.pg_interfaces:
128             self.logger.debug("Tear down interface %s" % (i.name))
129             i.admin_down()
130             i.unconfig()
131             i.set_table_ip4(0)
132             i.set_table_ip6(0)
133
134     def test_SRv6_End_AD_IPv6(self):
135         """ Test SRv6 End.AD behavior with IPv6 traffic.
136         """
137         self.src_addr = 'a0::'
138         self.sid_list = ['a1::', 'a2::a6', 'a3::']
139         self.test_sid_index = 1
140
141         # send traffic to one destination interface
142         # source and destination interfaces are IPv6 only
143         self.setup_interfaces(ipv6=[True, True])
144
145         # configure route to next segment
146         route = VppIpRoute(self, self.sid_list[self.test_sid_index + 1], 128,
147                            [VppRoutePath(self.pg0.remote_ip6,
148                                          self.pg0.sw_if_index,
149                                          proto=DpoProto.DPO_PROTO_IP6)],
150                            is_ip6=1)
151         route.add_vpp_config()
152
153         # configure SRv6 localSID behavior
154         cli_str = "sr localsid address " + \
155                   self.sid_list[self.test_sid_index] + \
156                   " behavior end.ad" + \
157                   " nh " + self.pg1.remote_ip6 + \
158                   " oif " + self.pg1.name + \
159                   " iif " + self.pg1.name
160         self.vapi.cli(cli_str)
161
162         # log the localsids
163         self.logger.debug(self.vapi.cli("show sr localsid"))
164
165         # send one packet per packet size
166         count = len(self.pg_packet_sizes)
167
168         # prepare IPv6 in SRv6 headers
169         packet_header1 = self.create_packet_header_IPv6_SRH_IPv6(
170             srcaddr=self.src_addr,
171             sidlist=self.sid_list[::-1],
172             segleft=len(self.sid_list) - self.test_sid_index - 1)
173
174         # generate packets (pg0->pg1)
175         pkts1 = self.create_stream(self.pg0, self.pg1, packet_header1,
176                                    self.pg_packet_sizes, count)
177
178         # send packets and verify received packets
179         self.send_and_verify_pkts(self.pg0, pkts1, self.pg1,
180                                   self.compare_rx_tx_packet_End_AD_IPv6_out)
181
182         # log the localsid counters
183         self.logger.info(self.vapi.cli("show sr localsid"))
184
185         # prepare IPv6 header for returning packets
186         packet_header2 = self.create_packet_header_IPv6()
187
188         # generate returning packets (pg1->pg0)
189         pkts2 = self.create_stream(self.pg1, self.pg0, packet_header2,
190                                    self.pg_packet_sizes, count)
191
192         # send packets and verify received packets
193         self.send_and_verify_pkts(self.pg1, pkts2, self.pg0,
194                                   self.compare_rx_tx_packet_End_AD_IPv6_in)
195
196         # log the localsid counters
197         self.logger.info(self.vapi.cli("show sr localsid"))
198
199         # remove SRv6 localSIDs
200         cli_str = "sr localsid del address " + \
201                   self.sid_list[self.test_sid_index]
202         self.vapi.cli(cli_str)
203
204         # cleanup interfaces
205         self.teardown_interfaces()
206
207     def compare_rx_tx_packet_End_AD_IPv6_out(self, tx_pkt, rx_pkt):
208         """ Compare input and output packet after passing End.AD with IPv6
209
210         :param tx_pkt: transmitted packet
211         :param rx_pkt: received packet
212         """
213
214         # get first (outer) IPv6 header of rx'ed packet
215         rx_ip = rx_pkt.getlayer(IPv6)
216
217         tx_ip = tx_pkt.getlayer(IPv6)
218         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
219
220         # verify if rx'ed packet has no SRH
221         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
222
223         # the whole rx_ip pkt should be equal to tx_ip2
224         # except for the hlim field
225         #   -> adjust tx'ed hlim to expected hlim
226         tx_ip2.hlim = tx_ip2.hlim - 1
227
228         self.assertEqual(rx_ip, tx_ip2)
229
230         self.logger.debug("packet verification: SUCCESS")
231
232     def compare_rx_tx_packet_End_AD_IPv6_in(self, tx_pkt, rx_pkt):
233         """ Compare input and output packet after passing End.AD
234
235         :param tx_pkt: transmitted packet
236         :param rx_pkt: received packet
237         """
238
239         # get first (outer) IPv6 header of rx'ed packet
240         rx_ip = rx_pkt.getlayer(IPv6)
241         # received ip.src should be equal to SR Policy source
242         self.assertEqual(rx_ip.src, self.src_addr)
243         # received ip.dst should be equal to expected sidlist next segment
244         self.assertEqual(rx_ip.dst, self.sid_list[self.test_sid_index + 1])
245
246         # rx'ed packet should have SRH
247         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
248
249         # get SRH
250         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
251         # rx'ed seglist should be equal to SID-list in reversed order
252         self.assertEqual(rx_srh.addresses, self.sid_list[::-1])
253         # segleft should be equal to previous segleft value minus 1
254         self.assertEqual(rx_srh.segleft,
255                          len(self.sid_list) - self.test_sid_index - 2)
256         # lastentry should be equal to the SID-list length minus 1
257         self.assertEqual(rx_srh.lastentry, len(self.sid_list) - 1)
258
259         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
260         # except for the hop-limit field
261         tx_ip = tx_pkt.getlayer(IPv6)
262         #   -> update tx'ed hlim to the expected hlim
263         tx_ip.hlim -= 1
264         #   -> check payload
265         self.assertEqual(rx_srh.payload, tx_ip)
266
267         self.logger.debug("packet verification: SUCCESS")
268
269     def test_SRv6_End_AD_IPv4(self):
270         """ Test SRv6 End.AD behavior with IPv4 traffic.
271         """
272         self.src_addr = 'a0::'
273         self.sid_list = ['a1::', 'a2::a4', 'a3::']
274         self.test_sid_index = 1
275
276         # send traffic to one destination interface
277         # source and destination interfaces are IPv6 only
278         self.setup_interfaces(ipv6=[True, False], ipv4=[False, True])
279
280         # configure route to next segment
281         route = VppIpRoute(self, self.sid_list[self.test_sid_index + 1], 128,
282                            [VppRoutePath(self.pg0.remote_ip6,
283                                          self.pg0.sw_if_index,
284                                          proto=DpoProto.DPO_PROTO_IP6)],
285                            is_ip6=1)
286         route.add_vpp_config()
287
288         # configure SRv6 localSID behavior
289         cli_str = "sr localsid address " + \
290                   self.sid_list[self.test_sid_index] + \
291                   " behavior end.ad" + \
292                   " nh " + self.pg1.remote_ip4 + \
293                   " oif " + self.pg1.name + \
294                   " iif " + self.pg1.name
295         self.vapi.cli(cli_str)
296
297         # log the localsids
298         self.logger.debug(self.vapi.cli("show sr localsid"))
299
300         # send one packet per packet size
301         count = len(self.pg_packet_sizes)
302
303         # prepare IPv4 in SRv6 headers
304         packet_header1 = self.create_packet_header_IPv6_SRH_IPv4(
305             srcaddr=self.src_addr,
306             sidlist=self.sid_list[::-1],
307             segleft=len(self.sid_list) - self.test_sid_index - 1)
308
309         # generate packets (pg0->pg1)
310         pkts1 = self.create_stream(self.pg0, self.pg1, packet_header1,
311                                    self.pg_packet_sizes, count)
312
313         # send packets and verify received packets
314         self.send_and_verify_pkts(self.pg0, pkts1, self.pg1,
315                                   self.compare_rx_tx_packet_End_AD_IPv4_out)
316
317         # log the localsid counters
318         self.logger.info(self.vapi.cli("show sr localsid"))
319
320         # prepare IPv6 header for returning packets
321         packet_header2 = self.create_packet_header_IPv4()
322
323         # generate returning packets (pg1->pg0)
324         pkts2 = self.create_stream(self.pg1, self.pg0, packet_header2,
325                                    self.pg_packet_sizes, count)
326
327         # send packets and verify received packets
328         self.send_and_verify_pkts(self.pg1, pkts2, self.pg0,
329                                   self.compare_rx_tx_packet_End_AD_IPv4_in)
330
331         # log the localsid counters
332         self.logger.info(self.vapi.cli("show sr localsid"))
333
334         # remove SRv6 localSIDs
335         cli_str = "sr localsid del address " + \
336                   self.sid_list[self.test_sid_index]
337         self.vapi.cli(cli_str)
338
339         # cleanup interfaces
340         self.teardown_interfaces()
341
342     def compare_rx_tx_packet_End_AD_IPv4_out(self, tx_pkt, rx_pkt):
343         """ Compare input and output packet after passing End.AD with IPv4
344
345         :param tx_pkt: transmitted packet
346         :param rx_pkt: received packet
347         """
348
349         # get IPv4 header of rx'ed packet
350         rx_ip = rx_pkt.getlayer(IP)
351
352         tx_ip = tx_pkt.getlayer(IPv6)
353         tx_ip2 = tx_pkt.getlayer(IP)
354
355         # verify if rx'ed packet has no SRH
356         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
357
358         # the whole rx_ip pkt should be equal to tx_ip2
359         # except for the ttl field and ip checksum
360         #   -> adjust tx'ed ttl to expected ttl
361         tx_ip2.ttl = tx_ip2.ttl - 1
362         #   -> set tx'ed ip checksum to None and let scapy recompute
363         tx_ip2.chksum = None
364         # read back the pkt (with str()) to force computing these fields
365         # probably other ways to accomplish this are possible
366         tx_ip2 = IP(str(tx_ip2))
367
368         self.assertEqual(rx_ip, tx_ip2)
369
370         self.logger.debug("packet verification: SUCCESS")
371
372     def compare_rx_tx_packet_End_AD_IPv4_in(self, tx_pkt, rx_pkt):
373         """ Compare input and output packet after passing End.AD
374
375         :param tx_pkt: transmitted packet
376         :param rx_pkt: received packet
377         """
378
379         # get first (outer) IPv6 header of rx'ed packet
380         rx_ip = rx_pkt.getlayer(IPv6)
381         # received ip.src should be equal to SR Policy source
382         self.assertEqual(rx_ip.src, self.src_addr)
383         # received ip.dst should be equal to expected sidlist next segment
384         self.assertEqual(rx_ip.dst, self.sid_list[self.test_sid_index + 1])
385
386         # rx'ed packet should have SRH
387         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
388
389         # get SRH
390         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
391         # rx'ed seglist should be equal to SID-list in reversed order
392         self.assertEqual(rx_srh.addresses, self.sid_list[::-1])
393         # segleft should be equal to previous segleft value minus 1
394         self.assertEqual(rx_srh.segleft,
395                          len(self.sid_list) - self.test_sid_index - 2)
396         # lastentry should be equal to the SID-list length minus 1
397         self.assertEqual(rx_srh.lastentry, len(self.sid_list) - 1)
398
399         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
400         # except for the ttl field and ip checksum
401         tx_ip = tx_pkt.getlayer(IP)
402         #   -> adjust tx'ed ttl to expected ttl
403         tx_ip.ttl = tx_ip.ttl - 1
404         #   -> set tx'ed ip checksum to None and let scapy recompute
405         tx_ip.chksum = None
406         #   -> read back the pkt (with str()) to force computing these fields
407         # probably other ways to accomplish this are possible
408         self.assertEqual(rx_srh.payload, IP(str(tx_ip)))
409
410         self.logger.debug("packet verification: SUCCESS")
411
412     def test_SRv6_End_AD_L2(self):
413         """ Test SRv6 End.AD behavior with L2 traffic.
414         """
415         self.src_addr = 'a0::'
416         self.sid_list = ['a1::', 'a2::a4', 'a3::']
417         self.test_sid_index = 1
418
419         # send traffic to one destination interface
420         # source and destination interfaces are IPv6 only
421         self.setup_interfaces(ipv6=[True, False])
422
423         # configure route to next segment
424         route = VppIpRoute(self, self.sid_list[self.test_sid_index + 1], 128,
425                            [VppRoutePath(self.pg0.remote_ip6,
426                                          self.pg0.sw_if_index,
427                                          proto=DpoProto.DPO_PROTO_IP6)],
428                            is_ip6=1)
429         route.add_vpp_config()
430
431         # configure SRv6 localSID behavior
432         cli_str = "sr localsid address " + \
433                   self.sid_list[self.test_sid_index] + \
434                   " behavior end.ad" + \
435                   " oif " + self.pg1.name + \
436                   " iif " + self.pg1.name
437         self.vapi.cli(cli_str)
438
439         # log the localsids
440         self.logger.debug(self.vapi.cli("show sr localsid"))
441
442         # send one packet per packet size
443         count = len(self.pg_packet_sizes)
444
445         # prepare L2 in SRv6 headers
446         packet_header1 = self.create_packet_header_IPv6_SRH_L2(
447             srcaddr=self.src_addr,
448             sidlist=self.sid_list[::-1],
449             segleft=len(self.sid_list) - self.test_sid_index - 1,
450             vlan=0)
451
452         # generate packets (pg0->pg1)
453         pkts1 = self.create_stream(self.pg0, self.pg1, packet_header1,
454                                    self.pg_packet_sizes, count)
455
456         # send packets and verify received packets
457         self.send_and_verify_pkts(self.pg0, pkts1, self.pg1,
458                                   self.compare_rx_tx_packet_End_AD_L2_out)
459
460         # log the localsid counters
461         self.logger.info(self.vapi.cli("show sr localsid"))
462
463         # prepare L2 header for returning packets
464         packet_header2 = self.create_packet_header_L2()
465
466         # generate returning packets (pg1->pg0)
467         pkts2 = self.create_stream(self.pg1, self.pg0, packet_header2,
468                                    self.pg_packet_sizes, count)
469
470         # send packets and verify received packets
471         self.send_and_verify_pkts(self.pg1, pkts2, self.pg0,
472                                   self.compare_rx_tx_packet_End_AD_L2_in)
473
474         # log the localsid counters
475         self.logger.info(self.vapi.cli("show sr localsid"))
476
477         # remove SRv6 localSIDs
478         cli_str = "sr localsid del address " + \
479                   self.sid_list[self.test_sid_index]
480         self.vapi.cli(cli_str)
481
482         # cleanup interfaces
483         self.teardown_interfaces()
484
485     def compare_rx_tx_packet_End_AD_L2_out(self, tx_pkt, rx_pkt):
486         """ Compare input and output packet after passing End.AD with L2
487
488         :param tx_pkt: transmitted packet
489         :param rx_pkt: received packet
490         """
491
492         # get IPv4 header of rx'ed packet
493         rx_eth = rx_pkt.getlayer(Ether)
494
495         tx_ip = tx_pkt.getlayer(IPv6)
496         # we can't just get the 2nd Ether layer
497         # get the Raw content and dissect it as Ether
498         tx_eth1 = Ether(str(tx_pkt[Raw]))
499
500         # verify if rx'ed packet has no SRH
501         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
502
503         # the whole rx_eth pkt should be equal to tx_eth1
504         self.assertEqual(rx_eth, tx_eth1)
505
506         self.logger.debug("packet verification: SUCCESS")
507
508     def compare_rx_tx_packet_End_AD_L2_in(self, tx_pkt, rx_pkt):
509         """ Compare input and output packet after passing End.AD
510
511         :param tx_pkt: transmitted packet
512         :param rx_pkt: received packet
513         """
514
515         ####
516         # get first (outer) IPv6 header of rx'ed packet
517         rx_ip = rx_pkt.getlayer(IPv6)
518         # received ip.src should be equal to SR Policy source
519         self.assertEqual(rx_ip.src, self.src_addr)
520         # received ip.dst should be equal to expected sidlist next segment
521         self.assertEqual(rx_ip.dst, self.sid_list[self.test_sid_index + 1])
522
523         # rx'ed packet should have SRH
524         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
525
526         # get SRH
527         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
528         # rx'ed seglist should be equal to SID-list in reversed order
529         self.assertEqual(rx_srh.addresses, self.sid_list[::-1])
530         # segleft should be equal to previous segleft value minus 1
531         self.assertEqual(rx_srh.segleft,
532                          len(self.sid_list) - self.test_sid_index - 2)
533         # lastentry should be equal to the SID-list length minus 1
534         self.assertEqual(rx_srh.lastentry, len(self.sid_list) - 1)
535
536         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
537         tx_ether = tx_pkt.getlayer(Ether)
538         self.assertEqual(Ether(str(rx_srh.payload)), tx_ether)
539
540         self.logger.debug("packet verification: SUCCESS")
541
542     def create_stream(self, src_if, dst_if, packet_header, packet_sizes,
543                       count):
544         """Create SRv6 input packet stream for defined interface.
545
546         :param VppInterface src_if: Interface to create packet stream for
547         :param VppInterface dst_if: destination interface of packet stream
548         :param packet_header: Layer3 scapy packet headers,
549                 L2 is added when not provided,
550                 Raw(payload) with packet_info is added
551         :param list packet_sizes: packet stream pckt sizes,sequentially applied
552                to packets in stream have
553         :param int count: number of packets in packet stream
554         :return: list of packets
555         """
556         self.logger.info("Creating packets")
557         pkts = []
558         for i in range(0, count - 1):
559             payload_info = self.create_packet_info(src_if, dst_if)
560             self.logger.debug(
561                 "Creating packet with index %d" % (payload_info.index))
562             payload = self.info_to_payload(payload_info)
563             # add L2 header if not yet provided in packet_header
564             if packet_header.getlayer(0).name == 'Ethernet':
565                 p = packet_header / Raw(payload)
566             else:
567                 p = Ether(dst=src_if.local_mac, src=src_if.remote_mac) / \
568                     packet_header / Raw(payload)
569             size = packet_sizes[i % len(packet_sizes)]
570             self.logger.debug("Packet size %d" % (size))
571             self.extend_packet(p, size)
572             # we need to store the packet with the automatic fields computed
573             # read back the dumped packet (with str())
574             # to force computing these fields
575             # probably other ways are possible
576             p = Ether(str(p))
577             payload_info.data = p.copy()
578             self.logger.debug(ppp("Created packet:", p))
579             pkts.append(p)
580         self.logger.info("Done creating packets")
581         return pkts
582
583     def send_and_verify_pkts(self, input, pkts, output, compare_func):
584         """Send packets and verify received packets using compare_func
585
586         :param input: ingress interface of DUT
587         :param pkts: list of packets to transmit
588         :param output: egress interface of DUT
589         :param compare_func: function to compare in and out packets
590         """
591         # add traffic stream to input interface
592         input.add_stream(pkts)
593
594         # enable capture on all interfaces
595         self.pg_enable_capture(self.pg_interfaces)
596
597         # start traffic
598         self.logger.info("Starting traffic")
599         self.pg_start()
600
601         # get output capture
602         self.logger.info("Getting packet capture")
603         capture = output.get_capture()
604
605         # assert nothing was captured on input interface
606         # input.assert_nothing_captured()
607
608         # verify captured packets
609         self.verify_captured_pkts(output, capture, compare_func)
610
611     def create_packet_header_IPv6(self):
612         """Create packet header: IPv6 header, UDP header
613
614         :param dst: IPv6 destination address
615
616         IPv6 source address is 1234::1
617         IPv6 destination address is 4321::1
618         UDP source port and destination port are 1234
619         """
620
621         p = IPv6(src='1234::1', dst='4321::1') / UDP(sport=1234, dport=1234)
622         return p
623
624     def create_packet_header_IPv6_SRH_IPv6(self, srcaddr, sidlist, segleft):
625         """Create packet header: IPv6 encapsulated in SRv6:
626         IPv6 header with SRH, IPv6 header, UDP header
627
628         :param int srcaddr: outer source address
629         :param list sidlist: segment list of outer IPv6 SRH
630         :param int segleft: segments-left field of outer IPv6 SRH
631
632         Outer IPv6 source address is set to srcaddr
633         Outer IPv6 destination address is set to sidlist[segleft]
634         Inner IPv6 source addresses is 1234::1
635         Inner IPv6 destination address is 4321::1
636         UDP source port and destination port are 1234
637         """
638
639         p = IPv6(src=srcaddr, dst=sidlist[segleft]) / \
640             IPv6ExtHdrSegmentRouting(addresses=sidlist,
641                                      segleft=segleft, nh=41) / \
642             IPv6(src='1234::1', dst='4321::1') / \
643             UDP(sport=1234, dport=1234)
644         return p
645
646     def create_packet_header_IPv4(self):
647         """Create packet header: IPv4 header, UDP header
648
649         :param dst: IPv4 destination address
650
651         IPv4 source address is 123.1.1.1
652         IPv4 destination address is 124.1.1.1
653         UDP source port and destination port are 1234
654         """
655
656         p = IP(src='123.1.1.1', dst='124.1.1.1') / UDP(sport=1234, dport=1234)
657         return p
658
659     def create_packet_header_IPv6_SRH_IPv4(self, srcaddr, sidlist, segleft):
660         """Create packet header: IPv4 encapsulated in SRv6:
661         IPv6 header with SRH, IPv4 header, UDP header
662
663         :param int srcaddr: outer source address
664         :param list sidlist: segment list of outer IPv6 SRH
665         :param int segleft: segments-left field of outer IPv6 SRH
666
667         Outer IPv6 source address is set to srcaddr
668         Outer IPv6 destination address is set to sidlist[segleft]
669         Inner IPv4 source address is 123.1.1.1
670         Inner IPv4 destination address is 124.1.1.1
671         UDP source port and destination port are 1234
672         """
673
674         p = IPv6(src=srcaddr, dst=sidlist[segleft]) / \
675             IPv6ExtHdrSegmentRouting(addresses=sidlist,
676                                      segleft=segleft, nh=4) / \
677             IP(src='123.1.1.1', dst='124.1.1.1') / \
678             UDP(sport=1234, dport=1234)
679         return p
680
681     def create_packet_header_L2(self, vlan=0):
682         """Create packet header: L2 header
683
684         :param vlan: if vlan!=0 then add 802.1q header
685         """
686         # Note: the dst addr ('00:55:44:33:22:11') is used in
687         # the compare function compare_rx_tx_packet_T_Encaps_L2
688         # to detect presence of L2 in SRH payload
689         p = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
690         etype = 0x8137  # IPX
691         if vlan:
692             # add 802.1q layer
693             p /= Dot1Q(vlan=vlan, type=etype)
694         else:
695             p.type = etype
696         return p
697
698     def create_packet_header_IPv6_SRH_L2(self, srcaddr, sidlist, segleft,
699                                          vlan=0):
700         """Create packet header: L2 encapsulated in SRv6:
701         IPv6 header with SRH, L2
702
703         :param int srcaddr: IPv6 source address
704         :param list sidlist: segment list of outer IPv6 SRH
705         :param int segleft: segments-left field of outer IPv6 SRH
706         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
707
708         IPv6 source address is set to srcaddr
709         IPv6 destination address is set to sidlist[segleft]
710         """
711         eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
712         etype = 0x8137  # IPX
713         if vlan:
714             # add 802.1q layer
715             eth /= Dot1Q(vlan=vlan, type=etype)
716         else:
717             eth.type = etype
718
719         p = IPv6(src=srcaddr, dst=sidlist[segleft]) / \
720             IPv6ExtHdrSegmentRouting(addresses=sidlist,
721                                      segleft=segleft, nh=59) / \
722             eth
723         return p
724
725     def get_payload_info(self, packet):
726         """ Extract the payload_info from the packet
727         """
728         # in most cases, payload_info is in packet[Raw]
729         # but packet[Raw] gives the complete payload
730         # (incl L2 header) for the T.Encaps L2 case
731         try:
732             payload_info = self.payload_to_info(str(packet[Raw]))
733
734         except:
735             # remote L2 header from packet[Raw]:
736             # take packet[Raw], convert it to an Ether layer
737             # and then extract Raw from it
738             payload_info = self.payload_to_info(
739                 str(Ether(str(packet[Raw]))[Raw]))
740
741         return payload_info
742
743     def verify_captured_pkts(self, dst_if, capture, compare_func):
744         """
745         Verify captured packet stream for specified interface.
746         Compare ingress with egress packets using the specified compare fn
747
748         :param dst_if: egress interface of DUT
749         :param capture: captured packets
750         :param compare_func: function to compare in and out packet
751         """
752         self.logger.info("Verifying capture on interface %s using function %s"
753                          % (dst_if.name, compare_func.func_name))
754
755         last_info = dict()
756         for i in self.pg_interfaces:
757             last_info[i.sw_if_index] = None
758         dst_sw_if_index = dst_if.sw_if_index
759
760         for packet in capture:
761             try:
762                 # extract payload_info from packet's payload
763                 payload_info = self.get_payload_info(packet)
764                 packet_index = payload_info.index
765
766                 self.logger.debug("Verifying packet with index %d"
767                                   % (packet_index))
768                 # packet should have arrived on the expected interface
769                 self.assertEqual(payload_info.dst, dst_sw_if_index)
770                 self.logger.debug(
771                     "Got packet on interface %s: src=%u (idx=%u)" %
772                     (dst_if.name, payload_info.src, packet_index))
773
774                 # search for payload_info with same src and dst if_index
775                 # this will give us the transmitted packet
776                 next_info = self.get_next_packet_info_for_interface2(
777                     payload_info.src, dst_sw_if_index,
778                     last_info[payload_info.src])
779                 last_info[payload_info.src] = next_info
780                 # next_info should not be None
781                 self.assertTrue(next_info is not None)
782                 # index of tx and rx packets should be equal
783                 self.assertEqual(packet_index, next_info.index)
784                 # data field of next_info contains the tx packet
785                 txed_packet = next_info.data
786
787                 self.logger.debug(ppp("Transmitted packet:",
788                                       txed_packet))  # ppp=Pretty Print Packet
789
790                 self.logger.debug(ppp("Received packet:", packet))
791
792                 # compare rcvd packet with expected packet using compare_func
793                 compare_func(txed_packet, packet)
794
795             except:
796                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
797                 raise
798
799         # have all expected packets arrived?
800         for i in self.pg_interfaces:
801             remaining_packet = self.get_next_packet_info_for_interface2(
802                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
803             self.assertTrue(remaining_packet is None,
804                             "Interface %s: Packet expected from interface %s "
805                             "didn't arrive" % (dst_if.name, i.name))
806
807
808 if __name__ == '__main__':
809     unittest.main(testRunner=VppTestRunner)