IPSEC: tunnel fragmentation
[vpp.git] / test / template_ipsec.py
1 import unittest
2 import socket
3
4 from scapy.layers.inet import IP, ICMP, TCP, UDP
5 from scapy.layers.ipsec import SecurityAssociation
6 from scapy.layers.l2 import Ether, Raw
7 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
8
9 from framework import VppTestCase, VppTestRunner
10 from util import ppp, reassemble4
11 from vpp_papi import VppEnum
12
13
class IPsecIPv4Params(object):
    """Parameter set used by the IPv4 flavour of the IPsec tests.

    The class attributes describe the address family; the instance
    attributes carry SA ids, SPIs, algorithm selections and keys.
    """

    # address family constants
    addr_type = socket.AF_INET
    is_ipv6 = 0
    addr_len = 32
    addr_any = "0.0.0.0"
    addr_bcast = "255.255.255.255"

    def __init__(self):
        integ = VppEnum.vl_api_ipsec_integ_alg_t
        crypto = VppEnum.vl_api_ipsec_crypto_alg_t

        # inner host addresses used by the tunnel tests (v4 and v6 payload)
        self.remote_tun_if_host = '1.1.1.1'
        self.remote_tun_if_host6 = '1111::1'

        # tunnel mode: SA id / SPI pairs
        self.scapy_tun_sa_id, self.scapy_tun_spi = 10, 1001
        self.vpp_tun_sa_id, self.vpp_tun_spi = 20, 1000

        # transport mode: SA id / SPI pairs
        self.scapy_tra_sa_id, self.scapy_tra_spi = 30, 2001
        self.vpp_tra_sa_id, self.vpp_tra_spi = 40, 2000

        # integrity algorithm: VPP API enum value and the scapy name
        self.auth_algo_vpp_id = integ.IPSEC_API_INTEG_ALG_SHA1_96
        self.auth_algo = 'HMAC-SHA1-96'
        self.auth_key = 'C91KUR9GYMm5GfkEvNjX'

        # encryption algorithm: VPP API enum value and the scapy name
        self.crypt_algo_vpp_id = crypto.IPSEC_API_CRYPTO_ALG_AES_CBC_128
        self.crypt_algo = 'AES-CBC'
        self.crypt_key = 'JPjyOWBeVEQiMe7h'

        # no SA flags and no NAT-T header by default
        self.flags = 0
        self.nat_header = None
47
48
class IPsecIPv6Params(object):
    """Parameter set used by the IPv6 flavour of the IPsec tests.

    The class attributes describe the address family; the instance
    attributes carry SA ids, SPIs, algorithm selections and keys.
    """

    # address family constants
    addr_type = socket.AF_INET6
    is_ipv6 = 1
    addr_len = 128
    addr_any = "0::0"
    addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"

    def __init__(self):
        integ = VppEnum.vl_api_ipsec_integ_alg_t
        crypto = VppEnum.vl_api_ipsec_crypto_alg_t

        # inner host addresses used by the tunnel tests (v6 and v4 payload)
        self.remote_tun_if_host = '1111:1111:1111:1111:1111:1111:1111:1111'
        self.remote_tun_if_host4 = '1.1.1.1'

        # tunnel mode: SA id / SPI pairs
        self.scapy_tun_sa_id, self.scapy_tun_spi = 50, 3001
        self.vpp_tun_sa_id, self.vpp_tun_spi = 60, 3000

        # transport mode: SA id / SPI pairs
        self.scapy_tra_sa_id, self.scapy_tra_spi = 70, 4001
        self.vpp_tra_sa_id, self.vpp_tra_spi = 80, 4000

        # integrity algorithm: VPP API enum value and the scapy name
        self.auth_algo_vpp_id = integ.IPSEC_API_INTEG_ALG_SHA_256_128
        self.auth_algo = 'SHA2-256-128'
        self.auth_key = 'C91KUR9GYMm5GfkEvNjX'

        # encryption algorithm: VPP API enum value and the scapy name
        self.crypt_algo_vpp_id = crypto.IPSEC_API_CRYPTO_ALG_AES_CBC_256
        self.crypt_algo = 'AES-CBC'
        self.crypt_key = 'JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h'

        # no SA flags and no NAT-T header by default
        self.flags = 0
        self.nat_header = None
