NorskInput.rtmpServer() method

Create an RTMP Server to receive RTMP streams into your application

Signature:

rtmpServer(settings: RtmpServerInputSettings): Promise<RtmpServerInputNode>;

Parameters

Parameter Type Description

settings

RtmpServerInputSettings

Configuration for the RTMP server

Returns:

Promise<RtmpServerInputNode>

Example [01_rtmp_to_webrtc.ts]

Subscribe to an RTMP source and generate local WebRTC output from it

// Stand up the RTMP ingest node and the WHEP (WebRTC) output node.
const rtmpSource = await norsk.input.rtmpServer({ id: "rtmpInput" });
const whepOutput = await norsk.output.whep({ id: "webrtc", ...webRtcServerConfig });

// Feed the streams picked by selectAV from the RTMP source into the WebRTC output.
const avSubscription = { source: rtmpSource, sourceSelector: selectAV };
whepOutput.subscribe([avSubscription]);

console.log(`WebRTC Player URL: ${whepOutput.playerUrl}`);

Run the following command to generate example input at url rtmp://127.0.0.1:1935/norsk/high:

ffmpeg -v error -re -stream_loop -1 -i data/InkDrop.ts  -vcodec copy -codec copy -f flv 'rtmp://127.0.0.1:1935/norsk/high'

Example [03_rtmp_to_hls_passthrough.ts]

Package an inbound RTMP stream into LL-HLS

/**
 * Entry point: accepts an inbound RTMP stream and republishes it, without
 * transcoding, as CMAF audio and video media playlists plus a multi variant
 * playlist.
 */
export async function main() {
  const norsk = await Norsk.connect();
  const rtmpSource = await norsk.input.rtmpServer({ id: "rtmpInput" });

  // The inbound stream is segmented into CMAF chunks for publication as HLS and DASH.
  // Because this is a passthrough, the bitrate of the stream is not necessarily known
  // for the HLS multi variant (master) playlist, so the bitrates are set by hand on the
  // CMAF audio and CMAF video segmenters. Other examples show how to measure bitrates
  // and use them in the multi variant playlist. When a transcode is involved (see the
  // various _to_ladder examples) each stream has a well known bitrate that flows
  // through the workflow automatically.
  // Nothing from here on down is specific to the input being RTMP: with Norsk you only
  // describe the desired media flow, and it takes care of the differences between the
  // various input types.
  const audioSegmenter = await norsk.output.cmafAudio({ id: "audio", bitrate: 20_000, ...segmentSettings });
  const videoSegmenter = await norsk.output.cmafVideo({ id: "video", bitrate: 1_500_000, ...segmentSettings });
  const multiVariant = await norsk.output.cmafMultiVariant({
    id: "multi-variant",
    playlistName: "multi-variant",
    destinations,
  });

  // The multi variant playlist is built from the playlists of both segmenters.
  const playlistSources = [
    { source: audioSegmenter, sourceSelector: selectPlaylist },
    { source: videoSegmenter, sourceSelector: selectPlaylist },
  ];
  multiVariant.subscribe(playlistSources);

  // Each segmenter takes its own media type from the RTMP input.
  audioSegmenter.subscribe([{ source: rtmpSource, sourceSelector: selectAudio }]);
  videoSegmenter.subscribe([{ source: rtmpSource, sourceSelector: selectVideo }]);

  console.log(`Multi variant playlist: ${multiVariant.url}`);
  audioSegmenter.url().then(logMediaPlaylist("audio"));
  videoSegmenter.url().then(logMediaPlaylist("video"));
}

// Single local destination, used by every CMAF output in this example.
const destinations: CmafDestinationSettings[] = [
  { id: "local", type: "local", retentionPeriodSeconds: 10 },
];

// Segmentation settings spread into both the audio and the video segmenter.
const segmentSettings: CmafOutputSettings = {
  partDurationSeconds: 1,
  segmentDurationSeconds: 4,
  destinations: destinations,
};

/**
 * Builds a callback that logs a playlist URL.
 *
 * @param name Label placed in front of the logged URL.
 * @returns A function that logs `<name> playlistUrl: <url>` for the URL it receives.
 */
