Revert "ipsec: VPP-1316 calculate IP/TCP/UDP inner checksums"
[vpp.git] / test / test_ipsec_esp.py
1 import socket
2 import unittest
3
4 from scapy.layers.inet import IP, ICMP
5 from scapy.layers.l2 import Ether
6 from scapy.layers.ipsec import SecurityAssociation, ESP
7
8 from framework import VppTestCase, VppTestRunner
9
10
class TestIpsecEsp(VppTestCase):
    """
    Basic test for ipsec esp sanity - tunnel and transport modes.

    Below 4 cases are covered as part of this test
    1) ipsec esp v4 transport basic test  - IPv4 Transport mode
        scenario using HMAC-SHA1-96 integrity algo
    2) ipsec esp v4 transport burst test
        Above test for 257 pkts
    3) ipsec esp 4o4 tunnel basic test    - IPv4 Tunnel mode
        scenario using HMAC-SHA1-96 integrity algo
    4) ipsec esp 4o4 tunnel burst test
        Above test for 257 pkts

    TRANSPORT MODE:

     ---   encrypt   ---
    |pg2| <-------> |VPP|
     ---   decrypt   ---

    TUNNEL MODE:

     ---   encrypt   ---   plain   ---
    |pg0| <-------  |VPP| <------ |pg1|
     ---             ---           ---

     ---   decrypt   ---   plain   ---
    |pg0| ------->  |VPP| ------> |pg1|
     ---             ---           ---

    Note : IPv6 is not covered
    """

    # Addresses "behind" pg0 / pg1 used as inner src/dst of tunnel traffic.
    remote_pg0_lb_addr = '1.1.1.1'
    remote_pg1_lb_addr = '2.2.2.2'

    # SPIs shared between the VPP SAD entries and the scapy SAs, so the two
    # sides cannot drift apart.  "to_vpp" SPIs are carried by packets the
    # test encrypts and injects into VPP; "from_vpp" SPIs are carried by
    # packets captured from VPP and decrypted by the test.
    _esp_tun_spi_to_vpp = 1000
    _esp_tun_spi_from_vpp = 1001
    _esp_tra_spi_to_vpp = 2000
    _esp_tra_spi_from_vpp = 2001

    # Algorithms and keys used by every scapy SA in this test.  The SAD
    # entries configured below pass only key *lengths* (16 / 20), not the
    # keys themselves.
    # NOTE(review): presumably the vapi wrapper defaults to these same key
    # values - confirm if either side is changed.
    _esp_crypt_algo = 'AES-CBC'
    _esp_crypt_key = 'JPjyOWBeVEQiMe7h'
    _esp_auth_algo = 'HMAC-SHA1-96'
    _esp_auth_key = 'C91KUR9GYMm5GfkEvNjX'

    # ICMP payload carried by every generated packet.
    _esp_payload = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"

    @classmethod
    def setUpClass(cls):
        # Bring up pg0..pg2 with IPv4, then install the transport-mode and
        # tunnel-mode IPsec configuration once for all tests of the class.
        super(TestIpsecEsp, cls).setUpClass()
        try:
            cls.create_pg_interfaces(range(3))
            cls.interfaces = list(cls.pg_interfaces)
            for i in cls.interfaces:
                i.admin_up()
                i.config_ip4()
                i.resolve_arp()
            cls.logger.info(cls.vapi.ppcli("show int addr"))
            cls.configEspTra()
            cls.logger.info(cls.vapi.ppcli("show ipsec"))
            cls.configEspTun()
            cls.logger.info(cls.vapi.ppcli("show ipsec"))
        except Exception:
            # Undo the parent's class setup before propagating the failure.
            super(TestIpsecEsp, cls).tearDownClass()
            raise

    @classmethod
    def _add_esp_proto_spd_entries(cls, spd_id):
        # Add two SPD entries to `spd_id` that match protocol ESP over the
        # whole IPv4 range (0.0.0.0 - 255.255.255.255) for both local and
        # remote addresses: one with the API's default direction and one
        # with is_outbound=0.  Policy and priority are left at the API
        # defaults.
        # NOTE(review): the default policy is presumably "bypass", letting
        # already-encrypted ESP packets through - confirm against vapi.
        any_start = socket.inet_pton(socket.AF_INET, "0.0.0.0")
        any_stop = socket.inet_pton(socket.AF_INET, "255.255.255.255")
        cls.vapi.ipsec_spd_add_del_entry(
            spd_id,
            any_start,
            any_stop,
            any_start,
            any_stop,
            protocol=socket.IPPROTO_ESP)
        cls.vapi.ipsec_spd_add_del_entry(
            spd_id,
            any_start,
            any_stop,
            any_start,
            any_stop,
            protocol=socket.IPPROTO_ESP,
            is_outbound=0)

    @classmethod
    def configEspTun(cls):
        # Configure ESP tunnel mode on pg0: host routes for the two inner
        # addresses, one SA per direction between pg0's local and remote
        # addresses, SPD 1 bound to pg0, and the SPD entries that tie the
        # inner address pair to those SAs.
        spd_id = 1
        remote_sa_id = 10
        local_sa_id = 20

        # /32 routes so the inner addresses are reachable via pg0 / pg1.
        inner_pg0 = socket.inet_pton(socket.AF_INET, cls.remote_pg0_lb_addr)
        cls.vapi.ip_add_del_route(inner_pg0, 32, cls.pg0.remote_ip4n)
        inner_pg1 = socket.inet_pton(socket.AF_INET, cls.remote_pg1_lb_addr)
        cls.vapi.ip_add_del_route(inner_pg1, 32, cls.pg1.remote_ip4n)

        # NOTE(review): protocol=1 presumably selects ESP (as opposed to
        # AH) in the VPP API - confirm.
        # SA whose tunnel runs from pg0 local to pg0 remote address.
        cls.vapi.ipsec_sad_add_del_entry(
            remote_sa_id,
            cls._esp_tun_spi_from_vpp,
            cls.pg0.local_ip4n,
            cls.pg0.remote_ip4n,
            integrity_key_length=20,
            crypto_key_length=16,
            protocol=1)
        # SA whose tunnel runs from pg0 remote to pg0 local address.
        cls.vapi.ipsec_sad_add_del_entry(
            local_sa_id,
            cls._esp_tun_spi_to_vpp,
            cls.pg0.remote_ip4n,
            cls.pg0.local_ip4n,
            integrity_key_length=20,
            crypto_key_length=16,
            protocol=1)

        cls.vapi.ipsec_spd_add_del(spd_id)
        cls.vapi.ipsec_interface_add_del_spd(spd_id, cls.pg0.sw_if_index)
        cls._add_esp_proto_spd_entries(spd_id)

        # NOTE(review): policy=3 is presumably the "protect" action -
        # confirm against the VPP API definition.
        # is_outbound=0 entry: 1.1.1.1 -> 2.2.2.2 traffic uses local SA.
        cls.vapi.ipsec_spd_add_del_entry(
            spd_id,
            inner_pg0,
            inner_pg0,
            inner_pg1,
            inner_pg1,
            priority=10,
            policy=3,
            is_outbound=0,
            sa_id=local_sa_id)
        # Default-direction entry: address roles are swapped here.
        cls.vapi.ipsec_spd_add_del_entry(
            spd_id,
            inner_pg1,
            inner_pg1,
            inner_pg0,
            inner_pg0,
            priority=10,
            policy=3,
            sa_id=remote_sa_id)

    @classmethod
    def configEspTra(cls):
        # Configure ESP transport mode on pg2: one non-tunnel SA per
        # direction, SPD 2 bound to pg2, and the SPD entries that tie
        # pg2's local/remote address pair to those SAs.
        spd_id = 2
        remote_sa_id = 30
        local_sa_id = 40

        # NOTE(review): protocol=1 presumably selects ESP (as opposed to
        # AH) in the VPP API - confirm.
        cls.vapi.ipsec_sad_add_del_entry(
            remote_sa_id,
            cls._esp_tra_spi_from_vpp,
            integrity_key_length=20,
            crypto_key_length=16,
            protocol=1,
            is_tunnel=0)
        cls.vapi.ipsec_sad_add_del_entry(
            local_sa_id,
            cls._esp_tra_spi_to_vpp,
            integrity_key_length=20,
            crypto_key_length=16,
            protocol=1,
            is_tunnel=0)

        cls.vapi.ipsec_spd_add_del(spd_id)
        cls.vapi.ipsec_interface_add_del_spd(spd_id, cls.pg2.sw_if_index)
        cls._add_esp_proto_spd_entries(spd_id)

        local_addr = cls.pg2.local_ip4n
        remote_addr = cls.pg2.remote_ip4n
        # NOTE(review): policy=3 is presumably the "protect" action -
        # confirm against the VPP API definition.
        cls.vapi.ipsec_spd_add_del_entry(
            spd_id,
            local_addr,
            local_addr,
            remote_addr,
            remote_addr,
            priority=10,
            policy=3,
            is_outbound=0,
            sa_id=local_sa_id)
        cls.vapi.ipsec_spd_add_del_entry(
            spd_id,
            local_addr,
            local_addr,
            remote_addr,
            remote_addr,
            priority=10,
            policy=3,
            sa_id=remote_sa_id)

    def configScapySA(self, is_tun=False):
        # Build the scapy-side security associations that mirror the VPP
        # configuration.
        #
        # is_tun=True  sets self.remote_tun_sa / self.local_tun_sa, each
        #              with an outer IP header between pg0's addresses.
        # is_tun=False sets self.remote_tra_sa / self.local_tra_sa.
        #
        # The "remote" SA is what the test encrypts with before injecting
        # packets; the "local" SA is what it decrypts captured packets
        # with.
        common = dict(
            crypt_algo=self._esp_crypt_algo,
            crypt_key=self._esp_crypt_key,
            auth_algo=self._esp_auth_algo,
            auth_key=self._esp_auth_key)
        if is_tun:
            self.remote_tun_sa = SecurityAssociation(
                ESP,
                spi=self._esp_tun_spi_to_vpp,
                tunnel_header=IP(
                    src=self.pg0.remote_ip4,
                    dst=self.pg0.local_ip4),
                **common)
            self.local_tun_sa = SecurityAssociation(
                ESP,
                spi=self._esp_tun_spi_from_vpp,
                tunnel_header=IP(
                    dst=self.pg0.remote_ip4,
                    src=self.pg0.local_ip4),
                **common)
        else:
            self.remote_tra_sa = SecurityAssociation(
                ESP,
                spi=self._esp_tra_spi_to_vpp,
                **common)
            self.local_tra_sa = SecurityAssociation(
                ESP,
                spi=self._esp_tra_spi_from_vpp,
                **common)

    def tearDown(self):
        super(TestIpsecEsp, self).tearDown()
        # Only query VPP if it is still running.
        if not self.vpp_dead:
            self.vapi.cli("show hardware")

    def send_and_expect(self, input, pkts, output, count=1):
        # Queue `pkts` on interface `input`, start the packet generator
        # with capture enabled on all pg interfaces, and return the
        # capture of `count` packets taken from interface `output`.
        input.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        return output.get_capture(count)

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1):
        # Return a list of `count` references to one Ethernet frame whose
        # payload is an IP/ICMP packet (src -> dst) encrypted with `sa`.
        # The frame is addressed from sw_intf's remote MAC to its local
        # MAC.
        inner = IP(src=src, dst=dst) / ICMP() / self._esp_payload
        frame = (Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                 sa.encrypt(inner))
        return [frame] * count

    def gen_pkts(self, sw_intf, src, dst, count=1):
        # Return a list of `count` references to one plain (unencrypted)
        # Ethernet/IP/ICMP frame from `src` to `dst`, addressed from
        # sw_intf's remote MAC to its local MAC.
        frame = (Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                 IP(src=src, dst=dst) / ICMP() / self._esp_payload)
        return [frame] * count

    def test_ipsec_esp_tra_basic(self, count=1):
        """ ipsec esp v4 transport basic test """
        try:
            self.configScapySA()
            send_pkts = self.gen_encrypt_pkts(
                self.remote_tra_sa,
                self.pg2,
                src=self.pg2.remote_ip4,
                dst=self.pg2.local_ip4,
                count=count)
            recv_pkts = self.send_and_expect(
                self.pg2, send_pkts, self.pg2, count=count)
            # ESP TRA VPP encryption/decryption verification: every
            # captured packet must be decryptable with the local SA.
            for rx_pkt in recv_pkts:
                self.local_tra_sa.decrypt(rx_pkt[IP])
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))

    def test_ipsec_esp_tra_burst(self):
        """ ipsec esp v4 transport burst test """
        # The basic test already logs "show error" / "show ipsec" in its
        # own finally block, so no extra logging is needed here.
        self.test_ipsec_esp_tra_basic(count=257)

    def test_ipsec_esp_tun_basic(self, count=1):
        """ ipsec esp 4o4 tunnel basic test """
        try:
            self.configScapySA(is_tun=True)

            # pg0 -> VPP -> pg1: inject encrypted, expect plain on pg1.
            send_pkts = self.gen_encrypt_pkts(
                self.remote_tun_sa,
                self.pg0,
                src=self.remote_pg0_lb_addr,
                dst=self.remote_pg1_lb_addr,
                count=count)
            recv_pkts = self.send_and_expect(
                self.pg0, send_pkts, self.pg1, count=count)
            # ESP TUN VPP decryption verification
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IP].src, self.remote_pg0_lb_addr)
                self.assert_equal(recv_pkt[IP].dst, self.remote_pg1_lb_addr)

            # pg1 -> VPP -> pg0: inject plain, expect encrypted on pg0.
            send_pkts = self.gen_pkts(
                self.pg1,
                src=self.remote_pg1_lb_addr,
                dst=self.remote_pg0_lb_addr,
                count=count)
            recv_pkts = self.send_and_expect(
                self.pg1, send_pkts, self.pg0, count=count)
            # ESP TUN VPP encryption verification
            for recv_pkt in recv_pkts:
                decrypt_pkt = self.local_tun_sa.decrypt(recv_pkt[IP])
                self.assert_equal(decrypt_pkt.src, self.remote_pg1_lb_addr)
                self.assert_equal(decrypt_pkt.dst, self.remote_pg0_lb_addr)
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))

    def test_ipsec_esp_tun_burst(self):
        """ ipsec esp 4o4 tunnel burst test """
        # The basic test already logs "show error" / "show ipsec" in its
        # own finally block, so no extra logging is needed here.
        self.test_ipsec_esp_tun_basic(count=257)
336
337
# When executed directly, run this module's tests with the VppTestRunner
# imported from the test framework instead of unittest's default runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)