DIRECTORY BasicTime USING [GMT, nullGMT, Now], FS USING [BytesForPages, Error, <> <> PagesForBytes, StreamOpen], IO, LoganBerry USING [Error], LoganBerryEntry USING [I2V], Process USING [PauseMsec], RefText USING [MaxLen, New, TrustTextAsRope], Rope, -- UserCredentials USING [Get], TapFilter USING [Agent, CreateAgent, Error, IsAgentIdle, MonitorAgent, MonitorProc, ParseMsgIntoFields, WakeupAgent, msgID, seqNum], TapMsgQueue USING [Create, Msg, Put], UserProfile USING [Boolean, Token], ViewerClasses USING [Viewer], ViewerIO USING [CreateViewerStreams, GetViewerFromStream], ViewerOps USING [FindViewer], WalnutDB --using lots-- , WalnutDefs USING [CheckReportProc, dontCareMsgSetVersion, Error, LogInfo, MsgSet, VersionMismatch, WalnutOpsHandle], WalnutKernelDefs USING [LogEntry, LogExpungePhase, MsgLogEntry, WhichTempLog], WalnutLog --using lots-- , WalnutLogExpunge --using lots-- , WalnutMiscLog USING [CreateReadArchiveLog, GetNewMailLog, walnutItemFixedLength], WalnutOps, WalnutOpsInternal, WalnutRegistryPrivate USING [NotifyForEvent], WalnutRoot USING [AbortTransaction, AcquireWriteLock, CommitAndContinue, GetStreamsForExpunge, Open, RegisterStatsProc, RootHandle, RootHandleRec, Shutdown, StartTransaction, StatsReport, SwapLogs, UnregisterStatsProc], WalnutStream USING [Open]; WalnutOpsInternalImpl: CEDAR PROGRAM IMPORTS BasicTime, FS, IO, LoganBerryEntry, Process, RefText, Rope, <> UserProfile, ViewerIO, ViewerOps, LoganBerry, TapFilter, TapMsgQueue, WalnutDB, WalnutDefs, WalnutLog, WalnutLogExpunge, WalnutMiscLog, WalnutOps, WalnutOpsInternal, WalnutRegistryPrivate, WalnutRoot, WalnutStream EXPORTS WalnutDefs, WalnutOpsInternal = BEGIN OPEN WalnutOps, WalnutOpsInternal; ROPE: TYPE = Rope.ROPE; STREAM: TYPE = IO.STREAM; GMT: TYPE = BasicTime.GMT; CheckReportProc: TYPE = WalnutDefs.CheckReportProc; LogInfo: TYPE = WalnutDefs.LogInfo; MsgSet: TYPE = WalnutDefs.MsgSet; WalnutOpsHandle: TYPE = WalnutDefs.WalnutOpsHandle; RootHandle: TYPE = WalnutRoot.RootHandle; 
RootHandleRec: PUBLIC TYPE = WalnutRoot.RootHandleRec;
KernelHandle: TYPE = WalnutOpsInternal.KernelHandle;
KernelHandleRec: PUBLIC TYPE = WalnutOpsInternal.KernelHandleRec;
WhichTempLog: TYPE = WalnutKernelDefs.WhichTempLog;
OutCome: TYPE = { done, couldntRestart };

-- Debug switches. heavyhandedDebugging makes StartStatsReporting also attach a viewer stream;
-- debugging makes the error handlers in this module raise WDErrorSeen / DBErrorSeen first.
heavyhandedDebugging: BOOL ¬ FALSE;
debugging: BOOL ¬ FALSE;
DBErrorSeen: SIGNAL = CODE;
WDErrorSeen: SIGNAL = CODE;

-- Tuning values. shutdownFrequency is compared against the number of messages handled since
-- the last ShortShutdown in the expunge and parse loops; parseCommitFrequency seeds the
-- commit interval used by ParseLog.
defaultLogLength: INT ¬ FS.BytesForPages[10000]; -- five megabytes
expungeCommitFrequency: INT ¬ 200;
commitFrequency: INT ¬ expungeCommitFrequency;
shutdownFrequency: INT ¬ 1200;
parseCommitFrequency: INT ¬ 200;
dontCareMsgSetVersion: INT = WalnutOps.dontCareMsgSetVersion;
dontCareDomainVersion: INT = WalnutOps.dontCareDomainVersion;
-- incremented once per message handed to the filtering agent (see PassToFilterAgent)
feederSeqNum: INT ¬ 0;

-- Opens the statistics file for opsH (closing any stats stream that is already open) and
-- registers StatsReport with WalnutRoot. When heavyhandedDebugging is set, also attaches a
-- viewer output stream and marks that viewer inhibitDestroy.
StartStatsReporting: PUBLIC PROC[opsH: WalnutOpsHandle] = {
  which: ROPE = "WalnutLB";
  kH: KernelHandle = opsH.kernelHandle;
  statsFile: ROPE ¬ "/tmp/WalnutLB.log";
  -- best effort: an IO.Error while closing the old stream is ignored
  IF kH.statsStream # NIL THEN kH.statsStream.Close[ ! IO.Error => CONTINUE];
  kH.statsStream ¬ FS.StreamOpen[fileName: statsFile, accessOptions: $create, keep: 4];
  IF heavyhandedDebugging THEN {
    name: ROPE = Rope.Concat[which, " Stats"];
    kH.statsProgressTS ¬ ViewerOps.FindViewer[name];
    kH.statsProgressStream ¬ ViewerIO.CreateViewerStreams[name, kH.statsProgressTS].out;
    -- no existing viewer was found: pick up the viewer behind the stream just created
    IF kH.statsProgressTS = NIL THEN kH.statsProgressTS ¬ ViewerIO.GetViewerFromStream[kH.statsProgressStream];
    kH.statsProgressTS.inhibitDestroy ¬ TRUE;
    };
  WalnutRoot.RegisterStatsProc[opsH, StatsReport];
  };

-- Writes one formatted report (format, v1, v2, v3) followed by the current time to the stats
-- file stream and, when present, to the stats progress stream.
-- Returns without writing anything when no stats file stream is open.
StatsReport: PUBLIC WalnutDefs.CheckReportProc = {
  r: ROPE = "@ %g\n";
  thyme: GMT = BasicTime.Now[];
  strm: STREAM;
  IF (strm ¬ opsH.kernelHandle.statsStream) = NIL THEN RETURN;
  strm.PutRope["\n "];
  strm.PutF[format, v1, v2, v3];
  strm.PutF1[r, [time[thyme]] ];
  IF (strm ¬ opsH.kernelHandle.statsProgressStream) # NIL THEN {
    strm.PutRope["\n "];
    strm.PutF[format, v1, v2, v3];
    strm.PutF1[r, [time[thyme]] ];
    };
  };

-- Runs proc under the module's standard error handling and, when didUpdate is TRUE, commits
-- afterwards. Raises WalnutDefs.Error if the kernel has not been started or the log is not
-- open; a LoganBerry.Error is reported and re-raised as WalnutDefs.Error[$db, ...].
CarefullyApply: PUBLIC PROC[opsH: WalnutOpsHandle, proc: PROC[], didUpdate: BOOL] = {
  reTryCount: INT ¬ 
2;
  schemaInvalid: BOOL ¬ TRUE;
  -- NOTE(review): reTryCount and schemaInvalid are not referenced again in this procedure.
  eCode: ATOM;
  exp: ROPE;
  opsH.kernelHandle.recentActivity ¬ TRUE;
  IF NOT opsH.kernelHandle.started THEN ERROR WalnutDefs.Error[$root, $NotStarted, "Must do WalnutOps.Startup"];
  -- every path through the loop body leaves via RETURN or ERROR, so it runs at most once
  DO
    BEGIN ENABLE {
      -- mark the handle as being in error; the error itself keeps propagating
      WalnutDefs.Error => {
        IF debugging THEN SIGNAL WDErrorSeen;
        opsH.kernelHandle.errorInProgress ¬ TRUE;
        };
      -- remember the database error details and convert it below
      LoganBerry.Error => {
        IF debugging THEN SIGNAL DBErrorSeen;
        eCode ¬ ec;
        exp ¬ explanation;
        GOTO dbError
        };
      };
      IF opsH.rootHandle.currentLog.readStream = NIL THEN ERROR WalnutDefs.Error[$log, $LogNotOpen];
      proc[];
      IF didUpdate THEN WalnutRoot.CommitAndContinue[opsH];
      RETURN;
      EXITS
        dbError => {
          opsH.kernelHandle.errorInProgress ¬ TRUE;
          WalnutRoot.StatsReport[opsH, "\n ***DB error - code: %g, exp: %g ", [atom[eCode]], [rope[exp]] ];
          ERROR WalnutDefs.Error[$db, eCode, exp];
          };
    END;
  ENDLOOP;
  };

-- Like CarefullyApply, but for operations recorded as "in progress": acquires the write lock,
-- calls proc, clears the in-progress position, then commits and releases the lock.
-- If proc raises WalnutDefs.VersionMismatch the lock is released before the error propagates.
-- Also clears kernelHandle.isShutdown before checking that the kernel was started.
LongRunningApply: PUBLIC PROC[opsH: WalnutOpsHandle, proc: PROC[inProgress: BOOL]] = {
  reTryCount: INT ¬ 2;
  alreadyCalled: BOOL ¬ FALSE;
  schemaInvalid: BOOL ¬ TRUE;
  -- NOTE(review): reTryCount and schemaInvalid are unused here, and alreadyCalled is never
  -- set to TRUE, so proc always receives inProgress = FALSE from this procedure.
  eCode: ATOM;
  exp: ROPE;
  opsH.kernelHandle.recentActivity ¬ TRUE;
  opsH.kernelHandle.isShutdown ¬ FALSE;
  IF NOT opsH.kernelHandle.started THEN ERROR WalnutDefs.Error[$root, $NotStarted, "Must do WalnutOps.Startup"];
  DO
    BEGIN ENABLE {
      WalnutDefs.Error => {
        IF debugging THEN SIGNAL WDErrorSeen;
        opsH.kernelHandle.errorInProgress ¬ TRUE;
        };
      LoganBerry.Error => {
        IF debugging THEN SIGNAL DBErrorSeen;
        eCode ¬ ec;
        exp ¬ explanation;
        GOTO dbError
        };
      };
      IF opsH.rootHandle.currentLog.readStream = NIL THEN ERROR WalnutDefs.Error[$log, $LogNotOpen];
      WalnutRoot.AcquireWriteLock[opsH];
      proc[alreadyCalled ! 
WalnutDefs.VersionMismatch => { ReleaseWriteLock[opsH]; REJECT} ];
      WalnutDB.SetOpInProgressPos[opsH, -1];
      ReleaseWriteLock[opsH]; -- also does commit
      RETURN;
      EXITS
        dbError => {
          opsH.kernelHandle.errorInProgress ¬ TRUE;
          WalnutRoot.StatsReport[opsH, "\n ***DB error - code: %g, exp: %g ", [atom[eCode]], [rope[exp]] ];
          ERROR WalnutDefs.Error[$db, eCode, exp];
          };
    END;
  ENDLOOP;
  };

-- Starts a new transaction, re-initialises the schema if StartTransaction reports it invalid,
-- re-opens the log streams and clears the isShutdown flag.
Restart: PUBLIC PROC[opsH: WalnutOpsHandle] = {
  -- need to re-open the transaction
  schemaInvalid: BOOL = WalnutRoot.StartTransaction[opsH];
  IF schemaInvalid THEN [] ¬ WalnutDB.InitSchema[opsH, schemaInvalid];
  WalnutLog.OpenLogStreams[opsH];
  opsH.kernelHandle.isShutdown ¬ FALSE;
  };

