Golang日志分析系统

早知如此绊人心,何如当初莫相识

面向对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package main

import (
"fmt"
"strings"
"time"
)

// LogProccess bundles the two channels of a three-stage pipeline
// (read -> process -> write) with its configuration values.
type LogProccess struct {
	rc, wc      chan string // rc feeds Proccess; wc feeds WriteToImfluxDB
	path        string      // not read by any method below
	influxDBDsn string      // not read by any method below
}

// ReadFromFile sends the fixed string "message" on rc. No file is opened;
// the call blocks until the send on rc can proceed.
func (l *LogProccess) ReadFromFile() {
	l.rc <- "message"
}

// Proccess receives a single value from rc, upper-cases it and sends the
// result on wc.
func (l *LogProccess) Proccess() {
	upper := strings.ToUpper(<-l.rc)
	l.wc <- upper
}

// WriteToImfluxDB receives a single value from wc and prints it to standard
// output followed by a newline.
func (l *LogProccess) WriteToImfluxDB() {
	out := <-l.wc
	fmt.Println(out)
}

// main builds the pipeline, runs each stage in its own goroutine and returns
// once the writer stage has finished.
func main() {
	lp := &LogProccess{
		rc:          make(chan string),
		wc:          make(chan string),
		path:        "/tmp/access.log",
		influxDBDsn: "",
	}

	go lp.ReadFromFile()
	go lp.Proccess()

	// done is closed when the writer stage returns, so main waits for the
	// actual result instead of guessing how long the pipeline needs.
	done := make(chan struct{})
	go func() {
		defer close(done)
		lp.WriteToImfluxDB()
	}()

	select {
	case <-done:
	case <-time.After(time.Second * 1):
		// Upper bound on the wait: give up after one second if a stage
		// is stuck.
	}
}

接口实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package main

import (
"fmt"
"strings"
"time"
)

// Interfaces.

// Reader is the input stage of the pipeline.
type Reader interface {
	// Read is handed the channel rc on which the stage delivers its input.
	Read(rc chan string)
}

// Writer is the output stage of the pipeline.
type Writer interface {
	// Write is handed the channel wc from which the stage takes its data.
	Write(wc chan string)
}

// Concrete types ("classes").

// ReadFromFile is the Reader implementation used by main.
type ReadFromFile struct {
	path string // presumably the log file to read; Read does not consult it here
}

// WriteToInfluxDB is the Writer implementation used by main.
type WriteToInfluxDB struct {
	influxDBDsn string // presumably the InfluxDB connection string; Write does not consult it here
}

// Methods of the concrete types.

// Read sends the fixed string "message" on rc. It blocks until the send can
// proceed and does not touch r.path.
func (r *ReadFromFile) Read(rc chan string) {
	rc <- "message"
}

// Write receives a single value from wc and prints it to standard output
// followed by a newline.
func (w *WriteToInfluxDB) Write(wc chan string) {
	msg := <-wc
	fmt.Println(msg)
}

// LogProccess holds the two pipeline channels together with the pluggable
// reader and writer stages.
type LogProccess struct {
	rc    chan string // input of Proccess
	wc    chan string // output of Proccess
	read  Reader      // reader stage (interface)
	write Writer      // writer stage (interface)
}



// Proccess receives a single value from rc, upper-cases it and sends the
// result on wc.
func (l *LogProccess) Proccess() {
	raw := <-l.rc
	upper := strings.ToUpper(raw)
	l.wc <- upper
}


// main wires a ReadFromFile reader and a WriteToInfluxDB writer into a
// LogProccess, runs each stage in its own goroutine and returns once the
// writer stage has finished.
func main() {
	r := &ReadFromFile{
		path: "gggg",
	}

	w := &WriteToInfluxDB{
		influxDBDsn: "hh",
	}

	lp := &LogProccess{
		rc:    make(chan string),
		wc:    make(chan string),
		read:  r,
		write: w,
	}

	go lp.read.Read(lp.rc)
	go lp.Proccess()

	// done is closed when the writer stage returns, so main waits for the
	// actual result instead of guessing how long the pipeline needs.
	done := make(chan struct{})
	go func() {
		defer close(done)
		lp.write.Write(lp.wc)
	}()

	select {
	case <-done:
	case <-time.After(time.Second * 1):
		// Upper bound on the wait: give up after one second if a stage
		// is stuck.
	}
}

