-- File: XStreamImpl.mesa - last edit: -- AOF 28-Jan-88 11:23:01 -- Copyright (C) 1985, 1986, 1987, 1988 by Xerox Corporation. All rights reserved. DIRECTORY BulkData USING [ attnByte, bulkSST, Descriptor, Error, Identifier, Proc, program, Sink, Source, version], Courier USING [ Arguments, Call, Create, Delete, Error, ExportRemoteProgram, ErrorCode, Description, Dispatcher, Handle, NoSuchProcedureNumber, Parameters, RemoteErrorSignalled, SignalRemoteError, UnexportRemoteProgram, LocalSystemElement], CourierInternal USING [ AugmentedStream, ConnectionHandle, longZone, SetBulkStream], CourierOps USING [NotesObject, RedirectStream], CourierProtocol USING [dataSST], Environment USING [Block], NSBuffer USING [Body, Buffer], NSTypes USING [bytesPerIDPHeader, bytesPerSppHeader], PacketStream USING [ConnectionSuspended, Handle], Process USING [ Abort, EnableAborts, GetCurrent, priorityForeground, SetPriority, SetTimeout, SecondsToTicks], Router USING [GetDelayToNet, NoTableEntryForNet], SppOps USING [PacketStreamFromByteStream], Stream USING [ Block, CompletionCode, defaultObject, DeleteProcedure, EndOfStream, GetProcedure, Handle, InputOptions, InvalidOperation, PutProcedure, SendNowProcedure, SubSequenceType, TimeOut], System USING [ GetClockPulses, NetworkAddress, NetworkNumber, nullHostNumber, nullSocketNumber, nullNetworkNumber, SocketNumber], XStream USING [Request], XStreamOps USING [Handle, IndicatedUse, Object, ObjectSeal]; XStreamImpl: MONITOR IMPORTS Courier, CourierInternal, CourierOps, PacketStream, Process, Router, SppOps, Stream, System EXPORTS BulkData, XStream, XStreamOps = BEGIN baseTMO: CARDINAL = 20; --20 seconds hopTMO: CARDINAL = 2; --seconds again ID: PUBLIC --BulkData-- TYPE = LONG CARDINAL; Handle: PUBLIC --XStream-- TYPE = XStreamOps.Handle; Object: PUBLIC --XStream-- TYPE = XStreamOps.Object; CancelArguments: TYPE = MACHINE DEPENDENT RECORD[ identifier: BulkData.Identifier, timeout: CARDINAL]; TransferArguments: TYPE = MACHINE DEPENDENT RECORD[ identifier: BulkData.Identifier, descriptor: immediate BulkData.Descriptor ← [immediate []], timeout: CARDINAL]; list: RECORD[first: Handle, count: CARDINAL] ← [NIL, 0]; OnList: ENTRY PROC[h: Handle] = {h.link ← list.first; list.first ← h; list.count ← SUCC[list.count]}; address: System.NetworkAddress ← Courier.LocalSystemElement[]; --local copy currentID: ID ← System.GetClockPulses[]; NewID: ENTRY PROC RETURNS[ID] = INLINE {RETURN[(currentID ← currentID + 1)]}; BUG: ERROR[code: Code] = CODE; Code: TYPE = {bug, reused, lost, protocol, undeleted}; AbortTransfer: PUBLIC <<XStream>> PROC[sH: Stream.Handle] = BEGIN << This is an immediate transfer that is being aborted. In order to abort, we need to insure the SST is set, then send the attention. Once the attention is sent, don't send and "end-of-data" when the stream is deleted. >> sH.setSST[sH, BulkData.bulkSST]; --attention has to go as sst=1 sH.sendAttention[sH, BulkData.attnByte]; --send the abort sH.delete ← DeleteNull; --no longer want to send endRecord END; --AbortTransfer ActiveRendezvous: <<ENTRY>> PROC[active: Handle] = BEGIN OPEN aH: NARROW[active↑, active Object]; << This is the active party of a three party transfer. We have to contact the passive party using the Bulk Data Remote Program. Then connect the active party's immediate request to the stream made available by the Courier call. The local active party will never know what happened. >> TransferBulkData: PROC[cH: Courier.Handle] = BEGIN WITH request: active.request SELECT FROM none, deferred => Courier.SignalRemoteError[ BulkData.Error[invalidDescriptor].ORD]; stream => StreamCopy[active]; --blind transfer proc => request.proc[active]; --then call the client's proc ENDCASE; END; --TransferBulkData error: PROC[errorNumber: CARDINAL, arguments: Courier.Arguments] = BEGIN arguments[]; --there aren't any arguments --and I don't know what do do with this-- errorCode ← truncatedTransfer; << errorCode ← SELECT errorNumber FROM BulkData.Error[invalidDescriptor].ORD => truncatedTransfer, BulkData.Error[noSuchIdentifier].ORD => truncatedTransfer, BulkData.Error[identifierBusy].ORD => truncatedTransfer, BulkData.Error[wrongHost].ORD => truncatedTransfer, BulkData.Error[transferAborted].ORD => truncatedTransfer, ENDCASE => ERROR Error[protocol]; >> END; cH: Courier.Handle; arguments: TransferArguments; errorCode: Courier.ErrorCode ← noError; WITH aH: active SELECT FROM active => BEGIN procedure: CARDINAL = SELECT active.use FROM puts => BulkData.Proc[receive].ORD, gets => BulkData.Proc[send].ORD, ENDCASE => ERROR BUG[protocol]; cH ← Courier.Create[ remote: [aH.passive.net, aH.passive.host, System.nullSocketNumber], programNumber: BulkData.program, versionNumber: BulkData.version, zone: aH.courier.object.zone, classOfService: bulk]; arguments ← [ identifier: active.identifier, timeout: WaitTime[aH.passive.net]]; aH.courier ← LOOPHOLE[cH]; --replace Courier handle in request [] ← Courier.Call[ cH: cH, procedureNumber: procedure, arguments: [@arguments, DescribeTransferArguments], streamCheckoutProc: TransferBulkData ! UNWIND => Courier.Delete[cH]; Courier.RemoteErrorSignalled => {error[errorNumber, arguments]; CONTINUE}]; Courier.Delete[cH]; --that just about wraps it up IF errorCode # noError THEN RETURN WITH ERROR Courier.Error[errorCode]; END; ENDCASE; END; --ActiveRendezvous AttentionProc: <<FORKed>> PROC[sH: Stream.Handle, handle: Handle] = BEGIN ENABLE ABORTED => CONTINUE; --that's our forker telling us to go away Process.SetPriority[Process.priorityForeground]; --make it run fast [] ← sH.waitAttention[sH ! --wait for one to come in Courier.Error => CONTINUE]; --stream died with 'transportTimeout' handle.use ← truncated; --and if it does, then drop the hammer END; --AttentionProc BulkDataDispatcher: Courier.Dispatcher = BEGIN << This dispatcher is not written in the proper style of client/consumer IMPORTer/EXPORTer like a good Courier program should be. So don't bother trying to impress me with you understanding of software achitecture by telling me how impure this is. This model doesn't care whether the transfer request is a sink or source. That bit of information should exist in the passive's request that we rendezvous with. Nor is it going to call ServerCheckout. Instead it's going to rendezvous directly. >> parameters: Courier.Parameters; --this isn't typed yet BEGIN ENABLE UNWIND => cH.zone.FREE[@parameters.location]; SELECT procedureNumber FROM BulkData.Proc[send].ORD => BEGIN parameters ← [ location: cH.zone.NEW[TransferArguments], description: DescribeTransferArguments]; arguments[parameters]; --get the arguments ServerRendezvous[cH, parameters.location, gets]; END; BulkData.Proc[receive].ORD => BEGIN parameters ← [ location: cH.zone.NEW[TransferArguments], description: DescribeTransferArguments]; arguments[parameters]; --get the arguments ServerRendezvous[cH, parameters.location, puts]; END; BulkData.Proc[cancel].ORD => BEGIN parameters ← [ location: cH.zone.NEW[CancelArguments], description: DescribeCancelArguments]; arguments[parameters]; --get the arguments CancelRendezvous[parameters.location]; END; ENDCASE => RETURN WITH ERROR Courier.NoSuchProcedureNumber; END; cH.zone.FREE[@parameters.location]; --free up the storage [] ← results[]; --we don't have any results to send END; --BulkDataDispatcher CancelRendezvous: PROC[cancel: LONG POINTER TO CancelArguments] = BEGIN << This operation is a remote procedure call coming in from the active party of a three party transfer. This procedure has been called instead of Produce or Consume because of some inability of the active party. Since this is a Produce or Consume replacement, this code looks a lot like the code needed to rendezvous the incoming call with the passive object at the passive site (see ServerRendezvous). If the passive party is waiting here, tell him to go away, else hang a new object on the list and he'll find us, hopefully before we time out. >> CoupleWithPassive: ENTRY PROC[wait: BOOLEAN] RETURNS[BOOLEAN ← FALSE] = BEGIN FOR passive: Handle ← list.first, passive.link UNTIL passive = NIL DO IF passive.identifier = cancel.identifier THEN BEGIN WITH pH: passive SELECT FROM passive => SELECT passive.seal FROM cancelled => RETURN WITH ERROR Courier.SignalRemoteError[ BulkData.Error[transferAborted].ORD]; ENDCASE => BEGIN --that's the one we wanted passive.seal ← cancelled; --well, he is now Process.Abort[pH.process]; --and wake him up RETURN[TRUE]; --and tell caller we found him END; ENDCASE; END; ENDLOOP; << Passive call isn't here yet, so we'll supply an cancel object and he'll rendezvous with it when he gets here. If we get called with wait = TRUE then this is the first call. If we don't make the rendezvous with wait = FALSE, then signal BulkData.Error[noSuchIdentifier]. >> IF wait THEN WAIT active.rendezvous --it isn't abortable ELSE RETURN WITH ERROR Courier.SignalRemoteError[BulkData.Error[noSuchIdentifier].ORD]; END; --CoupleWithPassive active: LONG POINTER TO cancel Object ← CourierInternal.longZone.NEW[ cancel Object ← [ seal: old, --old before its time courier: NIL, --we don't have any idea identifier: cancel.identifier, --copy incoming object request: [none[]], use: cancel, --to be used as one variant: cancel[process: Process.GetCurrent[]]]]; Process.SetTimeout[ @active.rendezvous, Process.SecondsToTicks[cancel.timeout]]; OnList[active]; --insert into list BEGIN ENABLE UNWIND => OffList[active]; SELECT TRUE FROM (CoupleWithPassive[TRUE]) => NULL; --he's cancelled (CoupleWithPassive[FALSE]) => NULL; --he finally got here ENDCASE; --error will have been raised in CoupleWithPassive END; OffList[active]; --remove element we added for rendezvous END; --CancelRendezvous CancelTransfer: PUBLIC <<XStream>> PROC[handle: Handle] = BEGIN WITH h: handle SELECT FROM immediate => BEGIN << This is an immediate transfer that is being aborted. In order to abort, we need to insure the SST is set, then send the attention. This almost has to be done no matter what state the call is in. It is possible to bum this out IFF we are the called party and the sender (source) of the bulk data, but that's too many checks and just sending the thing is "correct". >> AbortTransfer[h.courier.object.sH]; END; active => BEGIN << The active party in a 3-party transfer can't produce|consume the data. He calls this routine instead so that the passive party won't hang around forever for the rendezvous to arrive. Is it interesting to notify the client of any errors here? >> cancel: CancelArguments ← [handle.identifier, WaitTime[h.passive.net]]; cH: Courier.Handle ← Courier.Create[ remote: [h.passive.net, h.passive.host, System.nullSocketNumber], programNumber: BulkData.program, versionNumber: BulkData.version, zone: h.courier.object.zone, classOfService: bulk]; [] ← Courier.Call[ cH: cH, procedureNumber: BulkData.Proc[cancel].ORD, arguments: [@cancel, DescribeCancelArguments] ! Courier.Error, Courier.RemoteErrorSignalled => CONTINUE]; Courier.Delete[cH]; END; passive => << What do you do here? I'd like to leave around a passive object marked cancelled so when (IF) the active process came in, it could detect the cancellation quickly. But I don't know if the object is queued yet. One end or the other is going to take a chance on having to time out. I'm going to let the active rendezvous be the one. >> {}; --fill in as needed ENDCASE => ERROR BUG[bug]; handle.seal ← cancelled; --looks easy, but what does it mean? END; --CancelTransfer GetSppLength: PROC[body: NSBuffer.Body] RETURNS[NATURAL] = INLINE {RETURN[ body.pktLength - NSTypes.bytesPerIDPHeader - NSTypes.bytesPerSppHeader]}; SetSppLength: PROC[body: NSBuffer.Body, bytes: NATURAL] = INLINE {body.pktLength ← bytes + NSTypes.bytesPerIDPHeader + NSTypes.bytesPerSppHeader}; PacketFromBulk: PROC[sH: Stream.Handle] RETURNS[PacketStream.Handle] = BEGIN sH ← RealStream[sH]; --that's okay as far as it goes sH ← LOOPHOLE[@LOOPHOLE[sH, CourierInternal.AugmentedStream].context, LONG POINTER TO Stream.Handle]↑; --retch!!!! (copied from CourierImplN) RETURN[SppOps.PacketStreamFromByteStream[sH]]; --more loop-de-loops END; --PacketFromBulk Copy: PUBLIC <<XStream>> PROC[sink, source: Stream.Handle] = BEGIN << This procedure dives in and finds the packet stream behind Courier's transport. It uses that so any bulk data operation that uses copy only has to BLT the data once from his buffer to the ethernet buffers. Wonder if all this is really worth it? >> handle: Handle; moved: NATURAL; sH: Stream.Handle; b: NSBuffer.Buffer; body: NSBuffer.Body; block: Environment.Block; psH: PacketStream.Handle; why: Stream.CompletionCode ← normal; options: Stream.InputOptions = [ TRUE, FALSE, FALSE, FALSE, FALSE, FALSE, TRUE, FALSE]; SELECT TRUE FROM --figure out which stream is ours, which it theirs (source.delete = DeleteNull), (source.delete = DeleteEnd) => sH ← source; (sink.delete = DeleteNull), (sink.delete = DeleteEnd) => sH ← sink; ENDCASE => BUG[bug]; --we're in trouble now psH ← PacketFromBulk[sH]; --that's the real packet stream handle ← NARROW[sH.clientData]; --and that's the XStream object UNTIL why = endOfStream DO ENABLE PacketStream.ConnectionSuspended => GOTO streamFailed; SELECT handle.use FROM gets => BEGIN SELECT TRUE FROM ((b ← psH.get[]) # NIL) => body ← b.ns; --got it; copy local pointer (~source.options.signalTimeout) => GOTO streamFailed; --worst ENDCASE => {SIGNAL Stream.TimeOut[0]; LOOP}; --he wants to try again IF body.endOfMessage THEN why ← endOfStream; --to get out of loop block ← [LOOPHOLE[@body.sppBody], 0, GetSppLength[body]]; SELECT TRUE FROM (body.attention) => why ← attention; --remote's bailing out (block.stopIndexPlusOne = 0) => NULL; --nothing going on here (body.subtype = BulkData.bulkSST) => --this is for us sink.put[sink, block, FALSE]; --scribble the bits ENDCASE => why ← endOfStream; --truncated data of some kind psH.returnReceiveBuffer[b]; --give the buffer back IF why = attention THEN GOTO remoteAbort; --set earlier END; puts => BEGIN body ← (b ← psH.getSendBuffer[]).ns; --get buffer to fill block ← [LOOPHOLE[@body.sppBody], 0, psH.getSenderSizeLimit[]]; [moved, why, ] ← source.get[source, block, source.options ! UNWIND => psH.returnSendBuffer[b]]; SetSppLength[body, moved]; --set the length of this move body.subtype ← BulkData.bulkSST; --set the sst appropriately IF why # normal THEN why ← endOfStream; psH.put[b]; --send out the buffer END; truncated => GOTO remoteAbort; --got an attention packet ENDCASE; REPEAT remoteAbort => BEGIN <<[] ← sH.getByte[sH]; --consume inband attention>> sH.sendAttention[sH, BulkData.attnByte]; sH.delete ← DeleteNull; --so we don't send endRecord Process.Abort[Process.GetCurrent[]]; --set to abort myself --we want to raise aborted anyhow, so let this signal do it-- DO --ENABLE ABORTED => REJECT-- [] ← sH.waitAttention[sH]; ENDLOOP; END; streamFailed => {sH.delete ← DeleteNull; ERROR ABORTED}; ENDLOOP; END; --Copy Create: PUBLIC <<XStream>> PROC[handle: Handle] RETURNS[sH: Stream.Handle] = BEGIN << These stream are simplex, i.e., they can't be used in both directions. The direction this instance of stream will be used is determined by the routine used to describe the BulkData.SINK | SOURCE. Whichever direction is defined for this stream is assigned a local procedure. The other direction is assigned a Stream.defaultObject value. That will result in an ERROR Stream.InvalidOperation if the client tries to use it incorrectly. >> IF handle.request.access = deferred THEN GOTO nostream; --he can't do that WITH h: handle SELECT FROM --pull the stream out of the object active, passive, immediate => BEGIN sH ← handle.courier.object.sH; --that's his stream SELECT handle.use FROM --what does he want to do? (puts) => BEGIN sH.setSST[sH, BulkData.bulkSST]; --then set the SST sH.put ← XStreamPut; --my own put procedure sH.get ← Stream.defaultObject.get; --invalid operation sH.delete ← DeleteEnd; --we have to send the endRecord sH.sendNow ← SendNow; --so we can trap client endRecords handle.attentionProc ← FORK AttentionProc[sH, handle]; --watcher sH.clientData ← handle; --store so get/put can find it END; (gets) => BEGIN sH.get ← XStreamGet; --my own get procedure sH.put ← Stream.defaultObject.put; --invalid operation sH.delete ← DeleteNull; --we don't have to send the end handle.attentionProc ← FORK AttentionProc[sH, handle]; --watcher sH.clientData ← handle; --store so get/put can find it END; ENDCASE => ERROR BUG[bug]; --what's going on? END; <<active, null, deferred, cancel, truncated>> ENDCASE => GOTO nostream; --and he can't do those EXITS nostream => ERROR Courier.Error[streamNotYours]; END; --Create DeleteEnd: Stream.DeleteProcedure = BEGIN tsH: Stream.Handle = RealStream[sH]; tsH.sendNow[tsH, TRUE]; --send the endRecord bit DeleteNull[sH]; --then finish off the stream END; --DeleteEnd DeleteNull: Stream.DeleteProcedure = BEGIN ch: CourierInternal.ConnectionHandle = RealCourier[sH]; handle: Handle ← NARROW[sH.clientData]; --get the object Process.Abort[handle.attentionProc]; JOIN handle.attentionProc; handle.attentionProc ← NIL; --so we don't do it twice sH.delete ← Stream.defaultObject.delete; --so he doesn't do it twice END; --DeleteNull DescribeCancelArguments: Courier.Description = {[] ← notes.noteSize[SIZE[CancelArguments]]}; DescribeSink: PUBLIC <<XStream>> Courier.Description = BEGIN OPEN n: LOOPHOLE[notes, POINTER TO CourierOps.NotesObject]; << Clients that 'fetch' an immediate|null 'sink' will be doing "gets" 'store' an immediate|null 'sink' will be doing "puts" >> descriptor: BulkData.Descriptor; --garden variety type handle: LONG POINTER TO Handle ← notes.noteSize[SIZE[Handle]]; notes.noteDeadSpace[handle, SIZE[Handle]]; --dispense with parm area SELECT notes.operation FROM fetch => BEGIN WITH h: handle SELECT FROM null => BEGIN IF handle.seal = smashed THEN ERROR BUG[reused]; descriptor ← [null[]]; --make the descriptor notes.noteSpace[@descriptor, SIZE[null BulkData.Sink]]; handle.use ← gets; --tag it intended use handle.seal ← old; --type it and mark it ready h.courier ← n.ch; --tie Courier handle to request END; immediate => BEGIN IF handle.seal = smashed THEN ERROR BUG[reused]; descriptor ← [immediate[]]; --make the descriptor notes.noteSpace[@descriptor, SIZE[immediate BulkData.Sink]]; handle.seal ← old; --mark it ready handle.use ← gets; --tag it intended use h.courier ← n.ch; --tie Courier handle to request END; deferred => BEGIN << Which remote should be used for passive, which for active? >> OPEN r: NARROW[handle.request, deferred XStream.Request]; IF WaitTime[r.sink.net] <= WaitTime[r.source.net] THEN BEGIN descriptor ← [passive[r.sink.net, r.sink.host, handle.identifier]]; h.passive ← n.ch; --tie Courier handle to request END ELSE BEGIN descriptor ← [active[r.sink.net, r.sink.host, handle.identifier]]; h.active ← n.ch; --tie Courier handle to request END; notes.noteSpace[@descriptor, SIZE[passive BulkData.Sink]]; handle.seal ← old; --type it and mark it ready handle.use ← deferred; --that's its final state END; ENDCASE; END; store => BEGIN notes.noteSpace[@descriptor, SIZE[null BulkData.Sink]]; handle↑ ← CourierInternal.longZone.NEW[Object ← [ seal: old, use: puts, identifier: TRASH, courier: n.ch, variant: null[]]]; WITH d: descriptor SELECT FROM null => BEGIN handle↑.variant ← null[]; handle.identifier ← [n.ch.object.remote.host, NewID[]]; END; immediate => BEGIN handle↑.variant ← immediate[]; handle.identifier ← [n.ch.object.remote.host, NewID[]]; END; active => BEGIN notes.noteSpace[@d.network, SIZE[active BulkData.Sink] - SIZE[null BulkData.Sink]]; handle.identifier ← d.identifier; handle↑.variant ← active[ process: Process.GetCurrent[], passive: [d.network, d.host]]; END; passive => BEGIN notes.noteSpace[@d.network, SIZE[active BulkData.Sink] - SIZE[null BulkData.Sink]]; handle.identifier ← d.identifier; handle↑.variant ← passive[ process: Process.GetCurrent[], active: [d.network, d.host]]; END; ENDCASE; OnList[handle↑]; --tack it on to the list END; free => Destroy[handle↑]; --equivalent to delete ENDCASE; END; --DescribeSink DescribeSource: PUBLIC <<XStream>> Courier.Description = BEGIN OPEN n: LOOPHOLE[notes, POINTER TO CourierOps.NotesObject]; << Clients that 'fetch' an immediate|null 'source' will be doing "puts" 'store' an immediate|null 'source' will be doing "gets" >> descriptor: BulkData.Descriptor; --garden variety type handle: LONG POINTER TO Handle ← notes.noteSize[SIZE[Handle]]; notes.noteDeadSpace[handle, SIZE[Handle]]; SELECT notes.operation FROM fetch => BEGIN WITH h: handle SELECT FROM null => BEGIN IF handle.seal = smashed THEN ERROR BUG[reused]; descriptor ← [null[]]; --build the descriptor notes.noteSpace[@descriptor, SIZE[null BulkData.Source]]; handle.seal ← old; --age it a bit handle.use ← puts; --type it and mark it ready h.courier ← n.ch; --tie Courier handle to request END; immediate => BEGIN IF handle.seal = smashed THEN ERROR BUG[reused]; descriptor ← [immediate[]]; --build the descriptor notes.noteSpace[@descriptor, SIZE[immediate BulkData.Source]]; handle.seal ← old; --age it a bit handle.use ← puts; --type it and mark it ready h.courier ← n.ch; --tie Courier handle to request END; deferred => BEGIN << Which remote should be used for passive, which for active? Just make sure it's opposite of what was done in describing the sink. We don't want both parties to be active | passive. >> OPEN r: NARROW[handle.request, deferred XStream.Request]; IF WaitTime[r.sink.net] > WaitTime[r.source.net] THEN BEGIN descriptor ← [passive[ r.source.net, r.source.host, handle.identifier]]; h.passive ← n.ch; --tie Courier handle to request END ELSE BEGIN descriptor ← [active[ r.source.net, r.source.host, handle.identifier]]; h.active ← n.ch; --tie Courier handle to request END; handle.seal ← old; --age it a bit handle.use ← deferred; --that's its final state notes.noteSpace[@descriptor, SIZE[passive BulkData.Sink]]; END; ENDCASE; END; store => BEGIN notes.noteSpace[@descriptor, SIZE[null BulkData.Source]]; handle↑ ← CourierInternal.longZone.NEW[Object ← [ seal: old, use: gets, identifier: TRASH, courier: n.ch, variant: null[]]]; WITH d: descriptor SELECT FROM null => BEGIN handle↑.variant ← null[]; handle.identifier ← [n.ch.object.remote.host, NewID[]]; END; immediate => BEGIN handle↑.variant ← immediate[]; handle.identifier ← [n.ch.object.remote.host, NewID[]]; END; active => BEGIN notes.noteSpace[@d.network, SIZE[active BulkData.Source] - SIZE[null BulkData.Source]]; handle.identifier ← d.identifier; handle↑.variant ← active[ process: Process.GetCurrent[], passive: [d.network, d.host]]; END; passive => BEGIN notes.noteSpace[@d.network, SIZE[active BulkData.Source] - SIZE[null BulkData.Source]]; handle.identifier ← d.identifier; handle↑.variant ← passive[ process: Process.GetCurrent[], active: [d.network, d.host]]; END; ENDCASE; OnList[handle↑]; --tack it on to the list END; free => Destroy[handle↑]; --delete the object ENDCASE; END; --DescribeSource DescribeTransferArguments: Courier.Description = {[] ← notes.noteSize[SIZE[TransferArguments]]}; Destroy, OffList: PUBLIC <<XStream>> ENTRY PROC[handle: Handle] = BEGIN p: Handle ← NIL; FOR h: Handle ← list.first, h.link UNTIL h = NIL DO IF h = handle THEN BEGIN handle.seal ← smashed; --smash the object's seal IF p = NIL THEN list.first ← h.link ELSE p.link ← h.link; CourierInternal.longZone.FREE[@handle]; list.count ← PRED[list.count]; --one less EXIT; --let's not look further END ELSE p ← h; REPEAT FINISHED => RETURN WITH ERROR Courier.Error[streamNotYours]; ENDLOOP; END; --Destroy Make: PUBLIC <<XStream>> PROC[request: XStream.Request] RETURNS[handle: Handle] = BEGIN handle ← CourierInternal.longZone.NEW[Object ← [ request: request, seal: new, use: undeclared, courier: NIL, identifier: [address.host, NewID[]], variant: null[]]]; WITH request SELECT FROM none => handle.variant ← null[]; deferred => handle.variant ← deferred[]; stream, proc => handle.variant ← immediate[]; ENDCASE => ERROR BUG[bug]; OnList[handle]; --add it to the list END; --Make PassiveRendezvous: ENTRY PROC[passive: Handle] = BEGIN --PASSIVE ASSERTION MADE BY CALLER-- OPEN pH: NARROW[passive, LONG POINTER TO passive Object]; ENABLE UNWIND => NULL; << Check the list, checking it twice, seeing who's here and who ain't. If we find a match between our handle.descriptor.identifier and one that's already in the list, toss him out of bed. Give him our handle.reqeust and then go to sleep ourselves. If we find no match, our entry is already there and the active process will find it when he comes in from the cold. He should wake us when he goes home and then we can return. >> Process.EnableAborts[@pH.rendezvous]; --in case active wants to gun us Process.SetTimeout[ @pH.rendezvous, Process.SecondsToTicks[ --set a condition timeout WaitTime[pH.courier.object.remote.net] + --us to initiator WaitTime[pH.active.net]]]; --plus us to active passive.seal ← rendezvous; --we're ready to link up FOR active: Handle ← list.first, active.link UNTIL active = NIL DO << If the active party managed to call us before we got here, he will have linked up an active descriptor with a null host number. If active is here, wake him up and go to sleep ourselves. It's his repsonsibility to wake us up when he's finished the transfer. >> IF active.identifier = passive.identifier THEN WITH aH: active SELECT FROM active => {NOTIFY aH.rendezvous; EXIT}; --jar him loose and exit ENDCASE; --this is not the one we want; continue searching << The active isn't here yet - wait for him. When he comes in he will process the bulk data and notify us when he's finished. We need to wait only a reasonable amount of time for the active party to get here. Once the transfer starts, it may take an arbitrary amount of time. Setting the timeout - we're passive. That means that we are as close or closer to the initiator than the active element is. The worst case for distance between the initiator and the active would be the distance from us to active plus the distance from us to initiator (that assumes the path from initiator to active goes through the net we live on). >> ENDLOOP; << In this loop, a wakeup when the the seal is rendezvous | transferring is ignored. A wakeup with any other seal exits the loop. It may be a timeout or the transfer may be complete. An ABORTED means that the active party called Cancel instead of Send or Recieive. >> WHILE passive.seal IN[rendezvous..transferring] DO ENABLE ABORTED => EXIT; WAIT pH.rendezvous; ENDLOOP; END; --PassiveRendezvous RealCourier: PROC[sH: Stream.Handle] RETURNS[ch: CourierInternal.ConnectionHandle] = INLINE {RETURN[LOOPHOLE[sH, CourierInternal.AugmentedStream].back]}; RealStream: PROC[sH: Stream.Handle] RETURNS[Stream.Handle] = INLINE {RETURN[@LOOPHOLE[ sH, CourierInternal.AugmentedStream].back.transFilter.object]}; SendNow: Stream.SendNowProcedure = BEGIN --Don't let silly client screw up the protocol. IF endRecord THEN ERROR Stream.InvalidOperation; sH ← RealStream[sH]; sH.sendNow[sH, endRecord]; END; --SendNow ServerCheckout: PUBLIC <<XStream>> PROC[ cH: Courier.Handle, request: XStream.Request] = BEGIN << This means the source/sink has been deserialized, but servers don't know they are playing a 3-party game, so we have to move carefully. When (IF) we locate the Object behind this Courier.Handle, the object's seal has the information about whether or not we are playing a 3rd party game. If we are doing such, then this routine will call off to rendezvous with the other half of the party. That means if we are passive, we wait for the active's incoming. Else we initiate a call (using the Bulk Data Protocol procedure appropriate (Send | Receive). >> CoupleCourierToRequest: ENTRY PROC = BEGIN FOR handle ← list.first, handle.link UNTIL handle = NIL DO WITH h: handle SELECT FROM null => IF @h.courier.object = cH THEN EXIT; immediate => IF @h.courier.object = cH THEN EXIT; active => IF @h.courier.object = cH THEN EXIT; passive => IF @h.courier.object = cH THEN EXIT; ENDCASE; REPEAT FINISHED => RETURN WITH ERROR Courier.Error[streamNotYours]; ENDLOOP; END; --CoupleCourierToRequest handle: Handle; CoupleCourierToRequest[]; --search for the handle handle.request ← request; --just for posterity WITH handle SELECT FROM null => {}; --I don't know what goes on here immediate => BEGIN ENABLE UNWIND => TestForStreamDeleted[handle]; WITH req: request SELECT FROM none => {}; --accepts or supplies no data stream => StreamCopy[handle]; --blind stream copy proc => req.proc[handle]; --user supplied proc ENDCASE; TestForStreamDeleted[handle]; END; active => ActiveRendezvous[handle]; passive => PassiveRendezvous[handle]; ENDCASE; END; --ServerCheckout StartProtocol: PUBLIC <<XStreamOps>> PROC = BEGIN ENABLE Courier.Error => CONTINUE; --should've been duplicate export Courier.ExportRemoteProgram[ programNumber: BulkData.program, versionRange: [BulkData.version, BulkData.version], dispatcher: BulkDataDispatcher, serviceName: "BulkDataTransfer"L, zone: CourierInternal.longZone, classOfService: bulk]; END; --StartProtocol StopProtocol: PUBLIC <<XStreamOps>> PROC = BEGIN ENABLE Courier.Error => CONTINUE; --should've been no such export Courier.UnexportRemoteProgram[ programNumber: BulkData.program, versionRange: [BulkData.version, BulkData.version]]; END; --StopProtocol ServerRendezvous: PROC[ cH: Courier.Handle, transfer: LONG POINTER TO TransferArguments, use: XStreamOps.IndicatedUse] = BEGIN << The active process is calling in to rendezvous with the passive party. If the passive party is waiting here, process his Reqeuest, else hang a new object on the list and he'll find us, hopefully before we time out. >> CoupleWithPassive: ENTRY PROC[wait: BOOLEAN] RETURNS[BOOLEAN ← FALSE] = BEGIN ENABLE UNWIND => NULL; FOR passive ← list.first, passive.link UNTIL passive = NIL DO IF passive.identifier = transfer.identifier THEN BEGIN WITH pH: passive SELECT FROM passive => BEGIN error: NATURAL; SELECT TRUE FROM (passive.seal = transferring) => error ← BulkData.Error[identifierBusy].ORD; (passive.seal = cancelled) => error ← BulkData.Error[transferAborted].ORD; (passive.use = use) => error ← BulkData.Error[wrongDirection].ORD; (pH.active.host # cH.remote.host) => error ← BulkData.Error[wrongHost].ORD; ENDCASE => BEGIN passive.seal ← active.seal ← transferring; RETURN[TRUE]; --ready to run END; RETURN WITH ERROR Courier.SignalRemoteError[error]; END; ENDCASE; END; << Passive call isn't here yet, so we'll supply an active object and he'll rendezvous with it when he gets here. If we get called with wait = TRUE then this is the first call. If we're in the second call, we were either notified out of the first or we timed out. >> ENDLOOP; IF wait THEN WAIT active.rendezvous --wait for passive to arrive ELSE RETURN WITH ERROR Courier.SignalRemoteError[BulkData.Error[noSuchIdentifier].ORD]; END; --CoupleWithPassive WakePassive: ENTRY PROC[seal: XStreamOps.ObjectSeal] = INLINE BEGIN OPEN pH: NARROW[passive, LONG POINTER TO passive Object]; passive.seal ← seal; --reset the use NOTIFY pH.rendezvous; --life's so simple END; passive: Handle; active: LONG POINTER TO active Object ← cH.zone.NEW[active Object ← [ seal: old, use: use, identifier: transfer.identifier, courier: LOOPHOLE[cH], --get the handle wired in variant: active[ process: Process.GetCurrent[], --this is us passive: [System.nullNetworkNumber, System.nullHostNumber]]]]; Process.SetTimeout[ @active.rendezvous, Process.SecondsToTicks[transfer.timeout]]; Process.EnableAborts[@active.rendezvous]; OnList[active]; --insert into list BEGIN ENABLE BEGIN Courier.Error => ERROR BUG[bug]; UNWIND => {OffList[active]; IF passive # NIL THEN WakePassive[cancelled]}; END; SELECT TRUE FROM (CoupleWithPassive[TRUE]) => NULL; --passive was laying in the bushes (CoupleWithPassive[FALSE]) => NULL; --he finally got here (in time too) ENDCASE; --too little too late - errored from CoupleWithPassive active.request ← passive.request; --copy original request (NIT) active.use ← passive.use; --that's not just a NIT! WITH req: active.request SELECT FROM none => {}; --accepts or supplies no data stream => StreamCopy[active]; --copies stream proc => req.proc[active]; --then ENDCASE; OffList[active]; --get rid of active element WakePassive[finished]; --and tell passive we're finished END; END; --ServerRendezvous StreamCopy: PROC[h: Handle] = BEGIN OPEN req: NARROW[h.request, stream XStream.Request]; sH: Stream.Handle ← Create[h]; --lot of error checks {ENABLE UNWIND => sH.delete[sH]; --keep track of that stream SELECT h.use FROM --choose direction of bit traffic puts => Copy[sH, req.sH]; --bits go that way gets => Copy[req.sH, sH]; --or bits come this way ENDCASE}; sH.delete[sH]; --we created it, so we delete it END; --StreamCopy TestForStreamDeleted: PROC[handle: Handle] = BEGIN IF handle.courier.object.sH.delete # Stream.defaultObject.delete THEN ERROR BUG[undeleted]; CourierInternal.SetBulkStream[handle.courier]; END; --TestForStreamDeleted UserCheckout: PUBLIC <<XStream>> PROC[cH: Courier.Handle] = BEGIN FindCourierHandleMatch: ENTRY PROC = INLINE BEGIN FOR handle ← list.first, handle.link UNTIL handle = NIL DO WITH h: handle SELECT FROM null => IF @h.courier.object = cH THEN EXIT; immediate => IF @h.courier.object = cH THEN EXIT; deferred => IF (@h.passive.object = cH) OR (@h.active.object = cH) THEN EXIT; ENDCASE; REPEAT FINISHED => RETURN WITH ERROR Courier.Error[streamNotYours]; ENDLOOP; END; --FindCourierHandleMatch handle: Handle; --this means the source/sink has been deserialized FindCourierHandleMatch[]; --see if we can locate the handle WITH h: handle.request SELECT FROM none => {}; --accepts or supplies no data stream => StreamCopy[handle]; --blind stream copy proc => BEGIN ENABLE UNWIND => TestForStreamDeleted[handle]; h.proc[handle]; --client supplied proc TestForStreamDeleted[handle]; END; ENDCASE; END; --UserCheckout WaitTime: PROC[net: System.NetworkNumber] RETURNS[seconds: CARDINAL] = BEGIN <<Sure wish this wasn't monitored in some cases.>> hops: CARDINAL ← Router.GetDelayToNet[net! Router.NoTableEntryForNet => hops ← 0 --this oughta be fun--]; RETURN[baseTMO + (hops * hopTMO)]; END; --WaitTime XStreamGet: Stream.GetProcedure = BEGIN myOptions: Stream.InputOptions = [ terminateOnEndRecord: TRUE, signalLongBlock: FALSE, signalShortBlock: FALSE, signalSSTChange: FALSE, signalEndOfStream: options.signalEndOfStream, signalAttention: FALSE, signalTimeout: options.signalTimeout, signalEndRecord: FALSE]; handle: Handle = NARROW[sH.clientData]; --that's the owning handle ch: CourierInternal.ConnectionHandle = handle.courier; --owning courier sH ← RealStream[sH]; --get the real stream why ← normal; sst ← BulkData.bulkSST; --in case the block is a null block bytesTransferred ← 0; --to make the loop invariant work UNTIL ((block.stopIndexPlusOne - block.startIndex) = bytesTransferred) DO BEGIN IF handle.use = truncated THEN GOTO truncated; --that's all [bytesTransferred, why, sst] ← sH.get[sH, block, myOptions]; SELECT why FROM normal => SELECT TRUE FROM (sst = BulkData.bulkSST) => EXIT; --(why = normal) AND (sst = bulk) (sst # CourierProtocol.dataSST) => ERROR BUG[protocol]; --no way (bytesTransferred # 0) => GOTO lostData; --data was system data ENDCASE => LOOP; --my sst but no bytes - just ignore sstChange => SELECT TRUE FROM (sst = CourierProtocol.dataSST) => GOTO truncated; --short (sst # BulkData.bulkSST) => ERROR BUG[protocol]; --no way (bytesTransferred # 0) => GOTO lostData; --some system data passed ENDCASE => LOOP; --retry the get again endRecord => IF ~options.signalEndOfStream THEN {why ← endOfStream; EXIT} ELSE DO SIGNAL Stream.EndOfStream[bytesTransferred]; ENDLOOP; attention => GOTO truncated; --other guys bailing out ENDCASE => ERROR BUG[bug]; --what else is there? EXITS lostData => BEGIN CourierOps.RedirectStream[ch, block, bytesTransferred, why, sst]; WITH vep: ch SELECT FROM user => SELECT TRUE FROM (vep.versExchProc = NIL) => GOTO truncated; --already been done (vep.versExchProc[ch] # noError) => GOTO truncated; ENDCASE; --version exchange and it's okay - do .get again ENDCASE => GOTO truncated; --wasn't a user connection bytesTransferred ← 0; --reset loop invariant END; END; REPEAT truncated => BEGIN handle.use ← truncated; --record this bit of trivia RETURN WITH ERROR ABORTED; --simply abort the client END; ENDLOOP; END; --XStreamGet XStreamPut: Stream.PutProcedure = BEGIN IF NARROW[sH.clientData, Handle].use = truncated THEN BEGIN sH.sendAttention[sH, BulkData.attnByte]; --we see the close sH.delete ← DeleteNull; --so we don't also send EOM ERROR ABORTED; --never to return END; sH ← RealStream[sH]; --if you don't do this, you'll recurse sH.put[sH, block, endRecord]; --else do the put for him END; --XStreamPut END. --XStreamImpl LOG ( date - person - action ) 2-Jun-85 15:10:00 AOF Created file 19-Oct-86 12:40:45 AOF The store/fetch vs source/sink => gets/puts mapping 21-Oct-86 15:03:39 AOF Aborting (swapping attn packets and other lies). 31-Oct-86 11:46:10 AOF Reorder statements in Create to avoid race with attn. 26-Nov-86 10:14:31 AOF Get Copy to use PacketStream. 8-Jan-87 10:30:57 AOF Lighten up on checking for stream being "reused". 27-Jan-87 18:42:40 AOF AttentionProc catches Courier.Error, ~ConnectionSusp. 4-Jun-87 11:28:00 AOF Xlate BD error to Courier.Error in ActiveRendezvous. 4-Jun-87 12:27:23 AOF Allow for active to arrive earlier than passive. 3-Sep-87 14:49:34 AOF Watch out for NIL from psH.get in $Copy. 23-Sep-87 9:35:21 AOF Use Stream.defaultObject.* on for invalid stream ops. 23-Sep-87 19:46:30 AOF Help save client from him/her self. 9-Jan-88 17:59:34 AOF Playing the version # exchange game. 28-Jan-88 9:39:12 AOF AR#12728 - sending eom twice using $Copy.