IP fragmentation to handle buffer chains.
[vpp.git] / test / test_ipip.py
1 #!/usr/bin/env python
2 """IP{4,6} over IP{v,6} tunnel functional tests"""
3
4 import unittest
5 from scapy.layers.inet6 import IPv6, Ether, IP, UDP
6 from scapy.all import fragment
7 from framework import VppTestCase, VppTestRunner
8 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
9 from socket import AF_INET, AF_INET6, inet_pton
10 import StringIO
11
""" TestIPIP is a subclass of VppTestCase.

IPIP tests.

"""
17
18
def reassemble(listoffragments):
    """Rebuild one IPv4 packet from a list of its fragments.

    The IP payload of every fragment is written into a byte buffer at the
    position given by the fragment's ``frag`` field (counted in 8-byte
    units), so the fragments do not have to be supplied in order.  The IP
    header of the first list element is reused for the result: its total
    length is recomputed, its flags and fragment offset are cleared and its
    checksum field is deleted (presumably so that scapy recomputes it when
    the header is serialised).

    The first fragment is modified in place.

    :param listoffragments: non-empty sequence of packets, each of which
        contains an IP layer.
    :returns: a packet of the same class as the first fragment's IP layer,
        dissected from the rebuilt header followed by the merged payload.
    :raises ValueError: if ``listoffragments`` is empty.
    """
    # Local import: keeps this helper self-contained; io.BytesIO is the
    # binary buffer available on both Python 2.6+ and Python 3.
    import io

    if not listoffragments:
        raise ValueError("reassemble() needs at least one fragment")

    # Assumes an option-less IPv4 header, as the original code did.
    ip_header_len = 20

    merged = io.BytesIO()
    for pkt in listoffragments:
        # Seeking past the current end is fine: the gap is zero-filled by
        # the following write.
        merged.seek(pkt[IP].frag * 8)
        merged.write(bytes(pkt[IP].payload))
    data = merged.getvalue()

    ip = listoffragments[0][IP]
    ip.len = len(data) + ip_header_len
    ip.flags = 0
    # A reassembled datagram starts at offset 0 regardless of which
    # fragment happened to be first in the list.
    ip.frag = 0
    del ip.chksum

    header = bytes(ip)[:ip_header_len]
    return ip.__class__(header + data)
