docker镜像:elasticsearch:7.9.1
pip3 install elasticsearch==5.5.3 -i https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple
1from elasticsearch import Elasticsearch
2from elasticsearch.helpers import scan
3import csv
4from typing import List, Optional
5
6
def es_to_csv(es_host: str,
              es_port: int,
              index_name: str,
              output_csv: str,
              fields: Optional[List[str]] = None,
              username: Optional[str] = None,
              password: Optional[str] = None,
              batch_size: int = 1000):
    """Export every document of an Elasticsearch index to a CSV file.

    Documents are streamed with the ``scan`` helper, so the index is written
    row by row instead of being loaded into memory at once.

    Args:
        es_host: Elasticsearch host name or IP address.
        es_port: Elasticsearch HTTP port.
        index_name: Index to export.
        output_csv: Destination CSV path (overwritten, UTF-8).
        fields: Columns to export, in this order. Documents missing a column
            get an empty cell. When ``None`` (or empty), the columns are the
            ``_source`` keys of the first document returned; keys that only
            appear in later documents are skipped and reported at the end.
        username: Optional HTTP basic-auth user name.
        password: Optional HTTP basic-auth password.
        batch_size: ``scan`` page size; also the progress-report interval.

    Raises:
        ConnectionError: The cluster does not answer ``ping``.
        ValueError: ``index_name`` does not exist.
    """
    es_url = f"http://{es_host}:{es_port}"

    es_kwargs = {}
    if username and password:
        # ``http_auth`` is the keyword understood by elasticsearch-py 5.x-7.x
        # (the guide installs 5.5.3); ``basic_auth`` only exists in 8.x.
        es_kwargs["http_auth"] = (username, password)

    es = Elasticsearch(es_url, **es_kwargs)

    # Ping first: otherwise an unreachable cluster makes indices.exists()
    # fail with a client transport error and this message is never reached.
    if not es.ping():
        raise ConnectionError("无法连接到Elasticsearch服务")

    if not es.indices.exists(index=index_name):
        raise ValueError(f"索引 '{index_name}' 不存在")

    query = {"query": {"match_all": {}}}

    with open(output_csv, mode='w', newline='', encoding='utf-8') as csv_file:
        writer = None
        header_keys = set()
        skipped_fields = set()
        total_count = 0

        if fields:
            # Columns are known up front: write the header now so that even
            # an empty index yields a valid CSV with a header row.
            writer = csv.DictWriter(csv_file, fieldnames=list(fields))
            writer.writeheader()

        for hit in scan(es, index=index_name, query=query, size=batch_size):
            source_data = hit.get('_source', {})

            if fields:
                # .get() -> None for absent keys; csv writes None as "".
                row_data = {field: source_data.get(field) for field in fields}
            else:
                if writer is None:
                    # Header taken from the first document; extra keys in
                    # later documents are ignored instead of raising.
                    writer = csv.DictWriter(csv_file,
                                            fieldnames=list(source_data.keys()),
                                            extrasaction='ignore')
                    writer.writeheader()
                    header_keys = set(source_data.keys())
                skipped_fields.update(k for k in source_data if k not in header_keys)
                row_data = source_data

            writer.writerow(row_data)
            total_count += 1

            if total_count % batch_size == 0:
                print(f"已导出 {total_count} 条数据...")

    if skipped_fields:
        print(f"警告: 以下字段未出现在首条文档中,已被忽略: {sorted(skipped_fields)}"
              f"(可通过 fields 参数显式指定导出字段)")

    print(f"导出完成!共导出 {total_count} 条数据到 {output_csv}")
66
67
if __name__ == "__main__":
    # Configuration: adjust these values to your environment.
    ES_HOST = "192.168.199.38"  # Elasticsearch host
    ES_PORT = 9200  # Elasticsearch HTTP port
    INDEX_NAME = "all_test"  # index to export
    OUTPUT_CSV = "/es_data.csv"  # output CSV file path
    USERNAME = None  # user name, if the cluster requires authentication
    PASSWORD = None  # password, if the cluster requires authentication
    # To export only selected fields, uncomment and edit the next line:
    # FIELDS = ["field1", "field2", "field3"]
    FIELDS = None  # None: export the fields of the first document returned

    try:
        es_to_csv(
            es_host=ES_HOST,
            es_port=ES_PORT,
            index_name=INDEX_NAME,
            output_csv=OUTPUT_CSV,
            fields=FIELDS,
            username=USERNAME,
            password=PASSWORD
        )
    except Exception as e:
        print(f"导出失败: {str(e)}")
        # Exit non-zero so shells / schedulers can detect the failure.
        raise SystemExit(1)
