Class: Chan::UNIXSocket

Inherits:
Object
Defined in:
lib/xchan/unix_socket.rb

Overview

An easy-to-use InterProcess Communication (IPC) library

Constructor Details

#initialize(s, sock_type: Socket::SOCK_DGRAM, tmpdir: Dir.tmpdir) ⇒ Chan::UNIXSocket

Returns an instance of Chan::UNIXSocket

Examples:

ch = Chan::UNIXSocket.new(:marshal)
ch.send([1,2,3])
ch.recv.pop # => 3
ch.close

Parameters:

  • s (Symbol, <#dump, #load>)

    The name of a registered serializer, or an object that implements #dump and #load

  • sock_type (Integer) (defaults to: Socket::SOCK_DGRAM)

    Type of socket (e.g. Socket::SOCK_STREAM)

  • tmpdir (String) (defaults to: Dir.tmpdir)

    Directory where temporary files can be stored



# File 'lib/xchan/unix_socket.rb', line 44

def initialize(s, sock_type: Socket::SOCK_DGRAM, tmpdir: Dir.tmpdir)
  @s = Chan.serializers[s]&.call || s
  @r, @w = ::UNIXSocket.pair(sock_type)
  @bytes = Chan::Bytes.new(tmpdir)
  @counter = Chan::Counter.new(tmpdir)
  @lockf = Lock::File.new Chan.temporary_file(%w[xchan .lock], tmpdir:)
end
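
The constructor falls back to `s` itself when no registered serializer matches, so any object that responds to #dump and #load should be usable. The following is a minimal sketch of that idea using only the standard library, not the library's own code. `JSONSerializer` and `roundtrip` are hypothetical names, and a bare socket pair stands in for the channel.

```ruby
require "socket"
require "json"

# Hypothetical serializer: any object that responds to #dump and #load.
module JSONSerializer
  def self.dump(object)
    JSON.generate(object)
  end

  def self.load(string)
    JSON.parse(string)
  end
end

# Sends one object through a socket pair of the given type and reads it back.
# The pair mirrors the ::UNIXSocket.pair(sock_type) call in the constructor.
def roundtrip(object, serializer, sock_type: Socket::SOCK_DGRAM)
  r, w = UNIXSocket.pair(sock_type)
  payload = serializer.dump(object)
  w.write(payload)
  serializer.load(r.readpartial(payload.bytesize))
ensure
  [r, w].each { |io| io&.close }
end

roundtrip([1, 2, 3], JSONSerializer) # => [1, 2, 3]
roundtrip({"a" => 1}, Marshal)       # Marshal also responds to #dump and #load
```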

Instance Attribute Details

#r ⇒ UNIXSocket (readonly)

Returns a socket used for read operations

Returns:

  • (UNIXSocket)

    Returns a socket used for read operations



# File 'lib/xchan/unix_socket.rb', line 13

def r
  @r
end

#w ⇒ UNIXSocket (readonly)

Returns a socket used for write operations

Returns:

  • (UNIXSocket)

    Returns a socket used for write operations



# File 'lib/xchan/unix_socket.rb', line 18

def w
  @w
end

#s ⇒ <#dump, #load> (readonly) Also known as: serializer

Returns the serializer used by the channel

Returns:

  • (<#dump, #load>)

    Returns the serializer used by the channel



# File 'lib/xchan/unix_socket.rb', line 23

def s
  @s
end

Instance Method Details

#wait_writable(s = nil) ⇒ Chan::UNIXSocket?

Waits for the channel to become writable

Parameters:

  • s (Float, Integer, nil) (defaults to: nil)

    The number of seconds to wait. Waits indefinitely when nil or omitted.

Returns:

  • (Chan::UNIXSocket, nil)

    Returns self when the channel is writable, otherwise returns nil



# File 'lib/xchan/unix_socket.rb', line 262

def wait_writable(s = nil)
  @w.wait_writable(s) and self
end

#closed? ⇒ Boolean

Returns true when the channel is closed

Returns:

  • (Boolean)

    Returns true when the channel is closed



# File 'lib/xchan/unix_socket.rb', line 55

def closed?
  @r.closed? and @w.closed?
end

#close ⇒ void

This method returns an undefined value.

Closes the channel

Raises:

  • (IOError)

    When the channel is closed



# File 'lib/xchan/unix_socket.rb', line 66

def close
  @lockf.lock
  raise IOError, "channel is closed" if closed?
  [@r, @w, @bytes, @lockf].each(&:close)
rescue IOError => ex
  @lockf.release
  raise(ex)
end

#send(object) ⇒ Integer Also known as: write

Performs a blocking write

Parameters:

  • object (Object)

    The object to write

Returns:

  • (Integer)

    Returns the number of bytes written to the channel

Raises:

  • (IOError)

    When the channel is closed



# File 'lib/xchan/unix_socket.rb', line 89

def send(object)
  send_nonblock(object)
rescue Chan::WaitWritable, Chan::WaitLockable
  retry
end

#send_nonblock(object) ⇒ Integer? Also known as: write_nonblock

Performs a non-blocking write

Parameters:

  • object (Object)

    An object

Returns:

  • (Integer, nil)

    Returns the number of bytes written to the channel

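Raises:

  • (Chan::WaitWritable)

    When the write would block (IO::WaitWritable, Errno::ENOBUFS and IOError are re-raised as this error)

  • (Chan::WaitLockable)

    When the lock cannot be acquired without blocking (Errno::EWOULDBLOCK)
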

# File 'lib/xchan/unix_socket.rb', line 113

def send_nonblock(object)
  @lockf.lock_nonblock
  raise IOError, "channel closed" if closed?
  len = @w.write_nonblock(serialize(object))
  @bytes.push(len)
  @counter.increment!(bytes_written: len)
  len.tap { @lockf.release }
rescue IOError, IO::WaitWritable, Errno::ENOBUFS => ex
  @lockf.release
  raise Chan::WaitWritable, ex.message
rescue Errno::EWOULDBLOCK => ex
  raise Chan::WaitLockable, ex.message
end
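
The non-blocking lock step above can be approximated with the standard library. This sketch swaps in File#flock with File::LOCK_NB in place of the library's Lock::File, so it is a different locking mechanism used only to illustrate the pattern. `WaitLockable`, `lock_nonblock` and `demo_lock` are hypothetical names.

```ruby
require "tempfile"

# Hypothetical stand-in for Chan::WaitLockable.
class WaitLockable < StandardError; end

# Tries to take an exclusive advisory lock without blocking.
# File#flock returns false (it does not raise) when File::LOCK_NB is set
# and the lock is held through another open file.
def lock_nonblock(file)
  file.flock(File::LOCK_EX | File::LOCK_NB) or raise WaitLockable, "lock is held elsewhere"
end

# Opens the same file twice: the first handle takes the lock, the second is refused.
def demo_lock
  tmp = Tempfile.new("lock-sketch")
  holder = File.open(tmp.path, "r+")
  waiter = File.open(tmp.path, "r+")
  lock_nonblock(holder)
  begin
    lock_nonblock(waiter)
    :acquired
  rescue WaitLockable
    :busy
  end
ensure
  [holder, waiter].each { |f| f&.close }
  tmp&.close!
end

demo_lock # => :busy
```

A caller that prefers to block can rescue the error and retry, which is the shape #send uses around #send_nonblock.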

#recv ⇒ Object Also known as: read

Performs a blocking read

Returns:

  • (Object)

    Returns an object from the channel

Raises:

  • (IOError)

    When the channel is closed



# File 'lib/xchan/unix_socket.rb', line 142

def recv
  recv_nonblock
rescue Chan::WaitReadable
  wait_readable
  retry
rescue Chan::WaitLockable
  retry
end
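
The wait-and-retry shape of #recv can be sketched with a plain socket pair. This is an illustration using only the standard library, under the assumption that a bare IO stands in for the channel. `blocking_read` and `demo_blocking_read` are hypothetical names.

```ruby
require "socket"
require "io/wait"

# Turns a non-blocking read into a blocking one: when no data is available,
# wait until the socket is readable, then retry the read.
def blocking_read(io, maxlen)
  io.read_nonblock(maxlen)
rescue IO::WaitReadable
  io.wait_readable # sleeps until data arrives instead of spinning
  retry
end

# A writer thread sends a message after a short delay; the reader blocks until then.
def demo_blocking_read
  r, w = UNIXSocket.pair(Socket::SOCK_DGRAM)
  writer = Thread.new { sleep 0.05; w.write("ping") }
  blocking_read(r, 4)
ensure
  writer&.join
  [r, w].each { |io| io&.close }
end

demo_blocking_read # => "ping"
```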

#recv_nonblock ⇒ Object Also known as: read_nonblock

Performs a non-blocking read

Returns:

  • (Object)

    Returns an object from the channel

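Raises:

  • (IOError)

    When the channel is closed

  • (Chan::WaitReadable)

    When the read would block (IO::WaitReadable)

  • (Chan::WaitLockable)

    When the lock cannot be acquired without blocking (Errno::EAGAIN)
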

# File 'lib/xchan/unix_socket.rb', line 166

def recv_nonblock
  @lockf.lock_nonblock
  raise IOError, "closed channel" if closed?
  len = @bytes.shift
  obj = deserialize(@r.read_nonblock(len.zero? ? 1 : len))
  @counter.increment!(bytes_read: len)
  obj.tap { @lockf.release }
rescue IOError => ex
  @lockf.release
  raise(ex)
rescue IO::WaitReadable => ex
  @bytes.unshift(len)
  @lockf.release
  raise Chan::WaitReadable, ex.message
rescue Errno::EAGAIN => ex
  raise Chan::WaitLockable, ex.message
end
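
The read above takes the next message length from `@bytes` before reading from the socket. A minimal sketch of that bookkeeping, assuming an in-memory Array in place of Chan::Bytes and Marshal as the serializer (`framed_roundtrip` is a hypothetical name):

```ruby
require "socket"

# Writes each object and records its serialized length, then reads the
# objects back one recorded length at a time.
def framed_roundtrip(objects)
  r, w = UNIXSocket.pair(Socket::SOCK_STREAM)
  lengths = [] # assumption: stands in for the channel's byte-length queue
  objects.each do |object|
    lengths.push w.write(Marshal.dump(object))
  end
  # A stream socket has no message boundaries, so each read asks for
  # exactly the number of bytes recorded for the next message.
  objects.map { Marshal.load(r.read(lengths.shift)) }
ensure
  [r, w].each { |io| io&.close }
end

framed_roundtrip([[1, 2], "two", :three]) # => [[1, 2], "two", :three]
```

Without the recorded lengths, a reader on a SOCK_STREAM pair could not tell where one serialized object ends and the next begins.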

#to_a ⇒ Array<Object>

Returns the contents of the channel

Examples:

ch = xchan(:pure)
1.upto(4) { ch.send(_1) }
ch.to_a.last # => "4"

Returns:

  • (Array<Object>)

    Returns the contents of the channel



# File 'lib/xchan/unix_socket.rb', line 196

def to_a
  lock do
    [].tap { _1.push(recv) until empty? }
  end
end

#empty? ⇒ Boolean

Returns true when the channel is empty

Returns:

  • (Boolean)

    Returns true when the channel is empty



# File 'lib/xchan/unix_socket.rb', line 205

def empty?
  return true if closed?
  lock { size.zero? }
end

#bytes_sent ⇒ Integer Also known as: bytes_written

Returns the total number of bytes written to the channel

Returns:

  • (Integer)

    Returns the total number of bytes written to the channel



# File 'lib/xchan/unix_socket.rb', line 216

def bytes_sent
  lock { @counter.bytes_written }
end

#bytes_received ⇒ Integer Also known as: bytes_read

Returns the total number of bytes read from the channel

Returns:

  • (Integer)

    Returns the total number of bytes read from the channel



# File 'lib/xchan/unix_socket.rb', line 224

def bytes_received
  lock { @counter.bytes_read }
end

#size ⇒ Integer

Returns the number of objects waiting to be read

Returns:

  • (Integer)

    Returns the number of objects waiting to be read



# File 'lib/xchan/unix_socket.rb', line 232

def size
  lock { @bytes.size }
end

#wait_readable(s = nil) ⇒ Chan::UNIXSocket?

Waits for the channel to become readable

Parameters:

  • s (Float, Integer, nil) (defaults to: nil)

    The number of seconds to wait. Waits indefinitely when nil or omitted.

Returns:

  • (Chan::UNIXSocket, nil)

    Returns self when the channel is readable, otherwise returns nil



# File 'lib/xchan/unix_socket.rb', line 250

def wait_readable(s = nil)
  @r.wait_readable(s) and self
end
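
The timeout behaviour comes from IO#wait_readable on the underlying socket. A short sketch of the two outcomes, using a plain socket pair as a stand-in for the channel (`wait_results` is a hypothetical name):

```ruby
require "socket"
require "io/wait"

# Returns [timed out before a write?, readable after a write?].
def wait_results(timeout)
  r, w = UNIXSocket.pair(Socket::SOCK_DGRAM)
  before = r.wait_readable(timeout) # nothing to read: falsy once the timeout elapses
  w.write("x")
  after = r.wait_readable(timeout)  # data is pending: truthy without waiting
  [!before, !!after]
ensure
  [r, w].each { |io| io&.close }
end

wait_results(0.05) # => [true, true]
```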