IPSEC: 4o6 and 6o4 for tunnel interfaces
[vpp.git] / test / template_ipsec.py
1 import unittest
2 import socket
3
4 from scapy.layers.inet import IP, ICMP, TCP, UDP
5 from scapy.layers.ipsec import SecurityAssociation
6 from scapy.layers.l2 import Ether, Raw
7 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
8
9 from framework import VppTestCase, VppTestRunner
10 from util import ppp
11 from vpp_papi import VppEnum
12
13
class IPsecIPv4Params(object):
    """ IPv4 flavour of the parameter bundle consumed by the IPsec tests.

    Class attributes describe the address family; the instance attributes
    set in ``__init__`` carry the SA ids, SPIs, algorithms and keys.
    """

    # address-family constants
    is_ipv6 = 0
    addr_len = 32
    addr_bcast = "255.255.255.255"
    addr_any = "0.0.0.0"
    addr_type = socket.AF_INET

    def __init__(self):
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t

        # host addresses used as the remote end of tunnelled traffic
        self.remote_tun_if_host = '1.1.1.1'
        self.remote_tun_if_host6 = '1111::1'

        # tunnel mode SA ids / SPIs
        self.scapy_tun_sa_id, self.scapy_tun_spi = 10, 1001
        self.vpp_tun_sa_id, self.vpp_tun_spi = 20, 1000

        # transport mode SA ids / SPIs
        self.scapy_tra_sa_id, self.scapy_tra_spi = 30, 2001
        self.vpp_tra_sa_id, self.vpp_tra_spi = 40, 2000

        # integrity: VPP API id plus the matching scapy name
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA1_96
        self.auth_algo = 'HMAC-SHA1-96'
        self.auth_key = 'C91KUR9GYMm5GfkEvNjX'

        # encryption: VPP API id plus the matching scapy name
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_128
        self.crypt_algo = 'AES-CBC'
        self.crypt_key = 'JPjyOWBeVEQiMe7h'

        # no SA flags and no NAT-T header unless a test sets them
        self.flags = 0
        self.nat_header = None
47
48
class IPsecIPv6Params(object):
    """ IPv6 flavour of the parameter bundle consumed by the IPsec tests.

    Class attributes describe the address family; the instance attributes
    set in ``__init__`` carry the SA ids, SPIs, algorithms and keys.
    """

    # address-family constants
    is_ipv6 = 1
    addr_len = 128
    addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
    addr_any = "0::0"
    addr_type = socket.AF_INET6

    def __init__(self):
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t

        # host addresses used as the remote end of tunnelled traffic
        self.remote_tun_if_host = '1111:1111:1111:1111:1111:1111:1111:1111'
        self.remote_tun_if_host4 = '1.1.1.1'

        # tunnel mode SA ids / SPIs
        self.scapy_tun_sa_id, self.scapy_tun_spi = 50, 3001
        self.vpp_tun_sa_id, self.vpp_tun_spi = 60, 3000

        # transport mode SA ids / SPIs
        self.scapy_tra_sa_id, self.scapy_tra_spi = 70, 4001
        self.vpp_tra_sa_id, self.vpp_tra_spi = 80, 4000

        # integrity: VPP API id plus the matching scapy name
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA_256_128
        self.auth_algo = 'SHA2-256-128'
        self.auth_key = 'C91KUR9GYMm5GfkEvNjX'

        # encryption: VPP API id plus the matching scapy name
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_256
        self.crypt_algo = 'AES-CBC'
        self.crypt_key = 'JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h'

        # no SA flags and no NAT-T header unless a test sets them
        self.flags = 0
        self.nat_header = None
