django订阅gerrit事件流数据

import datetime
import json
import logging
import os
import re
import sys
from abc import ABCMeta, abstractmethod

import paramiko
from django.db import close_old_connections
from gerrit import GerritClient

from cmapp.models import Git
from cmapp.serializers import GitSerializer
from cmback.settings import GERRIT_HOSTNAME, GERRIT_URL, GERRIT_PORT, G_USERNAME, G_PASSWORD

# Directory (relative to the process working directory) scanned for the SSH key.
_KEY_DIR = "./subscribe/gerrit/"
# Maximum number of bytes requested from the SSH channel per recv() call.
_RECV_SIZE = 1024000
# Format used for every date string written to the Git table.
_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
# Extracts the text following "Issue: " on each line of a commit message.
_ISSUE_PATTERN = re.compile(r"Issue: (.*)?")


class GerritFactory(metaclass=ABCMeta):
    """
    Subscribe to gerrit stream-events and save them to the local db in real
    time for display.

    https://gerrit-documentation.storage.googleapis.com/Documentation/3.4.1/cmd-stream-events.html
    https://gerrit-documentation.storage.googleapis.com/Documentation/3.4.1/json.html

    used:
        "change-abandoned"
        "change-merged"
        "change-restored"
        "topic-changed"
        "patchset-created"
    unused:
        "assignee-changed"
        "hashtags-changed"
        "ref-updated"
        "reviewer-added"
        "reviewer-deleted"
        "comment-added"
        "project-created"
        "change-deleted"

    Subclasses implement ``get_stream_event`` for one event type. The class
    is a real abstract base class (``metaclass=ABCMeta``); the former
    ``__metaclass__`` attribute has no effect on Python 3.
    """

    def __init__(self):
        self.hostname = GERRIT_HOSTNAME
        self.gerriturl = GERRIT_URL
        self.port = GERRIT_PORT
        self.username = G_USERNAME
        self.passwd = G_PASSWORD
        self.client = paramiko.SSHClient()
        # NOTE(review): AutoAddPolicy accepts unknown host keys without
        # verification; kept as in the original.
        self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.logger = logging.getLogger("subscribe")
        # Set by connectGerrit(); None while disconnected.
        self.channel = None

    @abstractmethod
    def get_stream_event(self):
        """Connect to gerrit and process one event type until the stream ends."""

    def connectGerrit(self):
        """Open an SSH session to gerrit using the first ``*rsa`` key in _KEY_DIR.

        On success ``self.channel`` holds a fresh session channel. Any failure
        (including no key file being present) is logged and the process exits
        with status 1.
        """
        try:
            key_names = [name for name in os.listdir(_KEY_DIR) if name.endswith("rsa")]
            if not key_names:
                # Previously this case returned silently and failed later with
                # an AttributeError on self.channel.
                raise FileNotFoundError("no rsa key file found in %s" % _KEY_DIR)
            private_key = paramiko.RSAKey(filename=os.path.join(_KEY_DIR, key_names[0]))
            self.client.connect(
                hostname=self.hostname,
                port=self.port,
                username=self.username,
                pkey=private_key,
            )
            self.channel = self.client.get_transport().open_session(timeout=5)
            self.logger.info("connect to gerrit ok")
        except Exception as e:
            self.logger.error("connect to gerrit failed, %s", e)
            sys.exit(1)

    def getIssues(self, data):
        """Return the issue references found on ``Issue: ...`` lines of *data*.

        When there is exactly one such line and its text is longer than 16
        characters, the text is split on single spaces; otherwise the raw
        matches are returned.
        """
        matches = _ISSUE_PATTERN.findall(data)
        # presumably a single issue id never exceeds 16 characters, so a
        # longer line holds several ids -- TODO confirm
        if len(matches) == 1 and len(matches[0]) > 16:
            return matches[0].split(" ")
        return matches

    def getChangeLines(self, changeid):
        """Return the changed-line count of the current revision of *changeid*.

        For every file except ``/COMMIT_MSG`` the larger of ``lines_inserted``
        and ``lines_deleted`` is added to the total.
        """
        # NOTE(review): ssl_verify=False disables TLS certificate checks;
        # kept as in the original.
        rest_client = GerritClient(
            base_url=self.gerriturl,
            username=self.username,
            password=self.passwd,
            ssl_verify=False,
        )
        files = rest_client.changes.get(changeid).get_revision("current").files.poll()
        return sum(
            max(entry.get("lines_inserted") or 0, entry.get("lines_deleted") or 0)
            for entry in files
            if entry.get('path') != "/COMMIT_MSG"
        )

    @staticmethod
    def _format_timestamp(epoch_seconds):
        """Format an epoch timestamp as a local-time ``YYYY-mm-dd HH:MM:SS`` string."""
        return datetime.datetime.fromtimestamp(epoch_seconds).strftime(_TIME_FORMAT)

    def _iter_events(self, event_type):
        """Run ``gerrit stream-events -s <event_type>`` and yield each event as a dict.

        Gerrit writes one JSON object per line, while a single recv() may
        return several events or only part of one, so received bytes are
        buffered and split on newlines before decoding.
        """
        self.channel.exec_command("gerrit stream-events -s %s" % event_type)
        pending = b""
        while True:
            chunk = self.channel.recv(_RECV_SIZE)
            if not chunk:
                # A zero-length read means the channel stream has closed.
                break
            pending += chunk
            *complete_lines, pending = pending.split(b"\n")
            for line in complete_lines:
                if line.strip():
                    yield json.loads(line)
        if pending.strip():
            yield json.loads(pending)

    def _listen(self, event_type, handler):
        """Connect, feed every *event_type* event to *handler*, then disconnect.

        Any exception raised while reading or handling an event ends the
        session with a warning; the caller is expected to call
        ``get_stream_event`` again to reconnect.
        """
        self.connectGerrit()
        try:
            for event in self._iter_events(event_type):
                handler(event)
        except Exception as e:
            self.logger.warning("listen stream event except: %s ... reconnecting", e)
        finally:
            self._disconnect()

    def _disconnect(self):
        """Close the session channel and the SSH client so reconnects do not leak."""
        if self.channel is not None:
            self.channel.close()
            self.channel = None
        self.client.close()

    def _update_change(self, numberid, **fields):
        """Set *fields* on the Git row with *numberid*; return False if none exists."""
        # Called before every DB access, as in the original code.
        close_old_connections()
        record = Git.objects.filter(numberid=numberid).first()
        if record is None:
            return False
        for field_name, value in fields.items():
            setattr(record, field_name, value)
        record.save()
        return True


