DIRECTORY
Rope USING [ROPE],
Process USING [SetTimeout, MsecToTicks],
CommUtilDefs USING [CopyLong],
PupDefs USING [DataWordsPerPupBuffer, GetFreePupBuffer, ReturnFreePupBuffer, GetPupContentsBytes, SetPupContentsBytes, PupAddress, PupBuffer, ReturnPup, PupSocket, PupSocketMakeFull, PupSocketDestroy, SecondsToTocks],
PupTypes USING [maxDataBytesPerGatewayPup, fillInPupAddress],
EFTPDefs USING [EFTPTimeOut, EFTPAbortCode];
EFTPRecv:
MONITOR
IMPORTS Process, CommUtilDefs, EFTPDefs, PupDefs EXPORTS EFTPDefs =
BEGIN OPEN PupDefs, EFTPDefs;
-- Errors exported to EFTPDefs.
-- EFTPAlreadyReceiving: raised by EFTPOpenCheck when a receive socket already exists.
EFTPAlreadyReceiving: PUBLIC ERROR = CODE;
-- EFTPNotReceiving: raised by EFTPGetBlock and EFTPFinishReceiving when no receive socket exists.
EFTPNotReceiving: PUBLIC ERROR = CODE;
-- EFTPEndReceiving: raised by EFTPGetBlock when the current packet is eEnd or eAbort.
EFTPEndReceiving: PUBLIC ERROR = CODE;
-- EFTPTroubleReceiving: declared here; NOTE(review): nothing in the visible code raises it.
EFTPTroubleReceiving: PUBLIC ERROR [e: EFTPAbortCode, s: Rope.ROPE] = CODE;
-- Monitor state.
-- recv: waited on by EFTPOpenLocked and EFTPGetBlock; notified by EFTPRecvDataLocked
-- when it installs a packet in receivePupBuffer.  Timeout is set by EFTPSetRecvTimeout.
recv: CONDITION;
-- recverStop: cleared by EFTPOpenCheck, set by EFTPKillReceiver, polled by EFTPRecvDataProcess.
recverStop: BOOLEAN;
-- recverFork: the forked EFTPRecvDataProcess; JOINed by EFTPKillReceiver.
recverFork: PROCESS;
-- receiveSocket: NIL means "not receiving"; created by EFTPOpenCheck, destroyed by EFTPKillReceiver.
receiveSocket: PupSocket ← NIL;
-- recvStarted: FALSE after open, TRUE once EFTPRecvDataLocked has accepted a first packet.
recvStarted: BOOLEAN;
-- recvHim: source address of the accepted first packet; later packets are matched against it.
recvHim: PupAddress;
-- receiveSeqNumber: sequence number of the packet currently expected; reset to 0 on open and
-- advanced by EFTPGetBlock after a data packet has been fully consumed and acked.
receiveSeqNumber: CARDINAL;
-- receivePupBuffer: packet waiting to be consumed by EFTPGetBlock, or NIL if there is none.
receivePupBuffer: PupBuffer ← NIL;
-- receiveGobbled: how much of receivePupBuffer has already been copied out, counted in words.
receiveGobbled: CARDINAL ← 0; -- in words !
-- EFTPSetRecvTimeout: apply a new timeout to the recv condition variable.
--   ms: the timeout in milliseconds; converted with Process.MsecToTicks
--       before being handed to Process.SetTimeout.
EFTPSetRecvTimeout: PUBLIC PROCEDURE [ms: CARDINAL] =
  BEGIN
  Process.SetTimeout[@recv, Process.MsecToTicks[ms]];
  END;
-- EFTPOpenForReceiving: start receiving on local address me and wait for a sender.
--   Step 1 (EFTPOpenCheck) creates the socket and forks the receiver process;
--   it raises EFTPAlreadyReceiving if a receive socket already exists.
--   Step 2 (EFTPOpenLocked) waits for the first packet, signalling EFTPTimeOut
--   while none has arrived.  If that wait is unwound, the receiver is torn
--   down again with EFTPKillReceiver.
--   Returns recvHim, the address recorded for the accepted sender.
EFTPOpenForReceiving: PUBLIC PROCEDURE [me: PupAddress] RETURNS [PupAddress] =
  BEGIN
  EFTPOpenCheck[me];
  -- catch phrase on the call itself: clean up only if the wait is abandoned
  EFTPOpenLocked[ ! UNWIND => EFTPKillReceiver[]];
  RETURN[recvHim];
  END;
