New upstream version 18.11.2
[deb_dpdk.git] / examples / vm_power_manager / channel_monitor.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2014 Intel Corporation
3  */
4
5 #include <unistd.h>
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <stdint.h>
9 #include <signal.h>
10 #include <errno.h>
11 #include <string.h>
12 #include <fcntl.h>
13 #include <sys/types.h>
14 #include <sys/epoll.h>
15 #include <sys/queue.h>
16 #include <sys/time.h>
17 #include <sys/socket.h>
18 #include <sys/select.h>
19 #ifdef USE_JANSSON
20 #include <jansson.h>
21 #else
22 #pragma message "Jansson dev libs unavailable, not including JSON parsing"
23 #endif
24 #include <rte_log.h>
25 #include <rte_memory.h>
26 #include <rte_malloc.h>
27 #include <rte_atomic.h>
28 #include <rte_cycles.h>
29 #include <rte_ethdev.h>
30 #include <rte_pmd_i40e.h>
31
32 #include <libvirt/libvirt.h>
33 #include "channel_monitor.h"
34 #include "channel_commands.h"
35 #include "channel_manager.h"
36 #include "power_manager.h"
37 #include "oob_monitor.h"
38
39 #define RTE_LOGTYPE_CHANNEL_MONITOR RTE_LOGTYPE_USER1
40
41 #define MAX_EVENTS 256
42
/*
 * Per-VF history used by get_pkt_diff(): the last packet count and the
 * last TSC sample, both indexed by the VF id stored in policy.pfid[].
 */
43 uint64_t vsi_pkt_count_prev[384];
44 uint64_t rdtsc_prev[384];
/* Size of, and storage for, the buffer read_json_packet() reads into. */
45 #define MAX_JSON_STRING_LEN 1024
46 char json_data[MAX_JSON_STRING_LEN];
47
/* NOTE(review): not referenced in this part of the file - presumably a polling period; confirm. */
48 double time_period_ms = 1;
/* Cleared by channel_monitor_exit(). */
49 static volatile unsigned run_loop = 1;
/* epoll instance created by channel_monitor_init(); channels are added to and removed from it. */
50 static int global_event_fd;
/* Set to 1 by process_request() once a PKT_POLICY request has been handled. */
51 static unsigned int policy_is_set;
/* MAX_EVENTS-sized epoll event array; allocated in channel_monitor_init(), freed in channel_monitor_exit(). */
52 static struct epoll_event *global_events_list;
/* Policy table; entries are matched by pkt.vm_name and recycled when 'enabled' is 0. */
53 static struct policy policies[MAX_CLIENTS];
54
55 #ifdef USE_JANSSON
56
/*
 * Overlay used by set_policy_mac() to store a parsed MAC address in the
 * 64-bit channel_packet.vfid[] slot: the address is written through 'addr'
 * and read back as an integer through 'pfid'.
 */
