MAP: Convert from DPO to input feature.
[vpp.git] / test / test_srv6_as.py
1 #!/usr/bin/env python
2
3 import unittest
4 import binascii
5 from socket import AF_INET6
6
7 from framework import VppTestCase, VppTestRunner
8 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto, VppIpTable
9 from vpp_srv6 import SRv6LocalSIDBehaviors, VppSRv6LocalSID, VppSRv6Policy, \
10     SRv6PolicyType, VppSRv6Steering, SRv6PolicySteeringTypes
11
12 from scapy.packet import Raw
13 from scapy.layers.l2 import Ether, Dot1Q
14 from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
15 from scapy.layers.inet import IP, UDP
16
17 from scapy.utils import inet_pton, inet_ntop
18
19 from util import ppp
20
21
22 class TestSRv6(VppTestCase):
23     """ SRv6 Static Proxy plugin Test Case """
24
25     @classmethod
26     def setUpClass(self):
27         super(TestSRv6, self).setUpClass()
28
29     def setUp(self):
30         """ Perform test setup before each test case.
31         """
32         super(TestSRv6, self).setUp()
33
34         # packet sizes, inclusive L2 overhead
35         self.pg_packet_sizes = [64, 512, 1518, 9018]
36
37         # reset packet_infos
38         self.reset_packet_infos()
39
40     def tearDown(self):
41         """ Clean up test setup after each test case.
42         """
43         self.teardown_interfaces()
44
45         super(TestSRv6, self).tearDown()
46
47     def configure_interface(self,
48                             interface,
49                             ipv6=False, ipv4=False,
50                             ipv6_table_id=0, ipv4_table_id=0):
51         """ Configure interface.
52         :param ipv6: configure IPv6 on interface
53         :param ipv4: configure IPv4 on interface
54         :param ipv6_table_id: FIB table_id for IPv6
55         :param ipv4_table_id: FIB table_id for IPv4
56         """
57         self.logger.debug("Configuring interface %s" % (interface.name))
58         if ipv6:
59             self.logger.debug("Configuring IPv6")
60             interface.set_table_ip6(ipv6_table_id)
61             interface.config_ip6()
62             interface.resolve_ndp(timeout=5)
63         if ipv4:
64             self.logger.debug("Configuring IPv4")
65             interface.set_table_ip4(ipv4_table_id)
66             interface.config_ip4()
67             interface.resolve_arp()
68         interface.admin_up()
69
70     def setup_interfaces(self, ipv6=[], ipv4=[],
71                          ipv6_table_id=[], ipv4_table_id=[]):
72         """ Create and configure interfaces.
73
74         :param ipv6: list of interface IPv6 capabilities
75         :param ipv4: list of interface IPv4 capabilities
76         :param ipv6_table_id: list of intf IPv6 FIB table_ids
77         :param ipv4_table_id: list of intf IPv4 FIB table_ids
78         :returns: List of created interfaces.
79         """
80         # how many interfaces?
81         if len(ipv6):
82             count = len(ipv6)
83         else:
84             count = len(ipv4)
85         self.logger.debug("Creating and configuring %d interfaces" % (count))
86
87         # fill up ipv6 and ipv4 lists if needed
88         # not enabled (False) is the default
89         if len(ipv6) < count:
90             ipv6 += (count - len(ipv6)) * [False]
91         if len(ipv4) < count:
92             ipv4 += (count - len(ipv4)) * [False]
93
94         # fill up table_id lists if needed
95         # table_id 0 (global) is the default
96         if len(ipv6_table_id) < count:
97             ipv6_table_id += (count - len(ipv6_table_id)) * [0]
98         if len(ipv4_table_id) < count:
99             ipv4_table_id += (count - len(ipv4_table_id)) * [0]
100
101         # create 'count' pg interfaces
102         self.create_pg_interfaces(range(count))
103
104         # setup all interfaces
105         for i in range(count):
106             intf = self.pg_interfaces[i]
107             self.configure_interface(intf,
108                                      ipv6[i], ipv4[i],
109                                      ipv6_table_id[i], ipv4_table_id[i])
110
111         if any(ipv6):
112             self.logger.debug(self.vapi.cli("show ip6 neighbors"))
113         if any(ipv4):
114             self.logger.debug(self.vapi.cli("show ip arp"))
115         self.logger.debug(self.vapi.cli("show interface"))
116         self.logger.debug(self.vapi.cli("show hardware"))
117
118         return self.pg_interfaces
119
120     def teardown_interfaces(self):
121         """ Unconfigure and bring down interface.
122         """
123         self.logger.debug("Tearing down interfaces")
124         # tear down all interfaces
125         # AFAIK they cannot be deleted
126         for i in self.pg_interfaces:
127             self.logger.debug("Tear down interface %s" % (i.name))
128             i.admin_down()
129             i.unconfig()
130             i.set_table_ip4(0)
131             i.set_table_ip6(0)
132
133     def test_SRv6_End_AS_IPv6_noSRH(self):
134         """ Test SRv6 End.AS behavior with IPv6 traffic and no SRH rewrite.
135         """
136         self.run_SRv6_End_AS_IPv6(
137             sid_list=['a1::', 'a2::a6', 'a3::'],
138             test_sid_index=1,
139             rewrite_src_addr='a2::')
140
141     def test_SRv6_End_AS_IPv6_SRH(self):
142         """ Test SRv6 End.AS behavior with IPv6 traffic and SRH rewrite.
143         """
144         self.run_SRv6_End_AS_IPv6(
145             sid_list=['a1::a6', 'a2::', 'a3::'],
146             test_sid_index=0,
147             rewrite_src_addr='a1::')
148
149     def test_SRv6_End_AS_IPv4_noSRH(self):
150         """ Test SRv6 End.AS behavior with IPv4 traffic and no SRH rewrite.
151         """
152         self.run_SRv6_End_AS_IPv4(
153             sid_list=['a1::', 'a2::a6', 'a3::'],
154             test_sid_index=1,
155             rewrite_src_addr='a2::')
156
157     def test_SRv6_End_AS_IPv4_SRH(self):
158         """ Test SRv6 End.AS behavior with IPv4 traffic and SRH rewrite.
159         """
160         self.run_SRv6_End_AS_IPv4(
161             sid_list=['a1::a6', 'a2::', 'a3::'],
162             test_sid_index=0,
163             rewrite_src_addr='a1::')
164
165     def test_SRv6_End_AS_L2_noSRH(self):
166         """ Test SRv6 End.AS behavior with L2 traffic and no SRH rewrite.
167         """
168         self.run_SRv6_End_AS_L2(
169             sid_list=['a1::', 'a2::a6', 'a3::'],
170             test_sid_index=1,
171             rewrite_src_addr='a2::')
172
173     def test_SRv6_End_AS_L2_SRH(self):
174         """ Test SRv6 End.AS behavior with L2 traffic and SRH rewrite.
175         """
176         self.run_SRv6_End_AS_L2(
177             sid_list=['a1::a6', 'a2::', 'a3::'],
178             test_sid_index=0,
179             rewrite_src_addr='a1::')
180
181     def run_SRv6_End_AS_L2(self, sid_list, test_sid_index, rewrite_src_addr):
182         """ Run SRv6 End.AS test with L2 traffic.
183         """
184         self.rewrite_src_addr = rewrite_src_addr
185         self.rewrite_sid_list = sid_list[test_sid_index + 1::]
186
187         # send traffic to one destination interface
188         # source and destination interfaces are IPv6 only
189         self.setup_interfaces(ipv6=[True, False])
190
191         # configure route to next segment
192         route = VppIpRoute(self, sid_list[test_sid_index + 1], 128,
193                            [VppRoutePath(self.pg0.remote_ip6,
194                                          self.pg0.sw_if_index,
195                                          proto=DpoProto.DPO_PROTO_IP6)],
196                            is_ip6=1)
197         route.add_vpp_config()
198
199         # configure SRv6 localSID behavior
200         cli_str = "sr localsid address " + sid_list[test_sid_index] \
201             + " behavior end.as" \
202             + " oif " + self.pg1.name \
203             + " iif " + self.pg1.name \
204             + " src " + self.rewrite_src_addr
205         for s in self.rewrite_sid_list:
206             cli_str += " next " + s
207         self.vapi.cli(cli_str)
208
209         # log the localsids
210         self.logger.debug(self.vapi.cli("show sr localsid"))
211
212         # send one packet per packet size
213         count = len(self.pg_packet_sizes)
214
215         # prepare L2 in SRv6 headers
216         packet_header1 = self.create_packet_header_IPv6_SRH_L2(
217                         sidlist=sid_list[::-1],
218                         segleft=len(sid_list) - test_sid_index - 1,
219                         vlan=0)
220
221         # generate packets (pg0->pg1)
222         pkts1 = self.create_stream(self.pg0, self.pg1, packet_header1,
223                                    self.pg_packet_sizes, count)
224
225         # send packets and verify received packets
226         self.send_and_verify_pkts(self.pg0, pkts1, self.pg1,
227                                   self.compare_rx_tx_packet_End_AS_L2_out)
228
229         # log the localsid counters
230         self.logger.info(self.vapi.cli("show sr localsid"))
231
232         # prepare L2 header for returning packets
233         packet_header2 = self.create_packet_header_L2()
234
235         # generate returning packets (pg1->pg0)
236         pkts2 = self.create_stream(self.pg1, self.pg0, packet_header2,
237                                    self.pg_packet_sizes, count)
238
239         # send packets and verify received packets
240         self.send_and_verify_pkts(self.pg1, pkts2, self.pg0,
241                                   self.compare_rx_tx_packet_End_AS_L2_in)
242
243         # log the localsid counters
244         self.logger.info(self.vapi.cli("show sr localsid"))
245
246         # remove SRv6 localSIDs
247         self.vapi.cli("sr localsid del address " + sid_list[test_sid_index])
248
249         # cleanup interfaces
250         self.teardown_interfaces()
251
252     def run_SRv6_End_AS_IPv6(self, sid_list, test_sid_index, rewrite_src_addr):
253         """ Run SRv6 End.AS test with IPv6 traffic.
254         """
255         self.rewrite_src_addr = rewrite_src_addr
256         self.rewrite_sid_list = sid_list[test_sid_index + 1::]
257
258         # send traffic to one destination interface
259         # source and destination interfaces are IPv6 only
260         self.setup_interfaces(ipv6=[True, True])
261
262         # configure route to next segment
263         route = VppIpRoute(self, sid_list[test_sid_index + 1], 128,
264                            [VppRoutePath(self.pg0.remote_ip6,
265                                          self.pg0.sw_if_index,
266                                          proto=DpoProto.DPO_PROTO_IP6)],
267                            is_ip6=1)
268         route.add_vpp_config()
269
270         # configure SRv6 localSID behavior
271         cli_str = "sr localsid address " + sid_list[test_sid_index] \
272             + " behavior end.as" \
273             + " nh " + self.pg1.remote_ip6 \
274             + " oif " + self.pg1.name \
275             + " iif " + self.pg1.name \
276             + " src " + self.rewrite_src_addr
277         for s in self.rewrite_sid_list:
278             cli_str += " next " + s
279         self.vapi.cli(cli_str)
280
281         # log the localsids
282         self.logger.debug(self.vapi.cli("show sr localsid"))
283
284         # send one packet per packet size
285         count = len(self.pg_packet_sizes)
286
287         # prepare IPv6 in SRv6 headers
288         packet_header1 = self.create_packet_header_IPv6_SRH_IPv6(
289                         sidlist=sid_list[::-1],
290                         segleft=len(sid_list) - test_sid_index - 1)
291
292         # generate packets (pg0->pg1)
293         pkts1 = self.create_stream(self.pg0, self.pg1, packet_header1,
294                                    self.pg_packet_sizes, count)
295
296         # send packets and verify received packets
297         self.send_and_verify_pkts(self.pg0, pkts1, self.pg1,
298                                   self.compare_rx_tx_packet_End_AS_IPv6_out)
299
300         # log the localsid counters
301         self.logger.info(self.vapi.cli("show sr localsid"))
302
303         # prepare IPv6 header for returning packets
304         packet_header2 = self.create_packet_header_IPv6()
305
306         # generate returning packets (pg1->pg0)
307         pkts2 = self.create_stream(self.pg1, self.pg0, packet_header2,
308                                    self.pg_packet_sizes, count)
309
310         # send packets and verify received packets
311         self.send_and_verify_pkts(self.pg1, pkts2, self.pg0,
312                                   self.compare_rx_tx_packet_End_AS_IPv6_in)
313
314         # log the localsid counters
315         self.logger.info(self.vapi.cli("show sr localsid"))
316
317         # remove SRv6 localSIDs
318         self.vapi.cli("sr localsid del address " + sid_list[test_sid_index])
319
320         # cleanup interfaces
321         self.teardown_interfaces()
322
323     def run_SRv6_End_AS_IPv4(self, sid_list, test_sid_index, rewrite_src_addr):
324         """ Run SRv6 End.AS test with IPv4 traffic.
325         """
326         self.rewrite_src_addr = rewrite_src_addr
327         self.rewrite_sid_list = sid_list[test_sid_index + 1::]
328
329         # send traffic to one destination interface
330         # source and destination interfaces are IPv6 only
331         self.setup_interfaces(ipv6=[True, False], ipv4=[True, True])
332
333         # configure route to next segment
334         route = VppIpRoute(self, sid_list[test_sid_index + 1], 128,
335                            [VppRoutePath(self.pg0.remote_ip6,
336                                          self.pg0.sw_if_index,
337                                          proto=DpoProto.DPO_PROTO_IP6)],
338                            is_ip6=1)
339         route.add_vpp_config()
340
341         # configure SRv6 localSID behavior
342         cli_str = "sr localsid address " + sid_list[test_sid_index] \
343             + " behavior end.as" \
344             + " nh " + self.pg1.remote_ip4 \
345             + " oif " + self.pg1.name \
346             + " iif " + self.pg1.name \
347             + " src " + self.rewrite_src_addr
348         for s in self.rewrite_sid_list:
349             cli_str += " next " + s
350         self.vapi.cli(cli_str)
351
352         # log the localsids
353         self.logger.debug(self.vapi.cli("show sr localsid"))
354
355         # send one packet per packet size
356         count = len(self.pg_packet_sizes)
357
358         # prepare IPv4 in SRv6 headers
359         packet_header1 = self.create_packet_header_IPv6_SRH_IPv4(
360                         sidlist=sid_list[::-1],
361                         segleft=len(sid_list) - test_sid_index - 1)
362
363         # generate packets (pg0->pg1)
364         pkts1 = self.create_stream(self.pg0, self.pg1, packet_header1,
365                                    self.pg_packet_sizes, count)
366
367         # send packets and verify received packets
368         self.send_and_verify_pkts(self.pg0, pkts1, self.pg1,
369                                   self.compare_rx_tx_packet_End_AS_IPv4_out)
370
371         # log the localsid counters
372         self.logger.info(self.vapi.cli("show sr localsid"))
373
374         # prepare IPv6 header for returning packets
375         packet_header2 = self.create_packet_header_IPv4()
376
377         # generate returning packets (pg1->pg0)
378         pkts2 = self.create_stream(self.pg1, self.pg0, packet_header2,
379                                    self.pg_packet_sizes, count)
380
381         # send packets and verify received packets
382         self.send_and_verify_pkts(self.pg1, pkts2, self.pg0,
383                                   self.compare_rx_tx_packet_End_AS_IPv4_in)
384
385         # log the localsid counters
386         self.logger.info(self.vapi.cli("show sr localsid"))
387
388         # remove SRv6 localSIDs
389         self.vapi.cli("sr localsid del address " + sid_list[test_sid_index])
390
391         # cleanup interfaces
392         self.teardown_interfaces()
393
394     def compare_rx_tx_packet_End_AS_IPv6_in(self, tx_pkt, rx_pkt):
395         """ Compare input and output packet after passing End.AS
396
397         :param tx_pkt: transmitted packet
398         :param rx_pkt: received packet
399         """
400
401         # get first (outer) IPv6 header of rx'ed packet
402         rx_ip = rx_pkt.getlayer(IPv6)
403         rx_srh = None
404
405         tx_ip = tx_pkt.getlayer(IPv6)
406
407         # expected segment-list (SRH order)
408         tx_seglist = self.rewrite_sid_list[::-1]
409
410         # received ip.src should be equal to SR Policy source
411         self.assertEqual(rx_ip.src, self.rewrite_src_addr)
412         # received ip.dst should be equal to expected sidlist[lastentry]
413         self.assertEqual(rx_ip.dst, tx_seglist[-1])
414
415         if len(tx_seglist) > 1:
416             # rx'ed packet should have SRH
417             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
418             # get SRH
419             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
420             # rx'ed seglist should be equal to expected seglist
421             self.assertEqual(rx_srh.addresses, tx_seglist)
422             # segleft should be equal to size expected seglist-1
423             self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
424             # segleft should be equal to lastentry
425             self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
426             # get payload
427             payload = rx_srh.payload
428         else:
429             # rx'ed packet should NOT have SRH
430             self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
431             # get payload
432             payload = rx_ip.payload
433
434         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
435         # except for the hop-limit field
436         #   -> update tx'ed hlim to the expected hlim
437         tx_ip.hlim = tx_ip.hlim - 1
438
439         self.assertEqual(payload, tx_ip)
440
441         self.logger.debug("packet verification: SUCCESS")
442
443     def compare_rx_tx_packet_End_AS_IPv4_in(self, tx_pkt, rx_pkt):
444         """ Compare input and output packet after passing End.AS
445
446         :param tx_pkt: transmitted packet
447         :param rx_pkt: received packet
448         """
449
450         # get first (outer) IPv6 header of rx'ed packet
451         rx_ip = rx_pkt.getlayer(IPv6)
452         rx_srh = None
453
454         tx_ip = tx_pkt.getlayer(IP)
455
456         # expected segment-list (SRH order)
457         tx_seglist = self.rewrite_sid_list[::-1]
458
459         # received ip.src should be equal to SR Policy source
460         self.assertEqual(rx_ip.src, self.rewrite_src_addr)
461         # received ip.dst should be equal to expected sidlist[lastentry]
462         self.assertEqual(rx_ip.dst, tx_seglist[-1])
463
464         if len(tx_seglist) > 1:
465             # rx'ed packet should have SRH and IPv4 header
466             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
467             self.assertTrue(rx_ip.payload.haslayer(IP))
468             # get SRH
469             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
470             # rx'ed seglist should be equal to seglist
471             self.assertEqual(rx_srh.addresses, tx_seglist)
472             # segleft should be equal to size seglist-1
473             self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
474             # segleft should be equal to lastentry
475             self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
476             payload = rx_srh.payload
477         else:
478             # rx'ed packet should NOT have SRH
479             self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
480             # get payload
481             payload = rx_ip.payload
482
483         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
484         # except for the ttl field and ip checksum
485         #   -> adjust tx'ed ttl to expected ttl
486         tx_ip.ttl = tx_ip.ttl - 1
487         #   -> set tx'ed ip checksum to None and let scapy recompute
488         tx_ip.chksum = None
489         # read back the pkt (with str()) to force computing these fields
490         # probably other ways to accomplish this are possible
491         tx_ip = IP(str(tx_ip))
492
493         self.assertEqual(payload, tx_ip)
494
495         self.logger.debug("packet verification: SUCCESS")
496
497     def compare_rx_tx_packet_End_AS_L2_in(self, tx_pkt, rx_pkt):
498         """ Compare input and output packet after passing End.AS
499
500         :param tx_pkt: transmitted packet
501         :param rx_pkt: received packet
502         """
503
504         # get first (outer) IPv6 header of rx'ed packet
505         rx_ip = rx_pkt.getlayer(IPv6)
506         rx_srh = None
507
508         tx_ether = tx_pkt.getlayer(Ether)
509
510         # expected segment-list (SRH order)
511         tx_seglist = self.rewrite_sid_list[::-1]
512
513         # received ip.src should be equal to SR Policy source
514         self.assertEqual(rx_ip.src, self.rewrite_src_addr)
515         # received ip.dst should be equal to expected sidlist[lastentry]
516         self.assertEqual(rx_ip.dst, tx_seglist[-1])
517
518         if len(tx_seglist) > 1:
519             # rx'ed packet should have SRH
520             self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
521             # get SRH
522             rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
523             # rx'ed seglist should be equal to seglist
524             self.assertEqual(rx_srh.addresses, tx_seglist)
525             # segleft should be equal to size seglist-1
526             self.assertEqual(rx_srh.segleft, len(tx_seglist)-1)
527             # segleft should be equal to lastentry
528             self.assertEqual(rx_srh.segleft, rx_srh.lastentry)
529             # nh should be "No Next Header" (59)
530             self.assertEqual(rx_srh.nh, 59)
531             # get payload
532             payload = rx_srh.payload
533         else:
534             # rx'ed packet should NOT have SRH
535             self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
536             # get payload
537             payload = rx_ip.payload
538
539         # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
540         self.assertEqual(Ether(str(payload)), tx_ether)
541
542         self.logger.debug("packet verification: SUCCESS")
543
544     def compare_rx_tx_packet_End_AS_IPv6_out(self, tx_pkt, rx_pkt):
545         """ Compare input and output packet after passing End.AS with IPv6
546
547         :param tx_pkt: transmitted packet
548         :param rx_pkt: received packet
549         """
550
551         # get first (outer) IPv6 header of rx'ed packet
552         rx_ip = rx_pkt.getlayer(IPv6)
553
554         tx_ip = tx_pkt.getlayer(IPv6)
555         tx_ip2 = tx_pkt.getlayer(IPv6, 2)
556
557         # verify if rx'ed packet has no SRH
558         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
559
560         # the whole rx_ip pkt should be equal to tx_ip2
561         # except for the hlim field
562         #   -> adjust tx'ed hlim to expected hlim
563         tx_ip2.hlim = tx_ip2.hlim - 1
564
565         self.assertEqual(rx_ip, tx_ip2)
566
567         self.logger.debug("packet verification: SUCCESS")
568
569     def compare_rx_tx_packet_End_AS_IPv4_out(self, tx_pkt, rx_pkt):
570         """ Compare input and output packet after passing End.AS with IPv4
571
572         :param tx_pkt: transmitted packet
573         :param rx_pkt: received packet
574         """
575
576         # get IPv4 header of rx'ed packet
577         rx_ip = rx_pkt.getlayer(IP)
578
579         tx_ip = tx_pkt.getlayer(IPv6)
580         tx_ip2 = tx_pkt.getlayer(IP)
581
582         # verify if rx'ed packet has no SRH
583         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
584
585         # the whole rx_ip pkt should be equal to tx_ip2
586         # except for the ttl field and ip checksum
587         #   -> adjust tx'ed ttl to expected ttl
588         tx_ip2.ttl = tx_ip2.ttl - 1
589         #   -> set tx'ed ip checksum to None and let scapy recompute
590         tx_ip2.chksum = None
591         # read back the pkt (with str()) to force computing these fields
592         # probably other ways to accomplish this are possible
593         tx_ip2 = IP(str(tx_ip2))
594
595         self.assertEqual(rx_ip, tx_ip2)
596
597         self.logger.debug("packet verification: SUCCESS")
598
599     def compare_rx_tx_packet_End_AS_L2_out(self, tx_pkt, rx_pkt):
600         """ Compare input and output packet after passing End.AS with L2
601
602         :param tx_pkt: transmitted packet
603         :param rx_pkt: received packet
604         """
605
606         # get IPv4 header of rx'ed packet
607         rx_eth = rx_pkt.getlayer(Ether)
608
609         tx_ip = tx_pkt.getlayer(IPv6)
610         # we can't just get the 2nd Ether layer
611         # get the Raw content and dissect it as Ether
612         tx_eth1 = Ether(str(tx_pkt[Raw]))
613
614         # verify if rx'ed packet has no SRH
615         self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))
616
617         # the whole rx_eth pkt should be equal to tx_eth1
618         self.assertEqual(rx_eth, tx_eth1)
619
620         self.logger.debug("packet verification: SUCCESS")
621
622     def create_stream(self, src_if, dst_if, packet_header, packet_sizes,
623                       count):
624         """Create SRv6 input packet stream for defined interface.
625
626         :param VppInterface src_if: Interface to create packet stream for
627         :param VppInterface dst_if: destination interface of packet stream
628         :param packet_header: Layer3 scapy packet headers,
629                 L2 is added when not provided,
630                 Raw(payload) with packet_info is added
631         :param list packet_sizes: packet stream pckt sizes,sequentially applied
632                to packets in stream have
633         :param int count: number of packets in packet stream
634         :return: list of packets
635         """
636         self.logger.info("Creating packets")
637         pkts = []
638         for i in range(0, count-1):
639             payload_info = self.create_packet_info(src_if, dst_if)
640             self.logger.debug(
641                 "Creating packet with index %d" % (payload_info.index))
642             payload = self.info_to_payload(payload_info)
643             # add L2 header if not yet provided in packet_header
644             if packet_header.getlayer(0).name == 'Ethernet':
645                 p = (packet_header /
646                      Raw(payload))
647             else:
648                 p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
649                      packet_header /
650                      Raw(payload))
651             size = packet_sizes[i % len(packet_sizes)]
652             self.logger.debug("Packet size %d" % (size))
653             self.extend_packet(p, size)
654             # we need to store the packet with the automatic fields computed
655             # read back the dumped packet (with str())
656             # to force computing these fields
657             # probably other ways are possible
658             p = Ether(str(p))
659             payload_info.data = p.copy()
660             self.logger.debug(ppp("Created packet:", p))
661             pkts.append(p)
662         self.logger.info("Done creating packets")
663         return pkts
664
665     def send_and_verify_pkts(self, input, pkts, output, compare_func):
666         """Send packets and verify received packets using compare_func
667
668         :param input: ingress interface of DUT
669         :param pkts: list of packets to transmit
670         :param output: egress interface of DUT
671         :param compare_func: function to compare in and out packets
672         """
673         # add traffic stream to input interface
674         input.add_stream(pkts)
675
676         # enable capture on all interfaces
677         self.pg_enable_capture(self.pg_interfaces)
678
679         # start traffic
680         self.logger.info("Starting traffic")
681         self.pg_start()
682
683         # get output capture
684         self.logger.info("Getting packet capture")
685         capture = output.get_capture()
686
687         # assert nothing was captured on input interface
688         # input.assert_nothing_captured()
689
690         # verify captured packets
691         self.verify_captured_pkts(output, capture, compare_func)
692
693     def create_packet_header_IPv6(self):
694         """Create packet header: IPv6 header, UDP header
695
696         :param dst: IPv6 destination address
697
698         IPv6 source address is 1234::1
699         IPv6 destination address is 4321::1
700         UDP source port and destination port are 1234
701         """
702
703         p = (IPv6(src='1234::1', dst='4321::1') /
704              UDP(sport=1234, dport=1234))
705         return p
706
707     def create_packet_header_IPv6_SRH_IPv6(self, sidlist, segleft):
708         """Create packet header: IPv6 encapsulated in SRv6:
709         IPv6 header with SRH, IPv6 header, UDP header
710
711         :param list sidlist: segment list of outer IPv6 SRH
712         :param int segleft: segments-left field of outer IPv6 SRH
713
714         Outer IPv6 source address is set to 5678::1
715         Outer IPv6 destination address is set to sidlist[segleft]
716         IPv6 source addresses is 1234::1
717         IPv6 destination address is 4321::1
718         UDP source port and destination port are 1234
719         """
720
721         p = (IPv6(src='5678::1', dst=sidlist[segleft]) /
722              IPv6ExtHdrSegmentRouting(addresses=sidlist,
723                                       segleft=segleft, nh=41) /
724              IPv6(src='1234::1', dst='4321::1') /
725              UDP(sport=1234, dport=1234))
726         return p
727
728     def create_packet_header_IPv4(self):
729         """Create packet header: IPv4 header, UDP header
730
731         :param dst: IPv4 destination address
732
733         IPv4 source address is 123.1.1.1
734         IPv4 destination address is 124.1.1.1
735         UDP source port and destination port are 1234
736         """
737
738         p = (IP(src='123.1.1.1', dst='124.1.1.1') /
739              UDP(sport=1234, dport=1234))
740         return p
741
742     def create_packet_header_IPv6_SRH_IPv4(self, sidlist, segleft):
743         """Create packet header: IPv4 encapsulated in SRv6:
744         IPv6 header with SRH, IPv4 header, UDP header
745
746         :param ipv4address dst: inner IPv4 destination address
747         :param list sidlist: segment list of outer IPv6 SRH
748         :param int segleft: segments-left field of outer IPv6 SRH
749
750         Outer IPv6 destination address is set to sidlist[segleft]
751         IPv6 source address is 1234::1
752         IPv4 source address is 123.1.1.1
753         IPv4 destination address is 124.1.1.1
754         UDP source port and destination port are 1234
755         """
756
757         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
758              IPv6ExtHdrSegmentRouting(addresses=sidlist,
759                                       segleft=segleft, nh=4) /
760              IP(src='123.1.1.1', dst='124.1.1.1') /
761              UDP(sport=1234, dport=1234))
762         return p
763
764     def create_packet_header_L2(self, vlan=0):
765         """Create packet header: L2 header
766
767         :param vlan: if vlan!=0 then add 802.1q header
768         """
769         # Note: the dst addr ('00:55:44:33:22:11') is used in
770         # the compare function compare_rx_tx_packet_T_Encaps_L2
771         # to detect presence of L2 in SRH payload
772         p = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
773         etype = 0x8137  # IPX
774         if vlan:
775             # add 802.1q layer
776             p /= Dot1Q(vlan=vlan, type=etype)
777         else:
778             p.type = etype
779         return p
780
781     def create_packet_header_IPv6_SRH_L2(self, sidlist, segleft, vlan=0):
782         """Create packet header: L2 encapsulated in SRv6:
783         IPv6 header with SRH, L2
784
785         :param list sidlist: segment list of outer IPv6 SRH
786         :param int segleft: segments-left field of outer IPv6 SRH
787         :param vlan: L2 vlan; if vlan!=0 then add 802.1q header
788
789         Outer IPv6 destination address is set to sidlist[segleft]
790         IPv6 source address is 1234::1
791         """
792         eth = Ether(src='00:11:22:33:44:55', dst='00:55:44:33:22:11')
793         etype = 0x8137  # IPX
794         if vlan:
795             # add 802.1q layer
796             eth /= Dot1Q(vlan=vlan, type=etype)
797         else:
798             eth.type = etype
799
800         p = (IPv6(src='1234::1', dst=sidlist[segleft]) /
801              IPv6ExtHdrSegmentRouting(addresses=sidlist,
802                                       segleft=segleft, nh=59) /
803              eth)
804         return p
805
806     def get_payload_info(self, packet):
807         """ Extract the payload_info from the packet
808         """
809         # in most cases, payload_info is in packet[Raw]
810         # but packet[Raw] gives the complete payload
811         # (incl L2 header) for the T.Encaps L2 case
812         try:
813             payload_info = self.payload_to_info(str(packet[Raw]))
814
815         except:
816             # remote L2 header from packet[Raw]:
817             # take packet[Raw], convert it to an Ether layer
818             # and then extract Raw from it
819             payload_info = self.payload_to_info(
820                 str(Ether(str(packet[Raw]))[Raw]))
821
822         return payload_info
823
824     def verify_captured_pkts(self, dst_if, capture, compare_func):
825         """
826         Verify captured packet stream for specified interface.
827         Compare ingress with egress packets using the specified compare fn
828
829         :param dst_if: egress interface of DUT
830         :param capture: captured packets
831         :param compare_func: function to compare in and out packet
832         """
833         self.logger.info("Verifying capture on interface %s using function %s"
834                          % (dst_if.name, compare_func.func_name))
835
836         last_info = dict()
837         for i in self.pg_interfaces:
838             last_info[i.sw_if_index] = None
839         dst_sw_if_index = dst_if.sw_if_index
840
841         for packet in capture:
842             try:
843                 # extract payload_info from packet's payload
844                 payload_info = self.get_payload_info(packet)
845                 packet_index = payload_info.index
846
847                 self.logger.debug("Verifying packet with index %d"
848                                   % (packet_index))
849                 # packet should have arrived on the expected interface
850                 self.assertEqual(payload_info.dst, dst_sw_if_index)
851                 self.logger.debug(
852                     "Got packet on interface %s: src=%u (idx=%u)" %
853                     (dst_if.name, payload_info.src, packet_index))
854
855                 # search for payload_info with same src and dst if_index
856                 # this will give us the transmitted packet
857                 next_info = self.get_next_packet_info_for_interface2(
858                     payload_info.src, dst_sw_if_index,
859                     last_info[payload_info.src])
860                 last_info[payload_info.src] = next_info
861                 # next_info should not be None
862                 self.assertTrue(next_info is not None)
863                 # index of tx and rx packets should be equal
864                 self.assertEqual(packet_index, next_info.index)
865                 # data field of next_info contains the tx packet
866                 txed_packet = next_info.data
867
868                 self.logger.debug(ppp("Transmitted packet:",
869                                       txed_packet))  # ppp=Pretty Print Packet
870
871                 self.logger.debug(ppp("Received packet:", packet))
872
873                 # compare rcvd packet with expected packet using compare_func
874                 compare_func(txed_packet, packet)
875
876             except:
877                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
878                 raise
879
880         # have all expected packets arrived?
881         for i in self.pg_interfaces:
882             remaining_packet = self.get_next_packet_info_for_interface2(
883                 i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
884             self.assertTrue(remaining_packet is None,
885                             "Interface %s: Packet expected from interface %s "
886                             "didn't arrive" % (dst_if.name, i.name))
887
888
889 if __name__ == '__main__':
890     unittest.main(testRunner=VppTestRunner)