Answer the question
In order to leave comments, you need to log in
Synchronization on a list of values?
There is a list of URLs with various objects. There are several threads that asynchronously try to download objects and cache them. The threads all start together, and there is a good chance that several threads will try to download the same object, doing extra work and wasting extra traffic.
What is the best way to do a mutual thread-safe check to see if a given object is currently being downloaded, and put the thread to sleep until another running thread finishes downloading and caches the object so that we can immediately get it in the current thread from the cache and stop execution without starting the download?
That is, some kind of synchronization is needed, but a string key should act as a mutex. So far, something like this works for me, but it is not kosher at all:
private static volatile LinkedHashSet<String> sDownloadingNow = new LinkedHashSet<String>();
...
Object getObject(String url) {
Object object = cache.get(url);
if( object == null ) {
while( !sDownloadingNow.add(url) ) {
SystemClock.sleep(100);
}
object = cache.get(url);
if(object == null) {
object = fetchObject(url);
cache.put(url, object);
}
sDownloadingNow.remove(url);
}
return object;
}
Answer the question
In order to leave comments, you need to log in
something like this…
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
public class MyCache<K, V> implements Computable<K, V> {
private final ConcurrentHashMap<K, Future<V>> cache = new ConcurrentHashMap<K, Future<V>>();
private final Computable<K, V> c;
public MyCache(Computable<K, V> c) {
this.c = c;
}
@Override
public V compute(final K key) throws InterruptedException {
while (true) {
Future<V> f = cache.get(key);
if (f == null) {
Callable<V> eval = new Callable<V>() {
@Override
public V call() throws Exception {
return c.compute(key);
}
};
FutureTask<V> ft = new FutureTask<V>(eval);
f = cache.putIfAbsent(key, ft);
if (f == null) {
f = ft;
ft.run();
}
}
try {
return f.get();
} catch (CancellationException e) {
cache.remove(key, f);
} catch (ExecutionException e) {
throw new RuntimeException(e.getCause());
}
}
}
public static interface Computable<K, V> {
public V compute(K k) throws InterruptedException;
}
}
// key: url, value: list of waiting latches for threads
private static final Map<String, List<CountDownLatch>> waiters;
synchronized(waiters) {
// check if record for url exists and either continue work or wait
// ...
}
// do work
synchronized(waiters) {
// check if record for url and call count() for all waiters, then remove record
// ...
}
Didn't find what you were looking for?
Ask your questionAsk a Question
731 491 924 answers to any question