Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ static TopologyLoadConf captureTopology(Nimbus.Iface client, TopologySummary top

Map<String, Object> savedTopoConf = new HashMap<>();
Map<String, Object> topoConf = (Map<String, Object>) JSONValue.parse(client.getTopologyConf(topologyId));
for (String key: TopologyLoadConf.IMPORTANT_CONF_KEYS) {
for (String key : TopologyLoadConf.IMPORTANT_CONF_KEYS) {
Object o = topoConf.get(key);
if (o != null) {
savedTopoConf.put(key, o);
Expand Down Expand Up @@ -153,7 +153,7 @@ static TopologyLoadConf captureTopology(Nimbus.Iface client, TopologySummary top
}

Map<String, Map<String, Double>> boltResources = getBoltsResources(topo, topoConf);
for (Map.Entry<String, Map<String, Double>> entry: boltResources.entrySet()) {
for (Map.Entry<String, Map<String, Double>> entry : boltResources.entrySet()) {
LoadCompConf.Builder bd = boltBuilders.get(entry.getKey());
if (bd != null) {
Map<String, Double> resources = entry.getValue();
Expand Down Expand Up @@ -194,7 +194,7 @@ static TopologyLoadConf captureTopology(Nimbus.Iface client, TopologySummary top
}

Map<String, Map<String, Double>> spoutResources = getSpoutsResources(topo, topoConf);
for (Map.Entry<String, Map<String, Double>> entry: spoutResources.entrySet()) {
for (Map.Entry<String, Map<String, Double>> entry : spoutResources.entrySet()) {
LoadCompConf.Builder sd = spoutBuilders.get(entry.getKey());
if (sd != null) {
Map<String, Double> resources = entry.getValue();
Expand All @@ -212,7 +212,7 @@ static TopologyLoadConf captureTopology(Nimbus.Iface client, TopologySummary top

//Stats...
Map<String, List<ExecutorSummary>> byComponent = new HashMap<>();
for (ExecutorSummary executor: info.get_executors()) {
for (ExecutorSummary executor : info.get_executors()) {
String component = executor.get_component_id();
List<ExecutorSummary> list = byComponent.get(component);
if (list == null) {
Expand Down Expand Up @@ -250,7 +250,7 @@ static TopologyLoadConf captureTopology(Nimbus.Iface client, TopologySummary top
List<Double> emittedRate = new ArrayList<>();
List<ExecutorSummary> summaries = byComponent.get(id.get_componentId());
if (summaries != null) {
for (ExecutorSummary summary: summaries) {
for (ExecutorSummary summary : summaries) {
if (summary.is_set_stats()) {
int uptime = summary.get_uptime_secs();
LOG.debug("UPTIME {}", uptime);
Expand Down Expand Up @@ -349,7 +349,7 @@ public static void main(String[] args) throws Exception {
Nimbus.Iface client = nc.getClient();
List<String> topologyNames = cmd.getArgList();

for (TopologySummary topologySummary: client.getTopologySummaries()) {
for (TopologySummary topologySummary : client.getTopologySummaries()) {
if (topologyNames.isEmpty() || topologyNames.contains(topologySummary.get_name())) {
TopologyLoadConf capturedConf = captureTopology(client, topologySummary);
if (cmd.hasOption('a')) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public static void main(String[] args) throws Exception {
Nimbus.Iface client = nc.getClient();
List<String> topologyNames = cmd.getArgList();

for (TopologySummary topologySummary: client.getTopologySummaries()) {
for (TopologySummary topologySummary : client.getTopologySummaries()) {
if (topologyNames.isEmpty() || topologyNames.contains(topologySummary.get_name())) {
TopologyLoadConf capturedConf = CaptureLoad.captureTopology(client, topologySummary);
if (capturedConf.looksLikeTrident()) {
Expand All @@ -89,10 +89,10 @@ public static void main(String[] args) throws Exception {
}

System.out.println("TOPOLOGY\tTOTAL MESSAGES/sec\tESTIMATED INPUT MESSAGES/sec");
for (TopologyLoadConf tl: regular) {
for (TopologyLoadConf tl : regular) {
System.out.println(tl.name + "\t" + tl.getAllEmittedAggregate() + "\t" + tl.getSpoutEmittedAggregate());
}
for (TopologyLoadConf tl: trident) {
for (TopologyLoadConf tl : trident) {
System.out.println(tl.name + "\t" + tl.getAllEmittedAggregate() + "\t" + tl.getTridentEstimatedEmittedAggregate());
}
exitStatus = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void prepare(Map<String, Object> topoConf, TopologyContext context, Outpu
}

private void emitTuples(Tuple input) {
for (OutputStreamEngine se: outputStreams) {
for (OutputStreamEngine se : outputStreams) {
// we may output many tuples for a given input tuple
while (se.shouldEmit() != null) {
collector.emit(se.streamName, input, new Values(se.nextKey(), "SOME-BOLT-VALUE"));
Expand All @@ -87,7 +87,7 @@ public void execute(final Tuple input) {

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
for (OutputStream s: outputStreamStats) {
for (OutputStream s : outputStreamStats) {
declarer.declareStream(s.id, new Fields("key", "value"));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public static LoadCompConf fromConf(Map<String, Object> conf) {
List<OutputStream> streams = new ArrayList<>();
List<Map<String, Object>> streamData = (List<Map<String, Object>>) conf.get("streams");
if (streamData != null) {
for (Map<String, Object> streamInfo: streamData) {
for (Map<String, Object> streamInfo : streamData) {
streams.add(OutputStream.fromConf(streamInfo));
}
}
Expand Down Expand Up @@ -161,7 +161,7 @@ public LoadCompConf overrideSlowExecutorPattern(SlowExecutorPattern slp) {
public double getAllEmittedAggregate() {
double ret = 0;
if (streams != null) {
for (OutputStream out: streams) {
for (OutputStream out : streams) {
if (out.rate != null) {
ret += out.rate.mean * parallelism;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ static Measurements combine(List<Measurements> measurements, Integer start, Inte
count = Math.min(count, measurements.size() - start);

Measurements ret = new Measurements();
for (int i = start; i < start + count; i ++) {
for (int i = start; i < start + count; i++) {
ret.add(measurements.get(i));
}
return ret;
Expand Down Expand Up @@ -534,7 +534,7 @@ abstract static class ColumnsFileReporter extends FileReporter {
if (query.containsKey("extraColumns")) {
List<String> moreExtractors =
handleExtractorCleanup(Arrays.asList(query.get("extraColumns").split("\\s*,\\s*")));
for (String extractor: moreExtractors) {
for (String extractor : moreExtractors) {
if (!allExtractors.containsKey(extractor)) {
throw new IllegalArgumentException(extractor + " is not a supported column");
}
Expand All @@ -557,7 +557,7 @@ abstract static class ColumnsFileReporter extends FileReporter {
protected List<String> handleExtractorCleanup(List<String> orig) {
Map<String, Object> stormConfig = Utils.readStormConfig();
List<String> ret = new ArrayList<>(orig.size());
for (String extractor: orig) {
for (String extractor : orig) {
if (extractor.startsWith("conf:")) {
String confKey = extractor.substring("conf:".length());
Object confValue = stormConfig.get(confKey);
Expand Down Expand Up @@ -617,7 +617,7 @@ protected String format(Object o) {
@Override
public void start() {
boolean first = true;
for (String name: extractors) {
for (String name : extractors) {
if (!first) {
out.print(" ");
}
Expand All @@ -634,7 +634,7 @@ public void start() {
@Override
public void reportWindow(Measurements m, List<Measurements> allTime) {
boolean first = true;
for (String name: extractors) {
for (String name : extractors) {
if (!first) {
out.print(" ");
}
Expand All @@ -661,7 +661,7 @@ static class SepValReporter extends ColumnsFileReporter {
@Override
public void start() {
boolean first = true;
for (String name: extractors) {
for (String name : extractors) {
if (!first) {
out.print(separator);
}
Expand All @@ -678,7 +678,7 @@ public void start() {
@Override
public void reportWindow(Measurements m, List<Measurements> allTime) {
boolean first = true;
for (String name: extractors) {
for (String name : extractors) {
if (!first) {
out.print(separator);
}
Expand Down Expand Up @@ -812,7 +812,7 @@ public static void addCommandLineOptions(Options options) {
FileNotFoundException {
super(conf);
Map<String, MetricExtractor> allExtractors = new LinkedHashMap<>(NAMED_EXTRACTORS);
for (Map.Entry<String, Object> entry: parameterMetrics.entrySet()) {
for (Map.Entry<String, Object> entry : parameterMetrics.entrySet()) {
final Object value = entry.getValue();
allExtractors.put(entry.getKey(), new MetricExtractor((m, unit) -> value, ""));
}
Expand All @@ -826,7 +826,7 @@ public static void addCommandLineOptions(Options options) {
}
reporters = new ArrayList<>();
if (commandLine.hasOption("reporter")) {
for (String reporterString: commandLine.getOptionValues("reporter")) {
for (String reporterString : commandLine.getOptionValues("reporter")) {
Matcher m = REPORTER_PATTERN.matcher(reporterString);
if (!m.matches()) {
throw new IllegalArgumentException(reporterString + " does not look like it is a reporter");
Expand Down Expand Up @@ -878,20 +878,20 @@ public static void addCommandLineOptions(Options options) {

private long readMemory() {
long total = 0;
for (MemMeasure mem: memoryBytes.values()) {
for (MemMeasure mem : memoryBytes.values()) {
total += mem.get();
}
return total;
}

private void startMetricsOutput() {
for (MetricResultsReporter reporter: reporters) {
for (MetricResultsReporter reporter : reporters) {
reporter.start();
}
}

private void finishMetricsOutput() throws Exception {
for (MetricResultsReporter reporter: reporters) {
for (MetricResultsReporter reporter : reporters) {
reporter.finish(allCombined);
}
}
Expand Down Expand Up @@ -923,7 +923,7 @@ private void outputMetrics(Nimbus.Iface client, Collection<String> names) throws
long failed = 0;
double totalLatMs = 0;
long totalLatCount = 0;
for (String name: names) {
for (String name : names) {
TopologyInfo info = client.getTopologyInfoByName(name);
ids.add(info.get_id());
@SuppressWarnings("checkstyle:VariableDeclarationUsageDistance")
Expand Down Expand Up @@ -984,7 +984,7 @@ private void outputMetrics(Nimbus.Iface client, Collection<String> names) throws
ids, workers.size(), executors, hosts.size(), congested.getAndSet(new ConcurrentHashMap<>()), skippedMaxSpout,
totalLatMs / totalLatCount));
Measurements inWindow = Measurements.combine(allCombined, null, windowLength);
for (MetricResultsReporter reporter: reporters) {
for (MetricResultsReporter reporter : reporters) {
reporter.reportWindow(inWindow, allCombined);
}
}
Expand All @@ -993,7 +993,7 @@ private void outputMetrics(Nimbus.Iface client, Collection<String> names) throws
@SuppressWarnings("unchecked")
public void handle(IMetricsConsumer.TaskInfo taskInfo, Collection<IMetricsConsumer.DataPoint> dataPoints, String topologyId) {
String worker = taskInfo.srcWorkerHost + ":" + taskInfo.srcWorkerPort;
for (IMetricsConsumer.DataPoint dp: dataPoints) {
for (IMetricsConsumer.DataPoint dp : dataPoints) {
if (dp.name.startsWith("comp-lat-histo") && dp.value instanceof Histogram) {
synchronized (histo) {
histo.add((Histogram) dp.value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ private static class OutputStreamEngineWithHisto extends OutputStreamEngine {
OutputStreamEngineWithHisto(OutputStream stats, TopologyContext context) {
super(stats);
histogram = new HistogramMetric(3600000000000L, 3);
//TODO perhaps we can adjust the frequency later...
//TODO: perhaps we can adjust the frequency later...
context.registerMetric("comp-lat-histo-" + stats.id, histogram, 10);
}
}
Expand Down Expand Up @@ -127,7 +127,7 @@ protected Values getNextValues(OutputStreamEngine se) {

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
for (OutputStream s: streamStats) {
for (OutputStream s : streamStats) {
declarer.declareStream(s.id, new Fields("key", "value"));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,14 @@ public NormalDistStats(List<Double> values) {
double max = values.isEmpty() ? 0.0 : values.get(0);
double sum = 0.0;
long count = values.size();
for (Double v: values) {
for (Double v : values) {
sum += v;
min = Math.min(min, v);
max = Math.max(max, v);
}
double mean = sum / Math.max(count, 1);
double sdPartial = 0;
for (Double v: values) {
for (Double v : values) {
sdPartial += Math.pow(v - mean, 2);
}
double stddev = 0.0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public String nextKey() {
if (stats.areKeysSkewed) {
//We set the stddev of the skewed keys to be 1/5 of the length, but then we use the absolute value
// of that so everything is skewed towards 0
keyIndex = Math.min(KEYS.length - 1 , Math.abs((int) (rand.nextGaussian() * KEYS.length / 5)));
keyIndex = Math.min(KEYS.length - 1, Math.abs((int) (rand.nextGaussian() * KEYS.length / 5)));
} else {
keyIndex = rand.nextInt(KEYS.length);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,9 @@ public void prepare(Map<String, Object> stormConf,

@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
sleep.simulateProcessAndExecTime(executorIndex, Time.nanoTime(), null , () -> {
sleep.simulateProcessAndExecTime(executorIndex, Time.nanoTime(), null, () -> {
String sentence = tuple.getString(0);
for (String word: sentence.split("\\s+")) {
for (String word : sentence.split("\\s+")) {
collector.emit(new Values(word, 1));
}
});
Expand Down
Loading
Loading