ESP_AH_test_automation_scripts rev1
[vpp.git] / test / test_ipsec_esp.py
1 import socket
2
3 from scapy.layers.inet import IP, ICMP
4 from scapy.layers.l2 import Ether
5 from scapy.layers.ipsec import *
6
7 from framework import VppTestCase
8 from vpp_ip_route import VppIpRoute
9
10 from util import ppp
11
12
13 class TestIpsecEsp(VppTestCase):
14     """
15     Basic test for ipsec esp sanity - tunnel and transport modes.
16
17     Below 4 cases are covered as part of this test
18     1) ipsec esp v4 transport basic test  - IPv4 Transport mode
19         scenario using HMAC-SHA1-96 intergrity algo
20     2) ipsec esp v4 transport burst test
21         Above test for 257 pkts
22     3) ipsec esp 4o4 tunnel basic test    - IPv4 Tunnel mode
23         scenario using HMAC-SHA1-96 intergrity algo
24     4) ipsec esp 4o4 tunnel burst test
25         Above test for 257 pkts
26
27     TRANSPORT MODE:
28
29      ---   encrypt   ---
30     |pg2| <-------> |VPP|
31      ---   decrypt   ---
32
33     TUNNEL MODE:
34
35      ---   encrypt   ---   plain   ---
36     |pg0| ------->  |VPP| ------> |pg1|
37      ---             ---           ---
38
39      ---   decrypt   ---   plain   ---
40     |pg0| <-------  |VPP| <------ |pg1|
41      ---             ---           ---
42
43     Note : IPv6 is not covered
44     """
45
46     remote_pg0_lb_addr = '1.1.1.1'
47     remote_pg1_lb_addr = '2.2.2.2'
48
49     @classmethod
50     def setUpClass(cls):
51         super(TestIpsecEsp, cls).setUpClass()
52         try:
53             cls.create_pg_interfaces(range(3))
54             cls.interfaces = list(cls.pg_interfaces)
55             for i in cls.interfaces:
56                 i.admin_up()
57                 i.config_ip4()
58                 i.resolve_arp()
59             cls.logger.info(cls.vapi.ppcli("show int addr"))
60             cls.configEspTra()
61             cls.logger.info(cls.vapi.ppcli("show ipsec"))
62             cls.configEspTun()
63             cls.logger.info(cls.vapi.ppcli("show ipsec"))
64         except Exception:
65             super(TestIpsecEsp, cls).tearDownClass()
66             raise
67
68     @classmethod
69     def configEspTun(cls):
70         try:
71             spd_id = 1
72             remote_sa_id = 10
73             local_sa_id = 20
74             remote_tun_spi = 1001
75             local_tun_spi = 1000
76             src4 = socket.inet_pton(socket.AF_INET, cls.remote_pg0_lb_addr)
77             cls.vapi.ip_add_del_route(src4, 32, cls.pg0.remote_ip4n)
78             dst4 = socket.inet_pton(socket.AF_INET, cls.remote_pg1_lb_addr)
79             cls.vapi.ip_add_del_route(dst4, 32, cls.pg1.remote_ip4n)
80             cls.vapi.ipsec_sad_add_del_entry(
81                 remote_sa_id,
82                 remote_tun_spi,
83                 cls.pg0.local_ip4n,
84                 cls.pg0.remote_ip4n,
85                 integrity_key_length=20,
86                 crypto_key_length=16,
87                 protocol=1)
88             cls.vapi.ipsec_sad_add_del_entry(
89                 local_sa_id,
90                 local_tun_spi,
91                 cls.pg0.remote_ip4n,
92                 cls.pg0.local_ip4n,
93                 integrity_key_length=20,
94                 crypto_key_length=16,
95                 protocol=1)
96             cls.vapi.ipsec_spd_add_del(spd_id)
97             cls.vapi.ipsec_interface_add_del_spd(spd_id, cls.pg0.sw_if_index)
98             l_startaddr = r_startaddr = socket.inet_pton(
99                 socket.AF_INET, "0.0.0.0")
100             l_stopaddr = r_stopaddr = socket.inet_pton(
101                 socket.AF_INET, "255.255.255.255")
102             cls.vapi.ipsec_spd_add_del_entry(
103                 spd_id,
104                 l_startaddr,
105                 l_stopaddr,
106                 r_startaddr,
107                 r_stopaddr,
108                 protocol=50)
109             cls.vapi.ipsec_spd_add_del_entry(
110                 spd_id,
111                 l_startaddr,
112                 l_stopaddr,
113                 r_startaddr,
114                 r_stopaddr,
115                 protocol=50,
116                 is_outbound=0)
117             l_startaddr = l_stopaddr = socket.inet_pton(
118                 socket.AF_INET, cls.remote_pg0_lb_addr)
119             r_startaddr = r_stopaddr = socket.inet_pton(
120                 socket.AF_INET, cls.remote_pg1_lb_addr)
121             cls.vapi.ipsec_spd_add_del_entry(
122                 spd_id,
123                 l_startaddr,
124                 l_stopaddr,
125                 r_startaddr,
126                 r_stopaddr,
127                 priority=10,
128                 policy=3,
129                 is_outbound=0,
130                 sa_id=local_sa_id)
131             cls.vapi.ipsec_spd_add_del_entry(
132                 spd_id,
133                 r_startaddr,
134                 r_stopaddr,
135                 l_startaddr,
136                 l_stopaddr,
137                 priority=10,
138                 policy=3,
139                 sa_id=remote_sa_id)
140         except Exception:
141             raise
142
143     @classmethod
144     def configEspTra(cls):
145         try:
146             spd_id = 2
147             remote_sa_id = 30
148             local_sa_id = 40
149             remote_tra_spi = 2001
150             local_tra_spi = 2000
151             cls.vapi.ipsec_sad_add_del_entry(
152                 remote_sa_id,
153                 remote_tra_spi,
154                 integrity_key_length=20,
155                 crypto_key_length=16,
156                 protocol=1,
157                 is_tunnel=0)
158             cls.vapi.ipsec_sad_add_del_entry(
159                 local_sa_id,
160                 local_tra_spi,
161                 integrity_key_length=20,
162                 crypto_key_length=16,
163                 protocol=1,
164                 is_tunnel=0)
165             cls.vapi.ipsec_spd_add_del(spd_id)
166             cls.vapi.ipsec_interface_add_del_spd(spd_id, cls.pg2.sw_if_index)
167             l_startaddr = r_startaddr = socket.inet_pton(
168                 socket.AF_INET, "0.0.0.0")
169             l_stopaddr = r_stopaddr = socket.inet_pton(
170                 socket.AF_INET, "255.255.255.255")
171             cls.vapi.ipsec_spd_add_del_entry(
172                 spd_id,
173                 l_startaddr,
174                 l_stopaddr,
175                 r_startaddr,
176                 r_stopaddr,
177                 protocol=50)
178             cls.vapi.ipsec_spd_add_del_entry(
179                 spd_id,
180                 l_startaddr,
181                 l_stopaddr,
182                 r_startaddr,
183                 r_stopaddr,
184                 protocol=50,
185                 is_outbound=0)
186             l_startaddr = l_stopaddr = cls.pg2.local_ip4n
187             r_startaddr = r_stopaddr = cls.pg2.remote_ip4n
188             cls.vapi.ipsec_spd_add_del_entry(
189                 spd_id,
190                 l_startaddr,
191                 l_stopaddr,
192                 r_startaddr,
193                 r_stopaddr,
194                 priority=10,
195                 policy=3,
196                 is_outbound=0,
197                 sa_id=local_sa_id)
198             cls.vapi.ipsec_spd_add_del_entry(
199                 spd_id,
200                 l_startaddr,
201                 l_stopaddr,
202                 r_startaddr,
203                 r_stopaddr,
204                 priority=10,
205                 policy=3,
206                 sa_id=remote_sa_id)
207         except Exception:
208             raise
209
210     def configScapySA(self, is_tun=False):
211         if is_tun:
212             self.remote_tun_sa = SecurityAssociation(
213                 ESP,
214                 spi=0x000003e8,
215                 crypt_algo='AES-CBC',
216                 crypt_key='JPjyOWBeVEQiMe7h',
217                 auth_algo='HMAC-SHA1-96',
218                 auth_key='C91KUR9GYMm5GfkEvNjX',
219                 tunnel_header=IP(
220                     src=self.pg0.remote_ip4,
221                     dst=self.pg0.local_ip4))
222             self.local_tun_sa = SecurityAssociation(
223                 ESP,
224                 spi=0x000003e9,
225                 crypt_algo='AES-CBC',
226                 crypt_key='JPjyOWBeVEQiMe7h',
227                 auth_algo='HMAC-SHA1-96',
228                 auth_key='C91KUR9GYMm5GfkEvNjX',
229                 tunnel_header=IP(
230                     dst=self.pg0.remote_ip4,
231                     src=self.pg0.local_ip4))
232         else:
233             self.remote_tra_sa = SecurityAssociation(
234                 ESP,
235                 spi=0x000007d0,
236                 crypt_algo='AES-CBC',
237                 crypt_key='JPjyOWBeVEQiMe7h',
238                 auth_algo='HMAC-SHA1-96',
239                 auth_key='C91KUR9GYMm5GfkEvNjX')
240             self.local_tra_sa = SecurityAssociation(
241                 ESP,
242                 spi=0x000007d1,
243                 crypt_algo='AES-CBC',
244                 crypt_key='JPjyOWBeVEQiMe7h',
245                 auth_algo='HMAC-SHA1-96',
246                 auth_key='C91KUR9GYMm5GfkEvNjX')
247
248     def tearDown(self):
249         super(TestIpsecEsp, self).tearDown()
250         if not self.vpp_dead:
251             self.vapi.cli("show hardware")
252
253     def send_and_expect(self, input, pkts, output, count=1):
254         input.add_stream(pkts)
255         self.pg_enable_capture(self.pg_interfaces)
256         self.pg_start()
257         rx = output.get_capture(count)
258         return rx
259
260     def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1):
261         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
262                 sa.encrypt(IP(src=src, dst=dst) / ICMP() /
263                 "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")
264                 ] * count
265
266     def gen_pkts(self, sw_intf, src, dst, count=1):
267         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
268                 IP(src=src, dst=dst) / ICMP() /
269                 "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
270                 ] * count
271
272     def test_ipsec_esp_tra_basic(self, count=1):
273         """ ipsec esp v4 transport basic test """
274         try:
275             self.configScapySA()
276             send_pkts = self.gen_encrypt_pkts(
277                 self.remote_tra_sa,
278                 self.pg2,
279                 src=self.pg2.remote_ip4,
280                 dst=self.pg2.local_ip4,
281                 count=count)
282             recv_pkts = self.send_and_expect(
283                 self.pg2, send_pkts, self.pg2, count=count)
284             # ESP TRA VPP encryption/decryption verification
285             for Pkts in recv_pkts:
286                 decrypt_pkt = self.local_tra_sa.decrypt(Pkts[IP])
287         finally:
288             self.logger.info(self.vapi.ppcli("show error"))
289             self.logger.info(self.vapi.ppcli("show ipsec"))
290
291     def test_ipsec_esp_tra_burst(self):
292         """ ipsec esp v4 transport burst test """
293         try:
294             self.test_ipsec_esp_tra_basic(count=257)
295         finally:
296             self.logger.info(self.vapi.ppcli("show error"))
297             self.logger.info(self.vapi.ppcli("show ipsec"))
298
299     def test_ipsec_esp_tun_basic(self, count=1):
300         """ ipsec esp 4o4 tunnel basic test """
301         try:
302             self.configScapySA(is_tun=True)
303             send_pkts = self.gen_encrypt_pkts(
304                 self.remote_tun_sa,
305                 self.pg0,
306                 src=self.remote_pg0_lb_addr,
307                 dst=self.remote_pg1_lb_addr,
308                 count=count)
309             recv_pkts = self.send_and_expect(
310                 self.pg0, send_pkts, self.pg1, count=count)
311             # ESP TUN VPP decryption verification
312             for recv_pkt in recv_pkts:
313                 self.assert_equal(recv_pkt[IP].src, self.remote_pg0_lb_addr)
314                 self.assert_equal(recv_pkt[IP].dst, self.remote_pg1_lb_addr)
315             send_pkts = self.gen_pkts(
316                 self.pg1,
317                 src=self.remote_pg1_lb_addr,
318                 dst=self.remote_pg0_lb_addr,
319                 count=count)
320             recv_pkts = self.send_and_expect(
321                 self.pg1, send_pkts, self.pg0, count=count)
322             # ESP TUN VPP encryption verification
323             for recv_pkt in recv_pkts:
324                 decrypt_pkt = self.local_tun_sa.decrypt(recv_pkt[IP])
325                 self.assert_equal(decrypt_pkt.src, self.remote_pg1_lb_addr)
326                 self.assert_equal(decrypt_pkt.dst, self.remote_pg0_lb_addr)
327         finally:
328             self.logger.info(self.vapi.ppcli("show error"))
329             self.logger.info(self.vapi.ppcli("show ipsec"))
330
331     def test_ipsec_esp_tun_burst(self):
332         """ ipsec esp 4o4 tunnel burst test """
333         try:
334             self.test_ipsec_esp_tun_basic(count=257)
335         finally:
336             self.logger.info(self.vapi.ppcli("show error"))
337             self.logger.info(self.vapi.ppcli("show ipsec"))
338
339
340 if __name__ == '__main__':
341     unittest.main(testRunner=VppTestRunner)