82
83
def config_tun_params(p, encryption_type, tun_if):
    """ Attach the two scapy tunnel-mode SAs to the parameter object @p.

    ``p.scapy_tun_sa`` is keyed with ``p.vpp_tun_spi`` and its tunnel header
    runs from the remote to the local address of @tun_if; ``p.vpp_tun_sa``
    is keyed with ``p.scapy_tun_spi`` and its tunnel header runs the other
    way. The tunnel header class (IP or IPv6) follows ``p.addr_type``.
    """
    esn_flag = (VppEnum.vl_api_ipsec_sad_flags_t.
                IPSEC_API_SAD_FLAG_USE_EXTENDED_SEQ_NUM)

    # settings shared by both directions
    common = dict(crypt_algo=p.crypt_algo, crypt_key=p.crypt_key,
                  auth_algo=p.auth_algo, auth_key=p.auth_key,
                  nat_t_header=p.nat_header,
                  use_esn=bool(p.flags & esn_flag))

    outer_hdr = {socket.AF_INET: IP, socket.AF_INET6: IPv6}[p.addr_type]
    remote = tun_if.remote_addr[p.addr_type]
    local = tun_if.local_addr[p.addr_type]

    p.scapy_tun_sa = SecurityAssociation(
        encryption_type, spi=p.vpp_tun_spi,
        tunnel_header=outer_hdr(src=remote, dst=local),
        **common)
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type, spi=p.scapy_tun_spi,
        tunnel_header=outer_hdr(dst=remote, src=local),
        **common)
106
107
def config_tra_params(p, encryption_type):
    """ Attach the two scapy transport-mode SAs to the parameter object @p.

    ``p.scapy_tra_sa`` is keyed with ``p.vpp_tra_spi`` and ``p.vpp_tra_sa``
    with ``p.scapy_tra_spi``; both share the algorithms, keys and NAT-T
    header held by @p.
    """
    # normalise to a real boolean, exactly as config_tun_params does, so the
    # SA never carries the raw flag bit-mask as its use_esn value
    use_esn = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
                              IPSEC_API_SAD_FLAG_USE_EXTENDED_SEQ_NUM))
    p.scapy_tra_sa = SecurityAssociation(
        encryption_type,
        spi=p.vpp_tra_spi,
        crypt_algo=p.crypt_algo,
        crypt_key=p.crypt_key,
        auth_algo=p.auth_algo,
        auth_key=p.auth_key,
        nat_t_header=p.nat_header,
        use_esn=use_esn)
    p.vpp_tra_sa = SecurityAssociation(
        encryption_type,
        spi=p.scapy_tra_spi,
        crypt_algo=p.crypt_algo,
        crypt_key=p.crypt_key,
        auth_algo=p.auth_algo,
        auth_key=p.auth_key,
        nat_t_header=p.nat_header,
        use_esn=use_esn)
