Skip to content

Commit 8765a5b

Browse files
authored
Consensus storage layer (elastic#12)
Implements a translog-like storage layer for the new consensus module.
1 parent 6dfe53d commit 8765a5b

File tree

11 files changed

+1363
-186
lines changed

11 files changed

+1363
-186
lines changed

core/src/main/java/org/elasticsearch/cluster/AbstractDiffable.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,21 @@ public T apply(T part) {
9292
return part;
9393
}
9494
}
95+
96+
@Override
97+
public boolean equals(Object o) {
98+
if (this == o) return true;
99+
if (o == null || getClass() != o.getClass()) return false;
100+
101+
CompleteDiff<?> that = (CompleteDiff<?>) o;
102+
103+
return part != null ? part.equals(that.part) : that.part == null;
104+
}
105+
106+
@Override
107+
public int hashCode() {
108+
return part != null ? part.hashCode() : 0;
109+
}
95110
}
96111

97112
@SuppressWarnings("unchecked")

core/src/main/java/org/elasticsearch/common/io/stream/StreamOutput.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -752,6 +752,10 @@ public void writeOptionalStreamable(@Nullable Streamable streamable) throws IOEx
752752
}
753753
}
754754

755+
public void writeWriteable(Writeable writeable) throws IOException {
756+
writeable.writeTo(this);
757+
}
758+
755759
public void writeOptionalWriteable(@Nullable Writeable writeable) throws IOException {
756760
if (writeable != null) {
757761
writeBoolean(true);
core/src/main/java/org/elasticsearch/discovery/zen2/Checkpoint.java

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.elasticsearch.discovery.zen2;
20+
21+
import org.apache.lucene.codecs.CodecUtil;
22+
import org.apache.lucene.store.DataInput;
23+
import org.apache.lucene.store.DataOutput;
24+
import org.apache.lucene.store.Directory;
25+
import org.apache.lucene.store.IOContext;
26+
import org.apache.lucene.store.IndexInput;
27+
import org.apache.lucene.store.OutputStreamIndexOutput;
28+
import org.apache.lucene.store.SimpleFSDirectory;
29+
import org.elasticsearch.common.io.Channels;
30+
import org.elasticsearch.index.translog.ChannelFactory;
31+
32+
import java.io.ByteArrayOutputStream;
33+
import java.io.IOException;
34+
import java.nio.channels.FileChannel;
35+
import java.nio.file.OpenOption;
36+
import java.nio.file.Path;
37+
38+
final class Checkpoint {
39+
40+
final long generation;
41+
final long offset;
42+
final long term;
43+
44+
private static final int INITIAL_VERSION = 1;
45+
46+
private static final String CHECKPOINT_CODEC = "ckp";
47+
48+
static final int FILE_SIZE = CodecUtil.headerLength(CHECKPOINT_CODEC)
49+
+ Long.BYTES // generation
50+
+ Long.BYTES // offset
51+
+ Long.BYTES // term
52+
+ CodecUtil.footerLength();
53+
54+
/**
55+
* Create a new checkpoint for {@link ConsensusStorage}.
56+
*
57+
* @param generation the current translog generation
58+
* @param offset the current offset in the translog
59+
* @param term the current term
60+
*/
61+
Checkpoint(long generation, long offset, long term) {
62+
this.generation = generation;
63+
this.offset = offset;
64+
this.term = term;
65+
}
66+
67+
Checkpoint(DataInput in) throws IOException {
68+
this(in.readLong(), in.readLong(), in.readLong());
69+
}
70+
71+
private void write(DataOutput out) throws IOException {
72+
out.writeLong(generation);
73+
out.writeLong(offset);
74+
out.writeLong(term);
75+
}
76+
77+
@Override
78+
public String toString() {
79+
return "Checkpoint{" +
80+
"offset=" + offset +
81+
", generation=" + generation +
82+
", term=" + term +
83+
'}';
84+
}
85+
86+
public static Checkpoint read(Path path) throws IOException {
87+
try (Directory dir = new SimpleFSDirectory(path.getParent())) {
88+
try (IndexInput indexInput = dir.openInput(path.getFileName().toString(), IOContext.DEFAULT)) {
89+
// We checksum the entire file before we even go and parse it. If it's corrupted we barf right here.
90+
CodecUtil.checksumEntireFile(indexInput);
91+
CodecUtil.checkHeader(indexInput, CHECKPOINT_CODEC, INITIAL_VERSION, INITIAL_VERSION);
92+
return new Checkpoint(indexInput);
93+
}
94+
}
95+
}
96+
97+
public static void write(ChannelFactory factory, Path checkpointFile, Checkpoint checkpoint, OpenOption... options) throws IOException {
98+
final ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream(FILE_SIZE) {
99+
@Override
100+
public synchronized byte[] toByteArray() {
101+
// don't clone
102+
return buf;
103+
}
104+
};
105+
final String resourceDesc = "checkpoint(path=\"" + checkpointFile + "\", gen=" + checkpoint + ")";
106+
try (OutputStreamIndexOutput indexOutput =
107+
new OutputStreamIndexOutput(resourceDesc, checkpointFile.toString(), byteOutputStream, FILE_SIZE)) {
108+
CodecUtil.writeHeader(indexOutput, CHECKPOINT_CODEC, INITIAL_VERSION);
109+
checkpoint.write(indexOutput);
110+
CodecUtil.writeFooter(indexOutput);
111+
112+
assert indexOutput.getFilePointer() == FILE_SIZE :
113+
"get you numbers straight; bytes written: " + indexOutput.getFilePointer() + ", buffer size: " + FILE_SIZE;
114+
assert indexOutput.getFilePointer() < 512 :
115+
"checkpoint files have to be smaller than 512 bytes for atomic writes; size: " + indexOutput.getFilePointer();
116+
117+
}
118+
// now go and write to the channel, in one go.
119+
try (FileChannel channel = factory.open(checkpointFile, options)) {
120+
Channels.writeToChannel(byteOutputStream.toByteArray(), channel);
121+
// no need to force metadata, file size stays the same and we did the full fsync
122+
// when we first created the file, so the directory entry doesn't change as well
123+
channel.force(false);
124+
}
125+
}
126+
127+
@Override
128+
public boolean equals(Object o) {
129+
if (this == o) return true;
130+
if (o == null || getClass() != o.getClass()) return false;
131+
132+
Checkpoint that = (Checkpoint) o;
133+
134+
if (generation != that.generation) return false;
135+
if (offset != that.offset) return false;
136+
return term == that.term;
137+
}
138+
139+
@Override
140+
public int hashCode() {
141+
int result = (int) (generation ^ (generation >>> 32));
142+
result = 31 * result + (int) (offset ^ (offset >>> 32));
143+
result = 31 * result + (int) (term ^ (term >>> 32));
144+
return result;
145+
}
146+
147+
}

0 commit comments

Comments
 (0)