Skip to content

Commit eac7184

Browse files
committed
Corenet API: Move more logic away from commands
License: MIT Signed-off-by: Łukasz Magiera <[email protected]>
1 parent e66e8bb commit eac7184

File tree

6 files changed

+214
-149
lines changed

6 files changed

+214
-149
lines changed

core/commands/corenet.go

Lines changed: 23 additions & 145 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,10 @@ import (
99
"text/tabwriter"
1010

1111
cmds "github.com/ipfs/go-ipfs/commands"
12-
core "github.com/ipfs/go-ipfs/core"
13-
corenet "github.com/ipfs/go-ipfs/corenet"
12+
"github.com/ipfs/go-ipfs/core"
1413
cnet "github.com/ipfs/go-ipfs/corenet/net"
1514

16-
net "gx/ipfs/QmRscs8KxrSmSv4iuevHv8JfuUzHBMoqiaHzxfDRiksd6e/go-libp2p-net"
17-
peerstore "gx/ipfs/QmXZSd1qR5BxZkPyuwfT5jpqQFScZccoZvDneXsKzCNHWX/go-libp2p-peerstore"
1815
ma "gx/ipfs/QmcyqRMCAXVtYPS4DiBrA7sezL9rRGfW8Ctx7cywL4TXJj/go-multiaddr"
19-
manet "gx/ipfs/Qmf1Gq7N45Rpuw7ev47uWgH6dLPtdnvcMRNPkVBwqjLJg2/go-multiaddr-net"
2016
)
2117

