-- YggLogBasicImpl.mesa
-- Implements stream log reading, log tail management, random log reading.
--
-- NOTE(review): this file distinguishes "log pages" (YggSunOS.LogBytesPerPage bytes) from
-- "file pages" (YggFile.bytesPerPage bytes); logPagesInFilePages is the ratio.  PageNumber
-- values are file ("system") pages.  Several places still appear to mix the two units
-- (marked NOTE(review) below); they agree only when logPagesInFilePages = 1.

DIRECTORY
  Basics,
  ConstArith,
  PBasics,
  Process,
  RuntimeError,
  YggDID,
  YggDIDPrivate,
  YggDIDMap,
  YggEnvironment,
  YggFile,
  YggInternal,
  YggInline,
  YggBuffMan,
  YggLog,
  YggLogBasic,
  YggLogInline,
  YggLogBasicInternal,
  YggLogRep,
  YggSunOS;

YggLogBasicImpl: CEDAR MONITOR
  IMPORTS
    ConstArith, PBasics, Process, RuntimeError, YggFile, YggInline, YggBuffMan, YggLog,
    YggLogBasic, YggLogInline
  EXPORTS
    YggDID, YggLogBasic, YggLogBasicInternal =
BEGIN

DID: PUBLIC TYPE ~ REF DIDRep;
DIDRep: PUBLIC TYPE ~ YggDIDPrivate.DIDRep;

PageCount: TYPE = YggEnvironment.PageCount;
PageNumber: TYPE = YggEnvironment.PageNumber;  -- in system pages, not log pages
PageRun: TYPE = YggEnvironment.PageRun;
WordNumber: TYPE = YggLogBasic.WordNumber;
WordCount: TYPE = YggLogBasic.WordCount;
RecordID: TYPE = YggLog.RecordID;

CallerProgrammingError: ERROR = CODE;
InternalProgrammingError: ERROR = CODE;

WordsINWORDS: CARD = WORDS[CARD32];

-- Reinterpret a raw page pointer as a pointer to a log block header.
Header: PROC [p: LONG POINTER] RETURNS [LONG POINTER TO YggLogRep.Header] = INLINE {
  RETURN [LOOPHOLE[p]] };

-- Same as Header, but not INLINE.
HeaderNotInline: PROC [p: LONG POINTER] RETURNS [LONG POINTER TO YggLogRep.Header] = {
  RETURN [LOOPHOLE[p]] };


-- Static log file state

logFileHandle: YggInternal.FileHandle;
logFileSize: PageCount;  -- set from YggSunOS.LogFileSize by EstablishLogFile
logFileLength: WordCount;  -- logFileSize converted to words by EstablishLogFile
logChunkSize: INT = 1;
logPagesInFilePages: INT _ YggFile.bytesPerPage/YggSunOS.LogBytesPerPage;
log2MDwordsPerPage: INT;  -- the log base 2 number of machine dependant words per page (Log2[YggLog.wordsPerPage])


-- Utilities

-- Page run of at most logChunkSize pages starting at p, clipped at the end of the log file.
PageRunStartingWith: PROC [p: PageNumber] RETURNS [pr: PageRun] = INLINE {
  RETURN [[firstPage: p, count: MIN[logFileSize/logPagesInFilePages-p, logChunkSize]]] };

-- First page after pr, wrapping to page 0 at the end of the log file.
PageFollowing: PROC [pr: YggDIDMap.Run] RETURNS [p: PageNumber] = {
  p _ pr.firstPage+pr.pages;
  RETURN [IF p*logPagesInFilePages = logFileSize THEN 0 ELSE p] };

IsMemberOfPageRun: PROC [p: PageNumber, pr: PageRun] RETURNS [BOOL] = INLINE {
  RETURN [p IN [pr.firstPage .. pr.firstPage+pr.count)] };


-- The log as a whole

EstablishLogFile: PUBLIC PROC [] = {
  -- YggLogBasic.EstablishLogFile
  -- Opens the log file on first call and derives the size variables; later calls are no-ops.
  twpp: INT;
  IF logFileHandle = NIL THEN {
    logFileHandle _ YggFile.Open[
      runList: LIST[[YggSunOS.LogSegmentID, 0, 0, YggSunOS.LogFileSize, FALSE]],
      byteSize: YggSunOS.LogFileSize*YggSunOS.LogBytesPerPage,
      did: NEW[YggDIDPrivate.DIDRep _ [YggDIDPrivate.HighDIDSystemReserved, 11]],
      fileUse: $systemLog,
      verifyLeaderNow: FALSE];
    -- NOTE(review): starting at 1 makes the result 1 + the number of halvings needed to
    -- reach 1, i.e. one more than Log2[YggLog.wordsPerPage] for a power of two.  Confirm
    -- this is intended (e.g. RecordID words being half a machine word).
    log2MDwordsPerPage _ 1;
    FOR twpp _ YggLog.wordsPerPage, twpp/2 UNTIL twpp =1 DO
      log2MDwordsPerPage _ log2MDwordsPerPage + 1;
      ENDLOOP;
    logFileSize _ YggSunOS.LogFileSize;
    logFileLength _ YggInline.WordsFromPages[logFileSize, YggLog.wordsPerPage];
    };
  };

LogFileSize: PUBLIC PROC [] RETURNS [PageCount] = {
  RETURN[logFileSize];
  };

LogUsage: PUBLIC ENTRY PROC [] RETURNS [PageCount] ~ {
  -- Distance from oldestFilePage to the page following the tail chunk, plus nBufferPages,
  -- corrected for wrap-around.
  -- NOTE(review): the wrap correction adds logFileSize (log pages) to a file-page difference.
  nPages: INT;
  RETURN[IF (nPages _ (PageFollowing[chunkTable[tail].chunk.first.pageRun] - oldestFilePage)) >= 0
    THEN nPages + nBufferPages
    ELSE nPages + nBufferPages + logFileSize];
  };


-- YggLog tail.
-- This data structure keeps status information about log chunks near the log tail.
-- Specifically, any chunk that may be dirty but which has not yet been forced out must be
-- present in the table.  The table is managed as a queue which always has at least one
-- element in it.  When any chunk completes a forceout, its slot in the table is marked free.
-- A slot is reused if it is free and corresponds to the earliest log page in the table.

-- Types

ChunkTableSize: CARDINAL = 4;
ChunkTableIndex: TYPE = [0..ChunkTableSize);
ChunkTableRec: TYPE = RECORD [
  chunk: YggBuffMan.VMPageSet _ YggBuffMan.nullVMPageSet,
    -- Contains pageRun (segmentPage, pages, etc.) and the LONG POINTER (buffer) to the mapped pages.
  recordID: YggLog.RecordID _ YggLog.nullRecordID,
    -- ID of first word of first page of chunk.
  forceInProgress: BOOL _ FALSE
    -- Set TRUE before forking process to force final update to the chunk; set FALSE when
    -- this process finishes.  So if chunk is not the tail and NOT forceInProgress, it is
    -- eligible for re-use.
  ];

-- Successor of i in the circular chunk table.
NextChunkTableIndex: PROC [i: ChunkTableIndex] RETURNS [ChunkTableIndex] = INLINE {
  RETURN [IF i = LAST[ChunkTableIndex] THEN FIRST[ChunkTableIndex] ELSE SUCC[i]] };

