New upstream version 18.11-rc1
[deb_dpdk.git] / examples / vm_power_manager / channel_monitor.c
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright(c) 2010-2014 Intel Corporation
3  */
4
5 #include <unistd.h>
6 #include <stdio.h>
7 #include <stdlib.h>
8 #include <stdint.h>
9 #include <signal.h>
10 #include <errno.h>
11 #include <string.h>
12 #include <fcntl.h>
13 #include <sys/types.h>
14 #include <sys/epoll.h>
15 #include <sys/queue.h>
16 #include <sys/time.h>
17 #include <sys/socket.h>
18 #include <sys/select.h>
19 #ifdef USE_JANSSON
20 #include <jansson.h>
21 #else
22 #pragma message "Jansson dev libs unavailable, not including JSON parsing"
23 #endif
24 #include <rte_log.h>
25 #include <rte_memory.h>
26 #include <rte_malloc.h>
27 #include <rte_atomic.h>
28 #include <rte_cycles.h>
29 #include <rte_ethdev.h>
30 #include <rte_pmd_i40e.h>
31
32 #include <libvirt/libvirt.h>
33 #include "channel_monitor.h"
34 #include "channel_commands.h"
35 #include "channel_manager.h"
36 #include "power_manager.h"
37 #include "oob_monitor.h"
38
39 #define RTE_LOGTYPE_CHANNEL_MONITOR RTE_LOGTYPE_USER1
40
41 #define MAX_EVENTS 256
42
43 uint64_t vsi_pkt_count_prev[384];
44 uint64_t rdtsc_prev[384];
45 #define MAX_JSON_STRING_LEN 1024
46 char json_data[MAX_JSON_STRING_LEN];
47
48 double time_period_ms = 1;
49 static volatile unsigned run_loop = 1;
50 static int global_event_fd;
51 static unsigned int policy_is_set;
52 static struct epoll_event *global_events_list;
53 static struct policy policies[MAX_CLIENTS];
54
55 #ifdef USE_JANSSON
56
57 union PFID {
58         struct ether_addr addr;
59         uint64_t pfid;
60 };
61
62 static int
63 str_to_ether_addr(const char *a, struct ether_addr *ether_addr)
64 {
65         int i;
66         char *end;
67         unsigned long o[ETHER_ADDR_LEN];
68
69         i = 0;
70         do {
71                 errno = 0;
72                 o[i] = strtoul(a, &end, 16);
73                 if (errno != 0 || end == a || (end[0] != ':' && end[0] != 0))
74                         return -1;
75                 a = end + 1;
76         } while (++i != RTE_DIM(o) / sizeof(o[0]) && end[0] != 0);
77
78         /* Junk at the end of line */
79         if (end[0] != 0)
80                 return -1;
81
82         /* Support the format XX:XX:XX:XX:XX:XX */
83         if (i == ETHER_ADDR_LEN) {
84                 while (i-- != 0) {
85                         if (o[i] > UINT8_MAX)
86                                 return -1;
87                         ether_addr->addr_bytes[i] = (uint8_t)o[i];
88                 }
89         /* Support the format XXXX:XXXX:XXXX */
90         } else if (i == ETHER_ADDR_LEN / 2) {
91                 while (i-- != 0) {
92                         if (o[i] > UINT16_MAX)
93                                 return -1;
94                         ether_addr->addr_bytes[i * 2] =
95                                         (uint8_t)(o[i] >> 8);
96                         ether_addr->addr_bytes[i * 2 + 1] =
97                                         (uint8_t)(o[i] & 0xff);
98                 }
99         /* unknown format */
100         } else
101                 return -1;
102
103         return 0;
104 }
105
106 static int
107 set_policy_mac(struct channel_packet *pkt, int idx, char *mac)
108 {
109         union PFID pfid;
110         int ret;
111
112         /* Use port MAC address as the vfid */
113         ret = str_to_ether_addr(mac, &pfid.addr);
114
115         if (ret != 0) {
116                 RTE_LOG(ERR, CHANNEL_MONITOR,
117                         "Invalid mac address received in JSON\n");
118                 pkt->vfid[idx] = 0;
119                 return -1;
120         }
121
122         printf("Received MAC Address: %02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 ":"
123                         "%02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 "\n",
124                         pfid.addr.addr_bytes[0], pfid.addr.addr_bytes[1],
125                         pfid.addr.addr_bytes[2], pfid.addr.addr_bytes[3],
126                         pfid.addr.addr_bytes[4], pfid.addr.addr_bytes[5]);
127
128         pkt->vfid[idx] = pfid.pfid;
129         return 0;
130 }
131
132
133 static int
134 parse_json_to_pkt(json_t *element, struct channel_packet *pkt)
135 {
136         const char *key;
137         json_t *value;
138         int ret;
139
140         memset(pkt, 0, sizeof(struct channel_packet));
141
142         pkt->nb_mac_to_monitor = 0;
143         pkt->t_boost_status.tbEnabled = false;
144         pkt->workload = LOW;
145         pkt->policy_to_use = TIME;
146         pkt->command = PKT_POLICY;
147         pkt->core_type = CORE_TYPE_PHYSICAL;
148
149         json_object_foreach(element, key, value) {
150                 if (!strcmp(key, "policy")) {
151                         /* Recurse in to get the contents of profile */
152                         ret = parse_json_to_pkt(value, pkt);
153                         if (ret)
154                                 return ret;
155                 } else if (!strcmp(key, "instruction")) {
156                         /* Recurse in to get the contents of instruction */
157                         ret = parse_json_to_pkt(value, pkt);
158                         if (ret)
159                                 return ret;
160                 } else if (!strcmp(key, "name")) {
161                         strcpy(pkt->vm_name, json_string_value(value));
162                 } else if (!strcmp(key, "command")) {
163                         char command[32];
164                         snprintf(command, 32, "%s", json_string_value(value));
165                         if (!strcmp(command, "power")) {
166                                 pkt->command = CPU_POWER;
167                         } else if (!strcmp(command, "create")) {
168                                 pkt->command = PKT_POLICY;
169                         } else if (!strcmp(command, "destroy")) {
170                                 pkt->command = PKT_POLICY_REMOVE;
171                         } else {
172                                 RTE_LOG(ERR, CHANNEL_MONITOR,
173                                         "Invalid command received in JSON\n");
174                                 return -1;
175                         }
176                 } else if (!strcmp(key, "policy_type")) {
177                         char command[32];
178                         snprintf(command, 32, "%s", json_string_value(value));
179                         if (!strcmp(command, "TIME")) {
180                                 pkt->policy_to_use = TIME;
181                         } else if (!strcmp(command, "TRAFFIC")) {
182                                 pkt->policy_to_use = TRAFFIC;
183                         } else if (!strcmp(command, "WORKLOAD")) {
184                                 pkt->policy_to_use = WORKLOAD;
185                         } else if (!strcmp(command, "BRANCH_RATIO")) {
186                                 pkt->policy_to_use = BRANCH_RATIO;
187                         } else {
188                                 RTE_LOG(ERR, CHANNEL_MONITOR,
189                                         "Wrong policy_type received in JSON\n");
190                                 return -1;
191                         }
192                 } else if (!strcmp(key, "workload")) {
193                         char command[32];
194                         snprintf(command, 32, "%s", json_string_value(value));
195                         if (!strcmp(command, "HIGH")) {
196                                 pkt->workload = HIGH;
197                         } else if (!strcmp(command, "MEDIUM")) {
198                                 pkt->workload = MEDIUM;
199                         } else if (!strcmp(command, "LOW")) {
200                                 pkt->workload = LOW;
201                         } else {
202                                 RTE_LOG(ERR, CHANNEL_MONITOR,
203                                         "Wrong workload received in JSON\n");
204                                 return -1;
205                         }
206                 } else if (!strcmp(key, "busy_hours")) {
207                         unsigned int i;
208                         size_t size = json_array_size(value);
209
210                         for (i = 0; i < size; i++) {
211                                 int hour = (int)json_integer_value(
212                                                 json_array_get(value, i));
213                                 pkt->timer_policy.busy_hours[i] = hour;
214                         }
215                 } else if (!strcmp(key, "quiet_hours")) {
216                         unsigned int i;
217                         size_t size = json_array_size(value);
218
219                         for (i = 0; i < size; i++) {
220                                 int hour = (int)json_integer_value(
221                                                 json_array_get(value, i));
222                                 pkt->timer_policy.quiet_hours[i] = hour;
223                         }
224                 } else if (!strcmp(key, "core_list")) {
225                         unsigned int i;
226                         size_t size = json_array_size(value);
227
228                         for (i = 0; i < size; i++) {
229                                 int core = (int)json_integer_value(
230                                                 json_array_get(value, i));
231                                 pkt->vcpu_to_control[i] = core;
232                         }
233                         pkt->num_vcpu = size;
234                 } else if (!strcmp(key, "mac_list")) {
235                         unsigned int i;
236                         size_t size = json_array_size(value);
237
238                         for (i = 0; i < size; i++) {
239                                 char mac[32];
240                                 snprintf(mac, 32, "%s", json_string_value(
241                                                 json_array_get(value, i)));
242                                 set_policy_mac(pkt, i, mac);
243                         }
244                         pkt->nb_mac_to_monitor = size;
245                 } else if (!strcmp(key, "avg_packet_thresh")) {
246                         pkt->traffic_policy.avg_max_packet_thresh =
247                                         (uint32_t)json_integer_value(value);
248                 } else if (!strcmp(key, "max_packet_thresh")) {
249                         pkt->traffic_policy.max_max_packet_thresh =
250                                         (uint32_t)json_integer_value(value);
251                 } else if (!strcmp(key, "unit")) {
252                         char unit[32];
253                         snprintf(unit, 32, "%s", json_string_value(value));
254                         if (!strcmp(unit, "SCALE_UP")) {
255                                 pkt->unit = CPU_POWER_SCALE_UP;
256                         } else if (!strcmp(unit, "SCALE_DOWN")) {
257                                 pkt->unit = CPU_POWER_SCALE_DOWN;
258                         } else if (!strcmp(unit, "SCALE_MAX")) {
259                                 pkt->unit = CPU_POWER_SCALE_MAX;
260                         } else if (!strcmp(unit, "SCALE_MIN")) {
261                                 pkt->unit = CPU_POWER_SCALE_MIN;
262                         } else if (!strcmp(unit, "ENABLE_TURBO")) {
263                                 pkt->unit = CPU_POWER_ENABLE_TURBO;
264                         } else if (!strcmp(unit, "DISABLE_TURBO")) {
265                                 pkt->unit = CPU_POWER_DISABLE_TURBO;
266                         } else {
267                                 RTE_LOG(ERR, CHANNEL_MONITOR,
268                                         "Invalid command received in JSON\n");
269                                 return -1;
270                         }
271                 } else if (!strcmp(key, "resource_id")) {
272                         pkt->resource_id = (uint32_t)json_integer_value(value);
273                 } else {
274                         RTE_LOG(ERR, CHANNEL_MONITOR,
275                                 "Unknown key received in JSON string: %s\n",
276                                 key);
277                 }
278         }
279         return 0;
280 }
281 #endif
282
283 void channel_monitor_exit(void)
284 {
285         run_loop = 0;
286         rte_free(global_events_list);
287 }
288
289 static void
290 core_share(int pNo, int z, int x, int t)
291 {
292         if (policies[pNo].core_share[z].pcpu == lvm_info[x].pcpus[t]) {
293                 if (strcmp(policies[pNo].pkt.vm_name,
294                                 lvm_info[x].vm_name) != 0) {
295                         policies[pNo].core_share[z].status = 1;
296                         power_manager_scale_core_max(
297                                         policies[pNo].core_share[z].pcpu);
298                 }
299         }
300 }
301
302 static void
303 core_share_status(int pNo)
304 {
305
306         int noVms = 0, noVcpus = 0, z, x, t;
307
308         get_all_vm(&noVms, &noVcpus);
309
310         /* Reset Core Share Status. */
311         for (z = 0; z < noVcpus; z++)
312                 policies[pNo].core_share[z].status = 0;
313
314         /* Foreach vcpu in a policy. */
315         for (z = 0; z < policies[pNo].pkt.num_vcpu; z++) {
316                 /* Foreach VM on the platform. */
317                 for (x = 0; x < noVms; x++) {
318                         /* Foreach vcpu of VMs on platform. */
319                         for (t = 0; t < lvm_info[x].num_cpus; t++)
320                                 core_share(pNo, z, x, t);
321                 }
322         }
323 }
324
325
326 static int
327 pcpu_monitor(struct policy *pol, struct core_info *ci, int pcpu, int count)
328 {
329         int ret = 0;
330
331         if (pol->pkt.policy_to_use == BRANCH_RATIO) {
332                 ci->cd[pcpu].oob_enabled = 1;
333                 ret = add_core_to_monitor(pcpu);
334                 if (ret == 0)
335                         RTE_LOG(INFO, CHANNEL_MONITOR,
336                                         "Monitoring pcpu %d OOB for %s\n",
337                                         pcpu, pol->pkt.vm_name);
338                 else
339                         RTE_LOG(ERR, CHANNEL_MONITOR,
340                                         "Error monitoring pcpu %d OOB for %s\n",
341                                         pcpu, pol->pkt.vm_name);
342
343         } else {
344                 pol->core_share[count].pcpu = pcpu;
345                 RTE_LOG(INFO, CHANNEL_MONITOR,
346                                 "Monitoring pcpu %d for %s\n",
347                                 pcpu, pol->pkt.vm_name);
348         }
349         return ret;
350 }
351
352 static void
353 get_pcpu_to_control(struct policy *pol)
354 {
355
356         /* Convert vcpu to pcpu. */
357         struct vm_info info;
358         int pcpu, count;
359         uint64_t mask_u64b;
360         struct core_info *ci;
361
362         ci = get_core_info();
363
364         RTE_LOG(DEBUG, CHANNEL_MONITOR,
365                         "Looking for pcpu for %s\n", pol->pkt.vm_name);
366
367         /*
368          * So now that we're handling virtual and physical cores, we need to
369          * differenciate between them when adding them to the branch monitor.
370          * Virtual cores need to be converted to physical cores.
371          */
372         if (pol->pkt.core_type == CORE_TYPE_VIRTUAL) {
373                 /*
374                  * If the cores in the policy are virtual, we need to map them
375                  * to physical core. We look up the vm info and use that for
376                  * the mapping.
377                  */
378                 get_info_vm(pol->pkt.vm_name, &info);
379                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
380                         mask_u64b =
381                                 info.pcpu_mask[pol->pkt.vcpu_to_control[count]];
382                         for (pcpu = 0; mask_u64b;
383                                         mask_u64b &= ~(1ULL << pcpu++)) {
384                                 if ((mask_u64b >> pcpu) & 1)
385                                         pcpu_monitor(pol, ci, pcpu, count);
386                         }
387                 }
388         } else {
389                 /*
390                  * If the cores in the policy are physical, we just use
391                  * those core id's directly.
392                  */
393                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
394                         pcpu = pol->pkt.vcpu_to_control[count];
395                         pcpu_monitor(pol, ci, pcpu, count);
396                 }
397         }
398 }
399
400 static int
401 get_pfid(struct policy *pol)
402 {
403
404         int i, x, ret = 0;
405
406         for (i = 0; i < pol->pkt.nb_mac_to_monitor; i++) {
407
408                 RTE_ETH_FOREACH_DEV(x) {
409                         ret = rte_pmd_i40e_query_vfid_by_mac(x,
410                                 (struct ether_addr *)&(pol->pkt.vfid[i]));
411                         if (ret != -EINVAL) {
412                                 pol->port[i] = x;
413                                 break;
414                         }
415                 }
416                 if (ret == -EINVAL || ret == -ENOTSUP || ret == ENODEV) {
417                         RTE_LOG(INFO, CHANNEL_MONITOR,
418                                 "Error with Policy. MAC not found on "
419                                 "attached ports ");
420                         pol->enabled = 0;
421                         return ret;
422                 }
423                 pol->pfid[i] = ret;
424         }
425         return 1;
426 }
427
428 static int
429 update_policy(struct channel_packet *pkt)
430 {
431
432         unsigned int updated = 0;
433         int i;
434
435
436         RTE_LOG(INFO, CHANNEL_MONITOR,
437                         "Applying policy for %s\n", pkt->vm_name);
438
439         for (i = 0; i < MAX_CLIENTS; i++) {
440                 if (strcmp(policies[i].pkt.vm_name, pkt->vm_name) == 0) {
441                         /* Copy the contents of *pkt into the policy.pkt */
442                         policies[i].pkt = *pkt;
443                         get_pcpu_to_control(&policies[i]);
444                         if (get_pfid(&policies[i]) == -1) {
445                                 updated = 1;
446                                 break;
447                         }
448                         core_share_status(i);
449                         policies[i].enabled = 1;
450                         updated = 1;
451                 }
452         }
453         if (!updated) {
454                 for (i = 0; i < MAX_CLIENTS; i++) {
455                         if (policies[i].enabled == 0) {
456                                 policies[i].pkt = *pkt;
457                                 get_pcpu_to_control(&policies[i]);
458                                 if (get_pfid(&policies[i]) == -1)
459                                         break;
460                                 core_share_status(i);
461                                 policies[i].enabled = 1;
462                                 break;
463                         }
464                 }
465         }
466         return 0;
467 }
468
469 static int
470 remove_policy(struct channel_packet *pkt __rte_unused)
471 {
472         int i;
473
474         /*
475          * Disabling the policy is simply a case of setting
476          * enabled to 0
477          */
478         for (i = 0; i < MAX_CLIENTS; i++) {
479                 if (strcmp(policies[i].pkt.vm_name, pkt->vm_name) == 0) {
480                         policies[i].enabled = 0;
481                         return 0;
482                 }
483         }
484         return -1;
485 }
486
487 static uint64_t
488 get_pkt_diff(struct policy *pol)
489 {
490
491         uint64_t vsi_pkt_count,
492                 vsi_pkt_total = 0,
493                 vsi_pkt_count_prev_total = 0;
494         double rdtsc_curr, rdtsc_diff, diff;
495         int x;
496         struct rte_eth_stats vf_stats;
497
498         for (x = 0; x < pol->pkt.nb_mac_to_monitor; x++) {
499
500                 /*Read vsi stats*/
501                 if (rte_pmd_i40e_get_vf_stats(x, pol->pfid[x], &vf_stats) == 0)
502                         vsi_pkt_count = vf_stats.ipackets;
503                 else
504                         vsi_pkt_count = -1;
505
506                 vsi_pkt_total += vsi_pkt_count;
507
508                 vsi_pkt_count_prev_total += vsi_pkt_count_prev[pol->pfid[x]];
509                 vsi_pkt_count_prev[pol->pfid[x]] = vsi_pkt_count;
510         }
511
512         rdtsc_curr = rte_rdtsc_precise();
513         rdtsc_diff = rdtsc_curr - rdtsc_prev[pol->pfid[x-1]];
514         rdtsc_prev[pol->pfid[x-1]] = rdtsc_curr;
515
516         diff = (vsi_pkt_total - vsi_pkt_count_prev_total) *
517                         ((double)rte_get_tsc_hz() / rdtsc_diff);
518
519         return diff;
520 }
521
522 static void
523 apply_traffic_profile(struct policy *pol)
524 {
525
526         int count;
527         uint64_t diff = 0;
528
529         diff = get_pkt_diff(pol);
530
531         if (diff >= (pol->pkt.traffic_policy.max_max_packet_thresh)) {
532                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
533                         if (pol->core_share[count].status != 1)
534                                 power_manager_scale_core_max(
535                                                 pol->core_share[count].pcpu);
536                 }
537         } else if (diff >= (pol->pkt.traffic_policy.avg_max_packet_thresh)) {
538                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
539                         if (pol->core_share[count].status != 1)
540                                 power_manager_scale_core_med(
541                                                 pol->core_share[count].pcpu);
542                 }
543         } else if (diff < (pol->pkt.traffic_policy.avg_max_packet_thresh)) {
544                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
545                         if (pol->core_share[count].status != 1)
546                                 power_manager_scale_core_min(
547                                                 pol->core_share[count].pcpu);
548                 }
549         }
550 }
551
552 static void
553 apply_time_profile(struct policy *pol)
554 {
555
556         int count, x;
557         struct timeval tv;
558         struct tm *ptm;
559         char time_string[40];
560
561         /* Obtain the time of day, and convert it to a tm struct. */
562         gettimeofday(&tv, NULL);
563         ptm = localtime(&tv.tv_sec);
564         /* Format the date and time, down to a single second. */
565         strftime(time_string, sizeof(time_string), "%Y-%m-%d %H:%M:%S", ptm);
566
567         for (x = 0; x < HOURS; x++) {
568
569                 if (ptm->tm_hour == pol->pkt.timer_policy.busy_hours[x]) {
570                         for (count = 0; count < pol->pkt.num_vcpu; count++) {
571                                 if (pol->core_share[count].status != 1) {
572                                         power_manager_scale_core_max(
573                                                 pol->core_share[count].pcpu);
574                                 }
575                         }
576                         break;
577                 } else if (ptm->tm_hour ==
578                                 pol->pkt.timer_policy.quiet_hours[x]) {
579                         for (count = 0; count < pol->pkt.num_vcpu; count++) {
580                                 if (pol->core_share[count].status != 1) {
581                                         power_manager_scale_core_min(
582                                                 pol->core_share[count].pcpu);
583                         }
584                 }
585                         break;
586                 } else if (ptm->tm_hour ==
587                         pol->pkt.timer_policy.hours_to_use_traffic_profile[x]) {
588                         apply_traffic_profile(pol);
589                         break;
590                 }
591         }
592 }
593
594 static void
595 apply_workload_profile(struct policy *pol)
596 {
597
598         int count;
599
600         if (pol->pkt.workload == HIGH) {
601                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
602                         if (pol->core_share[count].status != 1)
603                                 power_manager_scale_core_max(
604                                                 pol->core_share[count].pcpu);
605                 }
606         } else if (pol->pkt.workload == MEDIUM) {
607                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
608                         if (pol->core_share[count].status != 1)
609                                 power_manager_scale_core_med(
610                                                 pol->core_share[count].pcpu);
611                 }
612         } else if (pol->pkt.workload == LOW) {
613                 for (count = 0; count < pol->pkt.num_vcpu; count++) {
614                         if (pol->core_share[count].status != 1)
615                                 power_manager_scale_core_min(
616                                                 pol->core_share[count].pcpu);
617                 }
618         }
619 }
620
621 static void
622 apply_policy(struct policy *pol)
623 {
624
625         struct channel_packet *pkt = &pol->pkt;
626
627         /*Check policy to use*/
628         if (pkt->policy_to_use == TRAFFIC)
629                 apply_traffic_profile(pol);
630         else if (pkt->policy_to_use == TIME)
631                 apply_time_profile(pol);
632         else if (pkt->policy_to_use == WORKLOAD)
633                 apply_workload_profile(pol);
634 }
635
636 static int
637 process_request(struct channel_packet *pkt, struct channel_info *chan_info)
638 {
639         uint64_t core_mask;
640
641         if (chan_info == NULL)
642                 return -1;
643
644         if (rte_atomic32_cmpset(&(chan_info->status), CHANNEL_MGR_CHANNEL_CONNECTED,
645                         CHANNEL_MGR_CHANNEL_PROCESSING) == 0)
646                 return -1;
647
648         if (pkt->command == CPU_POWER) {
649                 core_mask = get_pcpus_mask(chan_info, pkt->resource_id);
650                 if (core_mask == 0) {
651                         /*
652                          * Core mask will be 0 in the case where
653                          * hypervisor is not available so we're working in
654                          * the host, so use the core as the mask.
655                          */
656                         core_mask = 1ULL << pkt->resource_id;
657                 }
658                 if (__builtin_popcountll(core_mask) == 1) {
659
660                         unsigned core_num = __builtin_ffsll(core_mask) - 1;
661
662                         switch (pkt->unit) {
663                         case(CPU_POWER_SCALE_MIN):
664                                         power_manager_scale_core_min(core_num);
665                         break;
666                         case(CPU_POWER_SCALE_MAX):
667                                         power_manager_scale_core_max(core_num);
668                         break;
669                         case(CPU_POWER_SCALE_DOWN):
670                                         power_manager_scale_core_down(core_num);
671                         break;
672                         case(CPU_POWER_SCALE_UP):
673                                         power_manager_scale_core_up(core_num);
674                         break;
675                         case(CPU_POWER_ENABLE_TURBO):
676                                 power_manager_enable_turbo_core(core_num);
677                         break;
678                         case(CPU_POWER_DISABLE_TURBO):
679                                 power_manager_disable_turbo_core(core_num);
680                         break;
681                         default:
682                                 break;
683                         }
684                 } else {
685                         switch (pkt->unit) {
686                         case(CPU_POWER_SCALE_MIN):
687                                         power_manager_scale_mask_min(core_mask);
688                         break;
689                         case(CPU_POWER_SCALE_MAX):
690                                         power_manager_scale_mask_max(core_mask);
691                         break;
692                         case(CPU_POWER_SCALE_DOWN):
693                                         power_manager_scale_mask_down(core_mask);
694                         break;
695                         case(CPU_POWER_SCALE_UP):
696                                         power_manager_scale_mask_up(core_mask);
697                         break;
698                         case(CPU_POWER_ENABLE_TURBO):
699                                 power_manager_enable_turbo_mask(core_mask);
700                         break;
701                         case(CPU_POWER_DISABLE_TURBO):
702                                 power_manager_disable_turbo_mask(core_mask);
703                         break;
704                         default:
705                                 break;
706                         }
707
708                 }
709         }
710
711         if (pkt->command == PKT_POLICY) {
712                 RTE_LOG(INFO, CHANNEL_MONITOR, "Processing policy request %s\n",
713                                 pkt->vm_name);
714                 update_policy(pkt);
715                 policy_is_set = 1;
716         }
717
718         if (pkt->command == PKT_POLICY_REMOVE) {
719                 RTE_LOG(INFO, CHANNEL_MONITOR,
720                                  "Removing policy %s\n", pkt->vm_name);
721                 remove_policy(pkt);
722         }
723
724         /*
725          * Return is not checked as channel status may have been set to DISABLED
726          * from management thread
727          */
728         rte_atomic32_cmpset(&(chan_info->status), CHANNEL_MGR_CHANNEL_PROCESSING,
729                         CHANNEL_MGR_CHANNEL_CONNECTED);
730         return 0;
731
732 }
733
734 int
735 add_channel_to_monitor(struct channel_info **chan_info)
736 {
737         struct channel_info *info = *chan_info;
738         struct epoll_event event;
739
740         event.events = EPOLLIN;
741         event.data.ptr = info;
742         if (epoll_ctl(global_event_fd, EPOLL_CTL_ADD, info->fd, &event) < 0) {
743                 RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to add channel '%s' "
744                                 "to epoll\n", info->channel_path);
745                 return -1;
746         }
747         RTE_LOG(ERR, CHANNEL_MONITOR, "Added channel '%s' "
748                         "to monitor\n", info->channel_path);
749         return 0;
750 }
751
752 int
753 remove_channel_from_monitor(struct channel_info *chan_info)
754 {
755         if (epoll_ctl(global_event_fd, EPOLL_CTL_DEL,
756                         chan_info->fd, NULL) < 0) {
757                 RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to remove channel '%s' "
758                                 "from epoll\n", chan_info->channel_path);
759                 return -1;
760         }
761         return 0;
762 }
763
764 int
765 channel_monitor_init(void)
766 {
767         global_event_fd = epoll_create1(0);
768         if (global_event_fd == 0) {
769                 RTE_LOG(ERR, CHANNEL_MONITOR,
770                                 "Error creating epoll context with error %s\n",
771                                 strerror(errno));
772                 return -1;
773         }
774         global_events_list = rte_malloc("epoll_events",
775                         sizeof(*global_events_list)
776                         * MAX_EVENTS, RTE_CACHE_LINE_SIZE);
777         if (global_events_list == NULL) {
778                 RTE_LOG(ERR, CHANNEL_MONITOR, "Unable to rte_malloc for "
779                                 "epoll events\n");
780                 return -1;
781         }
782         return 0;
783 }
784
785 static void
786 read_binary_packet(struct channel_info *chan_info)
787 {
788         struct channel_packet pkt;
789         void *buffer = &pkt;
790         int buffer_len = sizeof(pkt);
791         int n_bytes, err = 0;
792
793         while (buffer_len > 0) {
794                 n_bytes = read(chan_info->fd,
795                                 buffer, buffer_len);
796                 if (n_bytes == buffer_len)
797                         break;
798                 if (n_bytes == -1) {
799                         err = errno;
800                         RTE_LOG(DEBUG, CHANNEL_MONITOR,
801                                 "Received error on "
802                                 "channel '%s' read: %s\n",
803                                 chan_info->channel_path,
804                                 strerror(err));
805                         remove_channel(&chan_info);
806                         break;
807                 }
808                 buffer = (char *)buffer + n_bytes;
809                 buffer_len -= n_bytes;
810         }
811         if (!err)
812                 process_request(&pkt, chan_info);
813 }
814
815 #ifdef USE_JANSSON
816 static void
817 read_json_packet(struct channel_info *chan_info)
818 {
819         struct channel_packet pkt;
820         int n_bytes, ret;
821         json_t *root;
822         json_error_t error;
823
824         /* read opening brace to closing brace */
825         do {
826                 int idx = 0;
827                 int indent = 0;
828                 do {
829                         n_bytes = read(chan_info->fd, &json_data[idx], 1);
830                         if (n_bytes == 0)
831                                 break;
832                         if (json_data[idx] == '{')
833                                 indent++;
834                         if (json_data[idx] == '}')
835                                 indent--;
836                         if ((indent > 0) || (idx > 0))
837                                 idx++;
838                         if (indent == 0)
839                                 json_data[idx] = 0;
840                         if (idx >= MAX_JSON_STRING_LEN-1)
841                                 break;
842                 } while (indent > 0);
843
844                 if (indent > 0)
845                         /*
846                          * We've broken out of the read loop without getting
847                          * a closing brace, so throw away the data
848                          */
849                         json_data[idx] = 0;
850
851                 if (strlen(json_data) == 0)
852                         continue;
853
854                 printf("got [%s]\n", json_data);
855
856                 root = json_loads(json_data, 0, &error);
857
858                 if (root) {
859                         /*
860                          * Because our data is now in the json
861                          * object, we can overwrite the pkt
862                          * with a channel_packet struct, using
863                          * parse_json_to_pkt()
864                          */
865                         ret = parse_json_to_pkt(root, &pkt);
866                         json_decref(root);
867                         if (ret) {
868                                 RTE_LOG(ERR, CHANNEL_MONITOR,
869                                         "Error validating JSON profile data\n");
870                                 break;
871                         }
872                         process_request(&pkt, chan_info);
873                 } else {
874                         RTE_LOG(ERR, CHANNEL_MONITOR,
875                                         "JSON error on line %d: %s\n",
876                                         error.line, error.text);
877                 }
878         } while (n_bytes > 0);
879 }
880 #endif
881
882 void
883 run_channel_monitor(void)
884 {
885         while (run_loop) {
886                 int n_events, i;
887
888                 n_events = epoll_wait(global_event_fd, global_events_list,
889                                 MAX_EVENTS, 1);
890                 if (!run_loop)
891                         break;
892                 for (i = 0; i < n_events; i++) {
893                         struct channel_info *chan_info = (struct channel_info *)
894                                         global_events_list[i].data.ptr;
895                         if ((global_events_list[i].events & EPOLLERR) ||
896                                 (global_events_list[i].events & EPOLLHUP)) {
897                                 RTE_LOG(INFO, CHANNEL_MONITOR,
898                                                 "Remote closed connection for "
899                                                 "channel '%s'\n",
900                                                 chan_info->channel_path);
901                                 remove_channel(&chan_info);
902                                 continue;
903                         }
904                         if (global_events_list[i].events & EPOLLIN) {
905
906                                 switch (chan_info->type) {
907                                 case CHANNEL_TYPE_BINARY:
908                                         read_binary_packet(chan_info);
909                                         break;
910 #ifdef USE_JANSSON
911                                 case CHANNEL_TYPE_JSON:
912                                         read_json_packet(chan_info);
913                                         break;
914 #endif
915                                 default:
916                                         break;
917                                 }
918                         }
919                 }
920                 rte_delay_us(time_period_ms*1000);
921                 if (policy_is_set) {
922                         int j;
923
924                         for (j = 0; j < MAX_CLIENTS; j++) {
925                                 if (policies[j].enabled == 1)
926                                         apply_policy(&policies[j]);
927                         }
928                 }
929         }
930 }