The kcat (kafkacat) command
kcat - Apache Kafka producer and consumer tool
Test environment
$ cat /etc/redhat-release
CentOS Linux release 7.9.2009 (Core)
$ getconf LONG_BIT
64
Installing kafkacat
- Install gcc-c++, git, and librdkafka-devel
yum install -y gcc-c++ git librdkafka-devel
- Download (git clone), compile, and install kafkacat
git clone https://github.com/edenhill/kafkacat
cd kafkacat
./configure
make
make install
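If the build succeeds, you can confirm where the binary was installed (the /usr/local/bin path is an assumption based on the default ./configure prefix):
$ which kcat
/usr/local/bin/kcat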
- Check the kcat version
$ kcat -V
kcat - Apache Kafka producer and consumer tool
https://github.com/edenhill/kcat
Copyright (c) 2014-2021, Magnus Edenhill
Version 1.7.0-55-g7a6120 (librdkafka 0.11.5 builtin.features=gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins)
Producer mode (write messages; the topic is created automatically):
kcat -P -b <broker> -t <topic> -p <partition>
$ kcat -P -b localhost:9092 -t test
Enter + Ctrl-D : done (the messages are sent)
Ctrl-D : ends input and exits kcat
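Messages can also be piped in instead of typed interactively, and -K attaches per-message keys (a minimal sketch, assuming the same localhost:9092 broker and test topic as above; the key:value strings are arbitrary examples):
$ echo -e "key1:hello\nkey2:world" | kcat -P -b localhost:9092 -t test -K :
Each input line becomes one message, split into key and value on the ":" delimiter.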
Consumer mode (read messages):
kcat -b <broker> -t <topic> -p <partition>
$ kcat -b localhost:9092 -t test
% Reached end of topic test [0] at offset 0
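To replay a topic from the first offset and exit once the last message has been read (same broker and topic assumed as above):
$ kcat -C -b localhost:9092 -t test -o beginning -e
Adding -c <n> would stop after n messages instead of at the end of the topic.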
Metadata list
$ kcat -L -b datanode01:9092
Metadata for all topics (from broker 1: datanode01:9092/1):
3 brokers:
broker 2 at datanode02:9092 (controller)
broker 3 at datanode03:9092
broker 1 at datanode01:9092
4 topics:
topic "byun" with 1 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
topic "helloworld" with 1 partitions:
partition 0, leader 2, replicas: 2, isrs: 2
topic "test" with 1 partitions:
partition 0, leader 1, replicas: 1, isrs: 1
topic "__consumer_offsets" with 50 partitions:
partition 0, leader 2, replicas: 2, isrs: 2
partition 1, leader 3, replicas: 3, isrs: 3
...
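To limit the listing to a single topic, pass -t as well (broker address taken from the example above):
$ kcat -L -b datanode01:9092 -t test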
kcat usage
Usage: kcat <options> [file1 file2 .. | topic1 topic2 ..]]
kcat - Apache Kafka producer and consumer tool
https://github.com/edenhill/kcat
Copyright (c) 2014-2021, Magnus Edenhill
Version 1.7.0-55-g7a6120 (librdkafka 0.11.5 builtin.features=gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins)
General options:
-C | -P | -L | -Q Mode: Consume, Produce, Metadata List, Query mode
-G <group-id> Mode: High-level KafkaConsumer (Kafka >=0.9 balanced consumer groups)
Expects a list of topics to subscribe to
-t <topic> Topic to consume from, produce to, or list
-p <partition> Partition
-b <brokers,..> Bootstrap broker(s) (host[:port])
-D <delim> Message delimiter string:
a-z | \r | \n | \t | \xNN ..
Default: \n
-K <delim> Key delimiter (same format as -D)
-c <cnt> Limit message count
-m <seconds> Metadata (et.al.) request timeout.
This limits how long kcat will block
while waiting for initial metadata to be
retrieved from the Kafka cluster.
It also sets the timeout for the producer's
transaction commits, init, aborts, etc.
Default: 5 seconds.
-F <config-file> Read configuration properties from file,
file format is "property=value".
The KCAT_CONFIG=path environment can also be used, but -F takes precedence.
The default configuration file is $HOME/.config/kcat.conf
-X list List available librdkafka configuration properties
-X prop=val Set librdkafka configuration property.
Properties prefixed with "topic." are
applied as topic properties.
-X dump Dump configuration and exit.
-d <dbg1,...> Enable librdkafka debugging:
all,generic,broker,topic,metadata,feature,queue,msg,protocol,cgrp,security,fetch,interceptor,plugin,consumer,admin
-q Be quiet (verbosity set to 0)
-v Increase verbosity
-E Do not exit on non-fatal error
-V Print version
-h Print usage help
Producer options:
-z snappy|gzip|lz4 Message compression. Default: none
-p -1 Use random partitioner
-D <delim> Delimiter to split input into messages
-K <delim> Delimiter to split input key and message
-k <str> Use a fixed key for all messages.
If combined with -K, per-message keys
take precedence.
-H <header=value> Add Message Headers (may be specified multiple times)
-l Send messages from a file separated by
delimiter, as with stdin.
(only one file allowed)
-T Output sent messages to stdout, acting like tee.
-c <cnt> Exit after producing this number of messages
-Z Send empty messages as NULL messages
file1 file2.. Read messages from files.
With -l, only one file permitted.
Otherwise, the entire file contents will
be sent as one single message.
-X transactional.id=.. Enable transactions and send all
messages in a single transaction which
is committed when stdin is closed or the
input file(s) are fully read.
If kcat is terminated through Ctrl-C
(et.al) the transaction will be aborted.
Consumer options:
-o <offset> Offset to start consuming from:
beginning | end | stored |
<value> (absolute offset) |
-<value> (relative offset from end)
s@<value> (timestamp in ms to start at)
e@<value> (timestamp in ms to stop at (not included))
-e Exit successfully when last message received
-f <fmt..> Output formatting string, see below.
Takes precedence over -D and -K.
-s key=<serdes> Deserialize non-NULL keys using <serdes>.
-s value=<serdes> Deserialize non-NULL values using <serdes>.
-s <serdes> Deserialize non-NULL keys and values using <serdes>.
Available deserializers (<serdes>):
<pack-str> - A combination of:
<: little-endian,
>: big-endian (recommended),
b: signed 8-bit integer
B: unsigned 8-bit integer
h: signed 16-bit integer
H: unsigned 16-bit integer
i: signed 32-bit integer
I: unsigned 32-bit integer
q: signed 64-bit integer
Q: unsigned 64-bit integer
c: ASCII character
s: remaining data is string
$: match end-of-input (no more bytes remaining or a parse error is raised).
Not including this token skips any
remaining data after the pack-str is
exhausted.
-D <delim> Delimiter to separate messages on output
-K <delim> Print message keys prefixing the message
with specified delimiter.
-O Print message offset using -K delimiter
-c <cnt> Exit after consuming this number of messages
-Z Print NULL values and keys as "NULL" instead of empty.
For JSON (-J) the nullstr is always null.
-u Unbuffered output
Metadata options (-L):
-t <topic> Topic to query (optional)
Query options (-Q):
-t <t>:<p>:<ts> Get offset for topic <t>,
partition <p>, timestamp <ts>.
Timestamp is the number of milliseconds
since epoch UTC.
Requires broker >= 0.10.0.0 and librdkafka >= 0.9.3.
Multiple -t .. are allowed but a partition
must only occur once.
Format string tokens:
%s Message payload
%S Message payload length (or -1 for NULL)
%R Message payload length (or -1 for NULL) serialized
as a binary big endian 32-bit signed integer
%k Message key
%K Message key length (or -1 for NULL)
%T Message timestamp (milliseconds since epoch UTC)
%h Message headers (n=v CSV)
%t Topic
%p Partition
%o Message offset
\n \r \t Newlines, tab
\xXX \xNNN Any ASCII character
Example:
-f 'Topic %t [%p] at offset %o: key %k: %s\n'
Consumer mode (writes messages to stdout):
kcat -b <broker> -t <topic> -p <partition>
or:
kcat -C -b ...
High-level KafkaConsumer mode:
kcat -b <broker> -G <group-id> topic1 top2 ^aregex\d+
Producer mode (reads messages from stdin):
... | kcat -b <broker> -t <topic> -p <partition>
or:
kcat -P -b ...
Metadata listing:
kcat -L -b <broker> [-t <topic>]
Query offset by timestamp:
kcat -Q -b broker -t <topic>:<partition>:<timestamp>
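A few worked examples of the options above (the broker addresses, topic names, group id, and timestamp here are illustrative assumptions, not values from a real cluster):

Pretty-print each message with the -f format string tokens and exit at the end of the topic:
$ kcat -C -b localhost:9092 -t test -e -f 'Topic %t [%p] at offset %o: key %k: %s\n'

Consume as part of a balanced consumer group:
$ kcat -b localhost:9092 -G mygroup test

Look up the offset of topic test, partition 0, at a given timestamp (milliseconds since epoch UTC):
$ kcat -Q -b localhost:9092 -t test:0:1640000000000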