Tests Cleanup: Fix missing calls to setUpClass/tearDownClass.
[vpp.git] / test / template_ipsec.py
1 import unittest
2 import socket
3
4 from scapy.layers.inet import IP, ICMP, TCP, UDP
5 from scapy.layers.ipsec import SecurityAssociation
6 from scapy.layers.l2 import Ether, Raw
7 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest
8
9 from framework import VppTestCase, VppTestRunner
10 from util import ppp, reassemble4
11 from vpp_papi import VppEnum
12
13
class IPsecIPv4Params(object):
    """IPv4 flavour of the IPsec test parameters.

    The class attributes describe the address family; every instance carries
    the SA ids, SPIs, algorithms and keys for the tunnel-mode and
    transport-mode security associations.
    """

    # address-family description
    addr_type = socket.AF_INET
    addr_any = "0.0.0.0"
    addr_bcast = "255.255.255.255"
    addr_len = 32
    is_ipv6 = 0

    def __init__(self):
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t

        # remote tunnel-interface host addresses (IPv4 and IPv6)
        self.remote_tun_if_host = '1.1.1.1'
        self.remote_tun_if_host6 = '1111::1'

        # tunnel-mode SA ids and SPIs
        self.scapy_tun_sa_id, self.scapy_tun_spi = 10, 1001
        self.vpp_tun_sa_id, self.vpp_tun_spi = 20, 1000

        # transport-mode SA ids and SPIs
        self.scapy_tra_sa_id, self.scapy_tra_spi = 30, 2001
        self.vpp_tra_sa_id, self.vpp_tra_spi = 40, 2000

        # integrity algorithm: VPP API id, scapy name, key
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA1_96
        self.auth_algo = 'HMAC-SHA1-96'
        self.auth_key = 'C91KUR9GYMm5GfkEvNjX'

        # encryption algorithm: VPP API id, scapy name, key
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_128
        self.crypt_algo = 'AES-CBC'
        self.crypt_key = 'JPjyOWBeVEQiMe7h'

        # SA flags bitmask and optional NAT-T header
        self.flags = 0
        self.nat_header = None
47
48
class IPsecIPv6Params(object):
    """IPv6 flavour of the IPsec test parameters.

    The class attributes describe the address family; every instance carries
    the SA ids, SPIs, algorithms and keys for the tunnel-mode and
    transport-mode security associations.
    """

    # address-family description
    addr_type = socket.AF_INET6
    addr_any = "0::0"
    addr_bcast = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
    addr_len = 128
    is_ipv6 = 1

    def __init__(self):
        integ_algs = VppEnum.vl_api_ipsec_integ_alg_t
        crypto_algs = VppEnum.vl_api_ipsec_crypto_alg_t

        # remote tunnel-interface host addresses (IPv6 and IPv4)
        self.remote_tun_if_host = '1111:1111:1111:1111:1111:1111:1111:1111'
        self.remote_tun_if_host4 = '1.1.1.1'

        # tunnel-mode SA ids and SPIs
        self.scapy_tun_sa_id, self.scapy_tun_spi = 50, 3001
        self.vpp_tun_sa_id, self.vpp_tun_spi = 60, 3000

        # transport-mode SA ids and SPIs
        self.scapy_tra_sa_id, self.scapy_tra_spi = 70, 4001
        self.vpp_tra_sa_id, self.vpp_tra_spi = 80, 4000

        # integrity algorithm: VPP API id, scapy name, key
        self.auth_algo_vpp_id = integ_algs.IPSEC_API_INTEG_ALG_SHA1_96
        self.auth_algo = 'HMAC-SHA1-96'
        self.auth_key = 'C91KUR9GYMm5GfkEvNjX'

        # encryption algorithm: VPP API id, scapy name, key
        self.crypt_algo_vpp_id = crypto_algs.IPSEC_API_CRYPTO_ALG_AES_CBC_256
        self.crypt_algo = 'AES-CBC'
        self.crypt_key = 'JPjyOWBeVEQiMe7hJPjyOWBeVEQiMe7h'

        # SA flags bitmask and optional NAT-T header
        self.flags = 0
        self.nat_header = None
