HONEYCOMB-130: Separate v3po plugin from HC infra
[honeycomb.git] / infra / translate-utils / src / main / java / io / fd / honeycomb / v3po / translate / util / write / registry / FlatWriterRegistry.java
1 /*
2  * Copyright (c) 2016 Cisco and/or its affiliates.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at:
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package io.fd.honeycomb.v3po.translate.util.write.registry;
18
19 import static com.google.common.base.Preconditions.checkArgument;
20
21 import com.google.common.base.Optional;
22 import com.google.common.collect.HashMultimap;
23 import com.google.common.collect.ImmutableMap;
24 import com.google.common.collect.ImmutableSet;
25 import com.google.common.collect.Lists;
26 import com.google.common.collect.Multimap;
27 import com.google.common.collect.Sets;
28 import io.fd.honeycomb.v3po.translate.TranslationException;
29 import io.fd.honeycomb.v3po.translate.util.RWUtils;
30 import io.fd.honeycomb.v3po.translate.write.DataObjectUpdate;
31 import io.fd.honeycomb.v3po.translate.write.WriteContext;
32 import io.fd.honeycomb.v3po.translate.write.WriteFailedException;
33 import io.fd.honeycomb.v3po.translate.write.Writer;
34 import io.fd.honeycomb.v3po.translate.write.registry.WriterRegistry;
35 import java.util.Collection;
36 import java.util.Collections;
37 import java.util.HashSet;
38 import java.util.Map;
39 import java.util.Set;
40 import java.util.stream.Collectors;
41 import javax.annotation.Nonnull;
42 import javax.annotation.Nullable;
43 import javax.annotation.concurrent.ThreadSafe;
44 import org.opendaylight.yangtools.yang.binding.DataObject;
45 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48
49 /**
50  * Flat writer registry, delegating updates to writers in the order writers were submitted.
51  */
52 @ThreadSafe
53 final class FlatWriterRegistry implements WriterRegistry {
54
55     private static final Logger LOG = LoggerFactory.getLogger(FlatWriterRegistry.class);
56
57     // All types handled by writers directly or as children
58     private final ImmutableSet<InstanceIdentifier<?>> handledTypes;
59
60     private final Set<InstanceIdentifier<?>> writersOrderReversed;
61     private final Set<InstanceIdentifier<?>> writersOrder;
62     private final Map<InstanceIdentifier<?>, Writer<?>> writers;
63
64     /**
65      * Create flat registry instance.
66      *
67      * @param writers immutable, ordered map of writers to use to process updates. Order of the writers has to be
68      *                one in which create and update operations should be handled. Deletes will be handled in reversed
69      *                order. All deletes are handled before handling all the updates.
70      */
71     FlatWriterRegistry(@Nonnull final ImmutableMap<InstanceIdentifier<?>, Writer<?>> writers) {
72         this.writers = writers;
73         this.writersOrderReversed = Sets.newLinkedHashSet(Lists.reverse(Lists.newArrayList(writers.keySet())));
74         this.writersOrder = writers.keySet();
75         this.handledTypes = getAllHandledTypes(writers);
76     }
77
78     private static ImmutableSet<InstanceIdentifier<?>> getAllHandledTypes(
79             @Nonnull final ImmutableMap<InstanceIdentifier<?>, Writer<?>> writers) {
80         final ImmutableSet.Builder<InstanceIdentifier<?>> handledTypesBuilder = ImmutableSet.builder();
81         for (Map.Entry<InstanceIdentifier<?>, Writer<?>> writerEntry : writers.entrySet()) {
82             final InstanceIdentifier<?> writerType = writerEntry.getKey();
83             final Writer<?> writer = writerEntry.getValue();
84             handledTypesBuilder.add(writerType);
85             if (writer instanceof SubtreeWriter) {
86                 handledTypesBuilder.addAll(((SubtreeWriter<?>) writer).getHandledChildTypes());
87             }
88         }
89         return handledTypesBuilder.build();
90     }
91
92     @Override
93     public void update(@Nonnull final DataObjectUpdates updates,
94                        @Nonnull final WriteContext ctx) throws TranslationException {
95         if (updates.isEmpty()) {
96             return;
97         }
98
99         // Optimization
100         if (updates.containsOnlySingleType()) {
101             // First process delete
102             singleUpdate(updates.getDeletes(), ctx);
103             // Next is update
104             singleUpdate(updates.getUpdates(), ctx);
105         } else {
106             // First process deletes
107             bulkUpdate(updates.getDeletes(), ctx, true, writersOrderReversed);
108             // Next are updates
109             bulkUpdate(updates.getUpdates(), ctx, true, writersOrder);
110         }
111
112         LOG.debug("Update successful for types: {}", updates.getTypeIntersection());
113         LOG.trace("Update successful for: {}", updates);
114     }
115
116     private void singleUpdate(@Nonnull final Multimap<InstanceIdentifier<?>, ? extends DataObjectUpdate> updates,
117                               @Nonnull final WriteContext ctx) throws WriteFailedException {
118         if (updates.isEmpty()) {
119             return;
120         }
121
122         final InstanceIdentifier<?> singleType = updates.keySet().iterator().next();
123         LOG.debug("Performing single type update for: {}", singleType);
124         Collection<? extends DataObjectUpdate> singleTypeUpdates = updates.get(singleType);
125         Writer<?> writer = getWriter(singleType);
126
127         if (writer == null) {
128             // This node must be handled by a subtree writer, find it and call it or else fail
129             checkArgument(handledTypes.contains(singleType), "Unable to process update. Missing writers for: %s",
130                     singleType);
131             writer = getSubtreeWriterResponsible(singleType);
132             singleTypeUpdates = getParentDataObjectUpdate(ctx, updates, writer);
133         }
134
135         LOG.trace("Performing single type update with writer: {}", writer);
136         for (DataObjectUpdate singleUpdate : singleTypeUpdates) {
137             writer.update(singleUpdate.getId(), singleUpdate.getDataBefore(), singleUpdate.getDataAfter(), ctx);
138         }
139     }
140
141     private Writer<?> getSubtreeWriterResponsible(final InstanceIdentifier<?> singleType) {
142         final Writer<?> writer;// This is slow ( minor TODO-perf )
143         writer = writers.values().stream()
144                 .filter(w -> w instanceof SubtreeWriter)
145                 .filter(w -> ((SubtreeWriter<?>) w).getHandledChildTypes().contains(singleType))
146                 .findFirst()
147                 .get();
148         return writer;
149     }
150
151     private Collection<DataObjectUpdate> getParentDataObjectUpdate(final WriteContext ctx,
152                                                                    final Multimap<InstanceIdentifier<?>, ? extends DataObjectUpdate> updates,
153                                                                    final Writer<?> writer) {
154         // Now read data for subtree reader root, but first keyed ID is needed and that ID can be cut from updates
155         InstanceIdentifier<?> firstAffectedChildId = ((SubtreeWriter<?>) writer).getHandledChildTypes().stream()
156                 .filter(updates::containsKey)
157                 .map(unkeyedId -> updates.get(unkeyedId))
158                 .flatMap(doUpdates -> doUpdates.stream())
159                 .map(DataObjectUpdate::getId)
160                 .findFirst()
161                 .get();
162
163         final InstanceIdentifier<?> parentKeyedId =
164                 RWUtils.cutId(firstAffectedChildId, writer.getManagedDataObjectType());
165
166         final Optional<? extends DataObject> parentBefore = ctx.readBefore(parentKeyedId);
167         final Optional<? extends DataObject> parentAfter = ctx.readAfter(parentKeyedId);
168         return Collections.singleton(
169                 DataObjectUpdate.create(parentKeyedId, parentBefore.orNull(), parentAfter.orNull()));
170     }
171
172     private void bulkUpdate(@Nonnull final Multimap<InstanceIdentifier<?>, ? extends DataObjectUpdate> updates,
173                             @Nonnull final WriteContext ctx,
174                             final boolean attemptRevert,
175                             @Nonnull final Set<InstanceIdentifier<?>> writersOrder) throws BulkUpdateException {
176         if (updates.isEmpty()) {
177             return;
178         }
179
180         LOG.debug("Performing bulk update with revert attempt: {} for: {}", attemptRevert, updates.keySet());
181
182         // Check that all updates can be handled
183         checkAllTypesCanBeHandled(updates);
184
185         // Capture all changes successfully processed in case revert is needed
186         final Set<InstanceIdentifier<?>> processedNodes = new HashSet<>();
187
188         // Iterate over all writers and call update if there are any related updates
189         for (InstanceIdentifier<?> writerType : writersOrder) {
190             Collection<? extends DataObjectUpdate> writersData = updates.get(writerType);
191             final Writer<?> writer = getWriter(writerType);
192
193             if (writersData.isEmpty()) {
194                 // If there are no data for current writer, but it is a SubtreeWriter and there are updates to
195                 // its children, still invoke it with its root data
196                 if (writer instanceof SubtreeWriter<?> && isAffected(((SubtreeWriter<?>) writer), updates)) {
197                     // Provide parent data for SubtreeWriter for further processing
198                     writersData = getParentDataObjectUpdate(ctx, updates, writer);
199                 } else {
200                     // Skipping unaffected writer
201                     // Alternative to this would be modification sort according to the order of writers
202                     continue;
203                 }
204             }
205
206             LOG.debug("Performing update for: {}",  writerType);
207             LOG.trace("Performing update with writer: {}", writer);
208
209             for (DataObjectUpdate singleUpdate : writersData) {
210                 try {
211                     writer.update(singleUpdate.getId(), singleUpdate.getDataBefore(), singleUpdate.getDataAfter(), ctx);
212                     processedNodes.add(singleUpdate.getId());
213                     LOG.trace("Update successful for type: {}", writerType);
214                     LOG.debug("Update successful for: {}", singleUpdate);
215                 } catch (Exception e) {
216                     LOG.error("Error while processing data change of: {} (updates={})", writerType, writersData, e);
217
218                     final Reverter reverter = attemptRevert
219                             ? new ReverterImpl(processedNodes, updates, writersOrder, ctx)
220                             : () -> {}; // NOOP reverter
221
222                     // Find out which changes left unprocessed
223                     final Set<InstanceIdentifier<?>> unprocessedChanges = updates.values().stream()
224                             .map(DataObjectUpdate::getId)
225                             .filter(id -> !processedNodes.contains(id))
226                             .collect(Collectors.toSet());
227                     throw new BulkUpdateException(unprocessedChanges, reverter, e);
228                 }
229             }
230         }
231     }
232
233     private void checkAllTypesCanBeHandled(
234             @Nonnull final Multimap<InstanceIdentifier<?>, ? extends DataObjectUpdate> updates) {
235         if (!handledTypes.containsAll(updates.keySet())) {
236             final Sets.SetView<InstanceIdentifier<?>> missingWriters = Sets.difference(updates.keySet(), handledTypes);
237             LOG.warn("Unable to process update. Missing writers for: {}", missingWriters);
238             throw new IllegalArgumentException("Unable to process update. Missing writers for: " + missingWriters);
239         }
240     }
241
242     /**
243      * Check whether {@link SubtreeWriter} is affected by the updates.
244      *
245      * @return true if there are any updates to SubtreeWriter's child nodes (those marked by SubtreeWriter
246      *         as being taken care of)
247      * */
248     private static boolean isAffected(final SubtreeWriter<?> writer,
249                                final Multimap<InstanceIdentifier<?>, ? extends DataObjectUpdate> updates) {
250         return !Sets.intersection(writer.getHandledChildTypes(), updates.keySet()).isEmpty();
251     }
252
253     @Nullable
254     private Writer<?> getWriter(@Nonnull final InstanceIdentifier<?> singleType) {
255         return writers.get(singleType);
256     }
257
258     // FIXME unit test
259     private final class ReverterImpl implements Reverter {
260
261         private final Collection<InstanceIdentifier<?>> processedNodes;
262         private final Multimap<InstanceIdentifier<?>, ? extends DataObjectUpdate> updates;
263         private final Set<InstanceIdentifier<?>> revertDeleteOrder;
264         private final WriteContext ctx;
265
266         ReverterImpl(final Collection<InstanceIdentifier<?>> processedNodes,
267                      final Multimap<InstanceIdentifier<?>, ? extends DataObjectUpdate> updates,
268                      final Set<InstanceIdentifier<?>> writersOrderOriginal,
269                      final WriteContext ctx) {
270             this.processedNodes = processedNodes;
271             this.updates = updates;
272             // Use opposite ordering when executing revert
273             this.revertDeleteOrder =  writersOrderOriginal == FlatWriterRegistry.this.writersOrder
274                     ? FlatWriterRegistry.this.writersOrderReversed
275                     : FlatWriterRegistry.this.writersOrder;
276             this.ctx = ctx;
277         }
278
279         @Override
280         public void revert() throws RevertFailedException {
281             Multimap<InstanceIdentifier<?>, DataObjectUpdate> updatesToRevert =
282                     filterAndRevertProcessed(updates, processedNodes);
283
284             LOG.info("Attempting revert for changes: {}", updatesToRevert);
285             try {
286                 // Perform reversed bulk update without revert attempt
287                 bulkUpdate(updatesToRevert, ctx, true, revertDeleteOrder);
288                 LOG.info("Revert successful");
289             } catch (BulkUpdateException e) {
290                 LOG.error("Revert failed", e);
291                 throw new RevertFailedException(e.getFailedIds(), e);
292             }
293         }
294
295         /**
296          * Create new updates map, but only keep already processed changes. Switching before and after data for each
297          * update.
298          */
299         private Multimap<InstanceIdentifier<?>, DataObjectUpdate> filterAndRevertProcessed(
300                 final Multimap<InstanceIdentifier<?>, ? extends DataObjectUpdate> updates,
301                 final Collection<InstanceIdentifier<?>> processedNodes) {
302             final Multimap<InstanceIdentifier<?>, DataObjectUpdate> filtered = HashMultimap.create();
303             for (InstanceIdentifier<?> processedNode : processedNodes) {
304                 final InstanceIdentifier<?> wildcardedIid = RWUtils.makeIidWildcarded(processedNode);
305                 if (updates.containsKey(wildcardedIid)) {
306                     updates.get(wildcardedIid).stream()
307                             .filter(dataObjectUpdate -> processedNode.contains(dataObjectUpdate.getId()))
308                             .forEach(dataObjectUpdate -> filtered.put(processedNode, dataObjectUpdate.reverse()));
309                 }
310             }
311             return filtered;
312         }
313     }
314
315 }