Skip to content

Commit

Permalink
feat: broadcast and expect multiple acks
Browse files Browse the repository at this point in the history
Syntax:

```js
io.timeout(1000).emit("some-event", (err, responses) => {
  // ...
});
```

The adapter exposes two additional methods:

- `broadcastWithAck(packets, opts, clientCountCallback, ack)`

Similar to `broadcast(packets, opts)`, but:

* `clientCountCallback()` is called with the number of clients that
  received the packet (can be called several times in a cluster)
* `ack()` is called for each client response

- `serverCount()`

It returns the number of Socket.IO servers in the cluster (1 for the
in-memory adapter).

Those two methods will be implemented in the other adapters (Redis,
Postgres, MongoDB, ...).

Related:

- #1811
- #4163
- socketio/socket.io-redis-adapter#445
  • Loading branch information
darrachequesne committed Mar 31, 2022
1 parent 0b7d70c commit 8b20457
Show file tree
Hide file tree
Showing 8 changed files with 238 additions and 16 deletions.
86 changes: 80 additions & 6 deletions lib/broadcast-operator.ts
Expand Up @@ -129,6 +129,29 @@ export class BroadcastOperator<EmitEvents extends EventsMap, SocketData>
);
}

/**
* Adds a timeout in milliseconds for the next operation
*
* <pre><code>
*
* io.timeout(1000).emit("some-event", (err, responses) => {
* // ...
* });
*
* </pre></code>
*
* @param timeout
*/
public timeout(timeout: number) {
const flags = Object.assign({}, this.flags, { timeout });
return new BroadcastOperator(
this.adapter,
this.rooms,
this.exceptRooms,
flags
);
}

/**
* Emits to all clients.
*
Expand All @@ -149,14 +172,65 @@ export class BroadcastOperator<EmitEvents extends EventsMap, SocketData>
data: data,
};

if ("function" == typeof data[data.length - 1]) {
throw new Error("Callbacks are not supported when broadcasting");
const withAck = typeof data[data.length - 1] === "function";

if (!withAck) {
this.adapter.broadcast(packet, {
rooms: this.rooms,
except: this.exceptRooms,
flags: this.flags,
});

return true;
}

this.adapter.broadcast(packet, {
rooms: this.rooms,
except: this.exceptRooms,
flags: this.flags,
const ack = data.pop() as (...args: any[]) => void;
let timedOut = false;
let responses: any[] = [];

const timer = setTimeout(() => {
timedOut = true;
ack.apply(this, [new Error("operation has timed out"), responses]);
}, this.flags.timeout);

let expectedServerCount = -1;
let actualServerCount = 0;
let expectedClientCount = 0;

const checkCompleteness = () => {
if (
!timedOut &&
expectedServerCount === actualServerCount &&
responses.length === expectedClientCount
) {
clearTimeout(timer);
ack.apply(this, [null, responses]);
}
};

this.adapter.broadcastWithAck(
packet,
{
rooms: this.rooms,
except: this.exceptRooms,
flags: this.flags,
},
(clientCount) => {
// each Socket.IO server in the cluster sends the number of clients that were notified
expectedClientCount += clientCount;
actualServerCount++;
checkCompleteness();
},
(clientResponse) => {
// each client sends an acknowledgement
responses.push(clientResponse);
checkCompleteness();
}
);

this.adapter.serverCount().then((serverCount) => {
expectedServerCount = serverCount;
checkCompleteness();
});

return true;
Expand Down
17 changes: 17 additions & 0 deletions lib/index.ts
Expand Up @@ -772,6 +772,23 @@ export class Server<
return this.sockets.local;
}

/**
* Adds a timeout in milliseconds for the next operation
*
* <pre><code>
*
* io.timeout(1000).emit("some-event", (err, responses) => {
* // ...
* });
*
* </pre></code>
*
* @param timeout
*/
public timeout(timeout: number) {
return this.sockets.timeout(timeout);
}

/**
* Returns the matching socket instances
*
Expand Down
17 changes: 17 additions & 0 deletions lib/namespace.ts
Expand Up @@ -379,6 +379,23 @@ export class Namespace<
return new BroadcastOperator(this.adapter).local;
}

/**
* Adds a timeout in milliseconds for the next operation
*
* <pre><code>
*
* io.timeout(1000).emit("some-event", (err, responses) => {
* // ...
* });
*
* </pre></code>
*
* @param timeout
*/
public timeout(timeout: number) {
return new BroadcastOperator(this.adapter).timeout(timeout);
}

