티스토리 뷰

UWP & Windows App

Reactive Extension part2

kaki104 2014. 8. 25. 23:56
반응형

Reactive Extension part2

 

시작이 반이라는 말이 있다. 이는 로또를 사면 50%의 확율이 생기는 것과 같은 이치이다! 로또를 사면 '된다'와 '않된다'의 기대값을 가질 수 있지만, 로또를 사지 않으면 무조건 않된다. 그렇치 않은가?

 

금주 앱 개발 모임에 참석 했을 때 여러분들이 Rx에 대한 관심을 보여주었다. 하하하 감사드린다. 그런데, 공통적인 의견은 포스트를 봐도 무슨 소리인지 감을 잡을 수 없다는 내용이... 쿨럭.. 쿨럭..^^;;; 

 

Rx에 관심을 가지시는 모든 분들께 최소한 감은 잡을 수 있도록 포스트를 쓰도록 하겠다. 음..물론 모든 포스트를 읽으셔야 하고, 그 포스트가 100개가 될지 모른 다는 것이 함정이지만...

 

"함정 팔꺼냐?"

 

"설마;;"

 

"함정 파라..않그러면 내가 너 무덤을 파주마 푸하하"

 

"컹.........."

 

 

1. Rx HOL .NET.pdf 다운로드

Reactive Extensions for .NET Resources and Community

http://msdn.microsoft.com/en-US/data/gg577612 

 

페이지로 이동해서 좌측 중간에 있는 Hands-on Lab Reactive Extensions for .NET 링크를 눌러서 Rx HOL .NET.pdf 파일을 다운로드 받도록 한다. 처음 Rx를 접하는 개발자들에게 좋은 내용을 담고 있어서, pdf 내용을 중심으로 설명을 하려고 한다.

 

구글의 도움으로 번역을 하고, 예제로 사용할 소스 작성하고 테스트도 하고, 그 내용을 기준으로 포스트를 작성하느랴 포스트 하나 작성하는데 일주일 정도의 시간이 걸린 것 같다. 완벽한 번역을 하는 포스트가 아닌, 핵심 내용을 설명하는 포스트이니, 혹시 번역이나 의역에 오류가 있으면 리플로 남겨 주기 바란다.

 

* pdf에서는 VisualStudio 2010을 기준으로 설명하고 있는데, 포스트에서는 VisualStudio 2013을 기준으로 설명한다.

 

* 솔루션은 part1에서 사용한 솔루션에 프로젝트를 추가해서 진행하기 바라는 마음으로 일부러 아래에서 사용한 소스를 첨부하지 않았다.

 

 

2. 기본 인터페이스

 

    public interface IObservable<out T>
    {
        IDisposable Subscribe(IObserver<T> observer);
    }

 

    public interface IObserver<in T>
    {
        void OnCompleted();
        void OnError(Exception error);
        void OnNext(T value);
    }

 

Rx의 기본이되는 Observable는 IObervable, IObserver 2개가 기본 인터페이스로 구성이 되며, IObservable은 data source를 담당하고, IOberver는 data 수신을 담당한다. 중요한 부분을 정리해 보면 아래와 같다.

 

2-1. 하나의 IObservable에 여러개의 IObserver가 존재할 수 있다. Subscribe를 하는 만큼 생기기 때문인데, Subscribe은 언제 어디서든 가능하다. Subscribe이 되어있다면, Notification이 발생할 때 마다 수신 할 수 있다.

 

2-2. 구독해제는 Subsribe메소드의 결과값인 IDisposable 객체를 Dispose()하면 된다. 

 

2-3. IObserver가 수신할 수 있는 Notification은 OnNext, OnError, OnCompleted 이렇게 3가지 이며, OnNext는 0~무한대로 수신이 가능하고, OnCompleted는 마지막에 한번, OnError는 오류가 발생했을 때 한번 수신된다.

 

2-4. IObservable는 한번에 하나의 data를 IObserver로 push하는데 이러한 과정을 옵저버블 스퀜스(Observable sequences)라고 한다.

 

 

 

3. Creating observable sequences

 

Rx의 기초를 이해하기 위해 pdf에 있는 기본적인 예제들을 따라 해보도록 하겠다. 우선 솔루션에 Console project를 추가하고, 프로젝트에 Rx를 추가해 준다.

 

3-1. Nuget package에서 Rx로 검색후 선택하기 

 

3-2. 솔루션에 Rx를 이미 설치해 놓았다면, Reactive Extension - Main Library를 선택하고 Manage 버튼을 클릭  

 

3-3. Select Projects창에서 새로 추가한 프로젝트를 선택하고 OK 클릭

 

3-4. 새로 추가한 프로젝트를 시작 프로젝트로 설정

 

 

