#!/usr/bin/env python
-import unittest
import socket
+import unittest
+
+import scapy.layers.inet6 as inet6
+from scapy.contrib.mpls import MPLS
+from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6ND_RS, \
+ ICMPv6ND_RA, ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
+ ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types, \
+ ICMPv6TimeExceeded, ICMPv6EchoRequest, ICMPv6EchoReply
+from scapy.layers.l2 import Ether, Dot1Q
+from scapy.packet import Raw
+from scapy.utils import inet_pton, inet_ntop
+from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
+ in6_mactoifaceid
+from six import moves
from framework import VppTestCase, VppTestRunner
from util import ppp, ip6_normalize
-from vpp_sub_interface import VppSubInterface, VppDot1QSubint
-from vpp_pg_interface import is_ipv6_misc
+from vpp_ip import DpoProto
from vpp_ip_route import VppIpRoute, VppRoutePath, find_route, VppIpMRoute, \
VppMRoutePath, MRouteItfFlags, MRouteEntryFlags, VppMplsIpBind, \
- VppMplsRoute, DpoProto, VppMplsTable
+ VppMplsRoute, VppMplsTable, VppIpTable, VppIpAddress
from vpp_neighbor import find_nbr, VppNeighbor
-
-from scapy.packet import Raw
-from scapy.layers.l2 import Ether, Dot1Q
-from scapy.layers.inet6 import IPv6, UDP, TCP, ICMPv6ND_NS, ICMPv6ND_RS, \
- ICMPv6ND_RA, ICMPv6NDOptSrcLLAddr, getmacbyip6, ICMPv6MRD_Solicitation, \
- ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
- ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types, \
- ICMPv6TimeExceeded
-from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_ptop, in6_islladdr, \
- in6_mactoifaceid, in6_ismaddr
-from scapy.utils import inet_pton, inet_ntop
-from scapy.contrib.mpls import MPLS
-
+from vpp_pg_interface import is_ipv6_misc
+from vpp_sub_interface import VppSubInterface, VppDot1QSubint
AF_INET6 = socket.AF_INET6
in6_ptop(dst_ip))
# and come from the target address
- self.assertEqual(in6_ptop(rx[IPv6].src), in6_ptop(tgt_ip))
+ self.assertEqual(
+ in6_ptop(rx[IPv6].src), in6_ptop(tgt_ip))
# Dest link-layer options should have the router's MAC
dll = rx[ICMPv6NDOptDstLLAddr]
# the rx'd NS should be addressed to an mcast address
# derived from the target address
- self.assertEqual(in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))
+ self.assertEqual(
+ in6_ptop(rx[IPv6].dst), in6_ptop(dst_ip))
# expect the tgt IP in the NS header
ns = rx[ICMPv6ND_NS]
self.assertEqual(in6_ptop(ns.tgt), in6_ptop(tgt_ip))
# packet is from the router's local address
- self.assertEqual(in6_ptop(rx[IPv6].src), intf.local_ip6)
+ self.assertEqual(
+ in6_ptop(rx[IPv6].src), intf.local_ip6)
# Src link-layer options should have the router's MAC
sll = rx[ICMPv6NDOptSrcLLAddr]
:ivar list interfaces: pg interfaces and subinterfaces.
:ivar dict flows: IPv4 packet flows in test.
- :ivar list pg_if_packet_sizes: packet sizes in test.
*TODO:* Create AD sub interface
"""
self.flows[self.pg2.sub_if] = [self.pg0, self.pg1.sub_if]
# packet sizes
- self.pg_if_packet_sizes = [64, 512, 1518, 9018]
- self.sub_if_packet_sizes = [64, 512, 1518 + 4, 9018 + 4]
+ self.pg_if_packet_sizes = [64, 1500, 9020]
self.interfaces = list(self.pg_interfaces)
self.interfaces.extend(self.sub_interfaces)
def tearDown(self):
"""Run standard test teardown and log ``show ip6 neighbors``."""
- for i in self.sub_interfaces:
+ for i in self.interfaces:
i.unconfig_ip6()
i.ip6_disable()
i.admin_down()
+ for i in self.sub_interfaces:
i.remove_vpp_config()
super(TestIPv6, self).tearDown()
(count, percent))
percent += 1
- def create_stream(self, src_if, packet_sizes):
+ def modify_packet(self, src_if, packet_size, pkt):
+ """Add load, set destination IP and extend packet to required packet
+ size for defined interface.
+
+ :param VppInterface src_if: Interface to create packet for.
+ :param int packet_size: Required packet size.
+ :param Scapy pkt: Packet to be modified.
+ """
+ dst_if_idx = packet_size / 10 % 2
+ dst_if = self.flows[src_if][dst_if_idx]
+ info = self.create_packet_info(src_if, dst_if)
+ payload = self.info_to_payload(info)
+ p = pkt / Raw(payload)
+ p[IPv6].dst = dst_if.remote_ip6
+ info.data = p.copy()
+ if isinstance(src_if, VppSubInterface):
+ p = src_if.add_dot1_layer(p)
+ self.extend_packet(p, packet_size)
+
+ return p
+
+ def create_stream(self, src_if):
"""Create input packet stream for defined interface.
:param VppInterface src_if: Interface to create packet stream for.
- :param list packet_sizes: Required packet sizes.
"""
- pkts = []
- for i in range(0, 257):
- dst_if = self.flows[src_if][i % 2]
- info = self.create_packet_info(src_if, dst_if)
- payload = self.info_to_payload(info)
- p = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
- IPv6(src=src_if.remote_ip6, dst=dst_if.remote_ip6) /
- UDP(sport=1234, dport=1234) /
- Raw(payload))
- info.data = p.copy()
- if isinstance(src_if, VppSubInterface):
- p = src_if.add_dot1_layer(p)
- size = packet_sizes[(i // 2) % len(packet_sizes)]
- self.extend_packet(p, size)
- pkts.append(p)
+ hdr_ext = 4 if isinstance(src_if, VppSubInterface) else 0
+ pkt_tmpl = (Ether(dst=src_if.local_mac, src=src_if.remote_mac) /
+ IPv6(src=src_if.remote_ip6) /
+ inet6.UDP(sport=1234, dport=1234))
+
+ pkts = [self.modify_packet(src_if, i, pkt_tmpl)
+ for i in moves.range(self.pg_if_packet_sizes[0],
+ self.pg_if_packet_sizes[1], 10)]
+ pkts_b = [self.modify_packet(src_if, i, pkt_tmpl)
+ for i in moves.range(self.pg_if_packet_sizes[1] + hdr_ext,
+ self.pg_if_packet_sizes[2] + hdr_ext,
+ 50)]
+ pkts.extend(pkts_b)
+
return pkts
def verify_capture(self, dst_if, capture):
self.assertTrue(Dot1Q not in packet)
try:
ip = packet[IPv6]
- udp = packet[UDP]
+ udp = packet[inet6.UDP]
payload_info = self.payload_to_info(str(packet[Raw]))
packet_index = payload_info.index
self.assertEqual(payload_info.dst, dst_sw_if_index)
self.assertEqual(packet_index, next_info.index)
saved_packet = next_info.data
# Check standard fields
- self.assertEqual(ip.src, saved_packet[IPv6].src)
- self.assertEqual(ip.dst, saved_packet[IPv6].dst)
- self.assertEqual(udp.sport, saved_packet[UDP].sport)
- self.assertEqual(udp.dport, saved_packet[UDP].dport)
+ self.assertEqual(
+ ip.src, saved_packet[IPv6].src)
+ self.assertEqual(
+ ip.dst, saved_packet[IPv6].dst)
+ self.assertEqual(
+ udp.sport, saved_packet[inet6.UDP].sport)
+ self.assertEqual(
+ udp.dport, saved_packet[inet6.UDP].dport)
except:
self.logger.error(ppp("Unexpected or invalid packet:", packet))
raise
- Send and verify received packets on each interface.
"""
- pkts = self.create_stream(self.pg0, self.pg_if_packet_sizes)
+ pkts = self.create_stream(self.pg0)
self.pg0.add_stream(pkts)
for i in self.sub_interfaces:
- pkts = self.create_stream(i, self.sub_if_packet_sizes)
+ pkts = self.create_stream(i)
i.parent.add_stream(pkts)
self.pg_enable_capture(self.pg_interfaces)
p = (Ether(dst=in6_getnsmac(nsma)) /
IPv6(dst=d, src="2002::2") /
ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
- ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))
+ ICMPv6NDOptSrcLLAddr(
+ lladdr=self.pg0.remote_mac))
pkts = [p]
self.send_and_assert_no_replies(
p = (Ether(dst=in6_getnsmac(nsma)) /
IPv6(dst=d, src=self.pg0.remote_ip6) /
ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
- ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))
+ ICMPv6NDOptSrcLLAddr(
+ lladdr=self.pg0.remote_mac))
pkts = [p]
self.send_and_assert_no_replies(
p = (Ether(dst=in6_getnsmac(nsma)) /
IPv6(dst=d, src=self.pg0.remote_ip6) /
ICMPv6ND_NS(tgt="fd::ffff") /
- ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))
+ ICMPv6NDOptSrcLLAddr(
+ lladdr=self.pg0.remote_mac))
pkts = [p]
self.send_and_assert_no_replies(self.pg0, pkts,
# address
#
p = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
- IPv6(dst=d, src=self.pg0._remote_hosts[2].ip6_ll) /
+ IPv6(
+ dst=d, src=self.pg0._remote_hosts[2].ip6_ll) /
ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
- ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))
+ ICMPv6NDOptSrcLLAddr(
+ lladdr=self.pg0.remote_mac))
self.send_and_expect_na(self.pg0, p,
"NS from link-local",
# An NS to the router's own Link-local
#
p = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
- IPv6(dst=d, src=self.pg0._remote_hosts[3].ip6_ll) /
+ IPv6(
+ dst=d, src=self.pg0._remote_hosts[3].ip6_ll) /
ICMPv6ND_NS(tgt=self.pg0.local_ip6_ll) /
- ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))
+ ICMPv6NDOptSrcLLAddr(
+ lladdr=self.pg0.remote_mac))
self.send_and_expect_na(self.pg0, p,
"NS to/from link-local",
src=self.pg0.remote_mac) /
IPv6(src=self.pg0.remote_ip6,
dst=self.pg1.remote_hosts[1].ip6) /
- UDP(sport=1234, dport=1234) /
+ inet6.UDP(sport=1234, dport=1234) /
Raw())
self.pg0.add_stream(p)
if not pi_opt:
# the RA should not contain prefix information
- self.assertFalse(ra.haslayer(ICMPv6NDOptPrefixInfo))
+ self.assertFalse(ra.haslayer(
+ ICMPv6NDOptPrefixInfo))
else:
raos = rx.getlayer(ICMPv6NDOptPrefixInfo, 1)
# nested classes, so a direct obj1=obj2 comparison always fails.
# however, the getlayer(.., 2) does give one instnace.
# so we cheat here and construct a new opt instnace for comparison
- rd = ICMPv6NDOptPrefixInfo(prefixlen=raos.prefixlen,
- prefix=raos.prefix,
- L=raos.L,
- A=raos.A)
+ rd = ICMPv6NDOptPrefixInfo(
+ prefixlen=raos.prefixlen,
+ prefix=raos.prefix,
+ L=raos.L,
+ A=raos.A)
if type(pi_opt) is list:
for ii in range(len(pi_opt)):
self.assertEqual(pi_opt[ii], rd)
- rd = rx.getlayer(ICMPv6NDOptPrefixInfo, ii+2)
+ rd = rx.getlayer(
+ ICMPv6NDOptPrefixInfo, ii + 2)
else:
self.assertEqual(pi_opt, raos)
# - expect an RA in return
#
p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IPv6(dst=self.pg0.local_ip6, src=self.pg0.remote_ip6) /
+ IPv6(
+ dst=self.pg0.local_ip6, src=self.pg0.remote_ip6) /
ICMPv6ND_RS())
pkts = [p]
self.send_and_expect_ra(self.pg0, pkts, "Genuine RS")
#
self.pg0.ip6_ra_config(send_unicast=1)
p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
- IPv6(dst=self.pg0.local_ip6, src="2002::ffff") /
+ IPv6(dst=self.pg0.local_ip6,
+ src="2002::ffff") /
ICMPv6ND_RS())
pkts = [p]
self.send_and_assert_no_replies(self.pg0, pkts,
#
# RAs should now contain the prefix information option
#
- opt = ICMPv6NDOptPrefixInfo(prefixlen=self.pg0.local_ip6_prefix_len,
- prefix=self.pg0.local_ip6,
- L=1,
- A=1)
+ opt = ICMPv6NDOptPrefixInfo(
+ prefixlen=self.pg0.local_ip6_prefix_len,
+ prefix=self.pg0.local_ip6,
+ L=1,
+ A=1)
self.pg0.ip6_ra_config(send_unicast=1)
ll = mk_ll_addr(self.pg0.remote_mac)
self.pg0.local_ip6_prefix_len,
off_link=1)
- opt = ICMPv6NDOptPrefixInfo(prefixlen=self.pg0.local_ip6_prefix_len,
- prefix=self.pg0.local_ip6,
- L=0,
- A=1)
+ opt = ICMPv6NDOptPrefixInfo(
+ prefixlen=self.pg0.local_ip6_prefix_len,
+ prefix=self.pg0.local_ip6,
+ L=0,
+ A=1)
self.pg0.ip6_ra_config(send_unicast=1)
self.send_and_expect_ra(self.pg0, p,
off_link=1,
no_autoconfig=1)
- opt = ICMPv6NDOptPrefixInfo(prefixlen=self.pg0.local_ip6_prefix_len,
- prefix=self.pg0.local_ip6,
- L=0,
- A=0)
+ opt = ICMPv6NDOptPrefixInfo(
+ prefixlen=self.pg0.local_ip6_prefix_len,
+ prefix=self.pg0.local_ip6,
+ L=0,
+ A=0)
self.pg0.ip6_ra_config(send_unicast=1)
self.send_and_expect_ra(self.pg0, p,
self.pg0.ip6_ra_prefix(self.pg0.local_ip6n,
self.pg0.local_ip6_prefix_len)
- opt = ICMPv6NDOptPrefixInfo(prefixlen=self.pg0.local_ip6_prefix_len,
- prefix=self.pg0.local_ip6,
- L=1,
- A=1)
+ opt = ICMPv6NDOptPrefixInfo(
+ prefixlen=self.pg0.local_ip6_prefix_len,
+ prefix=self.pg0.local_ip6,
+ L=1,
+ A=1)
self.pg0.ip6_ra_config(send_unicast=1)
self.send_and_expect_ra(self.pg0, p,
off_link=1,
no_autoconfig=1)
- opt = ICMPv6NDOptPrefixInfo(prefixlen=self.pg0.local_ip6_prefix_len,
- prefix=self.pg0.local_ip6,
- L=0,
- A=0)
+ opt = ICMPv6NDOptPrefixInfo(
+ prefixlen=self.pg0.local_ip6_prefix_len,
+ prefix=self.pg0.local_ip6,
+ L=0,
+ A=0)
self.pg0.ip6_ra_config(send_unicast=1)
self.send_and_expect_ra(self.pg0, p,
self.pg0.local_ip6_prefix_len,
use_default=1)
- opt = ICMPv6NDOptPrefixInfo(prefixlen=self.pg0.local_ip6_prefix_len,
- prefix=self.pg0.local_ip6,
- L=1,
- A=1)
+ opt = ICMPv6NDOptPrefixInfo(
+ prefixlen=self.pg0.local_ip6_prefix_len,
+ prefix=self.pg0.local_ip6,
+ L=1,
+ A=1)
self.pg0.ip6_ra_config(send_unicast=1)
self.send_and_expect_ra(self.pg0, p,
off_link=1,
no_autoconfig=1)
- opt = [ICMPv6NDOptPrefixInfo(prefixlen=self.pg0.local_ip6_prefix_len,
- prefix=self.pg0.local_ip6,
- L=1,
- A=1),
- ICMPv6NDOptPrefixInfo(prefixlen=self.pg1.local_ip6_prefix_len,
- prefix=self.pg1.local_ip6,
- L=0,
- A=0)]
+ opt = [ICMPv6NDOptPrefixInfo(
+ prefixlen=self.pg0.local_ip6_prefix_len,
+ prefix=self.pg0.local_ip6,
+ L=1,
+ A=1),
+ ICMPv6NDOptPrefixInfo(
+ prefixlen=self.pg1.local_ip6_prefix_len,
+ prefix=self.pg1.local_ip6,
+ L=0,
+ A=0)]
self.pg0.ip6_ra_config(send_unicast=1)
ll = mk_ll_addr(self.pg0.remote_mac)
self.pg0.local_ip6_prefix_len,
is_no=1)
- opt = ICMPv6NDOptPrefixInfo(prefixlen=self.pg1.local_ip6_prefix_len,
- prefix=self.pg1.local_ip6,
- L=0,
- A=0)
+ opt = ICMPv6NDOptPrefixInfo(
+ prefixlen=self.pg1.local_ip6_prefix_len,
+ prefix=self.pg1.local_ip6,
+ L=0,
+ A=0)
self.pg0.ip6_ra_config(send_unicast=1)
self.send_and_expect_ra(self.pg0, p,
self.pg0.ip6_ra_config(no=1, suppress=1, send_unicast=0)
+class TestICMPv6Echo(VppTestCase):
+ """ ICMPv6 Echo Test Case """
+
+ def setUp(self):
+ super(TestICMPv6Echo, self).setUp()
+
+ # create 1 pg interface
+ self.create_pg_interfaces(range(1))
+
+ for i in self.pg_interfaces:
+ i.admin_up()
+ i.config_ip6()
+ i.resolve_ndp()
+
+ def tearDown(self):
+ super(TestICMPv6Echo, self).tearDown()
+ for i in self.pg_interfaces:
+ i.unconfig_ip6()
+ i.ip6_disable()
+ i.admin_down()
+
+ def test_icmpv6_echo(self):
+ """ VPP replies to ICMPv6 Echo Request
+
+ Test scenario:
+
+ - Receive ICMPv6 Echo Request message on pg0 interface.
+ - Check outgoing ICMPv6 Echo Reply message on pg0 interface.
+ """
+
+ icmpv6_id = 0xb
+ icmpv6_seq = 5
+ icmpv6_data = '\x0a' * 18
+ p_echo_request = (Ether(src=self.pg0.remote_mac,
+ dst=self.pg0.local_mac) /
+ IPv6(src=self.pg0.remote_ip6,
+ dst=self.pg0.local_ip6) /
+ ICMPv6EchoRequest(
+ id=icmpv6_id,
+ seq=icmpv6_seq,
+ data=icmpv6_data))
+
+ self.pg0.add_stream(p_echo_request)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+
+ rx = self.pg0.get_capture(1)
+ rx = rx[0]
+ ether = rx[Ether]
+ ipv6 = rx[IPv6]
+ icmpv6 = rx[ICMPv6EchoReply]
+
+ self.assertEqual(ether.src, self.pg0.local_mac)
+ self.assertEqual(ether.dst, self.pg0.remote_mac)
+
+ self.assertEqual(ipv6.src, self.pg0.local_ip6)
+ self.assertEqual(ipv6.dst, self.pg0.remote_ip6)
+
+ self.assertEqual(
+ icmp6types[icmpv6.type], "Echo Reply")
+ self.assertEqual(icmpv6.id, icmpv6_id)
+ self.assertEqual(icmpv6.seq, icmpv6_seq)
+ self.assertEqual(icmpv6.data, icmpv6_data)
+
+
class TestIPv6RD(TestIPv6ND):
""" IPv6 Router Discovery Test Case """
i.config_ip6()
def tearDown(self):
+ for i in self.interfaces:
+ i.unconfig_ip6()
+ i.admin_down()
super(TestIPv6RD, self).tearDown()
def test_rd_send_router_solicitation(self):
rx_list = self.pg1.get_capture(count, timeout=3)
self.assertEqual(len(rx_list), count)
for packet in rx_list:
- self.assertTrue(packet.haslayer(IPv6))
- self.assertTrue(packet[IPv6].haslayer(ICMPv6ND_RS))
+ self.assertEqual(packet.haslayer(IPv6), 1)
+ self.assertEqual(packet[IPv6].haslayer(
+ ICMPv6ND_RS), 1)
dst = ip6_normalize(packet[IPv6].dst)
dst2 = ip6_normalize("ff02::2")
self.assert_equal(dst, dst2)
src = ip6_normalize(packet[IPv6].src)
src2 = ip6_normalize(self.pg1.local_ip6_ll)
self.assert_equal(src, src2)
- self.assertTrue(packet[ICMPv6ND_RS].haslayer(ICMPv6NDOptSrcLLAddr))
- self.assert_equal(packet[ICMPv6NDOptSrcLLAddr].lladdr,
- self.pg1.local_mac)
+ self.assertTrue(
+ bool(packet[ICMPv6ND_RS].haslayer(
+ ICMPv6NDOptSrcLLAddr)))
+ self.assert_equal(
+ packet[ICMPv6NDOptSrcLLAddr].lladdr,
+ self.pg1.local_mac)
def verify_prefix_info(self, reported_prefix, prefix_option):
prefix = socket.inet_pton(socket.AF_INET6,
self.sleep(0.1)
# send RA
- packet = (self.create_ra_packet(self.pg0) / ICMPv6NDOptPrefixInfo(
+ packet = (self.create_ra_packet(
+ self.pg0) / ICMPv6NDOptPrefixInfo(
prefix="1::",
prefixlen=64,
validlifetime=2,
# on the link that has the prefix configured
#
ns_pg1 = (Ether(dst=in6_getnsmac(nsma), src=self.pg1.remote_mac) /
- IPv6(dst=d, src=self.pg0._remote_hosts[2].ip6) /
+ IPv6(dst=d,
+ src=self.pg0._remote_hosts[2].ip6) /
ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
- ICMPv6NDOptSrcLLAddr(lladdr=self.pg0._remote_hosts[2].mac))
+ ICMPv6NDOptSrcLLAddr(
+ lladdr=self.pg0._remote_hosts[2].mac))
self.send_and_assert_no_replies(self.pg1, ns_pg1, "Off link NS")
t = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
IPv6(dst=self.pg0._remote_hosts[2].ip6,
src=self.pg0.remote_ip6) /
- UDP(sport=10000, dport=20000) /
+ inet6.UDP(sport=10000, dport=20000) /
Raw('\xa5' * 100))
self.pg0.add_stream(t)
self.assertEqual(rx[Ether].dst, self.pg0._remote_hosts[2].mac)
self.assertEqual(rx[Ether].src, self.pg1.local_mac)
- self.assertEqual(rx[IPv6].src, t[IPv6].src)
- self.assertEqual(rx[IPv6].dst, t[IPv6].dst)
+ self.assertEqual(rx[IPv6].src,
+ t[IPv6].src)
+ self.assertEqual(rx[IPv6].dst,
+ t[IPv6].dst)
#
# Test we proxy for the host on the main interface
#
ns_pg0 = (Ether(dst=in6_getnsmac(nsma), src=self.pg0.remote_mac) /
IPv6(dst=d, src=self.pg0.remote_ip6) /
- ICMPv6ND_NS(tgt=self.pg0._remote_hosts[2].ip6) /
- ICMPv6NDOptSrcLLAddr(lladdr=self.pg0.remote_mac))
+ ICMPv6ND_NS(
+ tgt=self.pg0._remote_hosts[2].ip6) /
+ ICMPv6NDOptSrcLLAddr(
+ lladdr=self.pg0.remote_mac))
self.send_and_expect_na(self.pg0, ns_pg0,
"NS to proxy entry on main",
# Setup and resolve proxy for another host on another interface
#
ns_pg2 = (Ether(dst=in6_getnsmac(nsma), src=self.pg2.remote_mac) /
- IPv6(dst=d, src=self.pg0._remote_hosts[3].ip6) /
+ IPv6(dst=d,
+ src=self.pg0._remote_hosts[3].ip6) /
ICMPv6ND_NS(tgt=self.pg0.local_ip6) /
- ICMPv6NDOptSrcLLAddr(lladdr=self.pg0._remote_hosts[2].mac))
+ ICMPv6NDOptSrcLLAddr(
+ lladdr=self.pg0._remote_hosts[2].mac))
self.vapi.ip6_nd_proxy(
inet_pton(AF_INET6, self.pg0._remote_hosts[3].ip6),
src=self.pg0.remote_hosts[3].mac) /
IPv6(dst=self.pg0._remote_hosts[2].ip6,
src=self.pg0._remote_hosts[3].ip6) /
- UDP(sport=10000, dport=20000) /
+ inet6.UDP(sport=10000, dport=20000) /
Raw('\xa5' * 100))
self.pg2.add_stream(t2)
self.assertEqual(rx[Ether].dst, self.pg0._remote_hosts[2].mac)
self.assertEqual(rx[Ether].src, self.pg1.local_mac)
- self.assertEqual(rx[IPv6].src, t2[IPv6].src)
- self.assertEqual(rx[IPv6].dst, t2[IPv6].dst)
+ self.assertEqual(rx[IPv6].src,
+ t2[IPv6].src)
+ self.assertEqual(rx[IPv6].dst,
+ t2[IPv6].dst)
#
# remove the proxy configs
p = (Ether(src=self.pg0.remote_mac,
dst=self.pg0.local_mac) /
IPv6(src=self.pg0.remote_ip6, dst="2001::1") /
- UDP(sport=1234, dport=1234) /
+ inet6.UDP(sport=1234, dport=1234) /
Raw('\xa5' * 100))
#
pu = (Ether(src=self.pg1.remote_mac,
dst=self.pg1.local_mac) /
IPv6(src="2001::1", dst=self.pg0.remote_ip6) /
- UDP(sport=1234, dport=1234) /
+ inet6.UDP(sport=1234, dport=1234) /
Raw('\xa5' * 100))
pm = (Ether(src=self.pg1.remote_mac,
dst=self.pg1.local_mac) /
IPv6(src="2001::1", dst="ffef::1") /
- UDP(sport=1234, dport=1234) /
+ inet6.UDP(sport=1234, dport=1234) /
Raw('\xa5' * 100))
#
src_mpls_pkts = []
for ii in range(65):
- port_ip_hdr = (IPv6(dst="3000::1", src="3000:1::1") /
- UDP(sport=1234, dport=1234 + ii) /
- Raw('\xa5' * 100))
+ port_ip_hdr = (
+ IPv6(dst="3000::1", src="3000:1::1") /
+ inet6.UDP(sport=1234, dport=1234 + ii) /
+ Raw('\xa5' * 100))
port_ip_pkts.append((Ether(src=self.pg0.remote_mac,
dst=self.pg0.local_mac) /
port_ip_hdr))
MPLS(label=14, ttl=2) /
MPLS(label=999, ttl=2) /
port_ip_hdr))
- src_ip_hdr = (IPv6(dst="3000::1", src="3000:1::%d" % ii) /
- UDP(sport=1234, dport=1234) /
- Raw('\xa5' * 100))
+ src_ip_hdr = (
+ IPv6(dst="3000::1", src="3000:1::%d" % ii) /
+ inet6.UDP(sport=1234, dport=1234) /
+ Raw('\xa5' * 100))
src_ip_pkts.append((Ether(src=self.pg0.remote_mac,
dst=self.pg0.local_mac) /
src_ip_hdr))
for ii in range(257):
port_pkts.append((Ether(src=self.pg0.remote_mac,
dst=self.pg0.local_mac) /
- IPv6(dst="4000::1", src="4000:1::1") /
- UDP(sport=1234, dport=1234 + ii) /
+ IPv6(dst="4000::1",
+ src="4000:1::1") /
+ inet6.UDP(sport=1234,
+ dport=1234 + ii) /
Raw('\xa5' * 100)))
src_pkts.append((Ether(src=self.pg0.remote_mac,
dst=self.pg0.local_mac) /
- IPv6(dst="4000::1", src="4000:1::%d" % ii) /
- UDP(sport=1234, dport=1234) /
+ IPv6(dst="4000::1",
+ src="4000:1::%d" % ii) /
+ inet6.UDP(sport=1234, dport=1234) /
Raw('\xa5' * 100)))
route_3000_2 = VppIpRoute(self, "3000::2", 128,
for ii in range(257):
port_pkts.append((Ether(src=self.pg0.remote_mac,
dst=self.pg0.local_mac) /
- IPv6(dst="6000::1", src="6000:1::1") /
- UDP(sport=1234, dport=1234 + ii) /
+ IPv6(dst="6000::1",
+ src="6000:1::1") /
+ inet6.UDP(sport=1234,
+ dport=1234 + ii) /
Raw('\xa5' * 100)))
route_5000_2 = VppIpRoute(self, "5000::2", 128,
def setUp(self):
super(TestIP6Punt, self).setUp()
- self.create_pg_interfaces(range(2))
+ self.create_pg_interfaces(range(4))
for i in self.pg_interfaces:
i.admin_up()
p = (Ether(src=self.pg0.remote_mac,
dst=self.pg0.local_mac) /
- IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) /
- TCP(sport=1234, dport=1234) /
+ IPv6(src=self.pg0.remote_ip6,
+ dst=self.pg0.local_ip6) /
+ inet6.TCP(sport=1234, dport=1234) /
Raw('\xa5' * 100))
pkts = p * 1025
#
# Configure a punt redirect via pg1.
#
- nh_addr = inet_pton(AF_INET6,
- self.pg1.remote_ip6)
+ nh_addr = VppIpAddress(self.pg1.remote_ip6).encode()
self.vapi.ip_punt_redirect(self.pg0.sw_if_index,
self.pg1.sw_if_index,
- nh_addr,
- is_ip6=1)
+ nh_addr)
self.send_and_expect(self.pg0, pkts, self.pg1)
# but not equal to the number sent, since some were policed
#
rx = self.pg1._get_capture(1)
- self.assertTrue(len(rx) > 0)
- self.assertTrue(len(rx) < len(pkts))
+ self.assertGreater(len(rx), 0)
+ self.assertLess(len(rx), len(pkts))
#
# remove the poilcer. back to full rx
self.vapi.ip_punt_redirect(self.pg0.sw_if_index,
self.pg1.sw_if_index,
nh_addr,
- is_add=0,
- is_ip6=1)
+ is_add=0)
self.send_and_assert_no_replies(self.pg0, pkts,
"IP no punt config")
#
self.vapi.ip_punt_redirect(0xffffffff,
self.pg1.sw_if_index,
- nh_addr,
- is_ip6=1)
+ nh_addr)
self.send_and_expect(self.pg0, pkts, self.pg1)
self.vapi.ip_punt_redirect(0xffffffff,
self.pg1.sw_if_index,
nh_addr,
- is_add=0,
- is_ip6=1)
+ is_add=0)
+
+ def test_ip_punt_dump(self):
+ """ IP6 punt redirect dump"""
+
+ #
+ # Configure a punt redirects
+ #
+ nh_addr = VppIpAddress(self.pg3.remote_ip6).encode()
+ self.vapi.ip_punt_redirect(self.pg0.sw_if_index,
+ self.pg3.sw_if_index,
+ nh_addr)
+ self.vapi.ip_punt_redirect(self.pg1.sw_if_index,
+ self.pg3.sw_if_index,
+ nh_addr)
+ self.vapi.ip_punt_redirect(self.pg2.sw_if_index,
+ self.pg3.sw_if_index,
+ VppIpAddress('0::0').encode())
+
+ #
+ # Dump pg0 punt redirects
+ #
+ punts = self.vapi.ip_punt_redirect_dump(self.pg0.sw_if_index,
+ is_ipv6=1)
+ for p in punts:
+ self.assertEqual(p.punt.rx_sw_if_index, self.pg0.sw_if_index)
+
+ #
+ # Dump punt redirects for all interfaces
+ #
+ punts = self.vapi.ip_punt_redirect_dump(0xffffffff, is_ipv6=1)
+ self.assertEqual(len(punts), 3)
+ for p in punts:
+ self.assertEqual(p.punt.tx_sw_if_index, self.pg3.sw_if_index)
+ self.assertNotEqual(punts[1].punt.nh.un.ip6, self.pg3.remote_ip6)
+ self.assertEqual(punts[2].punt.nh.un.ip6, '\x00'*16)
+
+
+class TestIPDeag(VppTestCase):
+ """ IPv6 Deaggregate Routes """
+
+ def setUp(self):
+ super(TestIPDeag, self).setUp()
+
+ self.create_pg_interfaces(range(3))
+
+ for i in self.pg_interfaces:
+ i.admin_up()
+ i.config_ip6()
+ i.resolve_ndp()
+
+ def tearDown(self):
+ super(TestIPDeag, self).tearDown()
+ for i in self.pg_interfaces:
+ i.unconfig_ip6()
+ i.admin_down()
+
+ def test_ip_deag(self):
+ """ IP Deag Routes """
+
+ #
+ # Create a table to be used for:
+ # 1 - another destination address lookup
+ # 2 - a source address lookup
+ #
+ table_dst = VppIpTable(self, 1, is_ip6=1)
+ table_src = VppIpTable(self, 2, is_ip6=1)
+ table_dst.add_vpp_config()
+ table_src.add_vpp_config()
+
+ #
+ # Add a route in the default table to point to a deag/
+ # second lookup in each of these tables
+ #
+ route_to_dst = VppIpRoute(self, "1::1", 128,
+ [VppRoutePath("::",
+ 0xffffffff,
+ nh_table_id=1,
+ proto=DpoProto.DPO_PROTO_IP6)],
+ is_ip6=1)
+ route_to_src = VppIpRoute(self, "1::2", 128,
+ [VppRoutePath("::",
+ 0xffffffff,
+ nh_table_id=2,
+ is_source_lookup=1,
+ proto=DpoProto.DPO_PROTO_IP6)],
+ is_ip6=1)
+ route_to_dst.add_vpp_config()
+ route_to_src.add_vpp_config()
+
+ #
+ # packets to these destination are dropped, since they'll
+ # hit the respective default routes in the second table
+ #
+ p_dst = (Ether(src=self.pg0.remote_mac,
+ dst=self.pg0.local_mac) /
+ IPv6(src="5::5", dst="1::1") /
+ inet6.TCP(sport=1234, dport=1234) /
+ Raw('\xa5' * 100))
+ p_src = (Ether(src=self.pg0.remote_mac,
+ dst=self.pg0.local_mac) /
+ IPv6(src="2::2", dst="1::2") /
+ inet6.TCP(sport=1234, dport=1234) /
+ Raw('\xa5' * 100))
+ pkts_dst = p_dst * 257
+ pkts_src = p_src * 257
+
+ self.send_and_assert_no_replies(self.pg0, pkts_dst,
+ "IP in dst table")
+ self.send_and_assert_no_replies(self.pg0, pkts_src,
+ "IP in src table")
+
+ #
+ # add a route in the dst table to forward via pg1
+ #
+ route_in_dst = VppIpRoute(self, "1::1", 128,
+ [VppRoutePath(self.pg1.remote_ip6,
+ self.pg1.sw_if_index,
+ proto=DpoProto.DPO_PROTO_IP6)],
+ is_ip6=1,
+ table_id=1)
+ route_in_dst.add_vpp_config()
+
+ self.send_and_expect(self.pg0, pkts_dst, self.pg1)
+
+ #
+ # add a route in the src table to forward via pg2
+ #
+ route_in_src = VppIpRoute(self, "2::2", 128,
+ [VppRoutePath(self.pg2.remote_ip6,
+ self.pg2.sw_if_index,
+ proto=DpoProto.DPO_PROTO_IP6)],
+ is_ip6=1,
+ table_id=2)
+ route_in_src.add_vpp_config()
+ self.send_and_expect(self.pg0, pkts_src, self.pg2)
+
+ #
+ # loop in the lookup DP
+ #
+ route_loop = VppIpRoute(self, "3::3", 128,
+ [VppRoutePath("::",
+ 0xffffffff,
+ proto=DpoProto.DPO_PROTO_IP6)],
+ is_ip6=1)
+ route_loop.add_vpp_config()
+
+ p_l = (Ether(src=self.pg0.remote_mac,
+ dst=self.pg0.local_mac) /
+ IPv6(src="3::4", dst="3::3") /
+ inet6.TCP(sport=1234, dport=1234) /
+ Raw('\xa5' * 100))
+
+ self.send_and_assert_no_replies(self.pg0, p_l * 257,
+ "IP lookup loop")
class TestIP6Input(VppTestCase):
IPv6(src=self.pg0.remote_ip6,
dst=self.pg1.remote_ip6,
version=3) /
- UDP(sport=1234, dport=1234) /
+ inet6.UDP(sport=1234, dport=1234) /
Raw('\xa5' * 100))
self.send_and_assert_no_replies(self.pg0, p_version * 65,
IPv6(src=self.pg0.remote_ip6,
dst=self.pg1.remote_ip6,
hlim=1) /
- UDP(sport=1234, dport=1234) /
+ inet6.UDP(sport=1234, dport=1234) /
Raw('\xa5' * 100))
rx = self.send_and_expect(self.pg0, p_version * 65, self.pg0)