FIB table add/delete API
[vpp.git] / test / test_nat.py
1 #!/usr/bin/env python
2
3 import socket
4 import unittest
5 import struct
6
7 from framework import VppTestCase, VppTestRunner, running_extended_tests
8 from scapy.layers.inet import IP, TCP, UDP, ICMP
9 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
10 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
11 from scapy.layers.inet6 import ICMPv6DestUnreach, IPerror6
12 from scapy.layers.l2 import Ether, ARP, GRE
13 from scapy.data import IP_PROTOS
14 from scapy.packet import bind_layers
15 from util import ppp
16 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
17 from time import sleep
18 from util import ip4_range
19
20
21 class MethodHolder(VppTestCase):
22     """ NAT create capture and verify method holder """
23
24     @classmethod
25     def setUpClass(cls):
26         super(MethodHolder, cls).setUpClass()
27
28     def tearDown(self):
29         super(MethodHolder, self).tearDown()
30
31     def check_ip_checksum(self, pkt):
32         """
33         Check IP checksum of the packet
34
35         :param pkt: Packet to check IP checksum
36         """
37         new = pkt.__class__(str(pkt))
38         del new['IP'].chksum
39         new = new.__class__(str(new))
40         self.assertEqual(new['IP'].chksum, pkt['IP'].chksum)
41
42     def check_tcp_checksum(self, pkt):
43         """
44         Check TCP checksum in IP packet
45
46         :param pkt: Packet to check TCP checksum
47         """
48         new = pkt.__class__(str(pkt))
49         del new['TCP'].chksum
50         new = new.__class__(str(new))
51         self.assertEqual(new['TCP'].chksum, pkt['TCP'].chksum)
52
53     def check_udp_checksum(self, pkt):
54         """
55         Check UDP checksum in IP packet
56
57         :param pkt: Packet to check UDP checksum
58         """
59         new = pkt.__class__(str(pkt))
60         del new['UDP'].chksum
61         new = new.__class__(str(new))
62         self.assertEqual(new['UDP'].chksum, pkt['UDP'].chksum)
63
64     def check_icmp_errror_embedded(self, pkt):
65         """
66         Check ICMP error embeded packet checksum
67
68         :param pkt: Packet to check ICMP error embeded packet checksum
69         """
70         if pkt.haslayer(IPerror):
71             new = pkt.__class__(str(pkt))
72             del new['IPerror'].chksum
73             new = new.__class__(str(new))
74             self.assertEqual(new['IPerror'].chksum, pkt['IPerror'].chksum)
75
76         if pkt.haslayer(TCPerror):
77             new = pkt.__class__(str(pkt))
78             del new['TCPerror'].chksum
79             new = new.__class__(str(new))
80             self.assertEqual(new['TCPerror'].chksum, pkt['TCPerror'].chksum)
81
82         if pkt.haslayer(UDPerror):
83             if pkt['UDPerror'].chksum != 0:
84                 new = pkt.__class__(str(pkt))
85                 del new['UDPerror'].chksum
86                 new = new.__class__(str(new))
87                 self.assertEqual(new['UDPerror'].chksum,
88                                  pkt['UDPerror'].chksum)
89
90         if pkt.haslayer(ICMPerror):
91             del new['ICMPerror'].chksum
92             new = new.__class__(str(new))
93             self.assertEqual(new['ICMPerror'].chksum, pkt['ICMPerror'].chksum)
94
95     def check_icmp_checksum(self, pkt):
96         """
97         Check ICMP checksum in IPv4 packet
98
99         :param pkt: Packet to check ICMP checksum
100         """
101         new = pkt.__class__(str(pkt))
102         del new['ICMP'].chksum
103         new = new.__class__(str(new))
104         self.assertEqual(new['ICMP'].chksum, pkt['ICMP'].chksum)
105         if pkt.haslayer(IPerror):
106             self.check_icmp_errror_embedded(pkt)
107
108     def check_icmpv6_checksum(self, pkt):
109         """
110         Check ICMPv6 checksum in IPv4 packet
111
112         :param pkt: Packet to check ICMPv6 checksum
113         """
114         new = pkt.__class__(str(pkt))
115         if pkt.haslayer(ICMPv6DestUnreach):
116             del new['ICMPv6DestUnreach'].cksum
117             new = new.__class__(str(new))
118             self.assertEqual(new['ICMPv6DestUnreach'].cksum,
119                              pkt['ICMPv6DestUnreach'].cksum)
120             self.check_icmp_errror_embedded(pkt)
121         if pkt.haslayer(ICMPv6EchoRequest):
122             del new['ICMPv6EchoRequest'].cksum
123             new = new.__class__(str(new))
124             self.assertEqual(new['ICMPv6EchoRequest'].cksum,
125                              pkt['ICMPv6EchoRequest'].cksum)
126         if pkt.haslayer(ICMPv6EchoReply):
127             del new['ICMPv6EchoReply'].cksum
128             new = new.__class__(str(new))
129             self.assertEqual(new['ICMPv6EchoReply'].cksum,
130                              pkt['ICMPv6EchoReply'].cksum)
131
132     def create_stream_in(self, in_if, out_if, ttl=64):
133         """
134         Create packet stream for inside network
135
136         :param in_if: Inside interface
137         :param out_if: Outside interface
138         :param ttl: TTL of generated packets
139         """
140         pkts = []
141         # TCP
142         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
143              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
144              TCP(sport=self.tcp_port_in, dport=20))
145         pkts.append(p)
146
147         # UDP
148         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
149              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
150              UDP(sport=self.udp_port_in, dport=20))
151         pkts.append(p)
152
153         # ICMP
154         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
155              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
156              ICMP(id=self.icmp_id_in, type='echo-request'))
157         pkts.append(p)
158
159         return pkts
160
161     def compose_ip6(self, ip4, pref, plen):
162         """
163         Compose IPv4-embedded IPv6 addresses
164
165         :param ip4: IPv4 address
166         :param pref: IPv6 prefix
167         :param plen: IPv6 prefix length
168         :returns: IPv4-embedded IPv6 addresses
169         """
170         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
171         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
172         if plen == 32:
173             pref_n[4] = ip4_n[0]
174             pref_n[5] = ip4_n[1]
175             pref_n[6] = ip4_n[2]
176             pref_n[7] = ip4_n[3]
177         elif plen == 40:
178             pref_n[5] = ip4_n[0]
179             pref_n[6] = ip4_n[1]
180             pref_n[7] = ip4_n[2]
181             pref_n[9] = ip4_n[3]
182         elif plen == 48:
183             pref_n[6] = ip4_n[0]
184             pref_n[7] = ip4_n[1]
185             pref_n[9] = ip4_n[2]
186             pref_n[10] = ip4_n[3]
187         elif plen == 56:
188             pref_n[7] = ip4_n[0]
189             pref_n[9] = ip4_n[1]
190             pref_n[10] = ip4_n[2]
191             pref_n[11] = ip4_n[3]
192         elif plen == 64:
193             pref_n[9] = ip4_n[0]
194             pref_n[10] = ip4_n[1]
195             pref_n[11] = ip4_n[2]
196             pref_n[12] = ip4_n[3]
197         elif plen == 96:
198             pref_n[12] = ip4_n[0]
199             pref_n[13] = ip4_n[1]
200             pref_n[14] = ip4_n[2]
201             pref_n[15] = ip4_n[3]
202         return socket.inet_ntop(socket.AF_INET6, ''.join(pref_n))
203
204     def create_stream_in_ip6(self, in_if, out_if, hlim=64, pref=None, plen=0):
205         """
206         Create IPv6 packet stream for inside network
207
208         :param in_if: Inside interface
209         :param out_if: Outside interface
210         :param ttl: Hop Limit of generated packets
211         :param pref: NAT64 prefix
212         :param plen: NAT64 prefix length
213         """
214         pkts = []
215         if pref is None:
216             dst = ''.join(['64:ff9b::', out_if.remote_ip4])
217         else:
218             dst = self.compose_ip6(out_if.remote_ip4, pref, plen)
219
220         # TCP
221         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
222              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
223              TCP(sport=self.tcp_port_in, dport=20))
224         pkts.append(p)
225
226         # UDP
227         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
228              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
229              UDP(sport=self.udp_port_in, dport=20))
230         pkts.append(p)
231
232         # ICMP
233         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
234              IPv6(src=in_if.remote_ip6, dst=dst, hlim=hlim) /
235              ICMPv6EchoRequest(id=self.icmp_id_in))
236         pkts.append(p)
237
238         return pkts
239
240     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
241         """
242         Create packet stream for outside network
243
244         :param out_if: Outside interface
245         :param dst_ip: Destination IP address (Default use global NAT address)
246         :param ttl: TTL of generated packets
247         """
248         if dst_ip is None:
249             dst_ip = self.nat_addr
250         pkts = []
251         # TCP
252         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
253              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
254              TCP(dport=self.tcp_port_out, sport=20))
255         pkts.append(p)
256
257         # UDP
258         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
259              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
260              UDP(dport=self.udp_port_out, sport=20))
261         pkts.append(p)
262
263         # ICMP
264         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
265              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
266              ICMP(id=self.icmp_id_out, type='echo-reply'))
267         pkts.append(p)
268
269         return pkts
270
271     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
272                            packet_num=3, dst_ip=None):
273         """
274         Verify captured packets on outside network
275
276         :param capture: Captured packets
277         :param nat_ip: Translated IP address (Default use global NAT address)
278         :param same_port: Sorce port number is not translated (Default False)
279         :param packet_num: Expected number of packets (Default 3)
280         :param dst_ip: Destination IP address (Default do not verify)
281         """
282         if nat_ip is None:
283             nat_ip = self.nat_addr
284         self.assertEqual(packet_num, len(capture))
285         for packet in capture:
286             try:
287                 self.check_ip_checksum(packet)
288                 self.assertEqual(packet[IP].src, nat_ip)
289                 if dst_ip is not None:
290                     self.assertEqual(packet[IP].dst, dst_ip)
291                 if packet.haslayer(TCP):
292                     if same_port:
293                         self.assertEqual(packet[TCP].sport, self.tcp_port_in)
294                     else:
295                         self.assertNotEqual(
296                             packet[TCP].sport, self.tcp_port_in)
297                     self.tcp_port_out = packet[TCP].sport
298                     self.check_tcp_checksum(packet)
299                 elif packet.haslayer(UDP):
300                     if same_port:
301                         self.assertEqual(packet[UDP].sport, self.udp_port_in)
302                     else:
303                         self.assertNotEqual(
304                             packet[UDP].sport, self.udp_port_in)
305                     self.udp_port_out = packet[UDP].sport
306                 else:
307                     if same_port:
308                         self.assertEqual(packet[ICMP].id, self.icmp_id_in)
309                     else:
310                         self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
311                     self.icmp_id_out = packet[ICMP].id
312                     self.check_icmp_checksum(packet)
313             except:
314                 self.logger.error(ppp("Unexpected or invalid packet "
315                                       "(outside network):", packet))
316                 raise
317
318     def verify_capture_in(self, capture, in_if, packet_num=3):
319         """
320         Verify captured packets on inside network
321
322         :param capture: Captured packets
323         :param in_if: Inside interface
324         :param packet_num: Expected number of packets (Default 3)
325         """
326         self.assertEqual(packet_num, len(capture))
327         for packet in capture:
328             try:
329                 self.check_ip_checksum(packet)
330                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
331                 if packet.haslayer(TCP):
332                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
333                     self.check_tcp_checksum(packet)
334                 elif packet.haslayer(UDP):
335                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
336                 else:
337                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
338                     self.check_icmp_checksum(packet)
339             except:
340                 self.logger.error(ppp("Unexpected or invalid packet "
341                                       "(inside network):", packet))
342                 raise
343
344     def verify_capture_in_ip6(self, capture, src_ip, dst_ip, packet_num=3):
345         """
346         Verify captured IPv6 packets on inside network
347
348         :param capture: Captured packets
349         :param src_ip: Source IP
350         :param dst_ip: Destination IP address
351         :param packet_num: Expected number of packets (Default 3)
352         """
353         self.assertEqual(packet_num, len(capture))
354         for packet in capture:
355             try:
356                 self.assertEqual(packet[IPv6].src, src_ip)
357                 self.assertEqual(packet[IPv6].dst, dst_ip)
358                 if packet.haslayer(TCP):
359                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
360                     self.check_tcp_checksum(packet)
361                 elif packet.haslayer(UDP):
362                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
363                     self.check_udp_checksum(packet)
364                 else:
365                     self.assertEqual(packet[ICMPv6EchoReply].id,
366                                      self.icmp_id_in)
367                     self.check_icmpv6_checksum(packet)
368             except:
369                 self.logger.error(ppp("Unexpected or invalid packet "
370                                       "(inside network):", packet))
371                 raise
372
373     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
374         """
375         Verify captured packet that don't have to be translated
376
377         :param capture: Captured packets
378         :param ingress_if: Ingress interface
379         :param egress_if: Egress interface
380         """
381         for packet in capture:
382             try:
383                 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
384                 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
385                 if packet.haslayer(TCP):
386                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
387                 elif packet.haslayer(UDP):
388                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
389                 else:
390                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
391             except:
392                 self.logger.error(ppp("Unexpected or invalid packet "
393                                       "(inside network):", packet))
394                 raise
395
396     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
397                                             packet_num=3, icmp_type=11):
398         """
399         Verify captured packets with ICMP errors on outside network
400
401         :param capture: Captured packets
402         :param src_ip: Translated IP address or IP address of VPP
403                        (Default use global NAT address)
404         :param packet_num: Expected number of packets (Default 3)
405         :param icmp_type: Type of error ICMP packet
406                           we are expecting (Default 11)
407         """
408         if src_ip is None:
409             src_ip = self.nat_addr
410         self.assertEqual(packet_num, len(capture))
411         for packet in capture:
412             try:
413                 self.assertEqual(packet[IP].src, src_ip)
414                 self.assertTrue(packet.haslayer(ICMP))
415                 icmp = packet[ICMP]
416                 self.assertEqual(icmp.type, icmp_type)
417                 self.assertTrue(icmp.haslayer(IPerror))
418                 inner_ip = icmp[IPerror]
419                 if inner_ip.haslayer(TCPerror):
420                     self.assertEqual(inner_ip[TCPerror].dport,
421                                      self.tcp_port_out)
422                 elif inner_ip.haslayer(UDPerror):
423                     self.assertEqual(inner_ip[UDPerror].dport,
424                                      self.udp_port_out)
425                 else:
426                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
427             except:
428                 self.logger.error(ppp("Unexpected or invalid packet "
429                                       "(outside network):", packet))
430                 raise
431
432     def verify_capture_in_with_icmp_errors(self, capture, in_if, packet_num=3,
433                                            icmp_type=11):
434         """
435         Verify captured packets with ICMP errors on inside network
436
437         :param capture: Captured packets
438         :param in_if: Inside interface
439         :param packet_num: Expected number of packets (Default 3)
440         :param icmp_type: Type of error ICMP packet
441                           we are expecting (Default 11)
442         """
443         self.assertEqual(packet_num, len(capture))
444         for packet in capture:
445             try:
446                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
447                 self.assertTrue(packet.haslayer(ICMP))
448                 icmp = packet[ICMP]
449                 self.assertEqual(icmp.type, icmp_type)
450                 self.assertTrue(icmp.haslayer(IPerror))
451                 inner_ip = icmp[IPerror]
452                 if inner_ip.haslayer(TCPerror):
453                     self.assertEqual(inner_ip[TCPerror].sport,
454                                      self.tcp_port_in)
455                 elif inner_ip.haslayer(UDPerror):
456                     self.assertEqual(inner_ip[UDPerror].sport,
457                                      self.udp_port_in)
458                 else:
459                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
460             except:
461                 self.logger.error(ppp("Unexpected or invalid packet "
462                                       "(inside network):", packet))
463                 raise
464
465     def verify_ipfix_nat44_ses(self, data):
466         """
467         Verify IPFIX NAT44 session create/delete event
468
469         :param data: Decoded IPFIX data records
470         """
471         nat44_ses_create_num = 0
472         nat44_ses_delete_num = 0
473         self.assertEqual(6, len(data))
474         for record in data:
475             # natEvent
476             self.assertIn(ord(record[230]), [4, 5])
477             if ord(record[230]) == 4:
478                 nat44_ses_create_num += 1
479             else:
480                 nat44_ses_delete_num += 1
481             # sourceIPv4Address
482             self.assertEqual(self.pg0.remote_ip4n, record[8])
483             # postNATSourceIPv4Address
484             self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
485                              record[225])
486             # ingressVRFID
487             self.assertEqual(struct.pack("!I", 0), record[234])
488             # protocolIdentifier/sourceTransportPort/postNAPTSourceTransportPort
489             if IP_PROTOS.icmp == ord(record[4]):
490                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
491                 self.assertEqual(struct.pack("!H", self.icmp_id_out),
492                                  record[227])
493             elif IP_PROTOS.tcp == ord(record[4]):
494                 self.assertEqual(struct.pack("!H", self.tcp_port_in),
495                                  record[7])
496                 self.assertEqual(struct.pack("!H", self.tcp_port_out),
497                                  record[227])
498             elif IP_PROTOS.udp == ord(record[4]):
499                 self.assertEqual(struct.pack("!H", self.udp_port_in),
500                                  record[7])
501                 self.assertEqual(struct.pack("!H", self.udp_port_out),
502                                  record[227])
503             else:
504                 self.fail("Invalid protocol")
505         self.assertEqual(3, nat44_ses_create_num)
506         self.assertEqual(3, nat44_ses_delete_num)
507
508     def verify_ipfix_addr_exhausted(self, data):
509         """
510         Verify IPFIX NAT addresses event
511
512         :param data: Decoded IPFIX data records
513         """
514         self.assertEqual(1, len(data))
515         record = data[0]
516         # natEvent
517         self.assertEqual(ord(record[230]), 3)
518         # natPoolID
519         self.assertEqual(struct.pack("!I", 0), record[283])
520
521
522 class TestNAT44(MethodHolder):
523     """ NAT44 Test Cases """
524
525     @classmethod
526     def setUpClass(cls):
527         super(TestNAT44, cls).setUpClass()
528
529         try:
530             cls.tcp_port_in = 6303
531             cls.tcp_port_out = 6303
532             cls.udp_port_in = 6304
533             cls.udp_port_out = 6304
534             cls.icmp_id_in = 6305
535             cls.icmp_id_out = 6305
536             cls.nat_addr = '10.0.0.3'
537             cls.ipfix_src_port = 4739
538             cls.ipfix_domain_id = 1
539
540             cls.create_pg_interfaces(range(9))
541             cls.interfaces = list(cls.pg_interfaces[0:4])
542
543             for i in cls.interfaces:
544                 i.admin_up()
545                 i.config_ip4()
546                 i.resolve_arp()
547
548             cls.pg0.generate_remote_hosts(3)
549             cls.pg0.configure_ipv4_neighbors()
550
551             cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
552             cls.vapi.ip_table_add_del(10, is_add=1)
553             cls.vapi.ip_table_add_del(20, is_add=1)
554
555             cls.pg4._local_ip4 = "172.16.255.1"
556             cls.pg4._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
557             cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
558             cls.pg4.set_table_ip4(10)
559             cls.pg5._local_ip4 = "172.17.255.3"
560             cls.pg5._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
561             cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
562             cls.pg5.set_table_ip4(10)
563             cls.pg6._local_ip4 = "172.16.255.1"
564             cls.pg6._local_ip4n = socket.inet_pton(socket.AF_INET, i.local_ip4)
565             cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
566             cls.pg6.set_table_ip4(20)
567             for i in cls.overlapping_interfaces:
568                 i.config_ip4()
569                 i.admin_up()
570                 i.resolve_arp()
571
572             cls.pg7.admin_up()
573             cls.pg8.admin_up()
574
575         except Exception:
576             super(TestNAT44, cls).tearDownClass()
577             raise
578
579     def clear_nat44(self):
580         """
581         Clear NAT44 configuration.
582         """
583         # I found no elegant way to do this
584         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
585                                    dst_address_length=32,
586                                    next_hop_address=self.pg7.remote_ip4n,
587                                    next_hop_sw_if_index=self.pg7.sw_if_index,
588                                    is_add=0)
589         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
590                                    dst_address_length=32,
591                                    next_hop_address=self.pg8.remote_ip4n,
592                                    next_hop_sw_if_index=self.pg8.sw_if_index,
593                                    is_add=0)
594
595         for intf in [self.pg7, self.pg8]:
596             neighbors = self.vapi.ip_neighbor_dump(intf.sw_if_index)
597             for n in neighbors:
598                 self.vapi.ip_neighbor_add_del(intf.sw_if_index,
599                                               n.mac_address,
600                                               n.ip_address,
601                                               is_add=0)
602
603         if self.pg7.has_ip4_config:
604             self.pg7.unconfig_ip4()
605
606         interfaces = self.vapi.nat44_interface_addr_dump()
607         for intf in interfaces:
608             self.vapi.nat44_add_interface_addr(intf.sw_if_index, is_add=0)
609
610         self.vapi.nat_ipfix(enable=0, src_port=self.ipfix_src_port,
611                             domain_id=self.ipfix_domain_id)
612         self.ipfix_src_port = 4739
613         self.ipfix_domain_id = 1
614
615         interfaces = self.vapi.nat44_interface_dump()
616         for intf in interfaces:
617             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
618                                                       intf.is_inside,
619                                                       is_add=0)
620
621         interfaces = self.vapi.nat44_interface_output_feature_dump()
622         for intf in interfaces:
623             self.vapi.nat44_interface_add_del_output_feature(intf.sw_if_index,
624                                                              intf.is_inside,
625                                                              is_add=0)
626
627         static_mappings = self.vapi.nat44_static_mapping_dump()
628         for sm in static_mappings:
629             self.vapi.nat44_add_del_static_mapping(
630                 sm.local_ip_address,
631                 sm.external_ip_address,
632                 local_port=sm.local_port,
633                 external_port=sm.external_port,
634                 addr_only=sm.addr_only,
635                 vrf_id=sm.vrf_id,
636                 protocol=sm.protocol,
637                 is_add=0)
638
639         lb_static_mappings = self.vapi.nat44_lb_static_mapping_dump()
640         for lb_sm in lb_static_mappings:
641             self.vapi.nat44_add_del_lb_static_mapping(
642                 lb_sm.external_addr,
643                 lb_sm.external_port,
644                 lb_sm.protocol,
645                 lb_sm.vrf_id,
646                 is_add=0)
647
648         adresses = self.vapi.nat44_address_dump()
649         for addr in adresses:
650             self.vapi.nat44_add_del_address_range(addr.ip_address,
651                                                   addr.ip_address,
652                                                   is_add=0)
653
654     def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
655                                  local_port=0, external_port=0, vrf_id=0,
656                                  is_add=1, external_sw_if_index=0xFFFFFFFF,
657                                  proto=0):
658         """
659         Add/delete NAT44 static mapping
660
661         :param local_ip: Local IP address
662         :param external_ip: External IP address
663         :param local_port: Local port number (Optional)
664         :param external_port: External port number (Optional)
665         :param vrf_id: VRF ID (Default 0)
666         :param is_add: 1 if add, 0 if delete (Default add)
667         :param external_sw_if_index: External interface instead of IP address
668         :param proto: IP protocol (Mandatory if port specified)
669         """
670         addr_only = 1
671         if local_port and external_port:
672             addr_only = 0
673         l_ip = socket.inet_pton(socket.AF_INET, local_ip)
674         e_ip = socket.inet_pton(socket.AF_INET, external_ip)
675         self.vapi.nat44_add_del_static_mapping(
676             l_ip,
677             e_ip,
678             external_sw_if_index,
679             local_port,
680             external_port,
681             addr_only,
682             vrf_id,
683             proto,
684             is_add)
685
686     def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
687         """
688         Add/delete NAT44 address
689
690         :param ip: IP address
691         :param is_add: 1 if add, 0 if delete (Default add)
692         """
693         nat_addr = socket.inet_pton(socket.AF_INET, ip)
694         self.vapi.nat44_add_del_address_range(nat_addr, nat_addr, is_add,
695                                               vrf_id=vrf_id)
696
697     def test_dynamic(self):
698         """ NAT44 dynamic translation test """
699
700         self.nat44_add_address(self.nat_addr)
701         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
702         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
703                                                   is_inside=0)
704
705         # in2out
706         pkts = self.create_stream_in(self.pg0, self.pg1)
707         self.pg0.add_stream(pkts)
708         self.pg_enable_capture(self.pg_interfaces)
709         self.pg_start()
710         capture = self.pg1.get_capture(len(pkts))
711         self.verify_capture_out(capture)
712
713         # out2in
714         pkts = self.create_stream_out(self.pg1)
715         self.pg1.add_stream(pkts)
716         self.pg_enable_capture(self.pg_interfaces)
717         self.pg_start()
718         capture = self.pg0.get_capture(len(pkts))
719         self.verify_capture_in(capture, self.pg0)
720
721     def test_dynamic_icmp_errors_in2out_ttl_1(self):
722         """ NAT44 handling of client packets with TTL=1 """
723
724         self.nat44_add_address(self.nat_addr)
725         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
726         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
727                                                   is_inside=0)
728
729         # Client side - generate traffic
730         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
731         self.pg0.add_stream(pkts)
732         self.pg_enable_capture(self.pg_interfaces)
733         self.pg_start()
734
735         # Client side - verify ICMP type 11 packets
736         capture = self.pg0.get_capture(len(pkts))
737         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
738
739     def test_dynamic_icmp_errors_out2in_ttl_1(self):
740         """ NAT44 handling of server packets with TTL=1 """
741
742         self.nat44_add_address(self.nat_addr)
743         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
744         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
745                                                   is_inside=0)
746
747         # Client side - create sessions
748         pkts = self.create_stream_in(self.pg0, self.pg1)
749         self.pg0.add_stream(pkts)
750         self.pg_enable_capture(self.pg_interfaces)
751         self.pg_start()
752
753         # Server side - generate traffic
754         capture = self.pg1.get_capture(len(pkts))
755         self.verify_capture_out(capture)
756         pkts = self.create_stream_out(self.pg1, ttl=1)
757         self.pg1.add_stream(pkts)
758         self.pg_enable_capture(self.pg_interfaces)
759         self.pg_start()
760
761         # Server side - verify ICMP type 11 packets
762         capture = self.pg1.get_capture(len(pkts))
763         self.verify_capture_out_with_icmp_errors(capture,
764                                                  src_ip=self.pg1.local_ip4)
765
766     def test_dynamic_icmp_errors_in2out_ttl_2(self):
767         """ NAT44 handling of error responses to client packets with TTL=2 """
768
769         self.nat44_add_address(self.nat_addr)
770         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
771         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
772                                                   is_inside=0)
773
774         # Client side - generate traffic
775         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
776         self.pg0.add_stream(pkts)
777         self.pg_enable_capture(self.pg_interfaces)
778         self.pg_start()
779
780         # Server side - simulate ICMP type 11 response
781         capture = self.pg1.get_capture(len(pkts))
782         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
783                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
784                 ICMP(type=11) / packet[IP] for packet in capture]
785         self.pg1.add_stream(pkts)
786         self.pg_enable_capture(self.pg_interfaces)
787         self.pg_start()
788
789         # Client side - verify ICMP type 11 packets
790         capture = self.pg0.get_capture(len(pkts))
791         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
792
793     def test_dynamic_icmp_errors_out2in_ttl_2(self):
794         """ NAT44 handling of error responses to server packets with TTL=2 """
795
796         self.nat44_add_address(self.nat_addr)
797         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
798         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
799                                                   is_inside=0)
800
801         # Client side - create sessions
802         pkts = self.create_stream_in(self.pg0, self.pg1)
803         self.pg0.add_stream(pkts)
804         self.pg_enable_capture(self.pg_interfaces)
805         self.pg_start()
806
807         # Server side - generate traffic
808         capture = self.pg1.get_capture(len(pkts))
809         self.verify_capture_out(capture)
810         pkts = self.create_stream_out(self.pg1, ttl=2)
811         self.pg1.add_stream(pkts)
812         self.pg_enable_capture(self.pg_interfaces)
813         self.pg_start()
814
815         # Client side - simulate ICMP type 11 response
816         capture = self.pg0.get_capture(len(pkts))
817         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
818                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
819                 ICMP(type=11) / packet[IP] for packet in capture]
820         self.pg0.add_stream(pkts)
821         self.pg_enable_capture(self.pg_interfaces)
822         self.pg_start()
823
824         # Server side - verify ICMP type 11 packets
825         capture = self.pg1.get_capture(len(pkts))
826         self.verify_capture_out_with_icmp_errors(capture)
827
828     def test_ping_out_interface_from_outside(self):
829         """ Ping NAT44 out interface from outside network """
830
831         self.nat44_add_address(self.nat_addr)
832         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
833         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
834                                                   is_inside=0)
835
836         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
837              IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
838              ICMP(id=self.icmp_id_out, type='echo-request'))
839         pkts = [p]
840         self.pg1.add_stream(pkts)
841         self.pg_enable_capture(self.pg_interfaces)
842         self.pg_start()
843         capture = self.pg1.get_capture(len(pkts))
844         self.assertEqual(1, len(capture))
845         packet = capture[0]
846         try:
847             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
848             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
849             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
850             self.assertEqual(packet[ICMP].type, 0)  # echo reply
851         except:
852             self.logger.error(ppp("Unexpected or invalid packet "
853                                   "(outside network):", packet))
854             raise
855
856     def test_ping_internal_host_from_outside(self):
857         """ Ping internal host from outside network """
858
859         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
860         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
861         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
862                                                   is_inside=0)
863
864         # out2in
865         pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
866                IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
867                ICMP(id=self.icmp_id_out, type='echo-request'))
868         self.pg1.add_stream(pkt)
869         self.pg_enable_capture(self.pg_interfaces)
870         self.pg_start()
871         capture = self.pg0.get_capture(1)
872         self.verify_capture_in(capture, self.pg0, packet_num=1)
873         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
874
875         # in2out
876         pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
877                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
878                ICMP(id=self.icmp_id_in, type='echo-reply'))
879         self.pg0.add_stream(pkt)
880         self.pg_enable_capture(self.pg_interfaces)
881         self.pg_start()
882         capture = self.pg1.get_capture(1)
883         self.verify_capture_out(capture, same_port=True, packet_num=1)
884         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
885
886     def test_static_in(self):
887         """ 1:1 NAT initialized from inside network """
888
889         nat_ip = "10.0.0.10"
890         self.tcp_port_out = 6303
891         self.udp_port_out = 6304
892         self.icmp_id_out = 6305
893
894         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
895         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
896         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
897                                                   is_inside=0)
898
899         # in2out
900         pkts = self.create_stream_in(self.pg0, self.pg1)
901         self.pg0.add_stream(pkts)
902         self.pg_enable_capture(self.pg_interfaces)
903         self.pg_start()
904         capture = self.pg1.get_capture(len(pkts))
905         self.verify_capture_out(capture, nat_ip, True)
906
907         # out2in
908         pkts = self.create_stream_out(self.pg1, nat_ip)
909         self.pg1.add_stream(pkts)
910         self.pg_enable_capture(self.pg_interfaces)
911         self.pg_start()
912         capture = self.pg0.get_capture(len(pkts))
913         self.verify_capture_in(capture, self.pg0)
914
915     def test_static_out(self):
916         """ 1:1 NAT initialized from outside network """
917
918         nat_ip = "10.0.0.20"
919         self.tcp_port_out = 6303
920         self.udp_port_out = 6304
921         self.icmp_id_out = 6305
922
923         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
924         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
925         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
926                                                   is_inside=0)
927
928         # out2in
929         pkts = self.create_stream_out(self.pg1, nat_ip)
930         self.pg1.add_stream(pkts)
931         self.pg_enable_capture(self.pg_interfaces)
932         self.pg_start()
933         capture = self.pg0.get_capture(len(pkts))
934         self.verify_capture_in(capture, self.pg0)
935
936         # in2out
937         pkts = self.create_stream_in(self.pg0, self.pg1)
938         self.pg0.add_stream(pkts)
939         self.pg_enable_capture(self.pg_interfaces)
940         self.pg_start()
941         capture = self.pg1.get_capture(len(pkts))
942         self.verify_capture_out(capture, nat_ip, True)
943
944     def test_static_with_port_in(self):
945         """ 1:1 NAPT initialized from inside network """
946
947         self.tcp_port_out = 3606
948         self.udp_port_out = 3607
949         self.icmp_id_out = 3608
950
951         self.nat44_add_address(self.nat_addr)
952         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
953                                       self.tcp_port_in, self.tcp_port_out,
954                                       proto=IP_PROTOS.tcp)
955         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
956                                       self.udp_port_in, self.udp_port_out,
957                                       proto=IP_PROTOS.udp)
958         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
959                                       self.icmp_id_in, self.icmp_id_out,
960                                       proto=IP_PROTOS.icmp)
961         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
962         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
963                                                   is_inside=0)
964
965         # in2out
966         pkts = self.create_stream_in(self.pg0, self.pg1)
967         self.pg0.add_stream(pkts)
968         self.pg_enable_capture(self.pg_interfaces)
969         self.pg_start()
970         capture = self.pg1.get_capture(len(pkts))
971         self.verify_capture_out(capture)
972
973         # out2in
974         pkts = self.create_stream_out(self.pg1)
975         self.pg1.add_stream(pkts)
976         self.pg_enable_capture(self.pg_interfaces)
977         self.pg_start()
978         capture = self.pg0.get_capture(len(pkts))
979         self.verify_capture_in(capture, self.pg0)
980
981     def test_static_with_port_out(self):
982         """ 1:1 NAPT initialized from outside network """
983
984         self.tcp_port_out = 30606
985         self.udp_port_out = 30607
986         self.icmp_id_out = 30608
987
988         self.nat44_add_address(self.nat_addr)
989         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
990                                       self.tcp_port_in, self.tcp_port_out,
991                                       proto=IP_PROTOS.tcp)
992         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
993                                       self.udp_port_in, self.udp_port_out,
994                                       proto=IP_PROTOS.udp)
995         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
996                                       self.icmp_id_in, self.icmp_id_out,
997                                       proto=IP_PROTOS.icmp)
998         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
999         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1000                                                   is_inside=0)
1001
1002         # out2in
1003         pkts = self.create_stream_out(self.pg1)
1004         self.pg1.add_stream(pkts)
1005         self.pg_enable_capture(self.pg_interfaces)
1006         self.pg_start()
1007         capture = self.pg0.get_capture(len(pkts))
1008         self.verify_capture_in(capture, self.pg0)
1009
1010         # in2out
1011         pkts = self.create_stream_in(self.pg0, self.pg1)
1012         self.pg0.add_stream(pkts)
1013         self.pg_enable_capture(self.pg_interfaces)
1014         self.pg_start()
1015         capture = self.pg1.get_capture(len(pkts))
1016         self.verify_capture_out(capture)
1017
1018     def test_static_vrf_aware(self):
1019         """ 1:1 NAT VRF awareness """
1020
1021         nat_ip1 = "10.0.0.30"
1022         nat_ip2 = "10.0.0.40"
1023         self.tcp_port_out = 6303
1024         self.udp_port_out = 6304
1025         self.icmp_id_out = 6305
1026
1027         self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1028                                       vrf_id=10)
1029         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1030                                       vrf_id=10)
1031         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1032                                                   is_inside=0)
1033         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1034         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1035
1036         # inside interface VRF match NAT44 static mapping VRF
1037         pkts = self.create_stream_in(self.pg4, self.pg3)
1038         self.pg4.add_stream(pkts)
1039         self.pg_enable_capture(self.pg_interfaces)
1040         self.pg_start()
1041         capture = self.pg3.get_capture(len(pkts))
1042         self.verify_capture_out(capture, nat_ip1, True)
1043
1044         # inside interface VRF don't match NAT44 static mapping VRF (packets
1045         # are dropped)
1046         pkts = self.create_stream_in(self.pg0, self.pg3)
1047         self.pg0.add_stream(pkts)
1048         self.pg_enable_capture(self.pg_interfaces)
1049         self.pg_start()
1050         self.pg3.assert_nothing_captured()
1051
1052     def test_static_lb(self):
1053         """ NAT44 local service load balancing """
1054         external_addr_n = socket.inet_pton(socket.AF_INET, self.nat_addr)
1055         external_port = 80
1056         local_port = 8080
1057         server1 = self.pg0.remote_hosts[0]
1058         server2 = self.pg0.remote_hosts[1]
1059
1060         locals = [{'addr': server1.ip4n,
1061                    'port': local_port,
1062                    'probability': 70},
1063                   {'addr': server2.ip4n,
1064                    'port': local_port,
1065                    'probability': 30}]
1066
1067         self.nat44_add_address(self.nat_addr)
1068         self.vapi.nat44_add_del_lb_static_mapping(external_addr_n,
1069                                                   external_port,
1070                                                   IP_PROTOS.tcp,
1071                                                   local_num=len(locals),
1072                                                   locals=locals)
1073         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1074         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1075                                                   is_inside=0)
1076
1077         # from client to service
1078         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1079              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1080              TCP(sport=12345, dport=external_port))
1081         self.pg1.add_stream(p)
1082         self.pg_enable_capture(self.pg_interfaces)
1083         self.pg_start()
1084         capture = self.pg0.get_capture(1)
1085         p = capture[0]
1086         server = None
1087         try:
1088             ip = p[IP]
1089             tcp = p[TCP]
1090             self.assertIn(ip.dst, [server1.ip4, server2.ip4])
1091             if ip.dst == server1.ip4:
1092                 server = server1
1093             else:
1094                 server = server2
1095             self.assertEqual(tcp.dport, local_port)
1096             self.check_tcp_checksum(p)
1097             self.check_ip_checksum(p)
1098         except:
1099             self.logger.error(ppp("Unexpected or invalid packet:", p))
1100             raise
1101
1102         # from service back to client
1103         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1104              IP(src=server.ip4, dst=self.pg1.remote_ip4) /
1105              TCP(sport=local_port, dport=12345))
1106         self.pg0.add_stream(p)
1107         self.pg_enable_capture(self.pg_interfaces)
1108         self.pg_start()
1109         capture = self.pg1.get_capture(1)
1110         p = capture[0]
1111         try:
1112             ip = p[IP]
1113             tcp = p[TCP]
1114             self.assertEqual(ip.src, self.nat_addr)
1115             self.assertEqual(tcp.sport, external_port)
1116             self.check_tcp_checksum(p)
1117             self.check_ip_checksum(p)
1118         except:
1119             self.logger.error(ppp("Unexpected or invalid packet:", p))
1120             raise
1121
1122         # multiple clients
1123         server1_n = 0
1124         server2_n = 0
1125         clients = ip4_range(self.pg1.remote_ip4, 10, 20)
1126         pkts = []
1127         for client in clients:
1128             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1129                  IP(src=client, dst=self.nat_addr) /
1130                  TCP(sport=12345, dport=external_port))
1131             pkts.append(p)
1132         self.pg1.add_stream(pkts)
1133         self.pg_enable_capture(self.pg_interfaces)
1134         self.pg_start()
1135         capture = self.pg0.get_capture(len(pkts))
1136         for p in capture:
1137             if p[IP].dst == server1.ip4:
1138                 server1_n += 1
1139             else:
1140                 server2_n += 1
1141         self.assertTrue(server1_n > server2_n)
1142
1143     def test_multiple_inside_interfaces(self):
1144         """ NAT44 multiple non-overlapping address space inside interfaces """
1145
1146         self.nat44_add_address(self.nat_addr)
1147         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1148         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1149         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1150                                                   is_inside=0)
1151
1152         # between two NAT44 inside interfaces (no translation)
1153         pkts = self.create_stream_in(self.pg0, self.pg1)
1154         self.pg0.add_stream(pkts)
1155         self.pg_enable_capture(self.pg_interfaces)
1156         self.pg_start()
1157         capture = self.pg1.get_capture(len(pkts))
1158         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1159
1160         # from NAT44 inside to interface without NAT44 feature (no translation)
1161         pkts = self.create_stream_in(self.pg0, self.pg2)
1162         self.pg0.add_stream(pkts)
1163         self.pg_enable_capture(self.pg_interfaces)
1164         self.pg_start()
1165         capture = self.pg2.get_capture(len(pkts))
1166         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1167
1168         # in2out 1st interface
1169         pkts = self.create_stream_in(self.pg0, self.pg3)
1170         self.pg0.add_stream(pkts)
1171         self.pg_enable_capture(self.pg_interfaces)
1172         self.pg_start()
1173         capture = self.pg3.get_capture(len(pkts))
1174         self.verify_capture_out(capture)
1175
1176         # out2in 1st interface
1177         pkts = self.create_stream_out(self.pg3)
1178         self.pg3.add_stream(pkts)
1179         self.pg_enable_capture(self.pg_interfaces)
1180         self.pg_start()
1181         capture = self.pg0.get_capture(len(pkts))
1182         self.verify_capture_in(capture, self.pg0)
1183
1184         # in2out 2nd interface
1185         pkts = self.create_stream_in(self.pg1, self.pg3)
1186         self.pg1.add_stream(pkts)
1187         self.pg_enable_capture(self.pg_interfaces)
1188         self.pg_start()
1189         capture = self.pg3.get_capture(len(pkts))
1190         self.verify_capture_out(capture)
1191
1192         # out2in 2nd interface
1193         pkts = self.create_stream_out(self.pg3)
1194         self.pg3.add_stream(pkts)
1195         self.pg_enable_capture(self.pg_interfaces)
1196         self.pg_start()
1197         capture = self.pg1.get_capture(len(pkts))
1198         self.verify_capture_in(capture, self.pg1)
1199
1200     def test_inside_overlapping_interfaces(self):
1201         """ NAT44 multiple inside interfaces with overlapping address space """
1202
1203         static_nat_ip = "10.0.0.10"
1204         self.nat44_add_address(self.nat_addr)
1205         self.vapi.nat44_interface_add_del_feature(self.pg3.sw_if_index,
1206                                                   is_inside=0)
1207         self.vapi.nat44_interface_add_del_feature(self.pg4.sw_if_index)
1208         self.vapi.nat44_interface_add_del_feature(self.pg5.sw_if_index)
1209         self.vapi.nat44_interface_add_del_feature(self.pg6.sw_if_index)
1210         self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1211                                       vrf_id=20)
1212
1213         # between NAT44 inside interfaces with same VRF (no translation)
1214         pkts = self.create_stream_in(self.pg4, self.pg5)
1215         self.pg4.add_stream(pkts)
1216         self.pg_enable_capture(self.pg_interfaces)
1217         self.pg_start()
1218         capture = self.pg5.get_capture(len(pkts))
1219         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1220
1221         # between NAT44 inside interfaces with different VRF (hairpinning)
1222         p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1223              IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1224              TCP(sport=1234, dport=5678))
1225         self.pg4.add_stream(p)
1226         self.pg_enable_capture(self.pg_interfaces)
1227         self.pg_start()
1228         capture = self.pg6.get_capture(1)
1229         p = capture[0]
1230         try:
1231             ip = p[IP]
1232             tcp = p[TCP]
1233             self.assertEqual(ip.src, self.nat_addr)
1234             self.assertEqual(ip.dst, self.pg6.remote_ip4)
1235             self.assertNotEqual(tcp.sport, 1234)
1236             self.assertEqual(tcp.dport, 5678)
1237         except:
1238             self.logger.error(ppp("Unexpected or invalid packet:", p))
1239             raise
1240
1241         # in2out 1st interface
1242         pkts = self.create_stream_in(self.pg4, self.pg3)
1243         self.pg4.add_stream(pkts)
1244         self.pg_enable_capture(self.pg_interfaces)
1245         self.pg_start()
1246         capture = self.pg3.get_capture(len(pkts))
1247         self.verify_capture_out(capture)
1248
1249         # out2in 1st interface
1250         pkts = self.create_stream_out(self.pg3)
1251         self.pg3.add_stream(pkts)
1252         self.pg_enable_capture(self.pg_interfaces)
1253         self.pg_start()
1254         capture = self.pg4.get_capture(len(pkts))
1255         self.verify_capture_in(capture, self.pg4)
1256
1257         # in2out 2nd interface
1258         pkts = self.create_stream_in(self.pg5, self.pg3)
1259         self.pg5.add_stream(pkts)
1260         self.pg_enable_capture(self.pg_interfaces)
1261         self.pg_start()
1262         capture = self.pg3.get_capture(len(pkts))
1263         self.verify_capture_out(capture)
1264
1265         # out2in 2nd interface
1266         pkts = self.create_stream_out(self.pg3)
1267         self.pg3.add_stream(pkts)
1268         self.pg_enable_capture(self.pg_interfaces)
1269         self.pg_start()
1270         capture = self.pg5.get_capture(len(pkts))
1271         self.verify_capture_in(capture, self.pg5)
1272
1273         # pg5 session dump
1274         addresses = self.vapi.nat44_address_dump()
1275         self.assertEqual(len(addresses), 1)
1276         sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4n, 10)
1277         self.assertEqual(len(sessions), 3)
1278         for session in sessions:
1279             self.assertFalse(session.is_static)
1280             self.assertEqual(session.inside_ip_address[0:4],
1281                              self.pg5.remote_ip4n)
1282             self.assertEqual(session.outside_ip_address,
1283                              addresses[0].ip_address)
1284         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1285         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1286         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1287         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1288         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1289         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1290         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1291         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1292         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1293
1294         # in2out 3rd interface
1295         pkts = self.create_stream_in(self.pg6, self.pg3)
1296         self.pg6.add_stream(pkts)
1297         self.pg_enable_capture(self.pg_interfaces)
1298         self.pg_start()
1299         capture = self.pg3.get_capture(len(pkts))
1300         self.verify_capture_out(capture, static_nat_ip, True)
1301
1302         # out2in 3rd interface
1303         pkts = self.create_stream_out(self.pg3, static_nat_ip)
1304         self.pg3.add_stream(pkts)
1305         self.pg_enable_capture(self.pg_interfaces)
1306         self.pg_start()
1307         capture = self.pg6.get_capture(len(pkts))
1308         self.verify_capture_in(capture, self.pg6)
1309
1310         # general user and session dump verifications
1311         users = self.vapi.nat44_user_dump()
1312         self.assertTrue(len(users) >= 3)
1313         addresses = self.vapi.nat44_address_dump()
1314         self.assertEqual(len(addresses), 1)
1315         for user in users:
1316             sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1317                                                          user.vrf_id)
1318             for session in sessions:
1319                 self.assertEqual(user.ip_address, session.inside_ip_address)
1320                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1321                 self.assertTrue(session.protocol in
1322                                 [IP_PROTOS.tcp, IP_PROTOS.udp,
1323                                  IP_PROTOS.icmp])
1324
1325         # pg4 session dump
1326         sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4n, 10)
1327         self.assertTrue(len(sessions) >= 4)
1328         for session in sessions:
1329             self.assertFalse(session.is_static)
1330             self.assertEqual(session.inside_ip_address[0:4],
1331                              self.pg4.remote_ip4n)
1332             self.assertEqual(session.outside_ip_address,
1333                              addresses[0].ip_address)
1334
1335         # pg6 session dump
1336         sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4n, 20)
1337         self.assertTrue(len(sessions) >= 3)
1338         for session in sessions:
1339             self.assertTrue(session.is_static)
1340             self.assertEqual(session.inside_ip_address[0:4],
1341                              self.pg6.remote_ip4n)
1342             self.assertEqual(map(ord, session.outside_ip_address[0:4]),
1343                              map(int, static_nat_ip.split('.')))
1344             self.assertTrue(session.inside_port in
1345                             [self.tcp_port_in, self.udp_port_in,
1346                              self.icmp_id_in])
1347
1348     def test_hairpinning(self):
1349         """ NAT44 hairpinning - 1:1 NAPT """
1350
1351         host = self.pg0.remote_hosts[0]
1352         server = self.pg0.remote_hosts[1]
1353         host_in_port = 1234
1354         host_out_port = 0
1355         server_in_port = 5678
1356         server_out_port = 8765
1357
1358         self.nat44_add_address(self.nat_addr)
1359         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1360         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1361                                                   is_inside=0)
1362         # add static mapping for server
1363         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1364                                       server_in_port, server_out_port,
1365                                       proto=IP_PROTOS.tcp)
1366
1367         # send packet from host to server
1368         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1369              IP(src=host.ip4, dst=self.nat_addr) /
1370              TCP(sport=host_in_port, dport=server_out_port))
1371         self.pg0.add_stream(p)
1372         self.pg_enable_capture(self.pg_interfaces)
1373         self.pg_start()
1374         capture = self.pg0.get_capture(1)
1375         p = capture[0]
1376         try:
1377             ip = p[IP]
1378             tcp = p[TCP]
1379             self.assertEqual(ip.src, self.nat_addr)
1380             self.assertEqual(ip.dst, server.ip4)
1381             self.assertNotEqual(tcp.sport, host_in_port)
1382             self.assertEqual(tcp.dport, server_in_port)
1383             self.check_tcp_checksum(p)
1384             host_out_port = tcp.sport
1385         except:
1386             self.logger.error(ppp("Unexpected or invalid packet:", p))
1387             raise
1388
1389         # send reply from server to host
1390         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1391              IP(src=server.ip4, dst=self.nat_addr) /
1392              TCP(sport=server_in_port, dport=host_out_port))
1393         self.pg0.add_stream(p)
1394         self.pg_enable_capture(self.pg_interfaces)
1395         self.pg_start()
1396         capture = self.pg0.get_capture(1)
1397         p = capture[0]
1398         try:
1399             ip = p[IP]
1400             tcp = p[TCP]
1401             self.assertEqual(ip.src, self.nat_addr)
1402             self.assertEqual(ip.dst, host.ip4)
1403             self.assertEqual(tcp.sport, server_out_port)
1404             self.assertEqual(tcp.dport, host_in_port)
1405             self.check_tcp_checksum(p)
1406         except:
1407             self.logger.error(ppp("Unexpected or invalid packet:"), p)
1408             raise
1409
1410     def test_hairpinning2(self):
1411         """ NAT44 hairpinning - 1:1 NAT"""
1412
1413         server1_nat_ip = "10.0.0.10"
1414         server2_nat_ip = "10.0.0.11"
1415         host = self.pg0.remote_hosts[0]
1416         server1 = self.pg0.remote_hosts[1]
1417         server2 = self.pg0.remote_hosts[2]
1418         server_tcp_port = 22
1419         server_udp_port = 20
1420
1421         self.nat44_add_address(self.nat_addr)
1422         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1423         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1424                                                   is_inside=0)
1425
1426         # add static mapping for servers
1427         self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1428         self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
1429
1430         # host to server1
1431         pkts = []
1432         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1433              IP(src=host.ip4, dst=server1_nat_ip) /
1434              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1435         pkts.append(p)
1436         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1437              IP(src=host.ip4, dst=server1_nat_ip) /
1438              UDP(sport=self.udp_port_in, dport=server_udp_port))
1439         pkts.append(p)
1440         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1441              IP(src=host.ip4, dst=server1_nat_ip) /
1442              ICMP(id=self.icmp_id_in, type='echo-request'))
1443         pkts.append(p)
1444         self.pg0.add_stream(pkts)
1445         self.pg_enable_capture(self.pg_interfaces)
1446         self.pg_start()
1447         capture = self.pg0.get_capture(len(pkts))
1448         for packet in capture:
1449             try:
1450                 self.assertEqual(packet[IP].src, self.nat_addr)
1451                 self.assertEqual(packet[IP].dst, server1.ip4)
1452                 if packet.haslayer(TCP):
1453                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1454                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1455                     self.tcp_port_out = packet[TCP].sport
1456                     self.check_tcp_checksum(packet)
1457                 elif packet.haslayer(UDP):
1458                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1459                     self.assertEqual(packet[UDP].dport, server_udp_port)
1460                     self.udp_port_out = packet[UDP].sport
1461                 else:
1462                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1463                     self.icmp_id_out = packet[ICMP].id
1464             except:
1465                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1466                 raise
1467
1468         # server1 to host
1469         pkts = []
1470         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1471              IP(src=server1.ip4, dst=self.nat_addr) /
1472              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1473         pkts.append(p)
1474         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1475              IP(src=server1.ip4, dst=self.nat_addr) /
1476              UDP(sport=server_udp_port, dport=self.udp_port_out))
1477         pkts.append(p)
1478         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1479              IP(src=server1.ip4, dst=self.nat_addr) /
1480              ICMP(id=self.icmp_id_out, type='echo-reply'))
1481         pkts.append(p)
1482         self.pg0.add_stream(pkts)
1483         self.pg_enable_capture(self.pg_interfaces)
1484         self.pg_start()
1485         capture = self.pg0.get_capture(len(pkts))
1486         for packet in capture:
1487             try:
1488                 self.assertEqual(packet[IP].src, server1_nat_ip)
1489                 self.assertEqual(packet[IP].dst, host.ip4)
1490                 if packet.haslayer(TCP):
1491                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1492                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1493                     self.check_tcp_checksum(packet)
1494                 elif packet.haslayer(UDP):
1495                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1496                     self.assertEqual(packet[UDP].sport, server_udp_port)
1497                 else:
1498                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1499             except:
1500                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1501                 raise
1502
1503         # server2 to server1
1504         pkts = []
1505         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1506              IP(src=server2.ip4, dst=server1_nat_ip) /
1507              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1508         pkts.append(p)
1509         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1510              IP(src=server2.ip4, dst=server1_nat_ip) /
1511              UDP(sport=self.udp_port_in, dport=server_udp_port))
1512         pkts.append(p)
1513         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1514              IP(src=server2.ip4, dst=server1_nat_ip) /
1515              ICMP(id=self.icmp_id_in, type='echo-request'))
1516         pkts.append(p)
1517         self.pg0.add_stream(pkts)
1518         self.pg_enable_capture(self.pg_interfaces)
1519         self.pg_start()
1520         capture = self.pg0.get_capture(len(pkts))
1521         for packet in capture:
1522             try:
1523                 self.assertEqual(packet[IP].src, server2_nat_ip)
1524                 self.assertEqual(packet[IP].dst, server1.ip4)
1525                 if packet.haslayer(TCP):
1526                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
1527                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1528                     self.tcp_port_out = packet[TCP].sport
1529                     self.check_tcp_checksum(packet)
1530                 elif packet.haslayer(UDP):
1531                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
1532                     self.assertEqual(packet[UDP].dport, server_udp_port)
1533                     self.udp_port_out = packet[UDP].sport
1534                 else:
1535                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1536                     self.icmp_id_out = packet[ICMP].id
1537             except:
1538                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1539                 raise
1540
1541         # server1 to server2
1542         pkts = []
1543         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1544              IP(src=server1.ip4, dst=server2_nat_ip) /
1545              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1546         pkts.append(p)
1547         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1548              IP(src=server1.ip4, dst=server2_nat_ip) /
1549              UDP(sport=server_udp_port, dport=self.udp_port_out))
1550         pkts.append(p)
1551         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1552              IP(src=server1.ip4, dst=server2_nat_ip) /
1553              ICMP(id=self.icmp_id_out, type='echo-reply'))
1554         pkts.append(p)
1555         self.pg0.add_stream(pkts)
1556         self.pg_enable_capture(self.pg_interfaces)
1557         self.pg_start()
1558         capture = self.pg0.get_capture(len(pkts))
1559         for packet in capture:
1560             try:
1561                 self.assertEqual(packet[IP].src, server1_nat_ip)
1562                 self.assertEqual(packet[IP].dst, server2.ip4)
1563                 if packet.haslayer(TCP):
1564                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1565                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1566                     self.check_tcp_checksum(packet)
1567                 elif packet.haslayer(UDP):
1568                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1569                     self.assertEqual(packet[UDP].sport, server_udp_port)
1570                 else:
1571                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1572             except:
1573                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1574                 raise
1575
1576     def test_max_translations_per_user(self):
1577         """ MAX translations per user - recycle the least recently used """
1578
1579         self.nat44_add_address(self.nat_addr)
1580         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1581         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1582                                                   is_inside=0)
1583
1584         # get maximum number of translations per user
1585         nat44_config = self.vapi.nat_show_config()
1586
1587         # send more than maximum number of translations per user packets
1588         pkts_num = nat44_config.max_translations_per_user + 5
1589         pkts = []
1590         for port in range(0, pkts_num):
1591             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1592                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1593                  TCP(sport=1025 + port))
1594             pkts.append(p)
1595         self.pg0.add_stream(pkts)
1596         self.pg_enable_capture(self.pg_interfaces)
1597         self.pg_start()
1598
1599         # verify number of translated packet
1600         self.pg1.get_capture(pkts_num)
1601
1602     def test_interface_addr(self):
1603         """ Acquire NAT44 addresses from interface """
1604         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1605
1606         # no address in NAT pool
1607         adresses = self.vapi.nat44_address_dump()
1608         self.assertEqual(0, len(adresses))
1609
1610         # configure interface address and check NAT address pool
1611         self.pg7.config_ip4()
1612         adresses = self.vapi.nat44_address_dump()
1613         self.assertEqual(1, len(adresses))
1614         self.assertEqual(adresses[0].ip_address[0:4], self.pg7.local_ip4n)
1615
1616         # remove interface address and check NAT address pool
1617         self.pg7.unconfig_ip4()
1618         adresses = self.vapi.nat44_address_dump()
1619         self.assertEqual(0, len(adresses))
1620
1621     def test_interface_addr_static_mapping(self):
1622         """ Static mapping with addresses from interface """
1623         self.vapi.nat44_add_interface_addr(self.pg7.sw_if_index)
1624         self.nat44_add_static_mapping(
1625             '1.2.3.4',
1626             external_sw_if_index=self.pg7.sw_if_index)
1627
1628         # static mappings with external interface
1629         static_mappings = self.vapi.nat44_static_mapping_dump()
1630         self.assertEqual(1, len(static_mappings))
1631         self.assertEqual(self.pg7.sw_if_index,
1632                          static_mappings[0].external_sw_if_index)
1633
1634         # configure interface address and check static mappings
1635         self.pg7.config_ip4()
1636         static_mappings = self.vapi.nat44_static_mapping_dump()
1637         self.assertEqual(1, len(static_mappings))
1638         self.assertEqual(static_mappings[0].external_ip_address[0:4],
1639                          self.pg7.local_ip4n)
1640         self.assertEqual(0xFFFFFFFF, static_mappings[0].external_sw_if_index)
1641
1642         # remove interface address and check static mappings
1643         self.pg7.unconfig_ip4()
1644         static_mappings = self.vapi.nat44_static_mapping_dump()
1645         self.assertEqual(0, len(static_mappings))
1646
1647     def test_ipfix_nat44_sess(self):
1648         """ IPFIX logging NAT44 session created/delted """
1649         self.ipfix_domain_id = 10
1650         self.ipfix_src_port = 20202
1651         colector_port = 30303
1652         bind_layers(UDP, IPFIX, dport=30303)
1653         self.nat44_add_address(self.nat_addr)
1654         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1655         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1656                                                   is_inside=0)
1657         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1658                                      src_address=self.pg3.local_ip4n,
1659                                      path_mtu=512,
1660                                      template_interval=10,
1661                                      collector_port=colector_port)
1662         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1663                             src_port=self.ipfix_src_port)
1664
1665         pkts = self.create_stream_in(self.pg0, self.pg1)
1666         self.pg0.add_stream(pkts)
1667         self.pg_enable_capture(self.pg_interfaces)
1668         self.pg_start()
1669         capture = self.pg1.get_capture(len(pkts))
1670         self.verify_capture_out(capture)
1671         self.nat44_add_address(self.nat_addr, is_add=0)
1672         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1673         capture = self.pg3.get_capture(3)
1674         ipfix = IPFIXDecoder()
1675         # first load template
1676         for p in capture:
1677             self.assertTrue(p.haslayer(IPFIX))
1678             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1679             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1680             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1681             self.assertEqual(p[UDP].dport, colector_port)
1682             self.assertEqual(p[IPFIX].observationDomainID,
1683                              self.ipfix_domain_id)
1684             if p.haslayer(Template):
1685                 ipfix.add_template(p.getlayer(Template))
1686         # verify events in data set
1687         for p in capture:
1688             if p.haslayer(Data):
1689                 data = ipfix.decode_data_set(p.getlayer(Set))
1690                 self.verify_ipfix_nat44_ses(data)
1691
1692     def test_ipfix_addr_exhausted(self):
1693         """ IPFIX logging NAT addresses exhausted """
1694         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1695         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1696                                                   is_inside=0)
1697         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4n,
1698                                      src_address=self.pg3.local_ip4n,
1699                                      path_mtu=512,
1700                                      template_interval=10)
1701         self.vapi.nat_ipfix(domain_id=self.ipfix_domain_id,
1702                             src_port=self.ipfix_src_port)
1703
1704         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
1705              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1706              TCP(sport=3025))
1707         self.pg0.add_stream(p)
1708         self.pg_enable_capture(self.pg_interfaces)
1709         self.pg_start()
1710         capture = self.pg1.get_capture(0)
1711         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
1712         capture = self.pg3.get_capture(3)
1713         ipfix = IPFIXDecoder()
1714         # first load template
1715         for p in capture:
1716             self.assertTrue(p.haslayer(IPFIX))
1717             self.assertEqual(p[IP].src, self.pg3.local_ip4)
1718             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
1719             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
1720             self.assertEqual(p[UDP].dport, 4739)
1721             self.assertEqual(p[IPFIX].observationDomainID,
1722                              self.ipfix_domain_id)
1723             if p.haslayer(Template):
1724                 ipfix.add_template(p.getlayer(Template))
1725         # verify events in data set
1726         for p in capture:
1727             if p.haslayer(Data):
1728                 data = ipfix.decode_data_set(p.getlayer(Set))
1729                 self.verify_ipfix_addr_exhausted(data)
1730
1731     def test_pool_addr_fib(self):
1732         """ NAT44 add pool addresses to FIB """
1733         static_addr = '10.0.0.10'
1734         self.nat44_add_address(self.nat_addr)
1735         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1736         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
1737                                                   is_inside=0)
1738         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
1739
1740         # NAT44 address
1741         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1742              ARP(op=ARP.who_has, pdst=self.nat_addr,
1743                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1744         self.pg1.add_stream(p)
1745         self.pg_enable_capture(self.pg_interfaces)
1746         self.pg_start()
1747         capture = self.pg1.get_capture(1)
1748         self.assertTrue(capture[0].haslayer(ARP))
1749         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1750
1751         # 1:1 NAT address
1752         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1753              ARP(op=ARP.who_has, pdst=static_addr,
1754                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1755         self.pg1.add_stream(p)
1756         self.pg_enable_capture(self.pg_interfaces)
1757         self.pg_start()
1758         capture = self.pg1.get_capture(1)
1759         self.assertTrue(capture[0].haslayer(ARP))
1760         self.assertTrue(capture[0][ARP].op, ARP.is_at)
1761
1762         # send ARP to non-NAT44 interface
1763         p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1764              ARP(op=ARP.who_has, pdst=self.nat_addr,
1765                  psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
1766         self.pg2.add_stream(p)
1767         self.pg_enable_capture(self.pg_interfaces)
1768         self.pg_start()
1769         capture = self.pg1.get_capture(0)
1770
1771         # remove addresses and verify
1772         self.nat44_add_address(self.nat_addr, is_add=0)
1773         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
1774                                       is_add=0)
1775
1776         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1777              ARP(op=ARP.who_has, pdst=self.nat_addr,
1778                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1779         self.pg1.add_stream(p)
1780         self.pg_enable_capture(self.pg_interfaces)
1781         self.pg_start()
1782         capture = self.pg1.get_capture(0)
1783
1784         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
1785              ARP(op=ARP.who_has, pdst=static_addr,
1786                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
1787         self.pg1.add_stream(p)
1788         self.pg_enable_capture(self.pg_interfaces)
1789         self.pg_start()
1790         capture = self.pg1.get_capture(0)
1791
1792     def test_vrf_mode(self):
1793         """ NAT44 tenant VRF aware address pool mode """
1794
1795         vrf_id1 = 1
1796         vrf_id2 = 2
1797         nat_ip1 = "10.0.0.10"
1798         nat_ip2 = "10.0.0.11"
1799
1800         self.pg0.unconfig_ip4()
1801         self.pg1.unconfig_ip4()
1802         self.vapi.ip_table_add_del(vrf_id1, is_add=1)
1803         self.vapi.ip_table_add_del(vrf_id2, is_add=1)
1804         self.pg0.set_table_ip4(vrf_id1)
1805         self.pg1.set_table_ip4(vrf_id2)
1806         self.pg0.config_ip4()
1807         self.pg1.config_ip4()
1808
1809         self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
1810         self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
1811         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1812         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1813         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
1814                                                   is_inside=0)
1815
1816         # first VRF
1817         pkts = self.create_stream_in(self.pg0, self.pg2)
1818         self.pg0.add_stream(pkts)
1819         self.pg_enable_capture(self.pg_interfaces)
1820         self.pg_start()
1821         capture = self.pg2.get_capture(len(pkts))
1822         self.verify_capture_out(capture, nat_ip1)
1823
1824         # second VRF
1825         pkts = self.create_stream_in(self.pg1, self.pg2)
1826         self.pg1.add_stream(pkts)
1827         self.pg_enable_capture(self.pg_interfaces)
1828         self.pg_start()
1829         capture = self.pg2.get_capture(len(pkts))
1830         self.verify_capture_out(capture, nat_ip2)
1831
1832         self.pg0.unconfig_ip4()
1833         self.pg1.unconfig_ip4()
1834         self.pg0.set_table_ip4(0)
1835         self.pg1.set_table_ip4(0)
1836         self.vapi.ip_table_add_del(vrf_id1, is_add=0)
1837         self.vapi.ip_table_add_del(vrf_id2, is_add=0)
1838
1839     def test_vrf_feature_independent(self):
1840         """ NAT44 tenant VRF independent address pool mode """
1841
1842         nat_ip1 = "10.0.0.10"
1843         nat_ip2 = "10.0.0.11"
1844
1845         self.nat44_add_address(nat_ip1)
1846         self.nat44_add_address(nat_ip2)
1847         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
1848         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index)
1849         self.vapi.nat44_interface_add_del_feature(self.pg2.sw_if_index,
1850                                                   is_inside=0)
1851
1852         # first VRF
1853         pkts = self.create_stream_in(self.pg0, self.pg2)
1854         self.pg0.add_stream(pkts)
1855         self.pg_enable_capture(self.pg_interfaces)
1856         self.pg_start()
1857         capture = self.pg2.get_capture(len(pkts))
1858         self.verify_capture_out(capture, nat_ip1)
1859
1860         # second VRF
1861         pkts = self.create_stream_in(self.pg1, self.pg2)
1862         self.pg1.add_stream(pkts)
1863         self.pg_enable_capture(self.pg_interfaces)
1864         self.pg_start()
1865         capture = self.pg2.get_capture(len(pkts))
1866         self.verify_capture_out(capture, nat_ip1)
1867
1868     def test_dynamic_ipless_interfaces(self):
1869         """ NAT44 interfaces without configured IP address """
1870
1871         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1872                                       self.pg7.remote_mac,
1873                                       self.pg7.remote_ip4n,
1874                                       is_static=1)
1875         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1876                                       self.pg8.remote_mac,
1877                                       self.pg8.remote_ip4n,
1878                                       is_static=1)
1879
1880         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1881                                    dst_address_length=32,
1882                                    next_hop_address=self.pg7.remote_ip4n,
1883                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1884         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1885                                    dst_address_length=32,
1886                                    next_hop_address=self.pg8.remote_ip4n,
1887                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1888
1889         self.nat44_add_address(self.nat_addr)
1890         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
1891         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
1892                                                   is_inside=0)
1893
1894         # in2out
1895         pkts = self.create_stream_in(self.pg7, self.pg8)
1896         self.pg7.add_stream(pkts)
1897         self.pg_enable_capture(self.pg_interfaces)
1898         self.pg_start()
1899         capture = self.pg8.get_capture(len(pkts))
1900         self.verify_capture_out(capture)
1901
1902         # out2in
1903         pkts = self.create_stream_out(self.pg8, self.nat_addr)
1904         self.pg8.add_stream(pkts)
1905         self.pg_enable_capture(self.pg_interfaces)
1906         self.pg_start()
1907         capture = self.pg7.get_capture(len(pkts))
1908         self.verify_capture_in(capture, self.pg7)
1909
1910     def test_static_ipless_interfaces(self):
1911         """ NAT44 interfaces without configured IP address - 1:1 NAT """
1912
1913         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1914                                       self.pg7.remote_mac,
1915                                       self.pg7.remote_ip4n,
1916                                       is_static=1)
1917         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1918                                       self.pg8.remote_mac,
1919                                       self.pg8.remote_ip4n,
1920                                       is_static=1)
1921
1922         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1923                                    dst_address_length=32,
1924                                    next_hop_address=self.pg7.remote_ip4n,
1925                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1926         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1927                                    dst_address_length=32,
1928                                    next_hop_address=self.pg8.remote_ip4n,
1929                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1930
1931         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
1932         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
1933         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
1934                                                   is_inside=0)
1935
1936         # out2in
1937         pkts = self.create_stream_out(self.pg8)
1938         self.pg8.add_stream(pkts)
1939         self.pg_enable_capture(self.pg_interfaces)
1940         self.pg_start()
1941         capture = self.pg7.get_capture(len(pkts))
1942         self.verify_capture_in(capture, self.pg7)
1943
1944         # in2out
1945         pkts = self.create_stream_in(self.pg7, self.pg8)
1946         self.pg7.add_stream(pkts)
1947         self.pg_enable_capture(self.pg_interfaces)
1948         self.pg_start()
1949         capture = self.pg8.get_capture(len(pkts))
1950         self.verify_capture_out(capture, self.nat_addr, True)
1951
1952     def test_static_with_port_ipless_interfaces(self):
1953         """ NAT44 interfaces without configured IP address - 1:1 NAPT """
1954
1955         self.tcp_port_out = 30606
1956         self.udp_port_out = 30607
1957         self.icmp_id_out = 30608
1958
1959         self.vapi.ip_neighbor_add_del(self.pg7.sw_if_index,
1960                                       self.pg7.remote_mac,
1961                                       self.pg7.remote_ip4n,
1962                                       is_static=1)
1963         self.vapi.ip_neighbor_add_del(self.pg8.sw_if_index,
1964                                       self.pg8.remote_mac,
1965                                       self.pg8.remote_ip4n,
1966                                       is_static=1)
1967
1968         self.vapi.ip_add_del_route(dst_address=self.pg7.remote_ip4n,
1969                                    dst_address_length=32,
1970                                    next_hop_address=self.pg7.remote_ip4n,
1971                                    next_hop_sw_if_index=self.pg7.sw_if_index)
1972         self.vapi.ip_add_del_route(dst_address=self.pg8.remote_ip4n,
1973                                    dst_address_length=32,
1974                                    next_hop_address=self.pg8.remote_ip4n,
1975                                    next_hop_sw_if_index=self.pg8.sw_if_index)
1976
1977         self.nat44_add_address(self.nat_addr)
1978         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
1979                                       self.tcp_port_in, self.tcp_port_out,
1980                                       proto=IP_PROTOS.tcp)
1981         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
1982                                       self.udp_port_in, self.udp_port_out,
1983                                       proto=IP_PROTOS.udp)
1984         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
1985                                       self.icmp_id_in, self.icmp_id_out,
1986                                       proto=IP_PROTOS.icmp)
1987         self.vapi.nat44_interface_add_del_feature(self.pg7.sw_if_index)
1988         self.vapi.nat44_interface_add_del_feature(self.pg8.sw_if_index,
1989                                                   is_inside=0)
1990
1991         # out2in
1992         pkts = self.create_stream_out(self.pg8)
1993         self.pg8.add_stream(pkts)
1994         self.pg_enable_capture(self.pg_interfaces)
1995         self.pg_start()
1996         capture = self.pg7.get_capture(len(pkts))
1997         self.verify_capture_in(capture, self.pg7)
1998
1999         # in2out
2000         pkts = self.create_stream_in(self.pg7, self.pg8)
2001         self.pg7.add_stream(pkts)
2002         self.pg_enable_capture(self.pg_interfaces)
2003         self.pg_start()
2004         capture = self.pg8.get_capture(len(pkts))
2005         self.verify_capture_out(capture)
2006
2007     def test_static_unknown_proto(self):
2008         """ 1:1 NAT translate packet with unknown protocol """
2009         nat_ip = "10.0.0.10"
2010         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2011         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2012         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2013                                                   is_inside=0)
2014
2015         # in2out
2016         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2017              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2018              GRE() /
2019              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2020              TCP(sport=1234, dport=1234))
2021         self.pg0.add_stream(p)
2022         self.pg_enable_capture(self.pg_interfaces)
2023         self.pg_start()
2024         p = self.pg1.get_capture(1)
2025         packet = p[0]
2026         try:
2027             self.assertEqual(packet[IP].src, nat_ip)
2028             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2029             self.assertTrue(packet.haslayer(GRE))
2030             self.check_ip_checksum(packet)
2031         except:
2032             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2033             raise
2034
2035         # out2in
2036         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2037              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2038              GRE() /
2039              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2040              TCP(sport=1234, dport=1234))
2041         self.pg1.add_stream(p)
2042         self.pg_enable_capture(self.pg_interfaces)
2043         self.pg_start()
2044         p = self.pg0.get_capture(1)
2045         packet = p[0]
2046         try:
2047             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2048             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2049             self.assertTrue(packet.haslayer(GRE))
2050             self.check_ip_checksum(packet)
2051         except:
2052             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2053             raise
2054
2055     def test_hairpinning_static_unknown_proto(self):
2056         """ 1:1 NAT translate packet with unknown protocol - hairpinning """
2057
2058         host = self.pg0.remote_hosts[0]
2059         server = self.pg0.remote_hosts[1]
2060
2061         host_nat_ip = "10.0.0.10"
2062         server_nat_ip = "10.0.0.11"
2063
2064         self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2065         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2066         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2067         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2068                                                   is_inside=0)
2069
2070         # host to server
2071         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2072              IP(src=host.ip4, dst=server_nat_ip) /
2073              GRE() /
2074              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2075              TCP(sport=1234, dport=1234))
2076         self.pg0.add_stream(p)
2077         self.pg_enable_capture(self.pg_interfaces)
2078         self.pg_start()
2079         p = self.pg0.get_capture(1)
2080         packet = p[0]
2081         try:
2082             self.assertEqual(packet[IP].src, host_nat_ip)
2083             self.assertEqual(packet[IP].dst, server.ip4)
2084             self.assertTrue(packet.haslayer(GRE))
2085             self.check_ip_checksum(packet)
2086         except:
2087             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2088             raise
2089
2090         # server to host
2091         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2092              IP(src=server.ip4, dst=host_nat_ip) /
2093              GRE() /
2094              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2095              TCP(sport=1234, dport=1234))
2096         self.pg0.add_stream(p)
2097         self.pg_enable_capture(self.pg_interfaces)
2098         self.pg_start()
2099         p = self.pg0.get_capture(1)
2100         packet = p[0]
2101         try:
2102             self.assertEqual(packet[IP].src, server_nat_ip)
2103             self.assertEqual(packet[IP].dst, host.ip4)
2104             self.assertTrue(packet.haslayer(GRE))
2105             self.check_ip_checksum(packet)
2106         except:
2107             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2108             raise
2109
2110     def test_unknown_proto(self):
2111         """ NAT44 translate packet with unknown protocol """
2112         self.nat44_add_address(self.nat_addr)
2113         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2114         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2115                                                   is_inside=0)
2116
2117         # in2out
2118         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2119              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2120              TCP(sport=self.tcp_port_in, dport=20))
2121         self.pg0.add_stream(p)
2122         self.pg_enable_capture(self.pg_interfaces)
2123         self.pg_start()
2124         p = self.pg1.get_capture(1)
2125
2126         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2127              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2128              GRE() /
2129              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2130              TCP(sport=1234, dport=1234))
2131         self.pg0.add_stream(p)
2132         self.pg_enable_capture(self.pg_interfaces)
2133         self.pg_start()
2134         p = self.pg1.get_capture(1)
2135         packet = p[0]
2136         try:
2137             self.assertEqual(packet[IP].src, self.nat_addr)
2138             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2139             self.assertTrue(packet.haslayer(GRE))
2140             self.check_ip_checksum(packet)
2141         except:
2142             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2143             raise
2144
2145         # out2in
2146         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2147              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2148              GRE() /
2149              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2150              TCP(sport=1234, dport=1234))
2151         self.pg1.add_stream(p)
2152         self.pg_enable_capture(self.pg_interfaces)
2153         self.pg_start()
2154         p = self.pg0.get_capture(1)
2155         packet = p[0]
2156         try:
2157             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2158             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2159             self.assertTrue(packet.haslayer(GRE))
2160             self.check_ip_checksum(packet)
2161         except:
2162             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2163             raise
2164
2165     def test_hairpinning_unknown_proto(self):
2166         """ NAT44 translate packet with unknown protocol - hairpinning """
2167         host = self.pg0.remote_hosts[0]
2168         server = self.pg0.remote_hosts[1]
2169         host_in_port = 1234
2170         host_out_port = 0
2171         server_in_port = 5678
2172         server_out_port = 8765
2173         server_nat_ip = "10.0.0.11"
2174
2175         self.nat44_add_address(self.nat_addr)
2176         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2177         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2178                                                   is_inside=0)
2179
2180         # add static mapping for server
2181         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2182
2183         # host to server
2184         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2185              IP(src=host.ip4, dst=server_nat_ip) /
2186              TCP(sport=host_in_port, dport=server_out_port))
2187         self.pg0.add_stream(p)
2188         self.pg_enable_capture(self.pg_interfaces)
2189         self.pg_start()
2190         capture = self.pg0.get_capture(1)
2191
2192         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2193              IP(src=host.ip4, dst=server_nat_ip) /
2194              GRE() /
2195              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2196              TCP(sport=1234, dport=1234))
2197         self.pg0.add_stream(p)
2198         self.pg_enable_capture(self.pg_interfaces)
2199         self.pg_start()
2200         p = self.pg0.get_capture(1)
2201         packet = p[0]
2202         try:
2203             self.assertEqual(packet[IP].src, self.nat_addr)
2204             self.assertEqual(packet[IP].dst, server.ip4)
2205             self.assertTrue(packet.haslayer(GRE))
2206             self.check_ip_checksum(packet)
2207         except:
2208             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2209             raise
2210
2211         # server to host
2212         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2213              IP(src=server.ip4, dst=self.nat_addr) /
2214              GRE() /
2215              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2216              TCP(sport=1234, dport=1234))
2217         self.pg0.add_stream(p)
2218         self.pg_enable_capture(self.pg_interfaces)
2219         self.pg_start()
2220         p = self.pg0.get_capture(1)
2221         packet = p[0]
2222         try:
2223             self.assertEqual(packet[IP].src, server_nat_ip)
2224             self.assertEqual(packet[IP].dst, host.ip4)
2225             self.assertTrue(packet.haslayer(GRE))
2226             self.check_ip_checksum(packet)
2227         except:
2228             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2229             raise
2230
2231     def test_output_feature(self):
2232         """ NAT44 interface output feature (in2out postrouting) """
2233         self.nat44_add_address(self.nat_addr)
2234         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2235         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index)
2236         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2237                                                          is_inside=0)
2238
2239         # in2out
2240         pkts = self.create_stream_in(self.pg0, self.pg3)
2241         self.pg0.add_stream(pkts)
2242         self.pg_enable_capture(self.pg_interfaces)
2243         self.pg_start()
2244         capture = self.pg3.get_capture(len(pkts))
2245         self.verify_capture_out(capture)
2246
2247         # out2in
2248         pkts = self.create_stream_out(self.pg3)
2249         self.pg3.add_stream(pkts)
2250         self.pg_enable_capture(self.pg_interfaces)
2251         self.pg_start()
2252         capture = self.pg0.get_capture(len(pkts))
2253         self.verify_capture_in(capture, self.pg0)
2254
2255         # from non-NAT interface to NAT inside interface
2256         pkts = self.create_stream_in(self.pg2, self.pg0)
2257         self.pg2.add_stream(pkts)
2258         self.pg_enable_capture(self.pg_interfaces)
2259         self.pg_start()
2260         capture = self.pg0.get_capture(len(pkts))
2261         self.verify_capture_no_translation(capture, self.pg2, self.pg0)
2262
2263     def test_output_feature_vrf_aware(self):
2264         """ NAT44 interface output feature VRF aware (in2out postrouting) """
2265         nat_ip_vrf10 = "10.0.0.10"
2266         nat_ip_vrf20 = "10.0.0.20"
2267
2268         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2269                                    dst_address_length=32,
2270                                    next_hop_address=self.pg3.remote_ip4n,
2271                                    next_hop_sw_if_index=self.pg3.sw_if_index,
2272                                    table_id=10)
2273         self.vapi.ip_add_del_route(dst_address=self.pg3.remote_ip4n,
2274                                    dst_address_length=32,
2275                                    next_hop_address=self.pg3.remote_ip4n,
2276                                    next_hop_sw_if_index=self.pg3.sw_if_index,
2277                                    table_id=20)
2278
2279         self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2280         self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2281         self.vapi.nat44_interface_add_del_output_feature(self.pg4.sw_if_index)
2282         self.vapi.nat44_interface_add_del_output_feature(self.pg6.sw_if_index)
2283         self.vapi.nat44_interface_add_del_output_feature(self.pg3.sw_if_index,
2284                                                          is_inside=0)
2285
2286         # in2out VRF 10
2287         pkts = self.create_stream_in(self.pg4, self.pg3)
2288         self.pg4.add_stream(pkts)
2289         self.pg_enable_capture(self.pg_interfaces)
2290         self.pg_start()
2291         capture = self.pg3.get_capture(len(pkts))
2292         self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
2293
2294         # out2in VRF 10
2295         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2296         self.pg3.add_stream(pkts)
2297         self.pg_enable_capture(self.pg_interfaces)
2298         self.pg_start()
2299         capture = self.pg4.get_capture(len(pkts))
2300         self.verify_capture_in(capture, self.pg4)
2301
2302         # in2out VRF 20
2303         pkts = self.create_stream_in(self.pg6, self.pg3)
2304         self.pg6.add_stream(pkts)
2305         self.pg_enable_capture(self.pg_interfaces)
2306         self.pg_start()
2307         capture = self.pg3.get_capture(len(pkts))
2308         self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
2309
2310         # out2in VRF 20
2311         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2312         self.pg3.add_stream(pkts)
2313         self.pg_enable_capture(self.pg_interfaces)
2314         self.pg_start()
2315         capture = self.pg6.get_capture(len(pkts))
2316         self.verify_capture_in(capture, self.pg6)
2317
2318     def test_output_feature_hairpinning(self):
2319         """ NAT44 interface output feature hairpinning (in2out postrouting) """
2320         host = self.pg0.remote_hosts[0]
2321         server = self.pg0.remote_hosts[1]
2322         host_in_port = 1234
2323         host_out_port = 0
2324         server_in_port = 5678
2325         server_out_port = 8765
2326
2327         self.nat44_add_address(self.nat_addr)
2328         self.vapi.nat44_interface_add_del_output_feature(self.pg0.sw_if_index)
2329         self.vapi.nat44_interface_add_del_output_feature(self.pg1.sw_if_index,
2330                                                          is_inside=0)
2331
2332         # add static mapping for server
2333         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2334                                       server_in_port, server_out_port,
2335                                       proto=IP_PROTOS.tcp)
2336
2337         # send packet from host to server
2338         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2339              IP(src=host.ip4, dst=self.nat_addr) /
2340              TCP(sport=host_in_port, dport=server_out_port))
2341         self.pg0.add_stream(p)
2342         self.pg_enable_capture(self.pg_interfaces)
2343         self.pg_start()
2344         capture = self.pg0.get_capture(1)
2345         p = capture[0]
2346         try:
2347             ip = p[IP]
2348             tcp = p[TCP]
2349             self.assertEqual(ip.src, self.nat_addr)
2350             self.assertEqual(ip.dst, server.ip4)
2351             self.assertNotEqual(tcp.sport, host_in_port)
2352             self.assertEqual(tcp.dport, server_in_port)
2353             self.check_tcp_checksum(p)
2354             host_out_port = tcp.sport
2355         except:
2356             self.logger.error(ppp("Unexpected or invalid packet:", p))
2357             raise
2358
2359         # send reply from server to host
2360         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2361              IP(src=server.ip4, dst=self.nat_addr) /
2362              TCP(sport=server_in_port, dport=host_out_port))
2363         self.pg0.add_stream(p)
2364         self.pg_enable_capture(self.pg_interfaces)
2365         self.pg_start()
2366         capture = self.pg0.get_capture(1)
2367         p = capture[0]
2368         try:
2369             ip = p[IP]
2370             tcp = p[TCP]
2371             self.assertEqual(ip.src, self.nat_addr)
2372             self.assertEqual(ip.dst, host.ip4)
2373             self.assertEqual(tcp.sport, server_out_port)
2374             self.assertEqual(tcp.dport, host_in_port)
2375             self.check_tcp_checksum(p)
2376         except:
2377             self.logger.error(ppp("Unexpected or invalid packet:"), p)
2378             raise
2379
2380     def tearDown(self):
2381         super(TestNAT44, self).tearDown()
2382         if not self.vpp_dead:
2383             self.logger.info(self.vapi.cli("show nat44 verbose"))
2384             self.clear_nat44()
2385
2386
2387 class TestDeterministicNAT(MethodHolder):
2388     """ Deterministic NAT Test Cases """
2389
2390     @classmethod
2391     def setUpConstants(cls):
2392         super(TestDeterministicNAT, cls).setUpConstants()
2393         cls.vpp_cmdline.extend(["nat", "{", "deterministic", "}"])
2394
2395     @classmethod
2396     def setUpClass(cls):
2397         super(TestDeterministicNAT, cls).setUpClass()
2398
2399         try:
2400             cls.tcp_port_in = 6303
2401             cls.tcp_external_port = 6303
2402             cls.udp_port_in = 6304
2403             cls.udp_external_port = 6304
2404             cls.icmp_id_in = 6305
2405             cls.nat_addr = '10.0.0.3'
2406
2407             cls.create_pg_interfaces(range(3))
2408             cls.interfaces = list(cls.pg_interfaces)
2409
2410             for i in cls.interfaces:
2411                 i.admin_up()
2412                 i.config_ip4()
2413                 i.resolve_arp()
2414
2415             cls.pg0.generate_remote_hosts(2)
2416             cls.pg0.configure_ipv4_neighbors()
2417
2418         except Exception:
2419             super(TestDeterministicNAT, cls).tearDownClass()
2420             raise
2421
2422     def create_stream_in(self, in_if, out_if, ttl=64):
2423         """
2424         Create packet stream for inside network
2425
2426         :param in_if: Inside interface
2427         :param out_if: Outside interface
2428         :param ttl: TTL of generated packets
2429         """
2430         pkts = []
2431         # TCP
2432         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2433              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2434              TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
2435         pkts.append(p)
2436
2437         # UDP
2438         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2439              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2440              UDP(sport=self.udp_port_in, dport=self.udp_external_port))
2441         pkts.append(p)
2442
2443         # ICMP
2444         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
2445              IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
2446              ICMP(id=self.icmp_id_in, type='echo-request'))
2447         pkts.append(p)
2448
2449         return pkts
2450
2451     def create_stream_out(self, out_if, dst_ip=None, ttl=64):
2452         """
2453         Create packet stream for outside network
2454
2455         :param out_if: Outside interface
2456         :param dst_ip: Destination IP address (Default use global NAT address)
2457         :param ttl: TTL of generated packets
2458         """
2459         if dst_ip is None:
2460             dst_ip = self.nat_addr
2461         pkts = []
2462         # TCP
2463         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2464              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2465              TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
2466         pkts.append(p)
2467
2468         # UDP
2469         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2470              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2471              UDP(dport=self.udp_port_out, sport=self.udp_external_port))
2472         pkts.append(p)
2473
2474         # ICMP
2475         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
2476              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
2477              ICMP(id=self.icmp_external_id, type='echo-reply'))
2478         pkts.append(p)
2479
2480         return pkts
2481
2482     def verify_capture_out(self, capture, nat_ip=None, packet_num=3):
2483         """
2484         Verify captured packets on outside network
2485
2486         :param capture: Captured packets
2487         :param nat_ip: Translated IP address (Default use global NAT address)
2488         :param same_port: Sorce port number is not translated (Default False)
2489         :param packet_num: Expected number of packets (Default 3)
2490         """
2491         if nat_ip is None:
2492             nat_ip = self.nat_addr
2493         self.assertEqual(packet_num, len(capture))
2494         for packet in capture:
2495             try:
2496                 self.assertEqual(packet[IP].src, nat_ip)
2497                 if packet.haslayer(TCP):
2498                     self.tcp_port_out = packet[TCP].sport
2499                 elif packet.haslayer(UDP):
2500                     self.udp_port_out = packet[UDP].sport
2501                 else:
2502                     self.icmp_external_id = packet[ICMP].id
2503             except:
2504                 self.logger.error(ppp("Unexpected or invalid packet "
2505                                       "(outside network):", packet))
2506                 raise
2507
2508     def initiate_tcp_session(self, in_if, out_if):
2509         """
2510         Initiates TCP session
2511
2512         :param in_if: Inside interface
2513         :param out_if: Outside interface
2514         """
2515         try:
2516             # SYN packet in->out
2517             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2518                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2519                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2520                      flags="S"))
2521             in_if.add_stream(p)
2522             self.pg_enable_capture(self.pg_interfaces)
2523             self.pg_start()
2524             capture = out_if.get_capture(1)
2525             p = capture[0]
2526             self.tcp_port_out = p[TCP].sport
2527
2528             # SYN + ACK packet out->in
2529             p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
2530                  IP(src=out_if.remote_ip4, dst=self.nat_addr) /
2531                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2532                      flags="SA"))
2533             out_if.add_stream(p)
2534             self.pg_enable_capture(self.pg_interfaces)
2535             self.pg_start()
2536             in_if.get_capture(1)
2537
2538             # ACK packet in->out
2539             p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
2540                  IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
2541                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2542                      flags="A"))
2543             in_if.add_stream(p)
2544             self.pg_enable_capture(self.pg_interfaces)
2545             self.pg_start()
2546             out_if.get_capture(1)
2547
2548         except:
2549             self.logger.error("TCP 3 way handshake failed")
2550             raise
2551
2552     def verify_ipfix_max_entries_per_user(self, data):
2553         """
2554         Verify IPFIX maximum entries per user exceeded event
2555
2556         :param data: Decoded IPFIX data records
2557         """
2558         self.assertEqual(1, len(data))
2559         record = data[0]
2560         # natEvent
2561         self.assertEqual(ord(record[230]), 13)
2562         # natQuotaExceededEvent
2563         self.assertEqual('\x03\x00\x00\x00', record[466])
2564         # sourceIPv4Address
2565         self.assertEqual(self.pg0.remote_ip4n, record[8])
2566
2567     def test_deterministic_mode(self):
2568         """ NAT plugin run deterministic mode """
2569         in_addr = '172.16.255.0'
2570         out_addr = '172.17.255.50'
2571         in_addr_t = '172.16.255.20'
2572         in_addr_n = socket.inet_aton(in_addr)
2573         out_addr_n = socket.inet_aton(out_addr)
2574         in_addr_t_n = socket.inet_aton(in_addr_t)
2575         in_plen = 24
2576         out_plen = 32
2577
2578         nat_config = self.vapi.nat_show_config()
2579         self.assertEqual(1, nat_config.deterministic)
2580
2581         self.vapi.nat_det_add_del_map(in_addr_n, in_plen, out_addr_n, out_plen)
2582
2583         rep1 = self.vapi.nat_det_forward(in_addr_t_n)
2584         self.assertEqual(rep1.out_addr[:4], out_addr_n)
2585         rep2 = self.vapi.nat_det_reverse(out_addr_n, rep1.out_port_hi)
2586         self.assertEqual(rep2.in_addr[:4], in_addr_t_n)
2587
2588         deterministic_mappings = self.vapi.nat_det_map_dump()
2589         self.assertEqual(len(deterministic_mappings), 1)
2590         dsm = deterministic_mappings[0]
2591         self.assertEqual(in_addr_n, dsm.in_addr[:4])
2592         self.assertEqual(in_plen, dsm.in_plen)
2593         self.assertEqual(out_addr_n, dsm.out_addr[:4])
2594         self.assertEqual(out_plen, dsm.out_plen)
2595
2596         self.clear_nat_det()
2597         deterministic_mappings = self.vapi.nat_det_map_dump()
2598         self.assertEqual(len(deterministic_mappings), 0)
2599
2600     def test_set_timeouts(self):
2601         """ Set deterministic NAT timeouts """
2602         timeouts_before = self.vapi.nat_det_get_timeouts()
2603
2604         self.vapi.nat_det_set_timeouts(timeouts_before.udp + 10,
2605                                        timeouts_before.tcp_established + 10,
2606                                        timeouts_before.tcp_transitory + 10,
2607                                        timeouts_before.icmp + 10)
2608
2609         timeouts_after = self.vapi.nat_det_get_timeouts()
2610
2611         self.assertNotEqual(timeouts_before.udp, timeouts_after.udp)
2612         self.assertNotEqual(timeouts_before.icmp, timeouts_after.icmp)
2613         self.assertNotEqual(timeouts_before.tcp_established,
2614                             timeouts_after.tcp_established)
2615         self.assertNotEqual(timeouts_before.tcp_transitory,
2616                             timeouts_after.tcp_transitory)
2617
2618     def test_det_in(self):
2619         """ Deterministic NAT translation test (TCP, UDP, ICMP) """
2620
2621         nat_ip = "10.0.0.10"
2622
2623         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2624                                       32,
2625                                       socket.inet_aton(nat_ip),
2626                                       32)
2627         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2628         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2629                                                   is_inside=0)
2630
2631         # in2out
2632         pkts = self.create_stream_in(self.pg0, self.pg1)
2633         self.pg0.add_stream(pkts)
2634         self.pg_enable_capture(self.pg_interfaces)
2635         self.pg_start()
2636         capture = self.pg1.get_capture(len(pkts))
2637         self.verify_capture_out(capture, nat_ip)
2638
2639         # out2in
2640         pkts = self.create_stream_out(self.pg1, nat_ip)
2641         self.pg1.add_stream(pkts)
2642         self.pg_enable_capture(self.pg_interfaces)
2643         self.pg_start()
2644         capture = self.pg0.get_capture(len(pkts))
2645         self.verify_capture_in(capture, self.pg0)
2646
2647         # session dump test
2648         sessions = self.vapi.nat_det_session_dump(self.pg0.remote_ip4n)
2649         self.assertEqual(len(sessions), 3)
2650
2651         # TCP session
2652         s = sessions[0]
2653         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2654         self.assertEqual(s.in_port, self.tcp_port_in)
2655         self.assertEqual(s.out_port, self.tcp_port_out)
2656         self.assertEqual(s.ext_port, self.tcp_external_port)
2657
2658         # UDP session
2659         s = sessions[1]
2660         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2661         self.assertEqual(s.in_port, self.udp_port_in)
2662         self.assertEqual(s.out_port, self.udp_port_out)
2663         self.assertEqual(s.ext_port, self.udp_external_port)
2664
2665         # ICMP session
2666         s = sessions[2]
2667         self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
2668         self.assertEqual(s.in_port, self.icmp_id_in)
2669         self.assertEqual(s.out_port, self.icmp_external_id)
2670
2671     def test_multiple_users(self):
2672         """ Deterministic NAT multiple users """
2673
2674         nat_ip = "10.0.0.10"
2675         port_in = 80
2676         external_port = 6303
2677
2678         host0 = self.pg0.remote_hosts[0]
2679         host1 = self.pg0.remote_hosts[1]
2680
2681         self.vapi.nat_det_add_del_map(host0.ip4n,
2682                                       24,
2683                                       socket.inet_aton(nat_ip),
2684                                       32)
2685         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2686         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2687                                                   is_inside=0)
2688
2689         # host0 to out
2690         p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
2691              IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
2692              TCP(sport=port_in, dport=external_port))
2693         self.pg0.add_stream(p)
2694         self.pg_enable_capture(self.pg_interfaces)
2695         self.pg_start()
2696         capture = self.pg1.get_capture(1)
2697         p = capture[0]
2698         try:
2699             ip = p[IP]
2700             tcp = p[TCP]
2701             self.assertEqual(ip.src, nat_ip)
2702             self.assertEqual(ip.dst, self.pg1.remote_ip4)
2703             self.assertEqual(tcp.dport, external_port)
2704             port_out0 = tcp.sport
2705         except:
2706             self.logger.error(ppp("Unexpected or invalid packet:", p))
2707             raise
2708
2709         # host1 to out
2710         p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
2711              IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
2712              TCP(sport=port_in, dport=external_port))
2713         self.pg0.add_stream(p)
2714         self.pg_enable_capture(self.pg_interfaces)
2715         self.pg_start()
2716         capture = self.pg1.get_capture(1)
2717         p = capture[0]
2718         try:
2719             ip = p[IP]
2720             tcp = p[TCP]
2721             self.assertEqual(ip.src, nat_ip)
2722             self.assertEqual(ip.dst, self.pg1.remote_ip4)
2723             self.assertEqual(tcp.dport, external_port)
2724             port_out1 = tcp.sport
2725         except:
2726             self.logger.error(ppp("Unexpected or invalid packet:", p))
2727             raise
2728
2729         dms = self.vapi.nat_det_map_dump()
2730         self.assertEqual(1, len(dms))
2731         self.assertEqual(2, dms[0].ses_num)
2732
2733         # out to host0
2734         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2735              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2736              TCP(sport=external_port, dport=port_out0))
2737         self.pg1.add_stream(p)
2738         self.pg_enable_capture(self.pg_interfaces)
2739         self.pg_start()
2740         capture = self.pg0.get_capture(1)
2741         p = capture[0]
2742         try:
2743             ip = p[IP]
2744             tcp = p[TCP]
2745             self.assertEqual(ip.src, self.pg1.remote_ip4)
2746             self.assertEqual(ip.dst, host0.ip4)
2747             self.assertEqual(tcp.dport, port_in)
2748             self.assertEqual(tcp.sport, external_port)
2749         except:
2750             self.logger.error(ppp("Unexpected or invalid packet:", p))
2751             raise
2752
2753         # out to host1
2754         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2755              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2756              TCP(sport=external_port, dport=port_out1))
2757         self.pg1.add_stream(p)
2758         self.pg_enable_capture(self.pg_interfaces)
2759         self.pg_start()
2760         capture = self.pg0.get_capture(1)
2761         p = capture[0]
2762         try:
2763             ip = p[IP]
2764             tcp = p[TCP]
2765             self.assertEqual(ip.src, self.pg1.remote_ip4)
2766             self.assertEqual(ip.dst, host1.ip4)
2767             self.assertEqual(tcp.dport, port_in)
2768             self.assertEqual(tcp.sport, external_port)
2769         except:
2770             self.logger.error(ppp("Unexpected or invalid packet", p))
2771             raise
2772
2773         # session close api test
2774         self.vapi.nat_det_close_session_out(socket.inet_aton(nat_ip),
2775                                             port_out1,
2776                                             self.pg1.remote_ip4n,
2777                                             external_port)
2778         dms = self.vapi.nat_det_map_dump()
2779         self.assertEqual(dms[0].ses_num, 1)
2780
2781         self.vapi.nat_det_close_session_in(host0.ip4n,
2782                                            port_in,
2783                                            self.pg1.remote_ip4n,
2784                                            external_port)
2785         dms = self.vapi.nat_det_map_dump()
2786         self.assertEqual(dms[0].ses_num, 0)
2787
2788     def test_tcp_session_close_detection_in(self):
2789         """ Deterministic NAT TCP session close from inside network """
2790         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2791                                       32,
2792                                       socket.inet_aton(self.nat_addr),
2793                                       32)
2794         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2795         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2796                                                   is_inside=0)
2797
2798         self.initiate_tcp_session(self.pg0, self.pg1)
2799
2800         # close the session from inside
2801         try:
2802             # FIN packet in -> out
2803             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2804                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2805                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2806                      flags="F"))
2807             self.pg0.add_stream(p)
2808             self.pg_enable_capture(self.pg_interfaces)
2809             self.pg_start()
2810             self.pg1.get_capture(1)
2811
2812             pkts = []
2813
2814             # ACK packet out -> in
2815             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2816                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2817                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2818                      flags="A"))
2819             pkts.append(p)
2820
2821             # FIN packet out -> in
2822             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2823                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2824                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2825                      flags="F"))
2826             pkts.append(p)
2827
2828             self.pg1.add_stream(pkts)
2829             self.pg_enable_capture(self.pg_interfaces)
2830             self.pg_start()
2831             self.pg0.get_capture(2)
2832
2833             # ACK packet in -> out
2834             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2835                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2836                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2837                      flags="A"))
2838             self.pg0.add_stream(p)
2839             self.pg_enable_capture(self.pg_interfaces)
2840             self.pg_start()
2841             self.pg1.get_capture(1)
2842
2843             # Check if deterministic NAT44 closed the session
2844             dms = self.vapi.nat_det_map_dump()
2845             self.assertEqual(0, dms[0].ses_num)
2846         except:
2847             self.logger.error("TCP session termination failed")
2848             raise
2849
2850     def test_tcp_session_close_detection_out(self):
2851         """ Deterministic NAT TCP session close from outside network """
2852         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2853                                       32,
2854                                       socket.inet_aton(self.nat_addr),
2855                                       32)
2856         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2857         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2858                                                   is_inside=0)
2859
2860         self.initiate_tcp_session(self.pg0, self.pg1)
2861
2862         # close the session from outside
2863         try:
2864             # FIN packet out -> in
2865             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2866                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2867                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2868                      flags="F"))
2869             self.pg1.add_stream(p)
2870             self.pg_enable_capture(self.pg_interfaces)
2871             self.pg_start()
2872             self.pg0.get_capture(1)
2873
2874             pkts = []
2875
2876             # ACK packet in -> out
2877             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2878                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2879                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2880                      flags="A"))
2881             pkts.append(p)
2882
2883             # ACK packet in -> out
2884             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2885                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2886                  TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
2887                      flags="F"))
2888             pkts.append(p)
2889
2890             self.pg0.add_stream(pkts)
2891             self.pg_enable_capture(self.pg_interfaces)
2892             self.pg_start()
2893             self.pg1.get_capture(2)
2894
2895             # ACK packet out -> in
2896             p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
2897                  IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
2898                  TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
2899                      flags="A"))
2900             self.pg1.add_stream(p)
2901             self.pg_enable_capture(self.pg_interfaces)
2902             self.pg_start()
2903             self.pg0.get_capture(1)
2904
2905             # Check if deterministic NAT44 closed the session
2906             dms = self.vapi.nat_det_map_dump()
2907             self.assertEqual(0, dms[0].ses_num)
2908         except:
2909             self.logger.error("TCP session termination failed")
2910             raise
2911
2912     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2913     def test_session_timeout(self):
2914         """ Deterministic NAT session timeouts """
2915         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2916                                       32,
2917                                       socket.inet_aton(self.nat_addr),
2918                                       32)
2919         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2920         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2921                                                   is_inside=0)
2922
2923         self.initiate_tcp_session(self.pg0, self.pg1)
2924         self.vapi.nat_det_set_timeouts(5, 5, 5, 5)
2925         pkts = self.create_stream_in(self.pg0, self.pg1)
2926         self.pg0.add_stream(pkts)
2927         self.pg_enable_capture(self.pg_interfaces)
2928         self.pg_start()
2929         capture = self.pg1.get_capture(len(pkts))
2930         sleep(15)
2931
2932         dms = self.vapi.nat_det_map_dump()
2933         self.assertEqual(0, dms[0].ses_num)
2934
2935     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
2936     def test_session_limit_per_user(self):
2937         """ Deterministic NAT maximum sessions per user limit """
2938         self.vapi.nat_det_add_del_map(self.pg0.remote_ip4n,
2939                                       32,
2940                                       socket.inet_aton(self.nat_addr),
2941                                       32)
2942         self.vapi.nat44_interface_add_del_feature(self.pg0.sw_if_index)
2943         self.vapi.nat44_interface_add_del_feature(self.pg1.sw_if_index,
2944                                                   is_inside=0)
2945         self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
2946                                      src_address=self.pg2.local_ip4n,
2947                                      path_mtu=512,
2948                                      template_interval=10)
2949         self.vapi.nat_ipfix()
2950
2951         pkts = []
2952         for port in range(1025, 2025):
2953             p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2954                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2955                  UDP(sport=port, dport=port))
2956             pkts.append(p)
2957
2958         self.pg0.add_stream(pkts)
2959         self.pg_enable_capture(self.pg_interfaces)
2960         self.pg_start()
2961         capture = self.pg1.get_capture(len(pkts))
2962
2963         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2964              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2965              UDP(sport=3001, dport=3002))
2966         self.pg0.add_stream(p)
2967         self.pg_enable_capture(self.pg_interfaces)
2968         self.pg_start()
2969         capture = self.pg1.assert_nothing_captured()
2970
2971         # verify ICMP error packet
2972         capture = self.pg0.get_capture(1)
2973         p = capture[0]
2974         self.assertTrue(p.haslayer(ICMP))
2975         icmp = p[ICMP]
2976         self.assertEqual(icmp.type, 3)
2977         self.assertEqual(icmp.code, 1)
2978         self.assertTrue(icmp.haslayer(IPerror))
2979         inner_ip = icmp[IPerror]
2980         self.assertEqual(inner_ip[UDPerror].sport, 3001)
2981         self.assertEqual(inner_ip[UDPerror].dport, 3002)
2982
2983         dms = self.vapi.nat_det_map_dump()
2984
2985         self.assertEqual(1000, dms[0].ses_num)
2986
2987         # verify IPFIX logging
2988         self.vapi.cli("ipfix flush")  # FIXME this should be an API call
2989         sleep(1)
2990         capture = self.pg2.get_capture(2)
2991         ipfix = IPFIXDecoder()
2992         # first load template
2993         for p in capture:
2994             self.assertTrue(p.haslayer(IPFIX))
2995             if p.haslayer(Template):
2996                 ipfix.add_template(p.getlayer(Template))
2997         # verify events in data set
2998         for p in capture:
2999             if p.haslayer(Data):
3000                 data = ipfix.decode_data_set(p.getlayer(Set))
3001                 self.verify_ipfix_max_entries_per_user(data)
3002
3003     def clear_nat_det(self):
3004         """
3005         Clear deterministic NAT configuration.
3006         """
3007         self.vapi.nat_ipfix(enable=0)
3008         self.vapi.nat_det_set_timeouts()
3009         deterministic_mappings = self.vapi.nat_det_map_dump()
3010         for dsm in deterministic_mappings:
3011             self.vapi.nat_det_add_del_map(dsm.in_addr,
3012                                           dsm.in_plen,
3013                                           dsm.out_addr,
3014                                           dsm.out_plen,
3015                                           is_add=0)
3016
3017         interfaces = self.vapi.nat44_interface_dump()
3018         for intf in interfaces:
3019             self.vapi.nat44_interface_add_del_feature(intf.sw_if_index,
3020                                                       intf.is_inside,
3021                                                       is_add=0)
3022
3023     def tearDown(self):
3024         super(TestDeterministicNAT, self).tearDown()
3025         if not self.vpp_dead:
3026             self.logger.info(self.vapi.cli("show nat44 detail"))
3027             self.clear_nat_det()
3028
3029
3030 class TestNAT64(MethodHolder):
3031     """ NAT64 Test Cases """
3032
3033     @classmethod
3034     def setUpClass(cls):
3035         super(TestNAT64, cls).setUpClass()
3036
3037         try:
3038             cls.tcp_port_in = 6303
3039             cls.tcp_port_out = 6303
3040             cls.udp_port_in = 6304
3041             cls.udp_port_out = 6304
3042             cls.icmp_id_in = 6305
3043             cls.icmp_id_out = 6305
3044             cls.nat_addr = '10.0.0.3'
3045             cls.nat_addr_n = socket.inet_pton(socket.AF_INET, cls.nat_addr)
3046             cls.vrf1_id = 10
3047             cls.vrf1_nat_addr = '10.0.10.3'
3048             cls.vrf1_nat_addr_n = socket.inet_pton(socket.AF_INET,
3049                                                    cls.vrf1_nat_addr)
3050
3051             cls.create_pg_interfaces(range(3))
3052             cls.ip6_interfaces = list(cls.pg_interfaces[0:1])
3053             cls.ip6_interfaces.append(cls.pg_interfaces[2])
3054             cls.ip4_interfaces = list(cls.pg_interfaces[1:2])
3055
3056             cls.vapi.ip_table_add_del(cls.vrf1_id, is_add=1, is_ipv6=1)
3057
3058             cls.pg_interfaces[2].set_table_ip6(cls.vrf1_id)
3059
3060             cls.pg0.generate_remote_hosts(2)
3061
3062             for i in cls.ip6_interfaces:
3063                 i.admin_up()
3064                 i.config_ip6()
3065                 i.configure_ipv6_neighbors()
3066
3067             for i in cls.ip4_interfaces:
3068                 i.admin_up()
3069                 i.config_ip4()
3070                 i.resolve_arp()
3071
3072         except Exception:
3073             super(TestNAT64, cls).tearDownClass()
3074             raise
3075
3076     def test_pool(self):
3077         """ Add/delete address to NAT64 pool """
3078         nat_addr = socket.inet_pton(socket.AF_INET, '1.2.3.4')
3079
3080         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr)
3081
3082         addresses = self.vapi.nat64_pool_addr_dump()
3083         self.assertEqual(len(addresses), 1)
3084         self.assertEqual(addresses[0].address, nat_addr)
3085
3086         self.vapi.nat64_add_del_pool_addr_range(nat_addr, nat_addr, is_add=0)
3087
3088         addresses = self.vapi.nat64_pool_addr_dump()
3089         self.assertEqual(len(addresses), 0)
3090
3091     def test_interface(self):
3092         """ Enable/disable NAT64 feature on the interface """
3093         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3094         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3095
3096         interfaces = self.vapi.nat64_interface_dump()
3097         self.assertEqual(len(interfaces), 2)
3098         pg0_found = False
3099         pg1_found = False
3100         for intf in interfaces:
3101             if intf.sw_if_index == self.pg0.sw_if_index:
3102                 self.assertEqual(intf.is_inside, 1)
3103                 pg0_found = True
3104             elif intf.sw_if_index == self.pg1.sw_if_index:
3105                 self.assertEqual(intf.is_inside, 0)
3106                 pg1_found = True
3107         self.assertTrue(pg0_found)
3108         self.assertTrue(pg1_found)
3109
3110         features = self.vapi.cli("show interface features pg0")
3111         self.assertNotEqual(features.find('nat64-in2out'), -1)
3112         features = self.vapi.cli("show interface features pg1")
3113         self.assertNotEqual(features.find('nat64-out2in'), -1)
3114
3115         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index, is_add=0)
3116         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_add=0)
3117
3118         interfaces = self.vapi.nat64_interface_dump()
3119         self.assertEqual(len(interfaces), 0)
3120
3121     def test_static_bib(self):
3122         """ Add/delete static BIB entry """
3123         in_addr = socket.inet_pton(socket.AF_INET6,
3124                                    '2001:db8:85a3::8a2e:370:7334')
3125         out_addr = socket.inet_pton(socket.AF_INET, '10.1.1.3')
3126         in_port = 1234
3127         out_port = 5678
3128         proto = IP_PROTOS.tcp
3129
3130         self.vapi.nat64_add_del_static_bib(in_addr,
3131                                            out_addr,
3132                                            in_port,
3133                                            out_port,
3134                                            proto)
3135         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3136         static_bib_num = 0
3137         for bibe in bib:
3138             if bibe.is_static:
3139                 static_bib_num += 1
3140                 self.assertEqual(bibe.i_addr, in_addr)
3141                 self.assertEqual(bibe.o_addr, out_addr)
3142                 self.assertEqual(bibe.i_port, in_port)
3143                 self.assertEqual(bibe.o_port, out_port)
3144         self.assertEqual(static_bib_num, 1)
3145
3146         self.vapi.nat64_add_del_static_bib(in_addr,
3147                                            out_addr,
3148                                            in_port,
3149                                            out_port,
3150                                            proto,
3151                                            is_add=0)
3152         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3153         static_bib_num = 0
3154         for bibe in bib:
3155             if bibe.is_static:
3156                 static_bib_num += 1
3157         self.assertEqual(static_bib_num, 0)
3158
3159     def test_set_timeouts(self):
3160         """ Set NAT64 timeouts """
3161         # verify default values
3162         timeouts = self.vapi.nat64_get_timeouts()
3163         self.assertEqual(timeouts.udp, 300)
3164         self.assertEqual(timeouts.icmp, 60)
3165         self.assertEqual(timeouts.tcp_trans, 240)
3166         self.assertEqual(timeouts.tcp_est, 7440)
3167         self.assertEqual(timeouts.tcp_incoming_syn, 6)
3168
3169         # set and verify custom values
3170         self.vapi.nat64_set_timeouts(udp=200, icmp=30, tcp_trans=250,
3171                                      tcp_est=7450, tcp_incoming_syn=10)
3172         timeouts = self.vapi.nat64_get_timeouts()
3173         self.assertEqual(timeouts.udp, 200)
3174         self.assertEqual(timeouts.icmp, 30)
3175         self.assertEqual(timeouts.tcp_trans, 250)
3176         self.assertEqual(timeouts.tcp_est, 7450)
3177         self.assertEqual(timeouts.tcp_incoming_syn, 10)
3178
3179     def test_dynamic(self):
3180         """ NAT64 dynamic translation test """
3181         self.tcp_port_in = 6303
3182         self.udp_port_in = 6304
3183         self.icmp_id_in = 6305
3184
3185         ses_num_start = self.nat64_get_ses_num()
3186
3187         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3188                                                 self.nat_addr_n)
3189         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3190         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3191
3192         # in2out
3193         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3194         self.pg0.add_stream(pkts)
3195         self.pg_enable_capture(self.pg_interfaces)
3196         self.pg_start()
3197         capture = self.pg1.get_capture(len(pkts))
3198         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3199                                 dst_ip=self.pg1.remote_ip4)
3200
3201         # out2in
3202         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3203         self.pg1.add_stream(pkts)
3204         self.pg_enable_capture(self.pg_interfaces)
3205         self.pg_start()
3206         capture = self.pg0.get_capture(len(pkts))
3207         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3208         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3209
3210         # in2out
3211         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3212         self.pg0.add_stream(pkts)
3213         self.pg_enable_capture(self.pg_interfaces)
3214         self.pg_start()
3215         capture = self.pg1.get_capture(len(pkts))
3216         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3217                                 dst_ip=self.pg1.remote_ip4)
3218
3219         # out2in
3220         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3221         self.pg1.add_stream(pkts)
3222         self.pg_enable_capture(self.pg_interfaces)
3223         self.pg_start()
3224         capture = self.pg0.get_capture(len(pkts))
3225         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3226
3227         ses_num_end = self.nat64_get_ses_num()
3228
3229         self.assertEqual(ses_num_end - ses_num_start, 3)
3230
3231         # tenant with specific VRF
3232         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3233                                                 self.vrf1_nat_addr_n,
3234                                                 vrf_id=self.vrf1_id)
3235         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3236
3237         pkts = self.create_stream_in_ip6(self.pg2, self.pg1)
3238         self.pg2.add_stream(pkts)
3239         self.pg_enable_capture(self.pg_interfaces)
3240         self.pg_start()
3241         capture = self.pg1.get_capture(len(pkts))
3242         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3243                                 dst_ip=self.pg1.remote_ip4)
3244
3245         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3246         self.pg1.add_stream(pkts)
3247         self.pg_enable_capture(self.pg_interfaces)
3248         self.pg_start()
3249         capture = self.pg2.get_capture(len(pkts))
3250         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg2.remote_ip6)
3251
3252     def test_static(self):
3253         """ NAT64 static translation test """
3254         self.tcp_port_in = 60303
3255         self.udp_port_in = 60304
3256         self.icmp_id_in = 60305
3257         self.tcp_port_out = 60303
3258         self.udp_port_out = 60304
3259         self.icmp_id_out = 60305
3260
3261         ses_num_start = self.nat64_get_ses_num()
3262
3263         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3264                                                 self.nat_addr_n)
3265         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3266         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3267
3268         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3269                                            self.nat_addr_n,
3270                                            self.tcp_port_in,
3271                                            self.tcp_port_out,
3272                                            IP_PROTOS.tcp)
3273         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3274                                            self.nat_addr_n,
3275                                            self.udp_port_in,
3276                                            self.udp_port_out,
3277                                            IP_PROTOS.udp)
3278         self.vapi.nat64_add_del_static_bib(self.pg0.remote_ip6n,
3279                                            self.nat_addr_n,
3280                                            self.icmp_id_in,
3281                                            self.icmp_id_out,
3282                                            IP_PROTOS.icmp)
3283
3284         # in2out
3285         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3286         self.pg0.add_stream(pkts)
3287         self.pg_enable_capture(self.pg_interfaces)
3288         self.pg_start()
3289         capture = self.pg1.get_capture(len(pkts))
3290         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3291                                 dst_ip=self.pg1.remote_ip4, same_port=True)
3292
3293         # out2in
3294         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3295         self.pg1.add_stream(pkts)
3296         self.pg_enable_capture(self.pg_interfaces)
3297         self.pg_start()
3298         capture = self.pg0.get_capture(len(pkts))
3299         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3300         self.verify_capture_in_ip6(capture, ip[IPv6].src, self.pg0.remote_ip6)
3301
3302         ses_num_end = self.nat64_get_ses_num()
3303
3304         self.assertEqual(ses_num_end - ses_num_start, 3)
3305
3306     @unittest.skipUnless(running_extended_tests(), "part of extended tests")
3307     def test_session_timeout(self):
3308         """ NAT64 session timeout """
3309         self.icmp_id_in = 1234
3310         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3311                                                 self.nat_addr_n)
3312         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3313         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3314         self.vapi.nat64_set_timeouts(icmp=5)
3315
3316         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3317         self.pg0.add_stream(pkts)
3318         self.pg_enable_capture(self.pg_interfaces)
3319         self.pg_start()
3320         capture = self.pg1.get_capture(len(pkts))
3321
3322         ses_num_before_timeout = self.nat64_get_ses_num()
3323
3324         sleep(15)
3325
3326         # ICMP session after timeout
3327         ses_num_after_timeout = self.nat64_get_ses_num()
3328         self.assertNotEqual(ses_num_before_timeout, ses_num_after_timeout)
3329
3330     def test_icmp_error(self):
3331         """ NAT64 ICMP Error message translation """
3332         self.tcp_port_in = 6303
3333         self.udp_port_in = 6304
3334         self.icmp_id_in = 6305
3335
3336         ses_num_start = self.nat64_get_ses_num()
3337
3338         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3339                                                 self.nat_addr_n)
3340         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3341         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3342
3343         # send some packets to create sessions
3344         pkts = self.create_stream_in_ip6(self.pg0, self.pg1)
3345         self.pg0.add_stream(pkts)
3346         self.pg_enable_capture(self.pg_interfaces)
3347         self.pg_start()
3348         capture_ip4 = self.pg1.get_capture(len(pkts))
3349         self.verify_capture_out(capture_ip4,
3350                                 nat_ip=self.nat_addr,
3351                                 dst_ip=self.pg1.remote_ip4)
3352
3353         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3354         self.pg1.add_stream(pkts)
3355         self.pg_enable_capture(self.pg_interfaces)
3356         self.pg_start()
3357         capture_ip6 = self.pg0.get_capture(len(pkts))
3358         ip = IPv6(src=''.join(['64:ff9b::', self.pg1.remote_ip4]))
3359         self.verify_capture_in_ip6(capture_ip6, ip[IPv6].src,
3360                                    self.pg0.remote_ip6)
3361
3362         # in2out
3363         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3364                 IPv6(src=self.pg0.remote_ip6, dst=ip[IPv6].src) /
3365                 ICMPv6DestUnreach(code=1) /
3366                 packet[IPv6] for packet in capture_ip6]
3367         self.pg0.add_stream(pkts)
3368         self.pg_enable_capture(self.pg_interfaces)
3369         self.pg_start()
3370         capture = self.pg1.get_capture(len(pkts))
3371         for packet in capture:
3372             try:
3373                 self.assertEqual(packet[IP].src, self.nat_addr)
3374                 self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3375                 self.assertEqual(packet[ICMP].type, 3)
3376                 self.assertEqual(packet[ICMP].code, 13)
3377                 inner = packet[IPerror]
3378                 self.assertEqual(inner.src, self.pg1.remote_ip4)
3379                 self.assertEqual(inner.dst, self.nat_addr)
3380                 self.check_icmp_checksum(packet)
3381                 if inner.haslayer(TCPerror):
3382                     self.assertEqual(inner[TCPerror].dport, self.tcp_port_out)
3383                 elif inner.haslayer(UDPerror):
3384                     self.assertEqual(inner[UDPerror].dport, self.udp_port_out)
3385                 else:
3386                     self.assertEqual(inner[ICMPerror].id, self.icmp_id_out)
3387             except:
3388                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3389                 raise
3390
3391         # out2in
3392         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3393                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3394                 ICMP(type=3, code=13) /
3395                 packet[IP] for packet in capture_ip4]
3396         self.pg1.add_stream(pkts)
3397         self.pg_enable_capture(self.pg_interfaces)
3398         self.pg_start()
3399         capture = self.pg0.get_capture(len(pkts))
3400         for packet in capture:
3401             try:
3402                 self.assertEqual(packet[IPv6].src, ip.src)
3403                 self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3404                 icmp = packet[ICMPv6DestUnreach]
3405                 self.assertEqual(icmp.code, 1)
3406                 inner = icmp[IPerror6]
3407                 self.assertEqual(inner.src, self.pg0.remote_ip6)
3408                 self.assertEqual(inner.dst, ip.src)
3409                 self.check_icmpv6_checksum(packet)
3410                 if inner.haslayer(TCPerror):
3411                     self.assertEqual(inner[TCPerror].sport, self.tcp_port_in)
3412                 elif inner.haslayer(UDPerror):
3413                     self.assertEqual(inner[UDPerror].sport, self.udp_port_in)
3414                 else:
3415                     self.assertEqual(inner[ICMPv6EchoRequest].id,
3416                                      self.icmp_id_in)
3417             except:
3418                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3419                 raise
3420
3421     def test_hairpinning(self):
3422         """ NAT64 hairpinning """
3423
3424         client = self.pg0.remote_hosts[0]
3425         server = self.pg0.remote_hosts[1]
3426         server_tcp_in_port = 22
3427         server_tcp_out_port = 4022
3428         server_udp_in_port = 23
3429         server_udp_out_port = 4023
3430         client_tcp_in_port = 1234
3431         client_udp_in_port = 1235
3432         client_tcp_out_port = 0
3433         client_udp_out_port = 0
3434         ip = IPv6(src=''.join(['64:ff9b::', self.nat_addr]))
3435         nat_addr_ip6 = ip.src
3436
3437         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3438                                                 self.nat_addr_n)
3439         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3440         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3441
3442         self.vapi.nat64_add_del_static_bib(server.ip6n,
3443                                            self.nat_addr_n,
3444                                            server_tcp_in_port,
3445                                            server_tcp_out_port,
3446                                            IP_PROTOS.tcp)
3447         self.vapi.nat64_add_del_static_bib(server.ip6n,
3448                                            self.nat_addr_n,
3449                                            server_udp_in_port,
3450                                            server_udp_out_port,
3451                                            IP_PROTOS.udp)
3452
3453         # client to server
3454         pkts = []
3455         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3456              IPv6(src=client.ip6, dst=nat_addr_ip6) /
3457              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3458         pkts.append(p)
3459         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3460              IPv6(src=client.ip6, dst=nat_addr_ip6) /
3461              UDP(sport=client_udp_in_port, dport=server_udp_out_port))
3462         pkts.append(p)
3463         self.pg0.add_stream(pkts)
3464         self.pg_enable_capture(self.pg_interfaces)
3465         self.pg_start()
3466         capture = self.pg0.get_capture(len(pkts))
3467         for packet in capture:
3468             try:
3469                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3470                 self.assertEqual(packet[IPv6].dst, server.ip6)
3471                 if packet.haslayer(TCP):
3472                     self.assertNotEqual(packet[TCP].sport, client_tcp_in_port)
3473                     self.assertEqual(packet[TCP].dport, server_tcp_in_port)
3474                     self.check_tcp_checksum(packet)
3475                     client_tcp_out_port = packet[TCP].sport
3476                 else:
3477                     self.assertNotEqual(packet[UDP].sport, client_udp_in_port)
3478                     self.assertEqual(packet[UDP].dport, server_udp_in_port)
3479                     self.check_udp_checksum(packet)
3480                     client_udp_out_port = packet[UDP].sport
3481             except:
3482                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3483                 raise
3484
3485         # server to client
3486         pkts = []
3487         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3488              IPv6(src=server.ip6, dst=nat_addr_ip6) /
3489              TCP(sport=server_tcp_in_port, dport=client_tcp_out_port))
3490         pkts.append(p)
3491         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3492              IPv6(src=server.ip6, dst=nat_addr_ip6) /
3493              UDP(sport=server_udp_in_port, dport=client_udp_out_port))
3494         pkts.append(p)
3495         self.pg0.add_stream(pkts)
3496         self.pg_enable_capture(self.pg_interfaces)
3497         self.pg_start()
3498         capture = self.pg0.get_capture(len(pkts))
3499         for packet in capture:
3500             try:
3501                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3502                 self.assertEqual(packet[IPv6].dst, client.ip6)
3503                 if packet.haslayer(TCP):
3504                     self.assertEqual(packet[TCP].sport, server_tcp_out_port)
3505                     self.assertEqual(packet[TCP].dport, client_tcp_in_port)
3506                     self.check_tcp_checksum(packet)
3507                 else:
3508                     self.assertEqual(packet[UDP].sport, server_udp_out_port)
3509                     self.assertEqual(packet[UDP].dport, client_udp_in_port)
3510                     self.check_udp_checksum(packet)
3511             except:
3512                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3513                 raise
3514
3515         # ICMP error
3516         pkts = []
3517         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3518                 IPv6(src=client.ip6, dst=nat_addr_ip6) /
3519                 ICMPv6DestUnreach(code=1) /
3520                 packet[IPv6] for packet in capture]
3521         self.pg0.add_stream(pkts)
3522         self.pg_enable_capture(self.pg_interfaces)
3523         self.pg_start()
3524         capture = self.pg0.get_capture(len(pkts))
3525         for packet in capture:
3526             try:
3527                 self.assertEqual(packet[IPv6].src, nat_addr_ip6)
3528                 self.assertEqual(packet[IPv6].dst, server.ip6)
3529                 icmp = packet[ICMPv6DestUnreach]
3530                 self.assertEqual(icmp.code, 1)
3531                 inner = icmp[IPerror6]
3532                 self.assertEqual(inner.src, server.ip6)
3533                 self.assertEqual(inner.dst, nat_addr_ip6)
3534                 self.check_icmpv6_checksum(packet)
3535                 if inner.haslayer(TCPerror):
3536                     self.assertEqual(inner[TCPerror].sport, server_tcp_in_port)
3537                     self.assertEqual(inner[TCPerror].dport,
3538                                      client_tcp_out_port)
3539                 else:
3540                     self.assertEqual(inner[UDPerror].sport, server_udp_in_port)
3541                     self.assertEqual(inner[UDPerror].dport,
3542                                      client_udp_out_port)
3543             except:
3544                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
3545                 raise
3546
3547     def test_prefix(self):
3548         """ NAT64 Network-Specific Prefix """
3549
3550         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3551                                                 self.nat_addr_n)
3552         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3553         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3554         self.vapi.nat64_add_del_pool_addr_range(self.vrf1_nat_addr_n,
3555                                                 self.vrf1_nat_addr_n,
3556                                                 vrf_id=self.vrf1_id)
3557         self.vapi.nat64_add_del_interface(self.pg2.sw_if_index)
3558
3559         # Add global prefix
3560         global_pref64 = "2001:db8::"
3561         global_pref64_n = socket.inet_pton(socket.AF_INET6, global_pref64)
3562         global_pref64_len = 32
3563         self.vapi.nat64_add_del_prefix(global_pref64_n, global_pref64_len)
3564
3565         prefix = self.vapi.nat64_prefix_dump()
3566         self.assertEqual(len(prefix), 1)
3567         self.assertEqual(prefix[0].prefix, global_pref64_n)
3568         self.assertEqual(prefix[0].prefix_len, global_pref64_len)
3569         self.assertEqual(prefix[0].vrf_id, 0)
3570
3571         # Add tenant specific prefix
3572         vrf1_pref64 = "2001:db8:122:300::"
3573         vrf1_pref64_n = socket.inet_pton(socket.AF_INET6, vrf1_pref64)
3574         vrf1_pref64_len = 56
3575         self.vapi.nat64_add_del_prefix(vrf1_pref64_n,
3576                                        vrf1_pref64_len,
3577                                        vrf_id=self.vrf1_id)
3578         prefix = self.vapi.nat64_prefix_dump()
3579         self.assertEqual(len(prefix), 2)
3580
3581         # Global prefix
3582         pkts = self.create_stream_in_ip6(self.pg0,
3583                                          self.pg1,
3584                                          pref=global_pref64,
3585                                          plen=global_pref64_len)
3586         self.pg0.add_stream(pkts)
3587         self.pg_enable_capture(self.pg_interfaces)
3588         self.pg_start()
3589         capture = self.pg1.get_capture(len(pkts))
3590         self.verify_capture_out(capture, nat_ip=self.nat_addr,
3591                                 dst_ip=self.pg1.remote_ip4)
3592
3593         pkts = self.create_stream_out(self.pg1, dst_ip=self.nat_addr)
3594         self.pg1.add_stream(pkts)
3595         self.pg_enable_capture(self.pg_interfaces)
3596         self.pg_start()
3597         capture = self.pg0.get_capture(len(pkts))
3598         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3599                                   global_pref64,
3600                                   global_pref64_len)
3601         self.verify_capture_in_ip6(capture, dst_ip, self.pg0.remote_ip6)
3602
3603         # Tenant specific prefix
3604         pkts = self.create_stream_in_ip6(self.pg2,
3605                                          self.pg1,
3606                                          pref=vrf1_pref64,
3607                                          plen=vrf1_pref64_len)
3608         self.pg2.add_stream(pkts)
3609         self.pg_enable_capture(self.pg_interfaces)
3610         self.pg_start()
3611         capture = self.pg1.get_capture(len(pkts))
3612         self.verify_capture_out(capture, nat_ip=self.vrf1_nat_addr,
3613                                 dst_ip=self.pg1.remote_ip4)
3614
3615         pkts = self.create_stream_out(self.pg1, dst_ip=self.vrf1_nat_addr)
3616         self.pg1.add_stream(pkts)
3617         self.pg_enable_capture(self.pg_interfaces)
3618         self.pg_start()
3619         capture = self.pg2.get_capture(len(pkts))
3620         dst_ip = self.compose_ip6(self.pg1.remote_ip4,
3621                                   vrf1_pref64,
3622                                   vrf1_pref64_len)
3623         self.verify_capture_in_ip6(capture, dst_ip, self.pg2.remote_ip6)
3624
3625     def test_unknown_proto(self):
3626         """ NAT64 translate packet with unknown protocol """
3627
3628         self.vapi.nat64_add_del_pool_addr_range(self.nat_addr_n,
3629                                                 self.nat_addr_n)
3630         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3631         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3632         remote_ip6 = self.compose_ip6(self.pg1.remote_ip4, '64:ff9b::', 96)
3633
3634         # in2out
3635         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3636              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6) /
3637              TCP(sport=self.tcp_port_in, dport=20))
3638         self.pg0.add_stream(p)
3639         self.pg_enable_capture(self.pg_interfaces)
3640         self.pg_start()
3641         p = self.pg1.get_capture(1)
3642
3643         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3644              IPv6(src=self.pg0.remote_ip6, dst=remote_ip6, nh=47) /
3645              GRE() /
3646              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
3647              TCP(sport=1234, dport=1234))
3648         self.pg0.add_stream(p)
3649         self.pg_enable_capture(self.pg_interfaces)
3650         self.pg_start()
3651         p = self.pg1.get_capture(1)
3652         packet = p[0]
3653         try:
3654             self.assertEqual(packet[IP].src, self.nat_addr)
3655             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
3656             self.assertTrue(packet.haslayer(GRE))
3657             self.check_ip_checksum(packet)
3658         except:
3659             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3660             raise
3661
3662         # out2in
3663         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3664              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3665              GRE() /
3666              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
3667              TCP(sport=1234, dport=1234))
3668         self.pg1.add_stream(p)
3669         self.pg_enable_capture(self.pg_interfaces)
3670         self.pg_start()
3671         p = self.pg0.get_capture(1)
3672         packet = p[0]
3673         try:
3674             self.assertEqual(packet[IPv6].src, remote_ip6)
3675             self.assertEqual(packet[IPv6].dst, self.pg0.remote_ip6)
3676             self.assertEqual(packet[IPv6].nh, 47)
3677         except:
3678             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3679             raise
3680
3681     def test_hairpinning_unknown_proto(self):
3682         """ NAT64 translate packet with unknown protocol - hairpinning """
3683
3684         client = self.pg0.remote_hosts[0]
3685         server = self.pg0.remote_hosts[1]
3686         server_tcp_in_port = 22
3687         server_tcp_out_port = 4022
3688         client_tcp_in_port = 1234
3689         client_tcp_out_port = 1235
3690         server_nat_ip = "10.0.0.100"
3691         client_nat_ip = "10.0.0.110"
3692         server_nat_ip_n = socket.inet_pton(socket.AF_INET, server_nat_ip)
3693         client_nat_ip_n = socket.inet_pton(socket.AF_INET, client_nat_ip)
3694         server_nat_ip6 = self.compose_ip6(server_nat_ip, '64:ff9b::', 96)
3695         client_nat_ip6 = self.compose_ip6(client_nat_ip, '64:ff9b::', 96)
3696
3697         self.vapi.nat64_add_del_pool_addr_range(server_nat_ip_n,
3698                                                 client_nat_ip_n)
3699         self.vapi.nat64_add_del_interface(self.pg0.sw_if_index)
3700         self.vapi.nat64_add_del_interface(self.pg1.sw_if_index, is_inside=0)
3701
3702         self.vapi.nat64_add_del_static_bib(server.ip6n,
3703                                            server_nat_ip_n,
3704                                            server_tcp_in_port,
3705                                            server_tcp_out_port,
3706                                            IP_PROTOS.tcp)
3707
3708         self.vapi.nat64_add_del_static_bib(server.ip6n,
3709                                            server_nat_ip_n,
3710                                            0,
3711                                            0,
3712                                            IP_PROTOS.gre)
3713
3714         self.vapi.nat64_add_del_static_bib(client.ip6n,
3715                                            client_nat_ip_n,
3716                                            client_tcp_in_port,
3717                                            client_tcp_out_port,
3718                                            IP_PROTOS.tcp)
3719
3720         # client to server
3721         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3722              IPv6(src=client.ip6, dst=server_nat_ip6) /
3723              TCP(sport=client_tcp_in_port, dport=server_tcp_out_port))
3724         self.pg0.add_stream(p)
3725         self.pg_enable_capture(self.pg_interfaces)
3726         self.pg_start()
3727         p = self.pg0.get_capture(1)
3728
3729         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3730              IPv6(src=client.ip6, dst=server_nat_ip6, nh=IP_PROTOS.gre) /
3731              GRE() /
3732              IP(src=self.pg2.local_ip4, dst=self.pg2.remote_ip4) /
3733              TCP(sport=1234, dport=1234))
3734         self.pg0.add_stream(p)
3735         self.pg_enable_capture(self.pg_interfaces)
3736         self.pg_start()
3737         p = self.pg0.get_capture(1)
3738         packet = p[0]
3739         try:
3740             self.assertEqual(packet[IPv6].src, client_nat_ip6)
3741             self.assertEqual(packet[IPv6].dst, server.ip6)
3742             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
3743         except:
3744             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3745             raise
3746
3747         # server to client
3748         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3749              IPv6(src=server.ip6, dst=client_nat_ip6, nh=IP_PROTOS.gre) /
3750              GRE() /
3751              IP(src=self.pg2.remote_ip4, dst=self.pg2.local_ip4) /
3752              TCP(sport=1234, dport=1234))
3753         self.pg0.add_stream(p)
3754         self.pg_enable_capture(self.pg_interfaces)
3755         self.pg_start()
3756         p = self.pg0.get_capture(1)
3757         packet = p[0]
3758         try:
3759             self.assertEqual(packet[IPv6].src, server_nat_ip6)
3760             self.assertEqual(packet[IPv6].dst, client.ip6)
3761             self.assertEqual(packet[IPv6].nh, IP_PROTOS.gre)
3762         except:
3763             self.logger.error(ppp("Unexpected or invalid packet:", packet))
3764             raise
3765
3766     def nat64_get_ses_num(self):
3767         """
3768         Return number of active NAT64 sessions.
3769         """
3770         st = self.vapi.nat64_st_dump()
3771         return len(st)
3772
3773     def clear_nat64(self):
3774         """
3775         Clear NAT64 configuration.
3776         """
3777         self.vapi.nat64_set_timeouts()
3778
3779         interfaces = self.vapi.nat64_interface_dump()
3780         for intf in interfaces:
3781             self.vapi.nat64_add_del_interface(intf.sw_if_index,
3782                                               intf.is_inside,
3783                                               is_add=0)
3784
3785         bib = self.vapi.nat64_bib_dump(IP_PROTOS.tcp)
3786         for bibe in bib:
3787             if bibe.is_static:
3788                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3789                                                    bibe.o_addr,
3790                                                    bibe.i_port,
3791                                                    bibe.o_port,
3792                                                    bibe.proto,
3793                                                    bibe.vrf_id,
3794                                                    is_add=0)
3795
3796         bib = self.vapi.nat64_bib_dump(IP_PROTOS.udp)
3797         for bibe in bib:
3798             if bibe.is_static:
3799                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3800                                                    bibe.o_addr,
3801                                                    bibe.i_port,
3802                                                    bibe.o_port,
3803                                                    bibe.proto,
3804                                                    bibe.vrf_id,
3805                                                    is_add=0)
3806
3807         bib = self.vapi.nat64_bib_dump(IP_PROTOS.icmp)
3808         for bibe in bib:
3809             if bibe.is_static:
3810                 self.vapi.nat64_add_del_static_bib(bibe.i_addr,
3811                                                    bibe.o_addr,
3812                                                    bibe.i_port,
3813                                                    bibe.o_port,
3814                                                    bibe.proto,
3815                                                    bibe.vrf_id,
3816                                                    is_add=0)
3817
3818         adresses = self.vapi.nat64_pool_addr_dump()
3819         for addr in adresses:
3820             self.vapi.nat64_add_del_pool_addr_range(addr.address,
3821                                                     addr.address,
3822                                                     vrf_id=addr.vrf_id,
3823                                                     is_add=0)
3824
3825         prefixes = self.vapi.nat64_prefix_dump()
3826         for prefix in prefixes:
3827             self.vapi.nat64_add_del_prefix(prefix.prefix,
3828                                            prefix.prefix_len,
3829                                            vrf_id=prefix.vrf_id,
3830                                            is_add=0)
3831
3832     def tearDown(self):
3833         super(TestNAT64, self).tearDown()
3834         if not self.vpp_dead:
3835             self.logger.info(self.vapi.cli("show nat64 pool"))
3836             self.logger.info(self.vapi.cli("show nat64 interfaces"))
3837             self.logger.info(self.vapi.cli("show nat64 prefix"))
3838             self.logger.info(self.vapi.cli("show nat64 bib all"))
3839             self.logger.info(self.vapi.cli("show nat64 session table all"))
3840             self.clear_nat64()
3841
3842 if __name__ == '__main__':
3843     unittest.main(testRunner=VppTestRunner)