75d9d12da4802eaf341c6bec55566ab96877b236
[vpp.git] / test / test_srv6_ad.py
1 #!/usr/bin/env python
2
3 import unittest
4 import binascii
5 from socket import AF_INET6
6
7 from framework import VppTestCase, VppTestRunner
8 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto, VppIpTable
9 from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
10     SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
11
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether, Dot1Q
14 from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
15 from scapy.layers.inet import IP, UDP
16
17 from scapy.utils import inet_pton, inet_ntop
18
19 from util import ppp
20
21
22 class TestSRv6(VppTestCase):
23     """ SRv6 Dynamic Proxy plugin Test Case """
24
25     @classmethod
26     def setUpClass(self):
27         super(TestSRv6, self).setUpClass()
28
29     def setUp(self):
30         """ Perform test setup before each test case.
31         """
32         super(TestSRv6, self).setUp()
33
34         # packet sizes, inclusive L2 overhead
35         self.pg_packet_sizes = [64, 512, 1518, 9018]
36
37         # reset packet_infos
38         self.reset_packet_infos()
39
40     def tearDown(self):
41         """ Clean up test setup after each test case.
42         """
43         self.teardown_interfaces()
44
45         super(TestSRv6, self).tearDown()
46
47     def configure_interface(self,
48                             interface,
49                             ipv6=False, ipv4=False,
50                             ipv6_table_id=0, ipv4_table_id=0):
51         """ Configure interface.
52         :param ipv6: configure IPv6 on interface
53         :param ipv4: configure IPv4 on interface
54         :param ipv6_table_id: FIB table_id for IPv6
55         :param ipv4_table_id: FIB table_id for IPv4
56         """
57         self.logger.debug("Configuring interface %s" % (interface.name))
58         if ipv6:
59             self.logger.debug("Configuring IPv6")
60             interface.set_table_ip6(ipv6_table_id)
61             interface.config_ip6()
62             interface.resolve_ndp(timeout=5)
63         if ipv4:
64             self.logger.debug("Configuring IPv4")
65             interface.set_table_ip4(ipv4_table_id)
66             interface.config_ip4()
67             interface.resolve_arp()
68         interface.admin_up()
69
70     def setup_interfaces(self, ipv6=[], ipv4=[],
71                          ipv6_table_id=[], ipv4_table_id=[]):
72         """ Create and configure interfaces.
73
74         :param ipv6: list of interface IPv6 capabilities
75         :param ipv4: list of interface IPv4 capabilities
76         :param ipv6_table_id: list of intf IPv6 FIB table_ids
77         :param ipv4_table_id: list of intf IPv4 FIB table_ids
78         :returns: List of created interfaces.
79         """
80         # how many interfaces?
81         if len(ipv6):
82             count = len(ipv6)
83         else:
84             count = len(ipv4)
85         self.logger.debug("Creating and configuring %d interfaces" % (count))
86
87         # fill up ipv6 and ipv4 lists if needed
88         # not enabled (False) is the default
89         if len(ipv6) < count:
90             ipv6 += (count - len(ipv6)) * [False]
91         if len(ipv4) < count:
92             ipv4 += (count - len(ipv4)) * [False]
93
94         # fill up table_id lists if needed
95         # table_id 0 (global) is the default
96         if len(ipv6_table_id) < count:
97             ipv6_table_id += (count - len(ipv6_table_id)) * [0]
98         if len(ipv4_table_id) < count:
99             ipv4_table_id += (count - len(ipv4_table_id)) * [0]
100
101         # create 'count' pg interfaces
102         self.create_pg_interfaces(range(count))
103
104         # setup all interfaces
105         for i in range(count):
106             intf = self.pg_interfaces[i]
107             self.configure_interface(intf,
108                                      ipv6[i], ipv4[i],
109                                      ipv6_table_id[i], ipv4_table_id[i])
110
111         if any(ipv6):
112             self.logger.debug(self.vapi.cli("show ip6 neighbors"))
113         if any(ipv4):
114             self.logger.debug(self.vapi.cli("show ip arp"))
115         self.logger.debug(self.vapi.cli("show interface"))
116         self.logger.debug(self.vapi.cli("show hardware"))
117
118         return self.pg_interfaces
119
120     def teardown_interfaces(self):
121         """ Unconfigure and bring down interface.
122         """
123         self.logger.debug("Tearing down interfaces")
124         # tear down all interfaces
125         # AFAIK they cannot be deleted
126         for i in self.pg_interfaces:
127             self.logger.debug("Tear down interface %s" % (i.name))
128             i.admin_down()
129             i.unconfig()
130             i.set_table_ip4(0)
131             i.set_table_ip6(0)
132
133     def test_SRv6_End_AD_IPv6(self):
134         """ Test SRv6 End.AD behavior with IPv6 traffic.
135         """
136         self.src_addr = 'a0::'
137         self.sid_list = ['a1::', 'a2::a6', 'a3::']
138         self.test_sid_index = 1
139
140         # send traffic to one destination interface
141         # source and destination interfaces are IPv6 only
142         self.setup_interfaces(ipv6=[True, True])
143
144         # configure route to next segment
145         route = VppIpRoute(self, self.sid_list[self.test_sid_index + 1], 128,
146                            [VppRoutePath(self.pg0.remote_ip6,
147                                          self.pg0.sw_if_index,
148                                          proto=DpoProto.DPO_PROTO_IP6)],
149                            is_ip6=1)
150         route.add_vpp_config()
151
152         # configure SRv6 localSID behavior
153         cli_str = "sr localsid address " + \
154                   self.sid_list[self.test_sid_index] + \
155                   " behavior end.ad" + \
156                   " nh " + self.pg1.remote_ip6 + \
157                   " oif " + self.pg1.name + \
158                   " iif " + self.pg1.name
159         self.vapi.cli(cli_str)
160
161         # log the localsids
162         self.logger.debug(self.vapi.cli("show sr localsid"))
163
164         # send one packet per packet size
165         count = len(self.pg_packet_sizes)
166
167         # prepare IPv6 in SRv6 headers
168         packet_header1 = self.create_packet_header_IPv6_SRH_IPv6(
169             srcaddr=self.src_addr,
170             sidlist=self.sid_list[::-1],
171             segleft=len(self.sid_list) - self.test_sid_index - 1)
172
173         # generate packets (pg0->pg1)
174         pkts1 = self.create_stream(self.pg0, self.pg1, packet_header1,
175                                    self.pg_packet_sizes, count)
176
177         # send packets and verify received packets
178         self.send_and_verify_pkts(self.pg0, pkts1, self.pg1,
179                                   self.compare_rx_tx_packet_End_AD_IPv6_out)
180
181         # log the localsid counters
182         self.logger.info(self.vapi.cli("show sr localsid"))
183
184         # prepare IPv6 header for returning packets
185         packet_header2 = self.create_packet_header_IPv6()
186
187         # generate returning packets (pg1->pg0)
188         pkts2 = self.create_stream(self.pg1, self.pg0, packet_header2,
189                                    self.pg_packet_sizes, count)
190
191         # send packets and verify received packets
192         self.send_and_verify_pkts(self.pg1, pkts2, self.pg0,
193                                   self.compare_rx_tx_packet_End_AD_IPv6_in)
194
195         # log the localsid counters
196         self.logger.info(self.vapi.cli("show sr localsid"))
197
198         # remove SRv6 localSIDs
199         cli_str = "sr localsid del address " + \
200                   self.sid_list[self.test_sid_index]
201         self.vapi.cli(cli_str)
202
203         # cleanup interfaces
204         self.teardown_interfaces()
205
206     def compare_rx_tx_packet_End_AD_IPv6_out(self, tx_pkt, rx_pkt):
207         """ Compare input and output packet after passing End.AD with IPv6
208
209         :param tx_pkt: transmitted packet
210         :param rx_pkt: received packet
211         """
212
213         # get first (outer) IPv6 header of rx'ed packet
214         rx_ip = rx_pkt.getlayer(IPv6)
215
216         tx_ip = tx_pkt.getlayer(IPv6)
217         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
218
219         # verify if rx'ed packet has no SRH
220         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
221
222         # the whole rx_ip pkt should be equal to tx_ip2
223         # except for the hlim field
224         #   -> adjust tx'ed hlim to expected hlim
225         tx_ip2.hlim = tx_ip2.hlim - 1
226
227         self.assertEqual(rx_ip, tx_ip2)
228
229         self.logger.debug("packet verification: SUCCESS")
230
231     def compare_rx_tx_packet_End_AD_IPv6_in(self, tx_pkt, rx_pkt):
232         """ Compare input and output packet after passing End.AD
233
234         :param tx_pkt: transmitted packet
235         :param rx_pkt: received packet
236         """
237
238         # get first (outer) IPv6 header of rx'ed packet
239         rx_ip = rx_pkt.getlayer(IPv6)
240         # received ip.src should be equal to SR Policy source
241         self.assertEqual(rx_ip.src, self.src_addr)
242         # received ip.dst should be equal to expected sidlist next segment
243         self.assertEqual(rx_ip.dst, self.sid_list[self.test_sid_index + 1])
244
245         # rx'ed packet should have SRH
246         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
247
248         # get SRH
249         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
250         # rx'ed seglist should be equal to SID-list in reversed order
251         self.assertEqual(rx_srh.addresses, self.sid_list[::-1])
252         # segleft should be equal to previous segleft value minus 1
253         self.assertEqual(rx_srh.segleft,
254                          len(self.sid_list) - self.test_sid_index - 2)
255         # lastentry should be equal to the SID-list length minus 1
256         self.assertEqual(rx_srh.lastentry, len(self.sid_list) - 1)
257
258         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
259         # except for the hop-limit field
260         tx_ip = tx_pkt.getlayer(IPv6)
261         #   -> update tx'ed hlim to the expected hlim
262         tx_ip.hlim -= 1
263         #   -> check payload
264         self.assertEqual(rx_srh.payload, tx_ip)
265
266         self.logger.debug("packet verification: SUCCESS")
267
268     def test_SRv6_End_AD_IPv4(self):
269         """ Test SRv6 End.AD behavior with IPv4 traffic.
270         """
271         self.src_addr = 'a0::'
272         self.sid_list = ['a1::', 'a2::a4', 'a3::']
273         self.test_sid_index = 1
274
275         # send traffic to one destination interface
276         # source and destination interfaces are IPv6 only
277         self.setup_interfaces(ipv6=[True, False], ipv4=[False, True])
278
279         # configure route to next segment
280         route = VppIpRoute(self, self.sid_list[self.test_sid_index + 1], 128,
281                            [VppRoutePath(self.pg0.remote_ip6,
282                                          self.pg0.sw_if_index,
283                                          proto=DpoProto.DPO_PROTO_IP6)],
284                            is_ip6=1)
285         route.add_vpp_config()
286
287         # configure SRv6 localSID behavior
288         cli_str = "sr localsid address " + \
289                   self.sid_list[self.test_sid_index] + \
290                   " behavior end.ad" + \
291                   " nh " + self.pg1.remote_ip4 + \
292                   " oif " + self.pg1.name + \
293                   " iif " + self.pg1.name
294         self.vapi.cli(cli_str)
295
296         # log the localsids
297         self.logger.debug(self.vapi.cli("show sr localsid"))
298
299         # send one packet per packet size
300         count = len(self.pg_packet_sizes)
301
302         # prepare IPv4 in SRv6 headers
303         packet_header1 = self.create_packet_header_IPv6_SRH_IPv4(
304             srcaddr=self.src_addr,
305             sidlist=self.sid_list[::-1],
306             segleft=len(self.sid_list) - self.test_sid_index - 1)
307
308         # generate packets (pg0->pg1)
309         pkts1 = self.create_stream(self.pg0, self.pg1, packet_header1,
310                                    self.pg_packet_sizes, count)
311
312         # send packets and verify received packets
313         self.send_and_verify_pkts(self.pg0, pkts1, self.pg1,
314                                   self.compare_rx_tx_packet_End_AD_IPv4_out)
315
316         # log the localsid counters
317         self.logger.info(self.vapi.cli("show sr localsid"))
318
319         # prepare IPv6 header for returning packets
320         packet_header2 = self.create_packet_header_IPv4()
321
322         # generate returning packets (pg1->pg0)
323         pkts2 = self.create_stream(self.pg1, self.pg0, packet_header2,
324                                    self.pg_packet_sizes, count)
325
326         # send packets and verify received packets
327         self.send_and_verify_pkts(self.pg1, pkts2, self.pg0,
328                                   self.compare_rx_tx_packet_End_AD_IPv4_in)
329
330         # log the localsid counters
331         self.logger.info(self.vapi.cli("show sr localsid"))
332
333         # remove SRv6 localSIDs
334         cli_str = "sr localsid del address " + \
335                   self.sid_list[self.test_sid_index]
336         self.vapi.cli(cli_str)
337
338         # cleanup interfaces
339         self.teardown_interfaces()
340
341     def compare_rx_tx_packet_End_AD_IPv4_out(self, tx_pkt, rx_pkt):
342         """ Compare input and output packet after passing End.AD with IPv4
343
344         :param tx_pkt: transmitted packet
345         :param rx_pkt: received packet
346         """
347
348         # get IPv4 header of rx'ed packet
349         rx_ip = rx_pkt.getlayer(IP)
350
351         tx_ip = tx_pkt.getlayer(IPv6)
352         tx_ip2 = tx_pkt.getlayer(IP)
353
354         # verify if rx'ed packet has no SRH
355         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
356
357         # the whole rx_ip pkt should be equal to tx_ip2
358         # except for the ttl field and ip checksum
359         #   -> adjust tx'ed ttl to expected ttl
360         tx_ip2.ttl = tx_ip2.ttl - 1
361         #   -> set tx'ed ip checksum to None and let scapy recompute
362         tx_ip2.chksum = None
363         # read back the pkt (with str()) to force computing these fields
364         # probably other ways to accomplish this are possible
365         tx_ip2 = IP(str(tx_ip2))
366
367         self.assertEqual(rx_ip, tx_ip2)
368
369         self.logger.debug("packet verification: SUCCESS")
370
371     def compare_rx_tx_packet_End_AD_IPv4_in(self, tx_pkt, rx_pkt):
372         """ Compare input and output packet after passing End.AD
373
374         :param tx_pkt: transmitted packet
375         :param rx_pkt: received packet
376         """
377
378         # get first (outer) IPv6 header of rx'ed packet
379         rx_ip = rx_pkt.getlayer(IPv6)
380         # received ip.src should be equal to SR Policy source
381         self.assertEqual(rx_ip.src, self.src_addr)
382         # received ip.dst should be equal to expected sidlist next segment
383         self.assertEqual(rx_ip.dst, self.sid_list[self.test_sid_index + 1])
384
385         # rx'ed packet should have SRH
386         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
387
388         # get SRH
389         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
390         # rx'ed seglist should be equal to SID-list in reversed order
391         self.assertEqual(rx_srh.addresses, self.sid_list[::-1])
392         # segleft should be equal to previous segleft value minus 1
393         self.assertEqual(rx_srh.segleft,
394                          len(self.sid_list) - self.test_sid_index - 2)
395         # lastentry should be equal to the SID-list length minus 1
396         self.assertEqual(rx_srh.lastentry, len(self.sid_list) - 1)
397
398         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
399         # except for the ttl field and ip checksum
400         tx_ip = tx_pkt.getlayer(IP)
401         #   -> adjust tx'ed ttl to expected ttl
402         tx_ip.ttl = tx_ip.ttl - 1
403         #   -> set tx'ed ip checksum to None and let scapy recompute
404         tx_ip.chksum = None
405         #   -> read back the pkt (with str()) to force computing these fields
406         # probably other ways to accomplish this are possible
407         self.assertEqual(rx_srh.payload, IP(str(tx_ip)))
408
409         self.logger.debug("packet verification: SUCCESS")
410
411     def test_SRv6_End_AD_L2(self):
412         """ Test SRv6 End.AD behavior with L2 traffic.
413         """
414         self.src_addr = 'a0::'
415         self.sid_list = ['a1::', 'a2::a4', 'a3::']
416         self.test_sid_index = 1
417
418         # send traffic to one destination interface
419         # source and destination interfaces are IPv6 only
420         self.setup_interfaces(ipv6=[True, False])
421
422         # configure route to next segment
423         route = VppIpRoute(self, self.sid_list[self.test_sid_index + 1], 128,
424                            [VppRoutePath(self.pg0.remote_ip6,
425                                          self.pg0.sw_if_index,
426                                          proto=DpoProto.DPO_PROTO_IP6)],
427                            is_ip6=1)
428         route.add_vpp_config()
429
430         # configure SRv6 localSID behavior
431         cli_str = "sr localsid address " + \
432                   self.sid_list[self.test_sid_index] + \
433                   " behavior end.ad" + \
434                   " oif " + self.pg1.name + \
435                   " iif " + self.pg1.name
436         self.vapi.cli(cli_str)
437
438         # log the localsids
439         self.logger.debug(self.vapi.cli("show sr localsid"))
440
441         # send one packet per packet size
442         count = len(self.pg_packet_sizes)
443
444         # prepare L2 in SRv6 headers
445         packet_header1 = self.create_packet_header_IPv6_SRH_L2(
446             srcaddr=self.src_addr,
447             sidlist=self.sid_list[::-1],
448             segleft=len(self.sid_list) - self.test_sid_index - 1,
449             vlan=0)
450
451         # generate packets (pg0->pg1)
452         pkts1 = self.create_stream(self.pg0, self.pg1, packet_header1,
453                                    self.pg_packet_sizes, count)
454
455         # send packets and verify received packets
456         self.send_and_verify_pkts(self.pg0, pkts1, self.pg1,
457                                   self.compare_rx_tx_packet_End_AD_L2_out)
458
459         # log the localsid counters
460         self.logger.info(self.vapi.cli("show sr localsid"))
461
462         # prepare L2 header for returning packets
463         packet_header2 = self.create_packet_header_L2()
464
465         # generate returning packets (pg1->pg0)
466         pkts2 = self.create_stream(self.pg1, self.pg0, packet_header2,
467                                    self.pg_packet_sizes, count)
468
469         # send packets and verify received packets
470         self.send_and_verify_pkts(self.pg1, pkts2, self.pg0,
471                                   self.compare_rx_tx_packet_End_AD_L2_in)
472
473         # log the localsid counters
474         self.logger.info(self.vapi.cli("show sr localsid"))
475
476         # remove SRv6 localSIDs
477         cli_str = "sr localsid del address " + \
478                   self.sid_list[self.test_sid_index]
479         self.vapi.cli(cli_str)
480
481         # cleanup interfaces
482         self.teardown_interfaces()
483
484     def compare_rx_tx_packet_End_AD_L2_out(self, tx_pkt, rx_pkt):
485         """ Compare input and output packet after passing End.AD with L2
486
487         :param tx_pkt: transmitted packet
488         :param rx_pkt: received packet
489         """
490
491         # get IPv4 header of rx'ed packet
492         rx_eth = rx_pkt.getlayer(Ether)
493
494         tx_ip = tx_pkt.getlayer(IPv6)
495         # we can't just get the 2nd Ether layer
496         # get the Raw content and dissect it as Ether
497         tx_eth1 = Ether(str(tx_pkt[Raw]))
498
499         # verify if rx'ed packet has no SRH
500         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
501
502         # the whole rx_eth pkt should be equal to tx_eth1
503         self.assertEqual(rx_eth, tx_eth1)
504
505         self.logger.debug("packet verification: SUCCESS")
506
507     def compare_rx_tx_packet_End_AD_L2_in(self, tx_pkt, rx_pkt):
508         """ Compare input and output packet after passing End.AD
509
510         :param tx_pkt: transmitted packet
511         :param rx_pkt: received packet
512         """
513
514         ####
515         # get first (outer) IPv6 header of rx'ed packet
516         rx_ip = rx_pkt.getlayer(IPv6)
517         # received ip.src should be equal to SR Policy source
518         self.assertEqual(rx_ip.src, self.src_addr)
519         # received ip.dst should be equal to expected sidlist next segment
520         self.assertEqual(rx_ip.dst, self.sid_list[self.test_sid_index + 1])
521
522         # rx'ed packet should have SRH
523         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
524
525         # get SRH
526         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
527         # rx'ed seglist should be equal to SID-list in reversed order
528         self.assertEqual(rx_srh.addresses, self.sid_list[::-1])
529         # segleft should be equal to previous segleft value minus 1
530         self.assertEqual(rx_srh.segleft,
531                          len(self.sid_list) - self.test_sid_index - 2)
532         # lastentry should be equal to the SID-list length minus 1
533         self.assertEqual(rx_srh.lastentry, len(self.sid_list) - 1)
534
535         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
536         tx_ether = tx_pkt.getlayer(Ether)
537         self.assertEqual(Ether(str(rx_srh.payload)), tx_ether)
538
539         self.logger.debug("packet verification: SUCCESS")
540
541     def create_stream(self, src_if, dst_if, packet_header, packet_sizes,
542                       count):
543         """Create SRv6 input packet stream for defined interface.
544
545         :param VppInterface src_if: Interface to create packet stream for
546         :param VppInterface dst_if: destination interface of packet stream
547         :param packet_header: Layer3 scapy packet headers,
548                 L2 is added when not provided,
549                 Raw(payload) with packet_info is added
550         :param list packet_sizes: packet stream pckt sizes,sequentially applied
551                to packets in stream have
552         :param int count: number of packets in packet stream
553         :return: list of packets
554         """
555         self.logger.info("Creating packets")
556         pkts = []
557         for i in range(0, count - 1):
558             payload_info = self.create_packet_info(src_if, dst_if)
559             self.logger.debug(
560                 "Creating packet with index %d" % (payload_info.index))
561             payload = self.info_to_payload(payload_info)
562             # add L2 header if not yet provided in packet_header
563             if packet_header.getlayer(0).name == 'Ethernet':
564                 p = packet_header / Raw(payload)
565             else:
566                 p = Ether(dst=src_if.local_mac, src=src_if.remote_mac) / \
567                     packet_header / Raw(payload)
568             size = packet_sizes[i % len(packet_sizes)]
569             self.logger.debug("Packet size %d" % (size))
570             self.extend_packet(p, size)
571             # we need to store the packet with the automatic fields computed
572             # read back the dumped packet (with str())
573             # to force computing these fields
574             # probably other ways are possible
575             p = Ether(str(p))
576             payload_info.data = p.copy()
577             self.logger.debug(ppp("Created packet:", p))
578             pkts.append(p)
579         self.logger.info("Done creating packets")
580         return pkts
581
582     def send_and_verify_pkts(self, input, pkts, output, compare_func):
583         """Send packets and verify received packets using compare_func
584
585         :param input: ingress interface of DUT
586         :param pkts: list of packets to transmit
587         :param output: egress interface of DUT
588         :param compare_func: function to compare in and out packets
589         """
590         # add traffic stream to input interface
591         input.add_stream(pkts)
592
593         # enable capture on all interfaces
594         self.pg_enable_capture(self.pg_interfaces)
595
596         # start traffic
597         self.logger.info("Starting traffic")
598         self.pg_start()
599
600         # get output capture
601         self.logger.info("Getting packet capture")
602         capture = output.get_capture()
603
604         # assert nothing was captured on input interface
605         # input.assert_nothing_captured()
606
607         # verify captured packets
608         self.verify_captured_pkts(output, capture, compare_func)
609
610     def create_packet_header_IPv6(self):
611         """Create packet header: IPv6 header, UDP header
612
613         :param dst: IPv6 destination address
614
615         IPv6 source address is 1234::1
616         IPv6 destination address is 4321::1
617         UDP source port and destination port are 1234
618         """
619
620         p = IPv6(src='1234::1', dst='4321::1') / UDP(sport=1234, dport=1234)
621         return p
622
623     def create_packet_header_IPv6_SRH_IPv6(self, srcaddr, sidlist, segleft):
624         """Create packet header: IPv6 encapsulated in SRv6:
625         IPv6 header with SRH, IPv6 header, UDP header
626
627         :param int srcaddr: outer source address
628         :param list sidlist: segment list of outer IPv6 SRH
629         :param int segleft: segments-left field of outer IPv6 SRH
630
631         Outer IPv6 source address is set to srcaddr
632         Outer IPv6 destination address is set to sidlist[segleft]
633         Inner IPv6 source addresses is 1234::1
634         Inner IPv6 destination address is 4321::1
635         UDP source port and destination port are 1234
636         """
637
638         p = IPv6(src=srcaddr, dst=sidlist[segleft]) / \
639             IPv6ExtHdrSegmentRouting(addresses=sidlist,
640                                      segleft=segleft, nh=41) / \
641             IPv6(src='1234::1', dst='4321::1') / \
642             UDP(sport=1234, dport=1234)
643         return p
644
645     def create_packet_header_IPv4(self):
646         """Create packet header: IPv4 header, UDP header
647
648         :param dst: IPv4 destination address
649
650         IPv4 source address is 123.1.1.1
651         IPv4 destination address is 124.1.1.1
652         UDP source port and destination port are 1234
653         """
654
655         p = IP(src='123.1.1.1', dst='124.1.1.1') / UDP(sport=1234, dport=1234)
656         return p
657
658     def create_packet_header_IPv6_SRH_IPv4(self, srcaddr, sidlist, segleft):
659         """Create packet header: IPv4 encapsulated in SRv6:
660         IPv6 header with SRH, IPv4 header, UDP header
661
662         :param int srcaddr: outer source address
663         :param list sidlist: segment list of outer IPv6 SRH
664         :param int segleft: segments-left field of outer IPv6 SRH
665
666         Outer IPv6 source address is set to srcaddr
667         Outer IPv6 destination address is set to sidlist[segleft]
668         Inner IPv4 source address is 123.1.1.1
669         Inner IPv4 destination address is 124.1.1.1
670         UDP source port and destination port are 1234
671         """
672
673         p = IPv6(src=srcaddr, dst=sidlist[segleft]) / \
674             IPv6ExtHdrSegmentRouting(addresses=sidlist,
675                                      segleft=segleft, nh=4) / \
676             IP(src='123.1.1.1', dst='124.1.1.1') / \
677             UDP(sport=1234, dport=1234)
678         return p
679
680     def create_packet_header_L2(self, vlan=0):
681         """Create packet header: L2 header
682
683         :param vlan: if vlan!=0 then add 802.1q header
684         """
685         # Note: the dst addr ('00:55:44:33:22:11') is used in
686         # the compare function compare_rx_tx_packet_T_Encaps_L2
687         # to detect presence of L2 in SRH payload
688         p = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
689         etype = 0x8137  # IPX
690         if vlan:
691             # add 802.1q layer
692             p /= Dot1Q(vlan=vlan, type=etype)
693         else:
694             p.type = etype
695         return p
696
697     def create_packet_header_IPv6_SRH_L2(self, srcaddr, sidlist, segleft,
698                                          vlan=0):
699         """Create packet header: L2 encapsulated in SRv6:
700         IPv6 header with SRH, L2
701
702         :param int srcaddr: IPv6 source address
703         :param list sidlist: segment list of outer IPv6 SRH
704         :param int segleft: segments-left field of outer IPv6 SRH
705         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
706
707         IPv6 source address is set to srcaddr
708         IPv6 destination address is set to sidlist[segleft]
709         """
710         eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
711         etype = 0x8137  # IPX
712         if vlan:
713             # add 802.1q layer
714             eth /= Dot1Q(vlan=vlan, type=etype)
715         else:
716             eth.type = etype
717
718         p = IPv6(src=srcaddr, dst=sidlist[segleft]) / \
719             IPv6ExtHdrSegmentRouting(addresses=sidlist,
720                                      segleft=segleft, nh=59) / \
721             eth
722         return p
723
724     def get_payload_info(self, packet):
725         """ Extract the payload_info from the packet
726         """
727         # in most cases, payload_info is in packet[Raw]
728         # but packet[Raw] gives the complete payload
729         # (incl L2 header) for the T.Encaps L2 case
730         try:
731             payload_info = self.payload_to_info(str(packet[Raw]))
732
733         except:
734             # remote L2 header from packet[Raw]:
735             # take packet[Raw], convert it to an Ether layer
736             # and then extract Raw from it
737             payload_info = self.payload_to_info(
738                 str(Ether(str(packet[Raw]))[Raw]))
739
740         return payload_info
741
742     def verify_captured_pkts(self, dst_if, capture, compare_func):
743         """
744         Verify captured packet stream for specified interface.
745         Compare ingress with egress packets using the specified compare fn
746
747         :param dst_if: egress interface of DUT
748         :param capture: captured packets
749         :param compare_func: function to compare in and out packet
750         """
751         self.logger.info("Verifying capture on interface %s using function %s"
752                          % (dst_if.name, compare_func.func_name))
753
754         last_info = dict()
755         for i in self.pg_interfaces:
756             last_info[i.sw_if_index] = None
757         dst_sw_if_index = dst_if.sw_if_index
758
759         for packet in capture:
760             try:
761                 # extract payload_info from packet's payload
762                 payload_info = self.get_payload_info(packet)
763                 packet_index = payload_info.index
764
765                 self.logger.debug("Verifying packet with index %d"
766                                   % (packet_index))
767                 # packet should have arrived on the expected interface
768                 self.assertEqual(payload_info.dst, dst_sw_if_index)
769                 self.logger.debug(
770                     "Got packet on interface %s: src=%u (idx=%u)" %
771                     (dst_if.name, payload_info.src, packet_index))
772
773                 # search for payload_info with same src and dst if_index
774                 # this will give us the transmitted packet
775                 next_info = self.get_next_packet_info_for_interface2(
776                     payload_info.src, dst_sw_if_index,
777                     last_info[payload_info.src])
778                 last_info[payload_info.src] = next_info
779                 # next_info should not be None
780                 self.assertTrue(next_info is not None)
781                 # index of tx and rx packets should be equal
782                 self.assertEqual(packet_index, next_info.index)
783                 # data field of next_info contains the tx packet
784                 txed_packet = next_info.data
785
786                 self.logger.debug(ppp("Transmitted packet:",
787                                       txed_packet))  # ppp=Pretty Print Packet
788
789                 self.logger.debug(ppp("Received packet:", packet))
790
791                 # compare rcvd packet with expected packet using compare_func
792                 compare_func(txed_packet, packet)
793
794             except:
795                 print packet.command()
796                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
797                 raise
798
799         # have all expected packets arrived?
800         for i in self.pg_interfaces:
801             remaining_packet = self.get_next_packet_info_for_interface2(
802                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
803             self.assertTrue(remaining_packet is None,
804                             "Interface %s: Packet expected from interface %s "
805                             "didn't arrive" % (dst_if.name, i.name))
806
807
808 if __name__ == '__main__':
809     unittest.main(testRunner=VppTestRunner)