NAT44: active-passive HA (VPP-1571)
[vpp.git] / src / plugins / nat / nat_ha.c
1 /*
2  * Copyright (c) 2019 Cisco and/or its affiliates.
3  * Licensed under the Apache License, Version 2.0 (the "License");
4  * you may not use this file except in compliance with the License.
5  * You may obtain a copy of the License at:
6  *
7  *     http://www.apache.org/licenses/LICENSE-2.0
8  *
9  * Unless required by applicable law or agreed to in writing, software
10  * distributed under the License is distributed on an "AS IS" BASIS,
11  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12  * See the License for the specific language governing permissions and
13  * limitations under the License.
14  */
15
16 #include "nat_ha.h"
17 #include <vnet/udp/udp.h>
18 #include <nat/nat.h>
19 #include <vppinfra/atomics.h>
20
21 /* number of retries */
22 #define NAT_HA_RETRIES 3
23
24 #define foreach_nat_ha_counter           \
25 _(RECV_ADD, "add-event-recv", 0)         \
26 _(RECV_DEL, "del-event-recv", 1)         \
27 _(RECV_REFRESH, "refresh-event-recv", 2) \
28 _(SEND_ADD, "add-event-send", 3)         \
29 _(SEND_DEL, "del-event-send", 4)         \
30 _(SEND_REFRESH, "refresh-event-send", 5) \
31 _(RECV_ACK, "ack-recv", 6)               \
32 _(SEND_ACK, "ack-send", 7)               \
33 _(RETRY_COUNT, "retry-count", 8)         \
34 _(MISSED_COUNT, "missed-count", 9)
35
36 /* NAT HA protocol version */
37 #define NAT_HA_VERSION 0x01
38
39 /* NAT HA protocol flags */
40 #define NAT_HA_FLAG_ACK 0x01
41
42 /* NAT HA event types */
43 typedef enum
44 {
45   NAT_HA_ADD = 1,
46   NAT_HA_DEL,
47   NAT_HA_REFRESH,
48 } nat_ha_event_type_t;
49
50 /* NAT HA protocol header */
51 typedef struct
52 {
53   /* version */
54   u8 version;
55   /* flags */
56   u8 flags;
57   /* event count */
58   u16 count;
59   /* sequence number */
60   u32 sequence_number;
61   /* thread index where events originated */
62   u32 thread_index;
63 } __attribute__ ((packed)) nat_ha_message_header_t;
64
65 /* NAT HA protocol event data */
66 typedef struct
67 {
68   /* event type */
69   u8 event_type;
70   /* session data */
71   u8 protocol;
72   u16 flags;
73   u32 in_addr;
74   u32 out_addr;
75   u16 in_port;
76   u16 out_port;
77   u32 eh_addr;
78   u32 ehn_addr;
79   u16 eh_port;
80   u16 ehn_port;
81   u32 fib_index;
82   u32 total_pkts;
83   u64 total_bytes;
84 } __attribute__ ((packed)) nat_ha_event_t;
85
86 typedef enum
87 {
88 #define _(N, s, v) NAT_HA_COUNTER_##N = v,
89   foreach_nat_ha_counter
90 #undef _
91   NAT_HA_N_COUNTERS
92 } nat_ha_counter_t;
93
94 /* data waiting for ACK */
95 typedef struct
96 {
97   /* sequence number */
98   u32 seq;
99   /* retry count */
100   u32 retry_count;
101   /* next retry time */
102   f64 retry_timer;
103   /* 1 if HA resync */
104   u8 is_resync;
105   /* packet data */
106   u8 *data;
107 } nat_ha_resend_entry_t;
108
109 /* per thread data */
110 typedef struct
111 {
112   /* buffer under construction */
113   vlib_buffer_t *state_sync_buffer;
114   /* frame containing NAT HA buffers */
115   vlib_frame_t *state_sync_frame;
116   /* number of events */
117   u16 state_sync_count;
118   /* next event offset */
119   u32 state_sync_next_event_offset;
120   /* data waiting for ACK */
121   nat_ha_resend_entry_t *resend_queue;
122 } nat_ha_per_thread_data_t;
123
124 /* NAT HA settings */
125 typedef struct nat_ha_main_s
126 {
127   /* local IP address and UDP port */
128   ip4_address_t src_ip_address;
129   u16 src_port;
130   /* failvoer IP address and UDP port */
131   ip4_address_t dst_ip_address;
132   u16 dst_port;
133   /* path MTU between local and failover */
134   u32 state_sync_path_mtu;
135   /* number of seconds after which to send session counters refresh */
136   u32 session_refresh_interval;
137   /* counters */
138   vlib_simple_counter_main_t counters[NAT_HA_N_COUNTERS];
139   vlib_main_t *vlib_main;
140   /* sequence number counter */
141   u32 sequence_number;
142   /* 1 if resync in progress */
143   u8 in_resync;
144   /* number of remaing ACK for resync */
145   u32 resync_ack_count;
146   /* number of missed ACK for resync */
147   u32 resync_ack_missed;
148   /* resync data */
149   nat_ha_resync_event_cb_t event_callback;
150   u32 client_index;
151   u32 pid;
152   /* call back functions for received HA events on failover */
153   nat_ha_sadd_cb_t sadd_cb;
154   nat_ha_sdel_cb_t sdel_cb;
155   nat_ha_sref_cb_t sref_cb;
156   /* per thread data */
157   u32 num_workers;
158   nat_ha_per_thread_data_t *per_thread_data;
159   /* worker handoff frame-queue index */
160   u32 fq_index;
161 } nat_ha_main_t;
162
163 nat_ha_main_t nat_ha_main;
164 vlib_node_registration_t nat_ha_process_node;
165 vlib_node_registration_t nat_ha_worker_node;
166 vlib_node_registration_t nat_ha_node;
167 vlib_node_registration_t nat_ha_handoff_node;
168
169 static void
170 nat_ha_resync_fin (void)
171 {
172   nat_ha_main_t *ha = &nat_ha_main;
173
174   /* if no more resync ACK remainig we are done */
175   if (ha->resync_ack_count)
176     return;
177
178   ha->in_resync = 0;
179   nat_log_info ("resync completed with result %s",
180                 ha->resync_ack_missed ? "FAILED" : "SUCESS");
181   if (ha->event_callback)
182     ha->event_callback (ha->client_index, ha->pid, ha->resync_ack_missed);
183 }
184
185 /* cache HA NAT data waiting for ACK */
186 static int
187 nat_ha_resend_queue_add (u32 seq, u8 * data, u8 data_len, u8 is_resync,
188                          u32 thread_index)
189 {
190   nat_ha_main_t *ha = &nat_ha_main;
191   nat_ha_per_thread_data_t *td = &ha->per_thread_data[thread_index];
192   nat_ha_resend_entry_t *entry;
193   f64 now = vlib_time_now (ha->vlib_main);
194
195   vec_add2 (td->resend_queue, entry, 1);
196   clib_memset (entry, 0, sizeof (*entry));
197   entry->retry_timer = now + 2.0;
198   entry->seq = seq;
199   entry->is_resync = is_resync;
200   vec_add (entry->data, data, data_len);
201
202   return 0;
203 }
204
205 static_always_inline void
206 nat_ha_ack_recv (u32 seq, u32 thread_index)
207 {
208   nat_ha_main_t *ha = &nat_ha_main;
209   nat_ha_per_thread_data_t *td = &ha->per_thread_data[thread_index];
210   u32 i;
211
212   vec_foreach_index (i, td->resend_queue)
213   {
214     if (td->resend_queue[i].seq != seq)
215       continue;
216
217     vlib_increment_simple_counter (&ha->counters[NAT_HA_COUNTER_RECV_ACK],
218                                    thread_index, 0, 1);
219     /* ACK received remove cached data */
220     if (td->resend_queue[i].is_resync)
221       {
222         clib_atomic_fetch_sub (&ha->resync_ack_count, 1);
223         nat_ha_resync_fin ();
224       }
225     vec_free (td->resend_queue[i].data);
226     vec_del1 (td->resend_queue, i);
227     nat_log_debug ("ACK for seq %d received", clib_net_to_host_u32 (seq));
228
229     return;
230   }
231 }
232
233 /* scan non-ACKed HA NAT for retry */
234 static void
235 nat_ha_resend_scan (f64 now, u32 thread_index)
236 {
237   nat_ha_main_t *ha = &nat_ha_main;
238   nat_ha_per_thread_data_t *td = &ha->per_thread_data[thread_index];
239   u32 i, *del, *to_delete = 0;
240   vlib_main_t *vm = ha->vlib_main;
241   vlib_buffer_t *b = 0;
242   vlib_frame_t *f;
243   u32 bi, *to_next;
244   ip4_header_t *ip;
245
246   vec_foreach_index (i, td->resend_queue)
247   {
248     if (td->resend_queue[i].retry_timer > now)
249       continue;
250
251     /* maximum retry reached delete cached data */
252     if (td->resend_queue[i].retry_count >= NAT_HA_RETRIES)
253       {
254         nat_log_notice ("seq %d missed",
255                         clib_net_to_host_u32 (td->resend_queue[i].seq));
256         if (td->resend_queue[i].is_resync)
257           {
258             clib_atomic_fetch_add (&ha->resync_ack_missed, 1);
259             clib_atomic_fetch_sub (&ha->resync_ack_count, 1);
260             nat_ha_resync_fin ();
261           }
262         vec_add1 (to_delete, i);
263         vlib_increment_simple_counter (&ha->counters
264                                        [NAT_HA_COUNTER_MISSED_COUNT],
265                                        thread_index, 0, 1);
266         continue;
267       }
268
269     /* retry to send non-ACKed data */
270     nat_log_debug ("state sync seq %d resend",
271                    clib_net_to_host_u32 (td->resend_queue[i].seq));
272     td->resend_queue[i].retry_count++;
273     vlib_increment_simple_counter (&ha->counters[NAT_HA_COUNTER_RETRY_COUNT],
274                                    thread_index, 0, 1);
275     if (vlib_buffer_alloc (vm, &bi, 1) != 1)
276       {
277         nat_log_warn ("HA NAT state sync can't allocate buffer");
278         return;
279       }
280     b = vlib_get_buffer (vm, bi);
281     b->current_length = vec_len (td->resend_queue[i].data);
282     b->flags |= VLIB_BUFFER_TOTAL_LENGTH_VALID;
283     b->flags |= VNET_BUFFER_F_LOCALLY_ORIGINATED;
284     vnet_buffer (b)->sw_if_index[VLIB_RX] = 0;
285     vnet_buffer (b)->sw_if_index[VLIB_TX] = 0;
286     ip = vlib_buffer_get_current (b);
287     clib_memcpy (ip, td->resend_queue[i].data,
288                  vec_len (td->resend_queue[i].data));
289     f = vlib_get_frame_to_node (vm, ip4_lookup_node.index);
290     to_next = vlib_frame_vector_args (f);
291     to_next[0] = bi;
292     f->n_vectors = 1;
293     vlib_put_frame_to_node (vm, ip4_lookup_node.index, f);
294     td->resend_queue[i].retry_timer = now + 2.0;
295   }
296
297   vec_foreach (del, to_delete)
298   {
299     vec_free (td->resend_queue[*del].data);
300     vec_del1 (td->resend_queue, *del);
301   }
302   vec_free (to_delete);
303 }
304
305 void
306 nat_ha_init (vlib_main_t * vm, nat_ha_sadd_cb_t sadd_cb,
307              nat_ha_sdel_cb_t sdel_cb, nat_ha_sref_cb_t sref_cb)
308 {
309   nat_ha_main_t *ha = &nat_ha_main;
310   vlib_thread_main_t *tm = vlib_get_thread_main ();
311   vlib_thread_registration_t *tr;
312   uword *p;
313
314   ha->src_ip_address.as_u32 = 0;
315   ha->src_port = 0;
316   ha->dst_ip_address.as_u32 = 0;
317   ha->dst_port = 0;
318   ha->in_resync = 0;
319   ha->resync_ack_count = 0;
320   ha->resync_ack_missed = 0;
321   ha->vlib_main = vm;
322   ha->sadd_cb = sadd_cb;
323   ha->sdel_cb = sdel_cb;
324   ha->sref_cb = sref_cb;
325   ha->num_workers = 0;
326   vec_validate (ha->per_thread_data, tm->n_vlib_mains - 1);
327   ha->fq_index = ~0;
328   p = hash_get_mem (tm->thread_registrations_by_name, "workers");
329   if (p)
330     {
331       tr = (vlib_thread_registration_t *) p[0];
332       if (tr)
333         ha->num_workers = tr->count;
334     }
335
336 #define _(N, s, v) ha->counters[v].name = s;          \
337   ha->counters[v].stat_segment_name = "/nat44/ha/" s; \
338   vlib_validate_simple_counter(&ha->counters[v], 0);  \
339   vlib_zero_simple_counter(&ha->counters[v], 0);
340   foreach_nat_ha_counter
341 #undef _
342 }
343
344 int
345 nat_ha_set_listener (ip4_address_t * addr, u16 port, u32 path_mtu)
346 {
347   nat_ha_main_t *ha = &nat_ha_main;
348
349   /* unregister previously set UDP port */
350   if (ha->src_port)
351     udp_unregister_dst_port (ha->vlib_main, ha->src_port, 1);
352
353   ha->src_ip_address.as_u32 = addr->as_u32;
354   ha->src_port = port;
355   ha->state_sync_path_mtu = path_mtu;
356
357   if (port)
358     {
359       /* if multiple worker threads first go to handoff node */
360       if (ha->num_workers > 1)
361         {
362           if (ha->fq_index == ~0)
363             ha->fq_index = vlib_frame_queue_main_init (nat_ha_node.index, 0);
364           udp_register_dst_port (ha->vlib_main, port,
365                                  nat_ha_handoff_node.index, 1);
366         }
367       else
368         {
369           udp_register_dst_port (ha->vlib_main, port, nat_ha_node.index, 1);
370         }
371       nat_log_info ("HA listening on port %d for state sync", port);
372     }
373
374   return 0;
375 }
376
377 void
378 nat_ha_get_listener (ip4_address_t * addr, u16 * port, u32 * path_mtu)
379 {
380   nat_ha_main_t *ha = &nat_ha_main;
381
382   addr->as_u32 = ha->src_ip_address.as_u32;
383   *port = ha->src_port;
384   *path_mtu = ha->state_sync_path_mtu;
385 }
386
387 int
388 nat_ha_set_failover (ip4_address_t * addr, u16 port,
389                      u32 session_refresh_interval)
390 {
391   nat_ha_main_t *ha = &nat_ha_main;
392
393   ha->dst_ip_address.as_u32 = addr->as_u32;
394   ha->dst_port = port;
395   ha->session_refresh_interval = session_refresh_interval;
396
397   vlib_process_signal_event (ha->vlib_main, nat_ha_process_node.index, 1, 0);
398
399   return 0;
400 }
401
402 void
403 nat_ha_get_failover (ip4_address_t * addr, u16 * port,
404                      u32 * session_refresh_interval)
405 {
406   nat_ha_main_t *ha = &nat_ha_main;
407
408   addr->as_u32 = ha->dst_ip_address.as_u32;
409   *port = ha->dst_port;
410   *session_refresh_interval = ha->session_refresh_interval;
411 }
412
413 static_always_inline void
414 nat_ha_recv_add (nat_ha_event_t * event, f64 now, u32 thread_index)
415 {
416   nat_ha_main_t *ha = &nat_ha_main;
417   ip4_address_t in_addr, out_addr, eh_addr, ehn_addr;
418   u32 fib_index;
419   u16 flags;
420
421   vlib_increment_simple_counter (&ha->counters[NAT_HA_COUNTER_RECV_ADD],
422                                  thread_index, 0, 1);
423
424   in_addr.as_u32 = event->in_addr;
425   out_addr.as_u32 = event->out_addr;
426   eh_addr.as_u32 = event->eh_addr;
427   ehn_addr.as_u32 = event->ehn_addr;
428   fib_index = clib_net_to_host_u32 (event->fib_index);
429   flags = clib_net_to_host_u16 (event->flags);
430
431   ha->sadd_cb (&in_addr, event->in_port, &out_addr, event->out_port, &eh_addr,
432                event->eh_port, &ehn_addr, event->ehn_port, event->protocol,
433                fib_index, flags, thread_index);
434 }
435
436 static_always_inline void
437 nat_ha_recv_del (nat_ha_event_t * event, u32 thread_index)
438 {
439   nat_ha_main_t *ha = &nat_ha_main;
440   ip4_address_t out_addr, eh_addr;
441   u32 fib_index;
442
443   vlib_increment_simple_counter (&ha->counters[NAT_HA_COUNTER_RECV_DEL],
444                                  thread_index, 0, 1);
445
446   out_addr.as_u32 = event->out_addr;
447   eh_addr.as_u32 = event->eh_addr;
448   fib_index = clib_net_to_host_u32 (event->fib_index);
449
450   ha->sdel_cb (&out_addr, event->out_port, &eh_addr, event->eh_port,
451                event->protocol, fib_index, thread_index);
452 }
453
454 static_always_inline void
455 nat_ha_recv_refresh (nat_ha_event_t * event, f64 now, u32 thread_index)
456 {
457   nat_ha_main_t *ha = &nat_ha_main;
458   ip4_address_t out_addr, eh_addr;
459   u32 fib_index, total_pkts;
460   u64 total_bytes;
461
462   vlib_increment_simple_counter (&ha->counters[NAT_HA_COUNTER_RECV_REFRESH],
463                                  thread_index, 0, 1);
464
465   out_addr.as_u32 = event->out_addr;
466   eh_addr.as_u32 = event->eh_addr;
467   fib_index = clib_net_to_host_u32 (event->fib_index);
468   total_pkts = clib_net_to_host_u32 (event->total_pkts);
469   total_bytes = clib_net_to_host_u64 (event->total_bytes);
470
471   ha->sref_cb (&out_addr, event->out_port, &eh_addr, event->eh_port,
472                event->protocol, fib_index, total_pkts, total_bytes,
473                thread_index);
474 }
475
476 /* process received NAT HA event */
477 static_always_inline void
478 nat_ha_event_process (nat_ha_event_t * event, f64 now, u32 thread_index)
479 {
480   switch (event->event_type)
481     {
482     case NAT_HA_ADD:
483       nat_ha_recv_add (event, now, thread_index);
484       break;
485     case NAT_HA_DEL:
486       nat_ha_recv_del (event, thread_index);
487       break;
488     case NAT_HA_REFRESH:
489       nat_ha_recv_refresh (event, now, thread_index);
490       break;
491     default:
492       nat_log_notice ("Unsupported HA event type %d", event->event_type);
493       break;
494     }
495 }
496
497 static inline void
498 nat_ha_header_create (vlib_buffer_t * b, u32 * offset, u32 thread_index)
499 {
500   nat_ha_main_t *ha = &nat_ha_main;
501   nat_ha_message_header_t *h;
502   ip4_header_t *ip;
503   udp_header_t *udp;
504   u32 sequence_number;
505
506   b->current_data = 0;
507   b->current_length = sizeof (*ip) + sizeof (*udp) + sizeof (*h);
508   b->flags |= VLIB_BUFFER_TOTAL_LENGTH_VALID;
509   b->flags |= VNET_BUFFER_F_LOCALLY_ORIGINATED;
510   vnet_buffer (b)->sw_if_index[VLIB_RX] = 0;
511   vnet_buffer (b)->sw_if_index[VLIB_TX] = 0;
512   ip = vlib_buffer_get_current (b);
513   udp = (udp_header_t *) (ip + 1);
514   h = (nat_ha_message_header_t *) (udp + 1);
515
516   /* IP header */
517   ip->ip_version_and_header_length = 0x45;
518   ip->ttl = 254;
519   ip->protocol = IP_PROTOCOL_UDP;
520   ip->flags_and_fragment_offset =
521     clib_host_to_net_u16 (IP4_HEADER_FLAG_DONT_FRAGMENT);
522   ip->src_address.as_u32 = ha->src_ip_address.as_u32;
523   ip->dst_address.as_u32 = ha->dst_ip_address.as_u32;
524   /* UDP header */
525   udp->src_port = clib_host_to_net_u16 (ha->src_port);
526   udp->dst_port = clib_host_to_net_u16 (ha->dst_port);
527   udp->checksum = 0;
528
529   /* NAT HA protocol header */
530   h->version = NAT_HA_VERSION;
531   h->flags = 0;
532   h->count = 0;
533   h->thread_index = clib_host_to_net_u32 (thread_index);
534   sequence_number = clib_atomic_fetch_add (&ha->sequence_number, 1);
535   h->sequence_number = clib_host_to_net_u32 (sequence_number);
536
537   *offset =
538     sizeof (ip4_header_t) + sizeof (udp_header_t) +
539     sizeof (nat_ha_message_header_t);
540 }
541
542 static inline void
543 nat_ha_send (vlib_frame_t * f, vlib_buffer_t * b, u8 is_resync,
544              u32 thread_index)
545 {
546   nat_ha_main_t *ha = &nat_ha_main;
547   nat_ha_per_thread_data_t *td = &ha->per_thread_data[thread_index];
548   nat_ha_message_header_t *h;
549   ip4_header_t *ip;
550   udp_header_t *udp;
551   vlib_main_t *vm = vlib_mains[thread_index];
552
553   ip = vlib_buffer_get_current (b);
554   udp = ip4_next_header (ip);
555   h = (nat_ha_message_header_t *) (udp + 1);
556
557   h->count = clib_host_to_net_u16 (td->state_sync_count);
558
559   ip->length = clib_host_to_net_u16 (b->current_length);
560   ip->checksum = ip4_header_checksum (ip);
561   udp->length = clib_host_to_net_u16 (b->current_length - sizeof (*ip));
562
563   nat_ha_resend_queue_add (h->sequence_number, (u8 *) ip, b->current_length,
564                            is_resync, thread_index);
565
566   vlib_put_frame_to_node (vm, ip4_lookup_node.index, f);
567 }
568
569 /* add NAT HA protocol event */
570 static_always_inline void
571 nat_ha_event_add (nat_ha_event_t * event, u8 do_flush, u32 thread_index,
572                   u8 is_resync)
573 {
574   nat_ha_main_t *ha = &nat_ha_main;
575   nat_ha_per_thread_data_t *td = &ha->per_thread_data[thread_index];
576   vlib_main_t *vm = vlib_mains[thread_index];
577   vlib_buffer_t *b = 0;
578   vlib_frame_t *f;
579   u32 bi = ~0, offset;
580
581   b = td->state_sync_buffer;
582
583   if (PREDICT_FALSE (b == 0))
584     {
585       if (do_flush)
586         return;
587
588       if (vlib_buffer_alloc (vm, &bi, 1) != 1)
589         {
590           nat_log_warn ("HA NAT state sync can't allocate buffer");
591           return;
592         }
593
594       b = td->state_sync_buffer = vlib_get_buffer (vm, bi);
595       clib_memset (vnet_buffer (b), 0, sizeof (*vnet_buffer (b)));
596       VLIB_BUFFER_TRACE_TRAJECTORY_INIT (b);
597       offset = 0;
598     }
599   else
600     {
601       bi = vlib_get_buffer_index (vm, b);
602       offset = td->state_sync_next_event_offset;
603     }
604
605   f = td->state_sync_frame;
606   if (PREDICT_FALSE (f == 0))
607     {
608       u32 *to_next;
609       f = vlib_get_frame_to_node (vm, ip4_lookup_node.index);
610       td->state_sync_frame = f;
611       to_next = vlib_frame_vector_args (f);
612       to_next[0] = bi;
613       f->n_vectors = 1;
614     }
615
616   if (PREDICT_FALSE (td->state_sync_count == 0))
617     nat_ha_header_create (b, &offset, thread_index);
618
619   if (PREDICT_TRUE (do_flush == 0))
620     {
621       clib_memcpy_fast (b->data + offset, event, sizeof (*event));
622       offset += sizeof (*event);
623       td->state_sync_count++;
624       b->current_length += sizeof (*event);
625
626       switch (event->event_type)
627         {
628         case NAT_HA_ADD:
629           vlib_increment_simple_counter (&ha->counters
630                                          [NAT_HA_COUNTER_SEND_ADD],
631                                          thread_index, 0, 1);
632           break;
633         case NAT_HA_DEL:
634           vlib_increment_simple_counter (&ha->counters
635                                          [NAT_HA_COUNTER_SEND_DEL],
636                                          thread_index, 0, 1);
637           break;
638         case NAT_HA_REFRESH:
639           vlib_increment_simple_counter (&ha->counters
640                                          [NAT_HA_COUNTER_SEND_REFRESH],
641                                          thread_index, 0, 1);
642           break;
643         default:
644           break;
645         }
646     }
647
648   if (PREDICT_FALSE
649       (do_flush || offset + (sizeof (*event)) > ha->state_sync_path_mtu))
650     {
651       nat_ha_send (f, b, is_resync, thread_index);
652       td->state_sync_buffer = 0;
653       td->state_sync_frame = 0;
654       td->state_sync_count = 0;
655       offset = 0;
656       if (is_resync)
657         {
658           clib_atomic_fetch_add (&ha->resync_ack_count, 1);
659           nat_ha_resync_fin ();
660         }
661     }
662
663   td->state_sync_next_event_offset = offset;
664 }
665
666 #define skip_if_disabled()          \
667 do {                                \
668   nat_ha_main_t *ha = &nat_ha_main; \
669   if (PREDICT_TRUE (!ha->dst_port)) \
670     return;                         \
671 } while (0)
672
673 void
674 nat_ha_flush (u8 is_resync)
675 {
676   skip_if_disabled ();
677   nat_ha_event_add (0, 1, 0, is_resync);
678 }
679
680 void
681 nat_ha_sadd (ip4_address_t * in_addr, u16 in_port, ip4_address_t * out_addr,
682              u16 out_port, ip4_address_t * eh_addr, u16 eh_port,
683              ip4_address_t * ehn_addr, u16 ehn_port, u8 proto, u32 fib_index,
684              u16 flags, u32 thread_index, u8 is_resync)
685 {
686   nat_ha_event_t event;
687
688   skip_if_disabled ();
689
690   clib_memset (&event, 0, sizeof (event));
691   event.event_type = NAT_HA_ADD;
692   event.flags = clib_host_to_net_u16 (flags);
693   event.in_addr = in_addr->as_u32;
694   event.in_port = in_port;
695   event.out_addr = out_addr->as_u32;
696   event.out_port = out_port;
697   event.eh_addr = eh_addr->as_u32;
698   event.eh_port = eh_port;
699   event.ehn_addr = ehn_addr->as_u32;
700   event.ehn_port = ehn_port;
701   event.fib_index = clib_host_to_net_u32 (fib_index);
702   event.protocol = proto;
703   nat_ha_event_add (&event, 0, thread_index, is_resync);
704 }
705
706 void
707 nat_ha_sdel (ip4_address_t * out_addr, u16 out_port, ip4_address_t * eh_addr,
708              u16 eh_port, u8 proto, u32 fib_index, u32 thread_index)
709 {
710   nat_ha_event_t event;
711
712   skip_if_disabled ();
713
714   clib_memset (&event, 0, sizeof (event));
715   event.event_type = NAT_HA_DEL;
716   event.out_addr = out_addr->as_u32;
717   event.out_port = out_port;
718   event.eh_addr = eh_addr->as_u32;
719   event.eh_port = eh_port;
720   event.fib_index = clib_host_to_net_u32 (fib_index);
721   event.protocol = proto;
722   nat_ha_event_add (&event, 0, thread_index, 0);
723 }
724
725 void
726 nat_ha_sref (ip4_address_t * out_addr, u16 out_port, ip4_address_t * eh_addr,
727              u16 eh_port, u8 proto, u32 fib_index, u32 total_pkts,
728              u64 total_bytes, u32 thread_index, f64 * last_refreshed, f64 now)
729 {
730   nat_ha_main_t *ha = &nat_ha_main;
731   nat_ha_event_t event;
732
733   skip_if_disabled ();
734
735   if ((*last_refreshed + ha->session_refresh_interval) > now)
736     return;
737
738   *last_refreshed = now;
739   clib_memset (&event, 0, sizeof (event));
740   event.event_type = NAT_HA_REFRESH;
741   event.out_addr = out_addr->as_u32;
742   event.out_port = out_port;
743   event.eh_addr = eh_addr->as_u32;
744   event.eh_port = eh_port;
745   event.fib_index = clib_host_to_net_u32 (fib_index);
746   event.protocol = proto;
747   event.total_pkts = clib_host_to_net_u32 (total_pkts);
748   event.total_bytes = clib_host_to_net_u64 (total_bytes);
749   nat_ha_event_add (&event, 0, thread_index, 0);
750 }
751
752 /* per thread process waiting for interrupt */
753 static uword
754 nat_ha_worker_fn (vlib_main_t * vm, vlib_node_runtime_t * rt,
755                   vlib_frame_t * f)
756 {
757   u32 thread_index = vm->thread_index;
758   /* flush HA NAT data under construction */
759   nat_ha_event_add (0, 1, thread_index, 0);
760   /* scan if we need to resend some non-ACKed data */
761   nat_ha_resend_scan (vlib_time_now (vm), thread_index);
762   return 0;
763 }
764
765 /* *INDENT-OFF* */
766 VLIB_REGISTER_NODE (nat_ha_worker_node) = {
767     .function = nat_ha_worker_fn,
768     .type = VLIB_NODE_TYPE_INPUT,
769     .state = VLIB_NODE_STATE_INTERRUPT,
770     .name = "nat-ha-worker",
771 };
772 /* *INDENT-ON* */
773
774 /* periodically send interrupt to each thread */
775 static uword
776 nat_ha_process (vlib_main_t * vm, vlib_node_runtime_t * rt, vlib_frame_t * f)
777 {
778   nat_ha_main_t *ha = &nat_ha_main;
779   uword event_type;
780   uword *event_data = 0;
781   u32 ti;
782
783   vlib_process_wait_for_event (vm);
784   event_type = vlib_process_get_events (vm, &event_data);
785   if (event_type)
786     nat_log_info ("nat-ha-process: bogus kickoff event received");
787   vec_reset_length (event_data);
788
789   while (1)
790     {
791       vlib_process_wait_for_event_or_clock (vm, 1.0);
792       event_type = vlib_process_get_events (vm, &event_data);
793       vec_reset_length (event_data);
794       for (ti = 0; ti < vec_len (vlib_mains); ti++)
795         {
796           if (ti >= vec_len (ha->per_thread_data))
797             continue;
798
799           vlib_node_set_interrupt_pending (vlib_mains[ti],
800                                            nat_ha_worker_node.index);
801         }
802     }
803
804   return 0;
805 }
806
807 /* *INDENT-OFF* */
808 VLIB_REGISTER_NODE (nat_ha_process_node) = {
809     .function = nat_ha_process,
810     .type = VLIB_NODE_TYPE_PROCESS,
811     .name = "nat-ha-process",
812 };
813 /* *INDENT-ON* */
814
815 void
816 nat_ha_get_resync_status (u8 * in_resync, u32 * resync_ack_missed)
817 {
818   nat_ha_main_t *ha = &nat_ha_main;
819
820   *in_resync = ha->in_resync;
821   *resync_ack_missed = ha->resync_ack_missed;
822 }
823
824 int
825 nat44_ha_resync (u32 client_index, u32 pid,
826                  nat_ha_resync_event_cb_t event_callback)
827 {
828   nat_ha_main_t *ha = &nat_ha_main;
829   snat_main_t *sm = &snat_main;
830   snat_session_t *ses;
831   snat_main_per_thread_data_t *tsm;
832
833   if (ha->in_resync)
834     return VNET_API_ERROR_IN_PROGRESS;
835
836   ha->in_resync = 1;
837   ha->resync_ack_count = 0;
838   ha->resync_ack_missed = 0;
839   ha->event_callback = event_callback;
840   ha->client_index = client_index;
841   ha->pid = pid;
842
843   /* *INDENT-OFF* */
844   vec_foreach (tsm, sm->per_thread_data)
845     {
846       pool_foreach (ses, tsm->sessions, ({
847         nat_ha_sadd (&ses->in2out.addr, ses->in2out.port,
848                      &ses->out2in.addr, ses->out2in.port,
849                      &ses->ext_host_addr, ses->ext_host_port,
850                      &ses->ext_host_nat_addr, ses->ext_host_nat_port,
851                      ses->in2out.protocol, ses->in2out.fib_index,
852                      ses->flags, 0, 1);
853       }));
854     }
855   /* *INDENT-ON* */
856
857   nat_ha_flush (1);
858
859   return 0;
860 }
861
862 typedef struct
863 {
864   ip4_address_t addr;
865   u32 event_count;
866 } nat_ha_trace_t;
867
868 static u8 *
869 format_nat_ha_trace (u8 * s, va_list * args)
870 {
871   CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *);
872   CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
873   nat_ha_trace_t *t = va_arg (*args, nat_ha_trace_t *);
874
875   s =
876     format (s, "nat-ha: %u events from %U", t->event_count,
877             format_ip4_address, &t->addr);
878
879   return s;
880 }
881
882 typedef enum
883 {
884   NAT_HA_NEXT_IP4_LOOKUP,
885   NAT_HA_NEXT_DROP,
886   NAT_HA_N_NEXT,
887 } nat_ha_next_t;
888
889 #define foreach_nat_ha_error   \
890 _(PROCESSED, "pkts-processed") \
891 _(BAD_VERSION, "bad-version")
892
893 typedef enum
894 {
895 #define _(sym, str) NAT_HA_ERROR_##sym,
896   foreach_nat_ha_error
897 #undef _
898     NAT_HA_N_ERROR,
899 } nat_ha_error_t;
900
901 static char *nat_ha_error_strings[] = {
902 #define _(sym, str) str,
903   foreach_nat_ha_error
904 #undef _
905 };
906
907 /* process received HA NAT protocol messages */
908 static uword
909 nat_ha_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
910                 vlib_frame_t * frame)
911 {
912   u32 n_left_from, *from, next_index, *to_next;
913   f64 now = vlib_time_now (vm);
914   u32 thread_index = vm->thread_index;
915   u32 pkts_processed = 0;
916   ip4_main_t *i4m = &ip4_main;
917   u8 host_config_ttl = i4m->host_config.ttl;
918   nat_ha_main_t *ha = &nat_ha_main;
919
920   from = vlib_frame_vector_args (frame);
921   n_left_from = frame->n_vectors;
922   next_index = node->cached_next_index;
923
924   while (n_left_from > 0)
925     {
926       u32 n_left_to_next;
927
928       vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next);
929
930       while (n_left_from > 0 && n_left_to_next > 0)
931         {
932           u32 bi0, next0, src_addr0, dst_addr0;;
933           vlib_buffer_t *b0;
934           nat_ha_message_header_t *h0;
935           nat_ha_event_t *e0;
936           u16 event_count0, src_port0, dst_port0, old_len0;
937           ip4_header_t *ip0;
938           udp_header_t *udp0;
939           ip_csum_t sum0;
940
941           bi0 = from[0];
942           to_next[0] = bi0;
943           from += 1;
944           to_next += 1;
945           n_left_from -= 1;
946           n_left_to_next -= 1;
947
948           b0 = vlib_get_buffer (vm, bi0);
949           h0 = vlib_buffer_get_current (b0);
950           vlib_buffer_advance (b0, -sizeof (*udp0));
951           udp0 = vlib_buffer_get_current (b0);
952           vlib_buffer_advance (b0, -sizeof (*ip0));
953           ip0 = vlib_buffer_get_current (b0);
954
955           next0 = NAT_HA_NEXT_DROP;
956
957           if (h0->version != NAT_HA_VERSION)
958             {
959               b0->error = node->errors[NAT_HA_ERROR_BAD_VERSION];
960               goto done0;
961             }
962
963           event_count0 = clib_net_to_host_u16 (h0->count);
964           /* ACK for previously send data */
965           if (!event_count0 && (h0->flags & NAT_HA_FLAG_ACK))
966             {
967               nat_ha_ack_recv (h0->sequence_number, thread_index);
968               b0->error = node->errors[NAT_HA_ERROR_PROCESSED];
969               goto done0;
970             }
971
972           e0 = (nat_ha_event_t *) (h0 + 1);
973
974           /* process each event */
975           while (event_count0)
976             {
977               nat_ha_event_process (e0, now, thread_index);
978               event_count0--;
979               e0 = (nat_ha_event_t *) ((u8 *) e0 + sizeof (nat_ha_event_t));
980             }
981
982           next0 = NAT_HA_NEXT_IP4_LOOKUP;
983           pkts_processed++;
984
985           /* reply with ACK */
986           b0->current_length = sizeof (*ip0) + sizeof (*udp0) + sizeof (*h0);
987
988           src_addr0 = ip0->src_address.data_u32;
989           dst_addr0 = ip0->dst_address.data_u32;
990           ip0->src_address.data_u32 = dst_addr0;
991           ip0->dst_address.data_u32 = src_addr0;
992           old_len0 = ip0->length;
993           ip0->length = clib_host_to_net_u16 (b0->current_length);
994
995           sum0 = ip0->checksum;
996           sum0 = ip_csum_update (sum0, ip0->ttl, host_config_ttl,
997                                  ip4_header_t, ttl);
998           ip0->ttl = host_config_ttl;
999           sum0 =
1000             ip_csum_update (sum0, old_len0, ip0->length, ip4_header_t,
1001                             length);
1002           ip0->checksum = ip_csum_fold (sum0);
1003
1004           udp0->checksum = 0;
1005           src_port0 = udp0->src_port;
1006           dst_port0 = udp0->dst_port;
1007           udp0->src_port = dst_port0;
1008           udp0->dst_port = src_port0;
1009           udp0->length =
1010             clib_host_to_net_u16 (b0->current_length - sizeof (*ip0));
1011
1012           h0->flags = NAT_HA_FLAG_ACK;
1013           h0->count = 0;
1014           vlib_increment_simple_counter (&ha->counters
1015                                          [NAT_HA_COUNTER_SEND_ACK],
1016                                          thread_index, 0, 1);
1017
1018         done0:
1019           if (PREDICT_FALSE ((node->flags & VLIB_NODE_FLAG_TRACE)
1020                              && (b0->flags & VLIB_BUFFER_IS_TRACED)))
1021             {
1022               nat_ha_trace_t *t = vlib_add_trace (vm, node, b0, sizeof (*t));
1023               ip4_header_t *ip =
1024                 (void *) (b0->data + vnet_buffer (b0)->l3_hdr_offset);
1025               t->event_count = clib_net_to_host_u16 (h0->count);
1026               t->addr.as_u32 = ip->src_address.data_u32;
1027             }
1028
1029           vlib_validate_buffer_enqueue_x1 (vm, node, next_index,
1030                                            to_next, n_left_to_next,
1031                                            bi0, next0);
1032         }
1033
1034       vlib_put_next_frame (vm, node, next_index, n_left_to_next);
1035     }
1036
1037   vlib_node_increment_counter (vm, nat_ha_node.index,
1038                                NAT_HA_ERROR_PROCESSED, pkts_processed);
1039
1040   return frame->n_vectors;
1041 }
1042
1043 /* *INDENT-OFF* */
1044 VLIB_REGISTER_NODE (nat_ha_node) = {
1045   .function = nat_ha_node_fn,
1046   .name = "nat-ha",
1047   .vector_size = sizeof (u32),
1048   .format_trace = format_nat_ha_trace,
1049   .type = VLIB_NODE_TYPE_INTERNAL,
1050   .n_errors = ARRAY_LEN (nat_ha_error_strings),
1051   .error_strings = nat_ha_error_strings,
1052   .n_next_nodes = NAT_HA_N_NEXT,
1053   .next_nodes = {
1054      [NAT_HA_NEXT_IP4_LOOKUP] = "ip4-lookup",
1055      [NAT_HA_NEXT_DROP] = "error-drop",
1056   },
1057 };
1058 /* *INDENT-ON* */
1059
1060 typedef struct
1061 {
1062   u32 next_worker_index;
1063   u8 in2out;
1064 } nat_ha_handoff_trace_t;
1065
1066 #define foreach_nat_ha_handoff_error  \
1067 _(CONGESTION_DROP, "congestion drop") \
1068 _(SAME_WORKER, "same worker")         \
1069 _(DO_HANDOFF, "do handoff")
1070
1071 typedef enum
1072 {
1073 #define _(sym,str) NAT_HA_HANDOFF_ERROR_##sym,
1074   foreach_nat_ha_handoff_error
1075 #undef _
1076     NAT44_HANDOFF_N_ERROR,
1077 } nat_ha_handoff_error_t;
1078
1079 static char *nat_ha_handoff_error_strings[] = {
1080 #define _(sym,string) string,
1081   foreach_nat_ha_handoff_error
1082 #undef _
1083 };
1084
1085 static u8 *
1086 format_nat_ha_handoff_trace (u8 * s, va_list * args)
1087 {
1088   CLIB_UNUSED (vlib_main_t * vm) = va_arg (*args, vlib_main_t *);
1089   CLIB_UNUSED (vlib_node_t * node) = va_arg (*args, vlib_node_t *);
1090   nat_ha_handoff_trace_t *t = va_arg (*args, nat_ha_handoff_trace_t *);
1091
1092   s =
1093     format (s, "NAT_HA_WORKER_HANDOFF: next-worker %d", t->next_worker_index);
1094
1095   return s;
1096 }
1097
1098 /* do worker handoff based on thread_index in NAT HA protcol header */
1099 static uword
1100 nat_ha_handoff_node_fn (vlib_main_t * vm, vlib_node_runtime_t * node,
1101                         vlib_frame_t * frame)
1102 {
1103   nat_ha_main_t *ha = &nat_ha_main;
1104   vlib_buffer_t *bufs[VLIB_FRAME_SIZE], **b;
1105   u32 n_enq, n_left_from, *from;
1106   u16 thread_indices[VLIB_FRAME_SIZE], *ti;
1107   u32 thread_index = vm->thread_index;
1108   u32 do_handoff = 0, same_worker = 0;
1109
1110   from = vlib_frame_vector_args (frame);
1111   n_left_from = frame->n_vectors;
1112   vlib_get_buffers (vm, from, bufs, n_left_from);
1113
1114   b = bufs;
1115   ti = thread_indices;
1116
1117   while (n_left_from > 0)
1118     {
1119       nat_ha_message_header_t *h0;
1120
1121       h0 = vlib_buffer_get_current (b[0]);
1122       ti[0] = clib_net_to_host_u32 (h0->thread_index);
1123
1124       if (ti[0] != thread_index)
1125         do_handoff++;
1126       else
1127         same_worker++;
1128
1129       if (PREDICT_FALSE ((node->flags & VLIB_NODE_FLAG_TRACE)
1130                          && (b[0]->flags & VLIB_BUFFER_IS_TRACED)))
1131         {
1132           nat_ha_handoff_trace_t *t =
1133             vlib_add_trace (vm, node, b[0], sizeof (*t));
1134           t->next_worker_index = ti[0];
1135         }
1136
1137       n_left_from -= 1;
1138       ti += 1;
1139       b += 1;
1140     }
1141
1142   n_enq =
1143     vlib_buffer_enqueue_to_thread (vm, ha->fq_index, from, thread_indices,
1144                                    frame->n_vectors, 1);
1145
1146   if (n_enq < frame->n_vectors)
1147     vlib_node_increment_counter (vm, node->node_index,
1148                                  NAT_HA_HANDOFF_ERROR_CONGESTION_DROP,
1149                                  frame->n_vectors - n_enq);
1150   vlib_node_increment_counter (vm, node->node_index,
1151                                NAT_HA_HANDOFF_ERROR_SAME_WORKER, same_worker);
1152   vlib_node_increment_counter (vm, node->node_index,
1153                                NAT_HA_HANDOFF_ERROR_DO_HANDOFF, do_handoff);
1154   return frame->n_vectors;
1155 }
1156
1157 /* *INDENT-OFF* */
1158 VLIB_REGISTER_NODE (nat_ha_handoff_node) = {
1159   .function = nat_ha_handoff_node_fn,
1160   .name = "nat-ha-handoff",
1161   .vector_size = sizeof (u32),
1162   .format_trace = format_nat_ha_handoff_trace,
1163   .type = VLIB_NODE_TYPE_INTERNAL,
1164   .n_errors = ARRAY_LEN(nat_ha_handoff_error_strings),
1165   .error_strings = nat_ha_handoff_error_strings,
1166   .n_next_nodes = 1,
1167   .next_nodes = {
1168     [0] = "error-drop",
1169   },
1170 };
1171 /* *INDENT-ON* */
1172
1173 /*
1174  * fd.io coding-style-patch-verification: ON
1175  *
1176  * Local Variables:
1177  * eval: (c-set-style "gnu")
1178  * End:
1179  */