Answer the question
In order to leave comments, you need to log in
What is the best way to implement an Rx Observable that will pass a thread-safe List to multiple subscribers?
My task:
The device must write the passed route to the server via HTTP. If the server / Internet / connection is unavailable - write the route to the phone's internal database. When the connection is restored, add the missing points to the server.
The structure and model of the application that I am trying to apply (the first application on Android, after all):
CurrentLocationHolder
- stores the current location, location buffer, Subject subscribed to fill the buffer
LocationListener
- gets the new location, saves it to the CurrentLocationHolder.getInstance() property .currentLocation , and adds it to the buffer, so points that have not yet been sent to the server, but not saved in the database, are stored in(List)CurrentLocationHolder.getInstance().locationBuffer
HTTPManager (HTTP request manager)
- manages the queue of requests to the server. subscribes to the fact that the buffer is full, receives its contents, and uploads it to the server;
- onError (network error, loss of Internet, response from the server) changes the NetStateListener.Connected property to false, stops sending requests to the server;
NetStateListener is a simple pinger that has a Subject subscribed to changes in the Connected property (false - starts polling the server. As soon as the server has answered, it changes the Connected property to true.
HTTPManager monitors it and reacts accordingly.
DatabaseManagersubscribed to the fact onError HTTPManager'a, receives from onError a list of points that did not reach the server, writes them to the database. The DatabaseManager is also subscribed to NetStateListener.Connected (when false - writes the location buffer to the database, when true - it starts to quietly throw undelivered location points from the database into the buffer)
Tell me, did I describe the application model correctly? After all, the first application on Android...
So far, I've only implemented this:
A singleton object that stores the current location, a location buffer, Rx - the locationBufferFull subject
public class CurrentLocationHolder {
//Буфер предыдущих местоположений. При наступлении порога locationBuffer.size() его элементы
//должны передаваться подписчикам
//я так понял, что буфер должен быть потокосогласованным (синхронным). поправьте, если я не прав...
private List<Location> locationBuffer = Collections.synchronizedList(new ArrayList<>());
//это свойство хранит текущее местоположение
private Location currentLocation;
//создаём субъект подписки (нафига - я ещё до конца не понимаю, видимо для того, чтобы вызвать метод onNext() )
private final PublishSubject<List<LocationPoint>> locationBufferFull = PublishSubject.create();
//Этот метод, если я правильно понял, возвращает Observable при регистрации подписчиков из
//внешнего кода. поясните, плз, для чего тут emitCurrentValue?
public Observable<List<LocationPoint>> observeLocationBufferFull(boolean emitCurrentValue) {
return emitCurrentValue ? locationBufferFull.startWith(locationBuffer) : locationBufferFull;
}
//сеттер для свойства this.currentLocation
public void setLocation(Location point) {
this.currentLocation = point;
//добавляем местоположение в буфер
locationBuffer.add(point);
//если размер буфера превышает порог, оповещаем подписчиков о заполнении буфера, и передаем
//содержимое буфера подписчикам.
if (locationBuffer.size() >= 10) {
//может быть, рационально вызвать onNext() с аргументом - копией буфера?
locationBufferChanged.onNext(this.locationBuffer);
}
//после обработки подписчиками, очищаем буфер
locationBuffer.clear();
}
}
public class DatabaseManager {
//поле для хранения объекта подписки (оно нужно вообще? и так кода наплодил...)
private Subscription locationBufferSubscription;
private static DatabaseManager instance;
public static void InitInstance() {
if (instance == null) {
instance = new DatabaseManager();
//при инициализации экземпляра класса подписываемся на заполнение locationBuffer
instance.locationBufferSubscription =
CurrentLocationHolder.getInstance().observeLocationBufferFull()
.subscribe(listOfLocations -> {
//пишем содержимое буфера в базу
ActiveAndroid.beginTransaction();
try {
for (int i = 0; i < locArray.size(); i++) {
listOfLocations.get(i).save();
}
ActiveAndroid.setTransactionSuccessful();
} finally {
ActiveAndroid.endTransaction();
}
});
}
}
}
locationBuffer
same for all subscribers?locationBuffer.clear();
will be executed only after all subscribers have processed it? The method locationBuffer.add(point)
must wait in a lock until it executes.locationBuffer.clear();
Answer the question
In order to leave comments, you need to log in
Didn't find what you were looking for?
Ask your questionAsk a Question
731 491 924 answers to any question