function logMediaPlaylist(name: string): (url: string) => void {
  return function report(url: string): void {
    console.log(`${name} playlistUrl: ${url}`);
  };
}

Run the following command to generate example input at url rtmp://127.0.0.1:1935/norsk/default:

ffmpeg -v error -re -stream_loop -1 -i data/InkDrop.ts  -vcodec copy -codec copy -f flv 'rtmp://127.0.0.1:1935/norsk/default'

Example [08_pid_normalization.ts]

Receive an RTMP stream and package it in a transport stream with explicit PID mappings

// Source, one stream key override per media type, and the two outputs.
const rtmpSource = await norsk.input.rtmpServer({ id: "rtmpInput" });
const videoPidNormalizer = await norsk.processor.transform.streamKeyOverride(videoStreamKeyConfig);
const audioPidNormalizer = await norsk.processor.transform.streamKeyOverride(audioStreamKeyConfig);
const webRtcOutput = await norsk.output.whep({ id: "webrtc", ...webRtcServerConfig });
const tsFileOutput = await norsk.output.fileTs(tsFileOutputSettings);

// Each override node takes its own media type from the RTMP source.
videoPidNormalizer.subscribe([{ source: rtmpSource, sourceSelector: selectVideo }]);
audioPidNormalizer.subscribe([{ source: rtmpSource, sourceSelector: selectAudio }]);

// Both outputs consume the same pair of overridden streams.
const normalizedVideo = { source: videoPidNormalizer, sourceSelector: selectVideo };
const normalizedAudio = { source: audioPidNormalizer, sourceSelector: selectAudio };
const normalizedSources = [normalizedVideo, normalizedAudio];
webRtcOutput.subscribe(normalizedSources);
tsFileOutput.subscribe(normalizedSources);

console.log(`WebRTC Player URL: ${webRtcOutput.playerUrl}`);

Run the following command to generate example input at url rtmp://127.0.0.1:1935/norsk/high:

ffmpeg -v error -re -stream_loop -1 -i data/InkDrop.ts  -vcodec copy -codec copy -f flv 'rtmp://127.0.0.1:1935/norsk/high'

Example [17_rtmp_server.ts]

A more fully featured RTMP server showing use of callbacks for security etc

/**
 * Entry point: starts an RTMP server that uses the connection and stream
 * callbacks to decide what to accept.
 *
 * Every connection is accepted. A stream is accepted only when its publishing
 * name is a key of `allowedRenditions`; for each accepted stream a WebRTC
 * output and CMAF audio/video outputs are created, and the per-app multi
 * variant playlist is re-subscribed to all renditions seen so far.
 */