-- State
-- Variables below are only updated by AdvanceChunk, except that the TRUE -> FALSE transition
-- of forceInProgress for a chunk is made by ForceChunk (forked by AdvanceChunk), and
-- oldestFilePage is updated by Release.

chunkTable: ARRAY ChunkTableIndex OF ChunkTableRec;
tail: ChunkTableIndex;
completion: CONDITION;
  -- Wait here for a forceInProgress = FALSE in chunkTable.
curVersion: YggLogRep.PageVersion;
  -- Version bit for pages in tail chunk
nextTailChunkID: YggLog.RecordID;
  -- RecordID of first word of first page of NEXT tail chunk.  Used to bounds-check RecordIDs
  -- presented to Force and Get.  Redundant; can be computed from chunkTable[tail].
oldestFilePage: PageNumber;
  -- The least-recently written page of the log file that cannot be overwritten.
enforceBufferPages: BOOL _ TRUE;
enforceLogOverwrite: BOOL _ TRUE;
nBufferPages: INT = 20;
  -- If enforceBufferPages, then do not allow writing within nBufferPages pages of oldestFilePage


-- Operations on the log tail.

AdvanceChunk: PUBLIC ENTRY PROC [] RETURNS [
  version: YggLogRep.PageVersion, pagesInChunk: INT, firstPagePtr: LONG POINTER] = {
  -- YggLogBasicInternal.AdvanceChunk
  -- Caller must hold the log tail monitor; hence only one caller at a time.
  -- Raises YggLog.WriteFailed (via RETURN WITH ERROR) if the next chunk would overwrite, or
  -- come within nBufferPages of, oldestFilePage while the corresponding enforce flag is set.
  newTail: ChunkTableIndex;
  nextPage: PageNumber;
  runForNextPage: PageRun;
  biasedOldestFilePage: PageNumber;

  -- Fork a process to write the current tail chunk.
  chunkTable[tail].forceInProgress _ TRUE;
  TRUSTED {Process.Detach[FORK ForceChunk[index: tail]]; };

  -- Wait, if necessary, for room in the chunk table to allocate the next tail chunk.
  DO
    newTail _ NextChunkTableIndex[tail];
    IF NOT chunkTable[newTail].forceInProgress THEN EXIT;
    WAIT completion;
    ENDLOOP;

  -- Now make sure that the write will not overwrite, or even get too close to, log pages
  -- that are still needed.
  nextPage _ PageFollowing[chunkTable[tail].chunk.first.pageRun];
  runForNextPage _ PageRunStartingWith[nextPage];
  -- NOTE(review): the wrap below adds logFileSize (log pages) to a file-page number.
  biasedOldestFilePage _ oldestFilePage - nBufferPages;
  IF biasedOldestFilePage < 0 THEN
    biasedOldestFilePage _ biasedOldestFilePage + logFileSize;
  IF (enforceBufferPages AND IsMemberOfPageRun[biasedOldestFilePage, runForNextPage])
    OR (enforceLogOverwrite AND IsMemberOfPageRun[oldestFilePage, runForNextPage]) THEN {
    RETURN WITH ERROR YggLog.WriteFailed
    };

  -- About to start updating global data structures; from here on it's do or die--no more
  -- recoverable errors allowed.
  IF chunkTable[newTail].chunk # NIL THEN
    YggBuffMan.ReleaseVMPageSet[
      vMPageSet: chunkTable[newTail].chunk, releaseState: clean, keep: TRUE];
  IF nextPage = 0 THEN curVersion _ 1 - curVersion;  -- wrapped: flip the version bit
  tail _ newTail;
  chunkTable[tail].recordID _ nextTailChunkID;
  chunkTable[tail].chunk _ YggBuffMan.UsePages[
    fileHandle: logFileHandle, tid: YggEnvironment.nullTransID, pageRun: runForNextPage];
  nextTailChunkID _ YggLogInline.AddC[
    r: nextTailChunkID,
    words: logPagesInFilePages*YggInline.ShortWordsFromPages[
      chunkTable[tail].chunk.first.pageRun.pages, log2MDwordsPerPage]];
  RETURN [
    version: curVersion,
    pagesInChunk: chunkTable[tail].chunk.first.pageRun.pages*logPagesInFilePages,
    firstPagePtr: chunkTable[tail].chunk.first.buffer];
  };

ForceChunk: PROC [index: ChunkTableIndex] = {
  -- On entry, chunkTable[index].forceInProgress.
  -- Force the chunk, and set forceInProgress _ false.
  -- Note: do NOT hold monitor during force out.
  BroadcastCompletion: ENTRY PROC [] = INLINE {
    chunkTable[index].forceInProgress _ FALSE;
    BROADCAST completion;
    };
  YggBuffMan.ForceOutVMPageSet[chunkTable[index].chunk];
  BroadcastCompletion[];
  };

ForceTo: PUBLIC PROC [followingRecord: RecordID] = {
  -- YggLogBasicInternal.ForceTo
  -- If the word before followingRecord is in the current tail chunk, force that chunk (using
  -- this process).  Then wait for other chunks that precede followingRecord to go out.
  -- Raises CallerProgrammingError if followingRecord is later than nextTailChunkID.

  TailForceRequired: ENTRY PROC [] RETURNS [doForce: BOOL, chunk: YggBuffMan.VMPageSet] = TRUSTED {
    -- followingRecord must not be later than page about to be written.
    IF ConstArith.Compare[followingRecord, nextTailChunkID] = greater THEN
      RETURN WITH ERROR CallerProgrammingError;
    IF ConstArith.Compare[chunkTable[tail].recordID, followingRecord] = less THEN {
      YggBuffMan.ShareVMPageSet[chunkTable[tail].chunk];
      RETURN [TRUE, chunkTable[tail].chunk] }
    ELSE RETURN [FALSE, YggBuffMan.nullVMPageSet]
    };

  AwaitForce: ENTRY PROC [] = {
    -- Return when all chunks that precede followingRecord are out.
    FOR currentChunk: ChunkTableIndex IN ChunkTableIndex DO
      TRUSTED {
        IF ConstArith.Compare[chunkTable[currentChunk].recordID, followingRecord] = less THEN
          UNTIL NOT chunkTable[currentChunk].forceInProgress DO
            WAIT completion;
            ENDLOOP;
        };
      ENDLOOP;
    };

  doForce: BOOL;
  chunk: YggBuffMan.VMPageSet;
  [doForce, chunk] _ TailForceRequired[];
  IF doForce THEN {
    YggBuffMan.ForceOutVMPageSet[vMPageSet: chunk];
    YggBuffMan.ReleaseVMPageSet[vMPageSet: chunk, releaseState: clean, keep: TRUE] };
  AwaitForce[];
  };

OpenBasicForPut: PUBLIC ENTRY PROC [
  nextPage: PageNumber, version: YggLogRep.PageVersion, nextRecord: RecordID]
  RETURNS [pagesInChunk: INT, firstPagePtr: LONG POINTER] = {
  -- YggLogBasicInternal.OpenBasicForPut
  -- Starts the tail at nextPage/nextRecord in slot FIRST[ChunkTableIndex].  Both overwrite
  -- checks are switched off here; AssertNormalOperation switches them back on.
  wordsToAdd: CARDINAL;
  enforceBufferPages _ FALSE;
  enforceLogOverwrite _ FALSE;
  curVersion _ version;
  tail _ FIRST[ChunkTableIndex];
  chunkTable[tail].recordID _ nextRecord;
  chunkTable[tail].chunk _ YggBuffMan.UsePages[
    fileHandle: logFileHandle, tid: YggEnvironment.nullTransID,
    pageRun: PageRunStartingWith[nextPage]];
  wordsToAdd _ logPagesInFilePages * YggInline.ShortWordsFromPages[
    chunkTable[tail].chunk.first.pageRun.pages, log2MDwordsPerPage];
  nextTailChunkID _ YggLogInline.AddC[r: chunkTable[tail].recordID, words: wordsToAdd];
  RETURN [
    pagesInChunk: chunkTable[tail].chunk.first.pageRun.pages*logPagesInFilePages,
    firstPagePtr: chunkTable[tail].chunk.first.buffer];
  };

