<> <> <> <> <> <<>> <> DIRECTORY AlpineEnvironment USING [VolumeID], AlpFile USING [LockOption, UnlockFile, Handle, Close], AlpineFile USING [Unknown], AlpineFS USING [OpenFile, FileOptions, StreamOptions, GetAlpFileHandle, Open, OpenFileFromStream, SetLockOption, StreamFromOpenFile, StreamFromOpenStream], AlpInstance USING [Failed, FileStore, Handle, Create], AlpTransaction USING [Handle, OperationFailed, Outcome, Create, Finish, GetNextVolumeGroup, UnlockOwnerDB, VolumeGroupID], BasicTime USING [GMT, nullGMT, Now], Convert USING [IntFromRope, RopeFromInt], DB USING [Aborted, Error, Failure, AbortTransaction, CloseTransaction, DeclareSegment, EraseSegment, Initialize, MakeTransHandle, MarkTransaction, OpenTransaction], DBCommon USING [TransactionHandle], FS USING [ComponentPositions, Error, nullOpenFile, Close, GetInfo, ExpandName, OpenFile, PagesForBytes, SetPageCount, StreamBufferParms, SetByteCountAndCreatedTime], IO, Rope, RPC USING [CallFailed], WalnutDefs USING [Error, Segment], WalnutKernelDefs USING [LogInfoFromRoot, RootEntry, SegmentID, WhichTempLog], WalnutRoot USING [], WalnutStream USING [FlushStream, SetHighWaterMark]; WalnutRootImpl: CEDAR MONITOR IMPORTS AlpFile, AlpineFile, AlpineFS, AlpInstance, AlpTransaction, BasicTime, Convert, DB, FS, IO, Rope, RPC, WalnutDefs, WalnutStream EXPORTS WalnutRoot = BEGIN <<>> <> <<>> <> <<>> <> GMT: TYPE = BasicTime.GMT; ROPE: TYPE = Rope.ROPE; STREAM: TYPE = IO.STREAM; TransactionHandle: TYPE = DBCommon.TransactionHandle; WhichTempLog: TYPE = WalnutKernelDefs.WhichTempLog; TransInfo: TYPE = RECORD [ fileStore: AlpInstance.FileStore, handle: AlpInstance.Handle]; nullTransInfo: TransInfo _ [NIL, NIL]; InternalLogInfo: TYPE = REF InternalLogInfoObject; InternalLogInfoObject: TYPE = RECORD [ readStream: STREAM, -- not used for expungeLog writeStream: STREAM, name: ROPE _ NIL, -- rope name of this log internalFileID: INT, logSeqNo: INT _ -1, logSeqNoPos: INT _ -1 -- position in rootLog of logSeqNo ]; MiscLogInfo: TYPE = REF MiscLogInfoObject; MiscLogInfoObject: TYPE = RECORD [ stream: STREAM, name: ROPE _ NIL, trans: AlpTransaction.Handle _ NIL, rootOpenFile: FS.OpenFile _ FS.nullOpenFile -- may have its own transaction ]; RootLog: TYPE = REF RootLogObject; RootLogObject: TYPE = RECORD [ name: ROPE, createDate: GMT _ BasicTime.nullGMT, readOnly: BOOL _ FALSE, keyFromRootFile: ROPE _ NIL, mailForFromRootFile: ROPE _ NIL, dbNameFromRootFile: ROPE _ NIL, segmentID: WalnutKernelDefs.SegmentID _ defaultSegmentID, transInfo: TransInfo _ nullTransInfo, alpTH: AlpTransaction.Handle _ NIL, -- current transaction dbTransH: TransactionHandle _ NIL, -- form needed by cypress rootOpenFileForLogs: FS.OpenFile _ FS.nullOpenFile, currentLog: InternalLogInfo _ NIL, -- only one log for now expungeLog: InternalLogInfo _ NIL, newMailLog: MiscLogInfo _ NIL, readArchiveLog: MiscLogInfo _ NIL ]; TransID: TYPE = MACHINE DEPENDENT RECORD [ -- taken from ConcreteTransID randomBits: INT, idOnFileStore: INT, fileStore: AlpineEnvironment.VolumeID ]; TransRequest: TYPE = { abort, close, continue }; <<>> <> <<>> rootLog: RootLog _ NIL; statsProc: PROC[ROPE] _ NIL; seriousDebugging: BOOL _ FALSE; BogusDBTrans: SIGNAL = CODE; defaultSegmentID: WalnutKernelDefs.SegmentID _ [$Walnut, 0]; currentVolumeGroupID: AlpTransaction.VolumeGroupID; <> <> <> fileOptions: AlpineFS.FileOptions = [ updateCreateTime: FALSE, -- is done explicitly referencePattern: sequential, recoveryOption: $log, finishTransOnClose: FALSE]; <> rootBufferOption: FS.StreamBufferParms = [vmPagesPerBuffer: 4, nBuffers: 1]; alpineStreamOptions: AlpineFS.StreamOptions _ [ tiogaRead: FALSE, commitAndReopenTransOnFlush: FALSE, -- just flush truncatePagesOnClose: FALSE, finishTransOnClose: FALSE, closeFSOpenFileOnClose: TRUE -- Note: this option is changed for opening temp logs when doing copies, so that we can Unlock the file before closing it ]; <<>> <<>> Open: PUBLIC PROC[ rootName: ROPE, readOnly: BOOL _ FALSE, newSegmentOk: BOOL _ FALSE] RETURNS[ key, mailFor: ROPE, rootFileCreateDate: GMT, segment: WalnutDefs.Segment, isReadOnly: BOOL] = { newMailLogName, readArchiveLogName: ROPE; logInfoList: LIST OF WalnutKernelDefs.LogInfoFromRoot; fileName, dbName: ROPE; isReadOnly _ FALSE; IF rootLog # NIL THEN ERROR WalnutDefs.Error[$root, $RootAlreadyOpen, "Must do a Close"]; BEGIN logInfoCount: INT _ 0; reason: ATOM; openFile: FS.OpenFile; rStream: STREAM; rootLog _ NEW[RootLogObject _ [name: rootName, readOnly: readOnly]]; rootLog.transInfo _ [fileStore: FileStoreForRoot[rootName]]; rootLog.alpTH _ CreateTrans[]; rootLog.dbTransH _ DB.MakeTransHandle[rootLog.alpTH]; currentVolumeGroupID _ AlpTransaction.GetNextVolumeGroup[rootLog.alpTH]; <> BEGIN ENABLE FS.Error => { reason _ error.code; GOTO error }; openFile _ AlpineFS.Open[ name: rootName, access: $write, lock: [$write, $wait], options: fileOptions, transHandle: rootLog.alpTH]; FS.Close[openFile]; EXITS error => IF reason = $unknownFile THEN ERROR WalnutDefs.Error[$root, $RootNotFound, rootName] ELSE IF reason = $accessDenied THEN isReadOnly _ TRUE ELSE ERROR WalnutDefs.Error[$root, reason, rootName]; END; rootLog.readOnly _ isReadOnly; BEGIN ENABLE FS.Error => { reason _ error.code; GOTO error }; openFile _ AlpineFS.Open[ name: rootName, access: $read, lock: [$read, $wait], options: fileOptions, transHandle: rootLog.alpTH]; rStream _ AlpineFS.StreamFromOpenFile[ openFile: openFile, streamOptions: alpineStreamOptions, streamBufferParms: streamBufferOption]; rootFileCreateDate _ rootLog.createDate _ FS.GetInfo[openFile].created; EXITS error => ERROR WalnutDefs.Error[$root, reason, rootName]; END; <> BEGIN ENABLE IO.EndOfStream, IO.Error => GOTO cantParse; finished: BOOL _ FALSE; rStream.SetIndex[0]; DO curPos: INT _ rStream.GetIndex[]; rootEntry: WalnutKernelDefs.RootEntry _ NextRootEntry[rStream]; DO IF rootEntry.ident.Equal["Key", FALSE] THEN { key _ rootEntry.value; EXIT }; IF rootEntry.ident.Equal["MailFor", FALSE] THEN { mailFor _ rootEntry.value; EXIT }; IF rootEntry.ident.Equal["Database", FALSE] THEN { dbName _ rootEntry.value; EXIT }; IF rootEntry.ident.Equal["NewMailLog", FALSE] THEN { newMailLogName _ rootEntry.value; EXIT }; IF rootEntry.ident.Equal["ReadArchiveLog", FALSE] THEN { readArchiveLogName _ rootEntry.value; EXIT }; IF rootEntry.ident.Equal["LogInfo", FALSE] THEN { logInfoList _ CONS[rootEntry.info, logInfoList]; logInfoCount _ logInfoCount + 1; EXIT }; IF rootEntry.ident.Equal["End", FALSE] THEN { finished _ TRUE; EXIT }; rStream.Close[ ! IO.Error, FS.Error => CONTINUE]; FinishTransaction[rootLog, close ! WalnutDefs.Error => CONTINUE]; rootLog _ NIL; ERROR WalnutDefs.Error[$log, $BadRootFile, IO.PutFR["Unknown entry %g in RootFile at pos %g", IO.rope[rootEntry.ident], IO.int[curPos]]]; ENDLOOP; IF finished THEN EXIT; ENDLOOP; EXITS cantParse => { rStream.Close[ ! IO.Error, FS.Error => CONTINUE]; FinishTransaction[rootLog, close ! WalnutDefs.Error => CONTINUE]; rootLog _ NIL; ERROR WalnutDefs.Error[$log, $BadRootFile, "Couldn't parse root file"]; }; END; rStream.Close[]; rStream_ NIL; -- not needed IF key.Length[] = 0 OR dbName.Length[] = 0 OR mailFor.Length[] = 0 OR newMailLogName.Length[] = 0 OR readArchiveLogName.Length[] = 0 OR logInfoCount # 2 THEN ERROR WalnutDefs.Error[ $root, $BadRootFile, "Incomplete RootFile - missing entries"]; <> BEGIN cp: FS.ComponentPositions; fn: ROPE; rServer, oServer: ROPE; CheckServer: PROC[name: ROPE] = { [fn, cp, ] _ FS.ExpandName[name]; oServer _ Rope.Substr[fn, cp.server.start, cp.server.length]; IF ~Rope.Equal[rServer, oServer, FALSE] THEN { rName: ROPE _ rootLog.name; FS.Close[openFile ! FS.Error => CONTINUE]; FinishTransaction[rootLog, close ! WalnutDefs.Error => CONTINUE]; rootLog _ NIL; ERROR WalnutDefs.Error[$root, $BadRootFile, IO.PutFR["server for file %g does not agree with that for %g", IO.rope[name], IO.rope[rName]] ]; }; }; [fn, cp, ] _ FS.ExpandName[rootLog.name]; rServer _ Rope.Substr[fn, cp.server.start, cp.server.length]; CheckServer[dbName]; CheckServer[newMailLogName]; CheckServer[readArchiveLogName]; CheckServer[logInfoList.first.fileName]; CheckServer[logInfoList.rest.first.fileName]; END; rootLog.keyFromRootFile _ key; rootLog.mailForFromRootFile _ mailFor; rootLog.dbNameFromRootFile _ dbName; FS.Close[openFile ! FS.Error => CONTINUE]; FinishTransaction[rootLog, close ! WalnutDefs.Error => CONTINUE]; rootLog.alpTH _ NIL; rootLog.dbTransH _ NIL; END; <> FOR lil: LIST OF WalnutKernelDefs.LogInfoFromRoot _ logInfoList, lil.rest UNTIL lil = NIL DO seqNo: ROPE _ lil.first.logSeqNo; fileName _ lil.first.fileName; IF seqNo.Fetch[0] # 'E THEN { -- this is the current log rootLog.currentLog _ NEW[InternalLogInfoObject _ [ name: fileName, internalFileID: lil.first.internalFileID, logSeqNo: Convert.IntFromRope[seqNo], logSeqNoPos: lil.first.seqNoPos ]]; } ELSE -- this is the expunge log rootLog.expungeLog _ NEW[InternalLogInfoObject _ [ name: fileName, internalFileID: lil.first.internalFileID, logSeqNo: -1, logSeqNoPos: lil.first.seqNoPos ]]; ENDLOOP; IF ~isReadOnly THEN { rootLog.newMailLog _ NEW[MiscLogInfoObject _ [name: newMailLogName]]; rootLog.readArchiveLog _ NEW[MiscLogInfoObject _ [name: readArchiveLogName]]; } ELSE rootLog.expungeLog _ NIL; -- is readOnly, can't have expungeLog segment _ rootLog.segmentID.segment; DeclareDBSegment[rootLog, newSegmentOk]; }; StartTransaction: PUBLIC PROC[openDB: BOOL _ TRUE] RETURNS[schemaInvalid: BOOL] = { IF rootLog.alpTH # NIL THEN { FS.Close[rootLog.rootOpenFileForLogs ! FS.Error => CONTINUE]; BEGIN DB.CloseTransaction[rootLog.dbTransH ! DB.Aborted, DB.Error, DB.Failure, RPC.CallFailed => GOTO err]; EXITS err => DB.AbortTransaction[rootLog.dbTransH ! DB.Aborted, DB.Error, DB.Failure, RPC.CallFailed => CONTINUE]; END; rootLog.alpTH _ NIL; }; [rootLog.rootOpenFileForLogs, rootLog.alpTH] _ GetRootReadLock[]; rootLog.dbTransH _ DB.MakeTransHandle[rootLog.alpTH]; IF openDB THEN schemaInvalid _ OpenDBTrans[rootLog] ELSE schemaInvalid _ TRUE; }; Shutdown: PUBLIC PROC = { IF rootLog = NIL THEN RETURN; CloseInternalLog[rootLog.currentLog]; CloseInternalLog[rootLog.expungeLog]; CloseMiscLog[mli: rootLog.readArchiveLog]; CloseMiscLog[mli: rootLog.newMailLog, abortIfExternal: TRUE]; FS.Close[rootLog.rootOpenFileForLogs ! FS.Error => CONTINUE]; FinishTransaction[rootLog, close ! WalnutDefs.Error => CONTINUE]; rootLog_ NIL; }; CloseTransaction: PUBLIC PROC = { IF rootLog = NIL THEN RETURN; CloseInternalLog[rootLog.currentLog]; CloseInternalLog[rootLog.expungeLog]; CloseMiscLog[rootLog.readArchiveLog]; FS.Close[rootLog.rootOpenFileForLogs ! FS.Error => CONTINUE]; rootLog.rootOpenFileForLogs _ FS.nullOpenFile; FinishTransaction[rootLog, close ! WalnutDefs.Error => CONTINUE]; rootLog.alpTH _ NIL; rootLog.dbTransH _ NIL; }; FinishTransaction: PROC[rootLog: RootLog, request: TransRequest] = { eCode: ATOM; IF rootLog = NIL OR rootLog.alpTH = NIL THEN RETURN; BEGIN ENABLE BEGIN DB.Aborted => {eCode _ $TransactionAbort; GOTO error }; DB.Error => {eCode _ $DBError; GOTO error }; DB.Failure => {eCode _ $DatabaseInaccessible; GOTO error }; RPC.CallFailed => {eCode _ $RPCCallFailed; GOTO error }; END; SELECT request FROM abort => DB.AbortTransaction[rootLog.dbTransH]; continue => { DB.MarkTransaction[rootLog.dbTransH]; AlpTransaction.UnlockOwnerDB[rootLog.alpTH, currentVolumeGroupID]; }; close => DB.CloseTransaction[rootLog.dbTransH]; ENDCASE => WalnutDefs.Error[$bug, $UnknownCode, "Bad value passed to FinishTransaction"]; EXITS error => ERROR WalnutDefs.Error[$db, eCode, "Can't access transaction"]; END; }; CommitAndContinue: PUBLIC PROC = { FinishTransaction[rootLog, continue] }; AbortTransaction: PUBLIC PROC = { IF rootLog = NIL THEN RETURN; CloseInternalLog[rootLog.currentLog]; CloseInternalLog[rootLog.expungeLog]; CloseMiscLog[rootLog.readArchiveLog]; FinishTransaction[rootLog, abort]; rootLog.alpTH _ NIL; rootLog.dbTransH _ NIL; }; EraseDB: PUBLIC PROC = { errCode: ATOM; IF rootLog = NIL THEN ERROR WalnutDefs.Error[$root, $RootNotOpen, "Trying to do an EraseDB"]; BEGIN ENABLE BEGIN DB.Aborted => { errCode _ $TransactionAbort; GOTO error}; DB.Error => { errCode _ $DBError; GOTO error}; DB.Failure => { errCode _ $DatabaseInaccessible; GOTO error}; RPC.CallFailed => {errCode _ $RPCCallFailed; GOTO error }; END; [] _ DB.EraseSegment[rootLog.segmentID.segment, rootLog.dbTransH]; -- leaves trans open EXITS error => { DB.AbortTransaction[rootLog.dbTransH]; ERROR WalnutDefs.Error[$db, errCode, "Can't erase"]; }; END; }; FlushAndContinue: PUBLIC PROC[strm: STREAM] = { mli: MiscLogInfo; IF strm # (mli _ rootLog.newMailLog).stream THEN ERROR WalnutDefs.Error[$root, $UnknownStream]; WalnutStream.FlushStream[strm: strm, setCreateDate: TRUE]; IF mli.trans = NIL THEN RETURN; FinishMiscTrans[alpTH: mli.trans, abort: FALSE, continue: TRUE] }; GetRootFileStamp: PUBLIC PROC RETURNS[rootFileStamp: GMT] = { reason: ATOM; BEGIN ENABLE FS.Error => { reason _ error.code; GOTO error }; of: FS.OpenFile _ AlpineFS.Open[ name: rootLog.name, options: fileOptions, lock: [$read, fail], transHandle: rootLog.alpTH]; rootFileStamp _ FS.GetInfo[of].created; FS.Close[of]; EXITS error => ERROR WalnutDefs.Error[$root, reason, rootLog.name]; END; }; GetExpungeID: PUBLIC PROC RETURNS[expungeFileID: INT] = { RETURN[rootLog.expungeLog.internalFileID] }; <> AcquireWriteLock: PUBLIC PROC = { of: AlpineFS.OpenFile _ AlpineFS.OpenFileFromStream[rootLog.currentLog.writeStream]; AlpineFS.SetLockOption[of, [$write, $wait] ] }; GetCurrentLogStreams: PUBLIC PROC RETURNS[readStream, writeStream: STREAM] = { IF rootLog = NIL OR rootLog.alpTH = NIL THEN ERROR WalnutDefs.Error[$log, $TransNotOpen, "Must do Shutdown and Startup"]; IF rootLog.readOnly THEN rootLog.currentLog.readStream _ StreamOpen[name: rootLog.currentLog.name, readOnly: TRUE, trans: rootLog.alpTH] ELSE { rootLog.currentLog.writeStream _ StreamOpen[name: rootLog.currentLog.name, trans: rootLog.alpTH]; rootLog.currentLog.readStream _ AlpineFS.StreamFromOpenStream[rootLog.currentLog.writeStream]; }; RETURN[rootLog.currentLog.readStream, rootLog.currentLog.writeStream] }; ReleaseWriteLock: PUBLIC PROC RETURNS[readStream, writeStream: STREAM] = { <<-- uses same trans as before>> CloseInternalLog[rootLog.currentLog]; rootLog.currentLog.writeStream _ StreamOpen[name: rootLog.currentLog.name, trans: rootLog.alpTH]; rootLog.currentLog.readStream _ AlpineFS.StreamFromOpenStream[rootLog.currentLog.writeStream]; RETURN[rootLog.currentLog.readStream, rootLog.currentLog.writeStream]; }; ReturnCurrentLogStreams: PUBLIC PROC = { IF rootLog = NIL THEN RETURN; CloseInternalLog[rootLog.currentLog]; CloseInternalLog[rootLog.expungeLog]; FinishTransaction[rootLog, continue]; }; <> GetStreamsForExpunge: PUBLIC PROC RETURNS[ currentStream, expungeStream: STREAM, keyIs: ROPE, expungeFileID, logSeqNo: INT] = { <<-- assert: have a write lock on the currentLog, the expungeLog is not open>> IF rootLog.expungeLog.writeStream # NIL THEN rootLog.expungeLog.writeStream.Close[ ! IO.Error, FS.Error => CONTINUE]; rootLog.expungeLog.writeStream _ NIL; rootLog.expungeLog.writeStream _ StreamOpen[ name: rootLog.expungeLog.name, trans: rootLog.alpTH, exclusive: TRUE]; RETURN[ rootLog.currentLog.readStream, rootLog.expungeLog.writeStream, rootLog.keyFromRootFile, rootLog.expungeLog.internalFileID, rootLog.currentLog.logSeqNo] -- *****NOTE FIRST AND LAST VALUES }; SwapLogs: PUBLIC PROC[expungeFileID: INT, newRootStamp: GMT] RETURNS[newLogLen: INT] = { <> tempLog: InternalLogInfo; reason: ATOM; IF rootLog.currentLog.internalFileID = expungeFileID THEN -- already did swap RETURN[rootLog.currentLog.writeStream.GetLength[]]; IF rootLog.expungeLog.internalFileID # expungeFileID THEN ERROR WalnutDefs.Error[$expungeLog, $WrongFileID, IO.PutFR["FileID is: %g, FileID expected is: %g", IO.int[rootLog.expungeLog.internalFileID], IO.int[expungeFileID]] ]; IF rootLog.expungeLog.writeStream = NIL THEN ERROR WalnutDefs.Error[$expungeLog, $LogNotOpen, "Doing SwapLogs"]; BEGIN ENABLE FS.Error => { reason _ error.code; GOTO error }; of: FS.OpenFile _ AlpineFS.Open[ name: rootLog.name, wantedCreatedTime: rootLog.createDate, access: $write, lock: [$write, $wait], transHandle: rootLog.alpTH]; rStream: STREAM _ AlpineFS.StreamFromOpenFile[ openFile: of, accessRights: $write, streamOptions: alpineStreamOptions, streamBufferParms: streamBufferOption]; rStream.SetIndex[rootLog.currentLog.logSeqNoPos]; rStream.PutRope["Expunge"]; rStream.SetIndex[rootLog.expungeLog.logSeqNoPos]; rStream.PutRope[IO.PutFR["%07d", IO.int[rootLog.currentLog.logSeqNo]]]; FS.SetByteCountAndCreatedTime[of, -1, newRootStamp]; WalnutStream.SetHighWaterMark[rootLog.currentLog.writeStream, 0, 0]; newLogLen _ rootLog.expungeLog.writeStream.GetLength[]; rStream.Flush[]; DB.MarkTransaction[rootLog.dbTransH]; -- clean up the transaction rStream.Close[]; rootLog.createDate _ newRootStamp; CloseInternalLog[rootLog.currentLog]; -- release the currentLog streams CloseInternalLog[rootLog.expungeLog]; -- will want only read Lock -- swap the rootLog variables currentLog & expungeLog tempLog _ rootLog.currentLog; rootLog.currentLog _ rootLog.expungeLog; rootLog.expungeLog _ tempLog; rootLog.currentLog.logSeqNo _ rootLog.expungeLog.logSeqNo; rootLog.expungeLog.logSeqNo _ -1; EXITS error => ERROR WalnutDefs.Error[$root, reason, "During SwapLogs"]; END; }; ReturnExpungeStreams: PUBLIC PROC = { IF rootLog.expungeLog.writeStream # NIL THEN { rootLog.expungeLog.writeStream.Close[ ! FS.Error, IO.Error => CONTINUE]; rootLog.expungeLog.writeStream _ NIL; }; }; <> PrepareToCopyTempLog: PUBLIC PROC[ which: WhichTempLog, pagesAlreadyCopied: INT, reportProc: PROC[msg1, msg2, msg3: ROPE _ NIL]] RETURNS[currentStream, tempStream: STREAM] = { <> tempLogLen, pagesNeeded, actualPages, currentInUsePages: INT; of: FS.OpenFile; mli: MiscLogInfo _ OpenTempStream[which]; currentStream _ rootLog.currentLog.writeStream; IF mli.stream = NIL THEN RETURN[currentStream, NIL]; currentInUsePages _ FS.PagesForBytes[rootLog.currentLog.writeStream.GetLength[]]; tempLogLen _ mli.stream.GetLength[]; <> pagesNeeded _ FS.PagesForBytes[tempLogLen+200] + currentInUsePages - pagesAlreadyCopied; actualPages _ FS.GetInfo[of_ AlpineFS.OpenFileFromStream[rootLog.currentLog.writeStream]].pages; BEGIN errCode: ATOM; IF actualPages < pagesNeeded THEN { FS.SetPageCount[of, pagesNeeded ! FS.Error => IF error.code = $quotaExceeded THEN { reportProc["\n"]; reportProc[error.explanation]; reportProc["\n"]; GOTO quota } ELSE REJECT]; <> BEGIN BEGIN ENABLE BEGIN DB.Error => { errCode _ $DBError; GOTO error}; DB.Failure => { errCode _ $DatabaseInaccessible; GOTO error}; DB.Aborted => { errCode _ $TransactionAbort; GOTO error}; RPC.CallFailed => {errCode _ $RPCCallFailed; GOTO error }; END; DB.MarkTransaction[rootLog.dbTransH]; AlpTransaction.UnlockOwnerDB[rootLog.alpTH, currentVolumeGroupID]; EXITS error => ERROR WalnutDefs.Error[$db, errCode, "During prepare to copy temp log"]; END; END } EXITS quota => { mli.stream.Close[ ! IO.Error, FS.Error => CONTINUE]; mli.stream _ NIL; RETURN[currentStream, NIL] }; END; RETURN[currentStream, mli.stream]; }; GetStreamsForCopy: PUBLIC PROC[which: WhichTempLog] RETURNS[currentStream, tempStream: STREAM] = { tempStream _ OpenTempStream[which].stream; RETURN[rootLog.currentLog.writeStream, tempStream]; }; <<>> FinishCopy: PUBLIC PROC[which: WhichTempLog] = { <> <> mli: MiscLogInfo; pages: INT; fileHandle: FS.OpenFile; errCode: ATOM; SELECT which FROM newMail => { mli _ rootLog.newMailLog; pages _ 200 }; readArchive => { mli _ rootLog.readArchiveLog; pages _ 1 }; ENDCASE => ERROR WalnutDefs.Error[$log, $UnknownLogFileType]; IF mli.stream = NIL THEN mli _ OpenTempStream[which]; fileHandle _ AlpineFS.OpenFileFromStream[mli.stream]; WalnutStream.SetHighWaterMark[strm: mli.stream, hwmBytes: 0, numPages: pages, setCreateDate: TRUE]; mli.stream.Close[ ! FS.Error, IO.Error => CONTINUE]; BEGIN ENABLE BEGIN DB.Aborted => { errCode _ $TransactionAbort; GOTO error}; DB.Error => { errCode _ $DBError; GOTO error}; DB.Failure => { errCode _ $DatabaseInaccessible; GOTO error}; RPC.CallFailed => {errCode _ $RPCCallFailed; GOTO error }; END; DB.MarkTransaction[rootLog.dbTransH]; AlpFile.UnlockFile[AlpineFS.GetAlpFileHandle[fileHandle]]; EXITS error => ERROR WalnutDefs.Error[$db, errCode, "During Finish copy temp log"]; END; -- be careful to throw away the file lock FS.Close[fileHandle]; mli.stream _ NIL; }; AbortTempCopy: PUBLIC PROC[which: WhichTempLog, strm: STREAM] = { mli: MiscLogInfo; fileHandle: AlpFile.Handle; closeError: BOOL _ FALSE; SELECT which FROM newMail => mli _ rootLog.newMailLog; readArchive => mli _ rootLog.readArchiveLog; ENDCASE => ERROR WalnutDefs.Error[$log, $UnknownLogFileType]; IF mli.stream = NIL THEN RETURN; IF mli.stream # strm THEN ERROR WalnutDefs.Error[$root, $UnknownStreamReturned]; fileHandle _ AlpineFS.GetAlpFileHandle[file: AlpineFS.OpenFileFromStream[strm]]; strm.Close[ ! FS.Error, IO.Error => { closeError _ TRUE; CONTINUE} ]; IF ~closeError THEN AlpFile.Close[fileHandle ! AlpineFile.Unknown => CONTINUE]; mli.stream _ NIL; }; <> GetNewMailStream: PUBLIC PROC[lengthRequired: INT, pagesWanted: INT] RETURNS[strm: STREAM, actualLen: INT] = { mStream: STREAM; CheckRootStamp[]; IF rootLog.newMailLog.stream # NIL THEN RETURN[NIL, -1]; IF seriousDebugging THEN StatsReport["\n### Trans for newMailLog"]; [rootLog.newMailLog.rootOpenFile, rootLog.newMailLog.trans] _ GetRootReadLock[]; mStream _ rootLog.newMailLog.stream _ StreamOpen[ name: rootLog.newMailLog.name, trans: rootLog.newMailLog.trans, exclusive: TRUE]; IF (actualLen _ mStream.GetLength[]) < lengthRequired THEN { CloseMiscLog[rootLog.newMailLog]; RETURN[NIL, actualLen]; }; IF actualLen > lengthRequired THEN { WalnutStream.SetHighWaterMark[mStream, lengthRequired, pagesWanted, TRUE]; mStream.Flush[]; FinishMiscTrans[alpTH: rootLog.newMailLog.trans, abort: FALSE, continue: TRUE]; }; RETURN[mStream, actualLen] }; GetReadArchiveStream: PUBLIC PROC[pagesWanted: INT _ -1] RETURNS[STREAM] = { CheckRootStamp[]; IF rootLog.readArchiveLog.stream # NIL THEN RETURN[NIL]; rootLog.readArchiveLog.stream _ StreamOpen[ name: rootLog.readArchiveLog.name, trans: rootLog.alpTH, pages: pagesWanted, exclusive: TRUE]; RETURN[rootLog.readArchiveLog.stream] }; KeyIs: PUBLIC PROC RETURNS[ROPE] = { CheckRootStamp[]; RETURN[rootLog.keyFromRootFile] }; Mailfor: PUBLIC PROC RETURNS[ROPE] = { CheckRootStamp[]; RETURN[rootLog.mailForFromRootFile] }; DBName: PUBLIC PROC RETURNS[ROPE] = { CheckRootStamp[]; RETURN[rootLog.dbNameFromRootFile] }; ReturnNewMailStream: PUBLIC PROC[newMailStream: STREAM] = { IF newMailStream # rootLog.newMailLog.stream THEN RETURN; rootLog.newMailLog.stream _ NIL; newMailStream.Close[ ! FS.Error, IO.Error => CONTINUE]; IF rootLog.newMailLog.rootOpenFile # FS.nullOpenFile THEN { trans: AlpTransaction.Handle _ rootLog.newMailLog.trans; FS.Close[rootLog.newMailLog.rootOpenFile ! FS.Error => CONTINUE]; rootLog.newMailLog.rootOpenFile _ FS.nullOpenFile; rootLog.newMailLog.trans _ NIL; FinishMiscTrans[alpTH: trans, abort: FALSE, continue: FALSE]; }; }; ReturnReadArchiveStream: PUBLIC PROC[readArchiveStream: STREAM] = { IF readArchiveStream # rootLog.readArchiveLog.stream THEN RETURN; rootLog.readArchiveLog.stream _ NIL; readArchiveStream.Close[ ! FS.Error, IO.Error => CONTINUE]; -- assume flushed }; StreamToWhich: PUBLIC PROC[stream: STREAM] RETURNS[ROPE] = { IF stream = rootLog.currentLog.readStream OR stream = rootLog.currentLog.writeStream OR stream = rootLog.expungeLog.readStream THEN RETURN["currentLog"]; IF stream = rootLog.expungeLog.writeStream THEN RETURN["expungeLog"]; IF stream = rootLog.newMailLog.stream THEN RETURN["newMailLog"]; IF stream = rootLog.readArchiveLog.stream THEN RETURN["readArchiveLog"]; RETURN["unknownLog"]; }; <> RegisterStatsProc: PUBLIC PROC[proc: PROC[ROPE]] = { statsProc _ proc }; UnregisterStatsProc: PUBLIC PROC[proc: PROC[ROPE]] = { IF statsProc = proc THEN statsProc _ NIL }; StatsReport: PUBLIC PROC[r: ROPE] = { IF statsProc # NIL THEN statsProc[r]; }; <> CreateTrans: PROC RETURNS[alpTH: AlpTransaction.Handle] = { needRetry: BOOL _ FALSE; haveRetried: BOOL _ FALSE; DO IF rootLog.transInfo.handle = NIL THEN rootLog.transInfo.handle _ AlpInstance.Create[rootLog.transInfo.fileStore ! AlpInstance.Failed => IF why = authenticateFailed THEN ERROR WalnutDefs.Error[$root, $BadUserPassword, "Can't create AlpInstance.Handle"] ELSE ERROR WalnutDefs.Error[$root, $communication, rootLog.transInfo.fileStore] ]; alpTH _ AlpTransaction.Create[rootLog.transInfo.handle ! AlpTransaction.OperationFailed => IF why = busy THEN ERROR WalnutDefs.Error[$root, $serverBusy, rootLog.transInfo.fileStore]; RPC.CallFailed => TRUSTED { IF why = unbound THEN {needRetry _ TRUE; CONTINUE} <> ELSE IF why IN [timeout .. busy] THEN ERROR WalnutDefs.Error[$root, $communication, rootLog.transInfo.fileStore]} ]; IF NOT needRetry THEN EXIT; IF haveRetried THEN ERROR WalnutDefs.Error[$root, $communication, rootLog.transInfo.fileStore]; needRetry _ FALSE; haveRetried _ TRUE; ENDLOOP; IF seriousDebugging THEN StatsReport[IO.PutFR["\n&&& CreateTransID: %g", IO.rope[TransIDToRope[alpTH]] ] ]; }; FinishMiscTrans: PROC[alpTH: AlpTransaction.Handle, abort: BOOL, continue: BOOL] = { outcome: AlpTransaction.Outcome; IF alpTH = NIL THEN RETURN; IF seriousDebugging THEN StatsReport[IO.PutFR["\n***[old: %g, abort: %g, continue: %g]", IO.rope[TransIDToRope[alpTH]], IO.bool[abort], IO.bool[continue]] ]; outcome _ alpTH.Finish[ requestedOutcome: IF abort THEN abort ELSE commit, continue: continue ! RPC.CallFailed => IF why IN [timeout .. busy] THEN { IF abort THEN {outcome _ abort; CONTINUE} -- so can escape when no communication! ELSE ERROR WalnutDefs.Error[$root, $communication, rootLog.transInfo.fileStore] } ELSE ERROR WalnutDefs.Error[$root, $ProtocolError, rootLog.transInfo.fileStore] ]; IF seriousDebugging AND continue THEN StatsReport[IO.PutFR[" NewTransID: %g", IO.rope[TransIDToRope[alpTH]]] ]; IF NOT abort AND outcome # commit THEN { StatsReport[IO.PutFR["\n TransactionAbort on %g", IO.rope[rootLog.transInfo.fileStore]]]; ERROR WalnutDefs.Error[$root, $TransactionAbort, rootLog.transInfo.fileStore]; }; }; TransIDToRope: PROC[alpTH: AlpTransaction.Handle] RETURNS[ROPE] = { trueTransID: TransID; IF alpTH = NIL THEN RETURN["NIL"]; trueTransID _ LOOPHOLE[alpTH.transID]; RETURN[Convert.RopeFromInt[trueTransID.idOnFileStore]]; }; GetRootReadLock: PROC RETURNS[of: FS.OpenFile, alpTH: AlpTransaction.Handle] = { reason: ATOM; rootFileStamp: GMT; alpTH _ CreateTrans[]; BEGIN ENABLE FS.Error => { reason _ error.code; GOTO error}; of _ AlpineFS.Open[ name: rootLog.name, lock: [$read, $fail], transHandle: alpTH, options: rootOpenOptions ]; rootFileStamp _ FS.GetInfo[of].created; EXITS error => { FS.Close[of ! FS.Error => CONTINUE]; FinishMiscTrans[alpTH, FALSE, FALSE]; ERROR WalnutDefs.Error[$root, reason, rootLog.name]; }; END; CheckStamp[rootFileStamp]; }; rootOpenOptions: AlpineFS.FileOptions _ [ updateCreateTime: FALSE, referencePattern: sequential, recoveryOption: $log, finishTransOnClose: FALSE]; CloseInternalLog: PROC[ili: InternalLogInfo] = { IF ili = NIL THEN RETURN; IF ili.writeStream # NIL THEN { ili.writeStream.Close[ ! FS.Error, IO.Error => CONTINUE]; ili.writeStream _ NIL; }; IF ili.readStream # NIL THEN { ili.readStream.Close[ ! FS.Error, IO.Error => CONTINUE]; ili.readStream _ NIL; }; }; CloseMiscLog: PROC[mli: MiscLogInfo, abortIfExternal: BOOL _ FALSE] = { <> strm: STREAM; IF mli = NIL THEN RETURN; strm _ mli.stream; mli.stream _ NIL; IF mli.rootOpenFile # NIL THEN { -- is external of: FS.OpenFile _ mli.rootOpenFile; mli.rootOpenFile _ FS.nullOpenFile; IF strm # NIL THEN strm.Close[! IO.Error, FS.Error => CONTINUE]; IF mli.trans # NIL AND mli.trans # rootLog.alpTH THEN FinishMiscTrans[alpTH: mli.trans, abort: abortIfExternal, continue: FALSE ! FS.Error => CONTINUE]; FS.Close[of ! FS.Error => CONTINUE]; } ELSE IF strm # NIL THEN strm.Close[ ! FS.Error, IO.Error => CONTINUE]; }; OpenTempStream: PROC[which: WhichTempLog] RETURNS[mli: MiscLogInfo] = { SELECT which FROM newMail => mli _ rootLog.newMailLog; readArchive => mli _ rootLog.readArchiveLog; ENDCASE => ERROR WalnutDefs.Error[$log, $UnknownLogFileType]; IF mli.stream # NIL THEN { mli.stream.Close[ ! IO.Error, FS.Error => CONTINUE]; mli.stream _ NIL; }; mli.stream _ StreamOpen[name: mli.name, trans: rootLog.alpTH, exclusive: FALSE, closeFSOpenFile: FALSE]; mli.trans _ rootLog.alpTH }; CheckRootStamp: PROC[] = { CheckStamp[GetRootFileStamp[]] }; CheckStamp: PROC[created: GMT] = { IF created # rootLog.createDate THEN ERROR WalnutDefs.Error[$root, $WrongCreateDate, IO.PutFR[" Create date is : %g, Create date expected is: %g", IO.time[created], IO.time[rootLog.createDate]]]; }; NextRootEntry: PROC[strm: STREAM] RETURNS[rte: WalnutKernelDefs.RootEntry] = { []_ strm.SkipWhitespace[flushComments: TRUE]; rte.ident _ strm.GetTokenRope[].token; IF rte.ident.Equal["End", FALSE] THEN RETURN; []_ strm.SkipWhitespace[flushComments: TRUE]; IF rte.ident.Equal["LogInfo", FALSE] THEN { lif: WalnutKernelDefs.LogInfoFromRoot; lif.fileName _ strm.GetTokenRope[IO.IDProc].token; []_ strm.SkipWhitespace[flushComments: TRUE]; lif.internalFileID _ strm.GetInt[]; []_ strm.SkipWhitespace[flushComments: TRUE]; lif.seqNoPos _ strm.GetIndex[]; lif.logSeqNo _ strm.GetTokenRope[].token; []_ strm.SkipWhitespace[flushComments: TRUE]; rte.info _ lif; } ELSE IF rte.ident.Equal["Key", FALSE] THEN rte.value _ strm.GetLineRope[] ELSE { rte.value _ strm.GetTokenRope[IO.IDProc].token; []_ strm.SkipWhitespace[flushComments: TRUE]; }; RETURN[rte]; }; FileStoreForRoot: PROC[rootName: ROPE] RETURNS[fileStore: ROPE] = { cp: FS.ComponentPositions _ FS.ExpandName[rootName].cp; IF cp.server.length = 0 THEN WalnutDefs.Error[$root, $InvalidName, rootName]; RETURN[rootName.Substr[cp.server.start, cp.server.length]]; }; <> StreamOpen: PROC[name: ROPE, readOnly: BOOL_ FALSE, pages: INT _ 200, trans: AlpTransaction.Handle_ NIL, exclusive: BOOL _ FALSE, closeFSOpenFile: BOOL _ TRUE] RETURNS [strm: STREAM] = { openFile: AlpineFS.OpenFile _ []; IF readOnly THEN { openFile _ AlpineFS.Open[ name: name, options: alpineFileOptions, lock: [$read, $fail], transHandle: trans]; strm _ AlpineFS.StreamFromOpenFile[openFile: openFile, streamOptions: alpineStreamOptions, streamBufferParms: streamBufferOption]; } ELSE { actualPages: INT; openFile _ AlpineFS.Open[ name: name, access: $write, lock: IF exclusive THEN [$write, $fail] ELSE [$read, $wait], options: alpineFileOptions, transHandle: trans]; actualPages _ FS.GetInfo[openFile].pages; IF pages > actualPages THEN FS.SetPageCount[openFile, pages]; <> alpineStreamOptions.closeFSOpenFileOnClose _ closeFSOpenFile; strm _ AlpineFS.StreamFromOpenFile[openFile: openFile, accessRights: $write, initialPosition: $start, streamOptions: alpineStreamOptions, streamBufferParms: streamBufferOption]; }; }; alpineFileOptions: AlpineFS.FileOptions _ [ updateCreateTime: FALSE, referencePattern: sequential, recoveryOption: $log, finishTransOnClose: FALSE ]; streamBufferOption: FS.StreamBufferParms = [vmPagesPerBuffer: 4, nBuffers: 4]; DeclareDBSegment: PROC[rootLog: RootLog, newSegmentOk: BOOL] = { protectionViolation: BOOL _ FALSE; DB.DeclareSegment[ filePath: rootLog.dbNameFromRootFile, segment: rootLog.segmentID.segment, number: rootLog.segmentID.index, readonly: rootLog.readOnly, createIfNotFound: newSegmentOk, nPagesInitial: 256, nPagesPerExtent: 256 ]; }; OpenDBTrans: PROC[rootLog: RootLog] RETURNS[schemaInvalid: BOOL] = { inaccessible, mismatchedSegment: BOOL _ FALSE; reason: ROPE _ "unknown"; schemaInvalid _ DB.OpenTransaction[segment: rootLog.segmentID.segment, useTrans: rootLog.dbTransH ! DB.Aborted => GOTO abort; DB.Error => { IF code = TransactionAlreadyOpen THEN CONTINUE; inaccessible _ TRUE; SELECT code FROM ProtectionViolation => reason _ "database is read only"; DirectoryNotFound => reason _ "directory not found"; FileNotFound => reason _ "file not found"; ServerNotFound => reason _ "server not found"; QuotaExceeded => reason _ "quota exceeded"; MismatchedExistingSegment => { mismatchedSegment _ TRUE; reason _ "mismatched existing segment"; }; ENDCASE => REJECT; CONTINUE; }; DB.Failure => { inaccessible _ TRUE; SELECT what FROM $communication => reason _ "cannot connect to server"; ENDCASE => REJECT; CONTINUE; }; RPC.CallFailed => { inaccessible _ TRUE; reason _ "RPC.CallFailed"; CONTINUE; }; ].schemaInvalid; IF mismatchedSegment THEN WalnutDefs.Error[$db, $MismatchedSegment, reason]; IF inaccessible THEN ERROR WalnutDefs.Error[$db, $DatabaseInaccessible, reason]; EXITS abort => ERROR WalnutDefs.Error[$db, $TransactionAbort, "During open database"]; }; DB.Initialize[]; END.