读取文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package main

import (
"bufio"
"fmt"
"io"
"os"
"strings"
"time"
)

// Interfaces.

// Reader is the input stage of the pipeline.
type Reader interface {
	// Read is handed the channel rc on which the stage delivers its input
	// as byte slices.
	Read(rc chan []byte)
}

// Writer is the output stage of the pipeline.
type Writer interface {
	// Write is handed the channel wc from which the stage takes its data.
	Write(wc chan string)
}

// Concrete types ("classes").

// ReadFromFile is the Reader implementation used by main.
type ReadFromFile struct {
	path string // file that Read opens
}

// WriteToInfluxDB is the Writer implementation used by main.
type WriteToInfluxDB struct {
	influxDBDsn string // presumably the InfluxDB connection string; Write does not consult it here
}

// Methods of the concrete types.

// Read tails the file at r.path: it positions itself at the end of the file
// and then sends every line appended afterwards on rc, without the trailing
// '\n'.
//
// Read never returns normally. Once it has caught up with the file it prints
// a separator line and polls again after 500ms. It panics if the file cannot
// be opened or if reading fails with anything other than io.EOF.
func (r *ReadFromFile) Read(rc chan []byte) {
	f, err := os.Open(r.path)
	if err != nil {
		panic(err)
	}
	// The loop below only ends by panicking; the deferred Close still runs
	// while the panic unwinds.
	defer f.Close()

	// Start at the end of the file so that only newly appended lines are
	// reported. A failed seek is deliberately ignored (best effort): reading
	// then simply starts from the current position.
	_, _ = f.Seek(0, io.SeekEnd)
	rd := bufio.NewReader(f)

	// pending holds the beginning of a line whose '\n' has not been written
	// yet. ReadBytes returns the data it read before hitting EOF, so that
	// fragment has to be kept and joined with the rest of the line on a later
	// iteration; dropping it would emit a truncated line.
	var pending []byte
	for {
		chunk, err := rd.ReadBytes('\n')
		if err != nil && err != io.EOF {
			panic(err)
		}
		pending = append(pending, chunk...)

		if err == io.EOF {
			time.Sleep(500 * time.Millisecond)
			fmt.Println("=============")
			continue
		}

		// err is nil here, so pending ends with the '\n' delimiter; strip it.
		rc <- pending[:len(pending)-1]
		pending = nil
	}
}

// Write prints every value received from wc to standard output, one per
// line, and returns once wc has been closed and drained.
func (w *WriteToInfluxDB) Write(wc chan string) {
	for {
		line, open := <-wc
		if !open {
			return
		}
		fmt.Println(line)
	}
}

// LogProccess holds the two pipeline channels together with the pluggable
// reader and writer stages.
type LogProccess struct {
	rc    chan []byte // input of Proccess: raw lines
	wc    chan string // output of Proccess
	read  Reader      // reader stage (interface)
	write Writer      // writer stage (interface)
}



// Proccess converts every value received from rc to an upper-cased string
// and sends it on wc. It returns once rc has been closed and drained.
func (l *LogProccess) Proccess() {
	for {
		raw, open := <-l.rc
		if !open {
			return
		}
		l.wc <- strings.ToUpper(string(raw))
	}
}


// main wires a ReadFromFile reader and a WriteToInfluxDB writer into a
// LogProccess, starts each stage in its own goroutine and keeps the program
// alive for 100 seconds before exiting.
func main() {
	source := &ReadFromFile{path: "./access.log"}
	sink := &WriteToInfluxDB{influxDBDsn: "hh"}

	lp := &LogProccess{
		rc:    make(chan []byte),
		wc:    make(chan string),
		read:  source,
		write: sink,
	}

	go lp.read.Read(lp.rc)
	go lp.Proccess()
	go lp.write.Write(lp.wc)

	// The stages run until the process exits, so main simply sleeps for the
	// length of the demo.
	const runFor = 100 * time.Second
	time.Sleep(runFor)
}