Commit 7eca329e authored by Matteo Melli's avatar Matteo Melli
Browse files

Refactor to use serializer and UnsignerLong instead of BigInteger

parent da2b7502
......@@ -21,166 +21,83 @@ package com.ongres.pgio.main;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Streams;
import com.ongres.pgio.main.config.Config;
import com.ongres.pgio.main.stats.GroupInfo;
import com.ongres.pgio.main.stats.GroupStat;
import com.ongres.pgio.main.stats.OtherGroupInfo;
import com.ongres.pgio.main.stats.OtherStat;
import com.ongres.pgio.main.stats.ProcessGroupInfo;
import com.ongres.pgio.main.stats.ProcessGroupStat;
import com.ongres.pgio.main.stats.ProcessInfo;
import com.ongres.pgio.main.stats.ProcessStat;
import com.ongres.pgio.main.stats.ProcessStatAccumulator;
import com.ongres.pgio.main.stats.SystemIo;
import com.ongres.pgio.main.stats.SystemStat;
import com.ongres.pgio.main.stats.parser.ProcessInfoParser;
import com.ongres.pgio.main.stats.parser.ProcessIoParser;
import com.ongres.pgio.main.stats.parser.SystemIoParser;
import com.ongres.pgio.main.stats.StatProcessor;
import com.ongres.pgio.main.stats.StatSnapshot;
import com.ongres.pgio.main.stats.serializer.CsvSerializer;
import com.ongres.pgio.main.stats.serializer.PrometheusSerializer;
import com.ongres.pgio.main.stats.serializer.StatSerializer;
import com.ongres.pgio.main.version.Version;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.json.Json;
import javax.json.JsonString;
public class Main {
public static void main(String[] args) throws Exception {
OptionParser parser = createOptionParser();
OptionSet options = parser.parse(args);
if (options.has("help")) {
parser.printHelpOn(System.out);
System.exit(0);
}
if (options.has("version")) {
System.out.println(Version.getVersion());
System.exit(0);
}
Config config = configOptionSet(options);
SystemIoParser systemIoParser = new SystemIoParser();
if (config.isPrintHeader() && !config.isPrometheusFormat()) {
System.out.println("timestamp,pid,ppid,label,rchar,wchar,"
+ "read_bytes,write_bytes,cancelled_write_bytes");
}
Instant start = Instant.now();
Map<Integer, ProcessStat> stats = new HashMap<>();
Optional<SystemStat> systemStat = Optional.empty();
while (true) {
final Map<Integer, ProcessStat> previousStats = new HashMap<>(stats);
Map<Integer, ProcessInfo> processInfoMap = Arrays.asList(new File("/proc").listFiles())
.stream()
.filter(file -> IntStream.range(0, file.getName().length())
.allMatch(i -> file.getName().charAt(i) >= '0' && file.getName().charAt(i) <= '9'))
.filter(file -> file.canRead())
.filter(file -> file.isDirectory())
.filter(file -> file.listFiles(entry -> entry.getName().equals("cmdline")).length == 1)
.filter(file -> file.listFiles(entry -> entry.getName().equals("io")).length == 1)
.map(file -> file.getName())
.map(name -> Integer.parseInt(name))
.map(pid -> getPreviousOrParseInfo(previousStats, pid))
.filter(optionalInfo -> optionalInfo.isPresent())
.map(optionalInfo -> optionalInfo.get())
.collect(Collectors.toMap(info -> info.getPid(), info -> info));
try {
OptionParser parser = createOptionParser();
List<ProcessInfo> filteredProcessInfoList = processInfoMap
.values()
.stream()
.filter(info -> config.getPpid()
.map(ppid -> info.getPid() == ppid || info.isChildOf(ppid, processInfoMap))
.orElse(true))
.collect(Collectors.toList());
OptionSet options = parser.parse(args);
stats = filteredProcessInfoList
.stream()
.map(info -> getPreviousOrParseIo(previousStats, info))
.filter(optionalStat -> optionalStat.isPresent())
.map(optionalStat -> optionalStat.get())
.collect(Collectors.toMap(entry -> entry.getInfo().getPid(), entry -> entry));
if (options.has("help")) {
parser.printHelpOn(System.out);
System.exit(0);
}
if (config.getProcessGroups().isPresent()) {
Map<Integer, ProcessStat> currentStats = stats;
Consumer<GroupStat> serializer;
if (config.isPrometheusFormat()) {
serializer = stat -> stat.serializeForPrometheus(System.out);
} else {
serializer = stat -> stat.serialize(System.out);
}
Streams.concat(
config.getProcessGroups().get().stream(),
Stream.of(new OtherGroupInfo(config.getProcessGroups().get())))
.map(info -> extractGroupStat(info, currentStats))
.forEach(stat -> serializer.accept(stat));
} else {
Consumer<ProcessStat> serializer;
if (config.isPrometheusFormat()) {
serializer = stat -> stat.serializeForPrometheus(System.out);
} else {
serializer = stat -> stat.serialize(System.out);
}
stats
.values()
.forEach(stat -> serializer.accept(stat));
if (options.has("version")) {
System.out.println(Version.getVersion());
System.exit(0);
}
SystemIo systemIo = systemIoParser.parse();
systemStat = systemStat
.map(stat -> Optional.of(stat.next(systemIo)))
.orElseGet(() -> Optional.of(new SystemStat(systemIo)));
Config config = configOptionSet(options);
if (systemStat.isPresent()) {
if (config.isShowSystem()) {
if (config.isPrometheusFormat()) {
systemStat.get().serializeForPrometheus(System.out);
} else {
systemStat.get().serialize(System.out);
}
}
if (config.isShowOther()) {
ProcessStatAccumulator processStatAccumulator = stats
.values()
.stream()
.reduce(new ProcessStatAccumulator(),
(result, stat) -> result.add(stat), (o, v) -> v);
OtherStat otherStat = new OtherStat(systemStat.get(), processStatAccumulator);
if (config.isPrometheusFormat()) {
otherStat.serializeForPrometheus(System.out);
} else {
otherStat.serialize(System.out);
StatSerializer serializer;
if (config.isPrometheusFormat()) {
serializer = new PrometheusSerializer(System.out);
} else {
serializer = new CsvSerializer(config, System.out);
}
StatProcessor statProcessor = new StatProcessor(config, serializer, System.err);
try {
statProcessor.begin();
Instant start = Instant.now();
Optional<StatSnapshot> previousStats = Optional.empty();
while (true) {
previousStats = Optional.of(statProcessor.getNext(previousStats));
if (config.getInterval() > 0) {
Thread.sleep(config.getInterval() - Duration.between(
start, Instant.now()).toMillis() % config.getInterval());
}
}
} finally {
statProcessor.end();
}
if (config.getInterval() > 0) {
Thread.sleep(config.getInterval() - Duration.between(
start, Instant.now()).toMillis() % config.getInterval());
} catch (Throwable throwable) {
if (throwable.getMessage() != null) {
System.err.println(throwable.getMessage());
} else {
System.err.println("Seems like you hit a bug. Please open an issue with"
+ " the following stack trace at https://gitlab.com/teoincontatto/pgio/issues/new");
System.err.println();
throwable.printStackTrace(System.err);
System.err.println();
}
System.err.println("Try \"pgio --help\" for more information.");
System.exit(1);
}
}
......@@ -193,22 +110,30 @@ public class Main {
"Show version and quit");
parser.acceptsAll(Lists.newArrayList("print-header"),
"Print header");
parser.acceptsAll(Lists.newArrayList("prometheus-format"),
"Print header");
parser.acceptsAll(Lists.newArrayList("a", "advanced"),
"Enable advanced options");
parser.acceptsAll(Lists.newArrayList("s", "show-system"),
"Print read/write data for the whole system");
parser.acceptsAll(Lists.newArrayList("o", "show-other"),
"Print read/write data not accounted by any listed process");
parser.acceptsAll(Lists.newArrayList("g", "group"),
"Group results using specified group configuration file")
parser.acceptsAll(Lists.newArrayList("D"),
"Specifies the file system location of the database configuration files."
+ " If this is omitted, the environment variable PGDATA is used.")
.withRequiredArg();
parser.acceptsAll(Lists.newArrayList("i", "interval"),
"Interval in milliseconds to gather stats")
.withRequiredArg()
.defaultsTo("3000");
parser.acceptsAll(Lists.newArrayList("prometheus-format"),
"Print output in prometheus format");
parser.acceptsAll(Lists.newArrayList("ppid"),
"Parent pid of the process to scan"
+ " (if not specified will collect stats from all processes)")
.availableIf("advanced")
.withRequiredArg();
parser.acceptsAll(Lists.newArrayList("g", "group"),
"Group results using specified group configuration file")
.availableIf("advanced")
.withRequiredArg();
return parser;
}
......@@ -216,13 +141,17 @@ public class Main {
private static Config configOptionSet(OptionSet options) {
Config.Builder configBuilder = new Config.Builder();
ConfigHelper configHelper = new ConfigHelper(options);
if (System.getenv("PGDATA") != null) {
configBuilder.withDataDir(Paths.get(System.getenv("PGDATA")));
}
configHelper.set("D", Paths::get, configBuilder::withDataDir);
configHelper.set("interval", Long::valueOf, configBuilder::withInterval);
configHelper.set("ppid", Integer::valueOf, configBuilder::withPpid);
configHelper.setIf("prometheus-format", configBuilder::withPrometheusFormat);
configHelper.setIf("print-header", configBuilder::withPrintHeader);
configHelper.setIf("no-print-header", configBuilder::withNoPrintHeader);
configHelper.setIf("show-system", configBuilder::withShowSystem);
configHelper.setIf("show-other", configBuilder::withShowOther);
configHelper.set("group", Main::readGroupConfig, configBuilder::withProcessGroups);
configHelper.set("group", Main::readGroupConfig, configBuilder::appendProcessGroups);
configHelper.setIf("prometheus-format", configBuilder::withPrometheusFormat);
Config config = configBuilder.build();
return config;
}
......@@ -238,7 +167,7 @@ public class Main {
builder.add(groupBuilder.build());
});
return builder.build();
} catch (Exception ex) {
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
......@@ -260,47 +189,4 @@ public class Main {
setValue.accept(options.has(option));
}
}
private static Optional<ProcessInfo> getPreviousOrParseInfo(
final Map<Integer, ProcessStat> previousStats, Integer pid) {
if (previousStats.containsKey(pid)) {
return Optional.of(previousStats.get(pid).getInfo());
} else {
return logAndEmptyIfException(() -> new ProcessInfoParser(pid).parse(),
() -> "Can not parse process info " + pid + ": ");
}
}
private static Optional<ProcessStat> getPreviousOrParseIo(
final Map<Integer, ProcessStat> previousStats, ProcessInfo info) {
if (previousStats.containsKey(info.getPid())) {
return logAndEmptyIfException(
() -> previousStats.get(info.getPid()).next(new ProcessIoParser(info.getPid()).parse()),
() -> "Can not parse process io " + info.getPid() + ": ");
} else {
return logAndEmptyIfException(
() -> new ProcessStat(info, new ProcessIoParser(info.getPid()).parse()),
() -> "Can not parse process io " + info.getPid() + ": ");
}
}
private static <T> Optional<T> logAndEmptyIfException(
Callable<T> callable, Supplier<String> errorPrefix) {
try {
return Optional.of(callable.call());
} catch (Throwable throwable) {
System.err.println(errorPrefix.get() + throwable.getMessage());
throwable.printStackTrace(System.err);
return Optional.empty();
}
}
private static GroupStat extractGroupStat(GroupInfo info, Map<Integer, ProcessStat> stats) {
return new GroupStat(info, stats
.values()
.stream()
.filter(stat -> info.belongsToGroup(stat.getInfo()))
.reduce(new ProcessGroupStat.Builder(), (builder, stat) -> builder.add(stat), (u, v) -> v)
.build());
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.ongres.pgio.main;
import com.google.common.base.Charsets;
import java.io.FileInputStream;
import java.nio.file.Path;
import java.util.Scanner;
public class PostgresPidFileParser {
private final Path dataDir;
public PostgresPidFileParser(Path dataDir) {
this.dataDir = dataDir;
}
public int parse() throws Exception {
Path postmasterPid = dataDir.resolve("postmaster.pid");
try (Scanner scanner = new Scanner(
new FileInputStream(postmasterPid.toFile()),
Charsets.UTF_8.name())) {
if (scanner.hasNext()) {
String line = scanner.nextLine();
return Integer.parseInt(line);
}
}
throw new IllegalStateException("Error reading pid from file " + postmasterPid);
}
}
......@@ -19,35 +19,44 @@
package com.ongres.pgio.main.config;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.ongres.pgio.main.stats.ProcessGroupInfo;
import com.ongres.pgio.main.stats.groups.DefaultPostgresGroups;
import java.nio.file.Path;
import java.util.Optional;
public class Config {
private final Path dataDir;
private final long interval;
private final Optional<Integer> ppid;
private final boolean prometheusFormat;
private final boolean printHeader;
private final boolean noPrintHeader;
private final boolean showSystem;
private final boolean showOther;
private final Optional<ImmutableList<ProcessGroupInfo>> processGroups;
private Config(long interval, Optional<Integer> ppid,
private Config(Path dataDir, long interval, Optional<Integer> ppid,
boolean prometheusFormat,
boolean printHeader, boolean showSystem, boolean showOther,
boolean noPrintHeader, boolean showSystem, boolean showOther,
Optional<ImmutableList<ProcessGroupInfo>> processGroups) {
super();
this.dataDir = dataDir;
this.interval = interval;
this.ppid = ppid;
this.prometheusFormat = prometheusFormat;
this.printHeader = printHeader;
this.noPrintHeader = noPrintHeader;
this.showSystem = showSystem;
this.showOther = showOther;
this.processGroups = processGroups;
}
public Path getDataDir() {
return dataDir;
}
public long getInterval() {
return interval;
}
......@@ -60,8 +69,8 @@ public class Config {
return prometheusFormat;
}
public boolean isPrintHeader() {
return printHeader;
public boolean isNoPrintHeader() {
return noPrintHeader;
}
public boolean isShowOther() {
......@@ -77,13 +86,20 @@ public class Config {
}
public static class Builder {
private Path dataDir;
private long interval;
private Optional<Integer> ppid = Optional.empty();
private boolean prometheusFormat;
private boolean printHeader;
private boolean noPrintHeader;
private boolean showSystem;
private boolean showOther;
private Optional<ImmutableList<ProcessGroupInfo>> processGroups = Optional.empty();
private Optional<ImmutableList<ProcessGroupInfo>> processGroups =
Optional.of(DefaultPostgresGroups.GROUPS);
public Builder withDataDir(Path dataDir) {
this.dataDir = dataDir;
return this;
}
public Builder withInterval(long interval) {
this.interval = interval;
......@@ -100,8 +116,8 @@ public class Config {
return this;
}
public Builder withPrintHeader(boolean printHeader) {
this.printHeader = printHeader;
public Builder withNoPrintHeader(boolean noPrintHeader) {
this.noPrintHeader = noPrintHeader;
return this;
}
......@@ -120,9 +136,22 @@ public class Config {
return this;
}
public Builder appendProcessGroups(ImmutableList<ProcessGroupInfo> processGroups) {
this.processGroups = Optional.of(this.processGroups.map(currentProcessGroups -> ImmutableList
.<ProcessGroupInfo>builder()
.addAll(currentProcessGroups)
.addAll(processGroups)
.build())
.orElse(processGroups));
return this;
}
public Config build() {
return new Config(interval, ppid,
prometheusFormat, printHeader, showSystem, showOther,
Preconditions.checkArgument(dataDir != null,
"no database directory specified and environment variable PGDATA unset");
return new Config(dataDir, interval, ppid,
prometheusFormat, noPrintHeader, showSystem, showOther,
processGroups);
}
}
......
......@@ -18,19 +18,12 @@
*/
package com.ongres.pgio.main.stats;
import static com.ongres.pgio.main.stats.StatUtil.bigIntegerIfPresentOrZero;
import static com.ongres.pgio.main.stats.StatUtil.protectString;
import static com.ongres.pgio.main.stats.StatUtil.protectStringForPrometheus;
import com.google.common.primitives.UnsignedLong;
import com.google.common.collect.ImmutableMap;
import java.io.PrintStream;
import java.math.BigInteger;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
public class GroupStat {
public class GroupStat implements IoStat {
private final GroupInfo group;
private final ProcessGroupStat stat;
......@@ -53,23 +46,43 @@ public class GroupStat {
return stat.getTimestamp();
}
public Optional<BigInteger> getRchar() {
@Override
public String getLabel() {
return group.getName();
}
@Override
public Optional<Integer> getPid() {
return Optional.empty();
}
@Override
public Optional<Integer> getPpid() {
return Optional.empty();
}
@Override
public Optional<UnsignedLong> getRchar() {
return stat.getRchar();
}
public Optional<BigInteger> getWchar() {
@Override
public Optional<UnsignedLong> getWchar() {
return stat.getWchar();
}
public Optional<BigInteger> getReadBytes() {
@Override
public Optional<UnsignedLong> getReadBytes() {
return stat.getReadBytes();
}
public Optional<BigInteger> getWriteBytes() {
@Override
public Optional<UnsignedLong> getWriteBytes() {
return stat.getWriteBytes();
}
public Optional<BigInteger> getCancelledWriteBytes() {
@Override
public Optional<UnsignedLong> getCancelledWriteBytes() {
return stat.getCancelledWriteBytes();
}
......@@ -77,47 +90,4 @@ public class GroupStat {
public String toString() {
return "GroupStat [group=" + group + ", stat=" + stat + "]";
}
public void serialize(PrintStream out) {
out.print(getTimestamp());
out.print(',');
out.print(',');
out.print(',');
out.print(protectString(group.getName()));
out.print(',');
out.print(bigIntegerIfPresentOrZero(getRchar()));
out.print(',');
out.print(bigIntegerIfPresentOrZero(getWchar()));
out.print(',');
out.print(bigIntegerIfPresentOrZero(getReadBytes()));
out.print(',');
out.print(bigIntegerIfPresentOrZero(getWriteBytes()));
out.print(',');