import unittest
from socket import AF_INET6

from framework import VppTestCase, VppTestRunner
from vpp_ip import DpoProto
from vpp_ip_route import VppIpRoute, VppRoutePath, VppIpTable

import scapy.compat
from scapy.packet import Raw
from scapy.layers.l2 import Ether, Dot1Q
from scapy.layers.inet6 import IPv6, UDP, IPv6ExtHdrSegmentRouting
from scapy.layers.inet import IP, UDP

from util import ppp
class TestSRv6(VppTestCase):
    """ SRv6 Flow-based Dynamic Proxy plugin Test Case """

    # NOTE(review): this class was restored from a damaged copy in which
    # short lines (block openers, returns, a few one-line statements) were
    # lost. Statements that could not be derived from the surrounding code
    # or comments are marked NOTE(review); check them against version
    # control.

    @classmethod
    def setUpClass(cls):
        super(TestSRv6, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TestSRv6, cls).tearDownClass()

    def setUp(self):
        """ Perform test setup before each test case.
        """
        super(TestSRv6, self).setUp()

        # packet sizes, inclusive L2 overhead
        self.pg_packet_sizes = [64, 512, 1518, 9018]

        # reset packet_infos
        self.reset_packet_infos()

    def tearDown(self):
        """ Clean up test setup after each test case.
        """
        self.teardown_interfaces()

        super(TestSRv6, self).tearDown()

    def configure_interface(self,
                            interface,
                            ipv6=False, ipv4=False,
                            ipv6_table_id=0, ipv4_table_id=0):
        """ Configure interface.

        :param interface: interface to configure
        :param ipv6: configure IPv6 on interface
        :param ipv4: configure IPv4 on interface
        :param ipv6_table_id: FIB table_id for IPv6
        :param ipv4_table_id: FIB table_id for IPv4
        """
        self.logger.debug("Configuring interface %s" % (interface.name))
        if ipv6:
            self.logger.debug("Configuring IPv6")
            interface.set_table_ip6(ipv6_table_id)
            interface.config_ip6()
            interface.resolve_ndp(timeout=5)
        if ipv4:
            self.logger.debug("Configuring IPv4")
            interface.set_table_ip4(ipv4_table_id)
            interface.config_ip4()
            interface.resolve_arp()
        # NOTE(review): restored statement - confirm the interface is
        # brought up here (teardown_interfaces is its counterpart).
        interface.admin_up()

    def setup_interfaces(self, ipv6=None, ipv4=None,
                         ipv6_table_id=None, ipv4_table_id=None):
        """ Create and configure interfaces.

        The argument lists are never modified; shorter lists are padded
        on private copies.

        :param ipv6: list of interface IPv6 capabilities
        :param ipv4: list of interface IPv4 capabilities
        :param ipv6_table_id: list of intf IPv6 FIB table_ids
        :param ipv4_table_id: list of intf IPv4 FIB table_ids
        :returns: List of created interfaces.
        """
        # Work on copies: padding the arguments in place would leak state
        # between calls (shared defaults) and surprise the caller.
        ipv6 = list(ipv6 or [])
        ipv4 = list(ipv4 or [])
        ipv6_table_id = list(ipv6_table_id or [])
        ipv4_table_id = list(ipv4_table_id or [])

        # how many interfaces?
        # NOTE(review): restored statement - one interface per entry of
        # the longer capability list; confirm against the original.
        count = max(len(ipv6), len(ipv4))
        self.logger.debug("Creating and configuring %d interfaces" % (count))

        # fill up ipv6 and ipv4 lists if needed
        # not enabled (False) is the default
        # (a negative repeat count yields an empty list, so no guard needed)
        ipv6 += (count - len(ipv6)) * [False]
        ipv4 += (count - len(ipv4)) * [False]

        # fill up table_id lists if needed
        # table_id 0 (global) is the default
        ipv6_table_id += (count - len(ipv6_table_id)) * [0]
        ipv4_table_id += (count - len(ipv4_table_id)) * [0]

        # create 'count' pg interfaces
        self.create_pg_interfaces(range(count))

        # setup all interfaces
        # (ipv6/ipv4 hold exactly 'count' entries, which bounds the zip)
        for intf, use_ip6, use_ip4, table6, table4 in zip(
                self.pg_interfaces, ipv6, ipv4,
                ipv6_table_id, ipv4_table_id):
            self.configure_interface(intf,
                                     use_ip6, use_ip4,
                                     table6, table4)

        # neighbor tables are only of interest for enabled families
        if any(ipv6):
            self.logger.debug(self.vapi.cli("show ip6 neighbors"))
        if any(ipv4):
            self.logger.debug(self.vapi.cli("show ip4 neighbors"))
        self.logger.debug(self.vapi.cli("show interface"))
        self.logger.debug(self.vapi.cli("show hardware"))

        return self.pg_interfaces

    def teardown_interfaces(self):
        """ Unconfigure and bring down interface.
        """
        self.logger.debug("Tearing down interfaces")
        # tear down all interfaces
        # AFAIK they cannot be deleted
        for i in self.pg_interfaces:
            self.logger.debug("Tear down interface %s" % (i.name))
            # NOTE(review): the four statements below were restored from
            # the docstring and by symmetry with configure_interface();
            # confirm against the original.
            i.admin_down()
            i.unconfig()
            i.set_table_ip4(0)
            i.set_table_ip6(0)

    def test_SRv6_End_AD_IPv6(self):
        """ Test SRv6 End.AD behavior with IPv6 traffic.
        """
        self.src_addr = 'a0::'
        self.sid_list = ['a1::', 'a2::a6', 'a3::']
        self.test_sid_index = 1

        # send traffic to one destination interface
        # source and destination interfaces are IPv6 only
        self.setup_interfaces(ipv6=[True, True])

        # configure route to next segment
        route = VppIpRoute(self, self.sid_list[self.test_sid_index + 1], 128,
                           [VppRoutePath(self.pg0.remote_ip6,
                                         self.pg0.sw_if_index,
                                         proto=DpoProto.DPO_PROTO_IP6)])
        route.add_vpp_config()

        # configure SRv6 localSID behavior
        cli_str = ("sr localsid address %s behavior end.ad.flow"
                   " nh %s oif %s iif %s"
                   % (self.sid_list[self.test_sid_index],
                      self.pg1.remote_ip6,
                      self.pg1.name,
                      self.pg1.name))
        self.vapi.cli(cli_str)

        # log the localsids
        self.logger.debug(self.vapi.cli("show sr localsid"))

        # send one packet per packet size
        count = len(self.pg_packet_sizes)

        # prepare IPv6 in SRv6 headers
        packet_header1 = self.create_packet_header_IPv6_SRH_IPv6(
            srcaddr=self.src_addr,
            sidlist=self.sid_list[::-1],
            segleft=len(self.sid_list) - self.test_sid_index - 1)

        # generate packets (pg0->pg1)
        pkts1 = self.create_stream(self.pg0, self.pg1, packet_header1,
                                   self.pg_packet_sizes, count)

        # send packets and verify received packets
        self.send_and_verify_pkts(self.pg0, pkts1, self.pg1,
                                  self.compare_rx_tx_packet_End_AD_IPv6_out)

        # log the localsid counters
        self.logger.info(self.vapi.cli("show sr localsid"))

        # prepare IPv6 header for returning packets
        packet_header2 = self.create_packet_header_IPv6()

        # generate returning packets (pg1->pg0)
        pkts2 = self.create_stream(self.pg1, self.pg0, packet_header2,
                                   self.pg_packet_sizes, count)

        # send packets and verify received packets
        self.send_and_verify_pkts(self.pg1, pkts2, self.pg0,
                                  self.compare_rx_tx_packet_End_AD_IPv6_in)

        # log the localsid counters
        self.logger.info(self.vapi.cli("show sr localsid"))

        # remove SRv6 localSIDs
        cli_str = ("sr localsid del address %s"
                   % self.sid_list[self.test_sid_index])
        self.vapi.cli(cli_str)

        # cleanup interfaces
        self.teardown_interfaces()

    def compare_rx_tx_packet_End_AD_IPv6_out(self, tx_pkt, rx_pkt):
        """ Compare input and output packet after passing End.AD with IPv6

        :param tx_pkt: transmitted packet
        :param rx_pkt: received packet
        """
        # get first (outer) IPv6 header of rx'ed packet
        rx_ip = rx_pkt.getlayer(IPv6)

        # second (inner) IPv6 header of the tx'ed packet
        tx_ip2 = tx_pkt.getlayer(IPv6, 2)

        # verify if rx'ed packet has no SRH
        self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))

        # the whole rx_ip pkt should be equal to tx_ip2
        # except for the hlim field
        #   -> adjust tx'ed hlim to expected hlim
        tx_ip2.hlim = tx_ip2.hlim - 1

        self.assertEqual(rx_ip, tx_ip2)

        self.logger.debug("packet verification: SUCCESS")

    def compare_rx_tx_packet_End_AD_IPv6_in(self, tx_pkt, rx_pkt):
        """ Compare input and output packet after passing End.AD

        :param tx_pkt: transmitted packet
        :param rx_pkt: received packet
        """
        # get first (outer) IPv6 header of rx'ed packet
        rx_ip = rx_pkt.getlayer(IPv6)
        # received ip.src should be equal to SR Policy source
        self.assertEqual(rx_ip.src, self.src_addr)
        # received ip.dst should be equal to expected sidlist next segment
        self.assertEqual(rx_ip.dst, self.sid_list[self.test_sid_index + 1])

        # rx'ed packet should have SRH
        self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))

        # get SRH
        rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
        # rx'ed seglist should be equal to SID-list in reversed order
        self.assertEqual(rx_srh.addresses, self.sid_list[::-1])
        # segleft should be equal to previous segleft value minus 1
        self.assertEqual(rx_srh.segleft,
                         len(self.sid_list) - self.test_sid_index - 2)
        # lastentry should be equal to the SID-list length minus 1
        self.assertEqual(rx_srh.lastentry, len(self.sid_list) - 1)

        # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
        # except for the hop-limit field
        tx_ip = tx_pkt.getlayer(IPv6)
        #   -> update tx'ed hlim to the expected hlim
        tx_ip.hlim -= 1

        self.assertEqual(rx_srh.payload, tx_ip)

        self.logger.debug("packet verification: SUCCESS")

    def test_SRv6_End_AD_IPv4(self):
        """ Test SRv6 End.AD behavior with IPv4 traffic.
        """
        self.src_addr = 'a0::'
        self.sid_list = ['a1::', 'a2::a4', 'a3::']
        self.test_sid_index = 1

        # send traffic to one destination interface
        # source interface is IPv6 only, destination interface is IPv4 only
        self.setup_interfaces(ipv6=[True, False], ipv4=[False, True])

        # configure route to next segment
        route = VppIpRoute(self, self.sid_list[self.test_sid_index + 1], 128,
                           [VppRoutePath(self.pg0.remote_ip6,
                                         self.pg0.sw_if_index,
                                         proto=DpoProto.DPO_PROTO_IP6)])
        route.add_vpp_config()

        # configure SRv6 localSID behavior
        cli_str = ("sr localsid address %s behavior end.ad.flow"
                   " nh %s oif %s iif %s"
                   % (self.sid_list[self.test_sid_index],
                      self.pg1.remote_ip4,
                      self.pg1.name,
                      self.pg1.name))
        self.vapi.cli(cli_str)

        # log the localsids
        self.logger.debug(self.vapi.cli("show sr localsid"))

        # send one packet per packet size
        count = len(self.pg_packet_sizes)

        # prepare IPv4 in SRv6 headers
        packet_header1 = self.create_packet_header_IPv6_SRH_IPv4(
            srcaddr=self.src_addr,
            sidlist=self.sid_list[::-1],
            segleft=len(self.sid_list) - self.test_sid_index - 1)

        # generate packets (pg0->pg1)
        pkts1 = self.create_stream(self.pg0, self.pg1, packet_header1,
                                   self.pg_packet_sizes, count)

        # send packets and verify received packets
        self.send_and_verify_pkts(self.pg0, pkts1, self.pg1,
                                  self.compare_rx_tx_packet_End_AD_IPv4_out)

        # log the localsid counters
        self.logger.info(self.vapi.cli("show sr localsid"))

        # prepare IPv4 header for returning packets
        packet_header2 = self.create_packet_header_IPv4()

        # generate returning packets (pg1->pg0)
        pkts2 = self.create_stream(self.pg1, self.pg0, packet_header2,
                                   self.pg_packet_sizes, count)

        # send packets and verify received packets
        self.send_and_verify_pkts(self.pg1, pkts2, self.pg0,
                                  self.compare_rx_tx_packet_End_AD_IPv4_in)

        # log the localsid counters
        self.logger.info(self.vapi.cli("show sr localsid"))

        # remove SRv6 localSIDs
        cli_str = ("sr localsid del address %s"
                   % self.sid_list[self.test_sid_index])
        self.vapi.cli(cli_str)

        # cleanup interfaces
        self.teardown_interfaces()

    def compare_rx_tx_packet_End_AD_IPv4_out(self, tx_pkt, rx_pkt):
        """ Compare input and output packet after passing End.AD with IPv4

        :param tx_pkt: transmitted packet
        :param rx_pkt: received packet
        """
        # get IPv4 header of rx'ed packet
        rx_ip = rx_pkt.getlayer(IP)

        # inner IPv4 header of the tx'ed packet
        tx_ip2 = tx_pkt.getlayer(IP)

        # verify if rx'ed packet has no SRH
        self.assertFalse(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))

        # the whole rx_ip pkt should be equal to tx_ip2
        # except for the ttl field and ip checksum
        #   -> adjust tx'ed ttl to expected ttl
        tx_ip2.ttl = tx_ip2.ttl - 1
        #   -> set tx'ed ip checksum to None and let scapy recompute
        tx_ip2.chksum = None
        # read back the pkt (via its raw bytes) to force computing these
        # fields; probably other ways to accomplish this are possible
        tx_ip2 = IP(scapy.compat.raw(tx_ip2))

        self.assertEqual(rx_ip, tx_ip2)

        self.logger.debug("packet verification: SUCCESS")

    def compare_rx_tx_packet_End_AD_IPv4_in(self, tx_pkt, rx_pkt):
        """ Compare input and output packet after passing End.AD

        :param tx_pkt: transmitted packet
        :param rx_pkt: received packet
        """
        # get first (outer) IPv6 header of rx'ed packet
        rx_ip = rx_pkt.getlayer(IPv6)
        # received ip.src should be equal to SR Policy source
        self.assertEqual(rx_ip.src, self.src_addr)
        # received ip.dst should be equal to expected sidlist next segment
        self.assertEqual(rx_ip.dst, self.sid_list[self.test_sid_index + 1])

        # rx'ed packet should have SRH
        self.assertTrue(rx_pkt.haslayer(IPv6ExtHdrSegmentRouting))

        # get SRH
        rx_srh = rx_pkt.getlayer(IPv6ExtHdrSegmentRouting)
        # rx'ed seglist should be equal to SID-list in reversed order
        self.assertEqual(rx_srh.addresses, self.sid_list[::-1])
        # segleft should be equal to previous segleft value minus 1
        self.assertEqual(rx_srh.segleft,
                         len(self.sid_list) - self.test_sid_index - 2)
        # lastentry should be equal to the SID-list length minus 1
        self.assertEqual(rx_srh.lastentry, len(self.sid_list) - 1)

        # the whole rx'ed pkt beyond SRH should be equal to tx'ed pkt
        # except for the ttl field and ip checksum
        tx_ip = tx_pkt.getlayer(IP)
        #   -> adjust tx'ed ttl to expected ttl
        tx_ip.ttl = tx_ip.ttl - 1
        #   -> set tx'ed ip checksum to None and let scapy recompute
        tx_ip.chksum = None
        #   -> read back the pkt (via its raw bytes) to force computing
        #      these fields; probably other ways are possible
        self.assertEqual(rx_srh.payload, IP(scapy.compat.raw(tx_ip)))

        self.logger.debug("packet verification: SUCCESS")

    def create_stream(self, src_if, dst_if, packet_header, packet_sizes,
                      count):
        """Create SRv6 input packet stream for defined interface.

        :param VppInterface src_if: Interface to create packet stream for
        :param VppInterface dst_if: destination interface of packet stream
        :param packet_header: Layer3 scapy packet headers,
                L2 is added when not provided,
                Raw(payload) with packet_info is added
        :param list packet_sizes: packet stream pckt sizes, sequentially
                applied to the packets in the stream
        :param int count: number of packets in packet stream
        :return: list of packets
        """
        self.logger.info("Creating packets")
        pkts = []
        # NOTE(review): this range yields count - 1 packets, one fewer than
        # the 'count' parameter documents; with the callers in this file
        # the largest packet size is therefore never sent. Kept as is
        # because changing it alters the traffic of every test - confirm
        # whether it is intentional.
        for i in range(0, count - 1):
            payload_info = self.create_packet_info(src_if, dst_if)
            self.logger.debug(
                "Creating packet with index %d" % (payload_info.index))
            payload = self.info_to_payload(payload_info)
            # add L2 header if not yet provided in packet_header
            if packet_header.getlayer(0).name == 'Ethernet':
                p = packet_header / Raw(payload)
            else:
                p = Ether(dst=src_if.local_mac, src=src_if.remote_mac) / \
                    packet_header / Raw(payload)
            size = packet_sizes[i % len(packet_sizes)]
            self.logger.debug("Packet size %d" % (size))
            self.extend_packet(p, size)
            # we need to store the packet with the automatic fields computed
            # read back the dumped packet (via its raw bytes)
            # to force computing these fields
            # probably other ways are possible
            p = Ether(scapy.compat.raw(p))
            payload_info.data = p.copy()
            self.logger.debug(ppp("Created packet:", p))
            pkts.append(p)
        self.logger.info("Done creating packets")
        return pkts

    def send_and_verify_pkts(self, input, pkts, output, compare_func):
        """Send packets and verify received packets using compare_func

        :param input: ingress interface of DUT
        :param pkts: list of packets to transmit
        :param output: egress interface of DUT
        :param compare_func: function to compare in and out packets
        """
        # add traffic stream to input interface
        input.add_stream(pkts)

        # enable capture on all interfaces
        self.pg_enable_capture(self.pg_interfaces)

        # start traffic
        self.logger.info("Starting traffic")
        # NOTE(review): restored statement - confirm this is the call that
        # starts the packet generator.
        self.pg_start()

        # get output capture
        self.logger.info("Getting packet capture")
        capture = output.get_capture()

        # assert nothing was captured on input interface
        # input.assert_nothing_captured()

        # verify captured packets
        self.verify_captured_pkts(output, capture, compare_func)

    def create_packet_header_IPv6(self, saddr='1234::1', daddr='4321::1',
                                  sport=1234, dport=1234):
        """Create packet header: IPv6 header, UDP header

        :param saddr: IPv6 source address (default 1234::1)
        :param daddr: IPv6 destination address (default 4321::1)
        :param sport: UDP source port (default 1234)
        :param dport: UDP destination port (default 1234)
        :return: scapy packet header
        """
        p = IPv6(src=saddr, dst=daddr) / UDP(sport=sport, dport=dport)
        return p

    def create_packet_header_IPv6_SRH_IPv6(self, srcaddr, sidlist, segleft,
                                           insrc='1234::1', indst='4321::1',
                                           sport=1234, dport=1234):
        """Create packet header: IPv6 encapsulated in SRv6:
        IPv6 header with SRH, IPv6 header, UDP header

        :param srcaddr: outer source address
        :param list sidlist: segment list of outer IPv6 SRH
        :param int segleft: segments-left field of outer IPv6 SRH
        :param insrc: inner IPv6 source address (default 1234::1)
        :param indst: inner IPv6 destination address (default 4321::1)
        :param sport: UDP source port (default 1234)
        :param dport: UDP destination port (default 1234)
        :return: scapy packet header

        Outer IPv6 source address is set to srcaddr
        Outer IPv6 destination address is set to sidlist[segleft]
        """
        # nh=41: the header following the SRH is IPv6
        p = IPv6(src=srcaddr, dst=sidlist[segleft]) / \
            IPv6ExtHdrSegmentRouting(addresses=sidlist,
                                     segleft=segleft, nh=41) / \
            IPv6(src=insrc, dst=indst) / \
            UDP(sport=sport, dport=dport)
        return p

    def create_packet_header_IPv4(self):
        """Create packet header: IPv4 header, UDP header

        IPv4 source address is 123.1.1.1
        IPv4 destination address is 124.1.1.1
        UDP source port and destination port are 1234

        :return: scapy packet header
        """
        p = IP(src='123.1.1.1', dst='124.1.1.1') / UDP(sport=1234, dport=1234)
        return p

    def create_packet_header_IPv6_SRH_IPv4(self, srcaddr, sidlist, segleft):
        """Create packet header: IPv4 encapsulated in SRv6:
        IPv6 header with SRH, IPv4 header, UDP header

        :param srcaddr: outer source address
        :param list sidlist: segment list of outer IPv6 SRH
        :param int segleft: segments-left field of outer IPv6 SRH
        :return: scapy packet header

        Outer IPv6 source address is set to srcaddr
        Outer IPv6 destination address is set to sidlist[segleft]
        Inner IPv4 source address is 123.1.1.1
        Inner IPv4 destination address is 124.1.1.1
        UDP source port and destination port are 1234
        """
        # nh=4: the header following the SRH is IPv4
        p = IPv6(src=srcaddr, dst=sidlist[segleft]) / \
            IPv6ExtHdrSegmentRouting(addresses=sidlist,
                                     segleft=segleft, nh=4) / \
            IP(src='123.1.1.1', dst='124.1.1.1') / \
            UDP(sport=1234, dport=1234)
        return p

    def get_payload_info(self, packet):
        """ Extract the payload_info from the packet

        :param packet: captured packet
        :return: payload_info decoded from the packet's Raw payload
        """
        # in most cases, payload_info is in packet[Raw]
        # but packet[Raw] gives the complete payload
        # (incl L2 header) for the T.Encaps L2 case
        try:
            payload_info = self.payload_to_info(packet[Raw])

        except Exception:
            # remove L2 header from packet[Raw]:
            # take packet[Raw], convert it to an Ether layer
            # and then extract Raw from it
            payload_info = self.payload_to_info(
                Ether(scapy.compat.raw(packet[Raw]))[Raw])

        return payload_info

    def verify_captured_pkts(self, dst_if, capture, compare_func):
        """
        Verify captured packet stream for specified interface.
        Compare ingress with egress packets using the specified compare fn

        :param dst_if: egress interface of DUT
        :param capture: captured packets
        :param compare_func: function to compare in and out packet
        """
        self.logger.info("Verifying capture on interface %s using function %s"
                         % (dst_if.name, compare_func.__name__))

        # last matched packet_info per source interface
        last_info = {i.sw_if_index: None for i in self.pg_interfaces}
        dst_sw_if_index = dst_if.sw_if_index

        for packet in capture:
            try:
                # extract payload_info from packet's payload
                payload_info = self.get_payload_info(packet)
                packet_index = payload_info.index

                self.logger.debug("Verifying packet with index %d"
                                  % packet_index)
                # packet should have arrived on the expected interface
                self.assertEqual(payload_info.dst, dst_sw_if_index)
                self.logger.debug(
                    "Got packet on interface %s: src=%u (idx=%u)" %
                    (dst_if.name, payload_info.src, packet_index))

                # search for payload_info with same src and dst if_index
                # this will give us the transmitted packet
                next_info = self.get_next_packet_info_for_interface2(
                    payload_info.src, dst_sw_if_index,
                    last_info[payload_info.src])
                last_info[payload_info.src] = next_info
                # next_info should not be None
                self.assertIsNotNone(next_info)
                # index of tx and rx packets should be equal
                self.assertEqual(packet_index, next_info.index)
                # data field of next_info contains the tx packet
                txed_packet = next_info.data

                self.logger.debug(ppp("Transmitted packet:",
                                      txed_packet))  # ppp=Pretty Print Packet

                self.logger.debug(ppp("Received packet:", packet))

                # compare rcvd packet with expected packet using compare_func
                compare_func(txed_packet, packet)

            except Exception:
                # log the offending packet, then let the failure propagate
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise

        # have all expected packets arrived?
        for i in self.pg_interfaces:
            remaining_packet = self.get_next_packet_info_for_interface2(
                i.sw_if_index, dst_sw_if_index, last_info[i.sw_if_index])
            self.assertIsNone(remaining_packet,
                              "Interface %s: Packet expected from interface %s "
                              "didn't arrive" % (dst_if.name, i.name))
# Allow running this test module directly, using the VPP test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)