Skip to content

Commit eb45644

Browse files
authored
Merge pull request #4804 from ipfs/feat/coreapi/dht
coreapi: DHT API
2 parents 82204d1 + 86f9eb7 commit eb45644

File tree

9 files changed

+426
-64
lines changed

9 files changed

+426
-64
lines changed

core/coreapi/coreapi.go

+5
Original file line numberDiff line numberDiff line change
@@ -62,3 +62,8 @@ func (api *CoreAPI) Object() coreiface.ObjectAPI {
6262
func (api *CoreAPI) Pin() coreiface.PinAPI {
6363
return (*PinAPI)(api)
6464
}
65+
66+
// Dht returns the DhtAPI interface implementation backed by the go-ipfs node
67+
func (api *CoreAPI) Dht() coreiface.DhtAPI {
68+
return (*DhtAPI)(api)
69+
}

core/coreapi/dht.go

+132
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
package coreapi
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
8+
coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"
9+
caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options"
10+
11+
dag "gx/ipfs/QmNr4E8z9bGTztvHJktp7uQaMdx9p3r9Asrq6eYk7iCh4a/go-merkledag"
12+
offline "gx/ipfs/QmPuLWvxK1vg6ckKUpT53Dow9VLCcQGdL5Trwxa8PTLp7r/go-ipfs-exchange-offline"
13+
cidutil "gx/ipfs/QmPyxJ2QS7L5FhGkNYkNcXHGjDhvGHueJ4auqAstFHYxy5/go-cidutil"
14+
blockservice "gx/ipfs/QmQLG22wSEStiociTSKQpZAuuaaWoF1B3iKyjPFvWiTQ77/go-blockservice"
15+
peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer"
16+
routing "gx/ipfs/QmY9JUvS8kbgao3XbPh6WAV3ChE2nxGKhcGTHiwMC4gmcU/go-libp2p-routing"
17+
cid "gx/ipfs/QmZFbDTY9jfSBms2MchvYM9oYRbAF19K7Pby47yDBfpPrb/go-cid"
18+
pstore "gx/ipfs/Qmda4cPRvSRyox3SqgJN6DfSZGU5TtHufPTp9uXjFj71X6/go-libp2p-peerstore"
19+
blockstore "gx/ipfs/Qmeg56ecxRnVv7VWViMrDeEMoBHaNFMs4vQnyQrJ79Zz7i/go-ipfs-blockstore"
20+
)
21+
22+
type DhtAPI CoreAPI
23+
24+
func (api *DhtAPI) FindPeer(ctx context.Context, p peer.ID) (pstore.PeerInfo, error) {
25+
pi, err := api.node.Routing.FindPeer(ctx, peer.ID(p))
26+
if err != nil {
27+
return pstore.PeerInfo{}, err
28+
}
29+
30+
return pi, nil
31+
}
32+
33+
func (api *DhtAPI) FindProviders(ctx context.Context, p coreiface.Path, opts ...caopts.DhtFindProvidersOption) (<-chan pstore.PeerInfo, error) {
34+
settings, err := caopts.DhtFindProvidersOptions(opts...)
35+
if err != nil {
36+
return nil, err
37+
}
38+
39+
rp, err := api.core().ResolvePath(ctx, p)
40+
if err != nil {
41+
return nil, err
42+
}
43+
44+
numProviders := settings.NumProviders
45+
if numProviders < 1 {
46+
return nil, fmt.Errorf("number of providers must be greater than 0")
47+
}
48+
49+
pchan := api.node.Routing.FindProvidersAsync(ctx, rp.Cid(), numProviders)
50+
return pchan, nil
51+
}
52+
53+
func (api *DhtAPI) Provide(ctx context.Context, path coreiface.Path, opts ...caopts.DhtProvideOption) error {
54+
settings, err := caopts.DhtProvideOptions(opts...)
55+
if err != nil {
56+
return err
57+
}
58+
59+
if api.node.Routing == nil {
60+
return errors.New("cannot provide in offline mode")
61+
}
62+
63+
rp, err := api.core().ResolvePath(ctx, path)
64+
if err != nil {
65+
return err
66+
}
67+
68+
c := rp.Cid()
69+
70+
has, err := api.node.Blockstore.Has(c)
71+
if err != nil {
72+
return err
73+
}
74+
75+
if !has {
76+
return fmt.Errorf("block %s not found locally, cannot provide", c)
77+
}
78+
79+
if settings.Recursive {
80+
err = provideKeysRec(ctx, api.node.Routing, api.node.Blockstore, []*cid.Cid{c})
81+
} else {
82+
err = provideKeys(ctx, api.node.Routing, []*cid.Cid{c})
83+
}
84+
if err != nil {
85+
return err
86+
}
87+
88+
return nil
89+
}
90+
91+
func provideKeys(ctx context.Context, r routing.IpfsRouting, cids []*cid.Cid) error {
92+
for _, c := range cids {
93+
err := r.Provide(ctx, c, true)
94+
if err != nil {
95+
return err
96+
}
97+
}
98+
return nil
99+
}
100+
101+
func provideKeysRec(ctx context.Context, r routing.IpfsRouting, bs blockstore.Blockstore, cids []*cid.Cid) error {
102+
provided := cidutil.NewStreamingSet()
103+
104+
errCh := make(chan error)
105+
go func() {
106+
dserv := dag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
107+
for _, c := range cids {
108+
err := dag.EnumerateChildrenAsync(ctx, dag.GetLinksDirect(dserv), c, provided.Visitor(ctx))
109+
if err != nil {
110+
errCh <- err
111+
}
112+
}
113+
}()
114+
115+
for {
116+
select {
117+
case k := <-provided.New:
118+
err := r.Provide(ctx, k, true)
119+
if err != nil {
120+
return err
121+
}
122+
case err := <-errCh:
123+
return err
124+
case <-ctx.Done():
125+
return ctx.Err()
126+
}
127+
}
128+
}
129+
130+
func (api *DhtAPI) core() coreiface.CoreAPI {
131+
return (*CoreAPI)(api)
132+
}

