Amir Sharif
Engineer.
Weekend hacker.
Self improvement enthusiast.

Concurrency and Parallelism in Ruby on Rails

Ruby is slow, but most tasks are I/O bound (file, network, etc.), so the time goes to waiting rather than computing. To make Ruby faster, split your work across different threads or processes.
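To make the I/O-bound point concrete, here is a minimal sketch that compares running a few slow calls one after another with running them in threads. `fake_io_call` is a hypothetical stand-in for a real network or file call, and the 0.2 second delay is an arbitrary assumption.

```ruby
# Hypothetical stand-in for an I/O-bound call; sleep releases the GVL
# the same way a blocking network or file read does.
def fake_io_call(id)
  sleep 0.2
  "response #{id}"
end

def fetch_all_sequentially(ids)
  ids.map { |id| fake_io_call(id) }
end

def fetch_all_in_threads(ids)
  threads = ids.map { |id| Thread.new { fake_io_call(id) } }
  threads.map(&:value) # value joins each thread and returns its result
end

# Wall-clock seconds taken by the block.
def seconds_for
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
end

ids = [1, 2, 3, 4]
puts format('sequential: %.2fs', seconds_for { fetch_all_sequentially(ids) })
puts format('threaded:   %.2fs', seconds_for { fetch_all_in_threads(ids) })
```

The threaded version should take roughly as long as one call, because the threads spend their time waiting rather than competing for the interpreter lock. CPU-bound work would not speed up this way on MRI.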

Vanilla Threads

  • You can use vanilla Ruby threads for simple tasks.
# Make threads bubble up exceptions. By default, they won't break the main process.
Thread.abort_on_exception = true 

thread =
  Thread.new do
    begin
      # do a thing like a network call
    ensure
      File.delete(LOCKFILE)
    end
  end
# Make ONLY this thread abort if you didn't set the default.
thread.abort_on_exception = true 
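
If you leave abort_on_exception off, the failure is not lost: it stays inside the thread until something joins it. A small sketch of that behaviour, using hypothetical helper names (`failing_thread`, `join_and_capture`):

```ruby
# Silence the default stderr report so the output stays readable.
Thread.report_on_exception = false

# Hypothetical helper: a thread that fails straight away.
def failing_thread
  Thread.new { raise ArgumentError, 'boom' }
end

# Hypothetical helper: join the thread and report what happened.
def join_and_capture(thread)
  thread.join
  :finished
rescue ArgumentError => e
  # join (and value) re-raise the thread's exception in the caller.
  "re-raised: #{e.message}"
end

puts join_and_capture(failing_thread) # prints "re-raised: boom"
```

So a thread you never join can fail silently, which is the usual reason to switch abort_on_exception on.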

Use Parallel for most tasks

Parallel wraps a bunch of concurrent/parallel features in a really nice interface. See below for some common snippets.

results = Parallel.map(['a','b','c'], in_threads: 2) do |one_letter|
  expensive_calculation(one_letter)
end
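
To get a feel for what in_threads buys you, here is a rough stdlib-only sketch of an order-preserving threaded map. `threaded_map` is a hypothetical helper written for illustration; it is not how the Parallel gem is implemented.

```ruby
# A fixed number of worker threads pull jobs from a queue and write each
# result into the slot matching the item's original position.
def threaded_map(items, in_threads:)
  jobs = Queue.new
  items.each_with_index { |item, index| jobs << [item, index] }
  jobs.close # pop returns nil once the queue is closed and drained

  results = Array.new(items.size)
  workers = Array.new(in_threads) do
    Thread.new do
      while (job = jobs.pop)
        item, index = job
        results[index] = yield(item) # each slot is written by one thread only
      end
    end
  end
  workers.each(&:join)
  results
end

p threaded_map(%w[a b c], in_threads: 2) { |letter| letter.upcase } # ["A", "B", "C"]
```

Parallel adds a lot on top of this (processes, progress bars, error handling, early stop), which is why it is the better default.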


# After each iteration, it calls the lambda. Keep going until you reach Parallel::Stop.
items = [...]
Parallel.each( -> { items.pop || Parallel::Stop }) { |number| ... }

# Run in 8 processes and show a progress bar (needs the ruby-progressbar gem).
Parallel.each([], in_processes: 8, progress: "Doing stuff") { |item| ... }

# Make sure to use Concurrent structures!
Concurrent::Hash
Concurrent::Set
Concurrent::Array
# Do NOT use += operations on these. Use a.concat(b)
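
Why the warning about +=? It does not modify the collection. It builds a new object and rebinds the variable, so other threads holding the original never see the change. The sketch below uses a plain Array so it runs without any gems; the rebinding works the same way for the Concurrent structures. The helper names are hypothetical.

```ruby
def append_with_plus(list, extra)
  original = list
  list += extra          # builds a new array and rebinds the local name
  list.equal?(original)  # false: other holders still see the old object
end

def append_with_concat(list, extra)
  original = list
  list.concat(extra)     # mutates the receiver in place
  list.equal?(original)  # true: everyone shares the updated object
end

p append_with_plus([1], [2])   # false
p append_with_concat([1], [2]) # true
```

On top of that, += is a read followed by an assignment, so two threads doing it at once can overwrite each other's result.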

Use Concurrent Ruby

mod = Module.new do
  extend Concurrent::Promises::FactoryMethods
  @thread_pool = Concurrent::FixedThreadPool.new(5)
  def self.default_executor
    @thread_pool
  end
end 
mod.future { 1 }.default_executor        # => the FixedThreadPool defined above
Concurrent::Promises.future { 1 }.default_executor # => :io
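pool = Concurrent::FixedThreadPool.new(5)

# Without an executor, promises run on the global I/O pool, which adds threads as needed.
100.times { Concurrent::Promise.execute { sleep(10); puts 'Done' } }
Thread.list.count # Climbs as promises pile up.

# With a fixed pool, at most 5 threads do the work and the remaining blocks queue.
500.times { Concurrent::Promise.execute(executor: pool) { puts "Hi" } }
Thread.list.count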
# Simple promise
timeout = Concurrent::Promise.new do
  
end

# Returns 0, immediately executes.
Concurrent::Future.execute { 0 }.value!
# Adds a timeout and will return nil.
Concurrent::Future.execute { sleep 5; 0 }.value!(1)

# Executes immediately. Errors are captured rather than raised, so .value will be nil.
r = Concurrent::Promise.execute do
  sleep 2
  fail 'Something went wrong'
end
r.state  # The current state. Check if it's :fulfilled (here it ends up :rejected).
r.value  # The final value of the execution. This will BLOCK the calling thread until the promise settles.
r.reason # The captured exception when the promise is rejected.

# Common pattern:
(
  first_request = Concurrent::Promise.execute { sleep(rand(5)); 'first' }
  second_request = Concurrent::Promise.execute { sleep(rand(5)); 'second' }
  Concurrent::Promise.zip(first_request, second_request).value! # ['first', 'second']
)

promises = 10.times.map { Concurrent::Promise.execute { sleep 5; Company.first } }
Thread.list.size

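# Check out the connection inside the promise. A connection belongs to the thread that
# checks it out, so wrapping the outside of Promise.execute does nothing for the worker.
Concurrent::Promise.execute do
  Rails.application.executor.wrap do
    ActiveRecord::Base.connection_pool.with_connection do
      Company.first
    end
  end
end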

thread_pool = Concurrent::FixedThreadPool.new(4)
executors =
  posts.map do |post|
    Concurrent::Future.execute(executor: thread_pool) do
      SMUtils.fetch_user_from_post(post)
    end
  end

executors.map(&:value!).each_with_index do |user_info, index|
  posts[index]['user'] = user_info
end

futures = []
job.hashtags.each do |hashtag|
  futures << Concurrent::Promises.future do
    Rails.application.executor.wrap do
      InstagramWebClient.search_for_hashtag(hashtag, only_top: true)
    end
  end
end
all_posts = Concurrent::Promises.zip(*futures).value!

# value! re-raises any exception that was raised inside a future:
executors.map(&:value!)

> ZeroDivisionError: divided by 0
> from (pry):4:in `/'

def get_all_conversations
  groups_promise = Concurrent::Promise.execute do
    get_groups_list
  end

  channels_promise = Concurrent::Promise.execute do
    get_channels_list
  end

  [groups_promise, channels_promise].map(&:value!).flatten
end

Concurrency in Rails

Concurrent work inside Rails has extra implications: autoloading, the database connection pool, and per-request state all assume that Rails knows about your threads. Read up on Threading & Code Execution in the Rails docs.

Wrap your Parallel work in permit_concurrent_loads

data = [1, 2, 3] 
ActiveSupport::Dependencies.interlock.permit_concurrent_loads do
  Parallel.map(data, in_threads: 3) do |item|
    Rails.application.executor.wrap do
      puts "ParallelMapper instance: work_in_threads: #{item}"
      collaborator_one = CollaboratorOne.new(item)
      collaborator_one.perform
    end
  end
end

Deadlocks in Rails

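Deadlocks are a nightmare to debug, but you can visit the locks endpoint: add the ActionDispatch::DebugLocks middleware, restart, reproduce the deadlock, and /rails/locks will list the threads the interlock knows about, the lock each one holds or is waiting for, and its backtrace.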

For a given process, you can also print the backtrace of every thread. This snippet is from Thoughtbot.

# Dump every thread's backtrace when the process receives USR1,
# e.g. by running `kill -USR1 <pid>` from another shell.
trap 'USR1' do
  threads = Thread.list

  puts
  puts "=" * 80
  puts "Received USR1 signal; printing all #{threads.count} thread backtraces."

  threads.each do |thr|
    description = thr == Thread.main ? "Main thread" : thr.inspect
    puts
    puts "#{description} backtrace: "
    puts thr.backtrace.join("\n")
  end

  puts "=" * 80
end

Fibres

Fibres are more lightweight and memory efficient than threads, but they are scheduled cooperatively (your code decides when to switch), which makes them harder to use. You probably won’t need them.
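
For a taste of how that feels, here is a minimal sketch of a fibre used as a generator. `countdown_fiber` is a hypothetical name. The block only runs when resumed and pauses at every Fiber.yield.

```ruby
# Build a fibre that hands back one number per resume.
def countdown_fiber(from)
  Fiber.new do
    from.downto(1) { |n| Fiber.yield n }
    :done # the block's return value is the result of the final resume
  end
end

fiber = countdown_fiber(3)
values = Array.new(4) { fiber.resume }
p values # [3, 2, 1, :done]
```

Nothing runs in the background here: control moves only at resume and Fiber.yield, which is what makes fibres cheap and also what makes them awkward for general-purpose concurrency.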

Ractor

Ractors give Ruby true parallelism without multiple processes: each Ractor has its own interpreter lock, so the GIL no longer serialises CPU-bound work across them. Mostly experimental right now.
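
A minimal sketch of CPU-bound work in Ractors, assuming Ruby 3.0 or newer. The helper names are hypothetical. How you collect a result has changed between Ruby versions (older releases use take, newer ones value), so the sketch checks for both rather than assuming one.

```ruby
# Sum of squares computed inside a Ractor. The block cannot see outer
# local variables, so the input is passed in as an argument.
def sum_of_squares_in_ractor(limit)
  Ractor.new(limit) { |n| (1..n).sum { |i| i * i } }
end

# Collect the block's result with whichever method this Ruby provides.
def ractor_result(ractor)
  ractor.respond_to?(:value) ? ractor.value : ractor.take
end

ractors = [10, 100].map { |limit| sum_of_squares_in_ractor(limit) }
p ractors.map { |r| ractor_result(r) } # [385, 338350]
```

Expect a warning on stderr saying that Ractor is experimental, and expect most gems not to be Ractor-safe yet.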


Date
April 25, 2023