首頁 > 軟體

使用python生成大量資料寫入es資料庫並查詢操作(2)

2022-09-24 14:01:13

前言 :

上一篇文章:如何使用python生成大量資料寫入es資料庫並查詢操作

模擬學生個人資訊寫入es資料庫,包括姓名、性別、年齡、特點、科目、成績,建立時間。

方案一

在寫入資料時未提前建立索引mapping,而是每插入一條資料都包含了索引的資訊。

範例程式碼:【多執行緒寫入資料】【一次性寫入10000*1000條資料】  【本人親測耗時3266秒】

from elasticsearch import Elasticsearch
from elasticsearch import helpers
from datetime import datetime
from queue import Queue
import random
import time
import threading
es = Elasticsearch(hosts='http://127.0.0.1:9200')
# print(es)
 
names = ['劉一', '陳二', '張三', '李四', '王五', '趙六', '孫七', '周八', '吳九', '鄭十']
sexs = ['男', '女']
age = [25, 28, 29, 32, 31, 26, 27, 30]
character = ['自信但不自負,不以自我為中心',
             '努力、積極、樂觀、拼搏是我的人生信條',
             '抗壓能力強,能夠快速適應周圍環境',
             '敢做敢拼,腳踏實地;做事認真負責,責任心強',
             '愛好所學專業,樂於學習新知識;對工作有責任心;踏實,熱情,對生活充滿激情',
             '主動性強,自學能力強,具有團隊合作意識,有一定組織能力',
             '忠實誠信,講原則,說到做到,決不推卸責任',
             '有自制力,做事情始終堅持有始有終,從不半途而廢',
             '肯學習,有問題不逃避,願意虛心向他人學習',
             '願意以謙虛態度讚揚接納優越者,權威者',
             '會用100%的熱情和精力投入到工作中;平易近人',
             '為人誠懇,性格開朗,積極進取,適應力強、勤奮好學、腳踏實地',
             '有較強的團隊精神,工作積極進取,態度認真']
subjects = ['語文', '數學', '英語', '生物', '地理']
grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86, 96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86]
create_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
 
def save_to_es(num):
    """
    批次寫入資料到es資料庫
    :param num:
    :return:
    """
    start = time.time()
    action = [
        {
            "_index": "personal_info_10000000",
            "_type": "doc",
            "_id": i,
            "_source": {
                "id": i,
                "name": random.choice(names),
                "sex": random.choice(sexs),
                "age": random.choice(age),
                "character": random.choice(character),
                "subject": random.choice(subjects),
                "grade": random.choice(grades),
                "create_time": create_time
            }
        } for i in range(10000 * num, 10000 * num + 10000)
    ]
    helpers.bulk(es, action)
    end = time.time()
    print(f"{num}耗時{end - start}s!")
 
def run():
    global queue
    while queue.qsize() > 0:
        num = queue.get()
        print(num)
        save_to_es(num)

if __name__ == '__main__':
    start = time.time()
    queue = Queue()
    # 序號資料進佇列
    for num in range(1000):
        queue.put(num)
 
    # 多執行緒執行程式
    consumer_lst = []
    for _ in range(10):
        thread = threading.Thread(target=run)
        thread.start()
        consumer_lst.append(thread)
    for consumer in consumer_lst:
        consumer.join()
    end = time.time()
    print('程式執行完畢!花費時間:', end - start)

執行結果:

 自動建立的索引mapping:

GET personal_info_10000000/_mapping
{
  "personal_info_10000000" : {
    "mappings" : {
      "properties" : {
        "age" : {
          "type" : "long"
        },
        "character" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "create_time" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "grade" : {
          "type" : "long"
        },
        "id" : {
          "type" : "long"
        },
        "name" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "sex" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "subject" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        }
      }
    }
  }
}

方案二

1.順序插入5000000條資料

先建立索引personal_info_5000000,確定好mapping後,再插入資料。

新建索引並設定mapping資訊:

PUT personal_info_5000000
{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1
  },
  "mappings": {
    "properties": {
      "id": {
        "type": "long"
      },
      "name": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 32
          }
        }
      },
      "sex": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 8
          }
        }
      },
      "age": {
        "type": "long"
      },
      "character": {
        "type": "text",
        "analyzer": "ik_smart",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "subject": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "grade": {
        "type": "long"
      },
      "create_time": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
      }
    }
  }
}

