1 Star 0 Fork 1

NIGHTFIGHTING / simple-go-kv

forked from 木木南 / simple-go-kv 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
batch.go 5.16 KB
一键复制 编辑 原始数据 按行查看 历史
package simple_bitcask_kv
import (
"encoding/binary"
"simple_bitcask_kv/data"
"sync"
)
const nonTxnSeqNo uint64 = 0 // 一般的db.RawPut/db.Delete也加上个SeqNo标记
// WriteBatch 用于原子批量写数据,保证原子写入
type WriteBatch struct {
mu *sync.RWMutex // 可能并发写入
options WriteBatchOptions
db *DB
seqNo uint64 // 生成wb就直接获取一个txn id
pendingWrites map[string]*RecordAndPos // WriteBatch不同事务的未提交数据放在这里
}
func (db *DB) NewWriteBatch(opts WriteBatchOptions) *WriteBatch {
wb := &WriteBatch{
mu: new(sync.RWMutex),
options: opts,
db: db,
seqNo: db.GetNewTxnID(),
}
// db中暂存这个txn还未提交的地方
wb.pendingWrites = make(map[string]*RecordAndPos)
return wb
}
// Put 往WriteBatch中加入一个写记录
func (wb *WriteBatch) Put(key []byte, value []byte) error {
if len(key) == 0 {
return ErrKeyIsEmpty
}
wb.mu.Lock()
defer wb.mu.Unlock()
// 暂存LogRecord 只更新磁盘记录,不更新内存(这样就不会)
// 之前居然忘了在WriteBatch中写入使用encodedKey
logRecord := &data.LogRecord{Key: encodeKey(key, wb.seqNo), Value: value,
Type: data.LogRecordNormal}
pos, err := wb.db.appendLogRecord(logRecord)
if err != nil {
return err
}
// 不直接更新db.index,而是更新到pendingWrites,直到commit才更新到index中
wb.pendingWrites[string(key)] = &RecordAndPos{
rawKey: key,
ty: logRecord.Type,
pos: pos,
}
return nil
}
// Delete 往WriteBatch中加入一个删除记录
func (wb *WriteBatch) Delete(key []byte) error {
if len(key) == 0 {
return ErrKeyIsEmpty
}
wb.mu.Lock()
defer wb.mu.Unlock()
// 如果db没有这个key,直接返回(还是为了减少文件膨胀)
if !wb.db.RawExistKey(key) {
// 如果WriteBatch中已经有这个要删除的key的记录,也直接删除
if wb.pendingWrites[string(key)] != nil {
delete(wb.pendingWrites, string(key))
}
return nil
}
// 暂存LogRecord
logRecord := &data.LogRecord{Key: encodeKey(key, wb.seqNo), Value: nil,
Type: data.LogRecordDeleted}
pos, err := wb.db.appendLogRecord(logRecord)
if err != nil {
return err
}
// 不直接更新db.index,而是更新到pendingWrites,直到commit才更新到index中
wb.pendingWrites[string(key)] = &RecordAndPos{
rawKey: key,
ty: logRecord.Type,
pos: pos,
}
return nil
}
func (wb *WriteBatch) Get(key []byte) ([]byte, error) {
if len(key) == 0 {
return nil, ErrKeyIsEmpty
}
wb.mu.RLock()
defer wb.mu.RUnlock()
rec := wb.pendingWrites[string(key)]
if rec == nil { // 当前事务没有进行修改,查看原先已有写入数据
return wb.db.RawGet(key)
}
return wb.db.getValueByPos(rec.pos)
}
var txnCommitFlag = []byte("txn-commit")
var txnRollbackFlag = []byte("txn-rollback")
// Commit 提交这个WriteBatch,确保原子写入
func (wb *WriteBatch) Commit() error {
wb.mu.Lock()
defer wb.mu.Unlock()
if len(wb.pendingWrites) == 0 {
return nil
}
if len(wb.pendingWrites) > wb.options.MaxBatchNum {
return ErrExceedMaxBatchNum
}
// 直接数据库一把大锁保证事务提交串行化
wb.db.mu.Lock()
defer wb.db.mu.Unlock()
// 标记事务结束的record
_, err := wb.db.appendLogRecordWL(&data.LogRecord{
Key: encodeKey(txnCommitFlag, wb.seqNo),
Value: nil,
Type: data.LogRecordTxnCommit,
})
if err != nil {
return err
}
// 根据配置决定是否立即持久化
if wb.options.SyncWrites {
wb.db.Sync()
}
// 更新内存中索引 将原本在pendingWrites中的直接加入index中
for _, rec := range wb.pendingWrites {
if rec.ty == data.LogRecordDeleted {
wb.db.index.Delete(rec.rawKey)
} else if rec.ty == data.LogRecordNormal {
wb.db.index.Put(rec.rawKey, rec.pos)
}
// 这个错误直接不处理就行
// else { panic("程序逻辑错误!这里不可能有除了Deleted和Normal之外的类型") }
}
// 清空这个事务的pending数据
wb.pendingWrites = nil
return nil
}
// Rollback 撤回这个WriteBatch
func (wb *WriteBatch) Rollback() error {
wb.mu.Lock()
defer wb.mu.Unlock()
// 直接数据库一把大锁保证事务提交串行化
wb.db.mu.Lock()
defer wb.db.mu.Unlock()
// 标记事务结束的record
_, err := wb.db.appendLogRecordWL(&data.LogRecord{
Key: encodeKey(txnCommitFlag, wb.seqNo),
Value: nil,
Type: data.LogRecordTxnRollback,
})
if err != nil {
return err
}
// 根据配置决定是否立即持久化
if wb.options.SyncWrites {
wb.db.Sync()
}
// 不更新内存中索引,直接清空这个事务的pending数据,这样这个WriteBatch的写入就都无效
wb.pendingWrites = nil
return nil
}
// 事务需要,key得是原始的key和事务号一起编码
func encodeKey(key []byte, seqNo uint64) []byte {
seqBytes := make([]byte, binary.MaxVarintLen64)
n := binary.PutUvarint(seqBytes[:], seqNo) // 返回n是seqNo真正编码后的字节数组长度(可能小于MaxVarintLen64)
encKey := make([]byte, n+len(key))
copy(encKey[:n], seqBytes[:n])
copy(encKey[n:], key)
return encKey
}
func decodeKey(encKey []byte) ([]byte, uint64) {
seqNo, n := binary.Uvarint(encKey)
rawKey := encKey[n:]
return rawKey, seqNo
}
1
https://gitee.com/NIGHTFIGHTING/simple-go-kv.git
git@gitee.com:NIGHTFIGHTING/simple-go-kv.git
NIGHTFIGHTING
simple-go-kv
simple-go-kv
master

搜索帮助