IPSEC: move SA counters into the stats segment
[vpp.git] / test / template_ipsec.py
1 import unittest
2 import socket
3
4 from scapy.layers.inet import IP, ICMP, TCP
5 from scapy.layers.ipsec import SecurityAssociation
6 from scapy.layers.l2 import Ether, Raw
7 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
8
9 from framework import VppTestCase, VppTestRunner
10 from util import ppp
11 from vpp_papi import VppEnum
12
13
class IPsecIPv4Params(object):
    """IPv4 flavour of the parameter set used by the IPsec tests.

    Class attributes hold address-family constants; each instance
    carries SA ids, SPIs, algorithm selections and keys.
    """

    # address-family constants
    addr_type = socket.AF_INET
    addr_any = "0.0.0.0"
    addr_bcast = "255.255.255.255"
    addr_len = 32
    is_ipv6 = 0

    def __init__(self):
        """Fill in the fixed IPv4 test values."""
        self.remote_tun_if_host = '1.1.1.1'

        # tunnel SAs: (SA id, SPI) for the scapy side and the VPP side
        self.scapy_tun_sa_id, self.scapy_tun_spi = 10, 1001
        self.vpp_tun_sa_id, self.vpp_tun_spi = 20, 1000

        # transport SAs: (SA id, SPI) for the scapy side and the VPP side
        self.scapy_tra_sa_id, self.scapy_tra_spi = 30, 2001
        self.vpp_tra_sa_id, self.vpp_tra_spi = 40, 2000

        # integrity: VPP API enum value plus the matching scapy name
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA1_96
        self.auth_algo = 'HMAC-SHA1-96'  # scapy name
        self.auth_key = 'C91KUR9GYMm5GfkEvNjX'

        # encryption: VPP API enum value plus the matching scapy name
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_128
        self.crypt_algo = 'AES-CBC'  # scapy name
        self.crypt_key = 'JPjyOWBeVEQiMe7h'
44
45
class IPsecIPv6Params(object):
    """IPv6 flavour of the parameter set used by the IPsec tests.

    Class attributes hold address-family constants; each instance
    carries SA ids, SPIs, algorithm selections and keys.
    """

    # address-family constants
    addr_type = socket.AF_INET6
    addr_any = "0::0"
    addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
    addr_len = 128
    is_ipv6 = 1

    def __init__(self):
        """Fill in the fixed IPv6 test values."""
        self.remote_tun_if_host = '1111:1111:1111:1111:1111:1111:1111:1111'

        # tunnel SAs: (SA id, SPI) for the scapy side and the VPP side
        self.scapy_tun_sa_id, self.scapy_tun_spi = 50, 3001
        self.vpp_tun_sa_id, self.vpp_tun_spi = 60, 3000

        # transport SAs: (SA id, SPI) for the scapy side and the VPP side
        self.scapy_tra_sa_id, self.scapy_tra_spi = 70, 4001
        self.vpp_tra_sa_id, self.vpp_tra_spi = 80, 4000

        # integrity: VPP API enum value plus the matching scapy name
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA_256_128
        self.auth_algo = 'SHA2-256-128'  # scapy name
        self.auth_key = 'C91KUR9GYMm5GfkEvNjX'

        # encryption: VPP API enum value plus the matching scapy name
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_256
        self.crypt_algo = 'AES-CBC'  # scapy name
        self.crypt_key = 'JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h'
