Kefir.js

Kefir is an FRP (functional reactive programming) library for JavaScript, inspired by Bacon.js and RxJS, with a focus on high performance and low memory consumption.

Kefir has a GitHub repository, where you can send pull requests, report bugs, and have fun reading the source code.

If you spot a typo or grammar error, or know how to improve this documentation, please help the project by filing an issue or sending a pull request.

Downloads (0.2.6)

Main file: kefir.js (~30 KB), kefir.min.js (~5 KB when gzipped)
jQuery addon: kefir-jquery.js (~1 KB), kefir-jquery.min.js (< 1 KB)
All files: kefir-0.2.6.zip (all files, including documentation, demos, tests, source maps, etc.)

You can also get the edge version from GitHub (.zip), but use it at your own risk.

NPM and Bower

npm install kefir
bower install kefir

Demos and examples

Almost any code snippet below can be run in the browser console on this page, so you can play with Kefir right now: just open the browser console.

Create a stream

emitter
Kefir.emitter()
Creates an emitter, which is an ordinary stream with two additional methods: .emit(value) and .end(). You can then send events to the stream via .emit().

var emitter = Kefir.emitter();
emitter.log(); // log events to console (see log)
emitter.emit(1);
emitter.emit(2);
emitter.end();
> [emitter] <value> 1
> [emitter] <value> 2
> [emitter] <end>

The emitter is the easiest way to create a general-purpose stream, but it doesn't give you control over the stream's active state (see active state). If you want to create a general-purpose stream and control its active state, use fromBinder.

never
Kefir.never()
Creates a stream that has already ended and will never produce any events.

var stream = Kefir.never();
stream.log();
> [never] <end:current>

later
Kefir.later(wait, value)
Creates a stream that produces a single value after wait milliseconds, then ends.

var stream = Kefir.later(1000, 1);
stream.log();
> [later] <value> 1
> [later] <end>

interval
Kefir.interval(interval, value)
Creates a stream that produces the same value every interval milliseconds. Never ends.

var stream = Kefir.interval(1000, 1);
stream.log();
> [interval] <value> 1
> [interval] <value> 1
> [interval] <value> 1
...

sequentially
Kefir.sequentially(interval, values)
Creates a stream containing the given values (an array), delivered with the given interval in milliseconds. Ends after all values have been delivered.

var stream = Kefir.sequentially(1000, [1, 2, 3]);
stream.log();
> [sequentially] <value> 1
> [sequentially] <value> 2
> [sequentially] <value> 3
> [sequentially] <end>

repeatedly
Kefir.repeatedly(interval, values)
Creates a stream that produces the given values (an array) with the given interval in milliseconds. When all values have been emitted, it starts producing them again from the beginning. Never ends.

var stream = Kefir.repeatedly(1000, [1, 2, 3]);
stream.log();
> [repeatedly] <value> 1
> [repeatedly] <value> 2
> [repeatedly] <value> 3
> [repeatedly] <value> 1
> [repeatedly] <value> 2
> [repeatedly] <value> 3
> [repeatedly] <value> 1
...

fromPoll
Kefir.fromPoll(interval, fn)
Creates a stream that polls the given fn function with the given interval in milliseconds, and emits the values returned by fn. Never ends.

var start = new Date();
var stream = Kefir.fromPoll(1000, function(){ return new Date() - start });
stream.log();
> [fromPoll] <value> 1001
> [fromPoll] <value> 2002
> [fromPoll] <value> 3004
> [fromPoll] <value> 4006
> [fromPoll] <value> 5007
> [fromPoll] <value> 6007
...

withInterval
Kefir.withInterval(interval, handler)
A general method for creating an interval-based stream. Creates a stream that calls the given handler function with the given interval in milliseconds. The handler is called with one argument, an emitter object (it is similar to an emitter stream but is not actually a stream; you can read more in the Emitter object section).

var start = new Date();
var stream = Kefir.withInterval(1000, function(emitter) {
  var time = new Date() - start;
  if (time < 4000) {
    emitter.emit(time);   // emit a value
  } else {
    emitter.end();        // end the stream
  }
});
stream.log();
> [withInterval] <value> 1002
> [withInterval] <value> 2003
> [withInterval] <value> 3005
> [withInterval] <end>

You may call emitter.emit several times on each interval tick, or not call it at all.
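To illustrate the point above, here is a minimal sketch of a handler that emits twice on some ticks and not at all on others. The names tickHandler and fakeEmitter are hypothetical; fakeEmitter is a stand-in that only exists so the snippet runs without Kefir and without timers. With Kefir loaded, the handler would instead be passed as Kefir.withInterval(1000, tickHandler).

```javascript
// Hypothetical handler: emits nothing on odd ticks, two values on even ticks,
// and ends the stream on the sixth tick.
var tick = 0;
function tickHandler(emitter) {
  tick++;
  if (tick >= 6) {
    emitter.end();            // end the stream
  } else if (tick % 2 === 0) {
    emitter.emit(tick);       // first value on this tick
    emitter.emit(tick * 10);  // second value on the same tick
  }
  // odd ticks: nothing is emitted
}

// Stand-in emitter object (an assumption for this sketch, not part of Kefir).
// It records what the handler does.
var recorded = [];
var fakeEmitter = {
  emit: function(x) { recorded.push(x); },
  end: function() { recorded.push('end'); }
};

// Simulate six interval ticks.
for (var i = 0; i < 6; i++) {
  tickHandler(fakeEmitter);
}
console.log(recorded); // [2, 20, 4, 40, 'end']
```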

fromCallback
Kefir.fromCallback(callbackConsumer)

fromBinder
Kefir.fromBinder(subscribe)
Another way to create a general-purpose stream, alongside emitter. Unlike emitter, it gives you control over the stream's active state.

Creates a stream that calls the subscribe function on each activation, passing it an emitter object. You can then call emitter.emit or emitter.end at any time to emit a value or end the stream. The subscribe function can also return an unsubscribe function, which will be called when the stream is deactivated. Read about active state to understand what activation and deactivation mean.

var stream = Kefir.fromBinder(function(emitter) {
  console.log('!activation');
  var i = 0;
  var intervalId = setInterval(function() {
    emitter.emit(++i);
  }, 1000);
  return function() {
    console.log('!deactivation');
    clearInterval(intervalId);
  }
});
stream.log();
setTimeout(function() {
  stream.offLog(); // turn off logging to deactivate stream
}, 3500);
> !activation
> [fromBinder] <value> 1
> [fromBinder] <value> 2
> [fromBinder] <value> 3
> !deactivation

See also Custom stream demo as another fromBinder usage example.

Create a property

For now there is only one method that creates a property directly: constant(). But you can always convert any stream to a property using the toProperty method.

constant
Kefir.constant(value)
Creates an ended property with the specified current value.

var property = Kefir.constant(1);
property.log();
> [constant] <value:current> 1
> [constant] <end:current>

jQuery addon

jQuery functionality comes as an addon for the core Kefir.js library. To use it, include the addon JavaScript file, which can be found in the Downloads section.

asKefirStream
$(...).asKefirStream(eventName, [selector], [eventTransformer])
Creates a stream from events on a jQuery object. This method mimics the jQuery .on method, with two exceptions: it does not accept a data argument, and instead of a handler function it accepts an optional eventTransformer function. If provided, eventTransformer is called on each event with the same arguments and context as a jQuery handler callback, and the value it returns is emitted to the Kefir stream. If no eventTransformer is provided, the jQuery event object is emitted to the stream.

var clicks = $('body').asKefirStream('click');
clicks.log();
> [asKefirStream] <value> jQuery.Event {originalEvent: MouseEvent...}
> [asKefirStream] <value> jQuery.Event {originalEvent: MouseEvent...}
> [asKefirStream] <value> jQuery.Event {originalEvent: MouseEvent...}

Example with optional arguments:

var clicksOnContainer = $('body').asKefirStream('click', '.container');
clicksOnContainer.log('[clicks on .container]');

var clicksPageX = $('body').asKefirStream('click', function(e) {return e.pageX});
clicksPageX.log('[e.pageX]');
> [clicks on .container] <value> jQuery.Event {originalEvent: MouseEvent...}
> [e.pageX] <value> 643
> [e.pageX] <value> 15
> [clicks on .container] <value> jQuery.Event {originalEvent: MouseEvent...}
> [e.pageX] <value> 721

asKefirProperty
$(...).asKefirProperty(eventName, [selector], getter)

Convert observables

toProperty
stream.toProperty([current])
Converts a stream to a property. Accepts an optional current argument, which becomes the current value of the property.

var emitter = Kefir.emitter();
var property = emitter.toProperty(0);
property.log();
emitter.emit(1);
emitter.emit(2);
emitter.end();
> [emitter.toProperty] <value:current> 0
> [emitter.toProperty] <value> 1
> [emitter.toProperty] <value> 2
> [emitter.toProperty] <end>

changes
property.changes()
Converts a property to a stream. If the property has a current value, it is ignored (the stream's subscribers won't get it).

var emitter = Kefir.emitter();
var property = emitter.toProperty(0);
var changesStream = property.changes();
changesStream.log();
emitter.emit(1);
emitter.emit(2);
emitter.end();
> [emitter.toProperty.changes] <value> 1
> [emitter.toProperty.changes] <value> 2
> [emitter.toProperty.changes] <end>

Main observable* methods

* stream or property

onValue
obs.onValue(fn)
Subscribes the fn function to values on an observable. If onValue is called on a property that has a current value, fn is called immediately with the property's current value as its argument.

var emitter = Kefir.emitter();
emitter.onValue(function(x) {  console.log('value:', x)  });
emitter.emit(1);
emitter.emit(2);
> value: 1
> value: 2

offValue
obs.offValue(fn)
Unsubscribes fn from values on an observable.

onEnd
obs.onEnd(fn)
Subscribes the fn function to the end of an observable. If the observable has already ended, fn is called immediately.

var emitter = Kefir.emitter();
emitter.onEnd(function(x) {  console.log('stream ended')  });
emitter.end();
> stream ended

offEnd
obs.offEnd(fn)
Unsubscribes fn from the end of an observable.

onAny
obs.onAny(fn)
Subscribes the fn function to both values and the end of an observable. The callback is called with an event object as its argument. Each event object has three attributes: type, value, and current.

var emitter = Kefir.emitter();
emitter.onAny(function(event) {  console.log('event:', event)  });
emitter.emit(1);
emitter.emit(2);
emitter.end();
> event: Object {type: "value", value: 1, current: false}
> event: Object {type: "value", value: 2, current: false}
> event: Object {type: "end", value: undefined, current: false}
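Because a single onAny callback receives both kinds of event, it will usually branch on event.type. The sketch below is only an illustration: handleEvent and the hand-built event objects are hypothetical, and the objects merely mirror the three attributes listed above so that the snippet runs without Kefir. With Kefir loaded, the same function could be passed as emitter.onAny(handleEvent).

```javascript
var seen = [];

// Hypothetical onAny callback: branches on the event type.
function handleEvent(event) {
  if (event.type === 'value') {
    seen.push((event.current ? 'current value: ' : 'value: ') + event.value);
  } else if (event.type === 'end') {
    seen.push('ended');
  }
}

// Hand-built events with the same shape as the ones logged above.
[
  {type: 'value', value: 1, current: false},
  {type: 'value', value: 2, current: false},
  {type: 'end', value: undefined, current: false}
].forEach(function(event) { handleEvent(event); });

console.log(seen); // ['value: 1', 'value: 2', 'ended']
```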

offAny
obs.offAny(fn)
Unsubscribes an onAny subscriber.

log
obs.log([name])
Turns on logging of all events on an observable to the browser console. Accepts an optional name argument, which is shown in the log if provided.

var emitter = Kefir.emitter();
emitter.log('myEmitter');
emitter.emit(1);
emitter.emit(2);
emitter.end();
> myEmitter <value> 1
> myEmitter <value> 2
> myEmitter <end>

offLog
obs.offLog([name])
Turns off logging. If .log was called with a name argument, offLog must be called with the same name argument.

Modify an observable

All methods in this section create a new observable of the same kind* from the original one. The new observable applies some transformation to each event from the original and emits the result. In most cases the transformation is applied only to value events, while the end event just passes through (i.e. the new observable ends when the original ends).

* For example, if the original observable was a stream, the new one will also be a stream. The same goes for properties. The one exception is the scan method, which always returns a property.

map
obs.map(fn)
Applies the given fn function to each value from the original observable and emits the value returned by fn.

var emitter = Kefir.emitter();
emitter.map(function(x) {  return x + 1  }).log();
emitter.emit(1);
emitter.emit(2);
emitter.end();
> [emitter.map] <value> 2
> [emitter.map] <value> 3
> [emitter.map] <end>

mapTo
obs.mapTo(value)
On each value from the original observable, emits the given value.
Shorthand for observable.map(function() {return value}).

var emitter = Kefir.emitter();
emitter.mapTo(5).log();
emitter.emit(1);
emitter.emit(2);
emitter.end();
> [emitter.mapTo] <value> 5
> [emitter.mapTo] <value> 5
> [emitter.mapTo] <end>

pluck
obs.pluck(propertyName)
On each value from the original observable, emits value[propertyName].
Shorthand for observable.map(function(x) {return x[propertyName]}).

var emitter = Kefir.emitter();
emitter.pluck('num').log();
emitter.emit({num: 1});
emitter.emit({num: 2});
emitter.end();
> [emitter.pluck] <value> 1
> [emitter.pluck] <value> 2
> [emitter.pluck] <end>

invoke
obs.invoke(methodName)
Just like .pluck, but instead of emitting value[propertyName] it emits value[methodName](), i.e. it calls the method methodName of each value object and emits whatever it returns.
Shorthand for observable.map(function(x) {return x[methodName]()}).

var emitter = Kefir.emitter();
emitter.invoke('getNum').log();
emitter.emit({num: 1, getNum: function() {return this.num}});
emitter.emit({num: 2, getNum: function() {return this.num}});
emitter.end();
> [emitter.invoke] <value> 1
> [emitter.invoke] <value> 2
> [emitter.invoke] <end>

not
obs.not()
Inverts every value from the original observable using the ! operator.
Shorthand for observable.map(function(x) {return !x}).

var emitter = Kefir.emitter();
emitter.not().log();
emitter.emit(true);
emitter.emit(false);
emitter.end();
> [emitter.not] <value> false
> [emitter.not] <value> true
> [emitter.not] <end>

tap
obs.tap(fn)
Just like .map, applies the given fn function to each value from the original observable, but emits the original value (not what fn returns).

var emitter = Kefir.emitter();
emitter.tap(function(x) {
  console.log('from tap fn:', x);
  return 5; // will be ignored
}).log();
emitter.emit(1);
emitter.emit(2);
emitter.end();
> from tap fn: 1
> [emitter.tap] <value> 1
> from tap fn: 2
> [emitter.tap] <value> 2
> [emitter.tap] <end>

filter
obs.filter(predicate)
Filters values from the original observable using the given predicate function.

var emitter = Kefir.emitter();
emitter.filter(function(x) {  return x > 1  }).log();
emitter.emit(1);
emitter.emit(2);
emitter.end();
> [emitter.filter] <value> 2
> [emitter.filter] <end>

See also filterBy.

take
obs.take(n)
Emits the first n values from the original observable, then ends.

var emitter = Kefir.emitter();
emitter.take(2).log();
emitter.emit(1);
emitter.emit(2);
emitter.emit(3);
emitter.emit(4);
emitter.end();
> [emitter.take] <value> 1
> [emitter.take] <value> 2
> [emitter.take] <end>

takeWhile
obs.takeWhile(predicate)
Emits values from the original observable until the given predicate function, applied to a value, returns false. Ends when the predicate returns false.

var emitter = Kefir.emitter();
emitter.takeWhile(function(x) {  return x < 3  }).log();
emitter.emit(1);
emitter.emit(2);
emitter.emit(3);
emitter.emit(1);
emitter.end();
> [emitter.takeWhile] <value> 1
> [emitter.takeWhile] <value> 2
> [emitter.takeWhile] <end>

skip
obs.skip(n)
Skips the first n values from the original observable, then emits all the rest.

var emitter = Kefir.emitter();
emitter.skip(2).log();
emitter.emit(1);
emitter.emit(2);
emitter.emit(3);
emitter.emit(4);
emitter.end();
> [emitter.skip] <value> 3
> [emitter.skip] <value> 4
> [emitter.skip] <end>

skipWhile
obs.skipWhile(predicate)
Skips values from the original observable until the given predicate function, applied to a value, returns false. After that it stops applying the predicate and emits all values.

var emitter = Kefir.emitter();
emitter.skipWhile(function(x) {  return x < 3  }).log();
emitter.emit(1);
emitter.emit(2);
emitter.emit(3);
emitter.emit(1);
emitter.end();
> [emitter.skipWhile] <value> 3
> [emitter.skipWhile] <value> 1
> [emitter.skipWhile] <end>

skipDuplicates
obs.skipDuplicates([comparator])
Skips duplicate values, using === for comparison by default. Accepts an optional comparator function, which, if provided, is used for comparison instead of ===.

var emitter = Kefir.emitter();
emitter.skipDuplicates().log();
emitter.emit(1);
emitter.emit(2);
emitter.emit(2);
emitter.emit(3);
emitter.end();
> [emitter.skipDuplicates] <value> 1
> [emitter.skipDuplicates] <value> 2
> [emitter.skipDuplicates] <value> 3
> [emitter.skipDuplicates] <end>

With custom comparator function:

var emitter = Kefir.emitter();
emitter.skipDuplicates(function(a, b) {
  return Math.round(a) === Math.round(b);
}).log();
emitter.emit(1);
emitter.emit(2);
emitter.emit(2.1);
emitter.emit(3);
emitter.end();
> [emitter.skipDuplicates] <value> 1
> [emitter.skipDuplicates] <value> 2
> [emitter.skipDuplicates] <value> 3
> [emitter.skipDuplicates] <end>
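The role of the comparator (it returns true when two values should count as duplicates) can be shown on a plain array. skipDuplicatesModel below is a hypothetical array-based model, not Kefir's implementation, and it assumes each value is compared with the last value that was let through.

```javascript
// Array-based model of skipDuplicates (an illustration only).
// Assumption: each value is compared with the last value that was kept.
function skipDuplicatesModel(values, comparator) {
  var isSame = comparator || function(a, b) { return a === b; };
  var result = [];
  values.forEach(function(x) {
    if (result.length === 0 || !isSame(result[result.length - 1], x)) {
      result.push(x);
    }
  });
  return result;
}

var strict = skipDuplicatesModel([1, 2, 2, 3]);
var rounded = skipDuplicatesModel([1, 2, 2.1, 3], function(a, b) {
  return Math.round(a) === Math.round(b);
});

console.log(strict);  // [1, 2, 3]
console.log(rounded); // [1, 2, 3]
```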

diff
obs.diff(seed, fn)
On each value from the original observable, calls the fn function with the previous and current values as arguments. The first time, it calls fn with seed and the current value. Emits whatever fn returns.

var emitter = Kefir.emitter();
emitter.diff(0, function(prev, next) {
  return next - prev;
}).log();
emitter.emit(1);
emitter.emit(2);
emitter.emit(2);
emitter.emit(3);
emitter.end();
> [emitter.diff] <value> 1
> [emitter.diff] <value> 1
> [emitter.diff] <value> 0
> [emitter.diff] <value> 1
> [emitter.diff] <end>

scan
obs.scan(seed, fn)
On each value from the original observable, calls the fn function with the previous result returned by fn and the current value emitted by the original observable. The first time, it calls fn with seed and the current value. Emits whatever fn returns. Always creates a property.

var emitter = Kefir.emitter();
emitter.scan(0, function(prev, next) {
  return next + prev;
}).log();
emitter.emit(1);
emitter.emit(2);
emitter.emit(2);
emitter.emit(3);
emitter.end();
> [emitter.scan] <value:current> 0
> [emitter.scan] <value> 1
> [emitter.scan] <value> 3
> [emitter.scan] <value> 5
> [emitter.scan] <value> 8
> [emitter.scan] <end>

reduce
obs.reduce(seed, fn)
Similar to .scan, but emits only the last result, just before the end.

var emitter = Kefir.emitter();
emitter.reduce(0, function(prev, next) {
  return next + prev;
}).log();
emitter.emit(1);
emitter.emit(2);
emitter.emit(2);
emitter.emit(3);
emitter.end();
> [emitter.reduce] <value> 8
> [emitter.reduce] <end>
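The difference between diff, scan, and reduce can be seen by applying the same ideas to a plain array. The helpers below are hypothetical array-based models written for this comparison, not Kefir's implementation; for the input 1, 2, 2, 3 they reproduce the values shown in the three logs above.

```javascript
// diff: fn(previous input, current input); seed stands in for the first "previous".
function diffModel(values, seed, fn) {
  var prev = seed;
  return values.map(function(x) {
    var out = fn(prev, x);
    prev = x;
    return out;
  });
}

// scan: fn(previous result, current input); seed is the initial current value.
function scanModel(values, seed, fn) {
  var acc = seed;
  var results = [seed];
  values.forEach(function(x) {
    acc = fn(acc, x);
    results.push(acc);
  });
  return results;
}

// reduce: like scan, but only the final result is kept.
function reduceModel(values, seed, fn) {
  var all = scanModel(values, seed, fn);
  return all[all.length - 1];
}

var input = [1, 2, 2, 3];
var diffs = diffModel(input, 0, function(prev, next) { return next - prev; });
var sums = scanModel(input, 0, function(prev, next) { return next + prev; });
var total = reduceModel(input, 0, function(prev, next) { return next + prev; });

console.log(diffs); // [1, 1, 0, 1]
console.log(sums);  // [0, 1, 3, 5, 8]
console.log(total); // 8
```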

delay
obs.delay(wait)
Delays all events by wait milliseconds, except for the current value of a property and the current end of an already ended observable.

var emitter = Kefir.emitter();
emitter.delay(100).map(function(originalEmitTime){
  return new Date() - originalEmitTime;
}).log();
emitter.emit(new Date());
emitter.emit(new Date());
emitter.end();
> [emitter.delay.map] <value> 100
> [emitter.delay.map] <value> 100
> [emitter.delay.map] <end>

throttle
obs.throttle(wait, [options])

debounce
obs.debounce(wait, [options])

withHandler
obs.withHandler(fn)

Combine observables

combine
Kefir.combine(obss, [fn])
obs.combine(otherObs, [fn])

and
Kefir.and(obss)
obs.and(otherObs)

or
Kefir.or(obss)
obs.or(otherObs)

sampledBy
Kefir.sampledBy(passiveObss, activeObss, [fn])
obs.sampledBy(otherObs, [fn])

merge
Kefir.merge(obss)
obs.merge(otherObs)

concat
Kefir.concat(obss)
obs.concat(otherObs)

pool
Kefir.pool()

flatMap
obs.flatMap([fn])

flatMapLatest
obs.flatMapLatest([fn])

flatMapFirst
obs.flatMapFirst([fn])

flatMapConcat
obs.flatMapConcat([fn])

flatMapWithConcurrencyLimit
obs.flatMapWithConcurrencyLimit([fn], limit)

awating
obs.awating(otherObs)

filterBy
obs.filterBy(otherObs)

Active state

Each stream or property is, at any time, in one of two states: active or inactive. When an observable is inactive, it does not emit any events and does not subscribe to its original source. Observables automatically become active when the first listener is added, and become inactive when the last listener is removed.

For example, stream = $('.foo').asKefirStream('click') won't immediately subscribe to the 'click' event on $('.foo'). It subscribes only when the first listener is added to the stream, and automatically unsubscribes when the last listener is removed.

var stream = $('.foo').asKefirStream('click');
// at this moment no event listener has been added to .foo

stream.onValue(someFn);
// now the 'click' listener is added to .foo

stream.offValue(someFn);
// and now it is removed again
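
The activation rule can be modelled in a few lines of plain JavaScript. This is a hypothetical sketch, not Kefir's implementation: lazySource, its subscribe and unsubscribe callbacks, and the log array are all names invented for the illustration.

```javascript
var log = [];

// A tiny observable-like object that attaches to its source
// only while it has at least one listener.
function lazySource(subscribe, unsubscribe) {
  var listeners = [];
  return {
    onValue: function(fn) {
      listeners.push(fn);
      if (listeners.length === 1) { subscribe(); }    // first listener: activate
    },
    offValue: function(fn) {
      var index = listeners.indexOf(fn);
      if (index === -1) { return; }                   // unknown listener: ignore
      listeners.splice(index, 1);
      if (listeners.length === 0) { unsubscribe(); }  // last listener: deactivate
    }
  };
}

var stream = lazySource(
  function() { log.push('activated'); },
  function() { log.push('deactivated'); }
);
function a() {}
function b() {}

stream.onValue(a);   // activates
stream.onValue(b);   // already active, nothing happens
stream.offValue(a);  // one listener left, still active
stream.offValue(b);  // deactivates

console.log(log); // ['activated', 'deactivated']
```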

If one observable depends on another, its active state propagates to its dependencies. For example, in the following code, mapA will activate A, filterMapA will activate mapA and A, and mergeAB will activate A and B.

var A = Kefir.emitter();
var B = Kefir.emitter();

var mapA = A.map(function() { /* ... */ });
var filterMapA = mapA.filter(function() { /* ... */ });
var mergeAB = Kefir.merge([A, B]);

Notice that the current value of a property won't update while the property is inactive. For example, suppose we convert an emitter to a property, then emit some values via the emitter while the property has no subscribers (i.e. is inactive). In this case the property won't get those values and won't update its current value.

var emitter = Kefir.emitter();
var property = emitter.toProperty(0);

// 1 and 2 won't become property current value
emitter.emit(1);
emitter.emit(2);

// now we activate property by subscribing to it,
// and also check current value
property.onValue(function(x) {  console.log(x)  }) // => 0

// those values will become property current
emitter.emit(3);
emitter.emit(4);

property.onValue(function(x) {  console.log(x)  }) // => 4
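
The same behaviour can be reproduced with a small stand-alone model. propertyModel and its push method are hypothetical names for this sketch and are not part of Kefir; the model simply ignores source values while it has no listeners.

```javascript
// Hypothetical model: a property that tracks its source only while active.
function propertyModel(initial) {
  var current = initial;
  var listeners = [];
  return {
    // Called by the "source"; ignored while there are no listeners (inactive).
    push: function(x) {
      if (listeners.length === 0) { return; }
      current = x;
      listeners.forEach(function(fn) { fn(x); });
    },
    onValue: function(fn) {
      listeners.push(fn);
      fn(current); // a new subscriber gets the current value immediately
    }
  };
}

var out = [];
var property = propertyModel(0);

property.push(1); // ignored: no subscribers yet
property.push(2); // ignored: no subscribers yet
property.onValue(function(x) { out.push('first: ' + x); });
property.push(3);
property.push(4);
property.onValue(function(x) { out.push('second: ' + x); });

console.log(out); // ['first: 0', 'first: 3', 'first: 4', 'second: 4']
```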

In fact, active state is just a convention that is strictly followed in Kefir's code for better performance. But you are free not to follow it in your custom plugins or combinators. For example, you can create your own jQuery .asKefirStream() plugin using emitter and always subscribe to the DOM event, even when the stream has no subscribers. Or you can create your own .map() combinator that always keeps its subscription to its source observable (i.e. keeps it in the active state).

About callbacks

In Kefir, wherever you pass a function as an argument, you can instead pass an array containing the function, a this context, and additional arguments with which the function will be called.

This works everywhere in Kefir where a function is passed as an argument, for example in .map(fn), .filter(fn), and .fromPoll(interval, fn).

All of the following code snippets are equivalent.

someStream.onValue(function(x) {
  someObj.someMethod(1, 2, x);
});
someStream.onValue([someObj.someMethod, someObj, 1, 2]);
someStream.onValue(['someMethod', someObj, 1, 2]);
 
someStream.onValue(someObj.someMethod.bind(someObj, 1, 2));

You can pass a method name as a string instead of the method itself in the first position of the array. Here is a real-world example where this is useful:

isVisibleProperty.onValue(function(x) {
  $('.foo').toggleClass('is-visible', x);
});
isVisibleProperty.onValue(['toggleClass', $('.foo'), 'is-visible']);

If you don't need additional arguments, but only a this context, just don't add them to the array: [foo.bar, foo]. And if you don't need a context, pass null in its place: [foo, null, 1, 2, 3].
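
One way to picture the array form is as a recipe for building an ordinary function. The toFunction helper below is a hypothetical sketch of that idea, not Kefir's internal code. It assumes the event value is appended after the additional arguments, as in the equivalent snippets above.

```javascript
// Hypothetical helper: turns [fnOrName, context, arg1, arg2, ...] into a plain function.
function toFunction(callback) {
  if (typeof callback === 'function') { return callback; }
  var fnOrName = callback[0];
  var context = callback[1];
  var boundArgs = callback.slice(2);
  // A string in first position names a method on the context object.
  var fn = typeof fnOrName === 'string' ? context[fnOrName] : fnOrName;
  return function() {
    var eventArgs = Array.prototype.slice.call(arguments);
    return fn.apply(context, boundArgs.concat(eventArgs));
  };
}

var someObj = {
  calls: [],
  someMethod: function(a, b, x) { this.calls.push([a, b, x]); }
};

toFunction([someObj.someMethod, someObj, 1, 2])('x1'); // function plus context
toFunction(['someMethod', someObj, 1, 2])('x2');       // method name as a string
var sum = toFunction([function(a, x) { return a + x; }, null, 10])(5); // no context

console.log(someObj.calls); // [[1, 2, 'x1'], [1, 2, 'x2']]
console.log(sum);           // 15
```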

To unsubscribe from an observable, you must call .offValue(fn) or .offEnd(fn) with the exact same array you passed to .onValue(fn) or .onEnd(fn).

someStream.onValue(['someMethod', someObj, 1, 2]);
someStream.offValue(['someMethod', someObj, 1, 2]);

Emitter object

An emitter object is an object that has two methods: emit and end. It is used in several places in Kefir as a proxy for emitting events to some stream.

emitter.emit accepts one argument (any value).
emitter.end accepts no arguments.

emitter.emit(123)
emitter.end()

Do not confuse an emitter object with an emitter stream. They both have emit and end methods, but an emitter object isn't actually a stream: it has no stream methods or functionality, only the two methods emit and end.

All emitter object methods are bound to its context and can safely be passed as callbacks without binding (see the example below).

// instead of this
$('.foo').on('click', emitter.emit.bind(emitter))

// you can do just this
$('.foo').on('click', emitter.emit)
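
To see why bound methods matter, here is a hypothetical sketch of an emitter-like object whose methods are bound at creation time. makeEmitterObject and the received array are invented for this illustration and are not Kefir's implementation.

```javascript
// Hypothetical emitter-like object with methods bound to the object itself.
function makeEmitterObject(received) {
  var emitterObject = {
    _received: received,
    emit: function(x) { this._received.push(x); },
    end: function() { this._received.push('end'); }
  };
  // Bind both methods so they keep working when detached from the object.
  emitterObject.emit = emitterObject.emit.bind(emitterObject);
  emitterObject.end = emitterObject.end.bind(emitterObject);
  return emitterObject;
}

var received = [];
var emitter = makeEmitterObject(received);

var detachedEmit = emitter.emit; // passed around like a plain callback
var detachedEnd = emitter.end;
detachedEmit(1);
detachedEmit(2);
detachedEnd();

console.log(received); // [1, 2, 'end']
```

Without the two bind calls, the detached functions would lose their this value and the pushes would fail.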