3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
16 """Traffic script for IPv6 Neighbor Solicitation test."""
21 # pylint: disable=no-name-in-module
22 # pylint: disable=import-error
23 logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
25 from scapy.all import Ether
26 from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS
27 from scapy.layers.inet6 import ICMPv6NDOptDstLLAddr, ICMPv6NDOptSrcLLAddr
29 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
30 from resources.libraries.python.PacketVerifier import checksum_equal
31 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
# Parse the script arguments; the four address options are declared
# explicitly, while 'rx_if' and 'tx_if' are read without being listed here
# (presumably provided by TrafficScriptArg by default -- TODO confirm).
args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip'])

# Receive and transmit queues, constructed from the 'rx_if' / 'tx_if' values.
rx_if_name = args.get_arg('rx_if')
rxq = RxQueue(rx_if_name)
tx_if_name = args.get_arg('tx_if')
txq = TxQueue(tx_if_name)

# Addresses used to build the solicitation and to check the advertisement.
src_mac, dst_mac, src_ip, dst_ip = map(
    args.get_arg, ('src_mac', 'dst_mac', 'src_ip', 'dst_ip'))
# Every transmitted packet is recorded in this list; the list is later
# passed to rxq.recv() alongside the timeout value.
sent_packets = []

# send ICMPv6 neighbor solicitation message
# NOTE(review): the IPv6 destination is the fixed solicited-node multicast
# address ff02::1:ff00:2, which only corresponds to targets whose low 24 bits
# are 00:0002 -- confirm this matches the addresses used by the test topology.
pkt_send = (Ether(src=src_mac, dst='ff:ff:ff:ff:ff:ff') /
            IPv6(src=src_ip, dst='ff02::1:ff00:2') /
            ICMPv6ND_NS(tgt=dst_ip) /
            ICMPv6NDOptSrcLLAddr(lladdr=src_mac))
sent_packets.append(pkt_send)
# Actually put the solicitation on the wire; without this step no
# advertisement can be expected back.
txq.send(pkt_send)
# receive ICMPv6 neighbor advertisement message
# Keep reading until a packet that is not a Neighbor Solicitation shows up.
# NOTE(review): recv() is assumed to return None when nothing arrives within
# the given timeout (2) -- confirm against RxQueue.
while True:
    ether = rxq.recv(2, sent_packets)
    if ether is None:
        raise RuntimeError('ICMPv6 neighbor advertisement Rx timeout')

    if not ether.haslayer(ICMPv6ND_NS):
        # not a solicitation: process the current packet
        break
    # the current packet is ICMPv6ND_NS: read another packet in the queue
# The reply must carry an IPv6 header ...
if not ether.haslayer(IPv6):
    raise RuntimeError('Unexpected packet with no IPv6 received {0}'.
                       format(repr(ether)))

# Bind the IPv6 layer explicitly; the checks that follow operate on it.
ipv6 = ether[IPv6]

# ... and, inside it, an ICMPv6 Neighbor Advertisement.
if not ipv6.haslayer(ICMPv6ND_NA):
    raise RuntimeError('Unexpected packet with no ICMPv6 ND-NA received '
                       '{0}'.format(repr(ipv6)))
# Narrow down to the Neighbor Advertisement layer of the received packet.
icmpv6_na = ipv6[ICMPv6ND_NA]

# verify target address: it must equal the address that was solicited
advertised_target = icmpv6_na.tgt
if advertised_target != dst_ip:
    target_error = 'Invalid target address {0} should be {1}'
    raise RuntimeError(target_error.format(advertised_target, dst_ip))

# The advertisement has to carry a Destination Link-Layer Address option.
if not icmpv6_na.haslayer(ICMPv6NDOptDstLLAddr):
    raise RuntimeError('Missing Destination Link-Layer Address option in '
                       'ICMPv6 Neighbor Advertisement {0}'.
                       format(repr(icmpv6_na)))

dst_ll_addr = icmpv6_na[ICMPv6NDOptDstLLAddr]

# verify destination link-layer address field against the expected MAC
advertised_lladdr = dst_ll_addr.lladdr
if advertised_lladdr != dst_mac:
    lladdr_error = 'Invalid lladdr {0} should be {1}'
    raise RuntimeError(lladdr_error.format(advertised_lladdr, dst_mac))
# verify checksum
# Remember the checksum that arrived on the wire, then clear the field so
# that scapy recalculates it when the layer is rebuilt. Without the reset the
# rebuilt layer would simply carry the received value and the comparison
# below could never fail.
cksum = icmpv6_na.cksum
del icmpv6_na.cksum
# bytes() is an alias of str() on Python 2 and the correct serialisation
# call on Python 3, so the rebuild works on both.
tmp = ICMPv6ND_NA(bytes(icmpv6_na))
if not checksum_equal(tmp.cksum, cksum):
    raise RuntimeError('Invalid checksum {0} should be {1}'.
                       format(cksum, tmp.cksum))
111 if __name__ == "__main__":