diff --git a/README.md b/README.md index 54884a9..6d914c7 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,9 @@ # @libp2p/delegated-content-routing [![libp2p.io](https://img.shields.io/badge/project-libp2p-yellow.svg?style=flat-square)](http://libp2p.io/) -[![IRC](https://img.shields.io/badge/freenode-%23libp2p-yellow.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23libp2p) [![Discuss](https://img.shields.io/discourse/https/discuss.libp2p.io/posts.svg?style=flat-square)](https://discuss.libp2p.io) [![codecov](https://img.shields.io/codecov/c/github/libp2p/js-libp2p-delegated-content-routing.svg?style=flat-square)](https://codecov.io/gh/libp2p/js-libp2p-delegated-content-routing) -[![CI](https://img.shields.io/github/workflow/status/libp2p/js-libp2p-interfaces/test%20&%20maybe%20release/master?style=flat-square)](https://github.com/libp2p/js-libp2p-delegated-content-routing/actions/workflows/js-test-and-release.yml) +[![CI](https://img.shields.io/github/workflow/status/libp2p/js-libp2p-delegated-content-routing/test%20&%20maybe%20release/master?style=flat-square)](https://github.com/libp2p/js-libp2p-delegated-content-routing/actions/workflows/js-test-and-release.yml) > Leverage other peers in the libp2p network to perform Content Routing calls. @@ -37,24 +36,29 @@ npm install ipfs-http-client @libp2p/delegated-content-routing ## Example ```js -import { DelegatedContentRouting } from '@libp2p/delegated-content-routing' -import ipfsHttpClient from 'ipfs-http-client' +import { createLibp2p } from 'libp2p' +import { delegatedContentRouting } from '@libp2p/delegated-content-routing' +import { create as createIpfsHttpClient } from 'ipfs-http-client') // default is to use ipfs.io -const routing = new DelegatedContentRouting(peerId, ipfsHttpClient.create({ +const client = createIpfsHttpClient({ // use default api settings protocol: 'https', port: 443, - host: 'node0.delegate.ipfs.io' // In production you should setup your own delegates -})) -const cid = new CID('QmS4ustL54uo8FzR9455qaxZwuMiUhyvMcX9Ba8nUH4uVv') - -for await (const { id, multiaddrs } of routing.findProviders(cid)) { - console.log('found peer', id, multiaddrs) + host: 'node0.delegate.ipfs.io' +}) + +const node = await createLibp2p({ + peerRouting: [ + delegatedContentRouting(client) + ] + //.. other config +}) +await node.start() + +for await (const provider of node.contentRouting.findProviders('cid')) { + console.log('provider', provider) } - -await routing.provide(cid) -console.log('providing %s', cid.toBaseEncodedString()) ``` ## License diff --git a/package.json b/package.json index d5a1281..c020ee5 100644 --- a/package.json +++ b/package.json @@ -143,18 +143,20 @@ "any-signal": "^3.0.1", "err-code": "^3.0.1", "it-drain": "^1.0.5", + "multiformats": "^10.0.0", "p-defer": "^4.0.0", "p-queue": "^7.2.0" }, "devDependencies": { "aegir": "^37.5.3", - "go-ipfs": "^0.15.0", + "go-ipfs": "^0.16.0", "ipfs-core-types": "^0.12.0", "ipfs-http-client": "^58.0.0", "ipfsd-ctl": "^12.0.2", "it-all": "^1.0.6", "it-drain": "^1.0.5", - "uint8arrays": "^3.0.0", + "timeout-abort-controller": "^3.0.0", + "uint8arrays": "^4.0.2", "wherearewe": "^2.0.1" }, "browser": { diff --git a/src/index.ts b/src/index.ts index e69c634..6ee341d 100644 --- a/src/index.ts +++ b/src/index.ts @@ -4,11 +4,12 @@ import PQueue from 'p-queue' import defer from 'p-defer' import errCode from 'err-code' import anySignal from 'any-signal' -import type { IPFSHTTPClient, CID, HTTPClientExtraOptions } from 'ipfs-http-client' import type { AbortOptions } from 'ipfs-core-types/src/utils' import type { ContentRouting } from '@libp2p/interface-content-routing' import type { PeerInfo } from '@libp2p/interface-peer-info' import type { Startable } from '@libp2p/interfaces/startable' +import type { CID } from 'multiformats/cid' +import type { PeerId } from '@libp2p/interface-peer-id' const log = logger('libp2p:delegated-content-routing') @@ -16,11 +17,125 @@ const DEFAULT_TIMEOUT = 30e3 // 30 second default const CONCURRENT_HTTP_REQUESTS = 4 const CONCURRENT_HTTP_REFS_REQUESTS = 2 +export interface HTTPClientExtraOptions { + headers?: Record + searchParams?: URLSearchParams +} + +export enum EventTypes { + SENDING_QUERY = 0, + PEER_RESPONSE, + FINAL_PEER, + QUERY_ERROR, + PROVIDER, + VALUE, + ADDING_PEER, + DIALING_PEER +} + +/** + * The types of messages set/received during DHT queries + */ +export enum MessageType { + PUT_VALUE = 0, + GET_VALUE, + ADD_PROVIDER, + GET_PROVIDERS, + FIND_NODE, + PING +} + +export type MessageName = keyof typeof MessageType + +export interface DHTRecord { + key: Uint8Array + value: Uint8Array + timeReceived?: Date +} + +export interface SendingQueryEvent { + type: EventTypes.SENDING_QUERY + name: 'SENDING_QUERY' +} + +export interface PeerResponseEvent { + from: PeerId + type: EventTypes.PEER_RESPONSE + name: 'PEER_RESPONSE' + messageType: MessageType + messageName: MessageName + providers: PeerInfo[] + closer: PeerInfo[] + record?: DHTRecord +} + +export interface FinalPeerEvent { + peer: PeerInfo + type: EventTypes.FINAL_PEER + name: 'FINAL_PEER' +} + +export interface QueryErrorEvent { + type: EventTypes.QUERY_ERROR + name: 'QUERY_ERROR' + error: Error +} + +export interface ProviderEvent { + type: EventTypes.PROVIDER + name: 'PROVIDER' + providers: PeerInfo[] +} + +export interface ValueEvent { + type: EventTypes.VALUE + name: 'VALUE' + value: Uint8Array +} + +export interface AddingPeerEvent { + type: EventTypes.ADDING_PEER + name: 'ADDING_PEER' + peer: PeerId +} + +export interface DialingPeerEvent { + peer: PeerId + type: EventTypes.DIALING_PEER + name: 'DIALING_PEER' +} + +export type QueryEvent = SendingQueryEvent | PeerResponseEvent | FinalPeerEvent | QueryErrorEvent | ProviderEvent | ValueEvent | AddingPeerEvent | DialingPeerEvent + +export interface DHTProvideOptions extends AbortOptions { + recursive?: boolean +} + +export interface StatResult { + cid: CID + size: number +} + +export interface Delegate { + getEndpointConfig: () => { protocol: string, host: string, port: string } + + block: { + stat: (cid: CID, options?: AbortOptions) => Promise + } + + dht: { + findProvs: (cid: CID, options?: HTTPClientExtraOptions & AbortOptions) => AsyncIterable + provide: (cid: CID, options?: HTTPClientExtraOptions & DHTProvideOptions) => AsyncIterable + put: (key: string | Uint8Array, value: Uint8Array, options?: HTTPClientExtraOptions & AbortOptions) => AsyncIterable + get: (key: string | Uint8Array, options?: HTTPClientExtraOptions & AbortOptions) => AsyncIterable + } +} + /** * An implementation of content routing, using a delegated peer */ -export class DelegatedContentRouting implements ContentRouting, Startable { - private readonly client: IPFSHTTPClient +class DelegatedContentRouting implements ContentRouting, Startable { + private readonly client: Delegate private readonly httpQueue: PQueue private readonly httpQueueRefs: PQueue private started: boolean @@ -29,7 +144,7 @@ export class DelegatedContentRouting implements ContentRouting, Startable { /** * Create a new DelegatedContentRouting instance */ - constructor (client: IPFSHTTPClient) { + constructor (client: Delegate) { if (client == null) { throw new Error('missing ipfs http client') } @@ -180,3 +295,7 @@ export class DelegatedContentRouting implements ContentRouting, Startable { }) } } + +export function delegatedContentRouting (client: Delegate): (components?: any) => ContentRouting { + return () => new DelegatedContentRouting(client) +} diff --git a/test/index.spec.ts b/test/index.spec.ts index 0f8cd23..d7032eb 100644 --- a/test/index.spec.ts +++ b/test/index.spec.ts @@ -2,18 +2,22 @@ import { expect } from 'aegir/chai' import { Controller, createFactory } from 'ipfsd-ctl' -import { create, CID } from 'ipfs-http-client' +import { create, Options, CID as IPFSCID } from 'ipfs-http-client' import all from 'it-all' import drain from 'it-drain' import { isElectronMain, isNode } from 'wherearewe' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' -import { DelegatedContentRouting } from '../src/index.js' +import { delegatedContentRouting } from '../src/index.js' // @ts-expect-error no types import goIpfs from 'go-ipfs' import pDefer from 'p-defer' +import { CID } from 'multiformats/cid' import type { PeerId } from '@libp2p/interface-peer-id' import type { IDResult } from 'ipfs-core-types/src/root' import type { PeerInfo } from '@libp2p/interface-peer-info' +import { stop } from '@libp2p/interfaces/startable' +import { TimeoutController } from 'timeout-abort-controller' +import type { AbortOptions } from '@libp2p/interfaces' const factory = createFactory({ type: 'go', @@ -46,6 +50,38 @@ async function spawnNode (bootstrap: any[] = []) { } } +function createIpfsClient (opts: Options) { + const client = create(opts) + + return { + getEndpointConfig: () => client.getEndpointConfig(), + block: { + async stat (cid: CID, options?: AbortOptions) { + const result = await client.block.stat(IPFSCID.parse(cid.toString()), options) + + return { + cid: CID.parse(result.cid.toString()), + size: result.size + } + } + }, + dht: { + async * findProvs (cid: CID, options?: AbortOptions) { + yield * client.dht.findProvs(IPFSCID.parse(cid.toString()), options) + }, + async * provide (cid: CID, options?: AbortOptions) { + yield * client.dht.provide(IPFSCID.parse(cid.toString()), options) + }, + async * put (key: string | Uint8Array, value: Uint8Array, options?: AbortOptions) { + yield * client.dht.put(key, value, options) + }, + async * get (key: string | Uint8Array, options?: AbortOptions) { + yield * client.dht.get(key, options) + } + } + } +} + describe('DelegatedContentRouting', function () { this.timeout(20 * 1000) // we're spawning daemons, give ci some time @@ -78,16 +114,16 @@ describe('DelegatedContentRouting', function () { describe('create', () => { it('should require ipfs http client', () => { // @ts-expect-error missing parameters - expect(() => new DelegatedContentRouting()).to.throw() + expect(() => delegatedContentRouting()()).to.throw() }) it('should accept an http api client instance at construction time', () => { - const client = create({ + const client = createIpfsClient({ protocol: 'http', port: 8000, host: 'localhost' }) - const router = new DelegatedContentRouting(client) + const router = delegatedContentRouting(client)() expect(router).to.have.property('client') .that.has.property('getEndpointConfig') @@ -111,18 +147,18 @@ describe('DelegatedContentRouting', function () { selfNode.api.add(data) ]) await Promise.all([ - drain(bootstrapNode.api.dht.provide(cid)), - drain(selfNode.api.dht.provide(cid)) + drain(bootstrapNode.api.dht.provide(IPFSCID.parse(cid.toString()))), + drain(selfNode.api.dht.provide(IPFSCID.parse(cid.toString()))) ]) }) it('should be able to find providers through the delegate node', async function () { const opts = delegateNode.apiAddr.toOptions() - const routing = new DelegatedContentRouting(create({ + const routing = delegatedContentRouting(createIpfsClient({ protocol: 'http', port: opts.port, host: opts.host - })) + }))() const providers = await all(routing.findProviders(cid)) @@ -134,30 +170,33 @@ describe('DelegatedContentRouting', function () { it('should be able to specify a timeout', async () => { const opts = delegateNode.apiAddr.toOptions() - const routing = new DelegatedContentRouting(create({ + const routing = delegatedContentRouting(createIpfsClient({ protocol: 'http', port: opts.port, host: opts.host - })) + }))() + const controller = new TimeoutController(5e3) - const providers = await all(routing.findProviders(cid, { timeout: 5e3 })) + const providers = await all(routing.findProviders(cid, { signal: controller.signal })) expect(providers.map((p) => p.id.toString())).to.include(bootstrapId.id.toString(), 'Did not include bootstrap node') + + controller.clear() }) }) describe('provide', () => { it('should be able to register as a content provider to the delegate node', async () => { const opts = delegateNode.apiAddr.toOptions() - const contentRouter = new DelegatedContentRouting(create({ + const contentRouter = delegatedContentRouting(createIpfsClient({ protocol: 'http', port: opts.port, host: opts.host - })) + }))() const { cid } = await selfNode.api.add(uint8ArrayFromString(`hello-${Math.random()}`)) - await contentRouter.provide(cid) + await contentRouter.provide(CID.parse(cid.toString())) const providers: PeerInfo[] = [] @@ -173,18 +212,18 @@ describe('DelegatedContentRouting', function () { it('should provide non-dag-pb nodes via the delegate node', async () => { const opts = delegateNode.apiAddr.toOptions() - const contentRouter = new DelegatedContentRouting(create({ + const contentRouter = delegatedContentRouting(createIpfsClient({ protocol: 'http', port: opts.port, host: opts.host - })) + }))() const cid = await selfNode.api.dag.put(`hello-${Math.random()}`, { storeCodec: 'dag-cbor', hashAlg: 'sha2-256' }) - await contentRouter.provide(cid) + await contentRouter.provide(CID.parse(cid.toString())) const providers: PeerInfo[] = [] @@ -202,11 +241,11 @@ describe('DelegatedContentRouting', function () { describe('get', () => { it('should get a value', async () => { const opts = delegateNode.apiAddr.toOptions() - const contentRouter = new DelegatedContentRouting(create({ + const contentRouter = delegatedContentRouting(createIpfsClient({ protocol: 'http', port: opts.port, host: opts.host - })) + }))() const cid = await selfNode.api.dag.put(`hello-${Math.random()}`, { storeCodec: 'dag-cbor', @@ -224,11 +263,11 @@ describe('DelegatedContentRouting', function () { describe('put', () => { it('should put a value', async () => { const opts = delegateNode.apiAddr.toOptions() - const contentRouter = new DelegatedContentRouting(create({ + const contentRouter = delegatedContentRouting(createIpfsClient({ protocol: 'http', port: opts.port, host: opts.host - })) + }))() const cid = await selfNode.api.dag.put(`hello-${Math.random()}`, { storeCodec: 'dag-cbor', @@ -259,11 +298,11 @@ describe('DelegatedContentRouting', function () { describe('stop', () => { it('should cancel in-flight requests when stopping', async () => { const opts = delegateNode.apiAddr.toOptions() - const contentRouter = new DelegatedContentRouting(create({ + const contentRouter = delegatedContentRouting(createIpfsClient({ protocol: 'http', port: opts.port, host: opts.host - })) + }))() const deferred = pDefer() // non-existent CID @@ -277,7 +316,7 @@ describe('DelegatedContentRouting', function () { deferred.resolve(err) }) - await contentRouter.stop() + await stop(contentRouter) await expect(deferred.promise).to.eventually.have.property('message').that.matches(/aborted/) }) })