Class: LLM::Stream::Queue

Inherits:
Object
Defined in:
lib/llm/stream/queue.rb

Overview

A small queue for collecting streamed tool work. Values can be immediate Function::Return objects or concurrent handles returned by Function#spawn. Calling #wait(strategy) resolves queued work and returns an array of Function::Return values.
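
The flow described above can be sketched with plain Ruby stand-ins. `FakeReturn`, `SketchQueue`, and the use of a `Thread` as the "concurrent handle" are assumptions made for illustration; they are not part of the library, and this sketch takes no strategy argument.

```ruby
# Hypothetical stand-in for LLM::Function::Return (illustration only).
FakeReturn = Struct.new(:id, :value)

# Hypothetical miniature of the queue: holds immediate results and threads.
class SketchQueue
  def initialize
    @items = []
  end

  # Enqueue an item and return the queue itself.
  def <<(item)
    @items << item
    self
  end

  def empty?
    @items.empty?
  end

  # Drains the queue; immediate results come first, then joined thread values.
  def wait
    returns, tasks = @items.shift(@items.length).partition { |item| FakeReturn === item }
    returns.concat(tasks.map(&:value)) # Thread#value joins the thread
  end
end

queue = SketchQueue.new
queue << FakeReturn.new("call_1", 1)                 # already resolved
queue << Thread.new { FakeReturn.new("call_2", 2) }  # resolved when joined
results = queue.wait
```

After `wait` returns, `results` holds one `FakeReturn` per enqueued item and the sketch queue is empty again.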

Instance Method Summary

Constructor Details

#initialize(stream) ⇒ LLM::Stream::Queue

Parameters:

  • stream

    The stream whose tool work this queue collects.

# File 'lib/llm/stream/queue.rb', line 13

def initialize(stream)
  @stream = stream
  @items = []
end

Instance Method Details

#<<(item) ⇒ LLM::Stream::Queue

Enqueue a function return or spawned task.

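Parameters:

  • item (LLM::Function::Return, or a handle returned by Function#spawn)

    The immediate result or spawned task to enqueue.

Returns:

  • (LLM::Stream::Queue)

    The queue itself, so calls can be chained.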

# File 'lib/llm/stream/queue.rb', line 22

def <<(item)
  @items << item
  self
end
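
Because `#<<` returns `self`, several items can be enqueued in one expression. The snippet below is a minimal sketch of that behaviour; `ChainQueue` and its `items` reader are hypothetical and exist only to make the example self-contained.

```ruby
# Hypothetical stand-in that copies only the #<< behaviour shown above.
class ChainQueue
  attr_reader :items # exposed here for inspection only

  def initialize
    @items = []
  end

  def <<(item)
    @items << item
    self # returning the queue is what makes chaining possible
  end
end

queue = ChainQueue.new
queue << :first << :second << :third
```

Items are stored in the order they were pushed.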

#empty? ⇒ Boolean

Returns true when the queue is empty.

Returns:

  • (Boolean)


# File 'lib/llm/stream/queue.rb', line 30

def empty?
  @items.empty?
end

#wait(strategy) ⇒ Array<LLM::Function::Return>

Also known as: value

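Drains the queue, waits for any spawned tasks to finish using the given strategy, and returns the function results. Raises ArgumentError if the strategy is not one of :thread, :task, or :fiber.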

Parameters:

  • strategy (Symbol)

    Controls concurrency strategy:

    • :thread: Use threads
    • :task: Use async tasks (requires async gem)
    • :fiber: Use raw fibers

Returns:

  • (Array<LLM::Function::Return>)

# File 'lib/llm/stream/queue.rb', line 42

def wait(strategy)
  returns, tasks = @items.shift(@items.length).partition { LLM::Function::Return === _1 }
  results = case strategy
  when :thread then LLM::Function::ThreadGroup.new(tasks).wait
  when :task then LLM::Function::TaskGroup.new(tasks).wait
  when :fiber then LLM::Function::FiberGroup.new(tasks).wait
  else raise ArgumentError, "Unknown strategy: #{strategy.inspect}. Expected :thread, :task, or :fiber"
  end
  returns.concat fire_hooks(tasks, results)
end
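
The listing above has three visible steps: drain the queue, split immediate returns from pending tasks, then dispatch on the strategy. The sketch below mirrors that shape with stand-ins. `Result` and `drain` are hypothetical names, only `:thread` is implemented (using plain `Thread` objects rather than the library's group classes), and the `fire_hooks` step is omitted because its behaviour is not shown here.

```ruby
# Hypothetical stand-in for LLM::Function::Return (illustration only).
Result = Struct.new(:value)

# Mirrors the shape of #wait: drain, partition, dispatch on strategy.
def drain(items, strategy)
  # shift(items.length) removes every element, leaving the array empty.
  returns, tasks = items.shift(items.length).partition { Result === _1 }
  results = case strategy
  when :thread then tasks.map(&:value) # Thread#value joins and returns the block's result
  else raise ArgumentError, "Unknown strategy: #{strategy.inspect}"
  end
  returns.concat(results)
end

items = [Thread.new { Result.new(:b) }, Result.new(:a)]
resolved = drain(items, :thread)

# An unrecognised strategy raises before anything is returned.
error = begin
  drain([], :bogus)
rescue ArgumentError => e
  e.message
end
```

Note that immediate returns are placed ahead of task results regardless of the order in which they were enqueued, and that the backing array is empty once the call completes.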