檢視新建索引資訊:

GET personal_info_5000000
 
{
  "personal_info_5000000" : {
    "aliases" : { },
    "mappings" : {
      "properties" : {
        "age" : {
          "type" : "long"
        },
        "character" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          },
          "analyzer" : "ik_smart"
        },
        "create_time" : {
          "type" : "date",
          "format" : "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
        },
        "grade" : {
          "type" : "long"
        },
        "id" : {
          "type" : "long"
        },
        "name" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 32
            }
          }
        },
        "sex" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 8
            }
          }
        },
        "subject" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        }
      }
    },
    "settings" : {
      "index" : {
        "routing" : {
          "allocation" : {
            "include" : {
              "_tier_preference" : "data_content"
            }
          }
        },
        "number_of_shards" : "3",
        "provided_name" : "personal_info_50000000",
        "creation_date" : "1663471072176",
        "number_of_replicas" : "1",
        "uuid" : "5DfmfUhUTJeGk1k4XnN-lQ",
        "version" : {
          "created" : "7170699"
        }
      }
    }
  }
}

開始插入資料:

範例程式碼: 【單執行緒寫入資料】【一次性寫入10000*500條資料】  【本人親測耗時7916秒】

from elasticsearch import Elasticsearch
from datetime import datetime
from queue import Queue
import random
import time
import threading
es = Elasticsearch(hosts='http://127.0.0.1:9200')
# print(es)
names = ['劉一', '陳二', '張三', '李四', '王五', '趙六', '孫七', '周八', '吳九', '鄭十']
sexs = ['男', '女']
age = [25, 28, 29, 32, 31, 26, 27, 30]
character = ['自信但不自負,不以自我為中心',
             '努力、積極、樂觀、拼搏是我的人生信條',
             '抗壓能力強,能夠快速適應周圍環境',
             '敢做敢拼,腳踏實地;做事認真負責,責任心強',
             '愛好所學專業,樂於學習新知識;對工作有責任心;踏實,熱情,對生活充滿激情',
             '主動性強,自學能力強,具有團隊合作意識,有一定組織能力',
             '忠實誠信,講原則,說到做到,決不推卸責任',
             '有自制力,做事情始終堅持有始有終,從不半途而廢',
             '肯學習,有問題不逃避,願意虛心向他人學習',
             '願意以謙虛態度讚揚接納優越者,權威者',
             '會用100%的熱情和精力投入到工作中;平易近人',
             '為人誠懇,性格開朗,積極進取,適應力強、勤奮好學、腳踏實地',
             '有較強的團隊精神,工作積極進取,態度認真']
subjects = ['語文', '數學', '英語', '生物', '地理']
grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86, 96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86]
create_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
 
# 新增程式耗時的功能
def timer(func):
    def wrapper(*args, **kwargs):
        start = time.time()
        res = func(*args, **kwargs)
        end = time.time()
        print('id{}共耗時約 {:.2f} 秒'.format(*args, end - start))
        return res
 
    return wrapper
 
@timer
def save_to_es(num):
    """
    順序寫入資料到es資料庫
    :param num:
    :return:
    """
    body = {
        "id": num,
        "name": random.choice(names),
        "sex": random.choice(sexs),
        "age": random.choice(age),
        "character": random.choice(character),
        "subject": random.choice(subjects),
        "grade": random.choice(grades),
        "create_time": create_time
    }
    # 此時若索引不存在時會新建
    es.index(index="personal_info_5000000", id=num, doc_type="_doc", document=body)
 
def run():
    global queue
    while queue.qsize() > 0:
        num = queue.get()
        print(num)
        save_to_es(num)
 
if __name__ == '__main__':
    start = time.time()
    queue = Queue()
    # 序號資料進佇列
    for num in range(5000000):
        queue.put(num)
 
    # 多執行緒執行程式
    consumer_lst = []
    for _ in range(10):
        thread = threading.Thread(target=run)
        thread.start()
        consumer_lst.append(thread)
    for consumer in consumer_lst:
        consumer.join()
    end = time.time()
    print('程式執行完畢!花費時間:', end - start)

執行結果:

2.批次插入5000000條資料

先建立索引personal_info_5000000_v2,確定好mapping後,再插入資料。