-- Commits the current transaction and then releases the log's write lock.
ReleaseWriteLock: PROC[opsH: WalnutOpsHandle] = {
  WalnutRoot.CommitAndContinue[opsH];
  WalnutLog.ReleaseWriteLock[opsH];
  };

-- Sends one formatted progress message to every stream on kernelHandle.reporterList.
CheckReport: PUBLIC CheckReportProc = {
  IF opsH.kernelHandle.reporterList = NIL THEN RETURN;
  FOR rL: LIST OF STREAM ¬ opsH.kernelHandle.reporterList, rL.rest UNTIL rL= NIL DO
    rL.first.PutF[format, v1, v2, v3];
  ENDLOOP;
  };

-- Shuts the kernel down: unregisters the stats proc, aborts the transaction if an error is in
-- progress, shuts down expunge, log and root, closes the stats streams, resets the kernel
-- flags and notifies registered clients with the stopped event.
InternalShutdown: PUBLIC PROC[opsH: WalnutOpsHandle] = {
  kH: KernelHandle = opsH.kernelHandle;
  WalnutRoot.UnregisterStatsProc[opsH, StatsReport];
  IF kH.errorInProgress THEN WalnutRoot.AbortTransaction[opsH];
  WalnutLogExpunge.EndExpunge[opsH]; -- clear its variables
  WalnutLog.ShutdownLog[opsH];
  WalnutRoot.Shutdown[opsH]; -- takes care of database
  -- written before the stats streams are closed below
  StatsReport[opsH, "\n *** Shutdown"];
  IF kH.statsProgressTS # NIL THEN { kH.statsProgressTS.inhibitDestroy ¬ FALSE };
  -- Closing is best effort, as in StartStatsReporting: an IO.Error from Close must not
  -- prevent the flag reset and the stopped notification that follow.
  IF kH.statsStream # NIL THEN { kH.statsStream.Close[ ! IO.Error => CONTINUE]; kH.statsStream ¬ NIL };
  IF kH.statsProgressStream # NIL THEN { kH.statsProgressStream.Close[ ! IO.Error => CONTINUE]; kH.statsProgressStream ¬ NIL };
  kH.started ¬ FALSE;
  kH.recentActivity¬ FALSE;
  kH.isShutdown ¬ TRUE;
  kH.errorInProgress ¬ FALSE;
  kH.mailStream ¬ NIL;
  WalnutRegistryPrivate.NotifyForEvent[stopped];
  };

-- Returns the number of pages covering the current log length minus the bytes recorded as
-- belonging to destroyed messages.
ComputeMaxExpungeLogPages: PUBLIC PROC[opsH: WalnutOpsHandle] RETURNS[pages: INT] = {
  bytesInDestroyedMsgs: INT = WalnutDB.GetExpungeInfo[opsH].bytesInDestroyedMsgs;
  logLen: INT = 
WalnutLog.LogLength[opsH];
  RETURN[FS.PagesForBytes[logLen-bytesInDestroyedMsgs]];
  };

-- Drives the log expunge as a state machine over the phase stored in the database:
-- idle, initializingExpungeLog, writingExpungeLog, swappingLogs. Each phase records the next
-- phase and commits, and the loop re-reads the stored phase on every iteration.
-- Returns once the logs have been swapped and the phase is back to idle.
DoLogExpunge: PUBLIC PROC[opsH: WalnutOpsHandle] = {
  DO
    expPhase: WalnutKernelDefs.LogExpungePhase ¬ WalnutDB.GetLogExpungePhase[opsH];
    SELECT expPhase FROM
      idle => {
        StatsReport[opsH, "\n ~~~ Starting log expunge"];
        WalnutDB.SetLogExpungePhase[opsH, initializingExpungeLog];
        WalnutRoot.CommitAndContinue[opsH];
        commitFrequency ¬ expungeCommitFrequency;
        };
      initializingExpungeLog => {
        expungeFileID: INT;
        pagesNeeded: INT = ComputeMaxExpungeLogPages[opsH];
        expungeFileID ¬ WalnutLogExpunge.StartExpunge[opsH, pagesNeeded];
        WalnutDB.SetExpungeFileID[opsH, expungeFileID];
        WalnutDB.SetLogExpungePhase[opsH, writingExpungeLog];
        WalnutRoot.CommitAndContinue[opsH];
        };
      writingExpungeLog => {
        [] ¬ WriteExpungeLog[opsH];
        [] ¬ WalnutLogExpunge.EndExpunge[opsH];
        WalnutDB.SetLogExpungePhase[opsH, swappingLogs];
        WalnutRoot.CommitAndContinue[opsH];
        };
      swappingLogs => {
        expungeFileID: INT ¬ WalnutDB.GetExpungeFileID[opsH];
        date: BasicTime.GMT;
        logLen: INT;
        [] ¬ WalnutRoot.GetStreamsForExpunge[opsH: opsH, starting: FALSE, pagesWanted: -1];
        StatsReport[opsH, "\n Swapping Log Files\n"];
        -- reset all expunge bookkeeping before the swap; everything is committed together below
        WalnutDB.SetExpungeProgressInfo[opsH, 0,0];
        WalnutDB.SetExpungeInfo[opsH, 0, 0];
        WalnutDB.SetOpInProgressPos[opsH, -1];
        WalnutDB.SetLogExpungePhase[opsH, idle]; -- whew
        date ¬ BasicTime.Now[];
        WalnutDB.SetTimeOfLastExpunge[opsH, date];
        -- NOTE(review): logLen is assigned here but not read afterwards
        logLen ¬ WalnutRoot.SwapLogs[opsH, expungeFileID, date];
        WalnutDB.SetRootFileVersion[opsH, opsH.rootHandle.createDate];
        WalnutRoot.CommitAndContinue[opsH];
        StatsReport[opsH, "\n ~~~ Finished expunge"];
        WalnutLog.OpenLogStreams[opsH];
        RETURN;
        };
      ENDCASE => ERROR;
    ENDLOOP;
  };

ExpungeLogResult: TYPE = { unknown, couldntRestart, finished };