129
130
class TemplateIpsec(VppTestCase):
    """
    Common fixture and packet generators for the IPsec test cases.

    TRANSPORT MODE:

     ------   encrypt   ---
    |tra_if| <-------> |VPP|
     ------   decrypt   ---

    TUNNEL MODE:

     ------   encrypt   ---   plain   ---
    |tun_if| <-------  |VPP| <------ |pg1|
     ------             ---           ---

     ------   decrypt   ---   plain   ---
    |tun_if| ------->  |VPP| ------> |pg1|
     ------             ---           ---
    """

    def ipsec_select_backend(self):
        """ hook called at the end of setUp; does nothing by default """

    def setup_params(self):
        """ create the v4/v6 parameter objects and index them by family """
        v4 = IPsecIPv4Params()
        v6 = IPsecIPv6Params()
        self.ipv4_params = v4
        self.ipv6_params = v6
        self.params = {v4.addr_type: v4, v6.addr_type: v6}

    def setUp(self):
        """ build parameters, bring up three pg interfaces with v4 and v6
        addressing, then let the subclass select its backend """
        super(TemplateIpsec, self).setUp()

        self.setup_params()
        self.payload = ("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
                        "XXXXXXXXXXXXXXXXXXXXX")

        self.tun_spd_id = 1
        self.tra_spd_id = 2

        ipsec_protos = VppEnum.vl_api_ipsec_proto_t
        self.vpp_esp_protocol = ipsec_protos.IPSEC_API_PROTO_ESP
        self.vpp_ah_protocol = ipsec_protos.IPSEC_API_PROTO_AH

        self.create_pg_interfaces(range(3))
        self.interfaces = list(self.pg_interfaces)
        for pg_if in self.interfaces:
            pg_if.admin_up()
            pg_if.config_ip4()
            pg_if.resolve_arp()
            pg_if.config_ip6()
            pg_if.resolve_ndp()
        self.ipsec_select_backend()

    def tearDown(self):
        """ run the base tearDown, then take the interfaces down and remove
        their addresses """
        super(TemplateIpsec, self).tearDown()

        for pg_if in self.interfaces:
            pg_if.admin_down()
            pg_if.unconfig_ip4()
            pg_if.unconfig_ip6()

        if not self.vpp_dead:
            self.vapi.cli("show hardware")

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1):
        """ return @count frames, each an IPv4 ICMP packet carrying the
        test payload, encrypted with @sa and addressed to @sw_intf """
        frames = []
        for _ in range(count):
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            inner = IP(src=src, dst=dst) / ICMP() / self.payload
            # one encrypt() call per frame, as each call goes through the SA
            frames.append(l2 / sa.encrypt(inner))
        return frames

    def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1):
        """ return @count frames, each an IPv6 echo request carrying the
        test payload, encrypted with @sa and addressed to @sw_intf """
        frames = []
        for _ in range(count):
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            inner = (IPv6(src=src, dst=dst) /
                     ICMPv6EchoRequest(id=0, seq=1, data=self.payload))
            frames.append(l2 / sa.encrypt(inner))
        return frames

    def gen_pkts(self, sw_intf, src, dst, count=1):
        """ return @count plain IPv4 ICMP frames addressed to @sw_intf """
        frames = []
        for _ in range(count):
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            frames.append(l2 / IP(src=src, dst=dst) / ICMP() / self.payload)
        return frames

    def gen_pkts6(self, sw_intf, src, dst, count=1):
        """ return @count plain IPv6 echo-request frames addressed to
        @sw_intf """
        frames = []
        for _ in range(count):
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            frames.append(
                l2 / IPv6(src=src, dst=dst) /
                ICMPv6EchoRequest(id=0, seq=1, data=self.payload))
        return frames
217
218
class IpsecTcpTests(object):
    """ mixin: checksum check on a TCP packet generated by VPP """

    def test_tcp_checksum(self):
        """ verify checksum correctness for vpp generated packets """
        self.vapi.cli("test http server")
        p = self.params[socket.AF_INET]
        config_tun_params(p, self.encryption_type, self.tun_if)

        # encrypted TCP SYN to port 80 on VPP's own tunnel address
        l2 = Ether(src=self.tun_if.remote_mac, dst=self.tun_if.local_mac)
        syn = (IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
               TCP(flags='S', dport=80))
        send = l2 / p.scapy_tun_sa.encrypt(syn)
        self.logger.debug(ppp("Sending packet:", send))

        # only the first reply is inspected
        replies = self.send_and_expect(self.tun_if, [send], self.tun_if)
        decrypted = p.vpp_tun_sa.decrypt(replies[0][IP])
        self.assert_packet_checksums_valid(decrypted)
