2 """IP{4,6} over IP{v,6} tunnel functional tests"""
5 from scapy.layers.inet6 import IPv6, Ether, IP, UDP, ICMPv6PacketTooBig
6 from scapy.layers.inet import ICMP
7 from framework import VppTestCase, VppTestRunner
8 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
9 from socket import AF_INET, AF_INET6, inet_pton
11 """ Testipip is a subclass of VPPTestCase classes.
18 class TestIPIP(VppTestCase):
19 """ IPIP Test Case """
23 super(TestIPIP, cls).setUpClass()
24 cls.create_pg_interfaces(range(2))
25 cls.interfaces = list(cls.pg_interfaces)
28 super(TestIPIP, cls).setUp()
29 for i in cls.interfaces:
38 super(TestIPIP, self).tearDown()
40 for i in self.pg_interfaces:
45 def validate(self, rx, expected):
46 self.assertEqual(rx, expected.__class__(str(expected)))
48 def validate_bytes(self, rx, expected):
49 self.assertEqual(rx, expected)
51 def payload(self, len):
55 """ ip{v4,v6} over ip4 test """
56 p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
57 p_ip6 = IPv6(src="1::1", dst="DEAD::1", nh='UDP')
58 p_ip4 = IP(src=self.pg0.remote_ip4, dst="130.67.0.1")
59 p_payload = UDP(sport=1234, dport=1234) / self.payload(1300)
62 rv = self.vapi.ipip_add_tunnel(
63 src_address=self.pg0.local_ip4n,
64 dst_address=self.pg1.remote_ip4n,
66 sw_if_index = rv.sw_if_index
68 # Set interface up and enable IP on it
69 self.vapi.sw_interface_set_flags(sw_if_index, 1)
70 self.vapi.sw_interface_set_unnumbered(
71 ip_sw_if_index=self.pg0.sw_if_index,
72 sw_if_index=sw_if_index)
74 # Add IPv4 and IPv6 routes via tunnel interface
75 ip4_via_tunnel = VppIpRoute(
76 self, "130.67.0.0", 16,
77 [VppRoutePath("0.0.0.0",
79 proto=DpoProto.DPO_PROTO_IP4)], is_ip6=0)
80 ip4_via_tunnel.add_vpp_config()
82 ip6_via_tunnel = VppIpRoute(
86 proto=DpoProto.DPO_PROTO_IP6)], is_ip6=1)
87 ip6_via_tunnel.add_vpp_config()
89 # IPv6 in to IPv4 tunnel
90 p6 = (p_ether / p_ip6 / p_payload)
93 p6_reply = (IP(src=self.pg0.local_ip4, dst=self.pg1.remote_ip4,
94 proto='ipv6', id=0) / p_inner_ip6 / p_payload)
96 rx = self.send_and_expect(self.pg0, p6*10, self.pg1)
98 self.validate(p[1], p6_reply)
100 # IPv4 in to IPv4 tunnel
101 p4 = (p_ether / p_ip4 / p_payload)
104 p4_reply = (IP(src=self.pg0.local_ip4,
105 dst=self.pg1.remote_ip4) / p_ip4_inner / p_payload)
108 rx = self.send_and_expect(self.pg0, p4*11, self.pg1)
110 self.validate(p[1], p4_reply)
112 # MTU (only checked on encap)
113 rv = self.vapi.sw_interface_set_mtu(sw_if_index, 576)
114 rv = self.vapi.sw_interface_dump()
116 if i.sw_if_index == sw_if_index:
117 self.assertEqual(i.mtu, 576)
120 # Should fail. Too large MTU
122 p_icmp4 = ICMP(type='dest-unreach', code='fragmentation-needed',
123 nexthopmtu=576, chksum=0xb6c7)
124 icmp4_reply = (IP(src=self.pg0.local_ip4,
125 dst=self.pg0.remote_ip4,
126 ttl=254, len=576, id=0) /
127 p_icmp4 / p_ip4 / p_payload)
128 icmp4_reply[1].flags = 'DF'
129 n = icmp4_reply.__class__(str(icmp4_reply))
131 icmp4_reply = s[0:576]
132 rx = self.send_and_expect(self.pg0, p4*9, self.pg0)
134 self.validate_bytes(str(p[1]), icmp4_reply)
137 rv = self.vapi.sw_interface_set_mtu(sw_if_index, 1480)
140 p_ether = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
142 # IPv4 tunnel to IPv4
143 p_ip4 = IP(src="1.2.3.4", dst=self.pg0.remote_ip4)
144 p4 = (p_ether / IP(src=self.pg1.remote_ip4,
145 dst=self.pg0.local_ip4) / p_ip4 / p_payload)
146 p4_reply = (p_ip4 / p_payload)
148 rx = self.send_and_expect(self.pg1, p4*10, self.pg0)
150 self.validate(p[1], p4_reply)
152 # IPv4 tunnel to IPv6
153 p_ip6 = IPv6(src="1:2:3::4", dst=self.pg0.remote_ip6)
154 p6 = (p_ether / IP(src=self.pg1.remote_ip4,
155 dst=self.pg0.local_ip4) / p_ip6 / p_payload)
156 p6_reply = (p_ip6 / p_payload)
158 rx = self.send_and_expect(self.pg1, p6*10, self.pg0)
160 self.validate(p[1], p6_reply)
162 def test_ipip6(self):
163 """ ip{v4,v6} over ip6 test """
164 p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
165 p_ip6 = IPv6(src=self.pg0.remote_ip6, dst="DEAD::1", nh='UDP')
166 p_ip4 = IP(src="1.2.3.4", dst="130.67.0.1")
167 p_payload = UDP(sport=1234, dport=1234) / self.payload(1300)
170 rv = self.vapi.ipip_add_tunnel(
171 src_address=self.pg0.local_ip6n,
172 dst_address=self.pg1.remote_ip6n)
174 sw_if_index = rv.sw_if_index
176 self.vapi.sw_interface_set_flags(sw_if_index, 1)
177 self.vapi.sw_interface_set_unnumbered(
178 ip_sw_if_index=self.pg0.sw_if_index, sw_if_index=sw_if_index)
180 # Add IPv4 and IPv6 routes via tunnel interface
181 ip4_via_tunnel = VppIpRoute(
182 self, "130.67.0.0", 16,
183 [VppRoutePath("0.0.0.0",
185 proto=DpoProto.DPO_PROTO_IP4)], is_ip6=0)
186 ip4_via_tunnel.add_vpp_config()
188 ip6_via_tunnel = VppIpRoute(
192 proto=DpoProto.DPO_PROTO_IP6)], is_ip6=1)
193 ip6_via_tunnel.add_vpp_config()
197 # IPv6 in to IPv6 tunnel
198 p6 = (p_ether / p_ip6 / p_payload)
199 p6_reply = (IPv6(src=self.pg0.local_ip6,
200 dst=self.pg1.remote_ip6, hlim=63) / p_ip6 / p_payload)
201 p6_reply[1].hlim -= 1
202 rx = self.send_and_expect(self.pg0, p6*10, self.pg1)
204 self.validate(p[1], p6_reply)
206 # MTU (only checked on encap)
207 rv = self.vapi.sw_interface_set_mtu(sw_if_index, 1280)
208 rv = self.vapi.sw_interface_dump()
210 if i.sw_if_index == sw_if_index:
211 self.assertEqual(i.mtu, 1280)
214 # Should fail. Too large MTU
215 p_icmp6 = ICMPv6PacketTooBig(mtu=1280, cksum=0xd401)
216 icmp6_reply = (IPv6(src=self.pg0.local_ip6,
217 dst=self.pg0.remote_ip6,
218 hlim=254, plen=1240) /
219 p_icmp6 / p_ip6 / p_payload)
220 icmp6_reply[2].hlim -= 1
222 icmp6_reply = s[0:1280]
223 rx = self.send_and_expect(self.pg0, p6*9, self.pg0)
225 self.validate_bytes(str(p[1]), icmp6_reply)
228 rv = self.vapi.sw_interface_set_mtu(sw_if_index, 1460)
230 # IPv4 in to IPv6 tunnel
231 p4 = (p_ether / p_ip4 / p_payload)
232 p4_reply = (IPv6(src=self.pg0.local_ip6,
233 dst=self.pg1.remote_ip6, hlim=63) / p_ip4 / p_payload)
235 rx = self.send_and_expect(self.pg0, p4*10, self.pg1)
237 self.validate(p[1], p4_reply)
241 p_ether = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
243 # IPv6 tunnel to IPv4
244 p_ip4 = IP(src="1.2.3.4", dst=self.pg0.remote_ip4)
245 p4 = (p_ether / IPv6(src=self.pg1.remote_ip6,
246 dst=self.pg0.local_ip6) / p_ip4 / p_payload)
247 p4_reply = (p_ip4 / p_payload)
249 rx = self.send_and_expect(self.pg1, p4*10, self.pg0)
251 self.validate(p[1], p4_reply)
253 # IPv6 tunnel to IPv6
254 p_ip6 = IPv6(src="1:2:3::4", dst=self.pg0.remote_ip6)
255 p6 = (p_ether / IPv6(src=self.pg1.remote_ip6,
256 dst=self.pg0.local_ip6) / p_ip6 / p_payload)
257 p6_reply = (p_ip6 / p_payload)
259 rx = self.send_and_expect(self.pg1, p6*10, self.pg0)
261 self.validate(p[1], p6_reply)
263 def test_ipip_create(self):
264 """ ipip create / delete interface test """
265 rv = self.vapi.ipip_add_tunnel(
266 src_address=inet_pton(AF_INET, '1.2.3.4'),
267 dst_address=inet_pton(AF_INET, '2.3.4.5'), is_ipv6=0)
268 sw_if_index = rv.sw_if_index
269 self.vapi.ipip_del_tunnel(sw_if_index)
272 if __name__ == '__main__':
273 unittest.main(testRunner=VppTestRunner)