diff --git a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtpDataChannel.java b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtpDataChannel.java index 41ea556e5a..6453308670 100644 --- a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtpDataChannel.java +++ b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtpDataChannel.java @@ -15,7 +15,9 @@ */ package com.google.android.exoplayer2.source.rtsp; +import com.google.android.exoplayer2.C; import com.google.android.exoplayer2.upstream.DataSource; +import java.io.IOException; /** An RTP {@link DataSource}. */ /* package */ interface RtpDataChannel extends DataSource { @@ -23,13 +25,35 @@ import com.google.android.exoplayer2.upstream.DataSource; /** Creates {@link RtpDataChannel} for RTSP streams. */ interface Factory { - /** Creates a new {@link RtpDataChannel} instance. */ - RtpDataChannel createDataChannel(); + /** + * Creates a new {@link RtpDataChannel} instance for RTP data transfer. + * + * @throws IOException If the data channels failed to open. + */ + RtpDataChannel createAndOpenDataChannel(int trackId) throws IOException; } - /** The default {@link Factory} that returns a new {@link UdpDataSourceRtpDataChannel}. */ - Factory DEFAULT_FACTORY = UdpDataSourceRtpDataChannel::new; + /** Returns the RTSP transport header for this {@link RtpDataChannel} */ + String getTransport(); - /** Returns the local port used in the underlying transport channel. */ + /** + * Returns the receiving port or channel used by the underlying transport protocol, {@link + * C#INDEX_UNSET} if the data channel is not opened. + */ int getLocalPort(); + + /** + * Returns whether the data channel is using sideband binary data to transmit RTP packets. For + * example, RTP-over-RTSP. + */ + boolean usesSidebandBinaryData(); + + /** + * Writes data to the channel. + * + *

The channel owns the written buffer, the user must not alter its content after writing. + * + * @param buffer The buffer from which data should be written. The buffer should be full. + */ + void write(byte[] buffer); } diff --git a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtpDataLoadable.java b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtpDataLoadable.java index dd6847d83b..573590047b 100644 --- a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtpDataLoadable.java +++ b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtpDataLoadable.java @@ -17,9 +17,7 @@ package com.google.android.exoplayer2.source.rtsp; import static com.google.android.exoplayer2.util.Assertions.checkNotNull; -import static com.google.android.exoplayer2.util.Util.closeQuietly; -import android.net.Uri; import android.os.Handler; import androidx.annotation.Nullable; import com.google.android.exoplayer2.C; @@ -27,7 +25,6 @@ import com.google.android.exoplayer2.extractor.DefaultExtractorInput; import com.google.android.exoplayer2.extractor.ExtractorInput; import com.google.android.exoplayer2.extractor.ExtractorOutput; import com.google.android.exoplayer2.extractor.PositionHolder; -import com.google.android.exoplayer2.upstream.DataSpec; import com.google.android.exoplayer2.upstream.Loader; import com.google.android.exoplayer2.util.Util; import java.io.IOException; @@ -55,16 +52,11 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; * * @param transport The RTSP transport (RFC2326 Section 12.39) including the client data port * and RTCP port. + * @param rtpDataChannel The {@link RtpDataChannel} associated with the transport. 
*/ - void onTransportReady(String transport); + void onTransportReady(String transport, RtpDataChannel rtpDataChannel); } - private static final String DEFAULT_TRANSPORT_FORMAT = "RTP/AVP;unicast;client_port=%d-%d"; - - private static final String RTP_ANY_INCOMING_IPV4 = "rtp://0.0.0.0"; - // Using port zero will cause the system to generate a port. - private static final int RTP_LOCAL_PORT = 0; - private static final String RTP_BIND_ADDRESS = RTP_ANY_INCOMING_IPV4 + ":" + RTP_LOCAL_PORT; /** The track ID associated with the Loadable. */ public final int trackId; @@ -141,33 +133,13 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; @Override public void load() throws IOException { - @Nullable RtpDataChannel firstDataChannel = null; - @Nullable RtpDataChannel secondDataChannel = null; - + @Nullable RtpDataChannel dataChannel = null; try { - // Open and set up the data channel. - // From RFC3550 Section 11: "For UDP and similar protocols, RTP SHOULD use an even destination - // port number and the corresponding RTCP stream SHOULD use the next higher (odd) destination - // port number". Some RTSP servers are strict about this rule. We open a data channel first, - // and depending its port number, open the next data channel with a port number that is either - // the higher or the lower. - firstDataChannel = rtpDataChannelFactory.createDataChannel(); - firstDataChannel.open(new DataSpec(Uri.parse(RTP_BIND_ADDRESS))); + dataChannel = rtpDataChannelFactory.createAndOpenDataChannel(trackId); + String transport = dataChannel.getTransport(); - int firstPort = firstDataChannel.getLocalPort(); - boolean isFirstPortNumberEven = (firstPort % 2 == 0); - int secondPort = isFirstPortNumberEven ? firstPort + 1 : firstPort - 1; - - // RTCP always uses the immediate next port. 
- secondDataChannel = rtpDataChannelFactory.createDataChannel(); - secondDataChannel.open(new DataSpec(Uri.parse(RTP_ANY_INCOMING_IPV4 + ":" + secondPort))); - - // RTP data port is always the lower and even-numbered port. - RtpDataChannel dataChannel = isFirstPortNumberEven ? firstDataChannel : secondDataChannel; - int dataPort = dataChannel.getLocalPort(); - int rtcpPort = dataPort + 1; - String transport = Util.formatInvariant(DEFAULT_TRANSPORT_FORMAT, dataPort, rtcpPort); - playbackThreadHandler.post(() -> eventListener.onTransportReady(transport)); + RtpDataChannel finalDataChannel = dataChannel; + playbackThreadHandler.post(() -> eventListener.onTransportReady(transport, finalDataChannel)); // Sets up the extractor. ExtractorInput extractorInput = @@ -181,12 +153,10 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; extractor.seek(nextRtpTimestamp, pendingSeekPositionUs); pendingSeekPositionUs = C.TIME_UNSET; } - extractor.read(extractorInput, /* seekPosition= */ new PositionHolder()); } } finally { - closeQuietly(firstDataChannel); - closeQuietly(secondDataChannel); + Util.closeQuietly(dataChannel); } } diff --git a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtpUtils.java b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtpUtils.java new file mode 100644 index 0000000000..d37163f024 --- /dev/null +++ b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtpUtils.java @@ -0,0 +1,34 @@ +/* + * Copyright 2021 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.android.exoplayer2.source.rtsp; + +import android.net.Uri; +import com.google.android.exoplayer2.upstream.DataSpec; +import com.google.android.exoplayer2.util.Util; + +/** Utility methods for RTP. */ +public final class RtpUtils { + + private static final String RTP_ANY_INCOMING_IPV4 = "rtp://0.0.0.0"; + + /** Returns the {@link DataSpec} with the {@link Uri} for incoming RTP connection. */ + public static DataSpec getIncomingRtpDataSpec(int portNumber) { + return new DataSpec( + Uri.parse(Util.formatInvariant("%s:%d", RTP_ANY_INCOMING_IPV4, portNumber))); + } + + private RtpUtils() {} +} diff --git a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspClient.java b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspClient.java index c64bf343c6..b18b707459 100644 --- a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspClient.java +++ b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspClient.java @@ -91,14 +91,15 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; private final SessionInfoListener sessionInfoListener; private final Uri uri; @Nullable private final String userAgent; - private final RtspMessageChannel messageChannel; private final ArrayDeque pendingSetupRtpLoadInfos; // TODO(b/172331505) Add a timeout monitor for pending requests. 
private final SparseArray pendingRequests; private final MessageSender messageSender; + private final SparseArray transferRtpDataChannelMap; + private RtspMessageChannel messageChannel; private @MonotonicNonNull PlaybackEventListener playbackEventListener; - private @MonotonicNonNull String sessionId; + @Nullable private String sessionId; @Nullable private KeepAliveMonitor keepAliveMonitor; private boolean hasUpdatedTimelineAndTracks; private long pendingSeekPositionUs; @@ -121,11 +122,12 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; this.sessionInfoListener = sessionInfoListener; this.uri = RtspMessageUtil.removeUserInfo(uri); this.userAgent = userAgent; - messageChannel = new RtspMessageChannel(new MessageListener()); pendingSetupRtpLoadInfos = new ArrayDeque<>(); pendingRequests = new SparseArray<>(); messageSender = new MessageSender(); + transferRtpDataChannelMap = new SparseArray<>(); pendingSeekPositionUs = C.TIME_UNSET; + messageChannel = new RtspMessageChannel(new MessageListener()); } /** @@ -137,11 +139,8 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; * @throws IOException When failed to open a connection to the supplied {@link Uri}. */ public void start() throws IOException { - checkArgument(uri.getHost() != null); - int rtspPort = uri.getPort() > 0 ? uri.getPort() : DEFAULT_RTSP_PORT; - Socket socket = SocketFactory.getDefault().createSocket(checkNotNull(uri.getHost()), rtspPort); try { - messageChannel.openSocket(socket); + messageChannel.openSocket(openSocket()); } catch (IOException e) { Util.closeQuietly(messageChannel); throw e; @@ -149,6 +148,13 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; messageSender.sendOptionsRequest(uri, sessionId); } + /** Opens a {@link Socket} to the session {@link #uri}. */ + private Socket openSocket() throws IOException { + checkArgument(uri.getHost() != null); + int rtspPort = uri.getPort() > 0 ? 
uri.getPort() : DEFAULT_RTSP_PORT; + return SocketFactory.getDefault().createSocket(checkNotNull(uri.getHost()), rtspPort); + } + /** Sets the {@link PlaybackEventListener} to receive playback events. */ public void setPlaybackEventListener(PlaybackEventListener playbackEventListener) { this.playbackEventListener = playbackEventListener; @@ -202,6 +208,27 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; messageChannel.close(); } + /** + * Sets up a new playback session using TCP as RTP lower transport. + * + *

This mode is also known as "RTP-over-RTSP". + */ + public void retryWithRtpTcp() { + try { + close(); + messageChannel = new RtspMessageChannel(new MessageListener()); + messageChannel.openSocket(openSocket()); + sessionId = null; + } catch (IOException e) { + checkNotNull(playbackEventListener).onPlaybackError(new RtspPlaybackException(e)); + } + } + + /** Registers an {@link RtpDataChannel} to receive RTSP interleaved data. */ + public void registerInterleavedDataChannel(RtpDataChannel rtpDataChannel) { + transferRtpDataChannelMap.put(rtpDataChannel.getLocalPort(), rtpDataChannel); + } + private void continueSetupRtspTrack() { @Nullable RtpLoadInfo loadInfo = pendingSetupRtpLoadInfos.pollFirst(); if (loadInfo == null) { @@ -413,6 +440,14 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; } } + @Override + public void onInterleavedBinaryDataReceived(byte[] data, int channel) { + @Nullable RtpDataChannel dataChannel = transferRtpDataChannelMap.get(channel); + if (dataChannel != null) { + dataChannel.write(data); + } + } + // Response handlers must only be called only on 200 (OK) responses. 
public void onOptionsResponseReceived(RtspOptionsResponse response) { diff --git a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMediaPeriod.java b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMediaPeriod.java index bf54f1775d..c88ae3bb1f 100644 --- a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMediaPeriod.java +++ b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMediaPeriod.java @@ -47,6 +47,7 @@ import com.google.android.exoplayer2.trackselection.ExoTrackSelection; import com.google.android.exoplayer2.trackselection.TrackSelection; import com.google.android.exoplayer2.upstream.Allocator; import com.google.android.exoplayer2.upstream.Loader; +import com.google.android.exoplayer2.upstream.Loader.LoadErrorAction; import com.google.android.exoplayer2.upstream.Loader.Loadable; import com.google.android.exoplayer2.util.Util; import com.google.common.collect.ImmutableList; @@ -61,7 +62,6 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; /** A {@link MediaPeriod} that loads an RTSP stream. */ /* package */ final class RtspMediaPeriod implements MediaPeriod { - private static final String TAG = "RtspMediaPeriod"; /** The maximum times to retry if the underlying data channel failed to bind. */ private static final int PORT_BINDING_MAX_RETRY_COUNT = 3; @@ -70,7 +70,6 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; private final InternalListener internalListener; private final RtspClient rtspClient; - private final RtpDataChannel.Factory rtpDataChannelFactory; private final List rtspLoaderWrappers; private final List selectedLoadInfos; @@ -85,6 +84,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; private boolean prepared; private boolean trackSelected; private int portBindingRetryCount; + private boolean hasRetriedWithRtpTcp; /** * Creates an RTSP media period. 
@@ -106,11 +106,11 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; rtspLoaderWrappers = new ArrayList<>(rtspTracks.size()); this.rtspClient = rtspClient; this.rtspClient.setPlaybackEventListener(internalListener); - this.rtpDataChannelFactory = rtpDataChannelFactory; for (int i = 0; i < rtspTracks.size(); i++) { RtspMediaTrack rtspMediaTrack = rtspTracks.get(i); - rtspLoaderWrappers.add(new RtspLoaderWrapper(rtspMediaTrack, /* trackId= */ i)); + rtspLoaderWrappers.add( + new RtspLoaderWrapper(rtspMediaTrack, /* trackId= */ i, rtpDataChannelFactory)); } selectedLoadInfos = new ArrayList<>(rtspTracks.size()); pendingSeekPositionUs = C.TIME_UNSET; @@ -434,7 +434,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; preparationError = error; } else { if (error.getCause() instanceof SocketTimeoutException) { - handleSocketTimeout(loadable); + return handleSocketTimeout(loadable); } else if (error.getCause() instanceof BindException) { // Allow for retry on RTP port open failure by catching BindException. Two ports are // opened for each RTP stream, the first port number is auto assigned by the system, while @@ -511,22 +511,63 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; } /** Handles the {@link Loadable} whose {@link RtpDataChannel} timed out. */ - private void handleSocketTimeout(RtpDataLoadable loadable) { + private LoadErrorAction handleSocketTimeout(RtpDataLoadable loadable) { // TODO(b/172331505) Allow for retry when loading is not ending. if (getBufferedPositionUs() == Long.MIN_VALUE) { - // Raise exception if no sample has been received so far. - playbackException = new RtspPlaybackException("Possible dropped UDP connection."); - return; + // Retry playback with TCP if no sample has been received so far. + if (!hasRetriedWithRtpTcp) { + retryWithRtpTcp(); + hasRetriedWithRtpTcp = true; + } + // Don't retry with the current UDP backed loadables. 
+ return Loader.DONT_RETRY; } for (int i = 0; i < rtspLoaderWrappers.size(); i++) { RtspLoaderWrapper loaderWrapper = rtspLoaderWrappers.get(i); if (loaderWrapper.loadInfo.loadable == loadable) { loaderWrapper.cancelLoad(); - return; + break; } } playbackException = new RtspPlaybackException("Unknown loadable timed out."); + return Loader.DONT_RETRY; + } + } + + private void retryWithRtpTcp() { + rtspClient.retryWithRtpTcp(); + + RtpDataChannel.Factory rtpDataChannelFactory = new TransferRtpDataChannelFactory(); + ArrayList newLoaderWrappers = new ArrayList<>(rtspLoaderWrappers.size()); + ArrayList newSelectedLoadInfos = new ArrayList<>(selectedLoadInfos.size()); + + for (int i = 0; i < rtspLoaderWrappers.size(); i++) { + RtspLoaderWrapper loaderWrapper = rtspLoaderWrappers.get(i); + + RtspLoaderWrapper newLoaderWrapper = + new RtspLoaderWrapper( + loaderWrapper.loadInfo.mediaTrack, /* trackId= */ i, rtpDataChannelFactory); + newLoaderWrappers.add(newLoaderWrapper); + newLoaderWrapper.startLoading(); + + if (selectedLoadInfos.contains(loaderWrapper.loadInfo)) { + newSelectedLoadInfos.add(newLoaderWrapper.loadInfo); + } + } + + // Switch to new LoaderWrappers. + ImmutableList oldRtspLoaderWrappers = + ImmutableList.copyOf(rtspLoaderWrappers); + rtspLoaderWrappers.clear(); + rtspLoaderWrappers.addAll(newLoaderWrappers); + selectedLoadInfos.clear(); + selectedLoadInfos.addAll(newSelectedLoadInfos); + + // Cancel old loadable wrappers after switching, so that buffered position is always read from + // active sample queues. + for (int i = 0; i < oldRtspLoaderWrappers.size(); i++) { + oldRtspLoaderWrappers.get(i).cancelLoad(); } } @@ -576,9 +617,10 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; * *

Instances must be {@link #release() released} after loadings conclude. */ - public RtspLoaderWrapper(RtspMediaTrack mediaTrack, int trackId) { - loadInfo = new RtpLoadInfo(mediaTrack, trackId); - loader = new Loader("ExoPlayer:RtspMediaPeriod:RtspDataLoader " + trackId); + public RtspLoaderWrapper( + RtspMediaTrack mediaTrack, int trackId, RtpDataChannel.Factory rtpDataChannelFactory) { + loadInfo = new RtpLoadInfo(mediaTrack, trackId, rtpDataChannelFactory); + loader = new Loader("ExoPlayer:RtspMediaPeriod:RtspLoaderWrapper " + trackId); sampleQueue = SampleQueue.createWithoutDrm(allocator); sampleQueue.setUpstreamFormatChangeListener(internalListener); } @@ -639,14 +681,21 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull; @Nullable private String transport; /** Creates a new instance. */ - public RtpLoadInfo(RtspMediaTrack mediaTrack, int trackId) { + public RtpLoadInfo( + RtspMediaTrack mediaTrack, int trackId, RtpDataChannel.Factory rtpDataChannelFactory) { this.mediaTrack = mediaTrack; RtpDataLoadable.EventListener transportEventListener = - (transport) -> { - this.transport = transport; + (transport, rtpDataChannel) -> { + RtpLoadInfo.this.transport = transport; + + if (rtpDataChannel.usesSidebandBinaryData()) { + rtspClient.registerInterleavedDataChannel(rtpDataChannel); + } + maybeSetupTracks(); }; + this.loadable = new RtpDataLoadable( trackId, diff --git a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMediaSource.java b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMediaSource.java index e4e49306b7..9596be328f 100644 --- a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMediaSource.java +++ b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/RtspMediaSource.java @@ -150,7 +150,7 @@ public final class RtspMediaSource extends BaseMediaSource { private RtspMediaSource(MediaItem mediaItem) { this.mediaItem = mediaItem; - rtpDataChannelFactory 
= RtpDataChannel.DEFAULT_FACTORY; + rtpDataChannelFactory = new UdpDataSourceRtpDataChannelFactory(); } @Override diff --git a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/TransferRtpDataChannel.java b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/TransferRtpDataChannel.java new file mode 100644 index 0000000000..fc1bcd63ff --- /dev/null +++ b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/TransferRtpDataChannel.java @@ -0,0 +1,125 @@ +/* + * Copyright 2021 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.android.exoplayer2.source.rtsp; + +import static com.google.android.exoplayer2.util.Assertions.checkState; +import static java.lang.Math.min; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +import android.net.Uri; +import androidx.annotation.Nullable; +import com.google.android.exoplayer2.C; +import com.google.android.exoplayer2.upstream.BaseDataSource; +import com.google.android.exoplayer2.upstream.DataSpec; +import com.google.android.exoplayer2.util.Util; +import java.io.IOException; +import java.net.SocketTimeoutException; +import java.util.Arrays; +import java.util.concurrent.LinkedBlockingQueue; + +/** An {@link RtpDataChannel} that transfers received data in-memory. 
*/ +/* package */ final class TransferRtpDataChannel extends BaseDataSource implements RtpDataChannel { + + private static final String DEFAULT_TCP_TRANSPORT_FORMAT = + "RTP/AVP/TCP;unicast;interleaved=%d-%d"; + private static final long TIMEOUT_MS = 8_000; + + private final LinkedBlockingQueue packetQueue; + + private byte[] unreadData; + private int channelNumber; + + /** Creates a new instance. */ + public TransferRtpDataChannel() { + super(/* isNetwork= */ true); + packetQueue = new LinkedBlockingQueue<>(); + unreadData = new byte[0]; + channelNumber = C.INDEX_UNSET; + } + + @Override + public String getTransport() { + checkState(channelNumber != C.INDEX_UNSET); // Assert open() is called. + return Util.formatInvariant(DEFAULT_TCP_TRANSPORT_FORMAT, channelNumber, channelNumber + 1); + } + + @Override + public int getLocalPort() { + return channelNumber; + } + + @Override + public boolean usesSidebandBinaryData() { + return true; + } + + @Override + public long open(DataSpec dataSpec) { + this.channelNumber = dataSpec.uri.getPort(); + return C.LENGTH_UNSET; + } + + @Override + public void close() {} + + @Nullable + @Override + public Uri getUri() { + return null; + } + + @Override + public int read(byte[] target, int offset, int length) throws IOException { + if (length == 0) { + return 0; + } + + int bytesRead = 0; + int bytesToRead = min(length, unreadData.length); + System.arraycopy(unreadData, /* srcPos= */ 0, target, offset, bytesToRead); + bytesRead += bytesToRead; + unreadData = Arrays.copyOfRange(unreadData, bytesToRead, unreadData.length); + + if (bytesRead == length) { + return bytesRead; + } + + @Nullable byte[] data; + try { + // TODO(internal b/172331505) Consider move the receiving timeout logic to an upper level + // (maybe RtspClient). There is no actual socket receiving here. 
+ data = packetQueue.poll(TIMEOUT_MS, MILLISECONDS); + if (data == null) { + throw new IOException(new SocketTimeoutException()); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return C.RESULT_END_OF_INPUT; + } + + bytesToRead = min(length - bytesRead, data.length); + System.arraycopy(data, /* srcPos= */ 0, target, offset + bytesRead, bytesToRead); + if (bytesToRead < data.length) { + unreadData = Arrays.copyOfRange(data, bytesToRead, data.length); + } + return bytesRead + bytesToRead; + } + + @Override + public void write(byte[] buffer) { + packetQueue.add(buffer); + } +} diff --git a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/TransferRtpDataChannelFactory.java b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/TransferRtpDataChannelFactory.java new file mode 100644 index 0000000000..31860a8fd6 --- /dev/null +++ b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/TransferRtpDataChannelFactory.java @@ -0,0 +1,29 @@ +/* + * Copyright 2021 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.android.exoplayer2.source.rtsp; + +/** Factory for {@link TransferRtpDataChannel}. 
*/ +/* package */ final class TransferRtpDataChannelFactory implements RtpDataChannel.Factory { + + private static final int INTERLEAVED_CHANNELS_PER_TRACK = 2; + + @Override + public RtpDataChannel createAndOpenDataChannel(int trackId) { + TransferRtpDataChannel dataChannel = new TransferRtpDataChannel(); + dataChannel.open(RtpUtils.getIncomingRtpDataSpec(trackId * INTERLEAVED_CHANNELS_PER_TRACK)); + return dataChannel; + } +} diff --git a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/UdpDataSourceRtpDataChannel.java b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/UdpDataSourceRtpDataChannel.java index 429be6634d..3f56d4ac36 100644 --- a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/UdpDataSourceRtpDataChannel.java +++ b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/UdpDataSourceRtpDataChannel.java @@ -15,25 +15,44 @@ */ package com.google.android.exoplayer2.source.rtsp; +import static com.google.android.exoplayer2.util.Assertions.checkArgument; +import static com.google.android.exoplayer2.util.Assertions.checkState; + import android.net.Uri; import androidx.annotation.Nullable; +import com.google.android.exoplayer2.C; import com.google.android.exoplayer2.upstream.DataSpec; import com.google.android.exoplayer2.upstream.TransferListener; import com.google.android.exoplayer2.upstream.UdpDataSource; +import com.google.android.exoplayer2.util.Util; import java.io.IOException; /** An {@link RtpDataChannel} for UDP transport. */ /* package */ final class UdpDataSourceRtpDataChannel implements RtpDataChannel { + + private static final String DEFAULT_UDP_TRANSPORT_FORMAT = "RTP/AVP;unicast;client_port=%d-%d"; + private final UdpDataSource dataSource; + /** The associated RTCP channel; {@code null} if the current channel is an RTCP channel. */ + @Nullable private UdpDataSourceRtpDataChannel rtcpChannel; + /** Creates a new instance. 
*/ public UdpDataSourceRtpDataChannel() { dataSource = new UdpDataSource(); } + @Override + public String getTransport() { + int dataPortNumber = getLocalPort(); + checkState(dataPortNumber != C.INDEX_UNSET); // Assert open() is called. + return Util.formatInvariant(DEFAULT_UDP_TRANSPORT_FORMAT, dataPortNumber, dataPortNumber + 1); + } + @Override public int getLocalPort() { - return dataSource.getLocalPort(); + int port = dataSource.getLocalPort(); + return port == UdpDataSource.UDP_PORT_UNSET ? C.INDEX_UNSET : port; } @Override @@ -55,10 +74,33 @@ import java.io.IOException; @Override public void close() { dataSource.close(); + + if (rtcpChannel != null) { + rtcpChannel.close(); + } } @Override public int read(byte[] target, int offset, int length) throws IOException { return dataSource.read(target, offset, length); } + + @Override + public boolean usesSidebandBinaryData() { + return false; + } + + /** + * Writing to a {@link UdpDataSource} backed {@link RtpDataChannel} is not supported at the + * moment. + */ + @Override + public void write(byte[] buffer) { + throw new UnsupportedOperationException(); + } + + public void setRtcpChannel(UdpDataSourceRtpDataChannel rtcpChannel) { + checkArgument(this != rtcpChannel); + this.rtcpChannel = rtcpChannel; + } } diff --git a/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/UdpDataSourceRtpDataChannelFactory.java b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/UdpDataSourceRtpDataChannelFactory.java new file mode 100644 index 0000000000..ccb201dbe9 --- /dev/null +++ b/library/rtsp/src/main/java/com/google/android/exoplayer2/source/rtsp/UdpDataSourceRtpDataChannelFactory.java @@ -0,0 +1,56 @@ +/* + * Copyright 2021 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.android.exoplayer2.source.rtsp; + +import com.google.android.exoplayer2.util.Util; +import java.io.IOException; + +/** Factory for {@link UdpDataSourceRtpDataChannel}. */ +/* package */ final class UdpDataSourceRtpDataChannelFactory implements RtpDataChannel.Factory { + + @Override + public RtpDataChannel createAndOpenDataChannel(int trackId) throws IOException { + UdpDataSourceRtpDataChannel firstChannel = new UdpDataSourceRtpDataChannel(); + UdpDataSourceRtpDataChannel secondChannel = new UdpDataSourceRtpDataChannel(); + + try { + // From RFC3550 Section 11: "For UDP and similar protocols, RTP SHOULD use an even destination + // port number and the corresponding RTCP stream SHOULD use the next higher (odd) destination + // port number". Some RTSP servers are strict about this rule. We open a data channel first, + // and depending its port number, open the next data channel with a port number that is either + // the higher or the lower. + + // Using port zero will cause the system to generate a port. + firstChannel.open(RtpUtils.getIncomingRtpDataSpec(/* portNumber= */ 0)); + int firstPort = firstChannel.getLocalPort(); + boolean isFirstPortEven = firstPort % 2 == 0; + int portToOpen = isFirstPortEven ? 
firstPort + 1 : firstPort - 1; + secondChannel.open(RtpUtils.getIncomingRtpDataSpec(/* portNumber= */ portToOpen)); + + if (isFirstPortEven) { + firstChannel.setRtcpChannel(secondChannel); + return firstChannel; + } else { + secondChannel.setRtcpChannel(firstChannel); + return secondChannel; + } + } catch (IOException e) { + Util.closeQuietly(firstChannel); + Util.closeQuietly(secondChannel); + throw e; + } + } +} diff --git a/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/RtspMediaPeriodTest.java b/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/RtspMediaPeriodTest.java index a93fe3a897..16b48c06c1 100644 --- a/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/RtspMediaPeriodTest.java +++ b/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/RtspMediaPeriodTest.java @@ -67,7 +67,7 @@ public class RtspMediaPeriodTest { .build(), Uri.parse("rtsp://localhost/test"))), PLACEHOLDER_RTSP_CLIENT, - RtpDataChannel.DEFAULT_FACTORY); + new UdpDataSourceRtpDataChannelFactory()); AtomicBoolean prepareCallbackCalled = new AtomicBoolean(false); rtspMediaPeriod.prepare( @@ -95,7 +95,7 @@ public class RtspMediaPeriodTest { new DefaultAllocator(/* trimOnReset= */ true, C.DEFAULT_BUFFER_SEGMENT_SIZE), ImmutableList.of(), PLACEHOLDER_RTSP_CLIENT, - RtpDataChannel.DEFAULT_FACTORY); + new UdpDataSourceRtpDataChannelFactory()); assertThat(rtspMediaPeriod.getBufferedPositionUs()).isEqualTo(C.TIME_END_OF_SOURCE); } diff --git a/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/TransferRtpDataChannelTest.java b/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/TransferRtpDataChannelTest.java new file mode 100644 index 0000000000..93c921456e --- /dev/null +++ b/library/rtsp/src/test/java/com/google/android/exoplayer2/source/rtsp/TransferRtpDataChannelTest.java @@ -0,0 +1,153 @@ +/* + * Copyright 2021 The Android Open Source Project + * + * Licensed under the Apache License, 
Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.google.android.exoplayer2.source.rtsp;

import static com.google.android.exoplayer2.testutil.TestUtil.buildTestData;
import static com.google.common.truth.Truth.assertThat;

import androidx.test.ext.junit.runners.AndroidJUnit4;
import com.google.common.primitives.Bytes;
import java.util.Arrays;
import org.junit.Test;
import org.junit.runner.RunWith;

/**
 * Unit test for {@link TransferRtpDataChannel}.
 *
 * <p>Every test writes one or two byte arrays into the channel and then checks that successive
 * {@code read} calls hand the same bytes back in write order, regardless of how the destination
 * buffer size compares to the size of the written arrays.
 */
