ElasticSearch版本

docker镜像:elasticsearch:7.9.1

脚本环境准备

1pip3 install elasticsearch==5.5.3 -i https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple

ES导入CSV

Python
1from elasticsearch import Elasticsearch
2from elasticsearch.helpers import scan
3import csv
4from typing import List, Optional
5
6
7def es_to_csv(es_host: str,
8              es_port: int,
9              index_name: str,
10              output_csv: str,
11              fields: Optional[List[str]] = None,
12              username: Optional[str] = None,
13              password: Optional[str] = None,
14              batch_size: int = 1000):
15    """
16    将Elasticsearch指定索引的全部数据导出为CSV文件(适配新版elasticsearch库)
17    """
18    # 构建ES连接URL
19    es_url = f"http://{es_host}:{es_port}"
20
21    # 初始化连接参数
22    es_kwargs = {}
23    # 处理认证(新版本中单独传递auth参数)
24    if username and password:
25        # es_kwargs["basic_auth"] = (username, password)
26        es_kwargs["http_auth"] = (username, password)
27
28    # 连接Elasticsearch
29    es = Elasticsearch(es_url, **es_kwargs)
30
31    # 检查索引是否存在
32    if not es.indices.exists(index=index_name):
33        raise ValueError(f"索引 '{index_name}' 不存在")
34
35    # 检查连接是否成功
36    if not es.ping():
37        raise ConnectionError("无法连接到Elasticsearch服务")
38
39    # 构建查询(匹配所有文档)
40    query = {"query": {"match_all": {}}}
41
42    # 扫描所有文档并写入CSV
43    with open(output_csv, mode='w', newline='', encoding='utf-8') as csv_file:
44        writer = None
45        total_count = 0
46
47        for hit in scan(es, index=index_name, query=query, size=batch_size):
48            source_data = hit['_source']
49
50            # 初始化CSV写入器
51            if writer is None:
52                headers = fields if fields else source_data.keys()
53                writer = csv.DictWriter(csv_file, fieldnames=headers)
54                writer.writeheader()
55
56            # 准备数据行
57            row_data = {field: source_data[field] for field in fields} if fields else source_data
58            writer.writerow(row_data)
59            total_count += 1
60
61            # 打印进度
62            if total_count % batch_size == 0:
63                print(f"已导出 {total_count} 条数据...")
64
65        print(f"导出完成!共导出 {total_count} 条数据到 {output_csv}")
66
67
68if __name__ == "__main__":
69    # 配置参数
70    ES_HOST = "192.168.199.38"  # Elasticsearch主机
71    ES_PORT = 9200  # Elasticsearch端口
72    INDEX_NAME = "all_test"  # 替换为要导出的索引名
73    OUTPUT_CSV = "/es_data.csv"  # 输出CSV文件路径
74    USERNAME = None  # 如有认证,填写用户名
75    PASSWORD = None  # 如有认证,填写密码
76    # 如需指定导出字段,取消下面一行注释并修改字段列表
77    # FIELDS = ["field1", "field2", "field3"]
78    FIELDS = None  # 导出所有字段
79
80    # 执行导出
81    try:
82        es_to_csv(
83            es_host=ES_HOST,
84            es_port=ES_PORT,
85            index_name=INDEX_NAME,
86            output_csv=OUTPUT_CSV,
87            fields=FIELDS,
88            username=USERNAME,
89            password=PASSWORD
90        )
91    except Exception as e:
92        print(f"导出失败: {str(e)}")

CSV导入ES

TIP

高亮部分可根据实际情况进行调整