export async function main() {
  const norsk = await Norsk.connect();

  const input = await norsk.input.rtmpServer({
    id: "rtmp",

    onConnection: (_cid: string, app: string, url: string) => {
      console.log("Got RTMP connection", app, url);
      return { accept: true }; // accept all!!!
    },

    onStream: (
      _cid: string,
      app: string,
      url: string,
      streamId: number,
      publishingName: string
    ) => {
      // The publishing name is chosen by the remote client, so test for an OWN
      // property: the `in` operator also matches inherited names such as
      // "toString" or "constructor", which would slip past the allow-list.
      if (!Object.prototype.hasOwnProperty.call(allowedRenditions, publishingName)) {
        return {
          accept: false,
          reason: "only known rendition names are accepted around here",
        };
      }

      console.log("Got RTMP stream", app, url, streamId, publishingName);
      const setUpOutputs = async () => {
        const destinations: CmafDestinationSettings[] = [{ type: "local", retentionPeriodSeconds: 10, id: "local" }];
        // Register this app if we've not seen it before, and start up a multi variant playlist for it.
        // Own-property check for the same reason as above: the app name is client supplied.
        if (!Object.prototype.hasOwnProperty.call(knownApps, app)) {
          const settings: CmafMultiVariantOutputSettings = {
            id: "hls-multi-variant-" + app,
            playlistName: app,
            destinations,
          };
          const multiVariantPlaylist = await norsk.output.cmafMultiVariant(settings);
          knownApps[app] = { multiVariant: multiVariantPlaylist, sources: [], webrtc: [], videoInput: [], audioInput: [] };
          console.log(`HLS Multi Variant Playlist: ${multiVariantPlaylist.url}`);
        }
        // Create a single WebRTC output for this new stream
        const webRtcOutput = await norsk.output.whep({
          id: "webrtc-" + app + "-" + publishingName,
          ...webRtcServerConfig
        });
        webRtcOutput.subscribe([subscribeAV(input, app, publishingName)]);
        knownApps[app].webrtc.push(webRtcOutput);
        console.log(`WebRTC Player URL: ${webRtcOutput.playerUrl}`);

        // Create a single audio HLS output for this new stream
        const audioOutput = await norsk.output.cmafAudio({
          id: "hls-" + app + "-" + publishingName + "-audio",
          partDurationSeconds,
          segmentDurationSeconds,
          destinations
        });
        audioOutput.subscribe([subscribeAudio(input, app, publishingName)]);

        // Create a single video HLS output for this new stream
        const videoOutput = await norsk.output.cmafVideo({
          id: "hls-" + app + "-" + publishingName + "-video",
          partDurationSeconds,
          segmentDurationSeconds,
          destinations,
        });
        videoOutput.subscribe([subscribeVideo(input, app, publishingName)]);

        knownApps[app].videoInput.push(videoOutput);
        knownApps[app].audioInput.push(audioOutput);
        // Add this to the list of renditions we know about
        knownApps[app].sources.push(publishingName);

        // And re-subscribe the multi variant playlist to all of the known about renditions
        knownApps[app].multiVariant.subscribe(
          knownApps[app].videoInput.concat(knownApps[app].audioInput).map((r) => subscribePlaylists(r))
        );
      };
      // The callback has to answer synchronously, so the outputs are built in the
      // background; failures are logged rather than left as unhandled rejections.
      setUpOutputs().catch((err: unknown) => {
        console.error(`Failed to set up outputs for ${app}/${publishingName}`, err);
      });

      return {
        accept: true,
        // These are in fact the defaults
        audioStreamKey: {
          programNumber: 1,
          streamId: 1,
          sourceName: app,
          renditionName: publishingName,
        },
        videoStreamKey: {
          programNumber: 1,
          streamId: 2,
          sourceName: app,
          renditionName: publishingName,
        },
      };
    },
  });
}

Run the following command to generate example input at url rtmp://127.0.0.1:1935/norsk/high:

ffmpeg -v error -re -stream_loop -1 -i data/InkDrop.ts  -vcodec copy -codec copy -f flv 'rtmp://127.0.0.1:1935/norsk/high'

Example [18_rtmp_mosaic.ts]

Create an RTMP server and dynamically mosaic any inbound streams into a tiled mosaic, publishing the result as Transport Stream-based HLS

/**
 * Entry point: connects to Norsk, creates the audio signal input and hands
 * both to a Mosaic, then runs it.
 */
export async function main() {
  const norsk = await Norsk.connect();
  const signalSettings = audioInputSettings();
  const audioSignalInput = await norsk.input.audioSignal(signalSettings);

  await new Mosaic(norsk, audioSignalInput).run();
}

/**
 * Runs an RTMP server and tiles every stream published to it into a single
 * composed video, which is encoded and published as HLS (Transport Stream)
 * and WebRTC. The WebRTC output also carries the supplied audio signal.
 */
class Mosaic {
  norsk: Norsk;
  audioSignalInput: AudioSignalGeneratorNode;
  /** The RTMP server node; set by run(). */
  rtmpInput: RtmpServerInputNode | undefined;
  /** The compose node; undefined until the pipeline has been created. */
  compose: VideoComposeNode<string> | undefined = undefined;
  /** Publishing names of the streams currently shown in the mosaic. */
  streams: string[] = [];
  /** True while (and after) the compose pipeline is being created, so it is only created once. */
  composeCreatePending: boolean = false;

  constructor(norsk: Norsk, audioSignalInput: AudioSignalGeneratorNode) {
    this.norsk = norsk;
    this.audioSignalInput = audioSignalInput;
  }

