b1028028b0c1c4d5f4da7d7c38870205ddad777c
[csit.git] / resources / traffic_scripts / ipv6_nd_proxy_check.py
1 #!/usr/bin/env python
2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """Traffic script that sends DHCPv6 proxy packets."""
16
17 from scapy.layers.inet import Ether
18 from scapy.layers.inet6 import IPv6, ICMPv6EchoRequest, ICMPv6EchoReply,\
19     ICMPv6ND_NS
20
21 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
22 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
23
24
25 def imcpv6nd_solicit(tx_if, src_mac, dst_mac, src_ip, dst_ip):
26     """Send ICMPv6 Neighbor Solicitation packet and expect a response
27      from the proxy.
28
29     :param tx_if: Interface on TG.
30     :param src_mac: MAC address of TG interface.
31     :param dst_mac: MAC address of proxy interface.
32     :param src_ip: IP address of TG interface.
33     :param dst_ip: IP address of proxied interface.
34     :type tx_if: str
35     :type src_mac: str
36     :type dst_mac: str
37     :type src_ip: str
38     :type dst_ip: str
39     :raises RuntimeError: If the received packet is not correct.
40     """
41
42     rxq = RxQueue(tx_if)
43     txq = TxQueue(tx_if)
44
45     sent_packets = []
46
47     icmpv6nd_solicit_pkt =\
48         Ether(src=src_mac, dst=dst_mac) / \
49         IPv6(src=src_ip) / \
50         ICMPv6ND_NS(tgt=dst_ip)
51
52     sent_packets.append(icmpv6nd_solicit_pkt)
53     txq.send(icmpv6nd_solicit_pkt)
54
55     ether = None
56     for _ in range(5):
57         pkt = rxq.recv(3, ignore=sent_packets)
58         if pkt is not None:
59             ether = pkt
60             break
61
62     if ether is None:
63         raise RuntimeError('ICMPv6ND Proxy response timeout.')
64
65     if ether.src != dst_mac:
66         raise RuntimeError("Source MAC address error: {} != {}".format(
67             ether.src, dst_mac))
68     print "Source MAC address: OK."
69
70     if ether.dst != src_mac:
71         raise RuntimeError("Destination MAC address error: {} != {}".format(
72             ether.dst, src_mac))
73     print "Destination MAC address: OK."
74
75     if ether['IPv6'].src != dst_ip:
76         raise RuntimeError("Source IP address error: {} != {}".format(
77             ether['IPv6'].src, dst_ip))
78     print "Source IP address: OK."
79
80     if ether['IPv6'].dst != src_ip:
81         raise RuntimeError("Destination IP address error: {} != {}".format(
82             ether['IPv6'].dst, src_ip))
83     print "Destination IP address: OK."
84
85     try:
86         target_addr = ether['IPv6']\
87             ['ICMPv6 Neighbor Discovery - Neighbor Advertisement'].tgt
88     except (KeyError, AttributeError):
89         raise RuntimeError("Not an ICMPv6ND Neighbor Advertisement packet.")
90
91     if target_addr != dst_ip:
92         raise RuntimeError("ICMPv6 field 'Target address' error:"
93                            " {} != {}".format(target_addr, dst_ip))
94     print "Target address field: OK."
95
96
97 def ipv6_ping(src_if, dst_if, src_mac, dst_mac,
98               proxy_to_src_mac, proxy_to_dst_mac, src_ip, dst_ip):
99     """Sends ICMPv6 Echo Request, receive it and send a reply.
100
101     :param src_if: First TG interface on link to DUT.
102     :param dst_if: Second TG interface on link to DUT.
103     :param src_mac: MAC address of first interface.
104     :param dst_mac: MAC address of second interface.
105     :param proxy_to_src_mac: MAC address of first proxy interface on DUT.
106     :param proxy_to_dst_mac: MAC address of second proxy interface on DUT.
107     :param src_ip: IP address of first interface.
108     :param dst_ip: IP address of second interface.
109     :type src_if: str
110     :type dst_if: str
111     :type src_mac: str
112     :type dst_mac: str
113     :type proxy_to_src_mac: str
114     :type proxy_to_dst_mac: str
115     :type src_ip: str
116     :type dst_ip: str
117     :raises RuntimeError: If a received packet is not correct.
118     """
119     rxq = RxQueue(dst_if)
120     txq = TxQueue(src_if)
121
122     icmpv6_ping_pkt = \
123         Ether(src=src_mac, dst=proxy_to_src_mac) / \
124         IPv6(src=src_ip, dst=dst_ip) / \
125         ICMPv6EchoRequest()
126
127     txq.send(icmpv6_ping_pkt)
128
129     ether = None
130     for _ in range(5):
131         pkt = rxq.recv(3)
132         if pkt is not None:
133             ether = pkt
134             break
135
136     if ether is None:
137         raise RuntimeError('ICMPv6 Echo Request timeout.')
138     try:
139         ether["IPv6"]["ICMPv6 Echo Request"]
140     except KeyError:
141         raise RuntimeError("Received packet is not an ICMPv6 Echo Request.")
142     print "ICMP Echo: OK."
143
144     rxq = RxQueue(src_if)
145     txq = TxQueue(dst_if)
146
147     icmpv6_ping_pkt = \
148         Ether(src=dst_mac, dst=proxy_to_dst_mac) / \
149         IPv6(src=dst_ip, dst=src_ip) / \
150         ICMPv6EchoReply()
151
152     txq.send(icmpv6_ping_pkt)
153
154     ether = None
155     for _ in range(5):
156         pkt = rxq.recv(3)
157         if pkt is not None:
158             ether = pkt
159             break
160
161     if ether is None:
162         raise RuntimeError('DHCPv6 SOLICIT timeout')
163     try:
164         ether["IPv6"]["ICMPv6 Echo Reply"]
165     except KeyError:
166         raise RuntimeError("Received packet is not an ICMPv6 Echo Reply.")
167
168     print "ICMP Reply: OK."
169
170
171 def main():
172     """Send DHCPv6 proxy messages."""
173
174     args = TrafficScriptArg(['src_ip', 'dst_ip', 'src_mac', 'dst_mac',
175                              'proxy_to_src_mac', 'proxy_to_dst_mac'])
176
177     src_if = args.get_arg('tx_if')
178     dst_if = args.get_arg('rx_if')
179     src_ip = args.get_arg("src_ip")
180     dst_ip = args.get_arg("dst_ip")
181     src_mac = args.get_arg('src_mac')
182     dst_mac = args.get_arg('dst_mac')
183     proxy_to_src_mac = args.get_arg('proxy_to_src_mac')
184     proxy_to_dst_mac = args.get_arg('proxy_to_dst_mac')
185
186     # Neighbor solicitation
187     imcpv6nd_solicit(src_if, src_mac, proxy_to_src_mac, src_ip, dst_ip)
188
189     # Verify route (ICMP echo/reply)
190     ipv6_ping(src_if, dst_if, src_mac, dst_mac,
191               proxy_to_src_mac, proxy_to_dst_mac, src_ip, dst_ip)
192
193 if __name__ == "__main__":
194     main()