TEST: IPSEC NAT-T with UDP header
[vpp.git] / test / template_ipsec.py
1 import unittest
2 import socket
3
4 from scapy.layers.inet import IP, ICMP, TCP, UDP
5 from scapy.layers.ipsec import SecurityAssociation
6 from scapy.layers.l2 import Ether, Raw
7 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
8
9 from framework import VppTestCase, VppTestRunner
10 from util import ppp
11 from vpp_papi import VppEnum
12
13
class IPsecIPv4Params(object):
    """ addresses, SA ids, SPIs, algorithms and keys for the IPv4 tests """

    # address-family constants, shared by every instance
    addr_type = socket.AF_INET
    addr_any = "0.0.0.0"
    addr_bcast = "255.255.255.255"
    addr_len = 32
    is_ipv6 = 0

    def __init__(self):
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t

        self.remote_tun_if_host = '1.1.1.1'

        # tunnel SAs: (sa id, spi) for the scapy side and the VPP side
        self.scapy_tun_sa_id, self.scapy_tun_spi = 10, 1001
        self.vpp_tun_sa_id, self.vpp_tun_spi = 20, 1000

        # transport SAs: (sa id, spi) for the scapy side and the VPP side
        self.scapy_tra_sa_id, self.scapy_tra_spi = 30, 2001
        self.vpp_tra_sa_id, self.vpp_tra_spi = 40, 2000

        # integrity algorithm: VPP API enum value, scapy name, and key
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA1_96
        self.auth_algo = 'HMAC-SHA1-96'  # scapy name
        self.auth_key = 'C91KUR9GYMm5GfkEvNjX'

        # crypto algorithm: VPP API enum value, scapy name, and key
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_128
        self.crypt_algo = 'AES-CBC'  # scapy name
        self.crypt_key = 'JPjyOWBeVEQiMe7h'

        self.flags = 0
        self.nat_header = None
46
47
class IPsecIPv6Params(object):
    """ addresses, SA ids, SPIs, algorithms and keys for the IPv6 tests """

    # address-family constants, shared by every instance
    addr_type = socket.AF_INET6
    addr_any = "0::0"
    addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
    addr_len = 128
    is_ipv6 = 1

    def __init__(self):
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t

        self.remote_tun_if_host = '1111:1111:1111:1111:1111:1111:1111:1111'

        # tunnel SAs: (sa id, spi) for the scapy side and the VPP side
        self.scapy_tun_sa_id, self.scapy_tun_spi = 50, 3001
        self.vpp_tun_sa_id, self.vpp_tun_spi = 60, 3000

        # transport SAs: (sa id, spi) for the scapy side and the VPP side
        self.scapy_tra_sa_id, self.scapy_tra_spi = 70, 4001
        self.vpp_tra_sa_id, self.vpp_tra_spi = 80, 4000

        # integrity algorithm: VPP API enum value, scapy name, and key
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA_256_128
        self.auth_algo = 'SHA2-256-128'  # scapy name
        self.auth_key = 'C91KUR9GYMm5GfkEvNjX'

        # crypto algorithm: VPP API enum value, scapy name, and key
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_256
        self.crypt_algo = 'AES-CBC'  # scapy name
        self.crypt_key = 'JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h'

        self.flags = 0
        self.nat_header = None
