-- File: XStreamImpl.mesa - last edit:
-- AOF 28-Jan-88 11:23:01
-- Copyright (C) 1985, 1986, 1987, 1988 by Xerox Corporation. All rights reserved.
DIRECTORY
BulkData USING [
attnByte, bulkSST, Descriptor, Error, Identifier, Proc, program,
Sink, Source, version],
Courier USING [
Arguments, Call, Create, Delete, Error, ExportRemoteProgram, ErrorCode,
Description, Dispatcher, Handle, NoSuchProcedureNumber, Parameters,
RemoteErrorSignalled, SignalRemoteError, UnexportRemoteProgram,
LocalSystemElement],
CourierInternal USING [
AugmentedStream, ConnectionHandle, longZone, SetBulkStream],
CourierOps USING [NotesObject, RedirectStream],
CourierProtocol USING [dataSST],
Environment USING [Block],
NSBuffer USING [Body, Buffer],
NSTypes USING [bytesPerIDPHeader, bytesPerSppHeader],
PacketStream USING [ConnectionSuspended, Handle],
Process USING [
Abort, EnableAborts, GetCurrent, priorityForeground, SetPriority,
SetTimeout, SecondsToTicks],
Router USING [GetDelayToNet, NoTableEntryForNet],
SppOps USING [PacketStreamFromByteStream],
Stream USING [
Block, CompletionCode, defaultObject, DeleteProcedure, EndOfStream,
GetProcedure, Handle, InputOptions, InvalidOperation, PutProcedure,
SendNowProcedure, SubSequenceType, TimeOut],
System USING [
GetClockPulses, NetworkAddress, NetworkNumber, nullHostNumber,
nullSocketNumber, nullNetworkNumber, SocketNumber],
XStream USING [Request],
XStreamOps USING [Handle, IndicatedUse, Object, ObjectSeal];
XStreamImpl: MONITOR
IMPORTS
Courier, CourierInternal, CourierOps, PacketStream, Process, Router,
SppOps, Stream, System
EXPORTS BulkData, XStream, XStreamOps =
BEGIN
--Timeout constants, both in seconds. They are not referenced in the part of
--the file visible here; presumably WaitTime (defined elsewhere) uses them as
--a base plus a per hop increment. TODO confirm against WaitTime.
baseTMO: CARDINAL = 20; --20 seconds
hopTMO: CARDINAL = 2; --seconds again
--Opaque types exported to the interfaces named in the embedded comments.
ID: PUBLIC --BulkData-- TYPE = LONG CARDINAL;
Handle: PUBLIC --XStream-- TYPE = XStreamOps.Handle;
Object: PUBLIC --XStream-- TYPE = XStreamOps.Object;
--Argument record for the Bulk Data "cancel" remote procedure.
CancelArguments: TYPE = MACHINE DEPENDENT RECORD[
identifier: BulkData.Identifier, timeout: CARDINAL];
--Argument record for the Bulk Data "send" and "receive" remote procedures.
--The descriptor field defaults to an immediate descriptor.
TransferArguments: TYPE = MACHINE DEPENDENT RECORD[
identifier: BulkData.Identifier,
descriptor: immediate BulkData.Descriptor ← [immediate []],
timeout: CARDINAL];
--Head of the singly linked list (through Object.link) of all known transfer
--objects, plus a count of its elements. Updated by OnList and OffList/Destroy.
list: RECORD[first: Handle, count: CARDINAL] ← [NIL, 0];
OnList: ENTRY PROC[h: Handle] =
  BEGIN
  --Push h onto the front of the module's object list, under the monitor
  --lock, and account for it in the element count.
  h.link ← list.first;
  list.first ← h;
  list.count ← SUCC[list.count];
  END; --OnList
--Network address of this system element, fetched once at module start.
address: System.NetworkAddress ← Courier.LocalSystemElement[]; --local copy
--Last identifier handed out; seeded from the clock at module start.
currentID: ID ← System.GetClockPulses[];
--Increments currentID under the monitor lock and returns the new value.
NewID: ENTRY PROC RETURNS[ID] = INLINE {RETURN[(currentID ← currentID + 1)]};
--Internal consistency failure raised by this module; code says which one.
BUG: ERROR[code: Code] = CODE;
Code: TYPE = {bug, reused, lost, protocol, undeleted};
AbortTransfer: PUBLIC <<XStream>> PROC[sH: Stream.Handle] =
  {
  --Abort an immediate transfer that is in progress on sH.
  --The attention has to travel with the bulk data SST, so the SST is set
  --before the attention byte is sent. After the attention has gone out no
  --end of data indication is wanted, so the stream's delete procedure is
  --replaced by the variant that does not send the endRecord.
  sH.setSST[sH, BulkData.bulkSST];
  sH.sendAttention[sH, BulkData.attnByte];
  sH.delete ← DeleteNull;
  }; --AbortTransfer
