Search

RxSwift - Buffering Operator

ย Buffering Operator

Buffer, ์ฆ‰ ์ž„์‹œ ์ €์žฅ ๊ณต๊ฐ„์€ ๋ฐ์ดํ„ฐ๋ฅผ ํ•œ ๊ณณ์—์„œ ๋‹ค๋ฅธ ํ•œ ๊ณณ์œผ๋กœ ์ „์†กํ•˜๋Š” ๋™์•ˆ ์ผ์‹œ์ ์œผ๋กœ ๊ทธ ๋ฐ์ดํ„ฐ๋ฅผ ๋ณด๊ด€ํ•˜๋Š” ๋ฉ”๋ชจ๋ฆฌ ์˜์—ญ์ž…๋‹ˆ๋‹ค.
Buffering Operator๋„ ์ž„์‹œ ์ €์žฅ ๊ณต๊ฐ„๊ฐ™์ด ๊ณผ๊ฑฐ์˜ ์š”์†Œ๋ฅผ ๊ตฌ๋…์ž๊ฐ€ ์ง€์ •ํ•ด๋‘” ๋ฒ„ํผ ํฌ๊ธฐ๋งŒํผ ์ผ์‹œ์ ์œผ๋กœ ๋ณด๊ด€ํ•ด๋’€๋‹ค๊ฐ€ ์ „๋‹ฌํ•ด์ฃผ๋Š” ์—ญํ• ์„ ํ•ฉ๋‹ˆ๋‹ค.

ย Buffering Operator์˜ ์ข…๋ฅ˜

share

ํ•œ ๋ฒˆ ์ƒ์„ฑํ•œ Observable์„ ๊ณต์œ 
Observable์„ shareํ•˜์ง€ ์•Š์œผ๋ฉด subscribeํ•  ๋•Œ๋งˆ๋‹ค ์ƒˆ๋กœ์šด Observable ์‹œํ€€์Šค๊ฐ€ ์ƒ์„ฑ๋ฉ๋‹ˆ๋‹ค.
ํ•˜์ง€๋งŒ, share๋ฅผ ์‚ฌ์šฉํ•˜๊ฒŒ ๋˜๋ฉด Observable์—์„œ ๋ฐœ์ƒํ•œ ์ด๋ฒคํŠธ๊ฐ€ ๋ฒ„ํผ์— ์ €์žฅ๋˜์–ด ํ•˜๋‚˜์˜ ์‹œํ€€์Šค์—์„œ ๋ฐฉ์ถœ๋˜๋Š” ์•„์ดํ…œ์„ subscribeํ•œ ๊ณณ์—์„œ ๊ณต์œ ํ•ด์„œ ์‚ฌ์šฉํ•˜๊ฒŒ ๋ฉ๋‹ˆ๋‹ค.
share๋Š” completed๋˜์ง€ ์•Š๋Š” Observable์— ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ์ด ์•ˆ์ „ํ•ฉ๋‹ˆ๋‹ค. ๋˜ํ•œ, ๊ณต์œ ํ•˜๋Š” ์‹œํ€€์Šค๊ฐ€ completed๋œ ํ›„ ์ƒˆ๋กœ์šด Observable์ด ์ƒ์„ฑ๋˜์ง€ ์•Š๋Š”๋‹ค๊ณ  ํ™•์‹ ํ•  ์ˆ˜ ์žˆ์„ ๋•Œ ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ์ด ์ข‹์Šต๋‹ˆ๋‹ค.
โ€ข
share๋ฅผ ์‚ฌ์šฉํ•˜์ง€ ์•Š๋Š” ๊ฒฝ์šฐ
let nonShareObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance) .do(onNext: { print($0) }) .take(3) .debug("nonShare") nonShareObservable .map { $0 > 0 } .subscribe { print($0) } .disposed(by: disposeBag) nonShareObservable .map { "Count:\($0)" } .subscribe { print($0) } .disposed(by: disposeBag) // subscribeํ•  ๋•Œ๋งˆ๋‹ค ์ƒˆ๋กœ์šด Observable๋ฅผ subscribeํ•จ /* nonShare -> subscribed nonShare -> subscribed 0 nonShare -> Event next(0) next(false) 0 nonShare -> Event next(0) next(Count:0) 1 nonShare -> Event next(1) next(true) 1 nonShare -> Event next(1) next(Count:1) 2 nonShare -> Event next(2) next(true) nonShare -> Event completed completed nonShare -> isDisposed 2 nonShare -> Event next(2) next(Count:2) nonShare -> Event completed completed nonShare -> isDisposed */
Swift
๋ณต์‚ฌ
โ€ข
share๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ๊ฒฝ์šฐ
let shareObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance) .do(onNext: { print($0) }) .take(3) .debug("share") .share() shareObservable .map { $0 > 0 } .subscribe { print($0) } .disposed(by: disposeBag) shareObservable .map { "Count:\($0)" } .subscribe { print($0) } .disposed(by: disposeBag) /* share -> subscribed 0 share -> Event next(0) next(false) next(Count:0) 1 share -> Event next(1) next(true) next(Count:1) 2 share -> Event next(2) next(true) next(Count:2) share -> Event completed completed completed share -> isDisposed */
Swift
๋ณต์‚ฌ