新建索引並設定mapping資訊:

PUT personal_info_5000000_v2
{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1
  },
  "mappings": {
    "properties": {
      "id": {
        "type": "long"
      },
      "name": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 32
          }
        }
      },
      "sex": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 8
          }
        }
      },
      "age": {
        "type": "long"
      },
      "character": {
        "type": "text",
        "analyzer": "ik_smart",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "subject": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "grade": {
        "type": "long"
      },
      "create_time": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
      }
    }
  }
}

檢視新建索引資訊:

GET personal_info_5000000_v2
 
{
  "personal_info_5000000_v2" : {
    "aliases" : { },
    "mappings" : {
      "properties" : {
        "age" : {
          "type" : "long"
        },
        "character" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          },
          "analyzer" : "ik_smart"
        },
        "create_time" : {
          "type" : "date",
          "format" : "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
        },
        "grade" : {
          "type" : "long"
        },
        "id" : {
          "type" : "long"
        },
        "name" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 32
            }
          }
        },
        "sex" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 8
            }
          }
        },
        "subject" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        }
      }
    },
    "settings" : {
      "index" : {
        "routing" : {
          "allocation" : {
            "include" : {
              "_tier_preference" : "data_content"
            }
          }
        },
        "number_of_shards" : "3",
        "provided_name" : "personal_info_5000000_v2",
        "creation_date" : "1663485323617",
        "number_of_replicas" : "1",
        "uuid" : "XBPaDn_gREmAoJmdRyBMAA",
        "version" : {
          "created" : "7170699"
        }
      }
    }
  }
}

批次插入資料:

通過elasticsearch模組匯入helper,通過helper.bulk來批次處理大量的資料。首先將所有的資料定義成字典形式,各欄位含義如下:

  • _index對應索引名稱,並且該索引必須存在。
  • _type對應型別名稱。
  • _source對應的字典內,每一篇檔案的欄位和值,可有有多個欄位。

範例程式碼:  【程式中途異常,寫入4714000條資料】

from elasticsearch import Elasticsearch
from elasticsearch import helpers
from datetime import datetime
from queue import Queue
import random
import time
import threading
es = Elasticsearch(hosts='http://127.0.0.1:9200')
# print(es)
names = ['劉一', '陳二', '張三', '李四', '王五', '趙六', '孫七', '周八', '吳九', '鄭十']
sexs = ['男', '女']
age = [25, 28, 29, 32, 31, 26, 27, 30]
character = ['自信但不自負,不以自我為中心',
             '努力、積極、樂觀、拼搏是我的人生信條',
             '抗壓能力強,能夠快速適應周圍環境',
             '敢做敢拼,腳踏實地;做事認真負責,責任心強',
             '愛好所學專業,樂於學習新知識;對工作有責任心;踏實,熱情,對生活充滿激情',
             '主動性強,自學能力強,具有團隊合作意識,有一定組織能力',
             '忠實誠信,講原則,說到做到,決不推卸責任',
             '有自制力,做事情始終堅持有始有終,從不半途而廢',
             '肯學習,有問題不逃避,願意虛心向他人學習',
             '願意以謙虛態度讚揚接納優越者,權威者',
             '會用100%的熱情和精力投入到工作中;平易近人',
             '為人誠懇,性格開朗,積極進取,適應力強、勤奮好學、腳踏實地',
             '有較強的團隊精神,工作積極進取,態度認真']
subjects = ['語文', '數學', '英語', '生物', '地理']
grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86, 96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86]
create_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# 新增程式耗時的功能
def timer(func):
    def wrapper(*args, **kwargs):
        start = time.time()
        res = func(*args, **kwargs)
        end = time.time()
        print('id{}共耗時約 {:.2f} 秒'.format(*args, end - start))
        return res
 
    return wrapper
 
 
@timer
def save_to_es(num):
    """
    批次寫入資料到es資料庫
    :param num:
    :return:
    """
    action = [
        {
            "_index": "personal_info_5000000_v2",
            "_type": "_doc",
            "_id": i,
            "_source": {
                "id": i,
                "name": random.choice(names),
                "sex": random.choice(sexs),
                "age": random.choice(age),
                "character": random.choice(character),
                "subject": random.choice(subjects),
                "grade": random.choice(grades),
                "create_time": create_time
            }
        } for i in range(10000 * num, 10000 * num + 10000)
    ]
    helpers.bulk(es, action)
