개발일지

[WebRTC] mediasoup으로 SFU 구현하기 본문

NestJS, Node.js/#03 WebRTC

[WebRTC] mediasoup으로 SFU 구현하기

lyjin 2024. 4. 11.

개요

구현하고자 하는 화상 통신 프로젝트의 특징은 1:N 그리고 단방향 송출 이라는 것이다. 따라서 가장 적합하다고 생각하는 SFU 아키텍처를 선택했고 이제 이걸 어떻게 구현할 지 고민해봐야했다. SFU를 지원하는 라이브러리로 OpenVidu, Kurento, Mediasoup가 있었는데 그 중 Mediasoup를 선택했다. 이유는 가장 가벼우면서 공부하기에도 적합하다고 생각했기때문이다. OpenVidu와 Kurento는 구현이 쉽고 다양한 기능을 탑재하고 있긴 하지만 어차피 나는 기본 기능만 구현 하면 됐기 때문에 불필요하다고 생각했다.


이번 시간에는 mediasoup 개념과 통신과정을 알아보고 이를 실제로 구현해보고자 한다.

 


mediasoup 용어 정리

Producer, Consumer

  • mediasoup에서는 Offer, Answer 대신 Producer, Consumer라는 용어를 사용한다. Producer는 미디어를 생산하고 보내는 역할을 하고 Consumer는 이러한 미디어를 받는 역할을 한다.

 

Worker

  • 실제 작업을 처리하는 프로세스
  • 여러 Worker가 클러스터링되어 대규모 미디어 처리를 지원할 수 있다.
  • Worker는 CPU 코어를 활용하여 병렬 처리를 수행한다.

 

Router

  • 미디어 스트림을 라우팅하는 역할을 한다.
  • Worker에 연결되어 미디어 스트림을 받아들이고 처리한다.
  • Router로부터 생성된 Transport를 통해 미디어 스트림을 주입하고 선택 및 전송한다.

 

Transport

  • 미디어 스트림을 전송하는 역할을 한다.
  • 클라이언트의 통신을 담당하는 소켓 및 연결을 관리한다.
  • Producer, Consumer는 Transport를 통해 미디어 스트림을 전송하고 수신한다.

 

mediasoup 통신 과정

출처) https://www.youtube.com/watch?v=DOe7GkQgwPo

 

Device 로드

  1. 클라이언트는 서버 측 router의 RTP capabilities 달라고 요청한다.
  2. 얻어온 RTP capabilities로 device를 로드한다. - device.load()

 

 

Transports 생성

미디어 송신(send) 또는 수신(receive)을 위해서는 별도의 WebRTC transports가 필요하다. produce 또는 consume 하기 전 transport를 생성한다.

 

미디어 송신

  1. 서버 측 router로부터 transport를 생성한다. - router.createWebRtcTransport()
  2. 클라이언트는 서버의 transport를 복제하여 send transport를 생성한다. - device.createSendTransport()
  3. 이때 transport는 ‘connect’, ‘produce’ 이벤트를 subscribe 하고 있어야한다.

 

미디어 수신

  1. 서버 측 router로부터 transport를 생성한다. - router.createWebRtcTransport()
  2. 클라이언트는 서버의 transport를 복제하여 receive transport를 생성한다. - device.createRecvTransport()
  3. 이때 transport는 ‘connect’, 이벤트를 subscribe 하고 있어야한다.

 

 

미디어 Produce

send transport가 생성되면 클라이언트 애플리케이션은 오디오 및 비디오 트랙을 produce 할 수 있다.

  1. 클라이언트는 필요한 트랙을 얻어온다. - getUserMedia()
  2. send transport로 produce()를 호출한다. - tansport.produce()
    1. 앞서 subscribe 한 ‘connect’ 이벤트 발생 → 서버 측 ‘connect’ 를 emit 한다. (클-서 연결)
    2. 앞서 subscribe 한 ‘produce’ 이벤트 발생 → 서버 측 ‘produce’ 를 emit 한다. → 서버는 producer를 생성하고 인스턴스 반환
    3. produce() 호출 및 클라이언트 producer 생성 완료

 

 

미디어 Consume

