-- SunYPAgentImpl.mesa
-- Demers, September 25, 1987 5:23:00 pm PDT
DIRECTORY
Arpa USING [Address, nullAddress],
ArpaUDP USING [nullPort, Port],
Ascii USING [CR, FF, LF, SP, TAB],
Basics USING [HFromCard16],
BasicTime USING [earliestGMT, GMT, Now, Period],
Process USING [Detach, PauseMsec],
RefText USING [Append, New, ObtainScratch, ReleaseScratch],
Rope USING [FromRefText, ROPE, ToRefText],
SunPMap USING [ipProtocolUDP, udpPort],
SunPMapClient USING [GetPort],
SunRPC USING [Create, Destroy, Error, Handle, SetRemote],
SunRPCAuth USING [Conversation, Initiate, Terminate],
SunYP USING [EachMapNameProc, program, programVersion, ResponseKeyVal, ResponseVal, Status],
SunYPAgent USING [TextSeq, TextSeqObject],
SunYPClient USING [Domain, First, Maplist, Match, Next, Null],
SymTab USING [Create, Fetch, Ref, Store, Update, UpdateAction, Val]
;
SunYPAgentImpl: CEDAR MONITOR
LOCKS lock USING lock: Lock
IMPORTS Basics, BasicTime, Process, RefText, Rope, SunPMapClient, SunRPC, SunRPCAuth, SunYPClient, SymTab
EXPORTS SunYPAgent
~ {
-- Parameters
-- Pause between daemon sweeps, in milliseconds (handed to Process.PauseMsec by Daemon).
msecBetweenSweeps: CARD ← 11111;
-- Time-to-live, in seconds, given to a server entry when it is created or refreshed; the sweep procedures subtract elapsed seconds from it.
initialServerTTL: CARDINAL ← 300;
-- Time-to-live, in seconds, given to a cached Match value; SweepCachedValues subtracts elapsed seconds from it.
initialCachedValueTTL: CARDINAL ← 300;
-- Upper bound on the number of CachedValue objects LookupCachedValue will allocate.
maxCachedValues: CARDINAL ← 31;
-- Domain used by ObtainHandle when the caller passes NIL; read and written by GetDefaultDomain / SetDefaultDomain.
defaultDomainName: ROPE ← "PARC"; -- should be in SystemSite or something ???
-- Types
-- Local abbreviation for Rope.ROPE.
ROPE: TYPE ~ Rope.ROPE;
-- Monitor locks are explicit objects: each ENTRY procedure in this module takes a `lock: Lock` parameter (see the LOCKS clause in the module header).
Lock: TYPE ~ REF LockObject;
LockObject: TYPE ~ MONITORED RECORD [];
-- The error exported to clients; `code` is an atom such as $domainNotFound, $noYP, $mapNotFound, $keyNotFound, $noMoreEntries or $protocol, or a code passed through from SunRPC.Error.
Error: PUBLIC ERROR [code: ATOM] ~ CODE;
-- Server Cache
-- A Server describes one YP server endpoint known to this agent.
Server: TYPE ~ REF ServerObject;
ServerObject: TYPE ~ RECORD [
-- Link to the next entry on serverList or nullServerList.
next: Server,
-- Endpoint of the server; Arpa.nullAddress marks a "null server" (see IsNullServer).
address: Arpa.Address,
port: ArpaUDP.Port,
-- Remaining seconds before the entry is considered stale; decremented by the sweep procedures.
ttl: CARDINAL,
-- TRUE after an RPC failure (MapErrorCode, ServerResponding); FALSE after a successful exchange.
down: BOOL
];
-- Maps a domain name to the Server last chosen for it (see GetServerForDomain).
serverTab: SymTab.Ref ← SymTab.Create[];
-- serverLock protects serverList, the list of real (non-null) servers.
serverLock: Lock ~ NEW[LockObject];
serverList: Server ← NIL;
-- nullServerLock protects nullServerList, the list of placeholder entries created when no server was found.
nullServerLock: Lock ~ NEW[LockObject];
nullServerList: Server ← NIL;
GetServerByAddressAndPort: ENTRY PROC [address: Arpa.Address, port: ArpaUDP.Port, lock: Lock ← serverLock] RETURNS [server: Server] ~ {
  -- Return the serverList entry for (address, port), creating it if absent.
  -- A newly created entry is pushed on the front of serverList with the initial TTL and down = FALSE.
  FOR s: Server ← serverList, s.next WHILE s # NIL DO
    IF (s.address = address) AND (s.port = port) THEN RETURN [s];
  ENDLOOP;
  server ← NEW[ServerObject ← [next~serverList, address~address, port~port, ttl~initialServerTTL, down~FALSE]];
  serverList ← server;
  };
SweepServers: ENTRY PROC [seconds: CARD, lock: Lock ← serverLock] ~ {
  -- Age every entry on serverList by `seconds`, clamping the TTL at zero.
  -- Entries are never removed here; a zero TTL makes GetServerForDomain re-probe the server.
  s: Server ← serverList;
  WHILE s # NIL DO
    IF s.ttl > seconds THEN s.ttl ← s.ttl - seconds ELSE s.ttl ← 0;
    s ← s.next;
  ENDLOOP;
  };
-- TRUE iff `server` is a placeholder entry, i.e. its address is Arpa.nullAddress (such entries are created by GetNullServer).
-- `server` must not be NIL; the field is dereferenced unconditionally.
IsNullServer: PROC [server: Server] RETURNS [BOOL] ~ INLINE {
RETURN [server.address = Arpa.nullAddress] };
GetNullServer: ENTRY PROC [lock: Lock ← nullServerLock] RETURNS [server: Server] ~ {
  -- Allocate a fresh placeholder ("null") server and push it on the front of nullServerList.
  -- The entry carries the null address and port, the initial TTL, and down = FALSE.
  server ← NEW[ServerObject ← [
    next~nullServerList,
    address~Arpa.nullAddress,
    port~ArpaUDP.nullPort,
    ttl~initialServerTTL,
    down~FALSE]];
  nullServerList ← server;
  };
SweepNullServers: ENTRY PROC [seconds: CARD, lock: Lock ← nullServerLock] ~ {
  -- Age every placeholder entry by `seconds`; entries whose TTL runs out are set to 0 and unlinked from nullServerList.
  -- An unlinked entry keeps its own `next` field, so the traversal can continue from it.
  lastKept: Server ← NIL; -- most recent entry that stayed on the list
  FOR s: Server ← nullServerList, s.next WHILE s # NIL DO
    IF s.ttl <= seconds
      THEN {
        s.ttl ← 0;
        IF lastKept # NIL THEN lastKept.next ← s.next ELSE nullServerList ← s.next;
        }
      ELSE {
        s.ttl ← s.ttl - seconds;
        lastKept ← s;
        };
  ENDLOOP;
  };
GetServerForDomain: PROC [domain: ROPE] RETURNS [server: Server] ~ {
  -- Return the server to use for `domain`, consulting serverTab first.
  -- With no usable entry, look for a server via BroadcastForServer and record the result in serverTab.
  -- If none is found, a placeholder from GetNullServer is recorded instead.
  -- A cached entry whose TTL has reached 0 is re-probed with ServerResponding.
  -- A cached real server that is marked down is discarded and the search repeated.
  server ← NARROW[SymTab.Fetch[serverTab, domain].val];
  DO
    IF server = NIL THEN {
      foundAddress: Arpa.Address;
      foundPort: ArpaUDP.Port;
      [foundAddress, foundPort] ← BroadcastForServer[domain];
      server ← (IF foundAddress = Arpa.nullAddress
        THEN GetNullServer[]
        ELSE GetServerByAddressAndPort[foundAddress, foundPort]);
      [] ← SymTab.Store[serverTab, domain, server];
      EXIT;
      };
    IF server.ttl = 0 THEN {
      -- Stale entry: keep it only if the server still answers.
      IF ServerResponding[server] THEN EXIT;
      server ← NIL;
      LOOP;
      };
    -- Entry still within its TTL: a real server marked down is dropped; a placeholder is returned as is.
    IF (server.address # Arpa.nullAddress) AND (server.down) THEN { server ← NIL; LOOP };
    EXIT;
  ENDLOOP;
  };
-- Locate a YP server for `domain`. Returns Arpa.nullAddress / ArpaUDP.nullPort if the probe raises Error.
BroadcastForServer: PROC [domain: ROPE]
RETURNS [address: Arpa.Address, port: ArpaUDP.Port] ~ {
-- Not yet implemented! No broadcast is done; a single hard-wired host is probed via GetServerByAddress.
ENABLE Error => { address ← Arpa.nullAddress; port ← ArpaUDP.nullPort; CONTINUE };
server: Server ~ GetServerByAddress[[13,1,100,208], domain]; -- Palain
address ← server.address; port ← server.port;
};
GetServerByAddress: PROC [address: Arpa.Address, domain: ROPE]
  RETURNS [server: Server] ~ {
  -- Verify that the host at `address` runs a YP service that serves `domain`, and return its Server entry.
  -- Asks the host's port mapper for the UDP port of the YP program, then issues a YP Domain call on that port.
  -- Raises Error[$noYP] if the port mapper reports port 0.
  -- Raises Error[$domainNotFound] if the Domain call returns FALSE.
  -- Raises Error[code] with the code of any SunRPC.Error raised by the calls.
  -- On success the entry is marked up and its TTL is reset.
  h: SunRPC.Handle ← SunRPC.Create[remoteAddress~address, remotePort~Basics.HFromCard16[SunPMap.udpPort]];
  c: SunRPCAuth.Conversation ← SunRPCAuth.Initiate[];
  errorCode: ATOM ← NIL; -- stays NIL unless one of the probes below fails
  port: CARD;
  {
    ENABLE {
      SunRPC.Error => { errorCode ← code; CONTINUE };
      };
    port ← SunPMapClient.GetPort[h, c, SunYP.program, SunYP.programVersion, SunPMap.ipProtocolUDP];
    IF (errorCode = NIL) THEN {
      IF port = 0
        THEN {
          errorCode ← $noYP;
          }
        ELSE {
          -- Re-aim the same RPC handle at the YP port for the Domain call.
          h ← SunRPC.SetRemote[h, address, Basics.HFromCard16[port]];
          IF NOT SunYPClient.Domain[h, c, domain] THEN errorCode ← $domainNotFound;
          };
      };
    };
  -- The handle and conversation are released on both the success and the failure path.
  SunRPC.Destroy[h];
  SunRPCAuth.Terminate[c];
  IF errorCode # NIL THEN ERROR Error[errorCode];
  server ← GetServerByAddressAndPort[address, Basics.HFromCard16[port]];
  server.down ← FALSE;
  server.ttl ← initialServerTTL;
  };
ServerResponding: PROC [server: Server] RETURNS [responding: BOOL ← TRUE] ~ {
  -- Probe `server` with a YP Null call and report whether it answered.
  -- A placeholder (null) server is reported as not responding without being probed or modified.
  -- Otherwise server.down is set to NOT responding and the TTL is reset whether or not the probe succeeded.
  h: SunRPC.Handle;
  c: SunRPCAuth.Conversation;
  IF IsNullServer[server] THEN RETURN [FALSE];
  h ← SunRPC.Create[remoteAddress~server.address, remotePort~server.port];
  c ← SunRPCAuth.Initiate[];
  -- `responding` starts TRUE; only a SunRPC.Error flips it.
  SunYPClient.Null[h, c
    ! SunRPC.Error => { responding ← FALSE; CONTINUE }];
  server.down ← NOT responding;
  server.ttl ← initialServerTTL;
  SunRPC.Destroy[h];
  SunRPCAuth.Terminate[c];
  };
-- Default Domain
-- ROPE assignment is atomic, so defaultDomainName can be read and written without holding a monitor lock (ML).
GetDefaultDomain: PUBLIC PROC RETURNS [domainName: ROPE] ~ {
  -- Report the domain name currently used when a client passes NIL to ObtainHandle.
  domainName ← defaultDomainName;
  };
-- Replace the default domain name. The value is stored as given, with no validation; NIL is not rejected.
SetDefaultDomain: PUBLIC PROC [domainName: ROPE] ~ {
defaultDomainName ← domainName;
};
-- Handles
-- A Handle is a client's binding to one domain on one YP server.
Handle: TYPE ~ REF Object;
Object: PUBLIC TYPE ~ RECORD [
-- Domain name supplied to ObtainHandle, or the default domain if NIL was passed.
domain: ROPE,
-- Server entry currently bound; set to NIL by ReleaseHandle.
server: Server,
-- RPC handle aimed at server.address / server.port; set to NIL by ReleaseHandle.
rpcHandle: SunRPC.Handle,
-- Conversation used for calls; terminated by ReleaseHandle even if the caller supplied it.
conversation: SunRPCAuth.Conversation
];
ObtainHandle: PUBLIC PROC [domainName: ROPE, conversation: SunRPCAuth.Conversation, serverAddress: Arpa.Address] RETURNS [h: Handle] ~ {
  -- Create a handle for `domainName`; NIL means the default domain.
  -- With a null serverAddress the server is chosen by GetServerForDomain; Error[$domainNotFound] is raised if only a placeholder is available.
  -- Otherwise the given host is verified with GetServerByAddress, which may raise Error.
  -- A NIL conversation is replaced by a freshly initiated one.
  server: Server;
  IF domainName = NIL THEN domainName ← defaultDomainName;
  IF serverAddress # Arpa.nullAddress
    THEN server ← GetServerByAddress[serverAddress, domainName]
    ELSE {
      server ← GetServerForDomain[domainName];
      IF IsNullServer[server] THEN ERROR Error[$domainNotFound];
      };
  IF conversation = NIL THEN conversation ← SunRPCAuth.Initiate[];
  h ← NEW[Object ← [
    domain~domainName,
    server~server,
    rpcHandle~SunRPC.Create[remoteAddress~server.address, remotePort~server.port],
    conversation~conversation]];
  };
RefreshHandle: PUBLIC PROC [h: Handle, conversation: SunRPCAuth.Conversation, serverAddress: Arpa.Address] ~ {
  -- Re-bind an existing handle: choose a server again and replace the conversation.
  -- Server choice follows ObtainHandle: a null serverAddress means GetServerForDomain, otherwise GetServerByAddress.
  -- Raises Error[$domainNotFound] if no server is known for the domain; GetServerByAddress may raise Error.
  -- Nothing in `h` is modified if server selection raises.
  server: Server;
  IF serverAddress = Arpa.nullAddress
    THEN {
      server ← GetServerForDomain[h.domain];
      IF IsNullServer[server] THEN ERROR Error[$domainNotFound];
      }
    ELSE {
      server ← GetServerByAddress[serverAddress, h.domain];
      };
  -- Replace the conversation unless the caller handed back the one the handle already holds.
  -- Terminating that one and then continuing to use it would leave the handle with a dead conversation.
  IF (conversation = NIL) OR (conversation # h.conversation) THEN {
    -- h.conversation is NIL after ReleaseHandle; ReleaseHandle itself never passes NIL to Terminate.
    IF h.conversation # NIL THEN SunRPCAuth.Terminate[h.conversation];
    h.conversation ← IF conversation # NIL THEN conversation ELSE SunRPCAuth.Initiate[];
    };
  SELECT TRUE FROM
    (h.rpcHandle = NIL) => {
      -- Handle was released: build a new RPC handle rather than re-aiming a NIL one.
      h.server ← server;
      h.rpcHandle ← SunRPC.Create[remoteAddress~server.address, remotePort~server.port];
      };
    (server # h.server) => {
      h.server ← server;
      h.rpcHandle ← SunRPC.SetRemote[h.rpcHandle, server.address, server.port];
      };
    ENDCASE;
  };
-- Release the resources held by `h`: destroy its RPC handle and terminate its conversation, then clear the fields.
-- Fields that are already NIL are skipped, so a second call on the same handle makes no Destroy or Terminate call.
-- The conversation is terminated even if it was supplied by the caller of ObtainHandle / RefreshHandle.
ReleaseHandle: PUBLIC PROC [h: Handle] ~ {
IF h.rpcHandle # NIL THEN { SunRPC.Destroy[h.rpcHandle]; h.rpcHandle ← NIL };
h.server ← NIL;
IF h.conversation # NIL THEN { SunRPCAuth.Terminate[h.conversation]; h.conversation ← NIL };
};
-- Match Response Cache
-- One cached response to a Match call.
CachedValue: TYPE ~ REF CachedValueObject;
CachedValueObject: TYPE ~ RECORD [
-- Links in the doubly linked list of idle (busy = 0) entries, from cachedValueHead to cachedValueTail.
next, prev: CachedValue,
-- Remaining lifetime in seconds; decremented by SweepCachedValues for entries on the idle list.
ttl: CARDINAL ← initialCachedValueTTL,
-- Number of clients currently holding this entry (incremented by LookupCachedValue, decremented by CacheValue).
busy: CARDINAL ← 0,
-- Cached response text; NIL until CacheValue stores a value.
value: REF TEXT,
-- Key within `table` under which this entry is stored.
key: ROPE,
-- The per-map key table that holds (or will hold) this entry.
table: SymTab.Ref
];
-- cachedValueLock protects the idle list, numCachedValues and the fields of every CachedValueObject.
cachedValueLock: Lock ~ NEW[LockObject];
-- CacheValue inserts at the head; LookupCachedValue recycles from the tail.
cachedValueHead, cachedValueTail: CachedValue;
-- Count of allocated entries, idle or busy; bounded by maxCachedValues.
numCachedValues: CARD ← 0;
-- Notified when an entry joins an empty idle list or an entry is freed.
cachedValueAvailable: CONDITION;
-- Root of the lookup structure: domain name, then map name, then key, giving a CachedValue.
tabByDomain: SymTab.Ref ~ SymTab.Create[];
RemoveCachedValueFromTable: PROC [cV: CachedValue] ~ {
  -- Delete cV from its key table, but only if the table still maps cV.key to this very object.
  -- A different object stored under the same key is left untouched.
  -- Raises an anonymous ERROR if cV has no table or no key.
  target: SymTab.Val ~ cV;
  DeleteIfSame: SymTab.UpdateAction -- [found, val] RETURNS [op, new] -- ~ {
    IF (NOT found) OR (val # target) THEN RETURN [none, NIL];
    RETURN [delete, NIL];
    };
  IF (cV.key = NIL) OR (cV.table = NIL) THEN ERROR;
  SymTab.Update[cV.table, cV.key, DeleteIfSame];
  };
RemoveCachedValueFromList: INTERNAL PROC [cV: CachedValue] ~ {
  -- Unlink cV from the idle list, updating the head and/or tail when cV is at an end.
  -- cV's own next and prev fields are left as they were.
  -- Fix up the predecessor side.
  IF cV # cachedValueHead
    THEN cV.prev.next ← cV.next
    ELSE cachedValueHead ← cV.next;
  -- Fix up the successor side.
  IF cV # cachedValueTail
    THEN cV.next.prev ← cV.prev
    ELSE cachedValueTail ← cV.prev;
  };
-- Find or allocate the cache entry for (domainName, mapName, key) and mark it held by incrementing cV.busy.
-- If the returned entry has value = NIL it is a fresh or recycled object that is not yet in the key table.
-- Match then either fills it with CacheValue[cV, val] or drops it with ReleaseCacheEntry[cV].
-- If the entry has a value, Match gives the hold back with CacheValue[cV].
LookupCachedValue: ENTRY PROC [domainName, mapName, key: ROPE, lock: Lock ← cachedValueLock] RETURNS [cV: CachedValue] ~ {
tabByMap, tabByKey: SymTab.Ref;
-- Fetch the per-domain table of maps, creating and storing an empty one if absent.
GetTabByMap: SymTab.UpdateAction -- [found, val] RETURNS [op, new] -- ~ {
IF found
THEN { tabByMap ← NARROW[val]; RETURN [none, NIL] }
ELSE { tabByMap ← SymTab.Create[]; RETURN [store, tabByMap] };
};
-- Fetch the per-map table of keys, creating and storing an empty one if absent.
GetTabByKey: SymTab.UpdateAction -- [found, val] RETURNS [op, new] -- ~ {
IF found
THEN { tabByKey ← NARROW[val]; RETURN [none, NIL] }
ELSE { tabByKey ← SymTab.Create[]; RETURN [store, tabByKey] };
};
SymTab.Update[tabByDomain, domainName, GetTabByMap];
SymTab.Update[tabByMap, mapName, GetTabByKey];
cV ← NARROW[SymTab.Fetch[tabByKey, key].val];
SELECT TRUE FROM
(cV = NIL) => {
-- No entry for this key: obtain an object, in order of preference.
DO
SELECT TRUE FROM
-- 1. Allocate a new object while under the limit.
(numCachedValues < maxCachedValues) => {
cV ← NEW[CachedValueObject];
numCachedValues ← numCachedValues + 1;
EXIT;
};
-- 2. At the limit with every object busy: wait for one to become idle or be freed, then retry.
(cachedValueHead = NIL) => {
WAIT cachedValueAvailable;
LOOP;
};
-- 3. Otherwise recycle the entry at the tail of the idle list.
ENDCASE => {
cV ← cachedValueTail;
RemoveCachedValueFromTable[cV];
RemoveCachedValueFromList[cV];
EXIT;
};
ENDLOOP;
-- (Re)initialise the object for the new key; it enters tabByKey only when CacheValue stores a value.
cV.ttl ← initialCachedValueTTL;
cV.key ← key;
cV.value ← NIL;
cV.table ← tabByKey;
};
-- Existing idle entry: take it off the idle list while it is held.
(cV.busy = 0) => {
RemoveCachedValueFromList[cV];
};
-- Existing entry already held by another client: nothing to unlink.
ENDCASE;
cV.busy ← cV.busy + 1;
};
CacheValue: ENTRY PROC [cV: CachedValue, newValue: REF TEXT ← NIL, lock: Lock ← cachedValueLock] ~ {
  -- Give back one hold on cV that was obtained from LookupCachedValue, optionally installing a value first.
  -- With newValue # NIL, cV must be held exactly once (anonymous ERROR otherwise).
  -- The value is stored and cV is entered in its key table under cV.key.
  -- With newValue = NIL, the default, the entry's value is left unchanged.
  -- When the last hold is released the entry is put at the head of the idle list.
  -- Waiters in LookupCachedValue are notified if the list was empty.
  IF newValue # NIL THEN {
    IF cV.busy # 1 THEN ERROR;
    cV.value ← newValue;
    [] ← SymTab.Store[cV.table, cV.key, cV];
    };
  IF (cV.busy ← cV.busy - 1) = 0 THEN {
    IF cachedValueHead = NIL
      THEN {
        -- List was empty: cV becomes both head and tail.
        cV.next ← cV.prev ← NIL;
        cachedValueHead ← cachedValueTail ← cV;
        NOTIFY cachedValueAvailable;
        }
      ELSE {
        -- Push on the front of the non-empty list.
        cV.next ← cachedValueHead;
        cV.prev ← NIL;
        cachedValueHead.prev ← cV;
        cachedValueHead ← cV;
        };
    };
  };
FreeCachedValue: INTERNAL PROC [cV: CachedValue, lock: Lock] ~ {
  -- Retire cV: clear its links and contents, reduce the allocation count, and notify one waiter.
  -- The caller must already have removed cV from the idle list and key table if it was there.
  cV.next ← cV.prev ← NIL;
  cV.table ← NIL;
  cV.value ← NIL;
  cV.key ← NIL;
  numCachedValues ← numCachedValues - 1;
  NOTIFY cachedValueAvailable;
  };
-- Discard an entry obtained from LookupCachedValue for which no value could be fetched (used by Match on failure).
-- The entry must be held exactly once (anonymous ERROR otherwise); it is freed rather than returned to the idle list.
ReleaseCacheEntry: ENTRY PROC [cV: CachedValue, lock: Lock ← cachedValueLock] ~ {
IF cV.busy # 1 THEN ERROR;
FreeCachedValue[cV, lock];
};
SweepCachedValues: ENTRY PROC [seconds: CARD, lock: Lock ← cachedValueLock] ~ {
  -- Age every idle cache entry by `seconds`.
  -- Entries whose TTL runs out are removed from their key table and the idle list, then freed.
  -- An expired entry found with busy # 0 raises an anonymous ERROR.
  cur: CachedValue ← cachedValueHead;
  WHILE cur # NIL DO
    successor: CachedValue ~ cur.next; -- captured first: FreeCachedValue clears cur.next
    IF cur.ttl <= seconds
      THEN {
        IF cur.busy # 0 THEN ERROR;
        RemoveCachedValueFromTable[cur];
        RemoveCachedValueFromList[cur];
        FreeCachedValue[cur, lock];
        }
      ELSE cur.ttl ← cur.ttl - seconds;
    cur ← successor;
  ENDLOOP;
  };
-- Public Procedures
MapErrorCode: PROC [code: ATOM, h: Handle] RETURNS [mappedCode: ATOM] ~ {
  -- Called when an RPC on `h` raised SunRPC.Error: mark the handle's server down and restart its TTL.
  -- The RPC error code is passed through unchanged.
  h.server.ttl ← initialServerTTL;
  h.server.down ← TRUE;
  RETURN [code];
  };
MapErrorStatus: PROC [status: SunYP.Status, h: Handle] RETURNS [mappedCode: ATOM] ~ {
  -- Called when an RPC on `h` completed: mark the handle's server up and restart its TTL.
  -- The YP status is translated to an error atom; NIL means success.
  -- Any status not listed below maps to $protocol.
  h.server.ttl ← initialServerTTL;
  h.server.down ← FALSE;
  SELECT status FROM
    true => mappedCode ← NIL;
    nomore => mappedCode ← $noMoreEntries;
    nomap => mappedCode ← $mapNotFound;
    nodomain => mappedCode ← $domainNotFound;
    nokey => mappedCode ← $keyNotFound;
    ENDCASE => mappedCode ← $protocol;
  };
Match: PUBLIC PROC [h: Handle, map: ROPE, key: ROPE] RETURNS [val: REF TEXT] ~ {
  -- Look up `key` in `map` of the handle's domain, answering from the response cache when possible.
  -- On a cache miss the server is asked; a successful response is cached and returned.
  -- Raises Error[code] if the RPC raises SunRPC.Error (the server is then marked down).
  -- Raises Error with the mapped atom if the YP status is not success; failures are not cached.
  cV: CachedValue;
  errorCode: ATOM ← NIL; -- NIL means the server returned a value
  response: SunYP.ResponseVal;
  cV ← LookupCachedValue[h.domain, map, key];
  IF cV.value # NIL THEN {
    -- Cache hit: give the hold back without changing the stored value.
    val ← cV.value;
    CacheValue[cV];
    RETURN;
    };
  {
    ENABLE {
      SunRPC.Error => { errorCode ← MapErrorCode[code, h]; CONTINUE };
      };
    response ← SunYPClient.Match[h.rpcHandle, h.conversation, [domain~h.domain, map~map, key~Rope.ToRefText[key]]];
    errorCode ← MapErrorStatus[response.status, h];
    };
  IF errorCode = NIL
    THEN {
      val ← response.val;
      CacheValue[cV, val];
      RETURN;
      }
    ELSE {
      -- Drop the value-less entry so its slot can be reused.
      ReleaseCacheEntry[cV];
      ERROR Error[errorCode];
      };
  };
First: PUBLIC PROC [h: Handle, map: ROPE] RETURNS [key: ROPE, val: REF TEXT] ~ {
  -- Return the first key/value pair of `map` in the handle's domain. The result is not cached.
  -- Raises Error[code] if the RPC raises SunRPC.Error, or Error with the mapped atom if the YP status is not success.
  response: SunYP.ResponseKeyVal;
  errorCode: ATOM ← NIL; -- NIL means success
  {
    ENABLE {
      SunRPC.Error => { errorCode ← MapErrorCode[code, h]; CONTINUE };
      };
    response ← SunYPClient.First[h.rpcHandle, h.conversation, [domain~h.domain, map~map]];
    errorCode ← MapErrorStatus[response.status, h];
    };
  IF errorCode # NIL THEN ERROR Error[errorCode];
  RETURN [Rope.FromRefText[response.key], response.val];
  };
Next: PUBLIC PROC [h: Handle, map: ROPE, keyBefore: ROPE]
  RETURNS [key: ROPE, val: REF TEXT] ~ {
  -- Return the key/value pair that follows `keyBefore` in `map` of the handle's domain. The result is not cached.
  -- Raises Error[code] if the RPC raises SunRPC.Error.
  -- Raises Error with the mapped atom if the YP status is not success; the `nomore` status maps to $noMoreEntries.
  response: SunYP.ResponseKeyVal;
  errorCode: ATOM ← NIL; -- NIL means success
  {
    ENABLE {
      SunRPC.Error => { errorCode ← MapErrorCode[code, h]; CONTINUE };
      };
    response ← SunYPClient.Next[h.rpcHandle, h.conversation, [domain~h.domain, map~map, key~Rope.ToRefText[keyBefore]]];
    errorCode ← MapErrorStatus[response.status, h];
    };
  IF errorCode # NIL THEN ERROR Error[errorCode];
  RETURN [Rope.FromRefText[response.key], response.val];
  };
MapList: PUBLIC PROC [h: Handle, eachMap: SunYP.EachMapNameProc] ~ {
  -- Ask the server for the maps of the handle's domain, passing `eachMap` through to SunYPClient.Maplist.
  -- Raises Error[code] if the RPC raises SunRPC.Error, or Error with the mapped atom if the YP status is not success.
  status: SunYP.Status;
  errorCode: ATOM ← NIL; -- NIL means success
  {
    ENABLE {
      SunRPC.Error => { errorCode ← MapErrorCode[code, h]; CONTINUE };
      };
    status ← SunYPClient.Maplist[h.rpcHandle, h.conversation, h.domain, eachMap];
    errorCode ← MapErrorStatus[status, h];
    };
  IF errorCode # NIL THEN ERROR Error[errorCode];
  };
-- Utilities
ExpandTextSeq: PROC [s: SunYPAgent.TextSeq] RETURNS [new: SunYPAgent.TextSeq] ~ {
  -- Return a copy of `s` with at least twice the capacity, holding the same s.length elements.
  -- The tokenizers call this when out.length has reached out.maxLength.
  -- The capacity floor of 4 keeps a zero-capacity input from yielding another zero-capacity sequence.
  new ← NEW[SunYPAgent.TextSeqObject[MAX[2*s.maxLength, 4]]];
  FOR i: CARDINAL IN [0 .. s.length) DO
    new.refText[i] ← s.refText[i];
  ENDLOOP;
  -- Carry the element count over. The callers index with out.length right after the expansion.
  -- Without this the count restarts and the copied tokens are overwritten.
  new.length ← s.length;
  };
IsWhiteSpace: PROC [c: CHAR] RETURNS [isWhite: BOOL] ~ {
  -- TRUE iff c is a space, tab, carriage return, line feed or form feed.
  RETURN [(c = Ascii.SP) OR (c = Ascii.TAB) OR (c = Ascii.CR) OR (c = Ascii.LF) OR (c = Ascii.FF)];
  };
TokenizeUsingSeparator: PUBLIC PROC [in: REF READONLY TEXT, sepChar: CHAR, trimLeadingSpace, trimTrailingSpace: BOOL] RETURNS [out: SunYPAgent.TextSeq]
~ {
  -- Split `in` into fields at each occurrence of sepChar, returning freshly allocated copies of the fields.
  -- trimLeadingSpace skips white space at the start of each field.
  -- trimTrailingSpace drops white space at the end of each field.
  -- An empty field between two separators is returned as an empty text.
  -- Nothing is emitted for the empty remainder after a final separator.
  pos: CARDINAL ← 0; -- read position in `in`
  n: CARDINAL; -- characters collected for the current field
  buf: REF TEXT ← RefText.ObtainScratch[in.length];
  out ← NEW[SunYPAgent.TextSeqObject[10]];
  out.length ← 0;
  WHILE pos < in.length DO
    IF trimLeadingSpace AND IsWhiteSpace[in[pos]] THEN { pos ← pos + 1; LOOP };
    n ← 0;
    -- Copy characters up to, but not including, the next separator or the end of input.
    WHILE (pos < in.length) AND (in[pos] # sepChar) DO
      buf[n] ← in[pos];
      pos ← pos + 1;
      n ← n + 1;
    ENDLOOP;
    IF trimTrailingSpace THEN
      WHILE (n > 0) AND IsWhiteSpace[buf[n-1]] DO n ← n - 1 ENDLOOP;
    buf.length ← n;
    IF out.length >= out.maxLength THEN out ← ExpandTextSeq[out];
    out.refText[out.length] ← RefText.Append[to~RefText.New[buf.length], from~buf];
    out.length ← out.length + 1;
    pos ← pos + 1; -- step over the separator
  ENDLOOP;
  RefText.ReleaseScratch[buf];
  };
Tokenize: PUBLIC PROC [in: REF READONLY TEXT]
  RETURNS [out: SunYPAgent.TextSeq]
~ {
  -- Split `in` into its maximal runs of non-white-space characters, returning freshly allocated copies.
  -- Runs of white space of any length act as a single delimiter.
  -- Leading and trailing white space produce no tokens, so every returned token is non-empty.
  -- An empty or all-white input yields a sequence of length 0.
  i, iTo: CARDINAL;
  scratch: REF TEXT ← RefText.ObtainScratch[in.length];
  out ← NEW[SunYPAgent.TextSeqObject[8]];
  out.length ← 0;
  i ← 0;
  -- Skip leading white space so that it does not produce an empty first token.
  WHILE (i < in.length) AND IsWhiteSpace[in[i]] DO i ← i + 1 ENDLOOP;
  WHILE i < in.length DO
    -- in[i] is not white space here, so each pass collects at least one character.
    iTo ← 0;
    WHILE (i < in.length) AND (NOT IsWhiteSpace[in[i]]) DO
      scratch[iTo] ← in[i];
      i ← i + 1;
      iTo ← iTo + 1;
    ENDLOOP;
    scratch.length ← iTo;
    IF out.length >= out.maxLength THEN out ← ExpandTextSeq[out];
    out.refText[out.length] ← RefText.Append[to~RefText.New[scratch.length], from~scratch];
    out.length ← out.length + 1;
    -- Skip the delimiter run before the next token.
    WHILE (i < in.length) AND IsWhiteSpace[in[i]] DO i ← i + 1 ENDLOOP;
  ENDLOOP;
  RefText.ReleaseScratch[scratch];
  };
-- Daemon
-- Time of the most recent sweep; Daemon sets it to the current time on startup and after every sweep.
lastSweepTime: BasicTime.GMT ← BasicTime.earliestGMT;
Daemon: PROC ~ {
  -- Background loop, never returns.
  -- Every msecBetweenSweeps milliseconds it ages the value cache and both server lists by the seconds elapsed since the previous sweep.
  -- Start the clock now. Measuring from earliestGMT would make the first sweep expire everything.
  lastSweepTime ← BasicTime.Now[];
  DO
    thisSweepTime: BasicTime.GMT ~ BasicTime.Now[];
    elapsed: INT ~ BasicTime.Period[from~lastSweepTime, to~thisSweepTime];
    secs: CARD ← 0;
    -- A negative period means the clock was set back.
    -- Age nothing on this pass and resynchronise, rather than kill the detached process with an uncaught ERROR.
    IF elapsed > 0 THEN secs ← elapsed;
    -- Advance the reference point so that each sweep charges only the time since the previous one.
    lastSweepTime ← thisSweepTime;
    SweepCachedValues[secs];
    SweepServers[secs];
    SweepNullServers[secs];
    Process.PauseMsec[msecBetweenSweeps];
  ENDLOOP;
  };
-- Module start code: fork the sweep daemon as a detached process.
TRUSTED { Process.Detach[ FORK Daemon[] ] };
}...