Skip to content

Commit edb5f91

Browse files
committed
feat(types): Support custom events on PubSub and PeerStreams
1 parent e4f603f commit edb5f91

File tree

3 files changed

+11
-11
lines changed

3 files changed

+11
-11
lines changed

packages/interface/src/pubsub.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ export interface PubSubRPC {
6666
messages: PubSubRPCMessage[]
6767
}
6868

69-
export interface PeerStreams extends TypedEventTarget<PeerStreamEvents> {
69+
export interface PeerStreams<PeerEvents extends PeerStreamEvents = PeerStreamEvents> extends TypedEventTarget<PeerEvents> {
7070
id: PeerId
7171
protocol: string
7272
outboundStream?: Pushable<Uint8ArrayList>

packages/pubsub/src/index.ts

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ import {
4040
verifySignature
4141
} from './sign.js'
4242
import { toMessage, ensureArray, noSignMsgId, msgId, toRpcMessage, randomSeqno } from './utils.js'
43-
import type { PubSub, Message, StrictNoSign, StrictSign, PubSubInit, PubSubEvents, PeerStreams, PubSubRPCMessage, PubSubRPC, PubSubRPCSubscription, SubscriptionChangeData, PublishResult, TopicValidatorFn, ComponentLogger, Logger, Connection, PeerId, PrivateKey, IncomingStreamData } from '@libp2p/interface'
43+
import type { PubSub, Message, StrictNoSign, StrictSign, PubSubInit, PubSubEvents, PeerStreams, PubSubRPCMessage, PubSubRPC, PubSubRPCSubscription, SubscriptionChangeData, PublishResult, TopicValidatorFn, ComponentLogger, Logger, Connection, PeerId, PrivateKey, IncomingStreamData, PeerStreamEvents } from '@libp2p/interface'
4444
import type { Registrar } from '@libp2p/interface-internal'
4545
import type { Uint8ArrayList } from 'uint8arraylist'
4646

@@ -55,7 +55,7 @@ export interface PubSubComponents {
5555
* PubSubBaseProtocol handles the peers and connections logic for pubsub routers
5656
* and specifies the API that pubsub routers should have.
5757
*/
58-
export abstract class PubSubBaseProtocol<Events extends Record<string, any> = PubSubEvents> extends TypedEventEmitter<Events> implements PubSub<Events> {
58+
export abstract class PubSubBaseProtocol<Events extends Record<string, any> = PubSubEvents, PeerEvents extends PeerStreamEvents = PeerStreamEvents> extends TypedEventEmitter<Events> implements PubSub<Events> {
5959
protected log: Logger
6060

6161
public started: boolean
@@ -70,7 +70,7 @@ export abstract class PubSubBaseProtocol<Events extends Record<string, any> = Pu
7070
/**
7171
* Map of peer streams
7272
*/
73-
public peers: PeerMap<PeerStreams>
73+
public peers: PeerMap<PeerStreams<PeerEvents>>
7474
/**
7575
* The signature policy to follow by default
7676
*/
@@ -119,7 +119,7 @@ export abstract class PubSubBaseProtocol<Events extends Record<string, any> = Pu
119119
this.started = false
120120
this.topics = new Map()
121121
this.subscriptions = new Set()
122-
this.peers = new PeerMap<PeerStreams>()
122+
this.peers = new PeerMap<PeerStreams<PeerEvents>>()
123123
this.globalSignaturePolicy = globalSignaturePolicy === 'StrictNoSign' ? 'StrictNoSign' : 'StrictSign'
124124
this.canRelayMessage = canRelayMessage
125125
this.emitSelf = emitSelf
@@ -268,7 +268,7 @@ export abstract class PubSubBaseProtocol<Events extends Record<string, any> = Pu
268268
/**
269269
* Notifies the router that a peer has been connected
270270
*/
271-
addPeer (peerId: PeerId, protocol: string): PeerStreams {
271+
addPeer (peerId: PeerId, protocol: string): PeerStreams<PeerEvents> {
272272
const existing = this.peers.get(peerId)
273273

274274
// If peer streams already exists, do nothing
@@ -279,7 +279,7 @@ export abstract class PubSubBaseProtocol<Events extends Record<string, any> = Pu
279279
// else create a new peer streams
280280
this.log('new peer %p', peerId)
281281

282-
const peerStreams: PeerStreams = new PeerStreamsImpl(this.components, {
282+
const peerStreams: PeerStreams<PeerEvents> = new PeerStreamsImpl<PeerEvents>(this.components, {
283283
id: peerId,
284284
protocol
285285
})
@@ -295,7 +295,7 @@ export abstract class PubSubBaseProtocol<Events extends Record<string, any> = Pu
295295
/**
296296
* Notifies the router that a peer has been disconnected
297297
*/
298-
protected _removePeer (peerId: PeerId): PeerStreams | undefined {
298+
protected _removePeer (peerId: PeerId): PeerStreams<PeerEvents> | undefined {
299299
const peerStreams = this.peers.get(peerId)
300300
if (peerStreams == null) {
301301
return
@@ -321,7 +321,7 @@ export abstract class PubSubBaseProtocol<Events extends Record<string, any> = Pu
321321
/**
322322
* Responsible for processing each RPC message received by other peers.
323323
*/
324-
async processMessages (peerId: PeerId, stream: AsyncIterable<Uint8ArrayList>, peerStreams: PeerStreams): Promise<void> {
324+
async processMessages (peerId: PeerId, stream: AsyncIterable<Uint8ArrayList>, peerStreams: PeerStreams<PeerEvents>): Promise<void> {
325325
try {
326326
await pipe(
327327
stream,
@@ -369,7 +369,7 @@ export abstract class PubSubBaseProtocol<Events extends Record<string, any> = Pu
369369
/**
370370
* Handles an rpc request from a peer
371371
*/
372-
async processRpc (from: PeerId, peerStreams: PeerStreams, rpc: PubSubRPC): Promise<boolean> {
372+
async processRpc (from: PeerId, peerStreams: PeerStreams<PeerEvents>, rpc: PubSubRPC): Promise<boolean> {
373373
if (!this.acceptFrom(from)) {
374374
this.log('received message from unacceptable peer %p', from)
375375
return false

packages/pubsub/src/peer-streams.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ export interface DecoderOptions extends LpDecoderOptions {
2424
/**
2525
* Thin wrapper around a peer's inbound / outbound pubsub streams
2626
*/
27-
export class PeerStreams extends TypedEventEmitter<PeerStreamEvents> {
27+
export class PeerStreams<PeerEvents extends PeerStreamEvents = PeerStreamEvents> extends TypedEventEmitter<PeerEvents> {
2828
public readonly id: PeerId
2929
public readonly protocol: string
3030
/**

0 commit comments

Comments
 (0)