class CreatePatchSetFactory(GerritFactory):
    """Listener for ``patchset-created``: creates or refreshes the Git row."""

    def get_stream_event(self):
        """Process ``patchset-created`` events until the stream ends or fails."""
        self._listen("patchset-created", self._handle_event)

    def _handle_event(self, data):
        """Store one ``patchset-created`` event."""
        self.logger.info("patchset-created: \n%s", data)

        change = data['change']
        patch_set = data['patchSet']
        changeid = change['id']
        numberid = change['number']
        new_revision = patch_set['revision']
        repo = change['project']
        branch = change['branch']
        parents = patch_set['parents']
        parent = parents[0] if parents else ""
        subject = change['subject']
        topic = change.get("topic", None)
        status = change['status']
        commit_message = change['commitMessage']
        issue_list = json.dumps(self.getIssues(commit_message))
        author = change['owner']['username']
        commit_date = self._format_timestamp(change['createdOn'])
        update_date = self._format_timestamp(data['eventCreatedOn'])
        change_line = self.getChangeLines(changeid)

        # A change that is already stored only gets its mutable fields refreshed.
        updated = self._update_change(
            numberid,
            new_revision=new_revision,
            subject=subject,
            update_date=update_date,
            commit_message=commit_message,
            status=status,
            issue_list=issue_list,
            change_line=change_line,
        )
        if updated:
            self.logger.info("patchset-created-db: %s %s update ok", numberid, changeid)
            return

        new_data = {
            "changeid": changeid,
            "numberid": numberid,
            "new_revision": new_revision,
            "repo": repo,
            "branch": branch,
            "parent": parent,
            "topic": topic,
            "subject": subject,
            "old_subject": "xxxxxx",
            "status": status,
            "commiter": "xxxxxx",
            "reviewer": "xxxxxx",
            "update_date": update_date,
            "issue_list": issue_list,
            "commit_message": commit_message,
            "author": author,
            "commit_date": commit_date,
            "change_line": change_line,
            "submit_date": None,
            "submitter": None,
            "update_msg": None,
        }
        pc_serial = GitSerializer(data=new_data)
        if pc_serial.is_valid():
            pc_serial.save()
            self.logger.info("patchset-created-db: %s %s create ok", numberid, changeid)
        else:
            self.logger.error(
                "patchset-created-db: %s %s create failed %s",
                numberid, changeid, pc_serial.errors,
            )


