观察者模式 VS 发布订阅者模式

前端开发
2020年10月23日
2618

在设计模式当中,观察者模式(Observer pattern)发布订阅模式(Publish-subscribe pattern) 这两种应该是很容易被混淆,甚至有些人把这两种模式视为同一种。然而

观察者模式

我们使用观察者模式,目的是为了实现松耦合(loosely coupled)。

举个粟子

我们有一个城市的气象站,每天都会更新天象信息,而每次天气更新时,都会在 update() 里面打印相关的天气信息。

javascript
interface WeatherInfo { date: Date, weather: string, temperature: number, } interface WeatherStation { update: (weatherInfo: WeatherInfo) => void } // 城市气象站 class GuangZhouStation implements WeatherStation { private city: string = '广州'; protected weathers: WeatherInfo[] = []; set weather (weatherInfo: WeatherInfo) { this.weathers.push(weatherInfo); // 当天气信息发生变化时,将会触发 update 方法 this.update(weatherInfo); } getDate (date: Date): string { const D = new Date(date), YY = D.getFullYear(), MM = D.getMonth() + 1, DD = D.getDate(); return `${YY}${(MM + '').padStart(2, '0')}${(DD + '').padStart(2, '0')}日`; } update (weatherInfo: WeatherInfo) { const { date, weather, } = weatherInfo; console.log(`${this.getDate(date)}${this.city},天气:${weather}。`); } } const station = new GuangZhouStation(); const d = new Date(); const currentDate = d.getDate(), endDate = d.getDate() + 10; for (let i = currentDate; i <= endDate; i ++) { d.setDate(i); station.weather = { date: d, weather: Math.random() > .5 ? '晴' : '雨', temperature: parseFloat((Math.random() * 5 + 20).toFixed(1)) } } // logs: // 2020年10月22日,广州,天气:晴。 // 2020年10月23日,广州,天气:晴。 // 2020年10月24日,广州,天气:晴。 // 2020年10月25日,广州,天气:晴。 // 2020年10月26日,广州,天气:雨。 // 2020年10月27日,广州,天气:雨。 // 2020年10月28日,广州,天气:雨。 // 2020年10月29日,广州,天气:雨。 // 2020年10月30日,广州,天气:晴。 // 2020年10月31日,广州,天气:晴。 // 2020年11月01日,广州,天气:雨。

有一天,我们希望能把当天的温度也获取到,那么我们就需要去修改 update() 里面的代码来实现这个功能。

js
update (weatherInfo: WeatherInfo) { const { date, weather, temperature, } = weatherInfo; console.log(`${this.getDate(date)}${this.city},天气:${weather},温度:${temperature}。`); }

日后想获取更多的信息,都需要去修改 update() 里面的代码,这就是紧耦合的坏处。

怎么解决呢?使用观察者模式,面向接口编程,实现松耦合。

167897b2ad53afb3.jpg

在观察者模式中,上面的粟子中的 update() 方法所在的实例对象,就是被观察者(Subject),它只需要维护一套 观察者(Observer) 的集合,这些 Observer 实现相同的接口,Subject 只需要知道,通知 Observer时需要调用哪一个统一方法就好了。

再举个粟子

