From 7008da2667d2876bc58aa5ae57c4a9de48cc756b Mon Sep 17 00:00:00 2001 From: Matej Date: Mon, 23 Oct 2017 12:55:11 +0200 Subject: [PATCH] jvpp: bugfix for deadlock in java future API (VPP-1037) - message sending method inside synchronization blocks causes deadlock between sending and receiving part - breaking atomicity of sending message and putting future with corresponding id to map needs additional handling by writer and receiver, regardless which part get access to sync block first will create new future and second one will complete it and remove from map, in case of dump calls where control ping reply is required as confirmation that all information were send, if ping reply is received before writer put future in map, reader will create regular control ping future instead and writer needs to made association between these two futures Change-Id: Id29a19be7a5319291a5e07cf931080610178f00c Signed-off-by: Matej --- .../io/fd/vpp/jvpp/JVppRegistryImpl.java | 49 ++++++-- .../vpp/jvpp/future/AbstractFutureJVppInvoker.java | 123 ++++++++++++++------- .../jvpp/gen/jvppgen/jvpp_future_facade_gen.py | 60 ++++++---- 3 files changed, 160 insertions(+), 72 deletions(-) diff --git a/src/vpp-api/java/jvpp-registry/io/fd/vpp/jvpp/JVppRegistryImpl.java b/src/vpp-api/java/jvpp-registry/io/fd/vpp/jvpp/JVppRegistryImpl.java index 6e938ae313f..baef14c3865 100644 --- a/src/vpp-api/java/jvpp-registry/io/fd/vpp/jvpp/JVppRegistryImpl.java +++ b/src/vpp-api/java/jvpp-registry/io/fd/vpp/jvpp/JVppRegistryImpl.java @@ -99,15 +99,24 @@ public final class JVppRegistryImpl implements JVppRegistry, ControlPingCallback final ControlPingCallback callback = (ControlPingCallback) pluginRegistry.get(clazz.getName()); assertPluginWasRegistered(name, callback); + // controlPing0 is sending function and can go to waiting in case of e. g. full queue + // because of that it cant be in same synchronization block as used by reply handler function + int context = controlPing0(); + if (context < 0) { + throw new VppInvocationException("controlPing", context); + } + synchronized (pingCalls) { - int context = controlPing0(); - if (context < 0) { - throw new VppInvocationException("controlPing", context); + // if callback is in map it's because reply was already received + EarlyControlPingReply earlyReplyCallback = (EarlyControlPingReply) pingCalls.remove(context); + if(earlyReplyCallback == null) { + pingCalls.put(context, callback); + } else { + callback.onControlPingReply(earlyReplyCallback.getReply()); } - - pingCalls.put(context, callback); - return context; } + + return context; } @Override @@ -116,8 +125,9 @@ public final class JVppRegistryImpl implements JVppRegistry, ControlPingCallback synchronized (pingCalls) { callback = pingCalls.remove(reply.context); if (callback == null) { - LOG.log(Level.WARNING, "No callback was registered for reply context=" + reply.context + " Contexts waiting=" - + pingCalls.keySet()); + // reply received early, because we don't know callback to call + // we wrap the reply and let the sender to call it + pingCalls.put(reply.context, new EarlyControlPingReply(reply)); return; } } @@ -151,4 +161,27 @@ public final class JVppRegistryImpl implements JVppRegistry, ControlPingCallback public void close() throws Exception { connection.close(); } + + private static class EarlyControlPingReply implements ControlPingCallback { + + private final ControlPingReply reply; + + public EarlyControlPingReply(final ControlPingReply reply) { + this.reply = reply; + } + + public ControlPingReply getReply() { + return reply; + } + + @Override + public void onError(VppCallbackException ex) { + throw new IllegalStateException("Calling onError in EarlyControlPingReply"); + } + + @Override + public void onControlPingReply(ControlPingReply reply) { + throw new IllegalStateException("Calling onControlPingReply in EarlyControlPingReply"); + } + } } diff --git a/src/vpp-api/java/jvpp-registry/io/fd/vpp/jvpp/future/AbstractFutureJVppInvoker.java b/src/vpp-api/java/jvpp-registry/io/fd/vpp/jvpp/future/AbstractFutureJVppInvoker.java index e7df528ae30..ac85f5309ec 100644 --- a/src/vpp-api/java/jvpp-registry/io/fd/vpp/jvpp/future/AbstractFutureJVppInvoker.java +++ b/src/vpp-api/java/jvpp-registry/io/fd/vpp/jvpp/future/AbstractFutureJVppInvoker.java @@ -21,6 +21,7 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; + import io.fd.vpp.jvpp.JVpp; import io.fd.vpp.jvpp.JVppRegistry; import io.fd.vpp.jvpp.VppInvocationException; @@ -62,27 +63,41 @@ public abstract class AbstractFutureJVppInvoker implements FutureJVppInvoker { @Override @SuppressWarnings("unchecked") public > CompletionStage send(REQ req) { - synchronized(requests) { - try { - final CompletableFuture replyCompletableFuture; - final int contextId = jvpp.send(req); + try { + // jvpp.send() can go to waiting state if sending queue is full, putting it into same + // synchronization block as used by receiving part (synchronized(requests)) can lead + // to deadlock between these two sides or at least slowing sending process by slow + // reader + final CompletableFuture replyCompletableFuture; + final int contextId = jvpp.send(req); + + if(req instanceof JVppDump) { + throw new IllegalArgumentException("Send with empty reply dump has to be used in case of dump calls"); + } - if(req instanceof JVppDump) { - throw new IllegalArgumentException("Send with empty reply dump has to be used in case of dump calls"); + synchronized(requests) { + CompletableFuture> replyFuture = requests.get(contextId); + if (replyFuture == null) { + // reply not yet received, put new future into map + replyCompletableFuture = new CompletableFuture<>(); + requests.put(contextId, replyCompletableFuture); + } else { + // reply already received (should be completed by reader), + // remove future from map and return it to caller + replyCompletableFuture = (CompletableFuture) replyFuture; + requests.remove(contextId); } - replyCompletableFuture = new CompletableFuture<>(); - requests.put(contextId, replyCompletableFuture); - - // TODO in case of timeouts/missing replies, requests from the map are not removed - // consider adding cancel method, that would remove requests from the map and cancel - // associated replyCompletableFuture - - return replyCompletableFuture; - } catch (VppInvocationException ex) { - final CompletableFuture replyCompletableFuture = new CompletableFuture<>(); - replyCompletableFuture.completeExceptionally(ex); - return replyCompletableFuture; } + + // TODO in case of timeouts/missing replies, requests from the map are not removed + // consider adding cancel method, that would remove requests from the map and cancel + // associated replyCompletableFuture + + return replyCompletableFuture; + } catch (VppInvocationException ex) { + final CompletableFuture replyCompletableFuture = new CompletableFuture<>(); + replyCompletableFuture.completeExceptionally(ex); + return replyCompletableFuture; } } @@ -90,30 +105,54 @@ public abstract class AbstractFutureJVppInvoker implements FutureJVppInvoker { @SuppressWarnings("unchecked") public , DUMP extends JVppReplyDump> CompletionStage send( REQ req, DUMP emptyReplyDump) { - synchronized(requests) { - try { - final CompletableDumpFuture replyCompletableFuture; - final int contextId = jvpp.send(req); - - if(!(req instanceof JVppDump)) { - throw new IllegalArgumentException("Send without empty reply dump has to be used in case of regular calls"); - } - replyCompletableFuture = new CompletableDumpFuture<>(contextId, emptyReplyDump); - - requests.put(contextId, replyCompletableFuture); - requests.put(registry.controlPing(jvpp.getClass()), replyCompletableFuture); - - // TODO in case of timeouts/missing replies, requests from the map are not removed - // consider adding cancel method, that would remove requests from the map and cancel - // associated replyCompletableFuture - - return replyCompletableFuture; - } catch (VppInvocationException ex) { - final CompletableFuture replyCompletableFuture = new CompletableFuture<>(); - replyCompletableFuture.completeExceptionally(ex); - return replyCompletableFuture; - } - } + try { + // jvpp.send() and registry.controlPing() can go to waiting state if sending queue is full, + // putting it into same synchronization block as used by receiving part (synchronized(requests)) + // can lead to deadlock between these two sides or at least slowing sending process by slow reader + final CompletableDumpFuture replyDumpFuture; + final int contextId = jvpp.send(req); + + if(!(req instanceof JVppDump)) { + throw new IllegalArgumentException("Send without empty reply dump has to be used in case of regular calls"); + } + + synchronized(requests) { + CompletableFuture> replyFuture = requests.get(contextId); + if (replyFuture == null) { + // reply not received yet, put new future to map + replyDumpFuture = new CompletableDumpFuture<>(contextId, emptyReplyDump); + requests.put(contextId, replyDumpFuture); + } else { + // reply already received, save existing future + replyDumpFuture = (CompletableDumpFuture) replyFuture; + } + } + + final int pingId = registry.controlPing(jvpp.getClass()); + + synchronized(requests) { + if (requests.remove(pingId) == null) { + // reply not received yet, put future into map under pingId + requests.put(pingId, replyDumpFuture); + } else { + // reply already received, complete future + // ping reply couldn't complete the future because it is not in map under + // ping id + replyDumpFuture.complete(replyDumpFuture.getReplyDump()); + requests.remove(contextId); + } + } + + // TODO in case of timeouts/missing replies, requests from the map are not removed + // consider adding cancel method, that would remove requests from the map and cancel + // associated replyCompletableFuture + + return replyDumpFuture; + } catch (VppInvocationException ex) { + final CompletableFuture replyCompletableFuture = new CompletableFuture<>(); + replyCompletableFuture.completeExceptionally(ex); + return replyCompletableFuture; + } } public static final class CompletableDumpFuture> extends CompletableFuture { diff --git a/src/vpp-api/java/jvpp/gen/jvppgen/jvpp_future_facade_gen.py b/src/vpp-api/java/jvpp/gen/jvppgen/jvpp_future_facade_gen.py index a31287b3333..a45a532d7f2 100644 --- a/src/vpp-api/java/jvpp/gen/jvppgen/jvpp_future_facade_gen.py +++ b/src/vpp-api/java/jvpp/gen/jvppgen/jvpp_future_facade_gen.py @@ -61,26 +61,28 @@ public final class FutureJVpp${plugin_name}FacadeCallback implements $plugin_pac @Override @SuppressWarnings("unchecked") public void onControlPingReply(final $base_package.$dto_package.ControlPingReply reply) { - final java.util.concurrent.CompletableFuture<$base_package.$dto_package.JVppReply> completableFuture; + java.util.concurrent.CompletableFuture<$base_package.$dto_package.JVppReply> completableFuture; final int replyId = reply.context; synchronized(requests) { completableFuture = (java.util.concurrent.CompletableFuture<$base_package.$dto_package.JVppReply>) requests.get(replyId); - } - if(completableFuture != null) { - // Finish dump call - if (completableFuture instanceof $base_package.$future_package.AbstractFutureJVppInvoker.CompletableDumpFuture) { - completableFuture.complete((($base_package.$future_package.AbstractFutureJVppInvoker.CompletableDumpFuture) completableFuture).getReplyDump()); - // Remove future mapped to dump call context id - synchronized(requests) { + if(completableFuture != null) { + // Finish dump call + if (completableFuture instanceof $base_package.$future_package.AbstractFutureJVppInvoker.CompletableDumpFuture) { + completableFuture.complete((($base_package.$future_package.AbstractFutureJVppInvoker.CompletableDumpFuture) completableFuture).getReplyDump()); + // Remove future mapped to dump call context id requests.remove((($base_package.$future_package.AbstractFutureJVppInvoker.CompletableDumpFuture) completableFuture).getContextId()); + } else { + // reply to regular control ping, complete the future + completableFuture.complete(reply); } + requests.remove(replyId); } else { + // future not yet created by writer, create new future, complete it and put to map under ping id + completableFuture = new java.util.concurrent.CompletableFuture<>(); completableFuture.complete(reply); - } - synchronized(requests) { - requests.remove(replyId); + requests.put(replyId, completableFuture); } } } @@ -93,20 +95,26 @@ jvpp_facade_callback_method_template = Template(""" @Override @SuppressWarnings("unchecked") public void on$callback_dto(final $plugin_package.$dto_package.$callback_dto reply) { - final java.util.concurrent.CompletableFuture<$base_package.$dto_package.JVppReply> completableFuture; + java.util.concurrent.CompletableFuture<$base_package.$dto_package.JVppReply<$plugin_package.$dto_package.$request_dto>> completableFuture; final int replyId = reply.context; if (LOG.isLoggable(java.util.logging.Level.FINE)) { LOG.fine(String.format("Received $callback_dto event message: %s", reply)); } synchronized(requests) { - completableFuture = (java.util.concurrent.CompletableFuture<$base_package.$dto_package.JVppReply>) requests.get(replyId); - } + completableFuture = + (java.util.concurrent.CompletableFuture<$base_package.$dto_package.JVppReply<$plugin_package.$dto_package.$request_dto>>) requests.get(replyId); - if(completableFuture != null) { - completableFuture.complete(reply); - - synchronized(requests) { + if(completableFuture != null) { + // received reply on request, complete future created by sender and remove it from map + completableFuture.complete(reply); requests.remove(replyId); + } else { + // reply received before writer created future, + // create new future, complete it and put into map to + // notify sender that reply is already received + completableFuture = new java.util.concurrent.CompletableFuture<>(); + completableFuture.complete(reply); + requests.put(replyId, completableFuture); } } } @@ -126,16 +134,22 @@ jvpp_facade_details_callback_method_template = Template(""" @Override @SuppressWarnings("unchecked") public void on$callback_dto(final $plugin_package.$dto_package.$callback_dto reply) { - final $base_package.$future_package.AbstractFutureJVppInvoker.CompletableDumpFuture<$plugin_package.$dto_package.$callback_dto_reply_dump> completableFuture; + $base_package.$future_package.AbstractFutureJVppInvoker.CompletableDumpFuture<$plugin_package.$dto_package.$callback_dto_reply_dump> completableFuture; final int replyId = reply.context; if (LOG.isLoggable(java.util.logging.Level.FINE)) { LOG.fine(String.format("Received $callback_dto event message: %s", reply)); } synchronized(requests) { completableFuture = ($base_package.$future_package.AbstractFutureJVppInvoker.CompletableDumpFuture<$plugin_package.$dto_package.$callback_dto_reply_dump>) requests.get(replyId); - } - if(completableFuture != null) { + if(completableFuture == null) { + // reply received before writer created future, + // create new future, and put into map to notify sender that reply is already received, + // following details replies will add information to this future + completableFuture = new $base_package.$future_package.AbstractFutureJVppInvoker.CompletableDumpFuture<>(replyId, + new $plugin_package.$dto_package.$callback_dto_reply_dump()); + requests.put(replyId, completableFuture); + } completableFuture.getReplyDump().$callback_dto_field.add(reply); } } @@ -165,6 +179,7 @@ def generate_jvpp(func_list, base_package, plugin_package, plugin_name, dto_pack if not util.is_notification(func["name"]): camel_case_request_method_name = util.remove_reply_suffix(util.underscore_to_camelcase(func['name'])) + request_dto = util.remove_reply_suffix(util.underscore_to_camelcase_upper(func['name'])) if util.is_details(camel_case_name_with_suffix): camel_case_reply_name = get_standard_dump_reply_name(util.underscore_to_camelcase_upper(func['name']), func['name']) @@ -208,7 +223,8 @@ def generate_jvpp(func_list, base_package, plugin_package, plugin_name, dto_pack callbacks.append(jvpp_facade_callback_method_template.substitute(base_package=base_package, plugin_package=plugin_package, dto_package=dto_package, - callback_dto=camel_case_name_with_suffix)) + callback_dto=camel_case_name_with_suffix, + request_dto=request_dto)) if util.is_notification(func["name"]): callbacks.append(jvpp_facade_callback_notification_method_template.substitute(plugin_package=plugin_package, -- 2.16.6