fib: fib api updates
[vpp.git] / test / test_srv6_ad.py
1 #!/usr/bin/env python
2
3 import unittest
4 import binascii
5 from socket import AF_INET6
6
7 from framework import VppTestCase, VppTestRunner
8 from vpp_ip import DpoProto
9 from vpp_ip_route import VppIpRoute, VppRoutePath, VppIpTable
10 from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
11     SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
12
13 import scapy.compat
14 from scapy.packet import Raw
15 from scapy.layers.l2 import Ether, Dot1Q
16 from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
17 from scapy.layers.inet import IP, UDP
18
19 from scapy.utils import inet_pton, inet_ntop
20
21 from util import ppp
22
23
24 class TestSRv6(VppTestCase):
25     """ SRv6 Dynamic Proxy plugin Test Case """
26
27     @classmethod
28     def setUpClass(self):
29         super(TestSRv6, self).setUpClass()
30
31     @classmethod
32     def tearDownClass(cls):
33         super(TestSRv6, cls).tearDownClass()
34
35     def setUp(self):
36         """ Perform test setup before each test case.
37         """
38         super(TestSRv6, self).setUp()
39
40         # packet sizes, inclusive L2 overhead
41         self.pg_packet_sizes = [64, 512, 1518, 9018]
42
43         # reset packet_infos
44         self.reset_packet_infos()
45
46     def tearDown(self):
47         """ Clean up test setup after each test case.
48         """
49         self.teardown_interfaces()
50
51         super(TestSRv6, self).tearDown()
52
53     def configure_interface(self,
54                             interface,
55                             ipv6=False, ipv4=False,
56                             ipv6_table_id=0, ipv4_table_id=0):
57         """ Configure interface.
58         :param ipv6: configure IPv6 on interface
59         :param ipv4: configure IPv4 on interface
60         :param ipv6_table_id: FIB table_id for IPv6
61         :param ipv4_table_id: FIB table_id for IPv4
62         """
63         self.logger.debug("Configuring interface %s" % (interface.name))
64         if ipv6:
65             self.logger.debug("Configuring IPv6")
66             interface.set_table_ip6(ipv6_table_id)
67             interface.config_ip6()
68             interface.resolve_ndp(timeout=5)
69         if ipv4:
70             self.logger.debug("Configuring IPv4")
71             interface.set_table_ip4(ipv4_table_id)
72             interface.config_ip4()
73             interface.resolve_arp()
74         interface.admin_up()
75
76     def setup_interfaces(self, ipv6=[], ipv4=[],
77                          ipv6_table_id=[], ipv4_table_id=[]):
78         """ Create and configure interfaces.
79
80         :param ipv6: list of interface IPv6 capabilities
81         :param ipv4: list of interface IPv4 capabilities
82         :param ipv6_table_id: list of intf IPv6 FIB table_ids
83         :param ipv4_table_id: list of intf IPv4 FIB table_ids
84         :returns: List of created interfaces.
85         """
86         # how many interfaces?
87         if len(ipv6):
88             count = len(ipv6)
89         else:
90             count = len(ipv4)
91         self.logger.debug("Creating and configuring %d interfaces" % (count))
92
93         # fill up ipv6 and ipv4 lists if needed
94         # not enabled (False) is the default
95         if len(ipv6) < count:
96             ipv6 += (count - len(ipv6)) * [False]
97         if len(ipv4) < count:
98             ipv4 += (count - len(ipv4)) * [False]
99
100         # fill up table_id lists if needed
101         # table_id 0 (global) is the default
102         if len(ipv6_table_id) < count:
103             ipv6_table_id += (count - len(ipv6_table_id)) * [0]
104         if len(ipv4_table_id) < count:
105             ipv4_table_id += (count - len(ipv4_table_id)) * [0]
106
107         # create 'count' pg interfaces
108         self.create_pg_interfaces(range(count))
109
110         # setup all interfaces
111         for i in range(count):
112             intf = self.pg_interfaces[i]
113             self.configure_interface(intf,
114                                      ipv6[i], ipv4[i],
115                                      ipv6_table_id[i], ipv4_table_id[i])
116
117         if any(ipv6):
118             self.logger.debug(self.vapi.cli("show ip6 neighbors"))
119         if any(ipv4):
120             self.logger.debug(self.vapi.cli("show ip arp"))
121         self.logger.debug(self.vapi.cli("show interface"))
122         self.logger.debug(self.vapi.cli("show hardware"))
123
124         return self.pg_interfaces
125
126     def teardown_interfaces(self):
127         """ Unconfigure and bring down interface.
128         """
129         self.logger.debug("Tearing down interfaces")
130         # tear down all interfaces
131         # AFAIK they cannot be deleted
132         for i in self.pg_interfaces:
133             self.logger.debug("Tear down interface %s" % (i.name))
134             i.admin_down()
135             i.unconfig()
136             i.set_table_ip4(0)
137             i.set_table_ip6(0)
138
139     def test_SRv6_End_AD_IPv6(self):
140         """ Test SRv6 End.AD behavior with IPv6 traffic.
141         """
142         self.src_addr = 'a0::'
143         self.sid_list = ['a1::', 'a2::a6', 'a3::']
144         self.test_sid_index = 1
145
146         # send traffic to one destination interface
147         # source and destination interfaces are IPv6 only
148         self.setup_interfaces(ipv6=[True, True])
149
150         # configure route to next segment
151         route = VppIpRoute(self, self.sid_list[self.test_sid_index + 1], 128,
152                            [VppRoutePath(self.pg0.remote_ip6,
153                                          self.pg0.sw_if_index,
154                                          proto=DpoProto.DPO_PROTO_IP6)])
155         route.add_vpp_config()
156
157         # configure SRv6 localSID behavior
158         cli_str = "sr localsid address " + \
159                   self.sid_list[self.test_sid_index] + \
160                   " behavior end.ad" + \
161                   " nh " + self.pg1.remote_ip6 + \
162                   " oif " + self.pg1.name + \
163                   " iif " + self.pg1.name
164         self.vapi.cli(cli_str)
165
166         # log the localsids
167         self.logger.debug(self.vapi.cli("show sr localsid"))
168
169         # send one packet per packet size
170         count = len(self.pg_packet_sizes)
171
172         # prepare IPv6 in SRv6 headers
173         packet_header1 = self.create_packet_header_IPv6_SRH_IPv6(
174             srcaddr=self.src_addr,
175             sidlist=self.sid_list[::-1],
176             segleft=len(self.sid_list) - self.test_sid_index - 1)
177
178         # generate packets (pg0->pg1)
179         pkts1 = self.create_stream(self.pg0, self.pg1, packet_header1,
180                                    self.pg_packet_sizes, count)
181
182         # send packets and verify received packets
183         self.send_and_verify_pkts(self.pg0, pkts1, self.pg1,
184                                   self.compare_rx_tx_packet_End_AD_IPv6_out)
185
186         # log the localsid counters
187         self.logger.info(self.vapi.cli("show sr localsid"))
188
189         # prepare IPv6 header for returning packets
190         packet_header2 = self.create_packet_header_IPv6()
191
192         # generate returning packets (pg1->pg0)
193         pkts2 = self.create_stream(self.pg1, self.pg0, packet_header2,
194                                    self.pg_packet_sizes, count)
195
196         # send packets and verify received packets
197         self.send_and_verify_pkts(self.pg1, pkts2, self.pg0,
198                                   self.compare_rx_tx_packet_End_AD_IPv6_in)
199
200         # log the localsid counters
201         self.logger.info(self.vapi.cli("show sr localsid"))
202
203         # remove SRv6 localSIDs
204         cli_str = "sr localsid del address " + \
205                   self.sid_list[self.test_sid_index]
206         self.vapi.cli(cli_str)
207
208         # cleanup interfaces
209         self.teardown_interfaces()
210
211     def compare_rx_tx_packet_End_AD_IPv6_out(self, tx_pkt, rx_pkt):
212         """ Compare input and output packet after passing End.AD with IPv6
213
214         :param tx_pkt: transmitted packet
215         :param rx_pkt: received packet
216         """
217
218         # get first (outer) IPv6 header of rx'ed packet
219         rx_ip = rx_pkt.getlayer(IPv6)
220
221         tx_ip = tx_pkt.getlayer(IPv6)
222         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
223
224         # verify if rx'ed packet has no SRH
225         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
226
227         # the whole rx_ip pkt should be equal to tx_ip2
228         # except for the hlim field
229         #   -> adjust tx'ed hlim to expected hlim
230         tx_ip2.hlim = tx_ip2.hlim - 1
231
232         self.assertEqual(rx_ip, tx_ip2)
233
234         self.logger.debug("packet verification: SUCCESS")
235
236     def compare_rx_tx_packet_End_AD_IPv6_in(self, tx_pkt, rx_pkt):
237         """ Compare input and output packet after passing End.AD
238
239         :param tx_pkt: transmitted packet
240         :param rx_pkt: received packet
241         """
242
243         # get first (outer) IPv6 header of rx'ed packet
244         rx_ip = rx_pkt.getlayer(IPv6)
245         # received ip.src should be equal to SR Policy source
246         self.assertEqual(rx_ip.src, self.src_addr)
247         # received ip.dst should be equal to expected sidlist next segment
248         self.assertEqual(rx_ip.dst, self.sid_list[self.test_sid_index + 1])
249
250         # rx'ed packet should have SRH
251         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
252
253         # get SRH
254         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
255         # rx'ed seglist should be equal to SID-list in reversed order
256         self.assertEqual(rx_srh.addresses, self.sid_list[::-1])
257         # segleft should be equal to previous segleft value minus 1
258         self.assertEqual(rx_srh.segleft,
259                          len(self.sid_list) - self.test_sid_index - 2)
260         # lastentry should be equal to the SID-list length minus 1
261         self.assertEqual(rx_srh.lastentry, len(self.sid_list) - 1)
262
263         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
264         # except for the hop-limit field
265         tx_ip = tx_pkt.getlayer(IPv6)
266         #   -> update tx'ed hlim to the expected hlim
267         tx_ip.hlim -= 1
268         #   -> check payload
269         self.assertEqual(rx_srh.payload, tx_ip)
270
271         self.logger.debug("packet verification: SUCCESS")
272
273     def test_SRv6_End_AD_IPv4(self):
274         """ Test SRv6 End.AD behavior with IPv4 traffic.
275         """
276         self.src_addr = 'a0::'
277         self.sid_list = ['a1::', 'a2::a4', 'a3::']
278         self.test_sid_index = 1
279
280         # send traffic to one destination interface
281         # source and destination interfaces are IPv6 only
282         self.setup_interfaces(ipv6=[True, False], ipv4=[False, True])
283
284         # configure route to next segment
285         route = VppIpRoute(self, self.sid_list[self.test_sid_index + 1], 128,
286                            [VppRoutePath(self.pg0.remote_ip6,
287                                          self.pg0.sw_if_index,
288                                          proto=DpoProto.DPO_PROTO_IP6)])
289         route.add_vpp_config()
290
291         # configure SRv6 localSID behavior
292         cli_str = "sr localsid address " + \
293                   self.sid_list[self.test_sid_index] + \
294                   " behavior end.ad" + \
295                   " nh " + self.pg1.remote_ip4 + \
296                   " oif " + self.pg1.name + \
297                   " iif " + self.pg1.name
298         self.vapi.cli(cli_str)
299
300         # log the localsids
301         self.logger.debug(self.vapi.cli("show sr localsid"))
302
303         # send one packet per packet size
304         count = len(self.pg_packet_sizes)
305
306         # prepare IPv4 in SRv6 headers
307         packet_header1 = self.create_packet_header_IPv6_SRH_IPv4(
308             srcaddr=self.src_addr,
309             sidlist=self.sid_list[::-1],
310             segleft=len(self.sid_list) - self.test_sid_index - 1)
311
312         # generate packets (pg0->pg1)
313         pkts1 = self.create_stream(self.pg0, self.pg1, packet_header1,
314                                    self.pg_packet_sizes, count)
315
316         # send packets and verify received packets
317         self.send_and_verify_pkts(self.pg0, pkts1, self.pg1,
318                                   self.compare_rx_tx_packet_End_AD_IPv4_out)
319
320         # log the localsid counters
321         self.logger.info(self.vapi.cli("show sr localsid"))
322
323         # prepare IPv6 header for returning packets
324         packet_header2 = self.create_packet_header_IPv4()
325
326         # generate returning packets (pg1->pg0)
327         pkts2 = self.create_stream(self.pg1, self.pg0, packet_header2,
328                                    self.pg_packet_sizes, count)
329
330         # send packets and verify received packets
331         self.send_and_verify_pkts(self.pg1, pkts2, self.pg0,
332                                   self.compare_rx_tx_packet_End_AD_IPv4_in)
333
334         # log the localsid counters
335         self.logger.info(self.vapi.cli("show sr localsid"))
336
337         # remove SRv6 localSIDs
338         cli_str = "sr localsid del address " + \
339                   self.sid_list[self.test_sid_index]
340         self.vapi.cli(cli_str)
341
342         # cleanup interfaces
343         self.teardown_interfaces()
344
345     def compare_rx_tx_packet_End_AD_IPv4_out(self, tx_pkt, rx_pkt):
346         """ Compare input and output packet after passing End.AD with IPv4
347
348         :param tx_pkt: transmitted packet
349         :param rx_pkt: received packet
350         """
351
352         # get IPv4 header of rx'ed packet
353         rx_ip = rx_pkt.getlayer(IP)
354
355         tx_ip = tx_pkt.getlayer(IPv6)
356         tx_ip2 = tx_pkt.getlayer(IP)
357
358         # verify if rx'ed packet has no SRH
359         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
360
361         # the whole rx_ip pkt should be equal to tx_ip2
362         # except for the ttl field and ip checksum
363         #   -> adjust tx'ed ttl to expected ttl
364         tx_ip2.ttl = tx_ip2.ttl - 1
365         #   -> set tx'ed ip checksum to None and let scapy recompute
366         tx_ip2.chksum = None
367         # read back the pkt (with str()) to force computing these fields
368         # probably other ways to accomplish this are possible
369         tx_ip2 = IP(scapy.compat.raw(tx_ip2))
370
371         self.assertEqual(rx_ip, tx_ip2)
372
373         self.logger.debug("packet verification: SUCCESS")
374
375     def compare_rx_tx_packet_End_AD_IPv4_in(self, tx_pkt, rx_pkt):
376         """ Compare input and output packet after passing End.AD
377
378         :param tx_pkt: transmitted packet
379         :param rx_pkt: received packet
380         """
381
382         # get first (outer) IPv6 header of rx'ed packet
383         rx_ip = rx_pkt.getlayer(IPv6)
384         # received ip.src should be equal to SR Policy source
385         self.assertEqual(rx_ip.src, self.src_addr)
386         # received ip.dst should be equal to expected sidlist next segment
387         self.assertEqual(rx_ip.dst, self.sid_list[self.test_sid_index + 1])
388
389         # rx'ed packet should have SRH
390         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
391
392         # get SRH
393         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
394         # rx'ed seglist should be equal to SID-list in reversed order
395         self.assertEqual(rx_srh.addresses, self.sid_list[::-1])
396         # segleft should be equal to previous segleft value minus 1
397         self.assertEqual(rx_srh.segleft,
398                          len(self.sid_list) - self.test_sid_index - 2)
399         # lastentry should be equal to the SID-list length minus 1
400         self.assertEqual(rx_srh.lastentry, len(self.sid_list) - 1)
401
402         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
403         # except for the ttl field and ip checksum
404         tx_ip = tx_pkt.getlayer(IP)
405         #   -> adjust tx'ed ttl to expected ttl
406         tx_ip.ttl = tx_ip.ttl - 1
407         #   -> set tx'ed ip checksum to None and let scapy recompute
408         tx_ip.chksum = None
409         #   -> read back the pkt (with str()) to force computing these fields
410         # probably other ways to accomplish this are possible
411         self.assertEqual(rx_srh.payload, IP(scapy.compat.raw(tx_ip)))
412
413         self.logger.debug("packet verification: SUCCESS")
414
415     def test_SRv6_End_AD_L2(self):
416         """ Test SRv6 End.AD behavior with L2 traffic.
417         """
418         self.src_addr = 'a0::'
419         self.sid_list = ['a1::', 'a2::a4', 'a3::']
420         self.test_sid_index = 1
421
422         # send traffic to one destination interface
423         # source and destination interfaces are IPv6 only
424         self.setup_interfaces(ipv6=[True, False])
425
426         # configure route to next segment
427         route = VppIpRoute(self, self.sid_list[self.test_sid_index + 1], 128,
428                            [VppRoutePath(self.pg0.remote_ip6,
429                                          self.pg0.sw_if_index,
430                                          proto=DpoProto.DPO_PROTO_IP6)])
431         route.add_vpp_config()
432
433         # configure SRv6 localSID behavior
434         cli_str = "sr localsid address " + \
435                   self.sid_list[self.test_sid_index] + \
436                   " behavior end.ad" + \
437                   " oif " + self.pg1.name + \
438                   " iif " + self.pg1.name
439         self.vapi.cli(cli_str)
440
441         # log the localsids
442         self.logger.debug(self.vapi.cli("show sr localsid"))
443
444         # send one packet per packet size
445         count = len(self.pg_packet_sizes)
446
447         # prepare L2 in SRv6 headers
448         packet_header1 = self.create_packet_header_IPv6_SRH_L2(
449             srcaddr=self.src_addr,
450             sidlist=self.sid_list[::-1],
451             segleft=len(self.sid_list) - self.test_sid_index - 1,
452             vlan=0)
453
454         # generate packets (pg0->pg1)
455         pkts1 = self.create_stream(self.pg0, self.pg1, packet_header1,
456                                    self.pg_packet_sizes, count)
457
458         # send packets and verify received packets
459         self.send_and_verify_pkts(self.pg0, pkts1, self.pg1,
460                                   self.compare_rx_tx_packet_End_AD_L2_out)
461
462         # log the localsid counters
463         self.logger.info(self.vapi.cli("show sr localsid"))
464
465         # prepare L2 header for returning packets
466         packet_header2 = self.create_packet_header_L2()
467
468         # generate returning packets (pg1->pg0)
469         pkts2 = self.create_stream(self.pg1, self.pg0, packet_header2,
470                                    self.pg_packet_sizes, count)
471
472         # send packets and verify received packets
473         self.send_and_verify_pkts(self.pg1, pkts2, self.pg0,
474                                   self.compare_rx_tx_packet_End_AD_L2_in)
475
476         # log the localsid counters
477         self.logger.info(self.vapi.cli("show sr localsid"))
478
479         # remove SRv6 localSIDs
480         cli_str = "sr localsid del address " + \
481                   self.sid_list[self.test_sid_index]
482         self.vapi.cli(cli_str)
483
484         # cleanup interfaces
485         self.teardown_interfaces()
486
487     def compare_rx_tx_packet_End_AD_L2_out(self, tx_pkt, rx_pkt):
488         """ Compare input and output packet after passing End.AD with L2
489
490         :param tx_pkt: transmitted packet
491         :param rx_pkt: received packet
492         """
493
494         # get IPv4 header of rx'ed packet
495         rx_eth = rx_pkt.getlayer(Ether)
496
497         tx_ip = tx_pkt.getlayer(IPv6)
498         # we can't just get the 2nd Ether layer
499         # get the Raw content and dissect it as Ether
500         tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw]))
501
502         # verify if rx'ed packet has no SRH
503         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
504
505         # the whole rx_eth pkt should be equal to tx_eth1
506         self.assertEqual(rx_eth, tx_eth1)
507
508         self.logger.debug("packet verification: SUCCESS")
509
510     def compare_rx_tx_packet_End_AD_L2_in(self, tx_pkt, rx_pkt):
511         """ Compare input and output packet after passing End.AD
512
513         :param tx_pkt: transmitted packet
514         :param rx_pkt: received packet
515         """
516
517         ####
518         # get first (outer) IPv6 header of rx'ed packet
519         rx_ip = rx_pkt.getlayer(IPv6)
520         # received ip.src should be equal to SR Policy source
521         self.assertEqual(rx_ip.src, self.src_addr)
522         # received ip.dst should be equal to expected sidlist next segment
523         self.assertEqual(rx_ip.dst, self.sid_list[self.test_sid_index + 1])
524
525         # rx'ed packet should have SRH
526         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
527
528         # get SRH
529         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
530         # rx'ed seglist should be equal to SID-list in reversed order
531         self.assertEqual(rx_srh.addresses, self.sid_list[::-1])
532         # segleft should be equal to previous segleft value minus 1
533         self.assertEqual(rx_srh.segleft,
534                          len(self.sid_list) - self.test_sid_index - 2)
535         # lastentry should be equal to the SID-list length minus 1
536         self.assertEqual(rx_srh.lastentry, len(self.sid_list) - 1)
537
538         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
539         tx_ether = tx_pkt.getlayer(Ether)
540         self.assertEqual(Ether(scapy.compat.raw(rx_srh.payload)), tx_ether)
541
542         self.logger.debug("packet verification: SUCCESS")
543
544     def create_stream(self, src_if, dst_if, packet_header, packet_sizes,
545                       count):
546         """Create SRv6 input packet stream for defined interface.
547
548         :param VppInterface src_if: Interface to create packet stream for
549         :param VppInterface dst_if: destination interface of packet stream
550         :param packet_header: Layer3 scapy packet headers,
551                 L2 is added when not provided,
552                 Raw(payload) with packet_info is added
553         :param list packet_sizes: packet stream pckt sizes,sequentially applied
554                to packets in stream have
555         :param int count: number of packets in packet stream
556         :return: list of packets
557         """
558         self.logger.info("Creating packets")
559         pkts = []
560         for i in range(0, count - 1):
561             payload_info = self.create_packet_info(src_if, dst_if)
562             self.logger.debug(
563                 "Creating packet with index %d" % (payload_info.index))
564             payload = self.info_to_payload(payload_info)
565             # add L2 header if not yet provided in packet_header
566             if packet_header.getlayer(0).name == 'Ethernet':
567                 p = packet_header / Raw(payload)
568             else:
569                 p = Ether(dst=src_if.local_mac, src=src_if.remote_mac) / \
570                     packet_header / Raw(payload)
571             size = packet_sizes[i % len(packet_sizes)]
572             self.logger.debug("Packet size %d" % (size))
573             self.extend_packet(p, size)
574             # we need to store the packet with the automatic fields computed
575             # read back the dumped packet (with str())
576             # to force computing these fields
577             # probably other ways are possible
578             p = Ether(scapy.compat.raw(p))
579             payload_info.data = p.copy()
580             self.logger.debug(ppp("Created packet:", p))
581             pkts.append(p)
582         self.logger.info("Done creating packets")
583         return pkts
584
585     def send_and_verify_pkts(self, input, pkts, output, compare_func):
586         """Send packets and verify received packets using compare_func
587
588         :param input: ingress interface of DUT
589         :param pkts: list of packets to transmit
590         :param output: egress interface of DUT
591         :param compare_func: function to compare in and out packets
592         """
593         # add traffic stream to input interface
594         input.add_stream(pkts)
595
596         # enable capture on all interfaces
597         self.pg_enable_capture(self.pg_interfaces)
598
599         # start traffic
600         self.logger.info("Starting traffic")
601         self.pg_start()
602
603         # get output capture
604         self.logger.info("Getting packet capture")
605         capture = output.get_capture()
606
607         # assert nothing was captured on input interface
608         # input.assert_nothing_captured()
609
610         # verify captured packets
611         self.verify_captured_pkts(output, capture, compare_func)
612
613     def create_packet_header_IPv6(self):
614         """Create packet header: IPv6 header, UDP header
615
616         :param dst: IPv6 destination address
617
618         IPv6 source address is 1234::1
619         IPv6 destination address is 4321::1
620         UDP source port and destination port are 1234
621         """
622
623         p = IPv6(src='1234::1', dst='4321::1') / UDP(sport=1234, dport=1234)
624         return p
625
626     def create_packet_header_IPv6_SRH_IPv6(self, srcaddr, sidlist, segleft):
627         """Create packet header: IPv6 encapsulated in SRv6:
628         IPv6 header with SRH, IPv6 header, UDP header
629
630         :param int srcaddr: outer source address
631         :param list sidlist: segment list of outer IPv6 SRH
632         :param int segleft: segments-left field of outer IPv6 SRH
633
634         Outer IPv6 source address is set to srcaddr
635         Outer IPv6 destination address is set to sidlist[segleft]
636         Inner IPv6 source addresses is 1234::1
637         Inner IPv6 destination address is 4321::1
638         UDP source port and destination port are 1234
639         """
640
641         p = IPv6(src=srcaddr, dst=sidlist[segleft]) / \
642             IPv6ExtHdrSegmentRouting(addresses=sidlist,
643                                      segleft=segleft, nh=41) / \
644             IPv6(src='1234::1', dst='4321::1') / \
645             UDP(sport=1234, dport=1234)
646         return p
647
648     def create_packet_header_IPv4(self):
649         """Create packet header: IPv4 header, UDP header
650
651         :param dst: IPv4 destination address
652
653         IPv4 source address is 123.1.1.1
654         IPv4 destination address is 124.1.1.1
655         UDP source port and destination port are 1234
656         """
657
658         p = IP(src='123.1.1.1', dst='124.1.1.1') / UDP(sport=1234, dport=1234)
659         return p
660
661     def create_packet_header_IPv6_SRH_IPv4(self, srcaddr, sidlist, segleft):
662         """Create packet header: IPv4 encapsulated in SRv6:
663         IPv6 header with SRH, IPv4 header, UDP header
664
665         :param int srcaddr: outer source address
666         :param list sidlist: segment list of outer IPv6 SRH
667         :param int segleft: segments-left field of outer IPv6 SRH
668
669         Outer IPv6 source address is set to srcaddr
670         Outer IPv6 destination address is set to sidlist[segleft]
671         Inner IPv4 source address is 123.1.1.1
672         Inner IPv4 destination address is 124.1.1.1
673         UDP source port and destination port are 1234
674         """
675
676         p = IPv6(src=srcaddr, dst=sidlist[segleft]) / \
677             IPv6ExtHdrSegmentRouting(addresses=sidlist,
678                                      segleft=segleft, nh=4) / \
679             IP(src='123.1.1.1', dst='124.1.1.1') / \
680             UDP(sport=1234, dport=1234)
681         return p
682
683     def create_packet_header_L2(self, vlan=0):
684         """Create packet header: L2 header
685
686         :param vlan: if vlan!=0 then add 802.1q header
687         """
688         # Note: the dst addr ('00:55:44:33:22:11') is used in
689         # the compare function compare_rx_tx_packet_T_Encaps_L2
690         # to detect presence of L2 in SRH payload
691         p = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
692         etype = 0x8137  # IPX
693         if vlan:
694             # add 802.1q layer
695             p /= Dot1Q(vlan=vlan, type=etype)
696         else:
697             p.type = etype
698         return p
699
700     def create_packet_header_IPv6_SRH_L2(self, srcaddr, sidlist, segleft,
701                                          vlan=0):
702         """Create packet header: L2 encapsulated in SRv6:
703         IPv6 header with SRH, L2
704
705         :param int srcaddr: IPv6 source address
706         :param list sidlist: segment list of outer IPv6 SRH
707         :param int segleft: segments-left field of outer IPv6 SRH
708         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
709
710         IPv6 source address is set to srcaddr
711         IPv6 destination address is set to sidlist[segleft]
712         """
713         eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
714         etype = 0x8137  # IPX
715         if vlan:
716             # add 802.1q layer
717             eth /= Dot1Q(vlan=vlan, type=etype)
718         else:
719             eth.type = etype
720
721         p = IPv6(src=srcaddr, dst=sidlist[segleft]) / \
722             IPv6ExtHdrSegmentRouting(addresses=sidlist,
723                                      segleft=segleft, nh=59) / \
724             eth
725         return p
726
727     def get_payload_info(self, packet):
728         """ Extract the payload_info from the packet
729         """
730         # in most cases, payload_info is in packet[Raw]
731         # but packet[Raw] gives the complete payload
732         # (incl L2 header) for the T.Encaps L2 case
733         try:
734             payload_info = self.payload_to_info(packet[Raw])
735
736         except:
737             # remote L2 header from packet[Raw]:
738             # take packet[Raw], convert it to an Ether layer
739             # and then extract Raw from it
740             payload_info = self.payload_to_info(
741                 Ether(scapy.compat.raw(packet[Raw]))[Raw])
742
743         return payload_info
744
745     def verify_captured_pkts(self, dst_if, capture, compare_func):
746         """
747         Verify captured packet stream for specified interface.
748         Compare ingress with egress packets using the specified compare fn
749
750         :param dst_if: egress interface of DUT
751         :param capture: captured packets
752         :param compare_func: function to compare in and out packet
753         """
754         self.logger.info("Verifying capture on interface %s using function %s"
755                          % (dst_if.name, compare_func.__name__))
756
757         last_info = dict()
758         for i in self.pg_interfaces:
759             last_info[i.sw_if_index] = None
760         dst_sw_if_index = dst_if.sw_if_index
761
762         for packet in capture:
763             try:
764                 # extract payload_info from packet's payload
765                 payload_info = self.get_payload_info(packet)
766                 packet_index = payload_info.index
767
768                 self.logger.debug("Verifying packet with index %d"
769                                   % (packet_index))
770                 # packet should have arrived on the expected interface
771                 self.assertEqual(payload_info.dst, dst_sw_if_index)
772                 self.logger.debug(
773                     "Got packet on interface %s: src=%u (idx=%u)" %
774                     (dst_if.name, payload_info.src, packet_index))
775
776                 # search for payload_info with same src and dst if_index
777                 # this will give us the transmitted packet
778                 next_info = self.get_next_packet_info_for_interface2(
779                     payload_info.src, dst_sw_if_index,
780                     last_info[payload_info.src])
781                 last_info[payload_info.src] = next_info
782                 # next_info should not be None
783                 self.assertTrue(next_info is not None)
784                 # index of tx and rx packets should be equal
785                 self.assertEqual(packet_index, next_info.index)
786                 # data field of next_info contains the tx packet
787                 txed_packet = next_info.data
788
789                 self.logger.debug(ppp("Transmitted packet:",
790                                       txed_packet))  # ppp=Pretty Print Packet
791
792                 self.logger.debug(ppp("Received packet:", packet))
793
794                 # compare rcvd packet with expected packet using compare_func
795                 compare_func(txed_packet, packet)
796
797             except:
798                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
799                 raise
800
801         # have all expected packets arrived?
802         for i in self.pg_interfaces:
803             remaining_packet = self.get_next_packet_info_for_interface2(
804                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
805             self.assertTrue(remaining_packet is None,
806                             "Interface %s: Packet expected from interface %s "
807                             "didn't arrive" % (dst_if.name, i.name))
808
809
810 if __name__ == '__main__':
811     unittest.main(testRunner=VppTestRunner)