golang实践LSM相关内容

LSM

LSM(log-structured merge-tree)是一种分层,有序,面向磁盘的数据结构,其核心思想是充分了利用了,磁盘批量的顺序写要远比随机写性能高出很多,在计算机科学中,日志结构的合并树是一种具有性能特征的数据结构,这使得它对于提供对具有高插入量的文件(例如事务日志数据)的索引访问很有吸引力。与其他搜索树一样,LSM树也维护键值对。LSM树以两个或多个单独的结构来维护数据,每种结构都针对其各自的底层存储介质进行了优化;数据在两个结构之间批量有效地同步。

在日志的写入过程中通过批量顺序写会提高IO性能,但是基于写入的日志进行查询的效率就不太满足需求,故一般都是通过如下方式来进行兼顾读写性能。

  1. WAL(write ahead log)即预写日志,将写入的数据先保存在内存中,当达到一定阈值时批量写入硬盘。
  2. 保存的数据通过文件偏移,并辅助索引,从而完成较快的数据查询。

如上的解决方案基本上是leveldb、hbase等主流NoSql的解决方案。将文件以SSTable的形式进行落盘保存,当然其中还有相关分层或者数据压缩的方式来提高效率。

LSM总体架构

参考博文深入理解什么是LSM-Tree

在这里插入图片描述

写流程
  1. 当写入的时候,首先先写到wal日志中,用以当做故障恢复等用。
  2. 当写完WAL之后,会首先写到内存中的Memtable中,数据会存在内存中,并在内存中维护一个排序用于查找。
  3. 当Memtable超过一定的大小后,会在内存里面冻结,变成不可变的Memtable,同时为了不阻塞写操作需要新生成一个Memtable继续提供服务。
  4. 把内存里面不可变的Memtable给dump到硬盘上的SSTable层中,此步骤也称为Minor Compaction,这里需要注意在L0层的SSTable是没有进行合并的,所以这里的key range在多个SSTable中可能会出现重叠,在层数大于0层之后的SSTable,不存在重叠key。
  5. 当每层的磁盘上的SSTable的体积超过一定的大小或者个数,也会周期的进行合并。此步骤也称为Major Compaction,这个阶段会真正的清除掉被标记删除掉的数据以及多版本数据的合并,避免浪费空间,注意由于SSTable都是有序的,我们可以直接采用merge sort进行高效合并,在合并的过程中会导致查询写入的性能下降。
读流程
  1. 当收到读请求时,先查询内存的memtable,如果查到则返回。
  2. 当在Memtable中没有查到的时候,如果此时有Immemtable正在合并则在此中查找如果查到则返回。
  3. 如果仍然没有找到,则进入硬盘中查找,并通过sstable进行分层的遍历查找,找到则返回未找到则返回空,最差的结果就是所有的层都遍历到。
动手实践

实践的内容主要是参考 从0开始:500行代码实现 LSM数据库

设计的SSTable结构都是按照上文设计而来,只不过没有实现后台的文件合并的操作,实质上只做了一层的数据落盘,然后直接使用了json的数据格式,并通过map结构简单的实践了一下,实践的代码主要是为了加深学习。

package trydb

import (
	"encoding/binary"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"os"
	"sort"
	"strings"
	"sync"
	"time"
)


const (
	SET_METHOD = iota
	DELETE_METHOD
)


var db Db


func init()  {
	lsm := &LsmKvStore{
		index: make(map[string]Entry),
		immutableIndex: make(map[string]Entry),
		storeThreshold: 10,
		ssTables: make([]*SsTable, 0),
		Wal: &Wal{WalFileName: "wal", TempWalFileName:"temp_wal"},
		tableName: ".table",
		partSize:3,
	}
	lsm.Wal.InitWal()

	lsm.RestoreFromFiles()
	lsm.RestoreFromWal()
	db = lsm
}


func Set(key, value string) error {
	return db.Set(key, value)
}

func Get(key string) (string, error) {
	return db.Get(key)
}

func Delete(key string) error {
	return db.Delete(key)
}



type Db interface {
	Set(key, value string) error
	Get(key string) (string, error)
	Delete(key string) error
}


type TableMetaInfo struct {
	version int64
	dataStart int64
	dataLen int64
	indexStart int64
	indexLen int64
	partSize int64
}


type Position struct {
	Start int64
	Length int64
}


