User:Zache/esimerkki/Esimerkki 2
< User:Zache | esimerkki
#!/usr/bin/python
"""Copy Finna image metadata to Wikimedia Commons structured data (SDC).

For every candidate file produced by a fixed PetScan query, download both the
Finna original and the Commons copy, compare their perceptual hashes, and on
an exact match store the perceptual hash (P9310, with software-version and
dimension qualifiers) and the Finna record id (P9478) as MediaInfo claims.
"""
import json
import re
# Explicit submodule imports: a bare "import urllib" does not guarantee
# that urllib.parse / urllib.request are reachable as attributes.
import urllib.parse
import urllib.request

import imagehash
import pywikibot
import requests
from PIL import Image

# MAIN() -- log in to Commons up front; abort if the login did not stick.
site = pywikibot.Site("commons", "commons")
site.login()
repo = site.data_repository()
user = site.user()
if user:
    pywikibot.output('Logged in on {0} as {1}.'.format(site, user))
else:
    pywikibot.output('Not logged in on {0}.'.format(site))
    exit(1)


# Read SDC mediainfo
def getCurrentMediaInfo(site, mediaid):
    """Return the structured-data entity for a MediaInfo id (e.g. 'M12345'),
    or {} when the entity has no pageid (no structured data yet)."""
    request = site._simple_request(action='wbgetentities', ids=mediaid)
    data = request.submit()
    if data.get(u'entities').get(mediaid).get(u'pageid'):
        return data.get(u'entities').get(mediaid)
    return {}


def downloadFile(url):
    """Download *url* to a fixed temporary path and return that path.

    NOTE(review): always writes tmp/file_to_commons.tif, so concurrent runs
    would clobber each other; callers read the file back immediately.
    """
    print("Downloading file " + url + "\n")
    local_filename = "tmp/file_to_commons.tif"
    # stream=True avoids buffering whole images in memory; the with-blocks
    # guarantee both the HTTP response and the file handle are closed
    # (the original left the file open on an exception mid-download).
    with requests.get(url, stream=True) as r:
        with open(local_filename, 'wb') as f:
            for chunk in r.iter_content(chunk_size=512 * 1024):
                if chunk:  # filter out keep-alive new chunks
                    f.write(chunk)
    return local_filename


def read_phash_and_imageinfo(local_file):
    """Return a dict with the image's perceptual hash, its width/height and
    the imagehash library version that produced the hash."""
    ret = {}
    im = Image.open(local_file)
    # calculate phash
    ret["phash"] = imagehash.phash(im)
    ret["image_width"], ret["image_height"] = im.size
    ret["imagehash_version"] = "Imagehash " + str(imagehash.__version__)
    return ret


# Get image info from Commons
def get_commons_image_info(filename):
    """Fetch imageinfo for a Commons file title via the public API and
    return the parsed JSON response (iiurlwidth=1024 also requests a
    1024px thumbnail URL alongside the original 'url')."""
    url = ("https://commons.wikimedia.org/w/api.php?action=query&format=json"
           "&prop=imageinfo&iiprop=url&titles="
           + urllib.parse.quote_plus(filename))
    url += "&iiurlwidth=1024"
    with urllib.request.urlopen(url) as file:
        data = json.loads(file.read().decode())
    return data


def addSDCPhash(site, media_identifier, phashchecksum, width, height, imagehash_version):
    """Create a P9310 (perceptual hash) claim and qualify it with the
    hashing software version (P348) and image height/width (P2048/P2049)."""
    propertyvalue = phashchecksum
    claim_id = createMediainfoClaim(site, media_identifier, "P9310", propertyvalue)
    setMediainfoQualifier(site, media_identifier, claim_id, "P9310",
                          propertyvalue, "P348", imagehash_version)
    # Wikibase quantity amounts need an explicit sign; unit Q355198 is
    # presumably "pixel" -- TODO confirm against Wikidata.
    heightvalue = {
        'amount': '+' + str(height),
        'unit': 'http://www.wikidata.org/entity/Q355198',
    }
    widthvalue = {
        'amount': '+' + str(width),
        'unit': 'http://www.wikidata.org/entity/Q355198',
    }
    setMediainfoQualifier(site, media_identifier, claim_id, "P9310",
                          propertyvalue, "P2048", heightvalue)
    setMediainfoQualifier(site, media_identifier, claim_id, "P9310",
                          propertyvalue, "P2049", widthvalue)


