Pytest操作中间件

1.背景

2.中间件

1)zookeeper

2)kafka

3)elasticsearch

3.参考资料

1.背景

最近的脚本中需要使用Python操作中间件(zookeeper/ kafka/ elastichsearch),之前没有使用过,所以度娘上到处查资料,这里记录一下常用方法,方便以后使用,也希望其他人遇到时能方便查找

2.中间件

libversionzookeeper3.7.0kazoo2.8.0

连接
try:
    zk = KazooClient(hosts=['127.0.0.1:2181'])
    zk.start()
    # 获取节点
    # znode_path 节点。例如,"/"
    result = zk.get_children(znode_path)
    zk.stop()
except KazooTimeoutError as e:
    print(e.args[0])

libversionkafka2.3.0zookeeper3.4.5kafka-python2.0.2

client = KafkaAdminClient(bootstrap_servers=['192.168.1.5:6667', '192.168.1.6:6667', '192.168.1.7:6667'])
  • 查询TOPIC列表
topics_list = client.list_topics()
  • 查询TOPIC详情
topic_dict = client.describe_topics()
  • 生产消息
连接
producer = KafkaProducer(bootstrap_servers=['192.168.1.5:6667', '192.168.1.6:6667', '192.168.1.7:6667'], retries=3, api_version=(0, 10, 2))

发送多条消息
for i in range(0, 5):
    k = bytes("k" + str(i), encoding='utf-8')
    v = bytes("v" + str(i), encoding='utf-8')
    producer.send(topic_name, key=k, value=v)

刷新
producer.flush()

关闭连接
producer.close()
  • 消费消息
连接
group_id随机产生一串英文+数字的字符串即可

consumer = KafkaConsumer(topic_name, group_id=group_id, bootstrap_servers=['192.168.1.5:6667', '192.168.1.6:6667', '192.168.1.7:6667'], consumer_timeout_ms=1000)

订阅消息
consumer.subscribe(topics=[topic_name])

tp = TopicPartition(topic_name, 0)

消费到最后一条消息则退出,否则代码中会一直等待
for message in consumer:
    if message.offset == consumer.end_offsets([tp])[tp] - 1:
        break

关闭连接
consumer.close()

libversionElasticSearch7.4.2& 7.6.0elasticsearch7.6.0

  • 测试连接
conn.ping()
  • 索引的操作
判断索引是否存在
conn.indics.exists(index_name)

获取索引信息
conn.indics.get(index_name)

写入数据
for i in range(0, 5):
    data = {
        "key": "k" + str(i),
        "value": i
    }

    conn.index(index=index_name, body=data)

查询数据
1)
body = {
    'query': {
        'prefix': {
            'key.keyword': 'k'        # 匹配前缀
        }
    },

    'size': 10
}

filter_path = ['hits.hits._source.key',
               'hits.hits._source.value']    # 展示出的字段
result = conn.search(index=index_name, filter_path=filter_path, body=body)

2)
body = {
    'query': {
        'term': {        # 匹配整个字串
            'key.keyword': 'k1'
        }
    },

    'size': 10
}

result = conn.search(index=index_name, filter_path=filter_path, body=body)

result:找到(True)/ 未找到(False)

修改数据
body = {
    'doc': {
        'key': 'k2',
        'value': 2
     }
}

conn.update(index=index_name, id=1, body=body)

删除数据
all_data = conn.search(index=index_name)
hits = all_data['hits']['hits']
for item in hits:
    conn.delete(index=index_name, id=item['_id'])

批量写入数据
data_list = []
for i in range(100):
    body = {
        '_op_type': 'create',
        '_index': index_name,
        '_type': 'doc',
        '_id': str(i),
        '_source': {'key': 'k' + str(i), 'value': i}
    }

    data_list.append(body)
result = helpers.buld(conn, data_list)

一种批量修改数据
data_list = []
for i in range(100):
    body = {
        '_op_type': 'index',
        '_index': index_name,
        '_type': 'doc',
        '_id': str(i),
        '_source': {'key': 'k' + str(i), 'value': int(i+1)}
    }

    data_list.append(body)
result = helpers.buld(conn, data_list)

另一种批量修改数据
data_list = []
for i in range(100):
    body = {
        '_op_type': 'update',
        '_index': index_name,
        '_type': 'doc',
        '_id': str(i),
        '_source': {'key': 'k' + str(i), 'value': int(i+2)}
    }

    data_list.append(body)
result = helpers.buld(conn, data_list)

批量删除数据
data_list = []
for i in range(100):
    body = {
        '_op_type': 'delete',
        '_index': index_name,
        '_type': 'doc',
        '_id': str(i),
        '_source': {'key': 'k' + str(i), 'value': int(i+2)}
    }

    data_list.append(body)
result = helpers.buld(conn, data_list)

3.参考资料

Original: https://blog.csdn.net/aduocd/article/details/126956269
Author: aduocd
Title: Pytest操作中间件

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/774175/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球