Class: Chan::UNIXSocket
- Inherits: Object
- Defined in: lib/xchan/unix_socket.rb
Overview
An easy-to-use InterProcess Communication (IPC) library
Instance Attribute Summary
- #r ⇒ UNIXSocket (readonly)
  Returns a socket used for read operations.
- #w ⇒ UNIXSocket (readonly)
  Returns a socket used for write operations.
- #s ⇒ <#dump, #load> (also: #serializer) (readonly)
  Returns the serializer used by the channel.
Write methods
- #send(object) ⇒ Object (also: #write)
  Performs a blocking write.
- #send_nonblock(object) ⇒ Integer? (also: #write_nonblock)
  Performs a non-blocking write.
Read methods
- #recv ⇒ Object (also: #read)
  Performs a blocking read.
- #recv_nonblock ⇒ Object (also: #read_nonblock)
  Performs a non-blocking read.
Stat methods
- #bytes_sent ⇒ Integer (also: #bytes_written)
  Returns the total number of bytes written to the channel.
- #bytes_received ⇒ Integer (also: #bytes_read)
  Returns the total number of bytes read from the channel.
- #size ⇒ Integer
  Returns the number of objects waiting to be read.
Wait methods
- #wait_writable(s = nil) ⇒ Chan::UNIXSocket?
  Waits for the channel to become writable.
- #wait_readable(s = nil) ⇒ Chan::UNIXSocket?
  Waits for the channel to become readable.
Instance Method Summary
- #closed? ⇒ Boolean
  Returns true when the channel is closed.
- #close ⇒ void
  Closes the channel.
- #to_a ⇒ Array<Object>
  Returns the contents of the channel.
- #empty? ⇒ Boolean
  Returns true when the channel is empty.
- #initialize(serializer, sock: Socket::SOCK_DGRAM, tmpdir: Dir.tmpdir, lock: :file) ⇒ Chan::UNIXSocket (constructor)
  Returns an instance of Chan::UNIXSocket.
Constructor Details
#initialize(serializer, sock: Socket::SOCK_DGRAM, tmpdir: Dir.tmpdir, lock: :file) ⇒ Chan::UNIXSocket
Returns an instance of Chan::UNIXSocket
# File 'lib/xchan/unix_socket.rb', line 42
def initialize(serializer, sock: Socket::SOCK_DGRAM, tmpdir: Dir.tmpdir, lock: :file)
  @s = Chan.serializers[serializer]&.call || serializer
  @r, @w = ::UNIXSocket.pair(sock)
  @bytes = Chan::Bytes.new(tmpdir)
  @counter = Chan::Counter.new(tmpdir)
  @lock = Chan.locks[lock]&.call(tmpdir) || lock
end
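The constructor pairs two sockets with `::UNIXSocket.pair(sock)`, defaulting to `Socket::SOCK_DGRAM`. The following is a minimal sketch of that building block using only the standard library. It does not use the xchan API, and `Marshal` merely stands in for an object that responds to `#dump` and `#load`.

```ruby
require "socket"

# Create a connected pair of datagram sockets, the same call the
# constructor makes with its default sock: argument.
r, w = UNIXSocket.pair(Socket::SOCK_DGRAM)

# Assumption: Marshal is used here only as an example serializer.
serializer = Marshal

# Each write on a datagram socket is delivered as one message,
# so the boundaries between serialized objects are preserved.
w.write(serializer.dump({id: 1}))
w.write(serializer.dump([2, 3]))

# Each recv call returns a single datagram, up to the given size.
obj1 = serializer.load(r.recv(65_536))
obj2 = serializer.load(r.recv(65_536))

r.close
w.close
```

With a stream socket the two payloads could arrive merged, which is presumably why the channel also tracks the byte length of each write.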
Instance Attribute Details
#r ⇒ UNIXSocket (readonly)
Returns a socket used for read operations
# File 'lib/xchan/unix_socket.rb', line 13
def r
  @r
end
#w ⇒ UNIXSocket (readonly)
Returns a socket used for write operations
# File 'lib/xchan/unix_socket.rb', line 18
def w
  @w
end
#s ⇒ <#dump, #load> (readonly) Also known as: serializer
Returns the serializer used by the channel
# File 'lib/xchan/unix_socket.rb', line 23
def s
  @s
end
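The return type `<#dump, #load>` describes a duck type: any object that responds to both methods. A sketch of such an object, assuming a hypothetical `CompactJSON` module that is not part of xchan:

```ruby
require "json"

# Hypothetical serializer: it only needs to respond to dump and load.
module CompactJSON
  # Convert a Ruby object into a String suitable for writing to a socket.
  def self.dump(object)
    JSON.generate(object)
  end

  # Convert a String read from a socket back into a Ruby object.
  def self.load(string)
    JSON.parse(string)
  end
end

payload = CompactJSON.dump({"name" => "worker", "pid" => 42})
restored = CompactJSON.load(payload)
```

Judging from the constructor source, a value that is not a key in `Chan.serializers` appears to be used as the serializer directly, so an object like this could plausibly be passed as the first argument.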
Instance Method Details
#wait_writable(s = nil) ⇒ Chan::UNIXSocket?
Waits up to s seconds (indefinitely when s is nil) for the channel to become writable. Returns the channel, or nil if the wait times out.
# File 'lib/xchan/unix_socket.rb', line 239
def wait_writable(s = nil)
  @w.wait_writable(s) and self
end
#closed? ⇒ Boolean
Returns true when both the read and write sockets are closed.
# File 'lib/xchan/unix_socket.rb', line 53
def closed?
  @r.closed? and @w.closed?
end
#close ⇒ void
This method returns an undefined value.
Closes the channel
# File 'lib/xchan/unix_socket.rb', line 62
def close
  @lock.lock
  raise IOError, "channel is closed" if closed?
  [@r, @w, @bytes, @lock].each(&:close)
rescue IOError => ex
  @lock.release
  raise(ex)
end
#send(object) ⇒ Object Also known as: write
Performs a blocking write
# File 'lib/xchan/unix_socket.rb', line 82
def send(object)
  send_nonblock(object)
rescue Chan::WaitWritable, Chan::WaitLockable
  retry
end
#send_nonblock(object) ⇒ Integer? Also known as: write_nonblock
Performs a non-blocking write
# File 'lib/xchan/unix_socket.rb', line 101
def send_nonblock(object)
  @lock.lock_nonblock
  raise IOError, "channel closed" if closed?
  len = @w.write_nonblock(serialize(object))
  @bytes.push(len)
  @counter.increment!(bytes_written: len)
  len.tap { @lock.release }
rescue IOError, IO::WaitWritable, Errno::ENOBUFS => ex
  @lock.release
  raise Chan::WaitWritable, ex.message
rescue Errno::EWOULDBLOCK => ex
  raise Chan::WaitLockable, ex.message
end
#recv ⇒ Object Also known as: read
Performs a blocking read
# File 'lib/xchan/unix_socket.rb', line 128
def recv
  recv_nonblock
rescue Chan::WaitReadable
  wait_readable
  retry
rescue Chan::WaitLockable
  retry
end
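The source above builds a blocking read out of a non-blocking one: attempt the read, wait for readability when no data is available, then retry. The same pattern can be sketched on a plain socket with the standard library. `blocking_read` is a hypothetical helper and not part of xchan.

```ruby
require "socket"
require "io/wait"

# Hypothetical helper: a blocking read built from a non-blocking read.
def blocking_read(io, maxlen)
  io.read_nonblock(maxlen)
rescue IO::WaitReadable
  # No data yet: sleep until the socket becomes readable, then try again.
  io.wait_readable
  retry
end

r, w = UNIXSocket.pair(Socket::SOCK_DGRAM)

# Write from another thread after a short delay, so the first
# read attempt finds the socket empty and has to wait.
writer = Thread.new do
  sleep 0.05
  w.write("hello")
end

message = blocking_read(r, 1024)
writer.join

r.close
w.close
```

Waiting before the retry avoids spinning the CPU while the socket is empty.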
#recv_nonblock ⇒ Object Also known as: read_nonblock
Performs a non-blocking read
# File 'lib/xchan/unix_socket.rb', line 148
def recv_nonblock
  @lock.lock_nonblock
  raise IOError, "closed channel" if closed?
  len = @bytes.shift
  obj = deserialize(@r.read_nonblock(len.zero? ? 1 : len))
  @counter.increment!(bytes_read: len)
  obj.tap { @lock.release }
rescue IOError => ex
  @lock.release
  raise(ex)
rescue IO::WaitReadable => ex
  @bytes.unshift(len)
  @lock.release
  raise Chan::WaitReadable, ex.message
rescue Errno::EAGAIN => ex
  raise Chan::WaitLockable, ex.message
end
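The read path shifts a byte length off `@bytes` and reads that many bytes, mirroring the `@bytes.push(len)` done on write. The sketch below illustrates that bookkeeping with the standard library only. A plain Array stands in for `Chan::Bytes`, whose internals are not shown in this page, and a stream socket is used to show why the lengths matter.

```ruby
require "socket"

# A stream socket has no message boundaries, so lengths must be tracked.
r, w = UNIXSocket.pair(Socket::SOCK_STREAM)

# Assumption: an Array stands in for the channel's length queue.
lengths = []

# Writer side: record how many bytes each serialized object occupies.
[{a: 1}, "two", [3]].each do |object|
  lengths.push(w.write(Marshal.dump(object)))
end

# Reader side: shift one length, then read exactly that many bytes,
# which yields exactly one serialized object per iteration.
objects = []
until lengths.empty?
  len = lengths.shift
  objects << Marshal.load(r.read(len))
end

r.close
w.close
```

Because the queue is first-in, first-out, objects come back in the order they were written.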
#to_a ⇒ Array<Object>
Reads every object waiting in the channel and returns them as an Array, leaving the channel empty.
# File 'lib/xchan/unix_socket.rb', line 177
def to_a
  lock do
    [].tap { _1.push(recv) until empty? }
  end
end
#empty? ⇒ Boolean
Returns true when the channel is closed or has no objects waiting to be read.
# File 'lib/xchan/unix_socket.rb', line 186
def empty?
  return true if closed?
  lock { size.zero? }
end
#bytes_sent ⇒ Integer Also known as: bytes_written
Returns the total number of bytes written to the channel
# File 'lib/xchan/unix_socket.rb', line 197
def bytes_sent
  lock { @counter.bytes_written }
end
#bytes_received ⇒ Integer Also known as: bytes_read
Returns the total number of bytes read from the channel
# File 'lib/xchan/unix_socket.rb', line 205
def bytes_received
  lock { @counter.bytes_read }
end
#size ⇒ Integer
Returns the number of objects waiting to be read
# File 'lib/xchan/unix_socket.rb', line 213
def size
  lock { @bytes.size }
end
#wait_readable(s = nil) ⇒ Chan::UNIXSocket?
Waits up to s seconds (indefinitely when s is nil) for the channel to become readable. Returns the channel, or nil if the wait times out.
# File 'lib/xchan/unix_socket.rb', line 229
def wait_readable(s = nil)
  @r.wait_readable(s) and self
end
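Both wait methods delegate to the underlying socket and return the channel only when the socket reports ready. A sketch of the socket-level behaviour they rely on, using the standard library rather than xchan itself:

```ruby
require "socket"
require "io/wait"

r, w = UNIXSocket.pair(Socket::SOCK_DGRAM)

# Nothing has been written yet, so the wait times out and returns nil.
before = r.wait_readable(0.1)

w.write("ping")

# Data is pending now, so the call returns a truthy value without waiting.
after = r.wait_readable(0.1)

r.close
w.close
```

The `and self` in the wrapper turns that truthy result into the channel and passes a nil timeout result through unchanged.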