-- File: NSDataStreamImpl.mesa - last edit:
-- AOF 9-Dec-87 19:59:46
-- Copyright (C) 1984, 1985, 1987 by Xerox Corporation. All rights reserved.
DIRECTORY
BulkDataTransfer USING [
Descriptor, immediateDescriptor, nullDescriptor, Sink, Source],
Courier USING [Description, Error, Handle, SystemElement],
CourierInternal USING [longZone],
CourierOps USING [
StackBlockHandle, StackBlockPush, stackPageLength, StackBlockPop],
Environment USING [Byte, bytesPerPage],
NSDataStream USING [
Couple, ErrorCode, Handle, SinkStream, Sink, Source, SourceStream, Ticket],
Process USING [Abort, CancelAbort, GetCurrent, Pause, SecondsToTicks, Yield],
Stream USING [
Attention, Block, Byte, CompletionCode, defaultInputOptions, defaultObject,
EndOfStream, InputOptions, GetProcedure, Handle, Object, PutProcedure,
SSTChange, SubSequenceType, TimeOut],
StreamCouple USING [Create];
NSDataStreamImpl: MONITOR
IMPORTS Courier, CourierInternal, CourierOps, Process, Stream, StreamCouple
EXPORTS NSDataStream =
BEGIN
---- ---- ---- ---- ---- ---- ---- ----
-- Types
---- ---- ---- ---- ---- ---- ---- ----
-- Which end of the transfer caused an abort; none while no abort is recorded.
-- Set by InitiateAbort (local initiative) and ProcessAbort (abort noticed).
AbortStatus: TYPE = {none, bySender, byReceiver};
-- Role of one end of a data stream. AssignGetAndPutProcs installs PutProcedure
-- for send and GetProcedure for receive.
Direction: TYPE = {send, receive};
DS: TYPE = LONG POINTER TO DSObject;
-- Per-stream state behind an NSDataStream.Handle. InternalCreate hands out
-- @ds.object as the handle and DSFromHandle LOOPHOLEs a handle back to a DS.
DSObject: TYPE = RECORD [
object: Stream.Object,
baseStream: Stream.Handle, -- underlying stream; NIL until supplied or found by GetStream
clientData: LONG UNSPECIFIED ← LONG[NIL],
deleteProc: PROC [Stream.Handle, LONG UNSPECIFIED] ← NIL,
deleteWithStatusProc: PROC [Stream.Handle, Status] ← NIL,
id: CARDINAL, -- compared against StreamRequestObject.id to pair the two ends
activated, endOfStreamEncountered, deleted, notifiedOfAbort, local,
attentionByteReceivedInBand, attentionByteReceivedOutOfBand,
courierRejectSeen: BOOLEAN ← FALSE,
aborted: AbortStatus ← none,
direction: Direction,
attentionWaiter: PROCESS RETURNS [BOOLEAN] ← NIL, -- forked in ActivateDS
rejectWatcher: PROCESS RETURNS [BOOLEAN] ← NIL, -- forked in ActivateDS for remote send streams
attentionByte: Stream.Byte ← NULL,
streamOpProcess: PROCESS ← NIL, -- set by StartStreamOp, cleared by EndStreamOp
seal: Seal ← okay]; -- DeleteDS sets busted; DSFromHandle rejects anything but okay
Seal: TYPE = MACHINE DEPENDENT{(0), okay(1987), busted, (LAST[NATURAL])};
ID: TYPE = CARDINAL;
Side: TYPE = {this, that};
-- Final outcome reported when a stream is freed (see FreeStream, WaitForDeletion).
Status: TYPE = RECORD [aborted: BOOLEAN];
StreamPair: TYPE = ARRAY Side OF Stream.Handle;
StreamRequest: TYPE = LONG POINTER TO StreamRequestObject;
-- One entry of srList. The procedures below BROADCAST transition after
-- changing variant so that waiters re-examine the state.
StreamRequestObject: TYPE = RECORD [
id: ID,
ds: DS,
transition: CONDITION,
variant: SELECT type: StreamRequestState FROM
requested => [direction: Direction],
registeredCancelled => [status: Status, cH: Courier.Handle],
registeredFree => [status: Status],
registered => [
systemElement: Courier.SystemElement,
cH: Courier.Handle,
direction: Direction],
registeredWaiting => [
ticket: NSDataStream.Ticket,
cH: Courier.Handle,
other: StreamRequest ← NIL],
registeredInUse => [stream: Stream.Handle, other: StreamRequest ← NIL],
inUse, free => [stream: Stream.Handle],
ENDCASE];
StreamRequestState: TYPE = {
requested, -- one end local
registered, -- one end remote
registeredWaiting, -- one end local, the other remote, no stream announced
registeredInUse, -- local-remote stream established
registeredCancelled, -- local-remote cancelled
registeredFree, -- local-remote: remote abort or end-of-stream received
inUse, -- local stream established (both ends)
free}; -- one end of local stream deleted
-- Singly linked list of stream requests; see srList and srListProcs.
SRList: TYPE = LONG POINTER TO SRObject;
SRObject: TYPE = RECORD [first: StreamRequest, rest: SRList];
-- Pointer to a link cell, which lets Find/Free unlink a node in place.
SRRef: TYPE = LONG POINTER TO SRList;
-- Follow-up work returned by CancelStream for CancelTicket to perform.
CancelStreamAction: TYPE = RECORD [
SELECT type: * FROM
none => NULL, waitForDeletion, terminate => [ds: DS], ENDCASE];
---- ---- ---- ---- ---- ---- ---- ----
-- Constants & Variables
---- ---- ---- ---- ---- ---- ---- ----
-- Last ID handed out by NewID.
currentID: ID ← 0;
-- Subsequence type set on the base stream for data (ActivateDS, InitiateAbort).
dataSST: Stream.SubSequenceType = 1;
-- Sentinel stored in DSObject.streamOpProcess by AnnounceAttention when a
-- Courier reject (actuallySeen = FALSE) arrives during a stream operation.
noProcess: PROCESS = LOOPHOLE[LAST[INTEGER]];
-- Timeout value passed to setTimeout by AnnounceStream and CreateFilter.
defaultTimeout: LONG CARDINAL = LAST[LONG CARDINAL];
immediateTicket: NSDataStream.Ticket = LOOPHOLE[
BulkDataTransfer.immediateDescriptor];
nullTicket: NSDataStream.Ticket = LOOPHOLE[
BulkDataTransfer.nullDescriptor];
-- Head of the list of outstanding stream requests.
srList: SRList ← NIL;
-- List primitives; filled in with [Find, Free, New] by the module main body.
srListProcs: RECORD [
Find: PROCEDURE [list: SRRef, element: StreamRequest] RETURNS [SRRef],
Free: PROCEDURE [list: SRRef, zone: UNCOUNTED ZONE ← NIL],
New: PROCEDURE [element: StreamRequest, list: SRList ← NIL] RETURNS [SRList]];
-- Sizes of the four BulkDataTransfer.Descriptor variants, handed to
-- notes.noteChoice by DescribeTicket through desTicketVariantSizes.
ticketVariantSizes: ARRAY [0..4) OF CARDINAL ← [
SIZE[null BulkDataTransfer.Descriptor], SIZE[
immediate BulkDataTransfer.Descriptor], SIZE[
passive BulkDataTransfer.Descriptor], SIZE[
active BulkDataTransfer.Descriptor]];
desTicketVariantSizes: LONG DESCRIPTOR FOR ARRAY CARDINAL OF CARDINAL ←
DESCRIPTOR[BASE[ticketVariantSizes], LENGTH[ticketVariantSizes]];
---- ---- ---- ---- ---- ---- ---- ----
-- SIGNALS
---- ---- ---- ---- ---- ---- ---- ----
-- Raised (via ErrorAborted) to tell the client that the transfer was aborted.
Aborted: PUBLIC ERROR = CODE;
-- Raised with tooManyTickets by OpenSink/OpenSource for unsupported tickets.
Error: PUBLIC ERROR [errorCode: NSDataStream.ErrorCode] = CODE;
-- Private error raised when the request list or a DS is in an unexpected state.
InternalError: ERROR = CODE;
---- ---- ---- ---- ---- ---- ---- ----
-- PUBLIC PROCEDURES
---- ---- ---- ---- ---- ---- ---- ----
Abort: PUBLIC PROCEDURE [stream: NSDataStream.Handle] =
  BEGIN
  -- Activate the stream and start an abort on it. If activation itself
  -- reports Aborted there is nothing left to initiate.
  ds: DS;
  ds ← GetActiveDS[stream ! Aborted => GOTO alreadyAborted];
  InitiateAbort[ds];
  EXITS alreadyAborted => NULL;
  END;
