YggLogBasicImpl:
CEDAR MONITOR
IMPORTS
PBasics, Process, RuntimeError, YggFile, YggInline, YggBuffMan, YggLog, YggLogBasic, YggLogInline
EXPORTS
YggDID, YggLogBasic, YggLogBasicInternal =
BEGIN
DID: PUBLIC TYPE ~ REF DIDRep;
DIDRep: PUBLIC TYPE ~ YggDIDPrivate.DIDRep;
PageCount: TYPE = YggEnvironment.PageCount;
PageNumber: TYPE = YggEnvironment.PageNumber; -- in system pages, not log pages
PageRun: TYPE = YggEnvironment.PageRun;
WordNumber: TYPE = YggLogBasic.WordNumber;
WordCount: TYPE = YggLogBasic.WordCount;
RecordID: TYPE = YggLog.RecordID;
CallerProgrammingError: ERROR = CODE;
InternalProgrammingError: ERROR = CODE;
WordsINWORDS: CARD = WORDS[CARD32];
Header:
PROC [p:
LONG
POINTER]
RETURNS [
LONG
POINTER
TO YggLogRep.Header] =
INLINE {
RETURN [LOOPHOLE[p]] };
HeaderNotInline:
PROC [p:
LONG
POINTER]
RETURNS [
LONG
POINTER
TO YggLogRep.Header] = {
RETURN [LOOPHOLE[p]] };
YggLog tail.
This data structure keeps status information about log chunks near the log tail.
Specifically, any chunk that may be dirty but which has not yet been forced out
must be present in the table. The table is managed as a queue which always has
at least one element in it. When any chunk completes a forceout, its slot in the
table is marked free. A slot is reused if it is free and corresponds to the earliest
log page in the table.
Types
ChunkTableSize: CARDINAL = 4;
ChunkTableIndex: TYPE = [0..ChunkTableSize);
ChunkTableRec:
TYPE =
RECORD [
chunk: YggBuffMan.VMPageSet ← YggBuffMan.nullVMPageSet,
Contains pageRun (segmentPage, pages, etc.) and the LONG POINTER (buffer) to the mapped pages.
recordID: YggLog.RecordID ← YggLog.nullRecordID,
ID of first word of first page of chunk.
forceInProgress:
BOOL ←
FALSE
Set TRUE before forking process to force final update to the chunk; set FALSE when
this process finishes. So if chunk is not the tail and NOT forceInProgress, it is eligible
for re-use.
];
NextChunkTableIndex:
PROC [i: ChunkTableIndex]
RETURNS [ChunkTableIndex] =
INLINE {
RETURN [IF i = LAST[ChunkTableIndex] THEN FIRST[ChunkTableIndex] ELSE SUCC[i]] };
State
Variables below are only updated by AdvanceChunk, except that the TRUE -> FALSE
transition of forceInProgress for a chunk is made by ForceChunk
(forked by AdvanceChunk), and oldestFilePage is updated by Release.
chunkTable: ARRAY ChunkTableIndex OF ChunkTableRec;
tail: ChunkTableIndex;
completion:
CONDITION;
Wait here for a forceInProgress = FALSE in chunkTable.
curVersion: YggLogRep.PageVersion;
Version bit for pages in tail chunk
nextTailChunkID: YggLog.RecordID;
RecordID of first word of first page of NEXT tail chunk. Used to bounds-check RecordIDs
presented to Force and Get. Redundant; can be computed from chunkTable[tail] .
oldestFilePage: PageNumber;
The least-recently written page of the log file that cannot be overwritten.
enforceBufferPages: BOOL ← TRUE;
enforceLogOverwrite: BOOL ← TRUE;
nBufferPages:
INT = 20;
If enforceBufferPages, then do not allow writing within nBufferPages pages
of oldestFilePage
Operations on the log tail.
AdvanceChunk:
PUBLIC
ENTRY
PROC []
RETURNS [
version: YggLogRep.PageVersion, pagesInChunk: INT, firstPagePtr: LONG POINTER] = {
YggLogBasicInternal.AdvanceChunk
Caller must hold the log tail monitor; hence only one caller at a time.
newTail: ChunkTableIndex;
nextPage: PageNumber;
runForNextPage: PageRun;
biasedOldestFilePage: PageNumber;
Fork a process to write the current tail chunk.
chunkTable[tail].forceInProgress ← TRUE;
TRUSTED {Process.Detach[FORK ForceChunk[index: tail]]; };
Wait, if necessary, for room in the chunk table to allocate the next tail chunk.
DO
newTail ← NextChunkTableIndex[tail];
IF NOT chunkTable[newTail].forceInProgress THEN EXIT;
WAIT completion;
ENDLOOP;
Now make sure that the write will not overwrite, or even get too close to, log pages that are still needed.
nextPage ← PageFollowing[chunkTable[tail].chunk.first.pageRun];
runForNextPage ← PageRunStartingWith[nextPage];
biasedOldestFilePage ← oldestFilePage - nBufferPages;
IF biasedOldestFilePage < 0
THEN
biasedOldestFilePage ← biasedOldestFilePage + logFileSize;
IF (enforceBufferPages
AND
IsMemberOfPageRun[biasedOldestFilePage, runForNextPage]) OR
(enforceLogOverwrite AND IsMemberOfPageRun[oldestFilePage, runForNextPage])
THEN {
RETURN WITH ERROR YggLog.WriteFailed
};
About to start updating global data structures; from here on it's do or die--no more recoverable errors allowed;
IF chunkTable[newTail].chunk #
NIL
THEN
YggBuffMan.ReleaseVMPageSet[
vMPageSet: chunkTable[newTail].chunk, releaseState: clean, keep: TRUE];
IF nextPage = 0 THEN curVersion ← 1 - curVersion;
tail ← newTail;
chunkTable[tail].recordID ← nextTailChunkID;
chunkTable[tail].chunk ← YggBuffMan.UsePages[
fileHandle: logFileHandle, tid: YggEnvironment.nullTransID, pageRun: runForNextPage];
nextTailChunkID ← YggLogInline.AddC[r: nextTailChunkID,
words: logPagesInFilePages*YggInline.ShortWordsFromPages[chunkTable[tail].chunk.first.pageRun.pages, log2MDwordsPerPage]];
RETURN [
version: curVersion,
pagesInChunk: chunkTable[tail].chunk.first.pageRun.pages*logPagesInFilePages,
firstPagePtr: chunkTable[tail].chunk.first.buffer];
};
ForceChunk:
PROC [index: ChunkTableIndex] = {
On entry, chunkTable[index].forceInProgress.
Force the chunk, and set forceInProgress ← false.
Note: do NOT hold monitor during force out.
BroadcastCompletion:
ENTRY
PROC [] =
INLINE {
chunkTable[index].forceInProgress ← FALSE;
BROADCAST completion;
};
YggBuffMan.ForceOutVMPageSet[chunkTable[index].chunk];
BroadcastCompletion[];
};
ForceTo:
PUBLIC
PROC [followingRecord: RecordID] = {
YggLogBasicInternal.ForceTo
If the word before followingRecord is in the current tail chunk, force that chunk
(using this process). Then wait for other chunks that precede followingRecord to
go out.
TailForceRequired:
ENTRY
PROC []
RETURNS [doForce: BOOL, chunk: YggBuffMan.VMPageSet] = TRUSTED {
followingRecord must not be later than page about to be written.
IF ConstArith.Compare[followingRecord, nextTailChunkID] = greater
THEN
RETURN WITH ERROR CallerProgrammingError;
IF ConstArith.Compare[chunkTable[tail].recordID, followingRecord] = less
THEN {
YggBuffMan.ShareVMPageSet[chunkTable[tail].chunk];
RETURN [TRUE, chunkTable[tail].chunk] }
ELSE RETURN [FALSE, YggBuffMan.nullVMPageSet]
};
AwaitForce:
ENTRY
PROC [] = {
Return when all chunks that precede followingRecord are out.
FOR currentChunk: ChunkTableIndex
IN ChunkTableIndex
DO
TRUSTED {
IF ConstArith.Compare[chunkTable[currentChunk].recordID, followingRecord] = less
THEN UNTIL
NOT chunkTable[currentChunk].forceInProgress
DO
WAIT completion;
ENDLOOP;
};
ENDLOOP;
};
doForce: BOOL; chunk: YggBuffMan.VMPageSet;
[doForce, chunk] ← TailForceRequired[];
IF doForce
THEN {
YggBuffMan.ForceOutVMPageSet[vMPageSet: chunk];
YggBuffMan.ReleaseVMPageSet[vMPageSet: chunk, releaseState: clean, keep: TRUE] };
AwaitForce[];
};
OpenBasicForPut:
PUBLIC
ENTRY
PROC [
nextPage: PageNumber, version: YggLogRep.PageVersion, nextRecord: RecordID]
RETURNS [pagesInChunk: INT, firstPagePtr: LONG POINTER] = {
YggLogBasicInternal.OpenCoreForPut
wordsToAdd: CARDINAL;
enforceBufferPages ← FALSE;
enforceLogOverwrite ← FALSE;
curVersion ← version;
tail ← FIRST[ChunkTableIndex];
chunkTable[tail].recordID ← nextRecord;
chunkTable[tail].chunk ← YggBuffMan.UsePages[ fileHandle: logFileHandle, tid: YggEnvironment.nullTransID, pageRun: PageRunStartingWith[nextPage]];
wordsToAdd ← logPagesInFilePages * YggInline.ShortWordsFromPages[chunkTable[tail].chunk.first.pageRun.pages, log2MDwordsPerPage];
nextTailChunkID ← YggLogInline.AddC[r: chunkTable[tail].recordID,
words: wordsToAdd];
RETURN [
pagesInChunk: chunkTable[tail].chunk.first.pageRun.pages*logPagesInFilePages,
firstPagePtr: chunkTable[tail].chunk.first.buffer];
};
AssertNormalOperation:
PUBLIC
ENTRY
PROC [] = {
enforceBufferPages ← TRUE;
enforceLogOverwrite ← TRUE;
};
CloseBasicForPut:
PUBLIC
ENTRY
PROC [] = {
YggLogBasicInternal.CloseCoreForPut
FOR i: ChunkTableIndex
IN ChunkTableIndex
DO
WHILE chunkTable[i].forceInProgress
DO
WAIT completion;
ENDLOOP;
YggBuffMan.ForceOutVMPageSet[chunkTable[i].chunk];
YggBuffMan.ReleaseVMPageSet[
vMPageSet: chunkTable[i].chunk, releaseState: clean, keep: FALSE];
chunkTable[i] ← [];
ENDLOOP;
};
XAddC:
PROC [r: RecordID, words:
CARDINAL]
RETURNS [RecordID] =
TRUSTED {
Returns r+c.
RETURN [YggLogInline.AddLC[r, LONG[words]]];
};
XShortWordsFromPages:
PROC [pages:
CARDINAL, logWordsPerPage:
INT]
RETURNS [words:
CARDINAL] = {
RETURN [Basics.BITSHIFT[value: pages, count: logWordsPerPage]] };
XSubNumberLessCount:
PROC [num: WordNumber, words: WordCount]
RETURNS [WordNumber] =
TRUSTED {
Returns r+c.
RETURN ConstArith.Sub[num, words];
};
WordNumberFromRecordID:
PUBLIC
ENTRY
PROC [thisRecord: RecordID]
RETURNS [result: WordNumber] = {
YggLogBasic.WordNumberFromRecordID
offsetFromNextTailChunk: WordCount;
TRUSTED {IF ConstArith.Compare[thisRecord, nextTailChunkID] = greater THEN GOTO tooLarge; };
offsetFromNextTailChunk ← YggLogInline.WordsFromSubtract[
larger: nextTailChunkID, smaller: thisRecord ! RuntimeError.BoundsFault => GOTO tooSmall];
IF YggLogBasic.Compare[offsetFromNextTailChunk, logFileLength] # less THEN GOTO tooSmall;
result ← YggLogInline.SubNumberLessCount[
num: YggInline.WordsFromPages[
pages: PageFollowing[chunkTable[tail].chunk.first.pageRun], wordsPerPage: logPagesInFilePages*YggLog.wordsPerPage],
words: offsetFromNextTailChunk];
RETURN [IF result.sign = negative THEN YggLogInline.AddNumberAndCount[result, logFileLength] ELSE result];
EXITS
tooLarge => ERROR CallerProgrammingError;
tooSmall => ERROR CallerProgrammingError;
};
VerifyGet:
ENTRY
PROC [page: PageCount, pagePtr:
LONG
POINTER] = {
Called from Get if it encounters a page marked invalid.
TRUSTED {IF Header[pagePtr].valid THEN RETURN; };
IF IsMemberOfPageRun[page, [chunkTable[tail].chunk.first.pageRun.firstPage, chunkTable[tail].chunk.first.pageRun.pages]] THEN RETURN;
ERROR;
};
Release:
PUBLIC
PROC [beforeRecord: RecordID] = {
YggLogBasic.Release
AssignToOldestFilePage:
ENTRY
PROC [newOldestFilePage: PageNumber] =
INLINE {
oldestFilePage ← newOldestFilePage };
AssignToOldestFilePage[YggInline.PagesFromWords[
WordNumberFromRecordID[beforeRecord], YggLog.wordsPerPage]];
};
Readonly page stream the log
A readonly page stream on the log.
This stream has a limited capability for backup: it can save one page position that
MUST be returned to later.
End of log is determined at the page level.
nChunksReadAhead: CARDINAL = 4;
PageStreamObject:
TYPE =
RECORD [
chunk: YggBuffMan.VMPageSet ← YggBuffMan.nullVMPageSet,
savePageRun: PageRun,
pageInPageRun, savePageInPageRun: CARDINAL,
expectedVersion, saveExpectedVersion: YggLogRep.PageVersion,
pagePtr: LONG POINTER,
posSaved: BOOL,
nextReadAheadPage: PageNumber ];
pageStream: PageStreamObject;
OpenPageStream:
PROC [p: PageNumber]
RETURNS [endOfLog:
BOOL] = {
The initial stream position is NOT saved.
pageStream.chunk ←
YggBuffMan.ReadPages[ fileHandle: logFileHandle, tid: YggEnvironment.nullTransID, pageRun: PageRunStartingWith[p]];
pageStream.pagePtr ← pageStream.chunk.first.buffer;
pageStream.pageInPageRun ← 0;
pageStream.nextReadAheadPage ← PageFollowing[pageStream.chunk.first.pageRun];
THROUGH [1..nChunksReadAhead] DO ReadAhead[] ENDLOOP;
TRUSTED {
IF NOT Header[pageStream.pagePtr].valid THEN RETURN [TRUE];
pageStream.expectedVersion ← Header[pageStream.pagePtr].version;
};
pageStream.posSaved ← FALSE;
RETURN [FALSE];
};
ClosePageStream:
PROC [] = {
YggBuffMan.ReleaseVMPageSet[
vMPageSet: pageStream.chunk, releaseState: clean, keep: FALSE];
pageStream.chunk ← YggBuffMan.nullVMPageSet;
};
ReadAhead:
PROC [] = {
YggBuffMan.ReadAheadPages[
logFileHandle, PageRunStartingWith[pageStream.nextReadAheadPage]];
pageStream.nextReadAheadPage ←
IF pageStream.nextReadAheadPage < logFileSize - logChunkSize
THEN
pageStream.nextReadAheadPage + logChunkSize
ELSE 0;
};
SavePageStreamPos:
PROC [] = {
pageStream.savePageRun ← [pageStream.chunk.first.pageRun.firstPage, pageStream.chunk.first.pageRun.pages];
pageStream.savePageInPageRun ← pageStream.pageInPageRun;
pageStream.saveExpectedVersion ← pageStream.expectedVersion;
pageStream.posSaved ← TRUE;
};
CurrentPageNumber:
PROC []
RETURNS [PageNumber] =
INLINE {
RETURN [IF pageStream.chunk = NIL THEN 0 ELSE pageStream.chunk.first.pageRun.firstPage + pageStream.pageInPageRun];
};
ForgetSavedPageStreamPos:
PROC [] = {
It is only valid to forget when positioned at the saved position.
IF NOT pageStream.posSaved THEN RETURN;
IF pageStream.savePageRun # [pageStream.chunk.first.pageRun.firstPage, pageStream.chunk.first.pageRun.pages]
THEN
ERROR InternalProgrammingError;
pageStream.posSaved ← FALSE;
};
AdvancePageStream:
PROC []
RETURNS [endOfLog:
BOOL] = {
pageStream.pageInPageRun ← pageStream.pageInPageRun+1;
IF pageStream.pageInPageRun = pageStream.chunk.first.pageRun.pages
THEN {
p: PageNumber;
YggBuffMan.ReleaseVMPageSet[
vMPageSet: pageStream.chunk, releaseState: clean, keep: pageStream.posSaved];
p ← PageFollowing[pageStream.chunk.first.pageRun];
IF p = 0 THEN pageStream.expectedVersion ← 1 - pageStream.expectedVersion;
pageStream.chunk ←
YggBuffMan.ReadPages[fileHandle: logFileHandle, tid: YggEnvironment.nullTransID, pageRun: PageRunStartingWith[p]];
pageStream.pagePtr ← pageStream.chunk.first.buffer;
pageStream.pageInPageRun ← 0;
IF NOT pageStream.posSaved THEN ReadAhead[];
}
ELSE
pageStream.pagePtr ← pageStream.pagePtr + YggLog.wordsPerPage;
TRUSTED {RETURN[ (
NOT Header[pageStream.pagePtr].valid)
OR
(pageStream.expectedVersion # Header[pageStream.pagePtr].version) ]; };
};
SetPageStreamPosFromSavedPos:
PROC [] = {
IF NOT pageStream.posSaved THEN ERROR InternalProgrammingError;
IF PageRun[pageStream.chunk.first.pageRun.firstPage, pageStream.chunk.first.pageRun.pages] # pageStream.savePageRun
THEN {
YggBuffMan.ReleaseVMPageSet[vMPageSet: pageStream.chunk, releaseState: clean, keep: TRUE];
pageStream.chunk ←
YggBuffMan.ReadPages[fileHandle: logFileHandle, tid: YggEnvironment.nullTransID, pageRun: pageStream.savePageRun];
pageStream.expectedVersion ← pageStream.saveExpectedVersion;
};
pageStream.pageInPageRun ← pageStream.savePageInPageRun;
pageStream.pagePtr ← pageStream.chunk.first.buffer +
YggInline.ShortWordsFromPages[pageStream.pageInPageRun, log2MDwordsPerPage];
};
A readonly log record stream.
YggLog reading during recovery is significantly different than log reading for carrying out intentions. In the case of recovery, we expect to encounter the log end while reading; in carrying out intentions we expect all log records to be perfectly formed. Also, log reading for recovery is sequential, while log reading for intentions is more random. In the case of recovery we need the ability to first "peek" and then read; for intentions, reading is enough.
RecordStreamObject:
TYPE =
RECORD [
id: RecordID
of current log record, whose first word is on the current page stream page.
];
recordStream: RecordStreamObject;
OpenRecordStreamFromWord:
PUBLIC
PROC [firstWord: WordNumber]
RETURNS [notStartOfRecord: BOOL, currentRecord: RecordID] = {
notStartOfRecord ← OpenRecordStream[firstWord: firstWord,
firstRecord: YggLogInline.RecordIDFromWordNumber[firstWord]];
RETURN [notStartOfRecord, recordStream.id];
};
OpenRecordStreamFromCheckpoint:
PUBLIC
PROC [
checkpointWord: WordNumber, checkpointRecord: RecordID, firstRecord: RecordID]
RETURNS [notStartOfRecord: BOOL, currentRecord: RecordID] = {
offsetFromCheckpoint, firstWord: WordCount;
TRUSTED { IF ConstArith.Compare[checkpointRecord, firstRecord] = less
THEN
ERROR CallerProgrammingError; };
offsetFromCheckpoint ← YggLogInline.WordsFromSubtract[
larger: checkpointRecord, smaller: firstRecord ! RuntimeError.BoundsFault => GOTO tooSmall];
IF YggLogBasic.Compare[offsetFromCheckpoint, logFileLength] # less THEN GOTO tooSmall;
firstWord ← YggLogInline.SubNumberLessCount[checkpointWord, offsetFromCheckpoint];
IF firstWord.sign = negative THEN firstWord ← YggLogInline.AddNumberAndCount[firstWord, logFileLength];
notStartOfRecord ← OpenRecordStream[firstWord: firstWord, firstRecord: firstRecord];
RETURN [notStartOfRecord, firstRecord];
EXITS
tooSmall => ERROR CallerProgrammingError;
};
OpenRecordStream:
PROC [firstWord: WordNumber, firstRecord: RecordID]
RETURNS [notStartOfRecord: BOOL] = {
p: PageNumber;
w: CARDINAL;
[page: p, wordInPage: w] ← YggInline.DecomposeWords[firstWord, YggLog.wordsPerPage*logPagesInFilePages];
IF YggLogInline.WordInPageFromRecordID[firstRecord, YggLog.wordsPerPage*logPagesInFilePages] # w
THEN
ERROR CallerProgrammingError;
IF OpenPageStream[p].endOfLog THEN GOTO invalidPage;
SavePageStreamPos[];
recordStream ← [id: firstRecord];
RETURN [VerifyCurrentRecordID[].notStartOfRecord];
EXITS
invalidPage => ERROR InvalidPage;
};
InvalidPage: PUBLIC ERROR = CODE;
CloseRecordStream:
PUBLIC
PROC [] = {
ClosePageStream[];
};
VerifyCurrentRecordID:
PROC []
RETURNS [notStartOfRecord:
BOOL] = {
wordInPage: CARDINAL ← YggLogInline.WordInPageFromRecordID[recordStream.id, YggLog.wordsPerPage*logPagesInFilePages];
TRUSTED {
w: CARDINAL ← (wordInPage/YggLog.wordsPerPage) * YggLog.wordsPerPage;
UNTIL w >= wordInPage
DO
w ← w + Header[pageStream.pagePtr+w].nWords;
ENDLOOP;
IF w > wordInPage THEN RETURN [TRUE];
};
TRUSTED {RETURN [Header[pageStream.pagePtr+wordInPage].isContinuation]; };
};
CheckCurrentRecord:
PUBLIC
PROC []
RETURNS [truncated:
BOOL] = TRUSTED {
wordInPage: CARDINAL ← YggLogInline.WordInPageFromRecordID[recordStream.id, YggLog.wordsPerPage*logPagesInFilePages];
IF VerifyCurrentRecordID[].notStartOfRecord THEN ERROR InternalProgrammingError;
IF Header[pageStream.pagePtr+wordInPage].hasContinuation
THEN {
IF (
CARD[Header[pageStream.pagePtr+wordInPage].nWords] + wordInPage)
MOD YggLog.wordsPerPage # 0
THEN
GOTO checkFailed;
DO
wordInPage ← wordInPage + CARD[Header[pageStream.pagePtr+wordInPage].nWords];
IF wordInPage >= CARD[YggLog.wordsPerPage*logPagesInFilePages] THEN {
IF AdvancePageStream[].endOfLog THEN GOTO truncatedRecord;
wordInPage ← 0;
};
IF
NOT Header[pageStream.pagePtr+wordInPage].isContinuation
THEN GOTO truncatedRecord;
IF NOT Header[pageStream.pagePtr+wordInPage].hasContinuation THEN EXIT;
IF CARD[Header[pageStream.pagePtr+wordInPage].nWords] MOD YggLog.wordsPerPage # 0 THEN GOTO checkFailed;
ENDLOOP;
IF
CARD[Header[pageStream.pagePtr+wordInPage].nWords]
NOT
IN [1 .. YggLog.wordsPerPage]
THEN
GOTO checkFailed;
}
ELSE {
IF CARD[Header[pageStream.pagePtr+wordInPage].nWords]
NOT IN [1 .. CARD[YggLog.wordsPerPage*logPagesInFilePages-wordInPage]] THEN GOTO checkFailed;
};
SetPageStreamPosFromSavedPos[];
RETURN [FALSE];
EXITS
checkFailed => { SetPageStreamPosFromSavedPos[]; ERROR InternalProgrammingError };
truncatedRecord => { SetPageStreamPosFromSavedPos[]; RETURN [TRUE] };
};
GetCurrentRecord:
PUBLIC
PROC [currentRecord: RecordID, to: YggLog.Block]
RETURNS [status: YggLog.ReadProcStatus, wordsRead: CARDINAL] = {
wordInPage: CARDINAL ← YggLogInline.WordInPageFromRecordID[recordStream.id, YggLog.wordsPerPage*logPagesInFilePages];
wordsRead ← 0;
IF YggLogInline.Compare[currentRecord, recordStream.id] # equal
THEN
ERROR CallerProgrammingError;
DO
Read one log block from this page.
wordsThisBlock, wordsThisCopy: CARDINAL;
hasContinuation, isContinuation: BOOL;
Read the header for the log block.
TRUSTED { [hasContinuation: hasContinuation, isContinuation: isContinuation, nWords: wordsThisBlock] ← Header[pageStream.pagePtr+wordInPage]^; };
IF wordsRead > 0 AND NOT isContinuation THEN GOTO truncatedRecord;
wordsThisBlock ← wordsThisBlock-SIZE[YggLogRep.Header];
wordInPage ← wordInPage+SIZE[YggLogRep.Header];
DO
Copy a run of words to "to".
wordsThisCopy ← MIN[to.length, wordsThisBlock];
IF to.base #
NIL
THEN TRUSTED {
PBasics.Copy[
to: to.base, from: pageStream.pagePtr+wordInPage, nwords: wordsThisCopy/WORDS[CARD32]];
to.base ← to.base+wordsThisCopy;
};
wordsRead ← wordsRead+wordsThisCopy;
Now what?
IF wordsThisCopy # to.length
THEN {
to.length > wordsThisCopy = wordsThisBlock; go read next log block.
to.length ← to.length-wordsThisCopy;
IF NOT hasContinuation THEN GOTO sourceExhausted;
wordInPage ← wordInPage + wordsThisBlock;
IF wordInPage >= CARD[YggLog.wordsPerPage*logPagesInFilePages] THEN {
IF AdvancePageStream[].endOfLog THEN GOTO truncatedRecord;
wordInPage ← 0;
};
GOTO nextBlock;
}
ELSE
IF to.rest #
NIL
THEN TRUSTED {
wordsThisCopy = to.length AND to.rest # NIL, so not done copying.
to ← to.rest^;
wordsThisBlock ← wordsThisBlock-wordsThisCopy;
wordInPage ← wordInPage + wordsThisCopy;
}
ELSE {
wordsThisCopy = to.length AND to.rest = NIL, so destination full; return.
SetPageStreamPosFromSavedPos[];
RETURN [
IF (NOT hasContinuation AND wordsThisBlock = wordsThisCopy) THEN normal
ELSE destinationFull, wordsRead]
}
REPEAT
nextBlock => NULL;
ENDLOOP;
ENDLOOP;
EXITS
sourceExhausted => {
SetPageStreamPosFromSavedPos[];
RETURN [sourceExhausted, wordsRead]
};
truncatedRecord => {
SetPageStreamPosFromSavedPos[];
ERROR CallerProgrammingError;
};
};
GetCurrentPageAndVersion:
PUBLIC
PROC []
RETURNS [PageNumber, YggLogRep.PageVersion] = {
RETURN[CurrentPageNumber[], pageStream.expectedVersion];
};
AdvanceRecordStream:
PUBLIC
PROC []
RETURNS [endOfLog, truncatedRecord: BOOL, currentRecord: RecordID] = TRUSTED {
It is ok to call Advance when recordStream is in a bad state, e.g. not pointing to the start
of a record. It should be pointing to the start of a block, however. Note that
wordInPage = 0 is sufficient.
wordsInRecord: CARDINAL ← 0;
mdWordsInALogPage: CARD ← CARD[YggLog.wordsPerPage*logPagesInFilePages];
endOfLog ← truncatedRecord ← FALSE;
{ -- block for EXITS
wordInPage: CARDINAL ← YggLogInline.WordInPageFromRecordID[recordStream.id, mdWordsInALogPage];
ForgetSavedPageStreamPos[];
DO
wordsInRecord ← wordsInRecord + Header[pageStream.pagePtr+wordInPage].nWords;
wordInPage ← wordInPage + Header[pageStream.pagePtr+wordInPage].nWords;
IF wordInPage >=
CARD[YggLog.wordsPerPage*logPagesInFilePages]
THEN {
IF AdvancePageStream[].endOfLog
THEN {
endOfLog ← truncatedRecord ← TRUE; GOTO return };
wordInPage ← 0;
};
IF NOT Header[pageStream.pagePtr+wordInPage].isContinuation THEN EXIT;
ENDLOOP;
IF wordInPage < mdWordsInALogPage
AND Header[pageStream.pagePtr+wordInPage].nWords = 0
THEN {
wordsToSkip: CARDINAL;
wordsToSkip ← YggLog.wordsPerPage - (wordInPage MOD (YggLog.wordsPerPage)) ;
wordsInRecord ← wordsInRecord + wordsToSkip;
wordInPage ← wordInPage + wordsToSkip;
};
IF wordInPage = mdWordsInALogPage
AND AdvancePageStream[].endOfLog
THEN {
endOfLog ← TRUE;
};
GOTO return
EXITS return => {
SavePageStreamPos[];
recordStream.id ← YggLogInline.AddC[recordStream.id, wordsInRecord];
RETURN[endOfLog, truncatedRecord, recordStream.id] };
}};