IPSEC: SPD counters in the stats sgement
[vpp.git] / test / test_ipsec_ah.py
1 import socket
2 import unittest
3
4 from scapy.layers.ipsec import AH
5
6 from framework import VppTestRunner
7 from template_ipsec import TemplateIpsec, IpsecTraTests, IpsecTunTests
8 from template_ipsec import IpsecTcpTests
9 from vpp_ipsec import VppIpsecSA, VppIpsecSpd, VppIpsecSpdEntry,\
10         VppIpsecSpdItfBinding
11 from vpp_ip_route import VppIpRoute, VppRoutePath
12 from vpp_ip import DpoProto
13 from vpp_papi import VppEnum
14
15
16 class TemplateIpsecAh(TemplateIpsec):
17     """
18     Basic test for IPSEC using AH transport and Tunnel mode
19
20     TRANSPORT MODE:
21
22      ---   encrypt   ---
23     |pg2| <-------> |VPP|
24      ---   decrypt   ---
25
26     TUNNEL MODE:
27
28      ---   encrypt   ---   plain   ---
29     |pg0| <-------  |VPP| <------ |pg1|
30      ---             ---           ---
31
32      ---   decrypt   ---   plain   ---
33     |pg0| ------->  |VPP| ------> |pg1|
34      ---             ---           ---
35     """
36
37     def setUp(self):
38         super(TemplateIpsecAh, self).setUp()
39
40         self.encryption_type = AH
41         self.tun_if = self.pg0
42         self.tra_if = self.pg2
43         self.logger.info(self.vapi.ppcli("show int addr"))
44
45         self.tra_spd = VppIpsecSpd(self, self.tra_spd_id)
46         self.tra_spd.add_vpp_config()
47         VppIpsecSpdItfBinding(self, self.tra_spd,
48                               self.tra_if).add_vpp_config()
49         self.tun_spd = VppIpsecSpd(self, self.tun_spd_id)
50         self.tun_spd.add_vpp_config()
51         VppIpsecSpdItfBinding(self, self.tun_spd,
52                               self.tun_if).add_vpp_config()
53
54         for _, p in self.params.items():
55             self.config_ah_tra(p)
56             self.configure_sa_tra(p)
57             self.logger.info(self.vapi.ppcli("show ipsec"))
58         for _, p in self.params.items():
59             self.config_ah_tun(p)
60             self.logger.info(self.vapi.ppcli("show ipsec"))
61         for _, p in self.params.items():
62             d = DpoProto.DPO_PROTO_IP6 if p.is_ipv6 else DpoProto.DPO_PROTO_IP4
63             VppIpRoute(self,  p.remote_tun_if_host, p.addr_len,
64                        [VppRoutePath(self.tun_if.remote_addr[p.addr_type],
65                                      0xffffffff,
66                                      proto=d)],
67                        is_ip6=p.is_ipv6).add_vpp_config()
68
69     def tearDown(self):
70         super(TemplateIpsecAh, self).tearDown()
71         if not self.vpp_dead:
72             self.vapi.cli("show hardware")
73
74     def config_ah_tun(self, params):
75         addr_type = params.addr_type
76         scapy_tun_sa_id = params.scapy_tun_sa_id
77         scapy_tun_spi = params.scapy_tun_spi
78         vpp_tun_sa_id = params.vpp_tun_sa_id
79         vpp_tun_spi = params.vpp_tun_spi
80         auth_algo_vpp_id = params.auth_algo_vpp_id
81         auth_key = params.auth_key
82         crypt_algo_vpp_id = params.crypt_algo_vpp_id
83         crypt_key = params.crypt_key
84         remote_tun_if_host = params.remote_tun_if_host
85         addr_any = params.addr_any
86         addr_bcast = params.addr_bcast
87         e = VppEnum.vl_api_ipsec_spd_action_t
88
89         VppIpsecSA(self, scapy_tun_sa_id, scapy_tun_spi,
90                    auth_algo_vpp_id, auth_key,
91                    crypt_algo_vpp_id, crypt_key,
92                    self.vpp_ah_protocol,
93                    self.tun_if.local_addr[addr_type],
94                    self.tun_if.remote_addr[addr_type]).add_vpp_config()
95         VppIpsecSA(self, vpp_tun_sa_id, vpp_tun_spi,
96                    auth_algo_vpp_id, auth_key,
97                    crypt_algo_vpp_id, crypt_key,
98                    self.vpp_ah_protocol,
99                    self.tun_if.remote_addr[addr_type],
100                    self.tun_if.local_addr[addr_type]).add_vpp_config()
101
102         params.spd_policy_in_any = VppIpsecSpdEntry(self, self.tun_spd,
103                                                     vpp_tun_sa_id,
104                                                     addr_any, addr_bcast,
105                                                     addr_any, addr_bcast,
106                                                     socket.IPPROTO_AH)
107         params.spd_policy_in_any.add_vpp_config()
108         params.spd_policy_out_any = VppIpsecSpdEntry(self, self.tun_spd,
109                                                      vpp_tun_sa_id,
110                                                      addr_any, addr_bcast,
111                                                      addr_any, addr_bcast,
112                                                      socket.IPPROTO_AH,
113                                                      is_outbound=0)
114         params.spd_policy_out_any.add_vpp_config()
115
116         VppIpsecSpdEntry(self, self.tun_spd, vpp_tun_sa_id,
117                          remote_tun_if_host,
118                          remote_tun_if_host,
119                          self.pg1.remote_addr[addr_type],
120                          self.pg1.remote_addr[addr_type],
121                          0, priority=10,
122                          policy=e.IPSEC_API_SPD_ACTION_PROTECT,
123                          is_outbound=0).add_vpp_config()
124         VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
125                          self.pg1.remote_addr[addr_type],
126                          self.pg1.remote_addr[addr_type],
127                          remote_tun_if_host,
128                          remote_tun_if_host,
129                          0, policy=e.IPSEC_API_SPD_ACTION_PROTECT,
130                          priority=10).add_vpp_config()
131
132         VppIpsecSpdEntry(self, self.tun_spd, vpp_tun_sa_id,
133                          remote_tun_if_host,
134                          remote_tun_if_host,
135                          self.pg0.local_addr[addr_type],
136                          self.pg0.local_addr[addr_type],
137                          0, priority=20,
138                          policy=e.IPSEC_API_SPD_ACTION_PROTECT,
139                          is_outbound=0).add_vpp_config()
140         VppIpsecSpdEntry(self, self.tun_spd, scapy_tun_sa_id,
141                          self.pg0.local_addr[addr_type],
142                          self.pg0.local_addr[addr_type],
143                          remote_tun_if_host,
144                          remote_tun_if_host,
145                          0, policy=e.IPSEC_API_SPD_ACTION_PROTECT,
146                          priority=20).add_vpp_config()
147
148     def config_ah_tra(self, params):
149         addr_type = params.addr_type
150         scapy_tra_sa_id = params.scapy_tra_sa_id
151         scapy_tra_spi = params.scapy_tra_spi
152         vpp_tra_sa_id = params.vpp_tra_sa_id
153         vpp_tra_spi = params.vpp_tra_spi
154         auth_algo_vpp_id = params.auth_algo_vpp_id
155         auth_key = params.auth_key
156         crypt_algo_vpp_id = params.crypt_algo_vpp_id
157         crypt_key = params.crypt_key
158         addr_any = params.addr_any
159         addr_bcast = params.addr_bcast
160         flags = (VppEnum.vl_api_ipsec_sad_flags_t.
161                  IPSEC_API_SAD_FLAG_USE_ANTI_REPLAY)
162         e = VppEnum.vl_api_ipsec_spd_action_t
163
164         VppIpsecSA(self, scapy_tra_sa_id, scapy_tra_spi,
165                    auth_algo_vpp_id, auth_key,
166                    crypt_algo_vpp_id, crypt_key,
167                    self.vpp_ah_protocol,
168                    flags=flags).add_vpp_config()
169         VppIpsecSA(self, vpp_tra_sa_id, vpp_tra_spi,
170                    auth_algo_vpp_id, auth_key,
171                    crypt_algo_vpp_id, crypt_key,
172                    self.vpp_ah_protocol,
173                    flags=flags).add_vpp_config()
174
175         VppIpsecSpdEntry(self, self.tra_spd, vpp_tra_sa_id,
176                          addr_any, addr_bcast,
177                          addr_any, addr_bcast,
178                          socket.IPPROTO_AH).add_vpp_config()
179         VppIpsecSpdEntry(self, self.tra_spd, scapy_tra_sa_id,
180                          addr_any, addr_bcast,
181                          addr_any, addr_bcast,
182                          socket.IPPROTO_AH,
183                          is_outbound=0).add_vpp_config()
184
185         VppIpsecSpdEntry(self, self.tra_spd, vpp_tra_sa_id,
186                          self.tra_if.local_addr[addr_type],
187                          self.tra_if.local_addr[addr_type],
188                          self.tra_if.remote_addr[addr_type],
189                          self.tra_if.remote_addr[addr_type],
190                          0, priority=10,
191                          policy=e.IPSEC_API_SPD_ACTION_PROTECT,
192                          is_outbound=0).add_vpp_config()
193         VppIpsecSpdEntry(self, self.tra_spd, scapy_tra_sa_id,
194                          self.tra_if.local_addr[addr_type],
195                          self.tra_if.local_addr[addr_type],
196                          self.tra_if.remote_addr[addr_type],
197                          self.tra_if.remote_addr[addr_type],
198                          0, policy=e.IPSEC_API_SPD_ACTION_PROTECT,
199                          priority=10).add_vpp_config()
200
201
202 class TestIpsecAh1(TemplateIpsecAh, IpsecTraTests, IpsecTunTests):
203     """ Ipsec AH - TUN & TRA tests """
204     tra4_encrypt_node_name = "ah4-encrypt"
205     tra4_decrypt_node_name = "ah4-decrypt"
206     tra6_encrypt_node_name = "ah6-encrypt"
207     tra6_decrypt_node_name = "ah6-decrypt"
208     tun4_encrypt_node_name = "ah4-encrypt"
209     tun4_decrypt_node_name = "ah4-decrypt"
210     tun6_encrypt_node_name = "ah6-encrypt"
211     tun6_decrypt_node_name = "ah6-decrypt"
212
213
214 class TestIpsecAh2(TemplateIpsecAh, IpsecTcpTests):
215     """ Ipsec AH - TCP tests """
216     pass
217
218
219 if __name__ == '__main__':
220     unittest.main(testRunner=VppTestRunner)