234
235
class IpsecTra4Tests(object):
    """ mixin: IPv4 transport-mode test cases

    The host class supplies ``params``, ``tra_if``, ``encryption_type``,
    the ``tra4_*_node_name`` attributes and the send/assert helpers.
    """

    def _gen_tra4_seq_pkt(self, sa, seq_num):
        """ build one frame on tra_if: an ICMP packet between the
        interface's remote and local IPv4 addresses, encrypted by @sa with
        the explicit sequence number @seq_num """
        return (Ether(src=self.tra_if.remote_mac,
                      dst=self.tra_if.local_mac) /
                sa.encrypt(IP(src=self.tra_if.remote_ip4,
                              dst=self.tra_if.local_ip4) /
                           ICMP(),
                           seq_num=seq_num))

    def test_tra_anti_replay(self, count=1):
        """ ipsec v4 transport anti-replay test """
        # @count is accepted for signature compatibility; it is not used
        p = self.params[socket.AF_INET]
        use_esn = p.vpp_tra_sa.use_esn
        replay_counter = ('/err/%s/SA replayed packet' %
                          self.tra4_decrypt_node_name)
        integ_counter = ('/err/%s/Integrity check failed' %
                         self.tra4_decrypt_node_name)

        # fire in a packet with seq number 1
        pkt = self._gen_tra4_seq_pkt(p.scapy_tra_sa, 1)
        self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # now move the window over to 235
        pkt = self._gen_tra4_seq_pkt(p.scapy_tra_sa, 235)
        self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # replayed packets are dropped
        self.send_and_assert_no_replies(self.tra_if, pkt * 3)
        self.assert_packet_counter_equal(replay_counter, 3)

        # the window size is 64 packets
        # in window are still accepted
        pkt = self._gen_tra4_seq_pkt(p.scapy_tra_sa, 172)
        self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # a packet that does not decrypt does not move the window forward
        bogus_sa = SecurityAssociation(self.encryption_type,
                                       p.vpp_tra_spi)
        pkt = self._gen_tra4_seq_pkt(bogus_sa, 350)
        self.send_and_assert_no_replies(self.tra_if, pkt * 17)
        self.assert_packet_counter_equal(integ_counter, 17)

        # which we can determine since this packet is still in the window
        pkt = self._gen_tra4_seq_pkt(p.scapy_tra_sa, 234)
        self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # out of window are dropped
        pkt = self._gen_tra4_seq_pkt(p.scapy_tra_sa, 17)
        self.send_and_assert_no_replies(self.tra_if, pkt * 17)

        if use_esn:
            # an out of window error with ESN looks like a high sequence
            # wrap. but since it isn't then the verify will fail.
            self.assert_packet_counter_equal(integ_counter, 34)
        else:
            self.assert_packet_counter_equal(replay_counter, 20)

        # valid packet moves the window over to 236
        pkt = self._gen_tra4_seq_pkt(p.scapy_tra_sa, 236)
        rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
        # the result is not needed; the call itself checks the reply decrypts
        p.vpp_tra_sa.decrypt(rx[0][IP])

        # move VPP's SA to just before the seq-number wrap
        self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.scapy_tra_sa_id)

        # then fire in a packet that VPP should drop because it causes the
        # seq number to wrap unless we're using extended.
        pkt = self._gen_tra4_seq_pkt(p.scapy_tra_sa, 237)

        if use_esn:
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            # in order to decrypt the high order number needs to wrap
            p.vpp_tra_sa.seq_num = 0x100000000
            p.vpp_tra_sa.decrypt(rx[0][IP])

            # send packets with high bits set
            p.scapy_tra_sa.seq_num = 0x100000005
            pkt = self._gen_tra4_seq_pkt(p.scapy_tra_sa, 0x100000005)
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            # in order to decrypt the high order number needs to wrap
            p.vpp_tra_sa.decrypt(rx[0][IP])
        else:
            self.send_and_assert_no_replies(self.tra_if, [pkt])
            self.assert_packet_counter_equal(
                '/err/%s/sequence number cycled' %
                self.tra4_encrypt_node_name, 1)

        # move the security-associations seq number on to the last we used
        self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id)
        p.scapy_tra_sa.seq_num = 351
        p.vpp_tra_sa.seq_num = 351

    def test_tra_basic(self, count=1):
        """ ipsec v4 transport basic test """
        self.vapi.cli("clear errors")
        try:
            p = self.params[socket.AF_INET]
            send_pkts = self.gen_encrypt_pkts(p.scapy_tra_sa, self.tra_if,
                                              src=self.tra_if.remote_ip4,
                                              dst=self.tra_if.local_ip4,
                                              count=count)
            recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
                                             self.tra_if)
            for rx in recv_pkts:
                try:
                    decrypted = p.vpp_tra_sa.decrypt(rx[IP])
                    self.assert_packet_checksums_valid(decrypted)
                except Exception:
                    # dump the offending packet, then let the failure through
                    self.logger.debug(ppp("Unexpected packet:", rx))
                    raise
        finally:
            # always capture VPP's view, pass or fail
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))

        pkts = p.tra_sa_in.get_stats()['packets']
        self.assertEqual(pkts, count,
                         "incorrect SA in counts: expected %d != %d" %
                         (count, pkts))
        pkts = p.tra_sa_out.get_stats()['packets']
        self.assertEqual(pkts, count,
                         "incorrect SA out counts: expected %d != %d" %
                         (count, pkts))

        self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tra4_decrypt_node_name, count)

    def test_tra_burst(self):
        """ ipsec v4 transport burst test """
        self.test_tra_basic(count=257)