-- Copies the entries that are still needed from the current log into the expunge log,
-- recording progress in the database and committing periodically so the copy can be resumed.
-- A WalnutDefs.Error with code $TransactionAbort restarts the copy from the last recorded
-- progress; any other WalnutDefs.Error propagates.
WriteExpungeLog: PROC[opsH: WalnutOpsHandle] RETURNS[result: ExpungeLogResult] = {
  numMsgs: INT ¬ 0;
  currentLogPos, expungeLogPos: INT;
  previousAt, at: INT;
  startRetryCount: INT = 2;
  retryCount: INT ¬ startRetryCount;
  bytesToCopyBeforeFlush: INT ¬ 
FS.BytesForPages[500];
  lastExpungeCommitLength: INT ¬ -1;
  oldLength: INT = WalnutLog.LogLength[opsH];
  DO
    BEGIN ENABLE WalnutDefs.Error => {
      IF debugging THEN SIGNAL WDErrorSeen;
      -- a transaction abort restarts the copy from the last recorded progress (see retry)
      IF code = $TransactionAbort THEN GOTO retry ELSE REJECT;
      };
      bytesCopiedSinceLastCommit, numMsgsSinceShutdown: INT ¬ 0;
      lastAcceptNewMailPos: INT;
      firstDestroyedMsgPos: INT ¬ WalnutDB.GetExpungeInfo[opsH].firstDestroyedMsgPos;
      [currentLogPos, expungeLogPos] ¬ WalnutDB.GetExpungeProgressInfo[opsH];
      -- No progress since the previous attempt: count it against the retry budget.
      -- Any progress restores the full budget.
      IF lastExpungeCommitLength = expungeLogPos THEN {
        IF (retryCount ¬ retryCount - 1) <= 0 THEN
          ERROR WalnutDefs.Error[$log, $TooManyLogAborts, IO.PutFR1["\n During Expunge - last commit pos in expungeLog was %g", [integer[expungeLogPos]]] ];
        }
      ELSE {
        lastExpungeCommitLength ¬ expungeLogPos;
        retryCount ¬ startRetryCount;
        };
      -- If the expunge cannot be resumed at the recorded positions, start it over from zero.
      IF ~WalnutLogExpunge.RestartExpunge[opsH, currentLogPos, expungeLogPos] THEN {
        pagesNeeded: INT ¬ FS.PagesForBytes[WalnutLogExpunge.ExpLogLength[opsH] - WalnutDB.GetExpungeInfo[opsH].bytesInDestroyedMsgs];
        CheckReport[opsH, "\n ****Could not restart expunge from %g in expungeLog, %g in currentLog - restarting Log Expunge\n", [integer[expungeLogPos]], [integer[currentLogPos]] ];
        expungeLogPos ¬ currentLogPos ¬ 0;
        WalnutDB.SetExpungeProgressInfo[opsH, currentLogPos, expungeLogPos]; -- commits
        [] ¬ WalnutLogExpunge.StartExpunge[opsH, pagesNeeded]; -- try again
        StatsReport[opsH, "Restarting expunge from beginning"];
        -- Raised with an explicit ERROR, like every other WalnutDefs.Error in this module, so
        -- it is clear that control does not continue past a failed restart.
        IF ~WalnutLogExpunge.RestartExpunge[opsH, currentLogPos, expungeLogPos] THEN
          ERROR WalnutDefs.Error[$log, $CantStartExpunge];
        lastExpungeCommitLength ¬ expungeLogPos;
        };
      IF currentLogPos < firstDestroyedMsgPos THEN {
        -- copyingHead phase
        bytesLeftToCopyInHead: INT ¬ firstDestroyedMsgPos - currentLogPos;
        bytesToCopyAtOneTime: INT = FS.BytesForPages[400];
        IF currentLogPos = 0 THEN {
          WalnutLogExpunge.SetIndex[opsH, 0];
          [] ¬ WalnutLogExpunge.SkipEntry[opsH]; -- need to skip the LogFileInfo entry
          bytesLeftToCopyInHead ¬ bytesLeftToCopyInHead - WalnutLogExpunge.GetIndex[opsH];
          };
        CheckReport[opsH, "Copying the head of the log, starting at bytePos %g, ending at bytePos %g\n\t", [integer[currentLogPos]], [integer[firstDestroyedMsgPos]] ];
        -- copy the head in fixed-size pieces, recording progress and committing after each
        DO
          bytesToCopyThisTime: INT ¬ MIN[bytesLeftToCopyInHead, bytesToCopyAtOneTime];
          WalnutLogExpunge.CopyBytesToExpungeLog[opsH, bytesToCopyThisTime];
          [currentLogPos, expungeLogPos] ¬ WalnutLogExpunge.GetExpungeProgress[opsH];
          WalnutDB.SetExpungeProgressInfo[opsH, currentLogPos, expungeLogPos];
          WalnutRoot.CommitAndContinue[opsH];
          CheckReport[opsH, "#"];
          bytesLeftToCopyInHead ¬ bytesLeftToCopyInHead - bytesToCopyThisTime;
          IF bytesLeftToCopyInHead = 0 THEN EXIT;
        ENDLOOP;
        retryCount ¬ startRetryCount; -- in case has abort during copy
        };
      lastAcceptNewMailPos ¬ WalnutDB.GetAcceptNewMailPos[opsH];
      WalnutLogExpunge.SetIndex[opsH, at ¬ currentLogPos]; -- before doing PeekEntry
      CheckReport[opsH, "\nProcessing the tail of the log, starting at bytePos %g (old length %g bytes, %g pages)\n", [integer[at]], [integer[oldLength]], [integer[FS.PagesForBytes[oldLength]]] ];
      -- Tail phase: examine each remaining entry and either copy it or skip it.
      DO
        ident: ATOM;
        msgID: ROPE;
        bytesThisCopy: INT ¬ 0;
        previousAt ¬ at;
        [ident, msgID, at] ¬ WalnutLogExpunge.PeekEntry[opsH];
        IF ident = NIL THEN
          IF at # -1 THEN EXIT -- end of log
          ELSE {
            -- no entry could be read here: search forward for the next one
            charSkipped: INT ¬ WalnutLogExpunge.SetPosition[opsH, previousAt+1];
            IF charSkipped = -1 THEN
              ERROR WalnutDefs.Error[$log, $DuringExpunge, IO.PutFR1["No entry found after %g", [integer[previousAt]] ]];
            -- NOTE(review): the search started at previousAt+1, so the position reached may be
            -- previousAt+1+charSkipped. Confirm against SetPosition's contract.
            at ¬ previousAt + charSkipped;
            LOOP;
            };
        SELECT ident FROM
          $ExpungeMsgs => [] ¬ WalnutLogExpunge.SkipEntry[opsH];
          $WriteExpungeLog => { result ¬ finished; EXIT }; -- last entry on the Log
          $LogFileInfo => [] ¬ WalnutLogExpunge.SkipEntry[opsH];
          -- new-mail bookkeeping entries written before the last AcceptNewMail are dropped
          $RecordNewMailInfo, $StartCopyNewMail =>
            IF at < lastAcceptNewMailPos THEN [] ¬ WalnutLogExpunge.SkipEntry[opsH]
            ELSE bytesThisCopy ¬ WalnutLogExpunge.CopyEntry[opsH].bytesCopied;
          $EndCopyNewMailInfo => {
            IF at < lastAcceptNewMailPos THEN {
              -- maybe skip
              startCopyPos: INT ¬ WalnutLogExpunge.EndCopyEntry[opsH];
              IF firstDestroyedMsgPos < startCopyPos THEN [] ¬ WalnutLogExpunge.SkipEntry[opsH]
              ELSE bytesThisCopy ¬ WalnutLogExpunge.CopyEntry[opsH].bytesCopied;
              }
            ELSE bytesThisCopy ¬ WalnutLogExpunge.CopyEntry[opsH].bytesCopied;
            };
          $AcceptNewMail =>
            IF at < lastAcceptNewMailPos THEN [] ¬ WalnutLogExpunge.SkipEntry[opsH]
            ELSE {
              newLogPos: INT;
              [newLogPos, bytesThisCopy] ¬ WalnutLogExpunge.CopyEntry[opsH];
              -- the entry moved: record its position in the new log
              WalnutDB.SetAcceptNewMailPos[opsH, newLogPos];
              };
          $StartCopyReadArchive => [] ¬ WalnutLogExpunge.SkipEntry[opsH];
          $EndCopyReadArchiveInfo => {
            startCopyPos: INT ¬ WalnutLogExpunge.EndCopyEntry[opsH];
            IF firstDestroyedMsgPos < startCopyPos THEN [] ¬ WalnutLogExpunge.SkipEntry[opsH]
            ELSE bytesThisCopy ¬ WalnutLogExpunge.CopyEntry[opsH].bytesCopied;
            };
          ENDCASE =>
            -- any other entry is kept if it carries no msgID or its message still exists
            IF msgID.Length[] = 0 OR WalnutDB.MsgExists[opsH, msgID] THEN {
              newLogPos: INT ;
              [newLogPos, bytesThisCopy] ¬ WalnutLogExpunge.CopyEntry[opsH];
              IF ident = $CreateMsg THEN {
                IF newLogPos # at THEN WalnutDB.SetMsgEntryPosition[opsH, newLogPos];
                numMsgsSinceShutdown ¬ numMsgsSinceShutdown + 1;
                -- progress: "." every 10 messages, the running count every 100
                IF (numMsgs ¬ numMsgs + 1) MOD 10 = 0 THEN
                  IF numMsgs MOD 100 = 0 THEN CheckReport[opsH, "(%g) ", [integer[numMsgs]] ]
                  ELSE CheckReport[opsH, "."];
                };
              }
            ELSE [] ¬ WalnutLogExpunge.SkipEntry[opsH];
        -- After shutdownFrequency messages: record progress, close down for 20 seconds and
        -- resume the expunge.
        IF numMsgsSinceShutdown >= shutdownFrequency THEN {
          expLogPos: INT;
          [currentLogPos, expLogPos] ¬ WalnutLogExpunge.GetExpungeProgress[opsH];
          WalnutDB.SetExpungeProgressInfo[opsH, currentLogPos, expLogPos];
          WalnutLogExpunge.EndExpunge[opsH];
          ShortShutdown[opsH, 20, -1];
          -- NOTE(review): the result of this restart is discarded, unlike the checks above
          [] ¬ WalnutLogExpunge.RestartExpunge[opsH, currentLogPos, expLogPos];
          lastExpungeCommitLength ¬ expLogPos;
          bytesCopiedSinceLastCommit ¬ bytesThisCopy ¬ 0;
          numMsgsSinceShutdown ¬ 0;
          };
        -- commit whenever enough bytes have been copied since the last commit
        IF (bytesCopiedSinceLastCommit ¬ bytesCopiedSinceLastCommit + bytesThisCopy) >= bytesToCopyBeforeFlush THEN {
          expLogPos: INT;
          [currentLogPos, expLogPos] ¬ WalnutLogExpunge.GetExpungeProgress[opsH];
          WalnutDB.SetExpungeProgressInfo[opsH, currentLogPos, expLogPos];
          WalnutRoot.CommitAndContinue[opsH];
          lastExpungeCommitLength 
¬ expLogPos;
          bytesCopiedSinceLastCommit ¬ 0;
          };
      ENDLOOP;
      -- record and commit whatever was copied since the last commit
      IF bytesCopiedSinceLastCommit > 0 THEN {
        [currentLogPos, expungeLogPos] ¬ WalnutLogExpunge.GetExpungeProgress[opsH];
        WalnutDB.SetExpungeProgressInfo[opsH, currentLogPos, expungeLogPos];
        WalnutRoot.CommitAndContinue[opsH];
        lastExpungeCommitLength ¬ expungeLogPos;
        };
      RETURN;
      EXITS
        -- reached on a $TransactionAbort: abort, start a fresh transaction and loop again
        retry => {
          schemaInvalid: BOOL ¬ TRUE;
          WalnutLogExpunge.EndExpunge[opsH];
          WalnutRoot.AbortTransaction[opsH];
          schemaInvalid ¬ WalnutRoot.StartTransaction[opsH];
          IF schemaInvalid THEN [] ¬ WalnutDB.InitSchema[opsH, schemaInvalid];
          WalnutLog.OpenLogStreams[opsH];
          };
    END;
  ENDLOOP;
  };

