ElasticSearch在不同环境同步索引数据的方式
1、在生产环境导出json数据
curl -u “adims_user:xkR%cHwR5I9g” -X GET “http://172.18.251.132:9200/unify_info_mb_sp_aggregatetb_0004/_search?scroll=1m” -H ‘Content-Type: application/json’ -d'{“size”: 100000,”query”: {“bool”: {“must”: [{ “term”: { “categoryId”: 30 }},{ “term”: { “factoryType”: “煤炭电厂” }},{ “term”: { “isDelete”: 0 }},{ “term”: { “countryName”: “中国” }}]}}}’ > initial_batch.json
2、利用python把导出的json数据转成bulk数据
python脚本convert_to_bulk_simple.py
import json
import sys
def convert_search_to_bulk(input_file, output_file, target_index):
“””将ES查询结果转换为bulk格式”””
with open(input_file, ‘r’, encoding=’utf-8′) as f:
data = json.load(f)
if ‘hits’ not in data or ‘hits’ not in data[‘hits’]:
print(“错误: 不是有效的ES查询结果格式”)
return False
hits = data[‘hits’][‘hits’]
print(f”找到 {len(hits)} 个文档”)
with open(output_file, ‘w’, encoding=’utf-8′) as f:
for hit in hits:
# action行
action = {“index”: {“_index”: target_index, “_id”: hit.get(‘_id’)}}
f.write(json.dumps(action) + ‘\n’)
# document行
f.write(json.dumps(hit.get(‘_source’, {})) + ‘\n’)
# 确保以换行符结尾
f.write(‘\n’)
print(f”转换完成: {output_file}”)
return True
if __name__ == “__main__”:
if len(sys.argv) != 4:
print(“使用方法: python convert_to_bulk_simple.py input.json output.json target_index”)
sys.exit(1)
convert_search_to_bulk(sys.argv[1], sys.argv[2], sys.argv[3])
执行转换命令:
python convert_to_bulk_simple.py initial_batch2.json bulk_data.json unify_info_mb_sp_aggregatetb_0004
3、把转换的数据导入到测试环境
curl -u “adims_user:j0SMMmI+Rwfv” -X POST “http://192.168.168.243:9200/_bulk” -H “Content-Type: application/json” –data-binary @bulk_data.json
4、导入前后查询数据量大小,验证是否导入成功
curl -u “adims_user:j0SMMmI+Rwfv” -X GET “http://192.168.168.243:9200/unify_info_mb_sp_aggregatetb_0004/_count”
1. 本站所有资源来源于用户上传和网络,如有侵权请邮件联系站长!
2. 分享目的仅供大家学习和交流,您必须在下载后24小时内删除!
3. 不得使用于非法商业用途,不得违反国家法律。否则后果自负!
4. 本站提供的源码、模板、插件等等其他资源,都不包含技术服务请大家谅解!
5. 如有链接无法下载、失效或广告,请联系管理员处理!
6. 如遇到加密压缩包,请使用WINRAR解压,如遇到无法解压的请联系管理员!
7. 本站有不少源码未能详细测试(解密),不能分辨部分源码是病毒还是误报,所以没有进行任何修改,大家使用前请进行甄别!
66源码网 » ElasticSearch在不同环境同步索引数据的方式