7 import scapy.layers.inet6 as inet6
9 from util import ppp, ppc
10 from re import compile
11 from scapy.packet import Raw
12 from scapy.layers.l2 import Ether
13 from scapy.layers.inet import IP, UDP, ICMP
14 from scapy.layers.inet6 import IPv6, ICMPv6DestUnreach
15 from framework import VppTestCase, VppTestRunner
18 class TestPuntSocket(VppTestCase):
23 err_ptr = compile(r"^([\d]+)\s+([-\w]+)\s+([ -\.\w)(]+)$")
26 def setUpConstants(cls):
28 cls.extra_vpp_punt_config = [
29 "punt", "{", "socket", cls.tempdir+"/socket_punt", "}"]
30 super(TestPuntSocket, cls).setUpConstants()
32 def process_cli(self, exp, ptr):
33 for line in self.vapi.cli(exp).split('\n')[1:]:
34 m = ptr.match(line.strip())
38 def show_errors(self):
39 for pack in self.process_cli("show errors", self.err_ptr):
41 count, node, reason = pack
45 yield count, node, reason
47 def get_punt_count(self, counter):
48 errors = list(self.show_errors())
49 for count, node, reason in errors:
50 if (node == counter and
51 reason == u'Socket TX'):
55 def socket_client_create(self, sock_name):
56 self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
60 self.logger.debug("Unlink socket faild")
61 self.sock.bind(sock_name)
63 def socket_client_close(self):
67 class TestIP4PuntSocket(TestPuntSocket):
68 """ Punt Socket for IPv4 """
71 super(TestIP4PuntSocket, self).setUp()
73 self.create_pg_interfaces(range(2))
75 for i in self.pg_interfaces:
81 super(TestIP4PuntSocket, self).tearDown()
82 for i in self.pg_interfaces:
86 def test_punt_socket_dump(self):
87 """ Punt socket registration"""
89 punts = self.vapi.punt_socket_dump(0)
90 self.assertEqual(len(punts), 0)
93 # configure a punt socket
95 self.vapi.punt_socket_register(1111, self.tempdir+"/socket_punt_1111")
96 self.vapi.punt_socket_register(2222, self.tempdir+"/socket_punt_2222")
97 punts = self.vapi.punt_socket_dump(0)
98 self.assertEqual(len(punts), 2)
99 self.assertEqual(punts[0].punt.l4_port, 1111)
100 # self.assertEqual(punts[0].pathname, "/tmp/punt_socket_udp_1234")
101 self.assertEqual(punts[1].punt.l4_port, 2222)
102 # self.assertEqual(punts[1].pathname, "/tmp/punt_socket_udp_5678")
105 # deregister a punt socket
107 self.vapi.punt_socket_deregister(1111)
108 punts = self.vapi.punt_socket_dump(0)
109 self.assertEqual(len(punts), 1)
112 # configure a punt socket again
114 self.vapi.punt_socket_register(1111, self.tempdir+"/socket_punt_1111")
115 self.vapi.punt_socket_register(3333, self.tempdir+"/socket_punt_3333")
116 punts = self.vapi.punt_socket_dump(0)
117 self.assertEqual(len(punts), 3)
120 # deregister all punt socket
122 self.vapi.punt_socket_deregister(1111)
123 self.vapi.punt_socket_deregister(2222)
124 self.vapi.punt_socket_deregister(3333)
125 punts = self.vapi.punt_socket_dump(0)
126 self.assertEqual(len(punts), 0)
128 def test_punt_socket_traffic(self):
129 """ Punt socket traffic"""
132 p = (Ether(src=self.pg0.remote_mac,
133 dst=self.pg0.local_mac) /
134 IP(src=self.pg0.remote_ip4, dst=self.pg0.local_ip4) /
135 UDP(sport=9876, dport=1234) /
138 pkts = p * nr_packets
140 punts = self.vapi.punt_socket_dump(0)
141 self.assertEqual(len(punts), 0)
144 # expect ICMP - port unreachable for all packets
146 self.vapi.cli("clear trace")
147 self.pg0.add_stream(pkts)
148 self.pg_enable_capture(self.pg_interfaces)
150 rx = self.pg0.get_capture(nr_packets)
152 self.assertEqual(int(p[IP].proto), 1) # ICMP
153 self.assertEqual(int(p[ICMP].code), 3) # unreachable
156 # configure a punt socket
158 self.socket_client_create(self.tempdir+"/socket_punt_1234")
159 self.vapi.punt_socket_register(1234, self.tempdir+"/socket_punt_1234")
160 punts = self.vapi.punt_socket_dump(0)
161 self.assertEqual(len(punts), 1)
164 # expect punt socket and no packets on pg0
166 self.vapi.cli("clear errors")
167 self.pg0.add_stream(pkts)
168 self.pg_enable_capture(self.pg_interfaces)
170 self.pg0.get_capture(0)
171 self.socket_client_close()
174 # remove punt socket. expect ICMP - port unreachable for all packets
176 self.vapi.punt_socket_deregister(1234)
177 punts = self.vapi.punt_socket_dump(0)
178 self.assertEqual(len(punts), 0)
179 self.pg0.add_stream(pkts)
180 self.pg_enable_capture(self.pg_interfaces)
182 # FIXME - when punt socket deregister is implemented
183 # self.pg0.get_capture(nr_packets)
186 class TestIP6PuntSocket(TestPuntSocket):
187 """ Punt Socket for IPv6"""
190 super(TestIP6PuntSocket, self).setUp()
192 self.create_pg_interfaces(range(2))
194 for i in self.pg_interfaces:
200 super(TestIP6PuntSocket, self).tearDown()
201 for i in self.pg_interfaces:
205 def test_punt_socket_dump(self):
206 """ Punt socket registration """
208 punts = self.vapi.punt_socket_dump(0)
209 self.assertEqual(len(punts), 0)
212 # configure a punt socket
214 self.vapi.punt_socket_register(1111, self.tempdir+"/socket_punt_1111",
216 self.vapi.punt_socket_register(2222, self.tempdir+"/socket_punt_2222",
218 punts = self.vapi.punt_socket_dump(1)
219 self.assertEqual(len(punts), 2)
220 self.assertEqual(punts[0].punt.l4_port, 1111)
221 # self.assertEqual(punts[0].pathname, "/tmp/punt_socket_udp_1234")
222 self.assertEqual(punts[1].punt.l4_port, 2222)
223 # self.assertEqual(punts[1].pathname, "/tmp/punt_socket_udp_5678")
226 # deregister a punt socket
228 self.vapi.punt_socket_deregister(1111, is_ip4=0)
229 punts = self.vapi.punt_socket_dump(1)
230 self.assertEqual(len(punts), 1)
233 # configure a punt socket again
235 self.vapi.punt_socket_register(1111, self.tempdir+"/socket_punt_1111",
237 punts = self.vapi.punt_socket_dump(1)
238 self.assertEqual(len(punts), 2)
241 # deregister all punt socket
243 self.vapi.punt_socket_deregister(1111, is_ip4=0)
244 self.vapi.punt_socket_deregister(2222, is_ip4=0)
245 self.vapi.punt_socket_deregister(3333, is_ip4=0)
246 punts = self.vapi.punt_socket_dump(1)
247 self.assertEqual(len(punts), 0)
249 def test_punt_socket_traffic(self):
250 """ Punt socket traffic"""
253 p = (Ether(src=self.pg0.remote_mac,
254 dst=self.pg0.local_mac) /
255 IPv6(src=self.pg0.remote_ip6, dst=self.pg0.local_ip6) /
256 inet6.UDP(sport=9876, dport=1234) /
259 pkts = p * nr_packets
261 punts = self.vapi.punt_socket_dump(1)
262 self.assertEqual(len(punts), 0)
265 # expect ICMPv6 - destination unreachable for all packets
267 self.vapi.cli("clear trace")
268 self.pg0.add_stream(pkts)
269 self.pg_enable_capture(self.pg_interfaces)
271 rx = self.pg0.get_capture(nr_packets)
273 self.assertEqual(int(p[IPv6].nh), 58) # ICMPv6
274 self.assertEqual(int(p[ICMPv6DestUnreach].code), 4) # unreachable
277 # configure a punt socket
279 self.socket_client_create(self.tempdir+"/socket_punt_1234")
280 self.vapi.punt_socket_register(1234, self.tempdir+"/socket_punt_1234",
282 punts = self.vapi.punt_socket_dump(1)
283 self.assertEqual(len(punts), 1)
286 # expect punt socket and no packets on pg0
288 self.vapi.cli("clear errors")
289 self.pg0.add_stream(pkts)
290 self.pg_enable_capture(self.pg_interfaces)
292 self.pg0.get_capture(0)
293 self.socket_client_close()
296 # remove punt socket. expect ICMP - dest. unreachable for all packets
298 self.vapi.punt_socket_deregister(1234, is_ip4=0)
299 punts = self.vapi.punt_socket_dump(1)
300 self.assertEqual(len(punts), 0)
301 self.pg0.add_stream(pkts)
302 self.pg_enable_capture(self.pg_interfaces)
304 # FIXME - when punt socket deregister is implemented
305 # self.pg0.get_capture(nr_packets)
308 if __name__ == '__main__':
309 unittest.main(testRunner=VppTestRunner)