"""IP{4,6} over IP{v,6} tunnel functional tests"""
import unittest
-from scapy.layers.inet6 import IPv6, Ether, IP, UDP, ICMPv6PacketTooBig
-from scapy.layers.inet import ICMP
+from scapy.layers.inet6 import IPv6, Ether, IP, UDP
from framework import VppTestCase, VppTestRunner
from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
from socket import AF_INET, AF_INET6, inet_pton
def validate(self, rx, expected):
self.assertEqual(rx, expected.__class__(str(expected)))
- def validate_bytes(self, rx, expected):
- self.assertEqual(rx, expected)
-
- def payload(self, len):
- return 'x' * len
-
def test_ipip4(self):
""" ip{v4,v6} over ip4 test """
p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
- p_ip6 = IPv6(src="1::1", dst="DEAD::1", nh='UDP')
- p_ip4 = IP(src=self.pg0.remote_ip4, dst="130.67.0.1")
- p_payload = UDP(sport=1234, dport=1234) / self.payload(1300)
+ p_ip6 = IPv6(src="1::1", dst="DEAD::1", nh='UDP', tc=42)
+ p_ip4 = IP(src="1.2.3.4", dst="130.67.0.1", tos=42)
+ p_payload = UDP(sport=1234, dport=1234)
# IPv4 transport
rv = self.vapi.ipip_add_tunnel(
src_address=self.pg0.local_ip4n,
dst_address=self.pg1.remote_ip4n,
- is_ipv6=0)
+ is_ipv6=0, tc_tos=0xFF)
sw_if_index = rv.sw_if_index
# Set interface up and enable IP on it
p_inner_ip6 = p_ip6
p_inner_ip6.hlim -= 1
p6_reply = (IP(src=self.pg0.local_ip4, dst=self.pg1.remote_ip4,
- proto='ipv6', id=0) / p_inner_ip6 / p_payload)
+ proto='ipv6', id=0, tos=42) / p_inner_ip6 / p_payload)
p6_reply.ttl -= 1
rx = self.send_and_expect(self.pg0, p6*10, self.pg1)
for p in rx:
p4 = (p_ether / p_ip4 / p_payload)
p_ip4_inner = p_ip4
p_ip4_inner.ttl -= 1
- p4_reply = (IP(src=self.pg0.local_ip4,
- dst=self.pg1.remote_ip4) / p_ip4_inner / p_payload)
+ p4_reply = (IP(src=self.pg0.local_ip4, dst=self.pg1.remote_ip4,
+ tos=42) /
+ p_ip4_inner / p_payload)
p4_reply.ttl -= 1
p4_reply.id = 0
- rx = self.send_and_expect(self.pg0, p4*11, self.pg1)
+ rx = self.send_and_expect(self.pg0, p4*10, self.pg1)
for p in rx:
self.validate(p[1], p4_reply)
- # MTU (only checked on encap)
- rv = self.vapi.sw_interface_set_mtu(sw_if_index, 576)
- rv = self.vapi.sw_interface_dump()
- for i in rv:
- if i.sw_if_index == sw_if_index:
- self.assertEqual(i.mtu, 576)
- break
-
- # Should fail. Too large MTU
- p4.flags = 'DF'
- p_icmp4 = ICMP(type='dest-unreach', code='fragmentation-needed',
- nexthopmtu=576, chksum=0xb6c7)
- icmp4_reply = (IP(src=self.pg0.local_ip4,
- dst=self.pg0.remote_ip4,
- ttl=254, len=576, id=0) /
- p_icmp4 / p_ip4 / p_payload)
- icmp4_reply[1].flags = 'DF'
- n = icmp4_reply.__class__(str(icmp4_reply))
- s = str(icmp4_reply)
- icmp4_reply = s[0:576]
- rx = self.send_and_expect(self.pg0, p4*9, self.pg0)
- for p in rx:
- self.validate_bytes(str(p[1]), icmp4_reply)
-
- # Reset MTU
- rv = self.vapi.sw_interface_set_mtu(sw_if_index, 1480)
-
# Decapsulation
p_ether = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
def test_ipip6(self):
""" ip{v4,v6} over ip6 test """
p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
- p_ip6 = IPv6(src=self.pg0.remote_ip6, dst="DEAD::1", nh='UDP')
- p_ip4 = IP(src="1.2.3.4", dst="130.67.0.1")
- p_payload = UDP(sport=1234, dport=1234) / self.payload(1300)
+ p_ip6 = IPv6(src="1::1", dst="DEAD::1", tc=42, nh='UDP')
+ p_ip4 = IP(src="1.2.3.4", dst="130.67.0.1", tos=42)
+ p_payload = UDP(sport=1234, dport=1234)
# IPv6 transport
rv = self.vapi.ipip_add_tunnel(
src_address=self.pg0.local_ip6n,
- dst_address=self.pg1.remote_ip6n)
+ dst_address=self.pg1.remote_ip6n, tc_tos=255)
sw_if_index = rv.sw_if_index
# IPv6 in to IPv6 tunnel
p6 = (p_ether / p_ip6 / p_payload)
- p6_reply = (IPv6(src=self.pg0.local_ip6,
- dst=self.pg1.remote_ip6, hlim=63) / p_ip6 / p_payload)
+ p6_reply = (IPv6(src=self.pg0.local_ip6, dst=self.pg1.remote_ip6,
+ hlim=63, tc=42) /
+ p_ip6 / p_payload)
p6_reply[1].hlim -= 1
- rx = self.send_and_expect(self.pg0, p6*10, self.pg1)
+ rx = self.send_and_expect(self.pg0, p6*11, self.pg1)
for p in rx:
self.validate(p[1], p6_reply)
- # MTU (only checked on encap)
- rv = self.vapi.sw_interface_set_mtu(sw_if_index, 1280)
- rv = self.vapi.sw_interface_dump()
- for i in rv:
- if i.sw_if_index == sw_if_index:
- self.assertEqual(i.mtu, 1280)
- break
-
- # Should fail. Too large MTU
- p_icmp6 = ICMPv6PacketTooBig(mtu=1280, cksum=0xd401)
- icmp6_reply = (IPv6(src=self.pg0.local_ip6,
- dst=self.pg0.remote_ip6,
- hlim=254, plen=1240) /
- p_icmp6 / p_ip6 / p_payload)
- icmp6_reply[2].hlim -= 1
- s = str(icmp6_reply)
- icmp6_reply = s[0:1280]
- rx = self.send_and_expect(self.pg0, p6*9, self.pg0)
- for p in rx:
- self.validate_bytes(str(p[1]), icmp6_reply)
-
- # Reset MTU
- rv = self.vapi.sw_interface_set_mtu(sw_if_index, 1460)
-
# IPv4 in to IPv6 tunnel
p4 = (p_ether / p_ip4 / p_payload)
p4_reply = (IPv6(src=self.pg0.local_ip6,
- dst=self.pg1.remote_ip6, hlim=63) / p_ip4 / p_payload)
+ dst=self.pg1.remote_ip6, hlim=63, tc=42) /
+ p_ip4 / p_payload)
p4_reply[1].ttl -= 1
- rx = self.send_and_expect(self.pg0, p4*10, self.pg1)
+ rx = self.send_and_expect(self.pg0, p4*11, self.pg1)
for p in rx:
self.validate(p[1], p4_reply)
dst=self.pg0.local_ip6) / p_ip4 / p_payload)
p4_reply = (p_ip4 / p_payload)
p4_reply.ttl -= 1
- rx = self.send_and_expect(self.pg1, p4*10, self.pg0)
+ rx = self.send_and_expect(self.pg1, p4*11, self.pg0)
for p in rx:
self.validate(p[1], p4_reply)
dst=self.pg0.local_ip6) / p_ip6 / p_payload)
p6_reply = (p_ip6 / p_payload)
p6_reply.hlim = 63
- rx = self.send_and_expect(self.pg1, p6*10, self.pg0)
+ rx = self.send_and_expect(self.pg1, p6*11, self.pg0)
for p in rx:
self.validate(p[1], p6_reply)