Fix pylint warnings in IPFIXUtil.py
[csit.git] / resources / libraries / python / IPFIXUtil.py
1 # Copyright (c) 2016 Cisco and/or its affiliates.
2 # Licensed under the Apache License, Version 2.0 (the "License");
3 # you may not use this file except in compliance with the License.
4 # You may obtain a copy of the License at:
5 #
6 #     http://www.apache.org/licenses/LICENSE-2.0
7 #
8 # Unless required by applicable law or agreed to in writing, software
9 # distributed under the License is distributed on an "AS IS" BASIS,
10 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11 # See the License for the specific language governing permissions and
12 # limitations under the License.
13
14 """IPFIX utilities library. Provides classes that allow scapy to work
15 with IPFIX packets.
16
17  Note:
18  Template and data sets in one packet are not supported.
19  Option template sets (Set_ID = 3) are not supported.
20   """
21
22
23 from scapy.all import Packet, bind_layers
24 from scapy.fields import ByteField, ShortField, IntField, LongField, IPField,\
25     StrFixedLenField, FieldListField
26 from scapy.layers.inet import UDP
27 from scapy.layers.inet6 import IP6Field
28 from scapy.contrib.ppi_geotag import UTCTimeField
29
30
31 class IPFIXHandler(object):
32     """Class for handling IPFIX packets. To use, create instance of class before
33      dissecting IPFIX packets with scapy, then run update_template every time
34      an IPFIX template packet is received."""
35
36     template_elements = {
37         4: ByteField("Protocol_ID", 0x00),
38         7: ShortField("src_port", 0),
39         8: IPField("IPv4_src", ""),
40         11: ShortField("dst_port", 0),
41         12: IPField("IPv4_dst", ""),
42         27: IP6Field("IPv6_src", "::"),
43         28: IP6Field("IPv6_dst", "::"),
44         86: LongField("packetTotalCount", 0),
45         180: ShortField("udp_src_port", 0),
46         181: ShortField("udp_dst_port", 0),
47         182: ShortField("tcp_src_port", 0),
48         183: ShortField("tcp_dst_port", 0),
49         193: ByteField("Next_header", 0x00)
50     }
51
52     def __init__(self):
53         """Initializer, registers IPFIX header and template layers with scapy.
54         """
55         bind_layers(UDP, IPFIXHeader, dport=4739)
56         bind_layers(IPFIXHeader, IPFIXTemplate, Set_ID=2)
57
58     def update_template(self, packet):
59         """Updates IPFIXData class with new data template. Registers IPFIX data
60         layer with scapy using the new template.
61
62         :param packet: Packet containing an IPFIX template.
63         :type packet: scapy.Ether
64         """
65         template_list = packet['IPFIX template'].Template
66         template_id = packet['IPFIX template'].Template_ID
67
68         IPFIXData.fields_desc = []
69         for item in template_list[::2]:
70             try:
71                 IPFIXData.fields_desc.append(self.template_elements[item])
72             except KeyError:
73                 raise KeyError(
74                     "Unknown IPFIX template element with ID {0}".format(item))
75         bind_layers(IPFIXHeader, IPFIXData, Set_ID=template_id)
76         # if the packet doesn't end here, assume it contains more data sets
77         bind_layers(IPFIXData, IPFIXData)
78
79
80 class IPFIXHeader(Packet):
81     """Class for IPFIX header."""
82     name = "IPFIX header"
83     fields_desc = [StrFixedLenField("Version", 0x000a, length=2),
84                    ShortField("Message Length", 0),
85                    UTCTimeField("Timestamp(UTC)", ""),
86                    IntField("Sequence Number", 0),
87                    IntField("Observation Domain ID", 0),
88                    ShortField("Set_ID", 0),
89                    ShortField("Set_Length", 0)
90                    ]
91
92
93 class IPFIXTemplate(Packet):
94     """Class for IPFIX template layer."""
95     name = "IPFIX template"
96     fields_desc = [ShortField("Template_ID", 256),
97                    ShortField("nFields", 2),
98                    FieldListField("Template", [], ShortField("type_len", ""),
99                                   count_from=lambda p: p.nFields*2)
100                    ]
101
102
103 class IPFIXData(Packet):
104     """Class for IPFIX data layer. Needs to be updated with
105     a template before use."""
106     name = "IPFIX flow data"
107     fields_desc = []