More steps towards unified extractors.

This commit is contained in:
Oliver Woodman 2015-04-11 01:07:41 +01:00
parent 53a47524a1
commit da656e6f26
4 changed files with 161 additions and 72 deletions

View file

@ -24,9 +24,8 @@ import com.google.android.exoplayer.util.ParsableByteArray;
import java.io.IOException; import java.io.IOException;
/** /**
* Wraps a {@link RollingSampleBuffer}, adding higher level functionality such as enforcing that * A {@link TrackOutput} that buffers extracted samples in a queue, and allows for consumption from
* the first sample returned from the queue is a keyframe, allowing splicing to another queue, and * that queue.
* so on.
*/ */
public final class DefaultTrackOutput implements TrackOutput { public final class DefaultTrackOutput implements TrackOutput {
@ -51,12 +50,35 @@ public final class DefaultTrackOutput implements TrackOutput {
largestParsedTimestampUs = Long.MIN_VALUE; largestParsedTimestampUs = Long.MIN_VALUE;
} }
public void release() { // Called by the consuming thread, but only when there is no loading thread.
rollingBuffer.release();
/**
* Clears the queue, returning all allocations to the allocator.
*/
public void clear() {
rollingBuffer.clear();
needKeyframe = true;
lastReadTimeUs = Long.MIN_VALUE;
spliceOutTimeUs = Long.MIN_VALUE;
largestParsedTimestampUs = Long.MIN_VALUE;
}
/**
* Returns the current absolute write index.
*/
public int getWriteIndex() {
return rollingBuffer.getWriteIndex();
} }
// Called by the consuming thread. // Called by the consuming thread.
/**
* Returns the current absolute read index.
*/
public int getReadIndex() {
return rollingBuffer.getReadIndex();
}
/** /**
* True if the output has received a format. False otherwise. * True if the output has received a format. False otherwise.
*/ */
@ -64,14 +86,24 @@ public final class DefaultTrackOutput implements TrackOutput {
return format != null; return format != null;
} }
/**
* The format most recently received by the output, or null if a format has yet to be received.
*/
public MediaFormat getFormat() { public MediaFormat getFormat() {
return format; return format;
} }
/**
* The largest timestamp of any sample received by the output, or {@link Long#MIN_VALUE} if a
* sample has yet to be received.
*/
public long getLargestParsedTimestampUs() { public long getLargestParsedTimestampUs() {
return largestParsedTimestampUs; return largestParsedTimestampUs;
} }
/**
* True if at least one sample can be read from the queue. False otherwise.
*/
public boolean isEmpty() { public boolean isEmpty() {
return !advanceToEligibleSample(); return !advanceToEligibleSample();
} }

View file

