* Framework for delivery worker spec * Refactor of pubsub delivery workermaster
@@ -10,40 +10,83 @@ class Pubsubhubbub::DeliveryWorker | |||
5 * (count + 1) | |||
end | |||
attr_reader :subscription, :payload | |||
def perform(subscription_id, payload) | |||
subscription = Subscription.find(subscription_id) | |||
headers = {} | |||
host = Addressable::URI.parse(subscription.callback_url).normalize.host | |||
@subscription = Subscription.find(subscription_id) | |||
@payload = payload | |||
process_delivery unless blocked_domain? | |||
end | |||
return if DomainBlock.blocked?(host) | |||
private | |||
headers['User-Agent'] = 'Mastodon/PubSubHubbub' | |||
headers['Content-Type'] = 'application/atom+xml' | |||
headers['Link'] = LinkHeader.new([[api_push_url, [%w(rel hub)]], [account_url(subscription.account, format: :atom), [%w(rel self)]]]).to_s | |||
headers['X-Hub-Signature'] = signature(subscription.secret, payload) if subscription.secret? | |||
def process_delivery | |||
payload_delivery | |||
response = HTTP.timeout(:per_operation, write: 50, connect: 20, read: 50) | |||
.headers(headers) | |||
.post(subscription.callback_url, body: payload) | |||
if response_successful? | |||
subscription.touch(:last_successful_delivery_at) | |||
elsif response_failed_permanently? | |||
subscription.destroy! | |||
else | |||
raise "Delivery failed for #{subscription.callback_url}: HTTP #{payload_delivery.code}" | |||
end | |||
end | |||
return subscription.destroy! if response_failed_permanently?(response) # HTTP 4xx means error is not temporary, except for 429 (throttling) | |||
raise "Delivery failed for #{subscription.callback_url}: HTTP #{response.code}" unless response_successful?(response) | |||
def payload_delivery | |||
@_payload_delivery ||= callback_post_payload | |||
end | |||
subscription.touch(:last_successful_delivery_at) | |||
def callback_post_payload | |||
HTTP.timeout(:per_operation, write: 50, connect: 20, read: 50) | |||
.headers(headers) | |||
.post(subscription.callback_url, body: payload) | |||
end | |||
private | |||
def blocked_domain? | |||
DomainBlock.blocked?(host) | |||
end | |||
def host | |||
Addressable::URI.parse(subscription.callback_url).normalize.host | |||
end | |||
def headers | |||
{ | |||
'User-Agent' => 'Mastodon/PubSubHubbub', | |||
'Content-Type' => 'application/atom+xml', | |||
'Link' => link_headers, | |||
}.merge(signature_headers.to_h) | |||
end | |||
def link_headers | |||
LinkHeader.new([hub_link_header, self_link_header]).to_s | |||
end | |||
def hub_link_header | |||
[api_push_url, [%w(rel hub)]] | |||
end | |||
def self_link_header | |||
[account_url(subscription.account, format: :atom), [%w(rel self)]] | |||
end | |||
def signature_headers | |||
{ 'X-Hub-Signature' => payload_signature } if subscription.secret? | |||
end | |||
def payload_signature | |||
"sha1=#{hmac_payload_digest}" | |||
end | |||
def signature(secret, payload) | |||
hmac = OpenSSL::HMAC.hexdigest(OpenSSL::Digest.new('sha1'), secret, payload) | |||
"sha1=#{hmac}" | |||
def hmac_payload_digest | |||
OpenSSL::HMAC.hexdigest(OpenSSL::Digest.new('sha1'), subscription.secret, payload) | |||
end | |||
def response_failed_permanently?(response) | |||
response.code > 299 && response.code < 500 && response.code != 429 | |||
def response_failed_permanently? | |||
payload_delivery.code > 299 && payload_delivery.code < 500 && payload_delivery.code != 429 | |||
end | |||
def response_successful?(response) | |||
response.code > 199 && response.code < 300 | |||
def response_successful? | |||
payload_delivery.code > 199 && payload_delivery.code < 300 | |||
end | |||
end |
@@ -1,4 +1,5 @@ | |||
Fabricator(:subscription) do | |||
account | |||
callback_url "http://example.com/callback" | |||
secret "foobar" | |||
expires_at "2016-11-28 11:30:07" | |||
@@ -8,7 +8,7 @@ describe Pubsubhubbub::ConfirmationWorker do | |||
subject { described_class.new } | |||
let!(:alice) { Fabricate(:account, username: 'alice') } | |||
let!(:subscription) { Fabricate(:subscription, account_id: alice.id, callback_url: 'http://example.com/api', confirmed: false, expires_at: 3.days.from_now, secret: nil) } | |||
let!(:subscription) { Fabricate(:subscription, account: alice, callback_url: 'http://example.com/api', confirmed: false, expires_at: 3.days.from_now, secret: nil) } | |||
describe 'perform' do | |||
describe 'with subscribe mode' do | |||
@@ -0,0 +1,78 @@ | |||
# frozen_string_literal: true | |||
require 'rails_helper' | |||
describe Pubsubhubbub::DeliveryWorker do | |||
include RoutingHelper | |||
subject { described_class.new } | |||
let(:payload) { 'test' } | |||
describe 'perform' do | |||
it 'raises when subscription does not exist' do | |||
expect { subject.perform 123, payload }.to raise_error(ActiveRecord::RecordNotFound) | |||
end | |||
it 'does not attempt to deliver when domain blocked' do | |||
_domain_block = Fabricate(:domain_block, domain: 'example.com', severity: :suspend) | |||
subscription = Fabricate(:subscription, callback_url: 'https://example.com/api', last_successful_delivery_at: 2.days.ago) | |||
subject.perform(subscription.id, payload) | |||
expect(subscription.reload.last_successful_delivery_at).to be_within(2).of(2.days.ago) | |||
end | |||
it 'destroys subscription when request fails permanently' do | |||
subscription = Fabricate(:subscription) | |||
stub_request_to_respond_with(subscription, 404) | |||
subject.perform(subscription.id, payload) | |||
expect { subscription.reload }.to raise_error(ActiveRecord::RecordNotFound) | |||
end | |||
it 'raises when request fails' do | |||
subscription = Fabricate(:subscription) | |||
stub_request_to_respond_with(subscription, 500) | |||
expect { subject.perform(subscription.id, payload) }.to raise_error(/Delivery failed/) | |||
end | |||
it 'updates subscriptions when delivery succeeds' do | |||
subscription = Fabricate(:subscription) | |||
stub_request_to_respond_with(subscription, 200) | |||
subject.perform(subscription.id, payload) | |||
expect(subscription.reload.last_successful_delivery_at).to be_within(2).of(Time.now.utc) | |||
end | |||
it 'updates subscription without a secret when delivery succeeds' do | |||
subscription = Fabricate(:subscription, secret: nil) | |||
stub_request_to_respond_with(subscription, 200) | |||
subject.perform(subscription.id, payload) | |||
expect(subscription.reload.last_successful_delivery_at).to be_within(2).of(Time.now.utc) | |||
end | |||
def stub_request_to_respond_with(subscription, code) | |||
stub_request(:post, 'http://example.com/callback') | |||
.with(body: payload, headers: expected_headers(subscription)) | |||
.to_return(status: code, body: '', headers: {}) | |||
end | |||
def expected_headers(subscription) | |||
{ | |||
'Connection' => 'close', | |||
'Content-Type' => 'application/atom+xml', | |||
'Host' => 'example.com', | |||
'Link' => "<https://#{Rails.configuration.x.local_domain}/api/push>; rel=\"hub\", <https://#{Rails.configuration.x.local_domain}/users/#{subscription.account.username}.atom>; rel=\"self\"", | |||
'User-Agent' => 'Mastodon/PubSubHubbub', | |||
}.tap do |basic| | |||
known_digest = OpenSSL::HMAC.hexdigest(OpenSSL::Digest.new('sha1'), subscription.secret.to_s, payload) | |||
basic.merge('X-Hub-Signature' => "sha1=#{known_digest}") if subscription.secret? | |||
end | |||
end | |||
end | |||
end |
@@ -5,8 +5,8 @@ describe Pubsubhubbub::DistributionWorker do | |||
let!(:alice) { Fabricate(:account, username: 'alice') } | |||
let!(:bob) { Fabricate(:account, username: 'bob', domain: 'example2.com') } | |||
let!(:anonymous_subscription) { Fabricate(:subscription, account_id: alice.id, callback_url: 'http://example1.com', confirmed: true, lease_seconds: 3600) } | |||
let!(:subscription_with_follower) { Fabricate(:subscription, account_id: alice.id, callback_url: 'http://example2.com', confirmed: true, lease_seconds: 3600) } | |||
let!(:anonymous_subscription) { Fabricate(:subscription, account: alice, callback_url: 'http://example1.com', confirmed: true, lease_seconds: 3600) } | |||
let!(:subscription_with_follower) { Fabricate(:subscription, account: alice, callback_url: 'http://example2.com', confirmed: true, lease_seconds: 3600) } | |||
before do | |||
bob.follow!(alice) | |||