82
83
def config_tun_params(p, encryption_type, tun_if):
    """Create the two scapy tunnel-mode SAs for `tun_if` and store them on `p`.

    `p.scapy_tun_sa` uses `p.vpp_tun_spi` and an outer header going from the
    interface's remote address to its local address; `p.vpp_tun_sa` uses
    `p.scapy_tun_spi` and the outer header in the opposite direction.

    :param p: parameter object providing addr_type, flags, SPIs, algorithm
        names, keys and nat_header
    :param encryption_type: forwarded as the first positional argument of
        scapy's SecurityAssociation
    :param tun_if: object whose remote_addr / local_addr are indexed by
        address family
    """
    esn_flag = (VppEnum.vl_api_ipsec_sad_flags_t.
                IPSEC_API_SAD_FLAG_USE_ESN)
    esn_enabled = bool(p.flags & esn_flag)

    # outer header class follows the address family of the parameter set
    outer_hdr = {socket.AF_INET: IP, socket.AF_INET6: IPv6}[p.addr_type]
    remote = tun_if.remote_addr[p.addr_type]
    local = tun_if.local_addr[p.addr_type]

    # everything both directions have in common
    shared = dict(crypt_algo=p.crypt_algo, crypt_key=p.crypt_key,
                  auth_algo=p.auth_algo, auth_key=p.auth_key,
                  nat_t_header=p.nat_header,
                  use_esn=esn_enabled)

    p.scapy_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.vpp_tun_spi,
        tunnel_header=outer_hdr(src=remote, dst=local),
        **shared)
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type,
        spi=p.scapy_tun_spi,
        tunnel_header=outer_hdr(dst=remote, src=local),
        **shared)
106
107
def config_tra_params(p, encryption_type):
    """Create the two scapy transport-mode SAs and store them on `p`.

    `p.scapy_tra_sa` is built with `p.vpp_tra_spi` and `p.vpp_tra_sa` with
    `p.scapy_tra_spi`; all other settings are identical for both.

    :param p: parameter object providing flags, SPIs, algorithm names,
        keys and nat_header
    :param encryption_type: forwarded as the first positional argument of
        scapy's SecurityAssociation
    """
    esn_flag = (VppEnum.vl_api_ipsec_sad_flags_t.
                IPSEC_API_SAD_FLAG_USE_ESN)
    # normalise the masked flag to a real boolean, exactly as
    # config_tun_params() does, so both helpers hand scapy the same type
    use_esn = bool(p.flags & esn_flag)

    # settings shared by both directions
    shared = dict(crypt_algo=p.crypt_algo,
                  crypt_key=p.crypt_key,
                  auth_algo=p.auth_algo,
                  auth_key=p.auth_key,
                  nat_t_header=p.nat_header,
                  use_esn=use_esn)

    p.scapy_tra_sa = SecurityAssociation(
        encryption_type, spi=p.vpp_tra_spi, **shared)
    p.vpp_tra_sa = SecurityAssociation(
        encryption_type, spi=p.scapy_tra_spi, **shared)
