A
A
Arseniy Golovin2016-10-30 01:15:17
Java
Arseniy Golovin, 2016-10-30 01:15:17

What is the best way to implement an Rx Observable that will pass a thread-safe List to multiple subscribers?

My task:
The device must write the passed route to the server via HTTP. If the server / Internet / connection is unavailable - write the route to the phone's internal database. When the connection is restored, add the missing points to the server.
The structure and model of the application that I am trying to apply (the first application on Android, after all):
CurrentLocationHolder
- stores the current location, location buffer, Subject subscribed to fill the buffer
LocationListener
- gets the new location, saves it to the CurrentLocationHolder.getInstance() property .currentLocation , and adds it to the buffer, so points that have not yet been sent to the server, but not saved in the database, are stored in(List)CurrentLocationHolder.getInstance().locationBuffer
HTTPManager (HTTP request manager)
- manages the queue of requests to the server. subscribes to the fact that the buffer is full, receives its contents, and uploads it to the server;
- onError (network error, loss of Internet, response from the server) changes the NetStateListener.Connected property to false, stops sending requests to the server;
NetStateListener is a simple pinger that has a Subject subscribed to changes in the Connected property (false - starts polling the server. As soon as the server has answered, it changes the Connected property to true.
HTTPManager monitors it and reacts accordingly.
DatabaseManagersubscribed to the fact onError HTTPManager'a, receives from onError a list of points that did not reach the server, writes them to the database. The DatabaseManager is also subscribed to NetStateListener.Connected (when false - writes the location buffer to the database, when true - it starts to quietly throw undelivered location points from the database into the buffer)
Tell me, did I describe the application model correctly? After all, the first application on Android...
So far, I've only implemented this:
A singleton object that stores the current location, a location buffer, Rx - the locationBufferFull subject

public class CurrentLocationHolder {
    //Буфер предыдущих местоположений. При наступлении порога locationBuffer.size() его элементы 
    //должны передаваться подписчикам
    //я так понял, что буфер должен быть потокосогласованным (синхронным). поправьте, если я не прав...
    private List<Location> locationBuffer = Collections.synchronizedList(new ArrayList<>());
   
    //это свойство хранит текущее местоположение
    private Location currentLocation;
    
    //создаём субъект подписки (нафига - я ещё до конца не понимаю, видимо для того, чтобы вызвать метод onNext() )
    private final PublishSubject<List<LocationPoint>> locationBufferFull = PublishSubject.create();

    //Этот метод, если я правильно понял, возвращает Observable при регистрации подписчиков из 
    //внешнего кода. поясните, плз, для чего тут emitCurrentValue?
    public Observable<List<LocationPoint>> observeLocationBufferFull(boolean emitCurrentValue) {
        return emitCurrentValue ? locationBufferFull.startWith(locationBuffer) : locationBufferFull;
    }

    //сеттер для свойства this.currentLocation
    public void setLocation(Location point) {
        this.currentLocation = point;

        //добавляем местоположение в буфер
        locationBuffer.add(point);

        //если размер буфера превышает порог, оповещаем подписчиков о заполнении буфера, и передаем 
        //содержимое буфера подписчикам. 
        if (locationBuffer.size() >= 10) {
            //может быть, рационально вызвать onNext() с аргументом - копией буфера?
            locationBufferChanged.onNext(this.locationBuffer);
        }
        //после обработки подписчиками, очищаем буфер
        locationBuffer.clear();
    }
}

Subscriber code:
public class DatabaseManager {
    //поле для хранения объекта подписки (оно нужно вообще? и так кода наплодил...)
    private Subscription locationBufferSubscription;

    private static DatabaseManager instance;
    
    public static void InitInstance() {
        if (instance == null) {
            instance = new DatabaseManager();
        //при инициализации экземпляра класса подписываемся на заполнение locationBuffer 
        instance.locationBufferSubscription = 
            CurrentLocationHolder.getInstance().observeLocationBufferFull()
                .subscribe(listOfLocations -> {
                    //пишем содержимое буфера в базу
                    ActiveAndroid.beginTransaction();
                    try {
                        for (int i = 0; i < locArray.size(); i++) {
                            listOfLocations.get(i).save();
                        }
                        ActiveAndroid.setTransactionSuccessful();
                    } finally {
                        ActiveAndroid.endTransaction();
                    }
            });
        }
    }
}

Retrofit will be added to this code, which will write the location to the server ... Tell me, how can I most easily transfer control to the DatabaseManager's buffer processing if retrofit throws an onError? And how in this case to avoid data loss? After all, they must all be guaranteed to get into the database, and when the Internet connection is restored, everything is guaranteed to get to the server ...
And minor questions:
  1. Will the passed -in be the locationBuffersame for all subscribers?
  2. locationBuffer.clear();will be executed only after all subscribers have processed it? The method locationBuffer.add(point)must wait in a lock until it executes.locationBuffer.clear();
  3. Is there an easier way to implement this behavior? I probably write a lot of unnecessary code in Rx

Sorry, if the questions and implementation are stupid, I got acquainted with Java and AndroidSDK a little over a week ago. RxJava philosophy has not entered yet, but the problem needs to be solved urgently, there is no time for buildup at all... Help, plz!

Answer the question

In order to leave comments, you need to log in

Didn't find what you were looking for?

Ask your question

Ask a Question

731 491 924 answers to any question