From 77008e16fef2e829109e027594726fa44d87b2e5 Mon Sep 17 00:00:00 2001 From: claincly Date: Wed, 12 May 2021 08:13:44 +0100 Subject: [PATCH] Allow RTSP streaming using TCP. NAT will block off incoming UDP connection because the router has no knowledge of the necessary port mapping (the mapping is never set up because UDP is connectionless). The end result is, the UDP socket to receive RTP data will timeout. After the `SocketTimeoutException` is caught, the following takes place to try streaming with TCP (or, RTP over RTSP). - `RtspClient` sends TEARDOWN to tear down the current session. - `RtspClient` re-connect to the RTSP server. - `RtspMediaPeriod` cancels all loading `RtpDataLoadables` (that are using UDP) - `RtspMediaPeriod` constructs new `RtpDataLoadables` that use `TransferRtpDataChannel`, and starts loading. - Once the `RtpDataLoadables` are up and running, we are ready to receive. `RtspClient` sends the SETUP requests. - The rest of the flow is unchanged. #minor-release PiperOrigin-RevId: 373310774 --- .../source/rtsp/RtpDataChannel.java | 34 +++- .../source/rtsp/RtpDataLoadable.java | 46 +----- .../exoplayer2/source/rtsp/RtpUtils.java | 34 ++++ .../exoplayer2/source/rtsp/RtspClient.java | 49 +++++- .../source/rtsp/RtspMediaPeriod.java | 81 ++++++++-- .../source/rtsp/RtspMediaSource.java | 2 +- .../source/rtsp/TransferRtpDataChannel.java | 125 ++++++++++++++ .../rtsp/TransferRtpDataChannelFactory.java | 29 ++++ .../rtsp/UdpDataSourceRtpDataChannel.java | 44 ++++- .../UdpDataSourceRtpDataChannelFactory.java | 56 +++++++ .../source/rtsp/RtspMediaPeriodTest.java | 4 +- .../rtsp/TransferRtpDataChannelTest.java | 153 ++++++++++++++++++ 12 files changed, 587 insertions(+), 70 deletions(-) create mode 100644 library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtpUtils.java create mode 100644 library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/TransferRtpDataChannel.java create mode 100644 library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/TransferRtpDataChannelFactory.java create mode 100644 library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/UdpDataSourceRtpDataChannelFactory.java create mode 100644 library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/TransferRtpDataChannelTest.java diff --git a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtpDataChannel.java b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtpDataChannel.java index 41ea556e5a..6453308670 100644 --- a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtpDataChannel.java +++ b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtpDataChannel.java @@ -15,7 +15,9 @@ */ package com.google.android.exoplayer2.source.rtsp; +import com.google.android.exoplayer2.C; import com.google.android.exoplayer2.upstream.DataSource; +import java.io.IOException; /** An RTP {@link DataSource}. */ /* package */ interface RtpDataChannel extends DataSource { @@ -23,13 +25,35 @@ import com.google.android.exoplayer2.upstream.DataSource; /** Creates {@link RtpDataChannel} for RTSP streams. */ interface Factory { - /** Creates a new {@link RtpDataChannel} instance. */ - RtpDataChannel createDataChannel(); + /** + * Creates a new {@link RtpDataChannel} instance for RTP data transfer. + * + * @throws IOException If the data channels failed to open. + */ + RtpDataChannel createAndOpenDataChannel(int trackId) throws IOException; } - /** The default {@link Factory} that returns a new {@link UdpDataSourceRtpDataChannel}. */ - Factory DEFAULT_FACTORY = UdpDataSourceRtpDataChannel::new; + /** Returns the RTSP transport header for this {@link RtpDataChannel} */ + String getTransport(); - /** Returns the local port used in the underlying transport channel. */ + /** + * Returns the receiving port or channel used by the underlying transport protocol, {@link + * C#INDEX_UNSET} if the data channel is not opened. + */ int getLocalPort(); + + /** + * Returns whether the data channel is using sideband binary data to transmit RTP packets. For + * example, RTP-over-RTSP. + */ + boolean usesSidebandBinaryData(); + + /** + * Writes data to the channel. + * + *