82
83
def config_tun_params(p, encryption_type, tun_if):
    """Attach the scapy tunnel-mode security associations to *p*.

    Sets ``p.scapy_tun_sa`` (keyed by ``p.vpp_tun_spi``, outer header going
    from the interface's remote address to its local address) and
    ``p.vpp_tun_sa`` (keyed by ``p.scapy_tun_spi``, outer header going from
    local to remote).

    :param p: parameter object (e.g. IPsecIPv4Params / IPsecIPv6Params)
    :param encryption_type: first positional argument of SecurityAssociation
    :param tun_if: interface providing ``remote_addr`` / ``local_addr``,
        both indexed by address family
    """
    family = p.addr_type
    esn_flag = VppEnum.vl_api_ipsec_sad_flags_t.IPSEC_API_SAD_FLAG_USE_ESN
    use_esn = bool(p.flags & esn_flag)

    # everything the two SAs have in common
    shared = dict(crypt_algo=p.crypt_algo, crypt_key=p.crypt_key,
                  auth_algo=p.auth_algo, auth_key=p.auth_key,
                  nat_t_header=p.nat_header,
                  use_esn=use_esn)

    # outer header class matching the address family of the parameters
    outer_ip = {socket.AF_INET: IP, socket.AF_INET6: IPv6}[family]
    remote = tun_if.remote_addr[family]
    local = tun_if.local_addr[family]

    p.scapy_tun_sa = SecurityAssociation(
        encryption_type, spi=p.vpp_tun_spi,
        tunnel_header=outer_ip(src=remote, dst=local),
        **shared)
    p.vpp_tun_sa = SecurityAssociation(
        encryption_type, spi=p.scapy_tun_spi,
        tunnel_header=outer_ip(dst=remote, src=local),
        **shared)
106
107
def config_tra_params(p, encryption_type):
    """Attach the scapy transport-mode security associations to *p*.

    Sets ``p.scapy_tra_sa`` (keyed by ``p.vpp_tra_spi``) and
    ``p.vpp_tra_sa`` (keyed by ``p.scapy_tra_spi``). No tunnel header is
    configured for either SA.

    :param p: parameter object (e.g. IPsecIPv4Params / IPsecIPv6Params)
    :param encryption_type: first positional argument of SecurityAssociation
    """
    # Coerce the masked flag to a real boolean, exactly as
    # config_tun_params() does, so both helpers hand the same type to
    # SecurityAssociation and to code reading ``sa.use_esn`` afterwards.
    use_esn = bool(p.flags & (VppEnum.vl_api_ipsec_sad_flags_t.
                              IPSEC_API_SAD_FLAG_USE_ESN))
    p.scapy_tra_sa = SecurityAssociation(
        encryption_type,
        spi=p.vpp_tra_spi,
        crypt_algo=p.crypt_algo,
        crypt_key=p.crypt_key,
        auth_algo=p.auth_algo,
        auth_key=p.auth_key,
        nat_t_header=p.nat_header,
        use_esn=use_esn)
    p.vpp_tra_sa = SecurityAssociation(
        encryption_type,
        spi=p.scapy_tra_spi,
        crypt_algo=p.crypt_algo,
        crypt_key=p.crypt_key,
        auth_algo=p.auth_algo,
        auth_key=p.auth_key,
        nat_t_header=p.nat_header,
        use_esn=use_esn)
