ipsec: handle UDP keepalives
[vpp.git] / test / template_ipsec.py
1 import unittest
2 import socket
3 import struct
4
5 from scapy.layers.inet import IP, ICMP, TCP, UDP
6 from scapy.layers.ipsec import SecurityAssociation, ESP
7 from scapy.layers.l2 import Ether, Raw
8 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
9
10 from framework import VppTestCase, VppTestRunner
11 from util import ppp, reassemble4
12 from vpp_papi import VppEnum
13
14
class IPsecIPv4Params(object):
    """Bundle of IPv4 parameters shared by the IPsec test templates.

    The class attributes describe the address family; every instance
    gets its own copy of the SA ids, SPIs, algorithms and keys so a test
    may tweak them without affecting other instances.
    """

    # address-family constants
    addr_type = socket.AF_INET
    addr_any = "0.0.0.0"
    addr_bcast = "255.255.255.255"
    addr_len = 32
    is_ipv6 = 0

    def __init__(self):
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t

        # host addresses used as the remote end of tunnelled traffic
        self.remote_tun_if_host = '1.1.1.1'
        self.remote_tun_if_host6 = '1111::1'

        # tunnel SA ids / SPIs
        self.scapy_tun_sa_id = 10
        self.scapy_tun_spi = 1001
        self.vpp_tun_sa_id = 20
        self.vpp_tun_spi = 1000

        # transport SA ids / SPIs
        self.scapy_tra_sa_id = 30
        self.scapy_tra_spi = 2001
        self.vpp_tra_sa_id = 40
        self.vpp_tra_spi = 2000

        # integrity: VPP API enum value plus the matching scapy name
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA1_96
        self.auth_algo = 'HMAC-SHA1-96'
        self.auth_key = 'C91KUR9GYMm5GfkEvNjX'

        # encryption: VPP API enum value plus the matching scapy name
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_128
        self.crypt_algo = 'AES-CBC'
        self.crypt_key = 'JPjyOWBeVEQiMe7h'

        # salt is only appended to the key for AES-GCM (see the
        # config_*_params helpers); flags / nat_header default to "none"
        self.salt = 0
        self.flags = 0
        self.nat_header = None
49
50
class IPsecIPv6Params(object):
    """Bundle of IPv6 parameters shared by the IPsec test templates.

    The class attributes describe the address family; every instance
    gets its own copy of the SA ids, SPIs, algorithms and keys so a test
    may tweak them without affecting other instances.
    """

    # address-family constants
    addr_type = socket.AF_INET6
    addr_any = "0::0"
    addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
    addr_len = 128
    is_ipv6 = 1

    def __init__(self):
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t

        # host addresses used as the remote end of tunnelled traffic
        self.remote_tun_if_host = '1111:1111:1111:1111:1111:1111:1111:1111'
        self.remote_tun_if_host4 = '1.1.1.1'

        # tunnel SA ids / SPIs
        self.scapy_tun_sa_id = 50
        self.scapy_tun_spi = 3001
        self.vpp_tun_sa_id = 60
        self.vpp_tun_spi = 3000

        # transport SA ids / SPIs
        self.scapy_tra_sa_id = 70
        self.scapy_tra_spi = 4001
        self.vpp_tra_sa_id = 80
        self.vpp_tra_spi = 4000

        # integrity: VPP API enum value plus the matching scapy name
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA1_96
        self.auth_algo = 'HMAC-SHA1-96'
        self.auth_key = 'C91KUR9GYMm5GfkEvNjX'

        # encryption: VPP API enum value plus the matching scapy name
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_128
        self.crypt_algo = 'AES-CBC'
        self.crypt_key = 'JPjyOWBeVEQiMe7h'

        # salt is only appended to the key for AES-GCM (see the
        # config_*_params helpers); flags / nat_header default to "none"
        self.salt = 0
        self.flags = 0
        self.nat_header = None
