@@ -1,5 +1,5 @@
import os from 'os';
import cluster from 'cluster ';
import throng from 'throng ';
import dotenv from 'dotenv';
import express from 'express';
import http from 'http';
@@ -16,6 +16,8 @@ dotenv.config({
path: env === 'production' ? '.env.production' : '.env',
});
log.level = process.env.LOG_LEVEL || 'verbose';
const dbUrlToConfig = (dbUrl) => {
if (!dbUrl) {
return {};
@@ -65,24 +67,15 @@ const redisUrlToClient = (defaultConfig, redisUrl) => {
}));
};
if (cluster.isMaster) {
// Cluster master
const core = +process.env.STREAMING_CLUSTER_NUM || (env === 'development' ? 1 : Math.max(os.cpus().length - 1, 1));
const fork = () => {
const worker = cluster.fork();
const numWorkers = +process.env.STREAMING_CLUSTER_NUM || (env === 'development' ? 1 : Math.max(os.cpus().length - 1, 1));
worker.on('exit', (code, signal) => {
log.error(`Worker died with exit code ${code}, signal ${signal} received.`);
setTimeout(() => fork(), 0);
});
};
const startMaster = () => {
log.info(`Starting streaming API server master with ${numWorkers} workers`);
};
for (let i = 0; i < core; i++) fork();
const startWorker = (workerId) => {
log.info(`Starting worker ${workerId}`);
log.info(`Starting streaming API server master with ${core} workers`);
} else {
// Cluster worker
const pgConfigs = {
development: {
database: 'mastodon_development',
@@ -130,6 +123,7 @@ if (cluster.isMaster) {
if (!callbacks) {
return;
}
callbacks.forEach(callback => callback(message));
});
@@ -215,9 +209,9 @@ if (cluster.isMaster) {
};
const errorMiddleware = (err, req, res, next) => {
log.error(req.requestId, err);
log.error(req.requestId, err.toString() );
res.writeHead(err.statusCode || 500, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ error: err.statusCode ? `${err}` : 'An unexpected error occurred' }));
res.end(JSON.stringify({ error: err.statusCode ? err.toString() : 'An unexpected error occurred' }));
};
const placeholders = (arr, shift = 0) => arr.map((_, i) => `$${i + 1 + shift}`).join(', ');
@@ -249,8 +243,9 @@ if (cluster.isMaster) {
const targetAccountIds = [unpackedPayload.account.id].concat(unpackedPayload.mentions.map(item => item.id)).concat(unpackedPayload.reblog ? [unpackedPayload.reblog.account.id] : []);
const accountDomain = unpackedPayload.account.acct.split('@')[1];
if (req.filteredLanguages.indexOf(unpackedPayload.language) !== -1 ) {
if (Array.isArray(req.filteredLanguages) && req.filteredLanguages.includes(unpackedPayload.language) ) {
log.silly(req.requestId, `Message ${unpackedPayload.id} filtered by language (${unpackedPayload.language})`);
done();
return;
}
@@ -271,6 +266,7 @@ if (cluster.isMaster) {
transmit();
}).catch(err => {
done();
log.error(err);
});
});
@@ -309,26 +305,13 @@ if (cluster.isMaster) {
};
// Setup stream output to WebSockets
const streamToWs = (req, ws) => {
const heartbeat = setInterval(() => {
// TODO: Can't add multiple listeners, due to the limitation of uws.
if (ws.readyState !== ws.OPEN) {
log.verbose(req.requestId, `Ending stream for ${req.accountId}`);
clearInterval(heartbeat);
return;
}
ws.ping();
}, 15000);
return (event, payload) => {
if (ws.readyState !== ws.OPEN) {
log.error(req.requestId, 'Tried writing to closed socket');
return;
}
const streamToWs = (req, ws) => (event, payload) => {
if (ws.readyState !== ws.OPEN) {
log.error(req.requestId, 'Tried writing to closed socket');
return;
}
ws.send(JSON.stringify({ event, payload }));
};
ws.send(JSON.stringify({ event, payload }));
};
// Setup stream end for WebSockets
@@ -372,6 +355,12 @@ if (cluster.isMaster) {
const token = location.query.access_token;
const req = { requestId: uuid.v4() };
ws.isAlive = true;
ws.on('pong', () => {
ws.isAlive = true;
});
accountFromToken(token, req, err => {
if (err) {
log.error(req.requestId, err);
@@ -401,16 +390,40 @@ if (cluster.isMaster) {
});
});
const wsInterval = setInterval(() => {
wss.clients.forEach(ws => {
if (ws.isAlive === false) {
ws.terminate();
return;
}
ws.isAlive = false;
ws.ping('', false, true);
});
}, 30000);
server.listen(process.env.PORT || 4000, () => {
log.level = process.env.LOG_LEVEL || 'verbose';
log.info(`Starting streaming API server worker on ${server.address().address}:${server.address().port}`);
log.info(`Worker ${workerId} now listening on ${server.address().address}:${server.address().port}`);
});
process.on('SIGINT', exit);
process.on('SIGTERM', exit);
process.on('exit', exit);
function exit() {
const onExit = () => {
log.info(`Worker ${workerId} exiting, bye bye`);
server.close();
}
}
};
const onError = (err) => {
log.error(err);
};
process.on('SIGINT', onExit);
process.on('SIGTERM', onExit);
process.on('exit', onExit);
process.on('error', onError);
};
throng({
workers: numWorkers,
lifetime: Infinity,
start: startWorker,
master: startMaster,
});