Giter VIP home page Giter VIP logo

blog's People

Contributors

huoarter avatar

Stargazers

 avatar

Watchers

 avatar  avatar  avatar

Forkers

lshuining

blog's Issues

ES使用经验

ES集群是所有数据的存储和查询

问题

1.严格控制集群shard的个数,shard的大小以及文档数量

es在执行index的创建删除的过程中的时候,时间复杂度应当是和shard的数量是正相关的,所以如果shard数量过大,将会导致的是索引的管理
耗时十分长,管理起来十分的困难。目前使用下来shard的总数尽量不要超过3w左右。同时shard数量过多,直接导致的是segement过多,查询
的时候需要遍历的shard直接变多,导致查询效率显著下降,所以希望还是严格控制shard的数量,保证每个索引shard数量处于一个合理的范围,
如果索引总体较大可以加大shard数量,如果较小那么就降低shard数量,使用下来单个shard大小在30-50G左右比较合适,过大写入较慢,I/O问题
如果index小于50G大于3G,最好分3个shard,过少shard,可能会导致热点数据查询集中在一台机器,导致内存溢出,index小于3G的分1,2个shard即可
同时要注意单个shard的文档数量不能超过lucene一个实例的文档数限制。

2.注意index的mapping

我们使用mapping的时候要注意字段不能过多,会加载到内存当中,否则有可能出现OOM的情况,同时最主要的要关注json格式的数据,
因为如果json的字段不断发生变化,那么有可能在不断的update_mapping,可以通过_cat/pending_task观察下当前的任务,如果有大量的update_mapping
就有可能是某个索引的字段在不断发生变化。

3.如何正确的做es集群的维护

ES集群的维护过程中,我们先了解下ES数据恢复的流程,ES在进行数据恢复的过程当中,首先primary shard是直接从本地磁盘进行恢复,replica shard的
恢复则不一定,因为Recovery的恢复是对比主副本的segment flush id来判断这两个shard是不是一致的,如果一致那么这个replica shard也将从本地恢复,如果
发现不一致将会从网络恢复,但是不同节点的segment merge其实是独立运行的,所以会导致一些shard的数据看上去主副本其实一样,但其实segment flush id
不同,只能够副本从网络恢复。其实在ES中,一个shard一段时间不更新后,会自动进行一次synced flush来同步一个synced flush id,所以我们在做集群维护的
过程中,最重要的就是要手动同步一次。那么完整的操作如下:

   1.停止消费hangout,不再向ES写数据。

   2.关闭集群shard allocation

   3.POST /_flush/synced强制flush

   4.重启ES

   5.打开集群shard allocation

   6.等待health变为green,打开hangout写数据。
  1. rebalance和recovery条件限制:

    1.有的时候我们发生故障重启了集群,但是集群中节点的加入是有时间的先后顺序,那么很有可能数据直接开始recovery,但是这个时候数据并不完整,节点之间很有可能
    已经开始进行网络恢复,然后导致的是数据分布不均匀,再出发rebalance导致数据一致漂移,造成没有意义的网络和磁盘I/O。
    那么对应这种情况,我们需要调整recovery的相关参数和rebalance的相关参数来限制一下recovery和rebalance的条件,比如gateway的设置:

      gateway.expected_nodes
      gateway.expected_master_nodes
      gateway.expected_data_nodes
      gateway.recover_after_nodes
      gateway.recover_after_master_nodes
      gateway.recover_after_data_nodes
      gateway.recover_after_time: 5m
    

    具体参数不详细讲解,主要就是限制加入的节点数和时间,再达到一定的条件后再开始进行recovery,给定一定的缓冲时间。

    2.再一个问题就是如果由于网络原因,可能有一些节点暂时脱离集群,这个时候我们并不希望直接开始进行recovery,因为节点很有可能很快加回节点,那么做下如下配置:

      index.unassigned.node_left.delayed_timeout: 5m
    

    等待5分钟后再开始进行recovery。

    3.限制的一些具体参数可以参考如下配置,比如恢复速率和并发数等等:

     "transient" : {
         "indices.recovery.max_bytes_per_sec" : "500mb",
         "indices.recovery.concurrent_streams" : "20",
         "cluster.routing.allocation.node_concurrent_recoveries": "20"
     }
     "transient" : {
         "cluster.routing.allocation.cluster_concurrent_rebalance": "20",
         "cluster.routing.rebalance.enable": "all"
    }
    

5.写入的优化

目前我们所做的写入优化主要是针对translog进行的一些优化,具体的配置参数可以参考如下:

 "translog": {  "interval": "30s",  "flush_threshold_size": "3g",  "durability": "async",  "sync_interval": "30s" }

主要是换成异步写入,并且增大flush的大小和间隔时间。因为本身日志是可丢的,异步存在丢失日志的风险,这个觉得是可以接受的。

6.段文件segment

如果存在一些保留时间很长的索引,由于segment的个数很多,对于这类索引,如果索引大小很大,可以定期进行一次optimize来强制进行合并,
原则上segment其实是个数越少所占内存越小,并且查询效率会显著提高,因为不需要去遍历多个segment,但是合并的过程也是很占资源,所以
在实际使用中不是很推荐,因为我们保存的日志时间很短,并没有太大的意义,但是对于保留时间较长的日志还是建议定期进行一次optimize,
需要注意的一点是,这个操作最好只对不再更新的索引执行。

7.发现discovery

由于在某些负载较高的情况下,data节点很有可能直接脱离集群,那么建议提高discovery的检测时间,降低这种风险,主要配置可以参考:

discovery.zen.fd.ping_timeout: 120s
discovery.zen.fd.ping_retries: 6
discovery.zen.fd.ping_interval: 30s

8.如何去检查一下index的状况

这个可以直接用底层的lucene的去检查一下index的一些状态,比如index是否发生了丢失数据,数据是否完整等等。

  java -cp lucene-core-5.5.0.jar -ea:org.apache.lucene… org.apache.lucene.index.CheckIndex /data/elasticsearch/wg-lpd-es-apollo-dqs/nodes/0/indices/shipping_order_2016_12_05

9.索引的路由

