/* Copyright IBM Corp. All Rights Reserved. SPDX-License-Identifier: Apache-2.0 */ package blkstorage import ( "bufio" "fmt" "io" "os" "github.com/golang/protobuf/proto" "github.com/pkg/errors" ) // ErrUnexpectedEndOfBlockfile error used to indicate an unexpected end of a file segment // this can happen mainly if a crash occurs during appending a block and partial block contents // get written towards the end of the file var ErrUnexpectedEndOfBlockfile = errors.New("unexpected end of blockfile") // blockfileStream reads blocks sequentially from a single file. // It starts from the given offset and can traverse till the end of the file type blockfileStream struct { fileNum int file *os.File reader *bufio.Reader currentOffset int64 } // blockStream reads blocks sequentially from multiple files. // it starts from a given file offset and continues with the next // file segment until the end of the last segment (`endFileNum`) type blockStream struct { rootDir string currentFileNum int endFileNum int currentFileStream *blockfileStream } // blockPlacementInfo captures the information related // to block's placement in the file. type blockPlacementInfo struct { fileNum int blockStartOffset int64 blockBytesOffset int64 } // ///////////////////////////////// // blockfileStream functions // ////////////////////////////////// func newBlockfileStream(rootDir string, fileNum int, startOffset int64) (*blockfileStream, error) { filePath := deriveBlockfilePath(rootDir, fileNum) logger.Debugf("newBlockfileStream(): filePath=[%s], startOffset=[%d]", filePath, startOffset) var file *os.File var err error if file, err = os.OpenFile(filePath, os.O_RDONLY, 0o600); err != nil { return nil, errors.Wrapf(err, "error opening block file %s", filePath) } var newPosition int64 if newPosition, err = file.Seek(startOffset, 0); err != nil { return nil, errors.Wrapf(err, "error seeking block file [%s] to startOffset [%d]", filePath, startOffset) } if newPosition != startOffset { panic(fmt.Sprintf("Could not seek block file [%s] to startOffset [%d]. New position = [%d]", filePath, startOffset, newPosition)) } s := &blockfileStream{fileNum, file, bufio.NewReader(file), startOffset} return s, nil } func (s *blockfileStream) nextBlockBytes() ([]byte, error) { blockBytes, _, err := s.nextBlockBytesAndPlacementInfo() return blockBytes, err } // nextBlockBytesAndPlacementInfo returns bytes for the next block // along with the offset information in the block file. // An error `ErrUnexpectedEndOfBlockfile` is returned if a partial written data is detected // which is possible towards the tail of the file if a crash had taken place during appending of a block func (s *blockfileStream) nextBlockBytesAndPlacementInfo() ([]byte, *blockPlacementInfo, error) { var lenBytes []byte var err error var fileInfo os.FileInfo moreContentAvailable := true if fileInfo, err = s.file.Stat(); err != nil { return nil, nil, errors.Wrapf(err, "error getting block file stat") } if s.currentOffset == fileInfo.Size() { logger.Debugf("Finished reading file number [%d]", s.fileNum) return nil, nil, nil } remainingBytes := fileInfo.Size() - s.currentOffset // Peek 8 or smaller number of bytes (if remaining bytes are less than 8) // Assumption is that a block size would be small enough to be represented in 8 bytes varint peekBytes := 8 if remainingBytes < int64(peekBytes) { peekBytes = int(remainingBytes) moreContentAvailable = false } logger.Debugf("Remaining bytes=[%d], Going to peek [%d] bytes", remainingBytes, peekBytes) if lenBytes, err = s.reader.Peek(peekBytes); err != nil { return nil, nil, errors.Wrapf(err, "error peeking [%d] bytes from block file", peekBytes) } length, n := proto.DecodeVarint(lenBytes) if n == 0 { // proto.DecodeVarint did not consume any byte at all which means that the bytes // representing the size of the block are partial bytes if !moreContentAvailable { return nil, nil, ErrUnexpectedEndOfBlockfile } panic(errors.Errorf("Error in decoding varint bytes [%#v]", lenBytes)) } bytesExpected := int64(n) + int64(length) if bytesExpected > remainingBytes { logger.Debugf("At least [%d] bytes expected. Remaining bytes = [%d]. Returning with error [%s]", bytesExpected, remainingBytes, ErrUnexpectedEndOfBlockfile) return nil, nil, ErrUnexpectedEndOfBlockfile } // skip the bytes representing the block size if _, err = s.reader.Discard(n); err != nil { return nil, nil, errors.Wrapf(err, "error discarding [%d] bytes", n) } blockBytes := make([]byte, length) if _, err = io.ReadAtLeast(s.reader, blockBytes, int(length)); err != nil { logger.Errorf("Error reading [%d] bytes from file number [%d], error: %s", length, s.fileNum, err) return nil, nil, errors.Wrapf(err, "error reading [%d] bytes from file number [%d]", length, s.fileNum) } blockPlacementInfo := &blockPlacementInfo{ fileNum: s.fileNum, blockStartOffset: s.currentOffset, blockBytesOffset: s.currentOffset + int64(n), } s.currentOffset += int64(n) + int64(length) logger.Debugf("Returning blockbytes - length=[%d], placementInfo={%s}", len(blockBytes), blockPlacementInfo) return blockBytes, blockPlacementInfo, nil } func (s *blockfileStream) close() error { return errors.WithStack(s.file.Close()) } // ///////////////////////////////// // blockStream functions // ////////////////////////////////// func newBlockStream(rootDir string, startFileNum int, startOffset int64, endFileNum int) (*blockStream, error) { startFileStream, err := newBlockfileStream(rootDir, startFileNum, startOffset) if err != nil { return nil, err } return &blockStream{rootDir, startFileNum, endFileNum, startFileStream}, nil } func (s *blockStream) moveToNextBlockfileStream() error { var err error if err = s.currentFileStream.close(); err != nil { return err } s.currentFileNum++ if s.currentFileStream, err = newBlockfileStream(s.rootDir, s.currentFileNum, 0); err != nil { return err } return nil } func (s *blockStream) nextBlockBytes() ([]byte, error) { blockBytes, _, err := s.nextBlockBytesAndPlacementInfo() return blockBytes, err } func (s *blockStream) nextBlockBytesAndPlacementInfo() ([]byte, *blockPlacementInfo, error) { var blockBytes []byte var blockPlacementInfo *blockPlacementInfo var err error if blockBytes, blockPlacementInfo, err = s.currentFileStream.nextBlockBytesAndPlacementInfo(); err != nil { logger.Errorf("Error reading next block bytes from file number [%d]: %s", s.currentFileNum, err) return nil, nil, err } logger.Debugf("blockbytes [%d] read from file [%d]", len(blockBytes), s.currentFileNum) if blockBytes == nil && (s.currentFileNum < s.endFileNum || s.endFileNum < 0) { logger.Debugf("current file [%d] exhausted. Moving to next file", s.currentFileNum) if err = s.moveToNextBlockfileStream(); err != nil { return nil, nil, err } return s.nextBlockBytesAndPlacementInfo() } return blockBytes, blockPlacementInfo, nil } func (s *blockStream) close() error { return s.currentFileStream.close() } func (i *blockPlacementInfo) String() string { return fmt.Sprintf("fileNum=[%d], startOffset=[%d], bytesOffset=[%d]", i.fileNum, i.blockStartOffset, i.blockBytesOffset) }