ESP_AH_test_automation_scripts rev1
[vpp.git] / test / test_ipsec_ah.py
1 import socket
2
3 from scapy.layers.inet import IP, ICMP
4 from scapy.layers.l2 import Ether
5 from scapy.layers.ipsec import *
6
7 from framework import VppTestCase
8 from vpp_ip_route import VppIpRoute
9
10 from util import ppp
11
12
13 class TestIpsecAh(VppTestCase):
14     """
15     Basic test for IPSEC using AH transport and Tunnel mode
16
17     Below 4 cases are covered as part of this test
18     1) ipsec ah v4 transport basic test  - IPv4 Transport mode
19      scenario using HMAC-SHA1-96 intergrity algo
20     2) ipsec ah v4 transport burst test
21      Above test for 257 pkts
22     3) ipsec ah 4o4 tunnel basic test    - IPv4 Tunnel mode
23      scenario using HMAC-SHA1-96 intergrity algo
24     4) ipsec ah 4o4 tunnel burst test
25      Above test for 257 pkts
26
27     TRANSPORT MODE:
28
29      ---   encrypt   ---
30     |pg2| <-------> |VPP|
31      ---   decrypt   ---
32
33     TUNNEL MODE:
34
35      ---   encrypt   ---   plain   ---
36     |pg0| ------->  |VPP| ------> |pg1|
37      ---             ---           ---
38
39      ---   decrypt   ---   plain   ---
40     |pg0| <-------  |VPP| <------ |pg1|
41      ---             ---           ---
42
43     Note : IPv6 is not covered
44     """
45
46     remote_pg0_lb_addr = '1.1.1.1'
47     remote_pg1_lb_addr = '2.2.2.2'
48
49     @classmethod
50     def setUpClass(cls):
51         super(TestIpsecAh, cls).setUpClass()
52         try:
53             cls.create_pg_interfaces(range(3))
54             cls.interfaces = list(cls.pg_interfaces)
55             for i in cls.interfaces:
56                 i.admin_up()
57                 i.config_ip4()
58                 i.resolve_arp()
59             cls.logger.info(cls.vapi.ppcli("show int addr"))
60             cls.configAhTra()
61             cls.logger.info(cls.vapi.ppcli("show ipsec"))
62             cls.configAhTun()
63             cls.logger.info(cls.vapi.ppcli("show ipsec"))
64         except Exception:
65             super(TestIpsecAh, cls).tearDownClass()
66             raise
67
68     @classmethod
69     def configAhTun(cls):
70         try:
71             spd_id = 1
72             remote_sa_id = 10
73             local_sa_id = 20
74             remote_tun_spi = 1001
75             local_tun_spi = 1000
76             src4 = socket.inet_pton(socket.AF_INET, cls.remote_pg0_lb_addr)
77             cls.vapi.ip_add_del_route(src4, 32, cls.pg0.remote_ip4n)
78             dst4 = socket.inet_pton(socket.AF_INET, cls.remote_pg1_lb_addr)
79             cls.vapi.ip_add_del_route(dst4, 32, cls.pg1.remote_ip4n)
80             cls.vapi.ipsec_sad_add_del_entry(
81                 remote_sa_id,
82                 remote_tun_spi,
83                 cls.pg0.local_ip4n,
84                 cls.pg0.remote_ip4n,
85                 integrity_key_length=20)
86             cls.vapi.ipsec_sad_add_del_entry(
87                 local_sa_id,
88                 local_tun_spi,
89                 cls.pg0.remote_ip4n,
90                 cls.pg0.local_ip4n,
91                 integrity_key_length=20)
92             cls.vapi.ipsec_spd_add_del(spd_id)
93             cls.vapi.ipsec_interface_add_del_spd(spd_id, cls.pg0.sw_if_index)
94             l_startaddr = r_startaddr = socket.inet_pton(
95                 socket.AF_INET, "0.0.0.0")
96             l_stopaddr = r_stopaddr = socket.inet_pton(
97                 socket.AF_INET, "255.255.255.255")
98             cls.vapi.ipsec_spd_add_del_entry(
99                 spd_id,
100                 l_startaddr,
101                 l_stopaddr,
102                 r_startaddr,
103                 r_stopaddr,
104                 protocol=51)
105             cls.vapi.ipsec_spd_add_del_entry(
106                 spd_id,
107                 l_startaddr,
108                 l_stopaddr,
109                 r_startaddr,
110                 r_stopaddr,
111                 protocol=51,
112                 is_outbound=0)
113             l_startaddr = l_stopaddr = socket.inet_pton(
114                 socket.AF_INET, cls.remote_pg0_lb_addr)
115             r_startaddr = r_stopaddr = socket.inet_pton(
116                 socket.AF_INET, cls.remote_pg1_lb_addr)
117             cls.vapi.ipsec_spd_add_del_entry(
118                 spd_id,
119                 l_startaddr,
120                 l_stopaddr,
121                 r_startaddr,
122                 r_stopaddr,
123                 priority=10,
124                 policy=3,
125                 is_outbound=0,
126                 sa_id=local_sa_id)
127             cls.vapi.ipsec_spd_add_del_entry(
128                 spd_id,
129                 r_startaddr,
130                 r_stopaddr,
131                 l_startaddr,
132                 l_stopaddr,
133                 priority=10,
134                 policy=3,
135                 sa_id=remote_sa_id)
136         except Exception:
137             raise
138
139     @classmethod
140     def configAhTra(cls):
141         try:
142             spd_id = 2
143             remote_sa_id = 30
144             local_sa_id = 40
145             remote_tra_spi = 2001
146             local_tra_spi = 2000
147             cls.vapi.ipsec_sad_add_del_entry(
148                 remote_sa_id,
149                 remote_tra_spi,
150                 integrity_key_length=20,
151                 is_tunnel=0)
152             cls.vapi.ipsec_sad_add_del_entry(
153                 local_sa_id,
154                 local_tra_spi,
155                 integrity_key_length=20,
156                 is_tunnel=0)
157             cls.vapi.ipsec_spd_add_del(spd_id)
158             cls.vapi.ipsec_interface_add_del_spd(spd_id, cls.pg2.sw_if_index)
159             l_startaddr = r_startaddr = socket.inet_pton(
160                 socket.AF_INET, "0.0.0.0")
161             l_stopaddr = r_stopaddr = socket.inet_pton(
162                 socket.AF_INET, "255.255.255.255")
163             cls.vapi.ipsec_spd_add_del_entry(
164                 spd_id,
165                 l_startaddr,
166                 l_stopaddr,
167                 r_startaddr,
168                 r_stopaddr,
169                 protocol=51)
170             cls.vapi.ipsec_spd_add_del_entry(
171                 spd_id,
172                 l_startaddr,
173                 l_stopaddr,
174                 r_startaddr,
175                 r_stopaddr,
176                 protocol=51,
177                 is_outbound=0)
178             l_startaddr = l_stopaddr = cls.pg2.local_ip4n
179             r_startaddr = r_stopaddr = cls.pg2.remote_ip4n
180             cls.vapi.ipsec_spd_add_del_entry(
181                 spd_id,
182                 l_startaddr,
183                 l_stopaddr,
184                 r_startaddr,
185                 r_stopaddr,
186                 priority=10,
187                 policy=3,
188                 is_outbound=0,
189                 sa_id=local_sa_id)
190             cls.vapi.ipsec_spd_add_del_entry(
191                 spd_id,
192                 l_startaddr,
193                 l_stopaddr,
194                 r_startaddr,
195                 r_stopaddr,
196                 priority=10,
197                 policy=3,
198                 sa_id=remote_sa_id)
199         except Exception:
200             raise
201
202     def configScapySA(self, is_tun=False):
203         if is_tun:
204             self.remote_tun_sa = SecurityAssociation(
205                 AH,
206                 spi=0x000003e8,
207                 auth_algo='HMAC-SHA1-96',
208                 auth_key='C91KUR9GYMm5GfkEvNjX',
209                 tunnel_header=IP(
210                     src=self.pg0.remote_ip4,
211                     dst=self.pg0.local_ip4))
212             self.local_tun_sa = SecurityAssociation(
213                 AH,
214                 spi=0x000003e9,
215                 auth_algo='HMAC-SHA1-96',
216                 auth_key='C91KUR9GYMm5GfkEvNjX',
217                 tunnel_header=IP(
218                     dst=self.pg0.remote_ip4,
219                     src=self.pg0.local_ip4))
220         else:
221             self.remote_tra_sa = SecurityAssociation(
222                 AH, spi=0x000007d0, auth_algo='HMAC-SHA1-96',
223                 auth_key='C91KUR9GYMm5GfkEvNjX')
224             self.local_tra_sa = SecurityAssociation(
225                 AH, spi=0x000007d1, auth_algo='HMAC-SHA1-96',
226                 auth_key='C91KUR9GYMm5GfkEvNjX')
227
228     def tearDown(self):
229         super(TestIpsecAh, self).tearDown()
230         if not self.vpp_dead:
231             self.vapi.cli("show hardware")
232
233     def send_and_expect(self, input, pkts, output, count=1):
234         input.add_stream(pkts)
235         self.pg_enable_capture(self.pg_interfaces)
236         self.pg_start()
237         rx = output.get_capture(count)
238         return rx
239
240     def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1):
241         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
242                 sa.encrypt(IP(src=src, dst=dst) / ICMP() /
243                 "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")
244                 ] * count
245
246     def gen_pkts(self, sw_intf, src, dst, count=1):
247         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
248                 IP(src=src, dst=dst) / ICMP() /
249                 "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
250                 ] * count
251
252     def test_ipsec_ah_tra_basic(self, count=1):
253         """ ipsec ah v4 transport basic test """
254         try:
255             self.configScapySA()
256             send_pkts = self.gen_encrypt_pkts(
257                 self.remote_tra_sa,
258                 self.pg2,
259                 src=self.pg2.remote_ip4,
260                 dst=self.pg2.local_ip4,
261                 count=count)
262             recv_pkts = self.send_and_expect(
263                 self.pg2, send_pkts, self.pg2, count=count)
264             # ESP TRA VPP encryption/decryption verification
265             for Pkts in recv_pkts:
266                 Pkts[AH].padding = Pkts[AH].icv[12:]
267                 Pkts[AH].icv = Pkts[AH].icv[:12]
268                 decrypt_pkt = self.local_tra_sa.decrypt(Pkts[IP])
269         finally:
270             self.logger.info(self.vapi.ppcli("show error"))
271             self.logger.info(self.vapi.ppcli("show ipsec"))
272
273     def test_ipsec_ah_tra_burst(self):
274         """ ipsec ah v4 transport burst test """
275         try:
276             self.test_ipsec_ah_tra_basic(count=257)
277         finally:
278             self.logger.info(self.vapi.ppcli("show error"))
279             self.logger.info(self.vapi.ppcli("show ipsec"))
280
281     def test_ipsec_ah_tun_basic(self, count=1):
282         """ ipsec ah 4o4 tunnel basic test """
283         try:
284             self.configScapySA(is_tun=True)
285             send_pkts = self.gen_encrypt_pkts(
286                 self.remote_tun_sa,
287                 self.pg0,
288                 src=self.remote_pg0_lb_addr,
289                 dst=self.remote_pg1_lb_addr,
290                 count=count)
291             recv_pkts = self.send_and_expect(
292                 self.pg0, send_pkts, self.pg1, count=count)
293             # ESP TUN VPP decryption verification
294             for recv_pkt in recv_pkts:
295                 self.assert_equal(recv_pkt[IP].src, self.remote_pg0_lb_addr)
296                 self.assert_equal(recv_pkt[IP].dst, self.remote_pg1_lb_addr)
297             send_pkts = self.gen_pkts(
298                 self.pg1,
299                 src=self.remote_pg1_lb_addr,
300                 dst=self.remote_pg0_lb_addr,
301                 count=count)
302             recv_pkts = self.send_and_expect(
303                 self.pg1, send_pkts, self.pg0, count=count)
304             # ESP TUN VPP encryption verification
305             for recv_pkt in recv_pkts:
306                 recv_pkt[IP] = recv_pkt[IP] / IP(recv_pkt[AH].icv[12:])
307                 recv_pkt[AH].icv = recv_pkt[AH].icv[:12]
308                 decrypt_pkt = self.local_tun_sa.decrypt(recv_pkt[IP])
309                 self.assert_equal(decrypt_pkt.src, self.remote_pg1_lb_addr)
310                 self.assert_equal(decrypt_pkt.dst, self.remote_pg0_lb_addr)
311         finally:
312             self.logger.info(self.vapi.ppcli("show error"))
313             self.logger.info(self.vapi.ppcli("show ipsec"))
314
315     def test_ipsec_ah_tun_burst(self):
316         """ ipsec ah 4o4 tunnel burst test """
317         try:
318             self.test_ipsec_ah_tun_basic(count=257)
319         finally:
320             self.logger.info(self.vapi.ppcli("show error"))
321             self.logger.info(self.vapi.ppcli("show ipsec"))
322
323
324 if __name__ == '__main__':
325     unittest.main(testRunner=VppTestRunner)