85
86
def config_tun_params(p, encryption_type, tun_if):
    """Create the two scapy tunnel SAs and store them on *p*.

    ``p.scapy_tun_sa`` is keyed with ``p.vpp_tun_spi`` and carries an
    outer header from the interface's remote address to its local
    address; ``p.vpp_tun_sa`` is keyed with ``p.scapy_tun_spi`` and
    carries the reverse outer header.

    p -- parameter object (algorithms, keys, SPIs, flags, NAT header)
    encryption_type -- scapy protocol class handed to SecurityAssociation
    tun_if -- interface providing ``remote_addr`` / ``local_addr``,
              both indexed by address family
    """
    esn_flag = (VppEnum.vl_api_ipsec_sad_flags_t.
                IPSEC_API_SAD_FLAG_USE_ESN)

    # for AES-GCM the 4-byte salt is appended to the key, network order
    crypt_key = p.crypt_key
    if p.crypt_algo == "AES-GCM":
        crypt_key = crypt_key + struct.pack("!I", p.salt)

    # everything the two SAs have in common
    shared = dict(crypt_algo=p.crypt_algo,
                  crypt_key=crypt_key,
                  auth_algo=p.auth_algo,
                  auth_key=p.auth_key,
                  nat_t_header=p.nat_header,
                  use_esn=bool(p.flags & esn_flag))

    outer_hdr = {socket.AF_INET: IP, socket.AF_INET6: IPv6}[p.addr_type]
    remote = tun_if.remote_addr[p.addr_type]
    local = tun_if.local_addr[p.addr_type]

    p.scapy_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.vpp_tun_spi,
        tunnel_header=outer_hdr(src=remote, dst=local),
        **shared)
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.scapy_tun_spi,
        tunnel_header=outer_hdr(dst=remote, src=local),
        **shared)
115
116
def config_tra_params(p, encryption_type):
    """Create the two scapy transport SAs and store them on *p*.

    ``p.scapy_tra_sa`` is keyed with ``p.vpp_tra_spi`` and
    ``p.vpp_tra_sa`` with ``p.scapy_tra_spi``; apart from the SPI both
    SAs are built from identical settings.

    p -- parameter object (algorithms, keys, SPIs, flags, NAT header)
    encryption_type -- scapy protocol class handed to SecurityAssociation
    """
    esn_flag = (VppEnum.vl_api_ipsec_sad_flags_t.
                IPSEC_API_SAD_FLAG_USE_ESN)

    # for AES-GCM the 4-byte salt is appended to the key, network order
    crypt_key = p.crypt_key
    if p.crypt_algo == "AES-GCM":
        crypt_key = crypt_key + struct.pack("!I", p.salt)

    # everything the two SAs have in common
    shared = dict(crypt_algo=p.crypt_algo,
                  crypt_key=crypt_key,
                  auth_algo=p.auth_algo,
                  auth_key=p.auth_key,
                  nat_t_header=p.nat_header,
                  use_esn=bool(p.flags & esn_flag))

    p.scapy_tra_sa = SecurityAssociation(encryption_type,
                                         spi=p.vpp_tra_spi,
                                         **shared)
    p.vpp_tra_sa = SecurityAssociation(encryption_type,
                                       spi=p.scapy_tra_spi,
                                       **shared)