-- Finishes any operation that the database records as unfinished before a new operation
-- starts: a parse that was in progress is continued; otherwise the log entry at the recorded
-- in-progress position is read and the matching operation is completed.
-- Raises WalnutDefs.Error[$db, $InternalError] if an error is already in progress, and
-- restarts the kernel first when it is shut down.
CheckInProgress: PUBLIC PROC[opsH: WalnutOpsHandle] = {
  inProgressPos: INT ¬ 0;
  parseInProgress: BOOL ¬ FALSE;
  needsParse: BOOL ¬ FALSE;
  doingLogExpunge: BOOL ¬ FALSE;

  -- reads the in-progress state and takes the write lock when there is work to finish
  Cip: PROC = {
    IF (parseInProgress ¬ WalnutDB.GetParseLogInProgress[opsH]) THEN { WalnutRoot.AcquireWriteLock[opsH]; RETURN };
    inProgressPos ¬ WalnutDB.GetOpInProgressPos[opsH];
    doingLogExpunge ¬ ( WalnutDB.GetLogExpungePhase[opsH] # idle );
    IF inProgressPos <= 0 THEN RETURN;
    WalnutRoot.AcquireWriteLock[opsH];
    };

  Fip: PROC[inProgress: BOOL] = {
    -- FinishInProgress
    at: INT;
    wle: WalnutKernelDefs.LogEntry;
    WalnutLog.SetIndex[opsH, inProgressPos];
    [wle, at] ¬ WalnutLog.NextEntry[opsH];
    IF at = -1 THEN ERROR WalnutDefs.Error[$log, $BadLog, IO.PutFR1["No entry at %g", [integer[inProgressPos]]] ];
    TRUSTED {
      -- dispatch on the kind of entry that was in progress
      WITH le: wle SELECT FROM
        ExpungeMsgs => WalnutDB.ExpungeMsgs[opsH, dontCareMsgSetVersion, CheckReport];
        WriteExpungeLog => DoLogExpunge[opsH];
        EmptyMsgSet => []¬ WalnutDB.EmptyMsgSet[ opsH, [ le.msgSet, dontCareMsgSetVersion ], WalnutOpsInternal.CheckReport ];
        DestroyMsgSet => []¬ WalnutDB.DestroyMsgSet[ opsH: opsH, msgSet: [ le.msgSet, dontCareMsgSetVersion ], msDomainVersion: dontCareDomainVersion, report: WalnutOpsInternal.CheckReport ];
        AcceptNewMail => {
          WalnutDB.AcceptNewMail[opsH, at, dontCareMsgSetVersion];
          opsH.kernelHandle.newMailSomewhere ¬ FALSE;
          };
        StartCopyNewMail => {
          FinishCopyTempLog[opsH, newMail, at];
          WalnutDB.SetCopyMailLogPos[opsH, 0];
          WalnutRoot.CommitAndContinue[opsH];
          WalnutLog.FinishTempLogCopy[opsH, newMail];
          needsParse ¬ TRUE;
          };
        StartReadArchiveFile => {
          WalnutLog.ResetLog[opsH, at];
          WalnutDB.SetReadArchivePos[opsH, 0];
          };
        StartCopyReadArchive => {
          FinishCopyTempLog[opsH, readArchive, at];
          WalnutDB.SetCopyReadArchivePos[opsH, 0];
          WalnutRoot.CommitAndContinue[opsH];
          WalnutLog.FinishTempLogCopy[opsH, readArchive];
          needsParse ¬ TRUE;
          };
        ENDCASE => ERROR WalnutDefs.Error[$db, $ErroneousInProgress, IO.PutFR1["Entry at %g", [integer[inProgressPos]]] ];
      }; -- end trusted
    WalnutDB.SetOpInProgressPos[opsH, -1];
    WalnutRoot.CommitAndContinue[opsH];
    };

  IF opsH.kernelHandle.errorInProgress THEN ERROR WalnutDefs.Error[$db, $InternalError, "Must do Shutdown & Startup"];
  IF opsH.kernelHandle.isShutdown THEN {
    IF opsH.rootHandle = NIL THEN [] ¬ WalnutRoot.Open[opsH];
    Restart[opsH];
    };
  WalnutOpsInternal.CarefullyApply[opsH, Cip, FALSE];
  IF parseInProgress THEN {
    StatsReport[opsH, "\n Continuing parseInProgress"];
    [] ¬ ParseLog[opsH, TRUE];
    RETURN
    };
  IF inProgressPos <= 0 THEN RETURN;
  StatsReport[opsH, "\n Continuing longRunningOperation"];
  WalnutOpsInternal.LongRunningApply[opsH, Fip];
  -- a finished temp-log copy leaves new entries in the log that must still be parsed
  IF needsParse THEN {
    numNew: INT ¬ 0;
    CheckReport[opsH, "Adding messages to the database\n"];
    numNew ¬ ParseLog[opsH, FALSE];
    IF numNew = 0 THEN CheckReport[opsH, "No messages were new\n"]
    ELSE CheckReport[opsH, " %g new messages\n", [integer[numNew]] ];
    };
  };

-- Resumes an interrupted copy of a temp log (newMail or readArchive) whose copy entry is at
-- log position at, then marks the copied region as needing a parse and clears the
-- in-progress position. Raises WalnutDefs.Error if no entry is found at that position or the
-- temp log cannot be prepared.
FinishCopyTempLog: PROC[opsH: WalnutOpsHandle, which: WhichTempLog, at: INT] = {
  logLen: INT ¬ WalnutLog.LogLength[opsH];
  startedCopyAt, startCopyPos, fromPos: INT;
  pagesAlreadyCopied: INT;
  IF WalnutLog.SetPosition[opsH, at] # 0 THEN ERROR WalnutDefs.Error[$log, $BadLog, IO.PutFR1["no entry at %g", [integer[at]]] ];
  startCopyPos ¬ WalnutLog.NextEntry[opsH].at; -- skip the copy entry
  startedCopyAt ¬ WalnutLog.NextAt[opsH];
  -- bytes already present in the log after the copy entry
  fromPos ¬ logLen - startedCopyAt;
  pagesAlreadyCopied ¬ 
FS.PagesForBytes[fromPos];
  IF ~WalnutLog.PrepareToCopyTempLog[opsH: opsH, which: which, pagesAlreadyCopied: pagesAlreadyCopied, reportProc: WalnutOpsInternal.CheckReport] THEN
    ERROR WalnutDefs.Error[$log, $DuringOpInProgress, "can't get tempLog for copy"];
  CheckReport[opsH, "\nContinuing copy of tempLog, starting at bytePos %g\n", [integer[fromPos]] ];
  WalnutLog.CopyTempLog[opsH, which, startCopyPos, fromPos, CheckReport];
  -- hand the copied region over to ParseLog
  WalnutDB.SetParseLogInProgress[opsH, TRUE];
  WalnutDB.SetParseLogPos[opsH, WalnutDB.GetOpInProgressPos[opsH]];
  WalnutDB.SetOpInProgressPos[opsH, -1];
  };

-- Replays log entries into the database, starting at the parse position stored in the
-- database, and returns the number of new messages counted. Commits every commitFrequency
-- entries and pauses via ShortShutdown every shutdownFrequency messages. A WalnutDefs.Error
-- with code $TransactionAbort is retried a limited number of times from the last committed
-- position; other errors mark errorInProgress and propagate.
-- With doingScavenge the count starts from the current size of the database.
ParseLog: PUBLIC PROC[opsH: WalnutOpsHandle, doingScavenge: BOOL ¬ FALSE] RETURNS[numNew: INT] = {
  parseLogPos: INT;
  at: INT; -- position in log of entry just processed
  lastNumNew, savedNumNew, savedLastNumNew: INT ¬ 0;
  wle: WalnutLog.LogEntry;
  charsSkipped: INT;
  errorRope: ROPE = "Can't find a log entry after pos %g (log length is %g)";
  retries: INT ¬ 2;
  accepted: BOOL;
  giveStartInfo: BOOL ¬ TRUE;
  -- local value; hides the module-level commitFrequency inside this procedure
  commitFrequency: INT ¬ parseCommitFrequency;
  updatesSinceLastCommit, numSinceShutdown: INT ¬ 0;

  numNew ¬ 0;
  DO
    currentLogLength: INT ¬ WalnutLog.LogLength[opsH]; -- in case at end of Expunge
    BEGIN ENABLE BEGIN
      WalnutDefs.Error => {
        IF debugging THEN SIGNAL WDErrorSeen;
        IF code = $TransactionAbort THEN {
          StatsReport[opsH, " *** TransactionAborts during parseLog"];
          IF ( retries ¬ retries - 1) > 0 THEN GOTO tryAgain;
          opsH.kernelHandle.errorInProgress ¬ TRUE;
          StatsReport[opsH, " *** Too many TransactionAborts during parseLog"];
          REJECT
          }
        ELSE {
          opsH.kernelHandle.errorInProgress ¬ TRUE;
          StatsReport[opsH, "InternalError: %g during parseLog", [atom[code]] ];
          REJECT
          };
        };
      END;
      parseLogPos ¬ WalnutDB.GetParseLogPos[opsH];
      IF giveStartInfo THEN {
        -- Fix: the non-scavenge branch used to reset numNew to 0. Because tryAgain sets
        -- giveStartInfo again, that threw away the count restored from savedNumNew after a
        -- transaction abort. savedNumNew is 0 on the first pass, so the initial behaviour is
        -- unchanged.
        IF doingScavenge THEN numNew ¬ WalnutDB.SizeOfDatabase[opsH].messages ELSE numNew ¬ savedNumNew;
        IF doingScavenge AND numNew # 0 THEN
          CheckReport[opsH, " Parsing the log - database has %g messages so far\n", [integer[numNew]] ];
        giveStartInfo ¬ FALSE;
        };
      -- nothing to parse: no position recorded, or already at the end of the log
      IF ( parseLogPos < 0 ) OR ( parseLogPos = currentLogLength ) THEN RETURN[numNew];
      charsSkipped ¬ WalnutLog.SetPosition[opsH, parseLogPos];
      IF charsSkipped = -1 THEN
        ERROR WalnutDefs.Error[$log, $BadLog, IO.PutFR[errorRope, [integer[parseLogPos]], [integer[currentLogLength]]] ];
      at ¬ parseLogPos + charsSkipped;
      -- the stored position was not at an entry: record the corrected position
      IF at # parseLogPos THEN {
        WalnutDB.SetParseLogPos[opsH, at];
        WalnutRoot.CommitAndContinue[opsH];
        };
      accepted ¬ NOT WalnutDB.GetAddingServerMsgs[opsH];
      DO
        previousAt: INT ¬ at;
        [wle, at] ¬ WalnutLog.NextEntry[opsH];
        IF wle = NIL THEN {
          -- unreadable entry: search forward once for the next entry and report the damage
          -- NOTE(review): maybeBadAt is not used after this declaration
          maybeBadAt: INT = at;
          charsSkipped ¬ WalnutLog.SetPosition[opsH, previousAt+1];
          IF charsSkipped = -1 THEN
            ERROR WalnutDefs.Error[$log, $BadLog, IO.PutFR[errorRope, [integer[previousAt]], [integer[currentLogLength]]] ];
          [wle, at] ¬ WalnutLog.NextEntry[opsH];
          IF wle = NIL THEN
            ERROR WalnutDefs.Error[$log, $BadLog, IO.PutFR[errorRope, [integer[previousAt]], [integer[currentLogLength]]] ];
          CheckReport[opsH, "\n*** Bad log entry at or near %g or %g; do a ScanWalnutLog sometime soon\n", [integer[previousAt]], [integer[at]] ];
          };
        IF numSinceShutdown >= shutdownFrequency THEN {
          WalnutDB.SetParseLogPos[opsH, at]; -- at is beginning of current (unseen) entry
          ShortShutdown[opsH, 20, at];
          updatesSinceLastCommit ¬ 0;
          savedNumNew ¬ numNew;
          savedLastNumNew ¬ lastNumNew;
          numSinceShutdown ¬ 0;
          };
        IF (updatesSinceLastCommit ¬ updatesSinceLastCommit+1) >= commitFrequency THEN {
          WalnutDB.SetParseLogPos[opsH, at]; -- at is beginning of current (unseen) entry
          WalnutRoot.CommitAndContinue[opsH];
          updatesSinceLastCommit ¬ 0;
          -- snapshot the counters that tryAgain restores after an abort
          savedNumNew ¬ numNew;
          savedLastNumNew ¬ lastNumNew;
          };
        -- a version mismatch while replaying one entry just skips that entry
        BEGIN ENABLE WalnutDefs.VersionMismatch => GOTO continue;
          TRUSTED {
            WITH le: wle SELECT FROM
              LogFileInfo => NULL;
              ExpungeMsgs => {
                WalnutDB.SetParseLogPos[opsH, at];
                WalnutRoot.CommitAndContinue[opsH];
                CheckReport[opsH, "\nStarting ExpungeMsgs "];
                WalnutDB.ExpungeMsgs[opsH, dontCareMsgSetVersion, WalnutOpsInternal.CheckReport];
                WalnutRoot.CommitAndContinue[opsH];
                CheckReport[opsH, " ... finished ExpungeMsgs\n"];
                };
              WriteExpungeLog => {
                CheckReport[opsH, "\nStarting WriteExpungeLog "];
                -- the parse is over; the expunge becomes the operation in progress
                WalnutDB.SetParseLogPos[opsH, -1];
                WalnutDB.SetParseLogInProgress[opsH, FALSE];
                WalnutDB.SetOpInProgressPos[opsH, at]; -- at is beginning of current entry
                WalnutRoot.CommitAndContinue[opsH];
                [] ¬ DoLogExpunge[opsH];
                CheckReport[opsH, " ... finished WriteExpungeLog\n"];
                EXIT; -- WriteExpungeLog had to be the last entry
                };
              EndOfLog => {
                WalnutDB.SetParseLogPos[opsH, -1];
                WalnutDB.SetParseLogInProgress[opsH, FALSE];
                WalnutRoot.CommitAndContinue[opsH];
                RETURN
                };
              CreateMsgSet => [] ¬ WalnutDB.CreateMsgSet[opsH, le.msgSet, dontCareDomainVersion];
              EmptyMsgSet => {
                [] ¬ WalnutDB.EmptyMsgSet[ opsH, [le.msgSet, dontCareMsgSetVersion], WalnutOpsInternal.CheckReport ];
                };
              DestroyMsgSet => {
                [] ¬ WalnutDB.DestroyMsgSet[ opsH: opsH, msgSet: [le.msgSet, dontCareMsgSetVersion], msDomainVersion: dontCareDomainVersion, report: WalnutOpsInternal.CheckReport ];
                };
              CreateMsg => {
                me: WalnutKernelDefs.MsgLogEntry = NARROW[wle];
                me.show ¬ accepted;
                IF me.msg # NIL THEN {
                  IF ~WalnutDB.AddNewMsg[opsH, me] THEN numNew ¬ numNew + 1;
                  numSinceShutdown ¬ numSinceShutdown + 1;
                  };
                };
              AddMsg =>
                IF le.msg # NIL THEN []¬ WalnutDB.AddMsg[ opsH: opsH, msg: le.msg, from: [NIL, dontCareMsgSetVersion], to: [le.to, dontCareMsgSetVersion] ];
              RemoveMsg =>
                IF le.msg # NIL THEN []¬ WalnutDB.RemoveMsg[ opsH: opsH, msg: le.msg, from: [le.from, dontCareMsgSetVersion], deletedVersion: dontCareMsgSetVersion ];
              MoveMsg =>
                IF le.msg # NIL THEN []¬ WalnutDB.MoveMsg[ opsH: opsH, msg: le.msg, from: [ le.from, dontCareMsgSetVersion], to: [le.to, dontCareMsgSetVersion] ];
              HasBeenRead => IF le.msg # NIL THEN WalnutDB.SetHasBeenRead[opsH, le.msg];
              RecordNewMailInfo => {
                WalnutDB.SetNewMailInfo[ opsH, le.logLen, le.when, le.server, le.num];
                opsH.kernelHandle.newMailSomewhere ¬ TRUE;
                };
              -- messages created between StartCopyNewMail and EndCopyNewMailInfo get
              -- show = FALSE (see CreateMsg above)
              StartCopyNewMail => {
                WalnutDB.SetCopyMailLogPos[opsH, at];
                WalnutDB.SetAddingServerMsgs[opsH, TRUE];
                accepted ¬ FALSE;
                };
              EndCopyNewMailInfo => {
                WalnutDB.SetCopyMailLogPos[opsH, 0];
                WalnutDB.SetNewMailInfo[opsH, 0, BasicTime.nullGMT, NIL, 0];
                WalnutDB.SetAddingServerMsgs[opsH, FALSE];
                accepted ¬ TRUE;
                };
              AcceptNewMail => {
                WalnutDB.AcceptNewMail[opsH, at, dontCareMsgSetVersion];
                opsH.kernelHandle.newMailSomewhere ¬ FALSE;
                };
              StartReadArchiveFile => WalnutDB.SetReadArchivePos[opsH, at];
              EndReadArchiveFile => WalnutDB.SetReadArchivePos[opsH, 0];
              StartCopyReadArchive => WalnutDB.SetCopyReadArchivePos[opsH, at];
              EndCopyReadArchiveInfo => WalnutDB.SetCopyReadArchivePos[opsH, 0];
              ENDCASE => ERROR;
            };
          EXITS
            continue => NULL;
        END;
        -- progress: "~" at every 10th new message, the running count at every 100th
        IF numNew # lastNumNew THEN
          IF numNew MOD 10 = 0 THEN
            IF numNew MOD 100 = 0 THEN CheckReport[opsH, "(%g) ", [integer[numNew]] ]
            ELSE CheckReport[opsH, "~"];
        lastNumNew ¬ numNew;
      ENDLOOP;
      EXITS
        -- transaction abort: go back to the counters saved at the last commit and re-open
        tryAgain => {
          schemaInvalid: BOOL ¬ TRUE;
          numNew ¬ savedNumNew;
          lastNumNew ¬ savedLastNumNew;
          WalnutRoot.AbortTransaction[opsH];
          schemaInvalid ¬ WalnutRoot.StartTransaction[opsH, TRUE];
          IF schemaInvalid THEN [] ¬ WalnutDB.InitSchema[opsH, schemaInvalid];
          WalnutLog.OpenLogStreams[opsH];
          giveStartInfo ¬ TRUE;
          };
    END;
  ENDLOOP;
  };

