1 Star 0 Fork 1

NIGHTFIGHTING / simple-go-kv

forked from 木木南 / simple-go-kv 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
db.go 8.97 KB
一键复制 编辑 原始数据 按行查看 历史
package simple_bitcask_kv
import (
"errors"
"io"
"os"
"simple_bitcask_kv/data"
"simple_bitcask_kv/index"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
)
// DB bitcask存储引擎实例 提供一些用户接口
type DB struct {
options Options
seqNo uint64 // 递增的事务号 记录当前数据库中存在的最大事务号
// 内存结构
index index.Indexer
indextype index.IndexType
// 磁盘结构 - 需不需要把磁盘结构也抽象出来
mu *sync.RWMutex // 保护磁盘结构那几个并发访问数据结构 btree本身已经能够并发读写
activeFile *data.DataFile // 当前活跃文件,可追加写入record
olderFiles map[uint32]*data.DataFile // 所有旧文件,只读
isMerging bool // 最多同时允许一个merge进程
}
type RecordAndPos struct {
rawKey []byte // 解码后原始的key
ty data.LogRecordType
pos *data.LogRecordPos
}
/*
注意上读写锁的地方
只需要保存磁盘结构,内存结构本身就已经并发安全
Put中appendLogRecord需要上写锁
Get中整体上读锁
*/
// Open 打开bitcask 存储引擎实例
func Open(options Options) (*DB, error) {
if err := checkOptions(options); err != nil {
return nil, err
}
// 判断目录是否存在,不存在就创建
if _, err := os.Stat(options.DirPath); os.IsNotExist(err) {
if err := os.MkdirAll(options.DirPath, os.ModePerm); err != nil {
return nil, err
}
}
// 初始化DB实例
db := &DB{
options: options,
seqNo: nonTxnSeqNo + 1, // 初始化的txn id至少是1
indextype: index.Btree,
index: index.NewIndexer(index.Btree),
mu: new(sync.RWMutex),
olderFiles: make(map[uint32]*data.DataFile),
isMerging: false,
}
// 先检查如果有merge文件夹,就加载
if err := db.loadMergeFiles(); err != nil {
return nil, err
}
// 作者这里还加上了加载HintFile中索引,我觉得没必要,直接按照正常从DataFile中加载就行
// 通过HintFile也是同样读磁盘文件,还不如读DataFile时候直接就一起读取了
// 加载对应的数据文件 DirPath目录下 0001.data 0002.data ...
// 同时也会恢复索引
// 同时也会获取到之前最大的序列号,之后数据库新的序列号就从这之后开始
if err := db.loadDataFiles(); err != nil {
return nil, err
}
return db, nil
}
// getValueByPos 根据内存索引从磁盘中读取具体数据
func (db *DB) getValueByPos(pos *data.LogRecordPos) ([]byte, error) {
fileId := pos.Fid
offset := pos.Offset
db.mu.RLock()
defer db.mu.RUnlock()
var dataFile *data.DataFile = nil
if fileId == db.activeFile.FileId {
dataFile = db.activeFile
} else {
dataFile = db.olderFiles[fileId]
}
if dataFile == nil {
return nil, ErrDataFileNotFound
}
record, _, err := dataFile.ReadLogRecord(offset)
if err != nil {
return nil, err
}
if record.Type == data.LogRecordDeleted {
return nil, ErrKeyNotFound
}
// 注意这里读出来的key其实是encoded key,只不过这里不需要处理
return record.Value, nil
}
func (db *DB) GetNewTxnID() uint64 {
return atomic.AddUint64(&db.seqNo, 1) // 原子增长
}
// Close 清理相应资源,关闭活跃文件和旧文件的文件描述符 等其他资源
func (db *DB) Close() error {
if db.activeFile == nil {
return nil
}
db.mu.Lock()
defer db.mu.Unlock()
// 关闭当前活跃文件
if err := db.activeFile.Close(); err != nil {
return err
}
// 关闭旧文件
for _, file := range db.olderFiles {
if err := file.Close(); err != nil {
return err
}
}
return nil
}
// Sync 持久化活跃文件 DataFile已经实现Sync功能(底层还是靠IOManager)
func (db *DB) Sync() error {
if db.activeFile == nil {
return nil
}
db.mu.Lock()
defer db.mu.Unlock()
if err := db.activeFile.Sync(); err != nil {
return nil
}
return nil
}
// 有些地方需要已经上锁再调用
func (db *DB) appendLogRecordWL(record *data.LogRecord) (*data.LogRecordPos, error) {
// 判断当前活跃文件是否存在,没有就初始化
if db.activeFile == nil {
if err := db.setActiveDataFile(); err != nil {
return nil, err
}
}
// 把LogRecord编码成一个[]byte再写入
encRecord, size := data.EncodeLogRecord(record)
// 如果写入数据已经达到活跃文件的阈值,则关闭活跃文件,打开新文件 (按照论文描述)
if db.activeFile.WriteOffset+size > db.options.DataFileMaxSize {
// 持久化活跃文件
if err := db.activeFile.Sync(); err != nil {
return nil, err
}
// 当前活跃文件变为old files
db.olderFiles[db.activeFile.FileId] = db.activeFile
// 打开新的数据文件
if err := db.setActiveDataFile(); err != nil {
return nil, err
}
}
writeOffset := db.activeFile.WriteOffset
if err := db.activeFile.Write(encRecord); err != nil {
return nil, err
}
// 用户可指定每次写入后必须持久化
if db.options.SyncWrites {
if err := db.activeFile.Sync(); err != nil {
return nil, err
}
}
pos := &data.LogRecordPos{
Fid: db.activeFile.FileId,
Offset: writeOffset,
}
return pos, nil
}
// 将LogRecord写入磁盘中,并返回位置信息LogRecordPos以更新内存索引
// 实际上是db.Put的最重要的逻辑
// 保证并发安全
func (db *DB) appendLogRecord(record *data.LogRecord) (*data.LogRecordPos, error) {
db.mu.Lock()
defer db.mu.Unlock()
return db.appendLogRecordWL(record)
}
// 没有active file就设置一个
// 这个函数调用的时候已经加上上锁了
func (db *DB) setActiveDataFile() error {
var initialFileId uint32 = 0
if db.activeFile != nil { // 原有activeFile的Id基础上+1
initialFileId = db.activeFile.FileId + 1
}
dataFile, err := data.OpenDataFile(db.options.DirPath, initialFileId)
if err != nil {
return err
}
db.activeFile = dataFile
return nil
}
func (db *DB) loadDataFiles() error {
dirEntries, err := os.ReadDir(db.options.DirPath)
if err != nil {
return err
}
var fileIds []int
// .data结尾的就是数据文件
for _, entry := range dirEntries {
if strings.HasSuffix(entry.Name(), data.DataFileNameSuffix) {
// xxxx.data
splitName := strings.Split(entry.Name(), ".")[0]
atoi, err := strconv.Atoi(splitName)
if err != nil {
// 目录可能损害了
return ErrDataDirCorrupted
}
fileIds = append(fileIds, atoi)
}
}
// 依次加载数据文件
sort.Ints(fileIds)
for i, fid := range fileIds {
dataFile, err := data.OpenDataFile(db.options.DirPath, uint32(fid))
if err != nil {
return err
}
// 根据dataFile恢复索引
err = db.loadIndexFromDataFile(dataFile)
if err != nil {
return err
}
// id最大的是当前活跃文件
if i == len(fileIds)-1 {
db.activeFile = dataFile
} else {
db.olderFiles[uint32(fid)] = dataFile
}
}
return nil
}
func (db *DB) loadIndexFromDataFile(dataFile *data.DataFile) error {
updateIndex := func(key []byte, ty data.LogRecordType, pos *data.LogRecordPos) {
// 更新内存索引
if ty == data.LogRecordDeleted {
db.index.Delete(key)
} else {
db.index.Put(key, pos)
}
}
// 读取文件所有内容
var offset int64 = 0
// 用于暂存某个事务对应的一系列LogRecord和LogRecordPos
// 每个事务都对应一系列keys的修改 RecordAndPos里面存放的就是更新一次内存索引所需要的所有信息
txnRecords := make(map[uint64][]*RecordAndPos)
for {
logRecord, size, err := dataFile.ReadLogRecord(offset)
if err != nil {
if err == io.EOF { // 文件读取结束
break
}
return err
}
// 注意logRecord中的key是encoded key,要解析
rawKey, seqNo := decodeKey(logRecord.Key)
pos := &data.LogRecordPos{
Fid: dataFile.FileId,
Offset: offset,
}
if seqNo == nonTxnSeqNo {
// 说明是直接用db.RawPut/db.Delete写入的非事务操作,直接更新内存索引
updateIndex(rawKey, logRecord.Type, pos)
} else {
// 说明是WriteBatch写入一个事务,要确定找到Commit记录后才写入
if logRecord.Type == data.LogRecordTxnCommit {
// 找到commit记录,可以将对应事务的修改更新到内存索引中
txnWrites := txnRecords[seqNo]
if txnWrites != nil {
for _, rec := range txnRecords[seqNo] {
updateIndex(rec.rawKey, rec.ty, rec.pos)
}
}
} else if logRecord.Type == data.LogRecordTxnRollback {
// rollback记录,不用管这个事务 记录删除
delete(txnRecords, seqNo)
} else {
// 暂存这个记录的修改
txnRecords[seqNo] = append(txnRecords[seqNo], &RecordAndPos{
rawKey: rawKey,
ty: logRecord.Type,
pos: pos,
})
}
}
// 更新一些信息
if seqNo > db.seqNo { // 更新数据库恢复后的序列号
db.seqNo = seqNo
}
offset += size
}
// 如果是当前活跃文件,要更新WriteOffset
if dataFile == db.activeFile {
db.activeFile.WriteOffset = offset
}
return nil
}
func checkOptions(options Options) error {
if options.DirPath == "" {
return errors.New("incorrect database dir path")
}
if options.DataFileMaxSize <= 0 {
return errors.New("incorrect data file size")
}
return nil
}
1
https://gitee.com/NIGHTFIGHTING/simple-go-kv.git
git@gitee.com:NIGHTFIGHTING/simple-go-kv.git
NIGHTFIGHTING
simple-go-kv
simple-go-kv
master

搜索帮助