2218
// CorenetAppInfoOutput is output type of ls command
@@ -188,6 +184,12 @@ var corenetStreamsCmd = &cmds.Command{
188184
var corenetListenCmd = &cmds.Command{
189185
Helptext: cmds.HelpText{
190186
Tagline: "Create application protocol listener and proxy to network multiaddr.",
187+
ShortDescription: `
188+
Register a p2p connection handler and proxies the connections to a specified
189+
address.
190+
191+
Note that the connections originate from the ipfs daemon process.
192+
`,
191193
},
192194
Arguments: []cmds.Argument{
193195
cmds.StringArg("Protocol", true, false, "Protocol identifier."),
@@ -212,7 +214,7 @@ var corenetListenCmd = &cmds.Command{
212214
}
213215

214216
proto := "/app/" + req.Arguments()[0]
215-
if checkProtoExists(n.PeerHost.Mux().Protocols(), proto) {
217+
if cnet.CheckProtoExists(n, proto) {
216218
res.SetError(errors.New("protocol handler already registered"), cmds.ErrNormal)
217219
return
218220
}
@@ -223,25 +225,12 @@ var corenetListenCmd = &cmds.Command{
223225
return
224226
}
225227

226-
listener, err := cnet.Listen(n, proto)
228+
_, err = cnet.NewListener(n, proto, addr)
227229
if err != nil {
228230
res.SetError(err, cmds.ErrNormal)
229231
return
230232
}
231233

232-
app := corenet.AppInfo{
233-
Identity: n.Identity,
234-
Protocol: proto,
235-
Address: addr,
236-
Closer: listener,
237-
Running: true,
238-
Registry: &n.Corenet.Apps,
239-
}
240-
241-
go acceptStreams(n, &app, listener)
242-
243-
n.Corenet.Apps.Register(&app)
244-
245234
// Successful response.
246235
res.SetOutput(&CorenetAppInfoOutput{
247236
Protocol: proto,
@@ -250,66 +239,17 @@ var corenetListenCmd = &cmds.Command{
250239
},
251240
}
252241

253-
func checkProtoExists(protos []string, proto string) bool {
254-
for _, p := range protos {
255-
if p != proto {
256-
continue
257-
}
258-
return true
259-
}
260-
return false
261-
}
262-
263-
func acceptStreams(n *core.IpfsNode, app *corenet.AppInfo, listener cnet.Listener) {
264-
for app.Running {
265-
remote, err := listener.Accept()
266-
if err != nil {
267-
listener.Close()
268-
break
269-
}
270-
271-
local, err := manet.Dial(app.Address)
272-
if err != nil {
273-
remote.Close()
274-
continue
275-
}
276-
277-
stream := corenet.StreamInfo{
278-
Protocol: app.Protocol,
279-
280-
LocalPeer: app.Identity,
281-
LocalAddr: app.Address,
282-
283-
RemotePeer: remote.Conn().RemotePeer(),
284-
RemoteAddr: remote.Conn().RemoteMultiaddr(),
285-
286-
Local: local,
287-
Remote: remote,
288-
289-
Registry: &n.Corenet.Streams,
290-
}
291-
292-
n.Corenet.Streams.Register(&stream)
293-
startStreaming(&stream)
294-
}
295-
n.Corenet.Apps.Deregister(app.Protocol)
296-
}
297-
298-
func startStreaming(stream *corenet.StreamInfo) {
299-
go func() {
300-
io.Copy(stream.Local, stream.Remote)
301-
stream.Close()
302-
}()
303-
304-
go func() {
305-
io.Copy(stream.Remote, stream.Local)
306-
stream.Close()
307-
}()
308-
}
309-
310242
var corenetDialCmd = &cmds.Command{
311243
Helptext: cmds.HelpText{
312244
Tagline: "Dial to an application service.",
245+
246+
ShortDescription: `
247+
Establish a new connection to a peer service.
248+
249+
When a connection is made to a peer service the ipfs daemon will setup one time
250+
TCP listener and return it's bind port, this way a dialing application can
251+
transparently connect to a corenet service.
252+
`,
313253
},
314254
Arguments: []cmds.Argument{
315255
cmds.StringArg("Peer", true, false, "Remote peer to connect to"),
@@ -351,47 +291,12 @@ var corenetDialCmd = &cmds.Command{
351291
}
352292
}
353293

354-
lnet, _, err := manet.DialArgs(bindAddr)
294+
app, err := cnet.Dial(n, addr, peer, proto, bindAddr)
355295
if err != nil {
356296
res.SetError(err, cmds.ErrNormal)
357297
return
358298
}
359299

360-
app := corenet.AppInfo{
361-
Identity: n.Identity,
362-
Protocol: proto,
363-
}
364-
365-
n.Peerstore.AddAddr(peer, addr, peerstore.TempAddrTTL)
366-
367-
remote, err := cnet.Dial(n, peer, proto)
368-
if err != nil {
369-
res.SetError(err, cmds.ErrNormal)
370-
return
371-
}
372-
373-
switch lnet {
374-
case "tcp", "tcp4", "tcp6":
375-
listener, err := manet.Listen(bindAddr)
376-
if err != nil {
377-
res.SetError(err, cmds.ErrNormal)
378-
if err := remote.Close(); err != nil {
379-
res.SetError(err, cmds.ErrNormal)
380-
}
381-
return
382-
}
383-
384-
app.Address = listener.Multiaddr()
385-
app.Closer = listener
386-
app.Running = true
387-
388-
go doAccept(n, &app, remote, listener)
389-
390-
default:
391-
res.SetError(errors.New("unsupported protocol: "+lnet), cmds.ErrNormal)
392-
return
393-
}
394-
395300
output := CorenetAppInfoOutput{
396301
Protocol: app.Protocol,
397302
Address: app.Address.String(),
@@ -401,33 +306,6 @@ var corenetDialCmd = &cmds.Command{
401306
},
402307
}
403308

404-
func doAccept(n *core.IpfsNode, app *corenet.AppInfo, remote net.Stream, listener manet.Listener) {
405-
defer listener.Close()
406-
407-
local, err := listener.Accept()
408-
if err != nil {
409-
return
410-
}
411-
412-
stream := corenet.StreamInfo{
413-
Protocol: app.Protocol,
414-
415-
LocalPeer: app.Identity,
416-
LocalAddr: app.Address,
417-
418-
RemotePeer: remote.Conn().RemotePeer(),
419-
RemoteAddr: remote.Conn().RemoteMultiaddr(),
420-
421-
Local: local,
422-
Remote: remote,
423-
424-
Registry: &n.Corenet.Streams,
425-
}
426-
427-
n.Corenet.Streams.Register(&stream)
428-
startStreaming(&stream)
429-
}
430-
431309
var corenetCloseCmd = &cmds.Command{
432310
Helptext: cmds.HelpText{
433311
Tagline: "Closes an active stream listener or client.",
@@ -464,15 +342,15 @@ var corenetCloseCmd = &cmds.Command{
464342

465343
useHandlerID := false
466344

467-
if !closeAll && len(req.Arguments()) == 0 {
468-
res.SetError(errors.New("no handlerID nor stream protocol specified"), cmds.ErrNormal)
469-
return
345+
if !closeAll {
346+
if len(req.Arguments()) == 0 {
347+
res.SetError(errors.New("no handlerID nor stream protocol specified"), cmds.ErrNormal)
348+
return
349+
}
470350

471-
} else if !closeAll {
472351
handlerID, err = strconv.ParseUint(req.Arguments()[0], 10, 64)
473352
if err != nil {
474353
proto = "/app/" + req.Arguments()[0]
475-
476354
} else {
477355
useHandlerID = true
478356
}

corenet/apps.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55

66
ma "gx/ipfs/QmcyqRMCAXVtYPS4DiBrA7sezL9rRGfW8Ctx7cywL4TXJj/go-multiaddr"
77
peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer"
8+
"fmt"
89
)
910

1011
// AppInfo holds information on a local application protocol listener service.
@@ -30,9 +31,9 @@ type AppInfo struct {
3031

3132
// Close closes the listener. Does not affect child streams
3233
func (c *AppInfo) Close() error {
33-
c.Registry.Deregister(c.Protocol)
3434
c.Closer.Close()
35-
return nil
35+
err := c.Registry.Deregister(c.Protocol)
36+
return err
3637
}
3738

3839
// AppRegistry is a collection of local application protocol listeners.
@@ -46,7 +47,7 @@ func (c *AppRegistry) Register(appInfo *AppInfo) {
4647
}
4748

4849
// Deregister deregisters protocol handler from this registry
49-
func (c *AppRegistry) Deregister(proto string) {
50+
func (c *AppRegistry) Deregister(proto string) error {
5051
foundAt := -1
5152
for i, a := range c.Apps {
5253
if a.Protocol == proto {
@@ -57,5 +58,8 @@ func (c *AppRegistry) Deregister(proto string) {
5758

5859
if foundAt != -1 {
5960
c.Apps = append(c.Apps[:foundAt], c.Apps[foundAt+1:]...)
61+
return nil
6062
}
63+
64+
return fmt.Errorf("failed to deregister proto %s", proto)
6165
}

corenet/net/dial.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package net
2+
3+
import (
4+
"errors"
5+
6+
core "github.com/ipfs/go-ipfs/core"
7+
corenet "github.com/ipfs/go-ipfs/corenet"
8+
9+
net "gx/ipfs/QmRscs8KxrSmSv4iuevHv8JfuUzHBMoqiaHzxfDRiksd6e/go-libp2p-net"
10+
peerstore "gx/ipfs/QmXZSd1qR5BxZkPyuwfT5jpqQFScZccoZvDneXsKzCNHWX/go-libp2p-peerstore"
11+
ma "gx/ipfs/QmcyqRMCAXVtYPS4DiBrA7sezL9rRGfW8Ctx7cywL4TXJj/go-multiaddr"
12+
peer "gx/ipfs/QmdS9KpbDyPrieswibZhkod1oXqRwZJrUPzxCofAMWpFGq/go-libp2p-peer"
13+
manet "gx/ipfs/Qmf1Gq7N45Rpuw7ev47uWgH6dLPtdnvcMRNPkVBwqjLJg2/go-multiaddr-net"
14+
)
15+
16+
func Dial(n *core.IpfsNode, addr ma.Multiaddr, peer peer.ID, proto string, bindAddr ma.Multiaddr) (*corenet.AppInfo, error) {
17+
lnet, _, err := manet.DialArgs(bindAddr)
18+
if err != nil {
19+
return nil, err
20+
}
21+
22+
app := corenet.AppInfo{
23+
Identity: n.Identity,
24+
Protocol: proto,
25+
}
26+
27+
n.Peerstore.AddAddr(peer, addr, peerstore.TempAddrTTL)
28+
29+
remote, err := dial(n, peer, proto)
30+
if err != nil {
31+
return nil, err
32+
}
33+
34+
switch lnet {
35+
case "tcp", "tcp4", "tcp6":
36+
listener, err := manet.Listen(bindAddr)
37+
if err != nil {
38+
if err2 := remote.Close(); err2 != nil {
39+
return nil, err2
40+
}
41+
return nil, err
42+
}
43+
44+
app.Address = listener.Multiaddr()
45+
app.Closer = listener
46+
app.Running = true
47+
48+
go doAccept(n, &app, remote, listener)
49+
50+
default:
51+
return nil, errors.New("unsupported protocol: " + lnet)
52+
}
53+
54+
return &app, nil
55+
}
56+
57+
func doAccept(n *core.IpfsNode, app *corenet.AppInfo, remote net.Stream, listener manet.Listener) {
58+
defer listener.Close()
59+
60+
local, err := listener.Accept()
61+
if err != nil {
62+
return
63+
}
64+
65+
stream := corenet.StreamInfo{
66+
Protocol: app.Protocol,
67+
68+
LocalPeer: app.Identity,
69+
LocalAddr: app.Address,
70+
71+
RemotePeer: remote.Conn().RemotePeer(),
72+
RemoteAddr: remote.Conn().RemoteMultiaddr(),
73+
74+
Local: local,
75+
Remote: remote,
76+
77+
Registry: &n.Corenet.Streams,
78+
}
79+
80+
n.Corenet.Streams.Register(&stream)
81+
startStreaming(&stream)
82+
}

0 commit comments

Comments
 (0)