The code powering m.abunchtell.com https://m.abunchtell.com
Você não pode selecionar mais de 25 tópicos Os tópicos devem começar com uma letra ou um número, podem incluir traços ('-') e podem ter até 35 caracteres.
 
 
 
 

119 linhas
2.8 KiB

  1. import WebSocketClient from '@gamestdio/websocket';
  2. const randomIntUpTo = max => Math.floor(Math.random() * Math.floor(max));
  3. const knownEventTypes = [
  4. 'update',
  5. 'delete',
  6. 'notification',
  7. 'conversation',
  8. 'filters_changed',
  9. ];
  10. export function connectStream(path, pollingRefresh = null, callbacks = () => ({ onConnect() {}, onDisconnect() {}, onReceive() {} })) {
  11. return (dispatch, getState) => {
  12. const streamingAPIBaseURL = getState().getIn(['meta', 'streaming_api_base_url']);
  13. const accessToken = getState().getIn(['meta', 'access_token']);
  14. const { onConnect, onDisconnect, onReceive } = callbacks(dispatch, getState);
  15. let polling = null;
  16. const setupPolling = () => {
  17. pollingRefresh(dispatch, () => {
  18. polling = setTimeout(() => setupPolling(), 20000 + randomIntUpTo(20000));
  19. });
  20. };
  21. const clearPolling = () => {
  22. if (polling) {
  23. clearTimeout(polling);
  24. polling = null;
  25. }
  26. };
  27. const subscription = getStream(streamingAPIBaseURL, accessToken, path, {
  28. connected () {
  29. if (pollingRefresh) {
  30. clearPolling();
  31. }
  32. onConnect();
  33. },
  34. disconnected () {
  35. if (pollingRefresh) {
  36. polling = setTimeout(() => setupPolling(), randomIntUpTo(40000));
  37. }
  38. onDisconnect();
  39. },
  40. received (data) {
  41. onReceive(data);
  42. },
  43. reconnected () {
  44. if (pollingRefresh) {
  45. clearPolling();
  46. pollingRefresh(dispatch);
  47. }
  48. onConnect();
  49. },
  50. });
  51. const disconnect = () => {
  52. if (subscription) {
  53. subscription.close();
  54. }
  55. clearPolling();
  56. };
  57. return disconnect;
  58. };
  59. }
  60. export default function getStream(streamingAPIBaseURL, accessToken, stream, { connected, received, disconnected, reconnected }) {
  61. const params = stream.split('&');
  62. stream = params.shift();
  63. if (streamingAPIBaseURL.startsWith('ws')) {
  64. params.unshift(`stream=${stream}`);
  65. const ws = new WebSocketClient(`${streamingAPIBaseURL}/api/v1/streaming/?${params.join('&')}`, accessToken);
  66. ws.onopen = connected;
  67. ws.onmessage = e => received(JSON.parse(e.data));
  68. ws.onclose = disconnected;
  69. ws.onreconnect = reconnected;
  70. return ws;
  71. }
  72. params.push(`access_token=${accessToken}`);
  73. const es = new EventSource(`${streamingAPIBaseURL}/api/v1/streaming/${stream}?${params.join('&')}`);
  74. let firstConnect = true;
  75. es.onopen = () => {
  76. if (firstConnect) {
  77. firstConnect = false;
  78. connected();
  79. } else {
  80. reconnected();
  81. }
  82. };
  83. for (let type of knownEventTypes) {
  84. es.addEventListener(type, (e) => {
  85. received({
  86. event: e.type,
  87. payload: e.data,
  88. });
  89. });
  90. }
  91. es.onerror = disconnected;
  92. return es;
  93. };