share(replay:, scope:)

๋‹ค๋ฅธ ์‹œํ€€์Šค์—์„œ share๋œ Observable๋ฅผ ๊ตฌ๋…ํ–ˆ์„ ๋•Œ, ๊ฐ€์žฅ ์ตœ๊ทผ ๋ฐฉ์ถœํ–ˆ๋˜ ์•„์ดํ…œ์„ ๋ฒ„ํผ์˜ ํฌ๊ธฐ๋งŒํผ ์ƒˆ๋กœ์šด ๊ตฌ๋… ์‹œํ€€์Šค์— ์ „๋‹ฌ
replay์—๋Š” ๋ฒ„ํผ์˜ ํฌ๊ธฐ๋ฅผ ๋ฐ›์Šต๋‹ˆ๋‹ค. scope์—๋Š” ๋ฒ„ํผ์˜ ์ƒ๋ช…์ฃผ๊ธฐ๊ฐ€ ๋“ค์–ด๊ฐ‘๋‹ˆ๋‹ค.
๋ฒ„ํผ์˜ ์ƒ๋ช…์ฃผ๊ธฐ๋Š” .forever์™€ .whileConnected ๋‘ ๊ฐ€์ง€๊ฐ€ ์žˆ์Šต๋‹ˆ๋‹ค.
โ€ข
.forever
subscription์ด 0์ด ๋˜๋”๋ผ๋„ ๋ฒ„ํผ๊ฐ€ ์œ ์ง€๋ฉ๋‹ˆ๋‹ค. ์ƒˆ๋กœ์šด subscription์ด subscribe๋ฅผ ํ•˜๋ฉด ๋งˆ์ง€๋ง‰์— ๋ฒ„ํผ์— ๋‚จ์•„์žˆ๋˜ replay ๊ฐœ์ˆ˜๋งŒํผ ๊ฐ’์„ ์ˆ˜์‹ ํ•ฉ๋‹ˆ๋‹ค.
stream์„ ์บ์‹œ์—์„œ ์‚ญ์ œํ•˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค.
โ€ข
.whileConnected
1๊ฐœ ์ด์ƒ์˜ subscriber๊ฐ€ ์กด์žฌํ•˜๋Š” ๋™์•ˆ๋งŒ ๋ฒ„ํผ๊ฐ€ ์œ ์ง€๋ฉ๋‹ˆ๋‹ค. subscription์ด 0์ด ๋˜๋ฉด ๋ฒ„ํผ๊ฐ€ ๋น„์›Œ์ง€๊ณ  ์ƒˆ๋กœ์šด subscription์€ ๋ฒ„ํผ์— ๋‚จ์•„์žˆ๋˜ ๊ฐ’์ด ์—†์œผ๋ฏ€๋กœ replay ์‹œ์— ์ƒˆ ๊ฐ’์„ ์š”์ฒญํ•ด ์ˆ˜์‹ ํ•˜๊ฒŒ ๋ฉ๋‹ˆ๋‹ค.
๊ณต์œ ๋˜๊ณ  ์žˆ๋˜ stream์„ ์บ์‹œ์—์„œ ์‚ญ์ œํ•ฉ๋‹ˆ๋‹ค.
let shareObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance) .do(onNext: { print($0) }) .take(5) .debug("share") .share(replay: 2, scope: .forever) shareObservable .map { $0 > 0 } .subscribe { print($0) } .disposed(by: disposeBag) shareObservable .map { "Count:\($0)" } .subscribe { print($0) } .disposed(by: disposeBag) DispatchQueue.main.asyncAfter(deadline: .now() + 5, execute: { shareObservable .map { "new: \($0)"} .subscribe { print($0) } .disposed(by: self.disposeBag) }) /* share -> subscribed 0 share -> Event next(0) next(false) next(Count:0) ... share -> Event completed share -> isDisposed completed completed next(new: 3) // replay๋ฅผ 2๋กœ ์„ค์ •ํ•ด๋‘์–ด 2๊ฐœ์˜ ์•„์ดํ…œ์„ ์ „๋‹ฌ next(new: 4) completed */
Swift
๋ณต์‚ฌ

