flume-kafka-storm-redis集群搭建

准备

zookeeper3.4.8、kafka0.9.0.1、jdk8、flume1.6.0、storm0.9.6、redis3.2

配置flume

配置flume/conf/tomcat.conf

# Flume agent "agent1": tail the application log and ship each new line to Kafka.

# Source: run `tail -F` on the log file; every new line becomes one Flume event.
agent1.sources = ngrinder
agent1.sources.ngrinder.type = exec
agent1.sources.ngrinder.command = tail -F /data/boot/logcase/logcase.log
agent1.sources.ngrinder.channels = mc1

# Channel: in-memory buffer between source and sink (holds at most 100 events;
# events are lost if the agent dies — acceptable for this demo setup).
agent1.channels = mc1
agent1.channels.mc1.type = memory
agent1.channels.mc1.capacity = 100

# Sink: publish events to Kafka topic "mytopic" on the local broker.
# requiredAcks=1 waits for the partition leader only; batchSize=20 events per send.
agent1.sinks = avro-sink
agent1.sinks.avro-sink.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.avro-sink.topic = mytopic
agent1.sinks.avro-sink.brokerList = 127.0.0.1:9092
agent1.sinks.avro-sink.requiredAcks = 1
agent1.sinks.avro-sink.batchSize = 20
agent1.sinks.avro-sink.channel = mc1

配置kafka

kafka中建立topic name是mytopic(可以随意,要与tomcat.conf中的sink.topic一致)

日志

启动两个服务循环打印日志到/data/boot/logcase/logcase.log

storm配置

本地调试环境

pom配置

<dependencies>
    <!-- Storm runtime. "compile" scope works for local-mode debugging; switch to
         "provided" before packaging a jar for submission to a real cluster. -->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>0.9.6</version>
        <scope>compile</scope>
    </dependency>
    <!-- Kafka spout for Storm.
         FIX: removed the stray leading space inside <version> — Maven does not
         trim it, so " 0.9.6" fails to resolve. -->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka</artifactId>
        <version>0.9.6</version>
    </dependency>
    <!-- Redis bolt helpers. FIX: removed the leading space inside <version>. -->
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-redis</artifactId>
        <version>2.0.0-SNAPSHOT</version>
        <type>jar</type>
    </dependency>
    <!-- Kafka client (Scala 2.10 build). ZooKeeper and log4j are excluded
         because storm-core ships its own logging/ZK stack. -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.8.2.1</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
            </exclusion>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>com.netflix.curator</groupId>
        <artifactId>curator-framework</artifactId>
        <version>1.3.3</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>com.netflix.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>1.3.3</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
        </exclusions>
        <scope>test</scope>
    </dependency>
    <!-- Redis Java client used by SplitLogTestBolt.
         FIX: removed the leading space inside <version>. -->
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>2.8.0</version>
    </dependency>
    <dependency>
        <groupId>com.netflix.curator</groupId>
        <artifactId>curator-test</artifactId>
        <version>1.3.3</version>
        <exclusions>
            <exclusion>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.testng</groupId>
                <artifactId>testng</artifactId>
            </exclusion>
        </exclusions>
        <scope>test</scope>
    </dependency>
    <!-- FIX: removed the leading space inside <artifactId> — " json-simple"
         is an invalid coordinate and the dependency would never resolve. -->
    <dependency>
        <groupId>com.googlecode.json-simple</groupId>
        <artifactId>json-simple</artifactId>
        <version>1.1</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.mockito</groupId>
        <artifactId>mockito-all</artifactId>
        <version>1.9.0</version>
        <scope>test</scope>
    </dependency>
</dependencies>

Topology配置

