nat: nat44-ei hairpinning code cleanup
[vpp.git] / test / test_nat44_ei.py
1 #!/usr/bin/env python3
2
3 import ipaddress
4 import random
5 import socket
6 import struct
7 import unittest
8 from io import BytesIO
9
10 import scapy.compat
11 from framework import VppTestCase, VppTestRunner
12 from ipfix import IPFIX, Set, Template, Data, IPFIXDecoder
13 from scapy.all import bind_layers, Packet, ByteEnumField, ShortField, \
14     IPField, IntField, LongField, XByteField, FlagsField, FieldLenField, \
15     PacketListField
16 from scapy.data import IP_PROTOS
17 from scapy.layers.inet import IP, TCP, UDP, ICMP
18 from scapy.layers.inet import IPerror, TCPerror, UDPerror, ICMPerror
19 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply
20 from scapy.layers.l2 import Ether, ARP, GRE
21 from scapy.packet import Raw
22 from syslog_rfc5424_parser import SyslogMessage, ParseError
23 from syslog_rfc5424_parser.constants import SyslogSeverity
24 from util import ppp
25 from vpp_ip_route import VppIpRoute, VppRoutePath
26 from vpp_neighbor import VppNeighbor
27 from vpp_papi import VppEnum
28
29
30 # NAT HA protocol event data
31 class Event(Packet):
32     name = "Event"
33     fields_desc = [ByteEnumField("event_type", None,
34                                  {1: "add", 2: "del", 3: "refresh"}),
35                    ByteEnumField("protocol", None,
36                                  {0: "other", 1: "udp", 2: "tcp", 3: "icmp"}),
37                    ShortField("flags", 0),
38                    IPField("in_addr", None),
39                    IPField("out_addr", None),
40                    ShortField("in_port", None),
41                    ShortField("out_port", None),
42                    IPField("eh_addr", None),
43                    IPField("ehn_addr", None),
44                    ShortField("eh_port", None),
45                    ShortField("ehn_port", None),
46                    IntField("fib_index", None),
47                    IntField("total_pkts", 0),
48                    LongField("total_bytes", 0)]
49
50     def extract_padding(self, s):
51         return "", s
52
53
54 # NAT HA protocol header
55 class HANATStateSync(Packet):
56     name = "HA NAT state sync"
57     fields_desc = [XByteField("version", 1),
58                    FlagsField("flags", 0, 8, ['ACK']),
59                    FieldLenField("count", None, count_of="events"),
60                    IntField("sequence_number", 1),
61                    IntField("thread_index", 0),
62                    PacketListField("events", [], Event,
63                                    count_from=lambda pkt: pkt.count)]
64
65
66 class MethodHolder(VppTestCase):
67     """ NAT create capture and verify method holder """
68
69     @property
70     def config_flags(self):
71         return VppEnum.vl_api_nat44_ei_config_flags_t
72
73     @property
74     def SYSLOG_SEVERITY(self):
75         return VppEnum.vl_api_syslog_severity_t
76
77     def nat44_add_static_mapping(self, local_ip, external_ip='0.0.0.0',
78                                  local_port=0, external_port=0, vrf_id=0,
79                                  is_add=1, external_sw_if_index=0xFFFFFFFF,
80                                  proto=0, tag="", flags=0):
81         """
82         Add/delete NAT44EI static mapping
83
84         :param local_ip: Local IP address
85         :param external_ip: External IP address
86         :param local_port: Local port number (Optional)
87         :param external_port: External port number (Optional)
88         :param vrf_id: VRF ID (Default 0)
89         :param is_add: 1 if add, 0 if delete (Default add)
90         :param external_sw_if_index: External interface instead of IP address
91         :param proto: IP protocol (Mandatory if port specified)
92         :param tag: Opaque string tag
93         :param flags: NAT configuration flags
94         """
95
96         if not (local_port and external_port):
97             flags |= self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
98
99         self.vapi.nat44_ei_add_del_static_mapping(
100             is_add=is_add,
101             local_ip_address=local_ip,
102             external_ip_address=external_ip,
103             external_sw_if_index=external_sw_if_index,
104             local_port=local_port,
105             external_port=external_port,
106             vrf_id=vrf_id, protocol=proto,
107             flags=flags,
108             tag=tag)
109
110     def nat44_add_address(self, ip, is_add=1, vrf_id=0xFFFFFFFF):
111         """
112         Add/delete NAT44EI address
113
114         :param ip: IP address
115         :param is_add: 1 if add, 0 if delete (Default add)
116         """
117         self.vapi.nat44_ei_add_del_address_range(first_ip_address=ip,
118                                                  last_ip_address=ip,
119                                                  vrf_id=vrf_id,
120                                                  is_add=is_add)
121
122     def create_routes_and_neigbors(self):
123         r1 = VppIpRoute(self, self.pg7.remote_ip4, 32,
124                         [VppRoutePath(self.pg7.remote_ip4,
125                                       self.pg7.sw_if_index)])
126         r2 = VppIpRoute(self, self.pg8.remote_ip4, 32,
127                         [VppRoutePath(self.pg8.remote_ip4,
128                                       self.pg8.sw_if_index)])
129         r1.add_vpp_config()
130         r2.add_vpp_config()
131
132         n1 = VppNeighbor(self,
133                          self.pg7.sw_if_index,
134                          self.pg7.remote_mac,
135                          self.pg7.remote_ip4,
136                          is_static=1)
137         n2 = VppNeighbor(self,
138                          self.pg8.sw_if_index,
139                          self.pg8.remote_mac,
140                          self.pg8.remote_ip4,
141                          is_static=1)
142         n1.add_vpp_config()
143         n2.add_vpp_config()
144
145     def create_stream_in(self, in_if, out_if, dst_ip=None, ttl=64):
146         """
147         Create packet stream for inside network
148
149         :param in_if: Inside interface
150         :param out_if: Outside interface
151         :param dst_ip: Destination address
152         :param ttl: TTL of generated packets
153         """
154         if dst_ip is None:
155             dst_ip = out_if.remote_ip4
156
157         pkts = []
158         # TCP
159         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
160              IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
161              TCP(sport=self.tcp_port_in, dport=20))
162         pkts.extend([p, p])
163
164         # UDP
165         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
166              IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
167              UDP(sport=self.udp_port_in, dport=20))
168         pkts.append(p)
169
170         # ICMP
171         p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
172              IP(src=in_if.remote_ip4, dst=dst_ip, ttl=ttl) /
173              ICMP(id=self.icmp_id_in, type='echo-request'))
174         pkts.append(p)
175
176         return pkts
177
178     def compose_ip6(self, ip4, pref, plen):
179         """
180         Compose IPv4-embedded IPv6 addresses
181
182         :param ip4: IPv4 address
183         :param pref: IPv6 prefix
184         :param plen: IPv6 prefix length
185         :returns: IPv4-embedded IPv6 addresses
186         """
187         pref_n = list(socket.inet_pton(socket.AF_INET6, pref))
188         ip4_n = list(socket.inet_pton(socket.AF_INET, ip4))
189         if plen == 32:
190             pref_n[4] = ip4_n[0]
191             pref_n[5] = ip4_n[1]
192             pref_n[6] = ip4_n[2]
193             pref_n[7] = ip4_n[3]
194         elif plen == 40:
195             pref_n[5] = ip4_n[0]
196             pref_n[6] = ip4_n[1]
197             pref_n[7] = ip4_n[2]
198             pref_n[9] = ip4_n[3]
199         elif plen == 48:
200             pref_n[6] = ip4_n[0]
201             pref_n[7] = ip4_n[1]
202             pref_n[9] = ip4_n[2]
203             pref_n[10] = ip4_n[3]
204         elif plen == 56:
205             pref_n[7] = ip4_n[0]
206             pref_n[9] = ip4_n[1]
207             pref_n[10] = ip4_n[2]
208             pref_n[11] = ip4_n[3]
209         elif plen == 64:
210             pref_n[9] = ip4_n[0]
211             pref_n[10] = ip4_n[1]
212             pref_n[11] = ip4_n[2]
213             pref_n[12] = ip4_n[3]
214         elif plen == 96:
215             pref_n[12] = ip4_n[0]
216             pref_n[13] = ip4_n[1]
217             pref_n[14] = ip4_n[2]
218             pref_n[15] = ip4_n[3]
219         packed_pref_n = b''.join([scapy.compat.chb(x) for x in pref_n])
220         return socket.inet_ntop(socket.AF_INET6, packed_pref_n)
221
222     def create_stream_out(self, out_if, dst_ip=None, ttl=64,
223                           use_inside_ports=False):
224         """
225         Create packet stream for outside network
226
227         :param out_if: Outside interface
228         :param dst_ip: Destination IP address (Default use global NAT address)
229         :param ttl: TTL of generated packets
230         :param use_inside_ports: Use inside NAT ports as destination ports
231                instead of outside ports
232         """
233         if dst_ip is None:
234             dst_ip = self.nat_addr
235         if not use_inside_ports:
236             tcp_port = self.tcp_port_out
237             udp_port = self.udp_port_out
238             icmp_id = self.icmp_id_out
239         else:
240             tcp_port = self.tcp_port_in
241             udp_port = self.udp_port_in
242             icmp_id = self.icmp_id_in
243         pkts = []
244         # TCP
245         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
246              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
247              TCP(dport=tcp_port, sport=20))
248         pkts.extend([p, p])
249
250         # UDP
251         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
252              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
253              UDP(dport=udp_port, sport=20))
254         pkts.append(p)
255
256         # ICMP
257         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
258              IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
259              ICMP(id=icmp_id, type='echo-reply'))
260         pkts.append(p)
261
262         return pkts
263
264     def create_stream_out_ip6(self, out_if, src_ip, dst_ip, hl=64):
265         """
266         Create packet stream for outside network
267
268         :param out_if: Outside interface
269         :param dst_ip: Destination IP address (Default use global NAT address)
270         :param hl: HL of generated packets
271         """
272         pkts = []
273         # TCP
274         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
275              IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
276              TCP(dport=self.tcp_port_out, sport=20))
277         pkts.append(p)
278
279         # UDP
280         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
281              IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
282              UDP(dport=self.udp_port_out, sport=20))
283         pkts.append(p)
284
285         # ICMP
286         p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
287              IPv6(src=src_ip, dst=dst_ip, hlim=hl) /
288              ICMPv6EchoReply(id=self.icmp_id_out))
289         pkts.append(p)
290
291         return pkts
292
293     def verify_capture_out(self, capture, nat_ip=None, same_port=False,
294                            dst_ip=None, is_ip6=False, ignore_port=False):
295         """
296         Verify captured packets on outside network
297
298         :param capture: Captured packets
299         :param nat_ip: Translated IP address (Default use global NAT address)
300         :param same_port: Source port number is not translated (Default False)
301         :param dst_ip: Destination IP address (Default do not verify)
302         :param is_ip6: If L3 protocol is IPv6 (Default False)
303         """
304         if is_ip6:
305             IP46 = IPv6
306             ICMP46 = ICMPv6EchoRequest
307         else:
308             IP46 = IP
309             ICMP46 = ICMP
310         if nat_ip is None:
311             nat_ip = self.nat_addr
312         for packet in capture:
313             try:
314                 if not is_ip6:
315                     self.assert_packet_checksums_valid(packet)
316                 self.assertEqual(packet[IP46].src, nat_ip)
317                 if dst_ip is not None:
318                     self.assertEqual(packet[IP46].dst, dst_ip)
319                 if packet.haslayer(TCP):
320                     if not ignore_port:
321                         if same_port:
322                             self.assertEqual(
323                                 packet[TCP].sport, self.tcp_port_in)
324                         else:
325                             self.assertNotEqual(
326                                 packet[TCP].sport, self.tcp_port_in)
327                     self.tcp_port_out = packet[TCP].sport
328                     self.assert_packet_checksums_valid(packet)
329                 elif packet.haslayer(UDP):
330                     if not ignore_port:
331                         if same_port:
332                             self.assertEqual(
333                                 packet[UDP].sport, self.udp_port_in)
334                         else:
335                             self.assertNotEqual(
336                                 packet[UDP].sport, self.udp_port_in)
337                     self.udp_port_out = packet[UDP].sport
338                 else:
339                     if not ignore_port:
340                         if same_port:
341                             self.assertEqual(
342                                 packet[ICMP46].id, self.icmp_id_in)
343                         else:
344                             self.assertNotEqual(
345                                 packet[ICMP46].id, self.icmp_id_in)
346                     self.icmp_id_out = packet[ICMP46].id
347                     self.assert_packet_checksums_valid(packet)
348             except:
349                 self.logger.error(ppp("Unexpected or invalid packet "
350                                       "(outside network):", packet))
351                 raise
352
353     def verify_capture_out_ip6(self, capture, nat_ip, same_port=False,
354                                dst_ip=None):
355         """
356         Verify captured packets on outside network
357
358         :param capture: Captured packets
359         :param nat_ip: Translated IP address
360         :param same_port: Source port number is not translated (Default False)
361         :param dst_ip: Destination IP address (Default do not verify)
362         """
363         return self.verify_capture_out(capture, nat_ip, same_port, dst_ip,
364                                        True)
365
366     def verify_capture_in(self, capture, in_if):
367         """
368         Verify captured packets on inside network
369
370         :param capture: Captured packets
371         :param in_if: Inside interface
372         """
373         for packet in capture:
374             try:
375                 self.assert_packet_checksums_valid(packet)
376                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
377                 if packet.haslayer(TCP):
378                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
379                 elif packet.haslayer(UDP):
380                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
381                 else:
382                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
383             except:
384                 self.logger.error(ppp("Unexpected or invalid packet "
385                                       "(inside network):", packet))
386                 raise
387
388     def verify_capture_no_translation(self, capture, ingress_if, egress_if):
389         """
390         Verify captured packet that don't have to be translated
391
392         :param capture: Captured packets
393         :param ingress_if: Ingress interface
394         :param egress_if: Egress interface
395         """
396         for packet in capture:
397             try:
398                 self.assertEqual(packet[IP].src, ingress_if.remote_ip4)
399                 self.assertEqual(packet[IP].dst, egress_if.remote_ip4)
400                 if packet.haslayer(TCP):
401                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
402                 elif packet.haslayer(UDP):
403                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
404                 else:
405                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
406             except:
407                 self.logger.error(ppp("Unexpected or invalid packet "
408                                       "(inside network):", packet))
409                 raise
410
411     def verify_capture_out_with_icmp_errors(self, capture, src_ip=None,
412                                             icmp_type=11):
413         """
414         Verify captured packets with ICMP errors on outside network
415
416         :param capture: Captured packets
417         :param src_ip: Translated IP address or IP address of VPP
418                        (Default use global NAT address)
419         :param icmp_type: Type of error ICMP packet
420                           we are expecting (Default 11)
421         """
422         if src_ip is None:
423             src_ip = self.nat_addr
424         for packet in capture:
425             try:
426                 self.assertEqual(packet[IP].src, src_ip)
427                 self.assertEqual(packet.haslayer(ICMP), 1)
428                 icmp = packet[ICMP]
429                 self.assertEqual(icmp.type, icmp_type)
430                 self.assertTrue(icmp.haslayer(IPerror))
431                 inner_ip = icmp[IPerror]
432                 if inner_ip.haslayer(TCPerror):
433                     self.assertEqual(inner_ip[TCPerror].dport,
434                                      self.tcp_port_out)
435                 elif inner_ip.haslayer(UDPerror):
436                     self.assertEqual(inner_ip[UDPerror].dport,
437                                      self.udp_port_out)
438                 else:
439                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_out)
440             except:
441                 self.logger.error(ppp("Unexpected or invalid packet "
442                                       "(outside network):", packet))
443                 raise
444
445     def verify_capture_in_with_icmp_errors(self, capture, in_if, icmp_type=11):
446         """
447         Verify captured packets with ICMP errors on inside network
448
449         :param capture: Captured packets
450         :param in_if: Inside interface
451         :param icmp_type: Type of error ICMP packet
452                           we are expecting (Default 11)
453         """
454         for packet in capture:
455             try:
456                 self.assertEqual(packet[IP].dst, in_if.remote_ip4)
457                 self.assertEqual(packet.haslayer(ICMP), 1)
458                 icmp = packet[ICMP]
459                 self.assertEqual(icmp.type, icmp_type)
460                 self.assertTrue(icmp.haslayer(IPerror))
461                 inner_ip = icmp[IPerror]
462                 if inner_ip.haslayer(TCPerror):
463                     self.assertEqual(inner_ip[TCPerror].sport,
464                                      self.tcp_port_in)
465                 elif inner_ip.haslayer(UDPerror):
466                     self.assertEqual(inner_ip[UDPerror].sport,
467                                      self.udp_port_in)
468                 else:
469                     self.assertEqual(inner_ip[ICMPerror].id, self.icmp_id_in)
470             except:
471                 self.logger.error(ppp("Unexpected or invalid packet "
472                                       "(inside network):", packet))
473                 raise
474
475     def create_stream_frag(self, src_if, dst, sport, dport, data,
476                            proto=IP_PROTOS.tcp, echo_reply=False):
477         """
478         Create fragmented packet stream
479
480         :param src_if: Source interface
481         :param dst: Destination IPv4 address
482         :param sport: Source port
483         :param dport: Destination port
484         :param data: Payload data
485         :param proto: protocol (TCP, UDP, ICMP)
486         :param echo_reply: use echo_reply if protocol is ICMP
487         :returns: Fragments
488         """
489         if proto == IP_PROTOS.tcp:
490             p = (IP(src=src_if.remote_ip4, dst=dst) /
491                  TCP(sport=sport, dport=dport) /
492                  Raw(data))
493             p = p.__class__(scapy.compat.raw(p))
494             chksum = p[TCP].chksum
495             proto_header = TCP(sport=sport, dport=dport, chksum=chksum)
496         elif proto == IP_PROTOS.udp:
497             proto_header = UDP(sport=sport, dport=dport)
498         elif proto == IP_PROTOS.icmp:
499             if not echo_reply:
500                 proto_header = ICMP(id=sport, type='echo-request')
501             else:
502                 proto_header = ICMP(id=sport, type='echo-reply')
503         else:
504             raise Exception("Unsupported protocol")
505         id = random.randint(0, 65535)
506         pkts = []
507         if proto == IP_PROTOS.tcp:
508             raw = Raw(data[0:4])
509         else:
510             raw = Raw(data[0:16])
511         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
512              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=0, id=id) /
513              proto_header /
514              raw)
515         pkts.append(p)
516         if proto == IP_PROTOS.tcp:
517             raw = Raw(data[4:20])
518         else:
519             raw = Raw(data[16:32])
520         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
521              IP(src=src_if.remote_ip4, dst=dst, flags="MF", frag=3, id=id,
522                 proto=proto) /
523              raw)
524         pkts.append(p)
525         if proto == IP_PROTOS.tcp:
526             raw = Raw(data[20:])
527         else:
528             raw = Raw(data[32:])
529         p = (Ether(src=src_if.remote_mac, dst=src_if.local_mac) /
530              IP(src=src_if.remote_ip4, dst=dst, frag=5, proto=proto,
531                 id=id) /
532              raw)
533         pkts.append(p)
534         return pkts
535
536     def reass_frags_and_verify(self, frags, src, dst):
537         """
538         Reassemble and verify fragmented packet
539
540         :param frags: Captured fragments
541         :param src: Source IPv4 address to verify
542         :param dst: Destination IPv4 address to verify
543
544         :returns: Reassembled IPv4 packet
545         """
546         buffer = BytesIO()
547         for p in frags:
548             self.assertEqual(p[IP].src, src)
549             self.assertEqual(p[IP].dst, dst)
550             self.assert_ip_checksum_valid(p)
551             buffer.seek(p[IP].frag * 8)
552             buffer.write(bytes(p[IP].payload))
553         ip = IP(src=frags[0][IP].src, dst=frags[0][IP].dst,
554                 proto=frags[0][IP].proto)
555         if ip.proto == IP_PROTOS.tcp:
556             p = (ip / TCP(buffer.getvalue()))
557             self.logger.debug(ppp("Reassembled:", p))
558             self.assert_tcp_checksum_valid(p)
559         elif ip.proto == IP_PROTOS.udp:
560             p = (ip / UDP(buffer.getvalue()[:8]) /
561                  Raw(buffer.getvalue()[8:]))
562         elif ip.proto == IP_PROTOS.icmp:
563             p = (ip / ICMP(buffer.getvalue()))
564         return p
565
566     def verify_ipfix_nat44_ses(self, data):
567         """
568         Verify IPFIX NAT44EI session create/delete event
569
570         :param data: Decoded IPFIX data records
571         """
572         nat44_ses_create_num = 0
573         nat44_ses_delete_num = 0
574         self.assertEqual(6, len(data))
575         for record in data:
576             # natEvent
577             self.assertIn(scapy.compat.orb(record[230]), [4, 5])
578             if scapy.compat.orb(record[230]) == 4:
579                 nat44_ses_create_num += 1
580             else:
581                 nat44_ses_delete_num += 1
582             # sourceIPv4Address
583             self.assertEqual(self.pg0.remote_ip4,
584                              str(ipaddress.IPv4Address(record[8])))
585             # postNATSourceIPv4Address
586             self.assertEqual(socket.inet_pton(socket.AF_INET, self.nat_addr),
587                              record[225])
588             # ingressVRFID
589             self.assertEqual(struct.pack("!I", 0), record[234])
590             # protocolIdentifier/sourceTransportPort
591             # /postNAPTSourceTransportPort
592             if IP_PROTOS.icmp == scapy.compat.orb(record[4]):
593                 self.assertEqual(struct.pack("!H", self.icmp_id_in), record[7])
594                 self.assertEqual(struct.pack("!H", self.icmp_id_out),
595                                  record[227])
596             elif IP_PROTOS.tcp == scapy.compat.orb(record[4]):
597                 self.assertEqual(struct.pack("!H", self.tcp_port_in),
598                                  record[7])
599                 self.assertEqual(struct.pack("!H", self.tcp_port_out),
600                                  record[227])
601             elif IP_PROTOS.udp == scapy.compat.orb(record[4]):
602                 self.assertEqual(struct.pack("!H", self.udp_port_in),
603                                  record[7])
604                 self.assertEqual(struct.pack("!H", self.udp_port_out),
605                                  record[227])
606             else:
607                 self.fail(f"Invalid protocol {scapy.compat.orb(record[4])}")
608         self.assertEqual(3, nat44_ses_create_num)
609         self.assertEqual(3, nat44_ses_delete_num)
610
611     def verify_ipfix_addr_exhausted(self, data):
612         self.assertEqual(1, len(data))
613         record = data[0]
614         # natEvent
615         self.assertEqual(scapy.compat.orb(record[230]), 3)
616         # natPoolID
617         self.assertEqual(struct.pack("!I", 0), record[283])
618
619     def verify_ipfix_max_sessions(self, data, limit):
620         self.assertEqual(1, len(data))
621         record = data[0]
622         # natEvent
623         self.assertEqual(scapy.compat.orb(record[230]), 13)
624         # natQuotaExceededEvent
625         self.assertEqual(struct.pack("!I", 1), record[466])
626         # maxSessionEntries
627         self.assertEqual(struct.pack("!I", limit), record[471])
628
629     def verify_no_nat44_user(self):
630         """ Verify that there is no NAT44EI user """
631         users = self.vapi.nat44_ei_user_dump()
632         self.assertEqual(len(users), 0)
633         users = self.statistics['/nat44-ei/total-users']
634         self.assertEqual(users[0][0], 0)
635         sessions = self.statistics['/nat44-ei/total-sessions']
636         self.assertEqual(sessions[0][0], 0)
637
638     def verify_syslog_apmap(self, data, is_add=True):
639         message = data.decode('utf-8')
640         try:
641             message = SyslogMessage.parse(message)
642         except ParseError as e:
643             self.logger.error(e)
644             raise
645         else:
646             self.assertEqual(message.severity, SyslogSeverity.info)
647             self.assertEqual(message.appname, 'NAT')
648             self.assertEqual(message.msgid, 'APMADD' if is_add else 'APMDEL')
649             sd_params = message.sd.get('napmap')
650             self.assertTrue(sd_params is not None)
651             self.assertEqual(sd_params.get('IATYP'), 'IPv4')
652             self.assertEqual(sd_params.get('ISADDR'), self.pg0.remote_ip4)
653             self.assertEqual(sd_params.get('ISPORT'), "%d" % self.tcp_port_in)
654             self.assertEqual(sd_params.get('XATYP'), 'IPv4')
655             self.assertEqual(sd_params.get('XSADDR'), self.nat_addr)
656             self.assertEqual(sd_params.get('XSPORT'), "%d" % self.tcp_port_out)
657             self.assertEqual(sd_params.get('PROTO'), "%d" % IP_PROTOS.tcp)
658             self.assertTrue(sd_params.get('SSUBIX') is not None)
659             self.assertEqual(sd_params.get('SVLAN'), '0')
660
661     def verify_mss_value(self, pkt, mss):
662         if not pkt.haslayer(IP) or not pkt.haslayer(TCP):
663             raise TypeError("Not a TCP/IP packet")
664
665         for option in pkt[TCP].options:
666             if option[0] == 'MSS':
667                 self.assertEqual(option[1], mss)
668                 self.assert_tcp_checksum_valid(pkt)
669
670     @staticmethod
671     def proto2layer(proto):
672         if proto == IP_PROTOS.tcp:
673             return TCP
674         elif proto == IP_PROTOS.udp:
675             return UDP
676         elif proto == IP_PROTOS.icmp:
677             return ICMP
678         else:
679             raise Exception("Unsupported protocol")
680
681     def frag_in_order(self, proto=IP_PROTOS.tcp, dont_translate=False,
682                       ignore_port=False):
683         layer = self.proto2layer(proto)
684
685         if proto == IP_PROTOS.tcp:
686             data = b"A" * 4 + b"B" * 16 + b"C" * 3
687         else:
688             data = b"A" * 16 + b"B" * 16 + b"C" * 3
689         self.port_in = random.randint(1025, 65535)
690
691         # in2out
692         pkts = self.create_stream_frag(self.pg0, self.pg1.remote_ip4,
693                                        self.port_in, 20, data, proto)
694         self.pg0.add_stream(pkts)
695         self.pg_enable_capture(self.pg_interfaces)
696         self.pg_start()
697         frags = self.pg1.get_capture(len(pkts))
698         if not dont_translate:
699             p = self.reass_frags_and_verify(frags,
700                                             self.nat_addr,
701                                             self.pg1.remote_ip4)
702         else:
703             p = self.reass_frags_and_verify(frags,
704                                             self.pg0.remote_ip4,
705                                             self.pg1.remote_ip4)
706         if proto != IP_PROTOS.icmp:
707             if not dont_translate:
708                 self.assertEqual(p[layer].dport, 20)
709                 if not ignore_port:
710                     self.assertNotEqual(p[layer].sport, self.port_in)
711             else:
712                 self.assertEqual(p[layer].sport, self.port_in)
713         else:
714             if not ignore_port:
715                 if not dont_translate:
716                     self.assertNotEqual(p[layer].id, self.port_in)
717                 else:
718                     self.assertEqual(p[layer].id, self.port_in)
719         self.assertEqual(data, p[Raw].load)
720
721         # out2in
722         if not dont_translate:
723             dst_addr = self.nat_addr
724         else:
725             dst_addr = self.pg0.remote_ip4
726         if proto != IP_PROTOS.icmp:
727             sport = 20
728             dport = p[layer].sport
729         else:
730             sport = p[layer].id
731             dport = 0
732         pkts = self.create_stream_frag(self.pg1, dst_addr, sport, dport, data,
733                                        proto, echo_reply=True)
734         self.pg1.add_stream(pkts)
735         self.pg_enable_capture(self.pg_interfaces)
736         self.pg_start()
737         frags = self.pg0.get_capture(len(pkts))
738         p = self.reass_frags_and_verify(frags,
739                                         self.pg1.remote_ip4,
740                                         self.pg0.remote_ip4)
741         if proto != IP_PROTOS.icmp:
742             self.assertEqual(p[layer].sport, 20)
743             self.assertEqual(p[layer].dport, self.port_in)
744         else:
745             self.assertEqual(p[layer].id, self.port_in)
746         self.assertEqual(data, p[Raw].load)
747
748     def reass_hairpinning(self, server_addr, server_in_port, server_out_port,
749                           host_in_port, proto=IP_PROTOS.tcp,
750                           ignore_port=False):
751
752         layer = self.proto2layer(proto)
753
754         if proto == IP_PROTOS.tcp:
755             data = b"A" * 4 + b"B" * 16 + b"C" * 3
756         else:
757             data = b"A" * 16 + b"B" * 16 + b"C" * 3
758
759         # send packet from host to server
760         pkts = self.create_stream_frag(self.pg0,
761                                        self.nat_addr,
762                                        host_in_port,
763                                        server_out_port,
764                                        data,
765                                        proto)
766         self.pg0.add_stream(pkts)
767         self.pg_enable_capture(self.pg_interfaces)
768         self.pg_start()
769         frags = self.pg0.get_capture(len(pkts))
770         p = self.reass_frags_and_verify(frags,
771                                         self.nat_addr,
772                                         server_addr)
773         if proto != IP_PROTOS.icmp:
774             if not ignore_port:
775                 self.assertNotEqual(p[layer].sport, host_in_port)
776             self.assertEqual(p[layer].dport, server_in_port)
777         else:
778             if not ignore_port:
779                 self.assertNotEqual(p[layer].id, host_in_port)
780         self.assertEqual(data, p[Raw].load)
781
782     def frag_out_of_order(self, proto=IP_PROTOS.tcp, dont_translate=False,
783                           ignore_port=False):
784         layer = self.proto2layer(proto)
785
786         if proto == IP_PROTOS.tcp:
787             data = b"A" * 4 + b"B" * 16 + b"C" * 3
788         else:
789             data = b"A" * 16 + b"B" * 16 + b"C" * 3
790         self.port_in = random.randint(1025, 65535)
791
792         for i in range(2):
793             # in2out
794             pkts = self.create_stream_frag(self.pg0, self.pg1.remote_ip4,
795                                            self.port_in, 20, data, proto)
796             pkts.reverse()
797             self.pg0.add_stream(pkts)
798             self.pg_enable_capture(self.pg_interfaces)
799             self.pg_start()
800             frags = self.pg1.get_capture(len(pkts))
801             if not dont_translate:
802                 p = self.reass_frags_and_verify(frags,
803                                                 self.nat_addr,
804                                                 self.pg1.remote_ip4)
805             else:
806                 p = self.reass_frags_and_verify(frags,
807                                                 self.pg0.remote_ip4,
808                                                 self.pg1.remote_ip4)
809             if proto != IP_PROTOS.icmp:
810                 if not dont_translate:
811                     self.assertEqual(p[layer].dport, 20)
812                     if not ignore_port:
813                         self.assertNotEqual(p[layer].sport, self.port_in)
814                 else:
815                     self.assertEqual(p[layer].sport, self.port_in)
816             else:
817                 if not ignore_port:
818                     if not dont_translate:
819                         self.assertNotEqual(p[layer].id, self.port_in)
820                     else:
821                         self.assertEqual(p[layer].id, self.port_in)
822             self.assertEqual(data, p[Raw].load)
823
824             # out2in
825             if not dont_translate:
826                 dst_addr = self.nat_addr
827             else:
828                 dst_addr = self.pg0.remote_ip4
829             if proto != IP_PROTOS.icmp:
830                 sport = 20
831                 dport = p[layer].sport
832             else:
833                 sport = p[layer].id
834                 dport = 0
835             pkts = self.create_stream_frag(self.pg1, dst_addr, sport, dport,
836                                            data, proto, echo_reply=True)
837             pkts.reverse()
838             self.pg1.add_stream(pkts)
839             self.pg_enable_capture(self.pg_interfaces)
840             self.pg_start()
841             frags = self.pg0.get_capture(len(pkts))
842             p = self.reass_frags_and_verify(frags,
843                                             self.pg1.remote_ip4,
844                                             self.pg0.remote_ip4)
845             if proto != IP_PROTOS.icmp:
846                 self.assertEqual(p[layer].sport, 20)
847                 self.assertEqual(p[layer].dport, self.port_in)
848             else:
849                 self.assertEqual(p[layer].id, self.port_in)
850             self.assertEqual(data, p[Raw].load)
851
852
853 def get_nat44_ei_in2out_worker_index(ip, vpp_worker_count):
854     if 0 == vpp_worker_count:
855         return 0
856     numeric = socket.inet_aton(ip)
857     numeric = struct.unpack("!L", numeric)[0]
858     numeric = socket.htonl(numeric)
859     h = numeric + (numeric >> 8) + (numeric >> 16) + (numeric >> 24)
860     return 1 + h % vpp_worker_count
861
862
863 class TestNAT44EI(MethodHolder):
864     """ NAT44EI Test Cases """
865
866     max_translations = 10240
867     max_users = 10240
868
869     @classmethod
870     def setUpClass(cls):
871         super(TestNAT44EI, cls).setUpClass()
872         cls.vapi.cli("set log class nat44-ei level debug")
873
874         cls.tcp_port_in = 6303
875         cls.tcp_port_out = 6303
876         cls.udp_port_in = 6304
877         cls.udp_port_out = 6304
878         cls.icmp_id_in = 6305
879         cls.icmp_id_out = 6305
880         cls.nat_addr = '10.0.0.3'
881         cls.ipfix_src_port = 4739
882         cls.ipfix_domain_id = 1
883         cls.tcp_external_port = 80
884         cls.udp_external_port = 69
885
886         cls.create_pg_interfaces(range(10))
887         cls.interfaces = list(cls.pg_interfaces[0:4])
888
889         for i in cls.interfaces:
890             i.admin_up()
891             i.config_ip4()
892             i.resolve_arp()
893
894         cls.pg0.generate_remote_hosts(3)
895         cls.pg0.configure_ipv4_neighbors()
896
897         cls.pg1.generate_remote_hosts(1)
898         cls.pg1.configure_ipv4_neighbors()
899
900         cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
901         cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 10})
902         cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 20})
903
904         cls.pg4._local_ip4 = "172.16.255.1"
905         cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
906         cls.pg4.set_table_ip4(10)
907         cls.pg5._local_ip4 = "172.17.255.3"
908         cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
909         cls.pg5.set_table_ip4(10)
910         cls.pg6._local_ip4 = "172.16.255.1"
911         cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
912         cls.pg6.set_table_ip4(20)
913         for i in cls.overlapping_interfaces:
914             i.config_ip4()
915             i.admin_up()
916             i.resolve_arp()
917
918         cls.pg7.admin_up()
919         cls.pg8.admin_up()
920
921         cls.pg9.generate_remote_hosts(2)
922         cls.pg9.config_ip4()
923         cls.vapi.sw_interface_add_del_address(
924             sw_if_index=cls.pg9.sw_if_index,
925             prefix="10.0.0.1/24")
926
927         cls.pg9.admin_up()
928         cls.pg9.resolve_arp()
929         cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
930         cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
931         cls.pg9.resolve_arp()
932
933     def plugin_enable(self):
934         self.vapi.nat44_ei_plugin_enable_disable(
935             sessions=self.max_translations,
936             users=self.max_users, enable=1)
937
938     def setUp(self):
939         super(TestNAT44EI, self).setUp()
940         self.plugin_enable()
941
942     def tearDown(self):
943         super(TestNAT44EI, self).tearDown()
944         if not self.vpp_dead:
945             self.vapi.nat44_ei_ipfix_enable_disable(
946                 domain_id=self.ipfix_domain_id, src_port=self.ipfix_src_port,
947                 enable=0)
948             self.ipfix_src_port = 4739
949             self.ipfix_domain_id = 1
950
951             self.vapi.nat44_ei_plugin_enable_disable(enable=0)
952             self.vapi.cli("clear logging")
953
954     def test_clear_sessions(self):
955         """ NAT44EI session clearing test """
956
957         self.nat44_add_address(self.nat_addr)
958         flags = self.config_flags.NAT44_EI_IF_INSIDE
959         self.vapi.nat44_ei_interface_add_del_feature(
960             sw_if_index=self.pg0.sw_if_index,
961             flags=flags, is_add=1)
962         self.vapi.nat44_ei_interface_add_del_feature(
963             sw_if_index=self.pg1.sw_if_index,
964             is_add=1)
965
966         pkts = self.create_stream_in(self.pg0, self.pg1)
967         self.pg0.add_stream(pkts)
968         self.pg_enable_capture(self.pg_interfaces)
969         self.pg_start()
970         capture = self.pg1.get_capture(len(pkts))
971         self.verify_capture_out(capture)
972
973         sessions = self.statistics['/nat44-ei/total-sessions']
974         self.assertGreater(sessions[:, 0].sum(), 0, "Session count invalid")
975         self.logger.info("sessions before clearing: %s" % sessions[0][0])
976
977         self.vapi.cli("clear nat44 ei sessions")
978
979         sessions = self.statistics['/nat44-ei/total-sessions']
980         self.assertEqual(sessions[:, 0].sum(), 0, "Session count invalid")
981         self.logger.info("sessions after clearing: %s" % sessions[0][0])
982
983     def test_dynamic(self):
984         """ NAT44EI dynamic translation test """
985         self.nat44_add_address(self.nat_addr)
986         flags = self.config_flags.NAT44_EI_IF_INSIDE
987         self.vapi.nat44_ei_interface_add_del_feature(
988             sw_if_index=self.pg0.sw_if_index,
989             flags=flags, is_add=1)
990         self.vapi.nat44_ei_interface_add_del_feature(
991             sw_if_index=self.pg1.sw_if_index,
992             is_add=1)
993
994         # in2out
995         tcpn = self.statistics['/nat44-ei/in2out/slowpath/tcp']
996         udpn = self.statistics['/nat44-ei/in2out/slowpath/udp']
997         icmpn = self.statistics['/nat44-ei/in2out/slowpath/icmp']
998         drops = self.statistics['/nat44-ei/in2out/slowpath/drops']
999
1000         pkts = self.create_stream_in(self.pg0, self.pg1)
1001         self.pg0.add_stream(pkts)
1002         self.pg_enable_capture(self.pg_interfaces)
1003         self.pg_start()
1004         capture = self.pg1.get_capture(len(pkts))
1005         self.verify_capture_out(capture)
1006
1007         if_idx = self.pg0.sw_if_index
1008         cnt = self.statistics['/nat44-ei/in2out/slowpath/tcp']
1009         self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2)
1010         cnt = self.statistics['/nat44-ei/in2out/slowpath/udp']
1011         self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1)
1012         cnt = self.statistics['/nat44-ei/in2out/slowpath/icmp']
1013         self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1)
1014         cnt = self.statistics['/nat44-ei/in2out/slowpath/drops']
1015         self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0)
1016
1017         # out2in
1018         tcpn = self.statistics['/nat44-ei/out2in/slowpath/tcp']
1019         udpn = self.statistics['/nat44-ei/out2in/slowpath/udp']
1020         icmpn = self.statistics['/nat44-ei/out2in/slowpath/icmp']
1021         drops = self.statistics['/nat44-ei/out2in/slowpath/drops']
1022
1023         pkts = self.create_stream_out(self.pg1)
1024         self.pg1.add_stream(pkts)
1025         self.pg_enable_capture(self.pg_interfaces)
1026         self.pg_start()
1027         capture = self.pg0.get_capture(len(pkts))
1028         self.verify_capture_in(capture, self.pg0)
1029
1030         if_idx = self.pg1.sw_if_index
1031         cnt = self.statistics['/nat44-ei/out2in/slowpath/tcp']
1032         self.assertEqual(cnt[:, if_idx].sum() - tcpn[:, if_idx].sum(), 2)
1033         cnt = self.statistics['/nat44-ei/out2in/slowpath/udp']
1034         self.assertEqual(cnt[:, if_idx].sum() - udpn[:, if_idx].sum(), 1)
1035         cnt = self.statistics['/nat44-ei/out2in/slowpath/icmp']
1036         self.assertEqual(cnt[:, if_idx].sum() - icmpn[:, if_idx].sum(), 1)
1037         cnt = self.statistics['/nat44-ei/out2in/slowpath/drops']
1038         self.assertEqual(cnt[:, if_idx].sum() - drops[:, if_idx].sum(), 0)
1039
1040         users = self.statistics['/nat44-ei/total-users']
1041         self.assertEqual(users[:, 0].sum(), 1)
1042         sessions = self.statistics['/nat44-ei/total-sessions']
1043         self.assertEqual(sessions[:, 0].sum(), 3)
1044
1045     def test_dynamic_icmp_errors_in2out_ttl_1(self):
1046         """ NAT44EI handling of client packets with TTL=1 """
1047
1048         self.nat44_add_address(self.nat_addr)
1049         flags = self.config_flags.NAT44_EI_IF_INSIDE
1050         self.vapi.nat44_ei_interface_add_del_feature(
1051             sw_if_index=self.pg0.sw_if_index,
1052             flags=flags, is_add=1)
1053         self.vapi.nat44_ei_interface_add_del_feature(
1054             sw_if_index=self.pg1.sw_if_index,
1055             is_add=1)
1056
1057         # Client side - generate traffic
1058         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
1059         self.pg0.add_stream(pkts)
1060         self.pg_enable_capture(self.pg_interfaces)
1061         self.pg_start()
1062
1063         # Client side - verify ICMP type 11 packets
1064         capture = self.pg0.get_capture(len(pkts))
1065         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1066
1067     def test_dynamic_icmp_errors_out2in_ttl_1(self):
1068         """ NAT44EI handling of server packets with TTL=1 """
1069
1070         self.nat44_add_address(self.nat_addr)
1071         flags = self.config_flags.NAT44_EI_IF_INSIDE
1072         self.vapi.nat44_ei_interface_add_del_feature(
1073             sw_if_index=self.pg0.sw_if_index,
1074             flags=flags, is_add=1)
1075         self.vapi.nat44_ei_interface_add_del_feature(
1076             sw_if_index=self.pg1.sw_if_index,
1077             is_add=1)
1078
1079         # Client side - create sessions
1080         pkts = self.create_stream_in(self.pg0, self.pg1)
1081         self.pg0.add_stream(pkts)
1082         self.pg_enable_capture(self.pg_interfaces)
1083         self.pg_start()
1084
1085         # Server side - generate traffic
1086         capture = self.pg1.get_capture(len(pkts))
1087         self.verify_capture_out(capture)
1088         pkts = self.create_stream_out(self.pg1, ttl=1)
1089         self.pg1.add_stream(pkts)
1090         self.pg_enable_capture(self.pg_interfaces)
1091         self.pg_start()
1092
1093         # Server side - verify ICMP type 11 packets
1094         capture = self.pg1.get_capture(len(pkts))
1095         self.verify_capture_out_with_icmp_errors(capture,
1096                                                  src_ip=self.pg1.local_ip4)
1097
1098     def test_dynamic_icmp_errors_in2out_ttl_2(self):
1099         """ NAT44EI handling of error responses to client packets with TTL=2
1100         """
1101
1102         self.nat44_add_address(self.nat_addr)
1103         flags = self.config_flags.NAT44_EI_IF_INSIDE
1104         self.vapi.nat44_ei_interface_add_del_feature(
1105             sw_if_index=self.pg0.sw_if_index,
1106             flags=flags, is_add=1)
1107         self.vapi.nat44_ei_interface_add_del_feature(
1108             sw_if_index=self.pg1.sw_if_index,
1109             is_add=1)
1110
1111         # Client side - generate traffic
1112         pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
1113         self.pg0.add_stream(pkts)
1114         self.pg_enable_capture(self.pg_interfaces)
1115         self.pg_start()
1116
1117         # Server side - simulate ICMP type 11 response
1118         capture = self.pg1.get_capture(len(pkts))
1119         pkts = [Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1120                 IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
1121                 ICMP(type=11) / packet[IP] for packet in capture]
1122         self.pg1.add_stream(pkts)
1123         self.pg_enable_capture(self.pg_interfaces)
1124         self.pg_start()
1125
1126         # Client side - verify ICMP type 11 packets
1127         capture = self.pg0.get_capture(len(pkts))
1128         self.verify_capture_in_with_icmp_errors(capture, self.pg0)
1129
1130     def test_dynamic_icmp_errors_out2in_ttl_2(self):
1131         """ NAT44EI handling of error responses to server packets with TTL=2
1132         """
1133
1134         self.nat44_add_address(self.nat_addr)
1135         flags = self.config_flags.NAT44_EI_IF_INSIDE
1136         self.vapi.nat44_ei_interface_add_del_feature(
1137             sw_if_index=self.pg0.sw_if_index,
1138             flags=flags, is_add=1)
1139         self.vapi.nat44_ei_interface_add_del_feature(
1140             sw_if_index=self.pg1.sw_if_index,
1141             is_add=1)
1142
1143         # Client side - create sessions
1144         pkts = self.create_stream_in(self.pg0, self.pg1)
1145         self.pg0.add_stream(pkts)
1146         self.pg_enable_capture(self.pg_interfaces)
1147         self.pg_start()
1148
1149         # Server side - generate traffic
1150         capture = self.pg1.get_capture(len(pkts))
1151         self.verify_capture_out(capture)
1152         pkts = self.create_stream_out(self.pg1, ttl=2)
1153         self.pg1.add_stream(pkts)
1154         self.pg_enable_capture(self.pg_interfaces)
1155         self.pg_start()
1156
1157         # Client side - simulate ICMP type 11 response
1158         capture = self.pg0.get_capture(len(pkts))
1159         pkts = [Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1160                 IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
1161                 ICMP(type=11) / packet[IP] for packet in capture]
1162         self.pg0.add_stream(pkts)
1163         self.pg_enable_capture(self.pg_interfaces)
1164         self.pg_start()
1165
1166         # Server side - verify ICMP type 11 packets
1167         capture = self.pg1.get_capture(len(pkts))
1168         self.verify_capture_out_with_icmp_errors(capture)
1169
1170     def test_ping_out_interface_from_outside(self):
1171         """ NAT44EI ping out interface from outside network """
1172
1173         self.nat44_add_address(self.nat_addr)
1174         flags = self.config_flags.NAT44_EI_IF_INSIDE
1175         self.vapi.nat44_ei_interface_add_del_feature(
1176             sw_if_index=self.pg0.sw_if_index,
1177             flags=flags, is_add=1)
1178         self.vapi.nat44_ei_interface_add_del_feature(
1179             sw_if_index=self.pg1.sw_if_index,
1180             is_add=1)
1181
1182         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1183              IP(src=self.pg1.remote_ip4, dst=self.pg1.local_ip4) /
1184              ICMP(id=self.icmp_id_out, type='echo-request'))
1185         pkts = [p]
1186         self.pg1.add_stream(pkts)
1187         self.pg_enable_capture(self.pg_interfaces)
1188         self.pg_start()
1189         capture = self.pg1.get_capture(len(pkts))
1190         packet = capture[0]
1191         try:
1192             self.assertEqual(packet[IP].src, self.pg1.local_ip4)
1193             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
1194             self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1195             self.assertEqual(packet[ICMP].type, 0)  # echo reply
1196         except:
1197             self.logger.error(ppp("Unexpected or invalid packet "
1198                                   "(outside network):", packet))
1199             raise
1200
1201     def test_ping_internal_host_from_outside(self):
1202         """ NAT44EI ping internal host from outside network """
1203
1204         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr)
1205         flags = self.config_flags.NAT44_EI_IF_INSIDE
1206         self.vapi.nat44_ei_interface_add_del_feature(
1207             sw_if_index=self.pg0.sw_if_index,
1208             flags=flags, is_add=1)
1209         self.vapi.nat44_ei_interface_add_del_feature(
1210             sw_if_index=self.pg1.sw_if_index,
1211             is_add=1)
1212
1213         # out2in
1214         pkt = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
1215                IP(src=self.pg1.remote_ip4, dst=self.nat_addr, ttl=64) /
1216                ICMP(id=self.icmp_id_out, type='echo-request'))
1217         self.pg1.add_stream(pkt)
1218         self.pg_enable_capture(self.pg_interfaces)
1219         self.pg_start()
1220         capture = self.pg0.get_capture(1)
1221         self.verify_capture_in(capture, self.pg0)
1222         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1223
1224         # in2out
1225         pkt = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1226                IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4, ttl=64) /
1227                ICMP(id=self.icmp_id_in, type='echo-reply'))
1228         self.pg0.add_stream(pkt)
1229         self.pg_enable_capture(self.pg_interfaces)
1230         self.pg_start()
1231         capture = self.pg1.get_capture(1)
1232         self.verify_capture_out(capture, same_port=True)
1233         self.assert_equal(capture[0][IP].proto, IP_PROTOS.icmp)
1234
1235     def test_forwarding(self):
1236         """ NAT44EI forwarding test """
1237
1238         flags = self.config_flags.NAT44_EI_IF_INSIDE
1239         self.vapi.nat44_ei_interface_add_del_feature(
1240             sw_if_index=self.pg0.sw_if_index,
1241             flags=flags, is_add=1)
1242         self.vapi.nat44_ei_interface_add_del_feature(
1243             sw_if_index=self.pg1.sw_if_index,
1244             is_add=1)
1245         self.vapi.nat44_ei_forwarding_enable_disable(enable=1)
1246
1247         real_ip = self.pg0.remote_ip4
1248         alias_ip = self.nat_addr
1249         flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
1250         self.vapi.nat44_ei_add_del_static_mapping(
1251             is_add=1, local_ip_address=real_ip,
1252             external_ip_address=alias_ip,
1253             external_sw_if_index=0xFFFFFFFF,
1254             flags=flags)
1255
1256         try:
1257             # static mapping match
1258
1259             pkts = self.create_stream_out(self.pg1)
1260             self.pg1.add_stream(pkts)
1261             self.pg_enable_capture(self.pg_interfaces)
1262             self.pg_start()
1263             capture = self.pg0.get_capture(len(pkts))
1264             self.verify_capture_in(capture, self.pg0)
1265
1266             pkts = self.create_stream_in(self.pg0, self.pg1)
1267             self.pg0.add_stream(pkts)
1268             self.pg_enable_capture(self.pg_interfaces)
1269             self.pg_start()
1270             capture = self.pg1.get_capture(len(pkts))
1271             self.verify_capture_out(capture, same_port=True)
1272
1273             # no static mapping match
1274
1275             host0 = self.pg0.remote_hosts[0]
1276             self.pg0.remote_hosts[0] = self.pg0.remote_hosts[1]
1277             try:
1278                 pkts = self.create_stream_out(self.pg1,
1279                                               dst_ip=self.pg0.remote_ip4,
1280                                               use_inside_ports=True)
1281                 self.pg1.add_stream(pkts)
1282                 self.pg_enable_capture(self.pg_interfaces)
1283                 self.pg_start()
1284                 capture = self.pg0.get_capture(len(pkts))
1285                 self.verify_capture_in(capture, self.pg0)
1286
1287                 pkts = self.create_stream_in(self.pg0, self.pg1)
1288                 self.pg0.add_stream(pkts)
1289                 self.pg_enable_capture(self.pg_interfaces)
1290                 self.pg_start()
1291                 capture = self.pg1.get_capture(len(pkts))
1292                 self.verify_capture_out(capture, nat_ip=self.pg0.remote_ip4,
1293                                         same_port=True)
1294             finally:
1295                 self.pg0.remote_hosts[0] = host0
1296
1297         finally:
1298             self.vapi.nat44_ei_forwarding_enable_disable(enable=0)
1299             flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
1300             self.vapi.nat44_ei_add_del_static_mapping(
1301                 is_add=0,
1302                 local_ip_address=real_ip,
1303                 external_ip_address=alias_ip,
1304                 external_sw_if_index=0xFFFFFFFF,
1305                 flags=flags)
1306
1307     def test_static_in(self):
1308         """ NAT44EI 1:1 NAT initialized from inside network """
1309
1310         nat_ip = "10.0.0.10"
1311         self.tcp_port_out = 6303
1312         self.udp_port_out = 6304
1313         self.icmp_id_out = 6305
1314
1315         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1316         flags = self.config_flags.NAT44_EI_IF_INSIDE
1317         self.vapi.nat44_ei_interface_add_del_feature(
1318             sw_if_index=self.pg0.sw_if_index,
1319             flags=flags, is_add=1)
1320         self.vapi.nat44_ei_interface_add_del_feature(
1321             sw_if_index=self.pg1.sw_if_index,
1322             is_add=1)
1323         sm = self.vapi.nat44_ei_static_mapping_dump()
1324         self.assertEqual(len(sm), 1)
1325         self.assertEqual(sm[0].tag, '')
1326         self.assertEqual(sm[0].protocol, 0)
1327         self.assertEqual(sm[0].local_port, 0)
1328         self.assertEqual(sm[0].external_port, 0)
1329
1330         # in2out
1331         pkts = self.create_stream_in(self.pg0, self.pg1)
1332         self.pg0.add_stream(pkts)
1333         self.pg_enable_capture(self.pg_interfaces)
1334         self.pg_start()
1335         capture = self.pg1.get_capture(len(pkts))
1336         self.verify_capture_out(capture, nat_ip, True)
1337
1338         # out2in
1339         pkts = self.create_stream_out(self.pg1, nat_ip)
1340         self.pg1.add_stream(pkts)
1341         self.pg_enable_capture(self.pg_interfaces)
1342         self.pg_start()
1343         capture = self.pg0.get_capture(len(pkts))
1344         self.verify_capture_in(capture, self.pg0)
1345
1346     def test_static_out(self):
1347         """ NAT44EI 1:1 NAT initialized from outside network """
1348
1349         nat_ip = "10.0.0.20"
1350         self.tcp_port_out = 6303
1351         self.udp_port_out = 6304
1352         self.icmp_id_out = 6305
1353         tag = "testTAG"
1354
1355         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip, tag=tag)
1356         flags = self.config_flags.NAT44_EI_IF_INSIDE
1357         self.vapi.nat44_ei_interface_add_del_feature(
1358             sw_if_index=self.pg0.sw_if_index,
1359             flags=flags, is_add=1)
1360         self.vapi.nat44_ei_interface_add_del_feature(
1361             sw_if_index=self.pg1.sw_if_index,
1362             is_add=1)
1363         sm = self.vapi.nat44_ei_static_mapping_dump()
1364         self.assertEqual(len(sm), 1)
1365         self.assertEqual(sm[0].tag, tag)
1366
1367         # out2in
1368         pkts = self.create_stream_out(self.pg1, nat_ip)
1369         self.pg1.add_stream(pkts)
1370         self.pg_enable_capture(self.pg_interfaces)
1371         self.pg_start()
1372         capture = self.pg0.get_capture(len(pkts))
1373         self.verify_capture_in(capture, self.pg0)
1374
1375         # in2out
1376         pkts = self.create_stream_in(self.pg0, self.pg1)
1377         self.pg0.add_stream(pkts)
1378         self.pg_enable_capture(self.pg_interfaces)
1379         self.pg_start()
1380         capture = self.pg1.get_capture(len(pkts))
1381         self.verify_capture_out(capture, nat_ip, True)
1382
1383     def test_static_with_port_in(self):
1384         """ NAT44EI 1:1 NAPT initialized from inside network """
1385
1386         self.tcp_port_out = 3606
1387         self.udp_port_out = 3607
1388         self.icmp_id_out = 3608
1389
1390         self.nat44_add_address(self.nat_addr)
1391         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1392                                       self.tcp_port_in, self.tcp_port_out,
1393                                       proto=IP_PROTOS.tcp)
1394         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1395                                       self.udp_port_in, self.udp_port_out,
1396                                       proto=IP_PROTOS.udp)
1397         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1398                                       self.icmp_id_in, self.icmp_id_out,
1399                                       proto=IP_PROTOS.icmp)
1400         flags = self.config_flags.NAT44_EI_IF_INSIDE
1401         self.vapi.nat44_ei_interface_add_del_feature(
1402             sw_if_index=self.pg0.sw_if_index,
1403             flags=flags, is_add=1)
1404         self.vapi.nat44_ei_interface_add_del_feature(
1405             sw_if_index=self.pg1.sw_if_index,
1406             is_add=1)
1407
1408         # in2out
1409         pkts = self.create_stream_in(self.pg0, self.pg1)
1410         self.pg0.add_stream(pkts)
1411         self.pg_enable_capture(self.pg_interfaces)
1412         self.pg_start()
1413         capture = self.pg1.get_capture(len(pkts))
1414         self.verify_capture_out(capture)
1415
1416         # out2in
1417         pkts = self.create_stream_out(self.pg1)
1418         self.pg1.add_stream(pkts)
1419         self.pg_enable_capture(self.pg_interfaces)
1420         self.pg_start()
1421         capture = self.pg0.get_capture(len(pkts))
1422         self.verify_capture_in(capture, self.pg0)
1423
1424     def test_static_with_port_out(self):
1425         """ NAT44EI 1:1 NAPT initialized from outside network """
1426
1427         self.tcp_port_out = 30606
1428         self.udp_port_out = 30607
1429         self.icmp_id_out = 30608
1430
1431         self.nat44_add_address(self.nat_addr)
1432         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1433                                       self.tcp_port_in, self.tcp_port_out,
1434                                       proto=IP_PROTOS.tcp)
1435         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1436                                       self.udp_port_in, self.udp_port_out,
1437                                       proto=IP_PROTOS.udp)
1438         self.nat44_add_static_mapping(self.pg0.remote_ip4, self.nat_addr,
1439                                       self.icmp_id_in, self.icmp_id_out,
1440                                       proto=IP_PROTOS.icmp)
1441         flags = self.config_flags.NAT44_EI_IF_INSIDE
1442         self.vapi.nat44_ei_interface_add_del_feature(
1443             sw_if_index=self.pg0.sw_if_index,
1444             flags=flags, is_add=1)
1445         self.vapi.nat44_ei_interface_add_del_feature(
1446             sw_if_index=self.pg1.sw_if_index,
1447             is_add=1)
1448
1449         # out2in
1450         pkts = self.create_stream_out(self.pg1)
1451         self.pg1.add_stream(pkts)
1452         self.pg_enable_capture(self.pg_interfaces)
1453         self.pg_start()
1454         capture = self.pg0.get_capture(len(pkts))
1455         self.verify_capture_in(capture, self.pg0)
1456
1457         # in2out
1458         pkts = self.create_stream_in(self.pg0, self.pg1)
1459         self.pg0.add_stream(pkts)
1460         self.pg_enable_capture(self.pg_interfaces)
1461         self.pg_start()
1462         capture = self.pg1.get_capture(len(pkts))
1463         self.verify_capture_out(capture)
1464
1465     def test_static_vrf_aware(self):
1466         """ NAT44EI 1:1 NAT VRF awareness """
1467
1468         nat_ip1 = "10.0.0.30"
1469         nat_ip2 = "10.0.0.40"
1470         self.tcp_port_out = 6303
1471         self.udp_port_out = 6304
1472         self.icmp_id_out = 6305
1473
1474         self.nat44_add_static_mapping(self.pg4.remote_ip4, nat_ip1,
1475                                       vrf_id=10)
1476         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip2,
1477                                       vrf_id=10)
1478         flags = self.config_flags.NAT44_EI_IF_INSIDE
1479         self.vapi.nat44_ei_interface_add_del_feature(
1480             sw_if_index=self.pg3.sw_if_index,
1481             is_add=1)
1482         self.vapi.nat44_ei_interface_add_del_feature(
1483             sw_if_index=self.pg0.sw_if_index,
1484             flags=flags, is_add=1)
1485         self.vapi.nat44_ei_interface_add_del_feature(
1486             sw_if_index=self.pg4.sw_if_index,
1487             flags=flags, is_add=1)
1488
1489         # inside interface VRF match NAT44EI static mapping VRF
1490         pkts = self.create_stream_in(self.pg4, self.pg3)
1491         self.pg4.add_stream(pkts)
1492         self.pg_enable_capture(self.pg_interfaces)
1493         self.pg_start()
1494         capture = self.pg3.get_capture(len(pkts))
1495         self.verify_capture_out(capture, nat_ip1, True)
1496
1497         # inside interface VRF don't match NAT44EI static mapping VRF (packets
1498         # are dropped)
1499         pkts = self.create_stream_in(self.pg0, self.pg3)
1500         self.pg0.add_stream(pkts)
1501         self.pg_enable_capture(self.pg_interfaces)
1502         self.pg_start()
1503         self.pg3.assert_nothing_captured()
1504
1505     def test_dynamic_to_static(self):
1506         """ NAT44EI Switch from dynamic translation to 1:1NAT """
1507         nat_ip = "10.0.0.10"
1508         self.tcp_port_out = 6303
1509         self.udp_port_out = 6304
1510         self.icmp_id_out = 6305
1511
1512         self.nat44_add_address(self.nat_addr)
1513         flags = self.config_flags.NAT44_EI_IF_INSIDE
1514         self.vapi.nat44_ei_interface_add_del_feature(
1515             sw_if_index=self.pg0.sw_if_index,
1516             flags=flags, is_add=1)
1517         self.vapi.nat44_ei_interface_add_del_feature(
1518             sw_if_index=self.pg1.sw_if_index,
1519             is_add=1)
1520
1521         # dynamic
1522         pkts = self.create_stream_in(self.pg0, self.pg1)
1523         self.pg0.add_stream(pkts)
1524         self.pg_enable_capture(self.pg_interfaces)
1525         self.pg_start()
1526         capture = self.pg1.get_capture(len(pkts))
1527         self.verify_capture_out(capture)
1528
1529         # 1:1NAT
1530         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
1531         sessions = self.vapi.nat44_ei_user_session_dump(self.pg0.remote_ip4, 0)
1532         self.assertEqual(len(sessions), 0)
1533         pkts = self.create_stream_in(self.pg0, self.pg1)
1534         self.pg0.add_stream(pkts)
1535         self.pg_enable_capture(self.pg_interfaces)
1536         self.pg_start()
1537         capture = self.pg1.get_capture(len(pkts))
1538         self.verify_capture_out(capture, nat_ip, True)
1539
1540     def test_identity_nat(self):
1541         """ NAT44EI Identity NAT """
1542         flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
1543         self.vapi.nat44_ei_add_del_identity_mapping(
1544             ip_address=self.pg0.remote_ip4, sw_if_index=0xFFFFFFFF,
1545             flags=flags, is_add=1)
1546         flags = self.config_flags.NAT44_EI_IF_INSIDE
1547         self.vapi.nat44_ei_interface_add_del_feature(
1548             sw_if_index=self.pg0.sw_if_index,
1549             flags=flags, is_add=1)
1550         self.vapi.nat44_ei_interface_add_del_feature(
1551             sw_if_index=self.pg1.sw_if_index,
1552             is_add=1)
1553
1554         p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
1555              IP(src=self.pg1.remote_ip4, dst=self.pg0.remote_ip4) /
1556              TCP(sport=12345, dport=56789))
1557         self.pg1.add_stream(p)
1558         self.pg_enable_capture(self.pg_interfaces)
1559         self.pg_start()
1560         capture = self.pg0.get_capture(1)
1561         p = capture[0]
1562         try:
1563             ip = p[IP]
1564             tcp = p[TCP]
1565             self.assertEqual(ip.dst, self.pg0.remote_ip4)
1566             self.assertEqual(ip.src, self.pg1.remote_ip4)
1567             self.assertEqual(tcp.dport, 56789)
1568             self.assertEqual(tcp.sport, 12345)
1569             self.assert_packet_checksums_valid(p)
1570         except:
1571             self.logger.error(ppp("Unexpected or invalid packet:", p))
1572             raise
1573
1574         sessions = self.vapi.nat44_ei_user_session_dump(self.pg0.remote_ip4, 0)
1575         self.assertEqual(len(sessions), 0)
1576         flags = self.config_flags.NAT44_EI_ADDR_ONLY_MAPPING
1577         self.vapi.nat44_ei_add_del_identity_mapping(
1578             ip_address=self.pg0.remote_ip4, sw_if_index=0xFFFFFFFF,
1579             flags=flags, vrf_id=1, is_add=1)
1580         identity_mappings = self.vapi.nat44_ei_identity_mapping_dump()
1581         self.assertEqual(len(identity_mappings), 2)
1582
1583     def test_multiple_inside_interfaces(self):
1584         """ NAT44EI multiple non-overlapping address space inside interfaces
1585         """
1586
1587         self.nat44_add_address(self.nat_addr)
1588         flags = self.config_flags.NAT44_EI_IF_INSIDE
1589         self.vapi.nat44_ei_interface_add_del_feature(
1590             sw_if_index=self.pg0.sw_if_index,
1591             flags=flags, is_add=1)
1592         self.vapi.nat44_ei_interface_add_del_feature(
1593             sw_if_index=self.pg1.sw_if_index,
1594             flags=flags, is_add=1)
1595         self.vapi.nat44_ei_interface_add_del_feature(
1596             sw_if_index=self.pg3.sw_if_index,
1597             is_add=1)
1598
1599         # between two NAT44EI inside interfaces (no translation)
1600         pkts = self.create_stream_in(self.pg0, self.pg1)
1601         self.pg0.add_stream(pkts)
1602         self.pg_enable_capture(self.pg_interfaces)
1603         self.pg_start()
1604         capture = self.pg1.get_capture(len(pkts))
1605         self.verify_capture_no_translation(capture, self.pg0, self.pg1)
1606
1607         # from inside to interface without translation
1608         pkts = self.create_stream_in(self.pg0, self.pg2)
1609         self.pg0.add_stream(pkts)
1610         self.pg_enable_capture(self.pg_interfaces)
1611         self.pg_start()
1612         capture = self.pg2.get_capture(len(pkts))
1613         self.verify_capture_no_translation(capture, self.pg0, self.pg2)
1614
1615         # in2out 1st interface
1616         pkts = self.create_stream_in(self.pg0, self.pg3)
1617         self.pg0.add_stream(pkts)
1618         self.pg_enable_capture(self.pg_interfaces)
1619         self.pg_start()
1620         capture = self.pg3.get_capture(len(pkts))
1621         self.verify_capture_out(capture)
1622
1623         # out2in 1st interface
1624         pkts = self.create_stream_out(self.pg3)
1625         self.pg3.add_stream(pkts)
1626         self.pg_enable_capture(self.pg_interfaces)
1627         self.pg_start()
1628         capture = self.pg0.get_capture(len(pkts))
1629         self.verify_capture_in(capture, self.pg0)
1630
1631         # in2out 2nd interface
1632         pkts = self.create_stream_in(self.pg1, self.pg3)
1633         self.pg1.add_stream(pkts)
1634         self.pg_enable_capture(self.pg_interfaces)
1635         self.pg_start()
1636         capture = self.pg3.get_capture(len(pkts))
1637         self.verify_capture_out(capture)
1638
1639         # out2in 2nd interface
1640         pkts = self.create_stream_out(self.pg3)
1641         self.pg3.add_stream(pkts)
1642         self.pg_enable_capture(self.pg_interfaces)
1643         self.pg_start()
1644         capture = self.pg1.get_capture(len(pkts))
1645         self.verify_capture_in(capture, self.pg1)
1646
1647     def test_inside_overlapping_interfaces(self):
1648         """ NAT44EI multiple inside interfaces with overlapping address space
1649         """
1650
1651         static_nat_ip = "10.0.0.10"
1652         self.nat44_add_address(self.nat_addr)
1653         flags = self.config_flags.NAT44_EI_IF_INSIDE
1654         self.vapi.nat44_ei_interface_add_del_feature(
1655             sw_if_index=self.pg3.sw_if_index,
1656             is_add=1)
1657         self.vapi.nat44_ei_interface_add_del_feature(
1658             sw_if_index=self.pg4.sw_if_index,
1659             flags=flags, is_add=1)
1660         self.vapi.nat44_ei_interface_add_del_feature(
1661             sw_if_index=self.pg5.sw_if_index,
1662             flags=flags, is_add=1)
1663         self.vapi.nat44_ei_interface_add_del_feature(
1664             sw_if_index=self.pg6.sw_if_index,
1665             flags=flags, is_add=1)
1666         self.nat44_add_static_mapping(self.pg6.remote_ip4, static_nat_ip,
1667                                       vrf_id=20)
1668
1669         # between NAT44EI inside interfaces with same VRF (no translation)
1670         pkts = self.create_stream_in(self.pg4, self.pg5)
1671         self.pg4.add_stream(pkts)
1672         self.pg_enable_capture(self.pg_interfaces)
1673         self.pg_start()
1674         capture = self.pg5.get_capture(len(pkts))
1675         self.verify_capture_no_translation(capture, self.pg4, self.pg5)
1676
1677         # between NAT44EI inside interfaces with different VRF (hairpinning)
1678         p = (Ether(src=self.pg4.remote_mac, dst=self.pg4.local_mac) /
1679              IP(src=self.pg4.remote_ip4, dst=static_nat_ip) /
1680              TCP(sport=1234, dport=5678))
1681         self.pg4.add_stream(p)
1682         self.pg_enable_capture(self.pg_interfaces)
1683         self.pg_start()
1684         capture = self.pg6.get_capture(1)
1685         p = capture[0]
1686         try:
1687             ip = p[IP]
1688             tcp = p[TCP]
1689             self.assertEqual(ip.src, self.nat_addr)
1690             self.assertEqual(ip.dst, self.pg6.remote_ip4)
1691             self.assertNotEqual(tcp.sport, 1234)
1692             self.assertEqual(tcp.dport, 5678)
1693         except:
1694             self.logger.error(ppp("Unexpected or invalid packet:", p))
1695             raise
1696
1697         # in2out 1st interface
1698         pkts = self.create_stream_in(self.pg4, self.pg3)
1699         self.pg4.add_stream(pkts)
1700         self.pg_enable_capture(self.pg_interfaces)
1701         self.pg_start()
1702         capture = self.pg3.get_capture(len(pkts))
1703         self.verify_capture_out(capture)
1704
1705         # out2in 1st interface
1706         pkts = self.create_stream_out(self.pg3)
1707         self.pg3.add_stream(pkts)
1708         self.pg_enable_capture(self.pg_interfaces)
1709         self.pg_start()
1710         capture = self.pg4.get_capture(len(pkts))
1711         self.verify_capture_in(capture, self.pg4)
1712
1713         # in2out 2nd interface
1714         pkts = self.create_stream_in(self.pg5, self.pg3)
1715         self.pg5.add_stream(pkts)
1716         self.pg_enable_capture(self.pg_interfaces)
1717         self.pg_start()
1718         capture = self.pg3.get_capture(len(pkts))
1719         self.verify_capture_out(capture)
1720
1721         # out2in 2nd interface
1722         pkts = self.create_stream_out(self.pg3)
1723         self.pg3.add_stream(pkts)
1724         self.pg_enable_capture(self.pg_interfaces)
1725         self.pg_start()
1726         capture = self.pg5.get_capture(len(pkts))
1727         self.verify_capture_in(capture, self.pg5)
1728
1729         # pg5 session dump
1730         addresses = self.vapi.nat44_ei_address_dump()
1731         self.assertEqual(len(addresses), 1)
1732         sessions = self.vapi.nat44_ei_user_session_dump(
1733             self.pg5.remote_ip4, 10)
1734         self.assertEqual(len(sessions), 3)
1735         for session in sessions:
1736             self.assertFalse(session.flags &
1737                              self.config_flags.NAT44_EI_STATIC_MAPPING)
1738             self.assertEqual(str(session.inside_ip_address),
1739                              self.pg5.remote_ip4)
1740             self.assertEqual(session.outside_ip_address,
1741                              addresses[0].ip_address)
1742         self.assertEqual(sessions[0].protocol, IP_PROTOS.tcp)
1743         self.assertEqual(sessions[1].protocol, IP_PROTOS.udp)
1744         self.assertEqual(sessions[2].protocol, IP_PROTOS.icmp)
1745         self.assertEqual(sessions[0].inside_port, self.tcp_port_in)
1746         self.assertEqual(sessions[1].inside_port, self.udp_port_in)
1747         self.assertEqual(sessions[2].inside_port, self.icmp_id_in)
1748         self.assertEqual(sessions[0].outside_port, self.tcp_port_out)
1749         self.assertEqual(sessions[1].outside_port, self.udp_port_out)
1750         self.assertEqual(sessions[2].outside_port, self.icmp_id_out)
1751
1752         # in2out 3rd interface
1753         pkts = self.create_stream_in(self.pg6, self.pg3)
1754         self.pg6.add_stream(pkts)
1755         self.pg_enable_capture(self.pg_interfaces)
1756         self.pg_start()
1757         capture = self.pg3.get_capture(len(pkts))
1758         self.verify_capture_out(capture, static_nat_ip, True)
1759
1760         # out2in 3rd interface
1761         pkts = self.create_stream_out(self.pg3, static_nat_ip)
1762         self.pg3.add_stream(pkts)
1763         self.pg_enable_capture(self.pg_interfaces)
1764         self.pg_start()
1765         capture = self.pg6.get_capture(len(pkts))
1766         self.verify_capture_in(capture, self.pg6)
1767
1768         # general user and session dump verifications
1769         users = self.vapi.nat44_ei_user_dump()
1770         self.assertGreaterEqual(len(users), 3)
1771         addresses = self.vapi.nat44_ei_address_dump()
1772         self.assertEqual(len(addresses), 1)
1773         for user in users:
1774             sessions = self.vapi.nat44_ei_user_session_dump(user.ip_address,
1775                                                             user.vrf_id)
1776             for session in sessions:
1777                 self.assertEqual(user.ip_address, session.inside_ip_address)
1778                 self.assertTrue(session.total_bytes > session.total_pkts > 0)
1779                 self.assertTrue(session.protocol in
1780                                 [IP_PROTOS.tcp, IP_PROTOS.udp,
1781                                  IP_PROTOS.icmp])
1782
1783         # pg4 session dump
1784         sessions = self.vapi.nat44_ei_user_session_dump(
1785             self.pg4.remote_ip4, 10)
1786         self.assertGreaterEqual(len(sessions), 4)
1787         for session in sessions:
1788             self.assertFalse(
1789                 session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING)
1790             self.assertEqual(str(session.inside_ip_address),
1791                              self.pg4.remote_ip4)
1792             self.assertEqual(session.outside_ip_address,
1793                              addresses[0].ip_address)
1794
1795         # pg6 session dump
1796         sessions = self.vapi.nat44_ei_user_session_dump(
1797             self.pg6.remote_ip4, 20)
1798         self.assertGreaterEqual(len(sessions), 3)
1799         for session in sessions:
1800             self.assertTrue(
1801                 session.flags & self.config_flags.NAT44_EI_STATIC_MAPPING)
1802             self.assertEqual(str(session.inside_ip_address),
1803                              self.pg6.remote_ip4)
1804             self.assertEqual(str(session.outside_ip_address),
1805                              static_nat_ip)
1806             self.assertTrue(session.inside_port in
1807                             [self.tcp_port_in, self.udp_port_in,
1808                              self.icmp_id_in])
1809
1810     def test_hairpinning(self):
1811         """ NAT44EI hairpinning - 1:1 NAPT """
1812
1813         host = self.pg0.remote_hosts[0]
1814         server = self.pg0.remote_hosts[1]
1815         host_in_port = 1234
1816         host_out_port = 0
1817         server_in_port = 5678
1818         server_out_port = 8765
1819
1820         self.nat44_add_address(self.nat_addr)
1821         flags = self.config_flags.NAT44_EI_IF_INSIDE
1822         self.vapi.nat44_ei_interface_add_del_feature(
1823             sw_if_index=self.pg0.sw_if_index,
1824             flags=flags, is_add=1)
1825         self.vapi.nat44_ei_interface_add_del_feature(
1826             sw_if_index=self.pg1.sw_if_index,
1827             is_add=1)
1828
1829         # add static mapping for server
1830         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
1831                                       server_in_port, server_out_port,
1832                                       proto=IP_PROTOS.tcp)
1833
1834         cnt = self.statistics['/nat44-ei/hairpinning']
1835         # send packet from host to server
1836         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
1837              IP(src=host.ip4, dst=self.nat_addr) /
1838              TCP(sport=host_in_port, dport=server_out_port))
1839         self.pg0.add_stream(p)
1840         self.pg_enable_capture(self.pg_interfaces)
1841         self.pg_start()
1842         capture = self.pg0.get_capture(1)
1843         p = capture[0]
1844         try:
1845             ip = p[IP]
1846             tcp = p[TCP]
1847             self.assertEqual(ip.src, self.nat_addr)
1848             self.assertEqual(ip.dst, server.ip4)
1849             self.assertNotEqual(tcp.sport, host_in_port)
1850             self.assertEqual(tcp.dport, server_in_port)
1851             self.assert_packet_checksums_valid(p)
1852             host_out_port = tcp.sport
1853         except:
1854             self.logger.error(ppp("Unexpected or invalid packet:", p))
1855             raise
1856
1857         after = self.statistics['/nat44-ei/hairpinning']
1858         if_idx = self.pg0.sw_if_index
1859         self.assertEqual(after[:, if_idx].sum() - cnt[:, if_idx].sum(), 1)
1860
1861         # send reply from server to host
1862         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
1863              IP(src=server.ip4, dst=self.nat_addr) /
1864              TCP(sport=server_in_port, dport=host_out_port))
1865         self.pg0.add_stream(p)
1866         self.pg_enable_capture(self.pg_interfaces)
1867         self.pg_start()
1868         capture = self.pg0.get_capture(1)
1869         p = capture[0]
1870         try:
1871             ip = p[IP]
1872             tcp = p[TCP]
1873             self.assertEqual(ip.src, self.nat_addr)
1874             self.assertEqual(ip.dst, host.ip4)
1875             self.assertEqual(tcp.sport, server_out_port)
1876             self.assertEqual(tcp.dport, host_in_port)
1877             self.assert_packet_checksums_valid(p)
1878         except:
1879             self.logger.error(ppp("Unexpected or invalid packet:", p))
1880             raise
1881
1882         after = self.statistics['/nat44-ei/hairpinning']
1883         if_idx = self.pg0.sw_if_index
1884         self.assertEqual(after[:, if_idx].sum() - cnt[:, if_idx].sum(),
1885                          2+(1 if self.vpp_worker_count > 0 else 0))
1886
1887     def test_hairpinning2(self):
1888         """ NAT44EI hairpinning - 1:1 NAT"""
1889
1890         server1_nat_ip = "10.0.0.10"
1891         server2_nat_ip = "10.0.0.11"
1892         host = self.pg0.remote_hosts[0]
1893         server1 = self.pg0.remote_hosts[1]
1894         server2 = self.pg0.remote_hosts[2]
1895         server_tcp_port = 22
1896         server_udp_port = 20
1897
1898         self.nat44_add_address(self.nat_addr)
1899         flags = self.config_flags.NAT44_EI_IF_INSIDE
1900         self.vapi.nat44_ei_interface_add_del_feature(
1901             sw_if_index=self.pg0.sw_if_index,
1902             flags=flags, is_add=1)
1903         self.vapi.nat44_ei_interface_add_del_feature(
1904             sw_if_index=self.pg1.sw_if_index,
1905             is_add=1)
1906
1907         # add static mapping for servers
1908         self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
1909         self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
1910
1911         # host to server1
1912         pkts = []
1913         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1914              IP(src=host.ip4, dst=server1_nat_ip) /
1915              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1916         pkts.append(p)
1917         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1918              IP(src=host.ip4, dst=server1_nat_ip) /
1919              UDP(sport=self.udp_port_in, dport=server_udp_port))
1920         pkts.append(p)
1921         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1922              IP(src=host.ip4, dst=server1_nat_ip) /
1923              ICMP(id=self.icmp_id_in, type='echo-request'))
1924         pkts.append(p)
1925         self.pg0.add_stream(pkts)
1926         self.pg_enable_capture(self.pg_interfaces)
1927         self.pg_start()
1928         capture = self.pg0.get_capture(len(pkts))
1929         for packet in capture:
1930             try:
1931                 self.assertEqual(packet[IP].src, self.nat_addr)
1932                 self.assertEqual(packet[IP].dst, server1.ip4)
1933                 if packet.haslayer(TCP):
1934                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
1935                     self.assertEqual(packet[TCP].dport, server_tcp_port)
1936                     self.tcp_port_out = packet[TCP].sport
1937                     self.assert_packet_checksums_valid(packet)
1938                 elif packet.haslayer(UDP):
1939                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
1940                     self.assertEqual(packet[UDP].dport, server_udp_port)
1941                     self.udp_port_out = packet[UDP].sport
1942                 else:
1943                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
1944                     self.icmp_id_out = packet[ICMP].id
1945             except:
1946                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1947                 raise
1948
1949         # server1 to host
1950         pkts = []
1951         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1952              IP(src=server1.ip4, dst=self.nat_addr) /
1953              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
1954         pkts.append(p)
1955         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1956              IP(src=server1.ip4, dst=self.nat_addr) /
1957              UDP(sport=server_udp_port, dport=self.udp_port_out))
1958         pkts.append(p)
1959         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1960              IP(src=server1.ip4, dst=self.nat_addr) /
1961              ICMP(id=self.icmp_id_out, type='echo-reply'))
1962         pkts.append(p)
1963         self.pg0.add_stream(pkts)
1964         self.pg_enable_capture(self.pg_interfaces)
1965         self.pg_start()
1966         capture = self.pg0.get_capture(len(pkts))
1967         for packet in capture:
1968             try:
1969                 self.assertEqual(packet[IP].src, server1_nat_ip)
1970                 self.assertEqual(packet[IP].dst, host.ip4)
1971                 if packet.haslayer(TCP):
1972                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
1973                     self.assertEqual(packet[TCP].sport, server_tcp_port)
1974                     self.assert_packet_checksums_valid(packet)
1975                 elif packet.haslayer(UDP):
1976                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
1977                     self.assertEqual(packet[UDP].sport, server_udp_port)
1978                 else:
1979                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
1980             except:
1981                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
1982                 raise
1983
1984         # server2 to server1
1985         pkts = []
1986         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1987              IP(src=server2.ip4, dst=server1_nat_ip) /
1988              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
1989         pkts.append(p)
1990         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1991              IP(src=server2.ip4, dst=server1_nat_ip) /
1992              UDP(sport=self.udp_port_in, dport=server_udp_port))
1993         pkts.append(p)
1994         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
1995              IP(src=server2.ip4, dst=server1_nat_ip) /
1996              ICMP(id=self.icmp_id_in, type='echo-request'))
1997         pkts.append(p)
1998         self.pg0.add_stream(pkts)
1999         self.pg_enable_capture(self.pg_interfaces)
2000         self.pg_start()
2001         capture = self.pg0.get_capture(len(pkts))
2002         for packet in capture:
2003             try:
2004                 self.assertEqual(packet[IP].src, server2_nat_ip)
2005                 self.assertEqual(packet[IP].dst, server1.ip4)
2006                 if packet.haslayer(TCP):
2007                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
2008                     self.assertEqual(packet[TCP].dport, server_tcp_port)
2009                     self.tcp_port_out = packet[TCP].sport
2010                     self.assert_packet_checksums_valid(packet)
2011                 elif packet.haslayer(UDP):
2012                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
2013                     self.assertEqual(packet[UDP].dport, server_udp_port)
2014                     self.udp_port_out = packet[UDP].sport
2015                 else:
2016                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2017                     self.icmp_id_out = packet[ICMP].id
2018             except:
2019                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2020                 raise
2021
2022         # server1 to server2
2023         pkts = []
2024         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2025              IP(src=server1.ip4, dst=server2_nat_ip) /
2026              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
2027         pkts.append(p)
2028         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2029              IP(src=server1.ip4, dst=server2_nat_ip) /
2030              UDP(sport=server_udp_port, dport=self.udp_port_out))
2031         pkts.append(p)
2032         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2033              IP(src=server1.ip4, dst=server2_nat_ip) /
2034              ICMP(id=self.icmp_id_out, type='echo-reply'))
2035         pkts.append(p)
2036         self.pg0.add_stream(pkts)
2037         self.pg_enable_capture(self.pg_interfaces)
2038         self.pg_start()
2039         capture = self.pg0.get_capture(len(pkts))
2040         for packet in capture:
2041             try:
2042                 self.assertEqual(packet[IP].src, server1_nat_ip)
2043                 self.assertEqual(packet[IP].dst, server2.ip4)
2044                 if packet.haslayer(TCP):
2045                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
2046                     self.assertEqual(packet[TCP].sport, server_tcp_port)
2047                     self.assert_packet_checksums_valid(packet)
2048                 elif packet.haslayer(UDP):
2049                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
2050                     self.assertEqual(packet[UDP].sport, server_udp_port)
2051                 else:
2052                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
2053             except:
2054                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
2055                 raise
2056
2057     def test_hairpinning_avoid_inf_loop(self):
2058         """ NAT44EI hairpinning - 1:1 NAPT avoid infinite loop """
2059
2060         host = self.pg0.remote_hosts[0]
2061         server = self.pg0.remote_hosts[1]
2062         host_in_port = 1234
2063         host_out_port = 0
2064         server_in_port = 5678
2065         server_out_port = 8765
2066
2067         self.nat44_add_address(self.nat_addr)
2068         flags = self.config_flags.NAT44_EI_IF_INSIDE
2069         self.vapi.nat44_ei_interface_add_del_feature(
2070             sw_if_index=self.pg0.sw_if_index,
2071             flags=flags, is_add=1)
2072         self.vapi.nat44_ei_interface_add_del_feature(
2073             sw_if_index=self.pg1.sw_if_index,
2074             is_add=1)
2075
2076         # add static mapping for server
2077         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2078                                       server_in_port, server_out_port,
2079                                       proto=IP_PROTOS.tcp)
2080
2081         # add another static mapping that maps pg0.local_ip4 address to itself
2082         self.nat44_add_static_mapping(self.pg0.local_ip4, self.pg0.local_ip4)
2083
2084         # send packet from host to VPP (the packet should get dropped)
2085         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2086              IP(src=host.ip4, dst=self.pg0.local_ip4) /
2087              TCP(sport=host_in_port, dport=server_out_port))
2088         self.pg0.add_stream(p)
2089         self.pg_enable_capture(self.pg_interfaces)
2090         self.pg_start()
2091         # Here VPP used to crash due to an infinite loop
2092
2093         cnt = self.statistics['/nat44-ei/hairpinning']
2094         # send packet from host to server
2095         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2096              IP(src=host.ip4, dst=self.nat_addr) /
2097              TCP(sport=host_in_port, dport=server_out_port))
2098         self.pg0.add_stream(p)
2099         self.pg_enable_capture(self.pg_interfaces)
2100         self.pg_start()
2101         capture = self.pg0.get_capture(1)
2102         p = capture[0]
2103         try:
2104             ip = p[IP]
2105             tcp = p[TCP]
2106             self.assertEqual(ip.src, self.nat_addr)
2107             self.assertEqual(ip.dst, server.ip4)
2108             self.assertNotEqual(tcp.sport, host_in_port)
2109             self.assertEqual(tcp.dport, server_in_port)
2110             self.assert_packet_checksums_valid(p)
2111             host_out_port = tcp.sport
2112         except:
2113             self.logger.error(ppp("Unexpected or invalid packet:", p))
2114             raise
2115
2116         after = self.statistics['/nat44-ei/hairpinning']
2117         if_idx = self.pg0.sw_if_index
2118         self.assertEqual(after[:, if_idx].sum() - cnt[:, if_idx].sum(), 1)
2119
2120         # send reply from server to host
2121         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2122              IP(src=server.ip4, dst=self.nat_addr) /
2123              TCP(sport=server_in_port, dport=host_out_port))
2124         self.pg0.add_stream(p)
2125         self.pg_enable_capture(self.pg_interfaces)
2126         self.pg_start()
2127         capture = self.pg0.get_capture(1)
2128         p = capture[0]
2129         try:
2130             ip = p[IP]
2131             tcp = p[TCP]
2132             self.assertEqual(ip.src, self.nat_addr)
2133             self.assertEqual(ip.dst, host.ip4)
2134             self.assertEqual(tcp.sport, server_out_port)
2135             self.assertEqual(tcp.dport, host_in_port)
2136             self.assert_packet_checksums_valid(p)
2137         except:
2138             self.logger.error(ppp("Unexpected or invalid packet:", p))
2139             raise
2140
2141         after = self.statistics['/nat44-ei/hairpinning']
2142         if_idx = self.pg0.sw_if_index
2143         self.assertEqual(after[:, if_idx].sum() - cnt[:, if_idx].sum(),
2144                          2+(1 if self.vpp_worker_count > 0 else 0))
2145
2146     def test_interface_addr(self):
2147         """ NAT44EI acquire addresses from interface """
2148         self.vapi.nat44_ei_add_del_interface_addr(
2149             is_add=1,
2150             sw_if_index=self.pg7.sw_if_index)
2151
2152         # no address in NAT pool
2153         addresses = self.vapi.nat44_ei_address_dump()
2154         self.assertEqual(0, len(addresses))
2155
2156         # configure interface address and check NAT address pool
2157         self.pg7.config_ip4()
2158         addresses = self.vapi.nat44_ei_address_dump()
2159         self.assertEqual(1, len(addresses))
2160         self.assertEqual(str(addresses[0].ip_address), self.pg7.local_ip4)
2161
2162         # remove interface address and check NAT address pool
2163         self.pg7.unconfig_ip4()
2164         addresses = self.vapi.nat44_ei_address_dump()
2165         self.assertEqual(0, len(addresses))
2166
2167     def test_interface_addr_static_mapping(self):
2168         """ NAT44EI Static mapping with addresses from interface """
2169         tag = "testTAG"
2170
2171         self.vapi.nat44_ei_add_del_interface_addr(
2172             is_add=1,
2173             sw_if_index=self.pg7.sw_if_index)
2174         self.nat44_add_static_mapping(
2175             '1.2.3.4',
2176             external_sw_if_index=self.pg7.sw_if_index,
2177             tag=tag)
2178
2179         # static mappings with external interface
2180         static_mappings = self.vapi.nat44_ei_static_mapping_dump()
2181         self.assertEqual(1, len(static_mappings))
2182         self.assertEqual(self.pg7.sw_if_index,
2183                          static_mappings[0].external_sw_if_index)
2184         self.assertEqual(static_mappings[0].tag, tag)
2185
2186         # configure interface address and check static mappings
2187         self.pg7.config_ip4()
2188         static_mappings = self.vapi.nat44_ei_static_mapping_dump()
2189         self.assertEqual(2, len(static_mappings))
2190         resolved = False
2191         for sm in static_mappings:
2192             if sm.external_sw_if_index == 0xFFFFFFFF:
2193                 self.assertEqual(str(sm.external_ip_address),
2194                                  self.pg7.local_ip4)
2195                 self.assertEqual(sm.tag, tag)
2196                 resolved = True
2197         self.assertTrue(resolved)
2198
2199         # remove interface address and check static mappings
2200         self.pg7.unconfig_ip4()
2201         static_mappings = self.vapi.nat44_ei_static_mapping_dump()
2202         self.assertEqual(1, len(static_mappings))
2203         self.assertEqual(self.pg7.sw_if_index,
2204                          static_mappings[0].external_sw_if_index)
2205         self.assertEqual(static_mappings[0].tag, tag)
2206
2207         # configure interface address again and check static mappings
2208         self.pg7.config_ip4()
2209         static_mappings = self.vapi.nat44_ei_static_mapping_dump()
2210         self.assertEqual(2, len(static_mappings))
2211         resolved = False
2212         for sm in static_mappings:
2213             if sm.external_sw_if_index == 0xFFFFFFFF:
2214                 self.assertEqual(str(sm.external_ip_address),
2215                                  self.pg7.local_ip4)
2216                 self.assertEqual(sm.tag, tag)
2217                 resolved = True
2218         self.assertTrue(resolved)
2219
2220         # remove static mapping
2221         self.nat44_add_static_mapping(
2222             '1.2.3.4',
2223             external_sw_if_index=self.pg7.sw_if_index,
2224             tag=tag,
2225             is_add=0)
2226         static_mappings = self.vapi.nat44_ei_static_mapping_dump()
2227         self.assertEqual(0, len(static_mappings))
2228
2229     def test_interface_addr_identity_nat(self):
2230         """ NAT44EI Identity NAT with addresses from interface """
2231
2232         port = 53053
2233         self.vapi.nat44_ei_add_del_interface_addr(
2234             is_add=1,
2235             sw_if_index=self.pg7.sw_if_index)
2236         self.vapi.nat44_ei_add_del_identity_mapping(
2237             ip_address=b'0',
2238             sw_if_index=self.pg7.sw_if_index,
2239             port=port,
2240             protocol=IP_PROTOS.tcp,
2241             is_add=1)
2242
2243         # identity mappings with external interface
2244         identity_mappings = self.vapi.nat44_ei_identity_mapping_dump()
2245         self.assertEqual(1, len(identity_mappings))
2246         self.assertEqual(self.pg7.sw_if_index,
2247                          identity_mappings[0].sw_if_index)
2248
2249         # configure interface address and check identity mappings
2250         self.pg7.config_ip4()
2251         identity_mappings = self.vapi.nat44_ei_identity_mapping_dump()
2252         resolved = False
2253         self.assertEqual(2, len(identity_mappings))
2254         for sm in identity_mappings:
2255             if sm.sw_if_index == 0xFFFFFFFF:
2256                 self.assertEqual(str(identity_mappings[0].ip_address),
2257                                  self.pg7.local_ip4)
2258                 self.assertEqual(port, identity_mappings[0].port)
2259                 self.assertEqual(IP_PROTOS.tcp, identity_mappings[0].protocol)
2260                 resolved = True
2261         self.assertTrue(resolved)
2262
2263         # remove interface address and check identity mappings
2264         self.pg7.unconfig_ip4()
2265         identity_mappings = self.vapi.nat44_ei_identity_mapping_dump()
2266         self.assertEqual(1, len(identity_mappings))
2267         self.assertEqual(self.pg7.sw_if_index,
2268                          identity_mappings[0].sw_if_index)
2269
2270     def test_ipfix_nat44_sess(self):
2271         """ NAT44EI IPFIX logging NAT44EI session created/deleted """
2272         self.ipfix_domain_id = 10
2273         self.ipfix_src_port = 20202
2274         collector_port = 30303
2275         bind_layers(UDP, IPFIX, dport=30303)
2276         self.nat44_add_address(self.nat_addr)
2277         flags = self.config_flags.NAT44_EI_IF_INSIDE
2278         self.vapi.nat44_ei_interface_add_del_feature(
2279             sw_if_index=self.pg0.sw_if_index,
2280             flags=flags, is_add=1)
2281         self.vapi.nat44_ei_interface_add_del_feature(
2282             sw_if_index=self.pg1.sw_if_index,
2283             is_add=1)
2284         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
2285                                      src_address=self.pg3.local_ip4,
2286                                      path_mtu=512,
2287                                      template_interval=10,
2288                                      collector_port=collector_port)
2289         self.vapi.nat44_ei_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
2290                                                 src_port=self.ipfix_src_port,
2291                                                 enable=1)
2292
2293         pkts = self.create_stream_in(self.pg0, self.pg1)
2294         self.pg0.add_stream(pkts)
2295         self.pg_enable_capture(self.pg_interfaces)
2296         self.pg_start()
2297         capture = self.pg1.get_capture(len(pkts))
2298         self.verify_capture_out(capture)
2299         self.nat44_add_address(self.nat_addr, is_add=0)
2300         self.vapi.ipfix_flush()
2301         capture = self.pg3.get_capture(7)
2302         ipfix = IPFIXDecoder()
2303         # first load template
2304         for p in capture:
2305             self.assertTrue(p.haslayer(IPFIX))
2306             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2307             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2308             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2309             self.assertEqual(p[UDP].dport, collector_port)
2310             self.assertEqual(p[IPFIX].observationDomainID,
2311                              self.ipfix_domain_id)
2312             if p.haslayer(Template):
2313                 ipfix.add_template(p.getlayer(Template))
2314         # verify events in data set
2315         for p in capture:
2316             if p.haslayer(Data):
2317                 data = ipfix.decode_data_set(p.getlayer(Set))
2318                 self.verify_ipfix_nat44_ses(data)
2319
2320     def test_ipfix_addr_exhausted(self):
2321         """ NAT44EI IPFIX logging NAT addresses exhausted """
2322         flags = self.config_flags.NAT44_EI_IF_INSIDE
2323         self.vapi.nat44_ei_interface_add_del_feature(
2324             sw_if_index=self.pg0.sw_if_index,
2325             flags=flags, is_add=1)
2326         self.vapi.nat44_ei_interface_add_del_feature(
2327             sw_if_index=self.pg1.sw_if_index,
2328             is_add=1)
2329         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
2330                                      src_address=self.pg3.local_ip4,
2331                                      path_mtu=512,
2332                                      template_interval=10)
2333         self.vapi.nat44_ei_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
2334                                                 src_port=self.ipfix_src_port,
2335                                                 enable=1)
2336
2337         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
2338              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2339              TCP(sport=3025))
2340         self.pg0.add_stream(p)
2341         self.pg_enable_capture(self.pg_interfaces)
2342         self.pg_start()
2343         self.pg1.assert_nothing_captured()
2344         self.vapi.ipfix_flush()
2345         capture = self.pg3.get_capture(7)
2346         ipfix = IPFIXDecoder()
2347         # first load template
2348         for p in capture:
2349             self.assertTrue(p.haslayer(IPFIX))
2350             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2351             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2352             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2353             self.assertEqual(p[UDP].dport, 4739)
2354             self.assertEqual(p[IPFIX].observationDomainID,
2355                              self.ipfix_domain_id)
2356             if p.haslayer(Template):
2357                 ipfix.add_template(p.getlayer(Template))
2358         # verify events in data set
2359         for p in capture:
2360             if p.haslayer(Data):
2361                 data = ipfix.decode_data_set(p.getlayer(Set))
2362                 self.verify_ipfix_addr_exhausted(data)
2363
2364     def test_ipfix_max_sessions(self):
2365         """ NAT44EI IPFIX logging maximum session entries exceeded """
2366         self.nat44_add_address(self.nat_addr)
2367         flags = self.config_flags.NAT44_EI_IF_INSIDE
2368         self.vapi.nat44_ei_interface_add_del_feature(
2369             sw_if_index=self.pg0.sw_if_index,
2370             flags=flags, is_add=1)
2371         self.vapi.nat44_ei_interface_add_del_feature(
2372             sw_if_index=self.pg1.sw_if_index,
2373             is_add=1)
2374
2375         max_sessions_per_thread = self.max_translations
2376         max_sessions = max(1, self.vpp_worker_count) * max_sessions_per_thread
2377
2378         pkts = []
2379         for i in range(0, max_sessions):
2380             src = "10.10.%u.%u" % ((i & 0xFF00) >> 8, i & 0xFF)
2381             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2382                  IP(src=src, dst=self.pg1.remote_ip4) /
2383                  TCP(sport=1025))
2384             pkts.append(p)
2385         self.pg0.add_stream(pkts)
2386         self.pg_enable_capture(self.pg_interfaces)
2387         self.pg_start()
2388
2389         self.pg1.get_capture(max_sessions)
2390         self.vapi.set_ipfix_exporter(collector_address=self.pg3.remote_ip4,
2391                                      src_address=self.pg3.local_ip4,
2392                                      path_mtu=512,
2393                                      template_interval=10)
2394         self.vapi.nat44_ei_ipfix_enable_disable(domain_id=self.ipfix_domain_id,
2395                                                 src_port=self.ipfix_src_port,
2396                                                 enable=1)
2397
2398         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2399              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2400              TCP(sport=1025))
2401         self.pg0.add_stream(p)
2402         self.pg_enable_capture(self.pg_interfaces)
2403         self.pg_start()
2404         self.pg1.assert_nothing_captured()
2405         self.vapi.ipfix_flush()
2406         capture = self.pg3.get_capture(7)
2407         ipfix = IPFIXDecoder()
2408         # first load template
2409         for p in capture:
2410             self.assertTrue(p.haslayer(IPFIX))
2411             self.assertEqual(p[IP].src, self.pg3.local_ip4)
2412             self.assertEqual(p[IP].dst, self.pg3.remote_ip4)
2413             self.assertEqual(p[UDP].sport, self.ipfix_src_port)
2414             self.assertEqual(p[UDP].dport, 4739)
2415             self.assertEqual(p[IPFIX].observationDomainID,
2416                              self.ipfix_domain_id)
2417             if p.haslayer(Template):
2418                 ipfix.add_template(p.getlayer(Template))
2419         # verify events in data set
2420         for p in capture:
2421             if p.haslayer(Data):
2422                 data = ipfix.decode_data_set(p.getlayer(Set))
2423                 self.verify_ipfix_max_sessions(data, max_sessions_per_thread)
2424
2425     def test_syslog_apmap(self):
2426         """ NAT44EI syslog address and port mapping creation and deletion """
2427         self.vapi.syslog_set_filter(
2428             self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_INFO)
2429         self.vapi.syslog_set_sender(self.pg3.local_ip4, self.pg3.remote_ip4)
2430         self.nat44_add_address(self.nat_addr)
2431         flags = self.config_flags.NAT44_EI_IF_INSIDE
2432         self.vapi.nat44_ei_interface_add_del_feature(
2433             sw_if_index=self.pg0.sw_if_index,
2434             flags=flags, is_add=1)
2435         self.vapi.nat44_ei_interface_add_del_feature(
2436             sw_if_index=self.pg1.sw_if_index,
2437             is_add=1)
2438
2439         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2440              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2441              TCP(sport=self.tcp_port_in, dport=20))
2442         self.pg0.add_stream(p)
2443         self.pg_enable_capture(self.pg_interfaces)
2444         self.pg_start()
2445         capture = self.pg1.get_capture(1)
2446         self.tcp_port_out = capture[0][TCP].sport
2447         capture = self.pg3.get_capture(1)
2448         self.verify_syslog_apmap(capture[0][Raw].load)
2449
2450         self.pg_enable_capture(self.pg_interfaces)
2451         self.pg_start()
2452         self.nat44_add_address(self.nat_addr, is_add=0)
2453         capture = self.pg3.get_capture(1)
2454         self.verify_syslog_apmap(capture[0][Raw].load, False)
2455
2456     def test_pool_addr_fib(self):
2457         """ NAT44EI add pool addresses to FIB """
2458         static_addr = '10.0.0.10'
2459         self.nat44_add_address(self.nat_addr)
2460         flags = self.config_flags.NAT44_EI_IF_INSIDE
2461         self.vapi.nat44_ei_interface_add_del_feature(
2462             sw_if_index=self.pg0.sw_if_index,
2463             flags=flags, is_add=1)
2464         self.vapi.nat44_ei_interface_add_del_feature(
2465             sw_if_index=self.pg1.sw_if_index,
2466             is_add=1)
2467         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr)
2468
2469         # NAT44EI address
2470         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2471              ARP(op=ARP.who_has, pdst=self.nat_addr,
2472                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2473         self.pg1.add_stream(p)
2474         self.pg_enable_capture(self.pg_interfaces)
2475         self.pg_start()
2476         capture = self.pg1.get_capture(1)
2477         self.assertTrue(capture[0].haslayer(ARP))
2478         self.assertTrue(capture[0][ARP].op, ARP.is_at)
2479
2480         # 1:1 NAT address
2481         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2482              ARP(op=ARP.who_has, pdst=static_addr,
2483                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2484         self.pg1.add_stream(p)
2485         self.pg_enable_capture(self.pg_interfaces)
2486         self.pg_start()
2487         capture = self.pg1.get_capture(1)
2488         self.assertTrue(capture[0].haslayer(ARP))
2489         self.assertTrue(capture[0][ARP].op, ARP.is_at)
2490
2491         # send ARP to non-NAT44EI interface
2492         p = (Ether(src=self.pg2.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2493              ARP(op=ARP.who_has, pdst=self.nat_addr,
2494                  psrc=self.pg2.remote_ip4, hwsrc=self.pg2.remote_mac))
2495         self.pg2.add_stream(p)
2496         self.pg_enable_capture(self.pg_interfaces)
2497         self.pg_start()
2498         self.pg1.assert_nothing_captured()
2499
2500         # remove addresses and verify
2501         self.nat44_add_address(self.nat_addr, is_add=0)
2502         self.nat44_add_static_mapping(self.pg0.remote_ip4, static_addr,
2503                                       is_add=0)
2504
2505         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2506              ARP(op=ARP.who_has, pdst=self.nat_addr,
2507                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2508         self.pg1.add_stream(p)
2509         self.pg_enable_capture(self.pg_interfaces)
2510         self.pg_start()
2511         self.pg1.assert_nothing_captured()
2512
2513         p = (Ether(src=self.pg1.remote_mac, dst='ff:ff:ff:ff:ff:ff') /
2514              ARP(op=ARP.who_has, pdst=static_addr,
2515                  psrc=self.pg1.remote_ip4, hwsrc=self.pg1.remote_mac))
2516         self.pg1.add_stream(p)
2517         self.pg_enable_capture(self.pg_interfaces)
2518         self.pg_start()
2519         self.pg1.assert_nothing_captured()
2520
2521     def test_vrf_mode(self):
2522         """ NAT44EI tenant VRF aware address pool mode """
2523
2524         vrf_id1 = 1
2525         vrf_id2 = 2
2526         nat_ip1 = "10.0.0.10"
2527         nat_ip2 = "10.0.0.11"
2528
2529         self.pg0.unconfig_ip4()
2530         self.pg1.unconfig_ip4()
2531         self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id1})
2532         self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id2})
2533         self.pg0.set_table_ip4(vrf_id1)
2534         self.pg1.set_table_ip4(vrf_id2)
2535         self.pg0.config_ip4()
2536         self.pg1.config_ip4()
2537         self.pg0.resolve_arp()
2538         self.pg1.resolve_arp()
2539
2540         self.nat44_add_address(nat_ip1, vrf_id=vrf_id1)
2541         self.nat44_add_address(nat_ip2, vrf_id=vrf_id2)
2542         flags = self.config_flags.NAT44_EI_IF_INSIDE
2543         self.vapi.nat44_ei_interface_add_del_feature(
2544             sw_if_index=self.pg0.sw_if_index,
2545             flags=flags, is_add=1)
2546         self.vapi.nat44_ei_interface_add_del_feature(
2547             sw_if_index=self.pg1.sw_if_index,
2548             flags=flags, is_add=1)
2549         self.vapi.nat44_ei_interface_add_del_feature(
2550             sw_if_index=self.pg2.sw_if_index,
2551             is_add=1)
2552
2553         try:
2554             # first VRF
2555             pkts = self.create_stream_in(self.pg0, self.pg2)
2556             self.pg0.add_stream(pkts)
2557             self.pg_enable_capture(self.pg_interfaces)
2558             self.pg_start()
2559             capture = self.pg2.get_capture(len(pkts))
2560             self.verify_capture_out(capture, nat_ip1)
2561
2562             # second VRF
2563             pkts = self.create_stream_in(self.pg1, self.pg2)
2564             self.pg1.add_stream(pkts)
2565             self.pg_enable_capture(self.pg_interfaces)
2566             self.pg_start()
2567             capture = self.pg2.get_capture(len(pkts))
2568             self.verify_capture_out(capture, nat_ip2)
2569
2570         finally:
2571             self.pg0.unconfig_ip4()
2572             self.pg1.unconfig_ip4()
2573             self.pg0.set_table_ip4(0)
2574             self.pg1.set_table_ip4(0)
2575             self.pg0.config_ip4()
2576             self.pg1.config_ip4()
2577             self.pg0.resolve_arp()
2578             self.pg1.resolve_arp()
2579             self.vapi.ip_table_add_del(is_add=0, table={'table_id': vrf_id1})
2580             self.vapi.ip_table_add_del(is_add=0, table={'table_id': vrf_id2})
2581
2582     def test_vrf_feature_independent(self):
2583         """ NAT44EI tenant VRF independent address pool mode """
2584
2585         nat_ip1 = "10.0.0.10"
2586         nat_ip2 = "10.0.0.11"
2587
2588         self.nat44_add_address(nat_ip1)
2589         self.nat44_add_address(nat_ip2, vrf_id=99)
2590         flags = self.config_flags.NAT44_EI_IF_INSIDE
2591         self.vapi.nat44_ei_interface_add_del_feature(
2592             sw_if_index=self.pg0.sw_if_index,
2593             flags=flags, is_add=1)
2594         self.vapi.nat44_ei_interface_add_del_feature(
2595             sw_if_index=self.pg1.sw_if_index,
2596             flags=flags, is_add=1)
2597         self.vapi.nat44_ei_interface_add_del_feature(
2598             sw_if_index=self.pg2.sw_if_index,
2599             is_add=1)
2600
2601         # first VRF
2602         pkts = self.create_stream_in(self.pg0, self.pg2)
2603         self.pg0.add_stream(pkts)
2604         self.pg_enable_capture(self.pg_interfaces)
2605         self.pg_start()
2606         capture = self.pg2.get_capture(len(pkts))
2607         self.verify_capture_out(capture, nat_ip1)
2608
2609         # second VRF
2610         pkts = self.create_stream_in(self.pg1, self.pg2)
2611         self.pg1.add_stream(pkts)
2612         self.pg_enable_capture(self.pg_interfaces)
2613         self.pg_start()
2614         capture = self.pg2.get_capture(len(pkts))
2615         self.verify_capture_out(capture, nat_ip1)
2616
2617     def test_dynamic_ipless_interfaces(self):
2618         """ NAT44EI interfaces without configured IP address """
2619         self.create_routes_and_neigbors()
2620         self.nat44_add_address(self.nat_addr)
2621         flags = self.config_flags.NAT44_EI_IF_INSIDE
2622         self.vapi.nat44_ei_interface_add_del_feature(
2623             sw_if_index=self.pg7.sw_if_index,
2624             flags=flags, is_add=1)
2625         self.vapi.nat44_ei_interface_add_del_feature(
2626             sw_if_index=self.pg8.sw_if_index,
2627             is_add=1)
2628
2629         # in2out
2630         pkts = self.create_stream_in(self.pg7, self.pg8)
2631         self.pg7.add_stream(pkts)
2632         self.pg_enable_capture(self.pg_interfaces)
2633         self.pg_start()
2634         capture = self.pg8.get_capture(len(pkts))
2635         self.verify_capture_out(capture)
2636
2637         # out2in
2638         pkts = self.create_stream_out(self.pg8, self.nat_addr)
2639         self.pg8.add_stream(pkts)
2640         self.pg_enable_capture(self.pg_interfaces)
2641         self.pg_start()
2642         capture = self.pg7.get_capture(len(pkts))
2643         self.verify_capture_in(capture, self.pg7)
2644
2645     def test_static_ipless_interfaces(self):
2646         """ NAT44EI interfaces without configured IP address - 1:1 NAT """
2647
2648         self.create_routes_and_neigbors()
2649         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr)
2650         flags = self.config_flags.NAT44_EI_IF_INSIDE
2651         self.vapi.nat44_ei_interface_add_del_feature(
2652             sw_if_index=self.pg7.sw_if_index,
2653             flags=flags, is_add=1)
2654         self.vapi.nat44_ei_interface_add_del_feature(
2655             sw_if_index=self.pg8.sw_if_index,
2656             is_add=1)
2657
2658         # out2in
2659         pkts = self.create_stream_out(self.pg8)
2660         self.pg8.add_stream(pkts)
2661         self.pg_enable_capture(self.pg_interfaces)
2662         self.pg_start()
2663         capture = self.pg7.get_capture(len(pkts))
2664         self.verify_capture_in(capture, self.pg7)
2665
2666         # in2out
2667         pkts = self.create_stream_in(self.pg7, self.pg8)
2668         self.pg7.add_stream(pkts)
2669         self.pg_enable_capture(self.pg_interfaces)
2670         self.pg_start()
2671         capture = self.pg8.get_capture(len(pkts))
2672         self.verify_capture_out(capture, self.nat_addr, True)
2673
2674     def test_static_with_port_ipless_interfaces(self):
2675         """ NAT44EI interfaces without configured IP address - 1:1 NAPT """
2676
2677         self.tcp_port_out = 30606
2678         self.udp_port_out = 30607
2679         self.icmp_id_out = 30608
2680
2681         self.create_routes_and_neigbors()
2682         self.nat44_add_address(self.nat_addr)
2683         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2684                                       self.tcp_port_in, self.tcp_port_out,
2685                                       proto=IP_PROTOS.tcp)
2686         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2687                                       self.udp_port_in, self.udp_port_out,
2688                                       proto=IP_PROTOS.udp)
2689         self.nat44_add_static_mapping(self.pg7.remote_ip4, self.nat_addr,
2690                                       self.icmp_id_in, self.icmp_id_out,
2691                                       proto=IP_PROTOS.icmp)
2692         flags = self.config_flags.NAT44_EI_IF_INSIDE
2693         self.vapi.nat44_ei_interface_add_del_feature(
2694             sw_if_index=self.pg7.sw_if_index,
2695             flags=flags, is_add=1)
2696         self.vapi.nat44_ei_interface_add_del_feature(
2697             sw_if_index=self.pg8.sw_if_index,
2698             is_add=1)
2699
2700         # out2in
2701         pkts = self.create_stream_out(self.pg8)
2702         self.pg8.add_stream(pkts)
2703         self.pg_enable_capture(self.pg_interfaces)
2704         self.pg_start()
2705         capture = self.pg7.get_capture(len(pkts))
2706         self.verify_capture_in(capture, self.pg7)
2707
2708         # in2out
2709         pkts = self.create_stream_in(self.pg7, self.pg8)
2710         self.pg7.add_stream(pkts)
2711         self.pg_enable_capture(self.pg_interfaces)
2712         self.pg_start()
2713         capture = self.pg8.get_capture(len(pkts))
2714         self.verify_capture_out(capture)
2715
2716     def test_static_unknown_proto(self):
2717         """ NAT44EI 1:1 translate packet with unknown protocol """
2718         nat_ip = "10.0.0.10"
2719         self.nat44_add_static_mapping(self.pg0.remote_ip4, nat_ip)
2720         flags = self.config_flags.NAT44_EI_IF_INSIDE
2721         self.vapi.nat44_ei_interface_add_del_feature(
2722             sw_if_index=self.pg0.sw_if_index,
2723             flags=flags, is_add=1)
2724         self.vapi.nat44_ei_interface_add_del_feature(
2725             sw_if_index=self.pg1.sw_if_index,
2726             is_add=1)
2727
2728         # in2out
2729         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
2730              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
2731              GRE() /
2732              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2733              TCP(sport=1234, dport=1234))
2734         self.pg0.add_stream(p)
2735         self.pg_enable_capture(self.pg_interfaces)
2736         self.pg_start()
2737         p = self.pg1.get_capture(1)
2738         packet = p[0]
2739         try:
2740             self.assertEqual(packet[IP].src, nat_ip)
2741             self.assertEqual(packet[IP].dst, self.pg1.remote_ip4)
2742             self.assertEqual(packet.haslayer(GRE), 1)
2743             self.assert_packet_checksums_valid(packet)
2744         except:
2745             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2746             raise
2747
2748         # out2in
2749         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
2750              IP(src=self.pg1.remote_ip4, dst=nat_ip) /
2751              GRE() /
2752              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2753              TCP(sport=1234, dport=1234))
2754         self.pg1.add_stream(p)
2755         self.pg_enable_capture(self.pg_interfaces)
2756         self.pg_start()
2757         p = self.pg0.get_capture(1)
2758         packet = p[0]
2759         try:
2760             self.assertEqual(packet[IP].src, self.pg1.remote_ip4)
2761             self.assertEqual(packet[IP].dst, self.pg0.remote_ip4)
2762             self.assertEqual(packet.haslayer(GRE), 1)
2763             self.assert_packet_checksums_valid(packet)
2764         except:
2765             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2766             raise
2767
2768     def test_hairpinning_static_unknown_proto(self):
2769         """ NAT44EI 1:1 translate packet with unknown protocol - hairpinning
2770         """
2771
2772         host = self.pg0.remote_hosts[0]
2773         server = self.pg0.remote_hosts[1]
2774
2775         host_nat_ip = "10.0.0.10"
2776         server_nat_ip = "10.0.0.11"
2777
2778         self.nat44_add_static_mapping(host.ip4, host_nat_ip)
2779         self.nat44_add_static_mapping(server.ip4, server_nat_ip)
2780         flags = self.config_flags.NAT44_EI_IF_INSIDE
2781         self.vapi.nat44_ei_interface_add_del_feature(
2782             sw_if_index=self.pg0.sw_if_index,
2783             flags=flags, is_add=1)
2784         self.vapi.nat44_ei_interface_add_del_feature(
2785             sw_if_index=self.pg1.sw_if_index,
2786             is_add=1)
2787
2788         # host to server
2789         p = (Ether(dst=self.pg0.local_mac, src=host.mac) /
2790              IP(src=host.ip4, dst=server_nat_ip) /
2791              GRE() /
2792              IP(src=self.pg2.remote_ip4, dst=self.pg3.remote_ip4) /
2793              TCP(sport=1234, dport=1234))
2794         self.pg0.add_stream(p)
2795         self.pg_enable_capture(self.pg_interfaces)
2796         self.pg_start()
2797         p = self.pg0.get_capture(1)
2798         packet = p[0]
2799         try:
2800             self.assertEqual(packet[IP].src, host_nat_ip)
2801             self.assertEqual(packet[IP].dst, server.ip4)
2802             self.assertEqual(packet.haslayer(GRE), 1)
2803             self.assert_packet_checksums_valid(packet)
2804         except:
2805             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2806             raise
2807
2808         # server to host
2809         p = (Ether(dst=self.pg0.local_mac, src=server.mac) /
2810              IP(src=server.ip4, dst=host_nat_ip) /
2811              GRE() /
2812              IP(src=self.pg3.remote_ip4, dst=self.pg2.remote_ip4) /
2813              TCP(sport=1234, dport=1234))
2814         self.pg0.add_stream(p)
2815         self.pg_enable_capture(self.pg_interfaces)
2816         self.pg_start()
2817         p = self.pg0.get_capture(1)
2818         packet = p[0]
2819         try:
2820             self.assertEqual(packet[IP].src, server_nat_ip)
2821             self.assertEqual(packet[IP].dst, host.ip4)
2822             self.assertEqual(packet.haslayer(GRE), 1)
2823             self.assert_packet_checksums_valid(packet)
2824         except:
2825             self.logger.error(ppp("Unexpected or invalid packet:", packet))
2826             raise
2827
2828     def test_output_feature(self):
2829         """ NAT44EI output feature (in2out postrouting) """
2830         self.nat44_add_address(self.nat_addr)
2831         self.vapi.nat44_ei_add_del_output_interface(
2832             sw_if_index=self.pg3.sw_if_index, is_add=1)
2833
2834         # in2out
2835         pkts = self.create_stream_in(self.pg0, self.pg3)
2836         self.pg0.add_stream(pkts)
2837         self.pg_enable_capture(self.pg_interfaces)
2838         self.pg_start()
2839         capture = self.pg3.get_capture(len(pkts))
2840         self.verify_capture_out(capture)
2841
2842         # out2in
2843         pkts = self.create_stream_out(self.pg3)
2844         self.pg3.add_stream(pkts)
2845         self.pg_enable_capture(self.pg_interfaces)
2846         self.pg_start()
2847         capture = self.pg0.get_capture(len(pkts))
2848         self.verify_capture_in(capture, self.pg0)
2849
2850         # from non-NAT interface to NAT inside interface
2851         pkts = self.create_stream_in(self.pg2, self.pg0)
2852         self.pg2.add_stream(pkts)
2853         self.pg_enable_capture(self.pg_interfaces)
2854         self.pg_start()
2855         capture = self.pg0.get_capture(len(pkts))
2856         self.verify_capture_no_translation(capture, self.pg2, self.pg0)
2857
2858     def test_output_feature_vrf_aware(self):
2859         """ NAT44EI output feature VRF aware (in2out postrouting) """
2860         nat_ip_vrf10 = "10.0.0.10"
2861         nat_ip_vrf20 = "10.0.0.20"
2862
2863         r1 = VppIpRoute(self, self.pg3.remote_ip4, 32,
2864                         [VppRoutePath(self.pg3.remote_ip4,
2865                                       self.pg3.sw_if_index)],
2866                         table_id=10)
2867         r2 = VppIpRoute(self, self.pg3.remote_ip4, 32,
2868                         [VppRoutePath(self.pg3.remote_ip4,
2869                                       self.pg3.sw_if_index)],
2870                         table_id=20)
2871         r1.add_vpp_config()
2872         r2.add_vpp_config()
2873
2874         self.nat44_add_address(nat_ip_vrf10, vrf_id=10)
2875         self.nat44_add_address(nat_ip_vrf20, vrf_id=20)
2876         self.vapi.nat44_ei_add_del_output_interface(
2877             sw_if_index=self.pg3.sw_if_index, is_add=1)
2878
2879         # in2out VRF 10
2880         pkts = self.create_stream_in(self.pg4, self.pg3)
2881         self.pg4.add_stream(pkts)
2882         self.pg_enable_capture(self.pg_interfaces)
2883         self.pg_start()
2884         capture = self.pg3.get_capture(len(pkts))
2885         self.verify_capture_out(capture, nat_ip=nat_ip_vrf10)
2886
2887         # out2in VRF 10
2888         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf10)
2889         self.pg3.add_stream(pkts)
2890         self.pg_enable_capture(self.pg_interfaces)
2891         self.pg_start()
2892         capture = self.pg4.get_capture(len(pkts))
2893         self.verify_capture_in(capture, self.pg4)
2894
2895         # in2out VRF 20
2896         pkts = self.create_stream_in(self.pg6, self.pg3)
2897         self.pg6.add_stream(pkts)
2898         self.pg_enable_capture(self.pg_interfaces)
2899         self.pg_start()
2900         capture = self.pg3.get_capture(len(pkts))
2901         self.verify_capture_out(capture, nat_ip=nat_ip_vrf20)
2902
2903         # out2in VRF 20
2904         pkts = self.create_stream_out(self.pg3, dst_ip=nat_ip_vrf20)
2905         self.pg3.add_stream(pkts)
2906         self.pg_enable_capture(self.pg_interfaces)
2907         self.pg_start()
2908         capture = self.pg6.get_capture(len(pkts))
2909         self.verify_capture_in(capture, self.pg6)
2910
2911     def test_output_feature_hairpinning(self):
2912         """ NAT44EI output feature hairpinning (in2out postrouting) """
2913         host = self.pg0.remote_hosts[0]
2914         server = self.pg0.remote_hosts[1]
2915         host_in_port = 1234
2916         host_out_port = 0
2917         server_in_port = 5678
2918         server_out_port = 8765
2919
2920         self.nat44_add_address(self.nat_addr)
2921         self.vapi.nat44_ei_add_del_output_interface(
2922             sw_if_index=self.pg0.sw_if_index, is_add=1)
2923         self.vapi.nat44_ei_add_del_output_interface(
2924             sw_if_index=self.pg1.sw_if_index, is_add=1)
2925
2926         # add static mapping for server
2927         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
2928                                       server_in_port, server_out_port,
2929                                       proto=IP_PROTOS.tcp)
2930
2931         # send packet from host to server
2932         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
2933              IP(src=host.ip4, dst=self.nat_addr) /
2934              TCP(sport=host_in_port, dport=server_out_port))
2935         self.pg0.add_stream(p)
2936         self.pg_enable_capture(self.pg_interfaces)
2937         self.pg_start()
2938         capture = self.pg0.get_capture(1)
2939         p = capture[0]
2940         try:
2941             ip = p[IP]
2942             tcp = p[TCP]
2943             self.assertEqual(ip.src, self.nat_addr)
2944             self.assertEqual(ip.dst, server.ip4)
2945             self.assertNotEqual(tcp.sport, host_in_port)
2946             self.assertEqual(tcp.dport, server_in_port)
2947             self.assert_packet_checksums_valid(p)
2948             host_out_port = tcp.sport
2949         except:
2950             self.logger.error(ppp("Unexpected or invalid packet:", p))
2951             raise
2952
2953         # send reply from server to host
2954         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
2955              IP(src=server.ip4, dst=self.nat_addr) /
2956              TCP(sport=server_in_port, dport=host_out_port))
2957         self.pg0.add_stream(p)
2958         self.pg_enable_capture(self.pg_interfaces)
2959         self.pg_start()
2960         capture = self.pg0.get_capture(1)
2961         p = capture[0]
2962         try:
2963             ip = p[IP]
2964             tcp = p[TCP]
2965             self.assertEqual(ip.src, self.nat_addr)
2966             self.assertEqual(ip.dst, host.ip4)
2967             self.assertEqual(tcp.sport, server_out_port)
2968             self.assertEqual(tcp.dport, host_in_port)
2969             self.assert_packet_checksums_valid(p)
2970         except:
2971             self.logger.error(ppp("Unexpected or invalid packet:", p))
2972             raise
2973
2974     def test_one_armed_nat44(self):
2975         """ NAT44EI One armed NAT """
2976         remote_host = self.pg9.remote_hosts[0]
2977         local_host = self.pg9.remote_hosts[1]
2978         external_port = 0
2979
2980         self.nat44_add_address(self.nat_addr)
2981         flags = self.config_flags.NAT44_EI_IF_INSIDE
2982         self.vapi.nat44_ei_interface_add_del_feature(
2983             sw_if_index=self.pg9.sw_if_index,
2984             is_add=1)
2985         self.vapi.nat44_ei_interface_add_del_feature(
2986             sw_if_index=self.pg9.sw_if_index,
2987             flags=flags, is_add=1)
2988
2989         # in2out
2990         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
2991              IP(src=local_host.ip4, dst=remote_host.ip4) /
2992              TCP(sport=12345, dport=80))
2993         self.pg9.add_stream(p)
2994         self.pg_enable_capture(self.pg_interfaces)
2995         self.pg_start()
2996         capture = self.pg9.get_capture(1)
2997         p = capture[0]
2998         try:
2999             ip = p[IP]
3000             tcp = p[TCP]
3001             self.assertEqual(ip.src, self.nat_addr)
3002             self.assertEqual(ip.dst, remote_host.ip4)
3003             self.assertNotEqual(tcp.sport, 12345)
3004             external_port = tcp.sport
3005             self.assertEqual(tcp.dport, 80)
3006             self.assert_packet_checksums_valid(p)
3007         except:
3008             self.logger.error(ppp("Unexpected or invalid packet:", p))
3009             raise
3010
3011         # out2in
3012         p = (Ether(src=self.pg9.remote_mac, dst=self.pg9.local_mac) /
3013              IP(src=remote_host.ip4, dst=self.nat_addr) /
3014              TCP(sport=80, dport=external_port))
3015         self.pg9.add_stream(p)
3016         self.pg_enable_capture(self.pg_interfaces)
3017         self.pg_start()
3018         capture = self.pg9.get_capture(1)
3019         p = capture[0]
3020         try:
3021             ip = p[IP]
3022             tcp = p[TCP]
3023             self.assertEqual(ip.src, remote_host.ip4)
3024             self.assertEqual(ip.dst, local_host.ip4)
3025             self.assertEqual(tcp.sport, 80)
3026             self.assertEqual(tcp.dport, 12345)
3027             self.assert_packet_checksums_valid(p)
3028         except:
3029             self.logger.error(ppp("Unexpected or invalid packet:", p))
3030             raise
3031
3032         if self.vpp_worker_count > 1:
3033             node = "nat44-ei-handoff-classify"
3034         else:
3035             node = "nat44-ei-classify"
3036
3037         err = self.statistics.get_err_counter('/err/%s/next in2out' % node)
3038         self.assertEqual(err, 1)
3039         err = self.statistics.get_err_counter('/err/%s/next out2in' % node)
3040         self.assertEqual(err, 1)
3041
3042     def test_del_session(self):
3043         """ NAT44EI delete session """
3044         self.nat44_add_address(self.nat_addr)
3045         flags = self.config_flags.NAT44_EI_IF_INSIDE
3046         self.vapi.nat44_ei_interface_add_del_feature(
3047             sw_if_index=self.pg0.sw_if_index,
3048             flags=flags, is_add=1)
3049         self.vapi.nat44_ei_interface_add_del_feature(
3050             sw_if_index=self.pg1.sw_if_index,
3051             is_add=1)
3052
3053         pkts = self.create_stream_in(self.pg0, self.pg1)
3054         self.pg0.add_stream(pkts)
3055         self.pg_enable_capture(self.pg_interfaces)
3056         self.pg_start()
3057         self.pg1.get_capture(len(pkts))
3058
3059         sessions = self.vapi.nat44_ei_user_session_dump(self.pg0.remote_ip4, 0)
3060         nsessions = len(sessions)
3061
3062         self.vapi.nat44_ei_del_session(
3063             address=sessions[0].inside_ip_address,
3064             port=sessions[0].inside_port,
3065             protocol=sessions[0].protocol,
3066             flags=self.config_flags.NAT44_EI_IF_INSIDE)
3067
3068         self.vapi.nat44_ei_del_session(
3069             address=sessions[1].outside_ip_address,
3070             port=sessions[1].outside_port,
3071             protocol=sessions[1].protocol)
3072
3073         sessions = self.vapi.nat44_ei_user_session_dump(self.pg0.remote_ip4, 0)
3074         self.assertEqual(nsessions - len(sessions), 2)
3075
3076         self.vapi.nat44_ei_del_session(
3077             address=sessions[0].inside_ip_address,
3078             port=sessions[0].inside_port,
3079             protocol=sessions[0].protocol,
3080             flags=self.config_flags.NAT44_EI_IF_INSIDE)
3081
3082         self.verify_no_nat44_user()
3083
3084     def test_frag_in_order(self):
3085         """ NAT44EI translate fragments arriving in order """
3086
3087         self.nat44_add_address(self.nat_addr)
3088         flags = self.config_flags.NAT44_EI_IF_INSIDE
3089         self.vapi.nat44_ei_interface_add_del_feature(
3090             sw_if_index=self.pg0.sw_if_index,
3091             flags=flags, is_add=1)
3092         self.vapi.nat44_ei_interface_add_del_feature(
3093             sw_if_index=self.pg1.sw_if_index,
3094             is_add=1)
3095
3096         self.frag_in_order(proto=IP_PROTOS.tcp)
3097         self.frag_in_order(proto=IP_PROTOS.udp)
3098         self.frag_in_order(proto=IP_PROTOS.icmp)
3099
3100     def test_frag_forwarding(self):
3101         """ NAT44EI forwarding fragment test """
3102         self.vapi.nat44_ei_add_del_interface_addr(
3103             is_add=1,
3104             sw_if_index=self.pg1.sw_if_index)
3105         flags = self.config_flags.NAT44_EI_IF_INSIDE
3106         self.vapi.nat44_ei_interface_add_del_feature(
3107             sw_if_index=self.pg0.sw_if_index,
3108             flags=flags, is_add=1)
3109         self.vapi.nat44_ei_interface_add_del_feature(
3110             sw_if_index=self.pg1.sw_if_index,
3111             is_add=1)
3112         self.vapi.nat44_ei_forwarding_enable_disable(enable=1)
3113
3114         data = b"A" * 16 + b"B" * 16 + b"C" * 3
3115         pkts = self.create_stream_frag(self.pg1,
3116                                        self.pg0.remote_ip4,
3117                                        4789,
3118                                        4789,
3119                                        data,
3120                                        proto=IP_PROTOS.udp)
3121         self.pg1.add_stream(pkts)
3122         self.pg_enable_capture(self.pg_interfaces)
3123         self.pg_start()
3124         frags = self.pg0.get_capture(len(pkts))
3125         p = self.reass_frags_and_verify(frags,
3126                                         self.pg1.remote_ip4,
3127                                         self.pg0.remote_ip4)
3128         self.assertEqual(p[UDP].sport, 4789)
3129         self.assertEqual(p[UDP].dport, 4789)
3130         self.assertEqual(data, p[Raw].load)
3131
3132     def test_reass_hairpinning(self):
3133         """ NAT44EI fragments hairpinning """
3134
3135         server_addr = self.pg0.remote_hosts[1].ip4
3136         host_in_port = random.randint(1025, 65535)
3137         server_in_port = random.randint(1025, 65535)
3138         server_out_port = random.randint(1025, 65535)
3139
3140         self.nat44_add_address(self.nat_addr)
3141         flags = self.config_flags.NAT44_EI_IF_INSIDE
3142         self.vapi.nat44_ei_interface_add_del_feature(
3143             sw_if_index=self.pg0.sw_if_index,
3144             flags=flags, is_add=1)
3145         self.vapi.nat44_ei_interface_add_del_feature(
3146             sw_if_index=self.pg1.sw_if_index,
3147             is_add=1)
3148         # add static mapping for server
3149         self.nat44_add_static_mapping(server_addr, self.nat_addr,
3150                                       server_in_port,
3151                                       server_out_port,
3152                                       proto=IP_PROTOS.tcp)
3153         self.nat44_add_static_mapping(server_addr, self.nat_addr,
3154                                       server_in_port,
3155                                       server_out_port,
3156                                       proto=IP_PROTOS.udp)
3157         self.nat44_add_static_mapping(server_addr, self.nat_addr)
3158
3159         self.reass_hairpinning(server_addr, server_in_port, server_out_port,
3160                                host_in_port, proto=IP_PROTOS.tcp)
3161         self.reass_hairpinning(server_addr, server_in_port, server_out_port,
3162                                host_in_port, proto=IP_PROTOS.udp)
3163         self.reass_hairpinning(server_addr, server_in_port, server_out_port,
3164                                host_in_port, proto=IP_PROTOS.icmp)
3165
3166     def test_frag_out_of_order(self):
3167         """ NAT44EI translate fragments arriving out of order """
3168
3169         self.nat44_add_address(self.nat_addr)
3170         flags = self.config_flags.NAT44_EI_IF_INSIDE
3171         self.vapi.nat44_ei_interface_add_del_feature(
3172             sw_if_index=self.pg0.sw_if_index,
3173             flags=flags, is_add=1)
3174         self.vapi.nat44_ei_interface_add_del_feature(
3175             sw_if_index=self.pg1.sw_if_index,
3176             is_add=1)
3177
3178         self.frag_out_of_order(proto=IP_PROTOS.tcp)
3179         self.frag_out_of_order(proto=IP_PROTOS.udp)
3180         self.frag_out_of_order(proto=IP_PROTOS.icmp)
3181
3182     def test_port_restricted(self):
3183         """ NAT44EI Port restricted NAT44EI (MAP-E CE) """
3184         self.nat44_add_address(self.nat_addr)
3185         flags = self.config_flags.NAT44_EI_IF_INSIDE
3186         self.vapi.nat44_ei_interface_add_del_feature(
3187             sw_if_index=self.pg0.sw_if_index,
3188             flags=flags, is_add=1)
3189         self.vapi.nat44_ei_interface_add_del_feature(
3190             sw_if_index=self.pg1.sw_if_index,
3191             is_add=1)
3192         self.vapi.nat44_ei_set_addr_and_port_alloc_alg(alg=1,
3193                                                        psid_offset=6,
3194                                                        psid_length=6,
3195                                                        psid=10)
3196
3197         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3198              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3199              TCP(sport=4567, dport=22))
3200         self.pg0.add_stream(p)
3201         self.pg_enable_capture(self.pg_interfaces)
3202         self.pg_start()
3203         capture = self.pg1.get_capture(1)
3204         p = capture[0]
3205         try:
3206             ip = p[IP]
3207             tcp = p[TCP]
3208             self.assertEqual(ip.dst, self.pg1.remote_ip4)
3209             self.assertEqual(ip.src, self.nat_addr)
3210             self.assertEqual(tcp.dport, 22)
3211             self.assertNotEqual(tcp.sport, 4567)
3212             self.assertEqual((tcp.sport >> 6) & 63, 10)
3213             self.assert_packet_checksums_valid(p)
3214         except:
3215             self.logger.error(ppp("Unexpected or invalid packet:", p))
3216             raise
3217
3218     def test_port_range(self):
3219         """ NAT44EI External address port range """
3220         self.nat44_add_address(self.nat_addr)
3221         flags = self.config_flags.NAT44_EI_IF_INSIDE
3222         self.vapi.nat44_ei_interface_add_del_feature(
3223             sw_if_index=self.pg0.sw_if_index,
3224             flags=flags, is_add=1)
3225         self.vapi.nat44_ei_interface_add_del_feature(
3226             sw_if_index=self.pg1.sw_if_index,
3227             is_add=1)
3228         self.vapi.nat44_ei_set_addr_and_port_alloc_alg(alg=2,
3229                                                        start_port=1025,
3230                                                        end_port=1027)
3231
3232         pkts = []
3233         for port in range(0, 5):
3234             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3235                  IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3236                  TCP(sport=1125 + port))
3237             pkts.append(p)
3238         self.pg0.add_stream(pkts)
3239         self.pg_enable_capture(self.pg_interfaces)
3240         self.pg_start()
3241         capture = self.pg1.get_capture(3)
3242         for p in capture:
3243             tcp = p[TCP]
3244             self.assertGreaterEqual(tcp.sport, 1025)
3245             self.assertLessEqual(tcp.sport, 1027)
3246
3247     def test_multiple_outside_vrf(self):
3248         """ NAT44EI Multiple outside VRF """
3249         vrf_id1 = 1
3250         vrf_id2 = 2
3251
3252         self.pg1.unconfig_ip4()
3253         self.pg2.unconfig_ip4()
3254         self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id1})
3255         self.vapi.ip_table_add_del(is_add=1, table={'table_id': vrf_id2})
3256         self.pg1.set_table_ip4(vrf_id1)
3257         self.pg2.set_table_ip4(vrf_id2)
3258         self.pg1.config_ip4()
3259         self.pg2.config_ip4()
3260         self.pg1.resolve_arp()
3261         self.pg2.resolve_arp()
3262
3263         self.nat44_add_address(self.nat_addr)
3264         flags = self.config_flags.NAT44_EI_IF_INSIDE
3265         self.vapi.nat44_ei_interface_add_del_feature(
3266             sw_if_index=self.pg0.sw_if_index,
3267             flags=flags, is_add=1)
3268         self.vapi.nat44_ei_interface_add_del_feature(
3269             sw_if_index=self.pg1.sw_if_index,
3270             is_add=1)
3271         self.vapi.nat44_ei_interface_add_del_feature(
3272             sw_if_index=self.pg2.sw_if_index,
3273             is_add=1)
3274
3275         try:
3276             # first VRF
3277             pkts = self.create_stream_in(self.pg0, self.pg1)
3278             self.pg0.add_stream(pkts)
3279             self.pg_enable_capture(self.pg_interfaces)
3280             self.pg_start()
3281             capture = self.pg1.get_capture(len(pkts))
3282             self.verify_capture_out(capture, self.nat_addr)
3283
3284             pkts = self.create_stream_out(self.pg1, self.nat_addr)
3285             self.pg1.add_stream(pkts)
3286             self.pg_enable_capture(self.pg_interfaces)
3287             self.pg_start()
3288             capture = self.pg0.get_capture(len(pkts))
3289             self.verify_capture_in(capture, self.pg0)
3290
3291             self.tcp_port_in = 60303
3292             self.udp_port_in = 60304
3293             self.icmp_id_in = 60305
3294
3295             # second VRF
3296             pkts = self.create_stream_in(self.pg0, self.pg2)
3297             self.pg0.add_stream(pkts)
3298             self.pg_enable_capture(self.pg_interfaces)
3299             self.pg_start()
3300             capture = self.pg2.get_capture(len(pkts))
3301             self.verify_capture_out(capture, self.nat_addr)
3302
3303             pkts = self.create_stream_out(self.pg2, self.nat_addr)
3304             self.pg2.add_stream(pkts)
3305             self.pg_enable_capture(self.pg_interfaces)
3306             self.pg_start()
3307             capture = self.pg0.get_capture(len(pkts))
3308             self.verify_capture_in(capture, self.pg0)
3309
3310         finally:
3311             self.nat44_add_address(self.nat_addr, is_add=0)
3312             self.pg1.unconfig_ip4()
3313             self.pg2.unconfig_ip4()
3314             self.pg1.set_table_ip4(0)
3315             self.pg2.set_table_ip4(0)
3316             self.pg1.config_ip4()
3317             self.pg2.config_ip4()
3318             self.pg1.resolve_arp()
3319             self.pg2.resolve_arp()
3320
3321     def test_mss_clamping(self):
3322         """ NAT44EI TCP MSS clamping """
3323         self.nat44_add_address(self.nat_addr)
3324         flags = self.config_flags.NAT44_EI_IF_INSIDE
3325         self.vapi.nat44_ei_interface_add_del_feature(
3326             sw_if_index=self.pg0.sw_if_index,
3327             flags=flags, is_add=1)
3328         self.vapi.nat44_ei_interface_add_del_feature(
3329             sw_if_index=self.pg1.sw_if_index,
3330             is_add=1)
3331
3332         p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
3333              IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
3334              TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
3335                  flags="S", options=[('MSS', 1400)]))
3336
3337         self.vapi.nat44_ei_set_mss_clamping(enable=1, mss_value=1000)
3338         self.pg0.add_stream(p)
3339         self.pg_enable_capture(self.pg_interfaces)
3340         self.pg_start()
3341         capture = self.pg1.get_capture(1)
3342         # Negotiated MSS value greater than configured - changed
3343         self.verify_mss_value(capture[0], 1000)
3344
3345         self.vapi.nat44_ei_set_mss_clamping(enable=0, mss_value=1500)
3346         self.pg0.add_stream(p)
3347         self.pg_enable_capture(self.pg_interfaces)
3348         self.pg_start()
3349         capture = self.pg1.get_capture(1)
3350         # MSS clamping disabled - negotiated MSS unchanged
3351         self.verify_mss_value(capture[0], 1400)
3352
3353         self.vapi.nat44_ei_set_mss_clamping(enable=1, mss_value=1500)
3354         self.pg0.add_stream(p)
3355         self.pg_enable_capture(self.pg_interfaces)
3356         self.pg_start()
3357         capture = self.pg1.get_capture(1)
3358         # Negotiated MSS value smaller than configured - unchanged
3359         self.verify_mss_value(capture[0], 1400)
3360
3361     def test_ha_send(self):
3362         """ NAT44EI Send HA session synchronization events (active) """
3363         flags = self.config_flags.NAT44_EI_IF_INSIDE
3364         self.vapi.nat44_ei_interface_add_del_feature(
3365             sw_if_index=self.pg0.sw_if_index,
3366             flags=flags, is_add=1)
3367         self.vapi.nat44_ei_interface_add_del_feature(
3368             sw_if_index=self.pg1.sw_if_index,
3369             is_add=1)
3370         self.nat44_add_address(self.nat_addr)
3371
3372         self.vapi.nat44_ei_ha_set_listener(
3373             ip_address=self.pg3.local_ip4, port=12345, path_mtu=512)
3374         self.vapi.nat44_ei_ha_set_failover(
3375             ip_address=self.pg3.remote_ip4, port=12346,
3376             session_refresh_interval=10)
3377         bind_layers(UDP, HANATStateSync, sport=12345)
3378
3379         # create sessions
3380         pkts = self.create_stream_in(self.pg0, self.pg1)
3381         self.pg0.add_stream(pkts)
3382         self.pg_enable_capture(self.pg_interfaces)
3383         self.pg_start()
3384         capture = self.pg1.get_capture(len(pkts))
3385         self.verify_capture_out(capture)
3386         # active send HA events
3387         self.vapi.nat44_ei_ha_flush()
3388         stats = self.statistics['/nat44-ei/ha/add-event-send']
3389         self.assertEqual(stats[:, 0].sum(), 3)
3390         capture = self.pg3.get_capture(1)
3391         p = capture[0]
3392         self.assert_packet_checksums_valid(p)
3393         try:
3394             ip = p[IP]
3395             udp = p[UDP]
3396             hanat = p[HANATStateSync]
3397         except IndexError:
3398             self.logger.error(ppp("Invalid packet:", p))
3399             raise
3400         else:
3401             self.assertEqual(ip.src, self.pg3.local_ip4)
3402             self.assertEqual(ip.dst, self.pg3.remote_ip4)
3403             self.assertEqual(udp.sport, 12345)
3404             self.assertEqual(udp.dport, 12346)
3405             self.assertEqual(hanat.version, 1)
3406             # self.assertEqual(hanat.thread_index, 0)
3407             self.assertEqual(hanat.count, 3)
3408             seq = hanat.sequence_number
3409             for event in hanat.events:
3410                 self.assertEqual(event.event_type, 1)
3411                 self.assertEqual(event.in_addr, self.pg0.remote_ip4)
3412                 self.assertEqual(event.out_addr, self.nat_addr)
3413                 self.assertEqual(event.fib_index, 0)
3414
3415         # ACK received events
3416         ack = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
3417                IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
3418                UDP(sport=12346, dport=12345) /
3419                HANATStateSync(sequence_number=seq, flags='ACK',
3420                               thread_index=hanat.thread_index))
3421         self.pg3.add_stream(ack)
3422         self.pg_start()
3423         stats = self.statistics['/nat44-ei/ha/ack-recv']
3424         self.assertEqual(stats[:, 0].sum(), 1)
3425
3426         # delete one session
3427         self.pg_enable_capture(self.pg_interfaces)
3428         self.vapi.nat44_ei_del_session(
3429             address=self.pg0.remote_ip4, port=self.tcp_port_in,
3430             protocol=IP_PROTOS.tcp, flags=self.config_flags.NAT44_EI_IF_INSIDE)
3431         self.vapi.nat44_ei_ha_flush()
3432         stats = self.statistics['/nat44-ei/ha/del-event-send']
3433         self.assertEqual(stats[:, 0].sum(), 1)
3434         capture = self.pg3.get_capture(1)
3435         p = capture[0]
3436         try:
3437             hanat = p[HANATStateSync]
3438         except IndexError:
3439             self.logger.error(ppp("Invalid packet:", p))
3440             raise
3441         else:
3442             self.assertGreater(hanat.sequence_number, seq)
3443
3444         # do not send ACK, active retry send HA event again
3445         self.pg_enable_capture(self.pg_interfaces)
3446         self.virtual_sleep(12)
3447         stats = self.statistics['/nat44-ei/ha/retry-count']
3448         self.assertEqual(stats[:, 0].sum(), 3)
3449         stats = self.statistics['/nat44-ei/ha/missed-count']
3450         self.assertEqual(stats[:, 0].sum(), 1)
3451         capture = self.pg3.get_capture(3)
3452         for packet in capture:
3453             self.assertEqual(packet, p)
3454
3455         # session counters refresh
3456         pkts = self.create_stream_out(self.pg1)
3457         self.pg1.add_stream(pkts)
3458         self.pg_enable_capture(self.pg_interfaces)
3459         self.pg_start()
3460         self.pg0.get_capture(2)
3461         self.vapi.nat44_ei_ha_flush()
3462         stats = self.statistics['/nat44-ei/ha/refresh-event-send']
3463         self.assertEqual(stats[:, 0].sum(), 2)
3464         capture = self.pg3.get_capture(1)
3465         p = capture[0]
3466         self.assert_packet_checksums_valid(p)
3467         try:
3468             ip = p[IP]
3469             udp = p[UDP]
3470             hanat = p[HANATStateSync]
3471         except IndexError:
3472             self.logger.error(ppp("Invalid packet:", p))
3473             raise
3474         else:
3475             self.assertEqual(ip.src, self.pg3.local_ip4)
3476             self.assertEqual(ip.dst, self.pg3.remote_ip4)
3477             self.assertEqual(udp.sport, 12345)
3478             self.assertEqual(udp.dport, 12346)
3479             self.assertEqual(hanat.version, 1)
3480             self.assertEqual(hanat.count, 2)
3481             seq = hanat.sequence_number
3482             for event in hanat.events:
3483                 self.assertEqual(event.event_type, 3)
3484                 self.assertEqual(event.out_addr, self.nat_addr)
3485                 self.assertEqual(event.fib_index, 0)
3486                 self.assertEqual(event.total_pkts, 2)
3487                 self.assertGreater(event.total_bytes, 0)
3488
3489         stats = self.statistics['/nat44-ei/ha/ack-recv']
3490         ack = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
3491                IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
3492                UDP(sport=12346, dport=12345) /
3493                HANATStateSync(sequence_number=seq, flags='ACK',
3494                               thread_index=hanat.thread_index))
3495         self.pg3.add_stream(ack)
3496         self.pg_start()
3497         stats = self.statistics['/nat44-ei/ha/ack-recv']
3498         self.assertEqual(stats[:, 0].sum(), 2)
3499
3500     def test_ha_recv(self):
3501         """ NAT44EI Receive HA session synchronization events (passive) """
3502         self.nat44_add_address(self.nat_addr)
3503         flags = self.config_flags.NAT44_EI_IF_INSIDE
3504         self.vapi.nat44_ei_interface_add_del_feature(
3505             sw_if_index=self.pg0.sw_if_index,
3506             flags=flags, is_add=1)
3507         self.vapi.nat44_ei_interface_add_del_feature(
3508             sw_if_index=self.pg1.sw_if_index,
3509             is_add=1)
3510         self.vapi.nat44_ei_ha_set_listener(ip_address=self.pg3.local_ip4,
3511                                            port=12345, path_mtu=512)
3512         bind_layers(UDP, HANATStateSync, sport=12345)
3513
3514         # this is a bit tricky - HA dictates thread index due to how it's
3515         # designed, but once we use HA to create a session, we also want
3516         # to pass a packet through said session. so the session must end
3517         # up on the correct thread from both directions - in2out (based on
3518         # IP address) and out2in (based on outside port)
3519
3520         # first choose a thread index which is correct for IP
3521         thread_index = get_nat44_ei_in2out_worker_index(self.pg0.remote_ip4,
3522                                                         self.vpp_worker_count)
3523
3524         # now pick a port which is correct for given thread
3525         port_per_thread = int((0xffff-1024) / max(1, self.vpp_worker_count))
3526         self.tcp_port_out = 1024 + random.randint(1, port_per_thread)
3527         self.udp_port_out = 1024 + random.randint(1, port_per_thread)
3528         if self.vpp_worker_count > 0:
3529             self.tcp_port_out += port_per_thread * (thread_index - 1)
3530             self.udp_port_out += port_per_thread * (thread_index - 1)
3531
3532         # send HA session add events to failover/passive
3533         p = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
3534              IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
3535              UDP(sport=12346, dport=12345) /
3536              HANATStateSync(sequence_number=1, events=[
3537                  Event(event_type='add', protocol='tcp',
3538                        in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr,
3539                        in_port=self.tcp_port_in, out_port=self.tcp_port_out,
3540                        eh_addr=self.pg1.remote_ip4,
3541                        ehn_addr=self.pg1.remote_ip4,
3542                        eh_port=self.tcp_external_port,
3543                        ehn_port=self.tcp_external_port, fib_index=0),
3544                  Event(event_type='add', protocol='udp',
3545                        in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr,
3546                        in_port=self.udp_port_in, out_port=self.udp_port_out,
3547                        eh_addr=self.pg1.remote_ip4,
3548                        ehn_addr=self.pg1.remote_ip4,
3549                        eh_port=self.udp_external_port,
3550                        ehn_port=self.udp_external_port, fib_index=0)],
3551                  thread_index=thread_index))
3552
3553         self.pg3.add_stream(p)
3554         self.pg_enable_capture(self.pg_interfaces)
3555         self.pg_start()
3556         # receive ACK
3557         capture = self.pg3.get_capture(1)
3558         p = capture[0]
3559         try:
3560             hanat = p[HANATStateSync]
3561         except IndexError:
3562             self.logger.error(ppp("Invalid packet:", p))
3563             raise
3564         else:
3565             self.assertEqual(hanat.sequence_number, 1)
3566             self.assertEqual(hanat.flags, 'ACK')
3567             self.assertEqual(hanat.version, 1)
3568             self.assertEqual(hanat.thread_index, thread_index)
3569         stats = self.statistics['/nat44-ei/ha/ack-send']
3570         self.assertEqual(stats[:, 0].sum(), 1)
3571         stats = self.statistics['/nat44-ei/ha/add-event-recv']
3572         self.assertEqual(stats[:, 0].sum(), 2)
3573         users = self.statistics['/nat44-ei/total-users']
3574         self.assertEqual(users[:, 0].sum(), 1)
3575         sessions = self.statistics['/nat44-ei/total-sessions']
3576         self.assertEqual(sessions[:, 0].sum(), 2)
3577         users = self.vapi.nat44_ei_user_dump()
3578         self.assertEqual(len(users), 1)
3579         self.assertEqual(str(users[0].ip_address),
3580                          self.pg0.remote_ip4)
3581         # there should be 2 sessions created by HA
3582         sessions = self.vapi.nat44_ei_user_session_dump(
3583             users[0].ip_address, users[0].vrf_id)
3584         self.assertEqual(len(sessions), 2)
3585         for session in sessions:
3586             self.assertEqual(str(session.inside_ip_address),
3587                              self.pg0.remote_ip4)
3588             self.assertEqual(str(session.outside_ip_address),
3589                              self.nat_addr)
3590             self.assertIn(session.inside_port,
3591                           [self.tcp_port_in, self.udp_port_in])
3592             self.assertIn(session.outside_port,
3593                           [self.tcp_port_out, self.udp_port_out])
3594             self.assertIn(session.protocol, [IP_PROTOS.tcp, IP_PROTOS.udp])
3595
3596         # send HA session delete event to failover/passive
3597         p = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
3598              IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
3599              UDP(sport=12346, dport=12345) /
3600              HANATStateSync(sequence_number=2, events=[
3601                  Event(event_type='del', protocol='udp',
3602                        in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr,
3603                        in_port=self.udp_port_in, out_port=self.udp_port_out,
3604                        eh_addr=self.pg1.remote_ip4,
3605                        ehn_addr=self.pg1.remote_ip4,
3606                        eh_port=self.udp_external_port,
3607                        ehn_port=self.udp_external_port, fib_index=0)],
3608                  thread_index=thread_index))
3609
3610         self.pg3.add_stream(p)
3611         self.pg_enable_capture(self.pg_interfaces)
3612         self.pg_start()
3613         # receive ACK
3614         capture = self.pg3.get_capture(1)
3615         p = capture[0]
3616         try:
3617             hanat = p[HANATStateSync]
3618         except IndexError:
3619             self.logger.error(ppp("Invalid packet:", p))
3620             raise
3621         else:
3622             self.assertEqual(hanat.sequence_number, 2)
3623             self.assertEqual(hanat.flags, 'ACK')
3624             self.assertEqual(hanat.version, 1)
3625         users = self.vapi.nat44_ei_user_dump()
3626         self.assertEqual(len(users), 1)
3627         self.assertEqual(str(users[0].ip_address),
3628                          self.pg0.remote_ip4)
3629         # now we should have only 1 session, 1 deleted by HA
3630         sessions = self.vapi.nat44_ei_user_session_dump(users[0].ip_address,
3631                                                         users[0].vrf_id)
3632         self.assertEqual(len(sessions), 1)
3633         stats = self.statistics['/nat44-ei/ha/del-event-recv']
3634         self.assertEqual(stats[:, 0].sum(), 1)
3635
3636         stats = self.statistics.get_err_counter(
3637             '/err/nat44-ei-ha/pkts-processed')
3638         self.assertEqual(stats, 2)
3639
3640         # send HA session refresh event to failover/passive
3641         p = (Ether(dst=self.pg3.local_mac, src=self.pg3.remote_mac) /
3642              IP(src=self.pg3.remote_ip4, dst=self.pg3.local_ip4) /
3643              UDP(sport=12346, dport=12345) /
3644              HANATStateSync(sequence_number=3, events=[
3645                  Event(event_type='refresh', protocol='tcp',
3646                        in_addr=self.pg0.remote_ip4, out_addr=self.nat_addr,
3647                        in_port=self.tcp_port_in, out_port=self.tcp_port_out,
3648                        eh_addr=self.pg1.remote_ip4,
3649                        ehn_addr=self.pg1.remote_ip4,
3650                        eh_port=self.tcp_external_port,
3651                        ehn_port=self.tcp_external_port, fib_index=0,
3652                        total_bytes=1024, total_pkts=2)],
3653                  thread_index=thread_index))
3654         self.pg3.add_stream(p)
3655         self.pg_enable_capture(self.pg_interfaces)
3656         self.pg_start()
3657         # receive ACK
3658         capture = self.pg3.get_capture(1)
3659         p = capture[0]
3660         try:
3661             hanat = p[HANATStateSync]
3662         except IndexError:
3663             self.logger.error(ppp("Invalid packet:", p))
3664             raise
3665         else:
3666             self.assertEqual(hanat.sequence_number, 3)
3667             self.assertEqual(hanat.flags, 'ACK')
3668             self.assertEqual(hanat.version, 1)
3669         users = self.vapi.nat44_ei_user_dump()
3670         self.assertEqual(len(users), 1)
3671         self.assertEqual(str(users[0].ip_address),
3672                          self.pg0.remote_ip4)
3673         sessions = self.vapi.nat44_ei_user_session_dump(
3674             users[0].ip_address, users[0].vrf_id)
3675         self.assertEqual(len(sessions), 1)
3676         session = sessions[0]
3677         self.assertEqual(session.total_bytes, 1024)
3678         self.assertEqual(session.total_pkts, 2)
3679         stats = self.statistics['/nat44-ei/ha/refresh-event-recv']
3680         self.assertEqual(stats[:, 0].sum(), 1)
3681
3682         stats = self.statistics.get_err_counter(
3683             '/err/nat44-ei-ha/pkts-processed')
3684         self.assertEqual(stats, 3)
3685
3686         # send packet to test session created by HA
3687         p = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
3688              IP(src=self.pg1.remote_ip4, dst=self.nat_addr) /
3689              TCP(sport=self.tcp_external_port, dport=self.tcp_port_out))
3690         self.pg1.add_stream(p)
3691         self.pg_enable_capture(self.pg_interfaces)
3692         self.pg_start()
3693         capture = self.pg0.get_capture(1)
3694         p = capture[0]
3695         try:
3696             ip = p[IP]
3697             tcp = p[TCP]
3698         except IndexError:
3699             self.logger.error(ppp("Invalid packet:", p))
3700             raise
3701         else:
3702             self.assertEqual(ip.src, self.pg1.remote_ip4)
3703             self.assertEqual(ip.dst, self.pg0.remote_ip4)
3704             self.assertEqual(tcp.sport, self.tcp_external_port)
3705             self.assertEqual(tcp.dport, self.tcp_port_in)
3706
3707     def reconfigure_frame_queue_nelts(self, frame_queue_nelts):
3708         self.vapi.nat44_ei_plugin_enable_disable(enable=0)
3709         self.vapi.nat44_ei_set_fq_options(frame_queue_nelts=frame_queue_nelts)
3710         # keep plugin configuration persistent
3711         self.plugin_enable()
3712         return self.vapi.nat44_ei_show_fq_options().frame_queue_nelts
3713
3714     def test_set_frame_queue_nelts(self):
3715         """ NAT44EI API test - worker handoff frame queue elements """
3716         self.assertEqual(self.reconfigure_frame_queue_nelts(512), 512)
3717
3718     def show_commands_at_teardown(self):
3719         self.logger.info(self.vapi.cli("show nat44 ei timeouts"))
3720         self.logger.info(self.vapi.cli("show nat44 ei addresses"))
3721         self.logger.info(self.vapi.cli("show nat44 ei interfaces"))
3722         self.logger.info(self.vapi.cli("show nat44 ei static mappings"))
3723         self.logger.info(self.vapi.cli("show nat44 ei interface address"))
3724         self.logger.info(self.vapi.cli("show nat44 ei sessions detail"))
3725         self.logger.info(self.vapi.cli("show nat44 ei hash tables detail"))
3726         self.logger.info(self.vapi.cli("show nat44 ei ha"))
3727         self.logger.info(
3728             self.vapi.cli("show nat44 ei addr-port-assignment-alg"))
3729
3730     def test_outside_address_distribution(self):
3731         """ Outside address distribution based on source address """
3732
3733         x = 100
3734         nat_addresses = []
3735
3736         for i in range(1, x):
3737             a = "10.0.0.%d" % i
3738             nat_addresses.append(a)
3739
3740         flags = self.config_flags.NAT44_EI_IF_INSIDE
3741         self.vapi.nat44_ei_interface_add_del_feature(
3742             sw_if_index=self.pg0.sw_if_index,
3743             flags=flags, is_add=1)
3744         self.vapi.nat44_ei_interface_add_del_feature(
3745             sw_if_index=self.pg1.sw_if_index,
3746             is_add=1)
3747
3748         self.vapi.nat44_ei_add_del_address_range(
3749             first_ip_address=nat_addresses[0],
3750             last_ip_address=nat_addresses[-1],
3751             vrf_id=0xFFFFFFFF, is_add=1)
3752
3753         self.pg0.generate_remote_hosts(x)
3754
3755         pkts = []
3756         for i in range(x):
3757             info = self.create_packet_info(self.pg0, self.pg1)
3758             payload = self.info_to_payload(info)
3759             p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
3760                  IP(src=self.pg0.remote_hosts[i].ip4,
3761                      dst=self.pg1.remote_ip4) /
3762                  UDP(sport=7000+i, dport=8000+i) /
3763                  Raw(payload))
3764             info.data = p
3765             pkts.append(p)
3766
3767         self.pg0.add_stream(pkts)
3768         self.pg_enable_capture(self.pg_interfaces)
3769         self.pg_start()
3770         recvd = self.pg1.get_capture(len(pkts))
3771         for p_recvd in recvd:
3772             payload_info = self.payload_to_info(p_recvd[Raw])
3773             packet_index = payload_info.index
3774             info = self._packet_infos[packet_index]
3775             self.assertTrue(info is not None)
3776             self.assertEqual(packet_index, info.index)
3777             p_sent = info.data
3778             packed = socket.inet_aton(p_sent[IP].src)
3779             numeric = struct.unpack("!L", packed)[0]
3780             numeric = socket.htonl(numeric)
3781             a = nat_addresses[(numeric-1) % len(nat_addresses)]
3782             self.assertEqual(
3783                 a, p_recvd[IP].src,
3784                 "Invalid packet (src IP %s translated to %s, but expected %s)"
3785                 % (p_sent[IP].src, p_recvd[IP].src, a))
3786
3787     def test_default_user_sessions(self):
3788         """ NAT44EI default per-user session limit is used and reported """
3789         nat44_ei_config = self.vapi.nat44_ei_show_running_config()
3790         # a nonzero default should be reported for user_sessions
3791         self.assertNotEqual(nat44_ei_config.user_sessions, 0)
3792
3793
3794 class TestNAT44Out2InDPO(MethodHolder):
3795     """ NAT44EI Test Cases using out2in DPO """
3796
3797     @classmethod
3798     def setUpClass(cls):
3799         super(TestNAT44Out2InDPO, cls).setUpClass()
3800         cls.vapi.cli("set log class nat44-ei level debug")
3801
3802         cls.tcp_port_in = 6303
3803         cls.tcp_port_out = 6303
3804         cls.udp_port_in = 6304
3805         cls.udp_port_out = 6304
3806         cls.icmp_id_in = 6305
3807         cls.icmp_id_out = 6305
3808         cls.nat_addr = '10.0.0.3'
3809         cls.dst_ip4 = '192.168.70.1'
3810
3811         cls.create_pg_interfaces(range(2))
3812
3813         cls.pg0.admin_up()
3814         cls.pg0.config_ip4()
3815         cls.pg0.resolve_arp()
3816
3817         cls.pg1.admin_up()
3818         cls.pg1.config_ip6()
3819         cls.pg1.resolve_ndp()
3820
3821         r1 = VppIpRoute(cls, "::", 0,
3822                         [VppRoutePath(cls.pg1.remote_ip6,
3823                                       cls.pg1.sw_if_index)],
3824                         register=False)
3825         r1.add_vpp_config()
3826
3827     def setUp(self):
3828         super(TestNAT44Out2InDPO, self).setUp()
3829         flags = self.config_flags.NAT44_EI_OUT2IN_DPO
3830         self.vapi.nat44_ei_plugin_enable_disable(enable=1, flags=flags)
3831
3832     def tearDown(self):
3833         super(TestNAT44Out2InDPO, self).tearDown()
3834         if not self.vpp_dead:
3835             self.vapi.nat44_ei_plugin_enable_disable(enable=0)
3836             self.vapi.cli("clear logging")
3837
3838     def configure_xlat(self):
3839         self.dst_ip6_pfx = '1:2:3::'
3840         self.dst_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
3841                                               self.dst_ip6_pfx)
3842         self.dst_ip6_pfx_len = 96
3843         self.src_ip6_pfx = '4:5:6::'
3844         self.src_ip6_pfx_n = socket.inet_pton(socket.AF_INET6,
3845                                               self.src_ip6_pfx)
3846         self.src_ip6_pfx_len = 96
3847         self.vapi.map_add_domain(self.dst_ip6_pfx_n, self.dst_ip6_pfx_len,
3848                                  self.src_ip6_pfx_n, self.src_ip6_pfx_len,
3849                                  '\x00\x00\x00\x00', 0)
3850
3851     @unittest.skip('Temporary disabled')
3852     def test_464xlat_ce(self):
3853         """ Test 464XLAT CE with NAT44EI """
3854
3855         self.configure_xlat()
3856
3857         flags = self.config_flags.NAT44_EI_IF_INSIDE
3858         self.vapi.nat44_ei_interface_add_del_feature(
3859             sw_if_index=self.pg0.sw_if_index,
3860             flags=flags, is_add=1)
3861         self.vapi.nat44_ei_add_del_address_range(
3862             first_ip_address=self.nat_addr_n,
3863             last_ip_address=self.nat_addr_n,
3864             vrf_id=0xFFFFFFFF, is_add=1)
3865
3866         out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
3867                                        self.dst_ip6_pfx_len)
3868         out_dst_ip6 = self.compose_ip6(self.nat_addr, self.src_ip6_pfx,
3869                                        self.src_ip6_pfx_len)
3870
3871         try:
3872             pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
3873             self.pg0.add_stream(pkts)
3874             self.pg_enable_capture(self.pg_interfaces)
3875             self.pg_start()
3876             capture = self.pg1.get_capture(len(pkts))
3877             self.verify_capture_out_ip6(capture, nat_ip=out_dst_ip6,
3878                                         dst_ip=out_src_ip6)
3879
3880             pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6,
3881                                               out_dst_ip6)
3882             self.pg1.add_stream(pkts)
3883             self.pg_enable_capture(self.pg_interfaces)
3884             self.pg_start()
3885             capture = self.pg0.get_capture(len(pkts))
3886             self.verify_capture_in(capture, self.pg0)
3887         finally:
3888             self.vapi.nat44_ei_interface_add_del_feature(
3889                 sw_if_index=self.pg0.sw_if_index,
3890                 flags=flags)
3891             self.vapi.nat44_ei_add_del_address_range(
3892                 first_ip_address=self.nat_addr_n,
3893                 last_ip_address=self.nat_addr_n,
3894                 vrf_id=0xFFFFFFFF)
3895
3896     @unittest.skip('Temporary disabled')
3897     def test_464xlat_ce_no_nat(self):
3898         """ Test 464XLAT CE without NAT44EI """
3899
3900         self.configure_xlat()
3901
3902         out_src_ip6 = self.compose_ip6(self.dst_ip4, self.dst_ip6_pfx,
3903                                        self.dst_ip6_pfx_len)
3904         out_dst_ip6 = self.compose_ip6(self.pg0.remote_ip4, self.src_ip6_pfx,
3905                                        self.src_ip6_pfx_len)
3906
3907         pkts = self.create_stream_in(self.pg0, self.pg1, self.dst_ip4)
3908         self.pg0.add_stream(pkts)
3909         self.pg_enable_capture(self.pg_interfaces)
3910         self.pg_start()
3911         capture = self.pg1.get_capture(len(pkts))
3912         self.verify_capture_out_ip6(capture, dst_ip=out_src_ip6,
3913                                     nat_ip=out_dst_ip6, same_port=True)
3914
3915         pkts = self.create_stream_out_ip6(self.pg1, out_src_ip6, out_dst_ip6)
3916         self.pg1.add_stream(pkts)
3917         self.pg_enable_capture(self.pg_interfaces)
3918         self.pg_start()
3919         capture = self.pg0.get_capture(len(pkts))
3920         self.verify_capture_in(capture, self.pg0)
3921
3922
3923 class TestNAT44EIMW(MethodHolder):
3924     """ NAT44EI Test Cases (multiple workers) """
3925     vpp_worker_count = 2
3926     max_translations = 10240
3927     max_users = 10240
3928
3929     @classmethod
3930     def setUpClass(cls):
3931         super(TestNAT44EIMW, cls).setUpClass()
3932         cls.vapi.cli("set log class nat level debug")
3933
3934         cls.tcp_port_in = 6303
3935         cls.tcp_port_out = 6303
3936         cls.udp_port_in = 6304
3937         cls.udp_port_out = 6304
3938         cls.icmp_id_in = 6305
3939         cls.icmp_id_out = 6305
3940         cls.nat_addr = '10.0.0.3'
3941         cls.ipfix_src_port = 4739
3942         cls.ipfix_domain_id = 1
3943         cls.tcp_external_port = 80
3944         cls.udp_external_port = 69
3945
3946         cls.create_pg_interfaces(range(10))
3947         cls.interfaces = list(cls.pg_interfaces[0:4])
3948
3949         for i in cls.interfaces:
3950             i.admin_up()
3951             i.config_ip4()
3952             i.resolve_arp()
3953
3954         cls.pg0.generate_remote_hosts(3)
3955         cls.pg0.configure_ipv4_neighbors()
3956
3957         cls.pg1.generate_remote_hosts(1)
3958         cls.pg1.configure_ipv4_neighbors()
3959
3960         cls.overlapping_interfaces = list(list(cls.pg_interfaces[4:7]))
3961         cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 10})
3962         cls.vapi.ip_table_add_del(is_add=1, table={'table_id': 20})
3963
3964         cls.pg4._local_ip4 = "172.16.255.1"
3965         cls.pg4._remote_hosts[0]._ip4 = "172.16.255.2"
3966         cls.pg4.set_table_ip4(10)
3967         cls.pg5._local_ip4 = "172.17.255.3"
3968         cls.pg5._remote_hosts[0]._ip4 = "172.17.255.4"
3969         cls.pg5.set_table_ip4(10)
3970         cls.pg6._local_ip4 = "172.16.255.1"
3971         cls.pg6._remote_hosts[0]._ip4 = "172.16.255.2"
3972         cls.pg6.set_table_ip4(20)
3973         for i in cls.overlapping_interfaces:
3974             i.config_ip4()
3975             i.admin_up()
3976             i.resolve_arp()
3977
3978         cls.pg7.admin_up()
3979         cls.pg8.admin_up()
3980
3981         cls.pg9.generate_remote_hosts(2)
3982         cls.pg9.config_ip4()
3983         cls.vapi.sw_interface_add_del_address(
3984             sw_if_index=cls.pg9.sw_if_index,
3985             prefix="10.0.0.1/24")
3986
3987         cls.pg9.admin_up()
3988         cls.pg9.resolve_arp()
3989         cls.pg9._remote_hosts[1]._ip4 = cls.pg9._remote_hosts[0]._ip4
3990         cls.pg4._remote_ip4 = cls.pg9._remote_hosts[0]._ip4 = "10.0.0.2"
3991         cls.pg9.resolve_arp()
3992
3993     def setUp(self):
3994         super(TestNAT44EIMW, self).setUp()
3995         self.vapi.nat44_ei_plugin_enable_disable(
3996             sessions=self.max_translations,
3997             users=self.max_users, enable=1)
3998
3999     def tearDown(self):
4000         super(TestNAT44EIMW, self).tearDown()
4001         if not self.vpp_dead:
4002             self.vapi.nat44_ei_ipfix_enable_disable(
4003                 domain_id=self.ipfix_domain_id,
4004                 src_port=self.ipfix_src_port,
4005                 enable=0)
4006             self.ipfix_src_port = 4739
4007             self.ipfix_domain_id = 1
4008
4009             self.vapi.nat44_ei_plugin_enable_disable(enable=0)
4010             self.vapi.cli("clear logging")
4011
4012     def test_hairpinning(self):
4013         """ NAT44EI hairpinning - 1:1 NAPT """
4014
4015         host = self.pg0.remote_hosts[0]
4016         server = self.pg0.remote_hosts[1]
4017         host_in_port = 1234
4018         host_out_port = 0
4019         server_in_port = 5678
4020         server_out_port = 8765
4021         worker_1 = 1
4022         worker_2 = 2
4023
4024         self.nat44_add_address(self.nat_addr)
4025         flags = self.config_flags.NAT44_EI_IF_INSIDE
4026         self.vapi.nat44_ei_interface_add_del_feature(
4027             sw_if_index=self.pg0.sw_if_index,
4028             flags=flags, is_add=1)
4029         self.vapi.nat44_ei_interface_add_del_feature(
4030             sw_if_index=self.pg1.sw_if_index,
4031             is_add=1)
4032
4033         # add static mapping for server
4034         self.nat44_add_static_mapping(server.ip4, self.nat_addr,
4035                                       server_in_port, server_out_port,
4036                                       proto=IP_PROTOS.tcp)
4037
4038         cnt = self.statistics['/nat44-ei/hairpinning']
4039         # send packet from host to server
4040         p = (Ether(src=host.mac, dst=self.pg0.local_mac) /
4041              IP(src=host.ip4, dst=self.nat_addr) /
4042              TCP(sport=host_in_port, dport=server_out_port))
4043         self.pg0.add_stream(p)
4044         self.pg_enable_capture(self.pg_interfaces)
4045         self.pg_start()
4046         capture = self.pg0.get_capture(1)
4047         p = capture[0]
4048         try:
4049             ip = p[IP]
4050             tcp = p[TCP]
4051             self.assertEqual(ip.src, self.nat_addr)
4052             self.assertEqual(ip.dst, server.ip4)
4053             self.assertNotEqual(tcp.sport, host_in_port)
4054             self.assertEqual(tcp.dport, server_in_port)
4055             self.assert_packet_checksums_valid(p)
4056             host_out_port = tcp.sport
4057         except:
4058             self.logger.error(ppp("Unexpected or invalid packet:", p))
4059             raise
4060
4061         after = self.statistics['/nat44-ei/hairpinning']
4062
4063         if_idx = self.pg0.sw_if_index
4064         self.assertEqual(after[worker_2][if_idx] - cnt[worker_1][if_idx], 1)
4065
4066         # send reply from server to host
4067         p = (Ether(src=server.mac, dst=self.pg0.local_mac) /
4068              IP(src=server.ip4, dst=self.nat_addr) /
4069              TCP(sport=server_in_port, dport=host_out_port))
4070         self.pg0.add_stream(p)
4071         self.pg_enable_capture(self.pg_interfaces)
4072         self.pg_start()
4073         capture = self.pg0.get_capture(1)
4074         p = capture[0]
4075         try:
4076             ip = p[IP]
4077             tcp = p[TCP]
4078             self.assertEqual(ip.src, self.nat_addr)
4079             self.assertEqual(ip.dst, host.ip4)
4080             self.assertEqual(tcp.sport, server_out_port)
4081             self.assertEqual(tcp.dport, host_in_port)
4082             self.assert_packet_checksums_valid(p)
4083         except:
4084             self.logger.error(ppp("Unexpected or invalid packet:", p))
4085             raise
4086
4087         after = self.statistics['/nat44-ei/hairpinning']
4088         if_idx = self.pg0.sw_if_index
4089         self.assertEqual(after[worker_1][if_idx] - cnt[worker_1][if_idx], 1)
4090         self.assertEqual(after[worker_2][if_idx] - cnt[worker_2][if_idx], 2)
4091
4092     def test_hairpinning2(self):
4093         """ NAT44EI hairpinning - 1:1 NAT"""
4094
4095         server1_nat_ip = "10.0.0.10"
4096         server2_nat_ip = "10.0.0.11"
4097         host = self.pg0.remote_hosts[0]
4098         server1 = self.pg0.remote_hosts[1]
4099         server2 = self.pg0.remote_hosts[2]
4100         server_tcp_port = 22
4101         server_udp_port = 20
4102
4103         self.nat44_add_address(self.nat_addr)
4104         flags = self.config_flags.NAT44_EI_IF_INSIDE
4105         self.vapi.nat44_ei_interface_add_del_feature(
4106             sw_if_index=self.pg0.sw_if_index,
4107             flags=flags, is_add=1)
4108         self.vapi.nat44_ei_interface_add_del_feature(
4109             sw_if_index=self.pg1.sw_if_index,
4110             is_add=1)
4111
4112         # add static mapping for servers
4113         self.nat44_add_static_mapping(server1.ip4, server1_nat_ip)
4114         self.nat44_add_static_mapping(server2.ip4, server2_nat_ip)
4115
4116         # host to server1
4117         pkts = []
4118         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4119              IP(src=host.ip4, dst=server1_nat_ip) /
4120              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
4121         pkts.append(p)
4122         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4123              IP(src=host.ip4, dst=server1_nat_ip) /
4124              UDP(sport=self.udp_port_in, dport=server_udp_port))
4125         pkts.append(p)
4126         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4127              IP(src=host.ip4, dst=server1_nat_ip) /
4128              ICMP(id=self.icmp_id_in, type='echo-request'))
4129         pkts.append(p)
4130         self.pg0.add_stream(pkts)
4131         self.pg_enable_capture(self.pg_interfaces)
4132         self.pg_start()
4133         capture = self.pg0.get_capture(len(pkts))
4134         for packet in capture:
4135             try:
4136                 self.assertEqual(packet[IP].src, self.nat_addr)
4137                 self.assertEqual(packet[IP].dst, server1.ip4)
4138                 if packet.haslayer(TCP):
4139                     self.assertNotEqual(packet[TCP].sport, self.tcp_port_in)
4140                     self.assertEqual(packet[TCP].dport, server_tcp_port)
4141                     self.tcp_port_out = packet[TCP].sport
4142                     self.assert_packet_checksums_valid(packet)
4143                 elif packet.haslayer(UDP):
4144                     self.assertNotEqual(packet[UDP].sport, self.udp_port_in)
4145                     self.assertEqual(packet[UDP].dport, server_udp_port)
4146                     self.udp_port_out = packet[UDP].sport
4147                 else:
4148                     self.assertNotEqual(packet[ICMP].id, self.icmp_id_in)
4149                     self.icmp_id_out = packet[ICMP].id
4150             except:
4151                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4152                 raise
4153
4154         # server1 to host
4155         pkts = []
4156         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4157              IP(src=server1.ip4, dst=self.nat_addr) /
4158              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
4159         pkts.append(p)
4160         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4161              IP(src=server1.ip4, dst=self.nat_addr) /
4162              UDP(sport=server_udp_port, dport=self.udp_port_out))
4163         pkts.append(p)
4164         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4165              IP(src=server1.ip4, dst=self.nat_addr) /
4166              ICMP(id=self.icmp_id_out, type='echo-reply'))
4167         pkts.append(p)
4168         self.pg0.add_stream(pkts)
4169         self.pg_enable_capture(self.pg_interfaces)
4170         self.pg_start()
4171         capture = self.pg0.get_capture(len(pkts))
4172         for packet in capture:
4173             try:
4174                 self.assertEqual(packet[IP].src, server1_nat_ip)
4175                 self.assertEqual(packet[IP].dst, host.ip4)
4176                 if packet.haslayer(TCP):
4177                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
4178                     self.assertEqual(packet[TCP].sport, server_tcp_port)
4179                     self.assert_packet_checksums_valid(packet)
4180                 elif packet.haslayer(UDP):
4181                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
4182                     self.assertEqual(packet[UDP].sport, server_udp_port)
4183                 else:
4184                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
4185             except:
4186                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4187                 raise
4188
4189         # server2 to server1
4190         pkts = []
4191         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4192              IP(src=server2.ip4, dst=server1_nat_ip) /
4193              TCP(sport=self.tcp_port_in, dport=server_tcp_port))
4194         pkts.append(p)
4195         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4196              IP(src=server2.ip4, dst=server1_nat_ip) /
4197              UDP(sport=self.udp_port_in, dport=server_udp_port))
4198         pkts.append(p)
4199         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4200              IP(src=server2.ip4, dst=server1_nat_ip) /
4201              ICMP(id=self.icmp_id_in, type='echo-request'))
4202         pkts.append(p)
4203         self.pg0.add_stream(pkts)
4204         self.pg_enable_capture(self.pg_interfaces)
4205         self.pg_start()
4206         capture = self.pg0.get_capture(len(pkts))
4207         for packet in capture:
4208             try:
4209                 self.assertEqual(packet[IP].src, server2_nat_ip)
4210                 self.assertEqual(packet[IP].dst, server1.ip4)
4211                 if packet.haslayer(TCP):
4212                     self.assertEqual(packet[TCP].sport, self.tcp_port_in)
4213                     self.assertEqual(packet[TCP].dport, server_tcp_port)
4214                     self.tcp_port_out = packet[TCP].sport
4215                     self.assert_packet_checksums_valid(packet)
4216                 elif packet.haslayer(UDP):
4217                     self.assertEqual(packet[UDP].sport, self.udp_port_in)
4218                     self.assertEqual(packet[UDP].dport, server_udp_port)
4219                     self.udp_port_out = packet[UDP].sport
4220                 else:
4221                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
4222                     self.icmp_id_out = packet[ICMP].id
4223             except:
4224                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4225                 raise
4226
4227         # server1 to server2
4228         pkts = []
4229         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4230              IP(src=server1.ip4, dst=server2_nat_ip) /
4231              TCP(sport=server_tcp_port, dport=self.tcp_port_out))
4232         pkts.append(p)
4233         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4234              IP(src=server1.ip4, dst=server2_nat_ip) /
4235              UDP(sport=server_udp_port, dport=self.udp_port_out))
4236         pkts.append(p)
4237         p = (Ether(dst=self.pg0.local_mac, src=self.pg0.remote_mac) /
4238              IP(src=server1.ip4, dst=server2_nat_ip) /
4239              ICMP(id=self.icmp_id_out, type='echo-reply'))
4240         pkts.append(p)
4241         self.pg0.add_stream(pkts)
4242         self.pg_enable_capture(self.pg_interfaces)
4243         self.pg_start()
4244         capture = self.pg0.get_capture(len(pkts))
4245         for packet in capture:
4246             try:
4247                 self.assertEqual(packet[IP].src, server1_nat_ip)
4248                 self.assertEqual(packet[IP].dst, server2.ip4)
4249                 if packet.haslayer(TCP):
4250                     self.assertEqual(packet[TCP].dport, self.tcp_port_in)
4251                     self.assertEqual(packet[TCP].sport, server_tcp_port)
4252                     self.assert_packet_checksums_valid(packet)
4253                 elif packet.haslayer(UDP):
4254                     self.assertEqual(packet[UDP].dport, self.udp_port_in)
4255                     self.assertEqual(packet[UDP].sport, server_udp_port)
4256                 else:
4257                     self.assertEqual(packet[ICMP].id, self.icmp_id_in)
4258             except:
4259                 self.logger.error(ppp("Unexpected or invalid packet:", packet))
4260                 raise
4261
4262
4263 if __name__ == '__main__':
4264     unittest.main(testRunner=VppTestRunner)