40cc1906612d046914346cdfed9703a873c674f8
[vpp.git] / test / test_srv6_ad.py
1 #!/usr/bin/env python
2
3 import unittest
4 import binascii
5 from socket import AF_INET6
6
7 from framework import VppTestCase, VppTestRunner
8 from vpp_ip import DpoProto
9 from vpp_ip_route import VppIpRoute, VppRoutePath, VppIpTable
10 from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
11     SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
12
13 import scapy.compat
14 from scapy.packet import Raw
15 from scapy.layers.l2 import Ether, Dot1Q
16 from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
17 from scapy.layers.inet import IP, UDP
18
19 from scapy.utils import inet_pton, inet_ntop
20
21 from util import ppp
22
23
24 class TestSRv6(VppTestCase):
25     """ SRv6 Dynamic Proxy plugin Test Case """
26
27     @classmethod
28     def setUpClass(self):
29         super(TestSRv6, self).setUpClass()
30
31     def setUp(self):
32         """ Perform test setup before each test case.
33         """
34         super(TestSRv6, self).setUp()
35
36         # packet sizes, inclusive L2 overhead
37         self.pg_packet_sizes = [64, 512, 1518, 9018]
38
39         # reset packet_infos
40         self.reset_packet_infos()
41
42     def tearDown(self):
43         """ Clean up test setup after each test case.
44         """
45         self.teardown_interfaces()
46
47         super(TestSRv6, self).tearDown()
48
49     def configure_interface(self,
50                             interface,
51                             ipv6=False, ipv4=False,
52                             ipv6_table_id=0, ipv4_table_id=0):
53         """ Configure interface.
54         :param ipv6: configure IPv6 on interface
55         :param ipv4: configure IPv4 on interface
56         :param ipv6_table_id: FIB table_id for IPv6
57         :param ipv4_table_id: FIB table_id for IPv4
58         """
59         self.logger.debug("Configuring interface %s" % (interface.name))
60         if ipv6:
61             self.logger.debug("Configuring IPv6")
62             interface.set_table_ip6(ipv6_table_id)
63             interface.config_ip6()
64             interface.resolve_ndp(timeout=5)
65         if ipv4:
66             self.logger.debug("Configuring IPv4")
67             interface.set_table_ip4(ipv4_table_id)
68             interface.config_ip4()
69             interface.resolve_arp()
70         interface.admin_up()
71
72     def setup_interfaces(self, ipv6=[], ipv4=[],
73                          ipv6_table_id=[], ipv4_table_id=[]):
74         """ Create and configure interfaces.
75
76         :param ipv6: list of interface IPv6 capabilities
77         :param ipv4: list of interface IPv4 capabilities
78         :param ipv6_table_id: list of intf IPv6 FIB table_ids
79         :param ipv4_table_id: list of intf IPv4 FIB table_ids
80         :returns: List of created interfaces.
81         """
82         # how many interfaces?
83         if len(ipv6):
84             count = len(ipv6)
85         else:
86             count = len(ipv4)
87         self.logger.debug("Creating and configuring %d interfaces" % (count))
88
89         # fill up ipv6 and ipv4 lists if needed
90         # not enabled (False) is the default
91         if len(ipv6) < count:
92             ipv6 += (count - len(ipv6)) * [False]
93         if len(ipv4) < count:
94             ipv4 += (count - len(ipv4)) * [False]
95
96         # fill up table_id lists if needed
97         # table_id 0 (global) is the default
98         if len(ipv6_table_id) < count:
99             ipv6_table_id += (count - len(ipv6_table_id)) * [0]
100         if len(ipv4_table_id) < count:
101             ipv4_table_id += (count - len(ipv4_table_id)) * [0]
102
103         # create 'count' pg interfaces
104         self.create_pg_interfaces(range(count))
105
106         # setup all interfaces
107         for i in range(count):
108             intf = self.pg_interfaces[i]
109             self.configure_interface(intf,
110                                      ipv6[i], ipv4[i],
111                                      ipv6_table_id[i], ipv4_table_id[i])
112
113         if any(ipv6):
114             self.logger.debug(self.vapi.cli("show ip6 neighbors"))
115         if any(ipv4):
116             self.logger.debug(self.vapi.cli("show ip arp"))
117         self.logger.debug(self.vapi.cli("show interface"))
118         self.logger.debug(self.vapi.cli("show hardware"))
119
120         return self.pg_interfaces
121
122     def teardown_interfaces(self):
123         """ Unconfigure and bring down interface.
124         """
125         self.logger.debug("Tearing down interfaces")
126         # tear down all interfaces
127         # AFAIK they cannot be deleted
128         for i in self.pg_interfaces:
129             self.logger.debug("Tear down interface %s" % (i.name))
130             i.admin_down()
131             i.unconfig()
132             i.set_table_ip4(0)
133             i.set_table_ip6(0)
134
135     def test_SRv6_End_AD_IPv6(self):
136         """ Test SRv6 End.AD behavior with IPv6 traffic.
137         """
138         self.src_addr = 'a0::'
139         self.sid_list = ['a1::', 'a2::a6', 'a3::']
140         self.test_sid_index = 1
141
142         # send traffic to one destination interface
143         # source and destination interfaces are IPv6 only
144         self.setup_interfaces(ipv6=[True, True])
145
146         # configure route to next segment
147         route = VppIpRoute(self, self.sid_list[self.test_sid_index + 1], 128,
148                            [VppRoutePath(self.pg0.remote_ip6,
149                                          self.pg0.sw_if_index,
150                                          proto=DpoProto.DPO_PROTO_IP6)],
151                            is_ip6=1)
152         route.add_vpp_config()
153
154         # configure SRv6 localSID behavior
155         cli_str = "sr localsid address " + \
156                   self.sid_list[self.test_sid_index] + \
157                   " behavior end.ad" + \
158                   " nh " + self.pg1.remote_ip6 + \
159                   " oif " + self.pg1.name + \
160                   " iif " + self.pg1.name
161         self.vapi.cli(cli_str)
162
163         # log the localsids
164         self.logger.debug(self.vapi.cli("show sr localsid"))
165
166         # send one packet per packet size
167         count = len(self.pg_packet_sizes)
168
169         # prepare IPv6 in SRv6 headers
170         packet_header1 = self.create_packet_header_IPv6_SRH_IPv6(
171             srcaddr=self.src_addr,
172             sidlist=self.sid_list[::-1],
173             segleft=len(self.sid_list) - self.test_sid_index - 1)
174
175         # generate packets (pg0->pg1)
176         pkts1 = self.create_stream(self.pg0, self.pg1, packet_header1,
177                                    self.pg_packet_sizes, count)
178
179         # send packets and verify received packets
180         self.send_and_verify_pkts(self.pg0, pkts1, self.pg1,
181                                   self.compare_rx_tx_packet_End_AD_IPv6_out)
182
183         # log the localsid counters
184         self.logger.info(self.vapi.cli("show sr localsid"))
185
186         # prepare IPv6 header for returning packets
187         packet_header2 = self.create_packet_header_IPv6()
188
189         # generate returning packets (pg1->pg0)
190         pkts2 = self.create_stream(self.pg1, self.pg0, packet_header2,
191                                    self.pg_packet_sizes, count)
192
193         # send packets and verify received packets
194         self.send_and_verify_pkts(self.pg1, pkts2, self.pg0,
195                                   self.compare_rx_tx_packet_End_AD_IPv6_in)
196
197         # log the localsid counters
198         self.logger.info(self.vapi.cli("show sr localsid"))
199
200         # remove SRv6 localSIDs
201         cli_str = "sr localsid del address " + \
202                   self.sid_list[self.test_sid_index]
203         self.vapi.cli(cli_str)
204
205         # cleanup interfaces
206         self.teardown_interfaces()
207
208     def compare_rx_tx_packet_End_AD_IPv6_out(self, tx_pkt, rx_pkt):
209         """ Compare input and output packet after passing End.AD with IPv6
210
211         :param tx_pkt: transmitted packet
212         :param rx_pkt: received packet
213         """
214
215         # get first (outer) IPv6 header of rx'ed packet
216         rx_ip = rx_pkt.getlayer(IPv6)
217
218         tx_ip = tx_pkt.getlayer(IPv6)
219         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
220
221         # verify if rx'ed packet has no SRH
222         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
223
224         # the whole rx_ip pkt should be equal to tx_ip2
225         # except for the hlim field
226         #   -> adjust tx'ed hlim to expected hlim
227         tx_ip2.hlim = tx_ip2.hlim - 1
228
229         self.assertEqual(rx_ip, tx_ip2)
230
231         self.logger.debug("packet verification: SUCCESS")
232
233     def compare_rx_tx_packet_End_AD_IPv6_in(self, tx_pkt, rx_pkt):
234         """ Compare input and output packet after passing End.AD
235
236         :param tx_pkt: transmitted packet
237         :param rx_pkt: received packet
238         """
239
240         # get first (outer) IPv6 header of rx'ed packet
241         rx_ip = rx_pkt.getlayer(IPv6)
242         # received ip.src should be equal to SR Policy source
243         self.assertEqual(rx_ip.src, self.src_addr)
244         # received ip.dst should be equal to expected sidlist next segment
245         self.assertEqual(rx_ip.dst, self.sid_list[self.test_sid_index + 1])
246
247         # rx'ed packet should have SRH
248         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
249
250         # get SRH
251         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
252         # rx'ed seglist should be equal to SID-list in reversed order
253         self.assertEqual(rx_srh.addresses, self.sid_list[::-1])
254         # segleft should be equal to previous segleft value minus 1
255         self.assertEqual(rx_srh.segleft,
256                          len(self.sid_list) - self.test_sid_index - 2)
257         # lastentry should be equal to the SID-list length minus 1
258         self.assertEqual(rx_srh.lastentry, len(self.sid_list) - 1)
259
260         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
261         # except for the hop-limit field
262         tx_ip = tx_pkt.getlayer(IPv6)
263         #   -> update tx'ed hlim to the expected hlim
264         tx_ip.hlim -= 1
265         #   -> check payload
266         self.assertEqual(rx_srh.payload, tx_ip)
267
268         self.logger.debug("packet verification: SUCCESS")
269
270     def test_SRv6_End_AD_IPv4(self):
271         """ Test SRv6 End.AD behavior with IPv4 traffic.
272         """
273         self.src_addr = 'a0::'
274         self.sid_list = ['a1::', 'a2::a4', 'a3::']
275         self.test_sid_index = 1
276
277         # send traffic to one destination interface
278         # source and destination interfaces are IPv6 only
279         self.setup_interfaces(ipv6=[True, False], ipv4=[False, True])
280
281         # configure route to next segment
282         route = VppIpRoute(self, self.sid_list[self.test_sid_index + 1], 128,
283                            [VppRoutePath(self.pg0.remote_ip6,
284                                          self.pg0.sw_if_index,
285                                          proto=DpoProto.DPO_PROTO_IP6)],
286                            is_ip6=1)
287         route.add_vpp_config()
288
289         # configure SRv6 localSID behavior
290         cli_str = "sr localsid address " + \
291                   self.sid_list[self.test_sid_index] + \
292                   " behavior end.ad" + \
293                   " nh " + self.pg1.remote_ip4 + \
294                   " oif " + self.pg1.name + \
295                   " iif " + self.pg1.name
296         self.vapi.cli(cli_str)
297
298         # log the localsids
299         self.logger.debug(self.vapi.cli("show sr localsid"))
300
301         # send one packet per packet size
302         count = len(self.pg_packet_sizes)
303
304         # prepare IPv4 in SRv6 headers
305         packet_header1 = self.create_packet_header_IPv6_SRH_IPv4(
306             srcaddr=self.src_addr,
307             sidlist=self.sid_list[::-1],
308             segleft=len(self.sid_list) - self.test_sid_index - 1)
309
310         # generate packets (pg0->pg1)
311         pkts1 = self.create_stream(self.pg0, self.pg1, packet_header1,
312                                    self.pg_packet_sizes, count)
313
314         # send packets and verify received packets
315         self.send_and_verify_pkts(self.pg0, pkts1, self.pg1,
316                                   self.compare_rx_tx_packet_End_AD_IPv4_out)
317
318         # log the localsid counters
319         self.logger.info(self.vapi.cli("show sr localsid"))
320
321         # prepare IPv6 header for returning packets
322         packet_header2 = self.create_packet_header_IPv4()
323
324         # generate returning packets (pg1->pg0)
325         pkts2 = self.create_stream(self.pg1, self.pg0, packet_header2,
326                                    self.pg_packet_sizes, count)
327
328         # send packets and verify received packets
329         self.send_and_verify_pkts(self.pg1, pkts2, self.pg0,
330                                   self.compare_rx_tx_packet_End_AD_IPv4_in)
331
332         # log the localsid counters
333         self.logger.info(self.vapi.cli("show sr localsid"))
334
335         # remove SRv6 localSIDs
336         cli_str = "sr localsid del address " + \
337                   self.sid_list[self.test_sid_index]
338         self.vapi.cli(cli_str)
339
340         # cleanup interfaces
341         self.teardown_interfaces()
342
343     def compare_rx_tx_packet_End_AD_IPv4_out(self, tx_pkt, rx_pkt):
344         """ Compare input and output packet after passing End.AD with IPv4
345
346         :param tx_pkt: transmitted packet
347         :param rx_pkt: received packet
348         """
349
350         # get IPv4 header of rx'ed packet
351         rx_ip = rx_pkt.getlayer(IP)
352
353         tx_ip = tx_pkt.getlayer(IPv6)
354         tx_ip2 = tx_pkt.getlayer(IP)
355
356         # verify if rx'ed packet has no SRH
357         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
358
359         # the whole rx_ip pkt should be equal to tx_ip2
360         # except for the ttl field and ip checksum
361         #   -> adjust tx'ed ttl to expected ttl
362         tx_ip2.ttl = tx_ip2.ttl - 1
363         #   -> set tx'ed ip checksum to None and let scapy recompute
364         tx_ip2.chksum = None
365         # read back the pkt (with str()) to force computing these fields
366         # probably other ways to accomplish this are possible
367         tx_ip2 = IP(scapy.compat.raw(tx_ip2))
368
369         self.assertEqual(rx_ip, tx_ip2)
370
371         self.logger.debug("packet verification: SUCCESS")
372
373     def compare_rx_tx_packet_End_AD_IPv4_in(self, tx_pkt, rx_pkt):
374         """ Compare input and output packet after passing End.AD
375
376         :param tx_pkt: transmitted packet
377         :param rx_pkt: received packet
378         """
379
380         # get first (outer) IPv6 header of rx'ed packet
381         rx_ip = rx_pkt.getlayer(IPv6)
382         # received ip.src should be equal to SR Policy source
383         self.assertEqual(rx_ip.src, self.src_addr)
384         # received ip.dst should be equal to expected sidlist next segment
385         self.assertEqual(rx_ip.dst, self.sid_list[self.test_sid_index + 1])
386
387         # rx'ed packet should have SRH
388         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
389
390         # get SRH
391         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
392         # rx'ed seglist should be equal to SID-list in reversed order
393         self.assertEqual(rx_srh.addresses, self.sid_list[::-1])
394         # segleft should be equal to previous segleft value minus 1
395         self.assertEqual(rx_srh.segleft,
396                          len(self.sid_list) - self.test_sid_index - 2)
397         # lastentry should be equal to the SID-list length minus 1
398         self.assertEqual(rx_srh.lastentry, len(self.sid_list) - 1)
399
400         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
401         # except for the ttl field and ip checksum
402         tx_ip = tx_pkt.getlayer(IP)
403         #   -> adjust tx'ed ttl to expected ttl
404         tx_ip.ttl = tx_ip.ttl - 1
405         #   -> set tx'ed ip checksum to None and let scapy recompute
406         tx_ip.chksum = None
407         #   -> read back the pkt (with str()) to force computing these fields
408         # probably other ways to accomplish this are possible
409         self.assertEqual(rx_srh.payload, IP(scapy.compat.raw(tx_ip)))
410
411         self.logger.debug("packet verification: SUCCESS")
412
413     def test_SRv6_End_AD_L2(self):
414         """ Test SRv6 End.AD behavior with L2 traffic.
415         """
416         self.src_addr = 'a0::'
417         self.sid_list = ['a1::', 'a2::a4', 'a3::']
418         self.test_sid_index = 1
419
420         # send traffic to one destination interface
421         # source and destination interfaces are IPv6 only
422         self.setup_interfaces(ipv6=[True, False])
423
424         # configure route to next segment
425         route = VppIpRoute(self, self.sid_list[self.test_sid_index + 1], 128,
426                            [VppRoutePath(self.pg0.remote_ip6,
427                                          self.pg0.sw_if_index,
428                                          proto=DpoProto.DPO_PROTO_IP6)],
429                            is_ip6=1)
430         route.add_vpp_config()
431
432         # configure SRv6 localSID behavior
433         cli_str = "sr localsid address " + \
434                   self.sid_list[self.test_sid_index] + \
435                   " behavior end.ad" + \
436                   " oif " + self.pg1.name + \
437                   " iif " + self.pg1.name
438         self.vapi.cli(cli_str)
439
440         # log the localsids
441         self.logger.debug(self.vapi.cli("show sr localsid"))
442
443         # send one packet per packet size
444         count = len(self.pg_packet_sizes)
445
446         # prepare L2 in SRv6 headers
447         packet_header1 = self.create_packet_header_IPv6_SRH_L2(
448             srcaddr=self.src_addr,
449             sidlist=self.sid_list[::-1],
450             segleft=len(self.sid_list) - self.test_sid_index - 1,
451             vlan=0)
452
453         # generate packets (pg0->pg1)
454         pkts1 = self.create_stream(self.pg0, self.pg1, packet_header1,
455                                    self.pg_packet_sizes, count)
456
457         # send packets and verify received packets
458         self.send_and_verify_pkts(self.pg0, pkts1, self.pg1,
459                                   self.compare_rx_tx_packet_End_AD_L2_out)
460
461         # log the localsid counters
462         self.logger.info(self.vapi.cli("show sr localsid"))
463
464         # prepare L2 header for returning packets
465         packet_header2 = self.create_packet_header_L2()
466
467         # generate returning packets (pg1->pg0)
468         pkts2 = self.create_stream(self.pg1, self.pg0, packet_header2,
469                                    self.pg_packet_sizes, count)
470
471         # send packets and verify received packets
472         self.send_and_verify_pkts(self.pg1, pkts2, self.pg0,
473                                   self.compare_rx_tx_packet_End_AD_L2_in)
474
475         # log the localsid counters
476         self.logger.info(self.vapi.cli("show sr localsid"))
477
478         # remove SRv6 localSIDs
479         cli_str = "sr localsid del address " + \
480                   self.sid_list[self.test_sid_index]
481         self.vapi.cli(cli_str)
482
483         # cleanup interfaces
484         self.teardown_interfaces()
485
486     def compare_rx_tx_packet_End_AD_L2_out(self, tx_pkt, rx_pkt):
487         """ Compare input and output packet after passing End.AD with L2
488
489         :param tx_pkt: transmitted packet
490         :param rx_pkt: received packet
491         """
492
493         # get IPv4 header of rx'ed packet
494         rx_eth = rx_pkt.getlayer(Ether)
495
496         tx_ip = tx_pkt.getlayer(IPv6)
497         # we can't just get the 2nd Ether layer
498         # get the Raw content and dissect it as Ether
499         tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw]))
500
501         # verify if rx'ed packet has no SRH
502         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
503
504         # the whole rx_eth pkt should be equal to tx_eth1
505         self.assertEqual(rx_eth, tx_eth1)
506
507         self.logger.debug("packet verification: SUCCESS")
508
509     def compare_rx_tx_packet_End_AD_L2_in(self, tx_pkt, rx_pkt):
510         """ Compare input and output packet after passing End.AD
511
512         :param tx_pkt: transmitted packet
513         :param rx_pkt: received packet
514         """
515
516         ####
517         # get first (outer) IPv6 header of rx'ed packet
518         rx_ip = rx_pkt.getlayer(IPv6)
519         # received ip.src should be equal to SR Policy source
520         self.assertEqual(rx_ip.src, self.src_addr)
521         # received ip.dst should be equal to expected sidlist next segment
522         self.assertEqual(rx_ip.dst, self.sid_list[self.test_sid_index + 1])
523
524         # rx'ed packet should have SRH
525         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
526
527         # get SRH
528         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
529         # rx'ed seglist should be equal to SID-list in reversed order
530         self.assertEqual(rx_srh.addresses, self.sid_list[::-1])
531         # segleft should be equal to previous segleft value minus 1
532         self.assertEqual(rx_srh.segleft,
533                          len(self.sid_list) - self.test_sid_index - 2)
534         # lastentry should be equal to the SID-list length minus 1
535         self.assertEqual(rx_srh.lastentry, len(self.sid_list) - 1)
536
537         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
538         tx_ether = tx_pkt.getlayer(Ether)
539         self.assertEqual(Ether(scapy.compat.raw(rx_srh.payload)), tx_ether)
540
541         self.logger.debug("packet verification: SUCCESS")
542
543     def create_stream(self, src_if, dst_if, packet_header, packet_sizes,
544                       count):
545         """Create SRv6 input packet stream for defined interface.
546
547         :param VppInterface src_if: Interface to create packet stream for
548         :param VppInterface dst_if: destination interface of packet stream
549         :param packet_header: Layer3 scapy packet headers,
550                 L2 is added when not provided,
551                 Raw(payload) with packet_info is added
552         :param list packet_sizes: packet stream pckt sizes,sequentially applied
553                to packets in stream have
554         :param int count: number of packets in packet stream
555         :return: list of packets
556         """
557         self.logger.info("Creating packets")
558         pkts = []
559         for i in range(0, count - 1):
560             payload_info = self.create_packet_info(src_if, dst_if)
561             self.logger.debug(
562                 "Creating packet with index %d" % (payload_info.index))
563             payload = self.info_to_payload(payload_info)
564             # add L2 header if not yet provided in packet_header
565             if packet_header.getlayer(0).name == 'Ethernet':
566                 p = packet_header / Raw(payload)
567             else:
568                 p = Ether(dst=src_if.local_mac, src=src_if.remote_mac) / \
569                     packet_header / Raw(payload)
570             size = packet_sizes[i % len(packet_sizes)]
571             self.logger.debug("Packet size %d" % (size))
572             self.extend_packet(p, size)
573             # we need to store the packet with the automatic fields computed
574             # read back the dumped packet (with str())
575             # to force computing these fields
576             # probably other ways are possible
577             p = Ether(scapy.compat.raw(p))
578             payload_info.data = p.copy()
579             self.logger.debug(ppp("Created packet:", p))
580             pkts.append(p)
581         self.logger.info("Done creating packets")
582         return pkts
583
584     def send_and_verify_pkts(self, input, pkts, output, compare_func):
585         """Send packets and verify received packets using compare_func
586
587         :param input: ingress interface of DUT
588         :param pkts: list of packets to transmit
589         :param output: egress interface of DUT
590         :param compare_func: function to compare in and out packets
591         """
592         # add traffic stream to input interface
593         input.add_stream(pkts)
594
595         # enable capture on all interfaces
596         self.pg_enable_capture(self.pg_interfaces)
597
598         # start traffic
599         self.logger.info("Starting traffic")
600         self.pg_start()
601
602         # get output capture
603         self.logger.info("Getting packet capture")
604         capture = output.get_capture()
605
606         # assert nothing was captured on input interface
607         # input.assert_nothing_captured()
608
609         # verify captured packets
610         self.verify_captured_pkts(output, capture, compare_func)
611
612     def create_packet_header_IPv6(self):
613         """Create packet header: IPv6 header, UDP header
614
615         :param dst: IPv6 destination address
616
617         IPv6 source address is 1234::1
618         IPv6 destination address is 4321::1
619         UDP source port and destination port are 1234
620         """
621
622         p = IPv6(src='1234::1', dst='4321::1') / UDP(sport=1234, dport=1234)
623         return p
624
625     def create_packet_header_IPv6_SRH_IPv6(self, srcaddr, sidlist, segleft):
626         """Create packet header: IPv6 encapsulated in SRv6:
627         IPv6 header with SRH, IPv6 header, UDP header
628
629         :param int srcaddr: outer source address
630         :param list sidlist: segment list of outer IPv6 SRH
631         :param int segleft: segments-left field of outer IPv6 SRH
632
633         Outer IPv6 source address is set to srcaddr
634         Outer IPv6 destination address is set to sidlist[segleft]
635         Inner IPv6 source addresses is 1234::1
636         Inner IPv6 destination address is 4321::1
637         UDP source port and destination port are 1234
638         """
639
640         p = IPv6(src=srcaddr, dst=sidlist[segleft]) / \
641             IPv6ExtHdrSegmentRouting(addresses=sidlist,
642                                      segleft=segleft, nh=41) / \
643             IPv6(src='1234::1', dst='4321::1') / \
644             UDP(sport=1234, dport=1234)
645         return p
646
647     def create_packet_header_IPv4(self):
648         """Create packet header: IPv4 header, UDP header
649
650         :param dst: IPv4 destination address
651
652         IPv4 source address is 123.1.1.1
653         IPv4 destination address is 124.1.1.1
654         UDP source port and destination port are 1234
655         """
656
657         p = IP(src='123.1.1.1', dst='124.1.1.1') / UDP(sport=1234, dport=1234)
658         return p
659
660     def create_packet_header_IPv6_SRH_IPv4(self, srcaddr, sidlist, segleft):
661         """Create packet header: IPv4 encapsulated in SRv6:
662         IPv6 header with SRH, IPv4 header, UDP header
663
664         :param int srcaddr: outer source address
665         :param list sidlist: segment list of outer IPv6 SRH
666         :param int segleft: segments-left field of outer IPv6 SRH
667
668         Outer IPv6 source address is set to srcaddr
669         Outer IPv6 destination address is set to sidlist[segleft]
670         Inner IPv4 source address is 123.1.1.1
671         Inner IPv4 destination address is 124.1.1.1
672         UDP source port and destination port are 1234
673         """
674
675         p = IPv6(src=srcaddr, dst=sidlist[segleft]) / \
676             IPv6ExtHdrSegmentRouting(addresses=sidlist,
677                                      segleft=segleft, nh=4) / \
678             IP(src='123.1.1.1', dst='124.1.1.1') / \
679             UDP(sport=1234, dport=1234)
680         return p
681
682     def create_packet_header_L2(self, vlan=0):
683         """Create packet header: L2 header
684
685         :param vlan: if vlan!=0 then add 802.1q header
686         """
687         # Note: the dst addr ('00:55:44:33:22:11') is used in
688         # the compare function compare_rx_tx_packet_T_Encaps_L2
689         # to detect presence of L2 in SRH payload
690         p = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
691         etype = 0x8137  # IPX
692         if vlan:
693             # add 802.1q layer
694             p /= Dot1Q(vlan=vlan, type=etype)
695         else:
696             p.type = etype
697         return p
698
699     def create_packet_header_IPv6_SRH_L2(self, srcaddr, sidlist, segleft,
700                                          vlan=0):
701         """Create packet header: L2 encapsulated in SRv6:
702         IPv6 header with SRH, L2
703
704         :param int srcaddr: IPv6 source address
705         :param list sidlist: segment list of outer IPv6 SRH
706         :param int segleft: segments-left field of outer IPv6 SRH
707         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
708
709         IPv6 source address is set to srcaddr
710         IPv6 destination address is set to sidlist[segleft]
711         """
712         eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
713         etype = 0x8137  # IPX
714         if vlan:
715             # add 802.1q layer
716             eth /= Dot1Q(vlan=vlan, type=etype)
717         else:
718             eth.type = etype
719
720         p = IPv6(src=srcaddr, dst=sidlist[segleft]) / \
721             IPv6ExtHdrSegmentRouting(addresses=sidlist,
722                                      segleft=segleft, nh=59) / \
723             eth
724         return p
725
726     def get_payload_info(self, packet):
727         """ Extract the payload_info from the packet
728         """
729         # in most cases, payload_info is in packet[Raw]
730         # but packet[Raw] gives the complete payload
731         # (incl L2 header) for the T.Encaps L2 case
732         try:
733             payload_info = self.payload_to_info(packet[Raw])
734
735         except:
736             # remote L2 header from packet[Raw]:
737             # take packet[Raw], convert it to an Ether layer
738             # and then extract Raw from it
739             payload_info = self.payload_to_info(
740                 Ether(scapy.compat.raw(packet[Raw]))[Raw])
741
742         return payload_info
743
744     def verify_captured_pkts(self, dst_if, capture, compare_func):
745         """
746         Verify captured packet stream for specified interface.
747         Compare ingress with egress packets using the specified compare fn
748
749         :param dst_if: egress interface of DUT
750         :param capture: captured packets
751         :param compare_func: function to compare in and out packet
752         """
753         self.logger.info("Verifying capture on interface %s using function %s"
754                          % (dst_if.name, compare_func.__name__))
755
756         last_info = dict()
757         for i in self.pg_interfaces:
758             last_info[i.sw_if_index] = None
759         dst_sw_if_index = dst_if.sw_if_index
760
761         for packet in capture:
762             try:
763                 # extract payload_info from packet's payload
764                 payload_info = self.get_payload_info(packet)
765                 packet_index = payload_info.index
766
767                 self.logger.debug("Verifying packet with index %d"
768                                   % (packet_index))
769                 # packet should have arrived on the expected interface
770                 self.assertEqual(payload_info.dst, dst_sw_if_index)
771                 self.logger.debug(
772                     "Got packet on interface %s: src=%u (idx=%u)" %
773                     (dst_if.name, payload_info.src, packet_index))
774
775                 # search for payload_info with same src and dst if_index
776                 # this will give us the transmitted packet
777                 next_info = self.get_next_packet_info_for_interface2(
778                     payload_info.src, dst_sw_if_index,
779                     last_info[payload_info.src])
780                 last_info[payload_info.src] = next_info
781                 # next_info should not be None
782                 self.assertTrue(next_info is not None)
783                 # index of tx and rx packets should be equal
784                 self.assertEqual(packet_index, next_info.index)
785                 # data field of next_info contains the tx packet
786                 txed_packet = next_info.data
787
788                 self.logger.debug(ppp("Transmitted packet:",
789                                       txed_packet))  # ppp=Pretty Print Packet
790
791                 self.logger.debug(ppp("Received packet:", packet))
792
793                 # compare rcvd packet with expected packet using compare_func
794                 compare_func(txed_packet, packet)
795
796             except:
797                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
798                 raise
799
800         # have all expected packets arrived?
801         for i in self.pg_interfaces:
802             remaining_packet = self.get_next_packet_info_for_interface2(
803                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
804             self.assertTrue(remaining_packet is None,
805                             "Interface %s: Packet expected from interface %s "
806                             "didn't arrive" % (dst_if.name, i.name))
807
808
809 if __name__ == '__main__':
810     unittest.main(testRunner=VppTestRunner)