  /** Starts the RTMP server on port 1935 with this instance's callbacks attached. */
  async run() {
    this.rtmpInput = await this.norsk.input.rtmpServer({
      id: "rtmp",
      port: 1935,
      onConnection: this.onConnection.bind(this),
      onStream: this.onStream.bind(this),
      onConnectionStatusChange: this.onConnectionStatusChange.bind(this),
    });
  }

  /** Accepts a connection only when its app name is "mosaic". */
  onConnection(_cid: string, app: string, _url: string) {
    if (app === "mosaic") {
      return { accept: true };
    } else {
      return { accept: false, reason: "App name must be mosaic" };
    }
  }

  /**
   * Accepts every stream, records its publishing name and refreshes the
   * mosaic. The publishing name is used as the source name of the stream keys
   * so that the stream can be matched to its pin later on.
   */
  onStream(
    _cid: string,
    _app: string,
    _url: string,
    _streamId: number,
    publishingName: string
  ): OnStreamResult {
    this.streams.push(publishingName);
    this.handleStreamChange();

    return {
      accept: true,
      videoStreamKey: {
        renditionName: "default",
        sourceName: publishingName,
      },
      audioStreamKey: {
        renditionName: "default",
        sourceName: publishingName,
      },
    };
  }

  /** On a "disconnected" status, removes the affected streams from the mosaic. */
  onConnectionStatusChange(
    _cid: string,
    status: string,
    streamKeys: RtmpServerStreamKeys
  ) {
    if (status !== "disconnected") {
      // "I only know about one state";
      return;
    }
    for (const key of streamKeys) {
      const stream = key.videoStreamKey?.sourceName?.sourceName;
      this.streams = this.streams.filter((x) => x !== stream);
      console.log(`Stream disconnected: ${stream}`);
      this.handleStreamChange();
    }
  }

  /**
   * Brings the pipeline in line with the current list of streams:
   * - first stream seen: create the compose -> encode -> override -> outputs pipeline;
   * - pipeline exists: update the compose layout;
   * - pipeline creation still in flight: check again in 500ms.
   * Nothing is done while the stream list is empty.
   */
  handleStreamChange() {
    if (
      this.compose === undefined &&
      this.streams.length > 0 &&
      !this.composeCreatePending
    ) {
      this.composeCreatePending = true;
      this.norsk.processor.transform
        .videoCompose({
          id: "compose",
          referenceStream: this.streams[0],
          referenceResolution: { width: 100, height: 100 }, // make it % based
          outputResolution: { width: 1280, height: 720 },
          parts: createParts(this.streams),
        })
        .then(async (composeNode) => {
          this.compose = composeNode;
          composeNode.subscribeToPins([
            {
              source: this.rtmpInput!,
              // Map each known stream name to the video stream keys carrying that source name.
              sourceSelector: (streamMetadata: StreamMetadata[]) => {
                const pins: PinToKey<string> = {};
                for (const stream of this.streams) {
                  pins[stream] = videoStreamKeys(streamMetadata).filter(
                    (x) => x?.sourceName === stream
                  );
                }
                return pins;
              },
            },
          ]);

          const encode = await this.norsk.processor.transform.videoEncode({
            id: "ladder1",
            rungs: [mkRung("high", 854, 480, 800)],
          });
          encode.subscribe([
            { source: composeNode, sourceSelector: videoStreamKeys },
          ]);

          const override =
            await this.norsk.processor.transform.streamKeyOverride({
              id: "normalise",
              streamKey: {
                programNumber: 1,
                sourceName: "output",
                renditionName: "high",
                streamId: 256,
              },
            });

          override.subscribe([
            { source: encode, sourceSelector: videoStreamKeys },
          ]);

          const output = await this.norsk.output.hlsTsVideo({
            id: "video",
            segmentDurationSeconds: 4.0,
            destinations: [{ type: "local", retentionPeriodSeconds: 60, id: "local" }],
          });
          output.subscribe([
            { source: override, sourceSelector: videoStreamKeys },
          ]);

          const rtcOutput = await this.norsk.output.whep({ id: "webrtc" });
          rtcOutput.subscribe([
            { source: override, sourceSelector: videoStreamKeys },
            { source: this.audioSignalInput, sourceSelector: audioStreamKeys },
          ]);

          const url = await output.url();
          console.log("Media playlist", `${url}`);
          console.log("WebRTC Player URL: " + rtcOutput.playerUrl);
        })
        .catch((err: unknown) => {
          // Clear the flag so a later stream change can retry instead of the
          // mosaic being stuck in "pending" forever, and report the failure
          // rather than leaving an unhandled rejection.
          this.composeCreatePending = false;
          console.error("Failed to create the mosaic pipeline", err);
        });
    } else if (this.compose != undefined && this.streams.length > 0) {
      this.compose?.updateConfig({ parts: createParts(this.streams) });
    } else if (this.streams.length > 0) {
      // Creation is still in flight: look again shortly. This must be a one-shot
      // timer - a repeating interval is never cleared, and every tick that still
      // finds creation pending would register yet another interval.
      setTimeout(this.handleStreamChange.bind(this), 500);
    }
  }
}

