IPSEC: tests use object registry
[vpp.git] / test / test_ipsec_ah.py
1 import socket
2 import unittest
3
4 from scapy.layers.ipsec import AH
5
6 from framework import VppTestRunner
7 from template_ipsec import TemplateIpsec, IpsecTraTests, IpsecTunTests
8 from template_ipsec import IpsecTcpTests
9 from vpp_ipsec import *
10 from vpp_ip_route import VppIpRoute, VppRoutePath
11 from vpp_ip import DpoProto
12
13
class TemplateIpsecAh(TemplateIpsec):
    """
    Basic test for IPSEC using AH transport and Tunnel mode

    TRANSPORT MODE:

     ---   encrypt   ---
    |pg2| <-------> |VPP|
     ---   decrypt   ---

    TUNNEL MODE:

     ---   encrypt   ---   plain   ---
    |pg0| <-------  |VPP| <------ |pg1|
     ---             ---           ---

     ---   decrypt   ---   plain   ---
    |pg0| ------->  |VPP| ------> |pg1|
     ---             ---           ---
    """

    def setUp(self):
        """Create the AH test configuration.

        After the base-class setUp this selects AH as the encryption type,
        uses pg0 as the tunnel interface and pg2 as the transport interface,
        creates one SPD per mode bound to its interface, and then, for every
        parameter set in ``self.params``, configures the transport-mode
        objects, the tunnel-mode objects and finally a route towards the
        remote tunnel host.
        """
        super(TemplateIpsecAh, self).setUp()

        self.encryption_type = AH
        self.tun_if = self.pg0
        self.tra_if = self.pg2
        self.logger.info(self.vapi.ppcli("show int addr"))

        # Transport-mode SPD, bound to the transport interface.
        self.tra_spd = VppIpsecSpd(self, self.tra_spd_id)
        self.tra_spd.add_vpp_config()
        tra_binding = VppIpsecSpdItfBinding(self, self.tra_spd, self.tra_if)
        tra_binding.add_vpp_config()

        # Tunnel-mode SPD, bound to the tunnel interface.
        self.tun_spd = VppIpsecSpd(self, self.tun_spd_id)
        self.tun_spd.add_vpp_config()
        tun_binding = VppIpsecSpdItfBinding(self, self.tun_spd, self.tun_if)
        tun_binding.add_vpp_config()

        # The three passes are kept separate on purpose: all transport
        # configuration is pushed before any tunnel configuration, and the
        # routes are added last.
        for param_set in self.params.values():
            self.config_ah_tra(param_set)
            self.configure_sa_tra(param_set)
            self.logger.info(self.vapi.ppcli("show ipsec"))

        for param_set in self.params.values():
            self.config_ah_tun(param_set)
            self.logger.info(self.vapi.ppcli("show ipsec"))

        for param_set in self.params.values():
            if param_set.is_ipv6:
                dpo_proto = DpoProto.DPO_PROTO_IP6
            else:
                dpo_proto = DpoProto.DPO_PROTO_IP4
            # NOTE(review): 0xffffffff is presumably the "no interface"
            # index for the path - confirm against VppRoutePath.
            path = VppRoutePath(self.tun_if.remote_addr[param_set.addr_type],
                                0xffffffff,
                                proto=dpo_proto)
            route = VppIpRoute(self, param_set.remote_tun_if_host,
                               param_set.addr_len,
                               [path],
                               is_ip6=param_set.is_ipv6)
            route.add_vpp_config()

    def tearDown(self):
        """Run the base-class tearDown, then issue "show hardware" if VPP
        is still alive."""
        super(TemplateIpsecAh, self).tearDown()
        if self.vpp_dead:
            return
        self.vapi.cli("show hardware")

    def config_ah_tun(self, params):
        """Add the tunnel-mode SAs and SPD entries for one parameter set.

        :param params: parameter object providing the address type, the
            scapy/VPP tunnel SA ids and SPIs, the authentication and crypto
            algorithm ids and keys, the remote tunnel host address and the
            any/broadcast addresses.
        """
        af = params.addr_type
        if_local = self.tun_if.local_addr[af]
        if_remote = self.tun_if.remote_addr[af]
        scapy_sa_id = params.scapy_tun_sa_id
        vpp_sa_id = params.vpp_tun_sa_id
        any_addr = params.addr_any
        bcast_addr = params.addr_bcast

        # Two SAs with the interface addresses swapped between them.
        # NOTE(review): the trailing two addresses are presumably the tunnel
        # source and destination - confirm against VppIpsecSA.
        sa_specs = (
            (scapy_sa_id, params.scapy_tun_spi, if_local, if_remote),
            (vpp_sa_id, params.vpp_tun_spi, if_remote, if_local),
        )
        for sa_id, spi, first_ep, second_ep in sa_specs:
            VppIpsecSA(self, sa_id, spi,
                       params.auth_algo_vpp_id, params.auth_key,
                       params.crypt_algo_vpp_id, params.crypt_key,
                       self.vpp_ah_protocol,
                       first_ep,
                       second_ep).add_vpp_config()

        # Entries for the AH protocol itself over the any..broadcast range,
        # once with the default direction and once with is_outbound=0.
        ah_entry = VppIpsecSpdEntry(self, self.tun_spd, vpp_sa_id,
                                    any_addr, bcast_addr,
                                    any_addr, bcast_addr,
                                    socket.IPPROTO_AH)
        ah_entry.add_vpp_config()
        ah_entry_in = VppIpsecSpdEntry(self, self.tun_spd, vpp_sa_id,
                                       any_addr, bcast_addr,
                                       any_addr, bcast_addr,
                                       socket.IPPROTO_AH,
                                       is_outbound=0)
        ah_entry_in.add_vpp_config()

        # Entries between the remote tunnel host and, in turn, pg1's remote
        # address (priority 10) and pg0's local address (priority 20).  Each
        # pair is an is_outbound=0 entry using the VPP SA followed by a
        # default-direction entry using the scapy SA with the address ranges
        # mirrored.
        # NOTE(review): policy=3 is presumably the "protect" policy and the
        # positional 0 the protocol selector - confirm against
        # VppIpsecSpdEntry.
        host = params.remote_tun_if_host
        for prio, inner in ((10, self.pg1.remote_addr[af]),
                            (20, self.pg0.local_addr[af])):
            VppIpsecSpdEntry(self, self.tun_spd, vpp_sa_id,
                             host, host,
                             inner, inner,
                             0, priority=prio, policy=3,
                             is_outbound=0).add_vpp_config()
            VppIpsecSpdEntry(self, self.tun_spd, scapy_sa_id,
                             inner, inner,
                             host, host,
                             0, priority=prio, policy=3).add_vpp_config()

    def config_ah_tra(self, params):
        """Add the transport-mode SAs and SPD entries for one parameter set.

        :param params: parameter object providing the address type, the
            scapy/VPP transport SA ids and SPIs, the authentication and
            crypto algorithm ids and keys and the any/broadcast addresses.
        """
        af = params.addr_type
        scapy_sa_id = params.scapy_tra_sa_id
        vpp_sa_id = params.vpp_tra_sa_id
        any_addr = params.addr_any
        bcast_addr = params.addr_bcast

        # Both SAs are created with use_anti_replay=1 and no addresses.
        for sa_id, spi in ((scapy_sa_id, params.scapy_tra_spi),
                           (vpp_sa_id, params.vpp_tra_spi)):
            VppIpsecSA(self, sa_id, spi,
                       params.auth_algo_vpp_id, params.auth_key,
                       params.crypt_algo_vpp_id, params.crypt_key,
                       self.vpp_ah_protocol,
                       use_anti_replay=1).add_vpp_config()

        # Entries for the AH protocol itself over the any..broadcast range:
        # default direction with the VPP SA, is_outbound=0 with the scapy SA.
        ah_entry = VppIpsecSpdEntry(self, self.tra_spd, vpp_sa_id,
                                    any_addr, bcast_addr,
                                    any_addr, bcast_addr,
                                    socket.IPPROTO_AH)
        ah_entry.add_vpp_config()
        ah_entry_in = VppIpsecSpdEntry(self, self.tra_spd, scapy_sa_id,
                                       any_addr, bcast_addr,
                                       any_addr, bcast_addr,
                                       socket.IPPROTO_AH,
                                       is_outbound=0)
        ah_entry_in.add_vpp_config()

        # Priority-10 entries between the transport interface's local and
        # remote addresses, one per direction.
        # NOTE(review): policy=3 is presumably the "protect" policy and the
        # positional 0 the protocol selector - confirm against
        # VppIpsecSpdEntry.
        if_local = self.tra_if.local_addr[af]
        if_remote = self.tra_if.remote_addr[af]
        VppIpsecSpdEntry(self, self.tra_spd, vpp_sa_id,
                         if_local, if_local,
                         if_remote, if_remote,
                         0, priority=10, policy=3,
                         is_outbound=0).add_vpp_config()
        VppIpsecSpdEntry(self, self.tra_spd, scapy_sa_id,
                         if_local, if_local,
                         if_remote, if_remote,
                         0, priority=10, policy=3).add_vpp_config()
183
184
class TestIpsecAh1(TemplateIpsecAh, IpsecTraTests, IpsecTunTests):
    """ Ipsec AH - TUN & TRA tests """
    # Transport ("tra") and tunnel ("tun") attributes carry the same AH node
    # names, so each name is assigned to both attributes at once.
    # NOTE(review): presumably read by the IpsecTraTests / IpsecTunTests
    # mixins - confirm in template_ipsec.
    tra4_encrypt_node_name = tun4_encrypt_node_name = "ah4-encrypt"
    tra4_decrypt_node_name = tun4_decrypt_node_name = "ah4-decrypt"
    tra6_encrypt_node_name = tun6_encrypt_node_name = "ah6-encrypt"
    tra6_decrypt_node_name = tun6_decrypt_node_name = "ah6-decrypt"
195
196
class TestIpsecAh2(TemplateIpsecAh, IpsecTcpTests):
    """ Ipsec AH - TCP tests """
    # No body of its own: the class only combines the AH template with the
    # IpsecTcpTests mixin.
200
201
# Running this module as a script hands control to unittest, using the
# VPP-specific test runner.
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)