Tests Cleanup: Fix missing calls to setUpClass/tearDownClass.
[vpp.git] / test / test_srv6_ad.py
1 #!/usr/bin/env python
2
3 import unittest
4 import binascii
5 from socket import AF_INET6
6
7 from framework import VppTestCase, VppTestRunner
8 from vpp_ip import DpoProto
9 from vpp_ip_route import VppIpRoute, VppRoutePath, VppIpTable
10 from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
11     SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
12
13 import scapy.compat
14 from scapy.packet import Raw
15 from scapy.layers.l2 import Ether, Dot1Q
16 from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
17 from scapy.layers.inet import IP, UDP
18
19 from scapy.utils import inet_pton, inet_ntop
20
21 from util import ppp
22
23
24 class TestSRv6(VppTestCase):
25     """ SRv6 Dynamic Proxy plugin Test Case """
26
27     @classmethod
28     def setUpClass(self):
29         super(TestSRv6, self).setUpClass()
30
31     @classmethod
32     def tearDownClass(cls):
33         super(TestSRv6, cls).tearDownClass()
34
35     def setUp(self):
36         """ Perform test setup before each test case.
37         """
38         super(TestSRv6, self).setUp()
39
40         # packet sizes, inclusive L2 overhead
41         self.pg_packet_sizes = [64, 512, 1518, 9018]
42
43         # reset packet_infos
44         self.reset_packet_infos()
45
46     def tearDown(self):
47         """ Clean up test setup after each test case.
48         """
49         self.teardown_interfaces()
50
51         super(TestSRv6, self).tearDown()
52
53     def configure_interface(self,
54                             interface,
55                             ipv6=False, ipv4=False,
56                             ipv6_table_id=0, ipv4_table_id=0):
57         """ Configure interface.
58         :param ipv6: configure IPv6 on interface
59         :param ipv4: configure IPv4 on interface
60         :param ipv6_table_id: FIB table_id for IPv6
61         :param ipv4_table_id: FIB table_id for IPv4
62         """
63         self.logger.debug("Configuring interface %s" % (interface.name))
64         if ipv6:
65             self.logger.debug("Configuring IPv6")
66             interface.set_table_ip6(ipv6_table_id)
67             interface.config_ip6()
68             interface.resolve_ndp(timeout=5)
69         if ipv4:
70             self.logger.debug("Configuring IPv4")
71             interface.set_table_ip4(ipv4_table_id)
72             interface.config_ip4()
73             interface.resolve_arp()
74         interface.admin_up()
75
76     def setup_interfaces(self, ipv6=[], ipv4=[],
77                          ipv6_table_id=[], ipv4_table_id=[]):
78         """ Create and configure interfaces.
79
80         :param ipv6: list of interface IPv6 capabilities
81         :param ipv4: list of interface IPv4 capabilities
82         :param ipv6_table_id: list of intf IPv6 FIB table_ids
83         :param ipv4_table_id: list of intf IPv4 FIB table_ids
84         :returns: List of created interfaces.
85         """
86         # how many interfaces?
87         if len(ipv6):
88             count = len(ipv6)
89         else:
90             count = len(ipv4)
91         self.logger.debug("Creating and configuring %d interfaces" % (count))
92
93         # fill up ipv6 and ipv4 lists if needed
94         # not enabled (False) is the default
95         if len(ipv6) < count:
96             ipv6 += (count - len(ipv6)) * [False]
97         if len(ipv4) < count:
98             ipv4 += (count - len(ipv4)) * [False]
99
100         # fill up table_id lists if needed
101         # table_id 0 (global) is the default
102         if len(ipv6_table_id) < count:
103             ipv6_table_id += (count - len(ipv6_table_id)) * [0]
104         if len(ipv4_table_id) < count:
105             ipv4_table_id += (count - len(ipv4_table_id)) * [0]
106
107         # create 'count' pg interfaces
108         self.create_pg_interfaces(range(count))
109
110         # setup all interfaces
111         for i in range(count):
112             intf = self.pg_interfaces[i]
113             self.configure_interface(intf,
114                                      ipv6[i], ipv4[i],
115                                      ipv6_table_id[i], ipv4_table_id[i])
116
117         if any(ipv6):
118             self.logger.debug(self.vapi.cli("show ip6 neighbors"))
119         if any(ipv4):
120             self.logger.debug(self.vapi.cli("show ip arp"))
121         self.logger.debug(self.vapi.cli("show interface"))
122         self.logger.debug(self.vapi.cli("show hardware"))
123
124         return self.pg_interfaces
125
126     def teardown_interfaces(self):
127         """ Unconfigure and bring down interface.
128         """
129         self.logger.debug("Tearing down interfaces")
130         # tear down all interfaces
131         # AFAIK they cannot be deleted
132         for i in self.pg_interfaces:
133             self.logger.debug("Tear down interface %s" % (i.name))
134             i.admin_down()
135             i.unconfig()
136             i.set_table_ip4(0)
137             i.set_table_ip6(0)
138
139     def test_SRv6_End_AD_IPv6(self):
140         """ Test SRv6 End.AD behavior with IPv6 traffic.
141         """
142         self.src_addr = 'a0::'
143         self.sid_list = ['a1::', 'a2::a6', 'a3::']
144         self.test_sid_index = 1
145
146         # send traffic to one destination interface
147         # source and destination interfaces are IPv6 only
148         self.setup_interfaces(ipv6=[True, True])
149
150         # configure route to next segment
151         route = VppIpRoute(self, self.sid_list[self.test_sid_index + 1], 128,
152                            [VppRoutePath(self.pg0.remote_ip6,
153                                          self.pg0.sw_if_index,
154                                          proto=DpoProto.DPO_PROTO_IP6)],
155                            is_ip6=1)
156         route.add_vpp_config()
157
158         # configure SRv6 localSID behavior
159         cli_str = "sr localsid address " + \
160                   self.sid_list[self.test_sid_index] + \
161                   " behavior end.ad" + \
162                   " nh " + self.pg1.remote_ip6 + \
163                   " oif " + self.pg1.name + \
164                   " iif " + self.pg1.name
165         self.vapi.cli(cli_str)
166
167         # log the localsids
168         self.logger.debug(self.vapi.cli("show sr localsid"))
169
170         # send one packet per packet size
171         count = len(self.pg_packet_sizes)
172
173         # prepare IPv6 in SRv6 headers
174         packet_header1 = self.create_packet_header_IPv6_SRH_IPv6(
175             srcaddr=self.src_addr,
176             sidlist=self.sid_list[::-1],
177             segleft=len(self.sid_list) - self.test_sid_index - 1)
178
179         # generate packets (pg0->pg1)
180         pkts1 = self.create_stream(self.pg0, self.pg1, packet_header1,
181                                    self.pg_packet_sizes, count)
182
183         # send packets and verify received packets
184         self.send_and_verify_pkts(self.pg0, pkts1, self.pg1,
185                                   self.compare_rx_tx_packet_End_AD_IPv6_out)
186
187         # log the localsid counters
188         self.logger.info(self.vapi.cli("show sr localsid"))
189
190         # prepare IPv6 header for returning packets
191         packet_header2 = self.create_packet_header_IPv6()
192
193         # generate returning packets (pg1->pg0)
194         pkts2 = self.create_stream(self.pg1, self.pg0, packet_header2,
195                                    self.pg_packet_sizes, count)
196
197         # send packets and verify received packets
198         self.send_and_verify_pkts(self.pg1, pkts2, self.pg0,
199                                   self.compare_rx_tx_packet_End_AD_IPv6_in)
200
201         # log the localsid counters
202         self.logger.info(self.vapi.cli("show sr localsid"))
203
204         # remove SRv6 localSIDs
205         cli_str = "sr localsid del address " + \
206                   self.sid_list[self.test_sid_index]
207         self.vapi.cli(cli_str)
208
209         # cleanup interfaces
210         self.teardown_interfaces()
211
212     def compare_rx_tx_packet_End_AD_IPv6_out(self, tx_pkt, rx_pkt):
213         """ Compare input and output packet after passing End.AD with IPv6
214
215         :param tx_pkt: transmitted packet
216         :param rx_pkt: received packet
217         """
218
219         # get first (outer) IPv6 header of rx'ed packet
220         rx_ip = rx_pkt.getlayer(IPv6)
221
222         tx_ip = tx_pkt.getlayer(IPv6)
223         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
224
225         # verify if rx'ed packet has no SRH
226         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
227
228         # the whole rx_ip pkt should be equal to tx_ip2
229         # except for the hlim field
230         #   -> adjust tx'ed hlim to expected hlim
231         tx_ip2.hlim = tx_ip2.hlim - 1
232
233         self.assertEqual(rx_ip, tx_ip2)
234
235         self.logger.debug("packet verification: SUCCESS")
236
237     def compare_rx_tx_packet_End_AD_IPv6_in(self, tx_pkt, rx_pkt):
238         """ Compare input and output packet after passing End.AD
239
240         :param tx_pkt: transmitted packet
241         :param rx_pkt: received packet
242         """
243
244         # get first (outer) IPv6 header of rx'ed packet
245         rx_ip = rx_pkt.getlayer(IPv6)
246         # received ip.src should be equal to SR Policy source
247         self.assertEqual(rx_ip.src, self.src_addr)
248         # received ip.dst should be equal to expected sidlist next segment
249         self.assertEqual(rx_ip.dst, self.sid_list[self.test_sid_index + 1])
250
251         # rx'ed packet should have SRH
252         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
253
254         # get SRH
255         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
256         # rx'ed seglist should be equal to SID-list in reversed order
257         self.assertEqual(rx_srh.addresses, self.sid_list[::-1])
258         # segleft should be equal to previous segleft value minus 1
259         self.assertEqual(rx_srh.segleft,
260                          len(self.sid_list) - self.test_sid_index - 2)
261         # lastentry should be equal to the SID-list length minus 1
262         self.assertEqual(rx_srh.lastentry, len(self.sid_list) - 1)
263
264         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
265         # except for the hop-limit field
266         tx_ip = tx_pkt.getlayer(IPv6)
267         #   -> update tx'ed hlim to the expected hlim
268         tx_ip.hlim -= 1
269         #   -> check payload
270         self.assertEqual(rx_srh.payload, tx_ip)
271
272         self.logger.debug("packet verification: SUCCESS")
273
274     def test_SRv6_End_AD_IPv4(self):
275         """ Test SRv6 End.AD behavior with IPv4 traffic.
276         """
277         self.src_addr = 'a0::'
278         self.sid_list = ['a1::', 'a2::a4', 'a3::']
279         self.test_sid_index = 1
280
281         # send traffic to one destination interface
282         # source and destination interfaces are IPv6 only
283         self.setup_interfaces(ipv6=[True, False], ipv4=[False, True])
284
285         # configure route to next segment
286         route = VppIpRoute(self, self.sid_list[self.test_sid_index + 1], 128,
287                            [VppRoutePath(self.pg0.remote_ip6,
288                                          self.pg0.sw_if_index,
289                                          proto=DpoProto.DPO_PROTO_IP6)],
290                            is_ip6=1)
291         route.add_vpp_config()
292
293         # configure SRv6 localSID behavior
294         cli_str = "sr localsid address " + \
295                   self.sid_list[self.test_sid_index] + \
296                   " behavior end.ad" + \
297                   " nh " + self.pg1.remote_ip4 + \
298                   " oif " + self.pg1.name + \
299                   " iif " + self.pg1.name
300         self.vapi.cli(cli_str)
301
302         # log the localsids
303         self.logger.debug(self.vapi.cli("show sr localsid"))
304
305         # send one packet per packet size
306         count = len(self.pg_packet_sizes)
307
308         # prepare IPv4 in SRv6 headers
309         packet_header1 = self.create_packet_header_IPv6_SRH_IPv4(
310             srcaddr=self.src_addr,
311             sidlist=self.sid_list[::-1],
312             segleft=len(self.sid_list) - self.test_sid_index - 1)
313
314         # generate packets (pg0->pg1)
315         pkts1 = self.create_stream(self.pg0, self.pg1, packet_header1,
316                                    self.pg_packet_sizes, count)
317
318         # send packets and verify received packets
319         self.send_and_verify_pkts(self.pg0, pkts1, self.pg1,
320                                   self.compare_rx_tx_packet_End_AD_IPv4_out)
321
322         # log the localsid counters
323         self.logger.info(self.vapi.cli("show sr localsid"))
324
325         # prepare IPv6 header for returning packets
326         packet_header2 = self.create_packet_header_IPv4()
327
328         # generate returning packets (pg1->pg0)
329         pkts2 = self.create_stream(self.pg1, self.pg0, packet_header2,
330                                    self.pg_packet_sizes, count)
331
332         # send packets and verify received packets
333         self.send_and_verify_pkts(self.pg1, pkts2, self.pg0,
334                                   self.compare_rx_tx_packet_End_AD_IPv4_in)
335
336         # log the localsid counters
337         self.logger.info(self.vapi.cli("show sr localsid"))
338
339         # remove SRv6 localSIDs
340         cli_str = "sr localsid del address " + \
341                   self.sid_list[self.test_sid_index]
342         self.vapi.cli(cli_str)
343
344         # cleanup interfaces
345         self.teardown_interfaces()
346
347     def compare_rx_tx_packet_End_AD_IPv4_out(self, tx_pkt, rx_pkt):
348         """ Compare input and output packet after passing End.AD with IPv4
349
350         :param tx_pkt: transmitted packet
351         :param rx_pkt: received packet
352         """
353
354         # get IPv4 header of rx'ed packet
355         rx_ip = rx_pkt.getlayer(IP)
356
357         tx_ip = tx_pkt.getlayer(IPv6)
358         tx_ip2 = tx_pkt.getlayer(IP)
359
360         # verify if rx'ed packet has no SRH
361         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
362
363         # the whole rx_ip pkt should be equal to tx_ip2
364         # except for the ttl field and ip checksum
365         #   -> adjust tx'ed ttl to expected ttl
366         tx_ip2.ttl = tx_ip2.ttl - 1
367         #   -> set tx'ed ip checksum to None and let scapy recompute
368         tx_ip2.chksum = None
369         # read back the pkt (with str()) to force computing these fields
370         # probably other ways to accomplish this are possible
371         tx_ip2 = IP(scapy.compat.raw(tx_ip2))
372
373         self.assertEqual(rx_ip, tx_ip2)
374
375         self.logger.debug("packet verification: SUCCESS")
376
377     def compare_rx_tx_packet_End_AD_IPv4_in(self, tx_pkt, rx_pkt):
378         """ Compare input and output packet after passing End.AD
379
380         :param tx_pkt: transmitted packet
381         :param rx_pkt: received packet
382         """
383
384         # get first (outer) IPv6 header of rx'ed packet
385         rx_ip = rx_pkt.getlayer(IPv6)
386         # received ip.src should be equal to SR Policy source
387         self.assertEqual(rx_ip.src, self.src_addr)
388         # received ip.dst should be equal to expected sidlist next segment
389         self.assertEqual(rx_ip.dst, self.sid_list[self.test_sid_index + 1])
390
391         # rx'ed packet should have SRH
392         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
393
394         # get SRH
395         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
396         # rx'ed seglist should be equal to SID-list in reversed order
397         self.assertEqual(rx_srh.addresses, self.sid_list[::-1])
398         # segleft should be equal to previous segleft value minus 1
399         self.assertEqual(rx_srh.segleft,
400                          len(self.sid_list) - self.test_sid_index - 2)
401         # lastentry should be equal to the SID-list length minus 1
402         self.assertEqual(rx_srh.lastentry, len(self.sid_list) - 1)
403
404         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
405         # except for the ttl field and ip checksum
406         tx_ip = tx_pkt.getlayer(IP)
407         #   -> adjust tx'ed ttl to expected ttl
408         tx_ip.ttl = tx_ip.ttl - 1
409         #   -> set tx'ed ip checksum to None and let scapy recompute
410         tx_ip.chksum = None
411         #   -> read back the pkt (with str()) to force computing these fields
412         # probably other ways to accomplish this are possible
413         self.assertEqual(rx_srh.payload, IP(scapy.compat.raw(tx_ip)))
414
415         self.logger.debug("packet verification: SUCCESS")
416
417     def test_SRv6_End_AD_L2(self):
418         """ Test SRv6 End.AD behavior with L2 traffic.
419         """
420         self.src_addr = 'a0::'
421         self.sid_list = ['a1::', 'a2::a4', 'a3::']
422         self.test_sid_index = 1
423
424         # send traffic to one destination interface
425         # source and destination interfaces are IPv6 only
426         self.setup_interfaces(ipv6=[True, False])
427
428         # configure route to next segment
429         route = VppIpRoute(self, self.sid_list[self.test_sid_index + 1], 128,
430                            [VppRoutePath(self.pg0.remote_ip6,
431                                          self.pg0.sw_if_index,
432                                          proto=DpoProto.DPO_PROTO_IP6)],
433                            is_ip6=1)
434         route.add_vpp_config()
435
436         # configure SRv6 localSID behavior
437         cli_str = "sr localsid address " + \
438                   self.sid_list[self.test_sid_index] + \
439                   " behavior end.ad" + \
440                   " oif " + self.pg1.name + \
441                   " iif " + self.pg1.name
442         self.vapi.cli(cli_str)
443
444         # log the localsids
445         self.logger.debug(self.vapi.cli("show sr localsid"))
446
447         # send one packet per packet size
448         count = len(self.pg_packet_sizes)
449
450         # prepare L2 in SRv6 headers
451         packet_header1 = self.create_packet_header_IPv6_SRH_L2(
452             srcaddr=self.src_addr,
453             sidlist=self.sid_list[::-1],
454             segleft=len(self.sid_list) - self.test_sid_index - 1,
455             vlan=0)
456
457         # generate packets (pg0->pg1)
458         pkts1 = self.create_stream(self.pg0, self.pg1, packet_header1,
459                                    self.pg_packet_sizes, count)
460
461         # send packets and verify received packets
462         self.send_and_verify_pkts(self.pg0, pkts1, self.pg1,
463                                   self.compare_rx_tx_packet_End_AD_L2_out)
464
465         # log the localsid counters
466         self.logger.info(self.vapi.cli("show sr localsid"))
467
468         # prepare L2 header for returning packets
469         packet_header2 = self.create_packet_header_L2()
470
471         # generate returning packets (pg1->pg0)
472         pkts2 = self.create_stream(self.pg1, self.pg0, packet_header2,
473                                    self.pg_packet_sizes, count)
474
475         # send packets and verify received packets
476         self.send_and_verify_pkts(self.pg1, pkts2, self.pg0,
477                                   self.compare_rx_tx_packet_End_AD_L2_in)
478
479         # log the localsid counters
480         self.logger.info(self.vapi.cli("show sr localsid"))
481
482         # remove SRv6 localSIDs
483         cli_str = "sr localsid del address " + \
484                   self.sid_list[self.test_sid_index]
485         self.vapi.cli(cli_str)
486
487         # cleanup interfaces
488         self.teardown_interfaces()
489
490     def compare_rx_tx_packet_End_AD_L2_out(self, tx_pkt, rx_pkt):
491         """ Compare input and output packet after passing End.AD with L2
492
493         :param tx_pkt: transmitted packet
494         :param rx_pkt: received packet
495         """
496
497         # get IPv4 header of rx'ed packet
498         rx_eth = rx_pkt.getlayer(Ether)
499
500         tx_ip = tx_pkt.getlayer(IPv6)
501         # we can't just get the 2nd Ether layer
502         # get the Raw content and dissect it as Ether
503         tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw]))
504
505         # verify if rx'ed packet has no SRH
506         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
507
508         # the whole rx_eth pkt should be equal to tx_eth1
509         self.assertEqual(rx_eth, tx_eth1)
510
511         self.logger.debug("packet verification: SUCCESS")
512
513     def compare_rx_tx_packet_End_AD_L2_in(self, tx_pkt, rx_pkt):
514         """ Compare input and output packet after passing End.AD
515
516         :param tx_pkt: transmitted packet
517         :param rx_pkt: received packet
518         """
519
520         ####
521         # get first (outer) IPv6 header of rx'ed packet
522         rx_ip = rx_pkt.getlayer(IPv6)
523         # received ip.src should be equal to SR Policy source
524         self.assertEqual(rx_ip.src, self.src_addr)
525         # received ip.dst should be equal to expected sidlist next segment
526         self.assertEqual(rx_ip.dst, self.sid_list[self.test_sid_index + 1])
527
528         # rx'ed packet should have SRH
529         self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
530
531         # get SRH
532         rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
533         # rx'ed seglist should be equal to SID-list in reversed order
534         self.assertEqual(rx_srh.addresses, self.sid_list[::-1])
535         # segleft should be equal to previous segleft value minus 1
536         self.assertEqual(rx_srh.segleft,
537                          len(self.sid_list) - self.test_sid_index - 2)
538         # lastentry should be equal to the SID-list length minus 1
539         self.assertEqual(rx_srh.lastentry, len(self.sid_list) - 1)
540
541         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
542         tx_ether = tx_pkt.getlayer(Ether)
543         self.assertEqual(Ether(scapy.compat.raw(rx_srh.payload)), tx_ether)
544
545         self.logger.debug("packet verification: SUCCESS")
546
547     def create_stream(self, src_if, dst_if, packet_header, packet_sizes,
548                       count):
549         """Create SRv6 input packet stream for defined interface.
550
551         :param VppInterface src_if: Interface to create packet stream for
552         :param VppInterface dst_if: destination interface of packet stream
553         :param packet_header: Layer3 scapy packet headers,
554                 L2 is added when not provided,
555                 Raw(payload) with packet_info is added
556         :param list packet_sizes: packet stream pckt sizes,sequentially applied
557                to packets in stream have
558         :param int count: number of packets in packet stream
559         :return: list of packets
560         """
561         self.logger.info("Creating packets")
562         pkts = []
563         for i in range(0, count - 1):
564             payload_info = self.create_packet_info(src_if, dst_if)
565             self.logger.debug(
566                 "Creating packet with index %d" % (payload_info.index))
567             payload = self.info_to_payload(payload_info)
568             # add L2 header if not yet provided in packet_header
569             if packet_header.getlayer(0).name == 'Ethernet':
570                 p = packet_header / Raw(payload)
571             else:
572                 p = Ether(dst=src_if.local_mac, src=src_if.remote_mac) / \
573                     packet_header / Raw(payload)
574             size = packet_sizes[i % len(packet_sizes)]
575             self.logger.debug("Packet size %d" % (size))
576             self.extend_packet(p, size)
577             # we need to store the packet with the automatic fields computed
578             # read back the dumped packet (with str())
579             # to force computing these fields
580             # probably other ways are possible
581             p = Ether(scapy.compat.raw(p))
582             payload_info.data = p.copy()
583             self.logger.debug(ppp("Created packet:", p))
584             pkts.append(p)
585         self.logger.info("Done creating packets")
586         return pkts
587
588     def send_and_verify_pkts(self, input, pkts, output, compare_func):
589         """Send packets and verify received packets using compare_func
590
591         :param input: ingress interface of DUT
592         :param pkts: list of packets to transmit
593         :param output: egress interface of DUT
594         :param compare_func: function to compare in and out packets
595         """
596         # add traffic stream to input interface
597         input.add_stream(pkts)
598
599         # enable capture on all interfaces
600         self.pg_enable_capture(self.pg_interfaces)
601
602         # start traffic
603         self.logger.info("Starting traffic")
604         self.pg_start()
605
606         # get output capture
607         self.logger.info("Getting packet capture")
608         capture = output.get_capture()
609
610         # assert nothing was captured on input interface
611         # input.assert_nothing_captured()
612
613         # verify captured packets
614         self.verify_captured_pkts(output, capture, compare_func)
615
616     def create_packet_header_IPv6(self):
617         """Create packet header: IPv6 header, UDP header
618
619         :param dst: IPv6 destination address
620
621         IPv6 source address is 1234::1
622         IPv6 destination address is 4321::1
623         UDP source port and destination port are 1234
624         """
625
626         p = IPv6(src='1234::1', dst='4321::1') / UDP(sport=1234, dport=1234)
627         return p
628
629     def create_packet_header_IPv6_SRH_IPv6(self, srcaddr, sidlist, segleft):
630         """Create packet header: IPv6 encapsulated in SRv6:
631         IPv6 header with SRH, IPv6 header, UDP header
632
633         :param int srcaddr: outer source address
634         :param list sidlist: segment list of outer IPv6 SRH
635         :param int segleft: segments-left field of outer IPv6 SRH
636
637         Outer IPv6 source address is set to srcaddr
638         Outer IPv6 destination address is set to sidlist[segleft]
639         Inner IPv6 source addresses is 1234::1
640         Inner IPv6 destination address is 4321::1
641         UDP source port and destination port are 1234
642         """
643
644         p = IPv6(src=srcaddr, dst=sidlist[segleft]) / \
645             IPv6ExtHdrSegmentRouting(addresses=sidlist,
646                                      segleft=segleft, nh=41) / \
647             IPv6(src='1234::1', dst='4321::1') / \
648             UDP(sport=1234, dport=1234)
649         return p
650
651     def create_packet_header_IPv4(self):
652         """Create packet header: IPv4 header, UDP header
653
654         :param dst: IPv4 destination address
655
656         IPv4 source address is 123.1.1.1
657         IPv4 destination address is 124.1.1.1
658         UDP source port and destination port are 1234
659         """
660
661         p = IP(src='123.1.1.1', dst='124.1.1.1') / UDP(sport=1234, dport=1234)
662         return p
663
664     def create_packet_header_IPv6_SRH_IPv4(self, srcaddr, sidlist, segleft):
665         """Create packet header: IPv4 encapsulated in SRv6:
666         IPv6 header with SRH, IPv4 header, UDP header
667
668         :param int srcaddr: outer source address
669         :param list sidlist: segment list of outer IPv6 SRH
670         :param int segleft: segments-left field of outer IPv6 SRH
671
672         Outer IPv6 source address is set to srcaddr
673         Outer IPv6 destination address is set to sidlist[segleft]
674         Inner IPv4 source address is 123.1.1.1
675         Inner IPv4 destination address is 124.1.1.1
676         UDP source port and destination port are 1234
677         """
678
679         p = IPv6(src=srcaddr, dst=sidlist[segleft]) / \
680             IPv6ExtHdrSegmentRouting(addresses=sidlist,
681                                      segleft=segleft, nh=4) / \
682             IP(src='123.1.1.1', dst='124.1.1.1') / \
683             UDP(sport=1234, dport=1234)
684         return p
685
686     def create_packet_header_L2(self, vlan=0):
687         """Create packet header: L2 header
688
689         :param vlan: if vlan!=0 then add 802.1q header
690         """
691         # Note: the dst addr ('00:55:44:33:22:11') is used in
692         # the compare function compare_rx_tx_packet_T_Encaps_L2
693         # to detect presence of L2 in SRH payload
694         p = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
695         etype = 0x8137  # IPX
696         if vlan:
697             # add 802.1q layer
698             p /= Dot1Q(vlan=vlan, type=etype)
699         else:
700             p.type = etype
701         return p
702
703     def create_packet_header_IPv6_SRH_L2(self, srcaddr, sidlist, segleft,
704                                          vlan=0):
705         """Create packet header: L2 encapsulated in SRv6:
706         IPv6 header with SRH, L2
707
708         :param int srcaddr: IPv6 source address
709         :param list sidlist: segment list of outer IPv6 SRH
710         :param int segleft: segments-left field of outer IPv6 SRH
711         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
712
713         IPv6 source address is set to srcaddr
714         IPv6 destination address is set to sidlist[segleft]
715         """
716         eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
717         etype = 0x8137  # IPX
718         if vlan:
719             # add 802.1q layer
720             eth /= Dot1Q(vlan=vlan, type=etype)
721         else:
722             eth.type = etype
723
724         p = IPv6(src=srcaddr, dst=sidlist[segleft]) / \
725             IPv6ExtHdrSegmentRouting(addresses=sidlist,
726                                      segleft=segleft, nh=59) / \
727             eth
728         return p
729
730     def get_payload_info(self, packet):
731         """ Extract the payload_info from the packet
732         """
733         # in most cases, payload_info is in packet[Raw]
734         # but packet[Raw] gives the complete payload
735         # (incl L2 header) for the T.Encaps L2 case
736         try:
737             payload_info = self.payload_to_info(packet[Raw])
738
739         except:
740             # remote L2 header from packet[Raw]:
741             # take packet[Raw], convert it to an Ether layer
742             # and then extract Raw from it
743             payload_info = self.payload_to_info(
744                 Ether(scapy.compat.raw(packet[Raw]))[Raw])
745
746         return payload_info
747
748     def verify_captured_pkts(self, dst_if, capture, compare_func):
749         """
750         Verify captured packet stream for specified interface.
751         Compare ingress with egress packets using the specified compare fn
752
753         :param dst_if: egress interface of DUT
754         :param capture: captured packets
755         :param compare_func: function to compare in and out packet
756         """
757         self.logger.info("Verifying capture on interface %s using function %s"
758                          % (dst_if.name, compare_func.__name__))
759
760         last_info = dict()
761         for i in self.pg_interfaces:
762             last_info[i.sw_if_index] = None
763         dst_sw_if_index = dst_if.sw_if_index
764
765         for packet in capture:
766             try:
767                 # extract payload_info from packet's payload
768                 payload_info = self.get_payload_info(packet)
769                 packet_index = payload_info.index
770
771                 self.logger.debug("Verifying packet with index %d"
772                                   % (packet_index))
773                 # packet should have arrived on the expected interface
774                 self.assertEqual(payload_info.dst, dst_sw_if_index)
775                 self.logger.debug(
776                     "Got packet on interface %s: src=%u (idx=%u)" %
777                     (dst_if.name, payload_info.src, packet_index))
778
779                 # search for payload_info with same src and dst if_index
780                 # this will give us the transmitted packet
781                 next_info = self.get_next_packet_info_for_interface2(
782                     payload_info.src, dst_sw_if_index,
783                     last_info[payload_info.src])
784                 last_info[payload_info.src] = next_info
785                 # next_info should not be None
786                 self.assertTrue(next_info is not None)
787                 # index of tx and rx packets should be equal
788                 self.assertEqual(packet_index, next_info.index)
789                 # data field of next_info contains the tx packet
790                 txed_packet = next_info.data
791
792                 self.logger.debug(ppp("Transmitted packet:",
793                                       txed_packet))  # ppp=Pretty Print Packet
794
795                 self.logger.debug(ppp("Received packet:", packet))
796
797                 # compare rcvd packet with expected packet using compare_func
798                 compare_func(txed_packet, packet)
799
800             except:
801                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
802                 raise
803
804         # have all expected packets arrived?
805         for i in self.pg_interfaces:
806             remaining_packet = self.get_next_packet_info_for_interface2(
807                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
808             self.assertTrue(remaining_packet is None,
809                             "Interface %s: Packet expected from interface %s "
810                             "didn't arrive" % (dst_if.name, i.name))
811
812
813 if __name__ == '__main__':
814     unittest.main(testRunner=VppTestRunner)