Make CacheDataSource detect cache availability change

Under certain conditions CacheDataSource switches to reading from upstream
without writing back to the cache. This change makes it detect when those
conditions change and switch back to reading from or writing to the cache.

-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=180901463
This commit is contained in:
eguven 2018-01-05 01:22:41 -08:00 committed by Oliver Woodman
parent a1bac99f3b
commit 373935aeb6
4 changed files with 201 additions and 72 deletions

View file

@ -41,6 +41,8 @@
* New Cast extension: Simplifies toggling between local and Cast playbacks.
* Audio: Support TrueHD passthrough for rechunked samples in Matroska files
([#2147](https://github.com/google/ExoPlayer/issues/2147)).
* CacheDataSource: Check periodically if it's possible to read from/write to
cache after deciding to bypass cache.
### 2.6.1 ###

View file

@ -86,6 +86,9 @@ public final class CacheDataSource implements DataSource {
}
/** Minimum number of bytes to read before checking cache for availability. */
private static final long MIN_READ_BEFORE_CHECKING_CACHE = 100 * 1024;
private final Cache cache;
private final DataSource cacheReadDataSource;
private final DataSource cacheWriteDataSource;
@ -97,16 +100,17 @@ public final class CacheDataSource implements DataSource {
private final boolean ignoreCacheForUnsetLengthRequests;
private DataSource currentDataSource;
private boolean readingUnknownLengthDataFromUpstream;
private boolean currentDataSpecLengthUnset;
private Uri uri;
private int flags;
private String key;
private long readPosition;
private long bytesRemaining;
private CacheSpan lockedSpan;
private CacheSpan currentHoleSpan;
private boolean seenCacheError;
private boolean currentRequestIgnoresCache;
private long totalCachedBytesRead;
private long checkCachePosition;
/**
* Constructs an instance with default {@link DataSource} and {@link DataSink} instances for
@ -219,8 +223,11 @@ public final class CacheDataSource implements DataSource {
return C.RESULT_END_OF_INPUT;
}
try {
if (readPosition >= checkCachePosition) {
openNextSource();
}
int bytesRead = currentDataSource.read(buffer, offset, readLength);
if (bytesRead >= 0) {
if (bytesRead != C.RESULT_END_OF_INPUT) {
if (currentDataSource == cacheReadDataSource) {
totalCachedBytesRead += bytesRead;
}
@ -228,28 +235,18 @@ public final class CacheDataSource implements DataSource {
if (bytesRemaining != C.LENGTH_UNSET) {
bytesRemaining -= bytesRead;
}
} else {
if (readingUnknownLengthDataFromUpstream) {
setCurrentDataSourceBytesRemaining(0);
}
closeCurrentSource();
if (bytesRemaining > 0 || bytesRemaining == C.LENGTH_UNSET) {
try {
openNextSource();
} catch (IOException e) {
if (readingUnknownLengthDataFromUpstream && isCausedByPositionOutOfRange(e)) {
setCurrentDataSourceBytesRemaining(0);
} else {
throw e;
}
}
if (bytesRemaining != 0) {
return read(buffer, offset, readLength);
}
}
} else if (currentDataSpecLengthUnset) {
setBytesRemaining(0);
} else if (bytesRemaining > 0 || bytesRemaining == C.LENGTH_UNSET) {
openNextSource();
return read(buffer, offset, readLength);
}
return bytesRead;
} catch (IOException e) {
if (currentDataSpecLengthUnset && isCausedByPositionOutOfRange(e)) {
setBytesRemaining(0);
return C.RESULT_END_OF_INPUT;
}
handleBeforeThrow(e);
throw e;
}
@ -278,62 +275,76 @@ public final class CacheDataSource implements DataSource {
* opened to read from the upstream source and write into the cache.
*/
private void openNextSource() throws IOException {
DataSpec dataSpec;
CacheSpan span;
CacheSpan nextSpan;
if (currentRequestIgnoresCache) {
span = null;
nextSpan = null;
} else if (blockOnCache) {
try {
span = cache.startReadWrite(key, readPosition);
nextSpan = cache.startReadWrite(key, readPosition);
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
} else {
span = cache.startReadWriteNonBlocking(key, readPosition);
nextSpan = cache.startReadWriteNonBlocking(key, readPosition);
}
if (span == null) {
DataSpec nextDataSpec;
DataSource nextDataSource;
if (nextSpan == null) {
// The data is locked in the cache, or we're ignoring the cache. Bypass the cache and read
// from upstream.
currentDataSource = upstreamDataSource;
dataSpec = new DataSpec(uri, readPosition, bytesRemaining, key, flags);
} else if (span.isCached) {
nextDataSource = upstreamDataSource;
nextDataSpec = new DataSpec(uri, readPosition, bytesRemaining, key, flags);
} else if (nextSpan.isCached) {
// Data is cached, read from cache.
Uri fileUri = Uri.fromFile(span.file);
long filePosition = readPosition - span.position;
long length = span.length - filePosition;
Uri fileUri = Uri.fromFile(nextSpan.file);
long filePosition = readPosition - nextSpan.position;
long length = nextSpan.length - filePosition;
if (bytesRemaining != C.LENGTH_UNSET) {
length = Math.min(length, bytesRemaining);
}
dataSpec = new DataSpec(fileUri, readPosition, filePosition, length, key, flags);
currentDataSource = cacheReadDataSource;
nextDataSpec = new DataSpec(fileUri, readPosition, filePosition, length, key, flags);
nextDataSource = cacheReadDataSource;
} else {
// Data is not cached, and data is not locked, read from upstream with cache backing.
long length;
if (span.isOpenEnded()) {
if (nextSpan.isOpenEnded()) {
length = bytesRemaining;
} else {
length = span.length;
length = nextSpan.length;
if (bytesRemaining != C.LENGTH_UNSET) {
length = Math.min(length, bytesRemaining);
}
}
dataSpec = new DataSpec(uri, readPosition, length, key, flags);
nextDataSpec = new DataSpec(uri, readPosition, length, key, flags);
if (cacheWriteDataSource != null) {
currentDataSource = cacheWriteDataSource;
lockedSpan = span;
nextDataSource = cacheWriteDataSource;
} else {
currentDataSource = upstreamDataSource;
cache.releaseHoleSpan(span);
nextDataSource = upstreamDataSource;
cache.releaseHoleSpan(nextSpan);
nextSpan = null;
}
}
// If the request is unbounded it must be an upstream request.
readingUnknownLengthDataFromUpstream = dataSpec.length == C.LENGTH_UNSET;
if (nextDataSource == upstreamDataSource) {
checkCachePosition = readPosition + MIN_READ_BEFORE_CHECKING_CACHE;
if (currentDataSource == upstreamDataSource) {
return;
}
} else {
checkCachePosition = Long.MAX_VALUE;
}
closeCurrentSource();
long resolvedLength = currentDataSource.open(dataSpec);
if (readingUnknownLengthDataFromUpstream && resolvedLength != C.LENGTH_UNSET) {
setCurrentDataSourceBytesRemaining(resolvedLength);
if (nextSpan != null && nextSpan.isHoleSpan()) {
currentHoleSpan = nextSpan;
}
currentDataSource = nextDataSource;
currentDataSpecLengthUnset = nextDataSpec.length == C.LENGTH_UNSET;
long resolvedLength = nextDataSource.open(nextDataSpec);
if (currentDataSpecLengthUnset && resolvedLength != C.LENGTH_UNSET) {
setBytesRemaining(resolvedLength);
}
}
@ -351,7 +362,7 @@ public final class CacheDataSource implements DataSource {
return false;
}
private void setCurrentDataSourceBytesRemaining(long bytesRemaining) throws IOException {
private void setBytesRemaining(long bytesRemaining) throws IOException {
this.bytesRemaining = bytesRemaining;
if (isWritingToCache()) {
cache.setContentLength(key, readPosition + bytesRemaining);
@ -369,11 +380,11 @@ public final class CacheDataSource implements DataSource {
try {
currentDataSource.close();
currentDataSource = null;
readingUnknownLengthDataFromUpstream = false;
currentDataSpecLengthUnset = false;
} finally {
if (lockedSpan != null) {
cache.releaseHoleSpan(lockedSpan);
lockedSpan = null;
if (currentHoleSpan != null) {
cache.releaseHoleSpan(currentHoleSpan);
currentHoleSpan = null;
}
}
}

View file

@ -16,6 +16,7 @@
package com.google.android.exoplayer2.upstream.cache;
import android.net.Uri;
import android.support.annotation.Nullable;
import com.google.android.exoplayer2.C;
import com.google.android.exoplayer2.upstream.DataSource;
import com.google.android.exoplayer2.upstream.DataSpec;
@ -110,12 +111,13 @@ public final class CacheUtil {
* @param dataSpec Defines the data to be cached.
* @param cache A {@link Cache} to store the data.
* @param upstream A {@link DataSource} for reading data not in the cache.
* @param counters Counters to update during caching.
* @param counters If not null, updated during caching.
* @throws IOException If an error occurs reading from the source.
* @throws InterruptedException If the thread was interrupted.
*/
public static void cache(DataSpec dataSpec, Cache cache, DataSource upstream,
CachingCounters counters) throws IOException, InterruptedException {
public static void cache(
DataSpec dataSpec, Cache cache, DataSource upstream, @Nullable CachingCounters counters)
throws IOException, InterruptedException {
cache(dataSpec, cache, new CacheDataSource(cache, upstream),
new byte[DEFAULT_BUFFER_SIZE_BYTES], null, 0, counters, false);
}
@ -131,15 +133,21 @@ public final class CacheUtil {
* @param priorityTaskManager If not null it's used to check whether it is allowed to proceed with
* caching.
* @param priority The priority of this task. Used with {@code priorityTaskManager}.
* @param counters Counters to update during caching.
* @param counters If not null, updated during caching.
* @param enableEOFException Whether to throw an {@link EOFException} if end of input has been
* reached unexpectedly.
* @throws IOException If an error occurs reading from the source.
* @throws InterruptedException If the thread was interrupted.
*/
public static void cache(DataSpec dataSpec, Cache cache, CacheDataSource dataSource,
byte[] buffer, PriorityTaskManager priorityTaskManager, int priority,
CachingCounters counters, boolean enableEOFException)
public static void cache(
DataSpec dataSpec,
Cache cache,
CacheDataSource dataSource,
byte[] buffer,
PriorityTaskManager priorityTaskManager,
int priority,
@Nullable CachingCounters counters,
boolean enableEOFException)
throws IOException, InterruptedException {
Assertions.checkNotNull(dataSource);
Assertions.checkNotNull(buffer);

View file

@ -15,7 +15,6 @@
*/
package com.google.android.exoplayer2.upstream.cache;
import static android.net.Uri.EMPTY;
import static com.google.android.exoplayer2.C.LENGTH_UNSET;
import static com.google.android.exoplayer2.upstream.cache.CacheAsserts.assertCacheEmpty;
import static com.google.common.truth.Truth.assertThat;
@ -51,14 +50,16 @@ public final class CacheDataSourceTest {
private static final byte[] TEST_DATA = new byte[] {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
private static final int MAX_CACHE_FILE_SIZE = 3;
private static final String KEY_1 = "key 1";
private static final String KEY_2 = "key 2";
private Uri testDataUri;
private String testDataKey;
private File tempFolder;
private SimpleCache cache;
/**
 * Initializes the shared fixtures: the test URI, the cache key generated from it, and a
 * {@link SimpleCache} backed by a directory obtained from {@code Util.createTempDirectory}.
 */
@Before
public void setUp() throws Exception {
  Uri dataUri = Uri.parse("test_data");
  testDataUri = dataUri;
  // The key is derived from the URI so that tests can address the same cache entry either way.
  testDataKey = CacheUtil.generateKey(dataUri);

  File cacheDirectory =
      Util.createTempDirectory(RuntimeEnvironment.application, "ExoPlayerTest");
  tempFolder = cacheDirectory;
  cache = new SimpleCache(cacheDirectory, new NoOpCacheEvictor());
}
@ -116,7 +117,7 @@ public final class CacheDataSourceTest {
// If the user tries to read from an out-of-range position, an IOException should be thrown.
try {
cacheDataSource = createCacheDataSource(false, false);
cacheDataSource.open(new DataSpec(Uri.EMPTY, TEST_DATA.length, 5, KEY_1));
cacheDataSource.open(new DataSpec(testDataUri, TEST_DATA.length, 5, testDataKey));
fail();
} catch (IOException e) {
// success
@ -128,7 +129,7 @@ public final class CacheDataSourceTest {
// Read partial at EOS but don't cross it so length is unknown
CacheDataSource cacheDataSource = createCacheDataSource(false, true);
assertReadData(cacheDataSource, true, TEST_DATA.length - 2, 2);
assertThat(cache.getContentLength(KEY_1)).isEqualTo(LENGTH_UNSET);
assertThat(cache.getContentLength(testDataKey)).isEqualTo(LENGTH_UNSET);
// Now do an unbounded request for whole data. This will cause a bounded request from upstream.
// End of data from upstream shouldn't be mixed up with EOS and cause length set wrong.
@ -136,12 +137,16 @@ public final class CacheDataSourceTest {
assertReadDataContentLength(cacheDataSource, true, true);
// Now that the length is set correctly, do an unbounded request with an offset.
assertThat(cacheDataSource.open(new DataSpec(EMPTY, TEST_DATA.length - 2,
LENGTH_UNSET, KEY_1))).isEqualTo(2);
assertThat(
cacheDataSource.open(
new DataSpec(testDataUri, TEST_DATA.length - 2, LENGTH_UNSET, testDataKey)))
.isEqualTo(2);
// An unbounded request with offset for not cached content
assertThat(cacheDataSource.open(new DataSpec(EMPTY, TEST_DATA.length - 2,
LENGTH_UNSET, KEY_2))).isEqualTo(LENGTH_UNSET);
assertThat(
cacheDataSource.open(
new DataSpec(Uri.parse("notCachedUri"), TEST_DATA.length - 2, LENGTH_UNSET, null)))
.isEqualTo(LENGTH_UNSET);
}
@Test
@ -159,6 +164,107 @@ public final class CacheDataSourceTest {
assertCacheEmpty(cache);
}
/**
 * Opens a cache read-only {@link CacheDataSource} on content that is not cached yet, caches the
 * content through {@link CacheUtil} while the source is still open, and then reads to the end.
 * The upstream data carries a read action that fails the test, so the test only passes if the
 * remaining reads never reach the final byte of the upstream data.
 */
@Test
public void testSwitchToCacheSourceWithReadOnlyCacheDataSource() throws Exception {
  // Action that fails the test when it is executed by the fake upstream.
  Runnable failOnUpstreamEnd =
      new Runnable() {
        @Override
        public void run() {
          fail("Read from upstream shouldn't reach to the end of the data.");
        }
      };

  // Fake upstream with 1 MB of default data; the failing action sits just before the last byte.
  FakeDataSource upstream = new FakeDataSource();
  upstream
      .getDataSet()
      .newDefaultData()
      .appendReadData(1024 * 1024 - 1)
      .appendReadAction(failOnUpstreamEnd)
      .appendReadData(1);

  // Create cache read-only CacheDataSource.
  CacheDataSource cacheDataSource =
      new CacheDataSource(cache, upstream, new FileDataSource(), null, 0, null);

  // Open the source and read one buffer from upstream, as the data hasn't been cached yet.
  DataSpec dataSpec = new DataSpec(testDataUri, 0, C.LENGTH_UNSET, testDataKey);
  cacheDataSource.open(dataSpec);
  byte[] readBuffer = new byte[1024];
  cacheDataSource.read(readBuffer, 0, readBuffer.length);

  // Cache the data. A different FakeDataSource instance is used here; it shouldn't matter.
  FakeDataSource cachingUpstream =
      new FakeDataSource(
          new FakeDataSource()
              .getDataSet()
              .newDefaultData()
              .appendReadData(1024 * 1024)
              .endData());
  CacheUtil.cache(dataSpec, cache, cachingUpstream, null);

  // Read the rest of the data until end of input is reported.
  int readResult;
  do {
    readResult = cacheDataSource.read(readBuffer, 0, readBuffer.length);
  } while (readResult != C.RESULT_END_OF_INPUT);
  cacheDataSource.close();
}
/**
 * Opens a non-blocking {@link CacheDataSource} while the content is locked in the cache, then
 * releases the lock, caches the content through {@link CacheUtil}, and reads to the end. The
 * upstream data carries a read action that fails the test, so the test only passes if the
 * remaining reads never reach the final byte of the upstream data.
 */
@Test
public void testSwitchToCacheSourceWithNonBlockingCacheDataSource() throws Exception {
  // Action that fails the test when it is executed by the fake upstream.
  Runnable failOnUpstreamEnd =
      new Runnable() {
        @Override
        public void run() {
          fail("Read from upstream shouldn't reach to the end of the data.");
        }
      };

  // Fake upstream with 1 MB of default data; the failing action sits just before the last byte.
  FakeDataSource upstream = new FakeDataSource();
  upstream
      .getDataSet()
      .newDefaultData()
      .appendReadData(1024 * 1024 - 1)
      .appendReadAction(failOnUpstreamEnd)
      .appendReadData(1);

  // Lock the content in the cache by taking the hole span at position 0.
  SimpleCacheSpan lockedSpan = cache.startReadWriteNonBlocking(testDataKey, 0);
  assertThat(lockedSpan).isNotNull();
  assertThat(lockedSpan.isHoleSpan()).isTrue();

  // Create non blocking CacheDataSource.
  CacheDataSource cacheDataSource = new CacheDataSource(cache, upstream, 0);

  // Open the source and read one buffer from upstream without writing to the cache, as the data
  // is locked.
  DataSpec dataSpec = new DataSpec(testDataUri, 0, C.LENGTH_UNSET, testDataKey);
  cacheDataSource.open(dataSpec);
  byte[] readBuffer = new byte[1024];
  cacheDataSource.read(readBuffer, 0, readBuffer.length);

  // Unlock the span; nothing should have been written to the cache so far.
  cache.releaseHoleSpan(lockedSpan);
  assertCacheEmpty(cache);

  // Cache the data. A different FakeDataSource instance is used here; it shouldn't matter.
  FakeDataSource cachingUpstream =
      new FakeDataSource(
          new FakeDataSource()
              .getDataSet()
              .newDefaultData()
              .appendReadData(1024 * 1024)
              .endData());
  CacheUtil.cache(dataSpec, cache, cachingUpstream, null);

  // Read the rest of the data until end of input is reported.
  int readResult;
  do {
    readResult = cacheDataSource.read(readBuffer, 0, readBuffer.length);
  } while (readResult != C.RESULT_END_OF_INPUT);
  cacheDataSource.close();
}
private void assertCacheAndRead(boolean unboundedRequest, boolean simulateUnknownLength)
throws IOException {
// Read all data from upstream and write to cache
@ -179,8 +285,10 @@ public final class CacheDataSourceTest {
boolean unboundedRequest, boolean unknownLength) throws IOException {
int length = unboundedRequest ? C.LENGTH_UNSET : TEST_DATA.length;
assertReadData(cacheDataSource, unknownLength, 0, length);
assertWithMessage("When the range specified, CacheDataSource doesn't reach EOS so shouldn't "
+ "cache content length").that(cache.getContentLength(KEY_1))
assertWithMessage(
"When the range specified, CacheDataSource doesn't reach EOS so shouldn't "
+ "cache content length")
.that(cache.getContentLength(testDataKey))
.isEqualTo(!unboundedRequest ? C.LENGTH_UNSET : TEST_DATA.length);
}
@ -190,7 +298,7 @@ public final class CacheDataSourceTest {
if (length != C.LENGTH_UNSET) {
testDataLength = Math.min(testDataLength, length);
}
assertThat(cacheDataSource.open(new DataSpec(EMPTY, position, length, KEY_1)))
assertThat(cacheDataSource.open(new DataSpec(testDataUri, position, length, testDataKey)))
.isEqualTo(unknownLength ? length : testDataLength);
byte[] buffer = new byte[100];