maplog headers, offline processing, doxygen tags 95/9695/3
authorDave Barach <[email protected]>
Fri, 1 Dec 2017 21:20:32 +0000 (16:20 -0500)
committerDamjan Marion <[email protected]>
Sat, 2 Dec 2017 18:44:07 +0000 (18:44 +0000)
Change-Id: I0545018ec02f3706ad6a2da6fc13537db5c31a2d
Signed-off-by: Dave Barach <[email protected]>
src/vppinfra/maplog.c
src/vppinfra/maplog.h
src/vppinfra/test_maplog.c

index 94cc9ac..41ee075 100644 (file)
 
 #include <vppinfra/maplog.h>
 
+/**
+ * @brief Initialize a maplog object
+ *
+ * Compute record and file size parameters
+ * Create and map two log segments to seed the process
+ *
+ * @param[in/out] a   init args structure
+ * @return    0 => success, <0 => failure
+ */
 int
-clib_maplog_init (clib_maplog_main_t * mm, char *file_basename,
-                 u64 file_size_in_bytes, u32 record_size_in_bytes)
+clib_maplog_init (clib_maplog_init_args_t * a)
 {
   int i, fd;
   int rv = 0;
   u8 zero = 0;
   u32 record_size_in_cache_lines;
   u64 file_size_in_records;
+  clib_maplog_main_t *mm;
+  clib_maplog_header_t _h, *h = &_h;
+
+  ASSERT (a && a->mm);
+  mm = a->mm;
 
   /* Already initialized? */
   if (mm->flags & CLIB_MAPLOG_FLAG_INIT)
@@ -32,20 +45,26 @@ clib_maplog_init (clib_maplog_main_t * mm, char *file_basename,
   memset (mm, 0, sizeof (*mm));
 
   record_size_in_cache_lines =
-    (record_size_in_bytes + CLIB_CACHE_LINE_BYTES -
+    (a->record_size_in_bytes + CLIB_CACHE_LINE_BYTES -
      1) / CLIB_CACHE_LINE_BYTES;
 
-  file_size_in_records = file_size_in_bytes
+  file_size_in_records = a->file_size_in_bytes
     / (record_size_in_cache_lines * CLIB_CACHE_LINE_BYTES);
 
   /* Round up file size in records to a power of 2, for speed... */
   mm->log2_file_size_in_records = max_log2 (file_size_in_records);
   file_size_in_records = 1ULL << (mm->log2_file_size_in_records);
 
-  file_size_in_bytes = file_size_in_records * record_size_in_cache_lines
+  a->file_size_in_bytes = file_size_in_records * record_size_in_cache_lines
     * CLIB_CACHE_LINE_BYTES;
 
-  mm->file_basename = format (0, "%s", file_basename);
+  mm->file_basename = format (0, "%s", a->file_basename);
+  if (vec_len (mm->file_basename) > ARRAY_LEN (h->file_basename))
+    {
+      vec_free (mm->file_basename);
+      return -11;
+    }
+
   mm->file_size_in_records = file_size_in_records;
   mm->flags |= CLIB_MAPLOG_FLAG_INIT;
   mm->record_size_in_cachelines = record_size_in_cache_lines;
@@ -55,6 +74,7 @@ clib_maplog_init (clib_maplog_main_t * mm, char *file_basename,
     {
       mm->filenames[i] = format (0, "%v_%d", mm->file_basename,
                                 mm->current_file_index++);
+      vec_add1 (mm->filenames[i], 0);
 
       fd = open ((char *) mm->filenames[i], O_CREAT | O_RDWR | O_TRUNC, 0600);
       if (fd < 0)
@@ -63,7 +83,7 @@ clib_maplog_init (clib_maplog_main_t * mm, char *file_basename,
          goto fail;
        }
 
-      if (lseek (fd, file_size_in_bytes - 1, SEEK_SET) == (off_t) - 1)
+      if (lseek (fd, a->file_size_in_bytes - 1, SEEK_SET) == (off_t) - 1)
        {
          rv = -4;
          goto fail;
@@ -74,7 +94,7 @@ clib_maplog_init (clib_maplog_main_t * mm, char *file_basename,
          goto fail;
        }
 
-      mm->file_baseva[i] = mmap (0, file_size_in_bytes,
+      mm->file_baseva[i] = mmap (0, a->file_size_in_bytes,
                                 PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
       if (mm->file_baseva[i] == (u8 *) MAP_FAILED)
        {
@@ -83,7 +103,41 @@ clib_maplog_init (clib_maplog_main_t * mm, char *file_basename,
        }
       (void) close (fd);
     }
-  return rv;
+
+  memset (h, 0, sizeof (*h));
+  h->maplog_major_version = MAPLOG_MAJOR_VERSION;
+  h->maplog_minor_version = MAPLOG_MINOR_VERSION;
+  h->maplog_patch_version = MAPLOG_PATCH_VERSION;
+  h->application_id = a->application_id;
+  h->application_major_version = a->application_major_version;
+  h->application_minor_version = a->application_minor_version;
+  h->application_patch_version = a->application_patch_version;
+  h->record_size_in_cachelines = record_size_in_cache_lines;
+  h->cacheline_size = CLIB_CACHE_LINE_BYTES;
+  h->file_size_in_records = file_size_in_records;
+  h->number_of_records = ~0ULL;
+  h->number_of_files = ~0ULL;
+  memcpy (h->file_basename, mm->file_basename, vec_len (mm->file_basename));
+
+  mm->header_filename = format (0, "%v_header", mm->file_basename);
+  vec_add1 (mm->header_filename, 0);
+
+  fd = open ((char *) mm->header_filename, O_CREAT | O_RDWR | O_TRUNC, 0600);
+  if (fd < 0)
+    {
+      clib_unix_warning ("header create");
+      rv = -6;
+      goto fail;
+    }
+  rv = write (fd, h, sizeof (*h));
+  if (rv != sizeof (*h))
+    {
+      clib_unix_warning ("header write");
+      rv = -7;
+      goto fail;
+    }
+  (void) close (fd);
+  return 0;
 
 fail:
   if (fd >= 0)
@@ -92,11 +146,21 @@ fail:
   for (i = 0; i < 2; i++)
     {
       if (mm->file_baseva[i])
-       (void) munmap ((u8 *) mm->file_baseva[i], file_size_in_bytes);
+       (void) munmap ((u8 *) mm->file_baseva[i], a->file_size_in_bytes);
+      if (mm->filenames[i])
+       (void) unlink ((char *) mm->filenames[i]);
+      vec_free (mm->filenames[i]);
+    }
+  if (mm->header_filename)
+    {
+      (void) unlink ((char *) mm->header_filename);
+      vec_free (mm->header_filename);
     }
   return rv;
 }
 
+/* slow path: unmap a full log segment, and replace it */
+
 u8 *
 _clib_maplog_get_entry_slowpath (clib_maplog_main_t * mm, u64 my_record_index)
 {
@@ -107,16 +171,24 @@ _clib_maplog_get_entry_slowpath (clib_maplog_main_t * mm, u64 my_record_index)
   u64 file_size_in_bytes = mm->file_size_in_records
     * mm->record_size_in_cachelines * CLIB_CACHE_LINE_BYTES;
 
-  (void) munmap ((u8 *) mm->file_baseva[unmap_index], file_size_in_bytes);
+  /*
+   * Kill some time by calling format before we make the previous log
+   * segment disappear. Obviously it won't do to call clib_maplog_get_entry(),
+   * wait 100ms, and then fill in the log entry.
+   */
   vec_reset_length (mm->filenames[unmap_index]);
-
   mm->filenames[unmap_index] = format (mm->filenames[unmap_index],
                                       "%v_%d", mm->file_basename,
                                       mm->current_file_index++);
 
+  /* Unmap the previous (full) segment */
+  (void) munmap ((u8 *) mm->file_baseva[unmap_index], file_size_in_bytes);
+
+  /* Create a new segment */
   fd = open ((char *) mm->filenames[unmap_index],
             O_CREAT | O_RDWR | O_TRUNC, 0600);
-  /* $$$ this is not real error recovery... */
+
+  /* This is not real error recovery... */
   if (fd < 0)
     {
       clib_unix_warning ("creat");
@@ -151,11 +223,21 @@ _clib_maplog_get_entry_slowpath (clib_maplog_main_t * mm, u64 my_record_index)
   return rv;
 }
 
+/**
+ * @brief Close a mapped log, and update the log header file
+ *
+ * Unmap the current log segments.
+ * Read the log header. Update the number of records, and number of files
+ *
+ * @param[in/out] mm   mapped log object
+ */
 void
 clib_maplog_close (clib_maplog_main_t * mm)
 {
-  int i;
+  int i, rv;
   u64 file_size_in_bytes;
+  int fd;
+  clib_maplog_header_t _h, *h = &_h;
 
   if (!(mm->flags & CLIB_MAPLOG_FLAG_INIT))
     return;
@@ -164,16 +246,185 @@ clib_maplog_close (clib_maplog_main_t * mm)
     mm->file_size_in_records * mm->record_size_in_cachelines *
     CLIB_CACHE_LINE_BYTES;
 
+  /* unmap current + next segments */
   for (i = 0; i < 2; i++)
     {
       (void) munmap ((u8 *) mm->file_baseva[i], file_size_in_bytes);
       vec_free (mm->filenames[i]);
     }
 
+  /* Open the log header */
+  fd = open ((char *) mm->header_filename, O_RDWR, 0600);
+  if (fd < 0)
+    {
+      clib_unix_warning ("reopen maplog header");
+      goto out;
+    }
+
+  /* Read the log */
+  rv = read (fd, h, sizeof (*h));
+  if (rv != sizeof (*h))
+    {
+      clib_unix_warning ("read maplog header");
+      goto out;
+    }
+  /* Fix the header... */
+  h->number_of_records = mm->next_record_index;
+  h->number_of_files = mm->current_file_index;
+
+  /* Back to the beginning of the log header... */
+  if (lseek (fd, 0, SEEK_SET) < 0)
+    {
+      clib_unix_warning ("lseek to rewrite header");
+      goto out;
+    }
+  /* Rewrite the log header */
+  rv = write (fd, h, sizeof (*h));
+  if (rv != sizeof (*h))
+    clib_unix_warning ("rewrite header");
+
+out:
+  if (fd >= 0)
+    (void) close (fd);
+
   vec_free (mm->file_basename);
+  vec_free (mm->header_filename);
   memset (mm, 0, sizeof (*mm));
 }
 
+/**
+ * @brief format a log header
+ *
+ * Usage: s = format (0, "%U", format_maplog_header, headerp, verbose);
+ * @param [in] h clib_maplog_header_t pointer
+ * @param [in] verbose self-explanatory
+ */
+u8 *
+format_maplog_header (u8 * s, va_list * args)
+{
+  clib_maplog_header_t *h = va_arg (*args, clib_maplog_header_t *);
+  int verbose = va_arg (*args, int);
+
+  if (!verbose)
+    goto brief;
+  s = format (s, "basename %s ", h->file_basename);
+  s = format (s, "log ver %d.%d.%d app id %u ver %d.%d.%d\n",
+             h->maplog_major_version,
+             h->maplog_minor_version,
+             h->maplog_patch_version,
+             h->application_id,
+             h->application_major_version,
+             h->application_minor_version, h->application_patch_version);
+  s = format (s, "  records are %d %d-byte cachelines\n",
+             h->record_size_in_cachelines, h->cacheline_size);
+  s = format (s, "  files are %lld records long, %lld files\n",
+             h->file_size_in_records, h->number_of_files);
+  s = format (s, "  %lld records total\n", h->number_of_records);
+  return s;
+
+brief:
+  s = format (s, "%s %lld records %lld files %lld records/file",
+             h->file_basename, h->number_of_records, h->number_of_files,
+             h->file_size_in_records);
+  return s;
+}
+
+/**
+ * @brief Process a complete maplog
+ *
+ * Reads the maplog header. Map and process all log segments in order.
+ * Calls the callback function once per file with a record count.
+ *
+ * @param [in] file_basename Same basename supplied to clib_maplog_init
+ * @param [in] fp_arg Callback function pointer
+ */
+int
+clib_maplog_process (char *file_basename, void *fp_arg)
+{
+  clib_maplog_header_t _h, *h = &_h;
+  int fd, rv = 0;
+  u64 file_index;
+  u64 file_size_in_bytes;
+  u8 *header_filename, *this_filename = 0;
+  u8 *file_baseva;
+  void (*fp) (clib_maplog_header_t *, void *data, u64 count);
+  u64 records_this_file, records_left;
+  ASSERT (fp_arg);
+
+  fp = fp_arg;
+
+  header_filename = format (0, "%s_header%c", file_basename, 0);
+
+  fd = open ((char *) header_filename, O_RDONLY, 0600);
+  if (fd < 0)
+    {
+      clib_unix_warning ("open maplog header");
+      rv = -1;
+      goto out;
+    }
+  rv = read (fd, h, sizeof (*h));
+  if (rv != sizeof (*h))
+    {
+      clib_unix_warning ("read maplog header");
+      rv = -2;
+      goto out;
+    }
+  (void) close (fd);
+
+  file_size_in_bytes = h->file_size_in_records
+    * h->record_size_in_cachelines * CLIB_CACHE_LINE_BYTES;
+
+  records_left = h->number_of_records;
+
+  for (file_index = 0; file_index < h->number_of_files; file_index++)
+    {
+      vec_reset_length (this_filename);
+      this_filename = format (this_filename, "%s_%llu%c", file_basename,
+                             file_index, 0);
+      fd = open ((char *) this_filename, O_RDONLY, 0600);
+      if (fd < 0)
+       {
+         clib_unix_warning ("open maplog file");
+         rv = -3;
+         goto out;
+       }
+
+      file_baseva =
+       mmap (0, file_size_in_bytes, PROT_READ, MAP_SHARED, fd, 0);
+      (void) close (fd);
+      if (file_baseva == (u8 *) MAP_FAILED)
+       {
+         clib_unix_warning ("mmap");
+         rv = -4;
+         goto out;
+       }
+
+      records_this_file = (records_left > h->file_size_in_records) ?
+       h->file_size_in_records : records_left;
+
+      (*fp) (h, file_baseva, records_this_file);
+
+      if (munmap (file_baseva, file_size_in_bytes) < 0)
+       {
+         clib_warning ("munmap");
+         rv = -5;
+         /* but don't stop... */
+       }
+      records_left -= records_this_file;
+      if (records_left == 0)
+       break;
+    }
+
+out:
+  if (fd > 0)
+    (void) close (fd);
+
+  vec_free (this_filename);
+  vec_free (header_filename);
+  return rv;
+}
+
+
 /*
  * fd.io coding-style-patch-verification: ON
  *
index 192dc22..d1c23d2 100644 (file)
 #ifndef __included_maplog_h__
 #define __included_maplog_h__
 
-/** \file
+/** @file
+    @brief mmap-based thread-safe fixed-size record double-buffered logging.
 
-   mmap-based fixed-size record double-buffered logging
+   This scheme should be about as fast as practicable. By fiat, log
+   records are rounded to a multiple of CLIB_CACHE_LINE_BYTES.
+   Consumer code calls clib_maplog_get_entry(...) to obtain a pointer
+   to a log entry.
+
+   We use an atomic ticket-counter to dole out log entries. Whichever
+   client thread crosses the double-buffer boundary is in charge of
+   replacing the log segment which just filled.
 */
 
 #include <vppinfra/clib.h>
 #include <fcntl.h>
 #include <sys/mman.h>
 
+/** Maplog log file header segment. In a separate file */
+
+typedef struct
+{
+  u8 maplog_major_version;     /**< library major version number */
+  u8 maplog_minor_version;     /**< library minor version number */
+  u8 maplog_patch_version;     /**< library patch version number */
+  u8 pad;
+  u32 application_id;          /**< application identifier */
+  u8 application_major_version;        /**< application major version number */
+  u8 application_minor_version;        /**< application minor version number */
+  u8 application_patch_version;        /**< application patch version number */
+  u8 pad2;
+  u32 record_size_in_cachelines; /**< record size in cache lines */
+  u32 cacheline_size;           /**< cache line size  */
+  u64 file_size_in_records;     /**< file size in records */
+  u64 number_of_records;        /**< number of records in entire log  */
+  u64 number_of_files;          /**< number of files in entire log  */
+  u8 file_basename[256];        /**< file basename  */
+} clib_maplog_header_t;
+
+#define MAPLOG_MAJOR_VERSION 1
+#define MAPLOG_MINOR_VERSION 0
+#define MAPLOG_PATCH_VERSION 0
+
+/** Process-private main data structure */
+
 typedef struct
 {
-  /* rw: atomic ticket-counter, file index */
   CLIB_CACHE_LINE_ALIGN_MARK (cacheline1);
+  /** rw cache line: atomic ticket-counter, file index */
   volatile u64 next_record_index;
-  u64 file_size_in_records; /**< power of two */
-  u32 log2_file_size_in_records;
-  volatile u32 current_file_index;
-  volatile u32 flags;
+  /** file size in records, rounded to a power of two */
+  u64 file_size_in_records;
+  u32 log2_file_size_in_records; /**< lg file size in records */
+  volatile u32 current_file_index; /**< current file index */
+  volatile u32 flags;             /**< flags, currently just "init" or not  */
 
-  /* ro: size parameters */
+  /* read-mostly cache line: size parameters, file names, etc. */
     CLIB_CACHE_LINE_ALIGN_MARK (cacheline2);
-  u32 record_size_in_cachelines;
+  u32 record_size_in_cachelines; /**< record size in cache lines */
 
   /* double-buffered mmap'ed logfiles */
-  volatile u8 *file_baseva[2];
-  u8 *filenames[2];
+  volatile u8 *file_baseva[2]; /**< active segment base addresses */
+  u8 *filenames[2];            /**< active segment file names  */
   /* vector not c-string */
-  u8 *file_basename;
+  u8 *file_basename;           /**< basename, e.g. "/tmp/mylog" */
+  u8 *header_filename;         /**< log header file name */
 } clib_maplog_main_t;
 
-int clib_maplog_init (clib_maplog_main_t * mm, char *file_basename,
-                     u64 file_size, u32 record_size_in_bytes);
+/* flag bits */
+#define CLIB_MAPLOG_FLAG_INIT  (1<<0)
 
+/** log initialization structure */
+typedef struct
+{
+  clib_maplog_main_t *mm;      /**< pointer to the main structure */
+  char *file_basename;         /**< file base name  */
+  u64 file_size_in_bytes;      /**< file size in bytes */
+  u32 record_size_in_bytes;    /**< record size in bytes */
+  u32 application_id;          /**< application identifier */
+  u8 application_major_version;        /**< applcation major version number */
+  u8 application_minor_version;        /**< applcation minor version number */
+  u8 application_patch_version;        /**< applcation patch version number */
+} clib_maplog_init_args_t;
+
+/* function prototypes */
+
+int clib_maplog_init (clib_maplog_init_args_t * ap);
 void clib_maplog_close (clib_maplog_main_t * mm);
+int clib_maplog_process (char *file_basename, void *fp_arg);
 
-#define CLIB_MAPLOG_FLAG_INIT  (1<<0)
+format_function_t format_maplog_header;
 
 u8 *_clib_maplog_get_entry_slowpath (clib_maplog_main_t * mm,
                                     u64 my_record_index);
 
+/**
+ * @brief Obtain a log entry pointer
+ *
+ * Increments the atomic ticket counter, and returns a pointer to
+ * the newly-allocated log entry. The slowpath function replaces
+ * a full log segment with a new/fresh/empty log segment
+ *
+ * @param[in] mm   maplog object pointer
+ * @return    pointer to the allocated log entry
+ */
 static inline void *
 clib_maplog_get_entry (clib_maplog_main_t * mm)
 {
index fb9fb0c..c49ebd6 100644 (file)
@@ -23,15 +23,52 @@ typedef struct
   u64 junk[7];
 } test_entry_t;
 
+static void
+process_maplog_records (clib_maplog_header_t * h,
+                       test_entry_t * e, u64 records_this_file)
+{
+  static int print_header;
+  int i = 0;
+
+  if (print_header == 0)
+    {
+      print_header = 1;
+      fformat (stdout, "%U", format_maplog_header, h, 1 /* verbose */ );
+    }
+
+  while (records_this_file--)
+    {
+      fformat (stdout, "%4lld ", e->serial_number);
+      if (++i == 8)
+       {
+         fformat (stdout, "\n");
+         i = 0;
+       }
+      e++;
+    }
+  fformat (stdout, "--------------\n");
+}
+
 int
 test_maplog_main (unformat_input_t * input)
 {
   clib_maplog_main_t *mm = &maplog_main;
+  clib_maplog_init_args_t _a, *a = &_a;
   int rv;
   int i;
   test_entry_t *t;
 
-  rv = clib_maplog_init (mm, "/tmp/maplog_test", 4096, sizeof (test_entry_t));
+  memset (a, 0, sizeof (*a));
+  a->mm = mm;
+  a->file_basename = "/tmp/maplog_test";
+  a->file_size_in_bytes = 4096;
+  a->record_size_in_bytes = sizeof (test_entry_t);
+  a->application_id = 1;
+  a->application_major_version = 1;
+  a->application_minor_version = 0;
+  a->application_patch_version = 0;
+
+  rv = clib_maplog_init (a);
 
   if (rv)
     {
@@ -47,6 +84,8 @@ test_maplog_main (unformat_input_t * input)
 
   clib_maplog_close (mm);
 
+  clib_maplog_process ("/tmp/maplog_test", process_maplog_records);
+
   return 0;
 }