142
143
class TemplateIpsec(VppTestCase):
    """
    TRANSPORT MODE:

     ------   encrypt   ---
    |tra_if| <-------> |VPP|
     ------   decrypt   ---

    TUNNEL MODE:

     ------   encrypt   ---   plain   ---
    |tun_if| <-------  |VPP| <------ |pg1|
     ------             ---           ---

     ------   decrypt   ---   plain   ---
    |tun_if| ------->  |VPP| ------> |pg1|
     ------             ---           ---
    """
    # SPD ids for the tunnel and transport set-ups
    tun_spd_id = 1
    tra_spd_id = 2

    def ipsec_select_backend(self):
        """ empty method to be overloaded when necessary """
        pass

    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec, cls).tearDownClass()

    def setup_params(self):
        """Create the v4/v6 parameter objects and index them by family."""
        self.ipv4_params = IPsecIPv4Params()
        self.ipv6_params = IPsecIPv6Params()
        self.params = {
            self.ipv4_params.addr_type: self.ipv4_params,
            self.ipv6_params.addr_type: self.ipv6_params,
        }

    def config_interfaces(self):
        """Create three pg interfaces and bring each up with IPv4 + IPv6."""
        self.create_pg_interfaces(range(3))
        self.interfaces = list(self.pg_interfaces)
        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()
            intf.config_ip6()
            intf.resolve_ndp()

    def setUp(self):
        """Per-test set-up: params, protocol ids, interfaces, backend."""
        super(TemplateIpsec, self).setUp()

        self.setup_params()

        ipsec_protos = VppEnum.vl_api_ipsec_proto_t
        self.vpp_esp_protocol = ipsec_protos.IPSEC_API_PROTO_ESP
        self.vpp_ah_protocol = ipsec_protos.IPSEC_API_PROTO_AH

        self.config_interfaces()

        self.ipsec_select_backend()

    def unconfig_interfaces(self):
        """Take every interface down and remove its IPv4/IPv6 config."""
        for intf in self.interfaces:
            intf.admin_down()
            intf.unconfig_ip4()
            intf.unconfig_ip6()

    def tearDown(self):
        """Per-test tear-down: parent first, then the interfaces."""
        super(TemplateIpsec, self).tearDown()

        self.unconfig_interfaces()

    def show_commands_at_teardown(self):
        """Log the output of "show hardware"."""
        hw_info = self.vapi.cli("show hardware")
        self.logger.info(hw_info)

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=54):
        """Return *count* IPv4 ICMP packets encrypted with *sa*.

        Each packet is addressed from sw_intf's remote MAC to its local
        MAC and carries payload_size bytes of 'X'.
        """
        pkts = []
        for _ in range(count):
            eth = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            plain = IP(src=src, dst=dst) / ICMP() / Raw('X' * payload_size)
            pkts.append(eth / sa.encrypt(plain))
        return pkts

    def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
                          payload_size=54):
        """Return *count* IPv6 echo requests encrypted with *sa*.

        Each packet is addressed from sw_intf's remote MAC to its local
        MAC and carries payload_size bytes of 'X'.
        """
        pkts = []
        for _ in range(count):
            eth = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            echo = ICMPv6EchoRequest(id=0, seq=1, data='X' * payload_size)
            pkts.append(eth / sa.encrypt(IPv6(src=src, dst=dst) / echo))
        return pkts

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
        """Return *count* plain IPv4 ICMP packets for sw_intf."""
        pkts = []
        for _ in range(count):
            eth = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            pkts.append(eth / IP(src=src, dst=dst) / ICMP() /
                        Raw('X' * payload_size))
        return pkts

    def gen_pkts6(self, sw_intf, src, dst, count=1, payload_size=54):
        """Return *count* plain IPv6 echo requests for sw_intf."""
        pkts = []
        for _ in range(count):
            eth = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            echo = ICMPv6EchoRequest(id=0, seq=1, data='X' * payload_size)
            pkts.append(eth / IPv6(src=src, dst=dst) / echo)
        return pkts
246
247
class IpsecTcp(object):
    # Mixin with the TCP checksum verification; relies on the host test
    # case for vapi, params, tun_if, encryption_type and the send/assert
    # helpers.
    def verify_tcp_checksum(self):
        """Send an encrypted TCP SYN to port 80 and validate the reply.

        The "test http server" CLI is issued first; the first packet
        received back on tun_if is decrypted and its checksums checked.
        """
        self.vapi.cli("test http server")
        p = self.params[socket.AF_INET]
        config_tun_params(p, self.encryption_type, self.tun_if)

        eth = Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
        syn = (IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
               TCP(flags='S', dport=80))
        send = eth / p.scapy_tun_sa.encrypt(syn)
        self.logger.debug(ppp("Sending packet:", send))

        replies = self.send_and_expect(self.tun_if, [send], self.tun_if)
        first_reply = replies[0]
        decrypted = p.vpp_tun_sa.decrypt(first_reply[IP])
        self.assert_packet_checksums_valid(decrypted)
262
263
class IpsecTcpTests(IpsecTcp):
    # Test mixin: exposes IpsecTcp.verify_tcp_checksum under a test_*
    # name so that it is collected as a test method.
    def test_tcp_checksum(self):
        """ verify checksum correctness for vpp generated packets """
        self.verify_tcp_checksum()
