Class: Chan::UNIXSocket

Inherits:
Object
  • Object
show all
Defined in:
lib/xchan/unix_socket.rb

Overview

An easy-to-use InterProcess Communication (IPC) library

Instance Attribute Summary collapse

Write methods collapse

Read methods collapse

Stat methods collapse

Wait methods collapse

Instance Method Summary collapse

Constructor Details

#initialize(serializer, sock: Socket::SOCK_DGRAM, tmpdir: Dir.tmpdir, lock: :file) ⇒ Chan::UNIXSocket

Returns an instance of Chan::UNIXSocket

Examples:

ch = Chan::UNIXSocket.new(:marshal)
ch.send([1,2,3])
ch.recv.pop # => 3
ch.close

Parameters:

  • serializer (Symbol, <#dump, #load>)

    The name of a serializer

  • sock (Integer) (defaults to: Socket::SOCK_DGRAM)

    Type of socket (eg Socket::SOCK_STREAM)

  • tmpdir (String) (defaults to: Dir.tmpdir)

    Directory where temporary files can be stored

  • lock (Symbol, <Lock::File, Chan::NullLock>) (defaults to: :file)

    The name of a lock, or an instance of Lock::File, or Chan::NullLock



42
43
44
45
46
47
48
# File 'lib/xchan/unix_socket.rb', line 42

def initialize(serializer, sock: Socket::SOCK_DGRAM, tmpdir: Dir.tmpdir, lock: :file)
  @s = Chan.serializers[serializer]&.call || serializer
  @r, @w = ::UNIXSocket.pair(sock)
  @bytes = Chan::Bytes.new(tmpdir)
  @counter = Chan::Counter.new(tmpdir)
  @lock = Chan.locks[lock]&.call(tmpdir) || lock
end

Instance Attribute Details

#rUNIXSocket (readonly)

Returns a socket used for read operations

Returns:

  • (UNIXSocket)

    Returns a socket used for read operations



13
14
15
# File 'lib/xchan/unix_socket.rb', line 13

def r
  @r
end

#wUNIXSocket (readonly)

Returns a socket used for write operations

Returns:

  • (UNIXSocket)

    Returns a socket used for write operations



18
19
20
# File 'lib/xchan/unix_socket.rb', line 18

def w
  @w
end

#s<#dump, #load> (readonly) Also known as: serializer

Returns the serializer used by the channel

Returns:

  • (<#dump, #load>)

    Returns the serializer used by the channel



23
24
25
# File 'lib/xchan/unix_socket.rb', line 23

def s
  @s
end

Instance Method Details

#wait_writable(s = nil) ⇒ Chan::UNIXSocket?

Waits for the channel to become writable

Parameters:

  • s (Float, Integer, nil) (defaults to: nil)

    The number of seconds to wait. Waits indefinitely with no arguments.

Returns:

  • (Chan::UNIXSocket, nil)

    Returns self when the channel is writable, otherwise returns nil



239
240
241
# File 'lib/xchan/unix_socket.rb', line 239

def wait_writable(s = nil)
  @w.wait_writable(s) and self
end

#closed?Boolean

Returns true when the channel is closed

Returns:

  • (Boolean)

    Returns true when the channel is closed



53
54
55
# File 'lib/xchan/unix_socket.rb', line 53

def closed?
  @r.closed? and @w.closed?
end

#closevoid

This method returns an undefined value.

Closes the channel

Raises:

  • (IOError)

    When the channel is closed



62
63
64
65
66
67
68
69
# File 'lib/xchan/unix_socket.rb', line 62

def close
  @lock.lock
  raise IOError, "channel is closed" if closed?
  [@r, @w, @bytes, @lock].each(&:close)
rescue IOError => ex
  @lock.release
  raise(ex)
end

#send(object) ⇒ Object Also known as: write

Performs a blocking write

Parameters:

  • object (Object)

    An object

Returns:

  • (Object)

    Returns the number of bytes written to the channel

Raises:

  • (IOError)

    When the channel is closed



82
83
84
85
86
# File 'lib/xchan/unix_socket.rb', line 82

def send(object)
  send_nonblock(object)
rescue Chan::WaitWritable, Chan::WaitLockable
  retry
end

#send_nonblock(object) ⇒ Integer? Also known as: write_nonblock

Performs a non-blocking write

Parameters:

  • object (Object)

    An object

Returns:

  • (Integer, nil)

    Returns the number of bytes written to the channel

Raises:



101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/xchan/unix_socket.rb', line 101

def send_nonblock(object)
  @lock.lock_nonblock
  raise IOError, "channel closed" if closed?
  len = @w.write_nonblock(serialize(object))
  @bytes.push(len)
  @counter.increment!(bytes_written: len)
  len.tap { @lock.release }
rescue IOError, IO::WaitWritable, Errno::ENOBUFS => ex
  @lock.release
  raise Chan::WaitWritable, ex.message
rescue Errno::EWOULDBLOCK => ex
  raise Chan::WaitLockable, ex.message
end

#recvObject Also known as: read

Performs a blocking read

Returns:

  • (Object)

    Returns an object from the channel

Raises:

  • (IOError)

    When the channel is closed



128
129
130
131
132
133
134
135
# File 'lib/xchan/unix_socket.rb', line 128

def recv
  recv_nonblock
rescue Chan::WaitReadable
  wait_readable
  retry
rescue Chan::WaitLockable
  retry
end

#recv_nonblockObject Also known as: read_nonblock

Performs a non-blocking read

Returns:

  • (Object)

    Returns an object from the channel

Raises:



148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
# File 'lib/xchan/unix_socket.rb', line 148

def recv_nonblock
  @lock.lock_nonblock
  raise IOError, "closed channel" if closed?
  len = @bytes.shift
  obj = deserialize(@r.read_nonblock(len.zero? ? 1 : len))
  @counter.increment!(bytes_read: len)
  obj.tap { @lock.release }
rescue IOError => ex
  @lock.release
  raise(ex)
rescue IO::WaitReadable => ex
  @bytes.unshift(len)
  @lock.release
  raise Chan::WaitReadable, ex.message
rescue Errno::EAGAIN => ex
  raise Chan::WaitLockable, ex.message
end

#to_aArray<Object>

Returns the contents of the channel

Examples:

ch = xchan(:pure)
1.upto(4) { ch.send(_1) }
ch.to_a.last # => "4"

Returns:

  • (Array<Object>)

    Returns the contents of the channel



177
178
179
180
181
# File 'lib/xchan/unix_socket.rb', line 177

def to_a
  lock do
    [].tap { _1.push(recv) until empty? }
  end
end

#empty?Boolean

Returns true when the channel is empty

Returns:

  • (Boolean)

    Returns true when the channel is empty



186
187
188
189
# File 'lib/xchan/unix_socket.rb', line 186

def empty?
  return true if closed?
  lock { size.zero? }
end

#bytes_sentInteger Also known as: bytes_written

Returns the total number of bytes written to the channel

Returns:

  • (Integer)

    Returns the total number of bytes written to the channel



197
198
199
# File 'lib/xchan/unix_socket.rb', line 197

def bytes_sent
  lock { @counter.bytes_written }
end

#bytes_receivedInteger Also known as: bytes_read

Returns the total number of bytes read from the channel

Returns:

  • (Integer)

    Returns the total number of bytes read from the channel



205
206
207
# File 'lib/xchan/unix_socket.rb', line 205

def bytes_received
  lock { @counter.bytes_read }
end

#sizeInteger

Returns the number of objects waiting to be read

Returns:

  • (Integer)

    Returns the number of objects waiting to be read



213
214
215
# File 'lib/xchan/unix_socket.rb', line 213

def size
  lock { @bytes.size }
end

#wait_readable(s = nil) ⇒ Chan::UNIXSocket?

Waits for the channel to become readable

Parameters:

  • s (Float, Integer, nil) (defaults to: nil)

    The number of seconds to wait. Waits indefinitely with no arguments.

Returns:

  • (Chan::UNIXSocket, nil)

    Returns self when the channel is readable, otherwise returns nil



229
230
231
# File 'lib/xchan/unix_socket.rb', line 229

def wait_readable(s = nil)
  @r.wait_readable(s) and self
end