-- Shuts the log down, pauses for sec seconds, then starts a new transaction and re-opens the
-- log streams. When logPos >= 0 the log is repositioned there afterwards.
-- NOTE(review): unlike Restart and the retry paths, the result of StartTransaction is
-- discarded here, so InitSchema is never called. Confirm that this is intended.
ShortShutdown: PROC[opsH: WalnutOpsHandle, sec: INT, logPos: INT] = {
  WalnutLog.ShutdownLog[opsH]; -- this does a close transaction as well
  Process.PauseMsec[sec*1000];
  [] ¬ WalnutRoot.StartTransaction[opsH, TRUE];
  WalnutLog.OpenLogStreams[opsH];
  IF logPos >= 0 THEN [] ¬ WalnutLog.SetPosition[opsH, logPos];
  };

-- Returns a stream on the new-mail log, or NIL when a mail stream is already outstanding,
-- the recorded new-mail log length is -1, or a log expunge is in progress.
DoStartNewMail: PUBLIC PROC[opsH: WalnutOpsHandle] RETURNS[STREAM] = {
  newMailLogLength: INT ¬ -1;
  expungeInProgress: BOOL ¬ FALSE;
  Gnml: PROC = {
    newMailLogLength ¬ WalnutDB.GetNewMailLogLength[opsH];
    expungeInProgress ¬ WalnutDB.GetLogExpungePhase[opsH] # idle;
    };
  CheckInProgress[opsH];
  IF opsH.kernelHandle.mailStream # NIL THEN RETURN[NIL];
  CarefullyApply[opsH, Gnml, FALSE];
  IF newMailLogLength = -1 OR expungeInProgress THEN RETURN[NIL];
  RETURN[opsH.kernelHandle.mailStream ¬ WalnutMiscLog.GetNewMailLog[opsH, newMailLogLength, 200]];
  };

DoAcceptNewMail: PUBLIC 
PROC[opsH: WalnutOpsHandle, activeVersion: INT] = {
  -- DoAcceptNewMail: after finishing any pending operation, logs an AcceptNewMail entry and
  -- applies it to the database as a long-running operation, then clears newMailSomewhere.
  Anm: PROC[inProgress: BOOL] = {
    at: INT;
    IF ~inProgress THEN {
      [] ¬ WalnutDB.VerifyMsgSet[opsH, [ActiveName, activeVersion]];
      at ¬ WalnutLog.AcceptNewMail[opsH].at;
      WalnutDB.SetOpInProgressPos[opsH, at];
      }
    ELSE at ¬ WalnutDB.GetAcceptNewMailPos[opsH];
    WalnutDB.AcceptNewMail[opsH, at, activeVersion];
    };
  CheckInProgress[opsH];
  LongRunningApply[opsH, Anm];
  opsH.kernelHandle.newMailSomewhere ¬ FALSE;
  };

-- Copies the new-mail log into the main log (when it has entries), parses the copied entries
-- into the database, runs the message filter, and finally enumerates the unaccepted messages
-- through proc (if non-NIL) and returns the server list.
-- Returns [NIL, FALSE] when the new-mail stream is currently in use. complete is FALSE when
-- the temp log could not be prepared for copying.
DoNewMail: PUBLIC PROC[ opsH: WalnutOpsHandle, activeVersion: INT, proc: PROC[msg, tocEntry: ROPE, startOfSubject: INT]] RETURNS[responses: LIST OF WalnutOps.ServerInfo, complete: BOOL] = {
  someEntries: BOOL ¬ FALSE;

  -- verifies the Active message set version (unless "don't care") and checks for new mail
  Cml: PROC = {
    IF activeVersion # WalnutDefs.dontCareMsgSetVersion THEN
      [] ¬ WalnutDB.VerifyMsgSet[opsH, [ActiveName, activeVersion] ];
    someEntries ¬ WalnutDB.GetNewMailLogLength[opsH] # 0;
    };

  -- copies the new-mail temp log into the main log and marks the region for parsing
  Cml2: PROC[inProgress: BOOL] = {
    fromPos: INT ¬ 0;
    complete ¬ WalnutLog.PrepareToCopyTempLog[opsH: opsH, which: newMail, pagesAlreadyCopied: 0, reportProc: CheckReport];
    IF ~complete THEN RETURN;
    IF ~inProgress THEN {
      at: INT ¬ WalnutLog.StartCopyNewMail[opsH].at;
      WalnutDB.SetOpInProgressPos[opsH, at];
      WalnutDB.SetCopyMailLogPos[opsH, at];
      }
    ELSE {
      at: INT = WalnutDB.GetCopyMailLogPos[opsH];
      CheckReport[opsH, "\n Continue copying the newMailLog\n"];
      [] ¬ WalnutLog.SetPosition[opsH, at];
      [] ¬ WalnutLog.NextEntry[opsH]; -- skip the copy entry
      fromPos ¬ WalnutLog.LogLength[opsH] - WalnutLog.NextAt[opsH];
      };
    WalnutDB.SetAddingServerMsgs[opsH, TRUE];
    WalnutLog.CopyTempLog[opsH, newMail, WalnutDB.GetCopyMailLogPos[opsH], fromPos, CheckReport];
    WalnutDB.SetParseLogInProgress[opsH, TRUE];
    WalnutDB.SetParseLogPos[opsH, WalnutDB.GetOpInProgressPos[opsH]];
    WalnutDB.SetOpInProgressPos[opsH, -1];
    };

  -- produces the results: enumerate unaccepted messages, then collect the server list
  Gnm: PROC = {
    IF proc # NIL THEN WalnutDB.EnumerateUnacceptedMsgs[opsH, activeVersion, proc];
    responses ¬ WalnutDB.EnumerateServers[opsH];
    };

  CheckInProgress[opsH];
  CarefullyApply[opsH, Cml, FALSE];
  IF opsH.kernelHandle.mailStream # 
NIL THEN RETURN[NIL, FALSE]; -- file is busy
  complete ¬ TRUE;
  IF someEntries THEN {
    LongRunningApply[opsH, Cml2];
    IF complete THEN {
      WalnutLog.OpenLogStreams[opsH];
      CheckReport[opsH, "Adding new mail to the database @ %g\n", [time[BasicTime.Now[]]] ];
      [] ¬ ParseLog[opsH, FALSE]; -- "see" messages
      [] ¬ FilterMessages[opsH, activeVersion];
      };
    };
  CarefullyApply[opsH, Gnm, FALSE];
  };

-- Client data handed to the filtering agent's monitor: the ops handle to report through and
-- a running count of progress callbacks.
TapRef: TYPE = REF TapFilterRep;
TapFilterRep: TYPE = RECORD[opsH: WalnutOpsHandle, annotations: INT ¬ 0];

-- Monitor callback: prints "@" through CheckReport and increments the annotations counter.
ReportProgress: TapFilter.MonitorProc = {
  tR: TapRef ¬ NARROW[clientData];
  CheckReport[tR.opsH, "@"];
  tR.annotations ¬ tR.annotations + 1;
  };

