Typos. A bunch of typos I've been collecting.
[vpp.git] / test / template_ipsec.py
1 import unittest
2 import socket
3
4 from scapy.layers.inet import IP, ICMP, TCP, UDP
5 from scapy.layers.ipsec import SecurityAssociation
6 from scapy.layers.l2 import Ether, Raw
7 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
8
9 from framework import VppTestCase, VppTestRunner
10 from util import ppp
11 from vpp_papi import VppEnum
12
13
class IPsecIPv4Params(object):
    """
    Default IPv4 parameter set for the IPsec test templates.

    The class attributes describe the address family. Each instance
    carries the SA ids, SPIs, algorithm selections and keys for one
    tunnel-mode and one transport-mode SA pair.
    """

    addr_type = socket.AF_INET
    addr_any = "0.0.0.0"
    addr_bcast = "255.255.255.255"
    addr_len = 32
    is_ipv6 = 0

    def __init__(self):
        # bind the API enum namespaces once to keep the lookups short
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t

        self.remote_tun_if_host = '1.1.1.1'

        # tunnel mode SA ids and SPIs
        self.scapy_tun_sa_id = 10
        self.scapy_tun_spi = 1001
        self.vpp_tun_sa_id = 20
        self.vpp_tun_spi = 1000

        # transport mode SA ids and SPIs
        self.scapy_tra_sa_id = 30
        self.scapy_tra_spi = 2001
        self.vpp_tra_sa_id = 40
        self.vpp_tra_spi = 2000

        # integrity: VPP API enum value plus the matching scapy name
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA1_96
        self.auth_algo = 'HMAC-SHA1-96'  # scapy name
        self.auth_key = 'C91KUR9GYMm5GfkEvNjX'

        # encryption: VPP API enum value plus the matching scapy name
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_128
        self.crypt_algo = 'AES-CBC'  # scapy name
        self.crypt_key = 'JPjyOWBeVEQiMe7h'

        self.flags = 0
        self.nat_header = None
46
47
class IPsecIPv6Params(object):
    """
    Default IPv6 parameter set for the IPsec test templates.

    The class attributes describe the address family. Each instance
    carries the SA ids, SPIs, algorithm selections and keys for one
    tunnel-mode and one transport-mode SA pair.
    """

    addr_type = socket.AF_INET6
    addr_any = "0::0"
    addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
    addr_len = 128
    is_ipv6 = 1

    def __init__(self):
        # bind the API enum namespaces once to keep the lookups short
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t

        self.remote_tun_if_host = '1111:1111:1111:1111:1111:1111:1111:1111'

        # tunnel mode SA ids and SPIs
        self.scapy_tun_sa_id = 50
        self.scapy_tun_spi = 3001
        self.vpp_tun_sa_id = 60
        self.vpp_tun_spi = 3000

        # transport mode SA ids and SPIs
        self.scapy_tra_sa_id = 70
        self.scapy_tra_spi = 4001
        self.vpp_tra_sa_id = 80
        self.vpp_tra_spi = 4000

        # integrity: VPP API enum value plus the matching scapy name
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA_256_128
        self.auth_algo = 'SHA2-256-128'  # scapy name
        self.auth_key = 'C91KUR9GYMm5GfkEvNjX'

        # encryption: VPP API enum value plus the matching scapy name
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_256
        self.crypt_algo = 'AES-CBC'  # scapy name
        self.crypt_key = 'JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h'

        self.flags = 0
        self.nat_header = None