AssertNormalOperation: PUBLIC ENTRY PROC [] = {
  -- Re-enable both overwrite checks made by AdvanceChunk.
  enforceBufferPages _ TRUE;
  enforceLogOverwrite _ TRUE;
  };

CloseBasicForPut: PUBLIC ENTRY PROC [] = {
  -- YggLogBasicInternal.CloseBasicForPut
  -- For every table slot: wait out any force in progress, force and release its chunk, and
  -- reset the slot to its default value.
  FOR i: ChunkTableIndex IN ChunkTableIndex DO
    WHILE chunkTable[i].forceInProgress DO
      WAIT completion;
      ENDLOOP;
    -- Slots that were never used still hold nullVMPageSet; skip them, as AdvanceChunk does
    -- before releasing a slot.
    IF chunkTable[i].chunk # NIL THEN {
      YggBuffMan.ForceOutVMPageSet[chunkTable[i].chunk];
      YggBuffMan.ReleaseVMPageSet[
        vMPageSet: chunkTable[i].chunk, releaseState: clean, keep: FALSE];
      };
    chunkTable[i] _ [];
    ENDLOOP;
  };

XAddC: PROC [r: RecordID, words: CARDINAL] RETURNS [RecordID] = TRUSTED {
  -- Returns r+words.
  RETURN [YggLogInline.AddLC[r, LONG[words]]];
  };

XShortWordsFromPages: PROC [pages: CARDINAL, logWordsPerPage: INT] RETURNS [words: CARDINAL] = {
  -- Returns pages shifted by logWordsPerPage bits.
  RETURN [Basics.BITSHIFT[value: pages, count: logWordsPerPage]] };

XSubNumberLessCount: PROC [num: WordNumber, words: WordCount] RETURNS [WordNumber] = TRUSTED {
  -- Returns num-words.
  RETURN ConstArith.Sub[num, words];
  };

WordNumberFromRecordID: PUBLIC ENTRY PROC [thisRecord: RecordID]
  RETURNS [result: WordNumber] = {
  -- YggLogBasic.WordNumberFromRecordID
  -- Converts thisRecord to a word position in the log file, computed back from the page
  -- following the current tail chunk and wrapped by logFileLength.
  -- Raises CallerProgrammingError if thisRecord is later than nextTailChunkID, or is a full
  -- log file length or more before it.
  offsetFromNextTailChunk: WordCount;
  TRUSTED {IF ConstArith.Compare[thisRecord, nextTailChunkID] = greater THEN GOTO tooLarge; };
  offsetFromNextTailChunk _ YggLogInline.WordsFromSubtract[
    larger: nextTailChunkID, smaller: thisRecord ! RuntimeError.BoundsFault => GOTO tooSmall];
  IF YggLogBasic.Compare[offsetFromNextTailChunk, logFileLength] # less THEN GOTO tooSmall;
  result _ YggLogInline.SubNumberLessCount[
    num: YggInline.WordsFromPages[
      pages: PageFollowing[chunkTable[tail].chunk.first.pageRun],
      wordsPerPage: logPagesInFilePages*YggLog.wordsPerPage],
    words: offsetFromNextTailChunk];
  RETURN [IF result.sign = negative
    THEN YggLogInline.AddNumberAndCount[result, logFileLength]
    ELSE result];
  EXITS
    -- This is an ENTRY procedure: use RETURN WITH ERROR so the monitor lock is released
    -- before the error is raised (same form as AdvanceChunk and ForceTo).
    tooLarge => RETURN WITH ERROR CallerProgrammingError;
    tooSmall => RETURN WITH ERROR CallerProgrammingError;
  };

VerifyGet: ENTRY PROC [page: PageCount, pagePtr: LONG POINTER] = {
  -- Called from Get if it encounters a page marked invalid.
  -- Returns normally if the page is now valid or lies in the current tail chunk; otherwise
  -- raises InternalProgrammingError.
  TRUSTED {IF Header[pagePtr].valid THEN RETURN; };
  IF IsMemberOfPageRun[page,
    [chunkTable[tail].chunk.first.pageRun.firstPage, chunkTable[tail].chunk.first.pageRun.pages]]
    THEN RETURN;
  -- ENTRY procedure: release the monitor lock before raising.
  RETURN WITH ERROR InternalProgrammingError;
  };

Release: PUBLIC PROC [beforeRecord: RecordID] = {
  -- YggLogBasic.Release
  -- Sets oldestFilePage to the page containing beforeRecord.
  -- NOTE(review): the page is computed with YggLog.wordsPerPage (log pages), while
  -- oldestFilePage is compared against file-page runs in AdvanceChunk.
  AssignToOldestFilePage: ENTRY PROC [newOldestFilePage: PageNumber] = INLINE {
    oldestFilePage _ newOldestFilePage };
  AssignToOldestFilePage[YggInline.PagesFromWords[
    WordNumberFromRecordID[beforeRecord], YggLog.wordsPerPage]];
  };


-- Readonly page stream on the log
-- A readonly page stream on the log.  This stream has a limited capability for backup: it
-- can save one page position that MUST be returned to later.
-- End of log is determined at the page level.

nChunksReadAhead: CARDINAL = 4;

PageStreamObject: TYPE = RECORD [
  chunk: YggBuffMan.VMPageSet _ YggBuffMan.nullVMPageSet,
  savePageRun: PageRun,
  pageInPageRun, savePageInPageRun: CARDINAL,
  expectedVersion, saveExpectedVersion: YggLogRep.PageVersion,
  pagePtr: LONG POINTER,
  posSaved: BOOL,
  nextReadAheadPage: PageNumber
  ];

pageStream: PageStreamObject;

OpenPageStream: PROC [p: PageNumber] RETURNS [endOfLog: BOOL] = {
  -- Positions the stream on page p and issues nChunksReadAhead read-aheads.  Returns TRUE if
  -- the first page is not marked valid.
  -- The initial stream position is NOT saved.
  pageStream.chunk _ YggBuffMan.ReadPages[
    fileHandle: logFileHandle, tid: YggEnvironment.nullTransID,
    pageRun: PageRunStartingWith[p]];
  pageStream.pagePtr _ pageStream.chunk.first.buffer;
  pageStream.pageInPageRun _ 0;
  pageStream.nextReadAheadPage _ PageFollowing[pageStream.chunk.first.pageRun];
  THROUGH [1..nChunksReadAhead] DO ReadAhead[] ENDLOOP;
  TRUSTED {
    IF NOT Header[pageStream.pagePtr].valid THEN RETURN [TRUE];
    pageStream.expectedVersion _ Header[pageStream.pagePtr].version;
    };
  pageStream.posSaved _ FALSE;
  RETURN [FALSE];
  };

