mirror of
https://github.com/Graylog2/graylog2-server.git
synced 2026-03-13 09:32:21 +08:00
resilienter index directory parser (#24397)
* resilienter index directory parser * added changelog
This commit is contained in:
5
changelog/unreleased/pr-24397.toml
Normal file
5
changelog/unreleased/pr-24397.toml
Normal file
@@ -0,0 +1,5 @@
|
||||
type = "f"
|
||||
message = "Make index directory parser more resilient, handling state files as optional"
|
||||
|
||||
issues = ["24358"]
|
||||
pulls = ["24397"]
|
||||
@@ -16,6 +16,7 @@
|
||||
*/
|
||||
package org.graylog.datanode.filesystem.index;
|
||||
|
||||
import jakarta.annotation.Nullable;
|
||||
import jakarta.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import org.graylog.datanode.filesystem.index.dto.IndexInformation;
|
||||
@@ -26,18 +27,23 @@ import org.graylog.datanode.filesystem.index.indexreader.ShardStats;
|
||||
import org.graylog.datanode.filesystem.index.indexreader.ShardStatsParser;
|
||||
import org.graylog.datanode.filesystem.index.statefile.StateFile;
|
||||
import org.graylog.datanode.filesystem.index.statefile.StateFileParser;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
@Singleton
|
||||
public class IndicesDirectoryParser {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(IndicesDirectoryParser.class);
|
||||
|
||||
public static final String STATE_DIR_NAME = "_state";
|
||||
public static final String STATE_FILE_EXTENSION = ".st";
|
||||
|
||||
@@ -83,7 +89,7 @@ public class IndicesDirectoryParser {
|
||||
|
||||
private NodeInformation parseNode(Path nodePath) {
|
||||
final Path indicesDir = nodePath.resolve("indices");
|
||||
if(!Files.exists(indicesDir)) {
|
||||
if (!Files.exists(indicesDir)) {
|
||||
return NodeInformation.empty(nodePath);
|
||||
}
|
||||
try (Stream<Path> indicesDirs = Files.list(indicesDir)) {
|
||||
@@ -98,9 +104,17 @@ public class IndicesDirectoryParser {
|
||||
}
|
||||
}
|
||||
|
||||
@Nullable
|
||||
private StateFile getState(Path path, String stateFilePrefix) {
|
||||
final Path stateFile = findStateFile(path, stateFilePrefix);
|
||||
return stateFileParser.parse(stateFile);
|
||||
final Optional<StateFile> stateFile = findStateFile(path, stateFilePrefix)
|
||||
.map(stateFileParser::parse);
|
||||
if (stateFile.isPresent()) {
|
||||
return stateFile.get();
|
||||
} else {
|
||||
LOG.warn("Couldn't find state file in directory " + path + ". This is unexpected but indexers can usually recover from this.");
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private IndexInformation parseIndex(Path path) {
|
||||
@@ -126,14 +140,13 @@ public class IndicesDirectoryParser {
|
||||
return new ShardInformation(path, shardStats.documentsCount(), state, shardStats.minSegmentLuceneVersion());
|
||||
}
|
||||
|
||||
private Path findStateFile(Path stateDir, String stateFilePrefix) {
|
||||
private Optional<Path> findStateFile(Path stateDir, String stateFilePrefix) {
|
||||
try (Stream<Path> stateFiles = Files.list(stateDir.resolve(STATE_DIR_NAME))) {
|
||||
return stateFiles
|
||||
.filter(Files::isRegularFile)
|
||||
.filter(file -> file.getFileName().toString().startsWith(stateFilePrefix))
|
||||
.filter(file -> file.getFileName().toString().endsWith(STATE_FILE_EXTENSION))
|
||||
.findFirst()
|
||||
.orElseThrow(() -> new IndexerInformationParserException("No state file available in dir " + stateDir));
|
||||
.findFirst();
|
||||
} catch (IOException e) {
|
||||
throw new IndexerInformationParserException("Failed to list state file of index" + stateDir, e);
|
||||
}
|
||||
|
||||
@@ -18,40 +18,60 @@ package org.graylog.datanode.filesystem.index.dto;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import jakarta.annotation.Nullable;
|
||||
import org.graylog.datanode.filesystem.index.statefile.StateFile;
|
||||
import org.graylog.shaded.opensearch2.org.opensearch.Version;
|
||||
|
||||
import java.nio.file.Path;
|
||||
import java.time.Instant;
|
||||
import java.time.LocalDate;
|
||||
import java.time.ZoneId;
|
||||
import java.time.ZonedDateTime;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
public record IndexInformation(@JsonIgnore Path path, String indexID, @JsonIgnore StateFile stateFile,
|
||||
public record IndexInformation(@JsonIgnore Path path, String indexID, @Nullable @JsonIgnore StateFile stateFile,
|
||||
List<ShardInformation> shards) {
|
||||
|
||||
@JsonProperty
|
||||
public String indexName() {
|
||||
return stateFile.document().keySet().stream().findFirst().orElseThrow(() -> new RuntimeException("Failed to read index name"));
|
||||
return Optional.ofNullable(stateFile)
|
||||
.map(StateFile::document)
|
||||
.map(Map::keySet)
|
||||
.flatMap(keyset -> keyset.stream().findFirst())
|
||||
.orElse(indexID);
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@JsonProperty
|
||||
public String indexVersionCreated() {
|
||||
final int versionValue = Integer.parseInt(indexSetting("index.version.created"));
|
||||
return Version.fromId(versionValue).toString();
|
||||
return indexSetting("index.version.created")
|
||||
.map(Integer::parseInt)
|
||||
.map(Version::fromId)
|
||||
.map(Version::toString)
|
||||
.orElse(null);
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@JsonProperty
|
||||
public String creationDate() {
|
||||
|
||||
final long timestamp = Long.parseLong(indexSetting("index.creation_date"));
|
||||
return Instant.ofEpochMilli(timestamp).atZone(ZoneId.systemDefault()).toLocalDateTime().toString();
|
||||
return indexSetting("index.creation_date")
|
||||
.map(Long::parseLong)
|
||||
.map(Instant::ofEpochMilli)
|
||||
.map(instant -> instant.atZone(ZoneId.systemDefault()))
|
||||
.map(ZonedDateTime::toLocalDate)
|
||||
.map(LocalDate::toString)
|
||||
.orElse(null);
|
||||
}
|
||||
|
||||
private String indexSetting(String setting) {
|
||||
final Map<String, Object> index = (Map<String, Object>) stateFile.document().get(indexName());
|
||||
Map<String, Object> settings = (Map<String, Object>) index.get("settings");
|
||||
return (String) settings.get(setting);
|
||||
private Optional<String> indexSetting(String setting) {
|
||||
return Optional.ofNullable(stateFile).map(sf -> {
|
||||
final Map<String, Object> index = (Map<String, Object>) sf.document().get(indexName());
|
||||
Map<String, Object> settings = (Map<String, Object>) index.get("settings");
|
||||
return (String) settings.get(setting);
|
||||
});
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -19,14 +19,19 @@ package org.graylog.datanode.filesystem.index.dto;
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
import com.fasterxml.jackson.annotation.JsonInclude;
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import jakarta.annotation.Nullable;
|
||||
import org.apache.lucene.util.Version;
|
||||
import org.graylog.datanode.filesystem.index.statefile.StateFile;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
@JsonInclude(JsonInclude.Include.NON_NULL)
|
||||
public record ShardInformation(@JsonIgnore java.nio.file.Path path, int documentsCount, @JsonIgnore StateFile stateFile,
|
||||
public record ShardInformation(@JsonIgnore java.nio.file.Path path, int documentsCount,
|
||||
@Nullable @JsonIgnore StateFile stateFile,
|
||||
@JsonIgnore Version minSegmentLuceneVersion) {
|
||||
|
||||
private static final String PRIMARY_DOC_KEY = "primary";
|
||||
|
||||
@JsonProperty
|
||||
public String name() {
|
||||
return "S" + path.getFileName().toString();
|
||||
@@ -39,7 +44,11 @@ public record ShardInformation(@JsonIgnore java.nio.file.Path path, int document
|
||||
|
||||
@JsonProperty
|
||||
public boolean primary() {
|
||||
return (boolean) stateFile.document().get("primary");
|
||||
return Optional.ofNullable(stateFile)
|
||||
.map(StateFile::document)
|
||||
.filter(doc -> doc.containsKey(PRIMARY_DOC_KEY))
|
||||
.map(doc -> (boolean) doc.get("primary"))
|
||||
.orElse(false);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
Reference in New Issue
Block a user