js
interface Observer { name: string; notify: (name: string, action: string) => void; }; export class Undercover implements Observer { constructor (public name: string) {} /** * 卧底上报(发布)事件 * @param { string } name - 目标人物名称 * @param { string } action - 目标人物行为 */ notify (name: string, action: string) { console.log(`卧底【${this.name}】的汇报:目标人物【${name}】-> ${action}`); } } export class Hongxing { private observers: Observer[] = []; states: string[] = []; constructor (public name: string) {} /** * 派遣卧底(添加观察者) * @param observer - 卧底(观察者) */ addObserver (observer: Observer): void { this.observers.push(observer); console.log(`卧底【${observer.name}】已就位。`); } /** * 撤离卧底(移除观察者) * @param observer - 卧底(观察者) */ removeObserver (observer: Observer): void { const idx: number = this.observers.findIndex((item) => item === observer); if (idx !== -1) { this.observers.splice(idx, 1); console.log(`卧底【${observer.name}】已安全撤离。`); } } /** * 目标人物行为记录 * @param action */ setState (action: string) { this.states.push(action); this.notifyObservers(action); } /** * 所有卧底都会得到目标人物的行为记录,并上报 * @param action - 目标人物的行为 */ notifyObservers (action: string): void { this.observers.forEach((observer) => observer.notify(this.name, action)); } } const target: Hongxing = new Hongxing('山鸡哥'); const zhangsan = new Undercover('张三'), lisi = new Undercover('李四'); // 派出卧底人员接近山鸡哥 target.addObserver(zhangsan); target.addObserver(lisi); target.setState('早上去茶楼饮早茶。'); target.setState('中午去隔离街讲数。'); target.setState('晚上去劈友。'); // 发现浪费警力 // 撤走一名卧底 target.removeObserver(lisi); target.setState('卒.'); // logs: // 卧底【张三】已就位。 // 卧底【李四】已就位。 // 卧底【张三】的汇报:目标人物【山鸡哥】-> 早上去茶楼饮早茶。 // 卧底【李四】的汇报:目标人物【山鸡哥】-> 早上去茶楼饮早茶。 // 卧底【张三】的汇报:目标人物【山鸡哥】-> 中午去隔离街讲数。 // 卧底【李四】的汇报:目标人物【山鸡哥】-> 中午去隔离街讲数。 // 卧底【张三】的汇报:目标人物【山鸡哥】-> 晚上去劈友。 // 卧底【李四】的汇报:目标人物【山鸡哥】-> 晚上去劈友。 // 卧底【李四】已安全撤离。 // 卧底【张三】的汇报:目标人物【山鸡哥】-> 卒.

发布订阅者模式

大概很多人都觉得,发布订阅模式里面的 Publisher,就是观察者模式里面的 Subject(也就是上例中的 Hongxing),而 Subscriber,就是 Observer(上例中的 Undercover)。Publisher 变化时,就主动通知 Subscriber。

其实并不是。

在发布订阅模式里,发布者,并不会直接通知订阅者,换句话说,发布者和订阅者,彼此互不相识。

那么他们如何通讯呢?

答案是:通过第三者,也就是消息队列里面,我们常说的经纪人 Broker。

167897b4e2f5584a.jpg

发布者只需要告诉 Broker,我要发送的消息是:topic 为 AAAA 的消息;

订阅者只需要告诉 Broker,我要订阅的消息是:topic 为 AAAA 的消息;

于是,当 Broker 收到发布者发过来的消息,并且是 topic 为 AAAA 的消息时,就会把消息推送给订阅了 topic 为 AAAA 的订阅者。当然,也可能是订阅者自己来拉取。

举个粟子

