<> <> <> <> <<>> <> <<>> <> <> <> DIRECTORY AlpFile USING [AccessFailed], AlpineFS USING [ErrorFromStream], BasicTime USING [GMT], FS USING [Error, BytesForPages], IO, RefText USING [line, AppendRope], Rope, RuntimeError USING [BoundsFault], ViewerTools USING [TiogaContentsRec, TiogaContents], WalnutDefs USING [Error], WalnutKernelDefs USING [LogEntry, WhichTempLog], WalnutLog USING [], WalnutMiscLog USING [CloseReadArchiveLog, CreateReadArchiveLog], WalnutRoot USING [AbortTempCopy, AcquireWriteLock, CommitAndContinue, CloseTransaction, FinishCopy, GetCurrentLogStreams, GetExpungeID, GetStreamsForCopy, PrepareToCopyTempLog, ReleaseWriteLock, StatsReport], WalnutStream; WalnutLogImpl: CEDAR PROGRAM IMPORTS AlpFile, AlpineFS, FS, IO, RefText, Rope, RuntimeError, WalnutDefs, WalnutMiscLog, WalnutRoot, WalnutStream EXPORTS WalnutLog = BEGIN <<>> <> <<>> <> <<>> <> GMT: TYPE = BasicTime.GMT; ROPE: TYPE = Rope.ROPE; STREAM: TYPE = IO.STREAM; LogEntry: TYPE = WalnutKernelDefs.LogEntry; TiogaContents: TYPE = ViewerTools.TiogaContents; <<>> <> <> <> <<$NotScanning>> <<$InvalidOperation>> <<$UnknownLogFileType>> <<$AccessFailed>> <<$LogInaccessible>> <<$TransactionAbort>> <<>> <<>> <> <<>> <<-- the read and write (and temp if open) streams are all under the same transaction; WalnutLogImpl must get them from WalnutRoot>> << >> writeStream: STREAM _ NIL; tempStream: STREAM _ NIL; -- when copying a tempFile readStream: STREAM _ NIL; -- only one log for now scanning: BOOL _ FALSE; currentEntryLengthHint: INT _ -1; currentEntryPos: INT _ -1; <<>> field1: REF TEXT = NEW[TEXT[RefText.line]]; field2: REF TEXT = NEW[TEXT[RefText.line]]; field3: REF TEXT = NEW[TEXT[RefText.line]]; <> <> ExpungeMsgs: PUBLIC PROC RETURNS[at, next: INT] = { [at, next] _ WriteEntry[WalnutStream.expungeMsgs] }; <<>> WriteExpungeLog: PUBLIC PROC RETURNS[at, next: INT] = { WalnutStream.writeExpungeLog.internalFileID _ WalnutRoot.GetExpungeID[]; [at, next] _ WriteEntry[WalnutStream.writeExpungeLog]; }; CreateMsgSet: PUBLIC PROC[name: ROPE] RETURNS[at, next: INT] = { field1.length _ 0; WalnutStream.createMsgSet.msgSet _ RefText.AppendRope[field1, name]; [at, next] _ WriteEntry[WalnutStream.createMsgSet] }; EmptyMsgSet: PUBLIC PROC[msgSet: ROPE] RETURNS[at, next: INT] = { field1.length _ 0; WalnutStream.emptyMsgSet.msgSet _ RefText.AppendRope[field1, msgSet]; [at, next] _ WriteEntry[WalnutStream.emptyMsgSet] }; DestroyMsgSet: PUBLIC PROC[msgSet: ROPE] RETURNS[at, next: INT] = { field1.length _ 0; WalnutStream.destroyMsgSet.msgSet _ RefText.AppendRope[field1, msgSet]; [at, next] _ WriteEntry[WalnutStream.destroyMsgSet] }; AddMsg: PUBLIC PROC[msg: ROPE, to: ROPE] RETURNS[at, next: INT] = { field1.length _ 0; WalnutStream.addMsg.msg _ RefText.AppendRope[field1, msg]; field2.length _ 0; WalnutStream.addMsg.to _ RefText.AppendRope[field2, to]; [at, next] _ WriteEntry[WalnutStream.addMsg] }; RemoveMsg: PUBLIC PROC[msg: ROPE, from: ROPE] RETURNS[at, next: INT] = { field1.length _ 0; WalnutStream.removeMsg.msg _ RefText.AppendRope[field1, msg]; field2.length _ 0; WalnutStream.removeMsg.from _ RefText.AppendRope[field2, from]; [at, next] _ WriteEntry[WalnutStream.removeMsg] }; MoveMsg: PUBLIC PROC[msg: ROPE, from, to: ROPE] RETURNS[at, next: INT] = { field1.length _ 0; WalnutStream.moveMsg.msg _ RefText.AppendRope[field1, msg]; field2.length _ 0; WalnutStream.moveMsg.from _ RefText.AppendRope[field2, from]; field3.length _ 0; WalnutStream.moveMsg.to _ RefText.AppendRope[field3, to]; [at, next] _ WriteEntry[WalnutStream.moveMsg] }; HasBeenRead: PUBLIC PROC[msg: ROPE] RETURNS[at, next: INT] = { field1.length _ 0; WalnutStream.hasbeenRead.msg _ RefText.AppendRope[field1, msg]; [at, next] _ WriteEntry[WalnutStream.hasbeenRead] }; RecordNewMailInfo: PUBLIC PROC[logLen: INT, when: GMT, server: ROPE, num: INT] RETURNS[at, next: INT] = { WalnutStream.recordNewMailInfo.logLen _ logLen; WalnutStream.recordNewMailInfo.when _ when; field1.length _ 0; WalnutStream.recordNewMailInfo.server _ RefText.AppendRope[field1, server]; WalnutStream.recordNewMailInfo.num _ num; [at, next] _ WriteEntry[WalnutStream.recordNewMailInfo] }; StartCopyNewMail: PUBLIC PROC RETURNS[at, next: INT] = { [at, next] _ WriteEntry[WalnutStream.startCopyNewMail] }; AcceptNewMail: PUBLIC PROC RETURNS[at, next: INT] = { [at, next] _ WriteEntry[WalnutStream.acceptNewMail] }; StartReadArchiveFile: PUBLIC PROC[file: ROPE, msgSet: ROPE] RETURNS[at, next: INT] = { field1.length _ 0; WalnutStream.startReadArchiveFile.file _ RefText.AppendRope[field1, file]; field2.length _ 0; WalnutStream.startReadArchiveFile.msgSet _ RefText.AppendRope[field2, msgSet]; [at, next] _ WriteEntry[WalnutStream.startReadArchiveFile] }; EndReadArchiveFile: PUBLIC PROC RETURNS[at, next: INT] = { [at, next] _ WriteEntry[WalnutStream.endReadArchiveFile] }; StartCopyReadArchive: PUBLIC PROC[] RETURNS[at, next: INT] = { [at, next] _ WriteEntry[WalnutStream.startCopyReadArchive] }; <> AcquireWriteLock: PUBLIC PROC = { Awl: PROC = { WalnutRoot.AcquireWriteLock[] }; CarefullyApply[Awl, "AcquireWriteLock"]; }; ForgetLogStreams: PUBLIC PROC = { readStream _ writeStream _ NIL }; CloseLogStreams: PUBLIC PROC = { WalnutRoot.CloseTransaction[]; readStream _ writeStream _ NIL; }; LogLength: PUBLIC PROC RETURNS[length: INT] = { Llen: PROC = { length _ writeStream.GetLength[] }; CarefullyApply[Llen, "LogLength"]; }; OpenLogStreams: PUBLIC PROC = { IF writeStream = NIL THEN { [readStream, writeStream] _ WalnutRoot.GetCurrentLogStreams[]; IF scanning THEN readStream.SetIndex[currentEntryPos]; }; }; ReleaseWriteLock: PUBLIC PROC = { IF writeStream = NIL THEN RETURN; [readStream, writeStream] _ WalnutRoot.ReleaseWriteLock[]; IF scanning THEN readStream.SetIndex[currentEntryPos]; }; ResetLog: PUBLIC PROC[newLength: INT] = { Rl: PROC = { writeStream.SetIndex[newLength]; WalnutStream.SetHighWaterMark[writeStream, newLength, -1]; writeStream.SetIndex[newLength]; writeStream.Flush[]; }; CarefullyApply[Rl, "ResetLog"]; }; ReturnCurrentLogStreams: PUBLIC PROC = { readStream _ writeStream _ NIL; scanning _ FALSE; }; ShutdownLog: PUBLIC PROC = { WalnutRoot.CloseTransaction[]; readStream _ writeStream _ NIL; scanning _ FALSE; }; <> WriteMessage: PUBLIC PROC[msg: ROPE, body: TiogaContents] RETURNS[at: INT]= { WriteMsg: PROC[] = { writeStream.SetIndex[writeStream.GetLength[]]; field1.length _ 0; WalnutStream.createMsg.msg _ RefText.AppendRope[field1, msg]; IF body.formatting.Length[] # 0 THEN { last: INT _ body.contents.Length[] - 1; IF body.contents.Fetch[last] = '\000 THEN -- NULL for padding { body.contents _ Rope.Substr[body.contents, 1, last-1]; body.formatting _ Rope.Concat["\000", body.formatting] } ELSE body.contents _ Rope.Substr[body.contents, 1]; }; WalnutStream.createMsg.textLen _ body.contents.Length[]; WalnutStream.createMsg.formatLen _ body.formatting.Length[]; at _ WalnutStream.WriteEntry[writeStream, WalnutStream.createMsg]; WalnutStream.WriteMsgBody[writeStream, body]; WalnutStream.FlushStream[strm: writeStream, setCreateDate: TRUE]; }; CarefullyApply[WriteMsg, "WriteMessage"]; }; <> SetPosition: PUBLIC PROC[startPos: INT] RETURNS[charsSkipped: INT] = { next: INT; SetPs: PROC[] = { readStream.SetIndex[startPos]; next _ WalnutStream.FindNextEntry[readStream]; currentEntryLengthHint _ -1; IF next # -1 THEN currentEntryPos _ next; scanning _ TRUE; }; CarefullyApply[SetPs, "SetPosition"]; RETURN[IF next = -1 THEN next ELSE next - startPos]; }; SetIndex: PUBLIC PROC[pos: INT] = { Si: PROC[] = { readStream.SetIndex[pos]; currentEntryLengthHint _ -1; currentEntryPos _ pos; scanning _ TRUE; }; CarefullyApply[Si, "SetIndex"]; }; NextAt: PUBLIC PROC RETURNS[at: INT] = { at _ currentEntryPos }; NextEntry: PUBLIC PROC RETURNS[le: LogEntry, at: INT] = { [le, at] _ NextEntryInternal[FALSE, "NextEntry"] }; NextEntryInternal: PROC[quick: BOOL, who: ROPE] RETURNS[le: LogEntry, at: INT] = { Ne: PROC[] = { length: INT; [le, length] _ WalnutStream.ReadEntry[readStream, quick]; at _ currentEntryPos; currentEntryLengthHint _ -1; IF length = -1 THEN { IF at = readStream.GetLength[] THEN RETURN; at _ -1; RETURN }; currentEntryPos _ at + length; }; IF ~scanning THEN ERROR WalnutDefs.Error[$log, $NotScanning]; CarefullyApply[Ne, who]; }; QuickScan: PUBLIC PROC RETURNS[le: LogEntry, at: INT] = { [le, at] _ NextEntryInternal[quick: TRUE, who: "QuickScan"] }; ArchiveEntry: PUBLIC PROC[to: IO.STREAM] RETURNS[ok: BOOL]= { <> Ane: PROC[] = { ok _ FALSE; IF currentEntryLengthHint = -1 THEN currentEntryLengthHint _ WalnutStream.PeekEntry[readStream].length; IF currentEntryLengthHint = -1 THEN RETURN; WalnutStream.CopyBytes[ from: readStream, to: to, num: currentEntryLengthHint]; currentEntryPos _ readStream.GetIndex[]; currentEntryLengthHint _ -1; -- unknown ok _ TRUE; }; IF ~scanning THEN ERROR WalnutDefs.Error[$log, $NotScanning]; CarefullyApply[Ane, "ArchiveEntry"]; }; CopyBytesToArchive: PUBLIC PROC[to: IO.STREAM, startPos, num: INT] = { Cba: PROC = { readStream.SetIndex[startPos]; WalnutStream.CopyBytes[from: readStream, to: to, num: num]; scanning _ FALSE; }; CarefullyApply[Cba, "CopyBytesToArchive"]; }; <> PrepareToCopyTempLog: PUBLIC PROC[which: WalnutKernelDefs.WhichTempLog, pagesAlreadyCopied: INT, reportProc: PROC[msg1, msg2, msg3: ROPE _ NIL]] RETURNS[BOOL] = { <> [writeStream, tempStream] _ WalnutRoot.PrepareToCopyTempLog[which, pagesAlreadyCopied, reportProc]; RETURN[tempStream # NIL]; }; CopyTempLog: PUBLIC PROC[which: WalnutKernelDefs.WhichTempLog, startCopyPos, fromPos: INT, reportProc: PROC[msg1, msg2, msg3: ROPE _ NIL] ] = { <> exp: ROPE; endLE: LogEntry; abort: BOOL _ TRUE; ok: BOOL _ FALSE; bytesToCopy: INT; bytesPerCopy: INT = FS.BytesForPages[200]; BEGIN ENABLE BEGIN IO.Error => { why: ATOM = AlpineFS.ErrorFromStream[stream].code; IF why = $transAborted THEN GOTO exit; IF why = $remoteCallFailed THEN { exp _ AlpineFS.ErrorFromStream[stream].explanation; GOTO exit; }; REJECT; }; FS.Error => { IF error.code = $transAborted THEN GOTO exit; IF error.code = $remoteCallFailed THEN { exp _ error.explanation; GOTO exit; }; REJECT; }; AlpFile.AccessFailed => { abort _ FALSE; GOTO exit }; END; SELECT which FROM newMail => { WalnutStream.endCopyNewMailInfo.startCopyPos _ startCopyPos; endLE _ WalnutStream.endCopyNewMailInfo; }; readArchive => { WalnutStream.endCopyReadArchiveInfo.startCopyPos _ startCopyPos; endLE _ WalnutStream.endCopyReadArchiveInfo }; ENDCASE => ERROR WalnutDefs.Error[$log, $UnknownLogFileType]; IF tempStream = NIL THEN [writeStream, tempStream] _ WalnutRoot.GetStreamsForCopy[which]; bytesToCopy _ tempStream.GetLength[] - fromPos; IF bytesToCopy > 0 THEN { first: BOOL _ TRUE; tempStream.SetIndex[fromPos]; writeStream.SetIndex[writeStream.GetLength[]]; DO thisCopy: INT _ MIN[bytesPerCopy, bytesToCopy]; WalnutStream.CopyBytes[to: writeStream, from: tempStream, num: thisCopy]; WalnutStream.FlushStream[writeStream, TRUE]; WalnutRoot.CommitAndContinue[]; IF first THEN first _ FALSE ELSE reportProc["#"]; bytesToCopy _ bytesToCopy - thisCopy; IF bytesToCopy = 0 THEN EXIT; ENDLOOP; }; [] _ WalnutStream.WriteEntry[writeStream, endLE]; WalnutStream.FlushStream[writeStream, TRUE]; EXITS exit => { who: ROPE _ IF which = newMail THEN "NewMail" ELSE "ReadArchive"; IF ~abort THEN ERROR WalnutDefs.Error[$log, $AccessFailed, who]; WalnutRoot.AbortTempCopy[which, tempStream]; WalnutRoot.CloseTransaction[]; writeStream _ readStream _ tempStream _ NIL; ERROR WalnutDefs.Error[ $log, IF exp = NIL THEN $TransactionAbort ELSE $RemoteCallFailed, IO.PutFR["Copying the %g log", IO.rope[who]] ]; }; END; tempStream _ NIL; WalnutRoot.FinishCopy[which]; }; FinishTempLogCopy: PUBLIC PROC[which: WalnutKernelDefs.WhichTempLog] = { WalnutRoot.FinishCopy[which] }; CreateArchiveLog: PUBLIC PROC[ fileToRead: STREAM, msgSet: ROPE, reportProc: PROC[msg1, msg2, msg3: ROPE _ NIL]] RETURNS[ok: BOOL] = { reason: ROPE; [ok, reason] _ WalnutMiscLog.CreateReadArchiveLog[fileToRead, msgSet, reportProc]; IF ok THEN [] _ WriteEntry[WalnutStream.endReadArchiveFile]; WalnutMiscLog.CloseReadArchiveLog[]; }; <> GetTiogaContents: PUBLIC PROC[textStart, textLen, formatLen: INT] RETURNS[contents: TiogaContents] = { TContents: PROC[] = { readStream.SetIndex[textStart]; scanning _ FALSE; contents _ NEW[ViewerTools.TiogaContentsRec]; contents.contents _ WalnutStream.ReadRope[readStream, textLen]; contents.formatting _ WalnutStream.ReadRope[readStream, formatLen]; }; IF formatLen # 0 THEN -- tioga formatting nonsense { textLen _ textLen + 1; textStart _ textStart - 1}; CarefullyApply[TContents, "GetTiogaContents"]; }; GetRefTextFromLog: PUBLIC PROC[startPos, length: INT, text: REF TEXT] = { natLen: NAT; Grf: PROC = { bytesRead: NAT; readStream.SetIndex[startPos]; scanning _ FALSE; bytesRead _ readStream.GetBlock[block: text, startIndex: 0, count: natLen]; IF bytesRead < natLen THEN text.length _ 0; }; CheckSize: PROC[len: INT] RETURNS[nat: NAT] = { nat _ len }; natLen _ CheckSize[length ! RuntimeError.BoundsFault => GOTO noGood]; CarefullyApply[Grf, "GetRefTextFromLog"]; EXITS noGood => ERROR WalnutDefs.Error[$log, $BadLength, IO.PutFR[" Can't convert %g into a nat", IO.int[length] ]]; }; <> WriteEntry: PROC[le: LogEntry] RETURNS[at, next: INT] = { We: PROC = { at _ WalnutStream.WriteEntry[writeStream, le]; WalnutStream.FlushStream[writeStream, TRUE]; next _ writeStream.GetIndex[]; }; CarefullyApply[We, "WriteEntry"]; }; curLog: ROPE = "On currentLog, during "; CarefullyApply: PROC[ proc: PROC[], who: ROPE ] = { exp: ROPE; BEGIN ENABLE BEGIN IO.Error => { why: ATOM = AlpineFS.ErrorFromStream[stream].code; IF why = $transAborted THEN GOTO aborted; IF why = $remoteCallFailed THEN { exp _ AlpineFS.ErrorFromStream[stream].explanation; GOTO failed; }; REJECT; }; FS.Error => { IF error.code = $transAborted THEN GOTO aborted; IF error.code = $remoteCallFailed THEN { exp _ error.explanation; GOTO failed; }; REJECT; }; AlpFile.AccessFailed => IF missingAccess = spaceQuota THEN GOTO outOfSpace ELSE GOTO exit; END; IF readStream = NIL THEN -- test read, not write stream, in case readOnly ERROR WalnutDefs.Error[$log, $LogNotOpen, curLog.Concat[who]]; proc[]; EXITS aborted => { WalnutRoot.StatsReport[Rope.Concat["\n**** trans abort on currentLog doing", who]]; ERROR WalnutDefs.Error[$log, $TransactionAbort, curLog.Concat[who]]; }; failed => { WalnutRoot.StatsReport[Rope.Concat["\n**** remote call failed on currentLog doing", who]]; ERROR WalnutDefs.Error[$log, $RemoteCallFailed, exp]; }; outOfSpace => ERROR WalnutDefs.Error[$log, $OutOfSpace, curLog.Concat[who]]; exit => ERROR WalnutDefs.Error[$log, $OtherAlpineMissingAccess, curLog.Concat[who]]; END; }; END.