처음 입력할 코딩은 아래와 같다.

 

        static void Main(string[] args)
        {

            //이 부분을 변경하면서 OnNext, (OnCompleted, OnError)가 어떻게 발생하는지 확인한다.
            IObservable<int> source = Observable.Empty<int>();

 

            IDisposable subscription = source.Subscribe(
                x => Console.WriteLine("OnNext: {0}", x),
                ex => Console.WriteLine("OnError : {0}", ex.Message),
                () => Console.WriteLine("OnCompleted")
                );

            Console.WriteLine("Press ENTER to unsubscribe...");
            Console.ReadLine();

            subscription.Dispose();
        }

 

 

IObservable<int> source = Observable.Empty<int>();

 

source를 구성하는 IObservable을 빈 값으로 채우고 f5키를 눌러 실행해보면 다음과 같은 결과를 볼 수 있다.

 

 

source에 아무런 데이터가 없으니 바로 OnCompleted가 실행되었다. 데이터가 존재 했다면, OnNext xxx라는 글씨가 나온 후에 OnComplete가 출력되었을 것이다.

 

여기서 한가지 의문이 생기는데, 지난 part1에서는 마우스의 이벤트를 기다리기 때문에 완료가 되지 않았는데, 왜 여기서는 완료가 되어 버리는 것일까?

 

위의 예제에서 사용한 것은 데이터가 확정된 IObservable 객체를 사용한 것으로 cool observable이라고 한다. 어떤 데이터의 구성이 이미 정해져있기 때문에 바로 하나하나 끄집어 내서 OnNext를 실행하게 되는데, 여기서는 데이터가 없기 때문에 OnNext도 한번 실행하지 않고, 바로 OnCompleted를 실행하게 된 것이다.

 

part1에서 사용했던 pointer move event를 관찰하는 IObservable객체를 hot observable이라고 하는데, 어떤 데이터가 입력될지 정해진 것이 없으니 계속 관찰만 하고 있다가, 데이터가 입력이 되는데로 OnNext를 계속 실행하게 되는 것이다.

 

 

IObservable<int> source = Observable.Return(42);

 

위의 문장으로 교체를 하면 아래와 같은 결과가 나온다.

 

 

1개의 데이터를 가지고 있기 때문에 실행을 하자마자 OnNext로 42라는 숫자를 보내고, 또 다른 데이터를 찾아보지만 찾지 못해서 OnCompleted를 호출하고 완료되는 것을 알 수 있다. 가장 평범한 모습을 보여준 것이라 할 수 있다.

 

이제 소스를 아래와 같이 수정 한 후 실행해보자

 

            IObservable<int> source = Observable.Range(5, 3);

 

            IDisposable subscription = source.Subscribe(
                x => Console.WriteLine("OnNext : {0}", x),
                ex => Console.WriteLine("OnError : {0}", ex.Message),
                () => Console.WriteLine("OnCompleted")
                );

            Console.WriteLine("Press ENTER to unsubscribe...");
            Console.ReadLine();

            subscription.Dispose();

 

            //추가 테스트
            IDisposable subscription2 = source.Subscribe(
            x => Console.WriteLine("OnNext2 : {0}", x),
            ex => Console.WriteLine("OnError2 : {0}", ex.Message),
            () => Console.WriteLine("OnCompleted2")
            );

            Console.WriteLine("Press ENTER to unsubscribe2...");
            Console.ReadLine();

            subscription2.Dispose();

 

 

Range는 cood observable로서 Range(5,3)은 5부터 3개의 데이터를 생성한 IObservable객체를 생성한다.

 

위의 예제는 첫 Subscribe에 의해 5, 6, 7이 출력이 되고, Enter키를 눌러서 subscription을 Dispose시킨 후 두번째 Subscribe에 어떠한 반응을 보이는 지 확인하기 위해서 만들어졌다. 옵저버블 스퀜스라고 해서, 한번 출력되고 나서, 리셋이 않되는 것 아닌가 하고 테스트를 해보았는데, 잘된다.

 

* pdf 9page를 보면 Run메소드라는 것을 사용하는 예제가 있는데, 지금은 존재하지 않는 메소드이다.

 

마지막은 디버깅에 관한 내용으로 아래의 소스를 입력해보자.

 

        private static void Test2()
        {
            IObservable<int> source = Observable.Generate(0,
                i => i < 5,
                i => i + 1,
                i => i * i,
                i => TimeSpan.FromSeconds(i));

            using (
                source.Subscribe(
                    x => Console.WriteLine("OnNext : {0}", x),
                    ex => Console.WriteLine("OnError : {0}", ex.Message),
                    () => Console.WriteLine("OnCompleted")))
            {
                Console.WriteLine("Press ENTER to unsubscribe...");
                Console.ReadLine();
            }
        } 

 

* pdf에서는 GenerateWithTime이라는 메소드를 사용하는데, 이 메소드는 Generate메소드에 통합되었다.

 

Generate는 조건을 주어서 데이터를 생성시키는 메소드로 여기서 조건은 시작값 0에서 부터 5보다 작을 때까지 1씩 증가하며, 결과는 i * i를 한 데이터를 만들어 낸다. 그런데 마지막에 TimeSpan.FromSeconds에 의해서 각 각의 데이터를 만들어 내는 시간이 서로 다르다. 0초, 1초, 2초, 3초, 4초의 간격으로 생성된다

 