@RunWith(AndroidJUnit4.class)
public class TransferRtpDataChannelTest {

  /** A destination larger than the written array receives the whole array at its start. */
  @Test
  public void read_withLargeEnoughBuffer_reads() throws Exception {
    byte[] randomBytes = buildTestData(20);
    byte[] buffer = new byte[40];
    TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel();
    transferRtpDataChannel.write(randomBytes);

    transferRtpDataChannel.read(buffer, /* offset= */ 0, buffer.length);

    // Only the first 20 bytes of the destination are compared; the rest was never written to.
    assertThat(Arrays.copyOfRange(buffer, /* from= */ 0, /* to= */ 20)).isEqualTo(randomBytes);
  }

  /** A 20-byte array is drained through an 8-byte destination in reads of 8, 8 and 4 bytes. */
  @Test
  public void read_withSmallBufferEnoughBuffer_readsThreeTimes() throws Exception {
    byte[] randomBytes = buildTestData(20);
    byte[] buffer = new byte[8];
    TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel();
    transferRtpDataChannel.write(randomBytes);

    transferRtpDataChannel.read(buffer, /* offset= */ 0, buffer.length);
    assertThat(buffer).isEqualTo(Arrays.copyOfRange(randomBytes, /* from= */ 0, /* to= */ 8));

    transferRtpDataChannel.read(buffer, /* offset= */ 0, buffer.length);
    assertThat(buffer).isEqualTo(Arrays.copyOfRange(randomBytes, /* from= */ 8, /* to= */ 16));

    // Only 4 bytes are left, so the final read asks for exactly that many.
    transferRtpDataChannel.read(buffer, /* offset= */ 0, /* length= */ 4);
    assertThat(Arrays.copyOfRange(buffer, /* from= */ 0, /* to= */ 4))
        .isEqualTo(Arrays.copyOfRange(randomBytes, /* from= */ 16, /* to= */ 20));
  }

