Class: LLM::Stream::Queue
- Inherits:
-
Object
- Object
- LLM::Stream::Queue
- Defined in:
- lib/llm/stream/queue.rb
Overview
A small queue for collecting streamed tool work. Values can be immediate Function::Return objects or concurrent handles returned by Function#spawn. Calling #wait(strategy) resolves queued work and returns an array of Function::Return values.
Instance Method Summary collapse
- #initialize(stream) ⇒ LLM::Stream::Queue constructor
-
#<<(item)
⇒ LLM::Stream::Queue
Enqueue a function return or spawned task.
-
#empty? ⇒
Boolean
Returns true when the queue is empty.
-
#wait(strategy) ⇒
Array<LLM::Function::Return> (also: #value)
Waits for queued work to finish and returns function results.
Constructor Details
#initialize(stream) ⇒ LLM::Stream::Queue
13 14 15 16 |
# File 'lib/llm/stream/queue.rb', line 13 def initialize(stream) @stream = stream @items = [] end |
Instance Method Details
#<<(item) ⇒ LLM::Stream::Queue
Enqueue a function return or spawned task.
22 23 24 25 |
# File 'lib/llm/stream/queue.rb', line 22 def <<(item) @items << item self end |
#empty? ⇒ Boolean
Returns true when the queue is empty.
30 31 32 |
# File 'lib/llm/stream/queue.rb', line 30 def empty? @items.empty? end |
#wait(strategy) ⇒ Array<LLM::Function::Return> Also known as: value
Waits for queued work to finish and returns function results.
42 43 44 45 46 47 48 49 50 51 |
# File 'lib/llm/stream/queue.rb', line 42 def wait(strategy) returns, tasks = @items.shift(@items.length).partition { LLM::Function::Return === _1 } results = case strategy when :thread then LLM::Function::ThreadGroup.new(tasks).wait when :task then LLM::Function::TaskGroup.new(tasks).wait when :fiber then LLM::Function::FiberGroup.new(tasks).wait else raise ArgumentError, "Unknown strategy: #{strategy.inspect}. Expected :thread, :task, or :fiber" end returns.concat fire_hooks(tasks, results) end |