59f6864821fffe2397812fc667b53c70d5f222eb
[vpp.git] / test / test_ipsec_ah.py
1 import socket
2 import unittest
3
4 from scapy.layers.ipsec import AH
5
6 from framework import VppTestRunner
7 from template_ipsec import TemplateIpsec, IpsecTraTests, IpsecTunTests
8 from template_ipsec import IpsecTcpTests
9
10
11 class TemplateIpsecAh(TemplateIpsec):
12     """
13     Basic test for IPSEC using AH transport and Tunnel mode
14
15     Below 4 cases are covered as part of this test
16     1) ipsec ah v4 transport basic test  - IPv4 Transport mode
17      scenario using HMAC-SHA1-96 intergrity algo
18     2) ipsec ah v4 transport burst test
19      Above test for 257 pkts
20     3) ipsec ah 4o4 tunnel basic test    - IPv4 Tunnel mode
21      scenario using HMAC-SHA1-96 intergrity algo
22     4) ipsec ah 4o4 tunnel burst test
23      Above test for 257 pkts
24
25     TRANSPORT MODE:
26
27      ---   encrypt   ---
28     |pg2| <-------> |VPP|
29      ---   decrypt   ---
30
31     TUNNEL MODE:
32
33      ---   encrypt   ---   plain   ---
34     |pg0| <-------  |VPP| <------ |pg1|
35      ---             ---           ---
36
37      ---   decrypt   ---   plain   ---
38     |pg0| ------->  |VPP| ------> |pg1|
39      ---             ---           ---
40     """
41
42     encryption_type = AH
43
44     @classmethod
45     def setUpClass(cls):
46         super(TemplateIpsecAh, cls).setUpClass()
47         cls.tun_if = cls.pg0
48         cls.tra_if = cls.pg2
49         cls.logger.info(cls.vapi.ppcli("show int addr"))
50         cls.vapi.ipsec_spd_add_del(cls.tun_spd_id)
51         cls.vapi.ipsec_interface_add_del_spd(cls.tun_spd_id,
52                                              cls.tun_if.sw_if_index)
53         cls.vapi.ipsec_spd_add_del(cls.tra_spd_id)
54         cls.vapi.ipsec_interface_add_del_spd(cls.tra_spd_id,
55                                              cls.tra_if.sw_if_index)
56         for _, p in cls.params.items():
57             cls.config_ah_tra(p)
58         cls.logger.info(cls.vapi.ppcli("show ipsec"))
59         for _, p in cls.params.items():
60             cls.config_ah_tun(p)
61         cls.logger.info(cls.vapi.ppcli("show ipsec"))
62         for _, p in cls.params.items():
63             src = socket.inet_pton(p.addr_type, p.remote_tun_if_host)
64             cls.vapi.ip_add_del_route(src, p.addr_len,
65                                       cls.tun_if.remote_addr_n[p.addr_type],
66                                       is_ipv6=p.is_ipv6)
67
68     @classmethod
69     def config_ah_tun(cls, params):
70         addr_type = params.addr_type
71         is_ipv6 = params.is_ipv6
72         scapy_tun_sa_id = params.scapy_tun_sa_id
73         scapy_tun_spi = params.scapy_tun_spi
74         vpp_tun_sa_id = params.vpp_tun_sa_id
75         vpp_tun_spi = params.vpp_tun_spi
76         auth_algo_vpp_id = params.auth_algo_vpp_id
77         auth_key = params.auth_key
78         crypt_algo_vpp_id = params.crypt_algo_vpp_id
79         crypt_key = params.crypt_key
80         remote_tun_if_host = params.remote_tun_if_host
81         addr_any = params.addr_any
82         addr_bcast = params.addr_bcast
83         cls.vapi.ipsec_sad_add_del_entry(scapy_tun_sa_id, scapy_tun_spi,
84                                          auth_algo_vpp_id, auth_key,
85                                          crypt_algo_vpp_id, crypt_key,
86                                          cls.vpp_ah_protocol,
87                                          cls.tun_if.local_addr_n[addr_type],
88                                          cls.tun_if.remote_addr_n[addr_type],
89                                          is_tunnel=1, is_tunnel_ipv6=is_ipv6)
90         cls.vapi.ipsec_sad_add_del_entry(vpp_tun_sa_id, vpp_tun_spi,
91                                          auth_algo_vpp_id, auth_key,
92                                          crypt_algo_vpp_id, crypt_key,
93                                          cls.vpp_ah_protocol,
94                                          cls.tun_if.remote_addr_n[addr_type],
95                                          cls.tun_if.local_addr_n[addr_type],
96                                          is_tunnel=1, is_tunnel_ipv6=is_ipv6)
97         l_startaddr = r_startaddr = socket.inet_pton(addr_type, addr_any)
98         l_stopaddr = r_stopaddr = socket.inet_pton(addr_type, addr_bcast)
99         cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, vpp_tun_sa_id,
100                                          l_startaddr, l_stopaddr, r_startaddr,
101                                          r_stopaddr, is_ipv6=is_ipv6,
102                                          protocol=socket.IPPROTO_AH)
103         cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, vpp_tun_sa_id,
104                                          l_startaddr, l_stopaddr, r_startaddr,
105                                          r_stopaddr, is_outbound=0,
106                                          is_ipv6=is_ipv6,
107                                          protocol=socket.IPPROTO_AH)
108         l_startaddr = l_stopaddr = socket.inet_pton(addr_type,
109                                                     remote_tun_if_host)
110         r_startaddr = r_stopaddr = cls.pg1.remote_addr_n[addr_type]
111         cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, vpp_tun_sa_id,
112                                          l_startaddr, l_stopaddr, r_startaddr,
113                                          r_stopaddr, priority=10, policy=3,
114                                          is_outbound=0, is_ipv6=is_ipv6)
115         cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, scapy_tun_sa_id,
116                                          r_startaddr, r_stopaddr, l_startaddr,
117                                          l_stopaddr, priority=10, policy=3,
118                                          is_ipv6=is_ipv6)
119         r_startaddr = r_stopaddr = cls.pg0.local_addr_n[addr_type]
120         cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, vpp_tun_sa_id,
121                                          l_startaddr, l_stopaddr, r_startaddr,
122                                          r_stopaddr, priority=20, policy=3,
123                                          is_outbound=0, is_ipv6=is_ipv6)
124         cls.vapi.ipsec_spd_add_del_entry(cls.tun_spd_id, scapy_tun_sa_id,
125                                          r_startaddr, r_stopaddr, l_startaddr,
126                                          l_stopaddr, priority=20, policy=3,
127                                          is_ipv6=is_ipv6)
128
129     @classmethod
130     def config_ah_tra(cls, params):
131         addr_type = params.addr_type
132         is_ipv6 = params.is_ipv6
133         scapy_tra_sa_id = params.scapy_tra_sa_id
134         scapy_tra_spi = params.scapy_tra_spi
135         vpp_tra_sa_id = params.vpp_tra_sa_id
136         vpp_tra_spi = params.vpp_tra_spi
137         auth_algo_vpp_id = params.auth_algo_vpp_id
138         auth_key = params.auth_key
139         crypt_algo_vpp_id = params.crypt_algo_vpp_id
140         crypt_key = params.crypt_key
141         addr_any = params.addr_any
142         addr_bcast = params.addr_bcast
143         cls.vapi.ipsec_sad_add_del_entry(scapy_tra_sa_id, scapy_tra_spi,
144                                          auth_algo_vpp_id, auth_key,
145                                          crypt_algo_vpp_id, crypt_key,
146                                          cls.vpp_ah_protocol, is_tunnel=0,
147                                          is_tunnel_ipv6=0)
148         cls.vapi.ipsec_sad_add_del_entry(vpp_tra_sa_id, vpp_tra_spi,
149                                          auth_algo_vpp_id, auth_key,
150                                          crypt_algo_vpp_id, crypt_key,
151                                          cls.vpp_ah_protocol, is_tunnel=0,
152                                          is_tunnel_ipv6=0)
153         l_startaddr = r_startaddr = socket.inet_pton(addr_type, addr_any)
154         l_stopaddr = r_stopaddr = socket.inet_pton(addr_type, addr_bcast)
155         cls.vapi.ipsec_spd_add_del_entry(cls.tra_spd_id, vpp_tra_sa_id,
156                                          l_startaddr, l_stopaddr, r_startaddr,
157                                          r_stopaddr, is_ipv6=is_ipv6,
158                                          protocol=socket.IPPROTO_AH)
159         cls.vapi.ipsec_spd_add_del_entry(cls.tra_spd_id, scapy_tra_sa_id,
160                                          l_startaddr, l_stopaddr, r_startaddr,
161                                          r_stopaddr, is_outbound=0,
162                                          is_ipv6=is_ipv6,
163                                          protocol=socket.IPPROTO_AH)
164         l_startaddr = l_stopaddr = cls.tra_if.local_addr_n[addr_type]
165         r_startaddr = r_stopaddr = cls.tra_if.remote_addr_n[addr_type]
166         cls.vapi.ipsec_spd_add_del_entry(cls.tra_spd_id, vpp_tra_sa_id,
167                                          l_startaddr, l_stopaddr, r_startaddr,
168                                          r_stopaddr, priority=10, policy=3,
169                                          is_outbound=0, is_ipv6=is_ipv6)
170         cls.vapi.ipsec_spd_add_del_entry(cls.tra_spd_id, scapy_tra_sa_id,
171                                          l_startaddr, l_stopaddr, r_startaddr,
172                                          r_stopaddr, priority=10,
173                                          policy=3, is_ipv6=is_ipv6)
174
175     def tearDown(self):
176         super(TemplateIpsecAh, self).tearDown()
177         if not self.vpp_dead:
178             self.vapi.cli("show hardware")
179
180
181 class TestIpsecAh1(TemplateIpsecAh, IpsecTraTests, IpsecTunTests):
182     """ Ipsec AH - TUN & TRA tests """
183     pass
184
185
186 class TestIpsecAh2(TemplateIpsecAh, IpsecTcpTests):
187     """ Ipsec AH - TCP tests """
188     pass
189
190
191 if __name__ == '__main__':
192     unittest.main(testRunner=VppTestRunner)