406
407
class IpsecTra6Tests(object):
    """ mixin: IPv6 transport-mode test cases

    The host class supplies ``params``, ``tra_if``, the
    ``tra6_*_node_name`` attributes and the send/assert helpers.
    """

    def test_tra_basic6(self, count=1):
        """ ipsec v6 transport basic test """
        self.vapi.cli("clear errors")
        try:
            p = self.params[socket.AF_INET6]
            tx = self.gen_encrypt_pkts6(p.scapy_tra_sa, self.tra_if,
                                        src=self.tra_if.remote_ip6,
                                        dst=self.tra_if.local_ip6,
                                        count=count)
            replies = self.send_and_expect(self.tra_if, tx, self.tra_if)
            for reply in replies:
                try:
                    plain = p.vpp_tra_sa.decrypt(reply[IPv6])
                    self.assert_packet_checksums_valid(plain)
                except:
                    # dump the offending packet, then let the failure through
                    self.logger.debug(ppp("Unexpected packet:", reply))
                    raise
        finally:
            # always capture VPP's view, pass or fail
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))

        # per-SA packet counts must match what was sent
        n_in = p.tra_sa_in.get_stats()['packets']
        self.assertEqual(
            n_in, count,
            "incorrect SA in counts: expected %d != %d" % (count, n_in))
        n_out = p.tra_sa_out.get_stats()['packets']
        self.assertEqual(
            n_out, count,
            "incorrect SA out counts: expected %d != %d" % (count, n_out))

        # and so must the encrypt/decrypt node counters
        for node in (self.tra6_encrypt_node_name,
                     self.tra6_decrypt_node_name):
            self.assert_packet_counter_equal(node, count)

    def test_tra_burst6(self):
        """ ipsec v6 transport burst test """
        self.test_tra_basic6(count=257)
445
446
class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
    """ mixin combining the IPv4 and IPv6 transport-mode test cases """