func OpenFile(fileName string)*os.File{
	f, err := os.OpenFile(fileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
	if err != nil {
		fmt.Println(" error open file ", err.Error())
		os.Exit(-1)
	}
	return f
}


type SsTable struct {
	f *os.File
	filePath string
	tableMetaInfo *TableMetaInfo
	sparseIndex map[string]Position
	partSize int64
}


func(s *SsTable) Query(key string)*Entry{
	// 先排序
	var keys []string
	for k := range s.sparseIndex {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	// 使用二分查找
	i, j := 0, len(keys)
	foundKey := ""
	for i < j {
		mid := int((i+j)/2)
		if mid == i || mid == j {
			break
		}
		if keys[mid] > key {
			j = mid - 1
		}else if keys[mid] < key {
			i = mid + 1
		} else {
			foundKey = key
		}
	}

	var minIndex int
	if len(foundKey) > 0 {
		minIndex = int((i+j)/2)
	} else {
		foundIndex := len(keys)
		if i > j {
			minIndex = j
		} else {
			minIndex = i
		}
		if minIndex < 0 {
			minIndex = 0
		} else if minIndex >= foundIndex {
			minIndex = foundIndex - 1
		}
	}

	pos, ok := s.sparseIndex[keys[minIndex]]
	if ok == false {
		fmt.Println(" found sparse index error")
		os.Exit(1)
	}

	s.f.Seek(pos.Start, 0)

	bytes := make([]byte, pos.Length)
	fmt.Println("  found sparse  read  ", pos.Length, pos.Start, keys[minIndex])
	_, err := s.f.Read(bytes)
	if err != nil {
		fmt.Println(" found sparse Read error ", err.Error())
		os.Exit(1)
	}

	partData := make(map[string]Entry)
	err = json.Unmarshal(bytes, &partData)
	if err != nil {
		fmt.Println(" found sparse unmarshal error ", err.Error())
		os.Exit(1)
	}
	e, ok := partData[key]
	if ok == false {
		return nil
	}

	return &e
}


func CreateSsTableFromIndex(fileName string, partSize int64, immutableIndex map[string]Entry)*SsTable{
	f := OpenFile(fileName)
	f.Seek(0, 0)

	partData := make(map[string]Entry)
	tableMetaMap := make(map[string]Position)
	tableMetaInfo := &TableMetaInfo{}

	// 先排序
	var keys []string
	for k := range immutableIndex {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	startKey := ""
	startPos := 0
	tableMetaInfo.version = 0
	tableMetaInfo.partSize = partSize
	tableMetaInfo.dataStart = int64(startPos)

	for _, k := range keys {
		if len(startKey) == 0 {
			startKey = k
		}
		partData[k] = immutableIndex[k]
		if len(partData) == int(partSize) {
			bytes, err := json.Marshal(partData)
			if err != nil {
				fmt.Println(" marshal json error ", err)
				os.Exit(-1)
			}
			f.Write(bytes)
			// 保存当前的part信息
			tableMetaMap[startKey] = Position{Start:int64(startPos), Length:int64(len(bytes))}
			startPos += len(bytes)
			startKey = ""
			partData = make(map[string]Entry)
		}
	}
	if len(partData) > 0 {
		bytes, err := json.Marshal(partData)
		if err != nil {
			fmt.Println(" marshal json error ", err)
			os.Exit(-1)
		}
		f.Write(bytes)
		// 保存当前的part信息
		tableMetaMap[startKey] = Position{Start:int64(startPos), Length:int64(len(bytes))}
		startPos += len(bytes)
		startKey = ""
		partData = make(map[string]Entry)
	}

	tableMetaInfo.dataLen = int64(startPos)
	tableMetaInfo.indexStart = int64(startPos)
	// 将稀疏信息也写入
	bytes, err := json.Marshal(tableMetaMap)
	if err != nil {
		fmt.Println(" marshal json error ", err)
		os.Exit(-1)
	}
	f.Write(bytes)
	tableMetaInfo.indexLen = int64(len(bytes))

	// 写入文件元信息
	binary.Write(f, binary.LittleEndian, &tableMetaInfo.version)
	binary.Write(f, binary.LittleEndian, &tableMetaInfo.dataStart)
	binary.Write(f, binary.LittleEndian, &tableMetaInfo.dataLen)
	binary.Write(f, binary.LittleEndian, &tableMetaInfo.indexStart)
	binary.Write(f, binary.LittleEndian, &tableMetaInfo.indexLen)
	binary.Write(f, binary.LittleEndian, &tableMetaInfo.partSize)


	return &SsTable{
		f: f,
		filePath: fileName,
		tableMetaInfo:tableMetaInfo,
		sparseIndex:tableMetaMap,
		partSize:partSize,
	}
}

func RestoreFromFile(fileName string)*SsTable {
	f := OpenFile(fileName)

	// 因为是int64 占八个字节 从文件尾部开始
	tableMetaInfo := &TableMetaInfo{}
	f.Seek(0, 0)
	fileInfo, err := f.Stat()
	if err != nil {
		fmt.Println(" RestoreFromFile file stat error ", err.Error())
		os.Exit(1)
	}
	fileSize := fileInfo.Size()
	f.Seek(fileSize-8*6, 0)
	err = binary.Read(f, binary.LittleEndian, &tableMetaInfo.version)
	if err != nil {
		fmt.Println(" RestoreFromFile binary.Read error ", err.Error())
		os.Exit(1)
	}
	f.Seek(fileSize-8*5, 0)
	err = binary.Read(f, binary.LittleEndian, &tableMetaInfo.dataStart)
	if err != nil {
		fmt.Println(" RestoreFromFile binary.Read error ", err.Error())
		os.Exit(1)
	}
	f.Seek(fileSize-8*4, 0)
	err = binary.Read(f, binary.LittleEndian, &tableMetaInfo.dataLen)
	if err != nil {
		fmt.Println(" RestoreFromFile binary.Read error ", err.Error())
		os.Exit(1)
	}
	f.Seek(fileSize-8*3, 0)
	err = binary.Read(f, binary.LittleEndian, &tableMetaInfo.indexStart)
	if err != nil {
		fmt.Println(" RestoreFromFile binary.Read error ", err.Error())
		os.Exit(1)
	}
	f.Seek(fileSize-8*2, 0)
	err = binary.Read(f, binary.LittleEndian, &tableMetaInfo.indexLen)
	if err != nil {
		fmt.Println(" RestoreFromFile binary.Read error ", err.Error())
		os.Exit(1)
	}
	f.Seek(fileSize-8*1, 0)
	err = binary.Read(f, binary.LittleEndian, &tableMetaInfo.partSize)
	if err != nil {
		fmt.Println(" RestoreFromFile binary.Read error ", err.Error())
		os.Exit(1)
	}

	fmt.Println(" restore from file ", tableMetaInfo)
	f.Seek(0, 0)

	f.Seek(tableMetaInfo.indexStart, 0)

	bytes := make([]byte, tableMetaInfo.indexLen)

	_, err = f.Read(bytes)
	if err != nil {
		fmt.Println(" restore from file error ", err.Error())
		os.Exit(-1)
	}
	sparseIndex := make(map[string]Position)


	err = json.Unmarshal(bytes, &sparseIndex)
	if err != nil {
		fmt.Println(" restore from file Unmarshal error ", err.Error())
		os.Exit(-1)
	}

	return &SsTable{
		f: f,
		tableMetaInfo:tableMetaInfo,
		partSize:tableMetaInfo.partSize,
		filePath:fileName,
		sparseIndex:sparseIndex,
	}
}


type Wal struct {
	f *os.File
	WalFileName string
	TempWalFileName string
}


func(w *Wal) InitWal(){
	w.f = OpenFile(w.WalFileName)
}

func(w *Wal) WriteInt(length int)error {
	return binary.Write(w.f, binary.LittleEndian, int64(length))
}

func(w *Wal) ReadInt() (int, error) {
	var r int64

	return int(r), binary.Read(w.f, binary.LittleEndian, &r)
}

func(w *Wal) Write(bytes []byte) error {
	totalLen := len(bytes)
	writeLen := 0
	for writeLen < totalLen {
		n, err := w.f.Write(bytes)
		if err != nil {
			return err
		}
		writeLen += n
	}
	return nil
}

func (w *Wal) Read(l int)([]byte, error)  {
	bytes := make([]byte, l)
	_, err := w.f.Read(bytes)
	return bytes, err
}


type Entry struct {
	Method int `json:"method"`
	Key string `json:"key"`
	Value string `json:"value"`
}


func(e *Entry) Encode()([]byte, error){
	bytes, err := json.Marshal(e)
	if err != nil {
		return []byte{}, err
	}
	return bytes, nil
}

func DecodeEntry(bytes []byte)(Entry, error){
	var e Entry
	err := json.Unmarshal(bytes, &e)
	return e, err
}


type LsmKvStore struct {
	// 当前的索引地址
	index map[string]Entry
	// 不可变的索引地址
	immutableIndex map[string]Entry

	lock sync.RWMutex

	ssTables []*SsTable
	// 触发同步到sstable的阈值
	storeThreshold int

	tableName string
	partSize int64
	Wal *Wal
}


func(l *LsmKvStore) SwitchIndex(){
	l.immutableIndex = l.index
	l.index = make(map[string]Entry)
	// 关闭文件
	l.Wal.f.Close()
	// 重命名文件
	os.Rename(l.Wal.WalFileName, l.Wal.TempWalFileName)
	l.Wal.InitWal()
}


func(l *LsmKvStore) StoreToSsTable(){
	timeStr := time.Now().Format("2006-01-02_15:04:05.000000")
	fileName := fmt.Sprintf("%s%s", timeStr, l.tableName)
	s := CreateSsTableFromIndex(fileName, l.partSize, l.immutableIndex)
	l.ssTables = append(l.ssTables, s)
}


func(l *LsmKvStore) RestoreFromFiles(){
	files, _ := ioutil.ReadDir("./")
	for _, f := range files {
		if strings.HasSuffix(f.Name(), l.tableName){
			s := RestoreFromFile(f.Name())
			l.ssTables = append(l.ssTables, s)
		}
	}
}

func(l *LsmKvStore) RestoreFromWal() {
	for {
		length, err := l.Wal.ReadInt()
		if err != nil {
			if err == io.EOF {
				return
			}
			fmt.Println(" error read wal int ", err.Error())
			return
		}
		bytes, err := l.Wal.Read(length)
		if err != nil {
			fmt.Println(" error read wal unmarshal ", err.Error())
			os.Exit(1)
		}
		e := Entry{}
		err = json.Unmarshal(bytes, &e)
		if err != nil {
			fmt.Println(" error read wal unmarshal ", err.Error())
			os.Exit(1)
		}
		l.index[e.Key] = e
	}

}


func(l *LsmKvStore) Get(key string)(string, error){
	l.lock.RLock()
	defer l.lock.RUnlock()

	v, ok := l.index[key]
	fmt.Println(" index  ", l.index)
	if ok == true {
		if v.Method == DELETE_METHOD {
			return "", nil
		}
		return v.Value, nil
	}

	// 查找不可变的index
	v, ok = l.immutableIndex[key]
	if ok == true {
		if v.Method == DELETE_METHOD {
			return "", nil
		}
		return v.Value, nil
	}

	// 查找文件
	for _, s := range l.ssTables {
		e := s.Query(key)
		if e != nil {
			if e.Method == SET_METHOD {
				return e.Value, nil
			}
		}
	}
	return "", errors.New(" not found key")
}

func(l *LsmKvStore) Set(key, value string) error {
	l.lock.Lock()
	defer l.lock.Unlock()
	e := Entry{Key: key, Value:value, Method: SET_METHOD}
	bytes, err := e.Encode()
	if err != nil {
		fmt.Println("error set  ", key, value)
		return err
	}
	err = l.Wal.WriteInt(len(bytes))
	if err != nil {
		fmt.Println(" error write int  ", err.Error())
		return err
	}
	l.Wal.Write(bytes)
	l.index[key] = e

	if len(l.index) >= l.storeThreshold {
		fmt.Println(" threshold  ", len(l.index))
		l.SwitchIndex()
		l.StoreToSsTable()
	}
	return nil
}

func(l *LsmKvStore) Delete(key string) error {
	l.lock.Lock()
	defer l.lock.Unlock()

	e := Entry{Key:key, Value:"", Method:DELETE_METHOD}

	bytes, err := e.Encode()
	if err != nil {
		fmt.Println("error delete  ", key)
		return err
	}

	err = l.Wal.WriteInt(len(bytes))
	if err != nil {
		fmt.Println(" error write int  ", err.Error(), len(bytes))
		return err
	}
	l.Wal.Write(bytes)
	l.index[key] = e

	if len(l.index) >= l.storeThreshold {
		fmt.Println(" threshold  ", len(l.index))
		l.SwitchIndex()
		l.StoreToSsTable()
	}
	return nil
}

测试的程序如下:

package main

import (
	"fmt"
	"github.com/xiaowuzidaxia/trydb"
)

func main() {
	err := trydb.Set("a", "b")
	if err != nil {
		fmt.Println(err)
	}
	a, err := trydb.Get("a")
	if err != nil {
		fmt.Println(err)
	}
	fmt.Println("get a", a)
	
	err = trydb.Delete("a")
	if err != nil {
		fmt.Println(err)
	}
	
	a, err = trydb.Get("a")
	if err != nil {
		fmt.Println(err)
	}
	fmt.Println("get a", a)
	
	
	for i:=0;i<20; i++ {
		k := fmt.Sprintf("key_w_%d", i)
		trydb.Set(k, k)
	}

	v, err := trydb.Get("a")
	fmt.Println(v, err)
	trydb.Set("kk", "valuekk")
	trydb.Set("kk11", "valuekk")
	v, err = trydb.Get("key_w_19")
	fmt.Println(v, err)
	v, err = trydb.Get("kk")
	fmt.Println(v, err)
}

运行完成之后就可看出输出的相关内容,本文主要是为了动手实践一下,在实践代码中,并没有完成有关sstable合并和分层的操作,其他的一些细节都参考链接博文实现。

总结

本文主要是学习与了解LSM相关的流程,主流的一些NoSQL的实现都基于此原理,通过文中链接的博文主要来深刻理解有关LSM相关技术栈的组件的一些运行特性。由于本人才疏学浅,如有错误请批评指正。


版权声明:本文为qq_33339479原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接和本声明。