jvpp: bugfix for deadlock in java future API (VPP-1037)
[vpp.git] / src / vpp-api / java / jvpp-registry / io / fd / vpp / jvpp / future / AbstractFutureJVppInvoker.java
index e7df528..ac85f53 100644 (file)
@@ -21,6 +21,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
+
 import io.fd.vpp.jvpp.JVpp;
 import io.fd.vpp.jvpp.JVppRegistry;
 import io.fd.vpp.jvpp.VppInvocationException;
@@ -62,27 +63,41 @@ public abstract class AbstractFutureJVppInvoker implements FutureJVppInvoker {
     @Override
     @SuppressWarnings("unchecked")
     public <REQ extends JVppRequest, REPLY extends JVppReply<REQ>> CompletionStage<REPLY> send(REQ req) {
-        synchronized(requests) {
-            try {
-                final CompletableFuture<REPLY> replyCompletableFuture;
-                final int contextId = jvpp.send(req);
+        try {
+            // jvpp.send() can go to waiting state if sending queue is full, putting it into same
+            // synchronization block as used by receiving part (synchronized(requests)) can lead
+            // to deadlock between these two sides or at least slowing sending process by slow
+            // reader
+            final CompletableFuture<REPLY> replyCompletableFuture;
+            final int contextId = jvpp.send(req);
+
+            if(req instanceof JVppDump) {
+                throw new IllegalArgumentException("Send with empty reply dump has to be used in case of dump calls");
+            }
 
-                if(req instanceof JVppDump) {
-                    throw new IllegalArgumentException("Send with empty reply dump has to be used in case of dump calls");
+            synchronized(requests) {
+                CompletableFuture<? extends JVppReply<?>> replyFuture = requests.get(contextId);
+                if (replyFuture == null) {
+                    // reply not yet received, put new future into map
+                    replyCompletableFuture = new CompletableFuture<>();
+                    requests.put(contextId, replyCompletableFuture);
+                } else {
+                    // reply already received (should be completed by reader),
+                    // remove future from map and return it to caller
+                    replyCompletableFuture = (CompletableFuture<REPLY>) replyFuture;
+                    requests.remove(contextId);
                 }
-                replyCompletableFuture = new CompletableFuture<>();
-                requests.put(contextId, replyCompletableFuture);
-
-                // TODO in case of timeouts/missing replies, requests from the map are not removed
-                // consider adding cancel method, that would remove requests from the map and cancel
-                // associated replyCompletableFuture
-
-                return replyCompletableFuture;
-            } catch (VppInvocationException ex) {
-                final CompletableFuture<REPLY> replyCompletableFuture = new CompletableFuture<>();
-                replyCompletableFuture.completeExceptionally(ex);
-                return replyCompletableFuture;
             }
+
+            // TODO in case of timeouts/missing replies, requests from the map are not removed
+            // consider adding cancel method, that would remove requests from the map and cancel
+            // associated replyCompletableFuture
+
+            return replyCompletableFuture;
+        } catch (VppInvocationException ex) {
+            final CompletableFuture<REPLY> replyCompletableFuture = new CompletableFuture<>();
+            replyCompletableFuture.completeExceptionally(ex);
+            return replyCompletableFuture;
         }
     }
 
@@ -90,30 +105,54 @@ public abstract class AbstractFutureJVppInvoker implements FutureJVppInvoker {
     @SuppressWarnings("unchecked")
     public <REQ extends JVppRequest, REPLY extends JVppReply<REQ>, DUMP extends JVppReplyDump<REQ, REPLY>> CompletionStage<DUMP> send(
             REQ req, DUMP emptyReplyDump) {
-        synchronized(requests) {
-            try {
-                final CompletableDumpFuture<DUMP> replyCompletableFuture;
-                final int contextId = jvpp.send(req);
-
-                if(!(req instanceof JVppDump)) {
-                    throw new IllegalArgumentException("Send without empty reply dump has to be used in case of regular calls");
-                }
-                replyCompletableFuture = new CompletableDumpFuture<>(contextId, emptyReplyDump);
-
-                requests.put(contextId, replyCompletableFuture);
-                requests.put(registry.controlPing(jvpp.getClass()), replyCompletableFuture);
-
-                // TODO in case of timeouts/missing replies, requests from the map are not removed
-                // consider adding cancel method, that would remove requests from the map and cancel
-                // associated replyCompletableFuture
-
-                return replyCompletableFuture;
-            } catch (VppInvocationException ex) {
-                final CompletableFuture<DUMP> replyCompletableFuture = new CompletableFuture<>();
-                replyCompletableFuture.completeExceptionally(ex);
-                return replyCompletableFuture;
-            }
-        }
+      try {
+          // jvpp.send() and registry.controlPing() can go to waiting state if sending queue is full,
+          // putting it into same synchronization block as used by receiving part (synchronized(requests))
+          // can lead to deadlock between these two sides or at least slowing sending process by slow reader
+          final CompletableDumpFuture<DUMP> replyDumpFuture;
+          final int contextId = jvpp.send(req);
+
+          if(!(req instanceof JVppDump)) {
+              throw new IllegalArgumentException("Send without empty reply dump has to be used in case of regular calls");
+          }
+
+          synchronized(requests) {
+              CompletableFuture<? extends JVppReply<?>> replyFuture = requests.get(contextId);
+              if (replyFuture == null) {
+                  // reply not received yet, put new future to map
+                  replyDumpFuture = new CompletableDumpFuture<>(contextId, emptyReplyDump);
+                  requests.put(contextId, replyDumpFuture);
+              } else {
+                  // reply already received, save existing future
+                  replyDumpFuture = (CompletableDumpFuture<DUMP>) replyFuture;
+              }
+          }
+
+          final int pingId = registry.controlPing(jvpp.getClass());
+
+          synchronized(requests) {
+              if (requests.remove(pingId) == null) {
+                  // reply not received yet, put future into map under pingId
+                  requests.put(pingId, replyDumpFuture);
+              } else {
+                  // reply already received, complete future
+                  // ping reply couldn't complete the future because it is not in map under
+                  // ping id
+                  replyDumpFuture.complete(replyDumpFuture.getReplyDump());
+                  requests.remove(contextId);
+              }
+          }
+
+          // TODO in case of timeouts/missing replies, requests from the map are not removed
+          // consider adding cancel method, that would remove requests from the map and cancel
+          // associated replyCompletableFuture
+
+          return replyDumpFuture;
+      } catch (VppInvocationException ex) {
+          final CompletableFuture<DUMP> replyCompletableFuture = new CompletableFuture<>();
+          replyCompletableFuture.completeExceptionally(ex);
+          return replyCompletableFuture;
+      }
     }
 
     public static final class CompletableDumpFuture<T extends JVppReplyDump<?, ?>> extends CompletableFuture<T> {