core/coreapi/dht_test.go

+109
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package coreapi_test
2+
3+
import (
4+
"context"
5+
"io"
6+
"io/ioutil"
7+
"testing"
8+
9+
"github.com/ipfs/go-ipfs/core/coreapi/interface"
10+
"github.com/ipfs/go-ipfs/core/coreapi/interface/options"
11+
12+
peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer"
13+
blocks "gx/ipfs/QmWAzSEoqZ6xU6pu8yL8e5WaMb7wtbfbhhN4p1DknUPtr3/go-block-format"
14+
)
15+
16+
func TestDhtFindPeer(t *testing.T) {
17+
ctx := context.Background()
18+
nds, apis, err := makeAPISwarm(ctx, true, 5)
19+
if err != nil {
20+
t.Fatal(err)
21+
}
22+
23+
pi, err := apis[2].Dht().FindPeer(ctx, peer.ID(nds[0].Identity))
24+
if err != nil {
25+
t.Fatal(err)
26+
}
27+
28+
if pi.Addrs[0].String() != "/ip4/127.0.0.1/tcp/4001" {
29+
t.Errorf("got unexpected address from FindPeer: %s", pi.Addrs[0].String())
30+
}
31+
32+
pi, err = apis[1].Dht().FindPeer(ctx, peer.ID(nds[2].Identity))
33+
if err != nil {
34+
t.Fatal(err)
35+
}
36+
37+
if pi.Addrs[0].String() != "/ip4/127.0.2.1/tcp/4001" {
38+
t.Errorf("got unexpected address from FindPeer: %s", pi.Addrs[0].String())
39+
}
40+
}
41+
42+
func TestDhtFindProviders(t *testing.T) {
43+
ctx := context.Background()
44+
nds, apis, err := makeAPISwarm(ctx, true, 5)
45+
if err != nil {
46+
t.Fatal(err)
47+
}
48+
49+
p, err := addTestObject(ctx, apis[0])
50+
if err != nil {
51+
t.Fatal(err)
52+
}
53+
54+
out, err := apis[2].Dht().FindProviders(ctx, p, options.Dht.NumProviders(1))
55+
if err != nil {
56+
t.Fatal(err)
57+
}
58+
59+
provider := <-out
60+
61+
if provider.ID.String() != nds[0].Identity.String() {
62+
t.Errorf("got wrong provider: %s != %s", provider.ID.String(), nds[0].Identity.String())
63+
}
64+
}
65+
66+
func TestDhtProvide(t *testing.T) {
67+
ctx := context.Background()
68+
nds, apis, err := makeAPISwarm(ctx, true, 5)
69+
if err != nil {
70+
t.Fatal(err)
71+
}
72+
73+
// TODO: replace once there is local add on unixfs or somewhere
74+
data, err := ioutil.ReadAll(&io.LimitedReader{R: rnd, N: 4092})
75+
if err != nil {
76+
t.Fatal(err)
77+
}
78+
79+
b := blocks.NewBlock(data)
80+
nds[0].Blockstore.Put(b)
81+
p := iface.IpfsPath(b.Cid())
82+
83+
out, err := apis[2].Dht().FindProviders(ctx, p, options.Dht.NumProviders(1))
84+
if err != nil {
85+
t.Fatal(err)
86+
}
87+
88+
provider := <-out
89+
90+
if provider.ID.String() != "<peer.ID >" {
91+
t.Errorf("got wrong provider: %s != %s", provider.ID.String(), nds[0].Identity.String())
92+
}
93+
94+
err = apis[0].Dht().Provide(ctx, p)
95+
if err != nil {
96+
t.Fatal(err)
97+
}
98+
99+
out, err = apis[2].Dht().FindProviders(ctx, p, options.Dht.NumProviders(1))
100+
if err != nil {
101+
t.Fatal(err)
102+
}
103+
104+
provider = <-out
105+
106+
if provider.ID.String() != nds[0].Identity.String() {
107+
t.Errorf("got wrong provider: %s != %s", provider.ID.String(), nds[0].Identity.String())
108+
}
109+
}

