代码拉取完成,页面将自动刷新
同步操作将从 木木南/simple-go-kv 强制同步,此操作会覆盖自 Fork 仓库以来所做的任何修改,且无法恢复!!!
确定后同步将在后台操作,完成时将刷新页面,请耐心等待。
package simple_bitcask_kv
import (
"encoding/binary"
"simple_bitcask_kv/data"
"sync"
)
const nonTxnSeqNo uint64 = 0 // 一般的db.RawPut/db.Delete也加上个SeqNo标记
// WriteBatch 用于原子批量写数据,保证原子写入
type WriteBatch struct {
mu *sync.RWMutex // 可能并发写入
options WriteBatchOptions
db *DB
seqNo uint64 // 生成wb就直接获取一个txn id
pendingWrites map[string]*RecordAndPos // WriteBatch不同事务的未提交数据放在这里
}
func (db *DB) NewWriteBatch(opts WriteBatchOptions) *WriteBatch {
wb := &WriteBatch{
mu: new(sync.RWMutex),
options: opts,
db: db,
seqNo: db.GetNewTxnID(),
}
// db中暂存这个txn还未提交的地方
wb.pendingWrites = make(map[string]*RecordAndPos)
return wb
}
// Put 往WriteBatch中加入一个写记录
func (wb *WriteBatch) Put(key []byte, value []byte) error {
if len(key) == 0 {
return ErrKeyIsEmpty
}
wb.mu.Lock()
defer wb.mu.Unlock()
// 暂存LogRecord 只更新磁盘记录,不更新内存(这样就不会)
// 之前居然忘了在WriteBatch中写入使用encodedKey
logRecord := &data.LogRecord{Key: encodeKey(key, wb.seqNo), Value: value,
Type: data.LogRecordNormal}
pos, err := wb.db.appendLogRecord(logRecord)
if err != nil {
return err
}
// 不直接更新db.index,而是更新到pendingWrites,直到commit才更新到index中
wb.pendingWrites[string(key)] = &RecordAndPos{
rawKey: key,
ty: logRecord.Type,
pos: pos,
}
return nil
}
// Delete 往WriteBatch中加入一个删除记录
func (wb *WriteBatch) Delete(key []byte) error {
if len(key) == 0 {
return ErrKeyIsEmpty
}
wb.mu.Lock()
defer wb.mu.Unlock()
// 如果db没有这个key,直接返回(还是为了减少文件膨胀)
if !wb.db.RawExistKey(key) {
// 如果WriteBatch中已经有这个要删除的key的记录,也直接删除
if wb.pendingWrites[string(key)] != nil {
delete(wb.pendingWrites, string(key))
}
return nil
}
// 暂存LogRecord
logRecord := &data.LogRecord{Key: encodeKey(key, wb.seqNo), Value: nil,
Type: data.LogRecordDeleted}
pos, err := wb.db.appendLogRecord(logRecord)
if err != nil {
return err
}
// 不直接更新db.index,而是更新到pendingWrites,直到commit才更新到index中
wb.pendingWrites[string(key)] = &RecordAndPos{
rawKey: key,
ty: logRecord.Type,
pos: pos,
}
return nil
}
func (wb *WriteBatch) Get(key []byte) ([]byte, error) {
if len(key) == 0 {
return nil, ErrKeyIsEmpty
}
wb.mu.RLock()
defer wb.mu.RUnlock()
rec := wb.pendingWrites[string(key)]
if rec == nil { // 当前事务没有进行修改,查看原先已有写入数据
return wb.db.RawGet(key)
}
return wb.db.getValueByPos(rec.pos)
}
var txnCommitFlag = []byte("txn-commit")
var txnRollbackFlag = []byte("txn-rollback")
// Commit 提交这个WriteBatch,确保原子写入
func (wb *WriteBatch) Commit() error {
wb.mu.Lock()
defer wb.mu.Unlock()
if len(wb.pendingWrites) == 0 {
return nil
}
if len(wb.pendingWrites) > wb.options.MaxBatchNum {
return ErrExceedMaxBatchNum
}
// 直接数据库一把大锁保证事务提交串行化
wb.db.mu.Lock()
defer wb.db.mu.Unlock()
// 标记事务结束的record
_, err := wb.db.appendLogRecordWL(&data.LogRecord{
Key: encodeKey(txnCommitFlag, wb.seqNo),
Value: nil,
Type: data.LogRecordTxnCommit,
})
if err != nil {
return err
}
// 根据配置决定是否立即持久化
if wb.options.SyncWrites {
wb.db.Sync()
}
// 更新内存中索引 将原本在pendingWrites中的直接加入index中
for _, rec := range wb.pendingWrites {
if rec.ty == data.LogRecordDeleted {
wb.db.index.Delete(rec.rawKey)
} else if rec.ty == data.LogRecordNormal {
wb.db.index.Put(rec.rawKey, rec.pos)
}
// 这个错误直接不处理就行
// else { panic("程序逻辑错误!这里不可能有除了Deleted和Normal之外的类型") }
}
// 清空这个事务的pending数据
wb.pendingWrites = nil
return nil
}
// Rollback 撤回这个WriteBatch
func (wb *WriteBatch) Rollback() error {
wb.mu.Lock()
defer wb.mu.Unlock()
// 直接数据库一把大锁保证事务提交串行化
wb.db.mu.Lock()
defer wb.db.mu.Unlock()
// 标记事务结束的record
_, err := wb.db.appendLogRecordWL(&data.LogRecord{
Key: encodeKey(txnCommitFlag, wb.seqNo),
Value: nil,
Type: data.LogRecordTxnRollback,
})
if err != nil {
return err
}
// 根据配置决定是否立即持久化
if wb.options.SyncWrites {
wb.db.Sync()
}
// 不更新内存中索引,直接清空这个事务的pending数据,这样这个WriteBatch的写入就都无效
wb.pendingWrites = nil
return nil
}
// 事务需要,key得是原始的key和事务号一起编码
func encodeKey(key []byte, seqNo uint64) []byte {
seqBytes := make([]byte, binary.MaxVarintLen64)
n := binary.PutUvarint(seqBytes[:], seqNo) // 返回n是seqNo真正编码后的字节数组长度(可能小于MaxVarintLen64)
encKey := make([]byte, n+len(key))
copy(encKey[:n], seqBytes[:n])
copy(encKey[n:], key)
return encKey
}
func decodeKey(encKey []byte) ([]byte, uint64) {
seqNo, n := binary.Uvarint(encKey)
rawKey := encKey[n:]
return rawKey, seqNo
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。