tests: changes for scapy 2.4.3 migration
[vpp.git] / src / plugins / srv6-as / test / test_srv6_as.py
1 #!/usr/bin/env python3
2
3 import unittest
4 import binascii
5 from socket import AF_INET6
6
7 from framework import VppTestCase, VppTestRunner
8 from vpp_ip_route import VppIpRoute, VppRoutePath, FibPathProto, VppIpTable
9 from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
10     SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
11
12 import scapy.compat
13 from scapy.packet import Raw
14 from scapy.layers.l2 import Ether, Dot1Q
15 from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
16 from scapy.layers.inet import IP, UDP
17
18 from util import ppp
19
20
21 class TestSRv6(VppTestCase):
22     """ SRv6 Static Proxy plugin Test Case """
23
24     @classmethod
25     def setUpClass(self):
26         super(TestSRv6, self).setUpClass()
27
28     @classmethod
29     def tearDownClass(cls):
30         super(TestSRv6, cls).tearDownClass()
31
32     def setUp(self):
33         """ Perform test setup before each test case.
34         """
35         super(TestSRv6, self).setUp()
36
37         # packet sizes, inclusive L2 overhead
38         self.pg_packet_sizes = [64, 512, 1518, 9018]
39
40         # reset packet_infos
41         self.reset_packet_infos()
42
43     def tearDown(self):
44         """ Clean up test setup after each test case.
45         """
46         self.teardown_interfaces()
47
48         super(TestSRv6, self).tearDown()
49
50     def configure_interface(self,
51                             interface,
52                             ipv6=False, ipv4=False,
53                             ipv6_table_id=0, ipv4_table_id=0):
54         """ Configure interface.
55         :param ipv6: configure IPv6 on interface
56         :param ipv4: configure IPv4 on interface
57         :param ipv6_table_id: FIB table_id for IPv6
58         :param ipv4_table_id: FIB table_id for IPv4
59         """
60         self.logger.debug("Configuring interface %s" % (interface.name))
61         if ipv6:
62             self.logger.debug("Configuring IPv6")
63             interface.set_table_ip6(ipv6_table_id)
64             interface.config_ip6()
65             interface.resolve_ndp(timeout=5)
66         if ipv4:
67             self.logger.debug("Configuring IPv4")
68             interface.set_table_ip4(ipv4_table_id)
69             interface.config_ip4()
70             interface.resolve_arp()
71         interface.admin_up()
72
73     def setup_interfaces(self, ipv6=[], ipv4=[],
74                          ipv6_table_id=[], ipv4_table_id=[]):
75         """ Create and configure interfaces.
76
77         :param ipv6: list of interface IPv6 capabilities
78         :param ipv4: list of interface IPv4 capabilities
79         :param ipv6_table_id: list of intf IPv6 FIB table_ids
80         :param ipv4_table_id: list of intf IPv4 FIB table_ids
81         :returns: List of created interfaces.
82         """
83         # how many interfaces?
84         if len(ipv6):
85             count = len(ipv6)
86         else:
87             count = len(ipv4)
88         self.logger.debug("Creating and configuring %d interfaces" % (count))
89
90         # fill up ipv6 and ipv4 lists if needed
91         # not enabled (False) is the default
92         if len(ipv6) < count:
93             ipv6 += (count - len(ipv6)) * [False]
94         if len(ipv4) < count:
95             ipv4 += (count - len(ipv4)) * [False]
96
97         # fill up table_id lists if needed
98         # table_id 0 (global) is the default
99         if len(ipv6_table_id) < count:
100             ipv6_table_id += (count - len(ipv6_table_id)) * [0]
101         if len(ipv4_table_id) < count:
102             ipv4_table_id += (count - len(ipv4_table_id)) * [0]
103
104         # create 'count' pg interfaces
105         self.create_pg_interfaces(range(count))
106
107         # setup all interfaces
108         for i in range(count):
109             intf = self.pg_interfaces[i]
110             self.configure_interface(intf,
111                                      ipv6[i], ipv4[i],
112                                      ipv6_table_id[i], ipv4_table_id[i])
113
114         if any(ipv6):
115             self.logger.debug(self.vapi.cli("show ip6 neighbors"))
116         if any(ipv4):
117             self.logger.debug(self.vapi.cli("show ip arp"))
118         self.logger.debug(self.vapi.cli("show interface"))
119         self.logger.debug(self.vapi.cli("show hardware"))
120
121         return self.pg_interfaces
122
123     def teardown_interfaces(self):
124         """ Unconfigure and bring down interface.
125         """
126         self.logger.debug("Tearing down interfaces")
127         # tear down all interfaces
128         # AFAIK they cannot be deleted
129         for i in self.pg_interfaces:
130             self.logger.debug("Tear down interface %s" % (i.name))
131             i.admin_down()
132             i.unconfig()
133             i.set_table_ip4(0)
134             i.set_table_ip6(0)
135
136     def test_SRv6_End_AS_IPv6_noSRH(self):
137         """ Test SRv6 End.AS behavior with IPv6 traffic and no SRH rewrite.
138         """
139         self.run_SRv6_End_AS_IPv6(
140             sid_list=['a1::', 'a2::a6', 'a3::'],
141             test_sid_index=1,
142             rewrite_src_addr='a2::')
143
144     def test_SRv6_End_AS_IPv6_SRH(self):
145         """ Test SRv6 End.AS behavior with IPv6 traffic and SRH rewrite.
146         """
147         self.run_SRv6_End_AS_IPv6(
148             sid_list=['a1::a6', 'a2::', 'a3::'],
149             test_sid_index=0,
150             rewrite_src_addr='a1::')
151
152     def test_SRv6_End_AS_IPv4_noSRH(self):
153         """ Test SRv6 End.AS behavior with IPv4 traffic and no SRH rewrite.
154         """
155         self.run_SRv6_End_AS_IPv4(
156             sid_list=['a1::', 'a2::a6', 'a3::'],
157             test_sid_index=1,
158             rewrite_src_addr='a2::')
159
160     def test_SRv6_End_AS_IPv4_SRH(self):
161         """ Test SRv6 End.AS behavior with IPv4 traffic and SRH rewrite.
162         """
163         self.run_SRv6_End_AS_IPv4(
164             sid_list=['a1::a6', 'a2::', 'a3::'],
165             test_sid_index=0,
166             rewrite_src_addr='a1::')
167
168     def test_SRv6_End_AS_L2_noSRH(self):
169         """ Test SRv6 End.AS behavior with L2 traffic and no SRH rewrite.
170         """
171         self.run_SRv6_End_AS_L2(
172             sid_list=['a1::', 'a2::a6', 'a3::'],
173             test_sid_index=1,
174             rewrite_src_addr='a2::')
175
176     def test_SRv6_End_AS_L2_SRH(self):
177         """ Test SRv6 End.AS behavior with L2 traffic and SRH rewrite.
178         """
179         self.run_SRv6_End_AS_L2(
180             sid_list=['a1::a6', 'a2::', 'a3::'],
181             test_sid_index=0,
182             rewrite_src_addr='a1::')
183
184     def run_SRv6_End_AS_L2(self, sid_list, test_sid_index, rewrite_src_addr):
185         """ Run SRv6 End.AS test with L2 traffic.
186         """
187         self.rewrite_src_addr = rewrite_src_addr
188         self.rewrite_sid_list = sid_list[test_sid_index + 1::]
189
190         # send traffic to one destination interface
191         # source and destination interfaces are IPv6 only
192         self.setup_interfaces(ipv6=[True, False])
193
194         # configure route to next segment
195         route = VppIpRoute(self, sid_list[test_sid_index + 1], 128,
196                            [VppRoutePath(self.pg0.remote_ip6,
197                                          self.pg0.sw_if_index)])
198         route.add_vpp_config()
199
200         # configure SRv6 localSID behavior
201         cli_str = "sr localsid address " + sid_list[test_sid_index] \
202             + " behavior end.as" \
203             + " oif " + self.pg1.name \
204             + " iif " + self.pg1.name \
205             + " src " + self.rewrite_src_addr
206         for s in self.rewrite_sid_list:
207             cli_str += " next " + s
208         self.vapi.cli(cli_str)
209
210         # log the localsids
211         self.logger.debug(self.vapi.cli("show sr localsid"))
212
213         # send one packet per packet size
214         count = len(self.pg_packet_sizes)
215
216         # prepare L2 in SRv6 headers
217         packet_header1 = self.create_packet_header_IPv6_SRH_L2(
218                         sidlist=sid_list[::-1],
219                         segleft=len(sid_list) - test_sid_index - 1,
220                         vlan=0)
221
222         # generate packets (pg0->pg1)
223         pkts1 = self.create_stream(self.pg0, self.pg1, packet_header1,
224                                    self.pg_packet_sizes, count)
225
226         # send packets and verify received packets
227         self.send_and_verify_pkts(self.pg0, pkts1, self.pg1,
228                                   self.compare_rx_tx_packet_End_AS_L2_out)
229
230         # log the localsid counters
231         self.logger.info(self.vapi.cli("show sr localsid"))
232
233         # prepare L2 header for returning packets
234         packet_header2 = self.create_packet_header_L2()
235
236         # generate returning packets (pg1->pg0)
237         pkts2 = self.create_stream(self.pg1, self.pg0, packet_header2,
238                                    self.pg_packet_sizes, count)
239
240         # send packets and verify received packets
241         self.send_and_verify_pkts(self.pg1, pkts2, self.pg0,
242                                   self.compare_rx_tx_packet_End_AS_L2_in)
243
244         # log the localsid counters
245         self.logger.info(self.vapi.cli("show sr localsid"))
246
247         # remove SRv6 localSIDs
248         self.vapi.cli("sr localsid del address " + sid_list[test_sid_index])
249
250         # cleanup interfaces
251         self.teardown_interfaces()
252
253     def run_SRv6_End_AS_IPv6(self, sid_list, test_sid_index, rewrite_src_addr):
254         """ Run SRv6 End.AS test with IPv6 traffic.
255         """
256         self.rewrite_src_addr = rewrite_src_addr
257         self.rewrite_sid_list = sid_list[test_sid_index + 1::]
258
259         # send traffic to one destination interface
260         # source and destination interfaces are IPv6 only
261         self.setup_interfaces(ipv6=[True, True])
262
263         # configure route to next segment
264         route = VppIpRoute(self, sid_list[test_sid_index + 1], 128,
265                            [VppRoutePath(self.pg0.remote_ip6,
266                                          self.pg0.sw_if_index)])
267         route.add_vpp_config()
268
269         # configure SRv6 localSID behavior
270         cli_str = "sr localsid address " + sid_list[test_sid_index] \
271             + " behavior end.as" \
272             + " nh " + self.pg1.remote_ip6 \
273             + " oif " + self.pg1.name \
274             + " iif " + self.pg1.name \
275             + " src " + self.rewrite_src_addr
276         for s in self.rewrite_sid_list:
277             cli_str += " next " + s
278         self.vapi.cli(cli_str)
279
280         # log the localsids
281         self.logger.debug(self.vapi.cli("show sr localsid"))
282
283         # send one packet per packet size
284         count = len(self.pg_packet_sizes)
285
286         # prepare IPv6 in SRv6 headers
287         packet_header1 = self.create_packet_header_IPv6_SRH_IPv6(
288                         sidlist=sid_list[::-1],
289                         segleft=len(sid_list) - test_sid_index - 1)
290
291         # generate packets (pg0->pg1)
292         pkts1 = self.create_stream(self.pg0, self.pg1, packet_header1,
293                                    self.pg_packet_sizes, count)
294
295         # send packets and verify received packets
296         self.send_and_verify_pkts(self.pg0, pkts1, self.pg1,
297                                   self.compare_rx_tx_packet_End_AS_IPv6_out)
298
299         # log the localsid counters
300         self.logger.info(self.vapi.cli("show sr localsid"))
301
302         # prepare IPv6 header for returning packets
303         packet_header2 = self.create_packet_header_IPv6()
304
305         # generate returning packets (pg1->pg0)
306         pkts2 = self.create_stream(self.pg1, self.pg0, packet_header2,
307                                    self.pg_packet_sizes, count)
308
309         # send packets and verify received packets
310         self.send_and_verify_pkts(self.pg1, pkts2, self.pg0,
311                                   self.compare_rx_tx_packet_End_AS_IPv6_in)
312
313         # log the localsid counters
314         self.logger.info(self.vapi.cli("show sr localsid"))
315
316         # remove SRv6 localSIDs
317         self.vapi.cli("sr localsid del address " + sid_list[test_sid_index])
318
319         # cleanup interfaces
320         self.teardown_interfaces()
321
322     def run_SRv6_End_AS_IPv4(self, sid_list, test_sid_index, rewrite_src_addr):
323         """ Run SRv6 End.AS test with IPv4 traffic.
324         """
325         self.rewrite_src_addr = rewrite_src_addr
326         self.rewrite_sid_list = sid_list[test_sid_index + 1::]
327
328         # send traffic to one destination interface
329         # source and destination interfaces are IPv6 only
330         self.setup_interfaces(ipv6=[True, False], ipv4=[True, True])
331
332         # configure route to next segment
333         route = VppIpRoute(self, sid_list[test_sid_index + 1], 128,
334                            [VppRoutePath(self.pg0.remote_ip6,
335                                          self.pg0.sw_if_index)])
336         route.add_vpp_config()
337
338         # configure SRv6 localSID behavior
339         cli_str = "sr localsid address " + sid_list[test_sid_index] \
340             + " behavior end.as" \
341             + " nh " + self.pg1.remote_ip4 \
342             + " oif " + self.pg1.name \
343             + " iif " + self.pg1.name \
344             + " src " + self.rewrite_src_addr
345         for s in self.rewrite_sid_list:
346             cli_str += " next " + s
347         self.vapi.cli(cli_str)
348
349         # log the localsids
350         self.logger.debug(self.vapi.cli("show sr localsid"))
351
352         # send one packet per packet size
353         count = len(self.pg_packet_sizes)
354
355         # prepare IPv4 in SRv6 headers
356         packet_header1 = self.create_packet_header_IPv6_SRH_IPv4(
357                         sidlist=sid_list[::-1],
358                         segleft=len(sid_list) - test_sid_index - 1)
359
360         # generate packets (pg0->pg1)
361         pkts1 = self.create_stream(self.pg0, self.pg1, packet_header1,
362                                    self.pg_packet_sizes, count)
363
364         # send packets and verify received packets
365         self.send_and_verify_pkts(self.pg0, pkts1, self.pg1,
366                                   self.compare_rx_tx_packet_End_AS_IPv4_out)
367
368         # log the localsid counters
369         self.logger.info(self.vapi.cli("show sr localsid"))
370
371         # prepare IPv6 header for returning packets
372         packet_header2 = self.create_packet_header_IPv4()
373
374         # generate returning packets (pg1->pg0)
375         pkts2 = self.create_stream(self.pg1, self.pg0, packet_header2,
376                                    self.pg_packet_sizes, count)
377
378         # send packets and verify received packets
379         self.send_and_verify_pkts(self.pg1, pkts2, self.pg0,
380                                   self.compare_rx_tx_packet_End_AS_IPv4_in)
381
382         # log the localsid counters
383         self.logger.info(self.vapi.cli("show sr localsid"))
384
385         # remove SRv6 localSIDs
386         self.vapi.cli("sr localsid del address " + sid_list[test_sid_index])
387
388         # cleanup interfaces
389         self.teardown_interfaces()
390
391     def compare_rx_tx_packet_End_AS_IPv6_in(self, tx_pkt, rx_pkt):
392         """ Compare input and output packet after passing End.AS
393
394         :param tx_pkt: transmitted packet
395         :param rx_pkt: received packet
396         """
397
398         # get first (outer) IPv6 header of rx'ed packet
399         rx_ip = rx_pkt.getlayer(IPv6)
400         rx_srh = None
401
402         tx_ip = tx_pkt.getlayer(IPv6)
403
404         # expected segment-list (SRH order)
405         tx_seglist = self.rewrite_sid_list[::-1]
406
407         # received ip.src should be equal to SR Policy source
408         self.assertEqual(rx_ip.src, self.rewrite_src_addr)
409         # received ip.dst should be equal to expected sidlist[lastentry]
410         self.assertEqual(rx_ip.dst, tx_seglist[-1])
411
412         if len(tx_seglist) > 1:
413             # rx'ed packet should have SRH
414             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
415             # get SRH
416             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
417             # rx'ed seglist should be equal to expected seglist
418             self.assertEqual(rx_srh.addresses, tx_seglist)
419             # segleft should be equal to size expected seglist-1
420             self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
421             # segleft should be equal to lastentry
422             self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
423             # get payload
424             payload = rx_srh.payload
425         else:
426             # rx'ed packet should NOT have SRH
427             self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
428             # get payload
429             payload = rx_ip.payload
430
431         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
432         # except for the hop-limit field
433         #   -> update tx'ed hlim to the expected hlim
434         tx_ip.hlim = tx_ip.hlim - 1
435
436         self.assertEqual(payload, tx_ip)
437
438         self.logger.debug("packet verification: SUCCESS")
439
440     def compare_rx_tx_packet_End_AS_IPv4_in(self, tx_pkt, rx_pkt):
441         """ Compare input and output packet after passing End.AS
442
443         :param tx_pkt: transmitted packet
444         :param rx_pkt: received packet
445         """
446
447         # get first (outer) IPv6 header of rx'ed packet
448         rx_ip = rx_pkt.getlayer(IPv6)
449         rx_srh = None
450
451         tx_ip = tx_pkt.getlayer(IP)
452
453         # expected segment-list (SRH order)
454         tx_seglist = self.rewrite_sid_list[::-1]
455
456         # received ip.src should be equal to SR Policy source
457         self.assertEqual(rx_ip.src, self.rewrite_src_addr)
458         # received ip.dst should be equal to expected sidlist[lastentry]
459         self.assertEqual(rx_ip.dst, tx_seglist[-1])
460
461         if len(tx_seglist) > 1:
462             # rx'ed packet should have SRH and IPv4 header
463             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
464             self.assertTrue(rx_ip.payload.haslayer(IP))
465             # get SRH
466             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
467             # rx'ed seglist should be equal to seglist
468             self.assertEqual(rx_srh.addresses, tx_seglist)
469             # segleft should be equal to size seglist-1
470             self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
471             # segleft should be equal to lastentry
472             self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
473             payload = rx_srh.payload
474         else:
475             # rx'ed packet should NOT have SRH
476             self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
477             # get payload
478             payload = rx_ip.payload
479
480         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
481         # except for the ttl field and ip checksum
482         #   -> adjust tx'ed ttl to expected ttl
483         tx_ip.ttl = tx_ip.ttl - 1
484         #   -> set tx'ed ip checksum to None and let scapy recompute
485         tx_ip.chksum = None
486         # read back the pkt (with str()) to force computing these fields
487         # probably other ways to accomplish this are possible
488         tx_ip = IP(scapy.compat.raw(tx_ip))
489
490         self.assertEqual(payload, tx_ip)
491
492         self.logger.debug("packet verification: SUCCESS")
493
494     def compare_rx_tx_packet_End_AS_L2_in(self, tx_pkt, rx_pkt):
495         """ Compare input and output packet after passing End.AS
496
497         :param tx_pkt: transmitted packet
498         :param rx_pkt: received packet
499         """
500
501         # get first (outer) IPv6 header of rx'ed packet
502         rx_ip = rx_pkt.getlayer(IPv6)
503         rx_srh = None
504
505         tx_ether = tx_pkt.getlayer(Ether)
506
507         # expected segment-list (SRH order)
508         tx_seglist = self.rewrite_sid_list[::-1]
509
510         # received ip.src should be equal to SR Policy source
511         self.assertEqual(rx_ip.src, self.rewrite_src_addr)
512         # received ip.dst should be equal to expected sidlist[lastentry]
513         self.assertEqual(rx_ip.dst, tx_seglist[-1])
514
515         if len(tx_seglist) > 1:
516             # rx'ed packet should have SRH
517             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
518             # get SRH
519             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
520             # rx'ed seglist should be equal to seglist
521             self.assertEqual(rx_srh.addresses, tx_seglist)
522             # segleft should be equal to size seglist-1
523             self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
524             # segleft should be equal to lastentry
525             self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
526             # nh should be "No Next Header" (59)
527             self.assertEqual(rx_srh.nh, 59)
528             # get payload
529             payload = rx_srh.payload
530         else:
531             # rx'ed packet should NOT have SRH
532             self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
533             # get payload
534             payload = rx_ip.payload
535
536         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
537         self.assertEqual(Ether(scapy.compat.raw(payload)), tx_ether)
538
539         self.logger.debug("packet verification: SUCCESS")
540
541     def compare_rx_tx_packet_End_AS_IPv6_out(self, tx_pkt, rx_pkt):
542         """ Compare input and output packet after passing End.AS with IPv6
543
544         :param tx_pkt: transmitted packet
545         :param rx_pkt: received packet
546         """
547
548         # get first (outer) IPv6 header of rx'ed packet
549         rx_ip = rx_pkt.getlayer(IPv6)
550
551         tx_ip = tx_pkt.getlayer(IPv6)
552         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
553
554         # verify if rx'ed packet has no SRH
555         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
556
557         # the whole rx_ip pkt should be equal to tx_ip2
558         # except for the hlim field
559         #   -> adjust tx'ed hlim to expected hlim
560         tx_ip2.hlim = tx_ip2.hlim - 1
561
562         self.assertEqual(rx_ip, tx_ip2)
563
564         self.logger.debug("packet verification: SUCCESS")
565
566     def compare_rx_tx_packet_End_AS_IPv4_out(self, tx_pkt, rx_pkt):
567         """ Compare input and output packet after passing End.AS with IPv4
568
569         :param tx_pkt: transmitted packet
570         :param rx_pkt: received packet
571         """
572
573         # get IPv4 header of rx'ed packet
574         rx_ip = rx_pkt.getlayer(IP)
575
576         tx_ip = tx_pkt.getlayer(IPv6)
577         tx_ip2 = tx_pkt.getlayer(IP)
578
579         # verify if rx'ed packet has no SRH
580         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
581
582         # the whole rx_ip pkt should be equal to tx_ip2
583         # except for the ttl field and ip checksum
584         #   -> adjust tx'ed ttl to expected ttl
585         tx_ip2.ttl = tx_ip2.ttl - 1
586         #   -> set tx'ed ip checksum to None and let scapy recompute
587         tx_ip2.chksum = None
588         # read back the pkt (with str()) to force computing these fields
589         # probably other ways to accomplish this are possible
590         tx_ip2 = IP(scapy.compat.raw(tx_ip2))
591
592         self.assertEqual(rx_ip, tx_ip2)
593
594         self.logger.debug("packet verification: SUCCESS")
595
596     def compare_rx_tx_packet_End_AS_L2_out(self, tx_pkt, rx_pkt):
597         """ Compare input and output packet after passing End.AS with L2
598
599         :param tx_pkt: transmitted packet
600         :param rx_pkt: received packet
601         """
602
603         # get IPv4 header of rx'ed packet
604         rx_eth = rx_pkt.getlayer(Ether)
605
606         tx_ip = tx_pkt.getlayer(IPv6)
607         # we can't just get the 2nd Ether layer
608         # get the Raw content and dissect it as Ether
609         tx_eth1 = Ether(scapy.compat.raw(tx_pkt[Raw]))
610
611         # verify if rx'ed packet has no SRH
612         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
613
614         # the whole rx_eth pkt should be equal to tx_eth1
615         self.assertEqual(rx_eth, tx_eth1)
616
617         self.logger.debug("packet verification: SUCCESS")
618
619     def create_stream(self, src_if, dst_if, packet_header, packet_sizes,
620                       count):
621         """Create SRv6 input packet stream for defined interface.
622
623         :param VppInterface src_if: Interface to create packet stream for
624         :param VppInterface dst_if: destination interface of packet stream
625         :param packet_header: Layer3 scapy packet headers,
626                 L2 is added when not provided,
627                 Raw(payload) with packet_info is added
628         :param list packet_sizes: packet stream pckt sizes,sequentially applied
629                to packets in stream have
630         :param int count: number of packets in packet stream
631         :return: list of packets
632         """
633         self.logger.info("Creating packets")
634         pkts = []
635         for i in range(0, count-1):
636             payload_info = self.create_packet_info(src_if, dst_if)
637             self.logger.debug(
638                 "Creating packet with index %d" % (payload_info.index))
639             payload = self.info_to_payload(payload_info)
640             # add L2 header if not yet provided in packet_header
641             if packet_header.getlayer(0).name == 'Ethernet':
642                 p = (packet_header /
643                      Raw(payload))
644             else:
645                 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
646                      packet_header /
647                      Raw(payload))
648             size = packet_sizes[i % len(packet_sizes)]
649             self.logger.debug("Packet size %d" % (size))
650             self.extend_packet(p, size)
651             # we need to store the packet with the automatic fields computed
652             # read back the dumped packet (with str())
653             # to force computing these fields
654             # probably other ways are possible
655             p = Ether(scapy.compat.raw(p))
656             payload_info.data = p.copy()
657             self.logger.debug(ppp("Created packet:", p))
658             pkts.append(p)
659         self.logger.info("Done creating packets")
660         return pkts
661
662     def send_and_verify_pkts(self, input, pkts, output, compare_func):
663         """Send packets and verify received packets using compare_func
664
665         :param input: ingress interface of DUT
666         :param pkts: list of packets to transmit
667         :param output: egress interface of DUT
668         :param compare_func: function to compare in and out packets
669         """
670         # add traffic stream to input interface
671         input.add_stream(pkts)
672
673         # enable capture on all interfaces
674         self.pg_enable_capture(self.pg_interfaces)
675
676         # start traffic
677         self.logger.info("Starting traffic")
678         self.pg_start()
679
680         # get output capture
681         self.logger.info("Getting packet capture")
682         capture = output.get_capture()
683
684         # assert nothing was captured on input interface
685         # input.assert_nothing_captured()
686
687         # verify captured packets
688         self.verify_captured_pkts(output, capture, compare_func)
689
690     def create_packet_header_IPv6(self):
691         """Create packet header: IPv6 header, UDP header
692
693         :param dst: IPv6 destination address
694
695         IPv6 source address is 1234::1
696         IPv6 destination address is 4321::1
697         UDP source port and destination port are 1234
698         """
699
700         p = (IPv6(src='1234::1', dst='4321::1') /
701              UDP(sport=1234, dport=1234))
702         return p
703
704     def create_packet_header_IPv6_SRH_IPv6(self, sidlist, segleft):
705         """Create packet header: IPv6 encapsulated in SRv6:
706         IPv6 header with SRH, IPv6 header, UDP header
707
708         :param list sidlist: segment list of outer IPv6 SRH
709         :param int segleft: segments-left field of outer IPv6 SRH
710
711         Outer IPv6 source address is set to 5678::1
712         Outer IPv6 destination address is set to sidlist[segleft]
713         IPv6 source addresses is 1234::1
714         IPv6 destination address is 4321::1
715         UDP source port and destination port are 1234
716         """
717
718         p = (IPv6(src='5678::1', dst=sidlist[segleft]) /
719              IPv6ExtHdrSegmentRouting(addresses=sidlist,
720                                       segleft=segleft, nh=41) /
721              IPv6(src='1234::1', dst='4321::1') /
722              UDP(sport=1234, dport=1234))
723         return p
724
725     def create_packet_header_IPv4(self):
726         """Create packet header: IPv4 header, UDP header
727
728         :param dst: IPv4 destination address
729
730         IPv4 source address is 123.1.1.1
731         IPv4 destination address is 124.1.1.1
732         UDP source port and destination port are 1234
733         """
734
735         p = (IP(src='123.1.1.1', dst='124.1.1.1') /
736              UDP(sport=1234, dport=1234))
737         return p
738
739     def create_packet_header_IPv6_SRH_IPv4(self, sidlist, segleft):
740         """Create packet header: IPv4 encapsulated in SRv6:
741         IPv6 header with SRH, IPv4 header, UDP header
742
743         :param ipv4address dst: inner IPv4 destination address
744         :param list sidlist: segment list of outer IPv6 SRH
745         :param int segleft: segments-left field of outer IPv6 SRH
746
747         Outer IPv6 destination address is set to sidlist[segleft]
748         IPv6 source address is 1234::1
749         IPv4 source address is 123.1.1.1
750         IPv4 destination address is 124.1.1.1
751         UDP source port and destination port are 1234
752         """
753
754         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
755              IPv6ExtHdrSegmentRouting(addresses=sidlist,
756                                       segleft=segleft, nh=4) /
757              IP(src='123.1.1.1', dst='124.1.1.1') /
758              UDP(sport=1234, dport=1234))
759         return p
760
761     def create_packet_header_L2(self, vlan=0):
762         """Create packet header: L2 header
763
764         :param vlan: if vlan!=0 then add 802.1q header
765         """
766         # Note: the dst addr ('00:55:44:33:22:11') is used in
767         # the compare function compare_rx_tx_packet_T_Encaps_L2
768         # to detect presence of L2 in SRH payload
769         p = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
770         etype = 0x8137  # IPX
771         if vlan:
772             # add 802.1q layer
773             p /= Dot1Q(vlan=vlan, type=etype)
774         else:
775             p.type = etype
776         return p
777
778     def create_packet_header_IPv6_SRH_L2(self, sidlist, segleft, vlan=0):
779         """Create packet header: L2 encapsulated in SRv6:
780         IPv6 header with SRH, L2
781
782         :param list sidlist: segment list of outer IPv6 SRH
783         :param int segleft: segments-left field of outer IPv6 SRH
784         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
785
786         Outer IPv6 destination address is set to sidlist[segleft]
787         IPv6 source address is 1234::1
788         """
789         eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
790         etype = 0x8137  # IPX
791         if vlan:
792             # add 802.1q layer
793             eth /= Dot1Q(vlan=vlan, type=etype)
794         else:
795             eth.type = etype
796
797         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
798              IPv6ExtHdrSegmentRouting(addresses=sidlist,
799                                       segleft=segleft, nh=59) /
800              eth)
801         return p
802
803     def get_payload_info(self, packet):
804         """ Extract the payload_info from the packet
805         """
806         # in most cases, payload_info is in packet[Raw]
807         # but packet[Raw] gives the complete payload
808         # (incl L2 header) for the T.Encaps L2 case
809         try:
810             payload_info = self.payload_to_info(packet[Raw])
811
812         except:
813             # remote L2 header from packet[Raw]:
814             # take packet[Raw], convert it to an Ether layer
815             # and then extract Raw from it
816             payload_info = self.payload_to_info(
817                 Ether(scapy.compat.raw(packet[Raw]))[Raw])
818
819         return payload_info
820
821     def verify_captured_pkts(self, dst_if, capture, compare_func):
822         """
823         Verify captured packet stream for specified interface.
824         Compare ingress with egress packets using the specified compare fn
825
826         :param dst_if: egress interface of DUT
827         :param capture: captured packets
828         :param compare_func: function to compare in and out packet
829         """
830         self.logger.info("Verifying capture on interface %s using function %s"
831                          % (dst_if.name, compare_func.__name__))
832
833         last_info = dict()
834         for i in self.pg_interfaces:
835             last_info[i.sw_if_index] = None
836         dst_sw_if_index = dst_if.sw_if_index
837
838         for packet in capture:
839             try:
840                 # extract payload_info from packet's payload
841                 payload_info = self.get_payload_info(packet)
842                 packet_index = payload_info.index
843
844                 self.logger.debug("Verifying packet with index %d"
845                                   % (packet_index))
846                 # packet should have arrived on the expected interface
847                 self.assertEqual(payload_info.dst, dst_sw_if_index)
848                 self.logger.debug(
849                     "Got packet on interface %s: src=%u (idx=%u)" %
850                     (dst_if.name, payload_info.src, packet_index))
851
852                 # search for payload_info with same src and dst if_index
853                 # this will give us the transmitted packet
854                 next_info = self.get_next_packet_info_for_interface2(
855                     payload_info.src, dst_sw_if_index,
856                     last_info[payload_info.src])
857                 last_info[payload_info.src] = next_info
858                 # next_info should not be None
859                 self.assertTrue(next_info is not None)
860                 # index of tx and rx packets should be equal
861                 self.assertEqual(packet_index, next_info.index)
862                 # data field of next_info contains the tx packet
863                 txed_packet = next_info.data
864
865                 self.logger.debug(ppp("Transmitted packet:",
866                                       txed_packet))  # ppp=Pretty Print Packet
867
868                 self.logger.debug(ppp("Received packet:", packet))
869
870                 # compare rcvd packet with expected packet using compare_func
871                 compare_func(txed_packet, packet)
872
873             except:
874                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
875                 raise
876
877         # have all expected packets arrived?
878         for i in self.pg_interfaces:
879             remaining_packet = self.get_next_packet_info_for_interface2(
880                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
881             self.assertTrue(remaining_packet is None,
882                             "Interface %s: Packet expected from interface %s "
883                             "didn't arrive" % (dst_if.name, i.name))
884
885
886 if __name__ == '__main__':
887     unittest.main(testRunner=VppTestRunner)