| 
  • If you are citizen of an European Union member nation, you may not use this service unless you are at least 16 years old.

  • You already know Dokkio is an AI-powered assistant to organize & manage your digital files & messages. Very soon, Dokkio will support Outlook as well as One Drive. Check it out today!

View
 

Rx Extensions

Page history last edited by MrJavaGuy 12 years, 10 months ago Saved with comment

Links

 

Notes 

  • What is it? 
    • LINQ: {1, 2, 3, 4, } .Select(x => x * 5) <- great when you have all the elements
    • RX Lets you do this with events where you don't know the future 
  • Trick: LINQ Pad understands IObservables. 
  • IObservable<T>.First() <- blocking call
  • You can't tell if an Observable is done, but you can OnCompleted, OnNext, OnError 
  • The only thing you can do to an observable is subscribe to it
  • Threads
    • You don't specify where your code's going to execute
    • ..unless you need to, then you use a scheduler (.ObserveOn(Scheduler._)).
    • immediate scheduler by default, but depends on operator
    • vs. TPL: TPL is more specific to the idea of a threadpool and running on threads. Rx isn't, so much. 
    • [ThreadStatic] on windows phone doesn't work, so there is a bug in Rx on the windows phone 
  • Operators 
    • Subscribe <- gives an IDisposable you can use to cancel the subscription
    • Aggregate <- special IObservable aggregate, yields another IObservable
    • Do <- run for it's side effect
    • DoWhile
    • Defer <- takes a hot observable and makes it cold
    • Repeat <- doesn't work with hot observables, so you defer
    • WhenAny(x => x.Foo, y => y.Bar, (foo, bar) => {}).SelectMany(WebService).ToProperty(this, x => x.Baz) 
    • Switch <- only listen to the latest thing that was started
      • depends on implementation, but usually, if something is not being listened to, it tries to cancel itself as quickly as possible 
  • IQObservable
    • IQueryable gives you the whole expression
    • IQObservable gives you the observable. Ex.: WMI "tell me when the battery is < 20%", Twitter API "give me @foo" 
  • Hot vs. Cold
    • Hot <- happening whether you listen or not
      • Rx wraps this so you can get events even if you hadn't subscribed yet 
    • Cold
      • gives you a new result each time you subscribe
      • lazy
      • executes when you enumerate 
  • Retries and Timeouts
    • .Retry(3) <- tries again 3 times if it fails
    • .Timeout(3.Seconds()).Retry(3) <- retries 3 times with a 3 second timeout
  • Abort Semantics
    • When something fails, it bails out early (it's lazy) 

Additional Notes:

RX.NET

 

Linq is good if you have the data ahead of time (i.e. compiler)

 

[1,2,3].Select(x => x* 5)  >>> [5,10,15]

 

RX is good if you do not know the data yet (i.e. stock ticker).

 

[Series of mouse events].Where(m =>InsideWindow(m))

 

Example

 

Iobservable

OnNext(T value)

OnCompleted()

OnError(Exception)

 

Write an Async Functions

 

Download a web page as a string

 

Iobservable<string> DownloadWebSite(string uri);

 

DownloadWebSite("…").First();

 

First in RX is blocking

 

String[] sites = { "http://foo", "http://bar","http://baz" }

 

Subscribe will trigger when each download happens

Sites.ToObservable().SelectMany(site => site.DownloadWebSite(site)).Subscribe(siteString => console.WriteLine(siteString ));

 

Subscribe will trigger when all downloads are done (Aggregate)

Sites.ToObservable().SelectMany(site => site.DownloadWebSite(site)).Aggregate(new List<string>(), (acc, x) => acc.Add(x)).Subscribe();

 

Hot and Cold Observable

 

Hot are things like  mouse move (they happen if you subscribe or not).

 

Cold are things like the code above where the download does not happen until subscribed.

 

AsyncSubject - solve the race condition if you subscribe after the "event" is done.

 

Finally is a function that is called when done.

 

Retry(X) will retry if event of fails

 

Timeout()

 

DoWhile() continue retry until some event.

 

Repeat() continual re-subscribe.

 

Defer() for hot observable allow data to be made cold.

 

Ischeduler is where things happen (example TaskPoolScheduler), what context.

 

ObserveOn(Scheduler.Dispatcher) - guaranteed on the UI thread.

 

Reactive UI (MVVM OSS project).

 

Example:

 

someObj.WhenAny(x => x.foo, x=> x.bar, (foo, barr) => {}).ToProperty(this, x => x.Baz);

 

Color picker (Red, Green, Blue)

 

someObj.WhenAny(x => x.foo, x=> x.bar, (foo, barr) => {}).ToProperty(this, x => x.Baz).SelectMany(WebService).ToProperty());

 

Cancellation:

Unsubscribe will stop listening.

 

.SelectMany( x=> webService).Switch()

 

 ------------------------------>

   ------->

      ---------------------------------->

             --------->

 

Switch select latest thing.

 

If you write an Observable, must have an action when it is disposed (could do nothing).

 

RX was written by Hascal programmers.

 

Subject is an observable that you control.

 

Observable.Create((subj) => {

Subj.OnNext(1);

Subj.OnNext(2);

Subj.OnNext(3);

Subj.OnComplete();

Return() => subj;

});

 

Func<IObserver<T>, Action>

 

Simpler

 

Subject<int> foo;

foo.Subscribe(Console.WriteLine);

foo.OnNext(3);

….

foo.OnComplete();

 

LinqPad can understand RX (copy binaries into).

  

Comments (0)

You don't have permission to comment on this page.