80
81
class TemplateIpsec(VppTestCase):
    """
    TRANSPORT MODE:

     ------   encrypt   ---
    |tra_if| <-------> |VPP|
     ------   decrypt   ---

    TUNNEL MODE:

     ------   encrypt   ---   plain   ---
    |tun_if| <-------  |VPP| <------ |pg1|
     ------             ---           ---

     ------   decrypt   ---   plain   ---
    |tun_if| ------->  |VPP| ------> |pg1|
     ------             ---           ---
    """

    # NOTE: self.encryption_type, self.tun_if and self.tra_if are read by
    # the helpers below but are not assigned anywhere in this class.

    def ipsec_select_backend(self):
        """ empty method to be overloaded when necessary """

    def setUp(self):
        """ create both parameter sets and bring up three pg interfaces """
        super(TemplateIpsec, self).setUp()

        self.ipv4_params = IPsecIPv4Params()
        self.ipv6_params = IPsecIPv6Params()
        # parameter sets are looked up by address family
        self.params = {}
        for param_set in (self.ipv4_params, self.ipv6_params):
            self.params[param_set.addr_type] = param_set

        self.payload = ("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
                        "XXXXXXXXXXXXXXXXXXXXX")

        self.tun_spd_id = 1
        self.tra_spd_id = 2

        ipsec_protos = VppEnum.vl_api_ipsec_proto_t
        self.vpp_esp_protocol = ipsec_protos.IPSEC_API_PROTO_ESP
        self.vpp_ah_protocol = ipsec_protos.IPSEC_API_PROTO_AH

        self.create_pg_interfaces(range(3))
        self.interfaces = list(self.pg_interfaces)
        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()
            intf.config_ip6()
            intf.resolve_ndp()
        self.ipsec_select_backend()

    def tearDown(self):
        """ take the interfaces down again and dump the hardware state """
        super(TemplateIpsec, self).tearDown()

        for intf in self.interfaces:
            intf.admin_down()
            intf.unconfig_ip4()
            intf.unconfig_ip6()

        if self.vpp_dead:
            return
        self.vapi.cli("show hardware")

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1):
        """ build `count` frames of IPv4 ICMP + payload encrypted by `sa` """
        frames = []
        for _ in range(count):
            # sa.encrypt() is invoked once per frame, as each call
            # produces its own encrypted packet
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            frames.append(
                l2 / sa.encrypt(IP(src=src, dst=dst) / ICMP() / self.payload))
        return frames

    def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1):
        """ build `count` frames of ICMPv6 echo requests encrypted by `sa` """
        frames = []
        for _ in range(count):
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            frames.append(
                l2 / sa.encrypt(IPv6(src=src, dst=dst) /
                                ICMPv6EchoRequest(id=0, seq=1,
                                                  data=self.payload)))
        return frames

    def gen_pkts(self, sw_intf, src, dst, count=1):
        """ build `count` plain-text frames of IPv4 ICMP + payload """
        frames = []
        for _ in range(count):
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            frames.append(l2 / IP(src=src, dst=dst) / ICMP() / self.payload)
        return frames

    def gen_pkts6(self, sw_intf, src, dst, count=1):
        """ build `count` plain-text frames of ICMPv6 echo requests """
        frames = []
        for _ in range(count):
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            frames.append(l2 /
                          IPv6(src=src, dst=dst) /
                          ICMPv6EchoRequest(id=0, seq=1, data=self.payload))
        return frames

    def configure_sa_tun(self, params):
        """ create the two scapy tunnel-mode SAs for `params`

        Returns the pair (vpp_tun_sa, scapy_tun_sa).
        """
        af = params.addr_type
        ip_class = {socket.AF_INET: IP, socket.AF_INET6: IPv6}[af]
        remote = self.tun_if.remote_addr[af]
        local = self.tun_if.local_addr[af]
        # both SAs share algorithms, keys and the NAT-T header
        shared = dict(crypt_algo=params.crypt_algo,
                      crypt_key=params.crypt_key,
                      auth_algo=params.auth_algo,
                      auth_key=params.auth_key,
                      nat_t_header=params.nat_header)

        # the SPIs are crossed over: scapy_tun_sa carries the vpp SPI and
        # vpp_tun_sa the scapy SPI; the tunnel headers point in opposite
        # directions
        scapy_tun_sa = SecurityAssociation(
            self.encryption_type, spi=params.vpp_tun_spi,
            tunnel_header=ip_class(src=remote, dst=local),
            **shared)
        vpp_tun_sa = SecurityAssociation(
            self.encryption_type, spi=params.scapy_tun_spi,
            tunnel_header=ip_class(dst=remote, src=local),
            **shared)
        return vpp_tun_sa, scapy_tun_sa

    def configure_sa_tra(self, params):
        """ create the two scapy transport-mode SAs and store them on
        `params` as scapy_tra_sa and vpp_tra_sa """
        shared = dict(crypt_algo=params.crypt_algo,
                      crypt_key=params.crypt_key,
                      auth_algo=params.auth_algo,
                      auth_key=params.auth_key,
                      nat_t_header=params.nat_header)

        # as for the tunnel SAs, the SPIs are crossed over
        params.scapy_tra_sa = SecurityAssociation(
            self.encryption_type, spi=params.vpp_tra_spi, **shared)
        params.vpp_tra_sa = SecurityAssociation(
            self.encryption_type, spi=params.scapy_tra_spi, **shared)