129
130
class TemplateIpsec(VppTestCase):
    """
    TRANSPORT MODE:

     ------   encrypt   ---
    |tra_if| <-------> |VPP|
     ------   decrypt   ---

    TUNNEL MODE:

     ------   encrypt   ---   plain   ---
    |tun_if| <-------  |VPP| <------ |pg1|
     ------             ---           ---

     ------   decrypt   ---   plain   ---
    |tun_if| ------->  |VPP| ------> |pg1|
     ------             ---           ---
    """

    def ipsec_select_backend(self):
        """ empty method to be overloaded when necessary """
        pass

    @classmethod
    def setUpClass(cls):
        super(TemplateIpsec, cls).setUpClass()

    @classmethod
    def tearDownClass(cls):
        super(TemplateIpsec, cls).tearDownClass()

    def setup_params(self):
        """Create the per-address-family parameter objects.

        ``self.params`` maps an address family (socket.AF_INET /
        socket.AF_INET6) to the matching parameter object.
        """
        self.ipv4_params = IPsecIPv4Params()
        self.ipv6_params = IPsecIPv6Params()
        self.params = {self.ipv4_params.addr_type: self.ipv4_params,
                       self.ipv6_params.addr_type: self.ipv6_params}

    # NOTE: an earlier, super()-only definition of setUp() used to precede
    # this one; it was shadowed by this definition and has been removed.
    def setUp(self):
        """Build parameters, bring up three pg interfaces with IPv4 and
        IPv6 configured, then let the subclass select an IPsec backend."""
        super(TemplateIpsec, self).setUp()

        self.setup_params()

        self.tun_spd_id = 1
        self.tra_spd_id = 2

        self.vpp_esp_protocol = (VppEnum.vl_api_ipsec_proto_t.
                                 IPSEC_API_PROTO_ESP)
        self.vpp_ah_protocol = (VppEnum.vl_api_ipsec_proto_t.
                                IPSEC_API_PROTO_AH)

        self.create_pg_interfaces(range(3))
        self.interfaces = list(self.pg_interfaces)
        for i in self.interfaces:
            i.admin_up()
            i.config_ip4()
            i.resolve_arp()
            i.config_ip6()
            i.resolve_ndp()
        self.ipsec_select_backend()

    def tearDown(self):
        """Undo setUp(): take the interfaces down and remove their
        addresses, then dump hardware state if VPP is still alive."""
        super(TemplateIpsec, self).tearDown()

        for i in self.interfaces:
            i.admin_down()
            i.unconfig_ip4()
            i.unconfig_ip6()

        if not self.vpp_dead:
            self.vapi.cli("show hardware")

    def gen_encrypt_pkts(self, sa, sw_intf, src, dst, count=1,
                         payload_size=54):
        """Return *count* Ethernet frames carrying an IPv4 ICMP packet
        (src -> dst, *payload_size* bytes of 'X') encrypted with *sa*."""
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IP(src=src, dst=dst) /
                           ICMP() / Raw('X' * payload_size))
                for _ in range(count)]

    def gen_encrypt_pkts6(self, sa, sw_intf, src, dst, count=1,
                          payload_size=54):
        """Return *count* Ethernet frames carrying an IPv6 ICMPv6 echo
        request (src -> dst, *payload_size* bytes of 'X') encrypted with
        *sa*."""
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                sa.encrypt(IPv6(src=src, dst=dst) /
                           ICMPv6EchoRequest(id=0, seq=1,
                                             data='X' * payload_size))
                for _ in range(count)]

    def gen_pkts(self, sw_intf, src, dst, count=1, payload_size=54):
        """Return *count* plain-text IPv4 ICMP frames (src -> dst)."""
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IP(src=src, dst=dst) / ICMP() / Raw('X' * payload_size)
                for _ in range(count)]

    def gen_pkts6(self, sw_intf, src, dst, count=1, payload_size=54):
        """Return *count* plain-text IPv6 ICMPv6 echo request frames
        (src -> dst)."""
        return [Ether(src=sw_intf.remote_mac, dst=sw_intf.local_mac) /
                IPv6(src=src, dst=dst) /
                ICMPv6EchoRequest(id=0, seq=1, data='X' * payload_size)
                for _ in range(count)]
