Enhance SampleQueue/RollingSampleBuffer to support other use cases.

- This is a step toward hopefully converging HLS and CHUNK packages.
- Add support for encrypted samples.
- Add support for appending from a DataSource.
This commit is contained in:
Oliver Woodman 2015-03-11 15:49:53 +00:00
parent 5d0457152d
commit becc6fca4c
6 changed files with 201 additions and 30 deletions

View file

@ -88,7 +88,7 @@ import java.util.Collections;
appendData(data, bytesToRead);
bytesRead += bytesToRead;
if (bytesRead == sampleSize) {
commitSample(true);
commitSample(C.SAMPLE_FLAG_SYNC);
timeUs += frameDurationUs;
bytesRead = 0;
state = STATE_FINDING_SYNC;

View file

@ -15,6 +15,7 @@
*/
package com.google.android.exoplayer.hls.parser;
import com.google.android.exoplayer.C;
import com.google.android.exoplayer.MediaFormat;
import com.google.android.exoplayer.mp4.Mp4Util;
import com.google.android.exoplayer.upstream.BufferPool;
@ -88,7 +89,7 @@ import java.util.List;
if (isKeyframe && !hasMediaFormat() && sps.isCompleted() && pps.isCompleted()) {
parseMediaFormat(sps, pps);
}
commitSample(isKeyframe, nalUnitOffsetInData);
commitSample(isKeyframe ? C.SAMPLE_FLAG_SYNC : 0, nalUnitOffsetInData);
}
startSample(pesTimeUs, nalUnitOffsetInData);
isKeyframe = false;

View file

@ -15,6 +15,7 @@
*/
package com.google.android.exoplayer.hls.parser;
import com.google.android.exoplayer.C;
import com.google.android.exoplayer.MediaFormat;
import com.google.android.exoplayer.upstream.BufferPool;
import com.google.android.exoplayer.util.ParsableByteArray;
@ -41,7 +42,7 @@ import com.google.android.exoplayer.util.ParsableByteArray;
@Override
public void packetFinished() {
commitSample(true);
commitSample(C.SAMPLE_FLAG_SYNC);
}
}

View file

