Add support for reading the duration of a TS stream.

Add support for reading the duration of a TS stream by reading the PCR values of the PCR PID packets at the start and at the end of the stream, calculating the difference, and converting that into the stream duration.

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=203254626
This commit is contained in:
hoangtc 2018-07-04 02:46:09 -07:00 committed by Oliver Woodman
parent 0b631b05c2
commit 2237603a4d
8 changed files with 485 additions and 24 deletions

View file

@ -15,8 +15,11 @@
*/
package com.google.android.exoplayer2.extractor;
import android.support.annotation.IntDef;
import com.google.android.exoplayer2.C;
import java.io.IOException;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
/**
* Extracts media data from a container format.
@ -41,6 +44,11 @@ public interface Extractor {
*/
int RESULT_END_OF_INPUT = C.RESULT_END_OF_INPUT;
/**
 * Result values that can be returned by {@link #read(ExtractorInput, PositionHolder)}. One of
 * {@link #RESULT_CONTINUE}, {@link #RESULT_SEEK} or {@link #RESULT_END_OF_INPUT}.
 */
@Retention(RetentionPolicy.SOURCE)
@IntDef(value = {RESULT_CONTINUE, RESULT_SEEK, RESULT_END_OF_INPUT})
@interface ReadResult {}
/**
* Returns whether this extractor can extract samples from the {@link ExtractorInput}, which must
* provide data from the start of the stream.
@ -63,14 +71,14 @@ public interface Extractor {
void init(ExtractorOutput output);
/**
* Extracts data read from a provided {@link ExtractorInput}. Must not be called before
* {@link #init(ExtractorOutput)}.
* <p>
* A single call to this method will block until some progress has been made, but will not block
* for longer than this. Hence each call will consume only a small amount of input data.
* <p>
* In the common case, {@link #RESULT_CONTINUE} is returned to indicate that the
* {@link ExtractorInput} passed to the next read is required to provide data continuing from the
* Extracts data read from a provided {@link ExtractorInput}. Must not be called before {@link
* #init(ExtractorOutput)}.
*
* <p>A single call to this method will block until some progress has been made, but will not
* block for longer than this. Hence each call will consume only a small amount of input data.
*
* <p>In the common case, {@link #RESULT_CONTINUE} is returned to indicate that the {@link
* ExtractorInput} passed to the next read is required to provide data continuing from the
* position in the stream reached by the returning call. If the extractor requires data to be
* provided from a different position, then that position is set in {@code seekPosition} and
* {@link #RESULT_SEEK} is returned. If the extractor reached the end of the data provided by the
@ -83,6 +91,7 @@ public interface Extractor {
* @throws IOException If an error occurred reading from the input.
* @throws InterruptedException If the thread was interrupted.
*/
@ReadResult
int read(ExtractorInput input, PositionHolder seekPosition)
throws IOException, InterruptedException;

View file

@ -0,0 +1,235 @@
/*
* Copyright (C) 2018 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.android.exoplayer2.extractor.ts;
import com.google.android.exoplayer2.C;
import com.google.android.exoplayer2.extractor.Extractor;
import com.google.android.exoplayer2.extractor.ExtractorInput;
import com.google.android.exoplayer2.extractor.PositionHolder;
import com.google.android.exoplayer2.util.ParsableByteArray;
import com.google.android.exoplayer2.util.TimestampAdjuster;
import java.io.IOException;
/**
 * A reader that can extract the approximate duration from a given MPEG transport stream (TS).
 *
 * <p>This reader extracts the duration by reading PCR values of the PCR PID packets at the start
 * and at the end of the stream, calculating the difference, and converting that into stream
 * duration. This reader also handles the case when a single PCR wraparound takes place within the
 * stream, which can make PCR values at the beginning of the stream larger than PCR values at the
 * end. If the computed difference is still negative, the duration is reported as
 * {@link C#TIME_UNSET} rather than as a negative value.
 *
 * <p>This class can only be used once to read duration from a given stream, and the usage of the
 * class is not thread-safe, so all calls should be made from the same thread.
 */