  /** A 40-byte array is drained through a 20-byte destination in two full reads. */
  @Test
  public void read_withSmallBuffer_reads() throws Exception {
    byte[] randomBytes = buildTestData(40);
    byte[] buffer = new byte[20];
    TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel();
    transferRtpDataChannel.write(randomBytes);

    transferRtpDataChannel.read(buffer, /* offset= */ 0, buffer.length);
    assertThat(buffer).isEqualTo(Arrays.copyOfRange(randomBytes, /* from= */ 0, /* to= */ 20));

    transferRtpDataChannel.read(buffer, /* offset= */ 0, buffer.length);
    assertThat(buffer).isEqualTo(Arrays.copyOfRange(randomBytes, /* from= */ 20, /* to= */ 40));
  }

  /**
   * A second array is written while the first is only half consumed; a 40-byte read then spans
   * the boundary between the two arrays.
   */
  @Test
  public void read_withSmallAndModerateBufferAndSubsequentProducerWrite_reads() throws Exception {
    // Distinct seeds make the two payloads differ, so an expectation built from the wrong
    // payload fails. NOTE(review): buildTestData(int) is presumed to derive its seed from the
    // length, which would make two 40-byte payloads identical - confirm against TestUtil.
    byte[] randomBytes1 = buildTestData(/* length= */ 40, /* seed= */ 1);
    byte[] randomBytes2 = buildTestData(/* length= */ 40, /* seed= */ 2);
    byte[] smallBuffer = new byte[20];
    byte[] bigBuffer = new byte[40];
    TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel();
    transferRtpDataChannel.write(randomBytes1);

    transferRtpDataChannel.read(smallBuffer, /* offset= */ 0, smallBuffer.length);
    assertThat(smallBuffer)
        .isEqualTo(Arrays.copyOfRange(randomBytes1, /* from= */ 0, /* to= */ 20));

    transferRtpDataChannel.write(randomBytes2);

    // Read the remaining 20 bytes in randomBytes1, and 20 bytes from randomBytes2.
    transferRtpDataChannel.read(bigBuffer, /* offset= */ 0, bigBuffer.length);
    assertThat(bigBuffer)
        .isEqualTo(
            Bytes.concat(
                Arrays.copyOfRange(randomBytes1, /* from= */ 20, /* to= */ 40),
                Arrays.copyOfRange(randomBytes2, /* from= */ 0, /* to= */ 20)));

    // Read the remaining 20 bytes in randomBytes2.
    transferRtpDataChannel.read(smallBuffer, /* offset= */ 0, smallBuffer.length);
    assertThat(smallBuffer)
        .isEqualTo(Arrays.copyOfRange(randomBytes2, /* from= */ 20, /* to= */ 40));
  }

