6b8e23c852008b3f15b56d0cf9c36532ca012ef2
[vpp.git] / test / test_srv6_as.py
1 #!/usr/bin/env python
2
3 import unittest
4 import binascii
5 from socket import AF_INET6
6
7 from framework import VppTestCase, VppTestRunner
8 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto, VppIpTable
9 from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
10     SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
11
12 import scapy.compat
13 from scapy.packet import Raw
14 from scapy.layers.l2 import Ether, Dot1Q
15 from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
16 from scapy.layers.inet import IP, UDP
17
18 from scapy.utils import inet_pton, inet_ntop
19
20 from util import ppp
21
22
23 class TestSRv6(VppTestCase):
24     """ SRv6 Static Proxy plugin Test Case """
25
26     @classmethod
27     def setUpClass(self):
28         super(TestSRv6, self).setUpClass()
29
30     def setUp(self):
31         """ Perform test setup before each test case.
32         """
33         super(TestSRv6, self).setUp()
34
35         # packet sizes, inclusive L2 overhead
36         self.pg_packet_sizes = [64, 512, 1518, 9018]
37
38         # reset packet_infos
39         self.reset_packet_infos()
40
41     def tearDown(self):
42         """ Clean up test setup after each test case.
43         """
44         self.teardown_interfaces()
45
46         super(TestSRv6, self).tearDown()
47
48     def configure_interface(self,
49                             interface,
50                             ipv6=False, ipv4=False,
51                             ipv6_table_id=0, ipv4_table_id=0):
52         """ Configure interface.
53         :param ipv6: configure IPv6 on interface
54         :param ipv4: configure IPv4 on interface
55         :param ipv6_table_id: FIB table_id for IPv6
56         :param ipv4_table_id: FIB table_id for IPv4
57         """
58         self.logger.debug("Configuring interface %s" % (interface.name))
59         if ipv6:
60             self.logger.debug("Configuring IPv6")
61             interface.set_table_ip6(ipv6_table_id)
62             interface.config_ip6()
63             interface.resolve_ndp(timeout=5)
64         if ipv4:
65             self.logger.debug("Configuring IPv4")
66             interface.set_table_ip4(ipv4_table_id)
67             interface.config_ip4()
68             interface.resolve_arp()
69         interface.admin_up()
70
71     def setup_interfaces(self, ipv6=[], ipv4=[],
72                          ipv6_table_id=[], ipv4_table_id=[]):
73         """ Create and configure interfaces.
74
75         :param ipv6: list of interface IPv6 capabilities
76         :param ipv4: list of interface IPv4 capabilities
77         :param ipv6_table_id: list of intf IPv6 FIB table_ids
78         :param ipv4_table_id: list of intf IPv4 FIB table_ids
79         :returns: List of created interfaces.
80         """
81         # how many interfaces?
82         if len(ipv6):
83             count = len(ipv6)
84         else:
85             count = len(ipv4)
86         self.logger.debug("Creating and configuring %d interfaces" % (count))
87
88         # fill up ipv6 and ipv4 lists if needed
89         # not enabled (False) is the default
90         if len(ipv6) < count:
91             ipv6 += (count - len(ipv6)) * [False]
92         if len(ipv4) < count:
93             ipv4 += (count - len(ipv4)) * [False]
94
95         # fill up table_id lists if needed
96         # table_id 0 (global) is the default
97         if len(ipv6_table_id) < count:
98             ipv6_table_id += (count - len(ipv6_table_id)) * [0]
99         if len(ipv4_table_id) < count:
100             ipv4_table_id += (count - len(ipv4_table_id)) * [0]
101
102         # create 'count' pg interfaces
103         self.create_pg_interfaces(range(count))
104
105         # setup all interfaces
106         for i in range(count):
107             intf = self.pg_interfaces[i]
108             self.configure_interface(intf,
109                                      ipv6[i], ipv4[i],
110                                      ipv6_table_id[i], ipv4_table_id[i])
111
112         if any(ipv6):
113             self.logger.debug(self.vapi.cli("show ip6 neighbors"))
114         if any(ipv4):
115             self.logger.debug(self.vapi.cli("show ip arp"))
116         self.logger.debug(self.vapi.cli("show interface"))
117         self.logger.debug(self.vapi.cli("show hardware"))
118
119         return self.pg_interfaces
120
121     def teardown_interfaces(self):
122         """ Unconfigure and bring down interface.
123         """
124         self.logger.debug("Tearing down interfaces")
125         # tear down all interfaces
126         # AFAIK they cannot be deleted
127         for i in self.pg_interfaces:
128             self.logger.debug("Tear down interface %s" % (i.name))
129             i.admin_down()
130             i.unconfig()
131             i.set_table_ip4(0)
132             i.set_table_ip6(0)
133
134     def test_SRv6_End_AS_IPv6_noSRH(self):
135         """ Test SRv6 End.AS behavior with IPv6 traffic and no SRH rewrite.
136         """
137         self.run_SRv6_End_AS_IPv6(
138             sid_list=['a1::', 'a2::a6', 'a3::'],
139             test_sid_index=1,
140             rewrite_src_addr='a2::')
141
142     def test_SRv6_End_AS_IPv6_SRH(self):
143         """ Test SRv6 End.AS behavior with IPv6 traffic and SRH rewrite.
144         """
145         self.run_SRv6_End_AS_IPv6(
146             sid_list=['a1::a6', 'a2::', 'a3::'],
147             test_sid_index=0,
148             rewrite_src_addr='a1::')
149
150     def test_SRv6_End_AS_IPv4_noSRH(self):
151         """ Test SRv6 End.AS behavior with IPv4 traffic and no SRH rewrite.
152         """
153         self.run_SRv6_End_AS_IPv4(
154             sid_list=['a1::', 'a2::a6', 'a3::'],
155             test_sid_index=1,
156             rewrite_src_addr='a2::')
157
158     def test_SRv6_End_AS_IPv4_SRH(self):
159         """ Test SRv6 End.AS behavior with IPv4 traffic and SRH rewrite.
160         """
161         self.run_SRv6_End_AS_IPv4(
162             sid_list=['a1::a6', 'a2::', 'a3::'],
163             test_sid_index=0,
164             rewrite_src_addr='a1::')
165
166     def test_SRv6_End_AS_L2_noSRH(self):
167         """ Test SRv6 End.AS behavior with L2 traffic and no SRH rewrite.
168         """
169         self.run_SRv6_End_AS_L2(
170             sid_list=['a1::', 'a2::a6', 'a3::'],
171             test_sid_index=1,
172             rewrite_src_addr='a2::')
173
174     def test_SRv6_End_AS_L2_SRH(self):
175         """ Test SRv6 End.AS behavior with L2 traffic and SRH rewrite.
176         """
177         self.run_SRv6_End_AS_L2(
178             sid_list=['a1::a6', 'a2::', 'a3::'],
179             test_sid_index=0,
180             rewrite_src_addr='a1::')
181
182     def run_SRv6_End_AS_L2(self, sid_list, test_sid_index, rewrite_src_addr):
183         """ Run SRv6 End.AS test with L2 traffic.
184         """
185         self.rewrite_src_addr = rewrite_src_addr
186         self.rewrite_sid_list = sid_list[test_sid_index + 1::]
187
188         # send traffic to one destination interface
189         # source and destination interfaces are IPv6 only
190         self.setup_interfaces(ipv6=[True, False])
191
192         # configure route to next segment
193         route = VppIpRoute(self, sid_list[test_sid_index + 1], 128,
194                            [VppRoutePath(self.pg0.remote_ip6,
195                                          self.pg0.sw_if_index,
196                                          proto=DpoProto.DPO_PROTO_IP6)],
197                            is_ip6=1)
198         route.add_vpp_config()
199
200         # configure SRv6 localSID behavior
201         cli_str = "sr localsid address " + sid_list[test_sid_index] \
202             + " behavior end.as" \
203             + " oif " + self.pg1.name \
204             + " iif " + self.pg1.name \
205             + " src " + self.rewrite_src_addr
206         for s in self.rewrite_sid_list:
207             cli_str += " next " + s
208         self.vapi.cli(cli_str)
209
210         # log the localsids
211         self.logger.debug(self.vapi.cli("show sr localsid"))
212
213         # send one packet per packet size
214         count = len(self.pg_packet_sizes)
215
216         # prepare L2 in SRv6 headers
217         packet_header1 = self.create_packet_header_IPv6_SRH_L2(
218                         sidlist=sid_list[::-1],
219                         segleft=len(sid_list) - test_sid_index - 1,
220                         vlan=0)
221
222         # generate packets (pg0->pg1)
223         pkts1 = self.create_stream(self.pg0, self.pg1, packet_header1,
224                                    self.pg_packet_sizes, count)
225
226         # send packets and verify received packets
227         self.send_and_verify_pkts(self.pg0, pkts1, self.pg1,
228                                   self.compare_rx_tx_packet_End_AS_L2_out)
229
230         # log the localsid counters
231         self.logger.info(self.vapi.cli("show sr localsid"))
232
233         # prepare L2 header for returning packets
234         packet_header2 = self.create_packet_header_L2()
235
236         # generate returning packets (pg1->pg0)
237         pkts2 = self.create_stream(self.pg1, self.pg0, packet_header2,
238                                    self.pg_packet_sizes, count)
239
240         # send packets and verify received packets
241         self.send_and_verify_pkts(self.pg1, pkts2, self.pg0,
242                                   self.compare_rx_tx_packet_End_AS_L2_in)
243
244         # log the localsid counters
245         self.logger.info(self.vapi.cli("show sr localsid"))
246
247         # remove SRv6 localSIDs
248         self.vapi.cli("sr localsid del address " + sid_list[test_sid_index])
249
250         # cleanup interfaces
251         self.teardown_interfaces()
252
253     def run_SRv6_End_AS_IPv6(self, sid_list, test_sid_index, rewrite_src_addr):
254         """ Run SRv6 End.AS test with IPv6 traffic.
255         """
256         self.rewrite_src_addr = rewrite_src_addr
257         self.rewrite_sid_list = sid_list[test_sid_index + 1::]
258
259         # send traffic to one destination interface
260         # source and destination interfaces are IPv6 only
261         self.setup_interfaces(ipv6=[True, True])
262
263         # configure route to next segment
264         route = VppIpRoute(self, sid_list[test_sid_index + 1], 128,
265                            [VppRoutePath(self.pg0.remote_ip6,
266                                          self.pg0.sw_if_index,
267                                          proto=DpoProto.DPO_PROTO_IP6)],
268                            is_ip6=1)
269         route.add_vpp_config()
270
271         # configure SRv6 localSID behavior
272         cli_str = "sr localsid address " + sid_list[test_sid_index] \
273             + " behavior end.as" \
274             + " nh " + self.pg1.remote_ip6 \
275             + " oif " + self.pg1.name \
276             + " iif " + self.pg1.name \
277             + " src " + self.rewrite_src_addr
278         for s in self.rewrite_sid_list:
279             cli_str += " next " + s
280         self.vapi.cli(cli_str)
281
282         # log the localsids
283         self.logger.debug(self.vapi.cli("show sr localsid"))
284
285         # send one packet per packet size
286         count = len(self.pg_packet_sizes)
287
288         # prepare IPv6 in SRv6 headers
289         packet_header1 = self.create_packet_header_IPv6_SRH_IPv6(
290                         sidlist=sid_list[::-1],
291                         segleft=len(sid_list) - test_sid_index - 1)
292
293         # generate packets (pg0->pg1)
294         pkts1 = self.create_stream(self.pg0, self.pg1, packet_header1,
295                                    self.pg_packet_sizes, count)
296
297         # send packets and verify received packets
298         self.send_and_verify_pkts(self.pg0, pkts1, self.pg1,
299                                   self.compare_rx_tx_packet_End_AS_IPv6_out)
300
301         # log the localsid counters
302         self.logger.info(self.vapi.cli("show sr localsid"))
303
304         # prepare IPv6 header for returning packets
305         packet_header2 = self.create_packet_header_IPv6()
306
307         # generate returning packets (pg1->pg0)
308         pkts2 = self.create_stream(self.pg1, self.pg0, packet_header2,
309                                    self.pg_packet_sizes, count)
310
311         # send packets and verify received packets
312         self.send_and_verify_pkts(self.pg1, pkts2, self.pg0,
313                                   self.compare_rx_tx_packet_End_AS_IPv6_in)
314
315         # log the localsid counters
316         self.logger.info(self.vapi.cli("show sr localsid"))
317
318         # remove SRv6 localSIDs
319         self.vapi.cli("sr localsid del address " + sid_list[test_sid_index])
320
321         # cleanup interfaces
322         self.teardown_interfaces()
323
324     def run_SRv6_End_AS_IPv4(self, sid_list, test_sid_index, rewrite_src_addr):
325         """ Run SRv6 End.AS test with IPv4 traffic.
326         """
327         self.rewrite_src_addr = rewrite_src_addr
328         self.rewrite_sid_list = sid_list[test_sid_index + 1::]
329
330         # send traffic to one destination interface
331         # source and destination interfaces are IPv6 only
332         self.setup_interfaces(ipv6=[True, False], ipv4=[True, True])
333
334         # configure route to next segment
335         route = VppIpRoute(self, sid_list[test_sid_index + 1], 128,
336                            [VppRoutePath(self.pg0.remote_ip6,
337                                          self.pg0.sw_if_index,
338                                          proto=DpoProto.DPO_PROTO_IP6)],
339                            is_ip6=1)
340         route.add_vpp_config()
341
342         # configure SRv6 localSID behavior
343         cli_str = "sr localsid address " + sid_list[test_sid_index] \
344             + " behavior end.as" \
345             + " nh " + self.pg1.remote_ip4 \
346             + " oif " + self.pg1.name \
347             + " iif " + self.pg1.name \
348             + " src " + self.rewrite_src_addr
349         for s in self.rewrite_sid_list:
350             cli_str += " next " + s
351         self.vapi.cli(cli_str)
352
353         # log the localsids
354         self.logger.debug(self.vapi.cli("show sr localsid"))
355
356         # send one packet per packet size
357         count = len(self.pg_packet_sizes)
358
359         # prepare IPv4 in SRv6 headers
360         packet_header1 = self.create_packet_header_IPv6_SRH_IPv4(
361                         sidlist=sid_list[::-1],
362                         segleft=len(sid_list) - test_sid_index - 1)
363
364         # generate packets (pg0->pg1)
365         pkts1 = self.create_stream(self.pg0, self.pg1, packet_header1,
366                                    self.pg_packet_sizes, count)
367
368         # send packets and verify received packets
369         self.send_and_verify_pkts(self.pg0, pkts1, self.pg1,
370                                   self.compare_rx_tx_packet_End_AS_IPv4_out)
371
372         # log the localsid counters
373         self.logger.info(self.vapi.cli("show sr localsid"))
374
375         # prepare IPv6 header for returning packets
376         packet_header2 = self.create_packet_header_IPv4()
377
378         # generate returning packets (pg1->pg0)
379         pkts2 = self.create_stream(self.pg1, self.pg0, packet_header2,
380                                    self.pg_packet_sizes, count)
381
382         # send packets and verify received packets
383         self.send_and_verify_pkts(self.pg1, pkts2, self.pg0,
384                                   self.compare_rx_tx_packet_End_AS_IPv4_in)
385
386         # log the localsid counters
387         self.logger.info(self.vapi.cli("show sr localsid"))
388
389         # remove SRv6 localSIDs
390         self.vapi.cli("sr localsid del address " + sid_list[test_sid_index])
391
392         # cleanup interfaces
393         self.teardown_interfaces()
394
395     def compare_rx_tx_packet_End_AS_IPv6_in(self, tx_pkt, rx_pkt):
396         """ Compare input and output packet after passing End.AS
397
398         :param tx_pkt: transmitted packet
399         :param rx_pkt: received packet
400         """
401
402         # get first (outer) IPv6 header of rx'ed packet
403         rx_ip = rx_pkt.getlayer(IPv6)
404         rx_srh = None
405
406         tx_ip = tx_pkt.getlayer(IPv6)
407
408         # expected segment-list (SRH order)
409         tx_seglist = self.rewrite_sid_list[::-1]
410
411         # received ip.src should be equal to SR Policy source
412         self.assertEqual(rx_ip.src, self.rewrite_src_addr)
413         # received ip.dst should be equal to expected sidlist[lastentry]
414         self.assertEqual(rx_ip.dst, tx_seglist[-1])
415
416         if len(tx_seglist) > 1:
417             # rx'ed packet should have SRH
418             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
419             # get SRH
420             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
421             # rx'ed seglist should be equal to expected seglist
422             self.assertEqual(rx_srh.addresses, tx_seglist)
423             # segleft should be equal to size expected seglist-1
424             self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
425             # segleft should be equal to lastentry
426             self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
427             # get payload
428             payload = rx_srh.payload
429         else:
430             # rx'ed packet should NOT have SRH
431             self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
432             # get payload
433             payload = rx_ip.payload
434
435         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
436         # except for the hop-limit field
437         #   -> update tx'ed hlim to the expected hlim
438         tx_ip.hlim = tx_ip.hlim - 1
439
440         self.assertEqual(payload, tx_ip)
441
442         self.logger.debug("packet verification: SUCCESS")
443
444     def compare_rx_tx_packet_End_AS_IPv4_in(self, tx_pkt, rx_pkt):
445         """ Compare input and output packet after passing End.AS
446
447         :param tx_pkt: transmitted packet
448         :param rx_pkt: received packet
449         """
450
451         # get first (outer) IPv6 header of rx'ed packet
452         rx_ip = rx_pkt.getlayer(IPv6)
453         rx_srh = None
454
455         tx_ip = tx_pkt.getlayer(IP)
456
457         # expected segment-list (SRH order)
458         tx_seglist = self.rewrite_sid_list[::-1]
459
460         # received ip.src should be equal to SR Policy source
461         self.assertEqual(rx_ip.src, self.rewrite_src_addr)
462         # received ip.dst should be equal to expected sidlist[lastentry]
463         self.assertEqual(rx_ip.dst, tx_seglist[-1])
464
465         if len(tx_seglist) > 1:
466             # rx'ed packet should have SRH and IPv4 header
467             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
468             self.assertTrue(rx_ip.payload.haslayer(IP))
469             # get SRH
470             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
471             # rx'ed seglist should be equal to seglist
472             self.assertEqual(rx_srh.addresses, tx_seglist)
473             # segleft should be equal to size seglist-1
474             self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
475             # segleft should be equal to lastentry
476             self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
477             payload = rx_srh.payload
478         else:
479             # rx'ed packet should NOT have SRH
480             self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
481             # get payload
482             payload = rx_ip.payload
483
484         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
485         # except for the ttl field and ip checksum
486         #   -> adjust tx'ed ttl to expected ttl
487         tx_ip.ttl = tx_ip.ttl - 1
488         #   -> set tx'ed ip checksum to None and let scapy recompute
489         tx_ip.chksum = None
490         # read back the pkt (with str()) to force computing these fields
491         # probably other ways to accomplish this are possible
492         tx_ip = IP(scapy.compat.raw(tx_ip))
493
494         self.assertEqual(payload, tx_ip)
495
496         self.logger.debug("packet verification: SUCCESS")
497
498     def compare_rx_tx_packet_End_AS_L2_in(self, tx_pkt, rx_pkt):
499         """ Compare input and output packet after passing End.AS
500
501         :param tx_pkt: transmitted packet
502         :param rx_pkt: received packet
503         """
504
505         # get first (outer) IPv6 header of rx'ed packet
506         rx_ip = rx_pkt.getlayer(IPv6)
507         rx_srh = None
508
509         tx_ether = tx_pkt.getlayer(Ether)
510
511         # expected segment-list (SRH order)
512         tx_seglist = self.rewrite_sid_list[::-1]
513
514         # received ip.src should be equal to SR Policy source
515         self.assertEqual(rx_ip.src, self.rewrite_src_addr)
516         # received ip.dst should be equal to expected sidlist[lastentry]
517         self.assertEqual(rx_ip.dst, tx_seglist[-1])
518
519         if len(tx_seglist) > 1:
520             # rx'ed packet should have SRH
521             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
522             # get SRH
523             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
524             # rx'ed seglist should be equal to seglist
525             self.assertEqual(rx_srh.addresses, tx_seglist)
526             # segleft should be equal to size seglist-1
527             self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
528             # segleft should be equal to lastentry
529             self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
530             # nh should be "No Next Header" (59)
531             self.assertEqual(rx_srh.nh, 59)
532             # get payload
533             payload = rx_srh.payload
534         else:
535             # rx'ed packet should NOT have SRH
536             self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
537             # get payload
538             payload = rx_ip.payload
539
540         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
541         self.assertEqual(Ether(scapy.compat.raw(payload)), tx_ether)
542
543         self.logger.debug("packet verification: SUCCESS")
544
545     def compare_rx_tx_packet_End_AS_IPv6_out(self, tx_pkt, rx_pkt):
546         """ Compare input and output packet after passing End.AS with IPv6
547
548         :param tx_pkt: transmitted packet
549         :param rx_pkt: received packet
550         """
551
552         # get first (outer) IPv6 header of rx'ed packet
553         rx_ip = rx_pkt.getlayer(IPv6)
554
555         tx_ip = tx_pkt.getlayer(IPv6)
556         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
557
558         # verify if rx'ed packet has no SRH
559         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
560
561         # the whole rx_ip pkt should be equal to tx_ip2
562         # except for the hlim field
563         #   -> adjust tx'ed hlim to expected hlim
564         tx_ip2.hlim = tx_ip2.hlim - 1
565
566         self.assertEqual(rx_ip, tx_ip2)
567
568         self.logger.debug("packet verification: SUCCESS")
569
570     def compare_rx_tx_packet_End_AS_IPv4_out(self, tx_pkt, rx_pkt):
571         """ Compare input and output packet after passing End.AS with IPv4
572
573         :param tx_pkt: transmitted packet
574         :param rx_pkt: received packet
575         """
576
577         # get IPv4 header of rx'ed packet
578         rx_ip = rx_pkt.getlayer(IP)
579
580         tx_ip = tx_pkt.getlayer(IPv6)
581         tx_ip2 = tx_pkt.getlayer(IP)
582
583         # verify if rx'ed packet has no SRH
584         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
585
586         # the whole rx_ip pkt should be equal to tx_ip2
587         # except for the ttl field and ip checksum
588         #   -> adjust tx'ed ttl to expected ttl
589         tx_ip2.ttl = tx_ip2.ttl - 1
590         #   -> set tx'ed ip checksum to None and let scapy recompute
591         tx_ip2.chksum = None
592         # read back the pkt (with str()) to force computing these fields
593         # probably other ways to accomplish this are possible
594         tx_ip2 = IP(scapy.compat.raw(tx_ip2))
595
596         self.assertEqual(rx_ip, tx_ip2)
597
598         self.logger.debug("packet verification: SUCCESS")
599
600     def compare_rx_tx_packet_End_AS_L2_out(self, tx_pkt, rx_pkt):
601         """ Compare input and output packet after passing End.AS with L2
602
603         :param tx_pkt: transmitted packet
604         :param rx_pkt: received packet
605         """
606
607         # get IPv4 header of rx'ed packet
608         rx_eth = rx_pkt.getlayer(Ether)
609
610         tx_ip = tx_pkt.getlayer(IPv6)
611         # we can't just get the 2nd Ether layer
612         # get the Raw content and dissect it as Ether
613         tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw]))
614
615         # verify if rx'ed packet has no SRH
616         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
617
618         # the whole rx_eth pkt should be equal to tx_eth1
619         self.assertEqual(rx_eth, tx_eth1)
620
621         self.logger.debug("packet verification: SUCCESS")
622
623     def create_stream(self, src_if, dst_if, packet_header, packet_sizes,
624                       count):
625         """Create SRv6 input packet stream for defined interface.
626
627         :param VppInterface src_if: Interface to create packet stream for
628         :param VppInterface dst_if: destination interface of packet stream
629         :param packet_header: Layer3 scapy packet headers,
630                 L2 is added when not provided,
631                 Raw(payload) with packet_info is added
632         :param list packet_sizes: packet stream pckt sizes,sequentially applied
633                to packets in stream have
634         :param int count: number of packets in packet stream
635         :return: list of packets
636         """
637         self.logger.info("Creating packets")
638         pkts = []
639         for i in range(0, count-1):
640             payload_info = self.create_packet_info(src_if, dst_if)
641             self.logger.debug(
642                 "Creating packet with index %d" % (payload_info.index))
643             payload = self.info_to_payload(payload_info)
644             # add L2 header if not yet provided in packet_header
645             if packet_header.getlayer(0).name == 'Ethernet':
646                 p = (packet_header /
647                      Raw(payload))
648             else:
649                 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
650                      packet_header /
651                      Raw(payload))
652             size = packet_sizes[i % len(packet_sizes)]
653             self.logger.debug("Packet size %d" % (size))
654             self.extend_packet(p, size)
655             # we need to store the packet with the automatic fields computed
656             # read back the dumped packet (with str())
657             # to force computing these fields
658             # probably other ways are possible
659             p = Ether(scapy.compat.raw(p))
660             payload_info.data = p.copy()
661             self.logger.debug(ppp("Created packet:", p))
662             pkts.append(p)
663         self.logger.info("Done creating packets")
664         return pkts
665
666     def send_and_verify_pkts(self, input, pkts, output, compare_func):
667         """Send packets and verify received packets using compare_func
668
669         :param input: ingress interface of DUT
670         :param pkts: list of packets to transmit
671         :param output: egress interface of DUT
672         :param compare_func: function to compare in and out packets
673         """
674         # add traffic stream to input interface
675         input.add_stream(pkts)
676
677         # enable capture on all interfaces
678         self.pg_enable_capture(self.pg_interfaces)
679
680         # start traffic
681         self.logger.info("Starting traffic")
682         self.pg_start()
683
684         # get output capture
685         self.logger.info("Getting packet capture")
686         capture = output.get_capture()
687
688         # assert nothing was captured on input interface
689         # input.assert_nothing_captured()
690
691         # verify captured packets
692         self.verify_captured_pkts(output, capture, compare_func)
693
694     def create_packet_header_IPv6(self):
695         """Create packet header: IPv6 header, UDP header
696
697         :param dst: IPv6 destination address
698
699         IPv6 source address is 1234::1
700         IPv6 destination address is 4321::1
701         UDP source port and destination port are 1234
702         """
703
704         p = (IPv6(src='1234::1', dst='4321::1') /
705              UDP(sport=1234, dport=1234))
706         return p
707
708     def create_packet_header_IPv6_SRH_IPv6(self, sidlist, segleft):
709         """Create packet header: IPv6 encapsulated in SRv6:
710         IPv6 header with SRH, IPv6 header, UDP header
711
712         :param list sidlist: segment list of outer IPv6 SRH
713         :param int segleft: segments-left field of outer IPv6 SRH
714
715         Outer IPv6 source address is set to 5678::1
716         Outer IPv6 destination address is set to sidlist[segleft]
717         IPv6 source addresses is 1234::1
718         IPv6 destination address is 4321::1
719         UDP source port and destination port are 1234
720         """
721
722         p = (IPv6(src='5678::1', dst=sidlist[segleft]) /
723              IPv6ExtHdrSegmentRouting(addresses=sidlist,
724                                       segleft=segleft, nh=41) /
725              IPv6(src='1234::1', dst='4321::1') /
726              UDP(sport=1234, dport=1234))
727         return p
728
729     def create_packet_header_IPv4(self):
730         """Create packet header: IPv4 header, UDP header
731
732         :param dst: IPv4 destination address
733
734         IPv4 source address is 123.1.1.1
735         IPv4 destination address is 124.1.1.1
736         UDP source port and destination port are 1234
737         """
738
739         p = (IP(src='123.1.1.1', dst='124.1.1.1') /
740              UDP(sport=1234, dport=1234))
741         return p
742
743     def create_packet_header_IPv6_SRH_IPv4(self, sidlist, segleft):
744         """Create packet header: IPv4 encapsulated in SRv6:
745         IPv6 header with SRH, IPv4 header, UDP header
746
747         :param ipv4address dst: inner IPv4 destination address
748         :param list sidlist: segment list of outer IPv6 SRH
749         :param int segleft: segments-left field of outer IPv6 SRH
750
751         Outer IPv6 destination address is set to sidlist[segleft]
752         IPv6 source address is 1234::1
753         IPv4 source address is 123.1.1.1
754         IPv4 destination address is 124.1.1.1
755         UDP source port and destination port are 1234
756         """
757
758         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
759              IPv6ExtHdrSegmentRouting(addresses=sidlist,
760                                       segleft=segleft, nh=4) /
761              IP(src='123.1.1.1', dst='124.1.1.1') /
762              UDP(sport=1234, dport=1234))
763         return p
764
765     def create_packet_header_L2(self, vlan=0):
766         """Create packet header: L2 header
767
768         :param vlan: if vlan!=0 then add 802.1q header
769         """
770         # Note: the dst addr ('00:55:44:33:22:11') is used in
771         # the compare function compare_rx_tx_packet_T_Encaps_L2
772         # to detect presence of L2 in SRH payload
773         p = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
774         etype = 0x8137  # IPX
775         if vlan:
776             # add 802.1q layer
777             p /= Dot1Q(vlan=vlan, type=etype)
778         else:
779             p.type = etype
780         return p
781
782     def create_packet_header_IPv6_SRH_L2(self, sidlist, segleft, vlan=0):
783         """Create packet header: L2 encapsulated in SRv6:
784         IPv6 header with SRH, L2
785
786         :param list sidlist: segment list of outer IPv6 SRH
787         :param int segleft: segments-left field of outer IPv6 SRH
788         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
789
790         Outer IPv6 destination address is set to sidlist[segleft]
791         IPv6 source address is 1234::1
792         """
793         eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
794         etype = 0x8137  # IPX
795         if vlan:
796             # add 802.1q layer
797             eth /= Dot1Q(vlan=vlan, type=etype)
798         else:
799             eth.type = etype
800
801         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
802              IPv6ExtHdrSegmentRouting(addresses=sidlist,
803                                       segleft=segleft, nh=59) /
804              eth)
805         return p
806
807     def get_payload_info(self, packet):
808         """ Extract the payload_info from the packet
809         """
810         # in most cases, payload_info is in packet[Raw]
811         # but packet[Raw] gives the complete payload
812         # (incl L2 header) for the T.Encaps L2 case
813         try:
814             payload_info = self.payload_to_info(packet[Raw])
815
816         except:
817             # remote L2 header from packet[Raw]:
818             # take packet[Raw], convert it to an Ether layer
819             # and then extract Raw from it
820             payload_info = self.payload_to_info(
821                 Ether(scapy.compat.raw(packet[Raw]))[Raw])
822
823         return payload_info
824
825     def verify_captured_pkts(self, dst_if, capture, compare_func):
826         """
827         Verify captured packet stream for specified interface.
828         Compare ingress with egress packets using the specified compare fn
829
830         :param dst_if: egress interface of DUT
831         :param capture: captured packets
832         :param compare_func: function to compare in and out packet
833         """
834         self.logger.info("Verifying capture on interface %s using function %s"
835                          % (dst_if.name, compare_func.__name__))
836
837         last_info = dict()
838         for i in self.pg_interfaces:
839             last_info[i.sw_if_index] = None
840         dst_sw_if_index = dst_if.sw_if_index
841
842         for packet in capture:
843             try:
844                 # extract payload_info from packet's payload
845                 payload_info = self.get_payload_info(packet)
846                 packet_index = payload_info.index
847
848                 self.logger.debug("Verifying packet with index %d"
849                                   % (packet_index))
850                 # packet should have arrived on the expected interface
851                 self.assertEqual(payload_info.dst, dst_sw_if_index)
852                 self.logger.debug(
853                     "Got packet on interface %s: src=%u (idx=%u)" %
854                     (dst_if.name, payload_info.src, packet_index))
855
856                 # search for payload_info with same src and dst if_index
857                 # this will give us the transmitted packet
858                 next_info = self.get_next_packet_info_for_interface2(
859                     payload_info.src, dst_sw_if_index,
860                     last_info[payload_info.src])
861                 last_info[payload_info.src] = next_info
862                 # next_info should not be None
863                 self.assertTrue(next_info is not None)
864                 # index of tx and rx packets should be equal
865                 self.assertEqual(packet_index, next_info.index)
866                 # data field of next_info contains the tx packet
867                 txed_packet = next_info.data
868
869                 self.logger.debug(ppp("Transmitted packet:",
870                                       txed_packet))  # ppp=Pretty Print Packet
871
872                 self.logger.debug(ppp("Received packet:", packet))
873
874                 # compare rcvd packet with expected packet using compare_func
875                 compare_func(txed_packet, packet)
876
877             except:
878                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
879                 raise
880
881         # have all expected packets arrived?
882         for i in self.pg_interfaces:
883             remaining_packet = self.get_next_packet_info_for_interface2(
884                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
885             self.assertTrue(remaining_packet is None,
886                             "Interface %s: Packet expected from interface %s "
887                             "didn't arrive" % (dst_if.name, i.name))
888
889
890 if __name__ == '__main__':
891     unittest.main(testRunner=VppTestRunner)