receive transport가 생성되면 클라이언트 애플리케이션은 transport가 가지고 있는 트랙들을 사용할 수 있다. 단, Produce 과정과는 다르게 서버에서부터 시작된다.

  1. 클라이언트는 device.rtpCapabilities를 서버로 보낸다. (원격 스트림 사용하기 위해)
  2. 클라이언트는 원격 장치가 송신할 producer를 consume 할 수 있는지 확인하고 consumer를 생성한다. - router.canConsume()
  3. 서버는 생성한 consumer 를 포함해, 클라이언트 측 ‘consume’ 이벤트를 emit 한다.
  4. 클라이언트는 receive transport로 consume()을 호출한다. - tansport.consume()
    1. 앞서 subscribe 한 ‘connect’ 이벤트 발생 → 서버 측 ‘connect’ 를 emit 한다. (클-서 연결)
    2. consume() 호출 및 클라이언트 consumer 생성 완료

 


구현 - 시나리오

응시자 → 영상 수신만 가능, produce만 하면 됨

  1. device 로드
  2. send transport 생성
  3. 미디어 produce
    1. 해당 응시자 producer 생성
    2. 주최자에게 “새로운 응시자가 들어왔음”을 알림 (생성된 producer를 포함하여 broadcast)
    3. → 주최자는 해당 응시자 consume

주최자 → 영상 송신만 가능, consume만 하면 됨

  1. “새로운 응시자가 들어왔음” 이벤트가 호출되었을 때
    1. device 로드
    2. receive transport 생성
    3. 미디어 consume
  2. 접속했는데 이미 들어와있는 응시자가 존재할 경우
    1. 응시자들.forEach(응시자 ⇒ consume) → 1번 과정과 동일

 


구현 - 실제 코드

“mediasoup 통신 과정”에 따라 코드를 작성해보고자 한다. 실제 회사 코드와는 차이가 있으며 소켓 연결에 필요한 핵심 메서드만 가져왔다. 여기서 ‘return-이벤트’ 형식을 갖는 이벤트들은 호출된 이벤트에 대한 응답을 나타내는 이벤트이다.

 

 

서버 Worker, Router 생성

먼저 서버에서 웹소켓 통신을 하기 위해서는 WorkerRouter를 생성해줘야한다.

// server

let worker = await mediasoup.createWorker({
  rtcMinPort: 40000,
  rtcMaxPort: 60000,
});

worker.on('died', (err) => console.error(`mediasoup worker 생성 실패: ${err}`));
  • rtcMinPort, rtcMaxPort: 사용할 RTC Port 범위, 기본 값은 10000 ~ 59999 이다.

 

// server

let router;

worker
  .createRouter({
    mediaCodecs: [
      {
        kind: 'video',
        mimeType: 'video/VP8',
        clockRate: 90000,
        parameters: {},
      },
    ],
  })
  .then((router) => (router = router))
  .catch((err) => console.error(`mediasoup router 생성 실패: ${err}`));
  • mediaCodecs: 사용할 미디어 코덱을 정의한다. 그밖의 옵션과 mediasoup에서 지원하는 코덱 종류는 여기서 확인할 수 있다.

 


방 입장하기

응시자인지 주최자인지에 따라 시나리오가 달라진다. 따라서 클라이언트는 주최자 여부를 판별하는 isHost 변수를 포함하여 ‘join’ 이벤트를 호출하도록 작성 했다. 서버는 isHost 변수와 함께 현재 참여 중인 응시자 (existTesters)를 같이 반환 한다. 만약 소켓 주체가 주최자이며 이미 참여 중인 응시자가 존재할 경우(isHost == true && existTesters != null), 클라이언트는 “응시자들.forEach(응시자 ⇒ consume)” 로직을 수행한다.

// client

await socket.request('join', { isHost });

 

// server

socket.emit('return-join', ({ isHost }) => {
 // logic...
 
 return {
  isHost,
  existTesters: existTesters ?? [], 
 }
});

 


응시자일 경우 (Produce)

Device 로드

1. 클라이언트는 서버 측 router의 RTP capabilities 달라고 요청한다.

2. 얻어온 RTP capabilities로 device를 로드한다. - device.load()

// client

socket.on('return-join', async (data) => {
  // logic...

  await socket.request('get-router-rtp-capabilities'); // 1
});

