"""IP{4,6} over IP{v,6} tunnel functional tests"""
import unittest
-from scapy.layers.inet6 import IPv6, Ether, IP, UDP
+from scapy.layers.inet6 import IPv6, Ether, IP, UDP, ICMPv6PacketTooBig
+from scapy.layers.inet import ICMP
from framework import VppTestCase, VppTestRunner
from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
from socket import AF_INET, AF_INET6, inet_pton
def validate(self, rx, expected):
self.assertEqual(rx, expected.__class__(str(expected)))
+ def validate_bytes(self, rx, expected):
+ self.assertEqual(rx, expected)
+
+ def payload(self, len):
+ return 'x' * len
+
def test_ipip4(self):
""" ip{v4,v6} over ip4 test """
p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
p_ip6 = IPv6(src="1::1", dst="DEAD::1", nh='UDP')
- p_ip4 = IP(src="1.2.3.4", dst="130.67.0.1")
- p_payload = UDP(sport=1234, dport=1234)
+ p_ip4 = IP(src=self.pg0.remote_ip4, dst="130.67.0.1")
+ p_payload = UDP(sport=1234, dport=1234) / self.payload(1300)
# IPv4 transport
rv = self.vapi.ipip_add_tunnel(
dst=self.pg1.remote_ip4) / p_ip4_inner / p_payload)
p4_reply.ttl -= 1
p4_reply.id = 0
- rx = self.send_and_expect(self.pg0, p4*10, self.pg1)
+ rx = self.send_and_expect(self.pg0, p4*11, self.pg1)
for p in rx:
self.validate(p[1], p4_reply)
+ # MTU (only checked on encap)
+ rv = self.vapi.sw_interface_set_mtu(sw_if_index, 576)
+ rv = self.vapi.sw_interface_dump()
+ for i in rv:
+ if i.sw_if_index == sw_if_index:
+ self.assertEqual(i.mtu, 576)
+ break
+
+ # Should fail. Too large MTU
+ p4.flags = 'DF'
+ p_icmp4 = ICMP(type='dest-unreach', code='fragmentation-needed',
+ nexthopmtu=576, chksum=0xb6c7)
+ icmp4_reply = (IP(src=self.pg0.local_ip4,
+ dst=self.pg0.remote_ip4,
+ ttl=254, len=576, id=0) /
+ p_icmp4 / p_ip4 / p_payload)
+ icmp4_reply[1].flags = 'DF'
+ n = icmp4_reply.__class__(str(icmp4_reply))
+ s = str(icmp4_reply)
+ icmp4_reply = s[0:576]
+ rx = self.send_and_expect(self.pg0, p4*9, self.pg0)
+ for p in rx:
+ self.validate_bytes(str(p[1]), icmp4_reply)
+
+ # Reset MTU
+ rv = self.vapi.sw_interface_set_mtu(sw_if_index, 1480)
+
# Decapsulation
p_ether = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
def test_ipip6(self):
""" ip{v4,v6} over ip6 test """
p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
- p_ip6 = IPv6(src="1::1", dst="DEAD::1", nh='UDP')
+ p_ip6 = IPv6(src=self.pg0.remote_ip6, dst="DEAD::1", nh='UDP')
p_ip4 = IP(src="1.2.3.4", dst="130.67.0.1")
- p_payload = UDP(sport=1234, dport=1234)
+ p_payload = UDP(sport=1234, dport=1234) / self.payload(1300)
# IPv6 transport
rv = self.vapi.ipip_add_tunnel(
for p in rx:
self.validate(p[1], p6_reply)
+ # MTU (only checked on encap)
+ rv = self.vapi.sw_interface_set_mtu(sw_if_index, 1280)
+ rv = self.vapi.sw_interface_dump()
+ for i in rv:
+ if i.sw_if_index == sw_if_index:
+ self.assertEqual(i.mtu, 1280)
+ break
+
+ # Should fail. Too large MTU
+ p_icmp6 = ICMPv6PacketTooBig(mtu=1280, cksum=0xd401)
+ icmp6_reply = (IPv6(src=self.pg0.local_ip6,
+ dst=self.pg0.remote_ip6,
+ hlim=254, plen=1240) /
+ p_icmp6 / p_ip6 / p_payload)
+ icmp6_reply[2].hlim -= 1
+ s = str(icmp6_reply)
+ icmp6_reply = s[0:1280]
+ rx = self.send_and_expect(self.pg0, p6*9, self.pg0)
+ for p in rx:
+ self.validate_bytes(str(p[1]), icmp6_reply)
+
+ # Reset MTU
+ rv = self.vapi.sw_interface_set_mtu(sw_if_index, 1460)
+
# IPv4 in to IPv6 tunnel
p4 = (p_ether / p_ip4 / p_payload)
p4_reply = (IPv6(src=self.pg0.local_ip6,