tests: replace pycodestyle with black
[vpp.git] / test / test_syslog.py
1 #!/usr/bin/env python3
2
3 import unittest
4 from framework import VppTestCase, VppTestRunner
5 from util import ppp
6 from scapy.packet import Raw
7 from scapy.layers.inet import IP, UDP
8 from syslog_rfc5424_parser import SyslogMessage, ParseError
9 from syslog_rfc5424_parser.constants import SyslogFacility, SyslogSeverity
10 from vpp_papi import VppEnum
11
12
class TestSyslog(VppTestCase):
    """Syslog Protocol Test Cases"""

    # NOTE(review): the one-line docstrings of this class and of test_syslog
    # are kept verbatim; presumably the test runner prints them in its
    # reports, so treat them as runtime text - confirm before rewording.

    # CLI keyword for each syslog facility, indexed by numeric facility code.
    # Codes 10 and 15 have no keyword in this table and map to "".
    _FACILITY_CLI_NAMES = (
        "kernel",
        "user-level",
        "mail-system",
        "system-daemons",
        "security-authorization",
        "syslogd",
        "line-printer",
        "network-news",
        "uucp",
        "clock-daemon",
        "",
        "ftp-daemon",
        "ntp-subsystem",
        "log-audit",
        "log-alert",
        "",
        "local0",
        "local1",
        "local2",
        "local3",
        "local4",
        "local5",
        "local6",
        "local7",
    )

    # CLI keyword for each syslog severity, indexed by numeric severity level.
    _SEVERITY_CLI_NAMES = (
        "emergency",
        "alert",
        "critical",
        "error",
        "warning",
        "notice",
        "informational",
        "debug",
    )

    @property
    def SYSLOG_SEVERITY(self):
        """Return the ``vl_api_syslog_severity_t`` enum exposed by VppEnum."""
        # NOTE(review): looked up on every access rather than cached at import
        # time; presumably VppEnum is only populated once the API is
        # connected - confirm.
        return VppEnum.vl_api_syslog_severity_t

    @classmethod
    def setUpClass(cls):
        """Create pg0, bring it up, configure IPv4 on it and resolve ARP."""
        super().setUpClass()

        try:
            (cls.pg0,) = cls.create_pg_interfaces(range(1))
            cls.pg0.admin_up()
            cls.pg0.config_ip4()
            cls.pg0.resolve_arp()

        except Exception:
            # Interface setup failed: run the parent's class teardown before
            # propagating the error.
            super().tearDownClass()
            raise

    @classmethod
    def tearDownClass(cls):
        """Delegate class-level teardown to the parent test case."""
        super().tearDownClass()

    def syslog_generate(self, facility, severity, appname, msgid, sd=None, msg=None):
        """
        Generate syslog message

        Builds a ``test syslog ...`` CLI command from the arguments and runs
        it through ``self.vapi.cli``.

        :param facility: facility value (index into the facility name table)
        :param severity: severity level (index into the severity name table)
        :param appname: application name that originate message
        :param msgid: message identifier
        :param sd: structured data (optional), a dict mapping each SD-ID to a
                   dict of its parameter names and values
        :param msg: free-form message (optional)
        """
        cli_parts = [
            "test syslog %s %s %s %s"
            % (
                self._FACILITY_CLI_NAMES[facility],
                self._SEVERITY_CLI_NAMES[severity],
                appname,
                msgid,
            )
        ]
        if sd is not None:
            for sd_id, sd_params in sd.items():
                cli_parts.append("sd-id %s" % (sd_id,))
                cli_parts.extend(
                    "sd-param %s %s" % (name, value)
                    for name, value in sd_params.items()
                )
        if msg is not None:
            cli_parts.append(str(msg))
        self.vapi.cli(" ".join(cli_parts))

    def syslog_verify(
        self, data, facility, severity, appname, msgid, sd=None, msg=None
    ):
        """
        Verify syslog message

        Decodes ``data`` as UTF-8, parses it with ``SyslogMessage.parse`` and
        compares every parsed field with the expected values. The version is
        expected to be 1 and the hostname to equal pg0's local IPv4 address.

        :param data: syslog message (bytes)
        :param facility: facility value
        :param severity: severity level
        :param appname: application name that originate message
        :param msgid: message identifier
        :param sd: structured data (optional); None is compared as {}
        :param msg: free-form message (optional)
        :raises ParseError: if the payload cannot be parsed (logged first)
        """
        expected_sd = {} if sd is None else sd
        text = data.decode("utf-8")
        try:
            parsed = SyslogMessage.parse(text)
        except ParseError as e:
            self.logger.error(e)
            raise

        self.assertEqual(parsed.facility, facility)
        self.assertEqual(parsed.severity, severity)
        self.assertEqual(parsed.appname, appname)
        self.assertEqual(parsed.msgid, msgid)
        self.assertEqual(parsed.msg, msg)
        self.assertEqual(parsed.sd, expected_sd)
        self.assertEqual(parsed.version, 1)
        self.assertEqual(parsed.hostname, self.pg0.local_ip4)

    def test_syslog(self):
        """Syslog Protocol test"""
        local_ip4 = self.pg0.local_ip4
        remote_ip4 = self.pg0.remote_ip4

        # Configure the sender and check the values read back, including the
        # port, VRF and maximum message size that were not set explicitly.
        self.vapi.syslog_set_sender(
            src_address=local_ip4, collector_address=remote_ip4
        )
        config = self.vapi.syslog_get_sender()
        self.assertEqual(str(config.collector_address), remote_ip4)
        self.assertEqual(config.collector_port, 514)
        self.assertEqual(str(config.src_address), local_ip4)
        self.assertEqual(config.vrf_id, 0)
        self.assertEqual(config.max_msg_size, 480)

        appname = "test"
        msgid = "testMsg"
        msg = "this is message"
        sd1 = {
            "exampleSDID@32473": {"iut": "3", "eventSource": "App", "eventID": "1011"}
        }
        sd2 = {
            "exampleSDID@32473": {"iut": "3", "eventSource": "App", "eventID": "1011"},
            "examplePriority@32473": {"class": "high"},
        }

        # Step 1: an info message without structured data is sent to the
        # collector on port 514 with valid checksums.
        self.pg_enable_capture(self.pg_interfaces)
        self.syslog_generate(
            SyslogFacility.local7, SyslogSeverity.info, appname, msgid, None, msg
        )
        capture = self.pg0.get_capture(1)
        packet = capture[0]
        try:
            self.assertEqual(packet[IP].src, local_ip4)
            self.assertEqual(packet[IP].dst, remote_ip4)
            self.assertEqual(packet[UDP].dport, 514)
            self.assert_packet_checksums_valid(packet, False)
        except Exception:
            # Dump the offending packet to the log, then let the failure
            # propagate unchanged.
            self.logger.error(ppp("invalid packet:", packet))
            raise
        self.syslog_verify(
            packet[Raw].load,
            SyslogFacility.local7,
            SyslogSeverity.info,
            appname,
            msgid,
            None,
            msg,
        )

        # Step 2: with the filter set to WARN, the same info message must not
        # produce any packet.
        self.pg_enable_capture(self.pg_interfaces)
        warn_level = self.SYSLOG_SEVERITY.SYSLOG_API_SEVERITY_WARN
        self.vapi.syslog_set_filter(warn_level)
        filter_config = self.vapi.syslog_get_filter()
        self.assertEqual(filter_config.severity, warn_level)
        self.syslog_generate(
            SyslogFacility.local7, SyslogSeverity.info, appname, msgid, None, msg
        )
        self.pg0.assert_nothing_captured()

        # Step 3: a warning message carrying one structured-data element is
        # still sent under the WARN filter.
        self.pg_enable_capture(self.pg_interfaces)
        self.syslog_generate(
            SyslogFacility.local6, SyslogSeverity.warning, appname, msgid, sd1, msg
        )
        capture = self.pg0.get_capture(1)
        self.syslog_verify(
            capture[0][Raw].load,
            SyslogFacility.local6,
            SyslogSeverity.warning,
            appname,
            msgid,
            sd1,
            msg,
        )

        # Step 4: switch the collector port; an error message with two
        # structured-data elements and no free-form text goes to the new port.
        self.vapi.syslog_set_sender(
            src_address=local_ip4,
            collector_address=remote_ip4,
            collector_port=12345,
        )
        config = self.vapi.syslog_get_sender()
        self.assertEqual(config.collector_port, 12345)

        self.pg_enable_capture(self.pg_interfaces)
        self.syslog_generate(
            SyslogFacility.local5, SyslogSeverity.err, appname, msgid, sd2, None
        )
        capture = self.pg0.get_capture(1)
        packet = capture[0]
        try:
            self.assertEqual(packet[UDP].dport, 12345)
        except Exception:
            self.logger.error(ppp("invalid packet:", packet))
            raise
        self.syslog_verify(
            packet[Raw].load,
            SyslogFacility.local5,
            SyslogSeverity.err,
            appname,
            msgid,
            sd2,
            None,
        )
229
230
if __name__ == "__main__":
    # Executed directly as a script: run this module's tests with the
    # project's VppTestRunner instead of unittest's default runner.
    unittest.main(testRunner=VppTestRunner)