16a3376e9b268d8a3291ecc0ea4e10ac21b476e6
[vpp.git] / src / plugins / nat / test / test_nat44_ei.py
1 #!/usr/bin/env python3
2
3 import ipaddress
4 import random
5 import socket
6 import struct
7 import unittest
8 from io import BytesIO
9 from time import sleep
10
11 import scapy.compat
12 from framework import VppTestCase, VppTestRunner
13 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
14 from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \
15     IPField, IntField, LongField, XByteField, FlagsField, FieldLenField, \
16     PacketListField
17 from scapy.data import IP_PROTOS
18 from scapy.layers.inet import IP, TCP, UDP, ICMP
19 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
20 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
21 from scapy.layers.l2 import Ether, ARP, GRE
22 from scapy.packet import Raw
23 from syslog_rfc5424_parser import SyslogMessage, ParseError
24 from syslog_rfc5424_parser.constants import SyslogSeverity
25 from util import ppp
26 from vpp_ip_route import VppIpRoute, VppRoutePath
27 from vpp_neighbor import VppNeighbor
28 from vpp_papi import VppEnum
29
30
31 # NAT HA protocol event data
32 class Event(Packet):
33     name = "Event"
34     fields_desc = [ByteEnumField("event_type", None,
35                                  {1: "add", 2: "del", 3: "refresh"}),
36                    ByteEnumField("protocol", None,
37                                  {0: "other", 1: "udp", 2: "tcp", 3: "icmp"}),
38                    ShortField("flags", 0),
39                    IPField("in_addr", None),
40                    IPField("out_addr", None),
41                    ShortField("in_port", None),
42                    ShortField("out_port", None),
43                    IPField("eh_addr", None),
44                    IPField("ehn_addr", None),
45                    ShortField("eh_port", None),
46                    ShortField("ehn_port", None),
47                    IntField("fib_index", None),
48                    IntField("total_pkts", 0),
49                    LongField("total_bytes", 0)]
50
51     def extract_padding(self, s):
52         return "", s
53
54
55 # NAT HA protocol header
56 class HANATStateSync(Packet):
57     name = "HA NAT state sync"
58     fields_desc = [XByteField("version", 1),
59                    FlagsField("flags", 0, 8, ['ACK']),
60                    FieldLenField("count", None, count_of="events"),
61                    IntField("sequence_number", 1),
62                    IntField("thread_index", 0),
63                    PacketListField("events", [], Event,
64                                    count_from=lambda pkt: pkt.count)]
65
66
67 class MethodHolder(VppTestCase):
68     """ NAT create capture and verify method holder """
69
70     @property
71     def config_flags(self):
72         return VppEnum.vl_api_nat_config_flags_t
73
74     @property
75     def nat44_config_flags(self):
76         return VppEnum.vl_api_nat44_config_flags_t
77
78     @property
79     def SYSLOG_SEVERITY(self):
80         return VppEnum.vl_api_syslog_severity_t
81
82     def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
83                                  local_port=0, external_port=0, vrf_id=0,
84                                  is_add=1, external_sw_if_index=0xFFFFFFFF,
85                                  proto=0, tag="", flags=0):
86         """
87         Add/delete NAT44EI static mapping
88
89         :param local_ip: Local IP address
90         :param external_ip: External IP address
91         :param local_port: Local port number (Optional)
92         :param external_port: External port number (Optional)
93         :param vrf_id: VRF ID (Default 0)
94         :param is_add: 1 if add, 0 if delete (Default add)
95         :param external_sw_if_index: External interface instead of IP address
96         :param proto: IP protocol (Mandatory if port specified)
97         :param tag: Opaque string tag
98         :param flags: NAT configuration flags
99         """
100
101         if not (local_port and external_port):
102             flags |= self.config_flags.NAT_IS_ADDR_ONLY
103
104         self.vapi.nat44_add_del_static_mapping(
105             is_add=is_add,
106             local_ip_address=local_ip,
107             external_ip_address=external_ip,
108             external_sw_if_index=external_sw_if_index,
109             local_port=local_port,
110             external_port=external_port,
111             vrf_id=vrf_id, protocol=proto,
112             flags=flags,
113             tag=tag)
114
115     def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF, twice_nat=0):
116         """
117         Add/delete NAT44EI address
118
119         :param ip: IP address
120         :param is_add: 1 if add, 0 if delete (Default add)
121         :param twice_nat: twice NAT address for external hosts
122         """
123         flags = self.config_flags.NAT_IS_TWICE_NAT if twice_nat else 0
124         self.vapi.nat44_add_del_address_range(first_ip_address=ip,
125                                               last_ip_address=ip,
126                                               vrf_id=vrf_id,
127                                               is_add=is_add,
128                                               flags=flags)
129
130     def create_routes_and_neigbors(self):
131         r1 = VppIpRoute(self, self.pg7.remote_ip4, 32,
132                         [VppRoutePath(self.pg7.remote_ip4,
133                                       self.pg7.sw_if_index)])
134         r2 = VppIpRoute(self, self.pg8.remote_ip4, 32,
135                         [VppRoutePath(self.pg8.remote_ip4,
136                                       self.pg8.sw_if_index)])
137         r1.add_vpp_config()
138         r2.add_vpp_config()
139
140         n1 = VppNeighbor(self,
141                          self.pg7.sw_if_index,
142                          self.pg7.remote_mac,
143                          self.pg7.remote_ip4,
144                          is_static=1)
145         n2 = VppNeighbor(self,
146                          self.pg8.sw_if_index,
147                          self.pg8.remote_mac,
148                          self.pg8.remote_ip4,
149                          is_static=1)
150         n1.add_vpp_config()
151         n2.add_vpp_config()
152
153     def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
154         """
155         Create packet stream for inside network
156
157         :param in_if: Inside interface
158         :param out_if: Outside interface
159         :param dst_ip: Destination address
160         :param ttl: TTL of generated packets
161         """
162         if dst_ip is None:
163             dst_ip = out_if.remote_ip4
164
165         pkts = []
166         # TCP
167         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
168              IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
169              TCP(sport=self.tcp_port_in, dport=20))
170         pkts.extend([p, p])
171
172         # UDP
173         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
174              IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
175              UDP(sport=self.udp_port_in, dport=20))
176         pkts.append(p)
177
178         # ICMP
179         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
180              IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
181              ICMP(id=self.icmp_id_in, type='echo-request'))
182         pkts.append(p)
183
184         return pkts
185
186     def compose_ip6(self, ip4, pref, plen):
187         """
188         Compose IPv4-embedded IPv6 addresses
189
190         :param ip4: IPv4 address
191         :param pref: IPv6 prefix
192         :param plen: IPv6 prefix length
193         :returns: IPv4-embedded IPv6 addresses
194         """
195         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
196         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
197         if plen == 32:
198             pref_n[4] = ip4_n[0]
199             pref_n[5] = ip4_n[1]
200             pref_n[6] = ip4_n[2]
201             pref_n[7] = ip4_n[3]
202         elif plen == 40:
203             pref_n[5] = ip4_n[0]
204             pref_n[6] = ip4_n[1]
205             pref_n[7] = ip4_n[2]
206             pref_n[9] = ip4_n[3]
207         elif plen == 48:
208             pref_n[6] = ip4_n[0]
209             pref_n[7] = ip4_n[1]
210             pref_n[9] = ip4_n[2]
211             pref_n[10] = ip4_n[3]
212         elif plen == 56:
213             pref_n[7] = ip4_n[0]
214             pref_n[9] = ip4_n[1]
215             pref_n[10] = ip4_n[2]
216             pref_n[11] = ip4_n[3]
217         elif plen == 64:
218             pref_n[9] = ip4_n[0]
219             pref_n[10] = ip4_n[1]
220             pref_n[11] = ip4_n[2]
221             pref_n[12] = ip4_n[3]
222         elif plen == 96:
223             pref_n[12] = ip4_n[0]
224             pref_n[13] = ip4_n[1]
225             pref_n[14] = ip4_n[2]
226             pref_n[15] = ip4_n[3]
227         packed_pref_n = b''.join([scapy.compat.chb(x) for x in pref_n])
228         return socket.inet_ntop(socket.AF_INET6, packed_pref_n)
229
230     def create_stream_out(self, out_if, dst_ip=None, ttl=64,
231                           use_inside_ports=False):
232         """
233         Create packet stream for outside network
234
235         :param out_if: Outside interface
236         :param dst_ip: Destination IP address (Default use global NAT address)
237         :param ttl: TTL of generated packets
238         :param use_inside_ports: Use inside NAT ports as destination ports
239                instead of outside ports
240         """
241         if dst_ip is None:
242             dst_ip = self.nat_addr
243         if not use_inside_ports:
244             tcp_port = self.tcp_port_out
245             udp_port = self.udp_port_out
246             icmp_id = self.icmp_id_out
247         else:
248             tcp_port = self.tcp_port_in
249             udp_port = self.udp_port_in
250             icmp_id = self.icmp_id_in
251         pkts = []
252         # TCP
253         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
254              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
255              TCP(dport=tcp_port, sport=20))
256         pkts.extend([p, p])
257
258         # UDP
259         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
260              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
261              UDP(dport=udp_port, sport=20))
262         pkts.append(p)
263
264         # ICMP
265         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
266              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
267              ICMP(id=icmp_id, type='echo-reply'))
268         pkts.append(p)
269
270         return pkts
271
272     def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
273         """
274         Create packet stream for outside network
275
276         :param out_if: Outside interface
277         :param dst_ip: Destination IP address (Default use global NAT address)
278         :param hl: HL of generated packets
279         """
280         pkts = []
281         # TCP
282         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
283              IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
284              TCP(dport=self.tcp_port_out, sport=20))
285         pkts.append(p)
286
287         # UDP
288         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
289              IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
290              UDP(dport=self.udp_port_out, sport=20))
291         pkts.append(p)
292
293         # ICMP
294         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
295              IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
296              ICMPv6EchoReply(id=self.icmp_id_out))
297         pkts.append(p)
298
299         return pkts
300
301     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
302                            dst_ip=None, is_ip6=False, ignore_port=False):
303         """
304         Verify captured packets on outside network
305
306         :param capture: Captured packets
307         :param nat_ip: Translated IP address (Default use global NAT address)
308         :param same_port: Source port number is not translated (Default False)
309         :param dst_ip: Destination IP address (Default do not verify)
310         :param is_ip6: If L3 protocol is IPv6 (Default False)
311         """
312         if is_ip6:
313             IP46 = IPv6
314             ICMP46 = ICMPv6EchoRequest
315         else:
316             IP46 = IP
317             ICMP46 = ICMP
318         if nat_ip is None:
319             nat_ip = self.nat_addr
320         for packet in capture:
321             try:
322                 if not is_ip6:
323                     self.assert_packet_checksums_valid(packet)
324                 self.assertEqual(packet[IP46].src, nat_ip)
325                 if dst_ip is not None:
326                     self.assertEqual(packet[IP46].dst, dst_ip)
327                 if packet.haslayer(TCP):
328                     if not ignore_port:
329                         if same_port:
330                             self.assertEqual(
331                                 packet[TCP].sport, self.tcp_port_in)
332                         else:
333                             self.assertNotEqual(
334                                 packet[TCP].sport, self.tcp_port_in)
335                     self.tcp_port_out = packet[TCP].sport
336                     self.assert_packet_checksums_valid(packet)
337                 elif packet.haslayer(UDP):
338                     if not ignore_port:
339                         if same_port:
340                             self.assertEqual(
341                                 packet[UDP].sport, self.udp_port_in)
342                         else:
343                             self.assertNotEqual(
344                                 packet[UDP].sport, self.udp_port_in)
345                     self.udp_port_out = packet[UDP].sport
346                 else:
347                     if not ignore_port:
348                         if same_port:
349                             self.assertEqual(
350                                 packet[ICMP46].id, self.icmp_id_in)
351                         else:
352                             self.assertNotEqual(
353                                 packet[ICMP46].id, self.icmp_id_in)
354                     self.icmp_id_out = packet[ICMP46].id
355                     self.assert_packet_checksums_valid(packet)
356             except:
357                 self.logger.error(ppp("Unexpected or invalid packet "
358                                       "(outside network):", packet))
359                 raise
360
361     def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
362                                dst_ip=None):
363         """
364         Verify captured packets on outside network
365
366         :param capture: Captured packets
367         :param nat_ip: Translated IP address
368         :param same_port: Source port number is not translated (Default False)
369         :param dst_ip: Destination IP address (Default do not verify)
370         """
371         return self.verify_capture_out(capture, nat_ip, same_port, dst_ip,
372                                        True)
373
374     def verify_capture_in(self, capture, in_if):
375         """
376         Verify captured packets on inside network
377
378         :param capture: Captured packets
379         :param in_if: Inside interface
380         """
381         for packet in capture:
382             try:
383                 self.assert_packet_checksums_valid(packet)
384                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
385                 if packet.haslayer(TCP):
386                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
387                 elif packet.haslayer(UDP):
388                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
389                 else:
390                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
391             except:
392                 self.logger.error(ppp("Unexpected or invalid packet "
393                                       "(inside network):", packet))
394                 raise
395
396     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
397         """
398         Verify captured packet that don't have to be translated
399
400         :param capture: Captured packets
401         :param ingress_if: Ingress interface
402         :param egress_if: Egress interface
403         """
404         for packet in capture:
405             try:
406                 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
407                 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
408                 if packet.haslayer(TCP):
409                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
410                 elif packet.haslayer(UDP):
411                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
412                 else:
413                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
414             except:
415                 self.logger.error(ppp("Unexpected or invalid packet "
416                                       "(inside network):", packet))
417                 raise
418
419     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
420                                             icmp_type=11):
421         """
422         Verify captured packets with ICMP errors on outside network
423
424         :param capture: Captured packets
425         :param src_ip: Translated IP address or IP address of VPP
426                        (Default use global NAT address)
427         :param icmp_type: Type of error ICMP packet
428                           we are expecting (Default 11)
429         """
430         if src_ip is None:
431             src_ip = self.nat_addr
432         for packet in capture:
433             try:
434                 self.assertEqual(packet[IP].src, src_ip)
435                 self.assertEqual(packet.haslayer(ICMP), 1)
436                 icmp = packet[ICMP]
437                 self.assertEqual(icmp.type, icmp_type)
438                 self.assertTrue(icmp.haslayer(IPerror))
439                 inner_ip = icmp[IPerror]
440                 if inner_ip.haslayer(TCPerror):
441                     self.assertEqual(inner_ip[TCPerror].dport,
442                                      self.tcp_port_out)
443                 elif inner_ip.haslayer(UDPerror):
444                     self.assertEqual(inner_ip[UDPerror].dport,
445                                      self.udp_port_out)
446                 else:
447                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
448             except:
449                 self.logger.error(ppp("Unexpected or invalid packet "
450                                       "(outside network):", packet))
451                 raise
452
453     def verify_capture_in_with_icmp_errors(self, capture, in_if, icmp_type=11):
454         """
455         Verify captured packets with ICMP errors on inside network
456
457         :param capture: Captured packets
458         :param in_if: Inside interface
459         :param icmp_type: Type of error ICMP packet
460                           we are expecting (Default 11)
461         """
462         for packet in capture:
463             try:
464                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
465                 self.assertEqual(packet.haslayer(ICMP), 1)
466                 icmp = packet[ICMP]
467                 self.assertEqual(icmp.type, icmp_type)
468                 self.assertTrue(icmp.haslayer(IPerror))
469                 inner_ip = icmp[IPerror]
470                 if inner_ip.haslayer(TCPerror):
471                     self.assertEqual(inner_ip[TCPerror].sport,
472                                      self.tcp_port_in)
473                 elif inner_ip.haslayer(UDPerror):
474                     self.assertEqual(inner_ip[UDPerror].sport,
475                                      self.udp_port_in)
476                 else:
477                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
478             except:
479                 self.logger.error(ppp("Unexpected or invalid packet "
480                                       "(inside network):", packet))
481                 raise
482
483     def create_stream_frag(self, src_if, dst, sport, dport, data,
484                            proto=IP_PROTOS.tcp, echo_reply=False):
485         """
486         Create fragmented packet stream
487
488         :param src_if: Source interface
489         :param dst: Destination IPv4 address
490         :param sport: Source port
491         :param dport: Destination port
492         :param data: Payload data
493         :param proto: protocol (TCP, UDP, ICMP)
494         :param echo_reply: use echo_reply if protocol is ICMP
495         :returns: Fragments
496         """
497         if proto == IP_PROTOS.tcp:
498             p = (IP(src=src_if.remote_ip4, dst=dst) /
499                  TCP(sport=sport, dport=dport) /
500                  Raw(data))
501             p = p.__class__(scapy.compat.raw(p))
502             chksum = p[TCP].chksum
503             proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
504         elif proto == IP_PROTOS.udp:
505             proto_header = UDP(sport=sport, dport=dport)
506         elif proto == IP_PROTOS.icmp:
507             if not echo_reply:
508                 proto_header = ICMP(id=sport, type='echo-request')
509             else:
510                 proto_header = ICMP(id=sport, type='echo-reply')
511         else:
512             raise Exception("Unsupported protocol")
513         id = random.randint(0, 65535)
514         pkts = []
515         if proto == IP_PROTOS.tcp:
516             raw = Raw(data[0:4])
517         else:
518             raw = Raw(data[0:16])
519         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
520              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
521              proto_header /
522              raw)
523         pkts.append(p)
524         if proto == IP_PROTOS.tcp:
525             raw = Raw(data[4:20])
526         else:
527             raw = Raw(data[16:32])
528         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
529              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
530                 proto=proto) /
531              raw)
532         pkts.append(p)
533         if proto == IP_PROTOS.tcp:
534             raw = Raw(data[20:])
535         else:
536             raw = Raw(data[32:])
537         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
538              IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto,
539                 id=id) /
540              raw)
541         pkts.append(p)
542         return pkts
543
544     def reass_frags_and_verify(self, frags, src, dst):
545         """
546         Reassemble and verify fragmented packet
547
548         :param frags: Captured fragments
549         :param src: Source IPv4 address to verify
550         :param dst: Destination IPv4 address to verify
551
552         :returns: Reassembled IPv4 packet
553         """
554         buffer = BytesIO()
555         for p in frags:
556             self.assertEqual(p[IP].src, src)
557             self.assertEqual(p[IP].dst, dst)
558             self.assert_ip_checksum_valid(p)
559             buffer.seek(p[IP].frag * 8)
560             buffer.write(bytes(p[IP].payload))
561         ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
562                 proto=frags[0][IP].proto)
563         if ip.proto == IP_PROTOS.tcp:
564             p = (ip / TCP(buffer.getvalue()))
565             self.logger.debug(ppp("Reassembled:", p))
566             self.assert_tcp_checksum_valid(p)
567         elif ip.proto == IP_PROTOS.udp:
568             p = (ip / UDP(buffer.getvalue()[:8]) /
569                  Raw(buffer.getvalue()[8:]))
570         elif ip.proto == IP_PROTOS.icmp:
571             p = (ip / ICMP(buffer.getvalue()))
572         return p
573
574     def verify_ipfix_nat44_ses(self, data):
575         """
576         Verify IPFIX NAT44EI session create/delete event
577
578         :param data: Decoded IPFIX data records
579         """
580         nat44_ses_create_num = 0
581         nat44_ses_delete_num = 0
582         self.assertEqual(6, len(data))
583         for record in data:
584             # natEvent
585             self.assertIn(scapy.compat.orb(record[230]), [4, 5])
586             if scapy.compat.orb(record[230]) == 4:
587                 nat44_ses_create_num += 1
588             else:
589                 nat44_ses_delete_num += 1
590             # sourceIPv4Address
591             self.assertEqual(self.pg0.remote_ip4,
592                              str(ipaddress.IPv4Address(record[8])))
593             # postNATSourceIPv4Address
594             self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
595                              record[225])
596             # ingressVRFID
597             self.assertEqual(struct.pack("!I", 0), record[234])
598             # protocolIdentifier/sourceTransportPort
599             # /postNAPTSourceTransportPort
600             if IP_PROTOS.icmp == scapy.compat.orb(record[4]):
601                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
602                 self.assertEqual(struct.pack("!H", self.icmp_id_out),
603                                  record[227])
604             elif IP_PROTOS.tcp == scapy.compat.orb(record[4]):
605                 self.assertEqual(struct.pack("!H", self.tcp_port_in),
606                                  record[7])
607                 self.assertEqual(struct.pack("!H", self.tcp_port_out),
608                                  record[227])
609             elif IP_PROTOS.udp == scapy.compat.orb(record[4]):
610                 self.assertEqual(struct.pack("!H", self.udp_port_in),
611                                  record[7])
612                 self.assertEqual(struct.pack("!H", self.udp_port_out),
613                                  record[227])
614             else:
615                 self.fail("Invalid protocol")
616         self.assertEqual(3, nat44_ses_create_num)
617         self.assertEqual(3, nat44_ses_delete_num)
618
619     def verify_ipfix_addr_exhausted(self, data):
620         self.assertEqual(1, len(data))
621         record = data[0]
622         # natEvent
623         self.assertEqual(scapy.compat.orb(record[230]), 3)
624         # natPoolID
625         self.assertEqual(struct.pack("!I", 0), record[283])
626
627     def verify_ipfix_max_sessions(self, data, limit):
628         self.assertEqual(1, len(data))
629         record = data[0]
630         # natEvent
631         self.assertEqual(scapy.compat.orb(record[230]), 13)
632         # natQuotaExceededEvent
633         self.assertEqual(struct.pack("I", 1), record[466])
634         # maxSessionEntries
635         self.assertEqual(struct.pack("I", limit), record[471])
636
637     def verify_no_nat44_user(self):
638         """ Verify that there is no NAT44EI user """
639         users = self.vapi.nat44_user_dump()
640         self.assertEqual(len(users), 0)
641         users = self.statistics.get_counter('/nat44/total-users')
642         self.assertEqual(users[0][0], 0)
643         sessions = self.statistics.get_counter('/nat44/total-sessions')
644         self.assertEqual(sessions[0][0], 0)
645
646     def verify_syslog_apmap(self, data, is_add=True):
647         message = data.decode('utf-8')
648         try:
649             message = SyslogMessage.parse(message)
650         except ParseError as e:
651             self.logger.error(e)
652             raise
653         else:
654             self.assertEqual(message.severity, SyslogSeverity.info)
655             self.assertEqual(message.appname, 'NAT')
656             self.assertEqual(message.msgid, 'APMADD' if is_add else 'APMDEL')
657             sd_params = message.sd.get('napmap')
658             self.assertTrue(sd_params is not None)
659             self.assertEqual(sd_params.get('IATYP'), 'IPv4')
660             self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip4)
661             self.assertEqual(sd_params.get('ISPORT'), "%d" % self.tcp_port_in)
662             self.assertEqual(sd_params.get('XATYP'), 'IPv4')
663             self.assertEqual(sd_params.get('XSADDR'), self.nat_addr)
664             self.assertEqual(sd_params.get('XSPORT'), "%d" % self.tcp_port_out)
665             self.assertEqual(sd_params.get('PROTO'), "%d" % IP_PROTOS.tcp)
666             self.assertTrue(sd_params.get('SSUBIX') is not None)
667             self.assertEqual(sd_params.get('SVLAN'), '0')
668
669     def verify_mss_value(self, pkt, mss):
670         if not pkt.haslayer(IP) or not pkt.haslayer(TCP):
671             raise TypeError("Not a TCP/IP packet")
672
673         for option in pkt[TCP].options:
674             if option[0] == 'MSS':
675                 self.assertEqual(option[1], mss)
676                 self.assert_tcp_checksum_valid(pkt)
677
678     @staticmethod
679     def proto2layer(proto):
680         if proto == IP_PROTOS.tcp:
681             return TCP
682         elif proto == IP_PROTOS.udp:
683             return UDP
684         elif proto == IP_PROTOS.icmp:
685             return ICMP
686         else:
687             raise Exception("Unsupported protocol")
688
689     def frag_in_order(self, proto=IP_PROTOS.tcp, dont_translate=False,
690                       ignore_port=False):
691         layer = self.proto2layer(proto)
692
693         if proto == IP_PROTOS.tcp:
694             data = b"A" * 4 + b"B" * 16 + b"C" * 3
695         else:
696             data = b"A" * 16 + b"B" * 16 + b"C" * 3
697         self.port_in = random.randint(1025, 65535)
698
699         # in2out
700         pkts = self.create_stream_frag(self.pg0, self.pg1.remote_ip4,
701                                        self.port_in, 20, data, proto)
702         self.pg0.add_stream(pkts)
703         self.pg_enable_capture(self.pg_interfaces)
704         self.pg_start()
705         frags = self.pg1.get_capture(len(pkts))
706         if not dont_translate:
707             p = self.reass_frags_and_verify(frags,
708                                             self.nat_addr,
709                                             self.pg1.remote_ip4)
710         else:
711             p = self.reass_frags_and_verify(frags,
712                                             self.pg0.remote_ip4,
713                                             self.pg1.remote_ip4)
714         if proto != IP_PROTOS.icmp:
715             if not dont_translate:
716                 self.assertEqual(p[layer].dport, 20)
717                 if not ignore_port:
718                     self.assertNotEqual(p[layer].sport, self.port_in)
719             else:
720                 self.assertEqual(p[layer].sport, self.port_in)
721         else:
722             if not ignore_port:
723                 if not dont_translate:
724                     self.assertNotEqual(p[layer].id, self.port_in)
725                 else:
726                     self.assertEqual(p[layer].id, self.port_in)
727         self.assertEqual(data, p[Raw].load)
728
729         # out2in
730         if not dont_translate:
731             dst_addr = self.nat_addr
732         else:
733             dst_addr = self.pg0.remote_ip4
734         if proto != IP_PROTOS.icmp:
735             sport = 20
736             dport = p[layer].sport
737         else:
738             sport = p[layer].id
739             dport = 0
740         pkts = self.create_stream_frag(self.pg1, dst_addr, sport, dport, data,
741                                        proto, echo_reply=True)
742         self.pg1.add_stream(pkts)
743         self.pg_enable_capture(self.pg_interfaces)
744         self.pg_start()
745         frags = self.pg0.get_capture(len(pkts))
746         p = self.reass_frags_and_verify(frags,
747                                         self.pg1.remote_ip4,
748                                         self.pg0.remote_ip4)
749         if proto != IP_PROTOS.icmp:
750             self.assertEqual(p[layer].sport, 20)
751             self.assertEqual(p[layer].dport, self.port_in)
752         else:
753             self.assertEqual(p[layer].id, self.port_in)
754         self.assertEqual(data, p[Raw].load)
755
756     def reass_hairpinning(self, server_addr, server_in_port, server_out_port,
757                           host_in_port, proto=IP_PROTOS.tcp,
758                           ignore_port=False):
759
760         layer = self.proto2layer(proto)
761
762         if proto == IP_PROTOS.tcp:
763             data = b"A" * 4 + b"B" * 16 + b"C" * 3
764         else:
765             data = b"A" * 16 + b"B" * 16 + b"C" * 3
766
767         # send packet from host to server
768         pkts = self.create_stream_frag(self.pg0,
769                                        self.nat_addr,
770                                        host_in_port,
771                                        server_out_port,
772                                        data,
773                                        proto)
774         self.pg0.add_stream(pkts)
775         self.pg_enable_capture(self.pg_interfaces)
776         self.pg_start()
777         frags = self.pg0.get_capture(len(pkts))
778         p = self.reass_frags_and_verify(frags,
779                                         self.nat_addr,
780                                         server_addr)
781         if proto != IP_PROTOS.icmp:
782             if not ignore_port:
783                 self.assertNotEqual(p[layer].sport, host_in_port)
784             self.assertEqual(p[layer].dport, server_in_port)
785         else:
786             if not ignore_port:
787                 self.assertNotEqual(p[layer].id, host_in_port)
788         self.assertEqual(data, p[Raw].load)
789
790     def frag_out_of_order(self, proto=IP_PROTOS.tcp, dont_translate=False,
791                           ignore_port=False):
792         layer = self.proto2layer(proto)
793
794         if proto == IP_PROTOS.tcp:
795             data = b"A" * 4 + b"B" * 16 + b"C" * 3
796         else:
797             data = b"A" * 16 + b"B" * 16 + b"C" * 3
798         self.port_in = random.randint(1025, 65535)
799
800         for i in range(2):
801             # in2out
802             pkts = self.create_stream_frag(self.pg0, self.pg1.remote_ip4,
803                                            self.port_in, 20, data, proto)
804             pkts.reverse()
805             self.pg0.add_stream(pkts)
806             self.pg_enable_capture(self.pg_interfaces)
807             self.pg_start()
808             frags = self.pg1.get_capture(len(pkts))
809             if not dont_translate:
810                 p = self.reass_frags_and_verify(frags,
811                                                 self.nat_addr,
812                                                 self.pg1.remote_ip4)
813             else:
814                 p = self.reass_frags_and_verify(frags,
815                                                 self.pg0.remote_ip4,
816                                                 self.pg1.remote_ip4)
817             if proto != IP_PROTOS.icmp:
818                 if not dont_translate:
819                     self.assertEqual(p[layer].dport, 20)
820                     if not ignore_port:
821                         self.assertNotEqual(p[layer].sport, self.port_in)
822                 else:
823                     self.assertEqual(p[layer].sport, self.port_in)
824             else:
825                 if not ignore_port:
826                     if not dont_translate:
827                         self.assertNotEqual(p[layer].id, self.port_in)
828                     else:
829                         self.assertEqual(p[layer].id, self.port_in)
830             self.assertEqual(data, p[Raw].load)
831
832             # out2in
833             if not dont_translate:
834                 dst_addr = self.nat_addr
835             else:
836                 dst_addr = self.pg0.remote_ip4
837             if proto != IP_PROTOS.icmp:
838                 sport = 20
839                 dport = p[layer].sport
840             else:
841                 sport = p[layer].id
842                 dport = 0
843             pkts = self.create_stream_frag(self.pg1, dst_addr, sport, dport,
844                                            data, proto, echo_reply=True)
845             pkts.reverse()
846             self.pg1.add_stream(pkts)
847             self.pg_enable_capture(self.pg_interfaces)
848             self.pg_start()
849             frags = self.pg0.get_capture(len(pkts))
850             p = self.reass_frags_and_verify(frags,
851                                             self.pg1.remote_ip4,
852                                             self.pg0.remote_ip4)
853             if proto != IP_PROTOS.icmp:
854                 self.assertEqual(p[layer].sport, 20)
855                 self.assertEqual(p[layer].dport, self.port_in)
856             else:
857                 self.assertEqual(p[layer].id, self.port_in)
858             self.assertEqual(data, p[Raw].load)
859
860
861 class TestNAT44EI(MethodHolder):
862     """ NAT44EI Test Cases """
863
864     max_translations = 10240
865     max_users = 10240
866
867     @classmethod
868     def setUpClass(cls):
869         super(TestNAT44EI, cls).setUpClass()
870         cls.vapi.cli("set log class nat level debug")
871
872         cls.tcp_port_in = 6303
873         cls.tcp_port_out = 6303
874         cls.udp_port_in = 6304
875         cls.udp_port_out = 6304
876         cls.icmp_id_in = 6305
877         cls.icmp_id_out = 6305
878         cls.nat_addr = '10.0.0.3'
879         cls.ipfix_src_port = 4739
880         cls.ipfix_domain_id = 1
881         cls.tcp_external_port = 80
882         cls.udp_external_port = 69
883
884         cls.create_pg_interfaces(range(10))
885         cls.interfaces = list(cls.pg_interfaces[0:4])
886
887         for i in cls.interfaces:
888             i.admin_up()
889             i.config_ip4()
890             i.resolve_arp()
891
892         cls.pg0.generate_remote_hosts(3)
893         cls.pg0.configure_ipv4_neighbors()
894
895         cls.pg1.generate_remote_hosts(1)
896         cls.pg1.configure_ipv4_neighbors()
897
898         cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
899         cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 10})
900         cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 20})
901
902         cls.pg4._local_ip4 = "172.16.255.1"
903         cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
904         cls.pg4.set_table_ip4(10)
905         cls.pg5._local_ip4 = "172.17.255.3"
906         cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
907         cls.pg5.set_table_ip4(10)
908         cls.pg6._local_ip4 = "172.16.255.1"
909         cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
910         cls.pg6.set_table_ip4(20)
911         for i in cls.overlapping_interfaces:
912             i.config_ip4()
913             i.admin_up()
914             i.resolve_arp()
915
916         cls.pg7.admin_up()
917         cls.pg8.admin_up()
918
919         cls.pg9.generate_remote_hosts(2)
920         cls.pg9.config_ip4()
921         cls.vapi.sw_interface_add_del_address(
922             sw_if_index=cls.pg9.sw_if_index,
923             prefix="10.0.0.1/24")
924
925         cls.pg9.admin_up()
926         cls.pg9.resolve_arp()
927         cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
928         cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
929         cls.pg9.resolve_arp()
930
931     def setUp(self):
932         super(TestNAT44EI, self).setUp()
933         self.vapi.nat44_plugin_enable_disable(
934             sessions=self.max_translations,
935             users=self.max_users, enable=1)
936
937     def tearDown(self):
938         super(TestNAT44EI, self).tearDown()
939         if not self.vpp_dead:
940             self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
941                                                src_port=self.ipfix_src_port,
942                                                enable=0)
943             self.ipfix_src_port = 4739
944             self.ipfix_domain_id = 1
945
946             self.vapi.nat44_plugin_enable_disable(enable=0)
947             self.vapi.cli("clear logging")
948
949     def test_clear_sessions(self):
950         """ NAT44EI session clearing test """
951
952         self.nat44_add_address(self.nat_addr)
953         flags = self.config_flags.NAT_IS_INSIDE
954         self.vapi.nat44_interface_add_del_feature(
955             sw_if_index=self.pg0.sw_if_index,
956             flags=flags, is_add=1)
957         self.vapi.nat44_interface_add_del_feature(
958             sw_if_index=self.pg1.sw_if_index,
959             is_add=1)
960
961         nat_config = self.vapi.nat_show_config()
962         self.assertEqual(0, nat_config.endpoint_dependent)
963
964         pkts = self.create_stream_in(self.pg0, self.pg1)
965         self.pg0.add_stream(pkts)
966         self.pg_enable_capture(self.pg_interfaces)
967         self.pg_start()
968         capture = self.pg1.get_capture(len(pkts))
969         self.verify_capture_out(capture)
970
971         sessions = self.statistics.get_counter('/nat44/total-sessions')
972         self.assertTrue(sessions[0][0] > 0)
973         self.logger.info("sessions before clearing: %s" % sessions[0][0])
974
975         self.vapi.cli("clear nat44 sessions")
976
977         sessions = self.statistics.get_counter('/nat44/total-sessions')
978         self.assertEqual(sessions[0][0], 0)
979         self.logger.info("sessions after clearing: %s" % sessions[0][0])
980
981     def test_dynamic(self):
982         """ NAT44EI dynamic translation test """
983         self.nat44_add_address(self.nat_addr)
984         flags = self.config_flags.NAT_IS_INSIDE
985         self.vapi.nat44_interface_add_del_feature(
986             sw_if_index=self.pg0.sw_if_index,
987             flags=flags, is_add=1)
988         self.vapi.nat44_interface_add_del_feature(
989             sw_if_index=self.pg1.sw_if_index,
990             is_add=1)
991
992         # in2out
993         tcpn = self.statistics.get_counter('/nat44/in2out/slowpath/tcp')[0]
994         udpn = self.statistics.get_counter('/nat44/in2out/slowpath/udp')[0]
995         icmpn = self.statistics.get_counter('/nat44/in2out/slowpath/icmp')[0]
996         drops = self.statistics.get_counter('/nat44/in2out/slowpath/drops')[0]
997
998         pkts = self.create_stream_in(self.pg0, self.pg1)
999         self.pg0.add_stream(pkts)
1000         self.pg_enable_capture(self.pg_interfaces)
1001         self.pg_start()
1002         capture = self.pg1.get_capture(len(pkts))
1003         self.verify_capture_out(capture)
1004
1005         if_idx = self.pg0.sw_if_index
1006         cnt = self.statistics.get_counter('/nat44/in2out/slowpath/tcp')[0]
1007         self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2)
1008         cnt = self.statistics.get_counter('/nat44/in2out/slowpath/udp')[0]
1009         self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
1010         cnt = self.statistics.get_counter('/nat44/in2out/slowpath/icmp')[0]
1011         self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
1012         cnt = self.statistics.get_counter('/nat44/in2out/slowpath/drops')[0]
1013         self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
1014
1015         # out2in
1016         tcpn = self.statistics.get_counter('/nat44/out2in/slowpath/tcp')[0]
1017         udpn = self.statistics.get_counter('/nat44/out2in/slowpath/udp')[0]
1018         icmpn = self.statistics.get_counter('/nat44/out2in/slowpath/icmp')[0]
1019         drops = self.statistics.get_counter('/nat44/out2in/slowpath/drops')[0]
1020
1021         pkts = self.create_stream_out(self.pg1)
1022         self.pg1.add_stream(pkts)
1023         self.pg_enable_capture(self.pg_interfaces)
1024         self.pg_start()
1025         capture = self.pg0.get_capture(len(pkts))
1026         self.verify_capture_in(capture, self.pg0)
1027
1028         if_idx = self.pg1.sw_if_index
1029         cnt = self.statistics.get_counter('/nat44/out2in/slowpath/tcp')[0]
1030         self.assertEqual(cnt[if_idx] - tcpn[if_idx], 2)
1031         cnt = self.statistics.get_counter('/nat44/out2in/slowpath/udp')[0]
1032         self.assertEqual(cnt[if_idx] - udpn[if_idx], 1)
1033         cnt = self.statistics.get_counter('/nat44/out2in/slowpath/icmp')[0]
1034         self.assertEqual(cnt[if_idx] - icmpn[if_idx], 1)
1035         cnt = self.statistics.get_counter('/nat44/out2in/slowpath/drops')[0]
1036         self.assertEqual(cnt[if_idx] - drops[if_idx], 0)
1037
1038         users = self.statistics.get_counter('/nat44/total-users')
1039         self.assertEqual(users[0][0], 1)
1040         sessions = self.statistics.get_counter('/nat44/total-sessions')
1041         self.assertEqual(sessions[0][0], 3)
1042
1043     def test_dynamic_icmp_errors_in2out_ttl_1(self):
1044         """ NAT44EI handling of client packets with TTL=1 """
1045
1046         self.nat44_add_address(self.nat_addr)
1047         flags = self.config_flags.NAT_IS_INSIDE
1048         self.vapi.nat44_interface_add_del_feature(
1049             sw_if_index=self.pg0.sw_if_index,
1050             flags=flags, is_add=1)
1051         self.vapi.nat44_interface_add_del_feature(
1052             sw_if_index=self.pg1.sw_if_index,
1053             is_add=1)
1054
1055         # Client side - generate traffic
1056         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1057         self.pg0.add_stream(pkts)
1058         self.pg_enable_capture(self.pg_interfaces)
1059         self.pg_start()
1060
1061         # Client side - verify ICMP type 11 packets
1062         capture = self.pg0.get_capture(len(pkts))
1063         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1064
1065     def test_dynamic_icmp_errors_out2in_ttl_1(self):
1066         """ NAT44EI handling of server packets with TTL=1 """
1067
1068         self.nat44_add_address(self.nat_addr)
1069         flags = self.config_flags.NAT_IS_INSIDE
1070         self.vapi.nat44_interface_add_del_feature(
1071             sw_if_index=self.pg0.sw_if_index,
1072             flags=flags, is_add=1)
1073         self.vapi.nat44_interface_add_del_feature(
1074             sw_if_index=self.pg1.sw_if_index,
1075             is_add=1)
1076
1077         # Client side - create sessions
1078         pkts = self.create_stream_in(self.pg0, self.pg1)
1079         self.pg0.add_stream(pkts)
1080         self.pg_enable_capture(self.pg_interfaces)
1081         self.pg_start()
1082
1083         # Server side - generate traffic
1084         capture = self.pg1.get_capture(len(pkts))
1085         self.verify_capture_out(capture)
1086         pkts = self.create_stream_out(self.pg1, ttl=1)
1087         self.pg1.add_stream(pkts)
1088         self.pg_enable_capture(self.pg_interfaces)
1089         self.pg_start()
1090
1091         # Server side - verify ICMP type 11 packets
1092         capture = self.pg1.get_capture(len(pkts))
1093         self.verify_capture_out_with_icmp_errors(capture,
1094                                                  src_ip=self.pg1.local_ip4)
1095
1096     def test_dynamic_icmp_errors_in2out_ttl_2(self):
1097         """ NAT44EI handling of error responses to client packets with TTL=2
1098         """
1099
1100         self.nat44_add_address(self.nat_addr)
1101         flags = self.config_flags.NAT_IS_INSIDE
1102         self.vapi.nat44_interface_add_del_feature(
1103             sw_if_index=self.pg0.sw_if_index,
1104             flags=flags, is_add=1)
1105         self.vapi.nat44_interface_add_del_feature(
1106             sw_if_index=self.pg1.sw_if_index,
1107             is_add=1)
1108
1109         # Client side - generate traffic
1110         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1111         self.pg0.add_stream(pkts)
1112         self.pg_enable_capture(self.pg_interfaces)
1113         self.pg_start()
1114
1115         # Server side - simulate ICMP type 11 response
1116         capture = self.pg1.get_capture(len(pkts))
1117         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1118                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1119                 ICMP(type=11) / packet[IP] for packet in capture]
1120         self.pg1.add_stream(pkts)
1121         self.pg_enable_capture(self.pg_interfaces)
1122         self.pg_start()
1123
1124         # Client side - verify ICMP type 11 packets
1125         capture = self.pg0.get_capture(len(pkts))
1126         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1127
1128     def test_dynamic_icmp_errors_out2in_ttl_2(self):
1129         """ NAT44EI handling of error responses to server packets with TTL=2
1130         """
1131
1132         self.nat44_add_address(self.nat_addr)
1133         flags = self.config_flags.NAT_IS_INSIDE
1134         self.vapi.nat44_interface_add_del_feature(
1135             sw_if_index=self.pg0.sw_if_index,
1136             flags=flags, is_add=1)
1137         self.vapi.nat44_interface_add_del_feature(
1138             sw_if_index=self.pg1.sw_if_index,
1139             is_add=1)
1140
1141         # Client side - create sessions
1142         pkts = self.create_stream_in(self.pg0, self.pg1)
1143         self.pg0.add_stream(pkts)
1144         self.pg_enable_capture(self.pg_interfaces)
1145         self.pg_start()
1146
1147         # Server side - generate traffic
1148         capture = self.pg1.get_capture(len(pkts))
1149         self.verify_capture_out(capture)
1150         pkts = self.create_stream_out(self.pg1, ttl=2)
1151         self.pg1.add_stream(pkts)
1152         self.pg_enable_capture(self.pg_interfaces)
1153         self.pg_start()
1154
1155         # Client side - simulate ICMP type 11 response
1156         capture = self.pg0.get_capture(len(pkts))
1157         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1158                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1159                 ICMP(type=11) / packet[IP] for packet in capture]
1160         self.pg0.add_stream(pkts)
1161         self.pg_enable_capture(self.pg_interfaces)
1162         self.pg_start()
1163
1164         # Server side - verify ICMP type 11 packets
1165         capture = self.pg1.get_capture(len(pkts))
1166         self.verify_capture_out_with_icmp_errors(capture)
1167
1168     def test_ping_out_interface_from_outside(self):
1169         """ NAT44EI ping out interface from outside network """
1170
1171         self.nat44_add_address(self.nat_addr)
1172         flags = self.config_flags.NAT_IS_INSIDE
1173         self.vapi.nat44_interface_add_del_feature(
1174             sw_if_index=self.pg0.sw_if_index,
1175             flags=flags, is_add=1)
1176         self.vapi.nat44_interface_add_del_feature(
1177             sw_if_index=self.pg1.sw_if_index,
1178             is_add=1)
1179
1180         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1181              IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1182              ICMP(id=self.icmp_id_out, type='echo-request'))
1183         pkts = [p]
1184         self.pg1.add_stream(pkts)
1185         self.pg_enable_capture(self.pg_interfaces)
1186         self.pg_start()
1187         capture = self.pg1.get_capture(len(pkts))
1188         packet = capture[0]
1189         try:
1190             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1191             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1192             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1193             self.assertEqual(packet[ICMP].type, 0)  # echo reply
1194         except:
1195             self.logger.error(ppp("Unexpected or invalid packet "
1196                                   "(outside network):", packet))
1197             raise
1198
1199     def test_ping_internal_host_from_outside(self):
1200         """ NAT44EI ping internal host from outside network """
1201
1202         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1203         flags = self.config_flags.NAT_IS_INSIDE
1204         self.vapi.nat44_interface_add_del_feature(
1205             sw_if_index=self.pg0.sw_if_index,
1206             flags=flags, is_add=1)
1207         self.vapi.nat44_interface_add_del_feature(
1208             sw_if_index=self.pg1.sw_if_index,
1209             is_add=1)
1210
1211         # out2in
1212         pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1213                IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1214                ICMP(id=self.icmp_id_out, type='echo-request'))
1215         self.pg1.add_stream(pkt)
1216         self.pg_enable_capture(self.pg_interfaces)
1217         self.pg_start()
1218         capture = self.pg0.get_capture(1)
1219         self.verify_capture_in(capture, self.pg0)
1220         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1221
1222         # in2out
1223         pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1224                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1225                ICMP(id=self.icmp_id_in, type='echo-reply'))
1226         self.pg0.add_stream(pkt)
1227         self.pg_enable_capture(self.pg_interfaces)
1228         self.pg_start()
1229         capture = self.pg1.get_capture(1)
1230         self.verify_capture_out(capture, same_port=True)
1231         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1232
1233     def test_forwarding(self):
1234         """ NAT44EI forwarding test """
1235
1236         flags = self.config_flags.NAT_IS_INSIDE
1237         self.vapi.nat44_interface_add_del_feature(
1238             sw_if_index=self.pg0.sw_if_index,
1239             flags=flags, is_add=1)
1240         self.vapi.nat44_interface_add_del_feature(
1241             sw_if_index=self.pg1.sw_if_index,
1242             is_add=1)
1243         self.vapi.nat44_forwarding_enable_disable(enable=1)
1244
1245         real_ip = self.pg0.remote_ip4
1246         alias_ip = self.nat_addr
1247         flags = self.config_flags.NAT_IS_ADDR_ONLY
1248         self.vapi.nat44_add_del_static_mapping(is_add=1,
1249                                                local_ip_address=real_ip,
1250                                                external_ip_address=alias_ip,
1251                                                external_sw_if_index=0xFFFFFFFF,
1252                                                flags=flags)
1253
1254         try:
1255             # static mapping match
1256
1257             pkts = self.create_stream_out(self.pg1)
1258             self.pg1.add_stream(pkts)
1259             self.pg_enable_capture(self.pg_interfaces)
1260             self.pg_start()
1261             capture = self.pg0.get_capture(len(pkts))
1262             self.verify_capture_in(capture, self.pg0)
1263
1264             pkts = self.create_stream_in(self.pg0, self.pg1)
1265             self.pg0.add_stream(pkts)
1266             self.pg_enable_capture(self.pg_interfaces)
1267             self.pg_start()
1268             capture = self.pg1.get_capture(len(pkts))
1269             self.verify_capture_out(capture, same_port=True)
1270
1271             # no static mapping match
1272
1273             host0 = self.pg0.remote_hosts[0]
1274             self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1275             try:
1276                 pkts = self.create_stream_out(self.pg1,
1277                                               dst_ip=self.pg0.remote_ip4,
1278                                               use_inside_ports=True)
1279                 self.pg1.add_stream(pkts)
1280                 self.pg_enable_capture(self.pg_interfaces)
1281                 self.pg_start()
1282                 capture = self.pg0.get_capture(len(pkts))
1283                 self.verify_capture_in(capture, self.pg0)
1284
1285                 pkts = self.create_stream_in(self.pg0, self.pg1)
1286                 self.pg0.add_stream(pkts)
1287                 self.pg_enable_capture(self.pg_interfaces)
1288                 self.pg_start()
1289                 capture = self.pg1.get_capture(len(pkts))
1290                 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
1291                                         same_port=True)
1292             finally:
1293                 self.pg0.remote_hosts[0] = host0
1294
1295         finally:
1296             self.vapi.nat44_forwarding_enable_disable(enable=0)
1297             flags = self.config_flags.NAT_IS_ADDR_ONLY
1298             self.vapi.nat44_add_del_static_mapping(
1299                 is_add=0,
1300                 local_ip_address=real_ip,
1301                 external_ip_address=alias_ip,
1302                 external_sw_if_index=0xFFFFFFFF,
1303                 flags=flags)
1304
1305     def test_static_in(self):
1306         """ NAT44EI 1:1 NAT initialized from inside network """
1307
1308         nat_ip = "10.0.0.10"
1309         self.tcp_port_out = 6303
1310         self.udp_port_out = 6304
1311         self.icmp_id_out = 6305
1312
1313         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1314         flags = self.config_flags.NAT_IS_INSIDE
1315         self.vapi.nat44_interface_add_del_feature(
1316             sw_if_index=self.pg0.sw_if_index,
1317             flags=flags, is_add=1)
1318         self.vapi.nat44_interface_add_del_feature(
1319             sw_if_index=self.pg1.sw_if_index,
1320             is_add=1)
1321         sm = self.vapi.nat44_static_mapping_dump()
1322         self.assertEqual(len(sm), 1)
1323         self.assertEqual(sm[0].tag, '')
1324         self.assertEqual(sm[0].protocol, 0)
1325         self.assertEqual(sm[0].local_port, 0)
1326         self.assertEqual(sm[0].external_port, 0)
1327
1328         # in2out
1329         pkts = self.create_stream_in(self.pg0, self.pg1)
1330         self.pg0.add_stream(pkts)
1331         self.pg_enable_capture(self.pg_interfaces)
1332         self.pg_start()
1333         capture = self.pg1.get_capture(len(pkts))
1334         self.verify_capture_out(capture, nat_ip, True)
1335
1336         # out2in
1337         pkts = self.create_stream_out(self.pg1, nat_ip)
1338         self.pg1.add_stream(pkts)
1339         self.pg_enable_capture(self.pg_interfaces)
1340         self.pg_start()
1341         capture = self.pg0.get_capture(len(pkts))
1342         self.verify_capture_in(capture, self.pg0)
1343
1344     def test_static_out(self):
1345         """ NAT44EI 1:1 NAT initialized from outside network """
1346
1347         nat_ip = "10.0.0.20"
1348         self.tcp_port_out = 6303
1349         self.udp_port_out = 6304
1350         self.icmp_id_out = 6305
1351         tag = "testTAG"
1352
1353         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
1354         flags = self.config_flags.NAT_IS_INSIDE
1355         self.vapi.nat44_interface_add_del_feature(
1356             sw_if_index=self.pg0.sw_if_index,
1357             flags=flags, is_add=1)
1358         self.vapi.nat44_interface_add_del_feature(
1359             sw_if_index=self.pg1.sw_if_index,
1360             is_add=1)
1361         sm = self.vapi.nat44_static_mapping_dump()
1362         self.assertEqual(len(sm), 1)
1363         self.assertEqual(sm[0].tag, tag)
1364
1365         # out2in
1366         pkts = self.create_stream_out(self.pg1, nat_ip)
1367         self.pg1.add_stream(pkts)
1368         self.pg_enable_capture(self.pg_interfaces)
1369         self.pg_start()
1370         capture = self.pg0.get_capture(len(pkts))
1371         self.verify_capture_in(capture, self.pg0)
1372
1373         # in2out
1374         pkts = self.create_stream_in(self.pg0, self.pg1)
1375         self.pg0.add_stream(pkts)
1376         self.pg_enable_capture(self.pg_interfaces)
1377         self.pg_start()
1378         capture = self.pg1.get_capture(len(pkts))
1379         self.verify_capture_out(capture, nat_ip, True)
1380
1381     def test_static_with_port_in(self):
1382         """ NAT44EI 1:1 NAPT initialized from inside network """
1383
1384         self.tcp_port_out = 3606
1385         self.udp_port_out = 3607
1386         self.icmp_id_out = 3608
1387
1388         self.nat44_add_address(self.nat_addr)
1389         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1390                                       self.tcp_port_in, self.tcp_port_out,
1391                                       proto=IP_PROTOS.tcp)
1392         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1393                                       self.udp_port_in, self.udp_port_out,
1394                                       proto=IP_PROTOS.udp)
1395         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1396                                       self.icmp_id_in, self.icmp_id_out,
1397                                       proto=IP_PROTOS.icmp)
1398         flags = self.config_flags.NAT_IS_INSIDE
1399         self.vapi.nat44_interface_add_del_feature(
1400             sw_if_index=self.pg0.sw_if_index,
1401             flags=flags, is_add=1)
1402         self.vapi.nat44_interface_add_del_feature(
1403             sw_if_index=self.pg1.sw_if_index,
1404             is_add=1)
1405
1406         # in2out
1407         pkts = self.create_stream_in(self.pg0, self.pg1)
1408         self.pg0.add_stream(pkts)
1409         self.pg_enable_capture(self.pg_interfaces)
1410         self.pg_start()
1411         capture = self.pg1.get_capture(len(pkts))
1412         self.verify_capture_out(capture)
1413
1414         # out2in
1415         pkts = self.create_stream_out(self.pg1)
1416         self.pg1.add_stream(pkts)
1417         self.pg_enable_capture(self.pg_interfaces)
1418         self.pg_start()
1419         capture = self.pg0.get_capture(len(pkts))
1420         self.verify_capture_in(capture, self.pg0)
1421
1422     def test_static_with_port_out(self):
1423         """ NAT44EI 1:1 NAPT initialized from outside network """
1424
1425         self.tcp_port_out = 30606
1426         self.udp_port_out = 30607
1427         self.icmp_id_out = 30608
1428
1429         self.nat44_add_address(self.nat_addr)
1430         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1431                                       self.tcp_port_in, self.tcp_port_out,
1432                                       proto=IP_PROTOS.tcp)
1433         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1434                                       self.udp_port_in, self.udp_port_out,
1435                                       proto=IP_PROTOS.udp)
1436         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1437                                       self.icmp_id_in, self.icmp_id_out,
1438                                       proto=IP_PROTOS.icmp)
1439         flags = self.config_flags.NAT_IS_INSIDE
1440         self.vapi.nat44_interface_add_del_feature(
1441             sw_if_index=self.pg0.sw_if_index,
1442             flags=flags, is_add=1)
1443         self.vapi.nat44_interface_add_del_feature(
1444             sw_if_index=self.pg1.sw_if_index,
1445             is_add=1)
1446
1447         # out2in
1448         pkts = self.create_stream_out(self.pg1)
1449         self.pg1.add_stream(pkts)
1450         self.pg_enable_capture(self.pg_interfaces)
1451         self.pg_start()
1452         capture = self.pg0.get_capture(len(pkts))
1453         self.verify_capture_in(capture, self.pg0)
1454
1455         # in2out
1456         pkts = self.create_stream_in(self.pg0, self.pg1)
1457         self.pg0.add_stream(pkts)
1458         self.pg_enable_capture(self.pg_interfaces)
1459         self.pg_start()
1460         capture = self.pg1.get_capture(len(pkts))
1461         self.verify_capture_out(capture)
1462
1463     def test_static_vrf_aware(self):
1464         """ NAT44EI 1:1 NAT VRF awareness """
1465
1466         nat_ip1 = "10.0.0.30"
1467         nat_ip2 = "10.0.0.40"
1468         self.tcp_port_out = 6303
1469         self.udp_port_out = 6304
1470         self.icmp_id_out = 6305
1471
1472         self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1473                                       vrf_id=10)
1474         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1475                                       vrf_id=10)
1476         flags = self.config_flags.NAT_IS_INSIDE
1477         self.vapi.nat44_interface_add_del_feature(
1478             sw_if_index=self.pg3.sw_if_index,
1479             is_add=1)
1480         self.vapi.nat44_interface_add_del_feature(
1481             sw_if_index=self.pg0.sw_if_index,
1482             flags=flags, is_add=1)
1483         self.vapi.nat44_interface_add_del_feature(
1484             sw_if_index=self.pg4.sw_if_index,
1485             flags=flags, is_add=1)
1486
1487         # inside interface VRF match NAT44EI static mapping VRF
1488         pkts = self.create_stream_in(self.pg4, self.pg3)
1489         self.pg4.add_stream(pkts)
1490         self.pg_enable_capture(self.pg_interfaces)
1491         self.pg_start()
1492         capture = self.pg3.get_capture(len(pkts))
1493         self.verify_capture_out(capture, nat_ip1, True)
1494
1495         # inside interface VRF don't match NAT44EI static mapping VRF (packets
1496         # are dropped)
1497         pkts = self.create_stream_in(self.pg0, self.pg3)
1498         self.pg0.add_stream(pkts)
1499         self.pg_enable_capture(self.pg_interfaces)
1500         self.pg_start()
1501         self.pg3.assert_nothing_captured()
1502
1503     def test_dynamic_to_static(self):
1504         """ NAT44EI Switch from dynamic translation to 1:1NAT """
1505         nat_ip = "10.0.0.10"
1506         self.tcp_port_out = 6303
1507         self.udp_port_out = 6304
1508         self.icmp_id_out = 6305
1509
1510         self.nat44_add_address(self.nat_addr)
1511         flags = self.config_flags.NAT_IS_INSIDE
1512         self.vapi.nat44_interface_add_del_feature(
1513             sw_if_index=self.pg0.sw_if_index,
1514             flags=flags, is_add=1)
1515         self.vapi.nat44_interface_add_del_feature(
1516             sw_if_index=self.pg1.sw_if_index,
1517             is_add=1)
1518
1519         # dynamic
1520         pkts = self.create_stream_in(self.pg0, self.pg1)
1521         self.pg0.add_stream(pkts)
1522         self.pg_enable_capture(self.pg_interfaces)
1523         self.pg_start()
1524         capture = self.pg1.get_capture(len(pkts))
1525         self.verify_capture_out(capture)
1526
1527         # 1:1NAT
1528         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1529         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
1530         self.assertEqual(len(sessions), 0)
1531         pkts = self.create_stream_in(self.pg0, self.pg1)
1532         self.pg0.add_stream(pkts)
1533         self.pg_enable_capture(self.pg_interfaces)
1534         self.pg_start()
1535         capture = self.pg1.get_capture(len(pkts))
1536         self.verify_capture_out(capture, nat_ip, True)
1537
1538     def test_identity_nat(self):
1539         """ NAT44EI Identity NAT """
1540         flags = self.config_flags.NAT_IS_ADDR_ONLY
1541         self.vapi.nat44_add_del_identity_mapping(
1542             ip_address=self.pg0.remote_ip4, sw_if_index=0xFFFFFFFF,
1543             flags=flags, is_add=1)
1544         flags = self.config_flags.NAT_IS_INSIDE
1545         self.vapi.nat44_interface_add_del_feature(
1546             sw_if_index=self.pg0.sw_if_index,
1547             flags=flags, is_add=1)
1548         self.vapi.nat44_interface_add_del_feature(
1549             sw_if_index=self.pg1.sw_if_index,
1550             is_add=1)
1551
1552         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1553              IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1554              TCP(sport=12345, dport=56789))
1555         self.pg1.add_stream(p)
1556         self.pg_enable_capture(self.pg_interfaces)
1557         self.pg_start()
1558         capture = self.pg0.get_capture(1)
1559         p = capture[0]
1560         try:
1561             ip = p[IP]
1562             tcp = p[TCP]
1563             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1564             self.assertEqual(ip.src, self.pg1.remote_ip4)
1565             self.assertEqual(tcp.dport, 56789)
1566             self.assertEqual(tcp.sport, 12345)
1567             self.assert_packet_checksums_valid(p)
1568         except:
1569             self.logger.error(ppp("Unexpected or invalid packet:", p))
1570             raise
1571
1572         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
1573         self.assertEqual(len(sessions), 0)
1574         flags = self.config_flags.NAT_IS_ADDR_ONLY
1575         self.vapi.nat44_add_del_identity_mapping(
1576             ip_address=self.pg0.remote_ip4, sw_if_index=0xFFFFFFFF,
1577             flags=flags, vrf_id=1, is_add=1)
1578         identity_mappings = self.vapi.nat44_identity_mapping_dump()
1579         self.assertEqual(len(identity_mappings), 2)
1580
1581     def test_multiple_inside_interfaces(self):
1582         """ NAT44EI multiple non-overlapping address space inside interfaces
1583         """
1584
1585         self.nat44_add_address(self.nat_addr)
1586         flags = self.config_flags.NAT_IS_INSIDE
1587         self.vapi.nat44_interface_add_del_feature(
1588             sw_if_index=self.pg0.sw_if_index,
1589             flags=flags, is_add=1)
1590         self.vapi.nat44_interface_add_del_feature(
1591             sw_if_index=self.pg1.sw_if_index,
1592             flags=flags, is_add=1)
1593         self.vapi.nat44_interface_add_del_feature(
1594             sw_if_index=self.pg3.sw_if_index,
1595             is_add=1)
1596
1597         # between two NAT44EI inside interfaces (no translation)
1598         pkts = self.create_stream_in(self.pg0, self.pg1)
1599         self.pg0.add_stream(pkts)
1600         self.pg_enable_capture(self.pg_interfaces)
1601         self.pg_start()
1602         capture = self.pg1.get_capture(len(pkts))
1603         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1604
1605         # from inside to interface without translation
1606         pkts = self.create_stream_in(self.pg0, self.pg2)
1607         self.pg0.add_stream(pkts)
1608         self.pg_enable_capture(self.pg_interfaces)
1609         self.pg_start()
1610         capture = self.pg2.get_capture(len(pkts))
1611         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1612
1613         # in2out 1st interface
1614         pkts = self.create_stream_in(self.pg0, self.pg3)
1615         self.pg0.add_stream(pkts)
1616         self.pg_enable_capture(self.pg_interfaces)
1617         self.pg_start()
1618         capture = self.pg3.get_capture(len(pkts))
1619         self.verify_capture_out(capture)
1620
1621         # out2in 1st interface
1622         pkts = self.create_stream_out(self.pg3)
1623         self.pg3.add_stream(pkts)
1624         self.pg_enable_capture(self.pg_interfaces)
1625         self.pg_start()
1626         capture = self.pg0.get_capture(len(pkts))
1627         self.verify_capture_in(capture, self.pg0)
1628
1629         # in2out 2nd interface
1630         pkts = self.create_stream_in(self.pg1, self.pg3)
1631         self.pg1.add_stream(pkts)
1632         self.pg_enable_capture(self.pg_interfaces)
1633         self.pg_start()
1634         capture = self.pg3.get_capture(len(pkts))
1635         self.verify_capture_out(capture)
1636
1637         # out2in 2nd interface
1638         pkts = self.create_stream_out(self.pg3)
1639         self.pg3.add_stream(pkts)
1640         self.pg_enable_capture(self.pg_interfaces)
1641         self.pg_start()
1642         capture = self.pg1.get_capture(len(pkts))
1643         self.verify_capture_in(capture, self.pg1)
1644
1645     def test_inside_overlapping_interfaces(self):
1646         """ NAT44EI multiple inside interfaces with overlapping address space
1647         """
1648
1649         static_nat_ip = "10.0.0.10"
1650         self.nat44_add_address(self.nat_addr)
1651         flags = self.config_flags.NAT_IS_INSIDE
1652         self.vapi.nat44_interface_add_del_feature(
1653             sw_if_index=self.pg3.sw_if_index,
1654             is_add=1)
1655         self.vapi.nat44_interface_add_del_feature(
1656             sw_if_index=self.pg4.sw_if_index,
1657             flags=flags, is_add=1)
1658         self.vapi.nat44_interface_add_del_feature(
1659             sw_if_index=self.pg5.sw_if_index,
1660             flags=flags, is_add=1)
1661         self.vapi.nat44_interface_add_del_feature(
1662             sw_if_index=self.pg6.sw_if_index,
1663             flags=flags, is_add=1)
1664         self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1665                                       vrf_id=20)
1666
1667         # between NAT44EI inside interfaces with same VRF (no translation)
1668         pkts = self.create_stream_in(self.pg4, self.pg5)
1669         self.pg4.add_stream(pkts)
1670         self.pg_enable_capture(self.pg_interfaces)
1671         self.pg_start()
1672         capture = self.pg5.get_capture(len(pkts))
1673         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1674
1675         # between NAT44EI inside interfaces with different VRF (hairpinning)
1676         p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1677              IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1678              TCP(sport=1234, dport=5678))
1679         self.pg4.add_stream(p)
1680         self.pg_enable_capture(self.pg_interfaces)
1681         self.pg_start()
1682         capture = self.pg6.get_capture(1)
1683         p = capture[0]
1684         try:
1685             ip = p[IP]
1686             tcp = p[TCP]
1687             self.assertEqual(ip.src, self.nat_addr)
1688             self.assertEqual(ip.dst, self.pg6.remote_ip4)
1689             self.assertNotEqual(tcp.sport, 1234)
1690             self.assertEqual(tcp.dport, 5678)
1691         except:
1692             self.logger.error(ppp("Unexpected or invalid packet:", p))
1693             raise
1694
1695         # in2out 1st interface
1696         pkts = self.create_stream_in(self.pg4, self.pg3)
1697         self.pg4.add_stream(pkts)
1698         self.pg_enable_capture(self.pg_interfaces)
1699         self.pg_start()
1700         capture = self.pg3.get_capture(len(pkts))
1701         self.verify_capture_out(capture)
1702
1703         # out2in 1st interface
1704         pkts = self.create_stream_out(self.pg3)
1705         self.pg3.add_stream(pkts)
1706         self.pg_enable_capture(self.pg_interfaces)
1707         self.pg_start()
1708         capture = self.pg4.get_capture(len(pkts))
1709         self.verify_capture_in(capture, self.pg4)
1710
1711         # in2out 2nd interface
1712         pkts = self.create_stream_in(self.pg5, self.pg3)
1713         self.pg5.add_stream(pkts)
1714         self.pg_enable_capture(self.pg_interfaces)
1715         self.pg_start()
1716         capture = self.pg3.get_capture(len(pkts))
1717         self.verify_capture_out(capture)
1718
1719         # out2in 2nd interface
1720         pkts = self.create_stream_out(self.pg3)
1721         self.pg3.add_stream(pkts)
1722         self.pg_enable_capture(self.pg_interfaces)
1723         self.pg_start()
1724         capture = self.pg5.get_capture(len(pkts))
1725         self.verify_capture_in(capture, self.pg5)
1726
1727         # pg5 session dump
1728         addresses = self.vapi.nat44_address_dump()
1729         self.assertEqual(len(addresses), 1)
1730         sessions = self.vapi.nat44_user_session_dump(self.pg5.remote_ip4, 10)
1731         self.assertEqual(len(sessions), 3)
1732         for session in sessions:
1733             self.assertFalse(session.flags & self.config_flags.NAT_IS_STATIC)
1734             self.assertEqual(str(session.inside_ip_address),
1735                              self.pg5.remote_ip4)
1736             self.assertEqual(session.outside_ip_address,
1737                              addresses[0].ip_address)
1738         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1739         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1740         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1741         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1742         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1743         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1744         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1745         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1746         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1747
1748         # in2out 3rd interface
1749         pkts = self.create_stream_in(self.pg6, self.pg3)
1750         self.pg6.add_stream(pkts)
1751         self.pg_enable_capture(self.pg_interfaces)
1752         self.pg_start()
1753         capture = self.pg3.get_capture(len(pkts))
1754         self.verify_capture_out(capture, static_nat_ip, True)
1755
1756         # out2in 3rd interface
1757         pkts = self.create_stream_out(self.pg3, static_nat_ip)
1758         self.pg3.add_stream(pkts)
1759         self.pg_enable_capture(self.pg_interfaces)
1760         self.pg_start()
1761         capture = self.pg6.get_capture(len(pkts))
1762         self.verify_capture_in(capture, self.pg6)
1763
1764         # general user and session dump verifications
1765         users = self.vapi.nat44_user_dump()
1766         self.assertGreaterEqual(len(users), 3)
1767         addresses = self.vapi.nat44_address_dump()
1768         self.assertEqual(len(addresses), 1)
1769         for user in users:
1770             sessions = self.vapi.nat44_user_session_dump(user.ip_address,
1771                                                          user.vrf_id)
1772             for session in sessions:
1773                 self.assertEqual(user.ip_address, session.inside_ip_address)
1774                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1775                 self.assertTrue(session.protocol in
1776                                 [IP_PROTOS.tcp, IP_PROTOS.udp,
1777                                  IP_PROTOS.icmp])
1778                 self.assertFalse(session.flags &
1779                                  self.config_flags.NAT_IS_EXT_HOST_VALID)
1780
1781         # pg4 session dump
1782         sessions = self.vapi.nat44_user_session_dump(self.pg4.remote_ip4, 10)
1783         self.assertGreaterEqual(len(sessions), 4)
1784         for session in sessions:
1785             self.assertFalse(session.flags & self.config_flags.NAT_IS_STATIC)
1786             self.assertEqual(str(session.inside_ip_address),
1787                              self.pg4.remote_ip4)
1788             self.assertEqual(session.outside_ip_address,
1789                              addresses[0].ip_address)
1790
1791         # pg6 session dump
1792         sessions = self.vapi.nat44_user_session_dump(self.pg6.remote_ip4, 20)
1793         self.assertGreaterEqual(len(sessions), 3)
1794         for session in sessions:
1795             self.assertTrue(session.flags & self.config_flags.NAT_IS_STATIC)
1796             self.assertEqual(str(session.inside_ip_address),
1797                              self.pg6.remote_ip4)
1798             self.assertEqual(str(session.outside_ip_address),
1799                              static_nat_ip)
1800             self.assertTrue(session.inside_port in
1801                             [self.tcp_port_in, self.udp_port_in,
1802                              self.icmp_id_in])
1803
1804     def test_hairpinning(self):
1805         """ NAT44EI hairpinning - 1:1 NAPT """
1806
1807         host = self.pg0.remote_hosts[0]
1808         server = self.pg0.remote_hosts[1]
1809         host_in_port = 1234
1810         host_out_port = 0
1811         server_in_port = 5678
1812         server_out_port = 8765
1813
1814         self.nat44_add_address(self.nat_addr)
1815         flags = self.config_flags.NAT_IS_INSIDE
1816         self.vapi.nat44_interface_add_del_feature(
1817             sw_if_index=self.pg0.sw_if_index,
1818             flags=flags, is_add=1)
1819         self.vapi.nat44_interface_add_del_feature(
1820             sw_if_index=self.pg1.sw_if_index,
1821             is_add=1)
1822
1823         # add static mapping for server
1824         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1825                                       server_in_port, server_out_port,
1826                                       proto=IP_PROTOS.tcp)
1827
1828         cnt = self.statistics.get_counter('/nat44/hairpinning')[0]
1829         # send packet from host to server
1830         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1831              IP(src=host.ip4, dst=self.nat_addr) /
1832              TCP(sport=host_in_port, dport=server_out_port))
1833         self.pg0.add_stream(p)
1834         self.pg_enable_capture(self.pg_interfaces)
1835         self.pg_start()
1836         capture = self.pg0.get_capture(1)
1837         p = capture[0]
1838         try:
1839             ip = p[IP]
1840             tcp = p[TCP]
1841             self.assertEqual(ip.src, self.nat_addr)
1842             self.assertEqual(ip.dst, server.ip4)
1843             self.assertNotEqual(tcp.sport, host_in_port)
1844             self.assertEqual(tcp.dport, server_in_port)
1845             self.assert_packet_checksums_valid(p)
1846             host_out_port = tcp.sport
1847         except:
1848             self.logger.error(ppp("Unexpected or invalid packet:", p))
1849             raise
1850
1851         after = self.statistics.get_counter('/nat44/hairpinning')[0]
1852         if_idx = self.pg0.sw_if_index
1853         self.assertEqual(after[if_idx] - cnt[if_idx], 1)
1854
1855         # send reply from server to host
1856         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1857              IP(src=server.ip4, dst=self.nat_addr) /
1858              TCP(sport=server_in_port, dport=host_out_port))
1859         self.pg0.add_stream(p)
1860         self.pg_enable_capture(self.pg_interfaces)
1861         self.pg_start()
1862         capture = self.pg0.get_capture(1)
1863         p = capture[0]
1864         try:
1865             ip = p[IP]
1866             tcp = p[TCP]
1867             self.assertEqual(ip.src, self.nat_addr)
1868             self.assertEqual(ip.dst, host.ip4)
1869             self.assertEqual(tcp.sport, server_out_port)
1870             self.assertEqual(tcp.dport, host_in_port)
1871             self.assert_packet_checksums_valid(p)
1872         except:
1873             self.logger.error(ppp("Unexpected or invalid packet:", p))
1874             raise
1875
1876         after = self.statistics.get_counter('/nat44/hairpinning')[0]
1877         if_idx = self.pg0.sw_if_index
1878         self.assertEqual(after[if_idx] - cnt[if_idx], 2)
1879
1880     def test_hairpinning2(self):
1881         """ NAT44EI hairpinning - 1:1 NAT"""
1882
1883         server1_nat_ip = "10.0.0.10"
1884         server2_nat_ip = "10.0.0.11"
1885         host = self.pg0.remote_hosts[0]
1886         server1 = self.pg0.remote_hosts[1]
1887         server2 = self.pg0.remote_hosts[2]
1888         server_tcp_port = 22
1889         server_udp_port = 20
1890
1891         self.nat44_add_address(self.nat_addr)
1892         flags = self.config_flags.NAT_IS_INSIDE
1893         self.vapi.nat44_interface_add_del_feature(
1894             sw_if_index=self.pg0.sw_if_index,
1895             flags=flags, is_add=1)
1896         self.vapi.nat44_interface_add_del_feature(
1897             sw_if_index=self.pg1.sw_if_index,
1898             is_add=1)
1899
1900         # add static mapping for servers
1901         self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1902         self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
1903
1904         # host to server1
1905         pkts = []
1906         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1907              IP(src=host.ip4, dst=server1_nat_ip) /
1908              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1909         pkts.append(p)
1910         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1911              IP(src=host.ip4, dst=server1_nat_ip) /
1912              UDP(sport=self.udp_port_in, dport=server_udp_port))
1913         pkts.append(p)
1914         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1915              IP(src=host.ip4, dst=server1_nat_ip) /
1916              ICMP(id=self.icmp_id_in, type='echo-request'))
1917         pkts.append(p)
1918         self.pg0.add_stream(pkts)
1919         self.pg_enable_capture(self.pg_interfaces)
1920         self.pg_start()
1921         capture = self.pg0.get_capture(len(pkts))
1922         for packet in capture:
1923             try:
1924                 self.assertEqual(packet[IP].src, self.nat_addr)
1925                 self.assertEqual(packet[IP].dst, server1.ip4)
1926                 if packet.haslayer(TCP):
1927                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1928                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1929                     self.tcp_port_out = packet[TCP].sport
1930                     self.assert_packet_checksums_valid(packet)
1931                 elif packet.haslayer(UDP):
1932                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1933                     self.assertEqual(packet[UDP].dport, server_udp_port)
1934                     self.udp_port_out = packet[UDP].sport
1935                 else:
1936                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1937                     self.icmp_id_out = packet[ICMP].id
1938             except:
1939                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1940                 raise
1941
1942         # server1 to host
1943         pkts = []
1944         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1945              IP(src=server1.ip4, dst=self.nat_addr) /
1946              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1947         pkts.append(p)
1948         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1949              IP(src=server1.ip4, dst=self.nat_addr) /
1950              UDP(sport=server_udp_port, dport=self.udp_port_out))
1951         pkts.append(p)
1952         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1953              IP(src=server1.ip4, dst=self.nat_addr) /
1954              ICMP(id=self.icmp_id_out, type='echo-reply'))
1955         pkts.append(p)
1956         self.pg0.add_stream(pkts)
1957         self.pg_enable_capture(self.pg_interfaces)
1958         self.pg_start()
1959         capture = self.pg0.get_capture(len(pkts))
1960         for packet in capture:
1961             try:
1962                 self.assertEqual(packet[IP].src, server1_nat_ip)
1963                 self.assertEqual(packet[IP].dst, host.ip4)
1964                 if packet.haslayer(TCP):
1965                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1966                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1967                     self.assert_packet_checksums_valid(packet)
1968                 elif packet.haslayer(UDP):
1969                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1970                     self.assertEqual(packet[UDP].sport, server_udp_port)
1971                 else:
1972                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1973             except:
1974                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1975                 raise
1976
1977         # server2 to server1
1978         pkts = []
1979         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1980              IP(src=server2.ip4, dst=server1_nat_ip) /
1981              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1982         pkts.append(p)
1983         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1984              IP(src=server2.ip4, dst=server1_nat_ip) /
1985              UDP(sport=self.udp_port_in, dport=server_udp_port))
1986         pkts.append(p)
1987         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1988              IP(src=server2.ip4, dst=server1_nat_ip) /
1989              ICMP(id=self.icmp_id_in, type='echo-request'))
1990         pkts.append(p)
1991         self.pg0.add_stream(pkts)
1992         self.pg_enable_capture(self.pg_interfaces)
1993         self.pg_start()
1994         capture = self.pg0.get_capture(len(pkts))
1995         for packet in capture:
1996             try:
1997                 self.assertEqual(packet[IP].src, server2_nat_ip)
1998                 self.assertEqual(packet[IP].dst, server1.ip4)
1999                 if packet.haslayer(TCP):
2000                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2001                     self.assertEqual(packet[TCP].dport, server_tcp_port)
2002                     self.tcp_port_out = packet[TCP].sport
2003                     self.assert_packet_checksums_valid(packet)
2004                 elif packet.haslayer(UDP):
2005                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
2006                     self.assertEqual(packet[UDP].dport, server_udp_port)
2007                     self.udp_port_out = packet[UDP].sport
2008                 else:
2009                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2010                     self.icmp_id_out = packet[ICMP].id
2011             except:
2012                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2013                 raise
2014
2015         # server1 to server2
2016         pkts = []
2017         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2018              IP(src=server1.ip4, dst=server2_nat_ip) /
2019              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2020         pkts.append(p)
2021         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2022              IP(src=server1.ip4, dst=server2_nat_ip) /
2023              UDP(sport=server_udp_port, dport=self.udp_port_out))
2024         pkts.append(p)
2025         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2026              IP(src=server1.ip4, dst=server2_nat_ip) /
2027              ICMP(id=self.icmp_id_out, type='echo-reply'))
2028         pkts.append(p)
2029         self.pg0.add_stream(pkts)
2030         self.pg_enable_capture(self.pg_interfaces)
2031         self.pg_start()
2032         capture = self.pg0.get_capture(len(pkts))
2033         for packet in capture:
2034             try:
2035                 self.assertEqual(packet[IP].src, server1_nat_ip)
2036                 self.assertEqual(packet[IP].dst, server2.ip4)
2037                 if packet.haslayer(TCP):
2038                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2039                     self.assertEqual(packet[TCP].sport, server_tcp_port)
2040                     self.assert_packet_checksums_valid(packet)
2041                 elif packet.haslayer(UDP):
2042                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
2043                     self.assertEqual(packet[UDP].sport, server_udp_port)
2044                 else:
2045                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2046             except:
2047                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2048                 raise
2049
2050     def test_hairpinning_avoid_inf_loop(self):
2051         """ NAT44 hairpinning - 1:1 NAPT avoid infinite loop """
2052
2053         host = self.pg0.remote_hosts[0]
2054         server = self.pg0.remote_hosts[1]
2055         host_in_port = 1234
2056         host_out_port = 0
2057         server_in_port = 5678
2058         server_out_port = 8765
2059
2060         self.nat44_add_address(self.nat_addr)
2061         flags = self.config_flags.NAT_IS_INSIDE
2062         self.vapi.nat44_interface_add_del_feature(
2063             sw_if_index=self.pg0.sw_if_index,
2064             flags=flags, is_add=1)
2065         self.vapi.nat44_interface_add_del_feature(
2066             sw_if_index=self.pg1.sw_if_index,
2067             is_add=1)
2068
2069         # add static mapping for server
2070         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2071                                       server_in_port, server_out_port,
2072                                       proto=IP_PROTOS.tcp)
2073
2074         # add another static mapping that maps pg0.local_ip4 address to itself
2075         self.nat44_add_static_mapping(self.pg0.local_ip4, self.pg0.local_ip4)
2076
2077         # send packet from host to VPP (the packet should get dropped)
2078         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2079              IP(src=host.ip4, dst=self.pg0.local_ip4) /
2080              TCP(sport=host_in_port, dport=server_out_port))
2081         self.pg0.add_stream(p)
2082         self.pg_enable_capture(self.pg_interfaces)
2083         self.pg_start()
2084         # Here VPP used to crash due to an infinite loop
2085
2086         cnt = self.statistics.get_counter('/nat44/hairpinning')[0]
2087         # send packet from host to server
2088         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2089              IP(src=host.ip4, dst=self.nat_addr) /
2090              TCP(sport=host_in_port, dport=server_out_port))
2091         self.pg0.add_stream(p)
2092         self.pg_enable_capture(self.pg_interfaces)
2093         self.pg_start()
2094         capture = self.pg0.get_capture(1)
2095         p = capture[0]
2096         try:
2097             ip = p[IP]
2098             tcp = p[TCP]
2099             self.assertEqual(ip.src, self.nat_addr)
2100             self.assertEqual(ip.dst, server.ip4)
2101             self.assertNotEqual(tcp.sport, host_in_port)
2102             self.assertEqual(tcp.dport, server_in_port)
2103             self.assert_packet_checksums_valid(p)
2104             host_out_port = tcp.sport
2105         except:
2106             self.logger.error(ppp("Unexpected or invalid packet:", p))
2107             raise
2108
2109         after = self.statistics.get_counter('/nat44/hairpinning')[0]
2110         if_idx = self.pg0.sw_if_index
2111         self.assertEqual(after[if_idx] - cnt[if_idx], 1)
2112
2113         # send reply from server to host
2114         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2115              IP(src=server.ip4, dst=self.nat_addr) /
2116              TCP(sport=server_in_port, dport=host_out_port))
2117         self.pg0.add_stream(p)
2118         self.pg_enable_capture(self.pg_interfaces)
2119         self.pg_start()
2120         capture = self.pg0.get_capture(1)
2121         p = capture[0]
2122         try:
2123             ip = p[IP]
2124             tcp = p[TCP]
2125             self.assertEqual(ip.src, self.nat_addr)
2126             self.assertEqual(ip.dst, host.ip4)
2127             self.assertEqual(tcp.sport, server_out_port)
2128             self.assertEqual(tcp.dport, host_in_port)
2129             self.assert_packet_checksums_valid(p)
2130         except:
2131             self.logger.error(ppp("Unexpected or invalid packet:", p))
2132             raise
2133
2134         after = self.statistics.get_counter('/nat44/hairpinning')[0]
2135         if_idx = self.pg0.sw_if_index
2136         self.assertEqual(after[if_idx] - cnt[if_idx], 2)
2137
2138     def test_interface_addr(self):
2139         """ NAT44EI acquire addresses from interface """
2140         self.vapi.nat44_add_del_interface_addr(
2141             is_add=1,
2142             sw_if_index=self.pg7.sw_if_index)
2143
2144         # no address in NAT pool
2145         addresses = self.vapi.nat44_address_dump()
2146         self.assertEqual(0, len(addresses))
2147
2148         # configure interface address and check NAT address pool
2149         self.pg7.config_ip4()
2150         addresses = self.vapi.nat44_address_dump()
2151         self.assertEqual(1, len(addresses))
2152         self.assertEqual(str(addresses[0].ip_address), self.pg7.local_ip4)
2153
2154         # remove interface address and check NAT address pool
2155         self.pg7.unconfig_ip4()
2156         addresses = self.vapi.nat44_address_dump()
2157         self.assertEqual(0, len(addresses))
2158
2159     def test_interface_addr_static_mapping(self):
2160         """ NAT44EI Static mapping with addresses from interface """
2161         tag = "testTAG"
2162
2163         self.vapi.nat44_add_del_interface_addr(
2164             is_add=1,
2165             sw_if_index=self.pg7.sw_if_index)
2166         self.nat44_add_static_mapping(
2167             '1.2.3.4',
2168             external_sw_if_index=self.pg7.sw_if_index,
2169             tag=tag)
2170
2171         # static mappings with external interface
2172         static_mappings = self.vapi.nat44_static_mapping_dump()
2173         self.assertEqual(1, len(static_mappings))
2174         self.assertEqual(self.pg7.sw_if_index,
2175                          static_mappings[0].external_sw_if_index)
2176         self.assertEqual(static_mappings[0].tag, tag)
2177
2178         # configure interface address and check static mappings
2179         self.pg7.config_ip4()
2180         static_mappings = self.vapi.nat44_static_mapping_dump()
2181         self.assertEqual(2, len(static_mappings))
2182         resolved = False
2183         for sm in static_mappings:
2184             if sm.external_sw_if_index == 0xFFFFFFFF:
2185                 self.assertEqual(str(sm.external_ip_address),
2186                                  self.pg7.local_ip4)
2187                 self.assertEqual(sm.tag, tag)
2188                 resolved = True
2189         self.assertTrue(resolved)
2190
2191         # remove interface address and check static mappings
2192         self.pg7.unconfig_ip4()
2193         static_mappings = self.vapi.nat44_static_mapping_dump()
2194         self.assertEqual(1, len(static_mappings))
2195         self.assertEqual(self.pg7.sw_if_index,
2196                          static_mappings[0].external_sw_if_index)
2197         self.assertEqual(static_mappings[0].tag, tag)
2198
2199         # configure interface address again and check static mappings
2200         self.pg7.config_ip4()
2201         static_mappings = self.vapi.nat44_static_mapping_dump()
2202         self.assertEqual(2, len(static_mappings))
2203         resolved = False
2204         for sm in static_mappings:
2205             if sm.external_sw_if_index == 0xFFFFFFFF:
2206                 self.assertEqual(str(sm.external_ip_address),
2207                                  self.pg7.local_ip4)
2208                 self.assertEqual(sm.tag, tag)
2209                 resolved = True
2210         self.assertTrue(resolved)
2211
2212         # remove static mapping
2213         self.nat44_add_static_mapping(
2214             '1.2.3.4',
2215             external_sw_if_index=self.pg7.sw_if_index,
2216             tag=tag,
2217             is_add=0)
2218         static_mappings = self.vapi.nat44_static_mapping_dump()
2219         self.assertEqual(0, len(static_mappings))
2220
2221     def test_interface_addr_identity_nat(self):
2222         """ NAT44EI Identity NAT with addresses from interface """
2223
2224         port = 53053
2225         self.vapi.nat44_add_del_interface_addr(
2226             is_add=1,
2227             sw_if_index=self.pg7.sw_if_index)
2228         self.vapi.nat44_add_del_identity_mapping(
2229             ip_address=b'0',
2230             sw_if_index=self.pg7.sw_if_index,
2231             port=port,
2232             protocol=IP_PROTOS.tcp,
2233             is_add=1)
2234
2235         # identity mappings with external interface
2236         identity_mappings = self.vapi.nat44_identity_mapping_dump()
2237         self.assertEqual(1, len(identity_mappings))
2238         self.assertEqual(self.pg7.sw_if_index,
2239                          identity_mappings[0].sw_if_index)
2240
2241         # configure interface address and check identity mappings
2242         self.pg7.config_ip4()
2243         identity_mappings = self.vapi.nat44_identity_mapping_dump()
2244         resolved = False
2245         self.assertEqual(2, len(identity_mappings))
2246         for sm in identity_mappings:
2247             if sm.sw_if_index == 0xFFFFFFFF:
2248                 self.assertEqual(str(identity_mappings[0].ip_address),
2249                                  self.pg7.local_ip4)
2250                 self.assertEqual(port, identity_mappings[0].port)
2251                 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2252                 resolved = True
2253         self.assertTrue(resolved)
2254
2255         # remove interface address and check identity mappings
2256         self.pg7.unconfig_ip4()
2257         identity_mappings = self.vapi.nat44_identity_mapping_dump()
2258         self.assertEqual(1, len(identity_mappings))
2259         self.assertEqual(self.pg7.sw_if_index,
2260                          identity_mappings[0].sw_if_index)
2261
2262     def test_ipfix_nat44_sess(self):
2263         """ NAT44EI IPFIX logging NAT44EI session created/deleted """
2264         self.ipfix_domain_id = 10
2265         self.ipfix_src_port = 20202
2266         collector_port = 30303
2267         bind_layers(UDP, IPFIX, dport=30303)
2268         self.nat44_add_address(self.nat_addr)
2269         flags = self.config_flags.NAT_IS_INSIDE
2270         self.vapi.nat44_interface_add_del_feature(
2271             sw_if_index=self.pg0.sw_if_index,
2272             flags=flags, is_add=1)
2273         self.vapi.nat44_interface_add_del_feature(
2274             sw_if_index=self.pg1.sw_if_index,
2275             is_add=1)
2276         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
2277                                      src_address=self.pg3.local_ip4,
2278                                      path_mtu=512,
2279                                      template_interval=10,
2280                                      collector_port=collector_port)
2281         self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
2282                                            src_port=self.ipfix_src_port,
2283                                            enable=1)
2284
2285         pkts = self.create_stream_in(self.pg0, self.pg1)
2286         self.pg0.add_stream(pkts)
2287         self.pg_enable_capture(self.pg_interfaces)
2288         self.pg_start()
2289         capture = self.pg1.get_capture(len(pkts))
2290         self.verify_capture_out(capture)
2291         self.nat44_add_address(self.nat_addr, is_add=0)
2292         self.vapi.ipfix_flush()
2293         capture = self.pg3.get_capture(7)
2294         ipfix = IPFIXDecoder()
2295         # first load template
2296         for p in capture:
2297             self.assertTrue(p.haslayer(IPFIX))
2298             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2299             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2300             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2301             self.assertEqual(p[UDP].dport, collector_port)
2302             self.assertEqual(p[IPFIX].observationDomainID,
2303                              self.ipfix_domain_id)
2304             if p.haslayer(Template):
2305                 ipfix.add_template(p.getlayer(Template))
2306         # verify events in data set
2307         for p in capture:
2308             if p.haslayer(Data):
2309                 data = ipfix.decode_data_set(p.getlayer(Set))
2310                 self.verify_ipfix_nat44_ses(data)
2311
2312     def test_ipfix_addr_exhausted(self):
2313         """ NAT44EI IPFIX logging NAT addresses exhausted """
2314         flags = self.config_flags.NAT_IS_INSIDE
2315         self.vapi.nat44_interface_add_del_feature(
2316             sw_if_index=self.pg0.sw_if_index,
2317             flags=flags, is_add=1)
2318         self.vapi.nat44_interface_add_del_feature(
2319             sw_if_index=self.pg1.sw_if_index,
2320             is_add=1)
2321         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
2322                                      src_address=self.pg3.local_ip4,
2323                                      path_mtu=512,
2324                                      template_interval=10)
2325         self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
2326                                            src_port=self.ipfix_src_port,
2327                                            enable=1)
2328
2329         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2330              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2331              TCP(sport=3025))
2332         self.pg0.add_stream(p)
2333         self.pg_enable_capture(self.pg_interfaces)
2334         self.pg_start()
2335         self.pg1.assert_nothing_captured()
2336         sleep(1)
2337         self.vapi.ipfix_flush()
2338         capture = self.pg3.get_capture(7)
2339         ipfix = IPFIXDecoder()
2340         # first load template
2341         for p in capture:
2342             self.assertTrue(p.haslayer(IPFIX))
2343             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2344             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2345             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2346             self.assertEqual(p[UDP].dport, 4739)
2347             self.assertEqual(p[IPFIX].observationDomainID,
2348                              self.ipfix_domain_id)
2349             if p.haslayer(Template):
2350                 ipfix.add_template(p.getlayer(Template))
2351         # verify events in data set
2352         for p in capture:
2353             if p.haslayer(Data):
2354                 data = ipfix.decode_data_set(p.getlayer(Set))
2355                 self.verify_ipfix_addr_exhausted(data)
2356
2357     def test_ipfix_max_sessions(self):
2358         """ NAT44EI IPFIX logging maximum session entries exceeded """
2359         self.nat44_add_address(self.nat_addr)
2360         flags = self.config_flags.NAT_IS_INSIDE
2361         self.vapi.nat44_interface_add_del_feature(
2362             sw_if_index=self.pg0.sw_if_index,
2363             flags=flags, is_add=1)
2364         self.vapi.nat44_interface_add_del_feature(
2365             sw_if_index=self.pg1.sw_if_index,
2366             is_add=1)
2367
2368         max_sessions = self.max_translations
2369
2370         pkts = []
2371         for i in range(0, max_sessions):
2372             src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2373             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2374                  IP(src=src, dst=self.pg1.remote_ip4) /
2375                  TCP(sport=1025))
2376             pkts.append(p)
2377         self.pg0.add_stream(pkts)
2378         self.pg_enable_capture(self.pg_interfaces)
2379         self.pg_start()
2380
2381         self.pg1.get_capture(max_sessions)
2382         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
2383                                      src_address=self.pg3.local_ip4,
2384                                      path_mtu=512,
2385                                      template_interval=10)
2386         self.vapi.nat_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
2387                                            src_port=self.ipfix_src_port,
2388                                            enable=1)
2389
2390         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2391              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2392              TCP(sport=1025))
2393         self.pg0.add_stream(p)
2394         self.pg_enable_capture(self.pg_interfaces)
2395         self.pg_start()
2396         self.pg1.assert_nothing_captured()
2397         sleep(1)
2398         self.vapi.ipfix_flush()
2399         capture = self.pg3.get_capture(7)
2400         ipfix = IPFIXDecoder()
2401         # first load template
2402         for p in capture:
2403             self.assertTrue(p.haslayer(IPFIX))
2404             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2405             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2406             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2407             self.assertEqual(p[UDP].dport, 4739)
2408             self.assertEqual(p[IPFIX].observationDomainID,
2409                              self.ipfix_domain_id)
2410             if p.haslayer(Template):
2411                 ipfix.add_template(p.getlayer(Template))
2412         # verify events in data set
2413         for p in capture:
2414             if p.haslayer(Data):
2415                 data = ipfix.decode_data_set(p.getlayer(Set))
2416                 self.verify_ipfix_max_sessions(data, max_sessions)
2417
2418     def test_syslog_apmap(self):
2419         """ NAT44EI syslog address and port mapping creation and deletion """
2420         self.vapi.syslog_set_filter(
2421             self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
2422         self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
2423         self.nat44_add_address(self.nat_addr)
2424         flags = self.config_flags.NAT_IS_INSIDE
2425         self.vapi.nat44_interface_add_del_feature(
2426             sw_if_index=self.pg0.sw_if_index,
2427             flags=flags, is_add=1)
2428         self.vapi.nat44_interface_add_del_feature(
2429             sw_if_index=self.pg1.sw_if_index,
2430             is_add=1)
2431
2432         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2433              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2434              TCP(sport=self.tcp_port_in, dport=20))
2435         self.pg0.add_stream(p)
2436         self.pg_enable_capture(self.pg_interfaces)
2437         self.pg_start()
2438         capture = self.pg1.get_capture(1)
2439         self.tcp_port_out = capture[0][TCP].sport
2440         capture = self.pg3.get_capture(1)
2441         self.verify_syslog_apmap(capture[0][Raw].load)
2442
2443         self.pg_enable_capture(self.pg_interfaces)
2444         self.pg_start()
2445         self.nat44_add_address(self.nat_addr, is_add=0)
2446         capture = self.pg3.get_capture(1)
2447         self.verify_syslog_apmap(capture[0][Raw].load, False)
2448
2449     def test_pool_addr_fib(self):
2450         """ NAT44EI add pool addresses to FIB """
2451         static_addr = '10.0.0.10'
2452         self.nat44_add_address(self.nat_addr)
2453         flags = self.config_flags.NAT_IS_INSIDE
2454         self.vapi.nat44_interface_add_del_feature(
2455             sw_if_index=self.pg0.sw_if_index,
2456             flags=flags, is_add=1)
2457         self.vapi.nat44_interface_add_del_feature(
2458             sw_if_index=self.pg1.sw_if_index,
2459             is_add=1)
2460         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2461
2462         # NAT44EI address
2463         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2464              ARP(op=ARP.who_has, pdst=self.nat_addr,
2465                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2466         self.pg1.add_stream(p)
2467         self.pg_enable_capture(self.pg_interfaces)
2468         self.pg_start()
2469         capture = self.pg1.get_capture(1)
2470         self.assertTrue(capture[0].haslayer(ARP))
2471         self.assertTrue(capture[0][ARP].op, ARP.is_at)
2472
2473         # 1:1 NAT address
2474         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2475              ARP(op=ARP.who_has, pdst=static_addr,
2476                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2477         self.pg1.add_stream(p)
2478         self.pg_enable_capture(self.pg_interfaces)
2479         self.pg_start()
2480         capture = self.pg1.get_capture(1)
2481         self.assertTrue(capture[0].haslayer(ARP))
2482         self.assertTrue(capture[0][ARP].op, ARP.is_at)
2483
2484         # send ARP to non-NAT44EI interface
2485         p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2486              ARP(op=ARP.who_has, pdst=self.nat_addr,
2487                  psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2488         self.pg2.add_stream(p)
2489         self.pg_enable_capture(self.pg_interfaces)
2490         self.pg_start()
2491         self.pg1.assert_nothing_captured()
2492
2493         # remove addresses and verify
2494         self.nat44_add_address(self.nat_addr, is_add=0)
2495         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2496                                       is_add=0)
2497
2498         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2499              ARP(op=ARP.who_has, pdst=self.nat_addr,
2500                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2501         self.pg1.add_stream(p)
2502         self.pg_enable_capture(self.pg_interfaces)
2503         self.pg_start()
2504         self.pg1.assert_nothing_captured()
2505
2506         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2507              ARP(op=ARP.who_has, pdst=static_addr,
2508                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2509         self.pg1.add_stream(p)
2510         self.pg_enable_capture(self.pg_interfaces)
2511         self.pg_start()
2512         self.pg1.assert_nothing_captured()
2513
2514     def test_vrf_mode(self):
2515         """ NAT44EI tenant VRF aware address pool mode """
2516
2517         vrf_id1 = 1
2518         vrf_id2 = 2
2519         nat_ip1 = "10.0.0.10"
2520         nat_ip2 = "10.0.0.11"
2521
2522         self.pg0.unconfig_ip4()
2523         self.pg1.unconfig_ip4()
2524         self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id1})
2525         self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id2})
2526         self.pg0.set_table_ip4(vrf_id1)
2527         self.pg1.set_table_ip4(vrf_id2)
2528         self.pg0.config_ip4()
2529         self.pg1.config_ip4()
2530         self.pg0.resolve_arp()
2531         self.pg1.resolve_arp()
2532
2533         self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2534         self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2535         flags = self.config_flags.NAT_IS_INSIDE
2536         self.vapi.nat44_interface_add_del_feature(
2537             sw_if_index=self.pg0.sw_if_index,
2538             flags=flags, is_add=1)
2539         self.vapi.nat44_interface_add_del_feature(
2540             sw_if_index=self.pg1.sw_if_index,
2541             flags=flags, is_add=1)
2542         self.vapi.nat44_interface_add_del_feature(
2543             sw_if_index=self.pg2.sw_if_index,
2544             is_add=1)
2545
2546         try:
2547             # first VRF
2548             pkts = self.create_stream_in(self.pg0, self.pg2)
2549             self.pg0.add_stream(pkts)
2550             self.pg_enable_capture(self.pg_interfaces)
2551             self.pg_start()
2552             capture = self.pg2.get_capture(len(pkts))
2553             self.verify_capture_out(capture, nat_ip1)
2554
2555             # second VRF
2556             pkts = self.create_stream_in(self.pg1, self.pg2)
2557             self.pg1.add_stream(pkts)
2558             self.pg_enable_capture(self.pg_interfaces)
2559             self.pg_start()
2560             capture = self.pg2.get_capture(len(pkts))
2561             self.verify_capture_out(capture, nat_ip2)
2562
2563         finally:
2564             self.pg0.unconfig_ip4()
2565             self.pg1.unconfig_ip4()
2566             self.pg0.set_table_ip4(0)
2567             self.pg1.set_table_ip4(0)
2568             self.pg0.config_ip4()
2569             self.pg1.config_ip4()
2570             self.pg0.resolve_arp()
2571             self.pg1.resolve_arp()
2572             self.vapi.ip_table_add_del(is_add=0, table={'table_id': vrf_id1})
2573             self.vapi.ip_table_add_del(is_add=0, table={'table_id': vrf_id2})
2574
2575     def test_vrf_feature_independent(self):
2576         """ NAT44EI tenant VRF independent address pool mode """
2577
2578         nat_ip1 = "10.0.0.10"
2579         nat_ip2 = "10.0.0.11"
2580
2581         self.nat44_add_address(nat_ip1)
2582         self.nat44_add_address(nat_ip2, vrf_id=99)
2583         flags = self.config_flags.NAT_IS_INSIDE
2584         self.vapi.nat44_interface_add_del_feature(
2585             sw_if_index=self.pg0.sw_if_index,
2586             flags=flags, is_add=1)
2587         self.vapi.nat44_interface_add_del_feature(
2588             sw_if_index=self.pg1.sw_if_index,
2589             flags=flags, is_add=1)
2590         self.vapi.nat44_interface_add_del_feature(
2591             sw_if_index=self.pg2.sw_if_index,
2592             is_add=1)
2593
2594         # first VRF
2595         pkts = self.create_stream_in(self.pg0, self.pg2)
2596         self.pg0.add_stream(pkts)
2597         self.pg_enable_capture(self.pg_interfaces)
2598         self.pg_start()
2599         capture = self.pg2.get_capture(len(pkts))
2600         self.verify_capture_out(capture, nat_ip1)
2601
2602         # second VRF
2603         pkts = self.create_stream_in(self.pg1, self.pg2)
2604         self.pg1.add_stream(pkts)
2605         self.pg_enable_capture(self.pg_interfaces)
2606         self.pg_start()
2607         capture = self.pg2.get_capture(len(pkts))
2608         self.verify_capture_out(capture, nat_ip1)
2609
2610     def test_dynamic_ipless_interfaces(self):
2611         """ NAT44EI interfaces without configured IP address """
2612         self.create_routes_and_neigbors()
2613         self.nat44_add_address(self.nat_addr)
2614         flags = self.config_flags.NAT_IS_INSIDE
2615         self.vapi.nat44_interface_add_del_feature(
2616             sw_if_index=self.pg7.sw_if_index,
2617             flags=flags, is_add=1)
2618         self.vapi.nat44_interface_add_del_feature(
2619             sw_if_index=self.pg8.sw_if_index,
2620             is_add=1)
2621
2622         # in2out
2623         pkts = self.create_stream_in(self.pg7, self.pg8)
2624         self.pg7.add_stream(pkts)
2625         self.pg_enable_capture(self.pg_interfaces)
2626         self.pg_start()
2627         capture = self.pg8.get_capture(len(pkts))
2628         self.verify_capture_out(capture)
2629
2630         # out2in
2631         pkts = self.create_stream_out(self.pg8, self.nat_addr)
2632         self.pg8.add_stream(pkts)
2633         self.pg_enable_capture(self.pg_interfaces)
2634         self.pg_start()
2635         capture = self.pg7.get_capture(len(pkts))
2636         self.verify_capture_in(capture, self.pg7)
2637
2638     def test_static_ipless_interfaces(self):
2639         """ NAT44EI interfaces without configured IP address - 1:1 NAT """
2640
2641         self.create_routes_and_neigbors()
2642         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2643         flags = self.config_flags.NAT_IS_INSIDE
2644         self.vapi.nat44_interface_add_del_feature(
2645             sw_if_index=self.pg7.sw_if_index,
2646             flags=flags, is_add=1)
2647         self.vapi.nat44_interface_add_del_feature(
2648             sw_if_index=self.pg8.sw_if_index,
2649             is_add=1)
2650
2651         # out2in
2652         pkts = self.create_stream_out(self.pg8)
2653         self.pg8.add_stream(pkts)
2654         self.pg_enable_capture(self.pg_interfaces)
2655         self.pg_start()
2656         capture = self.pg7.get_capture(len(pkts))
2657         self.verify_capture_in(capture, self.pg7)
2658
2659         # in2out
2660         pkts = self.create_stream_in(self.pg7, self.pg8)
2661         self.pg7.add_stream(pkts)
2662         self.pg_enable_capture(self.pg_interfaces)
2663         self.pg_start()
2664         capture = self.pg8.get_capture(len(pkts))
2665         self.verify_capture_out(capture, self.nat_addr, True)
2666
2667     def test_static_with_port_ipless_interfaces(self):
2668         """ NAT44EI interfaces without configured IP address - 1:1 NAPT """
2669
2670         self.tcp_port_out = 30606
2671         self.udp_port_out = 30607
2672         self.icmp_id_out = 30608
2673
2674         self.create_routes_and_neigbors()
2675         self.nat44_add_address(self.nat_addr)
2676         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2677                                       self.tcp_port_in, self.tcp_port_out,
2678                                       proto=IP_PROTOS.tcp)
2679         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2680                                       self.udp_port_in, self.udp_port_out,
2681                                       proto=IP_PROTOS.udp)
2682         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2683                                       self.icmp_id_in, self.icmp_id_out,
2684                                       proto=IP_PROTOS.icmp)
2685         flags = self.config_flags.NAT_IS_INSIDE
2686         self.vapi.nat44_interface_add_del_feature(
2687             sw_if_index=self.pg7.sw_if_index,
2688             flags=flags, is_add=1)
2689         self.vapi.nat44_interface_add_del_feature(
2690             sw_if_index=self.pg8.sw_if_index,
2691             is_add=1)
2692
2693         # out2in
2694         pkts = self.create_stream_out(self.pg8)
2695         self.pg8.add_stream(pkts)
2696         self.pg_enable_capture(self.pg_interfaces)
2697         self.pg_start()
2698         capture = self.pg7.get_capture(len(pkts))
2699         self.verify_capture_in(capture, self.pg7)
2700
2701         # in2out
2702         pkts = self.create_stream_in(self.pg7, self.pg8)
2703         self.pg7.add_stream(pkts)
2704         self.pg_enable_capture(self.pg_interfaces)
2705         self.pg_start()
2706         capture = self.pg8.get_capture(len(pkts))
2707         self.verify_capture_out(capture)
2708
2709     def test_static_unknown_proto(self):
2710         """ NAT44EI 1:1 translate packet with unknown protocol """
2711         nat_ip = "10.0.0.10"
2712         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2713         flags = self.config_flags.NAT_IS_INSIDE
2714         self.vapi.nat44_interface_add_del_feature(
2715             sw_if_index=self.pg0.sw_if_index,
2716             flags=flags, is_add=1)
2717         self.vapi.nat44_interface_add_del_feature(
2718             sw_if_index=self.pg1.sw_if_index,
2719             is_add=1)
2720
2721         # in2out
2722         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2723              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2724              GRE() /
2725              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2726              TCP(sport=1234, dport=1234))
2727         self.pg0.add_stream(p)
2728         self.pg_enable_capture(self.pg_interfaces)
2729         self.pg_start()
2730         p = self.pg1.get_capture(1)
2731         packet = p[0]
2732         try:
2733             self.assertEqual(packet[IP].src, nat_ip)
2734             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2735             self.assertEqual(packet.haslayer(GRE), 1)
2736             self.assert_packet_checksums_valid(packet)
2737         except:
2738             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2739             raise
2740
2741         # out2in
2742         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2743              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2744              GRE() /
2745              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2746              TCP(sport=1234, dport=1234))
2747         self.pg1.add_stream(p)
2748         self.pg_enable_capture(self.pg_interfaces)
2749         self.pg_start()
2750         p = self.pg0.get_capture(1)
2751         packet = p[0]
2752         try:
2753             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2754             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2755             self.assertEqual(packet.haslayer(GRE), 1)
2756             self.assert_packet_checksums_valid(packet)
2757         except:
2758             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2759             raise
2760
2761     def test_hairpinning_static_unknown_proto(self):
2762         """ NAT44EI 1:1 translate packet with unknown protocol - hairpinning
2763         """
2764
2765         host = self.pg0.remote_hosts[0]
2766         server = self.pg0.remote_hosts[1]
2767
2768         host_nat_ip = "10.0.0.10"
2769         server_nat_ip = "10.0.0.11"
2770
2771         self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2772         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2773         flags = self.config_flags.NAT_IS_INSIDE
2774         self.vapi.nat44_interface_add_del_feature(
2775             sw_if_index=self.pg0.sw_if_index,
2776             flags=flags, is_add=1)
2777         self.vapi.nat44_interface_add_del_feature(
2778             sw_if_index=self.pg1.sw_if_index,
2779             is_add=1)
2780
2781         # host to server
2782         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2783              IP(src=host.ip4, dst=server_nat_ip) /
2784              GRE() /
2785              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2786              TCP(sport=1234, dport=1234))
2787         self.pg0.add_stream(p)
2788         self.pg_enable_capture(self.pg_interfaces)
2789         self.pg_start()
2790         p = self.pg0.get_capture(1)
2791         packet = p[0]
2792         try:
2793             self.assertEqual(packet[IP].src, host_nat_ip)
2794             self.assertEqual(packet[IP].dst, server.ip4)
2795             self.assertEqual(packet.haslayer(GRE), 1)
2796             self.assert_packet_checksums_valid(packet)
2797         except:
2798             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2799             raise
2800
2801         # server to host
2802         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2803              IP(src=server.ip4, dst=host_nat_ip) /
2804              GRE() /
2805              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2806              TCP(sport=1234, dport=1234))
2807         self.pg0.add_stream(p)
2808         self.pg_enable_capture(self.pg_interfaces)
2809         self.pg_start()
2810         p = self.pg0.get_capture(1)
2811         packet = p[0]
2812         try:
2813             self.assertEqual(packet[IP].src, server_nat_ip)
2814             self.assertEqual(packet[IP].dst, host.ip4)
2815             self.assertEqual(packet.haslayer(GRE), 1)
2816             self.assert_packet_checksums_valid(packet)
2817         except:
2818             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2819             raise
2820
2821     def test_output_feature(self):
2822         """ NAT44EI output feature (in2out postrouting) """
2823         self.nat44_add_address(self.nat_addr)
2824         flags = self.config_flags.NAT_IS_INSIDE
2825         self.vapi.nat44_interface_add_del_output_feature(
2826             is_add=1, flags=flags,
2827             sw_if_index=self.pg0.sw_if_index)
2828         self.vapi.nat44_interface_add_del_output_feature(
2829             is_add=1, flags=flags,
2830             sw_if_index=self.pg1.sw_if_index)
2831         self.vapi.nat44_interface_add_del_output_feature(
2832             is_add=1,
2833             sw_if_index=self.pg3.sw_if_index)
2834
2835         # in2out
2836         pkts = self.create_stream_in(self.pg0, self.pg3)
2837         self.pg0.add_stream(pkts)
2838         self.pg_enable_capture(self.pg_interfaces)
2839         self.pg_start()
2840         capture = self.pg3.get_capture(len(pkts))
2841         self.verify_capture_out(capture)
2842
2843         # out2in
2844         pkts = self.create_stream_out(self.pg3)
2845         self.pg3.add_stream(pkts)
2846         self.pg_enable_capture(self.pg_interfaces)
2847         self.pg_start()
2848         capture = self.pg0.get_capture(len(pkts))
2849         self.verify_capture_in(capture, self.pg0)
2850
2851         # from non-NAT interface to NAT inside interface
2852         pkts = self.create_stream_in(self.pg2, self.pg0)
2853         self.pg2.add_stream(pkts)
2854         self.pg_enable_capture(self.pg_interfaces)
2855         self.pg_start()
2856         capture = self.pg0.get_capture(len(pkts))
2857         self.verify_capture_no_translation(capture, self.pg2, self.pg0)
2858
2859     def test_output_feature_vrf_aware(self):
2860         """ NAT44EI output feature VRF aware (in2out postrouting) """
2861         nat_ip_vrf10 = "10.0.0.10"
2862         nat_ip_vrf20 = "10.0.0.20"
2863
2864         r1 = VppIpRoute(self, self.pg3.remote_ip4, 32,
2865                         [VppRoutePath(self.pg3.remote_ip4,
2866                                       self.pg3.sw_if_index)],
2867                         table_id=10)
2868         r2 = VppIpRoute(self, self.pg3.remote_ip4, 32,
2869                         [VppRoutePath(self.pg3.remote_ip4,
2870                                       self.pg3.sw_if_index)],
2871                         table_id=20)
2872         r1.add_vpp_config()
2873         r2.add_vpp_config()
2874
2875         self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2876         self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2877         flags = self.config_flags.NAT_IS_INSIDE
2878         self.vapi.nat44_interface_add_del_output_feature(
2879             is_add=1, flags=flags,
2880             sw_if_index=self.pg4.sw_if_index)
2881         self.vapi.nat44_interface_add_del_output_feature(
2882             is_add=1, flags=flags,
2883             sw_if_index=self.pg6.sw_if_index)
2884         self.vapi.nat44_interface_add_del_output_feature(
2885             is_add=1,
2886             sw_if_index=self.pg3.sw_if_index)
2887
2888         # in2out VRF 10
2889         pkts = self.create_stream_in(self.pg4, self.pg3)
2890         self.pg4.add_stream(pkts)
2891         self.pg_enable_capture(self.pg_interfaces)
2892         self.pg_start()
2893         capture = self.pg3.get_capture(len(pkts))
2894         self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
2895
2896         # out2in VRF 10
2897         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2898         self.pg3.add_stream(pkts)
2899         self.pg_enable_capture(self.pg_interfaces)
2900         self.pg_start()
2901         capture = self.pg4.get_capture(len(pkts))
2902         self.verify_capture_in(capture, self.pg4)
2903
2904         # in2out VRF 20
2905         pkts = self.create_stream_in(self.pg6, self.pg3)
2906         self.pg6.add_stream(pkts)
2907         self.pg_enable_capture(self.pg_interfaces)
2908         self.pg_start()
2909         capture = self.pg3.get_capture(len(pkts))
2910         self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
2911
2912         # out2in VRF 20
2913         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2914         self.pg3.add_stream(pkts)
2915         self.pg_enable_capture(self.pg_interfaces)
2916         self.pg_start()
2917         capture = self.pg6.get_capture(len(pkts))
2918         self.verify_capture_in(capture, self.pg6)
2919
2920     def test_output_feature_hairpinning(self):
2921         """ NAT44EI output feature hairpinning (in2out postrouting) """
2922         host = self.pg0.remote_hosts[0]
2923         server = self.pg0.remote_hosts[1]
2924         host_in_port = 1234
2925         host_out_port = 0
2926         server_in_port = 5678
2927         server_out_port = 8765
2928
2929         self.nat44_add_address(self.nat_addr)
2930         flags = self.config_flags.NAT_IS_INSIDE
2931         self.vapi.nat44_interface_add_del_output_feature(
2932             is_add=1, flags=flags,
2933             sw_if_index=self.pg0.sw_if_index)
2934         self.vapi.nat44_interface_add_del_output_feature(
2935             is_add=1,
2936             sw_if_index=self.pg1.sw_if_index)
2937
2938         # add static mapping for server
2939         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2940                                       server_in_port, server_out_port,
2941                                       proto=IP_PROTOS.tcp)
2942
2943         # send packet from host to server
2944         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2945              IP(src=host.ip4, dst=self.nat_addr) /
2946              TCP(sport=host_in_port, dport=server_out_port))
2947         self.pg0.add_stream(p)
2948         self.pg_enable_capture(self.pg_interfaces)
2949         self.pg_start()
2950         capture = self.pg0.get_capture(1)
2951         p = capture[0]
2952         try:
2953             ip = p[IP]
2954             tcp = p[TCP]
2955             self.assertEqual(ip.src, self.nat_addr)
2956             self.assertEqual(ip.dst, server.ip4)
2957             self.assertNotEqual(tcp.sport, host_in_port)
2958             self.assertEqual(tcp.dport, server_in_port)
2959             self.assert_packet_checksums_valid(p)
2960             host_out_port = tcp.sport
2961         except:
2962             self.logger.error(ppp("Unexpected or invalid packet:", p))
2963             raise
2964
2965         # send reply from server to host
2966         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2967              IP(src=server.ip4, dst=self.nat_addr) /
2968              TCP(sport=server_in_port, dport=host_out_port))
2969         self.pg0.add_stream(p)
2970         self.pg_enable_capture(self.pg_interfaces)
2971         self.pg_start()
2972         capture = self.pg0.get_capture(1)
2973         p = capture[0]
2974         try:
2975             ip = p[IP]
2976             tcp = p[TCP]
2977             self.assertEqual(ip.src, self.nat_addr)
2978             self.assertEqual(ip.dst, host.ip4)
2979             self.assertEqual(tcp.sport, server_out_port)
2980             self.assertEqual(tcp.dport, host_in_port)
2981             self.assert_packet_checksums_valid(p)
2982         except:
2983             self.logger.error(ppp("Unexpected or invalid packet:", p))
2984             raise
2985
2986     def test_one_armed_nat44(self):
2987         """ NAT44EI One armed NAT """
2988         remote_host = self.pg9.remote_hosts[0]
2989         local_host = self.pg9.remote_hosts[1]
2990         external_port = 0
2991
2992         self.nat44_add_address(self.nat_addr)
2993         flags = self.config_flags.NAT_IS_INSIDE
2994         self.vapi.nat44_interface_add_del_feature(
2995             sw_if_index=self.pg9.sw_if_index,
2996             is_add=1)
2997         self.vapi.nat44_interface_add_del_feature(
2998             sw_if_index=self.pg9.sw_if_index,
2999             flags=flags, is_add=1)
3000
3001         # in2out
3002         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3003              IP(src=local_host.ip4, dst=remote_host.ip4) /
3004              TCP(sport=12345, dport=80))
3005         self.pg9.add_stream(p)
3006         self.pg_enable_capture(self.pg_interfaces)
3007         self.pg_start()
3008         capture = self.pg9.get_capture(1)
3009         p = capture[0]
3010         try:
3011             ip = p[IP]
3012             tcp = p[TCP]
3013             self.assertEqual(ip.src, self.nat_addr)
3014             self.assertEqual(ip.dst, remote_host.ip4)
3015             self.assertNotEqual(tcp.sport, 12345)
3016             external_port = tcp.sport
3017             self.assertEqual(tcp.dport, 80)
3018             self.assert_packet_checksums_valid(p)
3019         except:
3020             self.logger.error(ppp("Unexpected or invalid packet:", p))
3021             raise
3022
3023         # out2in
3024         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3025              IP(src=remote_host.ip4, dst=self.nat_addr) /
3026              TCP(sport=80, dport=external_port))
3027         self.pg9.add_stream(p)
3028         self.pg_enable_capture(self.pg_interfaces)
3029         self.pg_start()
3030         capture = self.pg9.get_capture(1)
3031         p = capture[0]
3032         try:
3033             ip = p[IP]
3034             tcp = p[TCP]
3035             self.assertEqual(ip.src, remote_host.ip4)
3036             self.assertEqual(ip.dst, local_host.ip4)
3037             self.assertEqual(tcp.sport, 80)
3038             self.assertEqual(tcp.dport, 12345)
3039             self.assert_packet_checksums_valid(p)
3040         except:
3041             self.logger.error(ppp("Unexpected or invalid packet:", p))
3042             raise
3043
3044         err = self.statistics.get_err_counter(
3045             '/err/nat44-classify/next in2out')
3046         self.assertEqual(err, 1)
3047         err = self.statistics.get_err_counter(
3048             '/err/nat44-classify/next out2in')
3049         self.assertEqual(err, 1)
3050
3051     def test_del_session(self):
3052         """ NAT44EI delete session """
3053         self.nat44_add_address(self.nat_addr)
3054         flags = self.config_flags.NAT_IS_INSIDE
3055         self.vapi.nat44_interface_add_del_feature(
3056             sw_if_index=self.pg0.sw_if_index,
3057             flags=flags, is_add=1)
3058         self.vapi.nat44_interface_add_del_feature(
3059             sw_if_index=self.pg1.sw_if_index,
3060             is_add=1)
3061
3062         pkts = self.create_stream_in(self.pg0, self.pg1)
3063         self.pg0.add_stream(pkts)
3064         self.pg_enable_capture(self.pg_interfaces)
3065         self.pg_start()
3066         self.pg1.get_capture(len(pkts))
3067
3068         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3069         nsessions = len(sessions)
3070
3071         self.vapi.nat44_del_session(address=sessions[0].inside_ip_address,
3072                                     port=sessions[0].inside_port,
3073                                     protocol=sessions[0].protocol,
3074                                     flags=self.config_flags.NAT_IS_INSIDE)
3075         self.vapi.nat44_del_session(address=sessions[1].outside_ip_address,
3076                                     port=sessions[1].outside_port,
3077                                     protocol=sessions[1].protocol)
3078
3079         sessions = self.vapi.nat44_user_session_dump(self.pg0.remote_ip4, 0)
3080         self.assertEqual(nsessions - len(sessions), 2)
3081
3082         self.vapi.nat44_del_session(address=sessions[0].inside_ip_address,
3083                                     port=sessions[0].inside_port,
3084                                     protocol=sessions[0].protocol,
3085                                     flags=self.config_flags.NAT_IS_INSIDE)
3086
3087         self.verify_no_nat44_user()
3088
3089     def test_frag_in_order(self):
3090         """ NAT44EI translate fragments arriving in order """
3091
3092         self.nat44_add_address(self.nat_addr)
3093         flags = self.config_flags.NAT_IS_INSIDE
3094         self.vapi.nat44_interface_add_del_feature(
3095             sw_if_index=self.pg0.sw_if_index,
3096             flags=flags, is_add=1)
3097         self.vapi.nat44_interface_add_del_feature(
3098             sw_if_index=self.pg1.sw_if_index,
3099             is_add=1)
3100
3101         self.frag_in_order(proto=IP_PROTOS.tcp)
3102         self.frag_in_order(proto=IP_PROTOS.udp)
3103         self.frag_in_order(proto=IP_PROTOS.icmp)
3104
3105     def test_frag_forwarding(self):
3106         """ NAT44EI forwarding fragment test """
3107         self.vapi.nat44_add_del_interface_addr(
3108             is_add=1,
3109             sw_if_index=self.pg1.sw_if_index)
3110         flags = self.config_flags.NAT_IS_INSIDE
3111         self.vapi.nat44_interface_add_del_feature(
3112             sw_if_index=self.pg0.sw_if_index,
3113             flags=flags, is_add=1)
3114         self.vapi.nat44_interface_add_del_feature(
3115             sw_if_index=self.pg1.sw_if_index,
3116             is_add=1)
3117         self.vapi.nat44_forwarding_enable_disable(enable=1)
3118
3119         data = b"A" * 16 + b"B" * 16 + b"C" * 3
3120         pkts = self.create_stream_frag(self.pg1,
3121                                        self.pg0.remote_ip4,
3122                                        4789,
3123                                        4789,
3124                                        data,
3125                                        proto=IP_PROTOS.udp)
3126         self.pg1.add_stream(pkts)
3127         self.pg_enable_capture(self.pg_interfaces)
3128         self.pg_start()
3129         frags = self.pg0.get_capture(len(pkts))
3130         p = self.reass_frags_and_verify(frags,
3131                                         self.pg1.remote_ip4,
3132                                         self.pg0.remote_ip4)
3133         self.assertEqual(p[UDP].sport, 4789)
3134         self.assertEqual(p[UDP].dport, 4789)
3135         self.assertEqual(data, p[Raw].load)
3136
3137     def test_reass_hairpinning(self):
3138         """ NAT44EI fragments hairpinning """
3139
3140         server_addr = self.pg0.remote_hosts[1].ip4
3141         host_in_port = random.randint(1025, 65535)
3142         server_in_port = random.randint(1025, 65535)
3143         server_out_port = random.randint(1025, 65535)
3144
3145         self.nat44_add_address(self.nat_addr)
3146         flags = self.config_flags.NAT_IS_INSIDE
3147         self.vapi.nat44_interface_add_del_feature(
3148             sw_if_index=self.pg0.sw_if_index,
3149             flags=flags, is_add=1)
3150         self.vapi.nat44_interface_add_del_feature(
3151             sw_if_index=self.pg1.sw_if_index,
3152             is_add=1)
3153         # add static mapping for server
3154         self.nat44_add_static_mapping(server_addr, self.nat_addr,
3155                                       server_in_port,
3156                                       server_out_port,
3157                                       proto=IP_PROTOS.tcp)
3158         self.nat44_add_static_mapping(server_addr, self.nat_addr,
3159                                       server_in_port,
3160                                       server_out_port,
3161                                       proto=IP_PROTOS.udp)
3162         self.nat44_add_static_mapping(server_addr, self.nat_addr)
3163
3164         self.reass_hairpinning(server_addr, server_in_port, server_out_port,
3165                                host_in_port, proto=IP_PROTOS.tcp)
3166         self.reass_hairpinning(server_addr, server_in_port, server_out_port,
3167                                host_in_port, proto=IP_PROTOS.udp)
3168         self.reass_hairpinning(server_addr, server_in_port, server_out_port,
3169                                host_in_port, proto=IP_PROTOS.icmp)
3170
3171     def test_frag_out_of_order(self):
3172         """ NAT44EI translate fragments arriving out of order """
3173
3174         self.nat44_add_address(self.nat_addr)
3175         flags = self.config_flags.NAT_IS_INSIDE
3176         self.vapi.nat44_interface_add_del_feature(
3177             sw_if_index=self.pg0.sw_if_index,
3178             flags=flags, is_add=1)
3179         self.vapi.nat44_interface_add_del_feature(
3180             sw_if_index=self.pg1.sw_if_index,
3181             is_add=1)
3182
3183         self.frag_out_of_order(proto=IP_PROTOS.tcp)
3184         self.frag_out_of_order(proto=IP_PROTOS.udp)
3185         self.frag_out_of_order(proto=IP_PROTOS.icmp)
3186
3187     def test_port_restricted(self):
3188         """ NAT44EI Port restricted NAT44EI (MAP-E CE) """
3189         self.nat44_add_address(self.nat_addr)
3190         flags = self.config_flags.NAT_IS_INSIDE
3191         self.vapi.nat44_interface_add_del_feature(
3192             sw_if_index=self.pg0.sw_if_index,
3193             flags=flags, is_add=1)
3194         self.vapi.nat44_interface_add_del_feature(
3195             sw_if_index=self.pg1.sw_if_index,
3196             is_add=1)
3197         self.vapi.nat_set_addr_and_port_alloc_alg(alg=1,
3198                                                   psid_offset=6,
3199                                                   psid_length=6,
3200                                                   psid=10)
3201
3202         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3203              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3204              TCP(sport=4567, dport=22))
3205         self.pg0.add_stream(p)
3206         self.pg_enable_capture(self.pg_interfaces)
3207         self.pg_start()
3208         capture = self.pg1.get_capture(1)
3209         p = capture[0]
3210         try:
3211             ip = p[IP]
3212             tcp = p[TCP]
3213             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3214             self.assertEqual(ip.src, self.nat_addr)
3215             self.assertEqual(tcp.dport, 22)
3216             self.assertNotEqual(tcp.sport, 4567)
3217             self.assertEqual((tcp.sport >> 6) & 63, 10)
3218             self.assert_packet_checksums_valid(p)
3219         except:
3220             self.logger.error(ppp("Unexpected or invalid packet:", p))
3221             raise
3222
3223     def test_port_range(self):
3224         """ NAT44EI External address port range """
3225         self.nat44_add_address(self.nat_addr)
3226         flags = self.config_flags.NAT_IS_INSIDE
3227         self.vapi.nat44_interface_add_del_feature(
3228             sw_if_index=self.pg0.sw_if_index,
3229             flags=flags, is_add=1)
3230         self.vapi.nat44_interface_add_del_feature(
3231             sw_if_index=self.pg1.sw_if_index,
3232             is_add=1)
3233         self.vapi.nat_set_addr_and_port_alloc_alg(alg=2,
3234                                                   start_port=1025,
3235                                                   end_port=1027)
3236
3237         pkts = []
3238         for port in range(0, 5):
3239             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3240                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3241                  TCP(sport=1125 + port))
3242             pkts.append(p)
3243         self.pg0.add_stream(pkts)
3244         self.pg_enable_capture(self.pg_interfaces)
3245         self.pg_start()
3246         capture = self.pg1.get_capture(3)
3247         for p in capture:
3248             tcp = p[TCP]
3249             self.assertGreaterEqual(tcp.sport, 1025)
3250             self.assertLessEqual(tcp.sport, 1027)
3251
3252     def test_multiple_outside_vrf(self):
3253         """ NAT44EI Multiple outside VRF """
3254         vrf_id1 = 1
3255         vrf_id2 = 2
3256
3257         self.pg1.unconfig_ip4()
3258         self.pg2.unconfig_ip4()
3259         self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id1})
3260         self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id2})
3261         self.pg1.set_table_ip4(vrf_id1)
3262         self.pg2.set_table_ip4(vrf_id2)
3263         self.pg1.config_ip4()
3264         self.pg2.config_ip4()
3265         self.pg1.resolve_arp()
3266         self.pg2.resolve_arp()
3267
3268         self.nat44_add_address(self.nat_addr)
3269         flags = self.config_flags.NAT_IS_INSIDE
3270         self.vapi.nat44_interface_add_del_feature(
3271             sw_if_index=self.pg0.sw_if_index,
3272             flags=flags, is_add=1)
3273         self.vapi.nat44_interface_add_del_feature(
3274             sw_if_index=self.pg1.sw_if_index,
3275             is_add=1)
3276         self.vapi.nat44_interface_add_del_feature(
3277             sw_if_index=self.pg2.sw_if_index,
3278             is_add=1)
3279
3280         try:
3281             # first VRF
3282             pkts = self.create_stream_in(self.pg0, self.pg1)
3283             self.pg0.add_stream(pkts)
3284             self.pg_enable_capture(self.pg_interfaces)
3285             self.pg_start()
3286             capture = self.pg1.get_capture(len(pkts))
3287             self.verify_capture_out(capture, self.nat_addr)
3288
3289             pkts = self.create_stream_out(self.pg1, self.nat_addr)
3290             self.pg1.add_stream(pkts)
3291             self.pg_enable_capture(self.pg_interfaces)
3292             self.pg_start()
3293             capture = self.pg0.get_capture(len(pkts))
3294             self.verify_capture_in(capture, self.pg0)
3295
3296             self.tcp_port_in = 60303
3297             self.udp_port_in = 60304
3298             self.icmp_id_in = 60305
3299
3300             # second VRF
3301             pkts = self.create_stream_in(self.pg0, self.pg2)
3302             self.pg0.add_stream(pkts)
3303             self.pg_enable_capture(self.pg_interfaces)
3304             self.pg_start()
3305             capture = self.pg2.get_capture(len(pkts))
3306             self.verify_capture_out(capture, self.nat_addr)
3307
3308             pkts = self.create_stream_out(self.pg2, self.nat_addr)
3309             self.pg2.add_stream(pkts)
3310             self.pg_enable_capture(self.pg_interfaces)
3311             self.pg_start()
3312             capture = self.pg0.get_capture(len(pkts))
3313             self.verify_capture_in(capture, self.pg0)
3314
3315         finally:
3316             self.nat44_add_address(self.nat_addr, is_add=0)
3317             self.pg1.unconfig_ip4()
3318             self.pg2.unconfig_ip4()
3319             self.pg1.set_table_ip4(0)
3320             self.pg2.set_table_ip4(0)
3321             self.pg1.config_ip4()
3322             self.pg2.config_ip4()
3323             self.pg1.resolve_arp()
3324             self.pg2.resolve_arp()
3325
3326     def test_mss_clamping(self):
3327         """ NAT44EI TCP MSS clamping """
3328         self.nat44_add_address(self.nat_addr)
3329         flags = self.config_flags.NAT_IS_INSIDE
3330         self.vapi.nat44_interface_add_del_feature(
3331             sw_if_index=self.pg0.sw_if_index,
3332             flags=flags, is_add=1)
3333         self.vapi.nat44_interface_add_del_feature(
3334             sw_if_index=self.pg1.sw_if_index,
3335             is_add=1)
3336
3337         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3338              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3339              TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3340                  flags="S", options=[('MSS', 1400)]))
3341
3342         self.vapi.nat_set_mss_clamping(enable=1, mss_value=1000)
3343         self.pg0.add_stream(p)
3344         self.pg_enable_capture(self.pg_interfaces)
3345         self.pg_start()
3346         capture = self.pg1.get_capture(1)
3347         # Negotiated MSS value greater than configured - changed
3348         self.verify_mss_value(capture[0], 1000)
3349
3350         self.vapi.nat_set_mss_clamping(enable=0, mss_value=1500)
3351         self.pg0.add_stream(p)
3352         self.pg_enable_capture(self.pg_interfaces)
3353         self.pg_start()
3354         capture = self.pg1.get_capture(1)
3355         # MSS clamping disabled - negotiated MSS unchanged
3356         self.verify_mss_value(capture[0], 1400)
3357
3358         self.vapi.nat_set_mss_clamping(enable=1, mss_value=1500)
3359         self.pg0.add_stream(p)
3360         self.pg_enable_capture(self.pg_interfaces)
3361         self.pg_start()
3362         capture = self.pg1.get_capture(1)
3363         # Negotiated MSS value smaller than configured - unchanged
3364         self.verify_mss_value(capture[0], 1400)
3365
3366     def test_ha_send(self):
3367         """ NAT44EI Send HA session synchronization events (active) """
3368         flags = self.config_flags.NAT_IS_INSIDE
3369         self.vapi.nat44_interface_add_del_feature(
3370             sw_if_index=self.pg0.sw_if_index,
3371             flags=flags, is_add=1)
3372         self.vapi.nat44_interface_add_del_feature(
3373             sw_if_index=self.pg1.sw_if_index,
3374             is_add=1)
3375         self.nat44_add_address(self.nat_addr)
3376
3377         self.vapi.nat_ha_set_listener(ip_address=self.pg3.local_ip4,
3378                                       port=12345,
3379                                       path_mtu=512)
3380         self.vapi.nat_ha_set_failover(ip_address=self.pg3.remote_ip4,
3381                                       port=12346, session_refresh_interval=10)
3382         bind_layers(UDP, HANATStateSync, sport=12345)
3383
3384         # create sessions
3385         pkts = self.create_stream_in(self.pg0, self.pg1)
3386         self.pg0.add_stream(pkts)
3387         self.pg_enable_capture(self.pg_interfaces)
3388         self.pg_start()
3389         capture = self.pg1.get_capture(len(pkts))
3390         self.verify_capture_out(capture)
3391         # active send HA events
3392         self.vapi.nat_ha_flush()
3393         stats = self.statistics.get_counter('/nat44/ha/add-event-send')
3394         self.assertEqual(stats[0][0], 3)
3395         capture = self.pg3.get_capture(1)
3396         p = capture[0]
3397         self.assert_packet_checksums_valid(p)
3398         try:
3399             ip = p[IP]
3400             udp = p[UDP]
3401             hanat = p[HANATStateSync]
3402         except IndexError:
3403             self.logger.error(ppp("Invalid packet:", p))
3404             raise
3405         else:
3406             self.assertEqual(ip.src, self.pg3.local_ip4)
3407             self.assertEqual(ip.dst, self.pg3.remote_ip4)
3408             self.assertEqual(udp.sport, 12345)
3409             self.assertEqual(udp.dport, 12346)
3410             self.assertEqual(hanat.version, 1)
3411             self.assertEqual(hanat.thread_index, 0)
3412             self.assertEqual(hanat.count, 3)
3413             seq = hanat.sequence_number
3414             for event in hanat.events:
3415                 self.assertEqual(event.event_type, 1)
3416                 self.assertEqual(event.in_addr, self.pg0.remote_ip4)
3417                 self.assertEqual(event.out_addr, self.nat_addr)
3418                 self.assertEqual(event.fib_index, 0)
3419
3420         # ACK received events
3421         ack = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
3422                IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
3423                UDP(sport=12346, dport=12345) /
3424                HANATStateSync(sequence_number=seq, flags='ACK'))
3425         self.pg3.add_stream(ack)
3426         self.pg_start()
3427         stats = self.statistics.get_counter('/nat44/ha/ack-recv')
3428         self.assertEqual(stats[0][0], 1)
3429
3430         # delete one session
3431         self.pg_enable_capture(self.pg_interfaces)
3432         self.vapi.nat44_del_session(address=self.pg0.remote_ip4,
3433                                     port=self.tcp_port_in,
3434                                     protocol=IP_PROTOS.tcp,
3435                                     flags=self.config_flags.NAT_IS_INSIDE)
3436         self.vapi.nat_ha_flush()
3437         stats = self.statistics.get_counter('/nat44/ha/del-event-send')
3438         self.assertEqual(stats[0][0], 1)
3439         capture = self.pg3.get_capture(1)
3440         p = capture[0]
3441         try:
3442             hanat = p[HANATStateSync]
3443         except IndexError:
3444             self.logger.error(ppp("Invalid packet:", p))
3445             raise
3446         else:
3447             self.assertGreater(hanat.sequence_number, seq)
3448
3449         # do not send ACK, active retry send HA event again
3450         self.pg_enable_capture(self.pg_interfaces)
3451         sleep(12)
3452         stats = self.statistics.get_counter('/nat44/ha/retry-count')
3453         self.assertEqual(stats[0][0], 3)
3454         stats = self.statistics.get_counter('/nat44/ha/missed-count')
3455         self.assertEqual(stats[0][0], 1)
3456         capture = self.pg3.get_capture(3)
3457         for packet in capture:
3458             self.assertEqual(packet, p)
3459
3460         # session counters refresh
3461         pkts = self.create_stream_out(self.pg1)
3462         self.pg1.add_stream(pkts)
3463         self.pg_enable_capture(self.pg_interfaces)
3464         self.pg_start()
3465         self.pg0.get_capture(2)
3466         self.vapi.nat_ha_flush()
3467         stats = self.statistics.get_counter('/nat44/ha/refresh-event-send')
3468         self.assertEqual(stats[0][0], 2)
3469         capture = self.pg3.get_capture(1)
3470         p = capture[0]
3471         self.assert_packet_checksums_valid(p)
3472         try:
3473             ip = p[IP]
3474             udp = p[UDP]
3475             hanat = p[HANATStateSync]
3476         except IndexError:
3477             self.logger.error(ppp("Invalid packet:", p))
3478             raise
3479         else:
3480             self.assertEqual(ip.src, self.pg3.local_ip4)
3481             self.assertEqual(ip.dst, self.pg3.remote_ip4)
3482             self.assertEqual(udp.sport, 12345)
3483             self.assertEqual(udp.dport, 12346)
3484             self.assertEqual(hanat.version, 1)
3485             self.assertEqual(hanat.count, 2)
3486             seq = hanat.sequence_number
3487             for event in hanat.events:
3488                 self.assertEqual(event.event_type, 3)
3489                 self.assertEqual(event.out_addr, self.nat_addr)
3490                 self.assertEqual(event.fib_index, 0)
3491                 self.assertEqual(event.total_pkts, 2)
3492                 self.assertGreater(event.total_bytes, 0)
3493
3494         ack = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
3495                IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
3496                UDP(sport=12346, dport=12345) /
3497                HANATStateSync(sequence_number=seq, flags='ACK'))
3498         self.pg3.add_stream(ack)
3499         self.pg_start()
3500         stats = self.statistics.get_counter('/nat44/ha/ack-recv')
3501         self.assertEqual(stats[0][0], 2)
3502
3503     def test_ha_recv(self):
3504         """ NAT44EI Receive HA session synchronization events (passive) """
3505         self.nat44_add_address(self.nat_addr)
3506         flags = self.config_flags.NAT_IS_INSIDE
3507         self.vapi.nat44_interface_add_del_feature(
3508             sw_if_index=self.pg0.sw_if_index,
3509             flags=flags, is_add=1)
3510         self.vapi.nat44_interface_add_del_feature(
3511             sw_if_index=self.pg1.sw_if_index,
3512             is_add=1)
3513         self.vapi.nat_ha_set_listener(ip_address=self.pg3.local_ip4,
3514                                       port=12345,
3515                                       path_mtu=512)
3516         bind_layers(UDP, HANATStateSync, sport=12345)
3517
3518         self.tcp_port_out = random.randint(1025, 65535)
3519         self.udp_port_out = random.randint(1025, 65535)
3520
3521         # send HA session add events to failover/passive
3522         p = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
3523              IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
3524              UDP(sport=12346, dport=12345) /
3525              HANATStateSync(sequence_number=1, events=[
3526                  Event(event_type='add', protocol='tcp',
3527                        in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr,
3528                        in_port=self.tcp_port_in, out_port=self.tcp_port_out,
3529                        eh_addr=self.pg1.remote_ip4,
3530                        ehn_addr=self.pg1.remote_ip4,
3531                        eh_port=self.tcp_external_port,
3532                        ehn_port=self.tcp_external_port, fib_index=0),
3533                  Event(event_type='add', protocol='udp',
3534                        in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr,
3535                        in_port=self.udp_port_in, out_port=self.udp_port_out,
3536                        eh_addr=self.pg1.remote_ip4,
3537                        ehn_addr=self.pg1.remote_ip4,
3538                        eh_port=self.udp_external_port,
3539                        ehn_port=self.udp_external_port, fib_index=0)]))
3540
3541         self.pg3.add_stream(p)
3542         self.pg_enable_capture(self.pg_interfaces)
3543         self.pg_start()
3544         # receive ACK
3545         capture = self.pg3.get_capture(1)
3546         p = capture[0]
3547         try:
3548             hanat = p[HANATStateSync]
3549         except IndexError:
3550             self.logger.error(ppp("Invalid packet:", p))
3551             raise
3552         else:
3553             self.assertEqual(hanat.sequence_number, 1)
3554             self.assertEqual(hanat.flags, 'ACK')
3555             self.assertEqual(hanat.version, 1)
3556             self.assertEqual(hanat.thread_index, 0)
3557         stats = self.statistics.get_counter('/nat44/ha/ack-send')
3558         self.assertEqual(stats[0][0], 1)
3559         stats = self.statistics.get_counter('/nat44/ha/add-event-recv')
3560         self.assertEqual(stats[0][0], 2)
3561         users = self.statistics.get_counter('/nat44/total-users')
3562         self.assertEqual(users[0][0], 1)
3563         sessions = self.statistics.get_counter('/nat44/total-sessions')
3564         self.assertEqual(sessions[0][0], 2)
3565         users = self.vapi.nat44_user_dump()
3566         self.assertEqual(len(users), 1)
3567         self.assertEqual(str(users[0].ip_address),
3568                          self.pg0.remote_ip4)
3569         # there should be 2 sessions created by HA
3570         sessions = self.vapi.nat44_user_session_dump(users[0].ip_address,
3571                                                      users[0].vrf_id)
3572         self.assertEqual(len(sessions), 2)
3573         for session in sessions:
3574             self.assertEqual(str(session.inside_ip_address),
3575                              self.pg0.remote_ip4)
3576             self.assertEqual(str(session.outside_ip_address),
3577                              self.nat_addr)
3578             self.assertIn(session.inside_port,
3579                           [self.tcp_port_in, self.udp_port_in])
3580             self.assertIn(session.outside_port,
3581                           [self.tcp_port_out, self.udp_port_out])
3582             self.assertIn(session.protocol, [IP_PROTOS.tcp, IP_PROTOS.udp])
3583
3584         # send HA session delete event to failover/passive
3585         p = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
3586              IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
3587              UDP(sport=12346, dport=12345) /
3588              HANATStateSync(sequence_number=2, events=[
3589                  Event(event_type='del', protocol='udp',
3590                        in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr,
3591                        in_port=self.udp_port_in, out_port=self.udp_port_out,
3592                        eh_addr=self.pg1.remote_ip4,
3593                        ehn_addr=self.pg1.remote_ip4,
3594                        eh_port=self.udp_external_port,
3595                        ehn_port=self.udp_external_port, fib_index=0)]))
3596
3597         self.pg3.add_stream(p)
3598         self.pg_enable_capture(self.pg_interfaces)
3599         self.pg_start()
3600         # receive ACK
3601         capture = self.pg3.get_capture(1)
3602         p = capture[0]
3603         try:
3604             hanat = p[HANATStateSync]
3605         except IndexError:
3606             self.logger.error(ppp("Invalid packet:", p))
3607             raise
3608         else:
3609             self.assertEqual(hanat.sequence_number, 2)
3610             self.assertEqual(hanat.flags, 'ACK')
3611             self.assertEqual(hanat.version, 1)
3612         users = self.vapi.nat44_user_dump()
3613         self.assertEqual(len(users), 1)
3614         self.assertEqual(str(users[0].ip_address),
3615                          self.pg0.remote_ip4)
3616         # now we should have only 1 session, 1 deleted by HA
3617         sessions = self.vapi.nat44_user_session_dump(users[0].ip_address,
3618                                                      users[0].vrf_id)
3619         self.assertEqual(len(sessions), 1)
3620         stats = self.statistics.get_counter('/nat44/ha/del-event-recv')
3621         self.assertEqual(stats[0][0], 1)
3622
3623         stats = self.statistics.get_err_counter('/err/nat-ha/pkts-processed')
3624         self.assertEqual(stats, 2)
3625
3626         # send HA session refresh event to failover/passive
3627         p = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
3628              IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
3629              UDP(sport=12346, dport=12345) /
3630              HANATStateSync(sequence_number=3, events=[
3631                  Event(event_type='refresh', protocol='tcp',
3632                        in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr,
3633                        in_port=self.tcp_port_in, out_port=self.tcp_port_out,
3634                        eh_addr=self.pg1.remote_ip4,
3635                        ehn_addr=self.pg1.remote_ip4,
3636                        eh_port=self.tcp_external_port,
3637                        ehn_port=self.tcp_external_port, fib_index=0,
3638                        total_bytes=1024, total_pkts=2)]))
3639         self.pg3.add_stream(p)
3640         self.pg_enable_capture(self.pg_interfaces)
3641         self.pg_start()
3642         # receive ACK
3643         capture = self.pg3.get_capture(1)
3644         p = capture[0]
3645         try:
3646             hanat = p[HANATStateSync]
3647         except IndexError:
3648             self.logger.error(ppp("Invalid packet:", p))
3649             raise
3650         else:
3651             self.assertEqual(hanat.sequence_number, 3)
3652             self.assertEqual(hanat.flags, 'ACK')
3653             self.assertEqual(hanat.version, 1)
3654         users = self.vapi.nat44_user_dump()
3655         self.assertEqual(len(users), 1)
3656         self.assertEqual(str(users[0].ip_address),
3657                          self.pg0.remote_ip4)
3658         sessions = self.vapi.nat44_user_session_dump(users[0].ip_address,
3659                                                      users[0].vrf_id)
3660         self.assertEqual(len(sessions), 1)
3661         session = sessions[0]
3662         self.assertEqual(session.total_bytes, 1024)
3663         self.assertEqual(session.total_pkts, 2)
3664         stats = self.statistics.get_counter('/nat44/ha/refresh-event-recv')
3665         self.assertEqual(stats[0][0], 1)
3666
3667         stats = self.statistics.get_err_counter('/err/nat-ha/pkts-processed')
3668         self.assertEqual(stats, 3)
3669
3670         # send packet to test session created by HA
3671         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3672              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3673              TCP(sport=self.tcp_external_port, dport=self.tcp_port_out))
3674         self.pg1.add_stream(p)
3675         self.pg_enable_capture(self.pg_interfaces)
3676         self.pg_start()
3677         capture = self.pg0.get_capture(1)
3678         p = capture[0]
3679         try:
3680             ip = p[IP]
3681             tcp = p[TCP]
3682         except IndexError:
3683             self.logger.error(ppp("Invalid packet:", p))
3684             raise
3685         else:
3686             self.assertEqual(ip.src, self.pg1.remote_ip4)
3687             self.assertEqual(ip.dst, self.pg0.remote_ip4)
3688             self.assertEqual(tcp.sport, self.tcp_external_port)
3689             self.assertEqual(tcp.dport, self.tcp_port_in)
3690
3691     def show_commands_at_teardown(self):
3692         self.logger.info(self.vapi.cli("show nat44 addresses"))
3693         self.logger.info(self.vapi.cli("show nat44 interfaces"))
3694         self.logger.info(self.vapi.cli("show nat44 static mappings"))
3695         self.logger.info(self.vapi.cli("show nat44 interface address"))
3696         self.logger.info(self.vapi.cli("show nat44 sessions detail"))
3697         self.logger.info(self.vapi.cli("show nat44 hash tables detail"))
3698         self.logger.info(self.vapi.cli("show nat timeouts"))
3699         self.logger.info(
3700             self.vapi.cli("show nat addr-port-assignment-alg"))
3701         self.logger.info(self.vapi.cli("show nat ha"))
3702
3703
3704 class TestNAT44Out2InDPO(MethodHolder):
3705     """ NAT44EI Test Cases using out2in DPO """
3706
3707     @classmethod
3708     def setUpClass(cls):
3709         super(TestNAT44Out2InDPO, cls).setUpClass()
3710         cls.vapi.cli("set log class nat level debug")
3711
3712         cls.tcp_port_in = 6303
3713         cls.tcp_port_out = 6303
3714         cls.udp_port_in = 6304
3715         cls.udp_port_out = 6304
3716         cls.icmp_id_in = 6305
3717         cls.icmp_id_out = 6305
3718         cls.nat_addr = '10.0.0.3'
3719         cls.dst_ip4 = '192.168.70.1'
3720
3721         cls.create_pg_interfaces(range(2))
3722
3723         cls.pg0.admin_up()
3724         cls.pg0.config_ip4()
3725         cls.pg0.resolve_arp()
3726
3727         cls.pg1.admin_up()
3728         cls.pg1.config_ip6()
3729         cls.pg1.resolve_ndp()
3730
3731         r1 = VppIpRoute(cls, "::", 0,
3732                         [VppRoutePath(cls.pg1.remote_ip6,
3733                                       cls.pg1.sw_if_index)],
3734                         register=False)
3735         r1.add_vpp_config()
3736
3737     def setUp(self):
3738         super(TestNAT44Out2InDPO, self).setUp()
3739         flags = self.nat44_config_flags.NAT44_API_IS_OUT2IN_DPO
3740         self.vapi.nat44_plugin_enable_disable(enable=1, flags=flags)
3741
3742     def tearDown(self):
3743         super(TestNAT44Out2InDPO, self).tearDown()
3744         if not self.vpp_dead:
3745             self.vapi.nat44_plugin_enable_disable(enable=0)
3746             self.vapi.cli("clear logging")
3747
3748     def configure_xlat(self):
3749         self.dst_ip6_pfx = '1:2:3::'
3750         self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
3751                                               self.dst_ip6_pfx)
3752         self.dst_ip6_pfx_len = 96
3753         self.src_ip6_pfx = '4:5:6::'
3754         self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
3755                                               self.src_ip6_pfx)
3756         self.src_ip6_pfx_len = 96
3757         self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
3758                                  self.src_ip6_pfx_n, self.src_ip6_pfx_len,
3759                                  '\x00\x00\x00\x00', 0)
3760
3761     @unittest.skip('Temporary disabled')
3762     def test_464xlat_ce(self):
3763         """ Test 464XLAT CE with NAT44EI """
3764
3765         nat_config = self.vapi.nat_show_config()
3766         self.assertEqual(1, nat_config.out2in_dpo)
3767
3768         self.configure_xlat()
3769
3770         flags = self.config_flags.NAT_IS_INSIDE
3771         self.vapi.nat44_interface_add_del_feature(
3772             sw_if_index=self.pg0.sw_if_index,
3773             flags=flags, is_add=1)
3774         self.vapi.nat44_add_del_address_range(first_ip_address=self.nat_addr_n,
3775                                               last_ip_address=self.nat_addr_n,
3776                                               vrf_id=0xFFFFFFFF, is_add=1)
3777
3778         out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
3779                                        self.dst_ip6_pfx_len)
3780         out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
3781                                        self.src_ip6_pfx_len)
3782
3783         try:
3784             pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
3785             self.pg0.add_stream(pkts)
3786             self.pg_enable_capture(self.pg_interfaces)
3787             self.pg_start()
3788             capture = self.pg1.get_capture(len(pkts))
3789             self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
3790                                         dst_ip=out_src_ip6)
3791
3792             pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
3793                                               out_dst_ip6)
3794             self.pg1.add_stream(pkts)
3795             self.pg_enable_capture(self.pg_interfaces)
3796             self.pg_start()
3797             capture = self.pg0.get_capture(len(pkts))
3798             self.verify_capture_in(capture, self.pg0)
3799         finally:
3800             self.vapi.nat44_interface_add_del_feature(
3801                 sw_if_index=self.pg0.sw_if_index,
3802                 flags=flags)
3803             self.vapi.nat44_add_del_address_range(
3804                 first_ip_address=self.nat_addr_n,
3805                 last_ip_address=self.nat_addr_n,
3806                 vrf_id=0xFFFFFFFF)
3807
3808     @unittest.skip('Temporary disabled')
3809     def test_464xlat_ce_no_nat(self):
3810         """ Test 464XLAT CE without NAT44EI """
3811
3812         self.configure_xlat()
3813
3814         out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
3815                                        self.dst_ip6_pfx_len)
3816         out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
3817                                        self.src_ip6_pfx_len)
3818
3819         pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
3820         self.pg0.add_stream(pkts)
3821         self.pg_enable_capture(self.pg_interfaces)
3822         self.pg_start()
3823         capture = self.pg1.get_capture(len(pkts))
3824         self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
3825                                     nat_ip=out_dst_ip6, same_port=True)
3826
3827         pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
3828         self.pg1.add_stream(pkts)
3829         self.pg_enable_capture(self.pg_interfaces)
3830         self.pg_start()
3831         capture = self.pg0.get_capture(len(pkts))
3832         self.verify_capture_in(capture, self.pg0)
3833
3834
3835 if __name__ == '__main__':
3836     unittest.main(testRunner=VppTestRunner)