因为目前我们的场景下索引的格式非常多,超过4000个,那么索引的划分以及如果发生recovery或者rebalance的情况下,index中shard的漂移
是不可接受的。所以我们的建议是在物理集群中每10台机器打上一个tag划分成一个逻辑集群,然后再进行创建索引的过程中,通过allocation的
include将这个索引限制在这个逻辑集群当中,比如可以如果我们有200个索引,40台机器,我们可以将前20个最大的索引include到10台机器中,
将60个索引include到10台机器中。依次类推,只要保证每个逻辑集群中的总index的大小差异不是很大,限制住了索引的漂移,降低recovery和
rebalance的成本,部分配置可以参考:

curl -XPUT localhost:9200/test/_settings -d '{
    "index.routing.allocation.include.tag" : "value1,value2"
}'
curl -XPUT localhost:9200/test/_settings -d '{
    "index.routing.allocation.include.group1" : "xxx",
    "index.routing.allocation.include.group2" : "yyy",
    "index.routing.allocation.exclude.group3" : "zzz",
}'
curl -XPUT localhost:9200/_cluster/settings -d '{
  "transient" : {
  "cluster.routing.allocation.exclude._ip" : "10.0.0.1"
    }
}'
  1. 跨域配置:
  http.cors.enabled: true
  http.cors.allow-origin: "*"
  http.cors.allow-methods: OPTIONS, HEAD, GET, POST
  http.cors.allow-headers: X-Requested-With, Content-Type, Content-Length, X-Auth-Token

11.节点发生异常

如果某个节点你发现发生了异常,比如memory异常等等,可以尝试进行索引的手动迁移,将部分占用资源的索引迁移出去,降低
该节点的资源占用率,或者可以尝试通过api将该节点exclude出去,迁移所有的index,等待恢复后再include进来。

12.索引发生异常

如果一个索引异常了,比如之前遇到的shard个数分的过少,我们可以尝试新建shard数目足够的索引进行reindex或者从kafka重新消费过滤出该索引的数据写入ES。

改进

1.双写问题

考虑到未来集群可能会进行拆分,那么建议是两个集群中间通过tribe节点实现互联,这样前端不需要再做过多的配置,tribe节点会去找对应的索引的数据,
但是不要同一个索引两个集群中都有,因为tribe节点不会去做merge,但是考虑到我们的使用情况,我们只需要根据appid来进行es集群的拆分。

2.索引路由

目前的索引路由还不完善,未来需要根据数据的大小,shard的数量来进行一定的索引路由,将对应的索引限制在对应的机器中,减少数据漂移。

3.双实例

由于未来是单机双实例,所以要考虑到限制每个实例占用的资源以及同一个shard主副本不能够在同一台机器上,监控每个index的
segment的占用,一旦发生一个实例出现异常,很有可能出现雪崩效应导致整个集群挂掉。

kafka不停机,迁移zookeeper到新的zk集群

kafka 的zookeeper迁移
需要修改zookeeper地址的地方:
schmea-registry, brokers, consumers
原zk节点:

server.4=10.132.165.11:2888:3888

server.2=10.153.201.220:2888:3888

server.1=10.153.203.22:2888:3888

server.3=10.132.164.136:2888:3888

server.5=10.30.197.149:2888:3888

迁移步骤:

  1. 使用命令echo srvr |nc 127.0.0.1 2181查看5节点集群状态,以及各个节点的角色
  2. 备份zk中的数据(虽然原则上为不可回滚操作,安全起见还是备份一下数据)
  扩容: 新zk节点为扩容6,7,8
  3. 先修改6,7节点的zoo.cfg配置
     server.1=10.10.0.110:2888:3888
     server.2=10.10.0.111:2888:3888
     server.3=10.10.0.112:2888:3888
     server.4=10.10.0.113:2888:3888
     server.5=10.10.0.115:2888:3888
     server.6=...
     server.7=...
  4.依次重启6,7节点,并在leader节点查看数据是否同步(echo mntr|nc 10.30.197.149 2181),此时确定可以看到6,7节点已经从leader同步到数据,且为正常读写节点
  5. 1,2,3,4,5节点不做任何变更
  6.1 此时需要所有kafka broker节点新的zk节点6,7,并依次重启节点,间隔5分钟
  6.2  schema-registry,consumers用到kafka的zk也需要更改到的新的zk地址

  7. 此时8节点暂时不起动,kafka所有节点指向新的zk节点6,7
  缩容:  将6,7,8节点变为新的集群,给kafka独立使用
  8. 修改6,7,8节点的zoo.cfg配置为:
     server.6=10.10.0.115:2888:3888
     server.7=...
     server.8=...
  9.  此时启动节点8,但是8为looking状态,并不会加入集群,且数据为空
  10. 重启节点6,7节点这个时候,6,7的zxid高于8,进行选举,6,7其中一个为leader,6,7,8为新集群
    (注意: 测试过程此时如果只重启6,7其中一个节点,会导致老集群中/kafka/brokers/ids列表为空,同步到新集群,
     导致kafka需要再依次重启,所以6,7应该同时重启zk,会有重启时间段zk不可用,但kafka生产消费不依赖zk的正常)
  11.至此,kafka已经将zk迁移到新的zk集群,并不是完全平滑,中间有短时间zk不可用

总结:
在不中断业务的情况下,kafka的zk迁移到新zk集群,保证老zk集群不能下架,还是很棘手的操作。为了降低中断时间最少,此方案测试过程中是zk短时间不可用,但kafka集群一直处于正常工作状态。网上有类似的案例: 不停止kafka,迁移zk,仔细阅读此案例,依然会有短暂的新zk集群不可用的状态

Logstash 迁移ES 数据

  elasticsearch {
    hosts => "es.production.mysite.org"
    index => "mydata-2018.09.*"
    query => '{ "query": { "query_string": { "query": "*" } } }'
    size => 500
    scroll => "5m"
    docinfo => true
  }