/**
 * Lays the given streams out on a square grid, expressed in percentages of
 * the 100x100 reference resolution.
 *
 * The grid has ceil(sqrt(n)) cells per side and is filled left to right, top
 * to bottom. Each part uses the stream name as its pin and shows the whole
 * source picture.
 *
 * @param streams Stream names, in display order.
 * @returns One compose part per stream; an empty array for no streams.
 */
function createParts(streams: string[]) {
  const cellsPerSide = Math.ceil(Math.sqrt(streams.length));
  const cellSize = 100 / cellsPerSide;

  return streams.map((pin, index) => {
    const column = index % cellsPerSide;
    const row = Math.floor(index / cellsPerSide);
    return {
      destRect: {
        width: cellSize,
        height: cellSize,
        x: cellSize * column,
        y: cellSize * row,
      },
      opacity: 1.0,
      pin,
      sourceRect: { x: 0, y: 0, width: 100, height: 100 },
      zIndex: 1,
    };
  });
}

/**
 * Settings for the generated audio signal: a stereo, 48000 sample rate,
 * "s16p" source named "wave1" whose wave comes from mkSine(220).
 */
function audioInputSettings(): AudioSignalGeneratorSettings {
  const settings: AudioSignalGeneratorSettings = {
    sourceName: "wave1",
    channelLayout: "stereo",
    sampleRate: 48000,
    sampleFormat: "s16p",
    wave: mkSine(220), // presumably a 220Hz sine - confirm against mkSine
  };
  return settings;
}

/**
 * Builds a single x264 encode rung at 25 frames per second.
 *
 * @param name Name of the rung.
 * @param width Output width.
 * @param height Output height.
 * @param bitrate Value used for the "abr" bitrate mode.
 * @returns The rung description, with key frame interval min and max both 50,
 *          scene cut 0, no b-frames and the "zerolatency" tune.
 */
function mkRung(
  name: string,
  width: number,
  height: number,
  bitrate: number
): VideoEncodeRung {
  const codec: VideoEncodeRung["codec"] = {
    type: "x264",
    bitrateMode: { value: bitrate, mode: "abr" },
    keyFrameIntervalMax: 50,
    keyFrameIntervalMin: 50,
    sceneCut: 0,
    bframes: 0,
    tune: "zerolatency",
  };
  const rung: VideoEncodeRung = {
    name,
    width,
    height,
    frameRate: { frames: 25, seconds: 1 },
    codec,
  };
  return rung;
}

Run the following commands together to generate example inputs at urls rtmp://127.0.0.1:1935/mosaic/source1, rtmp://127.0.0.1:1935/mosaic/source2:

ffmpeg -v error -re -stream_loop -1 -i data/InkDrop.ts  -vcodec copy -an -f flv 'rtmp://127.0.0.1:1935/mosaic/source1'
ffmpeg -v error -re -stream_loop -1 -i data/Weaving.ts  -vcodec copy -an -f flv 'rtmp://127.0.0.1:1935/mosaic/source2'

Find Examples