-- YggLogBasicImpl
-- Basic log implementation: a monitor that owns the log file, the table of
-- chunks being written at the tail of the log, a page stream and a record
-- stream used to read the log sequentially, and a random-access Get.

DIRECTORY
  PBasics,
  Process,
  RuntimeError,
  YggDID,
  YggDIDPrivate,
  YggDIDMap,
  YggEnvironment,
  YggFile,
  YggInternal,
  YggInline,
  YggBuffMan,
  YggLog,
  YggLogBasic,
  YggLogInline,
  YggLogBasicInternal,
  YggLogRep,
  YggSunOS;

YggLogBasicImpl: CEDAR MONITOR
  IMPORTS
    PBasics, Process, RuntimeError, YggFile, YggInline, YggBuffMan, YggLog, YggLogBasic, YggLogInline
  EXPORTS
    YggDID, YggLogBasic, YggLogBasicInternal
  = BEGIN

  DID: PUBLIC TYPE ~ REF DIDRep;
  DIDRep: PUBLIC TYPE ~ YggDIDPrivate.DIDRep;

  PageCount: TYPE = YggEnvironment.PageCount;
  PageNumber: TYPE = YggEnvironment.PageNumber; -- in system pages, not log pages
  PageRun: TYPE = YggEnvironment.PageRun;
  WordNumber: TYPE = YggLogBasic.WordNumber;
  WordCount: TYPE = YggLogBasic.WordCount;
  RecordID: TYPE = YggLog.RecordID;

  CallerProgrammingError: ERROR = CODE;
  InternalProgrammingError: ERROR = CODE;

  WordsINWORDS: CARD = WORDS[CARD32];

  -- View an untyped pointer as a pointer to a log block header.
  Header: PROC [p: LONG POINTER] RETURNS [LONG POINTER TO YggLogRep.Header] = INLINE {
    RETURN [LOOPHOLE[p]]
    };

  -- Same as Header, but not INLINE.
  HeaderNotInline: PROC [p: LONG POINTER] RETURNS [LONG POINTER TO YggLogRep.Header] = {
    RETURN [LOOPHOLE[p]]
    };

  -- Log file state; logFileHandle, log2MDwordsPerPage, logFileSize and
  -- logFileLength are assigned by EstablishLogFile.
  logFileHandle: YggInternal.FileHandle;
  logFileSize: PageCount;
  logFileLength: WordCount;
  logChunkSize: INT = 1;
  logPagesInFilePages: INT _ YggFile.bytesPerPage/YggSunOS.LogBytesPerPage;
  log2MDwordsPerPage: INT; -- the log base 2 number of machine dependant words per page (Log2[YggLog.wordsPerPage])

  -- Page run arithmetic on the (circular) log file.

  -- Returns the run of at most logChunkSize pages that starts at page p and
  -- does not extend past logFileSize/logPagesInFilePages.
  PageRunStartingWith: PROC [p: PageNumber] RETURNS [pr: PageRun] = INLINE {
    RETURN [[firstPage: p, count: MIN[logFileSize/logPagesInFilePages-p, logChunkSize]]]
    };

  -- Returns the page that follows run pr, wrapping to page 0 when the run
  -- ends exactly at the end of the log file.
  PageFollowing: PROC [pr: YggDIDMap.Run] RETURNS [p: PageNumber] = {
    p _ pr.firstPage+pr.pages;
    RETURN [IF p*logPagesInFilePages = logFileSize THEN 0 ELSE p]
    };

  -- TRUE iff p lies in the half-open interval covered by pr.
  IsMemberOfPageRun: PROC [p: PageNumber, pr: PageRun] RETURNS [BOOL] = INLINE {
    RETURN [p IN [pr.firstPage .. pr.firstPage+pr.count)]
    };

  -- Log file setup and size queries.

  EstablishLogFile: PUBLIC PROC [] = {
    -- Opens the log file if it is not already open, then derives
    -- log2MDwordsPerPage, logFileSize and logFileLength.
    twpp: INT;
    IF logFileHandle = NIL THEN {
      logFileHandle _ YggFile.Open[
        runList: LIST[[YggSunOS.LogSegmentID, 0, 0, YggSunOS.LogFileSize, FALSE]],
        byteSize: YggSunOS.LogFileSize*YggSunOS.LogBytesPerPage,
        did: NEW[YggDIDPrivate.DIDRep _ [YggDIDPrivate.HighDIDSystemReserved, 11]],
        fileUse: $systemLog,
        verifyLeaderNow: FALSE];
      -- Starts at 1 and adds one for every halving of YggLog.wordsPerPage down to 1.
      log2MDwordsPerPage _ 1;
      FOR twpp _ YggLog.wordsPerPage, twpp/2 UNTIL twpp = 1 DO
        log2MDwordsPerPage _ log2MDwordsPerPage + 1;
        ENDLOOP;
      logFileSize _ YggSunOS.LogFileSize;
      logFileLength _ YggInline.WordsFromPages[logFileSize, YggLog.wordsPerPage];
      };
    };

  LogFileSize: PUBLIC PROC [] RETURNS [PageCount] = {
    RETURN[logFileSize];
    };

  LogUsage: PUBLIC ENTRY PROC [] RETURNS [PageCount] ~ {
    -- Distance from oldestFilePage to the page following the tail chunk,
    -- plus nBufferPages; logFileSize is added when the difference is negative.
    nPages: INT;
    RETURN[
      IF (nPages _ (PageFollowing[chunkTable[tail].chunk.first.pageRun] - oldestFilePage)) >= 0
        THEN nPages + nBufferPages
        ELSE nPages + nBufferPages + logFileSize];
    };

  -- Chunk table: a small ring of chunks at the tail of the log. The entry at
  -- index tail is the chunk currently being filled; other entries may have a
  -- force (write) in progress.

  ChunkTableSize: CARDINAL = 4;
  ChunkTableIndex: TYPE = [0..ChunkTableSize);
  ChunkTableRec: TYPE = RECORD [
    chunk: YggBuffMan.VMPageSet _ YggBuffMan.nullVMPageSet,
      -- buffer pages obtained from YggBuffMan.UsePages for this chunk.
    recordID: YggLog.RecordID _ YggLog.nullRecordID,
      -- record ID assigned to the chunk when it became the tail.
    forceInProgress: BOOL _ FALSE
      -- set by AdvanceChunk before forking ForceChunk; cleared by ForceChunk,
      -- which then broadcasts completion.
    ];

  -- Successor of i in the ring, wrapping from the last index to the first.
  NextChunkTableIndex: PROC [i: ChunkTableIndex] RETURNS [ChunkTableIndex] = INLINE {
    RETURN [IF i = LAST[ChunkTableIndex] THEN FIRST[ChunkTableIndex] ELSE SUCC[i]]
    };

  -- Monitored state. Chunk writes are performed by ForceChunk
  -- (forked by AdvanceChunk), and oldestFilePage is updated by Release.
  chunkTable: ARRAY ChunkTableIndex OF ChunkTableRec;
  tail: ChunkTableIndex;
  completion: CONDITION;
    -- broadcast by ForceChunk each time a force completes.
  curVersion: YggLogRep.PageVersion;
    -- version returned to writers; flipped (1 - curVersion) by AdvanceChunk
    -- when the next page is 0.
  nextTailChunkID: YggLog.RecordID;
    -- record ID that the chunk following the current tail chunk will receive.
  oldestFilePage: PageNumber;
    -- assigned by Release; AdvanceChunk refuses to allocate a run containing it.
  enforceBufferPages: BOOL _ TRUE;
  enforceLogOverwrite: BOOL _ TRUE;
  nBufferPages: INT = 20;
    -- AdvanceChunk also refuses a run containing oldestFilePage - nBufferPages
    -- while enforceBufferPages is TRUE.

  AdvanceChunk: PUBLIC ENTRY PROC [] RETURNS [
    version: YggLogRep.PageVersion, pagesInChunk: INT, firstPagePtr: LONG POINTER] = {
    -- Forks a write of the current tail chunk, allocates the next chunk and
    -- makes it the tail. Returns the page version, the number of log pages in
    -- the new chunk, and a pointer to its first page.
    -- Raises YggLog.WriteFailed (monitor released) if the new chunk would
    -- cover oldestFilePage or the page nBufferPages before it.
    newTail: ChunkTableIndex;
    nextPage: PageNumber;
    runForNextPage: PageRun;
    biasedOldestFilePage: PageNumber;
    -- Fork a process to write the current tail chunk.
    chunkTable[tail].forceInProgress _ TRUE;
    TRUSTED {Process.Detach[FORK ForceChunk[index: tail]]; };
    -- Wait, if necessary, for room in the chunk table to allocate the next tail chunk.
    DO
      newTail _ NextChunkTableIndex[tail];
      IF NOT chunkTable[newTail].forceInProgress THEN EXIT;
      WAIT completion;
      ENDLOOP;
    -- Now make sure that the write will not overwrite, or even get too close to,
    -- log pages that are still needed.
    nextPage _ PageFollowing[chunkTable[tail].chunk.first.pageRun];
    runForNextPage _ PageRunStartingWith[nextPage];
    biasedOldestFilePage _ oldestFilePage - nBufferPages;
    IF biasedOldestFilePage < 0 THEN
      biasedOldestFilePage _ biasedOldestFilePage + logFileSize;
    IF (enforceBufferPages AND IsMemberOfPageRun[biasedOldestFilePage, runForNextPage])
      OR (enforceLogOverwrite AND IsMemberOfPageRun[oldestFilePage, runForNextPage]) THEN {
      RETURN WITH ERROR YggLog.WriteFailed
      };
    -- About to start updating global data structures; from here on it's do or
    -- die: no more recoverable errors allowed.
    IF chunkTable[newTail].chunk # NIL THEN
      YggBuffMan.ReleaseVMPageSet[
        vMPageSet: chunkTable[newTail].chunk, releaseState: clean, keep: TRUE];
    IF nextPage = 0 THEN curVersion _ 1 - curVersion;
    tail _ newTail;
    chunkTable[tail].recordID _ nextTailChunkID;
    chunkTable[tail].chunk _ YggBuffMan.UsePages[
      fileHandle: logFileHandle, tid: YggEnvironment.nullTransID, pageRun: runForNextPage];
    nextTailChunkID _ YggLogInline.AddC[
      r: nextTailChunkID,
      words: logPagesInFilePages*YggInline.ShortWordsFromPages[
        chunkTable[tail].chunk.first.pageRun.pages, log2MDwordsPerPage]];
    RETURN [
      version: curVersion,
      pagesInChunk: chunkTable[tail].chunk.first.pageRun.pages*logPagesInFilePages,
      firstPagePtr: chunkTable[tail].chunk.first.buffer];
    };

  ForceChunk: PROC [index: ChunkTableIndex] = {
    -- On entry, chunkTable[index].forceInProgress.
    -- Force the chunk, and set forceInProgress _ false.
    -- Note: do NOT hold monitor during force out.
    BroadcastCompletion: ENTRY PROC [] = INLINE {
      chunkTable[index].forceInProgress _ FALSE;
      BROADCAST completion;
      };
    YggBuffMan.ForceOutVMPageSet[chunkTable[index].chunk];
    BroadcastCompletion[];
    };

  ForceTo: PUBLIC PROC [followingRecord: RecordID] = {
    -- If the tail chunk starts before followingRecord, force the tail chunk
    -- (using this process). Then wait for other chunks that precede followingRecord to
    -- finish any force that is in progress.
    -- Raises CallerProgrammingError if followingRecord is greater than nextTailChunkID.
    TailForceRequired: ENTRY PROC [] RETURNS [doForce: BOOL, chunk: YggBuffMan.VMPageSet] = TRUSTED {
      -- Decides under the monitor whether the tail chunk must be forced; if so,
      -- shares the page set so it can be forced after the monitor is released.
      -- RecordID comparisons use YggLogInline.Compare, which this module imports.
      IF YggLogInline.Compare[followingRecord, nextTailChunkID] = greater THEN
        RETURN WITH ERROR CallerProgrammingError;
      IF YggLogInline.Compare[chunkTable[tail].recordID, followingRecord] = less THEN {
        YggBuffMan.ShareVMPageSet[chunkTable[tail].chunk];
        RETURN [TRUE, chunkTable[tail].chunk]
        }
      ELSE RETURN [FALSE, YggBuffMan.nullVMPageSet]
      };
    AwaitForce: ENTRY PROC [] = {
      -- For every chunk whose recordID is less than followingRecord, wait until
      -- no force is in progress on it.
      FOR currentChunk: ChunkTableIndex IN ChunkTableIndex DO
        TRUSTED {
          IF YggLogInline.Compare[chunkTable[currentChunk].recordID, followingRecord] = less THEN
            UNTIL NOT chunkTable[currentChunk].forceInProgress DO
              WAIT completion;
              ENDLOOP;
          };
        ENDLOOP;
      };
    doForce: BOOL;
    chunk: YggBuffMan.VMPageSet;
    [doForce, chunk] _ TailForceRequired[];
    IF doForce THEN {
      YggBuffMan.ForceOutVMPageSet[vMPageSet: chunk];
      YggBuffMan.ReleaseVMPageSet[vMPageSet: chunk, releaseState: clean, keep: TRUE]
      };
    AwaitForce[];
    };

  OpenBasicForPut: PUBLIC ENTRY PROC [
    nextPage: PageNumber, version: YggLogRep.PageVersion, nextRecord: RecordID]
    RETURNS [pagesInChunk: INT, firstPagePtr: LONG POINTER] = {
    -- Starts writing at nextPage: turns both enforcement flags off, installs the
    -- first chunk-table entry as the tail with recordID nextRecord, and returns
    -- the number of log pages in that chunk and a pointer to its first page.
    wordsToAdd: CARDINAL;
    enforceBufferPages _ FALSE;
    enforceLogOverwrite _ FALSE;
    curVersion _ version;
    tail _ FIRST[ChunkTableIndex];
    chunkTable[tail].recordID _ nextRecord;
    chunkTable[tail].chunk _ YggBuffMan.UsePages[
      fileHandle: logFileHandle, tid: YggEnvironment.nullTransID,
      pageRun: PageRunStartingWith[nextPage]];
    wordsToAdd _ logPagesInFilePages * YggInline.ShortWordsFromPages[
      chunkTable[tail].chunk.first.pageRun.pages, log2MDwordsPerPage];
    nextTailChunkID _ YggLogInline.AddC[r: chunkTable[tail].recordID, words: wordsToAdd];
    RETURN [
      pagesInChunk: chunkTable[tail].chunk.first.pageRun.pages*logPagesInFilePages,
      firstPagePtr: chunkTable[tail].chunk.first.buffer];
    };

  AssertNormalOperation: PUBLIC ENTRY PROC [] = {
    -- Turns both enforcement flags (cleared by OpenBasicForPut) back on.
    enforceBufferPages _ TRUE;
    enforceLogOverwrite _ TRUE;
    };

  CloseBasicForPut: PUBLIC ENTRY PROC [] = {
    -- For each chunk-table entry: wait for any force in progress, force out and
    -- release its chunk, then reset the entry to its default value.
    FOR i: ChunkTableIndex IN ChunkTableIndex DO
      WHILE chunkTable[i].forceInProgress DO WAIT completion; ENDLOOP;
      -- Entries that never held a chunk still contain nullVMPageSet; skip the
      -- buffer manager calls for them, as AdvanceChunk does before releasing.
      IF chunkTable[i].chunk # NIL THEN {
        YggBuffMan.ForceOutVMPageSet[chunkTable[i].chunk];
        YggBuffMan.ReleaseVMPageSet[
          vMPageSet: chunkTable[i].chunk, releaseState: clean, keep: FALSE];
        };
      chunkTable[i] _ [];
      ENDLOOP;
    };

  -- The three X... procedures below are not referenced elsewhere in this module.

  XAddC: PROC [r: RecordID, words: CARDINAL] RETURNS [RecordID] = TRUSTED {
    -- Adds a short word count to a RecordID via YggLogInline.AddLC.
    RETURN [YggLogInline.AddLC[r, LONG[words]]];
    };

  XShortWordsFromPages: PROC [pages: CARDINAL, logWordsPerPage: INT] RETURNS [words: CARDINAL] = {
    -- Delegates to YggInline.ShortWordsFromPages, the in-scope routine the rest
    -- of this module uses for the same conversion.
    RETURN [YggInline.ShortWordsFromPages[pages, logWordsPerPage]]
    };

  XSubNumberLessCount: PROC [num: WordNumber, words: WordCount] RETURNS [WordNumber] = TRUSTED {
    -- Delegates to YggLogInline.SubNumberLessCount, the in-scope routine the
    -- rest of this module uses for the same subtraction.
    RETURN [YggLogInline.SubNumberLessCount[num: num, words: words]];
    };

  WordNumberFromRecordID: PUBLIC ENTRY PROC [thisRecord: RecordID]
    RETURNS [result: WordNumber] = {
    -- Converts thisRecord to a word number in the log file, computed backwards
    -- from the page following the tail chunk and wrapped by logFileLength.
    -- Raises CallerProgrammingError if thisRecord is greater than nextTailChunkID
    -- or is logFileLength or more words behind it.
    offsetFromNextTailChunk: WordCount;
    TRUSTED {IF YggLogInline.Compare[thisRecord, nextTailChunkID] = greater THEN GOTO tooLarge; };
    offsetFromNextTailChunk _ YggLogInline.WordsFromSubtract[
      larger: nextTailChunkID, smaller: thisRecord
      ! RuntimeError.BoundsFault => GOTO tooSmall];
    IF YggLogBasic.Compare[offsetFromNextTailChunk, logFileLength] # less THEN GOTO tooSmall;
    result _ YggLogInline.SubNumberLessCount[
      num: YggInline.WordsFromPages[
        pages: PageFollowing[chunkTable[tail].chunk.first.pageRun],
        wordsPerPage: logPagesInFilePages*YggLog.wordsPerPage],
      words: offsetFromNextTailChunk];
    RETURN [IF result.sign = negative
      THEN YggLogInline.AddNumberAndCount[result, logFileLength]
      ELSE result];
    EXITS
      -- RETURN WITH ERROR releases the monitor lock before raising; a plain
      -- ERROR here would be raised with the lock still held.
      tooLarge => RETURN WITH ERROR CallerProgrammingError;
      tooSmall => RETURN WITH ERROR CallerProgrammingError;
    };

  VerifyGet: ENTRY PROC [page: PageCount, pagePtr: LONG POINTER] = {
    -- Returns normally if the page header is valid or if page lies in the
    -- current tail chunk's page run; otherwise raises ERROR.
    TRUSTED {IF Header[pagePtr].valid THEN RETURN; };
    IF IsMemberOfPageRun[page,
      [chunkTable[tail].chunk.first.pageRun.firstPage, chunkTable[tail].chunk.first.pageRun.pages]]
      THEN RETURN;
    ERROR;
    };

  Release: PUBLIC PROC [beforeRecord: RecordID] = {
    -- Sets oldestFilePage to the page containing beforeRecord.
    AssignToOldestFilePage: ENTRY PROC [newOldestFilePage: PageNumber] = INLINE {
      oldestFilePage _ newOldestFilePage
      };
    AssignToOldestFilePage[YggInline.PagesFromWords[
      WordNumberFromRecordID[beforeRecord], YggLog.wordsPerPage]];
    };

  -- Page stream: sequential reading of log pages, with read-ahead and a single
  -- saved position. The procedures in this section are not ENTRY procedures.

  nChunksReadAhead: CARDINAL = 4;

  PageStreamObject: TYPE = RECORD [
    chunk: YggBuffMan.VMPageSet _ YggBuffMan.nullVMPageSet,
    savePageRun: PageRun,
    pageInPageRun, savePageInPageRun: CARDINAL,
    expectedVersion, saveExpectedVersion: YggLogRep.PageVersion,
    pagePtr: LONG POINTER,
    posSaved: BOOL,
    nextReadAheadPage: PageNumber
    ];

  pageStream: PageStreamObject;

  OpenPageStream: PROC [p: PageNumber] RETURNS [endOfLog: BOOL] = {
    -- Reads the chunk starting at page p, issues nChunksReadAhead read-aheads,
    -- and returns TRUE if the first page's header is not valid. Otherwise
    -- records that page's version as the expected version and returns FALSE.
    pageStream.chunk _ YggBuffMan.ReadPages[
      fileHandle: logFileHandle, tid: YggEnvironment.nullTransID,
      pageRun: PageRunStartingWith[p]];
    pageStream.pagePtr _ pageStream.chunk.first.buffer;
    pageStream.pageInPageRun _ 0;
    pageStream.nextReadAheadPage _ PageFollowing[pageStream.chunk.first.pageRun];
    THROUGH [1..nChunksReadAhead] DO ReadAhead[] ENDLOOP;
    TRUSTED {
      IF NOT Header[pageStream.pagePtr].valid THEN RETURN [TRUE];
      pageStream.expectedVersion _ Header[pageStream.pagePtr].version;
      };
    pageStream.posSaved _ FALSE;
    RETURN [FALSE];
    };

  ClosePageStream: PROC [] = {
    -- Releases the current chunk and clears it.
    YggBuffMan.ReleaseVMPageSet[
      vMPageSet: pageStream.chunk, releaseState: clean, keep: FALSE];
    pageStream.chunk _ YggBuffMan.nullVMPageSet;
    };

  ReadAhead: PROC [] = {
    -- Requests read-ahead of the chunk at nextReadAheadPage, then advances
    -- nextReadAheadPage by logChunkSize, resetting it to 0 once it is no longer
    -- below logFileSize - logChunkSize.
    YggBuffMan.ReadAheadPages[
      logFileHandle, PageRunStartingWith[pageStream.nextReadAheadPage]];
    pageStream.nextReadAheadPage _
      IF pageStream.nextReadAheadPage < logFileSize - logChunkSize
        THEN pageStream.nextReadAheadPage + logChunkSize
        ELSE 0;
    };

  SavePageStreamPos: PROC [] = {
    -- Remembers the current chunk's run, page within the run, and expected version.
    pageStream.savePageRun _ [
      pageStream.chunk.first.pageRun.firstPage, pageStream.chunk.first.pageRun.pages];
    pageStream.savePageInPageRun _ pageStream.pageInPageRun;
    pageStream.saveExpectedVersion _ pageStream.expectedVersion;
    pageStream.posSaved _ TRUE;
    };

  CurrentPageNumber: PROC [] RETURNS [PageNumber] = INLINE {
    -- Page number of the current stream position, or 0 if no chunk is held.
    RETURN [IF pageStream.chunk = NIL THEN 0
      ELSE pageStream.chunk.first.pageRun.firstPage + pageStream.pageInPageRun];
    };

  ForgetSavedPageStreamPos: PROC [] = {
    -- Discards the saved position. Raises InternalProgrammingError if a position
    -- is saved but the stream is no longer in the saved page run.
    IF NOT pageStream.posSaved THEN RETURN;
    IF pageStream.savePageRun #
      [pageStream.chunk.first.pageRun.firstPage, pageStream.chunk.first.pageRun.pages] THEN
      ERROR InternalProgrammingError;
    pageStream.posSaved _ FALSE;
    };

  AdvancePageStream: PROC [] RETURNS [endOfLog: BOOL] = {
    -- Moves to the next page, reading the following chunk when the current one
    -- is exhausted. Returns TRUE if the new page's header is not valid or its
    -- version differs from the expected version.
    pageStream.pageInPageRun _ pageStream.pageInPageRun+1;
    IF pageStream.pageInPageRun = pageStream.chunk.first.pageRun.pages THEN {
      p: PageNumber;
      -- Keep the released chunk when a position is saved.
      YggBuffMan.ReleaseVMPageSet[
        vMPageSet: pageStream.chunk, releaseState: clean, keep: pageStream.posSaved];
      p _ PageFollowing[pageStream.chunk.first.pageRun];
      -- Wrapping to page 0 flips the expected version.
      IF p = 0 THEN pageStream.expectedVersion _ 1 - pageStream.expectedVersion;
      pageStream.chunk _ YggBuffMan.ReadPages[
        fileHandle: logFileHandle, tid: YggEnvironment.nullTransID,
        pageRun: PageRunStartingWith[p]];
      pageStream.pagePtr _ pageStream.chunk.first.buffer;
      pageStream.pageInPageRun _ 0;
      IF NOT pageStream.posSaved THEN ReadAhead[];
      }
    ELSE pageStream.pagePtr _ pageStream.pagePtr + YggLog.wordsPerPage;
    TRUSTED {RETURN[
      (NOT Header[pageStream.pagePtr].valid)
      OR (pageStream.expectedVersion # Header[pageStream.pagePtr].version) ]; };
    };

  SetPageStreamPosFromSavedPos: PROC [] = {
    -- Returns the stream to the saved position, re-reading the saved chunk if
    -- the stream has moved to a different page run.
    -- Raises InternalProgrammingError if no position is saved.
    IF NOT pageStream.posSaved THEN ERROR InternalProgrammingError;
    IF PageRun[pageStream.chunk.first.pageRun.firstPage, pageStream.chunk.first.pageRun.pages]
      # pageStream.savePageRun THEN {
      YggBuffMan.ReleaseVMPageSet[
        vMPageSet: pageStream.chunk, releaseState: clean, keep: TRUE];
      pageStream.chunk _ YggBuffMan.ReadPages[
        fileHandle: logFileHandle, tid: YggEnvironment.nullTransID,
        pageRun: pageStream.savePageRun];
      pageStream.expectedVersion _ pageStream.saveExpectedVersion;
      };
    pageStream.pageInPageRun _ pageStream.savePageInPageRun;
    pageStream.pagePtr _ pageStream.chunk.first.buffer +
      YggInline.ShortWordsFromPages[pageStream.pageInPageRun, log2MDwordsPerPage];
    };

  -- Record stream: sequential reading of log records, built on the page stream.

  RecordStreamObject: TYPE = RECORD [
    id: RecordID
      -- ID of the current record.
    ];

  recordStream: RecordStreamObject;

  OpenRecordStreamFromWord: PUBLIC PROC [firstWord: WordNumber]
    RETURNS [notStartOfRecord: BOOL, currentRecord: RecordID] = {
    -- Opens the record stream at firstWord, using the RecordID that
    -- YggLogInline.RecordIDFromWordNumber derives from it.
    notStartOfRecord _ OpenRecordStream[
      firstWord: firstWord,
      firstRecord: YggLogInline.RecordIDFromWordNumber[firstWord]];
    RETURN [notStartOfRecord, recordStream.id];
    };

  OpenRecordStreamFromCheckpoint: PUBLIC PROC [
    checkpointWord: WordNumber, checkpointRecord: RecordID, firstRecord: RecordID]
    RETURNS [notStartOfRecord: BOOL, currentRecord: RecordID] = {
    -- Opens the record stream at firstRecord, locating its word number relative
    -- to the checkpoint (checkpointWord holds checkpointRecord).
    -- Raises CallerProgrammingError if firstRecord is after checkpointRecord or
    -- is logFileLength or more words before it.
    offsetFromCheckpoint, firstWord: WordCount;
    TRUSTED {
      IF YggLogInline.Compare[checkpointRecord, firstRecord] = less THEN
        ERROR CallerProgrammingError;
      };
    offsetFromCheckpoint _ YggLogInline.WordsFromSubtract[
      larger: checkpointRecord, smaller: firstRecord
      ! RuntimeError.BoundsFault => GOTO tooSmall];
    IF YggLogBasic.Compare[offsetFromCheckpoint, logFileLength] # less THEN GOTO tooSmall;
    firstWord _ YggLogInline.SubNumberLessCount[checkpointWord, offsetFromCheckpoint];
    IF firstWord.sign = negative THEN
      firstWord _ YggLogInline.AddNumberAndCount[firstWord, logFileLength];
    notStartOfRecord _ OpenRecordStream[firstWord: firstWord, firstRecord: firstRecord];
    RETURN [notStartOfRecord, firstRecord];
    EXITS
      tooSmall => ERROR CallerProgrammingError;
    };

  OpenRecordStream: PROC [firstWord: WordNumber, firstRecord: RecordID]
    RETURNS [notStartOfRecord: BOOL] = {
    -- Opens the page stream at the page containing firstWord, saves that
    -- position, and makes firstRecord the current record.
    -- Raises CallerProgrammingError if firstWord and firstRecord disagree on the
    -- word within the page; raises InvalidPage if the first page is not valid.
    p: PageNumber;
    w: CARDINAL;
    [page: p, wordInPage: w] _ YggInline.DecomposeWords[
      firstWord, YggLog.wordsPerPage*logPagesInFilePages];
    IF YggLogInline.WordInPageFromRecordID[firstRecord, YggLog.wordsPerPage*logPagesInFilePages] # w THEN
      ERROR CallerProgrammingError;
    IF OpenPageStream[p].endOfLog THEN GOTO invalidPage;
    SavePageStreamPos[];
    recordStream _ [id: firstRecord];
    RETURN [VerifyCurrentRecordID[].notStartOfRecord];
    EXITS
      invalidPage => ERROR InvalidPage;
    };

  InvalidPage: PUBLIC ERROR = CODE;

  CloseRecordStream: PUBLIC PROC [] = {
    ClosePageStream[];
    };

  VerifyCurrentRecordID: PROC [] RETURNS [notStartOfRecord: BOOL] = {
    -- Walks block headers from the start of the YggLog.wordsPerPage-sized page
    -- containing the current record. Returns TRUE if no block starts exactly at
    -- the record's word, or if the block there is a continuation.
    wordInPage: CARDINAL _ YggLogInline.WordInPageFromRecordID[
      recordStream.id, YggLog.wordsPerPage*logPagesInFilePages];
    TRUSTED {
      w: CARDINAL _ (wordInPage/YggLog.wordsPerPage) * YggLog.wordsPerPage;
      UNTIL w >= wordInPage DO
        w _ w + Header[pageStream.pagePtr+w].nWords;
        ENDLOOP;
      IF w > wordInPage THEN RETURN [TRUE];
      };
    TRUSTED {RETURN [Header[pageStream.pagePtr+wordInPage].isContinuation]; };
    };

  CheckCurrentRecord: PUBLIC PROC [] RETURNS [truncated: BOOL] = TRUSTED {
    -- Checks the block structure of the current record, following continuation
    -- blocks. Returns TRUE if the record is truncated. The page stream is
    -- restored to the saved position on every exit.
    -- Raises InternalProgrammingError if the current position is not the start
    -- of a record or a block length is inconsistent.
    wordInPage: CARDINAL _ YggLogInline.WordInPageFromRecordID[
      recordStream.id, YggLog.wordsPerPage*logPagesInFilePages];
    IF VerifyCurrentRecordID[].notStartOfRecord THEN ERROR InternalProgrammingError;
    IF Header[pageStream.pagePtr+wordInPage].hasContinuation THEN {
      -- A block that has a continuation must end on a page boundary.
      IF (CARD[Header[pageStream.pagePtr+wordInPage].nWords] + wordInPage) MOD YggLog.wordsPerPage # 0 THEN
        GOTO checkFailed;
      wordInPage _ CARD[Header[pageStream.pagePtr+wordInPage].nWords] + wordInPage;
      DO
        wordInPage _ wordInPage + CARD[Header[pageStream.pagePtr+wordInPage].nWords];
        IF wordInPage >= CARD[YggLog.wordsPerPage*logPagesInFilePages] THEN {
          IF AdvancePageStream[].endOfLog THEN GOTO truncatedRecord;
          wordInPage _ 0;
          };
        IF NOT Header[pageStream.pagePtr+wordInPage].isContinuation THEN GOTO truncatedRecord;
        IF NOT Header[pageStream.pagePtr+wordInPage].hasContinuation THEN EXIT;
        IF CARD[Header[pageStream.pagePtr+wordInPage].nWords] MOD YggLog.wordsPerPage # 0 THEN
          GOTO checkFailed;
        ENDLOOP;
      IF CARD[Header[pageStream.pagePtr+wordInPage].nWords] NOT IN [1 .. YggLog.wordsPerPage] THEN
        GOTO checkFailed;
      }
    ELSE {
      IF CARD[Header[pageStream.pagePtr+wordInPage].nWords]
        NOT IN [1 .. CARD[YggLog.wordsPerPage*logPagesInFilePages-wordInPage]] THEN
        GOTO checkFailed;
      };
    SetPageStreamPosFromSavedPos[];
    RETURN [FALSE];
    EXITS
      checkFailed => {
        SetPageStreamPosFromSavedPos[];
        ERROR InternalProgrammingError
        };
      truncatedRecord => {
        SetPageStreamPosFromSavedPos[];
        RETURN [TRUE]
        };
    };

  GetCurrentRecord: PUBLIC PROC [currentRecord: RecordID, to: YggLog.Block]
    RETURNS [status: YggLog.ReadProcStatus, wordsRead: CARDINAL] = {
    -- Copies the current record into the block list "to" and reports how many
    -- words were read. The page stream is restored to the saved position on exit.
    -- Raises CallerProgrammingError if currentRecord is not the stream's current
    -- record, or if the record turns out to be truncated.
    wordInPage: CARDINAL _ YggLogInline.WordInPageFromRecordID[
      recordStream.id, YggLog.wordsPerPage*logPagesInFilePages];
    wordsRead _ 0;
    IF YggLogInline.Compare[currentRecord, recordStream.id] # equal THEN
      ERROR CallerProgrammingError;
    DO
      -- Read one log block.
      wordsThisBlock, wordsThisCopy: CARDINAL;
      hasContinuation, isContinuation: BOOL;
      -- Extract the block header fields.
      TRUSTED {
        [hasContinuation: hasContinuation, isContinuation: isContinuation,
          nWords: wordsThisBlock] _ Header[pageStream.pagePtr+wordInPage]^;
        };
      IF wordsRead > 0 AND NOT isContinuation THEN GOTO truncatedRecord;
      wordsThisBlock _ wordsThisBlock-SIZE[YggLogRep.Header];
      wordInPage _ wordInPage+SIZE[YggLogRep.Header];
      DO
        -- Copy as much of this log block as fits in the current destination block.
        wordsThisCopy _ MIN[to.length, wordsThisBlock];
        IF to.base # NIL THEN TRUSTED {
          PBasics.Copy[
            to: to.base, from: pageStream.pagePtr+wordInPage,
            nwords: wordsThisCopy/WORDS[CARD32]];
          to.base _ to.base+wordsThisCopy;
          };
        wordsRead _ wordsRead+wordsThisCopy;
        IF wordsThisCopy # to.length THEN {
          -- Destination has room left, so wordsThisCopy = wordsThisBlock; go read next log block.
          to.length _ to.length-wordsThisCopy;
          IF NOT hasContinuation THEN GOTO sourceExhausted;
          wordInPage _ wordInPage + wordsThisBlock;
          IF wordInPage >= CARD[YggLog.wordsPerPage*logPagesInFilePages] THEN {
            IF AdvancePageStream[].endOfLog THEN GOTO truncatedRecord;
            wordInPage _ 0;
            };
          GOTO nextBlock;
          }
        ELSE IF to.rest # NIL THEN TRUSTED {
          -- Current destination block is full; continue with the next one.
          to _ to.rest^;
          wordsThisBlock _ wordsThisBlock-wordsThisCopy;
          wordInPage _ wordInPage + wordsThisCopy;
          }
        ELSE {
          -- Destination is full and there are no more destination blocks.
          SetPageStreamPosFromSavedPos[];
          RETURN [
            IF (NOT hasContinuation AND wordsThisBlock = wordsThisCopy)
              THEN normal ELSE destinationFull,
            wordsRead]
          }
        REPEAT
          nextBlock => NULL;
        ENDLOOP;
      ENDLOOP;
    EXITS
      sourceExhausted => {
        SetPageStreamPosFromSavedPos[];
        RETURN [sourceExhausted, wordsRead]
        };
      truncatedRecord => {
        SetPageStreamPosFromSavedPos[];
        ERROR CallerProgrammingError;
        };
    };

  GetCurrentPageAndVersion: PUBLIC PROC [] RETURNS [PageNumber, YggLogRep.PageVersion] = {
    RETURN[CurrentPageNumber[], pageStream.expectedVersion];
    };

  AdvanceRecordStream: PUBLIC PROC []
    RETURNS [endOfLog, truncatedRecord: BOOL, currentRecord: RecordID] = TRUSTED {
    -- Steps past the current record, following its continuation blocks, and
    -- makes the next record current. On every exit the new page stream position
    -- is saved and recordStream.id is advanced by the number of words skipped.
    -- endOfLog and truncatedRecord report how the scan ended.
    wordsInRecord: CARDINAL _ 0;
    mdWordsInALogPage: CARD _ CARD[YggLog.wordsPerPage*logPagesInFilePages];
    endOfLog _ truncatedRecord _ FALSE;
    { -- block for EXITS
    wordInPage: CARDINAL _ YggLogInline.WordInPageFromRecordID[recordStream.id, mdWordsInALogPage];
    ForgetSavedPageStreamPos[];
    WHILE Header[pageStream.pagePtr+wordInPage].hasContinuation DO
      wordsInRecord _ wordsInRecord + Header[pageStream.pagePtr+wordInPage].nWords;
      wordInPage _ 0;
      IF AdvancePageStream[].endOfLog THEN {
        endOfLog _ truncatedRecord _ TRUE;
        GOTO return
        };
      IF NOT Header[pageStream.pagePtr].isContinuation THEN {
        truncatedRecord _ TRUE;
        GOTO return
        };
      ENDLOOP;
    {
      wordsInLastBlock: CARDINAL _ Header[pageStream.pagePtr+wordInPage].nWords;
      wordInPage _ wordInPage + wordsInLastBlock;
      wordsInRecord _ wordsInRecord + wordsInLastBlock;
      };
    -- A zero-length header before the end of the log page: skip forward to the
    -- next multiple of YggLog.wordsPerPage.
    IF wordInPage < mdWordsInALogPage AND Header[pageStream.pagePtr+wordInPage].nWords = 0 THEN {
      wordsToSkip: CARDINAL;
      wordsToSkip _ YggLog.wordsPerPage - (wordInPage MOD (YggLog.wordsPerPage)) ;
      wordsInRecord _ wordsInRecord + wordsToSkip;
      wordInPage _ wordInPage + wordsToSkip;
      };
    IF wordInPage = mdWordsInALogPage AND AdvancePageStream[].endOfLog THEN {
      endOfLog _ TRUE;
      };
    GOTO return
    EXITS
      return => {
        SavePageStreamPos[];
        recordStream.id _ YggLogInline.AddC[recordStream.id, wordsInRecord];
        RETURN[endOfLog, truncatedRecord, recordStream.id]
        };
    }};

  -- Random-access read of a single record.

  Get: PUBLIC PROC [thisRecord: YggLog.RecordID, to: YggLog.Block]
    RETURNS [status: YggLog.ReadProcStatus, wordsRead: CARDINAL] = {
    -- Copies record thisRecord into the block list "to" without using the page
    -- or record streams. Chunks are read with GetChunk and handed back with
    -- ReleaseChunk[] before each normal return.
    -- Raises InternalProgrammingError if the block structure read from the log
    -- is inconsistent.
    chunk: YggBuffMan.VMPageSet;
    pagePtr: LONG POINTER;
    pagesLeftInChunk: CARDINAL;
    GetChunk: PROC [nextPage: PageNumber] = {
      -- Reads the chunk starting at nextPage and points pagePtr at its first page.
      chunk _ YggBuffMan.ReadPages[
        fileHandle: logFileHandle, tid: YggEnvironment.nullTransID,
        pageRun: PageRunStartingWith[nextPage]];
      pagePtr _ chunk.first.buffer;
      pagesLeftInChunk _ chunk.first.pageRun.pages - 1;
      };
    ReleaseChunk: PROC [] = INLINE {
      YggBuffMan.ReleaseVMPageSet[vMPageSet: chunk, releaseState: clean, keep: FALSE];
      };
    AdvancePage: PROC [] = INLINE {
      -- Moves to the next page, fetching the following chunk when the current
      -- one is exhausted; wrapping to page 0 flips pageVersion.
      IF pagesLeftInChunk = 0 THEN {
        nextPage: PageNumber _ PageFollowing[chunk.first.pageRun];
        ReleaseChunk[];
        IF nextPage = 0 THEN pageVersion _ 1-pageVersion;
        GetChunk[nextPage];
        }
      ELSE {
        pagePtr _ pagePtr + YggLog.wordsPerPage;
        pagesLeftInChunk _ pagesLeftInChunk - 1;
        };
      };
    isFirstPage: BOOL _ TRUE;
    pageVersion: YggLogRep.PageVersion _ 0;
    wordInPage: CARDINAL;
    Verify: PROC [] = {
      -- Walks block headers from word 0 of the page; raises
      -- InternalProgrammingError if no block starts exactly at wordInPage.
      w: CARDINAL _ 0;
      UNTIL w = wordInPage DO
        TRUSTED {w _ w + Header[pagePtr+w].nWords; };
        IF w > wordInPage THEN ERROR InternalProgrammingError;
        ENDLOOP;
      };

    {
      firstPage: PageNumber;
      [page: firstPage, wordInPage: wordInPage] _ YggInline.DecomposeWords[
        WordNumberFromRecordID[thisRecord], YggLog.wordsPerPage];
      GetChunk[firstPage];
      };
    Verify[];
    wordsRead _ 0;
    DO
      -- Read one log block.
      wordsThisBlock, wordsThisCopy: CARDINAL;
      hasContinuation, isContinuation: BOOL;
      version: YggLogRep.PageVersion;
      -- A page whose header is not valid is acceptable only if VerifyGet accepts it.
      TRUSTED {
        IF NOT Header[pagePtr].valid THEN VerifyGet[
          chunk.first.pageRun.firstPage+(chunk.first.pageRun.pages-(1+pagesLeftInChunk)),
          pagePtr];
        };
      -- Extract the block header fields.
      TRUSTED {[version: version, hasContinuation: hasContinuation,
        isContinuation: isContinuation, nWords: wordsThisBlock] _ Header[pagePtr+wordInPage]^; };
      wordsThisBlock _ wordsThisBlock-YggLogRep.Header.SIZE;
      wordInPage _ wordInPage+YggLogRep.Header.SIZE;
      IF isFirstPage THEN {
        IF isContinuation THEN ERROR InternalProgrammingError;
        pageVersion _ version;
        isFirstPage _ FALSE
        }
      ELSE {
        IF NOT isContinuation OR (version # pageVersion) THEN
          ERROR InternalProgrammingError
        };
      DO
        -- Copy as much of this log block as fits in the current destination block.
        wordsThisCopy _ MIN[to.length, wordsThisBlock];
        IF to.base # NIL THEN TRUSTED {
          PBasics.Copy[
            to: to.base, from: pagePtr+wordInPage,
            nwords: wordsThisCopy/WORDS[CARD32]];
          to.base _ to.base+wordsThisCopy;
          };
        wordsRead _ wordsRead+wordsThisCopy;
        IF wordsThisCopy # to.length THEN {
          -- Destination has room left, so wordsThisCopy = wordsThisBlock; go read next log block.
          to.length _ to.length-wordsThisCopy;
          IF NOT hasContinuation THEN {
            ReleaseChunk[];
            RETURN[sourceExhausted, wordsRead]
            };
          AdvancePage[];
          wordInPage _ 0;
          GOTO nextBlock;
          }
        ELSE IF to.rest # NIL THEN TRUSTED {
          -- Current destination block is full; continue with the next one.
          to _ to.rest^;
          wordsThisBlock _ wordsThisBlock-wordsThisCopy;
          wordInPage _ wordInPage + wordsThisCopy;
          }
        ELSE {
          -- Destination is full and there are no more destination blocks.
          ReleaseChunk[];
          RETURN [
            IF (NOT hasContinuation AND wordsThisBlock = wordsThisCopy)
              THEN normal ELSE destinationFull,
            wordsRead]
          }
        REPEAT
          nextBlock => NULL;
        ENDLOOP;
      ENDLOOP;
    };

  -- Module initialization: set the interface variable YggLog.wordsPerPage.
  YggLog.wordsPerPage _ YggSunOS.LogBytesPerPage/BYTES[WORD];

  END.