4 from scapy.layers.ipsec import ESP
5 from scapy.layers.l2 import Ether, Raw, GRE
6 from scapy.layers.inet import IP, UDP
7 from framework import VppTestRunner, is_skip_aarch64_set, is_platform_aarch64
8 from template_ipsec import TemplateIpsec, IpsecTun4Tests, IpsecTun6Tests, \
9 IpsecTun4, IpsecTun6, IpsecTcpTests, config_tun_params
10 from vpp_ipsec_tun_interface import VppIpsecTunInterface, \
11 VppIpsecGRETunInterface
12 from vpp_ip_route import VppIpRoute, VppRoutePath, DpoProto
13 from vpp_ipsec import VppIpsecSA
14 from vpp_l2 import VppBridgeDomain, VppBridgeDomainPort
18 class TemplateIpsec4TunIfEsp(TemplateIpsec):
19 """ IPsec tunnel interface tests """
25 super(TemplateIpsec4TunIfEsp, cls).setUpClass()
28 def tearDownClass(cls):
29 super(TemplateIpsec4TunIfEsp, cls).tearDownClass()
32 super(TemplateIpsec4TunIfEsp, self).setUp()
34 self.tun_if = self.pg0
38 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
39 p.scapy_tun_spi, p.crypt_algo_vpp_id,
40 p.crypt_key, p.crypt_key,
41 p.auth_algo_vpp_id, p.auth_key,
43 p.tun_if.add_vpp_config()
48 VppIpRoute(self, p.remote_tun_if_host, 32,
49 [VppRoutePath(p.tun_if.remote_ip4,
50 0xffffffff)]).add_vpp_config()
51 VppIpRoute(self, p.remote_tun_if_host6, 128,
52 [VppRoutePath(p.tun_if.remote_ip6,
54 proto=DpoProto.DPO_PROTO_IP6)],
55 is_ip6=1).add_vpp_config()
59 self.vapi.cli("show hardware")
60 super(TemplateIpsec4TunIfEsp, self).tearDown()
63 class TestIpsec4TunIfEsp1(TemplateIpsec4TunIfEsp, IpsecTun4Tests):
64 """ Ipsec ESP - TUN tests """
65 tun4_encrypt_node_name = "esp4-encrypt"
66 tun4_decrypt_node_name = "esp4-decrypt"
68 def test_tun_basic64(self):
69 """ ipsec 6o4 tunnel basic test """
70 self.verify_tun_64(self.params[socket.AF_INET], count=1)
72 def test_tun_burst64(self):
73 """ ipsec 6o4 tunnel basic test """
74 self.verify_tun_64(self.params[socket.AF_INET], count=257)
76 def test_tun_basic_frag44(self):
77 """ ipsec 4o4 tunnel frag basic test """
80 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
82 self.verify_tun_44(self.params[socket.AF_INET],
83 count=1, payload_size=1800, n_rx=2)
84 self.vapi.sw_interface_set_mtu(p.tun_if.sw_if_index,
88 class TestIpsec4TunIfEsp2(TemplateIpsec4TunIfEsp, IpsecTcpTests):
89 """ Ipsec ESP - TCP tests """
93 class TemplateIpsec6TunIfEsp(TemplateIpsec):
94 """ IPsec tunnel interface tests """
99 super(TemplateIpsec6TunIfEsp, self).setUp()
101 self.tun_if = self.pg0
104 tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
105 p.scapy_tun_spi, p.crypt_algo_vpp_id,
106 p.crypt_key, p.crypt_key,
107 p.auth_algo_vpp_id, p.auth_key,
108 p.auth_key, is_ip6=True)
109 tun_if.add_vpp_config()
114 VppIpRoute(self, p.remote_tun_if_host, 128,
115 [VppRoutePath(tun_if.remote_ip6,
117 proto=DpoProto.DPO_PROTO_IP6)],
118 is_ip6=1).add_vpp_config()
119 VppIpRoute(self, p.remote_tun_if_host4, 32,
120 [VppRoutePath(tun_if.remote_ip4,
121 0xffffffff)]).add_vpp_config()
124 if not self.vpp_dead:
125 self.vapi.cli("show hardware")
126 super(TemplateIpsec6TunIfEsp, self).tearDown()
129 class TestIpsec6TunIfEsp1(TemplateIpsec6TunIfEsp, IpsecTun6Tests):
130 """ Ipsec ESP - TUN tests """
131 tun6_encrypt_node_name = "esp6-encrypt"
132 tun6_decrypt_node_name = "esp6-decrypt"
134 def test_tun_basic46(self):
135 """ ipsec 4o6 tunnel basic test """
136 self.verify_tun_46(self.params[socket.AF_INET6], count=1)
138 def test_tun_burst46(self):
139 """ ipsec 4o6 tunnel burst test """
140 self.verify_tun_46(self.params[socket.AF_INET6], count=257)
143 class TestIpsec4MultiTunIfEsp(TemplateIpsec, IpsecTun4):
144 """ IPsec IPv4 Multi Tunnel interface """
146 encryption_type = ESP
147 tun4_encrypt_node_name = "esp4-encrypt"
148 tun4_decrypt_node_name = "esp4-decrypt"
151 super(TestIpsec4MultiTunIfEsp, self).setUp()
153 self.tun_if = self.pg0
155 self.multi_params = []
158 p = copy.copy(self.ipv4_params)
160 p.remote_tun_if_host = "1.1.1.%d" % (ii + 1)
161 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
162 p.scapy_tun_spi = p.scapy_tun_spi + ii
163 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
164 p.vpp_tun_spi = p.vpp_tun_spi + ii
166 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
167 p.scapy_tra_spi = p.scapy_tra_spi + ii
168 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
169 p.vpp_tra_spi = p.vpp_tra_spi + ii
171 config_tun_params(p, self.encryption_type, self.tun_if)
172 self.multi_params.append(p)
174 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
177 p.crypt_key, p.crypt_key,
178 p.auth_algo_vpp_id, p.auth_key,
180 p.tun_if.add_vpp_config()
182 p.tun_if.config_ip4()
184 VppIpRoute(self, p.remote_tun_if_host, 32,
185 [VppRoutePath(p.tun_if.remote_ip4,
186 0xffffffff)]).add_vpp_config()
189 if not self.vpp_dead:
190 self.vapi.cli("show hardware")
191 super(TestIpsec4MultiTunIfEsp, self).tearDown()
193 def test_tun_44(self):
194 """Multiple IPSEC tunnel interfaces """
195 for p in self.multi_params:
196 self.verify_tun_44(p, count=127)
197 c = p.tun_if.get_rx_stats()
198 self.assertEqual(c['packets'], 127)
199 c = p.tun_if.get_tx_stats()
200 self.assertEqual(c['packets'], 127)
203 class TestIpsec6MultiTunIfEsp(TemplateIpsec, IpsecTun6):
204 """ IPsec IPv6 Multi Tunnel interface """
206 encryption_type = ESP
207 tun6_encrypt_node_name = "esp6-encrypt"
208 tun6_decrypt_node_name = "esp6-decrypt"
211 super(TestIpsec6MultiTunIfEsp, self).setUp()
213 self.tun_if = self.pg0
215 self.multi_params = []
218 p = copy.copy(self.ipv6_params)
220 p.remote_tun_if_host = "1111::%d" % (ii + 1)
221 p.scapy_tun_sa_id = p.scapy_tun_sa_id + ii
222 p.scapy_tun_spi = p.scapy_tun_spi + ii
223 p.vpp_tun_sa_id = p.vpp_tun_sa_id + ii
224 p.vpp_tun_spi = p.vpp_tun_spi + ii
226 p.scapy_tra_sa_id = p.scapy_tra_sa_id + ii
227 p.scapy_tra_spi = p.scapy_tra_spi + ii
228 p.vpp_tra_sa_id = p.vpp_tra_sa_id + ii
229 p.vpp_tra_spi = p.vpp_tra_spi + ii
231 config_tun_params(p, self.encryption_type, self.tun_if)
232 self.multi_params.append(p)
234 p.tun_if = VppIpsecTunInterface(self, self.pg0, p.vpp_tun_spi,
237 p.crypt_key, p.crypt_key,
238 p.auth_algo_vpp_id, p.auth_key,
239 p.auth_key, is_ip6=True)
240 p.tun_if.add_vpp_config()
242 p.tun_if.config_ip6()
244 VppIpRoute(self, p.remote_tun_if_host, 128,
245 [VppRoutePath(p.tun_if.remote_ip6,
247 proto=DpoProto.DPO_PROTO_IP6)],
248 is_ip6=1).add_vpp_config()
251 if not self.vpp_dead:
252 self.vapi.cli("show hardware")
253 super(TestIpsec6MultiTunIfEsp, self).tearDown()
255 def test_tun_66(self):
256 """Multiple IPSEC tunnel interfaces """
257 for p in self.multi_params:
258 self.verify_tun_66(p, count=127)
259 c = p.tun_if.get_rx_stats()
260 self.assertEqual(c['packets'], 127)
261 c = p.tun_if.get_tx_stats()
262 self.assertEqual(c['packets'], 127)
265 class TemplateIpsecGRETunIfEsp(TemplateIpsec):
266 """ IPsec GRE tunnel interface tests """
268 encryption_type = ESP
269 omac = "00:11:22:33:44:55"
271 def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
273 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
274 sa.encrypt(IP(src=self.pg0.remote_ip4,
275 dst=self.pg0.local_ip4) /
277 Ether(dst=self.omac) /
278 IP(src="1.1.1.1", dst="1.1.1.2") /
279 UDP(sport=1144, dport=2233) /
280 Raw('X' * payload_size))
281 for i in range(count)]
283 def gen_pkts(self, sw_intf, src, dst, count=1,
285 return [Ether(dst=self.omac) /
286 IP(src="1.1.1.1", dst="1.1.1.2") /
287 UDP(sport=1144, dport=2233) /
288 Raw('X' * payload_size)
289 for i in range(count)]
291 def verify_decrypted(self, p, rxs):
293 self.assert_equal(rx[Ether].dst, self.omac)
294 self.assert_equal(rx[IP].dst, "1.1.1.2")
296 def verify_encrypted(self, p, sa, rxs):
299 pkt = sa.decrypt(rx[IP])
300 if not pkt.haslayer(IP):
301 pkt = IP(pkt[Raw].load)
302 self.assert_packet_checksums_valid(pkt)
303 self.assert_equal(pkt[IP].dst, self.pg0.remote_ip4)
304 self.assert_equal(pkt[IP].src, self.pg0.local_ip4)
305 self.assertTrue(pkt.haslayer(GRE))
307 self.assertEqual(e[Ether].dst, self.omac)
308 self.assertEqual(e[IP].dst, "1.1.1.2")
309 except (IndexError, AssertionError):
310 self.logger.debug(ppp("Unexpected packet:", rx))
312 self.logger.debug(ppp("Decrypted packet:", pkt))
318 super(TemplateIpsecGRETunIfEsp, self).setUp()
320 self.tun_if = self.pg0
324 bd1 = VppBridgeDomain(self, 1)
327 p.tun_sa_out = VppIpsecSA(self, p.scapy_tun_sa_id, p.scapy_tun_spi,
328 p.auth_algo_vpp_id, p.auth_key,
329 p.crypt_algo_vpp_id, p.crypt_key,
330 self.vpp_esp_protocol,
333 p.tun_sa_out.add_vpp_config()
335 p.tun_sa_in = VppIpsecSA(self, p.vpp_tun_sa_id, p.vpp_tun_spi,
336 p.auth_algo_vpp_id, p.auth_key,
337 p.crypt_algo_vpp_id, p.crypt_key,
338 self.vpp_esp_protocol,
341 p.tun_sa_in.add_vpp_config()
343 self.tun = VppIpsecGRETunInterface(self, self.pg0,
347 self.tun.add_vpp_config()
349 self.tun.config_ip4()
351 VppIpRoute(self, p.remote_tun_if_host, 32,
352 [VppRoutePath(self.tun.remote_ip4,
353 0xffffffff)]).add_vpp_config()
354 VppBridgeDomainPort(self, bd1, self.tun).add_vpp_config()
355 VppBridgeDomainPort(self, bd1, self.pg1).add_vpp_config()
358 if not self.vpp_dead:
359 self.vapi.cli("show hardware")
360 self.tun.unconfig_ip4()
361 super(TemplateIpsecGRETunIfEsp, self).tearDown()
364 @unittest.skipIf(is_skip_aarch64_set and is_platform_aarch64,
365 "test doesn't work on aarch64")
366 class TestIpsecGRETunIfEsp1(TemplateIpsecGRETunIfEsp, IpsecTun4Tests):
367 """ Ipsec GRE ESP - TUN tests """
368 tun4_encrypt_node_name = "esp4-encrypt"
369 tun4_decrypt_node_name = "esp4-decrypt"
371 if __name__ == '__main__':
372 unittest.main(testRunner=VppTestRunner)