The channel owns the written buffer, the user must not alter its content after writing. + * + * @param buffer The buffer from which data should be written. The buffer should be full. + */ + void write(byte[] buffer); } diff --git a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtpDataLoadable.java b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtpDataLoadable.java index dd6847d83b..573590047b 100644 --- a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtpDataLoadable.java +++ b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtpDataLoadable.java @@ -17,9 +17,7 @@ package com.google.android.exoplayer2.source.rtsp; import static com.google.android.exoplayer2.util.Assertions.checkNotNull; -import static com.google.android.exoplayer2.util.Util.closeQuietly; -import android.net.Uri; import android.os.Handler; import androidx.annotation.Nullable; import com.google.android.exoplayer2.C; @@ -27,7 +25,6 @@ import com.google.android.exoplayer2.extractor.DefaultExtractorInput; import com.google.android.exoplayer2.extractor.ExtractorInput; import com.google.android.exoplayer2.extractor.ExtractorOutput; import com.google.android.exoplayer2.extractor.PositionHolder; -import com.google.android.exoplayer2.upstream.DataSpec; import com.google.android.exoplayer2.upstream.Loader; import com.google.android.exoplayer2.util.Util; import java.io.IOException; @@ -55,16 +52,11 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; * * @param transport The RTSP transport (RFC2326 Section 12.39) including the client data port * and RTCP port. + * @param rtpDataChannel The {@link RtpDataChannel} associated with the transport. */ - void onTransportReady(String transport); + void onTransportReady(String transport, RtpDataChannel rtpDataChannel); } - private static final String DEFAULT_TRANSPORT_FORMAT = "RTP/AVP;unicast;client_port=%d-%d"; - - private static final String RTP_ANY_INCOMING_IPV4 = "rtp://0.0.0.0"; - // Using port zero will cause the system to generate a port. - private static final int RTP_LOCAL_PORT = 0; - private static final String RTP_BIND_ADDRESS = RTP_ANY_INCOMING_IPV4 + ":" + RTP_LOCAL_PORT; /** The track ID associated with the Loadable. */ public final int trackId; @@ -141,33 +133,13 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; @Override public void load() throws IOException { - @Nullable RtpDataChannel firstDataChannel = null; - @Nullable RtpDataChannel secondDataChannel = null; - + @Nullable RtpDataChannel dataChannel = null; try { - // Open and set up the data channel. - // From RFC3550 Section 11: "For UDP and similar protocols, RTP SHOULD use an even destination - // port number and the corresponding RTCP stream SHOULD use the next higher (odd) destination - // port number". Some RTSP servers are strict about this rule. We open a data channel first, - // and depending its port number, open the next data channel with a port number that is either - // the higher or the lower. - firstDataChannel = rtpDataChannelFactory.createDataChannel(); - firstDataChannel.open(new DataSpec(Uri.parse(RTP_BIND_ADDRESS))); + dataChannel = rtpDataChannelFactory.createAndOpenDataChannel(trackId); + String transport = dataChannel.getTransport(); - int firstPort = firstDataChannel.getLocalPort(); - boolean isFirstPortNumberEven = (firstPort % 2 == 0); - int secondPort = isFirstPortNumberEven ? firstPort + 1 : firstPort - 1; - - // RTCP always uses the immediate next port. - secondDataChannel = rtpDataChannelFactory.createDataChannel(); - secondDataChannel.open(new DataSpec(Uri.parse(RTP_ANY_INCOMING_IPV4 + ":" + secondPort))); - - // RTP data port is always the lower and even-numbered port. - RtpDataChannel dataChannel = isFirstPortNumberEven ? firstDataChannel : secondDataChannel; - int dataPort = dataChannel.getLocalPort(); - int rtcpPort = dataPort + 1; - String transport = Util.formatInvariant(DEFAULT_TRANSPORT_FORMAT, dataPort, rtcpPort); - playbackThreadHandler.post(() -> eventListener.onTransportReady(transport)); + RtpDataChannel finalDataChannel = dataChannel; + playbackThreadHandler.post(() -> eventListener.onTransportReady(transport, finalDataChannel)); // Sets up the extractor. ExtractorInput extractorInput = @@ -181,12 +153,10 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; extractor.seek(nextRtpTimestamp, pendingSeekPositionUs); pendingSeekPositionUs = C.TIME_UNSET; } - extractor.read(extractorInput, /* seekPosition= */ new PositionHolder()); } } finally { - closeQuietly(firstDataChannel); - closeQuietly(secondDataChannel); + Util.closeQuietly(dataChannel); } } diff --git a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtpUtils.java b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtpUtils.java new file mode 100644 index 0000000000..d37163f024 --- /dev/null +++ b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtpUtils.java @@ -0,0 +1,34 @@ +/* + * Copyright 2021 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.android.exoplayer2.source.rtsp; + +import android.net.Uri; +import com.google.android.exoplayer2.upstream.DataSpec; +import com.google.android.exoplayer2.util.Util; + +/** Utility methods for RTP. */ +public final class RtpUtils { + + private static final String RTP_ANY_INCOMING_IPV4 = "rtp://0.0.0.0"; + + /** Returns the {@link DataSpec} with the {@link Uri} for incoming RTP connection. */ + public static DataSpec getIncomingRtpDataSpec(int portNumber) { + return new DataSpec( + Uri.parse(Util.formatInvariant("%s:%d", RTP_ANY_INCOMING_IPV4, portNumber))); + } + + private RtpUtils() {} +} diff --git a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspClient.java b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspClient.java index c64bf343c6..b18b707459 100644 --- a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspClient.java +++ b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspClient.java @@ -91,14 +91,15 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; private final SessionInfoListener sessionInfoListener; private final Uri uri; @Nullable private final String userAgent; - private final RtspMessageChannel messageChannel; private final ArrayDeque pendingSetupRtpLoadInfos; // TODO(b/172331505) Add a timeout monitor for pending requests. private final SparseArray pendingRequests; private final MessageSender messageSender; + private final SparseArray transferRtpDataChannelMap; + private RtspMessageChannel messageChannel; private @MonotonicNonNull PlaybackEventListener playbackEventListener; - private @MonotonicNonNull String sessionId; + @Nullable private String sessionId; @Nullable private KeepAliveMonitor keepAliveMonitor; private boolean hasUpdatedTimelineAndTracks; private long pendingSeekPositionUs; @@ -121,11 +122,12 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; this.sessionInfoListener = sessionInfoListener; this.uri = RtspMessageUtil.removeUserInfo(uri); this.userAgent = userAgent; - messageChannel = new RtspMessageChannel(new MessageListener()); pendingSetupRtpLoadInfos = new ArrayDeque<>(); pendingRequests = new SparseArray<>(); messageSender = new MessageSender(); + transferRtpDataChannelMap = new SparseArray<>(); pendingSeekPositionUs = C.TIME_UNSET; + messageChannel = new RtspMessageChannel(new MessageListener()); } /** @@ -137,11 +139,8 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; * @throws IOException When failed to open a connection to the supplied {@link Uri}. */ public void start() throws IOException { - checkArgument(uri.getHost() != null); - int rtspPort = uri.getPort() > 0 ? uri.getPort() : DEFAULT_RTSP_PORT; - Socket socket = SocketFactory.getDefault().createSocket(checkNotNull(uri.getHost()), rtspPort); try { - messageChannel.openSocket(socket); + messageChannel.openSocket(openSocket()); } catch (IOException e) { Util.closeQuietly(messageChannel); throw e; @@ -149,6 +148,13 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; messageSender.sendOptionsRequest(uri, sessionId); } + /** Opens a {@link Socket} to the session {@link #uri}. */ + private Socket openSocket() throws IOException { + checkArgument(uri.getHost() != null); + int rtspPort = uri.getPort() > 0 ? uri.getPort() : DEFAULT_RTSP_PORT; + return SocketFactory.getDefault().createSocket(checkNotNull(uri.getHost()), rtspPort); + } + /** Sets the {@link PlaybackEventListener} to receive playback events. */ public void setPlaybackEventListener(PlaybackEventListener playbackEventListener) { this.playbackEventListener = playbackEventListener; @@ -202,6 +208,27 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; messageChannel.close(); } + /** + * Sets up a new playback session using TCP as RTP lower transport. + * + *