/* package */ final class TsDurationReader {

  /** Number of TS packets' worth of bytes examined at each end of the stream. */
  private static final int DURATION_READ_PACKETS = 200;
  /** Maximum number of bytes peeked from the input when searching for a PCR value. */
  private static final int DURATION_READ_BYTES = TsExtractor.TS_PACKET_SIZE * DURATION_READ_PACKETS;

  private final TimestampAdjuster pcrTimestampAdjuster;
  private final ParsableByteArray packetBuffer;

  private boolean isDurationRead;
  private boolean isFirstPcrValueRead;
  private boolean isLastPcrValueRead;

  private long firstPcrValue;
  private long lastPcrValue;
  private long durationUs;

  /* package */ TsDurationReader() {
    pcrTimestampAdjuster = new TimestampAdjuster(/* firstSampleTimestampUs= */ 0);
    firstPcrValue = C.TIME_UNSET;
    lastPcrValue = C.TIME_UNSET;
    durationUs = C.TIME_UNSET;
    packetBuffer = new ParsableByteArray(DURATION_READ_BYTES);
  }

  /** Returns true if a TS duration has been read. */
  public boolean isDurationReadFinished() {
    return isDurationRead;
  }

  /**
   * Reads a TS duration from the input, using the given PCR PID.
   *
   * <p>This reader reads the duration by reading PCR values of the PCR PID packets at the start and
   * at the end of the stream, calculating the difference, and converting that into stream duration.
   * Each call performs a single step (requesting a seek, reading the last PCR value, reading the
   * first PCR value, or finishing), so callers are expected to keep calling this method until
   * {@link #isDurationReadFinished()} returns true. Reading finishes without a duration if
   * {@code pcrPid} is not positive or if no PCR value could be found at either end of the stream.
   *
   * @param input The {@link ExtractorInput} from which data should be read.
   * @param seekPositionHolder If {@link Extractor#RESULT_SEEK} is returned, this holder is updated
   *     to hold the position of the required seek.
   * @param pcrPid The PID of the packet stream within this TS stream that contains PCR values.
   * @return One of the {@code RESULT_} values defined in {@link Extractor}.
   * @throws IOException If an error occurred reading from the input.
   * @throws InterruptedException If the thread was interrupted.
   */
  public @Extractor.ReadResult int readDuration(
      ExtractorInput input, PositionHolder seekPositionHolder, int pcrPid)
      throws IOException, InterruptedException {
    if (pcrPid <= 0) {
      return finishReadDuration(input);
    }
    if (!isLastPcrValueRead) {
      return readLastPcrValue(input, seekPositionHolder, pcrPid);
    }
    if (lastPcrValue == C.TIME_UNSET) {
      return finishReadDuration(input);
    }
    if (!isFirstPcrValueRead) {
      return readFirstPcrValue(input, seekPositionHolder, pcrPid);
    }
    if (firstPcrValue == C.TIME_UNSET) {
      return finishReadDuration(input);
    }

    // The first value must be passed to the adjuster before the last one, as in the original
    // statement order.
    long minPcrPositionUs = pcrTimestampAdjuster.adjustTsTimestamp(firstPcrValue);
    long maxPcrPositionUs = pcrTimestampAdjuster.adjustTsTimestamp(lastPcrValue);
    long pcrSpanUs = maxPcrPositionUs - minPcrPositionUs;
    // A negative span means the end PCR precedes the start PCR even after adjustment. Publishing
    // it would hand a negative duration to the seek map, so treat the duration as unknown instead.
    durationUs = pcrSpanUs >= 0 ? pcrSpanUs : C.TIME_UNSET;
    return finishReadDuration(input);
  }

  /**
   * Returns the duration last read from {@link #readDuration(ExtractorInput, PositionHolder, int)},
   * or {@link C#TIME_UNSET} if no valid duration has been determined.
   */
  public long getDurationUs() {
    return durationUs;
  }

  /** Marks the duration read as finished and resets the peek position of the input. */
  private int finishReadDuration(ExtractorInput input) {
    isDurationRead = true;
    input.resetPeekPosition();
    return Extractor.RESULT_CONTINUE;
  }

  /**
   * Peeks {@code length} bytes from the current read position of {@code input} into {@link
   * #packetBuffer}, and makes the buffer span exactly those bytes.
   */
  private void peekIntoPacketBuffer(ExtractorInput input, int length)
      throws IOException, InterruptedException {
    input.resetPeekPosition();
    input.peekFully(packetBuffer.data, /* offset= */ 0, length);
    packetBuffer.setPosition(0);
    packetBuffer.setLimit(length);
  }

  /**
   * Reads the first PCR value found at the start of the stream, requesting a seek to position 0
   * first if the input is positioned elsewhere.
   */
  private int readFirstPcrValue(ExtractorInput input, PositionHolder seekPositionHolder, int pcrPid)
      throws IOException, InterruptedException {
    if (input.getPosition() != 0) {
      seekPositionHolder.position = 0;
      return Extractor.RESULT_SEEK;
    }

    int bytesToRead = (int) Math.min(DURATION_READ_BYTES, input.getLength());
    peekIntoPacketBuffer(input, bytesToRead);

    firstPcrValue = readFirstPcrValueFromBuffer(packetBuffer, pcrPid);
    isFirstPcrValueRead = true;
    return Extractor.RESULT_CONTINUE;
  }

  /**
   * Scans the buffer forwards from its position to its limit, returning the first PCR value found
   * for {@code pcrPid}, or {@link C#TIME_UNSET} if there is none.
   */
  private long readFirstPcrValueFromBuffer(ParsableByteArray packetBuffer, int pcrPid) {
    int searchStartPosition = packetBuffer.getPosition();
    int searchEndPosition = packetBuffer.limit();
    for (int searchPosition = searchStartPosition;
        searchPosition < searchEndPosition;
        searchPosition++) {
      if (packetBuffer.data[searchPosition] != TsExtractor.TS_SYNC_BYTE) {
        continue;
      }
      long pcrValue = readPcrFromPacket(packetBuffer, searchPosition, pcrPid);
      if (pcrValue != C.TIME_UNSET) {
        return pcrValue;
      }
    }
    return C.TIME_UNSET;
  }

  /**
   * Reads the last PCR value found in the final {@link #DURATION_READ_BYTES} bytes of the stream
   * (or in the whole stream if it is shorter), requesting a seek to the start of that window first
   * if the input is positioned elsewhere.
   */
  private int readLastPcrValue(ExtractorInput input, PositionHolder seekPositionHolder, int pcrPid)
      throws IOException, InterruptedException {
    long inputLength = input.getLength();
    int bytesToRead = (int) Math.min(DURATION_READ_BYTES, inputLength);
    long bufferStartStreamPosition = inputLength - bytesToRead;
    if (input.getPosition() != bufferStartStreamPosition) {
      seekPositionHolder.position = bufferStartStreamPosition;
      return Extractor.RESULT_SEEK;
    }

    peekIntoPacketBuffer(input, bytesToRead);

    lastPcrValue = readLastPcrValueFromBuffer(packetBuffer, pcrPid);
    isLastPcrValueRead = true;
    return Extractor.RESULT_CONTINUE;
  }

  /**
   * Scans the buffer backwards from its limit to its position, returning the first PCR value found
   * for {@code pcrPid} (i.e. the last one in stream order), or {@link C#TIME_UNSET} if there is
   * none.
   */
  private long readLastPcrValueFromBuffer(ParsableByteArray packetBuffer, int pcrPid) {
    int searchStartPosition = packetBuffer.getPosition();
    int searchEndPosition = packetBuffer.limit();
    for (int searchPosition = searchEndPosition - 1;
        searchPosition >= searchStartPosition;
        searchPosition--) {
      if (packetBuffer.data[searchPosition] != TsExtractor.TS_SYNC_BYTE) {
        continue;
      }
      long pcrValue = readPcrFromPacket(packetBuffer, searchPosition, pcrPid);
      if (pcrValue != C.TIME_UNSET) {
        return pcrValue;
      }
    }
    return C.TIME_UNSET;
  }

  /**
   * Attempts to read a PCR value from the packet assumed to start at {@code startOfPacket}.
   *
   * <p>This moves the position of {@code packetBuffer}.
   *
   * @param packetBuffer The buffer holding the candidate packet.
   * @param startOfPacket The offset in the buffer of the candidate packet's sync byte.
   * @param pcrPid The PID whose packets carry the PCR values of interest.
   * @return The PCR base value, or {@link C#TIME_UNSET} if the candidate packet is truncated, is
   *     flagged as erroneous, belongs to a different PID, or carries no PCR.
   */
  private static long readPcrFromPacket(
      ParsableByteArray packetBuffer, int startOfPacket, int pcrPid) {
    packetBuffer.setPosition(startOfPacket);
    if (packetBuffer.bytesLeft() < 5) {
      // Header = 4 bytes, adaptationFieldLength = 1 byte.
      return C.TIME_UNSET;
    }
    // Note: See ISO/IEC 13818-1, section 2.4.3.2 for details of the header format.
    int tsPacketHeader = packetBuffer.readInt();
    if ((tsPacketHeader & 0x800000) != 0) {
      // transport_error_indicator != 0 means there are uncorrectable errors in this packet.
      return C.TIME_UNSET;
    }
    int pid = (tsPacketHeader & 0x1FFF00) >> 8;
    if (pid != pcrPid) {
      return C.TIME_UNSET;
    }
    boolean adaptationFieldExists = (tsPacketHeader & 0x20) != 0;
    if (!adaptationFieldExists) {
      return C.TIME_UNSET;
    }

    int adaptationFieldLength = packetBuffer.readUnsignedByte();
    // Flags = 1 byte, PCR = 6 bytes.
    if (adaptationFieldLength >= 7 && packetBuffer.bytesLeft() >= 7) {
      int flags = packetBuffer.readUnsignedByte();
      boolean pcrFlagSet = (flags & 0x10) == 0x10;
      if (pcrFlagSet) {
        byte[] pcrBytes = new byte[6];
        packetBuffer.readBytes(pcrBytes, /* offset= */ 0, pcrBytes.length);
        return readPcrValueFromPcrBytes(pcrBytes);
      }
    }
    return C.TIME_UNSET;
  }

  /**
   * Returns the value of PCR base - first 33 bits in big endian order from the PCR bytes.
   *
   * <p>We ignore PCR Ext, because it's too small to have any significance.
   */
  private static long readPcrValueFromPcrBytes(byte[] pcrBytes) {
    return (pcrBytes[0] & 0xFFL) << 25
        | (pcrBytes[1] & 0xFFL) << 17
        | (pcrBytes[2] & 0xFFL) << 9
        | (pcrBytes[3] & 0xFFL) << 1
        | (pcrBytes[4] & 0xFFL) >> 7;
  }
}

