@@ -31,6 +31,7 @@ const (
31
31
DefaultCheckpointInterval = 1 * time .Minute
32
32
DefaultMinCheckpointPageN = 1000
33
33
DefaultMaxCheckpointPageN = 10000
34
+ DefaultTruncatePageN = 500000
34
35
)
35
36
36
37
// MaxIndex is the maximum possible WAL index.
@@ -83,6 +84,16 @@ type DB struct {
83
84
// unbounded if there are always read transactions occurring.
84
85
MaxCheckpointPageN int
85
86
87
+ // Threshold of WAL size, in pages, before a forced truncation checkpoint.
88
+ // A forced truncation checkpoint will block new transactions and wait for
89
+ // existing transactions to finish before issuing a checkpoint and
90
+ // truncating the WAL.
91
+ //
92
+ // If zero, no truncates are forced. This can cause the WAL to grow
93
+ // unbounded if there's a sudden spike of changes between other
94
+ // checkpoints.
95
+ TruncatePageN int
96
+
86
97
// Time between automatic checkpoints in the WAL. This is done to allow
87
98
// more fine-grained WAL files so that restores can be performed with
88
99
// better precision.
@@ -104,6 +115,7 @@ func NewDB(path string) *DB {
104
115
105
116
MinCheckpointPageN : DefaultMinCheckpointPageN ,
106
117
MaxCheckpointPageN : DefaultMaxCheckpointPageN ,
118
+ TruncatePageN : DefaultTruncatePageN ,
107
119
CheckpointInterval : DefaultCheckpointInterval ,
108
120
MonitorInterval : DefaultMonitorInterval ,
109
121
}
@@ -740,7 +752,7 @@ func (db *DB) Sync(ctx context.Context) (err error) {
740
752
}
741
753
742
754
// Synchronize real WAL with current shadow WAL.
743
- newWALSize , err := db .syncWAL (info )
755
+ origWALSize , newWALSize , err := db .syncWAL (info )
744
756
if err != nil {
745
757
return fmt .Errorf ("sync wal: %w" , err )
746
758
}
@@ -749,7 +761,9 @@ func (db *DB) Sync(ctx context.Context) (err error) {
749
761
// If WAL size is greater than min threshold, attempt checkpoint.
750
762
var checkpoint bool
751
763
checkpointMode := CheckpointModePassive
752
- if db .MaxCheckpointPageN > 0 && newWALSize >= calcWALSize (db .pageSize , db .MaxCheckpointPageN ) {
764
+ if db .TruncatePageN > 0 && origWALSize >= calcWALSize (db .pageSize , db .TruncatePageN ) {
765
+ checkpoint , checkpointMode = true , CheckpointModeTruncate
766
+ } else if db .MaxCheckpointPageN > 0 && newWALSize >= calcWALSize (db .pageSize , db .MaxCheckpointPageN ) {
753
767
checkpoint , checkpointMode = true , CheckpointModeRestart
754
768
} else if newWALSize >= calcWALSize (db .pageSize , db .MinCheckpointPageN ) {
755
769
checkpoint = true
@@ -908,29 +922,29 @@ type syncInfo struct {
908
922
}
909
923
910
924
// syncWAL copies pending bytes from the real WAL to the shadow WAL.
911
- func (db * DB ) syncWAL (info syncInfo ) (newSize int64 , err error ) {
925
+ func (db * DB ) syncWAL (info syncInfo ) (origSize int64 , newSize int64 , err error ) {
912
926
// Copy WAL starting from end of shadow WAL. Exit if no new shadow WAL needed.
913
- newSize , err = db .copyToShadowWAL (info .shadowWALPath )
927
+ origSize , newSize , err = db .copyToShadowWAL (info .shadowWALPath )
914
928
if err != nil {
915
- return newSize , fmt .Errorf ("cannot copy to shadow wal: %w" , err )
929
+ return origSize , newSize , fmt .Errorf ("cannot copy to shadow wal: %w" , err )
916
930
} else if ! info .restart {
917
- return newSize , nil // If no restart required, exit.
931
+ return origSize , newSize , nil // If no restart required, exit.
918
932
}
919
933
920
934
// Parse index of current shadow WAL file.
921
935
dir , base := filepath .Split (info .shadowWALPath )
922
936
index , err := ParseWALPath (base )
923
937
if err != nil {
924
- return 0 , fmt .Errorf ("cannot parse shadow wal filename: %s" , base )
938
+ return 0 , 0 , fmt .Errorf ("cannot parse shadow wal filename: %s" , base )
925
939
}
926
940
927
941
// Start a new shadow WAL file with next index.
928
942
newShadowWALPath := filepath .Join (dir , FormatWALPath (index + 1 ))
929
943
newSize , err = db .initShadowWALFile (newShadowWALPath )
930
944
if err != nil {
931
- return 0 , fmt .Errorf ("cannot init shadow wal file: name=%s err=%w" , newShadowWALPath , err )
945
+ return 0 , 0 , fmt .Errorf ("cannot init shadow wal file: name=%s err=%w" , newShadowWALPath , err )
932
946
}
933
- return newSize , nil
947
+ return origSize , newSize , nil
934
948
}
935
949
936
950
func (db * DB ) initShadowWALFile (filename string ) (int64 , error ) {
@@ -966,58 +980,64 @@ func (db *DB) initShadowWALFile(filename string) (int64, error) {
966
980
_ = os .Chown (filename , uid , gid )
967
981
968
982
// Copy as much shadow WAL as available.
969
- newSize , err := db .copyToShadowWAL (filename )
983
+ _ , newSize , err := db .copyToShadowWAL (filename )
970
984
if err != nil {
971
985
return 0 , fmt .Errorf ("cannot copy to new shadow wal: %w" , err )
972
986
}
973
987
return newSize , nil
974
988
}
975
989
976
- func (db * DB ) copyToShadowWAL (filename string ) (newSize int64 , err error ) {
990
+ func (db * DB ) copyToShadowWAL (filename string ) (origWalSize int64 , newSize int64 , err error ) {
977
991
Tracef ("%s: copy-shadow: %s" , db .path , filename )
978
992
979
993
r , err := os .Open (db .WALPath ())
980
994
if err != nil {
981
- return 0 , err
995
+ return 0 , 0 , err
982
996
}
983
997
defer r .Close ()
984
998
999
+ fi , err := r .Stat ()
1000
+ if err != nil {
1001
+ return 0 , 0 , err
1002
+ }
1003
+ origWalSize = frameAlign (fi .Size (), db .pageSize )
1004
+
985
1005
w , err := os .OpenFile (filename , os .O_RDWR , 0666 )
986
1006
if err != nil {
987
- return 0 , err
1007
+ return 0 , 0 , err
988
1008
}
989
1009
defer w .Close ()
990
1010
991
- fi , err : = w .Stat ()
1011
+ fi , err = w .Stat ()
992
1012
if err != nil {
993
- return 0 , err
1013
+ return 0 , 0 , err
994
1014
}
995
1015
origSize := frameAlign (fi .Size (), db .pageSize )
996
1016
997
1017
// Read shadow WAL header to determine byte order for checksum & salt.
998
1018
hdr := make ([]byte , WALHeaderSize )
999
1019
if _ , err := io .ReadFull (w , hdr ); err != nil {
1000
- return 0 , fmt .Errorf ("read header: %w" , err )
1020
+ return 0 , 0 , fmt .Errorf ("read header: %w" , err )
1001
1021
}
1002
1022
hsalt0 := binary .BigEndian .Uint32 (hdr [16 :])
1003
1023
hsalt1 := binary .BigEndian .Uint32 (hdr [20 :])
1004
1024
1005
1025
bo , err := headerByteOrder (hdr )
1006
1026
if err != nil {
1007
- return 0 , err
1027
+ return 0 , 0 , err
1008
1028
}
1009
1029
1010
1030
// Read previous checksum.
1011
1031
chksum0 , chksum1 , err := readLastChecksumFrom (w , db .pageSize )
1012
1032
if err != nil {
1013
- return 0 , fmt .Errorf ("last checksum: %w" , err )
1033
+ return 0 , 0 , fmt .Errorf ("last checksum: %w" , err )
1014
1034
}
1015
1035
1016
1036
// Seek to correct position on real wal.
1017
1037
if _ , err := r .Seek (origSize , io .SeekStart ); err != nil {
1018
- return 0 , fmt .Errorf ("real wal seek: %w" , err )
1038
+ return 0 , 0 , fmt .Errorf ("real wal seek: %w" , err )
1019
1039
} else if _ , err := w .Seek (origSize , io .SeekStart ); err != nil {
1020
- return 0 , fmt .Errorf ("shadow wal seek: %w" , err )
1040
+ return 0 , 0 , fmt .Errorf ("shadow wal seek: %w" , err )
1021
1041
}
1022
1042
1023
1043
// Read through WAL from last position to find the page of the last
@@ -1032,7 +1052,7 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) {
1032
1052
Tracef ("%s: copy-shadow: break %s @ %d; err=%s" , db .path , filename , offset , err )
1033
1053
break // end of file or partial page
1034
1054
} else if err != nil {
1035
- return 0 , fmt .Errorf ("read wal: %w" , err )
1055
+ return 0 , 0 , fmt .Errorf ("read wal: %w" , err )
1036
1056
}
1037
1057
1038
1058
// Read frame salt & compare to header salt. Stop reading on mismatch.
@@ -1063,7 +1083,7 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) {
1063
1083
newDBSize := binary .BigEndian .Uint32 (frame [4 :])
1064
1084
if newDBSize != 0 {
1065
1085
if _ , err := buf .WriteTo (w ); err != nil {
1066
- return 0 , fmt .Errorf ("write shadow wal: %w" , err )
1086
+ return 0 , 0 , fmt .Errorf ("write shadow wal: %w" , err )
1067
1087
}
1068
1088
buf .Reset ()
1069
1089
lastCommitSize = offset
@@ -1072,15 +1092,15 @@ func (db *DB) copyToShadowWAL(filename string) (newSize int64, err error) {
1072
1092
1073
1093
// Sync & close.
1074
1094
if err := w .Sync (); err != nil {
1075
- return 0 , err
1095
+ return 0 , 0 , err
1076
1096
} else if err := w .Close (); err != nil {
1077
- return 0 , err
1097
+ return 0 , 0 , err
1078
1098
}
1079
1099
1080
1100
// Track total number of bytes written to WAL.
1081
1101
db .totalWALBytesCounter .Add (float64 (lastCommitSize - origSize ))
1082
1102
1083
- return lastCommitSize , nil
1103
+ return origWalSize , lastCommitSize , nil
1084
1104
}
1085
1105
1086
1106
// ShadowWALReader opens a reader for a shadow WAL file at a given position.
@@ -1249,7 +1269,7 @@ func (db *DB) checkpoint(ctx context.Context, generation, mode string) error {
1249
1269
}
1250
1270
1251
1271
// Copy shadow WAL before checkpoint to copy as much as possible.
1252
- if _ , err := db .copyToShadowWAL (shadowWALPath ); err != nil {
1272
+ if _ , _ , err := db .copyToShadowWAL (shadowWALPath ); err != nil {
1253
1273
return fmt .Errorf ("cannot copy to end of shadow wal before checkpoint: %w" , err )
1254
1274
}
1255
1275
@@ -1284,7 +1304,7 @@ func (db *DB) checkpoint(ctx context.Context, generation, mode string) error {
1284
1304
}
1285
1305
1286
1306
// Copy the end of the previous WAL before starting a new shadow WAL.
1287
- if _ , err := db .copyToShadowWAL (shadowWALPath ); err != nil {
1307
+ if _ , _ , err := db .copyToShadowWAL (shadowWALPath ); err != nil {
1288
1308
return fmt .Errorf ("cannot copy to end of shadow wal: %w" , err )
1289
1309
}
1290
1310
@@ -1343,7 +1363,7 @@ func (db *DB) execCheckpoint(mode string) (err error) {
1343
1363
if err := db .db .QueryRow (rawsql ).Scan (& row [0 ], & row [1 ], & row [2 ]); err != nil {
1344
1364
return err
1345
1365
}
1346
- Tracef ("%s: checkpoint: mode=%v (%d,%d,%d)" , db .path , mode , row [0 ], row [1 ], row [2 ])
1366
+ log . Printf ("%s: checkpoint: mode=%v (%d,%d,%d)" , db .path , mode , row [0 ], row [1 ], row [2 ])
1347
1367
1348
1368
// Reacquire the read lock immediately after the checkpoint.
1349
1369
if err := db .acquireReadLock (); err != nil {
0 commit comments