ย Buffering Operator
Buffer, ์ฆ ์์ ์ ์ฅ ๊ณต๊ฐ์ ๋ฐ์ดํฐ๋ฅผ ํ ๊ณณ์์ ๋ค๋ฅธ ํ ๊ณณ์ผ๋ก ์ ์กํ๋ ๋์ ์ผ์์ ์ผ๋ก ๊ทธ ๋ฐ์ดํฐ๋ฅผ ๋ณด๊ดํ๋ ๋ฉ๋ชจ๋ฆฌ ์์ญ์
๋๋ค.
Buffering Operator๋ ์์ ์ ์ฅ ๊ณต๊ฐ๊ฐ์ด ๊ณผ๊ฑฐ์ ์์๋ฅผ ๊ตฌ๋
์๊ฐ ์ง์ ํด๋ ๋ฒํผ ํฌ๊ธฐ๋งํผ ์ผ์์ ์ผ๋ก ๋ณด๊ดํด๋๋ค๊ฐ ์ ๋ฌํด์ฃผ๋ ์ญํ ์ ํฉ๋๋ค.
ย Buffering Operator์ ์ข ๋ฅ
share
ํ ๋ฒ ์์ฑํ Observable์ ๊ณต์
Observable์ shareํ์ง ์์ผ๋ฉด subscribeํ ๋๋ง๋ค ์๋ก์ด Observable ์ํ์ค๊ฐ ์์ฑ๋ฉ๋๋ค.
ํ์ง๋ง, share๋ฅผ ์ฌ์ฉํ๊ฒ ๋๋ฉด Observable์์ ๋ฐ์ํ ์ด๋ฒคํธ๊ฐ ๋ฒํผ์ ์ ์ฅ๋์ด ํ๋์ ์ํ์ค์์ ๋ฐฉ์ถ๋๋ ์์ดํ
์ subscribeํ ๊ณณ์์ ๊ณต์ ํด์ ์ฌ์ฉํ๊ฒ ๋ฉ๋๋ค.
share๋ completed๋์ง ์๋ Observable์ ์ฌ์ฉํ๋ ๊ฒ์ด ์์ ํฉ๋๋ค. ๋ํ, ๊ณต์ ํ๋ ์ํ์ค๊ฐ completed๋ ํ ์๋ก์ด Observable์ด ์์ฑ๋์ง ์๋๋ค๊ณ ํ์ ํ ์ ์์ ๋ ์ฌ์ฉํ๋ ๊ฒ์ด ์ข์ต๋๋ค.
โข
share๋ฅผ ์ฌ์ฉํ์ง ์๋ ๊ฒฝ์ฐ
let nonShareObservable =
Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
.do(onNext: {
print($0)
})
.take(3)
.debug("nonShare")
nonShareObservable
.map { $0 > 0 }
.subscribe { print($0) }
.disposed(by: disposeBag)
nonShareObservable
.map { "Count:\($0)" }
.subscribe { print($0) }
.disposed(by: disposeBag)
// subscribeํ ๋๋ง๋ค ์๋ก์ด Observable๋ฅผ subscribeํจ
/*
nonShare -> subscribed
nonShare -> subscribed
0
nonShare -> Event next(0)
next(false)
0
nonShare -> Event next(0)
next(Count:0)
1
nonShare -> Event next(1)
next(true)
1
nonShare -> Event next(1)
next(Count:1)
2
nonShare -> Event next(2)
next(true)
nonShare -> Event completed
completed
nonShare -> isDisposed
2
nonShare -> Event next(2)
next(Count:2)
nonShare -> Event completed
completed
nonShare -> isDisposed
*/
Swift
๋ณต์ฌ
โข
share๋ฅผ ์ฌ์ฉํ๋ ๊ฒฝ์ฐ
let shareObservable =
Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
.do(onNext: {
print($0)
})
.take(3)
.debug("share")
.share()
shareObservable
.map { $0 > 0 }
.subscribe { print($0) }
.disposed(by: disposeBag)
shareObservable
.map { "Count:\($0)" }
.subscribe { print($0) }
.disposed(by: disposeBag)
/*
share -> subscribed
0
share -> Event next(0)
next(false)
next(Count:0)
1
share -> Event next(1)
next(true)
next(Count:1)
2
share -> Event next(2)
next(true)
next(Count:2)
share -> Event completed
completed
completed
share -> isDisposed
*/
Swift
๋ณต์ฌ
share(replay:, scope:)
๋ค๋ฅธ ์ํ์ค์์ share๋ Observable๋ฅผ ๊ตฌ๋
ํ์ ๋, ๊ฐ์ฅ ์ต๊ทผ ๋ฐฉ์ถํ๋ ์์ดํ
์ ๋ฒํผ์ ํฌ๊ธฐ๋งํผ ์๋ก์ด ๊ตฌ๋
์ํ์ค์ ์ ๋ฌ
replay์๋ ๋ฒํผ์ ํฌ๊ธฐ๋ฅผ ๋ฐ์ต๋๋ค. scope์๋ ๋ฒํผ์ ์๋ช
์ฃผ๊ธฐ๊ฐ ๋ค์ด๊ฐ๋๋ค.
๋ฒํผ์ ์๋ช
์ฃผ๊ธฐ๋ .forever์ .whileConnected ๋ ๊ฐ์ง๊ฐ ์์ต๋๋ค.
โข
.forever
subscription์ด 0์ด ๋๋๋ผ๋ ๋ฒํผ๊ฐ ์ ์ง๋ฉ๋๋ค. ์๋ก์ด subscription์ด subscribe๋ฅผ ํ๋ฉด ๋ง์ง๋ง์ ๋ฒํผ์ ๋จ์์๋ replay ๊ฐ์๋งํผ ๊ฐ์ ์์ ํฉ๋๋ค.
stream์ ์บ์์์ ์ญ์ ํ์ง ์์ต๋๋ค.
โข
.whileConnected
1๊ฐ ์ด์์ subscriber๊ฐ ์กด์ฌํ๋ ๋์๋ง ๋ฒํผ๊ฐ ์ ์ง๋ฉ๋๋ค. subscription์ด 0์ด ๋๋ฉด ๋ฒํผ๊ฐ ๋น์์ง๊ณ ์๋ก์ด subscription์ ๋ฒํผ์ ๋จ์์๋ ๊ฐ์ด ์์ผ๋ฏ๋ก replay ์์ ์ ๊ฐ์ ์์ฒญํด ์์ ํ๊ฒ ๋ฉ๋๋ค.
๊ณต์ ๋๊ณ ์๋ stream์ ์บ์์์ ์ญ์ ํฉ๋๋ค.
let shareObservable =
Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
.do(onNext: {
print($0)
})
.take(5)
.debug("share")
.share(replay: 2, scope: .forever)
shareObservable
.map { $0 > 0 }
.subscribe { print($0) }
.disposed(by: disposeBag)
shareObservable
.map { "Count:\($0)" }
.subscribe { print($0) }
.disposed(by: disposeBag)
DispatchQueue.main.asyncAfter(deadline: .now() + 5, execute: {
shareObservable
.map { "new: \($0)"}
.subscribe { print($0) }
.disposed(by: self.disposeBag)
})
/*
share -> subscribed
0
share -> Event next(0)
next(false)
next(Count:0)
...
share -> Event completed
share -> isDisposed
completed
completed
next(new: 3) // replay๋ฅผ 2๋ก ์ค์ ํด๋์ด 2๊ฐ์ ์์ดํ
์ ์ ๋ฌ
next(new: 4)
completed
*/
Swift
๋ณต์ฌ
replay
subscription์ ๊ณต์ ์ ์ง์ ํ ๋ฒํผ ํฌ๊ธฐ๋งํผ ์ด๋ฒคํธ๋ฅผ ์ ์ฅํ๊ณ ์ ๋ฌ
share์์ ๋งค๊ฐ๋ณ์๋ก ์ฌ์ฉํ replay์ ๋ปํ๋ ๋ฐ๊ฐ ๋์ผํฉ๋๋ค. ๋งค๊ฐ๋ณ์๋ก ์ด๋ฒคํธ๋ฅผ ์ ์ฅํ๊ณ ์ถ์ ๋งํผ ๋ฒํผ ํฌ๊ธฐ๋ฅผ ์ง์ ํ๋ฉด ๋ฉ๋๋ค.
ConnectableObservable์ด๊ธฐ ๋๋ฌธ์ connect ๋ฉ์๋๋ฅผ ํตํด์ ๋ถ๋ฆฌ๊ธฐ ์ ๊น์ง๋ ๊ตฌ๋
์ ์์ ๊ด๊ณ ์์ด ์๋ฌด๊ฒ๋ ๋ฐฉ์ถํ์ง ์์ต๋๋ค.
let bellyClock = Observable<Int>
.interval(.seconds(1), scheduler: MainScheduler.instance)
.take(until: { $0 == 6 }, behavior: .inclusive)
let replay = bellyClock.replay(2) // replayAll()๋ฅผ ์ฌ์ฉํ๊ฒ ๋๋ฉด ๋ชจ๋ ์ด๋ฒคํธ๋ฅผ ๋ฐฉ์ถ
_ = replay.connect()
replay
.subscribe(onNext: { time in
print("๋ฐฐ๊ผฝ ์๊ณ ์ธ๋ฆฐ๋ค!! \(time)")
})
.disposed(by: disposeBag)
replay
.delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
.subscribe(onNext: { time in
print("๋๋ฒ์งธ ๋ฐฐ๊ผฝ ์๊ณ ์ธ๋ฆฐ๋ค!! \(time)")
})
.disposed(by: disposeBag)
/*
๋ฐฐ๊ผฝ ์๊ณ ์ธ๋ฆฐ๋ค!! 0
๋ฐฐ๊ผฝ ์๊ณ ์ธ๋ฆฐ๋ค!! 1
๋ฐฐ๊ผฝ ์๊ณ ์ธ๋ฆฐ๋ค!! 2
๋๋ฒ์งธ ๋ฐฐ๊ผฝ ์๊ณ ์ธ๋ฆฐ๋ค!! 1
๋๋ฒ์งธ ๋ฐฐ๊ผฝ ์๊ณ ์ธ๋ฆฐ๋ค!! 2
๋ฐฐ๊ผฝ ์๊ณ ์ธ๋ฆฐ๋ค!! 3
๋๋ฒ์งธ ๋ฐฐ๊ผฝ ์๊ณ ์ธ๋ฆฐ๋ค!! 3
๋ฐฐ๊ผฝ ์๊ณ ์ธ๋ฆฐ๋ค!! 4
๋๋ฒ์งธ ๋ฐฐ๊ผฝ ์๊ณ ์ธ๋ฆฐ๋ค!! 4
๋ฐฐ๊ผฝ ์๊ณ ์ธ๋ฆฐ๋ค!! 5
๋๋ฒ์งธ ๋ฐฐ๊ผฝ ์๊ณ ์ธ๋ฆฐ๋ค!! 5
๋ฐฐ๊ผฝ ์๊ณ ์ธ๋ฆฐ๋ค!! 6
๋๋ฒ์งธ ๋ฐฐ๊ผฝ ์๊ณ ์ธ๋ฆฐ๋ค!! 6
*/
Swift
๋ณต์ฌ
publish
Observable์ ์ํ์ค๋ฅผ ํ๋์ Subject๋ฅผ ํตํด multicast๋ก ์ด๋ฒคํธ๋ฅผ ์ ๋ฌ
publish๋ multicast์ publishSubject๊ฐ ํฉ์ณ์ง ๋ฐฉ์์
๋๋ค. publish๋ multicast์ publishSubject๋ฅผ ํจ๊ป ์ฌ์ฉํ๋ ๋ฐฉ์์ ํ๋์ ๋ฉ์๋๋ก ํจ์ถ์์ผ์ค๋๋ค.
โข
multicast + publishSubject๋ฅผ ์ฌ์ฉํ๋ ๋ฐฉ์
let bellyClock = Observable<Int>
.interval(.seconds(1), scheduler: MainScheduler.instance)
.take(until: { $0 == 6 }, behavior: .inclusive)
let subject = PublishSubject<Int>()
let multicast = bellyClock.multicast(subject)
_ = multicast.connect()
multicast
.subscribe(onNext: { time in
print("๋ฐฐ๊ผฝ ์๊ณ ์ธ๋ฆฐ๋ค!! \(time)")
})
.disposed(by: disposeBag)
multicast
.delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
.subscribe(onNext: { time in
print("๋๋ฒ์งธ ๋ฐฐ๊ผฝ ์๊ณ ์ธ๋ฆฐ๋ค!! \(time)")
})
.disposed(by: disposeBag)
Swift
๋ณต์ฌ
โข
publish๋ฅผ ์ฌ์ฉํ๋ ๋ฐฉ์
let bellyClock = Observable<Int>
.interval(.seconds(1), scheduler: MainScheduler.instance)
.take(until: { $0 == 6 }, behavior: .inclusive)
let multicast = bellyClock.publish()
_ = multicast.connect()
multicast
.subscribe(onNext: { time in
print("๋ฐฐ๊ผฝ ์๊ณ ์ธ๋ฆฐ๋ค!! \(time)")
})
.disposed(by: disposeBag)
multicast
.delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
.subscribe(onNext: { time in
print("๋๋ฒ์งธ ๋ฐฐ๊ผฝ ์๊ณ ์ธ๋ฆฐ๋ค!! \(time)")
})
.disposed(by: disposeBag)
Swift
๋ณต์ฌ
๋ ๊ฐ์ง ๋ฐฉ์ ๋ชจ๋ ๊ฐ์ Output์ ๋ฐฉ์ถํฉ๋๋ค.
/*
๋ฐฐ๊ผฝ ์๊ณ ์ธ๋ฆฐ๋ค!! 0
๋ฐฐ๊ผฝ ์๊ณ ์ธ๋ฆฐ๋ค!! 1
๋ฐฐ๊ผฝ ์๊ณ ์ธ๋ฆฐ๋ค!! 2
๋ฐฐ๊ผฝ ์๊ณ ์ธ๋ฆฐ๋ค!! 3
๋๋ฒ์งธ ๋ฐฐ๊ผฝ ์๊ณ ์ธ๋ฆฐ๋ค!! 3
๋ฐฐ๊ผฝ ์๊ณ ์ธ๋ฆฐ๋ค!! 4
๋๋ฒ์งธ ๋ฐฐ๊ผฝ ์๊ณ ์ธ๋ฆฐ๋ค!! 4
๋ฐฐ๊ผฝ ์๊ณ ์ธ๋ฆฐ๋ค!! 5
๋๋ฒ์งธ ๋ฐฐ๊ผฝ ์๊ณ ์ธ๋ฆฐ๋ค!! 5
๋ฐฐ๊ผฝ ์๊ณ ์ธ๋ฆฐ๋ค!! 6
๋๋ฒ์งธ ๋ฐฐ๊ผฝ ์๊ณ ์ธ๋ฆฐ๋ค!! 6
*/
Swift
๋ณต์ฌ
buffer, window
์ด๋ฒคํธ๋ฅผ ๋ฌถ์์ผ๋ก ์ ๋ฌ
๋ ์ฐ์ฐ์์ ์ธ์๋ ์์ ํ ๋์ผํ์ง๋ง, ๋ฐํ๊ฐ์ด ๋ค๋ฅด๊ฒ ๋ํ๋ฉ๋๋ค.
func buffer(timeSpan: RxTimeInterval, count: Int, scheduler: SchedulerType)
-> Observable<[Element]>
func window(timeSpan: RxTimeInterval, count: Int, scheduler: SchedulerType)
-> Observable<Observable<Element>>
Swift
๋ณต์ฌ
buffer๋ ๋ฐฐ์ด ํํ๋ก ์ด๋ฒคํธ๋ฅผ ๋ฐฉ์ถํ์ง๋ง, window๋ ๋ค๋ฅธ Observable ํํ๋ก ์ด๋ฒคํธ๋ฅผ ๋ฐฉ์ถํฉ๋๋ค.
buffer์ window๋ ๋ ๊ฐ์ง ์กฐ๊ฑด ์ค 1๊ฐ์ง๊ฐ ๋ง์กฑ๋๊ธฐ ์ ๊น์ง๋ ์ด๋ฒคํธ๋ฅผ ๋ฐฉ์ถํ์ง ์์ต๋๋ค.
โต ์ฒซ ๊ตฌ๋
์์ ํน์ ๊ฐ์ฅ ์ต๊ทผ ์ด๋ฒคํธ ๋ฐ์ ํ๋ก timespan์ผ๋ก ์ง์ ํ ์๊ฐ์ด ์ง๋ฌ์ ๊ฒฝ์ฐ
โถ ์ฒซ ๊ตฌ๋
์์ ํน์ ๊ฐ์ฅ ์ต๊ทผ ์ด๋ฒคํธ ๋ฐ์ ํ๋ก ์๋ณธ Observable์์ count๋งํผ์ ์ด๋ฒคํธ๊ฐ ๋ฐ์ํ ๊ฒฝ์ฐ
ํ๋์ ์กฐ๊ฑด์ ๋ง์กฑํ๊ธฐ ์ ๊น์ง๋ ์๋ณธ Observable์ ์ด๋ฒคํธ๋ ๋ด๋ถ ๋ฒํผ์ ์ ์ฅ๋ฉ๋๋ค. ์ด๋ฒคํธ ๋ฐ์ ์์ ์ด ๋๋ฉด ํด๋น ๋ฒํผ์ ์ด๋ฒคํธ๋ค์ ๋ชจ๋ ๋ด๋ณด๋ด๊ณ ๋ฒํผ๋ฅผ ๋น์ฐ๊ณ ๋ค์ ํ์ด๋จธ(timespan)๋ฅผ ์์ํฉ๋๋ค.
buffer, window๋ฅผ ์ฌ์ฉํ๋ฉด ์ด๋ฒคํธ๋ฅผ ํน์ ํ์๋ ์๊ฐ ๋จ์๋ก ๋ฌถ์ด์ ํ๊บผ๋ฒ์ ์ฒ๋ฆฌํ ์ ์๊ฒ ๋ฉ๋๋ค.
โข
buffer
let timer1 = Observable<Int>
.interval(.seconds(1), scheduler: MainScheduler.instance)
.take(until: { $0 == 5 }, behavior: .inclusive)
.map({ "์ธ๊ณ ์๋ค \($0)" })
timer1
.buffer(timeSpan: .seconds(3), count: 2, scheduler: MainScheduler.instance)
.subscribe { event in
switch event {
case let .next(value):
print(value)
default:
print("finished")
}
}.disposed(by: disposeBag)
/*
["์ธ๊ณ ์๋ค 0", "์ธ๊ณ ์๋ค 1"]
["์ธ๊ณ ์๋ค 2", "์ธ๊ณ ์๋ค 3"]
["์ธ๊ณ ์๋ค 4", "์ธ๊ณ ์๋ค 5"]
[]
finished
*/
Swift
๋ณต์ฌ
โข
window
let timer1 = Observable<Int>
.interval(.seconds(1), scheduler: MainScheduler.instance)
.take(until: { $0 == 5 }, behavior: .inclusive)
.map({ "์ธ๊ณ ์๋ค \($0)" })
timer1
.window(timeSpan: .seconds(3), count: 2, scheduler: MainScheduler.instance)
.subscribe { [weak self] event in
switch event {
case let .next(observable):
observable
.subscribe { e in
switch e {
case let .next(value):
print(value)
default:
print("inner finished")
}
}
.disposed(by: self?.disposeBag ?? DisposeBag())
default:
print("finished")
}
}.disposed(by: disposeBag)
/*
์ธ๊ณ ์๋ค 0
์ธ๊ณ ์๋ค 1
inner finished
์ธ๊ณ ์๋ค 2
์ธ๊ณ ์๋ค 3
inner finished
์ธ๊ณ ์๋ค 4
์ธ๊ณ ์๋ค 5
inner finished
inner finished
finished
*/
Swift
๋ณต์ฌ