Skip to content

Commit a9985a1

Browse files
feat: audio video stream missing data detector (#30)
1 parent 40c47fd commit a9985a1

File tree

4 files changed

+198
-0
lines changed

4 files changed

+198
-0
lines changed

README.md

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,23 @@ const exampleIssue = {
235235
}
236236
```
237237

238+
239+
### MissingStreamDataDetector
240+
Detects issues with missing data in active inbound streams
241+
```ts
242+
const exampleIssue = {
243+
type: 'stream',
244+
reason: 'missing-video-stream-data' | 'missing-audio-stream-data',
245+
trackIdentifier: 'some-track-id',
246+
statsSample: {
247+
bytesReceivedDelta: 0, // always zero if issue detected
248+
bytesReceived: 2392384,
249+
trackDetached: false,
250+
trackEnded: false,
251+
},
252+
}
253+
```
254+
238255
## Roadmap
239256

240257
- [ ] Adaptive getStats() call interval based on last getStats() execution time

src/WebRTCIssueDetector.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import {
2727
} from './detectors';
2828
import { CompositeRTCStatsParser, RTCStatsParser } from './parser';
2929
import createLogger from './utils/logger';
30+
import MissingStreamDataDetector from './detectors/MissingStreamDataDetector';
3031

3132
class WebRTCIssueDetector {
3233
readonly eventEmitter: WebRTCIssueEmitter;
@@ -67,6 +68,7 @@ class WebRTCIssueDetector {
6768
new AvailableOutgoingBitrateIssueDetector(),
6869
new UnknownVideoDecoderImplementationDetector(),
6970
new FrozenVideoTrackDetector(),
71+
new MissingStreamDataDetector(),
7072
];
7173

7274
this.networkScoresCalculator = params.networkScoresCalculator ?? new DefaultNetworkScoresCalculator();
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
import {
2+
CommonParsedInboundStreamStats,
3+
IssueDetectorResult,
4+
IssuePayload,
5+
IssueReason,
6+
IssueType,
7+
WebRTCStatsParsed,
8+
} from '../types';
9+
import BaseIssueDetector from './BaseIssueDetector';
10+
11+
interface MissingStreamDetectorParams {
12+
timeoutMs?: number; // delay to report the issue no more often then once per specified timeout
13+
steps?: number; // number of last stats to check
14+
}
15+
16+
export default class MissingStreamDataDetector extends BaseIssueDetector {
17+
readonly #lastMarkedAt = new Map<string, number>();
18+
19+
readonly #timeoutMs: number;
20+
21+
readonly #steps: number;
22+
23+
constructor(params: MissingStreamDetectorParams = {}) {
24+
super();
25+
this.#timeoutMs = params.timeoutMs ?? 15_000;
26+
this.#steps = params.steps ?? 3;
27+
}
28+
29+
performDetection(data: WebRTCStatsParsed): IssueDetectorResult {
30+
const { connection: { id: connectionId } } = data;
31+
const issues = this.processData(data);
32+
this.setLastProcessedStats(connectionId, data);
33+
return issues;
34+
}
35+
36+
private processData(data: WebRTCStatsParsed): IssueDetectorResult {
37+
const issues: IssueDetectorResult = [];
38+
39+
const allLastProcessedStats = [...this.getAllLastProcessedStats(data.connection.id), data];
40+
if (allLastProcessedStats.length < this.#steps) {
41+
return issues;
42+
}
43+
44+
const lastNProcessedStats = allLastProcessedStats.slice(-this.#steps);
45+
46+
const lastNVideoInbound = lastNProcessedStats.map((stats) => stats.video.inbound);
47+
const lastNAudioInbound = lastNProcessedStats.map((stats) => stats.audio.inbound);
48+
49+
issues.push(...this.detectMissingData(
50+
lastNAudioInbound as unknown as CommonParsedInboundStreamStats[][],
51+
IssueType.Stream,
52+
IssueReason.MissingAudioStreamData,
53+
));
54+
55+
issues.push(...this.detectMissingData(
56+
lastNVideoInbound,
57+
IssueType.Stream,
58+
IssueReason.MissingVideoStreamData,
59+
));
60+
61+
const unvisitedTrackIds = new Set(this.#lastMarkedAt.keys());
62+
63+
unvisitedTrackIds.forEach((trackId) => {
64+
const lastMarkedAt = this.#lastMarkedAt.get(trackId);
65+
if (lastMarkedAt && Date.now() - lastMarkedAt > this.#timeoutMs) {
66+
this.removeMarkedIssue(trackId);
67+
}
68+
});
69+
70+
return issues;
71+
}
72+
73+
private detectMissingData(
74+
lastNInboundStats: CommonParsedInboundStreamStats[][],
75+
type: IssueType,
76+
reason: IssueReason,
77+
): IssueDetectorResult {
78+
const issues: IssuePayload[] = [];
79+
80+
const currentInboundStats = lastNInboundStats.pop()!;
81+
const prevInboundItemsByTrackId = MissingStreamDataDetector.mapStatsByTrackId(lastNInboundStats);
82+
83+
currentInboundStats.forEach((inboundItem) => {
84+
const trackId = inboundItem.track.trackIdentifier;
85+
86+
const prevInboundItems = prevInboundItemsByTrackId.get(trackId);
87+
88+
if (!Array.isArray(prevInboundItems) || prevInboundItems.length === 0) {
89+
return;
90+
}
91+
92+
if (inboundItem.track.detached || inboundItem.track.ended) {
93+
return;
94+
}
95+
96+
if (!MissingStreamDataDetector.isAllBytesReceivedDidntChange(inboundItem.bytesReceived, prevInboundItems)) {
97+
this.removeMarkedIssue(trackId);
98+
return;
99+
}
100+
101+
const issueMarked = this.markIssue(trackId);
102+
103+
if (!issueMarked) {
104+
return;
105+
}
106+
107+
const statsSample = {
108+
bytesReceived: inboundItem.bytesReceived,
109+
};
110+
111+
issues.push({
112+
type,
113+
reason,
114+
statsSample,
115+
trackIdentifier: trackId,
116+
});
117+
});
118+
119+
return issues;
120+
}
121+
122+
private static mapStatsByTrackId(
123+
items: CommonParsedInboundStreamStats[][],
124+
): Map<string, CommonParsedInboundStreamStats[]> {
125+
const statsById = new Map<string, CommonParsedInboundStreamStats[]>();
126+
items.forEach((inboundItems) => {
127+
inboundItems.forEach((inbountItem) => {
128+
const accumulatedItems = statsById.get(inbountItem.track.trackIdentifier) || [];
129+
accumulatedItems.push(inbountItem);
130+
statsById.set(inbountItem.track.trackIdentifier, accumulatedItems);
131+
});
132+
});
133+
134+
return statsById;
135+
}
136+
137+
private static isAllBytesReceivedDidntChange(
138+
bytesReceived: number, inboundItems: CommonParsedInboundStreamStats[],
139+
): boolean {
140+
for (let i = 0; i < inboundItems.length; i += 1) {
141+
const inboundItem = inboundItems[i];
142+
if (inboundItem.bytesReceived !== bytesReceived) {
143+
return false;
144+
}
145+
}
146+
147+
return true;
148+
}
149+
150+
private markIssue(trackId: string): boolean {
151+
const now = Date.now();
152+
const lastMarkedAt = this.#lastMarkedAt.get(trackId);
153+
154+
if (!lastMarkedAt || now - lastMarkedAt > this.#timeoutMs) {
155+
this.#lastMarkedAt.set(trackId, now);
156+
return true;
157+
}
158+
159+
return false;
160+
}
161+
162+
private removeMarkedIssue(trackId: string): void {
163+
this.#lastMarkedAt.delete(trackId);
164+
}
165+
}

src/types.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ export enum IssueReason {
8383
LowInboundMOS = 'low-inbound-mean-opinion-score',
8484
LowOutboundMOS = 'low-outbound-mean-opinion-score',
8585
FrozenVideoTrack = 'frozen-video-track',
86+
MissingVideoStreamData = 'missing-video-stream-data',
87+
MissingAudioStreamData = 'missing-audio-stream-data',
8688
}
8789

8890
export type IssuePayload = {
@@ -433,3 +435,15 @@ export interface Logger {
433435
warn: (msg: any, ...meta: any[]) => void;
434436
error: (msg: any, ...meta: any[]) => void;
435437
}
438+
439+
type CommonKeys<T, U> = Extract<keyof T, keyof U>;
440+
441+
type CommonFields<T, U> = {
442+
[K in CommonKeys<T, U>]: T[K] extends object
443+
? U[K] extends object
444+
? CommonFields<T[K], U[K]> // Recursively check nested objects
445+
: never
446+
: T[K];
447+
};
448+
449+
export type CommonParsedInboundStreamStats = CommonFields<ParsedInboundVideoStreamStats, ParsedInboundAudioStreamStats>;

0 commit comments

Comments
 (0)