TEST: IPSEC NAT-T with UDP header
[vpp.git] / test / test_ipsec_ah.py
1 import socket
2 import unittest
3
4 from scapy.layers.ipsec import AH
5
6 from framework import VppTestRunner
7 from template_ipsec import TemplateIpsec, IpsecTra46Tests, IpsecTun46Tests
8 from template_ipsec import IpsecTcpTests
9 from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdEntry,\
10         VppIpsecSpdItfBinding
11 from vpp_ip_route import VppIpRoute, VppRoutePath
12 from vpp_ip import DpoProto
13 from vpp_papi import VppEnum
14
15
16 class TemplateIpsecAh(TemplateIpsec):
17     """
18     Basic test for IPSEC using AH transport and Tunnel mode
19
20     TRANSPORT MODE:
21
22      ---   encrypt   ---
23     |pg2| <-------> |VPP|
24      ---   decrypt   ---
25
26     TUNNEL MODE:
27
28      ---   encrypt   ---   plain   ---
29     |pg0| <-------  |VPP| <------ |pg1|
30      ---             ---           ---
31
32      ---   decrypt   ---   plain   ---
33     |pg0| ------->  |VPP| ------> |pg1|
34      ---             ---           ---
35     """
36
37     def setUp(self):
38         super(TemplateIpsecAh, self).setUp()
39
40         self.encryption_type = AH
41         self.tun_if = self.pg0
42         self.tra_if = self.pg2
43         self.logger.info(self.vapi.ppcli("show int addr"))
44
45         self.tra_spd = VppIpsecSpd(self, self.tra_spd_id)
46         self.tra_spd.add_vpp_config()
47         VppIpsecSpdItfBinding(self, self.tra_spd,
48                               self.tra_if).add_vpp_config()
49         self.tun_spd = VppIpsecSpd(self, self.tun_spd_id)
50         self.tun_spd.add_vpp_config()
51         VppIpsecSpdItfBinding(self, self.tun_spd,
52                               self.tun_if).add_vpp_config()
53
54         for _, p in self.params.items():
55             self.config_ah_tra(p)
56             self.configure_sa_tra(p)
57             self.logger.info(self.vapi.ppcli("show ipsec"))
58         for _, p in self.params.items():
59             self.config_ah_tun(p)
60             self.logger.info(self.vapi.ppcli("show ipsec"))
61         for _, p in self.params.items():
62             d = DpoProto.DPO_PROTO_IP6 if p.is_ipv6 else DpoProto.DPO_PROTO_IP4
63             VppIpRoute(self,  p.remote_tun_if_host, p.addr_len,
64                        [VppRoutePath(self.tun_if.remote_addr[p.addr_type],
65                                      0xffffffff,
66                                      proto=d)],
67                        is_ip6=p.is_ipv6).add_vpp_config()
68
69     def tearDown(self):
70         super(TemplateIpsecAh, self).tearDown()
71         if not self.vpp_dead:
72             self.vapi.cli("show hardware")
73
74     def config_ah_tun(self, params):
75         addr_type = params.addr_type
76         scapy_tun_sa_id = params.scapy_tun_sa_id
77         scapy_tun_spi = params.scapy_tun_spi
78         vpp_tun_sa_id = params.vpp_tun_sa_id
79         vpp_tun_spi = params.vpp_tun_spi
80         auth_algo_vpp_id = params.auth_algo_vpp_id
81         auth_key = params.auth_key
82         crypt_algo_vpp_id = params.crypt_algo_vpp_id
83         crypt_key = params.crypt_key
84         remote_tun_if_host = params.remote_tun_if_host
85         addr_any = params.addr_any
86         addr_bcast = params.addr_bcast
87         e = VppEnum.vl_api_ipsec_spd_action_t
88
89         params.tun_sa_in = VppIpsecSA(self, scapy_tun_sa_id, scapy_tun_spi,
90                                       auth_algo_vpp_id, auth_key,
91                                       crypt_algo_vpp_id, crypt_key,
92                                       self.vpp_ah_protocol,
93                                       self.tun_if.local_addr[addr_type],
94                                       self.tun_if.remote_addr[addr_type])
95         params.tun_sa_in.add_vpp_config()
96         params.tun_sa_out = VppIpsecSA(self, vpp_tun_sa_id, vpp_tun_spi,
97                                        auth_algo_vpp_id, auth_key,
98                                        crypt_algo_vpp_id, crypt_key,
99                                        self.vpp_ah_protocol,
100                                        self.tun_if.remote_addr[addr_type],
101                                        self.tun_if.local_addr[addr_type])
102         params.tun_sa_out.add_vpp_config()
103
104         params.spd_policy_in_any = VppIpsecSpdEntry(self, self.tun_spd,
105                                                     vpp_tun_sa_id,
106                                                     addr_any, addr_bcast,
107                                                     addr_any, addr_bcast,
108                                                     socket.IPPROTO_AH)
109         params.spd_policy_in_any.add_vpp_config()
110         params.spd_policy_out_any = VppIpsecSpdEntry(self, self.tun_spd,
111                                                      vpp_tun_sa_id,
112                                                      addr_any, addr_bcast,
113                                                      addr_any, addr_bcast,
114                                                      socket.IPPROTO_AH,
115                                                      is_outbound=0)
116         params.spd_policy_out_any.add_vpp_config()
117
118         VppIpsecSpdEntry(self, self.tun_spd, vpp_tun_sa_id,
119                          remote_tun_if_host,
120                          remote_tun_if_host,
121                          self.pg1.remote_addr[addr_type],
122                          self.pg1.remote_addr[addr_type],
123                          0, priority=10,
124                          policy=e.IPSEC_API_SPD_ACTION_PROTECT,
125                          is_outbound=0).add_vpp_config()
126         VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
127                          self.pg1.remote_addr[addr_type],
128                          self.pg1.remote_addr[addr_type],
129                          remote_tun_if_host,
130                          remote_tun_if_host,
131                          0, policy=e.IPSEC_API_SPD_ACTION_PROTECT,
132                          priority=10).add_vpp_config()
133
134         VppIpsecSpdEntry(self, self.tun_spd, vpp_tun_sa_id,
135                          remote_tun_if_host,
136                          remote_tun_if_host,
137                          self.pg0.local_addr[addr_type],
138                          self.pg0.local_addr[addr_type],
139                          0, priority=20,
140                          policy=e.IPSEC_API_SPD_ACTION_PROTECT,
141                          is_outbound=0).add_vpp_config()
142         VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
143                          self.pg0.local_addr[addr_type],
144                          self.pg0.local_addr[addr_type],
145                          remote_tun_if_host,
146                          remote_tun_if_host,
147                          0, policy=e.IPSEC_API_SPD_ACTION_PROTECT,
148                          priority=20).add_vpp_config()
149
150     def config_ah_tra(self, params):
151         addr_type = params.addr_type
152         scapy_tra_sa_id = params.scapy_tra_sa_id
153         scapy_tra_spi = params.scapy_tra_spi
154         vpp_tra_sa_id = params.vpp_tra_sa_id
155         vpp_tra_spi = params.vpp_tra_spi
156         auth_algo_vpp_id = params.auth_algo_vpp_id
157         auth_key = params.auth_key
158         crypt_algo_vpp_id = params.crypt_algo_vpp_id
159         crypt_key = params.crypt_key
160         addr_any = params.addr_any
161         addr_bcast = params.addr_bcast
162         flags = (VppEnum.vl_api_ipsec_sad_flags_t.
163                  IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY)
164         e = VppEnum.vl_api_ipsec_spd_action_t
165
166         params.tra_sa_in = VppIpsecSA(self, scapy_tra_sa_id, scapy_tra_spi,
167                                       auth_algo_vpp_id, auth_key,
168                                       crypt_algo_vpp_id, crypt_key,
169                                       self.vpp_ah_protocol,
170                                       flags=flags)
171         params.tra_sa_in.add_vpp_config()
172         params.tra_sa_out = VppIpsecSA(self, vpp_tra_sa_id, vpp_tra_spi,
173                                        auth_algo_vpp_id, auth_key,
174                                        crypt_algo_vpp_id, crypt_key,
175                                        self.vpp_ah_protocol,
176                                        flags=flags)
177         params.tra_sa_out.add_vpp_config()
178
179         VppIpsecSpdEntry(self, self.tra_spd, vpp_tra_sa_id,
180                          addr_any, addr_bcast,
181                          addr_any, addr_bcast,
182                          socket.IPPROTO_AH).add_vpp_config()
183         VppIpsecSpdEntry(self, self.tra_spd, scapy_tra_sa_id,
184                          addr_any, addr_bcast,
185                          addr_any, addr_bcast,
186                          socket.IPPROTO_AH,
187                          is_outbound=0).add_vpp_config()
188
189         VppIpsecSpdEntry(self, self.tra_spd, vpp_tra_sa_id,
190                          self.tra_if.local_addr[addr_type],
191                          self.tra_if.local_addr[addr_type],
192                          self.tra_if.remote_addr[addr_type],
193                          self.tra_if.remote_addr[addr_type],
194                          0, priority=10,
195                          policy=e.IPSEC_API_SPD_ACTION_PROTECT,
196                          is_outbound=0).add_vpp_config()
197         VppIpsecSpdEntry(self, self.tra_spd, scapy_tra_sa_id,
198                          self.tra_if.local_addr[addr_type],
199                          self.tra_if.local_addr[addr_type],
200                          self.tra_if.remote_addr[addr_type],
201                          self.tra_if.remote_addr[addr_type],
202                          0, policy=e.IPSEC_API_SPD_ACTION_PROTECT,
203                          priority=10).add_vpp_config()
204
205
206 class TestIpsecAh1(TemplateIpsecAh, IpsecTra46Tests, IpsecTun46Tests):
207     """ Ipsec AH - TUN & TRA tests """
208     tra4_encrypt_node_name = "ah4-encrypt"
209     tra4_decrypt_node_name = "ah4-decrypt"
210     tra6_encrypt_node_name = "ah6-encrypt"
211     tra6_decrypt_node_name = "ah6-decrypt"
212     tun4_encrypt_node_name = "ah4-encrypt"
213     tun4_decrypt_node_name = "ah4-decrypt"
214     tun6_encrypt_node_name = "ah6-encrypt"
215     tun6_decrypt_node_name = "ah6-decrypt"
216
217
218 class TestIpsecAh2(TemplateIpsecAh, IpsecTcpTests):
219     """ Ipsec AH - TCP tests """
220     pass
221
222
223 if __name__ == '__main__':
224     unittest.main(testRunner=VppTestRunner)