ActiveRendezvous: <<ENTRY>> PROC[active: Handle] =
BEGIN
OPEN aH: NARROW[active↑, active Object];
<<
This is the active party of a three party transfer. We have to contact
the passive party using the Bulk Data Remote Program. Then connect the
active party's immediate request to the stream made available by the
Courier call. The local active party will never know what happened.
If the remote call reports a remote error, Courier.Error[truncatedTransfer]
is raised to the caller after the Courier handle has been deleted.
>>
--Stream checkout procedure passed to Courier.Call below. It runs the
--client's request; a request with no stream or proc is reported to the
--remote as BulkData.Error[invalidDescriptor].
TransferBulkData: PROC[cH: Courier.Handle] =
BEGIN
WITH request: active.request SELECT FROM
none, deferred => Courier.SignalRemoteError[
BulkData.Error[invalidDescriptor].ORD];
stream => StreamCopy[active]; --blind transfer
proc => request.proc[active]; --then call the client's proc
ENDCASE;
END; --TransferBulkData
--Called from the RemoteErrorSignalled catch phrase below. Every remote
--error is currently mapped to truncatedTransfer; the finer mapping is
--left commented out.
error: PROC[errorNumber: CARDINAL, arguments: Courier.Arguments] =
BEGIN
arguments[]; --there aren't any arguments
--and I don't know what to do with this--
errorCode ← truncatedTransfer;
<<
errorCode ← SELECT errorNumber FROM
BulkData.Error[invalidDescriptor].ORD => truncatedTransfer,
BulkData.Error[noSuchIdentifier].ORD => truncatedTransfer,
BulkData.Error[identifierBusy].ORD => truncatedTransfer,
BulkData.Error[wrongHost].ORD => truncatedTransfer,
BulkData.Error[transferAborted].ORD => truncatedTransfer,
ENDCASE => ERROR Error[protocol];
>>
END;
cH: Courier.Handle;
arguments: TransferArguments;
errorCode: Courier.ErrorCode ← noError; --set by error[] above
WITH aH: active SELECT FROM
active =>
BEGIN
--Our "puts" is the remote's "receive" and our "gets" is its "send".
procedure: CARDINAL = SELECT active.use FROM
puts => BulkData.Proc[receive].ORD,
gets => BulkData.Proc[send].ORD,
ENDCASE => ERROR BUG[protocol];
--Open a connection to the Bulk Data program at the passive party.
cH ← Courier.Create[
remote: [aH.passive.net, aH.passive.host, System.nullSocketNumber],
programNumber: BulkData.program, versionNumber: BulkData.version,
zone: aH.courier.object.zone, classOfService: bulk];
arguments ← [
identifier: active.identifier, timeout: WaitTime[aH.passive.net]];
aH.courier ← LOOPHOLE[cH]; --replace Courier handle in request
--The data moves inside TransferBulkData while the call is outstanding.
[] ← Courier.Call[
cH: cH, procedureNumber: procedure,
arguments: [@arguments, DescribeTransferArguments],
streamCheckoutProc: TransferBulkData !
UNWIND => Courier.Delete[cH];
Courier.RemoteErrorSignalled =>
{error[errorNumber, arguments]; CONTINUE}];
Courier.Delete[cH]; --that just about wraps it up
IF errorCode # noError THEN RETURN WITH ERROR Courier.Error[errorCode];
END;
ENDCASE; --any other variant: nothing to do
END; --ActiveRendezvous
--Watcher process forked by Create for each bulk stream. It blocks until an
--attention arrives on sH (or the wait fails with Courier.Error) and then
--marks the transfer object as truncated, which Copy notices on its next
--pass. DeleteNull aborts and joins this process; in that case ABORTED
--leaves the block and handle.use is not touched.
AttentionProc: <<FORKed>> PROC[sH: Stream.Handle, handle: Handle] =
BEGIN
ENABLE ABORTED => CONTINUE; --that's our forker telling us to go away
Process.SetPriority[Process.priorityForeground]; --make it run fast
[] ← sH.waitAttention[sH ! --wait for one to come in
Courier.Error => CONTINUE]; --stream died with 'transportTimeout'
handle.use ← truncated; --and if it does, then drop the hammer
END; --AttentionProc
--Courier dispatcher for the Bulk Data remote program (exported by
--StartProtocol). Each arm allocates an argument record from cH.zone, has
--Courier deserialize the call arguments into it, and hands it to the
--matching rendezvous routine. The record is freed on the way out, and an
--empty result is returned to the caller.
BulkDataDispatcher: Courier.Dispatcher =
BEGIN
<<
This dispatcher is not written in the proper style of client/consumer
IMPORTer/EXPORTer like a good Courier program should be. So don't bother
trying to impress me with your understanding of software architecture by
telling me how impure this is.
This model doesn't care whether the transfer request is a sink or source.
That bit of information should exist in the passive's request that we
rendezvous with. Nor is it going to call ServerCheckout. Instead it's
going to rendezvous directly.
>>
parameters: Courier.Parameters; --this isn't typed yet
BEGIN
--Release the argument record if anything below unwinds through here.
ENABLE UNWIND => cH.zone.FREE[@parameters.location];
SELECT procedureNumber FROM
BulkData.Proc[send].ORD =>
BEGIN
parameters ← [
location: cH.zone.NEW[TransferArguments],
description: DescribeTransferArguments];
arguments[parameters]; --get the arguments
ServerRendezvous[cH, parameters.location, gets]; --remote sends, we get
END;
BulkData.Proc[receive].ORD =>
BEGIN
parameters ← [
location: cH.zone.NEW[TransferArguments],
description: DescribeTransferArguments];
arguments[parameters]; --get the arguments
ServerRendezvous[cH, parameters.location, puts]; --remote receives, we put
END;
BulkData.Proc[cancel].ORD =>
BEGIN
parameters ← [
location: cH.zone.NEW[CancelArguments],
description: DescribeCancelArguments];
arguments[parameters]; --get the arguments
CancelRendezvous[parameters.location];
END;
ENDCASE => RETURN WITH ERROR Courier.NoSuchProcedureNumber;
END;
cH.zone.FREE[@parameters.location]; --free up the storage
[] ← results[]; --we don't have any results to send
END; --BulkDataDispatcher
CancelRendezvous: PROC[cancel: LONG POINTER TO CancelArguments] =
BEGIN
<<
This operation is a remote procedure call coming in from the active
party of a three party transfer. This procedure has been called instead
of Produce or Consume because of some inability of the active party.
Since this is a Produce or Consume replacement, this code looks a lot like
the code needed to rendezvous the incoming call with the passive object at
the passive site (see ServerRendezvous).
If the passive party is waiting here, tell him to go away, else hang
a new object on the list and he'll find us, hopefully before we time out.
>>
--Searches the list for a passive object carrying the identifier being
--cancelled. Returns TRUE after marking it cancelled and aborting its
--process. With wait = TRUE and no match it waits once on the rendezvous
--condition and returns FALSE (the declared default); with wait = FALSE
--and no match it reports noSuchIdentifier to the remote caller.
CoupleWithPassive: ENTRY PROC[wait: BOOLEAN] RETURNS[BOOLEAN ← FALSE] =
BEGIN
FOR passive: Handle ← list.first, passive.link UNTIL passive = NIL DO
IF passive.identifier = cancel.identifier THEN
BEGIN
WITH pH: passive SELECT FROM
passive =>
SELECT passive.seal FROM
cancelled => --somebody cancelled it already
RETURN WITH ERROR Courier.SignalRemoteError[
BulkData.Error[transferAborted].ORD];
ENDCASE =>
BEGIN --that's the one we wanted
passive.seal ← cancelled; --well, he is now
Process.Abort[pH.process]; --and wake him up
RETURN[TRUE]; --and tell caller we found him
END;
ENDCASE; --same identifier but not a passive object; keep looking
END;
ENDLOOP;
<<
Passive call isn't here yet, so we'll supply a cancel object and
he'll rendezvous with it when he gets here. If we get called with
wait = TRUE then this is the first call. If we don't make the rendezvous
with wait = FALSE, then signal BulkData.Error[noSuchIdentifier].
>>
IF wait THEN WAIT active.rendezvous --it isn't abortable
ELSE RETURN WITH ERROR
Courier.SignalRemoteError[BulkData.Error[noSuchIdentifier].ORD];
END; --CoupleWithPassive
--Placeholder object for this cancel request. It comes from
--CourierInternal.longZone, the zone OffList frees into.
active: LONG POINTER TO cancel Object ← CourierInternal.longZone.NEW[
cancel Object ← [
seal: old, --old before its time
courier: NIL, --we don't have any idea
identifier: cancel.identifier, --copy incoming object
request: [none[]], use: cancel, --to be used as one
variant: cancel[process: Process.GetCurrent[]]]];
--Bound the wait by the timeout the caller supplied, in seconds.
Process.SetTimeout[
@active.rendezvous, Process.SecondsToTicks[cancel.timeout]];
OnList[active]; --insert into list
BEGIN
ENABLE UNWIND => OffList[active];
--First try may wait; the second try either succeeds or raises.
SELECT TRUE FROM
(CoupleWithPassive[TRUE]) => NULL; --he's cancelled
(CoupleWithPassive[FALSE]) => NULL; --he finally got here
ENDCASE; --error will have been raised in CoupleWithPassive
END;
OffList[active]; --remove element we added for rendezvous
END; --CancelRendezvous
--Cancel the transfer described by handle. What happens depends on the
--object's variant: an immediate transfer is aborted on its stream, an
--active one tells the passive party to stop waiting, and a passive one
--does nothing yet. Any other variant is an internal error. In every
--accepted case the object's seal ends up as cancelled.
CancelTransfer: PUBLIC <<XStream>> PROC[handle: Handle] =
BEGIN
WITH h: handle SELECT FROM
immediate =>
BEGIN
<<
This is an immediate transfer that is being aborted. In order to
abort, we need to ensure the SST is set, then send the attention.
This almost has to be done no matter what state the call is in. It
is possible to bum this out IFF we are the called party and the
sender (source) of the bulk data, but that's too many checks and
just sending the thing is "correct".
>>
AbortTransfer[h.courier.object.sH];
END;
active =>
BEGIN
<<
The active party in a 3-party transfer can't produce|consume the
data. He calls this routine instead so that the passive party won't
hang around forever for the rendezvous to arrive.
Is it interesting to notify the client of any errors here?
>>
cancel: CancelArguments ← [handle.identifier, WaitTime[h.passive.net]];
cH: Courier.Handle ← Courier.Create[
remote: [h.passive.net, h.passive.host, System.nullSocketNumber],
programNumber: BulkData.program, versionNumber: BulkData.version,
zone: h.courier.object.zone, classOfService: bulk];
--Best effort: local and remote errors from the cancel call are ignored.
[] ← Courier.Call[
cH: cH, procedureNumber: BulkData.Proc[cancel].ORD,
arguments: [@cancel, DescribeCancelArguments] !
Courier.Error, Courier.RemoteErrorSignalled => CONTINUE];
Courier.Delete[cH];
END;
passive =>
<<
What do you do here? I'd like to leave around a passive object
marked cancelled so when (IF) the active process came in, it could
detect the cancellation quickly. But I don't know if the object
is queued yet. One end or the other is going to take a chance on
having to time out. I'm going to let the active rendezvous be the one.
>>
{}; --fill in as needed
ENDCASE => ERROR BUG[bug];
handle.seal ← cancelled; --looks easy, but what does it mean?
END; --CancelTransfer
GetSppLength: PROC[body: NSBuffer.Body] RETURNS[NATURAL] = INLINE
  BEGIN
  --Number of SPP data bytes in the packet: the packet length recorded in
  --the body less the IDP header and the SPP header.
  headerBytes: CARDINAL =
    NSTypes.bytesPerIDPHeader + NSTypes.bytesPerSppHeader;
  RETURN[body.pktLength - headerBytes];
  END;
