Contents

rabbitmq

RabbitMQ

RabbitMQ = message broker (accepts, stores, and forwards binary blobs of data - messages)

術語:

  • Producer: 送出 messages (sending)
  • Queue: a large message buffer
    • producer 可以送 message 到 queue 裡面
    • consumer 可以從 queue 把 message 收走
  • Consumer: 等待接收 messages (receiving)

producer, consumer, broker 不需要在相同的 host 上

要先安裝 rabbitMQ service 安裝連結

以下是用 golang 實作練習

用到的 package amqp091-go

Hello World

實作兩個檔案

  • send.go (代表 producer 送出 message)
  • receive.go (代表 consumer 接收 message)

實作 send.go

設定 rabbitMQ 連線資訊 (socket connection)

// 設定 rabbitMQ 連線資訊
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
// defer 把 conn close 掉
defer conn.Close()

建立通訊 channel

// 建立通訊 channel
ch, err := conn.Channel()
// defer 把 channel close 掉
defer ch.Close()

必須宣告要使用哪個 queue 來 send 資料,再透過 publish 把資料送進 queue

q, err := ch.QueueDeclare(
  "hello", // name
  false,   // durable
  false,   // delete when unused
  false,   // exclusive
  false,   // no-wait
  nil,     // arguments
)

body := "Hello World!"
err = ch.Publish(
  "",     // exchange
  q.Name, // routing key
  false,  // mandatory
  false,  // immediate
  amqp.Publishing {
    ContentType: "text/plain",
    Body:        []byte(body),
  })

注意建立 queue 是 idempotent 的,只有在對應的 queue 不從在的時候才會建立

message 是 byte array 的形式,所以可以傳送任何資料

實作 receive.go

連線的地方和 send 設定一樣

  • open connection
  • open channel
  • 宣告要用哪個 queue (確保通道一定存在,不存在的話會建立)
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()

ch, err := conn.Channel()
defer ch.Close()

q, err := ch.QueueDeclare(
  "hello", // name
  false,   // durable
  false,   // delete when unused
  false,   // exclusive
  false,   // no-wait
  nil,     // arguments
)

message 會是從 channel ( from amqp::Consume ) 以 asynchronously 的方式收到

開一個 goroutine 來接收

msgs, err := ch.Consume(
  q.Name, // queue
  "",     // consumer
  true,   // auto-ack
  false,  // exclusive
  false,  // no-local
  false,  // no-wait
  nil,    // args
)

forever := make(chan bool)

go func() {
  for d := range msgs {
    log.Printf("Received a message: %s", d.Body)
  }
}()

log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever

Work Queues (aka: Task Queue)

會有多數個 consumer 來接收 queue 送出來的 message

實作兩個檔案

  • new_task.go (代表 producer 送出 message)
  • worker.go (代表 consumer 接收 message)

實作 new_task.go

// 連線的地方上前個練習一樣
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()

ch, err := conn.Channel
defer ch.Close()

q, err := ch.QueueDeclare(
    "task_queue", // name
    true,         // durable
    false,        // delete when unused
    false,        // exclusive
    false,        // no-wait
    nil,          // arguments
)

傳送整理過後的資訊

body := bodyFrom(os.Args)
err = ch.Publish(
	"",     // exchange
	q.Name, // routing key
	false,  // mandatory
	false,
	amqp.Publishing{
		DeliveryMode: amqp.Persistent,
		ContentType:  "text/plain",
		Body:         []byte(body),
    })
log.Printf(" [x] Sent %s", body)

其中 bodyForm 用來把 arg 參數串成一個字串

func bodyFrom(args []string) string {
    var s string
    if (len(args) < 2) || os.Args[1] == "" {
        s = "hello"
    } else {
        s = strings.Join(args[1:], " ")
    }
    return s
}

實作 worker.go

// 連線的地方上前個練習一樣
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
defer conn.Close()

ch, err := conn.Channel()
defer ch.Close()

q, err := ch.QueueDeclare(
"task_queue", // name
    true,         // durable
    false,        // delete when unused
    false,        // exclusive
    false,        // no-wait
    nil,          // arguments
)

調整要接收資料的地方

err = ch.Qos(
	1,     // prefetch count
	0,     // prefetch size
	false, // global
)

msgs, err := ch.Consume(
	q.Name, // queue
	"",     // consumer
	false,  // auto-ack
	false,  // exclusive
	false,  // no-local
	false,  // no-wait
	nil,    // args
)

forever := make(chan bool)

go func() {
	for d := range msgs {
		log.Printf("Received a message: %s", d.Body)
		dotCount := bytes.Count(d.Body, []byte("."))
		t := time.Duration(dotCount)
		time.Sleep(t * time.Second)
		log.Printf("Done")
		d.Ack(false)
	}
}()

log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever