<> <> <> <> <> <> <> DIRECTORY AlpineEnvironment, AlpineInternal, AlpineInline, FileMap, FilePageMgr, AlpineLog, LogBasic, LogInline, LogBasicInternal, LogRep, PrincOpsUtils, Process, RuntimeError; LogBasicCoreImpl: MONITOR IMPORTS AlpineInline, FileMap, FilePageMgr, AlpineLog, LogInline, PrincOpsUtils, Process, RuntimeError EXPORTS LogBasic, LogBasicInternal = BEGIN PageCount: TYPE = AlpineEnvironment.PageCount; PageNumber: TYPE = AlpineEnvironment.PageNumber; PageRun: TYPE = AlpineEnvironment.PageRun; FileID: TYPE = AlpineEnvironment.FileID; VolumeID: TYPE = AlpineEnvironment.VolumeID; WordNumber: TYPE = LogBasic.WordNumber; WordCount: TYPE = LogBasic.WordCount; RecordID: TYPE = AlpineLog.RecordID; nullRecordID: RecordID = AlpineLog.nullRecordID; Comparison: TYPE = AlpineLog.Comparison; wordsPerPage: CARDINAL = AlpineEnvironment.wordsPerPage; CallerProgrammingError: ERROR = CODE; InternalProgrammingError: ERROR = CODE; Header: PROC [p: LONG POINTER] RETURNS [LONG POINTER TO LogRep.Header] = INLINE { RETURN [LOOPHOLE[p]] }; <> logFileHandle: FileMap.Handle; logFileSize: PageCount; logFileLength: WordCount; logChunkSize: INT = 4; PageRunStartingWith: PROC [p: PageNumber] RETURNS [pr: PageRun] = INLINE { RETURN [[firstPage: p, count: MIN[logFileSize-p, logChunkSize]]] }; PageFollowing: PROC [pr: PageRun] RETURNS [p: PageNumber] = INLINE { p _ pr.firstPage+pr.count; RETURN [IF p = logFileSize THEN 0 ELSE p] }; IsMemberOfPageRun: PROC [p: PageNumber, pr: PageRun] RETURNS [BOOL] = INLINE { RETURN [p IN [pr.firstPage .. pr.firstPage+pr.count)] }; EstablishLogFile: PUBLIC PROC [ logVolume: AlpineEnvironment.VolumeID, logFile: AlpineEnvironment.FileID] = { <> logFileHandle _ FileMap.Register[volumeID: logVolume, fileID: logFile]; logFileSize _ FilePageMgr.GetSize[logFileHandle]; logFileLength _ AlpineInline.WordsFromPages[logFileSize]; }; LogFileSize: PUBLIC PROC [] RETURNS [PageCount] = { RETURN[logFileSize]; }; LogUsage: PUBLIC ENTRY PROC [] RETURNS [PageCount] ~ { nPages: INT; RETURN[IF (nPages _ (PageFollowing[chunkTable[tail].chunk.pageRun] - oldestFilePage)) >= 0 THEN nPages + nBufferPages ELSE nPages + nBufferPages + logFileSize]; }; <> <> <> <> <> <> <> <> ChunkTableSize: CARDINAL = 4; ChunkTableIndex: TYPE = [0..ChunkTableSize); ChunkTableRec: TYPE = RECORD [ chunk: FilePageMgr.VMPageSet _ FilePageMgr.nullVMPageSet, <> recordID: AlpineLog.RecordID _ AlpineLog.nullRecordID, <> forceInProgress: BOOL _ FALSE <> <> <> ]; NextChunkTableIndex: PROC [i: ChunkTableIndex] RETURNS [ChunkTableIndex] = INLINE { RETURN [IF i = LAST[ChunkTableIndex] THEN FIRST[ChunkTableIndex] ELSE SUCC[i]] }; <> < FALSE>> <> <<(forked by AdvanceChunk), and oldestFilePage is updated by Release.>> chunkTable: ARRAY ChunkTableIndex OF ChunkTableRec; tail: ChunkTableIndex; completion: CONDITION; <> curVersion: LogRep.PageVersion; <> nextTailChunkID: AlpineLog.RecordID; <> <> oldestFilePage: PageNumber; <> enforceBufferPages: BOOL; nBufferPages: INT = 2 * AlpineInternal.MaxCoordinators; <> <> <> AdvanceChunk: PUBLIC ENTRY PROC [] RETURNS [ version: LogRep.PageVersion, pagesInChunk: INT, firstPagePtr: LONG POINTER] = { <> <> newTail: ChunkTableIndex; <> chunkTable[tail].forceInProgress _ TRUE; Process.Detach[FORK ForceChunk[index: tail]]; <> DO newTail _ NextChunkTableIndex[tail]; IF NOT chunkTable[newTail].forceInProgress THEN EXIT; WAIT completion; ENDLOOP; IF chunkTable[newTail].chunk.pages # NIL THEN FilePageMgr.ReleaseVMPageSet[ vMPageSet: chunkTable[newTail].chunk, releaseState: clean, keep: TRUE]; { <> <> nextPage: PageNumber _ PageFollowing[chunkTable[tail].chunk.pageRun]; IF nextPage = 0 THEN curVersion _ 1 - curVersion; <> tail _ newTail; chunkTable[tail].recordID _ nextTailChunkID; chunkTable[tail].chunk _ FilePageMgr.UseLogPages[ fileHandle: logFileHandle, pageRun: PageRunStartingWith[nextPage]]; { -- hack to avoid completely filling up log file biasedOldestFilePage: PageNumber _ oldestFilePage - nBufferPages; IF biasedOldestFilePage < 0 THEN biasedOldestFilePage _ biasedOldestFilePage + logFileSize; IF (enforceBufferPages AND IsMemberOfPageRun[biasedOldestFilePage, chunkTable[tail].chunk.pageRun]) OR IsMemberOfPageRun[oldestFilePage, chunkTable[tail].chunk.pageRun] THEN ERROR AlpineLog.WriteFailed; }; nextTailChunkID _ LogInline.AddC[r: nextTailChunkID, words: AlpineInline.ShortWordsFromPages[chunkTable[tail].chunk.pageRun.count]]; }; RETURN [ version: curVersion, pagesInChunk: chunkTable[tail].chunk.pageRun.count, firstPagePtr: chunkTable[tail].chunk.pages]; }; ForceChunk: PROC [index: ChunkTableIndex] = { <> <> <> BroadcastCompletion: ENTRY PROC [] = INLINE { chunkTable[index].forceInProgress _ FALSE; BROADCAST completion; }; FilePageMgr.ForceOutVMPageSet[chunkTable[index].chunk]; BroadcastCompletion[]; }; ForceTo: PUBLIC PROC [followingRecord: RecordID] = { <> <> <<(using this process). Then wait for other chunks that precede followingRecord to>> <> TailForceRequired: ENTRY PROC [] RETURNS [doForce: BOOL, chunk: FilePageMgr.VMPageSet] = { <> IF AlpineLog.Compare[followingRecord, nextTailChunkID] = greater THEN RETURN WITH ERROR CallerProgrammingError; IF AlpineLog.Compare[chunkTable[tail].recordID, followingRecord] = less THEN { FilePageMgr.ShareVMPageSet[chunkTable[tail].chunk]; RETURN [TRUE, chunkTable[tail].chunk] } ELSE RETURN [FALSE, FilePageMgr.nullVMPageSet] }; AwaitForce: ENTRY PROC [] = { <> FOR currentChunk: ChunkTableIndex IN ChunkTableIndex DO IF AlpineLog.Compare[chunkTable[currentChunk].recordID, followingRecord] = less THEN UNTIL NOT chunkTable[currentChunk].forceInProgress DO WAIT completion; ENDLOOP; ENDLOOP; }; doForce: BOOL; chunk: FilePageMgr.VMPageSet; [doForce, chunk] _ TailForceRequired[]; IF doForce THEN { FilePageMgr.ForceOutVMPageSet[vMPageSet: chunk]; FilePageMgr.ReleaseVMPageSet[vMPageSet: chunk, releaseState: clean, keep: TRUE] }; AwaitForce[]; }; OpenCoreForPut: PUBLIC ENTRY PROC [ nextPage: PageNumber, version: LogRep.PageVersion, nextRecord: RecordID] RETURNS [pagesInChunk: INT, firstPagePtr: LONG POINTER] = { <> enforceBufferPages _ FALSE; curVersion _ version; tail _ FIRST[ChunkTableIndex]; chunkTable[tail].recordID _ nextRecord; chunkTable[tail].chunk _ FilePageMgr.UseLogPages[ fileHandle: logFileHandle, pageRun: PageRunStartingWith[nextPage]]; nextTailChunkID _ LogInline.AddC[r: chunkTable[tail].recordID, words: AlpineInline.ShortWordsFromPages[chunkTable[tail].chunk.pageRun.count]]; RETURN [ pagesInChunk: chunkTable[tail].chunk.pageRun.count, firstPagePtr: chunkTable[tail].chunk.pages]; }; AssertNormalOperation: PUBLIC ENTRY PROC [] = { enforceBufferPages _ TRUE; }; CloseCoreForPut: PUBLIC ENTRY PROC [] = { <> FOR i: ChunkTableIndex IN ChunkTableIndex DO WHILE chunkTable[i].forceInProgress DO WAIT completion; ENDLOOP; FilePageMgr.ForceOutVMPageSet[chunkTable[i].chunk]; FilePageMgr.ReleaseVMPageSet[ vMPageSet: chunkTable[i].chunk, releaseState: clean, keep: FALSE]; chunkTable[i] _ []; ENDLOOP; }; WordNumberFromRecordID: PUBLIC ENTRY PROC [thisRecord: RecordID] RETURNS [result: WordNumber] = { <> offsetFromNextTailChunk: WordCount; IF AlpineLog.Compare[thisRecord, nextTailChunkID] = greater THEN GOTO tooLarge; offsetFromNextTailChunk _ LogInline.WordsFromSubtract[ larger: nextTailChunkID, smaller: thisRecord ! RuntimeError.BoundsFault => GOTO tooSmall]; IF offsetFromNextTailChunk >= logFileLength THEN GOTO tooSmall; result _ AlpineInline.WordsFromPages[ pages: PageFollowing[chunkTable[tail].chunk.pageRun]] - offsetFromNextTailChunk; RETURN [IF result < 0 THEN result + logFileLength ELSE result]; EXITS tooLarge => ERROR CallerProgrammingError; tooSmall => ERROR CallerProgrammingError; }; VerifyGet: ENTRY PROC [page: PageCount, pagePtr: LONG POINTER] = { <> IF Header[pagePtr].valid THEN RETURN; IF IsMemberOfPageRun[page, chunkTable[tail].chunk.pageRun] THEN RETURN; ERROR; }; Release: PUBLIC PROC [beforeRecord: RecordID] = { <> AssignToOldestFilePage: ENTRY PROC [newOldestFilePage: PageNumber] = INLINE { oldestFilePage _ newOldestFilePage }; AssignToOldestFilePage[AlpineInline.PagesFromWords[ WordNumberFromRecordID[beforeRecord]]]; }; <> <> <> <> nChunksReadAhead: CARDINAL = 4; PageStreamObject: TYPE = RECORD [ chunk: FilePageMgr.VMPageSet _ FilePageMgr.nullVMPageSet, savePageRun: PageRun, pageInPageRun, savePageInPageRun: CARDINAL, expectedVersion, saveExpectedVersion: LogRep.PageVersion, pagePtr: LONG POINTER, posSaved: BOOL, nextReadAheadPage: PageNumber ]; pageStream: PageStreamObject; OpenPageStream: PROC [p: PageNumber] RETURNS [endOfLog: BOOL] = { <> pageStream.chunk _ FilePageMgr.ReadLogPages[ fileHandle: logFileHandle, pageRun: PageRunStartingWith[p]]; pageStream.pagePtr _ pageStream.chunk.pages; pageStream.pageInPageRun _ 0; pageStream.nextReadAheadPage _ PageFollowing[pageStream.chunk.pageRun]; THROUGH [1..nChunksReadAhead] DO ReadAhead[] ENDLOOP; IF NOT Header[pageStream.pagePtr].valid THEN RETURN [TRUE]; pageStream.expectedVersion _ Header[pageStream.pagePtr].version; pageStream.posSaved _ FALSE; RETURN [FALSE]; }; ClosePageStream: PROC [] = { FilePageMgr.ReleaseVMPageSet[ vMPageSet: pageStream.chunk, releaseState: clean, keep: FALSE]; pageStream.chunk _ FilePageMgr.nullVMPageSet; }; ReadAhead: PROC [] = { FilePageMgr.ReadAheadLogPages[ logFileHandle, PageRunStartingWith[pageStream.nextReadAheadPage]]; pageStream.nextReadAheadPage _ IF pageStream.nextReadAheadPage < logFileSize - logChunkSize THEN pageStream.nextReadAheadPage + logChunkSize ELSE 0; }; SavePageStreamPos: PROC [] = { pageStream.savePageRun _ pageStream.chunk.pageRun; pageStream.savePageInPageRun _ pageStream.pageInPageRun; pageStream.saveExpectedVersion _ pageStream.expectedVersion; pageStream.posSaved _ TRUE; }; CurrentPageNumber: PROC [] RETURNS [PageNumber] = INLINE { RETURN [pageStream.chunk.pageRun.firstPage + pageStream.pageInPageRun]; }; ForgetSavedPageStreamPos: PROC [] = { <> IF NOT pageStream.posSaved THEN RETURN; IF pageStream.savePageRun # pageStream.chunk.pageRun THEN ERROR InternalProgrammingError; pageStream.posSaved _ FALSE; }; AdvancePageStream: PROC [] RETURNS [endOfLog: BOOL] = { pageStream.pageInPageRun _ pageStream.pageInPageRun+1; IF pageStream.pageInPageRun = pageStream.chunk.pageRun.count THEN { p: PageNumber; FilePageMgr.ReleaseVMPageSet[ vMPageSet: pageStream.chunk, releaseState: clean, keep: pageStream.posSaved]; p _ PageFollowing[pageStream.chunk.pageRun]; IF p = 0 THEN pageStream.expectedVersion _ 1 - pageStream.expectedVersion; pageStream.chunk _ FilePageMgr.ReadLogPages[fileHandle: logFileHandle, pageRun: PageRunStartingWith[p]]; pageStream.pagePtr _ pageStream.chunk.pages; pageStream.pageInPageRun _ 0; IF NOT pageStream.posSaved THEN ReadAhead[]; } ELSE pageStream.pagePtr _ pageStream.pagePtr + wordsPerPage; RETURN[ (NOT Header[pageStream.pagePtr].valid) OR (pageStream.expectedVersion # Header[pageStream.pagePtr].version) ]; }; SetPageStreamPosFromSavedPos: PROC [] = { IF NOT pageStream.posSaved THEN ERROR InternalProgrammingError; IF pageStream.chunk.pageRun # pageStream.savePageRun THEN { FilePageMgr.ReleaseVMPageSet[vMPageSet: pageStream.chunk, releaseState: clean, keep: TRUE]; pageStream.chunk _ FilePageMgr.ReadLogPages[fileHandle: logFileHandle, pageRun: pageStream.savePageRun]; pageStream.expectedVersion _ pageStream.saveExpectedVersion; }; pageStream.pageInPageRun _ pageStream.savePageInPageRun; pageStream.pagePtr _ pageStream.chunk.pages + AlpineInline.ShortWordsFromPages[pageStream.pageInPageRun]; }; <> <> RecordStreamObject: TYPE = RECORD [ id: RecordID <> ]; recordStream: RecordStreamObject; OpenRecordStreamFromWord: PUBLIC PROC [firstWord: WordNumber] RETURNS [notStartOfRecord: BOOL, currentRecord: RecordID] = { notStartOfRecord _ OpenRecordStream[firstWord: firstWord, firstRecord: LogInline.RecordIDFromWordNumber[firstWord]]; RETURN [notStartOfRecord, recordStream.id]; }; OpenRecordStreamFromCheckpoint: PUBLIC PROC [ checkpointWord: WordNumber, checkpointRecord: RecordID, firstRecord: RecordID] RETURNS [notStartOfRecord: BOOL, currentRecord: RecordID] = { offsetFromCheckpoint, firstWord: WordCount; IF AlpineLog.Compare[checkpointRecord, firstRecord] = less THEN ERROR CallerProgrammingError; offsetFromCheckpoint _ LogInline.WordsFromSubtract[ larger: checkpointRecord, smaller: firstRecord ! RuntimeError.BoundsFault => GOTO tooSmall]; IF offsetFromCheckpoint >= logFileLength THEN GOTO tooSmall; firstWord _ checkpointWord - offsetFromCheckpoint; IF firstWord < 0 THEN firstWord _ firstWord + logFileLength; notStartOfRecord _ OpenRecordStream[firstWord: firstWord, firstRecord: firstRecord]; RETURN [notStartOfRecord, firstRecord]; EXITS tooSmall => ERROR CallerProgrammingError; }; OpenRecordStream: PROC [firstWord: WordNumber, firstRecord: RecordID] RETURNS [notStartOfRecord: BOOL] = { p: PageNumber; w: CARDINAL [0..wordsPerPage); [page: p, wordInPage: w] _ AlpineInline.DecomposeWords[firstWord]; IF LogInline.WordInPageFromRecordID[firstRecord] # w THEN ERROR CallerProgrammingError; IF OpenPageStream[p].endOfLog THEN GOTO invalidPage; SavePageStreamPos[]; recordStream _ [id: firstRecord]; RETURN [VerifyCurrentRecordID[].notStartOfRecord]; EXITS invalidPage => ERROR InvalidPage; }; InvalidPage: PUBLIC ERROR = CODE; CurrentWordNumber: PROC [] RETURNS [WordNumber] = { RETURN [AlpineInline.WordsFromPages[CurrentPageNumber[]] + LogInline.WordInPageFromRecordID[recordStream.id]]; }; CloseRecordStream: PUBLIC PROC [] = { ClosePageStream[]; }; VerifyCurrentRecordID: PROC [] RETURNS [notStartOfRecord: BOOL] = { wordInPage: CARDINAL _ LogInline.WordInPageFromRecordID[recordStream.id]; { w: CARDINAL _ 0; UNTIL w >= wordInPage DO w _ w + Header[pageStream.pagePtr+w].nWords; ENDLOOP; IF w > wordInPage THEN RETURN [TRUE]; }; RETURN [Header[pageStream.pagePtr+wordInPage].isContinuation]; }; CheckCurrentRecord: PUBLIC PROC [] RETURNS [truncated: BOOL] = { wordInPage: CARDINAL _ LogInline.WordInPageFromRecordID[recordStream.id]; IF VerifyCurrentRecordID[].notStartOfRecord THEN ERROR InternalProgrammingError; IF Header[pageStream.pagePtr+wordInPage].hasContinuation THEN { IF (Header[pageStream.pagePtr+wordInPage].nWords + wordInPage) # wordsPerPage THEN GOTO checkFailed; DO IF AdvancePageStream[].endOfLog THEN GOTO truncatedRecord; IF NOT Header[pageStream.pagePtr].isContinuation THEN GOTO truncatedRecord; IF NOT Header[pageStream.pagePtr].hasContinuation THEN EXIT; IF Header[pageStream.pagePtr].nWords # wordsPerPage THEN GOTO checkFailed; ENDLOOP; IF Header[pageStream.pagePtr].nWords NOT IN [1 .. wordsPerPage] THEN GOTO checkFailed; } ELSE { IF Header[pageStream.pagePtr+wordInPage].nWords NOT IN [1 .. wordsPerPage-wordInPage] THEN GOTO checkFailed; }; SetPageStreamPosFromSavedPos[]; RETURN [FALSE]; EXITS checkFailed => { SetPageStreamPosFromSavedPos[]; ERROR InternalProgrammingError }; truncatedRecord => { SetPageStreamPosFromSavedPos[]; RETURN [TRUE] }; }; GetCurrentRecord: PUBLIC PROC [currentRecord: RecordID, to: AlpineLog.Block] RETURNS [status: AlpineLog.ReadProcStatus, wordsRead: CARDINAL] = { wordInPage: CARDINAL _ LogInline.WordInPageFromRecordID[recordStream.id]; wordsRead _ 0; IF LogInline.Compare[currentRecord, recordStream.id] # equal THEN ERROR CallerProgrammingError; DO <> wordsThisBlock, wordsThisCopy: CARDINAL; hasContinuation, isContinuation: BOOL; <> [hasContinuation: hasContinuation, isContinuation: isContinuation, nWords: wordsThisBlock] _ Header[pageStream.pagePtr+wordInPage]^; IF wordsRead > 0 AND NOT isContinuation THEN GOTO truncatedRecord; wordsThisBlock _ wordsThisBlock-SIZE[LogRep.Header]; wordInPage _ wordInPage+SIZE[LogRep.Header]; DO <> wordsThisCopy _ MIN[to.length, wordsThisBlock]; IF to.base # NIL THEN { PrincOpsUtils.LongCopy[ to: to.base, from: pageStream.pagePtr+wordInPage, nwords: wordsThisCopy]; to.base _ to.base+wordsThisCopy; }; wordsRead _ wordsRead+wordsThisCopy; <> IF wordsThisCopy # to.length THEN { < wordsThisCopy = wordsThisBlock; go read next log block.>> to.length _ to.length-wordsThisCopy; IF NOT hasContinuation THEN GOTO sourceExhausted; IF AdvancePageStream[].endOfLog THEN GOTO truncatedRecord; wordInPage _ 0; GOTO nextBlock; } ELSE IF to.rest # NIL THEN { <> to _ to.rest^; wordsThisBlock _ wordsThisBlock-wordsThisCopy; wordInPage _ wordInPage + wordsThisCopy; } ELSE { <> SetPageStreamPosFromSavedPos[]; RETURN [ IF (NOT hasContinuation AND wordsThisBlock = wordsThisCopy) THEN normal ELSE destinationFull, wordsRead] } REPEAT nextBlock => NULL; ENDLOOP; ENDLOOP; EXITS sourceExhausted => { SetPageStreamPosFromSavedPos[]; RETURN [sourceExhausted, wordsRead] }; truncatedRecord => { SetPageStreamPosFromSavedPos[]; ERROR CallerProgrammingError; }; }; GetCurrentPageAndVersion: PUBLIC PROC [] RETURNS [PageNumber, LogRep.PageVersion] = { RETURN[CurrentPageNumber[], pageStream.expectedVersion]; }; AdvanceRecordStream: PUBLIC PROC [] RETURNS [endOfLog, truncatedRecord: BOOL, currentRecord: RecordID] = { <> <> <> wordsInRecord: CARDINAL _ 0; endOfLog _ truncatedRecord _ FALSE; { -- block for EXITS wordInPage: CARDINAL _ LogInline.WordInPageFromRecordID[recordStream.id]; ForgetSavedPageStreamPos[]; WHILE Header[pageStream.pagePtr+wordInPage].hasContinuation DO wordsInRecord _ wordsInRecord + Header[pageStream.pagePtr+wordInPage].nWords; wordInPage _ 0; IF AdvancePageStream[].endOfLog THEN { endOfLog _ truncatedRecord _ TRUE; GOTO return }; IF NOT Header[pageStream.pagePtr].isContinuation THEN { truncatedRecord _ TRUE; GOTO return }; ENDLOOP; { wordsInLastBlock: CARDINAL _ Header[pageStream.pagePtr+wordInPage].nWords; wordInPage _ wordInPage + wordsInLastBlock; wordsInRecord _ wordsInRecord + wordsInLastBlock }; IF wordInPage < wordsPerPage AND Header[pageStream.pagePtr+wordInPage].nWords = 0 THEN { wordsInRecord _ wordsInRecord + (wordsPerPage-wordInPage); wordInPage _ wordsPerPage; }; IF wordInPage = wordsPerPage AND AdvancePageStream[].endOfLog THEN { endOfLog _ TRUE; }; GOTO return EXITS return => { SavePageStreamPos[]; recordStream.id _ LogInline.AddC[recordStream.id, wordsInRecord]; RETURN[endOfLog, truncatedRecord, recordStream.id] }; }}; <> <> nTransitionPages: INT = 10 * logChunkSize; LocateFirstRecord: PUBLIC PROC [] RETURNS [firstRecord: WordNumber] = { notStartOfRecord: BOOL; endOfLog: BOOL _ FALSE; [notStartOfRecord: notStartOfRecord] _ OpenRecordStreamFromWord[ AlpineInline.WordsFromPages[LocateStartingPage[]]]; IF notStartOfRecord THEN [endOfLog: endOfLog] _ AdvanceRecordStream[]; IF endOfLog THEN ERROR InternalProgrammingError; firstRecord _ CurrentWordNumber[]; CloseRecordStream[]; RETURN [firstRecord]; }; LocateStartingPage: PROC [] RETURNS [PageNumber] = { <> SkipAhead: PROC [] = { <> <> nextPage: PageNumber = CurrentPageNumber[] + nTransitionPages; ClosePageStream[]; IF OpenPageStream[nextPage].endOfLog THEN ERROR InternalProgrammingError; }; startPage, beginPage: PageNumber; { -- Read to find a page in a "clean" area. IF OpenPageStream[0].endOfLog THEN GOTO skip; THROUGH [0 .. nTransitionPages) DO IF AdvancePageStream[].endOfLog THEN GOTO skip; ENDLOOP; EXITS skip => SkipAhead[]; }; startPage _ CurrentPageNumber[]; <> DO IF AdvancePageStream[].endOfLog THEN EXIT ENDLOOP; <> SkipAhead[]; beginPage _ CurrentPageNumber[]; <> UNTIL CurrentPageNumber[] = startPage DO IF AdvancePageStream[].endOfLog THEN ERROR InternalProgrammingError; ENDLOOP; ClosePageStream[]; RETURN [beginPage]; }; <> Get: PUBLIC PROC [thisRecord: AlpineLog.RecordID, to: AlpineLog.Block] RETURNS [status: AlpineLog.ReadProcStatus, wordsRead: CARDINAL] = { <> < ReleaseChunk[]>> chunk: FilePageMgr.VMPageSet; pagePtr: LONG POINTER; pagesLeftInChunk: CARDINAL; GetChunk: PROC [nextPage: PageNumber] = { chunk _ FilePageMgr.ReadLogPages[ fileHandle: logFileHandle, pageRun: PageRunStartingWith[nextPage]]; pagePtr _ chunk.pages; pagesLeftInChunk _ chunk.pageRun.count - 1; }; ReleaseChunk: PROC [] = INLINE { FilePageMgr.ReleaseVMPageSet[vMPageSet: chunk, releaseState: clean, keep: FALSE]; }; AdvancePage: PROC [] = INLINE { IF pagesLeftInChunk = 0 THEN { nextPage: PageNumber _ PageFollowing[chunk.pageRun]; ReleaseChunk[]; IF nextPage = 0 THEN pageVersion _ 1-pageVersion; GetChunk[nextPage]; } ELSE { pagePtr _ pagePtr + AlpineEnvironment.wordsPerPage; pagesLeftInChunk _ pagesLeftInChunk - 1; }; }; isFirstPage: BOOL _ TRUE; pageVersion: LogRep.PageVersion _ 0; wordInPage: CARDINAL; Verify: PROC [] = { <> <> w: CARDINAL _ 0; UNTIL w = wordInPage DO w _ w + Header[pagePtr+w].nWords; IF w > wordInPage THEN ERROR InternalProgrammingError; ENDLOOP; }; { firstPage: PageNumber; [page: firstPage, wordInPage: wordInPage] _ AlpineInline.DecomposeWords[ WordNumberFromRecordID[thisRecord]]; GetChunk[firstPage]; }; Verify[]; wordsRead _ 0; DO <> wordsThisBlock, wordsThisCopy: CARDINAL; hasContinuation, isContinuation: BOOL; version: LogRep.PageVersion; <> IF NOT Header[pagePtr].valid THEN VerifyGet[ chunk.pageRun.firstPage+(chunk.pageRun.count-(1+pagesLeftInChunk)), pagePtr]; <> [version: version, hasContinuation: hasContinuation, isContinuation: isContinuation, nWords: wordsThisBlock] _ Header[pagePtr+wordInPage]^; wordsThisBlock _ wordsThisBlock-LogRep.Header.SIZE; wordInPage _ wordInPage+LogRep.Header.SIZE; IF isFirstPage THEN { IF isContinuation THEN ERROR InternalProgrammingError; pageVersion _ version; isFirstPage _ FALSE } ELSE { IF NOT isContinuation OR (version # pageVersion) THEN ERROR InternalProgrammingError }; DO <> wordsThisCopy _ MIN[to.length, wordsThisBlock]; IF to.base # NIL THEN { PrincOpsUtils.LongCopy[ to: to.base, from: pagePtr+wordInPage, nwords: wordsThisCopy]; to.base _ to.base+wordsThisCopy; }; wordsRead _ wordsRead+wordsThisCopy; <> IF wordsThisCopy # to.length THEN { < wordsThisCopy = wordsThisBlock; go read next log block.>> to.length _ to.length-wordsThisCopy; IF NOT hasContinuation THEN { ReleaseChunk[]; RETURN[sourceExhausted, wordsRead] }; AdvancePage[]; wordInPage _ 0; GOTO nextBlock; } ELSE IF to.rest # NIL THEN { <> to _ to.rest^; wordsThisBlock _ wordsThisBlock-wordsThisCopy; wordInPage _ wordInPage + wordsThisCopy; } ELSE { <> ReleaseChunk[]; RETURN [ IF (NOT hasContinuation AND wordsThisBlock = wordsThisCopy) THEN normal ELSE destinationFull, wordsRead] } REPEAT nextBlock => NULL; ENDLOOP; ENDLOOP; }; END. OPEN PROBLEMS Need a way to get the log chunk size from FilePageMgr (see logChunkSize constant above). CHANGE LOG Created by MBrown on June 15, 1982 3:38 pm <> Changed by MBrown on September 10, 1982 9:45 pm <> <> Changed by MBrown on September 25, 1982 3:27 pm <> Changed by MBrown on November 16, 1982 10:10 pm <> Changed by MBrown on April 3, 1983 4:17 pm <> <> Changed by MBrown on June 27, 1983 9:56 am <> <> Changed by MBrown on January 30, 1984 10:18:19 am PST <> <> <> <> <> <> <> <> <> <<>>