-- YggLogBasicImpl
-- Lowest level of the log implementation. This module:
--   * writes the log tail in fixed-size chunks (OpenBasicForPut, AdvanceChunk, ForceTo,
--     CloseBasicForPut, Release),
--   * reads the log sequentially through a page stream and a record stream layered on it,
--   * locates the first record of the log (LocateFirstRecord), and
--   * reads a single record by RecordID (Get).
-- All shared writer state (chunkTable, tail, curVersion, nextTailChunkID, oldestFilePage,
-- enforceBufferPages) is protected by the module monitor.

DIRECTORY
  Basics,
  ConstArith,
  YggDummyProcess,
  YggEnvironment,
  YggFile,
  YggInternal,
  YggInline,
  YggFilePageMgr,
  YggLog,
  YggLogBasic,
  YggLogInline,
  YggLogBasicInternal,
  YggLogRep;

YggLogBasicImpl: CEDAR MONITOR
  IMPORTS
    Basics, ConstArith, YggDummyProcess, YggFile, YggInline, YggFilePageMgr, YggLog,
    YggLogBasic, YggLogInline
  EXPORTS YggLogBasic, YggLogBasicInternal
= BEGIN

  PageCount: TYPE = YggEnvironment.PageCount;
  PageNumber: TYPE = YggEnvironment.PageNumber;
  PageRun: TYPE = YggEnvironment.PageRun;
  VolumeID: TYPE = YggEnvironment.VolumeID;
  WordNumber: TYPE = YggLogBasic.WordNumber;
  WordCount: TYPE = YggLogBasic.WordCount;
  RecordID: TYPE = YggLog.RecordID;
  nullRecordID: RecordID = YggLog.nullRecordID;
  Comparison: TYPE = YggLog.Comparison;

  -- Raised when a caller passes arguments that violate a procedure's precondition.
  CallerProgrammingError: ERROR = CODE;
  -- Raised when this module finds its own data (or the log contents) inconsistent.
  InternalProgrammingError: ERROR = CODE;

  Header: PROC [p: LONG POINTER] RETURNS [LONG POINTER TO YggLogRep.Header] = INLINE {
    -- View the storage at p as a log block header (pure type loophole, no checking).
    RETURN [LOOPHOLE[p]] };


  -- Log file description; assigned by EstablishLogFile.

  logFileHandle: YggInternal.FileHandle;
  logFileSize: PageCount;
  logFileLength: WordCount;
  logChunkSize: INT = 4;
    -- Pages per chunk (see OPEN PROBLEMS at the end of this file).


  -- Page run arithmetic on the circular log file.

  PageRunStartingWith: PROC [p: PageNumber] RETURNS [pr: PageRun] = INLINE {
    -- Run of at most logChunkSize pages beginning at p, cut short at the end of the file.
    RETURN [[firstPage: p, count: MIN[logFileSize-p, logChunkSize]]] };

  PageFollowing: PROC [pr: PageRun] RETURNS [p: PageNumber] = INLINE {
    -- Page just past pr, wrapping to page 0 when pr ends exactly at the end of the file.
    p _ pr.firstPage+pr.count;
    RETURN [IF p = logFileSize THEN 0 ELSE p] };

  IsMemberOfPageRun: PROC [p: PageNumber, pr: PageRun] RETURNS [BOOL] = INLINE {
    -- TRUE iff p lies in the half-open interval [pr.firstPage .. pr.firstPage+pr.count).
    RETURN [p IN [pr.firstPage .. pr.firstPage+pr.count)] };


  -- Log file establishment and size queries.

  EstablishLogFile: PUBLIC PROC [logFile: YggEnvironment.DID] = {
    -- Record the handle, size in pages, and length in words of the log file logFile.
    logFileHandle _ YggFile.FilesForDID[did: logFile].contents;
    logFileSize _ YggFilePageMgr.GetSize[logFileHandle];
    logFileLength _ YggInline.WordsFromPages[logFileSize, YggLog.logDeviceCharacteristics];
    };

  LogFileSize: PUBLIC PROC [] RETURNS [PageCount] = {
    -- Size of the log file in pages, as recorded by EstablishLogFile.
    RETURN[logFileSize];
    };

  LogUsage: PUBLIC ENTRY PROC [] RETURNS [PageCount] ~ {
    -- Pages from oldestFilePage up to the page following the tail chunk, plus nBufferPages.
    -- A negative difference means the tail has wrapped, so logFileSize is added back.
    nPages: INT;
    nPages _ PageFollowing[chunkTable[tail].chunk.pageRun] - oldestFilePage;
    RETURN[IF nPages >= 0
      THEN nPages + nBufferPages
      ELSE nPages + nBufferPages + logFileSize];
    };


  -- Writer state: the chunk table.

  ChunkTableSize: CARDINAL = 4;
  ChunkTableIndex: TYPE = [0..ChunkTableSize);
  ChunkTableRec: TYPE = RECORD [
    chunk: YggFilePageMgr.VMPageSet _ YggFilePageMgr.nullVMPageSet,
      -- The VM page set for this chunk; chunk.pages = NIL when the slot is unused.
    recordID: YggLog.RecordID _ YggLog.nullRecordID,
      -- RecordID assigned to the first word of this chunk.
    forceInProgress: BOOL _ FALSE
      -- TRUE from the time AdvanceChunk forks ForceChunk until ForceChunk finishes.
    ];

  NextChunkTableIndex: PROC [i: ChunkTableIndex] RETURNS [ChunkTableIndex] = INLINE {
    -- Successor of i in the circular chunk table.
    RETURN [IF i = LAST[ChunkTableIndex] THEN FIRST[ChunkTableIndex] ELSE SUCC[i]] };

  -- forceInProgress is set TRUE by AdvanceChunk and reset to FALSE by ForceChunk
  -- (forked by AdvanceChunk), and oldestFilePage is updated by Release.

  chunkTable: ARRAY ChunkTableIndex OF ChunkTableRec;
  tail: ChunkTableIndex;
    -- Index of the chunk currently being filled.
  completion: CONDITION;
    -- Broadcast by ForceChunk each time a forked force completes.
  curVersion: YggLogRep.PageVersion;
    -- Version for pages of the tail chunk; flipped whenever the tail wraps to page 0.
  nextTailChunkID: YggLog.RecordID;
    -- RecordID that the chunk following the tail chunk will receive.
  oldestFilePage: PageNumber;
    -- Oldest log page still needed; AdvanceChunk refuses to write onto it.
  enforceBufferPages: BOOL;
    -- FALSE after OpenBasicForPut, TRUE after AssertNormalOperation.
  nBufferPages: INT = 100;
    -- Safety margin, in pages, kept in front of oldestFilePage when enforceBufferPages.


  -- Writer procedures.

  AdvanceChunk: PUBLIC ENTRY PROC []
    RETURNS [version: YggLogRep.PageVersion, pagesInChunk: INT, firstPagePtr: LONG POINTER] = {
    -- ! YggLog.WriteFailed (the next chunk would overwrite, or come too close to,
    --   pages that are still needed).
    -- Forks a force of the current tail chunk, then makes the following chunk the tail
    -- and returns its page version, page count and buffer address.
    newTail: ChunkTableIndex;
    nextPage: PageNumber;
    runForNextPage: PageRun;
    biasedOldestFilePage: PageNumber;
    -- Fork a process to write the current tail chunk.
    chunkTable[tail].forceInProgress _ TRUE;
    TRUSTED {YggDummyProcess.Detach[FORK ForceChunk[index: tail]]; };
    -- Wait, if necessary, for room in the chunk table to allocate the next tail chunk.
    DO
      newTail _ NextChunkTableIndex[tail];
      IF NOT chunkTable[newTail].forceInProgress THEN EXIT;
      WAIT completion;
      ENDLOOP;
    -- Now make sure that the write will not overwrite, or even get too close to,
    -- log pages that are still needed.
    nextPage _ PageFollowing[chunkTable[tail].chunk.pageRun];
    runForNextPage _ PageRunStartingWith[nextPage];
    biasedOldestFilePage _ oldestFilePage - nBufferPages;
    IF biasedOldestFilePage < 0 THEN
      biasedOldestFilePage _ biasedOldestFilePage + logFileSize;
    IF (enforceBufferPages AND IsMemberOfPageRun[biasedOldestFilePage, runForNextPage])
      OR IsMemberOfPageRun[oldestFilePage, runForNextPage] THEN {
      RETURN WITH ERROR YggLog.WriteFailed };
    -- About to start updating global data structures; from here on it is do or die:
    -- no more recoverable errors allowed.
    IF chunkTable[newTail].chunk.pages # NIL THEN
      YggFilePageMgr.ReleaseVMPageSet[
        vMPageSet: chunkTable[newTail].chunk, releaseState: clean, keep: TRUE];
    IF nextPage = 0 THEN curVersion _ 1 - curVersion;
    tail _ newTail;
    chunkTable[tail].recordID _ nextTailChunkID;
    chunkTable[tail].chunk _ YggFilePageMgr.UseLogPages[
      fileHandle: logFileHandle, pageRun: runForNextPage];
    nextTailChunkID _ YggLogInline.AddC[
      r: nextTailChunkID,
      words: YggInline.ShortWordsFromPages[
        chunkTable[tail].chunk.pageRun.count, YggLog.logDeviceCharacteristics]];
    RETURN [
      version: curVersion,
      pagesInChunk: chunkTable[tail].chunk.pageRun.count,
      firstPagePtr: chunkTable[tail].chunk.pages];
    };

  ForceChunk: PROC [index: ChunkTableIndex] = {
    -- On entry, chunkTable[index].forceInProgress.
    -- Force the chunk, and set forceInProgress _ false.
    -- Note: do NOT hold monitor during force out.
    BroadcastCompletion: ENTRY PROC [] = INLINE {
      chunkTable[index].forceInProgress _ FALSE;
      BROADCAST completion;
      };
    YggFilePageMgr.ForceOutVMPageSet[chunkTable[index].chunk];
    BroadcastCompletion[];
    };

  ForceTo: PUBLIC PROC [followingRecord: RecordID] = {
    -- ! CallerProgrammingError (followingRecord is beyond nextTailChunkID).
    -- If the tail chunk starts before followingRecord, force the tail chunk
    -- (using this process). Then wait for other chunks that precede followingRecord to
    -- finish any force that is in progress.
    TailForceRequired: ENTRY PROC []
      RETURNS [doForce: BOOL, chunk: YggFilePageMgr.VMPageSet] = TRUSTED {
      -- Decide under the monitor whether the tail chunk must be forced; if so, share it
      -- so the caller can force it outside the monitor.
      IF ConstArith.Compare[followingRecord, nextTailChunkID] = greater THEN
        RETURN WITH ERROR CallerProgrammingError;
      IF ConstArith.Compare[chunkTable[tail].recordID, followingRecord] = less THEN {
        YggFilePageMgr.ShareVMPageSet[chunkTable[tail].chunk];
        RETURN [TRUE, chunkTable[tail].chunk] }
      ELSE RETURN [FALSE, YggFilePageMgr.nullVMPageSet]
      };
    AwaitForce: ENTRY PROC [] = {
      -- Wait until no chunk that starts before followingRecord has a force in progress.
      FOR currentChunk: ChunkTableIndex IN ChunkTableIndex DO
        TRUSTED {
          IF ConstArith.Compare[chunkTable[currentChunk].recordID, followingRecord] = less THEN
            UNTIL NOT chunkTable[currentChunk].forceInProgress DO
              WAIT completion;
              ENDLOOP;
          };
        ENDLOOP;
      };
    doForce: BOOL;
    chunk: YggFilePageMgr.VMPageSet;
    [doForce, chunk] _ TailForceRequired[];
    IF doForce THEN {
      YggFilePageMgr.ForceOutVMPageSet[vMPageSet: chunk];
      YggFilePageMgr.ReleaseVMPageSet[vMPageSet: chunk, releaseState: clean, keep: TRUE] };
    AwaitForce[];
    };

  OpenBasicForPut: PUBLIC ENTRY PROC [
    nextPage: PageNumber, version: YggLogRep.PageVersion, nextRecord: RecordID]
    RETURNS [pagesInChunk: INT, firstPagePtr: LONG POINTER] = {
    -- Start writing at page nextPage with page version "version"; the first word of that
    -- page gets RecordID nextRecord. Buffer-page enforcement is off until
    -- AssertNormalOperation is called.
    enforceBufferPages _ FALSE;
    curVersion _ version;
    tail _ FIRST[ChunkTableIndex];
    chunkTable[tail].recordID _ nextRecord;
    chunkTable[tail].chunk _ YggFilePageMgr.UseLogPages[
      fileHandle: logFileHandle, pageRun: PageRunStartingWith[nextPage]];
    nextTailChunkID _ YggLogInline.AddC[
      r: chunkTable[tail].recordID,
      words: YggInline.ShortWordsFromPages[
        chunkTable[tail].chunk.pageRun.count, YggLog.logDeviceCharacteristics]];
    RETURN [
      pagesInChunk: chunkTable[tail].chunk.pageRun.count,
      firstPagePtr: chunkTable[tail].chunk.pages];
    };

  AssertNormalOperation: PUBLIC ENTRY PROC [] = {
    -- From now on AdvanceChunk also keeps nBufferPages clear in front of oldestFilePage.
    enforceBufferPages _ TRUE;
    };

  CloseBasicForPut: PUBLIC ENTRY PROC [] = {
    -- Wait for outstanding forces, then force out, release and reset every chunk.
    FOR i: ChunkTableIndex IN ChunkTableIndex DO
      WHILE chunkTable[i].forceInProgress DO WAIT completion; ENDLOOP;
      -- Slots that were never used still hold nullVMPageSet; skip them, as AdvanceChunk
      -- does before releasing a recycled slot.
      IF chunkTable[i].chunk.pages # NIL THEN {
        YggFilePageMgr.ForceOutVMPageSet[chunkTable[i].chunk];
        YggFilePageMgr.ReleaseVMPageSet[
          vMPageSet: chunkTable[i].chunk, releaseState: clean, keep: FALSE];
        };
      chunkTable[i] _ [];
      ENDLOOP;
    };

  WordNumberFromRecordID: PUBLIC ENTRY PROC [thisRecord: RecordID]
    RETURNS [result: WordNumber] = {
    -- ! CallerProgrammingError (thisRecord is beyond nextTailChunkID, or is at least one
    --   log file length behind it).
    -- Translate thisRecord into a word position in the log file, measured back from the
    -- page that follows the tail chunk and wrapped into the file.
    offsetFromNextTailChunk: WordCount;
    TRUSTED {IF ConstArith.Compare[thisRecord, nextTailChunkID] = greater THEN GOTO tooLarge; };
    offsetFromNextTailChunk _ YggLogInline.WordsFromSubtract[
      larger: nextTailChunkID, smaller: thisRecord
      ! Basics.BoundsFault => GOTO tooSmall];
    IF YggLogBasic.Compare[offsetFromNextTailChunk, logFileLength] # less THEN GOTO tooSmall;
    result _ YggLogInline.SubNumberLessCount[
      num: YggInline.WordsFromPages[
        pages: PageFollowing[chunkTable[tail].chunk.pageRun],
        deviceCharacteristics: YggLog.logDeviceCharacteristics],
      words: offsetFromNextTailChunk];
    RETURN [IF result.sign = negative
      THEN YggLogInline.AddNumberAndCount[result, logFileLength]
      ELSE result];
    EXITS
      -- RETURN WITH ERROR so that the monitor lock is released before the error is raised.
      tooLarge => RETURN WITH ERROR CallerProgrammingError;
      tooSmall => RETURN WITH ERROR CallerProgrammingError;
    };

  VerifyGet: ENTRY PROC [page: PageCount, pagePtr: LONG POINTER] = {
    -- ! InternalProgrammingError (the page is not valid and is not in the tail chunk).
    -- Called by Get when the page at pagePtr is not marked valid; that is acceptable only
    -- if the page belongs to the current tail chunk.
    TRUSTED {IF Header[pagePtr].valid THEN RETURN; };
    IF IsMemberOfPageRun[page, chunkTable[tail].chunk.pageRun] THEN RETURN;
    -- RETURN WITH ERROR so that the monitor lock is released before the error is raised.
    RETURN WITH ERROR InternalProgrammingError;
    };

  Release: PUBLIC PROC [beforeRecord: RecordID] = {
    -- ! CallerProgrammingError (from WordNumberFromRecordID).
    -- Set oldestFilePage to the page that contains beforeRecord.
    AssignToOldestFilePage: ENTRY PROC [newOldestFilePage: PageNumber] = INLINE {
      oldestFilePage _ newOldestFilePage };
    AssignToOldestFilePage[YggInline.PagesFromWords[
      WordNumberFromRecordID[beforeRecord], YggLog.logDeviceCharacteristics]];
    };


  -- Page stream: sequential reading of log pages, one chunk at a time, with read-ahead.
  -- There is a single stream (pageStream), and the procedures below do not enter the
  -- monitor.

  nChunksReadAhead: CARDINAL = 4;

  PageStreamObject: TYPE = RECORD [
    chunk: YggFilePageMgr.VMPageSet _ YggFilePageMgr.nullVMPageSet,
    savePageRun: PageRun,
    pageInPageRun, savePageInPageRun: CARDINAL,
    expectedVersion, saveExpectedVersion: YggLogRep.PageVersion,
    pagePtr: LONG POINTER,
    posSaved: BOOL,
    nextReadAheadPage: PageNumber
    ];

  pageStream: PageStreamObject;

  OpenPageStream: PROC [p: PageNumber] RETURNS [endOfLog: BOOL] = {
    -- Position the stream on page p and start read-ahead. Returns TRUE if page p is not
    -- marked valid; in that case expectedVersion and posSaved are left untouched.
    pageStream.chunk _ YggFilePageMgr.ReadLogPages[
      fileHandle: logFileHandle, pageRun: PageRunStartingWith[p]];
    pageStream.pagePtr _ pageStream.chunk.pages;
    pageStream.pageInPageRun _ 0;
    pageStream.nextReadAheadPage _ PageFollowing[pageStream.chunk.pageRun];
    THROUGH [1..nChunksReadAhead] DO ReadAhead[] ENDLOOP;
    TRUSTED {
      IF NOT Header[pageStream.pagePtr].valid THEN RETURN [TRUE];
      pageStream.expectedVersion _ Header[pageStream.pagePtr].version;
      };
    pageStream.posSaved _ FALSE;
    RETURN [FALSE];
    };

  ClosePageStream: PROC [] = {
    -- Release the current chunk and mark the stream as holding no chunk.
    YggFilePageMgr.ReleaseVMPageSet[
      vMPageSet: pageStream.chunk, releaseState: clean, keep: FALSE];
    pageStream.chunk _ YggFilePageMgr.nullVMPageSet;
    };

  ReadAhead: PROC [] = {
    -- Request read-ahead of the chunk at nextReadAheadPage, then step that page forward by
    -- one chunk, going back to page 0 near the end of the file.
    YggFilePageMgr.ReadAheadLogPages[
      logFileHandle, PageRunStartingWith[pageStream.nextReadAheadPage]];
    pageStream.nextReadAheadPage _
      IF pageStream.nextReadAheadPage < logFileSize - logChunkSize
        THEN pageStream.nextReadAheadPage + logChunkSize
        ELSE 0;
    };

  SavePageStreamPos: PROC [] = {
    -- Remember the current chunk, page within it, and expected version.
    pageStream.savePageRun _ pageStream.chunk.pageRun;
    pageStream.savePageInPageRun _ pageStream.pageInPageRun;
    pageStream.saveExpectedVersion _ pageStream.expectedVersion;
    pageStream.posSaved _ TRUE;
    };

  CurrentPageNumber: PROC [] RETURNS [PageNumber] = INLINE {
    -- Log file page number of the page the stream is positioned on.
    RETURN [pageStream.chunk.pageRun.firstPage + pageStream.pageInPageRun];
    };

  ForgetSavedPageStreamPos: PROC [] = {
    -- ! InternalProgrammingError (the stream has left the chunk of the saved position).
    -- Discard the saved position, if any.
    IF NOT pageStream.posSaved THEN RETURN;
    IF pageStream.savePageRun # pageStream.chunk.pageRun THEN ERROR InternalProgrammingError;
    pageStream.posSaved _ FALSE;
    };

  AdvancePageStream: PROC [] RETURNS [endOfLog: BOOL] = {
    -- Move to the next page, switching chunks when the current one is used up. Returns
    -- TRUE if the new page is not valid or does not carry the expected version.
    pageStream.pageInPageRun _ pageStream.pageInPageRun+1;
    IF pageStream.pageInPageRun = pageStream.chunk.pageRun.count THEN {
      p: PageNumber;
      -- The released chunk is kept only while a position is saved.
      YggFilePageMgr.ReleaseVMPageSet[
        vMPageSet: pageStream.chunk, releaseState: clean, keep: pageStream.posSaved];
      p _ PageFollowing[pageStream.chunk.pageRun];
      -- Wrapping to page 0 flips the version expected on the pages that follow.
      IF p = 0 THEN pageStream.expectedVersion _ 1 - pageStream.expectedVersion;
      pageStream.chunk _ YggFilePageMgr.ReadLogPages[
        fileHandle: logFileHandle, pageRun: PageRunStartingWith[p]];
      pageStream.pagePtr _ pageStream.chunk.pages;
      pageStream.pageInPageRun _ 0;
      IF NOT pageStream.posSaved THEN ReadAhead[];
      }
    ELSE pageStream.pagePtr _ pageStream.pagePtr + YggLog.wordsPerPage;
    TRUSTED {RETURN[
      (NOT Header[pageStream.pagePtr].valid) OR
      (pageStream.expectedVersion # Header[pageStream.pagePtr].version) ]; };
    };

  SetPageStreamPosFromSavedPos: PROC [] = {
    -- ! InternalProgrammingError (no position is saved).
    -- Return the stream to the saved position, re-reading the saved chunk if the stream
    -- has moved to a different one.
    IF NOT pageStream.posSaved THEN ERROR InternalProgrammingError;
    IF pageStream.chunk.pageRun # pageStream.savePageRun THEN {
      YggFilePageMgr.ReleaseVMPageSet[
        vMPageSet: pageStream.chunk, releaseState: clean, keep: TRUE];
      pageStream.chunk _ YggFilePageMgr.ReadLogPages[
        fileHandle: logFileHandle, pageRun: pageStream.savePageRun];
      pageStream.expectedVersion _ pageStream.saveExpectedVersion;
      };
    pageStream.pageInPageRun _ pageStream.savePageInPageRun;
    pageStream.pagePtr _ pageStream.chunk.pages +
      YggInline.ShortWordsFromPages[pageStream.pageInPageRun, YggLog.logDeviceCharacteristics];
    };


  -- Record stream: sequential reading of log records, built on the page stream.

  RecordStreamObject: TYPE = RECORD [
    id: RecordID
      -- RecordID of the record the stream is currently positioned on.
    ];

  recordStream: RecordStreamObject;

  OpenRecordStreamFromWord: PUBLIC PROC [firstWord: WordNumber]
    RETURNS [notStartOfRecord: BOOL, currentRecord: RecordID] = {
    -- ! InvalidPage, CallerProgrammingError (from OpenRecordStream).
    -- Open the stream at log word firstWord, using the RecordID derived from firstWord.
    notStartOfRecord _ OpenRecordStream[
      firstWord: firstWord,
      firstRecord: YggLogInline.RecordIDFromWordNumber[firstWord]];
    RETURN [notStartOfRecord, recordStream.id];
    };

  OpenRecordStreamFromCheckpoint: PUBLIC PROC [
    checkpointWord: WordNumber, checkpointRecord: RecordID, firstRecord: RecordID]
    RETURNS [notStartOfRecord: BOOL, currentRecord: RecordID] = {
    -- ! CallerProgrammingError (firstRecord is after checkpointRecord, or at least one log
    --   file length before it); InvalidPage (from OpenRecordStream).
    -- Open the stream at firstRecord, whose word position is computed backwards from the
    -- known position checkpointWord of checkpointRecord, wrapped into the file.
    offsetFromCheckpoint, firstWord: WordCount;
    TRUSTED {
      IF ConstArith.Compare[checkpointRecord, firstRecord] = less THEN
        ERROR CallerProgrammingError;
      };
    offsetFromCheckpoint _ YggLogInline.WordsFromSubtract[
      larger: checkpointRecord, smaller: firstRecord
      ! Basics.BoundsFault => GOTO tooSmall];
    IF YggLogBasic.Compare[offsetFromCheckpoint, logFileLength] # less THEN GOTO tooSmall;
    firstWord _ YggLogInline.SubNumberLessCount[checkpointWord, offsetFromCheckpoint];
    IF firstWord.sign = negative THEN
      firstWord _ YggLogInline.AddNumberAndCount[firstWord, logFileLength];
    notStartOfRecord _ OpenRecordStream[firstWord: firstWord, firstRecord: firstRecord];
    RETURN [notStartOfRecord, firstRecord];
    EXITS
      tooSmall => ERROR CallerProgrammingError;
    };

  OpenRecordStream: PROC [firstWord: WordNumber, firstRecord: RecordID]
    RETURNS [notStartOfRecord: BOOL] = {
    -- ! CallerProgrammingError (firstWord and firstRecord disagree on the word within the
    --   page); InvalidPage (the page containing firstWord is not valid).
    -- NOTE(review): on InvalidPage the page stream opened here is not closed; confirm
    -- whether callers are expected to call CloseRecordStream after that error.
    p: PageNumber;
    w: CARDINAL;
    [page: p, wordInPage: w] _
      YggInline.DecomposeWords[firstWord, YggLog.logDeviceCharacteristics];
    IF YggLogInline.WordInPageFromRecordID[firstRecord, YggLog.wordsPerPage] # w THEN
      ERROR CallerProgrammingError;
    IF OpenPageStream[p].endOfLog THEN GOTO invalidPage;
    SavePageStreamPos[];
    recordStream _ [id: firstRecord];
    RETURN [VerifyCurrentRecordID[].notStartOfRecord];
    EXITS
      invalidPage => ERROR InvalidPage;
    };

  InvalidPage: PUBLIC ERROR = CODE;

  CurrentWordNumber: PROC [] RETURNS [w: WordNumber] = {
    -- Word position of the current record: start of the current page plus the record's
    -- word offset within the page.
    TRUSTED {w _ ConstArith.Add[
      YggInline.WordsFromPages[CurrentPageNumber[], YggLog.logDeviceCharacteristics],
      ConstArith.FromCard[
        YggLogInline.WordInPageFromRecordID[recordStream.id, YggLog.wordsPerPage]]]};
    };

  CloseRecordStream: PUBLIC PROC [] = {
    -- Close the underlying page stream.
    ClosePageStream[];
    };

  VerifyCurrentRecordID: PROC [] RETURNS [notStartOfRecord: BOOL] = {
    -- Walk the block headers of the current page from word 0. Returns TRUE if no block
    -- begins exactly at the current record's word offset, or if the block there is a
    -- continuation block.
    wordInPage: CARDINAL _
      YggLogInline.WordInPageFromRecordID[recordStream.id, YggLog.wordsPerPage];
    TRUSTED {
      w: CARDINAL _ 0;
      UNTIL w >= wordInPage DO
        nWords: CARDINAL _ Header[pageStream.pagePtr+w].nWords;
        -- A zero-length block cannot be stepped over; without this test the loop would
        -- never terminate. No block can start at wordInPage in that case.
        IF nWords = 0 THEN RETURN [TRUE];
        w _ w + nWords;
        ENDLOOP;
      IF w > wordInPage THEN RETURN [TRUE];
      };
    TRUSTED {RETURN [Header[pageStream.pagePtr+wordInPage].isContinuation]; };
    };

  CheckCurrentRecord: PUBLIC PROC [] RETURNS [truncated: BOOL] = TRUSTED {
    -- ! InternalProgrammingError (the stream is not at the start of a record, or a block
    --   length is inconsistent).
    -- Follow the continuation blocks of the current record and check their lengths.
    -- Returns TRUE if the record is cut off by the end of the log or by a block that is
    -- not a continuation. The stream is restored to the saved position on every exit
    -- except the initial not-start-of-record error.
    wordInPage: CARDINAL _
      YggLogInline.WordInPageFromRecordID[recordStream.id, YggLog.wordsPerPage];
    IF VerifyCurrentRecordID[].notStartOfRecord THEN ERROR InternalProgrammingError;
    IF Header[pageStream.pagePtr+wordInPage].hasContinuation THEN {
      -- A block that is continued must run exactly to the end of its page.
      IF (Header[pageStream.pagePtr+wordInPage].nWords + wordInPage) # YggLog.wordsPerPage THEN
        GOTO checkFailed;
      DO
        IF AdvancePageStream[].endOfLog THEN GOTO truncatedRecord;
        IF NOT Header[pageStream.pagePtr].isContinuation THEN GOTO truncatedRecord;
        IF NOT Header[pageStream.pagePtr].hasContinuation THEN EXIT;
        IF Header[pageStream.pagePtr].nWords # YggLog.wordsPerPage THEN GOTO checkFailed;
        ENDLOOP;
      IF Header[pageStream.pagePtr].nWords NOT IN [1 .. YggLog.wordsPerPage] THEN
        GOTO checkFailed;
      }
    ELSE {
      IF Header[pageStream.pagePtr+wordInPage].nWords
        NOT IN [1 .. YggLog.wordsPerPage-wordInPage] THEN GOTO checkFailed;
      };
    SetPageStreamPosFromSavedPos[];
    RETURN [FALSE];
    EXITS
      checkFailed => {
        SetPageStreamPosFromSavedPos[]; ERROR InternalProgrammingError };
      truncatedRecord => {
        SetPageStreamPosFromSavedPos[]; RETURN [TRUE] };
    };

  GetCurrentRecord: PUBLIC PROC [currentRecord: RecordID, to: YggLog.Block]
    RETURNS [status: YggLog.ReadProcStatus, wordsRead: CARDINAL] = {
    -- ! CallerProgrammingError (currentRecord is not the stream's current record, or the
    --   record turns out to be truncated).
    -- Copy the current record into the block list "to"; elements with base = NIL skip
    -- words without copying. status is normal when record and destination end together,
    -- destinationFull when the destination ends first, and sourceExhausted when the
    -- record ends first. The stream is restored to the saved position on return.
    wordInPage: CARDINAL _
      YggLogInline.WordInPageFromRecordID[recordStream.id, YggLog.wordsPerPage];
    wordsRead _ 0;
    IF YggLogInline.Compare[currentRecord, recordStream.id] # equal THEN
      ERROR CallerProgrammingError;
    DO
      -- One iteration per log block of the record.
      wordsThisBlock, wordsThisCopy: CARDINAL;
      hasContinuation, isContinuation: BOOL;
      -- Read the header of the block at wordInPage.
      TRUSTED {
        [hasContinuation: hasContinuation, isContinuation: isContinuation,
          nWords: wordsThisBlock] _ Header[pageStream.pagePtr+wordInPage]^;
        };
      IF wordsRead > 0 AND NOT isContinuation THEN GOTO truncatedRecord;
      -- Step over the header; wordsThisBlock now counts data words only.
      wordsThisBlock _ wordsThisBlock-SIZE[YggLogRep.Header];
      wordInPage _ wordInPage+SIZE[YggLogRep.Header];
      DO
        -- One iteration per destination block element fed from this log block.
        wordsThisCopy _ MIN[to.length, wordsThisBlock];
        IF to.base # NIL THEN TRUSTED {
          Basics.Copy[
            to: to.base, from: pageStream.pagePtr+wordInPage, nwords: wordsThisCopy];
          to.base _ to.base+wordsThisCopy;
          };
        wordsRead _ wordsRead+wordsThisCopy;
        -- Decide which side ran out.
        IF wordsThisCopy # to.length THEN {
          -- Here wordsThisCopy = wordsThisBlock; go read next log block.
          to.length _ to.length-wordsThisCopy;
          IF NOT hasContinuation THEN GOTO sourceExhausted;
          IF AdvancePageStream[].endOfLog THEN GOTO truncatedRecord;
          wordInPage _ 0;
          GOTO nextBlock;
          }
        ELSE IF to.rest # NIL THEN TRUSTED {
          -- This destination element is full; continue with the next element.
          to _ to.rest^;
          wordsThisBlock _ wordsThisBlock-wordsThisCopy;
          wordInPage _ wordInPage + wordsThisCopy;
          }
        ELSE {
          -- The destination is full and has no further elements.
          SetPageStreamPosFromSavedPos[];
          RETURN [
            IF (NOT hasContinuation AND wordsThisBlock = wordsThisCopy)
              THEN normal ELSE destinationFull,
            wordsRead] }
        REPEAT
          nextBlock => NULL;
        ENDLOOP;
      ENDLOOP;
    EXITS
      sourceExhausted => {
        SetPageStreamPosFromSavedPos[]; RETURN [sourceExhausted, wordsRead] };
      truncatedRecord => {
        SetPageStreamPosFromSavedPos[]; ERROR CallerProgrammingError; };
    };

  GetCurrentPageAndVersion: PUBLIC PROC []
    RETURNS [PageNumber, YggLogRep.PageVersion] = {
    -- Page number and expected page version at the current stream position.
    RETURN[CurrentPageNumber[], pageStream.expectedVersion];
    };

  AdvanceRecordStream: PUBLIC PROC []
    RETURNS [endOfLog, truncatedRecord: BOOL, currentRecord: RecordID] = TRUSTED {
    -- ! InternalProgrammingError (from ForgetSavedPageStreamPos).
    -- Step past the current record, following its continuation blocks, and save the new
    -- position. truncatedRecord means the record being skipped was cut off; endOfLog
    -- means the stream ran into an invalid or wrong-version page.
    wordsInRecord: CARDINAL _ 0;
    endOfLog _ truncatedRecord _ FALSE;
    { -- block for EXITS
      wordInPage: CARDINAL _
        YggLogInline.WordInPageFromRecordID[recordStream.id, YggLog.wordsPerPage];
      ForgetSavedPageStreamPos[];
      WHILE Header[pageStream.pagePtr+wordInPage].hasContinuation DO
        wordsInRecord _ wordsInRecord + Header[pageStream.pagePtr+wordInPage].nWords;
        wordInPage _ 0;
        IF AdvancePageStream[].endOfLog THEN {
          endOfLog _ truncatedRecord _ TRUE; GOTO return };
        IF NOT Header[pageStream.pagePtr].isContinuation THEN {
          truncatedRecord _ TRUE; GOTO return };
        ENDLOOP;
      { wordsInLastBlock: CARDINAL _ Header[pageStream.pagePtr+wordInPage].nWords;
        wordInPage _ wordInPage + wordsInLastBlock;
        wordsInRecord _ wordsInRecord + wordsInLastBlock };
      -- A zero-length header means the rest of the page is unused; count it as skipped.
      IF wordInPage < YggLog.wordsPerPage AND
        Header[pageStream.pagePtr+wordInPage].nWords = 0 THEN {
        wordsInRecord _ wordsInRecord + (YggLog.wordsPerPage-wordInPage);
        wordInPage _ YggLog.wordsPerPage;
        };
      IF wordInPage = YggLog.wordsPerPage AND AdvancePageStream[].endOfLog THEN {
        endOfLog _ TRUE;
        };
      GOTO return
      EXITS
        return => {
          SavePageStreamPos[];
          recordStream.id _ YggLogInline.AddC[recordStream.id, wordsInRecord];
          RETURN[endOfLog, truncatedRecord, recordStream.id] };
    }};


  -- Locating the start of the log.

  nTransitionPages: INT = 10 * logChunkSize;
    -- Distance, in pages, that SkipAhead jumps.

  LocateFirstRecord: PUBLIC PROC [] RETURNS [firstRecord: WordNumber] = {
    -- ! InternalProgrammingError (the log ends before a record start is found).
    -- Word position of the first record that starts at or after the page chosen by
    -- LocateStartingPage.
    notStartOfRecord: BOOL;
    endOfLog: BOOL _ FALSE;
    [notStartOfRecord: notStartOfRecord] _ OpenRecordStreamFromWord[
      YggInline.WordsFromPages[LocateStartingPage[], YggLog.logDeviceCharacteristics]];
    IF notStartOfRecord THEN [endOfLog: endOfLog] _ AdvanceRecordStream[];
    IF endOfLog THEN ERROR InternalProgrammingError;
    firstRecord _ CurrentWordNumber[];
    CloseRecordStream[];
    RETURN [firstRecord];
    };

  LocateStartingPage: PROC [] RETURNS [PageNumber] = {
    -- ! InternalProgrammingError (the page sequence is not as expected).
    -- Find a valid run of pages, read forward to its end, skip nTransitionPages past that
    -- end, and return the page reached, after checking that reading on from there leads
    -- back to the run without interruption.
    SkipAhead: PROC [] = {
      -- Reopen the page stream nTransitionPages beyond the current page.
      -- NOTE(review): nextPage is not wrapped at logFileSize; confirm that the current
      -- page can never be within nTransitionPages of the end of the file here.
      nextPage: PageNumber = CurrentPageNumber[] + nTransitionPages;
      ClosePageStream[];
      IF OpenPageStream[nextPage].endOfLog THEN ERROR InternalProgrammingError;
      };
    startPage, beginPage: PageNumber;
    { -- Read to find a page in a "clean" area.
      IF OpenPageStream[0].endOfLog THEN GOTO skip;
      THROUGH [0 .. nTransitionPages) DO
        IF AdvancePageStream[].endOfLog THEN GOTO skip;
        ENDLOOP;
      EXITS
        skip => SkipAhead[];
      };
    startPage _ CurrentPageNumber[];
    -- Read forward to the end of the log.
    DO IF AdvancePageStream[].endOfLog THEN EXIT ENDLOOP;
    -- Jump past the end; the page reached is the starting page to report.
    SkipAhead[];
    beginPage _ CurrentPageNumber[];
    -- Check that the pages from beginPage up to startPage form an unbroken sequence.
    UNTIL CurrentPageNumber[] = startPage DO
      IF AdvancePageStream[].endOfLog THEN ERROR InternalProgrammingError;
      ENDLOOP;
    ClosePageStream[];
    RETURN [beginPage];
    };


  -- Random access read of a single record.

  Get: PUBLIC PROC [thisRecord: YggLog.RecordID, to: YggLog.Block]
    RETURNS [status: YggLog.ReadProcStatus, wordsRead: CARDINAL] = {
    -- ! CallerProgrammingError (from WordNumberFromRecordID); InternalProgrammingError
    --   (thisRecord does not address a well-formed record).
    -- Copy record thisRecord into the block list "to"; elements with base = NIL skip
    -- words without copying. Does not use the page stream; the chunk it holds is released
    -- by ReleaseChunk[] before each normal return.
    chunk: YggFilePageMgr.VMPageSet;
    pagePtr: LONG POINTER;
    pagesLeftInChunk: CARDINAL;
    GetChunk: PROC [nextPage: PageNumber] = {
      -- Read the chunk starting at nextPage and point at its first page.
      chunk _ YggFilePageMgr.ReadLogPages[
        fileHandle: logFileHandle, pageRun: PageRunStartingWith[nextPage]];
      pagePtr _ chunk.pages;
      pagesLeftInChunk _ chunk.pageRun.count - 1;
      };
    ReleaseChunk: PROC [] = INLINE {
      YggFilePageMgr.ReleaseVMPageSet[vMPageSet: chunk, releaseState: clean, keep: FALSE];
      };
    AdvancePage: PROC [] = INLINE {
      -- Move to the next page, fetching the next chunk (and flipping the expected version
      -- on wrap to page 0) when the current chunk is used up.
      IF pagesLeftInChunk = 0 THEN {
        nextPage: PageNumber _ PageFollowing[chunk.pageRun];
        ReleaseChunk[];
        IF nextPage = 0 THEN pageVersion _ 1-pageVersion;
        GetChunk[nextPage];
        }
      ELSE {
        pagePtr _ pagePtr + YggLog.wordsPerPage;
        pagesLeftInChunk _ pagesLeftInChunk - 1;
        };
      };
    isFirstPage: BOOL _ TRUE;
    pageVersion: YggLogRep.PageVersion _ 0;
    wordInPage: CARDINAL;
    Verify: PROC [] = {
      -- ! InternalProgrammingError
      -- Walk the block headers of the first page from word 0 and check that one of them
      -- starts exactly at wordInPage.
      w: CARDINAL _ 0;
      UNTIL w = wordInPage DO
        nWords: CARDINAL;
        TRUSTED {nWords _ Header[pagePtr+w].nWords; };
        -- A zero-length block would keep w fixed and make this loop spin forever.
        IF nWords = 0 THEN ERROR InternalProgrammingError;
        w _ w + nWords;
        IF w > wordInPage THEN ERROR InternalProgrammingError;
        ENDLOOP;
      };
    { firstPage: PageNumber;
      [page: firstPage, wordInPage: wordInPage] _ YggInline.DecomposeWords[
        WordNumberFromRecordID[thisRecord], YggLog.logDeviceCharacteristics];
      GetChunk[firstPage];
      };
    Verify[];
    wordsRead _ 0;
    DO
      -- One iteration per log block of the record.
      wordsThisBlock, wordsThisCopy: CARDINAL;
      hasContinuation, isContinuation: BOOL;
      version: YggLogRep.PageVersion;
      -- A page not marked valid is acceptable only if it is in the tail chunk.
      TRUSTED {
        IF NOT Header[pagePtr].valid THEN VerifyGet[
          chunk.pageRun.firstPage+(chunk.pageRun.count-(1+pagesLeftInChunk)), pagePtr];
        };
      -- Read the header of the block at wordInPage.
      TRUSTED {[version: version, hasContinuation: hasContinuation,
        isContinuation: isContinuation, nWords: wordsThisBlock] _
          Header[pagePtr+wordInPage]^; };
      wordsThisBlock _ wordsThisBlock-YggLogRep.Header.SIZE;
      wordInPage _ wordInPage+YggLogRep.Header.SIZE;
      IF isFirstPage THEN {
        IF isContinuation THEN ERROR InternalProgrammingError;
        pageVersion _ version;
        isFirstPage _ FALSE }
      ELSE {
        IF NOT isContinuation OR (version # pageVersion) THEN
          ERROR InternalProgrammingError };
      DO
        -- One iteration per destination block element fed from this log block.
        wordsThisCopy _ MIN[to.length, wordsThisBlock];
        IF to.base # NIL THEN TRUSTED {
          Basics.Copy[
            to: to.base, from: pagePtr+wordInPage, nwords: wordsThisCopy];
          to.base _ to.base+wordsThisCopy;
          };
        wordsRead _ wordsRead+wordsThisCopy;
        -- Decide which side ran out.
        IF wordsThisCopy # to.length THEN {
          -- Here wordsThisCopy = wordsThisBlock; go read next log block.
          to.length _ to.length-wordsThisCopy;
          IF NOT hasContinuation THEN {
            ReleaseChunk[]; RETURN[sourceExhausted, wordsRead] };
          AdvancePage[];
          wordInPage _ 0;
          GOTO nextBlock;
          }
        ELSE IF to.rest # NIL THEN TRUSTED {
          -- This destination element is full; continue with the next element.
          to _ to.rest^;
          wordsThisBlock _ wordsThisBlock-wordsThisCopy;
          wordInPage _ wordInPage + wordsThisCopy;
          }
        ELSE {
          -- The destination is full and has no further elements.
          ReleaseChunk[];
          RETURN [
            IF (NOT hasContinuation AND wordsThisBlock = wordsThisCopy)
              THEN normal ELSE destinationFull,
            wordsRead] }
        REPEAT
          nextBlock => NULL;
        ENDLOOP;
      ENDLOOP;
    };

  END.

OPEN PROBLEMS

Need a way to get the log chunk size from FilePageMgr (see logChunkSize constant above).

CHANGE LOG

Created by MBrown on June 15, 1982 3:38 pm

Changed by MBrown on September 10, 1982 9:45 pm

Changed by MBrown on September 25, 1982 3:27 pm

Changed by MBrown on November 16, 1982 10:10 pm

Changed by MBrown on April 3, 1983 4:17 pm

Changed by MBrown on June 27, 1983 9:56 am

Changed by MBrown on January 30, 1984 10:18:19 am PST