4 from scapy.layers.ipsec import AH
5 from scapy.layers.inet import IP, UDP
6 from scapy.layers.inet6 import IPv6
7 from scapy.layers.l2 import Ether
8 from scapy.packet import Raw
10 from framework import VppTestRunner
11 from template_ipsec import TemplateIpsec, IpsecTra46Tests, IpsecTun46Tests, \
12 config_tun_params, config_tra_params, IPsecIPv4Params, IPsecIPv6Params, \
13 IpsecTra4, IpsecTun4, IpsecTra6, IpsecTun6, \
14 IpsecTun6HandoffTests, IpsecTun4HandoffTests
15 from template_ipsec import IpsecTcpTests
16 from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdEntry,\
18 from vpp_ip_route import VppIpRoute, VppRoutePath
19 from vpp_ip import DpoProto
20 from vpp_papi import VppEnum
23 class ConfigIpsecAH(TemplateIpsec):
25 Basic test for IPSEC using AH transport and Tunnel mode
35 --- encrypt --- plain ---
36 |pg0| <------- |VPP| <------ |pg1|
39 --- decrypt --- plain ---
40 |pg0| -------> |VPP| ------> |pg1|
45 tra4_encrypt_node_name = "ah4-encrypt"
46 tra4_decrypt_node_name = "ah4-decrypt"
47 tra6_encrypt_node_name = "ah6-encrypt"
48 tra6_decrypt_node_name = "ah6-decrypt"
49 tun4_encrypt_node_name = "ah4-encrypt"
50 tun4_decrypt_node_name = "ah4-decrypt"
51 tun6_encrypt_node_name = "ah6-encrypt"
52 tun6_decrypt_node_name = "ah6-decrypt"
56 super(ConfigIpsecAH, cls).setUpClass()
59 def tearDownClass(cls):
60 super(ConfigIpsecAH, cls).tearDownClass()
63 super(ConfigIpsecAH, self).setUp()
66 super(ConfigIpsecAH, self).tearDown()
68 def config_network(self, params):
70 self.tun_if = self.pg0
71 self.tra_if = self.pg2
72 self.logger.info(self.vapi.ppcli("show int addr"))
74 self.tra_spd = VppIpsecSpd(self, self.tra_spd_id)
75 self.tra_spd.add_vpp_config()
76 self.net_objs.append(self.tra_spd)
77 self.tun_spd = VppIpsecSpd(self, self.tun_spd_id)
78 self.tun_spd.add_vpp_config()
79 self.net_objs.append(self.tun_spd)
81 b = VppIpsecSpdItfBinding(self, self.tra_spd,
84 self.net_objs.append(b)
86 b = VppIpsecSpdItfBinding(self, self.tun_spd,
89 self.net_objs.append(b)
93 config_tra_params(p, self.encryption_type)
96 config_tun_params(p, self.encryption_type, self.tun_if)
98 d = DpoProto.DPO_PROTO_IP6 if p.is_ipv6 else DpoProto.DPO_PROTO_IP4
99 r = VppIpRoute(self, p.remote_tun_if_host, p.addr_len,
100 [VppRoutePath(self.tun_if.remote_addr[p.addr_type],
104 self.net_objs.append(r)
105 self.logger.info(self.vapi.ppcli("show ipsec all"))
107 def unconfig_network(self):
108 for o in reversed(self.net_objs):
109 o.remove_vpp_config()
112 def config_ah_tun(self, params):
113 addr_type = params.addr_type
114 scapy_tun_sa_id = params.scapy_tun_sa_id
115 scapy_tun_spi = params.scapy_tun_spi
116 vpp_tun_sa_id = params.vpp_tun_sa_id
117 vpp_tun_spi = params.vpp_tun_spi
118 auth_algo_vpp_id = params.auth_algo_vpp_id
119 auth_key = params.auth_key
120 crypt_algo_vpp_id = params.crypt_algo_vpp_id
121 crypt_key = params.crypt_key
122 remote_tun_if_host = params.remote_tun_if_host
123 addr_any = params.addr_any
124 addr_bcast = params.addr_bcast
126 tun_flags = params.tun_flags
127 e = VppEnum.vl_api_ipsec_spd_action_t
130 params.tun_sa_in = VppIpsecSA(self, scapy_tun_sa_id, scapy_tun_spi,
131 auth_algo_vpp_id, auth_key,
132 crypt_algo_vpp_id, crypt_key,
133 self.vpp_ah_protocol,
134 self.tun_if.local_addr[addr_type],
135 self.tun_if.remote_addr[addr_type],
140 params.tun_sa_out = VppIpsecSA(self, vpp_tun_sa_id, vpp_tun_spi,
141 auth_algo_vpp_id, auth_key,
142 crypt_algo_vpp_id, crypt_key,
143 self.vpp_ah_protocol,
144 self.tun_if.remote_addr[addr_type],
145 self.tun_if.local_addr[addr_type],
150 objs.append(params.tun_sa_in)
151 objs.append(params.tun_sa_out)
153 params.spd_policy_in_any = VppIpsecSpdEntry(self, self.tun_spd,
155 addr_any, addr_bcast,
156 addr_any, addr_bcast,
158 params.spd_policy_out_any = VppIpsecSpdEntry(self, self.tun_spd,
160 addr_any, addr_bcast,
161 addr_any, addr_bcast,
165 objs.append(params.spd_policy_out_any)
166 objs.append(params.spd_policy_in_any)
168 e1 = VppIpsecSpdEntry(self, self.tun_spd, vpp_tun_sa_id,
171 self.pg1.remote_addr[addr_type],
172 self.pg1.remote_addr[addr_type],
174 policy=e.IPSEC_API_SPD_ACTION_PROTECT,
176 e2 = VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
177 self.pg1.remote_addr[addr_type],
178 self.pg1.remote_addr[addr_type],
181 0, policy=e.IPSEC_API_SPD_ACTION_PROTECT,
183 e3 = VppIpsecSpdEntry(self, self.tun_spd, vpp_tun_sa_id,
186 self.pg0.local_addr[addr_type],
187 self.pg0.local_addr[addr_type],
189 policy=e.IPSEC_API_SPD_ACTION_PROTECT,
191 e4 = VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
192 self.pg0.local_addr[addr_type],
193 self.pg0.local_addr[addr_type],
196 0, policy=e.IPSEC_API_SPD_ACTION_PROTECT,
199 objs = objs + [e1, e2, e3, e4]
204 self.net_objs = self.net_objs + objs
206 def config_ah_tra(self, params):
207 addr_type = params.addr_type
208 scapy_tra_sa_id = params.scapy_tra_sa_id
209 scapy_tra_spi = params.scapy_tra_spi
210 vpp_tra_sa_id = params.vpp_tra_sa_id
211 vpp_tra_spi = params.vpp_tra_spi
212 auth_algo_vpp_id = params.auth_algo_vpp_id
213 auth_key = params.auth_key
214 crypt_algo_vpp_id = params.crypt_algo_vpp_id
215 crypt_key = params.crypt_key
216 addr_any = params.addr_any
217 addr_bcast = params.addr_bcast
218 flags = params.flags | (VppEnum.vl_api_ipsec_sad_flags_t.
219 IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY)
220 e = VppEnum.vl_api_ipsec_spd_action_t
223 params.tra_sa_in = VppIpsecSA(self, scapy_tra_sa_id, scapy_tra_spi,
224 auth_algo_vpp_id, auth_key,
225 crypt_algo_vpp_id, crypt_key,
226 self.vpp_ah_protocol,
228 params.tra_sa_out = VppIpsecSA(self, vpp_tra_sa_id, vpp_tra_spi,
229 auth_algo_vpp_id, auth_key,
230 crypt_algo_vpp_id, crypt_key,
231 self.vpp_ah_protocol,
234 objs.append(params.tra_sa_in)
235 objs.append(params.tra_sa_out)
237 objs.append(VppIpsecSpdEntry(self, self.tra_spd, vpp_tra_sa_id,
238 addr_any, addr_bcast,
239 addr_any, addr_bcast,
241 objs.append(VppIpsecSpdEntry(self, self.tra_spd, scapy_tra_sa_id,
242 addr_any, addr_bcast,
243 addr_any, addr_bcast,
246 objs.append(VppIpsecSpdEntry(self, self.tra_spd, vpp_tra_sa_id,
247 self.tra_if.local_addr[addr_type],
248 self.tra_if.local_addr[addr_type],
249 self.tra_if.remote_addr[addr_type],
250 self.tra_if.remote_addr[addr_type],
252 policy=e.IPSEC_API_SPD_ACTION_PROTECT,
254 objs.append(VppIpsecSpdEntry(self, self.tra_spd, scapy_tra_sa_id,
255 self.tra_if.local_addr[addr_type],
256 self.tra_if.local_addr[addr_type],
257 self.tra_if.remote_addr[addr_type],
258 self.tra_if.remote_addr[addr_type],
259 0, policy=e.IPSEC_API_SPD_ACTION_PROTECT,
264 self.net_objs = self.net_objs + objs
267 class TemplateIpsecAh(ConfigIpsecAH):
269 Basic test for IPSEC using AH transport and Tunnel mode
274 |pg2| <-------> |VPP|
279 --- encrypt --- plain ---
280 |pg0| <------- |VPP| <------ |pg1|
283 --- decrypt --- plain ---
284 |pg0| -------> |VPP| ------> |pg1|
289 super(TemplateIpsecAh, cls).setUpClass()
292 def tearDownClass(cls):
293 super(TemplateIpsecAh, cls).tearDownClass()
296 super(TemplateIpsecAh, self).setUp()
297 self.config_network(self.params.values())
300 self.unconfig_network()
301 super(TemplateIpsecAh, self).tearDown()
304 class TestIpsecAh1(TemplateIpsecAh, IpsecTcpTests):
305 """ Ipsec AH - TCP tests """
309 class TestIpsecAh2(TemplateIpsecAh, IpsecTra46Tests, IpsecTun46Tests):
310 """ Ipsec AH w/ SHA1 """
314 class TestIpsecAhTun(TemplateIpsecAh, IpsecTun46Tests):
315 """ Ipsec AH - TUN encap tests """
318 self.ipv4_params = IPsecIPv4Params()
319 self.ipv6_params = IPsecIPv6Params()
321 c = (VppEnum.vl_api_tunnel_encap_decap_flags_t.
322 TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_DSCP)
323 c1 = c | (VppEnum.vl_api_tunnel_encap_decap_flags_t.
324 TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_ECN)
326 self.ipv4_params.tun_flags = c
327 self.ipv6_params.tun_flags = c1
329 super(TestIpsecAhTun, self).setUp()
331 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
332 # set the DSCP + ECN - flags are set to copy only DSCP
333 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
334 IP(src=src, dst=dst, tos=5) /
335 UDP(sport=4444, dport=4444) /
336 Raw(b'X' * payload_size)
337 for i in range(count)]
339 def gen_pkts6(self, sw_intf, src, dst, count=1, payload_size=54):
340 # set the DSCP + ECN - flags are set to copy both
341 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
342 IPv6(src=src, dst=dst, tc=5) /
343 UDP(sport=4444, dport=4444) /
344 Raw(b'X' * payload_size)
345 for i in range(count)]
347 def verify_encrypted(self, p, sa, rxs):
348 # just check that only the DSCP is copied
350 self.assertEqual(rx[IP].tos, 4)
352 def verify_encrypted6(self, p, sa, rxs):
353 # just check that the DSCP & ECN are copied
355 self.assertEqual(rx[IPv6].tc, 5)
358 class TestIpsecAhTun2(TemplateIpsecAh, IpsecTun46Tests):
359 """ Ipsec AH - TUN encap tests """
362 self.ipv4_params = IPsecIPv4Params()
363 self.ipv6_params = IPsecIPv6Params()
365 self.ipv4_params.dscp = 3
366 self.ipv6_params.dscp = 4
368 super(TestIpsecAhTun2, self).setUp()
370 def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
371 # set the DSCP + ECN - flags are set to copy only DSCP
372 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
373 IP(src=src, dst=dst, tos=0) /
374 UDP(sport=4444, dport=4444) /
375 Raw(b'X' * payload_size)
376 for i in range(count)]
378 def gen_pkts6(self, sw_intf, src, dst, count=1, payload_size=54):
379 # set the DSCP + ECN - flags are set to copy both
380 return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
381 IPv6(src=src, dst=dst, tc=0) /
382 UDP(sport=4444, dport=4444) /
383 Raw(b'X' * payload_size)
384 for i in range(count)]
386 def verify_encrypted(self, p, sa, rxs):
387 # just check that only the DSCP is copied
389 self.assertEqual(rx[IP].tos, 0xc)
391 def verify_encrypted6(self, p, sa, rxs):
392 # just check that the DSCP & ECN are copied
394 self.assertEqual(rx[IPv6].tc, 0x10)
397 class TestIpsecAhHandoff(TemplateIpsecAh,
398 IpsecTun6HandoffTests,
399 IpsecTun4HandoffTests):
400 """ Ipsec AH Handoff """
404 class TestIpsecAhAll(ConfigIpsecAH,
405 IpsecTra4, IpsecTra6,
406 IpsecTun4, IpsecTun6):
407 """ Ipsec AH all Algos """
410 super(TestIpsecAhAll, self).setUp()
413 super(TestIpsecAhAll, self).tearDown()
415 def test_integ_algs(self):
416 """All Engines SHA[1_96, 256, 384, 512] w/ & w/o ESN"""
417 # foreach VPP crypto engine
418 engines = ["ia32", "ipsecmb", "openssl"]
420 algos = [{'vpp': VppEnum.vl_api_ipsec_integ_alg_t.
421 IPSEC_API_INTEG_ALG_SHA1_96,
422 'scapy': "HMAC-SHA1-96"},
423 {'vpp': VppEnum.vl_api_ipsec_integ_alg_t.
424 IPSEC_API_INTEG_ALG_SHA_256_128,
425 'scapy': "SHA2-256-128"},
426 {'vpp': VppEnum.vl_api_ipsec_integ_alg_t.
427 IPSEC_API_INTEG_ALG_SHA_384_192,
428 'scapy': "SHA2-384-192"},
429 {'vpp': VppEnum.vl_api_ipsec_integ_alg_t.
430 IPSEC_API_INTEG_ALG_SHA_512_256,
431 'scapy': "SHA2-512-256"}]
433 flags = [0, (VppEnum.vl_api_ipsec_sad_flags_t.
434 IPSEC_API_SAD_FLAG_USE_ESN)]
437 # loop through the VPP engines
439 for engine in engines:
440 self.vapi.cli("set crypto handler all %s" % engine)
442 # loop through each of the algorithms
445 # with self.subTest(algo=algo['scapy']):
448 # setup up the config paramters
450 self.ipv4_params = IPsecIPv4Params()
451 self.ipv6_params = IPsecIPv6Params()
453 self.params = {self.ipv4_params.addr_type:
455 self.ipv6_params.addr_type:
458 for _, p in self.params.items():
459 p.auth_algo_vpp_id = algo['vpp']
460 p.auth_algo = algo['scapy']
461 p.flags = p.flags | flag
464 # configure the SPDs. SAs, etc
466 self.config_network(self.params.values())
470 # An exhautsive 4o6, 6o4 is not necessary for each algo
472 self.verify_tra_basic6(count=17)
473 self.verify_tra_basic4(count=17)
474 self.verify_tun_66(self.params[socket.AF_INET6], count=17)
475 self.verify_tun_44(self.params[socket.AF_INET], count=17)
478 # remove the SPDs, SAs, etc
480 self.unconfig_network()
483 if __name__ == '__main__':
484 unittest.main(testRunner=VppTestRunner)