replay

subscription์„ ๊ณต์œ ์‹œ ์ง€์ •ํ•œ ๋ฒ„ํผ ํฌ๊ธฐ๋งŒํผ ์ด๋ฒคํŠธ๋ฅผ ์ €์žฅํ•˜๊ณ  ์ „๋‹ฌ
share์—์„œ ๋งค๊ฐœ๋ณ€์ˆ˜๋กœ ์‚ฌ์šฉํ•œ replay์™€ ๋œปํ•˜๋Š” ๋ฐ”๊ฐ€ ๋™์ผํ•ฉ๋‹ˆ๋‹ค. ๋งค๊ฐœ๋ณ€์ˆ˜๋กœ ์ด๋ฒคํŠธ๋ฅผ ์ €์žฅํ•˜๊ณ  ์‹ถ์€ ๋งŒํผ ๋ฒ„ํผ ํฌ๊ธฐ๋ฅผ ์ง€์ •ํ•˜๋ฉด ๋ฉ๋‹ˆ๋‹ค.
ConnectableObservable์ด๊ธฐ ๋•Œ๋ฌธ์— connect ๋ฉ”์„œ๋“œ๋ฅผ ํ†ตํ•ด์„œ ๋ถˆ๋ฆฌ๊ธฐ ์ „๊นŒ์ง€๋Š” ๊ตฌ๋…์ž ์ˆ˜์™€ ๊ด€๊ณ„ ์—†์ด ์•„๋ฌด๊ฒƒ๋„ ๋ฐฉ์ถœํ•˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค.
let bellyClock = Observable<Int> .interval(.seconds(1), scheduler: MainScheduler.instance) .take(until: { $0 == 6 }, behavior: .inclusive) let replay = bellyClock.replay(2) // replayAll()๋ฅผ ์‚ฌ์šฉํ•˜๊ฒŒ ๋˜๋ฉด ๋ชจ๋“  ์ด๋ฒคํŠธ๋ฅผ ๋ฐฉ์ถœ _ = replay.connect() replay .subscribe(onNext: { time in print("๋ฐฐ๊ผฝ ์‹œ๊ณ„ ์šธ๋ฆฐ๋‹ค!! \(time)") }) .disposed(by: disposeBag) replay .delaySubscription(.seconds(3), scheduler: MainScheduler.instance) .subscribe(onNext: { time in print("๋‘๋ฒˆ์งธ ๋ฐฐ๊ผฝ ์‹œ๊ณ„ ์šธ๋ฆฐ๋‹ค!! \(time)") }) .disposed(by: disposeBag) /* ๋ฐฐ๊ผฝ ์‹œ๊ณ„ ์šธ๋ฆฐ๋‹ค!! 0 ๋ฐฐ๊ผฝ ์‹œ๊ณ„ ์šธ๋ฆฐ๋‹ค!! 1 ๋ฐฐ๊ผฝ ์‹œ๊ณ„ ์šธ๋ฆฐ๋‹ค!! 2 ๋‘๋ฒˆ์งธ ๋ฐฐ๊ผฝ ์‹œ๊ณ„ ์šธ๋ฆฐ๋‹ค!! 1 ๋‘๋ฒˆ์งธ ๋ฐฐ๊ผฝ ์‹œ๊ณ„ ์šธ๋ฆฐ๋‹ค!! 2 ๋ฐฐ๊ผฝ ์‹œ๊ณ„ ์šธ๋ฆฐ๋‹ค!! 3 ๋‘๋ฒˆ์งธ ๋ฐฐ๊ผฝ ์‹œ๊ณ„ ์šธ๋ฆฐ๋‹ค!! 3 ๋ฐฐ๊ผฝ ์‹œ๊ณ„ ์šธ๋ฆฐ๋‹ค!! 4 ๋‘๋ฒˆ์งธ ๋ฐฐ๊ผฝ ์‹œ๊ณ„ ์šธ๋ฆฐ๋‹ค!! 4 ๋ฐฐ๊ผฝ ์‹œ๊ณ„ ์šธ๋ฆฐ๋‹ค!! 5 ๋‘๋ฒˆ์งธ ๋ฐฐ๊ผฝ ์‹œ๊ณ„ ์šธ๋ฆฐ๋‹ค!! 5 ๋ฐฐ๊ผฝ ์‹œ๊ณ„ ์šธ๋ฆฐ๋‹ค!! 6 ๋‘๋ฒˆ์งธ ๋ฐฐ๊ผฝ ์‹œ๊ณ„ ์šธ๋ฆฐ๋‹ค!! 6 */
Swift
๋ณต์‚ฌ

publish

Observable์˜ ์‹œํ€€์Šค๋ฅผ ํ•˜๋‚˜์˜ Subject๋ฅผ ํ†ตํ•ด multicast๋กœ ์ด๋ฒคํŠธ๋ฅผ ์ „๋‹ฌ
publish๋Š” multicast์™€ publishSubject๊ฐ€ ํ•ฉ์ณ์ง„ ๋ฐฉ์‹์ž…๋‹ˆ๋‹ค. publish๋Š” multicast์™€ publishSubject๋ฅผ ํ•จ๊ป˜ ์‚ฌ์šฉํ•˜๋Š” ๋ฐฉ์‹์„ ํ•˜๋‚˜์˜ ๋ฉ”์„œ๋“œ๋กœ ํ•จ์ถ•์‹œ์ผœ์ค๋‹ˆ๋‹ค.
โ€ข
multicast + publishSubject๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ๋ฐฉ์‹
let bellyClock = Observable<Int> .interval(.seconds(1), scheduler: MainScheduler.instance) .take(until: { $0 == 6 }, behavior: .inclusive) let subject = PublishSubject<Int>() let multicast = bellyClock.multicast(subject) _ = multicast.connect() multicast .subscribe(onNext: { time in print("๋ฐฐ๊ผฝ ์‹œ๊ณ„ ์šธ๋ฆฐ๋‹ค!! \(time)") }) .disposed(by: disposeBag) multicast .delaySubscription(.seconds(3), scheduler: MainScheduler.instance) .subscribe(onNext: { time in print("๋‘๋ฒˆ์งธ ๋ฐฐ๊ผฝ ์‹œ๊ณ„ ์šธ๋ฆฐ๋‹ค!! \(time)") }) .disposed(by: disposeBag)
Swift
๋ณต์‚ฌ
โ€ข
publish๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ๋ฐฉ์‹
let bellyClock = Observable<Int> .interval(.seconds(1), scheduler: MainScheduler.instance) .take(until: { $0 == 6 }, behavior: .inclusive) let multicast = bellyClock.publish() _ = multicast.connect() multicast .subscribe(onNext: { time in print("๋ฐฐ๊ผฝ ์‹œ๊ณ„ ์šธ๋ฆฐ๋‹ค!! \(time)") }) .disposed(by: disposeBag) multicast .delaySubscription(.seconds(3), scheduler: MainScheduler.instance) .subscribe(onNext: { time in print("๋‘๋ฒˆ์งธ ๋ฐฐ๊ผฝ ์‹œ๊ณ„ ์šธ๋ฆฐ๋‹ค!! \(time)") }) .disposed(by: disposeBag)
Swift
๋ณต์‚ฌ
๋‘ ๊ฐ€์ง€ ๋ฐฉ์‹ ๋ชจ๋‘ ๊ฐ™์€ Output์„ ๋ฐฉ์ถœํ•ฉ๋‹ˆ๋‹ค.
/* ๋ฐฐ๊ผฝ ์‹œ๊ณ„ ์šธ๋ฆฐ๋‹ค!! 0 ๋ฐฐ๊ผฝ ์‹œ๊ณ„ ์šธ๋ฆฐ๋‹ค!! 1 ๋ฐฐ๊ผฝ ์‹œ๊ณ„ ์šธ๋ฆฐ๋‹ค!! 2 ๋ฐฐ๊ผฝ ์‹œ๊ณ„ ์šธ๋ฆฐ๋‹ค!! 3 ๋‘๋ฒˆ์งธ ๋ฐฐ๊ผฝ ์‹œ๊ณ„ ์šธ๋ฆฐ๋‹ค!! 3 ๋ฐฐ๊ผฝ ์‹œ๊ณ„ ์šธ๋ฆฐ๋‹ค!! 4 ๋‘๋ฒˆ์งธ ๋ฐฐ๊ผฝ ์‹œ๊ณ„ ์šธ๋ฆฐ๋‹ค!! 4 ๋ฐฐ๊ผฝ ์‹œ๊ณ„ ์šธ๋ฆฐ๋‹ค!! 5 ๋‘๋ฒˆ์งธ ๋ฐฐ๊ผฝ ์‹œ๊ณ„ ์šธ๋ฆฐ๋‹ค!! 5 ๋ฐฐ๊ผฝ ์‹œ๊ณ„ ์šธ๋ฆฐ๋‹ค!! 6 ๋‘๋ฒˆ์งธ ๋ฐฐ๊ผฝ ์‹œ๊ณ„ ์šธ๋ฆฐ๋‹ค!! 6 */
Swift
๋ณต์‚ฌ

