-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathmain.go
326 lines (286 loc) · 8.34 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
/*
log2amqp
A simple daemon that reads a file (tail -f style)
and sends every line to an AMQP exchange.
Intended for nginx access logs -- so it does some special
character encoding/escaping for that format.
2014, DECK36 GmbH & Co. KG, <[email protected]>
*/
package main
import (
"flag"
"fmt"
"github.com/ActiveState/tail"
"github.com/streadway/amqp"
"io/ioutil"
"log"
"os"
"os/signal"
"syscall"
"time"
)
// Program identity: sent to the AMQP broker as client properties and
// printed in the verbose start-up message.
const (
	thisVersion = "0.3"
	thisProgram = "log2amqp"
)
// Logline is one line of text read from the watched log file.
// It is a distinct string type only to mark which variables carry our
// payload data, to improve readability (hopefully).
type Logline string
// CommandLineOptions bundles every command line option of the program.
// All fields are pointers because they are the values handed out by the
// flag package.
//
// Keep the field order stable: init() fills the struct with a positional
// (unkeyed) composite literal.
type CommandLineOptions struct {
	filename     *string // -file: filename to watch
	uri          *string // -uri: AMQP URI
	exchangeName *string // -exchange: durable AMQP exchange name
	exchangeType *string // -exchange-type: direct|fanout|topic|x-custom
	routingKey   *string // -key: AMQP routing key
	verbose      *bool   // -v: verbose output
	nofollow     *bool   // -n: quit after the file is read, no state file
}
// options holds the command line settings; it is filled in by init() and
// read by the rest of the program.
var options CommandLineOptions