230
231
class IpsecTcpTests(object):
    """TCP checksum test, mixed into a test case that provides ``tun_if``,
    ``params`` and ``encryption_type``."""

    def test_tcp_checksum(self):
        """ verify checksum correctness for vpp generated packets """
        self.vapi.cli("test http server")
        p = self.params[socket.AF_INET]
        config_tun_params(p, self.encryption_type, self.tun_if)

        # encrypted TCP SYN to port 80 on VPP's tunnel interface address
        syn = (IP(src=p.remote_tun_if_host, dst=self.tun_if.local_ip4) /
               TCP(flags='S', dport=80))
        send = (Ether(src=self.tun_if.remote_mac,
                      dst=self.tun_if.local_mac) /
                p.scapy_tun_sa.encrypt(syn))
        self.logger.debug(ppp("Sending packet:", send))

        # only the first reply is decrypted and checked
        replies = self.send_and_expect(self.tun_if, [send], self.tun_if)
        reply = replies[0]
        decrypted = p.vpp_tun_sa.decrypt(reply[IP])
        self.assert_packet_checksums_valid(decrypted)
247
248
class IpsecTra4Tests(object):
    """IPv4 transport-mode tests.

    Meant to be mixed into a test case that provides ``tra_if``, ``params``,
    ``encryption_type`` and the ``tra4_encrypt_node_name`` /
    ``tra4_decrypt_node_name`` attributes read below.
    """

    def _tra4_seq_pkt(self, sa, seq_num):
        """Return an ICMP frame on tra_if (remote -> local) encrypted by
        *sa* with the explicit sequence number *seq_num*."""
        tra_if = self.tra_if
        return (Ether(src=tra_if.remote_mac,
                      dst=tra_if.local_mac) /
                sa.encrypt(IP(src=tra_if.remote_ip4,
                              dst=tra_if.local_ip4) /
                           ICMP(),
                           seq_num=seq_num))

    def test_tra_anti_replay(self, count=1):
        """ ipsec v4 transport anti-replay test """
        # NOTE: ``count`` is not used by this test; it is kept so the
        # signature stays unchanged.
        p = self.params[socket.AF_INET]
        use_esn = p.vpp_tra_sa.use_esn

        # fire in a packet with seq number 1
        pkt = self._tra4_seq_pkt(p.scapy_tra_sa, 1)
        self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # now move the window over to 235
        pkt = self._tra4_seq_pkt(p.scapy_tra_sa, 235)
        self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # replayed packets are dropped
        self.send_and_assert_no_replies(self.tra_if, pkt * 3)
        self.assert_packet_counter_equal(
            '/err/%s/SA replayed packet' % self.tra4_decrypt_node_name, 3)

        # the window size is 64 packets
        # in window are still accepted
        pkt = self._tra4_seq_pkt(p.scapy_tra_sa, 172)
        self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # a packet that does not decrypt does not move the window forward
        bogus_sa = SecurityAssociation(self.encryption_type,
                                       p.vpp_tra_spi)
        pkt = self._tra4_seq_pkt(bogus_sa, 350)
        self.send_and_assert_no_replies(self.tra_if, pkt * 17)

        self.assert_packet_counter_equal(
            '/err/%s/Integrity check failed' % self.tra4_decrypt_node_name, 17)

        # which we can determine since this packet is still in the window
        pkt = self._tra4_seq_pkt(p.scapy_tra_sa, 234)
        self.send_and_expect(self.tra_if, [pkt], self.tra_if)

        # out of window are dropped
        pkt = self._tra4_seq_pkt(p.scapy_tra_sa, 17)
        self.send_and_assert_no_replies(self.tra_if, pkt * 17)

        if use_esn:
            # an out of window error with ESN looks like a high sequence
            # wrap. but since it isn't then the verify will fail.
            self.assert_packet_counter_equal(
                '/err/%s/Integrity check failed' %
                self.tra4_decrypt_node_name, 34)

        else:
            self.assert_packet_counter_equal(
                '/err/%s/SA replayed packet' %
                self.tra4_decrypt_node_name, 20)

        # valid packet moves the window over to 236
        pkt = self._tra4_seq_pkt(p.scapy_tra_sa, 236)
        rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
        # the decrypt call itself is the check; its result is not needed
        p.vpp_tra_sa.decrypt(rx[0][IP])

        # move VPP's SA to just before the seq-number wrap
        self.vapi.cli("test ipsec sa %d seq 0xffffffff" % p.scapy_tra_sa_id)

        # then fire in a packet that VPP should drop because it causes the
        # seq number to wrap unless we're using extended.
        pkt = self._tra4_seq_pkt(p.scapy_tra_sa, 237)

        if use_esn:
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            # in order to decrypt the high order number needs to wrap
            p.vpp_tra_sa.seq_num = 0x100000000
            p.vpp_tra_sa.decrypt(rx[0][IP])

            # send packets with high bits set
            p.scapy_tra_sa.seq_num = 0x100000005
            pkt = self._tra4_seq_pkt(p.scapy_tra_sa, 0x100000005)
            rx = self.send_and_expect(self.tra_if, [pkt], self.tra_if)
            p.vpp_tra_sa.decrypt(rx[0][IP])
        else:
            self.send_and_assert_no_replies(self.tra_if, [pkt])
            self.assert_packet_counter_equal(
                '/err/%s/sequence number cycled' %
                self.tra4_encrypt_node_name, 1)

        # move the security-associations seq number on to the last we used
        # (0x15f == 351)
        self.vapi.cli("test ipsec sa %d seq 0x15f" % p.scapy_tra_sa_id)
        p.scapy_tra_sa.seq_num = 351
        p.vpp_tra_sa.seq_num = 351

    def test_tra_basic(self, count=1):
        """ ipsec v4 transport basic test """
        self.vapi.cli("clear errors")
        try:
            p = self.params[socket.AF_INET]
            send_pkts = self.gen_encrypt_pkts(p.scapy_tra_sa, self.tra_if,
                                              src=self.tra_if.remote_ip4,
                                              dst=self.tra_if.local_ip4,
                                              count=count)
            recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
                                             self.tra_if)
            for rx in recv_pkts:
                try:
                    decrypted = p.vpp_tra_sa.decrypt(rx[IP])
                    self.assert_packet_checksums_valid(decrypted)
                except Exception:
                    # log the offending packet, then let the failure surface
                    self.logger.debug(ppp("Unexpected packet:", rx))
                    raise
        finally:
            # always dump state, whether or not the exchange succeeded
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))

        # every packet must be accounted for on both SAs ...
        pkts = p.tra_sa_in.get_stats()['packets']
        self.assertEqual(pkts, count,
                         "incorrect SA in counts: expected %d != %d" %
                         (count, pkts))
        pkts = p.tra_sa_out.get_stats()['packets']
        self.assertEqual(pkts, count,
                         "incorrect SA out counts: expected %d != %d" %
                         (count, pkts))

        # ... and on both graph nodes
        self.assert_packet_counter_equal(self.tra4_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tra4_decrypt_node_name, count)

    def test_tra_burst(self):
        """ ipsec v4 transport burst test """
        self.test_tra_basic(count=257)
