Hallo Max,
wir authentifizieren gegen den LDAP und nutzen ein kleines Sync-Skript (Kudos meinem Kollegen), um dennoch alle Benutzer jederzeit in Koha zu haben (da wir sonst z.B. die Schulbuchausleihe nicht machen können).
Man benötigt vermutlich nicht alles davon (wir erstellen z.B. auch Karten(nummern) mit dem Skript), aber vielleicht ist es eine Alternative zum mühsamen Import (wenn Ihr die Nutzer denn überhaupt immer benötigt).
Aber jeder manuelle Abgleich von Listen entfällt dadurch.
Viele Grüße
Thomas
import ldap
import ldap.resiter
import requests
import pprint
import json
from datetime import datetime, timedelta
import smtplib, ssl
from email.message import EmailMessage
import random
import string
# ---------------------------------------------------------------------------
# Site configuration. Every value written as <...> is a placeholder that has
# to be replaced before the script can run.
# ---------------------------------------------------------------------------
# SMTP settings (used by send_mail()).
port = 465 # For SSL
smtp_server = "<mailhost>"
sender_email = "<sendermail>" # Enter your address
receiver_email = "<recpmail>" # Enter receiver address
email_password = '<mailpwd>' #input("Type your password and press enter: ")
# LDAP settings (used by LDAPList).
ldap_uri = "<ldapuri>"
ldap_base = "ou=accounts,dc=linuxmuster,dc=local"
# Only accounts whose sophomorixstatus is U or E are fetched; the D and R
# alternatives are commented out at the end of the line.
searchFilter = "(|(sophomorixstatus=U)(sophomorixstatus=E))" #(sophomorixstatus=D)(sophomorixstatus=R))"
searchScope = ldap.SCOPE_SUBTREE
# Koha settings (used by KohaPatrons and the main program).
# Userids that exist only in Koha: the main program never deactivates or
# updates them and lists them as "white listed" in the report mail.
# NOTE: the next line is a placeholder, not valid Python - replace it with a
# set of userid strings.
koha_plus_userids = {<list of additional koha-users (not in ldap)>}
# NOTE(review): the URL scheme is plain http, so the OAuth client secret and
# the access token would travel unencrypted - confirm whether https is available.
koha_api_url='http://<koha-url>/api/v1/'
# Form data posted to <koha_api_url>oauth/token to obtain the access token.
koha_oauthdata={
"grant_type":"client_credentials",
"client_id":"<clientid>",
"client_secret":"<clientsecret>"}
# Attribute names accepted by KohaPatrons.addpatron() and
# KohaPatrons.patron_attrib_update(); a request containing any other key is
# rejected before it is sent.
koha_patron_attribs = {'address', 'address2', 'altaddress_address', 'altaddress_address2', 'altaddress_city',
'altaddress_country', 'altaddress_email', 'altaddress_notes', 'altaddress_phone', 'altaddress_postal_code',
'altaddress_state', 'altaddress_street_number', 'altaddress_street_type', 'altcontact_address',
'altcontact_address2', 'altcontact_city', 'altcontact_country', 'altcontact_firstname', 'altcontact_phone',
'altcontact_postal_code', 'altcontact_state', 'altcontact_surname', 'anonymized', 'autorenew_checkouts',
'cardnumber', 'category_id', 'check_previous_checkout', 'city', 'country', 'date_enrolled', 'date_of_birth',
'date_renewed', 'email', 'expiry_date', 'fax', 'firstname', 'gender', 'incorrect_address', 'initials',
'lang', 'last_seen', 'library_id', 'login_attempts', 'mobile', 'opac_notes', 'other_name',
'overdrive_auth_token', 'patron_card_lost', 'phone', 'postal_code', 'privacy', 'privacy_guarantor_checkouts',
'privacy_guarantor_fines', 'relationship_type', 'restricted', 'secondary_email', 'secondary_phone',
'sms_number', 'sms_provider_id', 'staff_notes', 'state', 'statistics_1', 'statistics_2', 'street_number',
'street_type', 'surname', 'title', 'updated_on', 'userid'}
# Attributes that must be present in every create request and that are
# filled in from the cached patron on every update request.
koha_patron_min_attribs = {'surname', 'address', 'city', 'library_id', 'category_id'}
# Names of the extended attributes; extended attributes are currently not
# fetched or written (see the commented-out code in the classes below).
koha_patron_extra_attribs = {'CLASS', 'SHOW_BCODE'}
##############################
# Classes
##############################
class MyLDAPObject(ldap.ldapobject.LDAPObject, ldap.resiter.ResultProcessor):
    """LDAP connection combined with the ResultProcessor mixin.

    The mixin supplies allresults(), which LDAPList uses to walk the
    results of an asynchronous search.
    """