80
81
def config_tun_params(p, encryption_type, tun_if):
    """
    Create the scapy tunnel-mode SA objects and store them on p.

    p.scapy_tun_sa uses p.vpp_tun_spi and an outer header going from the
    tunnel interface's remote address to its local address.
    p.vpp_tun_sa uses p.scapy_tun_spi and an outer header going from the
    local address to the remote address.

    :param p: parameter object, e.g. IPsecIPv4Params or IPsecIPv6Params
    :param encryption_type: passed through as the first positional
        argument of SecurityAssociation
    :param tun_if: object whose remote_addr and local_addr are indexed
        by address family
    """
    esn_flag = (VppEnum.vl_api_ipsec_sad_flags_t.
                IPSEC_API_SAD_FLAG_USE_EXTENDED_SEQ_NUM)
    use_esn = bool(p.flags & esn_flag)

    # outer header class is chosen by the address family of p
    ip_class = {socket.AF_INET: IP, socket.AF_INET6: IPv6}[p.addr_type]
    remote = tun_if.remote_addr[p.addr_type]
    local = tun_if.local_addr[p.addr_type]

    # everything the two SAs have in common
    common = dict(crypt_algo=p.crypt_algo, crypt_key=p.crypt_key,
                  auth_algo=p.auth_algo, auth_key=p.auth_key,
                  nat_t_header=p.nat_header,
                  use_esn=use_esn)

    p.scapy_tun_sa = SecurityAssociation(
        encryption_type, spi=p.vpp_tun_spi,
        tunnel_header=ip_class(src=remote, dst=local),
        **common)
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type, spi=p.scapy_tun_spi,
        tunnel_header=ip_class(dst=remote, src=local),
        **common)
104
105
def config_tra_params(p, encryption_type):
    """
    Create the scapy transport-mode SA objects and store them on p.

    p.scapy_tra_sa is created with p.vpp_tra_spi and p.vpp_tra_sa with
    p.scapy_tra_spi; both share the algorithms, keys, NAT-T header and
    extended-sequence-number setting taken from p.

    :param p: parameter object, e.g. IPsecIPv4Params or IPsecIPv6Params
    :param encryption_type: passed through as the first positional
        argument of SecurityAssociation
    """
    # normalise to a real boolean, exactly as config_tun_params does,
    # rather than handing the raw masked flag value to scapy
    use_esn = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
                              IPSEC_API_SAD_FLAG_USE_EXTENDED_SEQ_NUM))
    p.scapy_tra_sa = SecurityAssociation(
        encryption_type,
        spi=p.vpp_tra_spi,
        crypt_algo=p.crypt_algo,
        crypt_key=p.crypt_key,
        auth_algo=p.auth_algo,
        auth_key=p.auth_key,
        nat_t_header=p.nat_header,
        use_esn=use_esn)
    p.vpp_tra_sa = SecurityAssociation(
        encryption_type,
        spi=p.scapy_tra_spi,
        crypt_algo=p.crypt_algo,
        crypt_key=p.crypt_key,
        auth_algo=p.auth_algo,
        auth_key=p.auth_key,
        nat_t_header=p.nat_header,
        use_esn=use_esn)
