+ def test_icmp(self):
+ vips = [
+ Ep("30.0.0.1", 5555),
+ Ep("30.0.0.2", 5554),
+ Ep("30.0.0.2", 5553, UDP),
+ Ep("30::1", 6666),
+ Ep("30::2", 5553, UDP),
+ ]
+ sport = 1234
+
+ self.pg0.generate_remote_hosts(len(vips))
+ self.pg0.configure_ipv6_neighbors()
+ self.pg0.configure_ipv4_neighbors()
+
+ self.pg1.generate_remote_hosts(len(vips))
+ self.pg1.configure_ipv6_neighbors()
+ self.pg1.configure_ipv4_neighbors()
+
+ self.vapi.cli("test cnat scanner off")
+ trs = []
+ for nbr, vip in enumerate(vips):
+ trs.append(self.cnat_create_translation(vip, nbr))
+
+ self.logger.info(self.vapi.cli("sh cnat client"))
+ self.logger.info(self.vapi.cli("sh cnat translation"))
+
+ for nbr, vip in enumerate(vips):
+ if vip.isV6:
+ client_addr = self.pg0.remote_hosts[0].ip6
+ remote_addr = self.pg1.remote_hosts[nbr].ip6
+ remote2_addr = self.pg2.remote_hosts[0].ip6
+ else:
+ client_addr = self.pg0.remote_hosts[0].ip4
+ remote_addr = self.pg1.remote_hosts[nbr].ip4
+ remote2_addr = self.pg2.remote_hosts[0].ip4
+ IP46 = IPv6 if vip.isV6 else IP
+ # from client to vip
+ p1 = (Ether(dst=self.pg0.local_mac,
+ src=self.pg0.remote_hosts[0].mac) /
+ IP46(src=client_addr, dst=vip.ip) /
+ vip.l4p(sport=sport, dport=vip.port) /
+ Raw())
+
+ rxs = self.send_and_expect(self.pg0,
+ p1 * N_PKTS,
+ self.pg1)
+
+ for rx in rxs:
+ self.assert_packet_checksums_valid(rx)
+ self.assertEqual(rx[IP46].dst, remote_addr)
+ self.assertEqual(rx[vip.l4p].dport, 4000 + nbr)
+ self.assertEqual(rx[IP46].src, client_addr)
+ self.assertEqual(rx[vip.l4p].sport, sport)
+
+ InnerIP = rxs[0][IP46]
+
+ ICMP46 = ICMPv6DestUnreach if vip.isV6 else ICMP
+ ICMPelem = ICMPv6DestUnreach(code=1) if vip.isV6 else ICMP(type=11)
+ # from vip to client, ICMP error
+ p1 = (Ether(dst=self.pg1.local_mac, src=self.pg1.remote_mac) /
+ IP46(src=remote_addr, dst=client_addr) /
+ ICMPelem / InnerIP)
+
+ rxs = self.send_and_expect(self.pg1,
+ p1 * N_PKTS,
+ self.pg0)
+
+ TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror
+ IP46error = IPerror6 if vip.isV6 else IPerror
+ for rx in rxs:
+ self.assert_packet_checksums_valid(rx)
+ self.assertEqual(rx[IP46].src, vip.ip)
+ self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
+ self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip)
+ self.assertEqual(rx[ICMP46][IP46error]
+ [TCPUDPError].sport, sport)
+ self.assertEqual(rx[ICMP46][IP46error]
+ [TCPUDPError].dport, vip.port)
+
+ # from other remote to client, ICMP error
+ # outside shouldn't be NAT-ed
+ p1 = (Ether(dst=self.pg2.local_mac, src=self.pg2.remote_mac) /
+ IP46(src=remote2_addr, dst=client_addr) /
+ ICMPelem / InnerIP)
+
+ rxs = self.send_and_expect(self.pg1,
+ p1 * N_PKTS,
+ self.pg0)
+
+ TCPUDPError = TCPerror if vip.l4p == TCP else UDPerror
+ IP46error = IPerror6 if vip.isV6 else IPerror
+ for rx in rxs:
+ self.assert_packet_checksums_valid(rx)
+ self.assertEqual(rx[IP46].src, remote2_addr)
+ self.assertEqual(rx[ICMP46][IP46error].src, client_addr)
+ self.assertEqual(rx[ICMP46][IP46error].dst, vip.ip)
+ self.assertEqual(rx[ICMP46][IP46error]
+ [TCPUDPError].sport, sport)
+ self.assertEqual(rx[ICMP46][IP46error]
+ [TCPUDPError].dport, vip.port)
+
+ self.vapi.cnat_session_purge()
+