AnnounceStream: PUBLIC PROCEDURE [cH: Courier.Handle] =
  BEGIN
  -- Bind the Courier connection's stream to the request registered for cH,
  -- relay data when both ends are remote, then wait for the stream's deletion.
  mine, peer: DS;
  wasCancelled: BOOLEAN;
  cH.sH.setTimeout[cH.sH, defaultTimeout];
  [mine, peer, wasCancelled] ← FillInStream[cH];
  IF mine = NIL THEN RETURN; -- no request is registered for this connection
  IF wasCancelled THEN TerminateDataStream[mine];
  IF peer # NIL THEN
    SELECT mine.direction FROM
      send => CopyStreamToStream[to: mine, from: peer];
      receive => CopyStreamToStream[to: peer, from: mine];
      ENDCASE;
  [] ← WaitForDeletion[mine]
  END;
<<
AssertLocal acts like a zero-byte Get/Put, with two differences: it does
not block when the stream has not been activated yet, and it does not
raise Aborted.
>>
AssertLocal: PUBLIC PROCEDURE [stream: NSDataStream.Handle] =
  BEGIN
  [] ← GetActiveDS[stream: stream, notificationOnly: TRUE]
  END;
CancelTicket: PUBLIC PROCEDURE [
  ticket: NSDataStream.Ticket, cH: Courier.Handle] =
  BEGIN
  -- Cancel the request registered for cH, then carry out whatever follow-up
  -- CancelStream asks for once it has left the monitor.
  IF ticket = nullTicket THEN RETURN;
  BEGIN
  followUp: CancelStreamAction = CancelStream[ticket, cH];
  WITH act: followUp SELECT FROM
    waitForDeletion => [] ← WaitForDeletion[act.ds];
    terminate => TerminateDataStream[act.ds];
    none => NULL;
    ENDCASE;
  END
  END;
CreateCouple: PUBLIC PROCEDURE RETURNS [couple: NSDataStream.Couple] =
  BEGIN
  -- Both ends carry the same ID and have no base stream yet.
  sharedID: ID = NewID[];
  couple.source ← [InternalCreate[stream: NIL, direction: receive, id: sharedID]];
  couple.sink ← [InternalCreate[stream: NIL, direction: send, id: sharedID]];
  END;
DescribeTicket: PUBLIC Courier.Description --[notes: Notes]-- =
  BEGIN
  -- Courier description routine for a ticket: notes the descriptor's size and
  -- then its variant choice, using the table of per-variant sizes.
  descriptorSize: CARDINAL = SIZE[BulkDataTransfer.Descriptor];
  descriptor: LONG POINTER TO BulkDataTransfer.Descriptor = notes.noteSize[
    size: descriptorSize];
  notes.noteChoice[
    site: descriptor, size: descriptorSize, variant: desTicketVariantSizes];
  END;
Find: INTERNAL PROC[list: SRRef, element: StreamRequest] RETURNS [SRRef] =
  BEGIN
  -- Walk the link cells starting at list and return the cell whose node holds
  -- element, or NIL when element is not on the list.
  link: SRRef ← list;
  UNTIL link↑ = NIL DO
    IF link.first = element THEN RETURN[link];
    link ← @link.rest;
    ENDLOOP;
  RETURN[NIL]
  END; --Find
Free: INTERNAL PROC[list: SRRef, zone: UNCOUNTED ZONE ← NIL] =
  BEGIN
  -- Unlink the node that list refers to and release it. When zone is given,
  -- the node's element is released to that zone first.
  node: SRList ← list↑;
  IF zone # NIL[UNCOUNTED ZONE] THEN
    zone.FREE[@LOOPHOLE[node.first, LONG POINTER]];
  list↑ ← node.rest;
  CourierInternal.longZone.FREE[@node]
  END; --Free
New: INTERNAL PROC[element: StreamRequest, list: SRList ← NIL]
  RETURNS [handle: SRList] =
  BEGIN
  -- Allocate a node holding element and prepend it to list.
  handle ← CourierInternal.longZone.NEW[SRObject ← [first: element, rest: list]]
  END; --New
OpenSink: PUBLIC PROCEDURE [ticket: NSDataStream.Ticket, cH: Courier.Handle]
  RETURNS [NSDataStream.SinkStream] =
  BEGIN
  -- Turn a ticket into a sink stream: immediate tickets send over the Courier
  -- connection's own stream, null tickets give a NIL stream.
  descriptor: BulkDataTransfer.Sink ← LOOPHOLE[ticket];
  WITH descriptor SELECT FROM
    immediate =>
      RETURN[[CreateFilter[
        networkStream: cH.sH, direction: send, clientData: 0, deleteProc: NIL]]];
    null => RETURN[[[NIL]]];
    ENDCASE => ERROR Error[tooManyTickets] -- third party not implemented
  END;
OpenSource: PUBLIC PROCEDURE [ticket: NSDataStream.Ticket, cH: Courier.Handle]
  RETURNS [NSDataStream.SourceStream] =
  BEGIN
  -- Turn a ticket into a source stream: immediate tickets receive over the
  -- Courier connection's own stream, null tickets give a NIL stream.
  descriptor: BulkDataTransfer.Source ← LOOPHOLE[ticket];
  WITH descriptor SELECT FROM
    immediate =>
      RETURN[[CreateFilter[
        networkStream: cH.sH, direction: receive, clientData: 0,
        deleteProc: NIL]]];
    null => RETURN[[[NIL]]];
    ENDCASE => ERROR Error[tooManyTickets] -- third party not implemented
  END;
OperateOnSink: PUBLIC PROCEDURE [
  sink: NSDataStream.Sink, operation: PROCEDURE [NSDataStream.SinkStream]] =
  BEGIN
  -- Run operation against the sink. A proc sink is served through a local
  -- couple: operation is forked on the sink end while the client's procedure
  -- consumes the source end; the forked process is always joined.
  WITH target: sink SELECT FROM
    none => operation[[[NIL]]];
    stream => operation[target.stream];
    proc =>
      BEGIN
      pair: NSDataStream.Couple ← CreateCouple[];
      worker: PROCESS ← FORK operation[pair.sink];
      target.proc[pair.source ! UNWIND => JOIN worker];
      JOIN worker
      END;
    ENDCASE
  END;
