PupSocketImpl.mesa
Copyright © 1986 by Xerox Corporation. All rights reserved.
Hal Murray, November 17, 1986 4:13:21 pm PST
Doug Wyatt, June 10, 1986 2:54:21 pm PDT
DIRECTORY
Allocator USING [NHeaderP, NormalHeader],
Basics USING [BITAND, bytesPerWord, LowHalf],
BasicTime USING [GetClockPulses],
Booting USING [switches],
Checksum USING [ComputeChecksum],
CommBuffer USING [Direction, Encapsulation],
CommBufferExtras USING [gapNoList],
CommDriver USING [AllocBuffer, Buffer, BufferObject, FreeBuffer, GetNetworkChain, InsertReceiveProc, Network],
DebuggerSwap USING [CallDebugger],
Endian USING [bytesPerHWord, FFromCard, FWORD, HWORD],
PrincOpsUtils USING [LongCopy],
Process USING [Detach, DisableTimeout, EnableAborts, MsecToTicks, Pause, priorityBackground, SecondsToTicks, SetPriority, SetTimeout, Ticks],
Pup USING [Address, allHosts, allNets, Host, nullAddress, nullHost, nullNet, nullSocket, Socket],
PupBuffer USING [abortOverheadBytes, Buffer, BufferObject, ByteAlloc, ByteIndex, errorOverheadBytes, FWordIndex, HWordIndex, noChecksum, RoundUpForChecksum, WordsWithoutChecksum],
PupInternal USING [Route],
PupName USING [],
PupSocket USING [dontWait, Milliseconds, waitForever],
PupSocketBackdoor USING [ReceiveProc],
PupType USING [bytesInPupHeader, bytesOfPupOverhead, CardFromSocket, ErrorCode, HeaderWithoutChecksum, SocketFromCard],
PupWKS USING [rpc],
RefText USING [ObtainScratch, ReleaseScratch],
Rope USING [Fetch, FromRefText, Length, ROPE],
SafeStorage USING [EnableFinalization, EstablishFinalization, FinalizationQueue, FQEmpty, FQNext, NewFQ];
PupSocketImpl: CEDAR MONITOR LOCKS socket USING socket: Socket
IMPORTS Basics, BasicTime, Booting, Checksum, CommDriver, DebuggerSwap, Endian, PrincOpsUtils, Process, PupBuffer, PupInternal, PupType, RefText, Rope, SafeStorage
EXPORTS PupInternal, PupName, PupSocket, PupSocketBackdoor = {
Buffer: TYPE = PupBuffer.Buffer;
Direction: TYPE = CommBuffer.Direction;
ErrorCode: TYPE = PupType.ErrorCode;
FWORD: TYPE = Endian.FWORD;
HeaderWithoutChecksum: TYPE = PupType.HeaderWithoutChecksum;
HWORD: TYPE = Endian.HWORD;
Milliseconds: TYPE = PupSocket.Milliseconds;
Network: TYPE = CommDriver.Network;
ROPE: TYPE = Rope.ROPE;
ReceiveProc: TYPE = PupSocketBackdoor.ReceiveProc;
gapNoList: [0..16384) ~ CommBufferExtras.gapNoList;
gapSocket: [0..16384) ~ 10; -- b.ovh.gap=gapSocket while b is on the socket queue
bytesInPupHeader: NAT = PupType.bytesInPupHeader;
bytesOfPupOverhead: NAT = PupType.bytesOfPupOverhead;
bytesPerWord: NAT = Basics.bytesPerWord;
bytesPerHWord: NAT = Endian.bytesPerHWord;
outOfLineChecksum: BOOLTRUE;
Hackery for performance tuning. "= FALSE" => no performance loss.
OutOfLineChecksum: PROC [
cs: WORD ← 0, nWords: CARDINAL, p: LONG POINTER]
RETURNS [checksum: WORD] = TRUSTED {
RETURN[Checksum.ComputeChecksum[cs, nWords, p]]; };
globalLock: Socket ← NEW[Object];
Socket: TYPE = REF Object;
While a buffer is on the input queue, b.ovh.socket is NIL so that dropped sockets will get finalized. While the client has the buffer (Alloc, Get, GetDirect) b.ovh.socket is filled in so we can fixup the socket counters if the buffer gets dropped.
Object: PUBLIC TYPE = MONITORED RECORD [
local, remote: Pup.Address,
sendBuffersInUse: CARDINAL ← 0,
sendBuffersAlloc: CARDINAL,
recvBuffersInUse: CARDINAL ← 0,
recvBuffersAlloc: CARDINAL,
sendChecksum: BOOLTRUE,
recvChecksum: BOOLTRUE,
dead, noErrors: BOOLFALSE,
dontWait: BOOLFALSE,
waitForInput: CONDITION,
waitForSendBuffer: CONDITION,
waitForRecvBuffer: CONDITION,
routing: Routing ← NIL, -- NIL => no cached route
firstInput, lastInput: Buffer ← NIL,
next: Socket ← NIL, -- Only used when a server socket gets captured
proc: ReceiveProc ← NIL,
user: REF ANY ];
Routing: TYPE = REF RoutingInfo;
RoutingInfo: TYPE = RECORD [
network: Network,
immediate: Pup.Host,
encap: CommBuffer.Encapsulation ];
Beware: See the comments near Driver.Buffer
RealDriverBuffer: PROC [b: Buffer] RETURNS [CommDriver.Buffer] = TRUSTED INLINE {
nhp: Allocator.NHeaderP ← LOOPHOLE[b, Allocator.NHeaderP]-SIZE[Allocator.NormalHeader];
nhp.type ← CODE[CommDriver.BufferObject];
b.ovh.direction ← none;
b.ovh.socket ← NIL;
RETURN[LOOPHOLE[b]]; };
TempDriverBuffer: PROC [b: Buffer] RETURNS [CommDriver.Buffer] = TRUSTED INLINE {
RETURN[LOOPHOLE[b]]; };
RealPupBuffer: PROC [b: CommDriver.Buffer, d: Direction] RETURNS [Buffer] = TRUSTED INLINE {
nhp: Allocator.NHeaderP ← LOOPHOLE[b, Allocator.NHeaderP]-SIZE[Allocator.NormalHeader];
nhp.type ← CODE[PupBuffer.BufferObject];
b.ovh.next ← NIL;
b.ovh.direction ← d;
RETURN[LOOPHOLE[b]]; };
Next: PROC [b: Buffer] RETURNS [Buffer] = TRUSTED INLINE {
RETURN[LOOPHOLE[b.ovh.next]]; };
Errors
SocketNotWellKnown: PUBLIC ERROR = CODE;
Creation and parameter setting
IsWellKnown: PUBLIC PROC [local: Pup.Socket] RETURNS [yes: BOOL] = {
socketNumber: LONG CARDINAL = PupType.CardFromSocket[local];
IF local = Pup.nullSocket THEN RETURN[FALSE];
IF socketNumber >= maxWellKnownSockets THEN RETURN[FALSE];
RETURN[TRUE];
};
CreateServer: PUBLIC PROC [
local: Pup.Socket,
sendBuffers: NAT ← 1,
recvBuffers: NAT ← 5,
getTimeout: Milliseconds ← 10000 ]
RETURNS [socket: Socket] = {
socketNumber: LONG CARDINAL = PupType.CardFromSocket[local];
IF local = Pup.nullSocket THEN ERROR SocketNotWellKnown;
IF socketNumber >= maxWellKnownSockets THEN ERROR SocketNotWellKnown;
socket ← CreateCommon[local, Pup.nullAddress, sendBuffers, recvBuffers, getTimeout];
};
CreateEphemeral: PUBLIC PROC [
remote: Pup.Address,
sendBuffers: NAT ← 1,
recvBuffers: NAT ← 5,
getTimeout: Milliseconds ← 10000 ]
RETURNS [socket: Socket] = {
socket ← CreateCommon[Pup.nullSocket, remote, sendBuffers, recvBuffers, getTimeout];
AddSocket assigns a local socket number if socket.local.socket = nullSocket.
};
CreateCommon: PROC [
local: Pup.Socket,
remote: Pup.Address,
sendBuffers: NAT,
recvBuffers: NAT,
getTimeout: Milliseconds ]
RETURNS [socket: Socket] = {
network: Network;
socket ← NEW [Object ← [
local: [Pup.nullNet, Pup.nullHost, local],
remote: remote,
sendBuffersAlloc: sendBuffers,
recvBuffersAlloc: recvBuffers ] ];
network ← PupInternal.Route[remote].network;
IF network = NIL THEN network ← CommDriver.GetNetworkChain[];
IF network # NIL THEN {
socket.local.net ← network.pup.net;
socket.local.host ← network.pup.host; };
TRUSTED {
Process.EnableAborts[@socket.waitForInput];
Process.EnableAborts[@socket.waitForSendBuffer];
Process.EnableAborts[@socket.waitForRecvBuffer]; };
SetGetTimeout[socket, getTimeout];
CreateInner[globalLock, socket];
SafeStorage.EnableFinalization[socket];
};
socketsCreated: INT ← 0;
CreateInner: ENTRY PROC [socket: Socket, new: Socket] = {
ENABLE UNWIND => NULL;
Beware: socket is the global lock, new is the real socket
AddSocket[new];
socketsCreated ← socketsCreated.SUCC;
};
SetRemoteAddress: PUBLIC PROC [socket: Socket, remote: Pup.Address] = {
socket.remote ← remote;
};
GetRemoteAddress: PUBLIC PROC [socket: Socket] RETURNS [Pup.Address] = {
RETURN[socket.remote];
};
GetLocalAddress: PUBLIC PROC [socket: Socket] RETURNS [Pup.Address] = {
RETURN[socket.local];
};
SetGetTimeout: PUBLIC PROC [socket: Socket, ms: Milliseconds] = {
socket.dontWait ← FALSE;
SELECT ms FROM
PupSocket.dontWait => socket.dontWait ← TRUE;
PupSocket.waitForever => TRUSTED {
Process.DisableTimeout[@socket.waitForInput]; };
< CARDINAL.LAST => TRUSTED {
Process.SetTimeout[@socket.waitForInput, Process.MsecToTicks[ms] ]; };
ENDCASE => TRUSTED {
Process.SetTimeout[@socket.waitForInput, Process.SecondsToTicks[ms/1000] ]; };
};
SetSoftwareChecksumming: PUBLIC PROC [socket: Socket, send, recv: BOOL] = {
socket.sendChecksum ← send;
socket.recvChecksum ← recv;
};
SetNoErrors: PUBLIC PROC [socket: Socket] = {
socket.noErrors ← TRUE;
};
Kick: PUBLIC ENTRY PROC [socket: Socket] = {
BROADCAST socket.waitForInput;
};
Destroy: PUBLIC ENTRY PROC [socket: Socket] = {
BROADCAST socket.waitForInput;
socket.dead ← TRUE;
UNTIL socket.firstInput = NIL DO
b: Buffer ← socket.firstInput;
socket.firstInput ← Next[b];
b.ovh.next ← NIL; -- DKW: just to be careful ...
IF b.ovh.gap#gapSocket THEN
DebuggerSwap.CallDebugger["Clobbered buffer in socket queue!"];
b.ovh.gap ← gapNoList; -- DKW: b now removed from the queue
socket.recvBuffersInUse ← socket.recvBuffersInUse - 1;
CommDriver.FreeBuffer[RealDriverBuffer[b]];
ENDLOOP;
socket.lastInput ← NIL; -- Help Buffer Finalization
socket.routing ← NIL;
Now just drop it on the floor
};
DestroyInner: ENTRY PROC [socket: Socket, old: Socket] = {
Beware: socket is the global lock, old is the real socket
RemoveSocket[old];
};
Sending
AllocBuffer: PUBLIC PROC [socket: Socket] RETURNS [b: Buffer] = {
InnerAllocBuffer[socket];
b ← RealPupBuffer[CommDriver.AllocBuffer[], send];
b.ovh.network ← NIL;
b.ovh.socket ← socket; -- Needed for Finalization
};
InnerAllocBuffer: ENTRY PROC [socket: Socket] = {
Needed so that the call to CommDriver.AllocBuffer is outside the ML.
ENABLE UNWIND => NULL;
UNTIL socket.sendBuffersInUse < socket.sendBuffersAlloc DO
WAIT socket.waitForSendBuffer;
ENDLOOP;
socket.sendBuffersInUse ← socket.sendBuffersInUse + 1;
};
SetUserBytes: PUBLIC PROC [b: Buffer, bytes: PupBuffer.ByteIndex] = {
boundsCheck: PupBuffer.ByteIndex ← bytes; -- In case somebody is using TRUSTED.
b.byteLength ← bytes + bytesOfPupOverhead;
};
SetUserHWords: PUBLIC PROC [b: Buffer, hWords: PupBuffer.HWordIndex] = {
boundsCheck: PupBuffer.HWordIndex ← hWords;
b.byteLength ← hWords*SIZE[HWORD, bytesPerWord] + bytesOfPupOverhead;
};
SetUserFWords: PUBLIC PROC [b: Buffer, fWords: PupBuffer.FWordIndex] = {
boundsCheck: PupBuffer.FWordIndex ← fWords;
b.byteLength ← fWords*SIZE[FWORD, bytesPerWord] + bytesOfPupOverhead;
};
SetUserSize: PUBLIC PROC [b: Buffer, sizeUnits: NAT] = {
boundsCheck: PupBuffer.ByteIndex = sizeUnits*SIZE[WORD, bytesPerWord];
b.byteLength ← boundsCheck + bytesOfPupOverhead;
};
Broadcast: PUBLIC PROC [socket: Socket, b: Buffer] = {
BroadcastInner[socket, b, socket.remote.socket];
};
BroadcastInner: PROC [socket: Socket, b: Buffer, dest: Pup.Socket] = {
bytes: NAT = PupBuffer.RoundUpForChecksum[b.byteLength];
b.hopCount ← 0;
b.spares ← 0;
b.dest.host ← Pup.allHosts;
b.dest.socket ← dest;
b.source.socket ← socket.local.socket;
FOR network: Network ← CommDriver.GetNetworkChain[], network.next UNTIL network = NIL DO
b.ovh.encap ← network.pup.getEncapsulation[network, Pup.allHosts];
b.dest.net ← network.pup.net;
b.source.net ← network.pup.net;
b.source.host ← network.pup.host;
SetChecksum[b];
network.pup.send[network, TempDriverBuffer[b], bytes];
ENDLOOP;
FreeBuffer[b];
};
Put: PUBLIC PROC [socket: Socket, b: Buffer] = {
Send[socket, b, socket.remote];
};
Send: PUBLIC PROC [socket: Socket, b: Buffer, dest: Pup.Address] = {
bytes: NAT = PupBuffer.RoundUpForChecksum[b.byteLength];
network: Network;
immediate: Pup.Host;
IF dest.net = Pup.allNets THEN { BroadcastInner[socket, b, dest.socket]; RETURN; };
[network, immediate] ← PupInternal.Route[dest];
IF network = NIL THEN { FreeBuffer[b]; RETURN; };
b.hopCount ← 0;
b.spares ← 0;
b.dest ← dest;
b.source ← socket.local;
b.ovh.encap ← network.pup.getEncapsulation[network, immediate];
TRUSTED {
words: NAT = PupBuffer.WordsWithoutChecksum[b.byteLength];
checksumLoc: LONG POINTER TO HWORD ← @b.byteLength + words;
IF socket.sendChecksum THEN {
IF outOfLineChecksum THEN
checksumLoc^ ← OutOfLineChecksum[0, words, @b.byteLength]
ELSE checksumLoc^ ← Checksum.ComputeChecksum[0, words, @b.byteLength]; }
ELSE checksumLoc^ ← PupBuffer.noChecksum; };
network.pup.send[network, TempDriverBuffer[b], bytes];
FreeBuffer[b];
};
Receiving
Get: PUBLIC ENTRY PROC [socket: Socket] RETURNS [b: Buffer] = {
ENABLE UNWIND => NULL;
IF socket.firstInput = NIL THEN {
IF socket.dontWait THEN RETURN[NIL];
IF socket.dead THEN RETURN[NIL];
WAIT socket.waitForInput; };
IF socket.firstInput = NIL THEN RETURN[NIL];
b ← socket.firstInput;
socket.firstInput ← Next[b];
b.ovh.next ← NIL; -- DKW: just to be careful ...
IF b.ovh.gap#gapSocket THEN
DebuggerSwap.CallDebugger["Clobbered buffer in socket queue!"];
b.ovh.gap ← gapNoList; -- DKW: b now removed from the queue
b.ovh.socket ← socket; -- Needed for Finalization
};
GetUserBytes: PUBLIC PROC [b: Buffer] RETURNS [bytes: PupBuffer.ByteIndex] = {
bytes ← b.byteLength - bytesOfPupOverhead;
};
GetUserHWords: PUBLIC PROC [b: Buffer] RETURNS [hWords: PupBuffer.HWordIndex] = {
bytes: PupBuffer.ByteIndex ← b.byteLength - bytesOfPupOverhead;
hWords ← bytes/SIZE[HWORD]/bytesPerWord;
};
GetUserFWords: PUBLIC PROC [b: Buffer] RETURNS [fWords: PupBuffer.FWordIndex] = {
bytes: PupBuffer.ByteIndex ← b.byteLength - bytesOfPupOverhead;
fWords 𡤋ytes/SIZE[FWORD]/bytesPerWord;
};
GetUserSize: PUBLIC PROC [b: Buffer] RETURNS [sizeUnits: NAT] = {
bytes: PupBuffer.ByteIndex ← b.byteLength - bytesOfPupOverhead;
sizeUnits ← bytes/SIZE[WORD]/bytesPerWord;
};
FreeBuffer: PUBLIC PROC [b: Buffer] = {
socket: Socket ← NARROW[b.ovh.socket];
FreeBufferInner[socket, b];
};
FreeBufferInner: ENTRY PROC [socket: Socket, b: Buffer] = INLINE {
ENABLE UNWIND => NULL;
SELECT b.ovh.direction FROM
send => {
socket.sendBuffersInUse ← socket.sendBuffersInUse - 1;
NOTIFY socket.waitForSendBuffer; };
recv => {
socket.recvBuffersInUse ← socket.recvBuffersInUse - 1;
NOTIFY socket.waitForRecvBuffer; };
ENDCASE => ERROR;
CommDriver.FreeBuffer[RealDriverBuffer[b]];
};
FixupSourceAndDest: PUBLIC PROC [b: Buffer] = {
network: Network ← NARROW[b.ovh.network];
IF b.source.net = Pup.nullNet THEN b.source.net ← network.pup.net;
IF b.dest.net = Pup.nullNet THEN b.dest.net ← network.pup.net;
IF b.dest.host = Pup.allHosts THEN b.dest.host ← network.pup.host;
};
ReturnToSender: PUBLIC PROC [b: Buffer] = {
ReturnToSenderNoFree[b];
FreeBuffer[b];
};
ReturnToSenderNoFree: PUBLIC PROC [b: Buffer] = {
socket: Socket ← NARROW[b.ovh.socket];
network: Network ← NARROW[b.ovh.network];
bytes: NAT = PupBuffer.RoundUpForChecksum[b.byteLength];
temp: Pup.Address;
FixupSourceAndDest[b];
b.hopCount ← 0;
b.spares ← 0;
temp ← b.source;
b.source ← b.dest;
b.dest ← temp;
TRUSTED {
words: NAT = PupBuffer.WordsWithoutChecksum[b.byteLength];
checksumLoc: LONG POINTER TO HWORD ← @b.byteLength + words;
IF socket = NIL OR socket.sendChecksum THEN {
IF outOfLineChecksum THEN
checksumLoc^ ← OutOfLineChecksum[0, words, @b.byteLength]
ELSE checksumLoc^ ← Checksum.ComputeChecksum[0, words, @b.byteLength]; }
ELSE checksumLoc^ ← PupBuffer.noChecksum; };
IF b.source.host = Pup.allHosts
OR network.toBroadcast[network, TempDriverBuffer[b]] THEN RETURN;
network.pup.return[network, TempDriverBuffer[b], bytes];
};
SetChecksum: PUBLIC PROC [b: Buffer] = TRUSTED {
words: NAT = PupBuffer.WordsWithoutChecksum[b.byteLength];
checksumLoc: LONG POINTER TO HWORD ← @b.byteLength + words;
IF outOfLineChecksum THEN
checksumLoc^ ← OutOfLineChecksum[0, words, @b.byteLength]
ELSE checksumLoc^ ← Checksum.ComputeChecksum[0, words, @b.byteLength];
};
errorSize: NAT = SIZE[HeaderWithoutChecksum] + SIZE[ErrorCode] + SIZE[HWORD];
ReturnError: PUBLIC PROC [b: Buffer, code: ErrorCode, rope: ROPE] = {
ReturnErrorNoFree[b, code, rope];
FreeBuffer[b];
};
ReturnErrorNoFree: PUBLIC PROC [b: Buffer, code: ErrorCode, rope: ROPE] = {
socket: Socket ← NARROW[b.ovh.socket];
IF b.type = error THEN RETURN;
TRUSTED {
PrincOpsUtils.LongCopy[
from: @b.byteLength,
nwords: SIZE[HeaderWithoutChecksum],
to: @b.error.header]; };
b.type ← error;
b.error.code ← code;
b.error.options ← 0;
SetUserSize[b, errorSize];
IF rope = NIL THEN
SELECT code FROM
badChecksum => rope ← "Bad Software Checksum";
noSocket => rope ← "No such Port";
resourceLimits => rope ← "Buffers full";
ENDCASE => NULL;
AppendRope[b, rope];
IF code = iAmNotAGateway THEN { -- Fill in our return address
network: Network = NARROW[b.ovh.network];
b.dest.net ← network.pup.net;
b.dest.host ← network.pup.host;
b.dest.socket ← Pup.nullSocket; };
ReturnToSenderNoFree[b];
};
Back door - for speed
newRoutingInfo: INT ← 0;
PutFirst: PUBLIC PROC [socket: Socket, b: Buffer] = {
routing: Routing ← socket.routing; -- ATOMIC
new: RoutingInfo;
IF socket.remote.socket = Pup.nullSocket THEN GOTO CantGetThere;
[new.network, new.immediate] ← PupInternal.Route[socket.remote];
IF new.network = NIL THEN GOTO CantGetThere;
new.encap ← new.network.pup.getEncapsulation[new.network, new.immediate];
IF routing = NIL OR routing^ # new THEN {
routing ← NEW[RoutingInfo ← new];
socket.routing ← routing; -- ATOMIC
newRoutingInfo ← newRoutingInfo.SUCC; };
PutAgain[socket, b];
EXITS CantGetThere => { socket.routing ← NIL; };
};
PutAgain: PUBLIC PROC [socket: Socket, b: Buffer] = {
bytes: NAT = PupBuffer.RoundUpForChecksum[b.byteLength];
routing: Routing ← socket.routing; -- ATOMIC
network: Network;
IF routing = NIL THEN RETURN;
network ← routing.network;
IF network = NIL THEN RETURN;
b.ovh.encap ← routing.encap;
b.hopCount ← 0;
b.spares ← 0;
b.dest ← socket.remote;
b.source ← socket.local;
TRUSTED {
words: NAT = PupBuffer.WordsWithoutChecksum[b.byteLength];
checksumLoc: LONG POINTER TO HWORD ← @b.byteLength + words;
IF socket.sendChecksum THEN {
IF outOfLineChecksum THEN
checksumLoc^ ← OutOfLineChecksum[0, words, @b.byteLength]
ELSE checksumLoc^ ← Checksum.ComputeChecksum[0, words, @b.byteLength]; }
ELSE checksumLoc^ ← PupBuffer.noChecksum; };
IF network # NIL THEN network.pup.send[network, TempDriverBuffer[b], bytes];
};
Resend: PUBLIC PROC [b: Buffer, checksum: BOOLTRUE] = {
bytes: NAT = PupBuffer.RoundUpForChecksum[b.byteLength];
network: Network ← NARROW[b.ovh.network];
IF network = NIL THEN RETURN;
TRUSTED {
words: NAT = PupBuffer.WordsWithoutChecksum[b.byteLength];
checksumLoc: LONG POINTER TO HWORD ← @b.byteLength + words;
IF checksum THEN {
IF outOfLineChecksum THEN
checksumLoc^ ← OutOfLineChecksum[0, words, @b.byteLength]
ELSE checksumLoc^ ← Checksum.ComputeChecksum[0, words, @b.byteLength]; }
ELSE checksumLoc^ ← PupBuffer.noChecksum; };
network.pup.send[network, TempDriverBuffer[b], bytes];
};
SetDirectReceive: PUBLIC ENTRY PROC [socket: Socket, proc: ReceiveProc, user: REF] = {
socket.proc ← proc;
socket.user ← user;
};
UseNormalPath: PUBLIC PROC [b: Buffer] = {
socket: Socket ← NARROW[b.ovh.socket];
UseNormalPathInner[socket, b];
};
UseNormalPathInner: ENTRY PROC [socket: Socket, b: Buffer] = {
b.ovh.socket ← NIL;
IF b.ovh.gap#gapNoList THEN
DebuggerSwap.CallDebugger["Buffer already in a list!"];
b.ovh.gap ← gapSocket; -- DKW: b is now in the socket queue
IF socket.firstInput = NIL THEN socket.firstInput ← b
ELSE socket.lastInput.ovh.next ← b;
socket.lastInput ← b;
NOTIFY socket.waitForInput;
};
AllocRecvBuffer: PUBLIC PROC [socket: Socket] RETURNS [b: Buffer] = {
InnerAllocRecvBuffer[socket];
b ← RealPupBuffer[CommDriver.AllocBuffer[], recv];
b.ovh.network ← NIL;
b.ovh.socket ← socket; -- Needed for Finalization
};
InnerAllocRecvBuffer: ENTRY PROC [socket: Socket] = {
Needed so that the call to CommDriver.AllocBuffer is outside the ML.
ENABLE UNWIND => NULL;
UNTIL socket.recvBuffersInUse < socket.recvBuffersAlloc DO
WAIT socket.waitForRecvBuffer;
ENDLOOP;
socket.recvBuffersInUse ← socket.recvBuffersInUse + 1;
};
Rope Utilities
CopyRope: PUBLIC PROC [b: Buffer, rope: Rope.ROPE] = {
chars: PupBuffer.ByteIndex ← MIN[Rope.Length[rope], PupBuffer.ByteAlloc.LAST];
FOR i: PupBuffer.ByteAlloc IN [0..chars) DO
b.char[i] ← Rope.Fetch[rope, i];
ENDLOOP;
SetUserBytes[b, chars];
};
AppendRope: PUBLIC PROC [b: Buffer, rope: Rope.ROPE] = {
start: PupBuffer.ByteIndex ← GetUserBytes[b];
chars: PupBuffer.ByteIndex ← MIN[Rope.Length[rope], PupBuffer.ByteAlloc.LAST-start];
FOR i: PupBuffer.ByteAlloc IN [0..chars) DO
b.char[start+i] ← Rope.Fetch[rope, i];
ENDLOOP;
SetUserBytes[b, start+chars];
};
ExtractRope: PUBLIC PROC [b: Buffer] RETURNS [rope: Rope.ROPE] = {
bytes: NAT ← GetUserBytes[b];
text: REF TEXT ← RefText.ObtainScratch[bytes];
FOR i: PupBuffer.ByteAlloc IN [0..bytes) DO
text[i] ← b.char[i];
ENDLOOP;
text.length ← bytes;
rope ← Rope.FromRefText[text];
RefText.ReleaseScratch[text];
};
ExtractErrorRope: PUBLIC PROC [b: Buffer] RETURNS [rope: Rope.ROPE] = {
bytes: NAT ← GetUserBytes[b];
text: REF TEXT ← RefText.ObtainScratch[bytes];
IF bytes < PupBuffer.errorOverheadBytes THEN RETURN[NIL];
bytes ← bytes - PupBuffer.errorOverheadBytes;
FOR i: NAT IN [0..bytes) DO
text[i] ← b.error.text[i];
ENDLOOP;
text.length ← bytes;
rope ← Rope.FromRefText[text];
RefText.ReleaseScratch[text];
};
ExtractAbortRope: PUBLIC PROC [b: Buffer] RETURNS [rope: Rope.ROPE] = {
bytes: NAT ← GetUserBytes[b];
text: REF TEXT ← RefText.ObtainScratch[bytes];
IF bytes < PupBuffer.abortOverheadBytes THEN RETURN[NIL];
bytes ← bytes - PupBuffer.abortOverheadBytes;
FOR i: NAT IN [0..bytes) DO
text[i] ← b.abort.text[i];
ENDLOOP;
text.length ← bytes;
rope ← Rope.FromRefText[text];
RefText.ReleaseScratch[text];
};
Utilities
unique: LONG CARDINAL ← BasicTime.GetClockPulses[];
GetUniqueID: PUBLIC PROC RETURNS [FWORD] = {
new: LONG CARDINAL ← GetUniqueIDInner[globalLock];
RETURN[Endian.FFromCard[new]];
};
GetUniqueIDInner: ENTRY PROC [socket: Socket] RETURNS [LONG CARDINAL] = {
unique ← unique.SUCC;
RETURN[unique];
};
IsThisMe: PUBLIC PROC [address: Pup.Address] RETURNS [yes: BOOL] = {
network: Network ← PupInternal.Route[address].network;
IF network = NIL THEN RETURN[FALSE];
IF network.pup.net # address.net THEN RETURN[FALSE];
IF network.pup.host # address.host THEN RETURN[FALSE];
RETURN[TRUE];
};
GetMyAddress: PUBLIC PROC RETURNS [address: Pup.Address] = {
network: Network ← CommDriver.GetNetworkChain[];
IF network = NIL THEN RETURN[Pup.nullAddress];
address.net ← network.pup.net;
address.host ← network.pup.host;
address.socket ← Pup.nullSocket;
};
Keeping track of active sockets and assigning ephemeral socket numbers
Once upon a time, this was a simple linked list. The current structure means that you can do a lookup without any chaining. There are two types of sockets: well known, and ephemeral. There is a separate lookup table for each type.
Well known sockets have small values. In terms of binding, they are the constants "wired in" to the net. Actually, they are chained together if you create another instance of the same socket number. The idea is that you can capture a socket, say the echo server, without cooperation from the real server.
Ephemeral socket numbers are assigned locally. Anything goes as long as they are unique. This module uses the low bits as an index into a table and increments the high bits when necessary to force uniqueness. Allocating 10 bits for "low" means that we can have up to 1000 ephemeral sockets active at the same time, which seems like enough for now. That leaves a 22 bit counter to guarantee uniqeness.
I checked Bataan once. It had used 2^15 sockets in 4 hours. That's 2^18 per day. At that rate, things will wrap around every 16 days. That means we can't ignore the wraparound case. In practice, things will be much better than that since the algorithim reuses all the vacant slots before bumping the high bits. Note that even when things wrap around, we will never reuse an old socket number that is still in use since it will still be holding onto its slot. (We may reuse a socket number "right away" if it is used for a very long time, and deleted just before we check.)
ClumpOfSockets: TYPE = RECORD [sockets: SEQUENCE count: CARDINAL OF Socket];
wellKnownSockets: REF ClumpOfSockets ← NEW[ClumpOfSockets[PupWKS.rpc.d]];
ephemeralSockets: REF ClumpOfSockets ← NEW[ClumpOfSockets[16]];
These tables are extended when needed. They never shrink.
maxWellKnownSockets: CARDINAL = 512; -- Must be a power of 2 (for wraparound)
maxEphemeralSockets: CARDINAL = 1024; -- Must be a power of 2 (for masking into table)
ephemeralSocketMask: CARDINAL = maxEphemeralSockets - 1;
wraparoundSocket: CARDINALMAX[maxWellKnownSockets, maxEphemeralSockets];
nextEphemeralSocket: LONG CARDINAL ← BasicTime.GetClockPulses[];
IndexFromSocket: PROC [socket: Pup.Socket] RETURNS [index: CARDINAL] = {
socketNumber: LONG CARDINAL = PupType.CardFromSocket[socket];
index ← IndexFromLC[socketNumber];
};
IndexFromLC: PROC [socketNumber: LONG CARDINAL] RETURNS [index: CARDINAL] = {
index ← Basics.BITAND[Basics.LowHalf[socketNumber], ephemeralSocketMask];
};
CantSpecifyArbitraryLocalSocketNumber: PUBLIC ERROR = CODE;
AddSocket: INTERNAL PROC [new: Socket] = {
socketNumber: LONG CARDINAL = PupType.CardFromSocket[new.local.socket];
SELECT TRUE FROM
new.local.socket = Pup.nullSocket => AddEphemeralSocket[new];
socketNumber < maxWellKnownSockets => AddWellKnownSocket[new];
ENDCASE => ERROR CantSpecifyArbitraryLocalSocketNumber;
};
AddWellKnownSocket: INTERNAL PROC [new: Socket] = {
socketNumber: CARDINAL = PupType.CardFromSocket[new.local.socket];
IF socketNumber >= wellKnownSockets.count THEN
wellKnownSockets ← Expand[wellKnownSockets, socketNumber+1];
new.next ← wellKnownSockets[socketNumber];
wellKnownSockets[socketNumber] ← new;
};
IndexAdjustmentConfusion: ERROR = CODE; -- Bug in this code
SocketNumberWraparoundConfusion: ERROR = CODE; -- Bug in this code
AddEphemeralSocket: INTERNAL PROC [new: Socket] = {
Assigns local socket number.
localSocket: LONG CARDINAL ← nextEphemeralSocket;
FOR i: CARDINAL IN [0..ephemeralSockets.count) DO
index: CARDINAL ← IndexFromLC[localSocket];
IF ~(index < ephemeralSockets.count) THEN {
Out of range. Bump high bits and set low bits back to 0.
localSocket ← localSocket + (maxEphemeralSockets-index);
IF localSocket < maxWellKnownSockets THEN localSocket ← wraparoundSocket;
index ← IndexFromLC[localSocket];
IF index # 0 THEN ERROR IndexAdjustmentConfusion; };
IF ephemeralSockets[index] = NIL THEN EXIT;
localSocket ← localSocket + 1;
IF localSocket < maxWellKnownSockets THEN localSocket ← wraparoundSocket;
REPEAT FINISHED => {
index: CARDINAL ← IndexFromLC[nextEphemeralSocket];
IF ephemeralSockets.count = maxEphemeralSockets THEN
DebuggerSwap.CallDebugger["Socket table full"];
localSocket ← nextEphemeralSocket + (ephemeralSockets.count-index);
IF localSocket < maxWellKnownSockets THEN localSocket ← wraparoundSocket;
index ← IndexFromLC[localSocket];
IF index # ephemeralSockets.count THEN ERROR IndexAdjustmentConfusion;
ephemeralSockets ← Expand[ephemeralSockets, ephemeralSockets.count+1]; };
ENDLOOP;
IF localSocket < maxWellKnownSockets THEN ERROR SocketNumberWraparoundConfusion;
new.local.socket ← PupType.SocketFromCard[localSocket];
nextEphemeralSocket ← localSocket + 1;
ephemeralSockets[IndexFromSocket[new.local.socket]] ← new;
};
Expand: INTERNAL PROC [old: REF ClumpOfSockets, size: CARDINAL]
RETURNS [new: REF ClumpOfSockets] = {
new ← NEW[ClumpOfSockets[size]];
FOR i: CARDINAL IN [0..old.count) DO new[i] ← old[i]; ENDLOOP;
};
FindSocket: PROC [localSocket: Pup.Socket] RETURNS [h: Socket ← NIL] = {
socketNumber: LONG CARDINAL = PupType.CardFromSocket[localSocket];
IF socketNumber < maxWellKnownSockets THEN
h ← FindWellKnownSocket[socketNumber]
ELSE h ← FindEphemeralSocket[localSocket];
};
FindWellKnownSocket: PROC [socketNumber: CARDINAL] RETURNS [h: Socket ← NIL] = {
IF socketNumber < wellKnownSockets.count THEN h ← wellKnownSockets[socketNumber];
DO
IF h = NIL THEN RETURN;
IF ~h.dead THEN RETURN;
h ← h.next;
Hackery: look more if we encounter a dead one (echo server slot captured)
ENDLOOP;
};
FindEphemeralSocket: PROC [localSocket: Pup.Socket] RETURNS [h: Socket ← NIL] = {
index: CARDINAL = IndexFromSocket[localSocket];
IF index < ephemeralSockets.count THEN h ← ephemeralSockets[index];
IF h = NIL THEN RETURN;
IF localSocket # h.local.socket THEN h ← NIL;
};
RemoveSocket: INTERNAL PROC [old: Socket] = {
socketNumber: LONG CARDINAL = PupType.CardFromSocket[old.local.socket];
IF socketNumber < maxWellKnownSockets THEN RemoveWellKnownSocket[old, socketNumber]
ELSE RemoveEphemeralSocket[old];
old.next ← NIL;
};
RemoveWellKnownSocket: INTERNAL PROC [old: Socket, socketNumber: CARDINAL] = {
head: Socket ← wellKnownSockets[socketNumber];
IF head = old THEN {
wellKnownSockets[socketNumber] ← head.next;
RETURN; };
DO -- NIL fault if not on chain
IF head.next = old THEN { head.next ← old.next; EXIT; };
head ← head.next;
ENDLOOP;
};
RemoveEphemeralSocket: INTERNAL PROC [old: Socket] = {
index: CARDINAL = IndexFromSocket[old.local.socket];
IF ephemeralSockets[index] # old THEN ERROR; -- Or bounds fault if bogus
ephemeralSockets[index] ← NIL;
};
Incoming packets from Drivers
Counters for all the strange cases
errorTooShort: INT ← 0;
errorNoGateway: INT ← 0;
errorNoSocket: INT ← 0;
errorDeadSocket: INT ← 0;
errorBadChecksum: INT ← 0;
errorBuffersFull: INT ← 0;
TakeThis: PUBLIC PROC [network: Network, buffer: CommDriver.Buffer, bytes: NAT] RETURNS [CommDriver.Buffer] = {
b: Buffer = RealPupBuffer[buffer, recv];
bytesNeeded: NAT = PupBuffer.RoundUpForChecksum[b.byteLength];
dest: Pup.Address ← b.dest;
localSocket: Pup.Socket = dest.socket;
socket: Socket;
proc: ReceiveProc;
user: REF ANY;
IF bytes < bytesNeeded OR b.byteLength < bytesOfPupOverhead THEN {
errorTooShort ← errorTooShort.SUCC;
RETURN[RealDriverBuffer[b]]; };
BEGIN
Don't be surprised if you find krocks lurking in here. /HGM, April 1, 1986
SELECT TRUE FROM
(dest.host = network.pup.host) => {
SELECT TRUE FROM
(dest.net = network.pup.net) => NULL; -- Main line case
(network.pup.net = Pup.nullNet) => NULL; -- We don't know, he does
(dest.net = Pup.nullNet) => NULL; -- He doesn't know, we do
ENDCASE => GOTO TryBackDoor; };
(dest.host = Pup.allHosts) => {
SELECT TRUE FROM
(dest.net = network.pup.net) => NULL; -- Main line case
(network.pup.net = Pup.nullNet) => NULL; -- We don't know, he does
(dest.net = Pup.nullNet) => NULL; -- He doesn't know, we do
ENDCASE => GOTO TryBackDoor; };
ENDCASE => GOTO TryBackDoor;
EXITS TryBackDoor => {
[network, ] ← PupInternal.Route[dest]; -- Back door fixup
IF network = NIL THEN
network ← NARROW[b.ovh.network]; -- Argh. Our Initialization or his confusion
IF dest.net # network.pup.net OR dest.host # network.pup.host THEN
RETURN[RealDriverBuffer[forwarder[b]]]; };
END;
socket ← FindSocket[localSocket];
IF socket = NIL THEN {
errorNoSocket ← errorNoSocket.SUCC;
ReturnErrorNoFree[b, noSocket, "No socket at this machine"];
RETURN[RealDriverBuffer[b]]; };
IF socket.dead THEN {
errorDeadSocket ← errorDeadSocket.SUCC;
IF ~socket.noErrors THEN ReturnErrorNoFree[b, noSocket, "Socket has been deleted"];
RETURN[RealDriverBuffer[b]]; };
IF socket.recvChecksum THEN TRUSTED {
words: NAT = PupBuffer.WordsWithoutChecksum[b.byteLength];
checksumLoc: LONG POINTER TO HWORD ← @b.byteLength + words;
hisChecksum: HWORD ← checksumLoc^;
IF hisChecksum # PupBuffer.noChecksum THEN {
checksum: WORD;
IF outOfLineChecksum THEN
checksum ← OutOfLineChecksum[0, words, @b.byteLength]
ELSE checksum ← Checksum.ComputeChecksum[0, words, @b.byteLength];
IF checksum # hisChecksum THEN {
errorBadChecksum ← errorBadChecksum.SUCC;
ReturnErrorNoFree[b, badChecksum, "Bad software checksum"];
RETURN[RealDriverBuffer[b]]; }; }; };
[proc, user] ← CheckDirect[socket];
IF proc # NIL THEN {
new: Buffer;
b.ovh.socket ← socket;
new ← proc[socket, b, user];
IF new = NIL THEN RETURN[NIL];
UnbumpRecvCount[socket];
RETURN[RealDriverBuffer[new]]; };
IF TakeThisInner[socket, b] THEN RETURN[NIL]
ELSE {
errorBuffersFull ← errorBuffersFull.SUCC;
IF ~socket.noErrors THEN ReturnErrorNoFree[b, resourceLimits, "Buffer allocation limit"];
RETURN[RealDriverBuffer[b]]; };
};
CheckDirect: ENTRY PROC [socket: Socket] RETURNS [proc: ReceiveProc, user: REF ANY] = {
IF socket.proc = NIL THEN RETURN[NIL, NIL];
IF socket.recvBuffersInUse < socket.recvBuffersAlloc THEN {
socket.recvBuffersInUse ← socket.recvBuffersInUse.SUCC;
RETURN[socket.proc, socket.user]; };
RETURN[NIL, NIL];
};
UnbumpRecvCount: ENTRY PROC [socket: Socket] = INLINE {
socket.recvBuffersInUse ← socket.recvBuffersInUse.PRED;
};
TakeThisInner: ENTRY PROC [socket: Socket, b: Buffer] RETURNS [ok: BOOL] = {
IF socket.dead THEN {
CommDriver.FreeBuffer[RealDriverBuffer[b]];
RETURN[TRUE]; };
IF socket.recvBuffersInUse < socket.recvBuffersAlloc THEN {
socket.recvBuffersInUse ← socket.recvBuffersInUse.SUCC;
IF b.ovh.gap#gapNoList THEN
DebuggerSwap.CallDebugger["Buffer already in a list!"];
b.ovh.gap ← gapSocket; -- DKW: b is now in the socket queue
IF socket.firstInput = NIL THEN socket.firstInput ← b
ELSE socket.lastInput.ovh.next ← b;
socket.lastInput ← b;
NOTIFY socket.waitForInput;
RETURN[TRUE]; };
RETURN[FALSE];
};
Stubs for gatewaying
forwarder: PROC [b: Buffer] RETURNS [Buffer] ← DummyForwarder;
DummyForwarder: PROC [b: Buffer] RETURNS [Buffer] = {
errorNoGateway ← errorNoGateway.SUCC;
ReturnErrorNoFree[b, iAmNotAGateway, "I'm not a gateway (yet)"];
RETURN[b];
};
CaptureForwarding: PUBLIC PROC [proc: PROC [Buffer] RETURNS [Buffer] ] = {
forwarder ← proc;
};
Finalization
sfqMaxCount: INT ← 0; -- DKW: see how full sfq can get in 10 seconds
droppedSockets: INT ← 0;
finishedSockets: INT ← 0;
finishedErrors: INT ← 0;
finishedDelayed: INT ← 0;
finishedBatches: INT ← 0;
droppedBuffers: INT ← 0;
oneSecond: Process.Ticks = Process.SecondsToTicks[1];
SocketFinalizer: PROC = {
Process.SetPriority[Process.priorityBackground];
DO
sfqCount: INT ← 0; -- DKW: count the number of sockets in the queue each iteration
list: LIST OF Socket ← NIL; -- Batch up sockets to be Destroyed, avoiding 10 seconds each
DO
socket: Socket ← NARROW[SafeStorage.FQNext[sfq]];
sfqCount ← sfqCount+1;
IF ~socket.dead THEN { -- User forgot to call Destroy
SafeStorage.EnableFinalization[socket];
Destroy[socket];
droppedSockets ← droppedSockets.SUCC;
}
ELSE { -- Normal end of life
IF socket.noErrors THEN list ← CONS[socket, list]
ELSE DestroyInner[globalLock, socket];
finishedSockets ← finishedSockets.SUCC;
};
socket ← NIL;
IF SafeStorage.FQEmpty[sfq] THEN EXIT;
ENDLOOP;
sfqMaxCount ← MAX[sfqMaxCount, sfqCount];
IF list = NIL THEN LOOP;
Process.Pause[10*oneSecond];
UNTIL list = NIL DO
DestroyInner[globalLock, list.first];
finishedDelayed ← finishedDelayed.SUCC;
list ← list.rest;
ENDLOOP;
finishedBatches ← finishedBatches.SUCC;
ENDLOOP;
};
BufferFinalizer: PROC = {
Beware: See the comments near CommDriver.Buffer
Process.SetPriority[Process.priorityBackground];
DO
b: Buffer ← NARROW[SafeStorage.FQNext[bfq]];
SafeStorage.EnableFinalization[b];
FreeBuffer[b];
I saw a NIL fault in FreeBuffer because b.ovh.socket was NIL. I think that was a missing check for socket.dead in TakeThisInner. /HGM, May 86
b ← NIL;
droppedBuffers ← droppedBuffers.SUCC;
ENDLOOP;
};
DropTest: PROC [n: NAT] = {
FOR i: NAT IN [0..n) DO
socket: Socket ← CreateEphemeral[remote: Pup.nullAddress];
[] ← AllocBuffer[socket];
SetNoErrors[socket];
ENDLOOP;
};
bfq: SafeStorage.FinalizationQueue ← SafeStorage.NewFQ[];
sfq: SafeStorage.FinalizationQueue ← SafeStorage.NewFQ[length: 300];
DKW: Because of the pause in SocketFinalizer, this queue must hold 10 seconds worth of discarded sockets; 100 (10 per second) is not enough. The TrickleCharger, for example, does FS file lookups as fast as it can; each lookup uses an ephemeral socket. When sfq had only 100 elements, the socket finalizer eventually fell behind and the socket table filled up.
SafeStorage.EstablishFinalization[type: CODE[PupBuffer.BufferObject], npr: 0, fq: bfq];
SafeStorage.EstablishFinalization[type: CODE[Object], npr: 1, fq: sfq];
TRUSTED { Process.Detach[FORK SocketFinalizer[]]; };
TRUSTED { Process.Detach[FORK BufferFinalizer[]]; };
IF ~Booting.switches[c] THEN CommDriver.InsertReceiveProc[NIL, pup, TakeThis];
}.