IPSEC: API modernisation
[vpp.git] / test / template_ipsec.py
1 import unittest
2 import socket
3
4 from scapy.layers.inet import IP, ICMP, TCP
5 from scapy.layers.ipsec import SecurityAssociation
6 from scapy.layers.l2 import Ether, Raw
7 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
8
9 from framework import VppTestCase, VppTestRunner
10 from util import ppp
11 from vpp_papi import VppEnum
12
13
14 class IPsecIPv4Params(object):
15
16     addr_type = socket.AF_INET
17     addr_any = "0.0.0.0"
18     addr_bcast = "255.255.255.255"
19     addr_len = 32
20     is_ipv6 = 0
21
22     def __init__(self):
23         self.remote_tun_if_host = '1.1.1.1'
24
25         self.scapy_tun_sa_id = 10
26         self.scapy_tun_spi = 1001
27         self.vpp_tun_sa_id = 20
28         self.vpp_tun_spi = 1000
29
30         self.scapy_tra_sa_id = 30
31         self.scapy_tra_spi = 2001
32         self.vpp_tra_sa_id = 40
33         self.vpp_tra_spi = 2000
34
35         self.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
36                                  IPSEC_API_INTEG_ALG_SHA1_96)
37         self.auth_algo = 'HMAC-SHA1-96'  # scapy name
38         self.auth_key = 'C91KUR9GYMm5GfkEvNjX'
39
40         self.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
41                                   IPSEC_API_CRYPTO_ALG_AES_CBC_128)
42         self.crypt_algo = 'AES-CBC'  # scapy name
43         self.crypt_key = 'JPjyOWBeVEQiMe7h'
44
45
46 class IPsecIPv6Params(object):
47
48     addr_type = socket.AF_INET6
49     addr_any = "0::0"
50     addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
51     addr_len = 128
52     is_ipv6 = 1
53
54     def __init__(self):
55         self.remote_tun_if_host = '1111:1111:1111:1111:1111:1111:1111:1111'
56
57         self.scapy_tun_sa_id = 50
58         self.scapy_tun_spi = 3001
59         self.vpp_tun_sa_id = 60
60         self.vpp_tun_spi = 3000
61
62         self.scapy_tra_sa_id = 70
63         self.scapy_tra_spi = 4001
64         self.vpp_tra_sa_id = 80
65         self.vpp_tra_spi = 4000
66
67         self.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
68                                  IPSEC_API_INTEG_ALG_SHA_256_128)
69         self.auth_algo = 'SHA2-256-128'  # scapy name
70         self.auth_key = 'C91KUR9GYMm5GfkEvNjX'
71
72         self.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
73                                   IPSEC_API_CRYPTO_ALG_AES_CBC_256)
74         self.crypt_algo = 'AES-CBC'  # scapy name
75         self.crypt_key = 'JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h'
76
77
78 class TemplateIpsec(VppTestCase):
79     """
80     TRANSPORT MODE:
81
82      ------   encrypt   ---
83     |tra_if| <-------> |VPP|
84      ------   decrypt   ---
85
86     TUNNEL MODE:
87
88      ------   encrypt   ---   plain   ---
89     |tun_if| <-------  |VPP| <------ |pg1|
90      ------             ---           ---
91
92      ------   decrypt   ---   plain   ---
93     |tun_if| ------->  |VPP| ------> |pg1|
94      ------             ---           ---
95     """
96
97     def ipsec_select_backend(self):
98         """ empty method to be overloaded when necessary """
99         pass
100
101     def setUp(self):
102         super(TemplateIpsec, self).setUp()
103
104         self.ipv4_params = IPsecIPv4Params()
105         self.ipv6_params = IPsecIPv6Params()
106         self.params = {self.ipv4_params.addr_type: self.ipv4_params,
107                        self.ipv6_params.addr_type: self.ipv6_params}
108
109         self.payload = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"\
110                        "XXXXXXXXXXXXXXXXXXXXX"
111
112         self.tun_spd_id = 1
113         self.tra_spd_id = 2
114
115         self.vpp_esp_protocol = (VppEnum.vl_api_ipsec_proto_t.
116                                  IPSEC_API_PROTO_ESP)
117         self.vpp_ah_protocol = (VppEnum.vl_api_ipsec_proto_t.
118                                 IPSEC_API_PROTO_AH)
119
120         self.create_pg_interfaces(range(3))
121         self.interfaces = list(self.pg_interfaces)
122         for i in self.interfaces:
123             i.admin_up()
124             i.config_ip4()
125             i.resolve_arp()
126             i.config_ip6()
127             i.resolve_ndp()
128         self.ipsec_select_backend()
129
130     def tearDown(self):
131         super(TemplateIpsec, self).tearDown()
132
133         for i in self.interfaces:
134             i.admin_down()
135             i.unconfig_ip4()
136             i.unconfig_ip6()
137
138         if not self.vpp_dead:
139             self.vapi.cli("show hardware")
140
141     def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1):
142         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
143                 sa.encrypt(IP(src=src, dst=dst) / ICMP() / self.payload)
144                 for i in range(count)]
145
146     def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1):
147         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
148                 sa.encrypt(IPv6(src=src, dst=dst) /
149                            ICMPv6EchoRequest(id=0, seq=1, data=self.payload))
150                 for i in range(count)]
151
152     def gen_pkts(self, sw_intf, src, dst, count=1):
153         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
154                 IP(src=src, dst=dst) / ICMP() / self.payload
155                 for i in range(count)]
156
157     def gen_pkts6(self, sw_intf, src, dst, count=1):
158         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
159                 IPv6(src=src, dst=dst) /
160                 ICMPv6EchoRequest(id=0, seq=1, data=self.payload)
161                 for i in range(count)]
162
163     def configure_sa_tun(self, params):
164         ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
165         scapy_tun_sa = SecurityAssociation(
166             self.encryption_type, spi=params.vpp_tun_spi,
167             crypt_algo=params.crypt_algo, crypt_key=params.crypt_key,
168             auth_algo=params.auth_algo, auth_key=params.auth_key,
169             tunnel_header=ip_class_by_addr_type[params.addr_type](
170                 src=self.tun_if.remote_addr[params.addr_type],
171                 dst=self.tun_if.local_addr[params.addr_type]))
172         vpp_tun_sa = SecurityAssociation(
173             self.encryption_type, spi=params.scapy_tun_spi,
174             crypt_algo=params.crypt_algo, crypt_key=params.crypt_key,
175             auth_algo=params.auth_algo, auth_key=params.auth_key,
176             tunnel_header=ip_class_by_addr_type[params.addr_type](
177                 dst=self.tun_if.remote_addr[params.addr_type],
178                 src=self.tun_if.local_addr[params.addr_type]))
179         return vpp_tun_sa, scapy_tun_sa
180
181     def configure_sa_tra(self, params):
182         params.scapy_tra_sa = SecurityAssociation(self.encryption_type,
183                                                   spi=params.vpp_tra_spi,
184                                                   crypt_algo=params.crypt_algo,
185                                                   crypt_key=params.crypt_key,
186                                                   auth_algo=params.auth_algo,
187                                                   auth_key=params.auth_key)
188         params.vpp_tra_sa = SecurityAssociation(self.encryption_type,
189                                                 spi=params.scapy_tra_spi,
190                                                 crypt_algo=params.crypt_algo,
191                                                 crypt_key=params.crypt_key,
192                                                 auth_algo=params.auth_algo,
193                                                 auth_key=params.auth_key)
194
195
196 class IpsecTcpTests(object):
197     def test_tcp_checksum(self):
198         """ verify checksum correctness for vpp generated packets """
199         self.vapi.cli("test http server")
200         p = self.params[socket.AF_INET]
201         vpp_tun_sa, scapy_tun_sa = self.configure_sa_tun(p)
202         send = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
203                 scapy_tun_sa.encrypt(IP(src=p.remote_tun_if_host,
204                                         dst=self.tun_if.local_ip4) /
205                                      TCP(flags='S', dport=80)))
206         self.logger.debug(ppp("Sending packet:", send))
207         recv = self.send_and_expect(self.tun_if, [send], self.tun_if)
208         recv = recv[0]
209         decrypted = vpp_tun_sa.decrypt(recv[IP])
210         self.assert_packet_checksums_valid(decrypted)
211
212
213 class IpsecTraTests(object):
214     def test_tra_anti_replay(self, count=1):
215         """ ipsec v4 transport anti-reply test """
216         p = self.params[socket.AF_INET]
217
218         # fire in a packet with seq number 1
219         pkt = (Ether(src=self.tra_if.remote_mac,
220                      dst=self.tra_if.local_mac) /
221                p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
222                                          dst=self.tra_if.local_ip4) /
223                                       ICMP(),
224                                       seq_num=1))
225         recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
226
227         # now move the window over to 235
228         pkt = (Ether(src=self.tra_if.remote_mac,
229                      dst=self.tra_if.local_mac) /
230                p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
231                                          dst=self.tra_if.local_ip4) /
232                                       ICMP(),
233                                       seq_num=235))
234         recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
235
236         # the window size is 64 packets
237         # in window are still accepted
238         pkt = (Ether(src=self.tra_if.remote_mac,
239                      dst=self.tra_if.local_mac) /
240                p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
241                                          dst=self.tra_if.local_ip4) /
242                                       ICMP(),
243                                       seq_num=172))
244         recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
245
246         # out of window are dropped
247         pkt = (Ether(src=self.tra_if.remote_mac,
248                      dst=self.tra_if.local_mac) /
249                p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
250                                          dst=self.tra_if.local_ip4) /
251                                       ICMP(),
252                                       seq_num=17))
253         self.send_and_assert_no_replies(self.tra_if, pkt * 17)
254
255         self.assert_packet_counter_equal(
256             '/err/%s/SA replayed packet' % self.tra4_decrypt_node_name, 17)
257
258         # a packet that does not decrypt does not move the window forward
259         bogus_sa = SecurityAssociation(self.encryption_type,
260                                        p.vpp_tra_spi)
261         pkt = (Ether(src=self.tra_if.remote_mac,
262                      dst=self.tra_if.local_mac) /
263                bogus_sa.encrypt(IP(src=self.tra_if.remote_ip4,
264                                    dst=self.tra_if.local_ip4) /
265                                 ICMP(),
266                                 seq_num=350))
267         self.send_and_assert_no_replies(self.tra_if, pkt * 17)
268
269         self.assert_packet_counter_equal(
270             '/err/%s/Integrity check failed' % self.tra4_decrypt_node_name, 17)
271
272         # which we can determine since this packet is still in the window
273         pkt = (Ether(src=self.tra_if.remote_mac,
274                      dst=self.tra_if.local_mac) /
275                p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
276                                          dst=self.tra_if.local_ip4) /
277                                       ICMP(),
278                                       seq_num=234))
279         self.send_and_expect(self.tra_if, [pkt], self.tra_if)
280
281         # move the security-associations seq number on to the last we used
282         p.scapy_tra_sa.seq_num = 351
283         p.vpp_tra_sa.seq_num = 351
284
285     def test_tra_basic(self, count=1):
286         """ ipsec v4 transport basic test """
287         self.vapi.cli("clear errors")
288         try:
289             p = self.params[socket.AF_INET]
290             send_pkts = self.gen_encrypt_pkts(p.scapy_tra_sa, self.tra_if,
291                                               src=self.tra_if.remote_ip4,
292                                               dst=self.tra_if.local_ip4,
293                                               count=count)
294             recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
295                                              self.tra_if)
296             for rx in recv_pkts:
297                 try:
298                     decrypted = p.vpp_tra_sa.decrypt(rx[IP])
299                     self.assert_packet_checksums_valid(decrypted)
300                 except:
301                     self.logger.debug(ppp("Unexpected packet:", rx))
302                     raise
303         finally:
304             self.logger.info(self.vapi.ppcli("show error"))
305             self.logger.info(self.vapi.ppcli("show ipsec"))
306
307         self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
308         self.assert_packet_counter_equal(self.tra4_decrypt_node_name, count)
309
310     def test_tra_burst(self):
311         """ ipsec v4 transport burst test """
312         self.test_tra_basic(count=257)
313
314     def test_tra_basic6(self, count=1):
315         """ ipsec v6 transport basic test """
316         self.vapi.cli("clear errors")
317         try:
318             p = self.params[socket.AF_INET6]
319             send_pkts = self.gen_encrypt_pkts6(p.scapy_tra_sa, self.tra_if,
320                                                src=self.tra_if.remote_ip6,
321                                                dst=self.tra_if.local_ip6,
322                                                count=count)
323             recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
324                                              self.tra_if)
325             for rx in recv_pkts:
326                 try:
327                     decrypted = p.vpp_tra_sa.decrypt(rx[IPv6])
328                     self.assert_packet_checksums_valid(decrypted)
329                 except:
330                     self.logger.debug(ppp("Unexpected packet:", rx))
331                     raise
332         finally:
333             self.logger.info(self.vapi.ppcli("show error"))
334             self.logger.info(self.vapi.ppcli("show ipsec"))
335
336         self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
337         self.assert_packet_counter_equal(self.tra6_decrypt_node_name, count)
338
339     def test_tra_burst6(self):
340         """ ipsec v6 transport burst test """
341         self.test_tra_basic6(count=257)
342
343
344 class IpsecTun4Tests(object):
345     def test_tun_basic44(self, count=1):
346         """ ipsec 4o4 tunnel basic test """
347         self.vapi.cli("clear errors")
348         try:
349             p = self.params[socket.AF_INET]
350             vpp_tun_sa, scapy_tun_sa = self.configure_sa_tun(p)
351             send_pkts = self.gen_encrypt_pkts(scapy_tun_sa, self.tun_if,
352                                               src=p.remote_tun_if_host,
353                                               dst=self.pg1.remote_ip4,
354                                               count=count)
355             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
356             for recv_pkt in recv_pkts:
357                 self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host)
358                 self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
359                 self.assert_packet_checksums_valid(recv_pkt)
360             send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
361                                       dst=p.remote_tun_if_host, count=count)
362             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
363             for recv_pkt in recv_pkts:
364                 try:
365                     decrypt_pkt = vpp_tun_sa.decrypt(recv_pkt[IP])
366                     if not decrypt_pkt.haslayer(IP):
367                         decrypt_pkt = IP(decrypt_pkt[Raw].load)
368                     self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
369                     self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
370                     self.assert_packet_checksums_valid(decrypt_pkt)
371                 except:
372                     self.logger.debug(ppp("Unexpected packet:", recv_pkt))
373                     try:
374                         self.logger.debug(
375                             ppp("Decrypted packet:", decrypt_pkt))
376                     except:
377                         pass
378                     raise
379         finally:
380             self.logger.info(self.vapi.ppcli("show error"))
381             self.logger.info(self.vapi.ppcli("show ipsec"))
382
383         self.assert_packet_counter_equal(self.tun4_encrypt_node_name, count)
384         self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)
385
386     def test_tun_burst44(self):
387         """ ipsec 4o4 tunnel burst test """
388         self.test_tun_basic44(count=257)
389
390
391 class IpsecTun6Tests(object):
392     def test_tun_basic66(self, count=1):
393         """ ipsec 6o6 tunnel basic test """
394         self.vapi.cli("clear errors")
395         try:
396             p = self.params[socket.AF_INET6]
397             vpp_tun_sa, scapy_tun_sa = self.configure_sa_tun(p)
398             send_pkts = self.gen_encrypt_pkts6(scapy_tun_sa, self.tun_if,
399                                                src=p.remote_tun_if_host,
400                                                dst=self.pg1.remote_ip6,
401                                                count=count)
402             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
403             for recv_pkt in recv_pkts:
404                 self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host)
405                 self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
406                 self.assert_packet_checksums_valid(recv_pkt)
407             send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
408                                        dst=p.remote_tun_if_host,
409                                        count=count)
410             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
411             for recv_pkt in recv_pkts:
412                 try:
413                     decrypt_pkt = vpp_tun_sa.decrypt(recv_pkt[IPv6])
414                     if not decrypt_pkt.haslayer(IPv6):
415                         decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
416                     self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
417                     self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
418                     self.assert_packet_checksums_valid(decrypt_pkt)
419                 except:
420                     self.logger.debug(ppp("Unexpected packet:", recv_pkt))
421                     try:
422                         self.logger.debug(
423                             ppp("Decrypted packet:", decrypt_pkt))
424                     except:
425                         pass
426                     raise
427         finally:
428             self.logger.info(self.vapi.ppcli("show error"))
429             self.logger.info(self.vapi.ppcli("show ipsec"))
430
431         self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
432         self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)
433
434     def test_tun_burst66(self):
435         """ ipsec 6o6 tunnel burst test """
436         self.test_tun_basic66(count=257)
437
438
439 class IpsecTunTests(IpsecTun4Tests, IpsecTun6Tests):
440     pass
441
442
443 if __name__ == '__main__':
444     unittest.main(testRunner=VppTestRunner)