HTTP connections must be explicitly closed in many cases, and letting perform method close connections makes its callers less redundant and prevent them from forgetting to close connections.master
@@ -60,9 +60,9 @@ module JsonLdHelper | |||
end | |||
def fetch_resource_without_id_validation(uri) | |||
response = build_request(uri).perform | |||
return if response.code != 200 | |||
body_to_json(response.to_s) | |||
build_request(uri).perform do |response| | |||
response.code == 200 ? body_to_json(response.to_s) : nil | |||
end | |||
end | |||
def body_to_json(body) | |||
@@ -13,15 +13,14 @@ class ProviderDiscovery < OEmbed::ProviderDiscovery | |||
def discover_provider(url, **options) | |||
format = options[:format] | |||
if options[:html] | |||
html = Nokogiri::HTML(options[:html]) | |||
else | |||
res = Request.new(:get, url).perform | |||
raise OEmbed::NotFound, url if res.code != 200 || res.mime_type != 'text/html' | |||
html = Nokogiri::HTML(res.to_s) | |||
end | |||
html = if options[:html] | |||
Nokogiri::HTML(options[:html]) | |||
else | |||
Request.new(:get, url).perform do |res| | |||
raise OEmbed::NotFound, url if res.code != 200 || res.mime_type != 'text/html' | |||
Nokogiri::HTML(res.to_s) | |||
end | |||
end | |||
if format.nil? || format == :json | |||
provider_endpoint ||= html.at_xpath('//link[@type="application/json+oembed"]')&.attribute('href')&.value | |||
@@ -33,9 +33,17 @@ class Request | |||
end | |||
def perform | |||
http_client.headers(headers).public_send(@verb, @url.to_s, @options) | |||
rescue => e | |||
raise e.class, "#{e.message} on #{@url}", e.backtrace[0] | |||
begin | |||
response = http_client.headers(headers).public_send(@verb, @url.to_s, @options) | |||
rescue => e | |||
raise e.class, "#{e.message} on #{@url}", e.backtrace[0] | |||
end | |||
begin | |||
yield response | |||
ensure | |||
http_client.close | |||
end | |||
end | |||
def headers | |||
@@ -88,7 +96,7 @@ class Request | |||
end | |||
def http_client | |||
HTTP.timeout(:per_operation, timeout).follow(max_hops: 2) | |||
@http_client ||= HTTP.timeout(:per_operation, timeout).follow(max_hops: 2) | |||
end | |||
class Socket < TCPSocket | |||
@@ -21,23 +21,23 @@ module Remotable | |||
return if !%w(http https).include?(parsed_url.scheme) || parsed_url.host.empty? || self[attribute_name] == url | |||
begin | |||
response = Request.new(:get, url).perform | |||
return if response.code != 200 | |||
matches = response.headers['content-disposition']&.match(/filename="([^"]*)"/) | |||
filename = matches.nil? ? parsed_url.path.split('/').last : matches[1] | |||
basename = SecureRandom.hex(8) | |||
extname = if filename.nil? | |||
'' | |||
else | |||
File.extname(filename) | |||
end | |||
send("#{attachment_name}=", StringIO.new(response.to_s)) | |||
send("#{attachment_name}_file_name=", basename + extname) | |||
self[attribute_name] = url if has_attribute?(attribute_name) | |||
Request.new(:get, url).perform do |response| | |||
next if response.code != 200 | |||
matches = response.headers['content-disposition']&.match(/filename="([^"]*)"/) | |||
filename = matches.nil? ? parsed_url.path.split('/').last : matches[1] | |||
basename = SecureRandom.hex(8) | |||
extname = if filename.nil? | |||
'' | |||
else | |||
File.extname(filename) | |||
end | |||
send("#{attachment_name}=", StringIO.new(response.to_s)) | |||
send("#{attachment_name}_file_name=", basename + extname) | |||
self[attribute_name] = url if has_attribute?(attribute_name) | |||
end | |||
rescue HTTP::TimeoutError, HTTP::ConnectionError, OpenSSL::SSL::SSLError, Paperclip::Errors::NotIdentifiedByImageMagickError, Addressable::URI::InvalidURIError, Mastodon::HostValidationError => e | |||
Rails.logger.debug "Error fetching remote #{attachment_name}: #{e}" | |||
nil | |||
@@ -24,43 +24,44 @@ class FetchAtomService < BaseService | |||
def process(url, terminal = false) | |||
@url = url | |||
perform_request | |||
process_response(terminal) | |||
perform_request { |response| process_response(response, terminal) } | |||
end | |||
def perform_request | |||
def perform_request(&block) | |||
accept = 'text/html' | |||
accept = 'application/activity+json, application/ld+json, application/atom+xml, ' + accept unless @unsupported_activity | |||
@response = Request.new(:get, @url) | |||
.add_headers('Accept' => accept) | |||
.perform | |||
Request.new(:get, @url).add_headers('Accept' => accept).perform(&block) | |||
end | |||
def process_response(terminal = false) | |||
return nil if @response.code != 200 | |||
def process_response(response, terminal = false) | |||
return nil if response.code != 200 | |||
if @response.mime_type == 'application/atom+xml' | |||
[@url, { prefetched_body: @response.to_s }, :ostatus] | |||
elsif ['application/activity+json', 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"'].include?(@response.mime_type) | |||
json = body_to_json(@response.to_s) | |||
if response.mime_type == 'application/atom+xml' | |||
[@url, { prefetched_body: response.to_s }, :ostatus] | |||
elsif ['application/activity+json', 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"'].include?(response.mime_type) | |||
json = body_to_json(response.to_s) | |||
if supported_context?(json) && json['type'] == 'Person' && json['inbox'].present? | |||
[json['id'], { prefetched_body: @response.to_s, id: true }, :activitypub] | |||
[json['id'], { prefetched_body: response.to_s, id: true }, :activitypub] | |||
elsif supported_context?(json) && json['type'] == 'Note' | |||
[json['id'], { prefetched_body: @response.to_s, id: true }, :activitypub] | |||
[json['id'], { prefetched_body: response.to_s, id: true }, :activitypub] | |||
else | |||
@unsupported_activity = true | |||
nil | |||
end | |||
elsif @response['Link'] && !terminal && link_header.find_link(%w(rel alternate)) | |||
process_headers | |||
elsif @response.mime_type == 'text/html' && !terminal | |||
process_html | |||
elsif !terminal | |||
link_header = response['Link'] && parse_link_header(response) | |||
if link_header&.find_link(%w(rel alternate)) | |||
process_link_headers(link_header) | |||
elsif response.mime_type == 'text/html' | |||
process_html(response) | |||
end | |||
end | |||
end | |||
def process_html | |||
page = Nokogiri::HTML(@response.to_s) | |||
def process_html(response) | |||
page = Nokogiri::HTML(response.to_s) | |||
json_link = page.xpath('//link[@rel="alternate"]').find { |link| ['application/activity+json', 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"'].include?(link['type']) } | |||
atom_link = page.xpath('//link[@rel="alternate"]').find { |link| link['type'] == 'application/atom+xml' } | |||
@@ -71,7 +72,7 @@ class FetchAtomService < BaseService | |||
result | |||
end | |||
def process_headers | |||
def process_link_headers(link_header) | |||
json_link = link_header.find_link(%w(rel alternate), %w(type application/activity+json)) || link_header.find_link(%w(rel alternate), ['type', 'application/ld+json; profile="https://www.w3.org/ns/activitystreams"']) | |||
atom_link = link_header.find_link(%w(rel alternate), %w(type application/atom+xml)) | |||
@@ -81,7 +82,7 @@ class FetchAtomService < BaseService | |||
result | |||
end | |||
def link_header | |||
@link_header ||= LinkHeader.parse(@response['Link'].is_a?(Array) ? @response['Link'].first : @response['Link']) | |||
def parse_link_header(response) | |||
LinkHeader.parse(response['Link'].is_a?(Array) ? response['Link'].first : response['Link']) | |||
end | |||
end |
@@ -36,15 +36,24 @@ class FetchLinkCardService < BaseService | |||
def process_url | |||
@card ||= PreviewCard.new(url: @url) | |||
res = Request.new(:head, @url).perform | |||
return if res.code != 405 && (res.code != 200 || res.mime_type != 'text/html') | |||
failed = Request.new(:head, @url).perform do |res| | |||
res.code != 405 && (res.code != 200 || res.mime_type != 'text/html') | |||
end | |||
@response = Request.new(:get, @url).perform | |||
return if failed | |||
return if @response.code != 200 || @response.mime_type != 'text/html' | |||
Request.new(:get, @url).perform do |res| | |||
if res.code == 200 && res.mime_type == 'text/html' | |||
@html = res.to_s | |||
@html_charset = res.charset | |||
else | |||
@html = nil | |||
@html_charset = nil | |||
end | |||
end | |||
@html = @response.to_s | |||
return if @html.nil? | |||
attempt_oembed || attempt_opengraph | |||
end | |||
@@ -118,7 +127,7 @@ class FetchLinkCardService < BaseService | |||
detector = CharlockHolmes::EncodingDetector.new | |||
detector.strip_tags = true | |||
guess = detector.detect(@html, @response.charset) | |||
guess = detector.detect(@html, @html_charset) | |||
page = Nokogiri::HTML(@html, nil, guess&.fetch(:encoding, nil)) | |||
if meta_property(page, 'twitter:player') | |||
@@ -179,11 +179,10 @@ class ResolveAccountService < BaseService | |||
def atom_body | |||
return @atom_body if defined?(@atom_body) | |||
response = Request.new(:get, atom_url).perform | |||
raise Mastodon::UnexpectedResponseError, response unless response.code == 200 | |||
@atom_body = response.to_s | |||
@atom_body = Request.new(:get, atom_url).perform do |response| | |||
raise Mastodon::UnexpectedResponseError, response unless response.code == 200 | |||
response.to_s | |||
end | |||
end | |||
def actor_json | |||
@@ -12,11 +12,9 @@ class SendInteractionService < BaseService | |||
return if !target_account.ostatus? || block_notification? | |||
delivery = build_request.perform | |||
raise Mastodon::UnexpectedResponseError, delivery unless delivery.code > 199 && delivery.code < 300 | |||
delivery.connection&.close | |||
build_request.perform do |delivery| | |||
raise Mastodon::UnexpectedResponseError, delivery unless delivery.code > 199 && delivery.code < 300 | |||
end | |||
end | |||
private | |||
@@ -6,21 +6,21 @@ class SubscribeService < BaseService | |||
@account = account | |||
@account.secret = SecureRandom.hex | |||
@response = build_request.perform | |||
if response_failed_permanently? | |||
# We're not allowed to subscribe. Fail and move on. | |||
@account.secret = '' | |||
@account.save! | |||
elsif response_successful? | |||
# The subscription will be confirmed asynchronously. | |||
@account.save! | |||
else | |||
# The response was either a 429 rate limit, or a 5xx error. | |||
# We need to retry at a later time. Fail loudly! | |||
raise Mastodon::UnexpectedResponseError, @response | |||
build_request.perform do |response| | |||
if response_failed_permanently? response | |||
# We're not allowed to subscribe. Fail and move on. | |||
@account.secret = '' | |||
@account.save! | |||
elsif response_successful? response | |||
# The subscription will be confirmed asynchronously. | |||
@account.save! | |||
else | |||
# The response was either a 429 rate limit, or a 5xx error. | |||
# We need to retry at a later time. Fail loudly! | |||
raise Mastodon::UnexpectedResponseError, response | |||
end | |||
end | |||
@response.connection&.close | |||
end | |||
private | |||
@@ -47,12 +47,12 @@ class SubscribeService < BaseService | |||
end | |||
# Any response in the 3xx or 4xx range, except for 429 (rate limit) | |||
def response_failed_permanently? | |||
(@response.status.redirect? || @response.status.client_error?) && !@response.status.too_many_requests? | |||
def response_failed_permanently?(response) | |||
(response.status.redirect? || response.status.client_error?) && !response.status.too_many_requests? | |||
end | |||
# Any response in the 2xx range | |||
def response_successful? | |||
@response.status.success? | |||
def response_successful?(response) | |||
response.status.success? | |||
end | |||
end |
@@ -7,10 +7,9 @@ class UnsubscribeService < BaseService | |||
@account = account | |||
begin | |||
@response = build_request.perform | |||
Rails.logger.debug "PuSH unsubscribe for #{@account.acct} failed: #{@response.status}" unless @response.status.success? | |||
@response.connection&.close | |||
build_request.perform do |response| | |||
Rails.logger.debug "PuSH unsubscribe for #{@account.acct} failed: #{response.status}" unless response.status.success? | |||
end | |||
rescue HTTP::Error, OpenSSL::SSL::SSLError => e | |||
Rails.logger.debug "PuSH unsubscribe for #{@account.acct} failed: #{e}" | |||
end | |||
@@ -12,11 +12,10 @@ class ActivityPub::DeliveryWorker | |||
@source_account = Account.find(source_account_id) | |||
@inbox_url = inbox_url | |||
perform_request | |||
perform_request do |response| | |||
raise Mastodon::UnexpectedResponseError, response unless response_successful? response | |||
end | |||
raise Mastodon::UnexpectedResponseError, @response unless response_successful? | |||
@response.connection&.close | |||
failure_tracker.track_success! | |||
rescue => e | |||
failure_tracker.track_failure! | |||
@@ -31,12 +30,12 @@ class ActivityPub::DeliveryWorker | |||
request.add_headers(HEADERS) | |||
end | |||
def perform_request | |||
@response = build_request.perform | |||
def perform_request(&block) | |||
build_request.perform(&block) | |||
end | |||
def response_successful? | |||
@response.code > 199 && @response.code < 300 | |||
def response_successful?(response) | |||
response.code > 199 && response.code < 300 | |||
end | |||
def failure_tracker | |||
@@ -21,8 +21,8 @@ class Pubsubhubbub::ConfirmationWorker | |||
def process_confirmation | |||
prepare_subscription | |||
confirm_callback | |||
logger.debug "Confirming PuSH subscription for #{subscription.callback_url} with challenge #{challenge}: #{callback_response_body}" | |||
callback_get_with_params | |||
logger.debug "Confirming PuSH subscription for #{subscription.callback_url} with challenge #{challenge}: #{@callback_response_body}" | |||
update_subscription | |||
end | |||
@@ -44,7 +44,7 @@ class Pubsubhubbub::ConfirmationWorker | |||
end | |||
def response_matches_challenge? | |||
callback_response_body == challenge | |||
@callback_response_body == challenge | |||
end | |||
def subscribing? | |||
@@ -55,16 +55,10 @@ class Pubsubhubbub::ConfirmationWorker | |||
mode == 'unsubscribe' | |||
end | |||
def confirm_callback | |||
@_confirm_callback ||= callback_get_with_params | |||
end | |||
def callback_get_with_params | |||
Request.new(:get, subscription.callback_url, params: callback_params).perform | |||
end | |||
def callback_response_body | |||
confirm_callback.body.to_s | |||
Request.new(:get, subscription.callback_url, params: callback_params).perform do |response| | |||
@callback_response_body = response.body.to_s | |||
end | |||
end | |||
def callback_params | |||
@@ -23,22 +23,17 @@ class Pubsubhubbub::DeliveryWorker | |||
private | |||
def process_delivery | |||
payload_delivery | |||
callback_post_payload do |payload_delivery| | |||
raise Mastodon::UnexpectedResponseError, payload_delivery unless response_successful? payload_delivery | |||
end | |||
raise Mastodon::UnexpectedResponseError, payload_delivery unless response_successful? | |||
payload_delivery.connection&.close | |||
subscription.touch(:last_successful_delivery_at) | |||
end | |||
def payload_delivery | |||
@_payload_delivery ||= callback_post_payload | |||
end | |||
def callback_post_payload | |||
def callback_post_payload(&block) | |||
request = Request.new(:post, subscription.callback_url, body: payload) | |||
request.add_headers(headers) | |||
request.perform | |||
request.perform(&block) | |||
end | |||
def blocked_domain? | |||
@@ -80,7 +75,7 @@ class Pubsubhubbub::DeliveryWorker | |||
OpenSSL::HMAC.hexdigest(OpenSSL::Digest.new('sha1'), subscription.secret, payload) | |||
end | |||
def response_successful? | |||
def response_successful?(payload_delivery) | |||
payload_delivery.code > 199 && payload_delivery.code < 300 | |||
end | |||
end |
@@ -777,7 +777,7 @@ namespace :mastodon do | |||
progress_bar.increment | |||
begin | |||
res = Request.new(:head, account.uri).perform | |||
code = Request.new(:head, account.uri).perform(&:code) | |||
rescue StandardError | |||
# This could happen due to network timeout, DNS timeout, wrong SSL cert, etc, | |||
# which should probably not lead to perceiving the account as deleted, so | |||
@@ -785,7 +785,7 @@ namespace :mastodon do | |||
next | |||
end | |||
if [404, 410].include?(res.code) | |||
if [404, 410].include?(code) | |||
if options[:force] | |||
SuspendAccountService.new.call(account) | |||
account.destroy | |||
@@ -39,12 +39,10 @@ describe Request do | |||
describe '#perform' do | |||
context 'with valid host' do | |||
before do | |||
stub_request(:get, 'http://example.com') | |||
subject.perform | |||
end | |||
before { stub_request(:get, 'http://example.com') } | |||
it 'executes a HTTP request' do | |||
expect { |block| subject.perform &block }.to yield_control | |||
expect(a_request(:get, 'http://example.com')).to have_been_made.once | |||
end | |||
@@ -52,12 +50,20 @@ describe Request do | |||
allow(Addrinfo).to receive(:foreach).with('example.com', nil, nil, :SOCK_STREAM) | |||
.and_yield(Addrinfo.new(["AF_INET", 0, "example.com", "0.0.0.0"], :PF_INET, :SOCK_STREAM)) | |||
.and_yield(Addrinfo.new(["AF_INET6", 0, "example.com", "2001:4860:4860::8844"], :PF_INET6, :SOCK_STREAM)) | |||
expect { |block| subject.perform &block }.to yield_control | |||
expect(a_request(:get, 'http://example.com')).to have_been_made.once | |||
end | |||
it 'sets headers' do | |||
expect { |block| subject.perform &block }.to yield_control | |||
expect(a_request(:get, 'http://example.com').with(headers: subject.headers)).to have_been_made | |||
end | |||
it 'closes underlaying connection' do | |||
expect_any_instance_of(HTTP::Client).to receive(:close) | |||
expect { |block| subject.perform &block }.to yield_control | |||
end | |||
end | |||
context 'with private host' do | |||