-- File: NSDataStreamImpl.mesa - last edit:
-- AOF                  9-Dec-87 19:59:46

-- Copyright (C) 1984, 1985, 1987 by Xerox Corporation. All rights reserved.

DIRECTORY
  BulkDataTransfer USING [
    Descriptor, immediateDescriptor, nullDescriptor, Sink, Source],
  Courier USING [Description, Error, Handle, SystemElement],
  CourierInternal USING [longZone],
  CourierOps USING [
    StackBlockHandle, StackBlockPush, stackPageLength, StackBlockPop],
  Environment USING [Byte, bytesPerPage],
  NSDataStream USING [
    Couple, ErrorCode, Handle, SinkStream, Sink, Source, SourceStream, Ticket],
  Process USING [Abort, CancelAbort, GetCurrent, Pause, SecondsToTicks, Yield],
  Stream USING [
    Attention, Block, Byte, CompletionCode, defaultInputOptions, defaultObject,
    EndOfStream, InputOptions, GetProcedure, Handle, Object, PutProcedure,
    SSTChange, SubSequenceType, TimeOut],
  StreamCouple USING [Create];

NSDataStreamImpl: MONITOR
  IMPORTS Courier, CourierInternal, CourierOps, Process, Stream, StreamCouple
  EXPORTS NSDataStream =
  BEGIN

  ---- ---- ---- ---- ---- ---- ---- ----
  -- Types
  ---- ---- ---- ---- ---- ---- ---- ----

  AbortStatus: TYPE = {none, bySender, byReceiver};

  Direction: TYPE = {send, receive};

  DS: TYPE = LONG POINTER TO DSObject;

  -- Per-stream state of a data stream filter.  "object" is the first field,
  -- so an NSDataStream.Handle (pointer to the Stream.Object) and a DS are
  -- LOOPHOLEd into one another (see DSFromHandle, HandleFromDS).
  DSObject: TYPE = RECORD [
    object: Stream.Object,
    baseStream: Stream.Handle,  -- underlying stream; NIL until known
    clientData: LONG UNSPECIFIED ← LONG[NIL],
    deleteProc: PROC [Stream.Handle, LONG UNSPECIFIED] ← NIL,
    deleteWithStatusProc: PROC [Stream.Handle, Status] ← NIL,
    id: CARDINAL,  -- matched against StreamRequestObject.id
    activated, endOfStreamEncountered, deleted, notifiedOfAbort, local,
      attentionByteReceivedInBand, attentionByteReceivedOutOfBand,
      courierRejectSeen: BOOLEAN ← FALSE,
    aborted: AbortStatus ← none,
    direction: Direction,
    attentionWaiter: PROCESS RETURNS [BOOLEAN] ← NIL,
    rejectWatcher: PROCESS RETURNS [BOOLEAN] ← NIL,
    attentionByte: Stream.Byte ← NULL,
    streamOpProcess: PROCESS ← NIL,  -- process inside a get/put, or noProcess
    seal: Seal ← okay];  -- set to busted by DeleteDS

  Seal: TYPE = MACHINE DEPENDENT{(0), okay(1987), busted, (LAST[NATURAL])};

  ID: TYPE = CARDINAL;

  Side: TYPE = {this, that};

  Status: TYPE = RECORD [aborted: BOOLEAN];

  StreamPair: TYPE = ARRAY Side OF Stream.Handle;

  StreamRequest: TYPE = LONG POINTER TO StreamRequestObject;

  StreamRequestObject: TYPE = RECORD [
    id: ID,
    ds: DS,
    transition: CONDITION,  -- BROADCAST whenever the variant changes
    variant: SELECT type: StreamRequestState FROM
      requested => [direction: Direction],
      registeredCancelled => [status: Status, cH: Courier.Handle],
      registeredFree => [status: Status],
      registered => [
        systemElement: Courier.SystemElement, cH: Courier.Handle,
        direction: Direction],
      registeredWaiting => [
        ticket: NSDataStream.Ticket, cH: Courier.Handle,
        other: StreamRequest ← NIL],
      registeredInUse => [stream: Stream.Handle, other: StreamRequest ← NIL],
      inUse, free => [stream: Stream.Handle],
      ENDCASE];

  StreamRequestState: TYPE = {
    requested,  -- one end local
    registered,  -- one end remote
    registeredWaiting,  -- one end local, the other remote, no stream announced
    registeredInUse,  -- local-remote stream established
    registeredCancelled,  -- local-remote cancelled
    registeredFree,  -- local-remote: remote abort or end-of-stream received
    inUse,  -- local stream established (both ends)
    free};  -- one end of local stream deleted

  SRList: TYPE = LONG POINTER TO SRObject;

  SRObject: TYPE = RECORD [first: StreamRequest, rest: SRList];

  SRRef: TYPE = LONG POINTER TO SRList;

  -- What CancelTicket must do once CancelStream has released the monitor.
  CancelStreamAction: TYPE = RECORD [
    SELECT type: * FROM
      none => NULL,
      waitForDeletion, terminate => [ds: DS],
      ENDCASE];

  ---- ---- ---- ---- ---- ---- ---- ----
  -- Constants & Variables
  ---- ---- ---- ---- ---- ---- ---- ----

  currentID: ID ← 0;

  dataSST: Stream.SubSequenceType = 1;

  -- Sentinel stored in DSObject.streamOpProcess by AnnounceAttention.
  noProcess: PROCESS = LOOPHOLE[LAST[INTEGER]];

  defaultTimeout: LONG CARDINAL = LAST[LONG CARDINAL];

  immediateTicket: NSDataStream.Ticket = LOOPHOLE[
    BulkDataTransfer.immediateDescriptor];

  nullTicket: NSDataStream.Ticket = LOOPHOLE[
    BulkDataTransfer.nullDescriptor];

  -- All outstanding stream requests; protected by the monitor lock.
  srList: SRList ← NIL;

  -- List primitives, bound to Find/Free/New by the module's main body.
  srListProcs: RECORD [
    Find: PROCEDURE [list: SRRef, element: StreamRequest] RETURNS [SRRef],
    Free: PROCEDURE [list: SRRef, zone: UNCOUNTED ZONE ← NIL],
    New: PROCEDURE [element: StreamRequest, list: SRList ← NIL]
      RETURNS [SRList]];

  ticketVariantSizes: ARRAY [0..4) OF CARDINAL ← [
    SIZE[null BulkDataTransfer.Descriptor], SIZE[
    immediate BulkDataTransfer.Descriptor], SIZE[
    passive BulkDataTransfer.Descriptor], SIZE[
    active BulkDataTransfer.Descriptor]];

  desTicketVariantSizes: LONG DESCRIPTOR FOR ARRAY CARDINAL OF CARDINAL ←
    DESCRIPTOR[BASE[ticketVariantSizes], LENGTH[ticketVariantSizes]];

  ---- ---- ---- ---- ---- ---- ---- ----
  -- SIGNALS
  ---- ---- ---- ---- ---- ---- ---- ----

  Aborted: PUBLIC ERROR = CODE;

  Error: PUBLIC ERROR [errorCode: NSDataStream.ErrorCode] = CODE;

  InternalError: ERROR = CODE;

  ---- ---- ---- ---- ---- ---- ---- ----
  -- PUBLIC PROCEDURES
  ---- ---- ---- ---- ---- ---- ---- ----

  -- Activates the stream if necessary and initiates an abort on it.  If
  -- activation itself raises Aborted, the call is abandoned quietly.
  Abort: PUBLIC PROCEDURE [stream: NSDataStream.Handle] = {
    InitiateAbort[GetActiveDS[stream ! Aborted => CONTINUE]]};

  -- Binds the Courier connection's stream to the request registered for cH
  -- (FillInStream), terminates it if the ticket was cancelled, copies
  -- between the two streams when another in-use request is paired with it,
  -- and then waits for the stream to be deleted.
  AnnounceStream: PUBLIC PROCEDURE [cH: Courier.Handle] =
    BEGIN
    thisDS, otherDS: DS;
    cancelled: BOOLEAN;
    cH.sH.setTimeout[cH.sH, defaultTimeout];
    [thisDS, otherDS, cancelled] ← FillInStream[cH];
    IF thisDS # NIL THEN {
      IF cancelled THEN TerminateDataStream[thisDS];
      IF otherDS # NIL THEN
        CopyStreamToStream[
          to: IF thisDS.direction = send THEN thisDS ELSE otherDS,
          from: IF thisDS.direction = receive THEN thisDS ELSE otherDS];
      [] ← WaitForDeletion[thisDS]}
    END;

  << AssertLocal is like a Get/Put of 0 bytes, except it won't block when
     the stream is not yet activated and it won't raise Aborted. >>

  AssertLocal: PUBLIC PROCEDURE [stream: NSDataStream.Handle] = {
    [] ← GetActiveDS[stream, TRUE]};

  -- Cancels the request associated with cH (a null ticket is a no-op).
  -- The monitor-protected part (CancelStream) only decides what to do; the
  -- follow-up action is carried out here.
  CancelTicket: PUBLIC PROCEDURE [
    ticket: NSDataStream.Ticket, cH: Courier.Handle] =
    BEGIN
    IF ticket # nullTicket THEN
      BEGIN
      action: CancelStreamAction = CancelStream[ticket, cH];
      WITH a: action SELECT FROM
        none => NULL;
        waitForDeletion => [] ← WaitForDeletion[a.ds];
        terminate => TerminateDataStream[a.ds];
        ENDCASE;
      END
    END;

  -- Creates a source/sink pair sharing one id.  Neither end has a base
  -- stream yet; it is obtained on first use (see ActivateDS/GetStream).
  CreateCouple: PUBLIC PROCEDURE RETURNS [couple: NSDataStream.Couple] =
    BEGIN
    id: ID ← NewID[];
    couple.source ← [InternalCreate[NIL, receive, id]];
    couple.sink ← [InternalCreate[NIL, send, id]];
    END;

  -- Courier description routine for a ticket (a BulkDataTransfer.Descriptor).
  DescribeTicket: PUBLIC Courier.Description --[notes: Notes]-- =
    BEGIN
    parameters: LONG POINTER TO BulkDataTransfer.Descriptor = notes.noteSize[
      size: SIZE[BulkDataTransfer.Descriptor]];
    notes.noteChoice[
      site: parameters, size: SIZE[BulkDataTransfer.Descriptor],
      variant: desTicketVariantSizes];
    END;

  -- Returns a reference to the link that points at the cell holding
  -- "element", or NIL if no cell holds it.
  Find: INTERNAL PROC[list: SRRef, element: StreamRequest] RETURNS [SRRef] =
    BEGIN
    FOR list ← list, @list.rest UNTIL list↑ = NIL DO
      IF list.first=element THEN RETURN[list] ENDLOOP;
    RETURN[NIL]
    END;  --Find

  -- Unlinks and frees the cell that "list" refers to.  When "zone" is
  -- supplied, the cell's element is freed to that zone first.
  Free: INTERNAL PROC[list: SRRef, zone: UNCOUNTED ZONE ← NIL] =
    BEGIN
    handle: SRList ← list↑;
    IF zone # NIL[UNCOUNTED ZONE] THEN
      zone.FREE[@LOOPHOLE[list.first, LONG POINTER]];
    list↑ ← list.rest;
    CourierInternal.longZone.FREE[@handle]
    END;  --Free

  -- Allocates a new cell holding "element" in front of "list".
  New: INTERNAL PROC[element: StreamRequest, list: SRList ← NIL]
    RETURNS [handle: SRList] =
    BEGIN
    handle ← CourierInternal.longZone.NEW[SRObject];
    handle↑ ← [element, list]
    END;  --New

  -- Opens the sink named by "ticket": NIL for a null ticket, a send filter
  -- over the Courier stream for an immediate ticket, otherwise an error.
  OpenSink: PUBLIC PROCEDURE [ticket: NSDataStream.Ticket, cH: Courier.Handle]
    RETURNS [NSDataStream.SinkStream] =
    BEGIN
    sink: BulkDataTransfer.Sink ← LOOPHOLE[ticket];
    WITH sink SELECT FROM
      null => RETURN[[[NIL]]];
      immediate => RETURN[[CreateFilter[cH.sH, send, 0, NIL]]];
      ENDCASE => ERROR Error[tooManyTickets]  -- third party not implemented
    END;

  -- Opens the source named by "ticket"; the receive-side twin of OpenSink.
  OpenSource: PUBLIC PROCEDURE [ticket: NSDataStream.Ticket, cH: Courier.Handle]
    RETURNS [NSDataStream.SourceStream] =
    BEGIN
    source: BulkDataTransfer.Source ← LOOPHOLE[ticket];
    WITH source SELECT FROM
      null => RETURN[[[NIL]]];
      immediate => RETURN[[CreateFilter[cH.sH, receive, 0, NIL]]];
      ENDCASE => ERROR Error[tooManyTickets]  -- third party not implemented
    END;

  -- Applies "operation" to the sink.  For a proc sink a couple is created:
  -- "operation" runs in a forked process on the couple's sink while the
  -- client's proc consumes the couple's source in this process.
  OperateOnSink: PUBLIC PROCEDURE [
    sink: NSDataStream.Sink, operation: PROCEDURE [NSDataStream.SinkStream]] =
    BEGIN
    WITH s: sink SELECT FROM
      proc =>
        BEGIN
        couple: NSDataStream.Couple ← CreateCouple[];
        p: PROCESS ← FORK operation[couple.sink];
        s.proc[couple.source ! UNWIND => JOIN p];  -- never orphan the fork
        JOIN p;
        RETURN
        END;
      stream => operation[s.stream];
      none => operation[[[NIL]]];
      ENDCASE
    END;

  -- Applies "operation" to the source; the mirror image of OperateOnSink.
  OperateOnSource: PUBLIC PROCEDURE [
    source: NSDataStream.Source,
    operation: PROCEDURE [NSDataStream.SourceStream]] =
    BEGIN
    WITH s: source SELECT FROM
      proc =>
        BEGIN
        couple: NSDataStream.Couple ← CreateCouple[];
        p: PROCESS ← FORK operation[couple.source];
        s.proc[couple.sink ! UNWIND => JOIN p];  -- never orphan the fork
        JOIN p;
        RETURN
        END;
      stream => operation[s.stream];
      none => operation[[[NIL]]];
      ENDCASE
    END;

  -- Registers "stream" for use over the Courier connection cH and returns
  -- the ticket to pass to the remote party (nullTicket for a NIL stream).
  -- Existing requests with the same id are examined first:
  --   requested  - the local end is already waiting; hand it over to cH,
  --                discard this DS and return immediateTicket.
  --   registered - remember it; the two registrations are paired below.
  -- Otherwise a new request is entered and the caller waits until it
  -- leaves the registered state.
  Register: PUBLIC ENTRY PROCEDURE [
    stream: NSDataStream.Handle, forUseAt: Courier.SystemElement,
    cH: Courier.Handle, useImmediateTicket: BOOLEAN ← TRUE]
    RETURNS [ticket: NSDataStream.Ticket ← immediateTicket] =
    BEGIN
    ENABLE UNWIND => NULL;
    ds: DS ← DSFromHandle[stream];
    otherStreamRequest: StreamRequest ← NIL;
    streamRequest: StreamRequest;
    IF ds = NIL THEN RETURN[nullTicket];
    FOR list: SRList ← srList, list.rest UNTIL list = NIL[SRList] DO
      streamRequest ← list.first;
      IF streamRequest.id = ds.id THEN
        WITH request: streamRequest SELECT FROM
          requested =>
            BEGIN
            IF request.direction = ds.direction THEN ERROR InternalError[];
            streamRequest.variant ← registeredWaiting[immediateTicket, cH];
            BROADCAST streamRequest.transition;
            DeleteDS[@ds];
            RETURN
            END;
          registered =>
            BEGIN
            IF request.direction = ds.direction THEN ERROR InternalError[];
            otherStreamRequest ← streamRequest
            END;
          ENDCASE => ERROR InternalError[];
      REPEAT
        FINISHED =>
          BEGIN
          streamRequest ← NewStreamRequest[ds.id, ds];
          srList ← srListProcs.New[streamRequest, srList];
          IF otherStreamRequest = NIL THEN
            srList.first.variant ← registered[forUseAt, cH, ds.direction]
          ELSE
            BEGIN
            -- Both ends are registered: link the two requests together.
            srList.first.variant ← registeredWaiting[
              immediateTicket, cH, otherStreamRequest];
            WITH request: otherStreamRequest SELECT FROM
              registered =>
                BEGIN
                otherCH: Courier.Handle ← request.cH;
                otherStreamRequest.variant ← registeredWaiting[
                  immediateTicket, otherCH, streamRequest];
                BROADCAST otherStreamRequest.transition
                END;
              ENDCASE => ERROR InternalError[]
            END
          END;
      ENDLOOP;
    DO
      WITH request: streamRequest SELECT FROM
        registered => WAIT streamRequest.transition;
        registeredWaiting => RETURN[request.ticket];
        registeredCancelled => RETURN[immediateTicket];  -- he'll find out later
        ENDCASE => InternalError[];
      ENDLOOP
    END;

  -- Sets the timeout of a non-local stream's base stream.  The value handed
  -- to the base stream is waitTimeInSeconds * 1000.  LONG CARDINAL
  -- multiplication wraps silently, so a request too large to scale is
  -- saturated at defaultTimeout (LAST[LONG CARDINAL]) instead of wrapping
  -- to a shorter timeout.  Aborted raised during activation is ignored.
  SetStreamTimeout: PUBLIC PROCEDURE [
    stream: NSDataStream.Handle, waitTimeInSeconds: LONG CARDINAL] =
    BEGIN
    ENABLE Aborted => CONTINUE;
    ds: DS ← GetActiveDS[stream];
    maxSeconds: LONG CARDINAL = defaultTimeout / 1000;  -- largest safe input
    IF NOT ds.local THEN
      ds.baseStream.setTimeout[
        ds.baseStream,
        IF waitTimeInSeconds > maxSeconds THEN defaultTimeout
        ELSE waitTimeInSeconds * 1000];
    END;

  ---- ---- ---- ---- ---- ---- ---- ----
  -- PRIVATE PROCEDURES
  ---- ---- ---- ---- ---- ---- ---- ----

  -- Ensures the DS has a base stream and its helper processes.  On first
  -- use it obtains the base stream from GetStream if none was supplied,
  -- installs the input options, sets the data SST on the send side and
  -- forks the attention (and, for non-local senders, reject) watchers.
  -- Raises Aborted if the stream is already aborted, unless
  -- notificationOnly is set.
  ActivateDS: PROCEDURE [ds: DS, notificationOnly: BOOLEAN ← FALSE] =
    BEGIN
    IF ~ds.activated THEN
      BEGIN
      inputOptions: Stream.InputOptions ← Stream.defaultInputOptions;
      ds.activated ← TRUE;
      IF ds.baseStream = NIL THEN
        BEGIN
        ds.baseStream ← GetStream[ds, notificationOnly];
        ds.deleteWithStatusProc ← FreeStream;
        END;
      IF ds.baseStream = NIL THEN ds.activated ← FALSE  -- AssertLocal case
      ELSE {
        inputOptions.terminateOnEndRecord ← TRUE;
        ds.baseStream.options ← inputOptions;
        IF ds.direction = send THEN
          BEGIN
          ds.baseStream.setSST[ds.baseStream, dataSST];
          IF NOT ds.local THEN ds.rejectWatcher ← FORK RejectWatcher[ds]
          END;
        ds.attentionWaiter ← FORK AttentionWaiter[ds]}
      END;
    IF ds.aborted = bySender OR (ds.aborted = byReceiver AND ds.direction = send)
      THEN IF notificationOnly THEN RETURN ELSE ErrorAborted[ds];
    END;

  -- Records that an attention (actuallySeen) or a Courier reject (otherwise)
  -- was observed.  If a process of a non-local stream is inside a stream
  -- operation it is aborted.  Afterwards streamOpProcess is NIL, or
  -- noProcess when an operation was in progress and this was a reject.
  AnnounceAttention: ENTRY PROCEDURE [ds: DS, actuallySeen: BOOLEAN] =
    BEGIN
    IF actuallySeen THEN ds.attentionByteReceivedOutOfBand ← TRUE
    ELSE ds.courierRejectSeen ← TRUE;
    SELECT TRUE FROM
      (ds.streamOpProcess = NIL), (ds.streamOpProcess = noProcess) => NULL;
      (~ds.local) => Process.Abort[ds.streamOpProcess];
      ENDCASE;
    ds.streamOpProcess ←
      IF ds.streamOpProcess # NIL AND NOT actuallySeen THEN noProcess ELSE NIL
    END;

  -- Selects the get/put procedures for a stream of the given direction: the
  -- filter's own procedure for the direction it supports and
  -- Stream.defaultObject's procedure for the other.
  AssignGetAndPutProcs: PROCEDURE [direction: Direction]
    RETURNS [Stream.GetProcedure, Stream.PutProcedure] =
    BEGIN
    PutProcs: ARRAY Direction OF Stream.PutProcedure = [
      LOOPHOLE[PutProcedure], Stream.defaultObject.put];
    GetProcs: ARRAY Direction OF Stream.GetProcedure = [
      Stream.defaultObject.get, LOOPHOLE[GetProcedure]];
    RETURN[GetProcs[direction], PutProcs[direction]]
    END;

  -- Forked by ActivateDS.  Waits for an attention on the base stream and
  -- announces it.  Returns FALSE if the wait ended with ABORTED or
  -- Courier.Error.
  AttentionWaiter: PROCEDURE [ds: DS] RETURNS [received: BOOLEAN ← FALSE] =
    BEGIN
    ENABLE ABORTED, Courier.Error => CONTINUE;
    [] ← ds.baseStream.waitAttention[ds.baseStream];
    AnnounceAttention[ds, (received ← TRUE)]
    END;

  -- Monitor-protected half of CancelTicket.  Finds the request(s) bound to
  -- cH and returns the action to perform once the lock is released.
  --
  -- The loop reads the successor cell BEFORE processing the current one:
  -- DeleteStreamAndRequest frees the list cell the loop is standing on, so
  -- stepping with "list.rest" afterwards (as the registeredCancelled arm
  -- previously did) would read freed storage.
  CancelStream: ENTRY PROCEDURE [
    ticket: NSDataStream.Ticket, cH: Courier.Handle]
    RETURNS [action: CancelStreamAction ← [none[]]] =
    BEGIN
    ENABLE UNWIND => NULL;
    DeleteStreamAndRequest: PROCEDURE [streamRequest: StreamRequest] =
      BEGIN
      DeleteDS[@streamRequest.ds];
      FreeStreamRequest[streamRequest];
      END;
    streamRequest: StreamRequest;
    next: SRList ← NIL;  -- successor, captured before the cell can be freed
    FOR list: SRList ← srList, next UNTIL list = NIL[SRList] DO
      next ← list.rest;
      streamRequest ← list.first;
      WITH request: streamRequest SELECT FROM
        registeredWaiting =>
          BEGIN
          otherStreamRequest: StreamRequest ← request.other;
          IF request.cH # cH THEN LOOP;
          IF otherStreamRequest = NIL THEN  -- local-remote data transfer
            BEGIN
            streamRequest.ds.aborted ← bySender;
            streamRequest.variant ← registeredCancelled[[TRUE], cH];
            BROADCAST streamRequest.transition;
            RETURN[[waitForDeletion[streamRequest.ds]]]
            END
          ELSE  -- remote-remote data transfer
            BEGIN
            DeleteStreamAndRequest[streamRequest];
            WITH otherRequest: otherStreamRequest SELECT FROM
              registeredWaiting => {
                otherCH: Courier.Handle ← otherRequest.cH;
                otherStreamRequest.variant ← registeredCancelled[
                  [TRUE], otherCH];
                BROADCAST otherStreamRequest.transition};
              registeredInUse => RETURN[[terminate[otherRequest.ds]]];
              ENDCASE => ERROR InternalError[];
            END;
          EXIT
          END;
        registeredCancelled =>
          IF request.cH = cH THEN DeleteStreamAndRequest[streamRequest];
        ENDCASE
      ENDLOOP
    END;

  -- Marks the start of a stream operation; raises Aborted (via
  -- ProcessAbort) if an attention or reject has already been noted.
  CheckForAbortBefore: PROCEDURE [ds: DS] = {
    IF StartStreamOp[ds].aborted THEN ProcessAbort[ds, FALSE]};

  -- Marks the end of a stream operation and raises Aborted (via
  -- ProcessAbort) if an attention or reject arrived while it ran.
  CheckForAbortAfter: PROCEDURE [ds: DS, abortSeen: BOOLEAN] =
    BEGIN
    aborted: BOOLEAN;
    Process.Yield[];  -- Give the attention waiter a chance to run.
    aborted ← EndStreamOp[ds].aborted;
    -- Pause to allow the ABORTED signal to be flushed.
    IF aborted AND NOT abortSeen AND NOT ds.local THEN {
      Process.Pause[Process.SecondsToTicks[60] ! ABORTED => CONTINUE]};
    IF aborted OR ds.courierRejectSeen THEN ProcessAbort[ds, FALSE]
    END;

  -- Checks for an out-of-band attention on the base stream.  The current
  -- process requests its own abort before calling waitAttention; ABORTED,
  -- Stream.TimeOut and Courier.Error all end the wait with receivedIt
  -- FALSE.  If ABORTED was not raised, the pending abort request is
  -- cancelled.
  CheckForOutOfBandAttention: PROCEDURE [ds: DS]
    RETURNS [receivedIt: BOOLEAN ← FALSE] =
    BEGIN
    abortSeen: BOOLEAN ← FALSE;
    self: PROCESS = Process.GetCurrent[];
    Process.Abort[self];
    BEGIN
    ENABLE
      BEGIN
      ABORTED => {abortSeen ← TRUE; CONTINUE};
      Stream.TimeOut, Courier.Error => CONTINUE;
      END;
    [] ← ds.baseStream.waitAttention[ds.baseStream];
    receivedIt ← TRUE
    END;
    IF NOT abortSeen THEN Process.CancelAbort[self];
    ds.attentionByteReceivedOutOfBand ← receivedIt;
    END;

  -- Final protocol exchange on the base stream: reconcile in-band and
  -- out-of-band attentions, then, on the send side, either push out the
  -- end of the data (no abort) or send the attention for a receiver abort.
  Close: PROCEDURE [ds: DS] =
    BEGIN
    ENABLE Courier.Error, Stream.TimeOut => HandleProblem[ds, FALSE];
    SynchronizeAttentions[ds];
    IF ds.direction = send THEN
      SELECT ds.aborted FROM
        none =>
          ds.baseStream.sendNow[
            ds.baseStream, TRUE !
            Stream.Attention => {
              ds.attentionByteReceivedOutOfBand ← TRUE;
              SynchronizeAttentions[ds];
              IF ds.notifiedOfAbort THEN CONTINUE ELSE ErrorAborted[ds]}];
        byReceiver => ds.baseStream.sendAttention[ds.baseStream, 1];
        ENDCASE
    END;

  -- Pumps data from "from" to "to" through a stack block until end of
  -- stream or an abort on either side, propagates an abort to the opposite
  -- stream, and deletes both streams.
  CopyStreamToStream: PROCEDURE [to, from: DS] =
    BEGIN
    abortStatus: AbortStatus ← none;
    toStream: NSDataStream.Handle ← HandleFromDS[to];
    fromStream: NSDataStream.Handle ← HandleFromDS[from];
    buffer: CourierOps.StackBlockHandle ← CourierOps.StackBlockPush[NIL];
    bufferSize: INTEGER = CourierOps.stackPageLength * Environment.bytesPerPage;
    block: Stream.Block ← [
      blockPointer: LOOPHOLE[buffer], startIndex: 0,
      stopIndexPlusOne: bufferSize];
    whyStop: Stream.CompletionCode ← normal;
    UNTIL whyStop = endOfStream DO
      block.stopIndexPlusOne ← bufferSize;
      -- startIndex is 0, so the byte count is also the new stop index.
      [bytesTransferred: block.stopIndexPlusOne, why: whyStop] ←
        fromStream.get[
          fromStream, block, fromStream.options !
          Aborted => {abortStatus ← bySender; EXIT}];
      toStream.put[
        toStream, block, FALSE ! Aborted => {abortStatus ← byReceiver; EXIT}];
      ENDLOOP;
    buffer.nextBlock ← NIL;
    [] ← CourierOps.StackBlockPop[buffer];
    IF abortStatus = bySender THEN Abort[toStream];
    toStream.delete[toStream ! Aborted => {abortStatus ← byReceiver; CONTINUE}];
    IF abortStatus = byReceiver THEN Abort[fromStream];
    fromStream.delete[fromStream]
    END;

  -- Wraps an existing network stream in a data stream filter with a fresh
  -- id, after setting the network stream's timeout to defaultTimeout.
  CreateFilter: PROCEDURE [
    networkStream: Stream.Handle, direction: Direction,
    clientData: LONG UNSPECIFIED,
    deleteProc: PROC [Stream.Handle, LONG UNSPECIFIED]]
    RETURNS [NSDataStream.Handle] = {
    networkStream.setTimeout[networkStream, defaultTimeout];
    RETURN[
      InternalCreate[networkStream, direction, NewID[], clientData, deleteProc]]};

  -- The filter's Stream delete procedure.  Activates the stream if needed,
  -- aborts a receive stream that has not reached end of stream, runs the
  -- client's deleteProc, stops and joins the watcher processes, closes the
  -- protocol exchange, reports the final status and frees the DS.  Raises
  -- Aborted if an abort surfaced here that the client had not yet been
  -- told about.
  DeleteProcedure: PROCEDURE [stream: NSDataStream.Handle] =
    BEGIN
    aborted: BOOLEAN ← FALSE;
    ds: DS ← DSFromHandle[stream];
    notifiedOfAbort: BOOLEAN ← ds.notifiedOfAbort;
    ds.deleted ← TRUE;
    -- ErrorAborted sets notifiedOfAbort; restore it so that an abort found
    -- during activation still counts as "not yet reported".
    ActivateDS[
      ds ! Aborted => {ds.notifiedOfAbort ← notifiedOfAbort; CONTINUE}];
    IF ds.direction = receive AND ~ds.endOfStreamEncountered THEN Abort[stream];
    notifiedOfAbort ← ds.notifiedOfAbort;
    IF ds.baseStream # NIL THEN
      BEGIN
      IF ds.deleteProc # NIL THEN ds.deleteProc[ds.baseStream, ds.clientData];
      IF ds.rejectWatcher # NIL THEN
        BEGIN
        Process.Abort[ds.rejectWatcher];
        [] ← JOIN ds.rejectWatcher;
        ds.rejectWatcher ← NIL;
        END;
      IF ds.attentionWaiter # NIL THEN
        BEGIN
        Process.Abort[ds.attentionWaiter];
        aborted ← JOIN ds.attentionWaiter;
        ds.attentionWaiter ← NIL;
        END;
      Close[ds ! Aborted => {aborted ← TRUE; CONTINUE}];
      IF ds.deleteWithStatusProc # NIL THEN
        ds.deleteWithStatusProc[ds.baseStream, [ds.aborted # none]];
      END;
    DeleteDS[@ds];
    IF aborted AND NOT notifiedOfAbort THEN ERROR Aborted
    END;

  -- Breaks the seal (so a stale handle is caught by DSFromHandle) and
  -- frees the DS.
  DeleteDS: PROCEDURE [ds: LONG POINTER TO DS] = {
    ds.seal ← busted; CourierInternal.longZone.FREE[ds]};

  -- Converts a client handle into a DS, validating the seal.
  DSFromHandle: PROCEDURE [stream: NSDataStream.Handle] RETURNS [DS] = INLINE
    BEGIN
    OPEN ds: LOOPHOLE[stream, DS];
    SELECT TRUE FROM
      (stream = NIL) => RETURN[NIL];  --that means it's a null ticket
      (ds.seal # okay) => ERROR InternalError;  --somebody deleted it!
      ENDCASE => RETURN[@ds];  --give back the coerced handle
    END;  --DSFromHandle

  -- Ends a stream operation.  "aborted" is TRUE when AnnounceAttention
  -- replaced streamOpProcess (with NIL or noProcess) during the operation.
  EndStreamOp: ENTRY PROCEDURE [ds: DS] RETURNS [aborted: BOOLEAN] =
    BEGIN
    aborted ← (ds.streamOpProcess = NIL OR ds.streamOpProcess = noProcess);
    ds.streamOpProcess ← NIL
    END;

  -- Raises Aborted, noting that the client has now been told.
  ErrorAborted: PROCEDURE [ds: DS] = {ds.notifiedOfAbort ← TRUE; ERROR Aborted};

  -- Monitor-protected half of AnnounceStream.  Finds the waiting (or
  -- cancelled) request for cH, moves it to registeredInUse on cH's stream
  -- and installs that stream as the DS's base stream.  thisDS is NIL when
  -- no request matches.  otherDS is the paired request's DS when that
  -- request is already in use.
  FillInStream: ENTRY PROCEDURE [cH: Courier.Handle]
    RETURNS [thisDS, otherDS: DS ← NIL, cancelled: BOOLEAN ← FALSE] =
    BEGIN
    ENABLE UNWIND => NULL;
    streamRequest, otherRequest: StreamRequest;
    FOR list: SRList ← srList, list.rest UNTIL list = NIL[SRList] DO
      streamRequest ← list.first;
      thisDS ← streamRequest.ds;
      WITH request: streamRequest SELECT FROM
        registeredWaiting =>
          IF request.cH = cH THEN
            BEGIN
            otherRequest ← request.other;
            otherDS ←
              IF otherRequest # NIL AND otherRequest.type = registeredInUse
              THEN otherRequest.ds ELSE NIL;
            BROADCAST streamRequest.transition;
            EXIT
            END;
        registeredCancelled =>
          IF request.cH = cH THEN {otherRequest ← NIL; cancelled ← TRUE; EXIT};
        ENDCASE;
      REPEAT FINISHED => RETURN[thisDS: NIL]
      ENDLOOP;
    streamRequest.variant ← registeredInUse[cH.sH, otherRequest];
    thisDS.baseStream ← cH.sH;
    thisDS.deleteWithStatusProc ← FreeStream;
    IF otherRequest # NIL OR cancelled THEN ReverseDirection[thisDS];
    BROADCAST streamRequest.transition;
    END;

  -- Reads and discards input from the base stream until end of stream or
  -- an in-band attention is seen.  Unless the stream is local or
  -- waitIfNecessary is set, the base stream's timeout is first set to 0.
  -- A Courier.Error or Stream.TimeOut ends the flush and is treated as an
  -- in-band attention.
  FlushToEndOfStream: PROCEDURE [ds: DS, waitIfNecessary: BOOLEAN] =
    BEGIN
    sH: Stream.Handle ← ds.baseStream;
    IF NOT (ds.local OR waitIfNecessary) THEN sH.setTimeout[sH, 0];
    BEGIN
    ENABLE
      Courier.Error, Stream.TimeOut => {
        ds.attentionByteReceivedInBand ← TRUE; CONTINUE};
    array: PACKED ARRAY [0..1] OF Environment.Byte;
    why: Stream.CompletionCode;
    sst: Stream.SubSequenceType;
    firstTime: BOOLEAN ← TRUE;  -- workaround for lost first attention byte.
    UNTIL ds.endOfStreamEncountered OR ds.attentionByteReceivedInBand DO
      [, why, sst] ← sH.get[sH, [@array, 1, 2], sH.options];
      SELECT why FROM
        normal =>
          IF firstTime THEN {
            firstTime ← FALSE;
            IF NOT ds.local AND CheckForOutOfBandAttention[ds] THEN
              sH.setTimeout[sH, 0]};
        endRecord, endOfStream => ds.endOfStreamEncountered ← TRUE;
        sstChange =>
          IF sst # dataSST THEN ds.attentionByteReceivedInBand ← TRUE;
        attention => {
          [] ← sH.get[sH, [@array, 1, 2], sH.options];
          ds.attentionByteReceivedInBand ← TRUE};
        ENDCASE;
      ENDLOOP
    END;
    END;

  -- Installed as deleteWithStatusProc: called by DeleteProcedure when a
  -- filter over "stream" is deleted.
  --   registeredInUse - mark the request registeredFree with the final
  --                     status, waking WaitForDeletion.
  --   inUse (local couple) - mark this end free.  If the partner is already
  --                     free, wake it and return; otherwise wait for the
  --                     partner to become free, then delete both underlying
  --                     streams and both requests.
  FreeStream: ENTRY PROCEDURE [stream: Stream.Handle, status: Status] =
    BEGIN
    ENABLE UNWIND => NULL;
    FOR list: SRList ← srList, list.rest UNTIL list = NIL[SRList] DO
      streamRequest: StreamRequest ← list.first;
      WITH request: streamRequest SELECT FROM
        registeredInUse =>
          BEGIN
          IF request.stream # stream THEN LOOP;
          streamRequest.variant ← registeredFree[status];
          BROADCAST streamRequest.transition;
          RETURN;
          END;
        inUse =>
          BEGIN
          IF request.stream # stream THEN LOOP;
          streamRequest.variant ← free[stream];
          -- Look for the partner: another request with the same id.
          FOR list: SRList ← srList, list.rest UNTIL list = NIL[SRList] DO
            IF list.first # streamRequest AND list.first.id = request.id THEN
              DO
                SELECT list.first.type FROM
                  free => {BROADCAST streamRequest.transition; RETURN};
                  inUse =>
                    DO
                      WAIT list.first.transition;
                      WITH sr: list.first SELECT FROM
                        free =>
                          BEGIN
                          stream.delete[stream];
                          sr.stream.delete[sr.stream];
                          FreeStreamRequest[streamRequest];
                          FreeStreamRequest[list.first];
                          RETURN
                          END;
                        ENDCASE;
                      ENDLOOP;
                  ENDCASE => EXIT
                ENDLOOP;
            ENDLOOP;
          InternalError[]
          END;
        ENDCASE;
      ENDLOOP;
    InternalError[]
    END;

  -- Detaches the request from its partner (if any) and removes it from
  -- srList, freeing both the list cell and the request itself.
  FreeStreamRequest: PROCEDURE [streamRequest: StreamRequest] = {
    other: StreamRequest ←
      WITH request: streamRequest SELECT FROM
        registeredWaiting => request.other,
        registeredInUse => request.other,
        ENDCASE => NIL;
    IF other # NIL THEN
      WITH request: other SELECT FROM
        registeredWaiting => request.other ← NIL;
        registeredInUse => request.other ← NIL;
        ENDCASE;
    srListProcs.Free[
      srListProcs.Find[@srList, streamRequest], CourierInternal.longZone]};

  -- Converts the handle to a DS and activates it.
  GetActiveDS: PROCEDURE [
    stream: NSDataStream.Handle, notificationOnly: BOOLEAN ← FALSE]
    RETURNS [ds: DS] = INLINE {
    ActivateDS[(ds ← DSFromHandle[stream]), notificationOnly]};

  -- The filter's Stream get procedure (receive side).  Reads from the base
  -- stream, turning the underlying end-of-record/end-of-stream into this
  -- stream's end of stream, and an attention or a foreign SST into an
  -- abort.  ABORTED raised by the base stream is re-raised after the
  -- abort bookkeeping.
  GetProcedure: PROCEDURE [
    stream: NSDataStream.Handle, block: Stream.Block,
    options: Stream.InputOptions]
    RETURNS [
      bytesTransferred: CARDINAL, why: Stream.CompletionCode,
      sst: Stream.SubSequenceType] =
    BEGIN
    BEGIN
    abortSeen: BOOLEAN ← FALSE;
    ds: DS ← GetActiveDS[stream];
    IF ds.endOfStreamEncountered THEN {
      IF options.signalEndOfStream THEN
        ERROR Stream.EndOfStream[block.startIndex];
      bytesTransferred ← 0;
      why ← endOfStream;
      sst ← dataSST}
    ELSE
      BEGIN
      CheckForAbortBefore[ds];
      BEGIN
      ENABLE
        BEGIN
        Courier.Error --[truncatedTransfer] --, Stream.TimeOut =>
          HandleProblem[DSFromHandle[stream], FALSE];
        Aborted, Stream.EndOfStream =>
          CheckForAbortAfter[ds, abortSeen ! Aborted => CONTINUE]
        END;
      DO
        [bytesTransferred, why, sst] ← ds.baseStream.get[
          ds.baseStream, block, ds.baseStream.options !
          ABORTED => {abortSeen ← TRUE; GOTO SeenAbort}];
        SELECT why FROM
          normal => EXIT;
          sstChange => IF sst # dataSST THEN GOTO unexpectedPacketSubtype;
          endRecord, endOfStream =>
            -- A completely filled block is reported as normal; the end of
            -- stream is then reported by the next call.
            IF bytesTransferred = block.stopIndexPlusOne - block.startIndex
              THEN {why ← normal; ds.endOfStreamEncountered ← TRUE; EXIT}
            ELSE {
              why ← endOfStream;
              ds.endOfStreamEncountered ← TRUE;
              IF options.signalEndOfStream THEN
                ERROR Stream.EndOfStream[block.startIndex + bytesTransferred]
              ELSE RETURN};
          attention => ProcessAbort[ds, TRUE];  -- should not return
          ENDCASE => ERROR InternalError[];
        REPEAT
          unexpectedPacketSubtype => HandleProblem[ds, TRUE];
          SeenAbort => NULL;
        ENDLOOP
      END;
      CheckForAbortAfter[ds, abortSeen];
      IF abortSeen THEN ERROR ABORTED;
      END
    END
    END;

  -- Finds (or enters) the stream request for ds.id and returns the base
  -- stream for ds.  When the opposite local end is already waiting
  -- (requested, other direction) a local stream pair is created and both
  -- requests become inUse.  Otherwise the caller waits for the request to
  -- acquire a stream, unless dontWait is set, in which case NIL is
  -- returned.  Raises Aborted if the request was cancelled or freed.
  GetStream: ENTRY PROCEDURE [ds: DS, dontWait: BOOLEAN ← FALSE]
    RETURNS [stream: Stream.Handle] =
    BEGIN
    ENABLE UNWIND => NULL;
    streamRequest: StreamRequest;
    FOR ref: SRRef ← @srList, @ref.rest UNTIL ref↑ = NIL[SRList] DO
      streamRequest ← ref.first;
      IF streamRequest.id = ds.id THEN
        WITH request: streamRequest SELECT FROM
          registered =>
            BEGIN
            cH: Courier.Handle ← request.cH;
            IF request.direction = ds.direction THEN ERROR InternalError[];
            -- The local DS takes over the registered request.
            DeleteDS[@streamRequest.ds];
            streamRequest.ds ← ds;
            streamRequest.variant ← registeredWaiting[immediateTicket, cH];
            BROADCAST streamRequest.transition;
            EXIT
            END;
          registeredWaiting =>
            IF request.other = NIL THEN EXIT ELSE ERROR InternalError[];
          registeredInUse =>
            IF request.other = NIL THEN EXIT ELSE ERROR InternalError[];
          inUse => EXIT;
          requested =>
            IF request.direction = ds.direction THEN EXIT
            ELSE
              BEGIN
              streamPair: StreamPair = GetStreamPair[];
              newStreamRequest: StreamRequest ← NewStreamRequest[ds.id, ds];
              streamRequest.ds.local ← ds.local ← TRUE;
              newStreamRequest.variant ← inUse[streamPair[this]];
              srList ← srListProcs.New[newStreamRequest, srList];
              streamRequest.variant ← inUse[streamPair[that]];
              BROADCAST streamRequest.transition;
              RETURN[streamPair[this]]
              END;
          registeredCancelled => EXIT;
          ENDCASE => ERROR;
      REPEAT
        FINISHED =>
          BEGIN
          streamRequest ← NewStreamRequest[ds.id, ds, ds.direction];
          srList ← srListProcs.New[streamRequest, srList]
          END
      ENDLOOP;
    IF dontWait THEN RETURN[NIL];
    DO
      WITH request: streamRequest SELECT FROM
        requested, registeredWaiting => WAIT streamRequest.transition;
        inUse => RETURN[request.stream];
        registeredInUse => RETURN[request.stream];
        registeredCancelled => {
          status: Status ← request.status;
          streamRequest.variant ← registeredFree[status];
          BROADCAST streamRequest.transition;
          ErrorAborted[request.ds]};
        registeredFree => ErrorAborted[request.ds];
        ENDCASE => InternalError[];
      ENDLOOP;
    END;

  -- Creates the pair of streams used for a local (same-machine) couple.
  GetStreamPair: PROCEDURE RETURNS [streams: StreamPair] = {
    [streams[this], streams[that]] ← StreamCouple.Create[]};

  HandleFromDS: PROCEDURE [ds: DS] RETURNS [NSDataStream.Handle] = INLINE {
    RETURN[LOOPHOLE[ds]]};

  -- Converts a transport problem into an abort and raises Aborted.  When
  -- the stream is no longer usable (streamOK = FALSE) no attention is
  -- sent: the in-band attention is treated as already received and the
  -- abort is recorded as bySender.
  HandleProblem: PROCEDURE [ds: DS, streamOK: BOOLEAN ← TRUE] =
    BEGIN
    IF NOT streamOK THEN ds.attentionByteReceivedInBand ← TRUE;
    IF ds.aborted = none THEN
      IF streamOK THEN InitiateAbort[ds] ELSE ds.aborted ← bySender;
    ErrorAborted[ds]
    END;

  -- Starts an abort from this end: records who aborted and sends an
  -- attention on the base stream.  No-op when already aborted or at end of
  -- stream.  Transport errors while sending are ignored.
  InitiateAbort: PROCEDURE [ds: DS] =
    BEGIN
    ENABLE Stream.TimeOut, Courier.Error => CONTINUE;
    IF ds.aborted # none OR ds.endOfStreamEncountered THEN RETURN;
    ds.aborted ← IF ds.direction = send THEN bySender ELSE byReceiver;
    ds.baseStream.setSST[ds.baseStream, dataSST];
    ds.baseStream.sendAttention[ds.baseStream, 1]
    END;

  -- Allocates and initializes a DS and returns it as a client handle.
  InternalCreate: PROCEDURE [
    stream: Stream.Handle, direction: Direction, id: ID,
    clientData: LONG UNSPECIFIED ← LONG[NIL],
    deleteProc: PROC [Stream.Handle, LONG UNSPECIFIED] ← NIL]
    RETURNS [NSDataStream.Handle] =
    BEGIN
    ds: DS ← CourierInternal.longZone.NEW[
      DSObject ← [
      object: Stream.defaultObject, baseStream: stream, id: id,
      clientData: clientData, direction: direction]];
    [ds.object.get, ds.object.put] ← AssignGetAndPutProcs[direction];
    ds.object.delete ← LOOPHOLE[DeleteProcedure];
    ds.deleteProc ← deleteProc;
    RETURN[[@ds.object]]
    END;

  NewID: ENTRY PROCEDURE RETURNS [ID] = {RETURN[currentID ← currentID + 1]};

  -- Allocates a request in the "requested" state.
  NewStreamRequest: PROCEDURE [
    id: ID, ds: DS ← NIL, direction: Direction ← send]
    RETURNS [streamRequest: StreamRequest] = {
    streamRequest ← CourierInternal.longZone.NEW[
      StreamRequestObject ← [id, ds, , requested[direction]]]};

  -- Records an abort initiated by the other end and raises Aborted.
  ProcessAbort: PROCEDURE [ds: DS, receivedInBand: BOOLEAN] =
    BEGIN
    ds.aborted ← IF ds.direction = send THEN byReceiver ELSE bySender;
    IF receivedInBand THEN ds.attentionByteReceivedInBand ← TRUE;
    ErrorAborted[ds]
    END;

  -- The filter's Stream put procedure (send side).  Zero-length blocks are
  -- ignored; the endPhysicalRecord argument is not passed on (FALSE is
  -- always used).  ABORTED raised by the base stream is re-raised after
  -- the abort bookkeeping.
  PutProcedure: PROCEDURE [
    stream: NSDataStream.Handle, block: Stream.Block,
    endPhysicalRecord: BOOLEAN] =
    BEGIN
    abortSeen: BOOLEAN ← FALSE;
    ds: DS ← GetActiveDS[stream];
    IF block.stopIndexPlusOne = block.startIndex THEN RETURN;
    CheckForAbortBefore[ds];
    BEGIN
    ENABLE
      BEGIN
      Courier.Error, Stream.TimeOut =>
        HandleProblem[DSFromHandle[stream], FALSE];
      Aborted => CheckForAbortAfter[ds, abortSeen ! Aborted => CONTINUE]
      END;
    ds.baseStream.put[
      ds.baseStream, block, FALSE !
      Stream.Attention => ProcessAbort[ds, FALSE];
      ABORTED => {abortSeen ← TRUE; CONTINUE}]
    END;
    CheckForAbortAfter[ds, abortSeen];
    IF abortSeen THEN ERROR ABORTED;
    END;

  -- Forked by ActivateDS for non-local senders.  Blocks in getByte on the
  -- base stream; if the get returns, or fails with a Courier.Error other
  -- than transportTimeout, a reject is announced.  A transport timeout, an
  -- SST change or a stream timeout ends the watch without announcing.
  RejectWatcher: PROCEDURE [ds: DS] RETURNS [received: BOOLEAN ← FALSE] =
    BEGIN
    ENABLE ABORTED => CONTINUE;
    [] ← ds.baseStream.getByte[
      ds.baseStream !
      Courier.Error =>
        IF errorCode = transportTimeout THEN GOTO abort ELSE CONTINUE;
      Stream.SSTChange, Stream.TimeOut => GOTO abort];
    received ← TRUE;
    AnnounceAttention[ds, FALSE];
    EXITS abort => NULL;
    END;

  -- Flips the stream's direction and swaps its get/put procedures to match.
  ReverseDirection: INTERNAL PROCEDURE [ds: DS] =
    BEGIN
    ds.direction ← IF ds.direction = send THEN receive ELSE send;
    [ds.object.get, ds.object.put] ← AssignGetAndPutProcs[ds.direction]
    END;

  -- Begins a stream operation: returns TRUE if an attention or reject has
  -- already been noted, otherwise records the calling process so that
  -- AnnounceAttention can abort it.
  StartStreamOp: ENTRY PROCEDURE [ds: DS] RETURNS [aborted: BOOLEAN ← FALSE] =
    BEGIN
    IF ds.attentionByteReceivedOutOfBand OR ds.courierRejectSeen THEN
      RETURN[TRUE];
    ds.streamOpProcess ← Process.GetCurrent[]
    END;

  -- Reconciles the in-band and out-of-band attention flags: if only the
  -- in-band one has been seen, check once for the out-of-band one;
  -- otherwise flush input for as long as the direction-specific test below
  -- asks for it.
  SynchronizeAttentions: PROCEDURE [ds: DS] =
    BEGIN
    DO
      IF ds.attentionByteReceivedInBand
        AND NOT ds.attentionByteReceivedOutOfBand THEN {
        [] ← CheckForOutOfBandAttention[ds]; EXIT}
      ELSE
        SELECT ds.direction FROM
          send =>
            IF ds.attentionByteReceivedOutOfBand
              AND NOT ds.attentionByteReceivedInBand THEN
              FlushToEndOfStream[ds, FALSE]
            ELSE EXIT;
          receive =>
            IF NOT
              (ds.attentionByteReceivedInBand OR ds.endOfStreamEncountered)
              THEN
              FlushToEndOfStream[ds, NOT ds.attentionByteReceivedOutOfBand]
            ELSE EXIT;
          ENDCASE
      ENDLOOP
    END;

  -- Aborts and deletes the stream, ignoring Aborted from the delete.
  TerminateDataStream: PROCEDURE [ds: DS] =
    BEGIN
    Abort[HandleFromDS[ds]];
    HandleFromDS[ds].delete[HandleFromDS[ds] ! Aborted => CONTINUE]
    END;

  -- Waits until the request owning "ds" reaches registeredFree, returns
  -- its final status and frees the request.  "ds" is used only as a key
  -- (pointer comparison); it is not dereferenced here.
  WaitForDeletion: ENTRY PROCEDURE [ds: DS] RETURNS [status: Status] =
    BEGIN
    ENABLE UNWIND => NULL;
    streamRequest: StreamRequest;
    FOR list: SRList ← srList, list.rest UNTIL list = NIL[SRList] DO
      streamRequest ← list.first;
      IF ds = streamRequest.ds THEN EXIT
      REPEAT FINISHED => ERROR InternalError[]
      ENDLOOP;
    DO
      WITH request: streamRequest SELECT FROM
        registeredInUse, registeredCancelled => WAIT streamRequest.transition;
        registeredFree => {status ← request.status; EXIT};
        ENDCASE => InternalError[];
      ENDLOOP;
    FreeStreamRequest[streamRequest];
    END;

  -- Main body: bind the list primitives.
  srListProcs ← [Find, Free, New];

  END.

LOG ( date - person - action )
20-Dec-84 15:31:10 - AOF - Post Klamath
27-May-85 12:07:32 - AOF - Starting rewrite
25-Nov-85  7:26:41 - AOF - AR# 7888