input {
  elasticsearch {
    hosts => "9200"
    index => "-*-2019.06.01"
    query => '{ "query": { "query_string": { "query": "sid:110093789 OR  sid:100551383 OR  sid:183366188 OR  sid:112318502 OR  sid:113416916 OR  sid:486897964 OR  sid:113234331 OR  sid:113238288 OR  sid:566578383 OR  sid:3000001 OR  sid:117548108 OR  sid:300039451 OR  sid:101496588 OR  sid:108644974 OR  sid:111041057 OR  sid:117726953 OR  sid:64246482 OR  sid:69153752 OR  sid:105720053 OR  sid:106821734 OR  sid:63716160 OR  sid:412284613 OR  sid:101190526 OR  sid:162847549 OR  sid:110093789 OR  sid:261991723 OR  sid: 114661967 OR  sid: 57302891" } } }'
    size => 500
    scroll => "5m"
    docinfo => true

  }
}
filter {
  mutate {
    copy => { "[@metadata][_index]" => "_index" }
    copy => {  "[@metadata][_type]" =>  "_type" }
    copy => { "[@metadata][_id]" => "_id" }
 }
}
output {
  file {
  path => "/data/%{[@metadata][_index]}"
}
}
input {
  file {
    path => "/data/-*-2019.06.01"
    start_position => "beginning"
    codec => json
  }
}
filter {
  mutate {
    rename => {"_id" => "[@metadata][_id]" }
    rename => {"_type" =>  "[@metadata][_type]" }
    rename => {"_index" =>  "[@metadata][_index]" }
    remove_field => [ "@version", "path", "host"]
  }
}
output {
  elasticsearch {
  hosts => "es-.elasticsearch.aliyuncs.com:9200"
  index => "%{[@metadata][_index]}"
  document_type => "%{[@metadata][_type]}"
  document_id => "%{[@metadata][_id]}"
 }
}

ES 使用scroll遍历索引数据

#!/usr/bin/env python
#coding:utf-8
import sys
reload(sys)
sys.setdefaultencoding("utf-8")
from elasticsearch import helpers
from elasticsearch import Elasticsearch
import re
def get_message(content):
    match= re.search(r".*( seller_nick='.*?' )",content)
    if match:
        return match.group(1)

client = Elasticsearch(["http://user:[email protected]:9200"])
query={
    "query": {
        "query_string" : {
            "query" : "lain_app:trade-server.worker.trade-server AND message:'execute error'"
        }
    }
}
response = helpers.scan(client, query=query, scroll='5m', size=1000, clear_scroll=True, index="logstash-2019.05.28",)
for hit in response:
    content=hit["_source"]["message"]
    print get_message(content)

logstash 踩坑

  • logstash outpus codec => json时,需要使用tcp传输到其它logstash tcp ouputs,调试过程中发现使用codec => json ,tcp stream不会换行,导致接收端logstash不写outputs, 此时需要使用tcp codec => json_lines

kafka学习笔记

注意
kafka一个分区只能分配给一个group中的一个consumer,所以有60个分区,最多可以起60个consumer

  • 创建topics :(60个分区,1个副本(主副一共2个)):
./kafka-topics.sh --zookeeper localhost:2181  --create --topic eleme-nginx-log --replication-factor 2 --partitions 60
  • 增加topic的分区数:
./bin/kafka-topics.sh –zookeeper   localhost:2181 –alter –partitions 60 –topic eleme-nginx-log 
  • 平衡分区leader,重新选举:适用于节点挂掉恢复之后没有leader在原来的节点
    等价于配置项:auto.leader.rebalance.enable
bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181 
  • 平衡分区节点分布:
bin/kafka-reassign-partitions.sh -zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "1,2,3,4,5" --generate 

