java – Нужно ли отписываться от завершенных заметок? –

Почему reactive?

Почему все вокруг вдруг стали говорить о реактивном программировании? Если вы не можете сделать приложение полностью синхронным, то наличие единственного асинхронного ресурса полностью ломает традиционный императивный стиль программирования, к которому мы привыкли.

Чтобы пояснить, почему я считаю это серьёзной проблемой, приведу пример.

Android – rxjava2 как правильно отписаться от устаревших результатов –

Android – это мобильная операционная система Google, используемая для программирования или разработки цифровых устройств (смартфоны, планшеты, автомобили, телевизоры, одежда, стекло, IoT). Для тем, связанных с Android, используйте специальные теги Android, такие как android-intent, android-activity, android-адаптер и т. Д. Для вопросов, не связанных с разработкой или программированием, но связанных с платформой Android, используйте эту ссылку: https: // android.stackexchange.com .

Подробнее про android…

Flowable vs. observable

В RxJava 2 источники представлены двумя основными типами — Flowable и Observable. Они устроены очень похоже. Оба генерируют от нуля до n элементов. Оба могут завершаться успешно или с ошибкой. Так зачем нам два разных типа для представления одной и той же структуры данных?

Всё сводится к такой штуке, как backpressure. Не углубляясь в подробности, скажу лишь, что backpressure позволяет замедлить работу источника данных. Существующие системы имеют ограниченные ресурсы. И с помощью backpressure мы можем сказать всем, кто шлёт нам данные, чтобы они притормозили, потому что мы не можем обрабатывать информацию с той скоростью, с которой она к нам поступает.

В RxJava 1 была поддержка backpressure, но она была добавлена довольно поздно в процессе развития API. В RxJava 1 каждый тип в системе имеет механизм backpressure. И хотя концепцию backpressure поддерживают все типы, далеко не все источники её реализуют, так что использование этого механизма может привести к падению приложения.

Допустим, у нас есть источник данных — события касания экрана. Мы не можем его замедлить. Нельзя же сказать пользователю: «Нарисуй-ка половину символа, остановись и подожди, пока я обработаю, а затем дорисуй оставшееся». Мы можем замедлить ввод данных иначе, например, отключив кнопки или отобразив другой UI, но сам источник замедлить не получится.

Возьмём другой пример: у нас есть база данных, содержащая большой набор строк, из которого нам нужно извлекать по несколько за раз. БД может очень эффективно решить эту задачу благодаря такому инструменту, как курсоры. Но для потока событий касания это реализовать невозможно, потому что нельзя замедлить палец пользователя.

В RxJava 1 оба вышеупомянутых типа реализованы как Observable, так что может случиться, что вы попробуете применить backpressure в рантайме, а в результате получите исключение MissingBackpressureException. Это стало причиной, по которой в RxJava 2 источники представлены в виде разных типов: один поддерживает backpressure, второй — нет.

Observer:

interfaceObserver<T> { voidonNext(T t); voidonComplete(); voidonError(Throwable t); voidonSubscribe(Disposable d);
}

И Subscriber:

interfaceSubscriber<T> { voidonNext(T t); voidonComplete(); voidonError(Throwable t); voidonSubscribe(Subscription s);
}

Первый метод называется onNext, сюда будут доставляться элементы. Этот метод вызывается каждый раз, когда Observable или Flowable генерирует элемент, позволяя его обрабатывать произвольным образом. Это может происходить бесконечно.

onComplete и onError — это терминальные события, то есть вы уже не получите никаких событий от источника после получения одного из них.Различие между интерфейсами Observer и Subscriber заключается в последнем методе – onSubscribe.

Это новый метод по сравнению с RxJava 1. Когда вы подписываетесь на Observable или Flowable, то тем самым создаёте ресурс, а ресурсы часто приходится очищать после окончания работы с ними. Колбэк onSubscribe вызывается сразу же, как только вы начинаете прослушивать Observable или Flowable, и он передаст вам объект одного из двух типов: Disposable.