This mode is also known as "RTP-over-RTSP". + */ + public void retryWithRtpTcp() { + try { + close(); + messageChannel = new RtspMessageChannel(new MessageListener()); + messageChannel.openSocket(openSocket()); + sessionId = null; + } catch (IOException e) { + checkNotNull(playbackEventListener).onPlaybackError(new RtspPlaybackException(e)); + } + } + + /** Registers an {@link RtpDataChannel} to receive RTSP interleaved data. */ + public void registerInterleavedDataChannel(RtpDataChannel rtpDataChannel) { + transferRtpDataChannelMap.put(rtpDataChannel.getLocalPort(), rtpDataChannel); + } + private void continueSetupRtspTrack() { @Nullable RtpLoadInfo loadInfo = pendingSetupRtpLoadInfos.pollFirst(); if (loadInfo == null) { @@ -413,6 +440,14 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; } } + @Override + public void onInterleavedBinaryDataReceived(byte[] data, int channel) { + @Nullable RtpDataChannel dataChannel = transferRtpDataChannelMap.get(channel); + if (dataChannel != null) { + dataChannel.write(data); + } + } + // Response handlers must only be called only on 200 (OK) responses. public void onOptionsResponseReceived(RtspOptionsResponse response) { diff --git a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMediaPeriod.java b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMediaPeriod.java index bf54f1775d..c88ae3bb1f 100644 --- a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMediaPeriod.java +++ b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMediaPeriod.java @@ -47,6 +47,7 @@ import com.google.android.exoplayer2.trackselection.ExoTrackSelection; import com.google.android.exoplayer2.trackselection.TrackSelection; import com.google.android.exoplayer2.upstream.Allocator; import com.google.android.exoplayer2.upstream.Loader; +import com.google.android.exoplayer2.upstream.Loader.LoadErrorAction; import com.google.android.exoplayer2.upstream.Loader.Loadable; import com.google.android.exoplayer2.util.Util; import com.google.common.collect.ImmutableList; @@ -61,7 +62,6 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; /** A {@link MediaPeriod} that loads an RTSP stream. */ /* package */ final class RtspMediaPeriod implements MediaPeriod { - private static final String TAG = "RtspMediaPeriod"; /** The maximum times to retry if the underlying data channel failed to bind. */ private static final int PORT_BINDING_MAX_RETRY_COUNT = 3; @@ -70,7 +70,6 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; private final InternalListener internalListener; private final RtspClient rtspClient; - private final RtpDataChannel.Factory rtpDataChannelFactory; private final List rtspLoaderWrappers; private final List selectedLoadInfos; @@ -85,6 +84,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; private boolean prepared; private boolean trackSelected; private int portBindingRetryCount; + private boolean hasRetriedWithRtpTcp; /** * Creates an RTSP media period. @@ -106,11 +106,11 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; rtspLoaderWrappers = new ArrayList<>(rtspTracks.size()); this.rtspClient = rtspClient; this.rtspClient.setPlaybackEventListener(internalListener); - this.rtpDataChannelFactory = rtpDataChannelFactory; for (int i = 0; i < rtspTracks.size(); i++) { RtspMediaTrack rtspMediaTrack = rtspTracks.get(i); - rtspLoaderWrappers.add(new RtspLoaderWrapper(rtspMediaTrack, /* trackId= */ i)); + rtspLoaderWrappers.add( + new RtspLoaderWrapper(rtspMediaTrack, /* trackId= */ i, rtpDataChannelFactory)); } selectedLoadInfos = new ArrayList<>(rtspTracks.size()); pendingSeekPositionUs = C.TIME_UNSET; @@ -434,7 +434,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; preparationError = error; } else { if (error.getCause() instanceof SocketTimeoutException) { - handleSocketTimeout(loadable); + return handleSocketTimeout(loadable); } else if (error.getCause() instanceof BindException) { // Allow for retry on RTP port open failure by catching BindException. Two ports are // opened for each RTP stream, the first port number is auto assigned by the system, while @@ -511,22 +511,63 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; } /** Handles the {@link Loadable} whose {@link RtpDataChannel} timed out. */ - private void handleSocketTimeout(RtpDataLoadable loadable) { + private LoadErrorAction handleSocketTimeout(RtpDataLoadable loadable) { // TODO(b/172331505) Allow for retry when loading is not ending. if (getBufferedPositionUs() == Long.MIN_VALUE) { - // Raise exception if no sample has been received so far. - playbackException = new RtspPlaybackException("Possible dropped UDP connection."); - return; + // Retry playback with TCP if no sample has been received so far. + if (!hasRetriedWithRtpTcp) { + retryWithRtpTcp(); + hasRetriedWithRtpTcp = true; + } + // Don't retry with the current UDP backed loadables. + return Loader.DONT_RETRY; } for (int i = 0; i < rtspLoaderWrappers.size(); i++) { RtspLoaderWrapper loaderWrapper = rtspLoaderWrappers.get(i); if (loaderWrapper.loadInfo.loadable == loadable) { loaderWrapper.cancelLoad(); - return; + break; } } playbackException = new RtspPlaybackException("Unknown loadable timed out."); + return Loader.DONT_RETRY; + } + } + + private void retryWithRtpTcp() { + rtspClient.retryWithRtpTcp(); + + RtpDataChannel.Factory rtpDataChannelFactory = new TransferRtpDataChannelFactory(); + ArrayList newLoaderWrappers = new ArrayList<>(rtspLoaderWrappers.size()); + ArrayList newSelectedLoadInfos = new ArrayList<>(selectedLoadInfos.size()); + + for (int i = 0; i < rtspLoaderWrappers.size(); i++) { + RtspLoaderWrapper loaderWrapper = rtspLoaderWrappers.get(i); + + RtspLoaderWrapper newLoaderWrapper = + new RtspLoaderWrapper( + loaderWrapper.loadInfo.mediaTrack, /* trackId= */ i, rtpDataChannelFactory); + newLoaderWrappers.add(newLoaderWrapper); + newLoaderWrapper.startLoading(); + + if (selectedLoadInfos.contains(loaderWrapper.loadInfo)) { + newSelectedLoadInfos.add(newLoaderWrapper.loadInfo); + } + } + + // Switch to new LoaderWrappers. + ImmutableList oldRtspLoaderWrappers = + ImmutableList.copyOf(rtspLoaderWrappers); + rtspLoaderWrappers.clear(); + rtspLoaderWrappers.addAll(newLoaderWrappers); + selectedLoadInfos.clear(); + selectedLoadInfos.addAll(newSelectedLoadInfos); + + // Cancel old loadable wrappers after switching, so that buffered position is always read from + // active sample queues. + for (int i = 0; i < oldRtspLoaderWrappers.size(); i++) { + oldRtspLoaderWrappers.get(i).cancelLoad(); } } @@ -576,9 +617,10 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; * *