-- EFTPOpenCheck: under the monitor lock, verify that no receive is in progress,
-- reset the per-transfer state, create the receive socket on me, and fork the
-- receiver process.
--   Raises EFTPAlreadyReceiving if receiveSocket is already non-NIL.
--   The forked process can only touch the monitor state through the ENTRY
--   procedure EFTPRecvDataLocked, so it cannot observe that state before this
--   procedure returns and releases the lock.
EFTPOpenCheck: ENTRY PROCEDURE [me: PupAddress] =
  BEGIN
  ENABLE UNWIND => NULL;
  IF receiveSocket # NIL THEN ERROR EFTPAlreadyReceiving;
  -- fresh transfer: nothing started, nothing to stop, expecting packet 0
  recvStarted ← FALSE;
  recverStop ← FALSE;
  receiveSeqNumber ← 0;
  receiveSocket ←
    PupSocketMakeFull[me, PupTypes.fillInPupAddress, SecondsToTocks[1]];
  recverFork ← FORK EFTPRecvDataProcess[];
  END;
-- EFTPOpenLocked: under the monitor lock, wait until the receiver process has
-- installed a first packet in receivePupBuffer, then point the socket at the
-- sender recorded in recvHim.
--   Signals EFTPTimeOut after every run of up to ten WAITs on recv that ends
--   with no packet; if the signal is resumed, waiting starts over.
EFTPOpenLocked: ENTRY PROCEDURE =
  BEGIN
  ENABLE UNWIND => NULL;
  WHILE receivePupBuffer = NIL DO
    -- at most ten waits per round, stopping early once a packet shows up
    THROUGH [0..10) WHILE receivePupBuffer = NIL DO WAIT recv; ENDLOOP;
    IF receivePupBuffer = NIL THEN SIGNAL EFTPTimeOut; -- 10*1 sec
    ENDLOOP;
  receiveSocket.setRemoteAddress[recvHim];
  END;
-- EFTPGetBlock: copy up to l bytes of received data from the current packet to p.
--   p: destination, passed as the `to` argument of CommUtilDefs.CopyLong.
--   l: caller's capacity in bytes; an odd value is rounded down to even.
--   Returns the number of bytes taken from the current packet (may be less than l).
--   Raises EFTPNotReceiving if there is no receive socket.
--   Signals EFTPTimeOut each time a WAIT on recv ends with no packet available;
--   resuming the signal waits again.
--   Raises EFTPEndReceiving, after acking, when the current packet is eEnd or eAbort.
--   NOTE(review): an eAbort packet is reported exactly like eEnd, and
--   EFTPTroubleReceiving is not raised anywhere in the visible code -- confirm
--   this is intended.
EFTPGetBlock:
PUBLIC
ENTRY
PROCEDURE [p:
LONG
POINTER, l:
CARDINAL]
RETURNS [CARDINAL] =
BEGIN
ENABLE UNWIND => NULL;
n: CARDINAL; -- bytes to transfer
pupLength: CARDINAL;
IF receiveSocket = NIL THEN ERROR EFTPNotReceiving;
-- work in whole words: drop an odd trailing byte of capacity
IF (l MOD 2) = 1 THEN l ← l - 1;
-- wait for the receiver process to install a packet
UNTIL receivePupBuffer #
NIL
DO
WAIT recv;
IF receivePupBuffer # NIL THEN EXIT;
SIGNAL EFTPTimeOut; -- one wait on recv expired (timeout defaults to 1000 ms)
ENDLOOP;
IF receivePupBuffer.pupType = eEnd
OR receivePupBuffer.pupType = eAbort
THEN
BEGIN
-- detach the buffer from the monitor state before reusing it for the ack;
-- receiveSeqNumber is not advanced on this path
b: PupBuffer ← receivePupBuffer;
receivePupBuffer ← NIL;
EFTPSendAck[b, receiveSeqNumber];
RETURN WITH ERROR EFTPEndReceiving;
END;
pupLength ← GetPupContentsBytes[receivePupBuffer];
-- bytes still unread in this packet, then clipped to the caller's capacity
n ← pupLength - (receiveGobbled*2);
n ← MIN[l, n];
-- word count is rounded up so an odd final byte is still copied
CommUtilDefs.CopyLong[
from: @receivePupBuffer.pupWords[receiveGobbled], nwords: (n + 1)/2, to: p];
-- progress is tracked in whole words only
receiveGobbled ← receiveGobbled + (n/2);
-- l > n: the packet ran out before the caller's capacity did;
-- the second test catches the case where both run out together
IF l > n
OR receiveGobbled*2 = pupLength
THEN
BEGIN -- done with this buffer, use it to send back the ack
b: PupBuffer ← receivePupBuffer;
receivePupBuffer ← NIL;
EFTPSendAck[b, receiveSeqNumber];
-- now expect the next packet in sequence
receiveSeqNumber ← receiveSeqNumber + 1;
END;
RETURN[n];
END;
-- EFTPFinishReceiving: close down the current receive.
--   Raises EFTPNotReceiving if there is no receive socket; otherwise stops the
--   receiver process and releases the socket and any pending buffer through
--   EFTPKillReceiver.
EFTPFinishReceiving: PUBLIC PROCEDURE =
  BEGIN
  IF receiveSocket # NIL THEN EFTPKillReceiver[] ELSE ERROR EFTPNotReceiving;
  END;
