- A successful delivery cancels it out - An incoming delivery from account of the inbox cancels it outmaster
@@ -32,6 +32,7 @@ class ActivityPub::InboxesController < Api::BaseController | |||||
end | end | ||||
Pubsubhubbub::UnsubscribeWorker.perform_async(signed_request_account.id) if signed_request_account.subscribed? | Pubsubhubbub::UnsubscribeWorker.perform_async(signed_request_account.id) if signed_request_account.subscribed? | ||||
DeliveryFailureTracker.track_inverse_success!(signed_request_account) | |||||
end | end | ||||
def process_payload | def process_payload | ||||
@@ -0,0 +1,56 @@ | |||||
# frozen_string_literal: true | |||||
class DeliveryFailureTracker | |||||
FAILURE_DAYS_THRESHOLD = 7 | |||||
def initialize(inbox_url) | |||||
@inbox_url = inbox_url | |||||
end | |||||
def track_failure! | |||||
Redis.current.sadd(exhausted_deliveries_key, today) | |||||
Redis.current.sadd('unavailable_inboxes', @inbox_url) if reached_failure_threshold? | |||||
end | |||||
def track_success! | |||||
Redis.current.del(exhausted_deliveries_key) | |||||
Redis.current.srem('unavailable_inboxes', @inbox_url) | |||||
end | |||||
def days | |||||
Redis.current.scard(exhausted_deliveries_key) || 0 | |||||
end | |||||
class << self | |||||
def filter(arr) | |||||
arr.reject(&method(:unavailable?)) | |||||
end | |||||
def unavailable?(url) | |||||
Redis.current.sismember('unavailable_inboxes', url) | |||||
end | |||||
def available?(url) | |||||
!unavailable?(url) | |||||
end | |||||
def track_inverse_success!(from_account) | |||||
new(from_account.inbox_url).track_success! if from_account.inbox_url.present? | |||||
new(from_account.shared_inbox_url).track_success! if from_account.shared_inbox_url.present? | |||||
end | |||||
end | |||||
private | |||||
def exhausted_deliveries_key | |||||
"exhausted_deliveries:#{@inbox_url}" | |||||
end | |||||
def today | |||||
Time.now.utc.strftime('%Y%m%d') | |||||
end | |||||
def reached_failure_threshold? | |||||
days >= FAILURE_DAYS_THRESHOLD | |||||
end | |||||
end |
@@ -190,7 +190,8 @@ class Account < ApplicationRecord | |||||
end | end | ||||
def inboxes | def inboxes | ||||
reorder(nil).where(protocol: :activitypub).pluck("distinct coalesce(nullif(accounts.shared_inbox_url, ''), accounts.inbox_url)") | |||||
urls = reorder(nil).where(protocol: :activitypub).pluck("distinct coalesce(nullif(accounts.shared_inbox_url, ''), accounts.inbox_url)") | |||||
DeliveryFailureTracker.filter(urls) | |||||
end | end | ||||
def triadic_closures(account, limit: 5, offset: 0) | def triadic_closures(account, limit: 5, offset: 0) | ||||
@@ -15,7 +15,10 @@ class ActivityPub::DeliveryWorker | |||||
perform_request | perform_request | ||||
raise Mastodon::UnexpectedResponseError, @response unless response_successful? | raise Mastodon::UnexpectedResponseError, @response unless response_successful? | ||||
failure_tracker.track_success! | |||||
rescue => e | rescue => e | ||||
failure_tracker.track_failure! | |||||
raise e.class, "Delivery failed for #{inbox_url}: #{e.message}", e.backtrace[0] | raise e.class, "Delivery failed for #{inbox_url}: #{e.message}", e.backtrace[0] | ||||
end | end | ||||
@@ -34,4 +37,8 @@ class ActivityPub::DeliveryWorker | |||||
def response_successful? | def response_successful? | ||||
@response.code > 199 && @response.code < 300 | @response.code > 199 && @response.code < 300 | ||||
end | end | ||||
def failure_tracker | |||||
@failure_tracker ||= DeliveryFailureTracker.new(@inbox_url) | |||||
end | |||||
end | end |
@@ -0,0 +1,71 @@ | |||||
# frozen_string_literal: true | |||||
require 'rails_helper' | |||||
describe DeliveryFailureTracker do | |||||
subject { described_class.new('http://example.com/inbox') } | |||||
describe '#track_success!' do | |||||
before do | |||||
subject.track_failure! | |||||
subject.track_success! | |||||
end | |||||
it 'marks URL as available again' do | |||||
expect(described_class.available?('http://example.com/inbox')).to be true | |||||
end | |||||
it 'resets days to 0' do | |||||
expect(subject.days).to be_zero | |||||
end | |||||
end | |||||
describe '#track_failure!' do | |||||
it 'marks URL as unavailable after 7 days of being called' do | |||||
6.times { |i| Redis.current.sadd('exhausted_deliveries:http://example.com/inbox', i) } | |||||
subject.track_failure! | |||||
expect(subject.days).to eq 7 | |||||
expect(described_class.unavailable?('http://example.com/inbox')).to be true | |||||
end | |||||
it 'repeated calls on the same day do not count' do | |||||
subject.track_failure! | |||||
subject.track_failure! | |||||
expect(subject.days).to eq 1 | |||||
end | |||||
end | |||||
describe '.filter' do | |||||
before do | |||||
Redis.current.sadd('unavailable_inboxes', 'http://example.com/unavailable/inbox') | |||||
end | |||||
it 'removes URLs that are unavailable' do | |||||
result = described_class.filter(['http://example.com/good/inbox', 'http://example.com/unavailable/inbox']) | |||||
expect(result).to include('http://example.com/good/inbox') | |||||
expect(result).to_not include('http://example.com/unavailable/inbox') | |||||
end | |||||
end | |||||
describe '.track_inverse_success!' do | |||||
let(:from_account) { Fabricate(:account, inbox_url: 'http://example.com/inbox', shared_inbox_url: 'http://example.com/shared/inbox') } | |||||
before do | |||||
Redis.current.sadd('unavailable_inboxes', 'http://example.com/inbox') | |||||
Redis.current.sadd('unavailable_inboxes', 'http://example.com/shared/inbox') | |||||
described_class.track_inverse_success!(from_account) | |||||
end | |||||
it 'marks inbox URL as available again' do | |||||
expect(described_class.available?('http://example.com/inbox')).to be true | |||||
end | |||||
it 'marks shared inbox URL as available again' do | |||||
expect(described_class.available?('http://example.com/shared/inbox')).to be true | |||||
end | |||||
end | |||||
end |