Ignore unexpected ICMPv6 Neighbor Discovery - Neighbor Solicitation packets
[csit.git] / resources / traffic_scripts / icmpv6_echo_req_resp.py
1 #!/usr/bin/env python
2
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 """Send ICMPv6 echo request from one TG port to another through DUT nodes and
17    send reply back. Also verify hop limit processing."""
18
19 import sys
20 import logging
21
22 # pylint: disable=no-name-in-module
23 # pylint: disable=import-error
24 logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
25
26 from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS
27 from scapy.layers.inet6 import ICMPv6NDOptDstLLAddr
28 from scapy.layers.inet6 import ICMPv6EchoRequest, ICMPv6EchoReply
29 from scapy.all import Ether
30
31 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
32 from resources.libraries.python.PacketVerifier import checksum_equal
33 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
34
35
def _recv_skip_ns(rxq, sent_packets, timeout_msg):
    """Return the first received frame that is not a Neighbor Solicitation.

    Frames carrying an ICMPv6ND_NS layer are discarded and the next frame is
    read, so unsolicited neighbor discovery traffic does not fail the test.

    :param rxq: Queue object whose recv() method is used to read frames.
    :param sent_packets: Packets already sent by this script on the same
        interface; passed through to recv() (presumably so that they are
        ignored when looped back - confirm against RxQueue.recv).
    :param timeout_msg: Message of the RuntimeError raised when recv()
        returns None.
    :returns: Received frame without an ICMPv6ND_NS layer.
    :raises RuntimeError: If recv() returns None.
    """
    while True:
        # NOTE(review): 2 is presumably the Rx timeout in seconds.
        ether = rxq.recv(2, sent_packets)
        if ether is None:
            raise RuntimeError(timeout_msg)
        if not ether.haslayer(ICMPv6ND_NS):
            return ether


def _verify_checksum(icmpv6, icmp_class):
    """Verify the checksum carried in a received ICMPv6 echo message.

    The received checksum is remembered and deleted from the layer, the layer
    is serialised and parsed again as icmp_class, and the checksum of the
    re-parsed layer is compared with the received one.

    :param icmpv6: Received ICMPv6 echo layer; its cksum field is deleted as
        a side effect.
    :param icmp_class: Scapy class used to re-parse the serialised layer
        (ICMPv6EchoRequest or ICMPv6EchoReply).
    :raises RuntimeError: If checksum_equal() reports a mismatch.
    """
    cksum = icmpv6.cksum
    # With the field deleted, serialising the layer makes scapy fill in a
    # checksum value of its own, which is then read back from the re-parse.
    del icmpv6.cksum
    tmp = icmp_class(str(icmpv6))
    if not checksum_equal(tmp.cksum, cksum):
        raise RuntimeError('Invalid checksum {0} should be {1}'.
                           format(cksum, tmp.cksum))