OperateOnSource: PUBLIC PROCEDURE [
  source: NSDataStream.Source,
  operation: PROCEDURE [NSDataStream.SourceStream]] =
  BEGIN
  -- Run operation against the source. A proc source is served through a local
  -- couple: operation is forked on the source end while the client's procedure
  -- fills the sink end; the forked process is always joined.
  WITH origin: source SELECT FROM
    none => operation[[[NIL]]];
    stream => operation[origin.stream];
    proc =>
      BEGIN
      pair: NSDataStream.Couple ← CreateCouple[];
      worker: PROCESS ← FORK operation[pair.source];
      origin.proc[pair.sink ! UNWIND => JOIN worker];
      JOIN worker
      END;
    ENDCASE
  END;
-- Registers one end of a data stream for use by a remote system element and
-- returns the ticket to pass in the remote call. A NIL stream yields
-- nullTicket. The procedure may WAIT until the request leaves the registered
-- state. Note: useImmediateTicket is not referenced in this body.
Register: PUBLIC ENTRY PROCEDURE [
stream: NSDataStream.Handle, forUseAt: Courier.SystemElement,
cH: Courier.Handle, useImmediateTicket: BOOLEAN ← TRUE]
RETURNS [ticket: NSDataStream.Ticket ← immediateTicket] =
BEGIN
ENABLE UNWIND => NULL;
ds: DS ← DSFromHandle[stream];
otherStreamRequest: StreamRequest ← NIL;
streamRequest: StreamRequest;
IF ds = NIL THEN RETURN[nullTicket];
-- Look for a request already made for the other end (same id).
FOR list: SRList ← srList, list.rest UNTIL list = NIL[SRList] DO
streamRequest ← list.first;
IF streamRequest.id = ds.id THEN
WITH request: streamRequest SELECT FROM
requested =>
-- The other end is local and already has a request: convert that request
-- to registeredWaiting, discard this ds, and return the default ticket.
BEGIN
IF request.direction = ds.direction THEN ERROR InternalError[];
streamRequest.variant ← registeredWaiting[immediateTicket, cH];
BROADCAST streamRequest.transition;
DeleteDS[@ds];
RETURN
END;
registered =>
-- The other end was registered too; remember it and keep scanning.
BEGIN
IF request.direction = ds.direction THEN ERROR InternalError[];
otherStreamRequest ← streamRequest
END;
ENDCASE => ERROR InternalError[];
REPEAT
FINISHED =>
-- No request converted above: add a new request for this end.
BEGIN
streamRequest ← NewStreamRequest[ds.id, ds];
srList ← srListProcs.New[streamRequest, srList];
IF otherStreamRequest = NIL THEN
srList.first.variant ← registered[forUseAt, cH, ds.direction]
ELSE
-- Both ends are registered: link the two requests to each other and move
-- both to registeredWaiting.
BEGIN
srList.first.variant ← registeredWaiting[
immediateTicket, cH, otherStreamRequest];
WITH request: otherStreamRequest SELECT FROM
registered =>
BEGIN
otherCH: Courier.Handle ← request.cH;
otherStreamRequest.variant ← registeredWaiting[
immediateTicket, otherCH, streamRequest];
BROADCAST otherStreamRequest.transition
END;
ENDCASE => ERROR InternalError[]
END
END;
ENDLOOP;
-- Wait until the new request is no longer merely registered.
DO
WITH request: streamRequest SELECT FROM
registered => WAIT streamRequest.transition;
registeredWaiting => RETURN[request.ticket];
registeredCancelled => RETURN[immediateTicket]; -- he'll find out later
ENDCASE => InternalError[];
ENDLOOP
END;
-- Sets the timeout of the base stream underneath a non-local data stream.
--   stream: the data stream; it is activated first, and an Aborted raised by
--     activation is absorbed (the call then does nothing).
--   waitTimeInSeconds: timeout in seconds. It is converted to the base
--     stream's unit by multiplying by 1000. Values too large to convert
--     without overflowing LONG CARDINAL are clamped to defaultTimeout instead
--     of wrapping around to an arbitrary short timeout.
SetStreamTimeout: PUBLIC PROCEDURE [
  stream: NSDataStream.Handle, waitTimeInSeconds: LONG CARDINAL] =
  BEGIN
  ENABLE Aborted => CONTINUE;
  unitsPerSecond: LONG CARDINAL = 1000;
  -- Largest number of seconds whose conversion still fits in a LONG CARDINAL.
  maxSeconds: LONG CARDINAL = LAST[LONG CARDINAL] / unitsPerSecond;
  ds: DS ← GetActiveDS[stream];
  IF NOT ds.local THEN
    ds.baseStream.setTimeout[
      ds.baseStream,
      IF waitTimeInSeconds > maxSeconds THEN defaultTimeout
      ELSE waitTimeInSeconds * unitsPerSecond];
  END;
---- ---- ---- ---- ---- ---- ---- ----
-- PRIVATE PROCEDURES
---- ---- ---- ---- ---- ---- ---- ----
-- Brings a data stream into service on first use and reports a recorded abort.
--   ds: the stream to activate.
--   notificationOnly: passed to GetStream as dontWait; when TRUE a recorded
--     abort makes the procedure return quietly instead of raising Aborted.
-- Raises Aborted (via ErrorAborted) when the stream was aborted by the sender,
-- or by the receiver while this end is sending, unless notificationOnly.
ActivateDS: PROCEDURE [ds: DS, notificationOnly: BOOLEAN ← FALSE] =
BEGIN
IF ~ds.activated THEN
BEGIN
inputOptions: Stream.InputOptions ← Stream.defaultInputOptions;
ds.activated ← TRUE;
-- No base stream yet: obtain one through the request list.
IF ds.baseStream = NIL THEN
BEGIN
ds.baseStream ← GetStream[ds, notificationOnly];
ds.deleteWithStatusProc ← FreeStream;
END;
IF ds.baseStream = NIL THEN ds.activated ← FALSE -- AssertLocal case
ELSE {
inputOptions.terminateOnEndRecord ← TRUE;
ds.baseStream.options ← inputOptions;
IF ds.direction = send THEN
BEGIN
ds.baseStream.setSST[ds.baseStream, dataSST];
-- Only remote send streams get a reject watcher.
IF NOT ds.local THEN ds.rejectWatcher ← FORK RejectWatcher[ds]
END;
ds.attentionWaiter ← FORK AttentionWaiter[ds]}
END;
IF ds.aborted = bySender
OR (ds.aborted = byReceiver AND ds.direction = send) THEN
IF notificationOnly THEN RETURN ELSE ErrorAborted[ds];
END;
AnnounceAttention: ENTRY PROCEDURE [ds: DS, actuallySeen: BOOLEAN] =
  BEGIN
  -- Record an out-of-band attention (actuallySeen) or a Courier reject, and
  -- interrupt the process currently inside a stream operation on a remote
  -- stream. Afterwards streamOpProcess is noProcess if an operation was
  -- pending and this was a reject, otherwise NIL.
  opInProgress: BOOLEAN;
  IF actuallySeen THEN ds.attentionByteReceivedOutOfBand ← TRUE
  ELSE ds.courierRejectSeen ← TRUE;
  opInProgress ← ds.streamOpProcess # NIL AND ds.streamOpProcess # noProcess;
  IF opInProgress AND ~ds.local THEN Process.Abort[ds.streamOpProcess];
  ds.streamOpProcess ←
    IF ds.streamOpProcess # NIL AND NOT actuallySeen THEN noProcess ELSE NIL
  END;