def setMediainfoQualifier(site, media_identifier, claim_id, property, propertyvalue, qualifier, qualifiervalue):
    """Attach *qualifier*=*qualifiervalue* to the claim *property* =
    *propertyvalue* on the MediaInfo entity and return the claim id.

    Idempotent: when the qualifier already exists the existing claim id is
    returned without an edit. Exits the script on API errors.
    """
    exists = getMediainfoClaimId(site, media_identifier, property,
                                 propertyvalue, qualifier, qualifiervalue)
    if exists:
        return exists
    if claim_id == "":
        # BUGFIX: the original referenced an undefined name "value" here,
        # which raised NameError whenever claim_id was empty.
        claim_id = getMediainfoClaimId(site, media_identifier, property, propertyvalue)
    if not claim_id:
        print("Claim id not found. Adding qualifier failed")
        exit(1)
    csrf_token = site.tokens['csrf']
    payload = {
        'action': 'wbsetqualifier',
        'format': u'json',
        'claim': claim_id,
        'property': qualifier,
        'snaktype': 'value',
        'value': json.dumps(qualifiervalue),
        'token': csrf_token,
        'bot': True,  # in case you're using a bot account (which you should)
    }
    request = site._simple_request(**payload)
    try:
        ret = request.submit()
        print(ret)
        claim = ret.get("claim")
        if claim:
            return claim.get("id")
        else:
            print("Claim created but there was an unknown problem")
            print(ret)
            exit(1)
    except pywikibot.data.api.APIError as e:
        print('Got an error from the API, the following request were made:')
        print(request)
        print('Error: {}'.format(e))
        exit(1)


def getMediainfoClaims(site, media_identifier, property):
    """Return the raw wbgetclaims response for *property* on the entity.
    Exits the script on API errors."""
    payload = {
        'action': 'wbgetclaims',
        'format': u'json',
        'entity': media_identifier,
        'property': property,
    }
    request = site._simple_request(**payload)
    try:
        return request.submit()
    except pywikibot.data.api.APIError as e:
        print('Got an error from the API, the following request were made:')
        print(request)
        print('Error: {}'.format(e))
        exit(1)


def createMediainfoClaim(site, media_identifier, property, value):
    """Create a *property*=*value* claim on the MediaInfo entity and return
    its claim id. Idempotent: returns the existing claim id when one with
    the same value is already present. Exits the script on API errors."""
    exists = getMediainfoClaimId(site, media_identifier, property, value)
    if exists:
        return exists
    csrf_token = site.tokens['csrf']
    payload = {
        'action': 'wbcreateclaim',
        'format': u'json',
        'entity': media_identifier,
        'property': property,
        'snaktype': 'value',
        'value': json.dumps(value),
        'token': csrf_token,
        'bot': True,  # in case you're using a bot account (which you should)
    }
    print(payload)
    request = site._simple_request(**payload)
    try:
        ret = request.submit()
        claim = ret.get("claim")
        if claim:
            return claim.get("id")
        else:
            print("Claim created but there was an unknown problem")
            print(ret)
            exit(1)
    except pywikibot.data.api.APIError as e:
        print('Got an error from the API, the following request were made:')
        print(request)
        print('Error: {}'.format(e))
        exit(1)


def testSnak(snak, snakvalue):
    """Return True when *snak*'s datavalue equals *snakvalue*.

    Handles wikibase-entityid (compared by 'id'), string, and quantity
    (compared by 'amount') datavalues; any other datavalue type aborts
    the script.
    """
    if snak:
        datavalue = snak.get("datavalue")
        if datavalue:
            if datavalue["type"] == "wikibase-entityid":
                value = datavalue.get("value")
                if value and value.get("id") == snakvalue:
                    return True
            elif datavalue["type"] == "string":
                if datavalue.get("value") == snakvalue:
                    return True
            elif datavalue["type"] == "quantity":
                value = datavalue.get("value")
                if value.get("amount") == snakvalue:
                    return True
            else:
                print("ERROR: Unknown datavalue type")
                print(datavalue)
                exit(1)
    return False


def getMediainfoClaimId(site, media_identifier, property, propertyvalue="", qualifier="", qualifiervalue=""):
    """Return the id of an existing claim matching *property*/*propertyvalue*
    (and, when given, *qualifier*/*qualifiervalue*), or False if none exists.

    Dict-typed values are normalised first -- wikibase items collapse to
    their 'id', quantities to their 'amount' -- so testSnak() can compare
    plain strings. An empty propertyvalue matches any claim of *property*.
    """
    claims = getMediainfoClaims(site, media_identifier, property)
    # BUGFIX: the original initialised a mojibake identifier
    # "ṕroperty_found" (accented p) here, leaving property_found unset on
    # paths that never enter the loop branches.
    property_found = False
    # 'in' on a dict checks keys; on a (non-matching) string it is a
    # harmless substring test, matching the original behavior.
    if 'entity-type' in propertyvalue and propertyvalue.get('entity-type') == 'item':
        propertyvalue = propertyvalue.get('id')
    if 'entity-type' in qualifiervalue and qualifiervalue.get('entity-type') == 'item':
        qualifiervalue = qualifiervalue.get('id')
    if 'amount' in propertyvalue:
        propertyvalue = propertyvalue.get('amount')
    if 'amount' in qualifiervalue:
        qualifiervalue = qualifiervalue.get('amount')
    claimlist = claims.get('claims')
    if claimlist:
        properties = claimlist.get(property)
        if properties:
            # Renamed the loop variable from "property" (which shadowed
            # both the parameter and the builtin) to "claim".
            for claim in properties:
                if propertyvalue == "":
                    property_found = True
                else:
                    property_found = testSnak(claim.get("mainsnak"), propertyvalue)
                if qualifier == "" and property_found:
                    print(json.dumps(claim))
                    return claim["id"]
                elif property_found:
                    qualifiers = claim.get("qualifiers")
                    if qualifiers and qualifiers.get(qualifier):
                        if qualifiervalue == "":
                            return claim["id"]
                        else:
                            for qualifierSnak in qualifiers.get(qualifier):
                                if testSnak(qualifierSnak, qualifiervalue):
                                    return claim["id"]
    return False