interfaceObserver<T> { voidonNext(T t); voidonComplete(); voidonError(Throwable t); voidonSubscribe(Disposable d);
}interfaceDisposable{ voiddispose();
}

Или Subscription:

interfaceSubscriber<T> { voidonNext(T t); voidonComplete(); voidonError(Throwable t); voidonSubscribe(Subscription s);
}interfaceSubscription{ voidcancel(); voidrequest(long r);
}

Применительно к Observable тип Disposable позволяет вызывать метод dispose, означающий «Я закончил работать с этим ресурсом, мне больше не нужны данные». Если у вас есть сетевой запрос, то он может быть отменён. Если вы прослушивали бесконечный поток нажатий кнопок, то это будет означать, что вы больше не хотите получать эти события, в таком случае можно удалить OnClickListener у View.

Всё это справедливо и для интерфейса Subscription. Хоть он и называется иначе, но используется точно так же: у него есть метод cancel(), аналогичный dispose(). Отличается он лишь наличием второго метода request(long r), посредством которого backpressure проявляется в API. C помощью этого метода мы говорим Flowable, что нам нужно больше элементов.

Итак, единственная разница между двумя этими типами заключается в том, что один поддерживает backpressure, а другой — нет.

Java – нужно ли отписываться от завершенных заметок? –

Хотя вам не нужно вручную отписываться от прекращенного потока, вы все равно можете создать утечку памяти, используя RxJava2, если вы не будете осторожны.

Другие подписки:  Лучшие платные и бесплатные антивирусы для компьютера и телефона - Топ 15

Рассмотрим следующий код:

repository.getData() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(data-> myTextView.setText(data.toString()));

Параметр lambda в подписке является «синтаксическим сахаром» над анонимным внутренним классом:

subscribe(new Consumer<Data>() { @Overridepublicvoidaccept(final Data data){ myTextView.setText(data.toString()); }
});

В JVM анонимный внутренний класс поддерживает ссылку на внешний класс.

Предположим, что для приведенного выше наивного кода внешним классом является Activity (это также следует для Fragment, Service, BroadcastReceiver или любого класса, жизненный цикл которого контролируется ОС Android).

Активность подписывается на Observer, но затем уничтожается ОС Android в условиях нехватки памяти (вы можете имитировать этот эффект, включив Параметры разработчика / Не сохранять действия). Если работа над Schedulers.io() все еще выполняется, когда действие уничтожено, ссылка на действие будет по-прежнему сохраняться через анонимный внутренний класс. Это означает утечку памяти, которая препятствует завершению Activity сборщиком мусора. Если у Activity есть несколько видов или, скажем, объект Bitmap, тогда утечка памяти может быть весьма существенной.

Здесь есть несколько решений, но одно из них – сохранить объект CompositeDisposable и очистить его в методе жизненного цикла onDestroy() действия Android:

publicclassMyActivityextendsActivity{ DataRepository dataRepository; CompositeDisposable disposables; @OverridepublicvoidonCreate(Bundle savedInstanceState){ super.onCreate(savedInstanceState); disposables = new CompositeDisposable(); } publicvoidonButtonClick(View v){ repository.getData() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .doOnSubscribe(disposable -> disposables.add(disposable)) .subscribe(data -> myTextView.setText(data.toString())); } @OverridepublicvoidonDestroy(){ disposables.clear(); super.onDestroy(); }
}

Вы можете сослаться на хороший пример использования RxJava в приложении для Android в официальном Чертежи архитектуры Google Android.

Rxjava

Переходим к RxJava. Эта реактивная библиотека стала наиболее популярной при разработке под Android по большей части потому, что была первым полноценным [реактивным] инструментом для Java. RxJava 2 сохраняет поддержку старой версии Java, что важно для разработки под Android.

RxJava предоставляет:

Источники

Источник данных делает некую работу, когда вы начинаете или заканчиваете его прослушивать. Представьте сетевой запрос, который не будет отправлен, пока вы не начнёте ожидать ответа. И если вы отписываетесь от источника данных до его завершения, то теоретически он может отменить сетевой запрос.