AssignGetAndPutProcs: PROCEDURE [direction: Direction]
  RETURNS [Stream.GetProcedure, Stream.PutProcedure] =
  BEGIN
  -- A send stream gets this module's PutProcedure and the default get; a
  -- receive stream gets this module's GetProcedure and the default put.
  IF direction = send THEN
    RETURN[
      Stream.defaultObject.get, LOOPHOLE[PutProcedure, Stream.PutProcedure]]
  ELSE
    RETURN[
      LOOPHOLE[GetProcedure, Stream.GetProcedure], Stream.defaultObject.put]
  END;
AttentionWaiter: PROCEDURE [ds: DS] RETURNS [received: BOOLEAN ← FALSE] =
  BEGIN
  -- Forked per stream: waits for an attention on the base stream and reports
  -- it. ABORTED or Courier.Error end the wait; received tells the joiner
  -- whether an attention actually arrived.
  ENABLE ABORTED, Courier.Error => CONTINUE;
  [] ← ds.baseStream.waitAttention[ds.baseStream];
  received ← TRUE;
  AnnounceAttention[ds: ds, actuallySeen: TRUE]
  END;
-- Cancels the stream request(s) associated with Courier handle cH and tells
-- the caller what remains to be done outside the monitor.
--   ticket: not examined here (CancelTicket filters out the null ticket).
--   cH: Courier handle the request was registered with.
-- Returns none when nothing further is needed, waitForDeletion[ds] for a
-- cancelled local-remote transfer, or terminate[ds] when the partner of a
-- remote-remote transfer is already in use.
-- Raises InternalError if a partner request is in an unexpected state.
CancelStream: ENTRY PROCEDURE [
  ticket: NSDataStream.Ticket, cH: Courier.Handle]
  RETURNS [action: CancelStreamAction ← [none[]]] =
  BEGIN
  ENABLE UNWIND => NULL;
  -- Frees both the data stream object and its entry on srList.
  DeleteStreamAndRequest: PROCEDURE [streamRequest: StreamRequest] =
    BEGIN DeleteDS[@streamRequest.ds]; FreeStreamRequest[streamRequest]; END;
  streamRequest: StreamRequest;
  -- Successor of the node being examined. It is captured before the body runs
  -- because DeleteStreamAndRequest releases the current node, so stepping with
  -- list.rest afterwards would read freed storage.
  next: SRList ← NIL;
  FOR list: SRList ← srList, next UNTIL list = NIL[SRList] DO
    next ← list.rest;
    streamRequest ← list.first;
    WITH request: streamRequest SELECT FROM
      registeredWaiting =>
        BEGIN
        otherStreamRequest: StreamRequest ← request.other;
        IF request.cH # cH THEN LOOP;
        IF otherStreamRequest = NIL THEN -- local-remote data transfer
          BEGIN
          streamRequest.ds.aborted ← bySender;
          streamRequest.variant ← registeredCancelled[[TRUE], cH];
          BROADCAST streamRequest.transition;
          RETURN[[waitForDeletion[streamRequest.ds]]]
          END
        ELSE -- remote-remote data transfer
          BEGIN
          DeleteStreamAndRequest[streamRequest];
          WITH otherRequest: otherStreamRequest SELECT FROM
            registeredWaiting => {
              otherCH: Courier.Handle ← otherRequest.cH;
              otherStreamRequest.variant ← registeredCancelled[
                [TRUE], otherCH];
              BROADCAST otherStreamRequest.transition};
            registeredInUse => RETURN[[terminate[otherRequest.ds]]];
            ENDCASE => ERROR InternalError[];
          END;
        EXIT
        END;
      registeredCancelled =>
        -- Already cancelled from the other side: just discard it and go on.
        IF request.cH = cH THEN DeleteStreamAndRequest[streamRequest];
      ENDCASE
    ENDLOOP
  END;
CheckForAbortBefore: PROCEDURE [ds: DS] =
  BEGIN
  -- Mark the start of a stream operation; if an attention or reject has
  -- already been noted, treat it as an abort right away.
  alreadyAborted: BOOLEAN = StartStreamOp[ds].aborted;
  IF alreadyAborted THEN ProcessAbort[ds: ds, receivedInBand: FALSE]
  END;
CheckForAbortAfter: PROCEDURE [ds: DS, abortSeen: BOOLEAN] =
  BEGIN
  -- Mark the end of a stream operation and turn an attention or Courier
  -- reject noted during it into an abort.
  opAborted: BOOLEAN;
  Process.Yield[]; -- Let the attention waiter run before looking.
  opAborted ← EndStreamOp[ds].aborted;
  IF opAborted AND NOT (abortSeen OR ds.local) THEN {
    -- The ABORTED aimed at this process has not shown up yet; pause so that
    -- it is flushed here.
    Process.Pause[Process.SecondsToTicks[60] ! ABORTED => CONTINUE]};
  IF opAborted OR ds.courierRejectSeen THEN
    ProcessAbort[ds: ds, receivedInBand: FALSE]
  END;
-- Checks whether an out-of-band attention is available on the base stream
-- and records the answer in ds.attentionByteReceivedOutOfBand.
-- Returns TRUE when waitAttention completed normally.
CheckForOutOfBandAttention: PROCEDURE [ds: DS]
RETURNS [receivedIt: BOOLEAN ← FALSE] =
BEGIN
abortSeen: BOOLEAN ← FALSE;
self: PROCESS = Process.GetCurrent[];
-- NOTE(review): the process aborts itself before waiting, presumably so that
-- waitAttention raises ABORTED instead of blocking when no attention is
-- pending. Confirm against the Stream and Process semantics.
Process.Abort[self];
BEGIN
ENABLE
BEGIN
ABORTED => {abortSeen ← TRUE; CONTINUE};
Stream.TimeOut, Courier.Error => CONTINUE;
END;
[] ← ds.baseStream.waitAttention[ds.baseStream];
receivedIt ← TRUE
END;
-- The self-inflicted abort was not consumed, so withdraw it.
IF NOT abortSeen THEN Process.CancelAbort[self];
ds.attentionByteReceivedOutOfBand ← receivedIt;
END;
-- Finishes the transfer on the base stream when a data stream is deleted.
-- Courier.Error and Stream.TimeOut are routed to HandleProblem with
-- streamOK = FALSE, which raises Aborted.
Close: PROCEDURE [ds: DS] =
BEGIN
ENABLE
Courier.Error, Stream.TimeOut => HandleProblem[ds, FALSE];
SynchronizeAttentions[ds];
-- Only the sending end has anything left to transmit.
IF ds.direction = send THEN
SELECT ds.aborted FROM
none =>
-- Normal completion: push out the remaining data. An attention arriving
-- now is recorded and synchronized; Aborted is raised unless the client
-- has already been notified.
ds.baseStream.sendNow[ds.baseStream, TRUE !
Stream.Attention => {
ds.attentionByteReceivedOutOfBand ← TRUE;
SynchronizeAttentions[ds];
IF ds.notifiedOfAbort THEN CONTINUE ELSE ErrorAborted[ds]}];
-- The receiver aborted: answer with an attention.
byReceiver => ds.baseStream.sendAttention[ds.baseStream, 1];
ENDCASE
END;
-- Relays data between two data streams (used by AnnounceStream when both
-- ends are remote) until end of stream or an abort, then deletes both.
--   to: the stream that data is put to.
--   from: the stream that data is got from.
-- An abort reported by one stream is propagated to the other with Abort.
-- The stack block used as the copy buffer is released exactly once, on normal
-- completion and also when some other signal (for instance ABORTED re-raised
-- by the get or put procedure) unwinds through the copy loop.
CopyStreamToStream: PROCEDURE [to, from: DS] =
  BEGIN
  abortStatus: AbortStatus ← none;
  toStream: NSDataStream.Handle ← HandleFromDS[to];
  fromStream: NSDataStream.Handle ← HandleFromDS[from];
  buffer: CourierOps.StackBlockHandle ← CourierOps.StackBlockPush[NIL];
  bufferHeld: BOOLEAN ← TRUE;
  bufferSize: INTEGER = CourierOps.stackPageLength * Environment.bytesPerPage;
  block: Stream.Block ← [
    blockPointer: LOOPHOLE[buffer],
    startIndex: 0, stopIndexPlusOne: bufferSize];
  whyStop: Stream.CompletionCode ← normal;
  -- Gives the buffer back; safe to call more than once.
  ReleaseBuffer: PROCEDURE =
    BEGIN
    IF bufferHeld THEN {
      bufferHeld ← FALSE;
      buffer.nextBlock ← NIL; [] ← CourierOps.StackBlockPop[buffer]}
    END;
  BEGIN
  ENABLE UNWIND => ReleaseBuffer[];
  UNTIL whyStop = endOfStream DO
    block.stopIndexPlusOne ← bufferSize;
    -- The number of bytes read becomes the end of the block to write.
    [bytesTransferred: block.stopIndexPlusOne, why: whyStop] ←
      fromStream.get[fromStream, block, fromStream.options !
        Aborted => {abortStatus ← bySender; EXIT}];
    toStream.put[toStream, block, FALSE !
      Aborted => {abortStatus ← byReceiver; EXIT}];
    ENDLOOP;
  END;
  ReleaseBuffer[];
  IF abortStatus = bySender THEN Abort[toStream];
  toStream.delete[toStream ! Aborted => {abortStatus ← byReceiver; CONTINUE}];
  IF abortStatus = byReceiver THEN Abort[fromStream];
  fromStream.delete[fromStream]
  END;
