Skip to content

Commit

Permalink
make ServerEngine methods async 4
Browse files Browse the repository at this point in the history
  • Loading branch information
erossignon committed Jun 18, 2023
1 parent 08024e5 commit f1e1061
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 128 deletions.
3 changes: 0 additions & 3 deletions packages/node-opcua-server/source/opcua_server.ts
Expand Up @@ -20,9 +20,6 @@ import * as utils from "node-opcua-utils";

import {
AddressSpace,
callMethodHelper,
ContinuationPoint,
IUserManager,
PseudoVariantBoolean,
PseudoVariantByteString,
PseudoVariantDateTime,
Expand Down
219 changes: 94 additions & 125 deletions packages/node-opcua-server/source/server_engine.ts
Expand Up @@ -12,12 +12,10 @@ import {
AddressSpace,
BaseNode,
bindExtObjArrayNode,
ensureDatatypeExtractedWithCallback,
ensureObjectIsSecure,
MethodFunctor,
removeElement,
SessionContext,
UADataType,
UADynamicVariableArray,
UAMethod,
UAObject,
Expand All @@ -31,7 +29,6 @@ import {
resolveOpaqueOnAddressSpace,
ContinuationData,
IServerBase,
UARole,
ensureDatatypeExtracted,
callMethodHelper
} from "node-opcua-address-space";
Expand Down Expand Up @@ -377,6 +374,13 @@ interface IAddressSpaceAccessor {
readNode(context: ISessionContext, nodeToRead: ReadValueIdOptions, timestampsToReturn?: TimestampsToReturn): Promise<DataValue>;
writeNode(context: ISessionContext, writeValue: WriteValue): Promise<StatusCode>;
callMethod(context: ISessionContext, methodToCall: CallMethodRequest): Promise<CallMethodResultOptions>;
historyReadNode(
context: ISessionContext,
nodeToRead: HistoryReadValueId,
historyReadDetails: HistoryReadDetails,
timestampsToReturn: TimestampsToReturn,
continuationData: ContinuationData
): Promise<HistoryReadResult>;
}

class AddressSpaceAccessor implements IAddressSpaceAccessor {
Expand Down Expand Up @@ -499,6 +503,64 @@ class AddressSpaceAccessor implements IAddressSpaceAccessor {
});
});
}