129
130
class TemplateIpsec(VppTestCase):
    """
    Common base of the IPsec test cases: creates three pg interfaces,
    configures IPv4/IPv6 on them and offers packet generators.

    TRANSPORT MODE:

     ------   encrypt   ---
    |tra_if| <-------> |VPP|
     ------   decrypt   ---

    TUNNEL MODE:

     ------   encrypt   ---   plain   ---
    |tun_if| <-------  |VPP| <------ |pg1|
     ------             ---           ---

     ------   decrypt   ---   plain   ---
    |tun_if| ------->  |VPP| ------> |pg1|
     ------             ---           ---
    """

    def ipsec_select_backend(self):
        """ hook invoked at the end of setUp(); a no-op unless overridden """

    def setup_params(self):
        """ instantiate the parameter sets and index them by address family """
        self.ipv4_params = IPsecIPv4Params()
        self.ipv6_params = IPsecIPv6Params()
        self.params = dict(
            (prm.addr_type, prm)
            for prm in (self.ipv4_params, self.ipv6_params))

    def setUp(self):
        """ prepare parameters, protocol ids and the pg interfaces """
        super(TemplateIpsec, self).setUp()

        self.setup_params()

        self.tun_spd_id, self.tra_spd_id = 1, 2

        ipsec_proto = VppEnum.vl_api_ipsec_proto_t
        self.vpp_esp_protocol = ipsec_proto.IPSEC_API_PROTO_ESP
        self.vpp_ah_protocol = ipsec_proto.IPSEC_API_PROTO_AH

        self.create_pg_interfaces(range(3))
        self.interfaces = list(self.pg_interfaces)
        for pg_if in self.interfaces:
            pg_if.admin_up()
            pg_if.config_ip4()
            pg_if.resolve_arp()
            pg_if.config_ip6()
            pg_if.resolve_ndp()
        self.ipsec_select_backend()

    def tearDown(self):
        """ bring the interfaces down and remove their addresses """
        super(TemplateIpsec, self).tearDown()

        for pg_if in self.interfaces:
            pg_if.admin_down()
            pg_if.unconfig_ip4()
            pg_if.unconfig_ip6()

        if self.vpp_dead:
            return
        self.vapi.cli("show hardware")

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=54):
        """ return `count` IPv4 ICMP packets, each encrypted with `sa` """
        pkts = []
        for _ in range(count):
            plain = IP(src=src, dst=dst) / ICMP() / Raw('X' * payload_size)
            pkts.append(
                Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(plain))
        return pkts

    def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
                          payload_size=54):
        """ return `count` ICMPv6 echo requests, each encrypted with `sa` """
        pkts = []
        for _ in range(count):
            plain = (IPv6(src=src, dst=dst) /
                     ICMPv6EchoRequest(id=0, seq=1,
                                       data='X' * payload_size))
            pkts.append(
                Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(plain))
        return pkts

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
        """ return `count` plain IPv4 ICMP packets """
        pkts = []
        for _ in range(count):
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            pkts.append(l2 / IP(src=src, dst=dst) / ICMP() /
                        Raw('X' * payload_size))
        return pkts

    def gen_pkts6(self, sw_intf, src, dst, count=1, payload_size=54):
        """ return `count` plain ICMPv6 echo requests """
        pkts = []
        for _ in range(count):
            l2 = Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac)
            pkts.append(l2 / IPv6(src=src, dst=dst) /
                        ICMPv6EchoRequest(id=0, seq=1,
                                          data='X' * payload_size))
        return pkts
219
220
class IpsecTcpTests(object):
    """TCP checksum test, meant to be mixed into a concrete IPsec test class.

    Relies on the host class for params, tun_if, encryption_type, vapi,
    logger and the send/assert helpers.
    """

    def test_tcp_checksum(self):
        """ verify checksum correctness for vpp generated packets """
        self.vapi.cli("test http server")
        p = self.params[socket.AF_INET]
        config_tun_params(p, self.encryption_type, self.tun_if)

        # a TCP SYN to port 80 on VPP's tunnel address, sent encrypted
        syn = (IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
               TCP(flags='S', dport=80))
        send = (Ether(src=self.tun_if.remote_mac,
                      dst=self.tun_if.local_mac) /
                p.scapy_tun_sa.encrypt(syn))
        self.logger.debug(ppp("Sending packet:", send))

        # the first reply is decrypted and its checksums are validated
        reply = self.send_and_expect(self.tun_if, [send], self.tun_if)[0]
        self.assert_packet_checksums_valid(p.vpp_tun_sa.decrypt(reply[IP]))