  /**
   * The first read leaves 10 bytes of the first array behind; after a second write, a 30-byte
   * read must return those 10 bytes followed by the first 20 bytes of the second array.
   */
  @Test
  public void read_withSmallAndBigBufferWithPartialReadAndSubsequentProducerWrite_reads()
      throws Exception {
    // Distinct seeds make the two payloads differ, so an expectation built from the wrong
    // payload fails. NOTE(review): buildTestData(int) is presumed to derive its seed from the
    // length, which would make two 40-byte payloads identical - confirm against TestUtil.
    byte[] randomBytes1 = buildTestData(/* length= */ 40, /* seed= */ 1);
    byte[] randomBytes2 = buildTestData(/* length= */ 40, /* seed= */ 2);
    byte[] smallBuffer = new byte[30];
    byte[] bigBuffer = new byte[30];
    TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel();
    transferRtpDataChannel.write(randomBytes1);

    transferRtpDataChannel.read(smallBuffer, /* offset= */ 0, smallBuffer.length);
    assertThat(smallBuffer)
        .isEqualTo(Arrays.copyOfRange(randomBytes1, /* from= */ 0, /* to= */ 30));

    transferRtpDataChannel.write(randomBytes2);

    // Read 30 bytes to big buffer: the last 10 bytes of randomBytes1, then the first 20 bytes of
    // randomBytes2. The second slice has to come from randomBytes2 because randomBytes1 has
    // nothing left after its final 10 bytes.
    transferRtpDataChannel.read(bigBuffer, /* offset= */ 0, bigBuffer.length);
    assertThat(bigBuffer)
        .isEqualTo(
            Bytes.concat(
                Arrays.copyOfRange(randomBytes1, /* from= */ 30, /* to= */ 40),
                Arrays.copyOfRange(randomBytes2, /* from= */ 0, /* to= */ 20)));

    // Read the remaining 20 bytes to big buffer.
    transferRtpDataChannel.read(bigBuffer, /* offset= */ 0, /* length= */ 20);
    assertThat(Arrays.copyOfRange(bigBuffer, /* from= */ 0, /* to= */ 20))
        .isEqualTo(Arrays.copyOfRange(randomBytes2, /* from= */ 20, /* to= */ 40));
  }