ClosePageStream: PROC [] = {
  YggBuffMan.ReleaseVMPageSet[
    vMPageSet: pageStream.chunk, releaseState: clean, keep: FALSE];
  pageStream.chunk _ YggBuffMan.nullVMPageSet;
  };

ReadAhead: PROC [] = {
  -- Request read-ahead of the chunk at nextReadAheadPage, then advance nextReadAheadPage by
  -- one chunk, wrapping to page 0 at the end of the log file.
  YggBuffMan.ReadAheadPages[
    logFileHandle, PageRunStartingWith[pageStream.nextReadAheadPage]];
  -- The wrap point is expressed in file pages (logFileSize/logPagesInFilePages), the same
  -- bound PageRunStartingWith and PageFollowing use; comparing against logFileSize alone
  -- would let read-ahead run past the end of the file when logPagesInFilePages > 1.
  pageStream.nextReadAheadPage _
    IF pageStream.nextReadAheadPage < logFileSize/logPagesInFilePages - logChunkSize
      THEN pageStream.nextReadAheadPage + logChunkSize
      ELSE 0;
  };

SavePageStreamPos: PROC [] = {
  pageStream.savePageRun _ [pageStream.chunk.first.pageRun.firstPage, pageStream.chunk.first.pageRun.pages];
  pageStream.savePageInPageRun _ pageStream.pageInPageRun;
  pageStream.saveExpectedVersion _ pageStream.expectedVersion;
  pageStream.posSaved _ TRUE;
  };

CurrentPageNumber: PROC [] RETURNS [PageNumber] = INLINE {
  RETURN [IF pageStream.chunk = NIL
    THEN 0
    ELSE pageStream.chunk.first.pageRun.firstPage + pageStream.pageInPageRun];
  };

ForgetSavedPageStreamPos: PROC [] = {
  -- It is only valid to forget when positioned at the saved position.
  IF NOT pageStream.posSaved THEN RETURN;
  IF pageStream.savePageRun # [pageStream.chunk.first.pageRun.firstPage, pageStream.chunk.first.pageRun.pages]
    THEN ERROR InternalProgrammingError;
  pageStream.posSaved _ FALSE;
  };

