Kaynağa Gözat

Avoid race condition when streaming deleted statuses (#10280)

* Avoid race condition when streaming deleted statuses

* Move redis lock to DistributionWorker to avoid extra Redis value
master
ThibG 5 yıl önce
committed by Eugen Rochko
ebeveyn
işleme
6b0eda14a1
2 değiştirilmiş dosya ile 27 ekleme ve 11 silme
  1. +20
    -10
      app/services/remove_status_service.rb
  2. +7
    -1
      app/workers/distribution_worker.rb

+ 20
- 10
app/services/remove_status_service.rb Dosyayı Görüntüle

@@ -14,16 +14,22 @@ class RemoveStatusService < BaseService
@stream_entry = status.stream_entry @stream_entry = status.stream_entry
@options = options @options = options


remove_from_self if status.account.local?
remove_from_followers
remove_from_lists
remove_from_affected
remove_reblogs
remove_from_hashtags
remove_from_public
remove_from_media if status.media_attachments.any?

@status.destroy!
RedisLock.acquire(lock_options) do |lock|
if lock.acquired?
remove_from_self if status.account.local?
remove_from_followers
remove_from_lists
remove_from_affected
remove_reblogs
remove_from_hashtags
remove_from_public
remove_from_media if status.media_attachments.any?

@status.destroy!
else
raise Mastodon::RaceConditionError
end
end


# There is no reason to send out Undo activities when the # There is no reason to send out Undo activities when the
# cause is that the original object has been removed, since # cause is that the original object has been removed, since
@@ -156,4 +162,8 @@ class RemoveStatusService < BaseService
redis.publish('timeline:public:media', @payload) redis.publish('timeline:public:media', @payload)
redis.publish('timeline:public:local:media', @payload) if @status.local? redis.publish('timeline:public:local:media', @payload) if @status.local?
end end

def lock_options
{ redis: Redis.current, key: "distribute:#{@status.id}" }
end
end end

+ 7
- 1
app/workers/distribution_worker.rb Dosyayı Görüntüle

@@ -4,7 +4,13 @@ class DistributionWorker
include Sidekiq::Worker include Sidekiq::Worker


def perform(status_id) def perform(status_id)
FanOutOnWriteService.new.call(Status.find(status_id))
RedisLock.acquire(redis: Redis.current, key: "distribute:#{status_id}") do |lock|
if lock.acquired?
FanOutOnWriteService.new.call(Status.find(status_id))
else
raise Mastodon::RaceConditionError
end
end
rescue ActiveRecord::RecordNotFound rescue ActiveRecord::RecordNotFound
true true
end end


Yükleniyor…
İptal
Kaydet