class LDAPList():
    """Snapshot of the LDAP accounts that are mirrored into Koha.

    Attributes:
        ldapusers: dict userid -> {'attribs': {...}}; 'attribs' holds the
            values that are compared with / written to the Koha patron.
        ldapobj: dict userid -> raw attribute dict of the LDAP entry.
        uidset: set of all userids collected from LDAP.
    """

    def __init__(self, ldap_uri):
        """Search the directory and collect every teacher/student entry.

        Args:
            ldap_uri: URI of the LDAP server to query.

        The search base, scope and filter come from the module-level
        settings ldap_base, searchScope and searchFilter.
        """
        ldapconn = MyLDAPObject(ldap_uri)
        msg_id = ldapconn.search(ldap_base, searchScope, searchFilter)
        self.ldapusers = dict()
        self.ldapobj = dict()
        for res_type, result, res_msgid, res_controls in ldapconn.allresults(msg_id):
            # A result is a list of (dn, attributes) tuples. Walk all of
            # them instead of looking at result[0] only.
            for dn, ldap_obj in result:
                # Search references have no DN and carry no attribute dict.
                if dn is None:
                    continue
                self._add_entry(ldap_obj)
        self.uidset = set(self.ldapusers.keys())

    @staticmethod
    def _classify_homedir(homedir):
        """Derive (category_id, class, show_bcode) from a home directory.

        Returns None for accounts that are neither teacher nor student:
        only those have no '/-' in the name of their home directory.
        Raises IndexError when the path has fewer components than expected.
        """
        if '/-' in homedir:
            return None
        dirsplit = homedir.split('/')
        # Teacher or student? (show_bcode is currently not used.)
        if dirsplit[2] == 'teachers':
            return ('LUL', None, '1')
        if dirsplit[2] == 'attic':
            return ('', '', '0')
        return ('SUS', dirsplit[3], '0')

    def _add_entry(self, ldap_obj):
        """Store one LDAP entry in ldapusers/ldapobj if it is a teacher or student."""
        classified = self._classify_homedir(ldap_obj['homeDirectory'][0].decode('utf-8'))
        if classified is None:
            return
        group, klasse, _show_bcode = classified
        userid = ldap_obj['uid'][0].decode('utf8')
        # There are "normal" and extra attributes, which are queried and set
        # in different ways. Extra attributes would have to be requested from
        # Koha one patron at a time (takes about 5 minutes), so they are not
        # used at the moment.
        self.ldapusers[userid] = {
            'attribs': {
                'userid': userid,
                # The cardnumber is generated on the Koha side and is not
                # derived from LDAP data.
                'surname': ldap_obj['sn'][0].decode('utf8'),
                'firstname': ldap_obj['givenName'][0].decode('utf8'),
                'email': ldap_obj['mail'][0].decode('utf8'),
                'category_id': group,
                'statistics_1': klasse,
            },
        }
        self.ldapobj[userid] = ldap_obj

    def userinfo(self, userid):
        """Return [user dict, raw LDAP entry] for userid, or None if unknown."""
        if userid in self.ldapusers:
            return [self.ldapusers[userid], self.ldapobj[userid]]
        return None
