task.go

package watcher

import (
	"bytes"
	"compress/zlib"
	"context"
	"fmt"
	"log"
	"net/http"
	"strconv"
	"time"

	"github.com/OneOfOne/xxhash"
	"github.com/shellbear/web-watcher/models"
	"golang.org/x/net/html"
)
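
// updateFormat is a Go reference-time layout for rendering update timestamps.
// It is not referenced in this file and is assumed to be used elsewhere in the
// package.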
const updateFormat = "2006-01-02 15:04:05"

// NewTask adds a new task to the list and runs it.
// If a task already exists for the given URL and channel, the old one is
// canceled and replaced.
func (w *Watcher) NewTask(task *models.Task) {
	taskName := task.URL + task.ChannelID
	ctx, cancel := context.WithCancel(context.Background())

	// Cancel any previous task registered under the same name before
	// overriding it.
	if oldCancel, ok := w.Tasks[taskName]; ok {
		oldCancel()
	}
	w.Tasks[taskName] = cancel

	go func() {
		if err := w.runTask(ctx, task); err != nil {
			log.Printf("Failed to run task for %s. Error: %s\n", task.URL, err)
		}
	}()
}
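
// A hypothetical caller might register a page to watch like this (sketch: the
// Watcher value and any models.Task fields beyond URL and ChannelID are
// assumptions, not shown in this file):
//
//	task := &models.Task{URL: "https://example.com", ChannelID: "1234"}
//	w.NewTask(task)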

// getHash extracts the page body and generates a unique hash for it.
// The hash (xxHash, a fast non-cryptographic hash) is used only to speed up
// future page comparisons.
func (w *Watcher) getHash(resp *http.Response) (string, []byte, error) {
	xxHash := xxhash.New64()

	// Parse the page as HTML.
	doc, err := html.Parse(resp.Body)
	if err != nil {
		return "", nil, err
	}

	// Prefer hashing only the content of the <body> element.
	if bn, err := getBody(doc); err == nil {
		var buf bytes.Buffer
		if err := html.Render(&buf, bn); err != nil {
			return "", nil, err
		}
		body := buf.Bytes()

		// Create a unique hash for the page content.
		if _, err := xxHash.Write(body); err != nil {
			return "", nil, err
		}
		return strconv.FormatUint(xxHash.Sum64(), 10), body, nil
	}

	// No <body> element was found: fall back to hashing the whole rendered
	// document instead of an empty slice, so such pages can still be compared.
	var buf bytes.Buffer
	if err := html.Render(&buf, doc); err != nil {
		return "", nil, err
	}
	body := buf.Bytes()
	if _, err := xxHash.Write(body); err != nil {
		return "", nil, err
	}
	return strconv.FormatUint(xxHash.Sum64(), 10), body, nil
}
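
// A quick way to exercise getHash against known content (sketch; httptest is
// from net/http/httptest and is not imported by this file):
//
//	srv := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
//		fmt.Fprint(rw, "<html><body><p>hello</p></body></html>")
//	}))
//	defer srv.Close()
//
//	resp, _ := http.Get(srv.URL)
//	defer resp.Body.Close()
//	hash, body, err := w.getHash(resp)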

// hasChanged analyzes the page structure, extracts tags, and checks the
// difference ratio between changes.
func (w *Watcher) hasChanged(task *models.Task, body []byte, hash string) (bool, error) {
	// Skip further checks if the hashes are identical.
	if task.Hash == hash {
		log.Println("Hashes are identical. Skipping further checks...")
		return false, nil
	}

	// Check whether the web page content has really changed.
	updated, err := w.checkChanges(task, body)
	if err != nil {
		return false, err
	}

	if updated {
		// Compress the body to decrease its size in the database.
		var b bytes.Buffer
		compress := zlib.NewWriter(&b)
		if _, err = compress.Write(body); err != nil {
			return false, err
		}
		if err = compress.Close(); err != nil {
			return false, err
		}

		if err := w.updateTask(task, hash, b.Bytes()); err != nil {
			return false, err
		}
		return true, nil
	}

	// The hashes differed but checkChanges reported no meaningful change:
	// report false so the caller does not announce an update.
	return false, nil
}
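
// Reading a stored body back requires the inverse transform. A minimal sketch,
// assuming the compressed bytes saved by updateTask are exposed on the model
// (task.Body here is a hypothetical field):
//
//	r, err := zlib.NewReader(bytes.NewReader(task.Body))
//	if err != nil {
//		return err
//	}
//	defer r.Close()
//	previous, err := io.ReadAll(r)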

// analyzeChanges fetches the task URL and reports whether the page changed.
func (w *Watcher) analyzeChanges(task *models.Task) error {
	resp, err := http.Get(task.URL)
	if err != nil {
		return fmt.Errorf("failed to fetch task URL: %w", err)
	}
	defer resp.Body.Close()

	hash, body, err := w.getHash(resp)
	if err != nil {
		return fmt.Errorf("failed to parse and generate hash for page: %w", err)
	}

	updated, err := w.hasChanged(task, body, hash)
	if err != nil {
		return fmt.Errorf("failed to check page changes: %w", err)
	}
	if updated {
		log.Printf("%s has been updated.\n", task.URL)
	}
	return nil
}

// runTask re-runs the task every WatchInterval until its context is canceled.
// A loop is used instead of recursion so long-running tasks do not grow the
// stack on every interval.
func (w *Watcher) runTask(ctx context.Context, task *models.Task) error {
	for {
		log.Println("Crawling:", task.URL)
		if err := w.analyzeChanges(task); err != nil {
			log.Println(err)
		}

		select {
		case <-time.After(w.WatchInterval):
			// Reload the task so changes persisted by other runs are visible.
			if err := w.DB.Find(task, task.ID).Error; err != nil {
				return err
			}
		case <-ctx.Done():
			log.Println("Stopped task for", task.URL)
			return nil
		}
	}
}
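
// Stopping a watch later only requires invoking the cancel function stored in
// w.Tasks. A sketch (RemoveTask is a hypothetical helper, not part of this
// file):
//
//	func (w *Watcher) RemoveTask(task *models.Task) {
//		taskName := task.URL + task.ChannelID
//		if cancel, ok := w.Tasks[taskName]; ok {
//			cancel()
//			delete(w.Tasks, taskName)
//		}
//	}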