Источник может работать как синхронно, так и асинхронно. Например, блокирующий сетевой запрос, выполняющийся в фоновом потоке, либо что-то чисто асинхронное вроде обращения к Android и ожидания onActivityResult. Источник может выдать один элемент или несколько элементов.

Ещё источники могут быть пустыми. Это концепция источника данных, который не содержит никаких элементов и работа которого либо успешно выполняется, либо завершается сбоем. Чтобы было понятно: представьте, что вы записываете данные в базу данных или в файл.

Они не возвращают вам элементы. Запись либо успешна, либо нет. В RxJava источники моделируют этот подход «выполнения или отказа» с помощью так называемых терминальных событий onComplete()/ onError(). Это аналогично методу, который либо возвращает ответ, либо бросает исключение.

Завершения может и не быть. К примеру, мы моделируем нажатие кнопки в качестве источника данных, который работает до тех пор, пока работает UI. И когда UI исчезает, то вы, вероятно, отписываетесь от этого источника нажатий кнопки, но он не завершает свою работу.

Всё это соответствует паттерну Observer. У нас есть нечто, что может генерировать данные; есть соглашение, как должны выглядеть эти данные. И мы хотим наблюдать за ними. Мы хотим добавить слушателя и получать уведомления, когда что-то происходит.

Как отписаться от flowable в rxkotlin/rxjava? – android-fragments

Я использую Room с RxJava/RxKotlin Flowable после этой статьи. Я запустил его, но проблема возникла с использованием ViewPager с 3 фрагментами.

Я покажу вам немного кода:

У меня есть просмотрщик с макетом вкладки и три фрагмента (A, B и избранное). Первые два фрагмента содержат списки данных, которые можно добавить в избранное.

В избранном фрагменте я использую Flowable для прослушивания изменений, внесенных A и B, и соответственно обновляю список. Но происходит то, что, когда элемент становится избранным в A и B, приложение вылетает, потому что подписка Flowable в избранном фрагменте запускается, даже если фрагмент находится не на переднем плане.

Я хочу, чтобы была возможность остановить подписку, когда фрагмент не на переднем плане и начать на переднем плане.

Я попытался остановить это в методе onPause любимого фрагмента, но у flowable нет метода отписки или удаления.

Мой код

dbRepository?.getAllImportant()?.subscribeOn(Schedulers.io()) ?.observeOn(AndroidSchedulers.mainThread()) ?.subscribe(getFlowableSubscriber())

Наблюдение за источниками

Поговорим о методах onSubscribe в интерфейсах Observer / Subscriber.

Observer:

interfaceObserver<T> { voidonNext(T t); voidonComplete(); voidonError(Throwable t); voidonSubscribe(Disposable d);
}interfaceDisposable{ voiddispose();
}

Subscriber:

interfaceSubscriber<T> { voidonNext(T t); voidonComplete(); voidonError(Throwable t); voidonSubscribe(Subscription s);
}interfaceSubscription{ voidcancel(); voidrequest(long r);
}

На самом деле, вам необязательно реализовывать интерфейсы Observer/ Subscriber напрямую, когда вы подписываетесь с помощью метода subscribe. Это не вполне удобно как раз из-за четвёртого метода onSubscribe – в нём надо как-то обрабатывать передаваемый объект Disposable / Subscription в момент подписки.

Observable<String> o = Observable.just(“Hello”);
o.subscribe(new Observer<String>() { @OverridepublicvoidonNext(String s){ … } @OverridepublicvoidonComplete(){ … } @OverridepublicvoidonError(Throwable t){ … } @OverridepublicvoidonSubscribe(Disposable d){ ??? }
});

Вместо этого есть возможность использовать DisposableObserver, который автоматически обработает этот четвёртый метод, так что вам достаточно будет позаботиться лишь об уведомлениях от самого Observable.

