tests: changes for scapy 2.4.3 migration
[vpp.git] / src / plugins / srv6-ad / test / test_srv6_ad.py
1 #!/usr/bin/env python3
2
3 import unittest
4 import binascii
5 from socket import AF_INET6
6
7 from framework import VppTestCase, VppTestRunner
8 from vpp_ip import DpoProto
9 from vpp_ip_route import VppIpRoute, VppRoutePath, VppIpTable
10 from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
11     SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
12
13 import scapy.compat
14 from scapy.packet import Raw
15 from scapy.layers.l2 import Ether, Dot1Q
16 from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
17 from scapy.layers.inet import IP, UDP
18
19 from util import ppp
20
21
22 class TestSRv6(VppTestCase):
23     """ SRv6 Dynamic Proxy plugin Test Case """
24
25     @classmethod
26     def setUpClass(self):
27         super(TestSRv6, self).setUpClass()
28
29     @classmethod
30     def tearDownClass(cls):
31         super(TestSRv6, cls).tearDownClass()
32
33     def setUp(self):
34         """ Perform test setup before each test case.
35         """
36         super(TestSRv6, self).setUp()
37
38         # packet sizes, inclusive L2 overhead
39         self.pg_packet_sizes = [64, 512, 1518, 9018]
40
41         # reset packet_infos
42         self.reset_packet_infos()
43
44     def tearDown(self):
45         """ Clean up test setup after each test case.
46         """
47         self.teardown_interfaces()
48
49         super(TestSRv6, self).tearDown()
50
51     def configure_interface(self,
52                             interface,
53                             ipv6=False, ipv4=False,
54                             ipv6_table_id=0, ipv4_table_id=0):
55         """ Configure interface.
56         :param ipv6: configure IPv6 on interface
57         :param ipv4: configure IPv4 on interface
58         :param ipv6_table_id: FIB table_id for IPv6
59         :param ipv4_table_id: FIB table_id for IPv4
60         """
61         self.logger.debug("Configuring interface %s" % (interface.name))
62         if ipv6:
63             self.logger.debug("Configuring IPv6")
64             interface.set_table_ip6(ipv6_table_id)
65             interface.config_ip6()
66             interface.resolve_ndp(timeout=5)
67         if ipv4:
68             self.logger.debug("Configuring IPv4")
69             interface.set_table_ip4(ipv4_table_id)
70             interface.config_ip4()
71             interface.resolve_arp()
72         interface.admin_up()
73
74     def setup_interfaces(self, ipv6=[], ipv4=[],
75                          ipv6_table_id=[], ipv4_table_id=[]):
76         """ Create and configure interfaces.
77
78         :param ipv6: list of interface IPv6 capabilities
79         :param ipv4: list of interface IPv4 capabilities
80         :param ipv6_table_id: list of intf IPv6 FIB table_ids
81         :param ipv4_table_id: list of intf IPv4 FIB table_ids
82         :returns: List of created interfaces.
83         """
84         # how many interfaces?
85         if len(ipv6):
86             count = len(ipv6)
87         else:
88             count = len(ipv4)
89         self.logger.debug("Creating and configuring %d interfaces" % (count))
90
91         # fill up ipv6 and ipv4 lists if needed
92         # not enabled (False) is the default
93         if len(ipv6) < count:
94             ipv6 += (count - len(ipv6)) * [False]
95         if len(ipv4) < count:
96             ipv4 += (count - len(ipv4)) * [False]
97
98         # fill up table_id lists if needed
99         # table_id 0 (global) is the default
100         if len(ipv6_table_id) < count:
101             ipv6_table_id += (count - len(ipv6_table_id)) * [0]
102         if len(ipv4_table_id) < count:
103             ipv4_table_id += (count - len(ipv4_table_id)) * [0]
104
105         # create 'count' pg interfaces
106         self.create_pg_interfaces(range(count))
107
108         # setup all interfaces
109         for i in range(count):
110             intf = self.pg_interfaces[i]
111             self.configure_interface(intf,
112                                      ipv6[i], ipv4[i],
113                                      ipv6_table_id[i], ipv4_table_id[i])
114
115         if any(ipv6):
116             self.logger.debug(self.vapi.cli("show ip6 neighbors"))
117         if any(ipv4):
118             self.logger.debug(self.vapi.cli("show ip arp"))
119         self.logger.debug(self.vapi.cli("show interface"))
120         self.logger.debug(self.vapi.cli("show hardware"))
121
122         return self.pg_interfaces
123
124     def teardown_interfaces(self):
125         """ Unconfigure and bring down interface.
126         """
127         self.logger.debug("Tearing down interfaces")
128         # tear down all interfaces
129         # AFAIK they cannot be deleted
130         for i in self.pg_interfaces:
131             self.logger.debug("Tear down interface %s" % (i.name))
132             i.admin_down()
133             i.unconfig()
134             i.set_table_ip4(0)
135             i.set_table_ip6(0)
136
137     def test_SRv6_End_AD_IPv6(self):
138         """ Test SRv6 End.AD behavior with IPv6 traffic.
139         """
140         self.src_addr = 'a0::'
141         self.sid_list = ['a1::', 'a2::a6', 'a3::']
142         self.test_sid_index = 1
143
144         # send traffic to one destination interface
145         # source and destination interfaces are IPv6 only
146         self.setup_interfaces(ipv6=[True, True])
147
148         # configure route to next segment
149         route = VppIpRoute(self, self.sid_list[self.test_sid_index + 1], 128,
150                            [VppRoutePath(self.pg0.remote_ip6,
151                                          self.pg0.sw_if_index,
152                                          proto=DpoProto.DPO_PROTO_IP6)])
153         route.add_vpp_config()
154
155         # configure SRv6 localSID behavior
156         cli_str = "sr localsid address " + \
157                   self.sid_list[self.test_sid_index] + \
158                   " behavior end.ad" + \
159                   " nh " + self.pg1.remote_ip6 + \
160                   " oif " + self.pg1.name + \
161                   " iif " + self.pg1.name
162         self.vapi.cli(cli_str)
163
164         # log the localsids
165         self.logger.debug(self.vapi.cli("show sr localsid"))
166
167         # send one packet per packet size
168         count = len(self.pg_packet_sizes)
169
170         # prepare IPv6 in SRv6 headers
171         packet_header1 = self.create_packet_header_IPv6_SRH_IPv6(
172             srcaddr=self.src_addr,
173             sidlist=self.sid_list[::-1],
174             segleft=len(self.sid_list) - self.test_sid_index - 1)
175
176         # generate packets (pg0->pg1)
177         pkts1 = self.create_stream(self.pg0, self.pg1, packet_header1,
178                                    self.pg_packet_sizes, count)
179
180         # send packets and verify received packets
181         self.send_and_verify_pkts(self.pg0, pkts1, self.pg1,
182                                   self.compare_rx_tx_packet_End_AD_IPv6_out)
183
184         # log the localsid counters
185         self.logger.info(self.vapi.cli("show sr localsid"))
186
187         # prepare IPv6 header for returning packets
188         packet_header2 = self.create_packet_header_IPv6()
189
190         # generate returning packets (pg1->pg0)
191         pkts2 = self.create_stream(self.pg1, self.pg0, packet_header2,
192                                    self.pg_packet_sizes, count)
193
194         # send packets and verify received packets
195         self.send_and_verify_pkts(self.pg1, pkts2, self.pg0,
196                                   self.compare_rx_tx_packet_End_AD_IPv6_in)
197
198         # log the localsid counters
199         self.logger.info(self.vapi.cli("show sr localsid"))
200
201         # remove SRv6 localSIDs
202         cli_str = "sr localsid del address " + \
203                   self.sid_list[self.test_sid_index]
204         self.vapi.cli(cli_str)
205
206         # cleanup interfaces
207         self.teardown_interfaces()
208
209     def compare_rx_tx_packet_End_AD_IPv6_out(self, tx_pkt, rx_pkt):
210         """ Compare input and output packet after passing End.AD with IPv6
211
212         :param tx_pkt: transmitted packet
213         :param rx_pkt: received packet
214         """
215
216         # get first (outer) IPv6 header of rx'ed packet
217         rx_ip = rx_pkt.getlayer(IPv6)
218
219         tx_ip = tx_pkt.getlayer(IPv6)
220         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
221
222         # verify if rx'ed packet has no SRH
223         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
224
225         # the whole rx_ip pkt should be equal to tx_ip2
226         # except for the hlim field
227         #   -> adjust tx'ed hlim to expected hlim
228         tx_ip2.hlim = tx_ip2.hlim - 1
229
230         self.assertEqual(rx_ip, tx_ip2)
231
232         self.logger.debug("packet verification: SUCCESS")
233
234     def compare_rx_tx_packet_End_AD_IPv6_in(self, tx_pkt, rx_pkt):
235         """ Compare input and output packet after passing End.AD
236
237         :param tx_pkt: transmitted packet
238         :param rx_pkt: received packet
239         """
240
241         # get first (outer) IPv6 header of rx'ed packet
242         rx_ip = rx_pkt.getlayer(IPv6)
243         # received ip.src should be equal to SR Policy source
244         self.assertEqual(rx_ip.src, self.src_addr)
245         # received ip.dst should be equal to expected sidlist next segment
246         self.assertEqual(rx_ip.dst, self.sid_list[self.test_sid_index + 1])
247
248         # rx'ed packet should have SRH
249         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
250
251         # get SRH
252         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
253         # rx'ed seglist should be equal to SID-list in reversed order
254         self.assertEqual(rx_srh.addresses, self.sid_list[::-1])
255         # segleft should be equal to previous segleft value minus 1
256         self.assertEqual(rx_srh.segleft,
257                          len(self.sid_list) - self.test_sid_index - 2)
258         # lastentry should be equal to the SID-list length minus 1
259         self.assertEqual(rx_srh.lastentry, len(self.sid_list) - 1)
260
261         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
262         # except for the hop-limit field
263         tx_ip = tx_pkt.getlayer(IPv6)
264         #   -> update tx'ed hlim to the expected hlim
265         tx_ip.hlim -= 1
266         #   -> check payload
267         self.assertEqual(rx_srh.payload, tx_ip)
268
269         self.logger.debug("packet verification: SUCCESS")
270
271     def test_SRv6_End_AD_IPv4(self):
272         """ Test SRv6 End.AD behavior with IPv4 traffic.
273         """
274         self.src_addr = 'a0::'
275         self.sid_list = ['a1::', 'a2::a4', 'a3::']
276         self.test_sid_index = 1
277
278         # send traffic to one destination interface
279         # source and destination interfaces are IPv6 only
280         self.setup_interfaces(ipv6=[True, False], ipv4=[False, True])
281
282         # configure route to next segment
283         route = VppIpRoute(self, self.sid_list[self.test_sid_index + 1], 128,
284                            [VppRoutePath(self.pg0.remote_ip6,
285                                          self.pg0.sw_if_index,
286                                          proto=DpoProto.DPO_PROTO_IP6)])
287         route.add_vpp_config()
288
289         # configure SRv6 localSID behavior
290         cli_str = "sr localsid address " + \
291                   self.sid_list[self.test_sid_index] + \
292                   " behavior end.ad" + \
293                   " nh " + self.pg1.remote_ip4 + \
294                   " oif " + self.pg1.name + \
295                   " iif " + self.pg1.name
296         self.vapi.cli(cli_str)
297
298         # log the localsids
299         self.logger.debug(self.vapi.cli("show sr localsid"))
300
301         # send one packet per packet size
302         count = len(self.pg_packet_sizes)
303
304         # prepare IPv4 in SRv6 headers
305         packet_header1 = self.create_packet_header_IPv6_SRH_IPv4(
306             srcaddr=self.src_addr,
307             sidlist=self.sid_list[::-1],
308             segleft=len(self.sid_list) - self.test_sid_index - 1)
309
310         # generate packets (pg0->pg1)
311         pkts1 = self.create_stream(self.pg0, self.pg1, packet_header1,
312                                    self.pg_packet_sizes, count)
313
314         # send packets and verify received packets
315         self.send_and_verify_pkts(self.pg0, pkts1, self.pg1,
316                                   self.compare_rx_tx_packet_End_AD_IPv4_out)
317
318         # log the localsid counters
319         self.logger.info(self.vapi.cli("show sr localsid"))
320
321         # prepare IPv6 header for returning packets
322         packet_header2 = self.create_packet_header_IPv4()
323
324         # generate returning packets (pg1->pg0)
325         pkts2 = self.create_stream(self.pg1, self.pg0, packet_header2,
326                                    self.pg_packet_sizes, count)
327
328         # send packets and verify received packets
329         self.send_and_verify_pkts(self.pg1, pkts2, self.pg0,
330                                   self.compare_rx_tx_packet_End_AD_IPv4_in)
331
332         # log the localsid counters
333         self.logger.info(self.vapi.cli("show sr localsid"))
334
335         # remove SRv6 localSIDs
336         cli_str = "sr localsid del address " + \
337                   self.sid_list[self.test_sid_index]
338         self.vapi.cli(cli_str)
339
340         # cleanup interfaces
341         self.teardown_interfaces()
342
343     def compare_rx_tx_packet_End_AD_IPv4_out(self, tx_pkt, rx_pkt):
344         """ Compare input and output packet after passing End.AD with IPv4
345
346         :param tx_pkt: transmitted packet
347         :param rx_pkt: received packet
348         """
349
350         # get IPv4 header of rx'ed packet
351         rx_ip = rx_pkt.getlayer(IP)
352
353         tx_ip = tx_pkt.getlayer(IPv6)
354         tx_ip2 = tx_pkt.getlayer(IP)
355
356         # verify if rx'ed packet has no SRH
357         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
358
359         # the whole rx_ip pkt should be equal to tx_ip2
360         # except for the ttl field and ip checksum
361         #   -> adjust tx'ed ttl to expected ttl
362         tx_ip2.ttl = tx_ip2.ttl - 1
363         #   -> set tx'ed ip checksum to None and let scapy recompute
364         tx_ip2.chksum = None
365         # read back the pkt (with str()) to force computing these fields
366         # probably other ways to accomplish this are possible
367         tx_ip2 = IP(scapy.compat.raw(tx_ip2))
368
369         self.assertEqual(rx_ip, tx_ip2)
370
371         self.logger.debug("packet verification: SUCCESS")
372
373     def compare_rx_tx_packet_End_AD_IPv4_in(self, tx_pkt, rx_pkt):
374         """ Compare input and output packet after passing End.AD
375
376         :param tx_pkt: transmitted packet
377         :param rx_pkt: received packet
378         """
379
380         # get first (outer) IPv6 header of rx'ed packet
381         rx_ip = rx_pkt.getlayer(IPv6)
382         # received ip.src should be equal to SR Policy source
383         self.assertEqual(rx_ip.src, self.src_addr)
384         # received ip.dst should be equal to expected sidlist next segment
385         self.assertEqual(rx_ip.dst, self.sid_list[self.test_sid_index + 1])
386
387         # rx'ed packet should have SRH
388         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
389
390         # get SRH
391         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
392         # rx'ed seglist should be equal to SID-list in reversed order
393         self.assertEqual(rx_srh.addresses, self.sid_list[::-1])
394         # segleft should be equal to previous segleft value minus 1
395         self.assertEqual(rx_srh.segleft,
396                          len(self.sid_list) - self.test_sid_index - 2)
397         # lastentry should be equal to the SID-list length minus 1
398         self.assertEqual(rx_srh.lastentry, len(self.sid_list) - 1)
399
400         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
401         # except for the ttl field and ip checksum
402         tx_ip = tx_pkt.getlayer(IP)
403         #   -> adjust tx'ed ttl to expected ttl
404         tx_ip.ttl = tx_ip.ttl - 1
405         #   -> set tx'ed ip checksum to None and let scapy recompute
406         tx_ip.chksum = None
407         #   -> read back the pkt (with str()) to force computing these fields
408         # probably other ways to accomplish this are possible
409         self.assertEqual(rx_srh.payload, IP(scapy.compat.raw(tx_ip)))
410
411         self.logger.debug("packet verification: SUCCESS")
412
413     def test_SRv6_End_AD_L2(self):
414         """ Test SRv6 End.AD behavior with L2 traffic.
415         """
416         self.src_addr = 'a0::'
417         self.sid_list = ['a1::', 'a2::a4', 'a3::']
418         self.test_sid_index = 1
419
420         # send traffic to one destination interface
421         # source and destination interfaces are IPv6 only
422         self.setup_interfaces(ipv6=[True, False])
423
424         # configure route to next segment
425         route = VppIpRoute(self, self.sid_list[self.test_sid_index + 1], 128,
426                            [VppRoutePath(self.pg0.remote_ip6,
427                                          self.pg0.sw_if_index,
428                                          proto=DpoProto.DPO_PROTO_IP6)])
429         route.add_vpp_config()
430
431         # configure SRv6 localSID behavior
432         cli_str = "sr localsid address " + \
433                   self.sid_list[self.test_sid_index] + \
434                   " behavior end.ad" + \
435                   " oif " + self.pg1.name + \
436                   " iif " + self.pg1.name
437         self.vapi.cli(cli_str)
438
439         # log the localsids
440         self.logger.debug(self.vapi.cli("show sr localsid"))
441
442         # send one packet per packet size
443         count = len(self.pg_packet_sizes)
444
445         # prepare L2 in SRv6 headers
446         packet_header1 = self.create_packet_header_IPv6_SRH_L2(
447             srcaddr=self.src_addr,
448             sidlist=self.sid_list[::-1],
449             segleft=len(self.sid_list) - self.test_sid_index - 1,
450             vlan=0)
451
452         # generate packets (pg0->pg1)
453         pkts1 = self.create_stream(self.pg0, self.pg1, packet_header1,
454                                    self.pg_packet_sizes, count)
455
456         # send packets and verify received packets
457         self.send_and_verify_pkts(self.pg0, pkts1, self.pg1,
458                                   self.compare_rx_tx_packet_End_AD_L2_out)
459
460         # log the localsid counters
461         self.logger.info(self.vapi.cli("show sr localsid"))
462
463         # prepare L2 header for returning packets
464         packet_header2 = self.create_packet_header_L2()
465
466         # generate returning packets (pg1->pg0)
467         pkts2 = self.create_stream(self.pg1, self.pg0, packet_header2,
468                                    self.pg_packet_sizes, count)
469
470         # send packets and verify received packets
471         self.send_and_verify_pkts(self.pg1, pkts2, self.pg0,
472                                   self.compare_rx_tx_packet_End_AD_L2_in)
473
474         # log the localsid counters
475         self.logger.info(self.vapi.cli("show sr localsid"))
476
477         # remove SRv6 localSIDs
478         cli_str = "sr localsid del address " + \
479                   self.sid_list[self.test_sid_index]
480         self.vapi.cli(cli_str)
481
482         # cleanup interfaces
483         self.teardown_interfaces()
484
485     def compare_rx_tx_packet_End_AD_L2_out(self, tx_pkt, rx_pkt):
486         """ Compare input and output packet after passing End.AD with L2
487
488         :param tx_pkt: transmitted packet
489         :param rx_pkt: received packet
490         """
491
492         # get IPv4 header of rx'ed packet
493         rx_eth = rx_pkt.getlayer(Ether)
494
495         tx_ip = tx_pkt.getlayer(IPv6)
496         # we can't just get the 2nd Ether layer
497         # get the Raw content and dissect it as Ether
498         tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw]))
499
500         # verify if rx'ed packet has no SRH
501         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
502
503         # the whole rx_eth pkt should be equal to tx_eth1
504         self.assertEqual(rx_eth, tx_eth1)
505
506         self.logger.debug("packet verification: SUCCESS")
507
508     def compare_rx_tx_packet_End_AD_L2_in(self, tx_pkt, rx_pkt):
509         """ Compare input and output packet after passing End.AD
510
511         :param tx_pkt: transmitted packet
512         :param rx_pkt: received packet
513         """
514
515         ####
516         # get first (outer) IPv6 header of rx'ed packet
517         rx_ip = rx_pkt.getlayer(IPv6)
518         # received ip.src should be equal to SR Policy source
519         self.assertEqual(rx_ip.src, self.src_addr)
520         # received ip.dst should be equal to expected sidlist next segment
521         self.assertEqual(rx_ip.dst, self.sid_list[self.test_sid_index + 1])
522
523         # rx'ed packet should have SRH
524         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
525
526         # get SRH
527         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
528         # rx'ed seglist should be equal to SID-list in reversed order
529         self.assertEqual(rx_srh.addresses, self.sid_list[::-1])
530         # segleft should be equal to previous segleft value minus 1
531         self.assertEqual(rx_srh.segleft,
532                          len(self.sid_list) - self.test_sid_index - 2)
533         # lastentry should be equal to the SID-list length minus 1
534         self.assertEqual(rx_srh.lastentry, len(self.sid_list) - 1)
535
536         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
537         tx_ether = tx_pkt.getlayer(Ether)
538         self.assertEqual(Ether(scapy.compat.raw(rx_srh.payload)), tx_ether)
539
540         self.logger.debug("packet verification: SUCCESS")
541
542     def create_stream(self, src_if, dst_if, packet_header, packet_sizes,
543                       count):
544         """Create SRv6 input packet stream for defined interface.
545
546         :param VppInterface src_if: Interface to create packet stream for
547         :param VppInterface dst_if: destination interface of packet stream
548         :param packet_header: Layer3 scapy packet headers,
549                 L2 is added when not provided,
550                 Raw(payload) with packet_info is added
551         :param list packet_sizes: packet stream pckt sizes,sequentially applied
552                to packets in stream have
553         :param int count: number of packets in packet stream
554         :return: list of packets
555         """
556         self.logger.info("Creating packets")
557         pkts = []
558         for i in range(0, count - 1):
559             payload_info = self.create_packet_info(src_if, dst_if)
560             self.logger.debug(
561                 "Creating packet with index %d" % (payload_info.index))
562             payload = self.info_to_payload(payload_info)
563             # add L2 header if not yet provided in packet_header
564             if packet_header.getlayer(0).name == 'Ethernet':
565                 p = packet_header / Raw(payload)
566             else:
567                 p = Ether(dst=src_if.local_mac, src=src_if.remote_mac) / \
568                     packet_header / Raw(payload)
569             size = packet_sizes[i % len(packet_sizes)]
570             self.logger.debug("Packet size %d" % (size))
571             self.extend_packet(p, size)
572             # we need to store the packet with the automatic fields computed
573             # read back the dumped packet (with str())
574             # to force computing these fields
575             # probably other ways are possible
576             p = Ether(scapy.compat.raw(p))
577             payload_info.data = p.copy()
578             self.logger.debug(ppp("Created packet:", p))
579             pkts.append(p)
580         self.logger.info("Done creating packets")
581         return pkts
582
583     def send_and_verify_pkts(self, input, pkts, output, compare_func):
584         """Send packets and verify received packets using compare_func
585
586         :param input: ingress interface of DUT
587         :param pkts: list of packets to transmit
588         :param output: egress interface of DUT
589         :param compare_func: function to compare in and out packets
590         """
591         # add traffic stream to input interface
592         input.add_stream(pkts)
593
594         # enable capture on all interfaces
595         self.pg_enable_capture(self.pg_interfaces)
596
597         # start traffic
598         self.logger.info("Starting traffic")
599         self.pg_start()
600
601         # get output capture
602         self.logger.info("Getting packet capture")
603         capture = output.get_capture()
604
605         # assert nothing was captured on input interface
606         # input.assert_nothing_captured()
607
608         # verify captured packets
609         self.verify_captured_pkts(output, capture, compare_func)
610
611     def create_packet_header_IPv6(self):
612         """Create packet header: IPv6 header, UDP header
613
614         :param dst: IPv6 destination address
615
616         IPv6 source address is 1234::1
617         IPv6 destination address is 4321::1
618         UDP source port and destination port are 1234
619         """
620
621         p = IPv6(src='1234::1', dst='4321::1') / UDP(sport=1234, dport=1234)
622         return p
623
624     def create_packet_header_IPv6_SRH_IPv6(self, srcaddr, sidlist, segleft):
625         """Create packet header: IPv6 encapsulated in SRv6:
626         IPv6 header with SRH, IPv6 header, UDP header
627
628         :param int srcaddr: outer source address
629         :param list sidlist: segment list of outer IPv6 SRH
630         :param int segleft: segments-left field of outer IPv6 SRH
631
632         Outer IPv6 source address is set to srcaddr
633         Outer IPv6 destination address is set to sidlist[segleft]
634         Inner IPv6 source addresses is 1234::1
635         Inner IPv6 destination address is 4321::1
636         UDP source port and destination port are 1234
637         """
638
639         p = IPv6(src=srcaddr, dst=sidlist[segleft]) / \
640             IPv6ExtHdrSegmentRouting(addresses=sidlist,
641                                      segleft=segleft, nh=41) / \
642             IPv6(src='1234::1', dst='4321::1') / \
643             UDP(sport=1234, dport=1234)
644         return p
645
646     def create_packet_header_IPv4(self):
647         """Create packet header: IPv4 header, UDP header
648
649         :param dst: IPv4 destination address
650
651         IPv4 source address is 123.1.1.1
652         IPv4 destination address is 124.1.1.1
653         UDP source port and destination port are 1234
654         """
655
656         p = IP(src='123.1.1.1', dst='124.1.1.1') / UDP(sport=1234, dport=1234)
657         return p
658
659     def create_packet_header_IPv6_SRH_IPv4(self, srcaddr, sidlist, segleft):
660         """Create packet header: IPv4 encapsulated in SRv6:
661         IPv6 header with SRH, IPv4 header, UDP header
662
663         :param int srcaddr: outer source address
664         :param list sidlist: segment list of outer IPv6 SRH
665         :param int segleft: segments-left field of outer IPv6 SRH
666
667         Outer IPv6 source address is set to srcaddr
668         Outer IPv6 destination address is set to sidlist[segleft]
669         Inner IPv4 source address is 123.1.1.1
670         Inner IPv4 destination address is 124.1.1.1
671         UDP source port and destination port are 1234
672         """
673
674         p = IPv6(src=srcaddr, dst=sidlist[segleft]) / \
675             IPv6ExtHdrSegmentRouting(addresses=sidlist,
676                                      segleft=segleft, nh=4) / \
677             IP(src='123.1.1.1', dst='124.1.1.1') / \
678             UDP(sport=1234, dport=1234)
679         return p
680
681     def create_packet_header_L2(self, vlan=0):
682         """Create packet header: L2 header
683
684         :param vlan: if vlan!=0 then add 802.1q header
685         """
686         # Note: the dst addr ('00:55:44:33:22:11') is used in
687         # the compare function compare_rx_tx_packet_T_Encaps_L2
688         # to detect presence of L2 in SRH payload
689         p = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
690         etype = 0x8137  # IPX
691         if vlan:
692             # add 802.1q layer
693             p /= Dot1Q(vlan=vlan, type=etype)
694         else:
695             p.type = etype
696         return p
697
698     def create_packet_header_IPv6_SRH_L2(self, srcaddr, sidlist, segleft,
699                                          vlan=0):
700         """Create packet header: L2 encapsulated in SRv6:
701         IPv6 header with SRH, L2
702
703         :param int srcaddr: IPv6 source address
704         :param list sidlist: segment list of outer IPv6 SRH
705         :param int segleft: segments-left field of outer IPv6 SRH
706         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
707
708         IPv6 source address is set to srcaddr
709         IPv6 destination address is set to sidlist[segleft]
710         """
711         eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
712         etype = 0x8137  # IPX
713         if vlan:
714             # add 802.1q layer
715             eth /= Dot1Q(vlan=vlan, type=etype)
716         else:
717             eth.type = etype
718
719         p = IPv6(src=srcaddr, dst=sidlist[segleft]) / \
720             IPv6ExtHdrSegmentRouting(addresses=sidlist,
721                                      segleft=segleft, nh=59) / \
722             eth
723         return p
724
725     def get_payload_info(self, packet):
726         """ Extract the payload_info from the packet
727         """
728         # in most cases, payload_info is in packet[Raw]
729         # but packet[Raw] gives the complete payload
730         # (incl L2 header) for the T.Encaps L2 case
731         try:
732             payload_info = self.payload_to_info(packet[Raw])
733
734         except:
735             # remote L2 header from packet[Raw]:
736             # take packet[Raw], convert it to an Ether layer
737             # and then extract Raw from it
738             payload_info = self.payload_to_info(
739                 Ether(scapy.compat.raw(packet[Raw]))[Raw])
740
741         return payload_info
742
743     def verify_captured_pkts(self, dst_if, capture, compare_func):
744         """
745         Verify captured packet stream for specified interface.
746         Compare ingress with egress packets using the specified compare fn
747
748         :param dst_if: egress interface of DUT
749         :param capture: captured packets
750         :param compare_func: function to compare in and out packet
751         """
752         self.logger.info("Verifying capture on interface %s using function %s"
753                          % (dst_if.name, compare_func.__name__))
754
755         last_info = dict()
756         for i in self.pg_interfaces:
757             last_info[i.sw_if_index] = None
758         dst_sw_if_index = dst_if.sw_if_index
759
760         for packet in capture:
761             try:
762                 # extract payload_info from packet's payload
763                 payload_info = self.get_payload_info(packet)
764                 packet_index = payload_info.index
765
766                 self.logger.debug("Verifying packet with index %d"
767                                   % (packet_index))
768                 # packet should have arrived on the expected interface
769                 self.assertEqual(payload_info.dst, dst_sw_if_index)
770                 self.logger.debug(
771                     "Got packet on interface %s: src=%u (idx=%u)" %
772                     (dst_if.name, payload_info.src, packet_index))
773
774                 # search for payload_info with same src and dst if_index
775                 # this will give us the transmitted packet
776                 next_info = self.get_next_packet_info_for_interface2(
777                     payload_info.src, dst_sw_if_index,
778                     last_info[payload_info.src])
779                 last_info[payload_info.src] = next_info
780                 # next_info should not be None
781                 self.assertTrue(next_info is not None)
782                 # index of tx and rx packets should be equal
783                 self.assertEqual(packet_index, next_info.index)
784                 # data field of next_info contains the tx packet
785                 txed_packet = next_info.data
786
787                 self.logger.debug(ppp("Transmitted packet:",
788                                       txed_packet))  # ppp=Pretty Print Packet
789
790                 self.logger.debug(ppp("Received packet:", packet))
791
792                 # compare rcvd packet with expected packet using compare_func
793                 compare_func(txed_packet, packet)
794
795             except:
796                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
797                 raise
798
799         # have all expected packets arrived?
800         for i in self.pg_interfaces:
801             remaining_packet = self.get_next_packet_info_for_interface2(
802                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
803             self.assertTrue(remaining_packet is None,
804                             "Interface %s: Packet expected from interface %s "
805                             "didn't arrive" % (dst_if.name, i.name))
806
807
808 if __name__ == '__main__':
809     unittest.main(testRunner=VppTestRunner)