Class: Chan::UNIXSocket

Inherits:
Object
  • Object
show all
Defined in:
lib/xchan/unix_socket.rb

Overview

An easy-to-use InterProcess Communication (IPC) library

Instance Attribute Summary collapse

Write methods collapse

Read methods collapse

Stat methods collapse

Wait methods collapse

Instance Method Summary collapse

Constructor Details

#initialize(s, sock_type: Socket::SOCK_DGRAM, tmpdir: Dir.tmpdir) ⇒ Chan::UNIXSocket

Returns an instance of Chan::UNIXSocket

Examples:

ch = Chan::UNIXSocket.new(:marshal)
ch.send([1,2,3])
ch.recv.pop # => 3
ch.close

Parameters:

  • s (Symbol, <#dump, #load>)

    The name of a serializer

  • sock_type (Integer) (defaults to: Socket::SOCK_DGRAM)

    Type of socket (eg Socket::SOCK_STREAM)

  • tmpdir (String) (defaults to: Dir.tmpdir)

    Directory where temporary files can be stored



40
41
42
43
44
45
46
# File 'lib/xchan/unix_socket.rb', line 40

def initialize(s, sock_type: Socket::SOCK_DGRAM, tmpdir: Dir.tmpdir)
  @s = Chan.serializers[s]&.call || s
  @r, @w = ::UNIXSocket.pair(sock_type)
  @bytes = Chan::Bytes.new(tmpdir)
  @counter = Chan::Counter.new(tmpdir)
  @lockf = Lock::File.new Chan.temporary_file(%w[xchan .lock], tmpdir:)
end

Instance Attribute Details

#rUNIXSocket (readonly)

Returns a socket used for read operations

Returns:

  • (UNIXSocket)

    Returns a socket used for read operations



13
14
15
# File 'lib/xchan/unix_socket.rb', line 13

def r
  @r
end

#wUNIXSocket (readonly)

Returns a socket used for write operations

Returns:

  • (UNIXSocket)

    Returns a socket used for write operations



18
19
20
# File 'lib/xchan/unix_socket.rb', line 18

def w
  @w
end

#s<#dump, #load> (readonly) Also known as: serializer

Returns the serializer used by the channel

Returns:

  • (<#dump, #load>)

    Returns the serializer used by the channel



23
24
25
# File 'lib/xchan/unix_socket.rb', line 23

def s
  @s
end

Instance Method Details

#wait_writable(s = nil) ⇒ Chan::UNIXSocket?

Waits for the channel to become writable

Parameters:

  • s (Float, Integer, nil) (defaults to: nil)

    The number of seconds to wait. Waits indefinitely with no arguments.

Returns:

  • (Chan::UNIXSocket, nil)

    Returns self when the channel is writable, otherwise returns nil



237
238
239
# File 'lib/xchan/unix_socket.rb', line 237

def wait_writable(s = nil)
  @w.wait_writable(s) and self
end

#closed?Boolean

Returns true when the channel is closed

Returns:

  • (Boolean)

    Returns true when the channel is closed



51
52
53
# File 'lib/xchan/unix_socket.rb', line 51

def closed?
  @r.closed? and @w.closed?
end

#closevoid

This method returns an undefined value.

Closes the channel

Raises:

  • (IOError)

    When the channel is closed



60
61
62
63
64
65
66
67
# File 'lib/xchan/unix_socket.rb', line 60

def close
  @lockf.lock
  raise IOError, "channel is closed" if closed?
  [@r, @w, @bytes, @lockf].each(&:close)
rescue IOError => ex
  @lockf.release
  raise(ex)
end

#send(object) ⇒ Object Also known as: write

Performs a blocking write

Parameters:

  • object (Object)

    An object

Returns:

  • (Object)

    Returns the number of bytes written to the channel

Raises:

  • (IOError)

    When the channel is closed



80
81
82
83
84
# File 'lib/xchan/unix_socket.rb', line 80

def send(object)
  send_nonblock(object)
rescue Chan::WaitWritable, Chan::WaitLockable
  retry
end

#send_nonblock(object) ⇒ Integer? Also known as: write_nonblock

Performs a non-blocking write

Parameters:

  • object (Object)

    An object

Returns:

  • (Integer, nil)

    Returns the number of bytes written to the channel

Raises:



99
100
101
102
103
104
105
106
107
108
109
110
111
# File 'lib/xchan/unix_socket.rb', line 99

def send_nonblock(object)
  @lockf.lock_nonblock
  raise IOError, "channel closed" if closed?
  len = @w.write_nonblock(serialize(object))
  @bytes.push(len)
  @counter.increment!(bytes_written: len)
  len.tap { @lockf.release }
rescue IOError, IO::WaitWritable, Errno::ENOBUFS => ex
  @lockf.release
  raise Chan::WaitWritable, ex.message
rescue Errno::EWOULDBLOCK => ex
  raise Chan::WaitLockable, ex.message
end

#recvObject Also known as: read

Performs a blocking read

Returns:

  • (Object)

    Returns an object from the channel

Raises:

  • (IOError)

    When the channel is closed



126
127
128
129
130
131
132
133
# File 'lib/xchan/unix_socket.rb', line 126

def recv
  recv_nonblock
rescue Chan::WaitReadable
  wait_readable
  retry
rescue Chan::WaitLockable
  retry
end

#recv_nonblockObject Also known as: read_nonblock

Performs a non-blocking read

Returns:

  • (Object)

    Returns an object from the channel

Raises:



146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# File 'lib/xchan/unix_socket.rb', line 146

def recv_nonblock
  @lockf.lock_nonblock
  raise IOError, "closed channel" if closed?
  len = @bytes.shift
  obj = deserialize(@r.read_nonblock(len.zero? ? 1 : len))
  @counter.increment!(bytes_read: len)
  obj.tap { @lockf.release }
rescue IOError => ex
  @lockf.release
  raise(ex)
rescue IO::WaitReadable => ex
  @bytes.unshift(len)
  @lockf.release
  raise Chan::WaitReadable, ex.message
rescue Errno::EAGAIN => ex
  raise Chan::WaitLockable, ex.message
end

#to_aArray<Object>

Returns the contents of the channel

Examples:

ch = xchan(:pure)
1.upto(4) { ch.send(_1) }
ch.to_a.last # => "4"

Returns:

  • (Array<Object>)

    Returns the contents of the channel



175
176
177
178
179
# File 'lib/xchan/unix_socket.rb', line 175

def to_a
  lock do
    [].tap { _1.push(recv) until empty? }
  end
end

#empty?Boolean

Returns true when the channel is empty

Returns:

  • (Boolean)

    Returns true when the channel is empty



184
185
186
187
# File 'lib/xchan/unix_socket.rb', line 184

def empty?
  return true if closed?
  lock { size.zero? }
end

#bytes_sentInteger Also known as: bytes_written

Returns the total number of bytes written to the channel

Returns:

  • (Integer)

    Returns the total number of bytes written to the channel



195
196
197
# File 'lib/xchan/unix_socket.rb', line 195

def bytes_sent
  lock { @counter.bytes_written }
end

#bytes_receivedInteger Also known as: bytes_read

Returns the total number of bytes read from the channel

Returns:

  • (Integer)

    Returns the total number of bytes read from the channel



203
204
205
# File 'lib/xchan/unix_socket.rb', line 203

def bytes_received
  lock { @counter.bytes_read }
end

#sizeInteger

Returns the number of objects waiting to be read

Returns:

  • (Integer)

    Returns the number of objects waiting to be read



211
212
213
# File 'lib/xchan/unix_socket.rb', line 211

def size
  lock { @bytes.size }
end

#wait_readable(s = nil) ⇒ Chan::UNIXSocket?

Waits for the channel to become readable

Parameters:

  • s (Float, Integer, nil) (defaults to: nil)

    The number of seconds to wait. Waits indefinitely with no arguments.

Returns:

  • (Chan::UNIXSocket, nil)

    Returns self when the channel is readable, otherwise returns nil



227
228
229
# File 'lib/xchan/unix_socket.rb', line 227

def wait_readable(s = nil)
  @r.wait_readable(s) and self
end