Observable<String> o = Observable.just("Hello");
o.subscribe(new DisposableObserver<String>() { @OverridepublicvoidonNext(String s){ … } @OverridepublicvoidonComplete(){ … } @OverridepublicvoidonError(Throwable t){ … }
});

Но как нам отписываться? У нас больше нет этого четвёртого метода.Первый способ – сохранить ссылку на DisposableObserver в Observer. Он реализует Disposable, поэтому можно вызвать метод dispose, который позаботится о переадресации вверх по цепочке.

Observable<String> o = Observable.just(“Hello”);
DisposableObserver observer = new DisposableObserver<String>() { @OverridepublicvoidonNext(String s){ … } @OverridepublicvoidonComplete(){ … } @OverridepublicvoidonError(Throwable t){ … }
}
o.subscribe(observer);
observer.dispose();

В RxJava 2 также появился новый метод subscribeWith, который похож на subscribe из RxJava 1. Он возвращает Disposable.

Observable<String> o = Observable.just(“Hello”);
Disposable d = o.subscribeWith(new DisposableObserver<String>() { @OverridepublicvoidonNext(String s){ … } @OverridepublicvoidonComplete(){ … } @OverridepublicvoidonError(Throwable t){ … }
});
d.dispose();

По аналогии с составной подпиской из RxJava есть и составной Disposable: вы можете подписаться на несколько источников, взять возвращаемые Disposable, добавить их в CompositeDisposable и одновременно отписаться от всех потоков.

Observable<String> o = Observable.just(“Hello”);
CompositeDisposable disposables = new CompositeDisposable();
disposables.add(o.subscribeWith(new DisposableObserver<String>() { @OverridepublicvoidonNext(String s){ … } @OverridepublicvoidonComplete(){ … } @OverridepublicvoidonError(Throwable t){ … }
}));
disposables.dispose();

На Android вы часто будете сталкиваться с тем, что у вас есть CompositeDisposable для Activity или фрагмента, и вы отписываетесь в onDestroy (или ещё где-то).

Другие подписки:  Как отключить домофон в квартире - 30 советов адвокатов и юристов

Метод subscribeWith существует для всех четырёх типов без поддержки backpressure.

Объясните rxjava2. disposable

Пишу android приложение. Начал использовать RxJava2 и RxAndroid2. Но во многих вещах еще нет четкого понимания.

Реализовал обращение к API через Retrofit2 которое возвращает Observable<List<Market>>

Потом следующий код и предупреждение IDE:

ide screen

Через ctrl space увидел, что .subscribe() возвращает Disposable.

Что за Disposable такой? Какая его задача и как его правильно использовать?

По RxJava много статей, простые или самые важные вещи объясняются довольно понятно.

Цитата со статьи (podpiski-help.ru) Исследуем RxJava 2 для Android

Применительно к Observable тип Disposable позволяет вызывать метод dispose, означающий «Я закончил работать с этим ресурсом, мне больше не нужны данные». Если у вас есть сетевой запрос, то он может быть отменён. Если вы прослушивали бесконечный поток нажатий кнопок, то это будет означать, что вы больше не хотите получать эти события, в таком случае можно удалить OnClickListener у View.

Кто реализует интрефейс Disposable? Нужно ли мне беспокоится о том, что написано в цитате при использовании Retrofit? И если я сохраню ссылку на Disposable что полезного потом можно с ней сделать?

Просто от не знания, меня напрягает аннотация @SuppressLint("CheckResult") в android проекте, что бы в глаза не бросалось выделение.

Ну и самый главный вопрос: можно/нужно это игнорировать? Или нужно переписать/дописать код с умом.

Операторы

Операторы позволяют решать три задачи:

Рассмотрим первые две.Точно так же, как источники позволяют оборачивать синхронные методы и делать их реактивными, операторы позволяют делать реактивными некие действия. Например, здесь мы применяем метод toUppercase() к строке и получаем новую строку.

String greeting = “Hello”;
String yelling = greeting.toUppercase();

В реактивном мире мы берем Observable и с помощью оператора осуществляем эту операцию.

