Connecteur Societeinfo à Hubspot : comment ajouter automatiquement les nouvelles sociétés d'un code APE à Hubspot

Societeinfo est un outil hyper pratique pour effectuer de l'enrichissement sur une base de prospect français. En plus d'avoir un coût relativement faible, celui-ci offre une API très bien structurée. Avec l'API de Societeinfo et l'API d'Hubspot, il est possible d'ajouter automatiquement toutes les nouvelles boîtes créées avec des critères précis tels que : code APE, nombre d'employés, type de société, etc.

L'objectif du connecteur Societeinfo => Hubspot

Créer à intervalle de 24h dans Hubspot un contact pour chaque nouvelle société créées en France qui est associé à un code APE précis.

Les informations envoyées vers Hubspot

  • SIRET / SIREN
  • Date de création
  • Nom de la société
  • Source = sociétéinfo
  • Code postal
  • Ville
  • Email si existant
  • Numéro de téléphone si existant
  • Site web si existant
  • Prénom de l'associé principal
  • Nom de l'associé principal
  • Date d'anniversaire de l'associé principal
  • Titre de l'associé principal

Requis pour réaliser ce projet

  • La documentation de l'API Societeinfo ;
  • Un compte payant Societeinfo ;
  • La documentation de l'API Hubspot ;
  • Un compte Google Cloud (ou autres) ;
  • Fort probablement un développeur, ou quelqu'un avec beaucoup de volontés et de temps, ou nous.

Les scripts nécessaires

#!/usr/bin/python
# https://societeinfo.com/app/rest/api/v2/company.json/843630591?key=xxxxxx
import json
import requests
import logging
import time
import datetime
import pytz
import calendar
import os

# Change path to your file
LAST_PROCESSED_REGISTRATION_NUMBER_FILENAME = "./xxxx/last_processed_registration_number"
APE_CODE = os.environ.get("APE_CODE", None)
if not APE_CODE:
    raise Exception("APE code is missing")
SOCIETE_INFO_API_KEY = os.environ.get("SOCIETE_INFO_API_KEY", None)
if not SOCIETE_INFO_API_KEY:
    raise Exception("Societe-info API key is missing")
HUBSPOT_API_KEY = os.environ.get("HUBSPOT_API_KEY", None)
if not HUBSPOT_API_KEY:
    raise Exception("Hubspot API key is missing")
def log(text, level="info"):
    now = datetime.datetime.fromtimestamp(time.time()).strftime("%Y-%m-%d - %H:%M:%S")
    line = "{} : {}".format(now, text)
    if level == "warning":
        logging.warning(line)
    elif level == "info":
        logging.info(line)
    else:
        logging.debug(line)
def get_companies(page):
    return "https://societeinfo.com/app/rest/api/v2/companies.json?key={}&sort=creationDateDesc&query={}&searchMode=keyword&page=
{}&limit=25&active=true&mincreationdate={}".format(SOCIETE_INFO_API_KEY, APE_CODE, page, (datetime.date.today() - datetime.timede
lta(1)).strftime("%Y%m%d"))
def get_company(registration_number):
    return "https://societeinfo.com/app/rest/api/v2/company.json/{}?key={}".format(registration_number, SOCIETE_INFO_API_KEY)
# def search_hs_contact(company_name):
#     return "https://api.hubapi.com/contacts/v1/search/query?q={}&hapikey={}&property=siren".format(company_name, HUBSPOT_API_KE
Y)
def format_date(date):
    utc = pytz.timezone('UTC')
    dt = datetime.datetime.combine(datetime.datetime.strptime(date, "%Y-%m-%d"), datetime.time(0, 0))
    utc_dt = utc.localize(dt, is_dst=None)
    return "{}000".format(calendar.timegm(utc_dt.timetuple()))