AdvancePageStream: PROC [] RETURNS [endOfLog: BOOL] = {
  -- Step to the next page, fetching the next chunk when the current one is exhausted.
  -- Returns TRUE if the new page is not valid or does not carry the expected version.
  pageStream.pageInPageRun _ pageStream.pageInPageRun+1;
  IF pageStream.pageInPageRun = pageStream.chunk.first.pageRun.pages THEN {
    -- Work out the next page before the chunk is released (same order as Get.AdvancePage).
    p: PageNumber _ PageFollowing[pageStream.chunk.first.pageRun];
    YggBuffMan.ReleaseVMPageSet[
      vMPageSet: pageStream.chunk, releaseState: clean, keep: pageStream.posSaved];
    IF p = 0 THEN pageStream.expectedVersion _ 1 - pageStream.expectedVersion;
    pageStream.chunk _ YggBuffMan.ReadPages[fileHandle: logFileHandle, tid: YggEnvironment.nullTransID,
      pageRun: PageRunStartingWith[p]];
    pageStream.pagePtr _ pageStream.chunk.first.buffer;
    pageStream.pageInPageRun _ 0;
    IF NOT pageStream.posSaved THEN ReadAhead[];
    }
  ELSE pageStream.pagePtr _ pageStream.pagePtr + YggLog.wordsPerPage;
  TRUSTED {RETURN[
    (NOT Header[pageStream.pagePtr].valid) OR
    (pageStream.expectedVersion # Header[pageStream.pagePtr].version) ]; };
  };

SetPageStreamPosFromSavedPos: PROC [] = {
  -- Return to the position recorded by SavePageStreamPos, re-reading the saved chunk if the
  -- stream has moved to a different one.
  IF NOT pageStream.posSaved THEN ERROR InternalProgrammingError;
  IF PageRun[pageStream.chunk.first.pageRun.firstPage, pageStream.chunk.first.pageRun.pages] # pageStream.savePageRun THEN {
    YggBuffMan.ReleaseVMPageSet[vMPageSet: pageStream.chunk, releaseState: clean, keep: TRUE];
    pageStream.chunk _ YggBuffMan.ReadPages[fileHandle: logFileHandle, tid: YggEnvironment.nullTransID,
      pageRun: pageStream.savePageRun];
    pageStream.expectedVersion _ pageStream.saveExpectedVersion;
    };
  pageStream.pageInPageRun _ pageStream.savePageInPageRun;
  pageStream.pagePtr _ pageStream.chunk.first.buffer +
    YggInline.ShortWordsFromPages[pageStream.pageInPageRun, log2MDwordsPerPage];
  };


-- A readonly log record stream.
-- YggLog reading during recovery is significantly different than log reading for carrying
-- out intentions.  In the case of recovery, we expect to encounter the log end while
-- reading; in carrying out intentions we expect all log records to be perfectly formed.
-- Also, log reading for recovery is sequential, while log reading for intentions is more
-- random.  In the case of recovery we need the ability to first "peek" and then read; for
-- intentions, reading is enough.

RecordStreamObject: TYPE = RECORD [
  id: RecordID
    -- of current log record, whose first word is on the current page stream page.
  ];

recordStream: RecordStreamObject;

OpenRecordStreamFromWord: PUBLIC PROC [firstWord: WordNumber]
  RETURNS [notStartOfRecord: BOOL, currentRecord: RecordID] = {
  notStartOfRecord _ OpenRecordStream[firstWord: firstWord,
    firstRecord: YggLogInline.RecordIDFromWordNumber[firstWord]];
  RETURN [notStartOfRecord, recordStream.id];
  };

OpenRecordStreamFromCheckpoint: PUBLIC PROC [
  checkpointWord: WordNumber, checkpointRecord: RecordID, firstRecord: RecordID]
  RETURNS [notStartOfRecord: BOOL, currentRecord: RecordID] = {
  -- Opens the record stream at firstRecord, locating it relative to the checkpoint's word
  -- position.  Raises CallerProgrammingError if firstRecord is after checkpointRecord or a
  -- full log file length or more before it.
  offsetFromCheckpoint, firstWord: WordCount;
  TRUSTED {
    IF ConstArith.Compare[checkpointRecord, firstRecord] = less THEN ERROR CallerProgrammingError;
    };
  offsetFromCheckpoint _ YggLogInline.WordsFromSubtract[
    larger: checkpointRecord, smaller: firstRecord ! RuntimeError.BoundsFault => GOTO tooSmall];
  IF YggLogBasic.Compare[offsetFromCheckpoint, logFileLength] # less THEN GOTO tooSmall;
  firstWord _ YggLogInline.SubNumberLessCount[checkpointWord, offsetFromCheckpoint];
  IF firstWord.sign = negative THEN
    firstWord _ YggLogInline.AddNumberAndCount[firstWord, logFileLength];
  notStartOfRecord _ OpenRecordStream[firstWord: firstWord, firstRecord: firstRecord];
  RETURN [notStartOfRecord, firstRecord];
  EXITS
    tooSmall => ERROR CallerProgrammingError;
  };

OpenRecordStream: PROC [firstWord: WordNumber, firstRecord: RecordID]
  RETURNS [notStartOfRecord: BOOL] = {
  -- Opens the page stream on the page containing firstWord and saves that position.
  -- Raises CallerProgrammingError if firstWord and firstRecord disagree on the word within
  -- the page, and InvalidPage if the page stream reports end of log on the first page.
  p: PageNumber;
  w: CARDINAL;
  [page: p, wordInPage: w] _ YggInline.DecomposeWords[firstWord, YggLog.wordsPerPage*logPagesInFilePages];
  IF YggLogInline.WordInPageFromRecordID[firstRecord, YggLog.wordsPerPage*logPagesInFilePages] # w THEN
    ERROR CallerProgrammingError;
  IF OpenPageStream[p].endOfLog THEN GOTO invalidPage;
  SavePageStreamPos[];
  recordStream _ [id: firstRecord];
  RETURN [VerifyCurrentRecordID[].notStartOfRecord];
  EXITS
    invalidPage => ERROR InvalidPage;
  };

InvalidPage: PUBLIC ERROR = CODE;

CloseRecordStream: PUBLIC PROC [] = {
  ClosePageStream[];
  };

VerifyCurrentRecordID: PROC [] RETURNS [notStartOfRecord: BOOL] = {
  -- Walks block headers from the start of the log page containing recordStream.id.  Returns
  -- TRUE if no header starts exactly at the record's word offset, or if the header there is
  -- marked as a continuation.
  wordInPage: CARDINAL _ YggLogInline.WordInPageFromRecordID[recordStream.id, YggLog.wordsPerPage*logPagesInFilePages];
  TRUSTED {
    w: CARDINAL _ (wordInPage/YggLog.wordsPerPage) * YggLog.wordsPerPage;
    UNTIL w >= wordInPage DO
      w _ w + Header[pageStream.pagePtr+w].nWords;
      ENDLOOP;
    IF w > wordInPage THEN RETURN [TRUE];
    };
  TRUSTED {RETURN [Header[pageStream.pagePtr+wordInPage].isContinuation]; };
  };

CheckCurrentRecord: PUBLIC PROC [] RETURNS [truncated: BOOL] = TRUSTED {
  -- Checks the block headers of the current record.  Returns TRUE if the record is
  -- truncated; raises InternalProgrammingError if a header check fails.  The page stream is
  -- returned to the saved position on every exit taken after the first check.
  wordInPage: CARDINAL _ YggLogInline.WordInPageFromRecordID[recordStream.id, YggLog.wordsPerPage*logPagesInFilePages];
  IF VerifyCurrentRecordID[].notStartOfRecord THEN ERROR InternalProgrammingError;
  IF Header[pageStream.pagePtr+wordInPage].hasContinuation THEN {
    IF (CARD[Header[pageStream.pagePtr+wordInPage].nWords] + wordInPage) MOD YggLog.wordsPerPage # 0 THEN GOTO checkFailed;
    wordInPage _ CARD[Header[pageStream.pagePtr+wordInPage].nWords] + wordInPage;
    DO
      -- NOTE(review): wordInPage was already advanced past the first block above, and is
      -- advanced again here before any header at the new position is examined.  Confirm the
      -- first iteration is meant to skip a block.
      wordInPage _ wordInPage + CARD[Header[pageStream.pagePtr+wordInPage].nWords];
      IF wordInPage >= CARD[YggLog.wordsPerPage*logPagesInFilePages] THEN {
        IF AdvancePageStream[].endOfLog THEN GOTO truncatedRecord;
        wordInPage _ 0;
        };
      IF NOT Header[pageStream.pagePtr+wordInPage].isContinuation THEN GOTO truncatedRecord;
      IF NOT Header[pageStream.pagePtr+wordInPage].hasContinuation THEN EXIT;
      IF CARD[Header[pageStream.pagePtr+wordInPage].nWords] MOD YggLog.wordsPerPage # 0 THEN GOTO checkFailed;
      ENDLOOP;
    IF CARD[Header[pageStream.pagePtr+wordInPage].nWords] NOT IN [1 .. YggLog.wordsPerPage] THEN GOTO checkFailed;
    }
  ELSE {
    IF CARD[Header[pageStream.pagePtr+wordInPage].nWords] NOT IN
      [1 .. CARD[YggLog.wordsPerPage*logPagesInFilePages-wordInPage]] THEN GOTO checkFailed;
    };
  SetPageStreamPosFromSavedPos[];
  RETURN [FALSE];
  EXITS
    checkFailed => {
      SetPageStreamPosFromSavedPos[];
      ERROR InternalProgrammingError };
    truncatedRecord => {
      SetPageStreamPosFromSavedPos[];
      RETURN [TRUE] };
  };

GetCurrentRecord: PUBLIC PROC [currentRecord: RecordID, to: YggLog.Block]
  RETURNS [status: YggLog.ReadProcStatus, wordsRead: CARDINAL] = {
  -- Copies the current record into the block list "to" (blocks with base = NIL are skipped
  -- over without copying).  Raises CallerProgrammingError if currentRecord is not the
  -- stream's current record or the record turns out to be truncated.  The page stream is
  -- returned to the saved position before returning.
  wordInPage: CARDINAL _ YggLogInline.WordInPageFromRecordID[recordStream.id, YggLog.wordsPerPage*logPagesInFilePages];
  wordsRead _ 0;
  IF YggLogInline.Compare[currentRecord, recordStream.id] # equal THEN
    ERROR CallerProgrammingError;
  DO
    -- Read one log block from this page.
    wordsThisBlock, wordsThisCopy: CARDINAL;
    hasContinuation, isContinuation: BOOL;
    -- Read the header for the log block.
    TRUSTED {
      [hasContinuation: hasContinuation, isContinuation: isContinuation, nWords: wordsThisBlock] _
        Header[pageStream.pagePtr+wordInPage]^;
      };
    IF wordsRead > 0 AND NOT isContinuation THEN GOTO truncatedRecord;
    wordsThisBlock _ wordsThisBlock-SIZE[YggLogRep.Header];
    wordInPage _ wordInPage+SIZE[YggLogRep.Header];
    DO
      -- Copy a run of words to "to".
      wordsThisCopy _ MIN[to.length, wordsThisBlock];
      IF to.base # NIL THEN TRUSTED {
        PBasics.Copy[
          to: to.base, from: pageStream.pagePtr+wordInPage, nwords: wordsThisCopy/WORDS[CARD32]];
        to.base _ to.base+wordsThisCopy;
        };
      wordsRead _ wordsRead+wordsThisCopy;
      -- Now what?
      IF wordsThisCopy # to.length THEN {
        -- to.length > wordsThisCopy = wordsThisBlock; go read next log block.
        to.length _ to.length-wordsThisCopy;
        IF NOT hasContinuation THEN GOTO sourceExhausted;
        wordInPage _ wordInPage + wordsThisBlock;
        IF wordInPage >= CARD[YggLog.wordsPerPage*logPagesInFilePages] THEN {
          IF AdvancePageStream[].endOfLog THEN GOTO truncatedRecord;
          wordInPage _ 0;
          };
        GOTO nextBlock;
        }
      ELSE IF to.rest # NIL THEN TRUSTED {
        -- wordsThisCopy = to.length AND to.rest # NIL, so not done copying.
        to _ to.rest^;
        wordsThisBlock _ wordsThisBlock-wordsThisCopy;
        wordInPage _ wordInPage + wordsThisCopy;
        }
      ELSE {
        -- wordsThisCopy = to.length AND to.rest = NIL, so destination full; return.
        SetPageStreamPosFromSavedPos[];
        RETURN [
          IF (NOT hasContinuation AND wordsThisBlock = wordsThisCopy) THEN normal
          ELSE destinationFull,
          wordsRead] }
      REPEAT
        nextBlock => NULL;
      ENDLOOP;
    ENDLOOP;
  EXITS
    sourceExhausted => {
      SetPageStreamPosFromSavedPos[];
      RETURN [sourceExhausted, wordsRead]
      };
    truncatedRecord => {
      SetPageStreamPosFromSavedPos[];
      ERROR CallerProgrammingError;
      };
  };