-- EFTPAbortReceiving: abandon the current receive, telling the sender why.
--   s: text carried in the abort packet built by EFTPAbortReceivingLocked.
--   Does nothing when there is no receive socket.  Otherwise sends the abort
--   packet and then tears the receiver down with EFTPKillReceiver.
EFTPAbortReceiving: PUBLIC PROCEDURE [s: STRING] =
  BEGIN
  IF receiveSocket # NIL THEN
    BEGIN
    EFTPAbortReceivingLocked[s];
    EFTPKillReceiver[];
    END;
  END;
-- EFTPAbortReceivingLocked: under the monitor lock, build an eAbort packet and
-- send it on receiveSocket.
--   s: message text placed after the abort code; a NIL string is treated as
--      empty.  The text is truncated so that the packet contents do not exceed
--      2*DataWordsPerPupBuffer[] or PupTypes.maxDataBytesPerGatewayPup bytes.
--   Packet layout: pupID = [0, receiveSeqNumber]; word 0 holds the abort code
--   eftpExternalReceiverAbort; the text starts at byte 2.
EFTPAbortReceivingLocked: ENTRY PROCEDURE [s: STRING] =
  BEGIN
  -- Like the other ENTRY procedures in this module: make sure the monitor lock
  -- is released if anything below is unwound, otherwise the receiver process
  -- would block in EFTPRecvDataLocked and EFTPKillReceiver's JOIN would hang.
  ENABLE UNWIND => NULL;
  textLength: CARDINAL = IF s = NIL THEN 0 ELSE s.length; -- guard against NIL
  end: CARDINAL; -- total bytes of packet contents (code + text)
  b: PupBuffer;
  ec: EFTPAbortCode = eftpExternalReceiverAbort;
  b ← GetFreePupBuffer[];
  b.pupType ← eAbort;
  b.pupID ← [0, receiveSeqNumber];
  b.pupWords[0] ← LOOPHOLE[ec];
  end ←
    MIN[
      textLength + 2, 2*DataWordsPerPupBuffer[],
      PupTypes.maxDataBytesPerGatewayPup];
  -- bytes 0..1 are the abort code, so text byte k lands at packet byte k+2;
  -- the loop body never runs when the text is empty (end = 2)
  FOR i: CARDINAL ← 2, i + 1 UNTIL i >= end DO b.pupChars[i] ← s[i - 2]; ENDLOOP;
  SetPupContentsBytes[b, end];
  receiveSocket.put[b];
  END;
-- internal procedures
-- EFTPRecvDataProcess: body of the forked receiver process.
--   Repeatedly takes a packet from receiveSocket and passes every non-NIL one
--   to EFTPRecvDataLocked, until recverStop becomes TRUE.  recverStop is only
--   examined between calls to get, so the loop ends after the current get
--   returns.
EFTPRecvDataProcess: PROCEDURE =
  BEGIN
  incoming: PupBuffer;
  WHILE ~recverStop DO
    incoming ← receiveSocket.get[];
    IF incoming # NIL THEN EFTPRecvDataLocked[incoming];
    ENDLOOP;
  END;