449
450
class IpsecTun4(object):
    """ mixin: verification helpers for tunnels whose encrypted packets are
    read from their IP (v4) layer

    The host class supplies ``tun_if``, ``pg1``, ``encryption_type``, the
    ``tun4_*_node_name`` attributes, the packet generators and the
    send/assert helpers.
    """

    def verify_counters(self, p, count):
        """ compare the SPD policy and SA packet statistics (only when @p
        carries them) and the tun4 encrypt/decrypt node counters
        against @count """
        if hasattr(p, "spd_policy_in_any"):
            pkts = p.spd_policy_in_any.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SPD any policy: expected %d != %d" %
                             (count, pkts))

        if hasattr(p, "tun_sa_in"):
            pkts = p.tun_sa_in.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA in counts: expected %d != %d" %
                             (count, pkts))
            pkts = p.tun_sa_out.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA out counts: expected %d != %d" %
                             (count, pkts))

        self.assert_packet_counter_equal(self.tun4_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)

    def _log_bad_tun4_pkt(self, recv_pkt, decrypt_pkt):
        """ debug-dump a received packet that failed verification and, when
        decryption got that far, its decrypted form """
        self.logger.debug(ppp("Unexpected packet:", recv_pkt))
        if decrypt_pkt is None:
            # decryption itself failed; nothing further to show
            return
        try:
            self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
        except Exception:
            # diagnostics only: never mask the failure being reported
            self.logger.debug("Unable to dump the decrypted packet")

    def verify_tun_44(self, p, count=1):
        """ send @count IPv4 packets through the tunnel in each direction
        (encrypted into tun_if, plain into pg1) and verify addresses,
        checksums and counters """
        self.vapi.cli("clear errors")
        try:
            config_tun_params(p, self.encryption_type, self.tun_if)
            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
                                              src=p.remote_tun_if_host,
                                              dst=self.pg1.remote_ip4,
                                              count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host)
                self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
                self.assert_packet_checksums_valid(recv_pkt)
            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                      dst=p.remote_tun_if_host, count=count)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # reset per packet so a failure never reports the payload
                # decrypted from an earlier packet
                decrypt_pkt = None
                try:
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
                    if not decrypt_pkt.haslayer(IP):
                        # re-parse the raw payload as the inner IPv4 packet
                        decrypt_pkt = IP(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except Exception:
                    self._log_bad_tun4_pkt(recv_pkt, decrypt_pkt)
                    raise
        finally:
            # always capture VPP's view, pass or fail
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))

        self.verify_counters(p, count)

    def verify_tun_64(self, p, count=1):
        """ send @count IPv6 packets through the tunnel in each direction
        (encrypted into tun_if, plain into pg1) and verify addresses,
        checksums and counters; the encrypted replies are read from their
        IP (v4) layer """
        self.vapi.cli("clear errors")
        try:
            config_tun_params(p, self.encryption_type, self.tun_if)
            send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
                                               src=p.remote_tun_if_host6,
                                               dst=self.pg1.remote_ip6,
                                               count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host6)
                self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
                self.assert_packet_checksums_valid(recv_pkt)
            send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
                                       dst=p.remote_tun_if_host6, count=count)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # reset per packet so a failure never reports the payload
                # decrypted from an earlier packet
                decrypt_pkt = None
                try:
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
                    if not decrypt_pkt.haslayer(IPv6):
                        # re-parse the raw payload as the inner IPv6 packet
                        decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host6)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except Exception:
                    self._log_bad_tun4_pkt(recv_pkt, decrypt_pkt)
                    raise
        finally:
            # always capture VPP's view, pass or fail
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))

        self.verify_counters(p, count)
548
549
class IpsecTun4Tests(IpsecTun4):
    """ 4o4 tunnel test cases driven through verify_tun_44 """

    def test_tun_basic44(self):
        """ ipsec 4o4 tunnel basic test """
        v4_params = self.params[socket.AF_INET]
        self.verify_tun_44(v4_params, count=1)

    def test_tun_burst44(self):
        """ ipsec 4o4 tunnel burst test """
        v4_params = self.params[socket.AF_INET]
        self.verify_tun_44(v4_params, count=257)
