Improve timeout handling and allow customizing the timeout.

Previously, a SocketTimeoutException was used to signal the end of the stream
that is caused by "no RTP packets received for a while". However, such
signaling is inappropriate under TransferRtpDataChannel, or FakeRtpDataChannel
in RtspPlaybackTests.

Hence, the signaling of end of stream is changed to use RESULT_END_OF_INPUT.
The RtpDataChannel implementations will still block until a set timeout, but
will return a C.RESULT_END_OF_INPUT should a timeout occur, instead of
throwing a nested SocketTimeoutException.

This also allows customization of the timeout amount, in
RtspMediaSource.Factory.

PiperOrigin-RevId: 380981534
This commit is contained in:
claincly 2021-06-23 09:47:11 +01:00 committed by Ian Baker
parent e4bd2e213f
commit baa9a367e2
11 changed files with 162 additions and 77 deletions

View file

@ -30,9 +30,16 @@ import java.io.IOException;
/**
* Creates a new {@link RtpDataChannel} instance for RTP data transfer.
*
* @param trackId The track ID.
* @throws IOException If the data channels failed to open.
*/
RtpDataChannel createAndOpenDataChannel(int trackId) throws IOException;
/** Returns a fallback {@code Factory}, {@code null} when there is no fallback available. */
@Nullable
default Factory createFallbackDataChannelFactory() {
return null;
}
}
/** Returns the RTSP transport header for this {@link RtpDataChannel} */

View file

