Flowprobe - tests speed-up
[vpp.git] / test / test_snat.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5 import struct
6
7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
11 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6
12 from scapy.layers.l2 import Ether, ARP
13 from scapy.data import IP_PROTOS
14 from scapy.packet import bind_layers
15 from util import ppp
16 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
17 from time import sleep
18
19
class MethodHolder(VppTestCase):
    """ SNAT create capture and verify method holder """

    @classmethod
    def setUpClass(cls):
        super(MethodHolder, cls).setUpClass()

    def tearDown(self):
        super(MethodHolder, self).tearDown()

    def check_tcp_checksum(self, pkt):
        """
        Check TCP checksum in IP packet

        The packet is re-parsed from its serialized form with the TCP
        checksum field deleted; the checksum of the result must equal the
        one carried by the original packet.

        :param pkt: Packet to check TCP checksum
        """
        new = pkt.__class__(str(pkt))
        del new['TCP'].chksum
        new = new.__class__(str(new))
        self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)

    def create_stream_in(self, in_if, out_if, ttl=64):
        """
        Create packet stream for inside network

        Builds one TCP, one UDP and one ICMP echo-request packet (in that
        order) from in_if's remote address to out_if's remote address.

        :param in_if: Inside interface
        :param out_if: Outside interface
        :param ttl: TTL of generated packets
        :returns: List of generated packets
        """
        l4_headers = [TCP(sport=self.tcp_port_in, dport=20),
                      UDP(sport=self.udp_port_in, dport=20),
                      ICMP(id=self.icmp_id_in, type='echo-request')]
        # A fresh Ether/IP header is built for every L4 header
        return [(Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
                 IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
                 l4) for l4 in l4_headers]

    def create_stream_in_ip6(self, in_if, out_if, hlim=64):
        """
        Create IPv6 packet stream for inside network

        Builds one TCP, one UDP and one ICMPv6 echo-request packet (in that
        order). The destination is out_if's remote IPv4 address appended to
        the 64:ff9b:: prefix.

        :param in_if: Inside interface
        :param out_if: Outside interface
        :param hlim: Hop Limit of generated packets
        :returns: List of generated packets
        """
        dst = '64:ff9b::' + out_if.remote_ip4
        l4_headers = [TCP(sport=self.tcp_port_in, dport=20),
                      UDP(sport=self.udp_port_in, dport=20),
                      ICMPv6EchoRequest(id=self.icmp_id_in)]
        return [(Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
                 IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
                 l4) for l4 in l4_headers]

    def create_stream_out(self, out_if, dst_ip=None, ttl=64):
        """
        Create packet stream for outside network

        Builds one TCP, one UDP and one ICMP echo-reply packet (in that
        order) from out_if's remote address, using the current outside
        ports / ICMP id stored on the test case.

        :param out_if: Outside interface
        :param dst_ip: Destination IP address (Default use global SNAT address)
        :param ttl: TTL of generated packets
        :returns: List of generated packets
        """
        if dst_ip is None:
            dst_ip = self.snat_addr
        l4_headers = [TCP(dport=self.tcp_port_out, sport=20),
                      UDP(dport=self.udp_port_out, sport=20),
                      ICMP(id=self.icmp_id_out, type='echo-reply')]
        return [(Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
                 IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
                 l4) for l4 in l4_headers]

    def verify_capture_out(self, capture, nat_ip=None, same_port=False,
                           packet_num=3, dst_ip=None):
        """
        Verify captured packets on outside network

        As a side effect the source port / ICMP id seen in each packet is
        stored in self.tcp_port_out, self.udp_port_out or self.icmp_id_out.

        :param capture: Captured packets
        :param nat_ip: Translated IP address (Default use global SNAT address)
        :param same_port: Source port number is not translated (Default False)
        :param packet_num: Expected number of packets (Default 3)
        :param dst_ip: Destination IP address (Default do not verify)
        """
        if nat_ip is None:
            nat_ip = self.snat_addr
        # same_port decides whether the outside identifier must equal or
        # must differ from the inside one
        check_id = self.assertEqual if same_port else self.assertNotEqual
        self.assertEqual(packet_num, len(capture))
        for packet in capture:
            try:
                self.assertEqual(packet[IP].src, nat_ip)
                if dst_ip is not None:
                    self.assertEqual(packet[IP].dst, dst_ip)
                if packet.haslayer(TCP):
                    check_id(packet[TCP].sport, self.tcp_port_in)
                    self.tcp_port_out = packet[TCP].sport
                elif packet.haslayer(UDP):
                    check_id(packet[UDP].sport, self.udp_port_in)
                    self.udp_port_out = packet[UDP].sport
                else:
                    check_id(packet[ICMP].id, self.icmp_id_in)
                    self.icmp_id_out = packet[ICMP].id
            except Exception:
                # dump the offending packet before the failure propagates
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(outside network):", packet))
                raise

    def verify_capture_in(self, capture, in_if, packet_num=3):
        """
        Verify captured packets on inside network

        :param capture: Captured packets
        :param in_if: Inside interface
        :param packet_num: Expected number of packets (Default 3)
        """
        self.assertEqual(packet_num, len(capture))
        for packet in capture:
            try:
                self.assertEqual(packet[IP].dst, in_if.remote_ip4)
                if packet.haslayer(TCP):
                    self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                elif packet.haslayer(UDP):
                    self.assertEqual(packet[UDP].dport, self.udp_port_in)
                else:
                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(inside network):", packet))
                raise

    def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
        """
        Verify captured IPv6 packets on inside network

        :param capture: Captured packets
        :param src_ip: Source IP
        :param dst_ip: Destination IP address
        :param packet_num: Expected number of packets (Default 3)
        """
        self.assertEqual(packet_num, len(capture))
        for packet in capture:
            try:
                self.assertEqual(packet[IPv6].src, src_ip)
                self.assertEqual(packet[IPv6].dst, dst_ip)
                if packet.haslayer(TCP):
                    self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                elif packet.haslayer(UDP):
                    self.assertEqual(packet[UDP].dport, self.udp_port_in)
                else:
                    self.assertEqual(packet[ICMPv6EchoReply].id,
                                     self.icmp_id_in)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(inside network):", packet))
                raise

    def verify_capture_no_translation(self, capture, ingress_if, egress_if):
        """
        Verify captured packet that don't have to be translated

        Addresses and source port / ICMP id must be the untranslated
        inside values. The number of captured packets is not checked.

        :param capture: Captured packets
        :param ingress_if: Ingress interface
        :param egress_if: Egress interface
        """
        for packet in capture:
            try:
                self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
                self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
                if packet.haslayer(TCP):
                    self.assertEqual(packet[TCP].sport, self.tcp_port_in)
                elif packet.haslayer(UDP):
                    self.assertEqual(packet[UDP].sport, self.udp_port_in)
                else:
                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(inside network):", packet))
                raise

    def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
                                            packet_num=3, icmp_type=11):
        """
        Verify captured packets with ICMP errors on outside network

        The packet embedded in each ICMP error must carry the outside
        destination port / ICMP id.

        :param capture: Captured packets
        :param src_ip: Translated IP address or IP address of VPP
                       (Default use global SNAT address)
        :param packet_num: Expected number of packets (Default 3)
        :param icmp_type: Type of error ICMP packet
                          we are expecting (Default 11)
        """
        if src_ip is None:
            src_ip = self.snat_addr
        self.assertEqual(packet_num, len(capture))
        for packet in capture:
            try:
                self.assertEqual(packet[IP].src, src_ip)
                self.assertTrue(packet.haslayer(ICMP))
                icmp = packet[ICMP]
                self.assertEqual(icmp.type, icmp_type)
                self.assertTrue(icmp.haslayer(IPerror))
                inner_ip = icmp[IPerror]
                if inner_ip.haslayer(TCPerror):
                    self.assertEqual(inner_ip[TCPerror].dport,
                                     self.tcp_port_out)
                elif inner_ip.haslayer(UDPerror):
                    self.assertEqual(inner_ip[UDPerror].dport,
                                     self.udp_port_out)
                else:
                    self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(outside network):", packet))
                raise

    def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
                                           icmp_type=11):
        """
        Verify captured packets with ICMP errors on inside network

        The packet embedded in each ICMP error must carry the inside
        source port / ICMP id.

        :param capture: Captured packets
        :param in_if: Inside interface
        :param packet_num: Expected number of packets (Default 3)
        :param icmp_type: Type of error ICMP packet
                          we are expecting (Default 11)
        """
        self.assertEqual(packet_num, len(capture))
        for packet in capture:
            try:
                self.assertEqual(packet[IP].dst, in_if.remote_ip4)
                self.assertTrue(packet.haslayer(ICMP))
                icmp = packet[ICMP]
                self.assertEqual(icmp.type, icmp_type)
                self.assertTrue(icmp.haslayer(IPerror))
                inner_ip = icmp[IPerror]
                if inner_ip.haslayer(TCPerror):
                    self.assertEqual(inner_ip[TCPerror].sport,
                                     self.tcp_port_in)
                elif inner_ip.haslayer(UDPerror):
                    self.assertEqual(inner_ip[UDPerror].sport,
                                     self.udp_port_in)
                else:
                    self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet "
                                      "(inside network):", packet))
                raise

    def verify_ipfix_nat44_ses(self, data):
        """
        Verify IPFIX NAT44 session create/delete event

        Exactly six records are expected: three with natEvent 4 (counted
        as session create) and three with natEvent 5 (counted as session
        delete).

        :param data: Decoded IPFIX data records
        """
        # protocol number -> expected (inside, outside) port or ICMP id
        expected_ids = {
            IP_PROTOS.icmp: (self.icmp_id_in, self.icmp_id_out),
            IP_PROTOS.tcp: (self.tcp_port_in, self.tcp_port_out),
            IP_PROTOS.udp: (self.udp_port_in, self.udp_port_out),
        }
        nat44_ses_create_num = 0
        nat44_ses_delete_num = 0
        self.assertEqual(6, len(data))
        for record in data:
            # natEvent
            nat_event = ord(record[230])
            self.assertIn(nat_event, [4, 5])
            if nat_event == 4:
                nat44_ses_create_num += 1
            else:
                nat44_ses_delete_num += 1
            # sourceIPv4Address
            self.assertEqual(self.pg0.remote_ip4n, record[8])
            # postNATSourceIPv4Address
            self.assertEqual(socket.inet_pton(socket.AF_INET, self.snat_addr),
                             record[225])
            # ingressVRFID
            self.assertEqual(struct.pack("!I", 0), record[234])
            # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
            proto = ord(record[4])
            if proto not in expected_ids:
                self.fail("Invalid protocol")
            id_in, id_out = expected_ids[proto]
            self.assertEqual(struct.pack("!H", id_in), record[7])
            self.assertEqual(struct.pack("!H", id_out), record[227])
        self.assertEqual(3, nat44_ses_create_num)
        self.assertEqual(3, nat44_ses_delete_num)

    def verify_ipfix_addr_exhausted(self, data):
        """
        Verify IPFIX NAT addresses event

        Exactly one record is expected, with natEvent 3 and natPoolID 0.

        :param data: Decoded IPFIX data records
        """
        self.assertEqual(1, len(data))
        record = data[0]
        # natEvent
        self.assertEqual(ord(record[230]), 3)
        # natPoolID
        self.assertEqual(struct.pack("!I", 0), record[283])