419
420
class IpsecTra6Tests(object):
    """IPv6 transport-mode tests.

    Meant to be mixed into a test case that provides ``tra_if``, ``params``
    and the ``tra6_encrypt_node_name`` / ``tra6_decrypt_node_name``
    attributes read below.
    """

    def test_tra_basic6(self, count=1):
        """ ipsec v6 transport basic test """
        self.vapi.cli("clear errors")
        try:
            p = self.params[socket.AF_INET6]
            send_pkts = self.gen_encrypt_pkts6(p.scapy_tra_sa, self.tra_if,
                                               src=self.tra_if.remote_ip6,
                                               dst=self.tra_if.local_ip6,
                                               count=count)
            recv_pkts = self.send_and_expect(self.tra_if, send_pkts,
                                             self.tra_if)
            for rx in recv_pkts:
                try:
                    decrypted = p.vpp_tra_sa.decrypt(rx[IPv6])
                    self.assert_packet_checksums_valid(decrypted)
                except Exception:
                    # log the offending packet, then let the failure surface
                    self.logger.debug(ppp("Unexpected packet:", rx))
                    raise
        finally:
            # always dump state, whether or not the exchange succeeded
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))

        # every packet must be accounted for on both SAs ...
        pkts = p.tra_sa_in.get_stats()['packets']
        self.assertEqual(pkts, count,
                         "incorrect SA in counts: expected %d != %d" %
                         (count, pkts))
        pkts = p.tra_sa_out.get_stats()['packets']
        self.assertEqual(pkts, count,
                         "incorrect SA out counts: expected %d != %d" %
                         (count, pkts))
        # ... and on both graph nodes
        self.assert_packet_counter_equal(self.tra6_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tra6_decrypt_node_name, count)

    def test_tra_burst6(self):
        """ ipsec v6 transport burst test """
        self.test_tra_basic6(count=257)