@ -22,6 +22,7 @@ import android.os.Handler;
import androidx.annotation.Nullable;
import com.google.android.exoplayer2.C;
import com.google.android.exoplayer2.extractor.DefaultExtractorInput;
import com.google.android.exoplayer2.extractor.Extractor;
import com.google.android.exoplayer2.extractor.ExtractorInput;
import com.google.android.exoplayer2.extractor.ExtractorOutput;
import com.google.android.exoplayer2.extractor.PositionHolder;
@ -153,7 +154,13 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
extractor.seek(nextRtpTimestamp, pendingSeekPositionUs);
pendingSeekPositionUs = C.TIME_UNSET;
}
extractor.read(extractorInput, /* seekPosition= */ new PositionHolder());
@Extractor.ReadResult
int readResult = extractor.read(extractorInput, /* seekPosition= */ new PositionHolder());
if (readResult == Extractor.RESULT_END_OF_INPUT) {
// Loading is finished.
break;
}
}
} finally {
Util.closeQuietly(dataChannel);

View file

@ -125,10 +125,10 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
// Reads one RTP packet at a time.
int bytesRead = input.read(rtpPacketScratchBuffer.getData(), 0, RtpPacket.MAX_SIZE);
if (bytesRead == RESULT_END_OF_INPUT) {
return RESULT_END_OF_INPUT;
if (bytesRead == C.RESULT_END_OF_INPUT) {
return Extractor.RESULT_END_OF_INPUT;
} else if (bytesRead == 0) {
return RESULT_CONTINUE;
return Extractor.RESULT_CONTINUE;
}
rtpPacketScratchBuffer.setPosition(0);

View file

@ -48,13 +48,11 @@ import com.google.android.exoplayer2.trackselection.ExoTrackSelection;
import com.google.android.exoplayer2.trackselection.TrackSelection;
import com.google.android.exoplayer2.upstream.Allocator;
import com.google.android.exoplayer2.upstream.Loader;
import com.google.android.exoplayer2.upstream.Loader.LoadErrorAction;
import com.google.android.exoplayer2.upstream.Loader.Loadable;
import com.google.android.exoplayer2.util.Util;
import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.net.BindException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
import org.checkerframework.checker.nullness.compatqual.NullableType;
@ -103,6 +101,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
* @param rtpDataChannelFactory A {@link RtpDataChannel.Factory} for {@link RtpDataChannel}.
* @param uri The RTSP playback {@link Uri}.
* @param listener A {@link Listener} to receive session information updates.
* @param userAgent The user agent.
*/
public RtspMediaPeriod(
Allocator allocator,
@ -432,7 +431,28 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
@Override
public void onLoadCompleted(
RtpDataLoadable loadable, long elapsedRealtimeMs, long loadDurationMs) {}
RtpDataLoadable loadable, long elapsedRealtimeMs, long loadDurationMs) {
// TODO(b/172331505) Allow for retry when loading is not ending.
if (getBufferedPositionUs() == 0) {
if (!isUsingRtpTcp) {
// Retry playback with TCP if no sample has been received so far, and we are not already
// using TCP. Retrying will setup new loadables, so will not retry with the current
// loadables.
retryWithRtpTcp();
isUsingRtpTcp = true;
}
return;
}
// Cancel the loader wrapper associated with the completed loadable.
for (int i = 0; i < rtspLoaderWrappers.size(); i++) {
RtspLoaderWrapper loaderWrapper = rtspLoaderWrappers.get(i);
if (loaderWrapper.loadInfo.loadable == loadable) {
loaderWrapper.cancelLoad();
break;
}
}
}
@Override
public void onLoadCanceled(
@ -458,9 +478,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
if (!prepared) {
preparationError = error;
} else {
if (error.getCause() instanceof SocketTimeoutException) {
return handleSocketTimeout(loadable);
} else if (error.getCause() instanceof BindException) {
if (error.getCause() instanceof BindException) {
// Allow for retry on RTP port open failure by catching BindException. Two ports are
// opened for each RTP stream, the first port number is auto assigned by the system, while
// the second is manually selected. It is thus possible that the second port fails to
@ -535,30 +553,6 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
playbackException = error;
}
/** Handles the {@link Loadable} whose {@link RtpDataChannel} timed out. */
private LoadErrorAction handleSocketTimeout(RtpDataLoadable loadable) {
// TODO(b/172331505) Allow for retry when loading is not ending.
if (getBufferedPositionUs() == 0) {
if (!isUsingRtpTcp) {
// Retry playback with TCP if no sample has been received so far, and we are not already
// using TCP. Retrying will setup new loadables, so will not retry with the current
// loadables.
retryWithRtpTcp();
isUsingRtpTcp = true;
}
return Loader.DONT_RETRY;
}
for (int i = 0; i < rtspLoaderWrappers.size(); i++) {
RtspLoaderWrapper loaderWrapper = rtspLoaderWrappers.get(i);
if (loaderWrapper.loadInfo.loadable == loadable) {
loaderWrapper.cancelLoad();
break;
}
}
return Loader.DONT_RETRY;
}
@Override
public void onSessionTimelineUpdated(
RtspSessionTiming timing, ImmutableList<RtspMediaTrack> tracks) {
@ -582,7 +576,15 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
private void retryWithRtpTcp() {
rtspClient.retryWithRtpTcp();
RtpDataChannel.Factory rtpDataChannelFactory = new TransferRtpDataChannelFactory();
@Nullable
RtpDataChannel.Factory fallbackRtpDataChannelFactory =
rtpDataChannelFactory.createFallbackDataChannelFactory();
if (fallbackRtpDataChannelFactory == null) {
playbackException =
new RtspPlaybackException("No fallback data channel factory for TCP retry");
return;
}
ArrayList<RtspLoaderWrapper> newLoaderWrappers = new ArrayList<>(rtspLoaderWrappers.size());
ArrayList<RtpLoadInfo> newSelectedLoadInfos = new ArrayList<>(selectedLoadInfos.size());
@ -593,7 +595,7 @@ import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
if (!loaderWrapper.canceled) {
RtspLoaderWrapper newLoaderWrapper =
new RtspLoaderWrapper(
loaderWrapper.loadInfo.mediaTrack, /* trackId= */ i, rtpDataChannelFactory);
loaderWrapper.loadInfo.mediaTrack, /* trackId= */ i, fallbackRtpDataChannelFactory);
newLoaderWrappers.add(newLoaderWrapper);
newLoaderWrapper.startLoading();
if (selectedLoadInfos.contains(loaderWrapper.loadInfo)) {

View file

@ -16,9 +16,11 @@
package com.google.android.exoplayer2.source.rtsp;
import static com.google.android.exoplayer2.util.Assertions.checkArgument;
import static com.google.android.exoplayer2.util.Assertions.checkNotNull;
import android.net.Uri;
import androidx.annotation.IntRange;
import androidx.annotation.Nullable;
import com.google.android.exoplayer2.C;
import com.google.android.exoplayer2.ExoPlayerLibraryInfo;
@ -45,6 +47,9 @@ public final class RtspMediaSource extends BaseMediaSource {
ExoPlayerLibraryInfo.registerModule("goog.exo.rtsp");
}
/** The default value for {@link Factory#setTimeoutMs}. */
public static final long DEFAULT_TIMEOUT_MS = 8000;
/**
* Factory for {@link RtspMediaSource}
*
@ -60,10 +65,12 @@ public final class RtspMediaSource extends BaseMediaSource {
*/
public static final class Factory implements MediaSourceFactory {
private long timeoutMs;
private String userAgent;
private boolean forceUseRtpTcp;
public Factory() {
timeoutMs = DEFAULT_TIMEOUT_MS;
userAgent = ExoPlayerLibraryInfo.VERSION_SLASHY;
}
@ -94,6 +101,21 @@ public final class RtspMediaSource extends BaseMediaSource {
return this;
}
/**
* Sets the timeout in milliseconds, the default value is {@link #DEFAULT_TIMEOUT_MS}.
*
* <p>A positive number of milliseconds to wait before lack of received RTP packets is treated
* as the end of input.
*
* @param timeoutMs The timeout measured in milliseconds.
* @return This Factory, for convenience.
*/
public Factory setTimeoutMs(@IntRange(from = 1) long timeoutMs) {
checkArgument(timeoutMs > 0);
this.timeoutMs = timeoutMs;
return this;
}
/** Does nothing. {@link RtspMediaSource} does not support DRM. */
@Override
public Factory setDrmSessionManagerProvider(
@ -161,8 +183,8 @@ public final class RtspMediaSource extends BaseMediaSource {
return new RtspMediaSource(
mediaItem,
forceUseRtpTcp
? new TransferRtpDataChannelFactory()
: new UdpDataSourceRtpDataChannelFactory(),
? new TransferRtpDataChannelFactory(timeoutMs)
: new UdpDataSourceRtpDataChannelFactory(timeoutMs),
userAgent);
}
}

View file

@ -26,8 +26,6 @@ import com.google.android.exoplayer2.source.rtsp.RtspMessageChannel.InterleavedB
import com.google.android.exoplayer2.upstream.BaseDataSource;
import com.google.android.exoplayer2.upstream.DataSpec;
import com.google.android.exoplayer2.util.Util;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;
@ -37,16 +35,22 @@ import java.util.concurrent.LinkedBlockingQueue;
private static final String DEFAULT_TCP_TRANSPORT_FORMAT =
"RTP/AVP/TCP;unicast;interleaved=%d-%d";
private static final long TIMEOUT_MS = 8_000;
private final LinkedBlockingQueue<byte[]> packetQueue;
private final long pollTimeoutMs;
private byte[] unreadData;
private int channelNumber;
/** Creates a new instance. */
public TransferRtpDataChannel() {
/**
* Creates a new instance.
*
* @param pollTimeoutMs The number of milliseconds which {@link #read} waits for a packet to be
* available. After the time has expired, {@link C#RESULT_END_OF_INPUT} is returned.
*/
public TransferRtpDataChannel(long pollTimeoutMs) {
super(/* isNetwork= */ true);
this.pollTimeoutMs = pollTimeoutMs;
packetQueue = new LinkedBlockingQueue<>();
unreadData = new byte[0];
channelNumber = C.INDEX_UNSET;
@ -84,7 +88,7 @@ import java.util.concurrent.LinkedBlockingQueue;
}
@Override
public int read(byte[] target, int offset, int length) throws IOException {
public int read(byte[] target, int offset, int length) {
if (length == 0) {
return 0;
}
@ -101,11 +105,9 @@ import java.util.concurrent.LinkedBlockingQueue;
@Nullable byte[] data;
try {
// TODO(internal b/172331505) Consider move the receiving timeout logic to an upper level
// (maybe RtspClient). There is no actual socket receiving here.
data = packetQueue.poll(TIMEOUT_MS, MILLISECONDS);
data = packetQueue.poll(pollTimeoutMs, MILLISECONDS);
if (data == null) {
throw new IOException(new SocketTimeoutException());
return C.RESULT_END_OF_INPUT;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();

View file

@ -20,9 +20,21 @@ package com.google.android.exoplayer2.source.rtsp;
private static final int INTERLEAVED_CHANNELS_PER_TRACK = 2;
private final long timeoutMs;
/**
* Creates a new instance.
*
* @param timeoutMs A positive number of milliseconds to wait before lack of received RTP packets
* is treated as the end of input.
*/
public TransferRtpDataChannelFactory(long timeoutMs) {
this.timeoutMs = timeoutMs;
}
@Override
public RtpDataChannel createAndOpenDataChannel(int trackId) {
TransferRtpDataChannel dataChannel = new TransferRtpDataChannel();
TransferRtpDataChannel dataChannel = new TransferRtpDataChannel(timeoutMs);
dataChannel.open(RtpUtils.getIncomingRtpDataSpec(trackId * INTERLEAVED_CHANNELS_PER_TRACK));
return dataChannel;
}

View file

@ -25,7 +25,9 @@ import com.google.android.exoplayer2.upstream.DataSpec;
import com.google.android.exoplayer2.upstream.TransferListener;
import com.google.android.exoplayer2.upstream.UdpDataSource;
import com.google.android.exoplayer2.util.Util;
import com.google.common.primitives.Ints;
import java.io.IOException;
import java.net.SocketTimeoutException;
/** An {@link RtpDataChannel} for UDP transport. */
/* package */ final class UdpDataSourceRtpDataChannel implements RtpDataChannel {
@ -37,9 +39,14 @@ import java.io.IOException;
/** The associated RTCP channel; {@code null} if the current channel is an RTCP channel. */
@Nullable private UdpDataSourceRtpDataChannel rtcpChannel;
/** Creates a new instance. */
public UdpDataSourceRtpDataChannel() {
dataSource = new UdpDataSource();
/**
* Creates a new instance.
*
* @param socketTimeoutMs The timeout for {@link #read} in milliseconds.
*/
public UdpDataSourceRtpDataChannel(long socketTimeoutMs) {
dataSource =
new UdpDataSource(UdpDataSource.DEFAULT_MAX_PACKET_SIZE, Ints.checkedCast(socketTimeoutMs));
}
@Override
@ -88,7 +95,15 @@ import java.io.IOException;
@Override
public int read(byte[] target, int offset, int length) throws IOException {
return dataSource.read(target, offset, length);
try {
return dataSource.read(target, offset, length);
} catch (UdpDataSource.UdpDataSourceException e) {
if (e.getCause() instanceof SocketTimeoutException) {
return C.RESULT_END_OF_INPUT;
} else {
throw e;
}
}
}
public void setRtcpChannel(UdpDataSourceRtpDataChannel rtcpChannel) {

View file

@ -21,10 +21,22 @@ import java.io.IOException;
/** Factory for {@link UdpDataSourceRtpDataChannel}. */
/* package */ final class UdpDataSourceRtpDataChannelFactory implements RtpDataChannel.Factory {
private final long socketTimeoutMs;
/**
* Creates a new instance.
*
* @param socketTimeoutMs A positive number of milliseconds to wait before lack of received RTP
* packets is treated as the end of input.
*/
public UdpDataSourceRtpDataChannelFactory(long socketTimeoutMs) {
this.socketTimeoutMs = socketTimeoutMs;
}
@Override
public RtpDataChannel createAndOpenDataChannel(int trackId) throws IOException {
UdpDataSourceRtpDataChannel firstChannel = new UdpDataSourceRtpDataChannel();
UdpDataSourceRtpDataChannel secondChannel = new UdpDataSourceRtpDataChannel();
UdpDataSourceRtpDataChannel firstChannel = new UdpDataSourceRtpDataChannel(socketTimeoutMs);
UdpDataSourceRtpDataChannel secondChannel = new UdpDataSourceRtpDataChannel(socketTimeoutMs);
try {
// From RFC3550 Section 11: "For UDP and similar protocols, RTP SHOULD use an even destination
@ -53,4 +65,9 @@ import java.io.IOException;
throw e;
}
}
@Override
public RtpDataChannel.Factory createFallbackDataChannelFactory() {
return new TransferRtpDataChannelFactory(/* timeoutMs= */ socketTimeoutMs);
}
}

View file

@ -17,11 +17,10 @@ package com.google.android.exoplayer2.source.rtsp;
import static com.google.android.exoplayer2.testutil.TestUtil.buildTestData;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;
import androidx.test.ext.junit.runners.AndroidJUnit4;
import com.google.android.exoplayer2.C;
import com.google.common.primitives.Bytes;
import java.io.IOException;
import java.util.Arrays;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -30,29 +29,30 @@ import org.junit.runner.RunWith;
@RunWith(AndroidJUnit4.class)
public class TransferRtpDataChannelTest {
private static final long POLL_TIMEOUT_MS = 8000;
@Test
public void getInterleavedBinaryDataListener_returnsAnInterleavedBinaryDataListener() {
TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel();
TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel(POLL_TIMEOUT_MS);
assertThat(transferRtpDataChannel.getInterleavedBinaryDataListener())
.isEqualTo(transferRtpDataChannel);
}
@Test
public void read_withoutReceivingInterleavedData_timesOut() {
TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel();
public void read_withoutReceivingInterleavedData_returnsEndOfInput() {
TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel(POLL_TIMEOUT_MS);
byte[] buffer = new byte[1];
assertThrows(
IOException.class,
() -> transferRtpDataChannel.read(buffer, /* offset= */ 0, buffer.length));
assertThat(transferRtpDataChannel.read(buffer, /* offset= */ 0, buffer.length))
.isEqualTo(C.RESULT_END_OF_INPUT);
}
@Test
public void read_withLargeEnoughBuffer_reads() throws Exception {
public void read_withLargeEnoughBuffer_reads() {
byte[] randomBytes = buildTestData(20);
byte[] buffer = new byte[40];
TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel();
TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel(POLL_TIMEOUT_MS);
transferRtpDataChannel.onInterleavedBinaryDataReceived(randomBytes);
transferRtpDataChannel.read(buffer, /* offset= */ 0, buffer.length);
@ -61,10 +61,10 @@ public class TransferRtpDataChannelTest {
}
@Test
public void read_withSmallBufferEnoughBuffer_readsThreeTimes() throws Exception {
public void read_withSmallBufferEnoughBuffer_readsThreeTimes() {
byte[] randomBytes = buildTestData(20);
byte[] buffer = new byte[8];
TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel();
TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel(POLL_TIMEOUT_MS);
transferRtpDataChannel.onInterleavedBinaryDataReceived(randomBytes);
transferRtpDataChannel.read(buffer, /* offset= */ 0, buffer.length);
@ -77,10 +77,10 @@ public class TransferRtpDataChannelTest {
}
@Test
public void read_withSmallBuffer_reads() throws Exception {
public void read_withSmallBuffer_reads() {
byte[] randomBytes = buildTestData(40);
byte[] buffer = new byte[20];
TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel();
TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel(POLL_TIMEOUT_MS);
transferRtpDataChannel.onInterleavedBinaryDataReceived(randomBytes);
transferRtpDataChannel.read(buffer, /* offset= */ 0, buffer.length);
@ -91,12 +91,12 @@ public class TransferRtpDataChannelTest {
}
@Test
public void read_withSmallAndModerateBufferAndSubsequentProducerWrite_reads() throws Exception {
public void read_withSmallAndModerateBufferAndSubsequentProducerWrite_reads() {
byte[] randomBytes1 = buildTestData(40);
byte[] randomBytes2 = buildTestData(40);
byte[] smallBuffer = new byte[20];
byte[] bigBuffer = new byte[40];
TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel();
TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel(POLL_TIMEOUT_MS);
transferRtpDataChannel.onInterleavedBinaryDataReceived(randomBytes1);
transferRtpDataChannel.read(smallBuffer, /* offset= */ 0, smallBuffer.length);
@ -120,13 +120,12 @@ public class TransferRtpDataChannelTest {
}
@Test
public void read_withSmallAndBigBufferWithPartialReadAndSubsequentProducerWrite_reads()
throws Exception {
public void read_withSmallAndBigBufferWithPartialReadAndSubsequentProducerWrite_reads() {
byte[] randomBytes1 = buildTestData(40);
byte[] randomBytes2 = buildTestData(40);
byte[] smallBuffer = new byte[30];
byte[] bigBuffer = new byte[30];
TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel();
TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel(POLL_TIMEOUT_MS);
transferRtpDataChannel.onInterleavedBinaryDataReceived(randomBytes1);
transferRtpDataChannel.read(smallBuffer, /* offset= */ 0, smallBuffer.length);
@ -150,12 +149,12 @@ public class TransferRtpDataChannelTest {
}
@Test
public void read_withSmallAndBigBufferAndSubsequentProducerWrite_reads() throws Exception {
public void read_withSmallAndBigBufferAndSubsequentProducerWrite_reads() {
byte[] randomBytes1 = buildTestData(40);
byte[] randomBytes2 = buildTestData(40);
byte[] smallBuffer = new byte[20];
byte[] bigBuffer = new byte[70];
TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel();
TransferRtpDataChannel transferRtpDataChannel = new TransferRtpDataChannel(POLL_TIMEOUT_MS);
transferRtpDataChannel.onInterleavedBinaryDataReceived(randomBytes1);
transferRtpDataChannel.read(smallBuffer, /* offset= */ 0, smallBuffer.length);

View file

@ -18,6 +18,7 @@ package com.google.android.exoplayer2.source.rtsp;
import static com.google.common.truth.Truth.assertThat;
import androidx.test.ext.junit.runners.AndroidJUnit4;
import com.google.android.exoplayer2.upstream.UdpDataSource;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -27,7 +28,8 @@ public class UdpDataSourceRtpDataChannelTest {
@Test
public void getInterleavedBinaryDataListener_returnsNull() {
UdpDataSourceRtpDataChannel udpDataSourceRtpDataChannel = new UdpDataSourceRtpDataChannel();
UdpDataSourceRtpDataChannel udpDataSourceRtpDataChannel =
new UdpDataSourceRtpDataChannel(UdpDataSource.DEFAULT_SOCKET_TIMEOUT_MILLIS);
assertThat(udpDataSourceRtpDataChannel.getInterleavedBinaryDataListener()).isNull();
}