<> <> <> <> <> <<>> <> DIRECTORY AlpineEnvironment USING [VolumeID], AlpFile USING [LockOption, UnlockFile, Handle, Close], AlpineFile USING [Unknown], AlpineFS USING [OpenFile, FileOptions, StreamOptions, GetAlpFileHandle, Open, OpenFileFromStream, SetLockOption, StreamFromOpenFile, StreamFromOpenStream], AlpInstance USING [Failed, FileStore, Handle, Create], AlpTransaction USING [Handle, OperationFailed, Outcome, Create, Finish, GetNextVolumeGroup, UnlockOwnerDB, VolumeGroupID], BasicTime USING [GMT, nullGMT, Now], Convert USING [IntFromRope, RopeFromInt], DB USING [Aborted, Error, Failure, AbortCache, DeclareSegment, EndTransaction, EraseSegment, FlushCache, GetSegmentInfo, Initialize, OpenTransaction], DBEnvironment USING [Error, Failure], FS USING [ComponentPositions, Error, nullOpenFile, Close, GetInfo, ExpandName, OpenFile, PagesForBytes, SetPageCount, StreamBufferParms, SetByteCountAndCreatedTime], IO, Rope, RPC USING [CallFailed], WalnutDefs USING [Error, Segment], WalnutKernelDefs USING [LogInfoFromRoot, RootEntry, SegmentID, WhichTempLog], WalnutRoot USING [], WalnutStream USING [FlushStream, SetHighWaterMark]; WalnutRootImpl: CEDAR MONITOR IMPORTS AlpFile, AlpineFile, AlpineFS, AlpInstance, AlpTransaction, BasicTime, Convert, DB, DBEnvironment, FS, IO, Rope, RPC, WalnutDefs, WalnutStream EXPORTS WalnutRoot = BEGIN <<>> <> <<>> <> <<>> <> GMT: TYPE = BasicTime.GMT; ROPE: TYPE = Rope.ROPE; STREAM: TYPE = IO.STREAM; WhichTempLog: TYPE = WalnutKernelDefs.WhichTempLog; TransInfo: TYPE = RECORD [ fileStore: AlpInstance.FileStore, handle: AlpInstance.Handle]; nullTransInfo: TransInfo _ [NIL, NIL]; InternalLogInfo: TYPE = REF InternalLogInfoObject; InternalLogInfoObject: TYPE = RECORD [ readStream: STREAM, -- not used for expungeLog writeStream: STREAM, name: ROPE _ NIL, -- rope name of this log internalFileID: INT, logSeqNo: INT _ -1, logSeqNoPos: INT _ -1 -- position in rootLog of logSeqNo ]; MiscLogInfo: TYPE = REF MiscLogInfoObject; MiscLogInfoObject: TYPE = RECORD [ stream: STREAM, name: ROPE _ NIL, trans: AlpTransaction.Handle _ NIL, rootOpenFile: FS.OpenFile _ FS.nullOpenFile -- may have its own transaction ]; RootLog: TYPE = REF RootLogObject; RootLogObject: TYPE = RECORD [ name: ROPE, createDate: GMT _ BasicTime.nullGMT, readOnly: BOOL _ FALSE, keyFromRootFile: ROPE _ NIL, mailForFromRootFile: ROPE _ NIL, dbNameFromRootFile: ROPE _ NIL, segmentID: WalnutKernelDefs.SegmentID _ defaultSegmentID, transInfo: TransInfo _ nullTransInfo, alpTH: AlpTransaction.Handle _ NIL, -- current transaction rootOpenFileForLogs: FS.OpenFile _ FS.nullOpenFile, currentLog: InternalLogInfo _ NIL, -- only one log for now expungeLog: InternalLogInfo _ NIL, newMailLog: MiscLogInfo _ NIL, readArchiveLog: MiscLogInfo _ NIL ]; TransID: TYPE = MACHINE DEPENDENT RECORD [ -- taken from ConcreteTransID randomBits: INT, idOnFileStore: INT, fileStore: AlpineEnvironment.VolumeID ]; defaultSegmentID: WalnutKernelDefs.SegmentID _ [$Walnut, 0]; <<>> <> <<>> rootLog: RootLog _ NIL; statsProc: PROC[ROPE] _ NIL; seriousDebugging: BOOL _ FALSE; BogusDBTrans: SIGNAL = CODE; currentVolumeGroupID: AlpTransaction.VolumeGroupID; <> <> <> fileOptions: AlpineFS.FileOptions = [ updateCreateTime: FALSE, -- is done explicitly referencePattern: sequential, recoveryOption: $log, finishTransOnClose: FALSE]; <> rootBufferOption: FS.StreamBufferParms = [vmPagesPerBuffer: 4, nBuffers: 1]; alpineStreamOptions: AlpineFS.StreamOptions _ [ tiogaRead: FALSE, commitAndReopenTransOnFlush: FALSE, -- just flush truncatePagesOnClose: FALSE, finishTransOnClose: FALSE, closeFSOpenFileOnClose: TRUE -- Note: this option is changed for opening temp logs when doing copies, so that we can Unlock the file before closing it ]; <<>> <<>> Open: PUBLIC PROC[ rootName: ROPE, readOnly: BOOL _ FALSE, newSegmentOk: BOOL _ FALSE] RETURNS[ key, mailFor: ROPE, rootFileCreateDate: GMT, segment: WalnutDefs.Segment, isReadOnly: BOOL] = { newMailLogName, readArchiveLogName: ROPE; logInfoList: LIST OF WalnutKernelDefs.LogInfoFromRoot; fileName, dbName: ROPE; isReadOnly _ FALSE; IF rootLog # NIL THEN ERROR WalnutDefs.Error[$root, $RootAlreadyOpen, "Must do a Close"]; IF seriousDebugging THEN { trans: AlpTransaction.Handle _ NARROW[DB.GetSegmentInfo[$Walnut].trans]; IF trans # NIL THEN SIGNAL BogusDBTrans; DB.AbortCache[trans]; }; BEGIN logInfoCount: INT _ 0; reason: ATOM; openFile: FS.OpenFile; rStream: STREAM; rootLog _ NEW[RootLogObject _ [name: rootName, readOnly: readOnly]]; rootLog.transInfo _ [fileStore: FileStoreForRoot[rootName]]; rootLog.alpTH _ CreateTrans[]; currentVolumeGroupID _ AlpTransaction.GetNextVolumeGroup[rootLog.alpTH]; <> BEGIN ENABLE FS.Error => { reason _ error.code; GOTO error }; openFile _ AlpineFS.Open[ name: rootName, access: $write, lock: [$write, $wait], options: fileOptions, transHandle: rootLog.alpTH]; FS.Close[openFile]; EXITS error => IF reason = $unknownFile THEN ERROR WalnutDefs.Error[$root, $RootNotFound, rootName] ELSE IF reason = $accessDenied THEN isReadOnly _ TRUE ELSE ERROR WalnutDefs.Error[$root, reason, rootName]; END; rootLog.readOnly _ isReadOnly; BEGIN ENABLE FS.Error => { reason _ error.code; GOTO error }; openFile _ AlpineFS.Open[ name: rootName, access: $read, lock: [$read, $wait], options: fileOptions, transHandle: rootLog.alpTH]; rStream _ AlpineFS.StreamFromOpenFile[ openFile: openFile, streamOptions: alpineStreamOptions, streamBufferParms: streamBufferOption]; rootFileCreateDate _ rootLog.createDate _ FS.GetInfo[openFile].created; EXITS error => ERROR WalnutDefs.Error[$root, reason, rootName]; END; <> BEGIN ENABLE IO.EndOfStream, IO.Error => GOTO cantParse; finished: BOOL _ FALSE; rStream.SetIndex[0]; DO curPos: INT _ rStream.GetIndex[]; rootEntry: WalnutKernelDefs.RootEntry _ NextRootEntry[rStream]; DO IF rootEntry.ident.Equal["Key", FALSE] THEN { key _ rootEntry.value; EXIT }; IF rootEntry.ident.Equal["MailFor", FALSE] THEN { mailFor _ rootEntry.value; EXIT }; IF rootEntry.ident.Equal["Database", FALSE] THEN { dbName _ rootEntry.value; EXIT }; IF rootEntry.ident.Equal["NewMailLog", FALSE] THEN { newMailLogName _ rootEntry.value; EXIT }; IF rootEntry.ident.Equal["ReadArchiveLog", FALSE] THEN { readArchiveLogName _ rootEntry.value; EXIT }; IF rootEntry.ident.Equal["LogInfo", FALSE] THEN { logInfoList _ CONS[rootEntry.info, logInfoList]; logInfoCount _ logInfoCount + 1; EXIT }; IF rootEntry.ident.Equal["End", FALSE] THEN { finished _ TRUE; EXIT }; rStream.Close[ ! IO.Error, FS.Error => CONTINUE]; FinishTrans[rootLog.alpTH, FALSE, FALSE ! WalnutDefs.Error => CONTINUE]; rootLog _ NIL; ERROR WalnutDefs.Error[$log, $BadRootFile, IO.PutFR["Unknown entry %g in RootFile at pos %g", IO.rope[rootEntry.ident], IO.int[curPos]]]; ENDLOOP; IF finished THEN EXIT; ENDLOOP; EXITS cantParse => { rStream.Close[ ! IO.Error, FS.Error => CONTINUE]; FinishTrans[rootLog.alpTH, FALSE, FALSE ! WalnutDefs.Error => CONTINUE]; rootLog _ NIL; ERROR WalnutDefs.Error[$log, $BadRootFile, "Couldn't parse root file"]; }; END; rStream.Close[]; rStream_ NIL; -- not needed IF key.Length[] = 0 OR dbName.Length[] = 0 OR mailFor.Length[] = 0 OR newMailLogName.Length[] = 0 OR readArchiveLogName.Length[] = 0 OR logInfoCount # 2 THEN ERROR WalnutDefs.Error[ $root, $BadRootFile, "Incomplete RootFile - missing entries"]; <> BEGIN cp: FS.ComponentPositions; fn: ROPE; rServer, oServer: ROPE; CheckServer: PROC[name: ROPE] = { [fn, cp, ] _ FS.ExpandName[name]; oServer _ Rope.Substr[fn, cp.server.start, cp.server.length]; IF ~Rope.Equal[rServer, oServer, FALSE] THEN { rName: ROPE _ rootLog.name; FS.Close[openFile ! FS.Error => CONTINUE]; FinishTrans[rootLog.alpTH, FALSE, FALSE ! WalnutDefs.Error => CONTINUE]; rootLog _ NIL; ERROR WalnutDefs.Error[$root, $BadRootFile, IO.PutFR["server for file %g does not agree with that for %g", IO.rope[name], IO.rope[rName]] ]; }; }; [fn, cp, ] _ FS.ExpandName[rootLog.name]; rServer _ Rope.Substr[fn, cp.server.start, cp.server.length]; CheckServer[dbName]; CheckServer[newMailLogName]; CheckServer[readArchiveLogName]; CheckServer[logInfoList.first.fileName]; CheckServer[logInfoList.rest.first.fileName]; END; rootLog.keyFromRootFile _ key; rootLog.mailForFromRootFile _ mailFor; rootLog.dbNameFromRootFile _ dbName; FS.Close[openFile ! FS.Error => CONTINUE]; FinishTrans[rootLog.alpTH, FALSE, FALSE ! WalnutDefs.Error => CONTINUE]; rootLog.alpTH _ NIL; END; <> FOR lil: LIST OF WalnutKernelDefs.LogInfoFromRoot _ logInfoList, lil.rest UNTIL lil = NIL DO seqNo: ROPE _ lil.first.logSeqNo; fileName _ lil.first.fileName; IF seqNo.Fetch[0] # 'E THEN { -- this is the current log rootLog.currentLog _ NEW[InternalLogInfoObject _ [ name: fileName, internalFileID: lil.first.internalFileID, logSeqNo: Convert.IntFromRope[seqNo], logSeqNoPos: lil.first.seqNoPos ]]; } ELSE -- this is the expunge log rootLog.expungeLog _ NEW[InternalLogInfoObject _ [ name: fileName, internalFileID: lil.first.internalFileID, logSeqNo: -1, logSeqNoPos: lil.first.seqNoPos ]]; ENDLOOP; IF ~isReadOnly THEN { rootLog.newMailLog _ NEW[MiscLogInfoObject _ [name: newMailLogName]]; rootLog.readArchiveLog _ NEW[MiscLogInfoObject _ [name: readArchiveLogName]]; } ELSE rootLog.expungeLog _ NIL; -- is readOnly, can't have expungeLog segment _ rootLog.segmentID.segment; DeclareDBSegment[rootLog, newSegmentOk]; }; StartTransaction: PUBLIC PROC = { IF rootLog.alpTH # NIL THEN { BEGIN IF seriousDebugging THEN CheckDBTrans["StartTransaction"]; DB.EndTransaction[rootLog.alpTH ! DB.Aborted, DB.Error, DB.Failure => GOTO err]; EXITS err => DB.AbortCache[rootLog.alpTH ! DB.Aborted, DB.Error, DB.Failure => CONTINUE]; END; FS.Close[rootLog.rootOpenFileForLogs ! FS.Error => CONTINUE]; FinishTrans[alpTH: rootLog.alpTH, abort: FALSE, continue: FALSE ! WalnutDefs.Error => CONTINUE]; }; [rootLog.rootOpenFileForLogs, rootLog.alpTH] _ GetRootReadLock[]; OpenDBTrans[rootLog]; }; Shutdown: PUBLIC PROC = { IF rootLog = NIL THEN RETURN; CloseInternalLog[rootLog.currentLog]; CloseInternalLog[rootLog.expungeLog]; CloseMiscLog[mli: rootLog.readArchiveLog]; CloseMiscLog[mli: rootLog.newMailLog, abortIfExternal: TRUE]; IF seriousDebugging THEN CheckDBTrans["Shutdown"]; DB.EndTransaction[rootLog.alpTH ! DB.Aborted, DB.Error, DB.Failure => CONTINUE]; <<>> <> FS.Close[rootLog.rootOpenFileForLogs ! FS.Error => CONTINUE]; FinishTrans[alpTH: rootLog.alpTH, abort: FALSE, continue: FALSE ! WalnutDefs.Error => CONTINUE]; rootLog_ NIL; }; CloseTransaction: PUBLIC PROC = { IF rootLog = NIL THEN RETURN; CloseInternalLog[rootLog.currentLog]; CloseInternalLog[rootLog.expungeLog]; CloseMiscLog[rootLog.readArchiveLog]; IF seriousDebugging THEN CheckDBTrans["CloseTransaction"]; BEGIN DB.EndTransaction[rootLog.alpTH ! DB.Aborted, DB.Error, DB.Failure => GOTO err]; EXITS err => DB.AbortCache[rootLog.alpTH ! DB.Aborted, DB.Error, DB.Failure => CONTINUE]; END; FS.Close[rootLog.rootOpenFileForLogs ! FS.Error => CONTINUE]; rootLog.rootOpenFileForLogs _ FS.nullOpenFile; FinishTrans[rootLog.alpTH, FALSE, FALSE ! WalnutDefs.Error => CONTINUE]; rootLog.alpTH _ NIL; }; CommitAndContinue: PUBLIC PROC[flushDB: BOOL _ TRUE] = { eCode: ATOM; BEGIN ENABLE BEGIN DB.Aborted => {eCode _ $TransactionAbort; GOTO error }; DB.Error => {eCode _ $DBError; GOTO error }; DB.Failure => {eCode _ $DatabaseInaccessible; GOTO error }; END; IF seriousDebugging THEN CheckDBTrans["CommitAndContinue"]; IF flushDB THEN DB.FlushCache[rootLog.alpTH]; EXITS error => ERROR WalnutDefs.Error[$db, eCode, "Can't flush"]; END; FinishTrans[alpTH: rootLog.alpTH, abort: FALSE, continue: TRUE]; AlpTransaction.UnlockOwnerDB[rootLog.alpTH, currentVolumeGroupID] }; AbortTransaction: PUBLIC PROC = { IF rootLog = NIL THEN RETURN; IF rootLog.alpTH = NIL THEN { IF seriousDebugging THEN StatsReport["\nAbort of NIL trans"]; RETURN }; CloseInternalLog[rootLog.currentLog]; CloseInternalLog[rootLog.expungeLog]; CloseMiscLog[rootLog.readArchiveLog]; IF seriousDebugging THEN CheckDBTrans["AbortTransaction"]; DB.AbortCache[rootLog.alpTH]; FinishTrans[alpTH: rootLog.alpTH, abort: TRUE, continue: FALSE]; rootLog.alpTH _ NIL; }; EraseDB: PUBLIC PROC = { code: ATOM; IF rootLog = NIL THEN ERROR WalnutDefs.Error[$root, $RootNotOpen, "Trying to do an EraseDB"]; IF seriousDebugging THEN CheckDBTrans["EraseDB"]; DB.EndTransaction[rootLog.alpTH ! DB.Aborted, DB.Error, DB.Failure => CONTINUE]; DB.DeclareSegment[ filePath: rootLog.dbNameFromRootFile, segment: rootLog.segmentID.segment, number: rootLog.segmentID.index, readonly: FALSE, nPagesInitial: 256, nPagesPerExtent: 256]; BEGIN ENABLE BEGIN DB.Error, DB.Failure => { code _ $DBError; GOTO error}; DB.Failure => { code _ $DatabaseInaccessible; GOTO error}; DB.Aborted => { code _ $TransactionAbort; GOTO error}; END; DB.EraseSegment[rootLog.segmentID.segment, rootLog.alpTH]; -- leaves trans open EXITS error => { DB.AbortCache[rootLog.alpTH]; ERROR WalnutDefs.Error[$db, code, "Can't erase"]; }; END; }; FlushAndContinue: PUBLIC PROC[strm: STREAM] = { mli: MiscLogInfo; IF strm # (mli _ rootLog.newMailLog).stream THEN ERROR WalnutDefs.Error[$root, $UnknownStream]; WalnutStream.FlushStream[strm: strm, setCreateDate: TRUE]; FinishTrans[alpTH: mli.trans, abort: FALSE, continue: TRUE] }; GetRootFileStamp: PUBLIC PROC RETURNS[rootFileStamp: GMT] = { reason: ATOM; BEGIN ENABLE FS.Error => { reason _ error.code; GOTO error }; of: FS.OpenFile _ AlpineFS.Open[ name: rootLog.name, options: fileOptions, lock: [$read, fail], transHandle: rootLog.alpTH]; rootFileStamp _ FS.GetInfo[of].created; FS.Close[of]; EXITS error => ERROR WalnutDefs.Error[$root, reason, rootLog.name]; END; }; GetExpungeID: PUBLIC PROC RETURNS[expungeFileID: INT] = { RETURN[rootLog.expungeLog.internalFileID] }; <> AcquireWriteLock: PUBLIC PROC = { of: AlpineFS.OpenFile _ AlpineFS.OpenFileFromStream[rootLog.currentLog.writeStream]; AlpineFS.SetLockOption[of, [$write, $wait] ] }; GetCurrentLogStreams: PUBLIC PROC RETURNS[readStream, writeStream: STREAM] = { IF rootLog = NIL OR rootLog.alpTH = NIL THEN ERROR WalnutDefs.Error[$log, $TransNotOpen, "Must do Shutdown and Startup"]; IF rootLog.readOnly THEN rootLog.currentLog.readStream _ StreamOpen[name: rootLog.currentLog.name, readOnly: TRUE, trans: rootLog.alpTH] ELSE { rootLog.currentLog.writeStream _ StreamOpen[name: rootLog.currentLog.name, trans: rootLog.alpTH]; rootLog.currentLog.readStream _ AlpineFS.StreamFromOpenStream[rootLog.currentLog.writeStream]; }; RETURN[rootLog.currentLog.readStream, rootLog.currentLog.writeStream] }; ReleaseWriteLock: PUBLIC PROC RETURNS[readStream, writeStream: STREAM] = { <<-- uses same trans as before>> CloseInternalLog[rootLog.currentLog]; rootLog.currentLog.writeStream _ StreamOpen[name: rootLog.currentLog.name, trans: rootLog.alpTH]; rootLog.currentLog.readStream _ AlpineFS.StreamFromOpenStream[rootLog.currentLog.writeStream]; RETURN[rootLog.currentLog.readStream, rootLog.currentLog.writeStream]; }; ReturnCurrentLogStreams: PUBLIC PROC = { IF rootLog = NIL THEN RETURN; CloseInternalLog[rootLog.currentLog]; CloseInternalLog[rootLog.expungeLog]; FinishTrans[rootLog.alpTH, FALSE, TRUE]; }; <> GetStreamsForExpunge: PUBLIC PROC RETURNS[ currentStream, expungeStream: STREAM, keyIs: ROPE, expungeFileID, logSeqNo: INT] = { <<-- assert: have a write lock on the currentLog, the expungeLog is not open>> IF rootLog.expungeLog.writeStream # NIL THEN rootLog.expungeLog.writeStream.Close[ ! IO.Error, FS.Error => CONTINUE]; rootLog.expungeLog.writeStream _ NIL; rootLog.expungeLog.writeStream _ StreamOpen[ name: rootLog.expungeLog.name, trans: rootLog.alpTH, exclusive: TRUE]; RETURN[ rootLog.currentLog.readStream, rootLog.expungeLog.writeStream, rootLog.keyFromRootFile, rootLog.expungeLog.internalFileID, rootLog.currentLog.logSeqNo] -- *****NOTE FIRST AND LAST VALUES }; SwapLogs: PUBLIC PROC[expungeFileID: INT, newRootStamp: GMT] RETURNS[newLogLen: INT] = { <> tempLog: InternalLogInfo; reason: ATOM; IF rootLog.currentLog.internalFileID = expungeFileID THEN -- already did swap RETURN[rootLog.currentLog.writeStream.GetLength[]]; IF rootLog.expungeLog.internalFileID # expungeFileID THEN ERROR WalnutDefs.Error[$expungeLog, $WrongFileID, IO.PutFR["FileID is: %g, FileID expected is: %g", IO.int[rootLog.expungeLog.internalFileID], IO.int[expungeFileID]] ]; IF rootLog.expungeLog.writeStream = NIL THEN ERROR WalnutDefs.Error[$expungeLog, $LogNotOpen, "Doing SwapLogs"]; BEGIN ENABLE FS.Error => { reason _ error.code; GOTO error }; of: FS.OpenFile _ AlpineFS.Open[ name: rootLog.name, wantedCreatedTime: rootLog.createDate, access: $write, lock: [$write, $wait], transHandle: rootLog.alpTH]; rStream: STREAM _ AlpineFS.StreamFromOpenFile[ openFile: of, accessRights: $write, streamOptions: alpineStreamOptions, streamBufferParms: streamBufferOption]; rStream.SetIndex[rootLog.currentLog.logSeqNoPos]; rStream.PutRope["Expunge"]; rStream.SetIndex[rootLog.expungeLog.logSeqNoPos]; rStream.PutRope[IO.PutFR["%07d", IO.int[rootLog.currentLog.logSeqNo]]]; FS.SetByteCountAndCreatedTime[of, -1, newRootStamp]; WalnutStream.SetHighWaterMark[rootLog.currentLog.writeStream, 0, 0]; newLogLen _ rootLog.expungeLog.writeStream.GetLength[]; rStream.Close[]; -- commits rootFile and other streams rootLog.createDate _ newRootStamp; CloseInternalLog[rootLog.currentLog]; -- release the currentLog streams CloseInternalLog[rootLog.expungeLog]; -- will want only read Lock -- swap the rootLog variables currentLog & expungeLog tempLog _ rootLog.currentLog; rootLog.currentLog _ rootLog.expungeLog; rootLog.expungeLog _ tempLog; rootLog.currentLog.logSeqNo _ rootLog.expungeLog.logSeqNo; rootLog.expungeLog.logSeqNo _ -1; EXITS error => ERROR WalnutDefs.Error[$root, reason, "During SwapLogs"]; END; }; ReturnExpungeStreams: PUBLIC PROC = { IF rootLog.expungeLog.writeStream # NIL THEN { rootLog.expungeLog.writeStream.Close[ ! FS.Error, IO.Error => CONTINUE]; rootLog.expungeLog.writeStream _ NIL; }; }; <> PrepareToCopyTempLog: PUBLIC PROC[ which: WhichTempLog, pagesAlreadyCopied: INT, reportProc: PROC[msg1, msg2, msg3: ROPE _ NIL]] RETURNS[currentStream, tempStream: STREAM] = { <> mli: MiscLogInfo _ OpenTempStream[which]; tempLogLen, pagesNeeded, actualPages, currentInUsePages: INT; of: FS.OpenFile; currentStream _ rootLog.currentLog.writeStream; IF mli.stream = NIL THEN RETURN[currentStream, NIL]; currentInUsePages _ FS.PagesForBytes[rootLog.currentLog.writeStream.GetLength[]]; tempLogLen _ mli.stream.GetLength[]; <> pagesNeeded _ FS.PagesForBytes[tempLogLen+200] + currentInUsePages - pagesAlreadyCopied; actualPages _ FS.GetInfo[of_ AlpineFS.OpenFileFromStream[rootLog.currentLog.writeStream]].pages; BEGIN IF actualPages < pagesNeeded THEN { FS.SetPageCount[of, pagesNeeded ! FS.Error => IF error.code = $quotaExceeded THEN { reportProc["\n"]; reportProc[error.explanation]; reportProc["\n"]; GOTO quota } ELSE REJECT]; <> BEGIN streamTrans: AlpTransaction.Handle = mli.trans; FinishTrans[alpTH: streamTrans, abort: FALSE, continue: TRUE]; AlpTransaction.UnlockOwnerDB[streamTrans, currentVolumeGroupID]; END } EXITS quota => { mli.stream.Close[ ! IO.Error, FS.Error => CONTINUE]; mli.stream _ NIL; RETURN[currentStream, NIL] }; END; RETURN[currentStream, mli.stream]; }; GetStreamsForCopy: PUBLIC PROC[which: WhichTempLog] RETURNS[currentStream, tempStream: STREAM] = { tempStream _ OpenTempStream[which].stream; RETURN[rootLog.currentLog.writeStream, tempStream]; }; <<>> FinishCopy: PUBLIC PROC[which: WhichTempLog] = { <> <> mli: MiscLogInfo; pages: INT; fileHandle: FS.OpenFile; SELECT which FROM newMail => { mli _ rootLog.newMailLog; pages _ 200 }; readArchive => { mli _ rootLog.readArchiveLog; pages _ 1 }; ENDCASE => ERROR WalnutDefs.Error[$log, $UnknownLogFileType]; IF mli.stream = NIL THEN mli _ OpenTempStream[which]; fileHandle _ AlpineFS.OpenFileFromStream[mli.stream]; WalnutStream.SetHighWaterMark[strm: mli.stream, hwmBytes: 0, numPages: pages, setCreateDate: TRUE]; mli.stream.Close[ ! FS.Error, IO.Error => CONTINUE]; CommitAndContinue[]; AlpFile.UnlockFile[AlpineFS.GetAlpFileHandle[fileHandle]]; -- be careful to throw away the file lock FS.Close[fileHandle]; mli.stream _ NIL; }; AbortTempCopy: PUBLIC PROC[which: WhichTempLog, strm: STREAM] = { mli: MiscLogInfo; fileHandle: AlpFile.Handle; closeError: BOOL _ FALSE; SELECT which FROM newMail => mli _ rootLog.newMailLog; readArchive => mli _ rootLog.readArchiveLog; ENDCASE => ERROR WalnutDefs.Error[$log, $UnknownLogFileType]; IF mli.stream = NIL THEN RETURN; IF mli.stream # strm THEN ERROR WalnutDefs.Error[$root, $UnknownStreamReturned]; fileHandle _ AlpineFS.GetAlpFileHandle[file: AlpineFS.OpenFileFromStream[strm]]; strm.Close[ ! FS.Error, IO.Error => { closeError _ TRUE; CONTINUE} ]; IF ~closeError THEN AlpFile.Close[fileHandle ! AlpineFile.Unknown => CONTINUE]; mli.stream _ NIL; }; <> GetNewMailStream: PUBLIC PROC[lengthRequired: INT, pagesWanted: INT] RETURNS[strm: STREAM, actualLen: INT] = { mStream: STREAM; CheckRootStamp[]; IF rootLog.newMailLog.stream # NIL THEN RETURN[NIL, -1]; IF seriousDebugging THEN StatsReport["\n### Trans for newMailLog"]; [rootLog.newMailLog.rootOpenFile, rootLog.newMailLog.trans] _ GetRootReadLock[]; mStream _ rootLog.newMailLog.stream _ StreamOpen[ name: rootLog.newMailLog.name, trans: rootLog.newMailLog.trans, exclusive: TRUE]; IF (actualLen _ mStream.GetLength[]) < lengthRequired THEN { CloseMiscLog[rootLog.newMailLog]; RETURN[NIL, actualLen]; }; IF actualLen > lengthRequired THEN { WalnutStream.SetHighWaterMark[mStream, lengthRequired, pagesWanted, TRUE]; mStream.Flush[]; FinishTrans[alpTH: rootLog.newMailLog.trans, abort: FALSE, continue: TRUE]; }; RETURN[mStream, actualLen] }; GetReadArchiveStream: PUBLIC PROC[pagesWanted: INT _ -1] RETURNS[STREAM] = { CheckRootStamp[]; IF rootLog.readArchiveLog.stream # NIL THEN RETURN[NIL]; rootLog.readArchiveLog.stream _ StreamOpen[ name: rootLog.readArchiveLog.name, trans: rootLog.alpTH, pages: pagesWanted, exclusive: TRUE]; RETURN[rootLog.readArchiveLog.stream] }; KeyIs: PUBLIC PROC RETURNS[ROPE] = { CheckRootStamp[]; RETURN[rootLog.keyFromRootFile] }; Mailfor: PUBLIC PROC RETURNS[ROPE] = { CheckRootStamp[]; RETURN[rootLog.mailForFromRootFile] }; DBName: PUBLIC PROC RETURNS[ROPE] = { CheckRootStamp[]; RETURN[rootLog.dbNameFromRootFile] }; ReturnNewMailStream: PUBLIC PROC[newMailStream: STREAM] = { IF newMailStream # rootLog.newMailLog.stream THEN RETURN; rootLog.newMailLog.stream _ NIL; newMailStream.Close[ ! FS.Error, IO.Error => CONTINUE]; IF rootLog.newMailLog.rootOpenFile # FS.nullOpenFile THEN { trans: AlpTransaction.Handle _ rootLog.newMailLog.trans; FS.Close[rootLog.newMailLog.rootOpenFile ! FS.Error => CONTINUE]; rootLog.newMailLog.rootOpenFile _ FS.nullOpenFile; rootLog.newMailLog.trans _ NIL; FinishTrans[alpTH: trans, abort: FALSE, continue: FALSE]; }; }; ReturnReadArchiveStream: PUBLIC PROC[readArchiveStream: STREAM] = { IF readArchiveStream # rootLog.readArchiveLog.stream THEN RETURN; rootLog.readArchiveLog.stream _ NIL; readArchiveStream.Close[ ! FS.Error, IO.Error => CONTINUE]; -- assume flushed }; StreamToWhich: PUBLIC PROC[stream: STREAM] RETURNS[ROPE] = { IF stream = rootLog.currentLog.readStream OR stream = rootLog.currentLog.writeStream OR stream = rootLog.expungeLog.readStream THEN RETURN["currentLog"]; IF stream = rootLog.expungeLog.writeStream THEN RETURN["expungeLog"]; IF stream = rootLog.newMailLog.stream THEN RETURN["newMailLog"]; IF stream = rootLog.readArchiveLog.stream THEN RETURN["readArchiveLog"]; RETURN["unknownLog"]; }; <> RegisterStatsProc: PUBLIC PROC[proc: PROC[ROPE]] = { statsProc _ proc }; UnregisterStatsProc: PUBLIC PROC[proc: PROC[ROPE]] = { IF statsProc = proc THEN statsProc _ NIL }; StatsReport: PUBLIC PROC[r: ROPE] = { IF statsProc # NIL THEN statsProc[r]; }; <> CreateTrans: PROC RETURNS[alpTH: AlpTransaction.Handle] = { needRetry: BOOL _ FALSE; haveRetried: BOOL _ FALSE; DO IF rootLog.transInfo.handle = NIL THEN rootLog.transInfo.handle _ AlpInstance.Create[rootLog.transInfo.fileStore ! AlpInstance.Failed => IF why = authenticateFailed THEN ERROR WalnutDefs.Error[$root, $BadUserPassword, "Can't create AlpInstance.Handle"] ELSE ERROR WalnutDefs.Error[$root, $communication, rootLog.transInfo.fileStore] ]; alpTH _ AlpTransaction.Create[rootLog.transInfo.handle ! AlpTransaction.OperationFailed => IF why = busy THEN ERROR WalnutDefs.Error[$root, $serverBusy, rootLog.transInfo.fileStore]; RPC.CallFailed => TRUSTED { IF why = unbound THEN {needRetry _ TRUE; CONTINUE} <> ELSE IF why IN [timeout .. busy] THEN ERROR WalnutDefs.Error[$root, $communication, rootLog.transInfo.fileStore]} ]; IF NOT needRetry THEN EXIT; IF haveRetried THEN ERROR WalnutDefs.Error[$root, $communication, rootLog.transInfo.fileStore]; needRetry _ FALSE; haveRetried _ TRUE; ENDLOOP; IF seriousDebugging THEN StatsReport[IO.PutFR["\n&&& CreateTransID: %g", IO.rope[TransIDToRope[alpTH]] ] ]; }; FinishTrans: PROC[alpTH: AlpTransaction.Handle, abort: BOOL, continue: BOOL] = { outcome: AlpTransaction.Outcome; IF alpTH = NIL THEN RETURN; IF seriousDebugging THEN StatsReport[IO.PutFR["\n***[old: %g, abort: %g, continue: %g]", IO.rope[TransIDToRope[alpTH]], IO.bool[abort], IO.bool[continue]] ]; outcome _ alpTH.Finish[ requestedOutcome: IF abort THEN abort ELSE commit, continue: continue ! RPC.CallFailed => IF why IN [timeout .. busy] THEN { IF abort THEN {outcome _ abort; CONTINUE} -- so can escape when no communication! ELSE ERROR WalnutDefs.Error[$root, $communication, rootLog.transInfo.fileStore] } ELSE ERROR WalnutDefs.Error[$root, $ProtocolError, rootLog.transInfo.fileStore] ]; IF seriousDebugging AND continue THEN StatsReport[IO.PutFR[" NewTransID: %g", IO.rope[TransIDToRope[alpTH]]] ]; IF NOT abort AND outcome # commit THEN { StatsReport[IO.PutFR["\n TransactionAbort on %g", IO.rope[rootLog.transInfo.fileStore]]]; ERROR WalnutDefs.Error[$root, $TransactionAbort, rootLog.transInfo.fileStore]; }; }; CheckDBTrans: PROC[who: ROPE] = { dbTrans: AlpTransaction.Handle _ NARROW[DB.GetSegmentInfo[rootLog.segmentID.segment].trans]; IF dbTrans = rootLog.alpTH THEN RETURN; SIGNAL BogusDBTrans; StatsReport[IO.PutFR["\n-> -> During %g: DBTrans is: %g, rootTrans is: %g", IO.rope[who], IO.rope[TransIDToRope[rootLog.alpTH]], IO.rope[TransIDToRope[dbTrans]] ] ]; }; TransIDToRope: PROC[alpTH: AlpTransaction.Handle] RETURNS[ROPE] = { trueTransID: TransID; IF alpTH = NIL THEN RETURN["NIL"]; trueTransID _ LOOPHOLE[alpTH.transID]; RETURN[Convert.RopeFromInt[trueTransID.idOnFileStore]]; }; GetRootReadLock: PROC RETURNS[of: FS.OpenFile, alpTH: AlpTransaction.Handle] = { reason: ATOM; rootFileStamp: GMT; alpTH _ CreateTrans[]; BEGIN ENABLE FS.Error => { reason _ error.code; GOTO error}; of _ AlpineFS.Open[ name: rootLog.name, lock: [$read, $fail], transHandle: alpTH, options: rootOpenOptions ]; rootFileStamp _ FS.GetInfo[of].created; EXITS error => { FS.Close[of ! FS.Error => CONTINUE]; FinishTrans[alpTH, FALSE, FALSE]; ERROR WalnutDefs.Error[$root, reason, rootLog.name]; }; END; CheckStamp[rootFileStamp]; }; rootOpenOptions: AlpineFS.FileOptions _ [ updateCreateTime: FALSE, referencePattern: sequential, recoveryOption: $log, finishTransOnClose: FALSE]; CloseInternalLog: PROC[ili: InternalLogInfo] = { IF ili = NIL THEN RETURN; IF ili.writeStream # NIL THEN { ili.writeStream.Close[ ! FS.Error, IO.Error => CONTINUE]; ili.writeStream _ NIL; }; IF ili.readStream # NIL THEN { ili.readStream.Close[ ! FS.Error, IO.Error => CONTINUE]; ili.readStream _ NIL; }; }; CloseMiscLog: PROC[mli: MiscLogInfo, abortIfExternal: BOOL _ FALSE] = { <> strm: STREAM; IF mli = NIL THEN RETURN; strm _ mli.stream; mli.stream _ NIL; IF mli.rootOpenFile # NIL THEN { -- is external of: FS.OpenFile _ mli.rootOpenFile; mli.rootOpenFile _ FS.nullOpenFile; IF strm # NIL THEN strm.Close[! IO.Error, FS.Error => CONTINUE]; IF mli.trans # NIL THEN FinishTrans[alpTH: mli.trans, abort: abortIfExternal, continue: FALSE ! FS.Error => CONTINUE]; FS.Close[of ! FS.Error => CONTINUE]; } ELSE IF strm # NIL THEN strm.Close[ ! FS.Error, IO.Error => CONTINUE]; }; OpenTempStream: PROC[which: WhichTempLog] RETURNS[mli: MiscLogInfo] = { SELECT which FROM newMail => mli _ rootLog.newMailLog; readArchive => mli _ rootLog.readArchiveLog; ENDCASE => ERROR WalnutDefs.Error[$log, $UnknownLogFileType]; IF mli.stream # NIL THEN { mli.stream.Close[ ! IO.Error, FS.Error => CONTINUE]; mli.stream _ NIL; }; mli.stream _ StreamOpen[name: mli.name, trans: rootLog.alpTH, exclusive: FALSE, closeFSOpenFile: FALSE]; mli.trans _ rootLog.alpTH }; CheckRootStamp: PROC[] = { CheckStamp[GetRootFileStamp[]] }; CheckStamp: PROC[created: GMT] = { IF created # rootLog.createDate THEN ERROR WalnutDefs.Error[$root, $WrongCreateDate, IO.PutFR[" Create date is : %g, Create date expected is: %g", IO.time[created], IO.time[rootLog.createDate]]]; }; NextRootEntry: PROC[strm: STREAM] RETURNS[rte: WalnutKernelDefs.RootEntry] = { []_ strm.SkipWhitespace[flushComments: TRUE]; rte.ident _ strm.GetTokenRope[].token; IF rte.ident.Equal["End", FALSE] THEN RETURN; []_ strm.SkipWhitespace[flushComments: TRUE]; IF rte.ident.Equal["LogInfo", FALSE] THEN { lif: WalnutKernelDefs.LogInfoFromRoot; lif.fileName _ strm.GetTokenRope[IO.IDProc].token; []_ strm.SkipWhitespace[flushComments: TRUE]; lif.internalFileID _ strm.GetInt[]; []_ strm.SkipWhitespace[flushComments: TRUE]; lif.seqNoPos _ strm.GetIndex[]; lif.logSeqNo _ strm.GetTokenRope[].token; []_ strm.SkipWhitespace[flushComments: TRUE]; rte.info _ lif; } ELSE IF rte.ident.Equal["Key", FALSE] THEN rte.value _ strm.GetLineRope[] ELSE { rte.value _ strm.GetTokenRope[IO.IDProc].token; []_ strm.SkipWhitespace[flushComments: TRUE]; }; RETURN[rte]; }; FileStoreForRoot: PROC[rootName: ROPE] RETURNS[fileStore: ROPE] = { cp: FS.ComponentPositions _ FS.ExpandName[rootName].cp; IF cp.server.length = 0 THEN WalnutDefs.Error[$root, $InvalidName, rootName]; RETURN[rootName.Substr[cp.server.start, cp.server.length]]; }; <> StreamOpen: PROC[name: ROPE, readOnly: BOOL_ FALSE, pages: INT _ 200, trans: AlpTransaction.Handle_ NIL, exclusive: BOOL _ FALSE, closeFSOpenFile: BOOL _ TRUE] RETURNS [strm: STREAM] = { openFile: AlpineFS.OpenFile _ []; IF readOnly THEN { openFile _ AlpineFS.Open[ name: name, options: alpineFileOptions, lock: [$read, $fail], transHandle: trans]; strm _ AlpineFS.StreamFromOpenFile[openFile: openFile, streamOptions: alpineStreamOptions, streamBufferParms: streamBufferOption]; } ELSE { actualPages: INT; openFile _ AlpineFS.Open[ name: name, access: $write, lock: IF exclusive THEN [$write, $fail] ELSE [$read, $wait], options: alpineFileOptions, transHandle: trans]; actualPages _ FS.GetInfo[openFile].pages; IF pages > actualPages THEN FS.SetPageCount[openFile, pages]; <> alpineStreamOptions.closeFSOpenFileOnClose _ closeFSOpenFile; strm _ AlpineFS.StreamFromOpenFile[openFile: openFile, accessRights: $write, initialPosition: $start, streamOptions: alpineStreamOptions, streamBufferParms: streamBufferOption]; }; }; alpineFileOptions: AlpineFS.FileOptions _ [ updateCreateTime: FALSE, referencePattern: sequential, recoveryOption: $log, finishTransOnClose: FALSE ]; streamBufferOption: FS.StreamBufferParms = [vmPagesPerBuffer: 4, nBuffers: 4]; nFreeTuples: NAT _ 256; DeclareDBSegment: PROC[rootLog: RootLog, newSegmentOk: BOOL] = { protectionViolation: BOOL _ FALSE; DB.DeclareSegment[ filePath: rootLog.dbNameFromRootFile, segment: rootLog.segmentID.segment, number: rootLog.segmentID.index, readonly: rootLog.readOnly, createIfNotFound: newSegmentOk, nPagesInitial: 256, nPagesPerExtent: 256 ]; }; OpenDBTrans: PROC[rootLog: RootLog] = { inaccessible: BOOL _ FALSE; reason: ROPE _ "unknown"; DB.OpenTransaction[segment: rootLog.segmentID.segment, useTrans: rootLog.alpTH ! DB.Aborted => GOTO abort; DBEnvironment.Error => { IF code = TransactionAlreadyOpen THEN CONTINUE; inaccessible _ TRUE; SELECT code FROM ProtectionViolation => reason _ "database is read only"; DirectoryNotFound => reason _ "directory not found"; FileNotFound => reason _ "file not found"; ServerNotFound => reason _ "server not found"; QuotaExceeded => reason _ "quota exceeded"; ENDCASE => REJECT; CONTINUE; }; DBEnvironment.Failure => { inaccessible _ TRUE; SELECT what FROM $communication => reason _ "cannot connect to server"; ENDCASE => REJECT; CONTINUE; }; ]; IF inaccessible THEN ERROR WalnutDefs.Error[$db, $DatabaseInaccessible, reason]; EXITS abort => ERROR WalnutDefs.Error[$db, $TransactionAbort, "During open database"]; }; DB.Initialize[nFreeTuples: nFreeTuples]; END.