CreateFilter: PROCEDURE [
  networkStream: Stream.Handle, direction: Direction,
  clientData: LONG UNSPECIFIED,
  deleteProc: PROC [Stream.Handle, LONG UNSPECIFIED]]
  RETURNS [NSDataStream.Handle] =
  BEGIN
  -- Wrap an existing network stream in a data stream with a fresh ID, after
  -- giving the network stream the default timeout.
  networkStream.setTimeout[networkStream, defaultTimeout];
  RETURN[
    InternalCreate[
      stream: networkStream, direction: direction, id: NewID[],
      clientData: clientData, deleteProc: deleteProc]]
  END;
-- Delete procedure installed in every data stream object by InternalCreate.
-- Tears the stream down and frees its DSObject.
-- Raises Aborted after the teardown when an abort was detected during it and
-- the client had not been notified of an abort before.
DeleteProcedure: PROCEDURE [stream: NSDataStream.Handle] =
BEGIN
aborted: BOOLEAN ← FALSE;
ds: DS ← DSFromHandle[stream];
notifiedOfAbort: BOOLEAN ← ds.notifiedOfAbort;
ds.deleted ← TRUE;
-- Activation may raise Aborted, which would mark the client as notified;
-- restore the previous value so the decision at the end is not affected.
ActivateDS[
ds ! Aborted => {ds.notifiedOfAbort ← notifiedOfAbort; CONTINUE}];
-- A receiver deleting before end of stream aborts the transfer.
IF ds.direction = receive AND ~ds.endOfStreamEncountered THEN Abort[stream];
notifiedOfAbort ← ds.notifiedOfAbort;
IF ds.baseStream # NIL THEN
BEGIN
IF ds.deleteProc # NIL THEN ds.deleteProc[ds.baseStream, ds.clientData];
-- Stop and collect the helper processes forked by ActivateDS.
IF ds.rejectWatcher # NIL THEN
BEGIN
Process.Abort[ds.rejectWatcher];
[] ← JOIN ds.rejectWatcher;
ds.rejectWatcher ← NIL;
END;
IF ds.attentionWaiter # NIL THEN
BEGIN
Process.Abort[ds.attentionWaiter];
-- AttentionWaiter returns TRUE when an attention was actually received.
aborted ← JOIN ds.attentionWaiter;
ds.attentionWaiter ← NIL;
END;
Close[ds ! Aborted => {aborted ← TRUE; CONTINUE}];
IF ds.deleteWithStatusProc # NIL THEN
ds.deleteWithStatusProc[ds.baseStream, [ds.aborted # none]];
END;
DeleteDS[@ds];
IF aborted AND NOT notifiedOfAbort THEN ERROR Aborted
END;
DeleteDS: PROCEDURE [ds: LONG POINTER TO DS] =
  BEGIN
  -- Invalidate the seal first so that a stale handle is rejected by
  -- DSFromHandle, then return the object to the zone.
  ds↑.seal ← busted;
  CourierInternal.longZone.FREE[ds]
  END;
-- Converts a client handle back to the DS it was created from.
-- Returns NIL for a NIL handle; raises InternalError when the object's seal
-- is not okay (DeleteDS sets it to busted before freeing).
DSFromHandle: PROCEDURE [stream: NSDataStream.Handle] RETURNS [DS] = INLINE
BEGIN
OPEN ds: LOOPHOLE[stream, DS];
SELECT TRUE FROM
(stream = NIL) => RETURN[NIL]; --that means it's a null ticket
(ds.seal # okay) => ERROR InternalError; --somebody deleted it!
ENDCASE => RETURN[@ds]; --give back the coerced handle
END; --DSFromHandle
EndStreamOp: ENTRY PROCEDURE [ds: DS] RETURNS [aborted: BOOLEAN] =
  BEGIN
  -- The operation counts as aborted when AnnounceAttention has replaced the
  -- process recorded by StartStreamOp with NIL or noProcess.
  IF ds.streamOpProcess = NIL THEN aborted ← TRUE
  ELSE aborted ← (ds.streamOpProcess = noProcess);
  ds.streamOpProcess ← NIL
  END;
ErrorAborted: PROCEDURE [ds: DS] =
  BEGIN
  -- Remember that the client is being told about the abort, then tell it.
  ds.notifiedOfAbort ← TRUE;
  ERROR Aborted
  END;
-- Attaches the Courier connection's stream (cH.sH) to the request that is
-- waiting for, or was cancelled on, that connection.
-- Returns thisDS = NIL when no such request exists. Otherwise thisDS is the
-- request's data stream, otherDS is the partner's data stream when the
-- partner request is already registeredInUse (else NIL), and cancelled tells
-- whether the request had been cancelled.
FillInStream: ENTRY PROCEDURE [cH: Courier.Handle]
RETURNS [thisDS, otherDS: DS ← NIL, cancelled: BOOLEAN ← FALSE] =
BEGIN
ENABLE UNWIND => NULL;
streamRequest, otherRequest: StreamRequest;
FOR list: SRList ← srList, list.rest UNTIL list = NIL[SRList] DO
streamRequest ← list.first;
thisDS ← streamRequest.ds;
WITH request: streamRequest SELECT FROM
registeredWaiting =>
IF request.cH = cH THEN
BEGIN
otherRequest ← request.other;
otherDS ←
IF otherRequest # NIL AND otherRequest.type = registeredInUse THEN
otherRequest.ds ELSE NIL;
BROADCAST streamRequest.transition;
EXIT
END;
registeredCancelled =>
IF request.cH = cH THEN {otherRequest ← NIL; cancelled ← TRUE; EXIT};
ENDCASE;
-- thisDS was overwritten on every iteration, so reset it when nothing matched.
REPEAT FINISHED => RETURN[thisDS: NIL]
ENDLOOP;
streamRequest.variant ← registeredInUse[cH.sH, otherRequest];
thisDS.baseStream ← cH.sH;
thisDS.deleteWithStatusProc ← FreeStream;
-- The direction is flipped for a remote-remote transfer and for a cancelled
-- request.
IF otherRequest # NIL OR cancelled THEN ReverseDirection[thisDS];
BROADCAST streamRequest.transition;
END;
-- Reads and discards input from the base stream, one byte at a time, until
-- either the end of the stream or an in-band attention has been seen.
--   waitIfNecessary: when FALSE on a non-local stream the timeout is set to 0
--     first.
-- Courier.Error and Stream.TimeOut end the flush and are recorded as if the
-- in-band attention had been received.
FlushToEndOfStream: PROCEDURE [ds: DS, waitIfNecessary: BOOLEAN] =
BEGIN
sH: Stream.Handle ← ds.baseStream;
IF NOT (ds.local OR waitIfNecessary) THEN sH.setTimeout[sH, 0];
BEGIN
ENABLE
Courier.Error, Stream.TimeOut => {
ds.attentionByteReceivedInBand ← TRUE; CONTINUE};
array: PACKED ARRAY [0..1] OF Environment.Byte;
why: Stream.CompletionCode;
sst: Stream.SubSequenceType;
firstTime: BOOLEAN ← TRUE; -- workaround for lost first attention byte.
UNTIL ds.endOfStreamEncountered OR ds.attentionByteReceivedInBand DO
-- The block [@array, 1, 2] covers the single byte at index 1.
[, why, sst] ← sH.get[sH, [@array, 1, 2], sH.options];
SELECT why FROM
normal =>
-- After the first data byte on a remote stream, check once for an
-- out-of-band attention; if there is one, stop waiting on later gets.
IF firstTime THEN {
firstTime ← FALSE;
IF NOT ds.local AND CheckForOutOfBandAttention[ds] THEN
sH.setTimeout[sH, 0]};
endRecord, endOfStream => ds.endOfStreamEncountered ← TRUE;
-- A change to anything but the data SST is treated like an attention.
sstChange =>
IF sst # dataSST THEN ds.attentionByteReceivedInBand ← TRUE;
-- Consume the attention byte itself.
attention => {
[] ← sH.get[sH, [@array, 1, 2], sH.options];
ds.attentionByteReceivedInBand ← TRUE};
ENDCASE;
ENDLOOP
END;
END;
-- deleteWithStatusProc installed by ActivateDS and FillInStream: records in
-- the request list that the data stream using this base stream is gone.
--   stream: the base stream of the deleted data stream.
--   status: whether the transfer was aborted.
-- Raises InternalError when no matching request (or partner) is found.
FreeStream: ENTRY PROCEDURE [stream: Stream.Handle, status: Status] =
BEGIN
ENABLE UNWIND => NULL;
FOR list: SRList ← srList, list.rest UNTIL list = NIL[SRList] DO
streamRequest: StreamRequest ← list.first;
WITH request: streamRequest SELECT FROM
-- Local-remote stream: publish the final status for WaitForDeletion.
registeredInUse =>
BEGIN
IF request.stream # stream THEN LOOP;
streamRequest.variant ← registeredFree[status];
BROADCAST streamRequest.transition;
RETURN;
END;
-- Local-local stream: mark this end free, then find the partner request
-- (same id, different request) with an inner scan that shadows list.
inUse =>
BEGIN
IF request.stream # stream THEN LOOP;
streamRequest.variant ← free[stream];
FOR list: SRList ← srList, list.rest UNTIL list = NIL[SRList] DO
IF list.first # streamRequest AND list.first.id = request.id THEN
DO
SELECT list.first.type FROM
-- The partner went first and is waiting below: wake it up and leave the
-- cleanup to it.
free => {BROADCAST streamRequest.transition; RETURN};
-- This end is first: wait for the partner to become free, then delete
-- both base streams and both requests.
inUse =>
DO
WAIT list.first.transition;
WITH sr: list.first SELECT FROM
free =>
BEGIN
stream.delete[stream]; sr.stream.delete[sr.stream];
FreeStreamRequest[streamRequest];
FreeStreamRequest[list.first];
RETURN
END;
ENDCASE;
ENDLOOP;
ENDCASE => EXIT
ENDLOOP;
ENDLOOP;
InternalError[]
END;
ENDCASE;
ENDLOOP;
InternalError[]
END;
FreeStreamRequest: PROCEDURE [streamRequest: StreamRequest] =
  BEGIN
  -- Remove a request from srList and release it. If the request is linked to
  -- a partner, the partner's back pointer is cleared first so that it does
  -- not refer to freed storage.
  partner: StreamRequest ←
    WITH mine: streamRequest SELECT FROM
      registeredInUse => mine.other,
      registeredWaiting => mine.other,
      ENDCASE => NIL;
  IF partner # NIL THEN
    WITH theirs: partner SELECT FROM
      registeredInUse => theirs.other ← NIL;
      registeredWaiting => theirs.other ← NIL;
      ENDCASE;
  srListProcs.Free[
    list: srListProcs.Find[list: @srList, element: streamRequest],
    zone: CourierInternal.longZone]
  END;
GetActiveDS: PROCEDURE [
  stream: NSDataStream.Handle, notificationOnly: BOOLEAN ← FALSE]
  RETURNS [ds: DS] = INLINE
  BEGIN
  -- Convert the handle and make sure the stream has been activated.
  ds ← DSFromHandle[stream];
  ActivateDS[ds, notificationOnly]
  END;
-- Get procedure installed in receive streams by AssignGetAndPutProcs.
-- Reads from the base stream into block, using the base stream's own options
-- for the underlying get; the caller's options are consulted only for
-- signalEndOfStream.
-- Returns the byte count, the completion code (normal or endOfStream) and the
-- subsequence type.
-- Raises Stream.EndOfStream at end of data when options.signalEndOfStream,
-- Aborted when the transfer is aborted or the base stream fails, and ABORTED
-- again when the underlying get was interrupted by a process abort.
GetProcedure: PROCEDURE [
stream: NSDataStream.Handle, block: Stream.Block,
options: Stream.InputOptions]
RETURNS [
bytesTransferred: CARDINAL, why: Stream.CompletionCode,
sst: Stream.SubSequenceType] =
BEGIN
BEGIN
abortSeen: BOOLEAN ← FALSE;
ds: DS ← GetActiveDS[stream];
-- The end was reached by an earlier call: report it again without touching
-- the base stream.
IF ds.endOfStreamEncountered THEN {
IF options.signalEndOfStream THEN
ERROR Stream.EndOfStream[block.startIndex];
bytesTransferred ← 0;
why ← endOfStream;
sst ← dataSST}
ELSE
BEGIN
CheckForAbortBefore[ds];
BEGIN
ENABLE
BEGIN
Courier.Error --[truncatedTransfer] --, Stream.TimeOut =>
HandleProblem[DSFromHandle[stream], FALSE];
-- Close the stream operation before these signals leave the procedure.
Aborted, Stream.EndOfStream =>
CheckForAbortAfter[ds, abortSeen ! Aborted => CONTINUE]
END;
DO
[bytesTransferred, why, sst] ← ds.baseStream.get[
ds.baseStream, block, ds.baseStream.options !
ABORTED => {abortSeen ← TRUE; GOTO SeenAbort}];
SELECT why FROM
normal => EXIT;
-- A change to the data SST is skipped by getting again.
sstChange => IF sst # dataSST THEN GOTO unexpectedPacketSubtype;
-- End of data. If the block was filled exactly, this call reports normal
-- and the next call reports the end.
endRecord, endOfStream =>
IF bytesTransferred = block.stopIndexPlusOne - block.startIndex THEN
{why ← normal; ds.endOfStreamEncountered ← TRUE; EXIT}
ELSE {
why ← endOfStream;
ds.endOfStreamEncountered ← TRUE;
IF options.signalEndOfStream THEN
ERROR Stream.EndOfStream[block.startIndex + bytesTransferred]
-- NOTE(review): this RETURN leaves without calling CheckForAbortAfter, so
-- EndStreamOp does not run on this path. Confirm that this is intended.
ELSE RETURN};
attention => ProcessAbort[ds, TRUE]; -- should not return
ENDCASE => ERROR InternalError[];
REPEAT
unexpectedPacketSubtype => HandleProblem[ds, TRUE];
SeenAbort => NULL;
ENDLOOP
END;
CheckForAbortAfter[ds, abortSeen];
-- The abort was not caused by an attention, so pass it on to the caller.
IF abortSeen THEN ERROR ABORTED;
END
END
END;
-- Finds or creates the stream request for ds and returns the base stream the
-- data stream should use, waiting for one to become available.
--   dontWait: when TRUE, NIL is returned instead of waiting (a stream pair
--     created on the spot for two local ends is still returned).
-- Raises Aborted (via ErrorAborted) when the request was cancelled or freed
-- while waiting, and InternalError or ERROR for unexpected request states.
GetStream: ENTRY PROCEDURE [ds: DS, dontWait: BOOLEAN ← FALSE]
RETURNS [stream: Stream.Handle] =
BEGIN
ENABLE UNWIND => NULL;
streamRequest: StreamRequest;
FOR ref: SRRef ← @srList, @ref.rest UNTIL ref↑ = NIL[SRList] DO
streamRequest ← ref.first;
IF streamRequest.id = ds.id THEN
WITH request: streamRequest SELECT FROM
-- The remote end registered first: take over its request, replacing the
-- data stream stored there with this one.
registered =>
BEGIN
cH: Courier.Handle ← request.cH;
IF request.direction = ds.direction THEN ERROR InternalError[];
DeleteDS[@streamRequest.ds];
streamRequest.ds ← ds;
streamRequest.variant ← registeredWaiting[immediateTicket, cH];
BROADCAST streamRequest.transition;
EXIT
END;
registeredWaiting =>
IF request.other = NIL THEN EXIT ELSE ERROR InternalError[];
registeredInUse =>
IF request.other = NIL THEN EXIT ELSE ERROR InternalError[];
inUse => EXIT;
requested =>
IF request.direction = ds.direction THEN EXIT
ELSE
-- The other local end is already waiting: create a stream pair, give each
-- end one side and mark both data streams as local.
BEGIN
streamPair: StreamPair = GetStreamPair[];
newStreamRequest: StreamRequest ← NewStreamRequest[ds.id, ds];
streamRequest.ds.local ← ds.local ← TRUE;
newStreamRequest.variant ← inUse[streamPair[this]];
srList ← srListProcs.New[newStreamRequest, srList];
streamRequest.variant ← inUse[streamPair[that]];
BROADCAST streamRequest.transition;
RETURN[streamPair[this]]
END;
registeredCancelled => EXIT;
ENDCASE => ERROR;
REPEAT
-- Nothing known about this id yet: record a request for this end.
FINISHED =>
BEGIN
streamRequest ← NewStreamRequest[ds.id, ds, ds.direction];
srList ← srListProcs.New[streamRequest, srList]
END
ENDLOOP;
IF dontWait THEN RETURN[NIL];
-- Wait for the request to reach a state that carries a stream or an abort.
DO
WITH request: streamRequest SELECT FROM
requested, registeredWaiting => WAIT streamRequest.transition;
inUse => RETURN[request.stream];
registeredInUse => RETURN[request.stream];
registeredCancelled => {
status: Status ← request.status;
streamRequest.variant ← registeredFree[status];
BROADCAST streamRequest.transition;
ErrorAborted[request.ds]};
registeredFree => ErrorAborted[request.ds];
ENDCASE => InternalError[];
ENDLOOP;
END;
GetStreamPair: PROCEDURE RETURNS [streams: StreamPair] =
  BEGIN
  -- Create a connected pair of streams and file the two ends under
  -- this and that, in the order StreamCouple.Create returns them.
  near, far: Stream.Handle;
  [near, far] ← StreamCouple.Create[];
  streams[this] ← near;
  streams[that] ← far
  END;
HandleFromDS: PROCEDURE [ds: DS] RETURNS [NSDataStream.Handle] = INLINE
  BEGIN
  -- Inverse of DSFromHandle: reinterpret the DS pointer as a client handle.
  RETURN[LOOPHOLE[ds, NSDataStream.Handle]]
  END;
HandleProblem: PROCEDURE [ds: DS, streamOK: BOOLEAN ← TRUE] =
  BEGIN
  -- Turn a failure into an abort and always raise Aborted. With a usable
  -- stream the abort is announced to the other end; with a broken stream it
  -- is only recorded locally, as if the in-band attention had been received.
  IF streamOK THEN {IF ds.aborted = none THEN InitiateAbort[ds]}
  ELSE {
    ds.attentionByteReceivedInBand ← TRUE;
    IF ds.aborted = none THEN ds.aborted ← bySender};
  ErrorAborted[ds]
  END;
InitiateAbort: PROCEDURE [ds: DS] =
  BEGIN
  -- Record that this end aborts the transfer and send an attention to the
  -- other end. Nothing happens if an abort is already recorded or the end of
  -- the stream was reached. Stream.TimeOut and Courier.Error are ignored.
  ENABLE Stream.TimeOut, Courier.Error => CONTINUE;
  IF ds.aborted = none AND NOT ds.endOfStreamEncountered THEN {
    SELECT ds.direction FROM
      send => ds.aborted ← bySender;
      receive => ds.aborted ← byReceiver;
      ENDCASE;
    ds.baseStream.setSST[ds.baseStream, dataSST];
    ds.baseStream.sendAttention[ds.baseStream, 1]}
  END;
InternalCreate: PROCEDURE [
  stream: Stream.Handle, direction: Direction, id: ID,
  clientData: LONG UNSPECIFIED ← LONG[NIL],
  deleteProc: PROC [Stream.Handle, LONG UNSPECIFIED] ← NIL]
  RETURNS [NSDataStream.Handle] =
  BEGIN
  -- Allocate a data stream object, install this module's get, put and delete
  -- procedures in its stream object, and return a handle to that object.
  newDS: DS ← CourierInternal.longZone.NEW[
    DSObject ← [
      object: Stream.defaultObject, baseStream: stream,
      clientData: clientData, deleteProc: deleteProc, id: id,
      direction: direction]];
  newDS.object.delete ← LOOPHOLE[DeleteProcedure];
  [newDS.object.get, newDS.object.put] ← AssignGetAndPutProcs[direction];
  RETURN[[@newDS.object]]
  END;
NewID: ENTRY PROCEDURE RETURNS [ID] =
  BEGIN
  -- Advance the counter and return its new value.
  currentID ← currentID + 1;
  RETURN[currentID]
  END;
NewStreamRequest: PROCEDURE [
  id: ID, ds: DS ← NIL, direction: Direction ← send]
  RETURNS [streamRequest: StreamRequest] =
  BEGIN
  -- Allocate a request in the requested state. The condition variable field
  -- is left out of the constructor.
  streamRequest ← CourierInternal.longZone.NEW[
    StreamRequestObject ← [id, ds, , requested[direction]]]
  END;
ProcessAbort: PROCEDURE [ds: DS, receivedInBand: BOOLEAN] =
  BEGIN
  -- Record an abort that originated at the other end, then raise Aborted.
  IF receivedInBand THEN ds.attentionByteReceivedInBand ← TRUE;
  SELECT ds.direction FROM
    send => ds.aborted ← byReceiver;
    receive => ds.aborted ← bySender;
    ENDCASE;
  ErrorAborted[ds]
  END;
-- Put procedure installed in send streams by AssignGetAndPutProcs.
-- Writes block to the base stream; an empty block returns at once. The
-- endPhysicalRecord argument is not forwarded: the base stream's put is
-- always called with FALSE.
-- Raises Aborted when the transfer is aborted or the base stream fails, and
-- ABORTED again when the underlying put was interrupted by a process abort.
PutProcedure: PROCEDURE [
stream: NSDataStream.Handle, block: Stream.Block,
endPhysicalRecord: BOOLEAN] =
BEGIN
abortSeen: BOOLEAN ← FALSE;
ds: DS ← GetActiveDS[stream];
IF block.stopIndexPlusOne = block.startIndex THEN RETURN;
CheckForAbortBefore[ds];
BEGIN
ENABLE
BEGIN
Courier.Error, Stream.TimeOut =>
HandleProblem[DSFromHandle[stream], FALSE];
-- Close the stream operation before Aborted leaves the procedure.
Aborted => CheckForAbortAfter[ds, abortSeen ! Aborted => CONTINUE]
END;
ds.baseStream.put[ds.baseStream, block, FALSE !
-- An attention from the receiver during the put means it aborted.
Stream.Attention => ProcessAbort[ds, FALSE];
ABORTED => {abortSeen ← TRUE; CONTINUE}]
END;
CheckForAbortAfter[ds, abortSeen];
-- The abort was not caused by an attention, so pass it on to the caller.
IF abortSeen THEN ERROR ABORTED;
END;
-- Forked by ActivateDS for remote send streams. Tries to read one byte from
-- the base stream and, unless the read ends through the abort exit, reports a
-- Courier reject with AnnounceAttention[ds, FALSE].
-- Returns TRUE when the reject was reported. ABORTED ends the watcher quietly.
RejectWatcher: PROCEDURE [ds: DS] RETURNS [received: BOOLEAN ← FALSE] =
BEGIN
ENABLE ABORTED => CONTINUE;
[] ← ds.baseStream.getByte[ds.baseStream !
-- A transport timeout is not a reject; any other Courier.Error continues
-- to the report below.
Courier.Error =>
IF errorCode = transportTimeout THEN GOTO abort ELSE CONTINUE;
Stream.SSTChange, Stream.TimeOut => GOTO abort];
received ← TRUE;
AnnounceAttention[ds, FALSE];
EXITS abort => NULL;
END;
ReverseDirection: INTERNAL PROCEDURE [ds: DS] =
  BEGIN
  -- Flip the stream's role and install the matching get and put procedures.
  flipped: Direction = IF ds.direction = send THEN receive ELSE send;
  ds.direction ← flipped;
  [ds.object.get, ds.object.put] ← AssignGetAndPutProcs[flipped]
  END;
StartStreamOp: ENTRY PROCEDURE [ds: DS] RETURNS [aborted: BOOLEAN ← FALSE] =
  BEGIN
  -- Register the calling process as the one performing a stream operation,
  -- unless an attention or a Courier reject has already been noted, in which
  -- case the operation is reported as aborted and nothing is registered.
  IF NOT (ds.attentionByteReceivedOutOfBand OR ds.courierRejectSeen) THEN
    ds.streamOpProcess ← Process.GetCurrent[]
  ELSE aborted ← TRUE
  END;
-- Called by Close to bring the in-band and out-of-band attention flags into
-- agreement before the stream is finished. Loops until no further flushing
-- or checking is called for.
SynchronizeAttentions: PROCEDURE [ds: DS] =
BEGIN
DO
-- In-band attention seen but not the out-of-band one: check for it once.
IF ds.attentionByteReceivedInBand
AND NOT ds.attentionByteReceivedOutOfBand THEN {
[] ← CheckForOutOfBandAttention[ds]; EXIT}
ELSE
SELECT ds.direction FROM
-- Sender: an out-of-band attention without its in-band counterpart means
-- input must be flushed (without waiting) to find it.
send =>
IF ds.attentionByteReceivedOutOfBand
AND NOT ds.attentionByteReceivedInBand THEN
FlushToEndOfStream[ds, FALSE]
ELSE EXIT;
-- Receiver: flush until the end of the stream or the in-band attention,
-- waiting only if no out-of-band attention has been seen.
receive =>
IF NOT (ds.attentionByteReceivedInBand OR ds.endOfStreamEncountered)
THEN FlushToEndOfStream[ds, NOT ds.attentionByteReceivedOutOfBand]
ELSE EXIT;
ENDCASE
ENDLOOP
END;
TerminateDataStream: PROCEDURE [ds: DS] =
  BEGIN
  -- Abort the stream and delete it, ignoring the Aborted that deletion may
  -- raise.
  handle: NSDataStream.Handle ← HandleFromDS[ds];
  Abort[handle];
  handle.delete[handle ! Aborted => CONTINUE]
  END;
-- Waits until the data stream's request reaches registeredFree, then removes
-- the request from the list.
-- Returns the status stored in the request when it was freed.
-- Raises InternalError when ds has no request or the request is in a state
-- other than registeredInUse, registeredCancelled or registeredFree.
WaitForDeletion: ENTRY PROCEDURE [ds: DS] RETURNS [status: Status] =
BEGIN
ENABLE UNWIND => NULL;
streamRequest: StreamRequest;
-- Locate the request by comparing data stream pointers.
FOR list: SRList ← srList, list.rest UNTIL list = NIL[SRList] DO
streamRequest ← list.first;
IF ds = streamRequest.ds THEN EXIT
REPEAT FINISHED => ERROR InternalError[]
ENDLOOP;
DO
WITH request: streamRequest SELECT FROM
registeredInUse, registeredCancelled => WAIT streamRequest.transition;
registeredFree => {status ← request.status; EXIT};
ENDCASE => InternalError[];
ENDLOOP;
FreeStreamRequest[streamRequest];
END;
-- Module initialization: bind the list primitives used through srListProcs.
srListProcs ← [Find: Find, Free: Free, New: New];
END.
LOG ( date - person - action )
20-Dec-84 15:31:10 - AOF - Post Klamath
27-May-85 12:07:32 - AOF - Starting rewrite
25-Nov-85 7:26:41 - AOF - AR# 7888