/*
 * Copyright (c) 2016 Cisco and/or its affiliates.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.fd.honeycomb.v3po.translate.util.write.registry;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.base.Optional;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import io.fd.honeycomb.v3po.translate.TranslationException;
import io.fd.honeycomb.v3po.translate.util.RWUtils;
import io.fd.honeycomb.v3po.translate.write.DataObjectUpdate;
import io.fd.honeycomb.v3po.translate.write.WriteContext;
import io.fd.honeycomb.v3po.translate.write.WriteFailedException;
import io.fd.honeycomb.v3po.translate.write.Writer;
import io.fd.honeycomb.v3po.translate.write.WriterRegistry;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.opendaylight.yangtools.yang.binding.DataObject;
import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Flat writer registry, delegating updates to writers in the order in which the writers were submitted.
 */
@ThreadSafe
final class FlatWriterRegistry implements WriterRegistry {

    private static final Logger LOG = LoggerFactory.getLogger(FlatWriterRegistry.class);

    // All types handled by writers directly or as children
    private final ImmutableSet<InstanceIdentifier<?>> handledTypes;

    private final Set<InstanceIdentifier<?>> writersOrderReversed;
    private final Set<InstanceIdentifier<?>> writersOrder;
    private final Map<InstanceIdentifier<?>, Writer<?>> writers;

    /**
     * Create a flat registry instance.
     *
     * @param writers immutable, ordered map of the writers used to process updates. The order of the writers must be
     *                the one in which create and update operations should be handled. Deletes are handled in reversed
     *                order. All deletes are handled before any of the updates.
     */
    FlatWriterRegistry(@Nonnull final ImmutableMap<InstanceIdentifier<?>, Writer<?>> writers) {
        this.writers = writers;
        this.writersOrderReversed = Sets.newLinkedHashSet(Lists.reverse(Lists.newArrayList(writers.keySet())));
        this.writersOrder = writers.keySet();
        this.handledTypes = getAllHandledTypes(writers);
    }

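    // Illustrative construction (a sketch only; Interfaces/Routing and the writer instances are hypothetical,
    // not part of this class). ImmutableMap preserves insertion order, which defines the processing order:
    //
    //   final ImmutableMap<InstanceIdentifier<?>, Writer<?>> writerMap = ImmutableMap.of(
    //           InstanceIdentifier.create(Interfaces.class), interfacesWriter, // creates/updates handled first
    //           InstanceIdentifier.create(Routing.class), routingWriter);      // creates/updates handled second
    //   final FlatWriterRegistry registry = new FlatWriterRegistry(writerMap);
    //   // Deletes run in reverse: routingWriter first, then interfacesWriter.
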
    private static ImmutableSet<InstanceIdentifier<?>> getAllHandledTypes(
            @Nonnull final ImmutableMap<InstanceIdentifier<?>, Writer<?>> writers) {
        final ImmutableSet.Builder<InstanceIdentifier<?>> handledTypesBuilder = ImmutableSet.builder();
        for (Map.Entry<InstanceIdentifier<?>, Writer<?>> writerEntry : writers.entrySet()) {
            final InstanceIdentifier<?> writerType = writerEntry.getKey();
            final Writer<?> writer = writerEntry.getValue();
            handledTypesBuilder.add(writerType);
            if (writer instanceof SubtreeWriter) {
                // A subtree writer also covers all of its declared child types
                handledTypesBuilder.addAll(((SubtreeWriter<?>) writer).getHandledChildTypes());
            }
        }
        return handledTypesBuilder.build();
    }

    @Override
    public void update(@Nonnull final InstanceIdentifier<? extends DataObject> id,
                       @Nullable final DataObject dataBefore,
                       @Nullable final DataObject dataAfter,
                       @Nonnull final WriteContext ctx) throws WriteFailedException {
        singleUpdate(ImmutableMultimap.of(
                RWUtils.makeIidWildcarded(id), DataObjectUpdate.create(id, dataBefore, dataAfter)), ctx);
    }

    @Override
    public void update(@Nonnull final DataObjectUpdates updates,
                       @Nonnull final WriteContext ctx) throws TranslationException {
        if (updates.isEmpty()) {
            return;
        }

        // Single-type updates can be processed without the bulk machinery
        if (updates.containsOnlySingleType()) {
            // First process deletes
            singleUpdate(updates.getDeletes(), ctx);
            // Then process updates
            singleUpdate(updates.getUpdates(), ctx);
        } else {
            // First process deletes
            bulkUpdate(updates.getDeletes(), ctx, true, writersOrderReversed);
            // Then process updates
            bulkUpdate(updates.getUpdates(), ctx, true, writersOrder);
        }

        LOG.debug("Update successful for types: {}", updates.getTypeIntersection());
        LOG.trace("Update successful for: {}", updates);
    }

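    // Example of the two-phase processing above (illustrative): given writers registered in order [A, B]
    // and a transaction containing a delete under A and an update under B:
    //
    //   1. bulkUpdate(deletes, ..., writersOrderReversed)  ->  iterates [B, A], applies A's delete
    //   2. bulkUpdate(updates, ..., writersOrder)          ->  iterates [A, B], applies B's update
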
    private void singleUpdate(@Nonnull final Multimap<InstanceIdentifier<?>, ? extends DataObjectUpdate> updates,
                              @Nonnull final WriteContext ctx) throws WriteFailedException {
        if (updates.isEmpty()) {
            return;
        }

        final InstanceIdentifier<?> singleType = updates.keySet().iterator().next();
        LOG.debug("Performing single type update for: {}", singleType);
        Collection<? extends DataObjectUpdate> singleTypeUpdates = updates.get(singleType);
        // Deliberately not using getWriter() here: a missing direct writer is legal for subtree children
        Writer<?> writer = writers.get(singleType);

        if (writer == null) {
            // This node must be handled by a subtree writer, find it and call it or else fail
            checkArgument(handledTypes.contains(singleType), "Unable to process update. Missing writers for: %s",
                    singleType);
            writer = getSubtreeWriterResponsible(singleType);
            singleTypeUpdates = getParentDataObjectUpdate(ctx, updates, writer);
        }

        LOG.trace("Performing single type update with writer: {}", writer);
        for (DataObjectUpdate singleUpdate : singleTypeUpdates) {
            writer.update(singleUpdate.getId(), singleUpdate.getDataBefore(), singleUpdate.getDataAfter(), ctx);
        }
    }

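    // Illustrative subtree fallback (hypothetical paths): an update arriving only for /parent/child,
    // where /child has no direct writer but a SubtreeWriter is registered for /parent, is replaced
    // above by a single update of /parent, with its data read from the WriteContext.
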
    private Writer<?> getSubtreeWriterResponsible(final InstanceIdentifier<?> singleType) {
        // Linear scan over all writers is slow (minor TODO-perf)
        return writers.values().stream()
                .filter(w -> w instanceof SubtreeWriter)
                .filter(w -> ((SubtreeWriter<?>) w).getHandledChildTypes().contains(singleType))
                .findFirst()
                .get();
    }

    private Collection<DataObjectUpdate> getParentDataObjectUpdate(final WriteContext ctx,
                                                                   final Multimap<InstanceIdentifier<?>, ? extends DataObjectUpdate> updates,
                                                                   final Writer<?> writer) {
        // Data for the subtree writer root needs to be read, which requires a keyed ID. Cut it from the updates
        final InstanceIdentifier<?> firstAffectedChildId = ((SubtreeWriter<?>) writer).getHandledChildTypes().stream()
                .filter(updates::containsKey)
                .map(unkeyedId -> updates.get(unkeyedId))
                .flatMap(doUpdates -> doUpdates.stream())
                .map(DataObjectUpdate::getId)
                .findFirst()
                .get();

        final InstanceIdentifier<?> parentKeyedId =
                RWUtils.cutId(firstAffectedChildId, writer.getManagedDataObjectType());

        final Optional<? extends DataObject> parentBefore = ctx.readBefore(parentKeyedId);
        final Optional<? extends DataObject> parentAfter = ctx.readAfter(parentKeyedId);
        return Collections.singleton(
                DataObjectUpdate.create(parentKeyedId, parentBefore.orNull(), parentAfter.orNull()));
    }

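    // Sketch of the ID cutting above (hypothetical YANG paths; assumes RWUtils.cutId trims a keyed
    // child ID down to the writer's managed root type):
    //
    //   firstAffectedChildId: /interfaces/interface[name=eth0]/some-child
    //   managed root type:    /interfaces/interface
    //   parentKeyedId:        /interfaces/interface[name=eth0]   (keyed, so readable from ctx)
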
    private void bulkUpdate(@Nonnull final Multimap<InstanceIdentifier<?>, ? extends DataObjectUpdate> updates,
                            @Nonnull final WriteContext ctx,
                            final boolean attemptRevert,
                            @Nonnull final Set<InstanceIdentifier<?>> writersOrder) throws BulkUpdateException {
        if (updates.isEmpty()) {
            return;
        }

        LOG.debug("Performing bulk update with revert attempt: {} for: {}", attemptRevert, updates.keySet());

        // Check that all updates can be handled
        checkAllTypesCanBeHandled(updates);

        // Capture all changes successfully processed in case a revert is needed
        final Set<InstanceIdentifier<?>> processedNodes = new HashSet<>();

        // Iterate over all writers and call update if there are any related updates
        for (InstanceIdentifier<?> writerType : writersOrder) {
            Collection<? extends DataObjectUpdate> writersData = updates.get(writerType);
            final Writer<?> writer = getWriter(writerType);

            if (writersData.isEmpty()) {
                // If there is no data for the current writer, but it is a SubtreeWriter and there are updates for
                // its children, invoke it with its root data anyway
                if (writer instanceof SubtreeWriter<?> && isAffected((SubtreeWriter<?>) writer, updates)) {
                    // Provide parent data for the SubtreeWriter for further processing
                    writersData = getParentDataObjectUpdate(ctx, updates, writer);
                } else {
                    // Skip the unaffected writer
                    // An alternative would be to sort the modifications according to the order of the writers
                    continue;
                }
            }

            LOG.debug("Performing update for: {}", writerType);
            LOG.trace("Performing update with writer: {}", writer);

            for (DataObjectUpdate singleUpdate : writersData) {
                try {
                    writer.update(singleUpdate.getId(), singleUpdate.getDataBefore(), singleUpdate.getDataAfter(), ctx);
                    processedNodes.add(singleUpdate.getId());
                    LOG.trace("Update successful for type: {}", writerType);
                    LOG.debug("Update successful for: {}", singleUpdate);
                } catch (Exception e) {
                    LOG.error("Error while processing data change of: {} (updates={})", writerType, writersData, e);

                    final Reverter reverter = attemptRevert
                            ? new ReverterImpl(processedNodes, updates, writersOrder, ctx)
                            : () -> {}; // NOOP reverter

                    // Find out which changes were left unprocessed
                    final Set<InstanceIdentifier<?>> unprocessedChanges = updates.values().stream()
                            .map(DataObjectUpdate::getId)
                            .filter(id -> !processedNodes.contains(id))
                            .collect(Collectors.toSet());
                    throw new BulkUpdateException(unprocessedChanges, reverter, e);
                }
            }
        }
    }

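    // Illustrative caller-side handling of a failed bulk update (the revertChanges() accessor is
    // hypothetical; any mechanism exposing the Reverter carried by the exception would do):
    //
    //   try {
    //       registry.update(updates, ctx);
    //   } catch (WriterRegistry.BulkUpdateException e) {
    //       e.revertChanges(); // delegates to the Reverter captured at failure time
    //   }
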
    private void checkAllTypesCanBeHandled(
            @Nonnull final Multimap<InstanceIdentifier<?>, ? extends DataObjectUpdate> updates) {
        if (!handledTypes.containsAll(updates.keySet())) {
            final Sets.SetView<InstanceIdentifier<?>> missingWriters = Sets.difference(updates.keySet(), handledTypes);
            LOG.warn("Unable to process update. Missing writers for: {}", missingWriters);
            throw new IllegalArgumentException("Unable to process update. Missing writers for: " + missingWriters);
        }
    }

    /**
     * Check whether a {@link SubtreeWriter} is affected by the updates.
     *
     * @return true if there are any updates to the SubtreeWriter's child nodes (those marked by the SubtreeWriter
     *         as being taken care of)
     */
    private static boolean isAffected(final SubtreeWriter<?> writer,
                                      final Multimap<InstanceIdentifier<?>, ? extends DataObjectUpdate> updates) {
        return !Sets.intersection(writer.getHandledChildTypes(), updates.keySet()).isEmpty();
    }

    private Writer<?> getWriter(@Nonnull final InstanceIdentifier<?> singleType) {
        final Writer<?> writer = writers.get(singleType);
        checkNotNull(writer,
                "Unable to write %s. Missing writer. Current writers for: %s", singleType, writers.keySet());
        return writer;
    }

    @Override
    public InstanceIdentifier<DataObject> getManagedDataObjectType() {
        throw new UnsupportedOperationException("Registry has no managed type");
    }

    private final class ReverterImpl implements Reverter {

        private final Collection<InstanceIdentifier<?>> processedNodes;
        private final Multimap<InstanceIdentifier<?>, ? extends DataObjectUpdate> updates;
        private final Set<InstanceIdentifier<?>> revertDeleteOrder;
        private final WriteContext ctx;

        ReverterImpl(final Collection<InstanceIdentifier<?>> processedNodes,
                     final Multimap<InstanceIdentifier<?>, ? extends DataObjectUpdate> updates,
                     final Set<InstanceIdentifier<?>> writersOrderOriginal,
                     final WriteContext ctx) {
            this.processedNodes = processedNodes;
            this.updates = updates;
            // Use the opposite ordering when executing the revert
            this.revertDeleteOrder = writersOrderOriginal == FlatWriterRegistry.this.writersOrder
                    ? FlatWriterRegistry.this.writersOrderReversed
                    : FlatWriterRegistry.this.writersOrder;
            this.ctx = ctx;
        }

        @Override
        public void revert() throws RevertFailedException {
            final Multimap<InstanceIdentifier<?>, DataObjectUpdate> updatesToRevert =
                    filterAndRevertProcessed(updates, processedNodes);

            LOG.info("Attempting revert for changes: {}", updatesToRevert);
            try {
                // Perform the reverting bulk update without another revert attempt
                bulkUpdate(updatesToRevert, ctx, false, revertDeleteOrder); // false: do not revert a failed revert
                LOG.info("Revert successful");
            } catch (BulkUpdateException e) {
                LOG.error("Revert failed", e);
                throw new RevertFailedException(e.getFailedIds(), e);
            }
        }

        /**
         * Create a new updates map, keeping only the changes that were already processed and switching the before
         * and after data of each update, so that applying the result rolls the changes back.
         */
        private Multimap<InstanceIdentifier<?>, DataObjectUpdate> filterAndRevertProcessed(
                final Multimap<InstanceIdentifier<?>, ? extends DataObjectUpdate> updates,
                final Collection<InstanceIdentifier<?>> processedNodes) {
            final Multimap<InstanceIdentifier<?>, DataObjectUpdate> filtered = HashMultimap.create();
            for (InstanceIdentifier<?> processedNode : processedNodes) {
                final InstanceIdentifier<?> wildcardedIid = RWUtils.makeIidWildcarded(processedNode);
                if (updates.containsKey(wildcardedIid)) {
                    updates.get(wildcardedIid).stream()
                            .filter(dataObjectUpdate -> processedNode.contains(dataObjectUpdate.getId()))
                            // reverse() swaps dataBefore and dataAfter, e.g. a processed create becomes a delete
                            .forEach(dataObjectUpdate -> filtered.put(processedNode, dataObjectUpdate.reverse()));