236
237
class IpsecTra4Tests(object):
    """IPv4 transport-mode tests, meant to be mixed into a concrete test class.

    The methods read self.params, self.tra_if, self.encryption_type and the
    tra4_encrypt_node_name / tra4_decrypt_node_name attributes, none of
    which are defined here.
    """

    def test_tra_anti_replay(self, count=1):
        """ ipsec v4 transport anti-replay test """
        # NOTE: 'count' is not used; it is kept so the signature is unchanged
        p = self.params[socket.AF_INET]
        use_esn = p.vpp_tra_sa.use_esn

        def encrypted(seq_num, sa=None):
            """ICMP packet on tra_if, encrypted by `sa` (default: the scapy
            transport SA) with an explicit sequence number."""
            if sa is None:
                sa = p.scapy_tra_sa
            return (Ether(src=self.tra_if.remote_mac,
                          dst=self.tra_if.local_mac) /
                    sa.encrypt(IP(src=self.tra_if.remote_ip4,
                                  dst=self.tra_if.local_ip4) /
                               ICMP(),
                               seq_num=seq_num))

        decrypt_err = '/err/%s/' % self.tra4_decrypt_node_name
        replayed = decrypt_err + 'SA replayed packet'
        integ_failed = decrypt_err + 'Integrity check failed'

        # fire in a packet with seq number 1
        self.send_and_expect(self.tra_if, [encrypted(1)], self.tra_if)

        # now move the window over to 235
        pkt = encrypted(235)
        self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # replayed packets are dropped
        self.send_and_assert_no_replies(self.tra_if, pkt * 3)
        self.assert_packet_counter_equal(replayed, 3)

        # the window size is 64 packets
        # in window are still accepted
        self.send_and_expect(self.tra_if, [encrypted(172)], self.tra_if)

        # a packet that does not decrypt does not move the window forward
        bogus_sa = SecurityAssociation(self.encryption_type,
                                       p.vpp_tra_spi)
        pkt = encrypted(350, sa=bogus_sa)
        self.send_and_assert_no_replies(self.tra_if, pkt * 17)
        self.assert_packet_counter_equal(integ_failed, 17)

        # which we can determine since this packet is still in the window
        self.send_and_expect(self.tra_if, [encrypted(234)], self.tra_if)

        # out of window are dropped
        pkt = encrypted(17)
        self.send_and_assert_no_replies(self.tra_if, pkt * 17)

        if use_esn:
            # an out of window error with ESN looks like a high sequence
            # wrap. but since it isn't then the verify will fail.
            self.assert_packet_counter_equal(integ_failed, 34)
        else:
            self.assert_packet_counter_equal(replayed, 20)

        # valid packet moves the window over to 236
        rx = self.send_and_expect(self.tra_if, [encrypted(236)],
                                  self.tra_if)
        # only the fact that decryption succeeds matters here
        p.vpp_tra_sa.decrypt(rx[0][IP])

        # move VPP's SA to just before the seq-number wrap
        self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.scapy_tra_sa_id)

        # then fire in a packet that VPP should drop because it causes the
        # seq number to wrap  unless we're using extended.
        pkt = encrypted(237)

        if use_esn:
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            # in order to decrypt the high order number needs to wrap
            p.vpp_tra_sa.seq_num = 0x100000000
            p.vpp_tra_sa.decrypt(rx[0][IP])

            # send packets with high bits set
            p.scapy_tra_sa.seq_num = 0x100000005
            pkt = encrypted(0x100000005)
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            # in order to decrypt the high order number needs to wrap
            p.vpp_tra_sa.decrypt(rx[0][IP])
        else:
            self.send_and_assert_no_replies(self.tra_if, [pkt])
            self.assert_packet_counter_equal(
                '/err/%s/sequence number cycled' %
                self.tra4_encrypt_node_name, 1)

        # move the security-associations seq number on to the last we used
        self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id)
        p.scapy_tra_sa.seq_num = 351
        p.vpp_tra_sa.seq_num = 351

    def test_tra_basic(self, count=1):
        """ ipsec v4 transport basic test """
        self.vapi.cli("clear errors")
        try:
            p = self.params[socket.AF_INET]
            send_pkts = self.gen_encrypt_pkts(p.scapy_tra_sa, self.tra_if,
                                              src=self.tra_if.remote_ip4,
                                              dst=self.tra_if.local_ip4,
                                              count=count)
            recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
                                             self.tra_if)
            for rx in recv_pkts:
                try:
                    decrypted = p.vpp_tra_sa.decrypt(rx[IP])
                    self.assert_packet_checksums_valid(decrypted)
                except:
                    # log the offending packet, then let the error propagate
                    self.logger.debug(ppp("Unexpected packet:", rx))
                    raise
        finally:
            # always dump the error and ipsec state, pass or fail
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))

        # per-SA packet counters must match the number of packets sent
        pkts = p.tra_sa_in.get_stats()['packets']
        self.assertEqual(pkts, count,
                         "incorrect SA in counts: expected %d != %d" %
                         (count, pkts))
        pkts = p.tra_sa_out.get_stats()['packets']
        self.assertEqual(pkts, count,
                         "incorrect SA out counts: expected %d != %d" %
                         (count, pkts))

        self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tra4_decrypt_node_name, count)

    def test_tra_burst(self):
        """ ipsec v4 transport burst test """
        self.test_tra_basic(count=257)
