linux
jajabin, 2019-07-24 19:49:40

How to determine the correct number of Apache Kafka partitions?

There is a producer that writes NetFlow v9 messages to a Kafka topic, and I have implemented a consumer that processes the messages from Kafka in 4 goroutines. Since I am new to this, I would like to know the correct way to set this up: create a topic with 4 partitions and run one consumer goroutine per partition (roughly as in the sketch below), or leave everything in a single thread? I would appreciate advice on how to correctly implement everything in my full code, which follows the sketch ...
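To make the first option concrete, this is a minimal sketch of the per-partition layout I have in mind, assuming the same broker address and topic name as in my code, with one reader pinned to each partition:

package main

import (
  "context"
  "log"

  "github.com/segmentio/kafka-go"
)

func main() {
  const partitions = 4
  for p := 0; p < partitions; p++ {
    go func(partition int) {
      // One reader per partition, no consumer group.
      r := kafka.NewReader(kafka.ReaderConfig{
        Brokers:   []string{"127.0.0.1:9092"},
        Topic:     "netflow",
        Partition: partition,
      })
      defer r.Close()
      for {
        m, err := r.ReadMessage(context.Background())
        if err != nil {
          log.Println(err)
          continue
        }
        log.Printf("partition %d offset %d: %d bytes", m.Partition, m.Offset, len(m.Value))
      }
    }(p)
  }
  select {} // block forever while the readers run
}

And here is my current code: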

package main

import (
  "context"
  "database/sql"
  "fmt"
  "github.com/golang/protobuf/proto"
  "github.com/kshvakov/clickhouse"
  "github.com/segmentio/kafka-go"
  "log"
  "net"
  netflow "netflowproject/pb-ext"
  "sync"
  "time"
)

const (
  DBURL  = "127.0.0.1"
  USER   = "default"
  PASS   = "123"
  DBNAME = "netflow"
)

type Worker struct {
  messageCount int
  last         time.Time
  dur          time.Duration
  db           *sql.DB
  flows        [][]interface{}
  consumer     func(func() *kafka.Reader, chan<- kafka.Message)
}
// Buffer decodes one Kafka message and appends it to the in-memory batch.
func (w *Worker) Buffer(msg *kafka.Message) (bool, error) {
  w.messageCount++
  var flowMSG netflow.FlowMessage
  if err := proto.Unmarshal(msg.Value, &flowMSG); err != nil {
    log.Printf("unmarshaling error: %v", err)
    return false, nil
  }
  srcIP := net.IP(flowMSG.SrcIP).String()
  dstIP := net.IP(flowMSG.DstIP).String()
  nextHop := net.IP(flowMSG.NextHop).String()

  unzip := []interface{}{time.Now(), time.Now(), srcIP, dstIP, int32(flowMSG.SrcPort), int32(flowMSG.DstPort),
    nextHop, flowMSG.Bytes, flowMSG.Packets, uint8(flowMSG.Proto), int32(flowMSG.Etype), flowMSG.SamplingRate, flowMSG.SequenceNum}

  w.flows = append(w.flows, unzip)
  return false, nil
}

// Flush writes the buffered rows to ClickHouse in a single transaction.
func (w *Worker) Flush() bool {
  fmt.Printf("Processed %d records in the last iteration.\n", w.messageCount)
  w.messageCount = 0
  tx, err := w.db.Begin()
  if err != nil {
    log.Fatal("Could not open transaction: ", err)
  }
  stmt, err := tx.Prepare(
    "INSERT INTO flow (date,time,src_ip,dst_ip,src_port,dst_port,next_hop,bytes,packet,protocol,etype,sampling_rate,sequencer_num) VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?)")
  if err != nil {
    log.Fatal(err)
  }
  defer stmt.Close()

  for _, curFlow := range w.flows {
    if _, err := stmt.Exec(curFlow...); err != nil {
      fmt.Println(err)
    }
  }

  if err := tx.Commit(); err != nil {
    fmt.Println(err)
  }

  w.flows = make([][]interface{}, 0)
  return true
}


func main() {

  var (
    workersCount = 4
    ch           = make(chan kafka.Message, 10000)
    wg           sync.WaitGroup
    flushCount   = 10000
    flushEvery   = 5 * time.Second
  )
  wg.Add(workersCount)
  for i := 1; i <= workersCount; i++ {
    go func() {
      defer wg.Done() // signal completion from inside the goroutine
      w := &Worker{
        last:     time.Time{},
        db:       dbConn(DBURL, USER, PASS, DBNAME),
        consumer: kafkaMessage,
      }
      go w.consumer(GetKafkaReader, ch)
      // Timer and counter are per-worker locals: sharing them
      // across goroutines would be a data race.
      timer := time.After(flushEvery)
      count := 0
      for {
        select {
        case <-timer:
          w.Flush()
          timer = time.After(flushEvery)
        case msg, ok := <-ch:
          if !ok {
            return
          }
          flush, err := w.Buffer(&msg)
          if err != nil {
            log.Fatalf("Error while processing: %v", err)
          }
          if flush {
            w.Flush()
          }
          count++
          if count == flushCount {
            w.Flush()
            count = 0
          }
        }
      }
    }()
  }

  wg.Wait()
}

// kafkaMessage pumps messages from the reader into the channel.
var kafkaMessage = func(
  reader func() *kafka.Reader,
  messageCh chan<- kafka.Message) {
  r := reader()
  if err := r.SetOffset(47); err != nil { // start from a fixed offset
    log.Println(err)
  }
  for {
    m, err := r.ReadMessage(context.Background())
    if err != nil {
      fmt.Println(err)
      continue // do not forward the zero-value message
    }
    messageCh <- m
  }
}

var dbConn = func(dbURL, user, password, dbNAME string) *sql.DB {
  dataSourceName := fmt.Sprintf("tcp://%s:9000?username=%s&password=%s&database=%s&debug=false",
    dbURL, user, password, dbNAME)
  connect, err := sql.Open("clickhouse", dataSourceName)
  if err != nil {
    log.Fatal(err)
  }
  if err := connect.Ping(); err != nil {
    // Fail fast here instead of returning nil, which would
    // crash the worker later with a nil-pointer dereference.
    if exception, ok := err.(*clickhouse.Exception); ok {
      log.Fatalf("[%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace)
    }
    log.Fatal(err)
  }
  return connect
}

func GetKafkaReader() *kafka.Reader {
  return kafka.NewReader(kafka.ReaderConfig{
    Brokers:   []string{"127.0.0.1:9092"},
    Topic:     "netflow",
    Partition: 0,
    MinBytes:  10e3, // 10KB
    MaxBytes:  10e6, // 10MB
  })
}

1 answer
Dmitry Derepko, 2019-07-24
@xEpozZ

I think it depends on your specific task.
The number of consumer processes can be anything; it depends on the load on the node.
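If you do go with several partitions, one common pattern is a consumer group: give every goroutine a reader with the same GroupID and let the broker assign partitions automatically. A minimal sketch, assuming the topic already has several partitions (the group name here is made up; broker and topic are taken from your question):

package main

import (
  "context"
  "log"

  "github.com/segmentio/kafka-go"
)

func main() {
  const consumers = 4
  for i := 0; i < consumers; i++ {
    go func() {
      // Readers sharing a GroupID form a consumer group;
      // Kafka distributes the partitions among them.
      r := kafka.NewReader(kafka.ReaderConfig{
        Brokers: []string{"127.0.0.1:9092"},
        GroupID: "netflow-workers", // hypothetical group name
        Topic:   "netflow",
      })
      defer r.Close()
      for {
        m, err := r.ReadMessage(context.Background())
        if err != nil {
          log.Println(err)
          continue
        }
        log.Printf("partition %d offset %d", m.Partition, m.Offset)
      }
    }()
  }
  select {} // block forever while the consumers run
}

With a group you can change the number of goroutines (or processes) without touching any partition-assignment logic; Kafka rebalances automatically.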
