import scapy.layers.inet6 as inet6
from scapy.contrib.mpls import MPLS
from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6ND_RS, \
ICMPv6ND_RA, ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types, \
import scapy.layers.inet6 as inet6
from scapy.contrib.mpls import MPLS
from scapy.layers.inet6 import IPv6, ICMPv6ND_NS, ICMPv6ND_RS, \
ICMPv6ND_RA, ICMPv6NDOptMTU, ICMPv6NDOptSrcLLAddr, ICMPv6NDOptPrefixInfo, \
ICMPv6ND_NA, ICMPv6NDOptDstLLAddr, ICMPv6DestUnreach, icmp6types, \
- ICMPv6TimeExceeded, ICMPv6EchoRequest, ICMPv6EchoReply
+ ICMPv6TimeExceeded, ICMPv6EchoRequest, ICMPv6EchoReply, IPv6ExtHdrHopByHop
from scapy.layers.l2 import Ether, Dot1Q
from scapy.packet import Raw
from scapy.utils import inet_pton, inet_ntop
from scapy.layers.l2 import Ether, Dot1Q
from scapy.packet import Raw
from scapy.utils import inet_pton, inet_ntop
class TestIPv6ND(VppTestCase):
def validate_ra(self, intf, rx, dst_ip=None):
class TestIPv6ND(VppTestCase):
def validate_ra(self, intf, rx, dst_ip=None):
for i in self.interfaces:
next_hop_address = i.local_ip6n
for j in range(count / n_int):
for i in self.interfaces:
next_hop_address = i.local_ip6n
for j in range(count / n_int):
- self.vapi.ip_add_del_route(
- dest_addr, dest_addr_len, next_hop_address, is_ipv6=1)
+ self.vapi.ip_add_del_route(dst_address=dest_addr,
+ dst_address_length=dest_addr_len,
+ next_hop_address=next_hop_address,
+ is_ipv6=1)
# decipher how to decode. this 1st layer of option always returns
# nested classes, so a direct obj1=obj2 comparison always fails.
# however, the getlayer(.., 2) does give one instnace.
# decipher how to decode. this 1st layer of option always returns
# nested classes, so a direct obj1=obj2 comparison always fails.
# however, the getlayer(.., 2) does give one instnace.
rd = ICMPv6NDOptPrefixInfo(
prefixlen=raos.prefixlen,
prefix=raos.prefix,
rd = ICMPv6NDOptPrefixInfo(
prefixlen=raos.prefixlen,
prefix=raos.prefix,
- # When we reconfiure the IPv6 RA config, we reset the RA rate limiting,
+ # When we reconfigure the IPv6 RA config,
+ # we reset the RA rate limiting,
# so we need to do this before each test below so as not to drop
# packets for rate limiting reasons. Test this works here.
#
# so we need to do this before each test below so as not to drop
# packets for rate limiting reasons. Test this works here.
#
# L and A flag are clear in the advert
#
self.pg0.ip6_ra_prefix(self.pg0.local_ip6,
# L and A flag are clear in the advert
#
self.pg0.ip6_ra_prefix(self.pg0.local_ip6,
#
self.pg0.ip6_ra_prefix(self.pg1.local_ip6,
self.pg1.local_ip6_prefix_len,
#
self.pg0.ip6_ra_prefix(self.pg1.local_ip6,
self.pg1.local_ip6_prefix_len,
p_echo_request = (Ether(src=self.pg0.remote_mac,
dst=self.pg0.local_mac) /
IPv6(src=self.pg0.remote_ip6,
p_echo_request = (Ether(src=self.pg0.remote_mac,
dst=self.pg0.local_mac) /
IPv6(src=self.pg0.remote_ip6,
- defaut_route = {}
- defaut_route['sw_if_index'] = path.sw_if_index
- defaut_route['next_hop'] = path.next_hop
- list.append(defaut_route)
+ default_route = {}
+ default_route['sw_if_index'] = path.sw_if_index
+ default_route['next_hop'] = path.next_hop
+ list.append(default_route)
# check FIB still contains the SLAAC address
addresses = set(self.get_interface_addresses(fib, self.pg0))
new_addresses = addresses.difference(initial_addresses)
# check FIB still contains the SLAAC address
addresses = set(self.get_interface_addresses(fib, self.pg0))
new_addresses = addresses.difference(initial_addresses)
self.assertEqual(len(new_addresses), 1)
prefix = list(new_addresses)[0][:8] + '\0\0\0\0\0\0\0\0'
self.assertEqual(inet_ntop(AF_INET6, prefix), '1::')
self.assertEqual(len(new_addresses), 1)
prefix = list(new_addresses)[0][:8] + '\0\0\0\0\0\0\0\0'
self.assertEqual(inet_ntop(AF_INET6, prefix), '1::')
def setUp(self):
super(TestIPDisabled, self).setUp()
# create 2 pg interfaces
self.create_pg_interfaces(range(2))
def setUp(self):
super(TestIPDisabled, self).setUp()
# create 2 pg interfaces
self.create_pg_interfaces(range(2))
port_ip_hdr = (
IPv6(dst="3000::1", src="3000:1::1") /
inet6.UDP(sport=1234, dport=1234 + ii) /
port_ip_hdr = (
IPv6(dst="3000::1", src="3000:1::1") /
inet6.UDP(sport=1234, dport=1234 + ii) /
#
route_3000_1 = VppIpRoute(self, "3000::1", 128,
[VppRoutePath(self.pg1.remote_ip6,
#
route_3000_1 = VppIpRoute(self, "3000::1", 128,
[VppRoutePath(self.pg1.remote_ip6,
# src,dst
# We are not going to ensure equal amounts of packets across each link,
# since the hash algorithm is statistical and therefore this can never
# src,dst
# We are not going to ensure equal amounts of packets across each link,
# since the hash algorithm is statistical and therefore this can never
# balancing. So instead just ensure there is traffic on each link.
#
self.send_and_expect_load_balancing(self.pg0, port_ip_pkts,
# balancing. So instead just ensure there is traffic on each link.
#
self.send_and_expect_load_balancing(self.pg0, port_ip_pkts,
- self.vapi.set_ip_flow_hash(0, is_ip6=1, src=1, dst=1, sport=0, dport=0)
+ self.vapi.set_ip_flow_hash(vrf_id=0, src=1, dst=1, sport=0, dport=0,
+ is_ipv6=1)
- self.vapi.set_ip_flow_hash(0, is_ip6=1, src=1, dst=1, sport=1, dport=1)
+ self.vapi.set_ip_flow_hash(vrf_id=0, src=1, dst=1, sport=1, dport=1,
+ is_ipv6=1)
- policer = self.vapi.policer_add_del("ip6-punt", 400, 0, 10, 0,
+ policer = self.vapi.policer_add_del(b"ip6-punt", 400, 0, 10, 0,
# but not equal to the number sent, since some were policed
#
rx = self.pg1._get_capture(1)
# but not equal to the number sent, since some were policed
#
rx = self.pg1._get_capture(1)
# remove the policer. back to full rx
#
self.vapi.ip_punt_police(policer.policer_index, is_add=0, is_ip6=1)
# remove the policer. back to full rx
#
self.vapi.ip_punt_police(policer.policer_index, is_add=0, is_ip6=1)
- self.vapi.policer_add_del("ip6-punt", 400, 0, 10, 0,
+ self.vapi.policer_add_del(b"ip6-punt", 400, 0, 10, 0,
+ def test_hop_by_hop(self):
+ """ Hop-by-hop header test """
+
+ p = (Ether(src=self.pg0.remote_mac,
+ dst=self.pg0.local_mac) /
+ IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) /
+ IPv6ExtHdrHopByHop() /
+ inet6.UDP(sport=1234, dport=1234) /
+ Raw('\xa5' * 100))
+
+ self.pg0.add_stream(p)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()