408
409
class IpsecTra6Tests(object):
    """IPv6 transport-mode tests, meant to be mixed into a concrete test class.

    The methods read self.params, self.tra_if and the
    tra6_encrypt_node_name / tra6_decrypt_node_name attributes, none of
    which are defined here.
    """

    def test_tra_basic6(self, count=1):
        """ ipsec v6 transport basic test """
        self.vapi.cli("clear errors")
        try:
            params = self.params[socket.AF_INET6]
            tx = self.gen_encrypt_pkts6(params.scapy_tra_sa, self.tra_if,
                                        src=self.tra_if.remote_ip6,
                                        dst=self.tra_if.local_ip6,
                                        count=count)
            replies = self.send_and_expect(self.tra_if, tx, self.tra_if)
            for reply in replies:
                try:
                    plain = params.vpp_tra_sa.decrypt(reply[IPv6])
                    self.assert_packet_checksums_valid(plain)
                except:
                    # log the offending packet, then let the error propagate
                    self.logger.debug(ppp("Unexpected packet:", reply))
                    raise
        finally:
            # always dump the error and ipsec state, pass or fail
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))

        # per-SA packet counters must match the number of packets sent
        n_in = params.tra_sa_in.get_stats()['packets']
        self.assertEqual(n_in, count,
                         "incorrect SA in counts: expected %d != %d" %
                         (count, n_in))
        n_out = params.tra_sa_out.get_stats()['packets']
        self.assertEqual(n_out, count,
                         "incorrect SA out counts: expected %d != %d" %
                         (count, n_out))

        for node in (self.tra6_encrypt_node_name,
                     self.tra6_decrypt_node_name):
            self.assert_packet_counter_equal(node, count)

    def test_tra_burst6(self):
        """ ipsec v6 transport burst test """
        self.test_tra_basic6(count=257)
447
448
class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
    """Combines the IPv4 and IPv6 transport-mode tests; adds nothing."""
