NorskInput.rtmpServer() method
Create an RTMP Server to receive RTMP streams into your application
Signature:
rtmpServer(settings: RtmpServerInputSettings): Promise<RtmpServerInputNode>;
Parameters
Parameter | Type | Description |
---|---|---|
settings | RtmpServerInputSettings | Configuration for the RTMP server |
Returns:
Promise<RtmpServerInputNode>
Example [01_rtmp_to_webrtc.ts]
Subscribe to an RTMP source and generate local WebRTC output from it
const input = await norsk.input.rtmpServer({ id: "rtmpInput" });
const output = await norsk.output.whep({ id: "webrtc", ...webRtcServerConfig });
output.subscribe([{ source: input, sourceSelector: selectAV }]);
console.log(`WebRTC Player URL: ${output.playerUrl}`);
Run the following command to generate example input at url rtmp://127.0.0.1:1935/norsk/high:
ffmpeg -v error -re -stream_loop -1 -i data/InkDrop.ts -vcodec copy -codec copy -f flv 'rtmp://127.0.0.1:1935/norsk/high'
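The publish URL in the command above appears to follow the shape rtmp://host:port/app/publishingName, so `norsk` would be the app and `high` the publishing name that the `onConnection` and `onStream` callbacks in the later examples receive. The sketch below illustrates that split. `splitRtmpUrl` and `RtmpTarget` are hypothetical names, not part of the Norsk SDK, and treating the first path segment as the app is an assumption that matches the URLs on this page.

```typescript
// Hypothetical helper (not part of the Norsk SDK): split an RTMP publish URL
// into host, port, app and publishing name.
interface RtmpTarget {
  host: string;
  port: number;
  app: string;
  publishingName: string;
}

function splitRtmpUrl(raw: string): RtmpTarget {
  const url = new URL(raw);
  if (url.protocol !== "rtmp:") {
    throw new Error(`Not an RTMP URL: ${raw}`);
  }
  // The path is "/<app>/<publishingName>"; drop empty segments.
  const segments = url.pathname.split("/").filter((s) => s.length > 0);
  if (segments.length < 2) {
    throw new Error(`Expected rtmp://host[:port]/app/publishingName, got: ${raw}`);
  }
  return {
    host: url.hostname,
    // 1935 is the conventional RTMP port, assumed when the URL omits one.
    port: url.port === "" ? 1935 : Number(url.port),
    // Assumption: the first path segment is the app, the rest is the name.
    app: segments[0],
    publishingName: segments.slice(1).join("/"),
  };
}

const target = splitRtmpUrl("rtmp://127.0.0.1:1935/norsk/high");
console.log(target.app, target.publishingName);
```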
Example [03_rtmp_to_hls_passthrough.ts]
Package an inbound RTMP stream into LL-HLS
export async function main() {
const norsk = await Norsk.connect();
const input = await norsk.input.rtmpServer({ id: "rtmpInput" });
// Receive an inbound stream and segment it as CMAF chunks for publication as HLS and DASH
// Note that as this is passthrough we don't necessarily know the bitrate of the stream
// for the HLS multi variant (master) playlist. Here we just set them by hand in the CMAF Audio and CMAF
// video segmenters. Other examples show how you can measure bitrates and use that in the multi variant playlist.
// If a transcode is happening (take a look at the various _to_ladder examples) then
// each stream will have a well-known bitrate that automatically flows through the workflow
// Note that from here on down, the code is identical to the code in rtmp_to_hls_passthrough
// With Norsk you only need to describe the desired media flow - it takes care of the differences
// between various input types.
const audioOutput = await norsk.output.cmafAudio({ id: "audio", bitrate: 20_000, ...segmentSettings });
const videoOutput = await norsk.output.cmafVideo({ id: "video", bitrate: 1_500_000, ...segmentSettings });
const mvOutput = await norsk.output.cmafMultiVariant({ id: "multi-variant", playlistName: "multi-variant", destinations });
mvOutput.subscribe([
{ source: audioOutput, sourceSelector: selectPlaylist },
{ source: videoOutput, sourceSelector: selectPlaylist },
]);
audioOutput.subscribe([{ source: input, sourceSelector: selectAudio }]);
videoOutput.subscribe([{ source: input, sourceSelector: selectVideo }]);
console.log(`Multi variant playlist: ${mvOutput.url}`);
audioOutput.url().then(logMediaPlaylist("audio"));
videoOutput.url().then(logMediaPlaylist("video"));
}
const destinations: CmafDestinationSettings[] =
[{ id: "local", type: "local", retentionPeriodSeconds: 10 }];
const segmentSettings: CmafOutputSettings = {
partDurationSeconds: 1.0,
segmentDurationSeconds: 4.0,
destinations
};
function logMediaPlaylist(name: string): (url: string) => void {
return (
url => { console.log(`${name} playlistUrl: ${url}`); }
);
}
Run the following command to generate example input at url rtmp://127.0.0.1:1935/norsk/default:
ffmpeg -v error -re -stream_loop -1 -i data/InkDrop.ts -vcodec copy -codec copy -f flv 'rtmp://127.0.0.1:1935/norsk/default'
Example [08_pid_normalization.ts]
Receive an RTMP stream and package it in a transport stream with explicit PID mappings
const input = await norsk.input.rtmpServer({ id: "rtmpInput" });
const videoPidNormalizer = await norsk.processor.transform.streamKeyOverride(videoStreamKeyConfig);
const audioPidNormalizer = await norsk.processor.transform.streamKeyOverride(audioStreamKeyConfig);
const output1 = await norsk.output.whep({ id: "webrtc", ...webRtcServerConfig });
const output2 = await norsk.output.fileTs(tsFileOutputSettings);
videoPidNormalizer.subscribe([{ source: input, sourceSelector: selectVideo }]);
audioPidNormalizer.subscribe([{ source: input, sourceSelector: selectAudio }]);
const normalizedSources = [{ source: videoPidNormalizer, sourceSelector: selectVideo }, { source: audioPidNormalizer, sourceSelector: selectAudio }];
output1.subscribe(normalizedSources);
output2.subscribe(normalizedSources);
console.log(`WebRTC Player URL: ${output1.playerUrl}`);
Run the following command to generate example input at url rtmp://127.0.0.1:1935/norsk/high:
ffmpeg -v error -re -stream_loop -1 -i data/InkDrop.ts -vcodec copy -codec copy -f flv 'rtmp://127.0.0.1:1935/norsk/high'
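The excerpt above does not show `videoStreamKeyConfig` or `audioStreamKeyConfig`. As a rough sketch only, they could follow the same shape as the `streamKeyOverride` call in the 18_rtmp_mosaic.ts example on this page (`id` plus a `streamKey` with `programNumber`, `sourceName`, `renditionName` and `streamId`). The interfaces below are local stand-ins rather than SDK imports, the ids, names and the values 256 and 257 are illustrative assumptions, and the idea that `streamId` ends up as the PID in the transport stream is inferred from the example's description rather than confirmed here.

```typescript
// Local stand-in types mirroring the fields used by the streamKeyOverride call
// in the mosaic example; they are NOT imported from the Norsk SDK.
interface StreamKeySketch {
  programNumber: number;
  streamId: number;
  sourceName: string;
  renditionName: string;
}

interface StreamKeyOverrideSketch {
  id: string;
  streamKey: StreamKeySketch;
}

// Assumed values: ids, names and stream ids are illustrative only.
const videoStreamKeyConfig: StreamKeyOverrideSketch = {
  id: "video_stream_key",
  streamKey: { programNumber: 1, streamId: 256, sourceName: "input", renditionName: "video" },
};

const audioStreamKeyConfig: StreamKeyOverrideSketch = {
  id: "audio_stream_key",
  streamKey: { programNumber: 1, streamId: 257, sourceName: "input", renditionName: "audio" },
};

// Sanity check: every override should map to a distinct stream id,
// otherwise two elementary streams would collide in the output.
function hasDistinctStreamIds(configs: StreamKeyOverrideSketch[]): boolean {
  const ids = configs.map((c) => c.streamKey.streamId);
  return new Set(ids).size === ids.length;
}

console.log(hasDistinctStreamIds([videoStreamKeyConfig, audioStreamKeyConfig]));
```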
Example [17_rtmp_server.ts]
A more fully featured RTMP server showing use of callbacks for security etc
export async function main() {
const norsk = await Norsk.connect();
const input = await norsk.input.rtmpServer({
id: "rtmp",
onConnection: (_cid: string, app: string, url: string) => {
console.log("Got RTMP connection", app, url);
return { accept: true }; // accept all!!!
},
onStream: (
_cid: string,
app: string,
url: string,
streamId: number,
publishingName: string
) => {
if (!(publishingName in allowedRenditions)) {
return {
accept: false,
reason: "only known rendition names are accepted around here",
};
}
console.log("Got RTMP stream", app, url, streamId, publishingName);
const onStream = async () => {
const destinations: CmafDestinationSettings[] = [{ type: "local", retentionPeriodSeconds: 10, id: "local" }]
// Register this app if we've not seen it before, and start up a multi variant playlist for it
if (!knownApps[app]) {
const settings: CmafMultiVariantOutputSettings = {
id: "hls-multi-variant-" + app,
playlistName: app,
destinations,
};
const multiVariantPlaylist = await norsk.output.cmafMultiVariant(settings);
knownApps[app] = { multiVariant: multiVariantPlaylist, sources: [], webrtc: [], videoInput: [], audioInput: [] };
console.log(`HLS Multi Variant Playlist: ${multiVariantPlaylist.url}`);
}
// Create a single WebRTC output for this new stream
const webRtcOutput = await norsk.output.whep({
id: "webrtc-" + app + "-" + publishingName,
...webRtcServerConfig
});
webRtcOutput.subscribe([subscribeAV(input, app, publishingName)]);
knownApps[app].webrtc.push(webRtcOutput);
console.log(`WebRTC Player URL: ${webRtcOutput.playerUrl}`);
// Create a single audio HLS output for this new stream
const audioOutput = await norsk.output.cmafAudio({
id: "hls-" + app + "-" + publishingName + "-audio",
partDurationSeconds,
segmentDurationSeconds,
destinations
});
audioOutput.subscribe([subscribeAudio(input, app, publishingName)]);
// Create a single video HLS output for this new stream
const videoOutput = await norsk.output.cmafVideo({
id: "hls-" + app + "-" + publishingName + "-video",
partDurationSeconds,
segmentDurationSeconds,
destinations,
});
videoOutput.subscribe([subscribeVideo(input, app, publishingName)]);
knownApps[app].videoInput.push(videoOutput)
knownApps[app].audioInput.push(audioOutput)
// Add this to the list of renditions we know about
knownApps[app].sources.push(publishingName);
// And re-subscribe the multi variant playlist to all of the known about renditions
knownApps[app].multiVariant.subscribe(
knownApps[app].videoInput.concat(knownApps[app].audioInput).map((r) => subscribePlaylists(r))
);
};
onStream();
return {
accept: true,
// These are in fact the defaults
audioStreamKey: {
programNumber: 1,
streamId: 1,
sourceName: app,
renditionName: publishingName,
},
videoStreamKey: {
programNumber: 1,
streamId: 2,
sourceName: app,
renditionName: publishingName,
},
};
},
});
}
Run the following command to generate example input at url rtmp://127.0.0.1:1935/norsk/high:
ffmpeg -v error -re -stream_loop -1 -i data/InkDrop.ts -vcodec copy -codec copy -f flv 'rtmp://127.0.0.1:1935/norsk/high'
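The `onStream` callback above rejects any publishing name that is not in `allowedRenditions`, whose definition is not shown in this excerpt. A minimal standalone sketch of that kind of gate follows. `decideStream`, `StreamDecision` and the rendition names are hypothetical, and the result shape is only modelled on the `accept` / `reason` values returned in the example.

```typescript
// Result shape modelled on the values returned from onStream in the example;
// declared locally so the sketch stands alone (not an SDK type).
type StreamDecision = { accept: true } | { accept: false; reason: string };

// Assumed allow-list: the example's real allowedRenditions is not shown.
const allowedRenditionNames: ReadonlySet<string> = new Set(["high", "medium", "low"]);

// Accept a stream only when its publishing name is on the allow-list.
function decideStream(
  publishingName: string,
  allowed: ReadonlySet<string>
): StreamDecision {
  if (!allowed.has(publishingName)) {
    return { accept: false, reason: `unknown rendition name: ${publishingName}` };
  }
  return { accept: true };
}

console.log(decideStream("high", allowedRenditionNames));
console.log(decideStream("toString", allowedRenditionNames));
```

A `Set` is used here because the `in` operator on a plain object also matches inherited property names such as `toString`. Whether that matters for the original example depends on how `allowedRenditions` is declared.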
Example [18_rtmp_mosaic.ts]
Create an RTMP server and dynamically mosaic any inbound streams into a tiled mosaic, publishing the result as Transport Stream-based HLS
export async function main() {
const norsk = await Norsk.connect();
const audioSignalInput = await norsk.input.audioSignal(audioInputSettings());
const mosaic = new Mosaic(norsk, audioSignalInput);
await mosaic.run();
}
class Mosaic {
norsk: Norsk;
audioSignalInput: AudioSignalGeneratorNode;
rtmpInput: RtmpServerInputNode | undefined;
compose: VideoComposeNode<string> | undefined = undefined;
streams: string[] = [];
composeCreatePending: boolean = false;
constructor(norsk: Norsk, audioSignalInput: AudioSignalGeneratorNode) {
this.norsk = norsk;
this.audioSignalInput = audioSignalInput;
}
async run() {
this.rtmpInput = await this.norsk.input.rtmpServer({
id: "rtmp",
port: 1935,
onConnection: this.onConnection.bind(this),
onStream: this.onStream.bind(this),
onConnectionStatusChange: this.onConnectionStatusChange.bind(this),
});
}
onConnection(_cid: string, app: string, _url: string) {
if (app === "mosaic") {
return { accept: true };
} else {
return { accept: false, reason: "App name must be mosaic" };
}
}
onStream(
_cid: string,
_app: string,
_url: string,
_streamId: number,
publishingName: string
): OnStreamResult {
this.streams.push(publishingName);
this.handleStreamChange();
return {
accept: true,
videoStreamKey: {
renditionName: "default",
sourceName: publishingName,
},
audioStreamKey: {
renditionName: "default",
sourceName: publishingName,
},
};
}
onConnectionStatusChange(
_cid: string,
status: string,
streamKeys: RtmpServerStreamKeys
) {
if (status !== "disconnected") {
// "I only know about one state";
return;
}
for (const key of streamKeys) {
const stream = key.videoStreamKey?.sourceName?.sourceName;
this.streams = this.streams.filter((x) => x !== stream);
console.log(`Stream disconnected: ${stream}`);
this.handleStreamChange();
}
}
handleStreamChange() {
if (
this.compose === undefined &&
this.streams.length > 0 &&
!this.composeCreatePending
) {
this.composeCreatePending = true;
this.norsk.processor.transform
.videoCompose({
id: "compose",
referenceStream: this.streams[0],
outputResolution: { width: 1280, height: 720 },
parts: createParts(this.streams),
})
.then(async (x) => {
this.compose = x;
this.compose?.subscribeToPins([
{
source: this.rtmpInput!,
sourceSelector: (streamMetadata: StreamMetadata[]) => {
const pins: PinToKey<string> = {};
for (const stream of this.streams) {
pins[stream] = videoStreamKeys(streamMetadata).filter(
(x) => x?.sourceName == stream
);
}
return pins;
},
},
]);
const encode = await this.norsk.processor.transform.videoEncode({
id: "ladder1",
rungs: [mkRung("high", 854, 480, 800)],
});
encode.subscribe([
{ source: this.compose, sourceSelector: videoStreamKeys },
]);
const override =
await this.norsk.processor.transform.streamKeyOverride({
id: "normalise",
streamKey: {
programNumber: 1,
sourceName: "output",
renditionName: "high",
streamId: 256,
},
});
override.subscribe([
{ source: encode, sourceSelector: videoStreamKeys },
]);
const output = await this.norsk.output.hlsTsVideo({
id: "video",
segmentDurationSeconds: 4.0,
destinations: [{ type: "local", retentionPeriodSeconds: 60, id: "local" }],
});
output.subscribe([
{ source: override, sourceSelector: videoStreamKeys },
]);
const rtcOutput = await this.norsk.output.whep({ id: "webrtc" });
rtcOutput.subscribe([
{ source: override, sourceSelector: videoStreamKeys },
{ source: this.audioSignalInput, sourceSelector: audioStreamKeys },
]);
const url = await output.url();
console.log("Media playlist", `${url}`);
console.log("WebRTC Player URL: " + rtcOutput.playerUrl);
});
} else if (this.compose != undefined && this.streams.length > 0) {
this.compose?.updateConfig({ parts: createParts(this.streams) });
} else if (this.streams.length > 0) {
// Compose creation is still pending: check again once, after a short delay
setTimeout(this.handleStreamChange.bind(this), 500);
}
}
}
function createParts(streams: string[]) {
const division = Math.ceil(Math.sqrt(streams.length));
return streams.map((stream, i) => ({
compose: VideoComposeDefaults.percentage({
sourceRect: { x: 0, y: 0, width: 100, height: 100 },
destRect: {
width: 100 / division,
height: 100 / division,
x: (100 / division) * (i % division),
y: (100 / division) * Math.floor(i / division),
},
}),
opacity: 1.0,
pin: stream,
zIndex: 1,
}));
}
function audioInputSettings(): AudioSignalGeneratorSettings {
return {
sourceName: "wave1",
channelLayout: "stereo",
sampleRate: 48000,
sampleFormat: "s16p",
wave: mkSine(220),
};
}
function mkRung(
name: string,
width: number,
height: number,
bitrate: number
): VideoEncodeRung {
return {
name,
width,
height,
frameRate: { frames: 25, seconds: 1 },
codec: {
type: "x264",
bitrateMode: { value: bitrate, mode: "abr" },
keyFrameIntervalMax: 50,
keyFrameIntervalMin: 50,
sceneCut: 0,
bframes: 0,
tune: "zerolatency",
},
};
}
Run the following commands together to generate example inputs at urls rtmp://127.0.0.1:1935/mosaic/source1, rtmp://127.0.0.1:1935/mosaic/source2:
ffmpeg -v error -re -stream_loop -1 -i data/InkDrop.ts -vcodec copy -an -f flv 'rtmp://127.0.0.1:1935/mosaic/source1'
ffmpeg -v error -re -stream_loop -1 -i data/Weaving.ts -vcodec copy -an -f flv 'rtmp://127.0.0.1:1935/mosaic/source2'
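The tiling arithmetic in `createParts` above can be tried in isolation. The sketch below reproduces only the percentage rectangles, without any Norsk types. `gridRects` and `Rect` are hypothetical names introduced for illustration.

```typescript
// Percentage-based rectangle, matching the destRect fields used in createParts.
interface Rect {
  x: number;
  y: number;
  width: number;
  height: number;
}

// Lay out `count` tiles on the smallest square grid that can hold them,
// filling rows left to right, top to bottom.
function gridRects(count: number): Rect[] {
  const division = Math.ceil(Math.sqrt(count)); // tiles per row and per column
  const size = 100 / division; // tile width and height, in percent
  return Array.from({ length: count }, (_, i) => ({
    x: size * (i % division),
    y: size * Math.floor(i / division),
    width: size,
    height: size,
  }));
}

console.log(gridRects(2));
console.log(gridRects(5));
```

With two streams the grid is 2x2, so each tile covers 50% of the width and height and the bottom row stays empty. A fifth stream moves the layout to a 3x3 grid.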
Find Examples
Search for examples using rtmpServer in our examples repo.