// init defines all command line flags, stores the flag pointers in options
// and parses the command line.
func init() {
	// Still searching for a nicer pattern to group command line arguments
	// in a struct; for now every field is bound to its flag by name.
	//
	// NOTE(review): the default value of -uri looks mangled (the host part
	// reads like an obfuscated e-mail placeholder) - confirm the intended
	// default.
	options = CommandLineOptions{
		filename:     flag.String("file", "/var/log/syslog", "filename to watch"),
		uri:          flag.String("uri", "amqp://user:[email protected]:5672/vhost", "AMQP URI"),
		exchangeName: flag.String("exchange", "logtest", "Durable AMQP exchange name"),
		exchangeType: flag.String("exchange-type", "fanout", "Exchange type - direct|fanout|topic|x-custom"),
		routingKey:   flag.String("key", "nginxlog", "AMQP routing key"),
		verbose:      flag.Bool("v", false, "Verbose output"),
		nofollow:     flag.Bool("n", false, "Quit after file is read, do not wait for more data, do not read/write state"),
	}
	flag.Parse()
}
// readFileInode returns the inode number that syscall.Stat reports for
// fname, or 0 when the file cannot be stat'ed.
func readFileInode(fname string) uint64 {
	var info syscall.Stat_t
	if err := syscall.Stat(fname, &info); err != nil {
		return 0
	}
	return info.Ino
}
// readStateFile gets the previously saved file state, i.e. inode and offset,
// and decides whether reading of fname can resume where it stopped.
//
// fname is only used in log messages, statefile is the path of the state
// file and current_inode is the inode the log file has right now.
//
// It returns the saved offset when the state file can be parsed and its
// inode equals current_inode. In every other case (no state file, unparsable
// state file, different inode) it returns 0, so the file is read from the
// start.
func readStateFile(fname string, statefile string, current_inode uint64) (offset int64) {
	stateline, err := ioutil.ReadFile(statefile)
	if err != nil {
		return 0 // no state
	}

	// Scan into locals rather than into the named result: a partially
	// parsed or stale offset must never leak out through an early return.
	var (
		savedOffset int64
		savedTime   int64 // written by writeStateFile, informational only
		savedInode  uint64
	)
	n, err := fmt.Sscanf(string(stateline), "Offset %d Time %d Inode %d\n",
		&savedOffset, &savedTime, &savedInode)
	if n != 3 || err != nil {
		log.Printf("ignoring statefile, cannot parse data in %s: %v", statefile, err)
		return 0
	}

	if current_inode != savedInode {
		log.Printf("not resuming file %s, changed inode from %d to %d\n",
			fname, savedInode, current_inode)
		return 0
	}

	log.Printf("resume logfile tail of file %s (inode %d) at offset %d\n",
		fname, savedInode, savedOffset)
	return savedOffset
}
// writeStateFile stores inode and offset (plus the current UTC unix time)
// in statefile so that reading can continue there later. The line format is
// "Offset <offset> Time <unixtime> Inode <inode>".
//
// Saving state is best effort: a write failure is logged, not returned,
// because the caller has no way to react to it.
func writeStateFile(statefile string, inode uint64, offset int64) {
	data := []byte(fmt.Sprintf("Offset %d Time %d Inode %d\n",
		offset, time.Now().UTC().Unix(), inode))
	if err := ioutil.WriteFile(statefile, data, 0664); err != nil {
		log.Printf("cannot write statefile %s: %v", statefile, err)
	}
}
// readLogsFromFile reads log lines from file fname and sends them to `queue`.
// It notifies `shutdown` when the file cannot be opened or when the line
// channel of the tail is closed. Every message received on `savestate` makes
// it persist the current read offset (only when following the file).
//
// Unless the -n option is set, the file is followed and reopened, and
// reading resumes at the offset found in "<fname>.state" if that state still
// belongs to the same inode.
func readLogsFromFile(fname string, queue chan<- Logline, shutdown chan<- string, savestate <-chan bool) {
	var (
		statefile string
		offset    int64
		inode     uint64
	)
	doFollowFile := !*options.nofollow

	if *options.verbose {
		log.Printf("readLogsFromFile: dofollow=%v", doFollowFile)
	}

	// State is only read (and later written) when following the file.
	if doFollowFile {
		statefile = fname + ".state"
		inode = readFileInode(fname)
		offset = readStateFile(fname, statefile, inode)
	}

	// setup
	config := tail.Config{
		Follow:    doFollowFile,
		ReOpen:    doFollowFile,
		MustExist: true,
		Logger:    tail.DiscardingLogger,
		Location: &tail.SeekInfo{
			Offset: offset,
			Whence: 0, // relative to the start of the file
		},
	}

	t, err := tail.TailFile(fname, config)
	if err != nil {
		shutdown <- fmt.Sprintf("cannot tail file %s: %v", fname, err)
		// Without a tail there is nothing to read; continuing would
		// dereference a nil *tail.Tail in the loop below.
		return
	}
	if *options.verbose {
		log.Printf("opened log file %s", fname)
	}

	// now just sleep and wait for input and control channel
	for {
		select {
		case line := <-t.Lines:
			if line == nil {
				// a nil line is what a closed Lines channel delivers
				shutdown <- "Logfile closed"
				return
			}
			queue <- Logline(line.Text)
		case <-savestate:
			position, err := t.Tell()
			if err != nil {
				// Do not overwrite good state with a meaningless offset.
				log.Printf("cannot determine offset in %s: %v", fname, err)
				continue
			}
			if doFollowFile {
				writeStateFile(statefile, inode, position)
			}
			if *options.verbose {
				log.Printf("reading %s, now at offset %d", fname, position)
			}
		}
	}
}
// openAmqpChannel connects to the AMQP server at amqpURI, opens a channel on
// that connection and declares the durable exchange `exchange` of type
// exchangeType on it.
//
// routingKey is accepted for symmetry with the command line options but is
// not used here.
//
// On success it returns the open connection and channel; the caller is
// responsible for closing both. On failure it returns nil, nil and an error
// that names the failed step, and a connection that was already established
// is closed again so it does not leak.
func openAmqpChannel(amqpURI string, exchange string, exchangeType string, routingKey string) (connection *amqp.Connection, channel *amqp.Channel, err error) {
	// this is the important part:
	if *options.verbose {
		log.Println("connecting to ", amqpURI, "...")
	}

	// identify ourselves to the broker
	amqpConfig := amqp.Config{
		Properties: amqp.Table{
			"product": thisProgram,
			"version": thisVersion,
		},
	}

	connection, err = amqp.DialConfig(amqpURI, amqpConfig)
	if err != nil {
		return nil, nil, fmt.Errorf("AMQP Dial: %s", err)
	}

	channel, err = connection.Channel()
	if err != nil {
		// best effort cleanup; the error reported is the original one
		connection.Close()
		return nil, nil, fmt.Errorf("AMQP Channel: %s", err)
	}

	// here we only ensure the AMQP exchange exists
	err = channel.ExchangeDeclare(
		exchange,     // name
		exchangeType, // type
		true,         // durable
		false,        // auto-deleted
		false,        // internal
		false,        // noWait
		nil,          // arguments
	)
	if err != nil {
		// best effort cleanup; closing the connection is expected to
		// tear down the channel opened on it as well
		connection.Close()
		return nil, nil, fmt.Errorf("Exchange Declare: %v", err)
	}

	return connection, channel, nil
}
// failOnError is the general error "handler": it does nothing for a nil
// error and otherwise logs msg together with the error via log.Fatalf,
// which terminates the program.
func failOnError(err error, msg string) {
	if err == nil {
		return
	}
	log.Fatalf("%s: %s", msg, err)
}
// writeLogsToAmqp opens the AMQP connection described by the command line
// options, then publishes every Logline received from `queue` until that
// channel is closed. A closed AMQP channel is reported on `shutdown`.
// TODO: there is too little error handling. Any connect or publish problem
// goes through failOnError, which simply terminates the program.
func writeLogsToAmqp(queue <-chan Logline, shutdown chan<- string) {
	conn, ch, err := openAmqpChannel(
		*options.uri,
		*options.exchangeName,
		*options.exchangeType,
		*options.routingKey,
	)
	failOnError(err, "cannot open AMQP channel")
	if *options.verbose {
		log.Println("opened AMQP connection and checked exchange")
	}
	// deferred calls run in reverse order: channel first, then connection
	defer conn.Close()
	defer ch.Close()

	// watch for the AMQP channel being closed and pass that on
	go func() {
		closed := <-ch.NotifyClose(make(chan *amqp.Error))
		shutdown <- fmt.Sprintf("AMQP server closed connection: %v", closed)
	}()

	for entry := range queue {
		// failOnError only returns when publishing succeeded
		failOnError(publishSingleMessageToAmqp(entry, ch), "AMQP error")
		if *options.verbose {
			fmt.Printf(".")
		}
	}
}
// publishSingleMessageToAmqp publishes one log line on `channel`, using the
// exchange name and routing key from the command line options, and returns
// the error of the Publish call.
//
// The content type is guessed with a simple check: a message that starts
// with '{' and ends with '}' is sent as "application/json", everything else
// (including an empty line) as "text/plain". The body is the message passed
// through Unescape.
func publishSingleMessageToAmqp(message Logline, channel *amqp.Channel) error {
	// simple check of content type; the length guard keeps an empty log
	// line from causing an index-out-of-range panic
	contentType := "text/plain"
	if len(message) >= 2 && message[0] == '{' && message[len(message)-1] == '}' {
		contentType = "application/json"
	}

	return channel.Publish(
		*options.exchangeName, // publish to an exchange
		*options.routingKey,   // routing to 0 or more queues
		false,                 // mandatory
		false,                 // immediate
		amqp.Publishing{
			Headers:         amqp.Table{},
			ContentType:     contentType,
			ContentEncoding: "",
			Body:            Unescape([]byte(message)),
			DeliveryMode:    amqp.Transient, // 1=non-persistent, 2=persistent
			Priority:        0,              // 0-9
			// a bunch of application/implementation-specific fields
		},
	)
}
// osSignalHandler lets the OS tell us to shut down: it blocks until SIGINT,
// SIGTERM or SIGHUP arrives and then reports the signal on `shutdown`.
// If the process is still running two seconds after the signal, it is
// terminated via log.Fatalf.
func osSignalHandler(shutdown chan<- string) {
	incoming := make(chan os.Signal, 1)
	signal.Notify(incoming, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)

	received := <-incoming // this is the blocking part

	// watchdog: armed before the (possibly blocking) send below
	time.AfterFunc(2*time.Second, func() {
		log.Fatalf("shutdown was ignored, bailing out now.\n")
	})

	shutdown <- fmt.Sprintf("received signal %v", received)
}
// main wires the program together: it starts the signal handler, the file
// reader and the AMQP writer as goroutines, asks the reader every two
// seconds to save its state, and then waits for the first shutdown message
// from any of the three before returning.
func main() {
	verbose := *options.verbose
	if verbose {
		log.Printf("Start %s %s", thisProgram, thisVersion)
	}

	// one channel per goroutine to tell us to shutdown (on error)
	sigShutdown := make(chan string)
	fileShutdown := make(chan string)
	amqpShutdown := make(chan string)

	// the main data queue, between reader and writer goroutines
	queue := make(chan Logline)

	// let the OS tell us to shutdown
	go osSignalHandler(sigShutdown)

	// tell the reader goroutine to save its state
	savestate := make(chan bool)

	go readLogsFromFile(*options.filename, queue, fileShutdown, savestate)
	go writeLogsToAmqp(queue, amqpShutdown)

	// keep track of last offset
	ticker := time.NewTicker(2 * time.Second)
	go func() {
		for range ticker.C {
			savestate <- true
		}
	}()

	// wait for whoever asks for shutdown first and remember who it was
	var source, message string
	select {
	case message = <-sigShutdown:
		source = "sig_shutdown"
	case message = <-fileShutdown:
		source = "file_shutdown"
	case message = <-amqpShutdown:
		source = "amqp_shutdown"
	}
	if verbose {
		log.Println(source+":", message)
	}
	if source == "amqp_shutdown" {
		savestate <- true // file reader still alive
	}

	if verbose {
		log.Println("The End.")
	}
}