-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwriter.go
168 lines (152 loc) · 3.57 KB
/
writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
package sse
import (
"bytes"
"io"
"unicode/utf8"
//"github.com/segmentio/asm/utf8" -- can switch to this library in the future if needed
)
// writeState tracks where the Writer currently is within a line of output.
type writeState struct {
	// inMessage is true while a "data: " line has been started and has not yet
	// been ended by a '\n' or '\r'.
	inMessage bool
	// trailingCarriage is true when the last byte passed through Write was a
	// '\r', so that a directly following '\n' can be treated as the second half
	// of the same line ending.
	trailingCarriage bool
}
// Writer formats what is written to it as event-stream lines on an underlying
// io.Writer. It is not safe for concurrent use and is meant for internal usage.
type Writer struct {
	// raw is the destination handed to NewWriter.
	raw io.Writer
	// es records the current line state (open data line, pending '\r').
	es writeState
	// w is the writer every method sends its output to; NewWriter sets it to
	// the same value as raw.
	w io.Writer
	// o holds the options applied in NewWriter.
	o Options
}
// NewWriter returns a Writer that sends its output to w. Every option in opts
// is applied, in order, to a zero-valued Options before the Writer is built.
func NewWriter(w io.Writer, opts ...Option) *Writer {
	var cfg Options
	for i := range opts {
		opts[i](&cfg)
	}

	out := new(Writer)
	out.raw = w
	out.w = w
	out.o = cfg
	return out
}
// writeByte sends the single byte x to the destination writer and reports any
// error returned by that write.
func (e *Writer) writeByte(x byte) error {
	var one [1]byte
	one[0] = x
	if _, err := e.w.Write(one[:]); err != nil {
		return err
	}
	return nil
}
// Flush terminates the line currently being written, if there is one. A '\n'
// is emitted when a data line is still open or when the last byte written was
// a '\r'; otherwise nothing is written. The line state is reset only after the
// '\n' has been written successfully.
func (e *Writer) Flush() error {
	pending := e.es.inMessage || e.es.trailingCarriage
	if !pending {
		return nil
	}
	// One newline covers both situations: it closes an open data line, and it
	// completes a dangling '\r' into a "\r\n" pair.
	if err := e.writeByte('\n'); err != nil {
		return err
	}
	e.es.inMessage = false
	e.es.trailingCarriage = false
	return nil
}
// Next should be called at the end of an event. It calls Flush to finish any
// line still in progress and then writes one more '\n', the blank line that
// marks the start of a new event. The first error encountered is returned.
func (e *Writer) Next() error {
	err := e.Flush()
	if err == nil {
		// the extra newline separates this event from the next one
		err = e.writeByte('\n')
	}
	return err
}
// Field writes a single `name: topic` line to the stream.
//
// When topic is empty nothing is written, except for the "id" field, which is
// emitted as a bare `id` line. If UTF-8 validation is enabled in the options,
// topic must be valid UTF-8; otherwise ErrInvalidUTF8Bytes is returned and
// nothing is written.
//
// If an earlier Write left a line unterminated, that line is ended first (via
// Flush) so the field always starts on a line of its own instead of being
// appended to the open data line.
//
// NOTE(review): name and topic are written verbatim; a '\r' or '\n' inside
// either would end the line early, so callers must pass single-line values.
func (e *Writer) Field(name []byte, topic []byte) error {
	if e.o.ValidateUTF8() && !utf8.Valid(topic) {
		return ErrInvalidUTF8Bytes
	}

	hasValue := len(topic) > 0
	if !hasValue && !bytes.Equal(name, []byte("id")) {
		// An empty value is only emitted for "id"; every other field is skipped.
		return nil
	}

	// Close any line still in progress so the field is not glued onto it.
	if err := e.Flush(); err != nil {
		return err
	}

	if _, err := e.w.Write(name); err != nil {
		return err
	}
	if hasValue {
		if _, err := e.w.Write([]byte(": ")); err != nil {
			return err
		}
		if _, err := e.w.Write(topic); err != nil {
			return err
		}
	}
	return e.writeByte('\n')
}
// ReadFrom copies everything from r into the Writer through Write until r
// reports EOF or an error occurs. It returns the number of bytes copied and
// the first error encountered, if any (EOF is not reported as an error).
func (e *Writer) ReadFrom(r io.Reader) (n int64, err error) {
	// io.Copy uses dst.ReadFrom when the destination implements
	// io.ReaderFrom. Passing e directly would therefore call straight back
	// into this method and recurse without end for any reader that lacks
	// WriteTo. Wrapping e in a struct that exposes only Write hides ReadFrom
	// from io.Copy.
	return io.Copy(struct{ io.Writer }{e}, r)
}
// Write is the underlying write method for piping data into the stream; be
// careful using it directly.
//
// Every line of xs is prefixed with "data: ". A line ends at '\n', at '\r', or
// at a "\r\n" pair; the pair is recognised even when it is split across two
// calls. If xs does not end with a line ending, the last line stays open so a
// later call continues it; Flush or Next terminates it.
//
// When UTF-8 validation is enabled in the options, xs is checked as a whole
// before anything is written and ErrInvalidUTF8Bytes is returned if it is not
// valid. The check is per call, so a multi-byte character split across two
// calls is rejected.
//
// n is the number of bytes of xs that were written before any error occurred;
// the inserted "data: " prefixes are not counted.
func (e *Writer) Write(xs []byte) (n int, err error) {
	if e.o.ValidateUTF8() && !utf8.Valid(xs) {
		return 0, ErrInvalidUTF8Bytes
	}
	for _, x := range xs {
		// A '\n' directly after a '\r' is the second half of a CRLF pair and
		// belongs to the line that just ended. Every other byte needs an open
		// data line, so start one if necessary.
		if !(e.es.trailingCarriage && x == '\n') {
			if e.es.trailingCarriage {
				// the lone '\r' was a complete line ending
				e.es.inMessage = false
			}
			if err = e.checkMessage(); err != nil {
				// report the bytes already consumed, not zero
				return n, err
			}
		}
		// in both cases the pending carriage return has now been dealt with
		e.es.trailingCarriage = false

		if err = e.writeByte(x); err != nil {
			return n, err
		}
		n++

		switch x {
		case '\n':
			// a newline always ends the current line
			e.es.inMessage = false
		case '\r':
			// a carriage return ends the line too, but a '\n' may still follow
			e.es.trailingCarriage = true
			e.es.inMessage = false
		}
	}
	return n, nil
}
// checkMessage makes sure a data line is open. If none is, it marks one as
// open and writes the "data: " prefix, returning any error from that write;
// the line stays marked as open even when the write fails. If a line is
// already open it does nothing.
func (e *Writer) checkMessage() error {
	if e.es.inMessage {
		return nil
	}
	e.es.inMessage = true
	if _, err := e.w.Write([]byte("data: ")); err != nil {
		return err
	}
	return nil
}