/**
* Returns the matching socket instances
*
Expand Down
2 changes: 1 addition & 1 deletion lib/socket.ts
Expand Up @@ -140,7 +140,7 @@ export class Socket<
private readonly adapter: Adapter;
private acks: Map<number, () => void> = new Map();
private fns: Array<(event: Event, next: (err?: Error) => void) => void> = [];
private flags: BroadcastFlags & { timeout?: number } = {};
private flags: BroadcastFlags = {};
private _anyListeners?: Array<(...args: any[]) => void>;

/**
Expand Down
15 changes: 8 additions & 7 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Expand Up @@ -50,7 +50,7 @@
"base64id": "~2.0.0",
"debug": "~4.3.2",
"engine.io": "~6.1.2",
"socket.io-adapter": "~2.3.3",
"socket.io-adapter": "~2.4.0",
"socket.io-parser": "~4.0.4"
},
"devDependencies": {
Expand Down
113 changes: 113 additions & 0 deletions test/socket.io.ts
Expand Up @@ -2519,6 +2519,119 @@ describe("socket.io", () => {
});
});
});

it("should broadcast and expect multiple acknowledgements", (done) => {
const srv = createServer();
const sio = new Server(srv);

srv.listen(async () => {
const socket1 = client(srv, { multiplex: false });
const socket2 = client(srv, { multiplex: false });
const socket3 = client(srv, { multiplex: false });

await Promise.all([
waitFor(socket1, "connect"),
waitFor(socket2, "connect"),
waitFor(socket3, "connect"),
]);

socket1.on("some event", (cb) => {
cb(1);
});

socket2.on("some event", (cb) => {
cb(2);
});

socket3.on("some event", (cb) => {
cb(3);
});

sio.timeout(2000).emit("some event", (err, responses) => {
expect(err).to.be(null);
expect(responses).to.have.length(3);
expect(responses).to.contain(1, 2, 3);

done();
});
});
});

it("should fail when a client does not acknowledge the event in the given delay", (done) => {
const srv = createServer();
const sio = new Server(srv);

srv.listen(async () => {
const socket1 = client(srv, { multiplex: false });
const socket2 = client(srv, { multiplex: false });
const socket3 = client(srv, { multiplex: false });

await Promise.all([
waitFor(socket1, "connect"),
waitFor(socket2, "connect"),
waitFor(socket3, "connect"),
]);

socket1.on("some event", (cb) => {
cb(1);
});

socket2.on("some event", (cb) => {
cb(2);
});

socket3.on("some event", (cb) => {
// timeout
});

sio.timeout(200).emit("some event", (err, responses) => {
expect(err).to.be.an(Error);
expect(responses).to.have.length(2);
expect(responses).to.contain(1, 2);

done();
});
});
});

it("should broadcast and return if the packet is sent to 0 client", (done) => {
const srv = createServer();
const sio = new Server(srv);

srv.listen(async () => {
const socket1 = client(srv, { multiplex: false });
const socket2 = client(srv, { multiplex: false });
const socket3 = client(srv, { multiplex: false });

await Promise.all([
waitFor(socket1, "connect"),
waitFor(socket2, "connect"),
waitFor(socket3, "connect"),
]);

socket1.on("some event", () => {
done(new Error("should not happen"));
});

socket2.on("some event", () => {
done(new Error("should not happen"));
});

socket3.on("some event", () => {
done(new Error("should not happen"));
});

sio
.to("room123")
.timeout(200)
.emit("some event", (err, responses) => {
expect(err).to.be(null);
expect(responses).to.have.length(0);

done();
});
});
});
});

describe("middleware", () => {
Expand Down
2 changes: 1 addition & 1 deletion test/support/util.ts
Expand Up @@ -12,7 +12,7 @@ const i = expect.stringify;
// add support for Set/Map
const contain = expect.Assertion.prototype.contain;
expect.Assertion.prototype.contain = function (...args) {
if (typeof this.obj === "object") {
if (this.obj instanceof Set || this.obj instanceof Map) {
args.forEach((obj) => {
this.assert(
this.obj.has(obj),
Expand Down

0 comments on commit 8b20457

Please sign in to comment.