Commit 229aa5ef authored by Joe LeGasse's avatar Joe LeGasse

Initial pass at the relay. Currently only HTTP works.

parent 23f73b6b
package main
import (
"flag"
"fmt"
"log"
"os"
"os/signal"
"github.com/influxdata/influxdb-relay/relay"
)
var (
configFile = flag.String("config", "", "Configuration file to use")
)
func main() {
flag.Parse()
if *configFile == "" {
fmt.Fprintln(os.Stderr, "Missing configufation file")
flag.PrintDefaults()
os.Exit(1)
}
cfg, err := relay.LoadConfigFile(*configFile)
if err != nil {
fmt.Fprintln(os.Stderr, "Problem loading config file:", err)
}
r, err := relay.New(cfg)
if err != nil {
log.Fatal(err)
}
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt)
go func() {
<-sigChan
r.Stop()
}()
log.Println("starting relays...")
r.Run()
}
package relay
import (
"os"
"github.com/naoina/toml"
)
type Config struct {
HTTPRelays []HTTPConfig `toml:"http"`
UDPRelays []UDPConfig `toml:"udp"`
}
type HTTPConfig struct {
// Name identifies the HTTP relay
Name string `toml:"name"`
// Addr should be set to the desired listening host:port
Addr string `toml:"bind-addr"`
// Outputs is a list of backed servers where writes will be forwarded
Outputs []HTTPOutputConfig `toml:"output"`
}
type HTTPOutputConfig struct {
// Name of the backend server
Name string `toml:"name"`
// Location should be set to the URL of the backend server's write endpoint
Location string `toml:"location"`
// Timeout sets a per-backend timeout for write requests. (Default 10s)
// The format used is the same seen in time.ParseDuration
Timeout string `toml:"timeout"`
}
type UDPConfig struct {
// Name identifies the UDP relay
Name string `toml:"name"`
// Addr is where the UDP relay will listen for packets
Addr string `toml:"bind-addr"`
// Outputs is a list of backend servers where writes will be forwarded
Outputs []UDPOutputConfig `toml:"output"`
}
type UDPOutputConfig struct {
// Name identifies the UDP backend
Name string `toml:"name"`
// Location should be set to the host:port of the backend server
Location string `toml:"location"`
}
// LoadConfigFile parses the specified file into a Config object
func LoadConfigFile(filename string) (cfg Config, err error) {
f, err := os.Open(filename)
if err != nil {
return cfg, err
}
defer f.Close()
return cfg, toml.NewDecoder(f).Decode(&cfg)
}
package relay
import (
"bytes"
"compress/gzip"
"fmt"
"io"
"log"
"net"
"net/http"
"sync"
"sync/atomic"
"time"
"github.com/influxdata/influxdb/models"
)
// HTTP is a relay for HTTP influxdb writes
type HTTP struct {
addr string
name string
closing int64
l net.Listener
backends []*httpBackend
}
func NewHTTP(cfg HTTPConfig) (Relay, error) {
h := new(HTTP)
h.addr = cfg.Addr
h.name = cfg.Name
for i := range cfg.Outputs {
b := &cfg.Outputs[i]
if b.Name == "" {
b.Name = b.Location
}
h.backends = append(h.backends, &httpBackend{b.Name, b.Location})
}
return h, nil
}
func (h *HTTP) Name() string {
if h.name == "" {
return "http://" + h.addr
}
return h.name
}
func (h *HTTP) Run() error {
l, err := net.Listen("tcp", h.addr)
if err != nil {
return err
}
h.l = l
log.Printf("Starting http relay %q on %v", h.Name(), h.addr)
err = http.Serve(l, h)
if atomic.LoadInt64(&h.closing) != 0 {
return nil
}
return err
}
func (h *HTTP) Stop() error {
atomic.StoreInt64(&h.closing, 1)
return h.l.Close()
}
func (h *HTTP) ServeHTTP(w http.ResponseWriter, r *http.Request) {
start := time.Now()
if r.URL.Path != "/write" {
jsonError(w, http.StatusNotFound, "invalid write endpoint")
return
}
if r.Method != "POST" {
w.Header().Set("Allow", "POST")
if r.Method == "OPTIONS" {
w.WriteHeader(http.StatusNoContent)
} else {
jsonError(w, http.StatusMethodNotAllowed, "invalid write method")
}
return
}
// fail early if we're missing the database
if r.URL.Query().Get("db") == "" {
jsonError(w, http.StatusBadRequest, "missing parameter: db")
return
}
var body = r.Body
if r.Header.Get("Content-Encoding") == "gzip" {
b, err := gzip.NewReader(r.Body)
if err != nil {
jsonError(w, http.StatusBadRequest, "unable to decode gzip body")
}
defer b.Close()
body = b
}
bodyBuf := getBuf()
_, err := bodyBuf.ReadFrom(body)
if err != nil {
putBuf(bodyBuf)
jsonError(w, http.StatusInternalServerError, "problem reading request body")
return
}
precision := r.URL.Query().Get("precision")
points, err := models.ParsePointsWithPrecision(bodyBuf.Bytes(), start, precision)
if err != nil {
putBuf(bodyBuf)
jsonError(w, http.StatusBadRequest, "unable to parse points")
return
}
outBuf := getBuf()
for _, p := range points {
if _, err = outBuf.WriteString(p.PrecisionString(precision)); err != nil {
break
}
if err = outBuf.WriteByte('\n'); err != nil {
break
}
}
// done with the input points
putBuf(bodyBuf)
if err != nil {
putBuf(outBuf)
jsonError(w, http.StatusInternalServerError, "problem writing points")
return
}
defer putBuf(outBuf)
outBytes := outBuf.Bytes()
var wg sync.WaitGroup
wg.Add(len(h.backends))
var responses = make(chan *http.Response, len(h.backends))
for _, b := range h.backends {
b := b
go func() {
defer wg.Done()
resp, err := b.post(outBytes, r.URL.RawQuery)
if err != nil {
log.Printf("Problem posting to relay %q backend %q: %v", h.Name(), b.name, err)
responses <- nil
} else {
if resp.StatusCode/100 != 2 {
log.Printf("Non-2xx response for relay %q backend %q: %v", h.Name(), b.name, resp.StatusCode)
}
responses <- resp
}
}()
}
go func() {
wg.Wait()
close(responses)
}()
var responded bool
var errResponse *http.Response
for resp := range responses {
if resp == nil {
continue
}
if responded {
resp.Body.Close()
continue
}
if resp.StatusCode/100 == 2 { // points written successfully
w.WriteHeader(http.StatusNoContent)
responded = true
resp.Body.Close()
continue
}
if errResponse != nil {
resp.Body.Close()
continue
}
// hold on to one of the responses to return back to the client
errResponse = resp
}
if responded {
// at least one success
if errResponse != nil {
errResponse.Body.Close()
}
return
}
// no successful writes
if errResponse == nil {
// failed to make any valid request... network error?
jsonError(w, http.StatusInternalServerError, "unable to write points")
return
}
// errResponse has our answer...
for _, s := range []string{"Content-Type", "Content-Length", "Content-Encoding"} {
if v := errResponse.Header.Get(s); v != "" {
w.Header().Set(s, v)
}
}
w.WriteHeader(errResponse.StatusCode)
io.Copy(w, errResponse.Body)
errResponse.Body.Close()
}
var bufPool = sync.Pool{New: func() interface{} { return new(bytes.Buffer) }}
func getBuf() *bytes.Buffer {
return bufPool.Get().(*bytes.Buffer)
}
func putBuf(b *bytes.Buffer) {
b.Reset()
bufPool.Put(b)
}
func jsonError(w http.ResponseWriter, code int, message string) {
w.Header().Set("Content-Type", "application/json")
data := fmt.Sprintf("{\"error\":%q}\n", message)
w.Header().Set("Content-Length", fmt.Sprint(len(data)))
w.WriteHeader(code)
w.Write([]byte(data))
}
type httpBackend struct {
name string
location string
}
func (b *httpBackend) post(buf []byte, query string) (*http.Response, error) {
req, err := http.NewRequest("POST", b.location, bytes.NewReader(buf))
if err != nil {
return nil, err
}
req.URL.RawQuery = query
req.Header.Set("Content-Type", "text/plain")
req.Header.Set("Content-Length", fmt.Sprint(len(buf)))
return http.DefaultClient.Do(req)
}
package relay
import (
"fmt"
"log"
"sync"
)
type Service struct {
relays map[string]Relay
}
func New(config Config) (*Service, error) {
s := new(Service)
s.relays = make(map[string]Relay)
for _, cfg := range config.HTTPRelays {
h, err := NewHTTP(cfg)
if err != nil {
return nil, err
}
if s.relays[h.Name()] != nil {
return nil, fmt.Errorf("duplicate relay: %q", h.Name())
}
s.relays[h.Name()] = h
}
for _, cfg := range config.UDPRelays {
u, err := NewUDP(cfg)
if err != nil {
return nil, err
}
if s.relays[u.Name()] != nil {
return nil, fmt.Errorf("duplicate relay: %q", u.Name())
}
s.relays[u.Name()] = u
}
return s, nil
}
func (s *Service) Run() {
var wg sync.WaitGroup
wg.Add(len(s.relays))
for k := range s.relays {
relay := s.relays[k]
go func() {
defer wg.Done()
if err := relay.Run(); err != nil {
log.Printf("Error running relay %q: %v", relay.Name(), err)
}
}()
}
wg.Wait()
}
func (s *Service) Stop() {
for _, v := range s.relays {
v.Stop()
}
}
type Relay interface {
Name() string
Run() error
Stop() error
}
package relay
// UDP is a relay for UDP influxdb writes
type UDP struct {
config *UDPConfig
}
func NewUDP(config UDPConfig) (Relay, error) {
panic("unimplemented")
}
[[http]]
name = "example"
bind-addr = "127.0.0.1:9096"
output = [
{ name="local1", location = "http://127.0.0.1:8086/write" },
{ name="local2", location = "http://127.0.0.1:7086/write" },
]
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment