Ignore unexpected ICMPv6 Neighbor Discovery - Neighbor Solicitation packets
[csit.git] / resources / traffic_scripts / ipv6_nd_proxy_check.py
1 #!/usr/bin/env python
2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """Traffic script that sends DHCPv6 proxy packets."""
16
17 from scapy.layers.inet import Ether
18 from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS
19 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
20
21
22 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
23 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
24
25
26 def imcpv6nd_solicit(tx_if, src_mac, dst_mac, src_ip, dst_ip):
27     """Send ICMPv6 Neighbor Solicitation packet and expect a response
28      from the proxy.
29
30     :param tx_if: Interface on TG.
31     :param src_mac: MAC address of TG interface.
32     :param dst_mac: MAC address of proxy interface.
33     :param src_ip: IP address of TG interface.
34     :param dst_ip: IP address of proxied interface.
35     :type tx_if: str
36     :type src_mac: str
37     :type dst_mac: str
38     :type src_ip: str
39     :type dst_ip: str
40     :raises RuntimeError: If the received packet is not correct.
41     """
42
43     rxq = RxQueue(tx_if)
44     txq = TxQueue(tx_if)
45
46     sent_packets = []
47
48     icmpv6nd_solicit_pkt = (Ether(src=src_mac, dst=dst_mac) /
49                             IPv6(src=src_ip) /
50                             ICMPv6ND_NS(tgt=dst_ip))
51
52     sent_packets.append(icmpv6nd_solicit_pkt)
53     txq.send(icmpv6nd_solicit_pkt)
54
55     ether = None
56     for _ in range(5):
57         while True:
58             pkt = rxq.recv(3, ignore=sent_packets)
59             if ether.haslayer(ICMPv6ND_NS):
60                 # read another packet in the queue in case of ICMPv6ND_NS packet
61                 continue
62             else:
63                 # otherwise process the current packet
64                 break
65         if pkt is not None:
66             ether = pkt
67             break
68
69     if ether is None:
70         raise RuntimeError('ICMPv6ND Proxy response timeout.')
71
72     if ether.src != dst_mac:
73         raise RuntimeError("Source MAC address error: {} != {}".
74                            format(ether.src, dst_mac))
75     print "Source MAC address: OK."
76
77     if ether.dst != src_mac:
78         raise RuntimeError("Destination MAC address error: {} != {}".
79                            format(ether.dst, src_mac))
80     print "Destination MAC address: OK."
81
82     if ether[IPv6].src != dst_ip:
83         raise RuntimeError("Source IP address error: {} != {}".
84                            format(ether[IPv6].src, dst_ip))
85     print "Source IP address: OK."
86
87     if ether[IPv6].dst != src_ip:
88         raise RuntimeError("Destination IP address error: {} != {}".
89                            format(ether[IPv6].dst, src_ip))
90     print "Destination IP address: OK."
91
92     try:
93         target_addr = ether[IPv6][ICMPv6ND_NA].tgt
94     except (KeyError, AttributeError):
95         raise RuntimeError("Not an ICMPv6ND Neighbor Advertisement packet.")
96
97     if target_addr != dst_ip:
98         raise RuntimeError("ICMPv6 field 'Target address' error: {} != {}".
99                            format(target_addr, dst_ip))
100     print "Target address field: OK."
101
102
103 def ipv6_ping(src_if, dst_if, src_mac, dst_mac,
104               proxy_to_src_mac, proxy_to_dst_mac, src_ip, dst_ip):
105     """Sends ICMPv6 Echo Request, receive it and send a reply.
106
107     :param src_if: First TG interface on link to DUT.
108     :param dst_if: Second TG interface on link to DUT.
109     :param src_mac: MAC address of first interface.
110     :param dst_mac: MAC address of second interface.
111     :param proxy_to_src_mac: MAC address of first proxy interface on DUT.
112     :param proxy_to_dst_mac: MAC address of second proxy interface on DUT.
113     :param src_ip: IP address of first interface.
114     :param dst_ip: IP address of second interface.
115     :type src_if: str
116     :type dst_if: str
117     :type src_mac: str
118     :type dst_mac: str
119     :type proxy_to_src_mac: str
120     :type proxy_to_dst_mac: str
121     :type src_ip: str
122     :type dst_ip: str
123     :raises RuntimeError: If a received packet is not correct.
124     """
125     rxq = RxQueue(dst_if)
126     txq = TxQueue(src_if)
127
128     icmpv6_ping_pkt = (Ether(src=src_mac, dst=proxy_to_src_mac) /
129                        IPv6(src=src_ip, dst=dst_ip) /
130                        ICMPv6EchoRequest())
131
132     txq.send(icmpv6_ping_pkt)
133
134     ether = None
135     for _ in range(5):
136         while True:
137             pkt = rxq.recv(3)
138             if ether.haslayer(ICMPv6ND_NS):
139                 # read another packet in the queue in case of ICMPv6ND_NS packet
140                 continue
141             else:
142                 # otherwise process the current packet
143                 break
144         if pkt is not None:
145             ether = pkt
146             break
147
148     if ether is None:
149         raise RuntimeError('ICMPv6 Echo Request timeout.')
150     try:
151         ether[IPv6]["ICMPv6 Echo Request"]
152     except KeyError:
153         raise RuntimeError("Received packet is not an ICMPv6 Echo Request.")
154     print "ICMP Echo: OK."
155
156     rxq = RxQueue(src_if)
157     txq = TxQueue(dst_if)
158
159     icmpv6_ping_pkt = (Ether(src=dst_mac, dst=proxy_to_dst_mac) /
160                        IPv6(src=dst_ip, dst=src_ip) /
161                        ICMPv6EchoReply())
162
163     txq.send(icmpv6_ping_pkt)
164
165     ether = None
166     for _ in range(5):
167         while True:
168             pkt = rxq.recv(3)
169             if ether.haslayer(ICMPv6ND_NS):
170                 # read another packet in the queue in case of ICMPv6ND_NS packet
171                 continue
172             else:
173                 # otherwise process the current packet
174                 break
175         if pkt is not None:
176             ether = pkt
177             break
178
179     if ether is None:
180         raise RuntimeError('DHCPv6 SOLICIT timeout')
181     try:
182         ether[IPv6]["ICMPv6 Echo Reply"]
183     except KeyError:
184         raise RuntimeError("Received packet is not an ICMPv6 Echo Reply.")
185
186     print "ICMP Reply: OK."
187
188
189 def main():
190     """Send DHCPv6 proxy messages."""
191
192     args = TrafficScriptArg(['src_ip', 'dst_ip', 'src_mac', 'dst_mac',
193                              'proxy_to_src_mac', 'proxy_to_dst_mac'])
194
195     src_if = args.get_arg('tx_if')
196     dst_if = args.get_arg('rx_if')
197     src_ip = args.get_arg("src_ip")
198     dst_ip = args.get_arg("dst_ip")
199     src_mac = args.get_arg('src_mac')
200     dst_mac = args.get_arg('dst_mac')
201     proxy_to_src_mac = args.get_arg('proxy_to_src_mac')
202     proxy_to_dst_mac = args.get_arg('proxy_to_dst_mac')
203
204     # Neighbor solicitation
205     imcpv6nd_solicit(src_if, src_mac, proxy_to_src_mac, src_ip, dst_ip)
206
207     # Verify route (ICMP echo/reply)
208     ipv6_ping(src_if, dst_if, src_mac, dst_mac, proxy_to_src_mac,
209               proxy_to_dst_mac, src_ip, dst_ip)
210
211
212 if __name__ == "__main__":
213     main()