View file

@ -98,8 +98,9 @@ public final class TsExtractor implements Extractor {
public static final int TS_STREAM_TYPE_SPLICE_INFO = 0x86;
public static final int TS_STREAM_TYPE_DVBSUBS = 0x59;
private static final int TS_PACKET_SIZE = 188;
private static final int TS_SYNC_BYTE = 0x47; // First byte of each TS packet.
public static final int TS_PACKET_SIZE = 188;
public static final int TS_SYNC_BYTE = 0x47; // First byte of each TS packet.
private static final int TS_PAT_PID = 0;
private static final int MAX_PID_PLUS_ONE = 0x2000;
@ -110,7 +111,7 @@ public final class TsExtractor implements Extractor {
private static final int BUFFER_SIZE = TS_PACKET_SIZE * 50;
private static final int SNIFF_TS_PACKET_COUNT = 5;
@Mode private final int mode;
private final @Mode int mode;
private final List<TimestampAdjuster> timestampAdjusters;
private final ParsableByteArray tsPacketBuffer;
private final SparseIntArray continuityCounters;
@ -118,13 +119,17 @@ public final class TsExtractor implements Extractor {
private final SparseArray<TsPayloadReader> tsPayloadReaders; // Indexed by pid
private final SparseBooleanArray trackIds;
private final SparseBooleanArray trackPids;
private final TsDurationReader durationReader;
// Accessed only by the loading thread.
private ExtractorOutput output;
private int remainingPmts;
private boolean tracksEnded;
private boolean hasOutputSeekMap;
private boolean pendingSeekToStart;
private TsPayloadReader id3Reader;
private int bytesSinceLastSync;
private int pcrPid;
public TsExtractor() {
this(0);
@ -145,18 +150,21 @@ public final class TsExtractor implements Extractor {
* {@code FLAG_*} values that control the behavior of the payload readers.
*/
public TsExtractor(@Mode int mode, @Flags int defaultTsPayloadReaderFlags) {
this(mode, new TimestampAdjuster(0),
this(
mode,
new TimestampAdjuster(0),
new DefaultTsPayloadReaderFactory(defaultTsPayloadReaderFlags));
}
/**
* @param mode Mode for the extractor. One of {@link #MODE_MULTI_PMT}, {@link #MODE_SINGLE_PMT}
* and {@link #MODE_HLS}.
* @param timestampAdjuster A timestamp adjuster for offsetting and scaling sample timestamps.
* @param payloadReaderFactory Factory for injecting a custom set of payload readers.
*/
public TsExtractor(@Mode int mode, TimestampAdjuster timestampAdjuster,
public TsExtractor(
@Mode int mode,
TimestampAdjuster timestampAdjuster,
TsPayloadReader.Factory payloadReaderFactory) {
this.payloadReaderFactory = Assertions.checkNotNull(payloadReaderFactory);
this.mode = mode;
@ -171,6 +179,8 @@ public final class TsExtractor implements Extractor {
trackPids = new SparseBooleanArray();
tsPayloadReaders = new SparseArray<>();
continuityCounters = new SparseIntArray();
durationReader = new TsDurationReader();
pcrPid = -1;
resetPayloadReaders();
}
@ -200,7 +210,6 @@ public final class TsExtractor implements Extractor {
@Override
public void init(ExtractorOutput output) {
this.output = output;
output.seekMap(new SeekMap.Unseekable(C.TIME_UNSET));
}
@Override
@ -224,8 +233,25 @@ public final class TsExtractor implements Extractor {
}
@Override
public int read(ExtractorInput input, PositionHolder seekPosition)
public @ReadResult int read(ExtractorInput input, PositionHolder seekPosition)
throws IOException, InterruptedException {
if (tracksEnded) {
boolean canReadDuration = input.getLength() != C.LENGTH_UNSET && mode != MODE_HLS;
if (canReadDuration && !durationReader.isDurationReadFinished()) {
return durationReader.readDuration(input, seekPosition, pcrPid);
}
maybeOutputSeekMap();
if (pendingSeekToStart) {
pendingSeekToStart = false;
seek(/* position= */ 0, /* timeUs= */ 0);
if (input.getPosition() != 0) {
seekPosition.position = 0;
return RESULT_SEEK;
}
}
}
if (!fillBufferWithAtLeastOnePacket(input)) {
return RESULT_END_OF_INPUT;
}
@ -284,21 +310,26 @@ public final class TsExtractor implements Extractor {
payloadReader.consume(tsPacketBuffer, payloadUnitStartIndicator);
tsPacketBuffer.setLimit(limit);
}
if (mode != MODE_HLS && !wereTracksEnded && tracksEnded) {
// We have read all tracks from all PMTs in this stream. Now seek to the beginning and read
// again to make sure we output all media, including any contained in packets prior to those
// containing the track information.
seek(/* position= */ 0, /* timeUs= */ 0);
seekPosition.position = 0;
return RESULT_SEEK;
pendingSeekToStart = true;
}
tsPacketBuffer.setPosition(endOfPacket);
return RESULT_CONTINUE;
}
// Internals.
/**
 * Outputs an unseekable {@link SeekMap} carrying the duration currently reported by the duration
 * reader. Only the first call has any effect; subsequent calls return immediately.
 */
private void maybeOutputSeekMap() {
  if (hasOutputSeekMap) {
    return;
  }
  hasOutputSeekMap = true;
  long streamDurationUs = durationReader.getDurationUs();
  output.seekMap(new SeekMap.Unseekable(streamDurationUs));
}
private boolean fillBufferWithAtLeastOnePacket(ExtractorInput input)
throws IOException, InterruptedException {
byte[] data = tsPacketBuffer.data;
@ -478,9 +509,16 @@ public final class TsExtractor implements Extractor {
// section_syntax_indicator(1), '0'(1), reserved(2), section_length(12)
sectionData.skipBytes(2);
int programNumber = sectionData.readUnsignedShort();
// Skip 3 bytes (24 bits), including:
// reserved (2), version_number (5), current_next_indicator (1), section_number (8),
// last_section_number (8), reserved (3), PCR_PID (13)
sectionData.skipBytes(5);
// last_section_number (8)
sectionData.skipBytes(3);
sectionData.readBytes(pmtScratch, 2);
// reserved (3), PCR_PID (13)
pmtScratch.skipBits(3);
pcrPid = pmtScratch.readBits(13);
// Read program_info_length.
sectionData.readBytes(pmtScratch, 2);

Binary file not shown.

View file

@ -1,6 +1,6 @@
seekMap:
isSeekable = false
duration = UNSET TIME
duration = 66733
getPosition(0) = [[timeUs=0, position=0]]
numberOfTracks = 2
track 256:

View file

@ -0,0 +1,79 @@
seekMap:
isSeekable = false
duration = UNSET TIME
getPosition(0) = [[timeUs=0, position=0]]
numberOfTracks = 2
track 256:
format:
bitrate = -1
id = 1/256
containerMimeType = null
sampleMimeType = video/mpeg2
maxInputSize = -1
width = 640
height = 426
frameRate = -1.0
rotationDegrees = 0
pixelWidthHeightRatio = 1.0
channelCount = -1
sampleRate = -1
pcmEncoding = -1
encoderDelay = 0
encoderPadding = 0
subsampleOffsetUs = 9223372036854775807
selectionFlags = 0
language = null
drmInitData = -
initializationData:
data = length 22, hash CE183139
total output bytes = 45026
sample count = 2
sample 0:
time = 33366
flags = 1
data = length 20711, hash 34341E8
sample 1:
time = 66733
flags = 0
data = length 18112, hash EC44B35B
track 257:
format:
bitrate = -1
id = 1/257
containerMimeType = null
sampleMimeType = audio/mpeg-L2
maxInputSize = 4096
width = -1
height = -1
frameRate = -1.0
rotationDegrees = 0
pixelWidthHeightRatio = 1.0
channelCount = 1
sampleRate = 44100
pcmEncoding = -1
encoderDelay = 0
encoderPadding = 0
subsampleOffsetUs = 9223372036854775807
selectionFlags = 0
language = und
drmInitData = -
initializationData:
total output bytes = 5015
sample count = 4
sample 0:
time = 22455
flags = 1
data = length 1253, hash 727FD1C6
sample 1:
time = 48577
flags = 1
data = length 1254, hash 73FB07B8
sample 2:
time = 74700
flags = 1
data = length 1254, hash 73FB07B8
sample 3:
time = 100822
flags = 1
data = length 1254, hash 73FB07B8
tracksEnded = true

View file

@ -0,0 +1,94 @@
/*
* Copyright (C) 2018 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.android.exoplayer2.extractor.ts;
import static com.google.common.truth.Truth.assertThat;
import com.google.android.exoplayer2.extractor.Extractor;
import com.google.android.exoplayer2.extractor.PositionHolder;
import com.google.android.exoplayer2.testutil.FakeExtractorInput;
import com.google.android.exoplayer2.testutil.TestUtil;
import java.io.IOException;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.robolectric.RobolectricTestRunner;
import org.robolectric.RuntimeEnvironment;
/** Unit test for {@link TsDurationReader}. */
@RunWith(RobolectricTestRunner.class)
public final class TsDurationReaderTest {

  /** Asset exercised by the duration tests. */
  private static final String TEST_ASSET = "ts/bbb_2500ms.ts";
  /** PID passed to the reader as the PCR PID for {@link #TEST_ASSET}. */
  private static final int TEST_PCR_PID = 256;
  /** Duration the tests expect for {@link #TEST_ASSET}, in milliseconds. */
  private static final long EXPECTED_DURATION_MS = 2500;

  private TsDurationReader tsDurationReader;
  private PositionHolder seekPositionHolder;

  @Before
  public void setUp() {
    tsDurationReader = new TsDurationReader();
    seekPositionHolder = new PositionHolder();
  }

  /** A freshly created reader must not report the duration read as finished. */
  @Test
  public void testIsDurationReadPending_returnFalseByDefault() {
    assertThat(tsDurationReader.isDurationReadFinished()).isFalse();
  }

  /** Reads the duration with the input positioned at the start of the stream. */
  @Test
  public void testReadDuration_returnsCorrectDuration() throws IOException, InterruptedException {
    FakeExtractorInput input = createTestInput();

    readDurationUntilFinished(input);

    assertThat(tsDurationReader.getDurationUs() / 1000).isEqualTo(EXPECTED_DURATION_MS);
  }

  /** Reads the duration with the input initially positioned in the middle of the stream. */
  @Test
  public void testReadDuration_midStream_returnsCorrectDuration()
      throws IOException, InterruptedException {
    FakeExtractorInput input = createTestInput();
    input.setPosition(1234);

    readDurationUntilFinished(input);

    assertThat(tsDurationReader.getDurationUs() / 1000).isEqualTo(EXPECTED_DURATION_MS);
  }

  /**
   * Builds an input over {@link #TEST_ASSET} with simulated IO errors, unknown length and partial
   * reads all disabled.
   */
  private static FakeExtractorInput createTestInput() throws IOException {
    byte[] streamData = TestUtil.getByteArray(RuntimeEnvironment.application, TEST_ASSET);
    FakeExtractorInput.Builder builder = new FakeExtractorInput.Builder();
    return builder
        .setData(streamData)
        .setSimulateIOErrors(false)
        .setSimulateUnknownLength(false)
        .setSimulatePartialReads(false)
        .build();
  }

  /**
   * Drives the reader until it reports that the duration read has finished or that the end of the
   * input was reached, applying any seek it requests to {@code input}.
   */
  private void readDurationUntilFinished(FakeExtractorInput input)
      throws IOException, InterruptedException {
    while (!tsDurationReader.isDurationReadFinished()) {
      int readResult = tsDurationReader.readDuration(input, seekPositionHolder, TEST_PCR_PID);
      if (readResult == Extractor.RESULT_END_OF_INPUT) {
        return;
      }
      if (readResult == Extractor.RESULT_SEEK) {
        input.setPosition((int) seekPositionHolder.position);
      }
    }
  }
}

View file

@ -105,6 +105,9 @@ public final class TsExtractorTest {
int readResult = Extractor.RESULT_CONTINUE;
while (readResult != Extractor.RESULT_END_OF_INPUT) {
readResult = tsExtractor.read(input, seekPositionHolder);
if (readResult == Extractor.RESULT_SEEK) {
input.setPosition((int) seekPositionHolder.position);
}
}
CustomEsReader reader = factory.esReader;
assertThat(reader.packetsRead).isEqualTo(2);
@ -131,8 +134,11 @@ public final class TsExtractorTest {
int readResult = Extractor.RESULT_CONTINUE;
while (readResult != Extractor.RESULT_END_OF_INPUT) {
readResult = tsExtractor.read(input, seekPositionHolder);
if (readResult == Extractor.RESULT_SEEK) {
input.setPosition((int) seekPositionHolder.position);
}
}
assertThat(factory.sdtReader.consumedSdts).isEqualTo(1);
assertThat(factory.sdtReader.consumedSdts).isEqualTo(2);
}
private static void writeJunkData(ByteArrayOutputStream out, int length) {