GetCurrentPageAndVersion: PUBLIC PROC [] RETURNS [PageNumber, YggLogRep.PageVersion] = {
  RETURN[CurrentPageNumber[], pageStream.expectedVersion];
  };

AdvanceRecordStream: PUBLIC PROC []
  RETURNS [endOfLog, truncatedRecord: BOOL, currentRecord: RecordID] = TRUSTED {
  -- Moves the stream past the current record, saves the new page stream position, and
  -- advances recordStream.id by the number of words skipped.
  -- It is ok to call Advance when recordStream is in a bad state, e.g. not pointing to the
  -- start of a record.  It should be pointing to the start of a block, however.
  -- Note that wordInPage = 0 is sufficient.
  wordsInRecord: CARDINAL _ 0;
  mdWordsInALogPage: CARD _ CARD[YggLog.wordsPerPage*logPagesInFilePages];
  endOfLog _ truncatedRecord _ FALSE;
  {  -- block for EXITS
  wordInPage: CARDINAL _ YggLogInline.WordInPageFromRecordID[recordStream.id, mdWordsInALogPage];
  ForgetSavedPageStreamPos[];
  WHILE Header[pageStream.pagePtr+wordInPage].hasContinuation DO
    wordsInRecord _ wordsInRecord + Header[pageStream.pagePtr+wordInPage].nWords;
    wordInPage _ 0;
    IF AdvancePageStream[].endOfLog THEN {
      endOfLog _ truncatedRecord _ TRUE; GOTO return };
    IF NOT Header[pageStream.pagePtr].isContinuation THEN {
      truncatedRecord _ TRUE; GOTO return };
    ENDLOOP;
  {
    wordsInLastBlock: CARDINAL _ Header[pageStream.pagePtr+wordInPage].nWords;
    wordInPage _ wordInPage + wordsInLastBlock;
    wordsInRecord _ wordsInRecord + wordsInLastBlock;
  };
  -- A header with nWords = 0 here is treated as padding: skip to the next multiple of
  -- YggLog.wordsPerPage.
  IF wordInPage < mdWordsInALogPage AND Header[pageStream.pagePtr+wordInPage].nWords = 0 THEN {
    wordsToSkip: CARDINAL;
    wordsToSkip _ YggLog.wordsPerPage - (wordInPage MOD (YggLog.wordsPerPage)) ;
    wordsInRecord _ wordsInRecord + wordsToSkip;
    wordInPage _ wordInPage + wordsToSkip;
    };
  IF wordInPage = mdWordsInALogPage AND AdvancePageStream[].endOfLog THEN {
    endOfLog _ TRUE;
    };
  GOTO return
  EXITS
    return => {
      SavePageStreamPos[];
      recordStream.id _ YggLogInline.AddC[recordStream.id, wordsInRecord];
      RETURN[endOfLog, truncatedRecord, recordStream.id] };
  }};


-- Random log reading.

Get: PUBLIC PROC [thisRecord: YggLog.RecordID, to: YggLog.Block]
  RETURNS [status: YggLog.ReadProcStatus, wordsRead: CARDINAL] = {
  -- YggLogBasic.Get
  -- Reads the record thisRecord into the block list "to" without using the record stream.
  -- Raises InternalProgrammingError if the page or record is not properly formed;
  -- WordNumberFromRecordID may raise CallerProgrammingError.
  -- (The original carried a disabled "Enable UNWIND => ReleaseChunk[]"; the chunk is not
  -- released on the error paths.)
  -- NOTE(review): the starting page is derived with YggLog.wordsPerPage (log pages) and then
  -- used as a file page by GetChunk; OpenRecordStream uses wordsPerPage*logPagesInFilePages.
  chunk: YggBuffMan.VMPageSet;
  pagePtr: LONG POINTER;
  pagesLeftInChunk: CARDINAL;

  GetChunk: PROC [nextPage: PageNumber] = {
    chunk _ YggBuffMan.ReadPages[
      fileHandle: logFileHandle, tid: YggEnvironment.nullTransID,
      pageRun: PageRunStartingWith[nextPage]];
    pagePtr _ chunk.first.buffer;
    pagesLeftInChunk _ chunk.first.pageRun.pages - 1;
    };

  ReleaseChunk: PROC [] = INLINE {
    YggBuffMan.ReleaseVMPageSet[vMPageSet: chunk, releaseState: clean, keep: FALSE];
    };

  AdvancePage: PROC [] = INLINE {
    IF pagesLeftInChunk = 0 THEN {
      nextPage: PageNumber _ PageFollowing[chunk.first.pageRun];
      ReleaseChunk[];
      IF nextPage = 0 THEN pageVersion _ 1-pageVersion;  -- wrapped: expect the other version
      GetChunk[nextPage];
      }
    ELSE {
      pagePtr _ pagePtr + YggLog.wordsPerPage;
      pagesLeftInChunk _ pagesLeftInChunk - 1;
      };
    };

  isFirstPage: BOOL _ TRUE;
  pageVersion: YggLogRep.PageVersion _ 0;
  wordInPage: CARDINAL;

  Verify: PROC [] = {
    -- Ensure that wordInPage points to a YggLogRep.Header, assuming that page is properly
    -- formatted.
    w: CARDINAL _ 0;
    UNTIL w = wordInPage DO
      TRUSTED {w _ w + Header[pagePtr+w].nWords; };
      IF w > wordInPage THEN ERROR InternalProgrammingError;
      ENDLOOP;
    };

  {
    firstPage: PageNumber;
    [page: firstPage, wordInPage: wordInPage] _ YggInline.DecomposeWords[
      WordNumberFromRecordID[thisRecord], YggLog.wordsPerPage];
    GetChunk[firstPage];
    };
  Verify[];
  wordsRead _ 0;
  DO
    -- Read one log block from this page.
    wordsThisBlock, wordsThisCopy: CARDINAL;
    hasContinuation, isContinuation: BOOL;
    version: YggLogRep.PageVersion;
    -- Read valid bit from the header for first block on this page.
    TRUSTED {
      IF NOT Header[pagePtr].valid THEN VerifyGet[
        chunk.first.pageRun.firstPage+(chunk.first.pageRun.pages-(1+pagesLeftInChunk)), pagePtr];
      };
    -- Read the header for this log block.
    TRUSTED {[version: version, hasContinuation: hasContinuation,
      isContinuation: isContinuation, nWords: wordsThisBlock] _ Header[pagePtr+wordInPage]^; };
    wordsThisBlock _ wordsThisBlock-YggLogRep.Header.SIZE;
    wordInPage _ wordInPage+YggLogRep.Header.SIZE;
    IF isFirstPage THEN {
      IF isContinuation THEN ERROR InternalProgrammingError;
      pageVersion _ version;
      isFirstPage _ FALSE }
    ELSE {
      IF NOT isContinuation OR (version # pageVersion) THEN ERROR InternalProgrammingError };
    DO
      -- Copy a run of words to "to".
      wordsThisCopy _ MIN[to.length, wordsThisBlock];
      IF to.base # NIL THEN TRUSTED {
        PBasics.Copy[
          to: to.base, from: pagePtr+wordInPage, nwords: wordsThisCopy/WORDS[CARD32]];
        to.base _ to.base+wordsThisCopy;
        };
      wordsRead _ wordsRead+wordsThisCopy;
      -- Now what?
      IF wordsThisCopy # to.length THEN {
        -- to.length > wordsThisCopy = wordsThisBlock; go read next log block.
        to.length _ to.length-wordsThisCopy;
        IF NOT hasContinuation THEN {
          ReleaseChunk[];
          RETURN[sourceExhausted, wordsRead] };
        AdvancePage[];
        wordInPage _ 0;
        GOTO nextBlock;
        }
      ELSE IF to.rest # NIL THEN TRUSTED {
        -- wordsThisCopy = to.length AND to.rest # NIL, so not done copying.
        to _ to.rest^;
        wordsThisBlock _ wordsThisBlock-wordsThisCopy;
        wordInPage _ wordInPage + wordsThisCopy;
        }
      ELSE {
        -- wordsThisCopy = to.length AND to.rest = NIL, so destination full; return.
        ReleaseChunk[];
        RETURN [
          IF (NOT hasContinuation AND wordsThisBlock = wordsThisCopy) THEN normal
          ELSE destinationFull,
          wordsRead] }
      REPEAT
        nextBlock => NULL;
      ENDLOOP;
    ENDLOOP;
  };


