IPSEC: API modernisation
[vpp.git] / test / test_ipsec_ah.py
1 import socket
2 import unittest
3
4 from scapy.layers.ipsec import AH
5
6 from framework import VppTestRunner
7 from template_ipsec import TemplateIpsec, IpsecTraTests, IpsecTunTests
8 from template_ipsec import IpsecTcpTests
9 from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdEntry,\
10         VppIpsecSpdItfBinding
11 from vpp_ip_route import VppIpRoute, VppRoutePath
12 from vpp_ip import DpoProto
13 from vpp_papi import VppEnum
14
15
class TemplateIpsecAh(TemplateIpsec):
    """
    Basic test for IPSEC using AH transport and Tunnel mode

    TRANSPORT MODE:

     ---   encrypt   ---
    |pg2| <-------> |VPP|
     ---   decrypt   ---

    TUNNEL MODE:

     ---   encrypt   ---   plain   ---
    |pg0| <-------  |VPP| <------ |pg1|
     ---             ---           ---

     ---   decrypt   ---   plain   ---
    |pg0| ------->  |VPP| ------> |pg1|
     ---             ---           ---
    """

    def setUp(self):
        """Create the AH SPDs, SAs, SPD entries and routes on VPP.

        One SPD is created for transport mode and bound to ``pg2``; a
        second one is created for tunnel mode and bound to ``pg0``.
        ``self.params`` is then walked three times, in this order:
        transport-mode configuration, tunnel-mode configuration, and a
        route towards each remote tunnel host via the tunnel interface.
        """
        super(TemplateIpsecAh, self).setUp()

        self.encryption_type = AH
        self.tun_if = self.pg0
        self.tra_if = self.pg2
        self.logger.info(self.vapi.ppcli("show int addr"))

        # Transport-mode policy database, attached to the transport
        # interface.
        self.tra_spd = VppIpsecSpd(self, self.tra_spd_id)
        self.tra_spd.add_vpp_config()
        tra_binding = VppIpsecSpdItfBinding(self, self.tra_spd, self.tra_if)
        tra_binding.add_vpp_config()

        # Tunnel-mode policy database, attached to the tunnel interface.
        self.tun_spd = VppIpsecSpd(self, self.tun_spd_id)
        self.tun_spd.add_vpp_config()
        tun_binding = VppIpsecSpdItfBinding(self, self.tun_spd, self.tun_if)
        tun_binding.add_vpp_config()

        # Only the parameter objects are needed; the mapping keys are
        # not used. The three passes are kept separate so that all
        # transport config precedes all tunnel config, which precedes
        # the routes.
        for p in self.params.values():
            self.config_ah_tra(p)
            self.configure_sa_tra(p)
            self.logger.info(self.vapi.ppcli("show ipsec"))

        for p in self.params.values():
            self.config_ah_tun(p)
            self.logger.info(self.vapi.ppcli("show ipsec"))

        for p in self.params.values():
            if p.is_ipv6:
                dpo_proto = DpoProto.DPO_PROTO_IP6
            else:
                dpo_proto = DpoProto.DPO_PROTO_IP4
            # NOTE(review): 0xffffffff presumably means "no specific
            # interface" for the path - confirm against VppRoutePath.
            via_tun_peer = VppRoutePath(
                self.tun_if.remote_addr[p.addr_type],
                0xffffffff,
                proto=dpo_proto)
            VppIpRoute(self, p.remote_tun_if_host, p.addr_len,
                       [via_tun_peer],
                       is_ip6=p.is_ipv6).add_vpp_config()

    def tearDown(self):
        """Run the base teardown, then issue ``show hardware``.

        The CLI command is skipped when ``self.vpp_dead`` is set.
        """
        super(TemplateIpsecAh, self).tearDown()
        if self.vpp_dead:
            return
        self.vapi.cli("show hardware")

    def config_ah_tun(self, params):
        """Configure the tunnel-mode AH SAs and SPD entries.

        Everything is added to ``self.tun_spd``. ``params`` must expose
        the attributes read at the top of this method (SA ids, SPIs,
        algorithm ids, keys and addresses).
        """
        addr_type = params.addr_type
        scapy_sa = params.scapy_tun_sa_id
        scapy_spi = params.scapy_tun_spi
        vpp_sa = params.vpp_tun_sa_id
        vpp_spi = params.vpp_tun_spi
        auth_algo = params.auth_algo_vpp_id
        auth_key = params.auth_key
        crypt_algo = params.crypt_algo_vpp_id
        crypt_key = params.crypt_key
        remote_host = params.remote_tun_if_host
        addr_any = params.addr_any
        addr_bcast = params.addr_bcast
        protect = (VppEnum.vl_api_ipsec_spd_action_t.
                   IPSEC_API_SPD_ACTION_PROTECT)

        # Two SAs that differ only in id/SPI and in the order of the
        # tunnel endpoint addresses (presumably source then destination
        # - confirm against VppIpsecSA).
        tun_local = self.tun_if.local_addr[addr_type]
        tun_remote = self.tun_if.remote_addr[addr_type]
        for sa_id, spi, first_ep, second_ep in (
                (scapy_sa, scapy_spi, tun_local, tun_remote),
                (vpp_sa, vpp_spi, tun_remote, tun_local)):
            VppIpsecSA(self, sa_id, spi,
                       auth_algo, auth_key,
                       crypt_algo, crypt_key,
                       self.vpp_ah_protocol,
                       first_ep, second_ep).add_vpp_config()

        # Entries matching IPPROTO_AH over the any..bcast address range
        # with the constructor's default policy: one with the default
        # direction, one explicitly inbound. Both reference the VPP SA.
        for direction in ({}, {"is_outbound": 0}):
            VppIpsecSpdEntry(self, self.tun_spd, vpp_sa,
                             addr_any, addr_bcast,
                             addr_any, addr_bcast,
                             socket.IPPROTO_AH,
                             **direction).add_vpp_config()

        # PROTECT entries between the remote tunnel host and, in turn,
        # pg1's remote address (priority 10) and pg0's local address
        # (priority 20). Each pairing gets an inbound entry on the VPP
        # SA followed by a default-direction entry on the scapy SA with
        # the address pairs swapped.
        # NOTE(review): the literal 0 is the protocol argument,
        # presumably "any protocol" - confirm against VppIpsecSpdEntry.
        for inner_addr, prio in (
                (self.pg1.remote_addr[addr_type], 10),
                (self.pg0.local_addr[addr_type], 20)):
            VppIpsecSpdEntry(self, self.tun_spd, vpp_sa,
                             remote_host, remote_host,
                             inner_addr, inner_addr,
                             0, priority=prio,
                             policy=protect,
                             is_outbound=0).add_vpp_config()
            VppIpsecSpdEntry(self, self.tun_spd, scapy_sa,
                             inner_addr, inner_addr,
                             remote_host, remote_host,
                             0, policy=protect,
                             priority=prio).add_vpp_config()

    def config_ah_tra(self, params):
        """Configure the transport-mode AH SAs and SPD entries.

        Everything is added to ``self.tra_spd``. Both SAs are created
        with the USE_ANTI_REPLAY flag and without endpoint addresses.
        ``params`` must expose the attributes read at the top of this
        method.
        """
        addr_type = params.addr_type
        scapy_sa = params.scapy_tra_sa_id
        scapy_spi = params.scapy_tra_spi
        vpp_sa = params.vpp_tra_sa_id
        vpp_spi = params.vpp_tra_spi
        auth_algo = params.auth_algo_vpp_id
        auth_key = params.auth_key
        crypt_algo = params.crypt_algo_vpp_id
        crypt_key = params.crypt_key
        addr_any = params.addr_any
        addr_bcast = params.addr_bcast
        anti_replay = (VppEnum.vl_api_ipsec_sad_flags_t.
                       IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY)
        protect = (VppEnum.vl_api_ipsec_spd_action_t.
                   IPSEC_API_SPD_ACTION_PROTECT)

        # Scapy-side SA first, then the VPP-side SA.
        for sa_id, spi in ((scapy_sa, scapy_spi), (vpp_sa, vpp_spi)):
            VppIpsecSA(self, sa_id, spi,
                       auth_algo, auth_key,
                       crypt_algo, crypt_key,
                       self.vpp_ah_protocol,
                       flags=anti_replay).add_vpp_config()

        # Entries matching IPPROTO_AH over the any..bcast address range
        # with the constructor's default policy: the VPP SA with the
        # default direction, the scapy SA explicitly inbound.
        for sa_id, direction in ((vpp_sa, {}),
                                 (scapy_sa, {"is_outbound": 0})):
            VppIpsecSpdEntry(self, self.tra_spd, sa_id,
                             addr_any, addr_bcast,
                             addr_any, addr_bcast,
                             socket.IPPROTO_AH,
                             **direction).add_vpp_config()

        # PROTECT entries between the transport interface's local and
        # remote addresses at priority 10: inbound on the VPP SA, then
        # default direction on the scapy SA.
        tra_local = self.tra_if.local_addr[addr_type]
        tra_remote = self.tra_if.remote_addr[addr_type]
        VppIpsecSpdEntry(self, self.tra_spd, vpp_sa,
                         tra_local, tra_local,
                         tra_remote, tra_remote,
                         0, priority=10,
                         policy=protect,
                         is_outbound=0).add_vpp_config()
        VppIpsecSpdEntry(self, self.tra_spd, scapy_sa,
                         tra_local, tra_local,
                         tra_remote, tra_remote,
                         0, policy=protect,
                         priority=10).add_vpp_config()
196
197
class TestIpsecAh1(TemplateIpsecAh, IpsecTraTests, IpsecTunTests):
    """ Ipsec AH - TUN & TRA tests """
    # Transport and tunnel mode use the same node names for AH, so each
    # pair of attributes is bound to a single string. These are
    # presumably VPP graph node names read by the inherited test
    # mixins - confirm against template_ipsec.
    tra4_encrypt_node_name = tun4_encrypt_node_name = "ah4-encrypt"
    tra4_decrypt_node_name = tun4_decrypt_node_name = "ah4-decrypt"
    tra6_encrypt_node_name = tun6_encrypt_node_name = "ah6-encrypt"
    tra6_decrypt_node_name = tun6_decrypt_node_name = "ah6-decrypt"
208
209
class TestIpsecAh2(TemplateIpsecAh, IpsecTcpTests):
    """ Ipsec AH - TCP tests """
    # No body needed: setup comes from TemplateIpsecAh and everything
    # else is inherited from IpsecTcpTests.
213
214
# Running this file as a script launches unittest with the VPP runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)