CSIT-755: Presentation and analytics layer
[csit.git] / resources / traffic_scripts / ipv6_ns.py
1 #!/usr/bin/env python
2
3 # Copyright (c) 2016 Cisco and/or its affiliates.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at:
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15
16 """Traffic script for IPv6 Neighbor Solicitation test."""
17
18 import sys
19 import logging
20
21 # pylint: disable=no-name-in-module
22 # pylint: disable=import-error
23 logging.getLogger("scapy.runtime").setLevel(logging.ERROR)
24
25 from scapy.all import Ether
26 from scapy.layers.inet6 import IPv6, ICMPv6ND_NA, ICMPv6ND_NS
27 from scapy.layers.inet6 import ICMPv6NDOptDstLLAddr, ICMPv6NDOptSrcLLAddr
28
29 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
30 from resources.libraries.python.PacketVerifier import checksum_equal
31 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
32
33
34 def main():
35     args = TrafficScriptArg(['src_mac', 'dst_mac', 'src_ip', 'dst_ip'])
36
37     rxq = RxQueue(args.get_arg('rx_if'))
38     txq = TxQueue(args.get_arg('tx_if'))
39
40     src_mac = args.get_arg('src_mac')
41     dst_mac = args.get_arg('dst_mac')
42     src_ip = args.get_arg('src_ip')
43     dst_ip = args.get_arg('dst_ip')
44
45     sent_packets = []
46
47     # send ICMPv6 neighbor solicitation message
48     pkt_send = (Ether(src=src_mac, dst='ff:ff:ff:ff:ff:ff') /
49                 IPv6(src=src_ip, dst='ff02::1:ff00:2') /
50                 ICMPv6ND_NS(tgt=dst_ip) /
51                 ICMPv6NDOptSrcLLAddr(lladdr=src_mac))
52     sent_packets.append(pkt_send)
53     txq.send(pkt_send)
54
55     # receive ICMPv6 neighbor advertisement message
56     while True:
57         ether = rxq.recv(2, sent_packets)
58         if ether is None:
59             raise RuntimeError('ICMPv6 echo reply Rx timeout')
60
61         if ether.haslayer(ICMPv6ND_NS):
62             # read another packet in the queue if the current one is ICMPv6ND_NS
63             continue
64         else:
65             # otherwise process the current packet
66             break
67
68     if ether is None:
69         raise RuntimeError('ICMPv6 echo reply Rx timeout')
70
71     if not ether.haslayer(IPv6):
72         raise RuntimeError('Unexpected packet with no IPv6 received {0}'.
73                            format(ether.__repr__()))
74
75     ipv6 = ether[IPv6]
76
77     if not ipv6.haslayer(ICMPv6ND_NA):
78         raise RuntimeError('Unexpected packet with no ICMPv6 ND-NA received '
79                            '{0}'.format(ipv6.__repr__()))
80
81     icmpv6_na = ipv6[ICMPv6ND_NA]
82
83     # verify target address
84     if icmpv6_na.tgt != dst_ip:
85         raise RuntimeError('Invalid target address {0} should be {1}'.
86                            format(icmpv6_na.tgt, dst_ip))
87
88     if not icmpv6_na.haslayer(ICMPv6NDOptDstLLAddr):
89         raise RuntimeError('Missing Destination Link-Layer Address option in '
90                            'ICMPv6 Neighbor Advertisement {0}'.
91                            format(icmpv6_na.__repr__()))
92
93     dst_ll_addr = icmpv6_na[ICMPv6NDOptDstLLAddr]
94
95     # verify destination link-layer address field
96     if dst_ll_addr.lladdr != dst_mac:
97         raise RuntimeError('Invalid lladdr {0} should be {1}'.
98                            format(dst_ll_addr.lladdr, dst_mac))
99
100     # verify checksum
101     cksum = icmpv6_na.cksum
102     del icmpv6_na.cksum
103     tmp = ICMPv6ND_NA(str(icmpv6_na))
104     if not checksum_equal(tmp.cksum, cksum):
105         raise RuntimeError('Invalid checksum {0} should be {1}'.
106                            format(cksum, tmp.cksum))
107
108     sys.exit(0)
109
110
111 if __name__ == "__main__":
112     main()