Logstash 是一个开源的数据处理管道,它能够同时从多个来源采集数据,对数据进行处理,并将其发送到您选择的目的地。Logstash 是 Elastic Stack(以前称为 ELK Stack)的一部分,该技术栈还包括 Elasticsearch 和 Kibana。这三个工具通常一起使用,以提供强大的搜索、分析和可视化功能。Lakehouse提供了Logstash Connector可以直接连接Lakehouse

部署

# 卸载插件
./bin/logstash-plugin uninstall logstash-output-clickzetta && rm -rf ./vendor/local_gems/

# 安装插件
./bin/logstash-plugin install --no-verify --local logstash-output-clickzetta-0.0.1.gem 

# 查看插件
./bin/logstash-plugin list --verbose

测试

cd /logstash-data
/usr/share/logstash/bin/logstash -f data/logstash.conf.5 --log.level error
# 默认worker数为机器核数(-w),默认batch size为125(-b),可通过 -w 16 -b 200 进行调整对应参数

File input

参考 file input 插件

input {
  file {
                path => "/logstash-data/data/igs_worker_full_log.log"
                codec => json
                start_position => "beginning"
                mode => "read"
                file_completed_action => log
                file_completed_log_path => "/logstash-data/data/temp_log/"
                exit_after_read => true
                sincedb_clean_after => "1 second"
        }
  file {
                path => "/logstash-data/data/igs_worker_full_log_1.log"
                codec => json
                start_position => "beginning"
                mode => "read"
                file_completed_action => log
                file_completed_log_path => "/logstash-data/data/temp_log/"
                exit_after_read => true
                sincedb_clean_after => "1 second"
        }
  file {
                path => "/logstash-data/data/igs_worker_full_log_2.log"
                codec => json
                start_position => "beginning"
                mode => "read"
                file_completed_action => log
                file_completed_log_path => "/logstash-data/data/temp_log/"
                exit_after_read => true
                sincedb_clean_after => "1 second"
        }
  file {
                path => "/logstash-data/data/igs_worker_full_log_3.log"
                codec => json
                start_position => "beginning"
                mode => "read"
                file_completed_action => log
                file_completed_log_path => "/logstash-data/data/temp_log/"
                exit_after_read => true
                sincedb_clean_after => "1 second"
        }
  file {
                path => "/logstash-data/data/igs_worker_full_log_4.log"
                codec => json
                start_position => "beginning"
                mode => "read"
                file_completed_action => log
                file_completed_log_path => "/logstash-data/data/temp_log/"
                exit_after_read => true
                sincedb_clean_after => "1 second"
        }
}

output {
    clickzetta {
            jdbcUrl => "jdbc:clickzetta://9a310b9b.uat-api.clickzetta.com/quick_start?schema=public&username=index_test&password=password&virtualCluster=YETING_TEST_AP"
            username => "index_test"
            password => "password"
            schema => "public"
            table => "test_simple_data-%{+YYYY.MM.dd}"
            internalMode => false
            directMode => false
        }
}

Kafka input

Kafka input 配置参考 [kafka作为input接入]

input {
    kafka {
                bootstrap_servers => "alikafka-pre-cn-co92y9d22001-1.alikafka.aliyuncs.com:9093,alikafka-pre-cn-co92y9d22001-2.alikafka.aliyuncs.com:9093,alikafka-pre-cn-co92y9d22001-3.alikafka.aliyuncs.com:9093"
                group_id => "logstash-test-group"
                decorate_events => true
                topics => ["igs-worker-log-for-sla"]
                security_protocol => "SASL_SSL"
                sasl_mechanism => "PLAIN"
                sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='username' password='password';"
                ssl_truststore_password => "KafkaOnsClient"
                ssl_truststore_location => "/logstash-data/only.4096.client.truststore.jks"
                ssl_endpoint_identification_algorithm => ""
                consumer_threads => 3
                codec => json {
                    charset => "UTF-8"
                }
        }
}

filter {
mutate {
    add_field => {
        "kafka_topic_name" => "%{[fields][log_topic]}"
    }
}
grok {
    match => { "message" => "\[%{TIMESTAMP_ISO8601:@timestamp}\] \[%{NUMBER:thread_id}\] \[%{LOGLEVEL:log_level}\] \[%{DATA:source_file}\] \[%{DATA:function_name}\]%{GREEDYDATA:log_content}" }
}
ruby {
        code => "
            if event.get('thread_id').to_i % 2 == 0
                event.tag('even')
            else
                event.tag('odd')
            end
        "
    }
}
output {
    if "odd" in [tags] {
        clickzetta {
            jdbcUrl => "jdbc:clickzetta://9a310b9b.uat-api.clickzetta.com/quick_start?schema=public&username=index_test&password=password&virtualCluster=YETING_TEST_AP"
            username => "index_test"
            password => "password"
            schema => "public"
            table => "liuwei_log_data-odd-%{kafka_topic_name}-%{+YYYY.MM.dd}"
            tablet => 16
            debug => true
            internalMode => false
            directMode => false
        }
    } else {
        clickzetta {
            jdbcUrl => "jdbc:clickzetta://9a310b9b.uat-api.clickzetta.com/quick_start?schema=public&username=index_test&password=password&virtualCluster=YETING_TEST_AP"
            username => "index_test"
            password => "password"
            schema => "public"
            table => "liuwei_log_data-even-%{kafka_topic_name}-%{+YYYY.MM.dd}"
            tablet => 16
            debug => true
            internalMode => false
            directMode => false
        }
    }
}

联系我们
预约咨询
微信咨询
电话咨询