127
128
class TemplateIpsec(VppTestCase):
    """
    TRANSPORT MODE:

     ------   encrypt   ---
    |tra_if| <-------> |VPP|
     ------   decrypt   ---

    TUNNEL MODE:

     ------   encrypt   ---   plain   ---
    |tun_if| <-------  |VPP| <------ |pg1|
     ------             ---           ---

     ------   decrypt   ---   plain   ---
    |tun_if| ------->  |VPP| ------> |pg1|
     ------             ---           ---

    Common fixture for the IPsec tests: creates the parameter sets and
    three pg interfaces, and provides packet generators.
    """

    def ipsec_select_backend(self):
        """ hook called at the end of setUp; does nothing by default """

    def setup_params(self):
        """ create the v4/v6 parameter sets, keyed by address family """
        self.ipv4_params = IPsecIPv4Params()
        self.ipv6_params = IPsecIPv6Params()
        self.params = {
            param_set.addr_type: param_set
            for param_set in (self.ipv4_params, self.ipv6_params)
        }

    def setUp(self):
        """ build parameters, protocol ids and the pg interfaces """
        super(TemplateIpsec, self).setUp()

        self.setup_params()
        self.payload = ("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
                        "XXXXXXXXXXXXXXXXXXXXX")

        self.tun_spd_id = 1
        self.tra_spd_id = 2

        ipsec_protos = VppEnum.vl_api_ipsec_proto_t
        self.vpp_esp_protocol = ipsec_protos.IPSEC_API_PROTO_ESP
        self.vpp_ah_protocol = ipsec_protos.IPSEC_API_PROTO_AH

        self.create_pg_interfaces(range(3))
        self.interfaces = list(self.pg_interfaces)
        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()
            intf.config_ip6()
            intf.resolve_ndp()
        self.ipsec_select_backend()

    def tearDown(self):
        """ bring the interfaces down and remove their addresses """
        super(TemplateIpsec, self).tearDown()

        for intf in self.interfaces:
            intf.admin_down()
            intf.unconfig_ip4()
            intf.unconfig_ip6()

        if not self.vpp_dead:
            self.vapi.cli("show hardware")

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1):
        """ return count Ether frames, each an IPv4 ICMP encrypted by sa """
        pkts = []
        # sa.encrypt is called once per packet, as in a real stream
        for _ in range(count):
            pkts.append(
                Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=src, dst=dst) / ICMP() / self.payload))
        return pkts

    def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1):
        """ return count Ether frames, each an ICMPv6 echo encrypted by sa """
        pkts = []
        # sa.encrypt is called once per packet, as in a real stream
        for _ in range(count):
            pkts.append(
                Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=src, dst=dst) /
                           ICMPv6EchoRequest(id=0, seq=1,
                                             data=self.payload)))
        return pkts

    def gen_pkts(self, sw_intf, src, dst, count=1):
        """ return count plain Ether / IPv4 / ICMP frames with the payload """
        pkts = []
        for _ in range(count):
            pkts.append(
                Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src=src, dst=dst) / ICMP() / self.payload)
        return pkts

    def gen_pkts6(self, sw_intf, src, dst, count=1):
        """ return count plain Ether / IPv6 / ICMPv6 echo request frames """
        pkts = []
        for _ in range(count):
            pkts.append(
                Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src=src, dst=dst) /
                ICMPv6EchoRequest(id=0, seq=1, data=self.payload))
        return pkts

    def configure_sa_tra(self, params):
        """ create the scapy transport SAs on params (no use_esn passed) """
        # keyword arguments common to both directions
        shared = dict(crypt_algo=params.crypt_algo,
                      crypt_key=params.crypt_key,
                      auth_algo=params.auth_algo,
                      auth_key=params.auth_key,
                      nat_t_header=params.nat_header)
        params.scapy_tra_sa = SecurityAssociation(
            self.encryption_type, spi=params.vpp_tra_spi, **shared)
        params.vpp_tra_sa = SecurityAssociation(
            self.encryption_type, spi=params.scapy_tra_spi, **shared)
233
234
# Mixin: expects the concrete test class to provide params, tun_if,
# encryption_type, vapi and the send/assert helpers used below.
class IpsecTcpTests(object):
    def test_tcp_checksum(self):
        """ verify checksum correctness for vpp generated packets """
        self.vapi.cli("test http server")
        p = self.params[socket.AF_INET]
        config_tun_params(p, self.encryption_type, self.tun_if)

        # an encrypted TCP SYN to port 80 on VPP's own tunnel address
        l2_hdr = Ether(src=self.tun_if.remote_mac,
                       dst=self.tun_if.local_mac)
        syn = (IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
               TCP(flags='S', dport=80))
        send = l2_hdr / p.scapy_tun_sa.encrypt(syn)
        self.logger.debug(ppp("Sending packet:", send))

        # only the first reply is decrypted and checked
        replies = self.send_and_expect(self.tun_if, [send], self.tun_if)
        first_reply = replies[0]
        decrypted = p.vpp_tun_sa.decrypt(first_reply[IP])
        self.assert_packet_checksums_valid(decrypted)