371
372
373 class TestSNAT(MethodHolder):
374     """ SNAT Test Cases """
375
376     @classmethod
377     def setUpClass(cls):
378         super(TestSNAT, cls).setUpClass()
379
380         try:
381             cls.tcp_port_in = 6303
382             cls.tcp_port_out = 6303
383             cls.udp_port_in = 6304
384             cls.udp_port_out = 6304
385             cls.icmp_id_in = 6305
386             cls.icmp_id_out = 6305
387             cls.snat_addr = '10.0.0.3'
388             cls.ipfix_src_port = 4739
389             cls.ipfix_domain_id = 1
390
391             cls.create_pg_interfaces(range(9))
392             cls.interfaces = list(cls.pg_interfaces[0:4])
393
394             for i in cls.interfaces:
395                 i.admin_up()
396                 i.config_ip4()
397                 i.resolve_arp()
398
399             cls.pg0.generate_remote_hosts(3)
400             cls.pg0.configure_ipv4_neighbors()
401
402             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
403
404             cls.pg4._local_ip4 = "172.16.255.1"
405             cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
406             cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
407             cls.pg4.set_table_ip4(10)
408             cls.pg5._local_ip4 = "172.16.255.3"
409             cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
410             cls.pg5._remote_hosts[0]._ip4 = "172.16.255.4"
411             cls.pg5.set_table_ip4(10)
412             cls.pg6._local_ip4 = "172.16.255.1"
413             cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
414             cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
415             cls.pg6.set_table_ip4(20)
416             for i in cls.overlapping_interfaces:
417                 i.config_ip4()
418                 i.admin_up()
419                 i.resolve_arp()
420
421             cls.pg7.admin_up()
422             cls.pg8.admin_up()
423
424         except Exception:
425             super(TestSNAT, cls).tearDownClass()
426             raise
427
428     def clear_snat(self):
429         """
430         Clear SNAT configuration.
431         """
432         # I found no elegant way to do this
433         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
434                                    dst_address_length=32,
435                                    next_hop_address=self.pg7.remote_ip4n,
436                                    next_hop_sw_if_index=self.pg7.sw_if_index,
437                                    is_add=0)
438         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
439                                    dst_address_length=32,
440                                    next_hop_address=self.pg8.remote_ip4n,
441                                    next_hop_sw_if_index=self.pg8.sw_if_index,
442                                    is_add=0)
443
444         for intf in [self.pg7, self.pg8]:
445             neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
446             for n in neighbors:
447                 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
448                                               n.mac_address,
449                                               n.ip_address,
450                                               is_add=0)
451
452         if self.pg7.has_ip4_config:
453             self.pg7.unconfig_ip4()
454
455         interfaces = self.vapi.snat_interface_addr_dump()
456         for intf in interfaces:
457             self.vapi.snat_add_interface_addr(intf.sw_if_index, is_add=0)
458
459         self.vapi.snat_ipfix(enable=0, src_port=self.ipfix_src_port,
460                              domain_id=self.ipfix_domain_id)
461         self.ipfix_src_port = 4739
462         self.ipfix_domain_id = 1
463
464         interfaces = self.vapi.snat_interface_dump()
465         for intf in interfaces:
466             self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
467                                                      intf.is_inside,
468                                                      is_add=0)
469
470         static_mappings = self.vapi.snat_static_mapping_dump()
471         for sm in static_mappings:
472             self.vapi.snat_add_static_mapping(sm.local_ip_address,
473                                               sm.external_ip_address,
474                                               local_port=sm.local_port,
475                                               external_port=sm.external_port,
476                                               addr_only=sm.addr_only,
477                                               vrf_id=sm.vrf_id,
478                                               protocol=sm.protocol,
479                                               is_add=0)
480
481         adresses = self.vapi.snat_address_dump()
482         for addr in adresses:
483             self.vapi.snat_add_address_range(addr.ip_address,
484                                              addr.ip_address,
485                                              is_add=0)
486
487     def snat_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
488                                 local_port=0, external_port=0, vrf_id=0,
489                                 is_add=1, external_sw_if_index=0xFFFFFFFF,
490                                 proto=0):
491         """
492         Add/delete S-NAT static mapping
493
494         :param local_ip: Local IP address
495         :param external_ip: External IP address
496         :param local_port: Local port number (Optional)
497         :param external_port: External port number (Optional)
498         :param vrf_id: VRF ID (Default 0)
499         :param is_add: 1 if add, 0 if delete (Default add)
500         :param external_sw_if_index: External interface instead of IP address
501         :param proto: IP protocol (Mandatory if port specified)
502         """
503         addr_only = 1
504         if local_port and external_port:
505             addr_only = 0
506         l_ip = socket.inet_pton(socket.AF_INET, local_ip)
507         e_ip = socket.inet_pton(socket.AF_INET, external_ip)
508         self.vapi.snat_add_static_mapping(
509             l_ip,
510             e_ip,
511             external_sw_if_index,
512             local_port,
513             external_port,
514             addr_only,
515             vrf_id,
516             proto,
517             is_add)
518
519     def snat_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
520         """
521         Add/delete S-NAT address
522
523         :param ip: IP address
524         :param is_add: 1 if add, 0 if delete (Default add)
525         """
526         snat_addr = socket.inet_pton(socket.AF_INET, ip)
527         self.vapi.snat_add_address_range(snat_addr, snat_addr, is_add,
528                                          vrf_id=vrf_id)
529
530     def test_dynamic(self):
531         """ SNAT dynamic translation test """
532
533         self.snat_add_address(self.snat_addr)
534         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
535         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
536                                                  is_inside=0)
537
538         # in2out
539         pkts = self.create_stream_in(self.pg0, self.pg1)
540         self.pg0.add_stream(pkts)
541         self.pg_enable_capture(self.pg_interfaces)
542         self.pg_start()
543         capture = self.pg1.get_capture(len(pkts))
544         self.verify_capture_out(capture)
545
546         # out2in
547         pkts = self.create_stream_out(self.pg1)
548         self.pg1.add_stream(pkts)
549         self.pg_enable_capture(self.pg_interfaces)
550         self.pg_start()
551         capture = self.pg0.get_capture(len(pkts))
552         self.verify_capture_in(capture, self.pg0)
553
554     def test_dynamic_icmp_errors_in2out_ttl_1(self):
555         """ SNAT handling of client packets with TTL=1 """
556
557         self.snat_add_address(self.snat_addr)
558         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
559         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
560                                                  is_inside=0)
561
562         # Client side - generate traffic
563         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
564         self.pg0.add_stream(pkts)
565         self.pg_enable_capture(self.pg_interfaces)
566         self.pg_start()
567
568         # Client side - verify ICMP type 11 packets
569         capture = self.pg0.get_capture(len(pkts))
570         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
571
572     def test_dynamic_icmp_errors_out2in_ttl_1(self):
573         """ SNAT handling of server packets with TTL=1 """
574
575         self.snat_add_address(self.snat_addr)
576         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
577         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
578                                                  is_inside=0)
579
580         # Client side - create sessions
581         pkts = self.create_stream_in(self.pg0, self.pg1)
582         self.pg0.add_stream(pkts)
583         self.pg_enable_capture(self.pg_interfaces)
584         self.pg_start()
585
586         # Server side - generate traffic
587         capture = self.pg1.get_capture(len(pkts))
588         self.verify_capture_out(capture)
589         pkts = self.create_stream_out(self.pg1, ttl=1)
590         self.pg1.add_stream(pkts)
591         self.pg_enable_capture(self.pg_interfaces)
592         self.pg_start()
593
594         # Server side - verify ICMP type 11 packets
595         capture = self.pg1.get_capture(len(pkts))
596         self.verify_capture_out_with_icmp_errors(capture,
597                                                  src_ip=self.pg1.local_ip4)
598
599     def test_dynamic_icmp_errors_in2out_ttl_2(self):
600         """ SNAT handling of error responses to client packets with TTL=2 """
601
602         self.snat_add_address(self.snat_addr)
603         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
604         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
605                                                  is_inside=0)
606
607         # Client side - generate traffic
608         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
609         self.pg0.add_stream(pkts)
610         self.pg_enable_capture(self.pg_interfaces)
611         self.pg_start()
612
613         # Server side - simulate ICMP type 11 response
614         capture = self.pg1.get_capture(len(pkts))
615         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
616                 IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
617                 ICMP(type=11) / packet[IP] for packet in capture]
618         self.pg1.add_stream(pkts)
619         self.pg_enable_capture(self.pg_interfaces)
620         self.pg_start()
621
622         # Client side - verify ICMP type 11 packets
623         capture = self.pg0.get_capture(len(pkts))
624         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
625
626     def test_dynamic_icmp_errors_out2in_ttl_2(self):
627         """ SNAT handling of error responses to server packets with TTL=2 """
628
629         self.snat_add_address(self.snat_addr)
630         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
631         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
632                                                  is_inside=0)
633
634         # Client side - create sessions
635         pkts = self.create_stream_in(self.pg0, self.pg1)
636         self.pg0.add_stream(pkts)
637         self.pg_enable_capture(self.pg_interfaces)
638         self.pg_start()
639
640         # Server side - generate traffic
641         capture = self.pg1.get_capture(len(pkts))
642         self.verify_capture_out(capture)
643         pkts = self.create_stream_out(self.pg1, ttl=2)
644         self.pg1.add_stream(pkts)
645         self.pg_enable_capture(self.pg_interfaces)
646         self.pg_start()
647
648         # Client side - simulate ICMP type 11 response
649         capture = self.pg0.get_capture(len(pkts))
650         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
651                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
652                 ICMP(type=11) / packet[IP] for packet in capture]
653         self.pg0.add_stream(pkts)
654         self.pg_enable_capture(self.pg_interfaces)
655         self.pg_start()
656
657         # Server side - verify ICMP type 11 packets
658         capture = self.pg1.get_capture(len(pkts))
659         self.verify_capture_out_with_icmp_errors(capture)
660
661     def test_ping_out_interface_from_outside(self):
662         """ Ping SNAT out interface from outside network """
663
664         self.snat_add_address(self.snat_addr)
665         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
666         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
667                                                  is_inside=0)
668
669         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
670              IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
671              ICMP(id=self.icmp_id_out, type='echo-request'))
672         pkts = [p]
673         self.pg1.add_stream(pkts)
674         self.pg_enable_capture(self.pg_interfaces)
675         self.pg_start()
676         capture = self.pg1.get_capture(len(pkts))
677         self.assertEqual(1, len(capture))
678         packet = capture[0]
679         try:
680             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
681             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
682             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
683             self.assertEqual(packet[ICMP].type, 0)  # echo reply
684         except:
685             self.logger.error(ppp("Unexpected or invalid packet "
686                                   "(outside network):", packet))
687             raise
688
689     def test_ping_internal_host_from_outside(self):
690         """ Ping internal host from outside network """
691
692         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr)
693         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
694         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
695                                                  is_inside=0)
696
697         # out2in
698         pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
699                IP(src=self.pg1.remote_ip4, dst=self.snat_addr, ttl=64) /
700                ICMP(id=self.icmp_id_out, type='echo-request'))
701         self.pg1.add_stream(pkt)
702         self.pg_enable_capture(self.pg_interfaces)
703         self.pg_start()
704         capture = self.pg0.get_capture(1)
705         self.verify_capture_in(capture, self.pg0, packet_num=1)
706         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
707
708         # in2out
709         pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
710                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
711                ICMP(id=self.icmp_id_in, type='echo-reply'))
712         self.pg0.add_stream(pkt)
713         self.pg_enable_capture(self.pg_interfaces)
714         self.pg_start()
715         capture = self.pg1.get_capture(1)
716         self.verify_capture_out(capture, same_port=True, packet_num=1)
717         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
718
719     def test_static_in(self):
720         """ SNAT 1:1 NAT initialized from inside network """
721
722         nat_ip = "10.0.0.10"
723         self.tcp_port_out = 6303
724         self.udp_port_out = 6304
725         self.icmp_id_out = 6305
726
727         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
728         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
729         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
730                                                  is_inside=0)
731
732         # in2out
733         pkts = self.create_stream_in(self.pg0, self.pg1)
734         self.pg0.add_stream(pkts)
735         self.pg_enable_capture(self.pg_interfaces)
736         self.pg_start()
737         capture = self.pg1.get_capture(len(pkts))
738         self.verify_capture_out(capture, nat_ip, True)
739
740         # out2in
741         pkts = self.create_stream_out(self.pg1, nat_ip)
742         self.pg1.add_stream(pkts)
743         self.pg_enable_capture(self.pg_interfaces)
744         self.pg_start()
745         capture = self.pg0.get_capture(len(pkts))
746         self.verify_capture_in(capture, self.pg0)
747
748     def test_static_out(self):
749         """ SNAT 1:1 NAT initialized from outside network """
750
751         nat_ip = "10.0.0.20"
752         self.tcp_port_out = 6303
753         self.udp_port_out = 6304
754         self.icmp_id_out = 6305
755
756         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip)
757         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
758         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
759                                                  is_inside=0)
760
761         # out2in
762         pkts = self.create_stream_out(self.pg1, nat_ip)
763         self.pg1.add_stream(pkts)
764         self.pg_enable_capture(self.pg_interfaces)
765         self.pg_start()
766         capture = self.pg0.get_capture(len(pkts))
767         self.verify_capture_in(capture, self.pg0)
768
769         # in2out
770         pkts = self.create_stream_in(self.pg0, self.pg1)
771         self.pg0.add_stream(pkts)
772         self.pg_enable_capture(self.pg_interfaces)
773         self.pg_start()
774         capture = self.pg1.get_capture(len(pkts))
775         self.verify_capture_out(capture, nat_ip, True)
776
777     def test_static_with_port_in(self):
778         """ SNAT 1:1 NAT with port initialized from inside network """
779
780         self.tcp_port_out = 3606
781         self.udp_port_out = 3607
782         self.icmp_id_out = 3608
783
784         self.snat_add_address(self.snat_addr)
785         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
786                                      self.tcp_port_in, self.tcp_port_out,
787                                      proto=IP_PROTOS.tcp)
788         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
789                                      self.udp_port_in, self.udp_port_out,
790                                      proto=IP_PROTOS.udp)
791         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
792                                      self.icmp_id_in, self.icmp_id_out,
793                                      proto=IP_PROTOS.icmp)
794         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
795         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
796                                                  is_inside=0)
797
798         # in2out
799         pkts = self.create_stream_in(self.pg0, self.pg1)
800         self.pg0.add_stream(pkts)
801         self.pg_enable_capture(self.pg_interfaces)
802         self.pg_start()
803         capture = self.pg1.get_capture(len(pkts))
804         self.verify_capture_out(capture)
805
806         # out2in
807         pkts = self.create_stream_out(self.pg1)
808         self.pg1.add_stream(pkts)
809         self.pg_enable_capture(self.pg_interfaces)
810         self.pg_start()
811         capture = self.pg0.get_capture(len(pkts))
812         self.verify_capture_in(capture, self.pg0)
813
814     def test_static_with_port_out(self):
815         """ SNAT 1:1 NAT with port initialized from outside network """
816
817         self.tcp_port_out = 30606
818         self.udp_port_out = 30607
819         self.icmp_id_out = 30608
820
821         self.snat_add_address(self.snat_addr)
822         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
823                                      self.tcp_port_in, self.tcp_port_out,
824                                      proto=IP_PROTOS.tcp)
825         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
826                                      self.udp_port_in, self.udp_port_out,
827                                      proto=IP_PROTOS.udp)
828         self.snat_add_static_mapping(self.pg0.remote_ip4, self.snat_addr,
829                                      self.icmp_id_in, self.icmp_id_out,
830                                      proto=IP_PROTOS.icmp)
831         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
832         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
833                                                  is_inside=0)
834
835         # out2in
836         pkts = self.create_stream_out(self.pg1)
837         self.pg1.add_stream(pkts)
838         self.pg_enable_capture(self.pg_interfaces)
839         self.pg_start()
840         capture = self.pg0.get_capture(len(pkts))
841         self.verify_capture_in(capture, self.pg0)
842
843         # in2out
844         pkts = self.create_stream_in(self.pg0, self.pg1)
845         self.pg0.add_stream(pkts)
846         self.pg_enable_capture(self.pg_interfaces)
847         self.pg_start()
848         capture = self.pg1.get_capture(len(pkts))
849         self.verify_capture_out(capture)
850
851     def test_static_vrf_aware(self):
852         """ SNAT 1:1 NAT VRF awareness """
853
854         nat_ip1 = "10.0.0.30"
855         nat_ip2 = "10.0.0.40"
856         self.tcp_port_out = 6303
857         self.udp_port_out = 6304
858         self.icmp_id_out = 6305
859
860         self.snat_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
861                                      vrf_id=10)
862         self.snat_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
863                                      vrf_id=10)
864         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
865                                                  is_inside=0)
866         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
867         self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
868
869         # inside interface VRF match SNAT static mapping VRF
870         pkts = self.create_stream_in(self.pg4, self.pg3)
871         self.pg4.add_stream(pkts)
872         self.pg_enable_capture(self.pg_interfaces)
873         self.pg_start()
874         capture = self.pg3.get_capture(len(pkts))
875         self.verify_capture_out(capture, nat_ip1, True)
876
877         # inside interface VRF don't match SNAT static mapping VRF (packets
878         # are dropped)
879         pkts = self.create_stream_in(self.pg0, self.pg3)
880         self.pg0.add_stream(pkts)
881         self.pg_enable_capture(self.pg_interfaces)
882         self.pg_start()
883         self.pg3.assert_nothing_captured()
884
885     def test_multiple_inside_interfaces(self):
886         """ SNAT multiple inside interfaces (non-overlapping address space) """
887
888         self.snat_add_address(self.snat_addr)
889         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
890         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
891         self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
892                                                  is_inside=0)
893
894         # between two S-NAT inside interfaces (no translation)
895         pkts = self.create_stream_in(self.pg0, self.pg1)
896         self.pg0.add_stream(pkts)
897         self.pg_enable_capture(self.pg_interfaces)
898         self.pg_start()
899         capture = self.pg1.get_capture(len(pkts))
900         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
901
902         # from S-NAT inside to interface without S-NAT feature (no translation)
903         pkts = self.create_stream_in(self.pg0, self.pg2)
904         self.pg0.add_stream(pkts)
905         self.pg_enable_capture(self.pg_interfaces)
906         self.pg_start()
907         capture = self.pg2.get_capture(len(pkts))
908         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
909
910         # in2out 1st interface
911         pkts = self.create_stream_in(self.pg0, self.pg3)
912         self.pg0.add_stream(pkts)
913         self.pg_enable_capture(self.pg_interfaces)
914         self.pg_start()
915         capture = self.pg3.get_capture(len(pkts))
916         self.verify_capture_out(capture)
917
918         # out2in 1st interface
919         pkts = self.create_stream_out(self.pg3)
920         self.pg3.add_stream(pkts)
921         self.pg_enable_capture(self.pg_interfaces)
922         self.pg_start()
923         capture = self.pg0.get_capture(len(pkts))
924         self.verify_capture_in(capture, self.pg0)
925
926         # in2out 2nd interface
927         pkts = self.create_stream_in(self.pg1, self.pg3)
928         self.pg1.add_stream(pkts)
929         self.pg_enable_capture(self.pg_interfaces)
930         self.pg_start()
931         capture = self.pg3.get_capture(len(pkts))
932         self.verify_capture_out(capture)
933
934         # out2in 2nd interface
935         pkts = self.create_stream_out(self.pg3)
936         self.pg3.add_stream(pkts)
937         self.pg_enable_capture(self.pg_interfaces)
938         self.pg_start()
939         capture = self.pg1.get_capture(len(pkts))
940         self.verify_capture_in(capture, self.pg1)
941
    def test_inside_overlapping_interfaces(self):
        """ SNAT multiple inside interfaces with overlapping address space

        pg3 is registered with is_inside=0; pg4, pg5 and pg6 get the S-NAT
        feature with the call's default direction.  A static mapping
        pg6.remote_ip4 <-> 10.0.0.10 is added with vrf_id=20.  The test then
        exchanges traffic between the interfaces and checks the user and
        session dumps; pg4 and pg5 sessions are queried with VRF id 10 and
        pg6 sessions with VRF id 20.
        """

        static_nat_ip = "10.0.0.10"
        self.snat_add_address(self.snat_addr)
        self.vapi.snat_interface_add_del_feature(self.pg3.sw_if_index,
                                                 is_inside=0)
        self.vapi.snat_interface_add_del_feature(self.pg4.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg5.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg6.sw_if_index)
        self.snat_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
                                     vrf_id=20)

        # between S-NAT inside interfaces with same VRF (no translation)
        pkts = self.create_stream_in(self.pg4, self.pg5)
        self.pg4.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg5.get_capture(len(pkts))
        self.verify_capture_no_translation(capture, self.pg4, self.pg5)

        # between S-NAT inside interfaces with different VRF (hairpinning)
        # pg4 host addresses pg6's static NAT address; the packet must show
        # up on pg6 with source rewritten to snat_addr and a new source port
        p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
             IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
             TCP(sport=1234, dport=5678))
        self.pg4.add_stream(p)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg6.get_capture(1)
        p = capture[0]
        try:
            ip = p[IP]
            tcp = p[TCP]
            self.assertEqual(ip.src, self.snat_addr)
            self.assertEqual(ip.dst, self.pg6.remote_ip4)
            self.assertNotEqual(tcp.sport, 1234)
            self.assertEqual(tcp.dport, 5678)
        except:
            # dump the offending packet, then let the failure propagate
            self.logger.error(ppp("Unexpected or invalid packet:", p))
            raise

        # in2out 1st interface
        pkts = self.create_stream_in(self.pg4, self.pg3)
        self.pg4.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg3.get_capture(len(pkts))
        self.verify_capture_out(capture)

        # out2in 1st interface
        pkts = self.create_stream_out(self.pg3)
        self.pg3.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg4.get_capture(len(pkts))
        self.verify_capture_in(capture, self.pg4)

        # in2out 2nd interface
        pkts = self.create_stream_in(self.pg5, self.pg3)
        self.pg5.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg3.get_capture(len(pkts))
        self.verify_capture_out(capture)

        # out2in 2nd interface
        pkts = self.create_stream_out(self.pg3)
        self.pg3.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg5.get_capture(len(pkts))
        self.verify_capture_in(capture, self.pg5)

        # pg5 session dump
        # exactly three dynamic sessions are expected for the pg5 host, and
        # the indexed checks below rely on the dump listing them in
        # TCP, UDP, ICMP order
        addresses = self.vapi.snat_address_dump()
        self.assertEqual(len(addresses), 1)
        sessions = self.vapi.snat_user_session_dump(self.pg5.remote_ip4n, 10)
        self.assertEqual(len(sessions), 3)
        for session in sessions:
            self.assertFalse(session.is_static)
            # only the first 4 bytes of the dumped address are compared
            self.assertEqual(session.inside_ip_address[0:4],
                             self.pg5.remote_ip4n)
            self.assertEqual(session.outside_ip_address,
                             addresses[0].ip_address)
        self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
        self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
        self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
        self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
        self.assertEqual(sessions[1].inside_port, self.udp_port_in)
        self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
        self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
        self.assertEqual(sessions[1].outside_port, self.udp_port_out)
        self.assertEqual(sessions[2].outside_port, self.icmp_id_out)

        # in2out 3rd interface
        pkts = self.create_stream_in(self.pg6, self.pg3)
        self.pg6.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg3.get_capture(len(pkts))
        self.verify_capture_out(capture, static_nat_ip, True)

        # out2in 3rd interface
        pkts = self.create_stream_out(self.pg3, static_nat_ip)
        self.pg3.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg6.get_capture(len(pkts))
        self.verify_capture_in(capture, self.pg6)

        # general user and session dump verifications
        users = self.vapi.snat_user_dump()
        self.assertTrue(len(users) >= 3)
        addresses = self.vapi.snat_address_dump()
        self.assertEqual(len(addresses), 1)
        for user in users:
            sessions = self.vapi.snat_user_session_dump(user.ip_address,
                                                        user.vrf_id)
            for session in sessions:
                self.assertEqual(user.ip_address, session.inside_ip_address)
                # chained comparison: at least one packet was counted and
                # the byte counter exceeds the packet counter
                self.assertTrue(session.total_bytes > session.total_pkts > 0)
                self.assertTrue(session.protocol in
                                [IP_PROTOS.tcp, IP_PROTOS.udp,
                                 IP_PROTOS.icmp])

        # pg4 session dump
        # at least four sessions: the hairpinned TCP flow plus the
        # pg4 -> pg3 stream (presumably one session per protocol)
        sessions = self.vapi.snat_user_session_dump(self.pg4.remote_ip4n, 10)
        self.assertTrue(len(sessions) >= 4)
        for session in sessions:
            self.assertFalse(session.is_static)
            self.assertEqual(session.inside_ip_address[0:4],
                             self.pg4.remote_ip4n)
            self.assertEqual(session.outside_ip_address,
                             addresses[0].ip_address)

        # pg6 session dump
        # pg6 traffic goes through the static mapping, so its sessions must
        # be flagged static and use static_nat_ip as the outside address
        sessions = self.vapi.snat_user_session_dump(self.pg6.remote_ip4n, 20)
        self.assertTrue(len(sessions) >= 3)
        for session in sessions:
            self.assertTrue(session.is_static)
            self.assertEqual(session.inside_ip_address[0:4],
                             self.pg6.remote_ip4n)
            # NOTE(review): comparing two map() results only works on
            # Python 2, where map() returns lists
            self.assertEqual(map(ord, session.outside_ip_address[0:4]),
                             map(int, static_nat_ip.split('.')))
            self.assertTrue(session.inside_port in
                            [self.tcp_port_in, self.udp_port_in,
                             self.icmp_id_in])
