Java
artch, 2012-06-04 16:38:59

Synchronization on a list of values?

There is a list of URLs pointing to various objects. Several threads download the objects asynchronously and cache them. The threads all start together, so there is a good chance that several of them will try to download the same object, duplicating work and wasting traffic.
What is the best way to check, in a thread-safe manner, whether a given object is already being downloaded and, if so, put the current thread to sleep until the other thread finishes downloading and caches the object, so that the current thread can take it straight from the cache without starting its own download?
In other words, some kind of synchronization is needed in which the string key acts as the mutex. So far something like this works for me, but it is not kosher at all:

private static volatile LinkedHashSet<String> sDownloadingNow = new LinkedHashSet<String>();

...

Object getObject(String url) {
   Object object = cache.get(url);
   if( object == null ) {
      while( !sDownloadingNow.add(url) ) {
         SystemClock.sleep(100);
      }
      object = cache.get(url);
      if(object == null) {
         object = fetchObject(url);
         cache.put(url, object);
      }
      sDownloadingNow.remove(url);
   }
   return object;
}


2 answer(s)
Moxa, 2012-06-04
@artch

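Something like this: keep a Future per key in the cache, so that the first thread to register it computes the value and the others wait on it.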

import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

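// Declared at top level: MyCache cannot implement an interface nested inside its own body
// (it would not compile).
interface Computable<K, V> {

    V compute(K k) throws InterruptedException;
}

public class MyCache<K, V> implements Computable<K, V> {

    // One Future per key: the first thread to register it does the work, the others wait on it.
    private final ConcurrentHashMap<K, Future<V>> cache = new ConcurrentHashMap<K, Future<V>>();
    private final Computable<K, V> c;

    public MyCache(Computable<K, V> c) {
        this.c = c;
    }

    @Override
    public V compute(final K key) throws InterruptedException {
        while (true) {
            Future<V> f = cache.get(key);
            if (f == null) {
                Callable<V> eval = new Callable<V>() {

                    @Override
                    public V call() throws Exception {
                        return c.compute(key);
                    }
                };
                FutureTask<V> ft = new FutureTask<V>(eval);
                // Atomic check-and-insert: only one thread wins the race for this key.
                f = cache.putIfAbsent(key, ft);
                if (f == null) {
                    f = ft;
                    ft.run(); // the winner computes the value in its own thread
                }
            }
            try {
                return f.get(); // everyone else blocks here until the value is ready
            } catch (CancellationException e) {
                cache.remove(key, f); // drop the cancelled task and retry
            } catch (ExecutionException e) {
                cache.remove(key, f); // do not keep a failed computation cached
                throw new RuntimeException(e.getCause());
            }
        }
    }
}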
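
Applied to the getObject(url) from the question, the same idea could look roughly like the sketch below. It is only an illustration under assumptions: fetchObject here is a hypothetical stand-in that sleeps instead of downloading, the names FutureDownloadDemo, downloads and runDemo are invented for the demo, and the lambdas need Java 8 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicInteger;

class FutureDownloadDemo {

    // Counts real downloads so the demo can show that each URL is fetched once.
    static final AtomicInteger downloads = new AtomicInteger();

    // One Future per URL: it is both the cache entry and the "being downloaded" marker.
    private static final ConcurrentHashMap<String, Future<Object>> cache =
            new ConcurrentHashMap<String, Future<Object>>();

    // Hypothetical stand-in for the real network download (assumption, not from the question).
    static Object fetchObject(String url) throws InterruptedException {
        downloads.incrementAndGet();
        Thread.sleep(100); // pretend the download is slow
        return "content of " + url;
    }

    static Object getObject(final String url) throws InterruptedException {
        Future<Object> f = cache.get(url);
        if (f == null) {
            FutureTask<Object> ft = new FutureTask<Object>(() -> fetchObject(url));
            f = cache.putIfAbsent(url, ft);
            if (f == null) { // this thread won the race, so it downloads
                f = ft;
                ft.run();
            }
        }
        try {
            return f.get(); // other threads wait here until the download finishes
        } catch (ExecutionException e) {
            cache.remove(url, f); // allow a later retry after a failed download
            throw new RuntimeException(e.getCause());
        }
    }

    // Starts the given number of threads, all asking for the same URL,
    // and returns how many real downloads happened.
    static int runDemo(int threads) {
        downloads.set(0);
        cache.clear();
        List<Thread> pool = new ArrayList<Thread>();
        for (int i = 0; i < threads; i++) {
            Thread t = new Thread(() -> {
                try {
                    getObject("http://example.com/a");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            pool.add(t);
            t.start();
        }
        for (Thread t : pool) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return downloads.get();
    }

    public static void main(String[] args) {
        System.out.println("downloads: " + runDemo(8));
    }
}
```

Running main should report a single download no matter how many threads ask for the same URL, because putIfAbsent lets only one FutureTask per key into the map.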

serso, 2012-06-04
@serso

// key: url, value: list of latches for the threads waiting on that url
private static final Map<String, List<CountDownLatch>> waiters = new HashMap<String, List<CountDownLatch>>();

When a new thread enters, it checks waiters for a list under its url:
if there is no entry (waiters.get(url) == null), it creates a new empty list, puts it into the Map, and starts doing the work itself;
if there is an entry, it creates a CountDownLatch with a count of 1, adds it to the list, and calls await() on it (after leaving the synchronized block, so that the lock is not held while waiting).
When the thread that did the work finishes, it removes the list of waiting threads from the Map and calls countDown() on each latch.
All operations on waiters must be synchronized:
synchronized(waiters) {
    // check whether a record for url exists:
    // if not, create one and go on to do the work; otherwise register a latch to wait on
    // ...
}

// do work (or await() on the latch)

synchronized(waiters) {
    // remove the record for url and call countDown() on every waiting latch
    // ...
}

It seems to solve your problem.
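
The steps above can be filled in roughly as follows. This is a sketch under assumptions rather than a finished implementation: fetchObject is a hypothetical stand-in that sleeps instead of downloading, the cache is assumed to be a plain ConcurrentHashMap, and the names LatchDownloadDemo, downloads and runDemo exist only for the demo. The cache is re-checked inside the synchronized block so that a download that has just finished is not started again.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

class LatchDownloadDemo {

    // Counts real downloads so the demo can show that each URL is fetched once.
    static final AtomicInteger downloads = new AtomicInteger();

    // key: url, value: latches of the threads waiting for that url
    private static final Map<String, List<CountDownLatch>> waiters =
            new HashMap<String, List<CountDownLatch>>();
    private static final Map<String, Object> cache = new ConcurrentHashMap<String, Object>();

    // Hypothetical stand-in for the real network download (assumption).
    static Object fetchObject(String url) throws InterruptedException {
        downloads.incrementAndGet();
        Thread.sleep(100); // pretend the download is slow
        return "content of " + url;
    }

    static Object getObject(String url) throws InterruptedException {
        while (true) {
            CountDownLatch latch = null;
            synchronized (waiters) {
                Object cached = cache.get(url);
                if (cached != null) {
                    return cached;
                }
                List<CountDownLatch> list = waiters.get(url);
                if (list == null) {
                    // No record yet: this thread becomes the downloader.
                    waiters.put(url, new ArrayList<CountDownLatch>());
                } else {
                    // Another thread is downloading: register a latch to wait on.
                    latch = new CountDownLatch(1);
                    list.add(latch);
                }
            }
            if (latch != null) {
                latch.await(); // wait outside the lock, then re-check the cache
                continue;
            }
            try {
                Object object = fetchObject(url);
                cache.put(url, object);
                return object;
            } finally {
                // Wake the waiters even if the download failed; they will retry.
                synchronized (waiters) {
                    for (CountDownLatch l : waiters.remove(url)) {
                        l.countDown();
                    }
                }
            }
        }
    }

    // Starts the given number of threads, all asking for the same URL,
    // and returns how many real downloads happened.
    static int runDemo(int threads) {
        downloads.set(0);
        cache.clear();
        List<Thread> pool = new ArrayList<Thread>();
        for (int i = 0; i < threads; i++) {
            Thread t = new Thread(() -> {
                try {
                    getObject("http://example.com/a");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            pool.add(t);
            t.start();
        }
        for (Thread t : pool) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return downloads.get();
    }

    public static void main(String[] args) {
        System.out.println("downloads: " + runDemo(8));
    }
}
```

The finally block matters here: if the downloader failed without counting down the latches, the waiting threads would sleep forever.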