559
560
class IpsecTun6(object):
    """ mixin: verification helpers for tunnels whose encrypted packets are
    read from their IPv6 layer

    The host class supplies ``tun_if``, ``pg1``, ``encryption_type``, the
    ``tun6_*_node_name`` attributes, the packet generators and the
    send/assert helpers.
    """

    def verify_counters(self, p, count):
        """ compare the SA packet statistics (only when @p carries them) and
        the tun6 encrypt/decrypt node counters against @count

        NOTE(review): IpsecTun4 defines a method with this same name. In a
        class inheriting from both (e.g. IpsecTun46Tests, which lists the
        tun4 side first) the MRO resolves self.verify_counters to
        IpsecTun4's version, so the tun6 node counters are not the ones
        checked there - confirm that is intended.
        """
        if hasattr(p, "tun_sa_in"):
            pkts = p.tun_sa_in.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA in counts: expected %d != %d" %
                             (count, pkts))
            pkts = p.tun_sa_out.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA out counts: expected %d != %d" %
                             (count, pkts))
        self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)

    def _log_bad_tun6_pkt(self, recv_pkt, decrypt_pkt):
        """ debug-dump a received packet that failed verification and, when
        decryption got that far, its decrypted form """
        self.logger.debug(ppp("Unexpected packet:", recv_pkt))
        if decrypt_pkt is None:
            # decryption itself failed; nothing further to show
            return
        try:
            self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
        except Exception:
            # diagnostics only: never mask the failure being reported
            self.logger.debug("Unable to dump the decrypted packet")

    def verify_tun_66(self, p, count=1):
        """ ipsec 6o6 tunnel check: send @count IPv6 packets in each
        direction (encrypted into tun_if, plain into pg1) and verify
        addresses, checksums and counters """
        self.vapi.cli("clear errors")
        try:
            config_tun_params(p, self.encryption_type, self.tun_if)
            send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
                                               src=p.remote_tun_if_host,
                                               dst=self.pg1.remote_ip6,
                                               count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host)
                self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
                self.assert_packet_checksums_valid(recv_pkt)
            send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
                                       dst=p.remote_tun_if_host,
                                       count=count)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # reset per packet so a failure never reports the payload
                # decrypted from an earlier packet
                decrypt_pkt = None
                try:
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
                    if not decrypt_pkt.haslayer(IPv6):
                        # re-parse the raw payload as the inner IPv6 packet
                        decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except Exception:
                    self._log_bad_tun6_pkt(recv_pkt, decrypt_pkt)
                    raise
        finally:
            # always capture VPP's view, pass or fail
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))
        self.verify_counters(p, count)

    def verify_tun_46(self, p, count=1):
        """ ipsec 4o6 tunnel check: send @count IPv4 packets in each
        direction (encrypted into tun_if, plain into pg1) and verify
        addresses, checksums and counters; the encrypted replies are read
        from their IPv6 layer """
        self.vapi.cli("clear errors")
        try:
            config_tun_params(p, self.encryption_type, self.tun_if)
            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
                                              src=p.remote_tun_if_host4,
                                              dst=self.pg1.remote_ip4,
                                              count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host4)
                self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
                self.assert_packet_checksums_valid(recv_pkt)
            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                      dst=p.remote_tun_if_host4,
                                      count=count)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # reset per packet so a failure never reports the payload
                # decrypted from an earlier packet
                decrypt_pkt = None
                try:
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
                    if not decrypt_pkt.haslayer(IP):
                        # re-parse the raw payload as the inner IPv4 packet
                        decrypt_pkt = IP(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host4)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except Exception:
                    self._log_bad_tun6_pkt(recv_pkt, decrypt_pkt)
                    raise
        finally:
            # always capture VPP's view, pass or fail
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))
        self.verify_counters(p, count)
653
654
class IpsecTun6Tests(IpsecTun6):
    """ 6o6 tunnel test cases driven through verify_tun_66 """

    def test_tun_basic66(self):
        """ ipsec 6o6 tunnel basic test """
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_66(v6_params, count=1)

    def test_tun_burst66(self):
        """ ipsec 6o6 tunnel burst test """
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_66(v6_params, count=257)
664
665
class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
    """ mixin combining the 4o4 and 6o6 tunnel test cases """
668
669
# when executed as a script, run the tests with VPP's own runner
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)