204
205
class IpsecTcpTests(object):
    """ TCP-over-tunnel test; reads attributes this class does not define
    (self.params, self.tun_if, self.vapi, ...) """

    def test_tcp_checksum(self):
        """ verify checksum correctness for vpp generated packets """
        self.vapi.cli("test http server")
        params = self.params[socket.AF_INET]
        vpp_tun_sa, scapy_tun_sa = self.configure_sa_tun(params)

        # an encrypted TCP SYN to port 80 on VPP's own tunnel address
        l2 = Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
        syn = (IP(src=params.remote_tun_if_host,
                  dst=self.tun_if.local_ip4) /
               TCP(flags='S', dport=80))
        send = l2 / scapy_tun_sa.encrypt(syn)
        self.logger.debug(ppp("Sending packet:", send))

        # only the first packet that comes back is checked
        replies = self.send_and_expect(self.tun_if, [send], self.tun_if)
        decrypted = vpp_tun_sa.decrypt(replies[0][IP])
        self.assert_packet_checksums_valid(decrypted)
221
222
class IpsecTra4Tests(object):
    """ ipsec v4 transport tests; reads attributes this class does not
    define (self.params, self.tra_if, self.vapi, the node names, ...) """

    def test_tra_anti_replay(self, count=1):
        """ ipsec v4 transport anti-replay test """
        # `count` is unused; it is kept so the signature stays unchanged
        p = self.params[socket.AF_INET]

        def encrypted(sa, seq_num):
            # every packet in this test is the same ICMP packet on tra_if;
            # only the SA and the sequence number vary
            return (Ether(src=self.tra_if.remote_mac,
                          dst=self.tra_if.local_mac) /
                    sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                  dst=self.tra_if.local_ip4) /
                               ICMP(),
                               seq_num=seq_num))

        # fire in a packet with seq number 1
        pkt = encrypted(p.scapy_tra_sa, 1)
        self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # now move the window over to 235
        pkt = encrypted(p.scapy_tra_sa, 235)
        self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # the window size is 64 packets
        # in window are still accepted
        pkt = encrypted(p.scapy_tra_sa, 172)
        self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # out of window are dropped
        pkt = encrypted(p.scapy_tra_sa, 17)
        self.send_and_assert_no_replies(self.tra_if, pkt * 17)

        self.assert_packet_counter_equal(
            '/err/%s/SA replayed packet' % self.tra4_decrypt_node_name, 17)

        # a packet that does not decrypt does not move the window forward
        bogus_sa = SecurityAssociation(self.encryption_type,
                                       p.vpp_tra_spi)
        pkt = encrypted(bogus_sa, 350)
        self.send_and_assert_no_replies(self.tra_if, pkt * 17)

        self.assert_packet_counter_equal(
            '/err/%s/Integrity check failed' % self.tra4_decrypt_node_name, 17)

        # which we can determine since this packet is still in the window
        pkt = encrypted(p.scapy_tra_sa, 234)
        self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # move the security-associations seq number on to the last we used
        p.scapy_tra_sa.seq_num = 351
        p.vpp_tra_sa.seq_num = 351

    def test_tra_basic(self, count=1):
        """ ipsec v4 transport basic test """
        self.vapi.cli("clear errors")
        try:
            p = self.params[socket.AF_INET]
            send_pkts = self.gen_encrypt_pkts(p.scapy_tra_sa, self.tra_if,
                                              src=self.tra_if.remote_ip4,
                                              dst=self.tra_if.local_ip4,
                                              count=count)
            recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
                                             self.tra_if)
            for rx in recv_pkts:
                try:
                    decrypted = p.vpp_tra_sa.decrypt(rx[IP])
                    self.assert_packet_checksums_valid(decrypted)
                except Exception:
                    # log the offending packet, then let the failure through
                    self.logger.debug(ppp("Unexpected packet:", rx))
                    raise
        finally:
            # always dump the error counters and ipsec state
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))

        pkts = p.tra_sa_in.get_stats()['packets']
        self.assertEqual(pkts, count,
                         "incorrect SA in counts: expected %d != %d" %
                         (count, pkts))
        pkts = p.tra_sa_out.get_stats()['packets']
        self.assertEqual(pkts, count,
                         "incorrect SA out counts: expected %d != %d" %
                         (count, pkts))

        self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tra4_decrypt_node_name, count)

    def test_tra_burst(self):
        """ ipsec v4 transport burst test """
        self.test_tra_basic(count=257)
