StartAndroid Ru RxJava
@rxjavachat 353 участника
Загрузить еще
20 апреля
Подскажите, пожалуйста, что нужно использовать в данной ситуции
Просьба вернутся к моему вопросу. Обрисую задачу немного по другому. Представим что мне нужно выполнить следующие действия:
1) Получить данные с одного сервера
2) Получить данные с второго сервера
3) Прочитать данные из БД
4) Дождатся окончания пунктов 1, 2, 3 и после этого отобразить пользователю что все Ок или же что все плохо при условии что один из пунктов отработал с ошибкой

Пункты 1,2 ,3 полностью не зависят друг от друга и нужно чтобы они выполнялись параллельно.
так по отдельности или вместе?
результата двух запросов обрабатывается совместно?
О @MrStuff88 у нас схожие задачи ;)
зип в обоих случаях
в данном случае, на основе полученных резуьтатов от двух запросов проприсовывается экран
Observable.zip(requestA(), requestB(), (a, b) -> process(a, b))
request`ы A и B могут быть single?
или их тоже в observable переделывать?
соотвественно методы retfoit`a нужно будет переделать
@ga11ean я тут приводил пример решения до которого я дошел...t.me/... Можно подробней объяснить в чем я не прав ?
да
понял, спасибо
Single.zip((Single) requestA(), (Single) requestB(), (a, b) -> process(a, b))
благодарю
Стас там выше написал что мыслите не в рх-стиле. он конечно прав, на это надо почитать побольше... и поглубже))

например не пишут в рх
Completable first = new Completable() {
@Override
protected void subscribeActual(CompletableObserver s) {
doFirstTask();
s.onComplete();
}
};пишут так
Completable first = Completable.fromAction(() -> doSecondTaskTask());
а задачу (первую сверху) я бы решил так
Observable obs =
Observable.zip(
Completable.fromAction(() -> doFirstTask()).toObservable().subscribeOn(Schedulers.newThread()),
Completable.fromAction(() -> doSecondTaskTask()).toObservable().subscribeOn(Schedulers.newThread()),
(t1, t2) -> doFinish());
вторую тот же принцип - ждать все в зипе
подписку стоит откладывать до последнего момента (не знаю насколько актуально в случае наверху)
а в чем тут разница для решения данной задачи между zip и merge ?
зип ждет, мерж же клеит в один поток как только что-то приходит
т.е. на выходе из зипа будет столько элементов, сколько в наименьшем из склеиваемых потоков
в мерже будет обще суммарное кол-во, в любом порядке
Observable zip =
Observable.zip(
Observable.interval(50, TimeUnit.MILLISECONDS).map(v -> "a" + v).take(5),
Observable.interval(100, TimeUnit.MILLISECONDS).map(v -> "b" + v).take(4),
(a, b) -> a + "_" + b);

Observable merge =
Observable.merge(
Observable.interval(50, TimeUnit.MILLISECONDS).map(v -> "a" + v).take(5),
Observable.interval(100, TimeUnit.MILLISECONDS).map(v -> "b" + v).take(4));
Что-то пока я далек от понимания ;) вот пытаюсь сделать так:

Observable first = Completable.fromAction(() -> doFirstTask())
.subscribeOn(Schedulers.newThread()).toObservable();

Observable second = Completable.fromAction(() -> doSecondTaskTask())
.subscribeOn(Schedulers.newThread()).toObservable();

Observable third = Completable.fromAction(() -> doThirdTaskTask())
.subscribeOn(Schedulers.newThread()).toObservable();

Observable obs = Observable.zip(first, second, third, (aVoid, aVoid2, aVoid3) -> finishTask());
obs.subscribe();
В результате как только одна из задач заканчивается оставшиеся получают java.lang.InterruptedException
это получается оставшиеся задачи прерываются. Но почему
?
а вот если делаю вот так:
Observable obsm = Observable.mergeArray(first, second, third).doFinally(() -> {
finishTask();
});
obsm.subscribe();