class KohaPatrons():
    """In-memory copy of the Koha patron list plus helpers that change it via REST.

    Attributes:
        api_url: base URL of the Koha REST API as passed to the constructor.
        oatoken: decoded JSON answer of the OAuth token request.
        patrons: dict userid -> {'attribs': patron JSON, 'extra_attribs': {}}.
        uidset: userids of all cached patrons.
        pidset: patron_ids of all cached patrons.
        cardnumset: cardnumbers that are in use or were already handed out.
    """

    def __init__(self, koha_api_url, koha_oauthdata):
        """Fetch an OAuth token and load the patrons from Koha.

        Args:
            koha_api_url: base URL of the REST API, ending with '/'.
            koha_oauthdata: form fields for the 'oauth/token' request.
        """
        self.api_url = koha_api_url
        self.oatoken = requests.post(koha_api_url + 'oauth/token', data=koha_oauthdata).json()
        # Get all patrons (one page of up to 10000 entries).
        plist = requests.get(
            koha_api_url + 'patrons',
            headers=self._auth_headers(),
            params={'_per_page': 10000}).json()
        # Build a dict with the userid as key. Fetching the extended
        # attributes needs one request per patron and takes too long, so
        # 'extra_attribs' stays empty.
        self.patrons = {p['userid']: {'attribs': p, 'extra_attribs': {}} for p in plist}
        # Sets of the userids, patron ids and cardnumbers that were read.
        self.uidset = set(self.patrons.keys())
        self.pidset = {entry['attribs']['patron_id'] for entry in self.patrons.values()}
        self.cardnumset = {entry['attribs']['cardnumber'] for entry in self.patrons.values()}

    def _auth_headers(self):
        """Return the authorization header built from the OAuth token."""
        return {'authorization': f"{self.oatoken['token_type']} {self.oatoken['access_token']}"}

    def _cache_patron(self, p):
        """Insert or refresh patron JSON p in patrons/uidset/pidset/cardnumset.

        When the patron is already cached under a different userid, the old
        entry is removed so that every patron_id is stored exactly once.
        """
        uid = p['userid']
        extra = {}
        if p['patron_id'] in self.pidset:
            cached = self.patrons.get(uid)
            if cached is not None and cached['attribs']['patron_id'] == p['patron_id']:
                # Userid unchanged: keep the extra attributes of the entry.
                extra = cached.get('extra_attribs', {})
            else:
                # Userid changed: drop the entry stored under the old userid.
                for old_uid, entry in list(self.patrons.items()):
                    if entry['attribs']['patron_id'] == p['patron_id']:
                        extra = entry.get('extra_attribs', {})
                        del self.patrons[old_uid]
                        self.uidset.discard(old_uid)
                        break
        self.patrons[uid] = {'attribs': p, 'extra_attribs': extra}
        self.uidset.add(uid)
        self.pidset.add(p['patron_id'])
        self.cardnumset.add(p['cardnumber'])

    def create_cardnumber(self):
        """Return a new random cardnumber and reserve it in cardnumset.

        With the current settings the number consists of two groups of three
        uppercase letters separated by '-'. Candidates are drawn until one
        is found that is not in cardnumset yet.
        """
        repetition = 2
        letters = 3
        digits = 0
        while True:
            groups = [
                ''.join(random.choices(string.ascii_uppercase, k=letters))
                + ''.join(random.choices(string.digits, k=digits))
                for _ in range(repetition)
            ]
            n_cardnum = groups[0] + '-' + ''.join(groups[1:])
            if n_cardnum not in self.cardnumset:
                self.cardnumset.add(n_cardnum)
                return n_cardnum

    def addpatron(self, adddata):
        """Create a patron in Koha.

        Args:
            adddata: dict of patron attributes. All keys must be in
                koha_patron_attribs and all of koha_patron_min_attribs must
                be present. A 'cardnumber' entry is replaced by a freshly
                generated one. The dict itself is not modified.

        Returns:
            The patron_id of the new patron, or None when the attributes
            are invalid or Koha did not answer with status 201.
        """
        keys = set(adddata.keys())
        # Are all attributes valid Koha attributes?
        if not keys.issubset(koha_patron_attribs):
            return None
        # Are all attributes that Koha requires present?
        if not koha_patron_min_attribs.issubset(keys):
            return None
        payload = dict(adddata)
        payload['cardnumber'] = self.create_cardnumber()
        r = requests.post(
            self.api_url + 'patrons',
            headers=self._auth_headers(),
            data=json.dumps(payload))
        if r.status_code != 201:
            return None
        # Success: add the patron to the cache.
        p = r.json()
        self._cache_patron(p)
        return p['patron_id']

    def patron_attrib_update(self, patronid, updatedata):
        """Change attributes of an existing patron in Koha.

        Args:
            patronid: patron_id of a cached patron.
            updatedata: dict of attributes to change; all keys must be in
                koha_patron_attribs. The dict itself is not modified.

        Returns:
            The patron_id on success; None when the patron is unknown, an
            attribute name is invalid or Koha did not answer with status 200.
        """
        if patronid not in self.pidset:
            return None
        if not set(updatedata.keys()).issubset(koha_patron_attribs):
            return None
        payload = dict(updatedata)
        # Every update request carries the minimal attribute set; take the
        # missing ones from the cached patron.
        for entry in self.patrons.values():
            if entry['attribs']['patron_id'] != patronid:
                continue
            for at in koha_patron_min_attribs:
                if at not in payload:
                    payload[at] = entry['attribs'][at]
            break
        r = requests.put(
            self.api_url + 'patrons/' + str(patronid),
            headers=self._auth_headers(),
            data=json.dumps(payload))
        if r.status_code != 200:
            return None
        # Success: read the patron back and refresh the cache, including
        # the case that the userid (the cache key) has changed.
        p_upd = requests.get(
            self.api_url + 'patrons/' + str(r.json()['patron_id']),
            headers=self._auth_headers()).json()
        self._cache_patron(p_upd)
        return p_upd['patron_id']