Instances must be {@link #release() released} after loadings conclude. */ - public RtspLoaderWrapper(RtspMediaTrack mediaTrack, int trackId) { - loadInfo = new RtpLoadInfo(mediaTrack, trackId); - loader = new Loader("ExoPlayer:RtspMediaPeriod:RtspDataLoader " + trackId); + public RtspLoaderWrapper( + RtspMediaTrack mediaTrack, int trackId, RtpDataChannel.Factory rtpDataChannelFactory) { + loadInfo = new RtpLoadInfo(mediaTrack, trackId, rtpDataChannelFactory); + loader = new Loader("ExoPlayer:RtspMediaPeriod:RtspLoaderWrapper " + trackId); sampleQueue = SampleQueue.createWithoutDrm(allocator); sampleQueue.setUpstreamFormatChangeListener(internalListener); } @@ -639,14 +681,21 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; @Nullable private String transport; /** Creates a new instance. */ - public RtpLoadInfo(RtspMediaTrack mediaTrack, int trackId) { + public RtpLoadInfo( + RtspMediaTrack mediaTrack, int trackId, RtpDataChannel.Factory rtpDataChannelFactory) { this.mediaTrack = mediaTrack; RtpDataLoadable.EventListener transportEventListener = - (transport) -> { - this.transport = transport; + (transport, rtpDataChannel) -> { + RtpLoadInfo.this.transport = transport; + + if (rtpDataChannel.usesSidebandBinaryData()) { + rtspClient.registerInterleavedDataChannel(rtpDataChannel); + } + maybeSetupTracks(); }; + this.loadable = new RtpDataLoadable( trackId, diff --git a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMediaSource.java b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMediaSource.java index e4e49306b7..9596be328f 100644 --- a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMediaSource.java +++ b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMediaSource.java @@ -150,7 +150,7 @@ public final class RtspMediaSource extends BaseMediaSource { private RtspMediaSource(MediaItem mediaItem) { this.mediaItem = mediaItem; - rtpDataChannelFactory = RtpDataChannel.DEFAULT_FACTORY; + rtpDataChannelFactory = new UdpDataSourceRtpDataChannelFactory(); } @Override diff --git a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/TransferRtpDataChannel.java b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/TransferRtpDataChannel.java new file mode 100644 index 0000000000..fc1bcd63ff --- /dev/null +++ b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/TransferRtpDataChannel.java @@ -0,0 +1,125 @@ +/* + * Copyright 2021 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.android.exoplayer2.source.rtsp; + +import static com.google.android.exoplayer2.util.Assertions.checkState; +import static java.lang.Math.min; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import android.net.Uri; +import androidx.annotation.Nullable; +import com.google.android.exoplayer2.C; +import com.google.android.exoplayer2.upstream.BaseDataSource; +import com.google.android.exoplayer2.upstream.DataSpec; +import com.google.android.exoplayer2.util.Util; +import java.io.IOException; +import java.net.SocketTimeoutException; +import java.util.Arrays; +import java.util.concurrent.LinkedBlockingQueue; + +/** An {@link RtpDataChannel} that transfers received data in-memory. */ +/* package */ final class TransferRtpDataChannel extends BaseDataSource implements RtpDataChannel { + + private static final String DEFAULT_TCP_TRANSPORT_FORMAT = + "RTP/AVP/TCP;unicast;interleaved=%d-%d"; + private static final long TIMEOUT_MS = 8_000; + + private final LinkedBlockingQueue packetQueue; + + private byte[] unreadData; + private int channelNumber; + + /** Creates a new instance. */ + public TransferRtpDataChannel() { + super(/* isNetwork= */ true); + packetQueue = new LinkedBlockingQueue<>(); + unreadData = new byte[0]; + channelNumber = C.INDEX_UNSET; + } + + @Override + public String getTransport() { + checkState(channelNumber != C.INDEX_UNSET); // Assert open() is called. + return Util.formatInvariant(DEFAULT_TCP_TRANSPORT_FORMAT, channelNumber, channelNumber + 1); + } + + @Override + public int getLocalPort() { + return channelNumber; + } + + @Override + public boolean usesSidebandBinaryData() { + return true; + } + + @Override + public long open(DataSpec dataSpec) { + this.channelNumber = dataSpec.uri.getPort(); + return C.LENGTH_UNSET; + } + + @Override + public void close() {} + + @Nullable + @Override + public Uri getUri() { + return null; + } + + @Override + public int read(byte[] target, int offset, int length) throws IOException { + if (length == 0) { + return 0; + } + + int bytesRead = 0; + int bytesToRead = min(length, unreadData.length); + System.arraycopy(unreadData, /* srcPos= */ 0, target, offset, bytesToRead); + bytesRead += bytesToRead; + unreadData = Arrays.copyOfRange(unreadData, bytesToRead, unreadData.length); + + if (bytesRead == length) { + return bytesRead; + } + + @Nullable byte[] data; + try { + // TODO(internal b/172331505) Consider move the receiving timeout logic to an upper level + // (maybe RtspClient). There is no actual socket receiving here. + data = packetQueue.poll(TIMEOUT_MS, MILLISECONDS); + if (data == null) { + throw new IOException(new SocketTimeoutException()); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return C.RESULT_END_OF_INPUT; + } + + bytesToRead = min(length - bytesRead, data.length); + System.arraycopy(data, /* srcPos= */ 0, target, offset + bytesRead, bytesToRead); + if (bytesToRead < data.length) { + unreadData = Arrays.copyOfRange(data, bytesToRead, data.length); + } + return bytesRead + bytesToRead; + } + + @Override + public void write(byte[] buffer) { + packetQueue.add(buffer); + } +} diff --git a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/TransferRtpDataChannelFactory.java b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/TransferRtpDataChannelFactory.java new file mode 100644 index 0000000000..31860a8fd6 --- /dev/null +++ b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/TransferRtpDataChannelFactory.java @@ -0,0 +1,29 @@ +/* + * Copyright 2021 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.android.exoplayer2.source.rtsp; + +/** Factory for {@link TransferRtpDataChannel}. */ +/* package */ final class TransferRtpDataChannelFactory implements RtpDataChannel.Factory { + + private static final int INTERLEAVED_CHANNELS_PER_TRACK = 2; + + @Override + public RtpDataChannel createAndOpenDataChannel(int trackId) { + TransferRtpDataChannel dataChannel = new TransferRtpDataChannel(); + dataChannel.open(RtpUtils.getIncomingRtpDataSpec(trackId * INTERLEAVED_CHANNELS_PER_TRACK)); + return dataChannel; + } +} diff --git a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/UdpDataSourceRtpDataChannel.java b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/UdpDataSourceRtpDataChannel.java index 429be6634d..3f56d4ac36 100644 --- a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/UdpDataSourceRtpDataChannel.java +++ b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/UdpDataSourceRtpDataChannel.java @@ -15,25 +15,44 @@ */ package com.google.android.exoplayer2.source.rtsp; +import static com.google.android.exoplayer2.util.Assertions.checkArgument; +import static com.google.android.exoplayer2.util.Assertions.checkState; + import android.net.Uri; import androidx.annotation.Nullable; +import com.google.android.exoplayer2.C; import com.google.android.exoplayer2.upstream.DataSpec; import com.google.android.exoplayer2.upstream.TransferListener; import com.google.android.exoplayer2.upstream.UdpDataSource; +import com.google.android.exoplayer2.util.Util; import java.io.IOException; /** An {@link RtpDataChannel} for UDP transport. */ /* package */ final class UdpDataSourceRtpDataChannel implements RtpDataChannel { + + private static final String DEFAULT_UDP_TRANSPORT_FORMAT = "RTP/AVP;unicast;client_port=%d-%d"; + private final UdpDataSource dataSource; + /** The associated RTCP channel; {@code null} if the current channel is an RTCP channel. */ + @Nullable private UdpDataSourceRtpDataChannel rtcpChannel; + /** Creates a new instance. */ public UdpDataSourceRtpDataChannel() { dataSource = new UdpDataSource(); } + @Override + public String getTransport() { + int dataPortNumber = getLocalPort(); + checkState(dataPortNumber != C.INDEX_UNSET); // Assert open() is called. + return Util.formatInvariant(DEFAULT_UDP_TRANSPORT_FORMAT, dataPortNumber, dataPortNumber + 1); + } + @Override public int getLocalPort() { - return dataSource.getLocalPort(); + int port = dataSource.getLocalPort(); + return port == UdpDataSource.UDP_PORT_UNSET ? C.INDEX_UNSET : port; } @Override @@ -55,10 +74,33 @@ import java.io.IOException; @Override public void close() { dataSource.close(); + + if (rtcpChannel != null) { + rtcpChannel.close(); + } } @Override public int read(byte[] target, int offset, int length) throws IOException { return dataSource.read(target, offset, length); } + + @Override + public boolean usesSidebandBinaryData() { + return false; + } + + /** + * Writing to a {@link UdpDataSource} backed {@link RtpDataChannel} is not supported at the + * moment. + */ + @Override + public void write(byte[] buffer) { + throw new UnsupportedOperationException(); + } + + public void setRtcpChannel(UdpDataSourceRtpDataChannel rtcpChannel) { + checkArgument(this != rtcpChannel); + this.rtcpChannel = rtcpChannel; + } } diff --git a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/UdpDataSourceRtpDataChannelFactory.java b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/UdpDataSourceRtpDataChannelFactory.java new file mode 100644 index 0000000000..ccb201dbe9 --- /dev/null +++ b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/UdpDataSourceRtpDataChannelFactory.java @@ -0,0 +1,56 @@ +/* + * Copyright 2021 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.android.exoplayer2.source.rtsp; + +import com.google.android.exoplayer2.util.Util; +import java.io.IOException; + +/** Factory for {@link UdpDataSourceRtpDataChannel}. */ +/* package */ final class UdpDataSourceRtpDataChannelFactory implements RtpDataChannel.Factory { + + @Override + public RtpDataChannel createAndOpenDataChannel(int trackId) throws IOException { + UdpDataSourceRtpDataChannel firstChannel = new UdpDataSourceRtpDataChannel(); + UdpDataSourceRtpDataChannel secondChannel = new UdpDataSourceRtpDataChannel(); + + try { + // From RFC3550 Section 11: "For UDP and similar protocols, RTP SHOULD use an even destination + // port number and the corresponding RTCP stream SHOULD use the next higher (odd) destination + // port number". Some RTSP servers are strict about this rule. We open a data channel first, + // and depending its port number, open the next data channel with a port number that is either + // the higher or the lower. + + // Using port zero will cause the system to generate a port. + firstChannel.open(RtpUtils.getIncomingRtpDataSpec(/* portNumber= */ 0)); + int firstPort = firstChannel.getLocalPort(); + boolean isFirstPortEven = firstPort % 2 == 0; + int portToOpen = isFirstPortEven ? firstPort + 1 : firstPort - 1; + secondChannel.open(RtpUtils.getIncomingRtpDataSpec(/* portNumber= */ portToOpen)); + + if (isFirstPortEven) { + firstChannel.setRtcpChannel(secondChannel); + return firstChannel; + } else { + secondChannel.setRtcpChannel(firstChannel); + return secondChannel; + } + } catch (IOException e) { + Util.closeQuietly(firstChannel); + Util.closeQuietly(secondChannel); + throw e; + } + } +} diff --git a/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/RtspMediaPeriodTest.java b/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/RtspMediaPeriodTest.java index a93fe3a897..16b48c06c1 100644 --- a/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/RtspMediaPeriodTest.java +++ b/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/RtspMediaPeriodTest.java @@ -67,7 +67,7 @@ public class RtspMediaPeriodTest { .build(), Uri.parse("rtsp://localhost/test"))), PLACEHOLDER_RTSP_CLIENT, - RtpDataChannel.DEFAULT_FACTORY); + new UdpDataSourceRtpDataChannelFactory()); AtomicBoolean prepareCallbackCalled = new AtomicBoolean(false); rtspMediaPeriod.prepare( @@ -95,7 +95,7 @@ public class RtspMediaPeriodTest { new DefaultAllocator(/* trimOnReset= */ true, C.DEFAULT_BUFFER_SEGMENT_SIZE), ImmutableList.of(), PLACEHOLDER_RTSP_CLIENT, - RtpDataChannel.DEFAULT_FACTORY); + new UdpDataSourceRtpDataChannelFactory()); assertThat(rtspMediaPeriod.getBufferedPositionUs()).isEqualTo(C.TIME_END_OF_SOURCE); } diff --git a/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/TransferRtpDataChannelTest.java b/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/TransferRtpDataChannelTest.java new file mode 100644 index 0000000000..93c921456e --- /dev/null +++ b/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/TransferRtpDataChannelTest.java @@ -0,0 +1,153 @@ +/* + * Copyright 2021 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.android.exoplayer2.source.rtsp; + +import static com.google.android.exoplayer2.testutil.TestUtil.buildTestData; +import static com.google.common.truth.Truth.assertThat; + +import androidx.test.ext.junit.runners.AndroidJUnit4; +import com.google.common.primitives.Bytes; +import java.util.Arrays; +import org.junit.Test; +import org.junit.runner.RunWith; + +/** Unit test for {@link TransferRtpDataChannel}. */ +@RunWith(AndroidJUnit4.class) +public class TransferRtpDataChannelTest { + + @Test + public void read_withLargeEnoughBuffer_reads() throws Exception { + byte[] randomBytes = buildTestData(20); + byte[] buffer = new byte[40]; + TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel(); + transferRtpDataChannel.write(randomBytes); + + transferRtpDataChannel.read(buffer, /* offset= */ 0, buffer.length); + + assertThat(Arrays.copyOfRange(buffer, /* from= */ 0, /* to= */ 20)).isEqualTo(randomBytes); + } + + @Test + public void read_withSmallBufferEnoughBuffer_readsThreeTimes() throws Exception { + byte[] randomBytes = buildTestData(20); + byte[] buffer = new byte[8]; + TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel(); + transferRtpDataChannel.write(randomBytes); + + transferRtpDataChannel.read(buffer, /* offset= */ 0, buffer.length); + assertThat(buffer).isEqualTo(Arrays.copyOfRange(randomBytes, /* from= */ 0, /* to= */ 8)); + transferRtpDataChannel.read(buffer, /* offset= */ 0, buffer.length); + assertThat(buffer).isEqualTo(Arrays.copyOfRange(randomBytes, /* from= */ 8, /* to= */ 16)); + transferRtpDataChannel.read(buffer, /* offset= */ 0, /* length= */ 4); + assertThat(Arrays.copyOfRange(buffer, /* from= */ 0, /* to= */ 4)) + .isEqualTo(Arrays.copyOfRange(randomBytes, /* from= */ 16, /* to= */ 20)); + } + + @Test + public void read_withSmallBuffer_reads() throws Exception { + byte[] randomBytes = buildTestData(40); + byte[] buffer = new byte[20]; + TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel(); + transferRtpDataChannel.write(randomBytes); + + transferRtpDataChannel.read(buffer, /* offset= */ 0, buffer.length); + assertThat(buffer).isEqualTo(Arrays.copyOfRange(randomBytes, /* from= */ 0, /* to= */ 20)); + + transferRtpDataChannel.read(buffer, /* offset= */ 0, buffer.length); + assertThat(buffer).isEqualTo(Arrays.copyOfRange(randomBytes, /* from= */ 20, /* to= */ 40)); + } + + @Test + public void read_withSmallAndModerateBufferAndSubsequentProducerWrite_reads() throws Exception { + byte[] randomBytes1 = buildTestData(40); + byte[] randomBytes2 = buildTestData(40); + byte[] smallBuffer = new byte[20]; + byte[] bigBuffer = new byte[40]; + TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel(); + transferRtpDataChannel.write(randomBytes1); + + transferRtpDataChannel.read(smallBuffer, /* offset= */ 0, smallBuffer.length); + assertThat(smallBuffer) + .isEqualTo(Arrays.copyOfRange(randomBytes1, /* from= */ 0, /* to= */ 20)); + + transferRtpDataChannel.write(randomBytes2); + + // Read the remaining 20 bytes in randomBytes1, and 20 bytes from randomBytes2. + transferRtpDataChannel.read(bigBuffer, /* offset= */ 0, bigBuffer.length); + assertThat(bigBuffer) + .isEqualTo( + Bytes.concat( + Arrays.copyOfRange(randomBytes1, /* from= */ 20, /* to= */ 40), + Arrays.copyOfRange(randomBytes2, /* from= */ 0, /* to= */ 20))); + + // Read the remaining 20 bytes in randomBytes2. + transferRtpDataChannel.read(smallBuffer, /* offset= */ 0, smallBuffer.length); + assertThat(smallBuffer) + .isEqualTo(Arrays.copyOfRange(randomBytes2, /* from= */ 20, /* to= */ 40)); + } + + @Test + public void read_withSmallAndBigBufferWithPartialReadAndSubsequentProducerWrite_reads() + throws Exception { + byte[] randomBytes1 = buildTestData(40); + byte[] randomBytes2 = buildTestData(40); + byte[] smallBuffer = new byte[30]; + byte[] bigBuffer = new byte[30]; + TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel(); + transferRtpDataChannel.write(randomBytes1); + + transferRtpDataChannel.read(smallBuffer, /* offset= */ 0, smallBuffer.length); + assertThat(smallBuffer) + .isEqualTo(Arrays.copyOfRange(randomBytes1, /* from= */ 0, /* to= */ 30)); + + transferRtpDataChannel.write(randomBytes2); + + // Read 30 bytes to big buffer. + transferRtpDataChannel.read(bigBuffer, /* offset= */ 0, bigBuffer.length); + assertThat(bigBuffer) + .isEqualTo( + Bytes.concat( + Arrays.copyOfRange(randomBytes1, /* from= */ 30, /* to= */ 40), + Arrays.copyOfRange(randomBytes1, /* from= */ 0, /* to= */ 20))); + + // Read the remaining 20 bytes to big buffer. + transferRtpDataChannel.read(bigBuffer, /* offset= */ 0, /* length= */ 20); + assertThat(Arrays.copyOfRange(bigBuffer, /* from= */ 0, /* to= */ 20)) + .isEqualTo(Arrays.copyOfRange(randomBytes2, /* from= */ 20, /* to= */ 40)); + } + + @Test + public void read_withSmallAndBigBufferAndSubsequentProducerWrite_reads() throws Exception { + byte[] randomBytes1 = buildTestData(40); + byte[] randomBytes2 = buildTestData(40); + byte[] smallBuffer = new byte[20]; + byte[] bigBuffer = new byte[70]; + TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel(); + transferRtpDataChannel.write(randomBytes1); + + transferRtpDataChannel.read(smallBuffer, /* offset= */ 0, smallBuffer.length); + assertThat(smallBuffer) + .isEqualTo(Arrays.copyOfRange(randomBytes1, /* from= */ 0, /* to= */ 20)); + + transferRtpDataChannel.write(randomBytes2); + + transferRtpDataChannel.read(bigBuffer, /* offset= */ 0, bigBuffer.length); + assertThat(Arrays.copyOfRange(bigBuffer, /* from= */ 0, /* to= */ 60)) + .isEqualTo( + Bytes.concat( + Arrays.copyOfRange(randomBytes1, /* from= */ 20, /* to= */ 40), randomBytes2)); + } +}