PupSocketImpl.mesa
Copyright © 1986 by Xerox Corporation. All rights reserved.
Hal Murray, November 17, 1986 4:13:21 pm PST
Doug Wyatt, June 10, 1986 2:54:21 pm PDT
DIRECTORY
Allocator USING [NHeaderP, NormalHeader],
Basics USING [BITAND, bytesPerWord, LowHalf],
BasicTime USING [GetClockPulses],
Booting USING [switches],
Checksum USING [ComputeChecksum],
CommBuffer USING [Direction, Encapsulation],
CommBufferExtras USING [gapNoList],
CommDriver USING [AllocBuffer, Buffer, BufferObject, FreeBuffer, GetNetworkChain, InsertReceiveProc, Network],
DebuggerSwap USING [CallDebugger],
Endian USING [bytesPerHWord, FFromCard, FWORD, HWORD],
PrincOpsUtils USING [LongCopy],
Process USING [Detach, DisableTimeout, EnableAborts, MsecToTicks, Pause, priorityBackground, SecondsToTicks, SetPriority, SetTimeout, Ticks],
Pup USING [Address, allHosts, allNets, Host, nullAddress, nullHost, nullNet, nullSocket, Socket],
PupBuffer USING [abortOverheadBytes, Buffer, BufferObject, ByteAlloc, ByteIndex, errorOverheadBytes, FWordIndex, HWordIndex, noChecksum, RoundUpForChecksum, WordsWithoutChecksum],
PupInternal USING [Route],
PupName USING [],
PupSocket USING [dontWait, Milliseconds, waitForever],
PupSocketBackdoor USING [ReceiveProc],
PupType USING [bytesInPupHeader, bytesOfPupOverhead, CardFromSocket, ErrorCode, HeaderWithoutChecksum, SocketFromCard],
PupWKS USING [rpc],
RefText USING [ObtainScratch, ReleaseScratch],
Rope USING [Fetch, FromRefText, Length, ROPE],
SafeStorage USING [EnableFinalization, EstablishFinalization, FinalizationQueue, FQEmpty, FQNext, NewFQ];
PupSocketImpl:
CEDAR
MONITOR
LOCKS socket
USING socket: Socket
IMPORTS Basics, BasicTime, Booting, Checksum, CommDriver, DebuggerSwap, Endian, PrincOpsUtils, Process, PupBuffer, PupInternal, PupType, RefText, Rope, SafeStorage
EXPORTS PupInternal, PupName, PupSocket, PupSocketBackdoor = {
Buffer: TYPE = PupBuffer.Buffer;
Direction: TYPE = CommBuffer.Direction;
ErrorCode: TYPE = PupType.ErrorCode;
FWORD: TYPE = Endian.FWORD;
HeaderWithoutChecksum: TYPE = PupType.HeaderWithoutChecksum;
HWORD: TYPE = Endian.HWORD;
Milliseconds: TYPE = PupSocket.Milliseconds;
Network: TYPE = CommDriver.Network;
ROPE: TYPE = Rope.ROPE;
ReceiveProc: TYPE = PupSocketBackdoor.ReceiveProc;
gapNoList: [0..16384) ~ CommBufferExtras.gapNoList;
gapSocket: [0..16384) ~ 10; -- b.ovh.gap=gapSocket while b is on the socket queue
bytesInPupHeader: NAT = PupType.bytesInPupHeader;
bytesOfPupOverhead: NAT = PupType.bytesOfPupOverhead;
bytesPerWord: NAT = Basics.bytesPerWord;
bytesPerHWord: NAT = Endian.bytesPerHWord;
outOfLineChecksum:
BOOL ←
TRUE;
Hackery for performance tuning. "= FALSE" => no performance loss.
OutOfLineChecksum:
PROC [
cs: WORD ← 0, nWords: CARDINAL, p: LONG POINTER]
RETURNS [checksum: WORD] = TRUSTED {
RETURN[Checksum.ComputeChecksum[cs, nWords, p]]; };
globalLock: Socket ← NEW[Object];
Socket:
TYPE =
REF Object;
While a buffer is on the input queue, b.ovh.socket is NIL so that dropped sockets will get finalized. While the client has the buffer (Alloc, Get, GetDirect) b.ovh.socket is filled in so we can fixup the socket counters if the buffer gets dropped.
Object:
PUBLIC
TYPE =
MONITORED
RECORD [
local, remote: Pup.Address,
sendBuffersInUse: CARDINAL ← 0,
sendBuffersAlloc: CARDINAL,
recvBuffersInUse: CARDINAL ← 0,
recvBuffersAlloc: CARDINAL,
sendChecksum: BOOL ← TRUE,
recvChecksum: BOOL ← TRUE,
dead, noErrors: BOOL ← FALSE,
dontWait: BOOL ← FALSE,
waitForInput: CONDITION,
waitForSendBuffer: CONDITION,
waitForRecvBuffer: CONDITION,
routing: Routing ← NIL, -- NIL => no cached route
firstInput, lastInput: Buffer ← NIL,
next: Socket ← NIL, -- Only used when a server socket gets captured
proc: ReceiveProc ← NIL,
user: REF ANY ];
Routing: TYPE = REF RoutingInfo;
RoutingInfo:
TYPE =
RECORD [
network: Network,
immediate: Pup.Host,
encap: CommBuffer.Encapsulation ];
Beware: See the comments near Driver.Buffer
RealDriverBuffer:
PROC [b: Buffer]
RETURNS [CommDriver.Buffer] =
TRUSTED
INLINE {
nhp: Allocator.NHeaderP ← LOOPHOLE[b, Allocator.NHeaderP]-SIZE[Allocator.NormalHeader];
nhp.type ← CODE[CommDriver.BufferObject];
b.ovh.direction ← none;
b.ovh.socket ← NIL;
RETURN[LOOPHOLE[b]]; };
TempDriverBuffer:
PROC [b: Buffer]
RETURNS [CommDriver.Buffer] =
TRUSTED
INLINE {
RETURN[LOOPHOLE[b]]; };
RealPupBuffer:
PROC [b: CommDriver.Buffer, d: Direction]
RETURNS [Buffer] =
TRUSTED
INLINE {
nhp: Allocator.NHeaderP ← LOOPHOLE[b, Allocator.NHeaderP]-SIZE[Allocator.NormalHeader];
nhp.type ← CODE[PupBuffer.BufferObject];
b.ovh.next ← NIL;
b.ovh.direction ← d;
RETURN[LOOPHOLE[b]]; };
Next:
PROC [b: Buffer]
RETURNS [Buffer] =
TRUSTED
INLINE {
RETURN[LOOPHOLE[b.ovh.next]]; };
Errors
SocketNotWellKnown: PUBLIC ERROR = CODE;
Creation and parameter setting
IsWellKnown:
PUBLIC
PROC [local: Pup.Socket]
RETURNS [yes:
BOOL] = {
socketNumber: LONG CARDINAL = PupType.CardFromSocket[local];
IF local = Pup.nullSocket THEN RETURN[FALSE];
IF socketNumber >= maxWellKnownSockets THEN RETURN[FALSE];
RETURN[TRUE];
};
CreateServer:
PUBLIC PROC [
local: Pup.Socket,
sendBuffers: NAT ← 1,
recvBuffers: NAT ← 5,
getTimeout: Milliseconds ← 10000 ]
RETURNS [socket: Socket] = {
socketNumber: LONG CARDINAL = PupType.CardFromSocket[local];
IF local = Pup.nullSocket THEN ERROR SocketNotWellKnown;
IF socketNumber >= maxWellKnownSockets THEN ERROR SocketNotWellKnown;
socket ← CreateCommon[local, Pup.nullAddress, sendBuffers, recvBuffers, getTimeout];
};
CreateEphemeral:
PUBLIC PROC [
remote: Pup.Address,
sendBuffers: NAT ← 1,
recvBuffers: NAT ← 5,
getTimeout: Milliseconds ← 10000 ]
RETURNS [socket: Socket] = {
socket ← CreateCommon[Pup.nullSocket, remote, sendBuffers, recvBuffers, getTimeout];
AddSocket assigns a local socket number if socket.local.socket = nullSocket.
};
CreateCommon:
PROC [
local: Pup.Socket,
remote: Pup.Address,
sendBuffers: NAT,
recvBuffers: NAT,
getTimeout: Milliseconds ]
RETURNS [socket: Socket] = {
network: Network;
socket ←
NEW [Object ← [
local: [Pup.nullNet, Pup.nullHost, local],
remote: remote,
sendBuffersAlloc: sendBuffers,
recvBuffersAlloc: recvBuffers ] ];
network ← PupInternal.Route[remote].network;
IF network = NIL THEN network ← CommDriver.GetNetworkChain[];
IF network #
NIL
THEN {
socket.local.net ← network.pup.net;
socket.local.host ← network.pup.host; };
TRUSTED {
Process.EnableAborts[@socket.waitForInput];
Process.EnableAborts[@socket.waitForSendBuffer];
Process.EnableAborts[@socket.waitForRecvBuffer]; };
SetGetTimeout[socket, getTimeout];
CreateInner[globalLock, socket];
SafeStorage.EnableFinalization[socket];
};
socketsCreated: INT ← 0;
CreateInner:
ENTRY
PROC [socket: Socket, new: Socket] = {
ENABLE UNWIND => NULL;
Beware: socket is the global lock, new is the real socket
AddSocket[new];
socketsCreated ← socketsCreated.SUCC;
};
SetRemoteAddress:
PUBLIC PROC [socket: Socket, remote: Pup.Address] = {
socket.remote ← remote;
};
GetRemoteAddress:
PUBLIC PROC [socket: Socket]
RETURNS [Pup.Address] = {
RETURN[socket.remote];
};
GetLocalAddress:
PUBLIC PROC [socket: Socket]
RETURNS [Pup.Address] = {
RETURN[socket.local];
};
SetGetTimeout:
PUBLIC PROC [socket: Socket, ms: Milliseconds] = {
socket.dontWait ← FALSE;
SELECT ms
FROM
PupSocket.dontWait => socket.dontWait ← TRUE;
PupSocket.waitForever =>
TRUSTED {
Process.DisableTimeout[@socket.waitForInput]; };
<
CARDINAL.
LAST =>
TRUSTED {
Process.SetTimeout[@socket.waitForInput, Process.MsecToTicks[ms] ]; };
ENDCASE =>
TRUSTED {
Process.SetTimeout[@socket.waitForInput, Process.SecondsToTicks[ms/1000] ]; };
};
SetSoftwareChecksumming:
PUBLIC PROC [socket: Socket, send, recv:
BOOL] = {
socket.sendChecksum ← send;
socket.recvChecksum ← recv;
};
SetNoErrors:
PUBLIC PROC [socket: Socket] = {
socket.noErrors ← TRUE;
};
Kick:
PUBLIC
ENTRY
PROC [socket: Socket] = {
BROADCAST socket.waitForInput;
};
Destroy:
PUBLIC
ENTRY
PROC [socket: Socket] = {
BROADCAST socket.waitForInput;
socket.dead ← TRUE;
UNTIL socket.firstInput =
NIL
DO
b: Buffer ← socket.firstInput;
socket.firstInput ← Next[b];
b.ovh.next ← NIL; -- DKW: just to be careful ...
IF b.ovh.gap#gapSocket
THEN
DebuggerSwap.CallDebugger["Clobbered buffer in socket queue!"];
b.ovh.gap ← gapNoList; -- DKW: b now removed from the queue
socket.recvBuffersInUse ← socket.recvBuffersInUse - 1;
CommDriver.FreeBuffer[RealDriverBuffer[b]];
ENDLOOP;
socket.lastInput ← NIL; -- Help Buffer Finalization
socket.routing ← NIL;
Now just drop it on the floor
};
DestroyInner:
ENTRY
PROC [socket: Socket, old: Socket] = {
Beware: socket is the global lock, old is the real socket
RemoveSocket[old];
};
Sending
AllocBuffer:
PUBLIC
PROC [socket: Socket]
RETURNS [b: Buffer] = {
InnerAllocBuffer[socket];
b ← RealPupBuffer[CommDriver.AllocBuffer[], send];
b.ovh.network ← NIL;
b.ovh.socket ← socket; -- Needed for Finalization
};
InnerAllocBuffer:
ENTRY
PROC [socket: Socket] = {
Needed so that the call to CommDriver.AllocBuffer is outside the ML.
ENABLE UNWIND => NULL;
UNTIL socket.sendBuffersInUse < socket.sendBuffersAlloc
DO
WAIT socket.waitForSendBuffer;
ENDLOOP;
socket.sendBuffersInUse ← socket.sendBuffersInUse + 1;
};
SetUserBytes:
PUBLIC PROC [b: Buffer, bytes: PupBuffer.ByteIndex] = {
boundsCheck: PupBuffer.ByteIndex ← bytes; -- In case somebody is using TRUSTED.
b.byteLength ← bytes + bytesOfPupOverhead;
};
SetUserHWords:
PUBLIC PROC [b: Buffer, hWords: PupBuffer.HWordIndex] = {
boundsCheck: PupBuffer.HWordIndex ← hWords;
b.byteLength ← hWords*SIZE[HWORD, bytesPerWord] + bytesOfPupOverhead;
};
SetUserFWords:
PUBLIC PROC [b: Buffer, fWords: PupBuffer.FWordIndex] = {
boundsCheck: PupBuffer.FWordIndex ← fWords;
b.byteLength ← fWords*SIZE[FWORD, bytesPerWord] + bytesOfPupOverhead;
};
SetUserSize:
PUBLIC PROC [b: Buffer, sizeUnits:
NAT] = {
boundsCheck: PupBuffer.ByteIndex = sizeUnits*SIZE[WORD, bytesPerWord];
b.byteLength ← boundsCheck + bytesOfPupOverhead;
};
Broadcast:
PUBLIC PROC [socket: Socket, b: Buffer] = {
BroadcastInner[socket, b, socket.remote.socket];
};
BroadcastInner:
PROC [socket: Socket, b: Buffer, dest: Pup.Socket] = {
bytes: NAT = PupBuffer.RoundUpForChecksum[b.byteLength];
b.hopCount ← 0;
b.spares ← 0;
b.dest.host ← Pup.allHosts;
b.dest.socket ← dest;
b.source.socket ← socket.local.socket;
FOR network: Network ← CommDriver.GetNetworkChain[], network.next
UNTIL network =
NIL
DO
b.ovh.encap ← network.pup.getEncapsulation[network, Pup.allHosts];
b.dest.net ← network.pup.net;
b.source.net ← network.pup.net;
b.source.host ← network.pup.host;
SetChecksum[b];
network.pup.send[network, TempDriverBuffer[b], bytes];
ENDLOOP;
FreeBuffer[b];
};
Put:
PUBLIC PROC [socket: Socket, b: Buffer] = {
Send[socket, b, socket.remote];
};
Send:
PUBLIC PROC [socket: Socket, b: Buffer, dest: Pup.Address] = {
bytes: NAT = PupBuffer.RoundUpForChecksum[b.byteLength];
network: Network;
immediate: Pup.Host;
IF dest.net = Pup.allNets THEN { BroadcastInner[socket, b, dest.socket]; RETURN; };
[network, immediate] ← PupInternal.Route[dest];
IF network = NIL THEN { FreeBuffer[b]; RETURN; };
b.hopCount ← 0;
b.spares ← 0;
b.dest ← dest;
b.source ← socket.local;
b.ovh.encap ← network.pup.getEncapsulation[network, immediate];
TRUSTED {
words: NAT = PupBuffer.WordsWithoutChecksum[b.byteLength];
checksumLoc: LONG POINTER TO HWORD ← @b.byteLength + words;
IF socket.sendChecksum
THEN {
IF outOfLineChecksum
THEN
checksumLoc^ ← OutOfLineChecksum[0, words, @b.byteLength]
ELSE checksumLoc^ ← Checksum.ComputeChecksum[0, words, @b.byteLength]; }
ELSE checksumLoc^ ← PupBuffer.noChecksum; };
network.pup.send[network, TempDriverBuffer[b], bytes];
FreeBuffer[b];
};
Receiving
Get:
PUBLIC
ENTRY
PROC [socket: Socket]
RETURNS [b: Buffer] = {
ENABLE UNWIND => NULL;
IF socket.firstInput =
NIL
THEN {
IF socket.dontWait THEN RETURN[NIL];
IF socket.dead THEN RETURN[NIL];
WAIT socket.waitForInput; };
IF socket.firstInput = NIL THEN RETURN[NIL];
b ← socket.firstInput;
socket.firstInput ← Next[b];
b.ovh.next ← NIL; -- DKW: just to be careful ...
IF b.ovh.gap#gapSocket
THEN
DebuggerSwap.CallDebugger["Clobbered buffer in socket queue!"];
b.ovh.gap ← gapNoList; -- DKW: b now removed from the queue
b.ovh.socket ← socket; -- Needed for Finalization
};
GetUserBytes:
PUBLIC PROC [b: Buffer]
RETURNS [bytes: PupBuffer.ByteIndex] = {
bytes ← b.byteLength - bytesOfPupOverhead;
};
GetUserHWords:
PUBLIC PROC [b: Buffer]
RETURNS [hWords: PupBuffer.HWordIndex] = {
bytes: PupBuffer.ByteIndex ← b.byteLength - bytesOfPupOverhead;
hWords ← bytes/SIZE[HWORD]/bytesPerWord;
};
GetUserFWords:
PUBLIC PROC [b: Buffer]
RETURNS [fWords: PupBuffer.FWordIndex] = {
bytes: PupBuffer.ByteIndex ← b.byteLength - bytesOfPupOverhead;
fWords 𡤋ytes/SIZE[FWORD]/bytesPerWord;
};
GetUserSize:
PUBLIC
PROC [b: Buffer]
RETURNS [sizeUnits:
NAT] = {
bytes: PupBuffer.ByteIndex ← b.byteLength - bytesOfPupOverhead;
sizeUnits ← bytes/SIZE[WORD]/bytesPerWord;
};
FreeBuffer:
PUBLIC
PROC [b: Buffer] = {
socket: Socket ← NARROW[b.ovh.socket];
FreeBufferInner[socket, b];
};
FreeBufferInner:
ENTRY
PROC [socket: Socket, b: Buffer] =
INLINE {
ENABLE UNWIND => NULL;
SELECT b.ovh.direction
FROM
send => {
socket.sendBuffersInUse ← socket.sendBuffersInUse - 1;
NOTIFY socket.waitForSendBuffer; };
recv => {
socket.recvBuffersInUse ← socket.recvBuffersInUse - 1;
NOTIFY socket.waitForRecvBuffer; };
ENDCASE => ERROR;
CommDriver.FreeBuffer[RealDriverBuffer[b]];
};
FixupSourceAndDest:
PUBLIC
PROC [b: Buffer] = {
network: Network ← NARROW[b.ovh.network];
IF b.source.net = Pup.nullNet THEN b.source.net ← network.pup.net;
IF b.dest.net = Pup.nullNet THEN b.dest.net ← network.pup.net;
IF b.dest.host = Pup.allHosts THEN b.dest.host ← network.pup.host;
};
ReturnToSender:
PUBLIC
PROC [b: Buffer] = {
ReturnToSenderNoFree[b];
FreeBuffer[b];
};
ReturnToSenderNoFree:
PUBLIC
PROC [b: Buffer] = {
socket: Socket ← NARROW[b.ovh.socket];
network: Network ← NARROW[b.ovh.network];
bytes: NAT = PupBuffer.RoundUpForChecksum[b.byteLength];
temp: Pup.Address;
FixupSourceAndDest[b];
b.hopCount ← 0;
b.spares ← 0;
temp ← b.source;
b.source ← b.dest;
b.dest ← temp;
TRUSTED {
words: NAT = PupBuffer.WordsWithoutChecksum[b.byteLength];
checksumLoc: LONG POINTER TO HWORD ← @b.byteLength + words;
IF socket =
NIL
OR socket.sendChecksum
THEN {
IF outOfLineChecksum
THEN
checksumLoc^ ← OutOfLineChecksum[0, words, @b.byteLength]
ELSE checksumLoc^ ← Checksum.ComputeChecksum[0, words, @b.byteLength]; }
ELSE checksumLoc^ ← PupBuffer.noChecksum; };
IF b.source.host = Pup.allHosts
OR network.toBroadcast[network, TempDriverBuffer[b]] THEN RETURN;
network.pup.return[network, TempDriverBuffer[b], bytes];
};
SetChecksum:
PUBLIC
PROC [b: Buffer] =
TRUSTED {
words: NAT = PupBuffer.WordsWithoutChecksum[b.byteLength];
checksumLoc: LONG POINTER TO HWORD ← @b.byteLength + words;
IF outOfLineChecksum
THEN
checksumLoc^ ← OutOfLineChecksum[0, words, @b.byteLength]
ELSE checksumLoc^ ← Checksum.ComputeChecksum[0, words, @b.byteLength];
};
errorSize: NAT = SIZE[HeaderWithoutChecksum] + SIZE[ErrorCode] + SIZE[HWORD];
ReturnError:
PUBLIC PROC [b: Buffer, code: ErrorCode, rope:
ROPE] = {
ReturnErrorNoFree[b, code, rope];
FreeBuffer[b];
};
ReturnErrorNoFree:
PUBLIC PROC [b: Buffer, code: ErrorCode, rope:
ROPE] = {
socket: Socket ← NARROW[b.ovh.socket];
IF b.type = error THEN RETURN;
TRUSTED {
PrincOpsUtils.LongCopy[
from: @b.byteLength,
nwords: SIZE[HeaderWithoutChecksum],
to: @b.error.header]; };
b.type ← error;
b.error.code ← code;
b.error.options ← 0;
SetUserSize[b, errorSize];
IF rope =
NIL
THEN
SELECT code
FROM
badChecksum => rope ← "Bad Software Checksum";
noSocket => rope ← "No such Port";
resourceLimits => rope ← "Buffers full";
ENDCASE => NULL;
AppendRope[b, rope];
IF code = iAmNotAGateway
THEN {
-- Fill in our return address
network: Network = NARROW[b.ovh.network];
b.dest.net ← network.pup.net;
b.dest.host ← network.pup.host;
b.dest.socket ← Pup.nullSocket; };
ReturnToSenderNoFree[b];
};
Back door - for speed
newRoutingInfo: INT ← 0;
PutFirst:
PUBLIC PROC [socket: Socket, b: Buffer] = {
routing: Routing ← socket.routing; -- ATOMIC
new: RoutingInfo;
IF socket.remote.socket = Pup.nullSocket THEN GOTO CantGetThere;
[new.network, new.immediate] ← PupInternal.Route[socket.remote];
IF new.network = NIL THEN GOTO CantGetThere;
new.encap ← new.network.pup.getEncapsulation[new.network, new.immediate];
IF routing =
NIL
OR routing^ # new
THEN {
routing ← NEW[RoutingInfo ← new];
socket.routing ← routing; -- ATOMIC
newRoutingInfo ← newRoutingInfo.SUCC; };
PutAgain[socket, b];
EXITS CantGetThere => { socket.routing ← NIL; };
};
PutAgain:
PUBLIC PROC [socket: Socket, b: Buffer] = {
bytes: NAT = PupBuffer.RoundUpForChecksum[b.byteLength];
routing: Routing ← socket.routing; -- ATOMIC
network: Network;
IF routing = NIL THEN RETURN;
network ← routing.network;
IF network = NIL THEN RETURN;
b.ovh.encap ← routing.encap;
b.hopCount ← 0;
b.spares ← 0;
b.dest ← socket.remote;
b.source ← socket.local;
TRUSTED {
words: NAT = PupBuffer.WordsWithoutChecksum[b.byteLength];
checksumLoc: LONG POINTER TO HWORD ← @b.byteLength + words;
IF socket.sendChecksum
THEN {
IF outOfLineChecksum
THEN
checksumLoc^ ← OutOfLineChecksum[0, words, @b.byteLength]
ELSE checksumLoc^ ← Checksum.ComputeChecksum[0, words, @b.byteLength]; }
ELSE checksumLoc^ ← PupBuffer.noChecksum; };
IF network # NIL THEN network.pup.send[network, TempDriverBuffer[b], bytes];
};
Resend:
PUBLIC PROC [b: Buffer, checksum:
BOOL ←
TRUE] = {
bytes: NAT = PupBuffer.RoundUpForChecksum[b.byteLength];
network: Network ← NARROW[b.ovh.network];
IF network = NIL THEN RETURN;
TRUSTED {
words: NAT = PupBuffer.WordsWithoutChecksum[b.byteLength];
checksumLoc: LONG POINTER TO HWORD ← @b.byteLength + words;
IF checksum
THEN {
IF outOfLineChecksum
THEN
checksumLoc^ ← OutOfLineChecksum[0, words, @b.byteLength]
ELSE checksumLoc^ ← Checksum.ComputeChecksum[0, words, @b.byteLength]; }
ELSE checksumLoc^ ← PupBuffer.noChecksum; };
network.pup.send[network, TempDriverBuffer[b], bytes];
};
SetDirectReceive:
PUBLIC
ENTRY
PROC [socket: Socket, proc: ReceiveProc, user:
REF] = {
socket.proc ← proc;
socket.user ← user;
};
UseNormalPath:
PUBLIC
PROC [b: Buffer] = {
socket: Socket ← NARROW[b.ovh.socket];
UseNormalPathInner[socket, b];
};
UseNormalPathInner:
ENTRY
PROC [socket: Socket, b: Buffer] = {
b.ovh.socket ← NIL;
IF b.ovh.gap#gapNoList
THEN
DebuggerSwap.CallDebugger["Buffer already in a list!"];
b.ovh.gap ← gapSocket; -- DKW: b is now in the socket queue
IF socket.firstInput = NIL THEN socket.firstInput ← b
ELSE socket.lastInput.ovh.next ← b;
socket.lastInput ← b;
NOTIFY socket.waitForInput;
};
AllocRecvBuffer:
PUBLIC
PROC [socket: Socket]
RETURNS [b: Buffer] = {
InnerAllocRecvBuffer[socket];
b ← RealPupBuffer[CommDriver.AllocBuffer[], recv];
b.ovh.network ← NIL;
b.ovh.socket ← socket; -- Needed for Finalization
};
InnerAllocRecvBuffer:
ENTRY
PROC [socket: Socket] = {
Needed so that the call to CommDriver.AllocBuffer is outside the ML.
ENABLE UNWIND => NULL;
UNTIL socket.recvBuffersInUse < socket.recvBuffersAlloc
DO
WAIT socket.waitForRecvBuffer;
ENDLOOP;
socket.recvBuffersInUse ← socket.recvBuffersInUse + 1;
};
Rope Utilities
CopyRope:
PUBLIC PROC [b: Buffer, rope: Rope.
ROPE] = {
chars: PupBuffer.ByteIndex ← MIN[Rope.Length[rope], PupBuffer.ByteAlloc.LAST];
FOR i: PupBuffer.ByteAlloc
IN [0..chars)
DO
b.char[i] ← Rope.Fetch[rope, i];
ENDLOOP;
SetUserBytes[b, chars];
};
AppendRope:
PUBLIC PROC [b: Buffer, rope: Rope.
ROPE] = {
start: PupBuffer.ByteIndex ← GetUserBytes[b];
chars: PupBuffer.ByteIndex ← MIN[Rope.Length[rope], PupBuffer.ByteAlloc.LAST-start];
FOR i: PupBuffer.ByteAlloc
IN [0..chars)
DO
b.char[start+i] ← Rope.Fetch[rope, i];
ENDLOOP;
SetUserBytes[b, start+chars];
};
ExtractRope:
PUBLIC
PROC [b: Buffer]
RETURNS [rope: Rope.
ROPE] = {
bytes: NAT ← GetUserBytes[b];
text: REF TEXT ← RefText.ObtainScratch[bytes];
FOR i: PupBuffer.ByteAlloc
IN [0..bytes)
DO
text[i] ← b.char[i];
ENDLOOP;
text.length ← bytes;
rope ← Rope.FromRefText[text];
RefText.ReleaseScratch[text];
};
ExtractErrorRope:
PUBLIC
PROC [b: Buffer]
RETURNS [rope: Rope.
ROPE] = {
bytes: NAT ← GetUserBytes[b];
text: REF TEXT ← RefText.ObtainScratch[bytes];
IF bytes < PupBuffer.errorOverheadBytes THEN RETURN[NIL];
bytes ← bytes - PupBuffer.errorOverheadBytes;
FOR i:
NAT
IN [0..bytes)
DO
text[i] ← b.error.text[i];
ENDLOOP;
text.length ← bytes;
rope ← Rope.FromRefText[text];
RefText.ReleaseScratch[text];
};
ExtractAbortRope:
PUBLIC
PROC [b: Buffer]
RETURNS [rope: Rope.
ROPE] = {
bytes: NAT ← GetUserBytes[b];
text: REF TEXT ← RefText.ObtainScratch[bytes];
IF bytes < PupBuffer.abortOverheadBytes THEN RETURN[NIL];
bytes ← bytes - PupBuffer.abortOverheadBytes;
FOR i:
NAT
IN [0..bytes)
DO
text[i] ← b.abort.text[i];
ENDLOOP;
text.length ← bytes;
rope ← Rope.FromRefText[text];
RefText.ReleaseScratch[text];
};
Utilities
unique: LONG CARDINAL ← BasicTime.GetClockPulses[];
GetUniqueID:
PUBLIC
PROC
RETURNS [
FWORD] = {
new: LONG CARDINAL ← GetUniqueIDInner[globalLock];
RETURN[Endian.FFromCard[new]];
};
GetUniqueIDInner:
ENTRY
PROC [socket: Socket]
RETURNS [
LONG
CARDINAL] = {
unique ← unique.SUCC;
RETURN[unique];
};
IsThisMe:
PUBLIC
PROC [address: Pup.Address]
RETURNS [yes:
BOOL] = {
network: Network ← PupInternal.Route[address].network;
IF network = NIL THEN RETURN[FALSE];
IF network.pup.net # address.net THEN RETURN[FALSE];
IF network.pup.host # address.host THEN RETURN[FALSE];
RETURN[TRUE];
};
GetMyAddress:
PUBLIC
PROC
RETURNS [address: Pup.Address] = {
network: Network ← CommDriver.GetNetworkChain[];
IF network = NIL THEN RETURN[Pup.nullAddress];
address.net ← network.pup.net;
address.host ← network.pup.host;
address.socket ← Pup.nullSocket;
};
Keeping track of active sockets and assigning ephemeral socket numbers
Once upon a time, this was a simple linked list. The current structure means that you can do a lookup without any chaining. There are two types of sockets: well known, and ephemeral. There is a separate lookup table for each type.
Well known sockets have small values. In terms of binding, they are the constants "wired in" to the net. Actually, they are chained together if you create another instance of the same socket number. The idea is that you can capture a socket, say the echo server, without cooperation from the real server.
Ephemeral socket numbers are assigned locally. Anything goes as long as they are unique. This module uses the low bits as an index into a table and increments the high bits when necessary to force uniqueness. Allocating 10 bits for "low" means that we can have up to 1000 ephemeral sockets active at the same time, which seems like enough for now. That leaves a 22 bit counter to guarantee uniqeness.
I checked Bataan once. It had used 2^15 sockets in 4 hours. That's 2^18 per day. At that rate, things will wrap around every 16 days. That means we can't ignore the wraparound case. In practice, things will be much better than that since the algorithim reuses all the vacant slots before bumping the high bits. Note that even when things wrap around, we will never reuse an old socket number that is still in use since it will still be holding onto its slot. (We may reuse a socket number "right away" if it is used for a very long time, and deleted just before we check.)
ClumpOfSockets: TYPE = RECORD [sockets: SEQUENCE count: CARDINAL OF Socket];
wellKnownSockets: REF ClumpOfSockets ← NEW[ClumpOfSockets[PupWKS.rpc.d]];
ephemeralSockets:
REF ClumpOfSockets ←
NEW[ClumpOfSockets[16]];
These tables are extended when needed. They never shrink.
maxWellKnownSockets: CARDINAL = 512; -- Must be a power of 2 (for wraparound)
maxEphemeralSockets: CARDINAL = 1024; -- Must be a power of 2 (for masking into table)
ephemeralSocketMask: CARDINAL = maxEphemeralSockets - 1;
wraparoundSocket: CARDINAL ← MAX[maxWellKnownSockets, maxEphemeralSockets];
nextEphemeralSocket:
LONG
CARDINAL ← BasicTime.GetClockPulses[];
IndexFromSocket:
PROC [socket: Pup.Socket]
RETURNS [index:
CARDINAL] = {
socketNumber: LONG CARDINAL = PupType.CardFromSocket[socket];
index ← IndexFromLC[socketNumber];
};
IndexFromLC:
PROC [socketNumber:
LONG
CARDINAL]
RETURNS [index:
CARDINAL] = {
index ← Basics.BITAND[Basics.LowHalf[socketNumber], ephemeralSocketMask];
};
CantSpecifyArbitraryLocalSocketNumber: PUBLIC ERROR = CODE;
AddSocket:
INTERNAL
PROC [new: Socket] = {
socketNumber: LONG CARDINAL = PupType.CardFromSocket[new.local.socket];
SELECT
TRUE
FROM
new.local.socket = Pup.nullSocket => AddEphemeralSocket[new];
socketNumber < maxWellKnownSockets => AddWellKnownSocket[new];
ENDCASE => ERROR CantSpecifyArbitraryLocalSocketNumber;
};
AddWellKnownSocket:
INTERNAL
PROC [new: Socket] = {
socketNumber: CARDINAL = PupType.CardFromSocket[new.local.socket];
IF socketNumber >= wellKnownSockets.count
THEN
wellKnownSockets ← Expand[wellKnownSockets, socketNumber+1];
new.next ← wellKnownSockets[socketNumber];
wellKnownSockets[socketNumber] ← new;
};
IndexAdjustmentConfusion: ERROR = CODE; -- Bug in this code
SocketNumberWraparoundConfusion: ERROR = CODE; -- Bug in this code
AddEphemeralSocket:
INTERNAL
PROC [new: Socket] = {
Assigns local socket number.
localSocket: LONG CARDINAL ← nextEphemeralSocket;
FOR i:
CARDINAL
IN [0..ephemeralSockets.count)
DO
index: CARDINAL ← IndexFromLC[localSocket];
IF ~(index < ephemeralSockets.count)
THEN {
Out of range. Bump high bits and set low bits back to 0.
localSocket ← localSocket + (maxEphemeralSockets-index);
IF localSocket < maxWellKnownSockets THEN localSocket ← wraparoundSocket;
index ← IndexFromLC[localSocket];
IF index # 0 THEN ERROR IndexAdjustmentConfusion; };
IF ephemeralSockets[index] = NIL THEN EXIT;
localSocket ← localSocket + 1;
IF localSocket < maxWellKnownSockets THEN localSocket ← wraparoundSocket;
REPEAT
FINISHED => {
index: CARDINAL ← IndexFromLC[nextEphemeralSocket];
IF ephemeralSockets.count = maxEphemeralSockets
THEN
DebuggerSwap.CallDebugger["Socket table full"];
localSocket ← nextEphemeralSocket + (ephemeralSockets.count-index);
IF localSocket < maxWellKnownSockets THEN localSocket ← wraparoundSocket;
index ← IndexFromLC[localSocket];
IF index # ephemeralSockets.count THEN ERROR IndexAdjustmentConfusion;
ephemeralSockets ← Expand[ephemeralSockets, ephemeralSockets.count+1]; };
ENDLOOP;
IF localSocket < maxWellKnownSockets THEN ERROR SocketNumberWraparoundConfusion;
new.local.socket ← PupType.SocketFromCard[localSocket];
nextEphemeralSocket ← localSocket + 1;
ephemeralSockets[IndexFromSocket[new.local.socket]] ← new;
};
Expand:
INTERNAL
PROC [old:
REF ClumpOfSockets, size:
CARDINAL]
RETURNS [new: REF ClumpOfSockets] = {
new ← NEW[ClumpOfSockets[size]];
FOR i: CARDINAL IN [0..old.count) DO new[i] ← old[i]; ENDLOOP;
};
FindSocket:
PROC [localSocket: Pup.Socket]
RETURNS [h: Socket ←
NIL] = {
socketNumber: LONG CARDINAL = PupType.CardFromSocket[localSocket];
IF socketNumber < maxWellKnownSockets
THEN
h ← FindWellKnownSocket[socketNumber]
ELSE h ← FindEphemeralSocket[localSocket];
};
FindWellKnownSocket:
PROC [socketNumber:
CARDINAL]
RETURNS [h: Socket ←
NIL] = {
IF socketNumber < wellKnownSockets.count THEN h ← wellKnownSockets[socketNumber];
DO
IF h = NIL THEN RETURN;
IF ~h.dead THEN RETURN;
h ← h.next;
Hackery: look more if we encounter a dead one (echo server slot captured)
ENDLOOP;
};
FindEphemeralSocket:
PROC [localSocket: Pup.Socket]
RETURNS [h: Socket ←
NIL] = {
index: CARDINAL = IndexFromSocket[localSocket];
IF index < ephemeralSockets.count THEN h ← ephemeralSockets[index];
IF h = NIL THEN RETURN;
IF localSocket # h.local.socket THEN h ← NIL;
};
RemoveSocket:
INTERNAL PROC [old: Socket] = {
socketNumber: LONG CARDINAL = PupType.CardFromSocket[old.local.socket];
IF socketNumber < maxWellKnownSockets THEN RemoveWellKnownSocket[old, socketNumber]
ELSE RemoveEphemeralSocket[old];
old.next ← NIL;
};
RemoveWellKnownSocket:
INTERNAL
PROC [old: Socket, socketNumber:
CARDINAL] = {
head: Socket ← wellKnownSockets[socketNumber];
IF head = old
THEN {
wellKnownSockets[socketNumber] ← head.next;
RETURN; };
DO
-- NIL fault if not on chain
IF head.next = old THEN { head.next ← old.next; EXIT; };
head ← head.next;
ENDLOOP;
};
RemoveEphemeralSocket:
INTERNAL
PROC [old: Socket] = {
index: CARDINAL = IndexFromSocket[old.local.socket];
IF ephemeralSockets[index] # old THEN ERROR; -- Or bounds fault if bogus
ephemeralSockets[index] ← NIL;
};
Incoming packets from Drivers
Counters for all the strange cases
errorTooShort: INT ← 0;
errorNoGateway: INT ← 0;
errorNoSocket: INT ← 0;
errorDeadSocket: INT ← 0;
errorBadChecksum: INT ← 0;
errorBuffersFull: INT ← 0;
TakeThis:
PUBLIC
PROC [network: Network, buffer: CommDriver.Buffer, bytes:
NAT]
RETURNS [CommDriver.Buffer] = {
b: Buffer = RealPupBuffer[buffer, recv];
bytesNeeded: NAT = PupBuffer.RoundUpForChecksum[b.byteLength];
dest: Pup.Address ← b.dest;
localSocket: Pup.Socket = dest.socket;
socket: Socket;
proc: ReceiveProc;
user: REF ANY;
IF bytes < bytesNeeded
OR b.byteLength < bytesOfPupOverhead
THEN {
errorTooShort ← errorTooShort.SUCC;
RETURN[RealDriverBuffer[b]]; };
BEGIN
Don't be surprised if you find krocks lurking in here. /HGM, April 1, 1986
SELECT
TRUE
FROM
(dest.host = network.pup.host) => {
SELECT
TRUE
FROM
(dest.net = network.pup.net) => NULL; -- Main line case
(network.pup.net = Pup.nullNet) => NULL; -- We don't know, he does
(dest.net = Pup.nullNet) => NULL; -- He doesn't know, we do
ENDCASE => GOTO TryBackDoor; };
(dest.host = Pup.allHosts) => {
SELECT
TRUE
FROM
(dest.net = network.pup.net) => NULL; -- Main line case
(network.pup.net = Pup.nullNet) => NULL; -- We don't know, he does
(dest.net = Pup.nullNet) => NULL; -- He doesn't know, we do
ENDCASE => GOTO TryBackDoor; };
ENDCASE => GOTO TryBackDoor;
EXITS TryBackDoor => {
[network, ] ← PupInternal.Route[dest]; -- Back door fixup
IF network =
NIL
THEN
network ← NARROW[b.ovh.network]; -- Argh. Our Initialization or his confusion
IF dest.net # network.pup.net
OR dest.host # network.pup.host
THEN
RETURN[RealDriverBuffer[forwarder[b]]]; };
END;
socket ← FindSocket[localSocket];
IF socket =
NIL
THEN {
errorNoSocket ← errorNoSocket.SUCC;
ReturnErrorNoFree[b, noSocket, "No socket at this machine"];
RETURN[RealDriverBuffer[b]]; };
IF socket.dead
THEN {
errorDeadSocket ← errorDeadSocket.SUCC;
IF ~socket.noErrors THEN ReturnErrorNoFree[b, noSocket, "Socket has been deleted"];
RETURN[RealDriverBuffer[b]]; };
IF socket.recvChecksum
THEN
TRUSTED {
words: NAT = PupBuffer.WordsWithoutChecksum[b.byteLength];
checksumLoc: LONG POINTER TO HWORD ← @b.byteLength + words;
hisChecksum: HWORD ← checksumLoc^;
IF hisChecksum # PupBuffer.noChecksum
THEN {
checksum: WORD;
IF outOfLineChecksum
THEN
checksum ← OutOfLineChecksum[0, words, @b.byteLength]
ELSE checksum ← Checksum.ComputeChecksum[0, words, @b.byteLength];
IF checksum # hisChecksum
THEN {
errorBadChecksum ← errorBadChecksum.SUCC;
ReturnErrorNoFree[b, badChecksum, "Bad software checksum"];
RETURN[RealDriverBuffer[b]]; }; }; };
[proc, user] ← CheckDirect[socket];
IF proc #
NIL
THEN {
new: Buffer;
b.ovh.socket ← socket;
new ← proc[socket, b, user];
IF new = NIL THEN RETURN[NIL];
UnbumpRecvCount[socket];
RETURN[RealDriverBuffer[new]]; };
IF TakeThisInner[socket, b] THEN RETURN[NIL]
ELSE {
errorBuffersFull ← errorBuffersFull.SUCC;
IF ~socket.noErrors THEN ReturnErrorNoFree[b, resourceLimits, "Buffer allocation limit"];
RETURN[RealDriverBuffer[b]]; };
};
CheckDirect:
ENTRY
PROC [socket: Socket]
RETURNS [proc: ReceiveProc, user:
REF
ANY] = {
IF socket.proc = NIL THEN RETURN[NIL, NIL];
IF socket.recvBuffersInUse < socket.recvBuffersAlloc
THEN {
socket.recvBuffersInUse ← socket.recvBuffersInUse.SUCC;
RETURN[socket.proc, socket.user]; };
RETURN[NIL, NIL];
};
UnbumpRecvCount:
ENTRY
PROC [socket: Socket] =
INLINE {
socket.recvBuffersInUse ← socket.recvBuffersInUse.PRED;
};
TakeThisInner:
ENTRY
PROC [socket: Socket, b: Buffer]
RETURNS [ok:
BOOL] = {
IF socket.dead
THEN
{
CommDriver.FreeBuffer[RealDriverBuffer[b]];
RETURN[TRUE]; };
IF socket.recvBuffersInUse < socket.recvBuffersAlloc
THEN {
socket.recvBuffersInUse ← socket.recvBuffersInUse.SUCC;
IF b.ovh.gap#gapNoList
THEN
DebuggerSwap.CallDebugger["Buffer already in a list!"];
b.ovh.gap ← gapSocket; -- DKW: b is now in the socket queue
IF socket.firstInput = NIL THEN socket.firstInput ← b
ELSE socket.lastInput.ovh.next ← b;
socket.lastInput ← b;
NOTIFY socket.waitForInput;
RETURN[TRUE]; };
RETURN[FALSE];
};
Stubs for gatewaying
forwarder: PROC [b: Buffer] RETURNS [Buffer] ← DummyForwarder;
DummyForwarder:
PROC [b: Buffer]
RETURNS [Buffer] = {
errorNoGateway ← errorNoGateway.SUCC;
ReturnErrorNoFree[b, iAmNotAGateway, "I'm not a gateway (yet)"];
RETURN[b];
};
CaptureForwarding:
PUBLIC
PROC [proc:
PROC [Buffer]
RETURNS [Buffer] ] = {
forwarder ← proc;
};
Finalization
sfqMaxCount: INT ← 0; -- DKW: see how full sfq can get in 10 seconds
droppedSockets: INT ← 0;
finishedSockets: INT ← 0;
finishedErrors: INT ← 0;
finishedDelayed: INT ← 0;
finishedBatches: INT ← 0;
oneSecond: Process.Ticks = Process.SecondsToTicks[1];
SocketFinalizer:
PROC = {
Process.SetPriority[Process.priorityBackground];
DO
sfqCount: INT ← 0; -- DKW: count the number of sockets in the queue each iteration
list: LIST OF Socket ← NIL; -- Batch up sockets to be Destroyed, avoiding 10 seconds each
DO
socket: Socket ← NARROW[SafeStorage.FQNext[sfq]];
sfqCount ← sfqCount+1;
IF ~socket.dead
THEN {
-- User forgot to call Destroy
SafeStorage.EnableFinalization[socket];
Destroy[socket];
droppedSockets ← droppedSockets.SUCC;
}
ELSE {
-- Normal end of life
IF socket.noErrors THEN list ← CONS[socket, list]
ELSE DestroyInner[globalLock, socket];
finishedSockets ← finishedSockets.SUCC;
};
socket ← NIL;
IF SafeStorage.FQEmpty[sfq] THEN EXIT;
ENDLOOP;
sfqMaxCount ← MAX[sfqMaxCount, sfqCount];
IF list = NIL THEN LOOP;
Process.Pause[10*oneSecond];
UNTIL list =
NIL
DO
DestroyInner[globalLock, list.first];
finishedDelayed ← finishedDelayed.SUCC;
list ← list.rest;
ENDLOOP;
finishedBatches ← finishedBatches.SUCC;
ENDLOOP;
};
BufferFinalizer:
PROC = {
Beware: See the comments near CommDriver.Buffer
Process.SetPriority[Process.priorityBackground];
DO
b: Buffer ← NARROW[SafeStorage.FQNext[bfq]];
SafeStorage.EnableFinalization[b];
FreeBuffer[b];
I saw a NIL fault in FreeBuffer because b.ovh.socket was NIL. I think that was a missing check for socket.dead in TakeThisInner. /HGM, May 86
b ← NIL;
droppedBuffers ← droppedBuffers.SUCC;
ENDLOOP;
};
DropTest:
PROC [n:
NAT] = {
FOR i:
NAT
IN [0..n)
DO
socket: Socket ← CreateEphemeral[remote: Pup.nullAddress];
[] ← AllocBuffer[socket];
SetNoErrors[socket];
ENDLOOP;
};
bfq: SafeStorage.FinalizationQueue ← SafeStorage.NewFQ[];
sfq: SafeStorage.FinalizationQueue ← SafeStorage.NewFQ[length: 300];
DKW: Because of the pause in SocketFinalizer, this queue must hold 10 seconds worth of discarded sockets; 100 (10 per second) is not enough. The TrickleCharger, for example, does FS file lookups as fast as it can; each lookup uses an ephemeral socket. When sfq had only 100 elements, the socket finalizer eventually fell behind and the socket table filled up.
SafeStorage.EstablishFinalization[type: CODE[PupBuffer.BufferObject], npr: 0, fq: bfq];
SafeStorage.EstablishFinalization[type: CODE[Object], npr: 1, fq: sfq];
TRUSTED { Process.Detach[FORK SocketFinalizer[]]; };
TRUSTED { Process.Detach[FORK BufferFinalizer[]]; };
IF ~Booting.switches[c] THEN CommDriver.InsertReceiveProc[NIL, pup, TakeThis];