458
459
class IpsecTra46Tests(IpsecTra4Tests, IpsecTra6Tests):
    """Combines the IPv4 and IPv6 transport-mode tests; adds nothing."""
462
463
class IpsecTun4(object):
    """Verification helpers for tunnels whose outer header is IPv4.

    Meant to be mixed into a test case that provides ``tun_if``, ``pg1``,
    ``encryption_type`` and the ``tun4_encrypt_node_name`` /
    ``tun4_decrypt_node_name`` attributes read below.
    """

    def verify_counters(self, p, count):
        """Check SPD policy, SA and node packet counters against *count*.

        The policy and SA checks only run when *p* carries the
        ``spd_policy_in_any`` / ``tun_sa_in`` attribute respectively.
        """
        if hasattr(p, "spd_policy_in_any"):
            pkts = p.spd_policy_in_any.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SPD any policy: expected %d != %d" %
                             (count, pkts))

        if hasattr(p, "tun_sa_in"):
            pkts = p.tun_sa_in.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA in counts: expected %d != %d" %
                             (count, pkts))
            pkts = p.tun_sa_out.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA out counts: expected %d != %d" %
                             (count, pkts))

        self.assert_packet_counter_equal(self.tun4_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tun4_decrypt_node_name, count)

    def verify_decrypted(self, p, rxs):
        """Check addresses and checksums of packets VPP decrypted and
        forwarded to pg1."""
        for rx in rxs:
            self.assert_equal(rx[IP].src, p.remote_tun_if_host)
            self.assert_equal(rx[IP].dst, self.pg1.remote_ip4)
            self.assert_packet_checksums_valid(rx)

    def verify_encrypted(self, p, sa, rxs):
        """Decrypt packets VPP encrypted, check their addresses, then
        reassemble them and validate checksums.

        NOTE: ``sa`` is accepted but not read; decryption always uses
        ``p.vpp_tun_sa``.
        """
        decrypt_pkts = []
        for rx in rxs:
            # reset per packet so the error path never logs a packet left
            # over from an earlier iteration (or an unbound name)
            decrypt_pkt = None
            try:
                decrypt_pkt = p.vpp_tun_sa.decrypt(rx[IP])
                if not decrypt_pkt.haslayer(IP):
                    decrypt_pkt = IP(decrypt_pkt[Raw].load)
                decrypt_pkts.append(decrypt_pkt)
                self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
                self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
            except Exception:
                self.logger.debug(ppp("Unexpected packet:", rx))
                if decrypt_pkt is not None:
                    self.logger.debug(ppp("Decrypted packet:", decrypt_pkt))
                raise
        pkts = reassemble4(decrypt_pkts)
        for pkt in pkts:
            self.assert_packet_checksums_valid(pkt)

    def verify_tun_44(self, p, count=1, payload_size=64, n_rx=None):
        """Exercise an IPv4-in-IPv4 tunnel in both directions.

        :param p: parameter object for the tunnel
        :param count: number of packets sent in each direction
        :param payload_size: payload size of the plain packets sent from pg1
        :param n_rx: number of encrypted packets expected on tun_if;
            a falsy value means *count*
        """
        self.vapi.cli("clear errors")
        if not n_rx:
            n_rx = count
        try:
            config_tun_params(p, self.encryption_type, self.tun_if)

            # tun_if -> pg1: VPP decrypts
            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
                                              src=p.remote_tun_if_host,
                                              dst=self.pg1.remote_ip4,
                                              count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            self.verify_decrypted(p, recv_pkts)

            # pg1 -> tun_if: VPP encrypts
            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                      dst=p.remote_tun_if_host, count=count,
                                      payload_size=payload_size)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts,
                                             self.tun_if, n_rx)
            self.verify_encrypted(p, p.vpp_tun_sa, recv_pkts)

        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))

        self.verify_counters(p, count)

    def verify_tun_64(self, p, count=1):
        """Exercise an IPv6-in-IPv4 tunnel in both directions with *count*
        packets each way."""
        self.vapi.cli("clear errors")
        try:
            config_tun_params(p, self.encryption_type, self.tun_if)

            # tun_if -> pg1: VPP decrypts
            send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
                                               src=p.remote_tun_if_host6,
                                               dst=self.pg1.remote_ip6,
                                               count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host6)
                self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
                self.assert_packet_checksums_valid(recv_pkt)

            # pg1 -> tun_if: VPP encrypts; the outer header is IPv4
            send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
                                       dst=p.remote_tun_if_host6, count=count)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # reset per packet so the error path never logs a packet
                # left over from an earlier iteration (or an unbound name)
                decrypt_pkt = None
                try:
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IP])
                    if not decrypt_pkt.haslayer(IPv6):
                        decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host6)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except Exception:
                    self.logger.error(ppp("Unexpected packet:", recv_pkt))
                    if decrypt_pkt is not None:
                        self.logger.debug(
                            ppp("Decrypted packet:", decrypt_pkt))
                    raise
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))

        self.verify_counters(p, count)
