CSIT-768: Refactor Python container libraries
[csit.git] / resources / traffic_scripts / check_ra_packet.py
1 #!/usr/bin/env python
2 # Copyright (c) 2016 Cisco and/or its affiliates.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at:
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 """Router advertisement check script."""
16
17 import sys
18 import ipaddress
19
20 from scapy.layers.inet6 import IPv6, ICMPv6ND_RA, ICMPv6ND_NS
21
22 from resources.libraries.python.PacketVerifier import RxQueue
23 from resources.libraries.python.TrafficScriptArg import TrafficScriptArg
24
25
26 def mac_to_ipv6_linklocal(mac):
27     """Transfer MAC address into specific link-local IPv6 address.
28
29     :param mac: MAC address to be transferred.
30     :type mac: str
31     :return: IPv6 link-local address.
32     :rtype: str
33     """
34     # Remove the most common delimiters: dots, dashes, etc.
35     mac_value = int(mac.translate(None, ' .:-'), 16)
36
37     # Split out the bytes that slot into the IPv6 address
38     # XOR the most significant byte with 0x02, inverting the
39     # Universal / Local bit
40     high2 = mac_value >> 32 & 0xffff ^ 0x0200
41     high1 = mac_value >> 24 & 0xff
42     low1 = mac_value >> 16 & 0xff
43     low2 = mac_value & 0xffff
44
45     return 'fe80::{:04x}:{:02x}ff:fe{:02x}:{:04x}'.format(
46         high2, high1, low1, low2)
47
48
49 def main():
50     """Check packets on specific port and look for the Router Advertisement
51      part.
52     """
53
54     args = TrafficScriptArg(['src_mac', 'interval'])
55
56     rx_if = args.get_arg('rx_if')
57     src_mac = args.get_arg('src_mac')
58     interval = int(args.get_arg('interval'))
59     rxq = RxQueue(rx_if)
60
61     # receive ICMPv6ND_RA packet
62     while True:
63         ether = rxq.recv(max(5, interval))
64         if ether is None:
65             raise RuntimeError('ICMP echo Rx timeout')
66
67         if ether.haslayer(ICMPv6ND_NS):
68             # read another packet in the queue if the current one is ICMPv6ND_NS
69             continue
70         else:
71             # otherwise process the current packet
72             break
73
74     # Check if received packet contains layer RA and check other values
75     if not ether.haslayer(ICMPv6ND_RA):
76         raise RuntimeError('Not an RA packet received {0}'.
77                            format(ether.__repr__()))
78
79     src_address = ipaddress.IPv6Address(unicode(ether['IPv6'].src))
80     dst_address = ipaddress.IPv6Address(unicode(ether['IPv6'].dst))
81     link_local = ipaddress.IPv6Address(unicode(mac_to_ipv6_linklocal(src_mac)))
82     all_nodes_multicast = ipaddress.IPv6Address(u'ff02::1')
83
84     if src_address != link_local:
85         raise RuntimeError('Source address ({0}) not matching link local '
86                            'address ({1})'.format(src_address, link_local))
87     if dst_address != all_nodes_multicast:
88         raise RuntimeError('Packet destination address ({0}) is not the all'
89                            ' nodes multicast address ({1}).'.
90                            format(dst_address, all_nodes_multicast))
91     if ether[IPv6].hlim != 255:
92         raise RuntimeError('Hop limit not correct: {0}!=255'.
93                            format(ether[IPv6].hlim))
94
95     ra_code = ether[ICMPv6ND_RA].code
96     if ra_code != 0:
97         raise RuntimeError('ICMP code: {0} not correct. '.format(ra_code))
98
99     sys.exit(0)
100
101
102 if __name__ == "__main__":
103     main()