1 Star 0 Fork 1

NIGHTFIGHTING / simple-go-kv

forked from 木木南 / simple-go-kv 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
merge.go 7.20 KB
一键复制 编辑 原始数据 按行查看 历史
package simple_bitcask_kv
import (
"io"
"os"
"path"
"path/filepath"
"simple_bitcask_kv/data"
"sort"
"strconv"
)
const mergeDirName string = "-merge"
var mergeFinishedKey = []byte("merge-finished")
// merge会生成新的DataFile存放某个key最新的那个record,也会生成的HintFile存放某个key的LogRecordPos
func (db *DB) Merge() error {
// 如果数据库为空,直接返回
if db.activeFile == nil {
return nil
}
db.mu.Lock() // 要修改一些字段,但是merge过程中肯定不用上锁,不然太影响正常处理了
// 注意所有return都要释放锁! -- 怪不得ob规定一个函数最多一个return语句!
if db.isMerging { // 已经在Merge,返回
db.mu.Unlock()
return ErrMergeInProgress
}
db.isMerging = true
defer func() {
db.isMerging = false
}()
// 当前active文件转为old文件,新开一个active文件,所有old文件进行merge
if err := db.activeFile.Sync(); err != nil {
db.mu.Unlock()
return err
}
db.olderFiles[db.activeFile.FileId] = db.activeFile
if err := db.setActiveDataFile(); err != nil {
db.mu.Unlock()
return err
}
// 记录最近没有参与 merge 的文件 id
nonMergeFileId := db.activeFile.FileId
// 取出所有old files进行merge
var mergeFiles []*data.DataFile
for _, file := range db.olderFiles {
mergeFiles = append(mergeFiles, file)
}
// 现在可以释放锁,merge不再影响正常读写
db.mu.Unlock()
// mergeFiles按FileId排序
sort.Slice(mergeFiles, func(i, j int) bool {
return mergeFiles[i].FileId < mergeFiles[j].FileId
})
mergePath := db.getMergePath()
// 如果已经有mergePath,可能是上次merge目录没有删除(我觉得可以换成更加随机的名字,比如加上时间戳)
if _, err := os.Stat(mergePath); err == nil {
// 能顺利打开这个目录,说明已存在
if err := os.RemoveAll(mergePath); err != nil {
return err
}
}
// 新建目录
if err := os.MkdirAll(mergePath, os.ModePerm); err != nil {
return err
}
// 打开一个新的临时bitcask实例用于merge
mergeOpts := db.options // 这应该是直接复制吧?
mergeOpts.DirPath = mergePath
mergeOpts.SyncWrites = false
mergeDB, err := Open(mergeOpts)
if err != nil {
return err
}
hintFile, err := data.OpenHintFile(db.options.DirPath)
if err != nil {
return err
}
// 处理每个数据文件
for _, dataFile := range mergeFiles {
// 跟当时恢复索引很类似 看看能不能抽象出个函数,总归都是从DataFiles中读出所有Records
var offset int64 = 0
for {
logRecord, size, err := dataFile.ReadLogRecord(offset)
if err != nil {
if err == io.EOF { // 文件读取结束
break
}
return err
}
rawKey, _ := decodeKey(logRecord.Key)
// 跟索引位置进行比较,如果一致,说明这个record就是最新的(也就是说这个record中的Value是最新的),否则说明这个record已经过时
logRecordPos := db.index.Get(rawKey)
if logRecordPos != nil &&
offset == logRecordPos.Offset &&
dataFile.FileId == logRecordPos.Fid {
logRecord.Key = encodeKey(rawKey, nonTxnSeqNo)
hintPos, err := mergeDB.appendLogRecord(logRecord)
if err != nil {
return err
}
// 将当前位置索引写入Hint文件中
hintFile.WriteHintRecord(rawKey, hintPos)
}
// 这样HintFile和mergeDB的新数据文件中存储的都是最新的那条记录
// TODO: 总感觉哪里还有不对,那些进行到一半中的事务记录怎么办
// TODO: 那些完结的事务可以去掉标记直接进行,但是进行到一半的事务
offset += size
}
}
// 持久化
if err := hintFile.Sync(); err != nil {
return err
}
if err := mergeDB.Sync(); err != nil {
return err
}
// 添加一个merge完成标识文件
mergeFinishedFile, err := data.OpenMergeFinishedFile(mergeOpts.DirPath)
if err != nil {
return err
}
encRec, _ := data.EncodeLogRecord(&data.LogRecord{
Key: mergeFinishedKey,
Value: []byte(strconv.Itoa(int(nonMergeFileId))),
Type: data.LogRecordMergeFinished,
})
if err := mergeFinishedFile.Write(encRec); err != nil {
return err
}
if err := mergeFinishedFile.Sync(); err != nil {
return err
}
// TODO: 我觉得merge成功直接就可以切换文件,没必要在Open时候才加载merge后文件
db.mu.Lock()
defer db.mu.Unlock()
// HintFile确实有必要,因为DataFiles切换后pos其实都变了
if err := db.loadMergeFiles(); err != nil {
return nil
}
if err := db.loadIndexFromHintFile(); err != nil {
return nil
}
return nil
}
// 比如数据库目录/tmp/simple-kv,那么merge目录就是/tmp/simple-kv-merge
func (db *DB) getMergePath() string {
dir := path.Dir(path.Clean(db.options.DirPath)) // 找到DirPath的父目录部分
base := path.Base(db.options.DirPath) // 整个目录的最后一个部分
return filepath.Join(dir, base+mergeDirName)
}
func (db *DB) loadMergeFiles() error {
mergePath := db.getMergePath()
// merge目录不存在直接返回
if _, err := os.Stat(mergePath); os.IsNotExist(err) {
return nil
}
defer func() { // 删除merge目录
os.RemoveAll(mergePath)
}()
dirEntries, err := os.ReadDir(mergePath)
if err != nil {
return err
}
// 查找标识 merge完成的文件,判断merge是否处理完了
mergeFinished := false
// 判断是否merge成功
var mergeFiles []string
for _, entry := range dirEntries {
if entry.Name() == data.MergeFinishedFileName {
mergeFinished = true
} else if entry.Name() != data.HintFileName {
mergeFiles = append(mergeFiles, entry.Name())
}
}
if !mergeFinished { // merge不完成直接返回
return nil
}
// merge完成就用新的数据文件替代
// 读出nonMergeFileId
nonMergeFileId, err := db.getNonMergeFileId(mergePath)
if err != nil {
return err
}
// 删除被merge的old文件
var fileId uint32 = 0
for ; fileId < nonMergeFileId; fileId++ {
fileName := data.GetDataFileName(db.options.DirPath, fileId)
if _, err := os.Stat(fileName); err != nil {
if err := os.Remove(fileName); err != nil {
return err
}
}
}
// 移入所有merge files
for _, fileName := range mergeFiles {
srcPath := filepath.Join(mergePath, fileName)
dstPath := filepath.Join(db.options.DirPath, fileName)
if err := os.Rename(srcPath, dstPath); err != nil {
return err
}
}
return nil
}
func (db *DB) getNonMergeFileId(mergePath string) (uint32, error) {
df, err := data.OpenMergeFinishedFile(mergePath)
if err != nil {
return 0, err
}
record, _, err := df.ReadLogRecord(0)
if err != nil {
return 0, err
}
id, err := strconv.Atoi(string(record.Value))
if err != nil {
return 0, err
}
return uint32(id), nil
}
func (db *DB) loadIndexFromHintFile() error {
hintFileName := filepath.Join(db.options.DirPath, data.HintFileName)
if _, err := os.Stat(hintFileName); os.IsNotExist(err) {
return nil
}
// 打开hint索引文件
hintFile, err := data.OpenHintFile(db.options.DirPath)
if err != nil {
return err
}
// 读取索引
var offset int64 = 0
for {
logRecord, size, err := hintFile.ReadLogRecord(offset)
if err != nil {
if err == io.EOF {
break
}
return err
}
// 解码拿到实际位置的索引
pos := data.DecodeLogRecordPos(logRecord.Value)
db.index.Put(logRecord.Key, pos)
offset += size
}
return nil
}
1
https://gitee.com/NIGHTFIGHTING/simple-go-kv.git
git@gitee.com:NIGHTFIGHTING/simple-go-kv.git
NIGHTFIGHTING
simple-go-kv
simple-go-kv
master

搜索帮助