make test: ipsec test cleanup
[vpp.git] / test / test_ipsec_ah.py
1 import socket
2 import unittest
3
4 from scapy.layers.inet import IP, ICMP
5 from scapy.layers.l2 import Ether, Raw
6 from scapy.layers.ipsec import SecurityAssociation, AH
7
8 from framework import VppTestCase, VppTestRunner
9
10
class TestIpsecAh(VppTestCase):
    """
    Basic test for IPSEC using AH transport and Tunnel mode

    Below 4 cases are covered as part of this test
    1) ipsec ah v4 transport basic test  - IPv4 Transport mode
     scenario using HMAC-SHA1-96 integrity algo
    2) ipsec ah v4 transport burst test
     Above test for 257 pkts
    3) ipsec ah 4o4 tunnel basic test    - IPv4 Tunnel mode
     scenario using HMAC-SHA1-96 integrity algo
    4) ipsec ah 4o4 tunnel burst test
     Above test for 257 pkts

    TRANSPORT MODE:

     ---   encrypt   ---
    |pg2| <-------> |VPP|
     ---   decrypt   ---

    TUNNEL MODE:

     ---   encrypt   ---   plain   ---
    |pg0| ------->  |VPP| ------> |pg1|
     ---             ---           ---

     ---   decrypt   ---   plain   ---
    |pg0| <-------  |VPP| <------ |pg1|
     ---             ---           ---

    Note : IPv6 is not covered
    """

    # Addresses routed via pg0 / pg1 (see configAhTun); they are the inner
    # source / destination of the tunnel-mode test traffic.
    remote_pg0_lb_addr = '1.1.1.1'
    remote_pg1_lb_addr = '2.2.2.2'

    @classmethod
    def setUpClass(cls):
        """Create pg0..pg2, bring them up and install both IPsec configs.

        If any step raises, the parent tearDownClass is invoked before the
        exception is re-raised.
        """
        super(TestIpsecAh, cls).setUpClass()
        try:
            cls.create_pg_interfaces(range(3))
            cls.interfaces = list(cls.pg_interfaces)
            for i in cls.interfaces:
                i.admin_up()
                i.config_ip4()
                i.resolve_arp()
            cls.logger.info(cls.vapi.ppcli("show int addr"))
            cls.configAhTra()
            cls.logger.info(cls.vapi.ppcli("show ipsec"))
            cls.configAhTun()
            cls.logger.info(cls.vapi.ppcli("show ipsec"))
        except Exception:
            super(TestIpsecAh, cls).tearDownClass()
            raise

    @classmethod
    def configAhTun(cls):
        """Configure the AH tunnel-mode SAs and SPD (id 1) on pg0.

        Adds /32 routes for the two test addresses, two SAD entries
        (SPI 1001 with pg0 local -> remote addresses, SPI 1000 with pg0
        remote -> local addresses), binds SPD 1 to pg0 and installs four
        SPD entries: two matching AH itself and two priority-10 entries
        that reference the SAs.
        """
        spd_id = 1
        remote_sa_id = 10
        local_sa_id = 20
        remote_tun_spi = 1001
        local_tun_spi = 1000

        # Make the test addresses reachable behind pg0 and pg1.
        src4 = socket.inet_pton(socket.AF_INET, cls.remote_pg0_lb_addr)
        cls.vapi.ip_add_del_route(src4, 32, cls.pg0.remote_ip4n)
        dst4 = socket.inet_pton(socket.AF_INET, cls.remote_pg1_lb_addr)
        cls.vapi.ip_add_del_route(dst4, 32, cls.pg1.remote_ip4n)

        # NOTE(review): the two address arguments are presumably the
        # tunnel source and destination - confirm against the vapi wrapper.
        cls.vapi.ipsec_sad_add_del_entry(
            remote_sa_id,
            remote_tun_spi,
            cls.pg0.local_ip4n,
            cls.pg0.remote_ip4n,
            integrity_key_length=20)
        cls.vapi.ipsec_sad_add_del_entry(
            local_sa_id,
            local_tun_spi,
            cls.pg0.remote_ip4n,
            cls.pg0.local_ip4n,
            integrity_key_length=20)

        cls.vapi.ipsec_spd_add_del(spd_id)
        cls.vapi.ipsec_interface_add_del_spd(spd_id, cls.pg0.sw_if_index)

        # Entries covering the whole IPv4 range for the AH protocol itself,
        # one with the default direction and one with is_outbound=0.
        l_startaddr = r_startaddr = socket.inet_pton(
            socket.AF_INET, "0.0.0.0")
        l_stopaddr = r_stopaddr = socket.inet_pton(
            socket.AF_INET, "255.255.255.255")
        cls.vapi.ipsec_spd_add_del_entry(
            spd_id,
            l_startaddr,
            l_stopaddr,
            r_startaddr,
            r_stopaddr,
            protocol=socket.IPPROTO_AH)
        cls.vapi.ipsec_spd_add_del_entry(
            spd_id,
            l_startaddr,
            l_stopaddr,
            r_startaddr,
            r_stopaddr,
            protocol=socket.IPPROTO_AH,
            is_outbound=0)

        # Single-address entries tying the test traffic to the SAs.
        # NOTE(review): policy=3 is presumably the "protect" action -
        # confirm against the vapi wrapper / VPP API definition.
        l_startaddr = l_stopaddr = socket.inet_pton(
            socket.AF_INET, cls.remote_pg0_lb_addr)
        r_startaddr = r_stopaddr = socket.inet_pton(
            socket.AF_INET, cls.remote_pg1_lb_addr)
        cls.vapi.ipsec_spd_add_del_entry(
            spd_id,
            l_startaddr,
            l_stopaddr,
            r_startaddr,
            r_stopaddr,
            priority=10,
            policy=3,
            is_outbound=0,
            sa_id=local_sa_id)
        # Address pairs are deliberately swapped for the other direction.
        cls.vapi.ipsec_spd_add_del_entry(
            spd_id,
            r_startaddr,
            r_stopaddr,
            l_startaddr,
            l_stopaddr,
            priority=10,
            policy=3,
            sa_id=remote_sa_id)

    @classmethod
    def configAhTra(cls):
        """Configure the AH transport-mode SAs and SPD (id 2) on pg2.

        Adds two non-tunnel SAD entries (SPI 2001 and SPI 2000), binds
        SPD 2 to pg2 and installs four SPD entries: two matching AH itself
        and two priority-10 entries between pg2's local and remote
        addresses that reference the SAs.
        """
        spd_id = 2
        remote_sa_id = 30
        local_sa_id = 40
        remote_tra_spi = 2001
        local_tra_spi = 2000

        cls.vapi.ipsec_sad_add_del_entry(
            remote_sa_id,
            remote_tra_spi,
            integrity_key_length=20,
            is_tunnel=0)
        cls.vapi.ipsec_sad_add_del_entry(
            local_sa_id,
            local_tra_spi,
            integrity_key_length=20,
            is_tunnel=0)

        cls.vapi.ipsec_spd_add_del(spd_id)
        cls.vapi.ipsec_interface_add_del_spd(spd_id, cls.pg2.sw_if_index)

        # Entries covering the whole IPv4 range for the AH protocol itself,
        # one with the default direction and one with is_outbound=0.
        l_startaddr = r_startaddr = socket.inet_pton(
            socket.AF_INET, "0.0.0.0")
        l_stopaddr = r_stopaddr = socket.inet_pton(
            socket.AF_INET, "255.255.255.255")
        cls.vapi.ipsec_spd_add_del_entry(
            spd_id,
            l_startaddr,
            l_stopaddr,
            r_startaddr,
            r_stopaddr,
            protocol=socket.IPPROTO_AH)
        cls.vapi.ipsec_spd_add_del_entry(
            spd_id,
            l_startaddr,
            l_stopaddr,
            r_startaddr,
            r_stopaddr,
            protocol=socket.IPPROTO_AH,
            is_outbound=0)

        # Entries between pg2's own addresses, tying the traffic to the SAs.
        # NOTE(review): policy=3 is presumably the "protect" action -
        # confirm against the vapi wrapper / VPP API definition.
        l_startaddr = l_stopaddr = cls.pg2.local_ip4n
        r_startaddr = r_stopaddr = cls.pg2.remote_ip4n
        cls.vapi.ipsec_spd_add_del_entry(
            spd_id,
            l_startaddr,
            l_stopaddr,
            r_startaddr,
            r_stopaddr,
            priority=10,
            policy=3,
            is_outbound=0,
            sa_id=local_sa_id)
        cls.vapi.ipsec_spd_add_del_entry(
            spd_id,
            l_startaddr,
            l_stopaddr,
            r_startaddr,
            r_stopaddr,
            priority=10,
            policy=3,
            sa_id=remote_sa_id)

    def configScapySA(self, is_tun=False):
        """Build the scapy-side security associations used by the tests.

        :param is_tun: when true, create self.remote_tun_sa and
            self.local_tun_sa (with an outer IP tunnel header); otherwise
            create self.remote_tra_sa and self.local_tra_sa.

        The "remote" SA is the one the tests encrypt with before sending
        to VPP; the "local" SA is the one they decrypt VPP's output with.
        """
        if is_tun:
            # 0x3e8 == 1000, the value of local_tun_spi in configAhTun.
            self.remote_tun_sa = SecurityAssociation(
                AH,
                spi=0x000003e8,
                auth_algo='HMAC-SHA1-96',
                auth_key='C91KUR9GYMm5GfkEvNjX',
                tunnel_header=IP(
                    src=self.pg0.remote_ip4,
                    dst=self.pg0.local_ip4))
            # 0x3e9 == 1001, the value of remote_tun_spi in configAhTun.
            self.local_tun_sa = SecurityAssociation(
                AH,
                spi=0x000003e9,
                auth_algo='HMAC-SHA1-96',
                auth_key='C91KUR9GYMm5GfkEvNjX',
                tunnel_header=IP(
                    dst=self.pg0.remote_ip4,
                    src=self.pg0.local_ip4))
        else:
            # 0x7d0 == 2000, the value of local_tra_spi in configAhTra.
            self.remote_tra_sa = SecurityAssociation(
                AH, spi=0x000007d0, auth_algo='HMAC-SHA1-96',
                auth_key='C91KUR9GYMm5GfkEvNjX')
            # 0x7d1 == 2001, the value of remote_tra_spi in configAhTra.
            self.local_tra_sa = SecurityAssociation(
                AH, spi=0x000007d1, auth_algo='HMAC-SHA1-96',
                auth_key='C91KUR9GYMm5GfkEvNjX')

    def tearDown(self):
        """Run the parent tearDown, then dump "show hardware" if VPP lives."""
        super(TestIpsecAh, self).tearDown()
        if not self.vpp_dead:
            self.vapi.cli("show hardware")

    def send_and_expect(self, input, pkts, output, count=1):
        """Send pkts on one pg interface and capture on another.

        :param input: interface the stream is added to.
        :param pkts: packets to send.
        :param output: interface the capture is read from.
        :param count: number of packets passed to get_capture().
        :returns: the capture returned by output.get_capture(count).
        """
        input.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        rx = output.get_capture(count)
        return rx

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1):
        """Return count references to one ICMP packet protected by sa.

        The packet is built and encrypted once; the returned list repeats
        that same object count times.
        """
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=src, dst=dst) / ICMP() /
                "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")
                ] * count

    def gen_pkts(self, sw_intf, src, dst, count=1):
        """Return count references to one plain (unprotected) ICMP packet."""
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src=src, dst=dst) / ICMP() /
                "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
                ] * count

    def test_ipsec_ah_tra_basic(self, count=1):
        """ ipsec ah v4 transport basic test """
        try:
            self.configScapySA()
            send_pkts = self.gen_encrypt_pkts(
                self.remote_tra_sa,
                self.pg2,
                src=self.pg2.remote_ip4,
                dst=self.pg2.local_ip4,
                count=count)
            recv_pkts = self.send_and_expect(
                self.pg2, send_pkts, self.pg2, count=count)
            # AH TRA VPP authentication verification
            for pkt in recv_pkts:
                # Keep the first 12 bytes (96 bits, per HMAC-SHA1-96) as the
                # ICV and move the remainder into the padding field.
                # NOTE(review): presumably needed because scapy dissects
                # the trailing padding as part of icv - confirm.
                pkt[AH].padding = pkt[AH].icv[12:]
                pkt[AH].icv = pkt[AH].icv[:12]
                # NOTE(review): the return value is unused; decrypt() is
                # presumably relied on to raise when the ICV is invalid.
                self.local_tra_sa.decrypt(pkt[IP])
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))

    def test_ipsec_ah_tra_burst(self):
        """ ipsec ah v4 transport burst test """
        # The basic test already logs "show error" / "show ipsec" in its
        # own finally clause, so no extra logging is needed here.
        self.test_ipsec_ah_tra_basic(count=257)

    def test_ipsec_ah_tun_basic(self, count=1):
        """ ipsec ah 4o4 tunnel basic test """
        try:
            self.configScapySA(is_tun=True)
            send_pkts = self.gen_encrypt_pkts(
                self.remote_tun_sa,
                self.pg0,
                src=self.remote_pg0_lb_addr,
                dst=self.remote_pg1_lb_addr,
                count=count)
            recv_pkts = self.send_and_expect(
                self.pg0, send_pkts, self.pg1, count=count)
            # AH TUN VPP inbound verification: the packets captured on pg1
            # must carry the original inner addresses.
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IP].src, self.remote_pg0_lb_addr)
                self.assert_equal(recv_pkt[IP].dst, self.remote_pg1_lb_addr)
            send_pkts = self.gen_pkts(
                self.pg1,
                src=self.remote_pg1_lb_addr,
                dst=self.remote_pg0_lb_addr,
                count=count)
            recv_pkts = self.send_and_expect(
                self.pg1, send_pkts, self.pg0, count=count)
            # AH TUN VPP outbound verification: strip AH with the scapy SA
            # and check the inner addresses.
            for recv_pkt in recv_pkts:
                decrypt_pkt = self.local_tun_sa.decrypt(recv_pkt[IP])
                # Re-parse the Raw payload of the decrypted packet as IP.
                decrypt_pkt = IP(decrypt_pkt[Raw].load)
                self.assert_equal(decrypt_pkt.src, self.remote_pg1_lb_addr)
                self.assert_equal(decrypt_pkt.dst, self.remote_pg0_lb_addr)
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))

    def test_ipsec_ah_tun_burst(self):
        """ ipsec ah 4o4 tunnel burst test """
        # The basic test already logs "show error" / "show ipsec" in its
        # own finally clause, so no extra logging is needed here.
        self.test_ipsec_ah_tun_basic(count=257)
319
320
# When executed directly, run this module's tests with the VPP test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)