core/coreapi/interface/coreapi.go

+3
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ type CoreAPI interface {
3131
// ObjectAPI returns an implementation of Object API
3232
Object() ObjectAPI
3333

34+
// Dht returns an implementation of Dht API
35+
Dht() DhtAPI
36+
3437
// ResolvePath resolves the path using Unixfs resolver
3538
ResolvePath(context.Context, Path) (ResolvedPath, error)
3639

core/coreapi/interface/dht.go

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package iface
2+
3+
import (
4+
"context"
5+
6+
"github.com/ipfs/go-ipfs/core/coreapi/interface/options"
7+
8+
peer "gx/ipfs/QmQsErDt8Qgw1XrsXf2BpEzDgGWtB1YLsTAARBup5b6B9W/go-libp2p-peer"
9+
pstore "gx/ipfs/Qmda4cPRvSRyox3SqgJN6DfSZGU5TtHufPTp9uXjFj71X6/go-libp2p-peerstore"
10+
)
11+
12+
// DhtAPI specifies the interface to the DHT
13+
// Note: This API will likely get deprecated in near future, see
14+
// https://github.com/ipfs/interface-ipfs-core/issues/249 for more context.
15+
type DhtAPI interface {
16+
// FindPeer queries the DHT for all of the multiaddresses associated with a
17+
// Peer ID
18+
FindPeer(context.Context, peer.ID) (pstore.PeerInfo, error)
19+
20+
// FindProviders finds peers in the DHT who can provide a specific value
21+
// given a key.
22+
FindProviders(context.Context, Path, ...options.DhtFindProvidersOption) (<-chan pstore.PeerInfo, error)
23+
24+
// Provide announces to the network that you are providing given values
25+
Provide(context.Context, Path, ...options.DhtProvideOption) error
26+
}

core/coreapi/interface/options/dht.go

+62
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package options
2+
3+
type DhtProvideSettings struct {
4+
Recursive bool
5+
}
6+
7+
type DhtFindProvidersSettings struct {
8+
NumProviders int
9+
}
10+
11+
type DhtProvideOption func(*DhtProvideSettings) error
12+
type DhtFindProvidersOption func(*DhtFindProvidersSettings) error
13+
14+
func DhtProvideOptions(opts ...DhtProvideOption) (*DhtProvideSettings, error) {
15+
options := &DhtProvideSettings{
16+
Recursive: false,
17+
}
18+
19+
for _, opt := range opts {
20+
err := opt(options)
21+
if err != nil {
22+
return nil, err
23+
}
24+
}
25+
return options, nil
26+
}
27+
28+
func DhtFindProvidersOptions(opts ...DhtFindProvidersOption) (*DhtFindProvidersSettings, error) {
29+
options := &DhtFindProvidersSettings{
30+
NumProviders: 20,
31+
}
32+
33+
for _, opt := range opts {
34+
err := opt(options)
35+
if err != nil {
36+
return nil, err
37+
}
38+
}
39+
return options, nil
40+
}
41+
42+
type dhtOpts struct{}
43+
44+
var Dht dhtOpts
45+
46+
// Recursive is an option for Dht.Provide which specifies whether to provide
47+
// the given path recursively
48+
func (dhtOpts) Recursive(recursive bool) DhtProvideOption {
49+
return func(settings *DhtProvideSettings) error {
50+
settings.Recursive = recursive
51+
return nil
52+
}
53+
}
54+
55+
// NumProviders is an option for Dht.FindProviders which specifies the
56+
// number of peers to look for. Default is 20
57+
func (dhtOpts) NumProviders(numProviders int) DhtFindProvidersOption {
58+
return func(settings *DhtFindProvidersSettings) error {
59+
settings.NumProviders = numProviders
60+
return nil
61+
}
62+
}

core/coreapi/name_test.go

+8-4
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,13 @@ func addTestObject(ctx context.Context, api coreiface.CoreAPI) (coreiface.Path,
2121

2222
func TestBasicPublishResolve(t *testing.T) {
2323
ctx := context.Background()
24-
n, api, err := makeAPIIdent(ctx, true)
24+
nds, apis, err := makeAPISwarm(ctx, true, 2)
2525
if err != nil {
2626
t.Fatal(err)
2727
return
2828
}
29+
n := nds[0]
30+
api := apis[0]
2931

3032
p, err := addTestObject(ctx, api)
3133
if err != nil {
@@ -60,11 +62,12 @@ func TestBasicPublishResolve(t *testing.T) {
6062

6163
func TestBasicPublishResolveKey(t *testing.T) {
6264
ctx := context.Background()
63-
_, api, err := makeAPIIdent(ctx, true)
65+
_, apis, err := makeAPISwarm(ctx, true, 2)
6466
if err != nil {
6567
t.Fatal(err)
6668
return
6769
}
70+
api := apis[0]
6871

6972
k, err := api.Key().Generate(ctx, "foo")
7073
if err != nil {
@@ -107,12 +110,13 @@ func TestBasicPublishResolveTimeout(t *testing.T) {
107110
t.Skip("ValidTime doesn't appear to work at this time resolution")
108111

109112
ctx := context.Background()
110-
n, api, err := makeAPIIdent(ctx, true)
113+
nds, apis, err := makeAPISwarm(ctx, true, 2)
111114
if err != nil {
112115
t.Fatal(err)
113116
return
114117
}
115-
118+
n := nds[0]
119+
api := apis[0]
116120
p, err := addTestObject(ctx, api)
117121
if err != nil {
118122
t.Fatal(err)

0 commit comments

Comments
 (0)