CSIT-687: Directory structure reorganization
[csit.git] / resources / traffic_scripts / check_ra_packet.py
1 #!/usr/bin/env python
2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """Router advertisement check script."""
16
17 import sys
18 import ipaddress
19
20 from resources.libraries.python.PacketVerifier import RxQueue
21 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
22
23
24 def mac_to_ipv6_linklocal(mac):
25     """Transfer MAC address into specific link-local IPv6 address.
26
27     :param mac: MAC address to be transferred.
28     :type mac: str
29     :return: IPv6 link-local address.
30     :rtype: str
31     """
32     # Remove the most common delimiters: dots, dashes, etc.
33     mac_value = int(mac.translate(None, ' .:-'), 16)
34
35     # Split out the bytes that slot into the IPv6 address
36     # XOR the most significant byte with 0x02, inverting the
37     # Universal / Local bit
38     high2 = mac_value >> 32 & 0xffff ^ 0x0200
39     high1 = mac_value >> 24 & 0xff
40     low1 = mac_value >> 16 & 0xff
41     low2 = mac_value & 0xffff
42
43     return 'fe80::{:04x}:{:02x}ff:fe{:02x}:{:04x}'.format(
44         high2, high1, low1, low2)
45
46
47 def main():
48     """Check packets on specific port and look for the Router Advertisement
49      part.
50     """
51
52     args = TrafficScriptArg(['src_mac', 'interval'])
53
54     rx_if = args.get_arg('rx_if')
55     src_mac = args.get_arg('src_mac')
56     interval = int(args.get_arg('interval'))
57     rxq = RxQueue(rx_if)
58
59     ether = rxq.recv(max(5, interval))
60
61     # Check whether received packet contains layer RA and check other values
62     if ether is None:
63         raise RuntimeError('ICMP echo Rx timeout')
64
65     if not ether.haslayer('ICMPv6ND_RA'):
66         raise RuntimeError('Not an RA packet received {0}'
67                            .format(ether.__repr__()))
68
69     src_address = ipaddress.IPv6Address(unicode(ether['IPv6'].src))
70     dst_address = ipaddress.IPv6Address(unicode(ether['IPv6'].dst))
71     link_local = ipaddress.IPv6Address(unicode(mac_to_ipv6_linklocal(src_mac)))
72     all_nodes_multicast = ipaddress.IPv6Address(u'ff02::1')
73
74     if src_address != link_local:
75         raise RuntimeError(
76             'Source address ({0}) not matching link local address({1})'.format(
77                 src_address, link_local))
78     if dst_address != all_nodes_multicast:
79         raise RuntimeError('Packet destination address ({0}) is not the all '
80                            'nodes multicast address ({1}).'.format(
81                             dst_address, all_nodes_multicast))
82     if ether['IPv6'].hlim != 255:
83         raise RuntimeError('Hop limit not correct: {0}!=255'.format(
84             ether['IPv6'].hlim))
85
86     ra_code = ether['ICMPv6 Neighbor Discovery - Router Advertisement'].code
87     if ra_code != 0:
88         raise RuntimeError('ICMP code: {0} not correct. '.format(ra_code))
89
90     sys.exit(0)
91
92 if __name__ == "__main__":
93     main()