##############################
# Functions.
##############################
def user_ldap2koha(uid, luser):
    """Translate the attribute dict of an LDAP user into a Koha patron dict.

    Args:
        uid: userid of the user (not evaluated here).
        luser: dict with the keys 'surname', 'category_id', 'userid',
            'firstname', 'email' and 'statistics_1'.

    Returns:
        A new dict with the attributes for the patron. Address and city are
        empty strings and the library is always 'BIB'. No cardnumber is set.
    """
    patron = dict(surname=luser['surname'], address='', city='', library_id='BIB')
    for field in ('category_id', 'userid', 'firstname', 'email', 'statistics_1'):
        patron[field] = luser[field]
    return patron
def send_mail(subject, mailbody):
    """Send a plain-text report mail over SMTP with SSL.

    Server, port, login, sender and recipient come from the module-level
    SMTP settings (smtp_server, port, sender_email, email_password,
    receiver_email).

    Args:
        subject: subject line of the mail.
        mailbody: text of the mail.

    Returns:
        True after the message was handed to the server; connection and
        login errors are raised by smtplib.
    """
    # formatdate is not imported at the top of the file, so import it here.
    from email.utils import formatdate

    msg = EmailMessage()
    msg.set_content(mailbody)
    msg['Subject'] = subject
    # Use the configured addresses instead of separate hard-coded copies.
    msg['From'] = sender_email
    msg['To'] = receiver_email
    msg['Date'] = formatdate(localtime=True)
    context = ssl.create_default_context()
    with smtplib.SMTP_SSL(smtp_server, port, context=context) as server:
        server.login(sender_email, email_password)
        server.send_message(msg)
    return True
##############################
# Program starts here.
##############################
ulistldap = LDAPList(ldap_uri)
ulistkoha = KohaPatrons(koha_api_url, koha_oauthdata)

# Create a Koha patron for every LDAP user that Koha does not know yet.
added = list()
for userid in ulistldap.uidset - ulistkoha.uidset:
    if ulistkoha.addpatron(user_ldap2koha(userid, ulistldap.ldapusers[userid]['attribs'])):
        added.append(userid)

# Deactivate Koha patrons for which no LDAP user is known (white-listed
# userids excepted) by setting their expiry date to yesterday.
deactivated_new = list()
deactivated_old = list()
for userid in (ulistkoha.uidset - koha_plus_userids - ulistldap.uidset):
    attribs = ulistkoha.patrons[userid]['attribs']
    expiry = attribs['expiry_date']
    # A patron without an expiry date has not expired yet; only parse the
    # date when there is one.
    if expiry and datetime.now() >= datetime.strptime(expiry, "%Y-%m-%d"):
        # Already deactivated in Koha.
        deactivated_old.append(userid)
    elif ulistkoha.patron_attrib_update(
            attribs['patron_id'],
            {'expiry_date': (datetime.now() - timedelta(1)).strftime('%Y-%m-%d')}):
        deactivated_new.append(userid)

# Compare the attributes of users known to both LDAP and Koha and bring the
# Koha patron in line with LDAP. The cardnumber is no longer derived from
# LDAP data and is therefore never changed here.
updated = list()
for userid in ulistldap.uidset.intersection(ulistkoha.uidset) - koha_plus_userids:
    koha_attribs = ulistkoha.patrons[userid]['attribs']
    updatedata = dict()
    for key, value in ulistldap.ldapusers[userid]['attribs'].items():
        # Attribute 'cardnumber' should not be changed until September 2022.
        if key == 'cardnumber':
            continue
        if value != koha_attribs[key]:
            updatedata[key] = value
    # Report the user as updated only when Koha accepted the change, in
    # line with the handling of added and deactivated users.
    if updatedata and ulistkoha.patron_attrib_update(koha_attribs['patron_id'], updatedata):
        updated.append(userid)

# Send the report mail, but only when something has changed. Patrons that
# were already expired are listed only together with newly deactivated ones.
text = ''
if added:
    text += '* added:\n' + ', '.join(sorted(added)) + '\n'
if deactivated_new:
    text += ('* deactivated new:\n' + ', '.join(sorted(deactivated_new)) + '\n'
             + '* deactivated old:\n' + ', '.join(sorted(deactivated_old)) + '\n')
if updated:
    text += '* updated:\n' + ', '.join(sorted(updated)) + '\n'
if text:
    send_mail('Koha-Patrons updated', text + '* white listed:\n' + ', '.join(sorted(koha_plus_userids)))