-- Transport Mechanism Mail Server: Mail spilling

-- [Indigo]<Grapevine>MS>Spiller.mesa

-- Andrew Birrell	September 14, 1982 2:08 pm
-- Randy Gobbel		19-May-81 18:56:08

DIRECTORY
BodyDefs	USING[ ItemHeader, ItemLength, maxRNameLength, RName ],
HeapDefs	USING[ GetReaderOffset, HeapEndRead, HeapEndWrite,
		       HeapReadData, HeapReadRName, HeapReadString,
		       HeapStartRead, HeapStartWrite, HeapWriteData,
		       HeapWriteRName, objectStart, ReaderHandle,
		       ReadItemHeader, SetReaderOffset, WriterHandle ],
Inline		USING[ LowHalf ],
LocalNameDefs	USING[ ReadMSName ],
NameInfoDefs	USING[ Close, Enumerate, GetMembers, MemberInfo],
ObjectDirDefs	USING[ FreeObject, ObjectNumber, UseObject ],
ProtocolDefs	USING[ endSST, Failed, Handle, SendBytes ],
PupDefs		USING[ AppendPupAddress, GetHopsToNetwork, PupAddress ],
PupStream	USING[ StreamClosing ],
ServerDefs	USING[ DownServer, GetServer, NoSuchServer, ServerAddr,
		       ServerHandle, ServerNotUp, ServerUp ],
SpillDefs	USING[ ],
Stream		USING[ SetSST ],
String		USING[ AppendChar, AppendString ],
Time		USING[ Append, Current, Unpack ],
VMDefs		USING[ AbandonFile, CantOpen, CantReadBackingStore,
		       CantWriteBackingStore, CloseFile, DestroyFile,
		       Error, FileHandle, FileSystem, GetFileSystem,
		       Login, Logout, MarkStartWait,
                       OpenFile, OpenOptions,
		       Page, PageNumber, pageSize, ReadPage, Release,
		       SetFileLength, UnableToLogin, UsePage ];

Spiller: MONITOR
IMPORTS HeapDefs, Inline, LocalNameDefs, NameInfoDefs, ObjectDirDefs,
        ProtocolDefs,
        PupDefs, PupStream, ServerDefs, Stream, String, Time,
        VMDefs
EXPORTS SpillDefs
SHARES ObjectDirDefs =

BEGIN

AppendTruncated: PROC[to: STRING, from: STRING, reserve: CARDINAL ← 0] =
   BEGIN
   -- Appends characters of "from" to "to", stopping early so that at
   -- least "reserve" character positions of "to" stay unused.
   -- The limit is tested as "length + reserve < maxlength" rather than
   -- by subtracting reserve from maxlength: a CARDINAL subtraction would
   -- underflow if reserve exceeded to.maxlength, and the loop would then
   -- store beyond the end of "to".
   FOR index: CARDINAL IN [0..from.length)
   WHILE to.length + reserve < to.maxlength
   DO to[to.length] ← from[index]; to.length ← to.length + 1; ENDLOOP;
   END;

-- Module-wide configuration and identity.  myName and myPassword are
-- filled in by Init; they are used as the login credentials in OpenFile,
-- and myName also forms part of every archive file title.
localDisk:          BOOLEAN ← FALSE; -- debugging: archive to local disk --
maxFileTitleLength: CARDINAL = 99; -- IFS --
myName:             BodyDefs.RName = [BodyDefs.maxRNameLength];
myPassword:         STRING ← NIL;

-- Archive file titles are of the form:
--       <pathName>serverName>userName-uniqueID!1
-- The (almost) uniqueID is the text form of the current time; uniqueness
-- is sufficiently guaranteed by the "alreadyExists" error raised if we
-- attempt to create a second file with the same title.
-- For example, <DMS>Cabernet.ms>Birrell.pa-19-Jan-81-23-30-59-PDT!1
-- They are restricted to "maxFileTitleLength" characters by truncating
-- the userName and/or serverName and/or pathName if necessary.

-- Space kept free at the end of a title while names are being truncated:
-- one separator character, the time-based ID, and the "!1" suffix.
uniqueIDLength: CARDINAL = 25 -- length of ">24-Jul-81-15-37-05-GMT!1" --;

