jvpp: bugfix for deadlock in java future API (VPP-1037) 57/8957/9
authorMatej <matej.perina@pantheon.tech>
Mon, 23 Oct 2017 10:55:11 +0000 (12:55 +0200)
committerDamjan Marion <dmarion.lists@gmail.com>
Mon, 30 Oct 2017 11:06:57 +0000 (11:06 +0000)
- message sending method inside synchronization blocks causes
deadlock between sending and receiving part
- breaking atomicity of sending message and putting future with
corresponding id to map needs additional handling by writer and receiver,
regardless which part get access to sync block first will create
new future and second one will complete it and remove from map,
in case of dump calls where control ping reply is required
as confirmation that all information were send, if ping reply is
received before writer put future in map, reader will create
regular control ping future instead and writer needs to made association
between these two futures

Change-Id: Id29a19be7a5319291a5e07cf931080610178f00c
Signed-off-by: Matej <matej.perina@pantheon.tech>
src/vpp-api/java/jvpp-registry/io/fd/vpp/jvpp/JVppRegistryImpl.java
src/vpp-api/java/jvpp-registry/io/fd/vpp/jvpp/future/AbstractFutureJVppInvoker.java
src/vpp-api/java/jvpp/gen/jvppgen/jvpp_future_facade_gen.py

index 6e938ae..baef14c 100644 (file)
@@ -99,15 +99,24 @@ public final class JVppRegistryImpl implements JVppRegistry, ControlPingCallback
         final ControlPingCallback callback = (ControlPingCallback) pluginRegistry.get(clazz.getName());
         assertPluginWasRegistered(name, callback);
 
+        // controlPing0 is sending function and can go to waiting in case of e. g. full queue
+        // because of that it cant be in same synchronization block as used by reply handler function
+        int context = controlPing0();
+        if (context < 0) {
+            throw new VppInvocationException("controlPing", context);
+        }
+
         synchronized (pingCalls) {
-            int context = controlPing0();
-            if (context < 0) {
-                throw new VppInvocationException("controlPing", context);
+            // if callback is in map it's because reply was already received
+            EarlyControlPingReply earlyReplyCallback = (EarlyControlPingReply) pingCalls.remove(context);
+            if(earlyReplyCallback == null) {
+                pingCalls.put(context, callback);
+            } else {
+                callback.onControlPingReply(earlyReplyCallback.getReply());
             }
-
-            pingCalls.put(context, callback);
-            return context;
         }
+
+        return context;
     }
 
     @Override
@@ -116,8 +125,9 @@ public final class JVppRegistryImpl implements JVppRegistry, ControlPingCallback
         synchronized (pingCalls) {
             callback = pingCalls.remove(reply.context);
             if (callback == null) {
-                LOG.log(Level.WARNING, "No callback was registered for reply context=" + reply.context + " Contexts waiting="
-                    + pingCalls.keySet());
+                // reply received early, because we don't know callback to call
+                // we wrap the reply and let the sender to call it
+                pingCalls.put(reply.context, new EarlyControlPingReply(reply));
                 return;
             }
         }
@@ -151,4 +161,27 @@ public final class JVppRegistryImpl implements JVppRegistry, ControlPingCallback
     public void close() throws Exception {
         connection.close();
     }
+
+    private static class EarlyControlPingReply implements ControlPingCallback {
+
+        private final ControlPingReply reply;
+
+        public EarlyControlPingReply(final ControlPingReply reply) {
+            this.reply = reply;
+        }
+
+        public ControlPingReply getReply() {
+            return reply;
+        }
+
+        @Override
+        public void onError(VppCallbackException ex) {
+            throw new IllegalStateException("Calling onError in EarlyControlPingReply");
+        }
+
+        @Override
+        public void onControlPingReply(ControlPingReply reply) {
+            throw new IllegalStateException("Calling onControlPingReply in EarlyControlPingReply");
+        }
+    }
 }
index e7df528..ac85f53 100644 (file)
@@ -21,6 +21,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
+
 import io.fd.vpp.jvpp.JVpp;
 import io.fd.vpp.jvpp.JVppRegistry;
 import io.fd.vpp.jvpp.VppInvocationException;