  /**
   * A destination bigger than everything still pending (70 bytes of space for 60 pending bytes)
   * receives the rest of the first array followed by the whole second array.
   */
  @Test
  public void read_withSmallAndBigBufferAndSubsequentProducerWrite_reads() throws Exception {
    // Distinct seeds make the two payloads differ, so an expectation built from the wrong
    // payload fails. NOTE(review): buildTestData(int) is presumed to derive its seed from the
    // length, which would make two 40-byte payloads identical - confirm against TestUtil.
    byte[] randomBytes1 = buildTestData(/* length= */ 40, /* seed= */ 1);
    byte[] randomBytes2 = buildTestData(/* length= */ 40, /* seed= */ 2);
    byte[] smallBuffer = new byte[20];
    byte[] bigBuffer = new byte[70];
    TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel();
    transferRtpDataChannel.write(randomBytes1);

    transferRtpDataChannel.read(smallBuffer, /* offset= */ 0, smallBuffer.length);
    assertThat(smallBuffer)
        .isEqualTo(Arrays.copyOfRange(randomBytes1, /* from= */ 0, /* to= */ 20));

    transferRtpDataChannel.write(randomBytes2);

    transferRtpDataChannel.read(bigBuffer, /* offset= */ 0, bigBuffer.length);
    // Only the first 60 bytes are compared: 20 left over from randomBytes1 plus all 40 bytes of
    // randomBytes2.
    assertThat(Arrays.copyOfRange(bigBuffer, /* from= */ 0, /* to= */ 60))
        .isEqualTo(
            Bytes.concat(
                Arrays.copyOfRange(randomBytes1, /* from= */ 20, /* to= */ 40), randomBytes2));
  }
}