Skip to content

Commit 25ccddc

Browse files
authored
Allow TimerSet to safely handle an executor raising RejectedExecutionError (#999)
1 parent dadc2ad commit 25ccddc

File tree

2 files changed

+20
-2
lines changed

2 files changed

+20
-2
lines changed

lib/concurrent-ruby/concurrent/executor/timer_set.rb

+6-2
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
require 'concurrent/collection/non_concurrent_priority_queue'
44
require 'concurrent/executor/executor_service'
55
require 'concurrent/executor/single_thread_executor'
6-
6+
require 'concurrent/errors'
77
require 'concurrent/options'
88

99
module Concurrent
@@ -162,7 +162,11 @@ def process_tasks
162162
# queue now must have the same pop time, or a closer one, as
163163
# when we peeked).
164164
task = synchronize { @queue.pop }
165-
task.executor.post { task.process_task }
165+
begin
166+
task.executor.post { task.process_task }
167+
rescue RejectedExecutionError
168+
# ignore and continue
169+
end
166170
else
167171
@condition.wait([diff, 60].min)
168172
end

spec/concurrent/executor/timer_set_spec.rb

+14
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,20 @@ module Concurrent
122122
expect(task.value).to eq i
123123
end
124124
end
125+
126+
it 'safely handles an executor raising RejectedExecutionError' do
127+
# force a task's executor to raise RejectedExecutionError within the TimerSet
128+
abort_executor = ImmediateExecutor.new
129+
allow(abort_executor).to receive(:post).and_raise(Concurrent::RejectedExecutionError)
130+
ScheduledTask.execute(0.2, executor: abort_executor, timer_set: subject){ nil }
131+
abort_executor.shutdown
132+
133+
latch = CountDownLatch.new(1)
134+
ScheduledTask.execute(0.3, timer_set: subject) do
135+
latch.count_down
136+
end
137+
expect(latch.wait(1)).to be_truthy
138+
end
125139
end
126140

127141
context 'resolution' do

0 commit comments

Comments
 (0)