diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java index f970ef27ea9..00a6a32f4e9 100644 --- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java +++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/CaptureLoad.java @@ -96,7 +96,7 @@ static TopologyLoadConf captureTopology(Nimbus.Iface client, TopologySummary top Map savedTopoConf = new HashMap<>(); Map topoConf = (Map) JSONValue.parse(client.getTopologyConf(topologyId)); - for (String key: TopologyLoadConf.IMPORTANT_CONF_KEYS) { + for (String key : TopologyLoadConf.IMPORTANT_CONF_KEYS) { Object o = topoConf.get(key); if (o != null) { savedTopoConf.put(key, o); @@ -153,7 +153,7 @@ static TopologyLoadConf captureTopology(Nimbus.Iface client, TopologySummary top } Map> boltResources = getBoltsResources(topo, topoConf); - for (Map.Entry> entry: boltResources.entrySet()) { + for (Map.Entry> entry : boltResources.entrySet()) { LoadCompConf.Builder bd = boltBuilders.get(entry.getKey()); if (bd != null) { Map resources = entry.getValue(); @@ -194,7 +194,7 @@ static TopologyLoadConf captureTopology(Nimbus.Iface client, TopologySummary top } Map> spoutResources = getSpoutsResources(topo, topoConf); - for (Map.Entry> entry: spoutResources.entrySet()) { + for (Map.Entry> entry : spoutResources.entrySet()) { LoadCompConf.Builder sd = spoutBuilders.get(entry.getKey()); if (sd != null) { Map resources = entry.getValue(); @@ -212,7 +212,7 @@ static TopologyLoadConf captureTopology(Nimbus.Iface client, TopologySummary top //Stats... Map> byComponent = new HashMap<>(); - for (ExecutorSummary executor: info.get_executors()) { + for (ExecutorSummary executor : info.get_executors()) { String component = executor.get_component_id(); List list = byComponent.get(component); if (list == null) { @@ -250,7 +250,7 @@ static TopologyLoadConf captureTopology(Nimbus.Iface client, TopologySummary top List emittedRate = new ArrayList<>(); List summaries = byComponent.get(id.get_componentId()); if (summaries != null) { - for (ExecutorSummary summary: summaries) { + for (ExecutorSummary summary : summaries) { if (summary.is_set_stats()) { int uptime = summary.get_uptime_secs(); LOG.debug("UPTIME {}", uptime); @@ -349,7 +349,7 @@ public static void main(String[] args) throws Exception { Nimbus.Iface client = nc.getClient(); List topologyNames = cmd.getArgList(); - for (TopologySummary topologySummary: client.getTopologySummaries()) { + for (TopologySummary topologySummary : client.getTopologySummaries()) { if (topologyNames.isEmpty() || topologyNames.contains(topologySummary.get_name())) { TopologyLoadConf capturedConf = captureTopology(client, topologySummary); if (cmd.hasOption('a')) { diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/EstimateThroughput.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/EstimateThroughput.java index aee99bf225a..2b1570d957e 100644 --- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/EstimateThroughput.java +++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/EstimateThroughput.java @@ -77,7 +77,7 @@ public static void main(String[] args) throws Exception { Nimbus.Iface client = nc.getClient(); List topologyNames = cmd.getArgList(); - for (TopologySummary topologySummary: client.getTopologySummaries()) { + for (TopologySummary topologySummary : client.getTopologySummaries()) { if (topologyNames.isEmpty() || topologyNames.contains(topologySummary.get_name())) { TopologyLoadConf capturedConf = CaptureLoad.captureTopology(client, topologySummary); if (capturedConf.looksLikeTrident()) { @@ -89,10 +89,10 @@ public static void main(String[] args) throws Exception { } System.out.println("TOPOLOGY\tTOTAL MESSAGES/sec\tESTIMATED INPUT MESSAGES/sec"); - for (TopologyLoadConf tl: regular) { + for (TopologyLoadConf tl : regular) { System.out.println(tl.name + "\t" + tl.getAllEmittedAggregate() + "\t" + tl.getSpoutEmittedAggregate()); } - for (TopologyLoadConf tl: trident) { + for (TopologyLoadConf tl : trident) { System.out.println(tl.name + "\t" + tl.getAllEmittedAggregate() + "\t" + tl.getTridentEstimatedEmittedAggregate()); } exitStatus = 0; diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadBolt.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadBolt.java index 7eb2b73fdc2..06f1e5a25ac 100644 --- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadBolt.java +++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadBolt.java @@ -67,7 +67,7 @@ public void prepare(Map topoConf, TopologyContext context, Outpu } private void emitTuples(Tuple input) { - for (OutputStreamEngine se: outputStreams) { + for (OutputStreamEngine se : outputStreams) { // we may output many tuples for a given input tuple while (se.shouldEmit() != null) { collector.emit(se.streamName, input, new Values(se.nextKey(), "SOME-BOLT-VALUE")); @@ -87,7 +87,7 @@ public void execute(final Tuple input) { @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { - for (OutputStream s: outputStreamStats) { + for (OutputStream s : outputStreamStats) { declarer.declareStream(s.id, new Fields("key", "value")); } } diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadCompConf.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadCompConf.java index 724083854f2..33460a8d882 100644 --- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadCompConf.java +++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadCompConf.java @@ -48,7 +48,7 @@ public static LoadCompConf fromConf(Map conf) { List streams = new ArrayList<>(); List> streamData = (List>) conf.get("streams"); if (streamData != null) { - for (Map streamInfo: streamData) { + for (Map streamInfo : streamData) { streams.add(OutputStream.fromConf(streamInfo)); } } @@ -161,7 +161,7 @@ public LoadCompConf overrideSlowExecutorPattern(SlowExecutorPattern slp) { public double getAllEmittedAggregate() { double ret = 0; if (streams != null) { - for (OutputStream out: streams) { + for (OutputStream out : streams) { if (out.rate != null) { ret += out.rate.mean * parallelism; } diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java index 2fb69a21bcd..f02c407093a 100644 --- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java +++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadMetricsServer.java @@ -297,7 +297,7 @@ static Measurements combine(List measurements, Integer start, Inte count = Math.min(count, measurements.size() - start); Measurements ret = new Measurements(); - for (int i = start; i < start + count; i ++) { + for (int i = start; i < start + count; i++) { ret.add(measurements.get(i)); } return ret; @@ -534,7 +534,7 @@ abstract static class ColumnsFileReporter extends FileReporter { if (query.containsKey("extraColumns")) { List moreExtractors = handleExtractorCleanup(Arrays.asList(query.get("extraColumns").split("\\s*,\\s*"))); - for (String extractor: moreExtractors) { + for (String extractor : moreExtractors) { if (!allExtractors.containsKey(extractor)) { throw new IllegalArgumentException(extractor + " is not a supported column"); } @@ -557,7 +557,7 @@ abstract static class ColumnsFileReporter extends FileReporter { protected List handleExtractorCleanup(List orig) { Map stormConfig = Utils.readStormConfig(); List ret = new ArrayList<>(orig.size()); - for (String extractor: orig) { + for (String extractor : orig) { if (extractor.startsWith("conf:")) { String confKey = extractor.substring("conf:".length()); Object confValue = stormConfig.get(confKey); @@ -617,7 +617,7 @@ protected String format(Object o) { @Override public void start() { boolean first = true; - for (String name: extractors) { + for (String name : extractors) { if (!first) { out.print(" "); } @@ -634,7 +634,7 @@ public void start() { @Override public void reportWindow(Measurements m, List allTime) { boolean first = true; - for (String name: extractors) { + for (String name : extractors) { if (!first) { out.print(" "); } @@ -661,7 +661,7 @@ static class SepValReporter extends ColumnsFileReporter { @Override public void start() { boolean first = true; - for (String name: extractors) { + for (String name : extractors) { if (!first) { out.print(separator); } @@ -678,7 +678,7 @@ public void start() { @Override public void reportWindow(Measurements m, List allTime) { boolean first = true; - for (String name: extractors) { + for (String name : extractors) { if (!first) { out.print(separator); } @@ -812,7 +812,7 @@ public static void addCommandLineOptions(Options options) { FileNotFoundException { super(conf); Map allExtractors = new LinkedHashMap<>(NAMED_EXTRACTORS); - for (Map.Entry entry: parameterMetrics.entrySet()) { + for (Map.Entry entry : parameterMetrics.entrySet()) { final Object value = entry.getValue(); allExtractors.put(entry.getKey(), new MetricExtractor((m, unit) -> value, "")); } @@ -826,7 +826,7 @@ public static void addCommandLineOptions(Options options) { } reporters = new ArrayList<>(); if (commandLine.hasOption("reporter")) { - for (String reporterString: commandLine.getOptionValues("reporter")) { + for (String reporterString : commandLine.getOptionValues("reporter")) { Matcher m = REPORTER_PATTERN.matcher(reporterString); if (!m.matches()) { throw new IllegalArgumentException(reporterString + " does not look like it is a reporter"); @@ -878,20 +878,20 @@ public static void addCommandLineOptions(Options options) { private long readMemory() { long total = 0; - for (MemMeasure mem: memoryBytes.values()) { + for (MemMeasure mem : memoryBytes.values()) { total += mem.get(); } return total; } private void startMetricsOutput() { - for (MetricResultsReporter reporter: reporters) { + for (MetricResultsReporter reporter : reporters) { reporter.start(); } } private void finishMetricsOutput() throws Exception { - for (MetricResultsReporter reporter: reporters) { + for (MetricResultsReporter reporter : reporters) { reporter.finish(allCombined); } } @@ -923,7 +923,7 @@ private void outputMetrics(Nimbus.Iface client, Collection names) throws long failed = 0; double totalLatMs = 0; long totalLatCount = 0; - for (String name: names) { + for (String name : names) { TopologyInfo info = client.getTopologyInfoByName(name); ids.add(info.get_id()); @SuppressWarnings("checkstyle:VariableDeclarationUsageDistance") @@ -984,7 +984,7 @@ private void outputMetrics(Nimbus.Iface client, Collection names) throws ids, workers.size(), executors, hosts.size(), congested.getAndSet(new ConcurrentHashMap<>()), skippedMaxSpout, totalLatMs / totalLatCount)); Measurements inWindow = Measurements.combine(allCombined, null, windowLength); - for (MetricResultsReporter reporter: reporters) { + for (MetricResultsReporter reporter : reporters) { reporter.reportWindow(inWindow, allCombined); } } @@ -993,7 +993,7 @@ private void outputMetrics(Nimbus.Iface client, Collection names) throws @SuppressWarnings("unchecked") public void handle(IMetricsConsumer.TaskInfo taskInfo, Collection dataPoints, String topologyId) { String worker = taskInfo.srcWorkerHost + ":" + taskInfo.srcWorkerPort; - for (IMetricsConsumer.DataPoint dp: dataPoints) { + for (IMetricsConsumer.DataPoint dp : dataPoints) { if (dp.name.startsWith("comp-lat-histo") && dp.value instanceof Histogram) { synchronized (histo) { histo.add((Histogram) dp.value); diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadSpout.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadSpout.java index 5d639d38467..1fc006eb800 100644 --- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadSpout.java +++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/LoadSpout.java @@ -44,7 +44,7 @@ private static class OutputStreamEngineWithHisto extends OutputStreamEngine { OutputStreamEngineWithHisto(OutputStream stats, TopologyContext context) { super(stats); histogram = new HistogramMetric(3600000000000L, 3); - //TODO perhaps we can adjust the frequency later... + //TODO: perhaps we can adjust the frequency later... context.registerMetric("comp-lat-histo-" + stats.id, histogram, 10); } } @@ -127,7 +127,7 @@ protected Values getNextValues(OutputStreamEngine se) { @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { - for (OutputStream s: streamStats) { + for (OutputStream s : streamStats) { declarer.declareStream(s.id, new Fields("key", "value")); } } diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/NormalDistStats.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/NormalDistStats.java index d7555a555b9..12dc6c5efc2 100644 --- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/NormalDistStats.java +++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/NormalDistStats.java @@ -87,14 +87,14 @@ public NormalDistStats(List values) { double max = values.isEmpty() ? 0.0 : values.get(0); double sum = 0.0; long count = values.size(); - for (Double v: values) { + for (Double v : values) { sum += v; min = Math.min(min, v); max = Math.max(max, v); } double mean = sum / Math.max(count, 1); double sdPartial = 0; - for (Double v: values) { + for (Double v : values) { sdPartial += Math.pow(v - mean, 2); } double stddev = 0.0; diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/OutputStreamEngine.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/OutputStreamEngine.java index ae23679ee28..2794fcdc29e 100644 --- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/OutputStreamEngine.java +++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/OutputStreamEngine.java @@ -109,7 +109,7 @@ public String nextKey() { if (stats.areKeysSkewed) { //We set the stddev of the skewed keys to be 1/5 of the length, but then we use the absolute value // of that so everything is skewed towards 0 - keyIndex = Math.min(KEYS.length - 1 , Math.abs((int) (rand.nextGaussian() * KEYS.length / 5))); + keyIndex = Math.min(KEYS.length - 1, Math.abs((int) (rand.nextGaussian() * KEYS.length / 5))); } else { keyIndex = rand.nextInt(KEYS.length); } diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ThroughputVsLatency.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ThroughputVsLatency.java index b83c438e9b6..17aa8cf9441 100644 --- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ThroughputVsLatency.java +++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/ThroughputVsLatency.java @@ -105,9 +105,9 @@ public void prepare(Map stormConf, @Override public void execute(Tuple tuple, BasicOutputCollector collector) { - sleep.simulateProcessAndExecTime(executorIndex, Time.nanoTime(), null , () -> { + sleep.simulateProcessAndExecTime(executorIndex, Time.nanoTime(), null, () -> { String sentence = tuple.getString(0); - for (String word: sentence.split("\\s+")) { + for (String word : sentence.split("\\s+")) { collector.emit(new Values(word, 1)); } }); diff --git a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/TopologyLoadConf.java b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/TopologyLoadConf.java index c639091fc12..3765f21d13d 100644 --- a/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/TopologyLoadConf.java +++ b/examples/storm-loadgen/src/main/java/org/apache/storm/loadgen/TopologyLoadConf.java @@ -105,7 +105,7 @@ public static TopologyLoadConf fromConf(Map conf) { } List spouts = new ArrayList<>(); - for (Map spoutInfo: (List>) conf.get("spouts")) { + for (Map spoutInfo : (List>) conf.get("spouts")) { spouts.add(LoadCompConf.fromConf(spoutInfo)); } @@ -120,7 +120,7 @@ public static TopologyLoadConf fromConf(Map conf) { List streams = new ArrayList<>(); List> streamInfos = (List>) conf.get("streams"); if (streamInfos != null) { - for (Map streamInfo: streamInfos) { + for (Map streamInfo : streamInfos) { streams.add(InputStream.fromConf(streamInfo)); } } @@ -235,7 +235,7 @@ public TopologyLoadConf withName(String baseName) { * @return the first one that is not null */ static V or(V...rest) { - for (V i: rest) { + for (V i : rest) { if (i != null) { return i; } @@ -331,7 +331,7 @@ public TopologyLoadConf overrideSlowExecs(Map topoS public TopologyLoadConf anonymize() { Map remappedComponents = new HashMap<>(); Map remappedStreams = new HashMap<>(); - for (LoadCompConf comp: bolts) { + for (LoadCompConf comp : bolts) { String newId = getUniqueBoltName(); remappedComponents.put(comp.id, newId); if (comp.streams != null) { @@ -343,7 +343,7 @@ public TopologyLoadConf anonymize() { } } - for (LoadCompConf comp: spouts) { + for (LoadCompConf comp : spouts) { remappedComponents.put(comp.id, getUniqueSpoutName()); String newId = getUniqueSpoutName(); remappedComponents.put(comp.id, newId); @@ -387,7 +387,7 @@ public TopologyLoadConf anonymize() { private static Map anonymizeTopoConf(Map topoConf) { //Only keep important conf keys Map ret = new HashMap<>(); - for (Map.Entry entry: topoConf.entrySet()) { + for (Map.Entry entry : topoConf.entrySet()) { String key = entry.getKey(); Object value = entry.getValue(); if (IMPORTANT_CONF_KEYS.contains(key)) { @@ -405,7 +405,7 @@ private static Object cleanupChildOpts(Object value) { if (value instanceof String) { String sv = (String) value; StringBuffer ret = new StringBuffer(); - for (String part: sv.split("\\s+")) { + for (String part : sv.split("\\s+")) { if (part.startsWith("-X")) { ret.append(part).append(" "); } @@ -413,7 +413,7 @@ private static Object cleanupChildOpts(Object value) { return ret.toString(); } else { List ret = new ArrayList<>(); - for (String subValue: (Collection) value) { + for (String subValue : (Collection) value) { ret.add((String) cleanupChildOpts(subValue)); } return ret.stream().filter((item) -> item != null && !item.isEmpty()).collect(Collectors.toList()); @@ -426,19 +426,19 @@ private static Object cleanupChildOpts(Object value) { * @return true if it does else false. */ public boolean looksLikeTrident() { - for (LoadCompConf spout: spouts) { + for (LoadCompConf spout : spouts) { if (spout.id.startsWith("$mastercoord")) { return true; } } - for (LoadCompConf bolt: bolts) { + for (LoadCompConf bolt : bolts) { if (bolt.id.startsWith("$spoutcoord")) { return true; } } - for (InputStream in: streams) { + for (InputStream in : streams) { if (in.id.equals("$batch")) { return true; } @@ -452,7 +452,7 @@ public boolean looksLikeTrident() { */ public double getAllEmittedAggregate() { double ret = getSpoutEmittedAggregate(); - for (LoadCompConf bolt: bolts) { + for (LoadCompConf bolt : bolts) { ret += bolt.getAllEmittedAggregate(); } return ret; @@ -464,7 +464,7 @@ public double getAllEmittedAggregate() { */ public double getSpoutEmittedAggregate() { double ret = 0; - for (LoadCompConf spout: spouts) { + for (LoadCompConf spout : spouts) { ret += spout.getAllEmittedAggregate(); } return ret; @@ -484,7 +484,7 @@ public double getTridentEstimatedEmittedAggregate() { for (LoadCompConf comp : all) { if (comp.id.startsWith("spout-")) { if (comp.streams != null) { - for (OutputStream out: comp.streams) { + for (OutputStream out : comp.streams) { if (!out.id.startsWith("$") && !out.id.startsWith("__") && out.rate != null) { diff --git a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java index fd9f042d3aa..c411844a34b 100755 --- a/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java +++ b/examples/storm-perf/src/main/java/org/apache/storm/perf/utils/MetricsSample.java @@ -103,7 +103,7 @@ private static MetricsSample getMetricsSample(TopologyInfo topInfo) { continue; } for (String key : txMap.keySet()) { - // todo, ignore the master batch coordinator ? + // TODO: ignore the master batch coordinator ? if (!Utils.isSystemId(key)) { Long count = txMap.get(key); totalTransferred += count; diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulTopology.java index 3f30f4866c0..be8d28ed848 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulTopology.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/StatefulTopology.java @@ -58,7 +58,6 @@ * "timeout":2000, "database":0, "password":"xyz"}}' * * - *

*/ public class StatefulTopology { private static final Logger LOG = LoggerFactory.getLogger(StatefulTopology.class); diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/AggregateExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/AggregateExample.java index 626ac8f7359..338a894031e 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/AggregateExample.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/AggregateExample.java @@ -29,7 +29,7 @@ public class AggregateExample { @SuppressWarnings("unchecked") public static void main(String[] args) throws Exception { StreamBuilder builder = new StreamBuilder(); - /** + /* * Computes average of the stream of numbers emitted by the spout. Internally the per-partition * sum and counts are accumulated and emitted to a downstream task where the partially accumulated * results are merged and the final result is emitted. diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/JoinExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/JoinExample.java index 9016c787896..49085c5102e 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/JoinExample.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/JoinExample.java @@ -53,7 +53,7 @@ public static void main(String[] args) throws Exception { * of the cubes stream within the window will be joined together. */ .join(cubes) - /** + /* * The results should be of the form (number, (square, cube)) */ .print(); diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBaseNimbus.java b/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBaseNimbus.java index 95e312d20b5..84ce57e0844 100644 --- a/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBaseNimbus.java +++ b/external/storm-autocreds/src/main/java/org/apache/storm/hbase/security/AutoHBaseNimbus.java @@ -116,7 +116,7 @@ protected byte[] getHadoopCredentials(Map conf, Configuration hb throw new RuntimeException("Security is not enabled for Hadoop"); } } catch (Exception ex) { - throw new RuntimeException("Failed to get delegation tokens." , ex); + throw new RuntimeException("Failed to get delegation tokens.", ex); } } diff --git a/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFSNimbus.java b/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFSNimbus.java index 87ab4294543..200ae4747d8 100644 --- a/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFSNimbus.java +++ b/external/storm-autocreds/src/main/java/org/apache/storm/hdfs/security/AutoHDFSNimbus.java @@ -132,7 +132,7 @@ public Object run() { throw new RuntimeException("Security is not enabled for HDFS"); } } catch (Exception ex) { - throw new RuntimeException("Failed to get delegation tokens." , ex); + throw new RuntimeException("Failed to get delegation tokens.", ex); } } diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java index 0b3da9c2625..42924c91d18 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/TextFileReader.java @@ -24,7 +24,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -// Todo: Track file offsets instead of line number +// TODO: Track file offsets instead of line number public class TextFileReader extends AbstractFileReader { public static final String[] defaultFields = { "line" }; public static final String CHARSET = "hdfsspout.reader.charset"; diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java index 0b3498e543c..f5f08e12d1c 100644 --- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java +++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/common/Column.java @@ -27,14 +27,14 @@ * * *

The following class can be used to represent the data in the table as - *

{@code
  * List> rows = new ArrayList>();
  * List row1 = Lists.newArrayList(new Column("UserId", 1, Types.INTEGER), new Column("UserName", "Foo", Types.VARCHAR))
  * List row1 = Lists.newArrayList(new Column("UserId", 2, Types.INTEGER), new Column("UserName", "Bar", Types.VARCHAR))
  *
  * rows.add(row1)
  * rows.add(row2)
- * ]]>
+ * }
  * 
*/ public class Column implements Serializable { diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java index 370af537bd6..73582dd4c8a 100644 --- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java +++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java @@ -533,7 +533,7 @@ private void commitOffsetsForAckedTuples() { position, committedOffset); consumer.seek(tp, committedOffset); } - /** + /* * In some cases the waitingToEmit list may contain tuples that have just been committed. Drop these. */ List> waitingToEmitForTp = waitingToEmit.get(tp); diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java b/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java index c89c4d70ccc..815a7bc1b18 100644 --- a/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java +++ b/flux/flux-core/src/main/java/org/apache/storm/flux/Flux.java @@ -150,7 +150,7 @@ private static void runCli(CommandLine cmd) throws Exception { TopologyDef topologyDef = null; String filePath = (String) cmd.getArgList().get(0); - // TODO conditionally load properties from a file our resource + // TODO: conditionally load properties from a file our resource String filterProps = null; if (cmd.hasOption(OPTION_FILTER)) { filterProps = cmd.getOptionValue(OPTION_FILTER); diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java b/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java index 148852ebf60..338994baa0d 100644 --- a/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java +++ b/flux/flux-core/src/main/java/org/apache/storm/flux/FluxBuilder.java @@ -246,7 +246,7 @@ private static void buildStreamDefinitions(ExecutionContext context, TopologyBui declarer.shuffleGrouping(stream.getFrom(), streamId); break; case FIELDS: - //TODO check for null grouping args + //TODO: check for null grouping args declarer.fieldsGrouping(stream.getFrom(), streamId, new Fields(grouping.getArgs())); break; case ALL: @@ -489,7 +489,7 @@ private static void buildBolts(ExecutionContext context) throws ClassNotFoundExc */ private static void buildWorkerHooks(ExecutionContext context, TopologyBuilder builder) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException, NoSuchFieldException { - for (WorkerHookDef whDef: context.getTopologyDef().getWorkerHooks()) { + for (WorkerHookDef whDef : context.getTopologyDef().getWorkerHooks()) { IWorkerHook workerHook = (IWorkerHook) buildObject(whDef, context); builder.addWorkerHook(workerHook); } @@ -665,7 +665,7 @@ private static Object[] getArgsWithListCoercion(List args, Class[] param // List to array conversion if (paramType.isArray() && List.class.isAssignableFrom(objectType)) { - // TODO more collection content type checking + // TODO: more collection content type checking LOG.debug("Conversion appears possible..."); List list = (List) obj; LOG.debug("Array Type: {}, List type: {}", paramType.getComponentType(), list.get(0).getClass()); @@ -723,7 +723,7 @@ private static boolean canInvokeWithArgs(List args, Class[] parameterTyp } else if (paramType.isEnum() && objectType.equals(String.class)) { LOG.debug("Yes, will convert a String to enum"); } else if (paramType.isArray() && List.class.isAssignableFrom(objectType)) { - // TODO more collection content type checking + // TODO: more collection content type checking LOG.debug("Assignment is possible if we convert a List to an array."); LOG.debug("Array Type: {}, List type: {}", paramType.getComponentType(), ((List) obj).get(0).getClass()); } else { diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologyDef.java b/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologyDef.java index 7744e4c2d0a..fe05275b7e0 100644 --- a/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologyDef.java +++ b/flux/flux-core/src/main/java/org/apache/storm/flux/model/TopologyDef.java @@ -264,7 +264,7 @@ public void addAllComponents(List components, boolean override) { * @param override whether or not to override existing definitions (currently ignored) */ public void addAllStreams(List streams, boolean override) { - //TODO figure out how we want to deal with overrides. Users may want to add streams even when overriding other + //TODO: figure out how we want to deal with overrides. Users may want to add streams even when overriding other // properties. For now we just add them blindly which could lead to a potentially invalid topology. this.streams.addAll(streams); } diff --git a/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java b/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java index e9adc29d95f..c96a5fe10ac 100644 --- a/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java +++ b/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java @@ -219,7 +219,7 @@ private static Yaml yaml() { */ private static TopologyDef processIncludes(Yaml yaml, TopologyDef topologyDef, Properties properties, boolean envSub) throws IOException { - //TODO support multiple levels of includes + //TODO: support multiple levels of includes if (topologyDef.getIncludes() != null) { for (IncludeDef include : topologyDef.getIncludes()) { TopologyDef includeTopologyDef = null; @@ -240,7 +240,7 @@ private static TopologyDef processIncludes(Yaml yaml, TopologyDef topologyDef, P // config if (includeTopologyDef.getConfig() != null) { - //TODO move this logic to the model class + //TODO: move this logic to the model class Map config = topologyDef.getConfig(); Map includeConfig = includeTopologyDef.getConfig(); if (override) { @@ -269,7 +269,7 @@ private static TopologyDef processIncludes(Yaml yaml, TopologyDef topologyDef, P topologyDef.addAllSpouts(includeTopologyDef.getSpouts(), override); } //stream overrides - //TODO streams should be uniquely identifiable + //TODO: streams should be uniquely identifiable if (includeTopologyDef.getStreams() != null) { topologyDef.addAllStreams(includeTopologyDef.getStreams(), override); } diff --git a/storm-checkstyle/src/main/resources/storm/storm_checkstyle.xml b/storm-checkstyle/src/main/resources/storm/storm_checkstyle.xml index 485f6fd9a92..3fbcfd063c0 100644 --- a/storm-checkstyle/src/main/resources/storm/storm_checkstyle.xml +++ b/storm-checkstyle/src/main/resources/storm/storm_checkstyle.xml @@ -197,7 +197,6 @@ may only be represented as '{}' when not part of a multi-block statement (4.1.3)"/> - @@ -394,7 +393,6 @@ value="COMMA, SEMI, POST_INC, POST_DEC, DOT, LABELED_STAT, METHOD_REF, ELLIPSIS"/> - @@ -438,12 +436,8 @@ - - - - - - + + @@ -514,7 +508,6 @@ - diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java index 0d70465858f..79142a9c239 100644 --- a/storm-client/src/jvm/org/apache/storm/Config.java +++ b/storm-client/src/jvm/org/apache/storm/Config.java @@ -1680,28 +1680,28 @@ public class Config extends HashMap { */ @IsPositiveNumber public static final String SUPERVISOR_CPU_CAPACITY = "supervisor.cpu.capacity"; - @IsInteger - @IsPositiveNumber /** * Port used for supervisor thrift server. */ + @IsInteger + @IsPositiveNumber public static final String SUPERVISOR_THRIFT_PORT = "supervisor.thrift.port"; - @IsString /** * The Supervisor invocations transport plug-in for Thrift client/server communication. */ + @IsString public static final String SUPERVISOR_THRIFT_TRANSPORT_PLUGIN = "supervisor.thrift.transport"; - @IsInteger - @IsPositiveNumber /** * Supervisor thrift server queue size. */ - public static final String SUPERVISOR_QUEUE_SIZE = "supervisor.queue.size"; @IsInteger @IsPositiveNumber + public static final String SUPERVISOR_QUEUE_SIZE = "supervisor.queue.size"; /** * The number of threads that should be used by the supervisor thrift server. */ + @IsInteger + @IsPositiveNumber public static final String SUPERVISOR_THRIFT_THREADS = "supervisor.thrift.threads"; @IsNumber @IsPositiveNumber diff --git a/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java b/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java index 6246966f80c..354723e704f 100644 --- a/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java +++ b/storm-client/src/jvm/org/apache/storm/bolt/JoinBolt.java @@ -316,7 +316,7 @@ private String getStreamSelector(Tuple ti) { // Performs projection on the tuples based on 'projectionFields' protected ArrayList doProjection(ArrayList tuples, FieldSelector[] projectionFields) { ArrayList result = new ArrayList<>(projectionFields.length); - // Todo: optimize this computation... perhaps inner loop can be outside to avoid rescanning tuples + // TODO: optimize this computation... perhaps inner loop can be outside to avoid rescanning tuples for (int i = 0; i < projectionFields.length; i++) { boolean missingField = true; for (Tuple tuple : tuples) { diff --git a/storm-client/src/jvm/org/apache/storm/hooks/IWorkerHook.java b/storm-client/src/jvm/org/apache/storm/hooks/IWorkerHook.java index fa60975f2c7..61719c1b0e3 100644 --- a/storm-client/src/jvm/org/apache/storm/hooks/IWorkerHook.java +++ b/storm-client/src/jvm/org/apache/storm/hooks/IWorkerHook.java @@ -29,7 +29,7 @@ public interface IWorkerHook extends Serializable { * * @param topoConf The Storm configuration for this worker * @param context This object can be used to get information about this worker's place within the topology and exposes - * {@link WorkerUserContext#setResource(String, Object)} to set the shared application state. + * {@link WorkerUserContext#setResource(String, Object)} to set the shared application state. */ default void start(Map topoConf, WorkerUserContext context) { // NOOP diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java index 1a74f2fe4f0..1865ac33d54 100644 --- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java +++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Client.java @@ -70,7 +70,7 @@ * destination is currently unavailable. */ public class Client extends ConnectionWithStatus implements ISaslClient { - private final long pendingMessagesFlushTimeoutMs ; + private final long pendingMessagesFlushTimeoutMs; private final long pendingMessagesFlushIntervalMs; private final double pendingMessagesFlushFactor = 0.0016; /** diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java index 9b2feb5715c..c3e3d6fb481 100644 --- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java +++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Login.java @@ -12,13 +12,6 @@ package org.apache.storm.messaging.netty; -/** - * This class is responsible for refreshing Kerberos credentials for logins for both Zookeeper client and server. See ZooKeeperSaslServer - * for server-side usage. See ZooKeeperSaslClient for client-side usage. This class is a copied from - * https://github.com/apache/zookeeper/blob/branch-3.4/src/java/main/org/apache/zookeeper/Login.java with the difference that refresh thread - * does not die. - */ - import java.io.File; import java.net.URI; import java.security.URIParameter; @@ -37,6 +30,12 @@ import org.apache.storm.shade.org.apache.zookeeper.Shell; import org.apache.storm.shade.org.apache.zookeeper.client.ZooKeeperSaslClient; +/** + * This class is responsible for refreshing Kerberos credentials for logins for both Zookeeper client and server. See ZooKeeperSaslServer + * for server-side usage. See ZooKeeperSaslClient for client-side usage. This class is a copied from + * https://github.com/apache/zookeeper/blob/branch-3.4/src/java/main/org/apache/zookeeper/Login.java with the difference that refresh thread + * does not die. + */ public class Login { // Login will sleep until 80% of time from last refresh to // ticket's expiry has been reached, at which time it will wake diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/ClientAuthUtils.java b/storm-client/src/jvm/org/apache/storm/security/auth/ClientAuthUtils.java index 4472960bc53..34326ecf9aa 100644 --- a/storm-client/src/jvm/org/apache/storm/security/auth/ClientAuthUtils.java +++ b/storm-client/src/jvm/org/apache/storm/security/auth/ClientAuthUtils.java @@ -184,7 +184,7 @@ public static IPrincipalToLocal getPrincipalToLocalPlugin(Map to LOG.warn("No principal to local given {}", Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN); } else { ptol = ReflectionUtils.newInstance(ptolClassname); - //TODO this can only ever be null if someone is doing something odd with mocking + //TODO: this can only ever be null if someone is doing something odd with mocking // We should really fix the mocking and remove this if (ptol != null) { ptol.prepare(topoConf); diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/ITransportPlugin.java b/storm-client/src/jvm/org/apache/storm/security/auth/ITransportPlugin.java index 19ec3746600..d927e8fb4cf 100644 --- a/storm-client/src/jvm/org/apache/storm/security/auth/ITransportPlugin.java +++ b/storm-client/src/jvm/org/apache/storm/security/auth/ITransportPlugin.java @@ -53,7 +53,7 @@ public interface ITransportPlugin { /** * Get port. * @return The port this transport is using. This is not known until - * {@link #getServer(org.apache.storm.thrift.TProcessor)} has been called + * {@link #getServer(org.apache.storm.thrift.TProcessor)} has been called */ int getPort(); diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/MultiThriftServer.java b/storm-client/src/jvm/org/apache/storm/security/auth/MultiThriftServer.java index 538f9b51d5e..514685645be 100644 --- a/storm-client/src/jvm/org/apache/storm/security/auth/MultiThriftServer.java +++ b/storm-client/src/jvm/org/apache/storm/security/auth/MultiThriftServer.java @@ -41,7 +41,7 @@ public void add(T thriftServer) { } public void serve() { - for (ThriftServer thriftServer: thriftServerMap.values()) { + for (ThriftServer thriftServer : thriftServerMap.values()) { if (Boolean.TRUE.equals(thriftServerIsServingMap.get(thriftServer.getType()))) { throw new IllegalStateException("The MultiThriftServer " + thriftServerThreadMap.get(thriftServer.getType()).getName() + " is already serving"); @@ -53,7 +53,7 @@ public void serve() { } public void stop() { - for (ThriftServer thriftServer: thriftServerMap.values()) { + for (ThriftServer thriftServer : thriftServerMap.values()) { if (Boolean.TRUE.equals(thriftServerIsServingMap.get(thriftServer.getType()))) { thriftServerThreadMap.get(thriftServer.getType()).interrupt(); thriftServerMap.get(thriftServer.getType()).stop(); diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftClient.java b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftClient.java index 03720eb1765..f7becc3f385 100644 --- a/storm-client/src/jvm/org/apache/storm/security/auth/ThriftClient.java +++ b/storm-client/src/jvm/org/apache/storm/security/auth/ThriftClient.java @@ -189,7 +189,7 @@ public synchronized void reconnect() { //construct a transport plugin ITransportPlugin transportPlugin = ClientAuthUtils.getTransportPlugin(type, conf); - //TODO get this from type instead of hardcoding to Nimbus. + //TODO: get this from type instead of hardcoding to Nimbus. //establish client-server transport via plugin //do retries if the connect fails TBackoffConnect connectionRetry diff --git a/storm-client/src/jvm/org/apache/storm/security/auth/authorizer/ImpersonationAuthorizer.java b/storm-client/src/jvm/org/apache/storm/security/auth/authorizer/ImpersonationAuthorizer.java index b07e9bd8a00..22aa4e1669c 100644 --- a/storm-client/src/jvm/org/apache/storm/security/auth/authorizer/ImpersonationAuthorizer.java +++ b/storm-client/src/jvm/org/apache/storm/security/auth/authorizer/ImpersonationAuthorizer.java @@ -73,7 +73,7 @@ public boolean permit(ReqContext context, String operation, Map LOG.info("user = {}, principal = {} is attempting to impersonate user = {} for operation = {} from host = {}", impersonatingUser, impersonatingPrincipal, userBeingImpersonated, operation, remoteAddress); - /** + /* * no config is present for impersonating principal or user, do not permit impersonation. */ if (!userImpersonationACL.containsKey(impersonatingPrincipal) && !userImpersonationACL.containsKey(impersonatingUser)) { diff --git a/storm-client/src/jvm/org/apache/storm/spout/ShellSpout.java b/storm-client/src/jvm/org/apache/storm/spout/ShellSpout.java index 07ab24c7ae1..8487e113507 100644 --- a/storm-client/src/jvm/org/apache/storm/spout/ShellSpout.java +++ b/storm-client/src/jvm/org/apache/storm/spout/ShellSpout.java @@ -78,13 +78,13 @@ public boolean shouldChangeChildCWD() { return changeDirectory; } - @SuppressWarnings("checkstyle:AbbreviationAsWordInName") /** * Set if the current working directory of the child process should change to the resources dir from extracted from the jar, or if it * should stay the same as the worker process to access things from the blob store. * * @param changeDirectory true change the directory (default) false leave the directory the same as the worker process. */ + @SuppressWarnings("checkstyle:AbbreviationAsWordInName") public void changeChildCWD(boolean changeDirectory) { this.changeDirectory = changeDirectory; } diff --git a/storm-client/src/jvm/org/apache/storm/topology/base/BaseRichSpout.java b/storm-client/src/jvm/org/apache/storm/topology/base/BaseRichSpout.java index c7e36b797e0..80646869758 100644 --- a/storm-client/src/jvm/org/apache/storm/topology/base/BaseRichSpout.java +++ b/storm-client/src/jvm/org/apache/storm/topology/base/BaseRichSpout.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at diff --git a/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java b/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java index 94599ec8935..d0fc9691dbd 100644 --- a/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java +++ b/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java @@ -44,7 +44,7 @@ public class ConfigUtils { private static final Set passwordConfigKeys = new HashSet<>(); static { - for (Class clazz: ConfigValidation.getConfigClasses()) { + for (Class clazz : ConfigValidation.getConfigClasses()) { for (Field field : clazz.getFields()) { for (Annotation annotation : field.getAnnotations()) { boolean isPassword = annotation.annotationType().getName().equals( diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java index 7ba3298fee2..3590583ad3b 100644 --- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java +++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java @@ -1448,7 +1448,7 @@ public static String threadDump() { final StringBuilder dump = new StringBuilder(); final java.lang.management.ThreadMXBean threadMxBean = ManagementFactory.getThreadMXBean(); final java.lang.management.ThreadInfo[] threadInfos = threadMxBean.getThreadInfo(threadMxBean.getAllThreadIds(), 100); - for (Entry entry: Thread.getAllStackTraces().entrySet()) { + for (Entry entry : Thread.getAllStackTraces().entrySet()) { Thread t = entry.getKey(); ThreadInfo threadInfo = threadMxBean.getThreadInfo(t.getId()); if (threadInfo == null) { @@ -2113,7 +2113,7 @@ private static void findComponentCyclesRecursion( return; } Set children = new HashSet<>(edgesOut.get(compId1)); - for (String compId2: children) { + for (String compId2 : children) { if (seen.contains(compId2)) { // cycle/diamond detected List possibleCycle = new ArrayList<>(); @@ -2158,7 +2158,7 @@ public static List> findComponentCycles(StormTopology topology, Str Map> edgesOut = getStormTopologyForwardGraph(topology); Set allComponentIds = new HashSet<>(); edgesOut.forEach((k, v) -> { - allComponentIds.add(k) ; + allComponentIds.add(k); allComponentIds.addAll(v); }); diff --git a/storm-client/src/jvm/org/apache/storm/utils/VersionInfo.java b/storm-client/src/jvm/org/apache/storm/utils/VersionInfo.java index 8dad52c3d5c..4c9a4478738 100644 --- a/storm-client/src/jvm/org/apache/storm/utils/VersionInfo.java +++ b/storm-client/src/jvm/org/apache/storm/utils/VersionInfo.java @@ -124,7 +124,7 @@ public static IVersionInfo getFromClasspath(List classpath) { private static IVersionInfo getFromClasspath(List classpath, final String propFileName) { IVersionInfo ret = null; - for (String part: classpath) { + for (String part : classpath) { Path p = Paths.get(part); if (Files.isDirectory(p)) { Path child = p.resolve(propFileName); diff --git a/storm-client/src/jvm/org/apache/storm/utils/WritableUtils.java b/storm-client/src/jvm/org/apache/storm/utils/WritableUtils.java index e21fc4b9a70..2d964390c93 100644 --- a/storm-client/src/jvm/org/apache/storm/utils/WritableUtils.java +++ b/storm-client/src/jvm/org/apache/storm/utils/WritableUtils.java @@ -1,39 +1,4 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version - * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version - * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * - * This file originally comes from the Apache Hadoop project. Changes have been made to the file. - * - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version - * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions - * and limitations under the License. - * - * This file originally comes from the Apache Hadoop project. Changes have been made to the file. - */ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -51,7 +16,7 @@ * limitations under the License. */ -/** +/* * This file originally comes from the Apache Hadoop project. Changes have been made to the file. * */ diff --git a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java index 7f2d2462c8e..0d59fed77de 100644 --- a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java +++ b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java @@ -953,7 +953,7 @@ public void validateField(String name, Object o) { "Field " + name + " must be an Iterable containing only Map of Maps"); } Map map1 = (Map) o; - for (Map.Entry entry1: map1.entrySet()) { + for (Map.Entry entry1 : map1.entrySet()) { String comp1 = entry1.getKey(); Object o2 = entry1.getValue(); if (!(o2 instanceof Map)) { @@ -962,7 +962,7 @@ public void validateField(String name, Object o) { throw new IllegalArgumentException(err); } Map map2 = (Map) o2; - for (Map.Entry entry2: map2.entrySet()) { + for (Map.Entry entry2 : map2.entrySet()) { String constraintType = entry2.getKey(); Object o3 = entry2.getValue(); switch (constraintType) { diff --git a/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java b/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java index 76c1ed0c4a0..83c77ffd85e 100644 --- a/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java +++ b/storm-core/src/jvm/org/apache/storm/command/AdminCommands.java @@ -98,7 +98,7 @@ private static class CredentialsDebug implements AdminCommand { public void run(String[] args, Map conf, String command) throws Exception { // We are pretending to be nimbus here. IStormClusterState state = ClusterUtils.mkStormClusterState(conf, new ClusterStateContext(DaemonType.NIMBUS, conf)); - for (String topologyId: args) { + for (String topologyId : args) { System.out.println(topologyId + ":"); Credentials creds = state.credentials(topologyId, null); if (creds != null) { @@ -187,7 +187,7 @@ private static void prettyPrintKeyValue(String key, Object o, int depth, StringB println(out, depth, "}"); } else if (o instanceof Collection) { println(out, depth, keyStr(key) + "["); - for (Object sub: (Collection) o) { + for (Object sub : (Collection) o) { prettyPrintKeyValue(null, sub, depth + 1, out); } println(out, depth, "]"); @@ -202,7 +202,7 @@ private static class PrintTopo implements AdminCommand { @Override public void run(String[] args, Map conf, String command) throws Exception { - for (String arg: args) { + for (String arg : args) { System.out.println(arg + ":"); StormTopology topo; File f = new File(arg); @@ -232,7 +232,7 @@ public void run(String[] args, Map conf, String command) throws IStormClusterState stormClusterState = ClusterUtils.mkStormClusterState(conf, new ClusterStateContext(DaemonType.NIMBUS, conf)); Map infos = stormClusterState.allSupervisorInfo(); if (args.length <= 0) { - for (Map.Entry entry: infos.entrySet()) { + for (Map.Entry entry : infos.entrySet()) { System.out.println(entry.getKey() + ":"); System.out.println(prettyPrint(entry.getValue())); } @@ -260,7 +260,7 @@ public void run(String[] args, Map conf, String command) throws stormClusterState.setAssignmentsBackendSynchronized(); Map infos = stormClusterState.assignmentsInfo(); if (args.length <= 0) { - for (Map.Entry entry: infos.entrySet()) { + for (Map.Entry entry : infos.entrySet()) { System.out.println(entry.getKey() + ":"); System.out.println(prettyPrint(entry.getValue())); } @@ -321,7 +321,7 @@ static void help(String message, PrintStream out) { } out.println("storm admin [args]"); out.println(); - for (Map.Entry entry: COMMANDS.entrySet()) { + for (Map.Entry entry : COMMANDS.entrySet()) { entry.getValue().printCliHelp(entry.getKey(), out); out.println(); } diff --git a/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java b/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java index 2bf1f2bd8d9..7d8d7bbc8d0 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java +++ b/storm-core/src/jvm/org/apache/storm/utils/TopologySpoutLag.java @@ -57,7 +57,7 @@ public class TopologySpoutLag { public static Map> lag(StormTopology stormTopology, Map topologyConf) { Map> result = new HashMap<>(); Map spouts = stormTopology.get_spouts(); - for (Map.Entry spout: spouts.entrySet()) { + for (Map.Entry spout : spouts.entrySet()) { try { SpoutSpec spoutSpec = spout.getValue(); addLagResultForKafkaSpout(result, spout.getKey(), spoutSpec); @@ -90,7 +90,7 @@ private static List getCommandLineOptionsForNewKafkaSpout(Map jsonConf) { File file = null; Map extraProperties = new HashMap<>(); - for (Map.Entry conf: jsonConf.entrySet()) { + for (Map.Entry conf : jsonConf.entrySet()) { if (conf.getKey().startsWith(CONFIG_KEY_PREFIX) && !ALL_CONFIGS.contains(conf.getKey())) { extraProperties.put(conf.getKey().substring(CONFIG_KEY_PREFIX.length()), conf.getValue().toString()); } diff --git a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java index aabf7283c0d..15924b1edfd 100644 --- a/storm-server/src/main/java/org/apache/storm/DaemonConfig.java +++ b/storm-server/src/main/java/org/apache/storm/DaemonConfig.java @@ -1047,7 +1047,7 @@ public class DaemonConfig implements Validated { @IsImplementationOfClass(implementsClass = ResourceIsolationInterface.class) public static final String STORM_RESOURCE_ISOLATION_PLUGIN = "storm.resource.isolation.plugin"; - /** + /* * CGroup Setting below. */ diff --git a/storm-server/src/main/java/org/apache/storm/LocalCluster.java b/storm-server/src/main/java/org/apache/storm/LocalCluster.java index cd2dbb121c7..71ee890c6cc 100644 --- a/storm-server/src/main/java/org/apache/storm/LocalCluster.java +++ b/storm-server/src/main/java/org/apache/storm/LocalCluster.java @@ -856,33 +856,33 @@ public String getTrackedId() { @Override public void setLogConfig(String name, LogConfig config) throws TException { - // TODO Auto-generated method stub + // TODO: Auto-generated method stub throw new RuntimeException("NOT IMPLEMENTED YET"); } @Override public LogConfig getLogConfig(String name) throws TException { - // TODO Auto-generated method stub + // TODO: Auto-generated method stub throw new RuntimeException("NOT IMPLEMENTED YET"); } @Override public void debug(String name, String component, boolean enable, double samplingPercentage) throws NotAliveException, AuthorizationException, TException { - // TODO Auto-generated method stub + // TODO: Auto-generated method stub throw new RuntimeException("NOT IMPLEMENTED YET"); } @Override public void setWorkerProfiler(String id, ProfileRequest profileRequest) throws TException { - // TODO Auto-generated method stub + // TODO: Auto-generated method stub throw new RuntimeException("NOT IMPLEMENTED YET"); } @Override public List getComponentPendingProfileActions(String id, String componentId, ProfileAction action) throws TException { - // TODO Auto-generated method stub + // TODO: Auto-generated method stub throw new RuntimeException("NOT IMPLEMENTED YET"); } @@ -960,7 +960,7 @@ public int updateBlobReplication(String key, int replication) @Override public void createStateInZookeeper(String key) throws TException { - // TODO Auto-generated method stub + // TODO: Auto-generated method stub throw new RuntimeException("NOT IMPLEMENTED YET"); } @@ -987,7 +987,7 @@ public ByteBuffer downloadChunk(String id) throws AuthorizationException, TExcep @Override public String getNimbusConf() throws AuthorizationException, TException { - // TODO Auto-generated method stub + // TODO: Auto-generated method stub throw new RuntimeException("NOT IMPLEMENTED YET"); } @@ -1004,39 +1004,39 @@ public boolean isTopologyNameAllowed(String name) throws AuthorizationException, @Override public TopologyPageInfo getTopologyPageInfo(String id, String window, boolean isIncludeSys) throws NotAliveException, AuthorizationException, TException { - // TODO Auto-generated method stub + // TODO: Auto-generated method stub throw new RuntimeException("NOT IMPLEMENTED YET"); } @Override public SupervisorPageInfo getSupervisorPageInfo(String id, String host, boolean isIncludeSys) throws NotAliveException, AuthorizationException, TException { - // TODO Auto-generated method stub + // TODO: Auto-generated method stub throw new RuntimeException("NOT IMPLEMENTED YET"); } @Override public ComponentPageInfo getComponentPageInfo(String topologyId, String componentId, String window, boolean isIncludeSys) throws NotAliveException, AuthorizationException, TException { - // TODO Auto-generated method stub + // TODO: Auto-generated method stub throw new RuntimeException("NOT IMPLEMENTED YET"); } @Override public StormTopology getUserTopology(String id) throws NotAliveException, AuthorizationException, TException { - // TODO Auto-generated method stub + // TODO: Auto-generated method stub throw new RuntimeException("NOT IMPLEMENTED YET"); } @Override public TopologyHistoryInfo getTopologyHistory(String user) throws AuthorizationException, TException { - // TODO Auto-generated method stub + // TODO: Auto-generated method stub throw new RuntimeException("NOT IMPLEMENTED YET"); } @Override public List getOwnerResourceSummaries(String owner) throws AuthorizationException, TException { - // TODO Auto-generated method stub + // TODO: Auto-generated method stub throw new RuntimeException("NOT IMPLEMENTED YET"); } diff --git a/storm-server/src/main/java/org/apache/storm/ProcessSimulator.java b/storm-server/src/main/java/org/apache/storm/ProcessSimulator.java index e332d6e59d9..a647428e873 100644 --- a/storm-server/src/main/java/org/apache/storm/ProcessSimulator.java +++ b/storm-server/src/main/java/org/apache/storm/ProcessSimulator.java @@ -75,7 +75,7 @@ public static void killAllProcesses() { } else if (e instanceof RuntimeException) { throw e; } else { - //TODO once everything is in java this should not be possible any more + //TODO: once everything is in java this should not be possible any more throw new RuntimeException(e); } } diff --git a/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java b/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java index 3fcd5eedde7..5ec0b2357a4 100644 --- a/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java +++ b/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java @@ -133,11 +133,11 @@ private void setupBlobstore() throws AuthorizationException, KeyNotFoundExceptio keysToDelete.removeAll(activeKeys); NimbusInfo nimbusInfo = this.nimbusInfo; LOG.debug("Deleting keys not on the zookeeper {}", keysToDelete); - for (String toDelete: keysToDelete) { + for (String toDelete : keysToDelete) { store.deleteBlob(toDelete, NIMBUS_SUBJECT); } LOG.debug("Creating list of key entries for blobstore inside zookeeper {} local {}", activeKeys, activeLocalKeys); - for (String key: activeLocalKeys) { + for (String key : activeLocalKeys) { try { state.setupBlob(key, nimbusInfo, getVersionForKey(key, nimbusInfo, zkClient)); } catch (KeyNotFoundException e) { diff --git a/storm-server/src/main/java/org/apache/storm/container/cgroup/CgroupManager.java b/storm-server/src/main/java/org/apache/storm/container/cgroup/CgroupManager.java index d50c8d85725..cf8b4edc8a8 100644 --- a/storm-server/src/main/java/org/apache/storm/container/cgroup/CgroupManager.java +++ b/storm-server/src/main/java/org/apache/storm/container/cgroup/CgroupManager.java @@ -232,7 +232,7 @@ private void prefixNumaPinning(List command, String numaId) { command.add(2, "--membind=" + numaId); return; } else { - // TODO : Add support for pinning on Windows host + // TODO: Add support for pinning on Windows host throw new RuntimeException("numactl pinning currently not supported on non-Linux hosts"); } } diff --git a/storm-server/src/main/java/org/apache/storm/container/docker/DockerRunCommand.java b/storm-server/src/main/java/org/apache/storm/container/docker/DockerRunCommand.java index bbe1d1fe612..0e69dc9b6c5 100644 --- a/storm-server/src/main/java/org/apache/storm/container/docker/DockerRunCommand.java +++ b/storm-server/src/main/java/org/apache/storm/container/docker/DockerRunCommand.java @@ -121,7 +121,7 @@ public DockerRunCommand addAllReadWriteMountLocations(List paths) throws */ public DockerRunCommand addAllReadWriteMountLocations(List paths, boolean createSource) throws IOException { - for (String dir: paths) { + for (String dir : paths) { this.addReadWriteMountLocation(dir, dir, createSource); } return this; @@ -174,7 +174,7 @@ public DockerRunCommand addAllReadOnlyMountLocations(List paths) throws */ public DockerRunCommand addAllReadOnlyMountLocations(List paths, boolean createSource) throws IOException { - for (String dir: paths) { + for (String dir : paths) { this.addReadOnlyMountLocation(dir, dir, createSource); } return this; diff --git a/storm-server/src/main/java/org/apache/storm/container/oci/OciResourcesLocalizerInterface.java b/storm-server/src/main/java/org/apache/storm/container/oci/OciResourcesLocalizerInterface.java index d0e6275c1f9..e2527afb758 100644 --- a/storm-server/src/main/java/org/apache/storm/container/oci/OciResourcesLocalizerInterface.java +++ b/storm-server/src/main/java/org/apache/storm/container/oci/OciResourcesLocalizerInterface.java @@ -44,7 +44,7 @@ public interface OciResourcesLocalizerInterface { */ default List localize(List resourceList) throws IOException { List resourceLocalDsts = new ArrayList<>(); - for (OciResource resource: resourceList) { + for (OciResource resource : resourceList) { resourceLocalDsts.add(localize(resource)); } return resourceLocalDsts; diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java index d59a2527d21..103eb00b61a 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/Nimbus.java @@ -1141,7 +1141,6 @@ private static void addToSerializers(Map ser, List conf) } } - @SuppressWarnings("unchecked") /** * Create a normalized topology conf. * @@ -1149,6 +1148,7 @@ private static void addToSerializers(Map ser, List conf) * @param topoConf initial topology conf * @param topology the Storm topology */ + @SuppressWarnings("unchecked") static Map normalizeConf(Map conf, Map topoConf, StormTopology topology) { // clear any values from the topoConf that it should not be setting. @@ -1430,7 +1430,7 @@ public void launchServer() throws Exception { leaderElector.addToLeaderLockQueue(); this.blobStore.startSyncBlobs(); - for (ClusterMetricsConsumerExecutor exec: clusterConsumerExceutors) { + for (ClusterMetricsConsumerExecutor exec : clusterConsumerExceutors) { exec.prepare(); } @@ -3530,7 +3530,7 @@ public void rebalance(String topoName, RebalanceOptions options) comps.addAll(stormTopology.get_spouts().keySet()); comps.addAll(stormTopology.get_bolts().keySet()); Map execOverrides = options.is_set_num_executors() ? options.get_num_executors() : Collections.emptyMap(); - for (Map.Entry e: execOverrides.entrySet()) { + for (Map.Entry e : execOverrides.entrySet()) { String comp = e.getKey(); // validate non-system component ids if (!Utils.isSystemId(comp) && !comps.contains(comp)) { diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java index 1807e97fe20..b4fec0bb631 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java @@ -161,7 +161,7 @@ private Map mkSupervisorCapacities(Map conf) { public void run() { Map validatedNumaMap = SupervisorUtils.getNumaMap(conf); Map supervisorInfoList = buildSupervisorInfo(conf, supervisor, validatedNumaMap); - for (Map.Entry supervisorInfoEntry: supervisorInfoList.entrySet()) { + for (Map.Entry supervisorInfoEntry : supervisorInfoList.entrySet()) { stormClusterState.supervisorHeartbeat(supervisorInfoEntry.getKey(), supervisorInfoEntry.getValue()); } } diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java index 2c0142310fa..029c51652fd 100644 --- a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java +++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java @@ -31,10 +31,11 @@ import org.slf4j.LoggerFactory; /** + *

* Class designed to perform all metrics inserts into RocksDB. Metrics are processed from a blocking queue. Inserts * to RocksDB are done using a single thread to simplify design (such as looking up existing metric data for aggregation, * and fetching/evicting metadata from the cache). This class is not thread safe. - *

+ *

* A writable LRU StringMetadataCache is used to minimize looking up metadata string Ids. As entries are added to the full cache, older * entries are evicted from the cache and need to be written to the database. This happens as the handleEvictedMetadata() * method callback. diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java index 89d6849e93a..1a8879d61d5 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/Cluster.java @@ -515,8 +515,8 @@ public int getAssignedNumWorkers(TopologyDetails topology) { @Override public NormalizedResourceOffer getAvailableResources(SupervisorDetails sd) { NormalizedResourceOffer ret = new NormalizedResourceOffer(sd.getTotalResources()); - for (SchedulerAssignment assignment: assignments.values()) { - for (Entry entry: assignment.getScheduledResources().entrySet()) { + for (SchedulerAssignment assignment : assignments.values()) { + for (Entry entry : assignment.getScheduledResources().entrySet()) { if (sd.getId().equals(entry.getKey().getNodeId())) { ret.remove(entry.getValue(), getResourceMetrics()); } diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java b/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java index cc3561e39ee..e530a45e651 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/ISchedulingState.java @@ -366,7 +366,7 @@ default Map getHostToRack() { default Set getAssignedRacks(String... topologyIds) { Set ret = new HashSet<>(); Map networkTopographyInverted = getHostToRack(); - for (String topologyId: topologyIds) { + for (String topologyId : topologyIds) { SchedulerAssignment assignment = getAssignmentById(topologyId); if (assignment == null) { continue; diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/FIFOSchedulingPriorityStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/FIFOSchedulingPriorityStrategy.java index 29a05844de9..849ffac1d35 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/FIFOSchedulingPriorityStrategy.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/priority/FIFOSchedulingPriorityStrategy.java @@ -43,7 +43,7 @@ public double getScore(double availableCpu, double availableMemory) { return origScore; } //Not enough guaranteed use the age of the topology instead. - //TODO need a good way to only do this once... + //TODO: need a good way to only do this once... Collections.sort(tds, new TopologyBySubmissionTimeComparator()); td = getNextHighest(); if (td != null) { diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java index 5f12557fe87..e686e1a093f 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.java @@ -435,14 +435,14 @@ protected SchedulingResult scheduleExecutorsOnNodes(List ordere RasNode[] nodeForExec = new RasNode[maxExecCnt]; WorkerSlot[] workerSlotForExec = new WorkerSlot[maxExecCnt]; - for (int i = 0; i < maxExecCnt ; i++) { + for (int i = 0; i < maxExecCnt; i++) { progressIdxForExec[i] = -1; } LOG.debug("scheduleExecutorsOnNodes: will assign {} executors for topo {}, sortNodesForEachExecutor={}", maxExecCnt, topoName, sortNodesForEachExecutor); OUTERMOST_LOOP: - for (int loopCnt = 0 ; true ; loopCnt++) { + for (int loopCnt = 0; true; loopCnt++) { LOG.debug("scheduleExecutorsOnNodes: loopCnt={}, execIndex={}, topo={}", loopCnt, searcherState.getExecIndex(), topoName); if (searcherState.areSearchLimitsExceeded()) { LOG.warn("Limits exceeded, backtrackCnt={}, loopCnt={}, topo={}", searcherState.getNumBacktrack(), loopCnt, topoName); diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverConfig.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverConfig.java index 84dca6bc5a3..041fc375692 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverConfig.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/ConstraintSolverConfig.java @@ -119,7 +119,7 @@ private void computeComponentConstraints() { } List list; list = (constraint instanceof String) ? Arrays.asList((String) constraint) : (List) constraint; - for (String comp2: list) { + for (String comp2 : list) { if (!comps.contains(comp2)) { LOG.warn("TopoId {}: {} {} declared for Comp {} is not a valid Comp", topoId, ctype, comp2, comp1); continue; diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/RoundRobinResourceAwareStrategy.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/RoundRobinResourceAwareStrategy.java index c0bbcc3fcec..1bb5a4bf502 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/RoundRobinResourceAwareStrategy.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/RoundRobinResourceAwareStrategy.java @@ -96,7 +96,7 @@ protected SchedulingResult scheduleExecutorsOnNodes(List ordere searcherState.setSortedExecs(orderedExecutors); OUTERMOST_LOOP: - for (int loopCnt = 0 ; true ; loopCnt++) { + for (int loopCnt = 0; true; loopCnt++) { LOG.debug("scheduleExecutorsOnNodes: loopCnt={}, execIndex={}, topo={}, nodeSortCnt={}", loopCnt, searcherState.getExecIndex(), topoName, nodeSortCnt); if (searcherState.areSearchLimitsExceeded()) { diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/SchedulingSearcherState.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/SchedulingSearcherState.java index 13b11fcaf57..6d65a3e437a 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/SchedulingSearcherState.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/SchedulingSearcherState.java @@ -417,7 +417,7 @@ public void logNodeCompAssignments() { StringBuffer sb = new StringBuffer(); int cntAllNodes = 0; int cntFilledNodes = 0; - for (RasNode node: new TreeSet<>(nodeCompAssignmentCnts.keySet())) { + for (RasNode node : new TreeSet<>(nodeCompAssignmentCnts.keySet())) { cntAllNodes++; Map oneMap = nodeCompAssignmentCnts.get(node); if (oneMap.isEmpty()) { diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/ExecSorterByProximity.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/ExecSorterByProximity.java index 6aa3b605c4d..788eb1d997c 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/ExecSorterByProximity.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/ExecSorterByProximity.java @@ -71,7 +71,7 @@ public List sortExecutors(Set unassignedExecut List sortedComponents = topologicalSortComponents(componentMap); - for (Component currComp: sortedComponents) { + for (Component currComp : sortedComponents) { int numExecs = compToExecsToSchedule.get(currComp.getId()).size(); for (int i = 0; i < numExecs; i++) { orderedExecutorSet.addAll(takeExecutors(currComp, componentMap, compToExecsToSchedule)); @@ -160,7 +160,7 @@ private List takeExecutors(Component currComp, } execsScheduled.add(currQueue.poll()); Set sortedChildren = getSortedChildren(currComp, componentMap); - for (String childId: sortedChildren) { + for (String childId : sortedChildren) { Component childComponent = componentMap.get(childId); Queue childQueue = compToExecsToSchedule.get(childId); int childUnscheduledNumExecs = childQueue.size(); @@ -200,7 +200,7 @@ private Set getSortedChildren(Component component, final Map inputEntry: child.getInputs().entrySet()) { + for (Map.Entry inputEntry : child.getInputs().entrySet()) { GlobalStreamId globalStreamId = inputEntry.getKey(); Grouping grouping = inputEntry.getValue(); if (globalStreamId.get_componentId().equals(parent.getId()) diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorter.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorter.java index bff2394e685..0286a8a0f3d 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorter.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorter.java @@ -89,7 +89,7 @@ public NodeSorter(Cluster cluster, TopologyDetails topologyDetails, BaseResource networkTopography = cluster.getNetworkTopography(); Map hostToRack = cluster.getHostToRack(); RasNodes nodes = new RasNodes(cluster); - for (RasNode node: nodes.getNodes()) { + for (RasNode node : nodes.getNodes()) { String superId = node.getId(); String hostName = node.getHostname(); if (!node.isAlive() || hostName == null) { @@ -397,7 +397,7 @@ protected List makeHostToNodeIds(List hosts) { return Collections.emptyList(); } List ret = new ArrayList<>(hosts.size()); - for (String host: hosts) { + for (String host : hosts) { List nodes = hostnameToNodes.get(host); if (nodes != null) { for (RasNode node : nodes) { diff --git a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorterHostProximity.java b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorterHostProximity.java index c5acc991184..b3552746908 100644 --- a/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorterHostProximity.java +++ b/storm-server/src/main/java/org/apache/storm/scheduler/resource/strategies/scheduling/sorter/NodeSorterHostProximity.java @@ -93,7 +93,7 @@ public NodeSorterHostProximity(Cluster cluster, TopologyDetails topologyDetails, greyListedSupervisorIds = cluster.getGreyListedSupervisors(); Map hostToRack = cluster.getHostToRack(); RasNodes nodes = new RasNodes(cluster); - for (RasNode node: nodes.getNodes()) { + for (RasNode node : nodes.getNodes()) { String superId = node.getId(); String hostName = node.getHostname(); if (!node.isAlive() || hostName == null) { @@ -462,7 +462,7 @@ protected List makeHostToNodeIds(List hosts) { return Collections.emptyList(); } List ret = new ArrayList<>(hosts.size()); - for (String host: hosts) { + for (String host : hosts) { List nodes = hostnameToNodes.get(host); if (nodes != null) { for (RasNode node : nodes) { diff --git a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java index 1c3319d5419..f8d81c9b86b 100644 --- a/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java +++ b/storm-server/src/main/java/org/apache/storm/utils/ServerUtils.java @@ -930,7 +930,7 @@ public static boolean isAnyProcessAlive(Collection pids, int uid) throws I */ private static boolean isAnyWindowsProcessAlive(Collection pids, String user) throws IOException { List unexpectedUsers = new ArrayList<>(); - for (Long pid: pids) { + for (Long pid : pids) { List cmdArgs = new ArrayList<>(); cmdArgs.add("tasklist"); cmdArgs.add("/fo"); @@ -1270,7 +1270,7 @@ public static boolean isAnyPosixProcessPidDirAlive(Collection pids, String throw new IOException("Missing process directory " + procDir.getAbsolutePath() + ": method not supported on " + "os.name=" + System.getProperty("os.name")); } - for (long pid: pids) { + for (long pid : pids) { File pidDir = new File(procDir, String.valueOf(pid)); if (!pidDir.exists()) { continue; diff --git a/storm-submit-tools/src/main/java/org/apache/storm/submit/dependency/AetherUtils.java b/storm-submit-tools/src/main/java/org/apache/storm/submit/dependency/AetherUtils.java index 395fa3d853f..ad9a1a3f353 100644 --- a/storm-submit-tools/src/main/java/org/apache/storm/submit/dependency/AetherUtils.java +++ b/storm-submit-tools/src/main/java/org/apache/storm/submit/dependency/AetherUtils.java @@ -46,7 +46,7 @@ private AetherUtils() { public static Dependency parseDependency(String dependency) { List dependencyAndExclusions = Arrays.asList(dependency.split("\\^")); Collection exclusions = new ArrayList<>(); - for (int idx = 1 ; idx < dependencyAndExclusions.size() ; idx++) { + for (int idx = 1; idx < dependencyAndExclusions.size(); idx++) { exclusions.add(AetherUtils.createExclusion(dependencyAndExclusions.get(idx))); } diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/DRPCServer.java b/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/DRPCServer.java index b643b396e61..837dee00ed2 100644 --- a/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/DRPCServer.java +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/DRPCServer.java @@ -55,7 +55,7 @@ public class DRPCServer implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(DRPCServer.class); private final Meter meterShutdownCalls; - //TODO in the future this might be better in a common webapp location + //TODO: in the future this might be better in a common webapp location /** * Add a request context filter to the Servlet Context Handler. @@ -106,7 +106,7 @@ private static Server mkHttpServer(StormMetricsRegistry metricsRegistry, Map body = new HashMap<>(); - //TODO I would love to standardize this... + //TODO: I would love to standardize this... body.put("error", ex.is_set_type() ? ex.get_type().toString() : "Internal Error"); body.put("errorMessage", ex.get_msg()); return builder.entity(JSONValue.toJSONString(body)).type("application/json").build(); diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCResource.java b/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCResource.java index 0621e8a1f60..7d2ba133a69 100644 --- a/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCResource.java +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/drpc/webapp/DRPCResource.java @@ -44,8 +44,8 @@ public DRPCResource(DRPC drpc, StormMetricsRegistry metricsRegistry) { this.responseDuration = metricsRegistry.registerTimer("drpc:HTTP-request-response-duration"); } - //TODO put in some better exception mapping... - //TODO move populateContext to a filter... + //TODO: put in some better exception mapping... + //TODO: move populateContext to a filter... @POST @Path("/{func}") public String post(@PathParam("func") String func, String args, @Context HttpServletRequest request) throws Exception { diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java index 0a098f27849..1ef42d9e7a7 100644 --- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/LogviewerServer.java @@ -142,7 +142,7 @@ void awaitTermination() throws InterruptedException { @Override public synchronized void close() { if (!closed) { - //TODO this is causing issues... + //TODO: this is causing issues... //if (httpServer != null) { // httpServer.destroy(); //} diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java index d96a1d608b6..43319ce613e 100644 --- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java @@ -245,7 +245,7 @@ private int getWorkerLogTimeout(Map conf, String topologyId, int public SortedSet getLogDirs(Set logDirs, Predicate predicate) { // we could also make this static, but not to do it due to mock TreeSet ret = new TreeSet<>(); - for (Path logDir: logDirs) { + for (Path logDir : logDirs) { String workerId = ""; try { Optional metaFile = getMetadataFileForWorkerLogDir(logDir); diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java b/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java index 7ebb39cc347..2e4f64f2f1b 100644 --- a/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/ui/UIHelpers.java @@ -541,7 +541,7 @@ public static Response makeStandardResponse( String body = getJsonResponseBody(data, callback, needsSerialization); Response.ResponseBuilder responseBuilder = Response.status(status).entity(body); Map headers = getJsonResponseHeaders(callback, null); - for (Map.Entry headerEntry: headers.entrySet()) { + for (Map.Entry headerEntry : headers.entrySet()) { responseBuilder.header(headerEntry.getKey(), headerEntry.getValue()); } return responseBuilder.build(); @@ -2106,7 +2106,7 @@ public static Map getComponentPage( } result.put("user", user); - result.put("id" , component); + result.put("id", component); result.put("encodedId", Utils.urlEncodeUtf8(component)); result.put("name", componentPageInfo.get_topology_name()); result.put("executors", componentPageInfo.get_num_executors()); @@ -2307,7 +2307,7 @@ public static Map getTopologyProfilingStart(Nimbus.Iface client, String hostPort, String timeout, Map config) throws TException { setTopologyProfilingAction( - client, id , hostPort, System.currentTimeMillis() + (Long.valueOf(timeout) * 60_000), + client, id, hostPort, System.currentTimeMillis() + (Long.valueOf(timeout) * 60_000), config, ProfileAction.JPROFILE_STOP); Map result = new HashMap(); String host = hostPort.split(":")[0]; @@ -2331,7 +2331,7 @@ public static Map getTopologyProfilingStart(Nimbus.Iface client, public static Map getTopologyProfilingStop(Nimbus.Iface client, String id, String hostPort, Map config) throws TException { - setTopologyProfilingAction(client, id , hostPort, 0L, config, ProfileAction.JPROFILE_STOP); + setTopologyProfilingAction(client, id, hostPort, 0L, config, ProfileAction.JPROFILE_STOP); Map result = new HashMap(); result.put("status", "ok"); result.put("id", hostPort); @@ -2361,7 +2361,7 @@ public static Map getProfilingDisabled() { public static Map getTopologyProfilingDump(Nimbus.Iface client, String id, String hostPort, Map config) throws TException { setTopologyProfilingAction( - client, id , hostPort, System.currentTimeMillis(), + client, id, hostPort, System.currentTimeMillis(), config, ProfileAction.JPROFILE_DUMP ); Map result = new HashMap(); @@ -2374,7 +2374,7 @@ public static Map getTopologyProfilingDumpJstack(Nimbus.Iface cl String hostPort, Map config) throws TException { setTopologyProfilingAction( - client, id , hostPort, System.currentTimeMillis(), config, ProfileAction.JSTACK_DUMP + client, id, hostPort, System.currentTimeMillis(), config, ProfileAction.JSTACK_DUMP ); Map result = new HashMap(); result.put("status", "ok"); @@ -2395,7 +2395,7 @@ public static Map getTopologyProfilingRestartWorker(Nimbus.Iface String id, String hostPort, Map config) throws TException { setTopologyProfilingAction( - client, id , hostPort, System.currentTimeMillis(), config, ProfileAction.JVM_RESTART + client, id, hostPort, System.currentTimeMillis(), config, ProfileAction.JVM_RESTART ); Map result = new HashMap(); result.put("status", "ok"); @@ -2414,7 +2414,7 @@ public static Map getTopologyProfilingRestartWorker(Nimbus.Iface */ public static Map getTopologyProfilingDumpHeap(Nimbus.Iface client, String id, String hostPort, Map config) throws TException { - setTopologyProfilingAction(client, id , hostPort, System.currentTimeMillis(), config, ProfileAction.JMAP_DUMP); + setTopologyProfilingAction(client, id, hostPort, System.currentTimeMillis(), config, ProfileAction.JMAP_DUMP); Map result = new HashMap(); result.put("status", "ok"); result.put("id", hostPort);