The code powering m.abunchtell.com https://m.abunchtell.com
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

712 lines
21 KiB

  1. const os = require('os');
  2. const throng = require('throng');
  3. const dotenv = require('dotenv');
  4. const express = require('express');
  5. const http = require('http');
  6. const redis = require('redis');
  7. const pg = require('pg');
  8. const log = require('npmlog');
  9. const url = require('url');
  10. const { WebSocketServer } = require('@clusterws/cws');
  11. const uuid = require('uuid');
  12. const fs = require('fs');
  13. const env = process.env.NODE_ENV || 'development';
  14. const alwaysRequireAuth = process.env.WHITELIST_MODE === 'true' || process.env.AUTHORIZED_FETCH === 'true';
  15. dotenv.config({
  16. path: env === 'production' ? '.env.production' : '.env',
  17. });
  18. log.level = process.env.LOG_LEVEL || 'verbose';
  19. const dbUrlToConfig = (dbUrl) => {
  20. if (!dbUrl) {
  21. return {};
  22. }
  23. const params = url.parse(dbUrl, true);
  24. const config = {};
  25. if (params.auth) {
  26. [config.user, config.password] = params.auth.split(':');
  27. }
  28. if (params.hostname) {
  29. config.host = params.hostname;
  30. }
  31. if (params.port) {
  32. config.port = params.port;
  33. }
  34. if (params.pathname) {
  35. config.database = params.pathname.split('/')[1];
  36. }
  37. const ssl = params.query && params.query.ssl;
  38. if (ssl && ssl === 'true' || ssl === '1') {
  39. config.ssl = true;
  40. }
  41. return config;
  42. };
  43. const redisUrlToClient = (defaultConfig, redisUrl) => {
  44. const config = defaultConfig;
  45. if (!redisUrl) {
  46. return redis.createClient(config);
  47. }
  48. if (redisUrl.startsWith('unix://')) {
  49. return redis.createClient(redisUrl.slice(7), config);
  50. }
  51. return redis.createClient(Object.assign(config, {
  52. url: redisUrl,
  53. }));
  54. };
  55. const numWorkers = +process.env.STREAMING_CLUSTER_NUM || (env === 'development' ? 1 : Math.max(os.cpus().length - 1, 1));
  56. const startMaster = () => {
  57. if (!process.env.SOCKET && process.env.PORT && isNaN(+process.env.PORT)) {
  58. log.warn('UNIX domain socket is now supported by using SOCKET. Please migrate from PORT hack.');
  59. }
  60. log.info(`Starting streaming API server master with ${numWorkers} workers`);
  61. };
  62. const startWorker = (workerId) => {
  63. log.info(`Starting worker ${workerId}`);
  64. const pgConfigs = {
  65. development: {
  66. user: process.env.DB_USER || pg.defaults.user,
  67. password: process.env.DB_PASS || pg.defaults.password,
  68. database: process.env.DB_NAME || 'mastodon_development',
  69. host: process.env.DB_HOST || pg.defaults.host,
  70. port: process.env.DB_PORT || pg.defaults.port,
  71. max: 10,
  72. },
  73. production: {
  74. user: process.env.DB_USER || 'mastodon',
  75. password: process.env.DB_PASS || '',
  76. database: process.env.DB_NAME || 'mastodon_production',
  77. host: process.env.DB_HOST || 'localhost',
  78. port: process.env.DB_PORT || 5432,
  79. max: 10,
  80. },
  81. };
  82. if (!!process.env.DB_SSLMODE && process.env.DB_SSLMODE !== 'disable') {
  83. pgConfigs.development.ssl = true;
  84. pgConfigs.production.ssl = true;
  85. }
  86. const app = express();
  87. app.set('trusted proxy', process.env.TRUSTED_PROXY_IP || 'loopback,uniquelocal');
  88. const pgPool = new pg.Pool(Object.assign(pgConfigs[env], dbUrlToConfig(process.env.DATABASE_URL)));
  89. const server = http.createServer(app);
  90. const redisNamespace = process.env.REDIS_NAMESPACE || null;
  91. const redisParams = {
  92. host: process.env.REDIS_HOST || '127.0.0.1',
  93. port: process.env.REDIS_PORT || 6379,
  94. db: process.env.REDIS_DB || 0,
  95. password: process.env.REDIS_PASSWORD,
  96. };
  97. if (redisNamespace) {
  98. redisParams.namespace = redisNamespace;
  99. }
  100. const redisPrefix = redisNamespace ? `${redisNamespace}:` : '';
  101. const redisSubscribeClient = redisUrlToClient(redisParams, process.env.REDIS_URL);
  102. const redisClient = redisUrlToClient(redisParams, process.env.REDIS_URL);
  103. const subs = {};
  104. redisSubscribeClient.on('message', (channel, message) => {
  105. const callbacks = subs[channel];
  106. log.silly(`New message on channel ${channel}`);
  107. if (!callbacks) {
  108. return;
  109. }
  110. callbacks.forEach(callback => callback(message));
  111. });
  112. const subscriptionHeartbeat = (channel) => {
  113. const interval = 6*60;
  114. const tellSubscribed = () => {
  115. redisClient.set(`${redisPrefix}subscribed:${channel}`, '1', 'EX', interval*3);
  116. };
  117. tellSubscribed();
  118. const heartbeat = setInterval(tellSubscribed, interval*1000);
  119. return () => {
  120. clearInterval(heartbeat);
  121. };
  122. };
  123. const subscribe = (channel, callback) => {
  124. log.silly(`Adding listener for ${channel}`);
  125. subs[channel] = subs[channel] || [];
  126. if (subs[channel].length === 0) {
  127. log.verbose(`Subscribe ${channel}`);
  128. redisSubscribeClient.subscribe(channel);
  129. }
  130. subs[channel].push(callback);
  131. };
  132. const unsubscribe = (channel, callback) => {
  133. log.silly(`Removing listener for ${channel}`);
  134. subs[channel] = subs[channel].filter(item => item !== callback);
  135. if (subs[channel].length === 0) {
  136. log.verbose(`Unsubscribe ${channel}`);
  137. redisSubscribeClient.unsubscribe(channel);
  138. }
  139. };
  140. const allowCrossDomain = (req, res, next) => {
  141. res.header('Access-Control-Allow-Origin', '*');
  142. res.header('Access-Control-Allow-Headers', 'Authorization, Accept, Cache-Control');
  143. res.header('Access-Control-Allow-Methods', 'GET, OPTIONS');
  144. next();
  145. };
  146. const setRequestId = (req, res, next) => {
  147. req.requestId = uuid.v4();
  148. res.header('X-Request-Id', req.requestId);
  149. next();
  150. };
  151. const setRemoteAddress = (req, res, next) => {
  152. req.remoteAddress = req.connection.remoteAddress;
  153. next();
  154. };
  155. const accountFromToken = (token, allowedScopes, req, next) => {
  156. pgPool.connect((err, client, done) => {
  157. if (err) {
  158. next(err);
  159. return;
  160. }
  161. client.query('SELECT oauth_access_tokens.resource_owner_id, users.account_id, users.chosen_languages, oauth_access_tokens.scopes FROM oauth_access_tokens INNER JOIN users ON oauth_access_tokens.resource_owner_id = users.id WHERE oauth_access_tokens.token = $1 AND oauth_access_tokens.revoked_at IS NULL LIMIT 1', [token], (err, result) => {
  162. done();
  163. if (err) {
  164. next(err);
  165. return;
  166. }
  167. if (result.rows.length === 0) {
  168. err = new Error('Invalid access token');
  169. err.statusCode = 401;
  170. next(err);
  171. return;
  172. }
  173. const scopes = result.rows[0].scopes.split(' ');
  174. if (allowedScopes.size > 0 && !scopes.some(scope => allowedScopes.includes(scope))) {
  175. err = new Error('Access token does not cover required scopes');
  176. err.statusCode = 401;
  177. next(err);
  178. return;
  179. }
  180. req.accountId = result.rows[0].account_id;
  181. req.chosenLanguages = result.rows[0].chosen_languages;
  182. req.allowNotifications = scopes.some(scope => ['read', 'read:notifications'].includes(scope));
  183. next();
  184. });
  185. });
  186. };
  187. const accountFromRequest = (req, next, required = true, allowedScopes = ['read']) => {
  188. const authorization = req.headers.authorization;
  189. const location = url.parse(req.url, true);
  190. const accessToken = location.query.access_token || req.headers['sec-websocket-protocol'];
  191. if (!authorization && !accessToken) {
  192. if (required) {
  193. const err = new Error('Missing access token');
  194. err.statusCode = 401;
  195. next(err);
  196. return;
  197. } else {
  198. next();
  199. return;
  200. }
  201. }
  202. const token = authorization ? authorization.replace(/^Bearer /, '') : accessToken;
  203. accountFromToken(token, allowedScopes, req, next);
  204. };
  205. const PUBLIC_STREAMS = [
  206. 'public',
  207. 'public:media',
  208. 'public:local',
  209. 'public:local:media',
  210. 'hashtag',
  211. 'hashtag:local',
  212. ];
  213. const wsVerifyClient = (info, cb) => {
  214. const location = url.parse(info.req.url, true);
  215. const authRequired = alwaysRequireAuth || !PUBLIC_STREAMS.some(stream => stream === location.query.stream);
  216. const allowedScopes = [];
  217. if (authRequired) {
  218. allowedScopes.push('read');
  219. if (location.query.stream === 'user:notification') {
  220. allowedScopes.push('read:notifications');
  221. } else {
  222. allowedScopes.push('read:statuses');
  223. }
  224. }
  225. accountFromRequest(info.req, err => {
  226. if (!err) {
  227. cb(true, undefined, undefined);
  228. } else {
  229. log.error(info.req.requestId, err.toString());
  230. cb(false, 401, 'Unauthorized');
  231. }
  232. }, authRequired, allowedScopes);
  233. };
  234. const PUBLIC_ENDPOINTS = [
  235. '/api/v1/streaming/public',
  236. '/api/v1/streaming/public/local',
  237. '/api/v1/streaming/hashtag',
  238. '/api/v1/streaming/hashtag/local',
  239. ];
  240. const authenticationMiddleware = (req, res, next) => {
  241. if (req.method === 'OPTIONS') {
  242. next();
  243. return;
  244. }
  245. const authRequired = alwaysRequireAuth || !PUBLIC_ENDPOINTS.some(endpoint => endpoint === req.path);
  246. const allowedScopes = [];
  247. if (authRequired) {
  248. allowedScopes.push('read');
  249. if (req.path === '/api/v1/streaming/user/notification') {
  250. allowedScopes.push('read:notifications');
  251. } else {
  252. allowedScopes.push('read:statuses');
  253. }
  254. }
  255. accountFromRequest(req, next, authRequired, allowedScopes);
  256. };
  257. const errorMiddleware = (err, req, res, {}) => {
  258. log.error(req.requestId, err.toString());
  259. res.writeHead(err.statusCode || 500, { 'Content-Type': 'application/json' });
  260. res.end(JSON.stringify({ error: err.statusCode ? err.toString() : 'An unexpected error occurred' }));
  261. };
  262. const placeholders = (arr, shift = 0) => arr.map((_, i) => `$${i + 1 + shift}`).join(', ');
  263. const authorizeListAccess = (id, req, next) => {
  264. pgPool.connect((err, client, done) => {
  265. if (err) {
  266. next(false);
  267. return;
  268. }
  269. client.query('SELECT id, account_id FROM lists WHERE id = $1 LIMIT 1', [id], (err, result) => {
  270. done();
  271. if (err || result.rows.length === 0 || result.rows[0].account_id !== req.accountId) {
  272. next(false);
  273. return;
  274. }
  275. next(true);
  276. });
  277. });
  278. };
  279. const streamFrom = (id, req, output, attachCloseHandler, needsFiltering = false, notificationOnly = false) => {
  280. const accountId = req.accountId || req.remoteAddress;
  281. const streamType = notificationOnly ? ' (notification)' : '';
  282. log.verbose(req.requestId, `Starting stream from ${id} for ${accountId}${streamType}`);
  283. const listener = message => {
  284. const { event, payload, queued_at } = JSON.parse(message);
  285. const transmit = () => {
  286. const now = new Date().getTime();
  287. const delta = now - queued_at;
  288. const encodedPayload = typeof payload === 'object' ? JSON.stringify(payload) : payload;
  289. log.silly(req.requestId, `Transmitting for ${accountId}: ${event} ${encodedPayload} Delay: ${delta}ms`);
  290. output(event, encodedPayload);
  291. };
  292. if (notificationOnly && event !== 'notification') {
  293. return;
  294. }
  295. if (event === 'notification' && !req.allowNotifications) {
  296. return;
  297. }
  298. // Only messages that may require filtering are statuses, since notifications
  299. // are already personalized and deletes do not matter
  300. if (!needsFiltering || event !== 'update') {
  301. transmit();
  302. return;
  303. }
  304. const unpackedPayload = payload;
  305. const targetAccountIds = [unpackedPayload.account.id].concat(unpackedPayload.mentions.map(item => item.id));
  306. const accountDomain = unpackedPayload.account.acct.split('@')[1];
  307. if (Array.isArray(req.chosenLanguages) && unpackedPayload.language !== null && req.chosenLanguages.indexOf(unpackedPayload.language) === -1) {
  308. log.silly(req.requestId, `Message ${unpackedPayload.id} filtered by language (${unpackedPayload.language})`);
  309. return;
  310. }
  311. // When the account is not logged in, it is not necessary to confirm the block or mute
  312. if (!req.accountId) {
  313. transmit();
  314. return;
  315. }
  316. pgPool.connect((err, client, done) => {
  317. if (err) {
  318. log.error(err);
  319. return;
  320. }
  321. const queries = [
  322. client.query(`SELECT 1 FROM blocks WHERE (account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)})) OR (account_id = $2 AND target_account_id = $1) UNION SELECT 1 FROM mutes WHERE account_id = $1 AND target_account_id IN (${placeholders(targetAccountIds, 2)})`, [req.accountId, unpackedPayload.account.id].concat(targetAccountIds)),
  323. ];
  324. if (accountDomain) {
  325. queries.push(client.query('SELECT 1 FROM account_domain_blocks WHERE account_id = $1 AND domain = $2', [req.accountId, accountDomain]));
  326. }
  327. Promise.all(queries).then(values => {
  328. done();
  329. if (values[0].rows.length > 0 || (values.length > 1 && values[1].rows.length > 0)) {
  330. return;
  331. }
  332. transmit();
  333. }).catch(err => {
  334. done();
  335. log.error(err);
  336. });
  337. });
  338. };
  339. subscribe(`${redisPrefix}${id}`, listener);
  340. attachCloseHandler(`${redisPrefix}${id}`, listener);
  341. };
  342. // Setup stream output to HTTP
  343. const streamToHttp = (req, res) => {
  344. const accountId = req.accountId || req.remoteAddress;
  345. res.setHeader('Content-Type', 'text/event-stream');
  346. res.setHeader('Transfer-Encoding', 'chunked');
  347. const heartbeat = setInterval(() => res.write(':thump\n'), 15000);
  348. req.on('close', () => {
  349. log.verbose(req.requestId, `Ending stream for ${accountId}`);
  350. clearInterval(heartbeat);
  351. });
  352. return (event, payload) => {
  353. res.write(`event: ${event}\n`);
  354. res.write(`data: ${payload}\n\n`);
  355. };
  356. };
  357. // Setup stream end for HTTP
  358. const streamHttpEnd = (req, closeHandler = false) => (id, listener) => {
  359. req.on('close', () => {
  360. unsubscribe(id, listener);
  361. if (closeHandler) {
  362. closeHandler();
  363. }
  364. });
  365. };
  366. // Setup stream output to WebSockets
  367. const streamToWs = (req, ws) => (event, payload) => {
  368. if (ws.readyState !== ws.OPEN) {
  369. log.error(req.requestId, 'Tried writing to closed socket');
  370. return;
  371. }
  372. ws.send(JSON.stringify({ event, payload }));
  373. };
  374. // Setup stream end for WebSockets
  375. const streamWsEnd = (req, ws, closeHandler = false) => (id, listener) => {
  376. const accountId = req.accountId || req.remoteAddress;
  377. ws.on('close', () => {
  378. log.verbose(req.requestId, `Ending stream for ${accountId}`);
  379. unsubscribe(id, listener);
  380. if (closeHandler) {
  381. closeHandler();
  382. }
  383. });
  384. ws.on('error', () => {
  385. log.verbose(req.requestId, `Ending stream for ${accountId}`);
  386. unsubscribe(id, listener);
  387. if (closeHandler) {
  388. closeHandler();
  389. }
  390. });
  391. };
  392. const httpNotFound = res => {
  393. res.writeHead(404, { 'Content-Type': 'application/json' });
  394. res.end(JSON.stringify({ error: 'Not found' }));
  395. };
  396. app.use(setRequestId);
  397. app.use(setRemoteAddress);
  398. app.use(allowCrossDomain);
  399. app.get('/api/v1/streaming/health', (req, res) => {
  400. res.writeHead(200, { 'Content-Type': 'text/plain' });
  401. res.end('OK');
  402. });
  403. app.use(authenticationMiddleware);
  404. app.use(errorMiddleware);
  405. app.get('/api/v1/streaming/user', (req, res) => {
  406. const channel = `timeline:${req.accountId}`;
  407. streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req, subscriptionHeartbeat(channel)));
  408. });
  409. app.get('/api/v1/streaming/user/notification', (req, res) => {
  410. streamFrom(`timeline:${req.accountId}`, req, streamToHttp(req, res), streamHttpEnd(req), false, true);
  411. });
  412. app.get('/api/v1/streaming/public', (req, res) => {
  413. const onlyMedia = req.query.only_media === '1' || req.query.only_media === 'true';
  414. const channel = onlyMedia ? 'timeline:public:media' : 'timeline:public';
  415. streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req), true);
  416. });
  417. app.get('/api/v1/streaming/public/local', (req, res) => {
  418. const onlyMedia = req.query.only_media === '1' || req.query.only_media === 'true';
  419. const channel = onlyMedia ? 'timeline:public:local:media' : 'timeline:public:local';
  420. streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req), true);
  421. });
  422. app.get('/api/v1/streaming/direct', (req, res) => {
  423. const channel = `timeline:direct:${req.accountId}`;
  424. streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req, subscriptionHeartbeat(channel)), true);
  425. });
  426. app.get('/api/v1/streaming/hashtag', (req, res) => {
  427. const { tag } = req.query;
  428. if (!tag || tag.length === 0) {
  429. httpNotFound(res);
  430. return;
  431. }
  432. streamFrom(`timeline:hashtag:${tag.toLowerCase()}`, req, streamToHttp(req, res), streamHttpEnd(req), true);
  433. });
  434. app.get('/api/v1/streaming/hashtag/local', (req, res) => {
  435. const { tag } = req.query;
  436. if (!tag || tag.length === 0) {
  437. httpNotFound(res);
  438. return;
  439. }
  440. streamFrom(`timeline:hashtag:${tag.toLowerCase()}:local`, req, streamToHttp(req, res), streamHttpEnd(req), true);
  441. });
  442. app.get('/api/v1/streaming/list', (req, res) => {
  443. const listId = req.query.list;
  444. authorizeListAccess(listId, req, authorized => {
  445. if (!authorized) {
  446. httpNotFound(res);
  447. return;
  448. }
  449. const channel = `timeline:list:${listId}`;
  450. streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req, subscriptionHeartbeat(channel)));
  451. });
  452. });
  453. const wss = new WebSocketServer({ server, verifyClient: wsVerifyClient });
  454. wss.on('connection', (ws, req) => {
  455. const location = url.parse(req.url, true);
  456. req.requestId = uuid.v4();
  457. req.remoteAddress = ws._socket.remoteAddress;
  458. let channel;
  459. switch(location.query.stream) {
  460. case 'user':
  461. channel = `timeline:${req.accountId}`;
  462. streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)));
  463. break;
  464. case 'user:notification':
  465. streamFrom(`timeline:${req.accountId}`, req, streamToWs(req, ws), streamWsEnd(req, ws), false, true);
  466. break;
  467. case 'public':
  468. streamFrom('timeline:public', req, streamToWs(req, ws), streamWsEnd(req, ws), true);
  469. break;
  470. case 'public:local':
  471. streamFrom('timeline:public:local', req, streamToWs(req, ws), streamWsEnd(req, ws), true);
  472. break;
  473. case 'public:media':
  474. streamFrom('timeline:public:media', req, streamToWs(req, ws), streamWsEnd(req, ws), true);
  475. break;
  476. case 'public:local:media':
  477. streamFrom('timeline:public:local:media', req, streamToWs(req, ws), streamWsEnd(req, ws), true);
  478. break;
  479. case 'direct':
  480. channel = `timeline:direct:${req.accountId}`;
  481. streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)), true);
  482. break;
  483. case 'hashtag':
  484. if (!location.query.tag || location.query.tag.length === 0) {
  485. ws.close();
  486. return;
  487. }
  488. streamFrom(`timeline:hashtag:${location.query.tag.toLowerCase()}`, req, streamToWs(req, ws), streamWsEnd(req, ws), true);
  489. break;
  490. case 'hashtag:local':
  491. if (!location.query.tag || location.query.tag.length === 0) {
  492. ws.close();
  493. return;
  494. }
  495. streamFrom(`timeline:hashtag:${location.query.tag.toLowerCase()}:local`, req, streamToWs(req, ws), streamWsEnd(req, ws), true);
  496. break;
  497. case 'list':
  498. const listId = location.query.list;
  499. authorizeListAccess(listId, req, authorized => {
  500. if (!authorized) {
  501. ws.close();
  502. return;
  503. }
  504. channel = `timeline:list:${listId}`;
  505. streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)));
  506. });
  507. break;
  508. default:
  509. ws.close();
  510. }
  511. });
  512. wss.startAutoPing(30000);
  513. attachServerWithConfig(server, address => {
  514. log.info(`Worker ${workerId} now listening on ${address}`);
  515. });
  516. const onExit = () => {
  517. log.info(`Worker ${workerId} exiting, bye bye`);
  518. server.close();
  519. process.exit(0);
  520. };
  521. const onError = (err) => {
  522. log.error(err);
  523. server.close();
  524. process.exit(0);
  525. };
  526. process.on('SIGINT', onExit);
  527. process.on('SIGTERM', onExit);
  528. process.on('exit', onExit);
  529. process.on('uncaughtException', onError);
  530. };
  531. const attachServerWithConfig = (server, onSuccess) => {
  532. if (process.env.SOCKET || process.env.PORT && isNaN(+process.env.PORT)) {
  533. server.listen(process.env.SOCKET || process.env.PORT, () => {
  534. if (onSuccess) {
  535. fs.chmodSync(server.address(), 0o666);
  536. onSuccess(server.address());
  537. }
  538. });
  539. } else {
  540. server.listen(+process.env.PORT || 4000, process.env.BIND || '127.0.0.1', () => {
  541. if (onSuccess) {
  542. onSuccess(`${server.address().address}:${server.address().port}`);
  543. }
  544. });
  545. }
  546. };
  547. const onPortAvailable = onSuccess => {
  548. const testServer = http.createServer();
  549. testServer.once('error', err => {
  550. onSuccess(err);
  551. });
  552. testServer.once('listening', () => {
  553. testServer.once('close', () => onSuccess());
  554. testServer.close();
  555. });
  556. attachServerWithConfig(testServer);
  557. };
  558. onPortAvailable(err => {
  559. if (err) {
  560. log.error('Could not start server, the port or socket is in use');
  561. return;
  562. }
  563. throng({
  564. workers: numWorkers,
  565. lifetime: Infinity,
  566. start: startWorker,
  567. master: startMaster,
  568. });
  569. });