@ -55,16 +55,41 @@ import java.util.concurrent.ConcurrentLinkedQueue;
dataQueue = new ConcurrentLinkedQueue<byte[]>(); dataQueue = new ConcurrentLinkedQueue<byte[]>();
extrasHolder = new SampleExtrasHolder(); extrasHolder = new SampleExtrasHolder();
scratch = new ParsableByteArray(INITIAL_SCRATCH_SIZE); scratch = new ParsableByteArray(INITIAL_SCRATCH_SIZE);
lastFragmentOffset = fragmentLength;
} }
public void release() { // Called by the consuming thread, but only when there is no loading thread.
/**
* Clears the buffer, returning all allocations to the allocator.
*/
public void clear() {
infoQueue.clear();
while (!dataQueue.isEmpty()) { while (!dataQueue.isEmpty()) {
fragmentPool.releaseDirect(dataQueue.remove()); fragmentPool.releaseDirect(dataQueue.remove());
} }
totalBytesDropped = 0;
totalBytesWritten = 0;
lastFragment = null;
lastFragmentOffset = fragmentLength;
}
/**
* Returns the current absolute write index.
*/
public int getWriteIndex() {
return infoQueue.getWriteIndex();
} }
// Called by the consuming thread. // Called by the consuming thread.
/**
* Returns the current absolute read index.
*/
public int getReadIndex() {
return infoQueue.getReadIndex();
}
/** /**
* Fills {@code holder} with information about the current sample, but does not write its data. * Fills {@code holder} with information about the current sample, but does not write its data.
* <p> * <p>
@ -83,7 +108,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
*/ */
public void skipSample() { public void skipSample() {
long nextOffset = infoQueue.moveToNextSample(); long nextOffset = infoQueue.moveToNextSample();
dropFragmentsTo(nextOffset); dropDownstreamTo(nextOffset);
} }
/** /**
@ -112,7 +137,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
} }
// Advance the read head. // Advance the read head.
long nextOffset = infoQueue.moveToNextSample(); long nextOffset = infoQueue.moveToNextSample();
dropFragmentsTo(nextOffset); dropDownstreamTo(nextOffset);
return true; return true;
} }
@ -198,7 +223,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
private void readData(long absolutePosition, ByteBuffer target, int length) { private void readData(long absolutePosition, ByteBuffer target, int length) {
int remaining = length; int remaining = length;
while (remaining > 0) { while (remaining > 0) {
dropFragmentsTo(absolutePosition); dropDownstreamTo(absolutePosition);
int positionInFragment = (int) (absolutePosition - totalBytesDropped); int positionInFragment = (int) (absolutePosition - totalBytesDropped);
int toCopy = Math.min(remaining, fragmentLength - positionInFragment); int toCopy = Math.min(remaining, fragmentLength - positionInFragment);
target.put(dataQueue.peek(), positionInFragment, toCopy); target.put(dataQueue.peek(), positionInFragment, toCopy);
@ -216,14 +241,14 @@ import java.util.concurrent.ConcurrentLinkedQueue;
*/ */
// TODO: Consider reducing duplication of this method and the one above. // TODO: Consider reducing duplication of this method and the one above.
private void readData(long absolutePosition, byte[] target, int length) { private void readData(long absolutePosition, byte[] target, int length) {
int remaining = length; int bytesRead = 0;
while (remaining > 0) { while (bytesRead < length) {
dropFragmentsTo(absolutePosition); dropDownstreamTo(absolutePosition);
int positionInFragment = (int) (absolutePosition - totalBytesDropped); int positionInFragment = (int) (absolutePosition - totalBytesDropped);
int toCopy = Math.min(remaining, fragmentLength - positionInFragment); int toCopy = Math.min(length - bytesRead, fragmentLength - positionInFragment);
System.arraycopy(dataQueue.peek(), positionInFragment, target, 0, toCopy); System.arraycopy(dataQueue.peek(), positionInFragment, target, bytesRead, toCopy);
absolutePosition += toCopy; absolutePosition += toCopy;
remaining -= toCopy; bytesRead += toCopy;
} }
} }
@ -233,7 +258,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
* *
* @param absolutePosition The absolute position up to which fragments can be discarded. * @param absolutePosition The absolute position up to which fragments can be discarded.
*/ */
private void dropFragmentsTo(long absolutePosition) { private void dropDownstreamTo(long absolutePosition) {
int relativePosition = (int) (absolutePosition - totalBytesDropped); int relativePosition = (int) (absolutePosition - totalBytesDropped);
int fragmentIndex = relativePosition / fragmentLength; int fragmentIndex = relativePosition / fragmentLength;
for (int i = 0; i < fragmentIndex; i++) { for (int i = 0; i < fragmentIndex; i++) {
@ -266,24 +291,23 @@ import java.util.concurrent.ConcurrentLinkedQueue;
* Appends data to the rolling buffer. * Appends data to the rolling buffer.
* *
* @param dataSource The source from which to read. * @param dataSource The source from which to read.
* @param length The maximum length of the read. * @param length The maximum length of the read, or {@link C#LENGTH_UNBOUNDED} if the caller does
* @return The number of bytes read, or -1 if the end of the source has been reached. * not wish to impose a limit.
* @return The number of bytes appended.
* @throws IOException If an error occurs reading from the source. * @throws IOException If an error occurs reading from the source.
*/ */
public int appendData(DataSource dataSource, int length) throws IOException { public int appendData(DataSource dataSource, int length) throws IOException {
int remainingWriteLength = length; ensureSpaceForWrite();
if (dataQueue.isEmpty() || lastFragmentOffset == fragmentLength) { int remainingFragmentCapacity = fragmentLength - lastFragmentOffset;
lastFragmentOffset = 0; length = length != C.LENGTH_UNBOUNDED ? Math.min(length, remainingFragmentCapacity)
lastFragment = fragmentPool.allocateDirect(); : remainingFragmentCapacity;
dataQueue.add(lastFragment);
} int bytesRead = dataSource.read(lastFragment, lastFragmentOffset, length);
int thisWriteLength = Math.min(remainingWriteLength, fragmentLength - lastFragmentOffset); if (bytesRead == C.RESULT_END_OF_INPUT) {
int bytesRead = dataSource.read(lastFragment, lastFragmentOffset, thisWriteLength); return C.RESULT_END_OF_INPUT;
if (bytesRead == -1) {
return -1;
} }
lastFragmentOffset += bytesRead; lastFragmentOffset += bytesRead;
remainingWriteLength -= bytesRead;
totalBytesWritten += bytesRead; totalBytesWritten += bytesRead;
return bytesRead; return bytesRead;
} }
@ -294,15 +318,10 @@ import java.util.concurrent.ConcurrentLinkedQueue;
* @param buffer A buffer containing the data to append. * @param buffer A buffer containing the data to append.
* @param length The length of the data to append. * @param length The length of the data to append.
*/ */
// TODO: Consider reducing duplication of this method and the one above.
public void appendData(ParsableByteArray buffer, int length) { public void appendData(ParsableByteArray buffer, int length) {
int remainingWriteLength = length; int remainingWriteLength = length;
while (remainingWriteLength > 0) { while (remainingWriteLength > 0) {
if (dataQueue.isEmpty() || lastFragmentOffset == fragmentLength) { ensureSpaceForWrite();
lastFragmentOffset = 0;
lastFragment = fragmentPool.allocateDirect();
dataQueue.add(lastFragment);
}
int thisWriteLength = Math.min(remainingWriteLength, fragmentLength - lastFragmentOffset); int thisWriteLength = Math.min(remainingWriteLength, fragmentLength - lastFragmentOffset);
buffer.readBytes(lastFragment, lastFragmentOffset, thisWriteLength); buffer.readBytes(lastFragment, lastFragmentOffset, thisWriteLength);
lastFragmentOffset += thisWriteLength; lastFragmentOffset += thisWriteLength;
@ -325,6 +344,17 @@ import java.util.concurrent.ConcurrentLinkedQueue;
infoQueue.commitSample(sampleTimeUs, flags, position, size, encryptionKey); infoQueue.commitSample(sampleTimeUs, flags, position, size, encryptionKey);
} }
/**
* Ensures at least one byte can be written, allocating a new fragment if necessary.
*/
private void ensureSpaceForWrite() {
if (lastFragmentOffset == fragmentLength) {
lastFragmentOffset = 0;
lastFragment = fragmentPool.allocateDirect();
dataQueue.add(lastFragment);
}
}
/** /**
* Holds information about the samples in the rolling buffer. * Holds information about the samples in the rolling buffer.
*/ */
@ -341,8 +371,9 @@ import java.util.concurrent.ConcurrentLinkedQueue;
private byte[][] encryptionKeys; private byte[][] encryptionKeys;
private int queueSize; private int queueSize;
private int readIndex; private int absoluteReadIndex;
private int writeIndex; private int relativeReadIndex;
private int relativeWriteIndex;
public InfoQueue() { public InfoQueue() {
capacity = SAMPLE_CAPACITY_INCREMENT; capacity = SAMPLE_CAPACITY_INCREMENT;
@ -353,8 +384,34 @@ import java.util.concurrent.ConcurrentLinkedQueue;
encryptionKeys = new byte[capacity][]; encryptionKeys = new byte[capacity][];
} }
// Called by the consuming thread, but only when there is no loading thread.
/**
* Clears the queue.
*/
public void clear() {
absoluteReadIndex = 0;
relativeReadIndex = 0;
relativeWriteIndex = 0;
queueSize = 0;
}
/**
* Returns the current absolute write index.
*/
public int getWriteIndex() {
return absoluteReadIndex + queueSize;
}
// Called by the consuming thread. // Called by the consuming thread.
/**
* Returns the current absolute read index.
*/
public int getReadIndex() {
return absoluteReadIndex;
}
/** /**
* Fills {@code holder} with information about the current sample, but does not write its data. * Fills {@code holder} with information about the current sample, but does not write its data.
* The first entry in {@code offsetHolder} is filled with the absolute position of the sample's * The first entry in {@code offsetHolder} is filled with the absolute position of the sample's
@ -371,11 +428,11 @@ import java.util.concurrent.ConcurrentLinkedQueue;
if (queueSize == 0) { if (queueSize == 0) {
return false; return false;
} }
holder.timeUs = timesUs[readIndex]; holder.timeUs = timesUs[relativeReadIndex];
holder.size = sizes[readIndex]; holder.size = sizes[relativeReadIndex];
holder.flags = flags[readIndex]; holder.flags = flags[relativeReadIndex];
extrasHolder.offset = offsets[readIndex]; extrasHolder.offset = offsets[relativeReadIndex];
extrasHolder.encryptionKeyId = encryptionKeys[readIndex]; extrasHolder.encryptionKeyId = encryptionKeys[relativeReadIndex];
return true; return true;
} }
@ -387,23 +444,25 @@ import java.util.concurrent.ConcurrentLinkedQueue;
*/ */
public synchronized long moveToNextSample() { public synchronized long moveToNextSample() {
queueSize--; queueSize--;
int lastReadIndex = readIndex++; int lastReadIndex = relativeReadIndex++;
if (readIndex == capacity) { absoluteReadIndex++;
if (relativeReadIndex == capacity) {
// Wrap around. // Wrap around.
readIndex = 0; relativeReadIndex = 0;
} }
return queueSize > 0 ? offsets[readIndex] : (sizes[lastReadIndex] + offsets[lastReadIndex]); return queueSize > 0 ? offsets[relativeReadIndex]
: (sizes[lastReadIndex] + offsets[lastReadIndex]);
} }
// Called by the loading thread. // Called by the loading thread.
public synchronized void commitSample(long timeUs, int sampleFlags, long offset, int size, public synchronized void commitSample(long timeUs, int sampleFlags, long offset, int size,
byte[] encryptionKey) { byte[] encryptionKey) {
timesUs[writeIndex] = timeUs; timesUs[relativeWriteIndex] = timeUs;
offsets[writeIndex] = offset; offsets[relativeWriteIndex] = offset;
sizes[writeIndex] = size; sizes[relativeWriteIndex] = size;
flags[writeIndex] = sampleFlags; flags[relativeWriteIndex] = sampleFlags;
encryptionKeys[writeIndex] = encryptionKey; encryptionKeys[relativeWriteIndex] = encryptionKey;
// Increment the write index. // Increment the write index.
queueSize++; queueSize++;
if (queueSize == capacity) { if (queueSize == capacity) {
@ -414,13 +473,13 @@ import java.util.concurrent.ConcurrentLinkedQueue;
int[] newFlags = new int[newCapacity]; int[] newFlags = new int[newCapacity];
int[] newSizes = new int[newCapacity]; int[] newSizes = new int[newCapacity];
byte[][] newEncryptionKeys = new byte[newCapacity][]; byte[][] newEncryptionKeys = new byte[newCapacity][];
int beforeWrap = capacity - readIndex; int beforeWrap = capacity - relativeReadIndex;
System.arraycopy(offsets, readIndex, newOffsets, 0, beforeWrap); System.arraycopy(offsets, relativeReadIndex, newOffsets, 0, beforeWrap);
System.arraycopy(timesUs, readIndex, newTimesUs, 0, beforeWrap); System.arraycopy(timesUs, relativeReadIndex, newTimesUs, 0, beforeWrap);
System.arraycopy(flags, readIndex, newFlags, 0, beforeWrap); System.arraycopy(flags, relativeReadIndex, newFlags, 0, beforeWrap);
System.arraycopy(sizes, readIndex, newSizes, 0, beforeWrap); System.arraycopy(sizes, relativeReadIndex, newSizes, 0, beforeWrap);
System.arraycopy(encryptionKeys, readIndex, newEncryptionKeys, 0, beforeWrap); System.arraycopy(encryptionKeys, relativeReadIndex, newEncryptionKeys, 0, beforeWrap);
int afterWrap = readIndex; int afterWrap = relativeReadIndex;
System.arraycopy(offsets, 0, newOffsets, beforeWrap, afterWrap); System.arraycopy(offsets, 0, newOffsets, beforeWrap, afterWrap);
System.arraycopy(timesUs, 0, newTimesUs, beforeWrap, afterWrap); System.arraycopy(timesUs, 0, newTimesUs, beforeWrap, afterWrap);
System.arraycopy(flags, 0, newFlags, beforeWrap, afterWrap); System.arraycopy(flags, 0, newFlags, beforeWrap, afterWrap);
@ -431,15 +490,15 @@ import java.util.concurrent.ConcurrentLinkedQueue;
flags = newFlags; flags = newFlags;
sizes = newSizes; sizes = newSizes;
encryptionKeys = newEncryptionKeys; encryptionKeys = newEncryptionKeys;
readIndex = 0; relativeReadIndex = 0;
writeIndex = capacity; relativeWriteIndex = capacity;
queueSize = capacity; queueSize = capacity;
capacity = newCapacity; capacity = newCapacity;
} else { } else {
writeIndex++; relativeWriteIndex++;
if (writeIndex == capacity) { if (relativeWriteIndex == capacity) {
// Wrap around. // Wrap around.
writeIndex = 0; relativeWriteIndex = 0;
} }
} }
} }

View file

@ -125,13 +125,11 @@ public final class HlsExtractorWrapper implements ExtractorOutput {
} }
/** /**
* Releases the extractor, recycling any pending or incomplete samples to the sample pool. * Clears queues for all tracks, returning all allocations to the buffer pool.
* <p>
* This method should not be called whilst {@link #read(ExtractorInput)} is also being invoked.
*/ */
public void release() { public void clear() {
for (int i = 0; i < sampleQueues.size(); i++) { for (int i = 0; i < sampleQueues.size(); i++) {
sampleQueues.valueAt(i).release(); sampleQueues.valueAt(i).clear();
} }
} }
@ -140,7 +138,7 @@ public final class HlsExtractorWrapper implements ExtractorOutput {
* *
* @return The largest timestamp, or {@link Long#MIN_VALUE} if no samples have been parsed. * @return The largest timestamp, or {@link Long#MIN_VALUE} if no samples have been parsed.
*/ */
public long getLargestSampleTimestamp() { public long getLargestParsedTimestampUs() {
long largestParsedTimestampUs = Long.MIN_VALUE; long largestParsedTimestampUs = Long.MIN_VALUE;
for (int i = 0; i < sampleQueues.size(); i++) { for (int i = 0; i < sampleQueues.size(); i++) {
largestParsedTimestampUs = Math.max(largestParsedTimestampUs, largestParsedTimestampUs = Math.max(largestParsedTimestampUs,

View file

@ -261,7 +261,7 @@ public class HlsSampleSource implements SampleSource, Loader.Callback {
} else if (loadingFinished) { } else if (loadingFinished) {
return TrackRenderer.END_OF_TRACK_US; return TrackRenderer.END_OF_TRACK_US;
} else { } else {
long largestSampleTimestamp = extractors.getLast().getLargestSampleTimestamp(); long largestSampleTimestamp = extractors.getLast().getLargestParsedTimestampUs();
return largestSampleTimestamp == Long.MIN_VALUE ? downstreamPositionUs return largestSampleTimestamp == Long.MIN_VALUE ? downstreamPositionUs
: largestSampleTimestamp; : largestSampleTimestamp;
} }
@ -333,7 +333,7 @@ public class HlsSampleSource implements SampleSource, Loader.Callback {
HlsExtractorWrapper extractor = extractors.getFirst(); HlsExtractorWrapper extractor = extractors.getFirst();
while (extractors.size() > 1 && !haveSamplesForEnabledTracks(extractor)) { while (extractors.size() > 1 && !haveSamplesForEnabledTracks(extractor)) {
// We're finished reading from the extractor for all tracks, and so can discard it. // We're finished reading from the extractor for all tracks, and so can discard it.
extractors.removeFirst().release(); extractors.removeFirst().clear();
extractor = extractors.getFirst(); extractor = extractors.getFirst();
} }
return extractor; return extractor;
@ -382,7 +382,7 @@ public class HlsSampleSource implements SampleSource, Loader.Callback {
private void clearState() { private void clearState() {
for (int i = 0; i < extractors.size(); i++) { for (int i = 0; i < extractors.size(); i++) {
extractors.get(i).release(); extractors.get(i).clear();
} }
extractors.clear(); extractors.clear();
clearCurrentLoadable(); clearCurrentLoadable();