1089
1090     def test_hairpinning(self):
1091         """ SNAT hairpinning - 1:1 NAT with port"""
1092
1093         host = self.pg0.remote_hosts[0]
1094         server = self.pg0.remote_hosts[1]
1095         host_in_port = 1234
1096         host_out_port = 0
1097         server_in_port = 5678
1098         server_out_port = 8765
1099
1100         self.snat_add_address(self.snat_addr)
1101         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1102         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1103                                                  is_inside=0)
1104         # add static mapping for server
1105         self.snat_add_static_mapping(server.ip4, self.snat_addr,
1106                                      server_in_port, server_out_port,
1107                                      proto=IP_PROTOS.tcp)
1108
1109         # send packet from host to server
1110         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1111              IP(src=host.ip4, dst=self.snat_addr) /
1112              TCP(sport=host_in_port, dport=server_out_port))
1113         self.pg0.add_stream(p)
1114         self.pg_enable_capture(self.pg_interfaces)
1115         self.pg_start()
1116         capture = self.pg0.get_capture(1)
1117         p = capture[0]
1118         try:
1119             ip = p[IP]
1120             tcp = p[TCP]
1121             self.assertEqual(ip.src, self.snat_addr)
1122             self.assertEqual(ip.dst, server.ip4)
1123             self.assertNotEqual(tcp.sport, host_in_port)
1124             self.assertEqual(tcp.dport, server_in_port)
1125             self.check_tcp_checksum(p)
1126             host_out_port = tcp.sport
1127         except:
1128             self.logger.error(ppp("Unexpected or invalid packet:", p))
1129             raise
1130
1131         # send reply from server to host
1132         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1133              IP(src=server.ip4, dst=self.snat_addr) /
1134              TCP(sport=server_in_port, dport=host_out_port))
1135         self.pg0.add_stream(p)
1136         self.pg_enable_capture(self.pg_interfaces)
1137         self.pg_start()
1138         capture = self.pg0.get_capture(1)
1139         p = capture[0]
1140         try:
1141             ip = p[IP]
1142             tcp = p[TCP]
1143             self.assertEqual(ip.src, self.snat_addr)
1144             self.assertEqual(ip.dst, host.ip4)
1145             self.assertEqual(tcp.sport, server_out_port)
1146             self.assertEqual(tcp.dport, host_in_port)
1147             self.check_tcp_checksum(p)
1148         except:
1149             self.logger.error(ppp("Unexpected or invalid packet:"), p)
1150             raise
1151
    def test_hairpinning2(self):
        """ SNAT hairpinning - 1:1 NAT

        Host, server1 and server2 are all remote hosts on pg0.  Both servers
        get an address-only static mapping (10.0.0.10 and 10.0.0.11).  Four
        exchanges are sent and captured on pg0, each with one TCP, one UDP
        and one ICMP packet:

        1. host -> server1 (via server1's NAT address)
        2. server1 -> host (via snat_addr and the ports learned in step 1)
        3. server2 -> server1 (via server1's NAT address)
        4. server1 -> server2 (via server2's NAT address)

        The steps are order dependent: steps 1 and 3 store the observed
        outside ports/ICMP id in self.tcp_port_out, self.udp_port_out and
        self.icmp_id_out, which steps 2 and 4 use to build their packets.
        """

        server1_nat_ip = "10.0.0.10"
        server2_nat_ip = "10.0.0.11"
        host = self.pg0.remote_hosts[0]
        server1 = self.pg0.remote_hosts[1]
        server2 = self.pg0.remote_hosts[2]
        server_tcp_port = 22
        server_udp_port = 20

        self.snat_add_address(self.snat_addr)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                 is_inside=0)

        # add static mapping for servers
        self.snat_add_static_mapping(server1.ip4, server1_nat_ip)
        self.snat_add_static_mapping(server2.ip4, server2_nat_ip)

        # host to server1
        pkts = []
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src=host.ip4, dst=server1_nat_ip) /
             TCP(sport=self.tcp_port_in, dport=server_tcp_port))
        pkts.append(p)
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src=host.ip4, dst=server1_nat_ip) /
             UDP(sport=self.udp_port_in, dport=server_udp_port))
        pkts.append(p)
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src=host.ip4, dst=server1_nat_ip) /
             ICMP(id=self.icmp_id_in, type='echo-request'))
        pkts.append(p)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        for packet in capture:
            try:
                # the host has no static mapping: its source must be
                # rewritten to snat_addr with a new port/ICMP id
                self.assertEqual(packet[IP].src, self.snat_addr)
                self.assertEqual(packet[IP].dst, server1.ip4)
                if packet.haslayer(TCP):
                    self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
                    self.assertEqual(packet[TCP].dport, server_tcp_port)
                    # remember the translated port for the reply direction
                    self.tcp_port_out = packet[TCP].sport
                    self.check_tcp_checksum(packet)
                elif packet.haslayer(UDP):
                    self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
                    self.assertEqual(packet[UDP].dport, server_udp_port)
                    self.udp_port_out = packet[UDP].sport
                else:
                    self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
                    self.icmp_id_out = packet[ICMP].id
            except:
                # dump the offending packet, then let the failure propagate
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise

        # server1 to host
        pkts = []
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src=server1.ip4, dst=self.snat_addr) /
             TCP(sport=server_tcp_port, dport=self.tcp_port_out))
        pkts.append(p)
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src=server1.ip4, dst=self.snat_addr) /
             UDP(sport=server_udp_port, dport=self.udp_port_out))
        pkts.append(p)
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src=server1.ip4, dst=self.snat_addr) /
             ICMP(id=self.icmp_id_out, type='echo-reply'))
        pkts.append(p)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        for packet in capture:
            try:
                # replies must appear to come from server1's NAT address and
                # carry the host's original ports/ICMP id again
                self.assertEqual(packet[IP].src, server1_nat_ip)
                self.assertEqual(packet[IP].dst, host.ip4)
                if packet.haslayer(TCP):
                    self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                    self.assertEqual(packet[TCP].sport, server_tcp_port)
                    self.check_tcp_checksum(packet)
                elif packet.haslayer(UDP):
                    self.assertEqual(packet[UDP].dport, self.udp_port_in)
                    self.assertEqual(packet[UDP].sport, server_udp_port)
                else:
                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
            except:
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise

        # server2 to server1
        pkts = []
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src=server2.ip4, dst=server1_nat_ip) /
             TCP(sport=self.tcp_port_in, dport=server_tcp_port))
        pkts.append(p)
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src=server2.ip4, dst=server1_nat_ip) /
             UDP(sport=self.udp_port_in, dport=server_udp_port))
        pkts.append(p)
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src=server2.ip4, dst=server1_nat_ip) /
             ICMP(id=self.icmp_id_in, type='echo-request'))
        pkts.append(p)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        for packet in capture:
            try:
                # server2 has its own address-only static mapping: the
                # source becomes server2_nat_ip and ports/ICMP id are
                # expected to stay unchanged
                self.assertEqual(packet[IP].src, server2_nat_ip)
                self.assertEqual(packet[IP].dst, server1.ip4)
                if packet.haslayer(TCP):
                    self.assertEqual(packet[TCP].sport, self.tcp_port_in)
                    self.assertEqual(packet[TCP].dport, server_tcp_port)
                    self.tcp_port_out = packet[TCP].sport
                    self.check_tcp_checksum(packet)
                elif packet.haslayer(UDP):
                    self.assertEqual(packet[UDP].sport, self.udp_port_in)
                    self.assertEqual(packet[UDP].dport, server_udp_port)
                    self.udp_port_out = packet[UDP].sport
                else:
                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
                    self.icmp_id_out = packet[ICMP].id
            except:
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise

        # server1 to server2
        pkts = []
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src=server1.ip4, dst=server2_nat_ip) /
             TCP(sport=server_tcp_port, dport=self.tcp_port_out))
        pkts.append(p)
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src=server1.ip4, dst=server2_nat_ip) /
             UDP(sport=server_udp_port, dport=self.udp_port_out))
        pkts.append(p)
        p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
             IP(src=server1.ip4, dst=server2_nat_ip) /
             ICMP(id=self.icmp_id_out, type='echo-reply'))
        pkts.append(p)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        for packet in capture:
            try:
                # replies must appear to come from server1's NAT address and
                # reach server2's real address
                self.assertEqual(packet[IP].src, server1_nat_ip)
                self.assertEqual(packet[IP].dst, server2.ip4)
                if packet.haslayer(TCP):
                    self.assertEqual(packet[TCP].dport, self.tcp_port_in)
                    self.assertEqual(packet[TCP].sport, server_tcp_port)
                    self.check_tcp_checksum(packet)
                elif packet.haslayer(UDP):
                    self.assertEqual(packet[UDP].dport, self.udp_port_in)
                    self.assertEqual(packet[UDP].sport, server_udp_port)
                else:
                    self.assertEqual(packet[ICMP].id, self.icmp_id_in)
            except:
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise
1317
1318     def test_max_translations_per_user(self):
1319         """ MAX translations per user - recycle the least recently used """
1320
1321         self.snat_add_address(self.snat_addr)
1322         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1323         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1324                                                  is_inside=0)
1325
1326         # get maximum number of translations per user
1327         snat_config = self.vapi.snat_show_config()
1328
1329         # send more than maximum number of translations per user packets
1330         pkts_num = snat_config.max_translations_per_user + 5
1331         pkts = []
1332         for port in range(0, pkts_num):
1333             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1334                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1335                  TCP(sport=1025 + port))
1336             pkts.append(p)
1337         self.pg0.add_stream(pkts)
1338         self.pg_enable_capture(self.pg_interfaces)
1339         self.pg_start()
1340
1341         # verify number of translated packet
1342         self.pg1.get_capture(pkts_num)
1343
1344     def test_interface_addr(self):
1345         """ Acquire SNAT addresses from interface """
1346         self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
1347
1348         # no address in NAT pool
1349         adresses = self.vapi.snat_address_dump()
1350         self.assertEqual(0, len(adresses))
1351
1352         # configure interface address and check NAT address pool
1353         self.pg7.config_ip4()
1354         adresses = self.vapi.snat_address_dump()
1355         self.assertEqual(1, len(adresses))
1356         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1357
1358         # remove interface address and check NAT address pool
1359         self.pg7.unconfig_ip4()
1360         adresses = self.vapi.snat_address_dump()
1361         self.assertEqual(0, len(adresses))
1362
1363     def test_interface_addr_static_mapping(self):
1364         """ Static mapping with addresses from interface """
1365         self.vapi.snat_add_interface_addr(self.pg7.sw_if_index)
1366         self.snat_add_static_mapping('1.2.3.4',
1367                                      external_sw_if_index=self.pg7.sw_if_index)
1368
1369         # static mappings with external interface
1370         static_mappings = self.vapi.snat_static_mapping_dump()
1371         self.assertEqual(1, len(static_mappings))
1372         self.assertEqual(self.pg7.sw_if_index,
1373                          static_mappings[0].external_sw_if_index)
1374
1375         # configure interface address and check static mappings
1376         self.pg7.config_ip4()
1377         static_mappings = self.vapi.snat_static_mapping_dump()
1378         self.assertEqual(1, len(static_mappings))
1379         self.assertEqual(static_mappings[0].external_ip_address[0:4],
1380                          self.pg7.local_ip4n)
1381         self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1382
1383         # remove interface address and check static mappings
1384         self.pg7.unconfig_ip4()
1385         static_mappings = self.vapi.snat_static_mapping_dump()
1386         self.assertEqual(0, len(static_mappings))
1387
    def test_ipfix_nat44_sess(self):
        """ S-NAT IPFIX logging NAT44 session created/deleted """
        self.ipfix_domain_id = 10
        self.ipfix_src_port = 20202
        colector_port = 30303
        # teach scapy to dissect UDP traffic to the collector port as IPFIX
        # NOTE(review): the literal 30303 duplicates colector_port above
        bind_layers(UDP, IPFIX, dport=30303)
        self.snat_add_address(self.snat_addr)
        self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
        self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
                                                 is_inside=0)
        # export towards the remote host on pg3
        self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
                                     src_address=self.pg3.local_ip4n,
                                     path_mtu=512,
                                     template_interval=10,
                                     collector_port=colector_port)
        self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
                             src_port=self.ipfix_src_port)

        # create sessions with in2out traffic ...
        pkts = self.create_stream_in(self.pg0, self.pg1)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        self.verify_capture_out(capture)
        # ... then remove the NAT address again (presumably this deletes
        # the sessions and triggers the delete events)
        self.snat_add_address(self.snat_addr, is_add=0)
        self.vapi.cli("ipfix flush")  # FIXME this should be an API call
        # three export packets are expected on pg3
        capture = self.pg3.get_capture(3)
        ipfix = IPFIXDecoder()
        # first load template
        # every export packet must also carry the configured addresses,
        # ports and observation domain id
        for p in capture:
            self.assertTrue(p.haslayer(IPFIX))
            self.assertEqual(p[IP].src, self.pg3.local_ip4)
            self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
            self.assertEqual(p[UDP].sport, self.ipfix_src_port)
            self.assertEqual(p[UDP].dport, colector_port)
            self.assertEqual(p[IPFIX].observationDomainID,
                             self.ipfix_domain_id)
            if p.haslayer(Template):
                ipfix.add_template(p.getlayer(Template))
        # verify events in data set
        # second pass, so that all templates are loaded before any data set
        # is decoded
        for p in capture:
            if p.haslayer(Data):
                data = ipfix.decode_data_set(p.getlayer(Set))
                self.verify_ipfix_nat44_ses(data)
