Rx Extensions

Last edited by MrJavaGuy 9 years, 6 months ago




  • What is it? 
    • LINQ: {1, 2, 3, 4}.Select(x => x * 5) <- great when you have all the elements
    • Rx lets you do this with events, where you don't know the future 
  • Trick: LINQPad understands IObservables. 
  • IObservable<T>.First() <- blocking call
  • You can't ask an Observable whether it is done; you find out through the OnNext, OnError, and OnCompleted callbacks 
  • The only thing you can do to an observable is subscribe to it
  • Threads
    • You don't specify where your code's going to execute
    • ..unless you need to, then you use a scheduler (.ObserveOn(Scheduler._)).
    • immediate scheduler by default, but depends on operator
    • vs. TPL: TPL is more specific to the idea of a threadpool and running on threads. Rx isn't, so much. 
    • [ThreadStatic] doesn't work on Windows Phone, so there is a bug in Rx on Windows Phone 
  • Operators 
    • Subscribe <- gives an IDisposable you can use to cancel the subscription
    • Aggregate <- special IObservable aggregate, yields another IObservable
    • Do <- run for its side effect
    • DoWhile
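    • Defer <- wraps a factory that runs on every subscription, so a hot observable can be made to behave cold
    • Repeat <- re-subscribes when the source completes; doesn't work with hot observables, so you Defer first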
    • WhenAny(x => x.Foo, y => y.Bar, (foo, bar) => {}).SelectMany(WebService).ToProperty(this, x => x.Baz) 
    • Switch <- only listen to the latest thing that was started
      • depends on implementation, but usually, if something is not being listened to, it tries to cancel itself as quickly as possible 
  • IQbservable
    • IQueryable gives you the whole expression
    • IQbservable gives you the same for observables, so the query can run at the source. Ex.: WMI "tell me when the battery is < 20%", Twitter API "give me @foo" 
  • Hot vs. Cold
    • Hot <- happening whether you listen or not
      • Rx wraps this so you can get events even if you hadn't subscribed yet 
    • Cold
      • gives you a new result each time you subscribe
      • lazy
      • executes when you subscribe (like an IEnumerable executes when you enumerate) 
  • Retries and Timeouts
    • .Retry(3) <- re-subscribes if it fails; in Rx.NET the 3 counts total attempts, not extra retries
    • .Timeout(3.Seconds()).Retry(3) <- up to 3 attempts, each with a 3 second timeout
  • Abort Semantics
    • When something fails, it bails out early (it's lazy) 
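The Timeout/Retry bullets can be sketched as follows. This assumes the System.Reactive NuGet package; `TimeoutRetryDemo` and `neverAnswers` are hypothetical names, `Observable.Never` stands in for a service that never responds, and the timeout is shortened to 50 ms (instead of the 3 seconds in the notes) so the sketch finishes quickly.

```csharp
using System;
using System.Reactive.Linq;

public static class TimeoutRetryDemo
{
    public static (int attempts, bool timedOut) Run()
    {
        var attempts = 0;

        // Defer runs this factory on every (re)subscription, so we can count attempts.
        var neverAnswers = Observable.Defer<string>(() =>
        {
            attempts++;
            return Observable.Never<string>();   // stand-in for a service that never responds
        });

        var timedOut = false;
        try
        {
            neverAnswers.Timeout(TimeSpan.FromMilliseconds(50))
                        .Retry(3)
                        .Wait();                 // blocks; rethrows the final error
        }
        catch (TimeoutException)
        {
            timedOut = true;                     // every attempt timed out
        }
        return (attempts, timedOut);
    }
}
```

Calling `TimeoutRetryDemo.Run()` blocks for roughly three timeout periods, because each attempt has to time out before the next subscription starts.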

Additional Notes:



LINQ is good if you have the data ahead of time (e.g. a compiler).


[1,2,3].Select(x => x * 5)  >>> [5,10,15]


Rx is good if you do not know the data yet (e.g. a stock ticker).


[Series of mouse events].Where(m => InsideWindow(m))
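A minimal sketch of the two styles side by side, assuming the System.Reactive NuGet package. The `Subject<int>` is only a stand-in for a real mouse event stream, and `InsideWindow` here is a hypothetical hit test on an x coordinate, not an API from the talk.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class LinqVsRx
{
    // LINQ: the whole collection exists before the query runs (pull).
    public static int[] Pull() => new[] { 1, 2, 3 }.Select(x => x * 5).ToArray();

    // Hypothetical hit test: x coordinates 0..99 count as "inside the window".
    static bool InsideWindow(int x) => x >= 0 && x < 100;

    // Rx: the same style of query, but values are pushed as they occur.
    public static List<int> Push()
    {
        var seen = new List<int>();
        var mouseX = new Subject<int>();   // stand-in for a mouse event stream

        using (mouseX.Where(x => InsideWindow(x)).Subscribe(x => seen.Add(x)))
        {
            mouseX.OnNext(10);
            mouseX.OnNext(250);            // filtered out by Where
            mouseX.OnNext(42);
        }
        return seen;
    }
}
```

The query in `Push` is declared before any value exists, which is the "don't know the future" case from the notes.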





OnNext(T value)




Writing an async function


Download a web page as a string


IObservable<string> DownloadWebSite(string uri);




First() in Rx is blocking.


string[] sites = { "http://foo", "http://bar", "http://baz" };


Subscribe will trigger when each download happens

sites.ToObservable().SelectMany(site => DownloadWebSite(site)).Subscribe(siteString => Console.WriteLine(siteString));


Subscribe will trigger when all downloads are done (Aggregate)

sites.ToObservable().SelectMany(site => DownloadWebSite(site)).Aggregate(new List<string>(), (acc, x) => { acc.Add(x); return acc; }).Subscribe();
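The two subscriptions above can be tried without a network, assuming the System.Reactive NuGet package. In this sketch `DownloadWebSite` is a hypothetical fake that returns a canned page body immediately; a real implementation would be asynchronous.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class Downloads
{
    // Hypothetical stand-in for a real download: no network, just a fake page body.
    static IObservable<string> DownloadWebSite(string uri) =>
        Observable.Return("<html>" + uri + "</html>");

    static readonly string[] Sites = { "http://foo", "http://bar", "http://baz" };

    // OnNext fires once per finished download.
    public static List<string> PerDownload()
    {
        var pages = new List<string>();
        Sites.ToObservable()
             .SelectMany(site => DownloadWebSite(site))
             .Subscribe(page => pages.Add(page));
        return pages;
    }

    // With Aggregate, OnNext fires once, after every download has completed.
    public static int NotificationsWhenAggregated()
    {
        var notifications = 0;
        Sites.ToObservable()
             .SelectMany(site => DownloadWebSite(site))
             .Aggregate(new List<string>(), (acc, page) => { acc.Add(page); return acc; })
             .Subscribe(all => notifications++);
        return notifications;
    }
}
```

Note that the accumulator lambda has to return the list; `acc.Add(page)` on its own returns void and would not compile as an Aggregate function.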


Hot and Cold Observable


Hot observables are things like mouse moves (they happen whether you subscribe or not).


Cold observables are things like the code above, where the download does not happen until you subscribe.
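One way to see the difference, sketched under the assumption that the System.Reactive NuGet package is available. `HotVsCold` is a hypothetical demo class; the cold source is built with `Observable.Create` and the hot one is a plain `Subject<int>`.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class HotVsCold
{
    // Cold: the work inside Create runs once per subscription.
    public static int ColdRuns()
    {
        var runs = 0;
        var cold = Observable.Create<int>(observer =>
        {
            runs++;                    // happens only when someone subscribes
            observer.OnNext(runs);
            observer.OnCompleted();
            return () => { };          // nothing to clean up
        });

        cold.Subscribe(_ => { });
        cold.Subscribe(_ => { });
        return runs;                   // one run per Subscribe call
    }

    // Hot: values are produced regardless; a late subscriber misses earlier ones.
    public static List<int> HotLateSubscriber()
    {
        var seen = new List<int>();
        var hot = new Subject<int>();
        hot.OnNext(1);                 // nobody is listening yet
        hot.Subscribe(x => seen.Add(x));
        hot.OnNext(2);
        return seen;
    }
}
```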


AsyncSubject - solves the race condition where you subscribe after the "event" is done: it replays the final value to late subscribers.
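A small sketch of that race, assuming the System.Reactive NuGet package. The string value is made up; the point is only that the subscriber arrives after completion and still receives the result.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class AsyncSubjectDemo
{
    public static List<string> LateSubscriber()
    {
        var seen = new List<string>();
        var result = new AsyncSubject<string>();

        result.OnNext("page contents");
        result.OnCompleted();                  // the "event" is already over

        result.Subscribe(x => seen.Add(x));    // late subscriber still gets the final value
        return seen;
    }
}
```

With a plain `Subject<string>` in place of the `AsyncSubject<string>`, the late subscriber would only see the completion and `seen` would stay empty.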


Finally() takes a function that is called when the sequence is done.


Retry(x) will retry if the sequence fails.
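Finally and Retry can be combined as in the sketch below, assuming the System.Reactive NuGet package. The `flaky` source is hypothetical: it fails on its first two subscriptions and succeeds on the third.

```csharp
using System;
using System.Reactive.Linq;

public static class RetryDemo
{
    public static (int attempts, string result, bool cleanedUp) Run()
    {
        var attempts = 0;
        var cleanedUp = false;
        var result = "";

        // Defer re-runs the factory on each subscription, so Retry triggers a fresh attempt.
        var flaky = Observable.Defer<string>(() =>
        {
            attempts++;
            return attempts < 3
                ? Observable.Throw<string>(new InvalidOperationException("transient"))
                : Observable.Return("ok");
        });

        flaky.Retry(3)                          // up to 3 subscriptions in total
             .Finally(() => cleanedUp = true)   // runs on completion, error, or disposal
             .Subscribe(x => result = x, ex => result = "failed");

        return (attempts, result, cleanedUp);
    }
}
```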




DoWhile() keeps repeating the sequence while a condition holds.


Repeat() continually re-subscribes.


Defer() lets a hot observable be made cold.
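A sketch of what Defer changes, assuming the System.Reactive NuGet package. `NextId` is a hypothetical side-effecting function; without Defer it runs once when the observable is built, with Defer it runs again for every subscriber.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class DeferDemo
{
    public static (List<int> eager, List<int> deferred) Run()
    {
        var counter = 0;
        int NextId() => ++counter;               // hypothetical side-effecting work

        var eagerSeen = new List<int>();
        var eager = Observable.Return(NextId()); // NextId() runs once, right here
        eager.Subscribe(x => eagerSeen.Add(x));
        eager.Subscribe(x => eagerSeen.Add(x));  // same value again

        var deferredSeen = new List<int>();
        var deferred = Observable.Defer<int>(() => Observable.Return(NextId()));
        deferred.Subscribe(x => deferredSeen.Add(x));   // factory runs now
        deferred.Subscribe(x => deferredSeen.Add(x));   // and again for this subscriber

        return (eagerSeen, deferredSeen);
    }
}
```

This is also why Repeat and Retry are usually paired with Defer: re-subscribing only repeats the work if subscribing is what triggers it.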


IScheduler is where things happen (for example TaskPoolScheduler); it is the execution context.


ObserveOn(Scheduler.Dispatcher) - guaranteed to run on the UI thread.
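There is no UI thread in a console program, so the sketch below uses an `EventLoopScheduler` (a dedicated background thread) as a stand-in for the dispatcher scheduler; the idea of "ObserveOn decides where the callbacks run" is the same. It assumes the System.Reactive NuGet package, and `ObserveOnDemo` is a hypothetical name.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

public static class ObserveOnDemo
{
    public static bool CallbackRanOnOtherThread()
    {
        var callerThread = Thread.CurrentThread.ManagedThreadId;
        var callbackThread = callerThread;

        using (var loop = new EventLoopScheduler())   // owns one background thread
        {
            Observable.Return(42)
                      .ObserveOn(loop)                // deliver notifications on the loop thread
                      .Do(_ => callbackThread = Thread.CurrentThread.ManagedThreadId)
                      .Wait();                        // block the caller until completion
        }
        return callbackThread != callerThread;
    }
}
```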


ReactiveUI (MVVM OSS project).




someObj.WhenAny(x => x.foo, x => x.bar, (foo, bar) => {}).ToProperty(this, x => x.Baz);


Color picker (Red, Green, Blue)


someObj.WhenAny(x => x.foo, x => x.bar, (foo, bar) => {}).SelectMany(WebService).ToProperty(this, x => x.Baz);



Unsubscribing stops listening.


.Select(x => webService).Switch()







Switch selects the latest thing.


If you write your own Observable, you must supply an action to run when the subscription is disposed (it can do nothing).
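A sketch of Switch dropping a stale response, assuming the System.Reactive NuGet package. `WebService` is a hypothetical fake that hands back one pending `Subject<string>` per query, so the test can decide when each "response" arrives.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SwitchDemo
{
    public static List<string> Run()
    {
        var seen = new List<string>();
        var queries = new Subject<string>();
        var pending = new Dictionary<string, Subject<string>>();

        // Hypothetical web service: one pending response stream per query.
        IObservable<string> WebService(string query)
        {
            var response = new Subject<string>();
            pending[query] = response;
            return response;
        }

        queries.Select(q => WebService(q))   // IObservable<IObservable<string>>
               .Switch()                     // listen only to the newest inner stream
               .Subscribe(x => seen.Add(x));

        queries.OnNext("a");
        queries.OnNext("ab");                         // supersedes "a"
        pending["a"].OnNext("results for a");         // ignored: Switch already moved on
        pending["ab"].OnNext("results for ab");
        return seen;
    }
}
```

Switch works on a sequence of sequences, which is why the sketch projects with Select rather than SelectMany.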


Rx was written by Haskell programmers.


Subject is an observable that you control.


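Observable.Create<T>(subj => {
    // push values with subj.OnNext(...)
    return () => { };  // action run when the subscription is disposed
});

The function passed to Observable.Create is a Func<IObserver<T>, Action>.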
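To see the dispose action from a hand-written observable actually run, here is a minimal sketch assuming the System.Reactive NuGet package. `CreateDemo` is a hypothetical name, and the source deliberately never completes so that the explicit `Dispose()` call is what ends the subscription.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class CreateDemo
{
    public static (List<int> seen, bool disposed) Run()
    {
        var seen = new List<int>();
        var disposed = false;

        // The lambda is a Func<IObserver<int>, Action>: push values, return the cleanup action.
        var source = Observable.Create<int>(observer =>
        {
            observer.OnNext(1);
            observer.OnNext(2);
            return () => { disposed = true; };   // runs when the subscription is disposed
        });

        var subscription = source.Subscribe(x => seen.Add(x));
        subscription.Dispose();                  // unsubscribe: triggers the action above
        return (seen, disposed);
    }
}
```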




Subject<int> foo;






LINQPad can understand Rx (copy the Rx binaries in).

