Class: Chan::UNIXSocket
- Inherits: Object
  - Object
  - Chan::UNIXSocket
- Defined in: lib/xchan/unix_socket.rb
Overview
An easy-to-use InterProcess Communication (IPC) library
Instance Attribute Summary
- #r ⇒ UNIXSocket (readonly)
  Returns a socket used for read operations.
- #w ⇒ UNIXSocket (readonly)
  Returns a socket used for write operations.
- #s ⇒ <#dump, #load> (readonly) (also: #serializer)
  Returns the serializer used by the channel.
Write methods
- #send(object) ⇒ Object (also: #write)
  Performs a blocking write.
- #send_nonblock(object) ⇒ Integer? (also: #write_nonblock)
  Performs a non-blocking write.
Read methods
- #recv ⇒ Object (also: #read)
  Performs a blocking read.
- #recv_nonblock ⇒ Object (also: #read_nonblock)
  Performs a non-blocking read.
Stat methods
- #bytes_sent ⇒ Integer (also: #bytes_written)
  Returns the total number of bytes written to the channel.
- #bytes_received ⇒ Integer (also: #bytes_read)
  Returns the total number of bytes read from the channel.
- #size ⇒ Integer
  Returns the number of objects waiting to be read.
Wait methods
- #wait_writable(s = nil) ⇒ Chan::UNIXSocket?
  Waits for the channel to become writable.
- #wait_readable(s = nil) ⇒ Chan::UNIXSocket?
  Waits for the channel to become readable.
Instance Method Summary
- #closed? ⇒ Boolean
  Returns true when the channel is closed.
- #close ⇒ void
  Closes the channel.
- #to_a ⇒ Array<Object>
  Returns the contents of the channel.
- #empty? ⇒ Boolean
  Returns true when the channel is empty.
- #initialize(s, sock_type: Socket::SOCK_DGRAM, tmpdir: Dir.tmpdir) ⇒ Chan::UNIXSocket (constructor)
  Returns an instance of Chan::UNIXSocket.
Constructor Details
#initialize(s, sock_type: Socket::SOCK_DGRAM, tmpdir: Dir.tmpdir) ⇒ Chan::UNIXSocket
Returns an instance of Chan::UNIXSocket
# File 'lib/xchan/unix_socket.rb', line 40
def initialize(s, sock_type: Socket::SOCK_DGRAM, tmpdir: Dir.tmpdir)
  @s = Chan.serializers[s]&.call || s
  @r, @w = ::UNIXSocket.pair(sock_type)
  @bytes = Chan::Bytes.new(tmpdir)
  @counter = Chan::Counter.new(tmpdir)
  @lockf = Lock::File.new Chan.temporary_file(%w[xchan .lock], tmpdir:)
end
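The constructor does two things that can be tried out with the standard library alone: it resolves the serializer (a registered name is looked up, anything else is used as given) and it creates a connected pair of sockets. The following is a minimal sketch of both steps, not xchan itself. The `registry` hash, the `resolve` lambda, and the use of Marshal are assumptions made for illustration.

```ruby
require "socket"

# Hypothetical registry: maps a name to a lambda that returns a serializer.
registry = {marshal: -> { Marshal }}

# Same shape as `Chan.serializers[s]&.call || s`: a known name is looked up,
# anything else is assumed to already respond to #dump and #load.
resolve = ->(s) { registry[s]&.call || s }
by_name = resolve.call(:marshal)
by_object = resolve.call(Marshal)

# One connected pair: the first socket plays the role of @r, the second of @w.
reader, writer = UNIXSocket.pair(Socket::SOCK_DGRAM)
message = by_name.dump({id: 1})
writer.send(message, 0)
object = by_name.load(reader.recv(message.bytesize))

reader.close
writer.close
```

With SOCK_DGRAM each write is delivered as one whole message, which is presumably why it is the default socket type here.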
Instance Attribute Details
#r ⇒ UNIXSocket (readonly)
Returns a socket used for read operations
# File 'lib/xchan/unix_socket.rb', line 13
def r
  @r
end
#w ⇒ UNIXSocket (readonly)
Returns a socket used for write operations
# File 'lib/xchan/unix_socket.rb', line 18
def w
  @w
end
#s ⇒ <#dump, #load> (readonly) Also known as: serializer
Returns the serializer used by the channel
# File 'lib/xchan/unix_socket.rb', line 23
def s
  @s
end
Instance Method Details
#wait_writable(s = nil) ⇒ Chan::UNIXSocket?
Waits for the channel to become writable
# File 'lib/xchan/unix_socket.rb', line 237
def wait_writable(s = nil)
  @w.wait_writable(s) and self
end
#closed? ⇒ Boolean
Returns true when the channel is closed
# File 'lib/xchan/unix_socket.rb', line 51
def closed?
  @r.closed? and @w.closed?
end
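Because the check joins both sockets with `and`, a channel with only one end closed does not count as closed. A small stdlib-only sketch of that behavior follows; the `both_closed` lambda is a stand-in for the method above, not part of xchan.

```ruby
require "socket"

reader, writer = UNIXSocket.pair(Socket::SOCK_DGRAM)

# Stand-in for #closed?: true only when both ends are closed.
both_closed = -> { reader.closed? and writer.closed? }

before = both_closed.call
writer.close
half = both_closed.call
reader.close
after = both_closed.call
```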
#close ⇒ void
This method returns an undefined value.
Closes the channel
# File 'lib/xchan/unix_socket.rb', line 60
def close
  @lockf.lock
  raise IOError, "channel is closed" if closed?
  [@r, @w, @bytes, @lockf].each(&:close)
rescue IOError => ex
  @lockf.release
  raise(ex)
end
#send(object) ⇒ Object Also known as: write
Performs a blocking write
# File 'lib/xchan/unix_socket.rb', line 80
def send(object)
  send_nonblock(object)
rescue Chan::WaitWritable, Chan::WaitLockable
  retry
end
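The blocking write is built from the non-blocking one: when the attempt signals that it would block, `retry` runs the method body again. The sketch below isolates that pattern. `WouldBlock` and `FlakyWriter` are hypothetical names that stand in for Chan::WaitWritable and the channel.

```ruby
# Hypothetical error class standing in for Chan::WaitWritable.
class WouldBlock < StandardError; end

# Hypothetical writer whose non-blocking write fails a fixed number of times.
class FlakyWriter
  attr_reader :attempts

  def initialize(failures)
    @failures = failures
    @attempts = 0
  end

  # Raises WouldBlock until the configured number of failures is used up.
  def write_nonblock(data)
    @attempts += 1
    raise WouldBlock, "try again" if @attempts <= @failures
    data.bytesize
  end

  # Blocking variant: re-run the method body until the write succeeds.
  def write(data)
    write_nonblock(data)
  rescue WouldBlock
    retry
  end
end

writer = FlakyWriter.new(3)
written = writer.write("hello")
```

A retry loop of this kind spins until the write goes through. In the source, #recv calls wait_readable before retrying, while #send retries immediately.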
#send_nonblock(object) ⇒ Integer? Also known as: write_nonblock
Performs a non-blocking write
# File 'lib/xchan/unix_socket.rb', line 99
def send_nonblock(object)
  @lockf.lock_nonblock
  raise IOError, "channel closed" if closed?
  len = @w.write_nonblock(serialize(object))
  @bytes.push(len)
  @counter.increment!(bytes_written: len)
  len.tap { @lockf.release }
rescue IOError, IO::WaitWritable, Errno::ENOBUFS => ex
  @lockf.release
  raise Chan::WaitWritable, ex.message
rescue Errno::EWOULDBLOCK => ex
  raise Chan::WaitLockable, ex.message
end
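The first rescue clause groups IO::WaitWritable with Errno::ENOBUFS, which suggests that a full socket buffer can surface as either error depending on the platform. The stdlib-only sketch below writes to a socket pair without ever reading until the kernel refuses another message. The payload size and the iteration cap are arbitrary choices for this sketch.

```ruby
require "socket"

reader, writer = UNIXSocket.pair(Socket::SOCK_DGRAM)
payload = "x" * 1024
sent = 0
blocked = false

# Keep writing without reading until the kernel refuses another datagram.
# The 10_000 cap is an arbitrary safety limit for this sketch.
10_000.times do
  writer.write_nonblock(payload)
  sent += 1
rescue IO::WaitWritable, Errno::ENOBUFS
  blocked = true
  break
end

reader.close
writer.close
```

How many messages fit before the write is refused depends on the operating system and its socket buffer settings.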
#recv ⇒ Object Also known as: read
Performs a blocking read
# File 'lib/xchan/unix_socket.rb', line 126
def recv
  recv_nonblock
rescue Chan::WaitReadable
  wait_readable
  retry
rescue Chan::WaitLockable
  retry
end
#recv_nonblock ⇒ Object Also known as: read_nonblock
Performs a non-blocking read
# File 'lib/xchan/unix_socket.rb', line 146
def recv_nonblock
  @lockf.lock_nonblock
  raise IOError, "closed channel" if closed?
  len = @bytes.shift
  obj = deserialize(@r.read_nonblock(len.zero? ? 1 : len))
  @counter.increment!(bytes_read: len)
  obj.tap { @lockf.release }
rescue IOError => ex
  @lockf.release
  raise(ex)
rescue IO::WaitReadable => ex
  @bytes.unshift(len)
  @lockf.release
  raise Chan::WaitReadable, ex.message
rescue Errno::EAGAIN => ex
  raise Chan::WaitLockable, ex.message
end
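The read path depends on knowing how many bytes the next object occupies: the length recorded at write time is shifted off a queue and passed to read_nonblock. A stdlib-only sketch of that bookkeeping follows. The in-memory `lengths` array stands in for the channel's byte-length store, and Marshal is an assumed serializer.

```ruby
require "socket"

reader, writer = UNIXSocket.pair(Socket::SOCK_DGRAM)
lengths = [] # in-memory stand-in for the channel's byte-length queue

# Write two serialized objects and remember how long each one is.
[{name: "a"}, [1, 2]].each do |object|
  lengths.push(writer.write_nonblock(Marshal.dump(object)))
end

# Read each object back using the recorded length.
objects = []
until lengths.empty?
  len = lengths.shift
  objects << Marshal.load(reader.read_nonblock(len))
end

# With nothing left to read, read_nonblock signals IO::WaitReadable.
would_block =
  begin
    reader.read_nonblock(1)
    false
  rescue IO::WaitReadable
    true
  end

reader.close
writer.close
```

In the method above, the same IO::WaitReadable case puts the length back on the queue with unshift before re-raising, so a later read can still find it.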
#to_a ⇒ Array<Object>
Returns the contents of the channel
# File 'lib/xchan/unix_socket.rb', line 175
def to_a
  lock do
    [].tap { _1.push(recv) until empty? }
  end
end
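Since the array is filled by calling recv until the channel reports empty, reading the contents this way also consumes them. The sketch below shows the same drain idiom on a hypothetical in-memory `TinyQueue` class, with locking left out.

```ruby
# Hypothetical in-memory stand-in for a channel; no locking, no sockets.
class TinyQueue
  def initialize(items)
    @items = items.dup
  end

  def recv
    @items.shift
  end

  def empty?
    @items.empty?
  end

  # Same idiom as the method above: receive until nothing is left.
  def to_a
    [].tap { _1.push(recv) until empty? }
  end
end

queue = TinyQueue.new([1, 2, 3])
contents = queue.to_a
drained = queue.empty?
```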
#empty? ⇒ Boolean
Returns true when the channel is empty
# File 'lib/xchan/unix_socket.rb', line 184
def empty?
  return true if closed?
  lock { size.zero? }
end
#bytes_sent ⇒ Integer Also known as: bytes_written
Returns the total number of bytes written to the channel
# File 'lib/xchan/unix_socket.rb', line 195
def bytes_sent
  lock { @counter.bytes_written }
end
#bytes_received ⇒ Integer Also known as: bytes_read
Returns the total number of bytes read from the channel
# File 'lib/xchan/unix_socket.rb', line 203
def bytes_received
  lock { @counter.bytes_read }
end
#size ⇒ Integer
Returns the number of objects waiting to be read
# File 'lib/xchan/unix_socket.rb', line 211
def size
  lock { @bytes.size }
end
#wait_readable(s = nil) ⇒ Chan::UNIXSocket?
Waits for the channel to become readable
# File 'lib/xchan/unix_socket.rb', line 227
def wait_readable(s = nil)
  @r.wait_readable(s) and self
end
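The `... and self` form explains the optional return type: when the underlying wait times out and returns nil, the method returns nil as well, otherwise it returns the channel. The stdlib-only sketch below shows the same idiom on a plain socket. The `wait` lambda and the 0.05 second timeout are illustrative choices; wait_writable follows the same pattern on the write socket.

```ruby
require "socket"
require "io/wait"

reader, writer = UNIXSocket.pair(Socket::SOCK_DGRAM)

# Mirrors the `... and self` idiom: a truthy wait result is replaced by the
# receiver, while nil (timeout) is passed through unchanged.
wait = ->(io, timeout) { io.wait_readable(timeout) and io }

timed_out = wait.call(reader, 0.05) # nothing has been written yet
writer.send("ping", 0)
ready = wait.call(reader, 0.05)

reader.close
writer.close
```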