ipsec: GCM, Anti-replay and ESN fixess
[vpp.git] / test / template_ipsec.py
1 import unittest
2 import socket
3 import struct
4
5 from scapy.layers.inet import IP, ICMP, TCP, UDP
6 from scapy.layers.ipsec import SecurityAssociation, ESP
7 from scapy.layers.l2 import Ether, Raw
8 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
9
10 from framework import VppTestCase, VppTestRunner
11 from util import ppp, reassemble4
12 from vpp_papi import VppEnum
13
14
15 class IPsecIPv4Params(object):
16
17     addr_type = socket.AF_INET
18     addr_any = "0.0.0.0"
19     addr_bcast = "255.255.255.255"
20     addr_len = 32
21     is_ipv6 = 0
22
23     def __init__(self):
24         self.remote_tun_if_host = '1.1.1.1'
25         self.remote_tun_if_host6 = '1111::1'
26
27         self.scapy_tun_sa_id = 10
28         self.scapy_tun_spi = 1001
29         self.vpp_tun_sa_id = 20
30         self.vpp_tun_spi = 1000
31
32         self.scapy_tra_sa_id = 30
33         self.scapy_tra_spi = 2001
34         self.vpp_tra_sa_id = 40
35         self.vpp_tra_spi = 2000
36
37         self.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
38                                  IPSEC_API_INTEG_ALG_SHA1_96)
39         self.auth_algo = 'HMAC-SHA1-96'  # scapy name
40         self.auth_key = 'C91KUR9GYMm5GfkEvNjX'
41
42         self.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
43                                   IPSEC_API_CRYPTO_ALG_AES_CBC_128)
44         self.crypt_algo = 'AES-CBC'  # scapy name
45         self.crypt_key = 'JPjyOWBeVEQiMe7h'
46         self.salt = 0
47         self.flags = 0
48         self.nat_header = None
49
50
51 class IPsecIPv6Params(object):
52
53     addr_type = socket.AF_INET6
54     addr_any = "0::0"
55     addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
56     addr_len = 128
57     is_ipv6 = 1
58
59     def __init__(self):
60         self.remote_tun_if_host = '1111:1111:1111:1111:1111:1111:1111:1111'
61         self.remote_tun_if_host4 = '1.1.1.1'
62
63         self.scapy_tun_sa_id = 50
64         self.scapy_tun_spi = 3001
65         self.vpp_tun_sa_id = 60
66         self.vpp_tun_spi = 3000
67
68         self.scapy_tra_sa_id = 70
69         self.scapy_tra_spi = 4001
70         self.vpp_tra_sa_id = 80
71         self.vpp_tra_spi = 4000
72
73         self.auth_algo_vpp_id = (VppEnum.vl_api_ipsec_integ_alg_t.
74                                  IPSEC_API_INTEG_ALG_SHA1_96)
75         self.auth_algo = 'HMAC-SHA1-96'  # scapy name
76         self.auth_key = 'C91KUR9GYMm5GfkEvNjX'
77
78         self.crypt_algo_vpp_id = (VppEnum.vl_api_ipsec_crypto_alg_t.
79                                   IPSEC_API_CRYPTO_ALG_AES_CBC_128)
80         self.crypt_algo = 'AES-CBC'  # scapy name
81         self.crypt_key = 'JPjyOWBeVEQiMe7h'
82         self.salt = 0
83         self.flags = 0
84         self.nat_header = None
85
86
87 def mk_scapy_crpyt_key(p):
88     if p.crypt_algo == "AES-GCM":
89         return p.crypt_key + struct.pack("!I", p.salt)
90     else:
91         return p.crypt_key
92
93
94 def config_tun_params(p, encryption_type, tun_if):
95     ip_class_by_addr_type = {socket.AF_INET: IP, socket.AF_INET6: IPv6}
96     use_esn = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
97                               IPSEC_API_SAD_FLAG_USE_ESN))
98     crypt_key = mk_scapy_crpyt_key(p)
99     p.scapy_tun_sa = SecurityAssociation(
100         encryption_type, spi=p.vpp_tun_spi,
101         crypt_algo=p.crypt_algo,
102         crypt_key=crypt_key,
103         auth_algo=p.auth_algo, auth_key=p.auth_key,
104         tunnel_header=ip_class_by_addr_type[p.addr_type](
105             src=tun_if.remote_addr[p.addr_type],
106             dst=tun_if.local_addr[p.addr_type]),
107         nat_t_header=p.nat_header,
108         use_esn=use_esn)
109     p.vpp_tun_sa = SecurityAssociation(
110         encryption_type, spi=p.scapy_tun_spi,
111         crypt_algo=p.crypt_algo,
112         crypt_key=crypt_key,
113         auth_algo=p.auth_algo, auth_key=p.auth_key,
114         tunnel_header=ip_class_by_addr_type[p.addr_type](
115             dst=tun_if.remote_addr[p.addr_type],
116             src=tun_if.local_addr[p.addr_type]),
117         nat_t_header=p.nat_header,
118         use_esn=use_esn)
119
120
121 def config_tra_params(p, encryption_type):
122     use_esn = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
123                               IPSEC_API_SAD_FLAG_USE_ESN))
124     crypt_key = mk_scapy_crpyt_key(p)
125     p.scapy_tra_sa = SecurityAssociation(
126         encryption_type,
127         spi=p.vpp_tra_spi,
128         crypt_algo=p.crypt_algo,
129         crypt_key=crypt_key,
130         auth_algo=p.auth_algo,
131         auth_key=p.auth_key,
132         nat_t_header=p.nat_header,
133         use_esn=use_esn)
134     p.vpp_tra_sa = SecurityAssociation(
135         encryption_type,
136         spi=p.scapy_tra_spi,
137         crypt_algo=p.crypt_algo,
138         crypt_key=crypt_key,
139         auth_algo=p.auth_algo,
140         auth_key=p.auth_key,
141         nat_t_header=p.nat_header,
142         use_esn=use_esn)
143
144
145 class TemplateIpsec(VppTestCase):
146     """
147     TRANSPORT MODE:
148
149      ------   encrypt   ---
150     |tra_if| <-------> |VPP|
151      ------   decrypt   ---
152
153     TUNNEL MODE:
154
155      ------   encrypt   ---   plain   ---
156     |tun_if| <-------  |VPP| <------ |pg1|
157      ------             ---           ---
158
159      ------   decrypt   ---   plain   ---
160     |tun_if| ------->  |VPP| ------> |pg1|
161      ------             ---           ---
162     """
163     tun_spd_id = 1
164     tra_spd_id = 2
165
166     def ipsec_select_backend(self):
167         """ empty method to be overloaded when necessary """
168         pass
169
170     @classmethod
171     def setUpClass(cls):
172         super(TemplateIpsec, cls).setUpClass()
173
174     @classmethod
175     def tearDownClass(cls):
176         super(TemplateIpsec, cls).tearDownClass()
177
178     def setup_params(self):
179         self.ipv4_params = IPsecIPv4Params()
180         self.ipv6_params = IPsecIPv6Params()
181         self.params = {self.ipv4_params.addr_type: self.ipv4_params,
182                        self.ipv6_params.addr_type: self.ipv6_params}
183
184     def config_interfaces(self):
185         self.create_pg_interfaces(range(3))
186         self.interfaces = list(self.pg_interfaces)
187         for i in self.interfaces:
188             i.admin_up()
189             i.config_ip4()
190             i.resolve_arp()
191             i.config_ip6()
192             i.resolve_ndp()
193
194     def setUp(self):
195         super(TemplateIpsec, self).setUp()
196
197         self.setup_params()
198
199         self.vpp_esp_protocol = (VppEnum.vl_api_ipsec_proto_t.
200                                  IPSEC_API_PROTO_ESP)
201         self.vpp_ah_protocol = (VppEnum.vl_api_ipsec_proto_t.
202                                 IPSEC_API_PROTO_AH)
203
204         self.config_interfaces()
205
206         self.ipsec_select_backend()
207
208     def unconfig_interfaces(self):
209         for i in self.interfaces:
210             i.admin_down()
211             i.unconfig_ip4()
212             i.unconfig_ip6()
213
214     def tearDown(self):
215         super(TemplateIpsec, self).tearDown()
216
217         self.unconfig_interfaces()
218
219     def show_commands_at_teardown(self):
220         self.logger.info(self.vapi.cli("show hardware"))
221
222     def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
223                          payload_size=54):
224         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
225                 sa.encrypt(IP(src=src, dst=dst) /
226                            ICMP() / Raw('X' * payload_size))
227                 for i in range(count)]
228
229     def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
230                           payload_size=54):
231         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
232                 sa.encrypt(IPv6(src=src, dst=dst) /
233                            ICMPv6EchoRequest(id=0, seq=1,
234                                              data='X' * payload_size))
235                 for i in range(count)]
236
237     def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
238         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
239                 IP(src=src, dst=dst) / ICMP() / Raw('X' * payload_size)
240                 for i in range(count)]
241
242     def gen_pkts6(self, sw_intf, src, dst, count=1, payload_size=54):
243         return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
244                 IPv6(src=src, dst=dst) /
245                 ICMPv6EchoRequest(id=0, seq=1, data='X' * payload_size)
246                 for i in range(count)]
247
248
249 class IpsecTcp(object):
250     def verify_tcp_checksum(self):
251         self.vapi.cli("test http server")
252         p = self.params[socket.AF_INET]
253         config_tun_params(p, self.encryption_type, self.tun_if)
254         send = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
255                 p.scapy_tun_sa.encrypt(IP(src=p.remote_tun_if_host,
256                                           dst=self.tun_if.local_ip4) /
257                                        TCP(flags='S', dport=80)))
258         self.logger.debug(ppp("Sending packet:", send))
259         recv = self.send_and_expect(self.tun_if, [send], self.tun_if)
260         recv = recv[0]
261         decrypted = p.vpp_tun_sa.decrypt(recv[IP])
262         self.assert_packet_checksums_valid(decrypted)
263
264
265 class IpsecTcpTests(IpsecTcp):
266     def test_tcp_checksum(self):
267         """ verify checksum correctness for vpp generated packets """
268         self.verify_tcp_checksum()
269
270
271 class IpsecTra4(object):
272     """ verify methods for Transport v4 """
273     def verify_tra_anti_replay(self):
274         p = self.params[socket.AF_INET]
275         use_esn = p.vpp_tra_sa.use_esn
276
277         seq_cycle_node_name = ('/err/%s/sequence number cycled' %
278                                self.tra4_encrypt_node_name)
279         replay_node_name = ('/err/%s/SA replayed packet' %
280                             self.tra4_decrypt_node_name)
281         if ESP == self.encryption_type and p.crypt_algo == "AES-GCM":
282             hash_failed_node_name = ('/err/%s/ESP decryption failed' %
283                                      self.tra4_decrypt_node_name)
284         else:
285             hash_failed_node_name = ('/err/%s/Integrity check failed' %
286                                      self.tra4_decrypt_node_name)
287         replay_count = self.statistics.get_err_counter(replay_node_name)
288         hash_failed_count = self.statistics.get_err_counter(
289             hash_failed_node_name)
290         seq_cycle_count = self.statistics.get_err_counter(seq_cycle_node_name)
291
292         if ESP == self.encryption_type:
293             undersize_node_name = ('/err/%s/undersized packet' %
294                                    self.tra4_decrypt_node_name)
295             undersize_count = self.statistics.get_err_counter(
296                 undersize_node_name)
297
298         #
299         # send packets with seq numbers 1->34
300         # this means the window size is still in Case B (see RFC4303
301         # Appendix A)
302         #
303         # for reasons i haven't investigated Scapy won't create a packet with
304         # seq_num=0
305         #
306         pkts = [(Ether(src=self.tra_if.remote_mac,
307                        dst=self.tra_if.local_mac) /
308                  p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
309                                            dst=self.tra_if.local_ip4) /
310                                         ICMP(),
311                                         seq_num=seq))
312                 for seq in range(1, 34)]
313         recv_pkts = self.send_and_expect(self.tra_if, pkts, self.tra_if)
314
315         # replayed packets are dropped
316         self.send_and_assert_no_replies(self.tra_if, pkts)
317         replay_count += len(pkts)
318         self.assert_error_counter_equal(replay_node_name, replay_count)
319
320         #
321         # now move the window over to 257 (more than one byte) and into Case A
322         #
323         pkt = (Ether(src=self.tra_if.remote_mac,
324                      dst=self.tra_if.local_mac) /
325                p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
326                                          dst=self.tra_if.local_ip4) /
327                                       ICMP(),
328                                       seq_num=257))
329         recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
330
331         # replayed packets are dropped
332         self.send_and_assert_no_replies(self.tra_if, pkt * 3)
333         replay_count += 3
334         self.assert_error_counter_equal(replay_node_name, replay_count)
335
336         # the window size is 64 packets
337         # in window are still accepted
338         pkt = (Ether(src=self.tra_if.remote_mac,
339                      dst=self.tra_if.local_mac) /
340                p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
341                                          dst=self.tra_if.local_ip4) /
342                                       ICMP(),
343                                       seq_num=200))
344         recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
345
346         # a packet that does not decrypt does not move the window forward
347         bogus_sa = SecurityAssociation(self.encryption_type,
348                                        p.vpp_tra_spi,
349                                        crypt_algo=p.crypt_algo,
350                                        crypt_key=mk_scapy_crpyt_key(p)[::-1],
351                                        auth_algo=p.auth_algo,
352                                        auth_key=p.auth_key[::-1])
353         pkt = (Ether(src=self.tra_if.remote_mac,
354                      dst=self.tra_if.local_mac) /
355                bogus_sa.encrypt(IP(src=self.tra_if.remote_ip4,
356                                    dst=self.tra_if.local_ip4) /
357                                 ICMP(),
358                                 seq_num=350))
359         self.send_and_assert_no_replies(self.tra_if, pkt * 17)
360
361         hash_failed_count += 17
362         self.assert_error_counter_equal(hash_failed_node_name,
363                                         hash_failed_count)
364
365         # a malformed 'runt' packet
366         #  created by a mis-constructed SA
367         if (ESP == self.encryption_type):
368             bogus_sa = SecurityAssociation(self.encryption_type,
369                                            p.vpp_tra_spi)
370             pkt = (Ether(src=self.tra_if.remote_mac,
371                          dst=self.tra_if.local_mac) /
372                    bogus_sa.encrypt(IP(src=self.tra_if.remote_ip4,
373                                        dst=self.tra_if.local_ip4) /
374                                     ICMP(),
375                                     seq_num=350))
376             self.send_and_assert_no_replies(self.tra_if, pkt * 17)
377
378             undersize_count += 17
379             self.assert_error_counter_equal(undersize_node_name,
380                                             undersize_count)
381
382         # which we can determine since this packet is still in the window
383         pkt = (Ether(src=self.tra_if.remote_mac,
384                      dst=self.tra_if.local_mac) /
385                p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
386                                          dst=self.tra_if.local_ip4) /
387                                       ICMP(),
388                                       seq_num=234))
389         self.send_and_expect(self.tra_if, [pkt], self.tra_if)
390
391         #
392         # out of window are dropped
393         #  this is Case B. So VPP will consider this to be a high seq num wrap
394         #  and so the decrypt attempt will fail
395         #
396         pkt = (Ether(src=self.tra_if.remote_mac,
397                      dst=self.tra_if.local_mac) /
398                p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
399                                          dst=self.tra_if.local_ip4) /
400                                       ICMP(),
401                                       seq_num=17))
402         self.send_and_assert_no_replies(self.tra_if, pkt * 17)
403
404         if use_esn:
405             # an out of window error with ESN looks like a high sequence
406             # wrap. but since it isn't then the verify will fail.
407             hash_failed_count += 17
408             self.assert_error_counter_equal(hash_failed_node_name,
409                                             hash_failed_count)
410
411         else:
412             replay_count += 17
413             self.assert_error_counter_equal(replay_node_name,
414                                             replay_count)
415
416         # valid packet moves the window over to 258
417         pkt = (Ether(src=self.tra_if.remote_mac,
418                      dst=self.tra_if.local_mac) /
419                p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
420                                          dst=self.tra_if.local_ip4) /
421                                       ICMP(),
422                                       seq_num=258))
423         rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
424         decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
425
426         #
427         # move VPP's SA TX seq-num to just before the seq-number wrap.
428         # then fire in a packet that VPP should drop on TX because it
429         # causes the TX seq number to wrap; unless we're using extened sequence
430         # numbers.
431         #
432         self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.scapy_tra_sa_id)
433         self.logger.info(self.vapi.ppcli("show ipsec sa 0"))
434         self.logger.info(self.vapi.ppcli("show ipsec sa 1"))
435
436         pkts = [(Ether(src=self.tra_if.remote_mac,
437                        dst=self.tra_if.local_mac) /
438                  p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
439                                            dst=self.tra_if.local_ip4) /
440                                         ICMP(),
441                                         seq_num=seq))
442                 for seq in range(259, 280)]
443
444         if use_esn:
445             rxs = self.send_and_expect(self.tra_if, pkts, self.tra_if)
446
447             #
448             # in order for scapy to decrypt its SA's high order number needs
449             # to wrap
450             #
451             p.vpp_tra_sa.seq_num = 0x100000000
452             for rx in rxs:
453                 decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
454
455             #
456             # wrap scapy's TX high sequence number. VPP is in case B, so it
457             # will consider this a high seq wrap also.
458             # The low seq num we set it to will place VPP's RX window in Case A
459             #
460             p.scapy_tra_sa.seq_num = 0x100000005
461             pkt = (Ether(src=self.tra_if.remote_mac,
462                          dst=self.tra_if.local_mac) /
463                    p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
464                                              dst=self.tra_if.local_ip4) /
465                                           ICMP(),
466                                           seq_num=0x100000005))
467             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
468             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
469
470             #
471             # A packet that has seq num between (2^32-64) and 5 is within
472             # the window
473             #
474             p.scapy_tra_sa.seq_num = 0xfffffffd
475             pkt = (Ether(src=self.tra_if.remote_mac,
476                          dst=self.tra_if.local_mac) /
477                    p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
478                                              dst=self.tra_if.local_ip4) /
479                                           ICMP(),
480                                           seq_num=0xfffffffd))
481             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
482             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
483
484             #
485             # While in case A we cannot wrap the high sequence number again
486             # becuase VPP will consider this packet to be one that moves the
487             # window forward
488             #
489             pkt = (Ether(src=self.tra_if.remote_mac,
490                          dst=self.tra_if.local_mac) /
491                    p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
492                                              dst=self.tra_if.local_ip4) /
493                                           ICMP(),
494                                           seq_num=0x200000999))
495             self.send_and_assert_no_replies(self.tra_if, [pkt], self.tra_if)
496
497             hash_failed_count += 1
498             self.assert_error_counter_equal(hash_failed_node_name,
499                                             hash_failed_count)
500
501             #
502             # but if we move the wondow forward to case B, then we can wrap
503             # again
504             #
505             p.scapy_tra_sa.seq_num = 0x100000555
506             pkt = (Ether(src=self.tra_if.remote_mac,
507                          dst=self.tra_if.local_mac) /
508                    p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
509                                              dst=self.tra_if.local_ip4) /
510                                           ICMP(),
511                                           seq_num=0x100000555))
512             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
513             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
514
515             p.scapy_tra_sa.seq_num = 0x200000444
516             pkt = (Ether(src=self.tra_if.remote_mac,
517                          dst=self.tra_if.local_mac) /
518                    p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
519                                              dst=self.tra_if.local_ip4) /
520                                           ICMP(),
521                                           seq_num=0x200000444))
522             rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
523             decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
524
525         else:
526             #
527             # without ESN TX sequence numbers can't wrap and packets are
528             # dropped from here on out.
529             #
530             self.send_and_assert_no_replies(self.tra_if, pkts)
531             seq_cycle_count += len(pkts)
532             self.assert_error_counter_equal(seq_cycle_node_name,
533                                             seq_cycle_count)
534
535         # move the security-associations seq number on to the last we used
536         self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id)
537         p.scapy_tra_sa.seq_num = 351
538         p.vpp_tra_sa.seq_num = 351
539
540     def verify_tra_basic4(self, count=1):
541         """ ipsec v4 transport basic test """
542         self.vapi.cli("clear errors")
543         self.vapi.cli("clear ipsec sa")
544         try:
545             p = self.params[socket.AF_INET]
546             send_pkts = self.gen_encrypt_pkts(p.scapy_tra_sa, self.tra_if,
547                                               src=self.tra_if.remote_ip4,
548                                               dst=self.tra_if.local_ip4,
549                                               count=count)
550             recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
551                                              self.tra_if)
552             for rx in recv_pkts:
553                 self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
554                 self.assert_packet_checksums_valid(rx)
555                 try:
556                     decrypted = p.vpp_tra_sa.decrypt(rx[IP])
557                     self.assert_packet_checksums_valid(decrypted)
558                 except:
559                     self.logger.debug(ppp("Unexpected packet:", rx))
560                     raise
561         finally:
562             self.logger.info(self.vapi.ppcli("show error"))
563             self.logger.info(self.vapi.ppcli("show ipsec all"))
564
565         pkts = p.tra_sa_in.get_stats()['packets']
566         self.assertEqual(pkts, count,
567                          "incorrect SA in counts: expected %d != %d" %
568                          (count, pkts))
569         pkts = p.tra_sa_out.get_stats()['packets']
570         self.assertEqual(pkts, count,
571                          "incorrect SA out counts: expected %d != %d" %
572                          (count, pkts))
573
574         self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
575         self.assert_packet_counter_equal(self.tra4_decrypt_node_name, count)
576
577
578 class IpsecTra4Tests(IpsecTra4):
579     """ UT test methods for Transport v4 """
580     def test_tra_anti_replay(self):
581         """ ipsec v4 transport anti-reply test """
582         self.verify_tra_anti_replay()
583
584     def test_tra_basic(self, count=1):
585         """ ipsec v4 transport basic test """
586         self.verify_tra_basic4(count=1)
587
588     def test_tra_burst(self):
589         """ ipsec v4 transport burst test """
590         self.verify_tra_basic4(count=257)
591
592
593 class IpsecTra6(object):
594     """ verify methods for Transport v6 """
595     def verify_tra_basic6(self, count=1):
596         self.vapi.cli("clear errors")
597         try:
598             p = self.params[socket.AF_INET6]
599             send_pkts = self.gen_encrypt_pkts6(p.scapy_tra_sa, self.tra_if,
600                                                src=self.tra_if.remote_ip6,
601                                                dst=self.tra_if.local_ip6,
602                                                count=count)
603             recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
604                                              self.tra_if)
605             for rx in recv_pkts:
606                 self.assertEqual(len(rx) - len(Ether()) - len(IPv6()),
607                                  rx[IPv6].plen)
608                 try:
609                     decrypted = p.vpp_tra_sa.decrypt(rx[IPv6])
610                     self.assert_packet_checksums_valid(decrypted)
611                 except:
612                     self.logger.debug(ppp("Unexpected packet:", rx))
613                     raise
614         finally:
615             self.logger.info(self.vapi.ppcli("show error"))
616             self.logger.info(self.vapi.ppcli("show ipsec all"))
617
618         pkts = p.tra_sa_in.get_stats()['packets']
619         self.assertEqual(pkts, count,
620                          "incorrect SA in counts: expected %d != %d" %
621                          (count, pkts))
622         pkts = p.tra_sa_out.get_stats()['packets']
623         self.assertEqual(pkts, count,
624                          "incorrect SA out counts: expected %d != %d" %
625                          (count, pkts))
626         self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
627         self.assert_packet_counter_equal(self.tra6_decrypt_node_name, count)
628
629
630 class IpsecTra6Tests(IpsecTra6):
631     """ UT test methods for Transport v6 """
632     def test_tra_basic6(self):
633         """ ipsec v6 transport basic test """
634         self.verify_tra_basic6(count=1)
635
636     def test_tra_burst6(self):
637         """ ipsec v6 transport burst test """
638         self.verify_tra_basic6(count=257)
639
640
641 class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
642     """ UT test methods for Transport v6 and v4"""
643     pass
644
645
646 class IpsecTun4(object):
647     """ verify methods for Tunnel v4 """
648     def verify_counters4(self, p, count, n_frags=None):
649         if not n_frags:
650             n_frags = count
651         if (hasattr(p, "spd_policy_in_any")):
652             pkts = p.spd_policy_in_any.get_stats()['packets']
653             self.assertEqual(pkts, count,
654                              "incorrect SPD any policy: expected %d != %d" %
655                              (count, pkts))
656
657         if (hasattr(p, "tun_sa_in")):
658             pkts = p.tun_sa_in.get_stats()['packets']
659             self.assertEqual(pkts, count,
660                              "incorrect SA in counts: expected %d != %d" %
661                              (count, pkts))
662             pkts = p.tun_sa_out.get_stats()['packets']
663             self.assertEqual(pkts, count,
664                              "incorrect SA out counts: expected %d != %d" %
665                              (count, pkts))
666
667         self.assert_packet_counter_equal(self.tun4_encrypt_node_name, n_frags)
668         self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)
669
670     def verify_decrypted(self, p, rxs):
671         for rx in rxs:
672             self.assert_equal(rx[IP].src, p.remote_tun_if_host)
673             self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
674             self.assert_packet_checksums_valid(rx)
675
676     def verify_encrypted(self, p, sa, rxs):
677         decrypt_pkts = []
678         for rx in rxs:
679             if p.nat_header:
680                 self.assertEqual(rx[UDP].dport, 4500)
681             self.assert_packet_checksums_valid(rx)
682             self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
683             try:
684                 decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IP])
685                 if not decrypt_pkt.haslayer(IP):
686                     decrypt_pkt = IP(decrypt_pkt[Raw].load)
687                 decrypt_pkts.append(decrypt_pkt)
688                 self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
689                 self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
690             except:
691                 self.logger.debug(ppp("Unexpected packet:", rx))
692                 try:
693                     self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
694                 except:
695                     pass
696                 raise
697         pkts = reassemble4(decrypt_pkts)
698         for pkt in pkts:
699             self.assert_packet_checksums_valid(pkt)
700
701     def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None):
702         self.vapi.cli("clear errors")
703         if not n_rx:
704             n_rx = count
705         try:
706             config_tun_params(p, self.encryption_type, self.tun_if)
707             send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
708                                               src=p.remote_tun_if_host,
709                                               dst=self.pg1.remote_ip4,
710                                               count=count)
711             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
712             self.verify_decrypted(p, recv_pkts)
713
714             send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
715                                       dst=p.remote_tun_if_host, count=count,
716                                       payload_size=payload_size)
717             recv_pkts = self.send_and_expect(self.pg1, send_pkts,
718                                              self.tun_if, n_rx)
719             self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)
720
721         finally:
722             self.logger.info(self.vapi.ppcli("show error"))
723             self.logger.info(self.vapi.ppcli("show ipsec all"))
724
725         self.verify_counters4(p, count, n_rx)
726
727     def verify_tun_64(self, p, count=1):
728         self.vapi.cli("clear errors")
729         try:
730             config_tun_params(p, self.encryption_type, self.tun_if)
731             send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
732                                                src=p.remote_tun_if_host6,
733                                                dst=self.pg1.remote_ip6,
734                                                count=count)
735             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
736             for recv_pkt in recv_pkts:
737                 self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host6)
738                 self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
739                 self.assert_packet_checksums_valid(recv_pkt)
740             send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
741                                        dst=p.remote_tun_if_host6, count=count)
742             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
743             for recv_pkt in recv_pkts:
744                 try:
745                     decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
746                     if not decrypt_pkt.haslayer(IPv6):
747                         decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
748                     self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
749                     self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host6)
750                     self.assert_packet_checksums_valid(decrypt_pkt)
751                 except:
752                     self.logger.error(ppp("Unexpected packet:", recv_pkt))
753                     try:
754                         self.logger.debug(
755                             ppp("Decrypted packet:", decrypt_pkt))
756                     except:
757                         pass
758                     raise
759         finally:
760             self.logger.info(self.vapi.ppcli("show error"))
761             self.logger.info(self.vapi.ppcli("show ipsec all"))
762
763         self.verify_counters4(p, count)
764
765     def verify_keepalive(self, p):
766         pkt = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
767                IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
768                UDP(sport=333, dport=4500) /
769                Raw(0xff))
770         self.send_and_assert_no_replies(self.tun_if, pkt*31)
771         self.assert_error_counter_equal(
772             '/err/%s/NAT Keepalive' % self.tun4_input_node, 31)
773
774         pkt = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
775                IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
776                UDP(sport=333, dport=4500) /
777                Raw(0xfe))
778         self.send_and_assert_no_replies(self.tun_if, pkt*31)
779         self.assert_error_counter_equal(
780             '/err/%s/Too Short' % self.tun4_input_node, 31)
781
782
783 class IpsecTun4Tests(IpsecTun4):
784     """ UT test methods for Tunnel v4 """
785     def test_tun_basic44(self):
786         """ ipsec 4o4 tunnel basic test """
787         self.verify_tun_44(self.params[socket.AF_INET], count=1)
788
789     def test_tun_burst44(self):
790         """ ipsec 4o4 tunnel burst test """
791         self.verify_tun_44(self.params[socket.AF_INET], count=257)
792
793
794 class IpsecTun6(object):
795     """ verify methods for Tunnel v6 """
796     def verify_counters6(self, p_in, p_out, count):
797         if (hasattr(p_in, "tun_sa_in")):
798             pkts = p_in.tun_sa_in.get_stats()['packets']
799             self.assertEqual(pkts, count,
800                              "incorrect SA in counts: expected %d != %d" %
801                              (count, pkts))
802         if (hasattr(p_out, "tun_sa_out")):
803             pkts = p_out.tun_sa_out.get_stats()['packets']
804             self.assertEqual(pkts, count,
805                              "incorrect SA out counts: expected %d != %d" %
806                              (count, pkts))
807         self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
808         self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)
809
810     def verify_decrypted6(self, p, rxs):
811         for rx in rxs:
812             self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
813             self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
814             self.assert_packet_checksums_valid(rx)
815
816     def verify_encrypted6(self, p, sa, rxs):
817         for rx in rxs:
818             self.assert_packet_checksums_valid(rx)
819             self.assertEqual(len(rx) - len(Ether()) - len(IPv6()),
820                              rx[IPv6].plen)
821             try:
822                 decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IPv6])
823                 if not decrypt_pkt.haslayer(IPv6):
824                     decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
825                 self.assert_packet_checksums_valid(decrypt_pkt)
826                 self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
827                 self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
828             except:
829                 self.logger.debug(ppp("Unexpected packet:", rx))
830                 try:
831                     self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
832                 except:
833                     pass
834                 raise
835
836     def verify_drop_tun_66(self, p_in, count=1, payload_size=64):
837         self.vapi.cli("clear errors")
838         self.vapi.cli("clear ipsec sa")
839
840         config_tun_params(p_in, self.encryption_type, self.tun_if)
841         send_pkts = self.gen_encrypt_pkts6(p_in.scapy_tun_sa, self.tun_if,
842                                            src=p_in.remote_tun_if_host,
843                                            dst=self.pg1.remote_ip6,
844                                            count=count)
845         self.send_and_assert_no_replies(self.tun_if, send_pkts)
846         self.logger.info(self.vapi.cli("sh punt stats"))
847
848     def verify_tun_66(self, p_in, p_out=None, count=1, payload_size=64):
849         self.vapi.cli("clear errors")
850         self.vapi.cli("clear ipsec sa")
851         if not p_out:
852             p_out = p_in
853         try:
854             config_tun_params(p_in, self.encryption_type, self.tun_if)
855             config_tun_params(p_out, self.encryption_type, self.tun_if)
856             send_pkts = self.gen_encrypt_pkts6(p_in.scapy_tun_sa, self.tun_if,
857                                                src=p_in.remote_tun_if_host,
858                                                dst=self.pg1.remote_ip6,
859                                                count=count)
860             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
861             self.verify_decrypted6(p_in, recv_pkts)
862
863             send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
864                                        dst=p_out.remote_tun_if_host,
865                                        count=count,
866                                        payload_size=payload_size)
867             recv_pkts = self.send_and_expect(self.pg1, send_pkts,
868                                              self.tun_if)
869             self.verify_encrypted6(p_out, p_out.vpp_tun_sa, recv_pkts)
870
871         finally:
872             self.logger.info(self.vapi.ppcli("show error"))
873             self.logger.info(self.vapi.ppcli("show ipsec all"))
874         self.verify_counters6(p_in, p_out, count)
875
876     def verify_tun_46(self, p, count=1):
877         """ ipsec 4o6 tunnel basic test """
878         self.vapi.cli("clear errors")
879         try:
880             config_tun_params(p, self.encryption_type, self.tun_if)
881             send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
882                                               src=p.remote_tun_if_host4,
883                                               dst=self.pg1.remote_ip4,
884                                               count=count)
885             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
886             for recv_pkt in recv_pkts:
887                 self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host4)
888                 self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
889                 self.assert_packet_checksums_valid(recv_pkt)
890             send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
891                                       dst=p.remote_tun_if_host4,
892                                       count=count)
893             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
894             for recv_pkt in recv_pkts:
895                 try:
896                     decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
897                     if not decrypt_pkt.haslayer(IP):
898                         decrypt_pkt = IP(decrypt_pkt[Raw].load)
899                     self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
900                     self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host4)
901                     self.assert_packet_checksums_valid(decrypt_pkt)
902                 except:
903                     self.logger.debug(ppp("Unexpected packet:", recv_pkt))
904                     try:
905                         self.logger.debug(ppp("Decrypted packet:",
906                                               decrypt_pkt))
907                     except:
908                         pass
909                     raise
910         finally:
911             self.logger.info(self.vapi.ppcli("show error"))
912             self.logger.info(self.vapi.ppcli("show ipsec all"))
913         self.verify_counters6(p, p, count)
914
915
916 class IpsecTun6Tests(IpsecTun6):
917     """ UT test methods for Tunnel v6 """
918
919     def test_tun_basic66(self):
920         """ ipsec 6o6 tunnel basic test """
921         self.verify_tun_66(self.params[socket.AF_INET6], count=1)
922
923     def test_tun_burst66(self):
924         """ ipsec 6o6 tunnel burst test """
925         self.verify_tun_66(self.params[socket.AF_INET6], count=257)
926
927
928 class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
929     """ UT test methods for Tunnel v6 & v4 """
930     pass
931
932
933 if __name__ == '__main__':
934     unittest.main(testRunner=VppTestRunner)