332
333
class IpsecTra6Tests(object):
    """ ipsec v6 transport tests; reads attributes this class does not
    define (self.params, self.tra_if, self.vapi, the node names, ...) """

    def test_tra_basic6(self, count=1):
        """ ipsec v6 transport basic test """
        self.vapi.cli("clear errors")
        try:
            p = self.params[socket.AF_INET6]
            send_pkts = self.gen_encrypt_pkts6(p.scapy_tra_sa, self.tra_if,
                                               src=self.tra_if.remote_ip6,
                                               dst=self.tra_if.local_ip6,
                                               count=count)
            recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
                                             self.tra_if)
            for rx in recv_pkts:
                try:
                    decrypted = p.vpp_tra_sa.decrypt(rx[IPv6])
                    self.assert_packet_checksums_valid(decrypted)
                except Exception:
                    # log the offending packet, then let the failure through
                    self.logger.debug(ppp("Unexpected packet:", rx))
                    raise
        finally:
            # always dump the error counters and ipsec state
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))

        pkts = p.tra_sa_in.get_stats()['packets']
        self.assertEqual(pkts, count,
                         "incorrect SA in counts: expected %d != %d" %
                         (count, pkts))
        pkts = p.tra_sa_out.get_stats()['packets']
        self.assertEqual(pkts, count,
                         "incorrect SA out counts: expected %d != %d" %
                         (count, pkts))
        self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tra6_decrypt_node_name, count)

    def test_tra_burst6(self):
        """ ipsec v6 transport burst test """
        self.test_tra_basic6(count=257)
371
372
class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
    """ the ipsec v4 and v6 transport tests combined """