250
251
# Mixin with the IPv4 transport-mode tests. The concrete test class is
# expected to provide params, tra_if, encryption_type, vapi, the
# tra4_{en,de}crypt_node_name attributes and the send/assert helpers.
class IpsecTra4Tests(object):
    def _tra4_encrypted_icmp(self, sa, seq_num):
        """ build one Ether frame on tra_if: ICMP encrypted by sa """
        return (Ether(src=self.tra_if.remote_mac,
                      dst=self.tra_if.local_mac) /
                sa.encrypt(IP(src=self.tra_if.remote_ip4,
                              dst=self.tra_if.local_ip4) /
                           ICMP(),
                           seq_num=seq_num))

    def test_tra_anti_replay(self, count=1):
        """ ipsec v4 transport anti-replay test """
        # NOTE: count is not used here; it is kept so the signature
        # stays the same for any existing caller.
        p = self.params[socket.AF_INET]
        use_esn = p.vpp_tra_sa.use_esn

        # fire in a packet with seq number 1
        pkt = self._tra4_encrypted_icmp(p.scapy_tra_sa, 1)
        self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # now move the window over to 235
        pkt = self._tra4_encrypted_icmp(p.scapy_tra_sa, 235)
        self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # replayed packets are dropped
        self.send_and_assert_no_replies(self.tra_if, pkt * 3)
        self.assert_packet_counter_equal(
            '/err/%s/SA replayed packet' % self.tra4_decrypt_node_name, 3)

        # the window size is 64 packets
        # in window are still accepted
        pkt = self._tra4_encrypted_icmp(p.scapy_tra_sa, 172)
        self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # a packet that does not decrypt does not move the window forward
        bogus_sa = SecurityAssociation(self.encryption_type,
                                       p.vpp_tra_spi)
        pkt = self._tra4_encrypted_icmp(bogus_sa, 350)
        self.send_and_assert_no_replies(self.tra_if, pkt * 17)

        self.assert_packet_counter_equal(
            '/err/%s/Integrity check failed' % self.tra4_decrypt_node_name, 17)

        # which we can determine since this packet is still in the window
        pkt = self._tra4_encrypted_icmp(p.scapy_tra_sa, 234)
        self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # out of window are dropped
        pkt = self._tra4_encrypted_icmp(p.scapy_tra_sa, 17)
        self.send_and_assert_no_replies(self.tra_if, pkt * 17)

        if use_esn:
            # an out of window error with ESN looks like a high sequence
            # wrap. but since it isn't then the verify will fail.
            self.assert_packet_counter_equal(
                '/err/%s/Integrity check failed' %
                self.tra4_decrypt_node_name, 34)

        else:
            self.assert_packet_counter_equal(
                '/err/%s/SA replayed packet' %
                self.tra4_decrypt_node_name, 20)

        # valid packet moves the window over to 236
        pkt = self._tra4_encrypted_icmp(p.scapy_tra_sa, 236)
        rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
        # the reply must decrypt; the decrypted packet itself is not used
        p.vpp_tra_sa.decrypt(rx[0][IP])

        # move VPP's SA to just before the seq-number wrap
        self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.scapy_tra_sa_id)

        # then fire in a packet that VPP should drop because it causes the
        # seq number to wrap unless we're using extended.
        pkt = self._tra4_encrypted_icmp(p.scapy_tra_sa, 237)

        if use_esn:
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            # in order to decrypt the high order number needs to wrap
            p.vpp_tra_sa.seq_num = 0x100000000
            p.vpp_tra_sa.decrypt(rx[0][IP])

            # send packets with high bits set
            p.scapy_tra_sa.seq_num = 0x100000005
            pkt = self._tra4_encrypted_icmp(p.scapy_tra_sa, 0x100000005)
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            # the reply must still decrypt with the wrapped high order bits
            p.vpp_tra_sa.decrypt(rx[0][IP])
        else:
            self.send_and_assert_no_replies(self.tra_if, [pkt])
            self.assert_packet_counter_equal(
                '/err/%s/sequence number cycled' %
                self.tra4_encrypt_node_name, 1)

        # move the security-associations seq number on to the last we used
        self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id)
        p.scapy_tra_sa.seq_num = 351
        p.vpp_tra_sa.seq_num = 351

    def test_tra_basic(self, count=1):
        """ ipsec v4 transport basic test """
        self.vapi.cli("clear errors")
        try:
            p = self.params[socket.AF_INET]
            send_pkts = self.gen_encrypt_pkts(p.scapy_tra_sa, self.tra_if,
                                              src=self.tra_if.remote_ip4,
                                              dst=self.tra_if.local_ip4,
                                              count=count)
            recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
                                             self.tra_if)
            for rx in recv_pkts:
                try:
                    decrypted = p.vpp_tra_sa.decrypt(rx[IP])
                    self.assert_packet_checksums_valid(decrypted)
                except:
                    # log the offending packet, then let the error through
                    self.logger.debug(ppp("Unexpected packet:", rx))
                    raise
        finally:
            # always dump the error counters and IPsec state to the log
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))

        # each SA must have counted exactly the packets sent
        pkts = p.tra_sa_in.get_stats()['packets']
        self.assertEqual(pkts, count,
                         "incorrect SA in counts: expected %d != %d" %
                         (count, pkts))
        pkts = p.tra_sa_out.get_stats()['packets']
        self.assertEqual(pkts, count,
                         "incorrect SA out counts: expected %d != %d" %
                         (count, pkts))

        self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tra4_decrypt_node_name, count)

    def test_tra_burst(self):
        """ ipsec v4 transport burst test """
        self.test_tra_basic(count=257)
