Kefir.js 1.3.1 (changelog)

Kefir — is a Reactive Programming library for JavaScript inspired by Bacon.js and RxJS, with focus on high performance and low memory usage.

Kefir has a GitHub repository, where you can send pull requests, report bugs, and have fun reading source code.

See also Deprecated API docs.

Installation

Kefir is available as an NPM and a Bower package, as well as a simple file download.

NPM

npm install kefir

Bower

bower install kefir

Downloads (1.3.1)

For Developmentkefir.js~ 50 KB
For Productionkefir.min.js~ 7 KB (when gzipped)
kefir.min.js.mapsource map file, in case you need one
All fileskefir-1.3.1.zip... including documentation, demos, tests, source maps, etc.

jQuery plugin

If you are going to use Kefir together with jQuery, you might be interested in the kefir-jquery plugin. It contains two handy methods $().asKefirStream and $().asKefirProperty for creating streams and properties from events on jQuery objects.

Examples

Let's start from a quick little example to get you a feel of what is it like to program with Kefir. First we create a stream of events that will produce three numbers with interval of 100 milliseconds:

var numbers = Kefir.sequentially(100, [1, 2, 3]);

Now let's create another stream based on the first one. As you might guess, it will produce 2, 4, and 6.

var numbers2 = numbers.map(function(x) {
  return x * 2;
});

Suppose we don't want number 4 to be in the sequence, no problem, we can filter it out:

var numbers3 = numbers2.filter(function(x) {
  return x !== 4;
});

Ok, I think numbers3 stream is what we want, it's time to subscribe to it and to get the values:

numbers3.onValue(function(x) {
  logger.log(x);
});

More examples

Also, almost any code snippet below can be run in the browser console, on this page. So you can play with Kefir right now, just open up the browser console.

Intro to Streams and Properties

Kefir supports two types of observables — streams and properties. Streams represent sequences of events made available over time. And properties represent values that change over time. The value of a property changes in response to events, which means that any stream may be easily converted to a property.

In practice, the only difference between the two types of observables is that properties may have a current value. The process of subscribing to both types of observables is the same: you call the onValue method, passing a callback function to it. But when you subscribe to a property which has a current value, the callback is called immediately (synchronously) with the current value of the property.

Create a stream

emitterKefir.emitter()
Creates an emitter, which is an ordinary stream, but with additional methods: .emit(value), .error(error), .end(), and .emitEvent(). The first three are pretty self-descriptive, and the last one accepts an event object with the same format than in the onAny method, and emits that event. Once an emitter was created, one can easily emit all three kinds of events from it, using these methods.

var emitter = Kefir.emitter();
emitter.log(); // log events to console (see log)
emitter.emit(1);
emitter.error('Oops!');
emitter.end();
> [emitter] <value> 1
> [emitter] <error> Oops!
> [emitter] <end>
emitter:  ----1----e----X
                   Oops!

Emitter is the easiest way to create general purpose streams, but it doesn't give you control over the active state of the stream — it doesn't allow you to monitor if the stream has subscribers or not, and to sub/unsub to your original source, or to do other resource management based on that. If you want to have that control, you should use fromBinder or fromSubUnsub.

neverKefir.never()
Creates a stream that already ended and will never produce any events.

var stream = Kefir.never();
stream.log();
> [never] <end:current>
stream:  X

laterKefir.later(wait, value)
Creates a stream that produces a single value after wait milliseconds, then ends.

var stream = Kefir.later(1000, 1);
stream.log();
> [later] <value> 1
> [later] <end>
stream:  ----1X

intervalKefir.interval(interval, value)
Creates a stream that produces the same value each interval milliseconds. Never ends.

var stream = Kefir.interval(1000, 1);
stream.log();
> [interval] <value> 1
> [interval] <value> 1
> [interval] <value> 1
...
stream:  ----1----1----1----1---

sequentiallyKefir.sequentially(interval, values)
Creates a stream containing the given values (array), delivered with the given interval in milliseconds. Ends after all values are delivered.

var stream = Kefir.sequentially(1000, [1, 2, 3]);
stream.log();
> [sequentially] <value> 1
> [sequentially] <value> 2
> [sequentially] <value> 3
> [sequentially] <end>
stream:  ----1----2----3X

fromPollKefir.fromPoll(interval, fn)
Creates a stream that polls the given fn function, with the given interval in milliseconds, and emits the values returned by fn. Never ends.

var start = new Date();
var stream = Kefir.fromPoll(1000, function(){ return new Date() - start });
stream.log();
> [fromPoll] <value> 1001
> [fromPoll] <value> 2002
> [fromPoll] <value> 3004
> [fromPoll] <value> 4006
> [fromPoll] <value> 5007
> [fromPoll] <value> 6007
...
stream:  ----•----•----•----•---
          1001 2002 3004 4006

withIntervalKefir.withInterval(interval, handler)
General method to create an interval based stream. Creates a stream that calls the given handler function, with the given interval in milliseconds. The handler function is called with one argument — emitter object.

var start = new Date();
var stream = Kefir.withInterval(1000, function(emitter) {
  var time = new Date() - start;
  if (time < 4000) {
    emitter.emit(time);   // emit a value
  } else {
    emitter.end();        // end the stream
  }
});
stream.log();
> [withInterval] <value> 1002
> [withInterval] <value> 2003
> [withInterval] <value> 3005
> [withInterval] <end>
stream:  ----•----•----•----X
          1002 2003 3005

You may call emitter methods several times on each interval tick, or not call them at all.

fromCallbackKefir.fromCallback(fn)
Convert a function than accepts a callback as the first argument to a stream. Emits at most one value when callback is called, then ends. The fn function will be called at most once, when the first subscriber will be added to the stream.

var stream = Kefir.fromCallback(function(callback) {
  // we use setTimeout here just to simulate some asynchronous activity
  setTimeout(function() {  callback(1)  }, 1000);
});
stream.log();
> [fromCallback] <value> 1
> [fromCallback] <end>
stream:  ----1X

fromNodeCallbackKefir.fromNodeCallback(fn)
Similar to fromCallback, but the callback passed to fn is a Node.JS style callback — callback(error, result). If the error argument of the callback is truthy, an error will be emitted from the result stream, otherwise a value is emitted. The stream will end after the first value or on error.

var stream = Kefir.fromNodeCallback(function(callback) {
  // we use setTimeout here just to simulate some asynchronous activity
  setTimeout(function() {  callback(null, 1)  }, 1000);
});
stream.log();
> [fromNodeCallback] <value> 1
> [fromNodeCallback] <end>
stream:  ----1X

fromEventKefir.fromEvent(target, eventName, [transform])
Creates a stream from events on a DOM EventTarget or a Node.JS EventEmitter object, or an object that supports event listeners using on/off methods (e.g. a jQuery object).

If a transform function is provided, it will be called on each event with the same arguments and context (this) as the event listener callback. And the value returned by transform will be emitted from the stream. If no transform function is provided, the first argument of the callback is emitted by default, i.e. the function(x) {return x} is used as transform.

var stream = Kefir.fromEvent(document.body, 'click');
stream.log()
> [fromEvent] <value> MouseEvent {dataTransfer: null, y: 474, x: 551 ...}
> [fromEvent] <value> MouseEvent {dataTransfer: null, y: 361, x: 751 ...}
> [fromEvent] <value> MouseEvent {dataTransfer: null, y: 444, x: 1120 ...}
stream:  ----•-----------•----•---
    MouseEvent   MouseEvent   MouseEvent

Note that it uses addEventListener() for DOM events, which is not supported by IE8. If you need IE8 support use the jQuery plugin or call fromEvent on a jQuery object, e.g. Kefir.fromEvent($('.foo'), 'click').

