Skip to content

Commit

Permalink
improve client reconnection when ActivateSession returns BadUserAcces…
Browse files Browse the repository at this point in the history
…sDenied
  • Loading branch information
erossignon committed Jun 29, 2023
1 parent 4dfb651 commit 283bd1f
Show file tree
Hide file tree
Showing 8 changed files with 112 additions and 56 deletions.
Expand Up @@ -384,9 +384,13 @@ export class ClientBaseImpl extends OPCUASecureObject implements OPCUAClientBase
private _instanceNumber: number;
private _transportSettings: TransportSettings;
private _transportTimeout?: number;

public clientCertificateManager: OPCUACertificateManager;

public isUnusable() {
return this._internalState === "disconnected" || this._internalState === "disconnecting";
}

protected _setInternalState(internalState: InternalClientState): void {
const previousState = this._internalState;
if (doDebug || traceInternalState) {
Expand Down
Expand Up @@ -1797,6 +1797,7 @@ export class ClientSessionImpl extends EventEmitter implements ClientSession {
public hasBeenClosed(): boolean {
return isNullOrUndefined(this._client) || this._closed || this._closeEventHasBeenEmitted;
}


public async call(methodToCall: CallMethodRequestLike): Promise<CallMethodResult>;
public async call(methodToCall: CallMethodRequestLike[]): Promise<CallMethodResult[]>;
Expand Down
2 changes: 2 additions & 0 deletions packages/node-opcua-client/source/private/i_private_client.ts
Expand Up @@ -28,4 +28,6 @@ export interface IClientBase {
getTransportSettings(): IBasicTransportSettings;

requestReconnection(): void;

isUnusable(): boolean;
}
15 changes: 12 additions & 3 deletions packages/node-opcua-client/source/private/opcua_client_impl.ts
Expand Up @@ -312,6 +312,7 @@ function _adjustRevisedSessionTimeout(revisedSessionTimeout: number, requestedTi
*/
export class OPCUAClientImpl extends ClientBaseImpl implements OPCUAClient {
public static minimumRevisedSessionTimeout = 100.0;
private _retryCreateSessionTimer?: NodeJS.Timer;

public static create(options: OPCUAClientOptions): OPCUAClient {
return new OPCUAClientImpl(options);
Expand Down Expand Up @@ -430,12 +431,15 @@ export class OPCUAClientImpl extends ClientBaseImpl implements OPCUAClient {
// we do not have a connection anymore
return callback(new Error("Connection is closed"));
}
if (this._internalState === "disconnected" || this._internalState === "disconnecting") {
return callback(new Error(`disconnecting`));
}
return this.createSession(args[0], (err: Error | null, session?: ClientSession) => {
if (err && err.message.match(/BadTooManySessions/)) {
const delayToRetry = 5; // seconds
errorLog(`TooManySession .... we need to retry later ... in ${delayToRetry} secondes`);
const retryCreateSessionTimer = setTimeout(() => {
errorLog("TooManySession .... now retrying");
errorLog(`TooManySession .... we need to retry later ... in ${delayToRetry} secondes ${this._internalState}`);
this._retryCreateSessionTimer = setTimeout(() => {
errorLog(`TooManySession .... now retrying (${this._internalState})`);
this.createSession2(userIdentityInfo, callback);
}, delayToRetry * 1000);
return;
Expand Down Expand Up @@ -474,6 +478,11 @@ export class OPCUAClientImpl extends ClientBaseImpl implements OPCUAClient {
* @param args
*/
public closeSession(...args: any[]): any {

if (this._retryCreateSessionTimer) {
clearTimeout(this._retryCreateSessionTimer);
this._retryCreateSessionTimer = undefined;
}
super.closeSession(...args);
}

Expand Down
118 changes: 72 additions & 46 deletions packages/node-opcua-client/source/reconnection.ts
Expand Up @@ -18,10 +18,10 @@ import { ClientSessionImpl, Reconnectable } from "./private/client_session_impl"
import { ClientSubscriptionImpl } from "./private/client_subscription_impl";
import { IClientBase } from "./private/i_private_client";

const debugLog = make_debugLog(__filename);
const doDebug = checkDebugFlag(__filename);
const errorLog = make_errorLog(__filename);
const warningLog = make_warningLog(__filename);
const debugLog = make_debugLog("RECONNECTION");
const doDebug = checkDebugFlag("RECONNECTION");
const errorLog = make_errorLog("RECONNECTION");
const warningLog = make_warningLog("RECONNECTION");

//
// a new secure channel has be created, we need to reactivate the corresponding session,
Expand Down Expand Up @@ -167,6 +167,9 @@ function repair_client_session_by_recreating_a_new_session(
session: ClientSessionImpl,
callback: (err?: Error) => void
) {
if (session._client && session._client.isUnusable()) {
return callback(new Error("Client is unusable"));
}
// As we don"t know if server has been rebooted or not,
// and may be upgraded in between, we have to invalidate the extra data type manager
invalidateExtraDataTypeManager(session);
Expand Down Expand Up @@ -215,25 +218,25 @@ function repair_client_session_by_recreating_a_new_session(
newSession,
newSession.userIdentityInfo!,
(err: Error | null, session1?: ClientSessionImpl) => {
doDebug &&
debugLog(
chalk.bgWhite.cyan(" => activating a new session .... Done err=", err ? err.message : "null")
);
doDebug && debugLog(" => activating a new session .... Done err=", err ? err.message : "null");
if (err) {
doDebug &&
debugLog(
chalk.bgWhite.cyan(
"reactivation of the new session has failed: let be smart and close it before failing this repair attempt"
)
"reactivation of the new session has failed: let be smart and close it before failing this repair attempt"
);
// but just on the server side, not on the client side
const closeSessionRequest = new CloseSessionRequest({
requestHeader: {
authenticationToken: newSession.authenticationToken
},
deleteSubscriptions: true
});
session.performMessageTransaction(closeSessionRequest, (err2?: Error | null) => {
newSession._client!.performMessageTransaction(closeSessionRequest, (err2?: Error | null) => {
if (err2) {
warningLog("closing session", err2.message);
}
doDebug && debugLog("the temporary replacement session is now closed");
doDebug && debugLog(" err ", err.message, "propagated upwards");
innerCallback(err);
});
} else {
Expand Down Expand Up @@ -384,6 +387,7 @@ function repair_client_session_by_recreating_a_new_session(
}
],
(err) => {
doDebug && err && debugLog("repair_client_session_by_recreating_a_new_session failed with ", err.message);
callback(err!);
}
);
Expand All @@ -393,7 +397,9 @@ function _repair_client_session(client: IClientBase, session: ClientSessionImpl,
const callback2 = (err2?: Error) => {
doDebug &&
debugLog("Session repair completed with err: ", err2 ? err2.message : "<no error>", session.sessionId.toString());
session.emit("session_repaired");
if (!err2) {
session.emit("session_repaired");
}
callback(err2);
};

Expand All @@ -420,53 +426,71 @@ function _repair_client_session(client: IClientBase, session: ClientSessionImpl,
type EmptyCallback = (err?: Error) => void;

export function repair_client_session(client: IClientBase, session: ClientSessionImpl, callback: EmptyCallback): void {

if (!client) {
doDebug && debugLog("Aborting reactivation of old session because user requested session to be close");
return callback();
}

doDebug && debugLog(chalk.yellow("Starting client session repair"));

const privateSession = session as any as Reconnectable;
privateSession._reconnecting = privateSession._reconnecting || { reconnecting: false, pendingCallbacks: [] };

if (privateSession._reconnecting.reconnecting) {
doDebug && debugLog(chalk.bgCyan("Reconnecting already happening for session"), session.sessionId.toString());
privateSession._reconnecting.pendingCallbacks.push(callback);
return;
}

privateSession._reconnecting.reconnecting = true;

// get old transaction queue ...
const transactionQueue = privateSession.pendingTransactions ? privateSession.pendingTransactions.splice(0) : [];

_repair_client_session(client, session, (err) => {
privateSession._reconnecting.reconnecting = false;
if (err) {
errorLog(
chalk.red("session restoration has failed! err ="),
err.message,
session.sessionId.toString(),
" => Let's retry"
);
if (!session.hasBeenClosed()) {
setTimeout(() => {
_repair_client_session(client, session, callback);
}, 2000);
} else {
// session does not need to be repaired anymore
callback();

const repeatedAction = (callback: EmptyCallback) => {
_repair_client_session(client, session, (err) => {
if (err) {
errorLog(
chalk.red("session restoration has failed! err ="),
err.message,
session.sessionId.toString(),
" => Let's retry"
);
if (!session.hasBeenClosed()) {

const delay = 2000;
errorLog(chalk.red(`... will retry session repair... in ${delay} ms`));
setTimeout(() => {
errorLog(chalk.red("Retrying session repair..."));
repeatedAction(callback);
}, delay);
return;
} else {
console.log(chalk.red("session restoration should be interrupted because session has been closed forcefully"));
// session does not need to be repaired anymore
callback();
}
return;
}
return;
}
doDebug && debugLog(chalk.yellow("session has been restored"), session.sessionId.toString());
session.emit("session_restored");
const otherCallbacks = privateSession._reconnecting.pendingCallbacks;
privateSession._reconnecting.pendingCallbacks = [];

// re-inject element in queue
doDebug && debugLog(chalk.yellow("re-injecting transaction queue"), transactionQueue.length);
transactionQueue.forEach((e: any) => privateSession.pendingTransactions.push(e));
otherCallbacks.forEach((c: EmptyCallback) => c(err));
callback(err);
});

console.log("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! DELETE ME");
privateSession._reconnecting.reconnecting = false;
errorLog("Session has been restored");
doDebug && debugLog(chalk.yellow("session has been restored"), session.sessionId.toString());
session.emit("session_restored");
const otherCallbacks = privateSession._reconnecting.pendingCallbacks;
privateSession._reconnecting.pendingCallbacks = [];

// re-inject element in queue
doDebug && debugLog(chalk.yellow("re-injecting transaction queue"), transactionQueue.length);
transactionQueue.forEach((e: any) => privateSession.pendingTransactions.push(e));
otherCallbacks.forEach((c: EmptyCallback) => c(err));
callback(err);
});
};
repeatedAction(callback);
}

export function repair_client_sessions(client: IClientBase, callback: (err?: Error) => void): void {
Expand All @@ -475,11 +499,13 @@ export function repair_client_sessions(client: IClientBase, callback: (err?: Err
doDebug && debugLog(chalk.red.bgWhite(" Starting sessions reactivation", sessions.length));
async.map(
sessions,
(session, next: (err?: Error) => void) => {
repair_client_session(client, session as ClientSessionImpl, next);
(session, next: (err: Error | null, err2: Error | null | undefined) => void) => {
repair_client_session(client, session as ClientSessionImpl, (err) => {
next(null, err);
});
},
(err) => {
err && errorLog("sessions reactivation completed: err ", err ? err.message : "null");
(err, allErrors: (undefined | Error | null)[] | undefined) => {
err && errorLog("sessions reactivation completed with err: err ", err ? err.message : "null");
return callback(err!);
}
);
Expand Down
Expand Up @@ -49,7 +49,7 @@ module.exports = function (test) {
const clients = [];

const sessions = [];
function create_non_activated_session(callback) {
function create_non_activated_session(callback) {
const endpointUrl = test.endpointUrl;
const client1 = OPCUAClient.create({
connectionStrategy: fail_fast_connectionStrategy
Expand Down
Expand Up @@ -6,7 +6,7 @@ const should = require("should");
const opcua = require("node-opcua");
const chalk = require("chalk");

const doDebug = true;
const doDebug = false;

module.exports = function(test) {
const maxSessionsForTest = 50;
Expand Down
22 changes: 18 additions & 4 deletions packages/node-opcua-end2end-test/test/test_issue_1162.ts
Expand Up @@ -66,7 +66,7 @@ describe("Testing automatic reconnection to a server when credential have change
console.log("server connection refused");
});
server.on("session_closed", () => {
console.log("server sesion closed");
console.log("server session closed");
});
server.on("create_session", () => {
console.log("server create session");
Expand Down Expand Up @@ -141,29 +141,43 @@ describe("Testing automatic reconnection to a server when credential have change
}

async function shutDownServerChangePasswordAndRestart(waitingTIme: number, newPassword = "password1-New") {
console.log("============================ shuting down server");
console.log("============================ shutting down server");
await server.shutdown();
await wait(waitingTIme);
console.log("============================ changing user password");
users[0].password = newPassword;
console.log("============================ restarting server again");
server = await startServer();
console.log("============================ server restarteds");
console.log("============================ server restarted");
}

it("should try to reconnected automatically - but fail to do so", async () => {
const client = await createAndConnectClient();

let reconnectingCount = 0;
client.on("reconnecting", () => {
reconnectingCount++;
});
try {
await shutDownServerChangePasswordAndRestart(1, "password1-New");

// wait until client is connected
await wait_until_condition(() => {
return client.isReconnecting;
}, 10000);
await wait(10*1000);

console.log("client.isReconnecting = ", client.isReconnecting);
client.isReconnecting.should.eql(true, "client should be trying to reconnect constantly without success");
await wait(10 * 1000);
console.log("client.isReconnecting = ", client.isReconnecting);
await wait(10 * 1000);
console.log("client.isReconnecting = ", client.isReconnecting);
await wait(10 * 1000);
console.log("client.isReconnecting = ", client.isReconnecting);
client.isReconnecting.should.eql(true, "client should be trying to reconnect constantly without success");
} finally {
console.log("now disconnecting");

await client.disconnect();
}
console.log("done!");
Expand Down

0 comments on commit 283bd1f

Please sign in to comment.