76
77
class TemplateIpsec(VppTestCase):
    """
    TRANSPORT MODE:

     ------   encrypt   ---
    |tra_if| <-------> |VPP|
     ------   decrypt   ---

    TUNNEL MODE:

     ------   encrypt   ---   plain   ---
    |tun_if| <-------  |VPP| <------ |pg1|
     ------             ---           ---

     ------   decrypt   ---   plain   ---
    |tun_if| ------->  |VPP| ------> |pg1|
     ------             ---           ---

    Shared fixture for the IPsec tests: prepares the per-family
    parameter sets and three pg interfaces, and offers packet and
    scapy SecurityAssociation builders.

    ``self.encryption_type`` and ``self.tun_if`` are read by the SA
    builders but are not assigned in this class; they have to be
    provided elsewhere before those builders are called.
    """

    def ipsec_select_backend(self):
        """Hook invoked at the end of setUp(); does nothing by default
        and is meant to be overloaded when necessary."""

    def setUp(self):
        """Create the parameter sets, common constants and three pg
        interfaces configured for IPv4 and IPv6, then call the backend
        selection hook."""
        super(TemplateIpsec, self).setUp()

        v4_params = IPsecIPv4Params()
        v6_params = IPsecIPv6Params()
        self.ipv4_params = v4_params
        self.ipv6_params = v6_params
        # parameter sets keyed by their socket address family
        self.params = {v4_params.addr_type: v4_params,
                       v6_params.addr_type: v6_params}

        self.payload = ("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
                        "XXXXXXXXXXXXXXXXXXXXX")

        self.tun_spd_id = 1
        self.tra_spd_id = 2

        ipsec_protos = VppEnum.vl_api_ipsec_proto_t
        self.vpp_esp_protocol = ipsec_protos.IPSEC_API_PROTO_ESP
        self.vpp_ah_protocol = ipsec_protos.IPSEC_API_PROTO_AH

        self.create_pg_interfaces(range(3))
        self.interfaces = list(self.pg_interfaces)
        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()
            intf.config_ip6()
            intf.resolve_ndp()
        self.ipsec_select_backend()

    def tearDown(self):
        """Run the base tearDown, then bring every interface down and
        strip its IPv4/IPv6 configuration."""
        super(TemplateIpsec, self).tearDown()

        for intf in self.interfaces:
            intf.admin_down()
            intf.unconfig_ip4()
            intf.unconfig_ip6()

        if not self.vpp_dead:
            self.vapi.cli("show hardware")

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1):
        """Return ``count`` frames for ``sw_intf`` whose IPv4/ICMP
        payload has been passed through ``sa.encrypt``.

        ``sa.encrypt`` is called once per frame.
        """
        pkts = []
        for _ in range(count):
            l2_hdr = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            clear = IP(src=src, dst=dst) / ICMP() / self.payload
            pkts.append(l2_hdr / sa.encrypt(clear))
        return pkts

    def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1):
        """Return ``count`` frames for ``sw_intf`` whose IPv6 echo
        request has been passed through ``sa.encrypt``.

        ``sa.encrypt`` is called once per frame.
        """
        pkts = []
        for _ in range(count):
            l2_hdr = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            clear = (IPv6(src=src, dst=dst) /
                     ICMPv6EchoRequest(id=0, seq=1, data=self.payload))
            pkts.append(l2_hdr / sa.encrypt(clear))
        return pkts

    def gen_pkts(self, sw_intf, src, dst, count=1):
        """Return ``count`` clear-text IPv4/ICMP frames for ``sw_intf``."""
        pkts = []
        for _ in range(count):
            l2_hdr = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            pkts.append(l2_hdr / IP(src=src, dst=dst) / ICMP() /
                        self.payload)
        return pkts

    def gen_pkts6(self, sw_intf, src, dst, count=1):
        """Return ``count`` clear-text IPv6 echo request frames for
        ``sw_intf``."""
        pkts = []
        for _ in range(count):
            l2_hdr = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            pkts.append(l2_hdr / IPv6(src=src, dst=dst) /
                        ICMPv6EchoRequest(id=0, seq=1, data=self.payload))
        return pkts

    def configure_sa_tun(self, params):
        """Build the two scapy tunnel-mode SAs for ``params``.

        The scapy-side SA uses ``params.vpp_tun_spi`` and a tunnel header
        going from the remote to the local address of ``self.tun_if``;
        the VPP-side SA uses ``params.scapy_tun_spi`` and the reverse
        header.

        :returns: tuple ``(vpp_tun_sa, scapy_tun_sa)``
        """
        ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
        af = params.addr_type
        tunnel_ip = ip_class_by_addr_type[af]

        to_vpp_hdr = tunnel_ip(src=self.tun_if.remote_addr[af],
                               dst=self.tun_if.local_addr[af])
        scapy_tun_sa = SecurityAssociation(
            self.encryption_type, spi=params.vpp_tun_spi,
            crypt_algo=params.crypt_algo, crypt_key=params.crypt_key,
            auth_algo=params.auth_algo, auth_key=params.auth_key,
            tunnel_header=to_vpp_hdr)

        from_vpp_hdr = tunnel_ip(dst=self.tun_if.remote_addr[af],
                                 src=self.tun_if.local_addr[af])
        vpp_tun_sa = SecurityAssociation(
            self.encryption_type, spi=params.scapy_tun_spi,
            crypt_algo=params.crypt_algo, crypt_key=params.crypt_key,
            auth_algo=params.auth_algo, auth_key=params.auth_key,
            tunnel_header=from_vpp_hdr)

        return vpp_tun_sa, scapy_tun_sa

    def configure_sa_tra(self, params):
        """Build the two scapy transport-mode SAs and store them on
        ``params`` as ``scapy_tra_sa`` (using ``vpp_tra_spi``) and
        ``vpp_tra_sa`` (using ``scapy_tra_spi``)."""
        algo_args = dict(crypt_algo=params.crypt_algo,
                         crypt_key=params.crypt_key,
                         auth_algo=params.auth_algo,
                         auth_key=params.auth_key)
        params.scapy_tra_sa = SecurityAssociation(
            self.encryption_type, spi=params.vpp_tra_spi, **algo_args)
        params.vpp_tra_sa = SecurityAssociation(
            self.encryption_type, spi=params.scapy_tra_spi, **algo_args)
