opensurvey/main/helpers.py

125 lines
4.5 KiB
Python

import datetime
import requests
import json
from urllib.parse import urljoin
from .models import ReportToken
from django.utils import translation
from django.conf import settings
from django.shortcuts import reverse
from django.utils.translation import gettext_lazy as _
def get_access_token(download=False):
    """Request an access token from the OpenClinica user service.

    The site credentials from settings are posted by default; when
    ``download`` is truthy the ``OPENCLINICA_DATA_*`` credentials are
    posted instead.

    Returns:
        The raw response body of the token endpoint on HTTP 200.

    Raises:
        ValueError: if the endpoint answers with any other status code.
    """
    token_endpoint = 'https://opencovid.build.openclinica.io/user-service/api/oauth/token'
    request_headers = {'Content-Type': "application/json"}

    # Start from the site credentials and swap them out only on request.
    payload = json.dumps({
        'username': settings.OPENCLINICA_SITE_USERNAME,
        'password': settings.OPENCLINICA_SITE_PASSWORD,
    })
    if download:
        payload = json.dumps({
            'username': settings.OPENCLINICA_DATA_USERNAME,
            'password': settings.OPENCLINICA_DATA_PASSWORD,
        })

    reply = requests.post(token_endpoint, headers=request_headers, data=payload)
    if reply.status_code != 200:
        raise ValueError("{} returned: {}".format(token_endpoint, reply.text))
    return reply.text
def create_openclinica_user(survey_member):
    """Register ``survey_member`` as a participant on OpenClinica.

    The member's ``oh_id`` is sent as both the identifier and the subject
    key to the participants endpoint of the configured study and site.

    Returns:
        The raw response body on HTTP 200.

    Raises:
        ValueError: if OpenClinica answers with any other status code.
    """
    auth_headers = {
        "Authorization": "bearer {}".format(get_access_token()),
        "Content-Type": "application/json",
    }
    participant = {
        "identifier": survey_member.member.oh_id,
        "subjectKey": survey_member.member.oh_id,
    }
    endpoint = "https://opencovid.openclinica.io/OpenClinica/pages/auth/api/clinicaldata/studies/{}/sites/{}/participants?register=y".format(
        settings.OPENCLINICA_STUDY, settings.OPENCLINICA_SITE
    )

    reply = requests.post(endpoint, headers=auth_headers, data=json.dumps(participant))
    if reply.status_code != 200:
        raise ValueError("{} returned: {}".format(endpoint, reply.text))
    return reply.text
def create_openclinica_event(survey_member, event, date):
    """Schedule a study event for ``survey_member`` on OpenClinica.

    Args:
        survey_member: object whose ``member.oh_id`` is used as subject key.
        event: value sent as ``studyEventOID``.
        date: value sent as both ``startDate`` and ``endDate``.

    Returns:
        The raw response body when the event was created (HTTP 200) or when
        OpenClinica reports that the event already exists.

    Raises:
        ValueError: for every other response, including error bodies that
            are not JSON or carry a different error message.
    """
    headers = {
        "Authorization": "bearer {}".format(get_access_token()),
        "Content-Type": "application/json",
    }
    data = {
        "endDate": date,
        "startDate": date,
        "studyEventOID": event,
        "subjectKey": survey_member.member.oh_id,
    }
    url = "https://opencovid.openclinica.io/OpenClinica/pages/auth/api/clinicaldata/studies/{}/sites/{}/events".format(
        settings.OPENCLINICA_STUDY, settings.OPENCLINICA_SITE
    )
    response = requests.post(url, headers=headers, data=json.dumps(data))
    if response.status_code == 200:
        return response.text

    try:
        error_message = response.json().get("message")
    except (ValueError, AttributeError):
        # Body is not valid JSON (decode errors are ValueError subclasses)
        # or is JSON but not an object with ``.get``.
        error_message = None

    # An already existing event is treated as success so the call can be repeated.
    if error_message == "errorCode.eventAlreadyExists":
        return response.text

    # Any other failure must surface instead of silently returning None.
    raise ValueError("{} returned: {}".format(url, response.text))
def get_openclinica_token(survey_member):
    """Fetch the participant's OpenClinica access code and store it.

    Queries the participant endpoint for ``survey_member.member.oh_id`` and,
    on success, writes the returned ``accessCode`` to
    ``survey_member.survey_token`` and saves the object.

    Returns:
        The raw response body on HTTP 200.

    Raises:
        ValueError: if OpenClinica answers with any other status code.
    """
    headers = {
        "Authorization": "bearer {}".format(get_access_token()),
        "Content-Type": "application/json",
    }
    url = "https://opencovid.openclinica.io/OpenClinica/pages/auth/api/clinicaldata/studies/{}/sites/{}/participant?includeParticipateInfo=y&participantID={}".format(
        settings.OPENCLINICA_STUDY, settings.OPENCLINICA_SITE, survey_member.member.oh_id
    )
    response = requests.get(url, headers=headers)

    # Check the status before touching the body: an error response may not be
    # JSON, and parsing it first would hide the informative ValueError below.
    # The body is deliberately not printed, since it contains the access code.
    if response.status_code != 200:
        raise ValueError("{} returned: {}".format(url, response.text))

    survey_member.survey_token = response.json()['accessCode']
    survey_member.save()
    return response.text
def create_autologin_url(member, token):
    """Build the autologin URL for ``member``.

    The reversed ``autologin`` route for ``member.oh_id`` gets
    ``token.token`` appended as the ``login_token`` query parameter and is
    joined onto ``settings.OPENHUMANS_APP_BASE_URL``.
    """
    base_url = settings.OPENHUMANS_APP_BASE_URL
    login_path = reverse("autologin", kwargs={"oh_id": member.oh_id})
    query = "?login_token={}".format(token.token)
    return urljoin(base_url, login_path + query)
def create_survey_url(member, token):
    """Return the autologin URL for ``member`` with ``&next=/survey`` appended."""
    autologin_url = create_autologin_url(member, token)
    return autologin_url + "&next=/survey"
def send_user_survey_link(survey_member):
    """Message the member a fresh survey link and a withdrawal link.

    A new ``ReportToken`` is created and saved for the member, and both links
    are built from it. The message is composed in the language stored on the
    member's ``surveyaccount`` and sent through ``member.message``. Afterwards
    ``survey_member.last_email`` is set to today's date and saved.
    """
    member = survey_member.member
    token = ReportToken(member=member)
    token.save()
    url = create_survey_url(member, token)
    withdraw_url = create_autologin_url(member, token)
    saved_language = member.surveyaccount.language

    # Use override() rather than activate() so the recipient's language is
    # active only while the message is composed; the previously active
    # language is restored afterwards instead of leaking to later code.
    with translation.override(saved_language):
        # make str out of subject as otherwise only last character is subject!
        # The lazy strings must be evaluated inside this block to be translated.
        member.message(
            subject=str(_("Here's your survey link!")),
            message="{}: {}\n\n\n{}: {}".format(
                _("Please use this link to fill out the survey"),
                url,
                _("If you don't want to take part in the survey anymore, please use this link and click on \"WITHDRAW MY CONSENT\""),
                withdraw_url
            )
        )

    survey_member.last_email = datetime.date.today()
    survey_member.save()