* Add conversations API * Add web UI for conversations * Add test for conversations API * Add tests for ConversationAccount * Improve web UI * Rename ConversationAccount to AccountConversation * Remove conversations on block and mute * Change last_status_id to be a denormalization of status_ids * Add optimistic lockingmaster
@@ -0,0 +1,55 @@ | |||
# frozen_string_literal: true | |||
class Api::V1::ConversationsController < Api::BaseController | |||
LIMIT = 20 | |||
before_action -> { doorkeeper_authorize! :read, :'read:statuses' } | |||
before_action :require_user! | |||
after_action :insert_pagination_headers | |||
respond_to :json | |||
def index | |||
@conversations = paginated_conversations | |||
render json: @conversations, each_serializer: REST::ConversationSerializer | |||
end | |||
private | |||
def paginated_conversations | |||
AccountConversation.where(account: current_account) | |||
.paginate_by_id(limit_param(LIMIT), params_slice(:max_id, :since_id, :min_id)) | |||
end | |||
def insert_pagination_headers | |||
set_pagination_headers(next_path, prev_path) | |||
end | |||
def next_path | |||
if records_continue? | |||
api_v1_conversations_url pagination_params(max_id: pagination_max_id) | |||
end | |||
end | |||
def prev_path | |||
unless @conversations.empty? | |||
api_v1_conversations_url pagination_params(min_id: pagination_since_id) | |||
end | |||
end | |||
def pagination_max_id | |||
@conversations.last.last_status_id | |||
end | |||
def pagination_since_id | |||
@conversations.first.last_status_id | |||
end | |||
def records_continue? | |||
@conversations.size == limit_param(LIMIT) | |||
end | |||
def pagination_params(core_params) | |||
params.slice(:limit).permit(:limit).merge(core_params) | |||
end | |||
end |
@@ -0,0 +1,59 @@ | |||
import api, { getLinks } from '../api'; | |||
import { | |||
importFetchedAccounts, | |||
importFetchedStatuses, | |||
importFetchedStatus, | |||
} from './importer'; | |||
export const CONVERSATIONS_FETCH_REQUEST = 'CONVERSATIONS_FETCH_REQUEST'; | |||
export const CONVERSATIONS_FETCH_SUCCESS = 'CONVERSATIONS_FETCH_SUCCESS'; | |||
export const CONVERSATIONS_FETCH_FAIL = 'CONVERSATIONS_FETCH_FAIL'; | |||
export const CONVERSATIONS_UPDATE = 'CONVERSATIONS_UPDATE'; | |||
export const expandConversations = ({ maxId } = {}) => (dispatch, getState) => { | |||
dispatch(expandConversationsRequest()); | |||
const params = { max_id: maxId }; | |||
if (!maxId) { | |||
params.since_id = getState().getIn(['conversations', 0, 'last_status']); | |||
} | |||
api(getState).get('/api/v1/conversations', { params }) | |||
.then(response => { | |||
const next = getLinks(response).refs.find(link => link.rel === 'next'); | |||
dispatch(importFetchedAccounts(response.data.reduce((aggr, item) => aggr.concat(item.accounts), []))); | |||
dispatch(importFetchedStatuses(response.data.map(item => item.last_status).filter(x => !!x))); | |||
dispatch(expandConversationsSuccess(response.data, next ? next.uri : null)); | |||
}) | |||
.catch(err => dispatch(expandConversationsFail(err))); | |||
}; | |||
export const expandConversationsRequest = () => ({ | |||
type: CONVERSATIONS_FETCH_REQUEST, | |||
}); | |||
export const expandConversationsSuccess = (conversations, next) => ({ | |||
type: CONVERSATIONS_FETCH_SUCCESS, | |||
conversations, | |||
next, | |||
}); | |||
export const expandConversationsFail = error => ({ | |||
type: CONVERSATIONS_FETCH_FAIL, | |||
error, | |||
}); | |||
export const updateConversations = conversation => dispatch => { | |||
dispatch(importFetchedAccounts(conversation.accounts)); | |||
if (conversation.last_status) { | |||
dispatch(importFetchedStatus(conversation.last_status)); | |||
} | |||
dispatch({ | |||
type: CONVERSATIONS_UPDATE, | |||
conversation, | |||
}); | |||
}; |
@@ -6,6 +6,7 @@ import { | |||
disconnectTimeline, | |||
} from './timelines'; | |||
import { updateNotifications, expandNotifications } from './notifications'; | |||
import { updateConversations } from './conversations'; | |||
import { fetchFilters } from './filters'; | |||
import { getLocale } from '../locales'; | |||
@@ -31,6 +32,9 @@ export function connectTimelineStream (timelineId, path, pollingRefresh = null) | |||
case 'notification': | |||
dispatch(updateNotifications(JSON.parse(data.payload), messages, locale)); | |||
break; | |||
case 'conversation': | |||
dispatch(updateConversations(JSON.parse(data.payload))); | |||
break; | |||
case 'filters_changed': | |||
dispatch(fetchFilters()); | |||
break; | |||
@@ -76,7 +76,6 @@ export function expandTimeline(timelineId, path, params = {}, done = noOp) { | |||
export const expandHomeTimeline = ({ maxId } = {}, done = noOp) => expandTimeline('home', '/api/v1/timelines/home', { max_id: maxId }, done); | |||
export const expandPublicTimeline = ({ maxId, onlyMedia } = {}, done = noOp) => expandTimeline(`public${onlyMedia ? ':media' : ''}`, '/api/v1/timelines/public', { max_id: maxId, only_media: !!onlyMedia }, done); | |||
export const expandCommunityTimeline = ({ maxId, onlyMedia } = {}, done = noOp) => expandTimeline(`community${onlyMedia ? ':media' : ''}`, '/api/v1/timelines/public', { local: true, max_id: maxId, only_media: !!onlyMedia }, done); | |||
export const expandDirectTimeline = ({ maxId } = {}, done = noOp) => expandTimeline('direct', '/api/v1/timelines/direct', { max_id: maxId }, done); | |||
export const expandAccountTimeline = (accountId, { maxId, withReplies } = {}) => expandTimeline(`account:${accountId}${withReplies ? ':with_replies' : ''}`, `/api/v1/accounts/${accountId}/statuses`, { exclude_replies: !withReplies, max_id: maxId }); | |||
export const expandAccountFeaturedTimeline = accountId => expandTimeline(`account:${accountId}:pinned`, `/api/v1/accounts/${accountId}/statuses`, { pinned: true }); | |||
export const expandAccountMediaTimeline = (accountId, { maxId } = {}) => expandTimeline(`account:${accountId}:media`, `/api/v1/accounts/${accountId}/statuses`, { max_id: maxId, only_media: true }); | |||
@@ -1,18 +1,25 @@ | |||
import React from 'react'; | |||
import ImmutablePropTypes from 'react-immutable-proptypes'; | |||
import PropTypes from 'prop-types'; | |||
export default class DisplayName extends React.PureComponent { | |||
static propTypes = { | |||
account: ImmutablePropTypes.map.isRequired, | |||
withAcct: PropTypes.bool, | |||
}; | |||
static defaultProps = { | |||
withAcct: true, | |||
}; | |||
render () { | |||
const displayNameHtml = { __html: this.props.account.get('display_name_html') }; | |||
const { account, withAcct } = this.props; | |||
const displayNameHtml = { __html: account.get('display_name_html') }; | |||
return ( | |||
<span className='display-name'> | |||
<bdi><strong className='display-name__html' dangerouslySetInnerHTML={displayNameHtml} /></bdi> <span className='display-name__account'>@{this.props.account.get('acct')}</span> | |||
<bdi><strong className='display-name__html' dangerouslySetInnerHTML={displayNameHtml} /></bdi> {withAcct && <span className='display-name__account'>@{account.get('acct')}</span>} | |||
</span> | |||
); | |||
} | |||
@@ -0,0 +1,85 @@ | |||
import React from 'react'; | |||
import PropTypes from 'prop-types'; | |||
import ImmutablePropTypes from 'react-immutable-proptypes'; | |||
import ImmutablePureComponent from 'react-immutable-pure-component'; | |||
import StatusContent from '../../../components/status_content'; | |||
import RelativeTimestamp from '../../../components/relative_timestamp'; | |||
import DisplayName from '../../../components/display_name'; | |||
import Avatar from '../../../components/avatar'; | |||
import AttachmentList from '../../../components/attachment_list'; | |||
import { HotKeys } from 'react-hotkeys'; | |||
export default class Conversation extends ImmutablePureComponent { | |||
static contextTypes = { | |||
router: PropTypes.object, | |||
}; | |||
static propTypes = { | |||
conversationId: PropTypes.string.isRequired, | |||
accounts: ImmutablePropTypes.list.isRequired, | |||
lastStatus: ImmutablePropTypes.map.isRequired, | |||
onMoveUp: PropTypes.func, | |||
onMoveDown: PropTypes.func, | |||
}; | |||
handleClick = () => { | |||
if (!this.context.router) { | |||
return; | |||
} | |||
const { lastStatus } = this.props; | |||
this.context.router.history.push(`/statuses/${lastStatus.get('id')}`); | |||
} | |||
handleHotkeyMoveUp = () => { | |||
this.props.onMoveUp(this.props.conversationId); | |||
} | |||
handleHotkeyMoveDown = () => { | |||
this.props.onMoveDown(this.props.conversationId); | |||
} | |||
render () { | |||
const { accounts, lastStatus, lastAccount } = this.props; | |||
if (lastStatus === null) { | |||
return null; | |||
} | |||
const handlers = { | |||
moveDown: this.handleHotkeyMoveDown, | |||
moveUp: this.handleHotkeyMoveUp, | |||
open: this.handleClick, | |||
}; | |||
let media; | |||
if (lastStatus.get('media_attachments').size > 0) { | |||
media = <AttachmentList compact media={lastStatus.get('media_attachments')} />; | |||
} | |||
return ( | |||
<HotKeys handlers={handlers}> | |||
<div className='conversation focusable' tabIndex='0' onClick={this.handleClick} role='button'> | |||
<div className='conversation__header'> | |||
<div className='conversation__avatars'> | |||
<div>{accounts.map(account => <Avatar key={account.get('id')} size={36} account={account} />)}</div> | |||
</div> | |||
<div className='conversation__time'> | |||
<RelativeTimestamp timestamp={lastStatus.get('created_at')} /> | |||
<br /> | |||
<DisplayName account={lastAccount} withAcct={false} /> | |||
</div> | |||
</div> | |||
<StatusContent status={lastStatus} onClick={this.handleClick} /> | |||
{media} | |||
</div> | |||
</HotKeys> | |||
); | |||
} | |||
} |
@@ -0,0 +1,68 @@ | |||
import React from 'react'; | |||
import PropTypes from 'prop-types'; | |||
import ImmutablePropTypes from 'react-immutable-proptypes'; | |||
import ImmutablePureComponent from 'react-immutable-pure-component'; | |||
import ConversationContainer from '../containers/conversation_container'; | |||
import ScrollableList from '../../../components/scrollable_list'; | |||
import { debounce } from 'lodash'; | |||
export default class ConversationsList extends ImmutablePureComponent { | |||
static propTypes = { | |||
conversationIds: ImmutablePropTypes.list.isRequired, | |||
hasMore: PropTypes.bool, | |||
isLoading: PropTypes.bool, | |||
onLoadMore: PropTypes.func, | |||
shouldUpdateScroll: PropTypes.func, | |||
}; | |||
getCurrentIndex = id => this.props.conversationIds.indexOf(id) | |||
handleMoveUp = id => { | |||
const elementIndex = this.getCurrentIndex(id) - 1; | |||
this._selectChild(elementIndex); | |||
} | |||
handleMoveDown = id => { | |||
const elementIndex = this.getCurrentIndex(id) + 1; | |||
this._selectChild(elementIndex); | |||
} | |||
_selectChild (index) { | |||
const element = this.node.node.querySelector(`article:nth-of-type(${index + 1}) .focusable`); | |||
if (element) { | |||
element.focus(); | |||
} | |||
} | |||
setRef = c => { | |||
this.node = c; | |||
} | |||
handleLoadOlder = debounce(() => { | |||
const last = this.props.conversationIds.last(); | |||
if (last) { | |||
this.props.onLoadMore(last); | |||
} | |||
}, 300, { leading: true }) | |||
render () { | |||
const { conversationIds, onLoadMore, ...other } = this.props; | |||
return ( | |||
<ScrollableList {...other} onLoadMore={onLoadMore && this.handleLoadOlder} scrollKey='direct' ref={this.setRef}> | |||
{conversationIds.map(item => ( | |||
<ConversationContainer | |||
key={item} | |||
conversationId={item} | |||
onMoveUp={this.handleMoveUp} | |||
onMoveDown={this.handleMoveDown} | |||
/> | |||
))} | |||
</ScrollableList> | |||
); | |||
} | |||
} |
@@ -0,0 +1,15 @@ | |||
import { connect } from 'react-redux'; | |||
import Conversation from '../components/conversation'; | |||
const mapStateToProps = (state, { conversationId }) => { | |||
const conversation = state.getIn(['conversations', 'items']).find(x => x.get('id') === conversationId); | |||
const lastStatus = state.getIn(['statuses', conversation.get('last_status')], null); | |||
return { | |||
accounts: conversation.get('accounts').map(accountId => state.getIn(['accounts', accountId], null)), | |||
lastStatus, | |||
lastAccount: lastStatus === null ? null : state.getIn(['accounts', lastStatus.get('account')], null), | |||
}; | |||
}; | |||
export default connect(mapStateToProps)(Conversation); |
@@ -0,0 +1,15 @@ | |||
import { connect } from 'react-redux'; | |||
import ConversationsList from '../components/conversations_list'; | |||
import { expandConversations } from '../../../actions/conversations'; | |||
const mapStateToProps = state => ({ | |||
conversationIds: state.getIn(['conversations', 'items']).map(x => x.get('id')), | |||
isLoading: state.getIn(['conversations', 'isLoading'], true), | |||
hasMore: state.getIn(['conversations', 'hasMore'], false), | |||
}); | |||
const mapDispatchToProps = dispatch => ({ | |||
onLoadMore: maxId => dispatch(expandConversations({ maxId })), | |||
}); | |||
export default connect(mapStateToProps, mapDispatchToProps)(ConversationsList); |
@@ -1,23 +1,19 @@ | |||
import React from 'react'; | |||
import { connect } from 'react-redux'; | |||
import PropTypes from 'prop-types'; | |||
import StatusListContainer from '../ui/containers/status_list_container'; | |||
import Column from '../../components/column'; | |||
import ColumnHeader from '../../components/column_header'; | |||
import { expandDirectTimeline } from '../../actions/timelines'; | |||
import { expandConversations } from '../../actions/conversations'; | |||
import { addColumn, removeColumn, moveColumn } from '../../actions/columns'; | |||
import { defineMessages, injectIntl, FormattedMessage } from 'react-intl'; | |||
import { defineMessages, injectIntl } from 'react-intl'; | |||
import { connectDirectStream } from '../../actions/streaming'; | |||
import ConversationsListContainer from './containers/conversations_list_container'; | |||
const messages = defineMessages({ | |||
title: { id: 'column.direct', defaultMessage: 'Direct messages' }, | |||
}); | |||
const mapStateToProps = state => ({ | |||
hasUnread: state.getIn(['timelines', 'direct', 'unread']) > 0, | |||
}); | |||
export default @connect(mapStateToProps) | |||
export default @connect() | |||
@injectIntl | |||
class DirectTimeline extends React.PureComponent { | |||
@@ -52,7 +48,7 @@ class DirectTimeline extends React.PureComponent { | |||
componentDidMount () { | |||
const { dispatch } = this.props; | |||
dispatch(expandDirectTimeline()); | |||
dispatch(expandConversations()); | |||
this.disconnect = dispatch(connectDirectStream()); | |||
} | |||
@@ -68,11 +64,11 @@ class DirectTimeline extends React.PureComponent { | |||
} | |||
handleLoadMore = maxId => { | |||
this.props.dispatch(expandDirectTimeline({ maxId })); | |||
this.props.dispatch(expandConversations({ maxId })); | |||
} | |||
render () { | |||
const { intl, shouldUpdateScroll, hasUnread, columnId, multiColumn } = this.props; | |||
const { intl, hasUnread, columnId, multiColumn, shouldUpdateScroll } = this.props; | |||
const pinned = !!columnId; | |||
return ( | |||
@@ -88,14 +84,7 @@ class DirectTimeline extends React.PureComponent { | |||
multiColumn={multiColumn} | |||
/> | |||
<StatusListContainer | |||
trackScroll={!pinned} | |||
scrollKey={`direct_timeline-${columnId}`} | |||
timelineId='direct' | |||
onLoadMore={this.handleLoadMore} | |||
emptyMessage={<FormattedMessage id='empty_column.direct' defaultMessage="You don't have any direct messages yet. When you send or receive one, it will show up here." />} | |||
shouldUpdateScroll={shouldUpdateScroll} | |||
/> | |||
<ConversationsListContainer shouldUpdateScroll={shouldUpdateScroll} /> | |||
</Column> | |||
); | |||
} | |||
@@ -0,0 +1,79 @@ | |||
import { Map as ImmutableMap, List as ImmutableList } from 'immutable'; | |||
import { | |||
CONVERSATIONS_FETCH_REQUEST, | |||
CONVERSATIONS_FETCH_SUCCESS, | |||
CONVERSATIONS_FETCH_FAIL, | |||
CONVERSATIONS_UPDATE, | |||
} from '../actions/conversations'; | |||
import compareId from '../compare_id'; | |||
const initialState = ImmutableMap({ | |||
items: ImmutableList(), | |||
isLoading: false, | |||
hasMore: true, | |||
}); | |||
const conversationToMap = item => ImmutableMap({ | |||
id: item.id, | |||
accounts: ImmutableList(item.accounts.map(a => a.id)), | |||
last_status: item.last_status.id, | |||
}); | |||
const updateConversation = (state, item) => state.update('items', list => { | |||
const index = list.findIndex(x => x.get('id') === item.id); | |||
const newItem = conversationToMap(item); | |||
if (index === -1) { | |||
return list.unshift(newItem); | |||
} else { | |||
return list.set(index, newItem); | |||
} | |||
}); | |||
const expandNormalizedConversations = (state, conversations, next) => { | |||
let items = ImmutableList(conversations.map(conversationToMap)); | |||
return state.withMutations(mutable => { | |||
if (!items.isEmpty()) { | |||
mutable.update('items', list => { | |||
list = list.map(oldItem => { | |||
const newItemIndex = items.findIndex(x => x.get('id') === oldItem.get('id')); | |||
if (newItemIndex === -1) { | |||
return oldItem; | |||
} | |||
const newItem = items.get(newItemIndex); | |||
items = items.delete(newItemIndex); | |||
return newItem; | |||
}); | |||
list = list.concat(items); | |||
return list.sortBy(x => x.get('last_status'), (a, b) => compareId(a, b) * -1); | |||
}); | |||
} | |||
if (!next) { | |||
mutable.set('hasMore', false); | |||
} | |||
mutable.set('isLoading', false); | |||
}); | |||
}; | |||
export default function conversations(state = initialState, action) { | |||
switch (action.type) { | |||
case CONVERSATIONS_FETCH_REQUEST: | |||
return state.set('isLoading', true); | |||
case CONVERSATIONS_FETCH_FAIL: | |||
return state.set('isLoading', false); | |||
case CONVERSATIONS_FETCH_SUCCESS: | |||
return expandNormalizedConversations(state, action.conversations, action.next); | |||
case CONVERSATIONS_UPDATE: | |||
return updateConversation(state, action.conversation); | |||
default: | |||
return state; | |||
} | |||
}; |
@@ -27,6 +27,7 @@ import custom_emojis from './custom_emojis'; | |||
import lists from './lists'; | |||
import listEditor from './list_editor'; | |||
import filters from './filters'; | |||
import conversations from './conversations'; | |||
const reducers = { | |||
dropdown_menu, | |||
@@ -57,6 +58,7 @@ const reducers = { | |||
lists, | |||
listEditor, | |||
filters, | |||
conversations, | |||
}; | |||
export default combineReducers(reducers); |
@@ -69,7 +69,7 @@ const expandNormalizedNotifications = (state, notifications, next) => { | |||
} | |||
if (!next) { | |||
mutable.set('hasMore', true); | |||
mutable.set('hasMore', false); | |||
} | |||
mutable.set('isLoading', false); | |||
@@ -825,6 +825,7 @@ | |||
&.status-direct { | |||
background: lighten($ui-base-color, 8%); | |||
border-bottom-color: lighten($ui-base-color, 12%); | |||
} | |||
&.light { | |||
@@ -5496,3 +5497,44 @@ noscript { | |||
} | |||
} | |||
} | |||
.conversation { | |||
padding: 14px 10px; | |||
border-bottom: 1px solid lighten($ui-base-color, 8%); | |||
cursor: pointer; | |||
&__header { | |||
display: flex; | |||
margin-bottom: 15px; | |||
} | |||
&__avatars { | |||
overflow: hidden; | |||
flex: 1 1 auto; | |||
& > div { | |||
display: flex; | |||
flex-wrap: none; | |||
width: 900px; | |||
} | |||
.account__avatar { | |||
margin-right: 10px; | |||
} | |||
} | |||
&__time { | |||
flex: 0 0 auto; | |||
font-size: 14px; | |||
color: $darker-text-color; | |||
text-align: right; | |||
.display-name { | |||
color: $secondary-text-color; | |||
} | |||
} | |||
.attachment-list.compact { | |||
margin-top: 15px; | |||
} | |||
} |
@@ -13,6 +13,8 @@ class InlineRenderer | |||
serializer = REST::StatusSerializer | |||
when :notification | |||
serializer = REST::NotificationSerializer | |||
when :conversation | |||
serializer = REST::ConversationSerializer | |||
else | |||
return | |||
end | |||
@@ -0,0 +1,111 @@ | |||
# frozen_string_literal: true | |||
# == Schema Information | |||
# | |||
# Table name: account_conversations | |||
# | |||
# id :bigint(8) not null, primary key | |||
# account_id :bigint(8) | |||
# conversation_id :bigint(8) | |||
# participant_account_ids :bigint(8) default([]), not null, is an Array | |||
# status_ids :bigint(8) default([]), not null, is an Array | |||
# last_status_id :bigint(8) | |||
# lock_version :integer default(0), not null | |||
# | |||
class AccountConversation < ApplicationRecord | |||
after_commit :push_to_streaming_api | |||
belongs_to :account | |||
belongs_to :conversation | |||
belongs_to :last_status, class_name: 'Status' | |||
before_validation :set_last_status | |||
def participant_account_ids=(arr) | |||
self[:participant_account_ids] = arr.sort | |||
end | |||
def participant_accounts | |||
if participant_account_ids.empty? | |||
[account] | |||
else | |||
Account.where(id: participant_account_ids) | |||
end | |||
end | |||
class << self | |||
def paginate_by_id(limit, options = {}) | |||
if options[:min_id] | |||
paginate_by_min_id(limit, options[:min_id]).reverse | |||
else | |||
paginate_by_max_id(limit, options[:max_id], options[:since_id]) | |||
end | |||
end | |||
def paginate_by_min_id(limit, min_id = nil) | |||
query = order(arel_table[:last_status_id].asc).limit(limit) | |||
query = query.where(arel_table[:last_status_id].gt(min_id)) if min_id.present? | |||
query | |||
end | |||
def paginate_by_max_id(limit, max_id = nil, since_id = nil) | |||
query = order(arel_table[:last_status_id].desc).limit(limit) | |||
query = query.where(arel_table[:last_status_id].lt(max_id)) if max_id.present? | |||
query = query.where(arel_table[:last_status_id].gt(since_id)) if since_id.present? | |||
query | |||
end | |||
def add_status(recipient, status) | |||
conversation = find_or_initialize_by(account: recipient, conversation_id: status.conversation_id, participant_account_ids: participants_from_status(recipient, status)) | |||
conversation.status_ids << status.id | |||
conversation.save | |||
conversation | |||
rescue ActiveRecord::StaleObjectError | |||
retry | |||
end | |||
def remove_status(recipient, status) | |||
conversation = find_by(account: recipient, conversation_id: status.conversation_id, participant_account_ids: participants_from_status(recipient, status)) | |||
return if conversation.nil? | |||
conversation.status_ids.delete(status.id) | |||
if conversation.status_ids.empty? | |||
conversation.destroy | |||
else | |||
conversation.save | |||
end | |||
conversation | |||
rescue ActiveRecord::StaleObjectError | |||
retry | |||
end | |||
private | |||
def participants_from_status(recipient, status) | |||
((status.mentions.pluck(:account_id) + [status.account_id]).uniq - [recipient.id]).sort | |||
end | |||
end | |||
private | |||
def set_last_status | |||
self.status_ids = status_ids.sort | |||
self.last_status_id = status_ids.last | |||
end | |||
def push_to_streaming_api | |||
return if destroyed? || !subscribed_to_timeline? | |||
PushConversationWorker.perform_async(id) | |||
end | |||
def subscribed_to_timeline? | |||
Redis.current.exists("subscribed:#{streaming_channel}") | |||
end | |||
def streaming_channel | |||
"timeline:direct:#{account_id}" | |||
end | |||
end |
@@ -24,6 +24,8 @@ | |||
# | |||
class Status < ApplicationRecord | |||
before_destroy :unlink_from_conversations | |||
include Paginable | |||
include Streamable | |||
include Cacheable | |||
@@ -473,4 +475,15 @@ class Status < ApplicationRecord | |||
reblog&.decrement_count!(:reblogs_count) if reblog? | |||
thread&.decrement_count!(:replies_count) if in_reply_to_id.present? && (public_visibility? || unlisted_visibility?) | |||
end | |||
def unlink_from_conversations | |||
return unless direct_visibility? | |||
mentioned_accounts = mentions.includes(:account).map(&:account) | |||
inbox_owners = mentioned_accounts.select(&:local?) + (account.local? ? [account] : []) | |||
inbox_owners.each do |inbox_owner| | |||
AccountConversation.remove_status(inbox_owner, self) | |||
end | |||
end | |||
end |
@@ -0,0 +1,7 @@ | |||
# frozen_string_literal: true | |||
class REST::ConversationSerializer < ActiveModel::Serializer | |||
attribute :id | |||
has_many :participant_accounts, key: :accounts, serializer: REST::AccountSerializer | |||
has_one :last_status, serializer: REST::StatusSerializer | |||
end |
@@ -2,16 +2,43 @@ | |||
class AfterBlockService < BaseService | |||
def call(account, target_account) | |||
FeedManager.instance.clear_from_timeline(account, target_account) | |||
clear_home_feed(account, target_account) | |||
clear_notifications(account, target_account) | |||
clear_conversations(account, target_account) | |||
end | |||
private | |||
def clear_home_feed(account, target_account) | |||
FeedManager.instance.clear_from_timeline(account, target_account) | |||
end | |||
def clear_conversations(account, target_account) | |||
AccountConversation.where(account: account) | |||
.where('? = ANY(participant_account_ids)', target_account.id) | |||
.in_batches | |||
.destroy_all | |||
end | |||
def clear_notifications(account, target_account) | |||
Notification.where(account: account).joins(:follow).where(activity_type: 'Follow', follows: { account_id: target_account.id }).delete_all | |||
Notification.where(account: account).joins(mention: :status).where(activity_type: 'Mention', statuses: { account_id: target_account.id }).delete_all | |||
Notification.where(account: account).joins(:favourite).where(activity_type: 'Favourite', favourites: { account_id: target_account.id }).delete_all | |||
Notification.where(account: account).joins(:status).where(activity_type: 'Status', statuses: { account_id: target_account.id }).delete_all | |||
Notification.where(account: account) | |||
.joins(:follow) | |||
.where(activity_type: 'Follow', follows: { account_id: target_account.id }) | |||
.delete_all | |||
Notification.where(account: account) | |||
.joins(mention: :status) | |||
.where(activity_type: 'Mention', statuses: { account_id: target_account.id }) | |||
.delete_all | |||
Notification.where(account: account) | |||
.joins(:favourite) | |||
.where(activity_type: 'Favourite', favourites: { account_id: target_account.id }) | |||
.delete_all | |||
Notification.where(account: account) | |||
.joins(:status) | |||
.where(activity_type: 'Status', statuses: { account_id: target_account.id }) | |||
.delete_all | |||
end | |||
end |
@@ -13,6 +13,7 @@ class FanOutOnWriteService < BaseService | |||
if status.direct_visibility? | |||
deliver_to_mentioned_followers(status) | |||
deliver_to_direct_timelines(status) | |||
deliver_to_own_conversation(status) | |||
else | |||
deliver_to_followers(status) | |||
deliver_to_lists(status) | |||
@@ -99,6 +100,11 @@ class FanOutOnWriteService < BaseService | |||
status.mentions.includes(:account).each do |mention| | |||
Redis.current.publish("timeline:direct:#{mention.account.id}", @payload) if mention.account.local? | |||
end | |||
Redis.current.publish("timeline:direct:#{status.account.id}", @payload) if status.account.local? | |||
end | |||
def deliver_to_own_conversation(status) | |||
AccountConversation.add_status(status.account, status) | |||
end | |||
end |
@@ -5,11 +5,13 @@ class MuteService < BaseService | |||
return if account.id == target_account.id | |||
mute = account.mute!(target_account, notifications: notifications) | |||
if mute.hide_notifications? | |||
BlockWorker.perform_async(account.id, target_account.id) | |||
else | |||
FeedManager.instance.clear_from_timeline(account, target_account) | |||
MuteWorker.perform_async(account.id, target_account.id) | |||
end | |||
mute | |||
end | |||
end |
@@ -8,9 +8,10 @@ class NotifyService < BaseService | |||
return if recipient.user.nil? || blocked? | |||
create_notification | |||
push_notification if @notification.browserable? | |||
send_email if email_enabled? | |||
create_notification! | |||
push_notification! if @notification.browserable? | |||
push_to_conversation! if direct_message? | |||
send_email! if email_enabled? | |||
rescue ActiveRecord::RecordInvalid | |||
return | |||
end | |||
@@ -100,18 +101,23 @@ class NotifyService < BaseService | |||
end | |||
end | |||
def create_notification | |||
def create_notification! | |||
@notification.save! | |||
end | |||
def push_notification | |||
def push_notification! | |||
return if @notification.activity.nil? | |||
Redis.current.publish("timeline:#{@recipient.id}", Oj.dump(event: :notification, payload: InlineRenderer.render(@notification, @recipient, :notification))) | |||
send_push_notifications | |||
send_push_notifications! | |||
end | |||
def send_push_notifications | |||
def push_to_conversation! | |||
return if @notification.activity.nil? | |||
AccountConversation.add_status(@recipient, @notification.target_status) | |||
end | |||
def send_push_notifications! | |||
subscriptions_ids = ::Web::PushSubscription.where(user_id: @recipient.user.id) | |||
.select { |subscription| subscription.pushable?(@notification) } | |||
.map(&:id) | |||
@@ -121,7 +127,7 @@ class NotifyService < BaseService | |||
end | |||
end | |||
def send_email | |||
def send_email! | |||
return if @notification.activity.nil? | |||
NotificationMailer.public_send(@notification.type, @recipient, @notification).deliver_later(wait: 2.minutes) | |||
end | |||
@@ -4,6 +4,9 @@ class BlockWorker | |||
include Sidekiq::Worker | |||
def perform(account_id, target_account_id) | |||
AfterBlockService.new.call(Account.find(account_id), Account.find(target_account_id)) | |||
AfterBlockService.new.call( | |||
Account.find(account_id), | |||
Account.find(target_account_id) | |||
) | |||
end | |||
end |
@@ -0,0 +1,12 @@ | |||
# frozen_string_literal: true | |||
class MuteWorker | |||
include Sidekiq::Worker | |||
def perform(account_id, target_account_id) | |||
FeedManager.instance.clear_from_timeline( | |||
Account.find(account_id), | |||
Account.find(target_account_id) | |||
) | |||
end | |||
end |
@@ -0,0 +1,15 @@ | |||
# frozen_string_literal: true | |||
class PushConversationWorker | |||
include Sidekiq::Worker | |||
def perform(conversation_account_id) | |||
conversation = AccountConversation.find(conversation_account_id) | |||
message = InlineRenderer.render(conversation, conversation.account, :conversation) | |||
timeline_id = "timeline:direct:#{conversation.account_id}" | |||
Redis.current.publish(timeline_id, Oj.dump(event: :conversation, payload: message, queued_at: (Time.now.to_f * 1000.0).to_i)) | |||
rescue ActiveRecord::RecordNotFound | |||
true | |||
end | |||
end |
@@ -87,7 +87,7 @@ Rails.application.configure do | |||
config.x.otp_secret = ENV.fetch('OTP_SECRET', '1fc2b87989afa6351912abeebe31ffc5c476ead9bf8b3d74cbc4a302c7b69a45b40b1bbef3506ddad73e942e15ed5ca4b402bf9a66423626051104f4b5f05109') | |||
end | |||
ActiveRecordQueryTrace.enabled = ENV.fetch('QUERY_TRACE_ENABLED') { false } | |||
ActiveRecordQueryTrace.enabled = ENV['QUERY_TRACE_ENABLED'] == 'true' | |||
module PrivateAddressCheck | |||
def self.private_address?(*) | |||
@@ -261,6 +261,7 @@ Rails.application.routes.draw do | |||
resources :streaming, only: [:index] | |||
resources :custom_emojis, only: [:index] | |||
resources :suggestions, only: [:index, :destroy] | |||
resources :conversations, only: [:index] | |||
get '/search', to: 'search#index', as: :search | |||
@@ -0,0 +1,14 @@ | |||
class CreateAccountConversations < ActiveRecord::Migration[5.2] | |||
def change | |||
create_table :account_conversations do |t| | |||
t.belongs_to :account, foreign_key: { on_delete: :cascade } | |||
t.belongs_to :conversation, foreign_key: { on_delete: :cascade } | |||
t.bigint :participant_account_ids, array: true, null: false, default: [] | |||
t.bigint :status_ids, array: true, null: false, default: [] | |||
t.bigint :last_status_id, null: true, default: nil | |||
t.integer :lock_version, null: false, default: 0 | |||
end | |||
add_index :account_conversations, [:account_id, :conversation_id, :participant_account_ids], unique: true, name: 'index_unique_conversations' | |||
end | |||
end |
@@ -10,10 +10,23 @@ | |||
# | |||
# It's strongly recommended that you check this file into your version control system. | |||
ActiveRecord::Schema.define(version: 2018_08_20_232245) do | |||
ActiveRecord::Schema.define(version: 2018_09_29_222014) do | |||
# These are extensions that must be enabled in order to support this database | |||
enable_extension "plpgsql" | |||
create_table "account_conversations", force: :cascade do |t| | |||
t.bigint "account_id" | |||
t.bigint "conversation_id" | |||
t.bigint "participant_account_ids", default: [], null: false, array: true | |||
t.bigint "status_ids", default: [], null: false, array: true | |||
t.bigint "last_status_id" | |||
t.integer "lock_version", default: 0, null: false | |||
t.index ["account_id", "conversation_id", "participant_account_ids"], name: "index_unique_conversations", unique: true | |||
t.index ["account_id"], name: "index_account_conversations_on_account_id" | |||
t.index ["conversation_id"], name: "index_account_conversations_on_conversation_id" | |||
end | |||
create_table "account_domain_blocks", force: :cascade do |t| | |||
t.string "domain" | |||
t.datetime "created_at", null: false | |||
@@ -597,6 +610,8 @@ ActiveRecord::Schema.define(version: 2018_08_20_232245) do | |||
t.index ["user_id"], name: "index_web_settings_on_user_id", unique: true | |||
end | |||
add_foreign_key "account_conversations", "accounts", on_delete: :cascade | |||
add_foreign_key "account_conversations", "conversations", on_delete: :cascade | |||
add_foreign_key "account_domain_blocks", "accounts", name: "fk_206c6029bd", on_delete: :cascade | |||
add_foreign_key "account_moderation_notes", "accounts" | |||
add_foreign_key "account_moderation_notes", "accounts", column: "target_account_id" | |||
@@ -0,0 +1,37 @@ | |||
require 'rails_helper' | |||
RSpec.describe Api::V1::ConversationsController, type: :controller do | |||
render_views | |||
let!(:user) { Fabricate(:user, account: Fabricate(:account, username: 'alice')) } | |||
let(:token) { Fabricate(:accessible_access_token, resource_owner_id: user.id, scopes: scopes) } | |||
let(:other) { Fabricate(:user, account: Fabricate(:account, username: 'bob')) } | |||
before do | |||
allow(controller).to receive(:doorkeeper_token) { token } | |||
end | |||
describe 'GET #index' do | |||
let(:scopes) { 'read:statuses' } | |||
before do | |||
PostStatusService.new.call(other.account, 'Hey @alice', nil, visibility: 'direct') | |||
end | |||
it 'returns http success' do | |||
get :index | |||
expect(response).to have_http_status(200) | |||
end | |||
it 'returns pagination headers' do | |||
get :index, params: { limit: 1 } | |||
expect(response.headers['Link'].links.size).to eq(2) | |||
end | |||
it 'returns conversations' do | |||
get :index | |||
json = body_as_json | |||
expect(json.size).to eq 1 | |||
end | |||
end | |||
end |
@@ -0,0 +1,6 @@ | |||
Fabricator(:conversation_account) do | |||
account nil | |||
conversation nil | |||
participant_account_ids "" | |||
last_status nil | |||
end |
@@ -0,0 +1,72 @@ | |||
require 'rails_helper' | |||
RSpec.describe AccountConversation, type: :model do | |||
let!(:alice) { Fabricate(:account, username: 'alice') } | |||
let!(:bob) { Fabricate(:account, username: 'bob') } | |||
let!(:mark) { Fabricate(:account, username: 'mark') } | |||
describe '.add_status' do | |||
it 'creates new record when no others exist' do | |||
status = Fabricate(:status, account: alice, visibility: :direct) | |||
status.mentions.create(account: bob) | |||
conversation = AccountConversation.add_status(alice, status) | |||
expect(conversation.participant_accounts).to include(bob) | |||
expect(conversation.last_status).to eq status | |||
expect(conversation.status_ids).to eq [status.id] | |||
end | |||
it 'appends to old record when there is a match' do | |||
last_status = Fabricate(:status, account: alice, visibility: :direct) | |||
conversation = AccountConversation.create!(account: alice, conversation: last_status.conversation, participant_account_ids: [bob.id], status_ids: [last_status.id]) | |||
status = Fabricate(:status, account: bob, visibility: :direct, thread: last_status) | |||
status.mentions.create(account: alice) | |||
new_conversation = AccountConversation.add_status(alice, status) | |||
expect(new_conversation.id).to eq conversation.id | |||
expect(new_conversation.participant_accounts).to include(bob) | |||
expect(new_conversation.last_status).to eq status | |||
expect(new_conversation.status_ids).to eq [last_status.id, status.id] | |||
end | |||
it 'creates new record when new participants are added' do | |||
last_status = Fabricate(:status, account: alice, visibility: :direct) | |||
conversation = AccountConversation.create!(account: alice, conversation: last_status.conversation, participant_account_ids: [bob.id], status_ids: [last_status.id]) | |||
status = Fabricate(:status, account: bob, visibility: :direct, thread: last_status) | |||
status.mentions.create(account: alice) | |||
status.mentions.create(account: mark) | |||
new_conversation = AccountConversation.add_status(alice, status) | |||
expect(new_conversation.id).to_not eq conversation.id | |||
expect(new_conversation.participant_accounts).to include(bob, mark) | |||
expect(new_conversation.last_status).to eq status | |||
expect(new_conversation.status_ids).to eq [status.id] | |||
end | |||
end | |||
describe '.remove_status' do | |||
it 'updates last status to a previous value' do | |||
last_status = Fabricate(:status, account: alice, visibility: :direct) | |||
status = Fabricate(:status, account: alice, visibility: :direct) | |||
conversation = AccountConversation.create!(account: alice, conversation: last_status.conversation, participant_account_ids: [bob.id], status_ids: [status.id, last_status.id]) | |||
last_status.mentions.create(account: bob) | |||
last_status.destroy! | |||
conversation.reload | |||
expect(conversation.last_status).to eq status | |||
expect(conversation.status_ids).to eq [status.id] | |||
end | |||
it 'removes the record if no other statuses are referenced' do | |||
last_status = Fabricate(:status, account: alice, visibility: :direct) | |||
conversation = AccountConversation.create!(account: alice, conversation: last_status.conversation, participant_account_ids: [bob.id], status_ids: [last_status.id]) | |||
last_status.mentions.create(account: bob) | |||
last_status.destroy! | |||
expect(AccountConversation.where(id: conversation.id).count).to eq 0 | |||
end | |||
end | |||
end |
@@ -485,7 +485,8 @@ const startWorker = (workerId) => { | |||
}); | |||
app.get('/api/v1/streaming/direct', (req, res) => { | |||
streamFrom(`timeline:direct:${req.accountId}`, req, streamToHttp(req, res), streamHttpEnd(req), true); | |||
const channel = `timeline:direct:${req.accountId}`; | |||
streamFrom(channel, req, streamToHttp(req, res), streamHttpEnd(req, subscriptionHeartbeat(channel)), true); | |||
}); | |||
app.get('/api/v1/streaming/hashtag', (req, res) => { | |||
@@ -525,9 +526,11 @@ const startWorker = (workerId) => { | |||
ws.isAlive = true; | |||
}); | |||
let channel; | |||
switch(location.query.stream) { | |||
case 'user': | |||
const channel = `timeline:${req.accountId}`; | |||
channel = `timeline:${req.accountId}`; | |||
streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel))); | |||
break; | |||
case 'user:notification': | |||
@@ -546,7 +549,8 @@ const startWorker = (workerId) => { | |||
streamFrom('timeline:public:local:media', req, streamToWs(req, ws), streamWsEnd(req, ws), true); | |||
break; | |||
case 'direct': | |||
streamFrom(`timeline:direct:${req.accountId}`, req, streamToWs(req, ws), streamWsEnd(req, ws), true); | |||
channel = `timeline:direct:${req.accountId}`; | |||
streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel)), true); | |||
break; | |||
case 'hashtag': | |||
streamFrom(`timeline:hashtag:${location.query.tag.toLowerCase()}`, req, streamToWs(req, ws), streamWsEnd(req, ws), true); | |||
@@ -563,7 +567,7 @@ const startWorker = (workerId) => { | |||
return; | |||
} | |||
const channel = `timeline:list:${listId}`; | |||
channel = `timeline:list:${listId}`; | |||
streamFrom(channel, req, streamToWs(req, ws), streamWsEnd(req, ws, subscriptionHeartbeat(channel))); | |||
}); | |||
break; | |||