try:
cls.tcp_port_in = 6303
- cls.tcp_port_out = 6303
+ cls.tcp_external_port = 6303
cls.udp_port_in = 6304
- cls.udp_port_out = 6304
+ cls.udp_external_port = 6304
cls.icmp_id_in = 6305
cls.snat_addr = '10.0.0.3'
# TCP
p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
- TCP(sport=self.tcp_port_in, dport=self.tcp_port_out))
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port))
pkts.append(p)
# UDP
p = (Ether(dst=in_if.local_mac, src=in_if.remote_mac) /
IP(src=in_if.remote_ip4, dst=out_if.remote_ip4, ttl=ttl) /
- UDP(sport=self.udp_port_in, dport=self.udp_port_out))
+ UDP(sport=self.udp_port_in, dport=self.udp_external_port))
pkts.append(p)
# ICMP
# TCP
p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
- TCP(dport=self.tcp_external_port, sport=self.tcp_port_out))
+ TCP(dport=self.tcp_port_out, sport=self.tcp_external_port))
pkts.append(p)
# UDP
p = (Ether(dst=out_if.local_mac, src=out_if.remote_mac) /
IP(src=out_if.remote_ip4, dst=dst_ip, ttl=ttl) /
- UDP(dport=self.udp_external_port, sport=self.udp_port_out))
+ UDP(dport=self.udp_port_out, sport=self.udp_external_port))
pkts.append(p)
# ICMP
try:
self.assertEqual(packet[IP].src, nat_ip)
if packet.haslayer(TCP):
- self.tcp_external_port = packet[TCP].sport
+ self.tcp_port_out = packet[TCP].sport
elif packet.haslayer(UDP):
- self.udp_external_port = packet[UDP].sport
+ self.udp_port_out = packet[UDP].sport
else:
self.icmp_external_id = packet[ICMP].id
except:
# SYN packet in->out
p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
- TCP(sport=self.tcp_port_in, dport=self.tcp_port_out,
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
flags="S"))
in_if.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
capture = out_if.get_capture(1)
p = capture[0]
- self.tcp_external_port = p[TCP].sport
+ self.tcp_port_out = p[TCP].sport
# SYN + ACK packet out->in
p = (Ether(src=out_if.remote_mac, dst=out_if.local_mac) /
IP(src=out_if.remote_ip4, dst=self.snat_addr) /
- TCP(sport=self.tcp_port_out, dport=self.tcp_external_port,
+ TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
flags="SA"))
out_if.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
# ACK packet in->out
p = (Ether(src=in_if.remote_mac, dst=in_if.local_mac) /
IP(src=in_if.remote_ip4, dst=out_if.remote_ip4) /
- TCP(sport=self.tcp_port_in, dport=self.tcp_port_out,
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
flags="A"))
in_if.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
""" CGNAT translation test (TCP, UDP, ICMP) """
nat_ip = "10.0.0.10"
- self.tcp_port_out = 6303
- self.udp_port_out = 6304
- self.icmp_id_in = 6305
- self.tcp_external_port = 7303
- self.udp_external_port = 7304
- self.icmp_id_out = 7305
self.vapi.snat_add_det_map(self.pg0.remote_ip4n,
32,
capture = self.pg0.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg0)
+ # session dump test
+ sessions = self.vapi.snat_det_session_dump(self.pg0.remote_ip4n)
+ self.assertEqual(len(sessions), 3)
+
+ # TCP session
+ s = sessions[0]
+ self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
+ self.assertEqual(s.in_port, self.tcp_port_in)
+ self.assertEqual(s.out_port, self.tcp_port_out)
+ self.assertEqual(s.ext_port, self.tcp_external_port)
+
+ # UDP session
+ s = sessions[1]
+ self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
+ self.assertEqual(s.in_port, self.udp_port_in)
+ self.assertEqual(s.out_port, self.udp_port_out)
+ self.assertEqual(s.ext_port, self.udp_external_port)
+
+ # ICMP session
+ s = sessions[2]
+ self.assertEqual(s.ext_addr[:4], self.pg1.remote_ip4n)
+ self.assertEqual(s.in_port, self.icmp_id_in)
+ self.assertEqual(s.out_port, self.icmp_external_id)
+
def test_multiple_users(self):
""" CGNAT multiple users """
nat_ip = "10.0.0.10"
port_in = 80
- port_out = 6303
+ external_port = 6303
host0 = self.pg0.remote_hosts[0]
host1 = self.pg0.remote_hosts[1]
# host0 to out
p = (Ether(src=host0.mac, dst=self.pg0.local_mac) /
IP(src=host0.ip4, dst=self.pg1.remote_ip4) /
- TCP(sport=port_in, dport=port_out))
+ TCP(sport=port_in, dport=external_port))
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
tcp = p[TCP]
self.assertEqual(ip.src, nat_ip)
self.assertEqual(ip.dst, self.pg1.remote_ip4)
- self.assertEqual(tcp.dport, port_out)
- external_port0 = tcp.sport
+ self.assertEqual(tcp.dport, external_port)
+ port_out0 = tcp.sport
except:
self.logger.error(ppp("Unexpected or invalid packet:", p))
raise
# host1 to out
p = (Ether(src=host1.mac, dst=self.pg0.local_mac) /
IP(src=host1.ip4, dst=self.pg1.remote_ip4) /
- TCP(sport=port_in, dport=port_out))
+ TCP(sport=port_in, dport=external_port))
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
tcp = p[TCP]
self.assertEqual(ip.src, nat_ip)
self.assertEqual(ip.dst, self.pg1.remote_ip4)
- self.assertEqual(tcp.dport, port_out)
- external_port1 = tcp.sport
+ self.assertEqual(tcp.dport, external_port)
+ port_out1 = tcp.sport
except:
self.logger.error(ppp("Unexpected or invalid packet:", p))
raise
# out to host0
p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
IP(src=self.pg1.remote_ip4, dst=nat_ip) /
- TCP(sport=port_out, dport=external_port0))
+ TCP(sport=external_port, dport=port_out0))
self.pg1.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
self.assertEqual(ip.src, self.pg1.remote_ip4)
self.assertEqual(ip.dst, host0.ip4)
self.assertEqual(tcp.dport, port_in)
- self.assertEqual(tcp.sport, port_out)
+ self.assertEqual(tcp.sport, external_port)
except:
self.logger.error(ppp("Unexpected or invalid packet:", p))
raise
# out to host1
p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
IP(src=self.pg1.remote_ip4, dst=nat_ip) /
- TCP(sport=port_out, dport=external_port1))
+ TCP(sport=external_port, dport=port_out1))
self.pg1.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
self.assertEqual(ip.src, self.pg1.remote_ip4)
self.assertEqual(ip.dst, host1.ip4)
self.assertEqual(tcp.dport, port_in)
- self.assertEqual(tcp.sport, port_out)
+ self.assertEqual(tcp.sport, external_port)
except:
self.logger.error(ppp("Unexpected or invalid packet", p))
raise
# session close api test
self.vapi.snat_det_close_session_out(socket.inet_aton(nat_ip),
- external_port1,
+ port_out1,
self.pg1.remote_ip4n,
- port_out)
+ external_port)
dms = self.vapi.snat_det_map_dump()
self.assertEqual(dms[0].ses_num, 1)
self.vapi.snat_det_close_session_in(host0.ip4n,
port_in,
self.pg1.remote_ip4n,
- port_out)
+ external_port)
dms = self.vapi.snat_det_map_dump()
self.assertEqual(dms[0].ses_num, 0)
# FIN packet in -> out
p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- TCP(sport=self.tcp_port_in, dport=self.tcp_port_out,
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
flags="F"))
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
# ACK packet out -> in
p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
- TCP(sport=self.tcp_port_out, dport=self.tcp_external_port,
+ TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
flags="A"))
pkts.append(p)
# FIN packet out -> in
p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
- TCP(sport=self.tcp_port_out, dport=self.tcp_external_port,
+ TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
flags="F"))
pkts.append(p)
# ACK packet in -> out
p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- TCP(sport=self.tcp_port_in, dport=self.tcp_port_out,
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
flags="A"))
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
# FIN packet out -> in
p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
- TCP(sport=self.tcp_port_out, dport=self.tcp_external_port,
+ TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
flags="F"))
self.pg1.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
# ACK packet in -> out
p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- TCP(sport=self.tcp_port_in, dport=self.tcp_port_out,
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
flags="A"))
pkts.append(p)
# ACK packet in -> out
p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- TCP(sport=self.tcp_port_in, dport=self.tcp_port_out,
+ TCP(sport=self.tcp_port_in, dport=self.tcp_external_port,
flags="F"))
pkts.append(p)
# ACK packet out -> in
p = (Ether(src=self.pg1.remote_mac, dst=self.pg1.local_mac) /
IP(src=self.pg1.remote_ip4, dst=self.snat_addr) /
- TCP(sport=self.tcp_port_out, dport=self.tcp_external_port,
+ TCP(sport=self.tcp_external_port, dport=self.tcp_port_out,
flags="A"))
self.pg1.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
p = (Ether(src=self.pg0.remote_mac, dst=self.pg0.local_mac) /
IP(src=self.pg0.remote_ip4, dst=self.pg1.remote_ip4) /
- UDP(sport=3000, dport=3000))
+ UDP(sport=3001, dport=3002))
self.pg0.add_stream(p)
self.pg_enable_capture(self.pg_interfaces)
self.pg_start()
capture = self.pg1.assert_nothing_captured()
+ # verify ICMP error packet
+ capture = self.pg0.get_capture(1)
+ p = capture[0]
+ self.assertTrue(p.haslayer(ICMP))
+ icmp = p[ICMP]
+ self.assertEqual(icmp.type, 3)
+ self.assertEqual(icmp.code, 1)
+ self.assertTrue(icmp.haslayer(IPerror))
+ inner_ip = icmp[IPerror]
+ self.assertEqual(inner_ip[UDPerror].sport, 3001)
+ self.assertEqual(inner_ip[UDPerror].dport, 3002)
+
dms = self.vapi.snat_det_map_dump()
self.assertEqual(1000, dms[0].ses_num)