buffer, window

์ด๋ฒคํŠธ๋ฅผ ๋ฌถ์Œ์œผ๋กœ ์ „๋‹ฌ
๋‘ ์—ฐ์‚ฐ์ž์˜ ์ธ์ž๋Š” ์™„์ „ํžˆ ๋™์ผํ•˜์ง€๋งŒ, ๋ฐ˜ํ™˜๊ฐ’์ด ๋‹ค๋ฅด๊ฒŒ ๋‚˜ํƒ€๋‚ฉ๋‹ˆ๋‹ค.
func buffer(timeSpan: RxTimeInterval, count: Int, scheduler: SchedulerType) -> Observable<[Element]> func window(timeSpan: RxTimeInterval, count: Int, scheduler: SchedulerType) -> Observable<Observable<Element>>
Swift
๋ณต์‚ฌ
buffer๋Š” ๋ฐฐ์—ด ํ˜•ํƒœ๋กœ ์ด๋ฒคํŠธ๋ฅผ ๋ฐฉ์ถœํ•˜์ง€๋งŒ, window๋Š” ๋‹ค๋ฅธ Observable ํ˜•ํƒœ๋กœ ์ด๋ฒคํŠธ๋ฅผ ๋ฐฉ์ถœํ•ฉ๋‹ˆ๋‹ค.
buffer์™€ window๋Š” ๋‘ ๊ฐ€์ง€ ์กฐ๊ฑด ์ค‘ 1๊ฐ€์ง€๊ฐ€ ๋งŒ์กฑ๋˜๊ธฐ ์ „๊นŒ์ง€๋Š” ์ด๋ฒคํŠธ๋ฅผ ๋ฐฉ์ถœํ•˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค.
โ“ต ์ฒซ ๊ตฌ๋… ์‹œ์  ํ˜น์€ ๊ฐ€์žฅ ์ตœ๊ทผ ์ด๋ฒคํŠธ ๋ฐœ์ƒ ํ›„๋กœ timespan์œผ๋กœ ์ง€์ •ํ•œ ์‹œ๊ฐ„์ด ์ง€๋‚ฌ์„ ๊ฒฝ์šฐ
โ“ถ ์ฒซ ๊ตฌ๋… ์‹œ์  ํ˜น์€ ๊ฐ€์žฅ ์ตœ๊ทผ ์ด๋ฒคํŠธ ๋ฐœ์ƒ ํ›„๋กœ ์›๋ณธ Observable์—์„œ count๋งŒํผ์˜ ์ด๋ฒคํŠธ๊ฐ€ ๋ฐœ์ƒํ•œ ๊ฒฝ์šฐ
ํ•˜๋‚˜์˜ ์กฐ๊ฑด์„ ๋งŒ์กฑํ•˜๊ธฐ ์ „๊นŒ์ง€๋Š” ์›๋ณธ Observable์˜ ์ด๋ฒคํŠธ๋Š” ๋‚ด๋ถ€ ๋ฒ„ํผ์— ์ €์žฅ๋ฉ๋‹ˆ๋‹ค. ์ด๋ฒคํŠธ ๋ฐœ์ƒ ์‹œ์ ์ด ๋˜๋ฉด ํ•ด๋‹น ๋ฒ„ํผ์˜ ์ด๋ฒคํŠธ๋“ค์„ ๋ชจ๋‘ ๋‚ด๋ณด๋‚ด๊ณ  ๋ฒ„ํผ๋ฅผ ๋น„์šฐ๊ณ  ๋‹ค์‹œ ํƒ€์ด๋จธ(timespan)๋ฅผ ์‹œ์ž‘ํ•ฉ๋‹ˆ๋‹ค.
buffer, window๋ฅผ ์‚ฌ์šฉํ•˜๋ฉด ์ด๋ฒคํŠธ๋ฅผ ํŠน์ • ํšŸ์ˆ˜๋‚˜ ์‹œ๊ฐ„ ๋‹จ์œ„๋กœ ๋ฌถ์–ด์„œ ํ•œ๊บผ๋ฒˆ์— ์ฒ˜๋ฆฌํ•  ์ˆ˜ ์žˆ๊ฒŒ ๋ฉ๋‹ˆ๋‹ค.
โ€ข
buffer
let timer1 = Observable<Int> .interval(.seconds(1), scheduler: MainScheduler.instance) .take(until: { $0 == 5 }, behavior: .inclusive) .map({ "์„ธ๊ณ  ์žˆ๋‹ค \($0)" }) timer1 .buffer(timeSpan: .seconds(3), count: 2, scheduler: MainScheduler.instance) .subscribe { event in switch event { case let .next(value): print(value) default: print("finished") } }.disposed(by: disposeBag) /* ["์„ธ๊ณ  ์žˆ๋‹ค 0", "์„ธ๊ณ  ์žˆ๋‹ค 1"] ["์„ธ๊ณ  ์žˆ๋‹ค 2", "์„ธ๊ณ  ์žˆ๋‹ค 3"] ["์„ธ๊ณ  ์žˆ๋‹ค 4", "์„ธ๊ณ  ์žˆ๋‹ค 5"] [] finished */
Swift
๋ณต์‚ฌ
โ€ข
window
let timer1 = Observable<Int> .interval(.seconds(1), scheduler: MainScheduler.instance) .take(until: { $0 == 5 }, behavior: .inclusive) .map({ "์„ธ๊ณ  ์žˆ๋‹ค \($0)" }) timer1 .window(timeSpan: .seconds(3), count: 2, scheduler: MainScheduler.instance) .subscribe { [weak self] event in switch event { case let .next(observable): observable .subscribe { e in switch e { case let .next(value): print(value) default: print("inner finished") } } .disposed(by: self?.disposeBag ?? DisposeBag()) default: print("finished") } }.disposed(by: disposeBag) /* ์„ธ๊ณ  ์žˆ๋‹ค 0 ์„ธ๊ณ  ์žˆ๋‹ค 1 inner finished ์„ธ๊ณ  ์žˆ๋‹ค 2 ์„ธ๊ณ  ์žˆ๋‹ค 3 inner finished ์„ธ๊ณ  ์žˆ๋‹ค 4 ์„ธ๊ณ  ์žˆ๋‹ค 5 inner finished inner finished finished */
Swift
๋ณต์‚ฌ