SetSppLength: PROC[body: NSBuffer.Body, bytes: NATURAL] = INLINE
  BEGIN
  --Record a packet length that covers the given number of SPP data bytes
  --plus the IDP header and the SPP header.
  headerBytes: CARDINAL =
    NSTypes.bytesPerIDPHeader + NSTypes.bytesPerSppHeader;
  body.pktLength ← bytes + headerBytes;
  END;
--Digs the packet stream out from under a bulk data stream. Three steps:
--RealStream gives the transport filter's stream object; that object's
--context field is then reinterpreted as holding a Stream.Handle (the byte
--stream); finally SppOps converts the byte stream to its packet stream.
--NOTE(review): the middle step depends on the layout of
--CourierInternal.AugmentedStream; the original comment says it was copied
--from CourierImplN. Keep the two in step.
PacketFromBulk: PROC[sH: Stream.Handle] RETURNS[PacketStream.Handle] =
BEGIN
sH ← RealStream[sH]; --that's okay as far as it goes
sH ← LOOPHOLE[@LOOPHOLE[sH, CourierInternal.AugmentedStream].context,
LONG POINTER TO Stream.Handle]↑; --retch!!!! (copied from CourierImplN)
RETURN[SppOps.PacketStreamFromByteStream[sH]]; --more loop-de-loops
END; --PacketFromBulk
Copy: PUBLIC <<XStream>> PROC[sink, source: Stream.Handle] =
BEGIN
<<
This procedure dives in and finds the packet stream behind Courier's
transport. It uses that so any bulk data operation that uses copy only
has to BLT the data once from his buffer to the ethernet buffers. Wonder
if all this is really worth it?
Exactly one of sink and source must be a bulk stream made by Create; it is
recognized by its delete procedure. Data moves a packet at a time until
an end of stream is seen. A remote abort or a truncated object ends in
ABORTED, as does a failed or timed out packet stream.
>>
handle: Handle;
moved: NATURAL;
sH: Stream.Handle;
b: NSBuffer.Buffer;
body: NSBuffer.Body;
block: Environment.Block;
psH: PacketStream.Handle;
why: Stream.CompletionCode ← normal;
options: Stream.InputOptions = [
TRUE, FALSE, FALSE, FALSE, FALSE, FALSE, TRUE, FALSE];
--NOTE(review): options is not referenced anywhere else in this procedure.
SELECT TRUE FROM --figure out which stream is ours, which is theirs
(source.delete = DeleteNull), (source.delete = DeleteEnd) => sH ← source;
(sink.delete = DeleteNull), (sink.delete = DeleteEnd) => sH ← sink;
ENDCASE => BUG[bug]; --we're in trouble now
psH ← PacketFromBulk[sH]; --that's the real packet stream
handle ← NARROW[sH.clientData]; --and that's the XStream object
UNTIL why = endOfStream DO
ENABLE PacketStream.ConnectionSuspended => GOTO streamFailed;
--handle.use is read again on every pass; AttentionProc may have changed
--it to truncated in the meantime.
SELECT handle.use FROM
gets => --network packets are written to sink
BEGIN
SELECT TRUE FROM
((b ← psH.get[]) # NIL) => body ← b.ns; --got it; copy local pointer
(~source.options.signalTimeout) => GOTO streamFailed; --worst
ENDCASE => {SIGNAL Stream.TimeOut[0]; LOOP}; --he wants to try again
IF body.endOfMessage THEN why ← endOfStream; --to get out of loop
block ← [LOOPHOLE[@body.sppBody], 0, GetSppLength[body]];
SELECT TRUE FROM
(body.attention) => why ← attention; --remote's bailing out
(block.stopIndexPlusOne = 0) => NULL; --nothing going on here
(body.subtype = BulkData.bulkSST) => --this is for us
sink.put[sink, block, FALSE]; --scribble the bits
ENDCASE => why ← endOfStream; --truncated data of some kind
psH.returnReceiveBuffer[b]; --give the buffer back
IF why = attention THEN GOTO remoteAbort; --set earlier
END;
puts => --source data is read straight into a send buffer
BEGIN
body ← (b ← psH.getSendBuffer[]).ns; --get buffer to fill
block ← [LOOPHOLE[@body.sppBody], 0, psH.getSenderSizeLimit[]];
[moved, why, ] ← source.get[source, block, source.options !
UNWIND => psH.returnSendBuffer[b]];
SetSppLength[body, moved]; --set the length of this move
body.subtype ← BulkData.bulkSST; --set the sst appropriately
IF why # normal THEN why ← endOfStream; --any abnormal completion ends it
psH.put[b]; --send out the buffer
END;
truncated => GOTO remoteAbort; --got an attention packet
ENDCASE;
REPEAT
remoteAbort =>
BEGIN
<<[] ← sH.getByte[sH]; --consume inband attention>>
sH.sendAttention[sH, BulkData.attnByte];
sH.delete ← DeleteNull; --so we don't send endRecord
Process.Abort[Process.GetCurrent[]]; --set to abort myself
--we want to raise aborted anyhow, so let this signal do it--
DO --ENABLE ABORTED => REJECT-- [] ← sH.waitAttention[sH]; ENDLOOP;
END;
streamFailed => {sH.delete ← DeleteNull; ERROR ABORTED};
ENDLOOP;
END; --Copy
Create: PUBLIC <<XStream>> PROC[handle: Handle] RETURNS[sH: Stream.Handle] =
BEGIN
<<
These streams are simplex, i.e., they can't be used in both directions. The
direction this instance of stream will be used is determined by the routine
used to describe the BulkData.SINK | SOURCE. Whichever direction is defined
for this stream is assigned a local procedure. The other direction is
assigned a Stream.defaultObject value. That will result in an ERROR
Stream.InvalidOperation if the client tries to use it incorrectly.
The stream returned is the Courier connection's own stream with its
procedures replaced. Raises Courier.Error[streamNotYours] if the object
cannot have a stream.
>>
IF handle.request.access = deferred THEN GOTO nostream; --he can't do that
WITH h: handle SELECT FROM --pull the stream out of the object
active, passive, immediate =>
BEGIN
sH ← handle.courier.object.sH; --that's his stream
SELECT handle.use FROM --what does he want to do?
(puts) =>
BEGIN
sH.setSST[sH, BulkData.bulkSST]; --then set the SST
sH.put ← XStreamPut; --my own put procedure
sH.get ← Stream.defaultObject.get; --invalid operation
sH.delete ← DeleteEnd; --we have to send the endRecord
sH.sendNow ← SendNow; --so we can trap client endRecords
handle.attentionProc ← FORK AttentionProc[sH, handle]; --watcher
sH.clientData ← handle; --store so get/put can find it
END;
(gets) =>
BEGIN
sH.get ← XStreamGet; --my own get procedure
sH.put ← Stream.defaultObject.put; --invalid operation
sH.delete ← DeleteNull; --we don't have to send the end
handle.attentionProc ← FORK AttentionProc[sH, handle]; --watcher
sH.clientData ← handle; --store so get/put can find it
END;
ENDCASE => ERROR BUG[bug]; --what's going on?
END;
<<any other variant; null, deferred and cancel are the others used in this file>>
ENDCASE => GOTO nostream; --and he can't do those
EXITS nostream => ERROR Courier.Error[streamNotYours];
END; --Create
DeleteEnd: Stream.DeleteProcedure =
  {
  --Delete procedure for a stream we are putting to: push out an endRecord
  --on the transport stream underneath, then do the common teardown.
  transport: Stream.Handle = RealStream[sH];
  transport.sendNow[transport, TRUE];
  DeleteNull[sH];
  }; --DeleteEnd
DeleteNull: Stream.DeleteProcedure =
  BEGIN
  --Common teardown for a bulk stream; sends nothing on the wire.
  --Stops the attention watcher that Create forked for this stream and
  --restores the default delete procedure so that a second delete by the
  --client does not come back through here.
  --(The connection handle formerly fetched here through RealCourier was
  --never used and has been dropped.)
  handle: Handle ← NARROW[sH.clientData]; --the object stored by Create
  Process.Abort[handle.attentionProc]; --tell the watcher to go away
  JOIN handle.attentionProc; --and wait until it has
  handle.attentionProc ← NIL; --the process no longer exists
  sH.delete ← Stream.defaultObject.delete; --so he doesn't do it twice
  END; --DeleteNull
DescribeCancelArguments: Courier.Description =
  BEGIN
  --The cancel argument record has no embedded structure to describe;
  --noting its size is all that is needed.
  [] ← notes.noteSize[SIZE[CancelArguments]];
  END;
--Courier description routine for a bulk data sink parameter. The
--parameter area holds a Handle. On fetch (serializing) a descriptor is
--built from the client's object and noted in place of the handle. On store
--(deserializing) the incoming descriptor is read, a new object is created
--from it and put on the list, and its handle is left in the parameter
--area. On free the object is destroyed.
DescribeSink: PUBLIC <<XStream>> Courier.Description =
BEGIN
OPEN n: LOOPHOLE[notes, POINTER TO CourierOps.NotesObject];
<<
Clients that
'fetch' an immediate|null 'sink' will be doing "gets"
'store' an immediate|null 'sink' will be doing "puts"
>>
descriptor: BulkData.Descriptor; --garden variety type
handle: LONG POINTER TO Handle ← notes.noteSize[SIZE[Handle]];
notes.noteDeadSpace[handle, SIZE[Handle]]; --dispense with parm area
SELECT notes.operation FROM
fetch =>
BEGIN
WITH h: handle SELECT FROM
null =>
BEGIN
IF handle.seal = smashed THEN ERROR BUG[reused];
descriptor ← [null[]]; --make the descriptor
notes.noteSpace[@descriptor, SIZE[null BulkData.Sink]];
handle.use ← gets; --tag it intended use
handle.seal ← old; --type it and mark it ready
h.courier ← n.ch; --tie Courier handle to request
END;
immediate =>
BEGIN
IF handle.seal = smashed THEN ERROR BUG[reused];
descriptor ← [immediate[]]; --make the descriptor
notes.noteSpace[@descriptor, SIZE[immediate BulkData.Sink]];
handle.seal ← old; --mark it ready
handle.use ← gets; --tag it intended use
h.courier ← n.ch; --tie Courier handle to request
END;
deferred =>
BEGIN
<<
Which remote should be used for passive, which for active?
The sink is made passive when it is no further away than the source.
DescribeSource makes the opposite choice from the same comparison.
>>
OPEN r: NARROW[handle.request, deferred XStream.Request];
IF WaitTime[r.sink.net] <= WaitTime[r.source.net] THEN
BEGIN
descriptor ← [passive[r.sink.net, r.sink.host, handle.identifier]];
h.passive ← n.ch; --tie Courier handle to request
END
ELSE
BEGIN
descriptor ← [active[r.sink.net, r.sink.host, handle.identifier]];
h.active ← n.ch; --tie Courier handle to request
END;
--NOTE(review): the passive size is noted for either choice; presumably
--the active and passive variants are the same size. Confirm in BulkData.
notes.noteSpace[@descriptor, SIZE[passive BulkData.Sink]];
handle.seal ← old; --type it and mark it ready
handle.use ← deferred; --that's its final state
END;
ENDCASE;
END;
store =>
BEGIN
--Read the null sized prefix first; the variant arms below read the rest.
notes.noteSpace[@descriptor, SIZE[null BulkData.Sink]];
handle↑ ← CourierInternal.longZone.NEW[Object ← [
seal: old, use: puts, identifier: TRASH,
courier: n.ch, variant: null[]]];
WITH d: descriptor SELECT FROM
null =>
BEGIN
handle↑.variant ← null[];
handle.identifier ← [n.ch.object.remote.host, NewID[]];
END;
immediate =>
BEGIN
handle↑.variant ← immediate[];
handle.identifier ← [n.ch.object.remote.host, NewID[]];
END;
active =>
BEGIN
--Pick up the remainder of the descriptor, starting at its network field.
notes.noteSpace[@d.network,
SIZE[active BulkData.Sink] - SIZE[null BulkData.Sink]];
handle.identifier ← d.identifier;
handle↑.variant ← active[
process: Process.GetCurrent[],
passive: [d.network, d.host]];
END;
passive =>
BEGIN
--NOTE(review): sized with the active variant; presumably the same size
--as the passive one. Confirm in BulkData.
notes.noteSpace[@d.network,
SIZE[active BulkData.Sink] - SIZE[null BulkData.Sink]];
handle.identifier ← d.identifier;
handle↑.variant ← passive[
process: Process.GetCurrent[],
active: [d.network, d.host]];
END;
ENDCASE;
OnList[handle↑]; --tack it on to the list
END;
free => Destroy[handle↑]; --equivalent to delete
ENDCASE;
END; --DescribeSink
--Courier description routine for a bulk data source parameter; the mirror
--image of DescribeSink. On fetch (serializing) a descriptor is built from
--the client's object. On store (deserializing) a new object is created
--from the incoming descriptor and put on the list. On free the object is
--destroyed.
DescribeSource: PUBLIC <<XStream>> Courier.Description =
BEGIN
OPEN n: LOOPHOLE[notes, POINTER TO CourierOps.NotesObject];
<<
Clients that
'fetch' an immediate|null 'source' will be doing "puts"
'store' an immediate|null 'source' will be doing "gets"
>>
descriptor: BulkData.Descriptor; --garden variety type
handle: LONG POINTER TO Handle ← notes.noteSize[SIZE[Handle]];
notes.noteDeadSpace[handle, SIZE[Handle]]; --dispense with parm area
SELECT notes.operation FROM
fetch =>
BEGIN
WITH h: handle SELECT FROM
null =>
BEGIN
IF handle.seal = smashed THEN ERROR BUG[reused];
descriptor ← [null[]]; --build the descriptor
notes.noteSpace[@descriptor, SIZE[null BulkData.Source]];
handle.seal ← old; --age it a bit
handle.use ← puts; --type it and mark it ready
h.courier ← n.ch; --tie Courier handle to request
END;
immediate =>
BEGIN
IF handle.seal = smashed THEN ERROR BUG[reused];
descriptor ← [immediate[]]; --build the descriptor
notes.noteSpace[@descriptor, SIZE[immediate BulkData.Source]];
handle.seal ← old; --age it a bit
handle.use ← puts; --type it and mark it ready
h.courier ← n.ch; --tie Courier handle to request
END;
deferred =>
BEGIN
<<
Which remote should be used for passive, which for active?
Just make sure it's opposite of what was done in describing the
sink. We don't want both parties to be active | passive.
>>
OPEN r: NARROW[handle.request, deferred XStream.Request];
IF WaitTime[r.sink.net] > WaitTime[r.source.net] THEN
BEGIN
descriptor ← [passive[
r.source.net, r.source.host, handle.identifier]];
h.passive ← n.ch; --tie Courier handle to request
END
ELSE
BEGIN
descriptor ← [active[
r.source.net, r.source.host, handle.identifier]];
h.active ← n.ch; --tie Courier handle to request
END;
handle.seal ← old; --age it a bit
handle.use ← deferred; --that's its final state
--NOTE(review): this names the Sink type inside the Source describer and
--uses the passive size for either choice; presumably all of these are
--the same size. Confirm in BulkData.
notes.noteSpace[@descriptor, SIZE[passive BulkData.Sink]];
END;
ENDCASE;
END;
store =>
BEGIN
--Read the null sized prefix first; the variant arms below read the rest.
notes.noteSpace[@descriptor, SIZE[null BulkData.Source]];
handle↑ ← CourierInternal.longZone.NEW[Object ← [
seal: old, use: gets, identifier: TRASH,
courier: n.ch, variant: null[]]];
WITH d: descriptor SELECT FROM
null =>
BEGIN
handle↑.variant ← null[];
handle.identifier ← [n.ch.object.remote.host, NewID[]];
END;
immediate =>
BEGIN
handle↑.variant ← immediate[];
handle.identifier ← [n.ch.object.remote.host, NewID[]];
END;
active =>
BEGIN
--Pick up the remainder of the descriptor, starting at its network field.
notes.noteSpace[@d.network,
SIZE[active BulkData.Source] - SIZE[null BulkData.Source]];
handle.identifier ← d.identifier;
handle↑.variant ← active[
process: Process.GetCurrent[],
passive: [d.network, d.host]];
END;
passive =>
BEGIN
--NOTE(review): sized with the active variant; presumably the same size
--as the passive one. Confirm in BulkData.
notes.noteSpace[@d.network,
SIZE[active BulkData.Source] - SIZE[null BulkData.Source]];
handle.identifier ← d.identifier;
handle↑.variant ← passive[
process: Process.GetCurrent[],
active: [d.network, d.host]];
END;
ENDCASE;
OnList[handle↑]; --tack it on to the list
END;
free => Destroy[handle↑]; --delete the object
ENDCASE;
END; --DescribeSource
DescribeTransferArguments: Courier.Description =
  BEGIN
  --The transfer argument record is described by its size alone.
  [] ← notes.noteSize[SIZE[TransferArguments]];
  END;
--Destroy and OffList are two names for the same entry procedure. It
--unlinks handle from the module's list, marks the object smashed, and
--frees it into CourierInternal.longZone, so every object placed on the
--list must have been allocated from that zone. Raises
--Courier.Error[streamNotYours] if handle is not on the list.
Destroy, OffList: PUBLIC <<XStream>> ENTRY PROC[handle: Handle] =
BEGIN
p: Handle ← NIL; --element preceding h, NIL while h is the first
FOR h: Handle ← list.first, h.link UNTIL h = NIL DO
IF h = handle THEN
BEGIN
handle.seal ← smashed; --smash the object's seal
IF p = NIL THEN list.first ← h.link ELSE p.link ← h.link;
CourierInternal.longZone.FREE[@handle];
list.count ← PRED[list.count]; --one less
EXIT; --let's not look further
END
ELSE p ← h;
REPEAT FINISHED => RETURN WITH ERROR Courier.Error[streamNotYours];
ENDLOOP;
END; --Destroy
Make: PUBLIC <<XStream>> PROC[request: XStream.Request]
  RETURNS[handle: Handle] =
  {
  --Build a new transfer object for a client request and put it on the
  --module's list. The object starts out new and undeclared, with no
  --Courier handle and a fresh identifier made from the local host and
  --NewID. Its variant follows the kind of request.
  handle ← CourierInternal.longZone.NEW[Object ← [
    request: request, seal: new, use: undeclared, courier: NIL,
    identifier: [address.host, NewID[]], variant: null[]]];
  WITH request SELECT FROM
    stream, proc => handle.variant ← immediate[];
    deferred => handle.variant ← deferred[];
    none => handle.variant ← null[];
    ENDCASE => ERROR BUG[bug];
  OnList[handle];
  }; --Make
PassiveRendezvous: ENTRY PROC[passive: Handle] =
BEGIN
--PASSIVE ASSERTION MADE BY CALLER--
OPEN pH: NARROW[passive, LONG POINTER TO passive Object];
ENABLE UNWIND => NULL;
<<
Check the list, checking it twice, seeing who's here and who ain't.
If we find a match between our handle.descriptor.identifier and one that's
already in the list, toss him out of bed. Give him our handle.request
and then go to sleep ourselves.
If we find no match, our entry is already there and the active process will
find it when he comes in from the cold. He should wake us when he goes home
and then we can return.
>>
Process.EnableAborts[@pH.rendezvous]; --in case active wants to gun us
Process.SetTimeout[
@pH.rendezvous, Process.SecondsToTicks[ --set a condition timeout
WaitTime[pH.courier.object.remote.net] + --us to initiator
WaitTime[pH.active.net]]]; --plus us to active
passive.seal ← rendezvous; --we're ready to link up
FOR active: Handle ← list.first, active.link UNTIL active = NIL DO
<<
If the active party managed to call us before we got here, he will
have linked up an active descriptor with a null host number. If
active is here, wake him up and go to sleep ourselves. It's his
responsibility to wake us up when he's finished the transfer.
>>
IF active.identifier = passive.identifier THEN
WITH aH: active SELECT FROM
active => {NOTIFY aH.rendezvous; EXIT}; --jar him loose and exit
ENDCASE; --this is not the one we want; continue searching
<<
The active isn't here yet - wait for him. When he comes in he will
process the bulk data and notify us when he's finished. We need to
wait only a reasonable amount of time for the active party to get here.
Once the transfer starts, it may take an arbitrary amount of time.
Setting the timeout - we're passive. That means that we are as close
or closer to the initiator than the active element is. The worst case
for distance between the initiator and the active would be the distance
from us to active plus the distance from us to initiator (that assumes
the path from initiator to active goes through the net we live on).
>>
ENDLOOP;
<<
In this loop, a wakeup when the seal is rendezvous | transferring
is ignored.
A wakeup with any other seal exits the loop. It may be a timeout
or the transfer may be complete.
An ABORTED means that the active party called Cancel instead of
Send or Receive.
>>
--NOTE(review): nothing in this procedure changes the seal after a wait
--that merely timed out, so a timeout alone does not appear to leave the
--loop. Check whether that is intended.
WHILE passive.seal IN[rendezvous..transferring] DO
ENABLE ABORTED => EXIT; WAIT pH.rendezvous; ENDLOOP;
END; --PassiveRendezvous
RealCourier: PROC[sH: Stream.Handle]
  RETURNS[ch: CourierInternal.ConnectionHandle] = INLINE
  BEGIN
  --View the stream handle as Courier's augmented stream and return the
  --connection handle kept in its back field.
  augmented: CourierInternal.AugmentedStream =
    LOOPHOLE[sH, CourierInternal.AugmentedStream];
  RETURN[augmented.back];
  END;
RealStream: PROC[sH: Stream.Handle] RETURNS[Stream.Handle] = INLINE
  BEGIN
  --View the stream handle as Courier's augmented stream and return the
  --address of the stream object inside its connection's transport filter.
  augmented: CourierInternal.AugmentedStream =
    LOOPHOLE[sH, CourierInternal.AugmentedStream];
  RETURN[@augmented.back.transFilter.object];
  END;
SendNow: Stream.SendNowProcedure =
  {
  --sendNow procedure installed by Create on streams used for puts.
  --The endRecord marks the end of the bulk data and is sent only by
  --DeleteEnd, so a client asking for one is refused. Anything else is
  --passed through to the transport stream underneath.
  IF endRecord THEN ERROR Stream.InvalidOperation;
  sH ← RealStream[sH];
  sH.sendNow[sH, endRecord];
  }; --SendNow
ServerCheckout: PUBLIC <<XStream>> PROC[
cH: Courier.Handle, request: XStream.Request] =
BEGIN
<<
This means the source/sink has been deserialized, but servers don't know
they are playing a 3-party game, so we have to move carefully.
When (IF) we locate the Object behind this Courier.Handle, the object's
seal has the information about whether or not we are playing a 3rd
party game. If we are doing such, then this routine will call off to
rendezvous with the other half of the party. That means if we are passive,
we wait for the active's incoming. Else we initiate a call (using the
Bulk Data Protocol procedure appropriate (Send | Receive).
Raises Courier.Error[streamNotYours] if no object on the list belongs to cH.
>>
--Sets handle (declared below) to the list element whose Courier handle
--is cH, searching under the monitor lock.
CoupleCourierToRequest: ENTRY PROC =
BEGIN
FOR handle ← list.first, handle.link UNTIL handle = NIL DO
WITH h: handle SELECT FROM
null => IF @h.courier.object = cH THEN EXIT;
immediate => IF @h.courier.object = cH THEN EXIT;
active => IF @h.courier.object = cH THEN EXIT;
passive => IF @h.courier.object = cH THEN EXIT;
ENDCASE;
REPEAT FINISHED => RETURN WITH ERROR Courier.Error[streamNotYours];
ENDLOOP;
END; --CoupleCourierToRequest
handle: Handle;
CoupleCourierToRequest[]; --search for the handle
handle.request ← request; --just for posterity
WITH handle SELECT FROM
null => {}; --I don't know what goes on here
immediate =>
BEGIN
--The stream must have been deleted by the time we leave, on the normal
--path and on unwind alike; TestForStreamDeleted checks that.
ENABLE UNWIND => TestForStreamDeleted[handle];
WITH req: request SELECT FROM
none => {}; --accepts or supplies no data
stream => StreamCopy[handle]; --blind stream copy
proc => req.proc[handle]; --user supplied proc
ENDCASE;
TestForStreamDeleted[handle];
END;
active => ActiveRendezvous[handle];
passive => PassiveRendezvous[handle];
ENDCASE;
END; --ServerCheckout
StartProtocol: PUBLIC <<XStreamOps>> PROC =
  {
  --Export the Bulk Data remote program, served by BulkDataDispatcher.
  --Any Courier.Error from the export is swallowed; the author expected
  --it to mean the program was already exported.
  Courier.ExportRemoteProgram[
    programNumber: BulkData.program,
    versionRange: [BulkData.version, BulkData.version],
    dispatcher: BulkDataDispatcher, serviceName: "BulkDataTransfer"L,
    zone: CourierInternal.longZone, classOfService: bulk !
    Courier.Error => CONTINUE];
  }; --StartProtocol
StopProtocol: PUBLIC <<XStreamOps>> PROC =
  {
  --Withdraw the Bulk Data remote program. Any Courier.Error from the
  --unexport is swallowed; the author expected it to mean the program
  --was not exported.
  Courier.UnexportRemoteProgram[
    programNumber: BulkData.program,
    versionRange: [BulkData.version, BulkData.version] !
    Courier.Error => CONTINUE];
  }; --StopProtocol
ServerRendezvous: PROC[
  cH: Courier.Handle,
  transfer: LONG POINTER TO TransferArguments,
  use: XStreamOps.IndicatedUse] =
  BEGIN
  <<
  The active process is calling in to rendezvous with the passive party.
  If the passive party is waiting here, process his Request, else hang
  a new object on the list and he'll find us, hopefully before we time out.
  cH is the Courier handle of the incoming call, transfer its deserialized
  arguments, and use the direction implied by the procedure called.
  Failures to rendezvous are reported to the remote caller through
  Courier.SignalRemoteError.
  >>
  --Searches the list for a passive object carrying the transfer's
  --identifier and leaves it in passive. Returns TRUE once both objects
  --have been marked transferring. A passive object that cannot be used
  --is reported to the remote caller. With no match: wait = TRUE waits
  --once on our condition and returns FALSE (the declared default);
  --wait = FALSE reports noSuchIdentifier.
  CoupleWithPassive: ENTRY PROC[wait: BOOLEAN] RETURNS[BOOLEAN ← FALSE] =
    BEGIN
    ENABLE UNWIND => NULL;
    FOR passive ← list.first, passive.link UNTIL passive = NIL DO
      IF passive.identifier = transfer.identifier THEN
        BEGIN
        WITH pH: passive SELECT FROM
          passive =>
            BEGIN
            error: NATURAL;
            SELECT TRUE FROM
              (passive.seal = transferring) =>
                error ← BulkData.Error[identifierBusy].ORD;
              (passive.seal = cancelled) =>
                error ← BulkData.Error[transferAborted].ORD;
              (passive.use = use) => --both ends want the same direction
                error ← BulkData.Error[wrongDirection].ORD;
              (pH.active.host # cH.remote.host) =>
                error ← BulkData.Error[wrongHost].ORD;
              ENDCASE =>
                BEGIN
                passive.seal ← active.seal ← transferring;
                RETURN[TRUE]; --ready to run
                END;
            RETURN WITH ERROR Courier.SignalRemoteError[error];
            END;
          ENDCASE; --same identifier but not a passive object; keep looking
        END;
      <<
      Passive call isn't here yet, so we'll supply an active object and
      he'll rendezvous with it when he gets here. If we get called with
      wait = TRUE then this is the first call. If we're in the second
      call, we were either notified out of the first or we timed out.
      >>
      ENDLOOP;
    IF wait THEN WAIT active.rendezvous --wait for passive to arrive
    ELSE RETURN WITH ERROR
      Courier.SignalRemoteError[BulkData.Error[noSuchIdentifier].ORD];
    END; --CoupleWithPassive
  --Sets the passive object's seal and wakes its waiting process.
  WakePassive: ENTRY PROC[seal: XStreamOps.ObjectSeal] = INLINE
    BEGIN
    OPEN pH: NARROW[passive, LONG POINTER TO passive Object];
    passive.seal ← seal; --reset the use
    NOTIFY pH.rendezvous; --life's so simple
    END;
  --Starts out NIL so that the UNWIND handler below tests a defined value
  --no matter how early it runs.
  passive: Handle ← NIL;
  --Allocated from CourierInternal.longZone because OffList always frees
  --list elements into that zone (this was cH.zone before).
  active: LONG POINTER TO active Object ←
    CourierInternal.longZone.NEW[active Object ← [
      seal: old, use: use, identifier: transfer.identifier,
      courier: LOOPHOLE[cH], --get the handle wired in
      variant: active[
        process: Process.GetCurrent[], --this is us
        passive: [System.nullNetworkNumber, System.nullHostNumber]]]];
  --Bound the wait by the timeout the caller supplied, in seconds.
  Process.SetTimeout[
    @active.rendezvous, Process.SecondsToTicks[transfer.timeout]];
  Process.EnableAborts[@active.rendezvous];
  OnList[active]; --insert into list
  BEGIN
    ENABLE
      BEGIN
      Courier.Error => ERROR BUG[bug];
      --NOTE(review): this also marks a matching passive object cancelled
      --when we were turned away (for example with identifierBusy).
      UNWIND => {OffList[active]; IF passive # NIL THEN WakePassive[cancelled]};
      END;
    --First try may wait; the second try either succeeds or raises.
    SELECT TRUE FROM
      (CoupleWithPassive[TRUE]) => NULL; --passive was laying in the bushes
      (CoupleWithPassive[FALSE]) => NULL; --he finally got here (in time too)
      ENDCASE; --too little too late - errored from CoupleWithPassive
    active.request ← passive.request; --copy original request (NIT)
    active.use ← passive.use; --that's not just a NIT!
    WITH req: active.request SELECT FROM
      none => {}; --accepts or supplies no data
      stream => StreamCopy[active]; --copies stream
      proc => req.proc[active]; --then call the client's proc
      ENDCASE;
    OffList[active]; --get rid of active element
    WakePassive[finished]; --and tell passive we're finished
    END;
  END; --ServerRendezvous
StreamCopy: PROC[h: Handle] =
  BEGIN
  <<
  Blind copy between the client's stream (found in h.request) and a stream
  obtained from Create[h]. The value of h.use picks the argument order
  handed to Copy. The created stream is deleted on every way out of this
  procedure, whether Copy returns normally or is unwound.
  >>
  OPEN client: NARROW[h.request, stream XStream.Request];
  bulk: Stream.Handle ← Create[h];  --Create does its own error checking
  BEGIN
    ENABLE UNWIND => bulk.delete[bulk];  --don't leave the stream behind
    IF h.use = puts THEN Copy[bulk, client.sH]  --created stream goes first
    ELSE IF h.use = gets THEN Copy[client.sH, bulk];  --created stream goes second
    --for any other value of h.use nothing is copied
  END;
  bulk.delete[bulk];  --it was created here, so it is deleted here
  END;  --StreamCopy
TestForStreamDeleted: PROC[handle: Handle] =
  BEGIN
  <<
  Compares the delete procedure of the stream in handle's courier object
  with Stream.defaultObject.delete. When they are equal the courier
  connection is passed to CourierInternal.SetBulkStream; when they differ
  BUG[undeleted] is raised.
  >>
  IF handle.courier.object.sH.delete = Stream.defaultObject.delete THEN
    CourierInternal.SetBulkStream[handle.courier]
  ELSE ERROR BUG[undeleted];
  END;  --TestForStreamDeleted
UserCheckout: PUBLIC <<XStream>> PROC[cH: Courier.Handle] =
  BEGIN
  <<
  Finds the list element that refers to the Courier handle cH and then acts
  on that element's request: nothing for none, StreamCopy for stream, or the
  client's own procedure for proc. Raises Courier.Error[streamNotYours]
  when no element on the list refers to cH.
  >>
  handle: Handle;  --set by FindCourierHandleMatch
  FindCourierHandleMatch: ENTRY PROC = INLINE
    BEGIN
    --Walk the list; leave the loop early with handle on the matching element.
    handle ← list.first;
    UNTIL handle = NIL DO
      WITH elem: handle SELECT FROM
        null => IF @elem.courier.object = cH THEN EXIT;
        immediate => IF @elem.courier.object = cH THEN EXIT;
        deferred =>
          IF (@elem.passive.object = cH) OR (@elem.active.object = cH) THEN EXIT;
        ENDCASE;
      handle ← handle.link;
      ENDLOOP;
    --handle is NIL only when the whole list was scanned without a match
    IF handle = NIL THEN RETURN WITH ERROR Courier.Error[streamNotYours];
    END;  --FindCourierHandleMatch
  --this means the source/sink has been deserialized
  FindCourierHandleMatch[];  --errors out if the handle isn't on the list
  WITH req: handle.request SELECT FROM
    stream => StreamCopy[handle];  --blind stream copy
    proc =>
      BEGIN
      --the stream check runs on the normal path and on UNWIND alike
      ENABLE UNWIND => TestForStreamDeleted[handle];
      req.proc[handle];  --client supplied proc
      TestForStreamDeleted[handle];
      END;
    none => NULL;  --accepts or supplies no data
    ENDCASE;
  END;  --UserCheckout
WaitTime: PROC[net: System.NetworkNumber] RETURNS[seconds: CARDINAL] =
  BEGIN
  <<
  Returns baseTMO plus hopTMO for each unit of delay that
  Router.GetDelayToNet reports for net. If the router raises
  NoTableEntryForNet, the delay is taken as zero and the result is baseTMO.
  >>
  <<Sure wish this wasn't monitored in some cases.>>
  hops: CARDINAL ← 0;  --stays zero when the router has no entry for net
  <<
  The catch phrase must end in CONTINUE. A catch phrase that merely falls
  off its end rejects the signal, which would then propagate to our caller
  instead of letting us fall back to the base timeout.
  >>
  hops ← Router.GetDelayToNet[net !
    Router.NoTableEntryForNet => CONTINUE];  --this oughta be fun
  RETURN[baseTMO + (hops * hopTMO)];
  END;  --WaitTime
XStreamGet: Stream.GetProcedure =
BEGIN
<<
Get procedure that reads through to the stream returned by RealStream.
The underlying get is retried until it yields normal data on the bulk SST,
an end of record, or something that means the transfer was cut short.
When the transfer is cut short, handle.use is set to truncated and the
procedure leaves with ERROR ABORTED. The same happens at once on any later
call, because handle.use is tested before each get.
>>
--Options handed to the underlying get. Only signalEndOfStream and
--signalTimeout are taken from the caller's options; the get stops on end of
--record and every other signal is turned off.
myOptions: Stream.InputOptions = [
terminateOnEndRecord: TRUE, signalLongBlock: FALSE,
signalShortBlock: FALSE, signalSSTChange: FALSE,
signalEndOfStream: options.signalEndOfStream,
signalAttention: FALSE, signalTimeout: options.signalTimeout,
signalEndRecord: FALSE];
handle: Handle = NARROW[sH.clientData]; --that's the owning handle
ch: CourierInternal.ConnectionHandle = handle.courier; --owning courier
sH ← RealStream[sH]; --get the real stream
why ← normal; sst ← BulkData.bulkSST; --in case the block is a null block
bytesTransferred ← 0; --to make the loop invariant work
--Loop until the byte count equals the size of the block. A zero length
--block therefore never enters the loop and returns the defaults set above.
UNTIL ((block.stopIndexPlusOne - block.startIndex) = bytesTransferred) DO
BEGIN
IF handle.use = truncated THEN GOTO truncated; --that's all
[bytesTransferred, why, sst] ← sH.get[sH, block, myOptions];
SELECT why FROM
normal =>
SELECT TRUE FROM
(sst = BulkData.bulkSST) => EXIT; --(why = normal) AND (sst = bulk)
(sst # CourierProtocol.dataSST) => ERROR BUG[protocol]; --no way
(bytesTransferred # 0) => GOTO lostData; --data was system data
ENDCASE => LOOP; --my sst but no bytes - just ignore
sstChange =>
SELECT TRUE FROM
(sst = CourierProtocol.dataSST) => GOTO truncated; --short
(sst # BulkData.bulkSST) => ERROR BUG[protocol]; --no way
(bytesTransferred # 0) => GOTO lostData; --some system data passed
ENDCASE => LOOP; --retry the get again
endRecord =>
--Without signalEndOfStream the end of record is reported to the caller as
--why = endOfStream. With it, EndOfStream is signalled, and signalled again
--each time the signal is resumed.
IF ~options.signalEndOfStream THEN {why ← endOfStream; EXIT}
ELSE DO SIGNAL Stream.EndOfStream[bytesTransferred]; ENDLOOP;
attention => GOTO truncated; --other guys bailing out
ENDCASE => ERROR BUG[bug]; --what else is there?
EXITS
lostData =>
BEGIN
--Bytes that were not bulk data were read into the block. Pass them to
--CourierOps.RedirectStream. The get is then repeated only for a user
--connection whose versExchProc is present and returns noError.
CourierOps.RedirectStream[ch, block, bytesTransferred, why, sst];
WITH vep: ch SELECT FROM
user => SELECT TRUE FROM
(vep.versExchProc = NIL) => GOTO truncated; --already been done
(vep.versExchProc[ch] # noError) => GOTO truncated;
ENDCASE; --version exchange and it's okay - do .get again
ENDCASE => GOTO truncated; --wasn't a user connection
bytesTransferred ← 0; --reset loop invariant
END;
END;
REPEAT
truncated =>
BEGIN
--Reached from the top of the loop, from the get results above, and from
--the lostData exit.
handle.use ← truncated; --record this bit of trivia
RETURN WITH ERROR ABORTED; --simply abort the client
END;
ENDLOOP;
END; --XStreamGet
XStreamPut: Stream.PutProcedure =
  BEGIN
  <<
  Put procedure that forwards the block to the stream returned by
  RealStream, unless the owning handle has been marked truncated. In the
  truncated case an attention carrying BulkData.attnByte is sent on sH,
  sH's delete procedure is replaced with DeleteNull, and ABORTED is raised.
  >>
  owner: Handle = NARROW[sH.clientData];  --the owning handle
  IF owner.use # truncated THEN
    BEGIN
    --put on the real stream; putting on sH itself would recurse
    real: Stream.Handle = RealStream[sH];
    real.put[real, block, endRecord];
    END
  ELSE
    BEGIN
    sH.sendAttention[sH, BulkData.attnByte];  --we see the close
    sH.delete ← DeleteNull;  --so we don't also send EOM
    ERROR ABORTED;  --never to return
    END;
  END;  --XStreamPut
END. --XStreamImpl
LOG ( date - person - action )
2-Jun-85 15:10:00 AOF Created file
19-Oct-86 12:40:45 AOF The store/fetch vs source/sink => gets/puts mapping
21-Oct-86 15:03:39 AOF Aborting (swapping attn packets and other lies).
31-Oct-86 11:46:10 AOF Reorder statements in Create to avoid race with attn.
26-Nov-86 10:14:31 AOF Get Copy to use PacketStream.
8-Jan-87 10:30:57 AOF Lighten up on checking for stream being "reused".
27-Jan-87 18:42:40 AOF AttentionProc catches Courier.Error, ~ConnectionSusp.
4-Jun-87 11:28:00 AOF Xlate BD error to Courier.Error in ActiveRendezvous.
4-Jun-87 12:27:23 AOF Allow for active to arrive earlier than passive.
3-Sep-87 14:49:34 AOF Watch out for NIL from psH.get in $Copy.
23-Sep-87 9:35:21 AOF Use Stream.defaultObject.* on for invalid stream ops.
23-Sep-87 19:46:30 AOF Help save client from him/her self.
9-Jan-88 17:59:34 AOF Playing the version # exchange game.
28-Jan-88 9:39:12 AOF AR#12728 - sending eom twice using $Copy.