.

그런데, 이전 예제에서는 Subscribe가 되면 바로 데이터를 OnNext를 출력하고 모든 데이터를 출력 후에 OnComplete를 호출하고 종료된 후에 Console.WriteLine 문장이 실행이 되는 동기(synchronous) 예제 였다. 그러나 이번 예제는 약간 다른 결과 메시지가 출력되는데, 이는 Subscribe가 비동기(asynchronous)에서 동작하도록 만든 것이기 때문이다.

 

Console.WriteLine("OnNext : {0}", x)

 

이 위치에 커서를 이동 시키고 f9를 눌러서 breake point를 설정한 후 f5키를 눌러서 실행한다.

 

 

결과를 보면 using으로 감싼 블럭 내부에 있는 Console.WriteLine이 먼저 실행이 되고, 브레이크 포인트에 멈추어 진것을 볼 수 있는데, 좀더 자세히 보기 위해 Thread 창을 열어서 보기로 한다.

 

 

Category에 Main Thread가 보이고, 바로 아래 Worker Thread에 깃발이 보이는데, 메인 스레드는 Console.ReadLine()에 멈추어 있고, 워커 스레드가 Console.WriteLine("OnNext : {0}", x) 부분에 멈추어져 있다. pdf에서는 여기서 한발짝 더 들어가서 Location을 확장해서 보고 있는데, 그렇게 하기 위해서는 Debug -> Option and Settings -> Enable My Code의 체크를 풀어주면, 내 코드가 아닌 부분에 대해서도 확인이 가능하다.

 

 

위와 같이 변경 한 후 OK를 눌러서 창을 닫은 후 Location을 확장해서 보면

 

6012 12 Worker Thread Worker Thread RxSample.Console1.exe!RxSample.Console1.Program.Test2.AnonymousMethod__4 Normal
RxSample.Console1.exe!RxSample.Console1.Program.Test2.AnonymousMethod__4(int x) Line 28 
System.Reactive.Core.dll!System.Reactive.AnonymousSafeObserver<int>.OnNext(int value) 
System.Reactive.Linq.dll!System.Reactive.Linq.ObservableImpl.Generate<int,int>.Delta.InvokeRec(System.Reactive.Concurrency.IScheduler self, int state) 
System.Reactive.Core.dll!System.Reactive.Concurrency.DefaultScheduler.Schedule<int>.AnonymousMethod__0(object _) 
System.Reactive.PlatformServices.dll!System.Reactive.Concurrency.ConcurrencyAbstractionLayerImpl.QueueUserWorkItem.AnonymousMethod__0(object _) 
mscorlib.dll!System.Threading.QueueUserWorkItemCallback.WaitCallback_Context(object state) 
mscorlib.dll!System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state, bool preserveSyncCtx) 
mscorlib.dll!System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext executionContext, System.Threading.ContextCallback callback, object state, bool preserveSyncCtx) 
mscorlib.dll!System.Threading.QueueUserWorkItemCallback.System.Threading.IThreadPoolWorkItem.ExecuteWorkItem()  
mscorlib.dll!System.Threading.ThreadPoolWorkQueue.Dispatch() 
mscorlib.dll!System.Threading._ThreadPoolWaitCallback.PerformWaitCallback() 

 

위의 Location 리스트를 보면, OnNext, Generate, Concurrency Schedule, ExecutionContext.Run, ThreadPoolWorkQueue.Dispatch 등이 차례대로 수행되면서, 워크 스레드와 메인 스레드간에 발생할 수 있는 문제들을 자동으로 해결하면서, 비동기 처리를 하게 된다.

 

기초적인 사용 방법에 대해서 살펴 보았다. 포스트에서 다루지 않은 몇가지는 직접 코딩을해서 테스트를 진행하도록 하고, 너무 쉽다고 대충 보고 넘기지 말고, 하나하나를 음미 하면서 익혀 두기 바란다. 다음에는 이벤트를 이용하는 방법에 대해서 알아보도록 하겠다.

 

"엇 벌써 끝내는 거냐?"

 

"ㅇㅇ"

 

"너무 짧은것 같아!"

 

"짧고 굵게!"

 

"짧고 굵게 맞을래?"

 

"그래도 이거 쓴다고 일주일 고생했단 말야~"

 

"그래서?"

 

"다음에는 더 잘 쓴다고.....ㅜㅜ"

 

 

 

 

 

 

 

 

반응형

'UWP & Windows App' 카테고리의 다른 글

ReactiveUI part1  (0) 2014.09.29
Reactive Extension part5  (2) 2014.09.19
Reactive Extension part4  (0) 2014.09.14
Reactive Extension part 3  (2) 2014.09.13
Reactive Extension part1  (0) 2014.08.24
댓글