capture = self.pg0.get_capture(len(pkts))
self.verify_capture_in(capture, self.pg0)
+ # in2out
+ pkts = self.create_stream_in(self.pg0, self.pg1, ttl=2)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg1.get_capture(len(pkts))
+ self.verify_capture_out(capture, ignore_port=True)
+
+ # out2in
+ pkts = self.create_stream_out(self.pg1, ttl=2)
+ self.pg1.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ self.verify_capture_in(capture, self.pg0)
+
+ # in2out
+ pkts = self.create_stream_in(self.pg0, self.pg1, ttl=1)
+ self.pg0.add_stream(pkts)
+ self.pg_enable_capture(self.pg_interfaces)
+ self.pg_start()
+ capture = self.pg0.get_capture(len(pkts))
+ for p in capture:
+ self.assertIn(ICMP, p)
+ self.assertEqual(p[ICMP].type, 11) # 11 == time-exceeded
+
def test_static_with_port_out2(self):
""" NAT44ED 1:1 NAPT asymmetrical rule """