1
0
mirror of https://github.com/lework/jenkins-update-center.git synced 2024-11-26 18:11:49 +02:00
jenkins-update-center/generator.py

143 lines
4.7 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Desc: Jenkins update center generator
# depend:
# yum -y install make gcc automake autoconf python3-devel
# pip install pycrypto
import os
import json
import base64
import binascii
import http.client
import urllib.request
from Crypto.Hash import SHA512, SHA
from Crypto.PublicKey import RSA
from Crypto.Signature import PKCS1_v1_5
class JenkinsUpdateCenter:
def __init__(self):
self.updateCenterVersion = "1"
self.core = None
self.warnings = None
self.plugins = None
self.id = "default"
self.connectionCheckUrl = None
self._private_key = None
self._cert = [None]
def _sha1_digest(self, body):
digest = base64.b64encode(SHA.new(body).digest()).decode("utf-8")
return digest
def _sha512_digest(self, body):
digest = binascii.hexlify(SHA512.new(body).digest()).decode("utf-8")
return digest
def _sign(self, body, algo = "SHA-1"):
signer = PKCS1_v1_5.new(self._private_key)
if algo == "SHA-1":
digest = SHA.new()
else:
digest = SHA512.new()
digest.update(body)
try:
signature = signer.sign(digest)
except Exception as err:
raise Exception("Could not make sign. "+str(err))
return signature
def _sha1_signature(self, body):
signature = base64.b64encode(self._sign(body, "SHA-1")).decode("utf-8")
return signature
def _sha512_signature(self, body):
signature = binascii.hexlify(self._sign(body, "SHA-512")).decode("utf-8")
return signature
def load_private(self, key_path):
try:
with open(key_path, "r") as fd:
self._private_key = RSA.importKey(fd.read())
except Exception as err:
raise Exception("Could not load private key "+key_path+". "+str(err))
def load_public(self, key_path):
try:
with open(key_path, "rb") as fd:
self._cert = base64.b64encode(fd.read()).decode("utf-8")
except Exception as err:
raise Exception("Could not load public key "+key_path+". "+str(err))
def out(self, fd):
output = {}
output["updateCenterVersion"] = self.updateCenterVersion
if self.core is not None:
output["core"] = self.core
if self.warnings is not None:
output["warnings"] = self.warnings
if self.plugins is not None:
output["plugins"] = self.plugins
output["id"] = self.id
if self.connectionCheckUrl is not None:
output["connectionCheckUrl"] = self.connectionCheckUrl
payload = (json.dumps(output, separators=(",", ":"), sort_keys=True, ensure_ascii=False).encode("utf-8"))
output["signature"] = {"certificates":[self._cert]}
output["signature"]["correct_digest"] = self._sha1_digest(payload)
output["signature"]["correct_digest512"] = self._sha512_digest(payload)
output["signature"]["correct_signature"] = self._sha1_signature(payload)
output["signature"]["correct_signature512"] = self._sha512_signature(payload)
try:
fd.write("updateCenter.post(\n"+json.dumps(output, separators=(",", ":"), sort_keys=True)+"\n);")
except Exception as err:
raise Exception("Could not write output. "+str(err))
def main():
mirrors_file = "mirrors.json"
private_key = "rootCA/update-center.key"
public_key = "rootCA/update-center.crt"
original_download_url = "https://updates.jenkins.io/download/"
original_update_center_url = "https://updates.jenkins-ci.org/current/update-center.json"
original_file = urllib.request.urlopen(original_update_center_url)
try:
original_context = original_file.read()
except http.client.IncompleteRead as e:
original_context = e.partial.decode('utf-8')
original = json.loads(original_context.replace(str.encode("updateCenter.post(\n"), str.encode("")).replace(str.encode("\n);"), str.encode("")))
uc = JenkinsUpdateCenter()
uc.load_private(private_key)
uc.load_public(public_key)
uc.warnings = original["warnings"]
try:
with open(mirrors_file, "r") as fd:
mirrors_url = json.loads(fd.read())
except Exception as err:
raise Exception("Could not load mirrors " + mirrors_file +". " + str(err))
for site,mirror_url in mirrors_url.items():
print("Generate:", mirror_url)
uc.plugins = json.loads(json.dumps(original["plugins"]).replace(original_download_url, mirror_url))
uc.core = json.loads(json.dumps(original["core"]).replace(original_download_url, mirror_url))
site_path = "updates/" + site
if not os.path.exists(site_path):
os.makedirs(site_path)
with open(site_path + "/update-center.json", "w") as fd:
uc.out(fd)
if __name__ == '__main__':
main()