375
376
class IpsecTun4Tests(object):
    """ ipsec 4o4 tunnel tests; reads attributes this class does not
    define (self.params, self.tun_if, self.pg1, the node names, ...) """

    def test_tun_basic44(self, count=1):
        """ ipsec 4o4 tunnel basic test """
        self.vapi.cli("clear errors")
        try:
            p = self.params[socket.AF_INET]
            vpp_tun_sa, scapy_tun_sa = self.configure_sa_tun(p)

            # encrypted in on tun_if, expected in the clear on pg1
            send_pkts = self.gen_encrypt_pkts(scapy_tun_sa, self.tun_if,
                                              src=p.remote_tun_if_host,
                                              dst=self.pg1.remote_ip4,
                                              count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host)
                self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
                self.assert_packet_checksums_valid(recv_pkt)

            # plain in on pg1, expected encrypted on tun_if
            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                      dst=p.remote_tun_if_host, count=count)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # reset for every packet so that a failure never logs the
                # decrypted packet of an earlier iteration
                decrypt_pkt = None
                try:
                    decrypt_pkt = vpp_tun_sa.decrypt(recv_pkt[IP])
                    if not decrypt_pkt.haslayer(IP):
                        # no IP layer was dissected; parse the raw bytes
                        decrypt_pkt = IP(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except Exception:
                    self.logger.debug(ppp("Unexpected packet:", recv_pkt))
                    # the decrypted form exists only if decrypt() succeeded
                    if decrypt_pkt is not None:
                        self.logger.debug(
                            ppp("Decrypted packet:", decrypt_pkt))
                    raise
        finally:
            # always dump the error counters and ipsec state
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))

        # the SPD policy and SA counters are checked only when the
        # parameter set carries those objects
        if hasattr(p, "spd_policy_in_any"):
            pkts = p.spd_policy_in_any.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SPD any policy: expected %d != %d" %
                             (count, pkts))

        if hasattr(p, "tun_sa_in"):
            pkts = p.tun_sa_in.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA in counts: expected %d != %d" %
                             (count, pkts))
            pkts = p.tun_sa_out.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA out counts: expected %d != %d" %
                             (count, pkts))

        self.assert_packet_counter_equal(self.tun4_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)

    def test_tun_burst44(self):
        """ ipsec 4o4 tunnel burst test """
        self.test_tun_basic44(count=257)
438
439
class IpsecTun6Tests(object):
    """ ipsec 6o6 tunnel tests; reads attributes this class does not
    define (self.params, self.tun_if, self.pg1, the node names, ...) """

    def test_tun_basic66(self, count=1):
        """ ipsec 6o6 tunnel basic test """
        self.vapi.cli("clear errors")
        try:
            p = self.params[socket.AF_INET6]
            vpp_tun_sa, scapy_tun_sa = self.configure_sa_tun(p)

            # encrypted in on tun_if, expected in the clear on pg1
            send_pkts = self.gen_encrypt_pkts6(scapy_tun_sa, self.tun_if,
                                               src=p.remote_tun_if_host,
                                               dst=self.pg1.remote_ip6,
                                               count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host)
                self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
                self.assert_packet_checksums_valid(recv_pkt)

            # plain in on pg1, expected encrypted on tun_if
            send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
                                       dst=p.remote_tun_if_host,
                                       count=count)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # reset for every packet so that a failure never logs the
                # decrypted packet of an earlier iteration
                decrypt_pkt = None
                try:
                    decrypt_pkt = vpp_tun_sa.decrypt(recv_pkt[IPv6])
                    if not decrypt_pkt.haslayer(IPv6):
                        # no IPv6 layer was dissected; parse the raw bytes
                        decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except Exception:
                    self.logger.debug(ppp("Unexpected packet:", recv_pkt))
                    # the decrypted form exists only if decrypt() succeeded
                    if decrypt_pkt is not None:
                        self.logger.debug(
                            ppp("Decrypted packet:", decrypt_pkt))
                    raise
        finally:
            # always dump the error counters and ipsec state
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))

        pkts = p.tun_sa_in.get_stats()['packets']
        self.assertEqual(pkts, count,
                         "incorrect SA in counts: expected %d != %d" %
                         (count, pkts))
        pkts = p.tun_sa_out.get_stats()['packets']
        self.assertEqual(pkts, count,
                         "incorrect SA out counts: expected %d != %d" %
                         (count, pkts))
        self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)

    def test_tun_burst66(self):
        """ ipsec 6o6 tunnel burst test """
        self.test_tun_basic66(count=257)
494
495
class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
    """ the ipsec 4o4 and 6o6 tunnel tests combined """
498
499
if __name__ == '__main__':
    # executed as a script: hand the module's tests to VppTestRunner
    unittest.main(testRunner=VppTestRunner)