fromSubUnsubKefir.fromSubUnsub(subscribe, unsubscribe, [transform])
Creates a stream from subscribe and unsubscribe functions. The subscribe function is called on each activation with a callback as argument, giving you an opportunity to subscribe with this callback to an original source of values. When all subscribers from the stream are removed, the unsubscribe function is called with the same callback, so you can unsubscribe from your original source.

You can also provide a transform function, which will work the same way as in fromEvent.

function subscribe(callback) {
  document.body.addEventListener('click', callback);
}

function unsubscribe(callback) {
  document.body.removeEventListener('click', callback);
}

function transform(event) {
  return event.type + ' on ' + this.tagName;
}

var stream = Kefir.fromSubUnsub(subscribe, unsubscribe, transform);
stream.log();
> [fromBinder] <value> click on BODY
> [fromBinder] <value> click on BODY
> [fromBinder] <value> click on BODY
stream:  ----•--------------•----•---
  'click on...'  'click on...'  'click on...'

fromBinderKefir.fromBinder(subscribe)
Another method for creation of general purpose stream, along with emitter. Unlike emitter it gives you control over the active state of the stream.

Creates a stream which calls the subscribe function on each activation, passing to it an emitter object. Then you can call emitter methods at any time to emit events. You can also return an unsubscribe function from the subscribe function. If a function is returned from subscribe, it will be called on deactivation of the stream.

var stream = Kefir.fromBinder(function(emitter) {
  console.log('!activation');
  var i = 0;
  var intervalId = setInterval(function() {
    emitter.emit(++i);
  }, 1000);
  return function() {
    console.log('!deactivation');
    clearInterval(intervalId);
  }
});
stream.log();
setTimeout(function() {
  stream.offLog(); // turn off logging to deactivate stream
}, 3500);
> !activation
> [fromBinder] <value> 1
> [fromBinder] <value> 2
> [fromBinder] <value> 3
> !deactivation
stream:  ----1----2----3---

See also Custom stream demo as another fromBinder usage example.

Note that if you call emitter methods synchronously in the subscribe function, the callback passed to on* methods (onValue etc.) will be also called synchronously. And only the first subscriber will get values emitted synchronously. But if you convert the stream to a property, this value will become the current value of the property, and all subscribers will get it.

var stream = Kefir.fromBinder(function(emitter) {
  emitter.emit(1); // synchronous call
  setTimeout(function() {emitter.emit(2)}, 0); // asynchronous call
});
console.log('about to add first subscriber');
stream.onValue(function(x) {console.log('first:', x)});
console.log('first subscriber added');
stream.onValue(function(x) {console.log('second:', x)});
console.log('second subscriber added');
> about to add first subscriber
> first: 1
> first subscriber added
> second subscriber added
> first: 2
> second: 2

repeatKefir.repeat(generator)
Calls the generator function which is supposed to return an observable. Emits values and errors from the spawned observable; when it ends, calls generator again to get a new one and so on.

The generator function is called with one argument — iteration number starting from 0. If a falsy value is returned from the generator, the stream ends.

var result = Kefir.repeat(function(i) {
  if (i < 3) {
    return Kefir.sequentially(100, [i, i]);
  } else {
    return false;
  }
});
result.log();
> [repeat] <value> 0
> [repeat] <value> 0
> [repeat] <value> 1
> [repeat] <value> 1
> [repeat] <value> 2
> [repeat] <value> 2
> [repeat] <end>
spawned 1:  ---0---0X
spawned 2:          ---1---1X
spawned 3:                  ---2---2X

result:     ---0---0---1---1---2---2X

Note that with this method it is possible to create an infinite loop. Consider this example:

var result = Kefir.repeat(function() {
  return Kefir.constant(1);
});

// When we subscribe to it (directly or via .log)
// we already are in an infinite loop.
result.log();

// But if we limit it with .take or something it'll work just fine.
// So the `result` stream defined like this
// may still make sense, depending on how we use it.
result.take(10).log();

It is even more dangerous if generator constantly returns an ended observable with no values (e.g. never). In this case, .take won't help, because you'll never get any single value from it, but generator will be called over and over. The only escape path here is to define an escape condition in the generator:

var result = Kefir.repeat(function(i) {

  // Defining that a new observable will be spawned at most 10 times
  if (i >= 10) {
    return false;
  }

  return Kefir.never();
});

So just be careful when using repeat, it's a little dangerous but it is still a great method.

Create a property

constantKefir.constant(value)
Creates an ended property, with the specified current value.

var property = Kefir.constant(1);
property.log();
> [constant] <value:current> 1
> [constant] <end:current>
property: 1X

constantErrorKefir.constantError(error)
Creates an ended property, with the specified current error.

var property = Kefir.constantError(1);
property.log();
> [constantError] <error:current> 1
> [constantError] <end:current>
property: eX

fromPromiseKefir.fromPromise(promise)
Converts a promise to a property.

var myPromise = {
  then: function(onSuccess, onError) {
    var fulfill = function() {  onSuccess(1)  };
    setTimeout(fulfill, 1000);
  }
};

var result = Kefir.fromPromise(myPromise);
result.log();
> [fromPromise] <value> 1
> [fromPromise] <end>
result:  ----1X

Convert observables

toPropertystream.toProperty([current])
Converts a stream to a property. Accepts an optional current argument, which becomes the current value of the property.

You can also call toProperty on a property. If called on a property that already has a current value, it just returns the same property with the same current value. But if the source property has no current value, the specified value will be the current value of the result property.

var source = Kefir.sequentially(100, [1, 2, 3]);
var result = source.toProperty(0);
result.log();
> [sequentially.toProperty] <value:current> 0
> [sequentially.toProperty] <value> 1
> [sequentially.toProperty] <value> 2
> [sequentially.toProperty] <value> 3
> [sequentially.toProperty] <end>
source:  ----1----2----3X
result: 0----1----2----3X