268
269
class IpsecTra4(object):
    """ verify methods for Transport v4 """
    def verify_tra_anti_replay(self, count=1):
        # Walks the transport SA through a fixed series of sequence
        # numbers and checks, through the decrypt/encrypt node error
        # counters, which packets are answered and which are dropped.
        # The steps are order dependent: the expected counter values
        # below are cumulative over the whole method.
        # NOTE: the `count` argument is not used in this method.
        p = self.params[socket.AF_INET]
        use_esn = p.vpp_tra_sa.use_esn

        # fire in a packet with seq number 1
        pkt = (Ether(src=self.tra_if.remote_mac,
                     dst=self.tra_if.local_mac) /
               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                         dst=self.tra_if.local_ip4) /
                                      ICMP(),
                                      seq_num=1))
        recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # now move the window over to 235
        pkt = (Ether(src=self.tra_if.remote_mac,
                     dst=self.tra_if.local_mac) /
               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                         dst=self.tra_if.local_ip4) /
                                      ICMP(),
                                      seq_num=235))
        recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # replayed packets are dropped
        # (the seq 235 packet is sent again, three times)
        self.send_and_assert_no_replies(self.tra_if, pkt * 3)
        self.assert_error_counter_equal(
            '/err/%s/SA replayed packet' % self.tra4_decrypt_node_name, 3)

        # the window size is 64 packets
        # in window are still accepted
        pkt = (Ether(src=self.tra_if.remote_mac,
                     dst=self.tra_if.local_mac) /
               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                         dst=self.tra_if.local_ip4) /
                                      ICMP(),
                                      seq_num=172))
        recv_pkts = self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # a packet that does not decrypt does not move the window forward
        # (same SPI, but both keys reversed)
        bogus_sa = SecurityAssociation(self.encryption_type,
                                       p.vpp_tra_spi,
                                       crypt_algo=p.crypt_algo,
                                       crypt_key=p.crypt_key[::-1],
                                       auth_algo=p.auth_algo,
                                       auth_key=p.auth_key[::-1])
        pkt = (Ether(src=self.tra_if.remote_mac,
                     dst=self.tra_if.local_mac) /
               bogus_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                   dst=self.tra_if.local_ip4) /
                                ICMP(),
                                seq_num=350))
        self.send_and_assert_no_replies(self.tra_if, pkt * 17)

        self.assert_error_counter_equal(
            '/err/%s/Integrity check failed' % self.tra4_decrypt_node_name, 17)

        # a malformed 'runt' packet
        #  created by a mis-constructed SA
        #  (no algorithms or keys are passed to the SA; ESP only)
        if (ESP == self.encryption_type):
            bogus_sa = SecurityAssociation(self.encryption_type,
                                           p.vpp_tra_spi)
            pkt = (Ether(src=self.tra_if.remote_mac,
                         dst=self.tra_if.local_mac) /
                   bogus_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                       dst=self.tra_if.local_ip4) /
                                    ICMP(),
                                    seq_num=350))
            self.send_and_assert_no_replies(self.tra_if, pkt * 17)

            self.assert_error_counter_equal(
                '/err/%s/undersized packet' % self.tra4_decrypt_node_name, 17)

        # which we can determine since this packet is still in the window
        pkt = (Ether(src=self.tra_if.remote_mac,
                     dst=self.tra_if.local_mac) /
               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                         dst=self.tra_if.local_ip4) /
                                      ICMP(),
                                      seq_num=234))
        self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # out of window are dropped
        pkt = (Ether(src=self.tra_if.remote_mac,
                     dst=self.tra_if.local_mac) /
               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                         dst=self.tra_if.local_ip4) /
                                      ICMP(),
                                      seq_num=17))
        self.send_and_assert_no_replies(self.tra_if, pkt * 17)

        if use_esn:
            # an out of window error with ESN looks like a high sequence
            # wrap. but since it isn't then the verify will fail.
            # 34 = the 17 bogus-key packets above + these 17
            self.assert_error_counter_equal(
                '/err/%s/Integrity check failed' %
                self.tra4_decrypt_node_name, 34)

        else:
            # 20 = the 3 replays above + these 17
            self.assert_error_counter_equal(
                '/err/%s/SA replayed packet' %
                self.tra4_decrypt_node_name, 20)

        # valid packet moves the window over to 236
        pkt = (Ether(src=self.tra_if.remote_mac,
                     dst=self.tra_if.local_mac) /
               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                         dst=self.tra_if.local_ip4) /
                                      ICMP(),
                                      seq_num=236))
        rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
        decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])

        # move VPP's SA to just before the seq-number wrap
        self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.scapy_tra_sa_id)

        # then fire in a packet that VPP should drop because it causes the
        # seq number to wrap  unless we're using extended.
        pkt = (Ether(src=self.tra_if.remote_mac,
                     dst=self.tra_if.local_mac) /
               p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                         dst=self.tra_if.local_ip4) /
                                      ICMP(),
                                      seq_num=237))

        if use_esn:
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            # in order to decrypt the high order number needs to wrap
            p.vpp_tra_sa.seq_num = 0x100000000
            decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])

            # send packets with high bits set
            p.scapy_tra_sa.seq_num = 0x100000005
            pkt = (Ether(src=self.tra_if.remote_mac,
                         dst=self.tra_if.local_mac) /
                   p.scapy_tra_sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                             dst=self.tra_if.local_ip4) /
                                          ICMP(),
                                          seq_num=0x100000005))
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            # in order to decrypt the high order number needs to wrap
            decrypted = p.vpp_tra_sa.decrypt(rx[0][IP])
        else:
            # without ESN no reply is expected; note that the counter
            # checked here belongs to the encrypt node
            self.send_and_assert_no_replies(self.tra_if, [pkt])
            self.assert_error_counter_equal(
                '/err/%s/sequence number cycled' %
                self.tra4_encrypt_node_name, 1)

        # move the security-associations seq number on to the last we used
        # (0x15f == 351, the same value given to both scapy SAs)
        self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id)
        p.scapy_tra_sa.seq_num = 351
        p.vpp_tra_sa.seq_num = 351

    def verify_tra_basic4(self, count=1):
        """ ipsec v4 transport basic test """
        # Sends `count` encrypted ICMP packets to VPP on tra_if, expects
        # the replies on the same interface, then checks the SA and node
        # packet counters against `count`.
        self.vapi.cli("clear errors")
        try:
            p = self.params[socket.AF_INET]
            send_pkts = self.gen_encrypt_pkts(p.scapy_tra_sa, self.tra_if,
                                              src=self.tra_if.remote_ip4,
                                              dst=self.tra_if.local_ip4,
                                              count=count)
            recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
                                             self.tra_if)
            for rx in recv_pkts:
                # the IP total length must cover everything after Ethernet
                self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
                self.assert_packet_checksums_valid(rx)
                try:
                    decrypted = p.vpp_tra_sa.decrypt(rx[IP])
                    self.assert_packet_checksums_valid(decrypted)
                except:
                    # log the offending packet, then let the error surface
                    self.logger.debug(ppp("Unexpected packet:", rx))
                    raise
        finally:
            # always dump the state, whether or not the checks passed
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

        # tra_sa_in / tra_sa_out are not assigned anywhere in the visible
        # part of this file; presumably the concrete test case sets them
        pkts = p.tra_sa_in.get_stats()['packets']
        self.assertEqual(pkts, count,
                         "incorrect SA in counts: expected %d != %d" %
                         (count, pkts))
        pkts = p.tra_sa_out.get_stats()['packets']
        self.assertEqual(pkts, count,
                         "incorrect SA out counts: expected %d != %d" %
                         (count, pkts))

        self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tra4_decrypt_node_name, count)
