From da656e6f26ac49e88f5abc5a7a90aa1bbf5c3a3e Mon Sep 17 00:00:00 2001 From: Oliver Woodman Date: Sat, 11 Apr 2015 01:07:41 +0100 Subject: [PATCH] More steps towards unified extractors. --- .../extractor/DefaultTrackOutput.java | 42 ++++- .../extractor/RollingSampleBuffer.java | 175 ++++++++++++------ .../exoplayer/hls/HlsExtractorWrapper.java | 10 +- .../exoplayer/hls/HlsSampleSource.java | 6 +- 4 files changed, 161 insertions(+), 72 deletions(-) diff --git a/library/src/main/java/com/google/android/exoplayer/extractor/DefaultTrackOutput.java b/library/src/main/java/com/google/android/exoplayer/extractor/DefaultTrackOutput.java index 5455e9a398..ba69ae0397 100644 --- a/library/src/main/java/com/google/android/exoplayer/extractor/DefaultTrackOutput.java +++ b/library/src/main/java/com/google/android/exoplayer/extractor/DefaultTrackOutput.java @@ -24,9 +24,8 @@ import com.google.android.exoplayer.util.ParsableByteArray; import java.io.IOException; /** - * Wraps a {@link RollingSampleBuffer}, adding higher level functionality such as enforcing that - * the first sample returned from the queue is a keyframe, allowing splicing to another queue, and - * so on. + * A {@link TrackOutput} that buffers extracted samples in a queue, and allows for consumption from + * that queue. */ public final class DefaultTrackOutput implements TrackOutput { @@ -51,12 +50,35 @@ public final class DefaultTrackOutput implements TrackOutput { largestParsedTimestampUs = Long.MIN_VALUE; } - public void release() { - rollingBuffer.release(); + // Called by the consuming thread, but only when there is no loading thread. + + /** + * Clears the queue, returning all allocations to the allocator. + */ + public void clear() { + rollingBuffer.clear(); + needKeyframe = true; + lastReadTimeUs = Long.MIN_VALUE; + spliceOutTimeUs = Long.MIN_VALUE; + largestParsedTimestampUs = Long.MIN_VALUE; + } + + /** + * Returns the current absolute write index. 
+ */ + public int getWriteIndex() { + return rollingBuffer.getWriteIndex(); } // Called by the consuming thread. + /** + * Returns the current absolute read index. + */ + public int getReadIndex() { + return rollingBuffer.getReadIndex(); + } + /** * True if the output has received a format. False otherwise. */ @@ -64,14 +86,24 @@ public final class DefaultTrackOutput implements TrackOutput { return format != null; } + /** + * The format most recently received by the output, or null if a format has yet to be received. + */ public MediaFormat getFormat() { return format; } + /** + * The largest timestamp of any sample received by the output, or {@link Long#MIN_VALUE} if a + * sample has yet to be received. + */ public long getLargestParsedTimestampUs() { return largestParsedTimestampUs; } + /** + * True if no sample can currently be read from the queue. False otherwise. + */ public boolean isEmpty() { return !advanceToEligibleSample(); } diff --git a/library/src/main/java/com/google/android/exoplayer/extractor/RollingSampleBuffer.java b/library/src/main/java/com/google/android/exoplayer/extractor/RollingSampleBuffer.java index e073c1b099..446f64df4b 100644 --- a/library/src/main/java/com/google/android/exoplayer/extractor/RollingSampleBuffer.java +++ b/library/src/main/java/com/google/android/exoplayer/extractor/RollingSampleBuffer.java @@ -55,16 +55,41 @@ import java.util.concurrent.ConcurrentLinkedQueue; dataQueue = new ConcurrentLinkedQueue(); extrasHolder = new SampleExtrasHolder(); scratch = new ParsableByteArray(INITIAL_SCRATCH_SIZE); + lastFragmentOffset = fragmentLength; } - public void release() { + // Called by the consuming thread, but only when there is no loading thread. + + /** + * Clears the buffer, returning all allocations to the allocator. 
+ */ + public void clear() { + infoQueue.clear(); while (!dataQueue.isEmpty()) { fragmentPool.releaseDirect(dataQueue.remove()); } + totalBytesDropped = 0; + totalBytesWritten = 0; + lastFragment = null; + lastFragmentOffset = fragmentLength; + } + + /** + * Returns the current absolute write index. + */ + public int getWriteIndex() { + return infoQueue.getWriteIndex(); } // Called by the consuming thread. + /** + * Returns the current absolute read index. + */ + public int getReadIndex() { + return infoQueue.getReadIndex(); + } + /** * Fills {@code holder} with information about the current sample, but does not write its data. *

@@ -83,7 +108,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; */ public void skipSample() { long nextOffset = infoQueue.moveToNextSample(); - dropFragmentsTo(nextOffset); + dropDownstreamTo(nextOffset); } /** @@ -112,7 +137,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; } // Advance the read head. long nextOffset = infoQueue.moveToNextSample(); - dropFragmentsTo(nextOffset); + dropDownstreamTo(nextOffset); return true; } @@ -198,7 +223,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; private void readData(long absolutePosition, ByteBuffer target, int length) { int remaining = length; while (remaining > 0) { - dropFragmentsTo(absolutePosition); + dropDownstreamTo(absolutePosition); int positionInFragment = (int) (absolutePosition - totalBytesDropped); int toCopy = Math.min(remaining, fragmentLength - positionInFragment); target.put(dataQueue.peek(), positionInFragment, toCopy); @@ -216,14 +241,14 @@ import java.util.concurrent.ConcurrentLinkedQueue; */ // TODO: Consider reducing duplication of this method and the one above. private void readData(long absolutePosition, byte[] target, int length) { - int remaining = length; - while (remaining > 0) { - dropFragmentsTo(absolutePosition); + int bytesRead = 0; + while (bytesRead < length) { + dropDownstreamTo(absolutePosition); int positionInFragment = (int) (absolutePosition - totalBytesDropped); - int toCopy = Math.min(remaining, fragmentLength - positionInFragment); - System.arraycopy(dataQueue.peek(), positionInFragment, target, 0, toCopy); + int toCopy = Math.min(length - bytesRead, fragmentLength - positionInFragment); + System.arraycopy(dataQueue.peek(), positionInFragment, target, bytesRead, toCopy); absolutePosition += toCopy; - remaining -= toCopy; + bytesRead += toCopy; } } @@ -233,7 +258,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; * * @param absolutePosition The absolute position up to which fragments can be discarded. 
*/ - private void dropFragmentsTo(long absolutePosition) { + private void dropDownstreamTo(long absolutePosition) { int relativePosition = (int) (absolutePosition - totalBytesDropped); int fragmentIndex = relativePosition / fragmentLength; for (int i = 0; i < fragmentIndex; i++) { @@ -266,24 +291,23 @@ import java.util.concurrent.ConcurrentLinkedQueue; * Appends data to the rolling buffer. * * @param dataSource The source from which to read. - * @param length The maximum length of the read. - * @return The number of bytes read, or -1 if the the end of the source has been reached. + * @param length The maximum length of the read, or {@link C#LENGTH_UNBOUNDED} if the caller does + * not wish to impose a limit. + * @return The number of bytes appended, or {@link C#RESULT_END_OF_INPUT} at end of input. * @throws IOException If an error occurs reading from the source. */ public int appendData(DataSource dataSource, int length) throws IOException { - int remainingWriteLength = length; - if (dataQueue.isEmpty() || lastFragmentOffset == fragmentLength) { - lastFragmentOffset = 0; - lastFragment = fragmentPool.allocateDirect(); - dataQueue.add(lastFragment); - } - int thisWriteLength = Math.min(remainingWriteLength, fragmentLength - lastFragmentOffset); - int bytesRead = dataSource.read(lastFragment, lastFragmentOffset, thisWriteLength); - if (bytesRead == -1) { - return -1; + ensureSpaceForWrite(); + int remainingFragmentCapacity = fragmentLength - lastFragmentOffset; + length = length != C.LENGTH_UNBOUNDED ? Math.min(length, remainingFragmentCapacity) + : remainingFragmentCapacity; + + int bytesRead = dataSource.read(lastFragment, lastFragmentOffset, length); + if (bytesRead == C.RESULT_END_OF_INPUT) { + return C.RESULT_END_OF_INPUT; } + lastFragmentOffset += bytesRead; - remainingWriteLength -= bytesRead; totalBytesWritten += bytesRead; return bytesRead; } @@ -294,15 +318,10 @@ import java.util.concurrent.ConcurrentLinkedQueue; * @param buffer A buffer containing the data to append. 
* @param length The length of the data to append. */ - // TODO: Consider reducing duplication of this method and the one above. public void appendData(ParsableByteArray buffer, int length) { int remainingWriteLength = length; while (remainingWriteLength > 0) { - if (dataQueue.isEmpty() || lastFragmentOffset == fragmentLength) { - lastFragmentOffset = 0; - lastFragment = fragmentPool.allocateDirect(); - dataQueue.add(lastFragment); - } + ensureSpaceForWrite(); int thisWriteLength = Math.min(remainingWriteLength, fragmentLength - lastFragmentOffset); buffer.readBytes(lastFragment, lastFragmentOffset, thisWriteLength); lastFragmentOffset += thisWriteLength; @@ -325,6 +344,17 @@ import java.util.concurrent.ConcurrentLinkedQueue; infoQueue.commitSample(sampleTimeUs, flags, position, size, encryptionKey); } + /** + * Ensures at least one byte can be written, allocating a new fragment if necessary. + */ + private void ensureSpaceForWrite() { + if (lastFragmentOffset == fragmentLength) { + lastFragmentOffset = 0; + lastFragment = fragmentPool.allocateDirect(); + dataQueue.add(lastFragment); + } + } + /** * Holds information about the samples in the rolling buffer. */ @@ -341,8 +371,9 @@ import java.util.concurrent.ConcurrentLinkedQueue; private byte[][] encryptionKeys; private int queueSize; - private int readIndex; - private int writeIndex; + private int absoluteReadIndex; + private int relativeReadIndex; + private int relativeWriteIndex; public InfoQueue() { capacity = SAMPLE_CAPACITY_INCREMENT; @@ -353,8 +384,34 @@ import java.util.concurrent.ConcurrentLinkedQueue; encryptionKeys = new byte[capacity][]; } + // Called by the consuming thread, but only when there is no loading thread. + + /** + * Clears the queue. + */ + public void clear() { + absoluteReadIndex = 0; + relativeReadIndex = 0; + relativeWriteIndex = 0; + queueSize = 0; + } + + /** + * Returns the current absolute write index. 
+ */ + public int getWriteIndex() { + return absoluteReadIndex + queueSize; + } + // Called by the consuming thread. + /** + * Returns the current absolute read index. + */ + public int getReadIndex() { + return absoluteReadIndex; + } + /** * Fills {@code holder} with information about the current sample, but does not write its data. * The first entry in {@code offsetHolder} is filled with the absolute position of the sample's @@ -371,11 +428,11 @@ import java.util.concurrent.ConcurrentLinkedQueue; if (queueSize == 0) { return false; } - holder.timeUs = timesUs[readIndex]; - holder.size = sizes[readIndex]; - holder.flags = flags[readIndex]; - extrasHolder.offset = offsets[readIndex]; - extrasHolder.encryptionKeyId = encryptionKeys[readIndex]; + holder.timeUs = timesUs[relativeReadIndex]; + holder.size = sizes[relativeReadIndex]; + holder.flags = flags[relativeReadIndex]; + extrasHolder.offset = offsets[relativeReadIndex]; + extrasHolder.encryptionKeyId = encryptionKeys[relativeReadIndex]; return true; } @@ -387,23 +444,25 @@ import java.util.concurrent.ConcurrentLinkedQueue; */ public synchronized long moveToNextSample() { queueSize--; - int lastReadIndex = readIndex++; - if (readIndex == capacity) { + int lastReadIndex = relativeReadIndex++; + absoluteReadIndex++; + if (relativeReadIndex == capacity) { // Wrap around. - readIndex = 0; + relativeReadIndex = 0; } - return queueSize > 0 ? offsets[readIndex] : (sizes[lastReadIndex] + offsets[lastReadIndex]); + return queueSize > 0 ? offsets[relativeReadIndex] + : (sizes[lastReadIndex] + offsets[lastReadIndex]); } // Called by the loading thread. 
public synchronized void commitSample(long timeUs, int sampleFlags, long offset, int size, byte[] encryptionKey) { - timesUs[writeIndex] = timeUs; - offsets[writeIndex] = offset; - sizes[writeIndex] = size; - flags[writeIndex] = sampleFlags; - encryptionKeys[writeIndex] = encryptionKey; + timesUs[relativeWriteIndex] = timeUs; + offsets[relativeWriteIndex] = offset; + sizes[relativeWriteIndex] = size; + flags[relativeWriteIndex] = sampleFlags; + encryptionKeys[relativeWriteIndex] = encryptionKey; // Increment the write index. queueSize++; if (queueSize == capacity) { @@ -414,13 +473,13 @@ import java.util.concurrent.ConcurrentLinkedQueue; int[] newFlags = new int[newCapacity]; int[] newSizes = new int[newCapacity]; byte[][] newEncryptionKeys = new byte[newCapacity][]; - int beforeWrap = capacity - readIndex; - System.arraycopy(offsets, readIndex, newOffsets, 0, beforeWrap); - System.arraycopy(timesUs, readIndex, newTimesUs, 0, beforeWrap); - System.arraycopy(flags, readIndex, newFlags, 0, beforeWrap); - System.arraycopy(sizes, readIndex, newSizes, 0, beforeWrap); - System.arraycopy(encryptionKeys, readIndex, newEncryptionKeys, 0, beforeWrap); - int afterWrap = readIndex; + int beforeWrap = capacity - relativeReadIndex; + System.arraycopy(offsets, relativeReadIndex, newOffsets, 0, beforeWrap); + System.arraycopy(timesUs, relativeReadIndex, newTimesUs, 0, beforeWrap); + System.arraycopy(flags, relativeReadIndex, newFlags, 0, beforeWrap); + System.arraycopy(sizes, relativeReadIndex, newSizes, 0, beforeWrap); + System.arraycopy(encryptionKeys, relativeReadIndex, newEncryptionKeys, 0, beforeWrap); + int afterWrap = relativeReadIndex; System.arraycopy(offsets, 0, newOffsets, beforeWrap, afterWrap); System.arraycopy(timesUs, 0, newTimesUs, beforeWrap, afterWrap); System.arraycopy(flags, 0, newFlags, beforeWrap, afterWrap); @@ -431,15 +490,15 @@ import java.util.concurrent.ConcurrentLinkedQueue; flags = newFlags; sizes = newSizes; encryptionKeys = newEncryptionKeys; - 
readIndex = 0; - writeIndex = capacity; + relativeReadIndex = 0; + relativeWriteIndex = capacity; queueSize = capacity; capacity = newCapacity; } else { - writeIndex++; - if (writeIndex == capacity) { + relativeWriteIndex++; + if (relativeWriteIndex == capacity) { // Wrap around. - writeIndex = 0; + relativeWriteIndex = 0; } } } diff --git a/library/src/main/java/com/google/android/exoplayer/hls/HlsExtractorWrapper.java b/library/src/main/java/com/google/android/exoplayer/hls/HlsExtractorWrapper.java index a340183ff0..dcdf7afe4a 100644 --- a/library/src/main/java/com/google/android/exoplayer/hls/HlsExtractorWrapper.java +++ b/library/src/main/java/com/google/android/exoplayer/hls/HlsExtractorWrapper.java @@ -125,13 +125,11 @@ public final class HlsExtractorWrapper implements ExtractorOutput { } /** - * Releases the extractor, recycling any pending or incomplete samples to the sample pool. - *

- * This method should not be called whilst {@link #read(ExtractorInput)} is also being invoked. + * Clears queues for all tracks, returning all allocations to the buffer pool. */ - public void release() { + public void clear() { for (int i = 0; i < sampleQueues.size(); i++) { - sampleQueues.valueAt(i).release(); + sampleQueues.valueAt(i).clear(); } } @@ -140,7 +138,7 @@ public final class HlsExtractorWrapper implements ExtractorOutput { * * @return The largest timestamp, or {@link Long#MIN_VALUE} if no samples have been parsed. */ - public long getLargestSampleTimestamp() { + public long getLargestParsedTimestampUs() { long largestParsedTimestampUs = Long.MIN_VALUE; for (int i = 0; i < sampleQueues.size(); i++) { largestParsedTimestampUs = Math.max(largestParsedTimestampUs, diff --git a/library/src/main/java/com/google/android/exoplayer/hls/HlsSampleSource.java b/library/src/main/java/com/google/android/exoplayer/hls/HlsSampleSource.java index 72f9e6d587..a069864272 100644 --- a/library/src/main/java/com/google/android/exoplayer/hls/HlsSampleSource.java +++ b/library/src/main/java/com/google/android/exoplayer/hls/HlsSampleSource.java @@ -261,7 +261,7 @@ public class HlsSampleSource implements SampleSource, Loader.Callback { } else if (loadingFinished) { return TrackRenderer.END_OF_TRACK_US; } else { - long largestSampleTimestamp = extractors.getLast().getLargestSampleTimestamp(); + long largestSampleTimestamp = extractors.getLast().getLargestParsedTimestampUs(); return largestSampleTimestamp == Long.MIN_VALUE ? downstreamPositionUs : largestSampleTimestamp; } @@ -333,7 +333,7 @@ public class HlsSampleSource implements SampleSource, Loader.Callback { HlsExtractorWrapper extractor = extractors.getFirst(); while (extractors.size() > 1 && !haveSamplesForEnabledTracks(extractor)) { // We're finished reading from the extractor for all tracks, and so can discard it. 
- extractors.removeFirst().release(); + extractors.removeFirst().clear(); extractor = extractors.getFirst(); } return extractor; @@ -382,7 +382,7 @@ public class HlsSampleSource implements SampleSource, Loader.Callback { private void clearState() { for (int i = 0; i < extractors.size(); i++) { - extractors.get(i).release(); + extractors.get(i).clear(); } extractors.clear(); clearCurrentLoadable();