Observer

1. 观察者模式简介

观察者模式描述了一对多的关系,让多个观察者监测到主题,当主题发生改变的时候,能够通知到观察者,让其能更新自己。这么说挺抽象的,我举一些实际应用的例子,如果你有一些开发经验的话,这些应用你肯定也用过。比如redis的发布-订阅功能、Java Swing编程里的源-监听器。

观察者模式中有两个概念比较重要,主题(Subject)又叫被观察者,观察者(Observer)。下图截选自百度百科
title

  • Observer - 观察者,其中定义好update方法
  • Subjedt - 抽象的主题,他是个抽象类,其中维护了Observers
    • 新增Observer
    • 删除Observer
    • 通知Observers的update方法
  • ConcreteSubject - 具体的主题
  • ConcreateObserver - 具体的观察者

代码实现

1
2
3
4
5
6
package observer;
// 观察者
public interface Observer {

void update();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package observer;

import java.util.Enumeration;
import java.util.Vector;
// 主题
public abstract class Subject {

private Vector observerVector = new Vector();

// 注册观察者
public void registerOberver(Observer observer) {
observerVector.add(observer);
}

// 销毁观察者
public void detach(Observer observer) {
observerVector.removeElement(observer);
}

// 通知所有注册的观察者
public void notifyObservers() {

Enumeration elements = observers();
while (elements.hasMoreElements()){
((Observer) elements.nextElement()).update();
}
}

public Enumeration observers() {
return ((Vector) observerVector.clone()).elements();
}

}
1
2
3
4
5
6
7
8
package observer;

public class ConcreteObserver implements Observer {

public void update() {
System.out.println("I am notified.");
}
}
1
2
3
4
5
6
7
8
9
10
11
package observer;

public class ConcreteSubject extends Subject{
private String state;

// 改变主题的方法。
public void change(String newState){
state = newState;
this.notifyObservers();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package observer;

public class Client {

public static void main(String[] args) {

ConcreteSubject subject = new ConcreteSubject();
Observer observer = new ConcreteObserver();
subject.registerOberver(observer);

subject.change("123");
subject.change("456");


}
}

2. Java中的观察者模式

Java提供了实现观察者模式的方法,其中两个核心类,Observable和Observer

  • Observer - 观察者,对应我们写的接口类Observer
  • Observable - 主题,对应我们写的抽象类Subject

下面以问题:公鸡见到太阳升起,就打鸣为例,进行建模

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
package observer.java.example1;

import java.util.Observable;
public class Sun extends Observable {

public void rise(){
System.out.println("太阳升起");
this.setChanged();
// 通知所有的观察者
this.notifyObservers();
}
}
1
2
3
4
5
6
7
8
9
10
11
package observer.java.example1;

import java.util.Observable;
import java.util.Observer;

public class Cock implements Observer {

public void update(Observable o, Object arg) {
System.out.println("公鸡打鸣");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
package observer.java.example1;

public class Client {

public static void main(String[] args) {
Sun sun = new Sun();
Cock cock = new Cock();
sun.addObserver(cock);


sun.rise();
}
}

3. 实际应用

最近写多线程代码是遇到了这样的问题,线程在用JDBC链接HiveServer2时因server不稳定,常常出现Timeout问题,联系运营发现,服务会在某时间段内重启,可能会有服务波动。

针对此问题,简单的思路就是在指定时间段内,发现timeout就重启线程。这里有两个类,观察者Restartor,线程JdbcQuerier。

  • Restartor - 观察者,主要负责观察线程运行状态,并完成重启线程操作
  • JdbcQuerier - 被观察者,该类继承Runnable,在发生异常时,改变状态。

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package observer.java.example2;

import java.util.Observable;
import java.util.Observer;
import java.util.Random;

/**
* @author zyh
* @Description: 观察者,用于重启线程
*/
public class Restartor implements Observer {

public void update(Observable o, Object arg) {
System.out.println("重启线程");
if(condition1()) {
System.out.println("检测到当前时间为9-10点,重启线程");
new Thread(((JdbcQuerier) o)).start();
}else{
o = null;
System.out.println("检测到当前时间不是9-10点,不重启线程");
}
}

// 条件1: 这里模拟重启条件,假如能被2整除就是是9-10点,9-10点也就是服务重启的时间,同样的你可以添加多个条件。
public boolean condition1(){
int i = new Random().nextInt();
if(i % 2 == 0){
return true;
}
return false;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51

package observer.java.example2;

import java.sql.SQLException;
import java.util.Observable;
import java.util.Random;
import java.util.concurrent.TimeoutException;

/**
* @author zyh
* @Description: jdbc查询者,被观察对象
*/
public class JdbcQuerier extends Observable implements Runnable {


public void run() {
System.out.println(Thread.currentThread().getName() + ":" + "线程运行");
try{
// 查询数据库
queryData();
}catch (TimeoutException e1){
System.out.println(Thread.currentThread().getName() + ":" + "检测到Timeout异常");
this.setChanged();
this.notifyObservers();
}catch (SQLException e2){
System.out.println(Thread.currentThread().getName() + ":" + "检测到SQLException异常");
}finally {
System.out.println(Thread.currentThread().getName() + ":" + "线程终止");
}
}

private void queryData() throws SQLException, TimeoutException {
try{
System.out.println(Thread.currentThread().getName() + ":" + "获取连接");
System.out.println(Thread.currentThread().getName() + ":" + "select * from tableName");
int i = new Random().nextInt();
if(i % 2 == 0)
throw new TimeoutException();
else
throw new SQLException();
}catch (SQLException e1){
throw e1;
}catch (TimeoutException e2){
throw e2;
}finally {
System.out.println(Thread.currentThread().getName() + ":" + "关闭连接");
}

}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package observer.java.example2;

/**
* @author zyh
* @Description:
*/
public class Client {

public static void main(String[] args) throws InterruptedException {
// 被观察者
JdbcQuerier jdbcQuerier = new JdbcQuerier();
// 观察者
Restartor restartor = new Restartor();
//注册观察者
jdbcQuerier.addObserver(restartor);

// 运行线程
Thread t1 = new Thread(jdbcQuerier);

t1.start();
while(true){
Thread.sleep(1000L);
}

}
}

方便学习提供源码