458
459
class IpsecTra4Tests(IpsecTra4):
    """ UT test methods for Transport v4 """
    def test_tra_anti_replay(self):
        """ ipsec v4 transport anti-replay test """
        self.verify_tra_anti_replay(count=1)

    # NOTE: the `count` parameter is never read; a literal 1 is passed on
    def test_tra_basic(self, count=1):
        """ ipsec v4 transport basic test """
        self.verify_tra_basic4(count=1)

    # 257 packets in a single call
    def test_tra_burst(self):
        """ ipsec v4 transport burst test """
        self.verify_tra_basic4(count=257)
473
474
class IpsecTra6(object):
    """ verify methods for Transport v6 """
    def verify_tra_basic6(self, count=1):
        """Round-trip *count* encrypted IPv6 echo requests over tra_if.

        Every reply must have a payload length matching its size and
        must decrypt to a packet with valid checksums; afterwards the SA
        and node packet counters are compared against *count*.
        """
        self.vapi.cli("clear errors")
        try:
            p = self.params[socket.AF_INET6]
            tx_pkts = self.gen_encrypt_pkts6(p.scapy_tra_sa, self.tra_if,
                                             src=self.tra_if.remote_ip6,
                                             dst=self.tra_if.local_ip6,
                                             count=count)
            rx_pkts = self.send_and_expect(self.tra_if, tx_pkts,
                                           self.tra_if)
            for rx in rx_pkts:
                # plen excludes the Ethernet and fixed IPv6 headers
                payload_len = len(rx) - len(Ether()) - len(IPv6())
                self.assertEqual(payload_len, rx[IPv6].plen)
                try:
                    plain = p.vpp_tra_sa.decrypt(rx[IPv6])
                    self.assert_packet_checksums_valid(plain)
                except:
                    # log the offending packet, then let the error surface
                    self.logger.debug(ppp("Unexpected packet:", rx))
                    raise
        finally:
            # always dump the state, whether or not the checks passed
            for show_cmd in ("show error", "show ipsec all"):
                self.logger.info(self.vapi.ppcli(show_cmd))

        n_in = p.tra_sa_in.get_stats()['packets']
        self.assertEqual(n_in, count,
                         "incorrect SA in counts: expected %d != %d" %
                         (count, n_in))
        n_out = p.tra_sa_out.get_stats()['packets']
        self.assertEqual(n_out, count,
                         "incorrect SA out counts: expected %d != %d" %
                         (count, n_out))
        self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tra6_decrypt_node_name, count)
