diff --git a/django/db/backends/base/features.py b/django/db/backends/base/features.py
--- a/django/db/backends/base/features.py
+++ b/django/db/backends/base/features.py
@@ -250,6 +250,10 @@ class BaseDatabaseFeatures:
     # Does the backend support keyword parameters for cursor.callproc()?
     supports_callproc_kwargs = False
 
+    # Does the backend support ignoring constraint or uniqueness errors
+    # during insertion (e.g. INSERT IGNORE / ON CONFLICT DO NOTHING)?
+    supports_on_conflict_ignore = True
+
     def __init__(self, connection):
         self.connection = connection
 
diff --git a/django/db/backends/base/operations.py b/django/db/backends/base/operations.py
--- a/django/db/backends/base/operations.py
+++ b/django/db/backends/base/operations.py
@@ -652,3 +652,9 @@ class BaseDatabaseOperations:
 
     def window_frame_range_start_end(self, start=None, end=None):
         return self.window_frame_rows_start_end(start, end)
+
+    def insert_statement(self, on_conflict=None):
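+        """
+        Return the opening clause of an INSERT statement. Backends that support
+        conflict handling override this to vary the clause per on_conflict mode.
+        """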
+        return 'INSERT INTO'
+
+    def on_conflict_postfix(self, on_conflict=None):
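+        """
+        Return the clause appended to an INSERT to implement the requested
+        on_conflict handling, or '' if no postfix is needed.
+        """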
+        return ''
diff --git a/django/db/backends/mysql/operations.py b/django/db/backends/mysql/operations.py
--- a/django/db/backends/mysql/operations.py
+++ b/django/db/backends/mysql/operations.py
@@ -269,3 +269,9 @@ class DatabaseOperations(BaseDatabaseOperations):
             ) % {'lhs': lhs_sql, 'rhs': rhs_sql}, lhs_params * 2 + rhs_params * 2
         else:
             return "TIMESTAMPDIFF(MICROSECOND, %s, %s)" % (rhs_sql, lhs_sql), rhs_params + lhs_params
+
+    def insert_statement(self, on_conflict=None):
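+        # MySQL expresses conflict-ignoring inserts as INSERT IGNORE.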
+        if on_conflict == 'ignore':
+            return 'INSERT IGNORE INTO'
+
+        return super().insert_statement(on_conflict)
diff --git a/django/db/backends/oracle/features.py b/django/db/backends/oracle/features.py
--- a/django/db/backends/oracle/features.py
+++ b/django/db/backends/oracle/features.py
@@ -54,4 +54,5 @@ class DatabaseFeatures(BaseDatabaseFeatures):
     """
     supports_callproc_kwargs = True
     supports_over_clause = True
+    supports_on_conflict_ignore = False
     max_query_params = 2**16 - 1
diff --git a/django/db/backends/postgresql/compiler.py b/django/db/backends/postgresql/compiler.py
new file mode 100644
--- /dev/null
+++ b/django/db/backends/postgresql/compiler.py
@@ -0,0 +1,63 @@
+from django.db.models.sql import compiler
+
+
+SQLCompiler = compiler.SQLCompiler
+SQLDeleteCompiler = compiler.SQLDeleteCompiler
+SQLUpdateCompiler = compiler.SQLUpdateCompiler
+SQLAggregateCompiler = compiler.SQLAggregateCompiler
+
+
+class SQLInsertCompiler(compiler.SQLInsertCompiler):
+    def as_sql(self):
+        """
+        Create queries that work like ``INSERT INTO .. ON CONFLICT DO NOTHING RETURNING *``
+        but return the same number of rows as the input, setting ``NULL`` in place of the
+        primary key for rows that already existed.  The cited query returns nothing for
+        rows that were already in the database.  The drawback is that the PostgreSQL
+        sequence counter increases by the number of rows in the input on every call,
+        irrespective of how many rows were actually inserted.  Requires PostgreSQL >= 9.5.
+
+        This creates a query like:
+
+        .. code-block:: sql
+
+            WITH
+                r AS (SELECT * FROM (VALUES (...), (...) -- the ellipses are substituted with the values to be inserted
+                    ) AS g(...)), -- the ellipsis is substituted with the corresponding column names, excluding the PK
+                s AS (INSERT INTO (table name) (...) -- the same field names
+                    SELECT * FROM r ON CONFLICT DO NOTHING RETURNING *)
+            SELECT s.pk FROM r LEFT JOIN s USING (...); -- again the same field names, but this time excluding
+            -- the fields ignored for the comparison in all_ids
+
+        ``r`` is a table containing the values to be inserted, with the correct column
+        names.  It does not contain primary keys.  ``s`` is a table containing the rows
+        that could be inserted without conflicts.  The rows in ``s`` have the primary key
+        set.  The final ``SELECT`` matches each row that was supposed to be inserted with
+        the row that was actually inserted.  It produces as many rows as ``r``, but the
+        primary key is set only on the rows that were inserted.  On the remaining rows,
+        which existed before the query, the primary key is not set.
+
+        There seems to be no simpler way to achieve what the first sentence of this
+        docstring says.
+        """
+        fields = self.query.fields
+        if (
+                self.return_id and fields and self.connection.features.is_postgresql_9_5 and
+                self.query.on_conflict == 'ignore'
+        ):
+            qn = self.quote_name_unless_alias
+            opts = self.query.get_meta()
+            columns = [qn(field.column) for field in fields]
+            if isinstance(self.return_id, list):
+                # When return_id is a list, it names the fields to exclude from
+                # the row comparison (the all_ids list from bulk_create()).
+                compare_columns = [qn(field.column) for field in fields if field.column not in self.return_id]
+            else:
+                compare_columns = columns
+            values_placeholders = ", ".join(
+                "(" + ", ".join("%s" for field in fields) + ")" for obj in self.query.objs
+            )
+            sql = (
+                "WITH r AS (SELECT * FROM (VALUES " + values_placeholders + ") AS g(" +
+                ", ".join(columns) + ")), "
+                "s AS (INSERT INTO " + qn(opts.db_table) + " (" + ", ".join(columns) +
+                ") SELECT * FROM r ON CONFLICT DO NOTHING RETURNING *) "
+                "SELECT s." + qn(opts.pk.column) +
+                " FROM r LEFT JOIN s USING (" + ", ".join(compare_columns) + ")"
+            )
+            value_rows = [
+                [self.prepare_value(field, self.pre_save_val(field, obj)) for field in fields]
+                for obj in self.query.objs
+            ]
+            param_rows = self.assemble_as_sql(fields, value_rows)[1]
+            return [(sql, tuple(p for ps in param_rows for p in ps))]
+        return super().as_sql()
diff --git a/django/db/backends/postgresql/features.py b/django/db/backends/postgresql/features.py
--- a/django/db/backends/postgresql/features.py
+++ b/django/db/backends/postgresql/features.py
@@ -59,3 +59,4 @@ class DatabaseFeatures(BaseDatabaseFeatures):
     has_brin_index_support = is_postgresql_9_5
     has_jsonb_agg = is_postgresql_9_5
     has_gin_pending_list_limit = is_postgresql_9_5
+    supports_on_conflict_ignore = is_postgresql_9_5
diff --git a/django/db/backends/postgresql/operations.py b/django/db/backends/postgresql/operations.py
--- a/django/db/backends/postgresql/operations.py
+++ b/django/db/backends/postgresql/operations.py
@@ -7,6 +7,7 @@ from django.db.backends.base.operations import BaseDatabaseOperations
 
 class DatabaseOperations(BaseDatabaseOperations):
     cast_char_field_without_max_length = 'varchar'
+    compiler_module = "django.db.backends.postgresql.compiler"
 
     def unification_cast_sql(self, output_field):
         internal_type = output_field.get_internal_type()
@@ -258,3 +259,9 @@ class DatabaseOperations(BaseDatabaseOperations):
                 'and FOLLOWING.'
             )
         return start_, end_
+
+    def on_conflict_postfix(self, on_conflict=None):
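+        # PostgreSQL >= 9.5 skips conflicting rows via ON CONFLICT DO NOTHING.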
+        if on_conflict == 'ignore':
+            return 'ON CONFLICT DO NOTHING'
+
+        return super().on_conflict_postfix(on_conflict)
diff --git a/django/db/backends/sqlite3/operations.py b/django/db/backends/sqlite3/operations.py
--- a/django/db/backends/sqlite3/operations.py
+++ b/django/db/backends/sqlite3/operations.py
@@ -296,3 +296,9 @@ class DatabaseOperations(BaseDatabaseOperations):
         if internal_type == 'TimeField':
             return "django_time_diff(%s, %s)" % (lhs_sql, rhs_sql), lhs_params + rhs_params
         return "django_timestamp_diff(%s, %s)" % (lhs_sql, rhs_sql), lhs_params + rhs_params
+
+    def insert_statement(self, on_conflict=None):
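+        # SQLite expresses conflict-ignoring inserts as INSERT OR IGNORE.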
+        if on_conflict == 'ignore':
+            return 'INSERT OR IGNORE INTO'
+
+        return super().insert_statement(on_conflict)
diff --git a/django/db/models/fields/related_descriptors.py b/django/db/models/fields/related_descriptors.py
--- a/django/db/models/fields/related_descriptors.py
+++ b/django/db/models/fields/related_descriptors.py
@@ -1085,7 +1085,7 @@ def create_forward_many_to_many_manager(superclass, rel, reverse):
                             '%s_id' % target_field_name: obj_id,
                         })
                         for obj_id in new_ids
-                    ])
+                    ], on_conflict='ignore' if connections[db].features.supports_on_conflict_ignore else None)
 
                     if self.reverse or source_field_name == self.source_field_name:
                         # Don't send the signal when we are inserting the
diff --git a/django/db/models/query.py b/django/db/models/query.py
--- a/django/db/models/query.py
+++ b/django/db/models/query.py
@@ -1,8 +1,8 @@
 """
 The main QuerySet implementation. This provides the public API for the ORM.
 """
-
 import copy
+import functools
 import operator
 import warnings
 from collections import OrderedDict, namedtuple
@@ -10,11 +10,12 @@ from functools import lru_cache
 
 from django.conf import settings
 from django.core import exceptions
 from django.db import (
     DJANGO_VERSION_PICKLE_KEY, IntegrityError, connections, router,
     transaction,
 )
-from django.db.models import DateField, DateTimeField, sql
+from django.db.models import DateField, DateTimeField, signals, sql
 from django.db.models.constants import LOOKUP_SEP
 from django.db.models.deletion import Collector
 from django.db.models.expressions import F
@@ -22,6 +23,7 @@ from django.db.models.fields import AutoField
 from django.db.models.functions import Trunc
 from django.db.models.query_utils import FilteredRelation, InvalidQuery, Q
 from django.db.models.sql.constants import CURSOR, GET_ITERATOR_CHUNK_SIZE
+from django.db.utils import NotSupportedError
 from django.utils import timezone
 from django.utils.deprecation import RemovedInDjango30Warning
 from django.utils.functional import cached_property, partition
@@ -417,13 +419,13 @@ class QuerySet:
             if obj.pk is None:
                 obj.pk = obj._meta.pk.get_pk_value_on_save(obj)
 
-    def bulk_create(self, objs, batch_size=None):
+    def bulk_create(self, objs, batch_size=None, on_conflict=None, send_post_save=False, all_ids=None):
         """
         Insert each of the instances into the database. Do *not* call
-        save() on each of the instances, do not send any pre/post_save
-        signals, and do not set the primary key attribute if it is an
-        autoincrement field (except if features.can_return_ids_from_bulk_insert=True).
-        Multi-table models are not supported.
+        save() on each of the instances, do not send the pre_save
+        signal, and do not set the primary key attribute if it is an
+        autoincrement field (except if features.can_return_ids_from_bulk_insert=True,
+        or if all_ids is not None and PostgreSQL >= 9.5 is used).
+        Multi-table models are not supported.
         """
         # When you bulk insert you don't get the primary keys back (if it's an
         # autoincrement, except if can_return_ids_from_bulk_insert=True), so
@@ -447,25 +449,75 @@ class QuerySet:
                 raise ValueError("Can't bulk create a multi-table inherited model")
         if not objs:
             return objs
-        self._for_write = True
         connection = connections[self.db]
+        if on_conflict:
+            on_conflict = on_conflict.lower()
+            if on_conflict != 'ignore':
+                raise ValueError("'%s' is an invalid value for on_conflict. Allowed values: 'ignore'" % on_conflict)
+            if not connection.features.supports_on_conflict_ignore:
+                raise NotSupportedError('This database backend does not support ON CONFLICT IGNORE')
+            if all_ids is not None and not getattr(connection.features, 'is_postgresql_9_5', False):
+                raise NotSupportedError('all_ids can be set only when PostgreSQL >= 9.5 is used')
+        elif all_ids is not None:
+            raise ValueError('all_ids can be used only with on_conflict')
+
+        self._for_write = True
         fields = self.model._meta.concrete_fields
         objs = list(objs)
         self._populate_pk_values(objs)
         with transaction.atomic(using=self.db, savepoint=False):
             objs_with_pk, objs_without_pk = partition(lambda o: o.pk is None, objs)
             if objs_with_pk:
-                self._batched_insert(objs_with_pk, fields, batch_size)
+                self._batched_insert(objs_with_pk, fields, batch_size, on_conflict=on_conflict, return_id=False)
             if objs_without_pk:
                 fields = [f for f in fields if not isinstance(f, AutoField)]
-                ids = self._batched_insert(objs_without_pk, fields, batch_size)
-                if connection.features.can_return_ids_from_bulk_insert:
-                    assert len(ids) == len(objs_without_pk)
+                return_id = (
+                    (connection.features.can_return_ids_from_bulk_insert and not on_conflict) or
+                    (getattr(connection.features, 'is_postgresql_9_5', False) and all_ids is not None)
+                )
+                if return_id and isinstance(all_ids, list):
+                    # all_ids names the fields to ignore when comparing objects
+                    # for equality; pass them on to the compiler via return_id.
+                    return_id = all_ids
+                ids = self._batched_insert(objs_without_pk, fields, batch_size, on_conflict=on_conflict,
+                                           return_id=return_id)
+                if (
+                    (connection.features.can_return_ids_from_bulk_insert and on_conflict != 'ignore') or
+                    (getattr(connection.features, 'is_postgresql_9_5', False) and
+                        (all_ids is True or isinstance(all_ids, list)))
+                ):
+                    assert len(ids) == len(objs_without_pk)
                 for obj_without_pk, pk in zip(objs_without_pk, ids):
                     obj_without_pk.pk = pk
                     obj_without_pk._state.adding = False
                     obj_without_pk._state.db = self.db
 
+            else:
+                return_id = False
+        if send_post_save or return_id:
+            objs_with_pk, objs_without_pk = partition(lambda o: o.pk is None, objs_without_pk + objs_with_pk)
+            if send_post_save:
+                already_sent = set()  # In case objs contains the same element twice
+                for obj in objs_with_pk:
+                    if obj.pk not in already_sent:
+                        signals.post_save.send(sender=obj.__class__, instance=obj,
+                                               created=True, raw=False, using=self.db)
+                    already_sent.add(obj.pk)
+
+            if all_ids and objs_without_pk and getattr(connection.features, 'is_postgresql_9_5', False):
+                # Imported locally: contrib.postgres requires psycopg2 and this
+                # branch only runs on PostgreSQL >= 9.5.
+                from django.contrib.postgres.fields import CIText
+                all_ids = [] if all_ids is True else all_ids
+                # f.attname in obj.__dict__ and f != obj._meta.pk means the field
+                # is neither deferred nor the primary key.
+                obj0 = objs_without_pk[0]
+                fields = [f.attname for f in obj0._meta.concrete_fields if f.attname
+                          in obj0.__dict__ and f != obj0._meta.pk and f.attname not in all_ids]
+                q = [Q(**{f.attname: getattr(obj, f.attname) for f in obj._meta.concrete_fields if f.attname in
+                          obj.__dict__ and f != obj._meta.pk and f.attname not in all_ids}) for obj in objs_without_pk]
+                if q:
+                    output = self.filter(functools.reduce(Q.__or__, q)).values(*fields, obj0._meta.pk.attname)
+                    for obj in objs_without_pk:
+                        for o in output:
+                            # Compare case-insensitively for citext-backed fields.
+                            if all((getattr(obj, f).lower() == o[f].lower()) if isinstance(
+                                    obj._meta.get_field(f), CIText) else (getattr(obj, f) == o[f]) for f in fields):
+                                obj.pk = o[obj0._meta.pk.attname]
+                                break
         return objs
 
     def get_or_create(self, defaults=None, **kwargs):
@@ -1108,7 +1160,7 @@ class QuerySet:
     # PRIVATE METHODS #
     ###################
 
-    def _insert(self, objs, fields, return_id=False, raw=False, using=None):
+    def _insert(self, objs, fields, return_id=False, raw=False, using=None, on_conflict=None):
         """
         Insert a new record for the given model. This provides an interface to
         the InsertQuery class and is how Model.save() is implemented.
@@ -1116,28 +1168,28 @@ class QuerySet:
         self._for_write = True
         if using is None:
             using = self.db
-        query = sql.InsertQuery(self.model)
+        query = sql.InsertQuery(self.model, on_conflict=on_conflict)
         query.insert_values(fields, objs, raw=raw)
         return query.get_compiler(using=using).execute_sql(return_id)
     _insert.alters_data = True
     _insert.queryset_only = False
 
-    def _batched_insert(self, objs, fields, batch_size):
+    def _batched_insert(self, objs, fields, batch_size, on_conflict, return_id):
         """
         Helper method for bulk_create() to insert objs one batch at a time.
         """
-        ops = connections[self.db].ops
-        batch_size = (batch_size or max(ops.bulk_batch_size(fields, objs), 1))
+        batch_size = batch_size or max(connections[self.db].ops.bulk_batch_size(fields, objs), 1)
         inserted_ids = []
         for item in [objs[i:i + batch_size] for i in range(0, len(objs), batch_size)]:
-            if connections[self.db].features.can_return_ids_from_bulk_insert:
-                inserted_id = self._insert(item, fields=fields, using=self.db, return_id=True)
+            if return_id:
+                inserted_id = self._insert(item, fields=fields, using=self.db,
+                                           return_id=return_id, on_conflict=on_conflict)
                 if isinstance(inserted_id, list):
                     inserted_ids.extend(inserted_id)
                 else:
                     inserted_ids.append(inserted_id)
             else:
-                self._insert(item, fields=fields, using=self.db)
+                self._insert(item, fields=fields, using=self.db, on_conflict=on_conflict)
         return inserted_ids
 
     def _chain(self, **kwargs):
diff --git a/django/db/models/sql/compiler.py b/django/db/models/sql/compiler.py
--- a/django/db/models/sql/compiler.py
+++ b/django/db/models/sql/compiler.py
@@ -1209,7 +1209,8 @@ class SQLInsertCompiler(SQLCompiler):
         # going to be column names (so we can avoid the extra overhead).
         qn = self.connection.ops.quote_name
         opts = self.query.get_meta()
-        result = ['INSERT INTO %s' % qn(opts.db_table)]
+        insert_statement = self.connection.ops.insert_statement(on_conflict=self.query.on_conflict)
+        result = ['%s %s' % (insert_statement, qn(opts.db_table))]
         fields = self.query.fields or [opts.pk]
         result.append('(%s)' % ', '.join(qn(f.column) for f in fields))
 
@@ -1231,6 +1232,8 @@ class SQLInsertCompiler(SQLCompiler):
 
         placeholder_rows, param_rows = self.assemble_as_sql(fields, value_rows)
 
+        on_conflict_postfix = self.connection.ops.on_conflict_postfix(on_conflict=self.query.on_conflict)
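+        # An empty postfix means the backend has no clause for the requested on_conflict mode.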
+
         if self.return_id and self.connection.features.can_return_id_from_insert:
             if self.connection.features.can_return_ids_from_bulk_insert:
                 result.append(self.connection.ops.bulk_insert_sql(fields, placeholder_rows))
@@ -1238,6 +1241,8 @@ class SQLInsertCompiler(SQLCompiler):
             else:
                 result.append("VALUES (%s)" % ", ".join(placeholder_rows[0]))
                 params = [param_rows[0]]
+            if on_conflict_postfix:
+                result.append(on_conflict_postfix)
             col = "%s.%s" % (qn(opts.db_table), qn(opts.pk.column))
             r_fmt, r_params = self.connection.ops.return_insert_id()
             # Skip empty r_fmt to allow subclasses to customize behavior for
@@ -1249,8 +1254,12 @@ class SQLInsertCompiler(SQLCompiler):
 
         if can_bulk:
             result.append(self.connection.ops.bulk_insert_sql(fields, placeholder_rows))
+            if on_conflict_postfix:
+                result.append(on_conflict_postfix)
             return [(" ".join(result), tuple(p for ps in param_rows for p in ps))]
         else:
+            if on_conflict_postfix:
+                result.append(on_conflict_postfix)
             return [
                 (" ".join(result + ["VALUES (%s)" % ", ".join(p)]), vals)
                 for p, vals in zip(placeholder_rows, param_rows)
diff --git a/django/db/models/sql/subqueries.py b/django/db/models/sql/subqueries.py
--- a/django/db/models/sql/subqueries.py
+++ b/django/db/models/sql/subqueries.py
@@ -169,10 +169,11 @@ class UpdateQuery(Query):
 class InsertQuery(Query):
     compiler = 'SQLInsertCompiler'
 
-    def __init__(self, *args, **kwargs):
+    def __init__(self, *args, on_conflict=None, **kwargs):
         super().__init__(*args, **kwargs)
         self.fields = []
         self.objs = []
+        self.on_conflict = on_conflict
 
     def insert_values(self, fields, objs, raw=False):
         self.fields = fields
diff --git a/docs/ref/models/querysets.txt b/docs/ref/models/querysets.txt
--- a/docs/ref/models/querysets.txt
+++ b/docs/ref/models/querysets.txt
@@ -1995,7 +1995,7 @@ exists in the database, an :exc:`~django.db.IntegrityError` is raised.
 ``bulk_create()``
 ~~~~~~~~~~~~~~~~~
 
-.. method:: bulk_create(objs, batch_size=None)
+.. method:: bulk_create(objs, batch_size=None, on_conflict=None, send_post_save=False, all_ids=None)
 
 This method inserts the provided list of objects into the database in an
 efficient manner (generally only 1 query, no matter how many objects there
@@ -2009,7 +2009,11 @@ are)::
 This has a number of caveats though:
 
-* The model's ``save()`` method will not be called, and the ``pre_save`` and
-  ``post_save`` signals will not be sent.
+* The model's ``save()`` method will not be called, and the ``pre_save``
+  signal will not be sent.
+* If ``send_post_save`` is ``True``, the ``post_save`` signal will be sent for
+  the objects whose IDs are known to Django. The IDs are known if
+  ``on_conflict=None`` is used on backends with
+  ``features.can_return_ids_from_bulk_insert=True``, or if
+  ``on_conflict='ignore'``, PostgreSQL >= 9.5, and a non-``None`` ``all_ids``
+  are used simultaneously.
 * It does not work with child models in a multi-table inheritance scenario.
 * If the model's primary key is an :class:`~django.db.models.AutoField` it
   does not retrieve and set the primary key attribute, as ``save()`` does,
@@ -2035,6 +2039,55 @@ The ``batch_size`` parameter controls how many objects are created in a single
 query. The default is to create all objects in one batch, except for SQLite
 where the default is such that at most 999 variables per query are used.
 
+On database backends that support it, the ``on_conflict`` parameter controls
+how the database handles conflicting data, i.e. data containing duplicate
+``UNIQUE`` values. Currently this parameter can be ``None``, in which case an
+:exc:`~django.db.IntegrityError` is raised, or ``'ignore'``, in which case the
+database ignores any failing rows. Using ``on_conflict='ignore'`` disables
+setting the primary key attribute in the returned list and on ``objs``.
+
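+For example, assuming ``headline`` were declared ``unique=True`` on the
+``Entry`` model (an assumption for this sketch), the duplicate row below would
+be skipped instead of raising :exc:`~django.db.IntegrityError`::
+
+    >>> Entry.objects.bulk_create([
+    ...     Entry(headline='This is a test'),
+    ...     Entry(headline='This is a test'),
+    ... ], on_conflict='ignore')
+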
+If anything other than PostgreSQL >= 9.5 is used, passing a non-``None``
+``all_ids`` raises an error.
+
+If PostgreSQL >= 9.5 and ``on_conflict='ignore'`` are used:
+
+* If ``all_ids`` is not ``None``, the underlying PostgreSQL sequence is
+  incremented unnecessarily for each object that was already in the database,
+  and the newly created objects have their ``pk`` set. If ``all_ids`` is
+  ``None``, the returned objects that were created do not have their ``pk``
+  set.
+* If ``all_ids`` is ``True``, a second query is sent to the database to
+  retrieve the IDs of those objects that existed prior to calling
+  ``bulk_create()``. The query matches all provided fields of the supplied
+  objects.
+* If ``all_ids`` is ``False``, the second query is not sent. The difference
+  between ``False`` and ``None`` is that ``False`` sets the IDs of the
+  inserted objects and possibly increments the PostgreSQL sequence counter
+  unnecessarily, whereas ``None`` does neither.
+* If ``all_ids`` is a non-empty list, the fields named in that list are
+  ignored in the second query when considering objects for equality. This
+  differs from ``all_ids=True``, where all fields are compared::
+
+    >>> from django.db import models
+    >>> from django.utils import timezone
+
+    >>> class T(models.Model):
+    ...     d = models.DateTimeField(default=timezone.now)
+    ...     n = models.IntegerField(unique=True)
+
+    >>> T.objects.bulk_create([T(n=1), T(n=1)], on_conflict='ignore', all_ids=True)
+    # Now the database contains one object with n=1 and a timestamp from when the
+    # first constructor was called.  The returned list has two objects, and the
+    # second object has no pk set.  The reason is that the second T(n=1) has a d
+    # timestamp that differs from the timestamp of the first T(n=1), so querying
+    # the database for the second T object returned no results.  Even though only
+    # the first object is inserted into the database, the corresponding PostgreSQL
+    # sequence is increased by two.
+
+    >>> T.objects.bulk_create([T(n=1), T(n=1)], on_conflict='ignore', all_ids=['d'])
+    # Now the database checks whether there is an object with n=1, ignoring the d
+    # field.  The pk field of each element in the list is set.  The PostgreSQL
+    # sequence is increased by two, whether or not T(n=1) was in the database
+    # before the call.
+
+.. versionchanged:: 2.1
+
+    The ``on_conflict``, ``send_post_save`` and ``all_ids`` parameters were added.
+
 ``count()``
 ~~~~~~~~~~~
 
diff --git a/docs/releases/2.1.txt b/docs/releases/2.1.txt
--- a/docs/releases/2.1.txt
+++ b/docs/releases/2.1.txt
@@ -221,6 +221,14 @@ Models
 * :meth:`.QuerySet.order_by` and :meth:`distinct(*fields) <.QuerySet.distinct>`
   now support using field transforms.
 
+* The new ``on_conflict``, ``send_post_save`` and ``all_ids`` parameters of
+  :meth:`~django.db.models.query.QuerySet.bulk_create` control how the database
+  handles rows that fail constraint checks, whether to send ``post_save``
+  signals for the created objects, and, on PostgreSQL >= 9.5, whether and how
+  to retrieve the IDs of the objects that failed the constraint checks.
+
+* ``RelatedManager.add()`` is now thread-safe on SQLite, PostgreSQL >= 9.5,
+  and MySQL.
+
 Requests and Responses
 ~~~~~~~~~~~~~~~~~~~~~~
 
diff --git a/tests/bulk_create/models.py b/tests/bulk_create/models.py
--- a/tests/bulk_create/models.py
+++ b/tests/bulk_create/models.py
@@ -17,6 +17,12 @@ class Country(models.Model):
     description = models.TextField()
 
 
+class CountryUnique(models.Model):
+    name = models.CharField(max_length=255)
+    iso_two_letter = models.CharField(max_length=2, unique=True)
+    description = models.TextField()
+
+
 class ProxyCountry(Country):
     class Meta:
         proxy = True
diff --git a/tests/bulk_create/tests.py b/tests/bulk_create/tests.py
--- a/tests/bulk_create/tests.py
+++ b/tests/bulk_create/tests.py
@@ -1,14 +1,16 @@
 from operator import attrgetter
 
-from django.db import connection
-from django.db.models import FileField, Value
+from django.db import IntegrityError, connection
+from django.db.models import FileField, Value, signals
+from django.db.models.fields import AutoField
 from django.db.models.functions import Lower
+from django.db.utils import NotSupportedError
 from django.test import (
     TestCase, override_settings, skipIfDBFeature, skipUnlessDBFeature,
 )
 
 from .models import (
-    Country, NoFields, NullableFields, Pizzeria, ProxyCountry,
+    Country, CountryUnique, NoFields, NullableFields, Pizzeria, ProxyCountry,
     ProxyMultiCountry, ProxyMultiProxyCountry, ProxyProxyCountry, Restaurant,
     State, TwoFields,
 )
@@ -233,6 +235,11 @@ class BulkCreateTests(TestCase):
         self.assertEqual(len(countries), 1)
         self.assertEqual(Country.objects.get(pk=countries[0].pk), countries[0])
 
+    def test_insert_single_item_that_is_present(self):
+        CountryUnique.objects.bulk_create([self.data[0]])
+        with self.assertRaises(IntegrityError):
+            CountryUnique.objects.bulk_create([self.data[0]])
+
     @skipUnlessDBFeature('can_return_ids_from_bulk_insert')
     def test_set_pk_and_query_efficiency(self):
         with self.assertNumQueries(1):
@@ -252,3 +259,294 @@ class BulkCreateTests(TestCase):
         # Objects save via bulk_create() and save() should have equal state.
         self.assertEqual(country_nl._state.adding, country_be._state.adding)
         self.assertEqual(country_nl._state.db, country_be._state.db)
+
+    @skipIfDBFeature("supports_on_conflict_ignore")
+    def test_on_conflict_not_supported(self):
+        message = 'This database backend does not support ON CONFLICT IGNORE'
+        with self.assertRaisesMessage(NotSupportedError, message):
+            TwoFields.objects.bulk_create(self.data, on_conflict='ignore')
+
+    @skipUnlessDBFeature("supports_on_conflict_ignore")
+    def test_on_conflict_ignore(self):
+        data = [
+            TwoFields(f1=1, f2=1),
+            TwoFields(f1=2, f2=2),
+            TwoFields(f1=3, f2=3)
+        ]
+        TwoFields.objects.bulk_create(data)
+        self.assertEqual(TwoFields.objects.count(), 3)
+
+        conflicting_objects = [
+            TwoFields(f1=2, f2=2),
+            TwoFields(f1=3, f2=3)
+        ]
+        TwoFields.objects.bulk_create([conflicting_objects[0]], on_conflict='ignore')
+        TwoFields.objects.bulk_create(conflicting_objects, on_conflict='ignore')
+        self.assertEqual(TwoFields.objects.count(), 3)
+        self.assertIsNone(conflicting_objects[0].pk)
+        self.assertIsNone(conflicting_objects[1].pk)
+
+        new_object = TwoFields(f1=4, f2=4)
+        TwoFields.objects.bulk_create(conflicting_objects + [new_object], on_conflict='ignore')
+        self.assertEqual(TwoFields.objects.count(), 4)
+        self.assertIsNone(new_object.pk)
+
+        with self.assertRaises(IntegrityError):
+            TwoFields.objects.bulk_create(conflicting_objects)
+
+    def test_on_conflict_invalid(self):
+        message = "'test' is an invalid value for on_conflict. Allowed values: 'ignore'"
+        with self.assertRaisesMessage(ValueError, message):
+            Country.objects.bulk_create(self.data, on_conflict='test')
+
+    @skipUnlessDBFeature("supports_on_conflict_ignore")
+    def test_on_conflict_case_insensitive(self):
+        with self.assertNumQueries(1):
+            Country.objects.bulk_create(self.data, on_conflict='IGNORE')
+        self.assertEqual(Country.objects.count(), 4)
+
+    def test_on_conflict_unset_all_ids_set(self):
+        with self.assertRaisesMessage(ValueError, 'all_ids can be used only with on_conflict'):
+            Country.objects.bulk_create(self.data, all_ids=True)
+
+    @skipIfDBFeature('is_postgresql_9_5')
+    def test_on_conflict_ignore_all_ids_invalid(self):
+        message = 'all_ids can be set only when PostgreSQL >= 9.5 is used'
+        for all_ids in (False, True, ['description']):
+            with self.assertRaisesMessage(NotSupportedError, message):
+                Country.objects.bulk_create(self.data, on_conflict='ignore', all_ids=all_ids)
+
+    @skipUnlessDBFeature('is_postgresql_9_5')
+    def test_on_conflict_ignore_all_ids_false(self):
+        data = [
+            TwoFields(f1=1, f2=1),
+            TwoFields(f1=2, f2=2),
+            TwoFields(f1=3, f2=3)
+        ]
+        TwoFields.objects.bulk_create(data)
+        self.assertEqual(TwoFields.objects.count(), 3)
+
+        new_object = TwoFields(f1=4, f2=4)
+        with self.assertNumQueries(2):
+            TwoFields.objects.bulk_create(data + [new_object], on_conflict='ignore', all_ids=False)
+        self.assertEqual(TwoFields.objects.count(), 4)
+        self.assertIsNotNone(new_object.pk)
+        new_object_duplicate_1 = TwoFields(f1=5, f2=5)
+        new_object_duplicate_2 = TwoFields(f1=5, f2=5)
+        with self.assertNumQueries(2):
+            TwoFields.objects.bulk_create(data + [new_object_duplicate_1, new_object_duplicate_2],
+                                          on_conflict='ignore', all_ids=False)
+        self.assertEqual(TwoFields.objects.count(), 5)
+        self.assertEqual(new_object_duplicate_1.pk, new_object_duplicate_2.pk)
+        self.assertIsNotNone(new_object_duplicate_1.pk)
+
+    @skipUnlessDBFeature('is_postgresql_9_5')
+    def test_on_conflict_ignore_all_ids_none(self):
+        """Verify that IDs do not grow exorbitantly when on_conflict='ignore' and all_ids=None are used."""
+        data = [CountryUnique(iso_two_letter='BG'), CountryUnique(iso_two_letter='DE')]
+        x = CountryUnique.objects.bulk_create(data)
+        CountryUnique.objects.bulk_create(data + [CountryUnique(iso_two_letter='UK')], on_conflict='ignore')
+        self.assertEqual(x[1].id + 2, CountryUnique.objects.create(iso_two_letter='US').pk)
+
+    @skipUnlessDBFeature('is_postgresql_9_5')
+    def test_on_conflict_ignore_all_ids_true(self):
+        x = CountryUnique(iso_two_letter='BG')
+        y = CountryUnique(iso_two_letter='GR')
+        CountryUnique.objects.bulk_create([x, y])
+        data = [
+            CountryUnique(iso_two_letter='BG', description='Between Romania and Turkey'),
+            y
+        ]
+        t_gr = y.pk
+        y.pk = None
+        with self.assertNumQueries(2):
+            ret = CountryUnique.objects.bulk_create(data, on_conflict='ignore', all_ids=True)
+        self.assertEqual([x.pk for x in ret], [None, t_gr])
+
+    @skipUnlessDBFeature('is_postgresql_9_5')
+    def test_on_conflict_ignore_all_ids_list(self):
+        x = CountryUnique(iso_two_letter='BG')
+        y = CountryUnique(iso_two_letter='GR')
+        CountryUnique.objects.bulk_create([x, y])
+        t_bg, t_gr = x.pk, y.pk
+        x.pk = y.pk = None
+        data = [
+            CountryUnique(iso_two_letter='BG', description='Between Romania and Turkey'),
+            y
+        ]
+        with self.assertNumQueries(2):
+            ret = CountryUnique.objects.bulk_create(data, on_conflict='ignore', all_ids=['description'])
+        self.assertEqual([x.pk for x in ret], [t_bg, t_gr])
+
+    @skipUnlessDBFeature("supports_on_conflict_ignore")
+    def test__batched_insert_on_conflict_ignore_return_id_false(self):
+        fields = [f for f in CountryUnique._meta.concrete_fields if not isinstance(f, AutoField)]
+        x = CountryUnique.objects.all()._batched_insert(
+            [CountryUnique(iso_two_letter='BG'), CountryUnique(iso_two_letter='GR')],
+            fields, batch_size=None, on_conflict='ignore', return_id=False)
+        self.assertEqual(x, [])
+        x = CountryUnique.objects.all()._batched_insert(
+            [CountryUnique(iso_two_letter='BG'), CountryUnique(iso_two_letter='GR')],
+            fields, batch_size=None, on_conflict='ignore', return_id=False)
+        self.assertEqual(x, [])
+
+    @skipUnlessDBFeature('is_postgresql_9_5')
+    def test__batched_insert_on_conflict_ignore_return_id_true(self):
+        fields = [f for f in CountryUnique._meta.concrete_fields if not isinstance(f, AutoField)]
+        x = CountryUnique.objects.all()._batched_insert(
+            [CountryUnique(iso_two_letter='BG'), CountryUnique(iso_two_letter='GR')],
+            fields, batch_size=None, on_conflict='ignore', return_id=True)
+        self.assertTrue(x[0] is not None and x[1] is not None)
+        x = CountryUnique.objects.all()._batched_insert(
+            [CountryUnique(iso_two_letter='BG'), CountryUnique(iso_two_letter='GR')],
+            fields, batch_size=None, on_conflict='ignore', return_id=True)
+        self.assertTrue(x[0] is None and x[1] is None)
+
+    def test__batched_insert_on_conflict_none_return_id_false(self):
+        fields = [f for f in CountryUnique._meta.concrete_fields if not isinstance(f, AutoField)]
+        x = CountryUnique.objects.all()._batched_insert(
+            [CountryUnique(iso_two_letter='BG'), CountryUnique(iso_two_letter='GR')],
+            fields, batch_size=None, on_conflict=None, return_id=False)
+        self.assertEqual(x, [])
+
+    @skipUnlessDBFeature('can_return_ids_from_bulk_insert')
+    def test__batched_insert_on_conflict_none_return_id_true(self):
+        fields = [f for f in CountryUnique._meta.concrete_fields if not isinstance(f, AutoField)]
+        x = CountryUnique.objects.all()._batched_insert(
+            [CountryUnique(iso_two_letter='BG'), CountryUnique(iso_two_letter='GR')],
+            fields, batch_size=None, on_conflict=None, return_id=True)
+        self.assertTrue(x[0] is not None and x[1] is not None)
+
+    @skipUnlessDBFeature('can_return_ids_from_bulk_insert')
+    def test_insert_twice_the_same_item(self):
+        with self.assertNumQueries(1), self.assertRaises(IntegrityError):
+            CountryUnique.objects.bulk_create([CountryUnique(iso_two_letter='DE'), CountryUnique(iso_two_letter='DE')])
+
+
+class BulkCreatePostSaveSignalTests(TestCase):
+    """Tests bulk_create(objs, send_post_save_=True)"""
+    objs = [
+        CountryUnique(name="United States of America", iso_two_letter="US"),
+        CountryUnique(name="The Netherlands", iso_two_letter="NL"),
+        CountryUnique(name="Germany", iso_two_letter="DE"),
+        CountryUnique(name="Czech Republic", iso_two_letter="CZ")
+    ]
+
+    def setUp(self):
+        # Save up the number of connected signals so that we can check at the
+        # end that all the signals we register get properly unregistered (#9989)
+        self.received_signals = []
+        signals.post_save.connect(self.post_save_handler, weak=False)
+        self.pre_signals = len(signals.post_save.receivers)
+
+    def tearDown(self):
+        # All our signals got disconnected properly.
+        post_signals = len(signals.post_save.receivers)
+        signals.post_save.disconnect(self.post_save_handler)
+        self.assertEqual(self.pre_signals, post_signals)
+
+    def post_save_handler(self, **kwargs):
+        self.received_signals.append('post_save')
+
+    @skipUnlessDBFeature('can_return_ids_from_bulk_insert')
+    def test_bulk_create_post_save_signal_objs_without_pk(self):
+        CountryUnique.objects.bulk_create(self.objs, send_post_save=True)
+        self.assertEqual(len(self.received_signals), 4)
+
+    @skipUnlessDBFeature('can_return_ids_from_bulk_insert')
+    def test_bulk_create_post_save_signals_objs_with_pk_and_without_pk(self):
+        objs = [
+            CountryUnique(id=10, name="United States of America", iso_two_letter="US"),
+            CountryUnique(name="The Netherlands", iso_two_letter="NL"),
+            CountryUnique(id=13, name="Germany", iso_two_letter="DE"),
+            CountryUnique(name="Czech Republic", iso_two_letter="CZ")
+        ]
+        x = CountryUnique.objects.bulk_create(objs, send_post_save=True)
+        self.assertTrue(x[0].pk == 10 and x[2].id == 13 and len(x) == 4)
+        self.assertEqual(len(self.received_signals), 4)
+
+    def test_bulk_create_post_save_signals_objs_with_pk(self):
+        objs = [
+            CountryUnique(id=1, name="United States of America", iso_two_letter="US"),
+            CountryUnique(id=2, name="The Netherlands", iso_two_letter="NL"),
+            CountryUnique(id=3, name="Germany", iso_two_letter="DE"),
+            CountryUnique(id=4, name="Czech Republic", iso_two_letter="CZ")
+        ]
+
+        x = CountryUnique.objects.bulk_create(objs, send_post_save=True)
+        self.assertEqual([y.id for y in x], [1, 2, 3, 4])
+        self.assertEqual(len(self.received_signals), 4)
+
+    # From now on, tests of bulk_create(objs, send_post_save=True, on_conflict='ignore').
+    @skipUnlessDBFeature('supports_on_conflict_ignore')
+    def test_bulk_create_post_save_signals_ignore(self):
+        CountryUnique.objects.bulk_create(self.objs)
+        self.received_signals = []
+        data = [
+            CountryUnique(name="Greece", iso_two_letter="GR"),
+            CountryUnique(name="The Netherlands", iso_two_letter="NL"),
+            CountryUnique(name="Germany", iso_two_letter="DE")
+        ]
+        x = CountryUnique.objects.bulk_create(data, send_post_save=True, on_conflict='ignore')
+        self.assertEqual([y.id for y in x], [None, None, None])
+        self.assertEqual(len(self.received_signals), 0)
+
+    @skipUnlessDBFeature('is_postgresql_9_5')
+    def test_bulk_create_post_save_signals_ignore_2(self):
+        CountryUnique.objects.bulk_create(self.objs)
+        self.received_signals = []
+        data = [
+            CountryUnique(name="Greece", iso_two_letter="GR"),
+            CountryUnique(name="The Netherlands", iso_two_letter="NL"),
+            CountryUnique(name="Germany", iso_two_letter="DE")
+        ]
+        x = CountryUnique.objects.bulk_create(data, send_post_save=True, on_conflict='ignore', all_ids=False)
+        self.assertEqual([bool(y.id) for y in x], [True, False, False])
+        self.assertEqual(len(self.received_signals), 1)
+
+    @skipUnlessDBFeature('is_postgresql_9_5')
+    def test_bulk_create_post_save_signals_ignore_3(self):
+        """Tests when bulk_create gets the same object twice"""
+        CountryUnique.objects.bulk_create(self.objs)
+        self.received_signals = []
+        data = [
+            CountryUnique(name="Greece", iso_two_letter="GR"),
+            CountryUnique(name="Greece", iso_two_letter="GR"),
+            CountryUnique(name="The Netherlands", iso_two_letter="NL"),
+            CountryUnique(name="Germany", iso_two_letter="DE")
+        ]
+        x = CountryUnique.objects.bulk_create(data, send_post_save=True, on_conflict='ignore', all_ids=False)
+        self.assertEqual([bool(y.id) for y in x], [True, True, False, False])
+        self.assertEqual(len(self.received_signals), 1)
+
+    @skipUnlessDBFeature('is_postgresql_9_5')
+    def test_bulk_create_post_save_signals_ignore_4(self):
+        """Tests when bulk_create on_conflict='ignore' with all_ids being a list"""
+        CountryUnique.objects.bulk_create(self.objs)
+        self.received_signals = []
+        data = [
+            CountryUnique(name="Greece", iso_two_letter="GR", description="Contains Acropolis"),
+            CountryUnique(name="Greece", iso_two_letter="GR", description="Contains Athen"),
+            CountryUnique(name="The Netherlands", iso_two_letter="NL"),
+            CountryUnique(name="Germany", iso_two_letter="DE")
+        ]
+
+        x = CountryUnique.objects.bulk_create(data, send_post_save=True, on_conflict='ignore',
+                                              all_ids=['description'])
+        self.assertTrue(all(y.id for y in x) and len(x) == 4)
+        self.assertEqual(len(self.received_signals), 1)
+
+    @skipUnlessDBFeature('is_postgresql_9_5')
+    def test_bulk_create_post_save_signals_ignore_5(self):
+        """Tests bulk_create on_conflict='ignore' and all_ids=True"""
+        CountryUnique.objects.bulk_create(self.objs)
+        self.received_signals = []
+        data = [
+            CountryUnique(name="Greece", iso_two_letter="GR", description="Contains Acropolis"),
+            CountryUnique(name="The Netherlands", iso_two_letter="NL"),
+            CountryUnique(name="Germany", iso_two_letter="DE")
+        ]
+
+        x = CountryUnique.objects.bulk_create(data, send_post_save=True, on_conflict='ignore', all_ids=True)
+        self.assertTrue(all(y.id for y in x) and len(x) == 3)
+        self.assertEqual(len(self.received_signals), 1)