-- Init

YggLog.wordsPerPage _ YggSunOS.LogBytesPerPage/BYTES[WORD];

-- The text following END. is the Tioga comment/formatting trailer of the original file; it is kept unchanged.
END.  YggLogBasicImpl.mesa Copyright Σ 1985, 1987, 1988, 1989 by Xerox Corporation. All rights reserved. Implements stream log reading, log tail management, random log reading. Last edited by Bob Hagmann June 27, 1989 10:21:24 am PDT Static log file state Utilities The log as a whole YggLogBasic.EstablishLogFile YggLog tail. This data structure keeps status information about log chunks near the log tail. Specifically, any chunk that may be dirty but which has not yet been forced out must be present in the table. The table is managed as a queue which always has at least one element in it. When any chunk completes a forceout, its slot in the table is marked free. A slot is reused if it is free and corresponds to the earliest log page in the table. Types Contains pageRun (segmentPage, pages, etc.) and the LONG POINTER (buffer) to the mapped pages. ID of first word of first page of chunk. Set TRUE before forking process to force final update to the chunk; set FALSE when this process finishes. So if chunk is not the tail and NOT forceInProgress, it is eligible for re-use. 
State Variables below are only updated by AdvanceChunk, except that the TRUE -> FALSE transition of forceInProgress for a chunk is made by ForceChunk (forked by AdvanceChunk), and oldestFilePage is updated by Release. Wait here for a forceInProgress = FALSE in chunkTable. Version bit for pages in tail chunk RecordID of first word of first page of NEXT tail chunk. Used to bounds-check RecordIDs presented to Force and Get. Redundant; can be computed from chunkTable[tail] . The least-recently written page of the log file that cannot be overwritten. If enforceBufferPages, then do not allow writing within nBufferPages pages of oldestFilePage Operations on the log tail. YggLogBasicInternal.AdvanceChunk Caller must hold the log tail monitor; hence only one caller at a time. Fork a process to write the current tail chunk. Wait, if necessary, for room in the chunk table to allocate the next tail chunk. Now make sure that the write will not overwrite, or even get too close to, log pages that are still needed. About to start updating global data structures; from here on it's do or die--no more recoverable errors allowed; On entry, chunkTable[index].forceInProgress. Force the chunk, and set forceInProgress _ false. Note: do NOT hold monitor during force out. YggLogBasicInternal.ForceTo If the word before followingRecord is in the current tail chunk, force that chunk (using this process). Then wait for other chunks that precede followingRecord to go out. followingRecord must not be later than page about to be written. Return when all chunks that precede followingRecord are out. YggLogBasicInternal.OpenCoreForPut YggLogBasicInternal.CloseCoreForPut Returns r+c. Returns r+c. YggLogBasic.WordNumberFromRecordID Called from Get if it encounters a page marked invalid. YggLogBasic.Release Readonly page stream the log A readonly page stream on the log. This stream has a limited capability for backup: it can save one page position that MUST be returned to later. 
End of log is determined at the page level. The initial stream position is NOT saved. It is only valid to forget when positioned at the saved position. A readonly log record stream. YggLog reading during recovery is significantly different than log reading for carrying out intentions. In the case of recovery, we expect to encounter the log end while reading; in carrying out intentions we expect all log records to be perfectly formed. Also, log reading for recovery is sequential, while log reading for intentions is more random. In the case of recovery we need the ability to first "peek" and then read; for intentions, reading is enough. of current log record, whose first word is on the current page stream page. Read one log block from this page. Read the header for the log block. Copy a run of words to "to". Now what? to.length > wordsThisCopy = wordsThisBlock; go read next log block. wordsThisCopy = to.length AND to.rest # NIL, so not done copying. wordsThisCopy = to.length AND to.rest = NIL, so destination full; return. It is ok to call Advance when recordStream is in a bad state, e.g. not pointing to the start of a record. It should be pointing to the start of a block, however. Note that wordInPage = 0 is sufficient. Random log reading. YggLogBasic.Get Enable UNWIND => ReleaseChunk[] Ensure that wordInPage points to a YggLogRep.Header, assuming that page is properly formatted. Read one log block from this page. Read valid bit from the header for first block on this page. Read the header for this log block. Copy a run of words to "to". Now what? to.length > wordsThisCopy = wordsThisBlock; go read next log block. wordsThisCopy = to.length AND to.rest # NIL, so not done copying. wordsThisCopy = to.length AND to.rest = NIL, so destination full; return. 
Init Κ£˜šœ™IcodešœN™N—JšœG™Gšœ™K™)—˜šΟk ˜ J˜Jšœ˜Jšœ ˜ Jšœ˜Jšœ˜Jšœ ˜ Jšœ˜Jšœ˜J˜ Jšœ ˜ Jšœ ˜ Jšœ˜Jšœ ˜ Jšœ ˜ Jšœ˜Jšœ ˜ Jšœ ˜ J˜——šœ ˜š˜Jšœa˜a—š˜Jšœ*˜*—Jš˜J˜Kšœ œœ˜Kšœœœ˜+K˜J˜J˜Jšœ œ˜+Jšœ œΟc!˜PJšœ œ˜'Jšœ œ˜*Jšœ œ˜(Jšœ œ˜!J˜Jšœœœ˜%Jšœœœ˜'J˜J˜#J˜šΟnœœœœœœœœœ˜TJšœœ˜J˜J˜—šŸœœœœœœœœ˜VJšœœ˜J˜——headšœ™Jšœ&˜&J˜J˜Jšœœ˜JšœI˜IJšœœEΟdœ˜s—šœ ™ šŸœœœœ˜JJšœœ6˜WJ˜—šŸ œœœ˜CJšœ˜Jšœœ%œœ˜@J˜—š Ÿœœœœœ˜NJšœœ,˜8J˜——šœ™šŸœœœ˜$Jšœ™J˜ šœ˜Jš œ&œ5œBœjœ˜šJšœ˜šœ$œ ˜7Jšœ,˜,Jšœ˜—Jšœ#˜#JšœK˜KJ˜—˜J˜——šŸ œœœœ˜3Jšœ˜J˜J˜—šŸœœœ˜6K˜ KšœœXœœ&˜¦K˜——šœ ™ JšœQ™QJšœO™OJšœO™OJšœQ™QJšœU™UJšœ™J˜Jšœ™Jšœœ˜Jšœœ˜,šœœœ˜šœ7˜7Jšœ^™^—šœ0˜0Jšœ(™(—šœœ˜JšœR™RJšœ[™[Jšœ ™ —J˜—šŸœœœœ˜SJšœœœœœœœ˜QJ˜—Jšœ™JšœO™OJšœ?™?JšœC™CJšœ œœ˜3J˜šœ  œ˜Jšœ6™6—šœ"˜"Jšœ#™#—šœ!˜!JšœX™XJšœO™O—˜JšœK™K—Jšœ œ˜ Jšœ œ˜!šœœ˜JšœJ™JJšœ™——šœ™š Ÿ œœœœœ˜,Jšœ.œœœ˜RJ˜Jšœ ™ JšœG™GJ˜J˜J˜J˜"J˜Jšœ0™0Jšœ#œ˜(Jšœœ˜9J˜JšœQ™Qš˜J˜$Jšœœ%œœ˜5Jšœ ˜Jšœ˜J˜—J™lJ˜?J˜/Jšœ5˜5šœ˜ J˜:—šœ˜Jšœ9˜;šœMœ˜SJšœœœ˜$J˜—J™—JšœMž%™rJ™šœœ˜'šœ˜JšœAœ˜GJ˜——Jšœœ˜1J˜J˜,šœ-˜-JšœU˜U—šœ7˜7Jšœz˜zJ˜—šœ˜J˜JšœM˜MJšœ3˜3—J˜J˜J˜—šŸ œœ˜-Jšœ-™-Jšœ2™2Jšœ,™,šŸœœœœ˜.Jšœ$œ˜*Jš œ ˜J˜—Jšœ6˜6J˜J˜J˜—šŸœœœ ˜4Jšœ™JšœQ™QJšœQ™QJšœ™šŸœœœ˜ Jšœ œ*˜@Jšœ@™@šœ@˜FJšœœœ˜)—šœGœ˜OJšœ2˜2Jšœœ˜'—Jšœœœ˜-J˜—šŸ œœœ˜Jšœ<™<šœœ˜7š ˜ šœO œœ*˜‹šœ ˜Jšœ˜—J˜——Jšœ˜—J˜—Jšœ œ˜,J˜'šœ œ˜Jšœ/˜/JšœIœ˜Q—J˜ J˜J˜—šŸœœœœ˜$JšœK˜KJšœœœœ˜;Jšœ"™"Jšœ œ˜Jšœœ˜Jšœœ˜J˜Jšœœ˜J˜'Jšœ’˜’Jšœ˜šœA˜AJšœ˜—šœ˜JšœN˜NJšœ3˜3—J˜J˜J˜—šŸœœœœ˜/Jšœœ˜Jšœœ˜J˜J˜—šŸœœœœ˜*Jšœ#™#šœœ˜,šœ˜&Jšœ ˜Jšœ˜—Jšœ2˜2šœ˜Jšœ;œ˜B—J˜Jšœ˜—J˜J˜—š Ÿœœœœœ˜IJšœ ™ Jšœœ ˜,J˜J˜—š Ÿœœ œœœ œ˜`Jšœ œ*˜AJ˜—šŸœœ%œœ˜^Jšœ ™ Jšœ˜"J˜J˜—šŸœœœœ˜@Jšœ˜ Jšœ"™"J˜#Jš œ;œœ ˜\šœ9˜9JšœKœ ˜Z—JšœDœœ ˜YšœH˜HJšœs˜s—Jšœ!˜!Jšœœœ7œ ˜jš˜Jšœ œ˜)Jšœ œ˜)—J˜J˜—š Ÿ œœœœœ˜BJšœ7™7Jš 
œœœ˜1Jšœwœœ˜…Jšœ˜J˜J˜—šŸœœœ˜1Jšœ™šŸœœœ#œ˜MJ˜%—šœ0˜0Jšœ<˜<—J˜——šœ™Jšœ"™"JšœS™SJšœ™Jšœ+™+J˜Jšœœ˜J˜šœœœ˜!Jšœ7˜7J˜Jšœ"œ˜+Jšœ<˜˜>—šœœ#˜:J˜G—J˜J˜—šŸœœ˜)Jšœœœœ˜?šœrœ˜zJšœTœ˜Z˜Jšœr˜r—J˜˜DJšœ˜—š˜Jšœ"™"Jšœœ˜(Jšœ!œ˜&Jšœ"™"J˜‘Jš œœœœœ˜BJšœ œ˜7Jšœœ˜/š˜Jšœ™Jšœœ˜/šœ œœ ˜˜ J˜W—J˜ J˜—J˜$Jšœ ™ šœœ˜#JšœC™CJ˜$Jšœœœœ˜1Jšœ)˜)šœE˜EJšœœœ˜:J˜J˜—Jšœ ˜J˜—šœœ œœ ˜$JšœA™AJ˜J˜.J˜(J˜—šœ˜JšœI™IJ˜šœ˜Jšœœœ!œ˜GJšœ˜ —J˜—š˜Jšœ œ˜—Jšœ˜—Jšœ˜—š˜˜J˜Jšœ˜#J˜—˜J˜Jšœ˜J˜——J˜J˜—šŸœœœ˜(Jšœ(˜/Jšœ2˜8J˜J˜—šŸœœœ˜#Jšœœ&˜NJšœ\™\JšœP™PJšœ™Jšœœ˜Jšœœœ*˜HJšœœ˜#Jšœž˜Jšœ œK˜_J˜šœ7˜>J˜MJ˜šœœ˜&Jšœœœ ˜2—šœœ+œ˜7Jšœœœ ˜'—Jšœ˜—šœ˜Jšœœ0˜JJ˜+J˜1J˜—šœ œ2œ˜]Jšœ œ˜Jšœ0œ˜LJšœ,˜,Jšœ&˜&J˜—šœ œœ˜IJšœ œ˜J˜—Jšœ˜ šœ ˜J˜JšœD˜DJšœ/˜5—J˜——šœ™šŸœœœ0˜@Jšœ,œ˜@Jšœ™Jšœœ™J˜Jšœ˜Jšœ œœ˜Jšœœ˜J˜šŸœœ˜*˜šœ˜Jšœd˜d——J˜J˜1J˜J˜—šŸ œœœ˜ JšœIœ˜PJ˜J˜—šŸ œœœ˜šœœ˜J˜:J˜Jšœœ˜1J˜J˜—šœ˜Jšœ(˜(J˜(J˜—J˜J˜—Jšœ œœ˜Jšœ'˜'Jšœ œ˜J˜šŸœœ˜JšœG™GJšœ™Jšœœ˜šœ˜Jšœ&˜-Jšœœœ˜6Jšœ˜—J˜—˜J˜šœE˜EJšœ9˜9—J˜J˜—J˜ J˜š˜Jšœ"™"Jšœœ˜(Jšœ!œ˜&Jšœ˜Jšœ<™