CSIT-755: Presentation and analytics layer
[csit.git] / resources / traffic_scripts / send_rs_check_ra.py
1 #!/usr/bin/env python
2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """Router solicitation check script."""
16
17 import sys
18 import ipaddress
19
20 from scapy.layers.l2 import Ether
21 from scapy.layers.inet6 import IPv6, ICMPv6ND_RA, ICMPv6ND_RS, ICMPv6ND_NS
22
23 from resources.libraries.python.PacketVerifier import RxQueue, TxQueue
24 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
25
26
27 def mac_to_ipv6_linklocal(mac):
28     """Transfer MAC address into specific link-local IPv6 address.
29
30     :param mac: MAC address to be transferred.
31     :type mac: str
32     :return: IPv6 link-local address.
33     :rtype: str
34     """
35     # Remove the most common delimiters: dots, dashes, etc.
36     mac_value = int(mac.translate(None, ' .:-'), 16)
37
38     # Split out the bytes that slot into the IPv6 address
39     # XOR the most significant byte with 0x02, inverting the
40     # Universal / Local bit
41     high2 = mac_value >> 32 & 0xffff ^ 0x0200
42     high1 = mac_value >> 24 & 0xff
43     low1 = mac_value >> 16 & 0xff
44     low2 = mac_value & 0xffff
45
46     return 'fe80::{:04x}:{:02x}ff:fe{:02x}:{:04x}'.format(
47         high2, high1, low1, low2)
48
49
50 def main():
51     """Send Router Solicitation packet, check if the received response\
52      is a Router Advertisement packet and verify."""
53
54     args = TrafficScriptArg(
55         ['src_mac', 'dst_mac', 'src_ip']
56     )
57
58     router_mac = args.get_arg('dst_mac')
59     src_mac = args.get_arg('src_mac')
60     src_ip = args.get_arg('src_ip')
61     if not src_ip:
62         src_ip = mac_to_ipv6_linklocal(src_mac)
63     tx_if = args.get_arg('tx_if')
64
65     txq = TxQueue(tx_if)
66     rxq = RxQueue(tx_if)
67
68     pkt_raw = (Ether(src=src_mac, dst='33:33:00:00:00:02') /
69                IPv6(src=src_ip, dst='ff02::2') /
70                ICMPv6ND_RS())
71
72     sent_packets = [pkt_raw]
73     txq.send(pkt_raw)
74
75     while True:
76         ether = rxq.recv(2, sent_packets)
77         if ether is None:
78             raise RuntimeError('ICMP echo Rx timeout')
79
80         if ether.haslayer(ICMPv6ND_NS):
81             # read another packet in the queue if the current one is ICMPv6ND_NS
82             continue
83         else:
84             # otherwise process the current packet
85             break
86
87     # Check whether received packet contains layer RA and check other values
88     if ether.src != router_mac:
89         raise RuntimeError('Packet source MAC ({0}) does not match router MAC '
90                            '({1}).'.format(ether.src, router_mac))
91     if ether.dst != src_mac:
92         raise RuntimeError('Packet destination MAC ({0}) does not match RS '
93                            'source MAC ({1}).'.format(ether.dst, src_mac))
94
95     if not ether.haslayer(ICMPv6ND_RA):
96         raise RuntimeError('Not an RA packet received {0}'.
97                            format(ether.__repr__()))
98
99     src_address = ipaddress.IPv6Address(unicode(ether['IPv6'].src))
100     dst_address = ipaddress.IPv6Address(unicode(ether['IPv6'].dst))
101     router_link_local = ipaddress.IPv6Address(unicode(
102         mac_to_ipv6_linklocal(router_mac)))
103     rs_src_address = ipaddress.IPv6Address(unicode(src_ip))
104
105     if src_address != router_link_local:
106         raise RuntimeError('Packet source address ({0}) does not match link '
107                            'local address({1})'.
108                            format(src_address, router_link_local))
109
110     if dst_address != rs_src_address:
111         raise RuntimeError('Packet destination address ({0}) does not match '
112                            'RS source address ({1}).'.
113                            format(dst_address, rs_src_address))
114
115     if ether['IPv6'].hlim != 255:
116         raise RuntimeError('Hop limit not correct: {0}!=255'.
117                            format(ether['IPv6'].hlim))
118
119     ra_code = ether[ICMPv6ND_RA].code
120     if ra_code != 0:
121         raise RuntimeError('ICMP code: {0} not correct. '.format(ra_code))
122
123     sys.exit(0)
124
125
126 if __name__ == "__main__":
127     main()