@ -18,9 +18,11 @@ package com.google.android.exoplayer.hls.parser;
import com.google.android.exoplayer.C;
import com.google.android.exoplayer.SampleHolder;
import com.google.android.exoplayer.upstream.BufferPool;
import com.google.android.exoplayer.upstream.DataSource;
import com.google.android.exoplayer.util.Assertions;
import com.google.android.exoplayer.util.ParsableByteArray;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentLinkedQueue;
@ -29,12 +31,15 @@ import java.util.concurrent.ConcurrentLinkedQueue;
*/
/* package */ final class RollingSampleBuffer {
private static final int INITIAL_SCRATCH_SIZE = 32;
private final BufferPool fragmentPool;
private final int fragmentLength;
private final InfoQueue infoQueue;
private final ConcurrentLinkedQueue<byte[]> dataQueue;
private final long[] dataOffsetHolder;
private final SampleExtrasHolder extrasHolder;
private final ParsableByteArray scratch;
// Accessed only by the consuming thread.
private long totalBytesDropped;
@ -51,7 +56,8 @@ import java.util.concurrent.ConcurrentLinkedQueue;
fragmentLength = bufferPool.bufferLength;
infoQueue = new InfoQueue();
dataQueue = new ConcurrentLinkedQueue<byte[]>();
dataOffsetHolder = new long[1];
extrasHolder = new SampleExtrasHolder();
scratch = new ParsableByteArray(INITIAL_SCRATCH_SIZE);
}
public void release() {
@ -71,7 +77,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
* @return True if the holder was filled. False if there is no current sample.
*/
public boolean peekSample(SampleHolder holder) {
return infoQueue.peekSample(holder, dataOffsetHolder);
return infoQueue.peekSample(holder, extrasHolder);
}
/**
@ -85,23 +91,99 @@ import java.util.concurrent.ConcurrentLinkedQueue;
/**
* Reads the current sample, advancing the read index to the next sample.
*
* @param holder The holder into which the current sample should be written.
* @param sampleHolder The holder into which the current sample should be written.
*/
public void readSample(SampleHolder holder) {
// Write the sample information into the holder.
infoQueue.peekSample(holder, dataOffsetHolder);
// Write the sample data into the holder.
if (holder.data == null || holder.data.capacity() < holder.size) {
holder.replaceBuffer(holder.size);
public void readSample(SampleHolder sampleHolder) {
// Write the sample information into the holder and extrasHolder.
infoQueue.peekSample(sampleHolder, extrasHolder);
// Read encryption data if the sample is encrypted.
if ((sampleHolder.flags & C.SAMPLE_FLAG_ENCRYPTED) != 0) {
readEncryptionData(sampleHolder, extrasHolder);
}
if (holder.data != null) {
readData(dataOffsetHolder[0], holder.data, holder.size);
// Write the sample data into the holder.
if (sampleHolder.data == null || sampleHolder.data.capacity() < sampleHolder.size) {
sampleHolder.replaceBuffer(sampleHolder.size);
}
if (sampleHolder.data != null) {
readData(extrasHolder.offset, sampleHolder.data, sampleHolder.size);
}
// Advance the read head.
long nextOffset = infoQueue.moveToNextSample();
dropFragmentsTo(nextOffset);
}
/**
 * Reads encryption data for the current sample.
 * <p>
 * The encryption data is written into {@code sampleHolder.cryptoInfo}, and
 * {@code sampleHolder.size} is adjusted to subtract the number of bytes that were read. The
 * same value is added to {@code extrasHolder.offset}.
 * <p>
 * The data is parsed in the following order:
 * <ol>
 * <li>One signal byte. The top bit indicates subsample encryption; the low seven bits hold the
 * size of the initialization vector.</li>
 * <li>The initialization vector.</li>
 * <li>If subsample encryption is signalled: an unsigned 16-bit subsample count, followed by six
 * bytes per subsample (unsigned 16-bit clear size, then unsigned 32-bit encrypted size).</li>
 * </ol>
 * If subsample encryption is not signalled, a single subsample is reported whose encrypted size
 * is the remainder of the sample.
 *
 * @param sampleHolder The holder into which the encryption data should be written.
 * @param extrasHolder The extras holder whose offset should be read and subsequently adjusted.
 */
private void readEncryptionData(SampleHolder sampleHolder, SampleExtrasHolder extrasHolder) {
  long offset = extrasHolder.offset;

  // Read the signal byte.
  readData(offset, scratch.data, 1);
  offset++;
  byte signalByte = scratch.data[0];
  boolean subsampleEncryption = (signalByte & 0x80) != 0;
  int ivSize = signalByte & 0x7F;

  // Read the initialization vector.
  if (sampleHolder.cryptoInfo.iv == null) {
    sampleHolder.cryptoInfo.iv = new byte[16];
  } else {
    // The iv array is reused between samples. Clear it first so that, when ivSize is smaller
    // than the array, the trailing bytes are zero rather than left over from a previous sample.
    java.util.Arrays.fill(sampleHolder.cryptoInfo.iv, (byte) 0);
  }
  readData(offset, sampleHolder.cryptoInfo.iv, ivSize);
  offset += ivSize;

  // Read the subsample count, if present.
  int subsampleCount;
  if (subsampleEncryption) {
    readData(offset, scratch.data, 2);
    offset += 2;
    scratch.setPosition(0);
    subsampleCount = scratch.readUnsignedShort();
  } else {
    subsampleCount = 1;
  }

  // Reuse the holder's size arrays when they are large enough, otherwise allocate new ones.
  int[] clearDataSizes = sampleHolder.cryptoInfo.numBytesOfClearData;
  if (clearDataSizes == null || clearDataSizes.length < subsampleCount) {
    clearDataSizes = new int[subsampleCount];
  }
  int[] encryptedDataSizes = sampleHolder.cryptoInfo.numBytesOfEncryptedData;
  if (encryptedDataSizes == null || encryptedDataSizes.length < subsampleCount) {
    encryptedDataSizes = new int[subsampleCount];
  }

  // Write the clear and encrypted subsample sizes.
  if (subsampleEncryption) {
    int subsampleDataLength = 6 * subsampleCount;
    ensureCapacity(scratch, subsampleDataLength);
    readData(offset, scratch.data, subsampleDataLength);
    offset += subsampleDataLength;
    scratch.setPosition(0);
    for (int i = 0; i < subsampleCount; i++) {
      clearDataSizes[i] = scratch.readUnsignedShort();
      encryptedDataSizes[i] = scratch.readUnsignedIntToInt();
    }
  } else {
    // The whole sample, minus the encryption header consumed so far, is encrypted.
    clearDataSizes[0] = 0;
    encryptedDataSizes[0] = sampleHolder.size - (int) (offset - extrasHolder.offset);
  }

  // Populate the cryptoInfo.
  sampleHolder.cryptoInfo.set(subsampleCount, clearDataSizes, encryptedDataSizes,
      extrasHolder.encryptionKeyId, sampleHolder.cryptoInfo.iv, C.CRYPTO_MODE_AES_CTR);

  // Adjust the offset and size to take into account the bytes read.
  int bytesRead = (int) (offset - extrasHolder.offset);
  extrasHolder.offset += bytesRead;
  sampleHolder.size -= bytesRead;
}
/**
* Reads data from the front of the rolling buffer.
*
@ -121,6 +203,26 @@ import java.util.concurrent.ConcurrentLinkedQueue;
}
}
/**
 * Reads data from the front of the rolling buffer into a byte array.
 * <p>
 * The read may span several fragments. Before each copy, fragments that lie entirely before the
 * current read position are discarded via {@code dropFragmentsTo}.
 *
 * @param absolutePosition The absolute position from which data should be read.
 * @param target The array into which data should be written, starting at index 0.
 * @param length The number of bytes to read.
 */
// TODO: Consider reducing duplication of this method and the one above.
private void readData(long absolutePosition, byte[] target, int length) {
  int bytesRead = 0;
  while (bytesRead < length) {
    dropFragmentsTo(absolutePosition);
    int positionInFragment = (int) (absolutePosition - totalBytesDropped);
    int toCopy = Math.min(length - bytesRead, fragmentLength - positionInFragment);
    // Write after the bytes already copied. Writing at index 0 on every pass would overwrite
    // the start of target whenever the read crosses a fragment boundary.
    System.arraycopy(dataQueue.peek(), positionInFragment, target, bytesRead, toCopy);
    absolutePosition += toCopy;
    bytesRead += toCopy;
  }
}
/**
* Discard any fragments that hold data prior to the specified absolute position, returning
* them to the pool.
@ -136,6 +238,15 @@ import java.util.concurrent.ConcurrentLinkedQueue;
}
}
/**
 * Guarantees that the passed {@link ParsableByteArray} has a limit of at least {@code limit}.
 * <p>
 * If the current limit is too small, the array is reset to wrap a newly allocated array of
 * exactly {@code limit} bytes. Existing contents are not copied across.
 */
private static void ensureCapacity(ParsableByteArray byteArray, int limit) {
  if (byteArray.limit() >= limit) {
    // Already large enough.
    return;
  }
  byte[] replacement = new byte[limit];
  byteArray.reset(replacement, limit);
}
// Called by the loading thread.
/**
@ -151,12 +262,39 @@ import java.util.concurrent.ConcurrentLinkedQueue;
pendingSampleOffset = totalBytesWritten + offset;
}
/**
 * Appends data to the rolling buffer.
 * <p>
 * At most one read is performed per call, and it is capped to the space remaining in the
 * current fragment. Fewer than {@code length} bytes may therefore be appended; callers should
 * use the return value to track progress.
 *
 * @param dataSource The source from which to read.
 * @param length The maximum length of the read.
 * @return The number of bytes read, or -1 if the end of the source has been reached.
 * @throws IOException If an error occurs reading from the source.
 */
public int appendData(DataSource dataSource, int length) throws IOException {
  // Start a new fragment if there is none yet, or if the last one is full.
  if (dataQueue.isEmpty() || lastFragmentOffset == fragmentLength) {
    lastFragmentOffset = 0;
    lastFragment = fragmentPool.allocateDirect();
    dataQueue.add(lastFragment);
  }
  int thisWriteLength = Math.min(length, fragmentLength - lastFragmentOffset);
  int bytesRead = dataSource.read(lastFragment, lastFragmentOffset, thisWriteLength);
  if (bytesRead == -1) {
    // End of source: nothing was written, so the write positions stay untouched.
    return -1;
  }
  lastFragmentOffset += bytesRead;
  totalBytesWritten += bytesRead;
  return bytesRead;
}
/**
* Appends data to the rolling buffer.
*
* @param buffer A buffer containing the data to append.
* @param length The length of the data to append.
*/
// TODO: Consider reducing duplication of this method and the one above.
public void appendData(ParsableByteArray buffer, int length) {
int remainingWriteLength = length;
while (remainingWriteLength > 0) {
@ -176,21 +314,22 @@ import java.util.concurrent.ConcurrentLinkedQueue;
/**
* Indicates the end point for the current sample, making it available for consumption.
*
* @param isKeyframe True if the sample being committed is a keyframe. False otherwise.
* @param flags Flags that accompany the sample. See {@link SampleHolder#flags}.
* @param offset The offset of the first byte after the end of the sample's data, relative to
* the total number of bytes written to the buffer. Must be negative or zero.
* @param encryptionKey The encryption key associated with the sample, or null.
*/
public void commitSample(boolean isKeyframe, int offset) {
public void commitSample(int flags, int offset, byte[] encryptionKey) {
Assertions.checkState(offset <= 0);
int sampleSize = (int) (totalBytesWritten + offset - pendingSampleOffset);
infoQueue.commitSample(pendingSampleTimeUs, pendingSampleOffset, sampleSize,
isKeyframe ? C.SAMPLE_FLAG_SYNC : 0);
infoQueue.commitSample(pendingSampleTimeUs, pendingSampleOffset, sampleSize, flags,
encryptionKey);
}
/**
* Holds information about the samples in the rolling buffer.
*/
private static class InfoQueue {
private static final class InfoQueue {
private static final int SAMPLE_CAPACITY_INCREMENT = 1000;
@ -200,6 +339,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
private int[] sizes;
private int[] flags;
private long[] timesUs;
private byte[][] encryptionKeys;
private int queueSize;
private int readIndex;
@ -211,6 +351,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
timesUs = new long[capacity];
flags = new int[capacity];
sizes = new int[capacity];
encryptionKeys = new byte[capacity][];
}
// Called by the consuming thread.
@ -224,18 +365,18 @@ import java.util.concurrent.ConcurrentLinkedQueue;
* {@code offsetHolder[0]}.
*
* @param holder The holder into which the current sample information should be written.
* @param offsetHolder The holder into which the absolute position of the sample's data should
* be written.
* @param extrasHolder The holder into which extra sample information should be written.
* @return True if the holders were filled. False if there is no current sample.
*/
public synchronized boolean peekSample(SampleHolder holder, long[] offsetHolder) {
public synchronized boolean peekSample(SampleHolder holder, SampleExtrasHolder extrasHolder) {
if (queueSize == 0) {
return false;
}
holder.timeUs = timesUs[readIndex];
holder.size = sizes[readIndex];
holder.flags = flags[readIndex];
offsetHolder[0] = offsets[readIndex];
extrasHolder.offset = offsets[readIndex];
extrasHolder.encryptionKeyId = encryptionKeys[readIndex];
return true;
}
@ -257,11 +398,13 @@ import java.util.concurrent.ConcurrentLinkedQueue;
// Called by the loading thread.
public synchronized void commitSample(long timeUs, long offset, int size, int sampleFlags) {
public synchronized void commitSample(long timeUs, long offset, int size, int sampleFlags,
byte[] encryptionKey) {
timesUs[writeIndex] = timeUs;
offsets[writeIndex] = offset;
sizes[writeIndex] = size;
flags[writeIndex] = sampleFlags;
encryptionKeys[writeIndex] = encryptionKey;
// Increment the write index.
queueSize++;
if (queueSize == capacity) {
@ -271,20 +414,24 @@ import java.util.concurrent.ConcurrentLinkedQueue;
long[] newTimesUs = new long[newCapacity];
int[] newFlags = new int[newCapacity];
int[] newSizes = new int[newCapacity];
byte[][] newEncryptionKeys = new byte[newCapacity][];
int beforeWrap = capacity - readIndex;
System.arraycopy(offsets, readIndex, newOffsets, 0, beforeWrap);
System.arraycopy(timesUs, readIndex, newTimesUs, 0, beforeWrap);
System.arraycopy(flags, readIndex, newFlags, 0, beforeWrap);
System.arraycopy(sizes, readIndex, newSizes, 0, beforeWrap);
System.arraycopy(encryptionKeys, readIndex, newEncryptionKeys, 0, beforeWrap);
int afterWrap = readIndex;
System.arraycopy(offsets, 0, newOffsets, beforeWrap, afterWrap);
System.arraycopy(timesUs, 0, newTimesUs, beforeWrap, afterWrap);
System.arraycopy(flags, 0, newFlags, beforeWrap, afterWrap);
System.arraycopy(sizes, 0, newSizes, beforeWrap, afterWrap);
System.arraycopy(encryptionKeys, 0, newEncryptionKeys, beforeWrap, afterWrap);
offsets = newOffsets;
timesUs = newTimesUs;
flags = newFlags;
sizes = newSizes;
encryptionKeys = newEncryptionKeys;
readIndex = 0;
writeIndex = capacity;
queueSize = capacity;
@ -300,4 +447,14 @@ import java.util.concurrent.ConcurrentLinkedQueue;
}
/**
 * Carries per-sample values for which {@link SampleHolder} has no field.
 */
private static final class SampleExtrasHolder {

  /** The encryption key associated with the sample, or null. */
  public byte[] encryptionKeyId;

  /** The absolute position of the sample's data. */
  public long offset;

}
}

View file

@ -19,8 +19,11 @@ import com.google.android.exoplayer.C;
import com.google.android.exoplayer.MediaFormat;
import com.google.android.exoplayer.SampleHolder;
import com.google.android.exoplayer.upstream.BufferPool;
import com.google.android.exoplayer.upstream.DataSource;
import com.google.android.exoplayer.util.ParsableByteArray;
import java.io.IOException;
/**
* Wraps a {@link RollingSampleBuffer}, adding higher level functionality such as enforcing that
* the first sample returned from the queue is a keyframe, allowing splicing to another queue, and
@ -186,16 +189,24 @@ import com.google.android.exoplayer.util.ParsableByteArray;
rollingBuffer.startSample(sampleTimeUs, offset);
}
protected int appendData(DataSource dataSource, int length) throws IOException {
return rollingBuffer.appendData(dataSource, length);
}
protected void appendData(ParsableByteArray buffer, int length) {
rollingBuffer.appendData(buffer, length);
}
protected void commitSample(boolean isKeyframe) {
commitSample(isKeyframe, 0);
protected void commitSample(int flags) {
commitSample(flags, 0, null);
}
protected void commitSample(boolean isKeyframe, int offset) {
rollingBuffer.commitSample(isKeyframe, offset);
protected void commitSample(int flags, int offset) {
commitSample(flags, offset, null);
}
protected void commitSample(int flags, int offset, byte[] encryptionKey) {
rollingBuffer.commitSample(flags, offset, encryptionKey);
writingSample = false;
}

View file

@ -15,6 +15,7 @@
*/
package com.google.android.exoplayer.hls.parser;
import com.google.android.exoplayer.C;
import com.google.android.exoplayer.MediaFormat;
import com.google.android.exoplayer.text.eia608.Eia608Parser;
import com.google.android.exoplayer.upstream.BufferPool;
@ -59,7 +60,7 @@ import com.google.android.exoplayer.util.ParsableByteArray;
if (Eia608Parser.isSeiMessageEia608(payloadType, payloadSize, seiBuffer)) {
startSample(pesTimeUs);
appendData(seiBuffer, payloadSize);
commitSample(true);
commitSample(C.SAMPLE_FLAG_SYNC);
} else {
seiBuffer.skip(payloadSize);
}