make test: improve documentation and PEP8 compliance
[vpp.git] / test / test_flowperpkt.py
1 #!/usr/bin/env python
2
3 import unittest
4
5 from framework import VppTestCase, VppTestRunner
6
7 from scapy.packet import Raw
8 from scapy.layers.l2 import Ether
9 from scapy.layers.inet import IP, UDP
10
11
class TestFlowperpkt(VppTestCase):
    """ Flow-per-packet plugin: test both L2 and IP4 reporting """

    def setUp(self):
        """
        Set up

        **Config:**
            - create three PG interfaces
            - bring each interface up, configure IPv4 on it and resolve ARP
        """
        super(TestFlowperpkt, self).setUp()

        self.create_pg_interfaces(range(3))

        # A single packet size is enough to trigger one report per feature.
        self.pg_if_packet_sizes = [150]

        self.interfaces = list(self.pg_interfaces)

        for intf in self.interfaces:
            intf.admin_up()
            intf.config_ip4()
            intf.resolve_arp()

    def create_stream(self, src_if, dst_if, packet_sizes):
        """Create a packet stream to tickle the plugin

        One Ether/IP/UDP packet is built per entry of ``packet_sizes`` and
        extended to that size.

        :param VppInterface src_if: Source interface for packet stream
        :param VppInterface dst_if: Dst interface for packet stream
        :param list packet_sizes: Sizes to test
        :returns: list of generated packets
        """
        pkts = []
        for size in packet_sizes:
            info = self.create_packet_info(src_if, dst_if)
            payload = self.info_to_payload(info)
            p = (Ether(src=src_if.local_mac, dst=dst_if.remote_mac) /
                 IP(src=src_if.remote_ip4, dst=dst_if.remote_ip4) /
                 UDP(sport=1234, dport=4321) /
                 Raw(payload))
            # Keep a copy of the packet as built, before it is padded.
            info.data = p.copy()
            self.extend_packet(p, size)
            pkts.append(p)
        return pkts

    @staticmethod
    def _byte_value(item):
        """Return the integer value of one element of a raw payload.

        Iterating a Python 2 ``str`` yields one-character strings, whereas
        iterating Python 3 ``bytes`` yields integers; both are accepted.

        :param item: one element obtained by iterating over a payload
        :returns: the byte value as an int
        """
        return item if isinstance(item, int) else ord(item)

    @staticmethod
    def compare_with_mask(payload, masked_expected_data):
        """Compare a raw payload against a masked hex template.

        :param payload: raw payload (byte string) to check
        :param str masked_expected_data: expected payload as lower-case hex,
            two characters per payload byte; a pair equal to ``XX`` matches
            any byte
        :returns: True if the lengths agree and every unmasked byte matches,
            False otherwise
        """
        if len(payload) * 2 != len(masked_expected_data):
            return False

        # walk the payload byte by byte next to its two-character hex
        # representation in the template (or XX if masked)
        for i, item in enumerate(payload):
            expected = masked_expected_data[2 * i:2 * i + 2]
            if expected == "XX":
                continue
            if "%02x" % TestFlowperpkt._byte_value(item) != expected:
                return False
        return True

    def verify_ipfix(self, collector_if):
        """Check the ipfix capture

        Expects four packets on the collector interface: an IPv4 data
        packet, an IPv4 template packet, an L2 data packet and an L2
        template packet. Packets matching none of the templates are logged
        as errors; a missing kind fails the test.

        :param VppInterface collector_if: interface the ipfix packets are
            captured on
        """
        # Scapy, of course, understands ipfix not at all...
        # These data vetted by manual inspection in wireshark
        # X'ed out fields are timestamps, which will absolutely
        # fail to compare.

        data_udp_string = (
            "1283128300370000000a002fXXXXXXXX000000000000000101"
            "00001f0000000100000002ac100102ac10020200XXXXXXXXXXXXXXXX0092")

        template_udp_string = (
            "12831283003c0000000a0034XXXXXXXX00000002000000"
            "010002002401000007000a0004000e000400080004000c000400050001009c00"
            "0801380002")

        # str.replace works on both Python 2 and 3, unlike the
        # two-argument str.translate(None, ":") form.
        l2_data_udp_string = (
            "12831283003c0000000a0034XXXXXXXX000000010000000"
            "1010100240000000100000002%s02020000ff020008XXXXXXXXXXX"
            "XXXXX0092") % self.pg1.local_mac.replace(":", "")

        l2_template_udp_string = (
            "12831283003c0000000a0034XXXXXXXX00000002000"
            "000010002002401010007000a0004000e0004003800060050000601000002009"
            "c000801380002")

        # (label used in the log, masked template, assertion message);
        # the order fixes both the matching priority and the order in
        # which missing packets are reported.
        expected_packets = [
            ("ip4 data", data_udp_string,
             "Data packet not found"),
            ("ip4 template", template_udp_string,
             "Template packet not found"),
            ("l2 data", l2_data_udp_string,
             "L2 data packet not found"),
            ("l2 template", l2_template_udp_string,
             "L2 template packet not found"),
        ]
        found = dict((label, False) for label, _, _ in expected_packets)

        self.logger.info("Look for ipfix packets on %s sw_if_index %d "
                         % (collector_if.name, collector_if.sw_if_index))
        # expecting 4 packets on collector interface based on traffic on other
        # interfaces
        capture = collector_if.get_capture(4)

        for p in capture:
            ip = p[IP]
            udp = p[UDP]
            self.logger.info("src %s dst %s" % (ip.src, ip.dst))
            self.logger.info(" udp src_port %s dst_port %s"
                             % (udp.sport, udp.dport))

            # bytes is str on Python 2, so this is the same call there;
            # on Python 3 it yields the raw bytes rather than text.
            payload = bytes(udp)

            for label, mask, _ in expected_packets:
                if self.compare_with_mask(payload, mask):
                    self.logger.info("found %s packet" % label)
                    found[label] = True
                    break
            else:
                unmasked_payload = "".join(
                    ["%02x" % self._byte_value(c) for c in payload])
                self.logger.error("unknown pkt '%s'" % unmasked_payload)

        for label, _, message in expected_packets:
            self.assertTrue(found[label], message)

    def test_L3_fpp(self):
        """ Flow per packet L3 test """

        # Configure an ipfix report on the [nonexistent] collector
        # 172.16.3.2, as if it was connected to the pg2 interface
        # Install a FIB entry, so the exporter's work won't turn into
        # an ARP request

        self.pg_enable_capture(self.pg_interfaces)
        self.pg2.configure_ipv4_neighbors()
        self.vapi.set_ipfix_exporter(collector_address=self.pg2.remote_ip4n,
                                     src_address=self.pg2.local_ip4n,
                                     path_mtu=1450,
                                     template_interval=1)

        # Export flow records for all pkts transmitted on pg1
        self.vapi.cli("flowperpkt feature add-del pg1")
        self.vapi.cli("flowperpkt feature add-del pg1 l2")

        # Arrange to minimally trace generated ipfix packets
        self.vapi.cli("trace add flowperpkt-ipv4 10")
        self.vapi.cli("trace add flowperpkt-l2 10")

        # Create a stream from pg0 -> pg1, which causes
        # an ipfix packet to be transmitted on pg2

        pkts = self.create_stream(self.pg0, self.pg1,
                                  self.pg_if_packet_sizes)
        self.pg0.add_stream(pkts)
        self.pg_start()

        # Flush the ipfix collector, so we don't need any
        # asinine time.sleep(5) action
        self.vapi.cli("ipfix flush")

        # Make sure the 4 pkts we expect actually showed up
        self.verify_ipfix(self.pg2)
171
if __name__ == "__main__":
    # Running the module directly executes its tests with the VPP runner.
    unittest.main(testRunner=VppTestRunner)