@@ -62,27 +63,41 @@ public abstract class AbstractFutureJVppInvoker implements FutureJVppInvoker {
     @Override
     @SuppressWarnings("unchecked")
     public <REQ extends JVppRequest, REPLY extends JVppReply<REQ>> CompletionStage<REPLY> send(REQ req) {
-        synchronized(requests) {
-            try {
-                final CompletableFuture<REPLY> replyCompletableFuture;
-                final int contextId = jvpp.send(req);
+        try {
+            // jvpp.send() can go to waiting state if sending queue is full, putting it into same
+            // synchronization block as used by receiving part (synchronized(requests)) can lead
+            // to deadlock between these two sides or at least slowing sending process by slow
+            // reader
+            final CompletableFuture<REPLY> replyCompletableFuture;
+            final int contextId = jvpp.send(req);
+
+            if(req instanceof JVppDump) {
+                throw new IllegalArgumentException("Send with empty reply dump has to be used in case of dump calls");
+            }
 
-                if(req instanceof JVppDump) {
-                    throw new IllegalArgumentException("Send with empty reply dump has to be used in case of dump calls");
+            synchronized(requests) {
+                CompletableFuture<? extends JVppReply<?>> replyFuture = requests.get(contextId);
+                if (replyFuture == null) {
+                    // reply not yet received, put new future into map
+                    replyCompletableFuture = new CompletableFuture<>();
+                    requests.put(contextId, replyCompletableFuture);
+                } else {
+                    // reply already received (should be completed by reader),
+                    // remove future from map and return it to caller
+                    replyCompletableFuture = (CompletableFuture<REPLY>) replyFuture;
+                    requests.remove(contextId);
                 }
-                replyCompletableFuture = new CompletableFuture<>();
-                requests.put(contextId, replyCompletableFuture);
-
-                // TODO in case of timeouts/missing replies, requests from the map are not removed
-                // consider adding cancel method, that would remove requests from the map and cancel
-                // associated replyCompletableFuture
-
-                return replyCompletableFuture;
-            } catch (VppInvocationException ex) {
-                final CompletableFuture<REPLY> replyCompletableFuture = new CompletableFuture<>();
-                replyCompletableFuture.completeExceptionally(ex);
-                return replyCompletableFuture;
             }
+
+            // TODO in case of timeouts/missing replies, requests from the map are not removed
+            // consider adding cancel method, that would remove requests from the map and cancel
+            // associated replyCompletableFuture
+
+            return replyCompletableFuture;
+        } catch (VppInvocationException ex) {
+            final CompletableFuture<REPLY> replyCompletableFuture = new CompletableFuture<>();
+            replyCompletableFuture.completeExceptionally(ex);
+            return replyCompletableFuture;
         }
     }
 
@@ -90,30 +105,54 @@ public abstract class AbstractFutureJVppInvoker implements FutureJVppInvoker {
     @SuppressWarnings("unchecked")
     public <REQ extends JVppRequest, REPLY extends JVppReply<REQ>, DUMP extends JVppReplyDump<REQ, REPLY>> CompletionStage<DUMP> send(
             REQ req, DUMP emptyReplyDump) {
-        synchronized(requests) {
-            try {
-                final CompletableDumpFuture<DUMP> replyCompletableFuture;
-                final int contextId = jvpp.send(req);
-
-                if(!(req instanceof JVppDump)) {
-                    throw new IllegalArgumentException("Send without empty reply dump has to be used in case of regular calls");
-                }
-                replyCompletableFuture = new CompletableDumpFuture<>(contextId, emptyReplyDump);
-
-                requests.put(contextId, replyCompletableFuture);
-                requests.put(registry.controlPing(jvpp.getClass()), replyCompletableFuture);
-
-                // TODO in case of timeouts/missing replies, requests from the map are not removed
-                // consider adding cancel method, that would remove requests from the map and cancel
-                // associated replyCompletableFuture
-
-                return replyCompletableFuture;
-            } catch (VppInvocationException ex) {
-                final CompletableFuture<DUMP> replyCompletableFuture = new CompletableFuture<>();
-                replyCompletableFuture.completeExceptionally(ex);
-                return replyCompletableFuture;
-            }
-        }
+      try {
+          // jvpp.send() and registry.controlPing() can go to waiting state if sending queue is full,
+          // putting it into same synchronization block as used by receiving part (synchronized(requests))
+          // can lead to deadlock between these two sides or at least slowing sending process by slow reader
+          final CompletableDumpFuture<DUMP> replyDumpFuture;
+          final int contextId = jvpp.send(req);
+
+          if(!(req instanceof JVppDump)) {
+              throw new IllegalArgumentException("Send without empty reply dump has to be used in case of regular calls");
+          }
+
+          synchronized(requests) {
+              CompletableFuture<? extends JVppReply<?>> replyFuture = requests.get(contextId);
+              if (replyFuture == null) {
+                  // reply not received yet, put new future to map
+                  replyDumpFuture = new CompletableDumpFuture<>(contextId, emptyReplyDump);
+                  requests.put(contextId, replyDumpFuture);
+              } else {
+                  // reply already received, save existing future
+                  replyDumpFuture = (CompletableDumpFuture<DUMP>) replyFuture;
+              }
+          }
+
+          final int pingId = registry.controlPing(jvpp.getClass());
+
+          synchronized(requests) {
+              if (requests.remove(pingId) == null) {
+                  // reply not received yet, put future into map under pingId
+                  requests.put(pingId, replyDumpFuture);
+              } else {
+                  // reply already received, complete future
+                  // ping reply couldn't complete the future because it is not in map under
+                  // ping id
+                  replyDumpFuture.complete(replyDumpFuture.getReplyDump());
+                  requests.remove(contextId);
+              }
+          }
+
+          // TODO in case of timeouts/missing replies, requests from the map are not removed
+          // consider adding cancel method, that would remove requests from the map and cancel
+          // associated replyCompletableFuture
+
+          return replyDumpFuture;
+      } catch (VppInvocationException ex) {
+          final CompletableFuture<DUMP> replyCompletableFuture = new CompletableFuture<>();
+          replyCompletableFuture.completeExceptionally(ex);
+          return replyCompletableFuture;
+      }
     }
 
     public static final class CompletableDumpFuture<T extends JVppReplyDump<?, ?>> extends CompletableFuture<T> {
index a31287b..a45a532 100644 (file)
@@ -61,26 +61,28 @@ public final class FutureJVpp${plugin_name}FacadeCallback implements $plugin_pac
     @Override
     @SuppressWarnings("unchecked")
     public void onControlPingReply(final $base_package.$dto_package.ControlPingReply reply) {
-        final java.util.concurrent.CompletableFuture<$base_package.$dto_package.JVppReply<?>> completableFuture;
+        java.util.concurrent.CompletableFuture<$base_package.$dto_package.JVppReply<?>> completableFuture;
 
         final int replyId = reply.context;
         synchronized(requests) {
             completableFuture = (java.util.concurrent.CompletableFuture<$base_package.$dto_package.JVppReply<?>>) requests.get(replyId);
-        }
 
-        if(completableFuture != null) {
-            // Finish dump call
-            if (completableFuture instanceof $base_package.$future_package.AbstractFutureJVppInvoker.CompletableDumpFuture) {
-                completableFuture.complete((($base_package.$future_package.AbstractFutureJVppInvoker.CompletableDumpFuture) completableFuture).getReplyDump());
-                // Remove future mapped to dump call context id
-                synchronized(requests) {
+            if(completableFuture != null) {
+                // Finish dump call
+                if (completableFuture instanceof $base_package.$future_package.AbstractFutureJVppInvoker.CompletableDumpFuture) {
+                    completableFuture.complete((($base_package.$future_package.AbstractFutureJVppInvoker.CompletableDumpFuture) completableFuture).getReplyDump());
+                    // Remove future mapped to dump call context id
                     requests.remove((($base_package.$future_package.AbstractFutureJVppInvoker.CompletableDumpFuture) completableFuture).getContextId());
+                } else {
+                    // reply to regular control ping, complete the future
+                    completableFuture.complete(reply);
                 }
+                requests.remove(replyId);
             } else {
+                // future not yet created by writer, create new future, complete it and put to map under ping id
+                completableFuture = new java.util.concurrent.CompletableFuture<>();
                 completableFuture.complete(reply);
-            }
-            synchronized(requests) {
-                requests.remove(replyId);
+                requests.put(replyId, completableFuture);
             }
         }
     }
@@ -93,20 +95,26 @@ jvpp_facade_callback_method_template = Template("""
     @Override
     @SuppressWarnings("unchecked")
     public void on$callback_dto(final $plugin_package.$dto_package.$callback_dto reply) {
-        final java.util.concurrent.CompletableFuture<$base_package.$dto_package.JVppReply<?>> completableFuture;
+        java.util.concurrent.CompletableFuture<$base_package.$dto_package.JVppReply<$plugin_package.$dto_package.$request_dto>> completableFuture;
         final int replyId = reply.context;
         if (LOG.isLoggable(java.util.logging.Level.FINE)) {
             LOG.fine(String.format("Received $callback_dto event message: %s", reply));
         }
         synchronized(requests) {
-            completableFuture = (java.util.concurrent.CompletableFuture<$base_package.$dto_package.JVppReply<?>>) requests.get(replyId);
-        }
+            completableFuture =
+            (java.util.concurrent.CompletableFuture<$base_package.$dto_package.JVppReply<$plugin_package.$dto_package.$request_dto>>) requests.get(replyId);
 
-        if(completableFuture != null) {
-            completableFuture.complete(reply);
-
-            synchronized(requests) {
+            if(completableFuture != null) {
+                // received reply on request, complete future created by sender and remove it from map
+                completableFuture.complete(reply);
                 requests.remove(replyId);
+            } else {
+                // reply received before writer created future,
+                // create new future, complete it and put into map to
+                // notify sender that reply is already received
+                completableFuture = new  java.util.concurrent.CompletableFuture<>();
+                completableFuture.complete(reply);
+                requests.put(replyId, completableFuture);
             }
         }
     }
@@ -126,16 +134,22 @@ jvpp_facade_details_callback_method_template = Template("""
     @Override
     @SuppressWarnings("unchecked")
     public void on$callback_dto(final $plugin_package.$dto_package.$callback_dto reply) {
-        final $base_package.$future_package.AbstractFutureJVppInvoker.CompletableDumpFuture<$plugin_package.$dto_package.$callback_dto_reply_dump> completableFuture;
+        $base_package.$future_package.AbstractFutureJVppInvoker.CompletableDumpFuture<$plugin_package.$dto_package.$callback_dto_reply_dump> completableFuture;
         final int replyId = reply.context;
         if (LOG.isLoggable(java.util.logging.Level.FINE)) {
             LOG.fine(String.format("Received $callback_dto event message: %s", reply));
         }
         synchronized(requests) {
             completableFuture = ($base_package.$future_package.AbstractFutureJVppInvoker.CompletableDumpFuture<$plugin_package.$dto_package.$callback_dto_reply_dump>) requests.get(replyId);
-        }
 
-        if(completableFuture != null) {
+            if(completableFuture == null) {
+                // reply received before writer created future,
+                // create new future, and put into map to notify sender that reply is already received,
+                // following details replies will add information to this future
+                completableFuture = new $base_package.$future_package.AbstractFutureJVppInvoker.CompletableDumpFuture<>(replyId,
+                    new $plugin_package.$dto_package.$callback_dto_reply_dump());
+                requests.put(replyId, completableFuture);
+            }
             completableFuture.getReplyDump().$callback_dto_field.add(reply);
         }
     }
@@ -165,6 +179,7 @@ def generate_jvpp(func_list, base_package, plugin_package, plugin_name, dto_pack
 
         if not util.is_notification(func["name"]):
             camel_case_request_method_name = util.remove_reply_suffix(util.underscore_to_camelcase(func['name']))
+            request_dto = util.remove_reply_suffix(util.underscore_to_camelcase_upper(func['name']))
             if util.is_details(camel_case_name_with_suffix):
                 camel_case_reply_name = get_standard_dump_reply_name(util.underscore_to_camelcase_upper(func['name']),
                                                                      func['name'])
@@ -208,7 +223,8 @@ def generate_jvpp(func_list, base_package, plugin_package, plugin_name, dto_pack
                 callbacks.append(jvpp_facade_callback_method_template.substitute(base_package=base_package,
                                                                                  plugin_package=plugin_package,
                                                                                  dto_package=dto_package,
-                                                                                 callback_dto=camel_case_name_with_suffix))
+                                                                                 callback_dto=camel_case_name_with_suffix,
+                                                                                 request_dto=request_dto))
 
         if util.is_notification(func["name"]):
             callbacks.append(jvpp_facade_callback_notification_method_template.substitute(plugin_package=plugin_package,