public static void main(String[] args) {
    try {
        // Spout configuration: consume the "mytopic" Kafka topic, discovering
        // brokers through the local ZooKeeper ensemble.
        String zkConnect = "127.0.0.1:2181";
        BrokerHosts hosts = new ZkHosts(zkConnect);
        SpoutConfig spoutConf = new SpoutConfig(hosts, "mytopic", "/brokers", "id");
        spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
        spoutConf.zkServers = ImmutableList.of("127.0.0.1");
        spoutConf.zkPort = 2181;
        // spoutConf.forceFromStart = true; // uncomment to replay the topic from the beginning

        // Wire the topology: the Kafka spout feeds every tuple to the
        // log-splitting bolt (allGrouping = each bolt task sees every tuple).
        TopologyBuilder topology = new TopologyBuilder();
        topology.setSpout("spout", new KafkaSpout(spoutConf), 1);
        topology.setBolt("split", new SplitLogTestBolt(), 1).allGrouping("spout");

        Config conf = new Config();
        conf.setDebug(true);
        conf.setNumAckers(0);

        if (args != null && args.length > 0) {
            // Cluster mode: submit under the name given on the command line.
            conf.setNumWorkers(2);
            StormSubmitter.submitTopology(args[0], conf, topology.createTopology());
        } else {
            // Local mode: run an in-process cluster for debugging.
            conf.setMaxTaskParallelism(1);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("special-topology", conf, topology.createTopology());
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
/**
 * Storm bolt that receives raw log lines from the Kafka spout and accumulates
 * statistics (log content, PV/UV, per-user and per-function counters) in Redis.
 */
public class SplitLogTestBolt implements IRichBolt {

    private static final long serialVersionUID = -8159690997705696712L;

    private OutputCollector collector;
    // Redis connection shared by all tuples of this task;
    // opened in prepare(), released in cleanup().
    private Jedis jedis;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        jedis = new Jedis("127.0.0.1", 6379);
    }

    /**
     * Counts and stores every incoming log line in Redis.
     * The tuple is acked exactly once, on both the success and the failure path.
     */
    @Override
    public void execute(Tuple input) {
        try {
            String data = input.getString(0);
            Date curDate = new Date();
            Long logInfoNo = jedis.incr("INFO:num." + curDate); // running count of log lines
            jedis.set("INFO:content." + curDate + "." + logInfoNo, data); // raw INFO log content
            /*if (data != null && data.length() > 0) {
            String ragex = "(.*?)\\[(.*?)\\]\\[(.*?)\\]\\[(.*?)\\] - (.*)";
            Pattern p = Pattern.compile(ragex);
            Matcher m = p.matcher(data);
            if (m.matches()) {
            String date = m.group(1);
            String level = m.group(2);
            String className = m.group(3);
            String threadId = m.group(4);
            String message = m.group(5);
            if (level.equals("ERROR")) {
            errorDo(date, level, className, threadId, message);
            }
            if (level.equals("INFO ")) {
            infoDo(date, level, className, threadId, message);
            }
            }
            }*/
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // BUG FIX: the original acked the tuple TWICE on the exception path
            // (once inside the catch block and once after the try). Ack exactly
            // once here instead.
            collector.ack(input);
        }
    }

    @Override
    public void cleanup() {
        // BUG FIX: release the Redis connection; the original leaked it.
        if (jedis != null) {
            jedis.close();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt: nothing is emitted downstream.
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    /**
     * Handles one INFO-level line: records its content, per-user visit counts,
     * per-function hit counts, and PV/UV counters in Redis.
     * NOTE(review): the user id is hard-coded to 123 in this demo — confirm
     * whether the commented-out per-user variant should be restored.
     */
    private void infoDo(String date, String level, String className, String threadId, String message) {
        try {
            String curDate = date.substring(0, 10).replaceAll("-", "");
            Long curDateL = null;
            Long toHourL = null;
            try {
                Date temp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").parse(date);
                curDateL = temp.getTime();
                // Truncate the timestamp to the start of its hour for the
                // per-hour visit histogram.
                Calendar calendar = Calendar.getInstance();
                calendar.setTime(temp);
                calendar.set(Calendar.MINUTE, 0);
                calendar.set(Calendar.SECOND, 0);
                calendar.set(Calendar.MILLISECOND, 0);
                toHourL = calendar.getTimeInMillis();
            } catch (ParseException e) {
                e.printStackTrace();
            }
            if (curDateL == null || toHourL == null) {
                // BUG FIX: the original continued with null timestamps and threw
                // an NPE on unboxing when the date failed to parse.
                return;
            }
            if (level.equals("INFO ")) {
                String ragex = "方法名:(.*?),消耗时间:(.*?),传入参数:(.*?),返回数据:(.*)";
                Pattern p = Pattern.compile(ragex);
                Matcher m = p.matcher(message);
                if (m.matches()) {
                    System.out.println("789");
                    String functionName = className + "." + m.group(1);
                    String useTime = m.group(2);
                    Long durationTime = DateUtils.durationTime(useTime);
                    // Log bookkeeping.
                    Long logInfoNo = jedis.incr("INFO:num." + curDate); // total log lines for the day
                    jedis.set("INFO:content." + curDate + "." + logInfoNo, date + " " + className + " " + message); // INFO log content
                    jedis.zadd("INFO:infoTime." + curDate, curDateL, logInfoNo.toString()); // log lines ordered by time
                    // Per-user bookkeeping (demo user id 123).
                    Long userVisitTime = jedis.incr("user:visitTime." + curDate + "." + 123); // visits today
                    jedis.zincrby("user:visitTimeSet." + curDate, 1, toHourL.toString()); // visits per hour bucket
                    jedis.zincrby("user:dateVisitTime." + curDate, 1, "123"); // visits per user per day
                    // Per-function bookkeeping.
                    jedis.zincrby("function:" + curDate, 1, functionName); // hit count per function
                    if (durationTime > 100) {
                        jedis.zadd("functionList:" + curDate, durationTime, functionName); // slow calls ranked by duration
                    }
                    // Page views.
                    jedis.incr("pv:num." + curDate);
                    // Unique visitors: count only the user's first visit of the day.
                    if (userVisitTime == 1) {
                        jedis.incr("uv:" + curDate);
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Handles one ERROR-level line: records the error content, per-function
     * error sets, and PV/UV counters in Redis (demo user id 345).
     */
    private void errorDo(String date, String level, String className, String threadId, String message) {
        try {
            String curDate = date.substring(0, 10).replaceAll("-", "");
            Long curDateL = null;
            try {
                curDateL = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").parse(date).getTime();
            } catch (ParseException e) {
                e.printStackTrace();
            }
            if (curDateL == null) {
                // BUG FIX: avoid the unboxing NPE the original hit when the date
                // failed to parse.
                return;
            }
            if (level.equals("ERROR")) {
                String ragex = "方法名:(.*?),消耗时间:(.*?),传入参数:(.*?),返回数据:(.*)";
                Pattern p = Pattern.compile(ragex);
                Matcher m = p.matcher(message);
                if (m.matches()) {
                    System.out.println("111");
                    String functionName = className + m.group(1);
                    String useTime = m.group(2);
                    Long durationTime = DateUtils.durationTime(useTime);
                    Long errorInfoNo = jedis.incr("ERROR:num." + curDate); // total errors for the day
                    Long countErrFunc = jedis.sadd("ERROR:function." + curDate + "." + functionName, errorInfoNo.toString()); // errors per function
                    Long userVisitTime = jedis.incr("user:visitTime." + curDate + "." + 345); // visits today
                    jedis.set("ERROR:content." + curDate + "." + errorInfoNo, date + " " + className + " " + message); // ERROR log content
                    jedis.incr("pv:num." + curDate);
                    jedis.zadd("user:visitError." + curDate, curDateL, errorInfoNo.toString()); // errors ordered by time
                    if (userVisitTime == 1) {
                        jedis.incr("uv:" + curDate);
                    }
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Ad-hoc manual check of the truncate-to-hour logic used by infoDo().
    public static void main(String[] args) {
        Calendar calendar = Calendar.getInstance();
        calendar.setTime(new Date());
        calendar.set(Calendar.MINUTE, 0);
        calendar.set(Calendar.SECOND, 0);
        calendar.set(Calendar.MILLISECOND, 0);
        long toHourL = calendar.getTimeInMillis();
        System.out.println(toHourL);
    }
}

## 启动打印日志的服务器

## 启动 zk、kafka、redis

## 启动 flume

bin/flume-ng agent -c conf -f conf/tomcat.conf -n agent1 -Dflume.root.logger=DEBUG,console

## OK

zhang dong wechat
关注我的微信来交流技术问题吧!