510
511
class IpsecTra6Tests(IpsecTra6):
    """ UT test methods for Transport v6 """
    # single packet
    def test_tra_basic6(self):
        """ ipsec v6 transport basic test """
        self.verify_tra_basic6(count=1)

    # 257 packets in a single call
    def test_tra_burst6(self):
        """ ipsec v6 transport burst test """
        self.verify_tra_basic6(count=257)
521
522
class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
    """ UT test methods for Transport v6 and v4"""
    # no members of its own: only combines the v4 and v6 test mixins
    pass
526
527
class IpsecTun4(object):
    """ verify methods for Tunnel v4 """
    def verify_counters4(self, p, count, n_frags=None):
        """Check SPD policy, SA and node packet counters.

        p -- parameter object; the policy / SA counters are only checked
             when it has ``spd_policy_in_any`` / ``tun_sa_in``
        count -- expected packets on the policy, both SAs and the
                 decrypt node
        n_frags -- expected packets on the encrypt node; *count* is used
                   when this is falsy
        """
        if not n_frags:
            n_frags = count
        if (hasattr(p, "spd_policy_in_any")):
            pkts = p.spd_policy_in_any.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SPD any policy: expected %d != %d" %
                             (count, pkts))

        if (hasattr(p, "tun_sa_in")):
            pkts = p.tun_sa_in.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA in counts: expected %d != %d" %
                             (count, pkts))
            pkts = p.tun_sa_out.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA out counts: expected %d != %d" %
                             (count, pkts))

        self.assert_packet_counter_equal(self.tun4_encrypt_node_name, n_frags)
        self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)

    def verify_decrypted(self, p, rxs):
        """Check addresses and checksums of the plain packets in *rxs*."""
        for rx in rxs:
            self.assert_equal(rx[IP].src, p.remote_tun_if_host)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt every packet in *rxs* and check the inner packet.

        The decrypted packets are collected, passed through reassemble4
        and the results are checksum-validated.

        p -- parameter object; ``p.vpp_tun_sa`` does the decryption
        sa -- kept for interface compatibility; not used in the body
        rxs -- packets captured on the tunnel interface
        """
        decrypt_pkts = []
        for rx in rxs:
            # reset per packet, so that a failure below can never log the
            # previous iteration's packet as this one's decryption
            decrypt_pkt = None
            if p.nat_header:
                self.assertEqual(rx[UDP].dport, 4500)
            self.assert_packet_checksums_valid(rx)
            self.assertEqual(len(rx) - len(Ether()), rx[IP].len)
            try:
                decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IP])
                if not decrypt_pkt.haslayer(IP):
                    # re-parse the raw payload as IPv4
                    decrypt_pkt = IP(decrypt_pkt[Raw].load)
                decrypt_pkts.append(decrypt_pkt)
                self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
                self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
            except:
                # log what we have, then let the error surface
                self.logger.debug(ppp("Unexpected packet:", rx))
                if decrypt_pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
                raise
        pkts = reassemble4(decrypt_pkts)
        for pkt in pkts:
            self.assert_packet_checksums_valid(pkt)

    def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None):
        """IPv4-in-IPv4 tunnel check in both directions.

        p -- parameter object
        count -- packets to send in each direction
        payload_size -- payload bytes of the plain packets sent from pg1
        n_rx -- packets expected on the tunnel interface; *count* is
                used when this is falsy
        """
        self.vapi.cli("clear errors")
        if not n_rx:
            n_rx = count
        try:
            config_tun_params(p, self.encryption_type, self.tun_if)
            # encrypted in on tun_if, plain out on pg1
            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
                                              src=p.remote_tun_if_host,
                                              dst=self.pg1.remote_ip4,
                                              count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            self.verify_decrypted(p, recv_pkts)

            # plain in on pg1, encrypted out on tun_if
            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                      dst=p.remote_tun_if_host, count=count,
                                      payload_size=payload_size)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts,
                                             self.tun_if, n_rx)
            self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)

        finally:
            # always dump the state, whether or not the checks passed
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

        self.verify_counters4(p, count, n_rx)

    def verify_tun_64(self, p, count=1):
        """IPv6-in-IPv4 tunnel check in both directions.

        p -- parameter object
        count -- packets to send in each direction
        """
        self.vapi.cli("clear errors")
        try:
            config_tun_params(p, self.encryption_type, self.tun_if)
            # encrypted in on tun_if, plain IPv6 out on pg1
            send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
                                               src=p.remote_tun_if_host6,
                                               dst=self.pg1.remote_ip6,
                                               count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host6)
                self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
                self.assert_packet_checksums_valid(recv_pkt)
            # plain IPv6 in on pg1, encrypted out on tun_if
            send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
                                       dst=p.remote_tun_if_host6, count=count)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # reset per packet, so that a failure below can never log
                # the previous iteration's packet as this one's decryption
                decrypt_pkt = None
                try:
                    # the outer header of the received packet is IPv4
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
                    if not decrypt_pkt.haslayer(IPv6):
                        # re-parse the raw payload as IPv6
                        decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host6)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except:
                    # log what we have, then let the error surface
                    self.logger.error(ppp("Unexpected packet:", recv_pkt))
                    if decrypt_pkt is not None:
                        self.logger.debug(
                            ppp("Decrypted packet:", decrypt_pkt))
                    raise
        finally:
            # always dump the state, whether or not the checks passed
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec all"))

        self.verify_counters4(p, count)

    def verify_keepalive(self, p):
        """Send one-byte UDP/4500 packets and check the error counters.

        A payload of 0xff must be counted as 'NAT Keepalive', any other
        single byte (0xfe here) as 'Too Short'; neither gets a reply.
        """
        # the payload is given as an explicit one-byte string so that
        # exactly one octet goes on the wire
        pkt = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
               IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
               UDP(sport=333, dport=4500) /
               Raw(b'\xff'))
        self.send_and_assert_no_replies(self.tun_if, pkt*31)
        self.assert_error_counter_equal(
            '/err/%s/NAT Keepalive' % self.tun4_input_node, 31)

        pkt = (Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac) /
               IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
               UDP(sport=333, dport=4500) /
               Raw(b'\xfe'))
        self.send_and_assert_no_replies(self.tun_if, pkt*31)
        self.assert_error_counter_equal(
            '/err/%s/Too Short' % self.tun4_input_node, 31)
663
664
class IpsecTun4Tests(IpsecTun4):
    """ UT test methods for Tunnel v4 """
    # single packet, using the IPv4 parameter set
    def test_tun_basic44(self):
        """ ipsec 4o4 tunnel basic test """
        self.verify_tun_44(self.params[socket.AF_INET], count=1)

    # 257 packets in a single call
    def test_tun_burst44(self):
        """ ipsec 4o4 tunnel burst test """
        self.verify_tun_44(self.params[socket.AF_INET], count=257)
674
675
676 class IpsecTun6(object):
677     """ verify methods for Tunnel v6 """
678     def verify_counters6(self, p_in, p_out, count):
679         if (hasattr(p_in, "tun_sa_in")):
680             pkts = p_in.tun_sa_in.get_stats()['packets']
681             self.assertEqual(pkts, count,
682                              "incorrect SA in counts: expected %d != %d" %
683                              (count, pkts))
684         if (hasattr(p_out, "tun_sa_out")):
685             pkts = p_out.tun_sa_out.get_stats()['packets']
686             self.assertEqual(pkts, count,
687                              "incorrect SA out counts: expected %d != %d" %
688                              (count, pkts))
689         self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
690         self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)
691
692     def verify_decrypted6(self, p, rxs):
693         for rx in rxs:
694             self.assert_equal(rx[IPv6].src, p.remote_tun_if_host)
695             self.assert_equal(rx[IPv6].dst, self.pg1.remote_ip6)
696             self.assert_packet_checksums_valid(rx)
697
698     def verify_encrypted6(self, p, sa, rxs):
699         for rx in rxs:
700             self.assert_packet_checksums_valid(rx)
701             self.assertEqual(len(rx) - len(Ether()) - len(IPv6()),
702                              rx[IPv6].plen)
703             try:
704                 decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IPv6])
705                 if not decrypt_pkt.haslayer(IPv6):
706                     decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
707                 self.assert_packet_checksums_valid(decrypt_pkt)
708                 self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
709                 self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
710             except:
711                 self.logger.debug(ppp("Unexpected packet:", rx))
712                 try:
713                     self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
714                 except:
715                     pass
716                 raise
717
718     def verify_drop_tun_66(self, p_in, count=1, payload_size=64):
719         self.vapi.cli("clear errors")
720         self.vapi.cli("clear ipsec sa")
721
722         config_tun_params(p_in, self.encryption_type, self.tun_if)
723         send_pkts = self.gen_encrypt_pkts6(p_in.scapy_tun_sa, self.tun_if,
724                                            src=p_in.remote_tun_if_host,
725                                            dst=self.pg1.remote_ip6,
726                                            count=count)
727         self.send_and_assert_no_replies(self.tun_if, send_pkts)
728         self.logger.info(self.vapi.cli("sh punt stats"))
729
730     def verify_tun_66(self, p_in, p_out=None, count=1, payload_size=64):
731         self.vapi.cli("clear errors")
732         self.vapi.cli("clear ipsec sa")
733         if not p_out:
734             p_out = p_in
735         try:
736             config_tun_params(p_in, self.encryption_type, self.tun_if)
737             config_tun_params(p_out, self.encryption_type, self.tun_if)
738             send_pkts = self.gen_encrypt_pkts6(p_in.scapy_tun_sa, self.tun_if,
739                                                src=p_in.remote_tun_if_host,
740                                                dst=self.pg1.remote_ip6,
741                                                count=count)
742             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
743             self.verify_decrypted6(p_in, recv_pkts)
744
745             send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
746                                        dst=p_out.remote_tun_if_host,
747                                        count=count,
748                                        payload_size=payload_size)
749             recv_pkts = self.send_and_expect(self.pg1, send_pkts,
750                                              self.tun_if)
751             self.verify_encrypted6(p_out, p_out.vpp_tun_sa, recv_pkts)
752
753         finally:
754             self.logger.info(self.vapi.ppcli("show error"))
755             self.logger.info(self.vapi.ppcli("show ipsec all"))
756         self.verify_counters6(p_in, p_out, count)
757
758     def verify_tun_46(self, p, count=1):
759         """ ipsec 4o6 tunnel basic test """
760         self.vapi.cli("clear errors")
761         try:
762             config_tun_params(p, self.encryption_type, self.tun_if)
763             send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
764                                               src=p.remote_tun_if_host4,
765                                               dst=self.pg1.remote_ip4,
766                                               count=count)
767             recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
768             for recv_pkt in recv_pkts:
769                 self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host4)
770                 self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
771                 self.assert_packet_checksums_valid(recv_pkt)
772             send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
773                                       dst=p.remote_tun_if_host4,
774                                       count=count)
775             recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
776             for recv_pkt in recv_pkts:
777                 try:
778                     decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
779                     if not decrypt_pkt.haslayer(IP):
780                         decrypt_pkt = IP(decrypt_pkt[Raw].load)
781                     self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
782                     self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host4)
783                     self.assert_packet_checksums_valid(decrypt_pkt)
784                 except:
785                     self.logger.debug(ppp("Unexpected packet:", recv_pkt))
786                     try:
787                         self.logger.debug(ppp("Decrypted packet:",
788                                               decrypt_pkt))
789                     except:
790                         pass
791                     raise
792         finally:
793             self.logger.info(self.vapi.ppcli("show error"))
794             self.logger.info(self.vapi.ppcli("show ipsec all"))
795         self.verify_counters6(p, p, count)
796
797
class IpsecTun6Tests(IpsecTun6):
    """ UT test methods for Tunnel v6 """

    def test_tun_basic66(self):
        """ ipsec 6o6 tunnel basic test """
        # One packet in each direction through the IPv6 parameter set.
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_66(v6_params, count=1)

    def test_tun_burst66(self):
        """ ipsec 6o6 tunnel burst test """
        # Same check as the basic test, with a 257-packet burst.
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_66(v6_params, count=257)
808
809
class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
    """ UT test methods for Tunnel v6 & v4 """
    # No members of its own: everything is inherited from the two bases.
813
814
if __name__ == "__main__":
    # Executed directly: run the tests through the VPP test runner.
    unittest.main(testRunner=VppTestRunner)