Giter VIP home page Giter VIP logo

js-libp2p-pubsub's People

Contributors

a1300 avatar achingbrain avatar alanshaw avatar ckousik avatar dependabot-preview[bot] avatar dependabot[bot] avatar fryorcraken avatar hugomrdias avatar jacobheun avatar libp2p-mgmt-read-write[bot] avatar mikerah avatar mpetrunic avatar semantic-release-bot avatar tabcat avatar tobowers avatar vasco-santos avatar web-flow avatar web3-bot avatar wemeetagain avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

js-libp2p-pubsub's Issues

Improve abstract pubsub implementation

The purpose of this module consists of being used as the base block for libp2p based pubsub routing protocols.

Currently, we have some logic on the pubsub routing protocols that can also be extracted into this module. Namely, the emitSelf option, message signing, among others. However, we must no be really restrictive as new protocols may not do what is currently the same for libp2p-floodsub and gossipsub.

In this context, we should draft a newer API that could help us maintaining less code and allowing easier development of new pubsub routing protocols.

cc @jacobheun

All rpc message processing after 'normalization'

Currently, we have two slightly different representations of RPC.Message objects, named (in js-libp2p-gossipsub) Message and InMessage.
The differ by their from property, which is a Buffer in Message and a base58-encoded string in InMessage

The strategy for creating an extensible interface has been to create a _processX method for each X entry in an RPC object, but 'normalizing' Message objects to InMessage objects before message processing.
Pseudocode for idealized RPC processing:

function _processRPC (peer: Peer, rpc: RPC) {
  rpc.subscriptions.forEach(sub => _processSubOpt(peer, sub))
  rpc.msgs.forEach(msg => _processMessage(peer, normalize(msg)))
}

function normalize (msg: Message): InMessage {}

function _processMessage (peer: Peer, msg: InMessage): void {
  if (await validate(peer, msg)) {
    _publishMsg(peer, msg)
  }
}

async function validate (peer: Peer, msg: InMessage): Promise<boolean> {}

One wrinkle to getting to this ^ is the validate method, which still operates on Message objects instead of InMessage objects. Because of this, we end up with special handling of message validation outside of the _processMessage scope. See https://github.com/ChainSafe/js-libp2p-gossipsub/blob/master/ts/pubsub.js#L163-L175

The current strategy of validating messages before "processing" them leads to problems adding pre-validation processing steps.. Eg: in gossipsub, checking a seen cache or peer gray/blacklist should likely be checked before topic validation (as is done in go-libp2p-pubsub), but cannot be done within _processMessage when validation occurs outside.

I think the only thing to do to allow validate to operate on InMessage objects is changing the signing code to generate a PeerId/public key fromB58String instead of fromBytes. Perhaps this can be done in a backwards compatible way by checking typeof message.from === 'string'.
https://github.com/libp2p/js-libp2p-pubsub/blob/master/src/message/sign.js#L69

Alternatively, the message can simply be converted back to Message before being passed to validate, but this doesn't seem ideal.

Peer class discussion

It seems Peers primary purpose is to manage a peer's pubsub stream and negotiated protocol.

A few suggestions to make this clearer:

  • constructor should take { id, stream, protocol } to align with the return values of conn.newStream and callback attached to registrar.handle
  • rename this.conn to something more descriptive, possible rename this.stream at the same time.
    In any case, it seems the object this.conn is actually the muxed pubsub stream and not a Connection object, See https://github.com/libp2p/js-libp2p-pubsub/blob/master/src/index.js#L197
  • I think the Peer class could/should be thinner, recommend removing topics, _sendRawSubscriptions, sendSubscriptions, sendUnsubscriptions, sendMessages, and updateSubscriptions. These should likely be handled independently by router implementations. topics is especially troublesome, as js-libp2p-pubsub doesn't currently manage each Peer.topics, so this additional index must be maintained by the router.
    I think ideally, any js-libp2p-pubsub-managed indices or convenience methods are placed on src/index.js and a Peer maintains a simple write/close interface.

Variables to be added to the js pubsub router