def readFile(pageTitle, finna_id=""):
    """Process one Commons file: hash it and the Finna record's image, and
    on an exact perceptual-hash match write the P9310 and P9478 claims.

    Skips files that already carry both a Finna id and a phash statement.
    """
    print(pageTitle, "\t", finna_id)
    if finna_id == "":
        return
    page = pywikibot.Page(site, pageTitle)
    item_id = 'M' + str(page.pageid)
    item = getCurrentMediaInfo(site, item_id)

    # Collect any existing P9478 (Finna id) and P9310 (phash) values.
    finna_id_test = ''
    if item.get('statements') and item.get('statements').get('P9478'):
        p9478_finna_id_statemens = item.get('statements').get('P9478')
        for s in p9478_finna_id_statemens:
            finna_id_test = s.get('mainsnak').get('datavalue').get('value')
    phash_test = ''
    if item.get('statements') and item.get('statements').get('P9310'):
        p9310_phash_statemens = item.get('statements').get('P9310')
        for s in p9310_phash_statemens:
            phash_test = s.get('mainsnak').get('datavalue').get('value')
    # Nothing to do when both statements already exist.
    if str(finna_id_test) != "" and str(phash_test) != "":
        return

    # Get image info from Finna; hash the record's image (the last one when
    # a record lists several -- matches the original loop behavior).
    url = "https://api.finna.fi/v1/record?lng=fi&prettyPrint=1&id=" + str(finna_id)
    finna_phash = ""
    with urllib.request.urlopen(url) as file:
        data = json.loads(file.read().decode())
        if "status" in data and data.get("status") == "OK":
            records = data.get("records")
            for record in records:
                images = record.get("images")
                for image in images:
                    image_url = "https://www.finna.fi" + image
                    local_file = downloadFile(image_url)
                    # read_phash_and_imageinfo
                    finna_phash = read_phash_and_imageinfo(local_file)
    if finna_phash == "":
        return

    # Phash of the Commons copy (imageinfo 'url') for comparison.
    t = get_commons_image_info(pageTitle)
    commons_image_url = (t.get("query").get("pages")
                          .get(str(page.pageid)).get("imageinfo")[0].get("url"))
    local_file = downloadFile(commons_image_url)
    commons_phash = read_phash_and_imageinfo(local_file)
    print("* " + str(finna_phash["phash"]))
    print("* " + str(commons_phash["phash"]))
    # imagehash subtraction is the Hamming distance; 0 means identical hashes.
    if finna_phash["phash"] - commons_phash["phash"] == 0:
        # Add imagehash info
        addSDCPhash(site, item_id, str(commons_phash["phash"]),
                    commons_phash["image_width"], commons_phash["image_height"],
                    commons_phash["imagehash_version"])
        # Add Finna id property
        claim_id = createMediainfoClaim(site, item_id, "P9478", finna_id)


# Candidate files come from a fixed PetScan query (psid=19393946).
url = "https://petscan.wmflabs.org/?psid=19393946&format=json"
with urllib.request.urlopen(url) as file:
    data = json.loads(file.read().decode())
    for images in data.get("*"):
        for image in images.get("a").get("*"):
            if image.get("namespace") == 6:  # namespace 6 = File:
                imageTitle = image.get("title")
                # Extract the Finna id from a trailing "_(...)" group just
                # before a .jpg/.tif extension in the file title.
                x = re.findall(r"_\(([^(]*?)\).(jpg|tif)", imageTitle)
                if x:
                    if 'hkm' in x[0][0]:
                        # HKM ids carry ":" where filenames use "-".
                        finna_id = x[0][0].replace("-", ":")
                    else:
                        finna_id = x[0][0]
                    print(finna_id)
                    readFile("File:" + imageTitle, finna_id)