tests docs: upgrade python packages
[vpp.git] / test / test_ipsec_ah.py
1 import socket
2 import unittest
3
4 from scapy.layers.ipsec import AH
5 from scapy.layers.inet import IP, UDP
6 from scapy.layers.inet6 import IPv6
7 from scapy.layers.l2 import Ether
8 from scapy.packet import Raw
9
10 from framework import VppTestRunner
11 from template_ipsec import TemplateIpsec, IpsecTra46Tests, IpsecTun46Tests, \
12     config_tun_params, config_tra_params, IPsecIPv4Params, IPsecIPv6Params, \
13     IpsecTra4, IpsecTun4, IpsecTra6, IpsecTun6, \
14     IpsecTun6HandoffTests, IpsecTun4HandoffTests
15 from template_ipsec import IpsecTcpTests
16 from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdEntry,\
17         VppIpsecSpdItfBinding
18 from vpp_ip_route import VppIpRoute, VppRoutePath
19 from vpp_ip import DpoProto
20 from vpp_papi import VppEnum
21
22
23 class ConfigIpsecAH(TemplateIpsec):
24     """
25     Basic test for IPSEC using AH transport and Tunnel mode
26
27     TRANSPORT MODE::
28
29          ---   encrypt   ---
30         |pg2| <-------> |VPP|
31          ---   decrypt   ---
32
33     TUNNEL MODE::
34
35          ---   encrypt   ---   plain   ---
36         |pg0| <-------  |VPP| <------ |pg1|
37          ---             ---           ---
38
39          ---   decrypt   ---   plain   ---
40         |pg0| ------->  |VPP| ------> |pg1|
41          ---             ---           ---
42
43     """
44     encryption_type = AH
45     net_objs = []
46     tra4_encrypt_node_name = "ah4-encrypt"
47     tra4_decrypt_node_name = ["ah4-decrypt", "ah4-decrypt"]
48     tra6_encrypt_node_name = "ah6-encrypt"
49     tra6_decrypt_node_name = ["ah6-decrypt", "ah6-decrypt"]
50     tun4_encrypt_node_name = "ah4-encrypt"
51     tun4_decrypt_node_name = ["ah4-decrypt", "ah4-decrypt"]
52     tun6_encrypt_node_name = "ah6-encrypt"
53     tun6_decrypt_node_name = ["ah6-decrypt", "ah6-decrypt"]
54
55     @classmethod
56     def setUpClass(cls):
57         super(ConfigIpsecAH, cls).setUpClass()
58
59     @classmethod
60     def tearDownClass(cls):
61         super(ConfigIpsecAH, cls).tearDownClass()
62
63     def setUp(self):
64         super(ConfigIpsecAH, self).setUp()
65
66     def tearDown(self):
67         super(ConfigIpsecAH, self).tearDown()
68
69     def config_network(self, params):
70         self.net_objs = []
71         self.tun_if = self.pg0
72         self.tra_if = self.pg2
73         self.logger.info(self.vapi.ppcli("show int addr"))
74
75         self.tra_spd = VppIpsecSpd(self, self.tra_spd_id)
76         self.tra_spd.add_vpp_config()
77         self.net_objs.append(self.tra_spd)
78         self.tun_spd = VppIpsecSpd(self, self.tun_spd_id)
79         self.tun_spd.add_vpp_config()
80         self.net_objs.append(self.tun_spd)
81
82         b = VppIpsecSpdItfBinding(self, self.tra_spd,
83                                   self.tra_if)
84         b.add_vpp_config()
85         self.net_objs.append(b)
86
87         b = VppIpsecSpdItfBinding(self, self.tun_spd,
88                                   self.tun_if)
89         b.add_vpp_config()
90         self.net_objs.append(b)
91
92         for p in params:
93             self.config_ah_tra(p)
94             config_tra_params(p, self.encryption_type)
95         for p in params:
96             self.config_ah_tun(p)
97             config_tun_params(p, self.encryption_type, self.tun_if)
98         for p in params:
99             d = DpoProto.DPO_PROTO_IP6 if p.is_ipv6 else DpoProto.DPO_PROTO_IP4
100             r = VppIpRoute(self,  p.remote_tun_if_host, p.addr_len,
101                            [VppRoutePath(self.tun_if.remote_addr[p.addr_type],
102                                          0xffffffff,
103                                          proto=d)])
104             r.add_vpp_config()
105             self.net_objs.append(r)
106         self.logger.info(self.vapi.ppcli("show ipsec all"))
107
108     def unconfig_network(self):
109         for o in reversed(self.net_objs):
110             o.remove_vpp_config()
111         self.net_objs = []
112
113     def config_ah_tun(self, params):
114         addr_type = params.addr_type
115         scapy_tun_sa_id = params.scapy_tun_sa_id
116         scapy_tun_spi = params.scapy_tun_spi
117         vpp_tun_sa_id = params.vpp_tun_sa_id
118         vpp_tun_spi = params.vpp_tun_spi
119         auth_algo_vpp_id = params.auth_algo_vpp_id
120         auth_key = params.auth_key
121         crypt_algo_vpp_id = params.crypt_algo_vpp_id
122         crypt_key = params.crypt_key
123         remote_tun_if_host = params.remote_tun_if_host
124         addr_any = params.addr_any
125         addr_bcast = params.addr_bcast
126         flags = params.flags
127         tun_flags = params.tun_flags
128         e = VppEnum.vl_api_ipsec_spd_action_t
129         objs = []
130         params.outer_hop_limit = 253
131         params.outer_flow_label = 0x12345
132
133         params.tun_sa_in = VppIpsecSA(self, scapy_tun_sa_id, scapy_tun_spi,
134                                       auth_algo_vpp_id, auth_key,
135                                       crypt_algo_vpp_id, crypt_key,
136                                       self.vpp_ah_protocol,
137                                       self.tun_if.local_addr[addr_type],
138                                       self.tun_if.remote_addr[addr_type],
139                                       tun_flags=tun_flags,
140                                       flags=flags,
141                                       dscp=params.dscp)
142
143         params.tun_sa_out = VppIpsecSA(self, vpp_tun_sa_id, vpp_tun_spi,
144                                        auth_algo_vpp_id, auth_key,
145                                        crypt_algo_vpp_id, crypt_key,
146                                        self.vpp_ah_protocol,
147                                        self.tun_if.remote_addr[addr_type],
148                                        self.tun_if.local_addr[addr_type],
149                                        tun_flags=tun_flags,
150                                        flags=flags,
151                                        dscp=params.dscp)
152
153         objs.append(params.tun_sa_in)
154         objs.append(params.tun_sa_out)
155
156         params.spd_policy_in_any = VppIpsecSpdEntry(self, self.tun_spd,
157                                                     vpp_tun_sa_id,
158                                                     addr_any, addr_bcast,
159                                                     addr_any, addr_bcast,
160                                                     socket.IPPROTO_AH)
161         params.spd_policy_out_any = VppIpsecSpdEntry(self, self.tun_spd,
162                                                      vpp_tun_sa_id,
163                                                      addr_any, addr_bcast,
164                                                      addr_any, addr_bcast,
165                                                      socket.IPPROTO_AH,
166                                                      is_outbound=0)
167
168         objs.append(params.spd_policy_out_any)
169         objs.append(params.spd_policy_in_any)
170
171         e1 = VppIpsecSpdEntry(self, self.tun_spd, vpp_tun_sa_id,
172                               remote_tun_if_host,
173                               remote_tun_if_host,
174                               self.pg1.remote_addr[addr_type],
175                               self.pg1.remote_addr[addr_type],
176                               0, priority=10,
177                               policy=e.IPSEC_API_SPD_ACTION_PROTECT,
178                               is_outbound=0)
179         e2 = VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
180                               self.pg1.remote_addr[addr_type],
181                               self.pg1.remote_addr[addr_type],
182                               remote_tun_if_host,
183                               remote_tun_if_host,
184                               0, policy=e.IPSEC_API_SPD_ACTION_PROTECT,
185                               priority=10)
186         e3 = VppIpsecSpdEntry(self, self.tun_spd, vpp_tun_sa_id,
187                               remote_tun_if_host,
188                               remote_tun_if_host,
189                               self.pg0.local_addr[addr_type],
190                               self.pg0.local_addr[addr_type],
191                               0, priority=20,
192                               policy=e.IPSEC_API_SPD_ACTION_PROTECT,
193                               is_outbound=0)
194         e4 = VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
195                               self.pg0.local_addr[addr_type],
196                               self.pg0.local_addr[addr_type],
197                               remote_tun_if_host,
198                               remote_tun_if_host,
199                               0, policy=e.IPSEC_API_SPD_ACTION_PROTECT,
200                               priority=20)
201
202         objs = objs + [e1, e2, e3, e4]
203
204         for o in objs:
205             o.add_vpp_config()
206
207         self.net_objs = self.net_objs + objs
208
209     def config_ah_tra(self, params):
210         addr_type = params.addr_type
211         scapy_tra_sa_id = params.scapy_tra_sa_id
212         scapy_tra_spi = params.scapy_tra_spi
213         vpp_tra_sa_id = params.vpp_tra_sa_id
214         vpp_tra_spi = params.vpp_tra_spi
215         auth_algo_vpp_id = params.auth_algo_vpp_id
216         auth_key = params.auth_key
217         crypt_algo_vpp_id = params.crypt_algo_vpp_id
218         crypt_key = params.crypt_key
219         addr_any = params.addr_any
220         addr_bcast = params.addr_bcast
221         flags = params.flags | (VppEnum.vl_api_ipsec_sad_flags_t.
222                                 IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY)
223         e = VppEnum.vl_api_ipsec_spd_action_t
224         objs = []
225
226         params.tra_sa_in = VppIpsecSA(self, scapy_tra_sa_id, scapy_tra_spi,
227                                       auth_algo_vpp_id, auth_key,
228                                       crypt_algo_vpp_id, crypt_key,
229                                       self.vpp_ah_protocol,
230                                       flags=flags)
231         params.tra_sa_out = VppIpsecSA(self, vpp_tra_sa_id, vpp_tra_spi,
232                                        auth_algo_vpp_id, auth_key,
233                                        crypt_algo_vpp_id, crypt_key,
234                                        self.vpp_ah_protocol,
235                                        flags=flags)
236
237         objs.append(params.tra_sa_in)
238         objs.append(params.tra_sa_out)
239
240         objs.append(VppIpsecSpdEntry(self, self.tra_spd, vpp_tra_sa_id,
241                                      addr_any, addr_bcast,
242                                      addr_any, addr_bcast,
243                                      socket.IPPROTO_AH))
244         objs.append(VppIpsecSpdEntry(self, self.tra_spd, scapy_tra_sa_id,
245                                      addr_any, addr_bcast,
246                                      addr_any, addr_bcast,
247                                      socket.IPPROTO_AH,
248                                      is_outbound=0))
249         objs.append(VppIpsecSpdEntry(self, self.tra_spd, vpp_tra_sa_id,
250                                      self.tra_if.local_addr[addr_type],
251                                      self.tra_if.local_addr[addr_type],
252                                      self.tra_if.remote_addr[addr_type],
253                                      self.tra_if.remote_addr[addr_type],
254                                      0, priority=10,
255                                      policy=e.IPSEC_API_SPD_ACTION_PROTECT,
256                                      is_outbound=0))
257         objs.append(VppIpsecSpdEntry(self, self.tra_spd, scapy_tra_sa_id,
258                                      self.tra_if.local_addr[addr_type],
259                                      self.tra_if.local_addr[addr_type],
260                                      self.tra_if.remote_addr[addr_type],
261                                      self.tra_if.remote_addr[addr_type],
262                                      0, policy=e.IPSEC_API_SPD_ACTION_PROTECT,
263                                      priority=10))
264
265         for o in objs:
266             o.add_vpp_config()
267         self.net_objs = self.net_objs + objs
268
269
270 class TemplateIpsecAh(ConfigIpsecAH):
271     """
272     Basic test for IPSEC using AH transport and Tunnel mode
273
274     TRANSPORT MODE::
275
276          ---   encrypt   ---
277         |pg2| <-------> |VPP|
278          ---   decrypt   ---
279
280     TUNNEL MODE::
281
282          ---   encrypt   ---   plain   ---
283         |pg0| <-------  |VPP| <------ |pg1|
284          ---             ---           ---
285
286          ---   decrypt   ---   plain   ---
287         |pg0| ------->  |VPP| ------> |pg1|
288          ---             ---           ---
289
290     """
291     @classmethod
292     def setUpClass(cls):
293         super(TemplateIpsecAh, cls).setUpClass()
294
295     @classmethod
296     def tearDownClass(cls):
297         super(TemplateIpsecAh, cls).tearDownClass()
298
299     def setUp(self):
300         super(TemplateIpsecAh, self).setUp()
301         self.config_network(self.params.values())
302
303     def tearDown(self):
304         self.unconfig_network()
305         super(TemplateIpsecAh, self).tearDown()
306
307
308 class TestIpsecAh1(TemplateIpsecAh, IpsecTcpTests):
309     """ Ipsec AH - TCP tests """
310     pass
311
312
313 class TestIpsecAh2(TemplateIpsecAh, IpsecTra46Tests, IpsecTun46Tests):
314     """ Ipsec AH w/ SHA1 """
315     pass
316
317
318 class TestIpsecAhTun(TemplateIpsecAh, IpsecTun46Tests):
319     """ Ipsec AH - TUN encap tests """
320
321     def setUp(self):
322         self.ipv4_params = IPsecIPv4Params()
323         self.ipv6_params = IPsecIPv6Params()
324
325         c = (VppEnum.vl_api_tunnel_encap_decap_flags_t.
326              TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_DSCP)
327         c1 = c | (VppEnum.vl_api_tunnel_encap_decap_flags_t.
328                   TUNNEL_API_ENCAP_DECAP_FLAG_ENCAP_COPY_ECN)
329
330         self.ipv4_params.tun_flags = c
331         self.ipv6_params.tun_flags = c1
332
333         super(TestIpsecAhTun, self).setUp()
334
335     def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
336         # set the DSCP + ECN - flags are set to copy only DSCP
337         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
338                 IP(src=src, dst=dst, tos=5) /
339                 UDP(sport=4444, dport=4444) /
340                 Raw(b'X' * payload_size)
341                 for i in range(count)]
342
343     def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=54):
344         # set the DSCP + ECN - flags are set to copy both
345         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
346                 IPv6(src=src, dst=dst, tc=5) /
347                 UDP(sport=4444, dport=4444) /
348                 Raw(b'X' * payload_size)
349                 for i in range(count)]
350
351     def verify_encrypted(self, p, sa, rxs):
352         # just check that only the DSCP is copied
353         for rx in rxs:
354             self.assertEqual(rx[IP].tos, 4)
355
356     def verify_encrypted6(self, p, sa, rxs):
357         # just check that the DSCP & ECN are copied
358         for rx in rxs:
359             self.assertEqual(rx[IPv6].tc, 5)
360
361
362 class TestIpsecAhTun2(TemplateIpsecAh, IpsecTun46Tests):
363     """ Ipsec AH - TUN encap tests """
364
365     def setUp(self):
366         self.ipv4_params = IPsecIPv4Params()
367         self.ipv6_params = IPsecIPv6Params()
368
369         self.ipv4_params.dscp = 3
370         self.ipv6_params.dscp = 4
371
372         super(TestIpsecAhTun2, self).setUp()
373
374     def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
375         # set the DSCP + ECN - flags are set to copy only DSCP
376         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
377                 IP(src=src, dst=dst, tos=0) /
378                 UDP(sport=4444, dport=4444) /
379                 Raw(b'X' * payload_size)
380                 for i in range(count)]
381
382     def gen_pkts6(self, p, sw_intf, src, dst, count=1, payload_size=54):
383         # set the DSCP + ECN - flags are set to copy both
384         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
385                 IPv6(src=src, dst=dst, tc=0) /
386                 UDP(sport=4444, dport=4444) /
387                 Raw(b'X' * payload_size)
388                 for i in range(count)]
389
390     def verify_encrypted(self, p, sa, rxs):
391         # just check that only the DSCP is copied
392         for rx in rxs:
393             self.assertEqual(rx[IP].tos, 0xc)
394
395     def verify_encrypted6(self, p, sa, rxs):
396         # just check that the DSCP & ECN are copied
397         for rx in rxs:
398             self.assertEqual(rx[IPv6].tc, 0x10)
399
400
401 class TestIpsecAhHandoff(TemplateIpsecAh,
402                          IpsecTun6HandoffTests,
403                          IpsecTun4HandoffTests):
404     """ Ipsec AH Handoff """
405     pass
406
407
408 class TestIpsecAhAll(ConfigIpsecAH,
409                      IpsecTra4, IpsecTra6,
410                      IpsecTun4, IpsecTun6):
411     """ Ipsec AH all Algos """
412
413     def setUp(self):
414         super(TestIpsecAhAll, self).setUp()
415
416     def tearDown(self):
417         super(TestIpsecAhAll, self).tearDown()
418
419     def test_integ_algs(self):
420         """All Engines SHA[1_96, 256, 384, 512] w/ & w/o ESN"""
421         # foreach VPP crypto engine
422         engines = ["ia32", "ipsecmb", "openssl"]
423
424         algos = [{'vpp': VppEnum.vl_api_ipsec_integ_alg_t.
425                   IPSEC_API_INTEG_ALG_SHA1_96,
426                   'scapy': "HMAC-SHA1-96"},
427                  {'vpp': VppEnum.vl_api_ipsec_integ_alg_t.
428                   IPSEC_API_INTEG_ALG_SHA_256_128,
429                   'scapy': "SHA2-256-128"},
430                  {'vpp': VppEnum.vl_api_ipsec_integ_alg_t.
431                   IPSEC_API_INTEG_ALG_SHA_384_192,
432                   'scapy': "SHA2-384-192"},
433                  {'vpp': VppEnum.vl_api_ipsec_integ_alg_t.
434                   IPSEC_API_INTEG_ALG_SHA_512_256,
435                   'scapy': "SHA2-512-256"}]
436
437         flags = [0, (VppEnum.vl_api_ipsec_sad_flags_t.
438                      IPSEC_API_SAD_FLAG_USE_ESN)]
439
440         #
441         # loop through the VPP engines
442         #
443         for engine in engines:
444             self.vapi.cli("set crypto handler all %s" % engine)
445             #
446             # loop through each of the algorithms
447             #
448             for algo in algos:
449                 # with self.subTest(algo=algo['scapy']):
450                 for flag in flags:
451                     #
452                     # setup up the config paramters
453                     #
454                     self.ipv4_params = IPsecIPv4Params()
455                     self.ipv6_params = IPsecIPv6Params()
456
457                     self.params = {self.ipv4_params.addr_type:
458                                    self.ipv4_params,
459                                    self.ipv6_params.addr_type:
460                                    self.ipv6_params}
461
462                     for _, p in self.params.items():
463                         p.auth_algo_vpp_id = algo['vpp']
464                         p.auth_algo = algo['scapy']
465                         p.flags = p.flags | flag
466
467                     #
468                     # configure the SPDs. SAs, etc
469                     #
470                     self.config_network(self.params.values())
471
472                     #
473                     # run some traffic.
474                     #  An exhautsive 4o6, 6o4 is not necessary for each algo
475                     #
476                     self.verify_tra_basic6(count=17)
477                     self.verify_tra_basic4(count=17)
478                     self.verify_tun_66(self.params[socket.AF_INET6], count=17)
479                     self.verify_tun_44(self.params[socket.AF_INET], count=17)
480
481                     #
482                     # remove the SPDs, SAs, etc
483                     #
484                     self.unconfig_network()
485
486
487 if __name__ == '__main__':
488     unittest.main(testRunner=VppTestRunner)