This repository was archived by the owner on Jul 21, 2023. It is now read-only.
Comparing changes

base repository: libp2p/js-libp2p-kad-dht
base: 9e12b7ca5024576f7bb1ccbbf508b4a69d2e851e
head repository: libp2p/js-libp2p-kad-dht
compare: 42eb5ebfb6c5359698af7cec8fcda68411c6115a

Commits on Sep 16, 2017

  1. fix #17 (550c0d9)

    daviddias authored Sep 16, 2017 (verified signature)
  2. docs: fix #17 (0ca9708)

    daviddias authored Sep 16, 2017

Commits on Nov 9, 2017

  1. chore: Windows Interop (2d6e499)

    richardschneider authored and daviddias committed Nov 9, 2017
  2. chore: update contributors (7e1c2da)

    daviddias committed Nov 9, 2017
  3. c53baa0
  4. chore: gitignore (d4f71a9)

    daviddias committed Nov 9, 2017

Commits on Jan 30, 2018

  1. ff87f4b
  2. Updating CI files (#21) (d3ee20e)

    This commit updates all CI scripts to the latest version
    victorb authored and daviddias committed Jan 30, 2018
  3. chore: update deps (1e4f8f6)

    daviddias committed Jan 30, 2018
  4. chore: update contributors (34307bf)

    daviddias committed Jan 30, 2018
  5. 8b49eb4
  6. chore: update contributors (81f8b57)

    daviddias committed Jan 30, 2018
  7. 5d15733
  8. chore: update contributors (86bf57f)

    daviddias committed Jan 30, 2018
  9. a2a5c2e

Commits on Feb 7, 2018

  1. feat: use libp2p-switch (054e5e5)

    daviddias committed Feb 7, 2018
  2. chore: update contributors (4687dfa)

    daviddias committed Feb 7, 2018
  3. 800677d
  4. chore: update contributors (02370eb)

    daviddias committed Feb 7, 2018
  5. 7a906d2

Commits on Mar 15, 2018

  1. docs: update README (a575dc7)

    daviddias committed Mar 15, 2018
  2. chore: update deps (954d6d6)

    daviddias committed Mar 15, 2018
  3. chore: update deps (57bdc84)

    daviddias committed Mar 15, 2018
  4. e98cc85
  5. 412f8d5
  6. chore: update CI files (a910f02)

    daviddias committed Mar 15, 2018
  7. chore: linting (35c2c53)

    daviddias committed Mar 15, 2018
  8. chore: appveyor no more (b99e5a7)

    daviddias committed Mar 15, 2018
  9. be12317
  10. b8e0f72
  11. 188d7f9
  12. chore: update npm scripts (3a9b485)

    daviddias committed Mar 15, 2018
  13. chore: update contributors (3ed9783)

    daviddias committed Mar 15, 2018
  14. 34424ba

Commits on Apr 5, 2018

  1. chore: update deps (c6bdcba)

    daviddias committed Apr 5, 2018
  2. chore: update contributors (acff75e)

    daviddias committed Apr 5, 2018
  3. 45f3703

Commits on Jun 4, 2018

  1. docs: remove mentions of coral, it's not implemented (7a54bd3)

    Lars Gierth authored and daviddias committed Jun 4, 2018

Commits on Jun 25, 2018

  1. docs: add lead-maintainer (cc08abb)

    vasco-santos authored Jun 25, 2018

Commits on Jul 13, 2018

  1. chore: update deps (b64d6f0)

    jacobheun authored and daviddias committed Jul 13, 2018
  2. 7a92139
  3. chore: remove travis and circle (493be7c)

    jacobheun authored and daviddias committed Jul 13, 2018
  4. a5b4b71
  5. 51d425b
  6. chore: update contributors (4113d87)

    daviddias committed Jul 13, 2018
  7. 012d2c3

Commits on Aug 27, 2018

  1. Make logs better for users and fix flaky test (#33) (bb5f9af)

    * fix: add a b58 formatter for logging and update logs
    jacobheun authored and vasco-santos committed Aug 27, 2018

Commits on Aug 29, 2018

  1. fcdb01d
  2. f41e3d0
  3. edab8a3
Showing with 6,817 additions and 3,460 deletions.
  1. +4 −0 .aegir.js
  2. +26 −26 .travis.yml
  3. +601 −0 CHANGELOG.md
  4. +28 −2 README.md
  5. +0 −14 circle.yml
  6. +67 −0 docs/IMPL_SUMMARY.MD
  7. +74 −53 package.json
  8. +12 −3 src/constants.js
  9. +281 −0 src/content-fetching/index.js
  10. +158 −0 src/content-routing/index.js
  11. +0 −13 src/errors.js
  12. +371 −398 src/index.js
  13. +1 −0 src/message/dht.proto.js
  14. +10 −23 src/message/index.js
  15. +80 −117 src/network.js
  16. +12 −12 src/{peer-list.js → peer-list/index.js}
  17. +5 −5 src/{ → peer-list}/limited-peer-list.js
  18. +88 −0 src/peer-list/peer-distance-list.js
  19. +20 −36 src/{ → peer-list}/peer-queue.js
  20. +274 −0 src/peer-routing/index.js
  21. +0 −638 src/private.js
  22. +130 −165 src/providers.js
  23. +52 −0 src/query-manager.js
  24. +0 −193 src/query.js
  25. +118 −0 src/query/index.js
  26. +84 −0 src/query/path.js
  27. +202 −0 src/query/run.js
  28. +254 −0 src/query/workerQueue.js
  29. +184 −0 src/random-walk.js
  30. +31 −50 src/routing.js
  31. +23 −14 src/rpc/handlers/add-provider.js
  32. +19 −25 src/rpc/handlers/find-node.js
  33. +26 −45 src/rpc/handlers/get-providers.js
  34. +32 −38 src/rpc/handlers/get-value.js
  35. +1 −1 src/rpc/handlers/index.js
  36. +6 −7 src/rpc/handlers/ping.js
  37. +15 −24 src/rpc/handlers/put-value.js
  38. +50 −69 src/rpc/index.js
  39. +76 −47 src/utils.js
  40. +785 −412 test/kad-dht.spec.js
  41. +47 −50 test/{utils.spec.js → kad-utils.spec.js}
  42. +18 −20 test/limited-peer-list.spec.js
  43. +54 −77 test/message.spec.js
  44. +110 −0 test/multiple-nodes.spec.js
  45. +106 −74 test/network.spec.js
  46. +116 −0 test/peer-distance-list.spec.js
  47. +8 −14 test/peer-list.spec.js
  48. +31 −39 test/peer-queue.spec.js
  49. +107 −123 test/providers.spec.js
  50. +641 −63 test/query.spec.js
  51. +163 −0 test/query/index.spec.js
  52. +280 −0 test/random-walk.spec.js
  53. +54 −106 test/routing.spec.js
  54. +66 −77 test/rpc/handlers/add-provider.spec.js
  55. +34 −52 test/rpc/handlers/find-node.spec.js
  56. +46 −62 test/rpc/handlers/get-providers.spec.js
  57. +56 −86 test/rpc/handlers/get-value.spec.js
  58. +17 −23 test/rpc/handlers/ping.spec.js
  59. +36 −40 test/rpc/handlers/put-value.spec.js
  60. +47 −53 test/rpc/index.spec.js
  61. +12 −0 test/simulation/README.md
  62. +231 −0 test/simulation/index.js
  63. +66 −0 test/utils/create-disjoint-tracks.js
  64. +19 −0 test/utils/create-peer-id.js
  65. +20 −0 test/utils/create-values.js
  66. +70 −71 test/utils/index.js
  67. +147 −0 test/utils/test-dht.js
  68. +15 −0 test/utils/to-buffer.js
4 changes: 4 additions & 0 deletions .aegir.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
module.exports = {
bundlesize: { maxSize: '222kB' }
}

52 changes: 26 additions & 26 deletions .travis.yml
@@ -1,32 +1,32 @@
sudo: false
language: node_js
cache: npm

matrix:
include:
- node_js: 6
env: CXX=g++-4.8
- node_js: 8
env: CXX=g++-4.8
# - node_js: stable
# env: CXX=g++-4.8
stages:
- check
- test
- cov

node_js:
- '10'

script:
- npm run lint
- npm run test
- npm run coverage
- make test
os:
- linux
- osx

before_script:
- export DISPLAY=:99.0
- sh -e /etc/init.d/xvfb start
script: npx nyc -s npm run test:node -- --bail
after_success: npx nyc report --reporter=text-lcov > coverage.lcov && npx codecov

jobs:
include:
- os: windows
filter_secrets: false
cache: false

after_success:
- npm run coverage-publish
- stage: check
script:
- npx aegir build --bundlesize
- npx aegir dep-check
- npm run lint

addons:
firefox: 'latest'
apt:
sources:
- ubuntu-toolchain-r-test
packages:
- g++-4.8
notifications:
email: false
601 changes: 601 additions & 0 deletions CHANGELOG.md

Large diffs are not rendered by default.

30 changes: 28 additions & 2 deletions README.md
@@ -3,16 +3,22 @@
[![](https://img.shields.io/badge/made%20by-Protocol%20Labs-blue.svg?style=flat-square)](http://ipn.io)
[![](https://img.shields.io/badge/project-IPFS-blue.svg?style=flat-square)](http://ipfs.io/)
[![](https://img.shields.io/badge/freenode-%23ipfs-blue.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23ipfs)
[![Discourse posts](https://img.shields.io/discourse/https/discuss.libp2p.io/posts.svg)](https://discuss.libp2p.io)
[![Build Status](https://travis-ci.org/libp2p/js-libp2p-kad-dht.svg?style=flat-square)](https://travis-ci.org/libp2p/js-libp2p-kad-dht)
[![Coverage Status](https://coveralls.io/repos/github/libp2p/js-libp2p-kad-dht/badge.svg?branch=master)](https://coveralls.io/github/libp2p/js-libp2p-kad-dht?branch=master)
[![Dependency Status](https://david-dm.org/libp2p/js-libp2p-kad-dht.svg?style=flat-square)](https://david-dm.org/libp2p/js-libp2p-kad-dht)
[![Bundle Size](https://flat.badgen.net/bundlephobia/minzip/libp2p-kad-dht)](https://bundlephobia.com/result?p=libp2p-kad-dht)
[![js-standard-style](https://img.shields.io/badge/code%20style-standard-brightgreen.svg?style=flat-square)](https://github.com/feross/standard)
[![standard-readme compliant](https://img.shields.io/badge/standard--readme-OK-green.svg?style=flat-square)](https://github.com/RichardLitt/standard-readme)
![](https://img.shields.io/badge/npm-%3E%3D3.0.0-orange.svg?style=flat-square)
![](https://img.shields.io/badge/Node.js-%3E%3D6.0.0-orange.svg?style=flat-square)

> JavaScript implementation of the Kademlia DHT for libp2p, based on [go-libp2p-kad-dht](https://github.com/libp2p/go-libp2p-kad-dht).
## Lead Maintainer

[Vasco Santos](https://github.com/vasco-santos).

## Table of Contents

- [Install](#install)
@@ -27,7 +33,7 @@
### npm

```sh
> npm i libp2p-dht
> npm i libp2p-kad-dht
```

### Use in Node.js
@@ -38,7 +44,27 @@ const KadDHT = require('libp2p-kad-dht')

## API

See https://libp2p.github.io/js-libp2p-kad-dht
See https://libp2p.github.io/js-libp2p-kad-dht for the auto-generated docs.

The libp2p-kad-dht module offers 3 APIs: Peer Routing, Content Routing and Peer Discovery.

### Peer Routing

[![](https://raw.githubusercontent.com/libp2p/js-libp2p-interfaces/master/src/peer-routing/img/badge.png)](https://github.com/libp2p/js-libp2p-interfaces/tree/master/src/peer-routing)

### Content Routing

[![](https://raw.githubusercontent.com/libp2p/js-libp2p-interfaces/master/src/content-routing/img/badge.png)](https://github.com/libp2p/js-libp2p-interfaces/tree/master/src/content-routing)

### Peer Discovery

[![](https://raw.githubusercontent.com/libp2p/js-libp2p-interfaces/master/src/peer-discovery/img/badge.png)](https://github.com/libp2p/js-libp2p-interfaces/tree/master/src/peer-discovery)

`libp2p-kad-dht` provides a discovery service called `Random Walk` (random walks on the DHT to discover more nodes). It is accessible through `dht.randomWalk` and exposes the [Peer Discovery interface](https://github.com/libp2p/js-libp2p-interfaces/tree/master/src/peer-discovery).

### Implementation Summary

A [summary](docs/IMPL_SUMMARY.MD) of the algorithms and API for this implementation of Kademlia.

## Contribute

14 changes: 0 additions & 14 deletions circle.yml

This file was deleted.

67 changes: 67 additions & 0 deletions docs/IMPL_SUMMARY.MD
@@ -0,0 +1,67 @@
# js-libp2p-kad-dht

js-libp2p-kad-dht is a JavaScript implementation of the [Kademlia DHT](http://www.scs.stanford.edu/%7Edm/home/papers/kpos.pdf) with some features of S/Kademlia. A "provider" node uses the DHT to advertise that it has a particular piece of content, and "querying" nodes will search the DHT for peers that have a particular piece of content. Content is modeled as a value that is identified by a key, where the key and value are Buffers.

#### DHT Identifiers

The DHT uses a sha2-256 hash for identifiers:
- For peers the DHT identifier is the hash of the [PeerId][PeerId]
- For content the DHT identifier is the hash of the key (e.g. a Block CID)

#### FIND_NODE

`findPeer (PeerId):` [PeerInfo][PeerInfo]

The address space is so large (256 bits) that there are big gaps between DHT ids, and nodes frequently join and leave the DHT, so no single node can know about every peer; lookups therefore proceed iteratively.

To find a particular node
- the `querying node` converts the [PeerId][PeerId] to a DHT id
- the `querying node` sends a request to the nearest peers to that DHT id that it knows about
- those peers respond with the nearest peers to the DHT id that they know about
- the `querying node` sorts the responses and recursively queries the closest peers to the DHT id, continuing until it finds the node or it has queried all the closest peers.
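The steps above can be sketched over a toy in-memory network. Small integer ids and XOR distance stand in for 256-bit sha2-256 ids, and `network` plays the role of each peer's routing table; only `ALPHA` (the query concurrency, 3 in this module) comes from the real implementation:

```javascript
const ALPHA = 3 // concurrency for asynchronous requests (src/constants.js)
const xorDistance = (a, b) => a ^ b

function lookup (target, startPeers, network) {
  const queried = new Set()
  const closest = [...startPeers]

  while (true) {
    closest.sort((a, b) => xorDistance(a, target) - xorDistance(b, target))
    // pick the ALPHA closest peers we have not asked yet
    const next = closest.filter((p) => !queried.has(p)).slice(0, ALPHA)
    if (next.length === 0) return closest // converged: every candidate was asked

    for (const peer of next) {
      queried.add(peer)
      // the queried peer responds with the closest peers it knows about
      for (const known of network.get(peer) || []) {
        if (!closest.includes(known)) closest.push(known)
      }
    }
  }
}

// Each entry maps a peer to the peers it knows about.
const network = new Map([[1, [2, 8]], [2, [1, 4]], [4, [5, 2]], [5, [4]], [8, [1]]])
console.log(lookup(5, [1], network)) // closest-first: [ 5, 4, 1, 2, 8 ]
```

Starting from peer 1, the lookup discovers 2 and 8, then 4, then the target 5, and stops once no unqueried candidate remains.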

#### PUT

`put (Key, Value)`

To store a value in the DHT, the `provider node`
- converts the key to a DHT id
- follows the "closest peers" algorithm as above to find the nearest peers to the DHT id
- sends the value to those nearest peers

Note that DHT nodes will only store values that are accepted by their "validators": configurable functions that validate the key/value pair, so a node can control what kind of content it stores (e.g. IPNS records).

#### GET

`get (Key): [Value]`

To retrieve a value from the DHT
- the `querying node` converts the key to a DHT id
- the `querying node` follows the "closest peers" algorithm to find the nearest peers to the DHT id
- at each iteration of the algorithm, if the peer has the value it responds with the value itself in addition to closer peers.

Note that the value for a particular key is stored by many nodes, and these nodes receive `PUT` requests asynchronously, so it's possible that nodes may have distinct values for the same key. For example if node A `PUT`s the value `hello` to key `greeting` and node B concurrently `PUT`s the value `bonjour` to key `greeting`, some nodes close to the key `greeting` may receive `hello` first and others may receive `bonjour` first.

Therefore a `GET` request to the DHT may collect distinct values (e.g. `hello` and `bonjour`) for a particular key from the nodes close to the key. The DHT has "selectors", configurable functions that choose the "best" value (for example IPNS records include a sequence number, and the "best" value is the record with the highest sequence number).
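A minimal selector in this spirit might look like the following; the `(key, records) → index` shape matches how selectors are described above, but the `{ seq, value }` record layout is illustrative only:

```javascript
// Selector: given every record collected for a key, return the index of
// the "best" one. Here "best" means highest sequence number, as for IPNS.
function bestBySeq (key, records) {
  let best = 0
  for (let i = 1; i < records.length; i++) {
    if (records[i].seq > records[best].seq) best = i
  }
  return best
}

const collected = [
  { seq: 1, value: 'hello' },
  { seq: 2, value: 'bonjour' }
]
console.log(bestBySeq('greeting', collected)) // 1: the record with the higher seq
```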

#### PROVIDE

`provide (Key)`

To advertise that it has the content for a particular key
- the `provider node` converts the key to a DHT id
- the `provider node` follows the "closest peers" algorithm to find the nearest peers to the DHT id
- the `provider node` sends a "provide" message to each of the nearest peers
- each of the nearest peers saves the association between the "provider" peer and the key

#### FIND_PROVIDERS

`findProviders (Key):` [[PeerInfo][PeerInfo]]

To find providers for a particular key
- the `querying node` converts the key to a DHT id
- the `querying node` follows the "closest peers" algorithm to find the nearest peers to the DHT id
- at each iteration of the algorithm, if the peer knows which nodes are providing the value it responds with the provider nodes in addition to closer peers.
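The bookkeeping behind PROVIDE and FIND_PROVIDERS can be sketched with a toy provider store. String keys and peer ids are illustrative; the real module keys its store by DHT id and expires entries after a validity period:

```javascript
// Each node near a key remembers which peers announced that key.
class ProviderStore {
  constructor () {
    this.providers = new Map() // key -> Set of provider peer ids
  }

  // Handle a "provide" message: remember the announcing peer.
  add (key, peerId) {
    if (!this.providers.has(key)) this.providers.set(key, new Set())
    this.providers.get(key).add(peerId)
  }

  // Answer a FIND_PROVIDERS query for this key.
  get (key) {
    return [...(this.providers.get(key) || [])]
  }
}

const store = new ProviderStore()
store.add('a-content-key', 'peerA')
store.add('a-content-key', 'peerB')
store.add('a-content-key', 'peerA') // re-announcing is idempotent
console.log(store.get('a-content-key')) // [ 'peerA', 'peerB' ]
```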

[PeerId]: https://github.com/libp2p/js-peer-id
[PeerInfo]: https://github.com/libp2p/js-peer-info
127 changes: 74 additions & 53 deletions package.json
@@ -1,27 +1,24 @@
{
"name": "libp2p-kad-dht",
"version": "0.5.1",
"version": "0.19.0",
"description": "JavaScript implementation of the Kad-DHT for libp2p",
"leadMaintainer": "Vasco Santos <vasco.santos@moxy.studio>",
"main": "src/index.js",
"scripts": {
"lint": "aegir-lint",
"test": "aegir-test --env node",
"test:node": "aegir-test node",
"test:browser": "aegir-test browser",
"build": "aegir-build",
"docs": "aegir-docs",
"release": "aegir-release --docs --env node",
"release-minor": "aegir-release --type minor --docs --env node",
"release-major": "aegir-release --type major --docs --env node",
"coverage": "aegir-coverage",
"coverage-publish": "aegir-coverage publish"
"lint": "aegir lint",
"test": "aegir test -t node",
"test:node": "aegir test -t node",
"build": "aegir build",
"docs": "aegir docs",
"release": "aegir release --docs -t node",
"release-minor": "aegir release --type minor --docs -t node",
"release-major": "aegir release --type major --docs -t node",
"coverage": "aegir coverage",
"coverage-publish": "aegir-coverage publish",
"sim": "node test/simulation/index.js"
},
"browser": {
"./test/nodejs-bundle": "./test/browser-bundle"
},
"pre-commit": [
"lint",
"test"
"pre-push": [
"lint"
],
"repository": {
"type": "git",
@@ -30,57 +27,81 @@
"keywords": [
"IPFS"
],
"author": "Friedel Ziegelmayer <dignifiedquire@gmail.com>",
"license": "MIT",
"bugs": {
"url": "https://github.com/libp2p/js-libp2p-kad-dht/issues"
},
"engines": {
"node": ">=6.0.0",
"npm": ">=3.0.0"
"node": ">=10.0.0",
"npm": ">=6.0.0"
},
"homepage": "https://github.com/libp2p/js-libp2p-kad-dht",
"dependencies": {
"async": "^2.5.0",
"base32.js": "^0.1.0",
"cids": "~0.5.1",
"debug": "^3.0.1",
"hashlru": "^2.2.0",
"heap": "^0.2.6",
"interface-datastore": "^0.3.0",
"k-bucket": "^3.3.0",
"libp2p-crypto": "~0.10.3",
"libp2p-record": "~0.5.1",
"multihashing-async": "~0.4.6",
"peer-id": "~0.10.1",
"peer-info": "~0.11.0",
"priorityqueue": "^0.2.0",
"protons": "^1.0.0",
"pull-length-prefixed": "^1.3.0",
"pull-stream": "^3.6.0",
"safe-buffer": "^5.1.1",
"abort-controller": "^3.0.0",
"async": "^2.6.2",
"base32.js": "~0.1.0",
"cids": "~0.8.0",
"debug": "^4.1.1",
"err-code": "^2.0.0",
"hashlru": "^2.3.0",
"heap": "~0.2.6",
"interface-datastore": "~0.8.0",
"it-length-prefixed": "^3.0.0",
"it-pipe": "^1.1.0",
"k-bucket": "^5.0.0",
"libp2p-crypto": "~0.17.1",
"libp2p-interfaces": "^0.3.0",
"libp2p-record": "~0.7.0",
"multiaddr": "^7.4.3",
"multihashes": "~0.4.15",
"multihashing-async": "~0.8.0",
"p-filter": "^2.1.0",
"p-map": "^4.0.0",
"p-queue": "^6.2.1",
"p-timeout": "^3.2.0",
"p-times": "^2.1.0",
"peer-id": "~0.13.5",
"promise-to-callback": "^1.0.0",
"protons": "^1.0.1",
"streaming-iterables": "^4.1.1",
"varint": "^5.0.0",
"xor-distance": "^1.0.0"
"xor-distance": "^2.0.0"
},
"devDependencies": {
"aegir": "^11.0.2",
"chai": "^4.1.2",
"datastore-level": "^0.6.0",
"aegir": "^21.0.2",
"async-iterator-all": "^1.0.0",
"chai": "^4.2.0",
"chai-checkmark": "^1.0.1",
"datastore-level": "~0.14.1",
"delay": "^4.3.0",
"dirty-chai": "^2.0.1",
"interface-connection": "~0.3.2",
"libp2p-multiplex": "~0.5.0",
"libp2p-swarm": "~0.32.3",
"libp2p-tcp": "~0.11.0",
"lodash": "^4.17.4",
"it-pair": "^1.0.0",
"libp2p": "libp2p/js-libp2p#0.28.x",
"lodash": "^4.17.11",
"lodash.random": "^3.2.0",
"lodash.range": "^3.2.0",
"peer-book": "~0.5.0",
"pre-commit": "^1.2.2",
"safe-buffer": "^5.1.1"
"p-defer": "^3.0.0",
"p-each-series": "^2.1.0",
"p-map-series": "^2.1.0",
"p-retry": "^4.2.0",
"sinon": "^9.0.0"
},
"contributors": [
"Vasco Santos <vasco.santos@moxy.studio>",
"David Dias <daviddias.p@gmail.com>",
"Jacob Heun <jacobheun@gmail.com>",
"dirkmc <dirkmdev@gmail.com>",
"Alan Shaw <alan.shaw@protocol.ai>",
"Friedel Ziegelmayer <dignifiedquire@gmail.com>",
"Pedro Teixeira <i@pgte.me>"
"Richard Schneider <makaretu@gmail.com>",
"Blake Byrnes <blakebyrnes@gmail.com>",
"John Hiesey <jhiesey@cs.stanford.edu>",
"Hugo Dias <hugomrdias@gmail.com>",
"ᴠɪᴄᴛᴏʀ ʙᴊᴇʟᴋʜᴏʟᴍ <victorbjelkholm@gmail.com>",
"Alex Potsides <alex@achingbrain.net>",
"Lars Gierth <larsg@systemli.org>",
"Marcus Bernales <mboperator@gmail.com>",
"Pedro Teixeira <i@pgte.me>",
"Thomas Eizinger <thomas@eizinger.io>"
]
}
}
15 changes: 12 additions & 3 deletions src/constants.js
@@ -23,12 +23,21 @@ exports.PROVIDERS_VALIDITY = 24 * hour

exports.PROVIDERS_CLEANUP_INTERVAL = hour

exports.READ_MESSAGE_TIMEOUT = minute
exports.READ_MESSAGE_TIMEOUT = 10 * second

// K is the maximum number of requests to perform before returning failue
// The number of records that will be retrieved on a call to getMany()
exports.GET_MANY_RECORD_COUNT = 16

// K is the maximum number of requests to perform before returning failure
exports.K = 20

// Alpha is the concurrency for asynchronous requests
exports.ALPHA = 3

exports.maxMessageSize = 2 << 22 // 4MB
exports.defaultRandomWalk = {
enabled: true,
queriesPerPeriod: 1,
interval: 5 * minute,
timeout: 10 * second,
delay: 10 * second
}
281 changes: 281 additions & 0 deletions src/content-fetching/index.js
@@ -0,0 +1,281 @@
'use strict'

const errcode = require('err-code')
const pTimeout = require('p-timeout')

const libp2pRecord = require('libp2p-record')

const c = require('../constants')
const Query = require('../query')

const utils = require('../utils')

const Record = libp2pRecord.Record

module.exports = (dht) => {
const putLocal = async (key, rec) => { // eslint-disable-line require-await
return dht.datastore.put(utils.bufferToKey(key), rec)
}

/**
* Attempt to retrieve the value for the given key from
* the local datastore.
*
* @param {Buffer} key
* @returns {Promise<Record>}
*
* @private
*/
const getLocal = async (key) => {
dht._log('getLocal %b', key)

const raw = await dht.datastore.get(utils.bufferToKey(key))
dht._log('found %b in local datastore', key)
const rec = Record.deserialize(raw)

await dht._verifyRecordLocally(rec)
return rec
}

/**
* Send the best record found to any peers that have an out of date record.
*
* @param {Buffer} key
* @param {Array<Object>} vals - values retrieved from the DHT
* @param {Object} best - the best record that was found
* @returns {Promise}
*
* @private
*/
const sendCorrectionRecord = async (key, vals, best) => {
const fixupRec = await utils.createPutRecord(key, best)

return Promise.all(vals.map(async (v) => {
// no need to do anything
if (v.val.equals(best)) {
return
}

// correct ourself
if (dht._isSelf(v.from)) {
try {
await dht._putLocal(key, fixupRec)
} catch (err) {
dht._log.error('Failed error correcting self', err)
}
return
}

// send correction
try {
await dht._putValueToPeer(key, fixupRec, v.from)
} catch (err) {
dht._log.error('Failed error correcting entry', err)
}
}))
}

return {
/**
* Store the given key/value pair locally, in the datastore.
* @param {Buffer} key
* @param {Buffer} rec - encoded record
* @returns {Promise<void>}
* @private
*/
async _putLocal (key, rec) { // eslint-disable-line require-await
return putLocal(key, rec)
},

/**
* Store the given key/value pair in the DHT.
*
* @param {Buffer} key
* @param {Buffer} value
* @param {Object} [options] - put options
* @param {number} [options.minPeers] - minimum number of peers required to successfully put (default: closestPeers.length)
* @returns {Promise<void>}
*/
async put (key, value, options = {}) {
dht._log('PutValue %b', key)

// create record in the dht format
const record = await utils.createPutRecord(key, value)

// store the record locally
await putLocal(key, record)

// put record to the closest peers
let counterAll = 0
let counterSuccess = 0

for await (const peer of dht.getClosestPeers(key, { shallow: true })) {
try {
counterAll += 1
await dht._putValueToPeer(key, record, peer)
counterSuccess += 1
} catch (err) {
dht._log.error('Failed to put to peer (%b): %s', peer.id, err)
}
}

// verify if we were able to put to enough peers
const minPeers = options.minPeers || counterAll // Ensure we have a default `minPeers`

if (minPeers > counterSuccess) {
const error = errcode(new Error(`Failed to put value to enough peers: ${counterSuccess}/${minPeers}`), 'ERR_NOT_ENOUGH_PUT_PEERS')
dht._log.error(error)
throw error
}
},

/**
* Get the value to the given key.
* Times out after 1 minute by default.
*
* @param {Buffer} key
* @param {Object} [options] - get options
* @param {number} [options.timeout] - optional timeout (default: 60000)
* @returns {Promise<Buffer>}
*/
async get (key, options = {}) {
options.timeout = options.timeout || c.minute

dht._log('_get %b', key)

const vals = await dht.getMany(key, c.GET_MANY_RECORD_COUNT, options)
const recs = vals.map((v) => v.val)
let i = 0

try {
i = libp2pRecord.selection.bestRecord(dht.selectors, key, recs)
} catch (err) {
// Assume the first record if no selector available
if (err.code !== 'ERR_NO_SELECTOR_FUNCTION_FOR_RECORD_KEY') {
throw err
}
}

const best = recs[i]
dht._log('GetValue %b %s', key, best)

if (!best) {
throw errcode(new Error('best value was not found'), 'ERR_NOT_FOUND')
}

await sendCorrectionRecord(key, vals, best)

return best
},

/**
* Get the `n` values to the given key without sorting.
*
* @param {Buffer} key
* @param {number} nvals
* @param {Object} [options] - get options
* @param {number} [options.timeout] - optional timeout (default: 60000)
* @returns {Promise<Array<{from: PeerId, val: Buffer}>>}
*/
async getMany (key, nvals, options = {}) {
options.timeout = options.timeout || c.minute

dht._log('getMany %b (%s)', key, nvals)

let vals = []
let localRec

try {
localRec = await getLocal(key)
} catch (err) {
if (nvals === 0) {
throw err
}
}

if (localRec) {
vals.push({
val: localRec.value,
from: dht.peerId
})
}

if (vals.length >= nvals) {
return vals
}

const paths = []
const id = await utils.convertBuffer(key)
const rtp = dht.routingTable.closestPeers(id, this.kBucketSize)

dht._log('peers in rt: %d', rtp.length)

if (rtp.length === 0) {
const errMsg = 'Failed to lookup key! No peers from routing table!'

dht._log.error(errMsg)
if (vals.length === 0) {
throw errcode(new Error(errMsg), 'ERR_NO_PEERS_IN_ROUTING_TABLE')
}
return vals
}

// we have peers, lets do the actual query to them
const query = new Query(dht, key, (pathIndex, numPaths) => {
// This function body runs once per disjoint path
const pathSize = utils.pathSize(nvals - vals.length, numPaths)
const pathVals = []
paths.push(pathVals)

// Here we return the query function to use on this particular disjoint path
return async (peer) => {
let rec, peers, lookupErr
try {
const results = await dht._getValueOrPeers(peer, key)
rec = results.record
peers = results.peers
} catch (err) {
// If we have an invalid record we just want to continue and fetch a new one.
if (err.code !== 'ERR_INVALID_RECORD') {
throw err
}
lookupErr = err
}

const res = { closerPeers: peers }

if ((rec && rec.value) || lookupErr) {
pathVals.push({
val: rec && rec.value,
from: peer
})
}

// enough is enough
if (pathVals.length >= pathSize) {
res.pathComplete = true
}

return res
}
})

let error
try {
await pTimeout(query.run(rtp), options.timeout)
} catch (err) {
error = err
}
query.stop()

// combine vals from each path
vals = [].concat.apply(vals, paths).slice(0, nvals)

if (error && vals.length === 0) {
throw error
}

return vals
}
}
}
158 changes: 158 additions & 0 deletions src/content-routing/index.js
@@ -0,0 +1,158 @@
'use strict'

const errcode = require('err-code')
const pTimeout = require('p-timeout')

const c = require('../constants')
const LimitedPeerList = require('../peer-list/limited-peer-list')
const Message = require('../message')
const Query = require('../query')
const utils = require('../utils')

module.exports = (dht) => {
/**
* Check for providers from a single node.
*
* @param {PeerId} peer
* @param {CID} key
* @returns {Promise<Message>}
*
* @private
*/
const findProvidersSingle = async (peer, key) => { // eslint-disable-line require-await
const msg = new Message(Message.TYPES.GET_PROVIDERS, key.buffer, 0)
return dht.network.sendRequest(peer, msg)
}

return {
/**
* Announce to the network that we can provide given key's value.
* @param {CID} key
* @returns {Promise<void>}
*/
async provide (key) {
dht._log('provide: %s', key.toBaseEncodedString())

const errors = []

// Add peer as provider
await dht.providers.addProvider(key, dht.peerId)

const msg = new Message(Message.TYPES.ADD_PROVIDER, key.buffer, 0)
msg.providerPeers = [{
id: dht.peerId
}]

// Notify closest peers
for await (const peer of dht.getClosestPeers(key.buffer)) {
dht._log('putProvider %s to %s', key.toBaseEncodedString(), peer.toB58String())
try {
await dht.network.sendMessage(peer, msg)
} catch (err) {
errors.push(err)
}
}

if (errors.length) {
// TODO:
// This should be infrequent. This means a peer we previously connected
// to failed to exchange the provide message. If getClosestPeers was an
// iterator, we could continue to pull until we announce to kBucketSize peers.
throw errcode(new Error(`Failed to provide to ${errors.length} of ${dht.kBucketSize} peers`), 'ERR_SOME_PROVIDES_FAILED', { errors })
}
},

/**
* Search the dht for up to `K` providers of the given CID.
* @param {CID} key
* @param {Object} options - findProviders options
* @param {number} options.timeout - how long the query should maximally run, in milliseconds (default: 60000)
* @param {number} options.maxNumProviders - maximum number of providers to find
* @returns {AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>}
*/
async * findProviders (key, options = {}) {
const providerTimeout = options.timeout || c.minute
const n = options.maxNumProviders || c.K

dht._log('findProviders %s', key.toBaseEncodedString())

const out = new LimitedPeerList(n)
const provs = await dht.providers.getProviders(key)

provs.forEach((id) => {
const peerData = dht.peerStore.get(id) || {}
out.push({
id: peerData.id || id,
multiaddrs: (peerData.multiaddrInfos || []).map((mi) => mi.multiaddr)
})
})

// All done
if (out.length >= n) {
// yield values
for (const pData of out.toArray()) {
yield pData
}
return
}

// need more, query the network
const paths = []
const query = new Query(dht, key.buffer, (pathIndex, numPaths) => {
// This function body runs once per disjoint path
const pathSize = utils.pathSize(n - out.length, numPaths)
const pathProviders = new LimitedPeerList(pathSize)
paths.push(pathProviders)

// Here we return the query function to use on this particular disjoint path
return async (peer) => {
const msg = await findProvidersSingle(peer, key)
const provs = msg.providerPeers
dht._log('(%s) found %s provider entries', dht.peerId.toB58String(), provs.length)

provs.forEach((prov) => {
pathProviders.push({ id: prov.id })
})

// we have gathered enough providers for this path
if (pathProviders.length >= pathSize) {
return { pathComplete: true }
}

// not enough yet, so continue with the closer peers
return { closerPeers: msg.closerPeers }
}
})

const peers = dht.routingTable.closestPeers(key.buffer, dht.kBucketSize)

try {
await pTimeout(
query.run(peers),
providerTimeout
)
} catch (err) {
if (err.name !== pTimeout.TimeoutError.name) {
throw err
}
} finally {
query.stop()
}

// combine peers from each path
paths.forEach((path) => {
path.toArray().forEach((peer) => {
out.push(peer)
})
})

if (out.length === 0) {
throw errcode(new Error('no providers found'), 'ERR_NOT_FOUND')
}

for (const pData of out.toArray()) {
yield pData
}
}
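`findProviders` divides the remaining provider budget across the query's disjoint paths via `utils.pathSize`. A plausible sketch of that split — the real helper may round differently:

```javascript
// Split the number of results still wanted across `numPaths` disjoint
// query paths. A plausible implementation; the real utils.pathSize may differ.
function pathSize (needed, numPaths) {
  return Math.ceil(needed / numPaths)
}

// With 10 providers still wanted and 3 disjoint paths, each path collects
// at most 4, so the union of the paths covers the target
const perPath = pathSize(10, 3)
console.log(perPath)
```

Rounding up guarantees the per-path limits sum to at least the total still wanted, at the cost of possibly fetching a few extra providers.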
}
}
13 changes: 0 additions & 13 deletions src/errors.js
Original file line number Diff line number Diff line change
@@ -1,13 +0,0 @@
'use strict'

class InvalidRecordError extends Error {}

class NotFoundError extends Error {}

class LookupFailureError extends Error {}

module.exports = {
InvalidRecordError,
NotFoundError,
LookupFailureError
}
769 changes: 371 additions & 398 deletions src/index.js

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions src/message/dht.proto.js
@@ -53,6 +53,7 @@ message Message {
optional MessageType type = 1;
// defines what coral cluster level this query/response belongs to.
// in case we want to implement coral's cluster rings in the future.
optional int32 clusterLevelRaw = 10;
// Used to specify the key associated with this message.
33 changes: 10 additions & 23 deletions src/message/index.js
@@ -1,10 +1,9 @@
'use strict'

const assert = require('assert')
const PeerInfo = require('peer-info')
const PeerId = require('peer-id')
const multiaddr = require('multiaddr')
const protons = require('protons')
const Record = require('libp2p-record').Record
const { Record } = require('libp2p-record')

const pbm = protons(require('./dht.proto'))

@@ -21,8 +20,8 @@ class Message {
* @param {number} level
*/
constructor (type, key, level) {
if (key) {
assert(Buffer.isBuffer(key))
if (key && !Buffer.isBuffer(key)) {
throw new Error('Key must be a buffer')
}

this.type = type
@@ -98,30 +97,18 @@ Message.TYPES = MESSAGE_TYPE
Message.CONNECTION_TYPES = CONNECTION_TYPE

function toPbPeer (peer) {
const res = {
return {
id: peer.id.id,
addrs: peer.multiaddrs.toArray().map((m) => m.buffer)
addrs: (peer.multiaddrs || []).map((m) => m.buffer),
connection: CONNECTION_TYPE.CONNECTED
}

if (peer.isConnected()) {
res.connection = CONNECTION_TYPE.CONNECTED
} else {
res.connection = CONNECTION_TYPE.NOT_CONNECTED
}

return res
}

function fromPbPeer (peer) {
const info = new PeerInfo(new PeerId(peer.id))
peer.addrs.forEach((a) => info.multiaddrs.add(a))

// TODO: upgrade protobuf to send the address connected on
if (peer.connection === CONNECTION_TYPE.CONNECTED) {
info.connect(peer.addrs[0])
return {
id: new PeerId(peer.id),
multiaddrs: peer.addrs.map((a) => multiaddr(a))
}

return info
}

module.exports = Message
197 changes: 80 additions & 117 deletions src/network.js
@@ -1,9 +1,12 @@
'use strict'

const pull = require('pull-stream')
const timeout = require('async/timeout')
const lp = require('pull-length-prefixed')
const setImmediate = require('async/setImmediate')
const errcode = require('err-code')

const pipe = require('it-pipe')
const lp = require('it-length-prefixed')
const pTimeout = require('p-timeout')

const MulticodecTopology = require('libp2p-interfaces/src/topology/multicodec-topology')

const rpc = require('./rpc')
const c = require('./constants')
@@ -22,58 +25,53 @@ class Network {
constructor (self) {
this.dht = self
this.readMessageTimeout = c.READ_MESSAGE_TIMEOUT
this._log = utils.logger(this.dht.peerInfo.id, 'net')
this._log = utils.logger(this.dht.peerId, 'net')
this._rpc = rpc(this.dht)
this._onPeerConnected = this._onPeerConnected.bind(this)
this._running = false
}

/**
* Start the network.
*
* @param {function(Error)} callback
* @returns {void}
* @returns {Promise<void>}
*/
start (callback) {
const cb = (err) => setImmediate(() => callback(err))

if (this.isOnline) {
return cb(new Error('Network is already running'))
async start () {
if (this._running) {
return
}

// TODO add a way to check if swarm has started or not
if (!this.dht.isStarted) {
return cb(new Error('Can not start network'))
throw errcode(new Error('Can not start network'), 'ERR_CANNOT_START_NETWORK')
}

this._running = true

// handle incoming connections
this.dht.swarm.handle(c.PROTOCOL_DHT, this._rpc)
// Incoming streams
this.dht.registrar.handle(c.PROTOCOL_DHT, this._rpc)

// handle new connections
this.dht.swarm.on('peer-mux-established', this._onPeerConnected)

cb()
// register protocol with topology
const topology = new MulticodecTopology({
multicodecs: [c.PROTOCOL_DHT],
handlers: {
onConnect: this._onPeerConnected,
onDisconnect: () => {}
}
})
this._registrarId = await this.dht.registrar.register(topology)
}

/**
* Stop all network activity.
*
* @param {function(Error)} callback
* @returns {void}
* @returns {Promise<void>}
*/
stop (callback) {
const cb = (err) => setImmediate(() => callback(err))

async stop () {
if (!this.dht.isStarted && !this.isStarted) {
return cb(new Error('Network is already stopped'))
return
}
this._running = false
this.dht.swarm.removeListener('peer-mux-established', this._onPeerConnected)

this.dht.swarm.unhandle(c.PROTOCOL_DHT)
cb()
// unregister protocol and handlers
await this.dht.registrar.unregister(this._registrarId)
}

/**
@@ -91,151 +89,116 @@ class Network {
* @type {bool}
*/
get isConnected () {
// TODO add a way to check if swarm has started or not
// TODO add a way to check if switch has started or not
return this.dht.isStarted && this.isStarted
}

/**
* Handle new connections in the swarm.
*
* @param {PeerInfo} peer
* @returns {void}
* Called by the registrar when a connection to a peer supporting the DHT protocol is established.
* @private
* @param {PeerId} peerId remote peer id
* @returns {Promise<void>}
*/
_onPeerConnected (peer) {
if (!this.isConnected) {
return this._log.error('Network is offline')
}

this.dht.swarm.dial(peer, c.PROTOCOL_DHT, (err, conn) => {
if (err) {
return this._log('%s does not support protocol: %s', peer.id.toB58String(), c.PROTOCOL_DHT)
}

// TODO: conn.close()
pull(pull.empty(), conn)

this.dht._add(peer, (err) => {
if (err) {
return this._log.error('Failed to add to the routing table', err)
}

this._log('added to the routing table: %s', peer.id.toB58String())
})
})
async _onPeerConnected (peerId) {
await this.dht._add(peerId)
this._log('added to the routing table: %s', peerId.toB58String())
}

/**
* Send a request and record RTT for latency measurements.
*
* @async
* @param {PeerId} to - The peer that should receive a message
* @param {Message} msg - The message to send.
* @param {function(Error, Message)} callback
* @returns {void}
* @returns {Promise<Message>}
*/
sendRequest (to, msg, callback) {
async sendRequest (to, msg) {
// TODO: record latency
if (!this.isConnected) {
return callback(new Error('Network is offline'))
throw errcode(new Error('Network is offline'), 'ERR_NETWORK_OFFLINE')
}

this._log('sending to: %s', to.toB58String())
this.dht.swarm.dial(to, c.PROTOCOL_DHT, (err, conn) => {
if (err) {
return callback(err)
}
const id = to.toB58String()
this._log('sending to: %s', id)

this._writeReadMessage(conn, msg.serialize(), callback)
})
const conn = await this.dht.dialer.connectToPeer(to)
const { stream } = await conn.newStream(c.PROTOCOL_DHT)

return this._writeReadMessage(stream, msg.serialize())
}

/**
* Sends a message without expecting an answer.
*
* @param {PeerId} to
* @param {Message} msg
* @param {function(Error)} callback
* @returns {void}
* @returns {Promise<void>}
*/
sendMessage (to, msg, callback) {
async sendMessage (to, msg) {
if (!this.isConnected) {
return setImmediate(() => callback(new Error('Network is offline')))
throw errcode(new Error('Network is offline'), 'ERR_NETWORK_OFFLINE')
}

this._log('sending to: %s', to.toB58String())
const id = to.toB58String()
this._log('sending to: %s', id)

this.dht.swarm.dial(to, c.PROTOCOL_DHT, (err, conn) => {
if (err) {
return callback(err)
}
const conn = await this.dht.dialer.connectToPeer(to)
const { stream } = await conn.newStream(c.PROTOCOL_DHT)

this._writeMessage(conn, msg.serialize(), callback)
})
return this._writeMessage(stream, msg.serialize())
}

/**
* Write a message and read its response.
* If no response is received after the specified timeout
* this will error out.
*
* @param {Connection} conn - the connection to use
* @param {DuplexIterable} stream - the stream to use
* @param {Buffer} msg - the message to send
* @param {function(Error, Message)} callback
* @returns {void}
* @returns {Promise<Message>}
* @private
*/
_writeReadMessage (conn, msg, callback) {
timeout(
writeReadMessage,
async _writeReadMessage (stream, msg) { // eslint-disable-line require-await
return pTimeout(
writeReadMessage(stream, msg),
this.readMessageTimeout
)(conn, msg, callback)
)
}

/**
* Write a message to the given connection.
* Write a message to the given stream.
*
* @param {Connection} conn - the connection to use
* @param {DuplexIterable} stream - the stream to use
* @param {Buffer} msg - the message to send
* @param {function(Error)} callback
* @returns {void}
* @returns {Promise<void>}
* @private
*/
_writeMessage (conn, msg, callback) {
pull(
pull.values([msg]),
_writeMessage (stream, msg) {
return pipe(
[msg],
lp.encode(),
conn,
lp.decode(),
pull.collect((err) => callback(err))
stream
)
}
}

function writeReadMessage (conn, msg, callback) {
pull(
pull.values([msg]),
async function writeReadMessage (stream, msg) {
const res = await pipe(
[msg],
lp.encode(),
conn,
pull.filter((msg) => msg.length < c.maxMessageSize),
stream,
lp.decode(),
pull.collect((err, res) => {
if (err) {
return callback(err)
}
if (res.length === 0) {
return callback(new Error('No message received'))
async source => {
for await (const chunk of source) {
return chunk.slice()
}
}
)

let response
try {
response = Message.deserialize(res[0])
} catch (err) {
return callback(new Error('failed to deserialize response: ' + err.message))
}
if (!res || res.length === 0) {
throw errcode(new Error('No message received'), 'ERR_NO_MESSAGE_RECEIVED')
}

callback(null, response)
})
)
return Message.deserialize(res)
}
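`writeReadMessage` frames the outgoing message with a length prefix and takes the first decoded chunk as the reply. A minimal sketch of that framing, using a fixed 4-byte big-endian prefix instead of the varint prefix `it-length-prefixed` uses:

```javascript
// Frame a message with a 4-byte big-endian length prefix and read it back.
// it-length-prefixed uses a varint prefix instead, but the shape is the same.
function encodeFrame (msg) {
  const len = Buffer.alloc(4)
  len.writeUInt32BE(msg.length, 0)
  return Buffer.concat([len, msg])
}

function decodeFrame (buf) {
  const len = buf.readUInt32BE(0)
  return buf.slice(4, 4 + len)
}

const reply = decodeFrame(encodeFrame(Buffer.from('PONG')))
console.log(reply.toString()) // PONG
```

The prefix lets the reader know exactly how many bytes belong to one message on an otherwise unstructured byte stream.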

module.exports = Network
24 changes: 12 additions & 12 deletions src/peer-list.js → src/peer-list/index.js
@@ -1,42 +1,42 @@
'use strict'

/**
* A list of unique peer infos.
* A list of unique peers.
*/
class PeerList {
constructor () {
this.list = []
}

/**
* Add a new info. Returns `true` if it was a new one
* Add a new peer. Returns `true` if it was a new one
*
* @param {PeerInfo} info
* @param {PeerData} peerData
* @returns {bool}
*/
push (info) {
if (!this.has(info)) {
this.list.push(info)
push (peerData) {
if (!this.has(peerData.id)) {
this.list.push(peerData)
return true
}
return false
}

/**
* Check if this PeerInfo is already in here.
* Check if this PeerData is already in here.
*
* @param {PeerInfo} info
* @param {PeerId} peerId
* @returns {bool}
*/
has (info) {
const match = this.list.find((i) => i.id.isEqual(info.id))
has (peerId) {
const match = this.list.find((i) => i.id.isEqual(peerId))
return Boolean(match)
}

/**
* Get the list as an array.
*
* @returns {Array<PeerInfo>}
* @returns {Array<PeerData>}
*/
toArray () {
return this.list.slice()
@@ -45,7 +45,7 @@ class PeerList {
/**
* Remove the last element
*
* @returns {PeerInfo}
* @returns {PeerData}
*/
pop () {
return this.list.pop()
10 changes: 5 additions & 5 deletions src/limited-peer-list.js → src/peer-list/limited-peer-list.js
@@ -1,6 +1,6 @@
'use strict'

const PeerList = require('./peer-list')
const PeerList = require('.')

/**
* Like PeerList but with a length restriction.
@@ -17,14 +17,14 @@ class LimitedPeerList extends PeerList {
}

/**
* Add a PeerInfo if it fits in the list
* Add a PeerData if it fits in the list
*
* @param {PeerInfo} info
* @param {PeerData} peerData
* @returns {bool}
*/
push (info) {
push (peerData) {
if (this.length < this.limit) {
return super.push(info)
return super.push(peerData)
}
return false
}
88 changes: 88 additions & 0 deletions src/peer-list/peer-distance-list.js
@@ -0,0 +1,88 @@
'use strict'

const distance = require('xor-distance')
const utils = require('../utils')
const pMap = require('p-map')

/**
* Maintains a list of peerIds sorted by distance from a DHT key.
*/
class PeerDistanceList {
/**
* Creates a new PeerDistanceList.
*
* @param {Buffer} originDhtKey - the DHT key from which distance is calculated
* @param {number} capacity - the maximum size of the list
*/
constructor (originDhtKey, capacity) {
this.originDhtKey = originDhtKey
this.capacity = capacity
this.peerDistances = []
}

/**
* The length of the list
*/
get length () {
return this.peerDistances.length
}

/**
* The peerIds in the list, in order of distance from the origin key
*/
get peers () {
return this.peerDistances.map(pd => pd.peerId)
}

/**
* Add a peerId to the list.
*
* @param {PeerId} peerId
* @returns {Promise<void>}
*/
async add (peerId) {
if (this.peerDistances.find(pd => pd.peerId.id.equals(peerId.id))) {
return
}

const dhtKey = await utils.convertPeerId(peerId)
const el = {
peerId,
distance: distance(this.originDhtKey, dhtKey)
}

this.peerDistances.push(el)
this.peerDistances.sort((a, b) => distance.compare(a.distance, b.distance))
this.peerDistances = this.peerDistances.slice(0, this.capacity)
}

/**
* Indicates whether any of the peerIds passed as a parameter are closer
* to the origin key than the furthest peerId in the PeerDistanceList.
*
* @param {Array<PeerId>} peerIds
* @returns {Boolean}
*/
async anyCloser (peerIds) {
if (!peerIds.length) {
return false
}

if (!this.length) {
return true
}

const dhtKeys = await pMap(peerIds, (peerId) => utils.convertPeerId(peerId))

const furthestDistance = this.peerDistances[this.peerDistances.length - 1].distance
for (const dhtKey of dhtKeys) {
const keyDistance = distance(this.originDhtKey, dhtKey)
if (distance.compare(keyDistance, furthestDistance) < 0) {
return true
}
}
return false
}
}

module.exports = PeerDistanceList
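The distance math behind `PeerDistanceList` can be sketched without the `xor-distance` dependency: XOR the keys byte-wise and compare the results lexicographically. This is a simplification — the real class first converts peer ids to DHT keys via `utils.convertPeerId`:

```javascript
// XOR distance between two equal-length keys -- a dependency-free
// stand-in for the xor-distance package used above.
function xorDistance (a, b) {
  const out = Buffer.alloc(a.length)
  for (let i = 0; i < a.length; i++) {
    out[i] = a[i] ^ b[i]
  }
  return out
}

// Keep the `capacity` keys closest to `origin`, sorted by distance
function closest (origin, keys, capacity) {
  return keys
    .map((key) => ({ key, dist: xorDistance(origin, key) }))
    .sort((a, b) => Buffer.compare(a.dist, b.dist))
    .slice(0, capacity)
    .map((entry) => entry.key)
}

const origin = Buffer.from([0x00])
const keys = [Buffer.from([0xff]), Buffer.from([0x01]), Buffer.from([0x10])]
const nearest = closest(origin, keys, 2) // keeps 0x01 and 0x10
```

Because XOR distance is a metric over the key space, keeping only the `capacity` smallest distances is exactly what `add` does with its sort-and-slice.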
56 changes: 20 additions & 36 deletions src/peer-queue.js → src/peer-list/peer-queue.js
@@ -4,7 +4,7 @@ const Heap = require('heap')
const distance = require('xor-distance')
const debug = require('debug')

const utils = require('./utils')
const utils = require('../utils')

const log = debug('libp2p:dht:peer-queue')

@@ -17,34 +17,24 @@ class PeerQueue {
* Create from a given peer id.
*
* @param {PeerId} id
* @param {function(Error, PeerQueue)} callback
* @returns {void}
* @returns {Promise<PeerQueue>}
*/
static fromPeerId (id, callback) {
utils.convertPeerId(id, (err, key) => {
if (err) {
return callback(err)
}
static async fromPeerId (id) {
const key = await utils.convertPeerId(id)

callback(null, new PeerQueue(key))
})
return new PeerQueue(key)
}

/**
* Create from a given buffer.
*
* @param {Buffer} key
* @param {function(Error, PeerQueue)} callback
* @returns {void}
* @param {Buffer} keyBuffer
* @returns {Promise<PeerQueue>}
*/
static fromKey (key, callback) {
utils.convertBuffer(key, (err, key) => {
if (err) {
return callback(err)
}
static async fromKey (keyBuffer) {
const key = await utils.convertBuffer(keyBuffer)

callback(null, new PeerQueue(key))
})
return new PeerQueue(key)
}

/**
@@ -53,7 +43,7 @@ class PeerQueue {
* @param {Buffer} from - The sha2-256 encoded peer id
*/
constructor (from) {
log('create: %s', from.toString('hex'))
log('create: %b', from)
this.from = from
this.heap = new Heap(utils.xorCompare)
}
@@ -62,24 +52,18 @@ class PeerQueue {
* Add a new PeerId to the queue.
*
* @param {PeerId} id
* @param {function(Error)} callback
* @returns {void}
* @returns {Promise<void>}
*/
enqueue (id, callback) {
log('enqueue %s', id.id.toString('hex'))
utils.convertPeerId(id, (err, key) => {
if (err) {
return callback(err)
}
async enqueue (id) {
log('enqueue %s', id.toB58String())
const key = await utils.convertPeerId(id)

const el = {
id: id,
distance: distance(this.from, key)
}
const el = {
id: id,
distance: distance(this.from, key)
}

this.heap.push(el)
callback()
})
this.heap.push(el)
}

/**
274 changes: 274 additions & 0 deletions src/peer-routing/index.js
@@ -0,0 +1,274 @@
'use strict'

const errcode = require('err-code')
const pTimeout = require('p-timeout')

const PeerId = require('peer-id')
const crypto = require('libp2p-crypto')

const c = require('../constants')
const Message = require('../message')
const Query = require('../query')

const utils = require('../utils')

module.exports = (dht) => {
/**
* Look if we are connected to a peer with the given id.
* Returns its id and addresses, if found, otherwise `undefined`.
* @param {PeerId} peer
* @returns {Promise<{ id: PeerId, multiaddrs: Multiaddr[] }>}
*/
const findPeerLocal = async (peer) => {
dht._log('findPeerLocal %s', peer.toB58String())
const p = await dht.routingTable.find(peer)

const peerData = p && dht.peerStore.get(p)

if (peerData) {
return {
id: peerData.id,
multiaddrs: peerData.multiaddrInfos.map((mi) => mi.multiaddr)
}
}
}

/**
* Get a value via rpc call for the given parameters.
* @param {PeerId} peer
* @param {Buffer} key
* @returns {Promise<Message>}
* @private
*/
const getValueSingle = async (peer, key) => { // eslint-disable-line require-await
const msg = new Message(Message.TYPES.GET_VALUE, key, 0)
return dht.network.sendRequest(peer, msg)
}

/**
* Find close peers for a given peer
* @param {Buffer} key
* @param {PeerId} peer
* @returns {Promise<Array<{ id: PeerId, multiaddrs: Multiaddr[] }>>}
* @private
*/

const closerPeersSingle = async (key, peer) => {
dht._log('closerPeersSingle %b from %s', key, peer.toB58String())
const msg = await dht.peerRouting._findPeerSingle(peer, new PeerId(key))

return msg.closerPeers
.filter((peerData) => !dht._isSelf(peerData.id))
.map((peerData) => {
dht.peerStore.addressBook.add(peerData.id, peerData.multiaddrs)

return peerData
})
}

/**
* Get the public key directly from a node.
* @param {PeerId} peer
* @returns {Promise<PublicKey>}
* @private
*/
const getPublicKeyFromNode = async (peer) => {
const pkKey = utils.keyForPublicKey(peer)
const msg = await getValueSingle(peer, pkKey)

if (!msg.record || !msg.record.value) {
throw errcode(new Error(`Node not responding with its public key: ${peer.toB58String()}`), 'ERR_INVALID_RECORD')
}

const recPeer = await PeerId.createFromPubKey(msg.record.value)

// compare hashes of the pub key
if (!recPeer.isEqual(peer)) {
throw errcode(new Error('public key does not match id'), 'ERR_PUBLIC_KEY_DOES_NOT_MATCH_ID')
}

return recPeer.pubKey
}

return {
/**
* Ask peer `peer` if they know where the peer with id `target` is.
* @param {PeerId} peer
* @param {PeerId} target
* @returns {Promise<Message>}
* @private
*/
async _findPeerSingle (peer, target) { // eslint-disable-line require-await
dht._log('findPeerSingle %s', peer.toB58String())
const msg = new Message(Message.TYPES.FIND_NODE, target.id, 0)

return dht.network.sendRequest(peer, msg)
},

/**
* Search for a peer with the given ID.
* @param {PeerId} id
* @param {Object} options - findPeer options
* @param {number} options.timeout - maximum time the query may run, in milliseconds (default: 60000)
* @returns {Promise<{ id: PeerId, multiaddrs: Multiaddr[] }>}
*/
async findPeer (id, options = {}) {
options.timeout = options.timeout || c.minute
dht._log('findPeer %s', id.toB58String())

// Try to find locally
const pi = await findPeerLocal(id)

// already got it
if (pi != null) {
dht._log('found local')
return pi
}

const key = await utils.convertPeerId(id)
const peers = dht.routingTable.closestPeers(key, dht.kBucketSize)

if (peers.length === 0) {
throw errcode(new Error('Peer lookup failed'), 'ERR_LOOKUP_FAILED')
}

// sanity check
const match = peers.find((p) => p.isEqual(id))
if (match) {
const peer = dht.peerStore.get(id)

if (peer) {
dht._log('found in peerStore')
return {
id: peer.id,
multiaddrs: peer.multiaddrInfos.map((mi) => mi.multiaddr)
}
}
}

// query the network
const query = new Query(dht, id.id, () => {
// There is no distinction between the disjoint paths,
// so there are no per-path variables in dht scope.
// Just return the actual query function.
return async (peer) => {
const msg = await this._findPeerSingle(peer, id)
const match = msg.closerPeers.find((p) => p.id.isEqual(id))

// found it
if (match) {
return {
peer: match,
queryComplete: true
}
}

return {
closerPeers: msg.closerPeers
}
}
})

let error, result
try {
result = await pTimeout(query.run(peers), options.timeout)
} catch (err) {
error = err
}
query.stop()
if (error) throw error

let success = false
result.paths.forEach((result) => {
if (result.success) {
success = true
dht.peerStore.addressBook.add(result.peer.id, result.peer.multiaddrs)
}
})
dht._log('findPeer %s: %s', id.toB58String(), success)

if (!success) {
throw errcode(new Error('No peer found'), 'ERR_NOT_FOUND')
}

const peerData = dht.peerStore.get(id)

return {
id: peerData.id,
multiaddrs: peerData.multiaddrInfos.map((mi) => mi.multiaddr)
}
},

/**
* Kademlia 'node lookup' operation.
* @param {Buffer} key
* @param {Object} [options]
* @param {boolean} [options.shallow] shallow query (default: false)
* @returns {AsyncIterable<PeerId>}
*/
async * getClosestPeers (key, options = { shallow: false }) {
dht._log('getClosestPeers to %b', key)

const id = await utils.convertBuffer(key)
const tablePeers = dht.routingTable.closestPeers(id, dht.kBucketSize)

const q = new Query(dht, key, () => {
// There is no distinction between the disjoint paths,
// so there are no per-path variables in dht scope.
// Just return the actual query function.
return async (peer) => {
const closer = await closerPeersSingle(key, peer)

return {
closerPeers: closer,
pathComplete: options.shallow ? true : undefined
}
}
})

const res = await q.run(tablePeers)
if (!res || !res.finalSet) {
return []
}

const sorted = await utils.sortClosestPeers(Array.from(res.finalSet), id)

for (const pId of sorted.slice(0, dht.kBucketSize)) {
yield pId
}
},

/**
* Get the public key for the given peer id.
* @param {PeerId} peer
* @returns {Promise<PubKey>}
*/
async getPublicKey (peer) {
dht._log('getPublicKey %s', peer.toB58String())

// local check
const peerData = dht.peerStore.get(peer)
if (peerData && peerData.id.pubKey) {
dht._log('getPublicKey: found local copy')
return peerData.id.pubKey
}

// try the node directly
let pk
try {
pk = await getPublicKeyFromNode(peer)
} catch (err) {
// try dht directly
const pkKey = utils.keyForPublicKey(peer)
const value = await dht.get(pkKey)
pk = crypto.keys.unmarshalPublicKey(value)
}

peerData.id = new PeerId(peer.id, null, pk)
const addrs = peerData.multiaddrInfos.map((mi) => mi.multiaddr)
dht.peerStore.addressBook.add(peerData.id, addrs)

return pk
}
}
}
638 changes: 0 additions & 638 deletions src/private.js

This file was deleted.

295 changes: 130 additions & 165 deletions src/providers.js
@@ -2,11 +2,9 @@

const cache = require('hashlru')
const varint = require('varint')
const each = require('async/each')
const pull = require('pull-stream')
const CID = require('cids')
const PeerId = require('peer-id')
const Key = require('interface-datastore').Key
const { Key } = require('interface-datastore')
const { default: Queue } = require('p-queue')

const c = require('./constants')
const utils = require('./utils')
@@ -56,137 +54,105 @@ class Providers {
this.lruCacheSize = cacheSize || c.PROVIDERS_LRU_CACHE_SIZE

this.providers = cache(this.lruCacheSize)

this.syncQueue = new Queue({ concurrency: 1 })
}

/**
* Check all providers if they are still valid, and if not
* delete them.
*
* @returns {undefined}
* Release any resources.
*
* @private
* @returns {void}
*/
_cleanup () {
this._getProviderCids((err, cids) => {
if (err) {
return this._log.error('Failed to get cids', err)
}

each(cids, (cid, cb) => {
this._getProvidersMap(cid, (err, provs) => {
if (err) {
return cb(err)
}

provs.forEach((time, provider) => {
this._log('comparing: %s - %s > %s', Date.now(), time, this.provideValidity)
if (Date.now() - time > this.provideValidity) {
provs.delete(provider)
}
})

if (provs.size === 0) {
return this._deleteProvidersMap(cid, cb)
}

cb()
})
}, (err) => {
if (err) {
return this._log.error('Failed to cleanup', err)
}

this._log('Cleanup successfull')
})
})
stop () {
if (this._cleaner) {
clearInterval(this._cleaner)
this._cleaner = null
}
}

/**
* Get a list of all cids that providers are known for.
*
* @param {function(Error, Array<CID>)} callback
* @returns {undefined}
* Check that all providers are still valid, and delete any that are not.
*
* @returns {Promise<void>}
* @private
*/
_getProviderCids (callback) {
pull(
this.datastore.query({prefix: c.PROVIDERS_KEY_PREFIX}),
pull.map((entry) => {
const parts = entry.key.toString().split('/')
if (parts.length !== 4) {
this._log.error('incorrectly formatted provider entry in datastore: %s', entry.key)
return
}

let decoded
_cleanup () {
return this.syncQueue.add(async () => {
this._log('start cleanup')
const start = Date.now()

let count = 0
let deleteCount = 0
const deleted = new Map()
const batch = this.datastore.batch()

// Get all provider entries from the datastore
const query = this.datastore.query({ prefix: c.PROVIDERS_KEY_PREFIX })
for await (const entry of query) {
try {
decoded = utils.decodeBase32(parts[2])
// Add a delete to the batch for each expired entry
const { cid, peerId } = parseProviderKey(entry.key)
const time = readTime(entry.value)
const now = Date.now()
const delta = now - time
const expired = delta > this.provideValidity
this._log('comparing: %d - %d = %d > %d %s',
now, time, delta, this.provideValidity, expired ? '(expired)' : '')
if (expired) {
deleteCount++
batch.delete(entry.key)
const peers = deleted.get(cid) || new Set()
peers.add(peerId)
deleted.set(cid, peers)
}
count++
} catch (err) {
this._log.error('error decoding base32 provider key: %s', parts[2])
return
this._log.error(err.message)
}
}
this._log('deleting %d / %d entries', deleteCount, count)

let cid
try {
cid = new CID(decoded)
} catch (err) {
this._log.error('error converting key to cid from datastore: %s', err.message)
// Commit the deletes to the datastore
if (deleted.size) {
await batch.commit()
}

// Clear expired entries from the cache
for (const [cid, peers] of deleted) {
const key = makeProviderKey(cid)
const provs = this.providers.get(key)
if (provs) {
for (const peerId of peers) {
provs.delete(peerId)
}
if (provs.size === 0) {
this.providers.remove(key)
} else {
this.providers.set(key, provs)
}
}
}

return cid
}),
pull.filter(Boolean),
pull.collect(callback)
)
this._log('Cleanup successful (%dms)', Date.now() - start)
})
}

/**
* Get the currently known provider maps for a given CID.
* Get the currently known provider peer ids for a given CID.
*
* @param {CID} cid
* @param {function(Error, Map<PeerId, Date>)} callback
* @returns {undefined}
* @returns {Promise<Map<String, Date>>}
*
* @private
*/
_getProvidersMap (cid, callback) {
const provs = this.providers.get(makeProviderKey(cid))

async _getProvidersMap (cid) {
const cacheKey = makeProviderKey(cid)
let provs = this.providers.get(cacheKey)
if (!provs) {
return loadProviders(this.datastore, cid, callback)
provs = await loadProviders(this.datastore, cid)
this.providers.set(cacheKey, provs)
}

callback(null, provs)
}

/**
* Completely remove a providers map entry for a given CID.
*
* @param {CID} cid
* @param {function(Error)} callback
* @returns {undefined}
*
* @private
*/
_deleteProvidersMap (cid, callback) {
const dsKey = makeProviderKey(cid)
this.providers.set(dsKey, null)
const batch = this.datastore.batch()

pull(
this.datastore.query({
keysOnly: true,
prefix: dsKey
}),
pull.through((entry) => batch.delete(entry.key)),
pull.onEnd((err) => {
if (err) {
return callback(err)
}
batch.commit(callback)
})
)
return provs
}

get cleanupInterval () {
@@ -207,67 +173,55 @@ class Providers {
}

/**
* Add a new provider.
* Add a new provider for the given CID.
*
* @param {CID} cid
* @param {PeerId} provider
* @param {function(Error)} callback
* @returns {undefined}
* @returns {Promise<void>}
*/
addProvider (cid, provider, callback) {
this._log('addProvider %s', cid.toBaseEncodedString())
const dsKey = makeProviderKey(cid)
const provs = this.providers.get(dsKey)

const next = (err, provs) => {
if (err) {
return callback(err)
}
async addProvider (cid, provider) { // eslint-disable-line require-await
return this.syncQueue.add(async () => {
this._log('addProvider %s', cid.toBaseEncodedString())
const provs = await this._getProvidersMap(cid)

this._log('loaded %s provs', provs.size)
const now = Date.now()
provs.set(provider, now)
provs.set(utils.encodeBase32(provider.id), now)

const dsKey = makeProviderKey(cid)
this.providers.set(dsKey, provs)
writeProviderEntry(this.datastore, cid, provider, now, callback)
}

if (!provs) {
loadProviders(this.datastore, cid, next)
} else {
next(null, provs)
}
return writeProviderEntry(this.datastore, cid, provider, now)
})
}

/**
* Get a list of providers for the given CID.
*
* @param {CID} cid
* @param {function(Error, Array<PeerId>)} callback
* @returns {undefined}
* @returns {Promise<Array<PeerId>>}
*/
getProviders (cid, callback) {
this._log('getProviders %s', cid.toBaseEncodedString())
this._getProvidersMap(cid, (err, provs) => {
if (err) {
return callback(err)
}

callback(null, Array.from(provs.keys()))
async getProviders (cid) { // eslint-disable-line require-await
return this.syncQueue.add(async () => {
this._log('getProviders %s', cid.toBaseEncodedString())
const provs = await this._getProvidersMap(cid)
return [...provs.keys()].map((base32PeerId) => {
return new PeerId(utils.decodeBase32(base32PeerId))
})
})
}
}

/**
* Encode the given key into its matching datastore key.
*
* @param {CID} cid
* @param {CID|string} cid - cid or base32 encoded string
* @returns {string}
*
* @private
*/
function makeProviderKey (cid) {
return c.PROVIDERS_KEY_PREFIX + utils.encodeBase32(cid.buffer)
cid = typeof cid === 'string' ? cid : utils.encodeBase32(cid.buffer)
return c.PROVIDERS_KEY_PREFIX + cid
}

/**
@@ -277,48 +231,59 @@ function makeProviderKey (cid) {
* @param {CID} cid
* @param {PeerId} peer
* @param {number} time
* @param {function(Error)} callback
* @returns {undefined}
* @returns {Promise<void>}
*
* @private
*/
function writeProviderEntry (store, cid, peer, time, callback) {
async function writeProviderEntry (store, cid, peer, time) { // eslint-disable-line require-await
const dsKey = [
makeProviderKey(cid),
'/',
utils.encodeBase32(peer.id)
].join('')

store.put(new Key(dsKey), new Buffer(varint.encode(time)), callback)
const key = new Key(dsKey)
const buffer = Buffer.from(varint.encode(time))
return store.put(key, buffer)
}

/**
* Parse the CID and provider peer id from the key
*
* @param {DKey} key
* @returns {Object} object with peer id and cid
*
* @private
*/
function parseProviderKey (key) {
const parts = key.toString().split('/')
if (parts.length !== 4) {
throw new Error('incorrectly formatted provider entry key in datastore: ' + key)
}

return {
cid: parts[2],
peerId: parts[3]
}
}
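The key helpers above give provider entries the shape `/providers/<base32 cid>/<base32 peer id>`. A round-trip sketch with plain strings standing in for the base32-encoded binary values:

```javascript
// Provider datastore keys have the shape /providers/<cid>/<peerId>.
// Plain strings stand in for the base32-encoded binary values here.
const PROVIDERS_KEY_PREFIX = '/providers/'

function makeKey (cid, peerId) {
  return PROVIDERS_KEY_PREFIX + cid + '/' + peerId
}

function parseKey (key) {
  const parts = key.split('/')
  if (parts.length !== 4) {
    throw new Error('incorrectly formatted provider entry key: ' + key)
  }
  return { cid: parts[2], peerId: parts[3] }
}

const parsed = parseKey(makeKey('bafycid', 'peerabc'))
console.log(parsed) // { cid: 'bafycid', peerId: 'peerabc' }
```

Splitting on `/` yields a leading empty string plus the three path segments, which is why a well-formed key produces exactly four parts.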

/**
* Load providers from the store.
* Load providers for the given CID from the store.
*
* @param {Datastore} store
* @param {CID} cid
* @param {function(Error, Map<PeerId, Date>)} callback
* @returns {undefined}
* @returns {Promise<Map<PeerId, Date>>}
*
* @private
*/
async function loadProviders (store, cid) {
const providers = new Map()
const query = store.query({ prefix: makeProviderKey(cid) })
for await (const entry of query) {
const { peerId } = parseProviderKey(entry.key)
providers.set(peerId, readTime(entry.value))
}
return providers
}
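The new `loadProviders` relies on the datastore's `query()` returning an async iterable. A minimal sketch with an in-memory stand-in store (a simplified assumption, not the real interface-datastore API) shows the same `for await` consumption pattern:

```javascript
// Minimal in-memory stand-in for a datastore whose query() yields entries
// as an async iterable, mirroring how loadProviders consumes them.
// This store API is a simplified assumption, not the real module.
class MemoryStore {
  constructor () { this.data = new Map() }
  async put (key, value) { this.data.set(key, value) }
  async * query ({ prefix }) {
    for (const [key, value] of this.data) {
      if (key.startsWith(prefix)) yield { key, value }
    }
  }
}

async function loadEntries (store, prefix) {
  const entries = new Map()
  for await (const { key, value } of store.query({ prefix })) {
    entries.set(key, value)
  }
  return entries
}

async function main () {
  const store = new MemoryStore()
  await store.put('/providers/cid1/peerA', 1)
  await store.put('/providers/cid1/peerB', 2)
  await store.put('/providers/cid2/peerC', 3)
  // Only the two entries under the cid1 prefix are collected
  const provs = await loadEntries(store, '/providers/cid1')
  console.log(provs.size) // 2
}
main()
```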

function readTime (buf) {
52 changes: 52 additions & 0 deletions src/query-manager.js
@@ -0,0 +1,52 @@
'use strict'

/**
* Keeps track of all running queries.
*/
class QueryManager {
/**
* Creates a new QueryManager.
*/
constructor () {
this.queries = new Set()
this.running = false
}

/**
* Called when a query is started.
*
* @param {Query} query
*/
queryStarted (query) {
this.queries.add(query)
}

/**
* Called when a query completes.
*
* @param {Query} query
*/
queryCompleted (query) {
this.queries.delete(query)
}

/**
* Starts the query manager.
*/
start () {
this.running = true
}

/**
* Stops all queries.
*/
stop () {
this.running = false
for (const query of this.queries) {
query.stop()
}
this.queries.clear()
}
}

module.exports = QueryManager
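A lifecycle sketch of the class above (the class is repeated inline so the example runs standalone; `fakeQuery` is a hypothetical stand-in whose only assumed method is `stop()`):

```javascript
// QueryManager copied inline from the diff above so this sketch runs standalone.
class QueryManager {
  constructor () {
    this.queries = new Set()
    this.running = false
  }
  queryStarted (query) { this.queries.add(query) }
  queryCompleted (query) { this.queries.delete(query) }
  start () { this.running = true }
  stop () {
    this.running = false
    for (const query of this.queries) {
      query.stop()
    }
    this.queries.clear()
  }
}

const manager = new QueryManager()
manager.start()

// Hypothetical stand-in for a Query; only stop() is assumed
const fakeQuery = { stopped: false, stop () { this.stopped = true } }
manager.queryStarted(fakeQuery)

// Stopping the manager stops every tracked query and clears the set
manager.stop()
console.log(fakeQuery.stopped, manager.queries.size) // true 0
```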
193 changes: 0 additions & 193 deletions src/query.js

This file was deleted.

118 changes: 118 additions & 0 deletions src/query/index.js
@@ -0,0 +1,118 @@
'use strict'

const mh = require('multihashes')

const utils = require('../utils')
const Run = require('./run')

/**
* Divide peers up into disjoint paths (subqueries). Any peer can only be used once over all paths.
* Within each path, query peers from closest to farthest away.
*/
class Query {
/**
* User-supplied function to set up an individual disjoint path. Per-path
* query state should be held in this function's closure.
* @typedef {function} makePath
* @param {number} pathNum - Numeric index from zero to numPaths - 1
* @returns {queryFunc} - Function to call on each peer in the query
*/

/**
* Query function.
* @typedef {function} queryFunc
* @param {PeerId} next - Peer to query
* @param {function(Error, Object)} callback - Query result callback
*/

/**
* Create a new query. The makePath function is called once per disjoint path, so that per-path
* variables can be created in that scope. makePath then returns the actual query function (queryFunc)
* to use on that path.
*
* @param {DHT} dht - DHT instance
* @param {Buffer} key
* @param {makePath} makePath - Called to set up each disjoint path. Must return the query function.
*/
constructor (dht, key, makePath) {
this.dht = dht
this.key = key
this.makePath = makePath
this._log = utils.logger(this.dht.peerId, 'query:' + mh.toB58String(key))

this.running = false

this._onStart = this._onStart.bind(this)
this._onComplete = this._onComplete.bind(this)
}

/**
* Run this query, starting with the given list of peers.
*
* @param {Array<PeerId>} peers
* @returns {Promise}
*/
async run (peers) { // eslint-disable-line require-await
if (!this.dht._queryManager.running) {
this._log.error('Attempt to run query after shutdown')
return { finalSet: new Set(), paths: [] }
}

if (peers.length === 0) {
this._log.error('Running query with no peers')
return { finalSet: new Set(), paths: [] }
}

this._run = new Run(this)

this._log(`query running with K=${this.dht.kBucketSize}, A=${this.dht.concurrency}, D=${Math.min(this.dht.disjointPaths, peers.length)}`)
this._run.once('start', this._onStart)
this._run.once('complete', this._onComplete)

return this._run.execute(peers)
}

/**
* Called when the run starts.
*/
_onStart () {
this.running = true
this._startTime = Date.now()
this._log('query:start')

// Register this query so we can stop it if the DHT stops
this.dht._queryManager.queryStarted(this)
}

/**
* Called when the run completes (even if there's an error).
*/
_onComplete () {
// Ensure worker queues for all paths are stopped at the end of the query
this.stop()
}

/**
* Stop the query.
*/
stop () {
if (!this.running) {
return
}

this._log(`query:done in ${Date.now() - this._startTime}ms`)
this._log(`${this._run.errors.length} of ${this._run.peersSeen.size} peers errored (${this._run.errors.length / this._run.peersSeen.size * 100}% fail rate)`)

this._run.removeListener('start', this._onStart)
this._run.removeListener('complete', this._onComplete)

this.running = false
this._run.stop()
this.dht._queryManager.queryCompleted(this)
}
}

module.exports = Query
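The `makePath`/`queryFunc` contract described in the typedefs can be illustrated without the DHT: `makePath` runs once per disjoint path, and per-path state lives in its closure. Everything below is an illustrative sketch, not the real query plumbing:

```javascript
// Sketch of the makePath pattern: makePath is called once per disjoint path
// and returns that path's query function, so per-path state (here `queried`)
// is captured in the closure. The path/peer plumbing is illustrative only.
function makeQuery (makePath, numPaths) {
  const queryFns = []
  for (let i = 0; i < numPaths; i++) {
    queryFns.push(makePath(i, numPaths))
  }
  return queryFns
}

const queryFns = makeQuery((pathNum, numPaths) => {
  let queried = 0 // per-path state captured in the closure
  return async (peer) => {
    queried++
    return { pathNum, peer, queried }
  }
}, 3)

queryFns[0]('peerA').then(res => console.log(res.pathNum, res.queried)) // 0 1
```

Each call to `makePath` gets its own `queried` counter, so paths never share mutable query state.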
84 changes: 84 additions & 0 deletions src/query/path.js
@@ -0,0 +1,84 @@
'use strict'

const PeerQueue = require('../peer-list/peer-queue')
const utils = require('../utils')

// TODO: Temporary until parallel dial in Switch have a proper
// timeout. Requires async/await refactor of transports and
// dial abort logic. This gives us 30s to complete the `queryFunc`.
// This should help reduce the high end call times of queries
const QUERY_FUNC_TIMEOUT = 30e3

/**
* Manages a single Path through the DHT.
*/
class Path {
/**
* Creates a Path.
*
* @param {Run} run
* @param {queryFunc} queryFunc
*/
constructor (run, queryFunc) {
this.run = run
if (!queryFunc) throw new Error('Path requires a `queryFunc` to be specified')
if (typeof queryFunc !== 'function') throw new Error('Path expected `queryFunc` to be a function. Got ' + typeof queryFunc)
this.queryFunc = utils.withTimeout(queryFunc, QUERY_FUNC_TIMEOUT)

/**
* @type {Array<PeerId>}
*/
this.initialPeers = []

/**
* @type {PeerQueue}
*/
this.peersToQuery = null
}

/**
* Add a peer to the set of peers that are used to initialize the path.
* @param {PeerId} peer
*/
addInitialPeer (peer) {
this.initialPeers.push(peer)
}

/**
* Execute the path.
*
* @returns {Promise}
*/
async execute () {
// Create a queue of peers ordered by distance from the key
const queue = await PeerQueue.fromKey(this.run.query.key)
// Add initial peers to the queue
this.peersToQuery = queue
await Promise.all(this.initialPeers.map(peer => this.addPeerToQuery(peer)))
await this.run.workerQueue(this)
}

/**
* Add a peer to the peers to be queried.
*
* @param {PeerId} peer
* @returns {Promise<void>}
*/
async addPeerToQuery (peer) {
// Don't add self
if (this.run.query.dht._isSelf(peer)) {
return
}

// The paths must be disjoint, meaning that no two paths in the Query may
// traverse the same peer
if (this.run.peersSeen.has(peer.toB58String())) {
return
}

await this.peersToQuery.enqueue(peer)
}
}

module.exports = Path
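`Path` wraps its query function with `utils.withTimeout` so a stalled dial cannot block the worker for longer than `QUERY_FUNC_TIMEOUT`. A minimal sketch of what such a wrapper plausibly does (an assumption about the helper's behavior, not its actual source):

```javascript
// Sketch of a withTimeout-style wrapper: wrap an async function so that a
// slow call rejects after `ms` milliseconds. This is an assumption about
// utils.withTimeout, not its real implementation.
function withTimeout (fn, ms) {
  return async (...args) => {
    let timer
    const timeout = new Promise((resolve, reject) => {
      timer = setTimeout(() => reject(new Error(`timed out after ${ms}ms`)), ms)
    })
    try {
      // Whichever settles first wins; the loser is ignored
      return await Promise.race([fn(...args), timeout])
    } finally {
      // Always clear the timer so fast calls don't leak pending timeouts
      clearTimeout(timer)
    }
  }
}

const slow = () => new Promise(resolve => setTimeout(() => resolve('done'), 100))
withTimeout(slow, 10)().catch(err => console.log(err.message)) // timed out after 10ms
```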
202 changes: 202 additions & 0 deletions src/query/run.js
@@ -0,0 +1,202 @@
'use strict'

const PeerDistanceList = require('../peer-list/peer-distance-list')
const EventEmitter = require('events')

const Path = require('./path')
const WorkerQueue = require('./workerQueue')
const utils = require('../utils')

/**
* Manages a single run of the query.
*/
class Run extends EventEmitter {
/**
* Creates a Run.
*
* @param {Query} query
*/
constructor (query) {
super()

this.query = query

this.running = false
this.workers = []

// The peers that have been queried (including error responses)
this.peersSeen = new Set()
// The errors received when querying peers
this.errors = []
// The closest K peers that have been queried successfully
// (this member is initialized when the worker queues start)
this.peersQueried = null
}

/**
* Stop all the workers
*/
stop () {
if (!this.running) {
return
}

this.running = false
for (const worker of this.workers) {
worker.stop()
}
}

/**
* Execute the run with the given initial set of peers.
*
* @param {Array<PeerId>} peers
* @returns {Promise}
*/
async execute (peers) {
const paths = [] // array of states per disjoint path

// Create disjoint paths
const numPaths = Math.min(this.query.dht.disjointPaths, peers.length)
for (let i = 0; i < numPaths; i++) {
paths.push(new Path(this, this.query.makePath(i, numPaths)))
}

// Assign peers to paths round-robin style
peers.forEach((peer, i) => {
paths[i % numPaths].addInitialPeer(peer)
})

// Execute the query along each disjoint path
await this.executePaths(paths)

const res = {
// The closest K peers we were able to query successfully
finalSet: new Set(this.peersQueried.peers),
paths: []
}

// Collect the results from each completed path
for (const path of paths) {
if (path.res && (path.res.pathComplete || path.res.queryComplete)) {
path.res.success = true
res.paths.push(path.res)
}
}

return res
}

/**
* Execute all paths through the DHT.
*
* @param {Array<Path>} paths
* @returns {Promise<void>}
*/
async executePaths (paths) {
this.running = true

this.emit('start')
try {
await Promise.all(paths.map(path => path.execute()))
} finally {
// Ensure all workers are stopped
this.stop()
// Completed the Run
this.emit('complete')
}

// If all queries errored out, something is seriously wrong, so throw
// the first error
if (this.errors.length === this.peersSeen.size) {
throw this.errors[0]
}
}

/**
* Initialize the list of queried peers, then start a worker queue for the
* given path.
*
* @param {Path} path
* @returns {Promise<void>}
*/
async workerQueue (path) {
await this.init()
await this.startWorker(path)
}

/**
* Create and start a worker queue for a particular path.
*
* @param {Path} path
* @returns {Promise<void>}
*/
async startWorker (path) {
const worker = new WorkerQueue(this.query.dht, this, path, this.query._log)
this.workers.push(worker)
await worker.execute()
}

/**
* Initialize the list of closest peers we've queried - this is shared by all
* paths in the run.
*
* @returns {Promise<void>}
*/
async init () {
if (this.peersQueried) {
return
}

// We only want to initialize the PeerDistanceList once for the run
if (this.peersQueriedPromise) {
await this.peersQueriedPromise
return
}

// This promise is temporarily stored so that others may await its completion
this.peersQueriedPromise = (async () => {
const dhtKey = await utils.convertBuffer(this.query.key)
this.peersQueried = new PeerDistanceList(dhtKey, this.query.dht.kBucketSize)
})()

// After PeerDistanceList is initialized, clean up
await this.peersQueriedPromise
delete this.peersQueriedPromise
}

/**
* If we've queried K peers, and the remaining peers in the given `worker`'s queue
* are all further from the key than the peers we've already queried, then we should
* stop querying on that `worker`.
*
* @param {WorkerQueue} worker
* @returns {Promise<Boolean>}
*/
async continueQuerying (worker) {
// If we haven't queried K peers yet, keep going
if (this.peersQueried.length < this.peersQueried.capacity) {
return true
}

// Get all the peers that are currently being queried.
// Note that this function gets called right after a peer has been popped
// off the head of the closest peers queue so it will include that peer.
const running = worker.queue.workersList().map(i => i.data)

// Check if any of the peers that are currently being queried are closer
// to the key than the peers we've already queried
const someCloser = await this.peersQueried.anyCloser(running)

// Some are closer, the worker should keep going
if (someCloser) {
return true
}

// None are closer, the worker can stop
return false
}
}

module.exports = Run
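`Run.init()` memoizes its in-flight promise so that concurrent paths initialize the shared `PeerDistanceList` exactly once. The same pattern, reduced to a standalone sketch (`SharedInit`, `initCount`, and the simulated setup work are illustrative stand-ins):

```javascript
// Standalone sketch of the promise-memoization pattern used by Run.init():
// concurrent callers share one in-flight initialization instead of racing.
class SharedInit {
  constructor () {
    this.value = null
    this.initCount = 0 // counts how many times setup actually ran
  }

  async init () {
    // Already initialized: nothing to do
    if (this.value) return
    // Someone else is initializing: wait for their promise
    if (this._initPromise) {
      await this._initPromise
      return
    }
    // First caller: store the in-flight promise so others can await it
    this._initPromise = (async () => {
      this.initCount++
      // Simulate async setup work (e.g. hashing the query key)
      await new Promise(resolve => setTimeout(resolve, 10))
      this.value = 'ready'
    })()
    await this._initPromise
    // Clean up once initialization has completed
    delete this._initPromise
  }
}

async function main () {
  const shared = new SharedInit()
  // Three concurrent paths all call init(); setup runs exactly once
  await Promise.all([shared.init(), shared.init(), shared.init()])
  console.log(shared.value, shared.initCount) // ready 1
}
main()
```

Without the memoized promise, each concurrent path would build its own list and the last writer would silently win.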
Loading