Replace LinkedBlockingDeque with our own linked list

This will allow us to maintain a reference to the middle
of the queue, which is necessary to efficiently support
decoupling the read position from the start of the buffer.

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=158839336
This commit is contained in:
olly 2017-06-13 06:43:20 -07:00 committed by Oliver Woodman
parent f3e9166a4e
commit 6362dfeb98
2 changed files with 145 additions and 43 deletions

View file

@ -91,6 +91,13 @@ public class DefaultTrackOutputTest extends TestCase {
inputBuffer = null;
}
public void testDisableReleasesAllocations() {
writeTestData();
assertAllocationCount(10);
trackOutput.disable();
assertAllocationCount(0);
}
public void testReadWithoutWrite() {
assertNoSamplesToRead(null);
}

View file

@ -15,6 +15,7 @@
*/
package com.google.android.exoplayer2.extractor;
import android.support.annotation.Nullable;
import com.google.android.exoplayer2.C;
import com.google.android.exoplayer2.Format;
import com.google.android.exoplayer2.FormatHolder;
@ -26,7 +27,6 @@ import com.google.android.exoplayer2.util.ParsableByteArray;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;
/**
@ -57,15 +57,16 @@ public final class DefaultTrackOutput implements TrackOutput {
private final Allocator allocator;
private final int allocationLength;
private final SampleMetadataQueue metadataQueue;
private final LinkedBlockingDeque<Allocation> dataQueue;
private final SampleExtrasHolder extrasHolder;
private final ParsableByteArray scratch;
private final AtomicInteger state;
// References into the linked list of allocations.
private AllocationNode firstAllocationNode;
private AllocationNode writeAllocationNode;
// Accessed only by the consuming thread.
private long totalBytesDropped;
private Format downstreamFormat;
// Accessed only by the loading thread (or the consuming thread when there is no loading thread).
@ -73,7 +74,6 @@ public final class DefaultTrackOutput implements TrackOutput {
private Format lastUnadjustedFormat;
private long sampleOffsetUs;
private long totalBytesWritten;
private Allocation lastAllocation;
private int lastAllocationOffset;
private boolean pendingSplice;
private UpstreamFormatChangedListener upstreamFormatChangeListener;
@ -85,11 +85,12 @@ public final class DefaultTrackOutput implements TrackOutput {
this.allocator = allocator;
allocationLength = allocator.getIndividualAllocationLength();
metadataQueue = new SampleMetadataQueue();
dataQueue = new LinkedBlockingDeque<>();
extrasHolder = new SampleExtrasHolder();
scratch = new ParsableByteArray(INITIAL_SCRATCH_SIZE);
state = new AtomicInteger();
lastAllocationOffset = allocationLength;
firstAllocationNode = new AllocationNode(0, allocationLength);
writeAllocationNode = firstAllocationNode;
}
// Called by the consuming thread, but only when there is no loading thread.
@ -149,23 +150,23 @@ public final class DefaultTrackOutput implements TrackOutput {
* @param absolutePosition The absolute position (inclusive) from which to discard data.
*/
private void dropUpstreamFrom(long absolutePosition) {
int relativePosition = (int) (absolutePosition - totalBytesDropped);
// Calculate the index of the allocation containing the position, and the offset within it.
int allocationIndex = relativePosition / allocationLength;
int allocationOffset = relativePosition % allocationLength;
// We want to discard any allocations after the one at allocationIdnex.
int allocationDiscardCount = dataQueue.size() - allocationIndex - 1;
if (allocationOffset == 0) {
// If the allocation at allocationIndex is empty, we should discard that one too.
allocationDiscardCount++;
if (absolutePosition == firstAllocationNode.startPosition) {
clearAllocationNodes(firstAllocationNode);
firstAllocationNode = new AllocationNode(absolutePosition, allocationLength);
writeAllocationNode = firstAllocationNode;
} else {
AllocationNode newWriteAllocationNode = firstAllocationNode;
AllocationNode currentNode = firstAllocationNode.next;
while (absolutePosition > currentNode.startPosition) {
newWriteAllocationNode = currentNode;
currentNode = currentNode.next;
}
clearAllocationNodes(currentNode);
writeAllocationNode = newWriteAllocationNode;
writeAllocationNode.next = new AllocationNode(writeAllocationNode.endPosition,
allocationLength);
lastAllocationOffset = (int) (absolutePosition - writeAllocationNode.startPosition);
}
// Discard the allocations.
for (int i = 0; i < allocationDiscardCount; i++) {
allocator.release(dataQueue.removeLast());
}
// Update lastAllocation and lastAllocationOffset to reflect the new position.
lastAllocation = dataQueue.peekLast();
lastAllocationOffset = allocationOffset == 0 ? allocationLength : allocationOffset;
}
// Called by the consuming thread.
@ -384,14 +385,18 @@ public final class DefaultTrackOutput implements TrackOutput {
*/
private void readData(long absolutePosition, ByteBuffer target, int length) {
int remaining = length;
dropDownstreamTo(absolutePosition);
while (remaining > 0) {
dropDownstreamTo(absolutePosition);
int positionInAllocation = (int) (absolutePosition - totalBytesDropped);
int positionInAllocation = (int) (absolutePosition - firstAllocationNode.startPosition);
int toCopy = Math.min(remaining, allocationLength - positionInAllocation);
Allocation allocation = dataQueue.peek();
Allocation allocation = firstAllocationNode.allocation;
target.put(allocation.data, allocation.translateOffset(positionInAllocation), toCopy);
absolutePosition += toCopy;
remaining -= toCopy;
if (absolutePosition == firstAllocationNode.endPosition) {
allocator.release(allocation);
firstAllocationNode = firstAllocationNode.clear();
}
}
}
@ -404,15 +409,19 @@ public final class DefaultTrackOutput implements TrackOutput {
*/
private void readData(long absolutePosition, byte[] target, int length) {
int bytesRead = 0;
dropDownstreamTo(absolutePosition);
while (bytesRead < length) {
dropDownstreamTo(absolutePosition);
int positionInAllocation = (int) (absolutePosition - totalBytesDropped);
int positionInAllocation = (int) (absolutePosition - firstAllocationNode.startPosition);
int toCopy = Math.min(length - bytesRead, allocationLength - positionInAllocation);
Allocation allocation = dataQueue.peek();
Allocation allocation = firstAllocationNode.allocation;
System.arraycopy(allocation.data, allocation.translateOffset(positionInAllocation), target,
bytesRead, toCopy);
absolutePosition += toCopy;
bytesRead += toCopy;
if (absolutePosition == firstAllocationNode.endPosition) {
allocator.release(allocation);
firstAllocationNode = firstAllocationNode.clear();
}
}
}
@ -423,11 +432,9 @@ public final class DefaultTrackOutput implements TrackOutput {
* @param absolutePosition The absolute position up to which allocations can be discarded.
*/
private void dropDownstreamTo(long absolutePosition) {
int relativePosition = (int) (absolutePosition - totalBytesDropped);
int allocationIndex = relativePosition / allocationLength;
for (int i = 0; i < allocationIndex; i++) {
allocator.release(dataQueue.remove());
totalBytesDropped += allocationLength;
while (absolutePosition >= firstAllocationNode.endPosition) {
allocator.release(firstAllocationNode.allocation);
firstAllocationNode = firstAllocationNode.clear();
}
}
@ -481,8 +488,9 @@ public final class DefaultTrackOutput implements TrackOutput {
}
try {
length = prepareForAppend(length);
int bytesAppended = input.read(lastAllocation.data,
lastAllocation.translateOffset(lastAllocationOffset), length);
Allocation writeAllocation = writeAllocationNode.allocation;
int bytesAppended = input.read(writeAllocation.data,
writeAllocation.translateOffset(lastAllocationOffset), length);
if (bytesAppended == C.RESULT_END_OF_INPUT) {
if (allowEndOfInput) {
return C.RESULT_END_OF_INPUT;
@ -505,7 +513,8 @@ public final class DefaultTrackOutput implements TrackOutput {
}
while (length > 0) {
int thisAppendLength = prepareForAppend(length);
buffer.readBytes(lastAllocation.data, lastAllocation.translateOffset(lastAllocationOffset),
Allocation writeAllocation = writeAllocationNode.allocation;
buffer.readBytes(writeAllocation.data, writeAllocation.translateOffset(lastAllocationOffset),
thisAppendLength);
lastAllocationOffset += thisAppendLength;
totalBytesWritten += thisAppendLength;
@ -553,13 +562,35 @@ public final class DefaultTrackOutput implements TrackOutput {
private void clearSampleData() {
metadataQueue.clearSampleData();
allocator.release(dataQueue.toArray(new Allocation[dataQueue.size()]));
dataQueue.clear();
allocator.trim();
totalBytesDropped = 0;
clearAllocationNodes(firstAllocationNode);
firstAllocationNode = new AllocationNode(0, allocationLength);
writeAllocationNode = firstAllocationNode;
totalBytesWritten = 0;
lastAllocation = null;
lastAllocationOffset = allocationLength;
allocator.trim();
}
/**
* Clears allocation nodes starting from {@code fromNode}.
*
* @param fromNode The node from which to clear.
*/
private void clearAllocationNodes(AllocationNode fromNode) {
if (!fromNode.wasInitialized) {
return;
}
// Bulk release allocations for performance (it's significantly faster when using
// DefaultAllocator because the allocator's lock only needs to be acquired and released once)
// [Internal: See b/29542039].
int allocationCount = (writeAllocationNode.wasInitialized ? 1 : 0)
+ ((int) (writeAllocationNode.startPosition - fromNode.startPosition) / allocationLength);
Allocation[] allocationsToRelease = new Allocation[allocationCount];
AllocationNode currentNode = fromNode;
for (int i = 0; i < allocationsToRelease.length; i++) {
allocationsToRelease[i] = currentNode.allocation;
currentNode = currentNode.clear();
}
allocator.release(allocationsToRelease);
}
/**
@ -569,8 +600,11 @@ public final class DefaultTrackOutput implements TrackOutput {
private int prepareForAppend(int length) {
if (lastAllocationOffset == allocationLength) {
lastAllocationOffset = 0;
lastAllocation = allocator.allocate();
dataQueue.add(lastAllocation);
if (writeAllocationNode.wasInitialized) {
writeAllocationNode = writeAllocationNode.next;
}
writeAllocationNode.initialize(allocator.allocate(),
new AllocationNode(writeAllocationNode.endPosition, allocationLength));
}
return Math.min(length, allocationLength - lastAllocationOffset);
}
@ -592,4 +626,65 @@ public final class DefaultTrackOutput implements TrackOutput {
return format;
}
/**
* A node in a linked list of {@link Allocation}s held by the output.
*/
private static final class AllocationNode {
/**
* The absolute position of the start of the data (inclusive).
*/
public final long startPosition;
/**
* The absolute position of the end of the data (exclusive).
*/
public final long endPosition;
/**
* Whether the node has been initialized. Remains true after {@link #clear()}.
*/
public boolean wasInitialized;
/**
* The {@link Allocation}, or {@code null} if the node is not initialized.
*/
@Nullable public Allocation allocation;
/**
* The next {@link AllocationNode} in the list, or {@code null} if the node has not been
* initialized. Remains set after {@link #clear()}.
*/
@Nullable public AllocationNode next;
/**
* @param startPosition See {@link #startPosition}.
* @param allocationLength The length of the {@link Allocation} with which this node will be
* initialized.
*/
public AllocationNode(long startPosition, int allocationLength) {
this.startPosition = startPosition;
this.endPosition = startPosition + allocationLength;
}
/**
* Initializes the node.
*
* @param allocation The node's {@link Allocation}.
* @param next The next {@link AllocationNode}.
*/
public void initialize(Allocation allocation, AllocationNode next) {
this.allocation = allocation;
this.next = next;
wasInitialized = true;
}
/**
* Clears {@link #allocation}.
*
* @return The next {@link AllocationNode}, for convenience.
*/
public AllocationNode clear() {
allocation = null;
return next;
}
}
}