public async historyReadNode(
context: ISessionContext,
nodeToRead: HistoryReadValueId,
historyReadDetails: HistoryReadDetails,
timestampsToReturn: TimestampsToReturn,
continuationData: ContinuationData
): Promise<HistoryReadResult> {
assert(context instanceof SessionContext);

const nodeId = nodeToRead.nodeId;
const indexRange = nodeToRead.indexRange;
const dataEncoding = nodeToRead.dataEncoding;
const continuationPoint = nodeToRead.continuationPoint;

timestampsToReturn = coerceTimestampsToReturn(timestampsToReturn);
if (timestampsToReturn === TimestampsToReturn.Invalid) {
return new HistoryReadResult({ statusCode: StatusCodes.BadTimestampsToReturnInvalid });
}

const obj = this.__findNode(nodeId) as UAVariable;

if (!obj) {
// may be return BadNodeIdUnknown in dataValue instead ?
// Object Not Found
return new HistoryReadResult({ statusCode: StatusCodes.BadNodeIdUnknown });
} else {
// istanbul ignore next
if (!obj.historyRead) {
// note : Object and View may also support historyRead to provide Event historical data
// todo implement historyRead for Object and View
const msg =
" this node doesn't provide historyRead! probably not a UAVariable\n " +
obj.nodeId.toString() +
" " +
obj.browseName.toString() +
"\n" +
"with " +
nodeToRead.toString() +
"\n" +
"HistoryReadDetails " +
historyReadDetails.toString();
if (doDebug) {
debugLog(chalk.cyan("ServerEngine#_historyReadSingleNode "), chalk.white.bold(msg));
}
throw new Error(msg);
}
// check access
// BadUserAccessDenied
// BadNotReadable
// invalid attributes : BadNodeAttributesInvalid
// invalid range : BadIndexRangeInvalid
const result = await obj.historyRead(context, historyReadDetails, indexRange, dataEncoding, continuationData);
assert(result!.statusCode instanceof StatusCode);
assert(result!.isValid());
return result;
}
}
}
/**
*
Expand Down Expand Up @@ -1594,45 +1656,35 @@ export class ServerEngine extends EventEmitter implements IAddressSpaceAccessor
/**
*
*/
public historyReadSingleNode(
public async historyReadSingleNode(
context: ISessionContext,
nodeId: NodeId,
attributeId: AttributeIds,
historyReadDetails: ReadRawModifiedDetails | ReadEventDetails | ReadProcessedDetails | ReadAtTimeDetails,
timestampsToReturn: TimestampsToReturn,
continuationData: ContinuationData,
callback: (err: Error | null, results?: HistoryReadResult) => void
): void {
continuationData: ContinuationData
): Promise<HistoryReadResult> {
if (timestampsToReturn === TimestampsToReturn.Invalid) {
callback(
null,
new HistoryReadResult({
statusCode: StatusCodes.BadTimestampsToReturnInvalid
})
);
return;
return new HistoryReadResult({
statusCode: StatusCodes.BadTimestampsToReturnInvalid
});
}
assert(context instanceof SessionContext);
this._historyReadSingleNode(
return await this.historyReadNode(
context,
new HistoryReadValueId({
nodeId
}),
historyReadDetails,
timestampsToReturn,
continuationData,
callback
continuationData
);
}

public async historyRead(
context: ISessionContext,
historyReadRequest: HistoryReadRequest
): Promise<HistoryReadResult[]> {

public async historyRead(context: ISessionContext, historyReadRequest: HistoryReadRequest): Promise<HistoryReadResult[]> {
assert(context instanceof SessionContext);
assert(historyReadRequest instanceof HistoryReadRequest);

const timestampsToReturn = historyReadRequest.timestampsToReturn;
const historyReadDetails = historyReadRequest.historyReadDetails! as HistoryReadDetails;
const releaseContinuationPoints = historyReadRequest.releaseContinuationPoints;
Expand All @@ -1650,22 +1702,10 @@ export class ServerEngine extends EventEmitter implements IAddressSpaceAccessor
}

const _q = async (m: M): Promise<HistoryReadResult> => {
return new Promise((resolve) => {
const continuationPoint = m.nodeToRead.continuationPoint;
this._historyReadSingleNode(
context,
m.nodeToRead,
m.processDetail,
timestampsToReturn,
{ continuationPoint, releaseContinuationPoints /**, index = ??? */ },
(err: Error | null, result?: any) => {
if (err && !result) {
errorLog("Internal error", err.message);
result = new HistoryReadResult({ statusCode: StatusCodes.BadInternalError });
}
resolve(result);
}
);
const continuationPoint = m.nodeToRead.continuationPoint;
return await this.historyReadNode(context, m.nodeToRead, m.processDetail, timestampsToReturn, {
continuationPoint,
releaseContinuationPoints /**, index = ??? */
});
};

Expand All @@ -1687,28 +1727,17 @@ export class ServerEngine extends EventEmitter implements IAddressSpaceAccessor
promises.push(_q({ nodeToRead, processDetail, index }));
index++;
}

const results: HistoryReadResult[] = await Promise.all(promises);
return results;
}

const _r = async (nodeToRead: HistoryReadValueId, index: number) => {
const continuationPoint = nodeToRead.continuationPoint;
return new Promise<HistoryReadResult>((resolve, reject) => {
this._historyReadSingleNode(
context,
nodeToRead,
historyReadDetails,
timestampsToReturn,
{ continuationPoint, releaseContinuationPoints, index },
(err: Error | null, result?: any) => {
if (err && !result) {
result = new HistoryReadResult({ statusCode: StatusCodes.BadInternalError });
}
resolve(result);
// it's not guaranteed that the historical read process is really asynchronous
}
);
return await this.historyReadNode(context, nodeToRead, historyReadDetails, timestampsToReturn, {
continuationPoint,
releaseContinuationPoints,
index
});
};
const promises: Promise<HistoryReadResult>[] = [];
Expand Down Expand Up @@ -2203,80 +2232,20 @@ export class ServerEngine extends EventEmitter implements IAddressSpaceAccessor
);
}

private _historyReadSingleNode(
public async historyReadNode(
context: ISessionContext,
nodeToRead: HistoryReadValueId,
historyReadDetails: HistoryReadDetails,
timestampsToReturn: TimestampsToReturn,
continuationData: ContinuationData,
callback: CallbackT<HistoryReadResult>
): void {
assert(context instanceof SessionContext);
assert(typeof callback === "function");

const nodeId = nodeToRead.nodeId;
const indexRange = nodeToRead.indexRange;
const dataEncoding = nodeToRead.dataEncoding;
const continuationPoint = nodeToRead.continuationPoint;

timestampsToReturn = coerceTimestampsToReturn(timestampsToReturn);
if (timestampsToReturn === TimestampsToReturn.Invalid) {
return callback(null, new HistoryReadResult({ statusCode: StatusCodes.BadTimestampsToReturnInvalid }));
}

const obj = this.__findNode(nodeId) as UAVariable;

if (!obj) {
// may be return BadNodeIdUnknown in dataValue instead ?
// Object Not Found
callback(null, new HistoryReadResult({ statusCode: StatusCodes.BadNodeIdUnknown }));
return;
} else {
// istanbul ignore next
if (!obj.historyRead) {
// note : Object and View may also support historyRead to provide Event historical data
// todo implement historyRead for Object and View
const msg =
" this node doesn't provide historyRead! probably not a UAVariable\n " +
obj.nodeId.toString() +
" " +
obj.browseName.toString() +
"\n" +
"with " +
nodeToRead.toString() +
"\n" +
"HistoryReadDetails " +
historyReadDetails.toString();
if (doDebug) {
debugLog(chalk.cyan("ServerEngine#_historyReadSingleNode "), chalk.white.bold(msg));
}
const err = new Error(msg);
// object has no historyRead method
setImmediate(callback.bind(null, err));
return;
}
// check access
// BadUserAccessDenied
// BadNotReadable
// invalid attributes : BadNodeAttributesInvalid
// invalid range : BadIndexRangeInvalid
obj.historyRead(
context,
historyReadDetails,
indexRange,
dataEncoding,
continuationData,
(err: Error | null, result?: HistoryReadResult) => {
if (err || !result) {
return callback(err);
}
assert(result!.statusCode instanceof StatusCode);
assert(result!.isValid());
// result = apply_timestamps(result, timestampsToReturn, attributeId);
callback(err, result);
}
);
}
continuationData: ContinuationData
): Promise<HistoryReadResult> {
return this.addressSpaceAccessor!.historyReadNode(
context,
nodeToRead,
historyReadDetails,
timestampsToReturn,
continuationData
);
}

/**
Expand Down

0 comments on commit f1e1061

Please sign in to comment.