SOURCE

/**
 * Shared do-nothing fallback used wherever a handler was not supplied.
 *
 * @returns {undefined}
 */
function noop() {}

/**
 * Wraps a consumer's handlers behind a uniform `next` / `complete` / `error`
 * interface and stops delivering `next` once `complete` has been called.
 *
 * Accepted forms:
 *   new Observer({ next, complete, error })   // object form, every member optional
 *   new Observer(next, complete, error)       // callback form, every callback optional
 *
 * @param {Object|Function} [observer] Handler object, or the `next` callback.
 *   In the callback form the 2nd and 3rd arguments are `complete` and `error`.
 * @throws {Error} When called without `new`.
 */
function Observer(observer) {
  if (!(this instanceof Observer)) {
    throw new Error('Observer must use new to construct');
  }
  this._raw = observer;
  this._complete = false;
  this.observer = {};
  // Forward every argument: the callback form carries `complete` and `error`
  // in positions 1 and 2, which a single-argument call would silently drop.
  this.normalize.apply(this, arguments);
  this._proxy();
}

/**
 * Builds `this.observer`, the normalized handler set, from either accepted form.
 *
 * Handlers are resolved at call time, so members added to a handler object
 * after construction are still picked up. Anything that is not a function is
 * replaced by `noop`. Every handler is invoked with the Observer instance as
 * `this`.
 *
 * @param {Object|Function} [observer] Handler object, or the `next` callback
 *   (followed positionally by `complete`, then `error`).
 * @returns {{next: Function, complete: Function, error: Function}} The
 *   normalized handler set, also stored on `this.observer`.
 */
Observer.prototype.normalize = function (observer) {
  const self = this;
  // Must be a local: an undeclared assignment here would leak a global in
  // sloppy mode and throw a ReferenceError in strict mode / ES modules.
  const callbacks = Array.prototype.slice.call(arguments);
  // `typeof null` is 'object', so null has to be ruled out explicitly.
  const isObjectForm = observer !== null && typeof observer === 'object';

  // Picks the handler for one channel from whichever form was supplied.
  function resolve(name, position) {
    const candidate = isObjectForm ? observer[name] : callbacks[position];
    return typeof candidate === 'function' ? candidate : noop;
  }

  this.observer = {
    next: function () {
      // Values pushed after completion are dropped.
      if (!self._complete) {
        return resolve('next', 0).apply(self, arguments);
      }
    },
    complete: function () {
      self._complete = true;
      return resolve('complete', 1).apply(self, arguments);
    },
    error: function () {
      return resolve('error', 2).apply(self, arguments);
    }
  };
  return this.observer;
};

/**
 * Exposes each own key of `this.observer` as a read-only accessor on the
 * instance, so callers can write `obs.next(v)` instead of
 * `obs.observer.next(v)`.
 *
 * @returns {undefined}
 */
Observer.prototype._proxy = function () {
  const target = this;
  Object.keys(this.observer).forEach(function (key) {
    Object.defineProperty(target, key, {
      // Read through `this.observer` on every access so a later `normalize`
      // call is reflected without redefining the property.
      get: function () {
        return this.observer[key];
      }
    });
  });
};

/**
 * A lazy push source: stores a generator function and runs it once per
 * `subscribe` call.
 *
 * @param {Function} generator Called on each subscription with the wrapped
 *   observer as its only argument and the Observable as `this`.
 * @throws {Error} When called without `new`.
 */
function Observable(generator) {
  if (!(this instanceof Observable)) {
    // Names this constructor; the message previously said "Observer".
    throw new Error('Observable must use new to construct');
  }
  this._generator = generator;
}

/**
 * Wraps the supplied handlers in an `Observer` and runs the generator
 * against it.
 *
 * Every argument is forwarded to the `Observer` constructor unchanged, so
 * extra arguments are not silently dropped here.
 *
 * @param {Object|Function} [observer] Handler object or first callback.
 * @returns {undefined}
 */
Observable.prototype.subscribe = function (observer) {
  const wrapped = new Observer(...arguments);
  this._generator.call(this, wrapped);
};

/**
 * Factory equivalent of `new Observable(generator)`.
 *
 * @param {Function} generator See the constructor.
 * @returns {Observable}
 */
Observable.create = function (generator) {
  return new Observable(generator);
};


// Demo source: pushes 1 and 2, completes, then attempts one more push.
var ob = Observable.create(function (observer) {
  [1, 2].forEach(function (value) {
    observer.next(value);
  });
  observer.complete();
  // Pushed after complete(); the Observer wrapper drops it, so the
  // subscriber below never logs 3.
  observer.next(3);
});

// Subscribe with an object-form observer that logs every notification.
ob.subscribe({
  next(data) {
    console.log(data);
  },
  complete() {
    console.log('complete');
  },
  error(error) {
    console.log(error);
  }
});

console 命令行工具 X clear

                    
>
console