def compute_contact_properties(registration_number):
    payload = { "properties": [] }
    res = requests.get(get_company(registration_number))
    company = json.loads(res.text)["result"]
    if res.status_code == 200:
        payload["properties"].append({ "property": "siren", "value": registration_number })
        payload["properties"].append({ "property": "siret", "value": company["organization"]["full_registration_number"] })
        payload["properties"].append({ "property": "date_creation_societe", "value": format_date(company["organization"]["creatio
n_date"]) })
        payload["properties"].append({ "property": "company", "value": company["organization"]["name"] })
        payload["properties"].append({ "property": "origine", "value": "societeinfo_creation" })
        if "street" in company["organization"]["address"]:
            payload["properties"].append({ "property": "address", "value": company["organization"]["address"]["street"] })
        if "postal_code" in company["organization"]["address"]:
            payload["properties"].append({ "property": "zip", "value": company["organization"]["address"]["postal_code"] })
        if "city" in company["organization"]["address"]:
            payload["properties"].append({ "property": "city", "value": company["organization"]["address"]["city"] })
        if "email" in company["contacts"]:
            payload["properties"].append({ "property": "email", "value": company["contacts"]["email"] })
        if len(company["contacts"]["phones"]) > 0:
            payload["properties"].append({ "property": "phone", "value": company["contacts"]["phones"][0]["value"] })
        if len(company["contacts"]["phones"]) > 1:
            payload["properties"].append({ "property": "mobile", "value": company["contacts"]["phones"][1]["value"] })
        if "website_url" in company["web_infos"]:
            payload["properties"].append({ "property": "website", "value": company["web_infos"]["website_url"] })
        if len(company["contacts"]["corporate_officiers"]) > 0:
            corporate_officier = company["contacts"]["corporate_officiers"][0]
            first_name = corporate_officier["firstName"].split(" ")[0]
            payload["properties"].append({ "property": "firstname", "value": first_name })
            payload["properties"].append({ "property": "lastname", "value": corporate_officier["lastName"] })
            payload["properties"].append({ "property": "date_anniversaire", "value": format_date(corporate_officier["birth_date"]
) })
            payload["properties"].append({ "property": "jobtitle", "value": corporate_officier["role"] })
    return payload
def get_new_results(last_processed_registration_number):
    log("Fetching last companies...")
    registration_numbers = []
    companies = {}
    page = 1
    stop = False
    while not stop:
        res = requests.get(get_companies(page))
        log("page {}".format(page))
        if res.status_code == 200:
            result = json.loads(res.text)
            if len(result["result"]) > 0:
                for company in result["result"]:
                    if company["registration_number"] == last_processed_registration_number:
                        stop = True
                        break
                    else:
                        registration_numbers.append(company["registration_number"])
                        companies[company["registration_number"]] = { "name": company["name"] }
                if not stop:
                    page += 1
            else:
                stop = True
        else:
            stop = True
    registration_numbers.reverse()
    return [registration_numbers, companies]
def create_new_contacts(registration_numbers, companies):
    log("Creating new contacts from companies...")
    for registration_number in registration_numbers:
        # company = companies[registration_number]
        # res = requests.get(search_hs_contact(company["name"]))
        # if res.status_code == 200:
            # result = json.loads(res.text)
            # if result["total"] == 0:
        payload = compute_contact_properties(registration_number)
        log(payload, "debug")
        res = requests.post("https://api.hubapi.com/contacts/v1/contact/?hapikey={}".format(HUBSPOT_API_KEY), data=json.dumps(pay
load))
        if res.status_code == 200:
            result = json.loads(res.text)
            log("New contact created for company with registration number {}. vid is {}".format(registration_number, result["vid"
]))
            with open(LAST_PROCESSED_REGISTRATION_NUMBER_FILENAME, 'w') as f:
                f.write(registration_number)
        elif res.status_code == 409:
            log("Conflict with existing hubspot contact for company with registration number {}".format(registration_number), "wa
rning")
        elif res.status_code == 400:
            log(res.text, "warning")
            log("Invalid contact creation request for company with registration number {}".format(registration_number), "warning"
)
        else:
            log("Error encountered while creating contact for company with registration number {}".format(registration_number), "
warning")
            # else:
            #     suspicious_vids = list(map(lambda contact: str(contact["vid"]), result["contacts"]))
            #     log("Company with registration number {} might already exists in hubspot. Look for records : {}".format(registr
ation_number, ", ".join(suspicious_vids)), "warning")
if __name__== "__main__":
    logging.basicConfig(filename='./hubspot_integration/ceres_cron/populate_company.log', filemode='w', level=logging.DEBUG)
    with open(LAST_PROCESSED_REGISTRATION_NUMBER_FILENAME) as f:
        last_processed_registration_number = f.readline().rstrip()
    if last_processed_registration_number:
        registration_numbers, companies = get_new_results(last_processed_registration_number)
        log("Found {} new companies...".format(len(registration_numbers)), "info")
        if len(registration_numbers) > 0:
            create_new_contacts(registration_numbers, companies)
    else:
        log("Unknown last processed registration number", "warning")

Voici la tâche Cron qui permet de déclencher le script à un intervalle régulier :

from crontab import CronTab

user = None
APE_CODE = None
SOCIETE_INFO_API_KEY = None
HUBSPOT_API_KEY = None
hour = None
minute = None

while not user:
    user = input('Enter user : ')
while not APE_CODE:
    APE_CODE = input('Enter APE_CODE : ')

while not SOCIETE_INFO_API_KEY:
    SOCIETE_INFO_API_KEY = input('Enter SOCIETE_INFO_API_KEY : ')
while not HUBSPOT_API_KEY:
    HUBSPOT_API_KEY = input('Enter HUBSPOT_API_KEY : ')
while not hour:
    hour = input('Hour : ')
while not minute:
    minute = input('Minute : ')
cron = CronTab(user=user)
cron.remove_all(comment='populate_company')
job = cron.new(command="APE_CODE={} SOCIETE_INFO_API_KEY={} HUBSPOT_API_KEY={} python3 /home/{}/hubspot_integration
/ceres_cron/populate_company_contact.py".format(APE_CODE, SOCIETE_INFO_API_KEY, HUBSPOT_API_KEY, user), user=user, 
comment='populate_company')
job.hour.on(hour)
job.minute.on(minute)
job.enable()
if job.is_enabled():
    print("Job is enabled")
else:
    print("Job is disabled")
if job.is_valid():
    print("Job is valid")
else:
    print("Job is invalid")
cron.write()
if cron.render():
    print("cron line : {}".format(cron.render()))
schedule = job.schedule()
datetime = schedule.get_next()
print("next execution : {}".format(datetime))

Une fois déployé sur Google Cloud, le script ajoutera à votre Hubspot toutes les boîtes correspondantes au code APE précisé.

Vous pouvez donc par exemple :

  • Envoyer un mail automatiquement à une nouvelle société ;
  • Ajouter une tâche Hubspot d'appel ;
  • Envoyer automatiquement une pub par la poste ;
  • Envoyer un SMS ;
  • Toutes ces options.

Si vous avez besoin d'un connecteur Societeinfo=>Hubspot ou si vous avez des questions, faites-moi signe !

Show Comments

Get the latest posts delivered right to your inbox.