451
452
class IpsecTun4(object):
    """Verification helpers for tunnels whose outer header is IPv4.

    Meant to be mixed into a concrete test class that provides tun_if, pg1,
    encryption_type, vapi, logger, the packet generators and the
    tun4_encrypt_node_name / tun4_decrypt_node_name attributes.
    """

    def verify_counters4(self, p, count):
        """Check the SPD, SA and tun4 node packet counters against `count`.

        The SPD and SA checks are skipped when `p` does not carry the
        corresponding objects. The family-specific name cannot be shadowed
        by IpsecTun6.verify_counters when both mixins are combined in one
        class.
        """
        if hasattr(p, "spd_policy_in_any"):
            pkts = p.spd_policy_in_any.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SPD any policy: expected %d != %d" %
                             (count, pkts))

        if hasattr(p, "tun_sa_in"):
            pkts = p.tun_sa_in.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA in counts: expected %d != %d" %
                             (count, pkts))
            pkts = p.tun_sa_out.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA out counts: expected %d != %d" %
                             (count, pkts))

        self.assert_packet_counter_equal(self.tun4_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)

    def verify_counters(self, p, count):
        """Backward-compatible name for verify_counters4()."""
        self.verify_counters4(p, count)

    def _log_decrypted_pkt(self, decrypt_pkt):
        """Best-effort debug dump of a decrypted packet; None is skipped."""
        if decrypt_pkt is None:
            return
        try:
            self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
        except Exception as e:
            # diagnostics only: never let a dump failure hide the real error
            self.logger.debug("Unable to dump decrypted packet: %s" % e)

    def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None):
        """Send IPv4 traffic both ways through the IPv4 tunnel and verify it.

        :param p: parameter object for the tunnel under test
        :param count: number of packets sent in each direction
        :param payload_size: payload size of the plain packets sent from pg1
        :param n_rx: forwarded to send_and_expect() for the pg1 -> tunnel
            direction; `count` is used when it is not given (or zero)
        """
        self.vapi.cli("clear errors")
        if not n_rx:
            n_rx = count
        try:
            config_tun_params(p, self.encryption_type, self.tun_if)

            # encrypted in on the tunnel, plain out on pg1
            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
                                              src=p.remote_tun_if_host,
                                              dst=self.pg1.remote_ip4,
                                              count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host)
                self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
                self.assert_packet_checksums_valid(recv_pkt)

            # plain in on pg1, encrypted out on the tunnel
            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                      dst=p.remote_tun_if_host, count=count,
                                      payload_size=payload_size)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts,
                                             self.tun_if, n_rx)
            decrypt_pkts = []
            for recv_pkt in recv_pkts:
                # reset per packet so a failure never logs a stale packet
                decrypt_pkt = None
                try:
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
                    if not decrypt_pkt.haslayer(IP):
                        decrypt_pkt = IP(decrypt_pkt[Raw].load)
                    decrypt_pkts.append(decrypt_pkt)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
                except:
                    self.logger.debug(ppp("Unexpected packet:", recv_pkt))
                    self._log_decrypted_pkt(decrypt_pkt)
                    raise
            # checksums are validated on the output of reassemble4()
            pkts = reassemble4(decrypt_pkts)
            for pkt in pkts:
                self.assert_packet_checksums_valid(pkt)
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))

        self.verify_counters4(p, count)

    def verify_tun_64(self, p, count=1):
        """Send IPv6 traffic both ways through the IPv4 tunnel and verify it.

        :param p: parameter object for the tunnel under test
        :param count: number of packets sent in each direction
        """
        self.vapi.cli("clear errors")
        try:
            config_tun_params(p, self.encryption_type, self.tun_if)

            # encrypted in on the tunnel, plain out on pg1
            send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
                                               src=p.remote_tun_if_host6,
                                               dst=self.pg1.remote_ip6,
                                               count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host6)
                self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
                self.assert_packet_checksums_valid(recv_pkt)

            # plain in on pg1, encrypted out on the tunnel (outer is IPv4)
            send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
                                       dst=p.remote_tun_if_host6, count=count)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # reset per packet so a failure never logs a stale packet
                decrypt_pkt = None
                try:
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
                    if not decrypt_pkt.haslayer(IPv6):
                        decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host6)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except:
                    self.logger.error(ppp("Unexpected packet:", recv_pkt))
                    self._log_decrypted_pkt(decrypt_pkt)
                    raise
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))

        self.verify_counters4(p, count)
558
559
class IpsecTun4Tests(IpsecTun4):
    """IPv4-over-IPv4 tunnel tests built on the IpsecTun4 helpers."""

    def test_tun_basic44(self):
        """ ipsec 4o4 tunnel basic test """
        p4 = self.params[socket.AF_INET]
        self.verify_tun_44(p4, count=1)

    def test_tun_burst44(self):
        """ ipsec 4o4 tunnel burst test """
        p4 = self.params[socket.AF_INET]
        self.verify_tun_44(p4, count=257)