Observable<String> greeting = Observable.just("Hello");
Observable<String> yelling = greeting.map(s -> s.toUppercase());

В данном случае оператором является map. Он позволяет взять генерируемые данные и применить к ним какую-то операцию, чтобы получить новый тип данных.

Реактивное мышление

В реальном приложении всё работает асинхронно. У нас есть сеть, в которую мы отправляем запросы и из которой через длительное время получаем ответы. Мы не можем блокировать основной поток выполнения, так что работа с сетью должна выполняться в фоновом потоке.

Пользователи — тоже нечто вроде асинхронных источников данных. Мы даём им информацию через UI, и они реагируют на неё нажатиями кнопок и внесением данных в поля.

Пользователь может возвращаться к приложению в разное время. И приложение должно быть готово к приёму данных, должно быть реактивным, чтобы не возникло состояния, при котором блокируется основной поток выполнения; чтобы не было ситуации, когда часть данных поступает асинхронно, а приложение этого не ожидает и в результате не учитывает полученные данные или вообще падает.

В этом и заключается сложность: вы должны поддерживать все эти состояния в ваших Activity / Fragment. Нужно примириться с тем, что многочисленные асинхронные источники генерируют и потребляют данные, возможно, с разной скоростью.

И это мы ещё не учитываем работу самой Android, которая является асинхронной платформой. У нас есть push-уведомления, широковещательные сообщения и изменения конфигурации. Пользователи могут в любой момент поворачивать устройства из портретной ориентации в альбомную и обратно, и если ваш код к этому не готов, то приложение будет крашиться или вести себя неправильно.

Пока вы не можете обеспечить синхронность всей архитектуры вашего приложения, наличие единственного асинхронного ресурса будет ломать традиционный императивный стиль программирования.

Трудно найти приложение, не использующее сетевые запросы, а они по своей природе асинхронны. У вас есть диск, база данных — асинхронные источники. UI тоже должен рассматриваться исключительно как асинхронный источник. Так что по умолчанию всё в Android функционирует асинхронно.

Другие подписки:  Где найти и как отключить платные подписки на сервисы? —

Вместо того чтобы пытаться координировать все асинхронные элементы архитектуры, мы можем снять с себя эту обязанность, соединив их напрямую. Можно напрямую подписать UI на базу данных, чтобы он реагировал на изменения данных. Можно так изменить базу данных и сетевые вызовы, чтобы они реагировали на нажатие кнопки, а не пытаться получить клик и затем его отправить.

Было бы прекрасно, если бы получаемый нами сетевой ответ обновлял данные. Ведь когда обновляются данные, автоматически обновляется и UI. Таким образом мы снимаем с себя ответственность за это. Если Android делает что-то асинхронно (например, поворот экрана или рассылка broadcast), то было бы замечательно, если бы это автоматически отразилось на интерфейсе или при этом автоматически запускалась какая-нибудь фоновая задача.

В целом такой подход позволяет нам не писать кучу кода, необходимого для поддержки состояний: вместо того чтобы управлять состояниями, мы просто соединяем компоненты друг с другом.

Реактивные потоки

Хочу коснуться вопроса, почему типы Disposable и Subscription имеют разные названия, как и их методы — dispose() и cancel(). Почему нельзя было просто расширить один другим, добавив метод request()?

interfacePublisher<T> { voidsubscribe(Subscriber<? super T> s);
}interfaceSubscriber<T> { voidonNext(T t); voidonComplete(); voidonError(Throwable t); voidonSubscribe(Subscription s);
}interfaceSubscription{ voidrequest(long n); voidcancel();
}interfaceProcessor<T, R> extendsSubscriber<T>, Publisher<R> {
}

В представленном коде вы видите типы Subscriber и Subscription. Они являются частью спецификации, и поэтому именно эти названия были использованы в RxJava 2. Поскольку они являются частью стандарта, мы ничего не можем с этим поделать.

Тип Flowable реализует спецификацию реактивных потоков, что подразумевает поддержку backpressure.

Создание источников

