This repository was archived by the owner on Dec 8, 2017. It is now read-only.
forked from jmccarty3/kube2consul
-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathconsul_worker.go
376 lines (313 loc) · 9.66 KB
/
consul_worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
package main // import "github.com/jmccarty3/kube2consul"
import (
"errors"
"fmt"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"github.com/golang/glog"
consulapi "github.com/hashicorp/consul/api"
kapi "k8s.io/kubernetes/pkg/api"
)
// ConsulWorkerConfig is a configuration object for the consul worker.
type ConsulWorkerConfig struct {
// TCPDomain selects the worker implementation in RunConsulWorker: when it is
// empty the agent worker is used; otherwise the catalog worker is used and
// this value is appended to the service name to form the node address.
TCPDomain string
}
// ConsulWorker is the interface for interacting with a Consul object.
type ConsulWorker interface {
// AddDNS registers entries for service under the identity carried by config.
AddDNS(config DNSInfo, service *kapi.Service) error
// RemoveDNS removes the entries previously recorded under config.BaseID.
RemoveDNS(config DNSInfo) error
// SyncDNS reconciles recorded entries with Consul; an implementation may
// treat it as a no-op.
SyncDNS()
// PurgeDNS removes every entry the worker has recorded.
PurgeDNS()
}
// ConsulAgentWorker is a ConsulWorker with a connection to a Consul agent.
type ConsulAgentWorker struct {
// ids maps a DNSInfo.BaseID to the service registrations created for it.
ids map[string][]*consulapi.AgentServiceRegistration
// agent is the Consul API client; the methods skip every Consul call when it
// is nil and only maintain the local bookkeeping.
agent *consulapi.Client
}
// ConsulCatalogWorker operates on Consul's catalog rather than a direct agent.
type ConsulCatalogWorker struct {
// domain is appended to the service name to build the registered node address.
domain string
// agent is the Consul API client; the methods skip every Consul call when it
// is nil and only maintain the local bookkeeping.
agent *consulapi.Client
ids map[string][]string // Keeps track of which services a "BaseID" is associated with.
services map[string]int // Keeps track of how many "nodes" are attempting to use the service.
}
// isServiceNameValid reports whether name is acceptable. A name containing a
// '.' is rejected, and the rejection is logged at Info level.
func isServiceNameValid(name string) bool {
	if strings.Contains(name, ".") {
		glog.Infof("Names containing '.' are not supported: %s\n", name)
		return false
	}
	return true
}
// isServiceValid reports whether service should get DNS records. It must have
// a supported name, a cluster IP set, and be of type NodePort; the reason for
// skipping a service is logged.
func isServiceValid(service *kapi.Service) bool {
	if !isServiceNameValid(service.Name) {
		return false
	}

	// If ClusterIP is not set, do not create DNS records.
	if !kapi.IsServiceIPSet(service) {
		glog.Infof("Skipping dns record for headless service: %s\n", service.Name)
		return false
	}

	// Currently only NodePort services are handled.
	if service.Spec.Type != kapi.ServiceTypeNodePort {
		glog.V(3).Infof("Skipping non-NodePort service: %s\n", service.Name)
		return false
	}

	return true
}
// createAgentServiceCheck builds a TCP check that targets
// "<config.IPAddress>:<port.NodePort>" with a 60s interval.
func createAgentServiceCheck(config DNSInfo, port *kapi.ServicePort) *consulapi.AgentServiceCheck {
	glog.V(3).Info("Creating service check for: ", config.IPAddress, " on Port: ", port.NodePort)

	target := config.IPAddress + ":" + strconv.Itoa(int(port.NodePort))

	check := new(consulapi.AgentServiceCheck)
	check.TCP = target
	check.Interval = "60s"
	return check
}
// createServiceNameFromPort returns "<serviceName>-<suffix>", where the suffix
// is the port's name when it has one and its numeric Port otherwise.
func createServiceNameFromPort(serviceName string, port *kapi.ServicePort) string {
	suffix := port.Name
	if suffix == "" {
		suffix = strconv.Itoa(int(port.Port))
	}
	return serviceName + "-" + suffix
}
// createAgentServiceReg builds the agent registration for one port of service.
//
// The registration ID is config.BaseID followed by the port name, the address
// is config.IPAddress and the port is the NodePort. When name is empty, a
// per-port name is derived with createServiceNameFromPort. The tags are "Kube"
// plus the port protocol.
func createAgentServiceReg(config DNSInfo, name string, service *kapi.Service, port *kapi.ServicePort) *consulapi.AgentServiceRegistration {
	if name == "" {
		name = createServiceNameFromPort(service.Name, port)
	}

	reg := &consulapi.AgentServiceRegistration{
		ID:      config.BaseID + port.Name,
		Name:    name,
		Address: config.IPAddress,
		Port:    int(port.NodePort),
		Tags:    []string{"Kube", string(port.Protocol)},
	}
	return reg
}
// createAgentService builds the catalog service entry for one port of service.
// When name is empty, a per-port name is derived with
// createServiceNameFromPort. The tags are "Kube" plus the port protocol.
func createAgentService(name string, service *kapi.Service, port *kapi.ServicePort) *consulapi.AgentService {
	if name == "" {
		name = createServiceNameFromPort(service.Name, port)
	}

	entry := &consulapi.AgentService{
		Service: name,
		Tags:    []string{"Kube", string(port.Protocol)},
	}
	return entry
}
/*
func createCatalogRegistration(name string, service *kapi.Service, port *kapi.ServicePort) *consulapi.CatalogRegistration {
}
*/
// NewConsulAgentWorker creates a ConsulAgentWorker that talks to client and
// starts with an empty registration table.
func NewConsulAgentWorker(client *consulapi.Client) *ConsulAgentWorker {
	worker := new(ConsulAgentWorker)
	worker.agent = client
	worker.ids = map[string][]*consulapi.AgentServiceRegistration{}
	return worker
}
// AddDNS registers one Consul agent service per port of service and records
// each successful registration under config.BaseID.
//
// It returns an error when config lacks an IP address or BaseID, when the
// service fails isServiceValid, or when at least one port could not be
// registered (the error lists the failed registration IDs). Ports that did
// register stay recorded even if others failed.
func (client *ConsulAgentWorker) AddDNS(config DNSInfo, service *kapi.Service) error {
	glog.V(3).Info("Starting Add DNS for: ", config.BaseID)

	if config.IPAddress == "" || config.BaseID == "" {
		glog.Error("DNS Info is not valid for AddDNS")
		return errors.New("DNS Info invalid")
	}

	if !isServiceValid(service) {
		return errors.New("Service Not Valid")
	}

	// A single-port service is registered under its own name. With several
	// ports the name stays empty so that createAgentServiceReg derives a
	// distinct per-port name.
	var serviceName string
	if len(service.Spec.Ports) == 1 {
		serviceName = service.Name
	}

	var failed []string

	// Iterate by index so the pointer handed to the helpers refers to the
	// slice element rather than to a reused range variable.
	for i := range service.Spec.Ports {
		port := &service.Spec.Ports[i]
		asr := createAgentServiceReg(config, serviceName, service, port)

		// Attach a health check only when checks are enabled and the port is TCP.
		if *argChecks && port.Protocol == "TCP" {
			asr.Check = createAgentServiceCheck(config, port)
		}

		if client.agent != nil {
			if err := client.agent.Agent().ServiceRegister(asr); err != nil {
				// Log the cause as well as the ID; the ID alone does not
				// explain why the registration failed.
				glog.Error("Error creating service record: ", asr.ID, ": ", err)
				failed = append(failed, asr.ID)
				continue
			}
		}

		client.ids[config.BaseID] = append(client.ids[config.BaseID], asr)
	}

	if len(failed) != 0 {
		return fmt.Errorf("Error creating service: %s", failed)
	}

	return nil
}
// RemoveDNS deregisters every service recorded under config.BaseID and then
// forgets that BaseID. Deregistration failures are logged, not returned; an
// unknown BaseID is logged as an error. The returned error is always nil.
func (client *ConsulAgentWorker) RemoveDNS(config DNSInfo) error {
	registrations, found := client.ids[config.BaseID]
	if !found {
		glog.Error("Requested to remove non-existant BaseID DNS of:", config.BaseID)
		return nil
	}

	// Without an agent there is nothing to deregister remotely.
	if client.agent != nil {
		for _, reg := range registrations {
			err := client.agent.Agent().ServiceDeregister(reg.ID)
			if err != nil {
				glog.Error("Error removing service: ", err)
			}
		}
	}

	delete(client.ids, config.BaseID)
	return nil
}
// containsServiceID reports whether any entry in services has the given ID.
func containsServiceID(id string, services map[string]*consulapi.AgentService) bool {
	found := false
	for _, candidate := range services {
		if candidate.ID == id {
			found = true
			break
		}
	}
	return found
}
// SyncDNS verifies that every recorded registration is still known to the
// Consul agent and re-registers any that are missing. It does nothing when the
// worker has no agent. Failures to list services or to re-register one are
// logged; nothing is returned.
func (client *ConsulAgentWorker) SyncDNS() {
	if client.agent == nil {
		return
	}

	services, err := client.agent.Agent().Services()
	if err != nil {
		glog.Info("Error retreiving services from consul during sync: ", err)
		return
	}

	for _, registered := range client.ids {
		for _, service := range registered {
			if containsServiceID(service.ID, services) {
				continue
			}

			glog.Info("Regregistering missing service ID: ", service.ID)
			// Report a failed re-registration instead of dropping the error,
			// otherwise the service silently stays missing.
			if err := client.agent.Agent().ServiceRegister(service); err != nil {
				glog.Error("Error re-registering service ", service.ID, ": ", err)
			}
		}
	}
}
// PurgeDNS removes every recorded BaseID by calling RemoveDNS for each one.
func (client *ConsulAgentWorker) PurgeDNS() {
	for baseID := range client.ids {
		client.RemoveDNS(DNSInfo{BaseID: baseID})
	}
}
// NewConsulCatalogWorker creates a worker that acts on Consul's catalog through
// client, using config.TCPDomain as the address domain. config must not be nil.
func NewConsulCatalogWorker(client *consulapi.Client, config *ConsulWorkerConfig) *ConsulCatalogWorker {
	worker := new(ConsulCatalogWorker)
	worker.agent = client
	worker.domain = config.TCPDomain
	worker.ids = map[string][]string{}
	worker.services = map[string]int{}
	return worker
}
// AddDNS adds service to Consul's catalog and records that config.BaseID uses it.
//
// The catalog entries are written only the first time a service name is seen
// (its usage count is zero); later calls only bump the count. Each port is
// registered as node "<name>-kube2consul" with address "<name>.<domain>".
//
// It returns an error when the service fails isServiceValid or when a catalog
// registration fails. In the latter case nothing is recorded for this call, but
// ports registered before the failure are left in the catalog.
//
// NOTE(review): for multi-port services the node name registered here is built
// from the per-port name, while RemoveDNS deregisters "<service.Name>-kube2consul".
// Those do not match, so such nodes look like they are never deregistered — confirm.
func (client *ConsulCatalogWorker) AddDNS(config DNSInfo, service *kapi.Service) error {
	if !isServiceValid(service) {
		return fmt.Errorf("Service %s is not vaild", service.Name)
	}

	// A single-port service is registered under its own name. With several
	// ports the name stays empty so that createAgentService derives a
	// distinct per-port name.
	var serviceName string
	if len(service.Spec.Ports) == 1 {
		serviceName = service.Name
	}

	// Only register if the service is new to us.
	if client.services[service.Name] == 0 {
		// Iterate by index so the helper gets a pointer to the slice element
		// rather than to a reused range variable.
		for i := range service.Spec.Ports {
			s := createAgentService(serviceName, service, &service.Spec.Ports[i])
			cr := &consulapi.CatalogRegistration{
				Node:    fmt.Sprintf("%s-kube2consul", s.Service),
				Address: fmt.Sprintf("%s.%s", s.Service, client.domain),
				Service: s,
			}

			if client.agent == nil {
				continue
			}
			if _, err := client.agent.Catalog().Register(cr, nil); err != nil {
				// Log here, as the agent worker does: the caller in this
				// file discards the returned error.
				glog.Error("Error registering catalog service ", s.Service, ": ", err)
				return err
			}
		}
	}

	client.ids[config.BaseID] = append(client.ids[config.BaseID], service.Name)
	client.services[service.Name]++
	return nil
}
// RemoveDNS drops config.BaseID's use of every service recorded for it. A
// service whose usage count reaches zero is deregistered from the catalog (node
// "<service>-kube2consul") and forgotten; otherwise only its count is lowered.
// An unknown BaseID is logged as a warning. The returned error is always nil;
// deregistration failures are logged.
//
// NOTE(review): AddDNS names nodes after the per-port service name when a
// service has several ports, so the node name built here may not match what
// was registered for multi-port services — confirm.
func (client *ConsulCatalogWorker) RemoveDNS(config DNSInfo) error {
	services, exists := client.ids[config.BaseID]
	if !exists {
		glog.Warning("Attempted to remove unregistered service:", config.BaseID)
		return nil
	}

	for _, name := range services {
		count, ok := client.services[name]
		if !ok {
			continue
		}

		count--
		if count > 0 {
			// Other BaseIDs still use this service; keep it registered.
			client.services[name] = count
			continue
		}

		if client.agent != nil {
			cdr := &consulapi.CatalogDeregistration{
				Node: fmt.Sprintf("%s-kube2consul", name),
			}
			// Surface the failure; the local bookkeeping is still cleared so
			// behaviour otherwise matches the previous best-effort removal.
			if _, err := client.agent.Catalog().Deregister(cdr, nil); err != nil {
				glog.Error("Error deregistering catalog node ", cdr.Node, ": ", err)
			}
		}
		delete(client.services, name)
	}

	delete(client.ids, config.BaseID)
	return nil
}
// SyncDNS does nothing on the catalog worker; it exists so that
// ConsulCatalogWorker provides the SyncDNS method of ConsulWorker.
func (*ConsulCatalogWorker) SyncDNS() {
// Nothing to sync.
}
// PurgeDNS removes every recorded BaseID by calling RemoveDNS for each one.
func (client *ConsulCatalogWorker) PurgeDNS() {
	for baseID := range client.ids {
		client.RemoveDNS(DNSInfo{BaseID: baseID})
	}
}
// cleanup blocks until a receive on c completes, purges every DNS entry held by
// worker, and then terminates the process with exit status 0. It never returns.
func cleanup(c <-chan os.Signal, worker ConsulWorker) {
// Wait for the signal before doing anything.
<-c
glog.Info("Trapped termination signal. Purging DNS")
// The purge must finish before the process exits.
worker.PurgeDNS()
glog.Info("DNS Purged. Exiting")
os.Exit(0)
}
// RunConsulWorker processes work items from queue until the queue is closed.
//
// An empty config.TCPDomain selects the agent worker; any other value selects
// the catalog worker. A goroutine is started that purges all DNS entries and
// exits the process on SIGINT or SIGTERM. Errors returned by the worker's
// AddDNS and RemoveDNS are not inspected here.
//
// NOTE(review): the signal goroutine calls PurgeDNS on the same worker this
// loop uses and no locking is visible in this file — confirm that is safe.
func RunConsulWorker(queue <-chan ConsulWork, client *consulapi.Client, config ConsulWorkerConfig) {
	var worker ConsulWorker
	switch config.TCPDomain {
	case "":
		worker = NewConsulAgentWorker(client)
	default:
		worker = NewConsulCatalogWorker(client, &config)
	}

	// Purge registrations on termination signals.
	term := make(chan os.Signal, 1)
	signal.Notify(term, syscall.SIGINT, syscall.SIGTERM)
	go cleanup(term, worker)

	for item := range queue {
		glog.V(4).Info("Consol Work Action: ", item.Action, " BaseID:", item.Config.BaseID)

		switch item.Action {
		case ConsulWorkAddDNS:
			worker.AddDNS(item.Config, item.Service)
		case ConsulWorkRemoveDNS:
			worker.RemoveDNS(item.Config)
		case ConsulWorkSyncDNS:
			worker.SyncDNS()
		default:
			glog.Error("Unsupported Action of: ", item.Action)
		}
	}
}