观察者模式 VS 发布订阅者模式
在设计模式当中,观察者模式(Observer pattern) 与 发布订阅模式(Publish-subscribe pattern) 这两种应该是很容易被混淆,甚至有些人把这两种模式视为同一种。然而
观察者模式
我们使用观察者模式,目的是为了实现松耦合(loosely coupled)。
举个粟子
我们有一个城市的气象站,每天都会更新天象信息,而每次天气更新时,都会在 update()
里面打印相关的天气信息。
javascript
interface WeatherInfo {
date: Date,
weather: string,
temperature: number,
}
interface WeatherStation {
update: (weatherInfo: WeatherInfo) => void
}
// 城市气象站
class GuangZhouStation implements WeatherStation {
private city: string = '广州';
protected weathers: WeatherInfo[] = [];
set weather (weatherInfo: WeatherInfo) {
this.weathers.push(weatherInfo);
// 当天气信息发生变化时,将会触发 update 方法
this.update(weatherInfo);
}
getDate (date: Date): string {
const D = new Date(date),
YY = D.getFullYear(),
MM = D.getMonth() + 1,
DD = D.getDate();
return `${YY}年${(MM + '').padStart(2, '0')}月${(DD + '').padStart(2, '0')}日`;
}
update (weatherInfo: WeatherInfo) {
const {
date,
weather,
} = weatherInfo;
console.log(`${this.getDate(date)},${this.city},天气:${weather}。`);
}
}
const station = new GuangZhouStation();
const d = new Date();
const currentDate = d.getDate(),
endDate = d.getDate() + 10;
for (let i = currentDate; i <= endDate; i ++) {
d.setDate(i);
station.weather = {
date: d,
weather: Math.random() > .5 ? '晴' : '雨',
temperature: parseFloat((Math.random() * 5 + 20).toFixed(1))
}
}
// logs:
// 2020年10月22日,广州,天气:晴。
// 2020年10月23日,广州,天气:晴。
// 2020年10月24日,广州,天气:晴。
// 2020年10月25日,广州,天气:晴。
// 2020年10月26日,广州,天气:雨。
// 2020年10月27日,广州,天气:雨。
// 2020年10月28日,广州,天气:雨。
// 2020年10月29日,广州,天气:雨。
// 2020年10月30日,广州,天气:晴。
// 2020年10月31日,广州,天气:晴。
// 2020年11月01日,广州,天气:雨。
有一天,我们希望能把当天的温度也获取到,那么我们就需要去修改 update()
里面的代码来实现这个功能。
js
update (weatherInfo: WeatherInfo) {
const {
date,
weather,
temperature,
} = weatherInfo;
console.log(`${this.getDate(date)},${this.city},天气:${weather},温度:${temperature}。`);
}
日后想获取更多的信息,都需要去修改 update()
里面的代码,这就是紧耦合的坏处。
怎么解决呢?使用观察者模式,面向接口编程,实现松耦合。
在观察者模式中,上面的粟子中的 update()
方法所在的实例对象,就是被观察者(Subject),它只需要维护一套 观察者(Observer) 的集合,这些 Observer 实现相同的接口,Subject 只需要知道,通知 Observer时需要调用哪一个统一方法就好了。
再举个粟子
js
interface Observer {
name: string;
notify: (name: string, action: string) => void;
};
export class Undercover implements Observer {
constructor (public name: string) {}
/**
* 卧底上报(发布)事件
* @param { string } name - 目标人物名称
* @param { string } action - 目标人物行为
*/
notify (name: string, action: string) {
console.log(`卧底【${this.name}】的汇报:目标人物【${name}】-> ${action}`);
}
}
export class Hongxing {
private observers: Observer[] = [];
states: string[] = [];
constructor (public name: string) {}
/**
* 派遣卧底(添加观察者)
* @param observer - 卧底(观察者)
*/
addObserver (observer: Observer): void {
this.observers.push(observer);
console.log(`卧底【${observer.name}】已就位。`);
}
/**
* 撤离卧底(移除观察者)
* @param observer - 卧底(观察者)
*/
removeObserver (observer: Observer): void {
const idx: number = this.observers.findIndex((item) => item === observer);
if (idx !== -1) {
this.observers.splice(idx, 1);
console.log(`卧底【${observer.name}】已安全撤离。`);
}
}
/**
* 目标人物行为记录
* @param action
*/
setState (action: string) {
this.states.push(action);
this.notifyObservers(action);
}
/**
* 所有卧底都会得到目标人物的行为记录,并上报
* @param action - 目标人物的行为
*/
notifyObservers (action: string): void {
this.observers.forEach((observer) => observer.notify(this.name, action));
}
}
const target: Hongxing = new Hongxing('山鸡哥');
const zhangsan = new Undercover('张三'),
lisi = new Undercover('李四');
// 派出卧底人员接近山鸡哥
target.addObserver(zhangsan);
target.addObserver(lisi);
target.setState('早上去茶楼饮早茶。');
target.setState('中午去隔离街讲数。');
target.setState('晚上去劈友。');
// 发现浪费警力
// 撤走一名卧底
target.removeObserver(lisi);
target.setState('卒.');
// logs:
// 卧底【张三】已就位。
// 卧底【李四】已就位。
// 卧底【张三】的汇报:目标人物【山鸡哥】-> 早上去茶楼饮早茶。
// 卧底【李四】的汇报:目标人物【山鸡哥】-> 早上去茶楼饮早茶。
// 卧底【张三】的汇报:目标人物【山鸡哥】-> 中午去隔离街讲数。
// 卧底【李四】的汇报:目标人物【山鸡哥】-> 中午去隔离街讲数。
// 卧底【张三】的汇报:目标人物【山鸡哥】-> 晚上去劈友。
// 卧底【李四】的汇报:目标人物【山鸡哥】-> 晚上去劈友。
// 卧底【李四】已安全撤离。
// 卧底【张三】的汇报:目标人物【山鸡哥】-> 卒.
发布订阅者模式
大概很多人都觉得,发布订阅模式里面的 Publisher,就是观察者模式里面的 Subject(也就是上例中的 Hongxing),而 Subscriber,就是 Observer(上例中的 Undercover)。Publisher 变化时,就主动通知 Subscriber。
其实并不是。
在发布订阅模式里,发布者,并不会直接通知订阅者,换句话说,发布者和订阅者,彼此互不相识。
那么他们如何通讯呢?
答案是:通过第三者,也就是消息队列里面,我们常说的经纪人 Broker。
发布者只需要告诉 Broker,我要发送的消息是:topic 为 AAAA 的消息;
订阅者只需要告诉 Broker,我要订阅的消息是:topic 为 AAAA 的消息;
于是,当 Broker 收到发布者发过来的消息,并且是 topic 为 AAAA 的消息时,就会把消息推送给订阅了 topic 为 AAAA 的订阅者。当然,也可能是订阅者自己来拉取。
举个粟子
js
nterface EventWrapper {
[key: string]: Function[]
};
class Broker {
protected events: EventWrapper = {};
/**
* 订阅
* @param topic - 订阅主题
* @param cb - 回调函数
*/
subscribe (topic: string, cb: Function) {
if (this.events[topic]) {
this.events[topic].push(cb);
} else {
this.events[topic] = [cb];
}
}
unSubscribe (topic: string, cb: Function) {
if (this.events[topic]) {
// 移除相关的监听器
this.events[topic] = this.events[topic].filter((listener: Function) => {
return cb !== listener
});
}
}
/**
* 发布
* @param topic - 订阅主题
* @param args - 参数集合
*/
publish (topic: string, ...args: any[]) {
if (this.events[topic]) {
this.events[topic].forEach((listener) => {
listener.call(this, ...args);
});
}
}
}
const broker = new Broker();
// 用户 A 通过 broke 订阅了千度网 sport 频道
function userA (something: string) {
console.log('用户 A 收到消息:', something);
}
broker.subscribe('sport', userA);
// 用户 B 通过 broke 订阅了千度网 music 频道
function userB (something: string) {
console.log('用户 B 收到消息:', something);
}
broker.subscribe('music', userB);
// 千度网 sport,music,finance 频道更新了消息
// 然后通过 broke 来发布消息
broker.publish('sport', '千度网更新了 sport 频道.');
broker.publish('music', '千度网更新了 music 频道.');
broker.publish('finance', '千度网更新了 finance 频道.');
// 订阅了这些频道的用户都可以接收到相关的消息
// 用户 A 收到消息: 千度网更新了 sport 频道.
// 用户 B 收到消息: 千度网更新了 music 频道.
// 此时,用户 B 觉得这着信息并没什么卵用
// 取消了订阅
broker.unSubscribe('music', userB);
// 用户 C 订阅了 finance 频道
function userC (something: string) {
console.log('用户 C 收到消息:', something);
}
broker.subscribe('finance', userC);
// 千度网 sport,music,finance 频道更新了消息
// 然后通过 broke 来发布消息
broker.publish('sport', '千度网更新了 sport 频道.');
broker.publish('music', '千度网更新了 music 频道.');
broker.publish('finance', '千度网更新了 finance 频道.');
// 用户 A 收到消息: 千度网更新了 sport 频道.
// 用户 C 收到消息: 千度网更新了 finance 频道.
也就是说,在发布订阅者模式里面,发布者和订阅者,不是松耦合,而是完全解耦的。
观察者模式 VS 发布订阅者模式
总结
从表面上看:
- 观察者模式只有两个角色:观察者与被观察者;
- 发布订阅者模式不仅有发布者和订阅者,还有一个往往会被我们忽略的经纪人Brokey。
往更深层次看:
- 观察者与被观察者:是松耦合的关系;
- 发布者与订阅者:完全不存在耦合。
从使用层面上看:
- 观察者模式,多用于单个应用内容
- 发布订阅者模式,则更多是一种跨应用的模式(cross-application pattern),比如我们常用的消息中间件。
最后
使用发布订阅者模式来实现一个 NodeJS
里面的 EventEmitter
。
js
/**
* EventEmitter
*/
export interface EventCallback {
(...args: any[]): any;
listen?: () => any
}
export interface EventWrapper {
[key: string]: (EventCallback)[]
};
class EventEmitter {
static defaultMaxListener: number = 10;
protected _maxListeners: number | undefined;
private _events: EventWrapper;
constructor () {
this._events = Object.create(null);
}
get events () {
return this._events;
}
get eventNames () {
return Object.keys(this._events);
}
/**
* 获取最大监听数
*/
get maxListener () {
return this._maxListeners
? this._maxListeners
: EventEmitter.defaultMaxListener;
}
/**
* 设置最大监听数
* @param { number } num - 监听数
*/
set maxListener (num: number) {
this._maxListeners = num;
}
/**
* 监听事件
* @param { string } type - 事件监听类型
* @param { EventCallback } cb - 回调函数
*/
on (type: string, cb: EventCallback) {
if (this._events[type]) {
// 监听的事件不能超过设置的最大监听数
if (this._events[type].length >= this.maxListener) {
console.warn('监听的事件不能超过设置的最大监听数。');
} else {
this._events[type].push(cb);
}
} else {
this._events[type] = [cb];
}
}
/**
* 只监听一次事件
* @param { string } type - 事件监听类型
* @param { EventCallback } cb - 回调函数
*/
once (type: string, cb: (...args: any[]) => any) {
const that = this;
function wrap (...args: any[]): void {
cb(...args);
// 当回调函数被调用之后,立即解除监听
that.off(type, wrap);
}
// 自定义属性
wrap.listen = cb;
this.on(type, wrap);
}
/**
* 发布事件
* @param { string } type - 事件类型
* @param { any[] } args - 传递的参数集合
*/
emit (type: string, ...args: any[]) {
if (this._events[type]) {
this._events[type].forEach((listener) => {
listener.call(this, ...args);
});
}
}
/**
* 移除事件监听
* @param { string } type - 事件监听类型
* @param { EventCallback } cb - 回调函数
*/
off (type: string, cb: EventCallback): void {
if (this._events[type]) {
// 移除相关的监听器
this._events[type] = this._events[type].filter((listener) => {
return cb !== listener && cb !== listener.listen;
});
}
}
/**
* 移除所有监听
*/
clear (): void {
this._events = Object.create(null);
}
}
export default EventEmitter;