@vasco-santos and @jacobheun are extracting out the BaseProtocol that is meant to be a base class for more specific pubsub routers. The reference Go pubsub router has a lot of variables and functions that may or may not be useful in JS.

An initial read through suggests that the following variables and functions may be useful for JS:

  • topics: This is a mapping of topics to peers that have subscribed to that topic.
  • seen: This is a time cache of seen messages. So far, this has been used in both FloodSub and `GossipSub
  • Protocols: This function returns a list of protocols supported by the pubsub router. I think a variant of this should be added to peerInfo as well. The router should be able to check if a peer supports a particular protocol.

I think everything else in the Go implementation is too specific and might not be needed for an implementer's needs.

Backporting stream fixes to 0.4.x

We are seeing some performance issues that have been quite hard to debug and noticed this issue: libp2p/js-libp2p#668 Have a suspicion that this is what could be causing them, but have not been able to confirm yet.

Looks like two PRs with mitigations to this issue (#48 #49) was released to 0.5.x, however these can only be used with [email protected] which js-ipfs has yet to be upgraded to. Therefore I was wondering if it's possible to backport these fixes to [email protected]?

Happy to make a PR if you think it's easy to cherry-pick!

PubSub abstraction API

Per discussion in libp2p/js-libp2p#720#pullrequestreview-456405724, I am creating this issue to discuss the pubsub abstraction and to find a good way of accessing/modifying pubsub components.

At first, looking into libp2p/js-libp2p/src/pubsub.js, I think that we should basically get rid of this and use the pubsub instance directly. The base pubsub class (hopefully interface-pubsub in the future) should handle the specificities that are handled by libp2p. Currently, someone needs to do node1.pubsub._pubsub.*, to access stuff that is not exposed. In addition, the current setup makes us replicate the API in libp2p unnecessarily.

Other than the above, we need to think how we can customize some behaviours, both from startup (configuration?) and in runtime. An example for this is the topicValidators.

cc @jacobheun @wemeetagain let me know what you think

Improve test stream mock

We are currently using alanshaw/it-pair for testing the pubsub implementations. This pair of {source, sink} streams is bare minimal and does not include functions like reset.

In libp2p/js-libp2p-pubsub#45, we added a stream reset for properly closing the read side of the stream, but we needed to add a validation if the function exists because of the tests.

We should probably create a wrapper around the it-pair that makes these streams similar to the multiplexex streams.

Publish method signature discussion

Currently, the publish method has two params
topics: string[] | string - the list of topics associated with the 'messages'
and messages: (string | Buffer | ArrayBuffer)[] | (string | Buffer | ArrayBuffer) - the list of data payloads to be propagated.

How come this method accepts a list of payloads instead of a single payload? (the Message object within an RPC object looks more like this shape -- a list of topics and a single data payload)

Is the intention to be able to reuse a single RPC to broadcast multiple message payloads at once?
In that case, shouldn't the interface allow something more like {topics: string[] | string, data: string | Buffer | ArrayBuffer}[], which exposes the full flexibility of each Message object?

Add signing and verification of messages

As we go for enabling signing and verification of messages, this logic should live under the base protocol implementation, that is, the codebase of js-libp2p-pubsub.

This issue intends to track the refactor of js-libp2p-pubsub, as well as of its implementors to have these implemented in here, instead of needing to implement this in each implementation of pubsub.

Issue surrounding dropping peers

While reviewing @Mikerah PR ChainSafe/js-libp2p-gossipsub/pull/15 , a specific line caught me by surprise and I'm curious on all your thoughts.

On this line we are checking if if (--peer._references === 0) before dropping a peer, which leads me to believe that if there was only two peers on the network they would kick each-other, meaning they would never make a connection.

Which I'm thinking might cause issues on startup if theres a latency getting peers (which if I remember correctly) eth1,x clients do have a bit of an issue with. Sometimes it takes quite a while to get peers sync'd.

pubsub stops working after a while

ipfs-npm-registry-mirror has a bunch of docker containers - one is continuously syncing the npm registry (the replication master) and broadcasting updates to the others (the mirrors). After a while (30-60 mins) the messages stop being received by the mirrors.

  • The replication master and mirrors all have each other in their swarm peers list
  • The replication master has the mirrors in the peer list for the topic
  • The mirrors have each other in the peer list for the topic
  • The replication master is not subscribed to the topic

Anyone got any ideas why this would happen?

Node not reciving PubSub messages

Hey new to using libp2p and this is my second attempt in the last year to get it to work ( gave up the first time haha).

So I have made two separate nodes. My first node (boot-node) witch has a persistent id for easy discovery, my second node (client) is just a standalone separate node that connects to my boot-node.

As of now both nodes run smooth and the connection manger event listener for peer:connect on both nodes trigger my console.log, so i presume both nodes are connected. (correct me if I'm wrong please)

After both nodes "connect" my pub sub function is implemented on the client side and attempts to send the nodes multiaddrs of the client to the boot-node. This is when nothing happens the clients emits to self and i see the publish there but the boot-node never receives the publish.

Client.js

import { createLibp2p } from 'libp2p'
import ipily from 'ipily'
import { Bootstrap } from '@libp2p/bootstrap'
import { TCP } from '@libp2p/tcp'
import { Noise } from '@chainsafe/libp2p-noise'
import { Mplex } from '@libp2p/mplex'
import { FloodSub } from '@libp2p/floodsub'
import { createFromJSON } from '@libp2p/peer-id-factory'
import { toString } from 'uint8arrays/to-string'
import { fromString } from 'uint8arrays/from-string'

const main = async () => {
    
    const bootstrapers = ['/ip6/::/tcp/4002/p2p/QmYMxFJAWQfwM3nWCChySdJSn4X9T4tkkTuUgYKaQXytg8']

    const ip = await ipily()

    const node = await createLibp2p({
        addresses: {
        // add a listen address (localhost) to accept TCP connections on a random port
        listen: [`/ip6/${ip}/tcp/4001`]
        },
    
        transports: [new TCP()],

        connectionEncryption: [new Noise()],

        streamMuxers: [new Mplex()],

        peerDiscovery: [new Bootstrap({
            interval: 60e3,
            list: bootstrapers
        })],

        pubsub: new FloodSub({
            emitSelf: true,
            enabled: true
        }),

        connectionManager: {
            autoDial: true
            
        }
    })

    const topic = 'peer-bradcast'

    // start libp2p
    await node.start()
    console.log('libp2p has started')

    // print out listening addresses
    node.getMultiaddrs().forEach((addr) => {
        console.log('listening on addresses:', addr.toString())
    })
    
    node.pubsub.subscribe(topic)

    node.connectionManager.addEventListener('peer:connect', (evt) => {
        const connection = evt.detail
        console.log(`connection to : ${connection.remotePeer.toString()}`)
        
        setInterval(() => {node.getMultiaddrs().forEach((addr) => {
            node.pubsub.publish(topic, fromString(addr.toString()))
        })}, 5000)
        
    })

    node.pubsub.addEventListener('message', (evt) => {
        console.log('received:', toString(evt.detail.data))
    })
}

main().then().catch(console.error)

Boot-node.js

import { createLibp2p } from 'libp2p'
import { Bootstrap } from '@libp2p/bootstrap'
import { TCP } from '@libp2p/tcp'
import { Noise } from '@chainsafe/libp2p-noise'
import { Mplex } from '@libp2p/mplex'
import { createFromJSON } from '@libp2p/peer-id-factory'
import { FloodSub } from '@libp2p/floodsub'
import { Multiaddr } from '@multiformats/multiaddr'
import { toString } from 'uint8arrays/to-string'
import { fromString } from 'uint8arrays/from-string'
import ipily from 'ipily'


const main = async () => {
    const myPeerId = await createFromJSON({
        id: 'QmYMxFJAWQfwM3nWCChySdJSn4X9T4tkkTuUgYKaQXytg8',
        privKey: 'CAAS4gQwggJeAgEAAoGBAK3ZWsNyweiltshFXVqViO/+vSKnKHA8v/RMIpJK7TPlQlDEjkYQO+AOCj+NJ35cQZLet3RE6MOf4nSD5deKX15v+nS5xcCaDR3uHnq/RK5zm8CveEodGBuPplbcS9i+0z8FIHdjZRBQ0UYn6BZb0AIe06UPoL6slhyn8kae6+9rAgMBAAECgYEAn/GT/rNq0Wb8xj6DB98BsIO2cNZHbxOFKvaM6/kBSLiJDZP4rV8sJxPju9pa1nd8YOqE7/SiDbDggIAFCwukdtHzpMQTKoEVFXSvkxV4J18VdkpEHpy19F2qWrbVExiTmZo3JBgoVDFvT79vDWlMLKYu0559Fn/x1lCALVEIdJECQQDim9P8zdtRdvqm6ykHHsqe1nzFTd5hbslQiSwggoQoH1eLxKVl+LZWSZ3Zv30RvSvOtuFFtQvcNSc1vgRUkCMDAkEAxGW6YTM5T5ouh8u6JPZfKoccDEH22fPl9ZC11Ve/T/gahIsNquIdkg00K9c4aWErAagXXrynuU0rBKBs7hgheQJBAOI3zZhBqP/aVr7rYznIc129oTEWWznI7w+G4JZTtLqZDzxoQvVZJC1fsET/9EsRHfpudtSvt/dM2Ke92XTRryECQCfV7nj6t9wyjM+UfhwzMNhicUj3NEe/Fdy6TneIhdCFpXNO9SW9GXxfWz6tN88AG4YJfzwDDg4i3d/hXlbo5BkCQQDP3XAQOuOxIjQFQhtkbzmePJeUWCrDF/9/wCDjhZI/Qslap1Twc4jgAFlPSQN4PlPrEYAXBGl8ErWaYVv8KgX8',
        pubKey: 'CAASogEwgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAK3ZWsNyweiltshFXVqViO/+vSKnKHA8v/RMIpJK7TPlQlDEjkYQO+AOCj+NJ35cQZLet3RE6MOf4nSD5deKX15v+nS5xcCaDR3uHnq/RK5zm8CveEodGBuPplbcS9i+0z8FIHdjZRBQ0UYn6BZb0AIe06UPoL6slhyn8kae6+9rAgMBAAE='
    });

    const ip = await ipily()

    const node = await createLibp2p({
        addresses: {
        // add a listen address (localhost) to accept TCP connections on a random port
        listen: [`/ip6/${ip}/tcp/4002`]
        },
        
        peerId: myPeerId,
    
        transports: [new TCP()],

        connectionEncryption: [new Noise()],

        streamMuxers: [new Mplex()],

        pubsub: new FloodSub({
            emitSelf: true,
            enabled: true
            
        }),

        connectionManager: {
            autoDial: true
        }
    })
    
    const topic = 'peer-bradcast'

    // start libp2p
    await node.start()
    console.log('libp2p has started')

    // print out listening addresses
    node.getMultiaddrs().forEach((addr) => {
        console.log('listening on addresses:', addr.toString())
    })
    
    node.pubsub.subscribe(topic)

    node.connectionManager.addEventListener('peer:connect', (evt) => {
        const connection = evt.detail
        console.log(`connection to : ${connection.remotePeer.toString()}`)
    })

    node.pubsub.addEventListener('message', (evt) => {
        console.log('received:', toString(evt.detail.data))
    })
    

}

main().then().catch(console.error)

pleas let me know what I'm doing wrong thanks in advance.

Add message signing support

Go is looking to turn message signing on by default and rolled out to the daemon and IPFS. The change has already been done, but the rollout process is still underway. We need to get message signing support added to js pubsub. We can do this in two phases to split up the work:

  • 1. Add support for message signing and sign outgoing messages by default
  • 2. Verify the signatures of incoming messages (before they go through validators)

Ref: libp2p/go-libp2p-pubsub#179

Default value for self emit option

Our floodsub implementation was self-emitting, pubsub messages if the node has previously subscribed to that topic. However, users may not expect this to happen and this is not clear at the moment.

In this context, I created PRs to allow this to be configurable:

libp2p/js-libp2p-floodsub#85
ChainSafe/gossipsub-js#40

In this context, I opened this issue for discussing whether we should continue with this default behaviour in js-libp2p, or change this to false.

cc @jacobheun

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.