31
32
class TestIPIP(VppTestCase):
    """ IPIP Test Case """
    # NOTE: the class and test-method docstrings are left untouched on
    # purpose; the test runner may display them.

    @classmethod
    def setUpClass(cls):
        """Create the two packet-generator interfaces used by all tests."""
        super(TestIPIP, cls).setUpClass()
        cls.create_pg_interfaces(range(2))
        # Snapshot of the interfaces that setUp() configures.
        cls.interfaces = list(cls.pg_interfaces)

    def setUp(self):
        """Bring every interface up and configure IPv4/IPv6 on it."""
        super(TestIPIP, self).setUp()
        for i in self.interfaces:
            i.admin_up()
            i.config_ip4()
            i.config_ip6()
            i.disable_ipv6_ra()
            i.resolve_arp()
            i.resolve_ndp()

    def tearDown(self):
        """Undo the per-test interface configuration unless VPP has died."""
        super(TestIPIP, self).tearDown()
        if not self.vpp_dead:
            for i in self.pg_interfaces:
                i.unconfig_ip4()
                i.unconfig_ip6()
                i.admin_down()

    def validate(self, rx, expected):
        """Assert that ``rx`` equals ``expected`` after a build/parse cycle.

        ``expected`` is serialised and dissected again with its own class
        before the comparison, so both sides are parsed packets.
        """
        self.assertEqual(rx, expected.__class__(bytes(expected)))

    def _add_routes_via_tunnel(self, sw_if_index):
        """Route 130.67.0.0/16 and dead::/16 via interface ``sw_if_index``."""
        ip4_via_tunnel = VppIpRoute(
            self, "130.67.0.0", 16,
            [VppRoutePath("0.0.0.0",
                          sw_if_index,
                          proto=DpoProto.DPO_PROTO_IP4)], is_ip6=0)
        ip4_via_tunnel.add_vpp_config()

        ip6_via_tunnel = VppIpRoute(
            self, "dead::", 16,
            [VppRoutePath("::",
                          sw_if_index,
                          proto=DpoProto.DPO_PROTO_IP6)], is_ip6=1)
        ip6_via_tunnel.add_vpp_config()

    def test_ipip4(self):
        """ ip{v4,v6} over ip4 test """
        p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
        p_ip6 = IPv6(src="1::1", dst="DEAD::1", nh='UDP', tc=42)
        p_ip4 = IP(src="1.2.3.4", dst="130.67.0.1", tos=42)
        p_payload = UDP(sport=1234, dport=1234)

        # IPv4 transport
        # NOTE(review): tc_tos=0xFF presumably asks for the inner TOS/TC to
        # be copied to the outer header (the expected replies below carry
        # tos=42, the inner value) - confirm against the ipip API.
        rv = self.vapi.ipip_add_tunnel(
            src_address=self.pg0.local_ip4n,
            dst_address=self.pg1.remote_ip4n,
            is_ipv6=0, tc_tos=0xFF)
        sw_if_index = rv.sw_if_index

        # Set interface up and enable IP on it
        self.vapi.sw_interface_set_flags(sw_if_index, 1)
        self.vapi.sw_interface_set_unnumbered(
            ip_sw_if_index=self.pg0.sw_if_index,
            sw_if_index=sw_if_index)

        # Add IPv4 and IPv6 routes via tunnel interface
        self._add_routes_via_tunnel(sw_if_index)

        # IPv6 in to IPv4 tunnel
        p6 = (p_ether / p_ip6 / p_payload)
        # p_inner_ip6 is the same object as p_ip6; p6 was composed before
        # the hop limit is decremented here.
        p_inner_ip6 = p_ip6
        p_inner_ip6.hlim -= 1
        p6_reply = (IP(src=self.pg0.local_ip4, dst=self.pg1.remote_ip4,
                       proto='ipv6', id=0, tos=42) / p_inner_ip6 / p_payload)
        p6_reply.ttl -= 1
        rx = self.send_and_expect(self.pg0, p6*10, self.pg1)
        for p in rx:
            self.validate(p[1], p6_reply)

        # IPv4 in to IPv4 tunnel
        p4 = (p_ether / p_ip4 / p_payload)
        # Same aliasing as above: p_ip4_inner is p_ip4 itself.
        p_ip4_inner = p_ip4
        p_ip4_inner.ttl -= 1
        p4_reply = (IP(src=self.pg0.local_ip4, dst=self.pg1.remote_ip4,
                       tos=42) /
                    p_ip4_inner / p_payload)
        p4_reply.ttl -= 1
        p4_reply.id = 0
        rx = self.send_and_expect(self.pg0, p4*10, self.pg1)
        for p in rx:
            self.validate(p[1], p4_reply)

        # Decapsulation
        p_ether = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)

        # IPv4 tunnel to IPv4
        p_ip4 = IP(src="1.2.3.4", dst=self.pg0.remote_ip4)
        p4 = (p_ether / IP(src=self.pg1.remote_ip4,
                           dst=self.pg0.local_ip4) / p_ip4 / p_payload)
        p4_reply = (p_ip4 / p_payload)
        p4_reply.ttl -= 1
        rx = self.send_and_expect(self.pg1, p4*10, self.pg0)
        for p in rx:
            self.validate(p[1], p4_reply)

        # IPv4 tunnel to IPv6
        p_ip6 = IPv6(src="1:2:3::4", dst=self.pg0.remote_ip6)
        p6 = (p_ether / IP(src=self.pg1.remote_ip4,
                           dst=self.pg0.local_ip4) / p_ip6 / p_payload)
        p6_reply = (p_ip6 / p_payload)
        p6_reply.hlim = 63
        rx = self.send_and_expect(self.pg1, p6*10, self.pg0)
        for p in rx:
            self.validate(p[1], p6_reply)

        # Fragmentation / Reassembly and Re-fragmentation
        self.vapi.ip_reassembly_enable_disable(
            sw_if_index=self.pg1.sw_if_index,
            enable_ip4=1)
        # Decapsulation
        p_ether = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)
        p_payload = UDP(sport=1234, dport=1234) / self.payload(3123)
        p_ip4 = IP(src="1.2.3.4", dst=self.pg0.remote_ip4)
        outer_ip4 = (p_ether / IP(src=self.pg1.remote_ip4,
                     dst=self.pg0.local_ip4) / p_ip4 / p_payload)
        frags = fragment(outer_ip4, 1400)
        p4_reply = (p_ip4 / p_payload)
        p4_reply.ttl -= 1

        # The fragments go in on pg1; a single packet is expected on pg0.
        self.pg_enable_capture()
        self.pg1.add_stream(frags)
        self.pg_start()
        rx = self.pg0.get_capture(1)
        for p in rx:
            self.validate(p[1], p4_reply)

        # Now try with re-fragmentation
        self.vapi.sw_interface_set_mtu(self.pg0.sw_if_index, [576, 0, 0, 0])
        self.pg_enable_capture()
        self.pg1.add_stream(frags)
        self.pg_start()
        rx = self.pg0.get_capture(6)
        reass_pkt = reassemble(rx)
        p4_reply.ttl -= 1
        # NOTE(review): 256 is the IP id expected on the re-fragmented
        # packets; where that value comes from is not visible here.
        p4_reply.id = 256
        self.validate(reass_pkt, p4_reply)

    def test_ipip6(self):
        """ ip{v4,v6} over ip6 test """
        p_ether = Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac)
        p_ip6 = IPv6(src="1::1", dst="DEAD::1", tc=42, nh='UDP')
        p_ip4 = IP(src="1.2.3.4", dst="130.67.0.1", tos=42)
        p_payload = UDP(sport=1234, dport=1234)

        # IPv6 transport
        rv = self.vapi.ipip_add_tunnel(
            src_address=self.pg0.local_ip6n,
            dst_address=self.pg1.remote_ip6n, tc_tos=255)

        sw_if_index = rv.sw_if_index

        self.vapi.sw_interface_set_flags(sw_if_index, 1)
        self.vapi.sw_interface_set_unnumbered(
            ip_sw_if_index=self.pg0.sw_if_index, sw_if_index=sw_if_index)

        # Add IPv4 and IPv6 routes via tunnel interface
        self._add_routes_via_tunnel(sw_if_index)

        # Encapsulation

        # IPv6 in to IPv6 tunnel
        p6 = (p_ether / p_ip6 / p_payload)
        p6_reply = (IPv6(src=self.pg0.local_ip6, dst=self.pg1.remote_ip6,
                         hlim=63, tc=42) /
                    p_ip6 / p_payload)
        # Layer [1] is the inner header of the expected packet.
        p6_reply[1].hlim -= 1
        rx = self.send_and_expect(self.pg0, p6*11, self.pg1)
        for p in rx:
            self.validate(p[1], p6_reply)

        # IPv4 in to IPv6 tunnel
        p4 = (p_ether / p_ip4 / p_payload)
        p4_reply = (IPv6(src=self.pg0.local_ip6,
                         dst=self.pg1.remote_ip6, hlim=63, tc=42) /
                    p_ip4 / p_payload)
        p4_reply[1].ttl -= 1
        rx = self.send_and_expect(self.pg0, p4*11, self.pg1)
        for p in rx:
            self.validate(p[1], p4_reply)

        # Decapsulation

        p_ether = Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac)

        # IPv6 tunnel to IPv4
        p_ip4 = IP(src="1.2.3.4", dst=self.pg0.remote_ip4)
        p4 = (p_ether / IPv6(src=self.pg1.remote_ip6,
                             dst=self.pg0.local_ip6) / p_ip4 / p_payload)
        p4_reply = (p_ip4 / p_payload)
        p4_reply.ttl -= 1
        rx = self.send_and_expect(self.pg1, p4*11, self.pg0)
        for p in rx:
            self.validate(p[1], p4_reply)

        # IPv6 tunnel to IPv6
        p_ip6 = IPv6(src="1:2:3::4", dst=self.pg0.remote_ip6)
        p6 = (p_ether / IPv6(src=self.pg1.remote_ip6,
                             dst=self.pg0.local_ip6) / p_ip6 / p_payload)
        p6_reply = (p_ip6 / p_payload)
        p6_reply.hlim = 63
        rx = self.send_and_expect(self.pg1, p6*11, self.pg0)
        for p in rx:
            self.validate(p[1], p6_reply)

    def test_ipip_create(self):
        """ ipip create / delete interface test """
        rv = self.vapi.ipip_add_tunnel(
            src_address=inet_pton(AF_INET, '1.2.3.4'),
            dst_address=inet_pton(AF_INET, '2.3.4.5'), is_ipv6=0)
        sw_if_index = rv.sw_if_index
        self.vapi.ipip_del_tunnel(sw_if_index)

    def payload(self, len):
        """Return a filler payload made of ``len`` 'x' characters."""
        # The parameter name shadows the builtin ``len``; it is kept so that
        # keyword callers are not broken.
        return 'x' * len
266
267
# When the module is executed directly, run its tests with VppTestRunner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)