Skip to content

Commit 34eb455

Browse files
committed
Initial hackup.
1 parent e8d1b82 commit 34eb455

File tree

2 files changed

+113
-0
lines changed

2 files changed

+113
-0
lines changed

examples/command/client.rb

Whitespace-only changes.

examples/command/config.ru

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
require 'async'
2+
require 'async/queue'
3+
require 'async/barrier'
4+
5+
class Multiplexer
6+
def initialize(stream)
7+
@stream = stream
8+
9+
@streams = Hash.new{|h,k| h[k] = Async::Queue.new}
10+
end
11+
12+
def write(name, data)
13+
@stream.puts name
14+
@stream.puts data.bytesize
15+
@stream.write data
16+
@stream.flush
17+
end
18+
19+
def read
20+
if name = @stream.gets
21+
size = @stream.gets.to_i
22+
data = @stream.read(size)
23+
24+
return name, data
25+
end
26+
end
27+
28+
def run
29+
while message = self.read
30+
@streams[message.first] << message.last
31+
end
32+
end
33+
34+
def queue(name)
35+
@streams[name]
36+
end
37+
end
38+
39+
class Stream
40+
def initialize(multiplexer, name)
41+
@input, @output = IO.pipe
42+
@multiplexer = multiplexer
43+
@name = name
44+
end
45+
46+
attr :input
47+
attr :output
48+
49+
def write(data)
50+
@multiplexer.write(@name, data)
51+
end
52+
53+
def read
54+
@multiplexer.queue(@name).dequeue
55+
end
56+
57+
def sync_write
58+
@output.close
59+
60+
while chunk = @input.readparial(1024)
61+
write(chunk)
62+
end
63+
end
64+
65+
def sync_read
66+
@input.close
67+
68+
while chunk = read
69+
@output.write(chunk)
70+
end
71+
end
72+
end
73+
74+
run do |env|
75+
barrier = Async::Barrier.new
76+
77+
body = proc do |stream|
78+
multiplexer = Multiplexer.new(stream)
79+
barrier.async do
80+
multiplexer.run
81+
end
82+
83+
child_stdin = Stream.new(multiplexer, 'stdin')
84+
child_stdout = Stream.new(multiplexer, 'stdout')
85+
child_stderr = Stream.new(multiplexer, 'stderr')
86+
87+
pid = Process.spawn('while true; do ls; sleep 1; done', in: child_stdin.input, out: child_stdout.output, err: child_stderr.output)
88+
89+
barrier.async do
90+
child_stdin.sync_read
91+
end
92+
93+
barrier.async do
94+
child_stdout.sync_write
95+
end
96+
97+
barrier.async do
98+
child_stderr.sync_write
99+
end
100+
101+
Process.wait(pid)
102+
103+
barrier.wait
104+
ensure
105+
stream.close
106+
if pid
107+
Process.kill(:KILL, pid)
108+
Process.wait(pid)
109+
end
110+
end
111+
112+
[200, {}, body]
113+
end

0 commit comments

Comments
 (0)