def run():
    global queue
    while queue.qsize() > 0:
        num = queue.get()
        print(num)
        save_to_es(num)
if __name__ == '__main__':
    start = time.time()
    queue = Queue()
    # 序號資料進佇列
    for num in range(500):
        queue.put(num)
 
    # 多執行緒執行程式
    consumer_lst = []
    for _ in range(10):
        thread = threading.Thread(target=run)
        thread.start()
        consumer_lst.append(thread)
    for consumer in consumer_lst:
        consumer.join()
    end = time.time()
    print('程式執行完畢!花費時間:', end - start)

執行結果:

3.批次插入50000000條資料

先建立索引personal_info_5000000_v2,確定好mapping後,再插入資料。

此過程是在上面批次插入的前提下進行優化,採用python生成器。

建立索引和mapping同上,直接上程式碼:

範例程式碼: 【程式中途異常,寫入3688000條資料】

from elasticsearch import Elasticsearch
from elasticsearch import helpers
from datetime import datetime
from queue import Queue
import random
import time
import threading
es = Elasticsearch(hosts='http://127.0.0.1:9200')
# print(es)
 
names = ['劉一', '陳二', '張三', '李四', '王五', '趙六', '孫七', '周八', '吳九', '鄭十']
sexs = ['男', '女']
age = [25, 28, 29, 32, 31, 26, 27, 30]
character = ['自信但不自負,不以自我為中心',
             '努力、積極、樂觀、拼搏是我的人生信條',
             '抗壓能力強,能夠快速適應周圍環境',
             '敢做敢拼,腳踏實地;做事認真負責,責任心強',
             '愛好所學專業,樂於學習新知識;對工作有責任心;踏實,熱情,對生活充滿激情',
             '主動性強,自學能力強,具有團隊合作意識,有一定組織能力',
             '忠實誠信,講原則,說到做到,決不推卸責任',
             '有自制力,做事情始終堅持有始有終,從不半途而廢',
             '肯學習,有問題不逃避,願意虛心向他人學習',
             '願意以謙虛態度讚揚接納優越者,權威者',
             '會用100%的熱情和精力投入到工作中;平易近人',
             '為人誠懇,性格開朗,積極進取,適應力強、勤奮好學、腳踏實地',
             '有較強的團隊精神,工作積極進取,態度認真']
subjects = ['語文', '數學', '英語', '生物', '地理']
grades = [85, 77, 96, 74, 85, 69, 84, 59, 67, 69, 86, 96, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86]
create_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
 
# 新增程式耗時的功能
def timer(func):
    def wrapper(*args, **kwargs):
        start = time.time()
        res = func(*args, **kwargs)
        end = time.time()
        print('id{}共耗時約 {:.2f} 秒'.format(*args, end - start))
        return res
 
    return wrapper
@timer
def save_to_es(num):
    """
    使用生成器批次寫入資料到es資料庫
    :param num:
    :return:
    """
    action = (
        {
            "_index": "personal_info_5000000_v3",
            "_type": "_doc",
            "_id": i,
            "_source": {
                "id": i,
                "name": random.choice(names),
                "sex": random.choice(sexs),
                "age": random.choice(age),
                "character": random.choice(character),
                "subject": random.choice(subjects),
                "grade": random.choice(grades),
                "create_time": create_time
            }
        } for i in range(10000 * num, 10000 * num + 10000)
    )
    helpers.bulk(es, action)
 
def run():
    global queue
    while queue.qsize() > 0:
        num = queue.get()
        print(num)
        save_to_es(num)
 
if __name__ == '__main__':
    start = time.time()
    queue = Queue()
    # 序號資料進佇列
    for num in range(500):
        queue.put(num)
 
    # 多執行緒執行程式
    consumer_lst = []
    for _ in range(10):
        thread = threading.Thread(target=run)
        thread.start()
        consumer_lst.append(thread)
    for consumer in consumer_lst:
        consumer.join()
    end = time.time()
    print('程式執行完畢!花費時間:', end - start)

執行結果:

到此這篇關於使用python生成大量資料寫入es資料庫並查詢操作(2)的文章就介紹到這了,更多相關python生成 資料 內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com