Python
1import pandas as pd
2from elasticsearch import Elasticsearch
3from elasticsearch.helpers import bulk
4from typing import Optional, Dict
5
6
7def csv_to_es(csv_file: str,
8              es_host: str,
9              es_port: int,
10              target_index: str,
11              username: Optional[str] = None,
12              password: Optional[str] = None,
13              mapping: Optional[Dict] = None,
14              batch_size: int = 1000,
15              cleanup_before_import: bool = True):
16    """
17    将CSV文件数据导入到指定的Elasticsearch索引(支持导入前清理数据)
18
19    参数:
20        cleanup_before_import: 导入前是否清理目标索引数据(True/False)
21    """
22    # 1. 读取CSV文件
23    try:
24        df = pd.read_csv(
25            csv_file,
26            encoding='utf-8',
27            na_values=['', 'null'],
28            keep_default_na=False
29        )
30    except Exception as e:
31        raise ValueError(f"读取CSV文件失败: {str(e)}")
32
33    # 2. 连接Elasticsearch
34    es_url = f"http://{es_host}:{es_port}"
35    es_kwargs = {
36        "verify_certs": False,
37        "ssl_show_warn": False
38    }
39    if username and password:
40        es_kwargs["basic_auth"] = (username, password)
41
42    es = Elasticsearch(es_url, **es_kwargs)
43
44    if not es.ping():
45        raise ConnectionError("无法连接到目标Elasticsearch服务")
46
47    # 3. 导入前清理数据
48    if cleanup_before_import:
49        if es.indices.exists(index=target_index):
50            print(f"开始清理索引 '{target_index}' 中的现有数据...")
51
52            # # 方式1:删除索引并重建(适用于需要重置映射的场景)
53            # es.indices.delete(index=target_index)
54            # print(f"已删除索引 '{target_index}'")
55            #
56            # # 重建索引(使用指定映射或默认映射)
57            # if mapping:
58            #     es.indices.create(index=target_index, body=mapping)
59            #     print(f"使用指定映射重建索引 '{target_index}'")
60            # else:
61            #     es.indices.create(index=target_index)
62            #     print(f"使用默认映射重建索引 '{target_index}'")
63
64            # 方式2:仅清空数据(保留索引结构,取消上面注释并注释方式1即可)
65            es.delete_by_query(index=target_index, body={"query": {"match_all": {}}})
66            print(f"已清空索引 '{target_index}' 中的数据")
67        else:
68            print(f"索引 '{target_index}' 不存在,无需清理")
69
70    # 4. 若索引仍不存在则创建(适用于首次导入或未清理的场景)
71    if not es.indices.exists(index=target_index):
72        if mapping:
73            es.indices.create(index=target_index, body=mapping)
74            print(f"使用指定映射创建新索引 '{target_index}'")
75        else:
76            es.indices.create(index=target_index)
77            print(f"使用默认映射创建新索引 '{target_index}'")
78
79    # 5. 批量导入数据
80    total_docs = len(df)
81    success_count = 0
82
83    print(f"开始导入数据,共 {total_docs} 条记录...")
84
85    for i in range(0, total_docs, batch_size):
86        batch_df = df.iloc[i:i + batch_size]
87        actions = []
88
89        for _, row in batch_df.iterrows():
90            doc = row.to_dict()
91            # 处理空值
92            for key, value in doc.items():
93                if pd.isna(value):
94                    doc[key] = None
95
96            actions.append({
97                "_index": target_index,
98                "_source": doc
99            })
100
101        # 执行批量导入
102        success, failed = bulk(es, actions)
103        success_count += success
104
105        # 打印进度
106        processed = min(i + batch_size, total_docs)
107        print(f"已处理 {processed}/{total_docs} 条,成功导入 {success_count} 条")
108
109    print(f"导入完成!总记录数: {total_docs}, 成功导入: {success_count}")
110
111
112if __name__ == "__main__":
113    # 配置参数
114    CSV_FILE = "/es_data.csv"  # 要导入的CSV文件路径
115    TARGET_INDEX = "all_test"  # 目标索引名称
116
117    ES_HOST = "192.168.199.38"  # Elasticsearch主机
118    ES_PORT = 9200  # Elasticsearch端口
119    USERNAME = None  # 如有认证,填写用户名
120    PASSWORD = None  # 如有认证,填写密码
121    # 如需指定导出字段,取消下面一行注释并修改字段列表
122    # FIELDS = ["field1", "field2", "field3"]
123    FIELDS = None  # 导出所有字段
124
125    # 可选:定义索引映射(推荐,确保字段类型正确)
126    # 示例映射(根据CSV实际字段调整)
127    MAPPING = {
128        "mappings": {
129            "properties": {
130                # 示例字段:根据你的CSV字段修改
131                "id": {"type": "integer"},
132                "name": {"type": "text", "fields": {"keyword": {"type": "keyword"}}},
133                "age": {"type": "integer"},
134                "join_date": {"type": "date", "format": "yyyy-MM-dd"},
135                "salary": {"type": "float"}
136            }
137        }
138    }
139
140    # 执行导入
141    try:
142        csv_to_es(
143            csv_file=CSV_FILE,
144            es_host=ES_HOST,
145            es_port=ES_PORT,
146            target_index=TARGET_INDEX,
147            username=USERNAME,
148            password=PASSWORD,
149            mapping=None,  # 如需使用默认映射,可设为None
150            batch_size=1000
151        )
152    except Exception as e:
153        print(f"导入失败: {str(e)}")