576
577
class IpsecTun4Tests(IpsecTun4):
    """IPv4-in-IPv4 tunnel test cases built on IpsecTun4.verify_tun_44."""

    def test_tun_basic44(self):
        """ ipsec 4o4 tunnel basic test """
        p = self.params[socket.AF_INET]
        self.verify_tun_44(p, count=1)

    def test_tun_burst44(self):
        """ ipsec 4o4 tunnel burst test """
        p = self.params[socket.AF_INET]
        self.verify_tun_44(p, count=257)
587
588
class IpsecTun6(object):
    """Verification helpers for tunnels whose outer header is IPv6.

    Meant to be mixed into a test case that provides ``tun_if``, ``pg1``,
    ``encryption_type`` and the ``tun6_encrypt_node_name`` /
    ``tun6_decrypt_node_name`` attributes read below.
    """

    def verify_counters(self, p, count):
        """Check SA and node packet counters against *count*.

        The SA checks only run when *p* carries a ``tun_sa_in`` attribute.
        """
        if hasattr(p, "tun_sa_in"):
            pkts = p.tun_sa_in.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA in counts: expected %d != %d" %
                             (count, pkts))
            pkts = p.tun_sa_out.get_stats()['packets']
            self.assertEqual(pkts, count,
                             "incorrect SA out counts: expected %d != %d" %
                             (count, pkts))
        self.assert_packet_counter_equal(self.tun6_encrypt_node_name, count)
        self.assert_packet_counter_equal(self.tun6_decrypt_node_name, count)

    def verify_tun_66(self, p, count=1):
        """ ipsec 6o6 tunnel basic test """
        self.vapi.cli("clear errors")
        try:
            config_tun_params(p, self.encryption_type, self.tun_if)

            # tun_if -> pg1: VPP decrypts
            send_pkts = self.gen_encrypt_pkts6(p.scapy_tun_sa, self.tun_if,
                                               src=p.remote_tun_if_host,
                                               dst=self.pg1.remote_ip6,
                                               count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IPv6].src, p.remote_tun_if_host)
                self.assert_equal(recv_pkt[IPv6].dst, self.pg1.remote_ip6)
                self.assert_packet_checksums_valid(recv_pkt)

            # pg1 -> tun_if: VPP encrypts
            send_pkts = self.gen_pkts6(self.pg1, src=self.pg1.remote_ip6,
                                       dst=p.remote_tun_if_host,
                                       count=count)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # reset per packet so the error path never logs a packet
                # left over from an earlier iteration (or an unbound name)
                decrypt_pkt = None
                try:
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
                    if not decrypt_pkt.haslayer(IPv6):
                        decrypt_pkt = IPv6(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip6)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except Exception:
                    self.logger.debug(ppp("Unexpected packet:", recv_pkt))
                    if decrypt_pkt is not None:
                        self.logger.debug(
                            ppp("Decrypted packet:", decrypt_pkt))
                    raise
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))
        self.verify_counters(p, count)

    def verify_tun_46(self, p, count=1):
        """ ipsec 4o6 tunnel basic test """
        self.vapi.cli("clear errors")
        try:
            config_tun_params(p, self.encryption_type, self.tun_if)

            # tun_if -> pg1: VPP decrypts
            send_pkts = self.gen_encrypt_pkts(p.scapy_tun_sa, self.tun_if,
                                              src=p.remote_tun_if_host4,
                                              dst=self.pg1.remote_ip4,
                                              count=count)
            recv_pkts = self.send_and_expect(self.tun_if, send_pkts, self.pg1)
            for recv_pkt in recv_pkts:
                self.assert_equal(recv_pkt[IP].src, p.remote_tun_if_host4)
                self.assert_equal(recv_pkt[IP].dst, self.pg1.remote_ip4)
                self.assert_packet_checksums_valid(recv_pkt)

            # pg1 -> tun_if: VPP encrypts; the outer header is IPv6
            send_pkts = self.gen_pkts(self.pg1, src=self.pg1.remote_ip4,
                                      dst=p.remote_tun_if_host4,
                                      count=count)
            recv_pkts = self.send_and_expect(self.pg1, send_pkts, self.tun_if)
            for recv_pkt in recv_pkts:
                # reset per packet so the error path never logs a packet
                # left over from an earlier iteration (or an unbound name)
                decrypt_pkt = None
                try:
                    decrypt_pkt = p.vpp_tun_sa.decrypt(recv_pkt[IPv6])
                    if not decrypt_pkt.haslayer(IP):
                        decrypt_pkt = IP(decrypt_pkt[Raw].load)
                    self.assert_equal(decrypt_pkt.src, self.pg1.remote_ip4)
                    self.assert_equal(decrypt_pkt.dst, p.remote_tun_if_host4)
                    self.assert_packet_checksums_valid(decrypt_pkt)
                except Exception:
                    self.logger.debug(ppp("Unexpected packet:", recv_pkt))
                    if decrypt_pkt is not None:
                        self.logger.debug(ppp("Decrypted packet:",
                                              decrypt_pkt))
                    raise
        finally:
            self.logger.info(self.vapi.ppcli("show error"))
            self.logger.info(self.vapi.ppcli("show ipsec"))
        self.verify_counters(p, count)
681
682
class IpsecTun6Tests(IpsecTun6):
    """IPv6-in-IPv6 tunnel test cases built on IpsecTun6.verify_tun_66."""

    def test_tun_basic66(self):
        """ ipsec 6o6 tunnel basic test """
        p = self.params[socket.AF_INET6]
        self.verify_tun_66(p, count=1)

    def test_tun_burst66(self):
        """ ipsec 6o6 tunnel burst test """
        p = self.params[socket.AF_INET6]
        self.verify_tun_66(p, count=257)
692
693
class IpsecTun46Tests(IpsecTun4Tests, IpsecTun6Tests):
    """Combines the 4o4 and 6o6 tunnel tests; adds nothing."""
696
697
# When executed as a script, hand control to unittest using VppTestRunner.
if __name__ == '__main__':
    unittest.main(testRunner=VppTestRunner)