tests: replace pycodestyle with black
[vpp.git] / test / test_srv6_as.py
1 #!/usr/bin/env python3
2
3 import unittest
4 import binascii
5 from socket import AF_INET6
6
7 from framework import VppTestCase, VppTestRunner
8 from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathProto, VppIpTable
9 from vpp_srv6 import (
10     SRv6LocalSIDBehaviors,
11     VppSRv6LocalSID,
12     VppSRv6Policy,
13     SRv6PolicyType,
14     VppSRv6Steering,
15     SRv6PolicySteeringTypes,
16 )
17
18 import scapy.compat
19 from scapy.packet import Raw
20 from scapy.layers.l2 import Ether, Dot1Q
21 from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
22 from scapy.layers.inet import IP, UDP
23
24 from util import ppp
25
26
27 class TestSRv6As(VppTestCase):
28     """SRv6 Static Proxy plugin Test Case"""
29
30     @classmethod
31     def setUpClass(self):
32         super(TestSRv6As, self).setUpClass()
33
34     @classmethod
35     def tearDownClass(cls):
36         super(TestSRv6As, cls).tearDownClass()
37
38     def setUp(self):
39         """Perform test setup before each test case."""
40         super(TestSRv6As, self).setUp()
41
42         # packet sizes, inclusive L2 overhead
43         self.pg_packet_sizes = [64, 512, 1518, 9018]
44
45         # reset packet_infos
46         self.reset_packet_infos()
47
48     def tearDown(self):
49         """Clean up test setup after each test case."""
50         self.teardown_interfaces()
51
52         super(TestSRv6As, self).tearDown()
53
54     def configure_interface(
55         self, interface, ipv6=False, ipv4=False, ipv6_table_id=0, ipv4_table_id=0
56     ):
57         """Configure interface.
58         :param ipv6: configure IPv6 on interface
59         :param ipv4: configure IPv4 on interface
60         :param ipv6_table_id: FIB table_id for IPv6
61         :param ipv4_table_id: FIB table_id for IPv4
62         """
63         self.logger.debug("Configuring interface %s" % (interface.name))
64         if ipv6:
65             self.logger.debug("Configuring IPv6")
66             interface.set_table_ip6(ipv6_table_id)
67             interface.config_ip6()
68             interface.resolve_ndp(timeout=5)
69         if ipv4:
70             self.logger.debug("Configuring IPv4")
71             interface.set_table_ip4(ipv4_table_id)
72             interface.config_ip4()
73             interface.resolve_arp()
74         interface.admin_up()
75
76     def setup_interfaces(self, ipv6=[], ipv4=[], ipv6_table_id=[], ipv4_table_id=[]):
77         """Create and configure interfaces.
78
79         :param ipv6: list of interface IPv6 capabilities
80         :param ipv4: list of interface IPv4 capabilities
81         :param ipv6_table_id: list of intf IPv6 FIB table_ids
82         :param ipv4_table_id: list of intf IPv4 FIB table_ids
83         :returns: List of created interfaces.
84         """
85         # how many interfaces?
86         if len(ipv6):
87             count = len(ipv6)
88         else:
89             count = len(ipv4)
90         self.logger.debug("Creating and configuring %d interfaces" % (count))
91
92         # fill up ipv6 and ipv4 lists if needed
93         # not enabled (False) is the default
94         if len(ipv6) < count:
95             ipv6 += (count - len(ipv6)) * [False]
96         if len(ipv4) < count:
97             ipv4 += (count - len(ipv4)) * [False]
98
99         # fill up table_id lists if needed
100         # table_id 0 (global) is the default
101         if len(ipv6_table_id) < count:
102             ipv6_table_id += (count - len(ipv6_table_id)) * [0]
103         if len(ipv4_table_id) < count:
104             ipv4_table_id += (count - len(ipv4_table_id)) * [0]
105
106         # create 'count' pg interfaces
107         self.create_pg_interfaces(range(count))
108
109         # setup all interfaces
110         for i in range(count):
111             intf = self.pg_interfaces[i]
112             self.configure_interface(
113                 intf, ipv6[i], ipv4[i], ipv6_table_id[i], ipv4_table_id[i]
114             )
115
116         if any(ipv6):
117             self.logger.debug(self.vapi.cli("show ip6 neighbors"))
118         if any(ipv4):
119             self.logger.debug(self.vapi.cli("show ip4 neighbors"))
120         self.logger.debug(self.vapi.cli("show interface"))
121         self.logger.debug(self.vapi.cli("show hardware"))
122
123         return self.pg_interfaces
124
125     def teardown_interfaces(self):
126         """Unconfigure and bring down interface."""
127         self.logger.debug("Tearing down interfaces")
128         # tear down all interfaces
129         # AFAIK they cannot be deleted
130         for i in self.pg_interfaces:
131             self.logger.debug("Tear down interface %s" % (i.name))
132             i.admin_down()
133             i.unconfig()
134             i.set_table_ip4(0)
135             i.set_table_ip6(0)
136
137     def test_SRv6_End_AS_IPv6_noSRH(self):
138         """Test SRv6 End.AS behavior with IPv6 traffic and no SRH rewrite."""
139         self.run_SRv6_End_AS_IPv6(
140             sid_list=["a1::", "a2::a6", "a3::"],
141             test_sid_index=1,
142             rewrite_src_addr="a2::",
143         )
144
145     def test_SRv6_End_AS_IPv6_SRH(self):
146         """Test SRv6 End.AS behavior with IPv6 traffic and SRH rewrite."""
147         self.run_SRv6_End_AS_IPv6(
148             sid_list=["a1::a6", "a2::", "a3::"],
149             test_sid_index=0,
150             rewrite_src_addr="a1::",
151         )
152
153     def test_SRv6_End_AS_IPv4_noSRH(self):
154         """Test SRv6 End.AS behavior with IPv4 traffic and no SRH rewrite."""
155         self.run_SRv6_End_AS_IPv4(
156             sid_list=["a1::", "a2::a6", "a3::"],
157             test_sid_index=1,
158             rewrite_src_addr="a2::",
159         )
160
161     def test_SRv6_End_AS_IPv4_SRH(self):
162         """Test SRv6 End.AS behavior with IPv4 traffic and SRH rewrite."""
163         self.run_SRv6_End_AS_IPv4(
164             sid_list=["a1::a6", "a2::", "a3::"],
165             test_sid_index=0,
166             rewrite_src_addr="a1::",
167         )
168
169     def test_SRv6_End_AS_L2_noSRH(self):
170         """Test SRv6 End.AS behavior with L2 traffic and no SRH rewrite."""
171         self.run_SRv6_End_AS_L2(
172             sid_list=["a1::", "a2::a6", "a3::"],
173             test_sid_index=1,
174             rewrite_src_addr="a2::",
175         )
176
177     def test_SRv6_End_AS_L2_SRH(self):
178         """Test SRv6 End.AS behavior with L2 traffic and SRH rewrite."""
179         self.run_SRv6_End_AS_L2(
180             sid_list=["a1::a6", "a2::", "a3::"],
181             test_sid_index=0,
182             rewrite_src_addr="a1::",
183         )
184
185     def run_SRv6_End_AS_L2(self, sid_list, test_sid_index, rewrite_src_addr):
186         """Run SRv6 End.AS test with L2 traffic."""
187         self.rewrite_src_addr = rewrite_src_addr
188         self.rewrite_sid_list = sid_list[test_sid_index + 1 : :]
189
190         # send traffic to one destination interface
191         # source and destination interfaces are IPv6 only
192         self.setup_interfaces(ipv6=[True, False])
193
194         # configure route to next segment
195         route = VppIpRoute(
196             self,
197             sid_list[test_sid_index + 1],
198             128,
199             [VppRoutePath(self.pg0.remote_ip6, self.pg0.sw_if_index)],
200         )
201         route.add_vpp_config()
202
203         # configure SRv6 localSID behavior
204         cli_str = (
205             "sr localsid address "
206             + sid_list[test_sid_index]
207             + " behavior end.as"
208             + " oif "
209             + self.pg1.name
210             + " iif "
211             + self.pg1.name
212             + " src "
213             + self.rewrite_src_addr
214         )
215         for s in self.rewrite_sid_list:
216             cli_str += " next " + s
217         self.vapi.cli(cli_str)
218
219         # log the localsids
220         self.logger.debug(self.vapi.cli("show sr localsid"))
221
222         # send one packet per packet size
223         count = len(self.pg_packet_sizes)
224
225         # prepare L2 in SRv6 headers
226         packet_header1 = self.create_packet_header_IPv6_SRH_L2(
227             sidlist=sid_list[::-1], segleft=len(sid_list) - test_sid_index - 1, vlan=0
228         )
229
230         # generate packets (pg0->pg1)
231         pkts1 = self.create_stream(
232             self.pg0, self.pg1, packet_header1, self.pg_packet_sizes, count
233         )
234
235         # send packets and verify received packets
236         self.send_and_verify_pkts(
237             self.pg0, pkts1, self.pg1, self.compare_rx_tx_packet_End_AS_L2_out
238         )
239
240         # log the localsid counters
241         self.logger.info(self.vapi.cli("show sr localsid"))
242
243         # prepare L2 header for returning packets
244         packet_header2 = self.create_packet_header_L2()
245
246         # generate returning packets (pg1->pg0)
247         pkts2 = self.create_stream(
248             self.pg1, self.pg0, packet_header2, self.pg_packet_sizes, count
249         )
250
251         # send packets and verify received packets
252         self.send_and_verify_pkts(
253             self.pg1, pkts2, self.pg0, self.compare_rx_tx_packet_End_AS_L2_in
254         )
255
256         # log the localsid counters
257         self.logger.info(self.vapi.cli("show sr localsid"))
258
259         # remove SRv6 localSIDs
260         self.vapi.cli("sr localsid del address " + sid_list[test_sid_index])
261
262         # cleanup interfaces
263         self.teardown_interfaces()
264
265     def run_SRv6_End_AS_IPv6(self, sid_list, test_sid_index, rewrite_src_addr):
266         """Run SRv6 End.AS test with IPv6 traffic."""
267         self.rewrite_src_addr = rewrite_src_addr
268         self.rewrite_sid_list = sid_list[test_sid_index + 1 : :]
269
270         # send traffic to one destination interface
271         # source and destination interfaces are IPv6 only
272         self.setup_interfaces(ipv6=[True, True])
273
274         # configure route to next segment
275         route = VppIpRoute(
276             self,
277             sid_list[test_sid_index + 1],
278             128,
279             [VppRoutePath(self.pg0.remote_ip6, self.pg0.sw_if_index)],
280         )
281         route.add_vpp_config()
282
283         # configure SRv6 localSID behavior
284         cli_str = (
285             "sr localsid address "
286             + sid_list[test_sid_index]
287             + " behavior end.as"
288             + " nh "
289             + self.pg1.remote_ip6
290             + " oif "
291             + self.pg1.name
292             + " iif "
293             + self.pg1.name
294             + " src "
295             + self.rewrite_src_addr
296         )
297         for s in self.rewrite_sid_list:
298             cli_str += " next " + s
299         self.vapi.cli(cli_str)
300
301         # log the localsids
302         self.logger.debug(self.vapi.cli("show sr localsid"))
303
304         # send one packet per packet size
305         count = len(self.pg_packet_sizes)
306
307         # prepare IPv6 in SRv6 headers
308         packet_header1 = self.create_packet_header_IPv6_SRH_IPv6(
309             sidlist=sid_list[::-1], segleft=len(sid_list) - test_sid_index - 1
310         )
311
312         # generate packets (pg0->pg1)
313         pkts1 = self.create_stream(
314             self.pg0, self.pg1, packet_header1, self.pg_packet_sizes, count
315         )
316
317         # send packets and verify received packets
318         self.send_and_verify_pkts(
319             self.pg0, pkts1, self.pg1, self.compare_rx_tx_packet_End_AS_IPv6_out
320         )
321
322         # log the localsid counters
323         self.logger.info(self.vapi.cli("show sr localsid"))
324
325         # prepare IPv6 header for returning packets
326         packet_header2 = self.create_packet_header_IPv6()
327
328         # generate returning packets (pg1->pg0)
329         pkts2 = self.create_stream(
330             self.pg1, self.pg0, packet_header2, self.pg_packet_sizes, count
331         )
332
333         # send packets and verify received packets
334         self.send_and_verify_pkts(
335             self.pg1, pkts2, self.pg0, self.compare_rx_tx_packet_End_AS_IPv6_in
336         )
337
338         # log the localsid counters
339         self.logger.info(self.vapi.cli("show sr localsid"))
340
341         # remove SRv6 localSIDs
342         self.vapi.cli("sr localsid del address " + sid_list[test_sid_index])
343
344         # cleanup interfaces
345         self.teardown_interfaces()
346
347     def run_SRv6_End_AS_IPv4(self, sid_list, test_sid_index, rewrite_src_addr):
348         """Run SRv6 End.AS test with IPv4 traffic."""
349         self.rewrite_src_addr = rewrite_src_addr
350         self.rewrite_sid_list = sid_list[test_sid_index + 1 : :]
351
352         # send traffic to one destination interface
353         # source and destination interfaces are IPv6 only
354         self.setup_interfaces(ipv6=[True, False], ipv4=[True, True])
355
356         # configure route to next segment
357         route = VppIpRoute(
358             self,
359             sid_list[test_sid_index + 1],
360             128,
361             [VppRoutePath(self.pg0.remote_ip6, self.pg0.sw_if_index)],
362         )
363         route.add_vpp_config()
364
365         # configure SRv6 localSID behavior
366         cli_str = (
367             "sr localsid address "
368             + sid_list[test_sid_index]
369             + " behavior end.as"
370             + " nh "
371             + self.pg1.remote_ip4
372             + " oif "
373             + self.pg1.name
374             + " iif "
375             + self.pg1.name
376             + " src "
377             + self.rewrite_src_addr
378         )
379         for s in self.rewrite_sid_list:
380             cli_str += " next " + s
381         self.vapi.cli(cli_str)
382
383         # log the localsids
384         self.logger.debug(self.vapi.cli("show sr localsid"))
385
386         # send one packet per packet size
387         count = len(self.pg_packet_sizes)
388
389         # prepare IPv4 in SRv6 headers
390         packet_header1 = self.create_packet_header_IPv6_SRH_IPv4(
391             sidlist=sid_list[::-1], segleft=len(sid_list) - test_sid_index - 1
392         )
393
394         # generate packets (pg0->pg1)
395         pkts1 = self.create_stream(
396             self.pg0, self.pg1, packet_header1, self.pg_packet_sizes, count
397         )
398
399         # send packets and verify received packets
400         self.send_and_verify_pkts(
401             self.pg0, pkts1, self.pg1, self.compare_rx_tx_packet_End_AS_IPv4_out
402         )
403
404         # log the localsid counters
405         self.logger.info(self.vapi.cli("show sr localsid"))
406
407         # prepare IPv6 header for returning packets
408         packet_header2 = self.create_packet_header_IPv4()
409
410         # generate returning packets (pg1->pg0)
411         pkts2 = self.create_stream(
412             self.pg1, self.pg0, packet_header2, self.pg_packet_sizes, count
413         )
414
415         # send packets and verify received packets
416         self.send_and_verify_pkts(
417             self.pg1, pkts2, self.pg0, self.compare_rx_tx_packet_End_AS_IPv4_in
418         )
419
420         # log the localsid counters
421         self.logger.info(self.vapi.cli("show sr localsid"))
422
423         # remove SRv6 localSIDs
424         self.vapi.cli("sr localsid del address " + sid_list[test_sid_index])
425
426         # cleanup interfaces
427         self.teardown_interfaces()
428
429     def compare_rx_tx_packet_End_AS_IPv6_in(self, tx_pkt, rx_pkt):
430         """Compare input and output packet after passing End.AS
431
432         :param tx_pkt: transmitted packet
433         :param rx_pkt: received packet
434         """
435
436         # get first (outer) IPv6 header of rx'ed packet
437         rx_ip = rx_pkt.getlayer(IPv6)
438         rx_srh = None
439
440         tx_ip = tx_pkt.getlayer(IPv6)
441
442         # expected segment-list (SRH order)
443         tx_seglist = self.rewrite_sid_list[::-1]
444
445         # received ip.src should be equal to SR Policy source
446         self.assertEqual(rx_ip.src, self.rewrite_src_addr)
447         # received ip.dst should be equal to expected sidlist[lastentry]
448         self.assertEqual(rx_ip.dst, tx_seglist[-1])
449
450         if len(tx_seglist) > 1:
451             # rx'ed packet should have SRH
452             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
453             # get SRH
454             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
455             # rx'ed seglist should be equal to expected seglist
456             self.assertEqual(rx_srh.addresses, tx_seglist)
457             # segleft should be equal to size expected seglist-1
458             self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
459             # segleft should be equal to lastentry
460             self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
461             # get payload
462             payload = rx_srh.payload
463         else:
464             # rx'ed packet should NOT have SRH
465             self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
466             # get payload
467             payload = rx_ip.payload
468
469         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
470         # except for the hop-limit field
471         #   -> update tx'ed hlim to the expected hlim
472         tx_ip.hlim = tx_ip.hlim - 1
473
474         self.assertEqual(payload, tx_ip)
475
476         self.logger.debug("packet verification: SUCCESS")
477
478     def compare_rx_tx_packet_End_AS_IPv4_in(self, tx_pkt, rx_pkt):
479         """Compare input and output packet after passing End.AS
480
481         :param tx_pkt: transmitted packet
482         :param rx_pkt: received packet
483         """
484
485         # get first (outer) IPv6 header of rx'ed packet
486         rx_ip = rx_pkt.getlayer(IPv6)
487         rx_srh = None
488
489         tx_ip = tx_pkt.getlayer(IP)
490
491         # expected segment-list (SRH order)
492         tx_seglist = self.rewrite_sid_list[::-1]
493
494         # received ip.src should be equal to SR Policy source
495         self.assertEqual(rx_ip.src, self.rewrite_src_addr)
496         # received ip.dst should be equal to expected sidlist[lastentry]
497         self.assertEqual(rx_ip.dst, tx_seglist[-1])
498
499         if len(tx_seglist) > 1:
500             # rx'ed packet should have SRH and IPv4 header
501             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
502             self.assertTrue(rx_ip.payload.haslayer(IP))
503             # get SRH
504             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
505             # rx'ed seglist should be equal to seglist
506             self.assertEqual(rx_srh.addresses, tx_seglist)
507             # segleft should be equal to size seglist-1
508             self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
509             # segleft should be equal to lastentry
510             self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
511             payload = rx_srh.payload
512         else:
513             # rx'ed packet should NOT have SRH
514             self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
515             # get payload
516             payload = rx_ip.payload
517
518         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
519         # except for the ttl field and ip checksum
520         #   -> adjust tx'ed ttl to expected ttl
521         tx_ip.ttl = tx_ip.ttl - 1
522         #   -> set tx'ed ip checksum to None and let scapy recompute
523         tx_ip.chksum = None
524         # read back the pkt (with str()) to force computing these fields
525         # probably other ways to accomplish this are possible
526         tx_ip = IP(scapy.compat.raw(tx_ip))
527
528         self.assertEqual(payload, tx_ip)
529
530         self.logger.debug("packet verification: SUCCESS")
531
532     def compare_rx_tx_packet_End_AS_L2_in(self, tx_pkt, rx_pkt):
533         """Compare input and output packet after passing End.AS
534
535         :param tx_pkt: transmitted packet
536         :param rx_pkt: received packet
537         """
538
539         # get first (outer) IPv6 header of rx'ed packet
540         rx_ip = rx_pkt.getlayer(IPv6)
541         rx_srh = None
542
543         tx_ether = tx_pkt.getlayer(Ether)
544
545         # expected segment-list (SRH order)
546         tx_seglist = self.rewrite_sid_list[::-1]
547
548         # received ip.src should be equal to SR Policy source
549         self.assertEqual(rx_ip.src, self.rewrite_src_addr)
550         # received ip.dst should be equal to expected sidlist[lastentry]
551         self.assertEqual(rx_ip.dst, tx_seglist[-1])
552
553         if len(tx_seglist) > 1:
554             # rx'ed packet should have SRH
555             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
556             # get SRH
557             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
558             # rx'ed seglist should be equal to seglist
559             self.assertEqual(rx_srh.addresses, tx_seglist)
560             # segleft should be equal to size seglist-1
561             self.assertEqual(rx_srh.segleft, len(tx_seglist) - 1)
562             # segleft should be equal to lastentry
563             self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
564             # nh should be "No Next Header" (143)
565             self.assertEqual(rx_srh.nh, 143)
566             # get payload
567             payload = rx_srh.payload
568         else:
569             # rx'ed packet should NOT have SRH
570             self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
571             # get payload
572             payload = rx_ip.payload
573
574         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
575         self.assertEqual(Ether(scapy.compat.raw(payload)), tx_ether)
576
577         self.logger.debug("packet verification: SUCCESS")
578
579     def compare_rx_tx_packet_End_AS_IPv6_out(self, tx_pkt, rx_pkt):
580         """Compare input and output packet after passing End.AS with IPv6
581
582         :param tx_pkt: transmitted packet
583         :param rx_pkt: received packet
584         """
585
586         # get first (outer) IPv6 header of rx'ed packet
587         rx_ip = rx_pkt.getlayer(IPv6)
588
589         tx_ip = tx_pkt.getlayer(IPv6)
590         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
591
592         # verify if rx'ed packet has no SRH
593         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
594
595         # the whole rx_ip pkt should be equal to tx_ip2
596         # except for the hlim field
597         #   -> adjust tx'ed hlim to expected hlim
598         tx_ip2.hlim = tx_ip2.hlim - 1
599
600         self.assertEqual(rx_ip, tx_ip2)
601
602         self.logger.debug("packet verification: SUCCESS")
603
604     def compare_rx_tx_packet_End_AS_IPv4_out(self, tx_pkt, rx_pkt):
605         """Compare input and output packet after passing End.AS with IPv4
606
607         :param tx_pkt: transmitted packet
608         :param rx_pkt: received packet
609         """
610
611         # get IPv4 header of rx'ed packet
612         rx_ip = rx_pkt.getlayer(IP)
613
614         tx_ip = tx_pkt.getlayer(IPv6)
615         tx_ip2 = tx_pkt.getlayer(IP)
616
617         # verify if rx'ed packet has no SRH
618         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
619
620         # the whole rx_ip pkt should be equal to tx_ip2
621         # except for the ttl field and ip checksum
622         #   -> adjust tx'ed ttl to expected ttl
623         tx_ip2.ttl = tx_ip2.ttl - 1
624         #   -> set tx'ed ip checksum to None and let scapy recompute
625         tx_ip2.chksum = None
626         # read back the pkt (with str()) to force computing these fields
627         # probably other ways to accomplish this are possible
628         tx_ip2 = IP(scapy.compat.raw(tx_ip2))
629
630         self.assertEqual(rx_ip, tx_ip2)
631
632         self.logger.debug("packet verification: SUCCESS")
633
634     def compare_rx_tx_packet_End_AS_L2_out(self, tx_pkt, rx_pkt):
635         """Compare input and output packet after passing End.AS with L2
636
637         :param tx_pkt: transmitted packet
638         :param rx_pkt: received packet
639         """
640
641         # get IPv4 header of rx'ed packet
642         rx_eth = rx_pkt.getlayer(Ether)
643
644         tx_ip = tx_pkt.getlayer(IPv6)
645         # we can't just get the 2nd Ether layer
646         # get the Raw content and dissect it as Ether
647         tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw]))
648
649         # verify if rx'ed packet has no SRH
650         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
651
652         # the whole rx_eth pkt should be equal to tx_eth1
653         self.assertEqual(rx_eth, tx_eth1)
654
655         self.logger.debug("packet verification: SUCCESS")
656
657     def create_stream(self, src_if, dst_if, packet_header, packet_sizes, count):
658         """Create SRv6 input packet stream for defined interface.
659
660         :param VppInterface src_if: Interface to create packet stream for
661         :param VppInterface dst_if: destination interface of packet stream
662         :param packet_header: Layer3 scapy packet headers,
663                 L2 is added when not provided,
664                 Raw(payload) with packet_info is added
665         :param list packet_sizes: packet stream pckt sizes,sequentially applied
666                to packets in stream have
667         :param int count: number of packets in packet stream
668         :return: list of packets
669         """
670         self.logger.info("Creating packets")
671         pkts = []
672         for i in range(0, count - 1):
673             payload_info = self.create_packet_info(src_if, dst_if)
674             self.logger.debug("Creating packet with index %d" % (payload_info.index))
675             payload = self.info_to_payload(payload_info)
676             # add L2 header if not yet provided in packet_header
677             if packet_header.getlayer(0).name == "Ethernet":
678                 p = packet_header / Raw(payload)
679             else:
680                 p = (
681                     Ether(dst=src_if.local_mac, src=src_if.remote_mac)
682                     / packet_header
683                     / Raw(payload)
684                 )
685             size = packet_sizes[i % len(packet_sizes)]
686             self.logger.debug("Packet size %d" % (size))
687             self.extend_packet(p, size)
688             # we need to store the packet with the automatic fields computed
689             # read back the dumped packet (with str())
690             # to force computing these fields
691             # probably other ways are possible
692             p = Ether(scapy.compat.raw(p))
693             payload_info.data = p.copy()
694             self.logger.debug(ppp("Created packet:", p))
695             pkts.append(p)
696         self.logger.info("Done creating packets")
697         return pkts
698
699     def send_and_verify_pkts(self, input, pkts, output, compare_func):
700         """Send packets and verify received packets using compare_func
701
702         :param input: ingress interface of DUT
703         :param pkts: list of packets to transmit
704         :param output: egress interface of DUT
705         :param compare_func: function to compare in and out packets
706         """
707         # add traffic stream to input interface
708         input.add_stream(pkts)
709
710         # enable capture on all interfaces
711         self.pg_enable_capture(self.pg_interfaces)
712
713         # start traffic
714         self.logger.info("Starting traffic")
715         self.pg_start()
716
717         # get output capture
718         self.logger.info("Getting packet capture")
719         capture = output.get_capture()
720
721         # assert nothing was captured on input interface
722         # input.assert_nothing_captured()
723
724         # verify captured packets
725         self.verify_captured_pkts(output, capture, compare_func)
726
727     def create_packet_header_IPv6(self):
728         """Create packet header: IPv6 header, UDP header
729
730         :param dst: IPv6 destination address
731
732         IPv6 source address is 1234::1
733         IPv6 destination address is 4321::1
734         UDP source port and destination port are 1234
735         """
736
737         p = IPv6(src="1234::1", dst="4321::1") / UDP(sport=1234, dport=1234)
738         return p
739
740     def create_packet_header_IPv6_SRH_IPv6(self, sidlist, segleft):
741         """Create packet header: IPv6 encapsulated in SRv6:
742         IPv6 header with SRH, IPv6 header, UDP header
743
744         :param list sidlist: segment list of outer IPv6 SRH
745         :param int segleft: segments-left field of outer IPv6 SRH
746
747         Outer IPv6 source address is set to 5678::1
748         Outer IPv6 destination address is set to sidlist[segleft]
749         IPv6 source addresses is 1234::1
750         IPv6 destination address is 4321::1
751         UDP source port and destination port are 1234
752         """
753
754         p = (
755             IPv6(src="5678::1", dst=sidlist[segleft])
756             / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=41)
757             / IPv6(src="1234::1", dst="4321::1")
758             / UDP(sport=1234, dport=1234)
759         )
760         return p
761
762     def create_packet_header_IPv4(self):
763         """Create packet header: IPv4 header, UDP header
764
765         :param dst: IPv4 destination address
766
767         IPv4 source address is 123.1.1.1
768         IPv4 destination address is 124.1.1.1
769         UDP source port and destination port are 1234
770         """
771
772         p = IP(src="123.1.1.1", dst="124.1.1.1") / UDP(sport=1234, dport=1234)
773         return p
774
775     def create_packet_header_IPv6_SRH_IPv4(self, sidlist, segleft):
776         """Create packet header: IPv4 encapsulated in SRv6:
777         IPv6 header with SRH, IPv4 header, UDP header
778
779         :param ipv4address dst: inner IPv4 destination address
780         :param list sidlist: segment list of outer IPv6 SRH
781         :param int segleft: segments-left field of outer IPv6 SRH
782
783         Outer IPv6 destination address is set to sidlist[segleft]
784         IPv6 source address is 1234::1
785         IPv4 source address is 123.1.1.1
786         IPv4 destination address is 124.1.1.1
787         UDP source port and destination port are 1234
788         """
789
790         p = (
791             IPv6(src="1234::1", dst=sidlist[segleft])
792             / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=4)
793             / IP(src="123.1.1.1", dst="124.1.1.1")
794             / UDP(sport=1234, dport=1234)
795         )
796         return p
797
798     def create_packet_header_L2(self, vlan=0):
799         """Create packet header: L2 header
800
801         :param vlan: if vlan!=0 then add 802.1q header
802         """
803         # Note: the dst addr ('00:55:44:33:22:11') is used in
804         # the compare function compare_rx_tx_packet_T_Encaps_L2
805         # to detect presence of L2 in SRH payload
806         p = Ether(src="00:11:22:33:44:55", dst="00:55:44:33:22:11")
807         etype = 0x8137  # IPX
808         if vlan:
809             # add 802.1q layer
810             p /= Dot1Q(vlan=vlan, type=etype)
811         else:
812             p.type = etype
813         return p
814
815     def create_packet_header_IPv6_SRH_L2(self, sidlist, segleft, vlan=0):
816         """Create packet header: L2 encapsulated in SRv6:
817         IPv6 header with SRH, L2
818
819         :param list sidlist: segment list of outer IPv6 SRH
820         :param int segleft: segments-left field of outer IPv6 SRH
821         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
822
823         Outer IPv6 destination address is set to sidlist[segleft]
824         IPv6 source address is 1234::1
825         """
826         eth = Ether(src="00:11:22:33:44:55", dst="00:55:44:33:22:11")
827         etype = 0x8137  # IPX
828         if vlan:
829             # add 802.1q layer
830             eth /= Dot1Q(vlan=vlan, type=etype)
831         else:
832             eth.type = etype
833
834         p = (
835             IPv6(src="1234::1", dst=sidlist[segleft])
836             / IPv6ExtHdrSegmentRouting(addresses=sidlist, segleft=segleft, nh=143)
837             / eth
838         )
839         return p
840
841     def get_payload_info(self, packet):
842         """Extract the payload_info from the packet"""
843         # in most cases, payload_info is in packet[Raw]
844         # but packet[Raw] gives the complete payload
845         # (incl L2 header) for the T.Encaps L2 case
846         try:
847             payload_info = self.payload_to_info(packet[Raw])
848
849         except:
850             # remote L2 header from packet[Raw]:
851             # take packet[Raw], convert it to an Ether layer
852             # and then extract Raw from it
853             payload_info = self.payload_to_info(
854                 Ether(scapy.compat.raw(packet[Raw]))[Raw]
855             )
856
857         return payload_info
858
859     def verify_captured_pkts(self, dst_if, capture, compare_func):
860         """
861         Verify captured packet stream for specified interface.
862         Compare ingress with egress packets using the specified compare fn
863
864         :param dst_if: egress interface of DUT
865         :param capture: captured packets
866         :param compare_func: function to compare in and out packet
867         """
868         self.logger.info(
869             "Verifying capture on interface %s using function %s"
870             % (dst_if.name, compare_func.__name__)
871         )
872
873         last_info = dict()
874         for i in self.pg_interfaces:
875             last_info[i.sw_if_index] = None
876         dst_sw_if_index = dst_if.sw_if_index
877
878         for packet in capture:
879             try:
880                 # extract payload_info from packet's payload
881                 payload_info = self.get_payload_info(packet)
882                 packet_index = payload_info.index
883
884                 self.logger.debug("Verifying packet with index %d" % (packet_index))
885                 # packet should have arrived on the expected interface
886                 self.assertEqual(payload_info.dst, dst_sw_if_index)
887                 self.logger.debug(
888                     "Got packet on interface %s: src=%u (idx=%u)"
889                     % (dst_if.name, payload_info.src, packet_index)
890                 )
891
892                 # search for payload_info with same src and dst if_index
893                 # this will give us the transmitted packet
894                 next_info = self.get_next_packet_info_for_interface2(
895                     payload_info.src, dst_sw_if_index, last_info[payload_info.src]
896                 )
897                 last_info[payload_info.src] = next_info
898                 # next_info should not be None
899                 self.assertTrue(next_info is not None)
900                 # index of tx and rx packets should be equal
901                 self.assertEqual(packet_index, next_info.index)
902                 # data field of next_info contains the tx packet
903                 txed_packet = next_info.data
904
905                 self.logger.debug(
906                     ppp("Transmitted packet:", txed_packet)
907                 )  # ppp=Pretty Print Packet
908
909                 self.logger.debug(ppp("Received packet:", packet))
910
911                 # compare rcvd packet with expected packet using compare_func
912                 compare_func(txed_packet, packet)
913
914             except:
915                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
916                 raise
917
918         # have all expected packets arrived?
919         for i in self.pg_interfaces:
920             remaining_packet = self.get_next_packet_info_for_interface2(
921                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index]
922             )
923             self.assertTrue(
924                 remaining_packet is None,
925                 "Interface %s: Packet expected from interface %s "
926                 "didn't arrive" % (dst_if.name, i.name),
927             )
928
929
930 if __name__ == "__main__":
931     unittest.main(testRunner=VppTestRunner)