DIRECTORY AlpFile USING [AccessFailed], AlpineFS USING [ErrorFromStream], BasicTime USING [GMT], FS USING [Error, BytesForPages], IO, RefText USING [line, AppendRope], Rope, RuntimeError USING [BoundsFault], ViewerTools USING [TiogaContentsRec, TiogaContents], WalnutDefs USING [Error], WalnutKernelDefs USING [LogEntry, WhichTempLog], WalnutLog USING [], WalnutMiscLog USING [CloseReadArchiveLog, CreateReadArchiveLog], WalnutRoot USING [AbortTempCopy, AcquireWriteLock, CommitAndContinue, CloseTransaction, FinishCopy, GetCurrentLogStreams, GetExpungeID, GetStreamsForCopy, PrepareToCopyTempLog, ReleaseWriteLock, StatsReport], WalnutStream; WalnutLogImpl: CEDAR PROGRAM IMPORTS AlpFile, AlpineFS, FS, IO, RefText, Rope, RuntimeError, WalnutDefs, WalnutMiscLog, WalnutRoot, WalnutStream EXPORTS WalnutLog = BEGIN GMT: TYPE = BasicTime.GMT; ROPE: TYPE = Rope.ROPE; STREAM: TYPE = IO.STREAM; LogEntry: TYPE = WalnutKernelDefs.LogEntry; TiogaContents: TYPE = ViewerTools.TiogaContents; writeStream: STREAM _ NIL; tempStream: STREAM _ NIL; -- when copying a tempFile readStream: STREAM _ NIL; -- only one log for now scanning: BOOL _ FALSE; currentEntryLengthHint: INT _ -1; currentEntryPos: INT _ -1; field1: REF TEXT = NEW[TEXT[RefText.line]]; field2: REF TEXT = NEW[TEXT[RefText.line]]; field3: REF TEXT = NEW[TEXT[RefText.line]]; ExpungeMsgs: PUBLIC PROC RETURNS[at, next: INT] = { [at, next] _ WriteEntry[WalnutStream.expungeMsgs] }; WriteExpungeLog: PUBLIC PROC RETURNS[at, next: INT] = { WalnutStream.writeExpungeLog.internalFileID _ WalnutRoot.GetExpungeID[]; [at, next] _ WriteEntry[WalnutStream.writeExpungeLog]; }; CreateMsgSet: PUBLIC PROC[name: ROPE] RETURNS[at, next: INT] = { field1.length _ 0; WalnutStream.createMsgSet.msgSet _ RefText.AppendRope[field1, name]; [at, next] _ WriteEntry[WalnutStream.createMsgSet] }; EmptyMsgSet: PUBLIC PROC[msgSet: ROPE] RETURNS[at, next: INT] = { field1.length _ 0; WalnutStream.emptyMsgSet.msgSet _ RefText.AppendRope[field1, msgSet]; [at, next] _ WriteEntry[WalnutStream.emptyMsgSet] }; DestroyMsgSet: PUBLIC PROC[msgSet: ROPE] RETURNS[at, next: INT] = { field1.length _ 0; WalnutStream.destroyMsgSet.msgSet _ RefText.AppendRope[field1, msgSet]; [at, next] _ WriteEntry[WalnutStream.destroyMsgSet] }; AddMsg: PUBLIC PROC[msg: ROPE, to: ROPE] RETURNS[at, next: INT] = { field1.length _ 0; WalnutStream.addMsg.msg _ RefText.AppendRope[field1, msg]; field2.length _ 0; WalnutStream.addMsg.to _ RefText.AppendRope[field2, to]; [at, next] _ WriteEntry[WalnutStream.addMsg] }; RemoveMsg: PUBLIC PROC[msg: ROPE, from: ROPE] RETURNS[at, next: INT] = { field1.length _ 0; WalnutStream.removeMsg.msg _ RefText.AppendRope[field1, msg]; field2.length _ 0; WalnutStream.removeMsg.from _ RefText.AppendRope[field2, from]; [at, next] _ WriteEntry[WalnutStream.removeMsg] }; MoveMsg: PUBLIC PROC[msg: ROPE, from, to: ROPE] RETURNS[at, next: INT] = { field1.length _ 0; WalnutStream.moveMsg.msg _ RefText.AppendRope[field1, msg]; field2.length _ 0; WalnutStream.moveMsg.from _ RefText.AppendRope[field2, from]; field3.length _ 0; WalnutStream.moveMsg.to _ RefText.AppendRope[field3, to]; [at, next] _ WriteEntry[WalnutStream.moveMsg] }; HasBeenRead: PUBLIC PROC[msg: ROPE] RETURNS[at, next: INT] = { field1.length _ 0; WalnutStream.hasbeenRead.msg _ RefText.AppendRope[field1, msg]; [at, next] _ WriteEntry[WalnutStream.hasbeenRead] }; RecordNewMailInfo: PUBLIC PROC[logLen: INT, when: GMT, server: ROPE, num: INT] RETURNS[at, next: INT] = { WalnutStream.recordNewMailInfo.logLen _ logLen; WalnutStream.recordNewMailInfo.when _ when; field1.length _ 0; WalnutStream.recordNewMailInfo.server _ RefText.AppendRope[field1, server]; WalnutStream.recordNewMailInfo.num _ num; [at, next] _ WriteEntry[WalnutStream.recordNewMailInfo] }; StartCopyNewMail: PUBLIC PROC RETURNS[at, next: INT] = { [at, next] _ WriteEntry[WalnutStream.startCopyNewMail] }; AcceptNewMail: PUBLIC PROC RETURNS[at, next: INT] = { [at, next] _ WriteEntry[WalnutStream.acceptNewMail] }; StartReadArchiveFile: PUBLIC PROC[file: ROPE, msgSet: ROPE] RETURNS[at, next: INT] = { field1.length _ 0; WalnutStream.startReadArchiveFile.file _ RefText.AppendRope[field1, file]; field2.length _ 0; WalnutStream.startReadArchiveFile.msgSet _ RefText.AppendRope[field2, msgSet]; [at, next] _ WriteEntry[WalnutStream.startReadArchiveFile] }; EndReadArchiveFile: PUBLIC PROC RETURNS[at, next: INT] = { [at, next] _ WriteEntry[WalnutStream.endReadArchiveFile] }; StartCopyReadArchive: PUBLIC PROC[] RETURNS[at, next: INT] = { [at, next] _ WriteEntry[WalnutStream.startCopyReadArchive] }; AcquireWriteLock: PUBLIC PROC = { Awl: PROC = { WalnutRoot.AcquireWriteLock[] }; CarefullyApply[Awl, "AcquireWriteLock"]; }; ForgetLogStreams: PUBLIC PROC = { readStream _ writeStream _ NIL }; CloseLogStreams: PUBLIC PROC = { WalnutRoot.CloseTransaction[]; readStream _ writeStream _ NIL; }; LogLength: PUBLIC PROC RETURNS[length: INT] = { Llen: PROC = { length _ writeStream.GetLength[] }; CarefullyApply[Llen, "LogLength"]; }; OpenLogStreams: PUBLIC PROC = { IF writeStream = NIL THEN { [readStream, writeStream] _ WalnutRoot.GetCurrentLogStreams[]; IF scanning THEN readStream.SetIndex[currentEntryPos]; }; }; ReleaseWriteLock: PUBLIC PROC = { IF writeStream = NIL THEN RETURN; [readStream, writeStream] _ WalnutRoot.ReleaseWriteLock[]; IF scanning THEN readStream.SetIndex[currentEntryPos]; }; ResetLog: PUBLIC PROC[newLength: INT] = { Rl: PROC = { writeStream.SetIndex[newLength]; WalnutStream.SetHighWaterMark[writeStream, newLength, -1]; writeStream.SetIndex[newLength]; writeStream.Flush[]; }; CarefullyApply[Rl, "ResetLog"]; }; ReturnCurrentLogStreams: PUBLIC PROC = { readStream _ writeStream _ NIL; scanning _ FALSE; }; ShutdownLog: PUBLIC PROC = { WalnutRoot.CloseTransaction[]; readStream _ writeStream _ NIL; scanning _ FALSE; }; WriteMessage: PUBLIC PROC[msg: ROPE, body: TiogaContents] RETURNS[at: INT]= { WriteMsg: PROC[] = { writeStream.SetIndex[writeStream.GetLength[]]; field1.length _ 0; WalnutStream.createMsg.msg _ RefText.AppendRope[field1, msg]; IF body.formatting.Length[] # 0 THEN { last: INT _ body.contents.Length[] - 1; IF body.contents.Fetch[last] = '\000 THEN -- NULL for padding { body.contents _ Rope.Substr[body.contents, 1, last-1]; body.formatting _ Rope.Concat["\000", body.formatting] } ELSE body.contents _ Rope.Substr[body.contents, 1]; }; WalnutStream.createMsg.textLen _ body.contents.Length[]; WalnutStream.createMsg.formatLen _ body.formatting.Length[]; at _ WalnutStream.WriteEntry[writeStream, WalnutStream.createMsg]; WalnutStream.WriteMsgBody[writeStream, body]; WalnutStream.FlushStream[strm: writeStream, setCreateDate: TRUE]; }; CarefullyApply[WriteMsg, "WriteMessage"]; }; SetPosition: PUBLIC PROC[startPos: INT] RETURNS[charsSkipped: INT] = { next: INT; SetPs: PROC[] = { readStream.SetIndex[startPos]; next _ WalnutStream.FindNextEntry[readStream]; currentEntryLengthHint _ -1; IF next # -1 THEN currentEntryPos _ next; scanning _ TRUE; }; CarefullyApply[SetPs, "SetPosition"]; RETURN[IF next = -1 THEN next ELSE next - startPos]; }; SetIndex: PUBLIC PROC[pos: INT] = { Si: PROC[] = { readStream.SetIndex[pos]; currentEntryLengthHint _ -1; currentEntryPos _ pos; scanning _ TRUE; }; CarefullyApply[Si, "SetIndex"]; }; NextAt: PUBLIC PROC RETURNS[at: INT] = { at _ currentEntryPos }; NextEntry: PUBLIC PROC RETURNS[le: LogEntry, at: INT] = { [le, at] _ NextEntryInternal[FALSE, "NextEntry"] }; NextEntryInternal: PROC[quick: BOOL, who: ROPE] RETURNS[le: LogEntry, at: INT] = { Ne: PROC[] = { length: INT; [le, length] _ WalnutStream.ReadEntry[readStream, quick]; at _ currentEntryPos; currentEntryLengthHint _ -1; IF length = -1 THEN { IF at = readStream.GetLength[] THEN RETURN; at _ -1; RETURN }; currentEntryPos _ at + length; }; IF ~scanning THEN ERROR WalnutDefs.Error[$log, $NotScanning]; CarefullyApply[Ne, who]; }; QuickScan: PUBLIC PROC RETURNS[le: LogEntry, at: INT] = { [le, at] _ NextEntryInternal[quick: TRUE, who: "QuickScan"] }; ArchiveEntry: PUBLIC PROC[to: IO.STREAM] RETURNS[ok: BOOL]= { Ane: PROC[] = { ok _ FALSE; IF currentEntryLengthHint = -1 THEN currentEntryLengthHint _ WalnutStream.PeekEntry[readStream].length; IF currentEntryLengthHint = -1 THEN RETURN; WalnutStream.CopyBytes[ from: readStream, to: to, num: currentEntryLengthHint]; currentEntryPos _ readStream.GetIndex[]; currentEntryLengthHint _ -1; -- unknown ok _ TRUE; }; IF ~scanning THEN ERROR WalnutDefs.Error[$log, $NotScanning]; CarefullyApply[Ane, "ArchiveEntry"]; }; CopyBytesToArchive: PUBLIC PROC[to: IO.STREAM, startPos, num: INT] = { Cba: PROC = { readStream.SetIndex[startPos]; WalnutStream.CopyBytes[from: readStream, to: to, num: num]; scanning _ FALSE; }; CarefullyApply[Cba, "CopyBytesToArchive"]; }; PrepareToCopyTempLog: PUBLIC PROC[which: WalnutKernelDefs.WhichTempLog, pagesAlreadyCopied: INT, reportProc: PROC[msg1, msg2, msg3: ROPE _ NIL]] RETURNS[BOOL] = { [writeStream, tempStream] _ WalnutRoot.PrepareToCopyTempLog[which, pagesAlreadyCopied, reportProc]; RETURN[tempStream # NIL]; }; CopyTempLog: PUBLIC PROC[which: WalnutKernelDefs.WhichTempLog, startCopyPos, fromPos: INT, reportProc: PROC[msg1, msg2, msg3: ROPE _ NIL] ] = { exp: ROPE; endLE: LogEntry; abort: BOOL _ TRUE; ok: BOOL _ FALSE; bytesToCopy: INT; bytesPerCopy: INT = FS.BytesForPages[200]; BEGIN ENABLE BEGIN IO.Error => { why: ATOM = AlpineFS.ErrorFromStream[stream].code; IF why = $transAborted THEN GOTO exit; IF why = $remoteCallFailed THEN { exp _ AlpineFS.ErrorFromStream[stream].explanation; GOTO exit; }; REJECT; }; FS.Error => { IF error.code = $transAborted THEN GOTO exit; IF error.code = $remoteCallFailed THEN { exp _ error.explanation; GOTO exit; }; REJECT; }; AlpFile.AccessFailed => { abort _ FALSE; GOTO exit }; END; SELECT which FROM newMail => { WalnutStream.endCopyNewMailInfo.startCopyPos _ startCopyPos; endLE _ WalnutStream.endCopyNewMailInfo; }; readArchive => { WalnutStream.endCopyReadArchiveInfo.startCopyPos _ startCopyPos; endLE _ WalnutStream.endCopyReadArchiveInfo }; ENDCASE => ERROR WalnutDefs.Error[$log, $UnknownLogFileType]; IF tempStream = NIL THEN [writeStream, tempStream] _ WalnutRoot.GetStreamsForCopy[which]; bytesToCopy _ tempStream.GetLength[] - fromPos; IF bytesToCopy > 0 THEN { first: BOOL _ TRUE; tempStream.SetIndex[fromPos]; writeStream.SetIndex[writeStream.GetLength[]]; DO thisCopy: INT _ MIN[bytesPerCopy, bytesToCopy]; WalnutStream.CopyBytes[to: writeStream, from: tempStream, num: thisCopy]; WalnutStream.FlushStream[writeStream, TRUE]; WalnutRoot.CommitAndContinue[]; IF first THEN first _ FALSE ELSE reportProc["#"]; bytesToCopy _ bytesToCopy - thisCopy; IF bytesToCopy = 0 THEN EXIT; ENDLOOP; }; [] _ WalnutStream.WriteEntry[writeStream, endLE]; WalnutStream.FlushStream[writeStream, TRUE]; EXITS exit => { who: ROPE _ IF which = newMail THEN "NewMail" ELSE "ReadArchive"; IF ~abort THEN ERROR WalnutDefs.Error[$log, $AccessFailed, who]; WalnutRoot.AbortTempCopy[which, tempStream]; WalnutRoot.CloseTransaction[]; writeStream _ readStream _ tempStream _ NIL; ERROR WalnutDefs.Error[ $log, IF exp = NIL THEN $TransactionAbort ELSE $RemoteCallFailed, IO.PutFR["Copying the %g log", IO.rope[who]] ]; }; END; tempStream _ NIL; WalnutRoot.FinishCopy[which]; }; FinishTempLogCopy: PUBLIC PROC[which: WalnutKernelDefs.WhichTempLog] = { WalnutRoot.FinishCopy[which] }; CreateArchiveLog: PUBLIC PROC[ fileToRead: STREAM, msgSet: ROPE, reportProc: PROC[msg1, msg2, msg3: ROPE _ NIL]] RETURNS[ok: BOOL] = { reason: ROPE; [ok, reason] _ WalnutMiscLog.CreateReadArchiveLog[fileToRead, msgSet, reportProc]; IF ok THEN [] _ WriteEntry[WalnutStream.endReadArchiveFile]; WalnutMiscLog.CloseReadArchiveLog[]; }; GetTiogaContents: PUBLIC PROC[textStart, textLen, formatLen: INT] RETURNS[contents: TiogaContents] = { TContents: PROC[] = { readStream.SetIndex[textStart]; scanning _ FALSE; contents _ NEW[ViewerTools.TiogaContentsRec]; contents.contents _ WalnutStream.ReadRope[readStream, textLen]; contents.formatting _ WalnutStream.ReadRope[readStream, formatLen]; }; IF formatLen # 0 THEN -- tioga formatting nonsense { textLen _ textLen + 1; textStart _ textStart - 1}; CarefullyApply[TContents, "GetTiogaContents"]; }; GetRefTextFromLog: PUBLIC PROC[startPos, length: INT, text: REF TEXT] = { natLen: NAT; Grf: PROC = { bytesRead: NAT; readStream.SetIndex[startPos]; scanning _ FALSE; bytesRead _ readStream.GetBlock[block: text, startIndex: 0, count: natLen]; IF bytesRead < natLen THEN text.length _ 0; }; CheckSize: PROC[len: INT] RETURNS[nat: NAT] = { nat _ len }; natLen _ CheckSize[length ! RuntimeError.BoundsFault => GOTO noGood]; CarefullyApply[Grf, "GetRefTextFromLog"]; EXITS noGood => ERROR WalnutDefs.Error[$log, $BadLength, IO.PutFR[" Can't convert %g into a nat", IO.int[length] ]]; }; WriteEntry: PROC[le: LogEntry] RETURNS[at, next: INT] = { We: PROC = { at _ WalnutStream.WriteEntry[writeStream, le]; WalnutStream.FlushStream[writeStream, TRUE]; next _ writeStream.GetIndex[]; }; CarefullyApply[We, "WriteEntry"]; }; curLog: ROPE = "On currentLog, during "; CarefullyApply: PROC[ proc: PROC[], who: ROPE ] = { exp: ROPE; BEGIN ENABLE BEGIN IO.Error => { why: ATOM = AlpineFS.ErrorFromStream[stream].code; IF why = $transAborted THEN GOTO aborted; IF why = $remoteCallFailed THEN { exp _ AlpineFS.ErrorFromStream[stream].explanation; GOTO failed; }; REJECT; }; FS.Error => { IF error.code = $transAborted THEN GOTO aborted; IF error.code = $remoteCallFailed THEN { exp _ error.explanation; GOTO failed; }; REJECT; }; AlpFile.AccessFailed => IF missingAccess = spaceQuota THEN GOTO outOfSpace ELSE GOTO exit; END; IF readStream = NIL THEN -- test read, not write stream, in case readOnly ERROR WalnutDefs.Error[$log, $LogNotOpen, curLog.Concat[who]]; proc[]; EXITS aborted => { WalnutRoot.StatsReport[Rope.Concat["\n**** trans abort on currentLog doing", who]]; ERROR WalnutDefs.Error[$log, $TransactionAbort, curLog.Concat[who]]; }; failed => { WalnutRoot.StatsReport[Rope.Concat["\n**** remote call failed on currentLog doing", who]]; ERROR WalnutDefs.Error[$log, $RemoteCallFailed, exp]; }; outOfSpace => ERROR WalnutDefs.Error[$log, $OutOfSpace, curLog.Concat[who]]; exit => ERROR WalnutDefs.Error[$log, $OtherAlpineMissingAccess, curLog.Concat[who]]; END; }; END. ÒWalnutLogImpl.mesa Copyright c 1984 by Xerox Corporation. All rights reserved. Willie-Sue, November 7, 1985 12:25:38 pm PST Donahue, January 22, 1986 8:04:31 am PST Walnut Log Operations Implementation Last Edited by: Willie-Sue, January 10, 1985 12:27:10 pm PST Last Edited by: Wert, August 31, 1984 9:49:21 pm PDT Last Edited by: Donahue, December 11, 1984 10:37:45 am PST Plan The log streams provide an abstraction for WalnutOps use in handling the log. WalnutOps procedures deal only with the notion of writing an entry on the current log and reading the entry from a specified position. The log position for a write is always at the end of the current log. Types Signals WalnutDefs.Error: ERROR[who, code: ATOM, explanation: ROPE _ NIL] = CODE; Raised for a variety of reasons: the current set of codes used by WalnutLogImpl include $NotScanning $InvalidOperation $UnknownLogFileType $AccessFailed $LogInaccessible $TransactionAbort Variables -- the read and write (and temp if open) streams are all under the same transaction; WalnutLogImpl must get them from WalnutRoot Procedures Logging of operations that perform actions on the Walnut database Managing the current log Write a message on the current log Parsing a log (used by Archive and Replay/Scavenge Copies the next entry of the current log to an archive stream. Copying NewMail/ReadArchive logs makes sure the writeStream is long enought to copy the which log onto copies the which templog, starting at fromPos, to the end of the writeStream; writes an EndCopy log entry, and sets the length of which to 0; if the operation fails, WalnutDefs.Error is raised; NOT done inside carefullyApply, so that it can catch transaction aborts Utilities Local Utilities Êü˜šÏb™Jšœ Ïmœ1™Jšœ˜Jšœ?˜?Jšœ1˜1J˜—J˜š¡œŸœŸœ ŸœŸœŸœŸœ˜NJšŸœ Ÿœ˜Jšœ/˜/Jšœ+˜+Jšœ˜JšœK˜KJšœ)˜)Jšœ7˜7Jšœ˜—J˜š ¡œŸœŸœŸœ Ÿœ˜6Jšœ;˜;—J˜š ¡ œŸœŸœŸœ Ÿœ˜3Jšœ8˜8—J˜š¡œŸœŸœŸœ ŸœŸœ Ÿœ˜VJšœ˜JšœJ˜JJšœ˜JšœN˜NJšœ:˜:J˜—J˜š¡œŸœŸ œ Ÿœ˜8Jšœ=˜=—J˜š ¡œŸœŸœŸœ Ÿœ˜˜>JšŸœ Ÿœ&˜6Jšœ˜—J˜—J˜š¡œŸœŸœ˜!JšŸœŸœŸœŸœ˜!Jšœ:˜:JšŸœ Ÿœ&˜6J˜—J˜š¡œŸœŸœ Ÿœ˜)š¡œŸœ˜ Jšœ ˜ Jšœ:˜:Jšœ ˜ Jšœ˜J˜—J˜J˜—J˜š¡œŸœŸœ˜(JšœŸœ˜Jšœ Ÿœ˜Jšœ˜—J˜š¡ œŸœŸœ˜Jšœ˜JšœŸœ˜Jšœ Ÿœ˜Jšœ˜——š"™"š ¡ œŸœŸœŸœŸœŸœ˜Mš¡œŸœ˜Jšœ.˜.Jšœ˜Jšœ=˜=šŸœŸœ˜&JšœŸœ˜'šŸœ#Ÿœ ˜>šœ8˜8J˜6—J˜—JšŸœ/˜3J˜—Jšœ8˜8Jšœ<˜š¡œŸœ˜JšœŸœ˜ šŸœŸ˜#šœ˜Jšœ*˜*——JšŸœŸœŸœ˜+šœ˜Jšœ7˜7—Jšœ(˜(Jšœ  ˜(JšœŸœ˜ Jšœ˜—JšŸœ Ÿ œ&˜=Jšœ$˜$J˜—J˜š ¡œŸœŸœŸœŸœŸœ˜Fš¡œŸœ˜ Jšœ˜Jšœ;˜;Jšœ Ÿœ˜J˜—Jšœ*˜*J˜——š ™ J˜š¡œŸœŸœ;ŸœŸœŸœŸœŸœŸœ˜¢JšœE™EJšœc˜cJšŸœŸœ˜J˜—J˜š¡ œŸœŸœ>Ÿœ ŸœŸœŸœ˜Jšœ‰™‰JšœŸœ˜ J˜JšœŸœŸœ˜JšœŸœŸœ˜Jšœ Ÿœ˜JšœŸœŸœ˜*šŸœŸœŸ˜šŸœ Ÿ˜ JšœŸœ)˜2JšŸœŸœŸœ˜&šŸœŸœ˜!Jšœ3˜3JšŸœ˜ Jšœ˜—JšŸœ˜J˜—šŸœ Ÿœ˜ JšŸœŸœŸœ˜-šŸœ Ÿœ˜(Jšœ˜JšŸœ˜ Jšœ˜—JšŸœ˜J˜—Jšœ"ŸœŸœ˜5JšŸœ˜J˜šŸœŸ˜˜ J˜—J˜J˜J˜šŸ˜šœ ˜ JšœS˜SJšŸœ?˜DJ˜—šœ ˜ JšœZ˜ZJšŸœ0˜5J˜—šœ ˜ JšŸœ9˜>—šœ˜JšŸœG˜L——JšŸœ˜J˜—J˜—J˜JšŸœ˜——…—7ÚQ¨