-- Feeds every unaccepted message to the filtering agent and waits for the agent to go idle.
-- Returns the number of progress callbacks counted, or 0 when filtering is not enabled in
-- the user profile or no agent could be created. A TapFilter.Error is reported through
-- CheckReport and ends the procedure.
FilterMessages: PROC[opsH: WalnutOpsHandle, activeVersion: INT] RETURNS [annotations: INT ¬ 0] ~ {
  ENABLE TapFilter.Error => {
    CheckReport[opsH, "Problem with filtering agent: %g - %g.\n", IO.atom[ec], IO.rope[explanation]];
    CONTINUE;
    };

  -- Reads the text of msg from the log. Returns NIL when the recorded text start is 0.
  -- The length is capped at RefText.MaxLen-8.
  GetMsgText: PROC [opsHandle: WalnutOpsHandle, msg: ROPE] RETURNS [ROPE] = {
    textStart, textLen: INT;
    contents: REF TEXT;
    [textStart, textLen, ] ¬ WalnutDB.GetMsgText[opsHandle, msg];
    IF textStart = 0 THEN RETURN[NIL];
    IF textLen > RefText.MaxLen-8 THEN textLen ¬ RefText.MaxLen-8;
    contents ¬ RefText.New[textLen];
    WalnutLog.GetRefTextFromLog[opsHandle, textStart, textLen, contents];
    RETURN[RefText.TrustTextAsRope[contents]];
    };

  -- Parses one message into fields, prepends a sequence number and the message id, and
  -- queues the result for the filtering agent.
  PassToFilterAgent: PROC[msg, tocEntry: ROPE, startOfSubject: INT] = {
    parsedMsg: TapMsgQueue.Msg;
    contents: ROPE ¬ GetMsgText[opsH, msg];
    parsedMsg ¬ TapFilter.ParseMsgIntoFields[contents];
    feederSeqNum ¬ feederSeqNum + 1;
    parsedMsg ¬ CONS[[TapFilter.seqNum, LoganBerryEntry.I2V[feederSeqNum]], parsedMsg];
    parsedMsg ¬ CONS[[TapFilter.msgID, msg], parsedMsg];
    TapMsgQueue.Put[parsedMsg, opsH.filterFeeder];
    };

  Fm: PROC = {
    wantAnnotations: BOOLEAN ¬ UserProfile.Boolean[key: "WallTapestry.AnnotateNewMail", default: FALSE];
    filterDBName: ROPE ¬ UserProfile.Token[key: "WallTapestry.FilterDB", default: NIL];
    annotationDBName: ROPE ¬ UserProfile.Token[key: "WallTapestry.AnnotationDB", default: NIL];
    
filteringAgent: TapFilter.Agent ¬ opsH.filterAgent;
    -- filtering is optional: all three profile entries must be set
    IF NOT wantAnnotations OR filterDBName = NIL OR annotationDBName = NIL THEN RETURN;
    -- create the feeder queue and the agent on first use and cache them on the handle
    IF filteringAgent = NIL THEN {
      opsH.filterFeeder ¬ TapMsgQueue.Create[];
      filteringAgent ¬ opsH.filterAgent ¬ TapFilter.CreateAgent[feeder: opsH.filterFeeder, filterDB: filterDBName, user: NIL, annotDB: annotationDBName];
      IF filteringAgent = NIL THEN RETURN;
      };
    tR ¬ NEW[TapFilterRep ¬ [opsH, 0]];
    CheckReport[opsH, "\nAnnotating messages: "];
    TapFilter.MonitorAgent[agent: filteringAgent, proc: ReportProgress, clientData: tR];
    WalnutDB.EnumerateUnacceptedMsgs[opsH, activeVersion, PassToFilterAgent];
    TapFilter.WakeupAgent[filteringAgent];
    [] ¬ TapFilter.IsAgentIdle[agent: filteringAgent, wait: TRUE];
    CheckReport[opsH, " done.\n"];
    -- detach the monitor again
    TapFilter.MonitorAgent[agent: filteringAgent, proc: NIL, clientData: NIL];
    };

  tR: TapRef;
  CarefullyApply[opsH, Fm, FALSE];
  RETURN[IF tR # NIL THEN tR.annotations ELSE 0];
  };

-- Reads the archive file named file into the database: logs a StartReadArchiveFile entry,
-- builds the read-archive temp log from the file, copies that temp log into the main log and
-- parses the copied entries. Returns the number of new messages, or -1 when the file cannot
-- be opened, the temp log cannot be created, or the copy cannot be prepared.
DoReadArchiveFile: PUBLIC PROC[opsH: WalnutOpsHandle, file: ROPE, msgSet: MsgSet] RETURNS[numNew: INT] = {
  ENABLE UNWIND => NULL;
  ok: BOOL ¬ FALSE;
  fStream: STREAM;
  reason: ROPE ¬ NIL;

  -- logs the start of the archive read and records its position
  Raf: PROC = {
    at: INT;
    IF msgSet.name # NIL THEN [] ¬ WalnutDB.VerifyMsgSet[opsH, msgSet];
    at ¬ WalnutLog.StartReadArchiveFile[opsH, file, msgSet.name].at;
    WalnutDB.SetReadArchivePos[opsH, at];
    };

  -- logs the end of the archive read and clears the recorded position
  Raf2: PROC = {
    [] ¬ WalnutLog.EndReadArchiveFile[opsH];
    WalnutDB.SetReadArchivePos[opsH, 0];
    };

  -- copies the read-archive temp log into the main log and marks the region for parsing
  Caf: PROC[inProgress: BOOL] = {
    fromPos: INT ¬ 0;
    IF ~inProgress THEN {
      at: INT;
      ok ¬ WalnutLog.PrepareToCopyTempLog[ opsH: opsH, which: readArchive, pagesAlreadyCopied: 0, reportProc: WalnutOpsInternal.CheckReport];
      IF ~ok THEN RETURN;
      at ¬ WalnutLog.StartCopyReadArchive[opsH].at;
      WalnutDB.SetCopyReadArchivePos[opsH, at];
      WalnutDB.SetOpInProgressPos[opsH, at];
      WalnutRoot.CommitAndContinue[opsH];
      }
    ELSE {
      -- calculate fromPos
      logLen: INT ¬ WalnutLog.LogLength[opsH];
      startedCopyAt, startCopyPos: INT;
      startCopyPos ¬ WalnutDB.GetCopyReadArchivePos[opsH];
      IF WalnutLog.SetPosition[opsH, startCopyPos] # 0 THEN
        ERROR WalnutDefs.Error[$log, $BadLog, IO.PutFR1["no entry at %g", [integer[startCopyPos]]] ];
      [] ¬ WalnutLog.NextEntry[opsH]; -- skip the copy entry
      startedCopyAt ¬ WalnutLog.NextAt[opsH];
      fromPos ¬ logLen - startedCopyAt;
      };
    CheckReport[opsH, "Copying the ReadArchiveTempLog, starting at bytePos %g\n", [integer[fromPos]]];
    WalnutLog.CopyTempLog[opsH, readArchive, WalnutDB.GetCopyReadArchivePos[opsH], fromPos, CheckReport];
    WalnutDB.SetParseLogInProgress[opsH, TRUE];
    WalnutDB.SetParseLogPos[opsH, WalnutDB.GetOpInProgressPos[opsH]];
    WalnutDB.SetOpInProgressPos[opsH, -1];
    };

  CheckInProgress[opsH];
  -- on an FS.Error the explanation is reported and fStream stays NIL
  fStream ¬ WalnutStream.Open[name: file, readOnly: TRUE ! FS.Error => { CheckReport[opsH, error.explanation]; fStream ¬ NIL; CONTINUE} ].strm;
  IF fStream = NIL THEN RETURN[-1];
  -- Fix: the archive stream is open at this point. If logging the start entry raises an
  -- error that unwinds through this frame, close the stream instead of leaking it.
  CarefullyApply[opsH, Raf, TRUE ! UNWIND => fStream.Close[ ! IO.Error, FS.Error => CONTINUE]];
  BEGIN ENABLE BEGIN
    FS.Error => { reason ¬ error.explanation; GOTO exit };
    IO.Error => {
      -- (a stray "<>" token stood here in the source, presumably the residue of a bracketed
      -- comment whose text was lost; it has been dropped)
      IF reason = NIL THEN reason ¬ "IO Error creating readArchiveLog";
      GOTO exit
      };
    END;
    [ok, reason] ¬ WalnutMiscLog.CreateReadArchiveLog[opsH, fStream, msgSet.name, CheckReport];
    EXITS
      exit => ok ¬ FALSE;
  END;
  fStream.Close[ ! 
IO.Error, FS.Error => CONTINUE]; IF ~ok THEN { CheckReport[opsH, " Archive Read of %g failed\n", [rope[file]] ]; IF reason # NIL THEN CheckReport[opsH, " Error reported as: %g\n", [rope[reason]]]; RETURN[-1]; } ELSE Raf2[]; LongRunningApply[opsH, Caf]; IF ~ok THEN { CheckReport[opsH, "Out of space trying to copy readArchiveLog for file %g\n", [rope[file]] ]; RETURN[-1]; }; CheckReport[opsH, "Adding messages to database\n"]; numNew ¬ ParseLog[opsH, FALSE]; }; nativeWalnutItemForm: ROPE = "@%05d 00525 %05d\n"; -- 20 chars, tioga formatting crWalnutItemForm: ROPE = "@%05d 00525 %05d\r"; -- 20 chars, tioga formatting nativeStartHeaderForm: ROPE = "*start*\n%05d %05d US \n"; crStartHeaderForm: ROPE = "*start*\r%05d %05d US \r"; DoWriteArchiveFile: PUBLIC PROC[ opsH: WalnutOpsHandle, file: ROPE, msgSetList: LIST OF MsgSet, append: BOOL] RETURNS[ok: BOOL]= { wStream: STREAM; someMsgWasTooBig: BOOL ¬ FALSE; thisMsgSet, exp: ROPE; startHeaderFixedLen: INT = 24; useCRForArchive: BOOL ¬ UserProfile.Boolean["Walnut.useCRForArchiveFile", FALSE]; walnutItemForm: ROPE ~ IF useCRForArchive THEN crWalnutItemForm ELSE nativeWalnutItemForm; startHeaderForm: ROPE ~ IF useCRForArchive THEN crStartHeaderForm ELSE nativeStartHeaderForm; first: BOOL ¬ TRUE; WriteProc: PROC[msg, tocEntry: ROPE, hasBeenRead: BOOL, startOfSubject: INT] RETURNS[continue: BOOL] = { textStart, textLen, formatLen, prefixLen: INT; length, walnutItemLen: INT ¬ 0; walnutItem: ROPE; continue ¬ TRUE; [textStart, textLen, formatLen, , ] ¬ WalnutDB.GetMsgText[opsH, msg]; walnutItem ¬ IF useCRForArchive THEN Rope.Cat[msg, "\r", thisMsgSet, "\r"] ELSE Rope.Cat[msg, "\n", thisMsgSet, "\n"]; walnutItemLen ¬ WalnutMiscLog.walnutItemFixedLength + walnutItem.Length[] + formatLen; IF formatLen # 0 THEN walnutItemLen ¬ walnutItemLen + 1; -- for extra CR after formatting prefixLen ¬ startHeaderFixedLen + walnutItemLen; length ¬ prefixLen + textLen + 1; -- extra CR after text IF length > 99999 THEN { CheckReport[opsH, "\nLength 
of msg %g is too big (%g bytes) - skipping", [rope[msg]], [integer[length]] ]; someMsgWasTooBig ¬ TRUE; RETURN }; wStream.PutF[ startHeaderForm, [integer[length]], [integer[prefixLen]] ]; wStream.PutF[walnutItemForm, [integer[walnutItemLen-2]], [integer[formatLen]] ]; wStream.PutRope[walnutItem]; IF formatLen # 0 THEN { WalnutLog.CopyBytesToArchive[opsH, wStream, textStart+textLen, formatLen]; wStream.PutChar[IF useCRForArchive THEN '\r ELSE '\n]; }; wStream.PutChar['@]; WalnutLog.CopyBytesToArchive[opsH, wStream, textStart, textLen]; wStream.PutChar[IF useCRForArchive THEN '\r ELSE '\n]; }; ok ¬ FALSE; CheckInProgress[opsH]; BEGIN ENABLE BEGIN LoganBerry.Error => { IF debugging THEN SIGNAL DBErrorSeen; IF wStream # NIL THEN wStream.Close[ ! IO.Error, FS.Error => CONTINUE]; ERROR WalnutDefs.Error[$db, ec, IO.PutFR1["DB.Error with explanation: %g", [rope[explanation]] ] ]; }; WalnutDefs.Error => { IF debugging THEN SIGNAL WDErrorSeen; IF wStream # NIL THEN wStream.Close[ ! IO.Error, FS.Error => CONTINUE]; REJECT; }; WalnutDefs.VersionMismatch => { IF wStream # NIL THEN wStream.Close[ ! IO.Error, FS.Error => CONTINUE]; REJECT; }; < { CheckReport[opsH, exp ¬ FS.ErrorFromStream[stream].explanation]; GOTO err };>> IO.Error => { CheckReport[opsH, exp ¬ "IO Error"]; GOTO err }; FS.Error => { CheckReport[opsH, exp ¬ error.explanation]; GOTO err }; END; BEGIN wStream ¬ WalnutStream.Open[ name: file, useOldIfFound: append, exclusive: TRUE ! 
FS.Error => { CheckReport[opsH, error.explanation]; GOTO none }].strm; EXITS none => wStream ¬ NIL; END; IF wStream = NIL THEN { CheckReport[opsH, "\nCould not open %g\n", [rope[file]] ]; RETURN }; IF append THEN wStream.SetIndex[wStream.GetLength[]] ELSE { wStream.SetIndex[0]; wStream.SetLength[0]; }; CheckReport[opsH, "\n Archiving: "]; FOR mL: LIST OF WalnutDefs.MsgSet ¬ msgSetList, mL.rest UNTIL mL=NIL DO thisMsgSet ¬ mL.first.name; IF ~WalnutDB.VerifyMsgSet[opsH, mL.first] THEN { CheckReport[opsH, "\n MsgSet %g doesn't exist - continuing", [rope[thisMsgSet]] ]; LOOP; }; IF first THEN first ¬ FALSE ELSE CheckReport[opsH, ", "]; CheckReport[opsH, thisMsgSet]; [] ¬ WalnutDB.EnumerateMsgsInSet[opsH, thisMsgSet, TRUE, WriteProc]; ENDLOOP; EXITS err => { wStream.Close[]; ERROR WalnutDefs.Error[$log, $ErrorDuringWriteArchive, exp]; }; END; wStream.Close[]; ok ¬ TRUE; CheckReport[opsH, "\tFinished writing archive file\n"]; }; END.  WalnutOpsInternalImpl.mesa Copyright Σ 1984, 1988, 1989, 1990, 1991, 1992 by Xerox Corporation. All rights reserved. 
Willie-sue, April 30, 1992 11:21 am PDT Donahue, July 16, 1985 2:25:35 pm PDT Doug Terry, July 20, 1992 5:28 pm PDT Swinehar, February 20, 1991 4:32 pm PST Implementation of procedures of WalnutOpsInternal Types Variables Procedures -- * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * which: ROPE = Atom.GetPName[opsH.segmentID.segment]; statsFile: ROPE _ IO.PutFR["///Users/%g/%g.log", [rope[UserCredentials.Get[].name]], [rope[which]] ]; Utilities DB.Aborted => { IF debugging THEN SIGNAL DBAbortedSeen; GOTO aborted; }; aborted => { WalnutRoot.StatsReport[opsH, "\n**** trans abort" ]; StatsReport[opsH, " **** TransactionAbort"]; IF ( reTryCount _ reTryCount - 1) < 0 THEN { opsH.kernelHandle.errorInProgress _ TRUE; StatsReport[opsH, " *** Too many TransactionAbort's during WalnutOps call"]; WalnutRoot.AbortTransaction[opsH]; ERROR WalnutDefs.Error[$log, $TransactionAbort]; }; }; WalnutRoot.AbortTransaction[opsH]; IF (schemaInvalid _ WalnutRoot.StartTransaction[opsH]) THEN [] _ WalnutDB.InitSchema[opsH, schemaInvalid]; WalnutLog.OpenLogStreams[opsH]; DB.Aborted => GOTO aborted; aborted => { WalnutRoot.StatsReport[opsH, "\n**** trans abort" ]; StatsReport[opsH, " **** TransactionAbort"]; IF ( reTryCount _ reTryCount - 1) < 0 THEN { opsH.kernelHandle.errorInProgress _ TRUE; StatsReport[opsH, " *** Too many TransactionAbort's during WalnutOps call"]; WalnutRoot.AbortTransaction[opsH]; ERROR WalnutDefs.Error[$log, $TransactionAbort]; }; }; WalnutRoot.AbortTransaction[opsH]; IF (schemaInvalid _ WalnutRoot.StartTransaction[opsH]) THEN [] _ WalnutDB.InitSchema[opsH, schemaInvalid]; WalnutLog.OpenLogStreams[opsH]; alreadyCalled _ WalnutDB.GetOpInProgressPos[opsH] > 0; * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * The log is re-written, starting from the first file with deleted messages. 
Called from within a LongRunningApply; calling SetLogExpungePhase does a commit called from within catch phrases, but must do its own error catching, since the expunge Log cannot be flushed after every write - that makes too much traffic on the alpine server; reads the old log, starting at currentLogPos, copies entries to the expungeLog, changing msg entryStart pointers as it goes, doing ocasional commits of both the expungeLog and the database lastAcceptNewMailPos can't be before firstDestroyedMsgPos the next entry either has nothing to do with a message or it is an operation on a message that still exists copy the entry to the new log it is the message body itself; record the change in position in the database if you haven't consumed the entry by copying it, then just advance the log pointer Commit the last batch of updates Is some long running operation in progress Parsing the Log - assumes the log is write-locked make sure things are in good shape before starting if we don't SetParseLogPos & Commit here, we might delete some messages, commit and later get an error, parse the log from an earlier point, and add more to bytesInDestroyedMsgs raises error if problem [msgID: ROPE, msg: TapMsgQueue.Msg, filterID: ROPE, annot: TapFilter.Annotation, clientData: REF ANY] RETURNS [doIt: BOOLEAN _ TRUE] Adapted from WalnutOpsImpl.GetMsgText. Get user profile info. Create filtering agent if necessary. Parse msgs and place on filter queue. Wakeup agent to filter messages and wait until it is finished. Write a "readArchiveFile" log entry; if the archiveFile exists, parses it and writes the appropriate entries (new msgs and moves if msgSet is not "Active" (NIL defaults to categories specified in the file). Then replays the log. If the file couldn't be read, numNew = -1; Write an archive file that contains the messages from the given message sets. 
No log entry is written and no updates are made to the database (we just hold the monitor to guarantee that no changes to the message sets occur). -- the -2 below are because the bytecount within the prefix item does not include the surrounding @'s Κ+΅–(cedarcode) style•NewlineDelimiter ™codešΟb™Kšœ ΟeœO™ZK™'KšΟy%™%K™%K™'K™Kšœ1™1K™K™—šΟk ˜ Kšœ  œ œ˜$Kš œ œW˜_Kš œ˜Kšœ  œ ˜Kšœ œ˜Kšœ œ ˜Kšœ œ ˜-Kšœ˜Kšœ œ˜Kšœ  œu˜„Kšœ  œ˜%Kšœ  œ˜#Kšœ œ ˜Kšœ  œ,˜:Kšœ  œ˜Kšœ Οcœ˜Kšœ  œd˜tKšœ œ8˜NKšœ ‘œ˜Kšœ‘œ˜"Kšœ œ>˜QKšœ ˜ Kšœ˜Kšœ œ˜-Kšœ  œΛ˜ΫKšœ  œ˜—K˜šœ œ ˜$š ˜Kšœ  œ œa˜rKšœ#˜#Kšœ˜Kšœ+˜+KšœM˜M—K˜Kš œ˜%K˜Kšœ  œ ˜*—K™š™K˜Kš œ œ œ˜Kš œ œ œ œ˜Kš œ œ  œ˜K˜Kšœ œ˜3Kšœ  œ˜#Kšœ œ˜!Kšœ œ˜3Kšœ  œ˜)Kšœ  œ˜6Kšœ œ"˜4Kšœ œ œ%˜AK˜Kšœ œ!˜3Kšœ  œ˜)—K™š ™ K˜Kšœ œ œ˜#Kšœ  œ œ˜KšΟn œ œ œ˜Kš’ œ œ œ˜Kšœ œ œ‘˜BK˜Kšœ œ˜"Kšœ œ˜.Kšœ œ˜K˜Kšœ œ˜ Kšœ œ#˜=Kšœ œ#˜=K˜Kšœ œ˜K˜—K™Kš ™ ˜™HK˜š’œ œ œ˜;Kšœ œ)™4Kšœ œ˜K˜%Kšœ  œ œQ™eKšœ  œ˜&K˜Kš  œ œ œ œ  œ˜KKšœ œB˜UK˜š œ œ˜Kšœ œ ˜*K˜0K˜Tš œ œ ˜ K˜J—Kšœ$ œ˜)K˜—K˜Kšœ0˜0K˜—K˜š’ œ œ˜2Kšœ œ ˜Kšœ œ˜Kšœ œ˜ Kš œ* œ œ œ˜K˜K˜K˜K˜—K˜———K™š ™ ˜š ’œ œ œ œ œ˜UKšœ  œ˜Kšœ œ œ˜Kšœ œ˜ Kšœ œ˜ Kšœ# œ˜(K˜š œ œ ˜%Kš œC˜H—K˜Kš ˜š œ œ˜šœ˜Kš œ  œ œ ˜%Kšœ$ œ˜)K˜—šœ˜Kš œ  œ œ ˜%K˜ K˜Kš œ˜ Kšœ˜—š œ ™Kš œ  œ œ™'Kš œ ™ K™—K˜—˜š œ) œ ˜3Kš œ%˜*—Kšœ˜K˜Kš œ  œ$˜5Kš œ˜K˜š ˜šœ ˜ Kšœ$ œ˜)šœC˜CKšœ˜—Kš œ#˜(K˜—šœ ™ Kšœ4™4Kšœ-™-š œ$ œ™,Kšœ$ œ™)KšœL™LKšœ"™"Kš œ+™0K™—K™——Kš œ˜K˜K™"K™š œ5 ™;Kšœ/™/—Kšœ™—Kš œ˜K˜——˜š ’œ œ œ œ  œ˜VKšœ  œ˜Kšœ œ œ˜Kšœ œ œ˜Kšœ œ˜ Kšœ œ˜ Kšœ# œ˜(Kšœ œ˜%K˜š œ œ ˜%Kš œC˜H—K˜Kš ˜š œ œ˜šœ˜Kš œ  œ œ ˜%Kšœ$ œ˜)K˜—šœ˜Kš œ  œ œ ˜%K˜ K˜Kš œ˜ Kšœ˜—Kš œ  œ ™K˜—˜š œ) œ ˜3Kš œ%˜*—K˜"šœ˜Kšœ; œ˜E—K˜Kšœ&˜&Kšœ‘˜,Kš œ˜š ˜šœ ˜ Kšœ$ œ˜)šœC˜CKšœ˜—Kš œ#˜(K˜—šœ ™ Kšœ4™4Kšœ-™-š œ$ œ™,Kšœ$ œ™)KšœL™LKšœ"™"Kš œ+™0K™—K™——Kš œ˜K˜Kšœ"™"K™š œ5 ™;Kšœ.™.—Kšœ™Kšœ6™6—Kš œ˜K˜——˜š’œ œ œ‘"˜RKšœ œ%˜8Kš œ œ/˜DKšœ˜Kšœ œ˜%K˜——˜š’œ œ˜1K˜#K˜!K˜K˜—š’ œ œ˜'Kš œ" œ œ œ˜4š  œ œ œ œ+ œ œ ˜QKšœ"˜"Kš œ˜—K˜—K˜š’œ œ œ˜8K˜%Kšœ2˜2Kš œ œ#˜=Kšœ#‘˜9Kšœ˜Kšœ‘˜5Kšœ%˜%Kš œ œ œ' œ˜OKš œ œ œ, œ˜Nš œ œ ˜$Kšœ; œ˜A—Kšœ  œ˜Kšœ œ˜Kšœ œ˜Kšœ œ˜Kšœ 
œ˜Kšœ.˜.K˜—K˜——K˜šœE™EK™š ’œ œ œ œ œ˜UKšœ œ6˜OKšœ œ˜(Kš œ œ-˜6K˜K™—š’ œ œ œ˜4šœ›™›š ˜K˜Oš œ  ˜˜ K˜1Kšœ:˜:K˜#K˜)K˜—šœ˜Kšœ ˜Kšœ  œ#˜3K˜AKšœ/˜/Kšœ5˜5K˜#K˜—šœ˜K˜K˜'Kšœ0˜0Kšœ#˜#K˜—šœ˜Kšœ œ#˜5Kšœ œ˜Kšœ œ˜ Kšœ; œ˜SKšœ/˜/K˜+Kšœ$˜$K˜&Kšœ*‘˜2K˜Kšœ*˜*K˜8Kšœ>˜>K˜K˜#K˜-Kšœ˜Kš œ˜K˜—Kš œ œ˜——Kš œ˜—K˜K˜—šœ œ)˜?K˜—š’œ œ œ˜RKšœπ™πKšœ  œ˜Kšœ œ˜"Kšœ œ˜Kšœ œ˜Kšœ  œ˜"Kšœ œ œ˜4Kšœ œ˜"Kšœ  œ˜+K˜Kš ˜š œ œ˜"Kš œ  œ œ ˜%Kš  œ œ œ œ œ˜8K˜—K˜Kšœ2 œ˜:Kšœ œ˜Kšœ œ6˜OK˜K˜Gš œ) œ˜1š œ$ ˜*š œ*˜/š œC˜EKšœ˜———K˜š œ˜K˜(K˜K˜——K˜š œF œ˜NKšœ  œ œi˜~K˜šœ˜šœf˜fKšœ5˜5——K˜"KšœE‘ ˜OKšœ9‘ ˜EK˜7š œF ˜LKšœ*˜*—K˜(K˜—K˜š œ& œ‘˜DKšœ œ(˜BKšœ œ œ˜2š œ œ˜K˜#Kšœ'‘%˜L˜Kšœ8˜8—K˜—šœ˜šœP˜PKšœ<˜<——š ˜Kšœ œ œ.˜LKšœB˜BK˜KKšœD˜DK˜#Kšœ˜K˜DKš œ œ œ˜'Kš œ˜—Kšœ‘ ˜?K˜—K˜Kšœ9™9K˜:Kšœ5‘˜Nšœ˜šœ]˜]Kšœ. œ˜N——š ˜Kšœ œ˜ Kšœ œ˜ Kšœ œ˜K˜K˜K˜7š œ  œ ˜Kš œ  œ œ‘ ˜#š œ˜Kšœ  œ4˜Dš œ ˜š œ'˜,Kš œ<˜>——K˜Kš œ˜K˜——K˜š œ ˜K˜K˜6K˜Kšœ) œ‘˜IK˜K˜6K˜šœ(˜(š œ œ&˜GKš œ>˜B——K˜šœ˜š œ œ‘ ˜2Kšœ œ'˜8š œ% œ%˜QKš œ>˜B—K˜Kš œ>˜B—K˜—K˜˜š œ œ&˜Gš œ˜Kšœ  œ˜K˜>Kšœ.˜.K˜———K˜K˜?K˜šœ˜Kšœ œ'˜8š œ% œ%˜QKš œ?˜C—K˜—K˜š œ˜ š œ œ! œ˜?Kšœk™kKšœ™Kšœ  œ˜K˜>š œ œ˜KšœL™LKš œ œ/˜EK˜0š œ œ ˜*š œ  œ  ˜šœ/˜/Kš œ˜———Kšœ˜—Kšœ˜—KšœR™RKš œ'˜+——K˜š œ+ œ˜3Kšœ  œ˜K˜GKšœ@˜@K˜"K˜K˜EK˜$K˜/K˜Kšœ˜—K˜š œe œ˜mKšœ  œ˜K˜GKšœ@˜@K˜#K˜$K˜K˜—K˜Kš œ˜—K˜šœ ™ š œ  œ˜(K˜KKšœD˜DK˜#K˜(Kšœ˜—Kš œ˜—˜š ˜šœ ˜ Kšœ œ œ˜K˜"K˜"K˜2Kš œ œ/˜DKšœ˜Kšœ˜——Kš œ˜—Kš œ˜K˜K˜—š*™*K˜—š’œ œ œ˜7Kšœ œ˜Kšœ œ œ˜Kšœ  œ œ˜Kšœ œ œ˜š’œ œ˜ š œ: œ˜BKšœ# œ˜,—K˜2K˜?Kš œ œ œ˜"K˜"K˜—š’œ œ  œ‘˜3Kšœ œ˜K˜K˜Kšœ(˜(K˜&š œ  ˜š œ ˜%Kš œ6˜8——K˜š œ œ  œ ˜"K˜šœ˜Kšœ?˜?—K˜Kšœ&˜&K˜˜(K˜Kšœ%˜%Kšœ ˜ —K˜šœ˜˜K˜ Kšœ-˜-Kšœ'˜'Kšœ%˜%Kšœ˜——K˜˜Kšœ8˜8Kšœ% œ˜+K˜—K˜˜K˜%K˜$K˜#Kšœ+˜+Kšœ  œ˜K˜—K˜šœ˜Kšœ˜Kšœ$˜$K˜—K˜šœ˜Kšœ)˜)K˜(Kšœ#˜#Kšœ/˜/Kšœ  œ˜K˜—K˜š œ˜ š œ,˜1Kš œ3˜5——Kšœ‘˜—K˜&Kšœ#˜#K˜—š œ# ˜)Kš œE˜J—š œ œ˜&Kš œ œ œ˜9Kšœ˜K˜—Kšœ, œ˜3š œ œ˜Kšœ3˜3Kšœ œ˜Kš ˜Kšœ˜—Kš œ œ œ˜"Kšœ8˜8Kšœ.˜.š œ  œ˜Kšœ œ˜Kšœ7˜7Kšœ œ˜š œ  ˜Kšœ+˜+Kš œ=˜A—K˜—K˜K˜—š’œ œ1 œ˜QKšœ œ˜(Kšœ& œ˜*Kšœ œ˜š œ% ˜+Kš œ! 
œ+˜S—Kšœ.‘˜DK˜'K˜!Kšœ œ˜/š œ˜Kš  œK˜U—K˜Kšœa˜aKšœG˜GKšœ% œ˜+KšœA˜AK˜&K˜—K˜Kš1™1K˜š ’œ œ œ' œ œ˜IKšœ œ  œ˜K˜Kšœ  œ˜Kšœ œ‘*˜4Kšœ* œ˜2K˜K˜Kšœ œ˜Kšœ  œ<˜KKšœ  œ˜Kšœ  œ˜Kšœ œ œ˜Kšœ œ˜,Kšœ* œ˜2K˜K˜ K˜š ˜Kšœ œ‘˜Pš œ œ ˜šœ˜Kš œ  œ œ ˜%š œ œ˜"Kšœ<˜K˜&Kšœ&˜&K˜Kš œ)˜-—Kšœ0˜0Kšœ˜—Kšœ˜Kšœ˜Kšœ% œ˜+K˜K˜—š’ œ œ œ( œ œ œ œ œ  œ œ! œ˜½Kšœ  œ œ˜š’œ œ˜ š œ2 ˜8K˜@—K˜5K˜K˜—š’œ œ  œ˜ Kšœ  œ˜K˜wKš œ  œ œ˜š œ  œ˜Kšœ œ'˜.Kšœ&˜&Kšœ%˜%K˜š œ˜Kšœ œ$˜+Kšœ:˜:K˜%Kšœ ‘˜7K˜=K˜——Kšœ# œ˜)šœ]˜]Kšœ™—Kšœ# œ˜+KšœA˜AKšœ&˜&Kšœ˜—š’œ œ˜ Kš œ œ œ=˜OK˜,K˜—K˜Kšœ˜Kšœ œ˜!Kš  œ  œ œ œ œ œ‘˜NK˜Kšœ  œ˜š œ  œ˜Kšœ˜š œ  œ˜Kšœ˜šœ˜KšœE˜E—Kšœ œ‘˜-K˜)K˜—Kšœ˜—K˜Kšœ œ˜!K˜K˜—šœ œ œ˜ Kšœ œ œ% œ˜I—˜K˜—•StartOfExpansions -- [msgID: ROPE, msg: TapMsgQueue.Msg, filterID: ROPE, annot: TapFilter.Annotation] RETURNS [doIt: BOOLEAN _ TRUE]š’œ˜)KšΠck„™„Kšœ  œ ˜ K˜K˜$K˜K˜—š ’œ œ' œ œ œ ˜bš œ˜Kšœ> œ  œ˜bKš œ˜ Kšœ˜—š ’ œ œ# œ œ œ˜KK™&Kšœ œ˜Kšœ  œ œ˜K˜=Kš œ œ œ œ˜"Kš œ œ˜?K˜ KšœE˜EKš œ$˜*K˜—š’œ œ œ œ˜FKšœ˜Kšœ  œ˜'K˜3K˜ Kšœ  œC˜SKšœ  œ$˜4Kšœ.˜.K˜—š’œ œ˜ K™K–$[key: ROPE, default: ROPE _ NIL]šœ œE œ˜dKšœ œ< œ˜SKšœ œ@ œ˜[K˜3K–[]š œ œ œ œ œ œ œ œ˜SK™$š œ œ œ˜K˜)Kšœs œ˜“Kš œ œ œ œ˜$K˜—K™%Kšœ œ˜#K˜.K–9[agent: TapFilter.Agent, proc: TapFilter.MonitorProc]˜TKšœI˜IK™>Kšœ&˜&Kšœ8 œ˜>Kšœ˜K–9[agent: TapFilter.Agent, proc: TapFilter.MonitorProc]šœ4 œ œ˜JK˜—K˜ Kšœ œ˜ Kš  œ œ œ œ œ˜/K˜K™—š ’œ œ œ œ œ  œ˜jK™Kš œ œ œ˜Kšœ œ œ˜Kšœ  œ˜Kšœ œ œ˜K˜š’œ œ˜ Kšœ œ˜Kš œ œ œ*˜CK˜@Kšœ%˜%K˜—K˜š’œ œ˜K˜(Kšœ$˜$K˜—K˜š’œ œ  œ˜Kšœ  œ˜š œ  œ˜Kšœ œ˜K˜‡Kš œ œ œ˜K˜-Kšœ)˜)Kšœ&˜&Kšœ#˜#K˜š œ‘˜Kšœ œ˜(Kšœ œ˜!K˜4š œ/ ˜5š œ! 
œ˜AKšœ˜——Kšœ"‘˜8K˜'K˜!K˜——šœ˜KšœP˜P—šœ˜KšœI˜I—Kšœ# œ˜+KšœA˜AKšœ&˜&K˜—K˜Kšœ˜šœ2 œ œ ˜EKšœ2 œ œ ˜H—Kš œ  œ œ œ˜!K˜Kšœ œ˜ K˜š œ œ ˜Kš œ( œ˜6š œ ˜Kšœ  œ'˜4Kš œ  œ œ-˜AKš œ˜ Kšœ˜—Kš œ˜K˜˜KšœL˜L—š ˜Kšœ  œ˜——Kš œ˜Kšœ œ œ  œ˜1K˜š œ œ˜ KšœA˜AKš œ  œ œ?˜SKš œ˜ K˜Kš œ˜ —K˜Kšœ˜š œ œ˜ Kšœ]˜]Kš œ˜ K˜—Kšœ3˜3Kšœ œ˜K˜K˜—Kšœ œ‘˜PKšœ œ‘˜LKšœ œ˜9šœ œ˜5K˜—š’œ œ œ˜ š œ œ œ œ œ˜LKšœ œ œ˜—K™αKšœ  œ˜Kšœ œ œ˜K˜Kšœ œ˜Kšœ œ˜Kšœ œ5 œ˜QKš œ œ œ œ œ˜ZKš œ œ œ œ œ˜]Kšœ œ œ˜K˜š’ œ œ œ œ œ œ  œ˜hKšœ* œ˜.Kšœ œ˜Kšœ  œ˜Kšœ  œ˜K˜EKšœ  œ œ' œ'˜v˜KšœF˜F—Kš œ œ$‘ ˜YK˜0Kšœ#‘˜9K˜š œ œ˜šœH˜HKšœ!˜!—Kšœ œ˜Kš ˜K˜—K™eKšœI˜IKšœP˜PKšœ˜š œ œ˜KšœJ˜JKšœ œ œ œ˜6K˜—Kšœ˜Kšœ@˜@Kšœ œ œ œ˜6K˜—K˜Kšœ œ˜ Kšœ˜K˜š œ œ ˜šœ˜Kš œ  œ œ ˜%Kš  œ  œ œ œ œ  œ˜GKš œ œA˜cK˜—šœ˜Kš œ  œ œ ˜%Kš  œ  œ œ œ œ  œ˜GKš œ˜K˜—šœ˜Kš  œ  œ œ œ œ  œ˜GKš œ˜K˜—Kš œ$ œ' œ ˜^Kš œ1 œ˜>Kš œ8 œ˜EKš œ˜K˜š ˜˜Kšœ. œ œ ˜@Kšœ( œ˜:—š ˜Kšœ œ˜—Kš œ˜—K˜š œ  œ œ˜Kšœ:˜:Kš ˜Kšœ˜—K˜š œ œ&˜4š œ˜Kšœ˜Kšœ˜K˜——Kšœ$˜$š  œ œ œ) œ œ ˜GK˜š œ( œ˜0KšœR˜RKš œ˜Kšœ˜—Kš œ œ  œ œ˜9Kšœ˜Kšœ3 œ ˜D—Kš œ˜K˜š ˜šœ˜Kšœ˜Kš œ7˜