高亮部分(主要是 `if __name__ == "__main__":` 下的配置参数,如 ES 主机、端口、索引名、CSV 文件路径和认证信息)可根据实际情况进行调整。
1import pandas as pd
2from elasticsearch import Elasticsearch
3from elasticsearch.helpers import bulk
4from typing import Optional, Dict
5
6
def csv_to_es(csv_file: str,
              es_host: str,
              es_port: int,
              target_index: str,
              username: Optional[str] = None,
              password: Optional[str] = None,
              mapping: Optional[Dict] = None,
              batch_size: int = 1000,
              cleanup_before_import: bool = True,
              recreate_index: bool = False):
    """Import the rows of a CSV file into an Elasticsearch index.

    Each CSV row becomes one document with an auto-generated ``_id``. Empty
    cells and the literal string ``null`` are sent as JSON ``null``.

    Args:
        csv_file: Path of the UTF-8 CSV file; the first row is the header.
        es_host: Elasticsearch host name or IP address.
        es_port: Elasticsearch HTTP port.
        target_index: Index that receives the documents.
        username: Optional HTTP basic-auth user name.
        password: Optional HTTP basic-auth password.
        mapping: Optional index-creation body (e.g. ``{"mappings": {...}}``).
            Used only when this function creates the index.
        batch_size: Rows sent per ``bulk`` call; also the progress interval.
        cleanup_before_import: If True and the index exists, remove its
            existing data before importing.
        recreate_index: Only relevant with ``cleanup_before_import``. If True,
            drop the index and recreate it (applying ``mapping``). If False
            (default), delete documents with ``delete_by_query``, keeping the
            index and its current mapping.

    Raises:
        ValueError: The CSV file cannot be read or parsed.
        ConnectionError: The cluster does not answer ``ping``.
        Errors raised by the Elasticsearch client, including the ``bulk``
        helper, propagate unchanged.
    """
    # 1. Read the CSV. Only '' and 'null' are treated as missing; pandas'
    #    other default NA markers ("NA", "N/A", ...) stay literal strings.
    try:
        df = pd.read_csv(
            csv_file,
            encoding='utf-8',
            na_values=['', 'null'],
            keep_default_na=False
        )
    except Exception as e:
        raise ValueError(f"读取CSV文件失败: {str(e)}") from e

    # 2. Connect to Elasticsearch.
    es_url = f"http://{es_host}:{es_port}"
    es_kwargs = {
        "verify_certs": False,
        "ssl_show_warn": False
    }
    if username and password:
        # elasticsearch-py 5.x-7.x (the guide installs 5.5.3) expects
        # ``http_auth``; ``basic_auth`` is an 8.x-only keyword that older
        # clients swallow as an unknown option, so credentials were never
        # sent. 8.x still accepts ``http_auth`` (deprecated).
        es_kwargs["http_auth"] = (username, password)

    es = Elasticsearch(es_url, **es_kwargs)

    if not es.ping():
        raise ConnectionError("无法连接到目标Elasticsearch服务")

    # 3. Optionally clear existing data before importing.
    if cleanup_before_import:
        if es.indices.exists(index=target_index):
            print(f"开始清理索引 '{target_index}' 中的现有数据...")
            if recreate_index:
                # Drop the whole index; step 4 recreates it (with ``mapping`` if given).
                es.indices.delete(index=target_index)
                print(f"已删除索引 '{target_index}'")
            else:
                # Delete documents only; the index and its mapping are kept.
                es.delete_by_query(index=target_index, body={"query": {"match_all": {}}})
                print(f"已清空索引 '{target_index}' 中的数据")
        else:
            print(f"索引 '{target_index}' 不存在,无需清理")

    # 4. Create the index if it does not exist (first import, or just dropped above).
    if not es.indices.exists(index=target_index):
        if mapping:
            es.indices.create(index=target_index, body=mapping)
            print(f"使用指定映射创建新索引 '{target_index}'")
        else:
            es.indices.create(index=target_index)
            print(f"使用默认映射创建新索引 '{target_index}'")

    # 5. Bulk-import in batches of ``batch_size`` rows.
    total_docs = len(df)
    success_count = 0

    print(f"开始导入数据,共 {total_docs} 条记录...")

    for start in range(0, total_docs, batch_size):
        # to_dict("records") avoids iterrows(), which builds a Series per row.
        records = df.iloc[start:start + batch_size].to_dict(orient="records")
        actions = [
            {
                "_index": target_index,
                # NaN (missing cell) is not valid JSON; send null instead.
                "_source": {key: (None if pd.isna(value) else value)
                            for key, value in record.items()},
            }
            for record in records
        ]

        # bulk() raises on failed documents by default (raise_on_error=True).
        success, _ = bulk(es, actions)
        success_count += success

        processed = min(start + batch_size, total_docs)
        print(f"已处理 {processed}/{total_docs} 条,成功导入 {success_count} 条")

    print(f"导入完成!总记录数: {total_docs}, 成功导入: {success_count}")
110
111
if __name__ == "__main__":
    # Configuration: adjust these values to your environment.
    CSV_FILE = "/es_data.csv"  # CSV file to import
    TARGET_INDEX = "all_test"  # destination index

    ES_HOST = "192.168.199.38"  # Elasticsearch host
    ES_PORT = 9200  # Elasticsearch HTTP port
    USERNAME = None  # user name, if the cluster requires authentication
    PASSWORD = None  # password, if the cluster requires authentication

    # Optional explicit index mapping (recommended so field types are right).
    # The fields below are placeholders: adapt them to the CSV's columns and
    # pass mapping=MAPPING in the call below. csv_to_es only applies a
    # mapping when it creates the index.
    MAPPING = {
        "mappings": {
            "properties": {
                # Example fields: replace with your CSV's columns
                "id": {"type": "integer"},
                "name": {"type": "text", "fields": {"keyword": {"type": "keyword"}}},
                "age": {"type": "integer"},
                "join_date": {"type": "date", "format": "yyyy-MM-dd"},
                "salary": {"type": "float"}
            }
        }
    }

    try:
        csv_to_es(
            csv_file=CSV_FILE,
            es_host=ES_HOST,
            es_port=ES_PORT,
            target_index=TARGET_INDEX,
            username=USERNAME,
            password=PASSWORD,
            mapping=None,  # None: default mapping; use MAPPING once adapted
            batch_size=1000
        )
    except Exception as e:
        print(f"导入失败: {str(e)}")
        # Exit non-zero so shells / schedulers can detect the failure.
        raise SystemExit(1)