Break API calls in work towards moving all blocking io to ractors, dropped BackgroundWorker class

This commit is contained in:
2026-01-30 19:36:11 -06:00
parent 6e79c4639d
commit f98d8c3394
7 changed files with 134 additions and 207 deletions

View File

@@ -16,6 +16,7 @@ class W3DHub
].freeze
def self.on_thread(method, *args, &callback)
raise "Renew."
BackgroundWorker.foreground_job(-> { Api.send(method, *args) }, callback)
end

View File

@@ -682,7 +682,23 @@ class W3DHub
# mark MAIN ractor's task as started before handing off to background ractor
# so that we don't start up multiple tasks at once.
task.start
BackgroundWorker.ractor_task(task)
on_ractor(task)
end
def on_ractor(task)
raise "Something has gone horribly wrong!!!" unless Ractor.main?
ractor = Ractor.new(task) do |t|
t.start
end
Thread.new do
while (message_event = ractor.take)
break unless message_event.is_a?(Task::MessageEvent)
Store.application_manager.handle_task_event(message_event)
end
end
end
def task?(type, app_id, channel)

View File

@@ -1,186 +0,0 @@
class W3DHub
class BackgroundWorker
LOG_TAG = "W3DHub::BackgroundWorker"
@@instance = nil
@@alive = false
def self.create
raise "BackgroundWorker instance already exists!" if @@instance
logger.info(LOG_TAG) { "Starting background job worker..." }
@@thread = Thread.current
@@alive = true
@@run = true
@@instance = self.new
@@instance.handle_jobs
end
def self.instance
@@instance
end
def self.run?
@@run
end
def self.alive?
@@alive
end
def self.busy?
instance&.busy?
end
def self.shutdown!
@@run = false
end
def self.kill!
@@thread.kill
@@instance.kill!
end
def self.job(job, callback, error_handler = nil, data = nil)
@@instance.add_job(Job.new(job: job, callback: callback, error_handler: error_handler, data: data))
end
def self.parallel_job(job, callback, error_handler = nil, data = nil)
@@instance.add_parallel_job(Job.new(job: job, callback: callback, error_handler: error_handler, data: data))
end
def self.foreground_job(job, callback, error_handler = nil, data = nil)
@@instance.add_job(Job.new(job: job, callback: callback, error_handler: error_handler, deliver_to_queue: true, data: data))
end
def self.foreground_parallel_job(job, callback, error_handler = nil, data = nil)
@@instance.add_parallel_job(Job.new(job: job, callback: callback, error_handler: error_handler, deliver_to_queue: true, data: data))
end
def self.ractor_task(task)
raise "Something has gone horribly wrong!!!" unless Ractor.main?
ractor = Ractor.new do
t = Ractor.receive
t.start
end
ractor.send(task)
Thread.new do
while (message_event = ractor.take)
break unless message_event.is_a?(W3DHub::ApplicationManager::Task::MessageEvent)
Store.application_manager.handle_task_event(message_event)
end
end
end
def initialize
@busy = false
@jobs = []
# Jobs which are order independent
@parallel_busy = false
@thread_pool = []
@parallel_jobs = []
end
def kill!
@thread_pool.each(&:kill)
logger.info(LOG_TAG) { "Forcefully killed background job worker." }
@@alive = false
end
def handle_jobs
8.times do |i|
Thread.new do
@thread_pool << Thread.current
while BackgroundWorker.run?
job = @parallel_jobs.shift
@parallel_busy = true
begin
job&.do
rescue => e
job&.raise_error(e)
end
@parallel_busy = !@parallel_jobs.empty?
sleep 0.1
end
end
end
Thread.new do
@thread_pool << Thread.current
while BackgroundWorker.run?
job = @jobs.shift
@busy = true
begin
job&.do
rescue => e
job&.raise_error(e)
end
@busy = !@jobs.empty?
sleep 0.1
end
logger.info(LOG_TAG) { "Stopped background job worker." }
@@alive = false
end
end
def add_job(job)
@jobs << job
end
def add_parallel_job(job)
@parallel_jobs << job
end
def busy?
@busy || @parallel_busy
end
class Job
def initialize(job:, callback:, error_handler: nil, deliver_to_queue: false, data: nil)
@job = job
@callback = callback
@error_handler = error_handler
@deliver_to_queue = deliver_to_queue
@data = data
end
def do
result = @data ? @job.call(@data) : @job.call
deliver(result)
end
def deliver(result)
if @deliver_to_queue
Store.main_thread_queue << -> { @callback.call(result) }
else
@callback.call(result)
end
end
def raise_error(error)
logger.error error
@error_handler&.call(error)
end
end
end
end

96
lib/network_manager.rb Normal file
View File

@@ -0,0 +1,96 @@
class W3DHub
# all http(s) requests for API calls and downloading images run through here
class NetworkManager
NetworkEvent = Data.define(:context, :result)
Request = Struct.new(:context, :callback)
Context = Data.define(
:request_id,
:url,
:headers,
:body,
:bearer_token
)
def initialize
@requests = {}
@ractor = Ractor.new do
raise "Something has gone quite wrong!" if Ractor.main?
queue = []
api_client = ApiClient.new
# Ractor has no concept of non-blocking send/receive... :cry:
Thread.new do
while (context = Ractor.receive) # blocking
# we cannot (easily) ensure we always are receive expected data
next unless context.is_a?(Context)
queue << context
end
end
Async do
loop do
context = queue.shift
# goto sleep for an instant if there is no work to be doing
unless context
sleep 0.1
next
end
Sync do
result = api_client.handle(context)
Ractor.yield(NetworkEvent.new(context, result))
end
end
end
end
monitor
end
def add_request(url, headers, body, bearer_token, &block)
request_id = SecureRandom.hex
@requests << Request.new(
Context.new(
request_id,
url,
headers,
body,
bearer_token
),
block
)
@ractor.send(context)
request_id
end
def monitor
raise "Something has gone quite wrong!!!" unless Ractor.main?
# Thread that spends its days sleeping **yawn**
Thread.new do
while (event = @ractor.take)
pp event
next unless event.is_a?(NetworkEvent)
request = @request.find { |r| r.context.request_id == event.context.request_id }
next if request
@requests.delete(request)
result = event.result
Store.main_thread_queue << ->(result) { request.callback(result) }
end
end
end
end
end

View File

@@ -0,0 +1,9 @@
class W3DHub
class NetworkManager
# Api reimplemented in a Ractor friendly manner
class ApiClient
def initialize
end
end
end
end

View File

@@ -6,10 +6,12 @@ class W3DHub
Store[:server_list] = []
Store[:settings] = Settings.new
Store[:network_manager] = NetworkManager.new
Store[:application_manager] = ApplicationManager.new
Store[:ping_manager] = PingManager.new
BackgroundWorker.parallel_job(-> { Async { |task| Store.ping_manager.monitor(task) } }, nil)
# FIXME
# BackgroundWorker.parallel_job(-> { Async { |task| Store.ping_manager.monitor(task) } }, nil)
Store[:main_thread_queue] = []
@@ -34,9 +36,6 @@ class W3DHub
while (block = Store.main_thread_queue.shift)
block&.call
end
# Manually sleep main thread so that the BackgroundWorker thread can be scheduled
sleep(update_interval / 1000.0) if W3DHub::BackgroundWorker.busy? || Store.application_manager.busy?
end
def needs_redraw?