Skip to content

Commit 9762521

Browse files
committed
spawn: Allow libuv IOServers to be passed as extra FDs
Extracted from #59271, but I now have an independent use case. This allows libuv `IOServer`s (unix domain servers, tcp servers, etc) to be passed as extra fd arguments to child processes.
1 parent 71c6f82 commit 9762521

File tree

5 files changed

+54
-14
lines changed

5 files changed

+54
-14
lines changed

base/cmd.jl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ if OS_HANDLE !== RawFD
188188
end
189189
setup_stdio(stdio::Union{DevNull,OS_HANDLE,RawFD}, ::Bool) = (stdio, false)
190190

191-
const Redirectable = Union{IO, FileRedirect, RawFD, OS_HANDLE}
191+
const Redirectable = Union{IO, IOServer, FileRedirect, RawFD, OS_HANDLE}
192192
const StdIOSet = NTuple{3, Redirectable}
193193

194194
struct CmdRedirect <: AbstractCmd

base/process.jl

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ function _uv_hook_close(proc::Process)
8484
nothing
8585
end
8686

87-
const SpawnIO = Union{IO, RawFD, OS_HANDLE, SyncCloseFD} # internal copy of Redirectable, removing FileRedirect and adding SyncCloseFD
87+
const SpawnIO = Union{IO, IOServer, RawFD, OS_HANDLE, SyncCloseFD} # internal copy of Redirectable, removing FileRedirect and adding SyncCloseFD
8888
const SpawnIOs = Memory{SpawnIO} # convenience name for readability (used for dispatch also to clearly distinguish from Vector{Redirectable})
8989

9090
function as_cpumask(cpus::Vector{UInt16})
@@ -341,13 +341,13 @@ close_stdio(stdio::SyncCloseFD) = close_stdio(stdio.fd)
341341

342342
spawn_opts_swallow(stdios::StdIOSet) = Redirectable[stdios...]
343343
spawn_opts_inherit(stdios::StdIOSet) = Redirectable[stdios...]
344-
spawn_opts_swallow(in::Redirectable=devnull, out::Redirectable=devnull, err::Redirectable=devnull) =
345-
Redirectable[in, out, err]
344+
spawn_opts_swallow(in::Redirectable=devnull, out::Redirectable=devnull, err::Redirectable=devnull, extra::Redirectable...) =
345+
Redirectable[in, out, err, extra...]
346346
# pass original descriptors to child processes by default, because we might
347347
# have already exhausted and closed the libuv object for our standard streams.
348348
# ref issue #8529
349-
spawn_opts_inherit(in::Redirectable=RawFD(0), out::Redirectable=RawFD(1), err::Redirectable=RawFD(2)) =
350-
Redirectable[in, out, err]
349+
spawn_opts_inherit(in::Redirectable=RawFD(0), out::Redirectable=RawFD(1), err::Redirectable=RawFD(2), extra::Redirectable...) =
350+
Redirectable[in, out, err, extra...]
351351

352352
function eachline(cmd::AbstractCmd; keep::Bool=false)
353353
out = PipeEndpoint()

base/stream.jl

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -202,12 +202,7 @@ end
202202

203203
function PipeEndpoint(fd::OS_HANDLE)
204204
pipe = PipeEndpoint()
205-
iolock_begin()
206-
err = ccall(:uv_pipe_open, Int32, (Ptr{Cvoid}, OS_HANDLE), pipe.handle, fd)
207-
uv_error("pipe_open", err)
208-
pipe.status = StatusOpen
209-
iolock_end()
210-
return pipe
205+
return open_pipe!(pipe, fd)
211206
end
212207
if OS_HANDLE != RawFD
213208
PipeEndpoint(fd::RawFD) = PipeEndpoint(Libc._get_osfhandle(fd))
@@ -283,8 +278,8 @@ end
283278
lock(s::LibuvStream) = lock(s.lock)
284279
unlock(s::LibuvStream) = unlock(s.lock)
285280

286-
setup_stdio(stream::LibuvStream, ::Bool) = (stream, false)
287-
rawhandle(stream::LibuvStream) = stream.handle
281+
setup_stdio(stream::Union{LibuvStream, LibuvServer}, ::Bool) = (stream, false)
282+
rawhandle(stream::Union{LibuvStream, LibuvServer}) = stream.handle
288283
unsafe_convert(::Type{Ptr{Cvoid}}, s::Union{LibuvStream, LibuvServer}) = s.handle
289284

290285
function init_stdio(handle::Ptr{Cvoid})

stdlib/Sockets/src/PipeServer.jl

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,23 @@ function PipeServer()
2424
return pipe
2525
end
2626

27+
function PipeServer(handle::OS_HANDLE)
28+
pipe = PipeServer()
29+
return Base.open_pipe!(pipe, handle)
30+
end
31+
32+
function Base.open_pipe!(p::PipeServer, handle::OS_HANDLE)
33+
iolock_begin()
34+
if p.status != StatusInit
35+
error("pipe is already in use or has been closed")
36+
end
37+
err = ccall(:uv_pipe_open, Int32, (Ptr{Cvoid}, OS_HANDLE), p.handle, handle)
38+
uv_error("pipe_open", err)
39+
p.status = StatusOpen
40+
iolock_end()
41+
return p
42+
end
43+
2744
## server functions ##
2845

2946
accept(server::PipeServer) = accept(server, PipeEndpoint())

test/spawn.jl

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1053,3 +1053,31 @@ end
10531053
@assert eltype(args) != String
10541054
@test Cmd(["ls", args...]) == `ls -l /tmp`
10551055
end
1056+
1057+
# Test passing a pipe server as an addition fd
1058+
@testset "Pipe server as additional fd" begin
1059+
mktempdir() do dir
1060+
path = joinpath(dir, "test.sock")
1061+
server = Sockets.PipeServer()
1062+
bind(server, path)
1063+
Base.errormonitor(@async begin
1064+
local client
1065+
while true
1066+
try
1067+
client = Sockets.connect(path)
1068+
break
1069+
catch e
1070+
isa(e, Base.IOError) || rethrow(e)
1071+
end
1072+
sleep(1)
1073+
end
1074+
println(client, "Hello Socket!")
1075+
closewrite(client)
1076+
end)
1077+
buf = IOBuffer()
1078+
proc = run(`$(Base.julia_cmd()) -e 'using Sockets; s = listen(Sockets.PipeServer(RawFD(3))); c = accept(s); print(read(c, String))'`, devnull, buf, stderr, server)
1079+
close(server)
1080+
@test success(proc)
1081+
@test String(take!(buf)) == "Hello Socket!\n"
1082+
end
1083+
end

0 commit comments

Comments
 (0)