4 from scapy.layers.ipsec import AH
5 from scapy.layers.inet import IP, UDP
6 from scapy.layers.inet6 import IPv6
7 from scapy.layers.l2 import Ether
8 from scapy.packet import Raw
10 from framework import VppTestRunner
11 from template_ipsec import TemplateIpsec, IpsecTra46Tests, IpsecTun46Tests, \
12 config_tun_params, config_tra_params, IPsecIPv4Params, IPsecIPv6Params, \
13 IpsecTra4, IpsecTun4, IpsecTra6, IpsecTun6, \
14 IpsecTun6HandoffTests, IpsecTun4HandoffTests
15 from template_ipsec import IpsecTcpTests
16 from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdEntry,\
18 from vpp_ip_route import VppIpRoute, VppRoutePath
19 from vpp_ip import DpoProto
20 from vpp_papi import VppEnum
23 class ConfigIpsecAH(TemplateIpsec):
25 Basic test for IPSEC using AH transport and Tunnel mode
35 --- encrypt --- plain ---
36 |pg0| <------- |VPP| <------ |pg1|
39 --- decrypt --- plain ---
40 |pg0| -------> |VPP| ------> |pg1|
45 tra4_encrypt_node_name = "ah4-encrypt"
46 tra4_decrypt_node_name = ["ah4-decrypt", "ah4-decrypt"]
47 tra6_encrypt_node_name = "ah6-encrypt"
48 tra6_decrypt_node_name = ["ah6-decrypt", "ah6-decrypt"]
49 tun4_encrypt_node_name = "ah4-encrypt"
50 tun4_decrypt_node_name = ["ah4-decrypt", "ah4-decrypt"]
51 tun6_encrypt_node_name = "ah6-encrypt"
52 tun6_decrypt_node_name = ["ah6-decrypt", "ah6-decrypt"]
56 super(ConfigIpsecAH, cls).setUpClass()
59 def tearDownClass(cls):
60 super(ConfigIpsecAH, cls).tearDownClass()
63 super(ConfigIpsecAH, self).setUp()
66 super(ConfigIpsecAH, self).tearDown()
68 def config_network(self, params):
70 self.tun_if = self.pg0
71 self.tra_if = self.pg2
72 self.logger.info(self.vapi.ppcli("show int addr"))
74 self.tra_spd = VppIpsecSpd(self, self.tra_spd_id)
75 self.tra_spd.add_vpp_config()
76 self.net_objs.append(self.tra_spd)
77 self.tun_spd = VppIpsecSpd(self, self.tun_spd_id)
78 self.tun_spd.add_vpp_config()
79 self.net_objs.append(self.tun_spd)
81 b = VppIpsecSpdItfBinding(self, self.tra_spd,
84 self.net_objs.append(b)
86 b = VppIpsecSpdItfBinding(self, self.tun_spd,
89 self.net_objs.append(b)
93 config_tra_params(p, self.encryption_type)
96 config_tun_params(p, self.encryption_type, self.tun_if)
98 d = DpoProto.DPO_PROTO_IP6 if p.is_ipv6 else DpoProto.DPO_PROTO_IP4
99 r = VppIpRoute(self, p.remote_tun_if_host, p.addr_len,
100 [VppRoutePath(self.tun_if.remote_addr[p.addr_type],
104 self.net_objs.append(r)
105 self.logger.info(self.vapi.ppcli("show ipsec all"))
107 def unconfig_network(self):
108 for o in reversed(self.net_objs):
109 o.remove_vpp_config()
112 def config_ah_tun(self, params):
113 addr_type = params.addr_type
114 scapy_tun_sa_id = params.scapy_tun_sa_id
115 scapy_tun_spi = params.scapy_tun_spi
116 vpp_tun_sa_id = params.vpp_tun_sa_id
117 vpp_tun_spi = params.vpp_tun_spi
118 auth_algo_vpp_id = params.auth_algo_vpp_id
119 auth_key = params.auth_key
120 crypt_algo_vpp_id = params.crypt_algo_vpp_id
121 crypt_key = params.crypt_key
122 remote_tun_if_host = params.remote_tun_if_host
123 addr_any = params.addr_any
124 addr_bcast = params.addr_bcast
126 tun_flags = params.tun_flags
127 e = VppEnum.vl_api_ipsec_spd_action_t
129 params.outer_hop_limit = 253
130 params.outer_flow_label = 0x12345
132 params.tun_sa_in = VppIpsecSA(self, scapy_tun_sa_id, scapy_tun_spi,
133 auth_algo_vpp_id, auth_key,
134 crypt_algo_vpp_id, crypt_key,
135 self.vpp_ah_protocol,
136 self.tun_if.local_addr[addr_type],
137 self.tun_if.remote_addr[addr_type],
142 params.tun_sa_out = VppIpsecSA(self, vpp_tun_sa_id, vpp_tun_spi,
143 auth_algo_vpp_id, auth_key,
144 crypt_algo_vpp_id, crypt_key,
145 self.vpp_ah_protocol,
146 self.tun_if.remote_addr[addr_type],
147 self.tun_if.local_addr[addr_type],
152 objs.append(params.tun_sa_in)
153 objs.append(params.tun_sa_out)
155 params.spd_policy_in_any = VppIpsecSpdEntry(self, self.tun_spd,
157 addr_any, addr_bcast,
158 addr_any, addr_bcast,
160 params.spd_policy_out_any = VppIpsecSpdEntry(self, self.tun_spd,
162 addr_any, addr_bcast,
163 addr_any, addr_bcast,
167 objs.append(params.spd_policy_out_any)
168 objs.append(params.spd_policy_in_any)
170 e1 = VppIpsecSpdEntry(self, self.tun_spd, vpp_tun_sa_id,
173 self.pg1.remote_addr[addr_type],
174 self.pg1.remote_addr[addr_type],
176 policy=e.IPSEC_API_SPD_ACTION_PROTECT,
178 e2 = VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
179 self.pg1.remote_addr[addr_type],
180 self.pg1.remote_addr[addr_type],
183 0, policy=e.IPSEC_API_SPD_ACTION_PROTECT,
185 e3 = VppIpsecSpdEntry(self, self.tun_spd, vpp_tun_sa_id,
188 self.pg0.local_addr[addr_type],
189 self.pg0.local_addr[addr_type],
191 policy=e.IPSEC_API_SPD_ACTION_PROTECT,
193 e4 = VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
194 self.pg0.local_addr[addr_type],
195 self.pg0.local_addr[addr_type],
198 0, policy=e.IPSEC_API_SPD_ACTION_PROTECT,
201 objs = objs + [e1, e2, e3, e4]
206 self.net_objs = self.net_objs + objs
208 def config_ah_tra(self, params):
209 addr_type = params.addr_type
210 scapy_tra_sa_id = params.scapy_tra_sa_id
211 scapy_tra_spi = params.scapy_tra_spi
212 vpp_tra_sa_id = params.vpp_tra_sa_id
213 vpp_tra_spi = params.vpp_tra_spi
214 auth_algo_vpp_id = params.auth_algo_vpp_id
215 auth_key = params.auth_key
216 crypt_algo_vpp_id = params.crypt_algo_vpp_id
217 crypt_key = params.crypt_key
218 addr_any = params.addr_any
219 addr_bcast = params.addr_bcast
220 flags = params.flags | (VppEnum.vl_api_ipsec_sad_flags_t.
221 IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY)
222 e = VppEnum.vl_api_ipsec_spd_action_t
225 params.tra_sa_in = VppIpsecSA(self, scapy_tra_sa_id, scapy_tra_spi,
226 auth_algo_vpp_id, auth_key,
227 crypt_algo_vpp_id, crypt_key,
228 self.vpp_ah_protocol,
230 params.tra_sa_out = VppIpsecSA(self, vpp_tra_sa_id, vpp_tra_spi,
231 auth_algo_vpp_id, auth_key,
232 crypt_algo_vpp_id, crypt_key,
233 self.vpp_ah_protocol,
236 objs.append(params.tra_sa_in)
237 objs.append(params.tra_sa_out)
239 objs.append(VppIpsecSpdEntry(self, self.tra_spd, vpp_tra_sa_id,
240 addr_any, addr_bcast,
241 addr_any, addr_bcast,
243 objs.append(VppIpsecSpdEntry(self, self.tra_spd, scapy_tra_sa_id,
244 addr_any, addr_bcast,
245 addr_any, addr_bcast,
248 objs.append(VppIpsecSpdEntry(self, self.tra_spd, vpp_tra_sa_id,
249 self.tra_if.local_addr[addr_type],
250 self.tra_if.local_addr[addr_type],
251 self.tra_if.remote_addr[addr_type],
252 self.tra_if.remote_addr[addr_type],
254 policy=e.IPSEC_API_SPD_ACTION_PROTECT,
256 objs.append(VppIpsecSpdEntry(self, self.tra_spd, scapy_tra_sa_id,
257 self.tra_if.local_addr[addr_type],
258 self.tra_if.local_addr[addr_type],
259 self.tra_if.remote_addr[addr_type],
260 self.tra_if.remote_addr[addr_type],
261 0, policy=e.IPSEC_API_SPD_ACTION_PROTECT,
266 self.net_objs = self.net_objs + objs
269 class TemplateIpsecAh(ConfigIpsecAH):
271 Basic test for IPSEC using AH transport and Tunnel mode
276 |pg2| <-------> |VPP|
281 --- encrypt --- plain ---
282 |pg0| <------- |VPP| <------ |pg1|
285 --- decrypt --- plain ---
286 |pg0| -------> |VPP| ------> |pg1|
291 super(TemplateIpsecAh, cls).setUpClass()
294 def tearDownClass(cls):
295 super(TemplateIpsecAh, cls).tearDownClass()
298 super(TemplateIpsecAh, self).setUp()
299 self.config_network(self.params.values())
302 self.unconfig_network()
303 super(TemplateIpsecAh, self).tearDown()
306 class TestIpsecAh1(TemplateIpsecAh, IpsecTcpTests):
307 """ Ipsec AH - TCP tests """
311 class TestIpsecAh2(TemplateIpsecAh, IpsecTra46Tests, IpsecTun46Tests):
312 """ Ipsec AH w/ SHA1 """
316 class TestIpsecAhTun(TemplateIpsecAh, IpsecTun46Tests):
317 """ Ipsec AH - TUN encap tests """
320 self.ipv4_params = IPsecIPv4Params()
321 self.ipv6_params = IPsecIPv6Params()
323 c = (VppEnum.vl_api_tunnel_encap_decap_flags_t.
324 TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_DSCP)
325 c1 = c | (VppEnum.vl_api_tunnel_encap_decap_flags_t.
326 TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_ECN)
328 self.ipv4_params.tun_flags = c
329 self.ipv6_params.tun_flags = c1
331 super(TestIpsecAhTun, self).setUp()
def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
    """Build ``count`` IPv4/UDP frames entering VPP through ``sw_intf``.

    Every frame is sent with ``tos=5``, i.e. with both DSCP and ECN
    bits set, while this class configures the IPv4 tunnel flags to
    copy the DSCP only.

    :param sw_intf: interface supplying the remote (source) and local
        (destination) MAC addresses.
    :param src: IPv4 source address.
    :param dst: IPv4 destination address.
    :param count: number of frames to build.
    :param payload_size: number of ``X`` bytes in each payload.
    :returns: list of ``count`` freshly built packets.
    """
    frames = []
    for _ in range(count):
        l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
        l3 = IP(src=src, dst=dst, tos=5)
        l4 = UDP(sport=4444, dport=4444)
        data = Raw(b'X' * payload_size)
        frames.append(l2 / l3 / l4 / data)
    return frames
def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=54):
    """Build ``count`` IPv6/UDP frames entering VPP through ``sw_intf``.

    Every frame is sent with ``tc=5``, i.e. with both DSCP and ECN
    bits set; this class configures the IPv6 tunnel flags to copy
    both of them.

    :param p: parameter object; accepted but not read here.
    :param sw_intf: interface supplying the remote (source) and local
        (destination) MAC addresses.
    :param src: IPv6 source address.
    :param dst: IPv6 destination address.
    :param count: number of frames to build.
    :param payload_size: number of ``X`` bytes in each payload.
    :returns: list of ``count`` freshly built packets.
    """
    frames = []
    for _ in range(count):
        l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
        l3 = IPv6(src=src, dst=dst, tc=5)
        l4 = UDP(sport=4444, dport=4444)
        data = Raw(b'X' * payload_size)
        frames.append(l2 / l3 / l4 / data)
    return frames
349 def verify_encrypted(self, p, sa, rxs):
350 # just check that only the DSCP is copied
352 self.assertEqual(rx[IP].tos, 4)
354 def verify_encrypted6(self, p, sa, rxs):
355 # just check that the DSCP & ECN are copied
357 self.assertEqual(rx[IPv6].tc, 5)
360 class TestIpsecAhTun2(TemplateIpsecAh, IpsecTun46Tests):
361 """ Ipsec AH - TUN encap tests """
364 self.ipv4_params = IPsecIPv4Params()
365 self.ipv6_params = IPsecIPv6Params()
367 self.ipv4_params.dscp = 3
368 self.ipv6_params.dscp = 4
370 super(TestIpsecAhTun2, self).setUp()
def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
    """Build ``count`` IPv4/UDP frames entering VPP through ``sw_intf``.

    The frames are sent unmarked (``tos=0``).  This class sets
    ``ipv4_params.dscp = 3`` in setUp and its verify_encrypted expects
    ``tos == 0xc`` on the received packets.
    NOTE(review): presumably the expected value comes from that
    configured DSCP rather than from the input frame -- confirm.

    :param sw_intf: interface supplying the remote (source) and local
        (destination) MAC addresses.
    :param src: IPv4 source address.
    :param dst: IPv4 destination address.
    :param count: number of frames to build.
    :param payload_size: number of ``X`` bytes in each payload.
    :returns: list of ``count`` freshly built packets.
    """
    def build_one():
        # A new packet object is created on every call.
        l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
        l3 = IP(src=src, dst=dst, tos=0)
        l4 = UDP(sport=4444, dport=4444)
        return l2 / l3 / l4 / Raw(b'X' * payload_size)

    return [build_one() for _ in range(count)]
def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=54):
    """Build ``count`` IPv6/UDP frames entering VPP through ``sw_intf``.

    The frames are sent unmarked (``tc=0``).  This class sets
    ``ipv6_params.dscp = 4`` in setUp and its verify_encrypted6 expects
    ``tc == 0x10`` on the received packets.
    NOTE(review): presumably the expected value comes from that
    configured DSCP rather than from the input frame -- confirm.

    :param p: parameter object; accepted but not read here.
    :param sw_intf: interface supplying the remote (source) and local
        (destination) MAC addresses.
    :param src: IPv6 source address.
    :param dst: IPv6 destination address.
    :param count: number of frames to build.
    :param payload_size: number of ``X`` bytes in each payload.
    :returns: list of ``count`` freshly built packets.
    """
    def build_one():
        # A new packet object is created on every call.
        l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
        l3 = IPv6(src=src, dst=dst, tc=0)
        l4 = UDP(sport=4444, dport=4444)
        return l2 / l3 / l4 / Raw(b'X' * payload_size)

    return [build_one() for _ in range(count)]
388 def verify_encrypted(self, p, sa, rxs):
389 # just check that only the DSCP is copied
391 self.assertEqual(rx[IP].tos, 0xc)
393 def verify_encrypted6(self, p, sa, rxs):
394 # just check that the DSCP & ECN are copied
396 self.assertEqual(rx[IPv6].tc, 0x10)
399 class TestIpsecAhHandoff(TemplateIpsecAh,
400 IpsecTun6HandoffTests,
401 IpsecTun4HandoffTests):
402 """ Ipsec AH Handoff """
406 class TestIpsecAhAll(ConfigIpsecAH,
407 IpsecTra4, IpsecTra6,
408 IpsecTun4, IpsecTun6):
409 """ Ipsec AH all Algos """
412 super(TestIpsecAhAll, self).setUp()
415 super(TestIpsecAhAll, self).tearDown()
417 def test_integ_algs(self):
418 """All Engines SHA[1_96, 256, 384, 512] w/ & w/o ESN"""
419 # foreach VPP crypto engine
420 engines = ["ia32", "ipsecmb", "openssl"]
422 algos = [{'vpp': VppEnum.vl_api_ipsec_integ_alg_t.
423 IPSEC_API_INTEG_ALG_SHA1_96,
424 'scapy': "HMAC-SHA1-96"},
425 {'vpp': VppEnum.vl_api_ipsec_integ_alg_t.
426 IPSEC_API_INTEG_ALG_SHA_256_128,
427 'scapy': "SHA2-256-128"},
428 {'vpp': VppEnum.vl_api_ipsec_integ_alg_t.
429 IPSEC_API_INTEG_ALG_SHA_384_192,
430 'scapy': "SHA2-384-192"},
431 {'vpp': VppEnum.vl_api_ipsec_integ_alg_t.
432 IPSEC_API_INTEG_ALG_SHA_512_256,
433 'scapy': "SHA2-512-256"}]
435 flags = [0, (VppEnum.vl_api_ipsec_sad_flags_t.
436 IPSEC_API_SAD_FLAG_USE_ESN)]
439 # loop through the VPP engines
441 for engine in engines:
442 self.vapi.cli("set crypto handler all %s" % engine)
444 # loop through each of the algorithms
447 # with self.subTest(algo=algo['scapy']):
# set up the config parameters
452 self.ipv4_params = IPsecIPv4Params()
453 self.ipv6_params = IPsecIPv6Params()
455 self.params = {self.ipv4_params.addr_type:
457 self.ipv6_params.addr_type:
460 for _, p in self.params.items():
461 p.auth_algo_vpp_id = algo['vpp']
462 p.auth_algo = algo['scapy']
463 p.flags = p.flags | flag
# configure the SPDs, SAs, etc.
468 self.config_network(self.params.values())
# An exhaustive 4o6, 6o4 is not necessary for each algo
474 self.verify_tra_basic6(count=17)
475 self.verify_tra_basic4(count=17)
476 self.verify_tun_66(self.params[socket.AF_INET6], count=17)
477 self.verify_tun_44(self.params[socket.AF_INET], count=17)
480 # remove the SPDs, SAs, etc
482 self.unconfig_network()
# When this module is executed directly (not imported), run its tests
# through unittest using VppTestRunner as the test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)