[root@xg-ops-elk-kafka-1 kafka]# cat topics-to-move.json
{"topics":
[{"topic": "java-standard-application-log"}],
"version":1
}  
  • 生成推荐分布,也可以手动编辑,保存在java_balance_topics.json 例如:
{"version":1,"partitions":[{"topic":"java-standard-application-log","partition":30,"replicas":[2]},{"topic":"java-standard-application-log","partition":2,"replicas":[4]},{"topic":"java-standard-application-log","partition":40,"replicas":[2]},....
  • 进行重新分配:
bin/kafka-reassign-partitions.sh  --zookeeper xg-ops-elk-kafka-1:2181,xg-ops-elk-kafka-2:2181,xg-ops-elk-kafka-3:2181 --reassignment-json-file java_balance_topics.json --execute
  • 验证分配进度:
bin/kafka-reassign-partitions.sh  --zookeeper xg-ops-elk-kafka-1:2181,xg-ops-elk-kafka-2:2181,xg-ops-elk-kafka-3:2181 --reassignment-json-file java_balance_topics.json --verify

压测数据:

sh kafka-producer-perf-test.sh --topic ertao_test --num-records 5000000 --record-size 512 --throughput 1000000 --producer-props bootstrap.servers=xg-ops-elk-ngxkf-1:9092,xg-ops-elk-ngxkf-1:9092 acks=all retries=2 linger.ms=1
sh kafka-producer-perf-test.sh --topic ertao_test --num-records 5000000 --record-size 512 --throughput 100
0000 --producer-props bootstrap.servers=xg-ops-elk-ngxkf-1:9092,xg-ops-elk-ngxkf-1:9092 acks=all retries=2 linger.ms=1

sh kafka-producer-perf-test.sh --topic ertao_test --num-records 5000000 --record-size 512 --throughput 1000000 --producer-props bootstrap.servers=xg-ops-elk-ngxkf-1:9092,xg-ops-elk-ngxkf-1:9092 acks=1 retries=2 linger.ms=1

sh kafka-producer-perf-test.sh --topic ertao_test --num-records 5000000 --record-size 512 --throughput 1000000 --producer-props bootstrap.servers=xg-ops-elk-ngxkf-1:9092,xg-ops-elk-ngxkf-1:9092 acks=0 retries=2 linger.ms=1


./kafka-consumer-perf-test.sh --zookeeper localhost:2181 --messages 50000000 --topic ertao_test --threads 1

ES 添加默认模板

es6.x之后,一个index不支持多个type

PUT _template/default

{
  "order": 0,
  "index_patterns": [
    "*"
  ],
  "settings": {
    "index": {
      "refresh_interval": "60s",
      "number_of_shards": "5",
      "translog": {
        "flush_threshold_size": "1gb",
        "sync_interval": "60s",
        "durability": "async"
      },
      "number_of_replicas": "1"
    }
  },
  "mappings": {
    "_default_": {
      "properties": {
        "timestamp": {
          "type": "date",
          "format": "strict_date_optional_time||epoch_millis"
        }
      }
    }
  },
  "aliases": {}
}
  • 动态templates_mappings

PUT my_index
{
  "mappings": {
    "_default_": {
      "dynamic_templates": [
        {
          "integers": {
            "match_mapping_type": "long",
            "mapping": {
              "type": "integer"
            }
          }
        },
        {
          "strings": {
            "match_mapping_type": "string",
            "mapping": {
              "type": "text",
              "fields": {
                "raw": {
                  "type":  "keyword",
                  "ignore_above": 256
                }
              }
            }
          }
        }
      ]
    }
  }
}
https://www.elastic.co/guide/en/elasticsearch/reference/6.3/dynamic-templates.html
{
  "order": 0,
  "index_patterns": [
    "logstash*"
  ],
  "settings": {
    "index": {
      "refresh_interval": "60s",
      "number_of_shards": "60",
      "translog": {
        "flush_threshold_size": "1gb",
        "sync_interval": "120s",
        "durability": "async"
      },
      "number_of_replicas": "0"
    }
  },
  "mappings": {
    "_default_": {
      "dynamic_templates": [
        {
          "offset_field": {
            "match_mapping_type": "long",
            "mapping": {
              "type": "long"
            },
            "path_match": "offset"
          }
        }
      ]
    }
  },
  "aliases": {}
}
PUT _template/logstash
{
  "template" : "logstash*",
  "settings" : {
    "index" : {
      "translog": {
        "flush_threshold_size": "1gb",
        "sync_interval": "60s",
        "durability": "async"
      },
      "number_of_shards" : "20",
      "number_of_replicas": "1",
      "refresh_interval": "60s"
    }
   }
}

Elasticsearch关于unassigned shards修复

{
"node_id": "VCMPiqWZSYW4hnNDj_NExg",
"node_name": "es-1",
"transport_address": "127.0.0.1:9300",
"node_attributes": {
"tag": "warm"
},
"node_decision": "no",
"store": {
"in_sync": true,
"allocation_id": "zZHXkwouS_SPJUmLzg3nWQ",
"store_exception": {
"type": "shard_lock_obtain_failed_exception",
"reason": "[test-test][3]: obtaining shard lock timed out after 5000ms",
"index_uuid": "HI8Z5vAdTqmM8rfw_JT0Lw",
"shard": "3",
"index": "test-test"
}
},
"deciders": [
{
"decider": "max_retry",
"decision": "NO",
"explanation": "shard has exceeded the maximum number of retries [5] on failed allocation attempts - manually call [/_cluster/reroute?retry_failed=true] to retry, [unassigned_info[[reason=ALLOCATION_FAILED], at[2018-04-18T07:09:02.434Z], failed_attempts[10], delayed=false, details[failed to create shard, failure IOException[failed to obtain in-memory shard lock]; nested: ShardLockObtainFailedException[test-test][3]: obtaining shard lock timed out after 5000ms]; ], allocation_status[deciders_no]]]"
}
]
}

ES 版本: 5.2.1
 curl http://elasticsearch.aliyuncs.com:9200/_cluster/reroute?retry_failed=true -XPOST
步骤:
curl localhost:9200/_cat/shards > shards
跑脚本:nohup python recovery.py &
注意:跑脚本过程会返回大量json,时间较长,请注意放入后台
查看修复shard进度:curl 127.0.0.1:9200/_cat/recovery/你修复shard对应的索引
结果: 找到索引对应的shard,看到existing_store done说明已经从本地修复
 index 19 268ms existing_store done n/a        n/a                    10.0.58.67 node_name
#!/usr/bin/env python
#name: recovery.py

import requests
import json
host = "http://localhost:9200/_cluster/allocation/explain"
s= requests.Session()
def reroute_shard(index,shard,node):
    data = {
    "commands" : [
        {
          "allocate_stale_primary" : {
              "index" : index, "shard" : shard, "node" : node, "accept_data_loss": True
          }
        }
    ]
   }
    print data
    url = "http://localhost:9200/_cluster/reroute"
    res = s.post(url,json=data)
    print res

def get_node(line):
    if "UNASSIGNED" in line:
        line = line.split()
        index = line[0]
        shard = line[1]
        if line[2] != "p":
            return
        body = {
           "index": index,
           "shard": shard,
           "primary": True
               }
        res = s.get(host, json = body)
        for store in res.json().get("node_allocation_decisions"):
            if store.get("store").get("allocation_id"):
               node_name = store.get("node_name")
               reroute_shard(index,shard,node_name)
    else:
        return

with open("shards", 'rb') as f:
    map(get_node,f)
相关文档:

https://www.elastic.co/guide/en/elasticsearch/reference/5.2/cluster-reroute.html
https://www.elastic.co/guide/en/elasticsearch/reference/5.2/cluster-allocation-explain.html

KAFKA跨机房扩容broker节点迁移

KAFKA跨机房添加broker节点迁移
前提条件:

  • 两个机房网络必须全部打通
  • zookeeper需要迁移,考虑网络导致脑裂问题,业务迁移完成之后再进行zk迁移(可提前加入readonly到新机房)

1、topics.txt添加要迁移的topic

2、生成topics-to-move.json

import json
topic_file="topics.txt"
topic_to_move = {"topics":
[
],"version":1
}
with open("topics.txt",'rb') as f:
    for topic in f:
        topic_to_move["topics"].append({"topic": topic.strip()})

print json.dumps(topic_to_move)

3、先生成推荐topic去的broker节点对应的json
比如:

kafka-reassign-partitions --zookeeper 10.26.234.167:2181/kafka/ka --topics-to-move-json-file topics-to-move.json --broker-list "1,2,3" --generate

4、通过修改上面的json内容,来指定topic的partions分部在哪些机器上,并且可指定Leader在哪一个broker上(此步骤只有在3步骤生成推荐时,新broker数量少于旧broker数量时使用)

import json
f=open("to_balance2.json",'rb')
content=json.load(f)

for topic in content.get("partitions"):
    rep = topic["replicas"]
    if len(rep) == 1:
        topic["replicas"][0]=3
    elif len(rep) > 2:
        if 3 in rep:
            topic["replicas"].remove(3)
            topic["replicas"].insert(0,3)
        else:
            topic["replicas"][0] = 3
print json.dumps(content)

5、通过新的balance.json进行分区行动,可通过throttle进行限速,修改限速可以同样命令不同阀值重复执行

kafka-reassign-partitions --zookeeper 10.26.234.167:2181/kafka/ka --reassignment-json-file to_balance.json   --execute   --throttle 10000000

6、检查移动进度

kafka-reassign-partitions --zookeeper 10.26.234.167:2181/kafka/ka --reassignment-json-file to_balance.json  --verify 进行验证,并remove throttle

7、待分区移动完成时,新节点未接管为leader时,可以手动执行选举

kafka-preferred-replica-election  --zookeeper 10.26.234.167:2181/kafka/ka

Elasticsearc使用规范

上线使用流程

  1. 使用方需衡量上线 es的data size和write/read qps
  2. 提交给sre申请资源,sre根据data size和qps来创建适合的资源
    大致标准: 16C/64G 1.5万/qps 写入
    32C/64G 3万/qps 写入
  3. 通过index和mapping规范建立使用index, shard数分配代码不要控制,由sre来控制分配
  4. 如需针对index的字段,settings作出更改,请务必通知到sre再进行操作

index和mapping规范

  1. index命名格式:
    1. appname-YY.MM.dd (适用于递增且有@timestamp字段的索引)
      例:prism-2019.05.28 注意时间格式
    2. appname (适用于类DB存储索引,无需@timestamp字段的索引)
      例:shop.item.list
  2. mapping约束规范:
    1. @timestamp type 必须统一为date类型,且格式统一一种(单索引统一,或者类似prism-*所有匹配到的索引统一)
    2. mapping 遍历层,字段不能过多(需要大家商量约束),超过约束层,作为一个字段写入
  3. index shards和replicas数量:
    1. 据经验来谈,单个shards size 40G左右较佳,一个索引至少3个shard (通过此标准来计算)
    2. replicas业务使用索引至少分配一个,ELK日志使用es可以不分配replicas
  4. ELK日志单行size不得超过16k,超过之后直接丢弃

查询

  1. 使用代码全量查询的,需要使用scroll进行操作
  2. 使用kibana查询的,不要查询过长时间,不能使用无限值(理论上)的字段作聚合查询
  3. 禁止使用*类似的条件查询聚合

写入

  1. 程序应支持限速限流,不能以最大速度写入(可以封装sdk)

维护

  1. 必须添加禁止通配符删除索引
  {

    "persistent": {
    "action.destructive_requires_name": true
  }
}
  1. 设置默认es模板,部分es针对索引添加mapping模板:
{
  "template" : "*",
  "settings" : {
    "index" : {
      "translog": {
        "flush_threshold_size": "1gb",
        "sync_interval": "60s",
        "durability": "async"
      },
      "number_of_shards" : "5",
      "number_of_replicas": "1",
      "refresh_interval": "60s"
    }
   }
}

申请模板:
|集群名称|size|write qps|owner|index name|
|bizes|2T|20000|张三|bizes-*|

Kafka迁移方案

##官方命令行工具
- 编写需要移动的topic文件

--topics-to-move-json-file 指定JSON格式配置文件 topics.json

{"topics":
    [{"topic":"dialogue"}],
    "version": 1
}
  • 使用命令生成推荐的partitions分布,下架节点即将下架的节点不写入broker-list
kafka-reassign-partitions --zookeeper prd-zbka-001:2181/kafka/ka  --topics-to-move-json-file topics.json --broker-list "1,2,3,4,5" --generate > topic-reassignment.json

  • 执行分区重分配计划, 可重复执行使用不同的throttle值,来调整速率限制
kafka-reassign-partitions --zookeeper prd-zbka-001:2181/kafka/ka --reassignment-json-file topic-reassignment.json --execute --throttle 50000000  (限速bytes/s,50MB/s)

  • 验证分区重分配计划
kafka-reassign-partitions --zookeeper prd-zbka-001:2181/kafka/ka --reassignment-json-file topic-reassignment.json --verify
  • 此方案优缺点
    • 优点: 简单省事,数据量不大的情况下,亦可限速,比较灵活
    • 缺点:数量大的时候,会导致不同的brokers之间重复的迁移数据,增大延长任务
      ##kafka加上cruise control可界面操作
  • 加入cruise control, 在界面上可以进行下架broker节点,均衡partitions
  • 此方案优缺点:
    • 优点:有界面操作,且有和broker节点的partitions分布,能够做到brokers之间的partitions均衡
    • 缺点:
      • 1.x之前kafka版本,不支持JBOD技术,单brokers多个磁盘的情况,会造成partitions均衡,但磁盘使用不均衡,差异较大。
      • 界面均衡对于均衡能配置partitions数量的限制,但是不可配置限制速率。单个partitions过大,移动过程中会导致磁盘io被吃满。
        {F293176 size=full}

##在官方命令工具基础上,编写脚本,精确计算partitions分布

  • kafka-topics --zookeeper pre-zbka-003:2181/kafka/ka --describe |grep -i "leader: " > topics_leader.txt
	Topic: zb-dialogue-grpc-session-all-in-one	Partition: 73	Leader: 14	Replicas: 14,10	Isr: 10,14
	Topic: zb-dialogue-grpc-session-all-in-one	Partition: 74	Leader: 5	Replicas: 5,11	Isr: 11,5
	Topic: zb-dialogue-grpc-session-all-in-one	Partition: 75	Leader: 8	Replicas: 8,19	Isr: 19,8
	Topic: zb-dialogue-grpc-session-all-in-one	Partition: 76	Leader: 19	Replicas: 19,15	Isr: 15,19
	Topic: zb-dialogue-grpc-session-all-in-one	Partition: 77	Leader: 15	Replicas: 15,1	Isr: 1,15
	Topic: zb-dialogue-grpc-session-all-in-one	Partition: 78	Leader: 1	Replicas: 1,2	Isr: 1,2
	Topic: zb-dialogue-grpc-session-all-in-one	Partition: 79	Leader: 2	Replicas: 2,3	Isr: 2,3
	Topic: zb-dialogue-grpc-session-all-in-one	Partition: 80	Leader: 13	Replicas: 13,4	Isr: 4,13
	Topic: zb-dialogue-grpc-session-all-in-one	Partition: 81	Leader: 4	Replicas: 4,16	Isr: 4,16
	Topic: zb-dialogue-grpc-session-all-in-one	Partition: 82	Leader: 5	Replicas: 5,14	Isr: 14,5
	Topic: zb-dialogue-grpc-session-all-in-one	Partition: 83	Leader: 6	Replicas: 6,8	Isr: 8,6
	Topic: zb-dialogue-grpc-session-all-in-one	Partition: 84	Leader: 7	Replicas: 7,17	Isr: 7,17
	Topic: zb-dialogue-grpc-session-all-in-one	Partition: 85	Leader: 17	Replicas: 17,9	Isr: 9,17
	Topic: zb-dialogue-grpc-session-all-in-one	Partition: 86	Leader: 9	Replicas: 9,10	Isr: 9,10
	Topic: zb-dialogue-grpc-session-all-in-one	Partition: 87	Leader: 10	Replicas: 10,11	Isr: 10,11
	Topic: zb-dialogue-grpc-session-all-in-one	Partition: 88	Leader: 11	Replicas: 11,12	Isr: 12,11
	Topic: zb-dialogue-grpc-session-all-in-one	Partition: 89	Leader: 13	Replicas: 13,18	Isr: 18,13
	Topic: zb-dialogue-grpc-session-all-in-one	Partition: 90	Leader: 5	Replicas: 5,15	Isr: 15,5
	Topic: zb-dialogue-grpc-session-all-in-one	Partition: 91	Leader: 19	Replicas: 19,1	Isr: 1,19
	Topic: zb-dialogue-grpc-session-all-in-one	Partition: 92	Leader: 8	Replicas: 8,2	Isr: 8,2
	Topic: zb-dialogue-grpc-session-all-in-one	Partition: 93	Leader: 1	Replicas: 1,3	Isr: 1,3
	Topic: zb-dialogue-grpc-session-all-in-one	Partition: 94	Leader: 2	Replicas: 2,4	Isr: 4,2
	Topic: zb-dialogue-grpc-session-all-in-one	Partition: 95	Leader: 3	Replicas: 3,16	Isr: 3,16
	Topic: zb-dialogue-grpc-session-all-in-one	Partition: 96	Leader: 4	Replicas: 4,6	Isr: 4,6
	Topic: zb-dialogue-grpc-session-all-in-one	Partition: 97	Leader: 16	Replicas: 16,7	Isr: 7,16
  • 使用remove_node.py进行替换需要下架的remove_nodes为保留的nodes
#!/usr/bin/env python
import json
import random
temp={"version":1,"partitions":[]}
remove_nodes=[19,18,17,16]
def gen_random_node(slist):
    node = random.randint(1,15)
    if node in slist:
        return gen_random_node(slist)
    else:
        return node

with open('topics_leader.txt',"r") as f:
  result = {}
  for line in f:
      line_sp=line.strip().split()
      topic,partition,broker,reps=line_sp[1],line_sp[3],line_sp[5],line_sp[7]
      reps_l = []
      for j in reps.split(","):
          reps_l.append(int(j))
      for i in range(len(reps_l)):
          if reps_l[i] in remove_nodes:
             reps_l[i] = gen_random_node(reps_l)
             result.setdefault(reps_l[i],0)
             result[reps_l[i]] += 1
      paritions={"topic":topic,"partition":int(partition),"replicas":reps_l}
      temp['partitions'].append(paritions)
print json.dumps(temp)
  • 上述文件输出到文件balance_topics.json,例子
{"version": 1, "partitions": [{"topic": "ItemEvent", "partition": 0, "replicas": [2, 4]}, {"topic": "ItemEvent", "partition": 1, "replicas": [9, 11]}, {"topic": "ItemEvent", "partition": 2, "replicas": [10, 6]}, {"topic": "ItemEvent", "partition": 3, "replicas": [11, 7]}, {"topic": "ItemEvent", "partition": 4, "replicas": [12, 5]}, {"topic": "ItemEvent", "partition": 5, "replicas": [12, 9]}, {"topic": "ItemEvent", "partition": 6, "replicas": [9, 10]}, {"topic": "ItemEvent", "partition": 7, "replicas": [15, 5]}, {"topic": "ItemEvent", "partition": 8, "replicas": [8, 13]}, {"topic": "ItemEventTest", "partition": 0, "replicas": [1]}, {"topic": "ItemEventTest", "partition": 1, "replicas": [3]}, {"topic": "ItemEventTest", "partition": 2, "replicas": [4]}, {"topic": "ItemEventTest", "partition": 3, "replicas": [5]}, {"topic": "ItemEventTest", "partition": 4, "replicas": [6]}, {"topic": "ItemEventTest", "partition": 5, "replicas": [7]}, {"topic": "ItemEventTest", "partition": 6, "replicas": [8]}, {"topic": "ItemEventTest", "partition": 7, "replicas": [9]}, {"topic": "ItemEventTest", "partition": 8, "replicas": [10]}, {"topic": "ItemEventTest", "partition": 9, "replicas": [11]}, {"topic": "ItemEventTest", "partition": 10, "replicas": [12]}, {"topic": "ItemEventTest", "partition": 11, "replicas": [13]}, {"topic": "ItemEventTest", "partition": 12, "replicas": [14]}, {"topic": "ItemEventTest", "partition": 13, "replicas": [15]}, {"topic": "ItemEventTest", "partition": 14, "replicas": [6]}, {"topic": "ItemEventTest", "partition": 15, "replicas": [3]}, {"topic": "ItemEventTest", "partition": 16, "replicas": [5]}]}
  • 对上述balance_topics.json提交任务
kafka-reassign-partitions  --zookeeper prd-zbka-003:2181/kafka/ka --reassignment-json-file balance_topics.json --execute  --throttle 50000000
  • 此方案优缺点:
    • 优点:partitions分布灵活可控制,精确计算迁移,减少不必要的移动,亦可进行限速。
    • 缺点:partitions过多,不对partitions移动数量限制的话,提交大任务,即使可以限速,依然会对kafka产生比较大的压力,影响性能。且任务提交也不能中止。
  • 方案优化:
  • partitions迁移过程过多的情况下,少量partitions提交。
  • 并通过监控zookeeper /kafka/ka/admin/reassign_partitions path,定时检查,完成启动新任务。

Kafka 均衡分区

to-move.topics.txt 存放要均衡的topics
#!/usr/bin/env python
import json
temp={"topics":
[],
"version":1
}

with open("to-move.topics.txt", 'rb') as f:
    for line in f:
        topic = line.strip()
        temp['topics'].append({"topic": topic})

print json.dumps(temp,indent=4)
通过topics-to-move.json生成要分配到机器的topicjson
#kafka-reassign-partitions -zookeeper prd-infra-005:2181/kafka/ka --topics-to-move-json-file topics-to-move.json --broker-list "1,2,3,4,5,6,7,8,9" --generate
kafka-reassign-partitions -zookeeper prd-infra-005:2181/kafka/ka --topics-to-move-json-file topics-to-move.json --broker-list "21,22,23,24,25,26,27,28,29" --generate
kafka-reassign-partitions  --zookeeper prd-infra-005:2181/kafka/ka --reassignment-json-file balance_topics.json --execute  --throttle 10000000
#kafka-reassign-partitions  --zookeeper prd-infra-005:2181/kafka/ka --reassignment-json-file balance_topics.json --verify

for i in $(<to-move.topics.txt);
do
echo $i
#kafka-configs --zookeeper prd-infra-005:2181/kafka/ka  --alter --entity-type topics --entity-name $i  --add-config retention.ms=28800000
#kafka-configs --zookeeper prd-infra-005:2181/kafka/ka  --alter --entity-type topics --entity-name $i  --add-config cleanup.policy=delete
kafka-configs --zookeeper prd-infra-005:2181/kafka/ka  --alter --entity-type topics --entity-name $i --delete-config retention.ms,cleanup.policy
done

Elasticsearch 创建索引,删除索引,设置warm脚本

#!/usr/bin/env python
# coding: utf-8
# author: ertao.xu
import sys
reload(sys)
sys.setdefaultencoding("utf-8")
from optparse import OptionParser
from requests.auth import HTTPBasicAuth
import yaml
import requests
import datetime
import os
import logging
import logging.handlers
import json
import arrow

import socket
from tenacity import retry, stop_after_attempt, wait_fixed

logger = logging.getLogger("logger")
handler1 = logging.StreamHandler()
handler2 = logging.FileHandler(filename="/var/log/es_scripts.log")
logger.setLevel(logging.INFO)
formatter = logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
handler1.setFormatter(formatter)
handler2.setFormatter(formatter)

logger.addHandler(handler1)
logger.addHandler(handler2)

s = requests.session()
config = os.path.dirname(os.path.abspath(__file__))+"/config.yaml"

yersterday = arrow.now().shift(days=-1).format('YYYY.MM.DD')
today = arrow.now().format('YYYY.MM.DD')
tomorrow = arrow.now().shift(days=1).format('YYYY.MM.DD')


class ES(object):

    def __init__(self, settings):
        self.settings = settings
        self.host = settings.get("host")
        self.name = settings.get("name")
        self.user = settings.get("user")
        self.password = settings.get("password")
        self.env = settings.get("env")
        self.info = settings.get("indices", {})
        self.box_type = settings.get("box_type", False)
        self.hot_days = settings.get("hot_days")
        self.delete = settings.get("delete", False)
        self.create = settings.get("create", False)
        self.writeSreIndex()

    def writeSreIndex(self):
        data = {}
        for index, info in self.info.items():
            data["name"] = index
            data.update(info)
            res = s.post(self.host+"/sre-index/doc/{0}".format(index),
                         auth=HTTPBasicAuth(self.user, self.password),
                         json=data)
            logger.info(res.text)

    def getIndices(self):
        response = s.get(self.host+"/_cat/indices?bytes=gb",
                         auth=HTTPBasicAuth(self.user, self.password))
        return response.text

    def backupKibana(self):
        query = {"from": 0, "size": 10000}
        response = s.get(self.host+"/.kibana/_search",
                         params=query, auth=HTTPBasicAuth(self.user,
                                                          self.password))
        with open('/data/kibana_backup/'+self.name+"-"+today, 'wb') as f:
            f.write(response.text)

    def calIsNotDespire(self, index_prefix, index_time):
        days = self.info.get(index_prefix, {}).get("days")
        if not days:
            days = self.info.get("default", {}).get("days")
        try:
            index_timestamp = int(datetime.datetime.strptime(index_time,
                                  "%Y.%m.%d").strftime("%s"))
            index_settime = int((datetime.datetime.now() -
                                 datetime.timedelta(days=days)).strftime("%s"))
        except Exception as e:
            logger.exception(index_prefix + "-" + str(index_time) +
                             " " + str(e))
            return False
        else:
            if index_settime > index_timestamp:
                return True
            else:
                return False

    @retry(stop=stop_after_attempt(3), wait=wait_fixed(5))
    def deleteIndices(self):
        if not self.delete:
            logger.info('''{0} {1} does not allow to
                        be deleted by scripts.'''.format(self.name, self.host))
            return False
        for line in self.getIndices().strip().split("\n"):
            line_sp = line.split()
            if not line_sp or '-20' not in line_sp[2]:
                continue
            else:
                index_prefix = "-".join(line_sp[2].split("-")[0:-1])
            if not index_prefix or index_prefix.startswith("."):
                continue
            index_time = line_sp[2].split("-")[-1]
            if self.calIsNotDespire(index_prefix, index_time):
                response = s.delete(self.host+"/{0}".format(
                    index_prefix + "-" + index_time),
                    auth=HTTPBasicAuth(self.user, self.password))
                logger.info(index_prefix + "-" + index_time + response.text)

    def parseIndices(self):
        result = {}
        for line in self.getIndices().split("\n"):
            line_sp = line.split()
            if not line_sp or '-20' not in line_sp[2]:
                continue
            else:
                index_prefix = "-".join(line_sp[2].split("-")[0:-1])
            if not index_prefix or index_prefix.startswith("."):
                continue
            index_size = line_sp[-1]
            if index_prefix not in result:
                result.setdefault(index_prefix, {})["num"] = 1
                result.setdefault(index_prefix, {})["size"] = int(index_size)
            else:
                result[index_prefix]["size"] += int(index_size)
                result[index_prefix]["num"] += 1
        return result

    def calIndicesShards(self):
        result = self.parseIndices()
        for k, v in result.items():
            index_prefix = k
            index_avgsize = result[index_prefix]["size"]/result[
                            index_prefix]["num"]
            if index_avgsize < 10:
                index_shards = 1
            elif index_avgsize < 60:
                index_shards = 2
            else:
                index_shards = index_avgsize/60 + 2
            result[index_prefix]["shards"] = index_shards
        return result

    @retry(stop=stop_after_attempt(3), wait=wait_fixed(5))
    def createHotIndices(self):
        if not self.create:
            logger.info(
                "{0} {1} do not allow to create index by scripts.".format(
                    self.name, self.host))
            return False
        data = self.calIndicesShards()
        for index, v in data.items():
            settings = {
               "settings": {
                   "number_of_replicas": 0,
                   "index.routing.allocation.include.box_type": "hot"
               }
            }
            res = s.put(self.host+"/{0}".format(index+"-{0}".format(tomorrow)),
                        auth=HTTPBasicAuth(self.user, self.password),
                        json=settings)
            logger.info(res.text)

    def calIsNotHotIndex(self, index_prefix, index_time):
        try:
            index_timestamp = arrow.get(index_time).timestamp
            index_settime = arrow.get(arrow.now().shift(days=int("-{0}".format(
                self.hot_days))).format("YYYY.MM.DD")).timestamp
        except Exception as e:
            logger.exception(index_prefix + "-" + str(index_time) +
                             " " + str(e))
            return False
        else:
            if index_settime < index_timestamp:
                return True
            else:
                return False

    @retry(stop=stop_after_attempt(3), wait=wait_fixed(5))
    def setIndicesToWarm(self):
        settings = {
            "settings": {
                "index.routing.allocation.include.box_type": "warm",
                "index.routing.allocation.total_shards_per_node": None
            }
        }

        if not self.box_type:
            logger.info("{0} {1} don't allow to be settings by scripts".format(
                self.name, self.host))
            return False
        for line in self.getIndices().split("\n"):
            line_sp = line.split()
            if not line_sp or '-20' not in line_sp[2] or \
               today in line_sp[2] or tomorrow in line_sp[2]:
                continue
            else:
                index_prefix = "-".join(line_sp[2].split("-")[0:-1])
            if not index_prefix or index_prefix.startswith("."):
                continue
            index_time = line_sp[2].split("-")[-1]
            if self.calIsNotHotIndex(index_prefix, index_time):
                logger.info("{0}-{1} is hot index".format(index_prefix,
                                                          index_time))
            else:
                logger.info("{0}-{1} is not hot index".format(index_prefix,
                                                              index_time))
                res = s.put(self.host+"/{0}-{1}/_settings".format(index_prefix,
                            index_time), json=settings,
                            auth=HTTPBasicAuth(self.user, self.password))
                logger.info(
                    '''put index {0}-{1} box_type
                    to warm, response is {2}'''.format(
                        index_prefix, index_time, res.text))


def tographite(value):
    try:
        HOST = '192.168.17.163'
        PORT = 2003
        s = socket.socket()
        s.connect((HOST, PORT))
    except Exception as e:
        logger.exception(e)
    else:
        print(value.lower() + " {0}\n".format(arrow.now().timestamp))
        s.sendall(value.lower() + " {0}\n".format(arrow.now().timestamp))
        s.close()


def delete():
    f = open(config)
    settings = yaml.load(f)
    for setting in settings:
        try:
            es = ES(settings=setting)
            es.deleteIndices()
        except Exception as e:
            logger.exception(e)
            tographite("manage_es.{0}.delete 0".format(setting['name']))
        else:
            tographite("manage_es.{0}.delete 1".format(es.name))
    f.close()


def create():
    f = open(config)
    settings = yaml.load(f)
    for setting in settings:
        try:
            es = ES(settings=setting)
            logger.info('{0} {1} is starting'.format(es.name, es.host))
            es.createHotIndices()
        except Exception as e:
            logger.exception(e)
            logger.info('{0} {1} is exception'.format(setting['name'],
                                                      setting['host']))
            tographite("manage_es.{0}.create 0".format(setting['name']))
        else:
            tographite("manage_es.{0}.create 1".format(es.name))
            logger.info('{0} {1} is end'.format(es.name, es.host))
    f.close()


def setting():
    f = open(config)
    settings = yaml.load(f)
    for setting in settings:
        try:
            es = ES(settings=setting)
            es.setIndicesToWarm()
        except Exception as e:
            logger.exception(e)
            tographite("manage_es.{0}.setting 0".format(setting['name']))
        else:
            tographite("manage_es.{0}.setting 1".format(es.name))
    f.close()


def backup():
    f = open(config)
    settings = yaml.load(f)
    for setting in settings:
        try:
            es = ES(settings=setting)
            es.backupKibana()
        except Exception as e:
            logger.exception(e)
    f.close()


if __name__ == "__main__":
    usage = "usage: %prog -m (delete|create|setting)"
    parser = OptionParser(usage=usage)
    parser.add_option("-m",
                      dest="module", help='''choise
                      execute module(delete|create|setting|backup)''')
    (options, args) = parser.parse_args()
    if options.module and options.module == "delete":
        delete()
    elif options.module and options.module == "create":
        create()
    elif options.module and options.module == "setting":
        setting()
    elif options.module and options.module == "backup":
        backup()
    else:
        parser.print_help()

导入原来index的mapping

{
  "mapping": {
    "context-parameter": {
      "properties": {
        "@timestamp": {
          "type": "date"
        },
        "galaxy": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "intent": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "item_category_by_store": {
          "type": "keyword"
        },
        "item_category_by_taobao": {
          "type": "keyword"
        },
        "item_id": {
          "type": "keyword"
        },
        "question_uuid": {
          "type": "keyword"
        },
        "refund_status": {
          "type": "keyword"
        },
        "sid": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "timestamp": {
          "type": "date"
        },
        "trade_status": {
          "type": "keyword"
        }
      }
    }
  }
}
PUT index
{
  "mappings": {
    "context-parameter": {
      "properties": {
        "@timestamp": {
          "type": "date"
        },
        "galaxy": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "intent": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "item_category_by_store": {
          "type": "keyword"
        },
        "item_category_by_taobao": {
          "type": "keyword"
        },
        "item_id": {
          "type": "keyword"
        },
        "question_uuid": {
          "type": "keyword"
        },
        "refund_status": {
          "type": "keyword"
        },
        "sid": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "timestamp": {
          "type": "date"
        },
        "trade_status": {
          "type": "keyword"
        }
      }
    }
  }
}

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.