<<>> <> <> <> <> <> <> <<>> <> <<>> <<>> DIRECTORY BasicTime USING [GMT, nullGMT, Now], FS USING [BytesForPages, Error, <> <> PagesForBytes, StreamOpen], IO, LoganBerry USING [Error], LoganBerryEntry USING [I2V], Process USING [PauseMsec], RefText USING [MaxLen, New, TrustTextAsRope], Rope, -- UserCredentials USING [Get], TapFilter USING [Agent, CreateAgent, Error, IsAgentIdle, MonitorAgent, MonitorProc, ParseMsgIntoFields, WakeupAgent, msgID, seqNum], TapMsgQueue USING [Create, Msg, Put], UserProfile USING [Boolean, Token], ViewerClasses USING [Viewer], ViewerIO USING [CreateViewerStreams, GetViewerFromStream], ViewerOps USING [FindViewer], WalnutDB --using lots-- , WalnutDefs USING [CheckReportProc, dontCareMsgSetVersion, Error, LogInfo, MsgSet, VersionMismatch, WalnutOpsHandle], WalnutKernelDefs USING [LogEntry, LogExpungePhase, MsgLogEntry, WhichTempLog], WalnutLog --using lots-- , WalnutLogExpunge --using lots-- , WalnutMiscLog USING [CreateReadArchiveLog, GetNewMailLog, walnutItemFixedLength], WalnutOps, WalnutOpsInternal, WalnutRegistryPrivate USING [NotifyForEvent], WalnutRoot USING [AbortTransaction, AcquireWriteLock, CommitAndContinue, GetStreamsForExpunge, Open, RegisterStatsProc, RootHandle, RootHandleRec, Shutdown, StartTransaction, StatsReport, SwapLogs, UnregisterStatsProc], WalnutStream USING [Open]; WalnutOpsInternalImpl: CEDAR PROGRAM IMPORTS BasicTime, FS, IO, LoganBerryEntry, Process, RefText, Rope, <> UserProfile, ViewerIO, ViewerOps, LoganBerry, TapFilter, TapMsgQueue, WalnutDB, WalnutDefs, WalnutLog, WalnutLogExpunge, WalnutMiscLog, WalnutOps, WalnutOpsInternal, WalnutRegistryPrivate, WalnutRoot, WalnutStream EXPORTS WalnutDefs, WalnutOpsInternal = BEGIN OPEN WalnutOps, WalnutOpsInternal; <<>> <> ROPE: TYPE = Rope.ROPE; STREAM: TYPE = IO.STREAM; GMT: TYPE = BasicTime.GMT; CheckReportProc: TYPE = WalnutDefs.CheckReportProc; LogInfo: TYPE = WalnutDefs.LogInfo; MsgSet: TYPE = WalnutDefs.MsgSet; WalnutOpsHandle: TYPE = WalnutDefs.WalnutOpsHandle; RootHandle: TYPE = WalnutRoot.RootHandle; RootHandleRec: PUBLIC TYPE = WalnutRoot.RootHandleRec; KernelHandle: TYPE = WalnutOpsInternal.KernelHandle; KernelHandleRec: PUBLIC TYPE = WalnutOpsInternal.KernelHandleRec; WhichTempLog: TYPE = WalnutKernelDefs.WhichTempLog; OutCome: TYPE = { done, couldntRestart }; <<>> <> heavyhandedDebugging: BOOL ¬ FALSE; debugging: BOOL ¬ FALSE; DBErrorSeen: SIGNAL = CODE; WDErrorSeen: SIGNAL = CODE; defaultLogLength: INT ¬ FS.BytesForPages[10000]; -- five megabytes expungeCommitFrequency: INT ¬ 200; commitFrequency: INT ¬ expungeCommitFrequency; shutdownFrequency: INT ¬ 1200; parseCommitFrequency: INT ¬ 200; dontCareMsgSetVersion: INT = WalnutOps.dontCareMsgSetVersion; dontCareDomainVersion: INT = WalnutOps.dontCareDomainVersion; feederSeqNum: INT ¬ 0; <<>> <> <<-- * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *>> StartStatsReporting: PUBLIC PROC[opsH: WalnutOpsHandle] = { <> which: ROPE = "WalnutLB"; kH: KernelHandle = opsH.kernelHandle; <> statsFile: ROPE ¬ "/tmp/WalnutLB.log"; IF kH.statsStream # NIL THEN kH.statsStream.Close[ ! IO.Error => CONTINUE]; kH.statsStream ¬ FS.StreamOpen[fileName: statsFile, accessOptions: $create, keep: 4]; IF heavyhandedDebugging THEN { name: ROPE = Rope.Concat[which, " Stats"]; kH.statsProgressTS ¬ ViewerOps.FindViewer[name]; kH.statsProgressStream ¬ ViewerIO.CreateViewerStreams[name, kH.statsProgressTS].out; IF kH.statsProgressTS = NIL THEN kH.statsProgressTS ¬ ViewerIO.GetViewerFromStream[kH.statsProgressStream]; kH.statsProgressTS.inhibitDestroy ¬ TRUE; }; WalnutRoot.RegisterStatsProc[opsH, StatsReport]; }; StatsReport: PUBLIC WalnutDefs.CheckReportProc = { r: ROPE = "@ %g\n"; thyme: GMT = BasicTime.Now[]; strm: STREAM; IF (strm ¬ opsH.kernelHandle.statsStream) = NIL THEN RETURN; strm.PutRope["\n "]; strm.PutF[format, v1, v2, v3]; strm.PutF1[r, [time[thyme]] ]; IF (strm ¬ opsH.kernelHandle.statsProgressStream) # NIL THEN { strm.PutRope["\n "]; strm.PutF[format, v1, v2, v3]; strm.PutF1[r, [time[thyme]] ]; }; }; <<>> <> CarefullyApply: PUBLIC PROC[opsH: WalnutOpsHandle, proc: PROC[], didUpdate: BOOL] = { reTryCount: INT ¬ 2; schemaInvalid: BOOL ¬ TRUE; eCode: ATOM; exp: ROPE; opsH.kernelHandle.recentActivity ¬ TRUE; IF NOT opsH.kernelHandle.started THEN ERROR WalnutDefs.Error[$root, $NotStarted, "Must do WalnutOps.Startup"]; DO BEGIN ENABLE { WalnutDefs.Error => { IF debugging THEN SIGNAL WDErrorSeen; opsH.kernelHandle.errorInProgress ¬ TRUE; }; LoganBerry.Error => { IF debugging THEN SIGNAL DBErrorSeen; eCode ¬ ec; exp ¬ explanation; GOTO dbError }; < {>> <> <> <<};>> }; IF opsH.rootHandle.currentLog.readStream = NIL THEN ERROR WalnutDefs.Error[$log, $LogNotOpen]; proc[]; IF didUpdate THEN WalnutRoot.CommitAndContinue[opsH]; RETURN; EXITS dbError => { opsH.kernelHandle.errorInProgress ¬ TRUE; WalnutRoot.StatsReport[opsH, "\n ***DB error - code: %g, exp: %g ", [atom[eCode]], [rope[exp]] ]; ERROR WalnutDefs.Error[$db, eCode, exp]; }; < {>> <> <> <> <> <> <> <> <<};>> <<};>> END; <> <<>> <> << [] _ WalnutDB.InitSchema[opsH, schemaInvalid];>> <> ENDLOOP; }; LongRunningApply: PUBLIC PROC[opsH: WalnutOpsHandle, proc: PROC[inProgress: BOOL]] = { reTryCount: INT ¬ 2; alreadyCalled: BOOL ¬ FALSE; schemaInvalid: BOOL ¬ TRUE; eCode: ATOM; exp: ROPE; opsH.kernelHandle.recentActivity ¬ TRUE; opsH.kernelHandle.isShutdown ¬ FALSE; IF NOT opsH.kernelHandle.started THEN ERROR WalnutDefs.Error[$root, $NotStarted, "Must do WalnutOps.Startup"]; DO BEGIN ENABLE { WalnutDefs.Error => { IF debugging THEN SIGNAL WDErrorSeen; opsH.kernelHandle.errorInProgress ¬ TRUE; }; LoganBerry.Error => { IF debugging THEN SIGNAL DBErrorSeen; eCode ¬ ec; exp ¬ explanation; GOTO dbError }; < GOTO aborted;>> }; IF opsH.rootHandle.currentLog.readStream = NIL THEN ERROR WalnutDefs.Error[$log, $LogNotOpen]; WalnutRoot.AcquireWriteLock[opsH]; proc[alreadyCalled ! WalnutDefs.VersionMismatch => { ReleaseWriteLock[opsH]; REJECT} ]; WalnutDB.SetOpInProgressPos[opsH, -1]; ReleaseWriteLock[opsH]; -- also does commit RETURN; EXITS dbError => { opsH.kernelHandle.errorInProgress ¬ TRUE; WalnutRoot.StatsReport[opsH, "\n ***DB error - code: %g, exp: %g ", [atom[eCode]], [rope[exp]] ]; ERROR WalnutDefs.Error[$db, eCode, exp]; }; < {>> <> <> <> <> <> <> <> <<};>> <<};>> END; <> <<>> <> <<[] _ WalnutDB.InitSchema[opsH, schemaInvalid];>> <> < 0;>> ENDLOOP; }; Restart: PUBLIC PROC[opsH: WalnutOpsHandle] = { -- need to re-open the transaction schemaInvalid: BOOL = WalnutRoot.StartTransaction[opsH]; IF schemaInvalid THEN [] ¬ WalnutDB.InitSchema[opsH, schemaInvalid]; WalnutLog.OpenLogStreams[opsH]; opsH.kernelHandle.isShutdown ¬ FALSE; }; ReleaseWriteLock: PROC[opsH: WalnutOpsHandle] = { WalnutRoot.CommitAndContinue[opsH]; WalnutLog.ReleaseWriteLock[opsH]; }; CheckReport: PUBLIC CheckReportProc = { IF opsH.kernelHandle.reporterList = NIL THEN RETURN; FOR rL: LIST OF STREAM ¬ opsH.kernelHandle.reporterList, rL.rest UNTIL rL= NIL DO rL.first.PutF[format, v1, v2, v3]; ENDLOOP; }; InternalShutdown: PUBLIC PROC[opsH: WalnutOpsHandle] = { kH: KernelHandle = opsH.kernelHandle; WalnutRoot.UnregisterStatsProc[opsH, StatsReport]; IF kH.errorInProgress THEN WalnutRoot.AbortTransaction[opsH]; WalnutLogExpunge.EndExpunge[opsH]; -- clear its variables WalnutLog.ShutdownLog[opsH]; WalnutRoot.Shutdown[opsH]; -- takes care of database StatsReport[opsH, "\n *** Shutdown"]; IF kH.statsProgressTS # NIL THEN { kH.statsProgressTS.inhibitDestroy ¬ FALSE }; IF kH.statsStream # NIL THEN { kH.statsStream.Close[]; kH.statsStream ¬ NIL }; IF kH.statsProgressStream # NIL THEN { kH.statsProgressStream.Close[]; kH.statsProgressStream ¬ NIL }; kH.started ¬ FALSE; kH.recentActivity¬ FALSE; kH.isShutdown ¬ TRUE; kH.errorInProgress ¬ FALSE; kH.mailStream ¬ NIL; WalnutRegistryPrivate.NotifyForEvent[stopped]; }; <<* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *>> <<>> ComputeMaxExpungeLogPages: PUBLIC PROC[opsH: WalnutOpsHandle] RETURNS[pages: INT] = { bytesInDestroyedMsgs: INT = WalnutDB.GetExpungeInfo[opsH].bytesInDestroyedMsgs; logLen: INT = WalnutLog.LogLength[opsH]; RETURN[FS.PagesForBytes[logLen-bytesInDestroyedMsgs]]; }; <<>> DoLogExpunge: PUBLIC PROC[opsH: WalnutOpsHandle] = { <> DO expPhase: WalnutKernelDefs.LogExpungePhase ¬ WalnutDB.GetLogExpungePhase[opsH]; SELECT expPhase FROM idle => { StatsReport[opsH, "\n ~~~ Starting log expunge"]; WalnutDB.SetLogExpungePhase[opsH, initializingExpungeLog]; WalnutRoot.CommitAndContinue[opsH]; commitFrequency ¬ expungeCommitFrequency; }; initializingExpungeLog => { expungeFileID: INT; pagesNeeded: INT = ComputeMaxExpungeLogPages[opsH]; expungeFileID ¬ WalnutLogExpunge.StartExpunge[opsH, pagesNeeded]; WalnutDB.SetExpungeFileID[opsH, expungeFileID]; WalnutDB.SetLogExpungePhase[opsH, writingExpungeLog]; WalnutRoot.CommitAndContinue[opsH]; }; writingExpungeLog => { [] ¬ WriteExpungeLog[opsH]; [] ¬ WalnutLogExpunge.EndExpunge[opsH]; WalnutDB.SetLogExpungePhase[opsH, swappingLogs]; WalnutRoot.CommitAndContinue[opsH]; }; swappingLogs => { expungeFileID: INT ¬ WalnutDB.GetExpungeFileID[opsH]; date: BasicTime.GMT; logLen: INT; [] ¬ WalnutRoot.GetStreamsForExpunge[opsH: opsH, starting: FALSE, pagesWanted: -1]; StatsReport[opsH, "\n Swapping Log Files\n"]; WalnutDB.SetExpungeProgressInfo[opsH, 0,0]; WalnutDB.SetExpungeInfo[opsH, 0, 0]; WalnutDB.SetOpInProgressPos[opsH, -1]; WalnutDB.SetLogExpungePhase[opsH, idle]; -- whew date ¬ BasicTime.Now[]; WalnutDB.SetTimeOfLastExpunge[opsH, date]; logLen ¬ WalnutRoot.SwapLogs[opsH, expungeFileID, date]; WalnutDB.SetRootFileVersion[opsH, opsH.rootHandle.createDate]; WalnutRoot.CommitAndContinue[opsH]; StatsReport[opsH, "\n ~~~ Finished expunge"]; WalnutLog.OpenLogStreams[opsH]; RETURN; }; ENDCASE => ERROR; ENDLOOP; }; ExpungeLogResult: TYPE = { unknown, couldntRestart, finished }; WriteExpungeLog: PROC[opsH: WalnutOpsHandle] RETURNS[result: ExpungeLogResult] = { <> numMsgs: INT ¬ 0; currentLogPos, expungeLogPos: INT; previousAt, at: INT; startRetryCount: INT = 2; retryCount: INT ¬ startRetryCount; bytesToCopyBeforeFlush: INT ¬ FS.BytesForPages[500]; lastExpungeCommitLength: INT ¬ -1; oldLength: INT = WalnutLog.LogLength[opsH]; DO BEGIN ENABLE WalnutDefs.Error => { IF debugging THEN SIGNAL WDErrorSeen; IF code = $TransactionAbort THEN GOTO retry ELSE REJECT; }; bytesCopiedSinceLastCommit, numMsgsSinceShutdown: INT ¬ 0; lastAcceptNewMailPos: INT; firstDestroyedMsgPos: INT ¬ WalnutDB.GetExpungeInfo[opsH].firstDestroyedMsgPos; [currentLogPos, expungeLogPos] ¬ WalnutDB.GetExpungeProgressInfo[opsH]; IF lastExpungeCommitLength = expungeLogPos THEN { IF (retryCount ¬ retryCount - 1) <= 0 THEN ERROR WalnutDefs.Error[$log, $TooManyLogAborts, IO.PutFR1["\n During Expunge - last commit pos in expungeLog was %g", [integer[expungeLogPos]]] ]; } ELSE { lastExpungeCommitLength ¬ expungeLogPos; retryCount ¬ startRetryCount; }; IF ~WalnutLogExpunge.RestartExpunge[opsH, currentLogPos, expungeLogPos] THEN { pagesNeeded: INT ¬ FS.PagesForBytes[WalnutLogExpunge.ExpLogLength[opsH] - WalnutDB.GetExpungeInfo[opsH].bytesInDestroyedMsgs]; CheckReport[opsH, "\n ****Could not restart expunge from %g in expungeLog, %g in currentLog - restarting Log Expunge\n", [integer[expungeLogPos]], [integer[currentLogPos]] ]; expungeLogPos ¬ currentLogPos ¬ 0; WalnutDB.SetExpungeProgressInfo[opsH, currentLogPos, expungeLogPos]; -- commits [] ¬ WalnutLogExpunge.StartExpunge[opsH, pagesNeeded]; -- try again StatsReport[opsH, "Restarting expunge from beginning"]; IF ~WalnutLogExpunge.RestartExpunge[opsH, currentLogPos, expungeLogPos] THEN WalnutDefs.Error[$log, $CantStartExpunge]; lastExpungeCommitLength ¬ expungeLogPos; }; IF currentLogPos < firstDestroyedMsgPos THEN { -- copyingHead phase bytesLeftToCopyInHead: INT ¬ firstDestroyedMsgPos - currentLogPos; bytesToCopyAtOneTime: INT = FS.BytesForPages[400]; IF currentLogPos = 0 THEN { WalnutLogExpunge.SetIndex[opsH, 0]; [] ¬ WalnutLogExpunge.SkipEntry[opsH]; -- need to skip the LogFileInfo entry bytesLeftToCopyInHead ¬ bytesLeftToCopyInHead - WalnutLogExpunge.GetIndex[opsH]; }; CheckReport[opsH, "Copying the head of the log, starting at bytePos %g, ending at bytePos %g\n\t", [integer[currentLogPos]], [integer[firstDestroyedMsgPos]] ]; DO bytesToCopyThisTime: INT ¬ MIN[bytesLeftToCopyInHead, bytesToCopyAtOneTime]; WalnutLogExpunge.CopyBytesToExpungeLog[opsH, bytesToCopyThisTime]; [currentLogPos, expungeLogPos] ¬ WalnutLogExpunge.GetExpungeProgress[opsH]; WalnutDB.SetExpungeProgressInfo[opsH, currentLogPos, expungeLogPos]; WalnutRoot.CommitAndContinue[opsH]; CheckReport[opsH, "#"]; bytesLeftToCopyInHead ¬ bytesLeftToCopyInHead - bytesToCopyThisTime; IF bytesLeftToCopyInHead = 0 THEN EXIT; ENDLOOP; retryCount ¬ startRetryCount; -- in case has abort during copy }; <> lastAcceptNewMailPos ¬ WalnutDB.GetAcceptNewMailPos[opsH]; WalnutLogExpunge.SetIndex[opsH, at ¬ currentLogPos]; -- before doing PeekEntry CheckReport[opsH, "\nProcessing the tail of the log, starting at bytePos %g (old length %g bytes, %g pages)\n", [integer[at]], [integer[oldLength]], [integer[FS.PagesForBytes[oldLength]]] ]; DO ident: ATOM; msgID: ROPE; bytesThisCopy: INT ¬ 0; previousAt ¬ at; [ident, msgID, at] ¬ WalnutLogExpunge.PeekEntry[opsH]; IF ident = NIL THEN IF at # -1 THEN EXIT -- end of log ELSE { charSkipped: INT ¬ WalnutLogExpunge.SetPosition[opsH, previousAt+1]; IF charSkipped = -1 THEN ERROR WalnutDefs.Error[$log, $DuringExpunge, IO.PutFR1["No entry found after %g", [integer[previousAt]] ]]; at ¬ previousAt + charSkipped; LOOP; }; SELECT ident FROM $ExpungeMsgs => [] ¬ WalnutLogExpunge.SkipEntry[opsH]; $WriteExpungeLog => { result ¬ finished; EXIT }; -- last entry on the Log $LogFileInfo => [] ¬ WalnutLogExpunge.SkipEntry[opsH]; $RecordNewMailInfo, $StartCopyNewMail => IF at < lastAcceptNewMailPos THEN [] ¬ WalnutLogExpunge.SkipEntry[opsH] ELSE bytesThisCopy ¬ WalnutLogExpunge.CopyEntry[opsH].bytesCopied; $EndCopyNewMailInfo => { IF at < lastAcceptNewMailPos THEN { -- maybe skip startCopyPos: INT ¬ WalnutLogExpunge.EndCopyEntry[opsH]; IF firstDestroyedMsgPos < startCopyPos THEN [] ¬ WalnutLogExpunge.SkipEntry[opsH] ELSE bytesThisCopy ¬ WalnutLogExpunge.CopyEntry[opsH].bytesCopied; } ELSE bytesThisCopy ¬ WalnutLogExpunge.CopyEntry[opsH].bytesCopied; }; $AcceptNewMail => IF at < lastAcceptNewMailPos THEN [] ¬ WalnutLogExpunge.SkipEntry[opsH] ELSE { newLogPos: INT; [newLogPos, bytesThisCopy] ¬ WalnutLogExpunge.CopyEntry[opsH]; WalnutDB.SetAcceptNewMailPos[opsH, newLogPos]; }; $StartCopyReadArchive => [] ¬ WalnutLogExpunge.SkipEntry[opsH]; $EndCopyReadArchiveInfo => { startCopyPos: INT ¬ WalnutLogExpunge.EndCopyEntry[opsH]; IF firstDestroyedMsgPos < startCopyPos THEN [] ¬ WalnutLogExpunge.SkipEntry[opsH] ELSE bytesThisCopy ¬ WalnutLogExpunge.CopyEntry[opsH].bytesCopied; }; ENDCASE => IF msgID.Length[] = 0 OR WalnutDB.MsgExists[opsH, msgID] THEN { <> <> newLogPos: INT ; [newLogPos, bytesThisCopy] ¬ WalnutLogExpunge.CopyEntry[opsH]; IF ident = $CreateMsg THEN { <> IF newLogPos # at THEN WalnutDB.SetMsgEntryPosition[opsH, newLogPos]; numMsgsSinceShutdown ¬ numMsgsSinceShutdown + 1; IF (numMsgs ¬ numMsgs + 1) MOD 10 = 0 THEN IF numMsgs MOD 100 = 0 THEN CheckReport[opsH, "(%g) ", [integer[numMsgs]] ] ELSE CheckReport[opsH, "."]; }; } <> ELSE [] ¬ WalnutLogExpunge.SkipEntry[opsH]; IF numMsgsSinceShutdown >= shutdownFrequency THEN { expLogPos: INT; [currentLogPos, expLogPos] ¬ WalnutLogExpunge.GetExpungeProgress[opsH]; WalnutDB.SetExpungeProgressInfo[opsH, currentLogPos, expLogPos]; WalnutLogExpunge.EndExpunge[opsH]; ShortShutdown[opsH, 20, -1]; [] ¬ WalnutLogExpunge.RestartExpunge[opsH, currentLogPos, expLogPos]; lastExpungeCommitLength ¬ expLogPos; bytesCopiedSinceLastCommit ¬ bytesThisCopy ¬ 0; numMsgsSinceShutdown ¬ 0; }; IF (bytesCopiedSinceLastCommit ¬ bytesCopiedSinceLastCommit + bytesThisCopy) >= bytesToCopyBeforeFlush THEN { expLogPos: INT; [currentLogPos, expLogPos] ¬ WalnutLogExpunge.GetExpungeProgress[opsH]; WalnutDB.SetExpungeProgressInfo[opsH, currentLogPos, expLogPos]; WalnutRoot.CommitAndContinue[opsH]; lastExpungeCommitLength ¬ expLogPos; bytesCopiedSinceLastCommit ¬ 0; }; ENDLOOP; <> IF bytesCopiedSinceLastCommit > 0 THEN { [currentLogPos, expungeLogPos] ¬ WalnutLogExpunge.GetExpungeProgress[opsH]; WalnutDB.SetExpungeProgressInfo[opsH, currentLogPos, expungeLogPos]; WalnutRoot.CommitAndContinue[opsH]; lastExpungeCommitLength ¬ expungeLogPos; }; RETURN; EXITS retry => { schemaInvalid: BOOL ¬ TRUE; WalnutLogExpunge.EndExpunge[opsH]; WalnutRoot.AbortTransaction[opsH]; schemaInvalid ¬ WalnutRoot.StartTransaction[opsH]; IF schemaInvalid THEN [] ¬ WalnutDB.InitSchema[opsH, schemaInvalid]; WalnutLog.OpenLogStreams[opsH]; }; END; ENDLOOP; }; <> CheckInProgress: PUBLIC PROC[opsH: WalnutOpsHandle] = { inProgressPos: INT ¬ 0; parseInProgress: BOOL ¬ FALSE; needsParse: BOOL ¬ FALSE; doingLogExpunge: BOOL ¬ FALSE; Cip: PROC = { IF (parseInProgress ¬ WalnutDB.GetParseLogInProgress[opsH]) THEN { WalnutRoot.AcquireWriteLock[opsH]; RETURN }; inProgressPos ¬ WalnutDB.GetOpInProgressPos[opsH]; doingLogExpunge ¬ ( WalnutDB.GetLogExpungePhase[opsH] # idle ); IF inProgressPos <= 0 THEN RETURN; WalnutRoot.AcquireWriteLock[opsH]; }; Fip: PROC[inProgress: BOOL] = { -- FinishInProgress at: INT; wle: WalnutKernelDefs.LogEntry; WalnutLog.SetIndex[opsH, inProgressPos]; [wle, at] ¬ WalnutLog.NextEntry[opsH]; IF at = -1 THEN ERROR WalnutDefs.Error[$log, $BadLog, IO.PutFR1["No entry at %g", [integer[inProgressPos]]] ]; TRUSTED { WITH le: wle SELECT FROM ExpungeMsgs => WalnutDB.ExpungeMsgs[opsH, dontCareMsgSetVersion, CheckReport]; WriteExpungeLog => DoLogExpunge[opsH]; EmptyMsgSet => []¬ WalnutDB.EmptyMsgSet[ opsH, [ le.msgSet, dontCareMsgSetVersion ], WalnutOpsInternal.CheckReport ]; DestroyMsgSet => []¬ WalnutDB.DestroyMsgSet[ opsH: opsH, msgSet: [ le.msgSet, dontCareMsgSetVersion ], msDomainVersion: dontCareDomainVersion, report: WalnutOpsInternal.CheckReport ]; AcceptNewMail => { WalnutDB.AcceptNewMail[opsH, at, dontCareMsgSetVersion]; opsH.kernelHandle.newMailSomewhere ¬ FALSE; }; StartCopyNewMail => { FinishCopyTempLog[opsH, newMail, at]; WalnutDB.SetCopyMailLogPos[opsH, 0]; WalnutRoot.CommitAndContinue[opsH]; WalnutLog.FinishTempLogCopy[opsH, newMail]; needsParse ¬ TRUE; }; StartReadArchiveFile => { WalnutLog.ResetLog[opsH, at]; WalnutDB.SetReadArchivePos[opsH, 0]; }; StartCopyReadArchive => { FinishCopyTempLog[opsH, readArchive, at]; WalnutDB.SetCopyReadArchivePos[opsH, 0]; WalnutRoot.CommitAndContinue[opsH]; WalnutLog.FinishTempLogCopy[opsH, readArchive]; needsParse ¬ TRUE; }; ENDCASE => ERROR WalnutDefs.Error[$db, $ErroneousInProgress, IO.PutFR1["Entry at %g", [integer[inProgressPos]]] ]; }; -- end trusted WalnutDB.SetOpInProgressPos[opsH, -1]; WalnutRoot.CommitAndContinue[opsH]; }; IF opsH.kernelHandle.errorInProgress THEN ERROR WalnutDefs.Error[$db, $InternalError, "Must do Shutdown & Startup"]; IF opsH.kernelHandle.isShutdown THEN { IF opsH.rootHandle = NIL THEN [] ¬ WalnutRoot.Open[opsH]; Restart[opsH]; }; WalnutOpsInternal.CarefullyApply[opsH, Cip, FALSE]; IF parseInProgress THEN { StatsReport[opsH, "\n Continuing parseInProgress"]; [] ¬ ParseLog[opsH, TRUE]; RETURN }; IF inProgressPos <= 0 THEN RETURN; StatsReport[opsH, "\n Continuing longRunningOperation"]; WalnutOpsInternal.LongRunningApply[opsH, Fip]; IF needsParse THEN { numNew: INT ¬ 0; CheckReport[opsH, "Adding messages to the database\n"]; numNew ¬ ParseLog[opsH, FALSE]; IF numNew = 0 THEN CheckReport[opsH, "No messages were new\n"] ELSE CheckReport[opsH, " %g new messages\n", [integer[numNew]] ]; }; }; FinishCopyTempLog: PROC[opsH: WalnutOpsHandle, which: WhichTempLog, at: INT] = { logLen: INT ¬ WalnutLog.LogLength[opsH]; startedCopyAt, startCopyPos, fromPos: INT; pagesAlreadyCopied: INT; IF WalnutLog.SetPosition[opsH, at] # 0 THEN ERROR WalnutDefs.Error[$log, $BadLog, IO.PutFR1["no entry at %g", [integer[at]]] ]; startCopyPos ¬ WalnutLog.NextEntry[opsH].at; -- skip the copy entry startedCopyAt ¬ WalnutLog.NextAt[opsH]; fromPos ¬ logLen - startedCopyAt; pagesAlreadyCopied ¬ FS.PagesForBytes[fromPos]; IF ~WalnutLog.PrepareToCopyTempLog[opsH: opsH, which: which, pagesAlreadyCopied: pagesAlreadyCopied, reportProc: WalnutOpsInternal.CheckReport] THEN ERROR WalnutDefs.Error[$log, $DuringOpInProgress, "can't get tempLog for copy"]; CheckReport[opsH, "\nContinuing copy of tempLog, starting at bytePos %g\n", [integer[fromPos]] ]; WalnutLog.CopyTempLog[opsH, which, startCopyPos, fromPos, CheckReport]; WalnutDB.SetParseLogInProgress[opsH, TRUE]; WalnutDB.SetParseLogPos[opsH, WalnutDB.GetOpInProgressPos[opsH]]; WalnutDB.SetOpInProgressPos[opsH, -1]; }; <> ParseLog: PUBLIC PROC[opsH: WalnutOpsHandle, doingScavenge: BOOL ¬ FALSE] RETURNS[numNew: INT] = { parseLogPos: INT; at: INT; -- position in log of entry just processed lastNumNew, savedNumNew, savedLastNumNew: INT ¬ 0; wle: WalnutLog.LogEntry; charsSkipped: INT; errorRope: ROPE = "Can't find a log entry after pos %g (log length is %g)"; retries: INT ¬ 2; accepted: BOOL; giveStartInfo: BOOL ¬ TRUE; commitFrequency: INT ¬ parseCommitFrequency; updatesSinceLastCommit, numSinceShutdown: INT ¬ 0; numNew ¬ 0; DO currentLogLength: INT ¬ WalnutLog.LogLength[opsH]; -- in case at end of Expunge BEGIN ENABLE BEGIN WalnutDefs.Error => { IF debugging THEN SIGNAL WDErrorSeen; IF code = $TransactionAbort THEN { StatsReport[opsH, " *** TransactionAborts during parseLog"]; IF ( retries ¬ retries - 1) > 0 THEN GOTO tryAgain; opsH.kernelHandle.errorInProgress ¬ TRUE; StatsReport[opsH, " *** Too many TransactionAborts during parseLog"]; REJECT } ELSE { opsH.kernelHandle.errorInProgress ¬ TRUE; StatsReport[opsH, "InternalError: %g during parseLog", [atom[code]] ]; REJECT }; }; END; parseLogPos ¬ WalnutDB.GetParseLogPos[opsH]; IF giveStartInfo THEN { IF doingScavenge THEN numNew ¬ WalnutDB.SizeOfDatabase[opsH].messages ELSE numNew ¬ 0; IF doingScavenge AND numNew # 0 THEN CheckReport[opsH, " Parsing the log - database has %g messages so far\n", [integer[numNew]] ]; giveStartInfo ¬ FALSE; }; IF ( parseLogPos < 0 ) OR ( parseLogPos = currentLogLength ) THEN RETURN[numNew]; charsSkipped ¬ WalnutLog.SetPosition[opsH, parseLogPos]; IF charsSkipped = -1 THEN ERROR WalnutDefs.Error[$log, $BadLog, IO.PutFR[errorRope, [integer[parseLogPos]], [integer[currentLogLength]]] ]; at ¬ parseLogPos + charsSkipped; <<>> <> IF at # parseLogPos THEN { WalnutDB.SetParseLogPos[opsH, at]; WalnutRoot.CommitAndContinue[opsH]; }; accepted ¬ NOT WalnutDB.GetAddingServerMsgs[opsH]; DO previousAt: INT ¬ at; [wle, at] ¬ WalnutLog.NextEntry[opsH]; IF wle = NIL THEN { maybeBadAt: INT = at; charsSkipped ¬ WalnutLog.SetPosition[opsH, previousAt+1]; IF charsSkipped = -1 THEN ERROR WalnutDefs.Error[$log, $BadLog, IO.PutFR[errorRope, [integer[previousAt]], [integer[currentLogLength]]] ]; [wle, at] ¬ WalnutLog.NextEntry[opsH]; IF wle = NIL THEN ERROR WalnutDefs.Error[$log, $BadLog, IO.PutFR[errorRope, [integer[previousAt]], [integer[currentLogLength]]] ]; CheckReport[opsH, "\n*** Bad log entry at or near %g or %g; do a ScanWalnutLog sometime soon\n", [integer[previousAt]], [integer[at]] ]; }; IF numSinceShutdown >= shutdownFrequency THEN { WalnutDB.SetParseLogPos[opsH, at]; -- at is beginning of current (unseen) entry ShortShutdown[opsH, 20, at]; updatesSinceLastCommit ¬ 0; savedNumNew ¬ numNew; savedLastNumNew ¬ lastNumNew; numSinceShutdown ¬ 0; }; IF (updatesSinceLastCommit ¬ updatesSinceLastCommit+1) >= commitFrequency THEN { WalnutDB.SetParseLogPos[opsH, at]; -- at is beginning of current (unseen) entry WalnutRoot.CommitAndContinue[opsH]; updatesSinceLastCommit ¬ 0; savedNumNew ¬ numNew; savedLastNumNew ¬ lastNumNew; }; BEGIN ENABLE WalnutDefs.VersionMismatch => GOTO continue; TRUSTED { WITH le: wle SELECT FROM LogFileInfo => NULL; ExpungeMsgs => { <> WalnutDB.SetParseLogPos[opsH, at]; WalnutRoot.CommitAndContinue[opsH]; CheckReport[opsH, "\nStarting ExpungeMsgs "]; WalnutDB.ExpungeMsgs[opsH, dontCareMsgSetVersion, WalnutOpsInternal.CheckReport]; WalnutRoot.CommitAndContinue[opsH]; CheckReport[opsH, " ... finished ExpungeMsgs\n"]; }; WriteExpungeLog => { CheckReport[opsH, "\nStarting WriteExpungeLog "]; WalnutDB.SetParseLogPos[opsH, -1]; WalnutDB.SetParseLogInProgress[opsH, FALSE]; WalnutDB.SetOpInProgressPos[opsH, at]; -- at is beginning of current entry WalnutRoot.CommitAndContinue[opsH]; [] ¬ DoLogExpunge[opsH]; CheckReport[opsH, " ... finished WriteExpungeLog\n"]; EXIT; -- WriteExpungeLog had to be the last entry }; EndOfLog => { WalnutDB.SetParseLogPos[opsH, -1]; WalnutDB.SetParseLogInProgress[opsH, FALSE]; WalnutRoot.CommitAndContinue[opsH]; RETURN }; CreateMsgSet => [] ¬ WalnutDB.CreateMsgSet[opsH, le.msgSet, dontCareDomainVersion]; EmptyMsgSet => { [] ¬ WalnutDB.EmptyMsgSet[ opsH, [le.msgSet, dontCareMsgSetVersion], WalnutOpsInternal.CheckReport ]; }; DestroyMsgSet => { [] ¬ WalnutDB.DestroyMsgSet[ opsH: opsH, msgSet: [le.msgSet, dontCareMsgSetVersion], msDomainVersion: dontCareDomainVersion, report: WalnutOpsInternal.CheckReport ]; }; CreateMsg => { me: WalnutKernelDefs.MsgLogEntry = NARROW[wle]; me.show ¬ accepted; IF me.msg # NIL THEN { IF ~WalnutDB.AddNewMsg[opsH, me] THEN numNew ¬ numNew + 1; numSinceShutdown ¬ numSinceShutdown + 1; }; }; AddMsg => IF le.msg # NIL THEN []¬ WalnutDB.AddMsg[ opsH: opsH, msg: le.msg, from: [NIL, dontCareMsgSetVersion], to: [le.to, dontCareMsgSetVersion] ]; RemoveMsg => IF le.msg # NIL THEN []¬ WalnutDB.RemoveMsg[ opsH: opsH, msg: le.msg, from: [le.from, dontCareMsgSetVersion], deletedVersion: dontCareMsgSetVersion ]; MoveMsg => IF le.msg # NIL THEN []¬ WalnutDB.MoveMsg[ opsH: opsH, msg: le.msg, from: [ le.from, dontCareMsgSetVersion], to: [le.to, dontCareMsgSetVersion] ]; HasBeenRead => IF le.msg # NIL THEN WalnutDB.SetHasBeenRead[opsH, le.msg]; RecordNewMailInfo => { WalnutDB.SetNewMailInfo[ opsH, le.logLen, le.when, le.server, le.num]; opsH.kernelHandle.newMailSomewhere ¬ TRUE; }; StartCopyNewMail => { WalnutDB.SetCopyMailLogPos[opsH, at]; WalnutDB.SetAddingServerMsgs[opsH, TRUE]; accepted ¬ FALSE; }; EndCopyNewMailInfo => { WalnutDB.SetCopyMailLogPos[opsH, 0]; WalnutDB.SetNewMailInfo[opsH, 0, BasicTime.nullGMT, NIL, 0]; WalnutDB.SetAddingServerMsgs[opsH, FALSE]; accepted ¬ TRUE; }; AcceptNewMail => { WalnutDB.AcceptNewMail[opsH, at, dontCareMsgSetVersion]; opsH.kernelHandle.newMailSomewhere ¬ FALSE; }; StartReadArchiveFile => WalnutDB.SetReadArchivePos[opsH, at]; EndReadArchiveFile => WalnutDB.SetReadArchivePos[opsH, 0]; StartCopyReadArchive => WalnutDB.SetCopyReadArchivePos[opsH, at]; EndCopyReadArchiveInfo => WalnutDB.SetCopyReadArchivePos[opsH, 0]; ENDCASE => ERROR; }; EXITS continue => NULL; END; IF numNew # lastNumNew THEN IF numNew MOD 10 = 0 THEN IF numNew MOD 100 = 0 THEN CheckReport[opsH, "(%g) ", [integer[numNew]] ] ELSE CheckReport[opsH, "~"]; lastNumNew ¬ numNew; ENDLOOP; EXITS tryAgain => { schemaInvalid: BOOL ¬ TRUE; numNew ¬ savedNumNew; lastNumNew ¬ savedLastNumNew; WalnutRoot.AbortTransaction[opsH]; schemaInvalid ¬ WalnutRoot.StartTransaction[opsH, TRUE]; IF schemaInvalid THEN [] ¬ WalnutDB.InitSchema[opsH, schemaInvalid]; WalnutLog.OpenLogStreams[opsH]; giveStartInfo ¬ TRUE; }; END; ENDLOOP; }; ShortShutdown: PROC[opsH: WalnutOpsHandle, sec: INT, logPos: INT] = { WalnutLog.ShutdownLog[opsH]; -- this does a close transaction as well Process.PauseMsec[sec*1000]; [] ¬ WalnutRoot.StartTransaction[opsH, TRUE]; WalnutLog.OpenLogStreams[opsH]; IF logPos >= 0 THEN [] ¬ WalnutLog.SetPosition[opsH, logPos]; }; DoStartNewMail: PUBLIC PROC[opsH: WalnutOpsHandle] RETURNS[STREAM] = { newMailLogLength: INT ¬ -1; expungeInProgress: BOOL ¬ FALSE; Gnml: PROC = { newMailLogLength ¬ WalnutDB.GetNewMailLogLength[opsH]; expungeInProgress ¬ WalnutDB.GetLogExpungePhase[opsH] # idle; }; CheckInProgress[opsH]; IF opsH.kernelHandle.mailStream # NIL THEN RETURN[NIL]; CarefullyApply[opsH, Gnml, FALSE]; IF newMailLogLength = -1 OR expungeInProgress THEN RETURN[NIL]; RETURN[opsH.kernelHandle.mailStream ¬ WalnutMiscLog.GetNewMailLog[opsH, newMailLogLength, 200]]; }; DoAcceptNewMail: PUBLIC PROC[opsH: WalnutOpsHandle, activeVersion: INT] = { Anm: PROC[inProgress: BOOL] = { at: INT; IF ~inProgress THEN { [] ¬ WalnutDB.VerifyMsgSet[opsH, [ActiveName, activeVersion]]; at ¬ WalnutLog.AcceptNewMail[opsH].at; WalnutDB.SetOpInProgressPos[opsH, at]; } ELSE at ¬ WalnutDB.GetAcceptNewMailPos[opsH]; WalnutDB.AcceptNewMail[opsH, at, activeVersion]; }; CheckInProgress[opsH]; LongRunningApply[opsH, Anm]; opsH.kernelHandle.newMailSomewhere ¬ FALSE; }; DoNewMail: PUBLIC PROC[ opsH: WalnutOpsHandle, activeVersion: INT, proc: PROC[msg, tocEntry: ROPE, startOfSubject: INT]] RETURNS[responses: LIST OF WalnutOps.ServerInfo, complete: BOOL] = { someEntries: BOOL ¬ FALSE; Cml: PROC = { IF activeVersion # WalnutDefs.dontCareMsgSetVersion THEN [] ¬ WalnutDB.VerifyMsgSet[opsH, [ActiveName, activeVersion] ]; someEntries ¬ WalnutDB.GetNewMailLogLength[opsH] # 0; }; Cml2: PROC[inProgress: BOOL] = { fromPos: INT ¬ 0; complete ¬ WalnutLog.PrepareToCopyTempLog[opsH: opsH, which: newMail, pagesAlreadyCopied: 0, reportProc: CheckReport]; IF ~complete THEN RETURN; IF ~inProgress THEN { at: INT ¬ WalnutLog.StartCopyNewMail[opsH].at; WalnutDB.SetOpInProgressPos[opsH, at]; WalnutDB.SetCopyMailLogPos[opsH, at]; } ELSE { at: INT = WalnutDB.GetCopyMailLogPos[opsH]; CheckReport[opsH, "\n Continue copying the newMailLog\n"]; [] ¬ WalnutLog.SetPosition[opsH, at]; [] ¬ WalnutLog.NextEntry[opsH]; -- skip the copy entry fromPos ¬ WalnutLog.LogLength[opsH] - WalnutLog.NextAt[opsH]; }; WalnutDB.SetAddingServerMsgs[opsH, TRUE]; WalnutLog.CopyTempLog[opsH, newMail, WalnutDB.GetCopyMailLogPos[opsH], fromPos, CheckReport]; <> WalnutDB.SetParseLogInProgress[opsH, TRUE]; WalnutDB.SetParseLogPos[opsH, WalnutDB.GetOpInProgressPos[opsH]]; WalnutDB.SetOpInProgressPos[opsH, -1]; }; Gnm: PROC = { IF proc # NIL THEN WalnutDB.EnumerateUnacceptedMsgs[opsH, activeVersion, proc]; responses ¬ WalnutDB.EnumerateServers[opsH]; }; CheckInProgress[opsH]; CarefullyApply[opsH, Cml, FALSE]; IF opsH.kernelHandle.mailStream # NIL THEN RETURN[NIL, FALSE]; -- file is busy complete ¬ TRUE; IF someEntries THEN { LongRunningApply[opsH, Cml2]; IF complete THEN { WalnutLog.OpenLogStreams[opsH]; CheckReport[opsH, "Adding new mail to the database @ %g\n", [time[BasicTime.Now[]]] ]; [] ¬ ParseLog[opsH, FALSE]; -- "see" messages [] ¬ FilterMessages[opsH, activeVersion]; }; }; CarefullyApply[opsH, Gnm, FALSE]; }; TapRef: TYPE = REF TapFilterRep; TapFilterRep: TYPE = RECORD[opsH: WalnutOpsHandle, annotations: INT ¬ 0]; ReportProgress: TapFilter.MonitorProc = { <<[msgID: ROPE, msg: TapMsgQueue.Msg, filterID: ROPE, annot: TapFilter.Annotation, clientData: REF ANY] RETURNS [doIt: BOOLEAN _ TRUE]>> tR: TapRef ¬ NARROW[clientData]; CheckReport[tR.opsH, "@"]; tR.annotations ¬ tR.annotations + 1; }; FilterMessages: PROC[opsH: WalnutOpsHandle, activeVersion: INT] RETURNS [annotations: INT ¬ 0] ~ { ENABLE TapFilter.Error => { CheckReport[opsH, "Problem with filtering agent: %g - %g.\n", IO.atom[ec], IO.rope[explanation]]; CONTINUE; }; GetMsgText: PROC [opsHandle: WalnutOpsHandle, msg: ROPE] RETURNS [ROPE] = { <> textStart, textLen: INT; contents: REF TEXT; [textStart, textLen, ] ¬ WalnutDB.GetMsgText[opsHandle, msg]; IF textStart = 0 THEN RETURN[NIL]; IF textLen > RefText.MaxLen-8 THEN textLen ¬ RefText.MaxLen-8; contents ¬ RefText.New[textLen]; WalnutLog.GetRefTextFromLog[opsHandle, textStart, textLen, contents]; RETURN[RefText.TrustTextAsRope[contents]]; }; PassToFilterAgent: PROC[msg, tocEntry: ROPE, startOfSubject: INT] = { parsedMsg: TapMsgQueue.Msg; contents: ROPE ¬ GetMsgText[opsH, msg]; parsedMsg ¬ TapFilter.ParseMsgIntoFields[contents]; feederSeqNum ¬ feederSeqNum + 1; parsedMsg ¬ CONS[[TapFilter.seqNum, LoganBerryEntry.I2V[feederSeqNum]], parsedMsg]; parsedMsg ¬ CONS[[TapFilter.msgID, msg], parsedMsg]; TapMsgQueue.Put[parsedMsg, opsH.filterFeeder]; }; Fm: PROC = { <> wantAnnotations: BOOLEAN ¬ UserProfile.Boolean[key: "WallTapestry.AnnotateNewMail", default: FALSE]; filterDBName: ROPE ¬ UserProfile.Token[key: "WallTapestry.FilterDB", default: NIL]; annotationDBName: ROPE ¬ UserProfile.Token[key: "WallTapestry.AnnotationDB", default: NIL]; filteringAgent: TapFilter.Agent ¬ opsH.filterAgent; IF NOT wantAnnotations OR filterDBName = NIL OR annotationDBName = NIL THEN RETURN; <> IF filteringAgent = NIL THEN { opsH.filterFeeder ¬ TapMsgQueue.Create[]; filteringAgent ¬ opsH.filterAgent ¬ TapFilter.CreateAgent[feeder: opsH.filterFeeder, filterDB: filterDBName, user: NIL, annotDB: annotationDBName]; IF filteringAgent = NIL THEN RETURN; }; <> tR ¬ NEW[TapFilterRep ¬ [opsH, 0]]; CheckReport[opsH, "\nAnnotating messages: "]; TapFilter.MonitorAgent[agent: filteringAgent, proc: ReportProgress, clientData: tR]; WalnutDB.EnumerateUnacceptedMsgs[opsH, activeVersion, PassToFilterAgent]; <> TapFilter.WakeupAgent[filteringAgent]; [] ¬ TapFilter.IsAgentIdle[agent: filteringAgent, wait: TRUE]; CheckReport[opsH, " done.\n"]; TapFilter.MonitorAgent[agent: filteringAgent, proc: NIL, clientData: NIL]; }; tR: TapRef; CarefullyApply[opsH, Fm, FALSE]; RETURN[IF tR # NIL THEN tR.annotations ELSE 0]; }; <<>> DoReadArchiveFile: PUBLIC PROC[opsH: WalnutOpsHandle, file: ROPE, msgSet: MsgSet] RETURNS[numNew: INT] = { <> ENABLE UNWIND => NULL; ok: BOOL ¬ FALSE; fStream: STREAM; reason: ROPE ¬ NIL; Raf: PROC = { at: INT; IF msgSet.name # NIL THEN [] ¬ WalnutDB.VerifyMsgSet[opsH, msgSet]; at ¬ WalnutLog.StartReadArchiveFile[opsH, file, msgSet.name].at; WalnutDB.SetReadArchivePos[opsH, at]; }; Raf2: PROC = { [] ¬ WalnutLog.EndReadArchiveFile[opsH]; WalnutDB.SetReadArchivePos[opsH, 0]; }; Caf: PROC[inProgress: BOOL] = { fromPos: INT ¬ 0; IF ~inProgress THEN { at: INT; ok ¬ WalnutLog.PrepareToCopyTempLog[ opsH: opsH, which: readArchive, pagesAlreadyCopied: 0, reportProc: WalnutOpsInternal.CheckReport]; IF ~ok THEN RETURN; at ¬ WalnutLog.StartCopyReadArchive[opsH].at; WalnutDB.SetCopyReadArchivePos[opsH, at]; WalnutDB.SetOpInProgressPos[opsH, at]; WalnutRoot.CommitAndContinue[opsH]; } ELSE { -- calculate fromPos logLen: INT ¬ WalnutLog.LogLength[opsH]; startedCopyAt, startCopyPos: INT; startCopyPos ¬ WalnutDB.GetCopyReadArchivePos[opsH]; IF WalnutLog.SetPosition[opsH, startCopyPos] # 0 THEN ERROR WalnutDefs.Error[$log, $BadLog, IO.PutFR1["no entry at %g", [integer[startCopyPos]]] ]; [] ¬ WalnutLog.NextEntry[opsH]; -- skip the copy entry startedCopyAt ¬ WalnutLog.NextAt[opsH]; fromPos ¬ logLen - startedCopyAt; }; CheckReport[opsH, "Copying the ReadArchiveTempLog, starting at bytePos %g\n", [integer[fromPos]]]; WalnutLog.CopyTempLog[opsH, readArchive, WalnutDB.GetCopyReadArchivePos[opsH], fromPos, CheckReport]; WalnutDB.SetParseLogInProgress[opsH, TRUE]; WalnutDB.SetParseLogPos[opsH, WalnutDB.GetOpInProgressPos[opsH]]; WalnutDB.SetOpInProgressPos[opsH, -1]; }; CheckInProgress[opsH]; fStream ¬ WalnutStream.Open[name: file, readOnly: TRUE ! FS.Error => { CheckReport[opsH, error.explanation]; fStream ¬ NIL; CONTINUE} ].strm; IF fStream = NIL THEN RETURN[-1]; CarefullyApply[opsH, Raf, TRUE]; BEGIN ENABLE BEGIN FS.Error => { reason ¬ error.explanation; GOTO exit }; IO.Error => { <> IF reason = NIL THEN reason ¬ "IO Error creating readArchiveLog"; GOTO exit }; END; [ok, reason] ¬ WalnutMiscLog.CreateReadArchiveLog[opsH, fStream, msgSet.name, CheckReport]; EXITS exit => ok ¬ FALSE; END; fStream.Close[ ! IO.Error, FS.Error => CONTINUE]; IF ~ok THEN { CheckReport[opsH, " Archive Read of %g failed\n", [rope[file]] ]; IF reason # NIL THEN CheckReport[opsH, " Error reported as: %g\n", [rope[reason]]]; RETURN[-1]; } ELSE Raf2[]; LongRunningApply[opsH, Caf]; IF ~ok THEN { CheckReport[opsH, "Out of space trying to copy readArchiveLog for file %g\n", [rope[file]] ]; RETURN[-1]; }; CheckReport[opsH, "Adding messages to database\n"]; numNew ¬ ParseLog[opsH, FALSE]; }; nativeWalnutItemForm: ROPE = "@%05d 00525 %05d\n"; -- 20 chars, tioga formatting crWalnutItemForm: ROPE = "@%05d 00525 %05d\r"; -- 20 chars, tioga formatting nativeStartHeaderForm: ROPE = "*start*\n%05d %05d US \n"; crStartHeaderForm: ROPE = "*start*\r%05d %05d US \r"; DoWriteArchiveFile: PUBLIC PROC[ opsH: WalnutOpsHandle, file: ROPE, msgSetList: LIST OF MsgSet, append: BOOL] RETURNS[ok: BOOL]= { <> wStream: STREAM; someMsgWasTooBig: BOOL ¬ FALSE; thisMsgSet, exp: ROPE; startHeaderFixedLen: INT = 24; useCRForArchive: BOOL ¬ UserProfile.Boolean["Walnut.useCRForArchiveFile", FALSE]; walnutItemForm: ROPE ~ IF useCRForArchive THEN crWalnutItemForm ELSE nativeWalnutItemForm; startHeaderForm: ROPE ~ IF useCRForArchive THEN crStartHeaderForm ELSE nativeStartHeaderForm; first: BOOL ¬ TRUE; WriteProc: PROC[msg, tocEntry: ROPE, hasBeenRead: BOOL, startOfSubject: INT] RETURNS[continue: BOOL] = { textStart, textLen, formatLen, prefixLen: INT; length, walnutItemLen: INT ¬ 0; walnutItem: ROPE; continue ¬ TRUE; [textStart, textLen, formatLen, , ] ¬ WalnutDB.GetMsgText[opsH, msg]; walnutItem ¬ IF useCRForArchive THEN Rope.Cat[msg, "\r", thisMsgSet, "\r"] ELSE Rope.Cat[msg, "\n", thisMsgSet, "\n"]; walnutItemLen ¬ WalnutMiscLog.walnutItemFixedLength + walnutItem.Length[] + formatLen; IF formatLen # 0 THEN walnutItemLen ¬ walnutItemLen + 1; -- for extra CR after formatting prefixLen ¬ startHeaderFixedLen + walnutItemLen; length ¬ prefixLen + textLen + 1; -- extra CR after text IF length > 99999 THEN { CheckReport[opsH, "\nLength of msg %g is too big (%g bytes) - skipping", [rope[msg]], [integer[length]] ]; someMsgWasTooBig ¬ TRUE; RETURN }; <<-- the -2 below are because the bytecount within the prefix item does not include the surrounding @'s>> wStream.PutF[ startHeaderForm, [integer[length]], [integer[prefixLen]] ]; wStream.PutF[walnutItemForm, [integer[walnutItemLen-2]], [integer[formatLen]] ]; wStream.PutRope[walnutItem]; IF formatLen # 0 THEN { WalnutLog.CopyBytesToArchive[opsH, wStream, textStart+textLen, formatLen]; wStream.PutChar[IF useCRForArchive THEN '\r ELSE '\n]; }; wStream.PutChar['@]; WalnutLog.CopyBytesToArchive[opsH, wStream, textStart, textLen]; wStream.PutChar[IF useCRForArchive THEN '\r ELSE '\n]; }; ok ¬ FALSE; CheckInProgress[opsH]; BEGIN ENABLE BEGIN LoganBerry.Error => { IF debugging THEN SIGNAL DBErrorSeen; IF wStream # NIL THEN wStream.Close[ ! IO.Error, FS.Error => CONTINUE]; ERROR WalnutDefs.Error[$db, ec, IO.PutFR1["DB.Error with explanation: %g", [rope[explanation]] ] ]; }; WalnutDefs.Error => { IF debugging THEN SIGNAL WDErrorSeen; IF wStream # NIL THEN wStream.Close[ ! IO.Error, FS.Error => CONTINUE]; REJECT; }; WalnutDefs.VersionMismatch => { IF wStream # NIL THEN wStream.Close[ ! IO.Error, FS.Error => CONTINUE]; REJECT; }; < { CheckReport[opsH, exp ¬ FS.ErrorFromStream[stream].explanation]; GOTO err };>> IO.Error => { CheckReport[opsH, exp ¬ "IO Error"]; GOTO err }; FS.Error => { CheckReport[opsH, exp ¬ error.explanation]; GOTO err }; END; BEGIN wStream ¬ WalnutStream.Open[ name: file, useOldIfFound: append, exclusive: TRUE ! FS.Error => { CheckReport[opsH, error.explanation]; GOTO none }].strm; EXITS none => wStream ¬ NIL; END; IF wStream = NIL THEN { CheckReport[opsH, "\nCould not open %g\n", [rope[file]] ]; RETURN }; IF append THEN wStream.SetIndex[wStream.GetLength[]] ELSE { wStream.SetIndex[0]; wStream.SetLength[0]; }; CheckReport[opsH, "\n Archiving: "]; FOR mL: LIST OF WalnutDefs.MsgSet ¬ msgSetList, mL.rest UNTIL mL=NIL DO thisMsgSet ¬ mL.first.name; IF ~WalnutDB.VerifyMsgSet[opsH, mL.first] THEN { CheckReport[opsH, "\n MsgSet %g doesn't exist - continuing", [rope[thisMsgSet]] ]; LOOP; }; IF first THEN first ¬ FALSE ELSE CheckReport[opsH, ", "]; CheckReport[opsH, thisMsgSet]; [] ¬ WalnutDB.EnumerateMsgsInSet[opsH, thisMsgSet, TRUE, WriteProc]; ENDLOOP; EXITS err => { wStream.Close[]; ERROR WalnutDefs.Error[$log, $ErrorDuringWriteArchive, exp]; }; END; wStream.Close[]; ok ¬ TRUE; CheckReport[opsH, "\tFinished writing archive file\n"]; }; END.