A Python library for the Salesforce Bulk API (that actually works)
Changes over salesforce-bulk
The salesforce-bulk library was used to export 18k records to Wingify's Salesforce system. Although the library proved super useful, it is broken, no longer maintained, and was a pain to work with while tracking down its bugs. @bholagabbar decided to fix the issues faced and release a new, usable library, salesforce-bulkipy. This library is currently used in our production systems and has also been extensively tested on our Salesforce sandbox.
- Added support for Two-Factor Authentication by routing authentication via simple-salesforce
- Added support for Salesforce Sandbox
- Added support for parsing unicode characters in CSV
- Explicit Upsert Support
- Fixed various other bugs
- Python 3 support
salesforce-bulkipy will be actively maintained, unlike salesforce-bulk
sudo pip install salesforce-bulkipy
In case your setup fails, you may be missing a few essential tools. Try
sudo apt-get install build-essential libssl-dev libffi-dev python-dev
To access the Bulk API, you need to authenticate a user with Salesforce. There are two possible ways to achieve this. Both methods work irrespective of whether your organisation has Two-Factor Authentication enabled, so that's a massive overhead taken care of.
The code samples shown read credentials from a config.properties file. Feel free to adapt the input method to your setup.
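For reference, such a config.properties file might look like the fragment below. The section name and keys are simply the ones the snippets here read; the values are placeholders you would replace with your own credentials:

```ini
[Section]
username = user@example.com
password = your-password
security_token = your-security-token
session_id = your-session-id
host = yourinstance.salesforce.com
```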
1. username, password, security_token
from salesforce_bulkipy import SalesforceBulkipy
import ConfigParser
config = ConfigParser.RawConfigParser()
config.read('config.properties')
username = config.get('Section', 'username')
password = config.get('Section', 'password')
security_token = config.get('Section', 'security_token')
bulk = SalesforceBulkipy(username=username, password=password, security_token=security_token) #optional parameter: sandbox=True
# Authentication Successful!
2. session_id, host
from salesforce_bulkipy import SalesforceBulkipy
import ConfigParser
config = ConfigParser.RawConfigParser()
config.read('config.properties')
session_id = config.get('Section', 'session_id')
host = config.get('Section', 'host')
bulk = SalesforceBulkipy(session_id=session_id, host=host) #optional parameter: sandbox=True
# Authentication Successful!
The basic sequence for driving the Bulk API is:
- Create a new job
- Add one or more batches to the job
- Wait for each batch to finish
- Close the job
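The same four steps can be sketched against a hypothetical in-memory stand-in. FakeBulkClient below is not part of the library; its method names simply mirror the real client used in the examples that follow, to make the calling order concrete:

```python
# A minimal in-memory stub that mimics the four-step Bulk API flow.
# This class is NOT part of salesforce-bulkipy; it only illustrates
# the create-job -> add-batch -> wait -> close calling order.

class FakeBulkClient:
    def __init__(self):
        self.jobs = {}

    def create_insert_job(self, object_name):
        job_id = "job-%d" % (len(self.jobs) + 1)
        self.jobs[job_id] = {"object": object_name, "batches": [], "state": "Open"}
        return job_id

    def post_bulk_batch(self, job_id, records):
        batch_id = "batch-%d" % (len(self.jobs[job_id]["batches"]) + 1)
        self.jobs[job_id]["batches"].append((batch_id, list(records)))
        return batch_id

    def wait_for_batch(self, job_id, batch_id):
        return True  # a real client polls Salesforce until the batch completes

    def close_job(self, job_id):
        self.jobs[job_id]["state"] = "Closed"

bulk = FakeBulkClient()
job = bulk.create_insert_job("Contact")                         # 1. create a new job
batch = bulk.post_bulk_batch(job, iter([{"LastName": "Doe"}]))  # 2. add a batch
bulk.wait_for_batch(job, batch)                                 # 3. wait for the batch
bulk.close_job(job)                                             # 4. close the job
print(bulk.jobs[job]["state"])  # Closed
```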
All Bulk upload operations work the same. You set the operation when you create the job. Then you submit one or more documents that specify records with columns to insert/update/delete.
For the upsert operation, we also need to specify something called the external_key, which can be any (preferably unique) attribute of your custom Salesforce object. Every record to upsert is checked against this key in Salesforce. Say your external key is Id. For every record you push, Salesforce checks whether a record with the same Id already exists. If yes, that record is updated; otherwise a new record is created.
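Those upsert semantics can be sketched in plain Python. This is a toy in-memory model, not library code; `store` plays the role of the records already in Salesforce:

```python
def upsert(store, records, external_key):
    """Toy model of upsert: 'store' maps external-key value -> record.

    For each incoming record, if a record with the same external key
    already exists it is updated; otherwise a new record is created.
    """
    for record in records:
        key = record[external_key]
        if key in store:
            store[key].update(record)   # existing record: update it
        else:
            store[key] = dict(record)   # no match: create a new record

store = {"A1": {"Id": "A1", "Name": "Old"}}
upsert(store, [{"Id": "A1", "Name": "New"}, {"Id": "B2", "Name": "Fresh"}], "Id")
print(sorted(store))        # ['A1', 'B2']
print(store["A1"]["Name"])  # New
```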
For the delete operation, you should only submit the Id for each record.
For efficiency, you should use the post_bulk_batch method to post each batch of data. (Note that a batch can have at most 10,000 records and be at most 1 GB in size.) You pass a generator or iterator into this function and it will stream the data via POST to Salesforce. For help sending CSV-formatted data, you can use the salesforce_bulkipy.CsvDictsAdapter class. It takes an iterator returning dictionaries and returns an iterator producing CSV data.
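Roughly, the adapter turns dicts into CSV text, header row first. A stdlib-only approximation of that behaviour (this is a sketch, not the library's actual implementation):

```python
import csv
import io

def csv_dicts_adapter(dict_iter):
    """Yield CSV text chunks (header first) from an iterator of dicts.

    Approximates what a CSV adapter does: the first dict's keys become
    the header row, and each dict becomes one CSV data row.
    """
    dict_iter = iter(dict_iter)
    first = next(dict_iter)
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=list(first.keys()), lineterminator="\n")
    writer.writeheader()
    writer.writerow(first)
    yield buf.getvalue()
    for row in dict_iter:
        buf.seek(0)
        buf.truncate()
        writer.writerow(row)
        yield buf.getvalue()

chunks = list(csv_dicts_adapter([{"Id": "1", "Name": "Ada"}, {"Id": "2", "Name": "Bob"}]))
print("".join(chunks))
# Id,Name
# 1,Ada
# 2,Bob
```

Because the input is an iterator and the output is yielded chunk by chunk, the whole record set never has to be materialised as one string, which is what makes streaming large batches practical.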
Concurrency mode: When creating the job, you can pass concurrency=Serial or concurrency=Parallel to set the concurrency mode for the job.
from salesforce_bulkipy import SalesforceBulkipy
from salesforce_bulkipy import CsvDictsAdapter
bulk = SalesforceBulkipy(username=username, password=password, security_token=security_token)
records_to_insert = [{}, {}]  # A list of custom object dicts
# Bulk Insert
job = bulk.create_insert_job("CustomObjectName", contentType='CSV')
csv_iter = CsvDictsAdapter(iter(records_to_insert))
batch = bulk.post_bulk_batch(job, csv_iter)
bulk.wait_for_batch(job, batch)
bulk.close_job(job)
from salesforce_bulkipy import SalesforceBulkipy
bulk = SalesforceBulkipy(username=username, password=password, security_token=security_token)
# Bulk Query
query = '' # SOQL Query
job = bulk.create_query_job("Object_Name", contentType='CSV')
batch = bulk.query(job, query)
bulk.wait_for_batch(job, batch)
bulk.close_job(job)
# Result
results = bulk.get_batch_result_iter(job, batch, parse_csv=True)
from salesforce_bulkipy import SalesforceBulkipy
from salesforce_bulkipy import CsvDictsAdapter
bulk = SalesforceBulkipy(username=username, password=password, security_token=security_token)
records_to_upsert = [{}, {}]  # A list of custom object dicts
# Bulk Upsert
job = bulk.create_upsert_job("Object_Name", external_id_name="Unique_id", contentType='CSV')
csv_iter = CsvDictsAdapter(iter(records_to_upsert))
batch = bulk.post_bulk_batch(job, csv_iter)
bulk.wait_for_batch(job, batch)
bulk.close_job(job)
This repository is a maintained fork of heroku/salesforce-bulk. The changes incorporated here are a result of a joint effort by @lambacck, @Jeremydavisvt, @alexhughson, @bholagabbar and @TrustYou (@xyder and @jeryini). Thanks to @heroku for creating the original useful library.
Feel free to contribute by creating Issues and Pull Requests. We'll test and merge them.