194
195
class IpsecTcpTests(object):
    """TCP checksum test, written as a mixin.

    Relies on the host class for ``params``, ``tun_if``, ``vapi``,
    ``logger``, ``configure_sa_tun`` and the send/assert helpers.
    """

    def test_tcp_checksum(self):
        """ verify checksum correctness for vpp generated packets """
        self.vapi.cli("test http server")
        p = self.params[socket.AF_INET]
        vpp_tun_sa, scapy_tun_sa = self.configure_sa_tun(p)

        # a TCP SYN to port 80, sent encrypted through tun_if
        l2_hdr = Ether(src=self.tun_if.remote_mac,
                       dst=self.tun_if.local_mac)
        syn = (IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
               TCP(flags='S', dport=80))
        send = l2_hdr / scapy_tun_sa.encrypt(syn)
        self.logger.debug(ppp("Sending packet:", send))

        # the reply is expected back on tun_if; check it once decrypted
        replies = self.send_and_expect(self.tun_if, [send], self.tun_if)
        decrypted = vpp_tun_sa.decrypt(replies[0][IP])
        self.assert_packet_checksums_valid(decrypted)
211
212
213 class IpsecTraTests(object):
214     def test_tra_anti_replay(self, count=1):
215         """ ipsec v4 transport anti-reply test """
216         p = self.params[socket.AF_INET]
217
218         # fire in a packet with seq number 1
219         pkt = (Ether(src=self.tra_if.remote_mac,
220                      dst=self.tra_if.local_mac) /
221                p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
222                                          dst=self.tra_if.local_ip4) /
223                                       ICMP(),
224                                       seq_num=1))
225         recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
226
227         # now move the window over to 235
228         pkt = (Ether(src=self.tra_if.remote_mac,
229                      dst=self.tra_if.local_mac) /
230                p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
231                                          dst=self.tra_if.local_ip4) /
232                                       ICMP(),
233                                       seq_num=235))
234         recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
235
236         # the window size is 64 packets
237         # in window are still accepted
238         pkt = (Ether(src=self.tra_if.remote_mac,
239                      dst=self.tra_if.local_mac) /
240                p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
241                                          dst=self.tra_if.local_ip4) /
242                                       ICMP(),
243                                       seq_num=172))
244         recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
245
246         # out of window are dropped
247         pkt = (Ether(src=self.tra_if.remote_mac,
248                      dst=self.tra_if.local_mac) /
249                p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
250                                          dst=self.tra_if.local_ip4) /
251                                       ICMP(),
252                                       seq_num=17))
253         self.send_and_assert_no_replies(self.tra_if, pkt * 17)
254
255         self.assert_packet_counter_equal(
256             '/err/%s/SA replayed packet' % self.tra4_decrypt_node_name, 17)
257
258         # a packet that does not decrypt does not move the window forward
259         bogus_sa = SecurityAssociation(self.encryption_type,
260                                        p.vpp_tra_spi)
261         pkt = (Ether(src=self.tra_if.remote_mac,
262                      dst=self.tra_if.local_mac) /
263                bogus_sa.encrypt(IP(src=self.tra_if.remote_ip4,
264                                    dst=self.tra_if.local_ip4) /
265                                 ICMP(),
266                                 seq_num=350))
267         self.send_and_assert_no_replies(self.tra_if, pkt * 17)
268
269         self.assert_packet_counter_equal(
270             '/err/%s/Integrity check failed' % self.tra4_decrypt_node_name, 17)
271
272         # which we can determine since this packet is still in the window
273         pkt = (Ether(src=self.tra_if.remote_mac,
274                      dst=self.tra_if.local_mac) /
275                p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
276                                          dst=self.tra_if.local_ip4) /
277                                       ICMP(),
278                                       seq_num=234))
279         self.send_and_expect(self.tra_if, [pkt], self.tra_if)
280
281         # move the security-associations seq number on to the last we used
282         p.scapy_tra_sa.seq_num = 351
283         p.vpp_tra_sa.seq_num = 351
284
285     def test_tra_basic(self, count=1):
286         """ ipsec v4 transport basic test """
287         self.vapi.cli("clear errors")
288         try:
289             p = self.params[socket.AF_INET]
290             send_pkts = self.gen_encrypt_pkts(p.scapy_tra_sa, self.tra_if,
291                                               src=self.tra_if.remote_ip4,
292                                               dst=self.tra_if.local_ip4,
293                                               count=count)
294             recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
295                                              self.tra_if)
296             for rx in recv_pkts:
297                 try:
298                     decrypted = p.vpp_tra_sa.decrypt(rx[IP])
299                     self.assert_packet_checksums_valid(decrypted)
300                 except:
301                     self.logger.debug(ppp("Unexpected packet:", rx))
302                     raise
303         finally:
304             self.logger.info(self.vapi.ppcli("show error"))
305             self.logger.info(self.vapi.ppcli("show ipsec"))
306
307         pkts = p.tra_sa_in.get_stats()['packets']
308         self.assertEqual(pkts, count,
309                          "incorrect SA in counts: expected %d != %d" %
310                          (count, pkts))
311         pkts = p.tra_sa_out.get_stats()['packets']
312         self.assertEqual(pkts, count,
313                          "incorrect SA out counts: expected %d != %d" %
314                          (count, pkts))
315
316         self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
317         self.assert_packet_counter_equal(self.tra4_decrypt_node_name, count)
318
319     def test_tra_burst(self):
320         """ ipsec v4 transport burst test """
321         self.test_tra_basic(count=257)
322
323     def test_tra_basic6(self, count=1):
324         """ ipsec v6 transport basic test """
325         self.vapi.cli("clear errors")
326         try:
327             p = self.params[socket.AF_INET6]
328             send_pkts = self.gen_encrypt_pkts6(p.scapy_tra_sa, self.tra_if,
329                                                src=self.tra_if.remote_ip6,
330                                                dst=self.tra_if.local_ip6,
331                                                count=count)
332             recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
333                                              self.tra_if)
334             for rx in recv_pkts:
335                 try:
336                     decrypted = p.vpp_tra_sa.decrypt(rx[IPv6])
337                     self.assert_packet_checksums_valid(decrypted)
338                 except:
339                     self.logger.debug(ppp("Unexpected packet:", rx))
340                     raise
341         finally:
342             self.logger.info(self.vapi.ppcli("show error"))
343             self.logger.info(self.vapi.ppcli("show ipsec"))
344
345         pkts = p.tra_sa_in.get_stats()['packets']
346         self.assertEqual(pkts, count,
347                          "incorrect SA in counts: expected %d != %d" %
348                          (count, pkts))
349         pkts = p.tra_sa_out.get_stats()['packets']
350         self.assertEqual(pkts, count,
351                          "incorrect SA out counts: expected %d != %d" %
352                          (count, pkts))
353         self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
354         self.assert_packet_counter_equal(self.tra6_decrypt_node_name, count)
355
356     def test_tra_burst6(self):
357         """ ipsec v6 transport burst test """
358         self.test_tra_basic6(count=257)
359
360
361 class IpsecTun4Tests(object):
362     def test_tun_basic44(self, count=1):
363         """ ipsec 4o4 tunnel basic test """
364         self.vapi.cli("clear errors")
365         try:
366             p = self.params[socket.AF_INET]
367             vpp_tun_sa, scapy_tun_sa = self.configure_sa_tun(p)
368             send_pkts = self.gen_encrypt_pkts(scapy_tun_sa, self.tun_if,
369                                               src=p.remote_tun_if_host,
370                                               dst=self.pg1.remote_ip4,
371                                               count=count)
372             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
373             for recv_pkt in recv_pkts:
374                 self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host)
375                 self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
376                 self.assert_packet_checksums_valid(recv_pkt)
377             send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
378                                       dst=p.remote_tun_if_host, count=count)
379             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
380             for recv_pkt in recv_pkts:
381                 try:
382                     decrypt_pkt = vpp_tun_sa.decrypt(recv_pkt[IP])
383                     if not decrypt_pkt.haslayer(IP):
384                         decrypt_pkt = IP(decrypt_pkt[Raw].load)
385                     self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
386                     self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
387                     self.assert_packet_checksums_valid(decrypt_pkt)
388                 except:
389                     self.logger.debug(ppp("Unexpected packet:", recv_pkt))
390                     try:
391                         self.logger.debug(
392                             ppp("Decrypted packet:", decrypt_pkt))
393                     except:
394                         pass
395                     raise
396         finally:
397             self.logger.info(self.vapi.ppcli("show error"))
398             self.logger.info(self.vapi.ppcli("show ipsec"))
399
400         if (hasattr(p, "spd_policy_in_any")):
401             pkts = p.spd_policy_in_any.get_stats()['packets']
402             self.assertEqual(pkts, count,
403                              "incorrect SPD any policy: expected %d != %d" %
404                              (count, pkts))
405
406         if (hasattr(p, "tun_sa_in")):
407             pkts = p.tun_sa_in.get_stats()['packets']
408             self.assertEqual(pkts, count,
409                              "incorrect SA in counts: expected %d != %d" %
410                              (count, pkts))
411             pkts = p.tun_sa_out.get_stats()['packets']
412             self.assertEqual(pkts, count,
413                              "incorrect SA out counts: expected %d != %d" %
414                              (count, pkts))
415
416         self.assert_packet_counter_equal(self.tun4_encrypt_node_name, count)
417         self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)
418
419     def test_tun_burst44(self):
420         """ ipsec 4o4 tunnel burst test """
421         self.test_tun_basic44(count=257)
422
423
424 class IpsecTun6Tests(object):
425     def test_tun_basic66(self, count=1):
426         """ ipsec 6o6 tunnel basic test """
427         self.vapi.cli("clear errors")
428         try:
429             p = self.params[socket.AF_INET6]
430             vpp_tun_sa, scapy_tun_sa = self.configure_sa_tun(p)
431             send_pkts = self.gen_encrypt_pkts6(scapy_tun_sa, self.tun_if,
432                                                src=p.remote_tun_if_host,
433                                                dst=self.pg1.remote_ip6,
434                                                count=count)
435             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
436             for recv_pkt in recv_pkts:
437                 self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host)
438                 self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
439                 self.assert_packet_checksums_valid(recv_pkt)
440             send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
441                                        dst=p.remote_tun_if_host,
442                                        count=count)
443             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
444             for recv_pkt in recv_pkts:
445                 try:
446                     decrypt_pkt = vpp_tun_sa.decrypt(recv_pkt[IPv6])
447                     if not decrypt_pkt.haslayer(IPv6):
448                         decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
449                     self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
450                     self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
451                     self.assert_packet_checksums_valid(decrypt_pkt)
452                 except:
453                     self.logger.debug(ppp("Unexpected packet:", recv_pkt))
454                     try:
455                         self.logger.debug(
456                             ppp("Decrypted packet:", decrypt_pkt))
457                     except:
458                         pass
459                     raise
460         finally:
461             self.logger.info(self.vapi.ppcli("show error"))
462             self.logger.info(self.vapi.ppcli("show ipsec"))
463
464         pkts = p.tun_sa_in.get_stats()['packets']
465         self.assertEqual(pkts, count,
466                          "incorrect SA in counts: expected %d != %d" %
467                          (count, pkts))
468         pkts = p.tun_sa_out.get_stats()['packets']
469         self.assertEqual(pkts, count,
470                          "incorrect SA out counts: expected %d != %d" %
471                          (count, pkts))
472         self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
473         self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)
474
475     def test_tun_burst66(self):
476         """ ipsec 6o6 tunnel burst test """
477         self.test_tun_basic66(count=257)
478
479
class IpsecTunTests(IpsecTun4Tests, IpsecTun6Tests):
    """Combines the IPv4 and IPv6 tunnel tests; adds nothing itself."""
482
483
if __name__ == "__main__":
    # executed as a script: hand the module's tests to VppTestRunner
    unittest.main(testRunner=VppTestRunner)