422
423
# Mixin with the IPv6 transport-mode tests. The concrete test class is
# expected to provide params, tra_if, vapi, the
# tra6_{en,de}crypt_node_name attributes and the send/assert helpers.
class IpsecTra6Tests(object):
    def test_tra_basic6(self, count=1):
        """ ipsec v6 transport basic test """
        self.vapi.cli("clear errors")
        try:
            p = self.params[socket.AF_INET6]
            tx_pkts = self.gen_encrypt_pkts6(
                p.scapy_tra_sa, self.tra_if,
                src=self.tra_if.remote_ip6,
                dst=self.tra_if.local_ip6,
                count=count)
            rx_pkts = self.send_and_expect(self.tra_if, tx_pkts,
                                           self.tra_if)
            for rx in rx_pkts:
                try:
                    plain = p.vpp_tra_sa.decrypt(rx[IPv6])
                    self.assert_packet_checksums_valid(plain)
                except:
                    # log the offending packet, then let the error through
                    self.logger.debug(ppp("Unexpected packet:", rx))
                    raise
        finally:
            # always dump the error counters and IPsec state to the log
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))

        # each SA must have counted exactly the packets sent
        in_count = p.tra_sa_in.get_stats()['packets']
        self.assertEqual(in_count, count,
                         "incorrect SA in counts: expected %d != %d" %
                         (count, in_count))
        out_count = p.tra_sa_out.get_stats()['packets']
        self.assertEqual(out_count, count,
                         "incorrect SA out counts: expected %d != %d" %
                         (count, out_count))
        self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tra6_decrypt_node_name, count)

    def test_tra_burst6(self):
        """ ipsec v6 transport burst test """
        self.test_tra_basic6(count=257)
461
462
# Convenience mixin combining the IPv4 and IPv6 transport-mode tests.
class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
    pass
465
466
# Mixin with the IPv4-over-IPv4 tunnel verification helper. The concrete
# test class is expected to provide tun_if, pg1, encryption_type, vapi,
# the tun4_{en,de}crypt_node_name attributes and the send/assert helpers.
class IpsecTun4(object):

    def verify_tun_44(self, p, count=1):
        """
        Send count packets through the 4o4 tunnel in each direction.

        Encrypted packets sent on tun_if must arrive in plain on pg1, and
        plain packets sent on pg1 must arrive on tun_if and decrypt to
        the original addresses. Afterwards the SPD policy and SA
        counters (when present on p) and the node counters are checked.

        :param p: parameter object for the tunnel under test
        :param count: number of packets to send in each direction
        """
        self.vapi.cli("clear errors")
        try:
            config_tun_params(p, self.encryption_type, self.tun_if)

            # decrypt direction: tun_if -> VPP -> pg1
            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
                                              src=p.remote_tun_if_host,
                                              dst=self.pg1.remote_ip4,
                                              count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host)
                self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
                self.assert_packet_checksums_valid(recv_pkt)

            # encrypt direction: pg1 -> VPP -> tun_if
            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                      dst=p.remote_tun_if_host, count=count)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # reset per packet so a failure never logs the packet
                # decrypted in a previous iteration
                decrypt_pkt = None
                try:
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
                    if not decrypt_pkt.haslayer(IP):
                        decrypt_pkt = IP(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except:
                    # log what we have, then let the error through
                    self.logger.debug(ppp("Unexpected packet:", recv_pkt))
                    if decrypt_pkt is not None:
                        self.logger.debug(
                            ppp("Decrypted packet:", decrypt_pkt))
                    raise
        finally:
            # always dump the error counters and IPsec state to the log
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))

        if (hasattr(p, "spd_policy_in_any")):
            pkts = p.spd_policy_in_any.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SPD any policy: expected %d != %d" %
                             (count, pkts))

        if (hasattr(p, "tun_sa_in")):
            pkts = p.tun_sa_in.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA in counts: expected %d != %d" %
                             (count, pkts))
            pkts = p.tun_sa_out.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA out counts: expected %d != %d" %
                             (count, pkts))

        self.assert_packet_counter_equal(self.tun4_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)