socket.on('return-get-router-rtp-capabilities', async ({ rtpCapabilities }) => {
  device = new mediasoupClient.Device();
  await device.load({ routerRtpCapabilities }); // 2
}

 

// server

socket.on('return-get-router-rtp-capabilities', () => {
  return router.rtpCapabilities;
});

 


Transports 생성 - 미디어 송신

1. 서버 측 router로부터 transport를 생성한다. - router.createWebRtcTransport()

// client
await socket.request('create-webrtc-transport');

// server
socket.on('create-webrtc-transport', async () => {
  const transport = await createTransport();

  const { id, iceParameters, iceCandidates, dtlsParameters, sctpParameters } =
      transport;

  socket.emit('return-create-webrtc-transport', {
    id,
    iceParameters,
    iceCandidates,
    dtlsParameters,
    sctpParameters,
  });
});

async function createTransport(): Promise<WebRtcTransport> {
  return await router.createWebRtcTransport({
    listenIps: [
      {
        ip: '127.0.0.1',
        announcedIp: null,
      },
    ],
    enableUdp: true,
    enableTcp: true,
    preferUdp: true,
  });
}
  • listenIps: WebRTC transport가 수신할 IP 주소
    • ip: 실제 서버에서 수신될 IP 주소
    • announcedIp: 외부에서 접근할 수 있는 IP 주소, null일 경우 ip와 동일한 주소를 갖는다.

해당 코드는 로컬 환경 기준으로, 실제 상용화 서버에서는 그에 맞는 IP 주소로 변경해줘야한다.

 


 

2. 클라이언트는 서버의 transport를 복제하여 send transport를 생성한다. - device.createSendTransport()

3. 이때 transport는 ‘connect’, ‘produce’ 이벤트를 subscribe 하고 있어야한다.

// client

let producerTransport;

socket.on('return-create-webrtc-transport', async (transportOptions) => {
  producerTransport = device.createSendTransport(transportOptions); // 2

  // 3
  producerTransport.on('connect', async ({ dtlsParameters }, cb, eb) => {
    // logic... 이후 단계에서 작성
  });
  
  producerTransport.on(
    'produce',
    async ({ kind, rtpParameters, appData }, cb, eb) => {
      // logic... 이후 단계에서 작성
    }
  );
});

 


미디어 Produce

1. 클라이언트는 필요한 트랙을 얻어온다. - getUserMedia()

// client

let myStream = await navigator.mediaDevices.getUserMedia();

 


2. send transport로 produce()를 호출한다. - tansport.produce()

 

produce()를 호출하면 구독 중인 transport의 ‘connect’ 및 ‘produce’ 이벤트가 발생될 것이다.

// client

async function connectSendTransport() {
  const track = myStream.getVideoTracks()[0];
  const params = { track };

  const producer = await producerTransport.produce(params); // 2
}

 


2-1. 앞서 subscribe 한 ‘connect’ 이벤트 발생 → 서버 측 ‘connect’ 를 emit 한다. (클-서 연결)

// client

socket.on('return-create-webrtc-transport', async (transportOptions) => {
  producerTransport = device.createSendTransport(transportOptions);

  producerTransport.on('connect', async ({ dtlsParameters }, cb, eb) => {
    socket
      .emit('transport-connect', {
        transportId: producerTransport.id,
        dtlsParameters,
      })
      .then(() => cb())
      .catch((err) => eb(err));
  });
  
  producerTransport.on(
    'produce',
    async ({ kind, rtpParameters, appData }, cb, eb) => {
      // logic...
    }
  );
});

 

여기서 ({ dtlsParameters }, cb, eb) 매개 변수는 ‘connect’에서 반환되는 값들이다. 이 중 cb(callback) 은 특정 값들이 서버 측에 성공적으로 전달 됐을 때 실행되어야할 함수이다.

 

// server

socket.on('transport-connect', async ({ transportId, dtlsParameters }) => {
  await transport.connect({ dtlsParameters });
});

 

서버 측에서는  webRtcTransport.connect() 를 호출해야한다.

 


2-2. 앞서 subscribe 한 ‘produce’ 이벤트 발생 → 서버 측 ‘produce’ 를 emit 한다. → 서버는 producer를 생성하고 인스턴스 반환

// client

socket.on('return-create-webrtc-transport', async (transportOptions) => {
  producerTransport = device.createSendTransport(transportOptions);

  producerTransport.on('connect', async ({ dtlsParameters }, cb, eb) => {
    // logic...
  });
  
  producerTransport.on(
    'produce',
    async ({ kind, rtpParameters, appData }, cb, eb) => {
      socket
        .emit('transport-produce', {
          transportId: producerTransport.id,
          kind,
          rtpParameters,
          appData,
        });
        // .then(({ id }) => cb({ id }))
        // .catch((err) => eb(err));
    }
  );
});

 

// server

socket.on(
  'transport-produce',
  async ({ kind, rtpParameters }) => {
    const producer = await transport.produce({
      kind,
      rtpParameters,
    });

    // “새로운 응시자가 들어왔음”을 알림
    broadcastHost(socket, 'new-producer', { id: producer.id });
  }
);

 

서버 측에서는 transport.produce() 를 호출해야한다. **transport.produce()**는 router에 오디오 또는 비디오 RTC를 수신하도록 지시하는 함수로 해당 함수를 통해 미디어를 주입한다.

 

새로운 응시자가 들어왔으므로 이를 주최자들에게 알린다.

 


2-3. produce() 호출 및 클라이언트 producer 생성 완료

 

‘connect’ 및 ‘produce’ 가 완료되면 transport.produce() 함수가 종료되고 생성된 Producer를 반환한다.

const producer = await producerTransport.produce(params); // 실행 완료!

 


주최자일 경우 (Consume)

주최자 시나리오가 발생되는 경우는 다음과 같이 두 가지이다.

  1. “새로운 응시자가 들어왔음” 이벤트가 호출되었을 때
  2. 접속했는데 이미 들어와있는 응시자가 존재할 경우

이 중 1번 시나리오를 예시로 들고자 한다. 앞서 응시자의 producer 생성까지 완료 되었을 때 새로운 응시자가 들어왔음을 알리는 이벤트, 'new-producer’를 브로드캐스트 했었다. 클라이언트에서는 이 ’new-producer’이벤트가 호출됐을 때 consume 과정이 시작된다.

2번 시나리오도 간단하다. 방에 입장 했을 때 이미 참여 중인 응시자가 존재할 경우 모든 응시자들에 대해 동일한 과정을 수행하면 된다.

 

 

Transports 생성 - 미디어 수신

device를 load 하고 transport를 생성하는 과정까지는 Produce 과정과 동일하다. 다만 다른 점은 device.createRecvTransport() 메서드를 사용한다는 점, transport.on('connect') 이벤트만 subscribe 하고 있다는 점이다.

// client

let remoteProducerId;

socket.on('new-producer', async ({ id }) => {
  remoteProducerId = id;

  await socket.request('create-webrtc-transport');
});

 

let consumerTransport;

socket.on('return-create-webrtc-transport', async (transportOptions) => {
  consumerTransport = device.createRecvTransport(transportOptions);

  consumerTransport.on('connect', async ({ dtlsParameters }, cb, eb) => {
    socket
      .emit('transport-connect', {
        transportId: consumerTransport.id,
        dtlsParameters,
      })
      .then(() => cb())
      .catch((err) => eb(err));
  });
});

 


미디어 Consume

1. 클라이언트는 device.rtpCapabilities를 서버로 보낸다. (원격 스트림 사용하기 위해)

//client

await socket.request('transport-consume', {
  rtpCapabilities: device.rtpCapabilities,
  producerId: remoteProducerId,      // consume할 producer id
  transportId: consumerTransport.id,
});

 


 

2. 클라이언트는 원격 장치가 송신할 producer를 consume 할 수 있는지 확인하고 consumer를 생성한다. - router.canConsume()

3. 서버는 생성한 consumer 를 포함해, 클라이언트 측 ‘consume’ 이벤트를 emit 한다.

// server

socket.on(
  'transport-consume',
  async ({ rtpCapabilities, producerId, transportId }) => {
    if (!router.canConsume({ producerId, rtpCapabilities })) {
      console.error('cant consume');
      return;
    }
    
    const consumer = await transport.consume({
      producerId,
      rtpCapabilities,
      paused: true,  // true일 경우 resume() 필요
    });
    
    const { id, kind, rtpParameters, type, producerPaused } = consumer;

    socket.emit('return-transport-consume', {
      consumer,
      params: { id, kind, rtpParameters, type, producerId, producerPaused },
    });
  }
);

 


4. 클라이언트는 receive transport로 consume()을 호출한다. - tansport.consume()

// client

socket.on('return-transport-consume', async (data) => {
  const { params } = data;

  const consumer = await consumerTransport.consume(params);

  // logic...
});

 

 

4-1. 앞서 subscribe 한 ‘connect’ 이벤트 발생 → 서버 측 ‘connect’ 를 emit 한다. (클-서 연결)

    → 앞에서 설명했으므로 생략

 


4-2. consume() 호출 및 클라이언트 consumer 생성 완료

 

생성된 Consumer로부터 미디어 트랙을 얻을 수 있다. 원격 비디오 트랙을 video element에 렌더링 해주자.

// client

socket.on('return-transport-consume', async (data) => {
  const { params } = data;

  const consumer = await consumerTransport.consume(params);

  const { track } = consumer;
  
  videoElem.srcObject = new MediaStream([ track ]);
});

 

마지막으로 해당 consumer를 resume 해줘야한다. 왜냐하면 서버에서 consumer 생성할 때 옵션으로 paused: true 값을 줬기 때문이다. paused 값이 true일 경우 consumer 생성 시 미디어 스트림 재생을 일시중지 한다. (공식문서에서 안정성 및 최적화를 위해 paused 된 상태로 생성하는 것을 권장하고 있다. 🔗참고)

 

렌더링까지 완료 했으면 resume() 함수를 호출하여 일시정지를 풀어주자.

// client
socket.on('return-transport-consume', async (data) => {
  // ... 렌더링 완료
  await socket.request('resume', { consumerId: consumer.id });
});

// server
socket.on('resume', async ({ consumerId }) => {
  const consumer = getConsumer(consumerId);
  await consumer.resume();
});

 


방 나가기

마지막으로 방에서 나갔을 때, 즉 소켓 연결을 끊기 위한 처리 과정이 필요하다. 이를 위해서는 공식 문서 🔗Communicating Actions and Events를 먼저 읽어보는 것이 좋다.

 

어떤 transport가 닫히면 연결된 producer 또는 consumer가 닫힌다. producer가 닫히면 연관된 consumer가 닫힌다. ‘transportclose’, ‘producerclose’ 등 이벤트를 트리거 하여 원하는대로 핸들링 할 수 있다.

 

producer 또는 consumer를 생성할 때 원하는 이벤트를 subscribe 해주자.

// server

let consumers = new Map();

socket.on(
  'transport-produce',
  async ({ kind, rtpParameters }) => {
    const producer = await transport.produce({...});
    
    producer.on('transportclose', () => {
      console.log('producer transport close');
    });
  }
);

socket.on(
  'transport-consume',
  async ({ rtpCapabilities, producerId, transportId }) => {
    ...
    
    const consumer = await transport.consume({...});
    
    consumer.on('transportclose', () => {
      console.log('consumer transport close');
      consumers.delete(consumer.id);
    });

    consumer.on('producerclose', () => {
      console.log("consumer's producer close");
      consumers.delete(consumer.id);
    });
  }
);

 

그리고 사용자가 방을 나갈 때 클라이언트가 서버 측에 연결 해제를 알려줄 수 있도록 이벤트를 하나 생성해야한다. 클라이언트 또는 서버 측에서 연결이 끊기더라도 상대 측 인스턴스를 제어할 특정 트리거가 존재하지 않기 때문이다. 연결 해제 ‘exit’를 호출하면 서버 측에서 모든 transports를 close()한다.

// server

socket.on('exit', async () => {
  // logic...
  
  exit();
});

function exit() {
  transports.forEach((transport) => transport.close());
}

 

 


마무리

이렇게 해서 WebRTC 구현이 완료 되었다. 나는 원래 새로운 기술을 익힐 때면 정말 최소한의 개념만 습득하고 일단 만들어보자! 하는 타입이었다. 근데 이번 프로젝트는 모든 개념이 낯설다보니 그 방식이 통하지 않았다. 특히 WebRTC는 정해진 통신 순서가 있고 이에 따라 이벤트가 호출되는 식이었기에 통신 과정을 정확히 이해하는 게 굉장히 중요한 일이었다. 심지어 라이브러리를 사용 하려니 또 새로운 용어를 익혀야했다. 생각해보면 구현하는 것보다 개념 이해하는데 더 많은 시간을 쏟은 것 같은데, 지금 다시 보면 왜 그렇게 어려워 했던 건지 이해 안갈 정도로 많은 걸 얻을 수 있었다.

 

물론 아쉬움도 있다. 일단 잘 돌아가는게 목적이었고 성능이나 통신 속도는 고려되지 못했다. 언젠간 기회가 된다면 성능 위주로 고도화 해보싶고, SFU 말고 다른 아키텍처도 구현해보고싶다.

 

 

참고

https://mediasoup.org/documentation/v3/communication-between-client-and-server/

https://www.youtube.com/watch?v=DOe7GkQgwPo

https://webrtc.ventures/2022/05/webrtc-with-mediasoup/