57 union PFID {
58         struct ether_addr addr;
59         uint64_t pfid;
60 };
61
62 static int
63 str_to_ether_addr(const char *a, struct ether_addr *ether_addr)
64 {
65         int i;
66         char *end;
67         unsigned long o[ETHER_ADDR_LEN];
68
69         i = 0;
70         do {
71                 errno = 0;
72                 o[i] = strtoul(a, &end, 16);
73                 if (errno != 0 || end == a || (end[0] != ':' && end[0] != 0))
74                         return -1;
75                 a = end + 1;
76         } while (++i != RTE_DIM(o) / sizeof(o[0]) && end[0] != 0);
77
78         /* Junk at the end of line */
79         if (end[0] != 0)
80                 return -1;
81
82         /* Support the format XX:XX:XX:XX:XX:XX */
83         if (i == ETHER_ADDR_LEN) {
84                 while (i-- != 0) {
85                         if (o[i] > UINT8_MAX)
86                                 return -1;
87                         ether_addr->addr_bytes[i] = (uint8_t)o[i];
88                 }
89         /* Support the format XXXX:XXXX:XXXX */
90         } else if (i == ETHER_ADDR_LEN / 2) {
91                 while (i-- != 0) {
92                         if (o[i] > UINT16_MAX)
93                                 return -1;
94                         ether_addr->addr_bytes[i * 2] =
95                                         (uint8_t)(o[i] >> 8);
96                         ether_addr->addr_bytes[i * 2 + 1] =
97                                         (uint8_t)(o[i] & 0xff);
98                 }
99         /* unknown format */
100         } else
101                 return -1;
102
103         return 0;
104 }
105
106 static int
107 set_policy_mac(struct channel_packet *pkt, int idx, char *mac)
108 {
109         union PFID pfid;
110         int ret;
111
112         /* Use port MAC address as the vfid */
113         ret = str_to_ether_addr(mac, &pfid.addr);
114
115         if (ret != 0) {
116                 RTE_LOG(ERR, CHANNEL_MONITOR,
117                         "Invalid mac address received in JSON\n");
118                 pkt->vfid[idx] = 0;
119                 return -1;
120         }
121
122         printf("Received MAC Address: %02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 ":"
123                         "%02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 "\n",
124                         pfid.addr.addr_bytes[0], pfid.addr.addr_bytes[1],
125                         pfid.addr.addr_bytes[2], pfid.addr.addr_bytes[3],
126                         pfid.addr.addr_bytes[4], pfid.addr.addr_bytes[5]);
127
128         pkt->vfid[idx] = pfid.pfid;
129         return 0;
130 }
131
132
133 static int
134 parse_json_to_pkt(json_t *element, struct channel_packet *pkt)
135 {
136         const char *key;
137         json_t *value;
138         int ret;
139
140         memset(pkt, 0, sizeof(struct channel_packet));
141
142         pkt->nb_mac_to_monitor = 0;
143         pkt->t_boost_status.tbEnabled = false;
144         pkt->workload = LOW;
145         pkt->policy_to_use = TIME;
146         pkt->command = PKT_POLICY;
147         pkt->core_type = CORE_TYPE_PHYSICAL;
148
149         json_object_foreach(element, key, value) {
150                 if (!strcmp(key, "policy")) {
151                         /* Recurse in to get the contents of profile */
152                         ret = parse_json_to_pkt(value, pkt);
153                         if (ret)
154                                 return ret;
155                 } else if (!strcmp(key, "instruction")) {
156                         /* Recurse in to get the contents of instruction */
157                         ret = parse_json_to_pkt(value, pkt);
158                         if (ret)
159                                 return ret;
160                 } else if (!strcmp(key, "name")) {
161                         strlcpy(pkt->vm_name, json_string_value(value),
162                                         sizeof(pkt->vm_name));
163                 } else if (!strcmp(key, "command")) {
164                         char command[32];
165                         snprintf(command, 32, "%s", json_string_value(value));
166                         if (!strcmp(command, "power")) {
167                                 pkt->command = CPU_POWER;
168                         } else if (!strcmp(command, "create")) {
169                                 pkt->command = PKT_POLICY;
170                         } else if (!strcmp(command, "destroy")) {
171                                 pkt->command = PKT_POLICY_REMOVE;
172                         } else {
173                                 RTE_LOG(ERR, CHANNEL_MONITOR,
174                                         "Invalid command received in JSON\n");
175                                 return -1;
176                         }
177                 } else if (!strcmp(key, "policy_type")) {
178                         char command[32];
179                         snprintf(command, 32, "%s", json_string_value(value));
180                         if (!strcmp(command, "TIME")) {
181                                 pkt->policy_to_use = TIME;
182                         } else if (!strcmp(command, "TRAFFIC")) {
183                                 pkt->policy_to_use = TRAFFIC;
184                         } else if (!strcmp(command, "WORKLOAD")) {
185                                 pkt->policy_to_use = WORKLOAD;
186                         } else if (!strcmp(command, "BRANCH_RATIO")) {
187                                 pkt->policy_to_use = BRANCH_RATIO;
188                         } else {
189                                 RTE_LOG(ERR, CHANNEL_MONITOR,
190                                         "Wrong policy_type received in JSON\n");
191                                 return -1;
192                         }
193                 } else if (!strcmp(key, "workload")) {
194                         char command[32];
195                         snprintf(command, 32, "%s", json_string_value(value));
196                         if (!strcmp(command, "HIGH")) {
197                                 pkt->workload = HIGH;
198                         } else if (!strcmp(command, "MEDIUM")) {
199                                 pkt->workload = MEDIUM;
200                         } else if (!strcmp(command, "LOW")) {
201                                 pkt->workload = LOW;
202                         } else {
203                                 RTE_LOG(ERR, CHANNEL_MONITOR,
204                                         "Wrong workload received in JSON\n");
205                                 return -1;
206                         }
207                 } else if (!strcmp(key, "busy_hours")) {
208                         unsigned int i;
209                         size_t size = json_array_size(value);
210
211                         for (i = 0; i < size; i++) {
212                                 int hour = (int)json_integer_value(
213                                                 json_array_get(value, i));
214                                 pkt->timer_policy.busy_hours[i] = hour;
215                         }
216                 } else if (!strcmp(key, "quiet_hours")) {
217                         unsigned int i;
218                         size_t size = json_array_size(value);
219
220                         for (i = 0; i < size; i++) {
221                                 int hour = (int)json_integer_value(
222                                                 json_array_get(value, i));
223                                 pkt->timer_policy.quiet_hours[i] = hour;
224                         }
225                 } else if (!strcmp(key, "core_list")) {
226                         unsigned int i;
227                         size_t size = json_array_size(value);
228
229                         for (i = 0; i < size; i++) {
230                                 int core = (int)json_integer_value(
231                                                 json_array_get(value, i));
232                                 pkt->vcpu_to_control[i] = core;
233                         }
234                         pkt->num_vcpu = size;
235                 } else if (!strcmp(key, "mac_list")) {
236                         unsigned int i;
237                         size_t size = json_array_size(value);
238
239                         for (i = 0; i < size; i++) {
240                                 char mac[32];
241                                 snprintf(mac, 32, "%s", json_string_value(
242                                                 json_array_get(value, i)));
243                                 set_policy_mac(pkt, i, mac);
244                         }
245                         pkt->nb_mac_to_monitor = size;
246                 } else if (!strcmp(key, "avg_packet_thresh")) {
247                         pkt->traffic_policy.avg_max_packet_thresh =
248                                         (uint32_t)json_integer_value(value);
249                 } else if (!strcmp(key, "max_packet_thresh")) {
250                         pkt->traffic_policy.max_max_packet_thresh =
251                                         (uint32_t)json_integer_value(value);
252                 } else if (!strcmp(key, "unit")) {
253                         char unit[32];
254                         snprintf(unit, 32, "%s", json_string_value(value));
255                         if (!strcmp(unit, "SCALE_UP")) {
256                                 pkt->unit = CPU_POWER_SCALE_UP;
257                         } else if (!strcmp(unit, "SCALE_DOWN")) {
258                                 pkt->unit = CPU_POWER_SCALE_DOWN;
259                         } else if (!strcmp(unit, "SCALE_MAX")) {
260                                 pkt->unit = CPU_POWER_SCALE_MAX;
261                         } else if (!strcmp(unit, "SCALE_MIN")) {
262                                 pkt->unit = CPU_POWER_SCALE_MIN;
263                         } else if (!strcmp(unit, "ENABLE_TURBO")) {
264                                 pkt->unit = CPU_POWER_ENABLE_TURBO;
265                         } else if (!strcmp(unit, "DISABLE_TURBO")) {
266                                 pkt->unit = CPU_POWER_DISABLE_TURBO;
267                         } else {
268                                 RTE_LOG(ERR, CHANNEL_MONITOR,
269                                         "Invalid command received in JSON\n");
270                                 return -1;
271                         }
272                 } else if (!strcmp(key, "resource_id")) {
273                         pkt->resource_id = (uint32_t)json_integer_value(value);
274                 } else {
275                         RTE_LOG(ERR, CHANNEL_MONITOR,
276                                 "Unknown key received in JSON string: %s\n",
277                                 key);
278                 }
279         }
280         return 0;
281 }
282 #endif
283
284 void channel_monitor_exit(void)
285 {
286         run_loop = 0;
287         rte_free(global_events_list);
288 }
289
290 static void
291 core_share(int pNo, int z, int x, int t)
292 {
293         if (policies[pNo].core_share[z].pcpu == lvm_info[x].pcpus[t]) {
294                 if (strcmp(policies[pNo].pkt.vm_name,
295                                 lvm_info[x].vm_name) != 0) {
296                         policies[pNo].core_share[z].status = 1;
297                         power_manager_scale_core_max(
298                                         policies[pNo].core_share[z].pcpu);
299                 }
300         }
301 }
302
303 static void
304 core_share_status(int pNo)
305 {
306
307         int noVms = 0, noVcpus = 0, z, x, t;
308
309         get_all_vm(&noVms, &noVcpus);
310
311         /* Reset Core Share Status. */
312         for (z = 0; z < noVcpus; z++)
313                 policies[pNo].core_share[z].status = 0;
314
315         /* Foreach vcpu in a policy. */
316         for (z = 0; z < policies[pNo].pkt.num_vcpu; z++) {
317                 /* Foreach VM on the platform. */
318                 for (x = 0; x < noVms; x++) {
319                         /* Foreach vcpu of VMs on platform. */
320                         for (t = 0; t < lvm_info[x].num_cpus; t++)
321                                 core_share(pNo, z, x, t);
322                 }
323         }
324 }
325
326
327 static int
328 pcpu_monitor(struct policy *pol, struct core_info *ci, int pcpu, int count)
329 {
330         int ret = 0;
331
332         if (pol->pkt.policy_to_use == BRANCH_RATIO) {
333                 ci->cd[pcpu].oob_enabled = 1;
334                 ret = add_core_to_monitor(pcpu);
335                 if (ret == 0)
336                         RTE_LOG(INFO, CHANNEL_MONITOR,
337                                         "Monitoring pcpu %d OOB for %s\n",
338                                         pcpu, pol->pkt.vm_name);
339                 else
340                         RTE_LOG(ERR, CHANNEL_MONITOR,
341                                         "Error monitoring pcpu %d OOB for %s\n",
342                                         pcpu, pol->pkt.vm_name);
343
344         } else {
345                 pol->core_share[count].pcpu = pcpu;
346                 RTE_LOG(INFO, CHANNEL_MONITOR,
347                                 "Monitoring pcpu %d for %s\n",
348                                 pcpu, pol->pkt.vm_name);
349         }
350         return ret;
351 }
352
353 static void
354 get_pcpu_to_control(struct policy *pol)
355 {
356
357         /* Convert vcpu to pcpu. */
358         struct vm_info info;
359         int pcpu, count;
360         uint64_t mask_u64b;
361         struct core_info *ci;
362
363         ci = get_core_info();
364
365         RTE_LOG(DEBUG, CHANNEL_MONITOR,
366                         "Looking for pcpu for %s\n", pol->pkt.vm_name);
367
368         /*
369          * So now that we're handling virtual and physical cores, we need to
370          * differenciate between them when adding them to the branch monitor.
371          * Virtual cores need to be converted to physical cores.
372          */
373         if (pol->pkt.core_type == CORE_TYPE_VIRTUAL) {
374                 /*
375                  * If the cores in the policy are virtual, we need to map them
376                  * to physical core. We look up the vm info and use that for
377                  * the mapping.
378                  */
379                 get_info_vm(pol->pkt.vm_name, &info);
380                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
381                         mask_u64b =
382                                 info.pcpu_mask[pol->pkt.vcpu_to_control[count]];
383                         for (pcpu = 0; mask_u64b;
384                                         mask_u64b &= ~(1ULL << pcpu++)) {
385                                 if ((mask_u64b >> pcpu) & 1)
386                                         pcpu_monitor(pol, ci, pcpu, count);
387                         }
388                 }
389         } else {
390                 /*
391                  * If the cores in the policy are physical, we just use
392                  * those core id's directly.
393                  */
394                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
395                         pcpu = pol->pkt.vcpu_to_control[count];
396                         pcpu_monitor(pol, ci, pcpu, count);
397                 }
398         }
399 }
400
401 static int
402 get_pfid(struct policy *pol)
403 {
404
405         int i, x, ret = 0;
406
407         for (i = 0; i < pol->pkt.nb_mac_to_monitor; i++) {
408
409                 RTE_ETH_FOREACH_DEV(x) {
410                         ret = rte_pmd_i40e_query_vfid_by_mac(x,
411                                 (struct ether_addr *)&(pol->pkt.vfid[i]));
412                         if (ret != -EINVAL) {
413                                 pol->port[i] = x;
414                                 break;
415                         }
416                 }
417                 if (ret == -EINVAL || ret == -ENOTSUP || ret == ENODEV) {
418                         RTE_LOG(INFO, CHANNEL_MONITOR,
419                                 "Error with Policy. MAC not found on "
420                                 "attached ports ");
421                         pol->enabled = 0;
422                         return ret;
423                 }
424                 pol->pfid[i] = ret;
425         }
426         return 1;
427 }
428
429 static int
430 update_policy(struct channel_packet *pkt)
431 {
432
433         unsigned int updated = 0;
434         int i;
435
436
437         RTE_LOG(INFO, CHANNEL_MONITOR,
438                         "Applying policy for %s\n", pkt->vm_name);
439
440         for (i = 0; i < MAX_CLIENTS; i++) {
441                 if (strcmp(policies[i].pkt.vm_name, pkt->vm_name) == 0) {
442                         /* Copy the contents of *pkt into the policy.pkt */
443                         policies[i].pkt = *pkt;
444                         get_pcpu_to_control(&policies[i]);
445                         if (get_pfid(&policies[i]) == -1) {
446                                 updated = 1;
447                                 break;
448                         }
449                         core_share_status(i);
450                         policies[i].enabled = 1;
451                         updated = 1;
452                 }
453         }
454         if (!updated) {
455                 for (i = 0; i < MAX_CLIENTS; i++) {
456                         if (policies[i].enabled == 0) {
457                                 policies[i].pkt = *pkt;
458                                 get_pcpu_to_control(&policies[i]);
459                                 if (get_pfid(&policies[i]) == -1)
460                                         break;
461                                 core_share_status(i);
462                                 policies[i].enabled = 1;
463                                 break;
464                         }
465                 }
466         }
467         return 0;
468 }
469
470 static int
471 remove_policy(struct channel_packet *pkt __rte_unused)
472 {
473         int i;
474
475         /*
476          * Disabling the policy is simply a case of setting
477          * enabled to 0
478          */
479         for (i = 0; i < MAX_CLIENTS; i++) {
480                 if (strcmp(policies[i].pkt.vm_name, pkt->vm_name) == 0) {
481                         policies[i].enabled = 0;
482                         return 0;
483                 }
484         }
485         return -1;
486 }
487
488 static uint64_t
489 get_pkt_diff(struct policy *pol)
490 {
491
492         uint64_t vsi_pkt_count,
493                 vsi_pkt_total = 0,
494                 vsi_pkt_count_prev_total = 0;
495         double rdtsc_curr, rdtsc_diff, diff;
496         int x;
497         struct rte_eth_stats vf_stats;
498
499         for (x = 0; x < pol->pkt.nb_mac_to_monitor; x++) {
500
501                 /*Read vsi stats*/
502                 if (rte_pmd_i40e_get_vf_stats(x, pol->pfid[x], &vf_stats) == 0)
503                         vsi_pkt_count = vf_stats.ipackets;
504                 else
505                         vsi_pkt_count = -1;
506
507                 vsi_pkt_total += vsi_pkt_count;
508
509                 vsi_pkt_count_prev_total += vsi_pkt_count_prev[pol->pfid[x]];
510                 vsi_pkt_count_prev[pol->pfid[x]] = vsi_pkt_count;
511         }
512
513         rdtsc_curr = rte_rdtsc_precise();
514         rdtsc_diff = rdtsc_curr - rdtsc_prev[pol->pfid[x-1]];
515         rdtsc_prev[pol->pfid[x-1]] = rdtsc_curr;
516
517         diff = (vsi_pkt_total - vsi_pkt_count_prev_total) *
518                         ((double)rte_get_tsc_hz() / rdtsc_diff);
519
520         return diff;
521 }
522
523 static void
524 apply_traffic_profile(struct policy *pol)
525 {
526
527         int count;
528         uint64_t diff = 0;
529
530         diff = get_pkt_diff(pol);
531
532         if (diff >= (pol->pkt.traffic_policy.max_max_packet_thresh)) {
533                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
534                         if (pol->core_share[count].status != 1)
535                                 power_manager_scale_core_max(
536                                                 pol->core_share[count].pcpu);
537                 }
538         } else if (diff >= (pol->pkt.traffic_policy.avg_max_packet_thresh)) {
539                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
540                         if (pol->core_share[count].status != 1)
541                                 power_manager_scale_core_med(
542                                                 pol->core_share[count].pcpu);
543                 }
544         } else if (diff < (pol->pkt.traffic_policy.avg_max_packet_thresh)) {
545                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
546                         if (pol->core_share[count].status != 1)
547                                 power_manager_scale_core_min(
548                                                 pol->core_share[count].pcpu);
549                 }
550         }
551 }
552
553 static void
554 apply_time_profile(struct policy *pol)
555 {
556
557         int count, x;
558         struct timeval tv;
559         struct tm *ptm;
560         char time_string[40];
561
562         /* Obtain the time of day, and convert it to a tm struct. */
563         gettimeofday(&tv, NULL);
564         ptm = localtime(&tv.tv_sec);
565         /* Format the date and time, down to a single second. */
566         strftime(time_string, sizeof(time_string), "%Y-%m-%d %H:%M:%S", ptm);
567
568         for (x = 0; x < HOURS; x++) {
569
570                 if (ptm->tm_hour == pol->pkt.timer_policy.busy_hours[x]) {
571                         for (count = 0; count < pol->pkt.num_vcpu; count++) {
572                                 if (pol->core_share[count].status != 1) {
573                                         power_manager_scale_core_max(
574                                                 pol->core_share[count].pcpu);
575                                 }
576                         }
577                         break;
578                 } else if (ptm->tm_hour ==
579                                 pol->pkt.timer_policy.quiet_hours[x]) {
580                         for (count = 0; count < pol->pkt.num_vcpu; count++) {
581                                 if (pol->core_share[count].status != 1) {
582                                         power_manager_scale_core_min(
583                                                 pol->core_share[count].pcpu);
584                         }
585                 }
586                         break;
587                 } else if (ptm->tm_hour ==
588                         pol->pkt.timer_policy.hours_to_use_traffic_profile[x]) {
589                         apply_traffic_profile(pol);
590                         break;
591                 }
592         }
593 }
594
595 static void
596 apply_workload_profile(struct policy *pol)
597 {
598
599         int count;
600
601         if (pol->pkt.workload == HIGH) {
602                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
603                         if (pol->core_share[count].status != 1)
604                                 power_manager_scale_core_max(
605                                                 pol->core_share[count].pcpu);
606                 }
607         } else if (pol->pkt.workload == MEDIUM) {
608                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
609                         if (pol->core_share[count].status != 1)
610                                 power_manager_scale_core_med(
611                                                 pol->core_share[count].pcpu);
612                 }
613         } else if (pol->pkt.workload == LOW) {
614                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
615                         if (pol->core_share[count].status != 1)
616                                 power_manager_scale_core_min(
617                                                 pol->core_share[count].pcpu);
618                 }
619         }
620 }
621
622 static void
623 apply_policy(struct policy *pol)
624 {
625
626         struct channel_packet *pkt = &pol->pkt;
627
628         /*Check policy to use*/
629         if (pkt->policy_to_use == TRAFFIC)
630                 apply_traffic_profile(pol);
631         else if (pkt->policy_to_use == TIME)
632                 apply_time_profile(pol);
633         else if (pkt->policy_to_use == WORKLOAD)
634                 apply_workload_profile(pol);
635 }
636
637 static int
638 process_request(struct channel_packet *pkt, struct channel_info *chan_info)
639 {
640         uint64_t core_mask;
641
642         if (chan_info == NULL)
643                 return -1;
644
645         if (rte_atomic32_cmpset(&(chan_info->status), CHANNEL_MGR_CHANNEL_CONNECTED,
646                         CHANNEL_MGR_CHANNEL_PROCESSING) == 0)
647                 return -1;
648
649         if (pkt->command == CPU_POWER) {
650                 core_mask = get_pcpus_mask(chan_info, pkt->resource_id);
651                 if (core_mask == 0) {
652                         /*
653                          * Core mask will be 0 in the case where
654                          * hypervisor is not available so we're working in
655                          * the host, so use the core as the mask.
656                          */
657                         core_mask = 1ULL << pkt->resource_id;
658                 }
659                 if (__builtin_popcountll(core_mask) == 1) {
660
661                         unsigned core_num = __builtin_ffsll(core_mask) - 1;
662
663                         switch (pkt->unit) {
664                         case(CPU_POWER_SCALE_MIN):
665                                         power_manager_scale_core_min(core_num);
666                         break;
667                         case(CPU_POWER_SCALE_MAX):
668                                         power_manager_scale_core_max(core_num);
669                         break;
670                         case(CPU_POWER_SCALE_DOWN):
671                                         power_manager_scale_core_down(core_num);
672                         break;
673                         case(CPU_POWER_SCALE_UP):
674                                         power_manager_scale_core_up(core_num);
675                         break;
676                         case(CPU_POWER_ENABLE_TURBO):
677                                 power_manager_enable_turbo_core(core_num);
678                         break;
679                         case(CPU_POWER_DISABLE_TURBO):
680                                 power_manager_disable_turbo_core(core_num);
681                         break;
682                         default:
683                                 break;
684                         }
685                 } else {
686                         switch (pkt->unit) {
687                         case(CPU_POWER_SCALE_MIN):
688                                         power_manager_scale_mask_min(core_mask);
689                         break;
690                         case(CPU_POWER_SCALE_MAX):
691                                         power_manager_scale_mask_max(core_mask);
692                         break;
693                         case(CPU_POWER_SCALE_DOWN):
694                                         power_manager_scale_mask_down(core_mask);
695                         break;
696                         case(CPU_POWER_SCALE_UP):
697                                         power_manager_scale_mask_up(core_mask);
698                         break;
699                         case(CPU_POWER_ENABLE_TURBO):
700                                 power_manager_enable_turbo_mask(core_mask);
701                         break;
702                         case(CPU_POWER_DISABLE_TURBO):
703                                 power_manager_disable_turbo_mask(core_mask);
704                         break;
705                         default:
706                                 break;
707                         }
708
709                 }
710         }
711
712         if (pkt->command == PKT_POLICY) {
713                 RTE_LOG(INFO, CHANNEL_MONITOR, "Processing policy request %s\n",
714                                 pkt->vm_name);
715                 update_policy(pkt);
716                 policy_is_set = 1;
717         }
718
719         if (pkt->command == PKT_POLICY_REMOVE) {
720                 RTE_LOG(INFO, CHANNEL_MONITOR,
721                                  "Removing policy %s\n", pkt->vm_name);
722                 remove_policy(pkt);
723         }
724
725         /*
726          * Return is not checked as channel status may have been set to DISABLED
727          * from management thread
728          */
729         rte_atomic32_cmpset(&(chan_info->status), CHANNEL_MGR_CHANNEL_PROCESSING,
730                         CHANNEL_MGR_CHANNEL_CONNECTED);
731         return 0;
732
733 }
734
735 int
736 add_channel_to_monitor(struct channel_info **chan_info)
737 {
738         struct channel_info *info = *chan_info;
739         struct epoll_event event;
740
741         event.events = EPOLLIN;
742         event.data.ptr = info;
743         if (epoll_ctl(global_event_fd, EPOLL_CTL_ADD, info->fd, &event) < 0) {
744                 RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to add channel '%s' "
745                                 "to epoll\n", info->channel_path);
746                 return -1;
747         }
748         RTE_LOG(ERR, CHANNEL_MONITOR, "Added channel '%s' "
749                         "to monitor\n", info->channel_path);
750         return 0;
751 }
752
753 int
754 remove_channel_from_monitor(struct channel_info *chan_info)
755 {
756         if (epoll_ctl(global_event_fd, EPOLL_CTL_DEL,
757                         chan_info->fd, NULL) < 0) {
758                 RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to remove channel '%s' "
759                                 "from epoll\n", chan_info->channel_path);
760                 return -1;
761         }
762         return 0;
763 }
764
765 int
766 channel_monitor_init(void)
767 {
768         global_event_fd = epoll_create1(0);
769         if (global_event_fd == 0) {
770                 RTE_LOG(ERR, CHANNEL_MONITOR,
771                                 "Error creating epoll context with error %s\n",
772                                 strerror(errno));
773                 return -1;
774         }
775         global_events_list = rte_malloc("epoll_events",
776                         sizeof(*global_events_list)
777                         * MAX_EVENTS, RTE_CACHE_LINE_SIZE);
778         if (global_events_list == NULL) {
779                 RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to rte_malloc for "
780                                 "epoll events\n");
781                 return -1;
782         }
783         return 0;
784 }
785
786 static void
787 read_binary_packet(struct channel_info *chan_info)
788 {
789         struct channel_packet pkt;
790         void *buffer = &pkt;
791         int buffer_len = sizeof(pkt);
792         int n_bytes, err = 0;
793
794         while (buffer_len > 0) {
795                 n_bytes = read(chan_info->fd,
796                                 buffer, buffer_len);
797                 if (n_bytes == buffer_len)
798                         break;
799                 if (n_bytes == -1) {
800                         err = errno;
801                         RTE_LOG(DEBUG, CHANNEL_MONITOR,
802                                 "Received error on "
803                                 "channel '%s' read: %s\n",
804                                 chan_info->channel_path,
805                                 strerror(err));
806                         remove_channel(&chan_info);
807                         break;
808                 }
809                 buffer = (char *)buffer + n_bytes;
810                 buffer_len -= n_bytes;
811         }
812         if (!err)
813                 process_request(&pkt, chan_info);
814 }
815
816 #ifdef USE_JANSSON
817 static void
818 read_json_packet(struct channel_info *chan_info)
819 {
820         struct channel_packet pkt;
821         int n_bytes, ret;
822         json_t *root;
823         json_error_t error;
824
825         /* read opening brace to closing brace */
826         do {
827                 int idx = 0;
828                 int indent = 0;
829                 do {
830                         n_bytes = read(chan_info->fd, &json_data[idx], 1);
831                         if (n_bytes == 0)
832                                 break;
833                         if (json_data[idx] == '{')
834                                 indent++;
835                         if (json_data[idx] == '}')
836                                 indent--;
837                         if ((indent > 0) || (idx > 0))
838                                 idx++;
839                         if (indent <= 0)
840                                 json_data[idx] = 0;
841                         if (idx >= MAX_JSON_STRING_LEN-1)
842                                 break;
843                 } while (indent > 0);
844
845                 json_data[idx] = '\0';
846
847                 if (strlen(json_data) == 0)
848                         continue;
849
850                 printf("got [%s]\n", json_data);
851
852                 root = json_loads(json_data, 0, &error);
853
854                 if (root) {
855                         /*
856                          * Because our data is now in the json
857                          * object, we can overwrite the pkt
858                          * with a channel_packet struct, using
859                          * parse_json_to_pkt()
860                          */
861                         ret = parse_json_to_pkt(root, &pkt);
862                         json_decref(root);
863                         if (ret) {
864                                 RTE_LOG(ERR, CHANNEL_MONITOR,
865                                         "Error validating JSON profile data\n");
866                                 break;
867                         }
868                         process_request(&pkt, chan_info);
869                 } else {
870                         RTE_LOG(ERR, CHANNEL_MONITOR,
871                                         "JSON error on line %d: %s\n",
872                                         error.line, error.text);
873                 }
874         } while (n_bytes > 0);
875 }
876 #endif
877
878 void
879 run_channel_monitor(void)
880 {
881         while (run_loop) {
882                 int n_events, i;
883
884                 n_events = epoll_wait(global_event_fd, global_events_list,
885                                 MAX_EVENTS, 1);
886                 if (!run_loop)
887                         break;
888                 for (i = 0; i < n_events; i++) {
889                         struct channel_info *chan_info = (struct channel_info *)
890                                         global_events_list[i].data.ptr;
891                         if ((global_events_list[i].events & EPOLLERR) ||
892                                 (global_events_list[i].events & EPOLLHUP)) {
893                                 RTE_LOG(INFO, CHANNEL_MONITOR,
894                                                 "Remote closed connection for "
895                                                 "channel '%s'\n",
896                                                 chan_info->channel_path);
897                                 remove_channel(&chan_info);
898                                 continue;
899                         }
900                         if (global_events_list[i].events & EPOLLIN) {
901
902                                 switch (chan_info->type) {
903                                 case CHANNEL_TYPE_BINARY:
904                                         read_binary_packet(chan_info);
905                                         break;
906 #ifdef USE_JANSSON
907                                 case CHANNEL_TYPE_JSON:
908                                         read_json_packet(chan_info);
909                                         break;
910 #endif
911                                 default:
912                                         break;
913                                 }
914                         }
915                 }
916                 rte_delay_us(time_period_ms*1000);
917                 if (policy_is_set) {
918                         int j;
919
920                         for (j = 0; j < MAX_CLIENTS; j++) {
921                                 if (policies[j].enabled == 1)
922                                         apply_policy(&policies[j]);
923                         }
924                 }
925         }
926 }