То отрабатывает все правильно
но подозреваю что это опять не правильно и я мыслю не правильно. Но хотелось бы понять как правильно и почему
не правильно потому что мерж их клеит в одну последовательность, а тебе надо ждать всех троих.
просто у тебя совпало что результатов нет (void), вот тебе их обрабатывать не надо.
а если бы надо было работать с результатом, то merge+doOnComplete не прокатил бы
Если вам нужно последовательно что-то выполнить, зачем вам эрыкс? Вы оборачиваете императивщину в эрыкс, чтобы сделать всё то же самое, что можно было написать просто:
doFirst();
doSecond();
finishTask();
все три должны работать параллельно.
мы тут про зип, он для этого и нужен - в цикле обработки в рх включить ожидание некоторых событий
с разных источников
почему при zip я получаю InterruptedException ?
хороший вопрос, я сам смотрю) зип что-то решил не ждать эти три потока))
вот два примера
Observable obs = Observable.zip(
Observable.fromCallable(() -> doFirstTask()).subscribeOn(Schedulers.newThread()),
Observable.fromCallable(() -> doSecondTaskTask()).subscribeOn(Schedulers.newThread()),
Observable.fromCallable(() -> doThirdTaskTask()).subscribeOn(Schedulers.newThread()),
(aVoid, aVoid2, aVoid3) -> finishTask());Observable obs2 = Observable.zip(
Completable.fromAction(() -> doFirstTask()).subscribeOn(Schedulers.newThread()).toObservable(),
Completable.fromCallable(() -> doSecondTaskTask()).subscribeOn(Schedulers.newThread()).toObservable(),
Completable.fromCallable(() -> doThirdTaskTask()).subscribeOn(Schedulers.newThread()).toObservable(),
(aVoid, aVoid2, aVoid3) -> finishTask());почему-то зип вокруг Completable-ов (которые потом в toObservable) не ждет окончания, а сразу при окончании любого из них завершается, без выполнения finishTask
при этом зип + Observable все ок. я думал они должны сработать одинаково. в чем причина?
выше я два примера скинул, так что не могу ответить почему с zip + Completable так получается. я сам думал что зип будет дожидаться Completable, но почему-то нет. с zip + Observable такой проблемы нет, все ждет
но делая Observable конечно выглядит немного не так
Насколько я понял если у меня нет результата то мне нужно использовать именно Completable. Насколько я понял он и был создан для этого.
да, все верно. он просто показывает завершилось или упало на чем-то
Итого:

Completable first = Completable.fromAction(() -> doFirstTask())
.subscribeOn(Schedulers.newThread());

Completable second = Completable.fromAction(() -> doSecondTaskTask())
.subscribeOn(Schedulers.newThread());

Completable third = Completable.fromAction(() -> doThirdTaskTask())
.subscribeOn(Schedulers.newThread());


Completable.mergeArray(first, second, third).subscribe(() -> {
finishTask();
}, throwable -> {
throwable.printStackTrace();
});
что скажете насчет этого ?
Не вижу смысла что-то переводить в Observable поскольку мои таски по сути side effect. И никакого результата мне не возвращают. Или я опять не прав ?
Что в тасках самих?
Тут проблема в архитектуре, кажется.
Есть приложение оно уже живое. В нем не использовались RX. При запуски приложеия пользователь видит splash screen. Пока пользователь любуется сплеш скрином происходят фоновые операции и как только все запросы заканчиваются пользователь отправляет на главный экран.
1) Сходить на сервер и обновить токен
2) Сходить на другой сервер проверить наличие новой версии
3) Прочитать список темповых файлов в определенной директории
4) Подчистить не нужные файлы
5) и т.д.
все эти задачи не связаны между собой никак и по этому выполнять их последовательно нет смысла
Сейчас все это на AsyncTask работает. Мне нужно внести небольшие модификации вот и думал избавится в этой части от кучи колбеков
Да и хочется начать пробовать применять rx ;)
понял в чем я тупил. zip же вызывает обработчик именно на события (а не onComplete), а Completable возвращают именно onComplete, а не события (их там и нет). соответственно zip сразу отскнавливается, когда первый из подпотоков завершится
727776167
727781740