def main():
    """Send an ICMPv6 echo request and reply between two TG interfaces.

    Steps performed:

    1. Send an ICMPv6 Neighbor Advertisement from each TG interface.
    2. Send an ICMPv6 echo request from the first interface and receive it on
       the second one; verify hop limit, identifier, sequence number and
       checksum.
    3. Send an ICMPv6 echo reply from the second interface and receive it on
       the first one; run the same verifications.

    Neighbor Solicitation packets received while waiting are ignored.
    Exits with status 0 when every check passes.

    :raises RuntimeError: On Rx timeout or when a received packet does not
        match the expectations.
    """
    # 'rx_if' and 'tx_if' are read below although they are not listed here;
    # presumably TrafficScriptArg always provides them - confirm.
    args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_nh_mac', 'dst_nh_mac',
                             'src_ip', 'dst_ip', 'h_num'])

    # The "source" side uses 'rx_if' and the "destination" side uses 'tx_if'
    # for both directions of traffic.
    src_rxq = RxQueue(args.get_arg('rx_if'))
    src_txq = TxQueue(args.get_arg('rx_if'))
    dst_rxq = RxQueue(args.get_arg('tx_if'))
    dst_txq = TxQueue(args.get_arg('tx_if'))

    src_mac = args.get_arg('src_mac')
    dst_mac = args.get_arg('dst_mac')
    src_nh_mac = args.get_arg('src_nh_mac')
    dst_nh_mac = args.get_arg('dst_nh_mac')
    src_ip = args.get_arg('src_ip')
    dst_ip = args.get_arg('dst_ip')
    hop_num = int(args.get_arg('h_num'))
    hop_limit = 64
    echo_id = 0xa
    echo_seq = 0x1
    # Hop limit expected on reception: the sent value reduced by h_num.
    hop_limit_exp = hop_limit - hop_num

    # Everything sent is recorded per interface and handed to recv().
    src_sent_packets = []
    dst_sent_packets = []

    # send ICMPv6 neighbor advertisement message from both TG interfaces
    pkt_send = (Ether(src=src_mac, dst='ff:ff:ff:ff:ff:ff') /
                IPv6(src=src_ip, dst='ff02::1:ff00:2') /
                ICMPv6ND_NA(tgt=src_ip, R=0) /
                ICMPv6NDOptDstLLAddr(lladdr=src_mac))
    src_sent_packets.append(pkt_send)
    src_txq.send(pkt_send)
    pkt_send = (Ether(src=dst_mac, dst='ff:ff:ff:ff:ff:ff') /
                IPv6(src=dst_ip, dst='ff02::1:ff00:2') /
                ICMPv6ND_NA(tgt=dst_ip, R=0) /
                ICMPv6NDOptDstLLAddr(lladdr=dst_mac))
    dst_sent_packets.append(pkt_send)
    dst_txq.send(pkt_send)

    # send ICMPv6 echo request from first TG interface
    pkt_send = (Ether(src=src_mac, dst=src_nh_mac) /
                IPv6(src=src_ip, dst=dst_ip, hlim=hop_limit) /
                ICMPv6EchoRequest(id=echo_id, seq=echo_seq))
    src_sent_packets.append(pkt_send)
    src_txq.send(pkt_send)

    # receive ICMPv6 echo request on second TG interface
    ether = _recv_skip_ns(dst_rxq, dst_sent_packets,
                          'ICMPv6 echo request Rx timeout')

    if not ether.haslayer(IPv6):
        raise RuntimeError('Unexpected packet with no IPv6 received: {0}'.
                           format(repr(ether)))

    ipv6 = ether[IPv6]

    # verify hop limit processing
    if ipv6.hlim != hop_limit_exp:
        raise RuntimeError('Invalid hop limit {0} should be {1}'.
                           format(ipv6.hlim, hop_limit_exp))

    if not ipv6.haslayer(ICMPv6EchoRequest):
        raise RuntimeError('Unexpected packet with no IPv6 ICMP received {0}'.
                           format(repr(ipv6)))

    icmpv6 = ipv6[ICMPv6EchoRequest]

    # check identifier and sequence number
    if icmpv6.id != echo_id or icmpv6.seq != echo_seq:
        raise RuntimeError('Invalid ICMPv6 echo request received ID {0} '
                           'seq {1} should be ID {2} seq {3}'.
                           format(icmpv6.id, icmpv6.seq, echo_id, echo_seq))

    # verify checksum
    _verify_checksum(icmpv6, ICMPv6EchoRequest)

    # send ICMPv6 echo reply from second TG interface; the hop limit is set
    # explicitly because the check below is computed from hop_limit
    pkt_send = (Ether(src=dst_mac, dst=dst_nh_mac) /
                IPv6(src=dst_ip, dst=src_ip, hlim=hop_limit) /
                ICMPv6EchoReply(id=echo_id, seq=echo_seq))
    dst_sent_packets.append(pkt_send)
    dst_txq.send(pkt_send)

    # receive ICMPv6 echo reply on first TG interface
    ether = _recv_skip_ns(src_rxq, src_sent_packets,
                          'ICMPv6 echo reply Rx timeout')

    if not ether.haslayer(IPv6):
        raise RuntimeError('Unexpected packet with no IPv6 layer received {0}'.
                           format(repr(ether)))

    ipv6 = ether[IPv6]

    # verify hop limit processing
    if ipv6.hlim != hop_limit_exp:
        raise RuntimeError('Invalid hop limit {0} should be {1}'.
                           format(ipv6.hlim, hop_limit_exp))

    if not ipv6.haslayer(ICMPv6EchoReply):
        raise RuntimeError('Unexpected packet with no IPv6 ICMP received {0}'.
                           format(repr(ipv6)))

    icmpv6 = ipv6[ICMPv6EchoReply]

    # check identifier and sequence number
    if icmpv6.id != echo_id or icmpv6.seq != echo_seq:
        raise RuntimeError('Invalid ICMPv6 echo reply received ID {0} '
                           'seq {1} should be ID {2} seq {3}'.
                           format(icmpv6.id, icmpv6.seq, echo_id, echo_seq))

    # verify checksum
    _verify_checksum(icmpv6, ICMPv6EchoReply)

    sys.exit(0)
176
177
# Run the traffic script only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()