class ChangeAbandoneFactory(GerritFactory):
    """Listener for ``change-abandoned``: updates status and update date."""

    def get_stream_event(self):
        """Process ``change-abandoned`` events until the stream ends or fails."""
        self._listen("change-abandoned", self._handle_event)

    def _handle_event(self, data):
        """Store one ``change-abandoned`` event."""
        self.logger.info("change-abandoned: \n%s", data)

        changeid = data['change']['id']
        numberid = data['change']['number']
        update_date = self._format_timestamp(data['eventCreatedOn'])
        status = data['change']['status']

        if self._update_change(numberid, status=status, update_date=update_date):
            self.logger.info("change-abandoned-db: %s %s update ok", numberid, changeid)
        else:
            self.logger.error("change-abandoned-db: %s not exist in db", numberid)


class ChangeMergeFactory(GerritFactory):
    """Listener for ``change-merged``: records status, submitter and submit date."""

    def get_stream_event(self):
        """Process ``change-merged`` events until the stream ends or fails."""
        self._listen("change-merged", self._handle_event)

    def _handle_event(self, data):
        """Store one ``change-merged`` event."""
        self.logger.info("change-merged: \n%s", data)

        changeid = data['change']['id']
        numberid = data['change']['number']
        update_date = self._format_timestamp(data['eventCreatedOn'])
        status = data['change']['status']
        submitter = data['submitter']['username']
        # The event time doubles as the submit date.
        submit_date = update_date

        updated = self._update_change(
            numberid,
            status=status,
            update_date=update_date,
            submitter=submitter,
            submit_date=submit_date,
        )
        if updated:
            self.logger.info("change-merged-db: %s %s update ok", numberid, changeid)
        else:
            self.logger.error("change-merged-db: %s not exist in db", numberid)


class ChangeRestoreFactory(GerritFactory):
    """Listener for ``change-restored``: updates status and update date."""

    def get_stream_event(self):
        """Process ``change-restored`` events until the stream ends or fails."""
        self._listen("change-restored", self._handle_event)

    def _handle_event(self, data):
        """Store one ``change-restored`` event."""
        self.logger.info("change-restored: \n%s", data)

        changeid = data['change']['id']
        numberid = data['change']['number']
        update_date = self._format_timestamp(data['eventCreatedOn'])
        status = data['change']['status']

        if self._update_change(numberid, status=status, update_date=update_date):
            self.logger.info("change-restored-db: %s %s update ok", numberid, changeid)
        else:
            self.logger.error("change-restored: %s not exist in db", numberid)


class TopicChangeFactory(GerritFactory):
    """Listener for ``topic-changed``: updates topic and update date."""

    def get_stream_event(self):
        """Process ``topic-changed`` events until the stream ends or fails."""
        self._listen("topic-changed", self._handle_event)

    def _handle_event(self, data):
        """Store one ``topic-changed`` event."""
        self.logger.info("topic-change: \n%s", data)

        numberid = data['change']['number']
        update_date = self._format_timestamp(data['eventCreatedOn'])
        topic = data['change'].get("topic", None)

        if self._update_change(numberid, topic=topic, update_date=update_date):
            self.logger.info("topic-change-db: %s update ok", numberid)
        else:
            self.logger.error("topic-change: %s not exist in db", numberid)

Original: https://blog.51cto.com/u_11627433/5519499
Author: 失向的星空
Title: django订阅gerrit事件流数据

原创文章受到原创版权保护。转载请注明出处:https://www.johngo689.com/504726/

转载文章受原作者版权保护。转载请注明原作者出处!

(0)

大家都在看

亲爱的 Coder【最近整理,可免费获取】👉 最新必读书单  | 👏 面试题下载  | 🌎 免费的AI知识星球