Java
Tsuzukeru, 2021-01-19 23:27:07

How to establish a connection to the server using RxJava?

I want to connect to the server correctly using RxJava. I suspect the implementation needs to change, but I don't know how. I use Single. I'm especially interested in where ports and sockets should be closed. How can I improve the architecture? Any advice is welcome.

fun connectToServer() {
    val TAG = "Server answer"
    connectToServerWrapper()
        .subscribeOn(Schedulers.io())
        .subscribeBy(onSuccess = {
            if (it == "200 OK") {
                _controllerState.postValue(ControllerState.ON)

                Log.e(TAG, it)
            }
        }, onError = {
            if (it is ConnectException) {
                _controllerState.postValue(ControllerState.NO_INTERNET)
            } else {
                _controllerState.postValue(ControllerState.ERROR)
            }
            Log.e(TAG, it.toString())
        })
        .addTo(compositeDisposable)
}

fun connectToServerWrapper(): Single<String> {
    _controllerState.postValue(ControllerState.LOADING)
    return Single.create { emitter ->
        val pw = PrintWriter(Utils.SOCKET.getOutputStream(), true)
        val br = BufferedReader(InputStreamReader(Utils.SOCKET.getInputStream()))
        pw.println("Hello from client 1")
        emitter.onSuccess(br.readLine())
//        pw.close()
//        br.close()
//        socket.close()
    }
}


1 answer(s)
Denis Zagaevsky, 2021-01-20
@Tsuzukeru

Something like this, but release all the resources first and only then call onSuccess.
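A minimal sketch of that ordering, assuming RxJava 3 (the `io.reactivex.rxjava3` package) and a socket opened per request instead of the shared `Utils.SOCKET` from the question. The function name `requestGreeting` and its `host`/`port` parameters are hypothetical, introduced only for illustration:

```kotlin
import io.reactivex.rxjava3.core.Single
import java.io.BufferedReader
import java.io.EOFException
import java.io.InputStreamReader
import java.io.PrintWriter
import java.net.Socket

// Hypothetical helper: `host` and `port` are illustrative, not taken from the question.
fun requestGreeting(host: String, port: Int): Single<String> =
    Single.create { emitter ->
        try {
            // `use` closes the socket (and with it both of its streams) when the
            // block exits, whether it returns normally or throws.
            val reply = Socket(host, port).use { socket ->
                val writer = PrintWriter(socket.getOutputStream(), true)
                val reader = BufferedReader(InputStreamReader(socket.getInputStream()))
                writer.println("Hello from client 1")
                // readLine() returns null if the server closes without replying.
                reader.readLine() ?: throw EOFException("Server closed the connection")
            }
            // The socket is already closed at this point; only now emit the result.
            emitter.onSuccess(reply)
        } catch (e: Exception) {
            // tryOnError avoids an undeliverable-exception crash if the
            // subscriber has already disposed.
            emitter.tryOnError(e)
        }
    }
```

Nothing is opened until someone subscribes, so the blocking I/O runs on whatever scheduler the caller passes to `subscribeOn`, for example `Schedulers.io()` as in the question.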
Also, it is very hard to follow what is going on with the mutable state. That is best avoided.
For example, instead of connectToServer, which returns nothing but mutates state, you can write a function that mutates nothing and returns an Observable of states (Loading/Error/Success/etc.). The caller subscribes to that function's result and handles the states there.
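One possible shape for such a function, again assuming RxJava 3. The `ControllerState` enum below is reconstructed from the four values used in the question, the function name `connectionStates` is hypothetical, and mapping any reply other than "200 OK" to `ERROR` is an assumption (the question's code leaves that case unhandled):

```kotlin
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.Single
import java.net.ConnectException

// Reconstructed from the values referenced in the question; the real enum may differ.
enum class ControllerState { LOADING, ON, NO_INTERNET, ERROR }

// Hypothetical function: turns a one-shot request into a stream of states
// without touching any external mutable state.
fun connectionStates(request: Single<String>): Observable<ControllerState> =
    request
        // Assumption: any reply other than "200 OK" counts as an error.
        .map<ControllerState> { reply ->
            if (reply == "200 OK") ControllerState.ON else ControllerState.ERROR
        }
        // Failures become states instead of terminating the stream with onError.
        .onErrorReturn { e ->
            if (e is ConnectException) ControllerState.NO_INTERNET else ControllerState.ERROR
        }
        .toObservable()
        // LOADING is emitted first, on every subscription.
        .startWithItem(ControllerState.LOADING)
```

The caller would then subscribe once and forward each emitted state, for instance by passing it to `_controllerState.postValue`, so the state is written in a single place.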