1432
1433     def test_ipfix_addr_exhausted(self):
1434         """ S-NAT IPFIX logging NAT addresses exhausted """
1435         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1436         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1437                                                  is_inside=0)
1438         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1439                                      src_address=self.pg3.local_ip4n,
1440                                      path_mtu=512,
1441                                      template_interval=10)
1442         self.vapi.snat_ipfix(domain_id=self.ipfix_domain_id,
1443                              src_port=self.ipfix_src_port)
1444
1445         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1446              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1447              TCP(sport=3025))
1448         self.pg0.add_stream(p)
1449         self.pg_enable_capture(self.pg_interfaces)
1450         self.pg_start()
1451         capture = self.pg1.get_capture(0)
1452         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1453         capture = self.pg3.get_capture(3)
1454         ipfix = IPFIXDecoder()
1455         # first load template
1456         for p in capture:
1457             self.assertTrue(p.haslayer(IPFIX))
1458             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1459             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1460             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1461             self.assertEqual(p[UDP].dport, 4739)
1462             self.assertEqual(p[IPFIX].observationDomainID,
1463                              self.ipfix_domain_id)
1464             if p.haslayer(Template):
1465                 ipfix.add_template(p.getlayer(Template))
1466         # verify events in data set
1467         for p in capture:
1468             if p.haslayer(Data):
1469                 data = ipfix.decode_data_set(p.getlayer(Set))
1470                 self.verify_ipfix_addr_exhausted(data)
1471
1472     def test_pool_addr_fib(self):
1473         """ S-NAT add pool addresses to FIB """
1474         static_addr = '10.0.0.10'
1475         self.snat_add_address(self.snat_addr)
1476         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1477         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1478                                                  is_inside=0)
1479         self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr)
1480
1481         # SNAT address
1482         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1483              ARP(op=ARP.who_has, pdst=self.snat_addr,
1484                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1485         self.pg1.add_stream(p)
1486         self.pg_enable_capture(self.pg_interfaces)
1487         self.pg_start()
1488         capture = self.pg1.get_capture(1)
1489         self.assertTrue(capture[0].haslayer(ARP))
1490         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1491
1492         # 1:1 NAT address
1493         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1494              ARP(op=ARP.who_has, pdst=static_addr,
1495                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1496         self.pg1.add_stream(p)
1497         self.pg_enable_capture(self.pg_interfaces)
1498         self.pg_start()
1499         capture = self.pg1.get_capture(1)
1500         self.assertTrue(capture[0].haslayer(ARP))
1501         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1502
1503         # send ARP to non-SNAT interface
1504         p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1505              ARP(op=ARP.who_has, pdst=self.snat_addr,
1506                  psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1507         self.pg2.add_stream(p)
1508         self.pg_enable_capture(self.pg_interfaces)
1509         self.pg_start()
1510         capture = self.pg1.get_capture(0)
1511
1512         # remove addresses and verify
1513         self.snat_add_address(self.snat_addr, is_add=0)
1514         self.snat_add_static_mapping(self.pg0.remote_ip4, static_addr,
1515                                      is_add=0)
1516
1517         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1518              ARP(op=ARP.who_has, pdst=self.snat_addr,
1519                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1520         self.pg1.add_stream(p)
1521         self.pg_enable_capture(self.pg_interfaces)
1522         self.pg_start()
1523         capture = self.pg1.get_capture(0)
1524
1525         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1526              ARP(op=ARP.who_has, pdst=static_addr,
1527                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1528         self.pg1.add_stream(p)
1529         self.pg_enable_capture(self.pg_interfaces)
1530         self.pg_start()
1531         capture = self.pg1.get_capture(0)
1532
1533     def test_vrf_mode(self):
1534         """ S-NAT tenant VRF aware address pool mode """
1535
1536         vrf_id1 = 1
1537         vrf_id2 = 2
1538         nat_ip1 = "10.0.0.10"
1539         nat_ip2 = "10.0.0.11"
1540
1541         self.pg0.unconfig_ip4()
1542         self.pg1.unconfig_ip4()
1543         self.pg0.set_table_ip4(vrf_id1)
1544         self.pg1.set_table_ip4(vrf_id2)
1545         self.pg0.config_ip4()
1546         self.pg1.config_ip4()
1547
1548         self.snat_add_address(nat_ip1, vrf_id=vrf_id1)
1549         self.snat_add_address(nat_ip2, vrf_id=vrf_id2)
1550         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1551         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1552         self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
1553                                                  is_inside=0)
1554
1555         # first VRF
1556         pkts = self.create_stream_in(self.pg0, self.pg2)
1557         self.pg0.add_stream(pkts)
1558         self.pg_enable_capture(self.pg_interfaces)
1559         self.pg_start()
1560         capture = self.pg2.get_capture(len(pkts))
1561         self.verify_capture_out(capture, nat_ip1)
1562
1563         # second VRF
1564         pkts = self.create_stream_in(self.pg1, self.pg2)
1565         self.pg1.add_stream(pkts)
1566         self.pg_enable_capture(self.pg_interfaces)
1567         self.pg_start()
1568         capture = self.pg2.get_capture(len(pkts))
1569         self.verify_capture_out(capture, nat_ip2)
1570
1571     def test_vrf_feature_independent(self):
1572         """ S-NAT tenant VRF independent address pool mode """
1573
1574         nat_ip1 = "10.0.0.10"
1575         nat_ip2 = "10.0.0.11"
1576
1577         self.snat_add_address(nat_ip1)
1578         self.snat_add_address(nat_ip2)
1579         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1580         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index)
1581         self.vapi.snat_interface_add_del_feature(self.pg2.sw_if_index,
1582                                                  is_inside=0)
1583
1584         # first VRF
1585         pkts = self.create_stream_in(self.pg0, self.pg2)
1586         self.pg0.add_stream(pkts)
1587         self.pg_enable_capture(self.pg_interfaces)
1588         self.pg_start()
1589         capture = self.pg2.get_capture(len(pkts))
1590         self.verify_capture_out(capture, nat_ip1)
1591
1592         # second VRF
1593         pkts = self.create_stream_in(self.pg1, self.pg2)
1594         self.pg1.add_stream(pkts)
1595         self.pg_enable_capture(self.pg_interfaces)
1596         self.pg_start()
1597         capture = self.pg2.get_capture(len(pkts))
1598         self.verify_capture_out(capture, nat_ip1)
1599
1600     def test_dynamic_ipless_interfaces(self):
1601         """ SNAT interfaces without configured ip dynamic map """
1602
1603         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1604                                       self.pg7.remote_mac,
1605                                       self.pg7.remote_ip4n,
1606                                       is_static=1)
1607         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1608                                       self.pg8.remote_mac,
1609                                       self.pg8.remote_ip4n,
1610                                       is_static=1)
1611
1612         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1613                                    dst_address_length=32,
1614                                    next_hop_address=self.pg7.remote_ip4n,
1615                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1616         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1617                                    dst_address_length=32,
1618                                    next_hop_address=self.pg8.remote_ip4n,
1619                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1620
1621         self.snat_add_address(self.snat_addr)
1622         self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1623         self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1624                                                  is_inside=0)
1625
1626         # in2out
1627         pkts = self.create_stream_in(self.pg7, self.pg8)
1628         self.pg7.add_stream(pkts)
1629         self.pg_enable_capture(self.pg_interfaces)
1630         self.pg_start()
1631         capture = self.pg8.get_capture(len(pkts))
1632         self.verify_capture_out(capture)
1633
1634         # out2in
1635         pkts = self.create_stream_out(self.pg8, self.snat_addr)
1636         self.pg8.add_stream(pkts)
1637         self.pg_enable_capture(self.pg_interfaces)
1638         self.pg_start()
1639         capture = self.pg7.get_capture(len(pkts))
1640         self.verify_capture_in(capture, self.pg7)
1641
1642     def test_static_ipless_interfaces(self):
1643         """ SNAT 1:1 NAT interfaces without configured ip """
1644
1645         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1646                                       self.pg7.remote_mac,
1647                                       self.pg7.remote_ip4n,
1648                                       is_static=1)
1649         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1650                                       self.pg8.remote_mac,
1651                                       self.pg8.remote_ip4n,
1652                                       is_static=1)
1653
1654         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1655                                    dst_address_length=32,
1656                                    next_hop_address=self.pg7.remote_ip4n,
1657                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1658         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1659                                    dst_address_length=32,
1660                                    next_hop_address=self.pg8.remote_ip4n,
1661                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1662
1663         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr)
1664         self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1665         self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1666                                                  is_inside=0)
1667
1668         # out2in
1669         pkts = self.create_stream_out(self.pg8)
1670         self.pg8.add_stream(pkts)
1671         self.pg_enable_capture(self.pg_interfaces)
1672         self.pg_start()
1673         capture = self.pg7.get_capture(len(pkts))
1674         self.verify_capture_in(capture, self.pg7)
1675
1676         # in2out
1677         pkts = self.create_stream_in(self.pg7, self.pg8)
1678         self.pg7.add_stream(pkts)
1679         self.pg_enable_capture(self.pg_interfaces)
1680         self.pg_start()
1681         capture = self.pg8.get_capture(len(pkts))
1682         self.verify_capture_out(capture, self.snat_addr, True)
1683
1684     def test_static_with_port_ipless_interfaces(self):
1685         """ SNAT 1:1 NAT with port interfaces without configured ip """
1686
1687         self.tcp_port_out = 30606
1688         self.udp_port_out = 30607
1689         self.icmp_id_out = 30608
1690
1691         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1692                                       self.pg7.remote_mac,
1693                                       self.pg7.remote_ip4n,
1694                                       is_static=1)
1695         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1696                                       self.pg8.remote_mac,
1697                                       self.pg8.remote_ip4n,
1698                                       is_static=1)
1699
1700         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1701                                    dst_address_length=32,
1702                                    next_hop_address=self.pg7.remote_ip4n,
1703                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1704         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1705                                    dst_address_length=32,
1706                                    next_hop_address=self.pg8.remote_ip4n,
1707                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1708
1709         self.snat_add_address(self.snat_addr)
1710         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1711                                      self.tcp_port_in, self.tcp_port_out,
1712                                      proto=IP_PROTOS.tcp)
1713         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1714                                      self.udp_port_in, self.udp_port_out,
1715                                      proto=IP_PROTOS.udp)
1716         self.snat_add_static_mapping(self.pg7.remote_ip4, self.snat_addr,
1717                                      self.icmp_id_in, self.icmp_id_out,
1718                                      proto=IP_PROTOS.icmp)
1719         self.vapi.snat_interface_add_del_feature(self.pg7.sw_if_index)
1720         self.vapi.snat_interface_add_del_feature(self.pg8.sw_if_index,
1721                                                  is_inside=0)
1722
1723         # out2in
1724         pkts = self.create_stream_out(self.pg8)
1725         self.pg8.add_stream(pkts)
1726         self.pg_enable_capture(self.pg_interfaces)
1727         self.pg_start()
1728         capture = self.pg7.get_capture(len(pkts))
1729         self.verify_capture_in(capture, self.pg7)
1730
1731         # in2out
1732         pkts = self.create_stream_in(self.pg7, self.pg8)
1733         self.pg7.add_stream(pkts)
1734         self.pg_enable_capture(self.pg_interfaces)
1735         self.pg_start()
1736         capture = self.pg8.get_capture(len(pkts))
1737         self.verify_capture_out(capture)
1738
1739     def tearDown(self):
1740         super(TestSNAT, self).tearDown()
1741         if not self.vpp_dead:
1742             self.logger.info(self.vapi.cli("show snat verbose"))
1743             self.clear_snat()
1744
1745
1746 class TestDeterministicNAT(MethodHolder):
1747     """ Deterministic NAT Test Cases """
1748
1749     @classmethod
1750     def setUpConstants(cls):
1751         super(TestDeterministicNAT, cls).setUpConstants()
1752         cls.vpp_cmdline.extend(["snat", "{", "deterministic", "}"])
1753
1754     @classmethod
1755     def setUpClass(cls):
1756         super(TestDeterministicNAT, cls).setUpClass()
1757
1758         try:
1759             cls.tcp_port_in = 6303
1760             cls.tcp_external_port = 6303
1761             cls.udp_port_in = 6304
1762             cls.udp_external_port = 6304
1763             cls.icmp_id_in = 6305
1764             cls.snat_addr = '10.0.0.3'
1765
1766             cls.create_pg_interfaces(range(3))
1767             cls.interfaces = list(cls.pg_interfaces)
1768
1769             for i in cls.interfaces:
1770                 i.admin_up()
1771                 i.config_ip4()
1772                 i.resolve_arp()
1773
1774             cls.pg0.generate_remote_hosts(2)
1775             cls.pg0.configure_ipv4_neighbors()
1776
1777         except Exception:
1778             super(TestDeterministicNAT, cls).tearDownClass()
1779             raise
1780
1781     def create_stream_in(self, in_if, out_if, ttl=64):
1782         """
1783         Create packet stream for inside network
1784
1785         :param in_if: Inside interface
1786         :param out_if: Outside interface
1787         :param ttl: TTL of generated packets
1788         """
1789         pkts = []
1790         # TCP
1791         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1792              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1793              TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
1794         pkts.append(p)
1795
1796         # UDP
1797         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1798              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1799              UDP(sport=self.udp_port_in, dport=self.udp_external_port))
1800         pkts.append(p)
1801
1802         # ICMP
1803         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
1804              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
1805              ICMP(id=self.icmp_id_in, type='echo-request'))
1806         pkts.append(p)
1807
1808         return pkts
1809
1810     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
1811         """
1812         Create packet stream for outside network
1813
1814         :param out_if: Outside interface
1815         :param dst_ip: Destination IP address (Default use global SNAT address)
1816         :param ttl: TTL of generated packets
1817         """
1818         if dst_ip is None:
1819             dst_ip = self.snat_addr
1820         pkts = []
1821         # TCP
1822         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1823              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1824              TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
1825         pkts.append(p)
1826
1827         # UDP
1828         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1829              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1830              UDP(dport=self.udp_port_out, sport=self.udp_external_port))
1831         pkts.append(p)
1832
1833         # ICMP
1834         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
1835              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
1836              ICMP(id=self.icmp_external_id, type='echo-reply'))
1837         pkts.append(p)
1838
1839         return pkts
1840
1841     def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
1842         """
1843         Verify captured packets on outside network
1844
1845         :param capture: Captured packets
1846         :param nat_ip: Translated IP address (Default use global SNAT address)
1847         :param same_port: Sorce port number is not translated (Default False)
1848         :param packet_num: Expected number of packets (Default 3)
1849         """
1850         if nat_ip is None:
1851             nat_ip = self.snat_addr
1852         self.assertEqual(packet_num, len(capture))
1853         for packet in capture:
1854             try:
1855                 self.assertEqual(packet[IP].src, nat_ip)
1856                 if packet.haslayer(TCP):
1857                     self.tcp_port_out = packet[TCP].sport
1858                 elif packet.haslayer(UDP):
1859                     self.udp_port_out = packet[UDP].sport
1860                 else:
1861                     self.icmp_external_id = packet[ICMP].id
1862             except:
1863                 self.logger.error(ppp("Unexpected or invalid packet "
1864                                       "(outside network):", packet))
1865                 raise
1866
1867     def initiate_tcp_session(self, in_if, out_if):
1868         """
1869         Initiates TCP session
1870
1871         :param in_if: Inside interface
1872         :param out_if: Outside interface
1873         """
1874         try:
1875             # SYN packet in->out
1876             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
1877                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
1878                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
1879                      flags="S"))
1880             in_if.add_stream(p)
1881             self.pg_enable_capture(self.pg_interfaces)
1882             self.pg_start()
1883             capture = out_if.get_capture(1)
1884             p = capture[0]
1885             self.tcp_port_out = p[TCP].sport
1886
1887             # SYN + ACK packet out->in
1888             p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
1889                  IP(src=out_if.remote_ip4, dst=self.snat_addr) /
1890                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
1891                      flags="SA"))
1892             out_if.add_stream(p)
1893             self.pg_enable_capture(self.pg_interfaces)
1894             self.pg_start()
1895             in_if.get_capture(1)
1896
1897             # ACK packet in->out
1898             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
1899                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
1900                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
1901                      flags="A"))
1902             in_if.add_stream(p)
1903             self.pg_enable_capture(self.pg_interfaces)
1904             self.pg_start()
1905             out_if.get_capture(1)
1906
1907         except:
1908             self.logger.error("TCP 3 way handshake failed")
1909             raise
1910
1911     def verify_ipfix_max_entries_per_user(self, data):
1912         """
1913         Verify IPFIX maximum entries per user exceeded event
1914
1915         :param data: Decoded IPFIX data records
1916         """
1917         self.assertEqual(1, len(data))
1918         record = data[0]
1919         # natEvent
1920         self.assertEqual(ord(record[230]), 13)
1921         # natQuotaExceededEvent
1922         self.assertEqual('\x03\x00\x00\x00', record[466])
1923         # sourceIPv4Address
1924         self.assertEqual(self.pg0.remote_ip4n, record[8])
1925
1926     def test_deterministic_mode(self):
1927         """ S-NAT run deterministic mode """
1928         in_addr = '172.16.255.0'
1929         out_addr = '172.17.255.50'
1930         in_addr_t = '172.16.255.20'
1931         in_addr_n = socket.inet_aton(in_addr)
1932         out_addr_n = socket.inet_aton(out_addr)
1933         in_addr_t_n = socket.inet_aton(in_addr_t)
1934         in_plen = 24
1935         out_plen = 32
1936
1937         snat_config = self.vapi.snat_show_config()
1938         self.assertEqual(1, snat_config.deterministic)
1939
1940         self.vapi.snat_add_det_map(in_addr_n, in_plen, out_addr_n, out_plen)
1941
1942         rep1 = self.vapi.snat_det_forward(in_addr_t_n)
1943         self.assertEqual(rep1.out_addr[:4], out_addr_n)
1944         rep2 = self.vapi.snat_det_reverse(out_addr_n, rep1.out_port_hi)
1945         self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
1946
1947         deterministic_mappings = self.vapi.snat_det_map_dump()
1948         self.assertEqual(len(deterministic_mappings), 1)
1949         dsm = deterministic_mappings[0]
1950         self.assertEqual(in_addr_n, dsm.in_addr[:4])
1951         self.assertEqual(in_plen, dsm.in_plen)
1952         self.assertEqual(out_addr_n, dsm.out_addr[:4])
1953         self.assertEqual(out_plen, dsm.out_plen)
1954
1955         self.clear_snat()
1956         deterministic_mappings = self.vapi.snat_det_map_dump()
1957         self.assertEqual(len(deterministic_mappings), 0)
1958
1959     def test_set_timeouts(self):
1960         """ Set deterministic NAT timeouts """
1961         timeouts_before = self.vapi.snat_det_get_timeouts()
1962
1963         self.vapi.snat_det_set_timeouts(timeouts_before.udp + 10,
1964                                         timeouts_before.tcp_established + 10,
1965                                         timeouts_before.tcp_transitory + 10,
1966                                         timeouts_before.icmp + 10)
1967
1968         timeouts_after = self.vapi.snat_det_get_timeouts()
1969
1970         self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
1971         self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
1972         self.assertNotEqual(timeouts_before.tcp_established,
1973                             timeouts_after.tcp_established)
1974         self.assertNotEqual(timeouts_before.tcp_transitory,
1975                             timeouts_after.tcp_transitory)
1976
1977     def test_det_in(self):
1978         """ CGNAT translation test (TCP, UDP, ICMP) """
1979
1980         nat_ip = "10.0.0.10"
1981
1982         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
1983                                    32,
1984                                    socket.inet_aton(nat_ip),
1985                                    32)
1986         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
1987         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
1988                                                  is_inside=0)
1989
1990         # in2out
1991         pkts = self.create_stream_in(self.pg0, self.pg1)
1992         self.pg0.add_stream(pkts)
1993         self.pg_enable_capture(self.pg_interfaces)
1994         self.pg_start()
1995         capture = self.pg1.get_capture(len(pkts))
1996         self.verify_capture_out(capture, nat_ip)
1997
1998         # out2in
1999         pkts = self.create_stream_out(self.pg1, nat_ip)
2000         self.pg1.add_stream(pkts)
2001         self.pg_enable_capture(self.pg_interfaces)
2002         self.pg_start()
2003         capture = self.pg0.get_capture(len(pkts))
2004         self.verify_capture_in(capture, self.pg0)
2005
2006         # session dump test
2007         sessions = self.vapi.snat_det_session_dump(self.pg0.remote_ip4n)
2008         self.assertEqual(len(sessions), 3)
2009
2010         # TCP session
2011         s = sessions[0]
2012         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2013         self.assertEqual(s.in_port, self.tcp_port_in)
2014         self.assertEqual(s.out_port, self.tcp_port_out)
2015         self.assertEqual(s.ext_port, self.tcp_external_port)
2016
2017         # UDP session
2018         s = sessions[1]
2019         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2020         self.assertEqual(s.in_port, self.udp_port_in)
2021         self.assertEqual(s.out_port, self.udp_port_out)
2022         self.assertEqual(s.ext_port, self.udp_external_port)
2023
2024         # ICMP session
2025         s = sessions[2]
2026         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2027         self.assertEqual(s.in_port, self.icmp_id_in)
2028         self.assertEqual(s.out_port, self.icmp_external_id)
2029
2030     def test_multiple_users(self):
2031         """ CGNAT multiple users """
2032
2033         nat_ip = "10.0.0.10"
2034         port_in = 80
2035         external_port = 6303
2036
2037         host0 = self.pg0.remote_hosts[0]
2038         host1 = self.pg0.remote_hosts[1]
2039
2040         self.vapi.snat_add_det_map(host0.ip4n,
2041                                    24,
2042                                    socket.inet_aton(nat_ip),
2043                                    32)
2044         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2045         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2046                                                  is_inside=0)
2047
2048         # host0 to out
2049         p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
2050              IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
2051              TCP(sport=port_in, dport=external_port))
2052         self.pg0.add_stream(p)
2053         self.pg_enable_capture(self.pg_interfaces)
2054         self.pg_start()
2055         capture = self.pg1.get_capture(1)
2056         p = capture[0]
2057         try:
2058             ip = p[IP]
2059             tcp = p[TCP]
2060             self.assertEqual(ip.src, nat_ip)
2061             self.assertEqual(ip.dst, self.pg1.remote_ip4)
2062             self.assertEqual(tcp.dport, external_port)
2063             port_out0 = tcp.sport
2064         except:
2065             self.logger.error(ppp("Unexpected or invalid packet:", p))
2066             raise
2067
2068         # host1 to out
2069         p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
2070              IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
2071              TCP(sport=port_in, dport=external_port))
2072         self.pg0.add_stream(p)
2073         self.pg_enable_capture(self.pg_interfaces)
2074         self.pg_start()
2075         capture = self.pg1.get_capture(1)
2076         p = capture[0]
2077         try:
2078             ip = p[IP]
2079             tcp = p[TCP]
2080             self.assertEqual(ip.src, nat_ip)
2081             self.assertEqual(ip.dst, self.pg1.remote_ip4)
2082             self.assertEqual(tcp.dport, external_port)
2083             port_out1 = tcp.sport
2084         except:
2085             self.logger.error(ppp("Unexpected or invalid packet:", p))
2086             raise
2087
2088         dms = self.vapi.snat_det_map_dump()
2089         self.assertEqual(1, len(dms))
2090         self.assertEqual(2, dms[0].ses_num)
2091
2092         # out to host0
2093         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2094              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2095              TCP(sport=external_port, dport=port_out0))
2096         self.pg1.add_stream(p)
2097         self.pg_enable_capture(self.pg_interfaces)
2098         self.pg_start()
2099         capture = self.pg0.get_capture(1)
2100         p = capture[0]
2101         try:
2102             ip = p[IP]
2103             tcp = p[TCP]
2104             self.assertEqual(ip.src, self.pg1.remote_ip4)
2105             self.assertEqual(ip.dst, host0.ip4)
2106             self.assertEqual(tcp.dport, port_in)
2107             self.assertEqual(tcp.sport, external_port)
2108         except:
2109             self.logger.error(ppp("Unexpected or invalid packet:", p))
2110             raise
2111
2112         # out to host1
2113         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2114              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2115              TCP(sport=external_port, dport=port_out1))
2116         self.pg1.add_stream(p)
2117         self.pg_enable_capture(self.pg_interfaces)
2118         self.pg_start()
2119         capture = self.pg0.get_capture(1)
2120         p = capture[0]
2121         try:
2122             ip = p[IP]
2123             tcp = p[TCP]
2124             self.assertEqual(ip.src, self.pg1.remote_ip4)
2125             self.assertEqual(ip.dst, host1.ip4)
2126             self.assertEqual(tcp.dport, port_in)
2127             self.assertEqual(tcp.sport, external_port)
2128         except:
2129             self.logger.error(ppp("Unexpected or invalid packet", p))
2130             raise
2131
2132         # session close api test
2133         self.vapi.snat_det_close_session_out(socket.inet_aton(nat_ip),
2134                                              port_out1,
2135                                              self.pg1.remote_ip4n,
2136                                              external_port)
2137         dms = self.vapi.snat_det_map_dump()
2138         self.assertEqual(dms[0].ses_num, 1)
2139
2140         self.vapi.snat_det_close_session_in(host0.ip4n,
2141                                             port_in,
2142                                             self.pg1.remote_ip4n,
2143                                             external_port)
2144         dms = self.vapi.snat_det_map_dump()
2145         self.assertEqual(dms[0].ses_num, 0)
2146
2147     def test_tcp_session_close_detection_in(self):
2148         """ CGNAT TCP session close initiated from inside network """
2149         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2150                                    32,
2151                                    socket.inet_aton(self.snat_addr),
2152                                    32)
2153         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2154         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2155                                                  is_inside=0)
2156
2157         self.initiate_tcp_session(self.pg0, self.pg1)
2158
2159         # close the session from inside
2160         try:
2161             # FIN packet in -> out
2162             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2163                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2164                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2165                      flags="F"))
2166             self.pg0.add_stream(p)
2167             self.pg_enable_capture(self.pg_interfaces)
2168             self.pg_start()
2169             self.pg1.get_capture(1)
2170
2171             pkts = []
2172
2173             # ACK packet out -> in
2174             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2175                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2176                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2177                      flags="A"))
2178             pkts.append(p)
2179
2180             # FIN packet out -> in
2181             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2182                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2183                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2184                      flags="F"))
2185             pkts.append(p)
2186
2187             self.pg1.add_stream(pkts)
2188             self.pg_enable_capture(self.pg_interfaces)
2189             self.pg_start()
2190             self.pg0.get_capture(2)
2191
2192             # ACK packet in -> out
2193             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2194                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2195                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2196                      flags="A"))
2197             self.pg0.add_stream(p)
2198             self.pg_enable_capture(self.pg_interfaces)
2199             self.pg_start()
2200             self.pg1.get_capture(1)
2201
2202             # Check if snat closed the session
2203             dms = self.vapi.snat_det_map_dump()
2204             self.assertEqual(0, dms[0].ses_num)
2205         except:
2206             self.logger.error("TCP session termination failed")
2207             raise
2208
2209     def test_tcp_session_close_detection_out(self):
2210         """ CGNAT TCP session close initiated from outside network """
2211         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2212                                    32,
2213                                    socket.inet_aton(self.snat_addr),
2214                                    32)
2215         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2216         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2217                                                  is_inside=0)
2218
2219         self.initiate_tcp_session(self.pg0, self.pg1)
2220
2221         # close the session from outside
2222         try:
2223             # FIN packet out -> in
2224             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2225                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2226                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2227                      flags="F"))
2228             self.pg1.add_stream(p)
2229             self.pg_enable_capture(self.pg_interfaces)
2230             self.pg_start()
2231             self.pg0.get_capture(1)
2232
2233             pkts = []
2234
2235             # ACK packet in -> out
2236             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2237                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2238                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2239                      flags="A"))
2240             pkts.append(p)
2241
2242             # ACK packet in -> out
2243             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2244                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2245                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2246                      flags="F"))
2247             pkts.append(p)
2248
2249             self.pg0.add_stream(pkts)
2250             self.pg_enable_capture(self.pg_interfaces)
2251             self.pg_start()
2252             self.pg1.get_capture(2)
2253
2254             # ACK packet out -> in
2255             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2256                  IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
2257                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2258                      flags="A"))
2259             self.pg1.add_stream(p)
2260             self.pg_enable_capture(self.pg_interfaces)
2261             self.pg_start()
2262             self.pg0.get_capture(1)
2263
2264             # Check if snat closed the session
2265             dms = self.vapi.snat_det_map_dump()
2266             self.assertEqual(0, dms[0].ses_num)
2267         except:
2268             self.logger.error("TCP session termination failed")
2269             raise
2270
2271     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2272     def test_session_timeout(self):
2273         """ CGNAT session timeouts """
2274         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2275                                    32,
2276                                    socket.inet_aton(self.snat_addr),
2277                                    32)
2278         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2279         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2280                                                  is_inside=0)
2281
2282         self.initiate_tcp_session(self.pg0, self.pg1)
2283         self.vapi.snat_det_set_timeouts(5, 5, 5, 5)
2284         pkts = self.create_stream_in(self.pg0, self.pg1)
2285         self.pg0.add_stream(pkts)
2286         self.pg_enable_capture(self.pg_interfaces)
2287         self.pg_start()
2288         capture = self.pg1.get_capture(len(pkts))
2289         sleep(15)
2290
2291         dms = self.vapi.snat_det_map_dump()
2292         self.assertEqual(0, dms[0].ses_num)
2293
2294     def test_session_limit_per_user(self):
2295         """ CGNAT maximum 1000 sessions per user should be created """
2296         self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
2297                                    32,
2298                                    socket.inet_aton(self.snat_addr),
2299                                    32)
2300         self.vapi.snat_interface_add_del_feature(self.pg0.sw_if_index)
2301         self.vapi.snat_interface_add_del_feature(self.pg1.sw_if_index,
2302                                                  is_inside=0)
2303         self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
2304                                      src_address=self.pg2.local_ip4n,
2305                                      path_mtu=512,
2306                                      template_interval=10)
2307         self.vapi.snat_ipfix()
2308
2309         pkts = []
2310         for port in range(1025, 2025):
2311             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2312                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2313                  UDP(sport=port, dport=port))
2314             pkts.append(p)
2315
2316         self.pg0.add_stream(pkts)
2317         self.pg_enable_capture(self.pg_interfaces)
2318         self.pg_start()
2319         capture = self.pg1.get_capture(len(pkts))
2320
2321         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2322              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2323              UDP(sport=3001, dport=3002))
2324         self.pg0.add_stream(p)
2325         self.pg_enable_capture(self.pg_interfaces)
2326         self.pg_start()
2327         capture = self.pg1.assert_nothing_captured()
2328
2329         # verify ICMP error packet
2330         capture = self.pg0.get_capture(1)
2331         p = capture[0]
2332         self.assertTrue(p.haslayer(ICMP))
2333         icmp = p[ICMP]
2334         self.assertEqual(icmp.type, 3)
2335         self.assertEqual(icmp.code, 1)
2336         self.assertTrue(icmp.haslayer(IPerror))
2337         inner_ip = icmp[IPerror]
2338         self.assertEqual(inner_ip[UDPerror].sport, 3001)
2339         self.assertEqual(inner_ip[UDPerror].dport, 3002)
2340
2341         dms = self.vapi.snat_det_map_dump()
2342
2343         self.assertEqual(1000, dms[0].ses_num)
2344
2345         # verify IPFIX logging
2346         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2347         capture = self.pg2.get_capture(2)
2348         ipfix = IPFIXDecoder()
2349         # first load template
2350         for p in capture:
2351             self.assertTrue(p.haslayer(IPFIX))
2352             if p.haslayer(Template):
2353                 ipfix.add_template(p.getlayer(Template))
2354         # verify events in data set
2355         for p in capture:
2356             if p.haslayer(Data):
2357                 data = ipfix.decode_data_set(p.getlayer(Set))
2358                 self.verify_ipfix_max_entries_per_user(data)
2359
2360     def clear_snat(self):
2361         """
2362         Clear SNAT configuration.
2363         """
2364         self.vapi.snat_ipfix(enable=0)
2365         self.vapi.snat_det_set_timeouts()
2366         deterministic_mappings = self.vapi.snat_det_map_dump()
2367         for dsm in deterministic_mappings:
2368             self.vapi.snat_add_det_map(dsm.in_addr,
2369                                        dsm.in_plen,
2370                                        dsm.out_addr,
2371                                        dsm.out_plen,
2372                                        is_add=0)
2373
2374         interfaces = self.vapi.snat_interface_dump()
2375         for intf in interfaces:
2376             self.vapi.snat_interface_add_del_feature(intf.sw_if_index,
2377                                                      intf.is_inside,
2378                                                      is_add=0)
2379
2380     def tearDown(self):
2381         super(TestDeterministicNAT, self).tearDown()
2382         if not self.vpp_dead:
2383             self.logger.info(self.vapi.cli("show snat detail"))
2384             self.clear_snat()
2385
2386
class TestNAT64(MethodHolder):
    """ NAT64 Test Cases """

    # Protocols whose BIB and session tables this class dumps; visited in
    # this order wherever every table has to be walked.
    _protos = (IP_PROTOS.tcp, IP_PROTOS.udp, IP_PROTOS.icmp)

    @classmethod
    def setUpClass(cls):
        """
        Create two pg interfaces: pg0 configured for IPv6 and pg1 for IPv4.

        Also sets the default ports/ICMP ids and the NAT pool address used
        by the tests.  On any failure the base tearDownClass is run before
        the exception is re-raised.
        """
        super(TestNAT64, cls).setUpClass()

        try:
            cls.tcp_port_in = 6303
            cls.tcp_port_out = 6303
            cls.udp_port_in = 6304
            cls.udp_port_out = 6304
            cls.icmp_id_in = 6305
            cls.icmp_id_out = 6305
            cls.nat_addr = '10.0.0.3'
            cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)

            cls.create_pg_interfaces(range(2))
            cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
            cls.ip4_interfaces = list(cls.pg_interfaces[1:2])

            for i in cls.ip6_interfaces:
                i.admin_up()
                i.config_ip6()
                i.resolve_ndp()

            for i in cls.ip4_interfaces:
                i.admin_up()
                i.config_ip4()
                i.resolve_arp()

        except Exception:
            super(TestNAT64, cls).tearDownClass()
            raise

    def test_pool(self):
        """ Add/delete address to NAT64 pool """
        nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')

        # a range whose start and end are equal adds a single address
        self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)

        addresses = self.vapi.nat64_pool_addr_dump()
        self.assertEqual(len(addresses), 1)
        self.assertEqual(addresses[0].address, nat_addr)

        self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)

        addresses = self.vapi.nat64_pool_addr_dump()
        self.assertEqual(len(addresses), 0)

    def test_interface(self):
        """ Enable/disable NAT64 feature on the interface """
        self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
        self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

        # both interfaces must be dumped with the right inside/outside flag
        interfaces = self.vapi.nat64_interface_dump()
        self.assertEqual(len(interfaces), 2)
        pg0_found = False
        pg1_found = False
        for intf in interfaces:
            if intf.sw_if_index == self.pg0.sw_if_index:
                self.assertEqual(intf.is_inside, 1)
                pg0_found = True
            elif intf.sw_if_index == self.pg1.sw_if_index:
                self.assertEqual(intf.is_inside, 0)
                pg1_found = True
        self.assertTrue(pg0_found)
        self.assertTrue(pg1_found)

        # the feature names must show up in the CLI output as well
        features = self.vapi.cli("show interface features pg0")
        self.assertNotEqual(features.find('nat64-in2out'), -1)
        features = self.vapi.cli("show interface features pg1")
        self.assertNotEqual(features.find('nat64-out2in'), -1)

        self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
        self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)

        interfaces = self.vapi.nat64_interface_dump()
        self.assertEqual(len(interfaces), 0)

    def test_static_bib(self):
        """ Add/delete static BIB entry """
        in_addr = socket.inet_pton(socket.AF_INET6,
                                   '2001:db8:85a3::8a2e:370:7334')
        out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
        in_port = 1234
        out_port = 5678
        proto = IP_PROTOS.tcp

        self.vapi.nat64_add_del_static_bib(in_addr,
                                           out_addr,
                                           in_port,
                                           out_port,
                                           proto)
        # exactly one static entry, carrying the values just configured
        bib = self.vapi.nat64_bib_dump(proto)
        static_bib_num = 0
        for bibe in bib:
            if bibe.is_static:
                static_bib_num += 1
                self.assertEqual(bibe.i_addr, in_addr)
                self.assertEqual(bibe.o_addr, out_addr)
                self.assertEqual(bibe.i_port, in_port)
                self.assertEqual(bibe.o_port, out_port)
        self.assertEqual(static_bib_num, 1)

        self.vapi.nat64_add_del_static_bib(in_addr,
                                           out_addr,
                                           in_port,
                                           out_port,
                                           proto,
                                           is_add=0)
        # no static entry may be left after the delete
        bib = self.vapi.nat64_bib_dump(proto)
        static_bib_num = 0
        for bibe in bib:
            if bibe.is_static:
                static_bib_num += 1
        self.assertEqual(static_bib_num, 0)

    def test_set_timeouts(self):
        """ Set NAT64 timeouts """
        # verify default values
        timeouts = self.vapi.nat64_get_timeouts()
        self.assertEqual(timeouts.udp, 300)
        self.assertEqual(timeouts.icmp, 60)
        self.assertEqual(timeouts.tcp_trans, 240)
        self.assertEqual(timeouts.tcp_est, 7440)
        self.assertEqual(timeouts.tcp_incoming_syn, 6)

        # set and verify custom values
        self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
                                     tcp_est=7450, tcp_incoming_syn=10)
        timeouts = self.vapi.nat64_get_timeouts()
        self.assertEqual(timeouts.udp, 200)
        self.assertEqual(timeouts.icmp, 30)
        self.assertEqual(timeouts.tcp_trans, 250)
        self.assertEqual(timeouts.tcp_est, 7450)
        self.assertEqual(timeouts.tcp_incoming_syn, 10)

    def test_dynamic(self):
        """ NAT64 dynamic translation test """
        self.tcp_port_in = 6303
        self.udp_port_in = 6304
        self.icmp_id_in = 6305

        ses_num_start = self.nat64_get_ses_num()

        self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                                self.nat_addr_n)
        self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
        self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

        # in2out
        pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(3)
        self.verify_capture_out(capture, packet_num=3, nat_ip=self.nat_addr,
                                dst_ip=self.pg1.remote_ip4)

        # out2in
        pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(3)
        # expected IPv6 source: the IPv4 peer embedded under 64:ff9b::
        ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
        self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)

        # in2out
        pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(3)
        self.verify_capture_out(capture, packet_num=3, nat_ip=self.nat_addr,
                                dst_ip=self.pg1.remote_ip4)

        # out2in
        pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(3)
        ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
        self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)

        ses_num_end = self.nat64_get_ses_num()

        # the second round of traffic must not have added sessions
        self.assertEqual(ses_num_end - ses_num_start, 3)

    def test_static(self):
        """ NAT64 static translation test """
        self.tcp_port_in = 60303
        self.udp_port_in = 60304
        self.icmp_id_in = 60305
        self.tcp_port_out = 60303
        self.udp_port_out = 60304
        self.icmp_id_out = 60305

        ses_num_start = self.nat64_get_ses_num()

        self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                                self.nat_addr_n)
        self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
        self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

        # one static BIB entry per protocol, all for the pg0 host
        self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
                                           self.nat_addr_n,
                                           self.tcp_port_in,
                                           self.tcp_port_out,
                                           IP_PROTOS.tcp)
        self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
                                           self.nat_addr_n,
                                           self.udp_port_in,
                                           self.udp_port_out,
                                           IP_PROTOS.udp)
        self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
                                           self.nat_addr_n,
                                           self.icmp_id_in,
                                           self.icmp_id_out,
                                           IP_PROTOS.icmp)

        # in2out
        pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(3)
        self.verify_capture_out(capture, packet_num=3, nat_ip=self.nat_addr,
                                dst_ip=self.pg1.remote_ip4, same_port=True)

        # out2in
        pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(3)
        ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
        self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)

        ses_num_end = self.nat64_get_ses_num()

        self.assertEqual(ses_num_end - ses_num_start, 3)

    @unittest.skipUnless(running_extended_tests(), "part of extended tests")
    def test_session_timeout(self):
        """ NAT64 session timeout """
        self.icmp_id_in = 1234
        self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                                self.nat_addr_n)
        self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
        self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
        self.vapi.nat64_set_timeouts(icmp=5)

        pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        # packets are not inspected, only their arrival on pg1 is required
        self.pg1.get_capture(3)

        ses_num_before_timeout = self.nat64_get_ses_num()

        sleep(15)

        # ICMP session after timeout
        ses_num_after_timeout = self.nat64_get_ses_num()
        self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)

    def test_icmp_error(self):
        """ NAT64 ICMP Error message translation """
        self.tcp_port_in = 6303
        self.udp_port_in = 6304
        self.icmp_id_in = 6305

        ses_num_start = self.nat64_get_ses_num()

        self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
                                                self.nat_addr_n)
        self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
        self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)

        # send some packets to create sessions
        pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture_ip4 = self.pg1.get_capture(len(pkts))
        self.verify_capture_out(capture_ip4, packet_num=3,
                                nat_ip=self.nat_addr,
                                dst_ip=self.pg1.remote_ip4)

        pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture_ip6 = self.pg0.get_capture(len(pkts))
        ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
        self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
                                   self.pg0.remote_ip6)

        # in2out
        # each ICMPv6 error quotes one of the IPv6 packets received above
        pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
                IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
                ICMPv6DestUnreach(code=1) /
                packet[IPv6] for packet in capture_ip6]
        self.pg0.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg1.get_capture(len(pkts))
        for packet in capture:
            try:
                self.assertEqual(packet[IP].src, self.nat_addr)
                self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
                self.assertEqual(packet[ICMP].type, 3)
                self.assertEqual(packet[ICMP].code, 13)
                inner = packet[IPerror]
                self.assertEqual(inner.src, self.pg1.remote_ip4)
                self.assertEqual(inner.dst, self.nat_addr)
                if inner.haslayer(TCPerror):
                    self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
                elif inner.haslayer(UDPerror):
                    self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
                else:
                    self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
            except Exception:
                # dump the offending packet only for real failures, not for
                # KeyboardInterrupt/SystemExit
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise

        # out2in
        # each ICMP error quotes one of the IPv4 packets received above
        pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
                IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
                ICMP(type=3, code=13) /
                packet[IP] for packet in capture_ip4]
        self.pg1.add_stream(pkts)
        self.pg_enable_capture(self.pg_interfaces)
        self.pg_start()
        capture = self.pg0.get_capture(len(pkts))
        for packet in capture:
            try:
                self.assertEqual(packet[IPv6].src, ip.src)
                self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
                icmp = packet[ICMPv6DestUnreach]
                self.assertEqual(icmp.code, 1)
                inner = icmp[IPerror6]
                self.assertEqual(inner.src, self.pg0.remote_ip6)
                self.assertEqual(inner.dst, ip.src)
                if inner.haslayer(TCPerror):
                    self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
                elif inner.haslayer(UDPerror):
                    self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
                else:
                    self.assertEqual(inner[ICMPv6EchoRequest].id,
                                     self.icmp_id_in)
            except Exception:
                self.logger.error(ppp("Unexpected or invalid packet:", packet))
                raise

    def nat64_get_ses_num(self):
        """
        Return number of active NAT64 sessions.

        The result is the summed length of the TCP, UDP and ICMP session
        table dumps.
        """
        return sum(len(self.vapi.nat64_st_dump(proto))
                   for proto in self._protos)

    def clear_nat64(self):
        """
        Clear NAT64 configuration.

        Calls nat64_set_timeouts() without arguments, then removes every
        dumped interface binding, every static BIB entry of each protocol
        and every dumped pool address.
        """
        self.vapi.nat64_set_timeouts()

        interfaces = self.vapi.nat64_interface_dump()
        for intf in interfaces:
            self.vapi.nat64_add_del_interface(intf.sw_if_index,
                                              intf.is_inside,
                                              is_add=0)

        # only static entries are deleted explicitly; dynamic ones are
        # skipped
        for proto in self._protos:
            for bibe in self.vapi.nat64_bib_dump(proto):
                if bibe.is_static:
                    self.vapi.nat64_add_del_static_bib(bibe.i_addr,
                                                       bibe.o_addr,
                                                       bibe.i_port,
                                                       bibe.o_port,
                                                       bibe.proto,
                                                       bibe.vrf_id,
                                                       is_add=0)

        addresses = self.vapi.nat64_pool_addr_dump()
        for addr in addresses:
            self.vapi.nat64_add_del_pool_addr_range(addr.address,
                                                    addr.address,
                                                    is_add=0)

    def tearDown(self):
        """
        Run the base tearDown, then log the NAT64 state and clear it.

        Nothing is queried or cleared when VPP is flagged as dead.
        """
        super(TestNAT64, self).tearDown()
        if not self.vpp_dead:
            self.logger.info(self.vapi.cli("show nat64 pool"))
            self.logger.info(self.vapi.cli("show nat64 interfaces"))
            self.logger.info(self.vapi.cli("show nat64 bib tcp"))
            self.logger.info(self.vapi.cli("show nat64 bib udp"))
            self.logger.info(self.vapi.cli("show nat64 bib icmp"))
            self.logger.info(self.vapi.cli("show nat64 session table tcp"))
            self.logger.info(self.vapi.cli("show nat64 session table udp"))
            self.logger.info(self.vapi.cli("show nat64 session table icmp"))
            self.clear_nat64()
2821
if __name__ == "__main__":
    # Executed as a script: run this module's tests with the VPP runner.
    unittest.main(testRunner=VppTestRunner)