569
570
class IpsecTun6(object):
    """Verification helpers for tunnels whose outer header is IPv6.

    Meant to be mixed into a concrete test class that provides tun_if, pg1,
    encryption_type, vapi, logger, the packet generators and the
    tun6_encrypt_node_name / tun6_decrypt_node_name attributes.
    """

    def verify_counters6(self, p, count):
        """Check the SA and tun6 node packet counters against `count`.

        The SA checks are skipped when `p` carries no tun_sa_in. The
        family-specific name cannot be shadowed by
        IpsecTun4.verify_counters when both mixins are combined in one
        class.
        """
        if hasattr(p, "tun_sa_in"):
            pkts = p.tun_sa_in.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA in counts: expected %d != %d" %
                             (count, pkts))
            pkts = p.tun_sa_out.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA out counts: expected %d != %d" %
                             (count, pkts))
        self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)

    def verify_counters(self, p, count):
        """Backward-compatible name for verify_counters6()."""
        self.verify_counters6(p, count)

    def _log_decrypted_pkt(self, decrypt_pkt):
        """Best-effort debug dump of a decrypted packet; None is skipped."""
        if decrypt_pkt is None:
            return
        try:
            self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
        except Exception as e:
            # diagnostics only: never let a dump failure hide the real error
            self.logger.debug("Unable to dump decrypted packet: %s" % e)

    def verify_tun_66(self, p, count=1):
        """Send IPv6 traffic both ways through the IPv6 tunnel and verify it.

        :param p: parameter object for the tunnel under test
        :param count: number of packets sent in each direction
        """
        self.vapi.cli("clear errors")
        try:
            config_tun_params(p, self.encryption_type, self.tun_if)

            # encrypted in on the tunnel, plain out on pg1
            send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
                                               src=p.remote_tun_if_host,
                                               dst=self.pg1.remote_ip6,
                                               count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host)
                self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
                self.assert_packet_checksums_valid(recv_pkt)

            # plain in on pg1, encrypted out on the tunnel
            send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
                                       dst=p.remote_tun_if_host,
                                       count=count)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # reset per packet so a failure never logs a stale packet
                decrypt_pkt = None
                try:
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
                    if not decrypt_pkt.haslayer(IPv6):
                        decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except:
                    self.logger.debug(ppp("Unexpected packet:", recv_pkt))
                    self._log_decrypted_pkt(decrypt_pkt)
                    raise
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))
        self.verify_counters6(p, count)

    def verify_tun_46(self, p, count=1):
        """Send IPv4 traffic both ways through the IPv6 tunnel and verify it.

        :param p: parameter object for the tunnel under test
        :param count: number of packets sent in each direction
        """
        self.vapi.cli("clear errors")
        try:
            config_tun_params(p, self.encryption_type, self.tun_if)

            # encrypted in on the tunnel, plain out on pg1
            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
                                              src=p.remote_tun_if_host4,
                                              dst=self.pg1.remote_ip4,
                                              count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host4)
                self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
                self.assert_packet_checksums_valid(recv_pkt)

            # plain in on pg1, encrypted out on the tunnel (outer is IPv6)
            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                      dst=p.remote_tun_if_host4,
                                      count=count)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # reset per packet so a failure never logs a stale packet
                decrypt_pkt = None
                try:
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
                    if not decrypt_pkt.haslayer(IP):
                        decrypt_pkt = IP(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host4)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except:
                    self.logger.debug(ppp("Unexpected packet:", recv_pkt))
                    self._log_decrypted_pkt(decrypt_pkt)
                    raise
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))
        self.verify_counters6(p, count)
663
664
class IpsecTun6Tests(IpsecTun6):
    """IPv6-over-IPv6 tunnel tests built on the IpsecTun6 helpers."""

    def test_tun_basic66(self):
        """ ipsec 6o6 tunnel basic test """
        p6 = self.params[socket.AF_INET6]
        self.verify_tun_66(p6, count=1)

    def test_tun_burst66(self):
        """ ipsec 6o6 tunnel burst test """
        p6 = self.params[socket.AF_INET6]
        self.verify_tun_66(p6, count=257)
674
675
class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
    """Combines the 4o4 and 6o6 tunnel tests; adds nothing of its own."""
678
679
# run the tests with the VPP runner when this module is executed directly
if __name__ == "__main__":
    unittest.main(testRunner=VppTestRunner)