523
524
# The 4o4 tunnel tests proper: one packet, and a burst of 257 packets.
class IpsecTun4Tests(IpsecTun4):

    def test_tun_basic44(self):
        """ ipsec 4o4 tunnel basic test """
        v4_params = self.params[socket.AF_INET]
        self.verify_tun_44(v4_params, count=1)

    def test_tun_burst44(self):
        """ ipsec 4o4 tunnel burst test """
        v4_params = self.params[socket.AF_INET]
        self.verify_tun_44(v4_params, count=257)
534
535
# Mixin with the IPv6-over-IPv6 tunnel verification helper. The concrete
# test class is expected to provide tun_if, pg1, encryption_type, vapi,
# the tun6_{en,de}crypt_node_name attributes and the send/assert helpers.
class IpsecTun6(object):

    def verify_tun_66(self, p, count=1):
        """
        Send count packets through the 6o6 tunnel in each direction.

        Encrypted packets sent on tun_if must arrive in plain on pg1, and
        plain packets sent on pg1 must arrive on tun_if and decrypt to
        the original addresses. Afterwards the SA counters (when present
        on p) and the node counters are checked.

        :param p: parameter object for the tunnel under test
        :param count: number of packets to send in each direction
        """
        self.vapi.cli("clear errors")
        try:
            config_tun_params(p, self.encryption_type, self.tun_if)

            # decrypt direction: tun_if -> VPP -> pg1
            send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
                                               src=p.remote_tun_if_host,
                                               dst=self.pg1.remote_ip6,
                                               count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host)
                self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
                self.assert_packet_checksums_valid(recv_pkt)

            # encrypt direction: pg1 -> VPP -> tun_if
            send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
                                       dst=p.remote_tun_if_host,
                                       count=count)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # reset per packet so a failure never logs the packet
                # decrypted in a previous iteration
                decrypt_pkt = None
                try:
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
                    if not decrypt_pkt.haslayer(IPv6):
                        decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except:
                    # log what we have, then let the error through
                    self.logger.debug(ppp("Unexpected packet:", recv_pkt))
                    if decrypt_pkt is not None:
                        self.logger.debug(
                            ppp("Decrypted packet:", decrypt_pkt))
                    raise
        finally:
            # always dump the error counters and IPsec state to the log
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))

        if (hasattr(p, "tun_sa_in")):
            pkts = p.tun_sa_in.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA in counts: expected %d != %d" %
                             (count, pkts))
            pkts = p.tun_sa_out.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA out counts: expected %d != %d" %
                             (count, pkts))
        self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)
587
588
# The 6o6 tunnel tests proper: one packet, and a burst of 257 packets.
class IpsecTun6Tests(IpsecTun6):

    def test_tun_basic66(self):
        """ ipsec 6o6 tunnel basic test """
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_66(v6_params, count=1)

    def test_tun_burst66(self):
        """ ipsec 6o6 tunnel burst test """
        v6_params = self.params[socket.AF_INET6]
        self.verify_tun_66(v6_params, count=257)
598
599
# Convenience mixin combining the IPv4 and IPv6 tunnel-mode tests.
class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
    pass
602
603
# When executed as a script, run unittest with the VPP test runner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)