378 lines
10 KiB
Go
378 lines
10 KiB
Go
/*
|
|
Copyright IBM Corp. All Rights Reserved.
|
|
|
|
SPDX-License-Identifier: Apache-2.0
|
|
*/
|
|
|
|
package leveldbhelper
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"sync"
|
|
|
|
"github.com/hyperledger/fabric/common/ledger/dataformat"
|
|
"github.com/pkg/errors"
|
|
"github.com/syndtr/goleveldb/leveldb"
|
|
"github.com/syndtr/goleveldb/leveldb/iterator"
|
|
)
|
|
|
|
const (
	// internalDBName names the logical db that holds bookkeeping data such as the data format version.
	// "_" is chosen because it is not permitted as a channel name, so it cannot collide with one.
	internalDBName = "_"

	// maxBatchSize caps the memory used by a single delete batch (1MB).
	// The size is measured as the total number of bytes of all the keys in the batch.
	maxBatchSize = 1000000
)
|
|
|
|
var (
	// dbNameKeySep separates the db name from the application key inside a leveldb key.
	dbNameKeySep = []byte{0x00}

	// lastKeyIndicator replaces the trailing separator to build the exclusive upper bound
	// used when iterating over every key of a named db.
	lastKeyIndicator byte = 0x01

	// formatVersionKey is the single key in the internal db whose value records the data format version.
	formatVersionKey = []byte{'f'}
)
|
|
|
|
// closeFunc is the callback a DBHandle invokes from Close.
type closeFunc func()
|
|
|
|
// Conf is the configuration for a `Provider`.
//
// `ExpectedFormat` is the expected value of the format key in the internal database.
// When the db is opened, a check is performed that either the db is empty
// (i.e., it is being opened for the first time) or the value stored under
// formatVersionKey equals `ExpectedFormat`. Otherwise, an error is returned.
// An empty `ExpectedFormat` indicates that the format was never set and hence
// no such record is expected to exist.
type Conf struct {
	DBPath         string
	ExpectedFormat string
}
|
|
|
|
// Provider enables to use a single leveldb as multiple logical leveldbs
|
|
type Provider struct {
|
|
db *DB
|
|
|
|
mux sync.Mutex
|
|
dbHandles map[string]*DBHandle
|
|
}
|
|
|
|
// DataFormatInfo describes the data format version recorded in a db.
type DataFormatInfo struct {
	// FormatVerison is the version of the data format.
	// NOTE(review): the name is misspelled ("Verison") but it is exported, so it is kept as is.
	FormatVerison string
	// IsDBEmpty is true if the db does not contain any data.
	IsDBEmpty bool
}
|
|
|
|
// RetrieveDataFormatInfo retrieves the DataFormatInfo for the db at the supplied `dbPath`
|
|
func RetrieveDataFormatInfo(dbPath string) (*DataFormatInfo, error) {
|
|
db := CreateDB(&Conf{DBPath: dbPath})
|
|
db.Open()
|
|
defer db.Close()
|
|
|
|
dbEmpty, err := db.IsEmpty()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
internalDB := &DBHandle{
|
|
db: db,
|
|
dbName: internalDBName,
|
|
}
|
|
|
|
formatVersion, err := internalDB.Get(formatVersionKey)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &DataFormatInfo{
|
|
IsDBEmpty: dbEmpty,
|
|
FormatVerison: string(formatVersion),
|
|
}, nil
|
|
}
|
|
|
|
// NewProvider constructs a Provider
|
|
func NewProvider(conf *Conf) (*Provider, error) {
|
|
db, err := openDBAndCheckFormat(conf)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &Provider{
|
|
db: db,
|
|
dbHandles: make(map[string]*DBHandle),
|
|
}, nil
|
|
}
|
|
|
|
func openDBAndCheckFormat(conf *Conf) (d *DB, e error) {
|
|
db := CreateDB(conf)
|
|
db.Open()
|
|
|
|
defer func() {
|
|
if e != nil {
|
|
db.Close()
|
|
}
|
|
}()
|
|
|
|
internalDB := &DBHandle{
|
|
db: db,
|
|
dbName: internalDBName,
|
|
}
|
|
|
|
dbEmpty, err := db.IsEmpty()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if dbEmpty && conf.ExpectedFormat != "" {
|
|
logger.Infof("DB is empty Setting db format as %s", conf.ExpectedFormat)
|
|
if err := internalDB.Put(formatVersionKey, []byte(conf.ExpectedFormat), true); err != nil {
|
|
return nil, err
|
|
}
|
|
return db, nil
|
|
}
|
|
|
|
formatVersion, err := internalDB.Get(formatVersionKey)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
logger.Debugf("Checking for db format at path [%s]", conf.DBPath)
|
|
|
|
if !bytes.Equal(formatVersion, []byte(conf.ExpectedFormat)) {
|
|
logger.Errorf("The db at path [%s] contains data in unexpected format. expected data format = [%s] (%#v), data format = [%s] (%#v).",
|
|
conf.DBPath, conf.ExpectedFormat, []byte(conf.ExpectedFormat), formatVersion, formatVersion)
|
|
return nil, &dataformat.ErrFormatMismatch{
|
|
ExpectedFormat: conf.ExpectedFormat,
|
|
Format: string(formatVersion),
|
|
DBInfo: fmt.Sprintf("leveldb at [%s]", conf.DBPath),
|
|
}
|
|
}
|
|
logger.Debug("format is latest, nothing to do")
|
|
return db, nil
|
|
}
|
|
|
|
// GetDataFormat returns the format of the data
|
|
func (p *Provider) GetDataFormat() (string, error) {
|
|
f, err := p.GetDBHandle(internalDBName).Get(formatVersionKey)
|
|
return string(f), err
|
|
}
|
|
|
|
func (p *Provider) SetDataFormat(format string) error {
|
|
db := p.GetDBHandle(internalDBName)
|
|
return db.Put(formatVersionKey, []byte(format), true)
|
|
}
|
|
|
|
// GetDBHandle returns a handle to a named db
|
|
func (p *Provider) GetDBHandle(dbName string) *DBHandle {
|
|
p.mux.Lock()
|
|
defer p.mux.Unlock()
|
|
dbHandle := p.dbHandles[dbName]
|
|
if dbHandle == nil {
|
|
closeFunc := func() {
|
|
p.mux.Lock()
|
|
defer p.mux.Unlock()
|
|
delete(p.dbHandles, dbName)
|
|
}
|
|
dbHandle = &DBHandle{dbName, p.db, closeFunc}
|
|
p.dbHandles[dbName] = dbHandle
|
|
}
|
|
return dbHandle
|
|
}
|
|
|
|
// Close closes the underlying leveldb
|
|
func (p *Provider) Close() {
|
|
p.db.Close()
|
|
}
|
|
|
|
// Drop drops all the data for the given dbName
|
|
func (p *Provider) Drop(dbName string) error {
|
|
dbHandle := p.GetDBHandle(dbName)
|
|
defer dbHandle.Close()
|
|
return dbHandle.deleteAll()
|
|
}
|
|
|
|
// DBHandle is an handle to a named db
|
|
type DBHandle struct {
|
|
dbName string
|
|
db *DB
|
|
closeFunc closeFunc
|
|
}
|
|
|
|
// Get returns the value for the given key
|
|
func (h *DBHandle) Get(key []byte) ([]byte, error) {
|
|
return h.db.Get(constructLevelKey(h.dbName, key))
|
|
}
|
|
|
|
// Put saves the key/value
|
|
func (h *DBHandle) Put(key []byte, value []byte, sync bool) error {
|
|
return h.db.Put(constructLevelKey(h.dbName, key), value, sync)
|
|
}
|
|
|
|
// Delete deletes the given key
|
|
func (h *DBHandle) Delete(key []byte, sync bool) error {
|
|
return h.db.Delete(constructLevelKey(h.dbName, key), sync)
|
|
}
|
|
|
|
// DeleteAll deletes all the keys that belong to the channel (dbName).
|
|
func (h *DBHandle) deleteAll() error {
|
|
iter, err := h.GetIterator(nil, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer iter.Release()
|
|
|
|
// use leveldb iterator directly to be more efficient
|
|
dbIter := iter.Iterator
|
|
|
|
// This is common code shared by all the leveldb instances. Because each leveldb has its own key size pattern,
|
|
// each batch is limited by memory usage instead of number of keys. Once the batch memory usage reaches maxBatchSize,
|
|
// the batch will be committed.
|
|
numKeys := 0
|
|
batchSize := 0
|
|
batch := &leveldb.Batch{}
|
|
for dbIter.Next() {
|
|
if err := dbIter.Error(); err != nil {
|
|
return errors.Wrap(err, "internal leveldb error while retrieving data from db iterator")
|
|
}
|
|
key := dbIter.Key()
|
|
numKeys++
|
|
batchSize = batchSize + len(key)
|
|
batch.Delete(key)
|
|
if batchSize >= maxBatchSize {
|
|
if err := h.db.WriteBatch(batch, true); err != nil {
|
|
return err
|
|
}
|
|
logger.Infof("Have removed %d entries for channel %s in leveldb %s", numKeys, h.dbName, h.db.conf.DBPath)
|
|
batchSize = 0
|
|
batch.Reset()
|
|
}
|
|
}
|
|
if batch.Len() > 0 {
|
|
return h.db.WriteBatch(batch, true)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// IsEmpty returns true if no data exists for the DBHandle
|
|
func (h *DBHandle) IsEmpty() (bool, error) {
|
|
itr, err := h.GetIterator(nil, nil)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
defer itr.Release()
|
|
|
|
if err := itr.Error(); err != nil {
|
|
return false, errors.WithMessagef(itr.Error(), "internal leveldb error while obtaining next entry from iterator")
|
|
}
|
|
|
|
return !itr.Next(), nil
|
|
}
|
|
|
|
// NewUpdateBatch returns a new UpdateBatch that can be used to update the db
|
|
func (h *DBHandle) NewUpdateBatch() *UpdateBatch {
|
|
return &UpdateBatch{
|
|
dbName: h.dbName,
|
|
leveldbBatch: &leveldb.Batch{},
|
|
}
|
|
}
|
|
|
|
// WriteBatch writes a batch in an atomic way
|
|
func (h *DBHandle) WriteBatch(batch *UpdateBatch, sync bool) error {
|
|
if batch == nil || batch.leveldbBatch.Len() == 0 {
|
|
return nil
|
|
}
|
|
if err := h.db.WriteBatch(batch.leveldbBatch, sync); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GetIterator gets an handle to iterator. The iterator should be released after the use.
|
|
// The resultset contains all the keys that are present in the db between the startKey (inclusive) and the endKey (exclusive).
|
|
// A nil startKey represents the first available key and a nil endKey represent a logical key after the last available key
|
|
func (h *DBHandle) GetIterator(startKey []byte, endKey []byte) (*Iterator, error) {
|
|
sKey := constructLevelKey(h.dbName, startKey)
|
|
eKey := constructLevelKey(h.dbName, endKey)
|
|
if endKey == nil {
|
|
// replace the last byte 'dbNameKeySep' by 'lastKeyIndicator'
|
|
eKey[len(eKey)-1] = lastKeyIndicator
|
|
}
|
|
logger.Debugf("Getting iterator for range [%#v] - [%#v]", sKey, eKey)
|
|
itr := h.db.GetIterator(sKey, eKey)
|
|
if err := itr.Error(); err != nil {
|
|
itr.Release()
|
|
return nil, errors.Wrapf(err, "internal leveldb error while obtaining db iterator")
|
|
}
|
|
return &Iterator{h.dbName, itr}, nil
|
|
}
|
|
|
|
// Close closes the DBHandle after its db data have been deleted
|
|
func (h *DBHandle) Close() {
|
|
if h.closeFunc != nil {
|
|
h.closeFunc()
|
|
}
|
|
}
|
|
|
|
// UpdateBatch encloses the details of multiple `updates`
|
|
type UpdateBatch struct {
|
|
leveldbBatch *leveldb.Batch
|
|
dbName string
|
|
size int
|
|
}
|
|
|
|
// Put adds a KV
|
|
func (b *UpdateBatch) Put(key []byte, value []byte) {
|
|
if value == nil {
|
|
panic("Nil value not allowed")
|
|
}
|
|
k := constructLevelKey(b.dbName, key)
|
|
b.leveldbBatch.Put(k, value)
|
|
b.size += len(k) + len(value)
|
|
}
|
|
|
|
// Delete deletes a Key and associated value
|
|
func (b *UpdateBatch) Delete(key []byte) {
|
|
k := constructLevelKey(b.dbName, key)
|
|
b.size += len(k)
|
|
b.leveldbBatch.Delete(k)
|
|
}
|
|
|
|
// Size returns the current size of the batch
|
|
func (b *UpdateBatch) Size() int {
|
|
return b.size
|
|
}
|
|
|
|
// Len returns number of records in the batch
|
|
func (b *UpdateBatch) Len() int {
|
|
return b.leveldbBatch.Len()
|
|
}
|
|
|
|
// Reset resets the batch
|
|
func (b *UpdateBatch) Reset() {
|
|
b.leveldbBatch.Reset()
|
|
b.size = 0
|
|
}
|
|
|
|
// Iterator extends actual leveldb iterator
|
|
type Iterator struct {
|
|
dbName string
|
|
iterator.Iterator
|
|
}
|
|
|
|
// Key wraps actual leveldb iterator method
|
|
func (itr *Iterator) Key() []byte {
|
|
return retrieveAppKey(itr.Iterator.Key())
|
|
}
|
|
|
|
// Seek moves the iterator to the first key/value pair
|
|
// whose key is greater than or equal to the given key.
|
|
// It returns whether such pair exist.
|
|
func (itr *Iterator) Seek(key []byte) bool {
|
|
levelKey := constructLevelKey(itr.dbName, key)
|
|
return itr.Iterator.Seek(levelKey)
|
|
}
|
|
|
|
func constructLevelKey(dbName string, key []byte) []byte {
|
|
return append(append([]byte(dbName), dbNameKeySep...), key...)
|
|
}
|
|
|
|
func retrieveAppKey(levelKey []byte) []byte {
|
|
return bytes.SplitN(levelKey, dbNameKeySep, 2)[1]
|
|
}
|