changesproperty.changes()
Converts a property to a stream. If the property has a current value, it will be ignored (subscribers of the stream won't get it).

If you call changes on a stream, it'll just return the same stream.

var source = Kefir.sequentially(100, [1, 2, 3]);
var property = source.toProperty(0);
var result = property.changes();
result.log();
> [sequentially.toProperty.changes] <value> 1
> [sequentially.toProperty.changes] <value> 2
> [sequentially.toProperty.changes] <value> 3
> [sequentially.toProperty.changes] <end>
property: 0----1----2----3X
result:    ----1----2----3X

Main observable methods

onValueobs.onValue(callback)
Subscribes callback to values on an observable.

If called on a property, which has a current value, callback will be called immediately (synchronously) with that value.

var stream = Kefir.sequentially(1000, [1, 2]);
stream.onValue(function(x) {
  console.log('value:', x);
});
> value: 1
> value: 2

offValueobs.offValue(callback)
Unsubscribes callback from values on an observable.

onErrorobs.onError(callback)
Subscribes callback to errors on an observable.

If called on a property, which has a current error, callback will be called immediately (synchronously) with that error.

var stream = Kefir.sequentially(1000, [1, 2]).valuesToErrors();
stream.onError(function(x) {
  console.log('error:', x);
});
> error: 1
> error: 2

offErrorobs.offError(callback)
Unsubscribes callback from errors on an observable.

onEndobs.onEnd(callback)
Subscribes callback to ending of an observable.

If observable is already ended, callback will be called immediately (synchronously).

var stream = Kefir.sequentially(1000, [1, 2]);
stream.onEnd(function() {
  console.log('stream ended');
});
> stream ended

offEndobs.offEnd(callback)
Unsubscribes callback from ending of an observable.

onAnyobs.onAny(callback)
Subscribes callback to all three types of events. Callback is called with an event object as argument. Each event object contains three attributes — type, value, and current.

var stream = Kefir.sequentially(1000, [1, 2]);
stream.onAny(function(event) {
  console.log('event:', event);
});
> event: Object {type: "value", value: 1, current: false}
> event: Object {type: "error", value: 2, current: false}
> event: Object {type: "end", value: undefined, current: false}

offAnyobs.offAny(callback)
Unsubscribes an onAny subscriber.

logobs.log([name])
Turns on logging of any event to the browser console. Accepts an optional name argument that will be shown in the log if provided.

var stream = Kefir.sequentially(1000, [1, 2]);
stream.log('my stream');
> my stream <value> 1
> my stream <value> 2
> my stream <end>

offLogobs.offLog([name])
Turns off logging. If .log was called with a name argument, offLog must be called with the same name argument.

Modify an observable

All methods in this section create a new observable of same type* from an original one. The new observable applies some transformation to each event from the original one and emits the result of the transformation. In most cases a transformation is applied only to value events, end and error events just passes through untouched.

* For example if the original observable was a stream, then the new one will also be a stream. Same for properties. This rule has one exception for the scan method, that always returns a property.

mapobs.map(fn)
Applies the given fn function to each value from the original observable and emits the value returned by fn.

var source = Kefir.sequentially(100, [1, 2, 3]);
var result = source.map(function(x) {  return x + 1  });
result.log();
> [sequentially.map] <value> 2
> [sequentially.map] <value> 3
> [sequentially.map] <value> 4
> [sequentially.map] <end>
source: ---1---2---3X
result: ---2---3---4X

filterobs.filter([predicate])
Filters values from the original observable using the given predicate function.

If no predicate is provided, the function(x) {return x} will be used.

var source = Kefir.sequentially(100, [1, 2, 3]);
var result = source.filter(function(x) {  return x > 1  });
result.log();
> [sequentially.filter] <value> 2
> [sequentially.filter] <value> 3
> [sequentially.filter] <end>
source: ---1---2---3X
result: -------2---3X

See also filterBy.

takeobs.take(n)
Emits the first n values from the original observable, then ends.

var source = Kefir.sequentially(100, [1, 2, 3]);
var result = source.take(2);
result.log();
> [sequentially.take] <value> 1
> [sequentially.take] <value> 2
> [sequentially.take] <end>
source: ---1---2---3X
result: ---1---2X

takeWhileobs.takeWhile([predicate])
Emits values from the original observable until the given predicate function applied to a value returns false. Ends when the predicate returns false.

If no predicate is provided, the function(x) {return x} will be used.

var source = Kefir.sequentially(100, [1, 2, 3]);
var result = source.takeWhile(function(x) {  return x < 3  });
result.log();
> [sequentially.takeWhile] <value> 1
> [sequentially.takeWhile] <value> 2
> [sequentially.takeWhile] <end>
source: ---1---2---3X
result: ---1---2---X

See also takeWhileBy.

skipobs.skip(n)
Skips the first n values from the original observable, then emits all the rest.

var source = Kefir.sequentially(100, [1, 2, 3]);
var result = source.skip(2);
result.log();
> [sequentially.skip] <value> 3
> [sequentially.skip] <end>
source: ---1---2---3X
result: -----------3X

skipWhileobs.skipWhile([predicate])
Skips values from the original observable until the given predicate function applied to a value returns false, then stops applying the predicate to values and emits all of them.

If no predicate is provided, the function(x) {return x} will be used.

var source = Kefir.sequentially(100, [1, 3, 2]);
var result = source.skipWhile(function(x) {  return x < 3  });
result.log();
> [sequentially.skipWhile] <value> 3
> [sequentially.skipWhile] <value> 2
> [sequentially.skipWhile] <end>
source: ---1---3---2X
result: -------3---2X

See also skipWhileBy.

skipDuplicatesobs.skipDuplicates([comparator])
Skips duplicate values using === for comparison. Accepts an optional comparator function which is then used instead of ===.

var source = Kefir.sequentially(100, [1, 2, 2, 3, 1]);
var result = source.skipDuplicates();
result.log();
> [sequentially.skipDuplicates] <value> 1
> [sequentially.skipDuplicates] <value> 2
> [sequentially.skipDuplicates] <value> 3
> [sequentially.skipDuplicates] <value> 1
> [sequentially.skipDuplicates] <end>
source: ---1---2---2---3---1X
result: ---1---2-------3---1X

With custom comparator function:

var source = Kefir.sequentially(100, [1, 2, 2.1, 3, 1]);
var result = source.skipDuplicates(function(a, b) {
  return Math.round(a) === Math.round(b);
});
result.log();
> [sequentially.skipDuplicates] <value> 1
> [sequentially.skipDuplicates] <value> 2
> [sequentially.skipDuplicates] <value> 3
> [sequentially.skipDuplicates] <value> 1
> [sequentially.skipDuplicates] <end>
source: ---1---2---•---3---1X
                 2.1
result: ---1---2-------3---1X

diffobs.diff([fn], [seed])
On each value from the original observable, calls the fn function with the previous and current values as arguments. At first time, calls fn with seed and current value. Emits whatever fn returns.

If no seed is provided, the first value will be used as a seed, and the result observable won't emit on first value.

If no fn function is provided, function(a, b) {return [a, b]} will be used. If you want to omit fn but provide seed, pass null as fn.

var source = Kefir.sequentially(100, [1, 2, 2, 3]);
var result = source.diff(function(prev, next) {
  return next - prev;
}, 0);
result.log();
> [sequentially.diff] <value> 1
> [sequentially.diff] <value> 1
> [sequentially.diff] <value> 0
> [sequentially.diff] <value> 1
> [sequentially.diff] <end>
source: ---1---2---2---3X
result: ---1---1---0---1X

scanobs.scan(fn, [seed])
On each value from the original observable, calls the fn function with the previous result returned by fn and the current value emitted by the original observable. At first time, calls fn with seed as previous result. Emits whatever fn returns. Always creates a property.

If no seed is provided, the first value will be used as a seed.

var source = Kefir.sequentially(100, [1, 2, 2, 3]);
var result = source.scan(function(prev, next) {
  return next + prev;
}, 0);
result.log();
> [sequentially.scan] <value:current> 0
> [sequentially.scan] <value> 1
> [sequentially.scan] <value> 3
> [sequentially.scan] <value> 5
> [sequentially.scan] <value> 8
> [sequentially.scan] <end>
source:  ---1---2---2---3X
result: 0---1---3---5---8X

reduceobs.reduce(fn, [seed])
Similar to .scan, but emits only the last result just before end.

var source = Kefir.sequentially(100, [1, 2, 2, 3]);
var result = source.reduce(function(prev, next) {
  return next + prev;
}, 0);
result.log();
> [sequentially.reduce] <value> 8
> [sequentially.reduce] <end>
source:  ---1---2---2---3 X
result:  ----------------8X

mapEndobs.mapEnd(fn)
Allows you to insert an additional value just before the observable ends. fn will be called on obs' end with no arguments, and whatever it return will be emitted in the result stream before end.

var source = Kefir.sequentially(100, [1, 2, 3]);
var result = source.mapEnd(function() {  return 0  });
result.log();
> [sequentially.mapEnd] <value> 1
> [sequentially.mapEnd] <value> 2
> [sequentially.mapEnd] <value> 3
> [sequentially.mapEnd] <value> 0
> [sequentially.mapEnd] <end>
source:  ---1---2---3 X
result:  ---1---3---30X

skipEndobs.skipEnd()
Ignores end of source observable.

var source = Kefir.sequentially(100, [1, 2, 3]);
var result = source.skipEnd();
result.log();
> [sequentially.skipEnd] <value> 1
> [sequentially.skipEnd] <value> 2
> [sequentially.skipEnd] <value> 3
source:  ---1---2---3X
result:  ---1---2---3---

slidingWindowobs.slidingWindow(max, [min])
Will emit arrays containing the last n values from the obs observable, where n is between max and min arguments. By default min equals 0.

var source = Kefir.sequentially(100, [1, 2, 3, 4, 5]);
var result = source.slidingWindow(3, 2)
result.log();
> [sequentially.slidingWindow] <value> [1, 2]
> [sequentially.slidingWindow] <value> [1, 2, 3]
> [sequentially.slidingWindow] <value> [2, 3, 4]
> [sequentially.slidingWindow] <value> [3, 4, 5]
> [sequentially.slidingWindow] <end>
source:  --------1--------2--------3--------4--------5X
result:  -----------------•--------•--------•--------•X
                      [1,2]  [1,2,3]  [2,3,4]  [3,4,5]

bufferWhileobs.bufferWhile([predicate], [options])
Passes every value from the source observable to the predicate function. If it returns true, adds the value to the buffer, otherwise flushes the buffer. Also flushes the buffer before end, but you can disable that by passing {flushOnEnd: false} as options.

The default predicate is function(x) {return x}. If you want to omit predicate but pass options, pass null as predicate.

var source = Kefir.sequentially(100, [1, 2, 3, 4, 5]);
var result = source.bufferWhile(function(x) {return x !== 3});
result.log();
> [sequentially.bufferWhile] <value> [1, 2, 3]
> [sequentially.bufferWhile] <value> [4, 5]
> [sequentially.bufferWhile] <end>
source:  ---1---2---3---4---5 X
result:  -----------•--------•X
              [1,2,3]    [4,5]

delayobs.delay(wait)
Delays all events by wait milliseconds, with an exception for the current value of a property, or the end of an already ended observable. Doesn't delay errors.

var source = Kefir.sequentially(200, [1, 2, 3]);
var result = source.delay(100);
result.log();

> [sequentially.delay] <value> 1
> [sequentially.delay] <value> 2
> [sequentially.delay] <value> 3
> [sequentially.delay] <end>
source:  -----1-----2-----3X
result:  --------1-----2-----3X

throttleobs.throttle(wait, [options])
Return a new throttled version of the original observable, which will emit values only at most once every wait milliseconds. If used on a property, the current value will always pass without any delay.

Accepts an optional options object similar to underscore.throttle. By default, it will emit an event as soon as it comes for the first time, and, if any new event comes during the wait period, it will emit the last of them as soon as that period is over. If you'd like to disable the leading-edge emit, pass {leading: false}. And if you'd like to disable the emit on the trailing-edge, pass {trailing: false}.

var source = Kefir.sequentially(750, [1, 2, 3, 4, 5, 6, 7, 8, 9, 0]);
var result = source.throttle(2500);
result.log();
> [sequentially.throttle] <value> 1
> [sequentially.throttle] <value> 4
> [sequentially.throttle] <value> 7
> [sequentially.throttle] <value> 0
> [sequentially.throttle] <end>
source:  --1--2--3--4--5--6--7--8--9--0X
result:  --1---------4---------7---------0X

debounceobs.debounce(wait, [options])
Creates a new debounced version of the original observable. Will emit a value only after wait milliseconds period of no events. Pass {immediate: true} as an options object to cause observable to emit a value on the leading instead of the trailing edge of the wait interval. If used on a property, the current value will always pass without any delay.

var source = Kefir.sequentially(100, [1, 2, 3, 0, 0, 0, 4, 5, 6]);
source = source.filter(function(x) {return x > 0});
var result = source.debounce(250);
result.log();
> [sequentially.filter.debounce] <value> 3
> [sequentially.filter.debounce] <value> 6
> [sequentially.filter.debounce] <end>
source:  ---1---2---3---------------4---5---6X
result:  ----------------------3---------------------6X

flattenobs.flatten([transformer])
For this method it's expected that the source observable emits arrays. The result observable will then emit each element of these arrays.

var source = Kefir.sequentially(100, [[1], [], [2,3]]);
var result = source.flatten();
result.log();
> [sequentially.flatten] <value> 1
> [sequentially.flatten] <value> 2
> [sequentially.flatten] <value> 3
> [sequentially.flatten] <end>
source:  --------•--------•-------- •X
               [1]       []     [2,3]
result:  --------1-----------------23X

You can also provide the transformer function which will be applied to each value from obs observable, and which is supposed to return an array. This makes flatten to be a pretty powerful transformation method. It allows you to do three kinds of transformations on each value: change value (like map), skip value (like filter), and respond with several values to a single value. If you want to skip a value, return an empty array, to change the value — return an array with a single new value, to emit several values — return them in an array.

var source = Kefir.sequentially(100, [1, 2, 3, 4]);
var result = source.flatten(function(x) {
  if (x % 2 === 0) {
    return [x * 10];
  } else {
    return [];
  }
});
result.log();
> [sequentially.flatten] <value> 20
> [sequentially.flatten] <value> 40
> [sequentially.flatten] <end>
source:  ---1---2---3---4X
result:  -------•-------•X
               20      40

See also flatMap

transduceobs.transduce(transducer)
This method allows you to use transducers in Kefir. It supports any transducers implementation that follows the transducer protocol, for example cognitect-labs/transducers-js or jlongster/transducers.js. To learn more about transducers please visit these library pages.

In the example the cognitect-labs/transducers-js library is used.

var t = transducers;
var source = Kefir.sequentially(100, [1, 2, 3, 4, 5, 6]);
var myTransducer = t.comp(
  t.map(function(x) {return x + 10}),
  t.filter(function(x) {return x % 2 === 0}),
  t.take(2)
);
var result = source.transduce(myTransducer);
result.log();
> [sequentially.transduce] <value> 12
> [sequentially.transduce] <value> 14
> [sequentially.transduce] <end>
source:  ---1---2---3---4---5---6X
result:  -------•-------•X
               12      14

withHandlerobs.withHandler(handler)
The most general transformation method. All other transformation methods above can be implemented via withHandler. Will call the handler function on each event from obs observable, passing to it two arguments: an emitter object, and an event object (with same format as in onAny callback).

By default, it will not emit any values or errors, and it will not end when obs observable ends. Instead you should implement the desired behaviour in the handler function, i.e. analyse event object and call emitter methods if necessary. You can call the emitter methods several times in each handler execution, and you can also call them any time later, for example to implement delay.

var source = Kefir.sequentially(100, [0, 1, 2, 3]);
var result = source.withHandler(function(emitter, event) {
  if (event.type === 'end') {
    emitter.emit('bye');
    emitter.end();
  }
  if (event.type === 'value') {
    for (var i = 0; i < event.value; i++) {
      emitter.emit(event.value);
    }
  }
});
result.log();
> [sequentially.withHandler] <value> 1
> [sequentially.withHandler] <value> 2
> [sequentially.withHandler] <value> 2
> [sequentially.withHandler] <value> 3
> [sequentially.withHandler] <value> 3
> [sequentially.withHandler] <value> 3
> [sequentially.withHandler] <value> bye
> [sequentially.withHandler] <end>
source:  ---0---1--- 2---  3 X
result:  -------•---••---••••X
                1   22   333bye

valuesToErrorsobs.valuesToErrors([handler])
Converts values to errors. By default it converts all values to errors, but you can specify a custom handler function to change that. The handler function is called with one argument — a value, and must return an object with two properties {convert: Boolean, error: AnyType}, if convert is set to true, the specified error will be emitted, otherwise the original value will be emitted, and the error property will be ignored.

var source = Kefir.sequentially(100, [0, -1, 2, -3]);
var result = source.valuesToErrors(function(x) {
  return {
    convert: x < 0,
    error: x * 2
  };
});
result.log();
> [sequentially.valuesToErrors] <value> 0
> [sequentially.valuesToErrors] <error> -2
> [sequentially.valuesToErrors] <value> 2
> [sequentially.valuesToErrors] <error> -6
> [sequentially.valuesToErrors] <end>
source:  ---•---•---•---•X
            0  -1   2  -3
result:  ---•---e---•---eX
            0  -2   2  -6

errorsToValuesobs.errorsToValues([handler])
Same as valuesToErrors but vice versa.

var source = Kefir.sequentially(100, [0, -1, 2, -3]).valuesToErrors();
var result = source.errorsToValues(function(x) {
  return {
    convert: x >= 0,
    value: x * 2
  };
});
result.log();
> [sequentially.valuesToErrors.errorsToValues] <value> 0
> [sequentially.valuesToErrors.errorsToValues] <error> -1
> [sequentially.valuesToErrors.errorsToValues] <value> 4
> [sequentially.valuesToErrors.errorsToValues] <error> -3
> [sequentially.valuesToErrors.errorsToValues] <end>
source:  ---e---e---e---eX
            0  -1   2  -3
result:  ---•---e---•---eX
            0  -1   4  -3

mapErrorsobs.mapErrors(fn)
Applies the given fn function to each error from the original observable and emits the error returned by fn.

var source = Kefir.sequentially(100, [0, 1, 2, 3]).valuesToErrors();
var result = source.mapErrors(function(x) {
  return x * 2;
});
result.log();
> [sequentially.valuesToErrors.mapErrors] <error> 0
> [sequentially.valuesToErrors.mapErrors] <error> 2
> [sequentially.valuesToErrors.mapErrors] <error> 4
> [sequentially.valuesToErrors.mapErrors] <error> 6
> [sequentially.valuesToErrors.mapErrors] <end>
source:  ---e---e---e---eX
            0   1   2   3
result:  ---e---e---e---eX
            0   2   4   6

filterErrorsobs.filterErrors([predicate])
Filters errors from the original observable using the given predicate function.

If no predicate is provided, the function(x) {return x} will be used.

var source = Kefir.sequentially(100, [0, 1, 2, 3]).valuesToErrors();
var result = source.filterErrors(function(x) {
  return (x % 2) === 0;
});
result.log();
> [sequentially.valuesToErrors.filterErrors] <error> 0
> [sequentially.valuesToErrors.filterErrors] <error> 2
> [sequentially.valuesToErrors.filterErrors] <end>
source:  ---e---e---e---eX
            0   1   2   3
result:  ---e-------e----X
            0       2

skipErrorsobs.skipErrors()
Ignores all errors from the original observable, emitting only values and end.

var source = Kefir.sequentially(100, [0, -1, 2, -3])
  .valuesToErrors(function(x) {
    return {convert: x < 0, error: x};
  });
var result = source.skipErrors()
result.log();
> [sequentially.valuesToErrors.skipErrors] <value> 0
> [sequentially.valuesToErrors.skipErrors] <value> 2
> [sequentially.valuesToErrors.skipErrors] <end>
source:  ---•---e---•---eX
            0  -1   2  -3
result:  ---•-------•----X
            0       2

skipValuesobs.skipValues()
Ignores all values from the original observable, emitting only errors and end.

var source = Kefir.sequentially(100, [0, -1, 2, -3])
  .valuesToErrors(function(x) {
    return {convert: x < 0, error: x};
  });
var result = source.skipValues()
result.log();
> [sequentially.valuesToErrors.skipValues] <error> -1
> [sequentially.valuesToErrors.skipValues] <error> -3
> [sequentially.valuesToErrors.skipValues] <end>
source:  ---•---e---•---eX
            0  -1   2  -3
result:  -------e-------eX
               -1      -3

endOnErrorobs.endOnError()
Makes an observable to end on first error.

var source = Kefir.sequentially(100, [0, -1, 2, -3])
  .valuesToErrors(function(x) {
    return {convert: x < 0, error: x};
  });
var result = source.endOnError()
result.log();
> [sequentially.valuesToErrors.endOnError] <value> 0
> [sequentially.valuesToErrors.endOnError] <error> -1
> [sequentially.valuesToErrors.endOnError] <end>
source:  ---•---e---•---eX
            0  -1   2  -3
result:  ---•---eX
            0  -1

Combine observables

combineKefir.combine(obss, [passiveObss], [combinator])obs.combine(otherObs, [combinator])Returns a stream. Combines two or more observables together. On each value from any source observable (obss array), emits a combined value, generated by the combinator function from the latest values from each source observable. The combinator function is called with the latest values as arguments. If no combinator is provided, it emits an array containing the latest values.

var a = Kefir.sequentially(100, [1, 3]);
var b = Kefir.sequentially(100, [2, 4]).delay(40);

function sum(a, b) {
  return a + b;
}

var result = Kefir.combine([a, b], sum);
result.log();
> [combine] <value> 3
> [combine] <value> 5
> [combine] <value> 7
> [combine] <end>
a:       ----1----3X
b:       ------2----4X

result:  ------3--5-7X

You can also pass part of the source observables as passiveObss in a second array, the result stream won't emit on values from passiveObss, but all the values will be available in the combinator function.

var a = Kefir.sequentially(100, [1, 3]);
var b = Kefir.sequentially(100, [2, 4]).delay(40);
var c = Kefir.sequentially(60, [5, 6, 7]);

function sum(a, b, c) {
  return a + b + c;
}

var result = Kefir.combine([a, b], [c], sum);
result.log();
> [combine] <value> 9
> [combine] <value> 12
> [combine] <value> 14
> [combine] <end>
a:       ----1----3X
b:       ------2----4X
c:       --5--6--7X

result:  ------•--•-•X
               9 12 14

The result stream emits a value only when it has at least one value from each of source observables. Ends when all the active source observables (obss array) end.

You can also combine two observables by calling a.combine(b, combinator) if you like.

zipKefir.zip(sources, [combinator])obs.zip(otherObs, [combinator])Creates a stream with values from sources lined up with each other. For example if you have two sources with values [1, 2, 3] and [4, 5, 6, 7], the result stream will emit [1, 4], [2, 5], and [3, 6]. The result stream will emit the next value only when it has at least one value from each source.

You can also provide a combinator function. In this case, instead of emitting an array of values, they will be passed to combinator as arguments, and the returned value will be emitted (same as in combine, sampledBy, etc.)

Also in zip you can pass ordinary arrays along with observables in the sources, e.g. Kefir.zip([obs, [1, 2, 3]], fn). In other words, sources is an array of observables and arrays, or only observables of course.

The result stream ends when all sources end.

var a = Kefir.sequentially(100, [0, 1, 2, 3]);
var b = Kefir.sequentially(160, [4, 5, 6]);
var c = Kefir.sequentially(100, [8, 9]).delay(260).toProperty(7);
var result = Kefir.zip([a, b, c]);
result.log();
> [zip] <value> [0, 4, 7]
> [zip] <value> [1, 5, 8]
> [zip] <value> [2, 6, 9]
> [zip] <end>
a:    ----0----1----2----3X
b:    -------4-------5-------6X
c:   7-----------------8----9X

abc:  -------•---------•-----•X
       [0,4,7]   [1,5,8]     [2,6,9]

mergeKefir.merge(obss)obs.merge(otherObs)Merges several obss observables into a single stream i.e., simply repeats values from each source observable. Ends when all obss observables end.

You can also merge two observables by calling a.merge(b), if you like.

var a = Kefir.sequentially(100, [0, 1, 2]);
var b = Kefir.sequentially(100, [0, 1, 2]).delay(30);
var c = Kefir.sequentially(100, [0, 1, 2]).delay(60);
var abc = Kefir.merge([a, b, c]);
abc.log();
> [merge] <value> 0
> [merge] <value> 0
> [merge] <value> 0
> [merge] <value> 1
> [merge] <value> 1
> [merge] <value> 1
> [merge] <value> 2
> [merge] <value> 2
> [merge] <value> 2
> [merge] <end>
a:    ----------0---------1---------2X
b:    ------------0---------1---------2X
c:    --------------0---------1---------2X

abc:  ----------0-0-0-----1-1-1-----2-2-2X

concatKefir.concat(obss)obs.concat(otherObs)Concatenates several obss observables into one stream. Like merge, but it starts emitting values from the next source only after the previous source ends, ignoring any value from the next sources before that.

var a = Kefir.emitter();
var b = Kefir.emitter();
var c = Kefir.emitter();

var abc = Kefir.concat([a, b, c]);
abc.log();

a.emit(0).emit(1);
b.emit(0);
a.emit(2).end();
c.emit(0);
b.emit(1);
c.emit(1);
b.emit(2).end();
c.emit(2).end();
> [concat] <value> 0
> [concat] <value> 1
> [concat] <value> 2
> [concat] <value> 1
> [concat] <value> 2
> [concat] <value> 2
> [concat] <end>
a:    ---0---1---2X
b:    ---------0-----1---2X
c:    -------------0---1---2X

abc:  ---0---1---2---1---2-2X

poolKefir.pool()
Pool is like merge to which you can dynamically add and remove sources. When you create a new pool it has no sources. Then you can add observables to it using the plug method, and remove them using unplug. Pool never ends.

var a = Kefir.emitter();
var b = Kefir.emitter();

var pool = Kefir.pool();
pool.log();

a.emit(1);
b.emit(1);
pool.plug(a);
a.emit(2);
b.emit(2);
pool.plug(b);
a.emit(3);
b.emit(3);
pool.unplug(a);
a.emit(4);
b.emit(4);
a.end();
b.end();
> [pool] <value> 2
> [pool] <value> 3
> [pool] <value> 3
> [pool] <value> 4
a:       ---1-----2-----3----4-----X
b:       ----1------2------3----4---X

plug:    ------a------b------------------
unplug:  -------------------a------------

pool:    ---------2-----3--3----4--------

In this graph plug and unplug are shown just to illustrate moments when we plug and unplug sources. Don't be confused, there are no plug or unplug streams somewhere.

busKefir.bus()
Bus is a pool with emitter methods. You can emit values from it directly. It is the best choice to expose an input from a module, so module users could easily send events to your module directly or by plugging an observable.

var bus = Kefir.bus();
var emitter = Kefir.emitter();
bus.log();

bus.plug(emitter);
bus.emit(1);
emitter.emit(2);
bus.end();
> [bus] <value> 1
> [bus] <value> 2
> [bus] <end>

flatMapobs.flatMap([transform])
Works similar to flatten, but instead of arrays, it handles observables. Like in flatten you can either provide a transform function which will return observables, or you can use the source obs observable that already emits observables.

flatMap ends when obs and all spawned observables end.

var source = Kefir.sequentially(100, [1, 2, 3]);
var result = source.flatMap(function(x) {
  return Kefir.interval(40, x).take(4);
});
result.log();
> [sequentially.flatMap] <value> 1
> [sequentially.flatMap] <value> 1
> [sequentially.flatMap] <value> 1
> [sequentially.flatMap] <value> 2
> [sequentially.flatMap] <value> 1
> [sequentially.flatMap] <value> 2
> [sequentially.flatMap] <value> 2
> [sequentially.flatMap] <value> 3
> [sequentially.flatMap] <value> 2
> [sequentially.flatMap] <value> 3
> [sequentially.flatMap] <value> 3
> [sequentially.flatMap] <value> 3
> [sequentially.flatMap] <end>
source:      ----------1---------2---------3X

spawned 1:             ---1---1---1---1X
spawned 2:                       ---2---2---2---2X
spawned 3:                                 ---3---3---3---3X

result:      -------------1---1---1-2-1-2---2-3-2-3---3---3X

flatMapLatestobs.flatMapLatest([fn])
Like flatMap, but repeats events only from the latest added observable i.e., switching from one observable to another.

var source = Kefir.sequentially(100, [1, 2, 3]);
var result = source.flatMapLatest(function(x) {
  return Kefir.interval(40, x).take(4);
});
result.log();
> [sequentially.flatMapLatest] <value> 1
> [sequentially.flatMapLatest] <value> 1
> [sequentially.flatMapLatest] <value> 2
> [sequentially.flatMapLatest] <value> 2
> [sequentially.flatMapLatest] <value> 3
> [sequentially.flatMapLatest] <value> 3
> [sequentially.flatMapLatest] <value> 3
> [sequentially.flatMapLatest] <value> 3
> [sequentially.flatMapLatest] <end>
source:      ----------1---------2---------3X

spawned 1:             ---1---1---1---1X
spawned 2:                       ---2---2---2---2X
spawned 3:                                 ---3---3---3---3X

result:      -------------1---1-----2---2-----3---3---3---3X

flatMapFirstobs.flatMapFirst([fn])
Like flatMap, but adds a new observable only if the previous one ended. Otherwise, it just ignores the new observable.

var source = Kefir.sequentially(100, [1, 2, 3]);
var result = source.flatMapFirst(function(x) {
  return Kefir.interval(40, x).take(4);
});
result.log();
> [sequentially.flatMapFirst] <value> 1
> [sequentially.flatMapFirst] <value> 1
> [sequentially.flatMapFirst] <value> 1
> [sequentially.flatMapFirst] <value> 1
> [sequentially.flatMapFirst] <value> 3
> [sequentially.flatMapFirst] <value> 3
> [sequentially.flatMapFirst] <value> 3
> [sequentially.flatMapFirst] <value> 3
> [sequentially.flatMapFirst] <end>
source:      ----------1---------2---------3X

spawned 1:             ---1---1---1---1X
spawned 2:                       ---2---2---2---2X
spawned 3:                                 ---3---3---3---3X

result:      -------------1---1---1---1-------3---3---3---3X

flatMapConcatobs.flatMapConcat([fn])
Like flatMapFirst, but instead of ignoring new observables (if the previous one is still alive), it adds them to the queue. Then, when the current source ends, it takes the oldest observable from the queue, and switches to it.

var source = Kefir.sequentially(100, [1, 2, 3]);
var result = source.flatMapConcat(function(x) {
  return Kefir.interval(40, x).take(4);
});
result.log();
> [sequentially.flatMapConcat] <value> 1
> [sequentially.flatMapConcat] <value> 1
> [sequentially.flatMapConcat] <value> 1
> [sequentially.flatMapConcat] <value> 1
> [sequentially.flatMapConcat] <value> 2
> [sequentially.flatMapConcat] <value> 2
> [sequentially.flatMapConcat] <value> 2
> [sequentially.flatMapConcat] <value> 2
> [sequentially.flatMapConcat] <value> 3
> [sequentially.flatMapConcat] <value> 3
> [sequentially.flatMapConcat] <value> 3
> [sequentially.flatMapConcat] <value> 3
> [sequentially.flatMapConcat] <end>
source:      ----------1---------2---------3X

spawned 1:             ---1---1---1---1X
spawned 2:                             ---2---2---2---2X
spawned 3:                                             ---3---3---3---3X

result:      -------------1---1---1---1---2---2---2---2---3---3---3---3X

flatMapConcurLimitobs.flatMapConcurLimit([fn], limit)
Like flatMapConcat, but with a configurable number of concurent sources. In other words flatMapConcat is flatMapConcurLimit(fn, 1).

var source = Kefir.sequentially(100, [1, 2, 3]);
var result = source.flatMapConcurLimit(function(x) {
  return Kefir.interval(40, x).take(6);
}, 2);
result.log();
> [sequentially.flatMapConcurLimit] <value> 1
> [sequentially.flatMapConcurLimit] <value> 1
> [sequentially.flatMapConcurLimit] <value> 1
> [sequentially.flatMapConcurLimit] <value> 2
> [sequentially.flatMapConcurLimit] <value> 1
> [sequentially.flatMapConcurLimit] <value> 2
> [sequentially.flatMapConcurLimit] <value> 1
> [sequentially.flatMapConcurLimit] <value> 2
> [sequentially.flatMapConcurLimit] <value> 1
> [sequentially.flatMapConcurLimit] <value> 2
> [sequentially.flatMapConcurLimit] <value> 3
> [sequentially.flatMapConcurLimit] <value> 2
> [sequentially.flatMapConcurLimit] <value> 3
> [sequentially.flatMapConcurLimit] <value> 2
> [sequentially.flatMapConcurLimit] <value> 3
> [sequentially.flatMapConcurLimit] <value> 3
> [sequentially.flatMapConcurLimit] <value> 3
> [sequentially.flatMapConcurLimit] <value> 3
> [sequentially.flatMapConcurLimit] <end>
source:      ----------1---------2---------3X

spawned 1:             ---1---1---1---1---1---1X
spawned 2:                       ---2---2---2---2---2---2X
spawned 3:                                     ---3---3---3---3---3---3X

result:      -------------1---1---1-2-1-2-1-2-1-2-3-2-3-2-3---3---3---3X

Combine two observables

Just like in the "Modify an observable" section, most of the methods in this section will return an observable of same type as the original observable (on which the method was called).

filterByobs.filterBy(otherObs)
Works like filter, but instead of calling a predicate on each value from obs observable, it checks the last value from otherObs.

var foo = Kefir.sequentially(100, [1, 2, 3, 4, 5, 6, 7, 8]);
var bar = Kefir.sequentially(200, [false, true, false]).delay(40).toProperty(true);
var result = foo.filterBy(bar);
result.log();
> [sequentially.filterBy] <value> 1
> [sequentially.filterBy] <value> 2
> [sequentially.filterBy] <value> 5
> [sequentially.filterBy] <value> 6
> [sequentially.filterBy] <end>
foo:     ----1----2----3----4----5----6----7----8X
bar:    t-----------f---------t---------fX

result:  ----1----2--------------5----6----------X

sampledByobs.sampledBy(otherObs, [combinator])
Returns a stream that emits the latest value from obs observable on each value from otherObs. Ends when otherObs ends.

You can also provide a combinator function which will be used to form the value emitted by the result stream. It is called with the latest values from obs and otherObs as arguments. The default combinator function is function(a, b) {return a}.

var a = Kefir.sequentially(200, [2, 3]).toProperty(1);
var b = Kefir.interval(100, 0).delay(40).take(5);
var result = a.sampledBy(b);
result.log();
> [sequentially.toProperty.sampledBy] <value> 1
> [sequentially.toProperty.sampledBy] <value> 2
> [sequentially.toProperty.sampledBy] <value> 2
> [sequentially.toProperty.sampledBy] <value> 3
> [sequentially.toProperty.sampledBy] <value> 3
> [sequentially.toProperty.sampledBy] <end>
a:      1---------2---------3X
b:       ------0----0----0----0----0X

result:  ------1----2----2----3----3X

takeWhileByobs.takeWhileBy(otherObs)
Works like takeWhile, but instead of using a predicate function it uses another observable. It takes values from obs observable until the first falsey value from otherObs.

Note that it will not produce any value until the first value from otherObs. If that is not what you need, just turn your stream into a property with the current value at true by calling .toProperty(true).

var foo = Kefir.sequentially(100, [1, 2, 3, 4, 5, 6, 7, 8]);
var bar = Kefir.sequentially(200, [true, false, true]).delay(40).toProperty(true);
var result = foo.takeWhileBy(bar);
result.log();
> [sequentially.takeWhileBy] <value> 1
> [sequentially.takeWhileBy] <value> 2
> [sequentially.takeWhileBy] <value> 3
> [sequentially.takeWhileBy] <value> 4
> [sequentially.takeWhileBy] <end>
foo:     ----1----2----3----4----5----6----7----8X
bar:     t----------t---------f---------tX

result:  ----1----2----3----4-X

skipWhileByobs.skipWhileBy(otherObs)
Works like skipWhile, but instead of using a predicate function it uses another observable. It skips values from obs observable until the first falsey value from otherObs.

var foo = Kefir.sequentially(100, [1, 2, 3, 4, 5, 6, 7, 8]);
var bar = Kefir.sequentially(200, [true, false, true]).delay(40);
var result = foo.skipWhileBy(bar);
result.log();
> [sequentially.skipWhileBy] <value> 1
> [sequentially.skipWhileBy] <value> 2
> [sequentially.skipWhileBy] <value> 3
> [sequentially.skipWhileBy] <value> 4
> [sequentially.skipWhileBy] <end>
foo:     ----1----2----3----4----5----6----7----8X
bar:     -----------t---------f---------tX

result:  ------------------------5----6----7----8X

skipUntilByobs.skipUntilBy(otherObs)
Similar to skipWhileBy, but instead of waiting for the first falsey value from otherObs, it waits for just any value from it.

var foo = Kefir.sequentially(100, [1, 2, 3, 4]);
var bar = Kefir.later(250, 0);
var result = foo.skipUntilBy(bar);
result.log();
> [sequentially.skipUntilBy] <value> 3
> [sequentially.skipUntilBy] <value> 4
> [sequentially.skipUntilBy] <end>
foo:     ----1----2----3----4X
bar:     -----------0X

result:  --------------3----4X

takeUntilByobs.takeUntilBy(otherObs)
Similar to takeWhileBy, but instead of waiting for the first falsey value from otherObs, it waits for just any value from it.

var foo = Kefir.sequentially(100, [1, 2, 3, 4]);
var bar = Kefir.later(250, 0);
var result = foo.takeUntilBy(bar);
result.log();
> [sequentially.takeUntilBy] <value> 1
> [sequentially.takeUntilBy] <value> 2
> [sequentially.takeUntilBy] <end>
foo:     ----1----2----3----4X
bar:     -----------0X

result:  ----1----2-X

bufferByobs.bufferBy(otherObs, [options])
Buffers all values from obs observable, and flushes the buffer on each value from otherObs. Also flushes the buffer before end, but you can disable that by passing {flushOnEnd: false} as options.

var foo = Kefir.sequentially(100, [1, 2, 3, 4, 5, 6, 7, 8]).delay(40);
var bar = Kefir.sequentially(300, [1, 2])
var result = foo.bufferBy(bar);
result.log();
> [sequentially.delay.bufferBy] <value> [1, 2]
> [sequentially.delay.bufferBy] <value> [3, 4, 5]
> [sequentially.delay.bufferBy] <value> [6, 7, 8]
> [sequentially.delay.bufferBy] <end>
foo:     ------1----2----3----4----5----6----7----8 X
bar:     --------------1--------------2X

result:  --------------•--------------•------------•X
                  [1, 2]      [3, 4, 5]    [6, 7, 8]

bufferWhileByobs.bufferWhileBy(otherObs, [options])
Similar to bufferWhile, but instead of using a predicate function it uses another observable. On each value from obs observable: if the last value from otherObs was truthy, adds the new value to the buffer, otherwise flushes the buffer (with the new value included). Also flushes the buffer before end, but you can disable that by passing {flushOnEnd: false} as options.

var foo = Kefir.sequentially(100, [1, 2, 3, 4, 5, 6, 7, 8]);
var bar = Kefir.sequentially(200, [false, true, false]).delay(40);
var result = foo.bufferWhileBy(bar);
result.log();
> [sequentially.bufferWhileBy] <value> [1, 2, 3]
> [sequentially.bufferWhileBy] <value> [4]
> [sequentially.bufferWhileBy] <value> [5, 6, 7]
> [sequentially.bufferWhileBy] <value> [8]
> [sequentially.bufferWhileBy] <end>
foo:     ----1----2----3----4----5----6----7----8X
bar:     -----------f---------t---------fX

result:  --------------•----•--------------•----•X
               [1, 2, 3]  [4]      [5, 6, 7]  [8]

awaitingobs.awaiting(otherObs)
Returns a property that represents the awaiting status of two observables, i.e. answers the question «Has obs observable emitted a value since the last value from otherObs observable has been emitted?».

var foo = Kefir.sequentially(100, [1, 2, 3]);
var bar = Kefir.sequentially(100, [1, 2, 3]).delay(40);
var result = foo.awaiting(bar);
result.log();
> [sequentially.awaiting] <value:current> false
> [sequentially.awaiting] <value> true
> [sequentially.awaiting] <value> false
> [sequentially.awaiting] <value> true
> [sequentially.awaiting] <value> false
> [sequentially.awaiting] <value> true
> [sequentially.awaiting] <value> false
> [sequentially.awaiting] <end>
foo:     ----1----2----3X
bar:     ------1----2----3X

result:  f---t-f--t-f--t-fX

Active state

Each stream or property at any given time may be in one of two states — active or inactive. When an observable is in an inactive state, it does not emit any event, and does not subscribe to it's original source. Observables automatically become active when the first listener is added, and become inactive when the last listener is removed.

For example stream = Kefir.fromEvent(el, 'click') won't immediately subscribe to the 'click' event on el, it will subscribe only when the first listener will be added to the stream. And it will automatically unsubscribe when the last listener will be removed from the stream.

var stream = Kefir.fromEvent(el, 'click');
// at this moment event listener to _el_ not added

stream.onValue(someFn);
// now 'click' listener is added to _el_

stream.offValue(someFn);
// and now it is removed again

If one observable depends on another, its active state propagates to its dependencies. For example, in the following code, mapA will activate A, filterMapA will activate mapA and A, mergeAB will activate A and B.

var A = Kefir.emitter();
var B = Kefir.emitter();

var mapA = A.map(function(){ ... });
var filterMapA = mapA.filter(function(){ ... });
var mergeAB = Kefir.merge(A, B);

In fact, active state is just a convention that is strictly followed in the code of Kefir, for better performance. But you are free to not follow it in your custom plugins or combinators.

Note that the current value of a property won't update when that property is inactive. For example, we convert an emitter to a property, then emit some values, while the property has no subscribers (i.e. is inactive). In this case the property won't get those values, and it won't update its current value.

var emitter = Kefir.emitter();
var property = emitter.toProperty(0);

// 1 and 2 won't become the property's current value
emitter.emit(1);
emitter.emit(2);

// now we activate property by subscribing to it,
// and also check current value
property.onValue(function(x) {  console.log(x)  }) // => 0

// these values will become the property's current value
emitter.emit(3);
emitter.emit(4);

property.onValue(function(x) {  console.log(x)  }) // => 4

This issue applies not only to properties, but also to all stateful observables (like take, diff, scan etc.). In some rare cases, to solve this issue, you might need to activate an observable by adding a dummy subscriber. It's ok if you really need this, but don't overuse that pattern. For example obs.map(sideEffect).onValue(function(){}) is a anti-pattern, you should do obs.onValue(sideEffect) instead.

Emitter object

The emitter object has four methods for emitting events. It is used in several places in Kefir as a proxy to emit events to some observable.

The methods names describe themselves pretty clearly:

emitter.emit(123);
emitter.error('Oh, snap!');
emitter.end();

Do not confuse the emitter object with an emitter stream. They both have similar methods, but the emitter object isn't actually a stream, it has no stream methods or functionality. The emitter object has only four methods, that's it.

All emitter object methods are bound to its context, and can be passed safely as callbacks without binding:

// instead of this
$('.foo').on('click', emitter.emit.bind(emitter));

// you can do just this
$('.foo').on('click', emitter.emit);

Errors

Kefir supports an additional channel to pass data through observables — errors. Unlike values, errors normally just flow through the observable chain without any transformation. Consider this example:

function add2(x) {
  return x + 2;
}

function gt3(x) {
  return x > 3;
}

var foo = Kefir.emitter();
var bar = foo.map(add2).filter(gt3);

bar.log();

foo.emit(0);
foo.emit(2);
foo.error(-1);
foo.emit(3);
foo.end();

> [emitter.map.filter] <value> 4
> [emitter.map.filter] <error> -1
> [emitter.map.filter] <value> 5
> [emitter.map.filter] <end>
foo: ---0---2---e---3---X
                -1

bar: -------4---e---5---X
                -1

As you can see values are being mapped and filtered, but errors just flow unchanged. Also notice that observable doesn't end on an error by default, but you can use the endOnError method to make it happen.

With multiple source observables it works same way. Errors from each source show up unchanged in the resulting observable.

function sum(a, b) {
  return a + b;
}

var foo = Kefir.emitter();
var bar = Kefir.emitter();
var baz = Kefir.combine([foo, bar], sum);

baz.log();

foo.emit(1);
bar.emit(2);
foo.error(-1);
foo.emit(3);
bar.error(-2);
bar.emit(4);
foo.end();
bar.end();
> [combine] <value> 3
> [combine] <error> -1
> [combine] <value> 5
> [combine] <error> -2
> [combine] <value> 7
> [combine] <end>
foo: ---1-------e---3-----------X
                -1
bar: -------2-----------e---4-------X
                       -2

baz: -------3---e---5---e---7-------X
                -1      -2

But notice that if a multiple source observable isn't watched for some of it's sources at some point in time it will also not emit errors from them. This applies to observables like concat, flatMapConcat, etc.

Current values/errors in streams

Normally only properties have current values, but sometimes streams may also emit them. I.e. sometimes a stream can emit a value (or error) in response to a first subscription (i.e. on activation), and only the first subscriber will get the value.

In Bacon.js there are even methods that create such streams (.once, .fromArray), but in Kefir we are trying to avoid them as they may cause some confusion for beginners (which actually happens quite a lot for Bacon). So in Kefir it is less easier to create streams emitting current values, but there are still situations when they can be created.

Let see some examples. Each of these streams will emit a value at the moment the first subscriber is added:

var s1 = Kefir.merge([Kefir.constant(1), Kefir.never()]);

var s2 = Kefir.fromBinder(function(emitter) {
  emitter.emit(1);
});

var s3 = Kefir.combine([Kefir.constant(1), Kefir.constant(1)], function(x) {
  return x * x;
});

When a stream emits a value like this, it is internally considered as a current value. The log method will mark it as current, and in onAny subscriber, event.current will be equal to true.

There are some issues with all this, but also some benefits. Issue number one is that only the first subscriber gets this value (or error). Even if it was an onEnd subscriber, it'll still eat all current values, and further subscribers won't get them. Second issue is that it may be considered as a not semantically correct behaviour, i.e., because the moment when the stream emits values depends on when it gets subscribers...

First benefit — it allows you to define the current value in fromBinder. It is easier to show with an example:

var scrollTop = Kefir.fromBinder(function(emitter) {

  function emitScrollY() {
    emitter.emit(window.scrollY);
  }

  emitScrollY(); // here we are emitting the current value

  window.addEventListener('scroll', emitScrollY);

  return function() {
    window.removeEventListener('scroll', emitScrollY);
  };

}).toProperty();

scrollTop.log();
> [fromBinder.toProperty] <value:current> 0
> [fromBinder.toProperty] <value> 4
> [fromBinder.toProperty] <value> 9
> [fromBinder.toProperty] <value> 23

Defining a current value this way is better than scrollYStream.toProperty(getScrollY()) because in the fromBinder example the current value will be pulled at the moment of subscription, and with .toProperty(getScrollY()) it will be pulled at the moment of property creation, and might become obsolete when the property will be used.

The second benefit is that it makes possible to not lose current values when converting properties to streams and then back to properties. For example, combine always returns a stream (why?) but it'll still emit the current value. So one can do Kefir.combine([p1, p2], fn).toProperty(), and get a property combined from two other properties with the correct current value.

Also it's a good practice to convert all streams that might emit current values to properties by using the toProperty method. That should make your code more reliable as all subscribers will get current values. And it's just better semantically as current values should live in the properties.