AppendID: PROC[to: STRING] =
   {
   -- Appends the text form of the current time to "to", then rewrites
   -- each newly appended character that is not a letter or a digit as a
   -- hyphen, so that only legal file title characters remain.
   first: CARDINAL = to.length;
   Time.Append[to, Time.Unpack[Time.Current[]], TRUE];
   FOR pos: CARDINAL IN [first..to.length)
   DO c: CHARACTER = to[pos];
      IF NOT (c IN ['a..'z] OR c IN ['A..'Z] OR c IN ['0..'9])
      THEN to[pos] ← '-;
   ENDLOOP;
   };

AppendTitle: INTERNAL PROC[to: STRING, user, path: BodyDefs.RName] =
   {
   -- Builds an archive file title in "to".
   -- Remote (IFS) form:   path, myName, '>, user, '-, time ID, "!1"
   -- Local disk form:     myName, '-, user, '-, time ID
   -- Each name is truncated so that room remains for the parts that
   -- follow it.
   IF localDisk
   THEN {
        AppendTruncated[to, myName, uniqueIDLength+1];
        String.AppendChar[to, '-];
        }
   ELSE {
        AppendTruncated[to, path, uniqueIDLength+1];
        AppendTruncated[to, myName, uniqueIDLength+1];
        String.AppendChar[to, '>];
        };
   AppendTruncated[to, user, uniqueIDLength];
   String.AppendChar[to, '-];
   AppendID[to];
   IF NOT localDisk THEN String.AppendString[to, "!1"L];
   };

-- Raised by OpenFile when the file system refuses to create a file
-- because one with the same title already exists.
AlreadyExists: ERROR = CODE;

-- Logs in to the file system on "server" (Alto when localDisk is set,
-- otherwise IFS) using myName and myPassword, then opens "title" with the
-- given options.  The login is left in force on success; callers end it
-- through CloseAndLogout or AbandonAndLogout.
-- Raises AlreadyExists when CantOpen reports alreadyExists; every other
-- failure listed in the ENABLE clause, and a server that is not up, is
-- reported as ServerCrash.
OpenFile: PROC[server: ServerDefs.ServerHandle, title: STRING,
               options: VMDefs.OpenOptions]
      RETURNS[ file: VMDefs.FileHandle ] =
   BEGIN
   ENABLE
      BEGIN
      VMDefs.CantOpen =>
        IF reason = alreadyExists THEN ERROR AlreadyExists[]
        ELSE ERROR ServerCrash[];
      VMDefs.Error, VMDefs.UnableToLogin,
      VMDefs.CantWriteBackingStore,
      ServerDefs.ServerNotUp, ServerDefs.NoSuchServer
             => ERROR ServerCrash[];
      END;
   addr: PupDefs.PupAddress ← ServerDefs.ServerAddr[server];
   addrString: STRING = [21] -- 377#377#177777|177777 --;
   -- the socket part of the address is cleared before it is converted
   -- to text for use as the login host
   addr.socket ← [0,0];
   PupDefs.AppendPupAddress[addrString, addr];
   IF NOT ServerDefs.ServerUp[server] THEN ERROR ServerCrash[];
   BEGIN
     -- any unwind out of the login or the open marks the server as down
     ENABLE UNWIND => ServerDefs.DownServer[server];
     fs: VMDefs.FileSystem =
           VMDefs.Login[IF localDisk THEN Alto ELSE IFS,
                        addrString, myName, myPassword];
     -- if the open itself is unwound, the login made above is ended
     file ← VMDefs.OpenFile[system: fs, name: title, options: options !
                            UNWIND => VMDefs.Logout[fs] ];
   END;
   END;

CloseAndLogout: PROC[ file: VMDefs.FileHandle ] =
   {
   -- Closes "file" and then ends the login on the file system that
   -- holds it.  The file system is looked up before the close.
   system: VMDefs.FileSystem = VMDefs.GetFileSystem[file];
   VMDefs.CloseFile[file];
   VMDefs.Logout[system];
   };

AbandonAndLogout: PROC[ file: VMDefs.FileHandle ] =
   {
   -- Abandons "file" and then ends the login on the file system that
   -- holds it.  The file system is looked up before the abandon.
   system: VMDefs.FileSystem = VMDefs.GetFileSystem[file];
   VMDefs.AbandonFile[file];
   VMDefs.Logout[system];
   };

-- Creates a new archive file for "user" on "server" and writes a heap
-- object of type "archived" describing it.
-- Returns the heap object, on which UseObject has been called, and the
-- open file handle.  ServerCrash from OpenFile propagates to the caller.
TryServer: INTERNAL PROC[user: BodyDefs.RName,
                         server: ServerDefs.ServerHandle,
                         serverName: BodyDefs.RName,
                         pathName: BodyDefs.RName]
                 RETURNS[obj: ObjectDirDefs.ObjectNumber,
                         file: VMDefs.FileHandle ] =
   BEGIN
   title: STRING = [maxFileTitleLength];
   BEGIN
      -- a title clash clears the title and repeats this block, so that a
      -- fresh time-based ID is generated
      ENABLE AlreadyExists => { title.length←0; RETRY };
      AppendTitle[title, user, pathName];
      file ← OpenFile[server, title, new];
   END;
   BEGIN
      writer: HeapDefs.WriterHandle = HeapDefs.HeapStartWrite[archived];
      -- called by HeapEndWrite with the new object: records it as the
      -- result and registers a use of it
      CatchObj: PROC[given: ObjectDirDefs.ObjectNumber] =
         { ObjectDirDefs.UseObject[obj ← given] };
      -- contents: server name, file title, user, this mail server's name.
      -- NOTE(review): the title is written with HeapWriteRName but read
      -- back in OpenFromObj with HeapReadString; presumably compatible,
      -- to be confirmed against HeapDefs.
      HeapDefs.HeapWriteRName[writer, serverName];
      HeapDefs.HeapWriteRName[writer, title];
      HeapDefs.HeapWriteRName[writer, user];
      HeapDefs.HeapWriteRName[writer, myName];
      HeapDefs.HeapEndWrite[writer, CatchObj];
   END;
   END; 


-- An archive file consists of a directory, followed by the message bodies.
-- A directory is a sequence of pages, each containing an array of entries.
-- A directory entry is as follows:

DirEntry: TYPE = MACHINE DEPENDENT RECORD[
   bodyStart:  LONG CARDINAL, -- words from start of file; 0 for deleted --
   bodyLength: LONG CARDINAL, -- words --
   textStart:  LONG CARDINAL, -- words from start of file, for MTP --
   textLength: BodyDefs.ItemLength -- bytes, for MTP -- ];

-- Header at the start of each directory page; only a spare word so far.
DirHeader: TYPE = MACHINE DEPENDENT RECORD[spare: CARDINAL];

-- Number of whole directory entries that fit in a page after the header.
entriesPerPage:  CARDINAL = (VMDefs.pageSize-SIZE[DirHeader]) /
                             SIZE[DirEntry];

-- Layout of one directory page: the header, then the entry array.
DirPageContents: TYPE = MACHINE DEPENDENT RECORD[
   header: DirHeader, entries: ARRAY [0..entriesPerPage) OF DirEntry ];

-- Page buffers are viewed as directory pages through this pointer type.
DirPage:         TYPE = POINTER TO DirPageContents;



-- Writer: one at a time, state in global frame --
-- These variables are set up by StartWrite, advanced by AddBody and
-- finished with by EndWrite or AbortWriting.

file:       VMDefs.FileHandle; -- archive file currently being written --
page:       VMDefs.Page;  -- current buffer for writing bodies --
pageNumber: CARDINAL ← 0; -- page number of body buffer --
pageUsed:   CARDINAL ← 0; -- words already used in body buffer --
bodiesSeen: CARDINAL ← 0; -- bodies previously written --
bodiesTold: CARDINAL ← 0; -- bodies specified for this file --
dirPage:    DirPage;      -- current buffer for directory --
dirPageNumber: CARDINAL ← 0; -- page number of directory buffer --

-- only one writing client is allowed: others wait --
-- StartWrite waits on writerStopped while writerActive is TRUE;
-- NoteInactiveWriter clears the flag and notifies the condition.

writerActive: BOOLEAN ← FALSE;
writerStopped: CONDITION;

NoteInactiveWriter: INTERNAL PROC =
   BEGIN
   -- Marks the writer slot as free and notifies writerStopped, the
   -- condition on which StartWrite waits.
   writerActive ← FALSE;
   NOTIFY writerStopped;
   END;

-- Begins an archive file for "user" that will hold "bodies" message
-- bodies, and returns the heap object describing the file.
-- Waits until no other writer is active, then claims the writer slot.
-- The candidate servers are the members of the group whose name is
-- "Archive-" followed by this server's name; among those that parse as
-- "[host]<path>" and are up, the one fewest hops away is chosen.
-- On success the writer slot stays claimed; it is given up by EndWrite
-- or AbortWriting, or by the UNWIND handler below if this procedure fails.
StartWrite: PUBLIC ENTRY PROC[user: BodyDefs.RName, bodies: CARDINAL]
      RETURNS[ obj: ObjectDirDefs.ObjectNumber] =
   BEGIN
   -- raises the error ServerCrash if no backing server is available --
   ENABLE UNWIND => NULL;
   serverListPrefix: STRING = "Archive-"L;
   serverListName: BodyDefs.RName = [BodyDefs.maxRNameLength];
   String.AppendString[serverListName,serverListPrefix];
   IF myName.length + serverListName.length > serverListName.maxlength
   THEN --obscure!-- ERROR ServerCrash[];
   String.AppendString[serverListName,myName];
   WHILE writerActive DO WAIT writerStopped ENDLOOP;
   writerActive ← TRUE;
   DO -- repeat if server appears to be up then crashes --
      ENABLE UNWIND => NoteInactiveWriter[];
      -- the search state below is declared inside the loop, so it is
      -- fresh on every attempt
      bestHops: CARDINAL ← LAST[CARDINAL];
      serverName: BodyDefs.RName = [BodyDefs.maxRNameLength];
      pathName: BodyDefs.RName = [BodyDefs.maxRNameLength];
      server: ServerDefs.ServerHandle;
      found: BOOLEAN ← FALSE;
      -- enumeration callback: remembers the nearest server that is up;
      -- always returns FALSE, so every member is examined
      LookAtServer: PROC[name: BodyDefs.RName] RETURNS[done: BOOLEAN] =
         BEGIN
         ENABLE ServerDefs.ServerNotUp, ServerDefs.NoSuchServer =>
            { done ← FALSE;  CONTINUE };
         trialServerName: BodyDefs.RName = [BodyDefs.maxRNameLength];
         trialPathName: BodyDefs.RName = [BodyDefs.maxRNameLength];
         IF GetHostPath[name, trialServerName, trialPathName]
         THEN BEGIN
              this: ServerDefs.ServerHandle =
                  ServerDefs.GetServer[[rName[trialServerName]],foreign];
              thisAddr: PupDefs.PupAddress = ServerDefs.ServerAddr[this];
              hops: CARDINAL = PupDefs.GetHopsToNetwork[thisAddr.net];
              IF hops < bestHops AND ServerDefs.ServerUp[this]
              THEN BEGIN
                   serverName.length ← pathName.length ← 0;
                   String.AppendString[serverName, trialServerName];
                   String.AppendString[pathName, trialPathName];
                   server ← this; bestHops ← hops;
                   found ← TRUE;
                   END;
              END;
         done ← FALSE;
         END;
      listInfo: NameInfoDefs.MemberInfo =
         NameInfoDefs.GetMembers[serverListName];
      WITH listInfo SELECT FROM
        group =>
          BEGIN
          NameInfoDefs.Enumerate[members, LookAtServer !
             UNWIND => NameInfoDefs.Close[members]];
          NameInfoDefs.Close[members];
          IF NOT found THEN ERROR ServerCrash[];
          END;
      ENDCASE => ERROR ServerCrash[];
      -- we have a candidate: try it! --
      [obj, file] ← TryServer[user, server, serverName, pathName !
                              ServerCrash => LOOP];
      BEGIN
         -- the directory occupies the first pages of the file; bodies
         -- start on the page after the last directory page
         pageNumber ← (bodies+entriesPerPage-1)/entriesPerPage;
         pageUsed ← 0;
         VMDefs.SetFileLength[file, [pageNumber,0] !
              VMDefs.Error, VMDefs.CantWriteBackingStore => GOTO crashed];
         page ← VMDefs.UsePage[[file,pageNumber]];
         bodiesTold ← bodies;
         dirPageNumber ← 0; bodiesSeen ← 0;
         dirPage ← LOOPHOLE[VMDefs.UsePage[[file,dirPageNumber]], DirPage];
         EXIT
         EXITS crashed =>
           -- undo this attempt, then go round the loop to pick again
           { ServerDefs.DownServer[server]; AbandonAndLogout[file];
             ObjectDirDefs.FreeObject[obj] };
      END;
   ENDLOOP;
   END;

GetHostPath: PROC[name: BodyDefs.RName, host, path: BodyDefs.RName]
             RETURNS[BOOLEAN] =
   BEGIN
   -- Splits "name", which should have the form "[Ivy]<DMS>", into the
   -- host ("Ivy") and the path ("<DMS>").  Both results are cleared
   -- first.  Returns FALSE if name is empty, does not start with '[,
   -- has no closing '], or if the remainder is not at least two
   -- characters starting with '< and ending with '>.
   host.length ← path.length ← 0;
   -- the length test comes first so that name[0] is never read from an
   -- empty string
   IF name.length = 0 OR name[0] # '[ THEN GOTO cant;
   FOR i: CARDINAL IN [1..name.length)
   DO IF name[i] = '] THEN EXIT;
      String.AppendChar[host, name[i]];
   REPEAT FINISHED => GOTO cant
   ENDLOOP;
   -- the path starts after '[, the host characters and ']
   FOR i: CARDINAL IN [host.length+2..name.length)
   DO String.AppendChar[path, name[i]] ENDLOOP;
   IF path.length < 2 OR path[0] # '< OR path[path.length-1] # '>
   THEN GOTO cant;
   RETURN[TRUE]
   EXITS cant => RETURN[FALSE];
   END;


-- Raised to clients when no archive server can be used or when a file
-- system operation on an archive file fails.
ServerCrash: PUBLIC ERROR = CODE;
-- Raised by AddBody when it is called more times than the "bodies"
-- count given to StartWrite.
MoreBodiesThanAdvertised: ERROR = CODE;

AddBody: PUBLIC ENTRY PROC[body: ObjectDirDefs.ObjectNumber] =
   BEGIN
   -- Appends the heap object "body" to the archive file begun by
   -- StartWrite and fills in the next directory entry for it.
   -- Raises ServerCrash if the file system reports an error, and
   -- MoreBodiesThanAdvertised if called more often than the "bodies"
   -- count given to StartWrite.  Any unwind abandons the archive file
   -- and frees the writer slot.
   ENABLE
      BEGIN
      VMDefs.Error, VMDefs.CantWriteBackingStore => ERROR ServerCrash[];
      UNWIND => AbortWriting[];
      END;
   -- current write position, in words from the start of the file
   Here: PROC RETURNS[ LONG CARDINAL ] =
      { RETURN[LONG[pageNumber]*VMDefs.pageSize + pageUsed] };
   reader: HeapDefs.ReaderHandle = HeapDefs.HeapStartRead[body];
   BEGIN
      -- The reader is ended on any unwind from here on, not only during
      -- the copy loop, so that it is not left open if a failure occurs
      -- while changing directory page or scanning the item headers.
      ENABLE UNWIND => HeapDefs.HeapEndRead[reader];
      entry: POINTER TO DirEntry;
      wantedDirPage: VMDefs.PageNumber = bodiesSeen/entriesPerPage;
      IF bodiesSeen >= bodiesTold THEN ERROR MoreBodiesThanAdvertised[];
      IF wantedDirPage # dirPageNumber
      THEN BEGIN
           -- write out the full directory page and move to the next one
           VMDefs.MarkStartWait[LOOPHOLE[dirPage]];
           VMDefs.Release[LOOPHOLE[dirPage]];
           dirPageNumber ← wantedDirPage;
           dirPage ← LOOPHOLE[VMDefs.UsePage[[file,dirPageNumber]], DirPage];
           END;
      entry ← @((dirPage.entries)[bodiesSeen MOD entriesPerPage]);
      entry.bodyStart ← Here[];
      -- find text start and length for MTP --
      entry.textStart ← 0; entry.textLength ← 0; --defaults--
      DO header: BodyDefs.ItemHeader = HeapDefs.ReadItemHeader[reader];
         SELECT header.type FROM
           Text =>
              BEGIN
              entry.textStart ← entry.bodyStart +
                  HeapDefs.GetReaderOffset[reader];
              entry.textLength ← header.length;
              EXIT
              END;
           LastItem => EXIT --no text--;
         -- any other item: skip its contents, rounded up to whole words
         ENDCASE => HeapDefs.SetReaderOffset[reader,
                HeapDefs.GetReaderOffset[reader] + (header.length+1)/2 ];
      ENDLOOP;
      HeapDefs.SetReaderOffset[reader, HeapDefs.objectStart];
      -- copy body --
      DO ended: BOOLEAN;
         used: CARDINAL;
         [ended,used] ← HeapDefs.HeapReadData[reader,
              [LOOPHOLE[page,POINTER]+pageUsed, VMDefs.pageSize-pageUsed]];
         pageUsed ← pageUsed + used;
         IF pageUsed >= VMDefs.pageSize
         THEN BEGIN
              -- buffer full: write it out and start on the next page
              IF pageUsed > VMDefs.pageSize THEN ERROR;
              VMDefs.MarkStartWait[page];
              VMDefs.Release[page];
              pageNumber ← pageNumber + 1; pageUsed ← 0;
              page ← VMDefs.UsePage[[file,pageNumber]];
              END;
         IF ended THEN EXIT;
      ENDLOOP;
      entry.bodyLength ← Here[] - entry.bodyStart;
   END;
   HeapDefs.HeapEndRead[reader];
   bodiesSeen ← bodiesSeen + 1;
   END;

-- Completes the archive file begun by StartWrite: writes out the current
-- directory page and, if it holds any data, the current body page;
-- releases both buffers; closes the file and logs out; then frees the
-- writer slot.
-- Raises ServerCrash on a file system error; any unwind abandons the
-- file and frees the writer slot through AbortWriting.
EndWrite: PUBLIC ENTRY PROC =
   BEGIN
   ENABLE
      BEGIN
      VMDefs.Error, VMDefs.CantWriteBackingStore => ERROR ServerCrash[];
      UNWIND => AbortWriting[];
      END;
   VMDefs.MarkStartWait[LOOPHOLE[dirPage]];
   -- an empty body buffer is released without being written
   IF pageUsed > 0 THEN VMDefs.MarkStartWait[page];
   VMDefs.Release[LOOPHOLE[dirPage]];
   VMDefs.Release[page];
   CloseAndLogout[file];
   NoteInactiveWriter[];
   END;

AbortWriting: INTERNAL PROC =
   BEGIN
   -- Gives up the archive file being written, ends its login, and
   -- frees the writer slot for the next client.
   AbandonAndLogout[file];
   NoteInactiveWriter[];
   END;




-- Reading, modifying, deleting archive files --

-- Handle given to clients for an archive file opened by OpenToRead;
-- it wraps the VMDefs file handle.
ArchReader: PUBLIC TYPE = RECORD[silly:VMDefs.FileHandle];

OpenToRead: PUBLIC PROC[obj: ObjectDirDefs.ObjectNumber]
                RETURNS[file: ArchReader] =
   BEGIN
   -- Opens the existing archive file described by "obj" and wraps the
   -- file handle as an ArchReader.
   file ← [silly: OpenFromObj[obj, old]];
   END;

OpenFromObj: PROC[obj: ObjectDirDefs.ObjectNumber,
                  options: VMDefs.OpenOptions]
      RETURNS[file: VMDefs.FileHandle] =
   {
   -- Reads the server name and the file title from the heap object
   -- "obj", then opens that file on that server with "options".
   -- Failures are reported as described for OpenFile.
   archiveTitle: STRING = [maxFileTitleLength];
   host: BodyDefs.RName = [BodyDefs.maxRNameLength];
   rdr: HeapDefs.ReaderHandle = HeapDefs.HeapStartRead[obj];
   -- the object starts with the server name, then the title
   [] ← HeapDefs.HeapReadRName[rdr, host];
   [] ← HeapDefs.HeapReadString[rdr, archiveTitle];
   HeapDefs.HeapEndRead[rdr];
   file ← OpenFile[ServerDefs.GetServer[[rName[host]],foreign],
                   archiveTitle, options];
   };

CopyBody: PUBLIC PROC[file: ArchReader, body: CARDINAL,
                      str: ProtocolDefs.Handle] =
   {
   -- Sends the whole of message "body" from the archive file down the
   -- stream "str", then sets the end mark on the stream.
   -- A file system error is reported as ServerCrash, and a closing
   -- stream at the end mark as ProtocolDefs.Failed; any unwind abandons
   -- the file.
   ENABLE
      BEGIN
      VMDefs.Error, VMDefs.CantReadBackingStore => ERROR ServerCrash[];
      UNWIND => AbandonAndLogout[file];
      END;
   dirEntry: DirEntry = ReadDirEntry[file, body];
   -- bodyLength is in words; CopyBytes wants a byte count
   CopyBytes[file, dirEntry.bodyStart, 2*dirEntry.bodyLength,
             str, ProtocolDefs.SendBytes];
   Stream.SetSST[str, ProtocolDefs.endSST !
                 PupStream.StreamClosing =>
                    ERROR ProtocolDefs.Failed[communicationError] ];
   };

CopyText: PUBLIC PROC[file: ArchReader, body: CARDINAL,
                      sendTextLength: PROC[LONG CARDINAL],
                      str: UNSPECIFIED,
                      sendBytes: PROC[UNSPECIFIED,POINTER,CARDINAL] ] =
   {
   -- Reports the length in bytes of the text item of message "body"
   -- through sendTextLength, then passes the text itself to sendBytes
   -- along with "str".
   -- A file system error is reported as ServerCrash; any unwind abandons
   -- the file.
   ENABLE
      BEGIN
      VMDefs.Error, VMDefs.CantReadBackingStore => ERROR ServerCrash[];
      UNWIND => AbandonAndLogout[file];
      END;
   dirEntry: DirEntry = ReadDirEntry[file, body];
   sendTextLength[dirEntry.textLength];
   CopyBytes[file, dirEntry.textStart, dirEntry.textLength,
             str, sendBytes];
   };

RecoverBody: PUBLIC PROC[file: ArchReader, body: CARDINAL]
                 RETURNS[ writer: HeapDefs.WriterHandle ] =
   {
   -- Copies message "body" from the archive file into a new heap
   -- object and returns the writer, which is left open for the caller.
   -- A file system error is reported as ServerCrash; any unwind abandons
   -- the file.
   ENABLE
      BEGIN
      VMDefs.Error, VMDefs.CantReadBackingStore => ERROR ServerCrash[];
      UNWIND => AbandonAndLogout[file];
      END;
   dirEntry: DirEntry = ReadDirEntry[file, body];
   -- adapter handed to CopyBytes: writes one block into the heap object
   StoreWords: PROC[dest: UNSPECIFIED, ptr: POINTER, amount: CARDINAL] =
      {
      -- amount is in bytes, but it's even --
      IF amount MOD 2 = 1 THEN ERROR;
      HeapDefs.HeapWriteData[dest, [ptr,amount/2]];
      };
   -- NOTE(review): "body" in the next line is presumably taken as the
   -- heap object type rather than the CARDINAL parameter; confirm
   -- against HeapDefs.
   writer ← HeapDefs.HeapStartWrite[body];
   CopyBytes[file, dirEntry.bodyStart, 2*dirEntry.bodyLength,
             writer, StoreWords];
   };

DeleteEntry: PUBLIC PROC[file: ArchReader, body: CARDINAL] =
   {
   -- Marks message "body" as deleted by zeroing the bodyStart field of
   -- its directory entry and writing the directory page back.
   -- A file system error is reported as ServerCrash; any unwind abandons
   -- the file.
   ENABLE
      BEGIN
      VMDefs.Error, VMDefs.CantReadBackingStore,
      VMDefs.CantWriteBackingStore => ERROR ServerCrash[];
      UNWIND => AbandonAndLogout[file];
      END;
   dirBuffer: DirPage =
      LOOPHOLE[VMDefs.ReadPage[[file,body/entriesPerPage],0]];
   (dirBuffer.entries)[body MOD entriesPerPage].bodyStart ← 0;
   VMDefs.MarkStartWait[LOOPHOLE[dirBuffer]];
   VMDefs.Release[LOOPHOLE[dirBuffer]];
   };

TestDeletion: PUBLIC PROC[file: ArchReader, body: CARDINAL]
             RETURNS[deleted: BOOLEAN] =
   {
   -- Reports whether message "body" has been deleted, that is, whether
   -- the bodyStart field of its directory entry is zero.
   -- A file system error is reported as ServerCrash; any unwind abandons
   -- the file.
   ENABLE
      BEGIN
      VMDefs.Error, VMDefs.CantReadBackingStore => ERROR ServerCrash[];
      UNWIND => AbandonAndLogout[file];
      END;
   deleted ← (ReadDirEntry[file, body].bodyStart = 0);
   };

EndReading: PUBLIC PROC[file: ArchReader] =
   BEGIN
   -- Closes the archive file and ends its login.
   -- A file system error is reported as ServerCrash; any unwind abandons
   -- the file instead.
   ENABLE
      BEGIN
      VMDefs.Error => ERROR ServerCrash[];
      UNWIND => AbandonAndLogout[file];
      END;
   CloseAndLogout[file];
   END;

Delete: PUBLIC PROC[obj: ObjectDirDefs.ObjectNumber] =
   {
   -- Opens the archive file described by "obj", destroys it, and ends
   -- the login.  The file system is noted before the file is destroyed.
   -- A VMDefs.Error from the destroy is reported as ServerCrash, and
   -- an unwind from it abandons the file.
   victim: VMDefs.FileHandle = OpenFromObj[obj, old];
   system: VMDefs.FileSystem = VMDefs.GetFileSystem[victim];
   VMDefs.DestroyFile[victim !
      VMDefs.Error => ERROR ServerCrash[];
      UNWIND => AbandonAndLogout[victim] ];
   VMDefs.Logout[system];
   };

CopyBytes: PROC[file: VMDefs.FileHandle,
                start: LONG CARDINAL, bytes: LONG CARDINAL,
                str: UNSPECIFIED,
                sendBytes: PROC[UNSPECIFIED,POINTER,CARDINAL] ] =
   {
   -- Passes "bytes" bytes of "file", beginning "start" words from the
   -- start of the file, to sendBytes one page at a time.  "str" is
   -- handed through to sendBytes untouched.  Each page buffer is
   -- released after use, including when sendBytes is unwound.
   wordOffset: CARDINAL ← Inline.LowHalf[start MOD VMDefs.pageSize];
   filePage: CARDINAL ← Inline.LowHalf[start / VMDefs.pageSize];
   UNTIL bytes = 0
   DO buffer: VMDefs.Page = VMDefs.ReadPage[[file,filePage], 3];
      room: CARDINAL = 2*(VMDefs.pageSize-wordOffset); --bytes--
      -- send what is left, or the rest of this page if that is less
      chunk: CARDINAL = IF bytes >= room
                        THEN room
                        ELSE Inline.LowHalf[bytes];
      sendBytes[str, buffer+wordOffset, chunk !
                UNWIND => VMDefs.Release[buffer] ];
      -- later pages are read from their first word
      filePage ← filePage + 1;
      wordOffset ← 0;
      bytes ← bytes - chunk;
      VMDefs.Release[buffer];
   ENDLOOP;
   };

ReadDirEntry: PROC[file: VMDefs.FileHandle, body: CARDINAL]
           RETURNS[entry: DirEntry] =
   {
   -- Returns a copy of the directory entry for message "body": reads
   -- the directory page holding it, copies the entry, and releases the
   -- page buffer.
   dirBuffer: DirPage =
      LOOPHOLE[VMDefs.ReadPage[[file,body/entriesPerPage],0]];
   entry ← (dirBuffer.entries)[body MOD entriesPerPage];
   VMDefs.Release[LOOPHOLE[dirBuffer]];
   };


Init: ENTRY PROC =
   {
   -- Records this mail server's name and password, as reported by
   -- LocalNameDefs.ReadMSName, in myName and myPassword.
   String.AppendString[myName, LocalNameDefs.ReadMSName[].name];
   myPassword ← LocalNameDefs.ReadMSName[].password;
   };


-- module start code: initialise the name and password once --
Init[];
   

END.