MAP: Convert from DPO to input feature.
[vpp.git] / test / test_ipsec_ah.py
1 import socket
2 import unittest
3
4 from scapy.layers.ipsec import AH
5
6 from framework import VppTestRunner
7 from template_ipsec import TemplateIpsec, IpsecTraTests, IpsecTunTests
8 from template_ipsec import IpsecTcpTests
9
10
class TemplateIpsecAh(TemplateIpsec):
    """
    Shared fixture for IPsec AH tests in transport and tunnel mode.

    TRANSPORT MODE (pg2 is the transport interface):

     ---   encrypt   ---
    |pg2| <-------> |VPP|
     ---   decrypt   ---

    TUNNEL MODE (pg0 is the tunnel interface, pg1 carries plain traffic):

     ---   encrypt   ---   plain   ---
    |pg0| <-------  |VPP| <------ |pg1|
     ---             ---           ---

     ---   decrypt   ---   plain   ---
    |pg0| ------->  |VPP| ------> |pg1|
     ---             ---           ---
    """

    encryption_type = AH

    @classmethod
    def setUpClass(cls):
        """
        Build the SPDs, SAs, policies and routes used by the AH tests.

        Steps, in order: pick the tunnel (pg0) and transport (pg2)
        interfaces, create one SPD per mode and bind it to its interface,
        configure the transport-mode entries for every parameter set,
        then the tunnel-mode entries, and finally add a route for each
        remote tunnel host using the tunnel interface's remote address.
        """
        super(TemplateIpsecAh, cls).setUpClass()
        cls.tun_if = cls.pg0
        cls.tra_if = cls.pg2
        cls.logger.info(cls.vapi.ppcli("show int addr"))

        # Tunnel SPD first, then transport SPD; each is created and then
        # attached to its interface.
        spd_bindings = ((cls.tun_spd_id, cls.tun_if),
                        (cls.tra_spd_id, cls.tra_if))
        for spd_id, intf in spd_bindings:
            cls.vapi.ipsec_spd_add_del(spd_id)
            cls.vapi.ipsec_interface_add_del_spd(spd_id, intf.sw_if_index)

        # Transport mode: VPP-side config plus configure_sa_tra(), which is
        # not defined in this class (inherited).
        for p in cls.params.values():
            cls.config_ah_tra(p)
            cls.configure_sa_tra(p)
        cls.logger.info(cls.vapi.ppcli("show ipsec"))

        # Tunnel mode.
        for p in cls.params.values():
            cls.config_ah_tun(p)
        cls.logger.info(cls.vapi.ppcli("show ipsec"))

        # Route each remote tunnel host using the tunnel interface's
        # remote address.
        for p in cls.params.values():
            host = socket.inet_pton(p.addr_type, p.remote_tun_if_host)
            tun_peer = cls.tun_if.remote_addr_n[p.addr_type]
            cls.vapi.ip_add_del_route(host, p.addr_len, tun_peer,
                                      is_ipv6=p.is_ipv6)

    @classmethod
    def config_ah_tun(cls, params):
        """
        Add the tunnel-mode AH SAs and SPD entries for one parameter set.

        :param params: parameter object providing the address family
            (addr_type, is_ipv6, addr_any, addr_bcast), the SA ids / SPIs
            (scapy_tun_*, vpp_tun_*), the auth/crypto algorithm ids and
            keys, and remote_tun_if_host.
        """
        vapi = cls.vapi
        af = params.addr_type
        v6 = params.is_ipv6
        spd = cls.tun_spd_id
        vpp_sa = params.vpp_tun_sa_id
        scapy_sa = params.scapy_tun_sa_id

        # Two SAs over the same pair of tunnel endpoints; the endpoint
        # order is swapped for the second one.
        tun_local = cls.tun_if.local_addr_n[af]
        tun_remote = cls.tun_if.remote_addr_n[af]
        sa_specs = (
            (scapy_sa, params.scapy_tun_spi, tun_local, tun_remote),
            (vpp_sa, params.vpp_tun_spi, tun_remote, tun_local),
        )
        for sa_id, spi, first_ep, second_ep in sa_specs:
            vapi.ipsec_sad_add_del_entry(
                sa_id, spi,
                params.auth_algo_vpp_id, params.auth_key,
                params.crypt_algo_vpp_id, params.crypt_key,
                cls.vpp_ah_protocol, first_ep, second_ep,
                is_tunnel=1, is_tunnel_ipv6=v6)

        # Entries matching AH itself over the whole address range, one per
        # direction (the first relies on the API's default direction).
        lo = socket.inet_pton(af, params.addr_any)
        hi = socket.inet_pton(af, params.addr_bcast)
        vapi.ipsec_spd_add_del_entry(spd, vpp_sa, lo, hi, lo, hi,
                                     is_ipv6=v6,
                                     protocol=socket.IPPROTO_AH)
        vapi.ipsec_spd_add_del_entry(spd, vpp_sa, lo, hi, lo, hi,
                                     is_outbound=0, is_ipv6=v6,
                                     protocol=socket.IPPROTO_AH)

        # policy=3 entries between the remote tunnel host and, in turn,
        # pg1's remote address (priority 10) and pg0's local address
        # (priority 20).  For each peer an inbound entry is followed by
        # the mirrored entry in the default direction.
        # NOTE(review): policy=3 is presumably "protect" - confirm against
        # the VPP IPsec API.
        host = socket.inet_pton(af, params.remote_tun_if_host)
        peers = ((cls.pg1.remote_addr_n[af], 10),
                 (cls.pg0.local_addr_n[af], 20))
        for peer, prio in peers:
            vapi.ipsec_spd_add_del_entry(spd, vpp_sa,
                                         host, host, peer, peer,
                                         priority=prio, policy=3,
                                         is_outbound=0, is_ipv6=v6)
            vapi.ipsec_spd_add_del_entry(spd, scapy_sa,
                                         peer, peer, host, host,
                                         priority=prio, policy=3,
                                         is_ipv6=v6)

    @classmethod
    def config_ah_tra(cls, params):
        """
        Add the transport-mode AH SAs and SPD entries for one parameter set.

        :param params: parameter object providing the address family
            (addr_type, is_ipv6, addr_any, addr_bcast), the SA ids / SPIs
            (scapy_tra_*, vpp_tra_*) and the auth/crypto algorithm ids and
            keys.
        """
        vapi = cls.vapi
        af = params.addr_type
        v6 = params.is_ipv6
        spd = cls.tra_spd_id
        vpp_sa = params.vpp_tra_sa_id
        scapy_sa = params.scapy_tra_sa_id

        # Two non-tunnel SAs with anti-replay enabled.
        sa_specs = ((scapy_sa, params.scapy_tra_spi),
                    (vpp_sa, params.vpp_tra_spi))
        for sa_id, spi in sa_specs:
            vapi.ipsec_sad_add_del_entry(
                sa_id, spi,
                params.auth_algo_vpp_id, params.auth_key,
                params.crypt_algo_vpp_id, params.crypt_key,
                cls.vpp_ah_protocol,
                is_tunnel=0, is_tunnel_ipv6=0, use_anti_replay=1)

        # Entries matching AH itself over the whole address range, one per
        # direction (the first relies on the API's default direction).
        lo = socket.inet_pton(af, params.addr_any)
        hi = socket.inet_pton(af, params.addr_bcast)
        vapi.ipsec_spd_add_del_entry(spd, vpp_sa, lo, hi, lo, hi,
                                     is_ipv6=v6,
                                     protocol=socket.IPPROTO_AH)
        vapi.ipsec_spd_add_del_entry(spd, scapy_sa, lo, hi, lo, hi,
                                     is_outbound=0, is_ipv6=v6,
                                     protocol=socket.IPPROTO_AH)

        # policy=3 entries between the transport interface's own local and
        # remote addresses: inbound first, then the default direction.
        # Unlike tunnel mode, the address order is the same in both calls.
        local = cls.tra_if.local_addr_n[af]
        remote = cls.tra_if.remote_addr_n[af]
        vapi.ipsec_spd_add_del_entry(spd, vpp_sa,
                                     local, local, remote, remote,
                                     priority=10, policy=3,
                                     is_outbound=0, is_ipv6=v6)
        vapi.ipsec_spd_add_del_entry(spd, scapy_sa,
                                     local, local, remote, remote,
                                     priority=10, policy=3,
                                     is_ipv6=v6)

    def tearDown(self):
        """Run the parent teardown, then issue "show hardware" if VPP is
        still alive."""
        super(TemplateIpsecAh, self).tearDown()
        if self.vpp_dead:
            return
        self.vapi.cli("show hardware")
172
173
class TestIpsecAh1(TemplateIpsecAh, IpsecTraTests, IpsecTunTests):
    """ Ipsec AH - TUN & TRA tests """
    # Transport (tra*) and tunnel (tun*) variants use the same AH node
    # names, so each pair is assigned together.
    # NOTE(review): presumably read by the IpsecTraTests / IpsecTunTests
    # mixins - confirm in template_ipsec.
    tra4_encrypt_node_name = tun4_encrypt_node_name = "ah4-encrypt"
    tra4_decrypt_node_name = tun4_decrypt_node_name = "ah4-decrypt"
    tra6_encrypt_node_name = tun6_encrypt_node_name = "ah6-encrypt"
    tra6_decrypt_node_name = tun6_decrypt_node_name = "ah6-decrypt"
184
185
class TestIpsecAh2(TemplateIpsecAh, IpsecTcpTests):
    """ Ipsec AH - TCP tests """
    # No overrides: the fixture and the test cases both come from the bases.
189
190
# Running this module directly hands the tests to VppTestRunner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)