На этом примере показано, как создаются источники и как можно оборачивать то, что вы уже используете, в реактивные источники. Все эти типы имеют статические методы, позволяющие создавать их на основе одиночных значений.

Flowable.just("Hello");
Flowable.just("Hello", "World");
Observable.just("Hello");
Observable.just("Hello", "World");
Maybe.just("Hello");
Single.just("Hello");

Также можно создавать их из массивов или Iterable.

String[] array = { “Hello”, “World” };
List<String> list = Arrays.asList(array);
Flowable.fromArray(array);
Flowable.fromIterable(list);
Observable.fromArray(array);
Observable.fromIterable(list);

Но наиболее полезны два метода, которые, как мне кажется, больше всего будут использоваться для адаптации методов и действий, которые вы уже выполняете (как синхронно, так и асинхронно).

Первый называется fromCallable.

Observable.fromCallable(new Callable<String>() { @Overridepublic String call(){ return getName(); }
});

Специализация операторов

В RxJava есть операторы, которые берут Observable и возвращают другой тип. Например, оператор first(), который берёт первый элемент из последовательности и возвращает его. В RxJava 1 мы получали Observable, просто генерирующий один элемент.

Получалось довольно странно: ведь если у вас есть список элементов, и вы применяете к нему get(0), чтобы получить первый элемент, то вы же не получаете в ответ список, состоящий из одного элемента, – вы получаете скаляр. В RxJava 2 сделано иначе: вызывая оператор first(), гарантированно возвращающий один элемент, вы получите Single.

Если Observable был пустой, то это приведёт к ошибке, потому что Single либо содержит элемент, либо возвращает ошибку.

Есть операторы вроде firstElement(), который возвращает Maybe. Пустой Observable можно смоделировать посредством завершения Maybe без ошибки.

Есть операторы, возвращающие Completable. Так что если вас не интересуют данные, и вы просто ждёте завершения или ошибки, то можно использовать оператор ignoreElements.

Всё описанное применимо и к Flowable: здесь есть точно такие же операторы, и они возвращают такие же специализированные типы.

В этой таблице приведены некоторые операторы.

В правой верхней части таблицы представлены операторы «сужающего» действия. Например, если вам нужно посчитать количество элементов в последовательности, то счётчик всегда будет иметь одиночное значение, поэтому выбирается более «узкий» тип вроде Single.

Специализированные источники

В RxJava 2 есть три специализированных источника, представляющих собой подмножество Observable. Первый называется Single. Он либо содержит один элемент, либо выдаёт ошибку, так что это не столько последовательность элементов, сколько потенциально асинхронный источник одиночного элемента.

Он не поддерживает backpressure. Можете представлять его себе как обычный метод. Вы вызываете метод и получаете возвращаемое значение; либо метод бросает исключение. Именно эту схему реализует Single. Вы на него подписываетесь и получаете либо элемент, либо ошибку. Но при этом Single является реактивным.

Второй тип называется Completable. Он похож на void-метод. Он либо успешно завершает свою работу без каких-либо данных, либо бросает исключение. То есть это некий кусок кода, который можно запустить, и он либо успешно выполнится, либо завершится сбоем.

Третий тип — Maybe. Это новый тип по сравнению с RxJava 1. Он может либо содержать элемент, либо выдать ошибку, либо не содержать данных — этакий реактивный Optional. Этот тип тоже не поддерживает backpressure.

В RxJava 2 нет типов, подобных Single/ Completable/ Maybe, но с поддержкой backpressure (совместимых с Reactive Streams Specification).

Заключение

В RxJava 2 реализована следующая идея: берутся изначально асинхронные вещи — работа с сетью, сама Android, база данных, даже UI — и пишется такой код, который реагирует на изменения в этих источниках, а не пытается справиться с изменениями и самостоятельно управлять состояниями.

Если вы используете RxJava 1, то обратите внимание на проект, позволяющий конвертировать типы. С его помощью вы сможете постепенно обновить свои приложения до RxJava 2.

Оцените статью
Подписки Help