js
nterface EventWrapper { [key: string]: Function[] }; class Broker { protected events: EventWrapper = {}; /** * 订阅 * @param topic - 订阅主题 * @param cb - 回调函数 */ subscribe (topic: string, cb: Function) { if (this.events[topic]) { this.events[topic].push(cb); } else { this.events[topic] = [cb]; } } unSubscribe (topic: string, cb: Function) { if (this.events[topic]) { // 移除相关的监听器 this.events[topic] = this.events[topic].filter((listener: Function) => { return cb !== listener }); } } /** * 发布 * @param topic - 订阅主题 * @param args - 参数集合 */ publish (topic: string, ...args: any[]) { if (this.events[topic]) { this.events[topic].forEach((listener) => { listener.call(this, ...args); }); } } } const broker = new Broker(); // 用户 A 通过 broke 订阅了千度网 sport 频道 function userA (something: string) { console.log('用户 A 收到消息:', something); } broker.subscribe('sport', userA); // 用户 B 通过 broke 订阅了千度网 music 频道 function userB (something: string) { console.log('用户 B 收到消息:', something); } broker.subscribe('music', userB); // 千度网 sport,music,finance 频道更新了消息 // 然后通过 broke 来发布消息 broker.publish('sport', '千度网更新了 sport 频道.'); broker.publish('music', '千度网更新了 music 频道.'); broker.publish('finance', '千度网更新了 finance 频道.'); // 订阅了这些频道的用户都可以接收到相关的消息 // 用户 A 收到消息: 千度网更新了 sport 频道. // 用户 B 收到消息: 千度网更新了 music 频道. // 此时,用户 B 觉得这着信息并没什么卵用 // 取消了订阅 broker.unSubscribe('music', userB); // 用户 C 订阅了 finance 频道 function userC (something: string) { console.log('用户 C 收到消息:', something); } broker.subscribe('finance', userC); // 千度网 sport,music,finance 频道更新了消息 // 然后通过 broke 来发布消息 broker.publish('sport', '千度网更新了 sport 频道.'); broker.publish('music', '千度网更新了 music 频道.'); broker.publish('finance', '千度网更新了 finance 频道.'); // 用户 A 收到消息: 千度网更新了 sport 频道. // 用户 C 收到消息: 千度网更新了 finance 频道.

也就是说,在发布订阅者模式里面,发布者和订阅者,不是松耦合,而是完全解耦的。

观察者模式 VS 发布订阅者模式

167897b76f2530d6.jpg

总结

从表面上看:

  • 观察者模式只有两个角色:观察者与被观察者;
  • 发布订阅者模式不仅有发布者和订阅者,还有一个往往会被我们忽略的经纪人Brokey。

往更深层次看:

  • 观察者与被观察者:是松耦合的关系;
  • 发布者与订阅者:完全不存在耦合。

从使用层面上看:

  • 观察者模式,多用于单个应用内容
  • 发布订阅者模式,则更多是一种跨应用的模式(cross-application pattern),比如我们常用的消息中间件。

最后

使用发布订阅者模式来实现一个 NodeJS 里面的 EventEmitter

js
/** * EventEmitter */ export interface EventCallback { (...args: any[]): any; listen?: () => any } export interface EventWrapper { [key: string]: (EventCallback)[] }; class EventEmitter { static defaultMaxListener: number = 10; protected _maxListeners: number | undefined; private _events: EventWrapper; constructor () { this._events = Object.create(null); } get events () { return this._events; } get eventNames () { return Object.keys(this._events); } /** * 获取最大监听数 */ get maxListener () { return this._maxListeners ? this._maxListeners : EventEmitter.defaultMaxListener; } /** * 设置最大监听数 * @param { number } num - 监听数 */ set maxListener (num: number) { this._maxListeners = num; } /** * 监听事件 * @param { string } type - 事件监听类型 * @param { EventCallback } cb - 回调函数 */ on (type: string, cb: EventCallback) { if (this._events[type]) { // 监听的事件不能超过设置的最大监听数 if (this._events[type].length >= this.maxListener) { console.warn('监听的事件不能超过设置的最大监听数。'); } else { this._events[type].push(cb); } } else { this._events[type] = [cb]; } } /** * 只监听一次事件 * @param { string } type - 事件监听类型 * @param { EventCallback } cb - 回调函数 */ once (type: string, cb: (...args: any[]) => any) { const that = this; function wrap (...args: any[]): void { cb(...args); // 当回调函数被调用之后,立即解除监听 that.off(type, wrap); } // 自定义属性 wrap.listen = cb; this.on(type, wrap); } /** * 发布事件 * @param { string } type - 事件类型 * @param { any[] } args - 传递的参数集合 */ emit (type: string, ...args: any[]) { if (this._events[type]) { this._events[type].forEach((listener) => { listener.call(this, ...args); }); } } /** * 移除事件监听 * @param { string } type - 事件监听类型 * @param { EventCallback } cb - 回调函数 */ off (type: string, cb: EventCallback): void { if (this._events[type]) { // 移除相关的监听器 this._events[type] = this._events[type].filter((listener) => { return cb !== listener && cb !== listener.listen; }); } } /** * 移除所有监听 */ clear (): void { this._events = Object.create(null); } } export default EventEmitter;

参考资料:Observer vs Pub-Sub pattern