-- EFTPRecvDataLocked: under the monitor lock, decide what to do with one
-- incoming packet b.  Every path disposes of b exactly once: it is freed,
-- reused to send a reply, or installed as receivePupBuffer.
--   1. Type outside [eData..eAbort]: not EFTP, freed.
--   2. eData from recvHim with the previous sequence number: a duplicate,
--      re-acked using the same buffer.
--   3. Wrong source or wrong sequence number:
--        a. if nothing has started yet, the ID is [0, 0] and no buffer is
--           pending, accept it as the first packet of a new sender;
--        b. otherwise answer eData with an abort (out of sync when the ID is
--           not [0, 0], receiver busy when it is) and free anything else.
--   4. The expected packet: freed if the previous one is still being read
--      (a retransmission), otherwise installed and recv is notified.
EFTPRecvDataLocked: ENTRY PROCEDURE [b: PupBuffer] =
  BEGIN -- better not SIGNAL here
  IF b.pupType NOT IN [eData..eAbort] THEN
    -- not EFTP type packet
    BEGIN ReturnFreePupBuffer[b]; GOTO AllDone; END;
  IF b.source = recvHim
    AND b.pupType = eData
    AND b.pupID = [0, receiveSeqNumber - 1] THEN
    BEGIN -- duplicate: use the buffer to send back the ack
    EFTPSendAck[b, receiveSeqNumber - 1];
    GOTO AllDone;
    END;
  IF b.source # recvHim
    OR b.pupID # [0, receiveSeqNumber] THEN
    BEGIN -- from wrong place
    IF ~recvStarted
      AND b.pupID = [0, 0] THEN
      BEGIN -- we are listening, and this is the first packet
      IF receivePupBuffer = NIL THEN
        -- take this new user
        BEGIN
        receivePupBuffer ← b;
        receiveGobbled ← 0;
        recvStarted ← TRUE;
        recvHim ← receivePupBuffer.source;
        NOTIFY recv;
        GOTO AllDone;
        END;
      END;
    IF b.pupType = eData THEN
      BEGIN -- use the buffer to send back an abort
      ec: EFTPAbortCode;
      ec ←
        IF b.pupID # [0, 0] THEN eftpOutOfSyncAbort ELSE eftpReceiverBusyAbort;
      b.pupType ← eAbort;
      b.pupWords[0] ← LOOPHOLE[ec];
      ReturnPup[b, eAbort, 2]; -- should send some text
      GOTO AllDone;
      END;
    ReturnFreePupBuffer[b];
    GOTO AllDone;
    END;
  -- must be the one we want
  IF receivePupBuffer # NIL THEN
    BEGIN
    -- Oops, retransmission.  We will ack when we finish reading this buffer.
    ReturnFreePupBuffer[b];
    GOTO AllDone;
    END;
  receivePupBuffer ← b;
  receiveGobbled ← 0; -- nothing of the new packet has been read yet
  NOTIFY recv;
  EXITS AllDone => NULL;
  END;
-- EFTPSendAck: turn buffer b into an acknowledgement and send it.
--   b:  a packet buffer the caller no longer needs; it is overwritten and
--       handed to receiveSocket.put.
--   id: sequence number being acknowledged, stored as pupID [0, id].
--   The ack carries no data (contents length is set to 0 bytes).
EFTPSendAck: PROCEDURE [b: PupBuffer, id: CARDINAL] =
  BEGIN
  noData: CARDINAL = 0;
  b.pupType ← eAck;
  b.pupID ← [0, id];
  SetPupContentsBytes[b, noData];
  receiveSocket.put[b];
  END;
-- EFTPKillReceiver: stop the receiver process and release what it was using.
--   Sets recverStop, JOINs the forked process, destroys the socket if there
--   is one, and frees any packet still pending in receivePupBuffer.
--   This procedure is not declared ENTRY; the process being JOINed enters the
--   monitor itself through EFTPRecvDataLocked.
EFTPKillReceiver: PROCEDURE =
  BEGIN
  leftover: PupBuffer;
  recverStop ← TRUE;
  JOIN recverFork;
  IF receiveSocket # NIL THEN
    BEGIN
    PupSocketDestroy[receiveSocket];
    receiveSocket ← NIL;
    END;
  -- detach the pending packet from the module state before freeing it
  leftover ← receivePupBuffer;
  receivePupBuffer ← NIL;
  IF leftover # NIL THEN ReturnFreePupBuffer[leftover];
  END;
-- initialization
-- Module body: give the recv condition a default timeout of 1000 ms.
EFTPSetRecvTimeout[1000];
END.